Preface

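I recently noticed that data synced from Kafka to Paimon was incorrect. Specifically, a record had clearly been updated in the database, but the same record in Paimon had not been updated accordingly. After some investigation, the cause turned out to be that our company's OGG JSON messages do not follow a uniform format, so Flink's ogg-json format failed to parse some of them. Because ogg-json.ignore-parse-errors = true was also configured, the parse failure was swallowed, the entire OGG update record was dropped, and nothing was sent to the downstream Paimon table.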

Code notes

org.apache.flink.formats.json.ogg.OggJsonDeserializationSchema#deserialize(byte[], org.apache.flink.util.Collector<org.apache.flink.table.data.RowData>)

    @Override
    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
        if (message == null || message.length == 0) {
            // skip tombstone messages
            return;
        }
        try {
            final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
            GenericRowData row = (GenericRowData) jsonDeserializer.convertToRowData(root);

            GenericRowData before = (GenericRowData) row.getField(0);
            GenericRowData after = (GenericRowData) row.getField(1);
            String op = row.getField(2).toString();
            if (OP_CREATE.equals(op)) {
                after.setRowKind(RowKind.INSERT);
                emitRow(row, after, out);
            } else if (OP_UPDATE.equals(op)) {
                if (before == null) {
                    throw new IllegalStateException(
                            String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE"));
                }

                // for case: "before":{}
                if (!root.get("before").isEmpty()) {
                    before.setRowKind(RowKind.UPDATE_BEFORE);
                    emitRow(row, before, out);
                }

                after.setRowKind(RowKind.UPDATE_AFTER);
                emitRow(row, after, out);
            } else if (OP_DELETE.equals(op)) {
                if (before == null) {
                    throw new IllegalStateException(
                            String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE"));
                }
                before.setRowKind(RowKind.DELETE);
                emitRow(row, before, out);
            } else {
                if (!ignoreParseErrors) {
                    throw new IOException(
                            format(
                                    "Unknown \"op_type\" value \"%s\". The Ogg JSON message is '%s'",
                                    op, new String(message)));
                }
            }
        } catch (Throwable t) {
            // a big try catch to protect the processing.
            if (!ignoreParseErrors) {
                throw new IOException(
                        format("Corrupt Ogg JSON message '%s'.", new String(message)), t);
            }
        }
    }
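
To make the failure mode easier to see, the control flow of the method above can be reduced to a small standalone sketch. This is not Flink code: the class name IgnoreParseErrorsSketch, the parse helper, and the "op|payload" message layout are all hypothetical stand-ins, chosen only so the example runs on a plain JDK. It only illustrates how a catch-all block guarded by ignoreParseErrors turns any exception into a silently dropped record.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class IgnoreParseErrorsSketch {

    // Hypothetical stand-in for the real JSON parsing step:
    // accepts "op|payload" and rejects every other layout.
    static String[] parse(String message) {
        String[] parts = message.split("\\|", -1);
        if (parts.length != 2) {
            throw new IllegalArgumentException("unexpected message layout: " + message);
        }
        return parts;
    }

    // Same shape as deserialize(): one big try/catch guarded by ignoreParseErrors.
    static void deserialize(String message, List<String> out, boolean ignoreParseErrors)
            throws IOException {
        if (message == null || message.isEmpty()) {
            return; // skip tombstone messages
        }
        try {
            String[] parts = parse(message);
            out.add(parts[0] + ":" + parts[1]);
        } catch (Throwable t) {
            if (!ignoreParseErrors) {
                throw new IOException("Corrupt message '" + message + "'.", t);
            }
            // ignoreParseErrors == true: nothing is emitted and nothing is reported
        }
    }

    public static void main(String[] args) throws IOException {
        List<String> out = new ArrayList<>();
        deserialize("U|id=1", out, true);     // well-formed: one row emitted
        deserialize("malformed", out, true);  // parse failure: silently dropped
        System.out.println(out);              // prints [U:id=1]

        try {
            deserialize("malformed", out, false); // same input, errors not ignored
        } catch (IOException e) {
            System.out.println("failed fast: " + e.getMessage());
        }
    }
}
```

With the flag set to true, the malformed message leaves no trace downstream, which matches the missing update observed in Paimon. With the flag set to false, the same message fails the job instead, which is noisier but makes the format problem visible right away.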

PR

https://github.com/apache/flink/pull/23102