Orchestrator: Operator for Composing Per-Item Observables#1114
Orchestrator: Operator for Composing Per-Item Observables#1114dwcullop wants to merge 33 commits into
Conversation
Added a new test `FilterObservableError_PropagatesDownstream` to ensure error propagation in `FilterOnObservableFixture`. Introduced `AggregateManyExtensions` class to provide the `AggregateMany` operator for aggregating changesets. Refactored `FilterOnObservable` to utilize the new `AggregateMany` operator, removing the `FilterProxy` implementation. Adjusted copyright header for consistency. Replaced the old `AggregateManyExtensions` with a new implementation.
Renamed `onSource` to `onSourceChangeSet` in `FilterOnObservable` and `AggregateManyExtensions` for better clarity. Refactored `GroupOnObservable` and `TransformOnObservable` to use `Observable.Using` and `Observable.Defer` with `AggregateMany`, removing the `Subscription` class to simplify code and improve efficiency.
Replaces inheritance-based CacheParentSubscription with composition-based ICacheOrchestrator + Orchestrator runner. Per-subscription state is owned by orchestrator instances; runtime context (Track/Serialize/ScheduleEmit) is injected via Initialize on a fresh Subscription created inside each Observable.Create call, so multi-subscribe correctness is preserved. Primitive - ICacheOrchestrator / ICacheOrchestratorContext (Cache.Internal) - Orchestrator<TSource,TKey,TInner,TResult>.Run() wraps Observable.Create - IntObservableCacheEx.OrchestrateMany family of extension methods plus AsCacheOrchestrator lambda adapter - Specialized helpers OrchestrateManyChanges (ChangeAwareCache mirror), OrchestrateManyMerged (cache->cache merge), and OrchestrateManyMergedList (cache->list merge) Migrated operators - FilterOnObservable, TransformOnObservable, GroupOnObservable - MergeManyCacheChangeSets, MergeManyCacheChangeSetsSourceCompare, MergeManyListChangeSets (Cache/Internal/) - TransformManyAsync (showcase for Serialize hook) - AutoRefresh (uses ScheduleEmit for shared-grid buffered refresh batching; source pass-through preserved verbatim; refreshes suppressed for keys the source has touched in the same drain, fixes reactivemarbles#1099) Behavior changes (for next major version) - AutoRefresh and AutoRefreshOnObservable now propagate reevaluator exceptions (sync throw and OnError) instead of silently swallowing them - AutoRefresh now throws ArgumentNullException for null propertyAccessor / reevaluator at composition time instead of throwing NRE on first event - FilterOnObservable with non-null buffer now uses quiescence-based Pub+Buf+Throttle+Amb instead of fixed-grid Buffer; same coalescing, zero idle allocations - AutoRefresh no longer emits Refresh changes for keys touched by source in the same drain (eliminates redundant Add+Refresh and prevents Add+Remove+Refresh) Other fixes - Removed over-strict debug assertion in SuspendNotifications (functional state unaffected, only DEBUG-mode race window flagged) - Deterministic seed in TransformAsyncFixture.RemoveFlowsToTheEnd (replaces Random.Shared) - Cross-cache deadlock fixture ported to OrchestrateManyFixture Deletes - CacheParentSubscription.cs and CacheParentSubscriptionFixture.cs
Adds an optional CacheChangeHandlerBase abstract class that implements ICacheOrchestrator.OnSourceChangeSet by dispatching each Change to one of five protected virtual hooks (OnItemAdded/Updated/Removed/Refreshed + OnChangeSetProcessed). Orchestrators that inherit it stop hand-rolling the foreach+switch over change.Reason. Migrated to the base class: - ChangesOrchestrator (OrchestrateManyChanges) - MergedOrchestrator (OrchestrateManyMerged) - MergedListOrchestrator (OrchestrateManyMergedList) - TransformManyAsync.Orchestrator - AutoRefresh.UnbufferedOrchestrator + AutoRefresh.BufferedOrchestrator Removed ICacheOrchestratorContext.ScheduleEmit. Only AutoRefresh used it, and the buffered path now uses Subject<Change>.Buffer routed through Context.Serialize to keep all activity under the shared queue without needing a dedicated 'schedule a future drain' primitive. AutoRefresh.WithBuffering behavior change: - Now dedups Refresh changes within each buffer window (one Refresh per key per window), matching Jake's choice in the upstream WIP. - Source events and refreshes remain independent (refreshes for keys source touched in the same window are still emitted). AutoRefresh.WithoutBuffering preserves the reactivemarbles#1099 fix: refreshes for keys the source touched in the same drain cycle are suppressed. All 2386 tests pass, 0 skipped.
# Conflicts: # src/DynamicData/Cache/ObservableCacheEx.cs
Capture the downstream observer (as a DeliverySubQueue) at Initialize time instead of passing it to Emit per drain. The orchestrator now calls Emitter.OnNext/OnError/OnCompleted directly under the queue gate; serialization, post-termination drop, and reentrant drain are all handled by the sub-queue. ICacheOrchestrator becomes (Initialize(context, emitter), OnSourceChangeSet, OnInner, OnDrainComplete). Emit(observer) is gone. Renamed CacheChangeHandlerBase -> OrchestratorCacheChangeBase. Eliminated from the runtime: _hasTerminated, TerminalError helper, direct calls to the raw downstream observer. Eliminated from orchestrators: _pendingSource accumulator (both AutoRefresh orchestrators), _shouldFlushBuffered + _bufferedRefreshes (BufferedOrchestrator), OnChangeSetProcessed virtual (no remaining consumers). Net diff: +106 / -281 lines across the orchestrator surface. All 2386 tests pass.
Replace the quiescence-based throttle+amb scheme with a simpler leading-edge buffer: Publish + Buffer(() => Take(1).Delay(window, scheduler)) routed through the existing public FlattenBufferResult helper. Behaviour: idle streams allocate nothing (no timer scheduled until an actual emission arrives). Bursts still emit at most every 'window' interval. The throttle-based settle window is gone; if a caller relied on early flushing during quiet periods, they will now wait the full window. Net wire-output is the same set of merged changesets, just at slightly different timing.
Modified the buffering logic in the `OnInner` method of the `AutoRefresh` class. Changed from using `_refreshes.Buffer(_window, _scheduler)` to `_refreshes.Buffer(() => _refreshes.Take(1).Delay(_window, _scheduler))`. This update introduces a dynamic window for buffering events, potentially improving the timing and responsiveness of the refresh batching mechanism.
Collapse UnbufferedOrchestrator and BufferedOrchestrator into a single Orchestrator. Source changes still flow to the emitter immediately in both modes; refreshes accumulate in a per-key dictionary (latest value wins) and flush either at drain end (no buffer) or via a SerialDisposable-wrapped Timer armed by the first pending refresh (buffered). Pending refreshes are dropped when the source Updates or Removes the same key, so a Refresh carrying a value the source has already superseded is never emitted. The reactivemarbles#1099 source-touched suppression now applies to the buffered path as well, removing the sibling-bug between the two modes. Orchestration runtime now disposes orchestrators that implement IDisposable, fixing a pre-existing leak where the buffered orchestrator's Subject and buffer subscription survived subscription teardown. Adds 6 regression tests in AutoRefreshOnObservableFixture covering each scenario (Remove drops pending refresh, Update replaces it, multi-Update keeps only the latest) for both buffer modes.
Asserts that when an Update lands inside an armed window and the new instance's reevaluator never emits, no Refresh ever surfaces. Pins the 'pending empty disarms the timer' branch in isolation, separate from the existing test that follows up with a v2 emission.
The mid-test assertion in ChangeSetBufferIsGiven_UpdateDuringWindow_RefreshEmittedForNewInstance already proves the v1-armed timer is disarmed by the Update (asserts BeEmpty at the original v1 window boundary). A disarmed timer cannot spontaneously fire later, so the separate quiet-after-update case is transitively covered.
Both operators were hand-rolling the per-key inner subscription, completion counter, and queue plumbing that the Orchestrator runtime already provides. The orchestrator version eliminates the StrongBox<int> counter, the Concat(Observable.Never) hack to suppress source completion, and the manual Finally(CheckCompleted) bookkeeping on each inner subscription. Inner-observable errors now terminate the merged stream, matching the standard Rx contract. The previous behavior swallowed inner errors and treated them as completion. Two tests that asserted the swallow are updated to assert the propagation contract; consumers that want the old behavior can wrap their selector output with .Catch(Observable.Empty<...>()). OrchestratorCacheChangeBase.OnDrainComplete is now virtual with an empty default. Orchestrators that have no per-drain coalescing work no longer need to override it.
C1 (CRITICAL): TransformManyAsync and other aggregating orchestrators intermittently dropped concurrent inner emissions. When OnDrainComplete called tracker.EmitChanges(Emitter), the resulting Emitter.OnNext triggered a reentrant DrainPending that delivered the emitter item AND drained any concurrent inner items that had arrived during the brief lock release in DeliverStaged. Those items added to the tracker via OnInner but were never flushed: the outer DrainAll loop saw active bits empty and exited without firing _onDrainComplete again. Fix: ChangeSetMergeTracker.EmitChanges (and the List variant + DynamicGrouper) now return bool indicating whether they emitted. Every aggregating orchestrator loops in OnDrainComplete until the tracker reports nothing left to emit. C2 (CRITICAL): Buffered AutoRefresh dropped pending refreshes when the source completed before the buffer timer fired. The Timer subscription was registered via Context.Serialize, which does not participate in the orchestrator's completion counter; source completion could race ahead and fire Emitter.OnCompleted while a refresh sat in the pending dictionary unflushed. Fix: added ICacheOrchestratorContext.TrackAuxiliary<T>(observable, onNext) that subscribes through the queue AND contributes to completion accounting via Finally(DecrementSubscriptionCount). AutoRefresh.Orchestrator's buffered timer now uses TrackAuxiliary, so completion waits for the timer to fire and flush any pending refresh. H1 (HIGH, latent): The 3-lambda OrchestrateMany overload constructed a single LambdaCacheOrchestrator eagerly. Each subscription to the returned observable called Initialize on the same orchestrator instance, overwriting _context and _emitter from earlier subscribers. Current consumers wrapped the call site in Observable.Defer/Create so the bug was latent, but the primitive was a trap. Fix: wrapped the lambda overload in Observable.Defer so each subscription constructs a fresh orchestrator. Adds three regression tests covering the reentrant-drain hazard, the multi-subscribe trap, and the buffered completion-flush case. Full suite passes 2402/2402; the previously-flaky TransformManyAsyncFixture test now passes 30/30.
Three new tests in OrchestrateManyFixture: InnerError_PropagatesAndTerminatesStream - asserts that an OnError from a tracked inner observable terminates the merged stream with the same error and that completion does not also fire. SourceAlreadyCompleted_PropagatesCompletion - asserts that subscribing to an Observable.Empty source through the orchestrator immediately completes the downstream stream. SourceAlreadyErrored_PropagatesError - asserts that subscribing to an Observable.Throw source propagates the error through the orchestrator.
The previous fix added TrackAuxiliary to ICacheOrchestratorContext so that the buffered AutoRefresh timer subscription could participate in completion accounting. That polluted the orchestrator context with a method only one consumer needed. Replace it with a parameter on the existing OnDrainComplete hook: the runtime snapshots its completion latch BEFORE calling the orchestrator and passes the result as bool sourcesCompleted. AutoRefresh's buffered branch now flushes pending refreshes synchronously when sourcesCompleted is true, so the refresh surfaces before the runtime fires OnCompleted downstream and the timer subscription becomes purely opportunistic (no completion accounting needed). All ICacheOrchestrator implementations and OrchestratorCacheChangeBase updated for the new signature; consumers that did not need the flag simply ignore it. The lambda OrchestrateMany overload's onDrainComplete callback signature is unchanged (none of its current consumers need the flag). C2 regression test rewritten to match the new semantics: source + reevaluator completion now triggers an immediate synchronous flush rather than waiting for the buffer timer; the test verifies the refresh surfaces, completion happens immediately, and the timer is cancelled (no duplicate refresh at the original window boundary). Tests: 2405/2405 pass. TransformManyAsync flake check 30/30 pass.
Change OrchestrateMany to accept Func<context, emitter, ICacheOrchestrator> instead of a pre-constructed orchestrator instance. The runtime invokes the factory once per subscription, passing the per-subscription context and downstream emitter; the orchestrator captures both as readonly constructor params and is fully initialized when its first method is called. Eliminates: - Initialize method on ICacheOrchestrator (one fewer interface method) - The 'null!' partial-init zone in every orchestrator (_context, _emitter fields no longer needed) - The Observable.Defer wrapper around the lambda OrchestrateMany overload (factory pattern makes per-subscription instances structurally required, so H1's multi-subscribe corruption hazard cannot recur) - The redundant Observable.Create wrappers in MergeMany, MergeManyItems, TransformManyAsync, OrchestrateManyMerged, OrchestrateManyMergedList, OrchestrateManyChanges Run() methods (OrchestrateMany already creates per-subscription state) OrchestratorCacheChangeBase is now a primary-constructor abstract class that forwards context/emitter via constructor chaining. Derived orchestrators (Orchestrator inside AutoRefresh/MergeMany/MergeManyItems/TransformManyAsync, MergedOrchestrator, MergedListOrchestrator, ChangesOrchestrator, LambdaCacheOrchestrator, and the test TestOrchestrator) take context and emitter as their first two constructor parameters and chain to the base. Test orchestrator construction in OrchestrateManyFixture is wrapped by a Wire(...) helper that returns the observable plus a Func<TestOrchestrator> thunk; tests call Wire then subscribe to capture the per-subscription orchestrator instance. Tests: 2405/2405 pass. TransformManyAsync flake check 30/30 pass.
The C1 fix previously required every orchestrator's OnDrainComplete to loop until its tracker reported empty. The hazard was actually a queue-level lifecycle gap: when OnDrainComplete called Emitter.OnNext, ExitLockAndDrain took the reentrant path and drained items inline but left no trace; the outer DrainAll then checked _activeBits, saw it empty, and exited without re-firing OnDrainComplete. State accumulated during the reentrant drain (e.g. ChangeSetMergeTracker entries) was stranded until the next unrelated drain. Move the fix to SharedDeliveryQueue: set a _drainReentered flag in the reentrant path; reset it before each OnDrainComplete callback; loop back in DrainAll when either _activeBits.HasAny() OR _drainReentered. The flag is only touched by the drain thread, so no synchronization is needed. Removes: - bool return from ChangeSetMergeTracker.EmitChanges (Cache + List) - bool return from DynamicGrouper.EmitChanges - per-consumer while-loops in AutoRefresh, TransformManyAsync, OrchestrateManyMerged, OrchestrateManyMergedList, OrchestrateManyChanges, TransformOnObservable, GroupOnObservable Every orchestrator's OnDrainComplete now emits at most once per call; the queue re-calls them whenever a reentrant drain may have updated their state. Tests: 2405/2405 pass. TransformManyAsync flake check 30/30 pass.
Rename: OnDrainComplete(bool sourcesCompleted) -> OnDrainComplete(bool isFinal). The new name is action-oriented (signals to the orchestrator that this is the last chance to emit before downstream OnCompleted) rather than implementation-coupled (the old name referenced the runtime's _isCompleted latch mechanism). XML doc updated to make the meaning explicit. Add two missing contract tests: 1. OrchestrateManyFixture.OnDrainComplete_IsFinalIsFalseUntilSourceAndAllInnersComplete - Verifies isFinal=false on every call while source and tracked inners are still active, that source completion alone does not flip isFinal while an inner subscription remains, and that isFinal=true on at least one call after the last inner completes (followed by downstream OnCompleted). 2. SharedDeliveryQueueFixture.OnDrainComplete_RefiresAfterReentrantDrainTriggeredByCallback - Focused regression test for the queue-level C1 fix: when the onDrainComplete callback enqueues an item onto a sub-queue, the reentrant drain processes the item but the outer DrainAll must re-fire onDrainComplete because of the _drainReentered flag. Red-green verified: fails when _drainReentered check is removed; passes when restored. Tests: 2407/2407 pass (+2 new). TransformManyAsync flake check 30/30 pass.
Bundles eight design fixes from the orchestrator review: (1+2) Counter-based final check + CAS-latched completion: OnDrainComplete derives isFinal from the live subscription counter rather than a separate _isCompleted latch, and uses a one-shot CAS on _completionEmitted to guarantee exactly-one OnCompleted. Fixes a race where an orchestrator that called Track during OnDrainComplete(isFinal=true) would have the new inner's emissions silently dropped because the latch stayed set. (3) Factory exception safety: the factory call is wrapped in try/catch that disposes the already-allocated queue and emitter before rethrowing, so a faulting orchestrator constructor doesn't leak the runtime's resources. (4) Disposal order: source and inner subscriptions are disposed before the queue, so source pumps cannot keep firing into a terminating queue. (8) Documented Serialize lifetime semantics: a remarks block on ICacheOrchestratorContext.Serialize calls out explicitly that Serialize does NOT participate in completion accounting (unlike Track), preventing future TrackAuxiliary-style confusion. (9) Generic devirtualization: Orchestration and OrchestrateMany take a TOrch type parameter so dispatch sites (OnSourceChangeSet, OnInner, OnDrainComplete) call the concrete sealed class through a non-interface field and the JIT can devirtualize. All internal call sites updated to specify TOrch explicitly (older C# language modes can't infer it from the lambda body). (11) Track + Untrack split: the nullable observable parameter on Track is gone; Untrack(key) is now the explicit way to remove a registration. KeyedDisposable.Add doc clarified (Add has replace-or-add semantics). (13) wasReentrant flag on OnDrainComplete: the SharedDeliveryQueue callback now passes the captured _drainReentered flag through, and ICacheOrchestrator.OnDrainComplete(bool isFinal, bool wasReentrant) exposes it for advanced consumers. Most orchestrators ignore it. (14) Named SourceSubscriptionWeight constant replaces the magic 1 in _subscriptionCounter initialization. API rename: the 3-lambda convenience overload is now OrchestrateLambdas (not OrchestrateMany) to avoid generic inference conflict with the factory overload. GroupOnObservable and TransformOnObservable updated accordingly. Tests: 2407/2407 pass.
Adds the level-1 (in-isolation) test pattern complementing the existing level-2 (end-to-end) fixtures. Each orchestrator can now be tested two ways: via OrchestrateMany through the full SharedDeliveryQueue runtime (already covered by existing fixtures like AutoRefreshOnObservableFixture, TransformManyAsyncFixture), or directly using FakeOrchestratorContext to drive method calls and assert on the orchestrator's contract with its context (Track/Untrack calls) and emit policy. New isolation fixtures: - MergeManyOrchestratorFixture (4 tests): Add-tracks, Remove-untracks, OnInner-passthrough, OnDrainComplete-is-noop - MergeManyItemsOrchestratorFixture (2 tests): ItemWithValue pairing, Untrack on Remove - TransformManyAsyncOrchestratorFixture (4 tests): tracking, untrack, drain-emits-tracker, transformer-throws-routes-through-error-handler - MergedOrchestratorFixture (2 tests): Add-tracks-and-routes, Remove-untracks - MergedListOrchestratorFixture (2 tests): list parity - ChangesOrchestratorFixture (3 tests): onSourceChange per change, onInner routes, Untrack on Remove - LambdaCacheOrchestratorFixture (3 tests): forwarding contract for the 3-lambda overload - AutoRefreshOrchestratorFixture (5 tests): unbuffered drain flush, source-touched suppression, buffered defers to timer, buffered+isFinal flushes synchronously, Remove drops pending refresh Required visibility bumps from private to internal on 8 nested orchestrator classes (still inside internal enclosing types, so no expanded public surface). Supporting infrastructure: - FakeOrchestratorContext<TKey, TInner>: in-memory ICacheOrchestratorContext that records Track/Untrack calls without subscribing to observables (Serialize returns the observable as-is). Tests must call OnInner manually. - CollectingObserver<T>: minimal IObserver<T> that records values + terminal state. Tests: 2432/2432 pass (+25). TransformManyAsync flake check 30/30 pass.
Four changes: 1. SizeLimitFixture.OnCompleteIsInvokedWhenSourceIsDisposed: re-skip with the original message. The test is unrelated to this branch's changes (LimitSizeTo was not migrated to the orchestrator) and is racy under full-suite ThreadPool load. The previous commit un-skipped it speculatively; that was a mistake. 2. Orchestration.OrchestratorContext: extend the try/catch to cover all construction steps (CreateQueue, factory, source subscribe), not just the factory call. Without this, an exception from CreateQueue or the source subscribe leaks the queue/emitter/orchestrator/source-subscription because the ctor never completes and Dispose never runs. The catch block tears down everything that was successfully allocated. 3. AutoRefresh.Orchestrator.OnItemRefreshed: also DropPending(key) when the source forwards a Refresh. Without this, a pending refresh from a prior drain (queued by the reevaluator) and the source's Refresh were both emitted to the consumer for the same item, in quick succession. Symmetrical with OnItemUpdated which already drops pending. Regression test added in AutoRefreshOrchestratorFixture. 4. Rename OrchestrateLambdas to an OrchestrateMany overload and reshape the source-change callback. The (changes, track, untrack) shape was messy and could not expose the full ICacheOrchestratorContext (no Serialize, no Untrack-by-token). The new shape is (changes, context) passing the runtime context directly. GroupOnObservable and TransformOnObservable updated. Test fixtures updated.
Major internal refactor to unify orchestration logic for cache operators. Renamed OrchestrateMany to Orchestrate (and related types), updated all usages, and consolidated cache/list merging logic into new OrchestrateChangeSets/OrchestrateManyChangeSets methods. Lambda orchestrators now receive the emitter in onInner. Removed obsolete orchestrator classes and merged related logic. Updated tests and internal docs to match new structure. No public API changes.
Targeted at comments that read like PR narration or restate standard patterns. Kept comments that document non-obvious WHY: lock and disposal ordering, completion-accounting races, subtle Rx contract behavior, and AutoRefresh's pending-refresh drop semantics. Files touched: CacheOrchestration, AutoRefresh, ICacheOrchestrator, ICacheOrchestratorContext, IntObservableCacheEx.Orchestrate / .OrchestrateChangeSets / .OrchestrateManyChangeSets, SharedDeliveryQueue. No behavioral change. Build clean.
There was a problem hiding this comment.
Pull request overview
This PR introduces a new internal “Orchestrate” composition primitive to standardize (and harden) the common DynamicData cache-operator pattern of maintaining 1:1 per-item inner subscriptions, routing all notifications through SharedDeliveryQueue, and performing correct completion/error/disposal accounting. It removes the older CacheParentSubscription base-class pattern and migrates multiple operators to the new orchestrator-driven implementation, alongside substantial new internal test coverage and several AutoRefresh correctness fixes.
Changes:
- Added internal orchestrator contracts + driver (
ICacheOrchestrator*,CacheOrchestration, andIntObservableCacheEx.Orchestrate*) and updatedSharedDeliveryQueueto surface reentrant-drain info. - Migrated per-item-subscription operators (e.g.,
MergeMany,FilterOnObservable,GroupOnObservable,TransformOnObservable,TransformManyAsync, MergeMany*ChangeSets) offCacheParentSubscription. - Expanded/adjusted tests to validate orchestration contracts and un-skipped/fixed prior AutoRefresh-related defects.
Reviewed changes
Copilot reviewed 41 out of 41 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| src/DynamicData/Internal/SharedDeliveryQueue.cs | Adds reentrant-drain tracking and passes reentrancy info into a drain-complete callback. |
| src/DynamicData/Internal/KeyedDisposable.cs | Clarifies Add semantics in XML docs (replace-or-add and disposal behavior). |
| src/DynamicData/Internal/CacheParentSubscription.cs | Removes the legacy parent/child subscription primitive. |
| src/DynamicData/Cache/ObservableCacheEx.MergeManyChangeSets.cs | Routes MergeManyChangeSets through the new orchestration primitives; inlines source-priority helper types. |
| src/DynamicData/Cache/ObservableCacheEx.AutoRefreshOnObservable.cs | Adds missing null-argument validation for the reevaluator overload. |
| src/DynamicData/Cache/ObservableCacheEx.AutoRefresh.cs | Adds missing null-argument validation for propertyAccessor. |
| src/DynamicData/Cache/ObservableCache.cs | Removes an assertion in SuspendNotifications. |
| src/DynamicData/Cache/Internal/TransformOnObservable.cs | Replaces CacheParentSubscription implementation with Orchestrate + per-drain ChangeAwareCache flush. |
| src/DynamicData/Cache/Internal/TransformManyAsync.cs | Replaces CacheParentSubscription implementation with an orchestrator-based implementation and merge tracker. |
| src/DynamicData/Cache/Internal/MergeManyListChangeSets.cs | Switches to OrchestrateManyChangeSets (list-shape overload). |
| src/DynamicData/Cache/Internal/MergeManyItems.cs | Reimplements via Orchestrate (per-key tracking + forwarding). |
| src/DynamicData/Cache/Internal/MergeManyCacheChangeSetsSourceCompare.cs | Removes the legacy wrapper implementation (now handled via orchestrator path). |
| src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs | Removes the legacy wrapper implementation (now handled via orchestrator path). |
| src/DynamicData/Cache/Internal/MergeMany.cs | Reimplements via Orchestrate (per-key tracking + forwarding). |
| src/DynamicData/Cache/Internal/IntObservableCacheEx.OrchestrateManyChangeSets.cs | New internal helper: orchestration specialization for merging child cache/list changesets. |
| src/DynamicData/Cache/Internal/IntObservableCacheEx.OrchestrateChangeSets.cs | New internal helper: orchestration specialization for “mirror manipulator” cache shapes via ChangeAwareCache. |
| src/DynamicData/Cache/Internal/IntObservableCacheEx.Orchestrate.cs | New internal entry points: typed-factory and lambda overloads for orchestrating source + inner observables. |
| src/DynamicData/Cache/Internal/ICacheOrchestratorContext.cs | New orchestrator runtime context API (Track, Untrack, Serialize). |
| src/DynamicData/Cache/Internal/ICacheOrchestrator.cs | New orchestrator contract interface (OnSourceChangeSet, OnInner, OnDrainComplete). |
| src/DynamicData/Cache/Internal/GroupOnObservable.cs | Migrates GroupOnObservable to Orchestrate (with DynamicGrouper lifetime managed via Observable.Using). |
| src/DynamicData/Cache/Internal/FilterOnObservable.cs | Migrates FilterOnObservable to OrchestrateChangeSets and updates buffering strategy. |
| src/DynamicData/Cache/Internal/CacheOrchestratorBase.cs | Adds a dispatching base class for per-ChangeReason orchestration. |
| src/DynamicData/Cache/Internal/CacheOrchestration.cs | New driver managing queue, per-key inner subscriptions, completion accounting, and construction cleanup. |
| src/DynamicData/Cache/Internal/AutoRefresh.cs | Rewrites AutoRefresh using an orchestrator with pending-refresh coalescing and buffering fixes. |
| src/DynamicData.Tests/Utilities/FakeOrchestratorContext.cs | New test utility for in-isolation orchestrator testing. |
| src/DynamicData.Tests/Utilities/CollectingObserver.cs | New test utility to collect values/terminal state for in-isolation tests. |
| src/DynamicData.Tests/Internal/TransformManyAsyncOrchestratorFixture.cs | New in-isolation tests for the TransformManyAsync orchestrator. |
| src/DynamicData.Tests/Internal/SharedDeliveryQueueFixture.cs | Adds regression coverage for reentrant drain + onDrainComplete behavior. |
| src/DynamicData.Tests/Internal/OrchestrateFixture.cs | New orchestration contract tests (serialization, completion counting, deadlock proof, reentrancy). |
| src/DynamicData.Tests/Internal/MergedOrchestratorFixture.cs | New in-isolation tests for the cache-shape merged orchestrator. |
| src/DynamicData.Tests/Internal/MergedListOrchestratorFixture.cs | New in-isolation tests for the list-shape merged orchestrator. |
| src/DynamicData.Tests/Internal/LambdaCacheOrchestratorFixture.cs | New in-isolation tests for the lambda orchestrator adapter. |
| src/DynamicData.Tests/Internal/ChangeSetOrchestratorFixture.cs | New in-isolation tests for the ChangeAwareCache-based orchestrator. |
| src/DynamicData.Tests/Internal/CacheParentSubscriptionFixture.cs | Removes legacy primitive tests (now replaced by OrchestrateFixture). |
| src/DynamicData.Tests/Internal/AutoRefreshOrchestratorFixture.cs | New in-isolation tests for AutoRefresh orchestrator buffering + suppression rules. |
| src/DynamicData.Tests/Cache/TransformAsyncFixture.cs | Makes delay randomness deterministic (Bogus Randomizer) and cleans up formatting. |
| src/DynamicData.Tests/Cache/MergeManyWithKeyOverloadFixture.cs | Updates expectations around child-observable error propagation. |
| src/DynamicData.Tests/Cache/FilterOnObservableFixture.cs | Adds coverage for propagating errors from per-item filter observables. |
| src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.WithoutKey.cs | Un-skips reevaluator null-check regression test. |
| src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.Base.cs | Un-skips/fixes multiple AutoRefreshOnObservable defect tests and adds new buffered-window regression tests. |
| src/DynamicData.Tests/Cache/AutoRefreshFixture.WithPropertyAccessor.cs | Un-skips propertyAccessor null-check regression test. |
Comments suppressed due to low confidence (1)
src/DynamicData/Cache/Internal/IntObservableCacheEx.OrchestrateManyChangeSets.cs:190
- Same double-queue pattern as the cache-shape overload: Serialize(...) is applied before creating ClonedListChangeSet, and then entry.Source is also passed to Track(...), causing nested SynchronizeSafe layers per key. This adds avoidable overhead in hot paths.
}
}
Five fixes from the test review: 1. OrchestrateFixture.Serialization_ParentAndChildDoNotInterleave: was vacuous (only one source push, no child emissions, callLog only contained one start/end pair). Rewritten to race source-side Refresh and child-side OnNext from two threads via Barrier, so the serialization gate actually has work to serialize. 2. OrchestrateFixture.ReentrantDrain: replaced await Task.Delay(50) with SpinWait.SpinUntil(predicate, 5s deadline). Deterministic completion check with a deadlock guard instead of a fixed wait. 3. TransformManyAsyncOrchestratorFixture.TransformerThrows: replaced await Task.Delay(20) with await trackedObs.DefaultIfEmpty(). LastOrDefaultAsync(). The deferred async lambda runs sync (throw + catch + handler-call + return Empty), so the inner completes deterministically. 4. ChangeSetOrchestratorFixture.OnDrainComplete_EmptyChangeSet_ NoEmission: new test verifying the orchestrator's empty-changeset suppression (the if (captured.Count != 0) guard). 5. MergedOrchestratorFixture / MergedListOrchestratorFixture: expanded from 2 tests each to 5/4. Added OnItemUpdated (re-tracks), MultipleKeys (independent tracking), and (cache only) OnItemRefreshed_WithoutReevalOnRefresh (negative path no-op). Did NOT add the positive reevalOnRefresh test: it can't pass in isolation because the orchestrator inspects the per-key entry's internal cache, which is only populated by the entry's source observable (driven by the real CacheOrchestration runtime, not the FakeOrchestratorContext). That path is covered by the existing integration tests in MergeManyChangeSetsCacheFixture. Also: AutoRefreshOrchestratorFixture now uses "using Microsoft.Reactive.Testing;" instead of the fully-qualified TestScheduler name.
…avior change Two issues raised by the PR reviewer: 1. Double queue wrap in OrchestrateManyChangeSets / TransformManyAsync. ChangeSetCache (and ClonedListChangeSet) were being constructed over Context.Serialize(inner), and then entry.Source was passed to Context.Track, which also wraps with SynchronizeSafe. Each per-key inner emission was passing through two SharedDeliveryQueue sub-queues, doubling allocations and queue traffic. Drop the explicit Serialize and let Track do it once. 2. Stale XML docs on MergeMany. With the migration to the orchestrator, errors from per-item observables now terminate the merged stream rather than being swallowed. Updated the docs on MergeMany itself plus the four operators that route through it (WhenAnyPropertyChanged, WhenPropertyChanged, WhenValueChanged, AutoRefreshOnObservable) to describe the new behavior and call out the prior swallowing behavior.
The reviewer's double-wrap concern was a symptom of ChangeSetCache (and ClonedListChangeSet) doing .Do(Cache.Clone) inside their constructor. That forced every caller to pre-serialize the source observable so the mirror stayed coherent with whatever else read it, even when the caller already had its own serialization layer downstream. The orchestrator was having to do it twice: once via Context.Serialize to protect the mirror, and again via Context.Track to dispatch OnInner on the queue thread. Dropping just one wrap raced OnItemRemoved's enumeration of the mirror against concurrent inner emissions. Make both types passive: Source returns the raw observable, and a new Process(changes) method updates the mirror. Each caller now drives the mirror at the point in its pipeline where delivery is already serialized: - Cache and List MergeChangeSets call Process inside the MergeMany Do(). - MergeManyCacheChangeSets and MergeManyListChangeSets do the same. - CacheOrchestration-based operators (MergedOrchestrator, MergedListOrchestrator, TransformManyAsync) call Process in OnInner, on the queue thread, alongside the tracker call. They no longer need Context.Serialize. Single SynchronizeSafe wrap per per-key inner subscription. Mirror mutation happens on the same thread that reads it.
…o-op error forwarders
The list MergeMany was actively swallowing inner errors with an empty
onError handler. Forward the error to the merged observer so children
that fail terminate the merged stream, matching the cache MergeMany
behavior change.
The cache and list MergeChangeSets pipelines had
.Do(static _ => { }, observer.OnError) sitting after the MergeMany
projection. With Rx already propagating inner errors through MergeMany
to the downstream SubscribeSafe, that .Do was a no-op that just looked
like custom error handling. Remove it.
List-side docs updated for MergeMany, WhenAnyPropertyChanged,
WhenPropertyChanged, WhenValueChanged, AutoRefreshOnObservable, and
FilterOnObservable to describe the new propagation contract.
The list MergeManyFixture had a test asserting the old swallow
behavior (MergedStreamCompletesIfLastItemFails). Replaced with
MergedStreamFailsIfChildFails, matching the cache fixture's equivalent.
…hangeSets for list Completes the parity with cache operators from PR reactivemarbles#1114: - MergeManyItems: per-item observable emissions paired with the source item via ItemWithValue<T, TDest>. - TransformManyAsync: async transformer producing a child list changeset per source item; merged into a single output. Built on Task.FromAsync + SelectMany to integrate the await into the orchestrator pipeline. - MergeManyChangeSets (parent-comparer overload): conflict resolution when the same destination key appears in multiple parents' child changesets; the parent comparer picks the winning parent. Implemented via ParentChildEntry wrapper and reuses the existing MergeManyCacheChangeSetsOrchestrator. 15 new tests across 3 fixtures; 768/768 List tests pass. Public API additions: 3 new extension methods on ObservableListEx; all internal types remain internal.
Removed XML and inline comments across new and modified files that won't make sense after this PR merges or that narrate test/code flow without adding maintenance value: - Test fixture class summaries that just restated the SUT name - Inline test-flow narration (Arrange/Setup-style markers) - 'Prior to v9' release-note bullets on public docs (current behavior is already documented; release notes go in release notes) - 'Used by FooOperator, BarOperator' callouts in internal API docs - 'Unlike MergeMany, child errors are NOT swallowed' comparison on MergeManyChangeSets (no longer accurate now that MergeMany propagates) - The 'C1 fix' regression-test summary on SharedDeliveryQueueFixture - 'legacy CPS-subclass shape' and similar pre-orchestrator-refactor refs Kept comments that explain non-obvious WHY for the code at hand: lock ordering, race-condition rationale, subtle Rx behavior.
The cache-side MergeManyListChangeSets<TObject, TKey, TDestination> class was a one-line wrapper around OrchestrateManyChangeSets. Inline the call at the public API site and remove the file. The list-side MergeManyListChangeSets<TObject, TDestination> remains; it serves a list-source pipeline and has no orchestrator equivalent.
What this is
A common pattern in cache operators: for each item in a source changeset, maintain a separate external subscription whose values feed into the operator's output.
MergeManysubscribes to one observable per item.AutoRefreshsubscribes to one re-evaluator per item.FilterOnObservablesubscribes to one bool stream per item.TransformManyAsync,GroupOnObservable, and several others do the same. The shape is universal: 1:1 between source items and external subscriptions, with results flowing back into a single output.Getting that right involves a lot of boilerplate: spin up a subscription on Add/Update, tear it down on Remove, route everything through the same serialization gate to avoid the kind of cross-cache deadlocks that motivated #1097, keep the downstream stream alive until both the source and every per-item subscription have completed, dispose cleanly on error or unsubscribe.
CacheParentSubscriptionwas the previous primitive for cache operators that needed parent/child subscription accounting. It worked, but every consumer had to subclass it and own per-subscription state as subclass fields. That made it easy to leak state across subscribers or forget to wire up the queue-drain machinery from #1097. It also didn't compose: operators wanting parent/child accounting AND aChangeAwareCache, or AND a merge tracker, had to inline both concerns.This PR replaces it with Orchestrator: one primitive that handles subscription accounting, queue routing, completion accounting, and disposal. Operator authors write a small implementation describing what their operator does. The runtime handles how.
Two interfaces and a driver:
ICacheOrchestrator<TSource, TKey, TInner, TResult>: authors implement this. Three methods (OnSourceChangeSet,OnInner,OnDrainComplete).ICacheOrchestratorContext<TKey, TInner>: authors call this. Three methods (Track,Untrack,Serialize).CacheOrchestration<...>: the driver. Per-subscription instances built by a caller-supplied factory, so all state is naturally isolated.Architecture
flowchart TB subgraph public[Public extension methods] ops[AutoRefresh · MergeMany · MergeManyChangeSets ·<br/>FilterOnObservable · TransformManyAsync ·<br/>GroupOnObservable · TransformOnObservable · MergeManyItems ·<br/>MergeManyListChangeSets] end subgraph specializations[Internal specializations] oc[OrchestrateChangeSets<br/>mirror-manipulator with ChangeAwareCache] omc1[OrchestrateManyChangeSets cache<br/>merges child cache changesets] omc2[OrchestrateManyChangeSets list<br/>merges child list changesets] end subgraph base[Internal base entry points] ot[Orchestrate<TOrch><br/>typed factory] ol[Orchestrate lambdas<br/>convenience overload] end subgraph driver[Driver] co[CacheOrchestration<br/>per-subscription state owner] end subgraph contract[Contract] ico[ICacheOrchestrator<br/>what you implement] ictx[ICacheOrchestratorContext<br/>what you call] cob[CacheOrchestratorBase<br/>per-ChangeReason dispatch convenience] end subgraph runtime[Runtime] sdq[SharedDeliveryQueue + DeliverySubQueue<br/>lock-released serialized delivery] end ops --> specializations ops --> ol ops --> ot specializations --> ot ot --> co ol --> co co --> sdq co -.implements.-> ictx co -.calls.-> ico cob -.implements.-> ico style public fill:#FFCDD2,stroke:#C62828,color:#000 style specializations fill:#FFE0B2,stroke:#E65100,color:#000 style base fill:#FFF9C4,stroke:#F57F17,color:#000 style driver fill:#C8E6C9,stroke:#2E7D32,color:#000 style contract fill:#BBDEFB,stroke:#1565C0,color:#000 style runtime fill:#E1BEE7,stroke:#6A1B9A,color:#000Interfaces
ICacheOrchestrator<TSource, TKey, TInner, TResult>(you implement)OnSourceChangeSet(IChangeSet<TSource, TKey> changes)context.Track(key, ...)on Add/Update,context.Untrack(key)on Remove. Update any per-key state you keep.OnInner(TInner value, TKey key)OnDrainComplete(bool isFinal, bool wasReentrant)isFinal == truemeans the source and every tracked inner have terminated, so flush synchronously even if you'd normally defer (timer buffering, etc.).wasReentrantdistinguishes a reentrant drain from a clean cycle; most orchestrators ignore it.ICacheOrchestratorContext<TKey, TInner>(you call)Track(TKey key, IObservable<TInner> observable)Untrack(TKey key)Serialize<T>(IObservable<T> observable)Lifecycle
%%{init: {"themeVariables": {"sequenceRectStroke": "#000000", "sequenceRectStrokeWidth": "1px"}}}%% sequenceDiagram box rgba(255, 205, 210, 1) Consumer participant Sub as Subscriber end box rgba(255, 224, 178, 1) Driver participant Driver as CacheOrchestration end box rgba(255, 249, 196, 1) Runtime participant Q as SharedDeliveryQueue end box rgba(200, 230, 201, 1) Orchestrator participant Orch as Your orchestrator end box rgba(187, 222, 251, 1) Source participant Src as Source changeset end box rgba(225, 190, 231, 1) Inner participant Inn as Inner observable end Sub->>Driver: Subscribe Driver->>Q: new SharedDeliveryQueue Driver->>Orch: factory(context, emitter) Driver->>Src: SynchronizeSafe(queue).SubscribeSafe rect rgba(245, 245, 245, 1) note over Q: drain cycle Src->>Q: OnNext(changeset) Q->>Orch: OnSourceChangeSet(changeset) Orch->>Driver: ctx.Track(key, innerObs) Driver->>Inn: SynchronizeSafe(queue).SubscribeSafe Inn->>Q: OnNext(value) Q->>Orch: OnInner(value, key) Q->>Orch: OnDrainComplete(isFinal, wasReentrant) Orch->>Q: emitter.OnNext(result) Q->>Sub: result endThe Orchestrate family: five entry points
Two base entry points (typed factory and lambda overload) plus three specializations that bake in common output shapes.
Base:
OrchestrateSame
CacheOrchestrationdriver underneath. Use the typed factory when state and helpers want to live in a class. Use the lambda overload when closures suffice.OrchestrateChangeSets: cache changeset output viaChangeAwareCacheFor filter-like or per-key transform shapes: source is a keyed changeset, output is a keyed changeset, each source key contributes 0 or 1 items to the output. The runtime gives you a
ChangeAwareCachethat you mutate from both source and inner events. It captures and emits once per drain cycle. Used byFilterOnObservableandTransformOnObservable.OrchestrateManyChangeSets(cache): merge child cache changesetsThe shape
MergeManyChangeSetsproduces: per-source-key inner observables that are themselves cache changesets, merged into one cache changeset output. The runtime gives you aChangeSetMergeTrackerthat handles add/update/remove/refresh and conflict resolution via the supplied comparers. Used by bothMergeManyChangeSetsoverloads (vanilla and source-priority).OrchestrateManyChangeSets(list): merge child list changesetsSame shape but for list-typed inner changesets. Disambiguated by parameter type (no destination key). Used by
MergeManyListChangeSets.Picking an entry point
Orchestrate<TOrch>typed factory, optionally withCacheOrchestratorBasefor per-ChangeReasonvirtual hooksOrchestratelambda overloadChangeAwareCacheOrchestrateChangeSetsOrchestrateManyChangeSets(cache)OrchestrateManyChangeSets(list)Worked example:
MergeManyThe whole operator is one method-call expression. The pre-orchestrator version subclassed
CacheParentSubscriptionwith its own fields, constructor, and manualOnNext/Subscribeplumbing.Operators migrated
AutoRefreshOrchestrate<TOrch>+ custom orchestrator (timer state, pending-refresh map)FilterOnObservableOrchestrateChangeSetsGroupOnObservableOrchestratelambda (closes over aDynamicGrouper)MergeManyOrchestratelambdaMergeManyItemsOrchestratelambdaMergeManyChangeSets(cache)OrchestrateManyChangeSets(cache)MergeManyChangeSets(source-compare)OrchestrateManyChangeSets(cache) with per-item parent-child boxingMergeManyListChangeSetsOrchestrateManyChangeSets(list)TransformManyAsyncOrchestrate<TOrch>+ custom orchestrator (per-key cache, merge tracker)TransformOnObservableOrchestratelambda (closes over aChangeAwareCache)AutoRefresh correctness fixes
The rewrite fixes four pre-existing defects that had previously-skipped tests. All four tests are un-skipped here and pass:
PropertyAccessorIsNull_ThrowsException:propertyAccessoris now validated upfront. Previously threwNullReferenceExceptionon the first source notification.ReevaluatorIsNull_ThrowsException:reEvaluatoris now validated upfront. Same prior failure mode.ReevaluatorEmitsImmediately_ItemDoesNotRefresh([Bug]: AutoRefreshOnObservable produces refreshes for immediately-removed items #1099): a reevaluator that fires synchronously during item subscription no longer produces a redundant Refresh paired with the Add.ReevaluatorThrows_ExceptionPropagates: reevaluator exceptions now propagate toOnErrorinstead of being silently swallowed (Rx contract §6.4).Other fixes
AutoRefreshdrops a pending refresh when the source forwards its own Refresh. A reevaluator-queued refresh from a prior drain was being emitted in addition to the source's Refresh for the same key. Symmetrical to the existing pending-drop logic on Update/Remove. Regression test added.isFinalis computed from the subscription counter, not a latched flag. A latched flag races when the orchestrator callsTrackduring theisFinalcallback: counter goes back to 1 but the flag still says true, soOnCompletedfires while a live inner exists. The counter is the source of truth.MergeManyCacheChangeSetsandMergeManyCacheChangeSetsSourceComparewrapper classes. Inlined their bodies into the publicMergeManyChangeSetsoverloads. The source-compare overload's supporting types now live as private nested generic types in the same partial-class file as the public API.Virtualise,GroupOn,GroupOnImmutable: collapsed per-branch null/empty filters into single post-merge filters. Matches thePageshape and removes duplicated filtering on every merge input.Tests
OrchestrateFixture(16 tests): primitive contract. Source/inner serialization, per-drain coalesced emission, completion accounting (parent and all children), error propagation, cross-cache deadlock proofs, reentrant drain.SharedDeliveryQueueFixture,DeliveryQueueFixture: queue contract.AutoRefreshOrchestratorFixture,ChangeSetOrchestratorFixture,LambdaCacheOrchestratorFixture,MergedOrchestratorFixture,MergedListOrchestratorFixture,TransformManyAsyncOrchestratorFixture.Full Cache + Internal suite: pass.
DeadlockTortureTestfixture (added in #1097): pass.Public surface
No new or changed public APIs.
Behavioral change: per-item observable errors now propagate
Several existing public operators previously swallowed errors from per-item observables (the offending item was silently unsubscribed and the merged stream continued). They now forward those errors, terminating the merged stream. This matches the contract of standard
Observable.Merge.Cache side:
MergeManyWhenAnyPropertyChanged(routes throughMergeMany)WhenPropertyChanged(routes throughMergeMany)WhenValueChanged(routes throughMergeMany)AutoRefreshOnObservableList side:
MergeManyWhenAnyPropertyChanged(routes throughMergeMany)WhenPropertyChanged(routes throughMergeMany)WhenValueChanged(routes throughMergeMany)AutoRefreshOnObservableFilterOnObservableXML docs on each operator now describe the new behavior and call out the prior swallowing.
If an application was relying on the old swallowing behavior, the workaround is to defensively
.Catch(...)inside the per-item observable.