2025-06-02

Spark分布式运行原理深度解析

在大数据时代,Apache Spark 以其高速计算、易用编程模型和丰富生态圈成为主流分布式计算框架之一。要深入理解 Spark 的强大能力,必须从其分布式运行机制出发,掌握 Driver、Executor、Cluster Manager、DAG 调度、Shuffle 机制、容错设计等核心原理。本文将从 Spark 核心架构开始,结合代码示例与 ASCII 图解,逐步剖析 Spark 在集群环境中的任务提交、调度、执行到结果返回的全过程,帮助你快速学会并深入掌握 Spark 分布式运行原理。


目录

  1. Spark 概述与核心组件
  2. RDD 与 DAG 依赖关系
  3. Job 提交到任务执行流程
    3.1. Driver 启动与 SparkContext 初始化
    3.2. 作业(Job)划分与阶段(Stage)生成
    3.3. Task 集合与 TaskScheduler 调度
  4. Executor 与 Task 执行
    4.1. Executor 启动机制
    4.2. Task 计算流程与代码序列化
    4.3. 累加器(Accumulator)与广播变量(Broadcast)
  5. Shuffle 过程与优化
    5.1. Shuffle 原理与中间文件存储
    5.2. Map 端与 Reduce 端交互
    5.3. Sort-Based Shuffle 与 Hash-Based Shuffle
    5.4. Shuffle 性能优化建议
  6. 容错机制与数据重算(Lineage)
    6.1. DAG 的弹性分布式容错思路
    6.2. Task 失败重试与 Executor 失效恢复
    6.3. Checkpoint 与外部存储持久化
  7. 代码示例:WordCount 与 Join
    7.1. 基本 WordCount 示例(Scala)
    7.2. 带 GroupByKey 的示例(演示 Shuffle)
    7.3. RDD Cache 与广播变量在 Join 中的示例
  8. 图解:Spark 分布式运行架构
    8.1. Driver 与 Cluster Manager 通信流程
    8.2. Task 调度与 Shuffle 流程示意图
  9. 总结与最佳实践

1. Spark 概述与核心组件

Spark 是一个通用的分布式数据处理引擎,核心设计围绕弹性分布式数据集(Resilient Distributed Dataset,RDD)。相比传统 MapReduce,Spark 在内存中计算、DAG 调度、迭代性能等方面具有显著优势。

1.1. 核心组件

  1. Driver Program

    • 负责整个 Spark 应用程序的生命周期:从创建 SparkContext 开始,到提交作业、监控、收集结果并最终退出。
    • 将用户的算子调用(如 mapfilterreduceByKey)封装成依赖图(DAG),并交给 DAGScheduler 划分成 Stage。
    • 维护 SparkContext,向 Cluster Manager 请求资源,并调度 Task 到各 Executor。
  2. Cluster Manager(集群管理器)

    • 负责资源分配:Spark 支持多种 Cluster Manager,包括 Standalone、YARN、Mesos 和 Kubernetes。
    • Driver 向 Cluster Manager 请求 Executor 资源(CPU、内存),然后 Cluster Manager 启动对应数量的 Executor。
  3. Executor

    • 运行在集群节点上的进程,负责执行 Task、缓存 RDD 分区数据(支持内存与磁盘)、以及向 Driver 汇报 Task 状态和计算结果。
    • 典型配置:每个 Executor 多核、多内存,Executor 数量与每个 Executor 的核心/内存可在 spark-submit 时指定。
  4. TaskScheduler 与 DAGScheduler

    • DAGScheduler:将用户程序的 RDD 依赖转换为多个阶段(Stage),并构建 Stage 之间的依赖关系图。
    • TaskScheduler:在 Driver 端使用集群模式感知,负责将待执行的 Task 提交给具体的 Executor,并添加重试逻辑。
  5. Broadcast(广播变量)与 Accumulator(累加器)

    • 广播变量:用来高效分发大只读数据(如维度表、模型参数)到所有 Executor,避免重复传输。
    • 累加器:提供分布式计数/求和功能,Task 可以在各自节点累加到 Driver 上的累加器,用于统计与监控。

2. RDD 与 DAG 依赖关系

RDD 是 Spark 分布式计算的基本抽象,代表一个不可变的分布式数据集。RDD 以延迟评估(Lazy Evaluation)方式构建依赖图,只有当触发算子(Action,如 collectcountsaveAsTextFile)时,Spark 才会通过 DAG 调度计算。

