ClickHouse分布式部署、表创建及数据迁移全攻略‌

ClickHouse分布式部署、表创建及数据迁移全攻略

在大数据时代,ClickHouse 以其高性能 OLAP 查询能力和良好的水平扩展性,逐渐成为海量时序与分析场景的首选数据库。要将 ClickHouse 用于生产环境,往往需要部署分布式集群,实现数据的分片与复制,并针对业务场景设计分布式表结构与数据迁移策略。本文将从分布式架构原理出发,结合代码示例与图解,全面介绍如何完成 ClickHouse 分布式集群的部署、表的创建(含复制表与分布式表)、以及数据迁移的多种手段,帮助你快速掌握 ClickHouse 在生产环境的使用要点。


目录

  1. ClickHouse 分布式架构概述
    1.1. 单节点 vs 分布式
    1.2. 分片(Shard)与副本(Replica)
    1.3. ZooKeeper 在分布式中的作用
  2. 环境准备与组件安装
    2.1. 系统与网络要求
    2.2. 安装 ZooKeeper 集群
    2.3. 安装 ClickHouse 节点
  3. 分布式集群部署示例
    3.1. 集群拓扑设计与图解
    3.2. ZooKeeper 配置
    3.3. ClickHouse config.xmlusers.xml 配置
    3.4. 启动 ClickHouse 服务与校验
  4. 分布式表引擎与表创建
    4.1. MergeTree 与 ReplicatedMergeTree 引擎
    4.2. Distributed 引擎原理与实现
    4.3. 本地表与分布式表创建示例
    4.4. 示例:查询分布式表的执行流程图解
  5. 数据写入、查询与负载均衡
    5.1. 写入到 ReplicatedMergeTree 且分片自动路由
    5.2. 分布式表查询流程详解
    5.3. Insert、Select 示例
  6. 数据迁移与同步策略
    6.1. 单机 ClickHouse 到分布式集群迁移
    6.2. MySQL 到 ClickHouse 的迁移示例(使用 Kafka 或 clickhouse-mysql
    6.3. clickhouse-copier 工具使用
    6.4. INSERT SELECT 与外部表引擎同步
    6.5. 实时同步示例:使用 Kafka 引擎 + Materialized View
  7. 运维与监控要点
    7.1. ZooKeeper 集群监控
    7.2. ClickHouse 节点健康检查
    7.3. 分片与副本恢复流程
    7.4. 备份与恢复策略
  8. 常见问题与优化建议
    8.1. 查询慢或分布式 JOIN 性能优化
    8.2. 数据倾斜与分片键设计
    8.3. 磁盘、内存、网络调优
  9. 总结

1. ClickHouse 分布式架构概述

在深入部署细节之前,首先要明确 ClickHouse 在分布式场景下的几大核心概念:分片(Shard)、副本(Replica)、ZooKeeper 元数据管理,以及分布式表(Distributed Engine)与本地表(MergeTree/ReplicatedMergeTree)的配合

1.1 单节点 vs 分布式

  • 单节点部署

    • 典型用于测试、小规模数据或单机分析。
    • 数据存储在本地 MergeTree 或其派生引擎(如 SummingMergeTree、AggregatingMergeTree 等)表中。
    • 缺点:无法横向扩展,无副本冗余,节点宕机即数据不可用。
  • 分布式部署

    • 通过将数据按某种分片策略均匀分布到多个实例(Shard)上,同时为每个 Shard 配置副本(Replica),实现高可用与水平扩展。
    • 查询时,客户端可通过分布式表路由到对应 Shard,或跨 Shard 聚合查询。
    • 核心组件:

      • ClickHouse 节点:负责存储与执行。
      • ZooKeeper:负责存储分布式元数据(表的分片 & 副本信息、DDL 同步)。

1.2 分片(Shard)与副本(Replica)

  • Shard(分片)

    • 将逻辑数据集按分片键(如用户 ID、时间范围或哈希值)均匀切分为多个子集,每个子集部署在不同的节点上。
    • 常见策略:

      • Hash 分片shard_key = cityHash64(user_id) % shard_count
      • 范围分片:根据时间/业务范围拆分。
  • Replica(副本)

    • 每个 Shard 下可部署多个 Replica,保证 Shard 内数据的一致性与高可用。
    • Replica 间基于 ZooKeeper 的复制队列自动同步数据。
    • 在一个 Replica 挂掉时,点击恢复或重启,其他 Replica 可继续提供服务。

图解:多 Shard / 多 Replica 架构示例

               ┌────────────────────────────────────────────────┐
               │               ZooKeeper 集群(3 节点)          │
               │  存储:/clickhouse/tables/{db}.{table}/shardN   │
               └────────────────────────────────────────────────┘
                      │                   │               │
     ┌────────────────┴─────┐     ┌─────────┴────────┐      │
     │ Shard 1              │     │ Shard 2           │      │
     │ ┌─────────┐ ┌───────┐ │     │ ┌─────────┐ ┌──────┐ │      │
     │ │Replica1 │ │Replica2│ │     │ │Replica1 │ │Replica2│ │      │
     │ │ Node A  │ │ Node B │ │     │ │ Node C  │ │ Node D │ │      │
     │ └─────────┘ └───────┘ │     │ └─────────┘ └──────┘ │      │
     └───────────────────────┘     └─────────────────────┘      │
                      │                   │                   │
                      │                   │                   │
                分布式表路由 / 跨 Shard 聚合查询              │
  • Shard 内部:Replica1、Replica2 两个副本互为冗余,Replica1、Replica2 分别部署在不同物理机上,以应对单点故障。
  • 跨 Shard:客户端通过分布式表(Distributed Engine)将查询分发至每个 Shard 下的副本,由 ZooKeeper 协调副本选择。

1.3 ZooKeeper 在分布式中的作用

ClickHouse 的分布式功能依赖 ZooKeeper 来保证以下核心功能:

  1. DDL 同步

    • 所有 Replica 在创建表、修改表结构时通过 ZooKeeper 写入变更路径,确保各节点同步执行 DDL。
  2. 复制队列管理(ReplicatedMergeTree)

    • 每个 Replica 会将本地插入/删除任务写入 ZooKeeper 中对应分片的队列节点,其他 Replica 订阅该队列并拉取任务执行,实现数据复制。
  3. 分布式表元数据

    • Distributed Engine 在 ZooKeeper 中读取集群信息,确定如何将某条 SQL 分发到各个分片。
  4. 副本故障检测与恢复

    • ZooKeeper 记录当前可用 Replica 列表,当某个 Replica 宕机或网络不可达,其他 Replica 会继续提供写入与查询。

ZooKeeper 目录示例(部分)

/clickhouse/
   ├─ tables/
   │    └─ default.hits/            # hits 表对应的节点
   │         ├─ shard1/             # Shard1 下的所有 Replica
   │         │    ├─ leader_election -> 存储当前 leader 信息
   │         │    └─ queue/Replica1  -> 存储 Replica1 的写入操作
   │         └─ shard2/             # Shard2 下
   │              └─ queue/Replica3
   ├─ macros/                       # 宏定义,可在配置中使用
   └─ replication_alter_columns/... # DDL 同步信息

2. 环境准备与组件安装

本文以 Ubuntu 20.04 为示例操作系统,假设即将部署 2 个 Shard,每个 Shard 2 个 Replica,共 4 台 ClickHouse 节点,并使用 3 节点 ZooKeeper 集群保障高可用。

2.1 系统与网络要求

  1. 操作系统

    • 建议使用 Debian/Ubuntu/CentOS 等 Linux 发行版,本文以 Ubuntu 20.04 为例。
  2. 网络连通性

    • 所有节点之间需互相能通:

      ping zk1 zk2 zk3
      ping click1 click2 click3 click4
    • 关闭防火墙或放通必要端口:

      • ZooKeeper:2181(客户端访问)、2888/3888(集群内部选举)。
      • ClickHouse:9000(TCP 协议,默认客户端端口)、8123(HTTP 接口)、9009(Keeper 通信,若启用 Keeper 模式,可忽略)。
  3. 时间同步

    • 建议使用 NTP 或 chrony 保证各节点时间同步,否则会影响 ReplicatedMergeTree 的副本选举与健康检查。

      sudo apt-get install chrony
      sudo systemctl enable chrony
      sudo systemctl start chrony

2.2 安装 ZooKeeper 集群

在 3 台节点(假设 IP 分别为 192.168.1.10/11/12)上完成 ZooKeeper 安装与集群配置。

2.2.1 下载与解压

# 在每台机器执行
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz
tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /opt/
ln -s /opt/apache-zookeeper-3.7.1-bin /opt/zookeeper

2.2.2 配置 zoo.cfg

# 编辑 /opt/zookeeper/conf/zoo.cfg (如果目录下无 zoo.cfg 示例,可复制 conf/zoo_sample.cfg)
cat <<EOF > /opt/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
# 集群内部通信端口(选举与同步)
server.1=192.168.1.10:2888:3888
server.2=192.168.1.11:2888:3888
server.3=192.168.1.12:2888:3888
EOF

2.2.3 创建 dataDir 与 myid

# 在每台机器分别执行
sudo mkdir -p /var/lib/zookeeper
sudo chown $(whoami):$(whoami) /var/lib/zookeeper

# 将编号写入 myid(与 zoo.cfg 中 server.N 对应)
# 机器 192.168.1.10
echo "1" > /var/lib/zookeeper/myid
# 机器 192.168.1.11
echo "2" > /var/lib/zookeeper/myid
# 机器 192.168.1.12
echo "3" > /var/lib/zookeeper/myid

2.2.4 启动 ZooKeeper

# 同步在 3 台节点上启动
/opt/zookeeper/bin/zkServer.sh start
# 检查集群状态
/opt/zookeeper/bin/zkServer.sh status
# 期望输出类似 “Mode: leader” 或 “Mode: follower”

至此,3 节点 ZooKeeper 集群已启动并形成仲裁,可支持多副本 ClickHouse 的元数据管理。

2.3 安装 ClickHouse 节点

在 4 台 ClickHouse 节点(假设 IP 为 192.168.1.20/21/22/23)上,按照以下步骤安装 ClickHouse:

2.3.1 安装 Yandex 官方仓库并安装

# 安装官方 GPG Key
sudo apt-key adv --keyserver keyserver.ubuntu.com --recv E0C56BD4
# 添加仓库
echo "deb https://repo.clickhouse.com/deb/stable/ main/" | sudo tee /etc/apt/sources.list.d/clickhouse.list
# 更新并安装
sudo apt-get update
sudo apt-get install -y clickhouse-server clickhouse-client

2.3.2 配置防火墙与端口

# 放通 TCP 9000、8123、9009 端口(若使用 CentOS,可用 firewalld 或 iptables)
sudo ufw allow 9000/tcp
sudo ufw allow 8123/tcp
sudo ufw allow 9009/tcp
sudo ufw reload

2.3.3 启动 ClickHouse 服务

sudo systemctl enable clickhouse-server
sudo systemctl start clickhouse-server
# 查看日志,确认正常启动
sudo journalctl -u clickhouse-server -f
注意:此时 ClickHouse 还未配置分布式功能,仅是默认的单节点模式。

3. 分布式集群部署示例

下面以 2 Shard × 2 Replica 为例,演示如何将 4 个 ClickHouse 节点组成分布式集群。假设对应节点如下:

  • Shard1

    • Replica1:192.168.1.20(click1)
    • Replica2:192.168.1.21(click2)
  • Shard2

    • Replica1:192.168.1.22(click3)
    • Replica2:192.168.1.23(click4)

3.1 集群拓扑设计与图解

            ┌────────────────────────────────────────────────┐
            │                   ZooKeeper 3 节点           │
            │   [192.168.1.10, 11, 12] 端口 2181,2888,3888  │
            └────────────────────────────────────────────────┘
                        │              │              │
       ┌────────────────┴──────────────┴──────────────┴───────────────┐
       │                    ClickHouse 分布式集群                       │
       │ Shard1                                  Shard2                 │
       │ ┌───────────┐ ┌───────────┐         ┌───────────┐ ┌───────────┐ │
       │ │ click1    │ │ click2    │         │ click3    │ │ click4    │ │
       │ │ (Replica) │ │ (Replica) │         │ (Replica) │ │ (Replica) │ │
       │ │ zk:2181   │ │ zk:2181   │         │ zk:2181   │ │ zk:2181   │ │
       │ └───────────┘ └───────────┘         └───────────┘ └───────────┘ │
       └───────────────────────────────────────────────────────────────┘
               │                  │              │                  │
               │  ReplicatedMergeTree 本地表 (pathy)  │ Distributed 表 (path) │
               │  数据分片 & 自动复制                 │ 跨 Shard 查询路由     │
  • ZooKeeper:运行在 192.168.1.10/11/12:2181
  • click1/click2:Shard1 的 2 个 Replica,两个节点负责存储 Shard1 的数据,数据通过 ZooKeeper 自动复制。
  • click3/click4:Shard2 的 2 个 Replica,同理。

3.2 ZooKeeper 配置

上文已完成 ZooKeeper 集群搭建,确认集群健康后,ClickHouse 参考以下 ZooKeeper 连接方式即可。

<!-- /etc/clickhouse-server/config.xml (各节点相同,只需保证 zk 配置正确) -->
<yandex>
    <!-- 其他配置省略 -->
    <zookeeper>
        <node>
            <host>192.168.1.10</host>
            <port>2181</port>
        </node>
        <node>
            <host>192.168.1.11</host>
            <port>2181</port>
        </node>
        <node>
            <host>192.168.1.12</host>
            <port>2181</port>
        </node>
    </zookeeper>
    <!-- 更多配置... -->
</yandex>

3.3 ClickHouse config.xmlusers.xml 配置

为了实现 ReplicatedMergeTree 与 Distributed 引擎,需修改以下配置文件。

3.3.1 修改 config.xml

编辑 /etc/clickhouse-server/config.xml,在 <yandex> 节点内添加以下段落:

<yandex>
    <!-- ... 原有配置 ... -->

    <!-- ZooKeeper 节点 (已如上所示) -->
    <zookeeper>
        <node>
            <host>192.168.1.10</host>
            <port>2181</port>
        </node>
        <node>
            <host>192.168.1.11</host>
            <port>2181</port>
        </node>
        <node>
            <host>192.168.1.12</host>
            <port>2181</port>
        </node>
    </zookeeper>

    <!-- 为分布式部署添加 shards 与 replicas 定义 -->
    <remote_servers>
        <!-- 定义一个逻辑集群名 cluster1,包含 2 个 shard -->
        <cluster1>
            <shard>
                <replica>
                    <host>192.168.1.20</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>192.168.1.21</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>192.168.1.22</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>192.168.1.23</host>
                    <port>9000</port>
                </replica>
            </shard>
        </cluster1>
    </remote_servers>

    <!-- 定义默认数据库 macros,方便在 SQL 中使用 {cluster} -->
    <macros>
        <cluster>cluster1</cluster>
        <shard>shard1</shard> <!-- 可留空,主要使用 macros.cluster -->
    </macros>

    <!-- 持久化参数,以及其他可选配置 -->
    <!-- ... -->
</yandex>
  • <remote_servers>

    • 定义逻辑集群名称 cluster1,下有两个 <shard> 节点,每个 <shard> 下有若干 <replica>
    • 在后续创建 Distributed 表时,会引用 cluster1,ClickHouse 自动根据此配置将查询分发到各 shard 下的一个副本。
  • <macros>

    • 定义了 {cluster} 宏,后续 SQL 可直接使用 remote('cluster1', ...){cluster}

修改完成后,重启 ClickHouse 节点以使配置生效:

sudo systemctl restart clickhouse-server

3.3.2 修改 users.xml(可选)

若需为分布式表访问设置白名单,建议修改 /etc/clickhouse-server/users.xml,在相应用户下添加 <networks>

<!-- users.xml 片段 -->
<profiles>
    <default>
        <!-- 其他配置 -->
    </default>
</profiles>

<users>
    <default>
        <password></password>
        <networks>
            <ip>::/0</ip> <!-- 允许任意 IP 访问 -->
        </networks>
        <profile>default</profile>
        <quota>default</quota>
    </default>
</users>

若公司内部有统一授权管理,可为特定用户专门配置分布式访问权限。

3.4 启动 ClickHouse 服务与校验

  1. 重启所有 ClickHouse 节点

    sudo systemctl restart clickhouse-server
  2. 校验 ZooKeeper 连接

    clickhouse-client --query="SELECT * FROM system.zookeeper WHERE path LIKE '/clickhouse/%' LIMIT 5;"
    • 若能正常返回节点信息,则表明 ClickHouse 成功连接到 ZooKeeper。
  3. 校验 remote_servers 配置是否生效
    在任意一台节点上执行:

    clickhouse-client --query="SELECT host_name(), version();"
    # 查看本地信息

    然后执行跨集群的 Hello 查询:

    clickhouse-client --query="SELECT * FROM remote('cluster1', system.one) LIMIT 4;"
    • 该查询会在 cluster1 下的每个 Replica 上执行 SELECT * FROM system.one LIMIT 1,汇总 4 条记录。如果能正常返回 4 条,则表示 remote\_servers 生效。

4. 分布式表引擎与表创建

在完成分布式部署后,需要了解 ClickHouse 提供的几种常见表引擎,并结合分布式场景设计合适的表结构。

4.1 MergeTree 与 ReplicatedMergeTree 引擎

  • MergeTree 系列

    • 最常用的引擎,适用于单机场景或非严格高可用需求。
    • 支持分区(PARTITION BY)、排序键(ORDER BY)、TTL、物化视图等。
    • 示例创建:

      CREATE TABLE default.events_mt (
        dt Date,
        user_id UInt64,
        action String,
        value Float32
      )
      ENGINE = MergeTree()
      PARTITION BY toYYYYMM(dt)
      ORDER BY (user_id, dt);
  • ReplicatedMergeTree 系列

    • 在 MergeTree 基础上,增加了通过 ZooKeeper 实现副本复制与容灾能力。
    • 需要传入两个重要参数:

      1. ZooKeeper 路径:例如 /clickhouse/tables/{database}.{table}/shardN
      2. Replica 名称:在同一 Shard 下需唯一,如 replica1replica2
    • 示例创建(在 Shard1 下的两个 Replica 分别执行):

      CREATE TABLE default.events_shard1_replica1 (
        dt Date,
        user_id UInt64,
        action String,
        value Float32
      )
      ENGINE = ReplicatedMergeTree(
        '/clickhouse/tables/default.events/shard1',  -- ZooKeeper 路径
        'replica1'                                   -- Replica 名称
      )
      PARTITION BY toYYYYMM(dt)
      ORDER BY (user_id, dt);
      
      CREATE TABLE default.events_shard1_replica2 (
        dt Date,
        user_id UInt64,
        action String,
        value Float32
      )
      ENGINE = ReplicatedMergeTree(
        '/clickhouse/tables/default.events/shard1',  -- 与 replica1 相同的路径
        'replica2'
      )
      PARTITION BY toYYYYMM(dt)
      ORDER BY (user_id, dt);
  • Shard2 下分别创建两个 Replica

    CREATE TABLE default.events_shard2_replica1 (
      dt Date,
      user_id UInt64,
      action String,
      value Float32
    )
    ENGINE = ReplicatedMergeTree(
      '/clickhouse/tables/default.events/shard2',
      'replica1'
    )
    PARTITION BY toYYYYMM(dt)
    ORDER BY (user_id, dt);
    
    CREATE TABLE default.events_shard2_replica2 (
      dt Date,
      user_id UInt64,
      action String,
      value Float32
    )
    ENGINE = ReplicatedMergeTree(
      '/clickhouse/tables/default.events/shard2',
      'replica2'
    )
    PARTITION BY toYYYYMM(dt)
    ORDER BY (user_id, dt);

说明

  • ZooKeeper 路径 '/clickhouse/tables/default.events/shard1' 与 Shard 名称保持一致,有助于后续维护。
  • 每个 Shard 下的 Replica 都指定相同的 ZooKeeper 路径,Replica 在同一路径上协调数据复制。

4.2 Distributed 引擎原理与实现

  • Distributed 引擎

    • 提供跨 Shard 的查询路由能力,本质上是一个逻辑视图,将查询分发到建在各 Shard 下的本地表,再在客户端聚合结果。
    • 创建时需要指定:

      1. 集群名称:与 config.xmlremote_servers 配置保持一致,如 cluster1
      2. 数据库和表名:在各 Replica 上实际存在的本地表名(如 default.events_shard1_replica1..._replica2...shard2_replica1...shard2_replica2)。
      3. 分片键(可选):用于将写入分发到某个 Shard,而不是广播到所有 Shard。
    • 示例创建:

      CREATE TABLE default.events_distributed (
        dt Date,
        user_id UInt64,
        action String,
        value Float32
      )
      ENGINE = Distributed(
        'cluster1',    -- 与 config.xml 中 remote_servers 的 <cluster1>
        'default',     -- 数据库名
        'events_local',-- 各 Shard 对应的本地表前缀(需在各节点上创建同名本地表)
        rand()         -- 分片键,可改为 cityHash64(user_id)
      );
    • 由于各 Shard 下的本地表可能使用 ReplicatedMergeTree 并加入了 Replica 后缀,为简化管理,可在各 local 表下创建一个同名别名表 events_local,指向当前 Replica。示例:

      每台节点(click1\~click4)都创建一个同名的本地别名表:

      CREATE TABLE default.events_local AS default.events_shard1_replica1;  -- click1
      CREATE TABLE default.events_local AS default.events_shard1_replica2;  -- click2
      CREATE TABLE default.events_local AS default.events_shard2_replica1;  -- click3
      CREATE TABLE default.events_local AS default.events_shard2_replica2;  -- click4

      这样,在 Distributed 引擎中只需引用 events_local,ClickHouse 会自动查找每个节点上对应的本地表。

4.3 本地表与分布式表创建示例

下面结合 Shard1/Shard2、Replica1/Replica2 全流程示例。

4.3.1 Shard1 Replica1 上创建本地表

-- 点击 click1 (Shard1 Replica1)
CREATE DATABASE IF NOT EXISTS default;

CREATE TABLE default.events_shard1_replica1 (
  dt Date,
  user_id UInt64,
  action String,
  value Float32
)
ENGINE = ReplicatedMergeTree(
  '/clickhouse/tables/default.events/shard1',
  'replica1'
)
PARTITION BY toYYYYMM(dt)
ORDER BY (user_id, dt);

4.3.2 Shard1 Replica2 上创建本地表

-- 点击 click2 (Shard1 Replica2)
CREATE DATABASE IF NOT EXISTS default;

CREATE TABLE default.events_shard1_replica2 (
  dt Date,
  user_id UInt64,
  action String,
  value Float32
)
ENGINE = ReplicatedMergeTree(
  '/clickhouse/tables/default.events/shard1',
  'replica2'
)
PARTITION BY toYYYYMM(dt)
ORDER BY (user_id, dt);

4.3.3 Shard2 Replica1 上创建本地表

-- 点击 click3 (Shard2 Replica1)
CREATE DATABASE IF NOT EXISTS default;

CREATE TABLE default.events_shard2_replica1 (
  dt Date,
  user_id UInt64,
  action String,
  value Float32
)
ENGINE = ReplicatedMergeTree(
  '/clickhouse/tables/default.events/shard2',
  'replica1'
)
PARTITION BY toYYYYMM(dt)
ORDER BY (user_id, dt);

4.3.4 Shard2 Replica2 上创建本地表

-- 点击 click4 (Shard2 Replica2)
CREATE DATABASE IF NOT EXISTS default;

CREATE TABLE default.events_shard2_replica2 (
  dt Date,
  user_id UInt64,
  action String,
  value Float32
)
ENGINE = ReplicatedMergeTree(
  '/clickhouse/tables/default.events/shard2',
  'replica2'
)
PARTITION BY toYYYYMM(dt)
ORDER BY (user_id, dt);

提示:在创建完上述本地表后,可使用以下命令检查副本同步是否正常:

-- 在任意节点执行
SELECT
  database,
  table,
  is_leader,
  queue_size,
  future_parts,
  parts_to_merge,
  last_queue_update,
  last_queue_update_time
FROM system.replicas
WHERE database = 'default' AND table LIKE 'events%';
  • 查看 is_leaderqueue_size 是否为 0,表示副本同步正常;若有积压任务,可等待或手动修复。

4.3.5 在每个节点上创建本地别名表

为了让分布式引擎统一使用同名本地表,建议在每个节点上都创建一个 events_local 别名表,指向上一步创建的 Replica 表。示例如下:

  • click1(Shard1 Replica1)

    CREATE TABLE default.events_local AS default.events_shard1_replica1;
  • click2(Shard1 Replica2)

    CREATE TABLE default.events_local AS default.events_shard1_replica2;
  • click3(Shard2 Replica1)

    CREATE TABLE default.events_local AS default.events_shard2_replica1;
  • click4(Shard2 Replica2)

    CREATE TABLE default.events_local AS default.events_shard2_replica2;
说明:别名表不会在存储目录再新建数据;它只是一个对 ReplicatedMergeTree 本地表的引用(ATTACH TABLE 方式)。如果希望更严格隔离,也可以使用 ATTACH TABLE 语法,但 AS ... 方式足够常见。

4.3.6 创建分布式表

在任意一台节点(建议使用 click1)上执行:

CREATE TABLE default.events_distributed (
  dt Date,
  user_id UInt64,
  action String,
  value Float32
)
ENGINE = Distributed(
  'cluster1',         -- 与 config.xml 中定义的集群名称
  'default',          -- 数据库名
  'events_local',     -- 各节点上本地表别名
  cityHash64(user_id) -- 分片键
);

关键说明

  • cityHash64(user_id):ClickHouse 内置的一种哈希函数,可将 user_id 映射到 [0, 2^64) 区间后再 % shard_count,分散写入到不同的 Shard。
  • 如果不填分片键(如填 rand()''),则 Insert 操作会自动将每条记录广播到所有 Shard。

到此,分布式表与本地 Replica 表的创建已完成。

4.4 示例:查询分布式表的执行流程图解

┌─────────────────────────────────────────────────────────────────────────┐
│                         ClickHouse Client                              │
│   SELECT user_id, count() FROM default.events_distributed GROUP BY user_id  │
└─────────────────────────────────────────────────────────────────────────┘
                             │
                   查询路由到 cluster1
                             │
        ┌────────────────────┴────────────────────┐
        │                                         │
┌───────────────┐                       ┌───────────────┐
│    Shard1     │                       │    Shard2     │
│ (click1/2)    │                       │ (click3/4)    │
│ Distributed   │                       │ Distributed   │
│ Engine Worker │                       │ Engine Worker │
└───────┬───────┘                       └───────┬───────┘
        │      查询对应本地表 events_local             │      查询对应本地表 events_local
        ▼                                         ▼
┌───────────────┐                       ┌───────────────┐
│ Local Table   │                       │ Local Table   │
│ events_local  │                       │ events_local  │
│ (Shard1 Data) │                       │ (Shard2 Data) │
│ ReplicatedMT  │                       │ ReplicatedMT  │
└───────┬───────┘                       └───────┬───────┘
        │                                         │
        │ 执行 group by、count() 本地聚合            │ 执行本地聚合
        │                                         │
        ▼                                         ▼
┌──────────────────┐                     ┌──────────────────┐
│ Partial Results  │                     │ Partial Results  │
│ (user_id, count) │                     │ (user_id, count) │
└──────────┬───────┘                     └──────────┬───────┘
           │                                         │
           │         将部分结果汇总到客户端并进行最终合并         │
           └───────────────┬─────────────────────────────────────┘
                           ▼
                    客户端合并聚合结果
                           │
                           ▼
               返回最终 (user_id, total_count) 列表
  • Shard1/Shard2:分布式表引擎仅充当调度者,真正的计算在各节点本地的 events_local
  • 本地聚合:为了减少网络传输,ClickHouse 默认会先在本地执行 GroupBy、聚合等操作,只有聚合后较小的中间结果通过网络返回再做最终合并。这样能显著提高分布式查询性能。

5. 数据写入、查询与负载均衡

完成表结构创建后,接下来演示如何将数据写入分布式表与查询,以及写入时如何自动分片或广播。

5.1 写入到 ReplicatedMergeTree 且分片自动路由

  • 使用分布式表写入

    • 推荐通过分布式表 events_distributed 写入,ClickHouse 会根据 cityHash64(user_id) % shard_count 自动将数据路由到相应 Shard 的 Replica(随机选择一个可用 Replica 写入)。
    • 示例插入 3 条数据,user\_id 为 1、2、3:

      INSERT INTO default.events_distributed VALUES
      ('2023-09-01', 1, 'click', 10.5),
      ('2023-09-01', 2, 'view', 5.0),
      ('2023-09-01', 3, 'purchase', 100.0);
      • 若 Shard Count=2,那么:

        • 对于 user_id = 1cityHash64(1) % 2 = 1(假设),路由到 Shard2;
        • user_id = 2%2 = 0,写入 Shard1;
        • user_id = 3%2 = 1,写入 Shard2。
  • 写入副本选择

    • Shard 内部多个 Replica 会随机选择一个可写 Replica;若写入的 Replica 挂掉,其他 Replica 会接受写入请求。写入后,Replica 间基于 ZooKeeper 自动同步数据。

5.2 分布式表查询流程详解

  • 查询 events_distributed

    • 当执行 SELECT * FROM events_distributed WHERE user_id = 2; 时,ClickHouse 会根据分片键 cityHash64(2) % 2 计算出目标 Shard(Shard1),并将查询请求发给 Shard1 的一个 Replica。
    • 然后在该 Replica 上查询 events_local(即 Shard1 本地的 ReplicatedMergeTree 表),返回结果。
    • 如果 Query 涉及跨 Shard(如 GROUP BY 或不带 WHERESELECT *),则请求会广播到所有 Shard,每个 Shard 返回部分结果,最后由客户端合并。
  • 分布式聚合与性能

    • 对于大表聚合查询,分布式表引擎会首先在每个 Shard 本地进行“部分聚合(partial aggregation)”,然后再把各 Shard 的部分结果收集到一个节点进行“最终聚合(final aggregation)”,大幅减少网络传输量。

5.3 Insert、Select 示例

  • 批量插入示例

    INSERT INTO default.events_distributed
    SELECT 
      toDate('2023-09-02') AS dt, 
      number AS user_id, 
      'auto' AS action, 
      number * 1.1 AS value
    FROM numbers(100000)  -- 生成 100,000 条测试数据
    WHERE number < 10000; -- 只写入前 10,000 条
  • 查询示例

    -- 查看 Shard1 上的数据量(仅在 Shard1 的 click1 或 click2 节点上执行)
    SELECT count(*) FROM default.events_shard1_replica1;
    SELECT count(*) FROM default.events_shard1_replica2;
    
    -- 查询分布式表中的总数据量
    SELECT count(*) FROM default.events_distributed;
    
    -- 分布式聚合示例
    SELECT user_id, count() AS cnt
    FROM default.events_distributed
    GROUP BY user_id
    ORDER BY cnt DESC
    LIMIT 10;
  • 验证数据一致性
    在 Shard1 Replica1 与 Replica2 上分别查询本地表,确认两者数据同步:

    SELECT count(*) FROM default.events_shard1_replica1;
    SELECT count(*) FROM default.events_shard1_replica2;

6. 数据迁移与同步策略

在实际生产中,经常需要将已有数据迁移到新的分布式 ClickHouse 集群,或与外部数据库(如 MySQL)集成,实现实时或离线数据同步。下面介绍几种常见迁移与同步方案。

6.1 单机 ClickHouse 到分布式集群迁移

假设已有一个单节点 ClickHouse(192.168.1.30),其中有表 default.events_single,需要将其数据迁移到上述分布式集群并保持不间断服务。

6.1.1 在新集群创建同结构的分布式表

  1. 在新集群创建 ReplicatedMergeTree 本地表与 Distributed 表(与前节示例一致)。
  2. 确保 events_distributed 已就绪。

6.1.2 使用 INSERT SELECT 迁移数据

在原单节点上执行以下操作,将数据复制到分布式表(通过 clickhouse-client 连接到分布式集群任一节点即可):

clickhouse-client --host=192.168.1.20 --query="
INSERT INTO default.events_distributed
SELECT * FROM remote('single_host', default, 'events_single')
"
  • 需先在 config.xmlremote_servers 中配置 single_host,以便分布式查询原节点数据。示例配置(在每个新集群节点的 /etc/clickhouse-server/config.xml 添加):

    <remote_servers>
        <single_host_cluster>
            <shard>
                <replica>
                    <host>192.168.1.30</host>
                    <port>9000</port>
                </replica>
            </shard>
        </single_host_cluster>
    </remote_servers>
  • 然后在新集群中执行:

    INSERT INTO default.events_distributed
    SELECT * FROM remote('single_host_cluster', default, 'events_single');
  • 上述操作会将单节点数据分批读取,并插入到分布式表,分布式表会自动分片到各 Shard。在数据量大的情况下,建议拆分范围分批执行,例如按照 dt 范围分区多次执行。

6.1.3 增量同步

在完成初次全量迁移后,可使用 ZooKeeper + Kafka 或持续抓取增量数据进入分布式表,以实现接近实时的迁移。

  • 方案一:Materialized View + Kafka

    • 在原单节点 ClickHouse 上创建一个 Kafka 引擎表,订阅写入事件;
    • 创建一个 Materialized View,将 Kafka 中的数据插入到新集群的分布式表。
  • 方案二:Debezium + Kafka Connect

    • 使用 Debezium 将 MySQL/ClickHouse 的 Binlog 推到 Kafka;
    • ClickHouse 侧使用 Kafka 引擎与 Materialized View 实时消费,插入分布式表。

6.2 MySQL 到 ClickHouse 的迁移示例(使用 Kafka 或 clickhouse-mysql

很多场景需要将 MySQL 中的业务表迁移到 ClickHouse 进行高性能 OLAP 查询。常用方案如下:

6.2.1 使用 Kafka + ClickHouse Kafka 引擎

  1. 在 MySQL 中开启 Binlog,并使用 Kafka Connect + Debezium 将数据写入 Kafka 主题(如 mysql.events)。
  2. 在 ClickHouse 集群上创建 Kafka 引擎表

    CREATE TABLE default.events_kafka (
      `dt` Date,
      `user_id` UInt64,
      `action` String,
      `value` Float32
    ) ENGINE = Kafka SETTINGS
      kafka_broker_list = 'kafka1:9092,kafka2:9092',
      kafka_topic_list = 'mysql.events',
      kafka_group_name = 'ch_consumer_group',
      kafka_format = 'JSONEachRow',
      kafka_num_consumers = 4;
  3. 创建 Materialized View

    • Materialized View 将消费 events_kafka,并将数据插入分布式表:

      CREATE MATERIALIZED VIEW default.events_mv TO default.events_distributed AS
      SELECT
        dt,
        user_id,
        action,
        value
      FROM default.events_kafka;
    • 这样,Kafka 中的新数据会自动被 MV 推送到分布式表,实现实时同步。

6.2.2 使用 clickhouse-mysql 工具

clickhouse-mysql 是社区提供的一个 Python 脚本,可直接将 MySQL 表结构与数据迁移到 ClickHouse。

  1. 安装依赖

    pip install clickhouse-mysql
  2. 执行迁移命令

    clickhouse-mysql --mysql-host mysql_host --mysql-port 3306 --mysql-user root --mysql-password secret \
      --clickhouse-host 192.168.1.20 --clickhouse-port 9000 --clickhouse-user default --clickhouse-password '' \
      --database mydb --table events --clickhouse-database default --clickhouse-table events_distributed
    • 默认会将 MySQL 表自动映射为 ClickHouse 表,如创建合适的 MergeTree 引擎表,再批量插入数据。
    • 对于分布式环境,可先在新集群创建分布式表,再指定 --clickhouse-table 为分布式表,脚本会自动往分布式表写入数据。

6.3 clickhouse-copier 工具使用

clickhouse-copier 是 ClickHouse 社区自带的工具,可在集群内部做分片间或集群间的数据搬迁。

  1. 准备复制任务的配置文件copier_config.xml

    <copy>
      <shard>
        <cluster>cluster1</cluster>
        <replica>click1</replica>
      </shard>
      <shard>
        <cluster>cluster1</cluster>
        <replica>click3</replica>
      </shard>
    
      <tables>
        <table>
          <database>default</database>
          <name>events_local</name>
        </table>
      </tables>
    </copy>
    • 上述示例将指定将 events_local 从 Shard1 的 click1 复制到 Shard2 的 click3,需根据实际场景配置更多 <shard><table>
  2. 执行复制

    clickhouse-copier --config /path/to/copier_config.xml --replication 0
    • --replication 0 表示不做 ReplicatedMergeTree 的基于日志复制,仅做一次全量迁移。
    • 适用于集群扩容、分片重平衡等操作。

6.4 INSERT SELECT 与外部表引擎同步

  • INSERT SELECT

    • 适用于跨集群、跨数据库全量复制:

      INSERT INTO default.events_distributed
      SELECT * FROM default.events_local WHERE dt >= '2023-09-01';
    • 可分批(按日期、ID 范围)多次执行。
  • 外部表引擎

    • ClickHouse 支持通过 MySQL 引擎访问 MySQL 表,如:

      CREATE TABLE mysql_events (
        dt Date,
        user_id UInt64,
        action String,
        value Float32
      )
      ENGINE = MySQL('mysql_host:3306', 'mydb', 'events', 'root', 'secret');
    • 然后可在 ClickHouse 侧做:

      INSERT INTO default.events_distributed
      SELECT * FROM mysql_events;
    • 外部表引擎适合数据量相对较小或批量一次性导入,若是实时增量同步,仍推荐 Kafka + Materialized View。

6.5 实时同步示例:使用 Kafka 引擎 + Materialized View

在 MySQL 侧将 Binlog 推到 Kafka 后,ClickHouse 侧通过 Kafka 引擎表 + MV,实现近实时同步。

  1. MySQL → Kafka

    • 部署 Kafka 集群。
    • 使用 Debezium Connector for MySQL,将 MySQL Binlog 写入 Kafka 主题 mysql.events_binlog
  2. ClickHouse 侧创建 Kafka 表

    CREATE TABLE default.events_binlog_kafka (
      dt Date,
      user_id UInt64,
      action String,
      value Float32
    ) ENGINE = Kafka SETTINGS
      kafka_broker_list = 'k1:9092,k2:9092',
      kafka_topic_list = 'mysql.events_binlog',
      kafka_group_name = 'ch_binlog_consumer',
      kafka_format = 'JSONEachRow',
      kafka_num_consumers = 4;
  3. 创建 Materialized View

    CREATE MATERIALIZED VIEW default.events_binlog_mv TO default.events_distributed AS
    SELECT dt, user_id, action, value
    FROM default.events_binlog_kafka;
    • 当 Kafka 有新消息(INSERT/UPDATE/DELETE)时,MV 自动触发,将数据写入分布式表。
    • 对于 UPDATE/DELETE,可根据具体业务需求将这些操作转化为 ClickHouse 的 MergeTree 修改或 VXIN 等逻辑。

7. 运维与监控要点

在生产环境下,ClickHouse 分布式集群的健壮性和性能调优尤为关键。以下介绍一些常见的运维与监控要点。

7.1 ZooKeeper 集群监控

  • 节点状态检查

    echo ruok | nc 192.168.1.10 2181  # 返回 imok 则正常
    echo stat | nc 192.168.1.10 2181  # 查看节点状态、客户端连接数
  • 集群状态检查

    echo srvr | nc 192.168.1.10 2181
    • 可查看是否有选举 leader、是否存在掉线节点等。
  • 监控指标

7.2 ClickHouse 节点健康检查

  • 系统表

    • system.replication_queue:查看各 Replica 的复制队列积压情况。

      SELECT database, table, is_currently_executing, parts_to_merge, queue_size 
      FROM system.replication_queue;
    • system.mutations:查看表的 mutations(更新/删除)状态。

      SELECT database, table, mutation_id, is_done, parts_to_do, parts_done 
      FROM system.mutations;
    • system.parts:查看数据分区与磁盘占用情况。

      SELECT database, table, partition, name, active, bytes_on_disk 
      FROM system.parts WHERE database='default' AND table LIKE 'events%';
    • system.metrics / system.events:监控 ClickHouse 实时指标,如 Query、Insert 吞吐量,Cache 命中率等。
  • 持续监控

7.3 分片与副本恢复流程

7.3.1 Replica 加入流程

  1. 新增 Replica

    • 在一个 Shard 下新增 Replica,先在 ZooKeeper 对应路径下创建新 Replica 的目录。
    • 在新节点上创建本地表(表结构需与原 Shard 保持一致),并指定新的 Replica 名称。
    • 启动 ClickHouse,该 Replica 会从 ZooKeeper 上的复制队列拉取现有数据,完成全量数据复制。
  2. Shard 扩容(横向扩容)

    • 如果要增加 Shard 数量(比如从 2 个 Shard 扩容到 3 个),则需:

      • 暂停写入,或者使用 clickhouse-copier 做分片重平衡。
      • 在新节点上创建对应的本地 ReplicatedMergeTree 表,指定新的 Shard 路径。
      • 使用 clickhouse-copier 或脚本将已有数据重分布到新的 Shard。

7.3.2 副本修复流程

当某个 Replica 节点发生故障并恢复后,需要让它重新同步数据:

  1. 重启节点,它会检测到 ZooKeeper 上已有的副本信息。
  2. Replica 恢复复制,从 Leader 主动拉取尚未复制的分区文件并恢复。
  3. 检查状态

    SELECT database, table, replica_name, is_leader, queue_size 
    FROM system.replicas WHERE database='default' AND table LIKE 'events%';
    • queue_size=0is_currently_executing=0 表示恢复完成。

7.4 备份与恢复策略

  • 备份工具

    • Altinity ClickHouse Backup:社区推荐备份工具。支持全量/增量备份与恢复。
    • 也可手动使用 clickhouse-client --query="SELECT * FROM table FORMAT Native" 导出,然后再用 clickhouse-client --query="INSERT INTO table FORMAT Native" 导入。
  • ZooKeeper 数据备份

    • 可使用 zkCli.sh 导出关键路径的节点数据,以及定期备份 /var/lib/zookeeper/version-2
  • 恢复流程

    1. 恢复 ZooKeeper 数据,保证 ReplicatedMergeTree 的队列信息完整。
    2. 重启 ClickHouse,Replica 会从 ZooKeeper 获取需要恢复的分区;
    3. 如果只想恢复部分数据,可手动删除对应的本地分区文件,再让 Replica 重新执行复制。

8. 常见问题与优化建议

在 ClickHouse 分布式生产环境中,经常会遇到性能瓶颈、数据倾斜、Shard 节点不均衡等问题。下面总结一些常见问题与优化技巧。

8.1 查询慢或分布式 JOIN 性能优化

  • 避免跨 Shard JOIN

    • ClickHouse 的分布式 JOIN 在多 Shard 场景下需要将数据从一个 Shard 拉取到另一个 Shard 进行 Join,网络 I/O 成本高。建议:

      • 数据预聚合(Denormalization):将需要关联的数据预先合并到同一个表中;
      • 使用物化视图:在本地 MergeTree 表上预先计算好关键信息;
      • 单 Shard 物理表:如果某个表非常小,可把它复制到每个 Shard 上本地 Join。
  • Distributed 聚合优化

    • 对于大规模聚合查询,建议先在本地执行聚合(aggregate_overflow_mode='throw'),再在客户端进行最终合并。
    • 使用 settings max_threads = X, max_memory_usage = Y 控制查询资源消耗。

8.2 数据倾斜与分片键设计

  • 数据倾斜

    • 如果分片键导出的数据在某个 Shard 过多而其他 Shard 较少,导致 Shard1 负载过重,Shards2/3 空闲。
    • 解决方案:

      • 重新设计分片键,例如使用复合键或哈希函数与随机数结合;
      • 动态调整分片策略,使用一致性哈希等更均衡的方案;
      • 扩容 Shard 节点,将热点数据分摊到更多 Shard。

8.3 磁盘、内存、网络调优

  • 磁盘性能

    • 推荐使用 SSD 或 NVMe,至少提供 10,000+ IOPS;
    • ClickHouse 在 Merge 任务、高并发写入时对磁盘 I/O 敏感。可使用 RAID0 多盘并行提升吞吐。
  • 内存配置

    • 设置合理的 max_memory_usage
    • 调整 [max_threads] 来控制并行度,避免 OOM;
    • 若有大量 Map/Join 操作,可考虑开启 [join_use_nulls_for_low_cardinality_keys] 以减少内存占用。
  • 网络带宽与延迟

    • 分布式查询与复制都依赖网络:

      • 使用至少 10Gb/s 以降低跨 Shard 数据传输延迟;
      • 配置 max_distributed_connectionsreceive_timeoutsend_timeout 等参数优化通信。

9. 总结

本文从 ClickHouse 分布式架构原理入手,详细讲解了如何在生产环境下:

  1. 部署 ZooKeeper 高可用集群,并配置 ClickHouse 节点连接;
  2. 设计分布式集群拓扑,实现 Shard 与 Replica 的高可用与负载均衡;
  3. 在各节点创建 ReplicatedMergeTree 本地表,通过 ZooKeeper 管理副本复制;
  4. 使用 Distributed 引擎创建逻辑表,自动实现跨 Shard 路由与分布式聚合;
  5. 演示数据写入与查询流程,并提供批量 Insert、Distributed 聚合等常见操作示例;
  6. 提供多种数据迁移方案,包括单机→分布式迁移、MySQL→ClickHouse 同步、Kafka 实时同步等全流程;
  7. 总结运维与监控要点,探讨 Replica 恢复、Shard 扩容、性能调优等实战经验;
  8. 针对常见问题给出优化建议,如数据倾斜、跨 Shard JOIN 降低网络开销、硬件选型等。

通过本文内容,你可以:

  • 搭建一个稳定的 ClickHouse 分布式集群,实现数据的高可用与水平扩展;
  • 利用 ReplicatedMergeTree 与 Distributed 引擎,灵活构建分布式表结构;
  • 结合 Kafka、Materialized View、clickhouse-copier 等工具,实现多源异构数据迁移与实时同步;
  • 在运维过程中通过系统表与监控手段快速排查问题,保证集群高效运行;
  • 通过合理的分片键与硬件优化,避免数据倾斜与性能瓶颈。

评论已关闭

推荐阅读

AIGC实战——Transformer模型
2024年12月01日
Socket TCP 和 UDP 编程基础(Python)
2024年11月30日
python , tcp , udp
如何使用 ChatGPT 进行学术润色?你需要这些指令
2024年12月01日
AI
最新 Python 调用 OpenAi 详细教程实现问答、图像合成、图像理解、语音合成、语音识别(详细教程)
2024年11月24日
ChatGPT 和 DALL·E 2 配合生成故事绘本
2024年12月01日
omegaconf,一个超强的 Python 库!
2024年11月24日
【视觉AIGC识别】误差特征、人脸伪造检测、其他类型假图检测
2024年12月01日
[超级详细]如何在深度学习训练模型过程中使用 GPU 加速
2024年11月29日
Python 物理引擎pymunk最完整教程
2024年11月27日
MediaPipe 人体姿态与手指关键点检测教程
2024年11月27日
深入了解 Taipy:Python 打造 Web 应用的全面教程
2024年11月26日
基于Transformer的时间序列预测模型
2024年11月25日
Python在金融大数据分析中的AI应用(股价分析、量化交易)实战
2024年11月25日
AIGC Gradio系列学习教程之Components
2024年12月01日
Python3 `asyncio` — 异步 I/O,事件循环和并发工具
2024年11月30日
llama-factory SFT系列教程:大模型在自定义数据集 LoRA 训练与部署
2024年12月01日
Python 多线程和多进程用法
2024年11月24日
Python socket详解,全网最全教程
2024年11月27日
python之plot()和subplot()画图
2024年11月26日
理解 DALL·E 2、Stable Diffusion 和 Midjourney 工作原理
2024年12月01日