diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReader.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReader.java index 48c0ef82f25a..9aeb48567d0b 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReader.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReader.java @@ -76,8 +76,9 @@ public class CDCSourceSplitReader private TableReaderInfo currentTableReaderInfo; @Nullable private LazyRecordReader currentReader; @Nullable private String currentSplitId; - private long currentNumRead; + private long currentDataRowsRead; private RecordIterator currentFirstBatch; + private final Queue currentSchemaChangeEvents = new LinkedList<>(); private boolean paused; private final AtomicBoolean wakeup; @@ -183,6 +184,7 @@ public void wakeUp() { @Override public void close() throws Exception { + currentSchemaChangeEvents.clear(); if (currentReader != null) { if (currentReader.lazyRecordReader != null) { currentReader.lazyRecordReader.close(); @@ -214,13 +216,17 @@ private void checkSplitOrStartNext() throws IOException { List schemaChangeEvents = tableManager.generateSchemaChangeEventList( identifier, nextSplit.getLastSchemaId(), nextSplit.getSchemaId()); - currentTableReaderInfo = new TableReaderInfo(identifier, tableSchema, schemaChangeEvents); + currentTableReaderInfo = new TableReaderInfo(identifier, tableSchema); currentSplitId = nextSplit.splitId(); currentReader = createLazyRecordReader(nextSplit.split()); - currentNumRead = nextSplit.recordsToSkip(); + currentDataRowsRead = nextSplit.recordsToSkip(); + currentSchemaChangeEvents.clear(); + if (currentDataRowsRead == 0) { + currentSchemaChangeEvents.addAll(schemaChangeEvents); + } - if (currentNumRead > 0) { - seek(currentNumRead); + if (currentDataRowsRead > 0) { + seek(currentDataRowsRead); } } @@ -257,6 +263,7 @@ private CDCRecordsWithSplitIds finishSplit() throws IOException { currentReader = null; } + currentSchemaChangeEvents.clear(); final CDCRecordsWithSplitIds finishRecords = CDCRecordsWithSplitIds.finishedSplit(currentSplitId); currentSplitId = null; @@ -271,33 +278,24 @@ private class FileStoreRecordIterator implements BulkFormat.RecordIterator(); private TableReaderInfo tableReaderInfo; - private final Queue schemaChangeEventList = new LinkedList<>(); public FileStoreRecordIterator replace( RecordIterator iterator, TableReaderInfo tableReaderInfo) { this.iterator = iterator; - this.recordAndPosition.set(null, RecordAndPosition.NO_OFFSET, currentNumRead); + this.recordAndPosition.set(null, RecordAndPosition.NO_OFFSET, currentDataRowsRead); this.tableReaderInfo = tableReaderInfo; - this.schemaChangeEventList.addAll(tableReaderInfo.schemaChangeEvents); return this; } @Nullable @Override public RecordAndPosition next() { - Event event = nextEvent(); - if (event == null) { - return null; - } - - recordAndPosition.setNext(event); - currentNumRead++; - return recordAndPosition; - } - - private Event nextEvent() { - if (!schemaChangeEventList.isEmpty()) { - return schemaChangeEventList.poll(); + if (!currentSchemaChangeEvents.isEmpty()) { + recordAndPosition.set( + currentSchemaChangeEvents.poll(), + RecordAndPosition.NO_OFFSET, + currentDataRowsRead); + return recordAndPosition; } InternalRow row; @@ -310,11 +308,14 @@ private Event nextEvent() { return null; } - return convertRowToDataChangeEvent( - tableReaderInfo.tableId, - row, - tableReaderInfo.fieldGetters, - tableReaderInfo.generator); + recordAndPosition.setNext( + convertRowToDataChangeEvent( + tableReaderInfo.tableId, + row, + tableReaderInfo.fieldGetters, + tableReaderInfo.generator)); + currentDataRowsRead++; + return recordAndPosition; } @Override @@ -358,19 +359,14 @@ private static class TableReaderInfo { private final Identifier identifier; private final TableId tableId; private final TableSchema currentSchema; - private final List schemaChangeEvents; private final BinaryRecordDataGenerator generator; private final List fieldGetters; - private TableReaderInfo( - Identifier identifier, - TableSchema currentSchema, - List schemaChangeEvents) { + private TableReaderInfo(Identifier identifier, TableSchema currentSchema) { this.identifier = identifier; this.tableId = TableId.tableId(identifier.getDatabaseName(), identifier.getTableName()); this.currentSchema = currentSchema; - this.schemaChangeEvents = schemaChangeEvents; org.apache.flink.cdc.common.schema.Schema currentCDCSchema = convertPaimonSchemaToFlinkCDCSchema(currentSchema); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReaderTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReaderTest.java index e12ff8f19e9a..82199811b5e3 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReaderTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/source/reader/CDCSourceSplitReaderTest.java @@ -42,9 +42,13 @@ import org.apache.paimon.utils.RecordWriter; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.OperationType; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; @@ -62,6 +66,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -144,8 +149,15 @@ public void testSplitReaderWakeupAble() throws Exception { } private CDCSourceSplitReader createReader(TableRead tableRead) { + return createReader(tableRead, Collections.emptyList()); + } + + private CDCSourceSplitReader createReader( + TableRead tableRead, List schemaChangeEvents) { return new TestCDCSourceSplitReader( - new FileStoreSourceReaderMetrics(new DummyMetricGroup()), tableRead); + new FileStoreSourceReaderMetrics(new DummyMetricGroup()), + tableRead, + schemaChangeEvents); } private void innerTestOnce(int skip) throws Exception { @@ -251,6 +263,92 @@ public void testMultipleBatchInSplit() throws Exception { reader.close(); } + @Test + public void testSchemaChangeEventOnlyEmittedOnceInMultipleBatchSplit() throws Exception { + TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tablePath); + CDCSourceSplitReader reader = createReader(rw.createReadWithKey(), schemaChangeEvents()); + + List> input1 = kvs(); + List files = rw.writeFiles(row(1), 0, input1); + + List> input2 = kvs(6); + List files2 = rw.writeFiles(row(1), 0, input2); + files.addAll(files2); + + assignSplit(reader, newSourceSplit("id1", row(1), 0, files)); + + RecordsWithSplitIds> records = reader.fetch(); + assertThat(readEventTypes(records, "id1")) + .containsExactly( + SchemaChangeEvent.class, + DataChangeEvent.class, + DataChangeEvent.class, + DataChangeEvent.class, + DataChangeEvent.class, + DataChangeEvent.class, + DataChangeEvent.class); + + records = reader.fetch(); + assertThat(readEventTypes(records, "id1")) + .containsExactly( + DataChangeEvent.class, + DataChangeEvent.class, + DataChangeEvent.class, + DataChangeEvent.class, + DataChangeEvent.class, + DataChangeEvent.class); + + records = reader.fetch(); + assertRecords(records, "id1", "id1", 0, null); + + reader.close(); + } + + @Test + public void testSchemaChangeEventDoesNotAdvanceRecordsToSkip() throws Exception { + TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tablePath); + CDCSourceSplitReader reader = createReader(rw.createReadWithKey(), schemaChangeEvents()); + + List> input = kvs(); + List files = rw.writeFiles(row(1), 0, input); + + assignSplit(reader, newSourceSplit("id1", row(1), 0, files)); + + RecordsWithSplitIds> records = reader.fetch(); + assertThat(readRecordSkipCounts(records, "id1")) + .containsExactly(0L, 1L, 2L, 3L, 4L, 5L, 6L); + + reader.close(); + } + + @Test + public void testRestoreWithSchemaChangeEventsDoesNotReemitSchemaEvent() throws Exception { + TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tablePath); + CDCSourceSplitReader reader = createReader(rw.createReadWithKey(), schemaChangeEvents()); + + List> input1 = kvs(); + List files = rw.writeFiles(row(1), 0, input1); + + List> input2 = kvs(6); + List files2 = rw.writeFiles(row(1), 0, input2); + files.addAll(files2); + + assignSplit(reader, newSourceSplit("id1", row(1), 0, files, input1.size())); + + RecordsWithSplitIds> records = reader.fetch(); + assertRecords(records, null, "id1", input1.size(), Collections.emptyList()); + + records = reader.fetch(); + assertRecords( + records, + null, + "id1", + input1.size(), + input2.stream().map(t -> t.f1).collect(Collectors.toList())); + + reader.close(); + } + @Test public void testRestore() throws Exception { TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tablePath); @@ -456,6 +554,53 @@ private List> readRecords( return result; } + private List> readEventTypes( + RecordsWithSplitIds> records, String nextSplit) { + assertThat(records.finishedSplits()).isEmpty(); + assertThat(records.nextSplit()).isEqualTo(nextSplit); + List> result = new ArrayList<>(); + RecordIterator iterator; + while ((iterator = records.nextRecordFromSplit()) != null) { + RecordAndPosition record; + while ((record = iterator.next()) != null) { + result.add( + record.getRecord() instanceof SchemaChangeEvent + ? SchemaChangeEvent.class + : DataChangeEvent.class); + } + } + records.recycle(); + return result; + } + + private List readRecordSkipCounts( + RecordsWithSplitIds> records, String nextSplit) { + assertThat(records.finishedSplits()).isEmpty(); + assertThat(records.nextSplit()).isEqualTo(nextSplit); + List result = new ArrayList<>(); + RecordIterator iterator; + while ((iterator = records.nextRecordFromSplit()) != null) { + RecordAndPosition record; + while ((record = iterator.next()) != null) { + result.add(record.getRecordSkipCount()); + } + } + records.recycle(); + return result; + } + + private List schemaChangeEvents() { + return Collections.singletonList( + new AddColumnEvent( + TableId.tableId(DATABASE, TABLE), + Arrays.asList( + AddColumnEvent.last( + Column.physicalColumn( + "extra", + org.apache.flink.cdc.common.types.DataTypes + .BIGINT()))))); + } + private List> kvs() { return kvs(0); } @@ -524,8 +669,11 @@ public static TableAwareFileStoreSourceSplit newSourceSplit( private static class TestCDCSourceSplitReader extends CDCSourceSplitReader { private final TableRead tableRead; - public TestCDCSourceSplitReader(FileStoreSourceReaderMetrics metrics, TableRead tableRead) { - super(metrics, new TestTableManager(tableRead)); + public TestCDCSourceSplitReader( + FileStoreSourceReaderMetrics metrics, + TableRead tableRead, + List schemaChangeEvents) { + super(metrics, new TestTableManager(tableRead, schemaChangeEvents)); this.tableRead = tableRead; } @@ -555,10 +703,16 @@ public RecordReader recordReader() throws IOException { private static class TestTableManager extends CDCSource.TableManager { private final TableRead tableRead; + private final List schemaChangeEvents; public TestTableManager(TableRead tableRead) { + this(tableRead, Collections.emptyList()); + } + + public TestTableManager(TableRead tableRead, List schemaChangeEvents) { super(null, null, null); this.tableRead = tableRead; + this.schemaChangeEvents = schemaChangeEvents; } @Override @@ -571,5 +725,11 @@ public TestTableManager(TableRead tableRead) { public TableRead getTableRead(Identifier identifier, TableSchema schema) { return tableRead; } + + @Override + public List generateSchemaChangeEventList( + Identifier identifier, @Nullable Long lastSchemaId, long schemaId) { + return schemaChangeEvents; + } } }