2.1. RDD 的两类依赖

  • 宽依赖(Shuffle Dependency)

    • 例如 groupByKeyreduceByKeyjoin 等算子,会导致数据跨分区重组,需要进行 Shuffle 操作。
    • 宽依赖会将一个父 RDD 的多个分区映射到多个子 RDD 分区,Spark 通过 ShuffleBlockManager 将中间文件写入磁盘并分发。
  • 窄依赖(Narrow Dependency)

    • 例如 mapfilterflatMap 等算子,父 RDD 的每个分区只产生一个子 RDD 分区,数据在 Executor 内部完成转换,无需网络传输。
    • 窄依赖允许 Spark 在同一个 Task 中完成连续算子链式执行,提高了性能。

2.2. DAG 图解

假设有如下算子链:

val textRDD = sc.textFile("hdfs://input.txt")
val words = textRDD.flatMap(_.split("\\s+"))
val pairs = words.map(word => (word, 1))
val counts = pairs.reduceByKey(_ + _)
val result = counts.collect()
  • textFile:读取分布式文件,生成一个初始 RDD(分区数取决于 HDFS block 数或 user 指定)。
  • flatMapmap:都是窄依赖,链式执行于同一个 Stage。
  • reduceByKey:宽依赖,需要进行 Shuffle,将同一 Key 的数据发送到同一个分区,再合并统计。

生成的 DAG 如下所示(“→” 表示依赖关系):

Stage 1 (窄依赖: flatMap、map, 仅在 Executor 内部计算)
  textFile --> flatMap --> map
          \                   \
           \                   \      (输出中间 Pair RDD 分区,需要 Shuffle)
            \                   \   Shuffle
             \                   \
              -----------------> reduceByKey --> collect (Stage 2)
  • Stage 1:执行从 textFilemap 的所有窄依赖算子,在各 Executor 本地完成分区转换,不涉及 Shuffle。
  • Stage 2:执行 reduceByKey,先进行 Map 端分区写入中间 Shuffle 文件,再在 Reduce 端读取并合并;最后触发 collect 动作将结果返回 Driver。

3. Job 提交到任务执行流程

Spark 作业(Job)执行流程可以分为以下几个阶段:

  1. 将用户代码中所有算子封装成 RDD,建立依赖 DAG;
  2. 触发 Action 时,Driver 将 DAG 交给 DAGScheduler,并划分成多个 Stage;
  3. TaskScheduler 将 Stage 中的 Task 划分到不同 Executor;
  4. Executor 在资源分配(CPU、内存)中执行 Task,并将结果或中间数据写入 Shuffle 目录;
  5. Driver 收集各 Task 计算完成信号后聚合结果,Action 返回最终结果或写出文件。

下面通过细化每一步,逐步展示 Spark 最终在集群中运行的全过程。

3.1. Driver 启动与 SparkContext 初始化

当我们执行以下示例代码时,Spark 会启动 Driver 并初始化 SparkContext

import org.apache.spark.{SparkConf, SparkContext}

object WordCountApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("yarn")  // 或 spark://master:7077
      .set("spark.executor.memory", "2g")
      .set("spark.executor.cores", "2")

    val sc = new SparkContext(conf)
    // RDD 转换与行动算子...
    sc.stop()
  }
}
  • SparkConf 中配置应用名称、Cluster Manager 地址(如 YARN、Standalone、Mesos)及 Executor 资源要求。
  • SparkContext:Driver 端的核心入口,负责与 Cluster Manager 交互,申请 Executor 资源,并维护元数据(如 RDD 元信息、广播变量、累加器状态)。

当出现 new SparkContext(conf) 时,Driver 会:

  1. 连接到 Cluster Manager,注册 Application。
  2. 根据配置的资源需求(Executor 内存与核心数),向 Cluster Manager 请求相应数量的 Executor。
  3. Cluster Manager 接收到请求后,在各 Worker 节点上启动 Executor 进程(Java 进程),并在 Executor 中初始化 ExecutorBackend,向 Driver 注册自己。
  4. Driver 收到 Executor 注册后,将其纳入可用执行池,等待 Task 调度。

图示:Driver 请求 Executor 资源

┌───────────────┐   1. Register app & request executors   ┌────────────────┐
│ Spark Driver  │────────────────────────────────────────▶│ Cluster Manager│
│ (SparkContext)│                                          └────────────────┘
└───────┬───────┘                                            ▲          ▲
        │ 3. Launch executors on workers                    │          │
        ▼                                                   │          │
┌──────────────────┐                                        │          │
│   Worker Nodes   │◀───── 2. Assign resources ─────────────┘          │
│  ┌────────────┐  │                                                   │
│  │ Executor   │  │                      ┌──────────────┐             │
│  │  (Backend)   │  │◀────── 4. Register ─│ Spark Driver│             │
│  └────────────┘  │                      └──────────────┘             │
│  ┌────────────┐  │                                                   │
│  │ Executor   │  │                                                   │
│  │  (Backend)   │  │                                                   │
│  └────────────┘  │                                                   │
└──────────────────┘                                                   │
                                                                       │
                                        Executor 心跳 & Task 调度状态更新 ─┘

