百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

Flink 实时去重方案

csdh11 2024-11-30 14:14 4 浏览

实时去重,一直都是实时领域常见的需求,但是同时也是一个难点。

在本场 Chat 中,会基于 Flink 提供不同的去重方案,深入分析每一种方案的使用方式,并且提供代码参考,会讲到如下内容:

  1. MapState 方式去重
  2. SQL 方式去重
  3. HyperLogLog 方式去重
  4. HyperLogLog 去重优化
  5. bitmap 精确去重

去重计算是数据分析业务里面常见的指标计算,例如网站一天的访问用户数、广告的点击用户数等等,离线计算是一个全量、一次性计算的过程通常可以通过 distinct 的方式得到去重结果,而实时计算是一种增量、长期计算过程,我们在面对不同的场景,例如数据量的大小、计算结果精准度要求等可以使用不同的方案。本篇将会基于 Flink 讲解不同的实现方案:

  • MapState 方式去重
  • SQL 方式去重
  • HyperLogLog 方式去重
  • Bitmap 精确去重

下面将以一个实际场景为例:计算每个广告每小时的点击用户数,广告点击日志包含:广告位 ID、用户设备 ID(idfa/imei/cookie)、点击时间。

MapState 方式去重

MapState 是 Flink 中 KeyedState 的状态类型,这种方式实现去重是一个精确的去重结果,将设备 ID 保存在 MapState 中。

实现步骤分析

  1. 为了当天的数据可重现,这里选择事件时间也就是广告点击时间作为每小时的窗口期划分
  2. 数据分组使用广告位 ID+点击事件所属的小时
  3. 选择 processFunction 来实现,一个状态用来保存数据、另外一个状态用来保存对应的数据量
  4. 计算完成之后的数据清理,按照时间进度注册定时器清理

实现流程

广告数据

case class AdData(id:Int,devId:String,time:Long)

分组数据

case class AdKey(id:Int,time:Long)

主流程

val env=StreamExecutionEnvironment.getExecutionEnvironment  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)    val kafkaConfig=new Properties()    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1")    val consumer=new FlinkKafkaConsumer[String]("topic1",new SimpleStringSchema,kafkaConfig)    val ds=env.addSource(consumer)      .map(x=>{        val s=x.split(",")        AdData(s(0).toInt,s(1),s(2).toLong)      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[AdData](Time.minutes(1)) {      override def extractTimestamp(element: AdData): Long = element.time    })      .keyBy(x=>{        val endTime= TimeWindow.getWindowStartWithOffset(x.time, 0,          Time.hours(1).toMilliseconds) + Time.hours(1).toMilliseconds        AdKey(x.id,endTime)      })

指定时间时间属性,这里设置允许 1min 的延时,可根据实际情况调整;时间的转换选择 TimeWindow.getWindowStartWithOffset Flink 在处理 window 中自带的方法,使用起来很方便,第一个参数 表示数据时间,第二个参数 offset 偏移量,默认为 0,正常窗口划分都是整点方式,例如从 0 开始划分,这个 offset 就是相对于 0 的偏移量,第三个参数表示窗口大小,得到的结果是数据时间所属窗口的开始时间,这里加上了窗口大小,使用结束时间与广告位 ID 作为分组的 Key。

去重逻辑

自定义 Distinct1ProcessFunction 继承了 KeyedProcessFunction, 方便起见使用输出类型使用 Void,这里直接使用打印控制台方式查看结果,在实际中可输出到下游做一个批量的处理然后在输出。

定义两个状态:MapState,key 表示 devId, value 表示一个随意的值只是为了标识,该状态表示一个广告位在某个小时的设备数据,如果我们使用 rocksdb 作为 statebackend, 那么会将 mapstate 中 key 作为 rocksdb 中 key 的一部分,mapstate 中 value 作为 rocksdb 中的 value, rocksdb 中 value 大小是有上限的,这种方式可以减少 rocksdb value 的大小;另外一个 ValueState,存储当前 MapState 的数据量,是由于 mapstate 只能通过迭代方式获得数据量大小,每次获取都需要进行迭代,这种方式可以避免每次迭代。

