Skip to content

Add AIMD based dynamic producer batch sizing to JCQueue#8796

Open
GGraziadei wants to merge 1 commit into
apache:masterfrom
GGraziadei:jcqueue-dynamic-batch-size
Open

Add AIMD based dynamic producer batch sizing to JCQueue#8796
GGraziadei wants to merge 1 commit into
apache:masterfrom
GGraziadei:jcqueue-dynamic-batch-size

Conversation

@GGraziadei

Copy link
Copy Markdown
Member

What is the purpose of the change

This PR introduces an adaptive batch-sizing strategy for JCQueue's producer-side inserter, controlled by a new feature flag (topology.producer.batch.dynamic). Instead of committing to a fixed producerBatchSz, the new DynamicBatchInserter starts at a batch size of 1 and adjusts it online using AIMD: it additively grows the effective size (+1) after flushing a full batch (heavy load) and multiplicatively shrinks it (halving toward 1) after a timer-driven partial flush (light load), with the configured batch size acting as a ceiling rather than a fixed target. This lets the queue favor low latency under light load while preserving throughput under sustained back-pressure, without manual per-topology tuning.

How was the change tested

  • Unit tests
  • Benchmark BatchInserter (baseline) vs DynamicBatchInserter, report in the first comment.

@GGraziadei

Copy link
Copy Markdown
Member Author

Performance analysis

The benchmarks were run against the FileReadWordCountTopo topology, a standard word-count workload from Storm's performance suite, exercising the inter-executor transfer path that JCQueue's producer batching governs. The topology was deployed on a dockerized dev-cluster, providing a reproducible, self-contained Storm environment so that each configuration ran under identical resource and parallelism conditions, with the same fixed topology shape (2 workers, 7 tasks, 7 executors, single spout executor) across all runs so that the producer batch-sizing strategy was the only variable. The results consistently favored dynamic batching: across batch ceilings of 10, 100, and 1000 it matched or beat every static configuration on all three metrics, delivering throughput gains of up to ~9% and average complete-latency reductions of 8–12%, with the largest benefit at small ceilings where static batching faces the sharpest latency-versus-throughput trade-off and no measured downside at any setting. An extended 600-second run at ceiling 1000 further showed a brief learning phase after which the AIMD controller converges on a stable optimum, with latency settling near 376 ms within a ~2 ms standard deviation, confirming that the policy discovers a good batch size online and then locks onto it rather than relying on a manually tuned fixed value.

Attached raw data and report.

jcqueue-dynamic-batch-size.txt

Dynamic Producer Batch Sizing in Apache Storm.pdf

@reiabreu reiabreu requested review from jnioche and rzo1 June 24, 2026 15:11
@reiabreu

Copy link
Copy Markdown
Contributor

@GGraziadei thanks for the submission. Overall it looks good.
I always try to run the change set through an LLM to make sure we are not missing anything.
Here are some suggestions. Please note that point A was already present before your PR.

A. Non-Static ThreadLocal Memory Leak Cycle

The PR defines the thread-local fields as instance variables of JCQueue :

private final ThreadLocal<BatchInserter> thdLocalBatcher = new ThreadLocal<BatchInserter>();                                                                                                                                           
private final ThreadLocal<DynamicBatchInserter> thdLocalDynamicBatcher = new ThreadLocal<DynamicBatchInserter>();                                                                                                                      

In Java, this creates a cyclic reference cycle:

• Thread holds ThreadLocalMap .
• ThreadLocalMap contains value DynamicBatchInserter .
• DynamicBatchInserter holds a strong reference back to the outer JCQueue instance via this.queue .
• JCQueue holds the ThreadLocal key instance.

Even if the JCQueue reference is discarded elsewhere, it cannot be garbage collected because it is reachable from the thread's map, which prevents the key from being weakly cleared.

│ [!NOTE]
│ This pattern already existed for thdLocalBatcher in JCQueue prior to this PR. However, adding thdLocalDynamicBatcher replicates the leak. While usually bounded by the lifetime of the worker thread in Storm, in environments
│ where topologies are frequently started/stopped on static threads, this will leak JCQueue memory.

│ Mitigation: The ThreadLocal could be made static , using a composite key (like ThreadLocal<Map<JCQueue, Inserter>> where keys are weak references) or cleaning them up during close() .

B. Transient Backpressure Smoothing

In tryFlush() , when the queue is full under backpressure, tryPublishInternal returns 0. The method returns false but still executes the afterFlush(wasFull) hook:

    public boolean tryFlush() {
        ...
        boolean wasFull = currentBatch.size() >= batchSize();
        int publishCount = queue.tryPublishInternal(currentBatch);
        if (publishCount == 0) {
            ...
            afterFlush(wasFull);
            return false;
        ...

If wasFull was true , effectiveBatchSz increases.
When the caller retries tryPublish() , the new effectiveBatchSz is larger. As a result, the check currentBatch.size() >= batchSize() now returns false .
This allows the retried item to be appended to the local batch, and tryPublish() returns true (success) instead of immediately propagating backpressure to the caller.

│ [!TIP]
│ This behaves as a transient "buffer absorber" under sudden backpressure. The backpressure signal is temporarily delayed until the local batch size reaches the hard cap ( maxBatchSz ), at which point it consistently blocks. This is
a
│ safe and beneficial smoothing behavior, but a subtle consequence of calling afterFlush() on failed flushes.

@rzo1 rzo1 added this to the 3.0.0 milestone Jun 24, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants