OceanBase 实时分析Demo 解析:Flink + OceanBase
-- 创建OceanBase数据库表
CREATE TABLE IF NOT EXISTS ob_table (
id INT,
name VARCHAR(255),
amount BIGINT
) ENGINE=oceanbase_catalog;
-- 创建Flink SQL表
CREATE TABLE IF NOT EXISTS flink_table (
id INT,
name VARCHAR(255),
amount BIGINT,
event_timestamp TIMESTAMP(3)
) WITH (
'connector' = 'datagen', -- 使用数据生成器作为数据源
'fields.id.kind' = 'random', -- id字段使用随机数据生成
'fields.id.min' = '1', -- id字段最小值
'fields.id.max' = '100', -- id字段最大值
'fields.name.length' = '10', -- name字段长度
'rows-per-second' = '10' -- 每秒生成的行数
);
-- 将Flink SQL表的数据写入到OceanBase表
INSERT INTO ob_table
SELECT
id,
name,
amount
FROM
flink_table;
这个简单的Flink SQL脚本演示了如何使用Flink连接器将数据生成器生成的数据实时写入到OceanBase数据库表中。这个例子展示了如何在Flink中集成OceanBase作为数据目的地,并演示了如何通过Flink SQL API进行数据的实时写入操作。
评论已关闭