3.2. 作业(Job)划分与阶段(Stage)生成

当我们在 Driver 端调用行动算子时,例如:

val textRDD = sc.textFile("hdfs://input.txt")
val words = textRDD.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val counts = pairs.reduceByKey(_ + _)
val result = counts.collect()
  • Step 1:Driver 将 RDD 操作转化为一个逻辑 DAG,直到执行 collect() 触发实际计算。
  • Step 2:Driver 中的 DAGScheduler 接收这个 DAG,将其按照 Shuffle 边界划分为Stage 0Stage 1

    • Stage 0:包含 textFile 来源、flatMapmap 等窄依赖算子,必须先执行并写入中间 Shuffle 数据;
    • Stage 1:包含 reduceByKey 的计算,需要先读取 Stage 0 产生的 Shuffle 文件进行分区聚合,然后执行 collect
  • Stage 划分细节

    • 从图中找到所有窄依赖链起始点,合并为一个 Stage;
    • 遇到第一个宽依赖算子(如 reduceByKey)时,将其前面算子属于一个 Stage,将宽依赖本身作为下一个 Stage 的一部分。

Stage 生成示意

DAG:
 textFile --> flatMap --> map --> reduceByKey --> collect

划分为两个 Stage:
  Stage 0: textFile -> flatMap -> map     (生成中间 Shuffle 文件)
  Stage 1: reduceByKey -> collect        (读取 Shuffle 数据并执行聚合)
  • DAGScheduler 会生成一个对应的 Stage 对象集合,每个 Stage 包含多个 Task,每个 Task 对应一个 RDD 分区。
  • Task 的数量等于对应 RDD 的分区数。假设 textFile 被分为 4 个分区,则 Stage 0 有 4 个 Task;同理,Stage 1 也会有 4 个 Task(因为 Shuffle 后输出分区数默认为 4,或可以用户自定义)。

3.3. Task 集合与 TaskScheduler 调度

DAGScheduler 生成 Stage 列表后,会按拓扑顺序依次提交每个 Stage 给 TaskScheduler。示例流程:

  1. Stage 0

    • DAGScheduler 将 Stage 0 中的每个分区构造成一个 Task,将包含该分区对应的 RDD 计算逻辑(flatMap、map)序列化。
    • 调用 TaskScheduler.submitTasks(stage0Tasks)TaskScheduler 将这些 Task 分配给空闲的 Executor。
  2. Task 发送

    • Driver 将每个 Task 通过 RPC(Netty 或 Akka)发送到对应 Executor,Executor 在本地反序列化并执行 Task 逻辑:读取分区所在数据块(如 HDFS Block)、执行 flatMap、map,将结果写入 Shuffle 目录(本地磁盘)。
    • 执行完毕后,Executor 将 Task 状态(成功或失败)以及 Task 统计信息上传给 Driver。
  3. Stage 1

    • 当所有 Stage 0 的 Task 都完成后,Driver 收到全体完成信号,DAGScheduler 标记 Stage 0 完成,开始提交 Stage 1。
    • Stage 1 的 Task 会在 Shuffle Reduce 阶段从对应 Stage 0 Executor 的 Shuffle 目录拉取中间文件,并执行 reduceByKey 逻辑。
    • 完成后再向 Driver 返回执行结果(如最终的 Key-Count Map)。

Task 调度示意

DAGScheduler:
  for each stage in topologicalOrder:
    generate listOfTasks for stage
    TaskScheduler.submitTasks(listOfTasks)

TaskScheduler 在 Driver 端:
  - 查看 Executor 空闲情况
  - 将 Task 发给合适的 Executor(可考虑数据本地化)
  - 记录 Task 状态(pending, running, success, failed)
  - 失败重试:若 Task 失败,重新调度到其他 Executor(最多重试 4 次)

Executor 端:
  on receive Task:
    - 反序列化 Task
    - 执行 Task.run(): 包括 map 或 reduce 逻辑
    - 将计算结果或中间文件写入本地磁盘
    - 上报 TaskStatus back to Driver
  • 数据本地化优化TaskScheduler 会尽量将 Task 调度到保存对应数据分区的 Executor(如 HDFS Block 本地副本所在节点),减少网络传输开销。
  • 失败重试:若 Task 在 Executor 上因 JVM OOM、磁盘故障、网络中断等原因失败,TaskScheduler 会自动重试到其他可用 Executor(默认最多 4 次)。

4. Executor 与 Task 执行

Executor 是 Spark 集群中真正执行计算的工作单元,一个 Application 会对应多个 Executor 进程。Executor 接收 Task 后,要完成以下关键步骤。

4.1. Executor 启动机制

  • Standalone 模式:Driver 通过 spark://master:7077 向 Standalone Cluster Manager 注册,Cluster Manager 在各 Worker 节点上启动相应数量的 Executor 进程。
  • YARN 模式:Driver(ApplicationMaster)向 YARN ResourceManager 申请 Container,在 Container 中启动 Executor 进程;
  • Mesos / Kubernetes:同理,在 Mesos Agent 或 Kubernetes Pod 中启动 Executor 容器。

每个 Executor 启动时,会注册到 Driver 上,Driver 将维护一个 ExecutorRegistry,用于跟踪所有可用 Executor 及其资源信息(剩余 CPU、内存使用情况等)。

4.2. Task 计算流程与代码序列化

当 Executor 收到 Task 时,会:

  1. 反序列化 Task 信息

    • Task 包含逻辑序列化后的 RDD 衍生链(如 flatMapmap 函数),以及本地分区索引;
    • Spark 使用 Java 序列化或 Kryo 序列化,将 Task 逻辑打包后发往 Executor。
  2. 执行 Task.run()

    • 根据 RDD 类型(例如 MapPartitionsRDDShuffleMapRDDAggregateRDD),依次调用其对应的算子函数;
    • 对于窄依赖任务,只需在本地内存/磁盘中读取父 RDD 分区数据并执行转换;
    • 对于宽依赖任务(ShuffleMapTask 或 ResultTask):

      • ShuffleMapTask:读取父 RDD 分区数据,执行 Map 端逻辑,将结果写入本地 Shuffle 存储(shuffle_0_0_0.mapshuffle_0_0_1.map 等文件);
      • ResultTask:从对应所有 Map 端 Executor 上拉取中间文件,合并并执行 Reduce 逻辑。
  3. 更新累加器与广播变量

    • 如果 Task 中使用了累加器,会将本地累加结果发送给 Driver,Driver 汇总到全局累加器;
    • 通过广播变量访问大只读数据时,Executor 首先检查本地是否已有广播副本(保存在 Block Manager 缓存),若没有则从 Driver 或 Distributed Cache 中拉取。
  4. 写入任务输出

    • 对于 saveAsTextFilesaveAsParquet 等文件输出算子,ResultTask 会将最终结果写入分布式存储(如 HDFS);
    • Task 完成后,Executor 将 TaskStatus 上报给 Driver,并释放相应资源(线程、序列化缓存等)。

4.3. 累加器(Accumulator)与广播变量(Broadcast)

  • Accumulator(累加器)

    • 用途:在分布式任务中做全局计数、求和或统计,用于调试与监控;
    • 实现:Driver 端 LongAccumulator 或自定义累加器,Executor 端获得累加器的临时副本,Task 执行期间对副本进行本地累加,执行完毕后将差值发送到 Driver,Driver 更新全局累加器值;
    • 特性:累加器仅在 Action 执行时才更新,且需谨慎在多个 Action 中使用,避免幂等性问题。
  • Broadcast(广播变量)

    • 用途:在多个 Task 中共享只读数据(如大哈希表、模型参数、配置文件),避免多次传输;
    • 实现:Driver 调用 sc.broadcast(value),Spark 将数据写入分布式文件系统(根据配置),并在 Executor 端缓存本地副本;
    • 特性:广播变量只读且可重复使用,适合大规模 Join、机器学习模型参数分发等场景;

5. Shuffle 过程与优化

Shuffle 是 Spark 中最耗时也最关键的环节,它决定了宽依赖算子(如 reduceByKeygroupByjoin 等)的性能。Shuffle 涉及数据重新分区与跨节点传输,需要在速度与稳定性之间做平衡。

5.1. Shuffle 原理与中间文件存储

  • ShuffleMapTask:在 Map 阶段负责:

    1. 读取父 RDD 分区数据;
    2. 对每条记录计算需要发往哪个 Reduce 分区(partitioner.getPartition(key));
    3. 将结果按照 Partition 分类写入本地临时文件。
  • 中间文件格式:Spark 默认使用Sort-Based Shuffle(2.0+ 版本)或旧版的 Hash-Based Shuffle:

    • Sort-Based Shuffle:对每个 MapTask,将数据先排序(按 Partition、Key 排序),再生成一组文件,并写入索引文件(.index)。
    • Hash-Based Shuffle:对每条记录直接将 Value 写入对应 Partition 的文件,但缺少排序。
  • Shuffle 文件路径:通常位于 Executor 本地磁盘的工作目录下,如:

    /tmp/spark-shuffle/user-123/shuffle_0_0_0
    /tmp/spark-shuffle/user-123/shuffle_0_0_0.index

    其中 shuffle_0 表示第 0 号 Stage,_0 表示 MapTask 的 Partition,后一个 _0 表示 Reduce 分区号。

