友盟数据平台承担了大量 APP 的统计分析任务,为开发者计算 APP 的新增/活跃用户、用户留存/沉默、用户访问路径、自定义事件等丰富、多维、 精准的统计分析指标,并在友盟的海量数据上进行深度挖掘。 由于友盟服务的开发者众多,很多优秀的 APP 也用户量巨大,为了能够全面、准确 并且及时的为开发者提供数据,数据平台面临了很大的挑战。
Lambda Architecture
在友盟,整个移动大数据平台架构基本符合 Nathan Marz 和 James Warren 在 Big Data 一书中介绍的 Lambda Architecture。 这个架构设计清晰合理,融合了大数据处理系统必修的数据不可变性、 读写分离、计算和服务隔离等特性,拥有低延迟、易维护、可扩展、高容错的特点。 我所在的数据平台团队位于整个架构中的 Batch Layer, 主要负责全量数据的离线计算; Speed Layer 则由我们的另一个团队完成,负责实时数据的计算。 实时和离线的计算结果一起,组成了开发者在友盟后台看到的数据报表。
在 Nathan 的架构中,Batch Layer 有下面几个重要的特征:
- 接收并保存全量的数据,数据只追加不修改,在全量数据上进行计算;
- 对计算的实时性要求不高,但要求计算系统吞吐量大,横向扩展性强;
- 故障情况下,或增加新功能的情况下,任务可重入,数据结果可重算;
在友盟数据平台当前的实现里,我们写了大量的 MapReduce 任务来实现 Batch Layer 的计算功能,这些任务周期性地运行在我们 500+ 物理节点的 Hadoop 集群上, 每天消化掉压缩后 7TB+ 的新增日志,为超过 64 万 App 提供各种统计指标的计算。 经过多次升级,数据平台目前采用成熟的 Hadoop on Yarn 的架构, 集群中还部署了 HBase 作为存储服务,也支持 Pig 和 Hive 这两个上层组件, 技术和非技术人员都可以进行快速数据分析。
关于数据平台的架构,磊叔之前在 "友盟移动大数据平台的架构与实践"和 "浅谈移动应用分析平台中的开源系统实践" 等文中已经有详细的介绍,这里就不多说了。 本文后面主要从数据平台目前 MapReduce 任务的组织部署的优缺点展开, 简单聊一聊 MapReduce 计算模型的限制,和我们对另一个计算模型/框架 Spark 的使用。
MapReduce
MapReduce 计算模型简单粗暴有效。这个计算模型适用于这种场景:
- 数据量大:和整个集群的内存量相比数据量巨大,数据不能全部加载进内存;
- 单次:数据流经算法一遍就输出结果,不需要在同一数据集上反复迭代;
在我们的平台中,日常指标从逻辑上看由多个 MapReduce 任务组(Job Group)构成。 每个任务组包含多个 MapReduce 任务,负责完成逻辑上的一轮日志处理, 组内的任务根据互相之间的依赖构成一个有向无环图。 每组任务有一或多个开始任务,经过中间任务的几轮处理,最终有一或多个结束任务。 任务组之间也存在依赖关系或嵌套关系,所有的任务组在一起构成几个大的任务图, 图中的任务组可能以不同的周期重复。 目前我们的任务有小时级、天级、周级和月级几个常用周期,也有其他一些自定义周期, 大的周期内通常包含多个小周期的任务组。
这些大任务图从整体上看,其输入是友盟的原始日志、历史纪录和 Meta 信息等, 输出是多种计算指标。 其中前后依赖的 MapReduce 任务产生很多中间结果,多数作为文件存放在 HDFS 上, 而最终的指标结果大部分放在 HBase 中,供后面的查询服务层提取和服务。
采用这种任务安排有不少明显的优点。 首先,按照业务逻辑将 MapReduce 任务分组,依赖关系清晰,方便维护和升级; 其次,不同任务之间除了数据依赖之外,基本是独立的单元,在任务失败或修 Bug 补数据 需要重跑时,只需要沿着数据流,重启下游相应的任务即可; 最后,各个中间任务产生了大量的中间数据集,这些数据集虽然很多不是永久保存, 但也经常被用于数据验证、问题排查、新 Feature 开发、数据挖掘、数据报告分析等。
另一方面,采用这种任务安排,我们在实践中也遇到不少问题, 很多问题和 MapReduce 计算模型本身的限制是密切相关的。 一个 MapReduce 任务中,最多只能有一种 Map 计算 (Multiple Input 中 Map 输入虽然不同,但输出 (K, V) 相同,认为是同一种 Map), 以及一种 Reduce 计算,Map 和 Reduce 之间由 Shuffle 过程连接, 实现把同类数据按照 Key 聚合,并分发到多个节点计算和汇总结果的算法。
我们的指标计算很多,不少还很复杂,因此很多业务逻辑都需要一连串的 MapReduce 任务来计算。对于计算链条当中的大部分中间任务,它们都需要从 HDFS 上读取文件, 经过 Map, Shuffle, Reduce,产出的结果再写回到 HDFS。 因此,一个业务逻辑的计算中,需要许多 MapReduce 任务反复读写文件系统。 同时,每个 MapReduce 任务内部,Shuffle 两端的数据都要在本地磁盘上暂存, IO 开销比较大,这也是为什么 MapReduce 任务设计中,有效的利用 Combiner 能够大幅度提高任务运行速度的一个原因 (另一个更重要的原因是 Combiner 减少了需要 Shuffle 的数据量, 从而大大减少了网络 IO)。
除了我们这种复杂计算的业务场景,很多数据挖掘和机器学习的算法需要大量的循环迭代, 如果使用一连串的 MapReduce 任务来实现,巨大的磁盘 IO 开销会严重影响任务执行效率。 因此,在一些适用的场景下,我们目前也在使用 Spark 进行数据分析, 一部分是新业务,一部分则是替代 MapReduce 并不十分适合的旧任务。
Spark
Spark 当前风头正劲。 这个计算框架的特性包括:快速的数据处理,简明有效的数据模型和计算模型, 友好易上手的 API 和开发环境 (像是开发单机程序一般的体验,还有交互式的 Spark Shell 这种开发调试利器), 丰富的系统套件(如方便易用的 Spark SQL,机器学习和图计算支持,Streaming)等。 而且 Spark 支持 Standalone、Mesos 和 Yarn 多种运行模式, Spark on Yarn 和 Hadoop 生态系统结合紧密, 在 Hadoop 平台上耕耘已久的友盟数据平台很早就有数据挖掘工程师开始尝试 Spark, 团队几位先行者也多次进行内部技术交流,目前 Spark 已经在生产环境中成功运行, 另有一些新的项目正在开发中。
Spark 中的核心概念是 RDD (Resilient Distributed Datasets),以及围绕 RDD 的 Transformations 和 Actions 操作。 RDD 是对 Spark 中数据的逻辑抽象, 可以理解为分布在集群中各节点上、可以被并行处理的数据集片段, 而 Transformations 则可以理解为对 RDD 中数据做出一种转换,这种转换会产生新的 RDD。 因此,可以将连续的 Transformations 串起来对数据做多次转换, 最终由 Action 操作产生数据处理结果,也就是最终的 Value。 RDD 这种数据模型的设计和其上的操作,较之 MapReduce 模型中单一的 Map 和 Reduce, 是一种更灵活的数据处理模型。
Transformations 操作是惰性的,只有在进行 Action 操作时, Spark 才会回溯要进行这次 Action 依赖的所有 Transformations, 从创建初始 RDD 开始逐步计算。 采用惰性计算,Spark 可以对整个计算流程进行必要的优化,以加快运行速度; 同时,Spark 的 RDD 中包含了血缘关系信息,当出现节点失效造成 RDD 丢失时, Spark 可以依照血缘关系再造 RDD,从而容忍集群中的机器故障; 最后,Spark 支持对处理过程中产生的 RDD 进行持久化处理,即 persist() 和 cache(), persist() 分为多个级别,如持久化到 JVM 内存、磁盘或者 Tachyon 等外部存储, 而 cache() 则等效于 persist() 到 JVM 内存这个级别。 通过持久化的功能,一个 RDD 可以被多次利用,从而不必每次都重新产生, 这在需要在同一数据集上反复迭代的算法中非常实用。
此外在 Spark 中,RDD 之间的依赖有窄依赖和宽依赖之分。 简言之,一个 RDD 中的数据片段可以直接经过转换生成新的 RDD 时, 两个 RDD 之间即为窄依赖; 而如果需要多个 RDD 中的数据汇总到一起产生新的 RDD 时,新的 RDD 对老 RDDs 的依赖 即为宽依赖。 不难看出,在生成对上层 RDDs 有宽依赖的 RDD 时,需要在集群中个节点间进行数据 Shuffle, 因此在写 Spark 程序时,要仔细处理宽依赖,减少 Shuffle 的数据量。
RDD 本质上是一个内存数据集,使得 Spark 在处理数据时可以充分利用内存,因此速度很快。 如果数据量巨大而集群内存不够,导致必须要使用到磁盘时,Spark 的性能就会大打折扣。 在集群资源有限的情况下,就要求被处理的数据集大小适应整个集群的内存限制, 因此 Spark 更适合于对不那么"大"的数据进行快速地、需要反复迭代的处理。 而 MapReduce 计算模型则对内存要求没有这么严格,可以处理更大规模、不能适配内存大小的数据。 由于对内存的高要求,在集群混布的情况下,Spark 可能会由于内存资源不足而影响性能发挥, 因此需要对集群资源和 Spark 上运行的任务进行合理的规划, 对不能够 Fit 进集群内存的数据进行计算,目前还是使用 MapReduce 来处理更加可靠。
除了进行数据批处理计算,使用 Spark 还可以构建 Streaming 系统, 多数据源聚合分析系统,数据仓库等。 其中 Streaming 目前已经在我们数据平台有一个线上项目,运行良好, 相关技术希望在不久的将来由这个项目的作者亲自为大家撰文分享, 我们会在友盟数据平台非官方微信公众号 TheFortyTwo 上进行分享。 与此同时,数据平台的同学们也都在继续深入学习、利用 Spark, 以构建更稳健、高效的数据平台, 相关技术也会在 TheFortyTwo、友盟官微(umengcom)等渠道和大家分享。