diff --git a/docs/api-reference/supervisor-api.md b/docs/api-reference/supervisor-api.md index d321af143020..8f9c5c36dc5c 100644 --- a/docs/api-reference/supervisor-api.md +++ b/docs/api-reference/supervisor-api.md @@ -3539,6 +3539,109 @@ when the supervisor's tasks restart, they resume reading from `{"0": 100, "1": 1 ``` +### Reset offsets to latest and start a backfill supervisor + +This endpoint is supported for Apache Kafka and RabbitMQ Stream supervisors. Amazon Kinesis is not supported yet. + +Resets the supervisor to the latest available stream offsets and starts a new bounded backfill supervisor to ingest the data in the skipped range. + +This endpoint is useful when a supervisor has fallen behind and you want to catch it up to the latest offsets without losing the skipped data. The main supervisor resumes ingesting from the latest offsets, while the backfill supervisor processes the range from the previously checkpointed offsets up to the latest offsets at the time of the reset. + +**Duplicate ingestion notice:** The main supervisor is not quiesced before the reset. This means duplicate data can occur in two ways: +- **Backfill overlap:** Any tasks that were in-flight at the time of the reset may publish segments covering part of the backfill range before being shut down. +- **Reset race:** If a task checkpoint is written to the metadata store between when this endpoint captures the current offsets and when it applies the reset, that checkpoint can be overwritten, causing the main supervisor to re-ingest already-processed data. + +Both windows are narrow in practice, but cannot be fully eliminated without manually suspending the main supervisor before calling this endpoint and waiting for all pending tasks to complete. + +The following requirements must be met before calling this endpoint: + +- The supervisor must be a [streaming supervisor](../ingestion/supervisor.md). +- The supervisor's `useEarliestSequenceNumber` property must be `false`. +- The supervisor context must have `useConcurrentLocks` set to `true` to allow the backfill supervisor's tasks to write concurrently with the main supervisor's tasks. +- The supervisor must be in a `RUNNING` state. + +The backfill supervisor has the same configuration as the source supervisor except for its ID, which takes the form `{supervisorId}_backfill_{randomSuffix}`, and its `boundedStreamConfig`, which is set to the skipped offset range. If `backfillTaskCount` is specified, it overrides the `taskCount` for the backfill supervisor only. + +#### URL + +`POST` `/druid/indexer/v1/supervisor/{supervisorId}/resetToLatestAndBackfill` + +#### Query parameters + +| Parameter | Type | Description | Default | +|---------|---------|---------|---------| +| `backfillTaskCount` | Integer | Number of parallel tasks for the backfill supervisor. | Defaults to `taskCount` from the source supervisor if not specified | + +#### Responses + + + + + + +*Successfully reset and started backfill supervisor* + + + + + +*Supervisor does not meet requirements (wrong type, `useEarliestSequenceNumber` is true, `useConcurrentLocks` not enabled, or supervisor not RUNNING)* + + + + + +*Invalid supervisor ID* + + + + + +*Failed to retrieve stream offsets or serialize the backfill spec* + + + + +--- + +#### Sample request + +The following example resets a supervisor named `social_media` and starts a backfill supervisor with 2 tasks. + + + + + + +```shell +curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetToLatestAndBackfill?backfillTaskCount=2" +``` + + + + + +```HTTP +POST /druid/indexer/v1/supervisor/social_media/resetToLatestAndBackfill?backfillTaskCount=2 HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +#### Sample response + +
+ View the response + + ```json +{ + "id": "social_media", + "backfillSupervisorId": "social_media_backfill_abcdefgh" +} + ``` +
+ ### Terminate a supervisor Terminates a supervisor and its associated indexing tasks, triggering the publishing of their segments. When you terminate a supervisor, Druid places a tombstone marker in the metadata store to prevent reloading on restart. diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java index 7e22d85d9cab..fa184418df52 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java @@ -292,6 +292,48 @@ public void test_boundedSupervisor_doesNotSilentlyCompleteWhenStaleOffsetExceeds Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status2.getState(), "Supervisor state should be UNHEALTHY_SUPERVISOR"); } + @Test + public void test_resetToLatestAndBackfill() + { + final String topic = IdUtils.getRandomId(); + kafkaServer.createTopicWithPartitions(topic, 2); + + // Create a streaming supervisor with concurrent locks and withUseEarliestSequenceNumber=false + final KafkaSupervisorSpec supervisor = createKafkaSupervisor(kafkaServer) + .withContext(Map.of("useConcurrentLocks", true)) + .withIoConfig(io -> io + .withKafkaInputFormat(new JsonInputFormat(null, null, null, null, null)) + .withUseEarliestSequenceNumber(false) + ) + .build(dataSource, topic); + + cluster.callApi().postSupervisor(supervisor); + + waitForSupervisorDetailedState(supervisor.getId(), "RUNNING"); + + final int totalRecords = publish1kRecords(topic, false); + waitUntilPublishedRecordsAreIngested(totalRecords); + + // Reset the main supervisor and spin up a backfill supervisor. + // Since all records are already ingested before the call, the backfill + // supervisor will complete immediately without ingesting anything. + final Map result = cluster.callApi().resetToLatestAndBackfill(supervisor.getId()); + Assertions.assertEquals(supervisor.getId(), result.get("id")); + final String backfillSupervisorId = (String) result.get("backfillSupervisorId"); + + // Wait for the backfill to finish + waitForSupervisorToComplete(backfillSupervisorId); + + // Main supervisor should still be running + final SupervisorStatus mainStatus = cluster.callApi().getSupervisorStatus(supervisor.getId()); + Assertions.assertEquals("RUNNING", mainStatus.getState()); + Assertions.assertTrue(mainStatus.isHealthy()); + + final SupervisorStatus backfillStatus = cluster.callApi().getSupervisorStatus(backfillSupervisorId); + Assertions.assertEquals("COMPLETED", backfillStatus.getState()); + Assertions.assertTrue(backfillStatus.isHealthy()); + } + private void waitForSupervisorToComplete(String supervisorId) { overlord.latchableEmitter().waitForEvent( @@ -301,6 +343,15 @@ private void waitForSupervisorToComplete(String supervisorId) ); } + private void waitForSupervisorDetailedState(String supervisorId, String detailedState) + { + overlord.latchableEmitter().waitForEvent( + event -> event.hasMetricName("supervisor/count") + .hasDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) + .hasDimension("detailedState", detailedState) + ); + } + private void waitForSupervisorToBeUnhealthy(String supervisorId) { overlord.latchableEmitter().waitForEvent( diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java index 6099105b3374..04973a5272fd 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java @@ -322,7 +322,7 @@ protected Map getTimeLagPerPartition(Map currentOffs } @Override - protected RabbitStreamDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map map) + public RabbitStreamDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map map) { return new RabbitStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map)); } @@ -408,7 +408,7 @@ public LagStats computeLagStats() } @Override - protected void updatePartitionLagFromStream() + public void updatePartitionLagFromStream() { getRecordSupplierLock().lock(); @@ -435,7 +435,7 @@ protected void updatePartitionLagFromStream() } @Override - protected Map getLatestSequencesFromStream() + public Map getLatestSequencesFromStream() { return latestSequenceFromStream != null ? latestSequenceFromStream : new HashMap<>(); } diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java index 4a445f6f1c11..4763a949a615 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorSpec.java @@ -30,6 +30,7 @@ import org.apache.druid.indexing.overlord.supervisor.Supervisor; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; import org.apache.druid.indexing.rabbitstream.RabbitStreamIndexTaskClientFactory; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; @@ -155,6 +156,55 @@ protected RabbitStreamSupervisorSpec toggleSuspend(boolean suspend) supervisorStateManagerConfig); } + @Override + public RabbitStreamSupervisorSpec createBackfillSpec( + String backfillId, + BoundedStreamConfig boundedStreamConfig, + @Nullable Integer taskCount + ) + { + RabbitStreamSupervisorIOConfig ioConfig = getSpec().getIOConfig(); + RabbitStreamSupervisorIOConfig backfillIoConfig = new RabbitStreamSupervisorIOConfig( + ioConfig.getStream(), + ioConfig.getUri(), + ioConfig.getInputFormat(), + ioConfig.getReplicas(), + taskCount != null ? taskCount : ioConfig.getTaskCount(), + ioConfig.getTaskDuration().toPeriod(), + ioConfig.getConsumerProperties(), + ioConfig.getAutoScalerConfig(), + ioConfig.getPollTimeout(), + ioConfig.getStartDelay().toPeriod(), + ioConfig.getPeriod().toPeriod(), + ioConfig.getCompletionTimeout().toPeriod(), + ioConfig.isUseEarliestSequenceNumber(), + ioConfig.getLateMessageRejectionPeriod().isPresent() ? ioConfig.getLateMessageRejectionPeriod().get().toPeriod() : null, + ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? ioConfig.getEarlyMessageRejectionPeriod().get().toPeriod() : null, + ioConfig.getLateMessageRejectionStartDateTime().isPresent() ? ioConfig.getLateMessageRejectionStartDateTime().get() : null, + ioConfig.getStopTaskCount(), + ioConfig.getServerPriorityToReplicas(), + boundedStreamConfig + ); + return new RabbitStreamSupervisorSpec( + backfillId, + null, + getSpec().getDataSchema(), + getSpec().getTuningConfig(), + backfillIoConfig, + getContext(), + isSuspended(), + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + (RabbitStreamIndexTaskClientFactory) indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig + ); + } + @Override public String toString() { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 727eb52db272..5863284cc2d9 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -356,7 +356,7 @@ protected Map getTimeLagPerPartition(Map map) + public KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map map) { return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map)); } @@ -548,7 +548,7 @@ private Map getTimestampPerPartitionAtCurrentOffset(S *