class Distinct1ProcessFunction extends KeyedProcessFunction[AdKey, AdData, Void] {  var devIdState: MapState[String, Int] = _  var devIdStateDesc: MapStateDescriptor[String, Int] = _  var countState: ValueState[Long] = _  var countStateDesc: ValueStateDescriptor[Long] = _  override def open(parameters: Configuration): Unit = {    devIdStateDesc = new MapStateDescriptor[String, Int]("devIdState", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Int]))    devIdState = getRuntimeContext.getMapState(devIdStateDesc)    countStateDesc = new ValueStateDescriptor[Long]("countState", TypeInformation.of(classOf[Long]))    countState = getRuntimeContext.getState(countStateDesc)  }  override def processElement(value: AdData, ctx: KeyedProcessFunction[AdKey, AdData, Void]#Context, out: Collector[Void]): Unit = {    val currW=ctx.timerService().currentWatermark()    if(ctx.getCurrentKey.time+1<=currW) {        println("late data:" + value)        return      }    val devId = value.devId    devIdState.get(devId) match {      case 1 => {        //表示已经存在      }      case _ => {        //表示不存在        devIdState.put(devId, 1)        val c = countState.value()        countState.update(c + 1)        //还需要注册一个定时器        ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)      }    }    println(countState.value())  }  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[AdKey, AdData, Void]#OnTimerContext, out: Collector[Void]): Unit = {    println(timestamp + " exec clean~~~")    println(countState.value())    devIdState.clear()    countState.clear()  }}

数据清理通过注册定时器方式 ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1) 表示当 watermark 大于该小时结束时间+1 就会执行清理动作,调用 onTimer 方法。

在处理逻辑里面加了

val currW=ctx.timerService().currentWatermark()if(ctx.getCurrentKey.time+1<=currW){        println("late data:" + value)        return  }

主要考虑可能会存在滞后的数据比较严重,会影响之前的计算结果,做了一个类似 window 机制里面的一个延时判断,将延时的数据过滤掉,也可以使用 OutputTag 单独处理。

SQL 方式去重

上一节中介绍了使用编码方式完成去重,但是这种方式开发周期比较长,我们可能需要针对不同的业务逻辑实现不同的编码,对于业务开发来说也需要熟悉 Flink 编码,也会增加相应的成本,我们更多希望能够以 sql 的方式提供给业务开发完成自己的去重逻辑。

为了与离线分析保持一致的分析语义,Flink SQL 中提供了 distinct 去重方式,使用方式:

SELECT DISTINCT devId FROM pv

表示对设备 ID 进行去重,得到一个明细结果,那么我们在使用 distinct 来统计去重结果通常有两种方式, 仍然以统计网站 uv 为例。

第一种方式

SELECT datatime,count(DISTINCT devId) FROM pv group by datatime

该语义表示计算网页每日的 uv 数量,其内部核心实现主要依靠 DistinctAccumulator 与 CountAccumulator,DistinctAccumulator 内部包含一个 map 结构,key 表示的是 distinct 的字段,value 表示重复的计数,CountAccumulator 就是一个计数器的作用,这两部分都是作为动态生成聚合函数的中间结果 accumulator,透过之前的聚合函数的分析可知中间结果是存储在状态里面的,也就是容错并且具有一致性语义的其处理流程是:

  1. 将 devId 添加到对应的 DistinctAccumulator 对象中,首先会判断 map 中是否存在该 devId, 不存在则插入 map 中并且将对应 value 记 1,并且返回 True;存在则将对应的 value+1 更新到 map 中,并且返回 False
  2. 只有当返回 True 时才会对 CountAccumulator 做累加 1 的操作,以此达到计数目的

第二种方式

select count(*),datatime from(      select distinct devId,datatime from pv ) a            group by datatime

