import io.debezium.config.Configuration;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
@Configuration
public class DebeziumConfig {
// Spring Environment, field-injected; used below to read datasource/Kafka
// connection settings (spring.datasource.*, kafka.bootstrap.servers).
// NOTE(review): constructor injection is generally preferred over @Autowired
// field injection — consider a constructor parameter once the class compiles.
@Autowired
private Environment env;
@Bean
public io.debezium.engine.Engine<ChangeEventFormat> debeziumEngine() {
Configuration config = Configuration.create()
.with("name", "pg-connector")
.with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
.with("tasks.max", "1")
.with("database.hostname", env.getProperty("spring.datasource.url").split(":")[1].split("/")[2])
.with("database.port", Integer.parseInt(env.getProperty("spring.datasource.url").split(":")[2].split("/")[0]))
.with("database.user", env.getProperty("spring.datasource.username"))
.with("database.password", env.getProperty("spring.datasource.password"))
.with("database.dbname", env.getProperty("spring.datasource.url").split("/")[1])
.with("database.server.name", "dbserver1")
.with("database.include.list", "test_db")
.with("database.history.kafka.bootstrap.servers", env.getProperty("kafka.bootstrap.servers"))
.with("database.history.kafka.topic", "schema-changes.test_db")
.with("include.schema.changes", "true")
.with("change.capture.policy", "schema_only")
.with("key.converter", "org.apache.kafka.connect.json.JsonConverter")
.with("value.converter", "org.apache.kafka.connect.json.JsonConverter")
.with("key.converter.schemas.enable", "false")
// NOTE(review): source truncated here — the Configuration builder chain is never
// terminated (missing .build(), the engine construction/return statement, and the
// closing braces of debeziumEngine() and DebeziumConfig). The line previously here
// ("评论已关闭" / "comments are closed") was web-scrape residue, not code.
// Also: imports on L1 and L7 both bring in a simple name `Configuration`
// (io.debezium.config.Configuration vs. org.springframework.context.annotation.Configuration);
// two single-type imports with the same simple name do not compile — one side must
// use a fully qualified name. Additionally, parsing host/port/dbname out of
// spring.datasource.url via split() (L19-L23) is fragile and will NPE if the
// property is absent — prefer dedicated properties or a proper JDBC URL parser.