Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ scalastyle-output.xml
.bloop
.cursor
.claude
.worktrees
.metadata
.settings
.project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.PhysicalStateHandleID;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
Expand All @@ -43,9 +44,11 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -140,6 +143,12 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl

private boolean closed;

/**
* Handles pinned (retained) per checkpoint to prevent premature discard during truncation. Keyed
* by checkpointId, value is the list of handles that were retained for that checkpoint.
*/
private final TreeMap<Long, List<StreamStateHandle>> checkpointPinnedHandles = new TreeMap<>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious - do we need the values to have order? If not can we use a Set.


private final MailboxExecutor mailboxExecutor;

private final TaskChangelogRegistry changelogRegistry;
Expand Down Expand Up @@ -253,12 +262,13 @@ private CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persis
if (range.size() == readyToReturn.size()) {
checkState(toUpload.isEmpty());
return CompletableFuture.completedFuture(
buildSnapshotResult(keyGroupRange, readyToReturn, 0L));
buildSnapshotResult(keyGroupRange, readyToReturn, 0L, checkpointId));
} else {
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
new CompletableFuture<>();
uploadCompletionListeners.add(
new UploadCompletionListener(keyGroupRange, range, readyToReturn, future));
new UploadCompletionListener(
keyGroupRange, range, readyToReturn, future, checkpointId));
if (!toUpload.isEmpty()) {
UploadTask uploadTask =
new UploadTask(
Expand Down Expand Up @@ -335,6 +345,10 @@ public void close() {
activeChangeSetSize = 0;
notUploaded.clear();
uploaded.clear();
for (List<StreamStateHandle> handles : checkpointPinnedHandles.values()) {
handles.forEach(changelogRegistry::release);
}
checkpointPinnedHandles.clear();
}

@Override
Expand Down Expand Up @@ -380,6 +394,21 @@ public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId) {
from,
to,
activeSequenceNumber);
// Unpin handles for this and all older checkpoints.
// Using stopTracking (not release) because JM has confirmed ownership.
NavigableMap<Long, List<StreamStateHandle>> toUnpin =
checkpointPinnedHandles.headMap(checkpointId, true);
if (!toUnpin.isEmpty()) {
LOG.debug(
Copy link
Copy Markdown
Contributor

@davidradl davidradl Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:move the for into the if.

"Unpinning handles for {} checkpoint(s) up to {}",
toUnpin.size(),
checkpointId);
}
for (List<StreamStateHandle> handles : toUnpin.values()) {
handles.forEach(changelogRegistry::stopTracking);
}
toUnpin.clear();

// it is possible that "uploaded" has already been truncated (after checkpoint subsumption)
// so do not check that "uploaded" contains the specified range
LOG.debug("Confirm [{}, {})", from, to);
Expand All @@ -400,14 +429,27 @@ public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId) {

@Override
public void reset(SequenceNumber from, SequenceNumber to, long checkpointId) {
// Release pinned handles for the aborted checkpoint only.
List<StreamStateHandle> pinned = checkpointPinnedHandles.remove(checkpointId);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious should we be checking SequenceNumber from or SequenceNumber to here?

if (pinned != null) {
LOG.debug(
"Releasing {} pinned handles for aborted checkpoint {}",
pinned.size(),
checkpointId);
pinned.forEach(changelogRegistry::release);
}

// delete all accumulated local dstl files when abort
localChangelogRegistry.discardUpToCheckpoint(checkpointId + 1);
}

private SnapshotResult<ChangelogStateHandleStreamImpl> buildSnapshotResult(
KeyGroupRange keyGroupRange,
NavigableMap<SequenceNumber, UploadResult> results,
long incrementalSize) {
long incrementalSize,
long checkpointId) {
pinHandlesForCheckpoint(results, checkpointId);

List<Tuple2<StreamStateHandle, Long>> tuples = new ArrayList<>();
long size = 0;
for (UploadResult uploadResult : results.values()) {
Expand Down Expand Up @@ -452,6 +494,33 @@ private SnapshotResult<ChangelogStateHandleStreamImpl> buildSnapshotResult(
return SnapshotResult.of(jmChangelogStateHandle);
}

private void pinHandlesForCheckpoint(
NavigableMap<SequenceNumber, UploadResult> results, long checkpointId) {
if (checkpointId == DUMMY_PERSIST_CHECKPOINT) {
return;
}
Set<PhysicalStateHandleID> seen = new HashSet<>();
List<StreamStateHandle> pinned = new ArrayList<>();
for (UploadResult result : results.values()) {
StreamStateHandle handle = result.getStreamStateHandle();
if (seen.add(handle.getStreamStateHandleID())) {
changelogRegistry.retain(handle);
pinned.add(handle);
}
StreamStateHandle localHandle = result.getLocalStreamHandleStateHandle();
if (localHandle != null && seen.add(localHandle.getStreamStateHandleID())) {
changelogRegistry.retain(localHandle);
pinned.add(localHandle);
}
}
if (!pinned.isEmpty()) {
LOG.debug("Pinned {} handles for checkpoint {}", pinned.size(), checkpointId);
checkpointPinnedHandles
.computeIfAbsent(checkpointId, k -> new ArrayList<>())
.addAll(pinned);
}
}

@VisibleForTesting
SequenceNumber lastAppendedSqnUnsafe() {
return activeSequenceNumber;
Expand Down Expand Up @@ -482,19 +551,22 @@ private final class UploadCompletionListener {
completionFuture;
private final KeyGroupRange keyGroupRange;
private final SequenceNumberRange changeRange;
private final long checkpointId;

private UploadCompletionListener(
KeyGroupRange keyGroupRange,
SequenceNumberRange changeRange,
Map<SequenceNumber, UploadResult> uploaded,
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>>
completionFuture) {
completionFuture,
long checkpointId) {
checkArgument(
!changeRange.isEmpty(), "Empty change range not allowed: %s", changeRange);
this.uploaded = new TreeMap<>(uploaded);
this.completionFuture = completionFuture;
this.keyGroupRange = keyGroupRange;
this.changeRange = changeRange;
this.checkpointId = checkpointId;
}

public boolean onSuccess(List<UploadResult> uploadResults) {
Expand All @@ -505,7 +577,11 @@ public boolean onSuccess(List<UploadResult> uploadResults) {
incrementalSize += uploadResult.getSize();
if (uploaded.size() == changeRange.size()) {
completionFuture.complete(
buildSnapshotResult(keyGroupRange, uploaded, incrementalSize));
buildSnapshotResult(
keyGroupRange,
uploaded,
incrementalSize,
checkpointId));
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public interface TaskChangelogRegistry {
/** Stop tracking the state, so that it's not tracked (some other component is doing that). */
void stopTracking(StreamStateHandle handle);

/**
* Increase the reference count of the state by one, e.g. to pin a handle referenced by an
* in-flight checkpoint snapshot so that concurrent truncation cannot discard it prematurely.
*/
void retain(StreamStateHandle handle);

/**
* Decrease the reference count of the state by one, e.g. if it was pre-emptively uploaded and
* materialized. Once the reference count reaches zero, it is discarded (unless it was {@link
Expand All @@ -67,6 +73,9 @@ public void startTracking(StreamStateHandle handle, long refCount) {}
@Override
public void stopTracking(StreamStateHandle handle) {}

@Override
public void retain(StreamStateHandle handle) {}

@Override
public void release(StreamStateHandle handle) {}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ public void stopTracking(StreamStateHandle handle) {
entries.remove(handle.getStreamStateHandleID());
}

@Override
public void retain(StreamStateHandle handle) {
PhysicalStateHandleID key = handle.getStreamStateHandleID();
LOG.debug("state reference count increased by one, key: {}, state: {}", key, handle);

entries.compute(
key,
(handleID, refCount) -> {
if (refCount == null) {
LOG.warn("state is not in tracking, key: {}, state: {}", key, handle);
return null;
}
return refCount + 1;
});
}

@Override
public void release(StreamStateHandle handle) {
PhysicalStateHandleID key = handle.getStreamStateHandleID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,98 @@ void testPersistNonFailedChanges() throws Exception {
});
}

@Test
void testHandleNotDiscardedAfterTruncateBeforeConfirm() throws Exception {
long appendPersistThreshold = 1000;

TaskChangelogRegistry taskChangelogRegistry =
new TaskChangelogRegistryImpl(Executors.directExecutor());

try (DiscardRecordableStateChangeUploader uploader =
new DiscardRecordableStateChangeUploader(taskChangelogRegistry);
TestingBatchingUploadScheduler uploadScheduler =
new TestingBatchingUploadScheduler(uploader);
FsStateChangelogWriter writer =
new FsStateChangelogWriter(
UUID.randomUUID(),
KeyGroupRange.of(KEY_GROUP, KEY_GROUP),
uploadScheduler,
appendPersistThreshold,
new SyncMailboxExecutor(),
taskChangelogRegistry,
TestLocalRecoveryConfig.disabled(),
LocalChangelogRegistry.NO_OP)) {
SequenceNumber initialSqn = writer.initialSequenceNumber();

writer.append(KEY_GROUP, getBytes(10)); // sqn: 0

long checkpointId = 1L;
// checkpoint 1 trigger & complete
SequenceNumber checkpoint1sqn = writer.nextSequenceNumber();
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
writer.persist(initialSqn, checkpointId);
uploadScheduler.scheduleAll();

// materialization completes — truncates changes before checkpoint1sqn
writer.truncate(checkpoint1sqn);

// now confirm — handle should NOT have been discarded by truncate
writer.confirm(initialSqn, checkpoint1sqn, checkpointId);

SnapshotResult<ChangelogStateHandleStreamImpl> result = future.get();
for (Tuple2<StreamStateHandle, Long> handleAndOffset :
result.getJobManagerOwnedSnapshot().getHandlesAndOffsets()) {
assertThat(uploader.isDiscarded(handleAndOffset.f0)).isFalse();
}
}
}

@Test
void testHandleDiscardedAfterTruncateAndReset() throws Exception {
long appendPersistThreshold = 1000;

TaskChangelogRegistry taskChangelogRegistry =
new TaskChangelogRegistryImpl(Executors.directExecutor());

try (DiscardRecordableStateChangeUploader uploader =
new DiscardRecordableStateChangeUploader(taskChangelogRegistry);
TestingBatchingUploadScheduler uploadScheduler =
new TestingBatchingUploadScheduler(uploader);
FsStateChangelogWriter writer =
new FsStateChangelogWriter(
UUID.randomUUID(),
KeyGroupRange.of(KEY_GROUP, KEY_GROUP),
uploadScheduler,
appendPersistThreshold,
new SyncMailboxExecutor(),
taskChangelogRegistry,
TestLocalRecoveryConfig.disabled(),
LocalChangelogRegistry.NO_OP)) {
SequenceNumber initialSqn = writer.initialSequenceNumber();

writer.append(KEY_GROUP, getBytes(10)); // sqn: 0

long checkpointId = 1L;
// checkpoint 1 trigger & complete
SequenceNumber checkpoint1sqn = writer.nextSequenceNumber();
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
writer.persist(initialSqn, checkpointId);
uploadScheduler.scheduleAll();

// materialization completes — truncates
writer.truncate(checkpoint1sqn);

// checkpoint aborted — handle should be discarded since truncation already released it
writer.reset(initialSqn, checkpoint1sqn, checkpointId);

SnapshotResult<ChangelogStateHandleStreamImpl> result = future.get();
for (Tuple2<StreamStateHandle, Long> handleAndOffset :
result.getJobManagerOwnedSnapshot().getHandlesAndOffsets()) {
assertThat(uploader.isDiscarded(handleAndOffset.f0)).isTrue();
}
}
}

@Test
void testTruncate() {
assertThatThrownBy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,48 @@ void testNotDiscardedIfStoppedTracking() {
}
assertThat(handle.isDisposed()).isFalse();
}

@Test
void testRetainIncrementsRefCount() {
TaskChangelogRegistry registry = new TaskChangelogRegistryImpl(directExecutor());
TestingStreamStateHandle handle = new TestingStreamStateHandle();
long refCount = 1;
registry.startTracking(handle, refCount);
registry.retain(handle);
// Original refCount=1 + retain=1 → need 2 releases to discard
registry.release(handle);
assertThat(handle.isDisposed()).isFalse();
registry.release(handle);
assertThat(handle.isDisposed()).isTrue();
}

@Test
void testRetainPreventsDiscard() {
TaskChangelogRegistry registry = new TaskChangelogRegistryImpl(directExecutor());
TestingStreamStateHandle handle = new TestingStreamStateHandle();
long refCount = 2;
registry.startTracking(handle, refCount);
registry.retain(handle);
// Release the original 2 refs — should NOT discard because retain added one
registry.release(handle);
registry.release(handle);
assertThat(handle.isDisposed()).isFalse();
// Final release from the retain
registry.release(handle);
assertThat(handle.isDisposed()).isTrue();
}

@Test
void testRetainAndStopTracking() {
TaskChangelogRegistry registry = new TaskChangelogRegistryImpl(directExecutor());
TestingStreamStateHandle handle = new TestingStreamStateHandle();
long refCount = 1;
registry.startTracking(handle, refCount);
registry.retain(handle);
// stopTracking after retain — simulates JM taking ownership
registry.stopTracking(handle);
// Releases should not discard because stopTracking removed from registry
registry.release(handle);
assertThat(handle.isDisposed()).isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void testMaterialization() throws Exception {
sharedObjects.add(new AtomicInteger());
SharedReference<Set<StateHandleID>> currentMaterializationId =
sharedObjects.add(ConcurrentHashMap.newKeySet());
StreamExecutionEnvironment env = getEnv(checkpointFolder, 100, 2, 200, 0);
StreamExecutionEnvironment env = getEnv(checkpointFolder, 100, 2, 50, 0);
waitAndAssert(
buildJobGraph(
delegatedStateBackend,
Expand Down