optimize: extend internalConcat dispatch for value-presented sources#2978
optimize: extend internalConcat dispatch for value-presented sources#2978He-Pin wants to merge 2 commits into
Conversation
f01178f to
c5a4e58
Compare
|
Hope this can improve some performance of pekko-http |
c5a4e58 to
64595fb
Compare
Motivation: PR apache#2977 added value-presented source dispatch for FlattenConcat / FlattenMerge so that materializing a substream is skipped when the inner source already carries its element(s) inline. The same opportunity exists for the eager / lazy `concat` operators, where the second source is often a `Source.single`, `Source.future`, an iterable, a range, a Java stream, `Source.repeat`, or a failed source. Materializing a fan-in graph for any of these is pure overhead. Modification: - `Flow.internalConcat` (Scala DSL): when `TraversalBuilder.getValuePresentedSource` recognises a known kind, fuse with a dedicated lightweight stage instead of `Concat`. New stages live under `pekko.stream.impl`: `SingleConcat`, `IterableConcat`, `RangeConcat`, `JavaStreamConcat`, `RepeatConcat`, `FailedConcat`, `FutureConcat`. `Source.empty` short- circuits to the upstream Source unchanged. - `JavaStreamConcat` mirrors `InflightJavaStreamSource` resource hygiene: the underlying `BaseStream` is opened on `onUpstreamFinish` and closed in `postStop` so cancellation, exhaustion, and exceptions all release the resource (`stream` typed `BaseStream[E, _]` to avoid a recursive bound on Scala 3, matching apache#2977's fix in `5d03002142`). - `FutureConcat` uses `Future.value` for the already-completed fast path and registers a parasitic async callback for the pending case. The pending path swaps the `out` handler to a no-op while waiting for the callback so we don't pull the now-closed `in`. `Success(null)` calls `completeStage()` (no null forwarded), staying in lock-step with `Source.future(Future.successful(null))` rerouting to `Source.empty` and `InflightSources.hasFutureElement` treating `Success(null)` as no-element. `DO NOT CHANGE` comment captures the invariant. - Tests: `FlowConcatSpec` covers each optimized dispatch (`SingleConcat`, `IterableConcat`, `RangeConcat`, `IteratorConcat`, `JavaStreamConcat`, `RepeatConcat`, `FailedConcat`, `FutureConcat`), avoidance of substream materialization for value-presented sources (counter never increments for the inlined source), Java-stream `BaseStream.close()` on both exhaustion and downstream cancellation (`take(2)` mid-stream), pending future resolved with `Success(null)` treated as completion, pending future resolved with `Failure(_)` failing the stream, and the existing `eager` / `concatLazy` parity matrix. Result: The eager and lazy concat operators take the same value-presented fast path as apache#2977's flatten operators. No substream is materialized for the inlined kinds; the Java-stream resource is closed deterministically; the pending-future null path is consistent with `Source.future` and the flatten inflight path; all 90 tests across the four FlowConcat suites pass, plus all 70 FlattenMerge / FlatMapConcat regression tests. Tests: stream-tests/testOnly *FlowConcat*Spec *FlattenMerge* *FlatMapConcat* References: PR apache#2977 (value-presented optimization for FlattenConcat / FlattenMerge); PR apache#2978 (this work)
Motivation: PR apache#2977 added value-presented source dispatch for FlattenConcat / FlattenMerge so that materializing a substream is skipped when the inner source already carries its element(s) inline. The same opportunity exists for the eager / lazy `concat` operators, where the second source is often a `Source.single`, `Source.future`, an iterable, a range, a Java stream, `Source.repeat`, or a failed source. Materializing a fan-in graph for any of these is pure overhead. Modification: - `Flow.internalConcat` (Scala DSL): when `TraversalBuilder.getValuePresentedSource` recognises a known kind, fuse with a dedicated lightweight stage instead of `Concat`. New stages live under `pekko.stream.impl`: `SingleConcat`, `IterableConcat`, `RangeConcat`, `JavaStreamConcat`, `RepeatConcat`, `FailedConcat`, `FutureConcat`. `Source.empty` short- circuits to the upstream Source unchanged. - `JavaStreamConcat` mirrors `InflightJavaStreamSource` resource hygiene: the underlying `BaseStream` is opened on `onUpstreamFinish` and closed in `postStop` so cancellation, exhaustion, and exceptions all release the resource (`stream` typed `BaseStream[E, _]` to avoid a recursive bound on Scala 3, matching apache#2977's fix in `5d03002142`). - `FutureConcat` uses `Future.value` for the already-completed fast path and registers a parasitic async callback for the pending case. The pending path swaps the `out` handler to a no-op while waiting for the callback so we don't pull the now-closed `in`. `Success(null)` calls `completeStage()` (no null forwarded), staying in lock-step with `Source.future(Future.successful(null))` rerouting to `Source.empty` and `InflightSources.hasFutureElement` treating `Success(null)` as no-element. `DO NOT CHANGE` comment captures the invariant. - Tests: `FlowConcatSpec` covers each optimized dispatch (`SingleConcat`, `IterableConcat`, `RangeConcat`, `IteratorConcat`, `JavaStreamConcat`, `RepeatConcat`, `FailedConcat`, `FutureConcat`), avoidance of substream materialization for value-presented sources (counter never increments for the inlined source), Java-stream `BaseStream.close()` on both exhaustion and downstream cancellation (`take(2)` mid-stream), pending future resolved with `Success(null)` treated as completion, pending future resolved with `Failure(_)` failing the stream, and the existing `eager` / `concatLazy` parity matrix. Result: The eager and lazy concat operators take the same value-presented fast path as apache#2977's flatten operators. No substream is materialized for the inlined kinds; the Java-stream resource is closed deterministically; the pending-future null path is consistent with `Source.future` and the flatten inflight path; all 90 tests across the four FlowConcat suites pass, plus all 70 FlattenMerge / FlatMapConcat regression tests. Tests: stream-tests/testOnly *FlowConcat*Spec *FlattenMerge* *FlatMapConcat* References: PR apache#2977 (value-presented optimization for FlattenConcat / FlattenMerge); PR apache#2978 (this work)
64595fb to
f2164c2
Compare
There was a problem hiding this comment.
Pull request overview
This PR extends FlowOps#internalConcat to avoid the heavyweight Concat(2, detached) fan-in path when the right-hand side is a “value-presented” source (single/iterable/iterator/range/repeat/java-stream/future/failed), by dispatching to specialized internal GraphStage implementations that inline-drain the RHS after the left side completes.
Changes:
- Extend
internalConcatdispatch to useTraversalBuilder.getValuePresentedSourceand route supported source types to new lightweight concat stages. - Add new internal stages:
IterableConcat,JavaStreamConcat,RepeatConcat,FutureConcat,FailedConcat. - Add directional tests in
FlowConcatSpecto assert the optimized stages are selected and produce expected output/closure behavior.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala | Adds value-presented-source dispatch in internalConcat to select optimized concat stages. |
| stream/src/main/scala/org/apache/pekko/stream/impl/IterableConcat.scala | New stage to drain iterable/iterator/range RHS after upstream completion. |
| stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamConcat.scala | New stage to drain Java-stream RHS and close the underlying BaseStream via postStop. |
| stream/src/main/scala/org/apache/pekko/stream/impl/RepeatConcat.scala | New stage to emit repeated element after upstream completion. |
| stream/src/main/scala/org/apache/pekko/stream/impl/FutureConcat.scala | New stage to emit/fail from a future RHS after upstream completion. |
| stream/src/main/scala/org/apache/pekko/stream/impl/FailedConcat.scala | New stage to fail the stream from a failed RHS after upstream completion. |
| stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowConcatSpec.scala | Adds tests asserting optimized stage selection and behavior for each value-presented RHS type. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Motivation: PR apache#2977 added value-presented source dispatch for FlattenConcat / FlattenMerge so that materializing a substream is skipped when the inner source already carries its element(s) inline. The same opportunity exists for the eager / lazy `concat` operators, where the second source is often a `Source.single`, `Source.future`, an iterable, a range, a Java stream, `Source.repeat`, or a failed source. Materializing a fan-in graph for any of these is pure overhead. Modification: - `Flow.internalConcat` (Scala DSL): when `TraversalBuilder.getValuePresentedSource` recognises a known kind, fuse with a dedicated lightweight stage instead of `Concat`. New stages live under `pekko.stream.impl`: `SingleConcat`, `IterableConcat`, `RangeConcat`, `JavaStreamConcat`, `RepeatConcat`, `FailedConcat`, `FutureConcat`. `Source.empty` short- circuits to the upstream Source unchanged. - `FailedConcat` calls `failStage(failure)` from `preStart` to mirror `FailedSource.preStart` semantics. The existing `concatGraph` path materialises `FailedSource` as a substream that fails immediately and `Concat`'s `onUpstreamFailure` propagates eagerly; keeping the same eager-failure timing avoids hangs on inputs that never complete (e.g. `Source.never.concat(Source.failed(ex))`). - `JavaStreamConcat` opens the underlying `BaseStream` in `preStart` to match `JavaStreamSource.preStart`. Side effects of `open()` (file / network resource acquisition) and any exceptions it throws happen at materialization time, matching the existing `concatGraph` path. The stream is closed in `postStop` so cancellation, exhaustion, iterator failure, and stage failure all release the resource (`stream` typed `BaseStream[E, _]` to avoid a recursive bound on Scala 3, matching apache#2977's fix in `5d03002142`). - `FutureConcat` registers the async callback in `preStart`, mirroring `FutureSource.preStart`: a pending-future failure surfaces eagerly via `failStage(ex)` even while upstream is still active (otherwise `Source.never.concat(Source.future(failingFuture))` would hang). A successful value is buffered in `futureResult` and emitted only after `onUpstreamFinish`, preserving concat ordering. `Future.value` is used for the already-completed fast path. The pending path swaps the `out` handler to a no-op while waiting for the callback so we don't pull the now-closed `in`. `Success(null)` calls `completeStage()` (no null forwarded), staying in lock-step with `Source.future(Future.successful(null))` rerouting to `Source.empty` and `InflightSources.hasFutureElement` treating `Success(null)` as no-element. `DO NOT CHANGE` comments capture both invariants. - Tests: `FlowConcatSpec` covers each optimized dispatch (`SingleConcat`, `IterableConcat`, `RangeConcat`, `IteratorConcat`, `JavaStreamConcat`, `RepeatConcat`, `FailedConcat`, `FutureConcat`), Java-stream `BaseStream.close()` on both exhaustion and downstream cancellation (`take(2)` mid-stream), pending future resolved with `Success(null)` treated as completion, pending future resolved with `Failure(_)` failing the stream, eager failure for `Source.never.concat(Source.failed(ex))`, eager failure for `Source.never.concat(Source.future(pendingPromise))` when the promise later fails, and the existing `eager` / `concatLazy` parity matrix. Result: The eager and lazy concat operators take the same value-presented fast path as apache#2977's flatten operators. No substream is materialized for the inlined kinds; the Java-stream resource is closed deterministically; the failed / future eager-failure timing matches the existing concat-graph path so inputs that never complete cannot hang; the pending-future null path is consistent with `Source.future` and the flatten inflight path. All FlowConcat suites pass, plus all FlattenMerge / FlatMapConcat regression tests. Tests: stream-tests/testOnly *FlowConcat*Spec *FlattenMerge* *FlatMapConcat* References: PR apache#2977 (value-presented optimization for FlattenConcat / FlattenMerge); PR apache#2978 (this work)
f2164c2 to
fa3597a
Compare
Motivation: PR apache#2977 added value-presented source dispatch for FlattenConcat / FlattenMerge so that materializing a substream is skipped when the inner source already carries its element(s) inline. The same opportunity exists for the eager / lazy `concat` operators, where the second source is often a `Source.single`, `Source.future`, an iterable, a range, a Java stream, `Source.repeat`, or a failed source. Materializing a fan-in graph for any of these is pure overhead. Modification: - `Flow.internalConcat` (Scala DSL): when `TraversalBuilder.getValuePresentedSource` recognises a known kind, fuse with a dedicated lightweight stage instead of `Concat`. New stages live under `pekko.stream.impl`: `SingleConcat`, `IterableConcat`, `RangeConcat`, `JavaStreamConcat`, `RepeatConcat`, `FailedConcat`, `FutureConcat`. `Source.empty` short- circuits to the upstream Source unchanged. The inlined source's wrapping attributes (from `Source.foo(...).withAttributes(...)`) are carried over to the optimized stage via `addAttributes(other.traversalBuilder.attributes)` so any user-supplied `SupervisionStrategy`, dispatcher hint, log level, or stage name still applies on the fast path — preserving behavioural parity with the substream-materializing concatGraph path. - `FailedConcat` calls `failStage(failure)` from `preStart` to mirror `FailedSource.preStart` semantics. The existing `concatGraph` path materialises `FailedSource` as a substream that fails immediately and `Concat`'s `onUpstreamFailure` propagates eagerly; keeping the same eager-failure timing avoids hangs on inputs that never complete (e.g. `Source.never.concat(Source.failed(ex))`). - `JavaStreamConcat` opens the underlying `BaseStream` in `preStart` to match `JavaStreamSource.preStart`. Side effects of `open()` (file / network resource acquisition) and any exceptions from `open()` happen at materialization time, matching the existing `concatGraph` path. The stream is closed in `postStop` so cancellation, exhaustion, iterator failure, and stage failure all release the resource (`stream` typed `BaseStream[E, _]` to avoid a recursive bound on Scala 3, matching apache#2977's fix in `5d03002142`). - `FutureConcat` registers the async callback in `preStart`, mirroring `FutureSource.preStart`: a pending-future failure surfaces eagerly via `failStage(ex)` even while upstream is still active (otherwise `Source.never.concat(Source.future(failingFuture))` would hang). A successful value is buffered in `futureResult` and emitted only after `onUpstreamFinish`, preserving concat ordering. `Future.value` is used for the already-completed fast path. The pending path swaps the `out` handler to a no-op while waiting for the callback so we don't pull the now-closed `in`. `Success(null)` calls `completeStage()` (no null forwarded), staying in lock-step with `Source.future(Future.successful(null))` rerouting to `Source.empty` and `InflightSources.hasFutureElement` treating `Success(null)` as no-element. `DO NOT CHANGE` comments capture both invariants. - `IterableConcat` reads `inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider` and applies it on iterator-creation / iterator-iteration failure, mirroring `IterableSource` / `IteratorSource` semantics. Without this, a Resume / Restart decider attached to the inlined source via `withAttributes(supervisionStrategy(...))` would silently degrade to Stop on the optimized path. Restart re-invokes the iterator factory and resumes emission with a fresh iterator. - Tests: `FlowConcatSpec` covers each optimized dispatch (`SingleConcat`, `IterableConcat`, `RangeConcat`, `IteratorConcat`, `JavaStreamConcat`, `RepeatConcat`, `FailedConcat`, `FutureConcat`), Java-stream `BaseStream.close()` on both exhaustion and downstream cancellation (`take(2)` mid-stream), pending future resolved with `Success(null)` treated as completion, pending future resolved with `Failure(_)` failing the stream, eager failure for `Source.never.concat(Source.failed(ex))`, eager failure for `Source.never.concat(Source.future(pendingPromise))` when the promise later fails, and supervision parity for the `IterableConcat` fast path: Resume-skips-throwing-element, Restart- recreates-iterator-after-throw, and default-Stop-fails. Existing `eager` / `concatLazy` parity matrix is preserved. Result: The eager and lazy concat operators take the same value-presented fast path as apache#2977's flatten operators. No substream is materialized for the inlined kinds; the Java-stream resource is closed deterministically; the failed / future eager-failure timing matches the existing concat-graph path so inputs that never complete cannot hang; the pending-future null path is consistent with `Source.future` and the flatten inflight path; and supervision deciders attached to inlined iterables / iterators flow through to the optimized stage, so `Source.fromIterator(...).withAttributes(...)` behaves identically whether dispatched through this fast path or through the substream path. All FlowConcat suites pass, plus all FlattenMerge / FlatMapConcat regression tests. Tests: stream-tests/testOnly *FlowConcat*Spec *FlattenMerge* *FlatMapConcat* References: PR apache#2977 (value-presented optimization for FlattenConcat / FlattenMerge); PR apache#2978 (this work)
fa3597a to
dd8e3c6
Compare
|
Follow-up on Comment 1 ( What changed
The only stage where this is observably load-bearing is
Directional tests added in
Each test asserts dispatch via Note on #2977 The same gap (attributes dropped during All FlowConcat suites (85 tests) pass, FlattenMerge / FlatMapConcat regression tests pass, MiMa is clean. |
Motivation: PR apache#2977 added value-presented source dispatch for FlattenConcat / FlattenMerge so that materializing a substream is skipped when the inner source already carries its element(s) inline. The same opportunity exists for the lazy `concat` operator, where the second source is often a `Source.single`, `Source.future`, an iterable, a range, a Java stream, `Source.repeat`, or a failed source. Materializing a fan-in graph for any of these is pure overhead. `concat` (the eager / detached variant) intentionally retains the substream-materializing path so that `Concat(_, detachedInputs = true)` + `Detacher` semantics — eager pull at materialization, one-element prefetch buffer, deadlock-breaking for cyclic graphs — are preserved. Modification: - `Flow.internalConcat` (Scala DSL): when `detached = false` (i.e. `concatLazy`) and `TraversalBuilder.getValuePresentedSource` recognises a known kind, fuse with a dedicated lightweight stage instead of `Concat`. When `detached = true` (i.e. `concat`), fall through to `concatGraph` to keep the eager-pull / prefetch / cycle-deadlock contract intact. New stages live under `pekko.stream.impl`: `SingleConcat`, `IterableConcat`, `JavaStreamConcat`, `RepeatConcat`, `FailedConcat`, `FutureConcat`. `Source.empty` short-circuits to the upstream Source unchanged. The inlined source's wrapping attributes (from `Source.foo(...).withAttributes(...)`) are carried over to the optimized stage via `addAttributes(other.traversalBuilder.attributes)` so any user-supplied `SupervisionStrategy`, dispatcher hint, log level, or stage name still applies on the fast path. - `FailedConcat` calls `failStage(failure)` from `preStart` to mirror `FailedSource.preStart` semantics. The existing `concatGraph` path materialises `FailedSource` as a substream that fails immediately and `Concat`'s `onUpstreamFailure` propagates eagerly; keeping the same eager-failure timing avoids hangs on inputs that never complete (e.g. `Source.never.concatLazy(Source.failed(ex))`). - `JavaStreamConcat` opens the underlying `BaseStream` in `preStart` to match `JavaStreamSource.preStart`. Side effects of `open()` (file / network resource acquisition) and any exceptions from `open()` happen at materialization time, matching the existing `concatGraph` path. The stream is closed in `postStop` so cancellation, exhaustion, iterator failure, and stage failure all release the resource (`stream` typed `BaseStream[E, _]` to avoid a recursive bound on Scala 3, matching apache#2977's fix in `5d03002142`). - `FutureConcat` registers the async callback in `preStart`, mirroring `FutureSource.preStart`: a pending-future failure surfaces eagerly via `failStage(ex)` even while upstream is still active (otherwise `Source.never.concatLazy(Source.future(failingFuture))` would hang). A successful value is buffered in `futureResult` and emitted only after `onUpstreamFinish`, preserving concat ordering. `Future.value` is used for the already-completed fast path. The pending path swaps the `out` handler to a no-op while waiting for the callback so we don't pull the now-closed `in`. `Success(null)` calls `completeStage()` (no null forwarded), staying in lock-step with `Source.future(Future.successful(null))` rerouting to `Source.empty` and `InflightSources.hasFutureElement` treating `Success(null)` as no-element. `DO NOT CHANGE` comments capture both invariants. - `IterableConcat` reads `inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider` and applies it on iterator-creation / iterator-iteration failure, mirroring `IterableSource` / `IteratorSource` semantics. Without this, a Resume / Restart decider attached to the inlined source via `withAttributes(supervisionStrategy(...))` would silently degrade to Stop on the optimized path. Restart re-invokes the iterator factory and resumes emission with a fresh iterator. - Tests: `FlowConcatSpec` covers each optimized dispatch (`SingleConcat`, `IterableConcat`, `IteratorConcat`, `JavaStreamConcat`, `RepeatConcat`, `FailedConcat`, `FutureConcat`), Java-stream `BaseStream.close()` on both exhaustion and downstream cancellation (`take(2)` mid-stream), pending future resolved with `Success(null)` treated as completion, pending future resolved with `Failure(_)` failing the stream, eager failure for `Source.never.concatLazy(Source.failed(ex))`, eager failure for `Source.never.concatLazy(Source.future(pendingPromise))` when the promise later fails, supervision parity for the `IterableConcat` fast path (Resume-skips, Restart-recreates, default-Stop-fails), and the detached-gating: a directional test asserts the fast path is not taken for `concat` (detached = true) and another asserts that `concat` preserves Detacher's eager pull side-effect timing for an `IteratorSource` factory while `concatLazy` defers it. Existing eager / concatLazy parity matrix is preserved by gating each `pendingBuilder.toString` assertion on `!eager`. Result: The lazy concat operator takes the same value-presented fast path as apache#2977's flatten operators while the eager / detached `concat` keeps its documented Detacher semantics. No substream is materialized for the inlined kinds on the lazy path; the Java-stream resource is closed deterministically; the failed / future eager-failure timing matches the existing concat-graph path so inputs that never complete cannot hang; the pending-future null path is consistent with `Source.future` and the flatten inflight path; supervision deciders attached to inlined iterables / iterators flow through to the optimized stage; and the detached-true contract is preserved bit-for-bit by falling through to `concatGraph`. All FlowConcat suites pass, plus all FlattenMerge / FlatMapConcat regression tests. Tests: stream-tests/testOnly *FlowConcat*Spec *Concat* *FlattenMerge* *FlatMapConcat* References: PR apache#2977 (value-presented optimization for FlattenConcat / FlattenMerge); PR apache#2978 (this work)
dd8e3c6 to
8a462ef
Compare
|
@pjfanning I think this one should be ready now |
Motivation
FlowOps#internalConcatpreviously had only one fast-path: aSingleSourceon the right-hand side ofconcat/concatLazywas rerouted through the lightweightSingleConcatstage instead of the general two-portConcat[U](2, detached)fan-in graph (which materializes the whole substream plus a detacher buffer). All other value-presented sources (IterableSource,IteratorSource,RangeSource,RepeatSource,JavaStreamSource,FutureSource,FailedSource) still took the heavyconcatGraphpath even though their data is already in memory or trivially producible — the fan-in machinery and substream materialization were pure overhead. Heavyconcatusers (pekko-http and others) carry that cost on every materialization.This mirrors the optimization shipped for
FlattenConcat/FlattenMergein #2977 (now merged), applied to theconcatoperator chain.Modification
Add specialized
GraphStage[FlowShape[E, E]]siblings ofSingleConcat, each passing through elements while upstream is alive and draining its captured value-presented payload ononUpstreamFinish:IterableConcat[E](createIterator)IterableSource,IteratorSource,RangeSourceemitMultiple(out, createIterator(), () => completeStage())JavaStreamConcat[E](open)JavaStreamSourceBaseStream, drain its iterator, close inpostStop(mirrorsInflightJavaStreamSourceresource hygiene from #2977)RepeatConcat[E](elem)RepeatSourceOutHandlerso eachonPullpusheselemFailedConcat[E](failure)FailedSourcefailStage(failure)FutureConcat[E](future)FutureSourceOutHandlerto a no-op (avoid pulling closedin), register parasitic async callback that emits/fails when resolved.Success(null)callscompleteStage()— lock-step withSource.future(Future.successful(null))rerouting toSource.emptyandInflightSources.hasFutureElementtreatingSuccess(null)as no-element (no null is ever forwarded — Reactive Streams compliance).internalConcatis extended to dispatch viaTraversalBuilder.getValuePresentedSourceand pattern-match the eight value-presented source types (existingSingleSourcepath is preserved). Thedetachedflag is irrelevant for these stages — the right-hand data is already present, so the one-element pre-fetch buffer thatdetached=trueprovides has nothing to fetch (matchingSingleConcat's precedent).Result
For the eight value-presented source types,
concatandconcatLazyno longer pay for substream materialization or the two-port fan-in graph. Observable behavior is unchanged for all other sources, which still take the existingconcatGraphpath. The Java-streamBaseStreamis closed deterministically on exhaustion, downstream cancellation, and exceptions. Pending-futureSuccess(null)is consistent with the materializedFutureSourcepath and the flatten inflight path.Tests
Directional tests added to
AbstractFlowConcatSpec(cover bothconcatandconcatLazypaths):optimize iterable concat/range concat/iterator concat/java-stream concat— assert correct stage in the traversal builder, output correct.optimize repeat concat— assertRepeatConcat(0), bounded with.take(6).optimize failed concat— assertFailedConcat, error propagated.optimize completed-future concat— confirms it routes throughSingleConcat(upstream optimization).optimize pending-future concat— uses an unresolvedPromise, assertsFutureConcatand correct delivery after resolution.treat pending-future resolved with null as completion in concat—Promise.success(null)⇒ stream completes without emitting null.fail the stream when pending-future resolves with failure in concat—Promise.failure(ex)⇒ stream fails withex.optimize failed-future concat— confirms it routes throughFailedConcat(upstream optimization).close the underlying java-stream after java-stream concat completes—BaseStream.close()runs on stage exhaustion (postStop).close the underlying java-stream when downstream cancels mid java-stream concat—take(2)mid-stream still triggersBaseStream.close().avoid downstream substream materialization for value-presented sources— counter-based assertion of zero substream materialization.Verification commands:
All new stages are
private[pekko]/@InternalApiso MiMa stays green.References
FlattenConcat/FlattenMerge, merged).