5.2. Map 端与 Reduce 端交互

  • Map 端写盘:在 ShuffleMapTask 完成后,Executor 在本地生成一或多个(取决于 reducePartitions 数量)中间文件,并将这些文件的元信息(Map Task ID、Reduce Partition ID、逻辑偏移量)注册到 Driver 或 Shuffle 服务。
  • Reduce 端拉取:当 ReduceTask 开始时,Executor 会向所有包含 Shuffle 文件的 MapTask Executor 发出 RPC 请求,批量拉取对应 Reduce Partition 的中间数据文件。
  • 合并排序:拉取回来的各个 Shuffle 文件段,将按 Partition 合并并排序成最终数据供 Reduce 逻辑执行。

Shuffle 过程示意

MapTask (Stage 0) on Executor A:
  ┌────────────────────────────────────────────────┐
  │ 读取分区 0 数据                              │
  │ flatMap -> map -> pairRDD                    │
  │ 对每条 pairRDD 按 key.hashCode % numReducers  │
  │ 将 KV 写入本地文件:shuffle_0_0_0, shuffle_0_0_1 │
  │                    shuffle_0_0_2 ...           │
  └────────────────────────────────────────────────┘

ReduceTask (Stage 1) on Executor B:
  ┌────────────────────────────────────────────────┐
  │ 从所有 Map 端 Executor 拉取 Partition 0 文件   │
  │ shuffle_0_0_0 from Executor A                 │
  │ shuffle_0_1_0 from Executor C                 │
  │ ...                                           │
  │ 合并排序后执行 reduceByKey 逻辑                │
  │ 输出最终结果                                    │
  └────────────────────────────────────────────────┘

5.3. Sort-Based Shuffle 与 Hash-Based Shuffle

  • Hash-Based Shuffle(旧版)

    • Map 阶段不排序,直接将 KV 输出到不同 Partition 的文件,写入速度快;
    • Reduce 阶段需要在内存中合并所有拉回的中间文件,并在内存中做排序,导致内存开销高、易 OOM。
    • 已在 Spark 2.x 逐步淘汰,仅在某些极端场景可回退使用。
  • Sort-Based Shuffle(默认)

    • Map 阶段对输出数据先做外部排序(对内存不足时会借助本地磁盘做 Spill),然后生成有序文件;
    • Reduce 阶段仅依赖合并有序文件(多路归并),无需额外内存排序,I/O 模型更稳定。
    • 缺点:Map 端排序开销;优点:Reduce 端压力更小,性能更可预测。

可以通过以下配置查看或切换 Shuffle 类型:

# 查看当前 Shuffle 实现
spark.shuffle.manager=sort  # 默认 sort-based
# 或将其设置为 hash 以启用 Hash-Based Shuffle(不推荐生产)
spark.shuffle.manager=hash

5.4. Shuffle 性能优化建议

  1. 调整并行度(numShufflePartitions)

    • 默认为 200,可根据数据量与集群规模调整为更高或更低;
    • 过少会造成单个 Reduce 任务数据量过大,过多会导致 Task 过多带来调度开销。
    // 在应用里设置
    spark.conf.set("spark.sql.shuffle.partitions", "500")
  2. 启用加密 Shuffle(如环境安全需求)

    • 可设置 spark.shuffle.encrypt=true,但会带来 CPU 与 I/O 开销。
  3. 开启远程 Shuffle 服务(External Shuffle Service)

    • 在使用动态资源分配(Dynamic Allocation)时,避免 Executor 随意关闭导致 Shuffle 文件丢失;
    • External Shuffle Service 将 Shuffle 文件保存在独立进程中,Executor 下线后 Shuffle 文件仍然可用。
    spark.shuffle.service.enabled=true
  4. 减少全局排序

    • GroupByKey、SortBy 会产生全局排序,对大数据量不友好;
    • 优先使用 reduceByKeyaggregateByKeycombineByKey,减少网络传输与排序开销。
  5. 利用 Kryo 序列化

    • Spark 默认使用 Java 序列化,较慢且体积大;
    • 可在 SparkConf 中配置 Kryo 序列化并注册自定义类,提高网络传输速度。
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[YourCustomClass]))

6. 容错机制与数据重算(Lineage)

Spark 的容错设计基于 RDD 的血缘依赖(Lineage),不依赖数据副本,而是通过重算从上游数据重新恢复丢失的分区数据。

