-
Notifications
You must be signed in to change notification settings - Fork 3.8k
feat: resetOffsetsAndBackfill using bounded stream supervisor #19477
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
f0d63a3
750037d
9953135
a5f169b
e999260
eec7d88
6fdb183
d40aee2
dc6920f
946347a
89b5fec
551e887
c503e21
6c10fca
3898688
a13169f
5860750
26c10f0
384cc6e
99755dc
e00f33e
21a4ce7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3539,6 +3539,109 @@ when the supervisor's tasks restart, they resume reading from `{"0": 100, "1": 1 | |
| ``` | ||
| </details> | ||
|
|
||
| ### Reset offsets to latest and start a backfill supervisor | ||
|
|
||
| This endpoint is supported for Apache Kafka and RabbitMQ Stream supervisors. Amazon Kinesis is not supported yet. | ||
|
|
||
| Resets the supervisor to the latest available stream offsets and starts a new bounded backfill supervisor to ingest the data in the skipped range. | ||
|
|
||
| This endpoint is useful when a supervisor has fallen behind and you want to catch it up to the latest offsets without losing the skipped data. The main supervisor resumes ingesting from the latest offsets, while the backfill supervisor processes the range from the previously checkpointed offsets up to the latest offsets at the time of the reset. | ||
|
|
||
| **Duplicate ingestion notice:** The main supervisor is not quiesced before the reset. This means duplicate data can occur in two ways: | ||
| - **Backfill overlap:** Any tasks that were in-flight at the time of the reset may publish segments covering part of the backfill range before being shut down. | ||
| - **Reset race:** If a task checkpoint is written to the metadata store between when this endpoint captures the current offsets and when it applies the reset, that checkpoint can be overwritten, causing the main supervisor to re-ingest already-processed data. | ||
|
|
||
| Both windows are narrow in practice, but cannot be fully eliminated without manually suspending the main supervisor before calling this endpoint and waiting for all pending tasks to complete. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems reasonable to me. If an exactly-once guarantee is truly required, operators can still perform these steps manually, right? I’m not sure what it would take to fully bake suspend + handoff semantics into this API, but that’s something we can evolve in the future if needed. In the interim, as the docs call out, operators can still: suspend supervisor → wait for tasks to checkpoint and complete → kick off backfill supervisor with checkpoints & reset main supervisor (Fwiw,
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current code requires the Supervisor to be in RUNNING state. To get this working with SUSPENDED we can drop that requirement. The usage is a bit clunky though. It would look something like
Some minor modifications also need to be made to Kafka's |
||
|
|
||
| The following requirements must be met before calling this endpoint: | ||
|
|
||
| - The supervisor must be a [streaming supervisor](../ingestion/supervisor.md). | ||
| - The supervisor's `useEarliestSequenceNumber` property must be `false`. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this requirement needed? With
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah this is still a requirement even with separate supervisors. This is essentially multi-supervisor support which requires useConcurrentLocks, as the two supervisors can be writing to the same interval
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah yeah, I was thinking about this requirement: Thinking about it more, ingesting from earliest + backfill doesn’t make much sense for the generic reset case, so this seems like a useful guardrail. |
||
| - The supervisor context must have `useConcurrentLocks` set to `true` to allow the backfill supervisor's tasks to write concurrently with the main supervisor's tasks. | ||
| - The supervisor must be in a `RUNNING` state. | ||
|
|
||
| The backfill supervisor has the same configuration as the source supervisor except for its ID, which takes the form `{supervisorId}_backfill_{randomSuffix}`, and its `boundedStreamConfig`, which is set to the skipped offset range. If `backfillTaskCount` is specified, it overrides the `taskCount` for the backfill supervisor only. | ||
|
|
||
| #### URL | ||
|
|
||
| `POST` `/druid/indexer/v1/supervisor/{supervisorId}/resetToLatestAndBackfill` | ||
|
|
||
| #### Query parameters | ||
|
|
||
| | Parameter | Type | Description | Default | | ||
| |---------|---------|---------|---------| | ||
| | `backfillTaskCount` | Integer | Number of parallel tasks for the backfill supervisor. | Defaults to `taskCount` from the source supervisor if not specified | | ||
|
|
||
| #### Responses | ||
|
|
||
| <Tabs> | ||
|
|
||
| <TabItem value="5" label="200 SUCCESS"> | ||
|
|
||
|
|
||
| *Successfully reset and started backfill supervisor* | ||
|
|
||
| </TabItem> | ||
| <TabItem value="6" label="400 BAD REQUEST"> | ||
|
|
||
|
|
||
| *Supervisor does not meet requirements (wrong type, `useEarliestSequenceNumber` is true, `useConcurrentLocks` not enabled, or supervisor not RUNNING)* | ||
|
|
||
| </TabItem> | ||
| <TabItem value="7" label="404 NOT FOUND"> | ||
|
|
||
|
|
||
| *Invalid supervisor ID* | ||
|
|
||
| </TabItem> | ||
| <TabItem value="8" label="500 SERVER ERROR"> | ||
|
|
||
|
|
||
| *Failed to retrieve stream offsets or serialize the backfill spec* | ||
|
|
||
| </TabItem> | ||
| </Tabs> | ||
|
|
||
| --- | ||
|
|
||
| #### Sample request | ||
|
|
||
| The following example resets a supervisor named `social_media` and starts a backfill supervisor with 2 tasks. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For follow-up, what do you think about wiring this API to the web-console?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah good call, will do this in a follow up! |
||
|
|
||
| <Tabs> | ||
|
|
||
| <TabItem value="9" label="cURL"> | ||
|
|
||
|
|
||
| ```shell | ||
| curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetToLatestAndBackfill?backfillTaskCount=2" | ||
| ``` | ||
|
|
||
| </TabItem> | ||
| <TabItem value="10" label="HTTP"> | ||
|
|
||
|
|
||
| ```HTTP | ||
| POST /druid/indexer/v1/supervisor/social_media/resetToLatestAndBackfill?backfillTaskCount=2 HTTP/1.1 | ||
| Host: http://ROUTER_IP:ROUTER_PORT | ||
| ``` | ||
|
|
||
| </TabItem> | ||
| </Tabs> | ||
|
|
||
| #### Sample response | ||
|
|
||
| <details> | ||
| <summary>View the response</summary> | ||
|
|
||
| ```json | ||
| { | ||
| "id": "social_media", | ||
| "backfillSupervisorId": "social_media_backfill_abcdefgh" | ||
| } | ||
| ``` | ||
| </details> | ||
|
|
||
| ### Terminate a supervisor | ||
|
|
||
| Terminates a supervisor and its associated indexing tasks, triggering the publishing of their segments. When you terminate a supervisor, Druid places a tombstone marker in the metadata store to prevent reloading on restart. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -292,6 +292,48 @@ public void test_boundedSupervisor_doesNotSilentlyCompleteWhenStaleOffsetExceeds | |
| Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status2.getState(), "Supervisor state should be UNHEALTHY_SUPERVISOR"); | ||
| } | ||
|
|
||
| @Test | ||
| public void test_resetToLatestAndBackfill() | ||
| { | ||
| final String topic = IdUtils.getRandomId(); | ||
| kafkaServer.createTopicWithPartitions(topic, 2); | ||
|
|
||
| // Create a streaming supervisor with concurrent locks and withUseEarliestSequenceNumber=false | ||
| final KafkaSupervisorSpec supervisor = createKafkaSupervisor(kafkaServer) | ||
| .withContext(Map.of("useConcurrentLocks", true)) | ||
| .withIoConfig(io -> io | ||
| .withKafkaInputFormat(new JsonInputFormat(null, null, null, null, null)) | ||
| .withUseEarliestSequenceNumber(false) | ||
| ) | ||
| .build(dataSource, topic); | ||
|
|
||
| cluster.callApi().postSupervisor(supervisor); | ||
|
|
||
| waitForSupervisorDetailedState(supervisor.getId(), "RUNNING"); | ||
|
|
||
| final int totalRecords = publish1kRecords(topic, false); | ||
| waitUntilPublishedRecordsAreIngested(totalRecords); | ||
|
|
||
| // Reset the main supervisor and spin up a backfill supervisor. | ||
| // Since all records are already ingested before the call, the backfill | ||
| // supervisor will complete immediately without ingesting anything. | ||
| final Map<String, Object> result = cluster.callApi().resetToLatestAndBackfill(supervisor.getId()); | ||
| Assertions.assertEquals(supervisor.getId(), result.get("id")); | ||
| final String backfillSupervisorId = (String) result.get("backfillSupervisorId"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For completeness, also validate that the primary supervisor ID is also included in the response of this API? |
||
|
|
||
| // Wait for the backfill to finish | ||
| waitForSupervisorToComplete(backfillSupervisorId); | ||
|
|
||
| // Main supervisor should still be running | ||
| final SupervisorStatus mainStatus = cluster.callApi().getSupervisorStatus(supervisor.getId()); | ||
| Assertions.assertEquals("RUNNING", mainStatus.getState()); | ||
| Assertions.assertTrue(mainStatus.isHealthy()); | ||
|
|
||
| final SupervisorStatus backfillStatus = cluster.callApi().getSupervisorStatus(backfillSupervisorId); | ||
| Assertions.assertEquals("COMPLETED", backfillStatus.getState()); | ||
| Assertions.assertTrue(backfillStatus.isHealthy()); | ||
| } | ||
|
|
||
| private void waitForSupervisorToComplete(String supervisorId) | ||
| { | ||
| overlord.latchableEmitter().waitForEvent( | ||
|
|
@@ -301,6 +343,15 @@ private void waitForSupervisorToComplete(String supervisorId) | |
| ); | ||
| } | ||
|
|
||
| private void waitForSupervisorDetailedState(String supervisorId, String detailedState) | ||
| { | ||
| overlord.latchableEmitter().waitForEvent( | ||
| event -> event.hasMetricName("supervisor/count") | ||
| .hasDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) | ||
| .hasDimension("detailedState", detailedState) | ||
| ); | ||
| } | ||
|
|
||
| private void waitForSupervisorToBeUnhealthy(String supervisorId) | ||
| { | ||
| overlord.latchableEmitter().waitForEvent( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,6 +36,7 @@ | |
| import org.apache.druid.indexing.overlord.supervisor.Supervisor; | ||
| import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; | ||
| import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; | ||
| import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; | ||
| import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; | ||
| import org.apache.druid.java.util.common.StringUtils; | ||
| import org.apache.druid.java.util.emitter.service.ServiceEmitter; | ||
|
|
@@ -173,6 +174,59 @@ protected KafkaSupervisorSpec toggleSuspend(boolean suspend) | |
| ); | ||
| } | ||
|
|
||
| @Override | ||
| public KafkaSupervisorSpec createBackfillSpec( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks much cleaner - thanks for adding this! |
||
| String backfillId, | ||
| BoundedStreamConfig boundedStreamConfig, | ||
| @Nullable Integer taskCount | ||
| ) | ||
| { | ||
| KafkaSupervisorIOConfig ioConfig = getSpec().getIOConfig(); | ||
| KafkaSupervisorIOConfig backfillIoConfig = new KafkaSupervisorIOConfig( | ||
| ioConfig.getTopic(), | ||
| ioConfig.getTopicPattern(), | ||
| ioConfig.getInputFormat(), | ||
| ioConfig.getReplicas(), | ||
| taskCount != null ? taskCount : ioConfig.getTaskCount(), | ||
| ioConfig.getTaskDuration().toPeriod(), | ||
| ioConfig.getConsumerProperties(), | ||
| ioConfig.getAutoScalerConfig(), | ||
| ioConfig.getLagAggregator(), | ||
| ioConfig.getPollTimeout(), | ||
| ioConfig.getStartDelay().toPeriod(), | ||
| ioConfig.getPeriod().toPeriod(), | ||
| ioConfig.isUseEarliestSequenceNumber(), | ||
| ioConfig.getCompletionTimeout().toPeriod(), | ||
| ioConfig.getLateMessageRejectionPeriod().isPresent() ? ioConfig.getLateMessageRejectionPeriod().get().toPeriod() : null, | ||
| ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? ioConfig.getEarlyMessageRejectionPeriod().get().toPeriod() : null, | ||
| ioConfig.getLateMessageRejectionStartDateTime().isPresent() ? ioConfig.getLateMessageRejectionStartDateTime().get() : null, | ||
| ioConfig.getConfigOverrides(), | ||
| ioConfig.getIdleConfig(), | ||
| ioConfig.getStopTaskCount(), | ||
| ioConfig.isEmitTimeLagMetrics(), | ||
| ioConfig.getServerPriorityToReplicas(), | ||
| boundedStreamConfig | ||
| ); | ||
| return new KafkaSupervisorSpec( | ||
| backfillId, | ||
| null, | ||
| getSpec().getDataSchema(), | ||
| getSpec().getTuningConfig(), | ||
| backfillIoConfig, | ||
| getContext(), | ||
| isSuspended(), | ||
| taskStorage, | ||
| taskMaster, | ||
| indexerMetadataStorageCoordinator, | ||
| (KafkaIndexTaskClientFactory) indexTaskClientFactory, | ||
| mapper, | ||
| emitter, | ||
| monitorSchedulerConfig, | ||
| rowIngestionMetersFactory, | ||
| supervisorStateManagerConfig | ||
| ); | ||
| } | ||
|
|
||
| /** | ||
| * Extends {@link SeekableStreamSupervisorSpec#validateSpecUpdateTo} to ensure that the proposed spec and current spec are either both multi-topic or both single-topic. | ||
| * <p> | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the this section's title could also be:
Reset offset to latest and start a backfill supervisor