MySQL 到 Kafka 实时数据同步实操分享
-- 创建 MySQL 到 Kafka 的实时数据同步
-- 1. 创建 MySQL 表
CREATE TABLE `orders` (
`id` INT NOT NULL AUTO_INCREMENT,
`order_no` VARCHAR(20),
`amount` DECIMAL(10, 2),
PRIMARY KEY (`id`)
);
-- 2. 插入示例数据
INSERT INTO `orders` (`order_no`, `amount`) VALUES ('order001', 1000.00);
-- 3. 创建 Kafka 主题
kafka-topics.sh --create --topic orders_topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
-- 4. 使用 Debezium 连接器监控数据变更
curl -i -X POST -H "Content-Type: application/json" -d '{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "your_username",
"database.password": "your_password",
"database.server.id": "184054",
"database.server.name": "myapp",
"database.include.list": "orders",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.myapp",
"include.schema.changes": "true"
}
}' http://localhost:8083/connectors
-- 5. 连接器启动后,Debezium 监控 MySQL 数据变更,并将变更消息发布到 Kafka 的 orders_topic 主题
这个实操例子展示了如何创建一个 MySQL 表、插入数据、创建 Kafka 主题以及使用 Debezium 连接器来监控数据变更并将变更消息发布到 Kafka 的主题中。这是实现 MySQL 到 Kafka 实时数据同步的一个基本流程。
评论已关闭