
feat: resetOffsetsAndBackfill using bounded stream supervisor #19477

Open
aho135 wants to merge 21 commits into apache:master from aho135:resetOffsetsAndBackfill-bounded-stream

Contributor

@aho135 aho135 commented May 18, 2026

This change adds an endpoint called resetOffsetsAndBackfill to SupervisorResource. This is a useful feature for operating Druid clusters where the most recent data is the most important (such as alerting use cases).

Description

Adds an endpoint called resetOffsetsAndBackfill that automatically ingests the data skipped when a supervisor's offsets are reset to latest. The endpoint has three requirements:

  • useEarliestOffset=false.
  • useConcurrentLocks=true, because the backfill task and the main supervisor tasks can write to conflicting time intervals.
  • The Supervisor must be in a running state, so that it can call updatePartitionLagFromStream() to get the latest offsets.

Release note

Adds an endpoint called resetOffsetsAndBackfill for Stream Supervisors. This is a useful feature for operating Druid clusters where the most recent data is the most important.


Key changed/added classes in this PR
  • SupervisorResource
  • SupervisorManager

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@aho135 aho135 mentioned this pull request May 18, 2026
9 tasks
Member

@FrankChen021 FrankChen021 left a comment

| Severity | Findings |
| --- | --- |
| P0 | 0 |
| P1 | 1 |
| P2 | 2 |
| P3 | 0 |
| Total | 3 |

Reviewed 11 of 11 changed files.


This is an automated review by Codex GPT-5.5


  @Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()
Member

[P2] Kinesis cannot provide backfill end offsets

The new manager path calls updatePartitionLagFromStream() and then getLatestSequencesFromStream(), but Kinesis only updates time lag here and does not override getLatestSequencesFromStream(), so it inherits the base empty map. Any Kinesis supervisor that passes the earlier checks will fail with empty latest offsets instead of starting a backfill.
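The inheritance gap described in this finding can be illustrated with a minimal sketch. All class names below are hypothetical stand-ins rather than the real Druid classes, and the offsets are made-up values; the only behavior taken from the review is that the base method returns an empty map and the manager rejects empty end offsets.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for the base supervisor class; not the real Druid type.
abstract class StreamSupervisorSketch
{
  // Base behavior described in the review: no latest offsets unless a subclass overrides this.
  Map<String, Long> getLatestSequencesFromStream()
  {
    return Collections.emptyMap();
  }
}

// Overrides the method, so end offsets are available (values are illustrative).
class KafkaLikeSupervisor extends StreamSupervisorSketch
{
  @Override
  Map<String, Long> getLatestSequencesFromStream()
  {
    return Map.of("0", 1200L, "1", 980L);
  }
}

// Does not override the method, so it inherits the empty map.
class KinesisLikeSupervisor extends StreamSupervisorSketch
{
}

class BackfillEndOffsetsSketch
{
  // Mirrors the manager's empty-offset check: an empty map aborts the backfill.
  static Map<String, Long> capture(StreamSupervisorSketch supervisor)
  {
    Map<String, Long> endOffsets = supervisor.getLatestSequencesFromStream();
    if (endOffsets == null || endOffsets.isEmpty()) {
      throw new IllegalStateException("Failed to get latest offsets from stream");
    }
    return endOffsets;
  }
}
```

In this sketch, `capture(new KinesisLikeSupervisor())` always throws, which corresponds to the server-side failure the finding describes.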

Contributor Author

Good callout. @jaykanakiya will be tackling Kinesis support in a separate PR.

Member

Thanks for confirming. I reviewed all 11 changed files in the current head; since this PR still exposes /resetOffsetsAndBackfill to every SeekableStreamSupervisor, Kinesis still reaches SupervisorManager.resetSupervisorAndBackfill(), calls updatePartitionLagFromStream(), then sees the base empty getLatestSequencesFromStream() and returns a 500. If Kinesis support is intentionally deferred, can this PR explicitly reject or disable Kinesis and document the supported supervisor types, rather than leaving the new endpoint available but broken until the follow-up PR lands?

Contributor Author

Thanks for the review @FrankChen021! I'm not able to do an instanceof check in SupervisorManager due to module dependencies. I could add something like supportsResetAndBackfill to SeekableStreamSupervisor, but that seems like overkill given that it would be reverted once getLatestSequencesFromStream() is implemented for Kinesis. Given that the current response of 500 is accurate, I think it's fine for now.

Alternatively, we can hold off on merging this until the Kinesis changes are in.
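For reference, the capability-flag alternative mentioned in this reply might look roughly like the sketch below. Only the method name `supportsResetAndBackfill` comes from the comment; the class names, the default of `false`, and the exception type are assumptions made for illustration.

```java
// Hypothetical base class; stands in for the shared supervisor type the manager can see.
abstract class CapableSupervisorSketch
{
  // Assumed default: a supervisor does not support the feature unless it opts in.
  boolean supportsResetAndBackfill()
  {
    return false;
  }
}

class KafkaCapableSketch extends CapableSupervisorSketch
{
  @Override
  boolean supportsResetAndBackfill()
  {
    return true;
  }
}

// Inherits the default, so the manager rejects it without any instanceof check.
class KinesisNotYetCapableSketch extends CapableSupervisorSketch
{
}

class CapabilityCheckSketch
{
  // Rejects unsupported supervisors up front with an argument error instead of a later state error.
  static void requireBackfillSupport(String id, CapableSupervisorSketch supervisor)
  {
    if (!supervisor.supportsResetAndBackfill()) {
      throw new IllegalArgumentException("Supervisor[" + id + "] does not support reset and backfill");
    }
  }
}
```

The trade-off raised in the thread still applies: a flag like this would become dead code once every stream supervisor can report latest offsets.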

Contributor

Yeah, I think returning a 5xx should be good enough.

We could probably add a one-line limitation in the docs for completeness, noting that the feature doesn’t apply to Kinesis supervisors? That can be removed once the gap is addressed in a future patch, depending on when it lands.

Member

Thanks, that sounds reasonable if Kinesis support is intentionally deferred. The current docs still say the endpoint applies to a streaming supervisor, and the same page describes streaming supervisors as Kafka or Kinesis, so Kinesis users would still be directed to an endpoint that is expected to 500. Reviewed 11 of 11 changed files; I think this just needs the explicit Kinesis limitation in the new docs section.

Contributor

@abhishekrb19 abhishekrb19 left a comment

Thanks @aho135! Left some comments

Comment thread docs/api-reference/supervisor-api.md Outdated
The following requirements must be met before calling this endpoint:

- The supervisor must be a `SeekableStreamSupervisor`.
- The supervisor's `useEarliestSequenceNumber` property must be `false`.
Contributor

Is this requirement needed? With useConcurrentLocks and two parallel supervisors, I'd imagine this would still work if the offset ranges are overlapping

Contributor Author

@aho135 aho135 May 21, 2026

Yeah this is still a requirement even with separate supervisors. This is essentially multi-supervisor support which requires useConcurrentLocks, as the two supervisors can be writing to the same interval

Contributor

Ah yeah, I was thinking about this requirement: The supervisor's useEarliestSequenceNumber property must be false

Thinking about it more, ingesting from earliest + backfill doesn’t make much sense for the generic reset case, so this seems like a useful guardrail.


}

@Test
public void testResetSupervisorAndBackfill() throws Exception
Contributor

Thanks for the tests @aho135! Do you think it’d be possible to extend the embedded test KafkaBoundedSupervisorTest you added to include some end-to-end coverage for this API as well?

Comment on lines +393 to +433
Preconditions.checkState(started, "SupervisorManager not started");
Preconditions.checkNotNull(id, "id");

Pair<Supervisor, SupervisorSpec> supervisorPair = supervisors.get(id);
if (!(supervisorPair.lhs instanceof SeekableStreamSupervisor)) {
  throw new IAE("Supervisor[%s] is not a SeekableStreamSupervisor", id);
}
SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) supervisorPair.lhs;
SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) supervisorPair.rhs;

// Verify useEarliestOffset is false
if (streamSupervisor.getIoConfig().isUseEarliestSequenceNumber()) {
  throw new IAE("Reset with skipped offsets is not supported when useEarliestOffset is true.");
}

if (!specHasConcurrentLocks(streamSpec)) {
  throw new IAE(
      "Backfill tasks require 'useConcurrentLocks' to be set to true in the supervisor context to allow concurrent writes with the main supervisor tasks"
  );
}

// We need an active recordSupplier to query the latest offsets from the stream
if (supervisorPair.lhs.getState() != SupervisorStateManager.BasicState.RUNNING) {
  throw new IAE("Supervisor[%s] must be in a RUNNING state to perform a reset and backfill", id);
}

log.info("Capturing latest offsets from stream for supervisor[%s]", id);
streamSupervisor.updatePartitionLagFromStream();
Map<?, ?> endOffsets = streamSupervisor.getLatestSequencesFromStream();

log.info("Capturing checkpointed offsets for supervisor[%s]", id);
Map<?, ?> startOffsets = streamSupervisor.getOffsetsFromMetadataStorage();

// Validate that we successfully retrieved offsets
if (endOffsets == null || endOffsets.isEmpty()) {
  throw new ISE("Skipping reset: Failed to get latest offsets from stream for supervisor[%s]", id);
}
if (startOffsets == null || startOffsets.isEmpty()) {
  throw new ISE("Skipping reset: Failed to get checkpointed offsets for supervisor[%s]", id);
}

Contributor

nit: It’ll likely be cleaner to move these validations into their own function
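A minimal sketch of what that extraction could look like is shown below. It uses a simplified, hypothetical `ResetPreconditions` holder in place of the real supervisor and spec objects, and plain JDK exceptions in place of IAE, so it only illustrates the shape of the refactor rather than the code that landed in the PR.

```java
// Simplified, hypothetical view of the state that the checks above inspect.
class ResetPreconditions
{
  final boolean seekableStream;
  final boolean useEarliestOffset;
  final boolean useConcurrentLocks;
  final boolean running;

  ResetPreconditions(boolean seekableStream, boolean useEarliestOffset, boolean useConcurrentLocks, boolean running)
  {
    this.seekableStream = seekableStream;
    this.useEarliestOffset = useEarliestOffset;
    this.useConcurrentLocks = useConcurrentLocks;
    this.running = running;
  }
}

class ResetValidationSketch
{
  // All argument checks live in one method, so the caller reads as: validate, capture offsets, reset.
  static void validateResetAndBackfill(String id, ResetPreconditions state)
  {
    if (!state.seekableStream) {
      throw new IllegalArgumentException("Supervisor[" + id + "] is not a SeekableStreamSupervisor");
    }
    if (state.useEarliestOffset) {
      throw new IllegalArgumentException("Reset with skipped offsets is not supported when useEarliestOffset is true.");
    }
    if (!state.useConcurrentLocks) {
      throw new IllegalArgumentException("Backfill tasks require 'useConcurrentLocks' to be set to true");
    }
    if (!state.running) {
      throw new IllegalArgumentException("Supervisor[" + id + "] must be in a RUNNING state");
    }
  }
}
```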

@github-actions github-actions Bot added the Area - Batch Ingestion and Area - MSQ (For multi stage queries - https://github.com/apache/druid/issues/12262) labels May 21, 2026
Member

@FrankChen021 FrankChen021 left a comment

| Severity | Findings |
| --- | --- |
| P0 | 0 |
| P1 | 1 |
| P2 | 0 |
| P3 | 0 |
| Total | 1 |

Reviewed 17 of 17 changed files.


This is an automated review by Codex GPT-5.5

endOffsets
);

streamSupervisor.resetOffsets(resetMetadata);
Member

[P1] Avoid resetting the main supervisor with stale offsets

The main supervisor keeps ingesting while this method captures endOffsets/startOffsets, creates the backfill supervisor, and only then calls resetOffsets. If a running task checkpoints after the captured endOffsets but before this call, resetOffsetsInternal builds currentMetadata.plus(resetMetadata), and plus overwrites the newer partition value with the stale captured end offset. That rolls the main supervisor checkpoint backward and can reingest rows that were already published. The reset should be conditional on metadata still matching the captured range, retried with fresh offsets, or performed while the main supervisor is quiesced.
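The rollback can be reproduced with a small sketch. It assumes, as the finding states, that `plus` behaves like an overwrite-style map merge; the class name, the partition number, and the offsets are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class StaleResetSketch
{
  // Mimics an overwrite-style merge: every value in `reset` replaces the value in `current`.
  static Map<Integer, Long> plus(Map<Integer, Long> current, Map<Integer, Long> reset)
  {
    Map<Integer, Long> merged = new HashMap<>(current);
    merged.putAll(reset);
    return merged;
  }
}
```

If partition 0 was captured at offset 100 and a task then checkpoints offset 130, `plus(Map.of(0, 130L), Map.of(0, 100L))` yields offset 100 for partition 0, so the checkpoint moves backward by 30 records.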

Contributor Author

Thanks @FrankChen021. This should be a very rare scenario, since I expect this endpoint to be used only when there is some lag built up. I did push a change to prefer the checkpointed offset if it is greater than the requested reset offset, to handle this edge case.
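The "prefer the checkpointed offset if it is greater" rule amounts to a per-partition maximum. The sketch below shows that idea only; it is not the code pushed in the PR, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class ForwardOnlyResetSketch
{
  // For each partition, keep whichever offset is further ahead.
  static Map<Integer, Long> forwardOnly(Map<Integer, Long> checkpointed, Map<Integer, Long> requested)
  {
    Map<Integer, Long> merged = new HashMap<>(checkpointed);
    requested.forEach((partition, offset) -> merged.merge(partition, offset, Long::max));
    return merged;
  }
}
```

This protects only against checkpoints that are already visible when the merge reads them; a checkpoint written after the read is still invisible to it.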

Contributor Author

@aho135 aho135 May 22, 2026

The issue still stands that there can be duplicate ingestion with the backfill Supervisor if the main Supervisor checkpoints after startOffsets is captured. To completely remove this race, I think we'd need to suspend the supervisor and wait for all tasks to shut down gracefully. This is something I was trying to avoid, as we would then need to deal with timeouts and such. Let me know your thoughts on this one though.

Member

Thanks for adding resetOffsetsForwardOnly; it handles the common case where the checkpoint has already advanced by the time the reset notice reads metadata. I still think the rollback race is not fully closed, though: resetOffsetsInternal reads current metadata, computes the max map, then calls resetDataSourceMetadata, and that storage method does an unconditional UPDATE ... WHERE dataSource = :dataSource. If a task publishes/checkpoints after the read but before that update, the max is computed from stale metadata and the reset can still overwrite the newer checkpoint. So I would keep this open unless the reset write is CAS/retried against the metadata it read, or the main supervisor is quiesced before capturing/resetting. I agree the backfill-overlap duplicate case is separate and requires either accepting/documenting possible duplicates or suspending the main supervisor. Reviewed 17 of 17 changed files.
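To make the CAS/retry suggestion concrete, here is a minimal sketch that uses an in-memory `AtomicReference` as a stand-in for the metadata row. The names are hypothetical, and a real metadata store would need a conditional UPDATE (or a version column) to get the same effect.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class CasResetSketch
{
  // Retries the read-merge-write cycle until the write lands on exactly the value that was read.
  static Map<Integer, Long> resetWithCas(
      AtomicReference<Map<Integer, Long>> store,
      Map<Integer, Long> requested,
      int maxAttempts
  )
  {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      Map<Integer, Long> read = store.get();
      Map<Integer, Long> merged = new HashMap<>(read);
      requested.forEach((partition, offset) -> merged.merge(partition, offset, Long::max));
      Map<Integer, Long> next = Collections.unmodifiableMap(merged);

      // compareAndSet succeeds only if the stored reference is still the one read above,
      // so a checkpoint written in between forces another attempt with fresh metadata.
      if (store.compareAndSet(read, next)) {
        return next;
      }
    }
    throw new IllegalStateException("Metadata kept changing; reset not applied after " + maxAttempts + " attempts");
  }
}
```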

Contributor Author

@aho135 aho135 May 24, 2026

Thanks for the review @FrankChen021

I am leaning towards reverting the resetOffsetsForwardOnly changes. My reasoning is that the "backfill-overlap duplicate case" is not solvable without suspending the supervisor and shutting down its tasks. I expect that this duplication scenario is more common than the one resetOffsetsForwardOnly addresses, since it would only make sense to use this endpoint when there is some lag built up. If we're not solving that scenario and are accepting that this endpoint has the potential for duplicate ingestion, then I think it makes sense to accept the limitation and document it rather than add complexity to solve the even rarer duplication scenario.

Since many data streaming pipelines follow at-least-once ingestion, and the duplication scenario is rare and bounded, I think this is a reasonable trade-off to avoid dealing with suspension and task shutdown.

Member

Thanks, that trade-off makes sense for the separate backfill-overlap duplicate case. I would still keep the reset metadata issue distinct, though. The current resetOffsetsForwardOnly code only protects against checkpoints that are already visible when the reset notice reads metadata; after that it computes the max offsets and calls resetDataSourceMetadata, which still performs an unconditional update. A task checkpoint between that read and update can still be overwritten. If the endpoint intentionally accepts that as bounded at-least-once duplicate risk, the docs should say the main supervisor is not quiesced and duplicate ingestion can happen both from backfill overlap and from this reset race. Otherwise this still needs CAS/retry or suspend/drain before the reset.

Reviewed 17 of 17 changed files.

Contributor Author

@FrankChen021 this makes sense. I went ahead and reverted the resetOffsetsForwardOnly changes and updated the docs explaining the two duplication scenarios. Appreciate the review on this one!

aho135 added 4 commits May 22, 2026 14:51
…t duplicate ingestion"

resetOffsetsForwardOnly does not fully close the race it targets (the write is
still unconditional) and the duplicate scenario it addresses is narrower than
the overlap case, which cannot be solved without suspending the main supervisor.
Accepting the limitation and documenting it is preferable to the added complexity.

This reverts commit 89b5fec.
Member

@FrankChen021 FrankChen021 left a comment

| Severity | Findings |
| --- | --- |
| P0 | 0 |
| P1 | 0 |
| P2 | 1 |
| P3 | 0 |
| Total | 1 |

The reset/backfill duplicate-ingestion follow-up is handled: the forward-only reset helper was reverted and the docs now describe both the backfill-overlap and reset-race duplicate windows.

Reviewed 17 of 17 changed files.


This is an automated review by Codex GPT-5.5


private void validateResetAndBackfill(String id, Pair<Supervisor, SupervisorSpec> supervisorPair)
{
  if (!(supervisorPair.lhs instanceof SeekableStreamSupervisor)) {
Member

[P2] Reject unsupported Kinesis supervisors before reset/backfill

The new endpoint is documented as unsupported for Amazon Kinesis, but this guard accepts any SeekableStreamSupervisor. KinesisSupervisor extends that base class and does not override getLatestSequencesFromStream, so a Kinesis request that otherwise passes the useEarliest/useConcurrent/RUNNING checks reaches the empty latest-offset check and is returned as a 500 instead of a clear unsupported-supervisor 400. Please either reject Kinesis/supervisors that cannot provide latest offsets here, or implement the latest-offset capture needed for Kinesis.

Contributor Author

@aho135 aho135 May 27, 2026

Per our earlier conversation, I will leave this one as is. We have the Kinesis implementation planned within the next two weeks.

Member

@FrankChen021 FrankChen021 left a comment

I have reviewed the code for correctness, edge cases, concurrency, and integration risks; no issues found.

Follow-up checked: the Kinesis reset/backfill concern has been acknowledged as a known gap with a planned implementation, so no inline reply is useful here.

Reviewed 17 of 17 changed files.


This is an automated review by Codex GPT-5.5


private void validateResetAndBackfill(String id, Pair<Supervisor, SupervisorSpec> supervisorPair)
{
  if (!(supervisorPair.lhs instanceof SeekableStreamSupervisor)) {
Contributor

nit: maybe make this invoke requireStreamSupervisor() for consistency with the other manager operations

Contributor Author

@aho135 aho135 May 28, 2026

On second thought, I'm thinking to leave this as is since in this case we'd be discarding the returned value and the exception handling differs slightly

- **Backfill overlap:** Any tasks that were in-flight at the time of the reset may publish segments covering part of the backfill range before being shut down.
- **Reset race:** If a task checkpoint is written to the metadata store between when this endpoint captures the current offsets and when it applies the reset, that checkpoint can be overwritten, causing the main supervisor to re-ingest already-processed data.

Both windows are narrow in practice, but cannot be fully eliminated without manually suspending the main supervisor before calling this endpoint and waiting for all pending tasks to complete.
Contributor

@abhishekrb19 abhishekrb19 May 28, 2026

This seems reasonable to me. If an exactly-once guarantee is truly required, operators can still perform these steps manually, right?

I’m not sure what it would take to fully bake suspend + handoff semantics into this API, but that’s something we can evolve in the future if needed. In the interim, as the docs call out, operators can still:

suspend supervisor → wait for tasks to checkpoint and complete → kick off backfill supervisor with checkpoints & reset main supervisor

(Fwiw, resetOffsetsAutomatically and hard resets have similar caveats around exactly-once semantics)
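The manual sequence outlined in this comment could be scripted along the lines of the sketch below. Every name in `SupervisorOpsSketch` is hypothetical and does not correspond to a Druid API; the timeout handling and the final resume step are assumptions added so the ordering can be shown end to end.

```java
import java.util.Map;

// Hypothetical operations an operator script would need; none of these names are Druid APIs.
interface SupervisorOpsSketch
{
  void suspend(String id);
  boolean hasActiveTasks(String id);
  Map<Integer, Long> checkpointedOffsets(String id);
  Map<Integer, Long> latestOffsets(String id);
  void startBackfill(String id, Map<Integer, Long> start, Map<Integer, Long> end);
  void resetOffsets(String id, Map<Integer, Long> offsets);
  void resume(String id);
}

class ManualBackfillSketch
{
  static void run(SupervisorOpsSketch ops, String id, long timeoutMillis, long pollMillis)
  {
    ops.suspend(id);

    // Wait for in-flight tasks to checkpoint and complete, giving up after the timeout.
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (ops.hasActiveTasks(id)) {
      if (System.currentTimeMillis() > deadline) {
        throw new IllegalStateException("Tasks for supervisor[" + id + "] did not finish in time");
      }
      try {
        Thread.sleep(pollMillis);
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for tasks", e);
      }
    }

    // With no tasks running, the checkpoint cannot move while the range is captured.
    Map<Integer, Long> start = ops.checkpointedOffsets(id);
    Map<Integer, Long> end = ops.latestOffsets(id);

    ops.startBackfill(id, start, end);
    ops.resetOffsets(id, end);
    ops.resume(id);
  }
}
```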

Contributor Author

The current code requires the Supervisor to be in RUNNING state. To get this working with SUSPENDED, we can drop that requirement. The usage is a bit clunky though. It would look something like:

  1. Suspend the supervisor and wait for all tasks to complete
  2. Call resetToLatestAndBackfill. Since the backfill supervisor reuses the main supervisor spec it also gets created in SUSPENDED state
  3. Manually resume both supervisors

Some minor modifications would also be needed in Kafka's updatePartitionLagFromStream. This function doesn't work in isolation: it requires first calling recordSupplier.assign(partitions), otherwise recordSupplier.seekToLatest(partitions) throws an IllegalStateException. Currently this is handled by assignRecordSupplierToPartitionIds() in SeekableStreamSupervisor. Similar tweaks may be needed for Rabbit/Kinesis. I thought to leave this as a follow-up if it's needed in the future.
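The ordering constraint mentioned here can be shown with a toy record supplier. `ToyRecordSupplier` is a hypothetical stand-in that only imitates the assign-before-seek rule; it is not Druid's record supplier or the Kafka consumer.

```java
import java.util.HashSet;
import java.util.Set;

// Toy record supplier that mimics the ordering constraint described above.
class ToyRecordSupplier
{
  private final Set<Integer> assigned = new HashSet<>();

  void assign(Set<Integer> partitions)
  {
    assigned.clear();
    assigned.addAll(partitions);
  }

  // Seeking a partition that was never assigned is rejected.
  void seekToLatest(Set<Integer> partitions)
  {
    if (!assigned.containsAll(partitions)) {
      throw new IllegalStateException("No current assignment for partitions " + partitions);
    }
  }
}

class LagRefreshSketch
{
  // Assign first, then seek, so the refresh works even when nothing else has assigned partitions.
  static void refreshLatest(ToyRecordSupplier supplier, Set<Integer> partitions)
  {
    supplier.assign(partitions);
    supplier.seekToLatest(partitions);
  }
}
```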


#### Sample request

The following example resets a supervisor named `social_media` and starts a backfill supervisor with 2 tasks.
Contributor

For follow-up, what do you think about wiring this API to the web-console?

Contributor Author

Yeah, good call. Will do this in a follow-up!

Member

@FrankChen021 FrankChen021 left a comment

I have reviewed the code for correctness, edge cases, concurrency, and integration risks; no issues found.

Reviewed 23 of 23 changed files.


This is an automated review by Codex GPT-5.5

4 participants