Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f0d63a3
resetOffsetsAndBackfill using bounded stream supervisor
aho135 May 18, 2026
750037d
Reject non-positive backfillTaskCount
aho135 May 18, 2026
9953135
Reset supervisor after backfill Supervisor has already been started
aho135 May 19, 2026
a5f169b
Add helper method specHasConcurrentLocks
aho135 May 19, 2026
e999260
Fix doc reference
aho135 May 21, 2026
eec7d88
Move validations into helper function
aho135 May 21, 2026
6fdb183
Add embedded-test for resetSupervisorAndBackfill
aho135 May 21, 2026
d40aee2
Remove flaky waitUntilPublishedRecordsAreIngested
aho135 May 21, 2026
dc6920f
Update KafkaBoundedSupervisorTest.java
aho135 May 21, 2026
946347a
Wait for supervisor to be RUNNING
aho135 May 22, 2026
89b5fec
Use checkpointed offset if > requested reset offset to prevent duplic…
aho135 May 22, 2026
551e887
Update KafkaBoundedSupervisorTest.java
aho135 May 23, 2026
c503e21
Revert "Use checkpointed offset if > requested reset offset to preven…
aho135 May 26, 2026
6c10fca
Doc update - duplication notice and Kinesis callout
aho135 May 26, 2026
3898688
Rename endpoint from resetOffsetsAndBackfill to resetToLatestAndBackfill
aho135 May 28, 2026
a13169f
Update test name to reflect new endpoint
aho135 May 28, 2026
5860750
Address clean up from review comments
aho135 May 28, 2026
26c10f0
Log out start/end offsets
aho135 May 28, 2026
384cc6e
Add abstract createBackfillSpec
aho135 May 29, 2026
99755dc
Unit test createBackfillSpec
aho135 May 29, 2026
e00f33e
Fix deprecation notices
aho135 May 29, 2026
21a4ce7
Rename functions to align with new endpoint name
aho135 May 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions docs/api-reference/supervisor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -3539,6 +3539,109 @@ when the supervisor's tasks restart, they resume reading from `{"0": 100, "1": 1
```
</details>

### Reset offsets to latest and start a backfill supervisor

This endpoint is supported for Apache Kafka and RabbitMQ Stream supervisors. Amazon Kinesis is not supported yet.

Resets the supervisor to the latest available stream offsets and starts a new bounded backfill supervisor to ingest the data in the skipped range.

This endpoint is useful when a supervisor has fallen behind and you want to catch it up to the latest offsets without losing the skipped data. The main supervisor resumes ingesting from the latest offsets, while the backfill supervisor processes the range from the previously checkpointed offsets up to the latest offsets at the time of the reset.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the this section's title could also be:
Reset offset to latest and start a backfill supervisor


**Duplicate ingestion notice:** The main supervisor is not quiesced before the reset. This means duplicate data can occur in two ways:
- **Backfill overlap:** Any tasks that were in-flight at the time of the reset may publish segments covering part of the backfill range before being shut down.
- **Reset race:** If a task checkpoint is written to the metadata store between when this endpoint captures the current offsets and when it applies the reset, that checkpoint can be overwritten, causing the main supervisor to re-ingest already-processed data.

Both windows are narrow in practice, but cannot be fully eliminated without manually suspending the main supervisor before calling this endpoint and waiting for all pending tasks to complete.
Copy link
Copy Markdown
Contributor

@abhishekrb19 abhishekrb19 May 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems reasonable to me. If an exactly-once guarantee is truly required, operators can still perform these steps manually, right?

I’m not sure what it would take to fully bake suspend + handoff semantics into this API, but that’s something we can evolve in the future if needed. In the interim, as the docs call out, operators can still:

suspend supervisor → wait for tasks to checkpoint and complete → kick off backfill supervisor with checkpoints & reset main supervisor

(Fwiw, resetOffsetsAutomatically and hard resets have similar caveats around exactly-once semantics)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current code requires the Supervisor to be in RUNNING state. To get this working with SUSPENDED we can drop that requirement. The usage is a bit clunky though. It would look something like

  1. Suspend the supervisor and wait for all tasks to complete
  2. Call resetToLatestAndBackfill. Since the backfill supervisor reuses the main supervisor spec it also gets created in SUSPENDED state
  3. Manually resume both supervisors

Some minor modifications also need to be made to Kafka's updatePartitionLagFromStream. This function doesn't work in isolation as it requires first calling recordSupplier.assign(partitions) otherwise recordSupplier.seekToLatest(partitions) will throw an IllegalStateException. Currently this is handled by assignRecordSupplierToPartitionIds() in SeekableStreamSupervisor. Similar tweaks may be needed for Rabbit/Kinesis. I thought to leave this as a follow up if it's needed in the future


The following requirements must be met before calling this endpoint:

- The supervisor must be a [streaming supervisor](../ingestion/supervisor.md).
- The supervisor's `useEarliestSequenceNumber` property must be `false`.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this requirement needed? With useConcurrentLocks and two parallel supervisors, I'd imagine this would still work if the offset ranges are overlapping

Copy link
Copy Markdown
Contributor Author

@aho135 aho135 May 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is still a requirement even with separate supervisors. This is essentially multi-supervisor support which requires useConcurrentLocks, as the two supervisors can be writing to the same interval

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah, I was thinking about this requirement: The supervisor's useEarliestSequenceNumber property must be false

Thinking about it more, ingesting from earliest + backfill doesn’t make much sense for the generic reset case, so this seems like a useful guardrail.

- The supervisor context must have `useConcurrentLocks` set to `true` to allow the backfill supervisor's tasks to write concurrently with the main supervisor's tasks.
- The supervisor must be in a `RUNNING` state.

The backfill supervisor has the same configuration as the source supervisor except for its ID, which takes the form `{supervisorId}_backfill_{randomSuffix}`, and its `boundedStreamConfig`, which is set to the skipped offset range. If `backfillTaskCount` is specified, it overrides the `taskCount` for the backfill supervisor only.

#### URL

`POST` `/druid/indexer/v1/supervisor/{supervisorId}/resetToLatestAndBackfill`

#### Query parameters

| Parameter | Type | Description | Default |
|---------|---------|---------|---------|
| `backfillTaskCount` | Integer | Number of parallel tasks for the backfill supervisor. | Defaults to `taskCount` from the source supervisor if not specified |

#### Responses

<Tabs>

<TabItem value="5" label="200 SUCCESS">


*Successfully reset and started backfill supervisor*

</TabItem>
<TabItem value="6" label="400 BAD REQUEST">


*Supervisor does not meet requirements (wrong type, `useEarliestSequenceNumber` is true, `useConcurrentLocks` not enabled, or supervisor not RUNNING)*

</TabItem>
<TabItem value="7" label="404 NOT FOUND">


*Invalid supervisor ID*

</TabItem>
<TabItem value="8" label="500 SERVER ERROR">


*Failed to retrieve stream offsets or serialize the backfill spec*

</TabItem>
</Tabs>

---

#### Sample request

The following example resets a supervisor named `social_media` and starts a backfill supervisor with 2 tasks.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For follow-up, what do you think about wiring this API to the web-console?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah good call, will do this in a follow up!


<Tabs>

<TabItem value="9" label="cURL">


```shell
curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetToLatestAndBackfill?backfillTaskCount=2"
```

</TabItem>
<TabItem value="10" label="HTTP">


```HTTP
POST /druid/indexer/v1/supervisor/social_media/resetToLatestAndBackfill?backfillTaskCount=2 HTTP/1.1
Host: http://ROUTER_IP:ROUTER_PORT
```

</TabItem>
</Tabs>

#### Sample response

<details>
<summary>View the response</summary>

```json
{
"id": "social_media",
"backfillSupervisorId": "social_media_backfill_abcdefgh"
}
```
</details>

### Terminate a supervisor

Terminates a supervisor and its associated indexing tasks, triggering the publishing of their segments. When you terminate a supervisor, Druid places a tombstone marker in the metadata store to prevent reloading on restart.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,48 @@ public void test_boundedSupervisor_doesNotSilentlyCompleteWhenStaleOffsetExceeds
Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status2.getState(), "Supervisor state should be UNHEALTHY_SUPERVISOR");
}

@Test
public void test_resetToLatestAndBackfill()
{
final String topic = IdUtils.getRandomId();
kafkaServer.createTopicWithPartitions(topic, 2);

// Create a streaming supervisor with concurrent locks and withUseEarliestSequenceNumber=false
final KafkaSupervisorSpec supervisor = createKafkaSupervisor(kafkaServer)
.withContext(Map.of("useConcurrentLocks", true))
.withIoConfig(io -> io
.withKafkaInputFormat(new JsonInputFormat(null, null, null, null, null))
.withUseEarliestSequenceNumber(false)
)
.build(dataSource, topic);

cluster.callApi().postSupervisor(supervisor);

waitForSupervisorDetailedState(supervisor.getId(), "RUNNING");

final int totalRecords = publish1kRecords(topic, false);
waitUntilPublishedRecordsAreIngested(totalRecords);

// Reset the main supervisor and spin up a backfill supervisor.
// Since all records are already ingested before the call, the backfill
// supervisor will complete immediately without ingesting anything.
final Map<String, Object> result = cluster.callApi().resetToLatestAndBackfill(supervisor.getId());
Assertions.assertEquals(supervisor.getId(), result.get("id"));
final String backfillSupervisorId = (String) result.get("backfillSupervisorId");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For completeness, also validate that the primary supervisor ID is also included in the response of this API?


// Wait for the backfill to finish
waitForSupervisorToComplete(backfillSupervisorId);

// Main supervisor should still be running
final SupervisorStatus mainStatus = cluster.callApi().getSupervisorStatus(supervisor.getId());
Assertions.assertEquals("RUNNING", mainStatus.getState());
Assertions.assertTrue(mainStatus.isHealthy());

final SupervisorStatus backfillStatus = cluster.callApi().getSupervisorStatus(backfillSupervisorId);
Assertions.assertEquals("COMPLETED", backfillStatus.getState());
Assertions.assertTrue(backfillStatus.isHealthy());
}

private void waitForSupervisorToComplete(String supervisorId)
{
overlord.latchableEmitter().waitForEvent(
Expand All @@ -301,6 +343,15 @@ private void waitForSupervisorToComplete(String supervisorId)
);
}

private void waitForSupervisorDetailedState(String supervisorId, String detailedState)
{
overlord.latchableEmitter().waitForEvent(
event -> event.hasMetricName("supervisor/count")
.hasDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
.hasDimension("detailedState", detailedState)
);
}

private void waitForSupervisorToBeUnhealthy(String supervisorId)
{
overlord.latchableEmitter().waitForEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ protected Map<String, Long> getTimeLagPerPartition(Map<String, Long> currentOffs
}

@Override
protected RabbitStreamDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<String, Long> map)
public RabbitStreamDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<String, Long> map)
{
return new RabbitStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map));
}
Expand Down Expand Up @@ -408,7 +408,7 @@ public LagStats computeLagStats()
}

@Override
protected void updatePartitionLagFromStream()
public void updatePartitionLagFromStream()
{
getRecordSupplierLock().lock();

Expand All @@ -435,7 +435,7 @@ protected void updatePartitionLagFromStream()
}

@Override
protected Map<String, Long> getLatestSequencesFromStream()
public Map<String, Long> getLatestSequencesFromStream()
{
return latestSequenceFromStream != null ? latestSequenceFromStream : new HashMap<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.rabbitstream.RabbitStreamIndexTaskClientFactory;
import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
Expand Down Expand Up @@ -155,6 +156,55 @@ protected RabbitStreamSupervisorSpec toggleSuspend(boolean suspend)
supervisorStateManagerConfig);
}

@Override
public RabbitStreamSupervisorSpec createBackfillSpec(
String backfillId,
BoundedStreamConfig boundedStreamConfig,
@Nullable Integer taskCount
)
{
RabbitStreamSupervisorIOConfig ioConfig = getSpec().getIOConfig();
RabbitStreamSupervisorIOConfig backfillIoConfig = new RabbitStreamSupervisorIOConfig(
ioConfig.getStream(),
ioConfig.getUri(),
ioConfig.getInputFormat(),
ioConfig.getReplicas(),
taskCount != null ? taskCount : ioConfig.getTaskCount(),
ioConfig.getTaskDuration().toPeriod(),
ioConfig.getConsumerProperties(),
ioConfig.getAutoScalerConfig(),
ioConfig.getPollTimeout(),
ioConfig.getStartDelay().toPeriod(),
ioConfig.getPeriod().toPeriod(),
ioConfig.getCompletionTimeout().toPeriod(),
ioConfig.isUseEarliestSequenceNumber(),
ioConfig.getLateMessageRejectionPeriod().isPresent() ? ioConfig.getLateMessageRejectionPeriod().get().toPeriod() : null,
ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? ioConfig.getEarlyMessageRejectionPeriod().get().toPeriod() : null,
ioConfig.getLateMessageRejectionStartDateTime().isPresent() ? ioConfig.getLateMessageRejectionStartDateTime().get() : null,
ioConfig.getStopTaskCount(),
ioConfig.getServerPriorityToReplicas(),
boundedStreamConfig
);
return new RabbitStreamSupervisorSpec(
backfillId,
null,
getSpec().getDataSchema(),
getSpec().getTuningConfig(),
backfillIoConfig,
getContext(),
isSuspended(),
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
(RabbitStreamIndexTaskClientFactory) indexTaskClientFactory,
mapper,
emitter,
monitorSchedulerConfig,
rowIngestionMetersFactory,
supervisorStateManagerConfig
);
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ protected Map<KafkaTopicPartition, Long> getTimeLagPerPartition(Map<KafkaTopicPa
}

@Override
protected KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<KafkaTopicPartition, Long> map)
public KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<KafkaTopicPartition, Long> map)
{
return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map));
}
Expand Down Expand Up @@ -548,7 +548,7 @@ private Map<KafkaTopicPartition, Long> getTimestampPerPartitionAtCurrentOffset(S
* </p>
*/
@Override
protected void updatePartitionLagFromStream()
public void updatePartitionLagFromStream()
{
if (getIoConfig().isEmitTimeLagMetrics()) {
updatePartitionTimeAndRecordLagFromStream();
Expand Down Expand Up @@ -597,7 +597,7 @@ private void updateOffsetSnapshot(
}

@Override
protected Map<KafkaTopicPartition, Long> getLatestSequencesFromStream()
public Map<KafkaTopicPartition, Long> getLatestSequencesFromStream()
{
return offsetSnapshotRef.get().getLatestOffsetsFromStream();
}
Expand Down Expand Up @@ -630,7 +630,7 @@ protected boolean isMultiTopic()
* Gets the offsets as stored in the metadata store. The map returned will only contain
* offsets from topic partitions that match the current supervisor config stream. This
* override is needed because in the case of multi-topic, a user could have updated the supervisor
* config from single topic to mult-topic, where the new multi-topic pattern regex matches the
* config from single topic to multi-topic, where the new multi-topic pattern regex matches the
* old config single topic. Without this override, the previously stored metadata for the single
* topic would be deemed as different from the currently configure stream, and not be included in
* the offset map returned. This implementation handles these cases appropriately.
Expand All @@ -640,7 +640,7 @@ protected boolean isMultiTopic()
* updated to single topic or multi-topic depending on the supervisor config, as needed.
*/
@Override
protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
public Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
{
final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
if (checkSourceMetadataMatch(dataSourceMetadata)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
Expand Down Expand Up @@ -173,6 +174,59 @@ protected KafkaSupervisorSpec toggleSuspend(boolean suspend)
);
}

@Override
public KafkaSupervisorSpec createBackfillSpec(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks much cleaner - thanks for adding this!

String backfillId,
BoundedStreamConfig boundedStreamConfig,
@Nullable Integer taskCount
)
{
KafkaSupervisorIOConfig ioConfig = getSpec().getIOConfig();
KafkaSupervisorIOConfig backfillIoConfig = new KafkaSupervisorIOConfig(
ioConfig.getTopic(),
ioConfig.getTopicPattern(),
ioConfig.getInputFormat(),
ioConfig.getReplicas(),
taskCount != null ? taskCount : ioConfig.getTaskCount(),
ioConfig.getTaskDuration().toPeriod(),
ioConfig.getConsumerProperties(),
ioConfig.getAutoScalerConfig(),
ioConfig.getLagAggregator(),
ioConfig.getPollTimeout(),
ioConfig.getStartDelay().toPeriod(),
ioConfig.getPeriod().toPeriod(),
ioConfig.isUseEarliestSequenceNumber(),
ioConfig.getCompletionTimeout().toPeriod(),
ioConfig.getLateMessageRejectionPeriod().isPresent() ? ioConfig.getLateMessageRejectionPeriod().get().toPeriod() : null,
ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? ioConfig.getEarlyMessageRejectionPeriod().get().toPeriod() : null,
ioConfig.getLateMessageRejectionStartDateTime().isPresent() ? ioConfig.getLateMessageRejectionStartDateTime().get() : null,
ioConfig.getConfigOverrides(),
ioConfig.getIdleConfig(),
ioConfig.getStopTaskCount(),
ioConfig.isEmitTimeLagMetrics(),
ioConfig.getServerPriorityToReplicas(),
boundedStreamConfig
);
return new KafkaSupervisorSpec(
backfillId,
null,
getSpec().getDataSchema(),
getSpec().getTuningConfig(),
backfillIoConfig,
getContext(),
isSuspended(),
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
(KafkaIndexTaskClientFactory) indexTaskClientFactory,
mapper,
emitter,
monitorSchedulerConfig,
rowIngestionMetersFactory,
supervisorStateManagerConfig
);
}

/**
* Extends {@link SeekableStreamSupervisorSpec#validateSpecUpdateTo} to ensure that the proposed spec and current spec are either both multi-topic or both single-topic.
* <p>
Expand Down
Loading
Loading