Skip to content

Implement Kafka consumer pause/resume feature with async processing#103

Open
Smet2133 wants to merge 7 commits into
mainfrom
feat/kafka-consumer-pause-resume
Open

Implement Kafka consumer pause/resume feature with async processing#103
Smet2133 wants to merge 7 commits into
mainfrom
feat/kafka-consumer-pause-resume

Conversation

@Smet2133

Copy link
Copy Markdown

tests were implemented in kafka pipelines, quarkus ms intentionally having small max.poll.records and long work (sleep)

# max.poll.records=2 � effectiveBatch=3, pause when queue >= 3 (with handler-delay-ms).
maas.kafka.client.consumer.versioned.kafka-consumer.property.max.poll.records=2 
@Startup
public void createVersionedConsumer() {
    log.info("Creating versioned consumer (handler-delay-ms={})", handlerDelayMs);
    MaasKafkaConsumerCreationRequest consumerCreationRequest =
            MaasKafkaConsumerCreationRequest.builder()
                    .setConsumerDefinition(clientFactory.getConsumerDefinition("versioned"))
                    .setHandler(rec -> {
                        if (handlerDelayMs > 0) {
                            log.info("Versioned handler: sleeping {}ms before processing offset={}",
                                    handlerDelayMs, rec.offset());
                            try {
                                Thread.sleep(handlerDelayMs);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                throw new RuntimeException(e);
                            }
                        }
                        log.info("Received message: {}", rec);
                        lastOffset = rec.offset();
                    })
                    .build();
    maasKafkaConsumer = clientFactory.createConsumer(consumerCreationRequest);
    maasKafkaConsumer.initSync();
    maasKafkaConsumer.activateSync();
}

test bursts messages to topic:

@Test
@Tag("e2e-phase:test[satellite]:standalone/[baseline]:bgd{O{i}*P{a}*C}")
void testBeforeWarmup() {
    URL pgwBaselinePeer = getPublicGwUrl(BASELINE, PEER);

    for (int i = 0; i < VERSIONED_BURST_SIZE; i++) {
        sendRequest(pgwBaselinePeer, microserviceName, "versioned", "test-for-peer-" + i, null);
    }

    checkOffset(pgwBaselinePeer, microserviceName, VERSIONED_BURST_SIZE - 1);
}

@Test
@Tag("e2e-phase:test[satellite]:standalone/[baseline]:bgd{O{c}*P{a}*C}")
void testAfterWarmup() {
    URL pgwBaselinePeer = getPublicGwUrl(BASELINE, PEER);
    URL pgwBaselineOrigin = getPublicGwUrl(BASELINE, ORIGIN);

    for (int i = 0; i < VERSIONED_BURST_SIZE; i++) {
        sendRequest(pgwBaselinePeer, microserviceName, "versioned", "test-after-warmup-peer-" + i, null);
    }
    for (int i = 0; i < VERSIONED_BURST_SIZE; i++) {
        sendRequest(pgwBaselineOrigin, microserviceName, "versioned", "test-after-warmup-origin-" + i, null);
    }

    // peer: continues after testBeforeWarmup (offsets 0..4), +5 → last offset 9
    checkOffset(pgwBaselinePeer, microserviceName, 2 * VERSIONED_BURST_SIZE - 1);
    checkOffset(pgwBaselineOrigin, microserviceName, VERSIONED_BURST_SIZE - 1);
}

validation pause is working:

