Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ public class CDCSourceSplitReader
private TableReaderInfo currentTableReaderInfo;
@Nullable private LazyRecordReader currentReader;
@Nullable private String currentSplitId;
private long currentNumRead;
private long currentDataRowsRead;
private RecordIterator<InternalRow> currentFirstBatch;
private final Queue<SchemaChangeEvent> currentSchemaChangeEvents = new LinkedList<>();

private boolean paused;
private final AtomicBoolean wakeup;
Expand Down Expand Up @@ -183,6 +184,7 @@ public void wakeUp() {

@Override
public void close() throws Exception {
currentSchemaChangeEvents.clear();
if (currentReader != null) {
if (currentReader.lazyRecordReader != null) {
currentReader.lazyRecordReader.close();
Expand Down Expand Up @@ -214,13 +216,17 @@ private void checkSplitOrStartNext() throws IOException {
List<SchemaChangeEvent> schemaChangeEvents =
tableManager.generateSchemaChangeEventList(
identifier, nextSplit.getLastSchemaId(), nextSplit.getSchemaId());
currentTableReaderInfo = new TableReaderInfo(identifier, tableSchema, schemaChangeEvents);
currentTableReaderInfo = new TableReaderInfo(identifier, tableSchema);
currentSplitId = nextSplit.splitId();
currentReader = createLazyRecordReader(nextSplit.split());
currentNumRead = nextSplit.recordsToSkip();
currentDataRowsRead = nextSplit.recordsToSkip();
currentSchemaChangeEvents.clear();
if (currentDataRowsRead == 0) {
currentSchemaChangeEvents.addAll(schemaChangeEvents);
}

if (currentNumRead > 0) {
seek(currentNumRead);
if (currentDataRowsRead > 0) {
seek(currentDataRowsRead);
}
}

Expand Down Expand Up @@ -257,6 +263,7 @@ private CDCRecordsWithSplitIds finishSplit() throws IOException {
currentReader = null;
}

currentSchemaChangeEvents.clear();
final CDCRecordsWithSplitIds finishRecords =
CDCRecordsWithSplitIds.finishedSplit(currentSplitId);
currentSplitId = null;
Expand All @@ -271,33 +278,24 @@ private class FileStoreRecordIterator implements BulkFormat.RecordIterator<Event
new MutableRecordAndPosition<>();

private TableReaderInfo tableReaderInfo;
private final Queue<SchemaChangeEvent> schemaChangeEventList = new LinkedList<>();

public FileStoreRecordIterator replace(
RecordIterator<InternalRow> iterator, TableReaderInfo tableReaderInfo) {
this.iterator = iterator;
this.recordAndPosition.set(null, RecordAndPosition.NO_OFFSET, currentNumRead);
this.recordAndPosition.set(null, RecordAndPosition.NO_OFFSET, currentDataRowsRead);
this.tableReaderInfo = tableReaderInfo;
this.schemaChangeEventList.addAll(tableReaderInfo.schemaChangeEvents);
return this;
}

@Nullable
@Override
public RecordAndPosition<Event> next() {
Event event = nextEvent();
if (event == null) {
return null;
}

recordAndPosition.setNext(event);
currentNumRead++;
return recordAndPosition;
}

private Event nextEvent() {
if (!schemaChangeEventList.isEmpty()) {
return schemaChangeEventList.poll();
if (!currentSchemaChangeEvents.isEmpty()) {
recordAndPosition.set(
currentSchemaChangeEvents.poll(),
RecordAndPosition.NO_OFFSET,
currentDataRowsRead);
return recordAndPosition;
}

InternalRow row;
Expand All @@ -310,11 +308,14 @@ private Event nextEvent() {
return null;
}

return convertRowToDataChangeEvent(
tableReaderInfo.tableId,
row,
tableReaderInfo.fieldGetters,
tableReaderInfo.generator);
recordAndPosition.setNext(
convertRowToDataChangeEvent(
tableReaderInfo.tableId,
row,
tableReaderInfo.fieldGetters,
tableReaderInfo.generator));
currentDataRowsRead++;
return recordAndPosition;
}

@Override
Expand Down Expand Up @@ -358,19 +359,14 @@ private static class TableReaderInfo {
private final Identifier identifier;
private final TableId tableId;
private final TableSchema currentSchema;
private final List<SchemaChangeEvent> schemaChangeEvents;
private final BinaryRecordDataGenerator generator;
private final List<InternalRow.FieldGetter> fieldGetters;

private TableReaderInfo(
Identifier identifier,
TableSchema currentSchema,
List<SchemaChangeEvent> schemaChangeEvents) {
private TableReaderInfo(Identifier identifier, TableSchema currentSchema) {
this.identifier = identifier;
this.tableId = TableId.tableId(identifier.getDatabaseName(), identifier.getTableName());

this.currentSchema = currentSchema;
this.schemaChangeEvents = schemaChangeEvents;

org.apache.flink.cdc.common.schema.Schema currentCDCSchema =
convertPaimonSchemaToFlinkCDCSchema(currentSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,13 @@
import org.apache.paimon.utils.RecordWriter;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
Expand All @@ -62,6 +66,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -144,8 +149,15 @@ public void testSplitReaderWakeupAble() throws Exception {
}

private CDCSourceSplitReader createReader(TableRead tableRead) {
return createReader(tableRead, Collections.emptyList());
}

private CDCSourceSplitReader createReader(
TableRead tableRead, List<SchemaChangeEvent> schemaChangeEvents) {
return new TestCDCSourceSplitReader(
new FileStoreSourceReaderMetrics(new DummyMetricGroup()), tableRead);
new FileStoreSourceReaderMetrics(new DummyMetricGroup()),
tableRead,
schemaChangeEvents);
}

private void innerTestOnce(int skip) throws Exception {
Expand Down Expand Up @@ -251,6 +263,92 @@ public void testMultipleBatchInSplit() throws Exception {
reader.close();
}

@Test
public void testSchemaChangeEventOnlyEmittedOnceInMultipleBatchSplit() throws Exception {
TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tablePath);
CDCSourceSplitReader reader = createReader(rw.createReadWithKey(), schemaChangeEvents());

List<Tuple2<Long, Long>> input1 = kvs();
List<DataFileMeta> files = rw.writeFiles(row(1), 0, input1);

List<Tuple2<Long, Long>> input2 = kvs(6);
List<DataFileMeta> files2 = rw.writeFiles(row(1), 0, input2);
files.addAll(files2);

assignSplit(reader, newSourceSplit("id1", row(1), 0, files));

RecordsWithSplitIds<BulkFormat.RecordIterator<Event>> records = reader.fetch();
assertThat(readEventTypes(records, "id1"))
.containsExactly(
SchemaChangeEvent.class,
DataChangeEvent.class,
DataChangeEvent.class,
DataChangeEvent.class,
DataChangeEvent.class,
DataChangeEvent.class,
DataChangeEvent.class);

records = reader.fetch();
assertThat(readEventTypes(records, "id1"))
.containsExactly(
DataChangeEvent.class,
DataChangeEvent.class,
DataChangeEvent.class,
DataChangeEvent.class,
DataChangeEvent.class,
DataChangeEvent.class);

records = reader.fetch();
assertRecords(records, "id1", "id1", 0, null);

reader.close();
}

@Test
public void testSchemaChangeEventDoesNotAdvanceRecordsToSkip() throws Exception {
TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tablePath);
CDCSourceSplitReader reader = createReader(rw.createReadWithKey(), schemaChangeEvents());

List<Tuple2<Long, Long>> input = kvs();
List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);

assignSplit(reader, newSourceSplit("id1", row(1), 0, files));

RecordsWithSplitIds<BulkFormat.RecordIterator<Event>> records = reader.fetch();
assertThat(readRecordSkipCounts(records, "id1"))
.containsExactly(0L, 1L, 2L, 3L, 4L, 5L, 6L);

reader.close();
}

@Test
public void testRestoreWithSchemaChangeEventsDoesNotReemitSchemaEvent() throws Exception {
TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tablePath);
CDCSourceSplitReader reader = createReader(rw.createReadWithKey(), schemaChangeEvents());

List<Tuple2<Long, Long>> input1 = kvs();
List<DataFileMeta> files = rw.writeFiles(row(1), 0, input1);

List<Tuple2<Long, Long>> input2 = kvs(6);
List<DataFileMeta> files2 = rw.writeFiles(row(1), 0, input2);
files.addAll(files2);

assignSplit(reader, newSourceSplit("id1", row(1), 0, files, input1.size()));

RecordsWithSplitIds<BulkFormat.RecordIterator<Event>> records = reader.fetch();
assertRecords(records, null, "id1", input1.size(), Collections.emptyList());

records = reader.fetch();
assertRecords(
records,
null,
"id1",
input1.size(),
input2.stream().map(t -> t.f1).collect(Collectors.toList()));

reader.close();
}

@Test
public void testRestore() throws Exception {
TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tablePath);
Expand Down Expand Up @@ -456,6 +554,53 @@ private List<Tuple2<RowKind, Long>> readRecords(
return result;
}

private List<Class<?>> readEventTypes(
RecordsWithSplitIds<RecordIterator<Event>> records, String nextSplit) {
assertThat(records.finishedSplits()).isEmpty();
assertThat(records.nextSplit()).isEqualTo(nextSplit);
List<Class<?>> result = new ArrayList<>();
RecordIterator<Event> iterator;
while ((iterator = records.nextRecordFromSplit()) != null) {
RecordAndPosition<Event> record;
while ((record = iterator.next()) != null) {
result.add(
record.getRecord() instanceof SchemaChangeEvent
? SchemaChangeEvent.class
: DataChangeEvent.class);
}
}
records.recycle();
return result;
}

private List<Long> readRecordSkipCounts(
RecordsWithSplitIds<RecordIterator<Event>> records, String nextSplit) {
assertThat(records.finishedSplits()).isEmpty();
assertThat(records.nextSplit()).isEqualTo(nextSplit);
List<Long> result = new ArrayList<>();
RecordIterator<Event> iterator;
while ((iterator = records.nextRecordFromSplit()) != null) {
RecordAndPosition<Event> record;
while ((record = iterator.next()) != null) {
result.add(record.getRecordSkipCount());
}
}
records.recycle();
return result;
}

private List<SchemaChangeEvent> schemaChangeEvents() {
return Collections.singletonList(
new AddColumnEvent(
TableId.tableId(DATABASE, TABLE),
Arrays.asList(
AddColumnEvent.last(
Column.physicalColumn(
"extra",
org.apache.flink.cdc.common.types.DataTypes
.BIGINT())))));
}

private List<Tuple2<Long, Long>> kvs() {
return kvs(0);
}
Expand Down Expand Up @@ -524,8 +669,11 @@ public static TableAwareFileStoreSourceSplit newSourceSplit(
private static class TestCDCSourceSplitReader extends CDCSourceSplitReader {
private final TableRead tableRead;

public TestCDCSourceSplitReader(FileStoreSourceReaderMetrics metrics, TableRead tableRead) {
super(metrics, new TestTableManager(tableRead));
public TestCDCSourceSplitReader(
FileStoreSourceReaderMetrics metrics,
TableRead tableRead,
List<SchemaChangeEvent> schemaChangeEvents) {
super(metrics, new TestTableManager(tableRead, schemaChangeEvents));
this.tableRead = tableRead;
}

Expand Down Expand Up @@ -555,10 +703,16 @@ public RecordReader<InternalRow> recordReader() throws IOException {

private static class TestTableManager extends CDCSource.TableManager {
private final TableRead tableRead;
private final List<SchemaChangeEvent> schemaChangeEvents;

public TestTableManager(TableRead tableRead) {
this(tableRead, Collections.emptyList());
}

public TestTableManager(TableRead tableRead, List<SchemaChangeEvent> schemaChangeEvents) {
super(null, null, null);
this.tableRead = tableRead;
this.schemaChangeEvents = schemaChangeEvents;
}

@Override
Expand All @@ -571,5 +725,11 @@ public TestTableManager(TableRead tableRead) {
public TableRead getTableRead(Identifier identifier, TableSchema schema) {
return tableRead;
}

@Override
public List<SchemaChangeEvent> generateSchemaChangeEventList(
Identifier identifier, @Nullable Long lastSchemaId, long schemaId) {
return schemaChangeEvents;
}
}
}
Loading