Using the Flink Doris Connector to Load Kafka Data into Doris in Real Time

The connector supports both Flink SQL and the DataStream API.

Note:

  1. Updates and deletes are supported only on the Unique Key model.
  2. Deletes are currently applied automatically only for data ingested through Flink CDC; for other ingestion paths, deletion has to be implemented yourself. For how Flink CDC deletes data, see the last section of this document.

Maven

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- flink table -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<!-- flink-doris-connector -->
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.14_2.12</artifactId>
    <version>1.1.0</version>
</dependency>

Parameter Configuration

Internally, the Flink Doris Connector sink writes data to Doris through the Stream Load service, and it also supports configuring Stream Load request parameters; refer to the Stream Load documentation for the specific parameters. They can be configured as follows:

  • SQL: set parameters in the WITH clause using the sink.properties. prefix (see the sketch after this list)
  • DataStream: configure via DorisExecutionOptions.builder().setStreamLoadProp(Properties)
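
For the SQL route, a minimal sketch: the FE address, table name, and credentials are placeholders, and the WITH option names follow the connector 1.1.0 documentation, so double-check them against the version you actually use.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

object DorisSqlSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(10000) // the Doris sink commits data on checkpoints
    val tableEnv = StreamTableEnvironment.create(env)

    // Stream Load request parameters are passed with the sink.properties. prefix.
    tableEnv.executeSql(
      """CREATE TABLE doris_sink (
        |  id BIGINT,
        |  name STRING
        |) WITH (
        |  'connector' = 'doris',
        |  'fenodes' = '127.0.0.1:8030',
        |  'table.identifier' = 'db.table',
        |  'username' = 'root',
        |  'password' = 'password',
        |  'sink.label-prefix' = 'doris-label',
        |  'sink.properties.format' = 'json',
        |  'sink.properties.read_json_by_line' = 'true'
        |)""".stripMargin)

    tableEnv.executeSql("INSERT INTO doris_sink VALUES (CAST(1 AS BIGINT), 'doris')")
  }
}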

Example

package cn.jxau.yuan

import cn.jxau.yuan.common.Config
import org.apache.doris.flink.cfg.{DorisExecutionOptions, DorisOptions, DorisReadOptions}
import org.apache.doris.flink.sink.DorisSink
import org.apache.doris.flink.sink.writer.SimpleStringSerializer
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala._

import java.util.Properties

object KafkaConnectTest {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpointing is required: the Doris sink commits its data on checkpoints.
    env.enableCheckpointing(10000)
    env.getCheckpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // Config.broker is a project-local constant holding the Kafka bootstrap servers.
    val kafkaSource: KafkaSource[String] = KafkaSource.builder[String]
      .setBootstrapServers(Config.broker)
      .setTopics("input-topic")
      .setGroupId("my-group")
      .setStartingOffsets(OffsetsInitializer.earliest)
      .setValueOnlyDeserializer(new SimpleStringSchema)
      .build

    env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks[String](), "Kafka Source")
      .sinkTo(buildDorisSink())

    env.execute()
  }

  def buildDorisSink(): DorisSink[String] = {
    // Sink config: FE address, target table, and credentials.
    val builder: DorisSink.Builder[String] = DorisSink.builder()
    val dorisBuilder: DorisOptions.Builder = DorisOptions.builder()
    dorisBuilder.setFenodes("127.0.0.1:8030")
      .setTableIdentifier("db.table")
      .setUsername("root")
      .setPassword("password")

    // Stream Load request properties: load records as line-delimited JSON.
    val pro: Properties = new Properties()
    pro.setProperty("format", "json")
    pro.setProperty("read_json_by_line", "true")

    val executionOptions: DorisExecutionOptions = DorisExecutionOptions.builder()
      .setLabelPrefix("label-doris") // Stream Load label prefix
      .setStreamLoadProp(pro)
      .build()

    builder.setDorisReadOptions(DorisReadOptions.builder().build())
      .setDorisExecutionOptions(executionOptions)
      .setSerializer(new SimpleStringSerializer()) // records are plain strings
      .setDorisOptions(dorisBuilder.build())
      .build()
  }
}
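
If the stream carries something other than plain strings, you can swap SimpleStringSerializer for your own implementation. Below is a minimal sketch, assuming the DorisRecordSerializer interface shape of connector 1.1.0 (a single serialize method returning the bytes handed to Stream Load; verify against your connector version). The Event type and its fields are made up for illustration.

import org.apache.doris.flink.sink.writer.DorisRecordSerializer

import java.nio.charset.StandardCharsets

// Hypothetical event type, for illustration only.
case class Event(id: Long, name: String)

// Emits one JSON object per record, matching the Stream Load properties
// used above (format=json, read_json_by_line=true).
class EventSerializer extends DorisRecordSerializer[Event] {
  override def serialize(record: Event): Array[Byte] =
    s"""{"id": ${record.id}, "name": "${record.name}"}""".getBytes(StandardCharsets.UTF_8)
}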

To use the Table API in code, we must add the relevant dependency.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

This dependency is a Java "bridge" that connects the Table API to the underlying DataStream API; it comes in a Java flavor and a Scala flavor, one per language.
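
For example, with the Scala flavor of the bridge on the classpath (the flink-table-api-scala-bridge artifact, assumed here in place of the Java one above), you can move between the two APIs; a minimal sketch:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._

object BridgeSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // DataStream -> Table
    val stream: DataStream[(Long, String)] = env.fromElements((1L, "a"), (2L, "b"))
    val table = tableEnv.fromDataStream(stream)

    // Table -> DataStream, the other direction the bridge provides
    val back = tableEnv.toDataStream(table)
    back.print()

    env.execute()
  }
}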

If we want to run Table API and SQL programs in a local IDE, we also need the following dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

The key dependency added here is the "planner", the core component of the Table API: it provides the runtime environment and generates the execution plan for the program. We use the newer blink planner. Since the lib directory of a Flink distribution already ships with a planner, jobs submitted to a production cluster do not need to bundle this dependency. And because parts of the Table API's internals are implemented in Scala, a Scala streaming dependency has to be added as well.
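
As a small sketch, this is how the blink planner is selected explicitly when building a table environment (Flink 1.13-era API; from 1.14 on the blink planner is the only planner and the useBlinkPlanner call is deprecated):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object PlannerSketch {
  def main(args: Array[String]): Unit = {
    // Choose the blink planner in streaming mode.
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .useBlinkPlanner()
      .build()

    val tableEnv = TableEnvironment.create(settings)
    tableEnv.executeSql("SHOW TABLES").print()
  }
}
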
Additionally, if you want to implement a custom data format for serialization, you can add the dependency below:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
</dependency>