[2026-06-12 07:36:33.840] [INFO ] [requestId=1781249232276.0.2392317897896057] [caller=MaasKafkaProducerImpl] Start activation maas kafka producer: com.netcracker.maas.declarative.kafka.client.impl.client.producer.MaasKafkaProducerImpl$1@daed209[topic=MaasTopicDefinition(actualName=null, name=versioned, template=null, onTopicExist=null, namespace=core-2-bg-ci, partitions=null, replicationFactor=null, managedBy=MAAS, configs={}, versioned=false),isTenant=false,clientConfig={key.serializer=org.apache.kafka.common.serialization.StringSerializer, value.serializer=org.apache.kafka.common.serialization.StringSerializer}]
[2026-06-12 07:36:33.879] [INFO ] [requestId=1781249232276.0.2392317897896057] [caller=MaasKafkaProducerImpl] Finish activation maas kafka producer: com.netcracker.maas.declarative.kafka.client.impl.client.producer.MaasKafkaProducerImpl$1@11f54a43[topic=MaasTopicDefinition(actualName=null, name=versioned, template=null, onTopicExist=null, namespace=core-2-bg-ci, partitions=null, replicationFactor=null, managedBy=MAAS, configs={}, versioned=false),isTenant=false,clientConfig={key.serializer=org.apache.kafka.common.serialization.StringSerializer, value.serializer=org.apache.kafka.common.serialization.StringSerializer}]
[2026-06-12 07:36:33.979] [INFO ] [requestId=bg-kafka-it-f8f36fd8] [caller=VersionedController] Publish message: MaasProducerRecord[partition=null, key=versioned-topic-test-value, value="test-for-peer-0", timestamp=1781249793979, headers=null] to core-2-bg-ci-versioned
[2026-06-12 07:36:34.094] [DEBUG] [requestId=1781249232276.0.2392317897896057] [caller=MaasConsumingExecutor] [consumer] queued partition=0 offset=0 key=versioned-topic-test-value
[2026-06-12 07:36:34.094] [DEBUG] [requestId=1781249232276.0.2392317897896057] [caller=MaasConsumingExecutor] [worker] processing partition=0 offset=0 key=versioned-topic-test-value
[2026-06-12 07:36:34.096] [INFO ] [requestId=bg-kafka-it-f8f36fd8] [caller=OffsetHolder] Versioned handler: sleeping 2000ms before processing offset=0
[2026-06-12 07:36:34.113] [INFO ] [requestId=bg-kafka-it-ab5d7c92] [caller=VersionedController] Publish message: MaasProducerRecord[partition=null, key=versioned-topic-test-value, value="test-for-peer-1", timestamp=1781249794113, headers=null] to core-2-bg-ci-versioned
[2026-06-12 07:36:34.126] [DEBUG] [requestId=1781249232276.0.2392317897896057] [caller=MaasConsumingExecutor] [consumer] queued partition=0 offset=1 key=versioned-topic-test-value
[2026-06-12 07:36:34.142] [INFO ] [requestId=bg-kafka-it-80b7ea24] [caller=VersionedController] Publish message: MaasProducerRecord[partition=null, key=versioned-topic-test-value, value="test-for-peer-2", timestamp=1781249794142, headers=null] to core-2-bg-ci-versioned
[2026-06-12 07:36:34.161] [DEBUG] [requestId=1781249232276.0.2392317897896057] [caller=MaasConsumingExecutor] [consumer] queued partition=0 offset=2 key=versioned-topic-test-value
[2026-06-12 07:36:34.184] [INFO ] [requestId=bg-kafka-it-89b8ca5e] [caller=VersionedController] Publish message: MaasProducerRecord[partition=null, key=versioned-topic-test-value, value="test-for-peer-3", timestamp=1781249794184, headers=null] to core-2-bg-ci-versioned
[2026-06-12 07:36:34.196] [DEBUG] [requestId=1781249232276.0.2392317897896057] [caller=MaasConsumingExecutor] [consumer] queued partition=0 offset=3 key=versioned-topic-test-value
[2026-06-12 07:36:34.197] [DEBUG] [requestId=1781249232276.0.2392317897896057] [caller=MaasConsumingExecutor] [consumer] PAUSED — queue size 3 >= threshold 3
[2026-06-12 07:36:34.209] [INFO ] [requestId=bg-kafka-it-7eece7b8] [caller=VersionedController] Publish message: MaasProducerRecord[partition=null, key=versioned-topic-test-value, value="test-for-peer-4", timestamp=1781249794209, headers=null] to core-2-bg-ci-versioned
[2026-06-12 07:36:36.101] [INFO ] [requestId=bg-kafka-it-f8f36fd8] [caller=OffsetHolder] Received message: ConsumerRecord(topic = core-2-bg-ci-versioned, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1781249793979, deliveryCount = null, serialized key size = 26, serialized value size = 17, headers = RecordHeaders(headers = [RecordHeader(key = X-Request-Id, value = [98, 103, 45, 107, 97, 102, 107, 97, 45, 105, 116, 45, 102, 56, 102, 51, 54, 102, 100, 56])], isReadOnly = false), key = versioned-topic-test-value, value = "test-for-peer-0")
[2026-06-12 07:36:36.103] [DEBUG] [requestId=] [caller=MaasConsumingExecutor] [worker] done partition=0 offset=0
[2026-06-12 07:36:36.103] [DEBUG] [requestId=] [caller=MaasConsumingExecutor] [worker] processing partition=0 offset=1 key=versioned-topic-test-value
[2026-06-12 07:36:36.103] [INFO ] [requestId=bg-kafka-it-ab5d7c92] [caller=OffsetHolder] Versioned handler: sleeping 2000ms before processing offset=1
[2026-06-12 07:36:36.214] [DEBUG] [requestId=1781249232276.0.2392317897896057] [caller=MaasConsumingExecutor] [consumer] committing version=null position={core-2-bg-ci-versioned-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}
[2026-06-12 07:36:38.104] [INFO ] [requestId=bg-kafka-it-ab5d7c92] [caller=OffsetHolder] Received message: ConsumerRecord(topic = core-2-bg-ci-versioned, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1781249794113, deliveryCount = null, serialized key size = 26, serialized value size = 17, headers = RecordHeaders(headers = [RecordHeader(key = X-Request-Id, value = [98, 103, 45, 107, 97, 102, 107, 97, 45, 105, 116, 45, 97, 98, 53, 100, 55, 99, 57, 50])], isReadOnly = false), key = versioned-topic-test-value, value = "test-for-peer-1")
[2026-06-12 07:36:38.104] [DEBUG] [requestId=] [caller=MaasConsumingExecutor] [worker] done partition=0 offset=1
[2026-06-12 07:36:38.104] [DEBUG] [requestId=] [caller=MaasConsumingExecutor] [worker] processing partition=0 offset=2 key=versioned-topic-test-value
[2026-06-12 07:36:38.104] [INFO ] [requestId=bg-kafka-it-80b7ea24] [caller=OffsetHolder] Versioned handler: sleeping 2000ms before processing offset=2
[2026-06-12 07:36:38.240] [DEBUG] [requestId=1781249232276.0.2392317897896057] [caller=MaasConsumingExecutor] [consumer] committing version=null position={core-2-bg-ci-versioned-0=OffsetAndMetadata{offset=2, leaderEpoch=null, metadata=''}}
[2026-06-12 07:36:38.250] [DEBUG] [requestId=1781249232276.0.2392317897896057] [caller=MaasConsumingExecutor] [consumer] RESUMED — queue size 1 <= threshold 1
[2026-06-12 07:36:38.251] [DEBUG] [requestId=1781249232276.0.2392317897896057] [caller=MaasConsumingExecutor] [consumer] queued partition=0 offset=4 key=versioned-topic-test-value
[2026-06-12 07:36:40.105] [INFO ] [requestId=bg-kafka-it-80b7ea24] [caller=OffsetHolder] Received message: ConsumerRecord(topic = core-2-bg-ci-versioned, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1781249794142, deliveryCount = null, serialized key size = 26, serialized value size = 17, headers = RecordHeaders(headers = [RecordHeader(key = X-Request-Id, value = [98, 103, 45, 107, 97, 102, 107, 97, 45, 105, 116, 45, 56, 48, 98, 55, 101, 97, 50, 52])], isReadOnly = false), key = versioned-topic-test-value, value = "test-for-peer-2")
[2026-06-12 07:36:40.105] [DEBUG] [requestId=] [caller=MaasConsumingExecutor] [worker] done partition=0 offset=2
[2026-06-12 07:36:40.105] [DEBUG] [requestId=] [caller=MaasConsumingExecutor] [worker] processing partition=0 offset=3 key=versioned-topic-test-value
[2026-06-12 07:36:40.105] [INFO ] [requestId=bg-kafka-it-89b8ca5e] [caller=OffsetHolder] Versioned handler: sleeping 2000ms before processing offset=3

@Smet2133 Smet2133 requested a review from lis0x90 as a code owner June 12, 2026 11:46
@github-actions

Copy link
Copy Markdown


Thank you for your submission, we really appreciate it. Like many open-source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution. You can sign the CLA by just posting a Pull Request Comment same as the below format.


I have read the CLA Document and I hereby sign the CLA


You can retrigger this bot by commenting recheck in this Pull Request. Posted by the CLA Assistant Lite bot.

@github-actions github-actions Bot added the enhancement New feature or request label Jun 12, 2026
@sonarqubecloud

Copy link
Copy Markdown

Quality Gate Failed Quality Gate failed

Failed conditions
79.3% Coverage on New Code (required ≥ 80%)

See analysis details on SonarQube Cloud

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants