OceanBase 实时分析Demo 解析:Flink + OceanBase
    		       		warning:
    		            这篇文章距离上次修改已过422天,其中的内容可能已经有所变动。
    		        
        		                
                
-- 创建OceanBase数据库表
CREATE TABLE IF NOT EXISTS ob_table (
    id INT,
    name VARCHAR(255),
    amount BIGINT
) ENGINE=oceanbase_catalog;
 
-- 创建Flink SQL表
CREATE TABLE IF NOT EXISTS flink_table (
    id INT,
    name VARCHAR(255),
    amount BIGINT,
    event_timestamp TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',                -- 使用数据生成器作为数据源
    'fields.id.kind' = 'random',            -- id字段使用随机数据生成
    'fields.id.min' = '1',                  -- id字段最小值
    'fields.id.max' = '100',                -- id字段最大值
    'fields.name.length' = '10',             -- name字段长度
    'rows-per-second' = '10'                -- 每秒生成的行数
);
 
-- 将Flink SQL表的数据写入到OceanBase表
INSERT INTO ob_table
SELECT
    id,
    name,
    amount
FROM
    flink_table;这个简单的Flink SQL脚本演示了如何使用Flink连接器将数据生成器生成的数据实时写入到OceanBase数据库表中。这个例子展示了如何在Flink中集成OceanBase作为数据目的地,并演示了如何通过Flink SQL API进行数据的实时写入操作。
评论已关闭