6.1. DAG 的弹性分布式容错思路

  • Spark 不会将所有中间数据持久化到 HDFS,而只通过 RDD 的血缘依赖信息记录如何从原始数据集或先前的中间结果生成当前 RDD。
  • 当某个 Task 失败或某个 Executor 故障时,Driver 会根据 DAG 重新生成需要的 RDD 分区,并在其他健康 Executor 上重跑对应 Task。
       original.txt (HDFS)  
            │  
            ▼  
        RDD1: textFile            <-- 依赖原始数据  
            │  
     mapPartitions → RDD2         <-- 窄依赖  
            │  
  reduceByKey (Shuffle) → RDD3    <-- 宽依赖  
            │  
            ▼  
        Action: collect  
  • 若执行 Stage 2(reduceByKey)时,有一个 ReduceTask 失败,Spark 会:

    1. Driver 检测 Task 失败并向 DAGScheduler 报告;
    2. DAGScheduler 标记对应 Stage 未完成,将出问题的 ReduceTask 重新放回 TaskScheduler 队列;
    3. 若 Map 端数据丢失,Spark 可使用 Lineage 重算相应 MapTask,再重新执行 Shuffle。

6.2. Task 失败重试与 Executor 失效恢复

  • Task 重试机制

    • Task 在某个 Executor 上失败后,将从 TaskScheduler 队列中重新分配给其他 Executor(最多重试 spark.task.maxFailures 次,默认 4 次)。
    • 如果一次 Task 在同一 Executor 上失败多次(如 JVM 代码错误),将考虑不同 Executor 并上报给 Driver;
  • Executor 失效

    • 当某个 Executor 进程挂掉(如 JVM OOM、节点故障),驱动端会收到心跳超时信息;
    • TaskScheduler 会将该 Executor 上运行的所有未完成 Task 标记为失败,并重新调度到其他可用 Executor;
    • 如果可用 Executor 不足,则新分区数据无法并行计算,最终可能导致作业失败。

6.3. Checkpoint 与外部存储持久化

  • RDD Checkpoint:将 RDD 数据写入可靠的外部存储(如 HDFS),同时将血缘依赖裁剪,防止 DAG 过长引起重算链路复杂与性能下降。

    sc.setCheckpointDir("hdfs://checkpoint-dir")
    rdd.checkpoint()
  • DataFrame/Structured Streaming Checkpoint(应用于 Spark Streaming):在流式计算中,还需做状态持久化与元数据保存,便于从失败中恢复。

7. 代码示例:WordCount 与 Join

下面通过几个示例演示常见算子的使用,并说明其背后的分布式执行原理。

7.1. 基本 WordCount 示例(Scala)

import org.apache.spark.{SparkConf, SparkContext}

object WordCountApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("yarn")   // 或 spark://master:7077
    val sc = new SparkContext(conf)

    // 1. 从 HDFS 读取文本文件,分区数由 HDFS block 决定
    val textRDD = sc.textFile("hdfs://namenode:9000/input.txt")

    // 2. 使用 flatMap 将每行拆分为单词
    val words = textRDD.flatMap(line => line.split("\\s+"))

    // 3. 将单词映射为 (word, 1) 键值对
    val pairs = words.map(word => (word, 1))

    // 4. 使用 reduceByKey 执行 Shuffle 并统计单词出现次数
    val counts = pairs.reduceByKey(_ + _)

    // 5. 将结果保存到 HDFS
    counts.saveAsTextFile("hdfs://namenode:9000/output")

    sc.stop()
  }
}
  • 执行过程

    1. Driver 构建 RDD DAG:textFileflatMapmapreduceByKeysaveAsTextFile
    2. 划分 Stage:

      • Stage 0:textFileflatMapmap(窄依赖);
      • Stage 1:reduceByKey(宽依赖)→saveAsTextFile
    3. 调度 Stage 0 Task,Executor 读取 HDFS Block、执行窄依赖算子,并将 (word,1) 写入 Shuffle 文件;
    4. 调度 Stage 1 Task,Executor 拉取 Shuffle 数据并执行 Key 聚合;
    5. 将最终结果写入 HDFS。

7.2. 带 GroupByKey 的示例(演示 Shuffle)

val conf = new SparkConf().setAppName("GroupByKeyExample").setMaster("local[*]")
val sc = new SparkContext(conf)

val data = Seq(("A", 1), ("B", 2), ("A", 3), ("B", 4), ("C", 5))
val pairRDD = sc.parallelize(data, 2)  // 两个分区

// groupByKey 会产生 Shuffle,将相同 key 的值收集到同一个分区
val grouped = pairRDD.groupByKey()

grouped.foreach { case (key, iter) =>
  println(s"$key -> ${iter.mkString("[", ",", "]")}")
}