内部是一个对 devId,datatime 进行 distinct 的计算,在 flink 内部会转换为以 devId,datatime 进行分组的流并且进行聚合操作,在内部会动态生成一个聚合函数,该聚合函 createAccumulators 方法生成的是一个 Row(0) 的 accumulator 对象,其 accumulate 方法是一个空实现,也就是该聚合函数每次聚合之后返回的结果都是 Row(0),通过之前对 sql 中聚合函数的分析(可查看 GroupAggProcessFunction 函数源码), 如果聚合函数处理前后得到的值相同那么可能会不发送该条结果也可能发送一条撤回一条新增的结果,但是其最终的效果是不会影响下游计算的,在这里我们简单理解为在处理相同的 devId,datatime 不会向下游发送数据即可,也就是每一对 devId,datatime 只会向下游发送一次数据。

外部就是一个简单的按照时间维度的计数计算,由于内部每一组 devId,datatime 只会发送一次数据到外部,那么外部对应 datatime 维度的每一个 devId 都是唯一的一次计数,得到的结果就是我们需要的去重计数结果。

两种方式对比

  1. 这两种方式最终都能得到相同的结果,但是经过分析其在内部实现上差异还是比较大,第一种在分组上选择 datatime ,内部使用的累加器 DistinctAccumulator 每一个 datatime 都会与之对应一个对象,在该维度上所有的设备 id, 都会存储在该累加器对象的 map 中,而第二种选择首先细化分组,使用 datatime+devId 分开存储,然后外部使用时间维度进行计数,简单归纳就是:
  • 第一种: datatime->Value{devI1,devId2..}
  • 第二种: datatime+devId->row(0)聚合函数中 accumulator 是存储在 ValueState 中的,第二种方式的 key 会比第一种方式数量上多很多,但是其 ValueState 占用空间却小很多,而在实际中我们通常会选择 Rocksdb 方式作为状态后端,rocksdb 中 value 大小是有上限的,第一种方式很容易到达上限,那么使用第二种方式会更加合适。
  1. 这两种方式都是全量保存设备数据的,会消耗很大的存储空间,但是我们的计算通常是带有时间属性的,那么可以通过配置 StreamQueryConfig 设置状态 ttl。

HyperLogLog 方式去重

HyperLogLog 算法 也就是基数估计统计算法,预估一个集合中不同数据的个数,也就是我们常说的去重统计,在 redis 中也存在 hyperloglog 类型的结构,能够使用 12k 的内存,允许误差在 0.81%的情况下统计 2^64 个数据,在这种大数据量情况下能够减少存储空间的消耗,但是前提是允许存在一定的误差。关于 HyperLogLog 算法原理可以参考这篇文章:https://www.jianshu.com/p/55defda6dcd2 里面做了详细的介绍,其算法实现在开源 java 流式计算库 stream-lib 提供了其具体实现代码。测试一下其使用效果,准备了 97320 不同数据:

public static void main(String[] args) throws Exception{        String filePath = "000000_0";        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));        Set<String> values =new HashSet<>();        HyperLogLog logLog=new HyperLogLog(0.01); //允许误差        String line = "";        while ((line = br.readLine()) != null) {            String[] s = line.split(",");            String uuid = s[0];            values.add(uuid);            logLog.offer(uuid);        }        long rs=logLog.cardinality();    }

当误差值为 0.01 时; rs 为 98228,需要内存大小 int[1366] //内部数据结构当误差值为 0.001 时;rs 为 97304 ,需要内存大小 int[174763]误差越小也就越来越接近其真实数据,但是在这个过程中需要的内存也就越来越大,这个取舍可根据实际情况决定。

实现

在开发中更多希望通过 sql 方式来完成,那么就将 hll 与 udaf 结合起来使用,实现代码如下:

public class HLLDistinctFunction extends AggregateFunction<Long,HyperLogLog> {    @Override public HyperLogLog createAccumulator() {        return new HyperLogLog(0.001);    }    public void accumulate(HyperLogLog hll,String id){      hll.offer(id);    }    @Override public Long getValue(HyperLogLog accumulator) {        return accumulator.cardinality();    }}

