Spark分布式运行原理深度解析
在大数据时代,Apache Spark 以其高速计算、易用编程模型和丰富生态圈成为主流分布式计算框架之一。要深入理解 Spark 的强大能力,必须从其分布式运行机制出发,掌握 Driver、Executor、Cluster Manager、DAG 调度、Shuffle 机制、容错设计等核心原理。本文将从 Spark 核心架构开始,结合代码示例与 ASCII 图解,逐步剖析 Spark 在集群环境中的任务提交、调度、执行到结果返回的全过程,帮助你快速学会并深入掌握 Spark 分布式运行原理。
目录
- Spark 概述与核心组件
- RDD 与 DAG 依赖关系
- Job 提交到任务执行流程
3.1. Driver 启动与 SparkContext 初始化
3.2. 作业(Job)划分与阶段(Stage)生成
3.3. Task 集合与 TaskScheduler 调度 - Executor 与 Task 执行
4.1. Executor 启动机制
4.2. Task 计算流程与代码序列化
4.3. 累加器(Accumulator)与广播变量(Broadcast) - Shuffle 过程与优化
5.1. Shuffle 原理与中间文件存储
5.2. Map 端与 Reduce 端交互
5.3. Sort-Based Shuffle 与 Hash-Based Shuffle
5.4. Shuffle 性能优化建议 - 容错机制与数据重算(Lineage)
6.1. DAG 的弹性分布式容错思路
6.2. Task 失败重试与 Executor 失效恢复
6.3. Checkpoint 与外部存储持久化 - 代码示例:WordCount 与 Join
7.1. 基本 WordCount 示例(Scala)
7.2. 带 GroupByKey 的示例(演示 Shuffle)
7.3. RDD Cache 与广播变量在 Join 中的示例 - 图解:Spark 分布式运行架构
8.1. Driver 与 Cluster Manager 通信流程
8.2. Task 调度与 Shuffle 流程示意图 - 总结与最佳实践
1. Spark 概述与核心组件
Spark 是一个通用的分布式数据处理引擎,核心设计围绕弹性分布式数据集(Resilient Distributed Dataset,RDD)。相比传统 MapReduce,Spark 在内存中计算、DAG 调度、迭代性能等方面具有显著优势。
1.1. 核心组件
Driver Program
- 负责整个 Spark 应用程序的生命周期:从创建
SparkContext
开始,到提交作业、监控、收集结果并最终退出。 - 将用户的算子调用(如
map
、filter
、reduceByKey
)封装成依赖图(DAG),并交给DAGScheduler
划分成 Stage。 - 维护
SparkContext
,向 Cluster Manager 请求资源,并调度 Task 到各 Executor。
- 负责整个 Spark 应用程序的生命周期:从创建
Cluster Manager(集群管理器)
- 负责资源分配:Spark 支持多种 Cluster Manager,包括 Standalone、YARN、Mesos 和 Kubernetes。
- Driver 向 Cluster Manager 请求
Executor
资源(CPU、内存),然后 Cluster Manager 启动对应数量的 Executor。
Executor
- 运行在集群节点上的进程,负责执行 Task、缓存 RDD 分区数据(支持内存与磁盘)、以及向 Driver 汇报 Task 状态和计算结果。
- 典型配置:每个 Executor 多核、多内存,Executor 数量与每个 Executor 的核心/内存可在
spark-submit
时指定。
TaskScheduler 与 DAGScheduler
- DAGScheduler:将用户程序的 RDD 依赖转换为多个阶段(Stage),并构建 Stage 之间的依赖关系图。
- TaskScheduler:在 Driver 端使用集群模式感知,负责将待执行的 Task 提交给具体的 Executor,并添加重试逻辑。
Broadcast(广播变量)与 Accumulator(累加器)
- 广播变量:用来高效分发大只读数据(如维度表、模型参数)到所有 Executor,避免重复传输。
- 累加器:提供分布式计数/求和功能,Task 可以在各自节点累加到 Driver 上的累加器,用于统计与监控。
2. RDD 与 DAG 依赖关系
RDD 是 Spark 分布式计算的基本抽象,代表一个不可变的分布式数据集。RDD 以延迟评估(Lazy Evaluation)方式构建依赖图,只有当触发算子(Action,如 collect
、count
、saveAsTextFile
)时,Spark 才会通过 DAG 调度计算。
2.1. RDD 的两类依赖
宽依赖(Shuffle Dependency)
- 例如
groupByKey
、reduceByKey
、join
等算子,会导致数据跨分区重组,需要进行 Shuffle 操作。 - 宽依赖会将一个父 RDD 的多个分区映射到多个子 RDD 分区,Spark 通过 ShuffleBlockManager 将中间文件写入磁盘并分发。
- 例如
窄依赖(Narrow Dependency)
- 例如
map
、filter
、flatMap
等算子,父 RDD 的每个分区只产生一个子 RDD 分区,数据在 Executor 内部完成转换,无需网络传输。 - 窄依赖允许 Spark 在同一个 Task 中完成连续算子链式执行,提高了性能。
- 例如
2.2. DAG 图解
假设有如下算子链:
val textRDD = sc.textFile("hdfs://input.txt")
val words = textRDD.flatMap(_.split("\\s+"))
val pairs = words.map(word => (word, 1))
val counts = pairs.reduceByKey(_ + _)
val result = counts.collect()
textFile
:读取分布式文件,生成一个初始 RDD(分区数取决于 HDFS block 数或 user 指定)。flatMap
与map
:都是窄依赖,链式执行于同一个 Stage。reduceByKey
:宽依赖,需要进行 Shuffle,将同一 Key 的数据发送到同一个分区,再合并统计。
生成的 DAG 如下所示(“→” 表示依赖关系):
Stage 1 (窄依赖: flatMap、map, 仅在 Executor 内部计算)
textFile --> flatMap --> map
\ \
\ \ (输出中间 Pair RDD 分区,需要 Shuffle)
\ \ Shuffle
\ \
-----------------> reduceByKey --> collect (Stage 2)
- Stage 1:执行从
textFile
到map
的所有窄依赖算子,在各 Executor 本地完成分区转换,不涉及 Shuffle。 - Stage 2:执行
reduceByKey
,先进行 Map 端分区写入中间 Shuffle 文件,再在 Reduce 端读取并合并;最后触发collect
动作将结果返回 Driver。
3. Job 提交到任务执行流程
Spark 作业(Job)执行流程可以分为以下几个阶段:
- 将用户代码中所有算子封装成 RDD,建立依赖 DAG;
- 触发 Action 时,Driver 将 DAG 交给
DAGScheduler
,并划分成多个 Stage; TaskScheduler
将 Stage 中的 Task 划分到不同 Executor;- Executor 在资源分配(CPU、内存)中执行 Task,并将结果或中间数据写入 Shuffle 目录;
- Driver 收集各 Task 计算完成信号后聚合结果,Action 返回最终结果或写出文件。
下面通过细化每一步,逐步展示 Spark 最终在集群中运行的全过程。
3.1. Driver 启动与 SparkContext 初始化
当我们执行以下示例代码时,Spark 会启动 Driver 并初始化 SparkContext
:
import org.apache.spark.{SparkConf, SparkContext}
object WordCountApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("WordCount")
.setMaster("yarn") // 或 spark://master:7077
.set("spark.executor.memory", "2g")
.set("spark.executor.cores", "2")
val sc = new SparkContext(conf)
// RDD 转换与行动算子...
sc.stop()
}
}
- SparkConf 中配置应用名称、Cluster Manager 地址(如 YARN、Standalone、Mesos)及 Executor 资源要求。
- SparkContext:Driver 端的核心入口,负责与 Cluster Manager 交互,申请 Executor 资源,并维护元数据(如 RDD 元信息、广播变量、累加器状态)。
当出现 new SparkContext(conf)
时,Driver 会:
- 连接到 Cluster Manager,注册 Application。
- 根据配置的资源需求(Executor 内存与核心数),向 Cluster Manager 请求相应数量的 Executor。
- Cluster Manager 接收到请求后,在各 Worker 节点上启动 Executor 进程(Java 进程),并在 Executor 中初始化
ExecutorBackend
,向 Driver 注册自己。 - Driver 收到 Executor 注册后,将其纳入可用执行池,等待 Task 调度。
图示:Driver 请求 Executor 资源
┌───────────────┐ 1. Register app & request executors ┌────────────────┐
│ Spark Driver │────────────────────────────────────────▶│ Cluster Manager│
│ (SparkContext)│ └────────────────┘
└───────┬───────┘ ▲ ▲
│ 3. Launch executors on workers │ │
▼ │ │
┌──────────────────┐ │ │
│ Worker Nodes │◀───── 2. Assign resources ─────────────┘ │
│ ┌────────────┐ │ │
│ │ Executor │ │ ┌──────────────┐ │
│ │ (Backend) │ │◀────── 4. Register ─│ Spark Driver│ │
│ └────────────┘ │ └──────────────┘ │
│ ┌────────────┐ │ │
│ │ Executor │ │ │
│ │ (Backend) │ │ │
│ └────────────┘ │ │
└──────────────────┘ │
│
Executor 心跳 & Task 调度状态更新 ─┘
3.2. 作业(Job)划分与阶段(Stage)生成
当我们在 Driver 端调用行动算子时,例如:
val textRDD = sc.textFile("hdfs://input.txt")
val words = textRDD.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val counts = pairs.reduceByKey(_ + _)
val result = counts.collect()
- Step 1:Driver 将 RDD 操作转化为一个逻辑 DAG,直到执行
collect()
触发实际计算。 Step 2:Driver 中的
DAGScheduler
接收这个 DAG,将其按照 Shuffle 边界划分为Stage 0 和 Stage 1:- Stage 0:包含
textFile
来源、flatMap
、map
等窄依赖算子,必须先执行并写入中间 Shuffle 数据; - Stage 1:包含
reduceByKey
的计算,需要先读取 Stage 0 产生的 Shuffle 文件进行分区聚合,然后执行collect
。
- Stage 0:包含
Stage 划分细节:
- 从图中找到所有窄依赖链起始点,合并为一个 Stage;
- 遇到第一个宽依赖算子(如
reduceByKey
)时,将其前面算子属于一个 Stage,将宽依赖本身作为下一个 Stage 的一部分。
Stage 生成示意
DAG:
textFile --> flatMap --> map --> reduceByKey --> collect
划分为两个 Stage:
Stage 0: textFile -> flatMap -> map (生成中间 Shuffle 文件)
Stage 1: reduceByKey -> collect (读取 Shuffle 数据并执行聚合)
DAGScheduler
会生成一个对应的Stage
对象集合,每个 Stage 包含多个 Task,每个 Task 对应一个 RDD 分区。- Task 的数量等于对应 RDD 的分区数。假设
textFile
被分为 4 个分区,则 Stage 0 有 4 个 Task;同理,Stage 1 也会有 4 个 Task(因为 Shuffle 后输出分区数默认为 4,或可以用户自定义)。
3.3. Task 集合与 TaskScheduler 调度
当 DAGScheduler
生成 Stage 列表后,会按拓扑顺序依次提交每个 Stage 给 TaskScheduler
。示例流程:
Stage 0:
DAGScheduler
将 Stage 0 中的每个分区构造成一个 Task,将包含该分区对应的 RDD 计算逻辑(flatMap、map)序列化。- 调用
TaskScheduler.submitTasks(stage0Tasks)
,TaskScheduler
将这些 Task 分配给空闲的 Executor。
Task 发送:
- Driver 将每个 Task 通过 RPC(Netty 或 Akka)发送到对应 Executor,Executor 在本地反序列化并执行 Task 逻辑:读取分区所在数据块(如 HDFS Block)、执行 flatMap、map,将结果写入 Shuffle 目录(本地磁盘)。
- 执行完毕后,Executor 将 Task 状态(成功或失败)以及 Task 统计信息上传给 Driver。
Stage 1:
- 当所有 Stage 0 的 Task 都完成后,Driver 收到全体完成信号,
DAGScheduler
标记 Stage 0 完成,开始提交 Stage 1。 - Stage 1 的 Task 会在 Shuffle Reduce 阶段从对应 Stage 0 Executor 的 Shuffle 目录拉取中间文件,并执行 reduceByKey 逻辑。
- 完成后再向 Driver 返回执行结果(如最终的 Key-Count Map)。
- 当所有 Stage 0 的 Task 都完成后,Driver 收到全体完成信号,
Task 调度示意
DAGScheduler:
for each stage in topologicalOrder:
generate listOfTasks for stage
TaskScheduler.submitTasks(listOfTasks)
TaskScheduler 在 Driver 端:
- 查看 Executor 空闲情况
- 将 Task 发给合适的 Executor(可考虑数据本地化)
- 记录 Task 状态(pending, running, success, failed)
- 失败重试:若 Task 失败,重新调度到其他 Executor(最多重试 4 次)
Executor 端:
on receive Task:
- 反序列化 Task
- 执行 Task.run(): 包括 map 或 reduce 逻辑
- 将计算结果或中间文件写入本地磁盘
- 上报 TaskStatus back to Driver
- 数据本地化优化:
TaskScheduler
会尽量将 Task 调度到保存对应数据分区的 Executor(如 HDFS Block 本地副本所在节点),减少网络传输开销。 - 失败重试:若 Task 在 Executor 上因 JVM OOM、磁盘故障、网络中断等原因失败,
TaskScheduler
会自动重试到其他可用 Executor(默认最多 4 次)。
4. Executor 与 Task 执行
Executor 是 Spark 集群中真正执行计算的工作单元,一个 Application 会对应多个 Executor 进程。Executor 接收 Task 后,要完成以下关键步骤。
4.1. Executor 启动机制
- Standalone 模式:Driver 通过
spark://master:7077
向 Standalone Cluster Manager 注册,Cluster Manager 在各 Worker 节点上启动相应数量的 Executor 进程。 - YARN 模式:Driver(ApplicationMaster)向 YARN ResourceManager 申请 Container,在 Container 中启动 Executor 进程;
- Mesos / Kubernetes:同理,在 Mesos Agent 或 Kubernetes Pod 中启动 Executor 容器。
每个 Executor 启动时,会注册到 Driver 上,Driver 将维护一个 ExecutorRegistry,用于跟踪所有可用 Executor 及其资源信息(剩余 CPU、内存使用情况等)。
4.2. Task 计算流程与代码序列化
当 Executor 收到 Task 时,会:
反序列化 Task 信息
- Task 包含逻辑序列化后的 RDD 衍生链(如
flatMap
、map
函数),以及本地分区索引; - Spark 使用 Java 序列化或 Kryo 序列化,将 Task 逻辑打包后发往 Executor。
- Task 包含逻辑序列化后的 RDD 衍生链(如
执行 Task.run()
- 根据 RDD 类型(例如
MapPartitionsRDD
、ShuffleMapRDD
、AggregateRDD
),依次调用其对应的算子函数; - 对于窄依赖任务,只需在本地内存/磁盘中读取父 RDD 分区数据并执行转换;
对于宽依赖任务(ShuffleMapTask 或 ResultTask):
- ShuffleMapTask:读取父 RDD 分区数据,执行 Map 端逻辑,将结果写入本地 Shuffle 存储(
shuffle_0_0_0.map
、shuffle_0_0_1.map
等文件); - ResultTask:从对应所有 Map 端 Executor 上拉取中间文件,合并并执行 Reduce 逻辑。
- ShuffleMapTask:读取父 RDD 分区数据,执行 Map 端逻辑,将结果写入本地 Shuffle 存储(
- 根据 RDD 类型(例如
更新累加器与广播变量
- 如果 Task 中使用了累加器,会将本地累加结果发送给 Driver,Driver 汇总到全局累加器;
- 通过广播变量访问大只读数据时,Executor 首先检查本地是否已有广播副本(保存在 Block Manager 缓存),若没有则从 Driver 或 Distributed Cache 中拉取。
写入任务输出
- 对于
saveAsTextFile
、saveAsParquet
等文件输出算子,ResultTask 会将最终结果写入分布式存储(如 HDFS); - Task 完成后,Executor 将 TaskStatus 上报给 Driver,并释放相应资源(线程、序列化缓存等)。
- 对于
4.3. 累加器(Accumulator)与广播变量(Broadcast)
Accumulator(累加器)
- 用途:在分布式任务中做全局计数、求和或统计,用于调试与监控;
- 实现:Driver 端
LongAccumulator
或自定义累加器,Executor 端获得累加器的临时副本,Task 执行期间对副本进行本地累加,执行完毕后将差值发送到 Driver,Driver 更新全局累加器值; - 特性:累加器仅在 Action 执行时才更新,且需谨慎在多个 Action 中使用,避免幂等性问题。
Broadcast(广播变量)
- 用途:在多个 Task 中共享只读数据(如大哈希表、模型参数、配置文件),避免多次传输;
- 实现:Driver 调用
sc.broadcast(value)
,Spark 将数据写入分布式文件系统(根据配置),并在 Executor 端缓存本地副本; - 特性:广播变量只读且可重复使用,适合大规模 Join、机器学习模型参数分发等场景;
5. Shuffle 过程与优化
Shuffle 是 Spark 中最耗时也最关键的环节,它决定了宽依赖算子(如 reduceByKey
、groupBy
、join
等)的性能。Shuffle 涉及数据重新分区与跨节点传输,需要在速度与稳定性之间做平衡。
5.1. Shuffle 原理与中间文件存储
ShuffleMapTask:在 Map 阶段负责:
- 读取父 RDD 分区数据;
- 对每条记录计算需要发往哪个 Reduce 分区(
partitioner.getPartition(key)
); - 将结果按照 Partition 分类写入本地临时文件。
中间文件格式:Spark 默认使用Sort-Based Shuffle(2.0+ 版本)或旧版的 Hash-Based Shuffle:
- Sort-Based Shuffle:对每个 MapTask,将数据先排序(按 Partition、Key 排序),再生成一组文件,并写入索引文件(
.index
)。 - Hash-Based Shuffle:对每条记录直接将 Value 写入对应 Partition 的文件,但缺少排序。
- Sort-Based Shuffle:对每个 MapTask,将数据先排序(按 Partition、Key 排序),再生成一组文件,并写入索引文件(
Shuffle 文件路径:通常位于 Executor 本地磁盘的工作目录下,如:
/tmp/spark-shuffle/user-123/shuffle_0_0_0 /tmp/spark-shuffle/user-123/shuffle_0_0_0.index
其中
shuffle_0
表示第 0 号 Stage,_0
表示 MapTask 的 Partition,后一个_0
表示 Reduce 分区号。
5.2. Map 端与 Reduce 端交互
- Map 端写盘:在 ShuffleMapTask 完成后,Executor 在本地生成一或多个(取决于 reducePartitions 数量)中间文件,并将这些文件的元信息(Map Task ID、Reduce Partition ID、逻辑偏移量)注册到 Driver 或 Shuffle 服务。
- Reduce 端拉取:当 ReduceTask 开始时,Executor 会向所有包含 Shuffle 文件的 MapTask Executor 发出 RPC 请求,批量拉取对应 Reduce Partition 的中间数据文件。
- 合并排序:拉取回来的各个 Shuffle 文件段,将按 Partition 合并并排序成最终数据供 Reduce 逻辑执行。
Shuffle 过程示意
MapTask (Stage 0) on Executor A:
┌────────────────────────────────────────────────┐
│ 读取分区 0 数据 │
│ flatMap -> map -> pairRDD │
│ 对每条 pairRDD 按 key.hashCode % numReducers │
│ 将 KV 写入本地文件:shuffle_0_0_0, shuffle_0_0_1 │
│ shuffle_0_0_2 ... │
└────────────────────────────────────────────────┘
ReduceTask (Stage 1) on Executor B:
┌────────────────────────────────────────────────┐
│ 从所有 Map 端 Executor 拉取 Partition 0 文件 │
│ shuffle_0_0_0 from Executor A │
│ shuffle_0_1_0 from Executor C │
│ ... │
│ 合并排序后执行 reduceByKey 逻辑 │
│ 输出最终结果 │
└────────────────────────────────────────────────┘
5.3. Sort-Based Shuffle 与 Hash-Based Shuffle
Hash-Based Shuffle(旧版)
- Map 阶段不排序,直接将 KV 输出到不同 Partition 的文件,写入速度快;
- Reduce 阶段需要在内存中合并所有拉回的中间文件,并在内存中做排序,导致内存开销高、易 OOM。
- 已在 Spark 2.x 逐步淘汰,仅在某些极端场景可回退使用。
Sort-Based Shuffle(默认)
- Map 阶段对输出数据先做外部排序(对内存不足时会借助本地磁盘做 Spill),然后生成有序文件;
- Reduce 阶段仅依赖合并有序文件(多路归并),无需额外内存排序,I/O 模型更稳定。
- 缺点:Map 端排序开销;优点:Reduce 端压力更小,性能更可预测。
可以通过以下配置查看或切换 Shuffle 类型:
# 查看当前 Shuffle 实现
spark.shuffle.manager=sort # 默认 sort-based
# 或将其设置为 hash 以启用 Hash-Based Shuffle(不推荐生产)
spark.shuffle.manager=hash
5.4. Shuffle 性能优化建议
调整并行度(numShufflePartitions)
- 默认为 200,可根据数据量与集群规模调整为更高或更低;
- 过少会造成单个 Reduce 任务数据量过大,过多会导致 Task 过多带来调度开销。
// 在应用里设置 spark.conf.set("spark.sql.shuffle.partitions", "500")
启用加密 Shuffle(如环境安全需求)
- 可设置
spark.shuffle.encrypt=true
,但会带来 CPU 与 I/O 开销。
- 可设置
开启远程 Shuffle 服务(External Shuffle Service)
- 在使用动态资源分配(Dynamic Allocation)时,避免 Executor 随意关闭导致 Shuffle 文件丢失;
- External Shuffle Service 将 Shuffle 文件保存在独立进程中,Executor 下线后 Shuffle 文件仍然可用。
spark.shuffle.service.enabled=true
减少全局排序
- GroupByKey、SortBy 会产生全局排序,对大数据量不友好;
- 优先使用
reduceByKey
、aggregateByKey
或combineByKey
,减少网络传输与排序开销。
利用 Kryo 序列化
- Spark 默认使用 Java 序列化,较慢且体积大;
- 可在
SparkConf
中配置 Kryo 序列化并注册自定义类,提高网络传输速度。
val conf = new SparkConf() .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[YourCustomClass]))
6. 容错机制与数据重算(Lineage)
Spark 的容错设计基于 RDD 的血缘依赖(Lineage),不依赖数据副本,而是通过重算从上游数据重新恢复丢失的分区数据。
6.1. DAG 的弹性分布式容错思路
- Spark 不会将所有中间数据持久化到 HDFS,而只通过 RDD 的血缘依赖信息记录如何从原始数据集或先前的中间结果生成当前 RDD。
- 当某个 Task 失败或某个 Executor 故障时,Driver 会根据 DAG 重新生成需要的 RDD 分区,并在其他健康 Executor 上重跑对应 Task。
original.txt (HDFS)
│
▼
RDD1: textFile <-- 依赖原始数据
│
mapPartitions → RDD2 <-- 窄依赖
│
reduceByKey (Shuffle) → RDD3 <-- 宽依赖
│
▼
Action: collect
若执行 Stage 2(reduceByKey)时,有一个 ReduceTask 失败,Spark 会:
- Driver 检测 Task 失败并向 DAGScheduler 报告;
- DAGScheduler 标记对应 Stage 未完成,将出问题的 ReduceTask 重新放回 TaskScheduler 队列;
- 若 Map 端数据丢失,Spark 可使用 Lineage 重算相应 MapTask,再重新执行 Shuffle。
6.2. Task 失败重试与 Executor 失效恢复
Task 重试机制:
- Task 在某个 Executor 上失败后,将从
TaskScheduler
队列中重新分配给其他 Executor(最多重试spark.task.maxFailures
次,默认 4 次)。 - 如果一次 Task 在同一 Executor 上失败多次(如 JVM 代码错误),将考虑不同 Executor 并上报给 Driver;
- Task 在某个 Executor 上失败后,将从
Executor 失效:
- 当某个 Executor 进程挂掉(如 JVM OOM、节点故障),驱动端会收到心跳超时信息;
TaskScheduler
会将该 Executor 上运行的所有未完成 Task 标记为失败,并重新调度到其他可用 Executor;- 如果可用 Executor 不足,则新分区数据无法并行计算,最终可能导致作业失败。
6.3. Checkpoint 与外部存储持久化
RDD Checkpoint:将 RDD 数据写入可靠的外部存储(如 HDFS),同时将血缘依赖裁剪,防止 DAG 过长引起重算链路复杂与性能下降。
sc.setCheckpointDir("hdfs://checkpoint-dir") rdd.checkpoint()
- DataFrame/Structured Streaming Checkpoint(应用于 Spark Streaming):在流式计算中,还需做状态持久化与元数据保存,便于从失败中恢复。
7. 代码示例:WordCount 与 Join
下面通过几个示例演示常见算子的使用,并说明其背后的分布式执行原理。
7.1. 基本 WordCount 示例(Scala)
import org.apache.spark.{SparkConf, SparkContext}
object WordCountApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("WordCount")
.setMaster("yarn") // 或 spark://master:7077
val sc = new SparkContext(conf)
// 1. 从 HDFS 读取文本文件,分区数由 HDFS block 决定
val textRDD = sc.textFile("hdfs://namenode:9000/input.txt")
// 2. 使用 flatMap 将每行拆分为单词
val words = textRDD.flatMap(line => line.split("\\s+"))
// 3. 将单词映射为 (word, 1) 键值对
val pairs = words.map(word => (word, 1))
// 4. 使用 reduceByKey 执行 Shuffle 并统计单词出现次数
val counts = pairs.reduceByKey(_ + _)
// 5. 将结果保存到 HDFS
counts.saveAsTextFile("hdfs://namenode:9000/output")
sc.stop()
}
}
执行过程:
- Driver 构建 RDD DAG:
textFile
→flatMap
→map
→reduceByKey
→saveAsTextFile
; 划分 Stage:
- Stage 0:
textFile
→flatMap
→map
(窄依赖); - Stage 1:
reduceByKey
(宽依赖)→saveAsTextFile
;
- Stage 0:
- 调度 Stage 0 Task,Executor 读取 HDFS Block、执行窄依赖算子,并将 (word,1) 写入 Shuffle 文件;
- 调度 Stage 1 Task,Executor 拉取 Shuffle 数据并执行 Key 聚合;
- 将最终结果写入 HDFS。
- Driver 构建 RDD DAG:
7.2. 带 GroupByKey 的示例(演示 Shuffle)
val conf = new SparkConf().setAppName("GroupByKeyExample").setMaster("local[*]")
val sc = new SparkContext(conf)
val data = Seq(("A", 1), ("B", 2), ("A", 3), ("B", 4), ("C", 5))
val pairRDD = sc.parallelize(data, 2) // 两个分区
// groupByKey 会产生 Shuffle,将相同 key 的值收集到同一个分区
val grouped = pairRDD.groupByKey()
grouped.foreach { case (key, iter) =>
println(s"$key -> ${iter.mkString("[", ",", "]")}")
}
sc.stop()
groupByKey
会触发一个新的 Stage,读写 Shuffle:Map 端只做分区和写文件,Reduce 端再将同一 Key 的所有值聚合成一个可迭代集合。- 注意事项:
groupByKey
会将所有相同 Key 的值保存在一个 Iterator 中,若对应 Key 的数据量非常大,容易造成 OOM。通常推荐使用reduceByKey
或aggregateByKey
。
7.3. RDD Cache 与广播变量在 Join 中的示例
当需要做 RDD 与 RDD 之间的 Join 时,如果一个 RDD 较小(如 lookup 表),可以使用广播变量优化:
val conf = new SparkConf().setAppName("BroadcastJoinExample").setMaster("local[*]")
val sc = new SparkContext(conf)
// 小表,小于 100 MB
val smallTable = sc.textFile("hdfs://namenode:9000/smallTable.txt")
.map(line => {
val parts = line.split(",")
(parts(0), parts(1))
}).collectAsMap() // 在 Driver 端收集为 Map
// 将小表广播到所有 Executor
val broadcastSmallTable = sc.broadcast(smallTable)
val largeRDD = sc.textFile("hdfs://namenode:9000/largeData.txt")
.map(line => {
val parts = line.split(",")
val key = parts(0)
val value = parts(1)
(key, value)
})
// 使用广播变量进行 Join
val joined = largeRDD.mapPartitions(iter => {
val smallMap = broadcastSmallTable.value
iter.flatMap { case (key, value) =>
smallMap.get(key) match {
case Some(smallValue) => Some((key, (smallValue, value)))
case None => None
}
}
})
joined.saveAsTextFile("hdfs://namenode:9000/joinOutput")
sc.stop()
优化说明:
collectAsMap()
将小表拉到 Driver,然后通过sc.broadcast()
在初始化时广播到各 Executor;- 在每个 Partition 的 Task 内部使用
broadcastSmallTable.value
直接从内存获取广播数据,避免了 Shuffle。
8. 图解:Spark 分布式运行架构
下面通过 ASCII 图展示 Spark 在分布式集群中的各组件交互流程。
8.1. Driver 与 Cluster Manager 通信流程
┌────────────────────────────────────────────────────────────────┐
│ Spark Driver │
│ ┌────────────────┐ ┌───────────────────────────┐ │
│ │ SparkContext │ │ DAGScheduler & TaskScheduler │ │
│ └───┬───────────┘ └─────────────┬─────────────┘ │
│ │ │ │
│ │ 1. 启动 Driver 时 │ │
│ │ requestExecutorResources │ │
│ ▼ │ │
│ ┌───────────────────┐ │ │
│ │ Cluster Manager │◀────────────┘ │
│ │ (YARN/Standalone)│ │
│ └───────────────────┘ │
│ │ 2. 分配资源 (Executors) │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────┐ │
│ │ Executor 1 Executor 2 Executor N │ │
│ │(Worker Node A) (Worker B) (Worker C) │ │
│ └─────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────┘
- Driver 与 Cluster Manager 建立连接并请求 Executor 资源;
- Cluster Manager 在各 Worker 上启动 Executor 进程;
8.2. Task 调度与 Shuffle 流程示意图
┌───────────────────────────────────────────────────────────┐
│ Spark Driver │
│ ┌────────────────────────────┐ ┌───────────────────┐ │
│ │ DAGScheduler (Stage0) │ │ DAGScheduler │ │
│ │ generate 4 Tasks for RDD1 │ │ (Stage1) │ │
│ └───────────────┬────────────┘ └─────────┬─────────┘ │
│ │ │ │
│ │ submit Tasks │ │
│ ▼ ▼ │
│ ┌────────────────────────────────────────────────┐│
│ │ TaskScheduler ││
│ │ assign Tasks to Executors based on data locality││
│ └──────────────┬─────────────────────────────────┘│
│ │ │
│ ┌──────────────────▼──────────────────┐ │
│ │Executor 1 Executor 2 Executor 3│ │
│ │(Worker A) (Worker B) (Worker C) │ │
│ │ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │
│ │ │ Task0 │ │ Task1 │ │ Task2 │ │ │
│ │ └───┬───────┘ └───┬───────┘ └───┬───────┘ │ │
│ │ │ │ │ │ │
│ │ mapPartitions mapPartitions mapPartitions │ │
│ │ │ │ │ │ │
│ │ write Shuffle files │ │ │
│ │ shuffle_0_0_0, shuffle_0_0_1... │ │
│ │ │ │ │ │ │
│ │ ... ... ... │ │
│ │ │ │ │ │ │
│ │ ▼ ▼ ▼ (Phase Completed) │
│ │ Task0 Done Task1 Done Task2 Done │
│ │ │
│ │ Now Stage0 Done, start Stage1 │
│ │ ┌───────────────────────────────────────────────────┐ │
│ │ │ TaskScheduler │ │
│ │ │ assign ReduceTasks based on Shuffle files location │ │
│ │ └─────────────┬───────────────────────────────────┘ │
│ │ │ │
│ │ ┌─────────────▼────────────────────┐ │
│ │ │Executor 4 Executor 5 Executor 6│ │
│ │ │(Worker D) (Worker B) (Worker C)│ │
│ │ │ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │
│ │ │ │Reduce0 │ │Reduce1 │ │Reduce2 │ │ │
│ │ │ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ │ │
│ │ │ │ │ │ │ │
│ │ │ pull shuffle_0_*_0 pull shuffle_0_*_1 pull shuffle_0_*_2│ │
│ │ │ │ │ │ │ │
│ │ │ reduceByKey reduceByKey reduceByKey │ │
│ │ │ │ │ │ │ │
│ │ └────────▼────────────┴─────────────▼───────┘ │
│ │ Task Completed → Write final output │
│ └───────────────────────────────────────────────────────────────────┘
│ │
└──────────────────────────────────────────────────────────────────────┘
- 阶段 0(Stage0):Executor A、B、C 分别执行 MapPartitions,将中间结果写到本地 Shuffle 文件;
- 阶段 1(Stage1):Executor D、E、F 分别从 A、B、C 拉取对应 Shuffle 文件片段(如 shuffle\_0\_0\_0、shuffle\_0\_1\_0 等),并执行 reduceByKey 操作,最后输出结果。
9. 总结与最佳实践
本文从 Spark 的核心架构出发,详细剖析了 Spark 在分布式集群中的执行流程,包括 Driver 启动、Cluster Manager 资源分配、Executor 启动与 Task 调度、Shuffle 过程与容错设计。以下是部分最佳实践与注意事项:
合理设置并行度
spark.default.parallelism
:控制 RDD 的默认分区数;spark.sql.shuffle.partitions
:影响 Spark SQL 中 Shuffle 阶段分区数;- 分区数过少会导致并行度不足,过多会带来 Task 调度与 Shuffle 文件过多。
数据本地化与数据倾斜
- 尽量让 Task 调度到数据本地节点,以减少网络 I/O;
- 对于具有高度倾斜的 Key(Hot Key),可使用
salting
、CustomPartitioner
或者提前做样本统计进行优化。
减少 Shuffle 开销
- 使用
reduceByKey
、aggregateByKey
而非groupByKey
; - 利用广播变量(Broadcast)替代小表 Join,避免 Shuffle;
- 避免全局排序操作,如
sortBy
、distinct
等。
- 使用
内存与序列化优化
- 使用 Kryo 序列化:
spark.serializer=org.apache.spark.serializer.KryoSerializer
,并注册自定义类; - 合理设置
executor.memory
、executor.cores
、spark.memory.fraction
,防止 OOM; - 避免大量短生命周期的对象,减少 GC 开销。
- 使用 Kryo 序列化:
容错与 Checkpoint
- 对长链式依赖的 RDD,定期做 Checkpoint,缩减 DAG 长度,降低失败重算开销;
- 在 Spark Streaming 中,配置 Checkpoint 目录,保证流式应用可从失败中恢复。
监控与调优
- 使用 Spark UI(4040 端口)查看 DAG、Stage、Task 详情;
- 监控 Executor 日志中的 Shuffle 文件大小、Task 执行时间与 GC 时间;
- 定期分析 Shuffle 读写 I/O、Task 失败率,持续优化上下游依赖。
通过深入理解 Spark 的分布式运行原理,你将能够更有效地设计 Spark 应用程序、诊断性能瓶颈,并在大规模集群环境下实现高性能与高可用的数据处理。希望本文的详解与代码示例能够帮助你轻松上手 Spark 分布式编程与调优,实现真正的“数据即服务”与“算法即服务”。