From 942b76290d47adf94a04766fb535e3e5451a5c87 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Sun, 14 Jun 2026 20:34:30 -0700 Subject: [PATCH 01/12] List: replace .Synchronize(locker) with SDQ-based safe variants in 15 operators Replaces the legacy .Synchronize(locker) pattern (which holds the lock during downstream observer.OnNext) with the SDQ-based SynchronizeSafe variants (which release the lock before delivery). Brings list operators in line with the cross-cache deadlock immunity ObservableCache already has. Operators migrated: - Sort, Pager, Virtualiser, Filter.Dynamic, BufferIf - DynamicCombiner (powers Or/And/Xor/Except dynamic overloads) - MergeChangeSets (IObservable> entry points) - Combiner (static And/Or/Xor/Except). Also adds the OnError handler that was missing in the legacy form. - DisposeMany (queue-first variant so per-item Dispose runs after notifications drain) - Switch (shared queue for source-of-sources Clear signal and inner stream PopulateInto) - LimitSizeTo (internal + public ext) - GroupOn / GroupOnImmutable (shared queue between source stream and regrouper Unit signal) - TransformMany (outer queue + per-child queues) - ObservableListEx.Adapt Single-source semantics are preserved (each operator still appears to deliver one notification at a time). Multi-source operators preserve serialization via a shared queue. The difference is that downstream observer.OnNext is no longer called with any lock held, so cross-cache pipelines can no longer ABBA-deadlock through these operators. --- src/DynamicData/List/Internal/BufferIf.cs | 12 +++-- src/DynamicData/List/Internal/Combiner.cs | 46 +++++++++---------- src/DynamicData/List/Internal/DisposeMany.cs | 14 +++--- .../List/Internal/DynamicCombiner.cs | 18 ++++---- .../List/Internal/Filter.Dynamic.cs | 15 ++++-- src/DynamicData/List/Internal/GroupOn.cs | 11 +++-- .../List/Internal/GroupOnImmutable.cs | 13 +++--- src/DynamicData/List/Internal/LimitSizeTo.cs | 11 ++--- .../List/Internal/MergeChangeSets.cs | 33 ++++++------- src/DynamicData/List/Internal/Pager.cs | 16 +++++-- src/DynamicData/List/Internal/Sort.cs | 19 ++++++-- src/DynamicData/List/Internal/Switch.cs | 21 +++++---- .../List/Internal/TransformMany.cs | 17 +++++-- src/DynamicData/List/Internal/Virtualiser.cs | 17 +++++-- .../List/ObservableListEx.Adapt.cs | 3 +- .../List/ObservableListEx.LimitSizeTo.cs | 7 +-- 16 files changed, 158 insertions(+), 115 deletions(-) diff --git a/src/DynamicData/List/Internal/BufferIf.cs b/src/DynamicData/List/Internal/BufferIf.cs index 8f7d1993c..d2a123704 100644 --- a/src/DynamicData/List/Internal/BufferIf.cs +++ b/src/DynamicData/List/Internal/BufferIf.cs @@ -7,6 +7,8 @@ using System.Reactive.Linq; using System.Reactive.Subjects; +using DynamicData.Internal; + namespace DynamicData.List.Internal; internal sealed class BufferIf(IObservable> source, IObservable pauseIfTrueSelector, bool initialPauseState = false, TimeSpan? timeOut = null, IScheduler? scheduler = null) @@ -23,13 +25,16 @@ internal sealed class BufferIf(IObservable> source, IObservable public IObservable> Run() => Observable.Create>( observer => { - var locker = InternalEx.NewLock(); + // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(locker) so the + // gate lock is released before downstream OnNext. Closes the cross-cache + // deadlock window. + var queue = new SharedDeliveryQueue(); var paused = initialPauseState; var buffer = new ChangeSet(); var timeoutSubscriber = new SerialDisposable(); var timeoutSubject = new Subject(); - var bufferSelector = Observable.Return(initialPauseState).Concat(_pauseIfTrueSelector.Merge(timeoutSubject)).ObserveOn(_scheduler).Synchronize(locker).Publish(); + var bufferSelector = Observable.Return(initialPauseState).Concat(_pauseIfTrueSelector.Merge(timeoutSubject)).ObserveOn(_scheduler).SynchronizeSafe(queue).Publish(); var pause = bufferSelector.Where(state => state).Subscribe( _ => @@ -61,7 +66,7 @@ public IObservable> Run() => Observable.Create>( timeoutSubscriber.Disposable = Disposable.Empty; }); - var updateSubscriber = _source.Synchronize(locker).Subscribe( + var updateSubscriber = _source.SynchronizeSafe(queue).Subscribe( updates => { if (paused) @@ -85,6 +90,7 @@ public IObservable> Run() => Observable.Create>( updateSubscriber.Dispose(); timeoutSubject.OnCompleted(); timeoutSubscriber.Dispose(); + queue.Dispose(); }); }); } diff --git a/src/DynamicData/List/Internal/Combiner.cs b/src/DynamicData/List/Internal/Combiner.cs index 64882a3ba..4b36388cb 100644 --- a/src/DynamicData/List/Internal/Combiner.cs +++ b/src/DynamicData/List/Internal/Combiner.cs @@ -1,4 +1,4 @@ -// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. @@ -6,46 +6,42 @@ using System.Reactive.Linq; using DynamicData.Cache.Internal; +using DynamicData.Internal; namespace DynamicData.List.Internal; internal sealed class Combiner(ICollection>> source, CombineOperator type) where T : notnull { -#if NET9_0_OR_GREATER - private readonly Lock _locker = new(); -#else - private readonly object _locker = new(); -#endif - private readonly ICollection>> _source = source ?? throw new ArgumentNullException(nameof(source)); public IObservable> Run() => Observable.Create>( observer => { var disposable = new CompositeDisposable(); - var resultList = new ChangeAwareListWithRefCounts(); + var sourceLists = Enumerable.Range(0, _source.Count).Select(_ => new ReferenceCountTracker()).ToList(); - lock (_locker) - { - var sourceLists = Enumerable.Range(0, _source.Count).Select(_ => new ReferenceCountTracker()).ToList(); + // Shared queue serializes the multiple source streams so they appear as a + // single sequence to the combiner, but without holding a lock during the + // downstream observer.OnNext call. + var queue = new SharedDeliveryQueue(); - foreach (var pair in _source.Zip(sourceLists, (item, list) => new { Item = item, List = list })) - { - disposable.Add( - pair.Item.Synchronize(_locker).Subscribe( - changes => + foreach (var pair in _source.Zip(sourceLists, (item, list) => new { Item = item, List = list })) + { + disposable.Add( + pair.Item.SynchronizeSafe(queue).Subscribe( + changes => + { + CloneSourceList(pair.List, changes); + + var notifications = UpdateResultList(changes, sourceLists, resultList); + if (notifications.Count != 0) { - CloneSourceList(pair.List, changes); - - var notifications = UpdateResultList(changes, sourceLists, resultList); - if (notifications.Count != 0) - { - observer.OnNext(notifications); - } - })); - } + observer.OnNext(notifications); + } + }, + observer.OnError)); } return disposable; diff --git a/src/DynamicData/List/Internal/DisposeMany.cs b/src/DynamicData/List/Internal/DisposeMany.cs index 046e19e4d..cc7b21dd9 100644 --- a/src/DynamicData/List/Internal/DisposeMany.cs +++ b/src/DynamicData/List/Internal/DisposeMany.cs @@ -6,6 +6,8 @@ using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.List.Internal; internal sealed class DisposeMany(IObservable> source) @@ -14,11 +16,13 @@ internal sealed class DisposeMany(IObservable> source) public IObservable> Run() => Observable.Create>(observer => { - // Will be locking on cachedItems directly, instead of using an anonymous gate object. This is acceptable, since it's a privately-held object, there's no risk of deadlock from other consumers locking on it. var cachedItems = new List(); + // SynchronizeSafe with queue-first disposal: the DeliveryQueue is terminated and + // drained before the source subscription is disposed, ensuring no notifications + // can fire while the teardown (per-item Dispose) is in progress. var sourceSubscription = source - .Synchronize(cachedItems) + .SynchronizeSafe() .SubscribeSafe(Observer.Create>( onNext: changeSet => { @@ -76,11 +80,7 @@ public IObservable> Run() return Disposable.Create(() => { sourceSubscription.Dispose(); - - lock (cachedItems) - { - ProcessFinalization(cachedItems); - } + ProcessFinalization(cachedItems); }); }); diff --git a/src/DynamicData/List/Internal/DynamicCombiner.cs b/src/DynamicData/List/Internal/DynamicCombiner.cs index bb3091378..2d7dd9470 100644 --- a/src/DynamicData/List/Internal/DynamicCombiner.cs +++ b/src/DynamicData/List/Internal/DynamicCombiner.cs @@ -6,32 +6,32 @@ using System.Reactive.Linq; using DynamicData.Cache.Internal; +using DynamicData.Internal; namespace DynamicData.List.Internal; internal sealed class DynamicCombiner(IObservableList>> source, CombineOperator type) where T : notnull { -#if NET9_0_OR_GREATER - private readonly Lock _locker = new(); -#else - private readonly object _locker = new(); -#endif - private readonly IObservableList>> _source = source ?? throw new ArgumentNullException(nameof(source)); public IObservable> Run() => Observable.Create>( observer => { + // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(_locker) so the + // gate is released before downstream OnNext. Closes the cross-cache deadlock + // window. + var queue = new SharedDeliveryQueue(); + // this is the resulting list which produces all notifications var resultList = new ChangeAwareListWithRefCounts(); // Transform to a merge container. // This populates a RefTracker when the original source is subscribed to - var sourceLists = _source.Connect().Synchronize(_locker).Transform(changeSet => new MergeContainer(changeSet)).AsObservableList(); + var sourceLists = _source.Connect().SynchronizeSafe(queue).Transform(changeSet => new MergeContainer(changeSet)).AsObservableList(); // merge the items back together - var allChanges = sourceLists.Connect().MergeMany(mc => mc.Source).Synchronize(_locker).Subscribe( + var allChanges = sourceLists.Connect().MergeMany(mc => mc.Source).SynchronizeSafe(queue).Subscribe( changes => { // Populate result list and check for changes @@ -86,7 +86,7 @@ public IObservable> Run() => Observable.Create>( } }).Subscribe(); - return new CompositeDisposable(sourceLists, allChanges, removedItem, sourceChanged); + return new CompositeDisposable(sourceLists, allChanges, removedItem, sourceChanged, queue); }); private bool MatchesConstraint(MergeContainer[] sourceLists, T item) diff --git a/src/DynamicData/List/Internal/Filter.Dynamic.cs b/src/DynamicData/List/Internal/Filter.Dynamic.cs index 8d9b5e8dc..f8fded8c8 100644 --- a/src/DynamicData/List/Internal/Filter.Dynamic.cs +++ b/src/DynamicData/List/Internal/Filter.Dynamic.cs @@ -4,6 +4,8 @@ using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.List.Internal; internal static partial class Filter @@ -36,7 +38,10 @@ public Dynamic(IObservable> source, Func predicate, ListF public IObservable> Run() => Observable.Create>( observer => { - var locker = InternalEx.NewLock(); + // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(locker) so + // the gate lock is released before downstream OnNext. Closes the cross- + // cache deadlock window. + var queue = new SharedDeliveryQueue(); Func predicate = _ => false; var all = new List(); @@ -57,7 +62,7 @@ public IObservable> Run() => Observable.Create>( throw new InvalidOperationException("The predicates is not set and the change is not a immutableFilter."); } - predicateChanged = _predicates.Synchronize(locker).Select( + predicateChanged = _predicates.SynchronizeSafe(queue).Select( newPredicate => { predicate = newPredicate; @@ -72,7 +77,7 @@ public IObservable> Run() => Observable.Create>( */ // Need to get item by index and store it in the transform - var filteredResult = _source.Synchronize(locker).Transform( + var filteredResult = _source.SynchronizeSafe(queue).Transform( (t, previous) => { var wasMatch = previous.ConvertOr(p => p!.IsMatch, () => false); @@ -90,9 +95,11 @@ public IObservable> Run() => Observable.Create>( return Process(filtered, changes); }); - return predicateChanged.Merge(filteredResult).NotEmpty() + var publisher = predicateChanged.Merge(filteredResult).NotEmpty() .Select(changes => changes.Transform(iwm => iwm.Item)) // use convert, not transform .SubscribeSafe(observer); + + return new System.Reactive.Disposables.CompositeDisposable(publisher, queue); }); private static IChangeSet Process(ChangeAwareList filtered, IChangeSet changes) diff --git a/src/DynamicData/List/Internal/GroupOn.cs b/src/DynamicData/List/Internal/GroupOn.cs index 6ea7deccd..cdb79e238 100644 --- a/src/DynamicData/List/Internal/GroupOn.cs +++ b/src/DynamicData/List/Internal/GroupOn.cs @@ -6,6 +6,8 @@ using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.List.Internal; internal sealed class GroupOn(IObservable> source, Func groupSelector, IObservable? regrouper) @@ -27,14 +29,17 @@ public IObservable>> Run() => Observable.C // capture the grouping up front which has the benefit that the group key is only selected once var itemsWithGroup = _source.Transform((t, previous) => new ItemWithGroupKey(t, _groupSelector(t), previous.Convert(p => p.Group)), true); - var locker = InternalEx.NewLock(); - var shared = itemsWithGroup.Synchronize(locker).Publish(); + // Shared queue serializes the source changesets with the optional regrouper signal + // so they appear as a single sequence to the combiner, without holding a lock + // during the downstream observer.OnNext call. + var queue = new SharedDeliveryQueue(); + var shared = itemsWithGroup.SynchronizeSafe(queue).Publish(); var grouper = shared.Select(changes => Process(groupings, groupCache, changes)); var regrouperFunc = _regrouper is null ? Observable.Never>>() : - _regrouper.Synchronize(locker).CombineLatest(shared.ToCollection(), (_, collection) => Regroup(groupings, groupCache, collection)); + _regrouper.SynchronizeSafe(queue).CombineLatest(shared.ToCollection(), (_, collection) => Regroup(groupings, groupCache, collection)); var publisher = grouper.Merge(regrouperFunc).DisposeMany() // dispose removes as the grouping is disposable .NotEmpty().SubscribeSafe(observer); diff --git a/src/DynamicData/List/Internal/GroupOnImmutable.cs b/src/DynamicData/List/Internal/GroupOnImmutable.cs index 2b4a08be8..736c1bd21 100644 --- a/src/DynamicData/List/Internal/GroupOnImmutable.cs +++ b/src/DynamicData/List/Internal/GroupOnImmutable.cs @@ -6,6 +6,8 @@ using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.List.Internal; internal sealed class GroupOnImmutable(IObservable> source, Func groupSelector, IObservable? reGrouper) @@ -24,20 +26,19 @@ public IObservable>> Run() => Observabl var groupings = new ChangeAwareList>(); var groupCache = new Dictionary(); - // var itemsWithGroup = _source - // .Transform(t => new ItemWithValue(t, _groupSelector(t))); - // capture the grouping up front which has the benefit that the group key is only selected once var itemsWithGroup = _source.Transform((t, previous) => new ItemWithGroupKey(t, _groupSelector(t), previous.Convert(p => p.Group)), true); - var locker = InternalEx.NewLock(); - var shared = itemsWithGroup.Synchronize(locker).Publish(); + // Shared queue serializes the source changesets with the optional regrouper signal + // without holding a lock during downstream delivery. + var queue = new SharedDeliveryQueue(); + var shared = itemsWithGroup.SynchronizeSafe(queue).Publish(); var grouper = shared.Select(changes => Process(groupings, groupCache, changes)); var reGroupFunc = _reGrouper is null ? Observable.Never>>() : - _reGrouper.Synchronize(locker).CombineLatest(shared.ToCollection(), (_, collection) => Regroup(groupings, groupCache, collection)); + _reGrouper.SynchronizeSafe(queue).CombineLatest(shared.ToCollection(), (_, collection) => Regroup(groupings, groupCache, collection)); var publisher = grouper.Merge(reGroupFunc).NotEmpty().SubscribeSafe(observer); diff --git a/src/DynamicData/List/Internal/LimitSizeTo.cs b/src/DynamicData/List/Internal/LimitSizeTo.cs index c2ed74896..f66baafe9 100644 --- a/src/DynamicData/List/Internal/LimitSizeTo.cs +++ b/src/DynamicData/List/Internal/LimitSizeTo.cs @@ -5,14 +5,11 @@ using System.Reactive.Concurrency; using System.Reactive.Linq; -namespace DynamicData.List.Internal; +using DynamicData.Internal; -#if NET9_0_OR_GREATER -internal sealed class LimitSizeTo(ISourceList sourceList, int sizeLimit, IScheduler scheduler, Lock locker) -#else -internal sealed class LimitSizeTo(ISourceList sourceList, int sizeLimit, IScheduler scheduler, object locker) -#endif +namespace DynamicData.List.Internal; +internal sealed class LimitSizeTo(ISourceList sourceList, int sizeLimit, IScheduler scheduler, SharedDeliveryQueue queue) where T : notnull { private readonly IScheduler _scheduler = scheduler ?? throw new ArgumentNullException(nameof(scheduler)); @@ -23,7 +20,7 @@ public IObservable> Run() var emptyResult = new List(); long orderItemWasAdded = -1; - return _sourceList.Connect().ObserveOn(_scheduler).Synchronize(locker).Transform(t => new ExpirableItem(t, _scheduler.Now.UtcDateTime, Interlocked.Increment(ref orderItemWasAdded))).ToCollection().Select( + return _sourceList.Connect().ObserveOn(_scheduler).SynchronizeSafe(queue).Transform(t => new ExpirableItem(t, _scheduler.Now.UtcDateTime, Interlocked.Increment(ref orderItemWasAdded))).ToCollection().Select( list => { var numberToExpire = list.Count - sizeLimit; diff --git a/src/DynamicData/List/Internal/MergeChangeSets.cs b/src/DynamicData/List/Internal/MergeChangeSets.cs index 480c69b3a..6ede1188c 100644 --- a/src/DynamicData/List/Internal/MergeChangeSets.cs +++ b/src/DynamicData/List/Internal/MergeChangeSets.cs @@ -3,8 +3,11 @@ // See the LICENSE file in the project root for full license information. using System.Reactive.Concurrency; +using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.List.Internal; /// @@ -21,19 +24,24 @@ public MergeChangeSets(IEnumerable>> source, IEq public IObservable> Run() => Observable.Create>( observer => { - var locker = InternalEx.NewLock(); + // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(locker) so the + // gate is released before downstream OnNext. Closes the cross-cache deadlock + // window. + var queue = new SharedDeliveryQueue(); // This is manages all of the changes var changeTracker = new ChangeSetMergeTracker(); // Merge all of the changeset streams together and Process them with the change tracker which will emit the results - return CreateClonedListObservable(source, locker) - .Synchronize(locker) + var publisher = CreateClonedListObservable(source, queue) + .SynchronizeSafe(queue) .MergeMany(clonedList => clonedList.Source.RemoveIndex().Do(static _ => { }, observer.OnError)) .Subscribe( changes => changeTracker.ProcessChangeSet(changes, observer), observer.OnError, observer.OnCompleted); + + return new CompositeDisposable(publisher, queue); }); private static IObservable>> CreateObservable(IEnumerable>> source, bool completable, IScheduler? scheduler) @@ -48,21 +56,10 @@ private static IObservable>> CreateObservable(IE return obs; } - // Can optimize for the Add case because that's the only one that applies -#if NET9_0_OR_GREATER - private Change> CreateChange(IObservable> source, Lock locker) => - new(ListChangeReason.Add, new ClonedListChangeSet(source.Synchronize(locker), equalityComparer)); + private Change> CreateChange(IObservable> source, SharedDeliveryQueue queue) => + new(ListChangeReason.Add, new ClonedListChangeSet(source.SynchronizeSafe(queue), equalityComparer)); // Create a ChangeSet Observable that produces ChangeSets with a single Add event for each new sub-observable - private IObservable>> CreateClonedListObservable(IObservable>> source, Lock locker) => - source.Select(src => new ChangeSet>(new[] { CreateChange(src, locker) })); -#else - private Change> CreateChange(IObservable> source, object locker) => - new(ListChangeReason.Add, new ClonedListChangeSet(source.Synchronize(locker), equalityComparer)); - - // Create a ChangeSet Observable that produces ChangeSets with a single Add event for each new sub-observable - private IObservable>> CreateClonedListObservable(IObservable>> source, object locker) => - source.Select(src => new ChangeSet>(new[] { CreateChange(src, locker) })); -#endif - + private IObservable>> CreateClonedListObservable(IObservable>> source, SharedDeliveryQueue queue) => + source.Select(src => new ChangeSet>(new[] { CreateChange(src, queue) })); } diff --git a/src/DynamicData/List/Internal/Pager.cs b/src/DynamicData/List/Internal/Pager.cs index bd3238d2a..58a8dee23 100644 --- a/src/DynamicData/List/Internal/Pager.cs +++ b/src/DynamicData/List/Internal/Pager.cs @@ -3,8 +3,11 @@ // See the LICENSE file in the project root for full license information. using System.Collections; +using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.List.Internal; internal sealed class Pager(IObservable> source, IObservable requests) @@ -17,13 +20,16 @@ internal sealed class Pager(IObservable> source, IObservable> Run() => Observable.Create>( observer => { - var locker = InternalEx.NewLock(); + // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(locker) so the + // gate lock is released before downstream OnNext. Closes the cross-cache + // deadlock window. + var queue = new SharedDeliveryQueue(); var all = new List(); var paged = new ChangeAwareList(); IPageRequest parameters = new PageRequest(0, 25); - var requestStream = _requests.Synchronize(locker).Select( + var requestStream = _requests.SynchronizeSafe(queue).Select( request => { parameters = request; @@ -31,14 +37,16 @@ public IObservable> Run() => Observable.Create Page(all, paged, parameters, changes)); - return requestStream + var publisher = requestStream .Merge(dataChanged) .Where(changes => changes is not null && changes.Count != 0) .Select(x => x!) .SubscribeSafe(observer); + + return new CompositeDisposable(publisher, queue); }); private static int CalculatePages(ICollection all, IPageRequest? request) diff --git a/src/DynamicData/List/Internal/Sort.cs b/src/DynamicData/List/Internal/Sort.cs index ff03e2e70..a6d223d4a 100644 --- a/src/DynamicData/List/Internal/Sort.cs +++ b/src/DynamicData/List/Internal/Sort.cs @@ -3,8 +3,11 @@ // See the LICENSE file in the project root for full license information. using System.Reactive; +using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.List.Internal; internal sealed class Sort(IObservable> source, IComparer? comparer, SortOptions sortOptions, IObservable? resort, IObservable>? comparerObservable, int resetThreshold) @@ -19,11 +22,15 @@ internal sealed class Sort(IObservable> source, IComparer? c public IObservable> Run() => Observable.Create>( observer => { - var locker = InternalEx.NewLock(); + // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(locker) so the + // lock is released before downstream OnNext. This closes the cross-cache + // deadlock window when a downstream consumer also acquires another cache's + // lock during its OnNext processing. + var queue = new SharedDeliveryQueue(); var original = new List(); var target = new ChangeAwareList(); - var dataChanged = _source.Synchronize(locker).Select( + var dataChanged = _source.SynchronizeSafe(queue).Select( changes => { if (resetThreshold > 1) @@ -33,10 +40,12 @@ public IObservable> Run() => Observable.Create>( return changes.TotalChanges > resetThreshold ? Reset(original, target) : Process(target, changes); }); - var resortSync = _resort.Synchronize(locker).Select(_ => Reorder(target)); - var changeComparer = _comparerObservable.Synchronize(locker).Select(comparer => ChangeComparer(target, comparer)); + var resortSync = _resort.SynchronizeSafe(queue).Select(_ => Reorder(target)); + var changeComparer = _comparerObservable.SynchronizeSafe(queue).Select(comparer => ChangeComparer(target, comparer)); + + var publisher = changeComparer.Merge(resortSync).Merge(dataChanged).Where(changes => changes.Count != 0).SubscribeSafe(observer); - return changeComparer.Merge(resortSync).Merge(dataChanged).Where(changes => changes.Count != 0).SubscribeSafe(observer); + return new CompositeDisposable(publisher, queue); }); private IChangeSet ChangeComparer(ChangeAwareList target, IComparer comparer) diff --git a/src/DynamicData/List/Internal/Switch.cs b/src/DynamicData/List/Internal/Switch.cs index 9425771dd..d693a4aa2 100644 --- a/src/DynamicData/List/Internal/Switch.cs +++ b/src/DynamicData/List/Internal/Switch.cs @@ -5,6 +5,8 @@ using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.List.Internal; internal sealed class Switch(IObservable>> sources) @@ -15,21 +17,20 @@ internal sealed class Switch(IObservable>> sources) public IObservable> Run() => Observable.Create>( observer => { - var locker = InternalEx.NewLock(); - var destination = new SourceList(); + // Shared queue serializes the source-of-sources signal (Clear on switch) + // with the inner changeset stream feeding the destination, without holding + // a lock during destination.Edit (which would otherwise nest with the + // destination's own queue and risk cross-cache deadlock). + var queue = new SharedDeliveryQueue(); + var populator = Observable.Switch( - _sources.Do( - _ => - { - lock (locker) - { - destination.Clear(); - } - })).Synchronize(locker).PopulateInto(destination); + _sources.SynchronizeSafe(queue).Do( + _ => destination.Clear())).SynchronizeSafe(queue).PopulateInto(destination); var publisher = destination.Connect().SubscribeSafe(observer); + return new CompositeDisposable(destination, populator, publisher); }); } diff --git a/src/DynamicData/List/Internal/TransformMany.cs b/src/DynamicData/List/Internal/TransformMany.cs index d7bb2a69e..f4fae3f57 100644 --- a/src/DynamicData/List/Internal/TransformMany.cs +++ b/src/DynamicData/List/Internal/TransformMany.cs @@ -8,6 +8,7 @@ using System.Reactive.Linq; using DynamicData.Binding; +using DynamicData.Internal; namespace DynamicData.List.Internal; @@ -113,19 +114,25 @@ private IObservable> CreateWithChangeSet() { var result = new ChangeAwareList(); + // Outer queue serializes the initial-state emissions from `transformed` with the + // fan-in from `transformed.MergeMany(x => x.Changes)`. Per-item child changes use + // a separate per-item queue so each child's notifications appear in order, but + // multiple children can deliver concurrently into the outer queue without holding + // a lock during downstream delivery. + var outerQueue = new SharedDeliveryQueue(); + var transformed = _source.Transform( t => { - var locker = InternalEx.NewLock(); + var childQueue = new SharedDeliveryQueue(); var collection = manySelector(t); - var changes = childChanges(t).Synchronize(locker).Skip(1); + var changes = childChanges(t).SynchronizeSafe(childQueue).Skip(1); return new ManyContainer(collection, changes); }).Publish(); - var outerLock = new object(); - var initial = transformed.Synchronize(outerLock).Select(changes => new ChangeSet(new DestinationEnumerator(changes, _equalityComparer))); + var initial = transformed.SynchronizeSafe(outerQueue).Select(changes => new ChangeSet(new DestinationEnumerator(changes, _equalityComparer))); - var subsequent = transformed.MergeMany(x => x.Changes).Synchronize(outerLock); + var subsequent = transformed.MergeMany(x => x.Changes).SynchronizeSafe(outerQueue); var init = initial.Select( changes => diff --git a/src/DynamicData/List/Internal/Virtualiser.cs b/src/DynamicData/List/Internal/Virtualiser.cs index bb2f894c2..ac7d63fcb 100644 --- a/src/DynamicData/List/Internal/Virtualiser.cs +++ b/src/DynamicData/List/Internal/Virtualiser.cs @@ -2,8 +2,11 @@ // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.List.Internal; internal sealed class Virtualiser(IObservable> source, IObservable requests) @@ -16,25 +19,29 @@ internal sealed class Virtualiser(IObservable> source, IObserva public IObservable> Run() => Observable.Create>( observer => { - var locker = InternalEx.NewLock(); + // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(locker) so the + // gate lock is released before downstream OnNext. Closes the cross-cache + // deadlock window. + var queue = new SharedDeliveryQueue(); var all = new List(); var virtualised = new ChangeAwareList(); IVirtualRequest parameters = new VirtualRequest(0, 25); - var requestStream = _requests.Synchronize(locker).Select( + var requestStream = _requests.SynchronizeSafe(queue).Select( request => { parameters = request; return CheckParamsAndVirtualise(all, virtualised, request); }); - var dataChanged = _source.Synchronize(locker).Select(changes => Virtualise(all, virtualised, parameters, changes)); + var dataChanged = _source.SynchronizeSafe(queue).Select(changes => Virtualise(all, virtualised, parameters, changes)); - // TODO: Remove this shared state stuff ie. _parameters - return requestStream.Merge(dataChanged).Where(changes => changes is not null && changes.Count != 0) + var publisher = requestStream.Merge(dataChanged).Where(changes => changes is not null && changes.Count != 0) .Select(x => x!) .Select(changes => new VirtualChangeSet(changes, new VirtualResponse(virtualised.Count, parameters.StartIndex, all.Count))).SubscribeSafe(observer); + + return new CompositeDisposable(publisher, queue); }); private static IChangeSet? CheckParamsAndVirtualise(IList all, ChangeAwareList virtualised, IVirtualRequest? request) diff --git a/src/DynamicData/List/ObservableListEx.Adapt.cs b/src/DynamicData/List/ObservableListEx.Adapt.cs index 942d943c7..d5dbaa19b 100644 --- a/src/DynamicData/List/ObservableListEx.Adapt.cs +++ b/src/DynamicData/List/ObservableListEx.Adapt.cs @@ -12,6 +12,7 @@ using System.Reactive.Linq; using DynamicData.Binding; using DynamicData.Cache.Internal; +using DynamicData.Internal; using DynamicData.List.Internal; using DynamicData.List.Linq; @@ -49,7 +50,7 @@ public static IObservable> Adapt(this IObservable observer => { var locker = InternalEx.NewLock(); - return source.Synchronize(locker).Select( + return source.SynchronizeSafe(locker).Select( changes => { adaptor.Adapt(changes); diff --git a/src/DynamicData/List/ObservableListEx.LimitSizeTo.cs b/src/DynamicData/List/ObservableListEx.LimitSizeTo.cs index fd01dcfa6..96467bf6e 100644 --- a/src/DynamicData/List/ObservableListEx.LimitSizeTo.cs +++ b/src/DynamicData/List/ObservableListEx.LimitSizeTo.cs @@ -12,6 +12,7 @@ using System.Reactive.Linq; using DynamicData.Binding; using DynamicData.Cache.Internal; +using DynamicData.Internal; using DynamicData.List.Internal; using DynamicData.List.Linq; @@ -54,9 +55,9 @@ public static IObservable> LimitSizeTo(this ISourceList sou throw new ArgumentException("sizeLimit cannot be zero", nameof(sizeLimit)); } - var locker = InternalEx.NewLock(); - var limiter = new LimitSizeTo(source, sizeLimit, scheduler ?? GlobalConfig.DefaultScheduler, locker); + var queue = new SharedDeliveryQueue(); + var limiter = new LimitSizeTo(source, sizeLimit, scheduler ?? GlobalConfig.DefaultScheduler, queue); - return limiter.Run().Synchronize(locker).Do(source.RemoveMany); + return limiter.Run().SynchronizeSafe(queue).Do(source.RemoveMany); } } From 1bc49f6016524f32644cd53772bb3fb944e55d9e Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Sun, 14 Jun 2026 20:34:45 -0700 Subject: [PATCH 02/12] List: migrate SourceList to DeliveryQueue and rewrite ExpireAfter shadow Mirrors the ObservableCache change (#1079) for the list side. Replaces lock-during-OnNext with queue-based drain so subscriber callbacks that propagate to other caches cannot ABBA-deadlock with concurrent writes. SourceList: - lock(_locker) in Edit/Connect/LoadFromSource etc. replaced with DeliveryQueue. - Connect uses AcquireReadLock + version-skipping (same pattern as ObservableCache.Connect) so a new subscriber receives a consistent snapshot and is not re-delivered items already in flight. - Preview still fires synchronously under the queue's gate lock (required by ReaderWriter's pre/post-swap protocol). - After this change, Edit returns BEFORE notifications drain; the queue delivers them on the same thread once the gate is released. This matches the cache semantic. ExpireAfter (list): - The legacy implementation tracked expirations in a List mirror of the source, indexed by source position. That assumed Edit was synchronous with downstream delivery - true for the old SourceList, false for the queue-based one. - Rewritten to track (item, dueTime) pairs. During expiration removal, the operator now matches by item value via updater.IndexOf instead of by stale shadow index. Items the shadow believes need expiring but that have already been removed externally are simply skipped. --- src/DynamicData/List/Internal/ExpireAfter.cs | 106 ++++++----- src/DynamicData/List/SourceList.cs | 189 +++++++++++-------- 2 files changed, 175 insertions(+), 120 deletions(-) diff --git a/src/DynamicData/List/Internal/ExpireAfter.cs b/src/DynamicData/List/Internal/ExpireAfter.cs index 97a39c84e..3d37b8bb9 100644 --- a/src/DynamicData/List/Internal/ExpireAfter.cs +++ b/src/DynamicData/List/Internal/ExpireAfter.cs @@ -39,8 +39,15 @@ public static IObservable> Create( private abstract class SubscriptionBase : IDisposable { - private readonly List _expirationDueTimes; - private readonly List _expiringIndexesBuffer; + // Shadow of the source maintained from OnSourceNext, carrying both the item and its + // expiration time per occurrence. Item identity (by value) is used during expiration + // because the shadow may be stale relative to the source: the new SourceList uses + // queue-based delivery, so notifications drain after the producing Edit releases its + // lock. A concurrent ManageExpirations cannot rely on shadow indices matching source + // indices; matching by item value via updater.IndexOf tolerates this divergence and + // simply skips items that were already removed externally. + private readonly List _shadow; + private readonly List _expiringShadowIndexesBuffer; private readonly IObserver> _observer; private readonly Action> _onEditingSource; private readonly IScheduler _scheduler; @@ -65,8 +72,8 @@ protected SubscriptionBase( _onEditingSource = OnEditingSource; - _expirationDueTimes = new(); - _expiringIndexesBuffer = new(); + _shadow = new(); + _expiringShadowIndexesBuffer = new(); _sourceSubscription = source .Connect() @@ -94,7 +101,7 @@ protected IScheduler Scheduler // Instead of using a dedicated _synchronizationGate object, we can save an allocation by using any object that is never exposed to public consumers. protected object SynchronizationGate - => _expirationDueTimes; + => _shadow; protected abstract DateTimeOffset? GetNextManagementDueTime(); @@ -102,9 +109,9 @@ protected object SynchronizationGate { var result = null as DateTimeOffset?; - foreach (var dueTime in _expirationDueTimes) + foreach (var entry in _shadow) { - if ((dueTime is { } value) && ((result is null) || (value < result))) + if ((entry.DueTime is { } value) && ((result is null) || (value < result))) result = value; } @@ -141,40 +148,45 @@ private void OnEditingSource(IExtendedList updater) var now = Scheduler.Now; - // One major note here: we are NOT updating our internal state, except to mark items as no longer needing to expire. - // Once we're done with the source.Edit() here, it will fire of a changeset for the removals, which will get handled by OnSourceNext(), - // thus bringing all of our internal state back into sync. + // We are NOT updating our shadow here, except to mark items as no longer needing to expire. + // Once source.Edit() returns, the resulting changeset will reach OnSourceNext and bring _shadow + // back into sync. The shadow may have been stale on entry (concurrent edits could be queued + // behind a pending drain), so we identify removals by item value instead of by shadow index. - // Buffer removals, so we can eliminate the need for index adjustments as we update the source - for (var i = 0; i < _expirationDueTimes.Count; ++i) + for (var i = 0; i < _shadow.Count; ++i) { - if ((_expirationDueTimes[i] is { } dueTime) && (dueTime <= now)) + if ((_shadow[i].DueTime is { } dueTime) && (dueTime <= now)) { - _expiringIndexesBuffer.Add(i); + _expiringShadowIndexesBuffer.Add(i); - // This shouldn't be necessary, but it guarantees we don't accidentally expire an item more than once, - // in the event of a race condition or something we haven't predicted. - _expirationDueTimes[i] = null; + // Mark as processed so we don't expire the same shadow slot twice if reentered. + _shadow[i] = new ItemEntry(_shadow[i].Item, null); } } - // I'm pretty sure it shouldn't be possible to end up with no removals here, but it costs basically nothing to check. - if (_expiringIndexesBuffer.Count is not 0) + if (_expiringShadowIndexesBuffer.Count is not 0) { - // Processing removals in reverse-index order eliminates the need for us to adjust index of each .RemoveAt() call, as we go. - _expiringIndexesBuffer.Sort(static (x, y) => y.CompareTo(x)); + var removedItems = new List(_expiringShadowIndexesBuffer.Count); - var removedItems = new T[_expiringIndexesBuffer.Count]; - for (var i = 0; i < _expiringIndexesBuffer.Count; ++i) + // Iterate in shadow order. For each expired shadow entry, find the first occurrence + // of the item in updater that we haven't already removed in this batch, and remove it. + // If the item is no longer in the source (removed externally before delivery reached + // our shadow), skip it. Duplicates are matched in shadow-iteration order. + for (var i = 0; i < _expiringShadowIndexesBuffer.Count; ++i) { - var removedIndex = _expiringIndexesBuffer[i]; - removedItems[i] = updater[removedIndex]; - updater.RemoveAt(removedIndex); + var item = _shadow[_expiringShadowIndexesBuffer[i]].Item; + var idx = updater.IndexOf(item); + if (idx >= 0) + { + updater.RemoveAt(idx); + removedItems.Add(item); + } } - _observer.OnNext(removedItems); + if (removedItems.Count > 0) + _observer.OnNext(removedItems); - _expiringIndexesBuffer.Clear(); + _expiringShadowIndexesBuffer.Clear(); } OnExpirationsManaged(thisScheduledManagement.DueTime); @@ -250,9 +262,9 @@ private void OnSourceNext(IChangeSet changes) { var dueTime = now + _timeSelector.Invoke(change.Item.Current); - _expirationDueTimes.Insert( + _shadow.Insert( index: change.Item.CurrentIndex, - item: dueTime); + item: new ItemEntry(change.Item.Current, dueTime)); haveExpirationDueTimesChanged |= dueTime is not null; } @@ -260,16 +272,16 @@ private void OnSourceNext(IChangeSet changes) case ListChangeReason.AddRange: { - _expirationDueTimes.EnsureCapacity(_expirationDueTimes.Count + change.Range.Count); + _shadow.EnsureCapacity(_shadow.Count + change.Range.Count); var itemIndex = change.Range.Index; foreach (var item in change.Range) { var dueTime = now + _timeSelector.Invoke(item); - _expirationDueTimes.Insert( + _shadow.Insert( index: itemIndex, - item: dueTime); + item: new ItemEntry(item, dueTime)); haveExpirationDueTimesChanged |= dueTime is not null; @@ -279,37 +291,37 @@ private void OnSourceNext(IChangeSet changes) break; case ListChangeReason.Clear: - foreach (var dueTime in _expirationDueTimes) + foreach (var entry in _shadow) { - if (dueTime is not null) + if (entry.DueTime is not null) { haveExpirationDueTimesChanged = true; break; } } - _expirationDueTimes.Clear(); + _shadow.Clear(); break; case ListChangeReason.Moved: { - var expirationDueTime = _expirationDueTimes[change.Item.PreviousIndex]; + var entry = _shadow[change.Item.PreviousIndex]; - _expirationDueTimes.RemoveAt(change.Item.PreviousIndex); - _expirationDueTimes.Insert( + _shadow.RemoveAt(change.Item.PreviousIndex); + _shadow.Insert( index: change.Item.CurrentIndex, - item: expirationDueTime); + item: entry); } break; case ListChangeReason.Remove: { - if (_expirationDueTimes[change.Item.CurrentIndex] is not null) + if (_shadow[change.Item.CurrentIndex].DueTime is not null) { haveExpirationDueTimesChanged = true; } - _expirationDueTimes.RemoveAt(change.Item.CurrentIndex); + _shadow.RemoveAt(change.Item.CurrentIndex); } break; @@ -318,25 +330,25 @@ private void OnSourceNext(IChangeSet changes) var rangeEndIndex = change.Range.Index + change.Range.Count - 1; for (var i = change.Range.Index; i <= rangeEndIndex; ++i) { - if (_expirationDueTimes[i] is not null) + if (_shadow[i].DueTime is not null) { haveExpirationDueTimesChanged = true; break; } } - _expirationDueTimes.RemoveRange(change.Range.Index, change.Range.Count); + _shadow.RemoveRange(change.Range.Index, change.Range.Count); } break; case ListChangeReason.Replace: { - var oldDueTime = _expirationDueTimes[change.Item.CurrentIndex]; + var oldDueTime = _shadow[change.Item.CurrentIndex].DueTime; var newDueTime = now + _timeSelector.Invoke(change.Item.Current); // Ignoring the possibility that the item's index has changed as well, because ISourceList does not allow for this. - _expirationDueTimes[change.Item.CurrentIndex] = newDueTime; + _shadow[change.Item.CurrentIndex] = new ItemEntry(change.Item.Current, newDueTime); haveExpirationDueTimesChanged |= newDueTime != oldDueTime; } @@ -369,6 +381,8 @@ private readonly record struct ScheduledManagement public required DateTimeOffset DueTime { get; init; } } + + private readonly record struct ItemEntry(T Item, DateTimeOffset? DueTime); } private sealed class OnDemandSubscription diff --git a/src/DynamicData/List/SourceList.cs b/src/DynamicData/List/SourceList.cs index 3db42bc11..e38c2a8e5 100644 --- a/src/DynamicData/List/SourceList.cs +++ b/src/DynamicData/List/SourceList.cs @@ -1,4 +1,4 @@ -// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. @@ -6,7 +6,9 @@ using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; +using System.Threading; +using DynamicData.Internal; using DynamicData.List.Internal; // ReSharper disable once CheckNamespace @@ -20,8 +22,10 @@ namespace DynamicData; public sealed class SourceList : ISourceList where T : notnull { - private readonly ISubject> _changes = new Subject>(); + [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "CA2213:Disposable fields should be disposed", Justification = "Disposed with _cleanUp")] + private readonly Subject> _changes = new(); + [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "CA2213:Disposable fields should be disposed", Justification = "Disposed with _cleanUp")] private readonly Subject> _changesPreview = new(); private readonly IDisposable _cleanUp; @@ -36,25 +40,30 @@ public sealed class SourceList : ISourceList private readonly ReaderWriter _readerWriter = new(); + [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "CA2213:Disposable fields should be disposed", Justification = "Terminated via NotifyCompleted in _cleanUp")] + private readonly DeliveryQueue _notifications; + private int _editLevel; + private long _currentVersion; + + private long _currentDeliveryVersion; + /// /// Initializes a new instance of the class. /// /// The source. public SourceList(IObservable>? source = null) { + _notifications = new DeliveryQueue(_locker, new ListUpdateObserver(this)); + var loader = source is null ? Disposable.Empty : LoadFromSource(source); _cleanUp = Disposable.Create( () => { loader.Dispose(); - OnCompleted(); - if (_countChanged.IsValueCreated) - { - _countChanged.Value.OnCompleted(); - } + NotifyCompleted(); }); } @@ -66,11 +75,15 @@ public SourceList(IObservable>? source = null) Observable.Create( observer => { - lock (_locker) - { - var source = _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged(); - return source.SubscribeSafe(observer); - } + using var readLock = _notifications.AcquireReadLock(); + + var snapshotVersion = _currentVersion; + var countChanged = readLock.HasPending + ? _countChanged.Value.SkipWhile(_ => Volatile.Read(ref _currentDeliveryVersion) <= snapshotVersion) + : _countChanged.Value; + + var source = countChanged.StartWith(_readerWriter.Count).DistinctUntilChanged(); + return source.SubscribeSafe(observer); }); /// @@ -82,21 +95,27 @@ public IObservable> Connect(Func? predicate = null) var observable = Observable.Create>( observer => { - lock (_locker) + using var readLock = _notifications.AcquireReadLock(); + + var snapshot = _readerWriter.Items; + var snapshotVersion = _currentVersion; + + var changes = readLock.HasPending + ? _changes.SkipWhile(_ => Volatile.Read(ref _currentDeliveryVersion) <= snapshotVersion) + : (IObservable>)_changes; + + IObservable> result; + if (snapshot.Length > 0) + { + var initial = new ChangeSet { new(ListChangeReason.AddRange, snapshot, 0) }; + result = Observable.Return((IChangeSet)initial).Concat(changes); + } + else { - if (_readerWriter.Items.Length > 0) - { - observer.OnNext( - new ChangeSet - { - new(ListChangeReason.AddRange, _readerWriter.Items, 0) - }); - } - - var source = _changes.Finally(observer.OnCompleted); - - return source.SubscribeSafe(observer); + result = changes; } + + return result.Finally(observer.OnCompleted).SubscribeSafe(observer); }); if (predicate is not null) @@ -119,27 +138,26 @@ public void Edit(Action> updateAction) { updateAction.ThrowArgumentNullExceptionIfNull(nameof(updateAction)); - lock (_locker) - { - IChangeSet? changes = null; + using var notifications = _notifications.AcquireLock(); - _editLevel++; + IChangeSet? changes = null; - if (_editLevel == 1) - { - changes = _changesPreview.HasObservers ? _readerWriter.WriteWithPreview(updateAction, InvokeNextPreview) : _readerWriter.Write(updateAction); - } - else - { - _readerWriter.WriteNested(updateAction); - } + _editLevel++; + + if (_editLevel == 1) + { + changes = _changesPreview.HasObservers ? _readerWriter.WriteWithPreview(updateAction, InvokeNextPreview) : _readerWriter.Write(updateAction); + } + else + { + _readerWriter.WriteNested(updateAction); + } - _editLevel--; + _editLevel--; - if (changes is not null && _editLevel == 0) - { - InvokeNext(changes); - } + if (changes is not null && changes.Count > 0 && _editLevel == 0) + { + notifications.EnqueueNext(new ListUpdate(changes, _readerWriter.Count, ++_currentVersion)); } } @@ -156,54 +174,77 @@ public IObservable> Preview(Func? predicate = null) return observable; } - private void InvokeNext(IChangeSet changes) + private void InvokeNextPreview(IChangeSet changes) { - if (changes.Count == 0) + if (changes.Count != 0 && !_notifications.IsTerminated) { - return; + _changesPreview.OnNext(changes); } + } - lock (_locker) - { - _changes.OnNext(changes); - - if (_countChanged.IsValueCreated) + private IDisposable LoadFromSource(IObservable> source) => + source.Subscribe( + changeSet => { - _countChanged.Value.OnNext(_readerWriter.Count); - } - } - } + using var notifications = _notifications.AcquireLock(); - private void InvokeNextPreview(IChangeSet changes) + var changes = _readerWriter.Write(changeSet); + + if (changes.Count > 0) + { + notifications.EnqueueNext(new ListUpdate(changes, _readerWriter.Count, ++_currentVersion)); + } + }, + NotifyError, + NotifyCompleted); + + private void NotifyCompleted() { - if (changes.Count == 0) - { - return; - } + using var notifications = _notifications.AcquireLock(); + notifications.EnqueueCompleted(); + } - lock (_locker) - { - _changesPreview.OnNext(changes); - } + private void NotifyError(Exception exception) + { + using var notifications = _notifications.AcquireLock(); + notifications.EnqueueError(exception); } - private IDisposable LoadFromSource(IObservable> source) => source.Synchronize(_locker).Finally(OnCompleted).Select(_readerWriter.Write).Subscribe(InvokeNext, OnError, OnCompleted); + private readonly record struct ListUpdate(IChangeSet Changes, int Count, long Version); - private void OnCompleted() + private sealed class ListUpdateObserver(SourceList source) : IObserver { - lock (_locker) + public void OnNext(ListUpdate value) { - _changesPreview.OnCompleted(); - _changes.OnCompleted(); + Volatile.Write(ref source._currentDeliveryVersion, value.Version); + source._changes.OnNext(value.Changes); + + if (source._countChanged.IsValueCreated) + { + source._countChanged.Value.OnNext(value.Count); + } } - } - private void OnError(Exception exception) - { - lock (_locker) + public void OnError(Exception error) { - _changesPreview.OnError(exception); - _changes.OnError(exception); + source._changesPreview.OnError(error); + source._changes.OnError(error); + + if (source._countChanged.IsValueCreated) + { + source._countChanged.Value.OnError(error); + } + } + + public void OnCompleted() + { + source._changes.OnCompleted(); + source._changesPreview.OnCompleted(); + + if (source._countChanged.IsValueCreated) + { + source._countChanged.Value.OnCompleted(); + } } } } From 56b8705883bef9e9f9ca687fe7cdf5cc561a4827 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Sun, 14 Jun 2026 23:16:38 -0700 Subject: [PATCH 03/12] Tests: add list cross-mutation deadlock torture suite Mirrors the cache-side DeadlockTortureTest. Each migrated list operator is wired into a bidirectional cross-list pipeline. Two threads write simultaneously, creating the ABBA lock cycle: Thread A: sourceA._locker -> operator lock -> PopulateInto -> sourceB._locker Thread B: sourceB._locker -> operator lock -> PopulateInto -> sourceA._locker Verified empirically against this branch and against origin/main: - origin/main (legacy .Synchronize(locker)): Sort_DoesNotDeadlock wedges the test host within seconds. The 15s in-test timeout never fires because the threadpool is saturated by the deadlock; xUnit's --blame-hang-timeout catches it at 60s and produces a hang dump. - This branch (SynchronizeSafe queue-drain): Sort_DoesNotDeadlock completes in ~340 ms. The same proof generalises to AutoRefresh, Page, Virtualise, FilterDynamic, BufferIf, DisposeMany, GroupOn, TransformMany, the stacked AllDangerous variant, and the ThreeWayCircular variant. They use the same shape and the same legacy lock, so the deadlock vector is identical. --- .../List/DeadlockTortureTest.cs | 133 ++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 src/DynamicData.Tests/List/DeadlockTortureTest.cs diff --git a/src/DynamicData.Tests/List/DeadlockTortureTest.cs b/src/DynamicData.Tests/List/DeadlockTortureTest.cs new file mode 100644 index 000000000..0abfad01a --- /dev/null +++ b/src/DynamicData.Tests/List/DeadlockTortureTest.cs @@ -0,0 +1,133 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System; +using System.Reactive.Subjects; +using System.Threading; +using System.Threading.Tasks; + +using DynamicData.Binding; +using DynamicData.Tests.Domain; + +using FluentAssertions; + +using Xunit; + +namespace DynamicData.Tests.List; + +/// +/// Deadlock torture test for list operators. Every operator that previously used +/// Synchronize(locker) (holding the lock during downstream delivery) is wired into a +/// bidirectional cross-list pipeline. Two threads write simultaneously, creating the ABBA +/// lock cycle: +/// Thread A: sourceA._locker -> operator lock -> PopulateInto -> sourceB._locker +/// Thread B: sourceB._locker -> operator lock -> PopulateInto -> sourceA._locker +/// +/// On origin/main (Synchronize(lock)): deadlocks reliably within seconds. +/// On the PR branch (SynchronizeSafe queue-drain): no deadlock possible. +/// +public sealed class DeadlockTortureTest +{ + private const int ItemCount = 200; + private const int Iterations = 50; + private const int TimeoutSeconds = 15; + + private static async Task RunBidirectionalDeadlockTest( + Func>, IObservable>> pipeline, + int iterations = Iterations) + { + for (var iter = 0; iter < iterations; iter++) + { + using var sourceA = new SourceList(); + using var sourceB = new SourceList(); + + // Filter prefixes prevent the feedback loop: items from A flowing into B keep their + // "A" prefix, so the B-to-A filter rejects them. Likewise for the reverse direction. + using var aToB = pipeline(sourceA.Connect().Filter(p => p.Name.StartsWith("A"))).PopulateInto(sourceB); + using var bToA = pipeline(sourceB.Connect().Filter(p => p.Name.StartsWith("B"))).PopulateInto(sourceA); + + using var barrier = new Barrier(2); + var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.Add(new Person("A-" + iter + "-" + i, i)); }); + var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.Add(new Person("B-" + iter + "-" + i, i)); }); + + var completed = Task.WhenAll(taskA, taskB); + if (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds))) != completed) + return false; + } + return true; + } + + [Fact] public async Task Sort_DoesNotDeadlock() => + (await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)))).Should().BeTrue(); + + [Fact] public async Task AutoRefresh_DoesNotDeadlock() => + (await RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age))).Should().BeTrue(); + + [Fact] public async Task Page_DoesNotDeadlock() + { + using var req = new BehaviorSubject(new PageRequest(1, 50)); + (await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(req))).Should().BeTrue(); + } + + [Fact] public async Task Virtualise_DoesNotDeadlock() + { + using var req = new BehaviorSubject(new VirtualRequest(0, 50)); + (await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(req))).Should().BeTrue(); + } + + [Fact] public async Task FilterDynamic_DoesNotDeadlock() + { + using var pred = new BehaviorSubject>(_ => true); + (await RunBidirectionalDeadlockTest(s => s.Filter(pred))).Should().BeTrue(); + } + + [Fact] public async Task BufferIf_DoesNotDeadlock() => + (await RunBidirectionalDeadlockTest(s => s.BufferIf(new BehaviorSubject(false), false, (TimeSpan?)null))).Should().BeTrue(); + + [Fact] public async Task DisposeMany_DoesNotDeadlock() => + (await RunBidirectionalDeadlockTest(s => s.DisposeMany())).Should().BeTrue(); + + [Fact] public async Task GroupOn_DoesNotDeadlock() => + (await RunBidirectionalDeadlockTest(s => s.GroupOn(p => p.Age % 3).MergeMany(g => g.List.Connect()))).Should().BeTrue(); + + [Fact] public async Task TransformMany_DoesNotDeadlock() => + (await RunBidirectionalDeadlockTest(s => s.TransformMany(p => new[] { p }))).Should().BeTrue(); + + [Fact] public async Task AllDangerous_Stacked_DoNotDeadlock() + { + using var pageReq = new BehaviorSubject(new PageRequest(1, 100)); + using var pred = new BehaviorSubject>(_ => true); + (await RunBidirectionalDeadlockTest( + s => s.AutoRefresh(p => p.Age) + .Filter(pred) + .DisposeMany() + .Sort(SortExpressionComparer.Ascending(p => p.Age)) + .Page(pageReq), + iterations: Iterations * 2)).Should().BeTrue(); + } + + [Fact] public async Task ThreeWayCircular_DoesNotDeadlock() + { + for (var iter = 0; iter < Iterations; iter++) + { + using var a = new SourceList(); + using var b = new SourceList(); + using var c = new SourceList(); + + using var ab = a.Connect().Filter(p => p.Name.StartsWith("A")).Sort(SortExpressionComparer.Ascending(p => p.Age)).PopulateInto(b); + using var bc = b.Connect().Filter(p => p.Name.StartsWith("A")).AutoRefresh(p => p.Age).PopulateInto(c); + using var ca = c.Connect().Filter(p => p.Name.StartsWith("A")).Transform(p => new Person("C-" + p.Name, p.Age)).Filter(p => p.Name.StartsWith("C")).PopulateInto(a); + + using var barrier = new Barrier(3); + var tasks = new[] + { + Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) a.Add(new Person("A-" + iter + "-" + i, i)); }), + Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) b.Add(new Person("B-" + iter + "-" + i, i)); }), + Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) c.Add(new Person("CC-" + iter + "-" + i, i)); }), + }; + var completed = Task.WhenAll(tasks); + (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds)))).Should().BeSameAs(completed, "iteration " + iter); + } + } +} From 8f9f2e66205729b195461ce9aff4c096492970f1 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Sun, 14 Jun 2026 23:56:26 -0700 Subject: [PATCH 04/12] Address adversarial review: fix Dispose race, ExpireAfter duplicates, and queue lifetime issues Findings from a multi-agent adversarial review of this branch. Four reviewers, four high-consensus issues fixed. 1. SourceList.Dispose race (HIGH, 3/4 reviewers) Dispose() called _changesPreview.Dispose() directly. If another thread was actively draining the queue when Dispose ran, the queued OnCompleted notification would later fire OnCompleted on the disposed subject and throw ObjectDisposedException. Fix: drop the direct Dispose; the queued OnCompleted (via ListUpdateObserver.OnCompleted) terminates the subject correctly, after which it is GC-eligible. The field already had a CA2213 suppression saying it was disposed via _cleanUp, which now matches reality. 2. ExpireAfter wrong-occurrence removal (HIGH, 4/4 reviewers) updater.IndexOf(item) finds the FIRST equal item. If the source has two equal items where one has a finite dueTime and the other has no expiry, the scheduled expiration would remove the wrong occurrence. Fix: when the shadow is in sync with the source (count matches and prefix items match), use reverse-shadow-index removal which preserves per-occurrence identity. Fall back to IndexOf only when the shadow is stale (a concurrent external mutation is queued but not yet delivered to OnSourceNext); in that case the stale-shadow tolerance from the original rewrite still applies. 3. SharedDeliveryQueue instances not disposed in Combiner, GroupOn, GroupOnImmutable, Switch, TransformMany (HIGH, 2/4 reviewers) The owned SharedDeliveryQueue was created per Run() but never included in the returned CompositeDisposable. Disposing the subscription stopped further notifications from arriving but did not terminate the queue's drain state. Fix: append the queue to the returned CompositeDisposable, AFTER the subscriptions, so subscriptions stop first and then the queue drains and terminates. 4. LimitSizeTo shared SharedDeliveryQueue across all subscriptions (HIGH, 2/4 reviewers) The queue and limiter were instantiated at the call site, so the returned cold observable shared one queue across every subscriber. Fix: wrap in Observable.Create so each subscription gets its own queue and limiter, and dispose both when the subscription disposes. --- src/DynamicData/List/Internal/Combiner.cs | 3 + src/DynamicData/List/Internal/ExpireAfter.cs | 58 +++++++++++++++---- src/DynamicData/List/Internal/GroupOn.cs | 2 +- .../List/Internal/GroupOnImmutable.cs | 2 +- src/DynamicData/List/Internal/Switch.cs | 2 +- .../List/Internal/TransformMany.cs | 2 +- .../List/ObservableListEx.LimitSizeTo.cs | 15 ++++- src/DynamicData/List/SourceList.cs | 6 +- 8 files changed, 68 insertions(+), 22 deletions(-) diff --git a/src/DynamicData/List/Internal/Combiner.cs b/src/DynamicData/List/Internal/Combiner.cs index 4b36388cb..0c509a047 100644 --- a/src/DynamicData/List/Internal/Combiner.cs +++ b/src/DynamicData/List/Internal/Combiner.cs @@ -44,6 +44,9 @@ public IObservable> Run() => Observable.Create>( observer.OnError)); } + // Dispose subscriptions first (stops further notifications from arriving), + // then drain/terminate the shared queue. + disposable.Add(queue); return disposable; }); diff --git a/src/DynamicData/List/Internal/ExpireAfter.cs b/src/DynamicData/List/Internal/ExpireAfter.cs index 3d37b8bb9..edc207a93 100644 --- a/src/DynamicData/List/Internal/ExpireAfter.cs +++ b/src/DynamicData/List/Internal/ExpireAfter.cs @@ -168,18 +168,56 @@ private void OnEditingSource(IExtendedList updater) { var removedItems = new List(_expiringShadowIndexesBuffer.Count); - // Iterate in shadow order. For each expired shadow entry, find the first occurrence - // of the item in updater that we haven't already removed in this batch, and remove it. - // If the item is no longer in the source (removed externally before delivery reached - // our shadow), skip it. Duplicates are matched in shadow-iteration order. - for (var i = 0; i < _expiringShadowIndexesBuffer.Count; ++i) + // Pre-pass: how many entries before the first expiring index ALREADY differ + // from the source? If the prefix matches, we can use shadow-position-based + // removal which preserves correct occurrence identity even when duplicates + // exist. If anything before the expiring entries differs, the shadow is + // stale and we fall back to value-based IndexOf, which may remove a + // different equal occurrence than originally scheduled but at least keeps + // the source consistent with what subscribers see. + var firstExpiringShadowIdx = _expiringShadowIndexesBuffer[0]; + var shadowInSync = _shadow.Count == updater.Count; + if (shadowInSync) { - var item = _shadow[_expiringShadowIndexesBuffer[i]].Item; - var idx = updater.IndexOf(item); - if (idx >= 0) + for (var i = 0; i <= firstExpiringShadowIdx && i < updater.Count; ++i) { - updater.RemoveAt(idx); - removedItems.Add(item); + if (!EqualityComparer.Default.Equals(_shadow[i].Item, updater[i])) + { + shadowInSync = false; + break; + } + } + } + + if (shadowInSync) + { + // Index-based removal in REVERSE shadow order so earlier indices stay + // valid as we remove later items. This matches the legacy behaviour and + // correctly distinguishes between equal duplicate occurrences with + // different expiration times. + for (var i = _expiringShadowIndexesBuffer.Count - 1; i >= 0; --i) + { + var shadowIdx = _expiringShadowIndexesBuffer[i]; + removedItems.Add(updater[shadowIdx]); + updater.RemoveAt(shadowIdx); + } + } + else + { + // Shadow is stale (concurrent external mutation has been queued but not + // yet delivered to OnSourceNext). Fall back to value-based search. + // Iterate shadow forward; for each expiring entry remove the first + // matching live occurrence. Silently skip items that have already been + // removed externally; the pending OnSourceNext will reconcile the shadow. + for (var i = 0; i < _expiringShadowIndexesBuffer.Count; ++i) + { + var item = _shadow[_expiringShadowIndexesBuffer[i]].Item; + var idx = updater.IndexOf(item); + if (idx >= 0) + { + updater.RemoveAt(idx); + removedItems.Add(item); + } } } diff --git a/src/DynamicData/List/Internal/GroupOn.cs b/src/DynamicData/List/Internal/GroupOn.cs index cdb79e238..532fef5bf 100644 --- a/src/DynamicData/List/Internal/GroupOn.cs +++ b/src/DynamicData/List/Internal/GroupOn.cs @@ -44,7 +44,7 @@ public IObservable>> Run() => Observable.C var publisher = grouper.Merge(regrouperFunc).DisposeMany() // dispose removes as the grouping is disposable .NotEmpty().SubscribeSafe(observer); - return new CompositeDisposable(publisher, shared.Connect()); + return new CompositeDisposable(publisher, shared.Connect(), queue); }); private static GroupWithAddIndicator GetCache(IDictionary> groupCaches, TGroupKey key) diff --git a/src/DynamicData/List/Internal/GroupOnImmutable.cs b/src/DynamicData/List/Internal/GroupOnImmutable.cs index 736c1bd21..bd6b6a5c0 100644 --- a/src/DynamicData/List/Internal/GroupOnImmutable.cs +++ b/src/DynamicData/List/Internal/GroupOnImmutable.cs @@ -42,7 +42,7 @@ public IObservable>> Run() => Observabl var publisher = grouper.Merge(reGroupFunc).NotEmpty().SubscribeSafe(observer); - return new CompositeDisposable(publisher, shared.Connect()); + return new CompositeDisposable(publisher, shared.Connect(), queue); }); private static IChangeSet> CreateChangeSet(ChangeAwareList> result, IDictionary allGroupings, IDictionary> initialStateOfGroups) diff --git a/src/DynamicData/List/Internal/Switch.cs b/src/DynamicData/List/Internal/Switch.cs index d693a4aa2..a8735cf91 100644 --- a/src/DynamicData/List/Internal/Switch.cs +++ b/src/DynamicData/List/Internal/Switch.cs @@ -31,6 +31,6 @@ public IObservable> Run() => Observable.Create>( var publisher = destination.Connect().SubscribeSafe(observer); - return new CompositeDisposable(destination, populator, publisher); + return new CompositeDisposable(destination, populator, publisher, queue); }); } diff --git a/src/DynamicData/List/Internal/TransformMany.cs b/src/DynamicData/List/Internal/TransformMany.cs index f4fae3f57..24fb064c8 100644 --- a/src/DynamicData/List/Internal/TransformMany.cs +++ b/src/DynamicData/List/Internal/TransformMany.cs @@ -150,7 +150,7 @@ private IObservable> CreateWithChangeSet() var allChanges = init.Merge(subsequentSelection); - return new CompositeDisposable(allChanges.SubscribeSafe(observer), transformed.Connect()); + return new CompositeDisposable(allChanges.SubscribeSafe(observer), transformed.Connect(), outerQueue); }); } diff --git a/src/DynamicData/List/ObservableListEx.LimitSizeTo.cs b/src/DynamicData/List/ObservableListEx.LimitSizeTo.cs index 96467bf6e..1a214bb6f 100644 --- a/src/DynamicData/List/ObservableListEx.LimitSizeTo.cs +++ b/src/DynamicData/List/ObservableListEx.LimitSizeTo.cs @@ -55,9 +55,18 @@ public static IObservable> LimitSizeTo(this ISourceList sou throw new ArgumentException("sizeLimit cannot be zero", nameof(sizeLimit)); } - var queue = new SharedDeliveryQueue(); - var limiter = new LimitSizeTo(source, sizeLimit, scheduler ?? GlobalConfig.DefaultScheduler, queue); + var effectiveScheduler = scheduler ?? GlobalConfig.DefaultScheduler; - return limiter.Run().SynchronizeSafe(queue).Do(source.RemoveMany); + // The shared queue must be created per subscription, not per call. Sharing one queue + // across all subscribers couples them and never disposes the queue. + return Observable.Create>(observer => + { + var queue = new SharedDeliveryQueue(); + var limiter = new LimitSizeTo(source, sizeLimit, effectiveScheduler, queue); + + var subscription = limiter.Run().SynchronizeSafe(queue).Do(source.RemoveMany).SubscribeSafe(observer); + + return new CompositeDisposable(subscription, queue); + }); } } diff --git a/src/DynamicData/List/SourceList.cs b/src/DynamicData/List/SourceList.cs index e38c2a8e5..dff116083 100644 --- a/src/DynamicData/List/SourceList.cs +++ b/src/DynamicData/List/SourceList.cs @@ -127,11 +127,7 @@ public IObservable> Connect(Func? predicate = null) } /// - public void Dispose() - { - _cleanUp.Dispose(); - _changesPreview.Dispose(); - } + public void Dispose() => _cleanUp.Dispose(); /// public void Edit(Action> updateAction) From ee80e29df75f11792019c5f14c087da0983c99a5 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Mon, 15 Jun 2026 00:05:21 -0700 Subject: [PATCH 05/12] SourceList: gate Preview on terminal-enqueued flag so it cannot fire after terminal is staged Addresses L1 from the adversarial review. Without this flag, Preview was gated only on _notifications.IsTerminated, which the DeliveryQueue sets when the terminal item is DEQUEUED by the drain, not when it is enqueued. Window: 1. Edit_1 mid-drain delivering OnNext to subscribers 2. NotifyCompleted/NotifyError acquires the gate, enqueues terminal item 3. Edit_2 acquires the gate, calls Write 4. WriteWithPreview synchronously fires InvokeNextPreview 5. _notifications.IsTerminated is still false (drain hasn't reached the terminal item) -> preview fires 6. Edit_2 enqueues its main change, releases gate 7. Drain eventually dequeues the terminal item, sets _isTerminated, clears the queue, so Edit_2's main change is dropped Result: preview subscriber saw a change main subscribers never see. Fix: set a _terminalEnqueued flag under the gate at the same instant the terminal item is enqueued. InvokeNextPreview reads it under the same gate (it is invoked from within Edit, which holds the gate), so this is safe without needing a memory barrier. Edit itself is intentionally NOT short-circuited on _terminalEnqueued. Legacy SourceList allowed Edit to win the lock race against Dispose and have its change delivered; the new model can't preserve that exactly because the change may be dropped by the queue clear on terminal, but allowing Edit through preserves the legacy 'I owned the lock first' Items snapshot semantic for callers that read Items after Edit returns. Other items from the review pass (L2, L3, L5) checked and not actioned: - L2 (Switch outer/inner ordering): pre-existing legacy behavior. Confirmed by inspecting origin/main:src/DynamicData/List/Internal/Switch.cs, which uses a single shared locker between the outer Do callback and the inner .Synchronize(locker), with no ordering guarantee between Clear and a stale in-flight Add from the previous source. The new SDQ-based code preserves the same race. - L3 (Connect/CountChanged subscribe under gate): consistent with cache ObservableCache.Connect, which holds lock(_locker) around SubscribeSafe in the same way. This is the established library-wide pattern for snapshot-then-subscribe atomicity; changing it requires a different design that's out of scope for this PR. - L5 (ExpireAfter removal order): the H2 reverse-shadow-index path in the in-sync case already restores the legacy order. The fallback (stale-shadow) path uses forward order, but the legacy crashed in that case so there is no order to preserve. --- src/DynamicData/List/SourceList.cs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/DynamicData/List/SourceList.cs b/src/DynamicData/List/SourceList.cs index dff116083..cd0aa86c7 100644 --- a/src/DynamicData/List/SourceList.cs +++ b/src/DynamicData/List/SourceList.cs @@ -49,6 +49,12 @@ public sealed class SourceList : ISourceList private long _currentDeliveryVersion; + // Set true (under the queue gate) the instant a terminal notification is enqueued, so any + // Edit that lands between enqueue and the drain dequeueing the terminal can suppress its + // preview emission. Without this, a preview subscriber would observe a change whose main + // delivery is cleared from the queue when the terminal item is staged. + private bool _terminalEnqueued; + /// /// Initializes a new instance of the class. /// @@ -172,7 +178,7 @@ public IObservable> Preview(Func? predicate = null) private void InvokeNextPreview(IChangeSet changes) { - if (changes.Count != 0 && !_notifications.IsTerminated) + if (changes.Count != 0 && !_terminalEnqueued && !_notifications.IsTerminated) { _changesPreview.OnNext(changes); } @@ -197,12 +203,14 @@ private IDisposable LoadFromSource(IObservable> source) => private void NotifyCompleted() { using var notifications = _notifications.AcquireLock(); + _terminalEnqueued = true; notifications.EnqueueCompleted(); } private void NotifyError(Exception exception) { using var notifications = _notifications.AcquireLock(); + _terminalEnqueued = true; notifications.EnqueueError(exception); } From 6557e49b8328017538854c667e085d5e0f446cf2 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Mon, 15 Jun 2026 00:12:00 -0700 Subject: [PATCH 06/12] SourceList: remove double-OnCompleted from Connect, wrap _editLevel in finally Address PR review comments. 1. Connect: removed the Finally(observer.OnCompleted) wrapping the SubscribeSafe(observer) call. Rx's Finally forwards the source terminal AND then invokes the action, so observer.OnCompleted was called twice on natural source termination. SubscribeSafe alone forwards OnCompleted once via the auto-detach observer wrapper. Pre-existing bug; the legacy code had it too. 2. Edit: _editLevel++ was matched by an _editLevel-- in the linear flow but not in a finally. If updateAction (or the underlying ReaderWriter.Write*) threw, _editLevel was left permanently incremented, breaking all subsequent edits because the WriteNested path would be taken and the queued notifications would be suppressed. Pre-existing bug; the legacy code had it too. --- src/DynamicData/List/SourceList.cs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/DynamicData/List/SourceList.cs b/src/DynamicData/List/SourceList.cs index cd0aa86c7..4ba4ad1c9 100644 --- a/src/DynamicData/List/SourceList.cs +++ b/src/DynamicData/List/SourceList.cs @@ -121,7 +121,7 @@ public IObservable> Connect(Func? predicate = null) result = changes; } - return result.Finally(observer.OnCompleted).SubscribeSafe(observer); + return result.SubscribeSafe(observer); }); if (predicate is not null) @@ -146,17 +146,22 @@ public void Edit(Action> updateAction) _editLevel++; - if (_editLevel == 1) + try { - changes = _changesPreview.HasObservers ? _readerWriter.WriteWithPreview(updateAction, InvokeNextPreview) : _readerWriter.Write(updateAction); + if (_editLevel == 1) + { + changes = _changesPreview.HasObservers ? _readerWriter.WriteWithPreview(updateAction, InvokeNextPreview) : _readerWriter.Write(updateAction); + } + else + { + _readerWriter.WriteNested(updateAction); + } } - else + finally { - _readerWriter.WriteNested(updateAction); + _editLevel--; } - _editLevel--; - if (changes is not null && changes.Count > 0 && _editLevel == 0) { notifications.EnqueueNext(new ListUpdate(changes, _readerWriter.Count, ++_currentVersion)); From 8747b3a1a205fbd3bfd987170a41a7acf4db4386 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Mon, 15 Jun 2026 00:17:03 -0700 Subject: [PATCH 07/12] TransformMany: use per-child DeliveryQueue instead of pointless per-child SharedDeliveryQueue Each child observable in the childChanges overload had its own SharedDeliveryQueue with exactly one source feeding it. That's the wrong tool: SharedDeliveryQueue exists to serialize multiple sources through one drain. A per-child queue with one source is just a DeliveryQueue with extra machinery, and the outer queue's compaction logic was the only thing that would ever release the dead sub-queue. Switched to parameterless SynchronizeSafe() which internally allocates a per-subscription DeliveryQueue with queue-first disposal. The queue is now disposed deterministically when the child subscription is torn down (which happens when MergeMany unsubscribes from a removed item). The OUTER queue legitimately needs to be a SharedDeliveryQueue because it serializes two different streams (initial-state from transformed.SynchronizeSafe and per-item merge from transformed.MergeMany.SynchronizeSafe). That is unchanged. --- src/DynamicData/List/Internal/TransformMany.cs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/DynamicData/List/Internal/TransformMany.cs b/src/DynamicData/List/Internal/TransformMany.cs index 24fb064c8..67887f53f 100644 --- a/src/DynamicData/List/Internal/TransformMany.cs +++ b/src/DynamicData/List/Internal/TransformMany.cs @@ -124,9 +124,14 @@ private IObservable> CreateWithChangeSet() var transformed = _source.Transform( t => { - var childQueue = new SharedDeliveryQueue(); var collection = manySelector(t); - var changes = childChanges(t).SynchronizeSafe(childQueue).Skip(1); + + // Per-child delivery via parameterless SynchronizeSafe: each child gets + // its own internally-allocated DeliveryQueue tied to the subscription + // lifetime, with queue-first disposal so in-flight deliveries complete + // before teardown. No reason to use a SharedDeliveryQueue here since + // each child has exactly one source feeding it. + var changes = childChanges(t).SynchronizeSafe().Skip(1); return new ManyContainer(collection, changes); }).Publish(); From 0ec74be147eb9ef384598ae5038401f68e47ae80 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Mon, 15 Jun 2026 00:23:00 -0700 Subject: [PATCH 08/12] TransformMany: one shared queue for the whole operator, each per-child stream attaches as a sub-queue Per the PR review followup: there is no reason to have two separate queues here. SharedDeliveryQueue exists precisely so multiple sources can attach as sub-queues and all funnel through a single drain. The correct topology for this operator is one shared queue for everything (per-child child-changes streams, the initial-state stream, and the merged-children stream). That gives: - one drain point for all downstream observer.OnNext invocations - per-child sub-queues that are deterministically disposed by their subscription teardown - the queue itself disposed once via the CompositeDisposable when the whole TransformMany subscription ends --- .../List/Internal/TransformMany.cs | 25 +++++++------------ 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/src/DynamicData/List/Internal/TransformMany.cs b/src/DynamicData/List/Internal/TransformMany.cs index 67887f53f..520001ff1 100644 --- a/src/DynamicData/List/Internal/TransformMany.cs +++ b/src/DynamicData/List/Internal/TransformMany.cs @@ -114,30 +114,23 @@ private IObservable> CreateWithChangeSet() { var result = new ChangeAwareList(); - // Outer queue serializes the initial-state emissions from `transformed` with the - // fan-in from `transformed.MergeMany(x => x.Changes)`. Per-item child changes use - // a separate per-item queue so each child's notifications appear in order, but - // multiple children can deliver concurrently into the outer queue without holding - // a lock during downstream delivery. - var outerQueue = new SharedDeliveryQueue(); + // One shared queue for the whole operator. Each per-child stream, the initial-state + // stream, and the merged-children stream all attach as sub-queues, so there is a + // single drain point and downstream observer.OnNext is never invoked concurrently + // from any path. + var queue = new SharedDeliveryQueue(); var transformed = _source.Transform( t => { var collection = manySelector(t); - - // Per-child delivery via parameterless SynchronizeSafe: each child gets - // its own internally-allocated DeliveryQueue tied to the subscription - // lifetime, with queue-first disposal so in-flight deliveries complete - // before teardown. No reason to use a SharedDeliveryQueue here since - // each child has exactly one source feeding it. - var changes = childChanges(t).SynchronizeSafe().Skip(1); + var changes = childChanges(t).SynchronizeSafe(queue).Skip(1); return new ManyContainer(collection, changes); }).Publish(); - var initial = transformed.SynchronizeSafe(outerQueue).Select(changes => new ChangeSet(new DestinationEnumerator(changes, _equalityComparer))); + var initial = transformed.SynchronizeSafe(queue).Select(changes => new ChangeSet(new DestinationEnumerator(changes, _equalityComparer))); - var subsequent = transformed.MergeMany(x => x.Changes).SynchronizeSafe(outerQueue); + var subsequent = transformed.MergeMany(x => x.Changes).SynchronizeSafe(queue); var init = initial.Select( changes => @@ -155,7 +148,7 @@ private IObservable> CreateWithChangeSet() var allChanges = init.Merge(subsequentSelection); - return new CompositeDisposable(allChanges.SubscribeSafe(observer), transformed.Connect(), outerQueue); + return new CompositeDisposable(allChanges.SubscribeSafe(observer), transformed.Connect(), queue); }); } From 847eeb3c84c8a70ed7bdf2d671360ce21eca3c7f Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Mon, 15 Jun 2026 01:44:21 -0700 Subject: [PATCH 09/12] Address multi-agent review (PR-A pass 2): fix ExpireAfter prefix-check, harden LoadFromSource exception path, correct CA2213 justifications Multi-agent multi-pass review found three issues in this PR. 1. ExpireAfter: shadowInSync prefix check was only validating up to the FIRST expiring shadow index (firstExpiringShadowIdx). The reverse-removal loop touched every expiring shadow index, so if a concurrent external mutation (Move/Replace/Insert+Remove with unchanged Count) had affected positions BETWEEN the first and last expiring index, the check falsely declared the shadow in sync and the code removed unrelated items. Unanimous flag across 9/9 reviewers; the new prefix check validates up to and including the LAST expiring index. 2. SourceList.LoadFromSource: exceptions thrown by _readerWriter.Write were escaping the source observer callback instead of being converted to OnError. The legacy implementation used .Select(_readerWriter.Write).Subscribe(_, OnError, ...) which routed selector exceptions to OnError; the migrated form uses a manual Subscribe and lost that. Wrapped the write in try/catch -> NotifyError(ex). Flagged independently by rubberduck-gpt55 and dotnet-gpt55. 3. CA2213 suppression text on _changes and _changesPreview claimed 'Disposed with _cleanUp', which has been misleading since the dispose-during-drain race fix removed the explicit Subject disposal. Updated the justification to 'Terminated via OnCompleted/OnError delivered through _notifications; explicit Dispose would race with in-flight queue drains.' Items intentionally NOT addressed (reported back to user per review summary): - ExpireAfter lock-during-OnNext (private gate, no cross-cache deadlock vector, already disclosed) - SourceList.Dispose async-drain semantic (intentional, matches cache) - LIFO sibling reordering in multi-source operators (pre-existing pattern, matches cache) - Finally(observer.OnCompleted) removal in Connect (intentional behavior fix from earlier commit) --- TestAsync/Program.cs | 15 ++++++++++++ TestAsync/TestAsync.csproj | 14 ++++++++++++ src/DynamicData/List/Internal/ExpireAfter.cs | 20 ++++++++-------- src/DynamicData/List/SourceList.cs | 24 ++++++++++++++------ test.cs | 19 ++++++++++++++++ test_asyncsubject.cs | 21 +++++++++++++++++ test_subject.cs | 23 +++++++++++++++++++ 7 files changed, 120 insertions(+), 16 deletions(-) create mode 100644 TestAsync/Program.cs create mode 100644 TestAsync/TestAsync.csproj create mode 100644 test.cs create mode 100644 test_asyncsubject.cs create mode 100644 test_subject.cs diff --git a/TestAsync/Program.cs b/TestAsync/Program.cs new file mode 100644 index 000000000..dcff28d69 --- /dev/null +++ b/TestAsync/Program.cs @@ -0,0 +1,15 @@ + +using System; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading; + +var subject = new Subject(); +var sub = subject.Buffer(TimeSpan.FromSeconds(0.1)).Subscribe( + b => Console.WriteLine("Batch"), + () => Console.WriteLine("Completed") +); +subject.Dispose(); +Thread.Sleep(500); +Console.WriteLine("Done"); + diff --git a/TestAsync/TestAsync.csproj b/TestAsync/TestAsync.csproj new file mode 100644 index 000000000..1e82433b9 --- /dev/null +++ b/TestAsync/TestAsync.csproj @@ -0,0 +1,14 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + diff --git a/src/DynamicData/List/Internal/ExpireAfter.cs b/src/DynamicData/List/Internal/ExpireAfter.cs index edc207a93..ffc9e72d3 100644 --- a/src/DynamicData/List/Internal/ExpireAfter.cs +++ b/src/DynamicData/List/Internal/ExpireAfter.cs @@ -168,18 +168,20 @@ private void OnEditingSource(IExtendedList updater) { var removedItems = new List(_expiringShadowIndexesBuffer.Count); - // Pre-pass: how many entries before the first expiring index ALREADY differ - // from the source? If the prefix matches, we can use shadow-position-based - // removal which preserves correct occurrence identity even when duplicates - // exist. If anything before the expiring entries differs, the shadow is - // stale and we fall back to value-based IndexOf, which may remove a - // different equal occurrence than originally scheduled but at least keeps - // the source consistent with what subscribers see. - var firstExpiringShadowIdx = _expiringShadowIndexesBuffer[0]; + // Validate the shadow against the live updater all the way up to (and including) + // the LAST expiring index. The previous version of this code only checked the + // prefix up to the FIRST expiring index, which can falsely declare the shadow + // in sync when a concurrent external mutation has moved/replaced an item at a + // position between the first and last expiring index. The subsequent reverse- + // index removal would then remove an unrelated item. If the shadow is stale we + // fall back to value-based IndexOf, which may remove a different equal + // occurrence than originally scheduled but at least keeps the source consistent + // with what subscribers see. + var lastExpiringShadowIdx = _expiringShadowIndexesBuffer[_expiringShadowIndexesBuffer.Count - 1]; var shadowInSync = _shadow.Count == updater.Count; if (shadowInSync) { - for (var i = 0; i <= firstExpiringShadowIdx && i < updater.Count; ++i) + for (var i = 0; i <= lastExpiringShadowIdx && i < updater.Count; ++i) { if (!EqualityComparer.Default.Equals(_shadow[i].Item, updater[i])) { diff --git a/src/DynamicData/List/SourceList.cs b/src/DynamicData/List/SourceList.cs index 4ba4ad1c9..6dcd7b3a7 100644 --- a/src/DynamicData/List/SourceList.cs +++ b/src/DynamicData/List/SourceList.cs @@ -22,10 +22,10 @@ namespace DynamicData; public sealed class SourceList : ISourceList where T : notnull { - [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "CA2213:Disposable fields should be disposed", Justification = "Disposed with _cleanUp")] + [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "CA2213:Disposable fields should be disposed", Justification = "Terminated via OnCompleted/OnError delivered through _notifications; explicit Dispose would race with in-flight queue drains.")] private readonly Subject> _changes = new(); - [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "CA2213:Disposable fields should be disposed", Justification = "Disposed with _cleanUp")] + [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "CA2213:Disposable fields should be disposed", Justification = "Terminated via OnCompleted/OnError delivered through _notifications; explicit Dispose would race with in-flight queue drains.")] private readonly Subject> _changesPreview = new(); private readonly IDisposable _cleanUp; @@ -193,13 +193,23 @@ private IDisposable LoadFromSource(IObservable> source) => source.Subscribe( changeSet => { - using var notifications = _notifications.AcquireLock(); - - var changes = _readerWriter.Write(changeSet); + IChangeSet? changes = null; + try + { + using var notifications = _notifications.AcquireLock(); + changes = _readerWriter.Write(changeSet); - if (changes.Count > 0) + if (changes.Count > 0) + { + notifications.EnqueueNext(new ListUpdate(changes, _readerWriter.Count, ++_currentVersion)); + } + } + catch (Exception ex) { - notifications.EnqueueNext(new ListUpdate(changes, _readerWriter.Count, ++_currentVersion)); + // Convert ReaderWriter / Write exceptions into a downstream OnError. + // Without this, exceptions thrown by Write would escape the source's + // observer callback and leave subscribers unterminated. + NotifyError(ex); } }, NotifyError, diff --git a/test.cs b/test.cs new file mode 100644 index 000000000..c5933ba8a --- /dev/null +++ b/test.cs @@ -0,0 +1,19 @@ +using System; +using System.Reactive.Linq; +using DynamicData; + +class Program { + static void Main() { + var list = new SourceList(); + list.Connect().Subscribe(changes => { + foreach (var change in changes) { + if (change.Reason == ListChangeReason.Clear) { + Console.WriteLine($"Clear range count: {change.Range.Count}"); + } + } + }); + list.Add(1); + list.Add(2); + list.Clear(); + } +} diff --git a/test_asyncsubject.cs b/test_asyncsubject.cs new file mode 100644 index 000000000..10584d6a1 --- /dev/null +++ b/test_asyncsubject.cs @@ -0,0 +1,21 @@ +using System; +using System.Reactive; +using System.Reactive.Subjects; + +class Program +{ + static void Main() + { + var subject = new AsyncSubject(); + subject.OnNext(Unit.Default); + subject.OnCompleted(); + subject.Dispose(); + + try { + subject.Subscribe(x => Console.WriteLine("Next"), () => Console.WriteLine("Completed")); + Console.WriteLine("Success"); + } catch (Exception ex) { + Console.WriteLine("Error: " + ex.GetType().Name); + } + } +} diff --git a/test_subject.cs b/test_subject.cs new file mode 100644 index 000000000..a97d19624 --- /dev/null +++ b/test_subject.cs @@ -0,0 +1,23 @@ +using System; +using System.Reactive; +using System.Reactive.Subjects; + +class Program { + static void Main() { + var s = new AsyncSubject(); + s.OnNext(Unit.Default); + s.OnCompleted(); + s.Dispose(); + + try { + s.Subscribe( + _ => Console.WriteLine("Next"), + ex => Console.WriteLine("Error: " + ex.GetType()), + () => Console.WriteLine("Completed") + ); + Console.WriteLine("Subscribed successfully (this shouldn't happen)"); + } catch (Exception ex) { + Console.WriteLine("Outer Error: " + ex.GetType()); + } + } +} From 56abf861ea70b84e57c67973f339fba4af6f5116 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Mon, 15 Jun 2026 01:44:43 -0700 Subject: [PATCH 10/12] Remove stray review-agent test files --- TestAsync/Program.cs | 15 --------------- TestAsync/TestAsync.csproj | 14 -------------- test.cs | 19 ------------------- test_asyncsubject.cs | 21 --------------------- test_subject.cs | 23 ----------------------- 5 files changed, 92 deletions(-) delete mode 100644 TestAsync/Program.cs delete mode 100644 TestAsync/TestAsync.csproj delete mode 100644 test.cs delete mode 100644 test_asyncsubject.cs delete mode 100644 test_subject.cs diff --git a/TestAsync/Program.cs b/TestAsync/Program.cs deleted file mode 100644 index dcff28d69..000000000 --- a/TestAsync/Program.cs +++ /dev/null @@ -1,15 +0,0 @@ - -using System; -using System.Reactive.Linq; -using System.Reactive.Subjects; -using System.Threading; - -var subject = new Subject(); -var sub = subject.Buffer(TimeSpan.FromSeconds(0.1)).Subscribe( - b => Console.WriteLine("Batch"), - () => Console.WriteLine("Completed") -); -subject.Dispose(); -Thread.Sleep(500); -Console.WriteLine("Done"); - diff --git a/TestAsync/TestAsync.csproj b/TestAsync/TestAsync.csproj deleted file mode 100644 index 1e82433b9..000000000 --- a/TestAsync/TestAsync.csproj +++ /dev/null @@ -1,14 +0,0 @@ - - - - Exe - net8.0 - enable - enable - - - - - - - diff --git a/test.cs b/test.cs deleted file mode 100644 index c5933ba8a..000000000 --- a/test.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System; -using System.Reactive.Linq; -using DynamicData; - -class Program { - static void Main() { - var list = new SourceList(); - list.Connect().Subscribe(changes => { - foreach (var change in changes) { - if (change.Reason == ListChangeReason.Clear) { - Console.WriteLine($"Clear range count: {change.Range.Count}"); - } - } - }); - list.Add(1); - list.Add(2); - list.Clear(); - } -} diff --git a/test_asyncsubject.cs b/test_asyncsubject.cs deleted file mode 100644 index 10584d6a1..000000000 --- a/test_asyncsubject.cs +++ /dev/null @@ -1,21 +0,0 @@ -using System; -using System.Reactive; -using System.Reactive.Subjects; - -class Program -{ - static void Main() - { - var subject = new AsyncSubject(); - subject.OnNext(Unit.Default); - subject.OnCompleted(); - subject.Dispose(); - - try { - subject.Subscribe(x => Console.WriteLine("Next"), () => Console.WriteLine("Completed")); - Console.WriteLine("Success"); - } catch (Exception ex) { - Console.WriteLine("Error: " + ex.GetType().Name); - } - } -} diff --git a/test_subject.cs b/test_subject.cs deleted file mode 100644 index a97d19624..000000000 --- a/test_subject.cs +++ /dev/null @@ -1,23 +0,0 @@ -using System; -using System.Reactive; -using System.Reactive.Subjects; - -class Program { - static void Main() { - var s = new AsyncSubject(); - s.OnNext(Unit.Default); - s.OnCompleted(); - s.Dispose(); - - try { - s.Subscribe( - _ => Console.WriteLine("Next"), - ex => Console.WriteLine("Error: " + ex.GetType()), - () => Console.WriteLine("Completed") - ); - Console.WriteLine("Subscribed successfully (this shouldn't happen)"); - } catch (Exception ex) { - Console.WriteLine("Outer Error: " + ex.GetType()); - } - } -} From 1c784eb89da89ee22a633b5bcb581c77bcbc75b2 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Mon, 15 Jun 2026 07:16:43 -0700 Subject: [PATCH 11/12] List: replace post-queue Merge/CombineLatest with UnsynchronizedMerge/CombineLatest The list operators all serialize their inputs through a SharedDeliveryQueue before combining them with Rx Merge or CombineLatest. Rx's Merge/CombineLatest install a private gate lock that is held for the entire downstream OnNext, which reconstructs the cross-cache ABBA deadlock that SharedDeliveryQueue is designed to eliminate. Cherry-picks the UnsynchronizedMerge, UnsynchronizedCombineLatest, and DeliveryQueueMergeExtensions helpers from the in-flight upstream PR #1097 (fix/operator-merge-gate-deadlock). Each helper is a drop-in replacement for the corresponding Rx operator that omits the internal gate; the precondition is that all inputs are already serialized through the same external gate (which the SharedDeliveryQueue provides). Migrated call sites (all post-queue): - Sort: 2 Merges collapsed into one 3-input UnsynchronizedMerge - Pager: requestStream.UnsynchronizedMerge(dataChanged) - Virtualiser: requestStream.UnsynchronizedMerge(dataChanged) - Filter.Dynamic: predicateChanged.UnsynchronizedMerge(filteredResult) - GroupOn and GroupOnImmutable: both Merge AND CombineLatest converted (shared.ToCollection comes through SynchronizeSafe(queue).Publish so it's queue-serialized too) - TransformMany: init.UnsynchronizedMerge(subsequentSelection) NOT migrated: - BufferIf line 37: _pauseIfTrueSelector.Merge(timeoutSubject) happens UPSTREAM of SynchronizeSafe(queue). Inputs are not queue-serialized so the Rx gate is genuinely needed. --- .../Internal/DeliveryQueueMergeExtensions.cs | 58 +++++ .../Internal/SynchronizeSafeExtensions.cs | 218 ++++++++++++++---- .../List/Internal/Filter.Dynamic.cs | 2 +- src/DynamicData/List/Internal/GroupOn.cs | 4 +- .../List/Internal/GroupOnImmutable.cs | 4 +- src/DynamicData/List/Internal/Pager.cs | 2 +- src/DynamicData/List/Internal/Sort.cs | 2 +- .../List/Internal/TransformMany.cs | 2 +- src/DynamicData/List/Internal/Virtualiser.cs | 2 +- 9 files changed, 243 insertions(+), 51 deletions(-) create mode 100644 src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs diff --git a/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs new file mode 100644 index 000000000..43b3d94b3 --- /dev/null +++ b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs @@ -0,0 +1,58 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive; +using System.Reactive.Disposables; +using System.Reactive.Linq; + +namespace DynamicData.Internal; + +// Same-type Rx merge that owns a DeliveryQueue. Serializes notifications from +// every input through the queue, which releases its gate before delivering, so +// downstream observers that walk into another cache's writer lock cannot deadlock +// with this operator's serialization point. Used where every input has the same +// element type and no per-input projection is needed inside the drain. When element +// types differ or per-input projections are required, route each input through +// SharedDeliveryQueue with SynchronizeSafe and combine them with UnsynchronizedMerge. +internal static class DeliveryQueueMergeExtensions +{ + // Functionally equivalent to Observable.Merge: completes only after every source + // completes, the first error terminates, subscription occurs in argument order. + public static IObservable DeliveryQueueMerge(this IObservable first, params IObservable[] others) => + Observable.Create(observer => + { + var queue = new DeliveryQueue(observer); + var remainingSources = others.Length + 1; + var subscriptions = new CompositeDisposable(remainingSources + 1); + + subscriptions.Add(first.SubscribeSafe(CreateInner())); + foreach (var source in others) + { + subscriptions.Add(source.SubscribeSafe(CreateInner())); + } + + // Subscription first so any terminal notification produced during Rx's disposal + // cascade still flows through the still-active queue. Queue last as cleanup. + subscriptions.Add(queue); + return subscriptions; + + // Each source needs its own inner observer instance because Rx's ObserverBase + // sets a one-shot stopped flag on the first OnCompleted or OnError. A single + // shared observer would silently drop terminal notifications from every source + // after the first. OnNext and OnError forward straight to the queue (the queue's + // gate serializes concurrent calls). OnCompleted is counter-gated so only the + // last surviving source's completion terminates the merged stream. + IObserver CreateInner() => + Observer.Create( + queue.OnNext, + queue.OnError, + () => + { + if (Interlocked.Decrement(ref remainingSources) == 0) + { + queue.OnCompleted(); + } + }); + }); +} diff --git a/src/DynamicData/Internal/SynchronizeSafeExtensions.cs b/src/DynamicData/Internal/SynchronizeSafeExtensions.cs index ace8ed4a1..603f545c7 100644 --- a/src/DynamicData/Internal/SynchronizeSafeExtensions.cs +++ b/src/DynamicData/Internal/SynchronizeSafeExtensions.cs @@ -2,55 +2,45 @@ // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using System.Reactive; using System.Reactive.Disposables; using System.Reactive.Linq; namespace DynamicData.Internal; -/// -/// Provides SynchronizeSafe extension methods, drop-in replacements -/// for Synchronize(lock) that release the lock before downstream delivery. -/// -/// -/// Disposal ordering matters. CompositeDisposable disposes in -/// declaration order. The queue and the source subscription have different roles: -/// -/// -/// Subscription-first (gate and SDQ overloads) -/// The queue is the IObserver that the source sends notifications to. -/// Disposing the subscription first allows any final terminal notification (OnCompleted/OnError -/// triggered by Rx's disposal cascade or a Finally operator) to flow through the -/// still-active queue. The queue is disposed last as cleanup. -/// -/// -/// Queue-first (parameterless overload) -/// Used by operators with teardown side effects (DisposeMany, OnBeingRemoved). -/// The queue is terminated first via , which ensures -/// all in-flight deliveries complete before the subscription is disposed and teardown logic -/// (e.g., disposing removed items) runs. Terminal notifications are not needed because -/// the subscriber is explicitly tearing down. -/// -/// -/// +// Drop-in replacements for Observable.Synchronize(lock) that release the lock before +// downstream delivery, plus UnsynchronizedMerge for combining streams whose inputs are +// already serialized through the same queue. +// +// Disposal ordering matters. CompositeDisposable disposes in declaration order, and the +// queue and the source subscription have different roles: +// +// Subscription-first (gate and SDQ overloads): the queue is the IObserver that the +// source sends notifications to. Disposing the subscription first allows any final +// terminal notification (OnCompleted or OnError triggered by Rx's disposal cascade +// or a Finally operator) to flow through the still-active queue. The queue is +// disposed last as cleanup. +// +// Queue-first (parameterless overload): used by operators with teardown side effects +// (DisposeMany, OnBeingRemoved). The queue is terminated first via DeliveryQueue.Dispose, +// which ensures all in-flight deliveries complete before the subscription is disposed +// and teardown logic (e.g. disposing removed items) runs. Terminal notifications are +// not needed because the subscriber is explicitly tearing down. internal static class SynchronizeSafeExtensions { - /// - /// Synchronizes the source observable through a . - /// Use when multiple sources of different types share a gate. - /// + // Routes the source through a SharedDeliveryQueue. Use when multiple sources of + // different types share a gate. public static IObservable SynchronizeSafe(this IObservable source, SharedDeliveryQueue queue) => Observable.Create(observer => { var subQueue = queue.CreateQueue(observer); - // Subscription first: terminal notifications flow through the still-active sub-queue + // Subscription first: terminal notifications flow through the still-active sub-queue. return new CompositeDisposable(source.SubscribeSafe(subQueue), subQueue); }); - /// - /// Synchronizes the source observable through an implicitly created . - /// Drop-in replacement for Synchronize(locker). - /// + // Routes the source through an implicitly created DeliveryQueue. Drop-in replacement + // for Observable.Synchronize(locker). #if NET9_0_OR_GREATER public static IObservable SynchronizeSafe(this IObservable source, Lock gate) => #else @@ -60,22 +50,166 @@ public static IObservable SynchronizeSafe(this IObservable source, obje { var queue = new DeliveryQueue(gate, observer); - // Subscription first: terminal notifications flow through the still-active queue + // Subscription first: terminal notifications flow through the still-active queue. return new CompositeDisposable(source.SubscribeSafe(queue), queue); }); - /// - /// Synchronizes the source observable through an implicitly created - /// with automatic delivery completion on dispose. The queue is terminated and drained - /// before the source subscription is disposed, ensuring all in-flight notifications - /// are delivered before teardown. - /// + // Routes the source through an implicitly created DeliveryQueue with automatic + // delivery completion on dispose. The queue is terminated and drained before the + // source subscription is disposed, ensuring all in-flight notifications are delivered + // before teardown. public static IObservable SynchronizeSafe(this IObservable source) => Observable.Create(observer => { var queue = new DeliveryQueue(observer); - // Queue first: ensures in-flight deliveries complete before teardown side effects run + // Queue first: ensures in-flight deliveries complete before teardown side effects run. return new CompositeDisposable(queue, source.SubscribeSafe(queue)); }); + + // Merges every input into a single observable without taking any synchronization gate. + // Functionally equivalent to Observable.Merge: completes only after every source completes, + // the first error terminates, subscription occurs in argument order. + // + // The caller MUST ensure that delivery from every source is already serialized. In this + // library the precondition is satisfied by routing every source through the same + // SharedDeliveryQueue via SynchronizeSafe(queue). The shared queue's drain loop guarantees + // that at most one notification is in flight to the downstream observer at a time, so the + // additional gate that Observable.Merge would install is redundant. + // + // Removing that gate matters in cross-cache pipelines: Observable.Merge holds its private + // _gate for the entire duration of downstream delivery, and when downstream delivery walks + // into another cache's writer lock, two such gates on two operators form an ABBA cycle that + // the queue-drain design is meant to prevent. + // + // Without the external serialization precondition, concurrent OnNext calls into the shared + // observer will race. Do not use as a general-purpose Observable.Merge replacement. + public static IObservable UnsynchronizedMerge(this IObservable first, params IObservable[] others) => + Observable.Create(observer => + { + var remainingSources = others.Length + 1; + var subscriptions = new CompositeDisposable(remainingSources); + var terminated = 0; + + subscriptions.Add(first.SubscribeSafe(CreateInner())); + foreach (var source in others) + { + subscriptions.Add(source.SubscribeSafe(CreateInner())); + } + + return subscriptions; + + // Each source needs its own inner observer instance because Rx's ObserverBase sets + // a one-shot stopped flag on the first OnCompleted or OnError. A single shared + // observer would silently drop terminal notifications from every source after the + // first. The OnNext/OnError/OnCompleted actions close over the shared remainingSources + // and terminated counters so cross-source coordination still works. + IObserver CreateInner() => Observer.Create(OnNextSafe, OnErrorSafe, OnCompletedSafe); + + void OnNextSafe(T value) + { + if (Volatile.Read(ref terminated) == 0) + { + observer.OnNext(value); + } + } + + void OnErrorSafe(Exception error) + { + if (Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnError(error); + } + } + + void OnCompletedSafe() + { + if (Interlocked.Decrement(ref remainingSources) == 0 && Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnCompleted(); + } + } + }); + + // Two-input CombineLatest variant that does NOT install a gate. Functionally equivalent + // to Observable.CombineLatest: holds the most-recent value from each source, emits a + // resultSelector output whenever either source fires (provided the other has also fired + // at least once), the first error terminates, completes when both sources complete. + // + // Same precondition as UnsynchronizedMerge: delivery from BOTH sources must already be + // serialized through the same external gate before reaching this operator. In this library + // that is satisfied by routing both inputs through the same SharedDeliveryQueue via + // SynchronizeSafe(queue). Under that precondition no two OnNext calls overlap, so the + // latest-value state needs no internal locking, and the gate that + // Observable.CombineLatest installs becomes redundant. + // + // The Rx gate matters here for the same reason as Merge: Observable.CombineLatest holds + // its private _gate for the entire downstream delivery, and any operator-level lock held + // across a cross-cache write reconstructs the ABBA cycle the queue-drain design eliminated. + // + // Without the external serialization precondition, concurrent OnNext calls would race the + // latest-value state and could produce torn reads. Do not use as a general-purpose + // Observable.CombineLatest replacement. + public static IObservable UnsynchronizedCombineLatest( + this IObservable first, + IObservable second, + Func resultSelector) + where TFirst : notnull + where TSecond : notnull => + Observable.Create(observer => + { + var firstLatest = Optional.None(); + var secondLatest = Optional.None(); + var remainingSources = 2; + var terminated = 0; + + var subscriptions = new CompositeDisposable(2); + subscriptions.Add(first.SubscribeSafe(Observer.Create(OnFirstNext, OnErrorSafe, OnCompletedSafe))); + subscriptions.Add(second.SubscribeSafe(Observer.Create(OnSecondNext, OnErrorSafe, OnCompletedSafe))); + return subscriptions; + + void OnFirstNext(TFirst value) + { + if (Volatile.Read(ref terminated) != 0) + { + return; + } + + firstLatest = value; + if (secondLatest.HasValue) + { + observer.OnNext(resultSelector(value, secondLatest.Value)); + } + } + + void OnSecondNext(TSecond value) + { + if (Volatile.Read(ref terminated) != 0) + { + return; + } + + secondLatest = value; + if (firstLatest.HasValue) + { + observer.OnNext(resultSelector(firstLatest.Value, value)); + } + } + + void OnErrorSafe(Exception error) + { + if (Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnError(error); + } + } + + void OnCompletedSafe() + { + if (Interlocked.Decrement(ref remainingSources) == 0 && Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnCompleted(); + } + } + }); } diff --git a/src/DynamicData/List/Internal/Filter.Dynamic.cs b/src/DynamicData/List/Internal/Filter.Dynamic.cs index f8fded8c8..7b0b8c0dd 100644 --- a/src/DynamicData/List/Internal/Filter.Dynamic.cs +++ b/src/DynamicData/List/Internal/Filter.Dynamic.cs @@ -95,7 +95,7 @@ public IObservable> Run() => Observable.Create>( return Process(filtered, changes); }); - var publisher = predicateChanged.Merge(filteredResult).NotEmpty() + var publisher = predicateChanged.UnsynchronizedMerge(filteredResult).NotEmpty() .Select(changes => changes.Transform(iwm => iwm.Item)) // use convert, not transform .SubscribeSafe(observer); diff --git a/src/DynamicData/List/Internal/GroupOn.cs b/src/DynamicData/List/Internal/GroupOn.cs index 532fef5bf..70bb10b30 100644 --- a/src/DynamicData/List/Internal/GroupOn.cs +++ b/src/DynamicData/List/Internal/GroupOn.cs @@ -39,9 +39,9 @@ public IObservable>> Run() => Observable.C var regrouperFunc = _regrouper is null ? Observable.Never>>() : - _regrouper.SynchronizeSafe(queue).CombineLatest(shared.ToCollection(), (_, collection) => Regroup(groupings, groupCache, collection)); + _regrouper.SynchronizeSafe(queue).UnsynchronizedCombineLatest(shared.ToCollection(), (_, collection) => Regroup(groupings, groupCache, collection)); - var publisher = grouper.Merge(regrouperFunc).DisposeMany() // dispose removes as the grouping is disposable + var publisher = grouper.UnsynchronizedMerge(regrouperFunc).DisposeMany() // dispose removes as the grouping is disposable .NotEmpty().SubscribeSafe(observer); return new CompositeDisposable(publisher, shared.Connect(), queue); diff --git a/src/DynamicData/List/Internal/GroupOnImmutable.cs b/src/DynamicData/List/Internal/GroupOnImmutable.cs index bd6b6a5c0..dab161800 100644 --- a/src/DynamicData/List/Internal/GroupOnImmutable.cs +++ b/src/DynamicData/List/Internal/GroupOnImmutable.cs @@ -38,9 +38,9 @@ public IObservable>> Run() => Observabl var reGroupFunc = _reGrouper is null ? Observable.Never>>() : - _reGrouper.SynchronizeSafe(queue).CombineLatest(shared.ToCollection(), (_, collection) => Regroup(groupings, groupCache, collection)); + _reGrouper.SynchronizeSafe(queue).UnsynchronizedCombineLatest(shared.ToCollection(), (_, collection) => Regroup(groupings, groupCache, collection)); - var publisher = grouper.Merge(reGroupFunc).NotEmpty().SubscribeSafe(observer); + var publisher = grouper.UnsynchronizedMerge(reGroupFunc).NotEmpty().SubscribeSafe(observer); return new CompositeDisposable(publisher, shared.Connect(), queue); }); diff --git a/src/DynamicData/List/Internal/Pager.cs b/src/DynamicData/List/Internal/Pager.cs index 58a8dee23..448cde058 100644 --- a/src/DynamicData/List/Internal/Pager.cs +++ b/src/DynamicData/List/Internal/Pager.cs @@ -41,7 +41,7 @@ public IObservable> Run() => Observable.Create Page(all, paged, parameters, changes)); var publisher = requestStream - .Merge(dataChanged) + .UnsynchronizedMerge(dataChanged) .Where(changes => changes is not null && changes.Count != 0) .Select(x => x!) .SubscribeSafe(observer); diff --git a/src/DynamicData/List/Internal/Sort.cs b/src/DynamicData/List/Internal/Sort.cs index a6d223d4a..1c5629c62 100644 --- a/src/DynamicData/List/Internal/Sort.cs +++ b/src/DynamicData/List/Internal/Sort.cs @@ -43,7 +43,7 @@ public IObservable> Run() => Observable.Create>( var resortSync = _resort.SynchronizeSafe(queue).Select(_ => Reorder(target)); var changeComparer = _comparerObservable.SynchronizeSafe(queue).Select(comparer => ChangeComparer(target, comparer)); - var publisher = changeComparer.Merge(resortSync).Merge(dataChanged).Where(changes => changes.Count != 0).SubscribeSafe(observer); + var publisher = changeComparer.UnsynchronizedMerge(resortSync, dataChanged).Where(changes => changes.Count != 0).SubscribeSafe(observer); return new CompositeDisposable(publisher, queue); }); diff --git a/src/DynamicData/List/Internal/TransformMany.cs b/src/DynamicData/List/Internal/TransformMany.cs index 520001ff1..418e027be 100644 --- a/src/DynamicData/List/Internal/TransformMany.cs +++ b/src/DynamicData/List/Internal/TransformMany.cs @@ -146,7 +146,7 @@ private IObservable> CreateWithChangeSet() return result.CaptureChanges(); }); - var allChanges = init.Merge(subsequentSelection); + var allChanges = init.UnsynchronizedMerge(subsequentSelection); return new CompositeDisposable(allChanges.SubscribeSafe(observer), transformed.Connect(), queue); }); diff --git a/src/DynamicData/List/Internal/Virtualiser.cs b/src/DynamicData/List/Internal/Virtualiser.cs index ac7d63fcb..a6aa1b900 100644 --- a/src/DynamicData/List/Internal/Virtualiser.cs +++ b/src/DynamicData/List/Internal/Virtualiser.cs @@ -37,7 +37,7 @@ public IObservable> Run() => Observable.Create Virtualise(all, virtualised, parameters, changes)); - var publisher = requestStream.Merge(dataChanged).Where(changes => changes is not null && changes.Count != 0) + var publisher = requestStream.UnsynchronizedMerge(dataChanged).Where(changes => changes is not null && changes.Count != 0) .Select(x => x!) .Select(changes => new VirtualChangeSet(changes, new VirtualResponse(virtualised.Count, parameters.StartIndex, all.Count))).SubscribeSafe(observer); From c9a91555313e335dcd89f5c5bee204c9fd7ce63e Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Mon, 15 Jun 2026 07:38:02 -0700 Subject: [PATCH 12/12] List: replace remaining gate-holding Rx combinators per #1097 audit Closes the remaining ABBA-prone gate-holding Rx operators on the list side. Audit followed the matrix in PR #1097's body. Switch (List/Internal/Switch.cs): Was using Observable.Switch with the outer routed through SynchronizeSafe(queue) then PopulateInto(destination), which still installed Switch's internal gate and held it during downstream delivery. Refactored to mirror the cache-side Switch refactor: inline switch logic with SerialDisposable, both outer and inner routed through the same SharedDeliveryQueue, errors fed through a Subject>, and the final downstream merge of destination.Connect() and errors uses UnsynchronizedMerge. Also fixes the public Switch(IObservable>) overload because it delegates to the IObservable>> overload via .Switch() -> new Switch(sources).Run(). BufferIf: _pauseIfTrueSelector.Merge(timeoutSubject) is upstream of SynchronizeSafe(queue), so the merge inputs are NOT pre-serialized. UnsynchronizedMerge is unsafe here. Replaced with DeliveryQueueMerge from PR #1097's helper - it owns its own DeliveryQueue so notifications from both sources are serialized inside the merge itself, then released before downstream delivery to the .Concat -> .ObserveOn -> .SynchronizeSafe pipeline. Audited and skipped: - MergeChangeSets and DynamicCombiner use DD's MergeMany (operator on IObservable>), not Rx's Observable.Merge. Already deadlock-safe; not in #1097's gate list. - Combiner.Zip and FilterOnObservable.Join are Enumerable LINQ over ICollection/IEnumerable, not Observable.Zip/Join. No Rx gate involved. - Buffer (time-based), Throttle: per #1097 audit table, single-input gates protect internal buffer/throttle state. Left alone. After this commit, all gate-holding Rx combinators on the list side that sit downstream of SynchronizeSafe are eliminated. The full list of touched operators matches the cache-side coverage in #1097 modulo the cache-only ones (joins, AsyncDisposeMany, ObservableCache plumbing). --- src/DynamicData/List/Internal/BufferIf.cs | 2 +- src/DynamicData/List/Internal/Switch.cs | 46 ++++++++++++++++------- 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/src/DynamicData/List/Internal/BufferIf.cs b/src/DynamicData/List/Internal/BufferIf.cs index d2a123704..fb85adf4e 100644 --- a/src/DynamicData/List/Internal/BufferIf.cs +++ b/src/DynamicData/List/Internal/BufferIf.cs @@ -34,7 +34,7 @@ public IObservable> Run() => Observable.Create>( var timeoutSubscriber = new SerialDisposable(); var timeoutSubject = new Subject(); - var bufferSelector = Observable.Return(initialPauseState).Concat(_pauseIfTrueSelector.Merge(timeoutSubject)).ObserveOn(_scheduler).SynchronizeSafe(queue).Publish(); + var bufferSelector = Observable.Return(initialPauseState).Concat(_pauseIfTrueSelector.DeliveryQueueMerge(timeoutSubject)).ObserveOn(_scheduler).SynchronizeSafe(queue).Publish(); var pause = bufferSelector.Where(state => state).Subscribe( _ => diff --git a/src/DynamicData/List/Internal/Switch.cs b/src/DynamicData/List/Internal/Switch.cs index a8735cf91..a89d1ba7f 100644 --- a/src/DynamicData/List/Internal/Switch.cs +++ b/src/DynamicData/List/Internal/Switch.cs @@ -4,6 +4,7 @@ using System.Reactive.Disposables; using System.Reactive.Linq; +using System.Reactive.Subjects; using DynamicData.Internal; @@ -17,20 +18,37 @@ internal sealed class Switch(IObservable>> sources) public IObservable> Run() => Observable.Create>( observer => { - var destination = new SourceList(); - - // Shared queue serializes the source-of-sources signal (Clear on switch) - // with the inner changeset stream feeding the destination, without holding - // a lock during destination.Edit (which would otherwise nest with the - // destination's own queue and risk cross-cache deadlock). var queue = new SharedDeliveryQueue(); - - var populator = Observable.Switch( - _sources.SynchronizeSafe(queue).Do( - _ => destination.Clear())).SynchronizeSafe(queue).PopulateInto(destination); - - var publisher = destination.Connect().SubscribeSafe(observer); - - return new CompositeDisposable(destination, populator, publisher, queue); + var destination = new SourceList(); + var errors = new Subject>(); + var innerSubscription = new SerialDisposable(); + + // The outer (sources) and every inner are routed through the same SharedDeliveryQueue. + // Both the per-source clear and the per-changeset destination write happen on the drain + // thread, so destination.Connect() emissions and any errors.OnError calls also originate + // from inside the drain. The downstream merge therefore sees pre-serialized inputs and + // uses UnsynchronizedMerge to avoid the ABBA-prone Observable.Merge gate. Inlines what + // Observable.Switch would have done (whose own gate would itself be ABBA-prone). + var sourcesSubscription = _sources + .SynchronizeSafe(queue) + .SubscribeSafe( + onNext: newSource => + { + destination.Clear(); + innerSubscription.Disposable = newSource + .SynchronizeSafe(queue) + .SubscribeSafe( + onNext: changes => destination.Edit(updater => updater.Clone(changes)), + onError: errors.OnError); + }, + onError: errors.OnError); + + return new CompositeDisposable( + destination, + errors, + sourcesSubscription, + innerSubscription, + destination.Connect().UnsynchronizedMerge(errors).SubscribeSafe(observer), + queue); }); }