定义的返回类型是 long 也就是去重的结果,accumulator 是一个 HyperLogLog 类型的结构。测试:

case class AdData(id:Int,devId:String,datatime:Long)object Distinct1 {  def main(args: Array[String]): Unit = {    val env=StreamExecutionEnvironment.getExecutionEnvironment    val tabEnv=StreamTableEnvironment.create(env)    tabEnv.registerFunction("hllDistinct",new HLLDistinctFunction)    val kafkaConfig=new Properties()   kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1")    val consumer=new FlinkKafkaConsumer[String]("topic1",new SimpleStringSchema,kafkaConfig)    consumer.setStartFromLatest()    val ds=env.addSource(consumer)      .map(x=>{        val s=x.split(",")        AdData(s(0).toInt,s(1),s(2).toLong)      })    tabEnv.registerDataStream("pv",ds)    val rs=tabEnv.sqlQuery(      """ select hllDistinct(devId) ,datatime                                          from pv group by datatime      """.stripMargin)    rs.writeToSink(new PaulRetractStreamTableSink)    env.execute()  }}

准备测试数据:

1,devId1,15778080000001,devId2,15778080000001,devId1,1577808000000

得到结果:

4> (true,1,1577808000000)4> (false,1,1577808000000)4> (true,2,1577808000000)

优化

在 HyperLogLog 去重实现中,如果要求误差在 0.001 以内,那么就需要 1048576 个 int, 也就是会消耗 4M 的存储空间,但是在实际使用中有很多的维度的统计是达不到这个数据量,那么可以在这里做一个优化,优化方式是:初始 HyperLogLog 内部使用存储是一个 set 集合,当 set 大小达到了指定大小(1048576)就转换为 HyperLogLog 存储方式。这种方式可以有效减小内存消耗。实现代码:

public class OptimizationHyperLogLog {    //hyperloglog 结构    private HyperLogLog hyperLogLog;    //初始的一个 set    private Set<Integer> set;    private double rsd;    //hyperloglog 的桶个数,主要内存占用    private int bucket;    public OptimizationHyperLogLog(double rsd){        this.rsd=rsd;        this.bucket=1 << HyperLogLog.log2m(rsd);        set=new HashSet<>();             }   //插入一条数据    public void offer(Object object){        final int x = MurmurHash.hash(object);        int currSize=set.size();        if(hyperLogLog==null && currSize+1>bucket){            //升级为 hyperloglog           hyperLogLog=new HyperLogLog(rsd);           for(int d: set){               hyperLogLog.offerHashed(d);           }           set.clear();        }        if(hyperLogLog!=null){            hyperLogLog.offerHashed(x);        }else {            set.add(x);        }    }    //获取大小    public long cardinality() {      if(hyperLogLog!=null) return hyperLogLog.cardinality();      return set.size();    }}

初始化:入参同样是一个允许的误差范围值 rsd,计算出 hyperloglog 需要桶的个数 bucket,也就需要是 int 数组大小,并且初始化一个 set 集合 hashset;

数据插入:使用与 hyperloglog 同样的方式将插入数据转 hash, 判断当前集合的大小+1 是否达到了 bucket,不满足则直接添加到 set 中,满足则将 set 里面数据转移到 hyperloglog 对象中并且清空 set, 后续数据将会被添加到 hyperloglog 中;

Bitmap 精确去重

在前面提到的精确去重方案都是会保存全量的数据,但是这种方式是以牺牲存储为代价的,而 hyperloglog 方式虽然减少了存储但是损失了精度,那么如何能够做到精确去重又能不消耗太多的存储呢,接下来讲解如何使用 bitmap 做精确去重。

ID-mapping

在使用 bitmap 去重需要将去重的 id 转换为一串数字,但是我们去重的通常是一串包含字符的字符串例如设备 ID,那么第一步需要将字符串转换为数字,首先可能想到对字符串做 hash,但是 hash 是会存在概率冲突的,那么可以使用美团开源的 leaf 分布式唯一自增 ID 算法,也可以使用 Twitter 开源的 snowflake 分布式唯一 ID 雪花算法,我们选择了实现相对较为方便的 snowflake 算法,代码如下:

public class SnowFlake {    /**     * 起始的时间戳     */    private final static long START_STMP = 1480166465731L;    /**     * 每一部分占用的位数     */    private final static long SEQUENCE_BIT = 12; //序列号占用的位数    private final static long MACHINE_BIT = 5;   //机器标识占用的位数    private final static long DATACENTER_BIT = 5;//数据中心占用的位数    /**     * 每一部分的最大值     */    private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT);    private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT);    private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);    /**     * 每一部分向左的位移     */    private final static long MACHINE_LEFT = SEQUENCE_BIT;    private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;    private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT;    private long datacenterId;  //数据中心    private long machineId;     //机器标识    private long sequence = 0L; //序列号    private long lastStmp = -1L;//上一次时间戳    public SnowFlake(long datacenterId, long machineId) {        if (datacenterId > MAX_DATACENTER_NUM || datacenterId < 0) {            throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0");        }        if (machineId > MAX_MACHINE_NUM || machineId < 0) {            throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");        }        this.datacenterId = datacenterId;        this.machineId = machineId;    }    /**     * 产生下一个 ID     *     * @return     */    public synchronized long nextId() {        long currStmp = getNewstmp();        if (currStmp < lastStmp) {            throw new RuntimeException("Clock moved backwards.  Refusing to generate id");        }        if (currStmp == lastStmp) {            //相同毫秒内,序列号自增            sequence = (sequence + 1) & MAX_SEQUENCE;            //同一毫秒的序列数已经达到最大            if (sequence == 0L) {                currStmp = getNextMill();            }        } else {            //不同毫秒内,序列号置为 0            sequence = 0L;        }        lastStmp = currStmp;        return (currStmp - START_STMP) << TIMESTMP_LEFT //时间戳部分                | datacenterId << DATACENTER_LEFT       //数据中心部分                | machineId << MACHINE_LEFT             //机器标识部分                | sequence;                             //序列号部分    }    private long getNextMill() {        long mill = getNewstmp();        while (mill <= lastStmp) {            mill = getNewstmp();        }        return mill;    }    private long getNewstmp() {        return System.currentTimeMillis();    }}

snowflake 算法的实现是与机器码以及时间有关的,为了保证其高可用做了两个机器码不同的对外提供的服务。首先从 Hbase 中查询是否有 UID 对应的 ID,如果有则直接获取,如果没有则会调用 ID-Mapping 服务,然后将其对应关系存储到 Hbase 中,最后返回 ID 至下游处理。

UDF 化

为了方便提供业务方使用,同样需要将其封装成为 UDF, 由于 snowflake 算法得到的是一个长整型,因此选择了 Roaring64NavgabelMap 作为存储对象,由于去重是按照维度来计算,所以使用 UDAF,首先定义一个 accumulator:

public class PreciseAccumulator{    private Roaring64NavigableMap bitmap;    public PreciseAccumulator(){        bitmap=new Roaring64NavigableMap();    }    public void add(long id){        bitmap.addLong(id);    }    public long getCardinality(){        return bitmap.getLongCardinality();    }}

udaf 实现

public class PreciseDistinct extends AggregateFunction<Long, PreciseAccumulator> {    @Override public PreciseAccumulator createAccumulator() {        return new PreciseAccumulator();    }    public void accumulate(PreciseAccumulator accumulator,long id){        accumulator.add(id);    }    @Override public Long getValue(PreciseAccumulator accumulator) {        return accumulator.getCardinality();    }}

那么在实际使用中只需要注册 udaf 即可。

阅读全文: http://gitbook.cn/gitchat/activity/5e4a6cc21d77a21bea92ab5a

相关推荐

Micheal Nielsen&#39;s神经网络学习之二

依然是跟着MichaelNielsen的神经网络学习,基于前一篇的学习,已经大概明白了神经网络的基本结构和BP算法,也能通过神经网络训练数字识别功能,之后我试验了一下使用神经网络训练之前的文本分类,...

CocoaPods + XCTest进行单元测试 c单元测试工具

在使用XCTest进行单元测试时,我们经常会遇到一些CocoaPods中的开源框架的调用,比如“Realm”或“Alamofire”在测试的时候,如果配置不当,会导致“frameworknotfo...

Java基础知识回顾第四篇 java基础讲解

1、&和&&的区别作为逻辑运算符:&(不管左边是什么,右边都参与运算),&&(如果左边为false,右边则不参与运算,短路)另外&可作为位运算符...

项目中的流程及类似业务的设计模式总结

说到业务流程,可能是我做过的项目中涉及业务最多的一个方面了。除了在流程设计之外,在一些考核系统、产业审批、还有很多地方,都用到相似的设计思路,在此一并总结一下。再说到模式,并不是因为流行才用这个词,而...

联想三款显示器首批获得 Eyesafe Certified 2.0 认证

IT之家7月31日消息,据外媒报道,三款全新联想显示器是全球首批满足EyesafeCertified2.0的设备。据报道,联想获得EyesafeCertified2.0认证的显...

maven的生命周期,插件介绍(二) 一个典型的maven构建生命周期

1.maven生命周期一个完整的项目构建过程通常包括清理、编译、测试、打包、集成测试、验证、部署等步骤,Maven从中抽取了一套完善的、易扩展的生命周期。Maven的生命周期是抽象的,其中的具体任务都...

多线程(3)-基于Object的线程等待与唤醒

概述在使用synchronized进行线程同步中介绍了依赖对象锁定线程,本篇文章介绍如何依赖对象协调线程。同synchronized悲观锁一样,线程本身不能等待与唤醒,也是需要对象才能完成等待与唤醒的...

jquery mobile + 百度地图 + phonegap 写的一个&quot;校园助手&quot;的app

1jquerymobile+百度地图+phonegap写的一个"校园助手"的app,使用的是基于Flat-UI的jQueryMobile,请参考:https://github.com/...

Apache 服务启动不了 apache系统服务启动不了

{我是新手,从未遇到此问题,请各位大大勿喷}事由:今天早上上班突然发现公司网站出现问题。经过排查,发现是Apache出现问题。首先检查配置文件没有出问题后,启动服务发现Apache服务能启动,但是没法...

健康债和技术债都不能欠 公众号: 我是攻城师(woshigcs)

在Solr4.4之后,Solr提供了SolrCloud分布式集群的模式,它带来的主要好处是:(1)大数据量下更高的性能(2)更好扩展性(3)更高的可靠性(4)更简单易用什么时候应该使用Sol...

Eye Experience怎么用?HTC告诉你 eyebeam怎么用

IT之家(www.ithome.com):EyeExperience怎么用?HTC告诉你HTC上周除了发布HTCDesireEYE自拍机和HTCRE管状运动相机之外,还发布了一系列新的智能手机...

Android系统应用隐藏和应用禁止卸载

1、应用隐藏与禁用Android设置中的应用管理器提供了一个功能,就是【应用停用】功能,这是针对某些系统应用的。当应用停用之后,应用的图标会被隐藏,但apk还是存在,不会删除,核心接口就是Packag...

计算机软件技术分享--赠人玫瑰,手遗余香

一、Netty介绍Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty...

Gecco爬虫框架的线程和队列模型 爬虫通用框架

简述爬虫在抓取一个页面后一般有两个任务,一个是解析页面内容,一个是将需要继续抓取的url放入队列继续抓取。因此,当爬取的网页很多的情况下,待抓取url的管理也是爬虫框架需要解决的问题。本文主要说的是g...

一点感悟(一) 初识 初读感知的意思

时间过得很快,在IT业已从业了两年多。人这一辈子到底需要什么,在路边看着人来人往,大部分人脸上都是很匆忙。上海真是一个魔都,它有魅力,有底蕴,但是一个外地人在这里扎根置业,真的是举全家之力,还贷3...