*/ @Override - protected void updatePartitionLagFromStream() + public void updatePartitionLagFromStream() { if (getIoConfig().isEmitTimeLagMetrics()) { updatePartitionTimeAndRecordLagFromStream(); @@ -597,7 +597,7 @@ private void updateOffsetSnapshot( } @Override - protected Map getLatestSequencesFromStream() + public Map getLatestSequencesFromStream() { return offsetSnapshotRef.get().getLatestOffsetsFromStream(); } @@ -630,7 +630,7 @@ protected boolean isMultiTopic() * Gets the offsets as stored in the metadata store. The map returned will only contain * offsets from topic partitions that match the current supervisor config stream. This * override is needed because in the case of multi-topic, a user could have updated the supervisor - * config from single topic to mult-topic, where the new multi-topic pattern regex matches the + * config from single topic to multi-topic, where the new multi-topic pattern regex matches the * old config single topic. Without this override, the previously stored metadata for the single * topic would be deemed as different from the currently configure stream, and not be included in * the offset map returned. This implementation handles these cases appropriately. @@ -640,7 +640,7 @@ protected boolean isMultiTopic() * updated to single topic or multi-topic depending on the supervisor config, as needed. */ @Override - protected Map getOffsetsFromMetadataStorage() + public Map getOffsetsFromMetadataStorage() { final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata(); if (checkSourceMetadataMatch(dataSourceMetadata)) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index b607ade1acfe..31d3e8fad691 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -36,6 +36,7 @@ import org.apache.druid.indexing.overlord.supervisor.Supervisor; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -173,6 +174,59 @@ protected KafkaSupervisorSpec toggleSuspend(boolean suspend) ); } + @Override + public KafkaSupervisorSpec createBackfillSpec( + String backfillId, + BoundedStreamConfig boundedStreamConfig, + @Nullable Integer taskCount + ) + { + KafkaSupervisorIOConfig ioConfig = getSpec().getIOConfig(); + KafkaSupervisorIOConfig backfillIoConfig = new KafkaSupervisorIOConfig( + ioConfig.getTopic(), + ioConfig.getTopicPattern(), + ioConfig.getInputFormat(), + ioConfig.getReplicas(), + taskCount != null ? taskCount : ioConfig.getTaskCount(), + ioConfig.getTaskDuration().toPeriod(), + ioConfig.getConsumerProperties(), + ioConfig.getAutoScalerConfig(), + ioConfig.getLagAggregator(), + ioConfig.getPollTimeout(), + ioConfig.getStartDelay().toPeriod(), + ioConfig.getPeriod().toPeriod(), + ioConfig.isUseEarliestSequenceNumber(), + ioConfig.getCompletionTimeout().toPeriod(), + ioConfig.getLateMessageRejectionPeriod().isPresent() ? ioConfig.getLateMessageRejectionPeriod().get().toPeriod() : null, + ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? ioConfig.getEarlyMessageRejectionPeriod().get().toPeriod() : null, + ioConfig.getLateMessageRejectionStartDateTime().isPresent() ? ioConfig.getLateMessageRejectionStartDateTime().get() : null, + ioConfig.getConfigOverrides(), + ioConfig.getIdleConfig(), + ioConfig.getStopTaskCount(), + ioConfig.isEmitTimeLagMetrics(), + ioConfig.getServerPriorityToReplicas(), + boundedStreamConfig + ); + return new KafkaSupervisorSpec( + backfillId, + null, + getSpec().getDataSchema(), + getSpec().getTuningConfig(), + backfillIoConfig, + getContext(), + isSuspended(), + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + (KafkaIndexTaskClientFactory) indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig + ); + } + /** * Extends {@link SeekableStreamSupervisorSpec#validateSpecUpdateTo} to ensure that the proposed spec and current spec are either both multi-topic or both single-topic. *

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java index 8879ff6d9753..06ca9b64ced5 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java @@ -32,6 +32,7 @@ import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator; import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig; import org.apache.druid.jackson.DefaultObjectMapper; @@ -564,6 +565,38 @@ public void test_validateSpecUpdateTo() sourceSpec.validateSpecUpdateTo(validDestSpec); } + @Test + public void testCreateBackfillSpec() + { + KafkaSupervisorSpec spec = new KafkaSupervisorSpecBuilder() + .withDataSchema( + schema -> schema + .withTimestamp(TimestampSpec.DEFAULT) + .withAggregators(new CountAggregatorFactory("rows")) + .withGranularity(new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null)) + ) + .withIoConfig( + ioConfig -> ioConfig + .withJsonInputFormat() + .withConsumerProperties(Map.of("bootstrap.servers", "localhost:9092")) + .withTaskCount(3) + ) + .build("testDs", "metrics"); + + BoundedStreamConfig boundedStreamConfig = new BoundedStreamConfig( + Map.of("0", 100L, "1", 200L), + Map.of("0", 500L, "1", 600L) + ); + + KafkaSupervisorSpec backfill = (KafkaSupervisorSpec) spec.createBackfillSpec("backfill-id", boundedStreamConfig, 2); + + Assert.assertEquals("backfill-id", backfill.getId()); + Assert.assertEquals("testDs", backfill.getSpec().getDataSchema().getDataSource()); + Assert.assertEquals("metrics", backfill.getSpec().getIOConfig().getTopic()); + Assert.assertEquals(2, backfill.getSpec().getIOConfig().getTaskCount()); + Assert.assertEquals(boundedStreamConfig, backfill.getSpec().getIOConfig().getBoundedStreamConfig()); + } + private KafkaSupervisorSpec getSpec(String topic, String topicPattern) { KafkaSupervisorSpecBuilder builder = new KafkaSupervisorSpecBuilder() diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index 0f91fc0965db..3f1f4034f3ce 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -321,7 +321,7 @@ protected Map getTimeLagPerPartition(Map currentOf } @Override - protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( + public SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( String stream, Map map ) @@ -336,7 +336,7 @@ protected OrderedSequenceNumber makeSequenceNumber(String seq, boolean i } @Override - protected void updatePartitionLagFromStream() + public void updatePartitionLagFromStream() { KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier; // this recordSupplier method is thread safe, so does not need to acquire the recordSupplierLock diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java index 8e6615716809..4899337797bf 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java @@ -35,6 +35,7 @@ import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.Supervisor; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; @@ -193,4 +194,57 @@ protected KinesisSupervisorSpec toggleSuspend(boolean suspend) supervisorStateManagerConfig ); } + + @Override + public KinesisSupervisorSpec createBackfillSpec( + String backfillId, + BoundedStreamConfig boundedStreamConfig, + @Nullable Integer taskCount + ) + { + KinesisSupervisorIOConfig ioConfig = getSpec().getIOConfig(); + KinesisSupervisorIOConfig backfillIoConfig = new KinesisSupervisorIOConfig( + ioConfig.getStream(), + ioConfig.getInputFormat(), + ioConfig.getEndpoint(), + null, + ioConfig.getReplicas(), + taskCount != null ? taskCount : ioConfig.getTaskCount(), + ioConfig.getTaskDuration().toPeriod(), + ioConfig.getStartDelay().toPeriod(), + ioConfig.getPeriod().toPeriod(), + ioConfig.isUseEarliestSequenceNumber(), + ioConfig.getCompletionTimeout().toPeriod(), + ioConfig.getLateMessageRejectionPeriod().isPresent() ? ioConfig.getLateMessageRejectionPeriod().get().toPeriod() : null, + ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? ioConfig.getEarlyMessageRejectionPeriod().get().toPeriod() : null, + ioConfig.getLateMessageRejectionStartDateTime().isPresent() ? ioConfig.getLateMessageRejectionStartDateTime().get() : null, + ioConfig.getRecordsPerFetch(), + ioConfig.getFetchDelayMillis(), + ioConfig.getAwsAssumedRoleArn(), + ioConfig.getAwsExternalId(), + ioConfig.getAutoScalerConfig(), + ioConfig.isDeaggregate(), + ioConfig.getServerPriorityToReplicas(), + boundedStreamConfig + ); + return new KinesisSupervisorSpec( + backfillId, + null, + getSpec().getDataSchema(), + getSpec().getTuningConfig(), + backfillIoConfig, + getContext(), + isSuspended(), + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + (KinesisIndexTaskClientFactory) indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + awsCredentialsConfig, + supervisorStateManagerConfig + ); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 52f3cba7fc11..fa7d96634ae6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -23,9 +23,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.common.utils.IdUtils; import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidInput; import org.apache.druid.error.NotFound; @@ -35,8 +37,11 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; @@ -129,33 +134,8 @@ public Optional getActiveSupervisorIdForDatasourceWithAppendLock(String final Supervisor supervisor = entry.getValue().lhs; final SupervisorSpec supervisorSpec = entry.getValue().rhs; - boolean hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS; - if (supervisorSpec instanceof SeekableStreamSupervisorSpec) { - SeekableStreamSupervisorSpec seekableStreamSupervisorSpec = (SeekableStreamSupervisorSpec) supervisorSpec; - Map context = seekableStreamSupervisorSpec.getContext(); - if (context != null) { - Boolean useConcurrentLocks = QueryContexts.getAsBoolean( - Tasks.USE_CONCURRENT_LOCKS, - context.get(Tasks.USE_CONCURRENT_LOCKS) - ); - if (useConcurrentLocks == null) { - TaskLockType taskLockType = QueryContexts.getAsEnum( - Tasks.TASK_LOCK_TYPE, - context.get(Tasks.TASK_LOCK_TYPE), - TaskLockType.class - ); - if (taskLockType == null) { - hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS; - } else if (taskLockType == TaskLockType.APPEND) { - hasAppendLock = true; - } else { - hasAppendLock = false; - } - } else { - hasAppendLock = useConcurrentLocks; - } - } - } + boolean hasAppendLock = supervisorSpec instanceof SeekableStreamSupervisorSpec + && specHasConcurrentLocks((SeekableStreamSupervisorSpec) supervisorSpec); if (supervisor instanceof SeekableStreamSupervisor && !supervisorSpec.isSuspended() @@ -393,6 +373,116 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata resetData return true; } + /** + * Resets a supervisor to the latest stream offsets and starts a bounded backfill supervisor to + * process the skipped range from the previously checkpointed offsets up to the latest offsets. + * + * @param id supervisor ID + * @param backfillTaskCount number of tasks for the backfill supervisor, or null to inherit from the source spec + * @return map with {@code "id"} (the original supervisor ID) and {@code "backfillSupervisorId"} + * @throws IllegalArgumentException if the supervisor is not a {@link SeekableStreamSupervisor}, + * if {@code useEarliestSequenceNumber} is true, + * if {@code useConcurrentLocks} is not set to true in the supervisor context, + * or if the supervisor is not in a RUNNING state + * @throws IllegalStateException if the latest or checkpointed offsets cannot be retrieved, + * or if the backfill spec cannot be serialized + */ + public Map resetToLatestAndBackfill(String id, @Nullable Integer backfillTaskCount) + { + Preconditions.checkState(started, "SupervisorManager not started"); + Preconditions.checkNotNull(id, "id"); + + Pair supervisor = supervisors.get(id); + + if (supervisor == null) { + throw new IAE("Supervisor[%s] does not exist", id); + } + + if (!(supervisor.lhs instanceof SeekableStreamSupervisor)) { + throw new IAE("Supervisor[%s] is not a streaming supervisor", id); + } + + SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) supervisor.lhs; + SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) supervisor.rhs; + + validateResetAndBackfill(id, streamSupervisor, streamSpec); + + log.info("Capturing latest offsets from stream for supervisor[%s]", id); + streamSupervisor.updatePartitionLagFromStream(); + Map endOffsets = streamSupervisor.getLatestSequencesFromStream(); + + log.info("Capturing checkpointed offsets for supervisor[%s]", id); + Map startOffsets = streamSupervisor.getOffsetsFromMetadataStorage(); + + if (endOffsets == null || endOffsets.isEmpty()) { + throw new ISE("Skipping reset: Failed to get latest offsets from stream for supervisor[%s]", id); + } + if (startOffsets == null || startOffsets.isEmpty()) { + throw new ISE("Skipping reset: Failed to get checkpointed offsets for supervisor[%s]", id); + } + + String backfillSupervisorId = IdUtils.getRandomIdWithPrefix(id + "_backfill"); + + try { + Map normalizedStartOffsets = jsonMapper.readValue(jsonMapper.writeValueAsString(startOffsets), Map.class); + Map normalizedEndOffsets = jsonMapper.readValue(jsonMapper.writeValueAsString(endOffsets), Map.class); + BoundedStreamConfig boundedStreamConfig = new BoundedStreamConfig(normalizedStartOffsets, normalizedEndOffsets); + SupervisorSpec backfillSpec = streamSpec.createBackfillSpec(backfillSupervisorId, boundedStreamConfig, backfillTaskCount); + createOrUpdateAndStartSupervisor(backfillSpec); + } + catch (JsonProcessingException e) { + throw new ISE(e, "Failed to serialize offsets for backfill supervisor[%s]", backfillSupervisorId); + } + + log.info( + "Started backfill supervisor[%s] for supervisor[%s] with startOffsets[%s] and endOffsets[%s]", + backfillSupervisorId, + id, + startOffsets, + endOffsets + ); + + log.info("Resetting supervisor[%s] metadata to latest offsets", id); + DataSourceMetadata resetMetadata = streamSupervisor.createDataSourceMetaDataForReset( + streamSupervisor.getIoConfig().getStream(), + endOffsets + ); + + streamSupervisor.resetOffsets(resetMetadata); + + // Reset autoscaler if present + SupervisorTaskAutoScaler autoscaler = autoscalers.get(id); + if (autoscaler != null) { + autoscaler.reset(); + } + + return ImmutableMap.of( + "id", id, + "backfillSupervisorId", backfillSupervisorId + ); + } + + private void validateResetAndBackfill( + String id, + SeekableStreamSupervisor streamSupervisor, + SeekableStreamSupervisorSpec streamSpec + ) + { + if (streamSupervisor.getIoConfig().isUseEarliestSequenceNumber()) { + throw new IAE("Reset with skipped offsets is not supported when useEarliestOffset is true."); + } + + if (!specHasConcurrentLocks(streamSpec)) { + throw new IAE( + "Backfill tasks require 'useConcurrentLocks' to be set to true in the supervisor context to allow concurrent writes with the main supervisor tasks" + ); + } + + if (streamSupervisor.getState() != SupervisorStateManager.BasicState.RUNNING) { + throw new IAE("Supervisor[%s] must be in a RUNNING state to perform a reset and backfill", id); + } + } + public boolean checkPointDataSourceMetadata( String supervisorId, int taskGroupId, @@ -631,4 +721,29 @@ private SupervisorSpec getSpec(String id) return supervisor == null ? null : supervisor.rhs; } } + + /** + * Returns true if the spec's context enables concurrent (append) locks, accepting both + * {@code useConcurrentLocks: true} (or any truthy string) and {@code taskLockType: APPEND}. + */ + private static boolean specHasConcurrentLocks(SeekableStreamSupervisorSpec spec) + { + Map context = spec.getContext(); + if (context == null) { + return Tasks.DEFAULT_USE_CONCURRENT_LOCKS; + } + Boolean useConcurrentLocks = QueryContexts.getAsBoolean( + Tasks.USE_CONCURRENT_LOCKS, + context.get(Tasks.USE_CONCURRENT_LOCKS) + ); + if (useConcurrentLocks != null) { + return useConcurrentLocks; + } + TaskLockType taskLockType = QueryContexts.getAsEnum( + Tasks.TASK_LOCK_TYPE, + context.get(Tasks.TASK_LOCK_TYPE), + TaskLockType.class + ); + return taskLockType == TaskLockType.APPEND; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index aff9edf19af9..8d0e04eb7988 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -640,6 +640,50 @@ private Response handleResetRequest( ); } + @POST + @Path("/{id}/resetToLatestAndBackfill") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(SupervisorResourceFilter.class) + public Response resetToLatestAndBackfill( + @PathParam("id") final String id, + @QueryParam("backfillTaskCount") @Nullable final Integer backfillTaskCount + ) + { + return handleResetToLatestAndBackfill(id, backfillTaskCount); + } + + private Response handleResetToLatestAndBackfill(final String id, @Nullable final Integer backfillTaskCount) + { + if (backfillTaskCount != null && backfillTaskCount < 1) { + return Response.status(Response.Status.BAD_REQUEST) + .entity(ImmutableMap.of("error", "backfillTaskCount must be a positive integer")) + .build(); + } + return asLeaderWithSupervisorManager( + manager -> { + if (!manager.getSupervisorIds().contains(id)) { + return Response.status(Response.Status.NOT_FOUND) + .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) + .build(); + } + try { + Map result = manager.resetToLatestAndBackfill(id, backfillTaskCount); + return Response.ok(result).build(); + } + catch (IllegalArgumentException e) { + return Response.status(Response.Status.BAD_REQUEST) + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); + } + catch (Exception e) { + return Response.serverError() + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); + } + } + ); + } + private Response asLeaderWithSupervisorManager(Function f) { Optional supervisorManager = taskMaster.getSupervisorManager(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 91b4244c0bf3..a3e4dc1cd764 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2158,7 +2158,7 @@ public void resetOffsetsInternal(@Nonnull final DataSourceMetadata dataSourceMet final boolean metadataUpdateSuccess; final DataSourceMetadata metadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(supervisorId); if (metadata == null) { - log.info("Checkpointed metadata in null for supervisor[%s] for dataSource[%s] - inserting metadata[%s]", supervisorId, dataSource, resetMetadata); + log.info("Checkpointed metadata is null for supervisor[%s] for dataSource[%s] - inserting metadata[%s]", supervisorId, dataSource, resetMetadata); metadataUpdateSuccess = indexerMetadataStorageCoordinator.insertDataSourceMetadata(supervisorId, resetMetadata); } else { if (!checkSourceMetadataMatch(metadata)) { @@ -3274,7 +3274,7 @@ private boolean updatePartitionDataFromStream() /** * gets mapping of partitions in stream to their latest offsets. */ - protected Map getLatestSequencesFromStream() + public Map getLatestSequencesFromStream() { return new HashMap<>(); } @@ -4552,7 +4552,7 @@ private OrderedSequenceNumber getOffsetFromStorageForPartiti } } - protected Map getOffsetsFromMetadataStorage() + public Map getOffsetsFromMetadataStorage() { final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata(); if (dataSourceMetadata instanceof SeekableStreamDataSourceMetadata @@ -4939,7 +4939,7 @@ private void updateCurrentOffsets() throws InterruptedException, ExecutionExcept coalesceAndAwait(futures); } - protected abstract void updatePartitionLagFromStream(); + public abstract void updatePartitionLagFromStream(); /** * Gets 'lag' of currently processed offset behind latest offset as a measure of difference between offsets. @@ -5196,7 +5196,7 @@ protected abstract List sequence * @return specific instance of datasource metadata */ - protected abstract SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( + public abstract SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( String stream, Map map ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index 842f0de4774e..ecbd51757c37 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -297,4 +297,10 @@ public void merge(@Nullable SupervisorSpec existingSpec) protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend); + public abstract SeekableStreamSupervisorSpec createBackfillSpec( + String backfillId, + BoundedStreamConfig boundedStreamConfig, + @Nullable Integer taskCount + ); + } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 525444e23dea..199e004b4243 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -19,6 +19,9 @@ package org.apache.druid.indexing.overlord.supervisor; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; @@ -26,6 +29,8 @@ import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.error.InvalidInput; @@ -35,7 +40,11 @@ import org.apache.druid.indexing.overlord.ObjectMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; +import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; @@ -43,6 +52,7 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.metadata.MetadataSupervisorManager; import org.apache.druid.metadata.PendingSegmentRecord; +import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.server.metrics.SupervisorStatsProvider; import org.apache.druid.timeline.partition.NumberedShardSpec; @@ -59,6 +69,7 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; +import javax.annotation.Nullable; import java.lang.reflect.Field; import java.util.Collection; import java.util.Collections; @@ -1068,6 +1079,186 @@ public void test_isAnotherTaskGroupPublishingToPartitions() ); } + @Test + public void testResetToLatestAndBackfill() throws Exception + { + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of()); + replayAll(); + manager.start(); + + final ConcurrentHashMap> supervisorsMap = getSupervisorsMap(); + final SeekableStreamSupervisorSpec streamSpec = EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class); + final SeekableStreamSupervisor streamSupervisor = EasyMock.createNiceMock(SeekableStreamSupervisor.class); + final SeekableStreamSupervisorIOConfig ioConfig = EasyMock.createNiceMock(SeekableStreamSupervisorIOConfig.class); + + // non-SeekableStream supervisor → IAE + // Use a concrete anonymous Supervisor (not a mock) to reliably fail instanceof SeekableStreamSupervisor + final Supervisor nonStreamSupervisor = new Supervisor() + { + @Override + public void start() + { + } + + @Override + public void stop(boolean stopGracefully) + { + } + + @Override + public SupervisorReport getStatus() + { + return null; + } + + @Override + public SupervisorStateManager.State getState() + { + return null; + } + + @Override + public void reset(DataSourceMetadata dataSourceMetadata) + { + } + }; + supervisorsMap.put("id1", Pair.of(nonStreamSupervisor, streamSpec)); + Assert.assertThrows( + IllegalArgumentException.class, + () -> manager.resetToLatestAndBackfill("id1", null) + ); + + // useEarliestSequenceNumber=true → IAE + supervisorsMap.put("id1", Pair.of(streamSupervisor, streamSpec)); + EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(true).once(); + EasyMock.replay(streamSupervisor, streamSpec, ioConfig); + Assert.assertThrows( + IllegalArgumentException.class, + () -> manager.resetToLatestAndBackfill("id1", null) + ); + EasyMock.reset(streamSupervisor, streamSpec, ioConfig); + + // useConcurrentLocks not set (null context) → IAE + EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once(); + EasyMock.expect(streamSpec.getContext()).andReturn(null).once(); + EasyMock.replay(streamSupervisor, streamSpec, ioConfig); + Assert.assertThrows( + IllegalArgumentException.class, + () -> manager.resetToLatestAndBackfill("id1", null) + ); + EasyMock.reset(streamSupervisor, streamSpec, ioConfig); + + // useConcurrentLocks=false → IAE + EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once(); + EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", false)).once(); + EasyMock.replay(streamSupervisor, streamSpec, ioConfig); + Assert.assertThrows( + IllegalArgumentException.class, + () -> manager.resetToLatestAndBackfill("id1", null) + ); + EasyMock.reset(streamSupervisor, streamSpec, ioConfig); + + // useConcurrentLocks="true" (string) → accepted, fails at next guard (not RUNNING) + EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once(); + EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", "true")).once(); + EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.SUSPENDED).once(); + EasyMock.replay(streamSupervisor, streamSpec, ioConfig); + Assert.assertThrows( + IllegalArgumentException.class, + () -> manager.resetToLatestAndBackfill("id1", null) + ); + EasyMock.reset(streamSupervisor, streamSpec, ioConfig); + + // taskLockType=APPEND → accepted, fails at next guard (not RUNNING) + EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once(); + EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("taskLockType", "APPEND")).once(); + EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.SUSPENDED).once(); + EasyMock.replay(streamSupervisor, streamSpec, ioConfig); + Assert.assertThrows( + IllegalArgumentException.class, + () -> manager.resetToLatestAndBackfill("id1", null) + ); + EasyMock.reset(streamSupervisor, streamSpec, ioConfig); + + // supervisor not RUNNING → IAE + EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once(); + EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", true)).once(); + EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.SUSPENDED).once(); + EasyMock.replay(streamSupervisor, streamSpec, ioConfig); + Assert.assertThrows( + IllegalArgumentException.class, + () -> manager.resetToLatestAndBackfill("id1", null) + ); + EasyMock.reset(streamSupervisor, streamSpec, ioConfig); + + // empty latest offsets → ISE + EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once(); + EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", true)).once(); + EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.RUNNING).once(); + streamSupervisor.updatePartitionLagFromStream(); + EasyMock.expectLastCall().once(); + EasyMock.expect(streamSupervisor.getLatestSequencesFromStream()).andReturn(ImmutableMap.of()).once(); + EasyMock.replay(streamSupervisor, streamSpec, ioConfig); + Assert.assertThrows( + IllegalStateException.class, + () -> manager.resetToLatestAndBackfill("id1", null) + ); + EasyMock.reset(streamSupervisor, streamSpec, ioConfig); + + // empty start offsets from metadata → ISE + EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once(); + EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", true)).once(); + EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.RUNNING).once(); + streamSupervisor.updatePartitionLagFromStream(); + EasyMock.expectLastCall().once(); + EasyMock.expect(streamSupervisor.getLatestSequencesFromStream()).andReturn(ImmutableMap.of("0", 100L)).once(); + EasyMock.expect(streamSupervisor.getOffsetsFromMetadataStorage()).andReturn(ImmutableMap.of()).once(); + EasyMock.replay(streamSupervisor, streamSpec, ioConfig); + Assert.assertThrows( + IllegalStateException.class, + () -> manager.resetToLatestAndBackfill("id1", null) + ); + + verifyAll(); + } + + @Test + public void testCreateBackfillSpec() + { + final TestBackfillSupervisorSpec.IOConfig ioConfig = new TestBackfillSupervisorSpec.IOConfig("test-stream", null, null); + final TestBackfillSupervisorSpec.IngestionSpec ingestionSpec = new TestBackfillSupervisorSpec.IngestionSpec(ioConfig); + final SeekableStreamSupervisorSpec sourceSpec = new TestBackfillSupervisorSpec("original-id", ingestionSpec); + + final BoundedStreamConfig boundedStreamConfig = new BoundedStreamConfig( + ImmutableMap.of("0", 100L), + ImmutableMap.of("0", 200L) + ); + + // Without overriding taskCount + final SupervisorSpec backfillSpec = sourceSpec.createBackfillSpec("backfill-id", boundedStreamConfig, null); + Assert.assertEquals("backfill-id", backfillSpec.getId()); + final TestBackfillSupervisorSpec backfillCast = (TestBackfillSupervisorSpec) backfillSpec; + final BoundedStreamConfig actualConfig = backfillCast.getIoConfig().getBoundedStreamConfig(); + Assert.assertNotNull(actualConfig); + Assert.assertEquals(ImmutableMap.of("0", 100L), actualConfig.getStartSequenceNumbers()); + Assert.assertEquals(ImmutableMap.of("0", 200L), actualConfig.getEndSequenceNumbers()); + Assert.assertEquals(1, backfillCast.getIoConfig().getTaskCount()); + + // With overriding taskCount + final SupervisorSpec backfillSpecWithCount = sourceSpec.createBackfillSpec("backfill-id-2", boundedStreamConfig, 5); + Assert.assertEquals("backfill-id-2", backfillSpecWithCount.getId()); + final TestBackfillSupervisorSpec backfillWithCount = (TestBackfillSupervisorSpec) backfillSpecWithCount; + Assert.assertEquals(5, backfillWithCount.getIoConfig().getTaskCount()); + } + private static class TestSupervisorSpec implements SupervisorSpec { private final String id; @@ -1137,4 +1328,103 @@ public List getDataSources() return Collections.singletonList(id); } } + + @JsonTypeName("testBackfill") + private static class TestBackfillSupervisorSpec extends SeekableStreamSupervisorSpec + { + @JsonCreator + TestBackfillSupervisorSpec( + @JsonProperty("id") String id, + @JsonProperty("spec") IngestionSpec ingestionSpec + ) + { + super( + id, + ingestionSpec, + ImmutableMap.of("useConcurrentLocks", true), + false, + null, null, null, null, + MAPPER, + null, null, null, null + ); + } + + @Override + public Supervisor createSupervisor() + { + return null; + } + + @Override + public String getType() + { + return "testBackfill"; + } + + @Override + public String getSource() + { + return "test-stream"; + } + + @Override + protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend) + { + return this; + } + + @Override + public SeekableStreamSupervisorSpec createBackfillSpec( + String backfillId, + BoundedStreamConfig boundedStreamConfig, + @Nullable Integer taskCount + ) + { + return new TestBackfillSupervisorSpec( + backfillId, + new IngestionSpec(new IOConfig(getIoConfig().getStream(), taskCount, boundedStreamConfig)) + ); + } + + @Override + public SeekableStreamSupervisorIOConfig getIoConfig() + { + return getSpec().getIOConfig(); + } + + @JsonTypeName("testBackfillIngestionSpec") + static class IngestionSpec extends SeekableStreamSupervisorIngestionSpec + { + @JsonCreator + IngestionSpec( + @JsonProperty("ioConfig") IOConfig ioConfig + ) + { + super( + new DataSchema( + "testDS", + new TimestampSpec("time", "auto", null), + new DimensionsSpec(Collections.emptyList()), + null, null, null, null, null + ), + ioConfig, + null + ); + } + } + + @JsonTypeName("testBackfillIOConfig") + static class IOConfig extends SeekableStreamSupervisorIOConfig + { + @JsonCreator + IOConfig( + @JsonProperty("stream") String stream, + @JsonProperty("taskCount") Integer taskCount, + @JsonProperty("boundedStreamConfig") BoundedStreamConfig boundedStreamConfig + ) + { + super(stream, null, 1, taskCount, null, null, null, false, null, null, null, null, LagAggregator.DEFAULT, null, null, null, null, boundedStreamConfig); + } + } + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index 4ccf4659994f..31e0d604a222 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -34,6 +34,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; @@ -1379,6 +1380,100 @@ public void testResetOffsets() verifyAll(); } + @Test + public void testResetToLatestAndBackfill() + { + // 200 - success + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of("my-id")); + EasyMock.expect(supervisorManager.resetToLatestAndBackfill("my-id", null)) + .andReturn(ImmutableMap.of("id", "my-id", "backfillSupervisorId", "my-id_backfill_abcdefgh")); + replayAll(); + + Response response = supervisorResource.resetToLatestAndBackfill("my-id", null); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals( + ImmutableMap.of("id", "my-id", "backfillSupervisorId", "my-id_backfill_abcdefgh"), + response.getEntity() + ); + verifyAll(); + resetAll(); + + // 404 - supervisor does not exist + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of()); + replayAll(); + + response = supervisorResource.resetToLatestAndBackfill("my-id", null); + Assert.assertEquals(404, response.getStatus()); + verifyAll(); + resetAll(); + + // 400 - IAE (e.g. supervisor not running) + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of("my-id")); + EasyMock.expect(supervisorManager.resetToLatestAndBackfill("my-id", null)) + .andThrow(new IllegalArgumentException("Supervisor[my-id] must be in a RUNNING state")); + replayAll(); + + response = supervisorResource.resetToLatestAndBackfill("my-id", null); + Assert.assertEquals(400, response.getStatus()); + Assert.assertEquals( + ImmutableMap.of("error", "Supervisor[my-id] must be in a RUNNING state"), + response.getEntity() + ); + verifyAll(); + resetAll(); + + // 500 - ISE (e.g. failed to retrieve offsets) + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of("my-id")); + EasyMock.expect(supervisorManager.resetToLatestAndBackfill("my-id", null)) + .andThrow(new IllegalStateException("Failed to get latest offsets from stream")); + replayAll(); + + response = supervisorResource.resetToLatestAndBackfill("my-id", null); + Assert.assertEquals(500, response.getStatus()); + Assert.assertEquals( + ImmutableMap.of("error", "Failed to get latest offsets from stream"), + response.getEntity() + ); + verifyAll(); + resetAll(); + + // 400 - invalid backfillTaskCount (zero) + replayAll(); + + response = supervisorResource.resetToLatestAndBackfill("my-id", 0); + Assert.assertEquals(400, response.getStatus()); + Assert.assertEquals( + ImmutableMap.of("error", "backfillTaskCount must be a positive integer"), + response.getEntity() + ); + verifyAll(); + resetAll(); + + // 400 - invalid backfillTaskCount (negative) + replayAll(); + + response = supervisorResource.resetToLatestAndBackfill("my-id", -1); + Assert.assertEquals(400, response.getStatus()); + Assert.assertEquals( + ImmutableMap.of("error", "backfillTaskCount must be a positive integer"), + response.getEntity() + ); + verifyAll(); + resetAll(); + + // 503 - no supervisor manager (not leader) + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()); + replayAll(); + + response = supervisorResource.resetToLatestAndBackfill("my-id", null); + Assert.assertEquals(503, response.getStatus()); + verifyAll(); + } + @Test public void testNoopSupervisorSpecSerde() throws Exception { @@ -1668,6 +1763,16 @@ protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend) return null; } + @Override + public SeekableStreamSupervisorSpec createBackfillSpec( + String backfillId, + BoundedStreamConfig boundedStreamConfig, + @Nullable Integer taskCount + ) + { + return null; + } + @JsonIgnore @Nonnull @Override diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java index 3d0c6426feb2..8182ae482ca6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java @@ -884,6 +884,16 @@ protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend) return null; } + @Override + public SeekableStreamSupervisorSpec createBackfillSpec( + String backfillId, + BoundedStreamConfig boundedStreamConfig, + @Nullable Integer taskCount + ) + { + return null; + } + @Override public String getType() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index eff5d1acd980..e19d68cb2b3f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -3059,7 +3059,7 @@ public String toString() final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor() { @Override - protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( + public SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( String stream, Map map ) @@ -3284,7 +3284,7 @@ protected String baseTaskName() } @Override - protected void updatePartitionLagFromStream() + public void updatePartitionLagFromStream() { // do nothing } @@ -3381,7 +3381,7 @@ protected boolean doesTaskMatchSupervisor(Task task) } @Override - protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( + public SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( String stream, Map map ) @@ -3521,7 +3521,7 @@ public LagStats computeLagStats() } @Override - protected Map getLatestSequencesFromStream() + public Map getLatestSequencesFromStream() { return streamOffsets; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java index 4eefaed9bd99..c96a64211b97 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java @@ -124,7 +124,7 @@ protected String baseTaskName() } @Override - protected void updatePartitionLagFromStream() + public void updatePartitionLagFromStream() { // do nothing } @@ -205,7 +205,7 @@ protected boolean doesTaskMatchSupervisor(Task task) } @Override - protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( + public SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( String stream, Map map ) @@ -436,6 +436,16 @@ protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend) { return null; } + + @Override + public SeekableStreamSupervisorSpec createBackfillSpec( + String backfillId, + BoundedStreamConfig boundedStreamConfig, + @Nullable Integer taskCount + ) + { + return null; + } } protected static SeekableStreamSupervisorTuningConfig getTuningConfig() diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java index 0ca643f109f7..35488be081b1 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java @@ -322,6 +322,12 @@ public ListenableFuture> terminateSupervisor(String supervis throw new UOE("Not implemented"); } + @Override + public ListenableFuture> resetToLatestAndBackfill(String supervisorId) + { + throw new UOE("Not implemented"); + } + @Override public ListenableFuture> supervisorStatuses() { diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/NoopOverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/NoopOverlordClient.java index 81fccf19f131..2b1ad6a555a7 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/NoopOverlordClient.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/NoopOverlordClient.java @@ -114,6 +114,12 @@ public ListenableFuture> terminateSupervisor(String supervis throw new UnsupportedOperationException(); } + @Override + public ListenableFuture> resetToLatestAndBackfill(String supervisorId) + { + throw new UnsupportedOperationException(); + } + @Override public ListenableFuture> supervisorStatuses() { diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java index c4d348997779..baf7e4297c9d 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java @@ -197,6 +197,15 @@ ListenableFuture> taskStatuses( */ ListenableFuture> terminateSupervisor(String supervisorId); + /** + * Resets a supervisor to the latest stream offsets and starts a bounded backfill supervisor. + *