sc.stop()
  • groupByKey 会触发一个新的 Stage,读写 Shuffle:Map 端只做分区和写文件,Reduce 端再将同一 Key 的所有值聚合成一个可迭代集合。
  • 注意事项groupByKey 会将所有相同 Key 的值保存在一个 Iterator 中,若对应 Key 的数据量非常大,容易造成 OOM。通常推荐使用 reduceByKeyaggregateByKey

7.3. RDD Cache 与广播变量在 Join 中的示例

当需要做 RDD 与 RDD 之间的 Join 时,如果一个 RDD 较小(如 lookup 表),可以使用广播变量优化:

val conf = new SparkConf().setAppName("BroadcastJoinExample").setMaster("local[*]")
val sc = new SparkContext(conf)

// 小表,小于 100 MB
val smallTable = sc.textFile("hdfs://namenode:9000/smallTable.txt")
  .map(line => {
    val parts = line.split(",")
    (parts(0), parts(1))
  }).collectAsMap()  // 在 Driver 端收集为 Map

// 将小表广播到所有 Executor
val broadcastSmallTable = sc.broadcast(smallTable)

val largeRDD = sc.textFile("hdfs://namenode:9000/largeData.txt")
  .map(line => {
    val parts = line.split(",")
    val key = parts(0)
    val value = parts(1)
    (key, value)
  })

// 使用广播变量进行 Join
val joined = largeRDD.mapPartitions(iter => {
  val smallMap = broadcastSmallTable.value
  iter.flatMap { case (key, value) =>
    smallMap.get(key) match {
      case Some(smallValue) => Some((key, (smallValue, value)))
      case None => None
    }
  }
})

joined.saveAsTextFile("hdfs://namenode:9000/joinOutput")

sc.stop()
  • 优化说明

    • collectAsMap() 将小表拉到 Driver,然后通过 sc.broadcast() 在初始化时广播到各 Executor;
    • 在每个 Partition 的 Task 内部使用 broadcastSmallTable.value 直接从内存获取广播数据,避免了 Shuffle。

8. 图解:Spark 分布式运行架构

下面通过 ASCII 图展示 Spark 在分布式集群中的各组件交互流程。

8.1. Driver 与 Cluster Manager 通信流程

┌────────────────────────────────────────────────────────────────┐
│                          Spark Driver                         │
│  ┌────────────────┐    ┌───────────────────────────┐            │
│  │ SparkContext   │    │  DAGScheduler & TaskScheduler │            │
│  └───┬───────────┘    └─────────────┬─────────────┘            │
│      │                           │                          │
│      │ 1. 启动 Driver 时           │                          │
│      │ requestExecutorResources   │                          │
│      ▼                           │                          │
│ ┌───────────────────┐             │                          │
│ │ Cluster Manager   │◀────────────┘                          │
│ │  (YARN/Standalone)│                                        │
│ └───────────────────┘                                        │
│      │ 2. 分配资源 (Executors)                                      │
│      │                                                        │
│      ▼                                                        │
│ ┌─────────────────────────────────────────┐                     │
│ │  Executor 1      Executor 2     Executor N │                 │
│ │(Worker Node A)  (Worker B)     (Worker C) │                 │
│ └─────────────────────────────────────────┘                     │
└────────────────────────────────────────────────────────────────┘
  • Driver 与 Cluster Manager 建立连接并请求 Executor 资源;
  • Cluster Manager 在各 Worker 上启动 Executor 进程;

