基于FlinkCDC-3.1.1&Flink-1.18实现MySQL DDL审计告警
-- 创建MySQL DDL审计告警的Flink SQL作业
CREATE TABLE mysql_source_ddl_events (
id INT,
type STRING,
database_name STRING,
table_name STRING,
ddl STRING
) WITH (
'connector' = 'changelog-jdbc',
'username' = 'root',
'password' = 'yourpassword',
'scan.startup.mode' = 'earliest-offset',
'changelog-mode' = 'all',
'monitor-username' = 'root',
'monitor-password' = 'yourpassword',
'hostname' = 'your-mysql-host',
'port' = '3306',
'catalog-name' = 'mysql-cdc-test'
);
CREATE TABLE ddl_alarm_sink (
type STRING,
database_name STRING,
table_name STRING,
ddl STRING
) WITH (
'connector' = 'logger'
);
INSERT INTO ddl_alarm_sink
SELECT type, database_name, table_name, ddl
FROM mysql_source_ddl_events
WHERE type IN ('CREATE', 'ALTER', 'DROP');
这个简单的Flink SQL作业会从MySQL的binlog中读取DDL事件,并将类型为CREATE、ALTER和DROP的事件输出到ddl_alarm_sink
表,该表配置为logger连接器,实际使用中可以替换为实际的告警实现,如发送邮件、写入日志系统等。
评论已关闭