Syncing Oracle CDC Logs from Kafka to Doris with Flink SQL
Preface
- Oracle's change logs (its equivalent of MySQL binlogs) are already being synced to Kafka by the DBA through OGG (Oracle GoldenGate), so Flink CDC is not needed here.
- The JSON shape of the records synced to Kafka:
```json
{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "op_type": "U",
  "op_ts": "2020-05-13 15:40:06.000000",
  "current_ts": "2020-05-13 15:40:07.000000",
  "primary_keys": [
    "id"
  ],
  "pos": "00000000000000000000143",
  "table": "PRODUCTS"
}
```
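Flink's ogg-json format maps each record to changelog rows by its op_type: "I" becomes an INSERT row built from after, "U" becomes an UPDATE_BEFORE/UPDATE_AFTER pair built from before and after, and "D" becomes a DELETE row built from before. For reference, an insert record would look roughly like the sketch below; the values are illustrative, and depending on how the OGG replicat is configured the before image may be absent rather than null:

```json
{
  "before": null,
  "after": {
    "id": 112,
    "name": "spare tire",
    "description": "24 inch spare tire",
    "weight": 22.2
  },
  "op_type": "I",
  "op_ts": "2020-05-13 15:41:00.000000",
  "current_ts": "2020-05-13 15:41:01.000000",
  "primary_keys": ["id"],
  "pos": "00000000000000000000144",
  "table": "PRODUCTS"
}
```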
Flink SQL
The following JARs need to be downloaded and placed under {flink_home}/lib/:
flink-sql-connector-kafka_2.12-1.14.5.jar
flink-json-1.15.1.jar
flink-doris-connector-1.14_2.12-1.1.0.jar
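If the cluster has internet access, the JARs can be pulled straight from Maven Central. The URLs below follow the standard Maven repository layout (the Doris connector is published under the org.apache.doris group), but verify the paths and versions before relying on them:

```bash
cd {flink_home}/lib/
# Kafka SQL connector for Flink 1.14.5 / Scala 2.12
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.14.5/flink-sql-connector-kafka_2.12-1.14.5.jar
# flink-json 1.15.1, the first release with ogg-json support
wget https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.15.1/flink-json-1.15.1.jar
# Doris connector built for Flink 1.14 / Scala 2.12
wget https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.14_2.12/1.1.0/flink-doris-connector-1.14_2.12-1.1.0.jar
```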
Enable checkpointing:

```sql
SET 'execution.checkpointing.interval' = '10min';
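```

Checkpointing is not optional here: the 1.1.x Doris connector only commits its stream-load transaction when a checkpoint completes, so without checkpoints no data would ever become visible in Doris. A few related settings that are often tuned alongside the interval (the values below are illustrative, not from the original post):

```sql
-- Illustrative values; tune to your latency and load requirements
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.timeout' = '10min';
SET 'execution.checkpointing.min-pause' = '1min';
```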
Create the Kafka source table with 'format' = 'ogg-json'. Note that the ogg-json format is only available in org.apache.flink:flink-json 1.15.1 and above:
```sql
CREATE TABLE topic_products (
  id INT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_ogg_1',
  'properties.bootstrap.servers' = '172.30.160.5:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'ogg-json',
  'scan.startup.mode' = 'earliest-offset',
  'ogg-json.ignore-parse-errors' = 'true'
);
```
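The ogg-json format can also expose parts of the message envelope as metadata columns, which is handy if the sink should carry the source table name or the ingestion timestamp. The table and ingestion-timestamp metadata keys below come from Flink's documentation for the Ogg format (prefixed with 'value.' when used through the Kafka connector); the column names and types are assumptions to double-check against your Flink version:

```sql
-- Variant of the source table with metadata columns (illustrative)
CREATE TABLE topic_products_with_meta (
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_ts TIMESTAMP_LTZ(6) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  id INT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_ogg_1',
  'properties.bootstrap.servers' = '172.30.160.5:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'ogg-json',
  'scan.startup.mode' = 'earliest-offset'
);
```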
Create the Doris sink table:

```sql
CREATE TABLE doris_sink (
  id INT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
)
WITH (
  'connector' = 'doris',
  'fenodes' = '172.30.160.5:8030',
  'table.identifier' = 'test.product',
  'username' = 'root',
  'password' = '',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.enable-delete' = 'true',
  'sink.label-prefix' = 'doris_label'
);
```

Then execute the INSERT statement to start writing into Doris:

```sql
INSERT INTO doris_sink SELECT * FROM topic_products;
```
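One step the post leaves implicit is creating the target table on the Doris side. For 'sink.enable-delete' = 'true' to actually remove rows, test.product should use Doris's Unique Key model. A minimal sketch, with column sizes, bucket count, and replication chosen only for a single-node test setup:

```sql
-- Run against Doris (MySQL protocol), not in the Flink SQL client.
-- VARCHAR lengths and table properties are assumptions for a test setup.
CREATE TABLE test.product (
  id INT,
  name VARCHAR(64),
  description VARCHAR(256),
  weight DECIMAL(10, 2)
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES ("replication_num" = "1");
```

Note also that the connector documentation recommends a sink.label-prefix that is unique per job, since stream-load labels derived from it must not collide across runs.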
Code Repo
- The whole pipeline can be submitted non-interactively with the SQL client: bin/sql-client.sh embedded -i init_file -f file -s yarn-session
- Execute SQL files: -i points to an initialization file that defines the available catalogs and session defaults, and -f to the job's SQL file (which creates temporary tables such as users); a sketch of both follows.
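A sketch of what the two files might contain; apart from their opening lines, everything here (the catalog name, the users schema, the datagen connector) is an illustrative assumption:

```sql
-- init_file: session setup run before the job
-- Define available catalogs
CREATE CATALOG my_catalog WITH ('type' = 'generic_in_memory');  -- assumed catalog
USE CATALOG my_catalog;
SET 'execution.checkpointing.interval' = '10min';
```

```sql
-- file: the job SQL itself
CREATE TEMPORARY TABLE users (
  user_id BIGINT,
  user_name STRING
) WITH (
  'connector' = 'datagen'  -- hypothetical connector, for the sketch only
);
SELECT * FROM users;
```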