【Flink CDC】实现mysql整表与增量读取
-- 创建mysql源表
CREATE TABLE sourceTable (
id INT,
name STRING,
count INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'your_mysql_host_ip',
'port' = '3306',
'username' = 'your_username',
'password' = 'your_password',
'database-name' = 'your_database_name',
'table-name' = 'your_table_name'
);
-- 创建sink表,例如在Flink中使用内存表进行测试
CREATE TABLE sinkTable (
id INT,
name STRING,
count INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'print'
);
-- 整表数据读取
INSERT INTO sinkTable
SELECT * FROM sourceTable;
-- 增量数据读取
INSERT INTO sinkTable
SELECT * FROM sourceTable;
在这个例子中,我们首先定义了一个名为sourceTable
的源表,它连接到了MySQL数据库,并使用了CDC连接器来监听表的变化。然后我们创建了一个名为sinkTable
的接收器表,用于接收数据,并且在这里我们使用了Flink的内置连接器print
来简单地打印结果。接下来,我们展示了如何执行整表读取和增量读取操作。这个例子展示了如何使用Flink CDC连接器来同步MySQL中的数据变更到Flink系统中。
评论已关闭