Syncing Oracle CDC Logs from Kafka to Doris with Flink SQL
Preface
- The DBA already replicates Oracle's change logs into Kafka through OGG (Oracle GoldenGate), so Flink CDC is not needed here.
- Sample of the JSON messages that land in Kafka:
 {
   "before": {
     "id": 111,
     "name": "scooter",
     "description": "Big 2-wheel scooter",
     "weight": 5.18
   },
   "after": {
     "id": 111,
     "name": "scooter",
     "description": "Big 2-wheel scooter",
     "weight": 5.15
   },
   "op_type": "U",
   "op_ts": "2020-05-13 15:40:06.000000",
   "current_ts": "2020-05-13 15:40:07.000000",
   "primary_keys": [
     "id"
   ],
   "pos": "00000000000000000000143",
   "table": "PRODUCTS"
 }
Flink SQL
Download the following JARs and place them under {flink_home}/lib/:
flink-sql-connector-kafka_2.12-1.14.5.jar
flink-json-1.15.1.jar
flink-doris-connector-1.14_2.12-1.1.0.jar
- Enable checkpointing: SET 'execution.checkpointing.interval' = '10min';
- Create the Kafka source table with 'format' = 'ogg-json'. The ogg-json format is only supported by org.apache.flink flink-json 1.15.1 and later.
 CREATE TABLE topic_products (
   id INT,
   name STRING,
   description STRING,
   weight DECIMAL(10, 2)
 ) WITH (
   'connector' = 'kafka',
   'topic' = 'products_ogg_1',
   'properties.bootstrap.servers' = '172.30.160.5:9092',
   'properties.group.id' = 'testGroup',
   'format' = 'ogg-json',
   'scan.startup.mode' = 'earliest-offset',
   'ogg-json.ignore-parse-errors' = 'true'
 );
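Before wiring up the sink, it can help to confirm that the source table really decodes the OGG records as a changelog. The following is a minimal sketch, assuming the statements are run interactively in the Flink SQL client; `sql-client.execution.result-mode` is a SQL client setting and is not part of the original setup.

```sql
-- Show each change event with its operation flag (+I, -U, +U, -D)
-- instead of a continuously refreshed result table.
SET 'sql-client.execution.result-mode' = 'changelog';

-- Read the Kafka source table topic_products defined above.
SELECT id, name, description, weight
FROM topic_products;
```

For an update record such as the sample message (op_type U), the ogg-json format should produce a -U row built from before and a +U row built from after. Because 'ogg-json.ignore-parse-errors' = 'true' skips records that fail to parse instead of failing the job, an unexpectedly empty result is worth checking against the raw topic.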
- Create the Doris sink table:
 CREATE TABLE doris_sink (
   id INT,
   name STRING,
   description STRING,
   weight DECIMAL(10, 2)
 )
 WITH (
   'connector' = 'doris',
   'fenodes' = '172.30.160.5:8030',
   'table.identifier' = 'test.product',
   'username' = 'root',
   'password' = '',
   'sink.properties.format' = 'json',
   'sink.properties.read_json_by_line' = 'true',
   'sink.enable-delete' = 'true',
   'sink.label-prefix' = 'doris_label'
 );
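The Flink sink table only maps onto an existing Doris table, so test.product has to be created in Doris beforehand. Since 'sink.enable-delete' = 'true' relies on Doris batch delete, which the connector supports only on the Unique Key model, a matching table might look like the sketch below. The column lengths, bucket count, and replication_num are assumptions for a single-node test cluster, not values from the original setup.

```sql
-- Run in Doris (for example through the MySQL client), not in Flink.
-- Assumed: database `test` already exists; lengths and bucket count are illustrative.
CREATE TABLE test.product (
    `id`          INT,
    `name`        VARCHAR(255),
    `description` VARCHAR(512),
    `weight`      DECIMAL(10, 2)
)
UNIQUE KEY(`id`)                        -- Unique Key model: rows with the same id are upserted
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
    "replication_num" = "1"             -- assumption: single backend node
);
```

Using id as the unique key mirrors the primary_keys field of the OGG messages, so updates and deletes from Oracle resolve to the same row in Doris.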
- Run INSERT INTO doris_sink SELECT * FROM topic_products; to write the data into Doris.
Code Repo
- bin/sql-client.sh embedded -i init_file -f file -s yarn-session
- Execute SQL Files
 -- Define available catalogs
 CREATE TEMPORARY TABLE users (
Unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit 破晓 when reposting!