optimize: extend internalConcat dispatch for value-presented sources #2978

Open
He-Pin wants to merge 2 commits into apache:main from He-Pin:optimize-internal-concat-value-presented

He-Pin (Member) commented May 17, 2026

Motivation

FlowOps#internalConcat previously had only one fast-path: a SingleSource on the right-hand side of concat / concatLazy was rerouted through the lightweight SingleConcat stage instead of the general two-port Concat[U](2, detached) fan-in graph (which materializes the whole substream plus a detacher buffer). All other value-presented sources (IterableSource, IteratorSource, RangeSource, RepeatSource, JavaStreamSource, FutureSource, FailedSource) still took the heavy concatGraph path even though their data is already in memory or trivially producible — the fan-in machinery and substream materialization were pure overhead. Heavy concat users (pekko-http and others) carry that cost on every materialization.

This mirrors the optimization shipped for FlattenConcat / FlattenMerge in #2977 (now merged), applied to the concat operator chain.

Modification

Add specialized GraphStage[FlowShape[E, E]] siblings of SingleConcat, each passing through elements while upstream is alive and draining its captured value-presented payload on onUpstreamFinish:

| New stage | Handles | Drain behavior |
|---|---|---|
| `IterableConcat[E](createIterator)` | `IterableSource`, `IteratorSource`, `RangeSource` | `emitMultiple(out, createIterator(), () => completeStage())` |
| `JavaStreamConcat[E](open)` | `JavaStreamSource` | Open the `BaseStream`, drain its iterator, close in `postStop` (mirrors `InflightJavaStreamSource` resource hygiene from #2977) |
| `RepeatConcat[E](elem)` | `RepeatSource` | Swap the `OutHandler` so each `onPull` pushes `elem` |
| `FailedConcat[E](failure)` | `FailedSource` | `failStage(failure)` |
| `FutureConcat[E](future)` | `FutureSource` | If completed: emit/fail. If pending: swap the `OutHandler` to a no-op (to avoid pulling the closed `in`) and register a parasitic async callback that emits/fails when resolved. `Success(null)` calls `completeStage()`, in lock-step with `Source.future(Future.successful(null))` rerouting to `Source.empty` and `InflightSources.hasFutureElement` treating `Success(null)` as no-element (no null is ever forwarded, as Reactive Streams compliance requires). |

internalConcat is extended to dispatch via TraversalBuilder.getValuePresentedSource and pattern-match the eight value-presented source types (existing SingleSource path is preserved). The detached flag is irrelevant for these stages — the right-hand data is already present, so the one-element pre-fetch buffer that detached=true provides has nothing to fetch (matching SingleConcat's precedent).

Note: Source.future(Future.successful(x)) is itself optimized upstream into SingleSource, so completed futures dispatch via SingleConcat. Likewise, Source.future(Future.failed(ex)) is rerouted to FailedSource and therefore dispatches via FailedConcat. Only genuinely pending futures land on FutureConcat.
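
The completed-versus-pending split described for FutureConcat can be modelled without Pekko. The sketch below is a hypothetical stand-alone model, not the stage from this PR: `FutureDrainSketch`, `Outcome`, `interpret`, and `whenResolved` are invented names, and a plain callback stands in for the stage's async callback. It only illustrates the `Future.value` fast path, the `ExecutionContext.parasitic` registration for a pending future, and the mapping of `Success(null)` to completion.

```scala
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }

// Hypothetical model (assumption): none of these names exist in Pekko.
object FutureDrainSketch {
  sealed trait Outcome[+A]
  final case class Emit[A](value: A) extends Outcome[A]
  case object Complete extends Outcome[Nothing]
  final case class Fail(cause: Throwable) extends Outcome[Nothing]

  // A successful null is treated as "no element", so null is never forwarded.
  def interpret[A](result: Try[A]): Outcome[A] = result match {
    case Success(v) if v == null => Complete
    case Success(v)              => Emit(v)
    case Failure(ex)             => Fail(ex)
  }

  // Fast path: an already-completed future is handled synchronously via Future.value.
  // Pending path: register a callback that runs on the thread completing the future.
  def whenResolved[A](future: Future[A])(onOutcome: Outcome[A] => Unit): Unit =
    future.value match {
      case Some(result) => onOutcome(interpret(result))
      case None         =>
        future.onComplete(result => onOutcome(interpret(result)))(ExecutionContext.parasitic)
    }
}
```

In a real `GraphStage` the pending-path callback would have to re-enter the stage through its async callback mechanism rather than touch stage state directly; the model leaves that out.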

Result

For the eight value-presented source types, concat and concatLazy no longer pay for substream materialization or the two-port fan-in graph. Observable behavior is unchanged for all other sources, which still take the existing concatGraph path. The Java-stream BaseStream is closed deterministically on exhaustion, downstream cancellation, and exceptions. Pending-future Success(null) is consistent with the materialized FutureSource path and the flatten inflight path.
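
The "closed on exhaustion, cancellation, and exceptions" guarantee can be illustrated with the standard library alone. This is a minimal sketch under stated assumptions, not the `JavaStreamConcat` stage: `JavaStreamDrainSketch.drain` is a hypothetical helper, `limit` stands in for downstream cancellation (such as `take(2)`), and `try`/`finally` plays the role that `postStop` plays in a stage.

```scala
import java.util.stream.BaseStream

// Hypothetical helper (assumption): models open / drain / always-close, nothing more.
object JavaStreamDrainSketch {
  def drain[E](open: () => BaseStream[E, _], limit: Int): List[E] = {
    val stream = open()
    try {
      val it = stream.iterator()
      val out = List.newBuilder[E]
      var taken = 0
      // Stop early when `limit` is reached, which models a downstream cancel.
      while (taken < limit && it.hasNext()) {
        out += it.next()
        taken += 1
      }
      out.result()
    } finally stream.close() // runs on exhaustion, early stop, and exceptions alike
  }
}
```

Registering a handler with `onClose` on the Java stream is one way to observe that the close actually ran, which is roughly what the java-stream closure tests listed below check.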

Tests

Directional tests added to AbstractFlowConcatSpec (cover both concat and concatLazy paths):

  • optimize iterable concat / range concat / iterator concat / java-stream concat: assert the correct stage in the traversal builder and correct output.
  • optimize repeat concat: assert RepeatConcat(0), bounded with .take(6).
  • optimize failed concat: assert FailedConcat and that the error is propagated.
  • optimize completed-future concat: confirms it routes through SingleConcat (upstream optimization).
  • optimize pending-future concat: uses an unresolved Promise, asserts FutureConcat and correct delivery after resolution.
  • treat pending-future resolved with null as completion in concat: Promise.success(null) ⇒ stream completes without emitting null.
  • fail the stream when pending-future resolves with failure in concat: Promise.failure(ex) ⇒ stream fails with ex.
  • optimize failed-future concat: confirms it routes through FailedConcat (upstream optimization).
  • close the underlying java-stream after java-stream concat completes: BaseStream.close() runs on stage exhaustion (postStop).
  • close the underlying java-stream when downstream cancels mid java-stream concat: take(2) mid-stream still triggers BaseStream.close().
  • avoid downstream substream materialization for value-presented sources: counter-based assertion of zero substream materialization.

Verification commands:

sbt "stream-tests/testOnly *FlowConcat*Spec"               # 90/90 pass
sbt "stream-tests/testOnly *FlattenMerge* *FlatMapConcat*" # 70/70 pass (regression)
sbt "stream/mimaReportBinaryIssues"                        # MiMa clean
sbt scalafmtCheckAll headerCheckAll                        # clean

All new stages are private[pekko] / @InternalApi so MiMa stays green.

@He-Pin He-Pin force-pushed the optimize-internal-concat-value-presented branch 2 times, most recently from f01178f to c5a4e58 Compare May 18, 2026 03:54
@He-Pin He-Pin added this to the 2.0.0-M3 milestone May 18, 2026
He-Pin (Member, Author) commented May 18, 2026

Hopefully this improves the performance of pekko-http somewhat.

@He-Pin He-Pin force-pushed the optimize-internal-concat-value-presented branch from c5a4e58 to 64595fb Compare May 20, 2026 04:42
He-Pin added a commit to He-Pin/incubator-pekko that referenced this pull request May 20, 2026
Motivation:
PR apache#2977 added value-presented source dispatch for FlattenConcat /
FlattenMerge so that materializing a substream is skipped when the inner
source already carries its element(s) inline. The same opportunity exists
for the eager / lazy `concat` operators, where the second source is often
a `Source.single`, `Source.future`, an iterable, a range, a Java stream,
`Source.repeat`, or a failed source. Materializing a fan-in graph for any
of these is pure overhead.

Modification:
- `Flow.internalConcat` (Scala DSL): when `TraversalBuilder.getValuePresentedSource`
  recognises a known kind, fuse with a dedicated lightweight stage instead
  of `Concat`. New stages live under `pekko.stream.impl`:
  `SingleConcat`, `IterableConcat`, `RangeConcat`, `JavaStreamConcat`,
  `RepeatConcat`, `FailedConcat`, `FutureConcat`. `Source.empty` short-
  circuits to the upstream Source unchanged.
- `JavaStreamConcat` mirrors `InflightJavaStreamSource` resource hygiene:
  the underlying `BaseStream` is opened on `onUpstreamFinish` and closed
  in `postStop` so cancellation, exhaustion, and exceptions all release
  the resource (`stream` typed `BaseStream[E, _]` to avoid a recursive
  bound on Scala 3, matching apache#2977's fix in `5d03002142`).
- `FutureConcat` uses `Future.value` for the already-completed fast path
  and registers a parasitic async callback for the pending case. The
  pending path swaps the `out` handler to a no-op while waiting for the
  callback so we don't pull the now-closed `in`. `Success(null)` calls
  `completeStage()` (no null forwarded), staying in lock-step with
  `Source.future(Future.successful(null))` rerouting to `Source.empty`
  and `InflightSources.hasFutureElement` treating `Success(null)` as
  no-element. `DO NOT CHANGE` comment captures the invariant.
- Tests: `FlowConcatSpec` covers each optimized dispatch (`SingleConcat`,
  `IterableConcat`, `RangeConcat`, `IteratorConcat`, `JavaStreamConcat`,
  `RepeatConcat`, `FailedConcat`, `FutureConcat`), avoidance of substream
  materialization for value-presented sources (counter never increments
  for the inlined source), Java-stream `BaseStream.close()` on both
  exhaustion and downstream cancellation (`take(2)` mid-stream), pending
  future resolved with `Success(null)` treated as completion, pending
  future resolved with `Failure(_)` failing the stream, and the existing
  `eager` / `concatLazy` parity matrix.

Result:
The eager and lazy concat operators take the same value-presented fast
path as apache#2977's flatten operators. No substream is materialized for the
inlined kinds; the Java-stream resource is closed deterministically; the
pending-future null path is consistent with `Source.future` and the
flatten inflight path; all 90 tests across the four FlowConcat suites
pass, plus all 70 FlattenMerge / FlatMapConcat regression tests.

Tests: stream-tests/testOnly *FlowConcat*Spec *FlattenMerge* *FlatMapConcat*
References: PR apache#2977 (value-presented optimization for FlattenConcat /
FlattenMerge); PR apache#2978 (this work)
@He-Pin He-Pin force-pushed the optimize-internal-concat-value-presented branch from 64595fb to f2164c2 Compare May 20, 2026 04:55
@He-Pin He-Pin marked this pull request as ready for review May 20, 2026 04:56
@He-Pin He-Pin requested a review from Copilot May 20, 2026 04:57
@He-Pin He-Pin requested a review from pjfanning May 20, 2026 04:58
Copilot AI (Contributor) left a comment

Pull request overview

This PR extends FlowOps#internalConcat to avoid the heavyweight Concat(2, detached) fan-in path when the right-hand side is a “value-presented” source (single/iterable/iterator/range/repeat/java-stream/future/failed), by dispatching to specialized internal GraphStage implementations that inline-drain the RHS after the left side completes.

Changes:

  • Extend internalConcat dispatch to use TraversalBuilder.getValuePresentedSource and route supported source types to new lightweight concat stages.
  • Add new internal stages: IterableConcat, JavaStreamConcat, RepeatConcat, FutureConcat, FailedConcat.
  • Add directional tests in FlowConcatSpec to assert the optimized stages are selected and produce expected output/closure behavior.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.

| File | Description |
|---|---|
| stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala | Adds value-presented-source dispatch in internalConcat to select optimized concat stages. |
| stream/src/main/scala/org/apache/pekko/stream/impl/IterableConcat.scala | New stage to drain iterable/iterator/range RHS after upstream completion. |
| stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamConcat.scala | New stage to drain Java-stream RHS and close the underlying BaseStream via postStop. |
| stream/src/main/scala/org/apache/pekko/stream/impl/RepeatConcat.scala | New stage to emit repeated element after upstream completion. |
| stream/src/main/scala/org/apache/pekko/stream/impl/FutureConcat.scala | New stage to emit/fail from a future RHS after upstream completion. |
| stream/src/main/scala/org/apache/pekko/stream/impl/FailedConcat.scala | New stage to fail the stream from a failed RHS after upstream completion. |
| stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowConcatSpec.scala | Adds tests asserting optimized stage selection and behavior for each value-presented RHS type. |

Comment thread stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala Outdated
Comment thread stream/src/main/scala/org/apache/pekko/stream/impl/IterableConcat.scala Outdated
Comment thread stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowConcatSpec.scala Outdated
Comment thread stream/src/main/scala/org/apache/pekko/stream/impl/FailedConcat.scala Outdated
Comment thread stream/src/main/scala/org/apache/pekko/stream/impl/FutureConcat.scala Outdated
Comment thread stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamConcat.scala Outdated
He-Pin added a commit to He-Pin/incubator-pekko that referenced this pull request May 20, 2026
Motivation:
PR apache#2977 added value-presented source dispatch for FlattenConcat /
FlattenMerge so that materializing a substream is skipped when the inner
source already carries its element(s) inline. The same opportunity exists
for the eager / lazy `concat` operators, where the second source is often
a `Source.single`, `Source.future`, an iterable, a range, a Java stream,
`Source.repeat`, or a failed source. Materializing a fan-in graph for any
of these is pure overhead.

Modification:
- `Flow.internalConcat` (Scala DSL): when `TraversalBuilder.getValuePresentedSource`
  recognises a known kind, fuse with a dedicated lightweight stage instead
  of `Concat`. New stages live under `pekko.stream.impl`:
  `SingleConcat`, `IterableConcat`, `RangeConcat`, `JavaStreamConcat`,
  `RepeatConcat`, `FailedConcat`, `FutureConcat`. `Source.empty` short-
  circuits to the upstream Source unchanged.
- `FailedConcat` calls `failStage(failure)` from `preStart` to mirror
  `FailedSource.preStart` semantics. The existing `concatGraph` path
  materialises `FailedSource` as a substream that fails immediately and
  `Concat`'s `onUpstreamFailure` propagates eagerly; keeping the same
  eager-failure timing avoids hangs on inputs that never complete (e.g.
  `Source.never.concat(Source.failed(ex))`).
- `JavaStreamConcat` opens the underlying `BaseStream` in `preStart` to
  match `JavaStreamSource.preStart`. Side effects of `open()` (file /
  network resource acquisition) and any exceptions it throws happen at
  materialization time, matching the existing `concatGraph` path. The
  stream is closed in `postStop` so cancellation, exhaustion, iterator
  failure, and stage failure all release the resource (`stream` typed
  `BaseStream[E, _]` to avoid a recursive bound on Scala 3, matching
  apache#2977's fix in `5d03002142`).
- `FutureConcat` registers the async callback in `preStart`, mirroring
  `FutureSource.preStart`: a pending-future failure surfaces eagerly via
  `failStage(ex)` even while upstream is still active (otherwise
  `Source.never.concat(Source.future(failingFuture))` would hang). A
  successful value is buffered in `futureResult` and emitted only after
  `onUpstreamFinish`, preserving concat ordering. `Future.value` is used
  for the already-completed fast path. The pending path swaps the `out`
  handler to a no-op while waiting for the callback so we don't pull the
  now-closed `in`. `Success(null)` calls `completeStage()` (no null
  forwarded), staying in lock-step with `Source.future(Future.successful(null))`
  rerouting to `Source.empty` and `InflightSources.hasFutureElement`
  treating `Success(null)` as no-element. `DO NOT CHANGE` comments
  capture both invariants.
- Tests: `FlowConcatSpec` covers each optimized dispatch (`SingleConcat`,
  `IterableConcat`, `RangeConcat`, `IteratorConcat`, `JavaStreamConcat`,
  `RepeatConcat`, `FailedConcat`, `FutureConcat`), Java-stream
  `BaseStream.close()` on both exhaustion and downstream cancellation
  (`take(2)` mid-stream), pending future resolved with `Success(null)`
  treated as completion, pending future resolved with `Failure(_)`
  failing the stream, eager failure for `Source.never.concat(Source.failed(ex))`,
  eager failure for `Source.never.concat(Source.future(pendingPromise))`
  when the promise later fails, and the existing `eager` / `concatLazy`
  parity matrix.

Result:
The eager and lazy concat operators take the same value-presented fast
path as apache#2977's flatten operators. No substream is materialized for the
inlined kinds; the Java-stream resource is closed deterministically; the
failed / future eager-failure timing matches the existing concat-graph
path so inputs that never complete cannot hang; the pending-future null
path is consistent with `Source.future` and the flatten inflight path.
All FlowConcat suites pass, plus all FlattenMerge / FlatMapConcat
regression tests.

Tests: stream-tests/testOnly *FlowConcat*Spec *FlattenMerge* *FlatMapConcat*
References: PR apache#2977 (value-presented optimization for FlattenConcat /
FlattenMerge); PR apache#2978 (this work)
@He-Pin He-Pin force-pushed the optimize-internal-concat-value-presented branch from f2164c2 to fa3597a Compare May 20, 2026 05:55
@He-Pin He-Pin requested a review from Copilot May 20, 2026 06:01
Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.

He-Pin added a commit to He-Pin/incubator-pekko that referenced this pull request May 20, 2026
Motivation:
PR apache#2977 added value-presented source dispatch for FlattenConcat /
FlattenMerge so that materializing a substream is skipped when the inner
source already carries its element(s) inline. The same opportunity exists
for the eager / lazy `concat` operators, where the second source is often
a `Source.single`, `Source.future`, an iterable, a range, a Java stream,
`Source.repeat`, or a failed source. Materializing a fan-in graph for any
of these is pure overhead.

Modification:
- `Flow.internalConcat` (Scala DSL): when `TraversalBuilder.getValuePresentedSource`
  recognises a known kind, fuse with a dedicated lightweight stage instead
  of `Concat`. New stages live under `pekko.stream.impl`:
  `SingleConcat`, `IterableConcat`, `RangeConcat`, `JavaStreamConcat`,
  `RepeatConcat`, `FailedConcat`, `FutureConcat`. `Source.empty` short-
  circuits to the upstream Source unchanged. The inlined source's wrapping
  attributes (from `Source.foo(...).withAttributes(...)`) are carried over
  to the optimized stage via `addAttributes(other.traversalBuilder.attributes)`
  so any user-supplied `SupervisionStrategy`, dispatcher hint, log level,
  or stage name still applies on the fast path — preserving behavioural
  parity with the substream-materializing concatGraph path.
- `FailedConcat` calls `failStage(failure)` from `preStart` to mirror
  `FailedSource.preStart` semantics. The existing `concatGraph` path
  materialises `FailedSource` as a substream that fails immediately and
  `Concat`'s `onUpstreamFailure` propagates eagerly; keeping the same
  eager-failure timing avoids hangs on inputs that never complete (e.g.
  `Source.never.concat(Source.failed(ex))`).
- `JavaStreamConcat` opens the underlying `BaseStream` in `preStart` to
  match `JavaStreamSource.preStart`. Side effects of `open()` (file /
  network resource acquisition) and any exceptions from `open()` happen
  at materialization time, matching the existing `concatGraph` path. The
  stream is closed in `postStop` so cancellation, exhaustion, iterator
  failure, and stage failure all release the resource (`stream` typed
  `BaseStream[E, _]` to avoid a recursive bound on Scala 3, matching
  apache#2977's fix in `5d03002142`).
- `FutureConcat` registers the async callback in `preStart`, mirroring
  `FutureSource.preStart`: a pending-future failure surfaces eagerly via
  `failStage(ex)` even while upstream is still active (otherwise
  `Source.never.concat(Source.future(failingFuture))` would hang). A
  successful value is buffered in `futureResult` and emitted only after
  `onUpstreamFinish`, preserving concat ordering. `Future.value` is used
  for the already-completed fast path. The pending path swaps the `out`
  handler to a no-op while waiting for the callback so we don't pull the
  now-closed `in`. `Success(null)` calls `completeStage()` (no null
  forwarded), staying in lock-step with `Source.future(Future.successful(null))`
  rerouting to `Source.empty` and `InflightSources.hasFutureElement`
  treating `Success(null)` as no-element. `DO NOT CHANGE` comments
  capture both invariants.
- `IterableConcat` reads `inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider`
  and applies it on iterator-creation / iterator-iteration failure,
  mirroring `IterableSource` / `IteratorSource` semantics. Without this,
  a Resume / Restart decider attached to the inlined source via
  `withAttributes(supervisionStrategy(...))` would silently degrade to
  Stop on the optimized path. Restart re-invokes the iterator factory and
  resumes emission with a fresh iterator.
- Tests: `FlowConcatSpec` covers each optimized dispatch (`SingleConcat`,
  `IterableConcat`, `RangeConcat`, `IteratorConcat`, `JavaStreamConcat`,
  `RepeatConcat`, `FailedConcat`, `FutureConcat`), Java-stream
  `BaseStream.close()` on both exhaustion and downstream cancellation
  (`take(2)` mid-stream), pending future resolved with `Success(null)`
  treated as completion, pending future resolved with `Failure(_)`
  failing the stream, eager failure for `Source.never.concat(Source.failed(ex))`,
  eager failure for `Source.never.concat(Source.future(pendingPromise))`
  when the promise later fails, and supervision parity for the
  `IterableConcat` fast path: Resume-skips-throwing-element, Restart-
  recreates-iterator-after-throw, and default-Stop-fails. Existing
  `eager` / `concatLazy` parity matrix is preserved.

Result:
The eager and lazy concat operators take the same value-presented fast
path as apache#2977's flatten operators. No substream is materialized for the
inlined kinds; the Java-stream resource is closed deterministically; the
failed / future eager-failure timing matches the existing concat-graph
path so inputs that never complete cannot hang; the pending-future null
path is consistent with `Source.future` and the flatten inflight path;
and supervision deciders attached to inlined iterables / iterators flow
through to the optimized stage, so `Source.fromIterator(...).withAttributes(...)`
behaves identically whether dispatched through this fast path or through
the substream path. All FlowConcat suites pass, plus all FlattenMerge /
FlatMapConcat regression tests.

Tests: stream-tests/testOnly *FlowConcat*Spec *FlattenMerge* *FlatMapConcat*
References: PR apache#2977 (value-presented optimization for FlattenConcat /
FlattenMerge); PR apache#2978 (this work)
@He-Pin He-Pin force-pushed the optimize-internal-concat-value-presented branch from fa3597a to dd8e3c6 Compare May 20, 2026 07:14
He-Pin (Member, Author) commented May 20, 2026

Follow-up on Comment 1 (getValuePresentedSource drops attributes): force-pushed dd8e3c6e2a with a proper fix.

What changed

Flow.internalConcat now propagates the inlined source's wrapping attributes (other.traversalBuilder.attributes) onto the new optimized stage via addAttributes(...), so anything the user attached via .withAttributes(...) on the inlined source — SupervisionStrategy, dispatcher hint, log level, stage name — survives the fast path.

The only stage where this is observably load-bearing is IterableConcat (covers IterableSource / IteratorSource / RangeSource). Its createLogic now reads inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider and mirrors the exact IteratorSource Resume / Restart / Stop semantics:

  • Stop (default decider): failStage(ex)
  • Resume: skip the throwing element, continue iteration via tryPushNextOrComplete()
  • Restart: re-invoke the iterator factory (createIterator()) and resume emission with the fresh iterator
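
The three directives above can be sketched as a plain-Scala loop. This is an illustrative model under stated assumptions, not the `IterableConcat` code: `SupervisedDrainSketch`, its `Directive` hierarchy, and the `limit` parameter are hypothetical stand-ins (the real stage uses Pekko's `Supervision.Decider` and pushes on demand), and the model assumes a failing `next()` has already consumed the offending element.

```scala
import scala.util.control.NonFatal

// Hypothetical model (assumption): stand-ins for Pekko's Supervision directives.
object SupervisedDrainSketch {
  sealed trait Directive
  case object Stop extends Directive
  case object Resume extends Directive
  case object Restart extends Directive

  // Emits at most `limit` elements; returns the emitted elements and the failure, if any.
  def drain[E](
      createIterator: () => Iterator[E],
      decider: Throwable => Directive,
      limit: Int): (List[E], Option[Throwable]) = {
    var it = createIterator()
    val out = List.newBuilder[E]
    var emitted = 0
    var failure: Option[Throwable] = None
    var running = true
    while (running && emitted < limit) {
      try {
        if (it.hasNext) {
          out += it.next()
          emitted += 1
        } else running = false // iterator exhausted: complete
      } catch {
        case NonFatal(ex) =>
          decider(ex) match {
            case Stop =>
              failure = Some(ex) // fail, keeping what was already emitted
              running = false
            case Resume  => () // skip the throwing element, keep the same iterator
            case Restart => it = createIterator() // start over with a fresh iterator
          }
      }
    }
    (out.result(), failure)
  }
}
```

With a factory such as `() => (0 to 5).iterator.map(i => if (i == 3) throw new IllegalStateException("boom") else i)`, Resume yields 0, 1, 2, 4, 5, matching the expectation quoted for the Resume test below, while Restart keeps replaying 0, 1, 2 until `limit` is reached.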

JavaStreamSource itself does not honour supervision (it propagates iterator throws via the framework default), so JavaStreamConcat matches that behaviour and does not need decider logic. The other optimized stages (SingleConcat, RepeatConcat, FailedConcat, FutureConcat) cannot throw mid-emission, so supervision is moot for them — but they still receive the source attributes for cosmetic / dispatcher consistency.

Directional tests added in FlowConcatSpec (run on both eager and concatLazy matrices):

  • Source.fromIterator(...).withAttributes(supervisionStrategy(resumingDecider)) — Resume skips the throwing element; expect Seq(0, 1, 2, 4, 5) for an iterator that throws on 3.
  • Source.fromIterator(factory).withAttributes(supervisionStrategy(restartingDecider)) — Restart re-invokes the factory; expect emission to continue with the fresh iterator.
  • Default decider (Stop) — failure propagates as before.

Each test asserts dispatch via traversalBuilder.pendingBuilder.toString should include("IterableConcat") to guarantee the optimized path is exercised.

Note on #2977

The same gap (attributes dropped during getValuePresentedSource unwrap) exists in FlattenConcat / FlattenMerge paths in the already-merged #2977. Happy to open a follow-up that applies the same attribute-propagation + decider-aware handling to InflightIteratorSource if maintainers want it tracked separately — kept out of this PR to minimise scope.

All FlowConcat suites (85 tests) pass, FlattenMerge / FlatMapConcat regression tests pass, MiMa is clean.

Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.

Comment thread stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala Outdated
Motivation:
PR apache#2977 added value-presented source dispatch for FlattenConcat /
FlattenMerge so that materializing a substream is skipped when the inner
source already carries its element(s) inline. The same opportunity exists
for the lazy `concat` operator, where the second source is often a
`Source.single`, `Source.future`, an iterable, a range, a Java stream,
`Source.repeat`, or a failed source. Materializing a fan-in graph for any
of these is pure overhead.

`concat` (the eager / detached variant) intentionally retains the
substream-materializing path so that `Concat(_, detachedInputs = true)`
+ `Detacher` semantics — eager pull at materialization, one-element
prefetch buffer, deadlock-breaking for cyclic graphs — are preserved.

Modification:
- `Flow.internalConcat` (Scala DSL): when `detached = false` (i.e.
  `concatLazy`) and `TraversalBuilder.getValuePresentedSource` recognises
  a known kind, fuse with a dedicated lightweight stage instead of
  `Concat`. When `detached = true` (i.e. `concat`), fall through to
  `concatGraph` to keep the eager-pull / prefetch / cycle-deadlock
  contract intact. New stages live under `pekko.stream.impl`:
  `SingleConcat`, `IterableConcat`, `JavaStreamConcat`, `RepeatConcat`,
  `FailedConcat`, `FutureConcat`. `Source.empty` short-circuits to the
  upstream Source unchanged. The inlined source's wrapping attributes
  (from `Source.foo(...).withAttributes(...)`) are carried over to the
  optimized stage via `addAttributes(other.traversalBuilder.attributes)`
  so any user-supplied `SupervisionStrategy`, dispatcher hint, log level,
  or stage name still applies on the fast path.
- `FailedConcat` calls `failStage(failure)` from `preStart` to mirror
  `FailedSource.preStart` semantics. The existing `concatGraph` path
  materialises `FailedSource` as a substream that fails immediately and
  `Concat`'s `onUpstreamFailure` propagates eagerly; keeping the same
  eager-failure timing avoids hangs on inputs that never complete (e.g.
  `Source.never.concatLazy(Source.failed(ex))`).
- `JavaStreamConcat` opens the underlying `BaseStream` in `preStart` to
  match `JavaStreamSource.preStart`. Side effects of `open()` (file /
  network resource acquisition) and any exceptions from `open()` happen
  at materialization time, matching the existing `concatGraph` path. The
  stream is closed in `postStop` so cancellation, exhaustion, iterator
  failure, and stage failure all release the resource (`stream` typed
  `BaseStream[E, _]` to avoid a recursive bound on Scala 3, matching
  apache#2977's fix in `5d03002142`).
- `FutureConcat` registers the async callback in `preStart`, mirroring
  `FutureSource.preStart`: a pending-future failure surfaces eagerly via
  `failStage(ex)` even while upstream is still active (otherwise
  `Source.never.concatLazy(Source.future(failingFuture))` would hang). A
  successful value is buffered in `futureResult` and emitted only after
  `onUpstreamFinish`, preserving concat ordering. `Future.value` is used
  for the already-completed fast path. The pending path swaps the `out`
  handler to a no-op while waiting for the callback so we don't pull the
  now-closed `in`. `Success(null)` calls `completeStage()` (no null
  forwarded), staying in lock-step with `Source.future(Future.successful(null))`
  rerouting to `Source.empty` and `InflightSources.hasFutureElement`
  treating `Success(null)` as no-element. `DO NOT CHANGE` comments
  capture both invariants.
- `IterableConcat` reads `inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider`
  and applies it on iterator-creation / iterator-iteration failure,
  mirroring `IterableSource` / `IteratorSource` semantics. Without this,
  a Resume / Restart decider attached to the inlined source via
  `withAttributes(supervisionStrategy(...))` would silently degrade to
  Stop on the optimized path. Restart re-invokes the iterator factory and
  resumes emission with a fresh iterator.
- Tests: `FlowConcatSpec` covers each optimized dispatch (`SingleConcat`,
  `IterableConcat`, `IteratorConcat`, `JavaStreamConcat`, `RepeatConcat`,
  `FailedConcat`, `FutureConcat`), Java-stream `BaseStream.close()` on
  both exhaustion and downstream cancellation (`take(2)` mid-stream),
  pending future resolved with `Success(null)` treated as completion,
  pending future resolved with `Failure(_)` failing the stream, eager
  failure for `Source.never.concatLazy(Source.failed(ex))`, eager failure
  for `Source.never.concatLazy(Source.future(pendingPromise))` when the
  promise later fails, supervision parity for the `IterableConcat` fast
  path (Resume-skips, Restart-recreates, default-Stop-fails), and the
  detached-gating: a directional test asserts the fast path is not taken
  for `concat` (detached = true) and another asserts that `concat`
  preserves Detacher's eager pull side-effect timing for an
  `IteratorSource` factory while `concatLazy` defers it. Existing eager
  / concatLazy parity matrix is preserved by gating each
  `pendingBuilder.toString` assertion on `!eager`.

Result:
The lazy concat operator takes the same value-presented fast path as
apache#2977's flatten operators while the eager / detached `concat` keeps its
documented Detacher semantics. No substream is materialized for the
inlined kinds on the lazy path; the Java-stream resource is closed
deterministically; the failed / future eager-failure timing matches the
existing concat-graph path so inputs that never complete cannot hang;
the pending-future null path is consistent with `Source.future` and the
flatten inflight path; supervision deciders attached to inlined
iterables / iterators flow through to the optimized stage; and the
detached-true contract is preserved bit-for-bit by falling through to
`concatGraph`. All FlowConcat suites pass, plus all FlattenMerge /
FlatMapConcat regression tests.

Tests: stream-tests/testOnly *FlowConcat*Spec *Concat* *FlattenMerge* *FlatMapConcat*
References: PR apache#2977 (value-presented optimization for FlattenConcat /
FlattenMerge); PR apache#2978 (this work)
@He-Pin He-Pin force-pushed the optimize-internal-concat-value-presented branch from dd8e3c6 to 8a462ef Compare May 20, 2026 09:27
He-Pin (Member, Author) commented May 20, 2026

@pjfanning I think this one should be ready now
