MySQL到Doris的StreamingETL实现(Flink CDC 3.0)
-- 创建Kafka源表,用于接收来自Flink CDC的变更数据
CREATE EXTERNAL RESOURCE "kafka_source"
PROPERTIES
(
"connector" = "kafka",
"topic" = "your_kafka_topic",
"properties.bootstrap.servers" = "your_kafka_broker_list",
"properties.group.id" = "your_kafka_group_id",
"format" = "json"
);
CREATE TABLE cdc_source_table
(
id INT,
name VARCHAR(255),
update_time DATETIME
)
WITH (
"resource" = "kafka_source",
"format" = "json"
);
-- 创建Doris目标表,用于接收ETL后的数据
CREATE TABLE if not exists your_doris_db.your_doris_table
(
id INT,
name VARCHAR(255),
update_time DATETIME
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10;
-- 创建Flink作业,执行实时数据ETL
INSERT INTO your_doris_db.your_doris_table
SELECT * FROM cdc_source_table;
这个例子展示了如何在Flink中使用CDC 3.0从MySQL同步变更数据到Doris数据库。首先,我们创建了一个Kafka源表来接收变更日志,然后定义了一个Doris目标表,最后通过INSERT语句实现了ETL操作。这个流程是实时数据集成的一个典型示例,对于需要实时同步数据变更的场景,如数据仓库构建、OLAP分析等,具有很好的指导意义。
评论已关闭