ClickHouse分布式部署、表创建及数据迁移全攻略
ClickHouse分布式部署、表创建及数据迁移全攻略
在大数据时代,ClickHouse 以其高性能 OLAP 查询能力和良好的水平扩展性,逐渐成为海量时序与分析场景的首选数据库。要将 ClickHouse 用于生产环境,往往需要部署分布式集群,实现数据的分片与复制,并针对业务场景设计分布式表结构与数据迁移策略。本文将从分布式架构原理出发,结合代码示例与图解,全面介绍如何完成 ClickHouse 分布式集群的部署、表的创建(含复制表与分布式表)、以及数据迁移的多种手段,帮助你快速掌握 ClickHouse 在生产环境的使用要点。
目录
- ClickHouse 分布式架构概述
1.1. 单节点 vs 分布式
1.2. 分片(Shard)与副本(Replica)
1.3. ZooKeeper 在分布式中的作用 - 环境准备与组件安装
2.1. 系统与网络要求
2.2. 安装 ZooKeeper 集群
2.3. 安装 ClickHouse 节点 - 分布式集群部署示例
3.1. 集群拓扑设计与图解
3.2. ZooKeeper 配置
3.3. ClickHouseconfig.xml
与users.xml
配置
3.4. 启动 ClickHouse 服务与校验 - 分布式表引擎与表创建
4.1. MergeTree 与 ReplicatedMergeTree 引擎
4.2. Distributed 引擎原理与实现
4.3. 本地表与分布式表创建示例
4.4. 示例:查询分布式表的执行流程图解 - 数据写入、查询与负载均衡
5.1. 写入到 ReplicatedMergeTree 且分片自动路由
5.2. 分布式表查询流程详解
5.3. Insert、Select 示例 - 数据迁移与同步策略
6.1. 单机 ClickHouse 到分布式集群迁移
6.2. MySQL 到 ClickHouse 的迁移示例(使用 Kafka 或clickhouse-mysql
)
6.3.clickhouse-copier
工具使用
6.4.INSERT SELECT
与外部表引擎同步
6.5. 实时同步示例:使用 Kafka 引擎 + Materialized View - 运维与监控要点
7.1. ZooKeeper 集群监控
7.2. ClickHouse 节点健康检查
7.3. 分片与副本恢复流程
7.4. 备份与恢复策略 - 常见问题与优化建议
8.1. 查询慢或分布式 JOIN 性能优化
8.2. 数据倾斜与分片键设计
8.3. 磁盘、内存、网络调优 - 总结
1. ClickHouse 分布式架构概述
在深入部署细节之前,首先要明确 ClickHouse 在分布式场景下的几大核心概念:分片(Shard)、副本(Replica)、ZooKeeper 元数据管理,以及分布式表(Distributed Engine)与本地表(MergeTree/ReplicatedMergeTree)的配合。
1.1 单节点 vs 分布式
单节点部署
- 典型用于测试、小规模数据或单机分析。
- 数据存储在本地 MergeTree 或其派生引擎(如 SummingMergeTree、AggregatingMergeTree 等)表中。
- 缺点:无法横向扩展,无副本冗余,节点宕机即数据不可用。
分布式部署
- 通过将数据按某种分片策略均匀分布到多个实例(Shard)上,同时为每个 Shard 配置副本(Replica),实现高可用与水平扩展。
- 查询时,客户端可通过分布式表路由到对应 Shard,或跨 Shard 聚合查询。
核心组件:
- ClickHouse 节点:负责存储与执行。
- ZooKeeper:负责存储分布式元数据(表的分片 & 副本信息、DDL 同步)。
1.2 分片(Shard)与副本(Replica)
Shard(分片)
- 将逻辑数据集按分片键(如用户 ID、时间范围或哈希值)均匀切分为多个子集,每个子集部署在不同的节点上。
常见策略:
- Hash 分片:
shard_key = cityHash64(user_id) % shard_count
。 - 范围分片:根据时间/业务范围拆分。
- Hash 分片:
Replica(副本)
- 每个 Shard 下可部署多个 Replica,保证 Shard 内数据的一致性与高可用。
- Replica 间基于 ZooKeeper 的复制队列自动同步数据。
- 在一个 Replica 挂掉时,点击恢复或重启,其他 Replica 可继续提供服务。
图解:多 Shard / 多 Replica 架构示例
┌────────────────────────────────────────────────┐ │ ZooKeeper 集群(3 节点) │ │ 存储:/clickhouse/tables/{db}.{table}/shardN │ └────────────────────────────────────────────────┘ │ │ │ ┌────────────────┴─────┐ ┌─────────┴────────┐ │ │ Shard 1 │ │ Shard 2 │ │ │ ┌─────────┐ ┌───────┐ │ │ ┌─────────┐ ┌──────┐ │ │ │ │Replica1 │ │Replica2│ │ │ │Replica1 │ │Replica2│ │ │ │ │ Node A │ │ Node B │ │ │ │ Node C │ │ Node D │ │ │ │ └─────────┘ └───────┘ │ │ └─────────┘ └──────┘ │ │ └───────────────────────┘ └─────────────────────┘ │ │ │ │ │ │ │ 分布式表路由 / 跨 Shard 聚合查询 │
- Shard 内部:Replica1、Replica2 两个副本互为冗余,Replica1、Replica2 分别部署在不同物理机上,以应对单点故障。
- 跨 Shard:客户端通过分布式表(Distributed Engine)将查询分发至每个 Shard 下的副本,由 ZooKeeper 协调副本选择。
1.3 ZooKeeper 在分布式中的作用
ClickHouse 的分布式功能依赖 ZooKeeper 来保证以下核心功能:
DDL 同步
- 所有 Replica 在创建表、修改表结构时通过 ZooKeeper 写入变更路径,确保各节点同步执行 DDL。
复制队列管理(ReplicatedMergeTree)
- 每个 Replica 会将本地插入/删除任务写入 ZooKeeper 中对应分片的队列节点,其他 Replica 订阅该队列并拉取任务执行,实现数据复制。
分布式表元数据
- Distributed Engine 在 ZooKeeper 中读取集群信息,确定如何将某条 SQL 分发到各个分片。
副本故障检测与恢复
- ZooKeeper 记录当前可用 Replica 列表,当某个 Replica 宕机或网络不可达,其他 Replica 会继续提供写入与查询。
ZooKeeper 目录示例(部分)
/clickhouse/ ├─ tables/ │ └─ default.hits/ # hits 表对应的节点 │ ├─ shard1/ # Shard1 下的所有 Replica │ │ ├─ leader_election -> 存储当前 leader 信息 │ │ └─ queue/Replica1 -> 存储 Replica1 的写入操作 │ └─ shard2/ # Shard2 下 │ └─ queue/Replica3 ├─ macros/ # 宏定义,可在配置中使用 └─ replication_alter_columns/... # DDL 同步信息
2. 环境准备与组件安装
本文以 Ubuntu 20.04 为示例操作系统,假设即将部署 2 个 Shard,每个 Shard 2 个 Replica,共 4 台 ClickHouse 节点,并使用 3 节点 ZooKeeper 集群保障高可用。
2.1 系统与网络要求
操作系统
- 建议使用 Debian/Ubuntu/CentOS 等 Linux 发行版,本文以 Ubuntu 20.04 为例。
网络连通性
所有节点之间需互相能通:
ping zk1 zk2 zk3 ping click1 click2 click3 click4
关闭防火墙或放通必要端口:
- ZooKeeper:2181(客户端访问)、2888/3888(集群内部选举)。
- ClickHouse:9000(TCP 协议,默认客户端端口)、8123(HTTP 接口)、9009(Keeper 通信,若启用 Keeper 模式,可忽略)。
时间同步
建议使用 NTP 或 chrony 保证各节点时间同步,否则会影响 ReplicatedMergeTree 的副本选举与健康检查。
sudo apt-get install chrony sudo systemctl enable chrony sudo systemctl start chrony
2.2 安装 ZooKeeper 集群
在 3 台节点(假设 IP 分别为 192.168.1.10/11/12
)上完成 ZooKeeper 安装与集群配置。
2.2.1 下载与解压
# 在每台机器执行
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz
tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /opt/
ln -s /opt/apache-zookeeper-3.7.1-bin /opt/zookeeper
2.2.2 配置 zoo.cfg
# 编辑 /opt/zookeeper/conf/zoo.cfg (如果目录下无 zoo.cfg 示例,可复制 conf/zoo_sample.cfg)
cat <<EOF > /opt/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
# 集群内部通信端口(选举与同步)
server.1=192.168.1.10:2888:3888
server.2=192.168.1.11:2888:3888
server.3=192.168.1.12:2888:3888
EOF
2.2.3 创建 dataDir 与 myid
# 在每台机器分别执行
sudo mkdir -p /var/lib/zookeeper
sudo chown $(whoami):$(whoami) /var/lib/zookeeper
# 将编号写入 myid(与 zoo.cfg 中 server.N 对应)
# 机器 192.168.1.10
echo "1" > /var/lib/zookeeper/myid
# 机器 192.168.1.11
echo "2" > /var/lib/zookeeper/myid
# 机器 192.168.1.12
echo "3" > /var/lib/zookeeper/myid
2.2.4 启动 ZooKeeper
# 同步在 3 台节点上启动
/opt/zookeeper/bin/zkServer.sh start
# 检查集群状态
/opt/zookeeper/bin/zkServer.sh status
# 期望输出类似 “Mode: leader” 或 “Mode: follower”
至此,3 节点 ZooKeeper 集群已启动并形成仲裁,可支持多副本 ClickHouse 的元数据管理。
2.3 安装 ClickHouse 节点
在 4 台 ClickHouse 节点(假设 IP 为 192.168.1.20/21/22/23
)上,按照以下步骤安装 ClickHouse:
2.3.1 安装 Yandex 官方仓库并安装
# 安装官方 GPG Key
sudo apt-key adv --keyserver keyserver.ubuntu.com --recv E0C56BD4
# 添加仓库
echo "deb https://repo.clickhouse.com/deb/stable/ main/" | sudo tee /etc/apt/sources.list.d/clickhouse.list
# 更新并安装
sudo apt-get update
sudo apt-get install -y clickhouse-server clickhouse-client
2.3.2 配置防火墙与端口
# 放通 TCP 9000、8123、9009 端口(若使用 CentOS,可用 firewalld 或 iptables)
sudo ufw allow 9000/tcp
sudo ufw allow 8123/tcp
sudo ufw allow 9009/tcp
sudo ufw reload
2.3.3 启动 ClickHouse 服务
sudo systemctl enable clickhouse-server
sudo systemctl start clickhouse-server
# 查看日志,确认正常启动
sudo journalctl -u clickhouse-server -f
注意:此时 ClickHouse 还未配置分布式功能,仅是默认的单节点模式。
3. 分布式集群部署示例
下面以 2 Shard × 2 Replica 为例,演示如何将 4 个 ClickHouse 节点组成分布式集群。假设对应节点如下:
Shard1
- Replica1:
192.168.1.20
(click1) - Replica2:
192.168.1.21
(click2)
- Replica1:
Shard2
- Replica1:
192.168.1.22
(click3) - Replica2:
192.168.1.23
(click4)
- Replica1:
3.1 集群拓扑设计与图解
┌────────────────────────────────────────────────┐
│ ZooKeeper 3 节点 │
│ [192.168.1.10, 11, 12] 端口 2181,2888,3888 │
└────────────────────────────────────────────────┘
│ │ │
┌────────────────┴──────────────┴──────────────┴───────────────┐
│ ClickHouse 分布式集群 │
│ Shard1 Shard2 │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ click1 │ │ click2 │ │ click3 │ │ click4 │ │
│ │ (Replica) │ │ (Replica) │ │ (Replica) │ │ (Replica) │ │
│ │ zk:2181 │ │ zk:2181 │ │ zk:2181 │ │ zk:2181 │ │
│ └───────────┘ └───────────┘ └───────────┘ └───────────┘ │
└───────────────────────────────────────────────────────────────┘
│ │ │ │
│ ReplicatedMergeTree 本地表 (pathy) │ Distributed 表 (path) │
│ 数据分片 & 自动复制 │ 跨 Shard 查询路由 │
- ZooKeeper:运行在
192.168.1.10/11/12:2181
- click1/click2:Shard1 的 2 个 Replica,两个节点负责存储 Shard1 的数据,数据通过 ZooKeeper 自动复制。
- click3/click4:Shard2 的 2 个 Replica,同理。
3.2 ZooKeeper 配置
上文已完成 ZooKeeper 集群搭建,确认集群健康后,ClickHouse 参考以下 ZooKeeper 连接方式即可。
<!-- /etc/clickhouse-server/config.xml (各节点相同,只需保证 zk 配置正确) -->
<yandex>
<!-- 其他配置省略 -->
<zookeeper>
<node>
<host>192.168.1.10</host>
<port>2181</port>
</node>
<node>
<host>192.168.1.11</host>
<port>2181</port>
</node>
<node>
<host>192.168.1.12</host>
<port>2181</port>
</node>
</zookeeper>
<!-- 更多配置... -->
</yandex>
3.3 ClickHouse config.xml
与 users.xml
配置
为了实现 ReplicatedMergeTree 与 Distributed 引擎,需修改以下配置文件。
3.3.1 修改 config.xml
编辑 /etc/clickhouse-server/config.xml
,在 <yandex>
节点内添加以下段落:
<yandex>
<!-- ... 原有配置 ... -->
<!-- ZooKeeper 节点 (已如上所示) -->
<zookeeper>
<node>
<host>192.168.1.10</host>
<port>2181</port>
</node>
<node>
<host>192.168.1.11</host>
<port>2181</port>
</node>
<node>
<host>192.168.1.12</host>
<port>2181</port>
</node>
</zookeeper>
<!-- 为分布式部署添加 shards 与 replicas 定义 -->
<remote_servers>
<!-- 定义一个逻辑集群名 cluster1,包含 2 个 shard -->
<cluster1>
<shard>
<replica>
<host>192.168.1.20</host>
<port>9000</port>
</replica>
<replica>
<host>192.168.1.21</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>192.168.1.22</host>
<port>9000</port>
</replica>
<replica>
<host>192.168.1.23</host>
<port>9000</port>
</replica>
</shard>
</cluster1>
</remote_servers>
<!-- 定义默认数据库 macros,方便在 SQL 中使用 {cluster} -->
<macros>
<cluster>cluster1</cluster>
<shard>shard1</shard> <!-- 可留空,主要使用 macros.cluster -->
</macros>
<!-- 持久化参数,以及其他可选配置 -->
<!-- ... -->
</yandex>
<remote_servers>
- 定义逻辑集群名称
cluster1
,下有两个<shard>
节点,每个<shard>
下有若干<replica>
。 - 在后续创建 Distributed 表时,会引用
cluster1
,ClickHouse 自动根据此配置将查询分发到各 shard 下的一个副本。
- 定义逻辑集群名称
<macros>
- 定义了
{cluster}
宏,后续 SQL 可直接使用remote('cluster1', ...)
或{cluster}
。
- 定义了
修改完成后,重启 ClickHouse 节点以使配置生效:
sudo systemctl restart clickhouse-server
3.3.2 修改 users.xml
(可选)
若需为分布式表访问设置白名单,建议修改 /etc/clickhouse-server/users.xml
,在相应用户下添加 <networks>
:
<!-- users.xml 片段 -->
<profiles>
<default>
<!-- 其他配置 -->
</default>
</profiles>
<users>
<default>
<password></password>
<networks>
<ip>::/0</ip> <!-- 允许任意 IP 访问 -->
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
若公司内部有统一授权管理,可为特定用户专门配置分布式访问权限。
3.4 启动 ClickHouse 服务与校验
重启所有 ClickHouse 节点
sudo systemctl restart clickhouse-server
校验 ZooKeeper 连接
clickhouse-client --query="SELECT * FROM system.zookeeper WHERE path LIKE '/clickhouse/%' LIMIT 5;"
- 若能正常返回节点信息,则表明 ClickHouse 成功连接到 ZooKeeper。
校验
remote_servers
配置是否生效
在任意一台节点上执行:clickhouse-client --query="SELECT host_name(), version();" # 查看本地信息
然后执行跨集群的 Hello 查询:
clickhouse-client --query="SELECT * FROM remote('cluster1', system.one) LIMIT 4;"
- 该查询会在
cluster1
下的每个 Replica 上执行SELECT * FROM system.one LIMIT 1
,汇总 4 条记录。如果能正常返回 4 条,则表示 remote\_servers 生效。
- 该查询会在
4. 分布式表引擎与表创建
在完成分布式部署后,需要了解 ClickHouse 提供的几种常见表引擎,并结合分布式场景设计合适的表结构。
4.1 MergeTree 与 ReplicatedMergeTree 引擎
MergeTree 系列
- 最常用的引擎,适用于单机场景或非严格高可用需求。
- 支持分区(
PARTITION BY
)、排序键(ORDER BY
)、TTL、物化视图等。 示例创建:
CREATE TABLE default.events_mt ( dt Date, user_id UInt64, action String, value Float32 ) ENGINE = MergeTree() PARTITION BY toYYYYMM(dt) ORDER BY (user_id, dt);
ReplicatedMergeTree 系列
- 在 MergeTree 基础上,增加了通过 ZooKeeper 实现副本复制与容灾能力。
需要传入两个重要参数:
- ZooKeeper 路径:例如
/clickhouse/tables/{database}.{table}/shardN
。 - Replica 名称:在同一 Shard 下需唯一,如
replica1
、replica2
。
- ZooKeeper 路径:例如
示例创建(在 Shard1 下的两个 Replica 分别执行):
CREATE TABLE default.events_shard1_replica1 ( dt Date, user_id UInt64, action String, value Float32 ) ENGINE = ReplicatedMergeTree( '/clickhouse/tables/default.events/shard1', -- ZooKeeper 路径 'replica1' -- Replica 名称 ) PARTITION BY toYYYYMM(dt) ORDER BY (user_id, dt); CREATE TABLE default.events_shard1_replica2 ( dt Date, user_id UInt64, action String, value Float32 ) ENGINE = ReplicatedMergeTree( '/clickhouse/tables/default.events/shard1', -- 与 replica1 相同的路径 'replica2' ) PARTITION BY toYYYYMM(dt) ORDER BY (user_id, dt);
Shard2 下分别创建两个 Replica
CREATE TABLE default.events_shard2_replica1 ( dt Date, user_id UInt64, action String, value Float32 ) ENGINE = ReplicatedMergeTree( '/clickhouse/tables/default.events/shard2', 'replica1' ) PARTITION BY toYYYYMM(dt) ORDER BY (user_id, dt); CREATE TABLE default.events_shard2_replica2 ( dt Date, user_id UInt64, action String, value Float32 ) ENGINE = ReplicatedMergeTree( '/clickhouse/tables/default.events/shard2', 'replica2' ) PARTITION BY toYYYYMM(dt) ORDER BY (user_id, dt);
说明:
- ZooKeeper 路径
'/clickhouse/tables/default.events/shard1'
与 Shard 名称保持一致,有助于后续维护。- 每个 Shard 下的 Replica 都指定相同的 ZooKeeper 路径,Replica 在同一路径上协调数据复制。
4.2 Distributed 引擎原理与实现
Distributed 引擎
- 提供跨 Shard 的查询路由能力,本质上是一个逻辑视图,将查询分发到建在各 Shard 下的本地表,再在客户端聚合结果。
创建时需要指定:
- 集群名称:与
config.xml
中remote_servers
配置保持一致,如cluster1
。 - 数据库和表名:在各 Replica 上实际存在的本地表名(如
default.events_shard1_replica1
、..._replica2
、...shard2_replica1
、...shard2_replica2
)。 - 分片键(可选):用于将写入分发到某个 Shard,而不是广播到所有 Shard。
- 集群名称:与
示例创建:
CREATE TABLE default.events_distributed ( dt Date, user_id UInt64, action String, value Float32 ) ENGINE = Distributed( 'cluster1', -- 与 config.xml 中 remote_servers 的 <cluster1> 'default', -- 数据库名 'events_local',-- 各 Shard 对应的本地表前缀(需在各节点上创建同名本地表) rand() -- 分片键,可改为 cityHash64(user_id) );
由于各 Shard 下的本地表可能使用 ReplicatedMergeTree 并加入了 Replica 后缀,为简化管理,可在各 local 表下创建一个同名别名表
events_local
,指向当前 Replica。示例:每台节点(click1\~click4)都创建一个同名的本地别名表:
CREATE TABLE default.events_local AS default.events_shard1_replica1; -- click1 CREATE TABLE default.events_local AS default.events_shard1_replica2; -- click2 CREATE TABLE default.events_local AS default.events_shard2_replica1; -- click3 CREATE TABLE default.events_local AS default.events_shard2_replica2; -- click4
这样,在 Distributed 引擎中只需引用
events_local
,ClickHouse 会自动查找每个节点上对应的本地表。
4.3 本地表与分布式表创建示例
下面结合 Shard1/Shard2、Replica1/Replica2 全流程示例。
4.3.1 Shard1 Replica1 上创建本地表
-- 点击 click1 (Shard1 Replica1)
CREATE DATABASE IF NOT EXISTS default;
CREATE TABLE default.events_shard1_replica1 (
dt Date,
user_id UInt64,
action String,
value Float32
)
ENGINE = ReplicatedMergeTree(
'/clickhouse/tables/default.events/shard1',
'replica1'
)
PARTITION BY toYYYYMM(dt)
ORDER BY (user_id, dt);
4.3.2 Shard1 Replica2 上创建本地表
-- 点击 click2 (Shard1 Replica2)
CREATE DATABASE IF NOT EXISTS default;
CREATE TABLE default.events_shard1_replica2 (
dt Date,
user_id UInt64,
action String,
value Float32
)
ENGINE = ReplicatedMergeTree(
'/clickhouse/tables/default.events/shard1',
'replica2'
)
PARTITION BY toYYYYMM(dt)
ORDER BY (user_id, dt);
4.3.3 Shard2 Replica1 上创建本地表
-- 点击 click3 (Shard2 Replica1)
CREATE DATABASE IF NOT EXISTS default;
CREATE TABLE default.events_shard2_replica1 (
dt Date,
user_id UInt64,
action String,
value Float32
)
ENGINE = ReplicatedMergeTree(
'/clickhouse/tables/default.events/shard2',
'replica1'
)
PARTITION BY toYYYYMM(dt)
ORDER BY (user_id, dt);
4.3.4 Shard2 Replica2 上创建本地表
-- 点击 click4 (Shard2 Replica2)
CREATE DATABASE IF NOT EXISTS default;
CREATE TABLE default.events_shard2_replica2 (
dt Date,
user_id UInt64,
action String,
value Float32
)
ENGINE = ReplicatedMergeTree(
'/clickhouse/tables/default.events/shard2',
'replica2'
)
PARTITION BY toYYYYMM(dt)
ORDER BY (user_id, dt);
提示:在创建完上述本地表后,可使用以下命令检查副本同步是否正常:
-- 在任意节点执行 SELECT database, table, is_leader, queue_size, future_parts, parts_to_merge, last_queue_update, last_queue_update_time FROM system.replicas WHERE database = 'default' AND table LIKE 'events%';
- 查看
is_leader
、queue_size
是否为 0,表示副本同步正常;若有积压任务,可等待或手动修复。
4.3.5 在每个节点上创建本地别名表
为了让分布式引擎统一使用同名本地表,建议在每个节点上都创建一个 events_local
别名表,指向上一步创建的 Replica 表。示例如下:
click1(Shard1 Replica1)
CREATE TABLE default.events_local AS default.events_shard1_replica1;
click2(Shard1 Replica2)
CREATE TABLE default.events_local AS default.events_shard1_replica2;
click3(Shard2 Replica1)
CREATE TABLE default.events_local AS default.events_shard2_replica1;
click4(Shard2 Replica2)
CREATE TABLE default.events_local AS default.events_shard2_replica2;
说明:别名表不会在存储目录再新建数据;它只是一个对 ReplicatedMergeTree 本地表的引用(ATTACH TABLE 方式)。如果希望更严格隔离,也可以使用ATTACH TABLE
语法,但AS ...
方式足够常见。
4.3.6 创建分布式表
在任意一台节点(建议使用 click1)上执行:
CREATE TABLE default.events_distributed (
dt Date,
user_id UInt64,
action String,
value Float32
)
ENGINE = Distributed(
'cluster1', -- 与 config.xml 中定义的集群名称
'default', -- 数据库名
'events_local', -- 各节点上本地表别名
cityHash64(user_id) -- 分片键
);
关键说明:
cityHash64(user_id)
:ClickHouse 内置的一种哈希函数,可将user_id
映射到 [0, 2^64) 区间后再% shard_count
,分散写入到不同的 Shard。- 如果不填分片键(如填
rand()
或''
),则 Insert 操作会自动将每条记录广播到所有 Shard。
到此,分布式表与本地 Replica 表的创建已完成。
4.4 示例:查询分布式表的执行流程图解
┌─────────────────────────────────────────────────────────────────────────┐
│ ClickHouse Client │
│ SELECT user_id, count() FROM default.events_distributed GROUP BY user_id │
└─────────────────────────────────────────────────────────────────────────┘
│
查询路由到 cluster1
│
┌────────────────────┴────────────────────┐
│ │
┌───────────────┐ ┌───────────────┐
│ Shard1 │ │ Shard2 │
│ (click1/2) │ │ (click3/4) │
│ Distributed │ │ Distributed │
│ Engine Worker │ │ Engine Worker │
└───────┬───────┘ └───────┬───────┘
│ 查询对应本地表 events_local │ 查询对应本地表 events_local
▼ ▼
┌───────────────┐ ┌───────────────┐
│ Local Table │ │ Local Table │
│ events_local │ │ events_local │
│ (Shard1 Data) │ │ (Shard2 Data) │
│ ReplicatedMT │ │ ReplicatedMT │
└───────┬───────┘ └───────┬───────┘
│ │
│ 执行 group by、count() 本地聚合 │ 执行本地聚合
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ Partial Results │ │ Partial Results │
│ (user_id, count) │ │ (user_id, count) │
└──────────┬───────┘ └──────────┬───────┘
│ │
│ 将部分结果汇总到客户端并进行最终合并 │
└───────────────┬─────────────────────────────────────┘
▼
客户端合并聚合结果
│
▼
返回最终 (user_id, total_count) 列表
- Shard1/Shard2:分布式表引擎仅充当调度者,真正的计算在各节点本地的
events_local
。 - 本地聚合:为了减少网络传输,ClickHouse 默认会先在本地执行 GroupBy、聚合等操作,只有聚合后较小的中间结果通过网络返回再做最终合并。这样能显著提高分布式查询性能。
5. 数据写入、查询与负载均衡
完成表结构创建后,接下来演示如何将数据写入分布式表与查询,以及写入时如何自动分片或广播。
5.1 写入到 ReplicatedMergeTree 且分片自动路由
使用分布式表写入
- 推荐通过分布式表
events_distributed
写入,ClickHouse 会根据cityHash64(user_id) % shard_count
自动将数据路由到相应 Shard 的 Replica(随机选择一个可用 Replica 写入)。 示例插入 3 条数据,user\_id 为 1、2、3:
INSERT INTO default.events_distributed VALUES ('2023-09-01', 1, 'click', 10.5), ('2023-09-01', 2, 'view', 5.0), ('2023-09-01', 3, 'purchase', 100.0);
若 Shard Count=2,那么:
- 对于
user_id = 1
:cityHash64(1) % 2 = 1
(假设),路由到 Shard2; user_id = 2
:%2 = 0
,写入 Shard1;user_id = 3
:%2 = 1
,写入 Shard2。
- 对于
- 推荐通过分布式表
写入副本选择
- Shard 内部多个 Replica 会随机选择一个可写 Replica;若写入的 Replica 挂掉,其他 Replica 会接受写入请求。写入后,Replica 间基于 ZooKeeper 自动同步数据。
5.2 分布式表查询流程详解
查询
events_distributed
- 当执行
SELECT * FROM events_distributed WHERE user_id = 2;
时,ClickHouse 会根据分片键cityHash64(2) % 2
计算出目标 Shard(Shard1),并将查询请求发给 Shard1 的一个 Replica。 - 然后在该 Replica 上查询
events_local
(即 Shard1 本地的 ReplicatedMergeTree 表),返回结果。 - 如果 Query 涉及跨 Shard(如
GROUP BY
或不带WHERE
的SELECT *
),则请求会广播到所有 Shard,每个 Shard 返回部分结果,最后由客户端合并。
- 当执行
分布式聚合与性能
- 对于大表聚合查询,分布式表引擎会首先在每个 Shard 本地进行“部分聚合(partial aggregation)”,然后再把各 Shard 的部分结果收集到一个节点进行“最终聚合(final aggregation)”,大幅减少网络传输量。
5.3 Insert、Select 示例
批量插入示例
INSERT INTO default.events_distributed SELECT toDate('2023-09-02') AS dt, number AS user_id, 'auto' AS action, number * 1.1 AS value FROM numbers(100000) -- 生成 100,000 条测试数据 WHERE number < 10000; -- 只写入前 10,000 条
查询示例
-- 查看 Shard1 上的数据量(仅在 Shard1 的 click1 或 click2 节点上执行) SELECT count(*) FROM default.events_shard1_replica1; SELECT count(*) FROM default.events_shard1_replica2; -- 查询分布式表中的总数据量 SELECT count(*) FROM default.events_distributed; -- 分布式聚合示例 SELECT user_id, count() AS cnt FROM default.events_distributed GROUP BY user_id ORDER BY cnt DESC LIMIT 10;
验证数据一致性
在 Shard1 Replica1 与 Replica2 上分别查询本地表,确认两者数据同步:SELECT count(*) FROM default.events_shard1_replica1; SELECT count(*) FROM default.events_shard1_replica2;
6. 数据迁移与同步策略
在实际生产中,经常需要将已有数据迁移到新的分布式 ClickHouse 集群,或与外部数据库(如 MySQL)集成,实现实时或离线数据同步。下面介绍几种常见迁移与同步方案。
6.1 单机 ClickHouse 到分布式集群迁移
假设已有一个单节点 ClickHouse(192.168.1.30
),其中有表 default.events_single
,需要将其数据迁移到上述分布式集群并保持不间断服务。
6.1.1 在新集群创建同结构的分布式表
- 在新集群创建 ReplicatedMergeTree 本地表与 Distributed 表(与前节示例一致)。
- 确保
events_distributed
已就绪。
6.1.2 使用 INSERT SELECT
迁移数据
在原单节点上执行以下操作,将数据复制到分布式表(通过 clickhouse-client 连接到分布式集群任一节点即可):
clickhouse-client --host=192.168.1.20 --query="
INSERT INTO default.events_distributed
SELECT * FROM remote('single_host', default, 'events_single')
"
需先在
config.xml
的remote_servers
中配置single_host
,以便分布式查询原节点数据。示例配置(在每个新集群节点的/etc/clickhouse-server/config.xml
添加):<remote_servers> <single_host_cluster> <shard> <replica> <host>192.168.1.30</host> <port>9000</port> </replica> </shard> </single_host_cluster> </remote_servers>
然后在新集群中执行:
INSERT INTO default.events_distributed SELECT * FROM remote('single_host_cluster', default, 'events_single');
- 上述操作会将单节点数据分批读取,并插入到分布式表,分布式表会自动分片到各 Shard。在数据量大的情况下,建议拆分范围分批执行,例如按照
dt
范围分区多次执行。
6.1.3 增量同步
在完成初次全量迁移后,可使用 ZooKeeper + Kafka 或持续抓取增量数据进入分布式表,以实现接近实时的迁移。
方案一:Materialized View + Kafka
- 在原单节点 ClickHouse 上创建一个 Kafka 引擎表,订阅写入事件;
- 创建一个 Materialized View,将 Kafka 中的数据插入到新集群的分布式表。
方案二:Debezium + Kafka Connect
- 使用 Debezium 将 MySQL/ClickHouse 的 Binlog 推到 Kafka;
- ClickHouse 侧使用 Kafka 引擎与 Materialized View 实时消费,插入分布式表。
6.2 MySQL 到 ClickHouse 的迁移示例(使用 Kafka 或 clickhouse-mysql
)
很多场景需要将 MySQL 中的业务表迁移到 ClickHouse 进行高性能 OLAP 查询。常用方案如下:
6.2.1 使用 Kafka + ClickHouse Kafka 引擎
- 在 MySQL 中开启 Binlog,并使用 Kafka Connect + Debezium 将数据写入 Kafka 主题(如
mysql.events
)。 在 ClickHouse 集群上创建 Kafka 引擎表
CREATE TABLE default.events_kafka ( `dt` Date, `user_id` UInt64, `action` String, `value` Float32 ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka1:9092,kafka2:9092', kafka_topic_list = 'mysql.events', kafka_group_name = 'ch_consumer_group', kafka_format = 'JSONEachRow', kafka_num_consumers = 4;
创建 Materialized View
Materialized View 将消费
events_kafka
,并将数据插入分布式表:CREATE MATERIALIZED VIEW default.events_mv TO default.events_distributed AS SELECT dt, user_id, action, value FROM default.events_kafka;
- 这样,Kafka 中的新数据会自动被 MV 推送到分布式表,实现实时同步。
6.2.2 使用 clickhouse-mysql
工具
clickhouse-mysql
是社区提供的一个 Python 脚本,可直接将 MySQL 表结构与数据迁移到 ClickHouse。
安装依赖
pip install clickhouse-mysql
执行迁移命令
clickhouse-mysql --mysql-host mysql_host --mysql-port 3306 --mysql-user root --mysql-password secret \ --clickhouse-host 192.168.1.20 --clickhouse-port 9000 --clickhouse-user default --clickhouse-password '' \ --database mydb --table events --clickhouse-database default --clickhouse-table events_distributed
- 默认会将 MySQL 表自动映射为 ClickHouse 表,如创建合适的 MergeTree 引擎表,再批量插入数据。
- 对于分布式环境,可先在新集群创建分布式表,再指定
--clickhouse-table
为分布式表,脚本会自动往分布式表写入数据。
6.3 clickhouse-copier
工具使用
clickhouse-copier
是 ClickHouse 社区自带的工具,可在集群内部做分片间或集群间的数据搬迁。
准备复制任务的配置文件(
copier_config.xml
)<copy> <shard> <cluster>cluster1</cluster> <replica>click1</replica> </shard> <shard> <cluster>cluster1</cluster> <replica>click3</replica> </shard> <tables> <table> <database>default</database> <name>events_local</name> </table> </tables> </copy>
- 上述示例将指定将
events_local
从 Shard1 的 click1 复制到 Shard2 的 click3,需根据实际场景配置更多<shard>
与<table>
。
- 上述示例将指定将
执行复制
clickhouse-copier --config /path/to/copier_config.xml --replication 0
--replication 0
表示不做 ReplicatedMergeTree 的基于日志复制,仅做一次全量迁移。- 适用于集群扩容、分片重平衡等操作。
6.4 INSERT SELECT
与外部表引擎同步
INSERT SELECT
适用于跨集群、跨数据库全量复制:
INSERT INTO default.events_distributed SELECT * FROM default.events_local WHERE dt >= '2023-09-01';
- 可分批(按日期、ID 范围)多次执行。
外部表引擎
ClickHouse 支持通过 MySQL 引擎访问 MySQL 表,如:
CREATE TABLE mysql_events ( dt Date, user_id UInt64, action String, value Float32 ) ENGINE = MySQL('mysql_host:3306', 'mydb', 'events', 'root', 'secret');
然后可在 ClickHouse 侧做:
INSERT INTO default.events_distributed SELECT * FROM mysql_events;
- 外部表引擎适合数据量相对较小或批量一次性导入,若是实时增量同步,仍推荐 Kafka + Materialized View。
6.5 实时同步示例:使用 Kafka 引擎 + Materialized View
在 MySQL 侧将 Binlog 推到 Kafka 后,ClickHouse 侧通过 Kafka 引擎表 + MV,实现近实时同步。
MySQL → Kafka
- 部署 Kafka 集群。
- 使用 Debezium Connector for MySQL,将 MySQL Binlog 写入 Kafka 主题
mysql.events_binlog
。
ClickHouse 侧创建 Kafka 表
CREATE TABLE default.events_binlog_kafka ( dt Date, user_id UInt64, action String, value Float32 ) ENGINE = Kafka SETTINGS kafka_broker_list = 'k1:9092,k2:9092', kafka_topic_list = 'mysql.events_binlog', kafka_group_name = 'ch_binlog_consumer', kafka_format = 'JSONEachRow', kafka_num_consumers = 4;
创建 Materialized View
CREATE MATERIALIZED VIEW default.events_binlog_mv TO default.events_distributed AS SELECT dt, user_id, action, value FROM default.events_binlog_kafka;
- 当 Kafka 有新消息(INSERT/UPDATE/DELETE)时,MV 自动触发,将数据写入分布式表。
- 对于 UPDATE/DELETE,可根据具体业务需求将这些操作转化为 ClickHouse 的 MergeTree 修改或 VXIN 等逻辑。
7. 运维与监控要点
在生产环境下,ClickHouse 分布式集群的健壮性和性能调优尤为关键。以下介绍一些常见的运维与监控要点。
7.1 ZooKeeper 集群监控
节点状态检查
echo ruok | nc 192.168.1.10 2181 # 返回 imok 则正常 echo stat | nc 192.168.1.10 2181 # 查看节点状态、客户端连接数
集群状态检查
echo srvr | nc 192.168.1.10 2181
- 可查看是否有选举 leader、是否存在掉线节点等。
监控指标
- 使用 Prometheus JMX Exporter + Grafana 监控 ZooKeeper 的请求延时、连接数、选举延时等。
7.2 ClickHouse 节点健康检查
系统表
system.replication_queue
:查看各 Replica 的复制队列积压情况。SELECT database, table, is_currently_executing, parts_to_merge, queue_size FROM system.replication_queue;
system.mutations
:查看表的 mutations(更新/删除)状态。SELECT database, table, mutation_id, is_done, parts_to_do, parts_done FROM system.mutations;
system.parts
:查看数据分区与磁盘占用情况。SELECT database, table, partition, name, active, bytes_on_disk FROM system.parts WHERE database='default' AND table LIKE 'events%';
system.metrics
/system.events
:监控 ClickHouse 实时指标,如 Query、Insert 吞吐量,Cache 命中率等。
持续监控
使用 ClickHouse-Prometheus Exporter + Grafana,可监控:
- Queries per second
- Replication lag
- Merge 任务数
- 内存使用占比
- 磁盘 I/O
7.3 分片与副本恢复流程
7.3.1 Replica 加入流程
新增 Replica
- 在一个 Shard 下新增 Replica,先在 ZooKeeper 对应路径下创建新 Replica 的目录。
- 在新节点上创建本地表(表结构需与原 Shard 保持一致),并指定新的 Replica 名称。
- 启动 ClickHouse,该 Replica 会从 ZooKeeper 上的复制队列拉取现有数据,完成全量数据复制。
Shard 扩容(横向扩容)
如果要增加 Shard 数量(比如从 2 个 Shard 扩容到 3 个),则需:
- 暂停写入,或者使用
clickhouse-copier
做分片重平衡。 - 在新节点上创建对应的本地 ReplicatedMergeTree 表,指定新的 Shard 路径。
- 使用
clickhouse-copier
或脚本将已有数据重分布到新的 Shard。
- 暂停写入,或者使用
7.3.2 副本修复流程
当某个 Replica 节点发生故障并恢复后,需要让它重新同步数据:
- 重启节点,它会检测到 ZooKeeper 上已有的副本信息。
- Replica 恢复复制,从 Leader 主动拉取尚未复制的分区文件并恢复。
检查状态
SELECT database, table, replica_name, is_leader, queue_size FROM system.replicas WHERE database='default' AND table LIKE 'events%';
queue_size=0
且is_currently_executing=0
表示恢复完成。
7.4 备份与恢复策略
备份工具
- Altinity ClickHouse Backup:社区推荐备份工具。支持全量/增量备份与恢复。
- 也可手动使用
clickhouse-client --query="SELECT * FROM table FORMAT Native"
导出,然后再用clickhouse-client --query="INSERT INTO table FORMAT Native"
导入。
ZooKeeper 数据备份
- 可使用
zkCli.sh
导出关键路径的节点数据,以及定期备份/var/lib/zookeeper/version-2
。
- 可使用
恢复流程
- 恢复 ZooKeeper 数据,保证
ReplicatedMergeTree
的队列信息完整。 - 重启 ClickHouse,Replica 会从 ZooKeeper 获取需要恢复的分区;
- 如果只想恢复部分数据,可手动删除对应的本地分区文件,再让 Replica 重新执行复制。
- 恢复 ZooKeeper 数据,保证
8. 常见问题与优化建议
在 ClickHouse 分布式生产环境中,经常会遇到性能瓶颈、数据倾斜、Shard 节点不均衡等问题。下面总结一些常见问题与优化技巧。
8.1 查询慢或分布式 JOIN 性能优化
避免跨 Shard JOIN
ClickHouse 的分布式 JOIN 在多 Shard 场景下需要将数据从一个 Shard 拉取到另一个 Shard 进行 Join,网络 I/O 成本高。建议:
- 数据预聚合(Denormalization):将需要关联的数据预先合并到同一个表中;
- 使用物化视图:在本地 MergeTree 表上预先计算好关键信息;
- 单 Shard 物理表:如果某个表非常小,可把它复制到每个 Shard 上本地 Join。
Distributed 聚合优化
- 对于大规模聚合查询,建议先在本地执行聚合(
aggregate_overflow_mode='throw'
),再在客户端进行最终合并。 - 使用
settings max_threads = X, max_memory_usage = Y
控制查询资源消耗。
- 对于大规模聚合查询,建议先在本地执行聚合(
8.2 数据倾斜与分片键设计
数据倾斜
- 如果分片键导出的数据在某个 Shard 过多而其他 Shard 较少,导致 Shard1 负载过重,Shards2/3 空闲。
解决方案:
- 重新设计分片键,例如使用复合键或哈希函数与随机数结合;
- 动态调整分片策略,使用一致性哈希等更均衡的方案;
- 扩容 Shard 节点,将热点数据分摊到更多 Shard。
8.3 磁盘、内存、网络调优
磁盘性能
- 推荐使用 SSD 或 NVMe,至少提供 10,000+ IOPS;
- ClickHouse 在 Merge 任务、高并发写入时对磁盘 I/O 敏感。可使用 RAID0 多盘并行提升吞吐。
内存配置
- 设置合理的
max_memory_usage
- 调整 [
max_threads
] 来控制并行度,避免 OOM; - 若有大量 Map/Join 操作,可考虑开启 [
join_use_nulls_for_low_cardinality_keys
] 以减少内存占用。
- 设置合理的
网络带宽与延迟
分布式查询与复制都依赖网络:
- 使用至少 10Gb/s 以降低跨 Shard 数据传输延迟;
- 配置
max_distributed_connections
、receive_timeout
、send_timeout
等参数优化通信。
9. 总结
本文从 ClickHouse 分布式架构原理入手,详细讲解了如何在生产环境下:
- 部署 ZooKeeper 高可用集群,并配置 ClickHouse 节点连接;
- 设计分布式集群拓扑,实现 Shard 与 Replica 的高可用与负载均衡;
- 在各节点创建 ReplicatedMergeTree 本地表,通过 ZooKeeper 管理副本复制;
- 使用 Distributed 引擎创建逻辑表,自动实现跨 Shard 路由与分布式聚合;
- 演示数据写入与查询流程,并提供批量 Insert、Distributed 聚合等常见操作示例;
- 提供多种数据迁移方案,包括单机→分布式迁移、MySQL→ClickHouse 同步、Kafka 实时同步等全流程;
- 总结运维与监控要点,探讨 Replica 恢复、Shard 扩容、性能调优等实战经验;
- 针对常见问题给出优化建议,如数据倾斜、跨 Shard JOIN 降低网络开销、硬件选型等。
通过本文内容,你可以:
- 搭建一个稳定的 ClickHouse 分布式集群,实现数据的高可用与水平扩展;
- 利用 ReplicatedMergeTree 与 Distributed 引擎,灵活构建分布式表结构;
- 结合 Kafka、Materialized View、
clickhouse-copier
等工具,实现多源异构数据迁移与实时同步; - 在运维过程中通过系统表与监控手段快速排查问题,保证集群高效运行;
- 通过合理的分片键与硬件优化,避免数据倾斜与性能瓶颈。
评论已关闭