Spring Boot+Debezium:解决 MySQL Binlog监听
import io.debezium.config.Configuration;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.engine.RecordChangeEventSource;
// 配置 Debezium 来监听 MySQL 的 binlog
Configuration config = Configuration.create()
.set("name", "mysql-connector")
.set("connector.class", "io.debezium.connector.mysql.MySqlConnector")
.set("tasks.max", "1")
.set("database.hostname", "your-mysql-host")
.set("database.port", "3306")
.set("database.user", "your-username")
.set("database.password", "your-password")
.set("database.server.id", "184054")
.set("database.server.name", "my-app-connector")
.set("database.include.list", "mydb")
.set("database.history.kafka.bootstrap.servers", "kafka:9092")
.set("database.history.kafka.topic", "mydb.history")
.set("change.capture.policy", "incrementing")
.build();
// 创建 Debezium 的 RecordChangeEventSource
RecordChangeEventSource source = new RecordChangeEventSource(
config,
() -> ChangeEventFormat.of(ChangeEventFormat.ChangeEventFormatType.DEBEZIUM_V1)
);
// 启动监听
source.start(record -> {
// 处理接收到的变更事件
System.out.println(record);
});
// 在适当的时候停止监听
source.stop();
这段代码展示了如何使用 Debezium 来监听 MySQL 的 binlog 并处理变更事件。配置参数需要根据实际情况进行调整,例如数据库的主机名、端口、用户、密码以及需要监听的数据库名等。处理函数中可以根据实际需求来更新应用程序状态或者发送消息。
评论已关闭