Paimon-Flink-Sink源码分析
前言
Paimon在Flink下面的层次结构,大概为:Catalog -> Database -> Table -> Record。因此看Paimon如何在Flink中实现Read、Write等操作,先从Catalog开始。
Catalog
Paimon在Flink中实现Catalog的源码位于org.apache.paimon.flink.FlinkCatalog
,该类实现了org.apache.flink.table.catalog.Catalog
接口。该接口中定义了一系列方法,包括listTables、listViews等方法,而paimon一一实现了这些接口。
在Flink中自定义Catalog,还需要实现org.apache.flink.table.factories.CatalogFactory
工厂接口,paimon中对应的为org.apache.paimon.flink.FlinkCatalogFactory
类。该工厂类当然是用来创建Catalog了。
最后还需要将Catalog工厂实现类,添加到将此实现类添加到 META_INF/services/org.apache.flink.table.factories.Factory
中。用于SPI。
Table
在paimon实现的FlinkCatalog类中,org.apache.paimon.flink.FlinkCatalog#getFactory
方法,用于提供写入和读取Paimon的具体实现。
1 |
|
在Paimon中,这个方法返回的是org.apache.paimon.flink.FlinkTableFactory
。该工厂类实现了org.apache.flink.table.factories.DynamicTableSourceFactory
,org.apache.flink.table.factories.DynamicTableSinkFactory
接口。分别用来实现读取和写入Paimon表的逻辑。而写入Paimon表的实现类为org.apache.paimon.flink.sink.FlinkTableSink
。该类主要实现了org.apache.flink.table.connector.sink.DynamicTableSink
接口。
1 | /** |
将动态表汇入外部存储系统。
动态表是 Flink 的 Table & SQL API 的核心概念,用于以统一的方式处理有界和无界数据。根据定义,动态表可以随时间变化。
在编写动态表时,内容始终可以被视为一个 changelog(有限或无限),所有更改都被连续写出,直到 changelog 耗尽。给定的ChangelogMode指示接收器在运行时接受的更改集。
对于常规批处理方案,接收器只能接受仅插入行并写出有界流。
对于常规流场景,接收器只能接受仅插入行,并且可以写出无界流。
对于变更数据捕获 (CDC) 场景,接收器可以写出带有插入、更新和删除行的有界或无界流。另请参阅RowKind 。
DynamicTableSink的实例可以被视为最终生成用于写入实际数据的具体运行时实现的工厂。
根据可选声明的能力,规划器可能会将更改应用于实例,从而改变生成的运行时实现。
DynamicTableSink可以实现以下功能:
SupportsPartitioning
SupportsOverwrite
SupportsWritingMetadata
在最后一步中,规划器将调用getSinkRuntimeProvider(DynamicTableSink.Context)来获取运行时实现的提供者。
Write
构建Paimon Sink Flink DAG源码流程
入口类以及对应的入口方法为:org.apache.paimon.flink.sink.FlinkSinkBuilder#build,
进入这个方法中看看,发现会先对DataStream进行,按照分桶进行分区转换
1 | org.apache.paimon.flink.sink.FlinkSinkBuilder#build |
1、createWriteOperator:实际进行写入Record的算子, org.apache.paimon.flink.sink.RowDataStoreWriteOperator
org.apache.paimon.flink.AbstractFlinkTableFactory#buildPaimonTable2、CommitterOperator:CK时进行snapshot commit的地方,保证数据可见性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39org.apache.paimon.flink.sink.FlinkSink#sinkFrom(org.apache.flink.streaming.api.datastream.DataStream<T>, java.lang.String, org.apache.paimon.flink.sink.StoreSinkWrite.Provider);
public DataStreamSink<?> sinkFrom(
DataStream<T> input, String commitUser, StoreSinkWrite.Provider sinkProvider) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
boolean streamingCheckpointEnabled =
isStreaming && checkpointConfig.isCheckpointingEnabled();
if (streamingCheckpointEnabled) {
assertCheckpointConfiguration(env);
}
CommittableTypeInfo typeInfo = new CommittableTypeInfo();
SingleOutputStreamOperator<Committable> written =
input.transform(
WRITER_NAME + " -> " + table.name(),
typeInfo,
// 1、createWriteOperator:实际进行写入Record的算子
createWriteOperator(sinkProvider, isStreaming, commitUser))
.setParallelism(input.getParallelism());
SingleOutputStreamOperator<?> committed =
written.transform(
GLOBAL_COMMITTER_NAME + " -> " + table.name(),
typeInfo,
// 2、CommitterOperator:CK时进行snapshot commit的地方,保证数据可见性。
new CommitterOperator(
streamingCheckpointEnabled,
commitUser,
createCommitterFactory(streamingCheckpointEnabled),
createCommittableStateManager()))
.setParallelism(1)
.setMaxParallelism(1);
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}1、createWriteOperator
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47private StoreSinkWrite.Provider createWriteProvider(CheckpointConfig checkpointConfig) {
boolean waitCompaction;
if (table.coreOptions().writeOnly()) {
// 如果配置为writeOnly(),则不进行在线压缩
waitCompaction = false;
} else {
Options options = table.coreOptions().toConfiguration();
ChangelogProducer changelogProducer = table.coreOptions().changelogProducer();
// 当ChangelogProducer为LOOKUP时,则等待压缩
waitCompaction =
changelogProducer == ChangelogProducer.LOOKUP
&& options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT);
// 决定FULL_COMPACTION的压缩间隔
int deltaCommits = -1;
if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
} else if (options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
long fullCompactionThresholdMs =
options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL).toMillis();
deltaCommits =
(int)
(fullCompactionThresholdMs
/ checkpointConfig.getCheckpointInterval());
}
// Generate changelog files with each full compaction
// 当进行FULL_COMPACTION的时候,需要生成changelog files
if (changelogProducer == ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) {
int finalDeltaCommits = Math.max(deltaCommits, 1);
return (table, commitUser, state, ioManager) ->
new GlobalFullCompactionSinkWrite(
table,
commitUser,
state,
ioManager,
isOverwrite,
waitCompaction,
finalDeltaCommits);
}
}
return (table, commitUser, state, ioManager) ->
new StoreSinkWriteImpl(
table, commitUser, state, ioManager, isOverwrite, waitCompaction);
}
Paimon Flink CK流程
我们知道Flink paimon写入主要涉及两个算子:
1、org.apache.paimon.flink.sink.RowDataStoreWriteOperator
这个算子实现了org.apache.flink.streaming.api.operators.StreamOperator#prepareSnapshotPreBarrier
方法,这个方法会在算子接受到driver的checkpoint请求后被调用。在prepareSnapshotPreBarrier方法中会调用org.apache.paimon.flink.sink.PrepareCommitOperator#emitCommittables
方法,emitCommittables方法的作用是向后面的commit算子发送committable信息。
然而这个emitCommittables方法,又会调用prepareCommit方法,最终会调用org.apache.paimon.mergetree.MergeTreeWriter#prepareCommit
因此Flink每次进行checkpoint的时候,Paimon都会强制进行Memory Flush,完成数据的落盘,保证数据写入到文件系统,完成写入事务,保证一致性。
1 | org.apache.paimon.flink.sink.PrepareCommitOperator#prepareSnapshotPreBarrier |
2、CommitterOperator
这个算子实现了org.apache.flink.api.common.state.CheckpointListener#notifyCheckpointComplete
方法,该方法会在FLink CK完成后被调用,paimon-flink-sink在这个方法中会进行snapshot快照的提交,主要就是将本次快照生成的snapshot、manifest文件写入到文件系统。