+ * API: {@code POST /druid/indexer/v1/supervisor//resetToLatestAndBackfill} + * + * @return Map containing "id" and "backfillSupervisorId" + */ + ListenableFuture> resetToLatestAndBackfill(String supervisorId); + /** * Returns all current supervisor statuses. */ diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java index 0499a62f090a..3657d8b83a6f 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java @@ -265,6 +265,23 @@ public ListenableFuture> terminateSupervisor(String supervis ); } + @Override + public ListenableFuture> resetToLatestAndBackfill(String supervisorId) + { + final String path = StringUtils.format( + "/druid/indexer/v1/supervisor/%s/resetToLatestAndBackfill", + StringUtils.urlEncode(supervisorId) + ); + + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.POST, path), + new BytesFullResponseHandler() + ), + holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new TypeReference<>() {}) + ); + } + @Override public ListenableFuture> supervisorStatuses() { diff --git a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java index 257533aecbd0..6ae8750b8d8e 100644 --- a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java +++ b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java @@ -430,6 +430,16 @@ public String postSupervisor(SupervisorSpec supervisor) return onLeaderOverlord(o -> o.postSupervisor(supervisor)).get("id"); } + /** + * Resets a supervisor to the latest stream offsets and starts a bounded backfill supervisor. + * + * @return Map containing "id" and "backfillSupervisorId" + */ + public Map resetToLatestAndBackfill(String supervisorId) + { + return onLeaderOverlord(o -> o.resetToLatestAndBackfill(supervisorId)); + } + /** * Fetches the current status of the given supervisor ID. */