getOffsetsFromMetadataStorage()
{
final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
if (checkSourceMetadataMatch(dataSourceMetadata)) {
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index b607ade1acfe..31d3e8fad691 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -36,6 +36,7 @@
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -173,6 +174,59 @@ protected KafkaSupervisorSpec toggleSuspend(boolean suspend)
);
}
+ @Override
+ public KafkaSupervisorSpec createBackfillSpec(
+ String backfillId,
+ BoundedStreamConfig boundedStreamConfig,
+ @Nullable Integer taskCount
+ )
+ {
+ KafkaSupervisorIOConfig ioConfig = getSpec().getIOConfig();
+ KafkaSupervisorIOConfig backfillIoConfig = new KafkaSupervisorIOConfig(
+ ioConfig.getTopic(),
+ ioConfig.getTopicPattern(),
+ ioConfig.getInputFormat(),
+ ioConfig.getReplicas(),
+ taskCount != null ? taskCount : ioConfig.getTaskCount(),
+ ioConfig.getTaskDuration().toPeriod(),
+ ioConfig.getConsumerProperties(),
+ ioConfig.getAutoScalerConfig(),
+ ioConfig.getLagAggregator(),
+ ioConfig.getPollTimeout(),
+ ioConfig.getStartDelay().toPeriod(),
+ ioConfig.getPeriod().toPeriod(),
+ ioConfig.isUseEarliestSequenceNumber(),
+ ioConfig.getCompletionTimeout().toPeriod(),
+ ioConfig.getLateMessageRejectionPeriod().isPresent() ? ioConfig.getLateMessageRejectionPeriod().get().toPeriod() : null,
+ ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? ioConfig.getEarlyMessageRejectionPeriod().get().toPeriod() : null,
+ ioConfig.getLateMessageRejectionStartDateTime().isPresent() ? ioConfig.getLateMessageRejectionStartDateTime().get() : null,
+ ioConfig.getConfigOverrides(),
+ ioConfig.getIdleConfig(),
+ ioConfig.getStopTaskCount(),
+ ioConfig.isEmitTimeLagMetrics(),
+ ioConfig.getServerPriorityToReplicas(),
+ boundedStreamConfig
+ );
+ return new KafkaSupervisorSpec(
+ backfillId,
+ null,
+ getSpec().getDataSchema(),
+ getSpec().getTuningConfig(),
+ backfillIoConfig,
+ getContext(),
+ isSuspended(),
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ (KafkaIndexTaskClientFactory) indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig
+ );
+ }
+
/**
* Extends {@link SeekableStreamSupervisorSpec#validateSpecUpdateTo} to ensure that the proposed spec and current spec are either both multi-topic or both single-topic.
*
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
index 8879ff6d9753..06ca9b64ced5 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
@@ -32,6 +32,7 @@
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
@@ -564,6 +565,38 @@ public void test_validateSpecUpdateTo()
sourceSpec.validateSpecUpdateTo(validDestSpec);
}
+ @Test
+ public void testCreateBackfillSpec()
+ {
+ KafkaSupervisorSpec spec = new KafkaSupervisorSpecBuilder()
+ .withDataSchema(
+ schema -> schema
+ .withTimestamp(TimestampSpec.DEFAULT)
+ .withAggregators(new CountAggregatorFactory("rows"))
+ .withGranularity(new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null))
+ )
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withJsonInputFormat()
+ .withConsumerProperties(Map.of("bootstrap.servers", "localhost:9092"))
+ .withTaskCount(3)
+ )
+ .build("testDs", "metrics");
+
+ BoundedStreamConfig boundedStreamConfig = new BoundedStreamConfig(
+ Map.of("0", 100L, "1", 200L),
+ Map.of("0", 500L, "1", 600L)
+ );
+
+ KafkaSupervisorSpec backfill = (KafkaSupervisorSpec) spec.createBackfillSpec("backfill-id", boundedStreamConfig, 2);
+
+ Assert.assertEquals("backfill-id", backfill.getId());
+ Assert.assertEquals("testDs", backfill.getSpec().getDataSchema().getDataSource());
+ Assert.assertEquals("metrics", backfill.getSpec().getIOConfig().getTopic());
+ Assert.assertEquals(2, backfill.getSpec().getIOConfig().getTaskCount());
+ Assert.assertEquals(boundedStreamConfig, backfill.getSpec().getIOConfig().getBoundedStreamConfig());
+ }
+
private KafkaSupervisorSpec getSpec(String topic, String topicPattern)
{
KafkaSupervisorSpecBuilder builder = new KafkaSupervisorSpecBuilder()
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 0f91fc0965db..3f1f4034f3ce 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -321,7 +321,7 @@ protected Map getTimeLagPerPartition(Map currentOf
}
@Override
- protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset(
+ public SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset(
String stream,
Map map
)
@@ -336,7 +336,7 @@ protected OrderedSequenceNumber makeSequenceNumber(String seq, boolean i
}
@Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()
{
KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier;
// this recordSupplier method is thread safe, so does not need to acquire the recordSupplierLock
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
index 8e6615716809..4899337797bf 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java
@@ -35,6 +35,7 @@
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
@@ -193,4 +194,57 @@ protected KinesisSupervisorSpec toggleSuspend(boolean suspend)
supervisorStateManagerConfig
);
}
+
+ @Override
+ public KinesisSupervisorSpec createBackfillSpec(
+ String backfillId,
+ BoundedStreamConfig boundedStreamConfig,
+ @Nullable Integer taskCount
+ )
+ {
+ KinesisSupervisorIOConfig ioConfig = getSpec().getIOConfig();
+ KinesisSupervisorIOConfig backfillIoConfig = new KinesisSupervisorIOConfig(
+ ioConfig.getStream(),
+ ioConfig.getInputFormat(),
+ ioConfig.getEndpoint(),
+ null,
+ ioConfig.getReplicas(),
+ taskCount != null ? taskCount : ioConfig.getTaskCount(),
+ ioConfig.getTaskDuration().toPeriod(),
+ ioConfig.getStartDelay().toPeriod(),
+ ioConfig.getPeriod().toPeriod(),
+ ioConfig.isUseEarliestSequenceNumber(),
+ ioConfig.getCompletionTimeout().toPeriod(),
+ ioConfig.getLateMessageRejectionPeriod().isPresent() ? ioConfig.getLateMessageRejectionPeriod().get().toPeriod() : null,
+ ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? ioConfig.getEarlyMessageRejectionPeriod().get().toPeriod() : null,
+ ioConfig.getLateMessageRejectionStartDateTime().isPresent() ? ioConfig.getLateMessageRejectionStartDateTime().get() : null,
+ ioConfig.getRecordsPerFetch(),
+ ioConfig.getFetchDelayMillis(),
+ ioConfig.getAwsAssumedRoleArn(),
+ ioConfig.getAwsExternalId(),
+ ioConfig.getAutoScalerConfig(),
+ ioConfig.isDeaggregate(),
+ ioConfig.getServerPriorityToReplicas(),
+ boundedStreamConfig
+ );
+ return new KinesisSupervisorSpec(
+ backfillId,
+ null,
+ getSpec().getDataSchema(),
+ getSpec().getTuningConfig(),
+ backfillIoConfig,
+ getContext(),
+ isSuspended(),
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ (KinesisIndexTaskClientFactory) indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ awsCredentialsConfig,
+ supervisorStateManagerConfig
+ );
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 52f3cba7fc11..fa7d96634ae6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -23,9 +23,11 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.error.NotFound;
@@ -35,8 +37,11 @@
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -129,33 +134,8 @@ public Optional getActiveSupervisorIdForDatasourceWithAppendLock(String
final Supervisor supervisor = entry.getValue().lhs;
final SupervisorSpec supervisorSpec = entry.getValue().rhs;
- boolean hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS;
- if (supervisorSpec instanceof SeekableStreamSupervisorSpec) {
- SeekableStreamSupervisorSpec seekableStreamSupervisorSpec = (SeekableStreamSupervisorSpec) supervisorSpec;
- Map context = seekableStreamSupervisorSpec.getContext();
- if (context != null) {
- Boolean useConcurrentLocks = QueryContexts.getAsBoolean(
- Tasks.USE_CONCURRENT_LOCKS,
- context.get(Tasks.USE_CONCURRENT_LOCKS)
- );
- if (useConcurrentLocks == null) {
- TaskLockType taskLockType = QueryContexts.getAsEnum(
- Tasks.TASK_LOCK_TYPE,
- context.get(Tasks.TASK_LOCK_TYPE),
- TaskLockType.class
- );
- if (taskLockType == null) {
- hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS;
- } else if (taskLockType == TaskLockType.APPEND) {
- hasAppendLock = true;
- } else {
- hasAppendLock = false;
- }
- } else {
- hasAppendLock = useConcurrentLocks;
- }
- }
- }
+ boolean hasAppendLock = supervisorSpec instanceof SeekableStreamSupervisorSpec
+ && specHasConcurrentLocks((SeekableStreamSupervisorSpec) supervisorSpec);
if (supervisor instanceof SeekableStreamSupervisor
&& !supervisorSpec.isSuspended()
@@ -393,6 +373,116 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata resetData
return true;
}
+ /**
+ * Resets a supervisor to the latest stream offsets and starts a bounded backfill supervisor to
+ * process the skipped range from the previously checkpointed offsets up to the latest offsets.
+ *
+ * @param id supervisor ID
+ * @param backfillTaskCount number of tasks for the backfill supervisor, or null to inherit from the source spec
+ * @return map with {@code "id"} (the original supervisor ID) and {@code "backfillSupervisorId"}
+ * @throws IllegalArgumentException if the supervisor is not a {@link SeekableStreamSupervisor},
+ * if {@code useEarliestSequenceNumber} is true,
+ * if {@code useConcurrentLocks} is not set to true in the supervisor context,
+ * or if the supervisor is not in a RUNNING state
+ * @throws IllegalStateException if the latest or checkpointed offsets cannot be retrieved,
+ * or if the backfill spec cannot be serialized
+ */
+ public Map resetToLatestAndBackfill(String id, @Nullable Integer backfillTaskCount)
+ {
+ Preconditions.checkState(started, "SupervisorManager not started");
+ Preconditions.checkNotNull(id, "id");
+
+ Pair supervisor = supervisors.get(id);
+
+ if (supervisor == null) {
+ throw new IAE("Supervisor[%s] does not exist", id);
+ }
+
+ if (!(supervisor.lhs instanceof SeekableStreamSupervisor)) {
+ throw new IAE("Supervisor[%s] is not a streaming supervisor", id);
+ }
+
+ SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) supervisor.lhs;
+ SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) supervisor.rhs;
+
+ validateResetAndBackfill(id, streamSupervisor, streamSpec);
+
+ log.info("Capturing latest offsets from stream for supervisor[%s]", id);
+ streamSupervisor.updatePartitionLagFromStream();
+ Map, ?> endOffsets = streamSupervisor.getLatestSequencesFromStream();
+
+ log.info("Capturing checkpointed offsets for supervisor[%s]", id);
+ Map, ?> startOffsets = streamSupervisor.getOffsetsFromMetadataStorage();
+
+ if (endOffsets == null || endOffsets.isEmpty()) {
+ throw new ISE("Skipping reset: Failed to get latest offsets from stream for supervisor[%s]", id);
+ }
+ if (startOffsets == null || startOffsets.isEmpty()) {
+ throw new ISE("Skipping reset: Failed to get checkpointed offsets for supervisor[%s]", id);
+ }
+
+ String backfillSupervisorId = IdUtils.getRandomIdWithPrefix(id + "_backfill");
+
+ try {
+ Map normalizedStartOffsets = jsonMapper.readValue(jsonMapper.writeValueAsString(startOffsets), Map.class);
+ Map normalizedEndOffsets = jsonMapper.readValue(jsonMapper.writeValueAsString(endOffsets), Map.class);
+ BoundedStreamConfig boundedStreamConfig = new BoundedStreamConfig(normalizedStartOffsets, normalizedEndOffsets);
+ SupervisorSpec backfillSpec = streamSpec.createBackfillSpec(backfillSupervisorId, boundedStreamConfig, backfillTaskCount);
+ createOrUpdateAndStartSupervisor(backfillSpec);
+ }
+ catch (JsonProcessingException e) {
+ throw new ISE(e, "Failed to serialize offsets for backfill supervisor[%s]", backfillSupervisorId);
+ }
+
+ log.info(
+ "Started backfill supervisor[%s] for supervisor[%s] with startOffsets[%s] and endOffsets[%s]",
+ backfillSupervisorId,
+ id,
+ startOffsets,
+ endOffsets
+ );
+
+ log.info("Resetting supervisor[%s] metadata to latest offsets", id);
+ DataSourceMetadata resetMetadata = streamSupervisor.createDataSourceMetaDataForReset(
+ streamSupervisor.getIoConfig().getStream(),
+ endOffsets
+ );
+
+ streamSupervisor.resetOffsets(resetMetadata);
+
+ // Reset autoscaler if present
+ SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
+ if (autoscaler != null) {
+ autoscaler.reset();
+ }
+
+ return ImmutableMap.of(
+ "id", id,
+ "backfillSupervisorId", backfillSupervisorId
+ );
+ }
+
+ private void validateResetAndBackfill(
+ String id,
+ SeekableStreamSupervisor streamSupervisor,
+ SeekableStreamSupervisorSpec streamSpec
+ )
+ {
+ if (streamSupervisor.getIoConfig().isUseEarliestSequenceNumber()) {
+ throw new IAE("Reset with skipped offsets is not supported when useEarliestOffset is true.");
+ }
+
+ if (!specHasConcurrentLocks(streamSpec)) {
+ throw new IAE(
+ "Backfill tasks require 'useConcurrentLocks' to be set to true in the supervisor context to allow concurrent writes with the main supervisor tasks"
+ );
+ }
+
+ if (streamSupervisor.getState() != SupervisorStateManager.BasicState.RUNNING) {
+ throw new IAE("Supervisor[%s] must be in a RUNNING state to perform a reset and backfill", id);
+ }
+ }
+
public boolean checkPointDataSourceMetadata(
String supervisorId,
int taskGroupId,
@@ -631,4 +721,29 @@ private SupervisorSpec getSpec(String id)
return supervisor == null ? null : supervisor.rhs;
}
}
+
+ /**
+ * Returns true if the spec's context enables concurrent (append) locks, accepting both
+ * {@code useConcurrentLocks: true} (or any truthy string) and {@code taskLockType: APPEND}.
+ */
+ private static boolean specHasConcurrentLocks(SeekableStreamSupervisorSpec spec)
+ {
+ Map context = spec.getContext();
+ if (context == null) {
+ return Tasks.DEFAULT_USE_CONCURRENT_LOCKS;
+ }
+ Boolean useConcurrentLocks = QueryContexts.getAsBoolean(
+ Tasks.USE_CONCURRENT_LOCKS,
+ context.get(Tasks.USE_CONCURRENT_LOCKS)
+ );
+ if (useConcurrentLocks != null) {
+ return useConcurrentLocks;
+ }
+ TaskLockType taskLockType = QueryContexts.getAsEnum(
+ Tasks.TASK_LOCK_TYPE,
+ context.get(Tasks.TASK_LOCK_TYPE),
+ TaskLockType.class
+ );
+ return taskLockType == TaskLockType.APPEND;
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index aff9edf19af9..8d0e04eb7988 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -640,6 +640,50 @@ private Response handleResetRequest(
);
}
+ @POST
+ @Path("/{id}/resetToLatestAndBackfill")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(SupervisorResourceFilter.class)
+ public Response resetToLatestAndBackfill(
+ @PathParam("id") final String id,
+ @QueryParam("backfillTaskCount") @Nullable final Integer backfillTaskCount
+ )
+ {
+ return handleResetToLatestAndBackfill(id, backfillTaskCount);
+ }
+
+ private Response handleResetToLatestAndBackfill(final String id, @Nullable final Integer backfillTaskCount)
+ {
+ if (backfillTaskCount != null && backfillTaskCount < 1) {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity(ImmutableMap.of("error", "backfillTaskCount must be a positive integer"))
+ .build();
+ }
+ return asLeaderWithSupervisorManager(
+ manager -> {
+ if (!manager.getSupervisorIds().contains(id)) {
+ return Response.status(Response.Status.NOT_FOUND)
+ .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id)))
+ .build();
+ }
+ try {
+ Map result = manager.resetToLatestAndBackfill(id, backfillTaskCount);
+ return Response.ok(result).build();
+ }
+ catch (IllegalArgumentException e) {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity(ImmutableMap.of("error", e.getMessage()))
+ .build();
+ }
+ catch (Exception e) {
+ return Response.serverError()
+ .entity(ImmutableMap.of("error", e.getMessage()))
+ .build();
+ }
+ }
+ );
+ }
+
private Response asLeaderWithSupervisorManager(Function f)
{
Optional supervisorManager = taskMaster.getSupervisorManager();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 91b4244c0bf3..a3e4dc1cd764 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -2158,7 +2158,7 @@ public void resetOffsetsInternal(@Nonnull final DataSourceMetadata dataSourceMet
final boolean metadataUpdateSuccess;
final DataSourceMetadata metadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(supervisorId);
if (metadata == null) {
- log.info("Checkpointed metadata in null for supervisor[%s] for dataSource[%s] - inserting metadata[%s]", supervisorId, dataSource, resetMetadata);
+ log.info("Checkpointed metadata is null for supervisor[%s] for dataSource[%s] - inserting metadata[%s]", supervisorId, dataSource, resetMetadata);
metadataUpdateSuccess = indexerMetadataStorageCoordinator.insertDataSourceMetadata(supervisorId, resetMetadata);
} else {
if (!checkSourceMetadataMatch(metadata)) {
@@ -3274,7 +3274,7 @@ private boolean updatePartitionDataFromStream()
/**
* gets mapping of partitions in stream to their latest offsets.
*/
- protected Map getLatestSequencesFromStream()
+ public Map getLatestSequencesFromStream()
{
return new HashMap<>();
}
@@ -4552,7 +4552,7 @@ private OrderedSequenceNumber getOffsetFromStorageForPartiti
}
}
- protected Map getOffsetsFromMetadataStorage()
+ public Map getOffsetsFromMetadataStorage()
{
final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
if (dataSourceMetadata instanceof SeekableStreamDataSourceMetadata
@@ -4939,7 +4939,7 @@ private void updateCurrentOffsets() throws InterruptedException, ExecutionExcept
coalesceAndAwait(futures);
}
- protected abstract void updatePartitionLagFromStream();
+ public abstract void updatePartitionLagFromStream();
/**
* Gets 'lag' of currently processed offset behind latest offset as a measure of difference between offsets.
@@ -5196,7 +5196,7 @@ protected abstract List sequence
* @return specific instance of datasource metadata
*/
- protected abstract SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset(
+ public abstract SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset(
String stream,
Map map
);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index 842f0de4774e..ecbd51757c37 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -297,4 +297,10 @@ public void merge(@Nullable SupervisorSpec existingSpec)
protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend);
+ public abstract SeekableStreamSupervisorSpec createBackfillSpec(
+ String backfillId,
+ BoundedStreamConfig boundedStreamConfig,
+ @Nullable Integer taskCount
+ );
+
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index 525444e23dea..199e004b4243 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -19,6 +19,9 @@
package org.apache.druid.indexing.overlord.supervisor;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
@@ -26,6 +29,8 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.error.InvalidInput;
@@ -35,7 +40,11 @@
import org.apache.druid.indexing.overlord.ObjectMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
@@ -43,6 +52,7 @@
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.PendingSegmentRecord;
+import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.metrics.SupervisorStatsProvider;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -59,6 +69,7 @@
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
+import javax.annotation.Nullable;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Collections;
@@ -1068,6 +1079,186 @@ public void test_isAnotherTaskGroupPublishingToPartitions()
);
}
+ @Test
+ public void testResetToLatestAndBackfill() throws Exception
+ {
+ EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(ImmutableMap.of());
+ replayAll();
+ manager.start();
+
+ final ConcurrentHashMap> supervisorsMap = getSupervisorsMap();
+ final SeekableStreamSupervisorSpec streamSpec = EasyMock.createNiceMock(SeekableStreamSupervisorSpec.class);
+ final SeekableStreamSupervisor streamSupervisor = EasyMock.createNiceMock(SeekableStreamSupervisor.class);
+ final SeekableStreamSupervisorIOConfig ioConfig = EasyMock.createNiceMock(SeekableStreamSupervisorIOConfig.class);
+
+ // non-SeekableStream supervisor → IAE
+ // Use a concrete anonymous Supervisor (not a mock) to reliably fail instanceof SeekableStreamSupervisor
+ final Supervisor nonStreamSupervisor = new Supervisor()
+ {
+ @Override
+ public void start()
+ {
+ }
+
+ @Override
+ public void stop(boolean stopGracefully)
+ {
+ }
+
+ @Override
+ public SupervisorReport getStatus()
+ {
+ return null;
+ }
+
+ @Override
+ public SupervisorStateManager.State getState()
+ {
+ return null;
+ }
+
+ @Override
+ public void reset(DataSourceMetadata dataSourceMetadata)
+ {
+ }
+ };
+ supervisorsMap.put("id1", Pair.of(nonStreamSupervisor, streamSpec));
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> manager.resetToLatestAndBackfill("id1", null)
+ );
+
+ // useEarliestSequenceNumber=true → IAE
+ supervisorsMap.put("id1", Pair.of(streamSupervisor, streamSpec));
+ EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(true).once();
+ EasyMock.replay(streamSupervisor, streamSpec, ioConfig);
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> manager.resetToLatestAndBackfill("id1", null)
+ );
+ EasyMock.reset(streamSupervisor, streamSpec, ioConfig);
+
+ // useConcurrentLocks not set (null context) → IAE
+ EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once();
+ EasyMock.expect(streamSpec.getContext()).andReturn(null).once();
+ EasyMock.replay(streamSupervisor, streamSpec, ioConfig);
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> manager.resetToLatestAndBackfill("id1", null)
+ );
+ EasyMock.reset(streamSupervisor, streamSpec, ioConfig);
+
+ // useConcurrentLocks=false → IAE
+ EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once();
+ EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", false)).once();
+ EasyMock.replay(streamSupervisor, streamSpec, ioConfig);
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> manager.resetToLatestAndBackfill("id1", null)
+ );
+ EasyMock.reset(streamSupervisor, streamSpec, ioConfig);
+
+ // useConcurrentLocks="true" (string) → accepted, fails at next guard (not RUNNING)
+ EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once();
+ EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", "true")).once();
+ EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.SUSPENDED).once();
+ EasyMock.replay(streamSupervisor, streamSpec, ioConfig);
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> manager.resetToLatestAndBackfill("id1", null)
+ );
+ EasyMock.reset(streamSupervisor, streamSpec, ioConfig);
+
+ // taskLockType=APPEND → accepted, fails at next guard (not RUNNING)
+ EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once();
+ EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("taskLockType", "APPEND")).once();
+ EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.SUSPENDED).once();
+ EasyMock.replay(streamSupervisor, streamSpec, ioConfig);
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> manager.resetToLatestAndBackfill("id1", null)
+ );
+ EasyMock.reset(streamSupervisor, streamSpec, ioConfig);
+
+ // supervisor not RUNNING → IAE
+ EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once();
+ EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", true)).once();
+ EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.SUSPENDED).once();
+ EasyMock.replay(streamSupervisor, streamSpec, ioConfig);
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> manager.resetToLatestAndBackfill("id1", null)
+ );
+ EasyMock.reset(streamSupervisor, streamSpec, ioConfig);
+
+ // empty latest offsets → ISE
+ EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once();
+ EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", true)).once();
+ EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.RUNNING).once();
+ streamSupervisor.updatePartitionLagFromStream();
+ EasyMock.expectLastCall().once();
+ EasyMock.expect(streamSupervisor.getLatestSequencesFromStream()).andReturn(ImmutableMap.of()).once();
+ EasyMock.replay(streamSupervisor, streamSpec, ioConfig);
+ Assert.assertThrows(
+ IllegalStateException.class,
+ () -> manager.resetToLatestAndBackfill("id1", null)
+ );
+ EasyMock.reset(streamSupervisor, streamSpec, ioConfig);
+
+ // empty start offsets from metadata → ISE
+ EasyMock.expect(streamSupervisor.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(ioConfig.isUseEarliestSequenceNumber()).andReturn(false).once();
+ EasyMock.expect(streamSpec.getContext()).andReturn(ImmutableMap.of("useConcurrentLocks", true)).once();
+ EasyMock.expect(streamSupervisor.getState()).andReturn(SupervisorStateManager.BasicState.RUNNING).once();
+ streamSupervisor.updatePartitionLagFromStream();
+ EasyMock.expectLastCall().once();
+ EasyMock.expect(streamSupervisor.getLatestSequencesFromStream()).andReturn(ImmutableMap.of("0", 100L)).once();
+ EasyMock.expect(streamSupervisor.getOffsetsFromMetadataStorage()).andReturn(ImmutableMap.of()).once();
+ EasyMock.replay(streamSupervisor, streamSpec, ioConfig);
+ Assert.assertThrows(
+ IllegalStateException.class,
+ () -> manager.resetToLatestAndBackfill("id1", null)
+ );
+
+ verifyAll();
+ }
+
+ @Test
+ public void testCreateBackfillSpec()
+ {
+ final TestBackfillSupervisorSpec.IOConfig ioConfig = new TestBackfillSupervisorSpec.IOConfig("test-stream", null, null);
+ final TestBackfillSupervisorSpec.IngestionSpec ingestionSpec = new TestBackfillSupervisorSpec.IngestionSpec(ioConfig);
+ final SeekableStreamSupervisorSpec sourceSpec = new TestBackfillSupervisorSpec("original-id", ingestionSpec);
+
+ final BoundedStreamConfig boundedStreamConfig = new BoundedStreamConfig(
+ ImmutableMap.of("0", 100L),
+ ImmutableMap.of("0", 200L)
+ );
+
+ // Without overriding taskCount
+ final SupervisorSpec backfillSpec = sourceSpec.createBackfillSpec("backfill-id", boundedStreamConfig, null);
+ Assert.assertEquals("backfill-id", backfillSpec.getId());
+ final TestBackfillSupervisorSpec backfillCast = (TestBackfillSupervisorSpec) backfillSpec;
+ final BoundedStreamConfig actualConfig = backfillCast.getIoConfig().getBoundedStreamConfig();
+ Assert.assertNotNull(actualConfig);
+ Assert.assertEquals(ImmutableMap.of("0", 100L), actualConfig.getStartSequenceNumbers());
+ Assert.assertEquals(ImmutableMap.of("0", 200L), actualConfig.getEndSequenceNumbers());
+ Assert.assertEquals(1, backfillCast.getIoConfig().getTaskCount());
+
+ // With overriding taskCount
+ final SupervisorSpec backfillSpecWithCount = sourceSpec.createBackfillSpec("backfill-id-2", boundedStreamConfig, 5);
+ Assert.assertEquals("backfill-id-2", backfillSpecWithCount.getId());
+ final TestBackfillSupervisorSpec backfillWithCount = (TestBackfillSupervisorSpec) backfillSpecWithCount;
+ Assert.assertEquals(5, backfillWithCount.getIoConfig().getTaskCount());
+ }
+
private static class TestSupervisorSpec implements SupervisorSpec
{
private final String id;
@@ -1137,4 +1328,103 @@ public List getDataSources()
return Collections.singletonList(id);
}
}
+
+ @JsonTypeName("testBackfill")
+ private static class TestBackfillSupervisorSpec extends SeekableStreamSupervisorSpec
+ {
+ @JsonCreator
+ TestBackfillSupervisorSpec(
+ @JsonProperty("id") String id,
+ @JsonProperty("spec") IngestionSpec ingestionSpec
+ )
+ {
+ super(
+ id,
+ ingestionSpec,
+ ImmutableMap.of("useConcurrentLocks", true),
+ false,
+ null, null, null, null,
+ MAPPER,
+ null, null, null, null
+ );
+ }
+
+ @Override
+ public Supervisor createSupervisor()
+ {
+ return null;
+ }
+
+ @Override
+ public String getType()
+ {
+ return "testBackfill";
+ }
+
+ @Override
+ public String getSource()
+ {
+ return "test-stream";
+ }
+
+ @Override
+ protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend)
+ {
+ return this;
+ }
+
+ @Override
+ public SeekableStreamSupervisorSpec createBackfillSpec(
+ String backfillId,
+ BoundedStreamConfig boundedStreamConfig,
+ @Nullable Integer taskCount
+ )
+ {
+ return new TestBackfillSupervisorSpec(
+ backfillId,
+ new IngestionSpec(new IOConfig(getIoConfig().getStream(), taskCount, boundedStreamConfig))
+ );
+ }
+
+ @Override
+ public SeekableStreamSupervisorIOConfig getIoConfig()
+ {
+ return getSpec().getIOConfig();
+ }
+
+ @JsonTypeName("testBackfillIngestionSpec")
+ static class IngestionSpec extends SeekableStreamSupervisorIngestionSpec
+ {
+ @JsonCreator
+ IngestionSpec(
+ @JsonProperty("ioConfig") IOConfig ioConfig
+ )
+ {
+ super(
+ new DataSchema(
+ "testDS",
+ new TimestampSpec("time", "auto", null),
+ new DimensionsSpec(Collections.emptyList()),
+ null, null, null, null, null
+ ),
+ ioConfig,
+ null
+ );
+ }
+ }
+
+ @JsonTypeName("testBackfillIOConfig")
+ static class IOConfig extends SeekableStreamSupervisorIOConfig
+ {
+ @JsonCreator
+ IOConfig(
+ @JsonProperty("stream") String stream,
+ @JsonProperty("taskCount") Integer taskCount,
+ @JsonProperty("boundedStreamConfig") BoundedStreamConfig boundedStreamConfig
+ )
+ {
+ super(stream, null, 1, taskCount, null, null, null, false, null, null, null, null, LagAggregator.DEFAULT, null, null, null, null, boundedStreamConfig);
+ }
+ }
+ }
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 4ccf4659994f..31e0d604a222 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -34,6 +34,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
@@ -1379,6 +1380,100 @@ public void testResetOffsets()
verifyAll();
}
+ @Test
+ public void testResetToLatestAndBackfill()
+ {
+ // 200 - success
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of("my-id"));
+ EasyMock.expect(supervisorManager.resetToLatestAndBackfill("my-id", null))
+ .andReturn(ImmutableMap.of("id", "my-id", "backfillSupervisorId", "my-id_backfill_abcdefgh"));
+ replayAll();
+
+ Response response = supervisorResource.resetToLatestAndBackfill("my-id", null);
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(
+ ImmutableMap.of("id", "my-id", "backfillSupervisorId", "my-id_backfill_abcdefgh"),
+ response.getEntity()
+ );
+ verifyAll();
+ resetAll();
+
+ // 404 - supervisor does not exist
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of());
+ replayAll();
+
+ response = supervisorResource.resetToLatestAndBackfill("my-id", null);
+ Assert.assertEquals(404, response.getStatus());
+ verifyAll();
+ resetAll();
+
+ // 400 - IAE (e.g. supervisor not running)
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of("my-id"));
+ EasyMock.expect(supervisorManager.resetToLatestAndBackfill("my-id", null))
+ .andThrow(new IllegalArgumentException("Supervisor[my-id] must be in a RUNNING state"));
+ replayAll();
+
+ response = supervisorResource.resetToLatestAndBackfill("my-id", null);
+ Assert.assertEquals(400, response.getStatus());
+ Assert.assertEquals(
+ ImmutableMap.of("error", "Supervisor[my-id] must be in a RUNNING state"),
+ response.getEntity()
+ );
+ verifyAll();
+ resetAll();
+
+ // 500 - ISE (e.g. failed to retrieve offsets)
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of("my-id"));
+ EasyMock.expect(supervisorManager.resetToLatestAndBackfill("my-id", null))
+ .andThrow(new IllegalStateException("Failed to get latest offsets from stream"));
+ replayAll();
+
+ response = supervisorResource.resetToLatestAndBackfill("my-id", null);
+ Assert.assertEquals(500, response.getStatus());
+ Assert.assertEquals(
+ ImmutableMap.of("error", "Failed to get latest offsets from stream"),
+ response.getEntity()
+ );
+ verifyAll();
+ resetAll();
+
+ // 400 - invalid backfillTaskCount (zero)
+ replayAll();
+
+ response = supervisorResource.resetToLatestAndBackfill("my-id", 0);
+ Assert.assertEquals(400, response.getStatus());
+ Assert.assertEquals(
+ ImmutableMap.of("error", "backfillTaskCount must be a positive integer"),
+ response.getEntity()
+ );
+ verifyAll();
+ resetAll();
+
+ // 400 - invalid backfillTaskCount (negative)
+ replayAll();
+
+ response = supervisorResource.resetToLatestAndBackfill("my-id", -1);
+ Assert.assertEquals(400, response.getStatus());
+ Assert.assertEquals(
+ ImmutableMap.of("error", "backfillTaskCount must be a positive integer"),
+ response.getEntity()
+ );
+ verifyAll();
+ resetAll();
+
+ // 503 - no supervisor manager (not leader)
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
+ replayAll();
+
+ response = supervisorResource.resetToLatestAndBackfill("my-id", null);
+ Assert.assertEquals(503, response.getStatus());
+ verifyAll();
+ }
+
@Test
public void testNoopSupervisorSpecSerde() throws Exception
{
@@ -1668,6 +1763,16 @@ protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend)
return null;
}
+ @Override
+ public SeekableStreamSupervisorSpec createBackfillSpec(
+ String backfillId,
+ BoundedStreamConfig boundedStreamConfig,
+ @Nullable Integer taskCount
+ )
+ {
+ return null;
+ }
+
@JsonIgnore
@Nonnull
@Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
index 3d0c6426feb2..8182ae482ca6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
@@ -884,6 +884,16 @@ protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend)
return null;
}
+ @Override
+ public SeekableStreamSupervisorSpec createBackfillSpec(
+ String backfillId,
+ BoundedStreamConfig boundedStreamConfig,
+ @Nullable Integer taskCount
+ )
+ {
+ return null;
+ }
+
@Override
public String getType()
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index eff5d1acd980..e19d68cb2b3f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -3059,7 +3059,7 @@ public String toString()
final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor()
{
@Override
- protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset(
+ public SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset(
String stream,
Map map
)
@@ -3284,7 +3284,7 @@ protected String baseTaskName()
}
@Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()
{
// do nothing
}
@@ -3381,7 +3381,7 @@ protected boolean doesTaskMatchSupervisor(Task task)
}
@Override
- protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset(
+ public SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset(
String stream,
Map map
)
@@ -3521,7 +3521,7 @@ public LagStats computeLagStats()
}
@Override
- protected Map getLatestSequencesFromStream()
+ public Map getLatestSequencesFromStream()
{
return streamOffsets;
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
index 4eefaed9bd99..c96a64211b97 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java
@@ -124,7 +124,7 @@ protected String baseTaskName()
}
@Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()
{
// do nothing
}
@@ -205,7 +205,7 @@ protected boolean doesTaskMatchSupervisor(Task task)
}
@Override
- protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset(
+ public SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset(
String stream,
Map map
)
@@ -436,6 +436,16 @@ protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend)
{
return null;
}
+
+ @Override
+ public SeekableStreamSupervisorSpec createBackfillSpec(
+ String backfillId,
+ BoundedStreamConfig boundedStreamConfig,
+ @Nullable Integer taskCount
+ )
+ {
+ return null;
+ }
}
protected static SeekableStreamSupervisorTuningConfig getTuningConfig()
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java
index 0ca643f109f7..35488be081b1 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncherRetryTest.java
@@ -322,6 +322,12 @@ public ListenableFuture