Debezium系列之:使用Debezium接入PostgreSQL数据库数据到Kafka集群的详细技术文档
-- 创建PostgreSQL数据库中的source表
CREATE TABLE "source" (
"id" INT PRIMARY KEY,
"name" VARCHAR(255) NOT NULL,
"bs" BYTEA
);
-- 插入示例数据
INSERT INTO "source" ("id", "name", "bs") VALUES (1, 'Name 1', 'bytea E\'\\xDEADBEEF\'');
INSERT INTO "source" ("id", "name", "bs") VALUES (2, 'Name 2', 'bytea E\'\\xBEEFDEAD\'');
{
"name": "pg-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "password",
"database.dbname": "postgres",
"database.server.name": "pg-source-connector",
"table.include.list": "source",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
"name": "pg-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "password",
"database.dbname": "postgres",
"database.server.name": "pg-source-connector",
"table.include.list": "source",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}'
-- 查询Kafka中的数据变更事件
kafka-console-consumer --bootstrap-server localhost:9092 --topic dbserver1.postgres.public.source --from-beginning
这个实例展示了如何创建一个用于数据变更监听的PostgreSQL表,如何使用Debezium连接器监听这个表的变更,并将变更事件发送到Kafka。同时,提供了使用curl命令行工具发送HTTP请求的方法来配置连接器。最后,演示了如何从Kafka中读取和查看这些变更事件。
评论已关闭