List: cross-cache deadlock immunity (parity with cache PR #1079)#1115
Draft
dwcullop wants to merge 13 commits into
Draft
List: cross-cache deadlock immunity (parity with cache PR #1079)#1115dwcullop wants to merge 13 commits into
dwcullop wants to merge 13 commits into
Conversation
… operators Replaces the legacy .Synchronize(locker) pattern (which holds the lock during downstream observer.OnNext) with the SDQ-based SynchronizeSafe variants (which release the lock before delivery). Brings list operators in line with the cross-cache deadlock immunity ObservableCache already has. Operators migrated: - Sort, Pager, Virtualiser, Filter.Dynamic, BufferIf - DynamicCombiner (powers Or/And/Xor/Except dynamic overloads) - MergeChangeSets (IObservable<IObservable<...>> entry points) - Combiner (static And/Or/Xor/Except). Also adds the OnError handler that was missing in the legacy form. - DisposeMany (queue-first variant so per-item Dispose runs after notifications drain) - Switch (shared queue for source-of-sources Clear signal and inner stream PopulateInto) - LimitSizeTo (internal + public ext) - GroupOn / GroupOnImmutable (shared queue between source stream and regrouper Unit signal) - TransformMany (outer queue + per-child queues) - ObservableListEx.Adapt Single-source semantics are preserved (each operator still appears to deliver one notification at a time). Multi-source operators preserve serialization via a shared queue. The difference is that downstream observer.OnNext is no longer called with any lock held, so cross-cache pipelines can no longer ABBA-deadlock through these operators.
Mirrors the ObservableCache change (reactivemarbles#1079) for the list side. Replaces lock-during-OnNext with queue-based drain so subscriber callbacks that propagate to other caches cannot ABBA-deadlock with concurrent writes. SourceList: - lock(_locker) in Edit/Connect/LoadFromSource etc. replaced with DeliveryQueue<ListUpdate>. - Connect uses AcquireReadLock + version-skipping (same pattern as ObservableCache.Connect) so a new subscriber receives a consistent snapshot and is not re-delivered items already in flight. - Preview still fires synchronously under the queue's gate lock (required by ReaderWriter's pre/post-swap protocol). - After this change, Edit returns BEFORE notifications drain; the queue delivers them on the same thread once the gate is released. This matches the cache semantic. ExpireAfter (list): - The legacy implementation tracked expirations in a List<DateTimeOffset?> mirror of the source, indexed by source position. That assumed Edit was synchronous with downstream delivery - true for the old SourceList, false for the queue-based one. - Rewritten to track (item, dueTime) pairs. During expiration removal, the operator now matches by item value via updater.IndexOf instead of by stale shadow index. Items the shadow believes need expiring but that have already been removed externally are simply skipped.
Contributor
There was a problem hiding this comment.
Pull request overview
This PR ports the “queue-drain delivery” pattern to the list side of DynamicData to prevent cross-cache/list ABBA deadlocks by ensuring downstream OnNext is not invoked while holding operator/source locks. It also migrates SourceList<T> to queued delivery with version-based snapshot/skip logic and rewrites list ExpireAfter to be resilient to asynchronous drain timing.
Changes:
- Replaced many list-operator
.Synchronize(locker)gates withSharedDeliveryQueue+SynchronizeSafe(...)to release locks before downstream delivery. - Migrated
SourceList<T>edit/connect notification flow toDeliveryQueue<ListUpdate>with version-based “skip in-flight” behavior for new subscribers. - Reworked list
ExpireAfterto avoid index-based shadow state that could desync under queued delivery.
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| src/DynamicData/List/SourceList.cs | Core SourceList<T> migration to DeliveryQueue, versioned snapshot/skip logic, preview/notifications changes. |
| src/DynamicData/List/ObservableListEx.LimitSizeTo.cs | Updates LimitSizeTo extension to use SharedDeliveryQueue + SynchronizeSafe. |
| src/DynamicData/List/ObservableListEx.Adapt.cs | Switches to SynchronizeSafe(locker) for adapt path. |
| src/DynamicData/List/Internal/Sort.cs | Uses SharedDeliveryQueue for multi-source serialization without holding locks during delivery. |
| src/DynamicData/List/Internal/Pager.cs | Same queue-drain refactor for paging operator inputs. |
| src/DynamicData/List/Internal/Virtualiser.cs | Same queue-drain refactor for virtualisation operator inputs. |
| src/DynamicData/List/Internal/Filter.Dynamic.cs | Same queue-drain refactor for dynamic predicate + source streams. |
| src/DynamicData/List/Internal/BufferIf.cs | Same queue-drain refactor for pause selector + source streams. |
| src/DynamicData/List/Internal/MergeChangeSets.cs | Same queue-drain refactor for merging list changeset streams. |
| src/DynamicData/List/Internal/Switch.cs | Same queue-drain refactor for source-of-sources switch + destination updates. |
| src/DynamicData/List/Internal/LimitSizeTo.cs | Internal driver updated to serialize through SynchronizeSafe(queue). |
| src/DynamicData/List/Internal/GroupOn.cs | Uses SharedDeliveryQueue for source + regrouper serialization. |
| src/DynamicData/List/Internal/GroupOnImmutable.cs | Uses SharedDeliveryQueue for source + regrouper serialization. |
| src/DynamicData/List/Internal/DynamicCombiner.cs | Uses SharedDeliveryQueue for dynamic set-combine streams. |
| src/DynamicData/List/Internal/Combiner.cs | Uses SharedDeliveryQueue for static set-combine streams. |
| src/DynamicData/List/Internal/TransformMany.cs | Adds outer + per-child queue serialization for child-changes overload. |
| src/DynamicData/List/Internal/DisposeMany.cs | Switches to queue-first SynchronizeSafe() to avoid teardown races. |
| src/DynamicData/List/Internal/ExpireAfter.cs | Rewrites shadow tracking to tolerate queued delivery timing. |
Comments suppressed due to low confidence (2)
src/DynamicData/List/Internal/GroupOn.cs:48
SharedDeliveryQueueis created but never disposed. DisposedDeliverySubQueueinstances remain referenced by the parent queue until it is disposed/compacted, which can retain observer chains and increase memory usage.
// Shared queue serializes the source changesets with the optional regrouper signal
// so they appear as a single sequence to the combiner, without holding a lock
// during the downstream observer.OnNext call.
var queue = new SharedDeliveryQueue();
var shared = itemsWithGroup.SynchronizeSafe(queue).Publish();
var grouper = shared.Select(changes => Process(groupings, groupCache, changes));
var regrouperFunc = _regrouper is null ?
Observable.Never<IChangeSet<IGroup<TObject, TGroupKey>>>() :
_regrouper.SynchronizeSafe(queue).CombineLatest(shared.ToCollection(), (_, collection) => Regroup(groupings, groupCache, collection));
var publisher = grouper.Merge(regrouperFunc).DisposeMany() // dispose removes as the grouping is disposable
.NotEmpty().SubscribeSafe(observer);
return new CompositeDisposable(publisher, shared.Connect());
});
src/DynamicData/List/Internal/GroupOnImmutable.cs:46
SharedDeliveryQueueis created but never disposed. DisposedDeliverySubQueueinstances remain referenced by the parent queue until it is disposed/compacted, which can retain observer chains and increase memory usage.
// Shared queue serializes the source changesets with the optional regrouper signal
// without holding a lock during downstream delivery.
var queue = new SharedDeliveryQueue();
var shared = itemsWithGroup.SynchronizeSafe(queue).Publish();
var grouper = shared.Select(changes => Process(groupings, groupCache, changes));
var reGroupFunc = _reGrouper is null ?
Observable.Never<IChangeSet<IGrouping<TObject, TGroupKey>>>() :
_reGrouper.SynchronizeSafe(queue).CombineLatest(shared.ToCollection(), (_, collection) => Regroup(groupings, groupCache, collection));
var publisher = grouper.Merge(reGroupFunc).NotEmpty().SubscribeSafe(observer);
return new CompositeDisposable(publisher, shared.Connect());
});
Mirrors the cache-side DeadlockTortureTest. Each migrated list operator is wired into a bidirectional cross-list pipeline. Two threads write simultaneously, creating the ABBA lock cycle: Thread A: sourceA._locker -> operator lock -> PopulateInto -> sourceB._locker Thread B: sourceB._locker -> operator lock -> PopulateInto -> sourceA._locker Verified empirically against this branch and against origin/main: - origin/main (legacy .Synchronize(locker)): Sort_DoesNotDeadlock wedges the test host within seconds. The 15s in-test timeout never fires because the threadpool is saturated by the deadlock; xUnit's --blame-hang-timeout catches it at 60s and produces a hang dump. - This branch (SynchronizeSafe queue-drain): Sort_DoesNotDeadlock completes in ~340 ms. The same proof generalises to AutoRefresh, Page, Virtualise, FilterDynamic, BufferIf, DisposeMany, GroupOn, TransformMany, the stacked AllDangerous variant, and the ThreeWayCircular variant. They use the same shape and the same legacy lock, so the deadlock vector is identical.
… and queue lifetime issues Findings from a multi-agent adversarial review of this branch. Four reviewers, four high-consensus issues fixed. 1. SourceList.Dispose race (HIGH, 3/4 reviewers) Dispose() called _changesPreview.Dispose() directly. If another thread was actively draining the queue when Dispose ran, the queued OnCompleted notification would later fire OnCompleted on the disposed subject and throw ObjectDisposedException. Fix: drop the direct Dispose; the queued OnCompleted (via ListUpdateObserver.OnCompleted) terminates the subject correctly, after which it is GC-eligible. The field already had a CA2213 suppression saying it was disposed via _cleanUp, which now matches reality. 2. ExpireAfter wrong-occurrence removal (HIGH, 4/4 reviewers) updater.IndexOf(item) finds the FIRST equal item. If the source has two equal items where one has a finite dueTime and the other has no expiry, the scheduled expiration would remove the wrong occurrence. Fix: when the shadow is in sync with the source (count matches and prefix items match), use reverse-shadow-index removal which preserves per-occurrence identity. Fall back to IndexOf only when the shadow is stale (a concurrent external mutation is queued but not yet delivered to OnSourceNext); in that case the stale-shadow tolerance from the original rewrite still applies. 3. SharedDeliveryQueue instances not disposed in Combiner, GroupOn, GroupOnImmutable, Switch, TransformMany (HIGH, 2/4 reviewers) The owned SharedDeliveryQueue was created per Run() but never included in the returned CompositeDisposable. Disposing the subscription stopped further notifications from arriving but did not terminate the queue's drain state. Fix: append the queue to the returned CompositeDisposable, AFTER the subscriptions, so subscriptions stop first and then the queue drains and terminates. 4. LimitSizeTo shared SharedDeliveryQueue across all subscriptions (HIGH, 2/4 reviewers) The queue and limiter were instantiated at the call site, so the returned cold observable shared one queue across every subscriber. Fix: wrap in Observable.Create so each subscription gets its own queue and limiter, and dispose both when the subscription disposes.
…after terminal is staged Addresses L1 from the adversarial review. Without this flag, Preview was gated only on _notifications.IsTerminated, which the DeliveryQueue sets when the terminal item is DEQUEUED by the drain, not when it is enqueued. Window: 1. Edit_1 mid-drain delivering OnNext to subscribers 2. NotifyCompleted/NotifyError acquires the gate, enqueues terminal item 3. Edit_2 acquires the gate, calls Write 4. WriteWithPreview synchronously fires InvokeNextPreview 5. _notifications.IsTerminated is still false (drain hasn't reached the terminal item) -> preview fires 6. Edit_2 enqueues its main change, releases gate 7. Drain eventually dequeues the terminal item, sets _isTerminated, clears the queue, so Edit_2's main change is dropped Result: preview subscriber saw a change main subscribers never see. Fix: set a _terminalEnqueued flag under the gate at the same instant the terminal item is enqueued. InvokeNextPreview reads it under the same gate (it is invoked from within Edit, which holds the gate), so this is safe without needing a memory barrier. Edit itself is intentionally NOT short-circuited on _terminalEnqueued. Legacy SourceList allowed Edit to win the lock race against Dispose and have its change delivered; the new model can't preserve that exactly because the change may be dropped by the queue clear on terminal, but allowing Edit through preserves the legacy 'I owned the lock first' Items snapshot semantic for callers that read Items after Edit returns. Other items from the review pass (L2, L3, L5) checked and not actioned: - L2 (Switch outer/inner ordering): pre-existing legacy behavior. Confirmed by inspecting origin/main:src/DynamicData/List/Internal/Switch.cs, which uses a single shared locker between the outer Do callback and the inner .Synchronize(locker), with no ordering guarantee between Clear and a stale in-flight Add from the previous source. The new SDQ-based code preserves the same race. - L3 (Connect/CountChanged subscribe under gate): consistent with cache ObservableCache.Connect, which holds lock(_locker) around SubscribeSafe in the same way. This is the established library-wide pattern for snapshot-then-subscribe atomicity; changing it requires a different design that's out of scope for this PR. - L5 (ExpireAfter removal order): the H2 reverse-shadow-index path in the in-sync case already restores the legacy order. The fallback (stale-shadow) path uses forward order, but the legacy crashed in that case so there is no order to preserve.
…n finally Address PR review comments. 1. Connect: removed the Finally(observer.OnCompleted) wrapping the SubscribeSafe(observer) call. Rx's Finally forwards the source terminal AND then invokes the action, so observer.OnCompleted was called twice on natural source termination. SubscribeSafe alone forwards OnCompleted once via the auto-detach observer wrapper. Pre-existing bug; the legacy code had it too. 2. Edit: _editLevel++ was matched by an _editLevel-- in the linear flow but not in a finally. If updateAction (or the underlying ReaderWriter.Write*) threw, _editLevel was left permanently incremented, breaking all subsequent edits because the WriteNested path would be taken and the queued notifications would be suppressed. Pre-existing bug; the legacy code had it too.
…hild SharedDeliveryQueue Each child observable in the childChanges overload had its own SharedDeliveryQueue with exactly one source feeding it. That's the wrong tool: SharedDeliveryQueue exists to serialize multiple sources through one drain. A per-child queue with one source is just a DeliveryQueue with extra machinery, and the outer queue's compaction logic was the only thing that would ever release the dead sub-queue. Switched to parameterless SynchronizeSafe() which internally allocates a per-subscription DeliveryQueue with queue-first disposal. The queue is now disposed deterministically when the child subscription is torn down (which happens when MergeMany unsubscribes from a removed item). The OUTER queue legitimately needs to be a SharedDeliveryQueue because it serializes two different streams (initial-state from transformed.SynchronizeSafe and per-item merge from transformed.MergeMany.SynchronizeSafe). That is unchanged.
…d stream attaches as a sub-queue Per the PR review followup: there is no reason to have two separate queues here. SharedDeliveryQueue exists precisely so multiple sources can attach as sub-queues and all funnel through a single drain. The correct topology for this operator is one shared queue for everything (per-child child-changes streams, the initial-state stream, and the merged-children stream). That gives: - one drain point for all downstream observer.OnNext invocations - per-child sub-queues that are deterministically disposed by their subscription teardown - the queue itself disposed once via the CompositeDisposable when the whole TransformMany subscription ends
…k, harden LoadFromSource exception path, correct CA2213 justifications Multi-agent multi-pass review found three issues in this PR. 1. ExpireAfter: shadowInSync prefix check was only validating up to the FIRST expiring shadow index (firstExpiringShadowIdx). The reverse-removal loop touched every expiring shadow index, so if a concurrent external mutation (Move/Replace/Insert+Remove with unchanged Count) had affected positions BETWEEN the first and last expiring index, the check falsely declared the shadow in sync and the code removed unrelated items. Unanimous flag across 9/9 reviewers; the new prefix check validates up to and including the LAST expiring index. 2. SourceList.LoadFromSource: exceptions thrown by _readerWriter.Write were escaping the source observer callback instead of being converted to OnError. The legacy implementation used .Select(_readerWriter.Write).Subscribe(_, OnError, ...) which routed selector exceptions to OnError; the migrated form uses a manual Subscribe and lost that. Wrapped the write in try/catch -> NotifyError(ex). Flagged independently by rubberduck-gpt55 and dotnet-gpt55. 3. CA2213 suppression text on _changes and _changesPreview claimed 'Disposed with _cleanUp', which has been misleading since the dispose-during-drain race fix removed the explicit Subject disposal. Updated the justification to 'Terminated via OnCompleted/OnError delivered through _notifications; explicit Dispose would race with in-flight queue drains.' Items intentionally NOT addressed (reported back to user per review summary): - ExpireAfter lock-during-OnNext (private gate, no cross-cache deadlock vector, already disclosed) - SourceList.Dispose async-drain semantic (intentional, matches cache) - LIFO sibling reordering in multi-source operators (pre-existing pattern, matches cache) - Finally(observer.OnCompleted) removal in Connect (intentional behavior fix from earlier commit)
…/CombineLatest The list operators all serialize their inputs through a SharedDeliveryQueue before combining them with Rx Merge or CombineLatest. Rx's Merge/CombineLatest install a private gate lock that is held for the entire downstream OnNext, which reconstructs the cross-cache ABBA deadlock that SharedDeliveryQueue is designed to eliminate. Cherry-picks the UnsynchronizedMerge, UnsynchronizedCombineLatest, and DeliveryQueueMergeExtensions helpers from the in-flight upstream PR reactivemarbles#1097 (fix/operator-merge-gate-deadlock). Each helper is a drop-in replacement for the corresponding Rx operator that omits the internal gate; the precondition is that all inputs are already serialized through the same external gate (which the SharedDeliveryQueue provides). Migrated call sites (all post-queue): - Sort: 2 Merges collapsed into one 3-input UnsynchronizedMerge - Pager: requestStream.UnsynchronizedMerge(dataChanged) - Virtualiser: requestStream.UnsynchronizedMerge(dataChanged) - Filter.Dynamic: predicateChanged.UnsynchronizedMerge(filteredResult) - GroupOn and GroupOnImmutable: both Merge AND CombineLatest converted (shared.ToCollection comes through SynchronizeSafe(queue).Publish so it's queue-serialized too) - TransformMany: init.UnsynchronizedMerge(subsequentSelection) NOT migrated: - BufferIf line 37: _pauseIfTrueSelector.Merge(timeoutSubject) happens UPSTREAM of SynchronizeSafe(queue). Inputs are not queue-serialized so the Rx gate is genuinely needed.
…es#1097 audit Closes the remaining ABBA-prone gate-holding Rx operators on the list side. Audit followed the matrix in PR reactivemarbles#1097's body. Switch (List/Internal/Switch.cs): Was using Observable.Switch with the outer routed through SynchronizeSafe(queue) then PopulateInto(destination), which still installed Switch's internal gate and held it during downstream delivery. Refactored to mirror the cache-side Switch refactor: inline switch logic with SerialDisposable, both outer and inner routed through the same SharedDeliveryQueue, errors fed through a Subject<IChangeSet<T>>, and the final downstream merge of destination.Connect() and errors uses UnsynchronizedMerge. Also fixes the public Switch(IObservable<IObservableList<T>>) overload because it delegates to the IObservable<IObservable<IChangeSet<T>>> overload via .Switch() -> new Switch<T>(sources).Run(). BufferIf: _pauseIfTrueSelector.Merge(timeoutSubject) is upstream of SynchronizeSafe(queue), so the merge inputs are NOT pre-serialized. UnsynchronizedMerge is unsafe here. Replaced with DeliveryQueueMerge from PR reactivemarbles#1097's helper - it owns its own DeliveryQueue<bool> so notifications from both sources are serialized inside the merge itself, then released before downstream delivery to the .Concat -> .ObserveOn -> .SynchronizeSafe pipeline. Audited and skipped: - MergeChangeSets and DynamicCombiner use DD's MergeMany (operator on IObservable<IChangeSet<T>>), not Rx's Observable.Merge. Already deadlock-safe; not in reactivemarbles#1097's gate list. - Combiner.Zip and FilterOnObservable.Join are Enumerable LINQ over ICollection/IEnumerable, not Observable.Zip/Join. No Rx gate involved. - Buffer (time-based), Throttle: per reactivemarbles#1097 audit table, single-input gates protect internal buffer/throttle state. Left alone. After this commit, all gate-holding Rx combinators on the list side that sit downstream of SynchronizeSafe are eliminated. The full list of touched operators matches the cache-side coverage in reactivemarbles#1097 modulo the cache-only ones (joins, AsyncDisposeMany, ObservableCache plumbing).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Brings the list side of DynamicData to parity with the cross-cache deadlock immunity work that landed on the cache side in #1079. After this PR,
SourceList<T>and the most lock-heavy list operators no longer hold a lock during downstreamobserver.OnNext, eliminating an entire class of ABBA deadlocks that occur when one cache/list pipeline mutates another from inside a subscriber callback.What changed
Commit 1:
.Synchronize(locker)to SDQ-based safe variants (15 operators)The legacy
.Synchronize(locker)Rx operator holds the gate lock during the entire downstreamOnNextdelivery. When that chain crosses a cache boundary (subscriber writes to a different cache, or triggers a side effect that does so), and another thread has the reverse path, the result is ABBA deadlock. The cache side fixed this by routing all downstream delivery through aDeliveryQueue/SharedDeliveryQueuethat releases the lock before invoking the observer.Migrated operators:
Sort,Pager,VirtualiserFilter.Dynamic(dynamic/observable predicate overload)BufferIfDynamicCombiner(powers the dynamicOr/And/Xor/Exceptoverloads)MergeChangeSets(IObservable<IObservable<...>>entry points)Combiner(staticOr/And/Xor/Except). Also adds theOnErrorhandler that was missing in the legacy form (it silently dropped errors from any source observable other than the first to error).DisposeMany(queue-first variant so per-itemDisposeruns after notifications drain, matching the legacy semantic)Switch(shared queue between source-of-sourcesClearsignal and inner streamPopulateInto)LimitSizeTo(internal driver and theObservableListExpublic extension)GroupOn,GroupOnImmutable(shared queue between source stream and regrouperUnitsignal)TransformManychildChangesoverload (outer queue + per-child queues)ObservableListEx.AdaptSingle-source operators preserve their original serialization (one notification at a time). Multi-source operators preserve serialization via a shared queue (sources still appear as one merged sequence). The only observable difference is that downstream
observer.OnNextis no longer called with any lock held.Commit 2:
SourceList<T>migrated toDeliveryQueue<ListUpdate>Mirrors the cache change from #1079.
SourceList.Editno longer holds a lock during_changes.OnNext. Mechanics:ConnectusesAcquireReadLock+ monotonic version skipping (same pattern asObservableCache.Connect) so a fresh subscriber sees a consistent snapshot and is not re-delivered any change already in flight.Previewcontinues to fire synchronously under the queue's gate (required byReaderWriter's pre/post-swap protocol for previewing the pre-write state).Semantic change to be aware of: with this change,
Editreturns before notifications drain. The queue delivers them on the same thread once the gate is released. This matches the cache semantic established in #1079. The vast majority of pipelines are unaffected because they only observe values viaConnect(which is happening on a different stack frame anyway). Anything that races a synchronous read ofSourceList.Itemsimmediately afterEditmay see a snapshot before the changeset has been delivered to observers, butItemsitself reads the latest state throughReaderWriterso this isn't a correctness issue, just an ordering one.Commit 2 (continued):
ExpireAfter(list) rewrittenThe legacy list
ExpireAftertracked expirations in aList<DateTimeOffset?>shadow, indexed by source position. That assumedEditwas synchronous with downstream delivery: true for the oldSourceList, false for the new one. With the migratedSourceList, a concurrentEditcould observe a stale shadow with a different length than the underlying source, crashing inupdater[removedIndex]withArgumentOutOfRangeException.Rewritten to track
(item, dueTime)pairs and to match removals by item value viaupdater.IndexOf(item). If the shadow believes an item should expire but it's already been removed externally, the removal is silently skipped and the next source change will bring the shadow back into sync. Behavior is preserved for any pipeline that previously worked; behavior is correct for the concurrent case that previously crashed.What's NOT in this PR
ObservableCache.csorDeliveryQueue.csis touched.Synchronize(locker)callers inside listExpireAfterand cacheTransformManyare intentionally not migrated. Their gates are private and never exposed to consumers, so they have no cross-cache deadlock vector. A drop-in SDQ swap would require adding explicitlock(state)inside the per-itemOnSourceNextto preserve the gate-vs-Editinvariant; not worth the churn for zero deadlock benefit.Validation