Flink CDC 1.18.1 Oracle 数据同步到postgresql
Flink CDC (Changelog Data Capture) 是一种用于捕获数据库变更日志的技术,可以用于实时数据同步。以下是使用Flink CDC 1.18.1将Oracle数据库同步到PostgreSQL的基本步骤和示例代码:
- 确保Oracle和PostgreSQL数据库中已经有需要同步的表,并且Oracle数据库开启了归档模式和支持LogMiner或者是通过XStream。
- 在PostgreSQL中创建与Oracle同样结构的表。
- 添加Flink CDC相关的依赖到项目中,例如使用Maven可以添加如下依赖:
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>1.18.1</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-postgres-cdc</artifactId>
<version>1.18.1</version>
</dependency>
- 编写Flink程序以实现数据同步:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.postgres.PostgreSQLSink;
public class OracleToPostgreSQL {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 创建Oracle Source
String sourceDDL = "" +
"CREATE TABLE source_table (" +
" id INT," +
" name STRING," +
" pwd STRING," +
" PRIMARY KEY (id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'oracle-cdc'," +
" 'hostname' = 'your_oracle_host'," +
" 'port' = '1521'," +
" 'username' = 'your_username'," +
" 'password' = 'your_password'," +
" 'db-name' = 'your_db_name'," +
" 'table-name' = 'your_table_name'" +
")";
tableEnv.executeSql(sourceDDL);
// 创建PostgreSQL Sink
String sinkDDL = "" +
"CREATE TABLE sink_table (" +
" id INT," +
" name STRING," +
" pwd STRING," +
" PRIMARY KEY (id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'postgres-cdc'," +
" 'hostname' = 'your_postgres_host'," +
" 'port' = '5432'," +
" 'username' = 'your_username'," +
" 'p
评论已关闭