8.2. Task 调度与 Shuffle 流程示意图

  ┌───────────────────────────────────────────────────────────┐
  │                         Spark Driver                     │
  │  ┌────────────────────────────┐     ┌───────────────────┐ │
  │  │   DAGScheduler (Stage0)    │     │   DAGScheduler    │ │
  │  │  generate 4 Tasks for RDD1 │     │   (Stage1)        │ │
  │  └───────────────┬────────────┘     └─────────┬─────────┘ │
  │                  │                              │          │
  │                  │  submit Tasks                │          │
  │                  ▼                              ▼          │
  │            ┌────────────────────────────────────────────────┐│
  │            │                TaskScheduler                   ││
  │            │ assign Tasks to Executors based on data locality││
  │            └──────────────┬─────────────────────────────────┘│
  │                           │                                   │
  │        ┌──────────────────▼──────────────────┐                │
  │        │Executor 1     Executor 2      Executor 3│                │
  │        │(Worker A)     (Worker B)      (Worker C) │                │
  │        │  ┌───────────┐  ┌───────────┐  ┌───────────┐  │                │
  │        │  │ Task0     │  │ Task1     │  │ Task2     │  │                │
  │        │  └───┬───────┘  └───┬───────┘  └───┬───────┘  │                │
  │        │      │              │              │          │                │
  │        │  mapPartitions  mapPartitions  mapPartitions │                │
  │        │      │              │              │          │                │
  │        │  write Shuffle files              │          │                │
  │        │           shuffle_0_0_0, shuffle_0_0_1...   │                │
  │        │      │              │              │          │                │
  │        │     ...            ...            ...         │                │
  │        │      │              │              │          │                │
  │        │      ▼              ▼              ▼  (Phase Completed)         │
  │        │  Task0 Done     Task1 Done    Task2 Done                          │
  │        │                                                                       │
  │        │  Now Stage0 Done, start Stage1                                           │
  │        │       ┌───────────────────────────────────────────────────┐        │
  │        │       │                 TaskScheduler                     │        │
  │        │       │ assign ReduceTasks based on Shuffle files location │        │
  │        │       └─────────────┬───────────────────────────────────┘        │
  │        │                    │                                           │
  │        │      ┌─────────────▼────────────────────┐                      │
  │        │      │Executor 4   Executor 5     Executor 6│                      │
  │        │      │(Worker D)   (Worker B)     (Worker C)│                      │
  │        │      │  ┌───────────┐ ┌───────────┐  ┌───────────┐  │               │
  │        │      │  │Reduce0    │ │Reduce1    │  │Reduce2    │  │               │
  │        │      │  └─────┬─────┘ └─────┬─────┘  └─────┬─────┘  │               │
  │        │      │        │             │             │       │               │
  │        │      │  pull shuffle_0_*_0 pull shuffle_0_*_1 pull shuffle_0_*_2│               │
  │        │      │        │             │             │       │               │
  │        │      │  reduceByKey   reduceByKey    reduceByKey │               │
  │        │      │        │             │             │       │               │
  │        │      └────────▼────────────┴─────────────▼───────┘               │
  │        │              Task Completed → Write final output                  │
  │        └───────────────────────────────────────────────────────────────────┘
  │                                                                      │
  └──────────────────────────────────────────────────────────────────────┘
  • 阶段 0(Stage0):Executor A、B、C 分别执行 MapPartitions,将中间结果写到本地 Shuffle 文件;
  • 阶段 1(Stage1):Executor D、E、F 分别从 A、B、C 拉取对应 Shuffle 文件片段(如 shuffle\_0\_0\_0、shuffle\_0\_1\_0 等),并执行 reduceByKey 操作,最后输出结果。

9. 总结与最佳实践

本文从 Spark 的核心架构出发,详细剖析了 Spark 在分布式集群中的执行流程,包括 Driver 启动、Cluster Manager 资源分配、Executor 启动与 Task 调度、Shuffle 过程与容错设计。以下是部分最佳实践与注意事项:

  1. 合理设置并行度

    • spark.default.parallelism:控制 RDD 的默认分区数;
    • spark.sql.shuffle.partitions:影响 Spark SQL 中 Shuffle 阶段分区数;
    • 分区数过少会导致并行度不足,过多会带来 Task 调度与 Shuffle 文件过多。
  2. 数据本地化与数据倾斜

    • 尽量让 Task 调度到数据本地节点,以减少网络 I/O;
    • 对于具有高度倾斜的 Key(Hot Key),可使用 saltingCustomPartitioner 或者提前做样本统计进行优化。
  3. 减少 Shuffle 开销

    • 使用 reduceByKeyaggregateByKey 而非 groupByKey
    • 利用广播变量(Broadcast)替代小表 Join,避免 Shuffle;
    • 避免全局排序操作,如 sortBydistinct 等。
  4. 内存与序列化优化

    • 使用 Kryo 序列化:spark.serializer=org.apache.spark.serializer.KryoSerializer,并注册自定义类;
    • 合理设置 executor.memoryexecutor.coresspark.memory.fraction,防止 OOM;
    • 避免大量短生命周期的对象,减少 GC 开销。
  5. 容错与 Checkpoint

    • 对长链式依赖的 RDD,定期做 Checkpoint,缩减 DAG 长度,降低失败重算开销;
    • 在 Spark Streaming 中,配置 Checkpoint 目录,保证流式应用可从失败中恢复。
  6. 监控与调优

    • 使用 Spark UI(4040 端口)查看 DAG、Stage、Task 详情;
    • 监控 Executor 日志中的 Shuffle 文件大小、Task 执行时间与 GC 时间;
    • 定期分析 Shuffle 读写 I/O、Task 失败率,持续优化上下游依赖。

通过深入理解 Spark 的分布式运行原理,你将能够更有效地设计 Spark 应用程序、诊断性能瓶颈,并在大规模集群环境下实现高性能与高可用的数据处理。希望本文的详解与代码示例能够帮助你轻松上手 Spark 分布式编程与调优,实现真正的“数据即服务”与“算法即服务”。