import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.descriptors.*;
public class FlinkETLExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 配置MySQL源连接
tableEnv.executeSql("CREATE TABLE source_mysql_table (" +
"id INT," +
"name STRING," +
"full_name STRING," +
"price DECIMAL(32, 2)," +
"ts TIMESTAMP(3)," +
"`proc` STRING," +
"source_table STRING," +
"event_type STRING," +
"before STRING," +
"after STRING," +
"primary key (id) not enforced" +
") WITH (" +
"'connector' = 'mysql-cdc'," +
"'hostname' = 'your-mysql-host'," +
"'port' = '3306'," +
"'username' = 'your-username'," +
"'password' = 'your-password'," +
"'database' = 'your-database'," +
"'table-name' = 'your-table-name'" +
")");
// 配置Oracle CDC连接
tableEnv.executeSql("CREATE TABLE source_oracle_table (" +
"id INT," +
"name STRING," +
"full_name STRING," +
"price DECIMAL(32, 2)," +
"ts TIMESTAMP(3)," +
"`proc` STRING," +
"source_table STRING," +
"event_type STRING," +
"before STRING," +
"after STRING," +
"primary key (id) not enforced" +
") WITH (" +
"'connector' = 'oracle-cdc'," +
"'hostname' = 'your-oracle-host'," +
"'port' = '1521'," +
"'username' = 'your-username'," +
"'password' = 'your-password'," +
"'database-name' = 'your-database-name'," +
"'schema-name' = 'your-schema-name'," +
"'ta
评论已关闭