diff --git a/maas-client/kafka-blue-green-consumer/bg2-states-kafka-offsets.md b/maas-client/kafka-blue-green-consumer/bg2-states-kafka-offsets.md index 0a3f13aaf..9c20ad77b 100644 --- a/maas-client/kafka-blue-green-consumer/bg2-states-kafka-offsets.md +++ b/maas-client/kafka-blue-green-consumer/bg2-states-kafka-offsets.md @@ -26,7 +26,7 @@ state "Standalone (1)" as Standalone1 <> { Standalone1 --> Active1 : InitDomain -note right #white +note on link #white existing consumer-groups: 1. ns-1.ms-1.orders 2. [new] ns-1.ms-1.orders-v1-a_i-2023-07-07_10-30-00 @@ -62,7 +62,7 @@ state "Active (2)" as Active1 <> { Active1 --> BGPrepare1 : Warmup -note right #white +note on link #white existing consumer-groups: 1. ns-1.ms-1.orders-v1-a_i-2023-07-07_10-30-00 2. [new] ns-1.ms-1.orders-v1-a_c-2023-07-07_11-30-00 @@ -99,7 +99,7 @@ state "BG Prepare (3)" as BGPrepare1 <> { BGPrepare1 --> Active2 : Commit -note right #white +note on link #white existing consumer-groups: 1. ns-1.ms-1.orders-v1-a_c-2023-07-07_11-30-00 2. ns-1.ms-1.orders-v2-c_a-2023-07-07_11-30-00 @@ -136,7 +136,7 @@ state "Active (4)" as Active2 <> { Active2 --> BGPrepare2 : Warmup -note right #white +note on link #white existing consumer-groups: 1. ns-1.ms-1.orders-v1-a_i-2023-07-07_12-30-00 2. [new] ns-1.ms-1.orders-v1-a_c-2023-07-07_13-30-00 @@ -173,7 +173,7 @@ state "BG Prepare (5)" as BGPrepare2 <> { BGPrepare2 --> BGFinalize2 : Promote -note right #white +note on link #white existing consumer-groups: 1. ns-1.ms-1.orders-v1-a_c-2023-07-07_13-30-00 2. ns-1.ms-1.orders-v3-c_a-2023-07-07_13-30-00 @@ -219,7 +219,7 @@ state "BG Finalize (6)" as BGFinalize2 <> { BGFinalize2 --> BGPrepare3 : Rollback -note right #white +note on link #white existing consumer-groups: 1. ns-1.ms-1.orders-v1-l_a-2023-07-07_14-30-00 2. ns-1.ms-1.orders-v3-a_l-2023-07-07_14-30-00 @@ -265,7 +265,7 @@ state "BG Prepare (7)" as BGPrepare3 <> { BGPrepare3 --> BGFinalize3 : Promote -note right #white +note on link #white existing consumer-groups: 1. ns-1.ms-1.orders-v1-a_c-2023-07-07_15-30-00 2. ns-1.ms-1.orders-v3-c_a-2023-07-07_15-30-00 @@ -311,7 +311,7 @@ state "BG Finalize (8)" as BGFinalize3 <> { BGFinalize3 --> Active4 : Commit -note right #white +note on link #white existing consumer-groups: 1. ns-1.ms-1.orders-v1-l_a-2023-07-07_16-30-00 2. ns-1.ms-1.orders-v3-a_l-2023-07-07_16-30-00 @@ -347,7 +347,7 @@ state "Active (9)" as Active4 <> { Active4 --> Standalone2 : DestroyDomain -note right #white +note on link #white existing consumer-groups: 1. ns-1.ms-1.orders-v3-a_i-2023-07-07_17-30-00 2. [new] ns-1.ms-1.orders diff --git a/maas-client/kafka-blue-green-consumer/src/main/java/com/netcracker/cloud/maas/bluegreen/kafka/BGKafkaConsumer.java b/maas-client/kafka-blue-green-consumer/src/main/java/com/netcracker/cloud/maas/bluegreen/kafka/BGKafkaConsumer.java index 30286143c..4c2575c8f 100644 --- a/maas-client/kafka-blue-green-consumer/src/main/java/com/netcracker/cloud/maas/bluegreen/kafka/BGKafkaConsumer.java +++ b/maas-client/kafka-blue-green-consumer/src/main/java/com/netcracker/cloud/maas/bluegreen/kafka/BGKafkaConsumer.java @@ -20,5 +20,9 @@ public interface BGKafkaConsumer extends AutoCloseable { Collection assignment(); + void setPartitionsAssignedListener(PartitionsAssignedListener listener); + void close(); } + + diff --git a/maas-client/kafka-blue-green-consumer/src/main/java/com/netcracker/cloud/maas/bluegreen/kafka/PartitionsAssignedListener.java b/maas-client/kafka-blue-green-consumer/src/main/java/com/netcracker/cloud/maas/bluegreen/kafka/PartitionsAssignedListener.java new file mode 100644 index 000000000..b2669e0b6 --- /dev/null +++ b/maas-client/kafka-blue-green-consumer/src/main/java/com/netcracker/cloud/maas/bluegreen/kafka/PartitionsAssignedListener.java @@ -0,0 +1,10 @@ +package com.netcracker.cloud.maas.bluegreen.kafka; + +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; + +@FunctionalInterface +public interface PartitionsAssignedListener { + void onPartitionsAssigned(Collection partitions); +} diff --git a/maas-client/kafka-blue-green-consumer/src/main/java/com/netcracker/cloud/maas/bluegreen/kafka/impl/BGKafkaConsumerImpl.java b/maas-client/kafka-blue-green-consumer/src/main/java/com/netcracker/cloud/maas/bluegreen/kafka/impl/BGKafkaConsumerImpl.java index c7e8c36ea..d2186a972 100644 --- a/maas-client/kafka-blue-green-consumer/src/main/java/com/netcracker/cloud/maas/bluegreen/kafka/impl/BGKafkaConsumerImpl.java +++ b/maas-client/kafka-blue-green-consumer/src/main/java/com/netcracker/cloud/maas/bluegreen/kafka/impl/BGKafkaConsumerImpl.java @@ -6,6 +6,7 @@ import com.netcracker.cloud.bluegreen.api.service.BlueGreenStatePublisher; import com.netcracker.cloud.maas.bluegreen.kafka.BGKafkaConsumer; import com.netcracker.cloud.maas.bluegreen.kafka.CommitMarker; +import com.netcracker.cloud.maas.bluegreen.kafka.PartitionsAssignedListener; import com.netcracker.cloud.maas.bluegreen.kafka.Record; import com.netcracker.cloud.maas.bluegreen.kafka.RecordsBatch; import com.netcracker.cloud.maas.bluegreen.versiontracker.impl.VersionFilterConstructor; @@ -38,6 +39,7 @@ public class BGKafkaConsumerImpl implements BGKafkaConsumer { private BlueGreenState activeState; private final AtomicReference bgStateRef = new AtomicReference<>(); + private PartitionsAssignedListener partitionsAssignedListener; public BGKafkaConsumerImpl(BGKafkaConsumerConfig config) { this.config = config; @@ -125,6 +127,11 @@ public void commitSync(CommitMarker marker) { } } + @Override + public void setPartitionsAssignedListener(PartitionsAssignedListener listener) { + this.partitionsAssignedListener = listener; + } + @SneakyThrows public void close() { log.info("Closing consumer"); @@ -209,6 +216,9 @@ public void onPartitionsAssigned(Collection partitions) { .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); log.debug("Adjusting current consumer offsets to: {}", alignedOffsets); alignedOffsets.forEach(((topicPartition, offsetAndMetadata) -> kafkaConsumer.seek(topicPartition, offsetAndMetadata))); + if (partitionsAssignedListener != null) { + partitionsAssignedListener.onPartitionsAssigned(partitions); + } } }; log.debug("Subscribing kafka consumer to the topics: {}", config.getTopics().stream().sorted().toList()); diff --git a/maas-client/kafka-blue-green-consumer/src/main/java/com/netcracker/cloud/maas/bluegreen/kafka/impl/DefaultKafkaConsumer.java b/maas-client/kafka-blue-green-consumer/src/main/java/com/netcracker/cloud/maas/bluegreen/kafka/impl/DefaultKafkaConsumer.java index 96b0594f5..fb0544e06 100644 --- a/maas-client/kafka-blue-green-consumer/src/main/java/com/netcracker/cloud/maas/bluegreen/kafka/impl/DefaultKafkaConsumer.java +++ b/maas-client/kafka-blue-green-consumer/src/main/java/com/netcracker/cloud/maas/bluegreen/kafka/impl/DefaultKafkaConsumer.java @@ -2,6 +2,7 @@ import com.netcracker.cloud.maas.bluegreen.kafka.BGKafkaConsumer; import com.netcracker.cloud.maas.bluegreen.kafka.CommitMarker; +import com.netcracker.cloud.maas.bluegreen.kafka.PartitionsAssignedListener; import com.netcracker.cloud.maas.bluegreen.kafka.Record; import com.netcracker.cloud.maas.bluegreen.kafka.RecordsBatch; import lombok.extern.slf4j.Slf4j; @@ -21,6 +22,7 @@ public class DefaultKafkaConsumer implements BGKafkaConsumer { private final Consumer consumer; + private PartitionsAssignedListener partitionsAssignedListener; public DefaultKafkaConsumer(BGKafkaConsumerConfig config) { this.consumer = config.getConsumerSupplier().apply(config.getProperties()); @@ -36,6 +38,9 @@ public void onPartitionsAssigned(Collection partitions) { log.debug("Partitions were assigned: {}", partitions); consumer.committed(new HashSet<>(partitions)).forEach(((topicPartition, offsetAndMetadata) -> consumer.seek(topicPartition, Optional.ofNullable(offsetAndMetadata).map(OffsetAndMetadata::offset).orElse(0L)))); + if (partitionsAssignedListener != null) { + partitionsAssignedListener.onPartitionsAssigned(partitions); + } } }); } @@ -63,6 +68,11 @@ public void commitSync(CommitMarker marker) { consumer.commitSync(marker.getPosition()); } + @Override + public void setPartitionsAssignedListener(PartitionsAssignedListener listener) { + this.partitionsAssignedListener = listener; + } + @Override public void pause() { this.consumer.pause(this.assignment()); diff --git a/maas-declarative-client-commons/maas-kafka-client/src/main/java/com/netcracker/maas/declarative/kafka/client/impl/client/consumer/MaasKafkaConsumerImpl.java b/maas-declarative-client-commons/maas-kafka-client/src/main/java/com/netcracker/maas/declarative/kafka/client/impl/client/consumer/MaasKafkaConsumerImpl.java index 73da89052..fbee2a652 100644 --- a/maas-declarative-client-commons/maas-kafka-client/src/main/java/com/netcracker/maas/declarative/kafka/client/impl/client/consumer/MaasKafkaConsumerImpl.java +++ b/maas-declarative-client-commons/maas-kafka-client/src/main/java/com/netcracker/maas/declarative/kafka/client/impl/client/consumer/MaasKafkaConsumerImpl.java @@ -275,11 +275,6 @@ private ConsumerExecContext createExecContext(TopicAddress topic) { execContext.setDeserializerHolder(deserializerHolder); execContext.setPollDuration(Duration.ofMillis(pollDuration)); - MaasConsumingExecutor executor; - executor = new MaasConsumingExecutor(execContext, - errorHandler, kafkaClientCreationService, recordFilters, statePublisher); - execContext.setExecutor(executor); - MaasKafkaBlueGreenDefinition blueGreenDefinition = consumerDefinition.getBlueGreenDefinition(); KafkaConsumerConfiguration.Builder kafkaConsumerConfigurationBuilder = KafkaConsumerConfiguration.builder(consumerCfg); if (blueGreenDefinition != null) { @@ -291,7 +286,12 @@ private ConsumerExecContext createExecContext(TopicAddress topic) { .setLocaldev(blueGreenDefinition.isLocaldev()) .setVersioned(topic.isVersioned()); } + // Must be set before MaasConsumingExecutor construction: executor reads max.poll.records from here. execContext.setBlueGreenConfiguration(kafkaConsumerConfigurationBuilder.build()); + + MaasConsumingExecutor executor = new MaasConsumingExecutor(execContext, + errorHandler, kafkaClientCreationService, recordFilters, statePublisher); + execContext.setExecutor(executor); return execContext; } diff --git a/maas-declarative-client-commons/maas-kafka-client/src/main/java/com/netcracker/maas/declarative/kafka/client/impl/client/consumer/executor/MaasConsumingExecutor.java b/maas-declarative-client-commons/maas-kafka-client/src/main/java/com/netcracker/maas/declarative/kafka/client/impl/client/consumer/executor/MaasConsumingExecutor.java index 04b158936..624f01865 100644 --- a/maas-declarative-client-commons/maas-kafka-client/src/main/java/com/netcracker/maas/declarative/kafka/client/impl/client/consumer/executor/MaasConsumingExecutor.java +++ b/maas-declarative-client-commons/maas-kafka-client/src/main/java/com/netcracker/maas/declarative/kafka/client/impl/client/consumer/executor/MaasConsumingExecutor.java @@ -1,5 +1,6 @@ package com.netcracker.maas.declarative.kafka.client.impl.client.consumer.executor; +import com.netcracker.cloud.bluegreen.api.model.NamespaceVersion; import com.netcracker.cloud.bluegreen.api.service.BlueGreenStatePublisher; import com.netcracker.cloud.maas.bluegreen.kafka.BGKafkaConsumer; import com.netcracker.cloud.maas.bluegreen.kafka.CommitMarker; @@ -11,14 +12,25 @@ import com.netcracker.maas.declarative.kafka.client.impl.client.consumer.filter.Chain; import com.netcracker.maas.declarative.kafka.client.impl.client.consumer.filter.impl.FilterExecutor; import com.netcracker.maas.declarative.kafka.client.impl.client.creator.KafkaClientCreationService; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -26,27 +38,51 @@ public class MaasConsumingExecutor implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(MaasConsumingExecutor.class); + private static final int MIN_EFFECTIVE_BATCH = 3; // Floor for backpressure math when max.poll.records is very small (avoids pause/resume flicker). private final ConsumerInstanceHolder consumer; private final ConsumerExecContext context; private final AtomicReference stateRef = new AtomicReference<>(ExecutorState.INACTIVE); private final MaasKafkaConsumerErrorHandler errorHandler; private final AwaitExecutorService rescheduleTimeoutValueProvider; - private final RecordsBatchIterator batchIterator = new RecordsBatchIterator(); private final BlueGreenStatePublisher statePublisher; private final List consumerRecordProcessors; private final ConsumerRecordFilter terminalRecordHandler = new ConsumerRecordFilter() { - @Override - public void doFilter(Record record, Chain> next) { - context.getHandler().accept(record.getConsumerRecord()); - } + @Override + public void doFilter(Record record, Chain> next) { + context.getHandler().accept(record.getConsumerRecord()); + } - @Override - public int order() { - return Integer.MAX_VALUE; - } - }; + @Override + public int order() { + return Integer.MAX_VALUE; + } + }; + + // backpressure thresholds derived from max.poll.records + private final int queueCapacity; + private final int pauseThreshold; + private final int resumeThreshold; + + // shared state between executor thread and worker thread + private final BlockingQueue> workerQueue; + // keyed by (partition, version) — NOT partition alone. The async queue can hold records polled before + // and after a blue-green switch, so the same partition may be in flight under two generations at once + // Offsets of different generations live in different consumer groups and are not comparable, so each + // (partition, version) gets its own slot + private final ConcurrentHashMap readyToCommit = new ConcurrentHashMap<>(); + private final AtomicBoolean shouldBePaused = new AtomicBoolean(false); + // BGKafkaConsumerImpl creates the underlying Kafka consumer lazily on the first poll(); + // pause()/resume() must not be called before that. + private final AtomicBoolean consumerInitiated = new AtomicBoolean(false); + private final AtomicBoolean kafkaPauseApplied = new AtomicBoolean(false); + private final AtomicReference workerError = new AtomicReference<>(); + // set by the worker on a fatal error; stops it from processing further records so the committed + // offset can never advance past a failed record. Reset by the consumer thread during recovery. + private final AtomicBoolean processingHalted = new AtomicBoolean(false); + private final AtomicBoolean running = new AtomicBoolean(false); + private Thread workerThread; public MaasConsumingExecutor(ConsumerExecContext context, MaasKafkaConsumerErrorHandler errorHandler, @@ -63,10 +99,27 @@ public MaasConsumingExecutor(ConsumerExecContext context, this.consumerRecordProcessors.addAll(consumerRecordFilters); this.consumerRecordProcessors.sort(Comparator.comparingInt(RecordFilter::order)); this.consumerRecordProcessors.add(terminalRecordHandler); // must be the very last one + + int maxPollRecords = Optional.ofNullable(context.getBlueGreenConfiguration()) + .map(c -> c.getConfigs().get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)) + .map(v -> Integer.parseInt(v.toString())) + .orElse(500); + int effectiveBatch = Math.max(maxPollRecords, MIN_EFFECTIVE_BATCH); + this.queueCapacity = effectiveBatch * 3; + this.pauseThreshold = effectiveBatch; + this.resumeThreshold = effectiveBatch / 2; + this.workerQueue = new LinkedBlockingQueue<>(queueCapacity); + LOG.info("max.poll.records={}, effectiveBatch={}, queue capacity={}, pause>={}, resume<={}", + maxPollRecords, effectiveBatch, queueCapacity, pauseThreshold, resumeThreshold); } public void init() { LOG.debug("Initializing consumer executor for topic: {}", context.getTopic().getTopicName()); + running.set(true); + workerThread = new Thread(this::workerLoop, + "maas-kafka-worker-" + context.getTopic().getTopicName()); + workerThread.setDaemon(true); + workerThread.start(); context.getExecutorService().execute(this); } @@ -74,7 +127,7 @@ public void init() { public void run() { var state = stateRef.get(); LOG.debug("Run in state: {}", state); - switch(state) { + switch (state) { case SUSPENDED, INACTIVE -> { consumer.release(); LOG.debug("Reschedule run after: 100ms"); @@ -108,43 +161,166 @@ public void run() { } public void consume(BGKafkaConsumer consumer) throws Exception { - if (batchIterator.isProcessed()) { - LOG.debug("Poll records and refill iterator"); - consumer.poll(context.getPollDuration()).ifPresent(batchIterator::set); + // commit offsets the worker has finished. commitSync blocks only for the broker round-trip (~ms) + // done before the workerError check so successfully processed records are committed even when a later record's error handler failed + if (!readyToCommit.isEmpty()) { + Map snapshot = Map.copyOf(readyToCommit); + snapshot.forEach(readyToCommit::remove); // conditional remove: only removes if value still matches + // commit one marker per NamespaceVersion: the async queue can hold records polled before and + // after a blue-green switch, so a snapshot may mix versions. Each commitSync carries a single + // version, which BGKafkaConsumer gates against the active state — committing offsets from one + // generation under another would corrupt the committed position. + // switch of the consumer is internal to BGKafkaConsumerImpl (this.activeState = reinitializeConsumerIfNeeded) + for (CommitMarker marker : groupByVersion(snapshot)) { + LOG.debug("[consumer] committing version={} position={}", marker.getVersion(), marker.getPosition()); + consumer.commitSync(marker); + } + } + + // rethrow any unrecoverable worker error — triggers consumer.release() in run() + // drop in-flight records: the recreated consumer re-fetches from the last committed offset + Exception err = workerError.getAndSet(null); + if (err != null) { + readyToCommit.clear(); + workerQueue.clear(); + // worker halted on the fatal error and stopped advancing the committed offset + // clear the halt so it can resume once the recreated consumer re-fetches from the last committed offset. + processingHalted.set(false); + throw err; + } + + // backpressure: update desired pause state based on queue fill + int queueSize = workerQueue.size(); + if (!shouldBePaused.get() && queueSize >= pauseThreshold) { + shouldBePaused.set(true); + LOG.debug("[consumer] PAUSED — queue size {} >= threshold {}", queueSize, pauseThreshold); + } else if (shouldBePaused.get() && queueSize <= resumeThreshold) { + shouldBePaused.set(false); + LOG.debug("[consumer] RESUMED — queue size {} <= threshold {}", queueSize, resumeThreshold); } - if (!batchIterator.isProcessed()) { - LOG.debug("Process batch: {}", batchIterator); - CommitMarker lastProcessedRecordMarker = null; - while(batchIterator.record() != null) { + // apply pause/resume only after the first poll() initiated the underlying consumer + if (consumerInitiated.get()) { + if (shouldBePaused.get()) { + if (!kafkaPauseApplied.getAndSet(true)) { + consumer.pause(); + } + } else if (kafkaPauseApplied.getAndSet(false)) { + consumer.resume(); + } + } + + // poll — short timeout when paused (heartbeat only, no fetches) + Duration pollTimeout = shouldBePaused.get() ? Duration.ofMillis(200) : context.getPollDuration(); + consumer.poll(pollTimeout).ifPresent(batch -> { + for (Record record : batch.getBatch()) { + if (!workerQueue.offer(record)) { + // emergency: queue hit capacity. it could happen if poll returned more records than max.poll.records + // Pause BEFORE spinning so poll(0) can only send + // heartbeats and can never fetch-and-drop records + shouldBePaused.set(true); + if (consumerInitiated.get() && !kafkaPauseApplied.getAndSet(true)) { + consumer.pause(); + } + do { + // NOT wrapped in safe() — exception must propagate to trigger normal error recovery + LOG.debug("[consumer] queue full ({}/{}), paused; calling poll(0) to stay alive", + workerQueue.size(), queueCapacity); + try { + consumer.poll(Duration.ZERO); // send heartbeat only, emergency loop and need to offer again asap, only network delay + } catch (Exception e) { + throw new RuntimeException(e); + } + } while (!workerQueue.offer(record)); + } + LOG.debug("[consumer] queued partition={} offset={} key={}", + record.getConsumerRecord().partition(), record.getConsumerRecord().offset(), + record.getConsumerRecord().key()); + } + }); + consumerInitiated.set(true); + } + + private void workerLoop() { + LOG.info("[worker] started for topic: {}", context.getTopic().getTopicName()); + while (running.get()) { + try { + if (processingHalted.get()) { + // fatal error pending — do NOT touch the queue until the consumer thread recovers, + // otherwise later successes would push the committed offset past the failed record + Thread.sleep(50); + continue; + } + Record record = workerQueue.poll(200, TimeUnit.MILLISECONDS); + if (record == null) continue; + + LOG.debug("[worker] processing partition={} offset={} key={}", + record.getConsumerRecord().partition(), record.getConsumerRecord().offset(), + record.getConsumerRecord().key()); try { - LOG.debug("Process record: {}", batchIterator.record()); - FilterExecutor.execute(consumerRecordProcessors, batchIterator.record()); - lastProcessedRecordMarker = batchIterator.record().getCommitMarker(); + FilterExecutor.execute(consumerRecordProcessors, record); + // mark offset as ready to commit. Key is (partition, version) so blue-green generations + // never overwrite each other. Compare this partition's offset only — BG markers are + // cumulative across partitions; a global max would let another partition mask a real advance. + var partition = new TopicPartition(record.getConsumerRecord().topic(), record.getConsumerRecord().partition()); + readyToCommit.merge( + new CommitKey(partition, record.getCommitMarker().getVersion()), + record.getCommitMarker(), + (existing, latest) -> offsetOf(latest, partition) > offsetOf(existing, partition) ? latest : existing + ); + LOG.debug("[worker] done partition={} offset={}", + record.getConsumerRecord().partition(), record.getConsumerRecord().offset()); } catch (Exception processingException) { try { - LOG.debug("Handle record processing exception: {}", processingException.getMessage()); - errorHandler.handle(processingException, - batchIterator.record().getConsumerRecord(), - batchIterator.handledRecords()); - } catch (Exception errorHandleException) { - LOG.debug("Exception in record error handler. Try to commit already processed records and rethrow: {}", errorHandleException.getMessage()); - Optional.ofNullable(lastProcessedRecordMarker).ifPresent(m -> safe(() -> consumer.commitSync(m))); - batchIterator.reset(); - errorHandleException.addSuppressed(processingException); - throw errorHandleException; + LOG.warn("[worker] handle record processing exception: {}", processingException.getMessage()); + errorHandler.handle(processingException, record.getConsumerRecord(), List.of()); //List.of empty because we don't have any records to send -- they are now committed in consumer loop separately + } catch (Exception errorHandlerException) { + LOG.error("[worker] error handler threw — halting worker until recovery", errorHandlerException); + if (errorHandlerException != processingException) { + errorHandlerException.addSuppressed(processingException); + } + // halt BEFORE publishing the error so the next loop iteration cannot dequeue and + // process another record (which would advance the committed offset past this one) + processingHalted.set(true); + workerError.set(errorHandlerException); + // do not exit loop — consumer thread will trigger recovery on next run() } } - // move to next record either if record successfully processed or, in case of error, successfully error handled - batchIterator.moveToNextRecord(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; } - - LOG.debug("Commit records batch offset: {}", batchIterator.getBatchCommitMarker()); - consumer.commitSync(batchIterator.getBatchCommitMarker()); - batchIterator.markProcessed(); - } else { - LOG.debug("nothing to process"); } + LOG.info("[worker] stopped for topic: {}", context.getTopic().getTopicName()); + } + + private static long offsetOf(CommitMarker marker, TopicPartition partition) { + OffsetAndMetadata offset = marker.getPosition().get(partition); + return offset != null ? offset.offset() : -1L; + } + + private static Collection groupByVersion(Map snapshot) { + // poll returns one generation, but the queue can hold records from previous generations. + // group offsets by their owning NamespaceVersion so each is committed under its own generation. + // NamespaceVersion is used as a map key, so a null version (DefaultKafkaConsumer) needs a wrapper. + Map, Map> byVersion = new HashMap<>(); + snapshot.forEach((key, marker) -> { + Map offsets = + byVersion.computeIfAbsent(Optional.ofNullable(key.version()), v -> new HashMap<>()); + // each marker is cumulative across partitions; multiple (partition, version) keys for the same + // generation can carry overlapping entries — per-partition max, never blind putAll. + marker.getPosition().forEach((tp, off) -> + offsets.merge(tp, off, (a, b) -> b.offset() > a.offset() ? b : a)); + }); + + List result = new ArrayList<>(byVersion.size()); + byVersion.forEach((version, offsets) -> result.add(new CommitMarker(version.orElse(null), offsets))); + return result; + } + + // composite key so the same partition under two blue-green generations occupies distinct slots in + // readyToCommit; version may be null for the non-blue-green DefaultKafkaConsumer (records handle null). + private record CommitKey(TopicPartition partition, NamespaceVersion version) { } public void suspend() { @@ -160,6 +336,15 @@ public void resume() { public void close() { LOG.info("Close executor"); stateRef.set(ExecutorState.CLOSED); + running.set(false); + if (workerThread != null) { + workerThread.interrupt(); // if we are on workerQueue.poll() + try { + workerThread.join(5_000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } public void start() { @@ -179,14 +364,28 @@ class ConsumerInstanceHolder { public BGKafkaConsumer getOrCreateInstance() { withSync(() -> instance.get() == null, () -> { LOG.info("Create new kafka consumer instance"); - instance.set(kafkaClientCreationService.createKafkaConsumer( + BGKafkaConsumer newConsumer = kafkaClientCreationService.createKafkaConsumer( context.getBlueGreenConfiguration(), context.getDeserializerHolder().getKeyDeserializer(), context.getDeserializerHolder().getValueDeserializer(), context.getTopic().getTopicName(), null, statePublisher - )); + ); + + // re-apply pause inside onPartitionsAssigned to close the rebalance window: + // kafka clears pause on revoke, but fetch for the new assignment hasn't been sent yet — + // pausing here prevents any records from slipping through in the same poll() call + newConsumer.setPartitionsAssignedListener(partitions -> { + if (shouldBePaused.get()) { + newConsumer.pause(); + kafkaPauseApplied.set(true); + LOG.debug("[consumer] re-applied pause after rebalance on partitions={}", partitions); + } + }); + instance.set(newConsumer); + consumerInitiated.set(false); + kafkaPauseApplied.set(false); }); return instance.get(); } diff --git a/maas-declarative-client-commons/maas-kafka-client/src/test/java/com/netcracker/maas/declarative/kafka/client/api/MaasKafkaClientTest.java b/maas-declarative-client-commons/maas-kafka-client/src/test/java/com/netcracker/maas/declarative/kafka/client/api/MaasKafkaClientTest.java index 59c6a7b46..5119b6339 100644 --- a/maas-declarative-client-commons/maas-kafka-client/src/test/java/com/netcracker/maas/declarative/kafka/client/api/MaasKafkaClientTest.java +++ b/maas-declarative-client-commons/maas-kafka-client/src/test/java/com/netcracker/maas/declarative/kafka/client/api/MaasKafkaClientTest.java @@ -33,6 +33,8 @@ import com.netcracker.maas.declarative.kafka.client.impl.definition.api.MaasKafkaClientDefinitionService; import com.netcracker.maas.declarative.kafka.client.impl.tenant.api.InternalTenantService; import com.netcracker.maas.declarative.kafka.client.impl.tracing.TracingService; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -256,7 +258,9 @@ void consumerBg() throws Exception { TopicInfo topicInfo = createTopicInfo(); when(maasKafkaTopicService.getTopicAddressByDefinition(any())).thenReturn(new TopicAddressImpl(topicInfo)); final CompletableFuture msg = new CompletableFuture<>(); - try (var ignored = maasKafkaConsumer(topicInfo.getName(), record -> msg.complete(record.value()), false, true)) { + String groupId = "group-" + RANDOM_STRING.get(); + try (var ignored = maasKafkaConsumer(topicInfo.getName(), record -> msg.complete(record.value()), false, true, groupId)) { + waitForBgConsumerReady(groupId); String testMsg = "message-" + RANDOM_STRING.get(); kafkaProducer.send(new ProducerRecord<>(topicInfo.getName(), testMsg)); assertEquals(testMsg, msg.get(ASYNC_WAIT_TIMEOUT_SEC, TimeUnit.SECONDS)); @@ -272,6 +276,7 @@ void consumerBgRecreateOnException() throws Exception { String firstMsg = "message-first-" + RANDOM_STRING.get(); String secondMsg = "message-second-" + RANDOM_STRING.get(); AtomicInteger c = new AtomicInteger(0); + String groupId = "group-" + RANDOM_STRING.get(); try (var ignored = maasKafkaConsumer(topicInfo.getName(), rec -> { int counter = c.getAndIncrement(); if (counter == 0) { @@ -281,7 +286,8 @@ void consumerBgRecreateOnException() throws Exception { } else { secondMsgReceived.complete(rec.value()); } - }, false, true)) { + }, false, true, groupId)) { + waitForBgConsumerReady(groupId); kafkaProducer.send(new ProducerRecord<>(topicInfo.getName(), firstMsg)); assertEquals(firstMsg, firstMsgReceived.get(ASYNC_WAIT_TIMEOUT_SEC, TimeUnit.SECONDS)); kafkaProducer.send(new ProducerRecord<>(topicInfo.getName(), secondMsg)); @@ -446,6 +452,26 @@ MaasKafkaConsumer maasKafkaConsumer(String topicName, Consumer { @Override public String deserialize(String topic, byte[] data) { diff --git a/maas-declarative-client-commons/maas-kafka-client/src/test/java/com/netcracker/maas/declarative/kafka/client/impl/client/consumer/executor/MaasConsumingExecutorTest.java b/maas-declarative-client-commons/maas-kafka-client/src/test/java/com/netcracker/maas/declarative/kafka/client/impl/client/consumer/executor/MaasConsumingExecutorTest.java index a53452df9..955e15eb9 100644 --- a/maas-declarative-client-commons/maas-kafka-client/src/test/java/com/netcracker/maas/declarative/kafka/client/impl/client/consumer/executor/MaasConsumingExecutorTest.java +++ b/maas-declarative-client-commons/maas-kafka-client/src/test/java/com/netcracker/maas/declarative/kafka/client/impl/client/consumer/executor/MaasConsumingExecutorTest.java @@ -1,25 +1,42 @@ package com.netcracker.maas.declarative.kafka.client.impl.client.consumer.executor; +import com.netcracker.maas.declarative.kafka.client.impl.common.bg.KafkaConsumerConfiguration; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.errors.FencedInstanceIdException; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import com.netcracker.cloud.bluegreen.api.model.NamespaceVersion; +import com.netcracker.cloud.bluegreen.api.model.State; +import com.netcracker.cloud.bluegreen.api.model.Version; import com.netcracker.cloud.bluegreen.impl.service.InMemoryBlueGreenStatePublisher; import com.netcracker.cloud.bluegreen.impl.util.EnvUtil; import com.netcracker.cloud.maas.bluegreen.kafka.BGKafkaConsumer; +import com.netcracker.cloud.maas.bluegreen.kafka.CommitMarker; +import com.netcracker.cloud.maas.bluegreen.kafka.Record; +import com.netcracker.cloud.maas.bluegreen.kafka.RecordsBatch; import com.netcracker.cloud.maas.client.api.kafka.TopicAddress; import com.netcracker.maas.declarative.kafka.client.SyncBarrier; import com.netcracker.maas.declarative.kafka.client.api.MaasKafkaConsumerErrorHandler; import com.netcracker.maas.declarative.kafka.client.impl.client.consumer.DeserializerHolder; import com.netcracker.maas.declarative.kafka.client.impl.client.creator.KafkaClientCreationService; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.mockito.ArgumentCaptor; import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.function.Consumer; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.*; class MaasConsumingExecutorTest { @@ -33,6 +50,7 @@ void setup() { ctx = new ConsumerExecContext(); ctx.setAwaitAfterErrorTimeList(List.of(100L)); ctx.setDeserializerHolder(new DeserializerHolder(new StringDeserializer(), new StringDeserializer())); + ctx.setPollDuration(Duration.ofSeconds(1)); var topicAddress = mock(TopicAddress.class); when(topicAddress.getTopicName()).thenReturn("orders"); @@ -210,8 +228,10 @@ void testRecordHandleException() { try { executor.start(); executor.init(); - barrier.await("handled", Duration.ofSeconds(1)); - barrier.await("consumed", Duration.ofSeconds(1)); + // handler error is handled on worker thread — errorHandler notifies "handled" + barrier.await("handled", Duration.ofSeconds(5)); + // next record is successfully handled on worker thread + barrier.await("consumed", Duration.ofSeconds(5)); } finally { executor.close(); } @@ -219,10 +239,10 @@ void testRecordHandleException() { barrier.reset(); verify(consumerCreatorService, times(1)).createKafkaConsumer(any(), any(), any(), any(), any(), any()); + // errorHandler returns normally → no consumer release → only closed once at executor.close() verify(consumer, timeout(10_000).times(1)).close(); } - @Test void testRecordErrorHandlerException() throws Exception { var consumer = mock(BGKafkaConsumer.class); @@ -261,17 +281,324 @@ void testRecordErrorHandlerException() throws Exception { try { executor.start(); executor.init(); - barrier.await("consumed", Duration.ofSeconds(1)); + // record 1 processed on worker thread + barrier.await("consumed", Duration.ofSeconds(5)); + // record 2 fails: errorHandler throws → worker halts → workerError → consumer.release() from executor thread verify(consumer, timeout(10_000).times(1)).close(); - barrier.await("consumed", Duration.ofSeconds(1)); + // after recovery the worker resumes (halt cleared) and processes a freshly polled record + barrier.await("consumed", Duration.ofSeconds(5)); + // wait until executor reschedules and creates a new consumer (100ms timeout + jitter) + verify(consumerCreatorService, timeout(10_000).times(2)).createKafkaConsumer(any(), any(), any(), any(), any(), any()); } finally { executor.close(); } barrier.reset(); - verify(consumerCreatorService, times(2)).createKafkaConsumer(any(), any(), any(), any(), any(), any()); - verify(consumer, timeout(10_000).atLeast(2)).commitSync(any()); // TODO validate commit marker + // record 1 offset is committed synchronously after worker marks it done + verify(consumer, timeout(10_000).atLeast(1)).commitSync(any()); verify(consumer, timeout(10_000).times(2)).close(); } + + @Test + void testWorkerHaltsAfterFatalErrorNoCommitPastFailure() throws Exception { + var consumer = mock(BGKafkaConsumer.class); + var consumerCreatorService = mock(KafkaClientCreationService.class); + var errorHandler = mock(MaasKafkaConsumerErrorHandler.class); + when(consumerCreatorService.createKafkaConsumer(any(), any(), any(), any(), any(), any())).thenReturn(consumer); + + var recordHandler = mock(Consumer.class); + ctx.setHandler(recordHandler); + + var executor = new MaasConsumingExecutor( + ctx, + errorHandler, + consumerCreatorService, + List.of(), + new InMemoryBlueGreenStatePublisher()); + + // single partition batch: offsets 0,1,2 — record at offset 1 fails fatally + when(consumer.poll(any())) + .thenReturn(singlePartitionBatch(3)) + .thenReturn(Optional.empty()); + + // offset 0 succeeds, offset 1 throws (processing error), offset 2 must NEVER be processed + doNothing() // offset 0 + .doThrow(new RuntimeException("oops")) // offset 1 + .when(recordHandler).accept(any()); + + // errorHandler rethrows → fatal → worker must halt + doThrow(new RuntimeException("ouch!")).when(errorHandler).handle(any(), any(), any()); + + try { + executor.start(); + executor.init(); + + // recovery happened: failed consumer released and a fresh one created + verify(consumerCreatorService, timeout(10_000).times(2)) + .createKafkaConsumer(any(), any(), any(), any(), any(), any()); + + // worker stopped after the fatal record: only offsets 0 and 1 ever reached the handler, + // offset 2 (still in the queue at failure time) was dropped, never processed + verify(recordHandler, timeout(10_000).times(2)).accept(any()); + verify(recordHandler, after(500).times(2)).accept(any()); + + // the committed offset never advanced past the failed record (offset 1 → next offset 2); + // only offset 0 was committable, i.e. commit position 1. Nothing at position >= 3. + verify(consumer, never()).commitSync(argThat(MaasConsumingExecutorTest::commitsPastFailure)); + } finally { + executor.close(); + } + } + + @Test + void testMixedVersionsCommittedSeparately() { + var consumer = mock(BGKafkaConsumer.class); + var barrier = new SyncBarrier(); + var consumerCreatorService = mock(KafkaClientCreationService.class); + when(consumerCreatorService.createKafkaConsumer(any(), any(), any(), any(), any(), any())).thenReturn(consumer); + + var recordHandler = mock(Consumer.class); + ctx.setHandler(recordHandler); + + var v1 = new NamespaceVersion("order-processor", State.ACTIVE, new Version("v1")); + var v2 = new NamespaceVersion("order-processor", State.CANDIDATE, new Version("v2")); + + // one poll batch holding records from two different generations on two partitions: + // partition 0 → v1, partition 1 → v2. After the worker processes both, readyToCommit + // holds a mixed-version snapshot, which must be committed as two separate markers. + when(consumer.poll(any())) + .thenReturn(twoVersionBatch(v1, v2)) + .thenAnswer(i -> { + barrier.notify("second-poll"); + return Optional.empty(); + }); + + var executor = new MaasConsumingExecutor(ctx, + (exception, errorRecord, handledRecords) -> {}, + consumerCreatorService, + List.of(), + new InMemoryBlueGreenStatePublisher()); + + try { + executor.start(); + executor.init(); + // wait until the batch has been polled and a later iteration ran (commit happened) + barrier.await("second-poll", Duration.ofSeconds(5)); + + var captor = ArgumentCaptor.forClass(CommitMarker.class); + verify(consumer, timeout(5_000).atLeast(2)).commitSync(captor.capture()); + + var committedVersions = captor.getAllValues().stream() + .map(CommitMarker::getVersion) + .collect(java.util.stream.Collectors.toSet()); + // both generations committed under their own version — never merged into one + org.junit.jupiter.api.Assertions.assertTrue(committedVersions.contains(v1), + "expected a commit for v1, got: " + committedVersions); + org.junit.jupiter.api.Assertions.assertTrue(committedVersions.contains(v2), + "expected a commit for v2, got: " + committedVersions); + + // each commit marker is single-version and single-partition (no cross-version mixing) + captor.getAllValues().forEach(marker -> { + if (v1.equals(marker.getVersion())) { + org.junit.jupiter.api.Assertions.assertEquals( + java.util.Set.of(new TopicPartition("orders", 0)), marker.getPosition().keySet()); + } else if (v2.equals(marker.getVersion())) { + org.junit.jupiter.api.Assertions.assertEquals( + java.util.Set.of(new TopicPartition("orders", 1)), marker.getPosition().keySet()); + } + }); + } finally { + executor.close(); + } + } + + + + @Test + void testSamePartitionTwoVersionsActiveOffsetNotDropped() throws InterruptedException { + var consumer = mock(BGKafkaConsumer.class); + var barrier = new SyncBarrier(); + var consumerCreatorService = mock(KafkaClientCreationService.class); + when(consumerCreatorService.createKafkaConsumer(any(), any(), any(), any(), any(), any())).thenReturn(consumer); + + var recordHandler = mock(Consumer.class); + ctx.setHandler(recordHandler); + + var vStale = new NamespaceVersion("order-processor", State.ACTIVE, new Version("v1")); + var vActive = new NamespaceVersion("order-processor", State.CANDIDATE, new Version("v2")); + + // Same partition (orders-0) in flight under two generations: + // - stale v1 at the HIGHER offset 10 (commit position 11) + // - active v2 at the LOWER offset 2 (commit position 3) — e.g. v2 re-read lower after offset alignment + // poll #3 blocks so no commit can run between the two worker merges, forcing both markers to coexist + // in readyToCommit at the same time — the exact collision the (partition, version) key must survive. + when(consumer.poll(any())) + .thenReturn(singlePartitionVersionBatch(vStale, 10)) // poll 1: v1 p0 offset 10 + .thenReturn(singlePartitionVersionBatch(vActive, 2)) // poll 2: v2 p0 offset 2 + .thenAnswer(i -> { // poll 3: block until released + barrier.notify("poll-blocked"); + barrier.await("release-poll", Duration.ofSeconds(10)); + return Optional.empty(); + }) + .thenReturn(Optional.empty()); // poll 4+: don't block + + // first record (v1) blocks the worker until released, so v2 stays queued and no commit removes v1; + // second record (v2) returns immediately + doAnswer(i -> { + barrier.notify("v1-processing"); + barrier.await("release-v1", Duration.ofSeconds(10)); + return null; + }).doAnswer(i -> { + barrier.notify("v2-handled"); + return null; + }).when(recordHandler).accept(any()); + + var executor = new MaasConsumingExecutor(ctx, + (exception, errorRecord, handledRecords) -> {}, + consumerCreatorService, + List.of(), + new InMemoryBlueGreenStatePublisher()); + + try { + executor.start(); + executor.init(); + + // worker has picked v1 and is blocked; poll #3 has blocked so no commit can run + barrier.await("v1-processing", Duration.ofSeconds(5)); + barrier.await("poll-blocked", Duration.ofSeconds(5)); + + // release v1: worker merges (p0,v1)=11, then processes v2 and merges (p0,v2)=3. + // both now coexist in readyToCommit because the blocked poll prevents any intervening commit. + barrier.notify("release-v1"); + barrier.await("v2-handled", Duration.ofSeconds(5)); + // the v2 notify fires inside the handler, just before the worker performs the merge — give it a moment + Thread.sleep(300); + + // unblock the poll loop → next consume() commits the snapshot holding BOTH versions + barrier.notify("release-poll"); + + var captor = ArgumentCaptor.forClass(CommitMarker.class); + verify(consumer, timeout(5_000).atLeast(1)).commitSync(captor.capture()); + + var tp0 = new TopicPartition("orders", 0); + // the active (lower-offset) v2 marker MUST be committed + boolean activeCommitted = captor.getAllValues().stream().anyMatch(m -> + vActive.equals(m.getVersion()) + && m.getPosition().get(tp0) != null + && m.getPosition().get(tp0).offset() == 3); + org.junit.jupiter.api.Assertions.assertTrue(activeCommitted, + "active version offset must not be dropped; commits=" + captor.getAllValues()); + + // the stale v1 marker is committed separately, never merged with v2 + boolean staleCommitted = captor.getAllValues().stream().anyMatch(m -> + vStale.equals(m.getVersion()) + && m.getPosition().get(tp0) != null + && m.getPosition().get(tp0).offset() == 11); + org.junit.jupiter.api.Assertions.assertTrue(staleCommitted, + "stale version marker should be committed separately; commits=" + captor.getAllValues()); + } finally { + executor.close(); + } + } + + @Test + void testBackpressurePause() { + var consumer = mock(BGKafkaConsumer.class); + var barrier = new SyncBarrier(); + var consumerCreatorService = mock(KafkaClientCreationService.class); + when(consumerCreatorService.createKafkaConsumer(any(), any(), any(), any(), any(), any())).thenReturn(consumer); + + // max.poll.records=2 → effectiveBatch=3 → pauseThreshold=3, resumeThreshold=1, queueCapacity=9 + var configs = new HashMap(); + configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2"); + var bgConfig = KafkaConsumerConfiguration.builder(configs).build(); + ctx.setBlueGreenConfiguration(bgConfig); + + var recordHandler = mock(Consumer.class); + ctx.setHandler(recordHandler); + + // Worker blocks on first record until test releases it — keeps queue full above pauseThreshold + doAnswer(i -> { + barrier.await("release-worker", Duration.ofSeconds(5)); + return null; + }) + .doNothing() + .when(recordHandler).accept(any()); + + // Two polls: worker pops one record from queue so by the 3rd consume() iteration + // the queue still has 3 records (2+2-1=3) ≥ pauseThreshold(3) → pause triggered + when(consumer.poll(any())) + .thenAnswer(i -> recordsGenerator.next()) // poll 1: 2 records + .thenAnswer(i -> recordsGenerator.next()) // poll 2: 2 more records while worker is blocked + .thenReturn(Optional.empty()); + + var executor = new MaasConsumingExecutor(ctx, + (exception, errorRecord, handledRecords) -> {}, + consumerCreatorService, + List.of(), + new InMemoryBlueGreenStatePublisher()); + + try { + executor.start(); + executor.init(); + + // after 2nd poll: queueSize(3) >= pauseThreshold(3) → pause applied + verify(consumer, timeout(5_000).atLeastOnce()).pause(); + + // release the worker — it drains the queue to 0 + barrier.notify("release-worker"); + + // queueSize drops below resumeThreshold(1) → resume applied + verify(consumer, timeout(5_000).atLeastOnce()).resume(); + } finally { + executor.close(); + } + } + + /** Build a batch with two partitions stamped with different versions: orders-0 → v1, orders-1 → v2. */ + private static Optional twoVersionBatch(NamespaceVersion v1, NamespaceVersion v2) { + var tp0 = new TopicPartition("orders", 0); + var tp1 = new TopicPartition("orders", 1); + var m0 = new CommitMarker(v1, Map.of(tp0, new OffsetAndMetadata(1))); + var m1 = new CommitMarker(v2, Map.of(tp1, new OffsetAndMetadata(1))); + List records = new ArrayList<>(); + records.add(new Record(new ConsumerRecord("orders", 0, 0, "k0", "d0"), m0)); + records.add(new Record(new ConsumerRecord("orders", 1, 0, "k1", "d1"), m1)); + return Optional.of(new RecordsBatch(records, m1)); + } + + /** Build a single-partition (orders-0), single-version batch with one record at the given offset. */ + private static Optional singlePartitionVersionBatch(NamespaceVersion version, int offset) { + var tp = new TopicPartition("orders", 0); + var marker = new CommitMarker(version, Map.of(tp, new OffsetAndMetadata(offset + 1))); + List records = new ArrayList<>(); + records.add(new Record(new ConsumerRecord("orders", 0, offset, "k" + offset, "d" + offset), marker)); + return Optional.of(new RecordsBatch(records, marker)); + } + + /** True if the marker would commit a position strictly greater than "right after the failed record" (offset 1 → pos 2). */ + private static boolean commitsPastFailure(CommitMarker marker) { + return marker.getPosition().values().stream() + .mapToLong(OffsetAndMetadata::offset) + .max() + .orElse(0) > 2; + } + + /** Build a single-partition (orders-0) batch with offsets 0..count-1 and cumulative commit markers. */ + private static Optional singlePartitionBatch(int count) { + var tp = new TopicPartition("orders", 0); + var version = new NamespaceVersion("order-processor", State.ACTIVE, new Version("v1")); + List records = new ArrayList<>(count); + CommitMarker batchMarker = null; + for (int i = 0; i < count; i++) { + Map pos = new HashMap<>(); + pos.put(tp, new OffsetAndMetadata(i + 1)); // commit "next offset" semantics + CommitMarker marker = new CommitMarker(version, pos); + records.add(new Record(new ConsumerRecord("orders", 0, i, "order" + i, "data" + i), marker)); + batchMarker = marker; + } + return Optional.of(new RecordsBatch(records, batchMarker)); + } + } diff --git a/maas-declarative-client-commons/pom.xml b/maas-declarative-client-commons/pom.xml index 2df31b95d..486a6f62b 100644 --- a/maas-declarative-client-commons/pom.xml +++ b/maas-declarative-client-commons/pom.xml @@ -27,7 +27,7 @@ 3.5.6 4.2.0 - 12.2.1 + 12.2.2-SNAPSHOT diff --git a/maas-declarative-client-quarkus/maas-kafka-quarkus-client/runtime/src/main/java/com/netcracker/maas/declarative/kafka/quarkus/client/ConfigUtils.java b/maas-declarative-client-quarkus/maas-kafka-quarkus-client/runtime/src/main/java/com/netcracker/maas/declarative/kafka/quarkus/client/ConfigUtils.java index 2cea448e3..1e90e1785 100644 --- a/maas-declarative-client-quarkus/maas-kafka-quarkus-client/runtime/src/main/java/com/netcracker/maas/declarative/kafka/quarkus/client/ConfigUtils.java +++ b/maas-declarative-client-quarkus/maas-kafka-quarkus-client/runtime/src/main/java/com/netcracker/maas/declarative/kafka/quarkus/client/ConfigUtils.java @@ -40,7 +40,11 @@ private static Optional getOptionalPropertyOfType(Config config, String k private static void putIfPresent(Config config, String key, String mapKey, Map map, Class... types) { for (Class type : types) { - getOptionalPropertyOfType(config, key, type).ifPresent(value -> map.put(mapKey, value)); + Optional value = getOptionalPropertyOfType(config, key, type); + if (value.isPresent()) { + map.put(mapKey, value.get()); + return; + } } } }