diff --git a/src/DynamicData.Tests/List/DeadlockTortureTest.cs b/src/DynamicData.Tests/List/DeadlockTortureTest.cs new file mode 100644 index 000000000..0abfad01a --- /dev/null +++ b/src/DynamicData.Tests/List/DeadlockTortureTest.cs @@ -0,0 +1,133 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System; +using System.Reactive.Subjects; +using System.Threading; +using System.Threading.Tasks; + +using DynamicData.Binding; +using DynamicData.Tests.Domain; + +using FluentAssertions; + +using Xunit; + +namespace DynamicData.Tests.List; + +/// +/// Deadlock torture test for list operators. Every operator that previously used +/// Synchronize(locker) (holding the lock during downstream delivery) is wired into a +/// bidirectional cross-list pipeline. Two threads write simultaneously, creating the ABBA +/// lock cycle: +/// Thread A: sourceA._locker -> operator lock -> PopulateInto -> sourceB._locker +/// Thread B: sourceB._locker -> operator lock -> PopulateInto -> sourceA._locker +/// +/// On origin/main (Synchronize(lock)): deadlocks reliably within seconds. +/// On the PR branch (SynchronizeSafe queue-drain): no deadlock possible. +/// +public sealed class DeadlockTortureTest +{ + private const int ItemCount = 200; + private const int Iterations = 50; + private const int TimeoutSeconds = 15; + + private static async Task RunBidirectionalDeadlockTest( + Func>, IObservable>> pipeline, + int iterations = Iterations) + { + for (var iter = 0; iter < iterations; iter++) + { + using var sourceA = new SourceList(); + using var sourceB = new SourceList(); + + // Filter prefixes prevent the feedback loop: items from A flowing into B keep their + // "A" prefix, so the B-to-A filter rejects them. Likewise for the reverse direction. + using var aToB = pipeline(sourceA.Connect().Filter(p => p.Name.StartsWith("A"))).PopulateInto(sourceB); + using var bToA = pipeline(sourceB.Connect().Filter(p => p.Name.StartsWith("B"))).PopulateInto(sourceA); + + using var barrier = new Barrier(2); + var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.Add(new Person("A-" + iter + "-" + i, i)); }); + var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.Add(new Person("B-" + iter + "-" + i, i)); }); + + var completed = Task.WhenAll(taskA, taskB); + if (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds))) != completed) + return false; + } + return true; + } + + [Fact] public async Task Sort_DoesNotDeadlock() => + (await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)))).Should().BeTrue(); + + [Fact] public async Task AutoRefresh_DoesNotDeadlock() => + (await RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age))).Should().BeTrue(); + + [Fact] public async Task Page_DoesNotDeadlock() + { + using var req = new BehaviorSubject(new PageRequest(1, 50)); + (await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(req))).Should().BeTrue(); + } + + [Fact] public async Task Virtualise_DoesNotDeadlock() + { + using var req = new BehaviorSubject(new VirtualRequest(0, 50)); + (await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(req))).Should().BeTrue(); + } + + [Fact] public async Task FilterDynamic_DoesNotDeadlock() + { + using var pred = new BehaviorSubject>(_ => true); + (await RunBidirectionalDeadlockTest(s => s.Filter(pred))).Should().BeTrue(); + } + + [Fact] public async Task BufferIf_DoesNotDeadlock() => + (await RunBidirectionalDeadlockTest(s => s.BufferIf(new BehaviorSubject(false), false, (TimeSpan?)null))).Should().BeTrue(); + + [Fact] public async Task DisposeMany_DoesNotDeadlock() => + (await RunBidirectionalDeadlockTest(s => s.DisposeMany())).Should().BeTrue(); + + [Fact] public async Task GroupOn_DoesNotDeadlock() => + (await RunBidirectionalDeadlockTest(s => s.GroupOn(p => p.Age % 3).MergeMany(g => g.List.Connect()))).Should().BeTrue(); + + [Fact] public async Task TransformMany_DoesNotDeadlock() => + (await RunBidirectionalDeadlockTest(s => s.TransformMany(p => new[] { p }))).Should().BeTrue(); + + [Fact] public async Task AllDangerous_Stacked_DoNotDeadlock() + { + using var pageReq = new BehaviorSubject(new PageRequest(1, 100)); + using var pred = new BehaviorSubject>(_ => true); + (await RunBidirectionalDeadlockTest( + s => s.AutoRefresh(p => p.Age) + .Filter(pred) + .DisposeMany() + .Sort(SortExpressionComparer.Ascending(p => p.Age)) + .Page(pageReq), + iterations: Iterations * 2)).Should().BeTrue(); + } + + [Fact] public async Task ThreeWayCircular_DoesNotDeadlock() + { + for (var iter = 0; iter < Iterations; iter++) + { + using var a = new SourceList(); + using var b = new SourceList(); + using var c = new SourceList(); + + using var ab = a.Connect().Filter(p => p.Name.StartsWith("A")).Sort(SortExpressionComparer.Ascending(p => p.Age)).PopulateInto(b); + using var bc = b.Connect().Filter(p => p.Name.StartsWith("A")).AutoRefresh(p => p.Age).PopulateInto(c); + using var ca = c.Connect().Filter(p => p.Name.StartsWith("A")).Transform(p => new Person("C-" + p.Name, p.Age)).Filter(p => p.Name.StartsWith("C")).PopulateInto(a); + + using var barrier = new Barrier(3); + var tasks = new[] + { + Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) a.Add(new Person("A-" + iter + "-" + i, i)); }), + Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) b.Add(new Person("B-" + iter + "-" + i, i)); }), + Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) c.Add(new Person("CC-" + iter + "-" + i, i)); }), + }; + var completed = Task.WhenAll(tasks); + (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds)))).Should().BeSameAs(completed, "iteration " + iter); + } + } +} diff --git a/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs new file mode 100644 index 000000000..43b3d94b3 --- /dev/null +++ b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs @@ -0,0 +1,58 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive; +using System.Reactive.Disposables; +using System.Reactive.Linq; + +namespace DynamicData.Internal; + +// Same-type Rx merge that owns a DeliveryQueue. Serializes notifications from +// every input through the queue, which releases its gate before delivering, so +// downstream observers that walk into another cache's writer lock cannot deadlock +// with this operator's serialization point. Used where every input has the same +// element type and no per-input projection is needed inside the drain. When element +// types differ or per-input projections are required, route each input through +// SharedDeliveryQueue with SynchronizeSafe and combine them with UnsynchronizedMerge. +internal static class DeliveryQueueMergeExtensions +{ + // Functionally equivalent to Observable.Merge: completes only after every source + // completes, the first error terminates, subscription occurs in argument order. + public static IObservable DeliveryQueueMerge(this IObservable first, params IObservable[] others) => + Observable.Create(observer => + { + var queue = new DeliveryQueue(observer); + var remainingSources = others.Length + 1; + var subscriptions = new CompositeDisposable(remainingSources + 1); + + subscriptions.Add(first.SubscribeSafe(CreateInner())); + foreach (var source in others) + { + subscriptions.Add(source.SubscribeSafe(CreateInner())); + } + + // Subscription first so any terminal notification produced during Rx's disposal + // cascade still flows through the still-active queue. Queue last as cleanup. + subscriptions.Add(queue); + return subscriptions; + + // Each source needs its own inner observer instance because Rx's ObserverBase + // sets a one-shot stopped flag on the first OnCompleted or OnError. A single + // shared observer would silently drop terminal notifications from every source + // after the first. OnNext and OnError forward straight to the queue (the queue's + // gate serializes concurrent calls). OnCompleted is counter-gated so only the + // last surviving source's completion terminates the merged stream. + IObserver CreateInner() => + Observer.Create( + queue.OnNext, + queue.OnError, + () => + { + if (Interlocked.Decrement(ref remainingSources) == 0) + { + queue.OnCompleted(); + } + }); + }); +} diff --git a/src/DynamicData/Internal/SynchronizeSafeExtensions.cs b/src/DynamicData/Internal/SynchronizeSafeExtensions.cs index ace8ed4a1..603f545c7 100644 --- a/src/DynamicData/Internal/SynchronizeSafeExtensions.cs +++ b/src/DynamicData/Internal/SynchronizeSafeExtensions.cs @@ -2,55 +2,45 @@ // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using System.Reactive; using System.Reactive.Disposables; using System.Reactive.Linq; namespace DynamicData.Internal; -/// -/// Provides SynchronizeSafe extension methods, drop-in replacements -/// for Synchronize(lock) that release the lock before downstream delivery. -/// -/// -/// Disposal ordering matters. CompositeDisposable disposes in -/// declaration order. The queue and the source subscription have different roles: -/// -/// -/// Subscription-first (gate and SDQ overloads) -/// The queue is the IObserver that the source sends notifications to. -/// Disposing the subscription first allows any final terminal notification (OnCompleted/OnError -/// triggered by Rx's disposal cascade or a Finally operator) to flow through the -/// still-active queue. The queue is disposed last as cleanup. -/// -/// -/// Queue-first (parameterless overload) -/// Used by operators with teardown side effects (DisposeMany, OnBeingRemoved). -/// The queue is terminated first via , which ensures -/// all in-flight deliveries complete before the subscription is disposed and teardown logic -/// (e.g., disposing removed items) runs. Terminal notifications are not needed because -/// the subscriber is explicitly tearing down. -/// -/// -/// +// Drop-in replacements for Observable.Synchronize(lock) that release the lock before +// downstream delivery, plus UnsynchronizedMerge for combining streams whose inputs are +// already serialized through the same queue. +// +// Disposal ordering matters. CompositeDisposable disposes in declaration order, and the +// queue and the source subscription have different roles: +// +// Subscription-first (gate and SDQ overloads): the queue is the IObserver that the +// source sends notifications to. Disposing the subscription first allows any final +// terminal notification (OnCompleted or OnError triggered by Rx's disposal cascade +// or a Finally operator) to flow through the still-active queue. The queue is +// disposed last as cleanup. +// +// Queue-first (parameterless overload): used by operators with teardown side effects +// (DisposeMany, OnBeingRemoved). The queue is terminated first via DeliveryQueue.Dispose, +// which ensures all in-flight deliveries complete before the subscription is disposed +// and teardown logic (e.g. disposing removed items) runs. Terminal notifications are +// not needed because the subscriber is explicitly tearing down. internal static class SynchronizeSafeExtensions { - /// - /// Synchronizes the source observable through a . - /// Use when multiple sources of different types share a gate. - /// + // Routes the source through a SharedDeliveryQueue. Use when multiple sources of + // different types share a gate. public static IObservable SynchronizeSafe(this IObservable source, SharedDeliveryQueue queue) => Observable.Create(observer => { var subQueue = queue.CreateQueue(observer); - // Subscription first: terminal notifications flow through the still-active sub-queue + // Subscription first: terminal notifications flow through the still-active sub-queue. return new CompositeDisposable(source.SubscribeSafe(subQueue), subQueue); }); - /// - /// Synchronizes the source observable through an implicitly created . - /// Drop-in replacement for Synchronize(locker). - /// + // Routes the source through an implicitly created DeliveryQueue. Drop-in replacement + // for Observable.Synchronize(locker). #if NET9_0_OR_GREATER public static IObservable SynchronizeSafe(this IObservable source, Lock gate) => #else @@ -60,22 +50,166 @@ public static IObservable SynchronizeSafe(this IObservable source, obje { var queue = new DeliveryQueue(gate, observer); - // Subscription first: terminal notifications flow through the still-active queue + // Subscription first: terminal notifications flow through the still-active queue. return new CompositeDisposable(source.SubscribeSafe(queue), queue); }); - /// - /// Synchronizes the source observable through an implicitly created - /// with automatic delivery completion on dispose. The queue is terminated and drained - /// before the source subscription is disposed, ensuring all in-flight notifications - /// are delivered before teardown. - /// + // Routes the source through an implicitly created DeliveryQueue with automatic + // delivery completion on dispose. The queue is terminated and drained before the + // source subscription is disposed, ensuring all in-flight notifications are delivered + // before teardown. public static IObservable SynchronizeSafe(this IObservable source) => Observable.Create(observer => { var queue = new DeliveryQueue(observer); - // Queue first: ensures in-flight deliveries complete before teardown side effects run + // Queue first: ensures in-flight deliveries complete before teardown side effects run. return new CompositeDisposable(queue, source.SubscribeSafe(queue)); }); + + // Merges every input into a single observable without taking any synchronization gate. + // Functionally equivalent to Observable.Merge: completes only after every source completes, + // the first error terminates, subscription occurs in argument order. + // + // The caller MUST ensure that delivery from every source is already serialized. In this + // library the precondition is satisfied by routing every source through the same + // SharedDeliveryQueue via SynchronizeSafe(queue). The shared queue's drain loop guarantees + // that at most one notification is in flight to the downstream observer at a time, so the + // additional gate that Observable.Merge would install is redundant. + // + // Removing that gate matters in cross-cache pipelines: Observable.Merge holds its private + // _gate for the entire duration of downstream delivery, and when downstream delivery walks + // into another cache's writer lock, two such gates on two operators form an ABBA cycle that + // the queue-drain design is meant to prevent. + // + // Without the external serialization precondition, concurrent OnNext calls into the shared + // observer will race. Do not use as a general-purpose Observable.Merge replacement. + public static IObservable UnsynchronizedMerge(this IObservable first, params IObservable[] others) => + Observable.Create(observer => + { + var remainingSources = others.Length + 1; + var subscriptions = new CompositeDisposable(remainingSources); + var terminated = 0; + + subscriptions.Add(first.SubscribeSafe(CreateInner())); + foreach (var source in others) + { + subscriptions.Add(source.SubscribeSafe(CreateInner())); + } + + return subscriptions; + + // Each source needs its own inner observer instance because Rx's ObserverBase sets + // a one-shot stopped flag on the first OnCompleted or OnError. A single shared + // observer would silently drop terminal notifications from every source after the + // first. The OnNext/OnError/OnCompleted actions close over the shared remainingSources + // and terminated counters so cross-source coordination still works. + IObserver CreateInner() => Observer.Create(OnNextSafe, OnErrorSafe, OnCompletedSafe); + + void OnNextSafe(T value) + { + if (Volatile.Read(ref terminated) == 0) + { + observer.OnNext(value); + } + } + + void OnErrorSafe(Exception error) + { + if (Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnError(error); + } + } + + void OnCompletedSafe() + { + if (Interlocked.Decrement(ref remainingSources) == 0 && Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnCompleted(); + } + } + }); + + // Two-input CombineLatest variant that does NOT install a gate. Functionally equivalent + // to Observable.CombineLatest: holds the most-recent value from each source, emits a + // resultSelector output whenever either source fires (provided the other has also fired + // at least once), the first error terminates, completes when both sources complete. + // + // Same precondition as UnsynchronizedMerge: delivery from BOTH sources must already be + // serialized through the same external gate before reaching this operator. In this library + // that is satisfied by routing both inputs through the same SharedDeliveryQueue via + // SynchronizeSafe(queue). Under that precondition no two OnNext calls overlap, so the + // latest-value state needs no internal locking, and the gate that + // Observable.CombineLatest installs becomes redundant. + // + // The Rx gate matters here for the same reason as Merge: Observable.CombineLatest holds + // its private _gate for the entire downstream delivery, and any operator-level lock held + // across a cross-cache write reconstructs the ABBA cycle the queue-drain design eliminated. + // + // Without the external serialization precondition, concurrent OnNext calls would race the + // latest-value state and could produce torn reads. Do not use as a general-purpose + // Observable.CombineLatest replacement. + public static IObservable UnsynchronizedCombineLatest( + this IObservable first, + IObservable second, + Func resultSelector) + where TFirst : notnull + where TSecond : notnull => + Observable.Create(observer => + { + var firstLatest = Optional.None(); + var secondLatest = Optional.None(); + var remainingSources = 2; + var terminated = 0; + + var subscriptions = new CompositeDisposable(2); + subscriptions.Add(first.SubscribeSafe(Observer.Create(OnFirstNext, OnErrorSafe, OnCompletedSafe))); + subscriptions.Add(second.SubscribeSafe(Observer.Create(OnSecondNext, OnErrorSafe, OnCompletedSafe))); + return subscriptions; + + void OnFirstNext(TFirst value) + { + if (Volatile.Read(ref terminated) != 0) + { + return; + } + + firstLatest = value; + if (secondLatest.HasValue) + { + observer.OnNext(resultSelector(value, secondLatest.Value)); + } + } + + void OnSecondNext(TSecond value) + { + if (Volatile.Read(ref terminated) != 0) + { + return; + } + + secondLatest = value; + if (firstLatest.HasValue) + { + observer.OnNext(resultSelector(firstLatest.Value, value)); + } + } + + void OnErrorSafe(Exception error) + { + if (Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnError(error); + } + } + + void OnCompletedSafe() + { + if (Interlocked.Decrement(ref remainingSources) == 0 && Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnCompleted(); + } + } + }); } diff --git a/src/DynamicData/List/Internal/BufferIf.cs b/src/DynamicData/List/Internal/BufferIf.cs index 8f7d1993c..fb85adf4e 100644 --- a/src/DynamicData/List/Internal/BufferIf.cs +++ b/src/DynamicData/List/Internal/BufferIf.cs @@ -7,6 +7,8 @@ using System.Reactive.Linq; using System.Reactive.Subjects; +using DynamicData.Internal; + namespace DynamicData.List.Internal; internal sealed class BufferIf(IObservable> source, IObservable pauseIfTrueSelector, bool initialPauseState = false, TimeSpan? timeOut = null, IScheduler? scheduler = null) @@ -23,13 +25,16 @@ internal sealed class BufferIf(IObservable> source, IObservable public IObservable> Run() => Observable.Create>( observer => { - var locker = InternalEx.NewLock(); + // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(locker) so the + // gate lock is released before downstream OnNext. Closes the cross-cache + // deadlock window. + var queue = new SharedDeliveryQueue(); var paused = initialPauseState; var buffer = new ChangeSet(); var timeoutSubscriber = new SerialDisposable(); var timeoutSubject = new Subject(); - var bufferSelector = Observable.Return(initialPauseState).Concat(_pauseIfTrueSelector.Merge(timeoutSubject)).ObserveOn(_scheduler).Synchronize(locker).Publish(); + var bufferSelector = Observable.Return(initialPauseState).Concat(_pauseIfTrueSelector.DeliveryQueueMerge(timeoutSubject)).ObserveOn(_scheduler).SynchronizeSafe(queue).Publish(); var pause = bufferSelector.Where(state => state).Subscribe( _ => @@ -61,7 +66,7 @@ public IObservable> Run() => Observable.Create>( timeoutSubscriber.Disposable = Disposable.Empty; }); - var updateSubscriber = _source.Synchronize(locker).Subscribe( + var updateSubscriber = _source.SynchronizeSafe(queue).Subscribe( updates => { if (paused) @@ -85,6 +90,7 @@ public IObservable> Run() => Observable.Create>( updateSubscriber.Dispose(); timeoutSubject.OnCompleted(); timeoutSubscriber.Dispose(); + queue.Dispose(); }); }); } diff --git a/src/DynamicData/List/Internal/Combiner.cs b/src/DynamicData/List/Internal/Combiner.cs index 64882a3ba..0c509a047 100644 --- a/src/DynamicData/List/Internal/Combiner.cs +++ b/src/DynamicData/List/Internal/Combiner.cs @@ -1,4 +1,4 @@ -// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. @@ -6,48 +6,47 @@ using System.Reactive.Linq; using DynamicData.Cache.Internal; +using DynamicData.Internal; namespace DynamicData.List.Internal; internal sealed class Combiner(ICollection>> source, CombineOperator type) where T : notnull { -#if NET9_0_OR_GREATER - private readonly Lock _locker = new(); -#else - private readonly object _locker = new(); -#endif - private readonly ICollection>> _source = source ?? throw new ArgumentNullException(nameof(source)); public IObservable> Run() => Observable.Create>( observer => { var disposable = new CompositeDisposable(); - var resultList = new ChangeAwareListWithRefCounts(); + var sourceLists = Enumerable.Range(0, _source.Count).Select(_ => new ReferenceCountTracker()).ToList(); - lock (_locker) - { - var sourceLists = Enumerable.Range(0, _source.Count).Select(_ => new ReferenceCountTracker()).ToList(); + // Shared queue serializes the multiple source streams so they appear as a + // single sequence to the combiner, but without holding a lock during the + // downstream observer.OnNext call. + var queue = new SharedDeliveryQueue(); - foreach (var pair in _source.Zip(sourceLists, (item, list) => new { Item = item, List = list })) - { - disposable.Add( - pair.Item.Synchronize(_locker).Subscribe( - changes => + foreach (var pair in _source.Zip(sourceLists, (item, list) => new { Item = item, List = list })) + { + disposable.Add( + pair.Item.SynchronizeSafe(queue).Subscribe( + changes => + { + CloneSourceList(pair.List, changes); + + var notifications = UpdateResultList(changes, sourceLists, resultList); + if (notifications.Count != 0) { - CloneSourceList(pair.List, changes); - - var notifications = UpdateResultList(changes, sourceLists, resultList); - if (notifications.Count != 0) - { - observer.OnNext(notifications); - } - })); - } + observer.OnNext(notifications); + } + }, + observer.OnError)); } + // Dispose subscriptions first (stops further notifications from arriving), + // then drain/terminate the shared queue. + disposable.Add(queue); return disposable; }); diff --git a/src/DynamicData/List/Internal/DisposeMany.cs b/src/DynamicData/List/Internal/DisposeMany.cs index 046e19e4d..cc7b21dd9 100644 --- a/src/DynamicData/List/Internal/DisposeMany.cs +++ b/src/DynamicData/List/Internal/DisposeMany.cs @@ -6,6 +6,8 @@ using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.List.Internal; internal sealed class DisposeMany(IObservable> source) @@ -14,11 +16,13 @@ internal sealed class DisposeMany(IObservable> source) public IObservable> Run() => Observable.Create>(observer => { - // Will be locking on cachedItems directly, instead of using an anonymous gate object. This is acceptable, since it's a privately-held object, there's no risk of deadlock from other consumers locking on it. var cachedItems = new List(); + // SynchronizeSafe with queue-first disposal: the DeliveryQueue is terminated and + // drained before the source subscription is disposed, ensuring no notifications + // can fire while the teardown (per-item Dispose) is in progress. var sourceSubscription = source - .Synchronize(cachedItems) + .SynchronizeSafe() .SubscribeSafe(Observer.Create>( onNext: changeSet => { @@ -76,11 +80,7 @@ public IObservable> Run() return Disposable.Create(() => { sourceSubscription.Dispose(); - - lock (cachedItems) - { - ProcessFinalization(cachedItems); - } + ProcessFinalization(cachedItems); }); }); diff --git a/src/DynamicData/List/Internal/DynamicCombiner.cs b/src/DynamicData/List/Internal/DynamicCombiner.cs index bb3091378..2d7dd9470 100644 --- a/src/DynamicData/List/Internal/DynamicCombiner.cs +++ b/src/DynamicData/List/Internal/DynamicCombiner.cs @@ -6,32 +6,32 @@ using System.Reactive.Linq; using DynamicData.Cache.Internal; +using DynamicData.Internal; namespace DynamicData.List.Internal; internal sealed class DynamicCombiner(IObservableList>> source, CombineOperator type) where T : notnull { -#if NET9_0_OR_GREATER - private readonly Lock _locker = new(); -#else - private readonly object _locker = new(); -#endif - private readonly IObservableList>> _source = source ?? throw new ArgumentNullException(nameof(source)); public IObservable> Run() => Observable.Create>( observer => { + // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(_locker) so the + // gate is released before downstream OnNext. Closes the cross-cache deadlock + // window. + var queue = new SharedDeliveryQueue(); + // this is the resulting list which produces all notifications var resultList = new ChangeAwareListWithRefCounts(); // Transform to a merge container. // This populates a RefTracker when the original source is subscribed to - var sourceLists = _source.Connect().Synchronize(_locker).Transform(changeSet => new MergeContainer(changeSet)).AsObservableList(); + var sourceLists = _source.Connect().SynchronizeSafe(queue).Transform(changeSet => new MergeContainer(changeSet)).AsObservableList(); // merge the items back together - var allChanges = sourceLists.Connect().MergeMany(mc => mc.Source).Synchronize(_locker).Subscribe( + var allChanges = sourceLists.Connect().MergeMany(mc => mc.Source).SynchronizeSafe(queue).Subscribe( changes => { // Populate result list and check for changes @@ -86,7 +86,7 @@ public IObservable> Run() => Observable.Create>( } }).Subscribe(); - return new CompositeDisposable(sourceLists, allChanges, removedItem, sourceChanged); + return new CompositeDisposable(sourceLists, allChanges, removedItem, sourceChanged, queue); }); private bool MatchesConstraint(MergeContainer[] sourceLists, T item) diff --git a/src/DynamicData/List/Internal/ExpireAfter.cs b/src/DynamicData/List/Internal/ExpireAfter.cs index 97a39c84e..ffc9e72d3 100644 --- a/src/DynamicData/List/Internal/ExpireAfter.cs +++ b/src/DynamicData/List/Internal/ExpireAfter.cs @@ -39,8 +39,15 @@ public static IObservable> Create( private abstract class SubscriptionBase : IDisposable { - private readonly List _expirationDueTimes; - private readonly List _expiringIndexesBuffer; + // Shadow of the source maintained from OnSourceNext, carrying both the item and its + // expiration time per occurrence. Item identity (by value) is used during expiration + // because the shadow may be stale relative to the source: the new SourceList uses + // queue-based delivery, so notifications drain after the producing Edit releases its + // lock. A concurrent ManageExpirations cannot rely on shadow indices matching source + // indices; matching by item value via updater.IndexOf tolerates this divergence and + // simply skips items that were already removed externally. + private readonly List _shadow; + private readonly List _expiringShadowIndexesBuffer; private readonly IObserver> _observer; private readonly Action> _onEditingSource; private readonly IScheduler _scheduler; @@ -65,8 +72,8 @@ protected SubscriptionBase( _onEditingSource = OnEditingSource; - _expirationDueTimes = new(); - _expiringIndexesBuffer = new(); + _shadow = new(); + _expiringShadowIndexesBuffer = new(); _sourceSubscription = source .Connect() @@ -94,7 +101,7 @@ protected IScheduler Scheduler // Instead of using a dedicated _synchronizationGate object, we can save an allocation by using any object that is never exposed to public consumers. protected object SynchronizationGate - => _expirationDueTimes; + => _shadow; protected abstract DateTimeOffset? GetNextManagementDueTime(); @@ -102,9 +109,9 @@ protected object SynchronizationGate { var result = null as DateTimeOffset?; - foreach (var dueTime in _expirationDueTimes) + foreach (var entry in _shadow) { - if ((dueTime is { } value) && ((result is null) || (value < result))) + if ((entry.DueTime is { } value) && ((result is null) || (value < result))) result = value; } @@ -141,40 +148,85 @@ private void OnEditingSource(IExtendedList updater) var now = Scheduler.Now; - // One major note here: we are NOT updating our internal state, except to mark items as no longer needing to expire. - // Once we're done with the source.Edit() here, it will fire of a changeset for the removals, which will get handled by OnSourceNext(), - // thus bringing all of our internal state back into sync. + // We are NOT updating our shadow here, except to mark items as no longer needing to expire. + // Once source.Edit() returns, the resulting changeset will reach OnSourceNext and bring _shadow + // back into sync. The shadow may have been stale on entry (concurrent edits could be queued + // behind a pending drain), so we identify removals by item value instead of by shadow index. - // Buffer removals, so we can eliminate the need for index adjustments as we update the source - for (var i = 0; i < _expirationDueTimes.Count; ++i) + for (var i = 0; i < _shadow.Count; ++i) { - if ((_expirationDueTimes[i] is { } dueTime) && (dueTime <= now)) + if ((_shadow[i].DueTime is { } dueTime) && (dueTime <= now)) { - _expiringIndexesBuffer.Add(i); + _expiringShadowIndexesBuffer.Add(i); - // This shouldn't be necessary, but it guarantees we don't accidentally expire an item more than once, - // in the event of a race condition or something we haven't predicted. - _expirationDueTimes[i] = null; + // Mark as processed so we don't expire the same shadow slot twice if reentered. + _shadow[i] = new ItemEntry(_shadow[i].Item, null); } } - // I'm pretty sure it shouldn't be possible to end up with no removals here, but it costs basically nothing to check. - if (_expiringIndexesBuffer.Count is not 0) + if (_expiringShadowIndexesBuffer.Count is not 0) { - // Processing removals in reverse-index order eliminates the need for us to adjust index of each .RemoveAt() call, as we go. - _expiringIndexesBuffer.Sort(static (x, y) => y.CompareTo(x)); + var removedItems = new List(_expiringShadowIndexesBuffer.Count); + + // Validate the shadow against the live updater all the way up to (and including) + // the LAST expiring index. The previous version of this code only checked the + // prefix up to the FIRST expiring index, which can falsely declare the shadow + // in sync when a concurrent external mutation has moved/replaced an item at a + // position between the first and last expiring index. The subsequent reverse- + // index removal would then remove an unrelated item. If the shadow is stale we + // fall back to value-based IndexOf, which may remove a different equal + // occurrence than originally scheduled but at least keeps the source consistent + // with what subscribers see. + var lastExpiringShadowIdx = _expiringShadowIndexesBuffer[_expiringShadowIndexesBuffer.Count - 1]; + var shadowInSync = _shadow.Count == updater.Count; + if (shadowInSync) + { + for (var i = 0; i <= lastExpiringShadowIdx && i < updater.Count; ++i) + { + if (!EqualityComparer.Default.Equals(_shadow[i].Item, updater[i])) + { + shadowInSync = false; + break; + } + } + } - var removedItems = new T[_expiringIndexesBuffer.Count]; - for (var i = 0; i < _expiringIndexesBuffer.Count; ++i) + if (shadowInSync) { - var removedIndex = _expiringIndexesBuffer[i]; - removedItems[i] = updater[removedIndex]; - updater.RemoveAt(removedIndex); + // Index-based removal in REVERSE shadow order so earlier indices stay + // valid as we remove later items. This matches the legacy behaviour and + // correctly distinguishes between equal duplicate occurrences with + // different expiration times. + for (var i = _expiringShadowIndexesBuffer.Count - 1; i >= 0; --i) + { + var shadowIdx = _expiringShadowIndexesBuffer[i]; + removedItems.Add(updater[shadowIdx]); + updater.RemoveAt(shadowIdx); + } + } + else + { + // Shadow is stale (concurrent external mutation has been queued but not + // yet delivered to OnSourceNext). Fall back to value-based search. + // Iterate shadow forward; for each expiring entry remove the first + // matching live occurrence. Silently skip items that have already been + // removed externally; the pending OnSourceNext will reconcile the shadow. + for (var i = 0; i < _expiringShadowIndexesBuffer.Count; ++i) + { + var item = _shadow[_expiringShadowIndexesBuffer[i]].Item; + var idx = updater.IndexOf(item); + if (idx >= 0) + { + updater.RemoveAt(idx); + removedItems.Add(item); + } + } } - _observer.OnNext(removedItems); + if (removedItems.Count > 0) + _observer.OnNext(removedItems); - _expiringIndexesBuffer.Clear(); + _expiringShadowIndexesBuffer.Clear(); } OnExpirationsManaged(thisScheduledManagement.DueTime); @@ -250,9 +302,9 @@ private void OnSourceNext(IChangeSet changes) { var dueTime = now + _timeSelector.Invoke(change.Item.Current); - _expirationDueTimes.Insert( + _shadow.Insert( index: change.Item.CurrentIndex, - item: dueTime); + item: new ItemEntry(change.Item.Current, dueTime)); haveExpirationDueTimesChanged |= dueTime is not null; } @@ -260,16 +312,16 @@ private void OnSourceNext(IChangeSet changes) case ListChangeReason.AddRange: { - _expirationDueTimes.EnsureCapacity(_expirationDueTimes.Count + change.Range.Count); + _shadow.EnsureCapacity(_shadow.Count + change.Range.Count); var itemIndex = change.Range.Index; foreach (var item in change.Range) { var dueTime = now + _timeSelector.Invoke(item); - _expirationDueTimes.Insert( + _shadow.Insert( index: itemIndex, - item: dueTime); + item: new ItemEntry(item, dueTime)); haveExpirationDueTimesChanged |= dueTime is not null; @@ -279,37 +331,37 @@ private void OnSourceNext(IChangeSet changes) break; case ListChangeReason.Clear: - foreach (var dueTime in _expirationDueTimes) + foreach (var entry in _shadow) { - if (dueTime is not null) + if (entry.DueTime is not null) { haveExpirationDueTimesChanged = true; break; } } - _expirationDueTimes.Clear(); + _shadow.Clear(); break; case ListChangeReason.Moved: { - var expirationDueTime = _expirationDueTimes[change.Item.PreviousIndex]; + var entry = _shadow[change.Item.PreviousIndex]; - _expirationDueTimes.RemoveAt(change.Item.PreviousIndex); - _expirationDueTimes.Insert( + _shadow.RemoveAt(change.Item.PreviousIndex); + _shadow.Insert( index: change.Item.CurrentIndex, - item: expirationDueTime); + item: entry); } break; case ListChangeReason.Remove: { - if (_expirationDueTimes[change.Item.CurrentIndex] is not null) + if (_shadow[change.Item.CurrentIndex].DueTime is not null) { haveExpirationDueTimesChanged = true; } - _expirationDueTimes.RemoveAt(change.Item.CurrentIndex); + _shadow.RemoveAt(change.Item.CurrentIndex); } break; @@ -318,25 +370,25 @@ private void OnSourceNext(IChangeSet changes) var rangeEndIndex = change.Range.Index + change.Range.Count - 1; for (var i = change.Range.Index; i <= rangeEndIndex; ++i) { - if (_expirationDueTimes[i] is not null) + if (_shadow[i].DueTime is not null) { haveExpirationDueTimesChanged = true; break; } } - _expirationDueTimes.RemoveRange(change.Range.Index, change.Range.Count); + _shadow.RemoveRange(change.Range.Index, change.Range.Count); } break; case ListChangeReason.Replace: { - var oldDueTime = _expirationDueTimes[change.Item.CurrentIndex]; + var oldDueTime = _shadow[change.Item.CurrentIndex].DueTime; var newDueTime = now + _timeSelector.Invoke(change.Item.Current); // Ignoring the possibility that the item's index has changed as well, because ISourceList does not allow for this. - _expirationDueTimes[change.Item.CurrentIndex] = newDueTime; + _shadow[change.Item.CurrentIndex] = new ItemEntry(change.Item.Current, newDueTime); haveExpirationDueTimesChanged |= newDueTime != oldDueTime; } @@ -369,6 +421,8 @@ private readonly record struct ScheduledManagement public required DateTimeOffset DueTime { get; init; } } + + private readonly record struct ItemEntry(T Item, DateTimeOffset? DueTime); } private sealed class OnDemandSubscription diff --git a/src/DynamicData/List/Internal/Filter.Dynamic.cs b/src/DynamicData/List/Internal/Filter.Dynamic.cs index 8d9b5e8dc..7b0b8c0dd 100644 --- a/src/DynamicData/List/Internal/Filter.Dynamic.cs +++ b/src/DynamicData/List/Internal/Filter.Dynamic.cs @@ -4,6 +4,8 @@ using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.List.Internal; internal static partial class Filter @@ -36,7 +38,10 @@ public Dynamic(IObservable> source, Func predicate, ListF public IObservable> Run() => Observable.Create>( observer => { - var locker = InternalEx.NewLock(); + // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(locker) so + // the gate lock is released before downstream OnNext. Closes the cross- + // cache deadlock window. + var queue = new SharedDeliveryQueue(); Func predicate = _ => false; var all = new List(); @@ -57,7 +62,7 @@ public IObservable> Run() => Observable.Create>( throw new InvalidOperationException("The predicates is not set and the change is not a immutableFilter."); } - predicateChanged = _predicates.Synchronize(locker).Select( + predicateChanged = _predicates.SynchronizeSafe(queue).Select( newPredicate => { predicate = newPredicate; @@ -72,7 +77,7 @@ public IObservable> Run() => Observable.Create>( */ // Need to get item by index and store it in the transform - var filteredResult = _source.Synchronize(locker).Transform( + var filteredResult = _source.SynchronizeSafe(queue).Transform( (t, previous) => { var wasMatch = previous.ConvertOr(p => p!.IsMatch, () => false); @@ -90,9 +95,11 @@ public IObservable> Run() => Observable.Create>( return Process(filtered, changes); }); - return predicateChanged.Merge(filteredResult).NotEmpty() + var publisher = predicateChanged.UnsynchronizedMerge(filteredResult).NotEmpty() .Select(changes => changes.Transform(iwm => iwm.Item)) // use convert, not transform .SubscribeSafe(observer); + + return new System.Reactive.Disposables.CompositeDisposable(publisher, queue); }); private static IChangeSet Process(ChangeAwareList filtered, IChangeSet changes) diff --git a/src/DynamicData/List/Internal/GroupOn.cs b/src/DynamicData/List/Internal/GroupOn.cs index 6ea7deccd..70bb10b30 100644 --- a/src/DynamicData/List/Internal/GroupOn.cs +++ b/src/DynamicData/List/Internal/GroupOn.cs @@ -6,6 +6,8 @@ using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.List.Internal; internal sealed class GroupOn(IObservable> source, Func groupSelector, IObservable? regrouper) @@ -27,19 +29,22 @@ public IObservable>> Run() => Observable.C // capture the grouping up front which has the benefit that the group key is only selected once var itemsWithGroup = _source.Transform((t, previous) => new ItemWithGroupKey(t, _groupSelector(t), previous.Convert(p => p.Group)), true); - var locker = InternalEx.NewLock(); - var shared = itemsWithGroup.Synchronize(locker).Publish(); + // Shared queue serializes the source changesets with the optional regrouper signal + // so they appear as a single sequence to the combiner, without holding a lock + // during the downstream observer.OnNext call. + var queue = new SharedDeliveryQueue(); + var shared = itemsWithGroup.SynchronizeSafe(queue).Publish(); var grouper = shared.Select(changes => Process(groupings, groupCache, changes)); var regrouperFunc = _regrouper is null ? Observable.Never>>() : - _regrouper.Synchronize(locker).CombineLatest(shared.ToCollection(), (_, collection) => Regroup(groupings, groupCache, collection)); + _regrouper.SynchronizeSafe(queue).UnsynchronizedCombineLatest(shared.ToCollection(), (_, collection) => Regroup(groupings, groupCache, collection)); - var publisher = grouper.Merge(regrouperFunc).DisposeMany() // dispose removes as the grouping is disposable + var publisher = grouper.UnsynchronizedMerge(regrouperFunc).DisposeMany() // dispose removes as the grouping is disposable .NotEmpty().SubscribeSafe(observer); - return new CompositeDisposable(publisher, shared.Connect()); + return new CompositeDisposable(publisher, shared.Connect(), queue); }); private static GroupWithAddIndicator GetCache(IDictionary> groupCaches, TGroupKey key) diff --git a/src/DynamicData/List/Internal/GroupOnImmutable.cs b/src/DynamicData/List/Internal/GroupOnImmutable.cs index 2b4a08be8..dab161800 100644 --- a/src/DynamicData/List/Internal/GroupOnImmutable.cs +++ b/src/DynamicData/List/Internal/GroupOnImmutable.cs @@ -6,6 +6,8 @@ using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.List.Internal; internal sealed class GroupOnImmutable(IObservable> source, Func groupSelector, IObservable? reGrouper) @@ -24,24 +26,23 @@ public IObservable>> Run() => Observabl var groupings = new ChangeAwareList>(); var groupCache = new Dictionary(); - // var itemsWithGroup = _source - // .Transform(t => new ItemWithValue(t, _groupSelector(t))); - // capture the grouping up front which has the benefit that the group key is only selected once var itemsWithGroup = _source.Transform((t, previous) => new ItemWithGroupKey(t, _groupSelector(t), previous.Convert(p => p.Group)), true); - var locker = InternalEx.NewLock(); - var shared = itemsWithGroup.Synchronize(locker).Publish(); + // Shared queue serializes the source changesets with the optional regrouper signal + // without holding a lock during downstream delivery. + var queue = new SharedDeliveryQueue(); + var shared = itemsWithGroup.SynchronizeSafe(queue).Publish(); var grouper = shared.Select(changes => Process(groupings, groupCache, changes)); var reGroupFunc = _reGrouper is null ? Observable.Never>>() : - _reGrouper.Synchronize(locker).CombineLatest(shared.ToCollection(), (_, collection) => Regroup(groupings, groupCache, collection)); + _reGrouper.SynchronizeSafe(queue).UnsynchronizedCombineLatest(shared.ToCollection(), (_, collection) => Regroup(groupings, groupCache, collection)); - var publisher = grouper.Merge(reGroupFunc).NotEmpty().SubscribeSafe(observer); + var publisher = grouper.UnsynchronizedMerge(reGroupFunc).NotEmpty().SubscribeSafe(observer); - return new CompositeDisposable(publisher, shared.Connect()); + return new CompositeDisposable(publisher, shared.Connect(), queue); }); private static IChangeSet> CreateChangeSet(ChangeAwareList> result, IDictionary allGroupings, IDictionary> initialStateOfGroups) diff --git a/src/DynamicData/List/Internal/LimitSizeTo.cs b/src/DynamicData/List/Internal/LimitSizeTo.cs index c2ed74896..f66baafe9 100644 --- a/src/DynamicData/List/Internal/LimitSizeTo.cs +++ b/src/DynamicData/List/Internal/LimitSizeTo.cs @@ -5,14 +5,11 @@ using System.Reactive.Concurrency; using System.Reactive.Linq; -namespace DynamicData.List.Internal; +using DynamicData.Internal; -#if NET9_0_OR_GREATER -internal sealed class LimitSizeTo(ISourceList sourceList, int sizeLimit, IScheduler scheduler, Lock locker) -#else -internal sealed class LimitSizeTo(ISourceList sourceList, int sizeLimit, IScheduler scheduler, object locker) -#endif +namespace DynamicData.List.Internal; +internal sealed class LimitSizeTo(ISourceList sourceList, int sizeLimit, IScheduler scheduler, SharedDeliveryQueue queue) where T : notnull { private readonly IScheduler _scheduler = scheduler ?? throw new ArgumentNullException(nameof(scheduler)); @@ -23,7 +20,7 @@ public IObservable> Run() var emptyResult = new List(); long orderItemWasAdded = -1; - return _sourceList.Connect().ObserveOn(_scheduler).Synchronize(locker).Transform(t => new ExpirableItem(t, _scheduler.Now.UtcDateTime, Interlocked.Increment(ref orderItemWasAdded))).ToCollection().Select( + return _sourceList.Connect().ObserveOn(_scheduler).SynchronizeSafe(queue).Transform(t => new ExpirableItem(t, _scheduler.Now.UtcDateTime, Interlocked.Increment(ref orderItemWasAdded))).ToCollection().Select( list => { var numberToExpire = list.Count - sizeLimit; diff --git a/src/DynamicData/List/Internal/MergeChangeSets.cs b/src/DynamicData/List/Internal/MergeChangeSets.cs index 480c69b3a..6ede1188c 100644 --- a/src/DynamicData/List/Internal/MergeChangeSets.cs +++ b/src/DynamicData/List/Internal/MergeChangeSets.cs @@ -3,8 +3,11 @@ // See the LICENSE file in the project root for full license information. using System.Reactive.Concurrency; +using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.List.Internal; /// @@ -21,19 +24,24 @@ public MergeChangeSets(IEnumerable>> source, IEq public IObservable> Run() => Observable.Create>( observer => { - var locker = InternalEx.NewLock(); + // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(locker) so the + // gate is released before downstream OnNext. Closes the cross-cache deadlock + // window. + var queue = new SharedDeliveryQueue(); // This is manages all of the changes var changeTracker = new ChangeSetMergeTracker(); // Merge all of the changeset streams together and Process them with the change tracker which will emit the results - return CreateClonedListObservable(source, locker) - .Synchronize(locker) + var publisher = CreateClonedListObservable(source, queue) + .SynchronizeSafe(queue) .MergeMany(clonedList => clonedList.Source.RemoveIndex().Do(static _ => { }, observer.OnError)) .Subscribe( changes => changeTracker.ProcessChangeSet(changes, observer), observer.OnError, observer.OnCompleted); + + return new CompositeDisposable(publisher, queue); }); private static IObservable>> CreateObservable(IEnumerable>> source, bool completable, IScheduler? scheduler) @@ -48,21 +56,10 @@ private static IObservable>> CreateObservable(IE return obs; } - // Can optimize for the Add case because that's the only one that applies -#if NET9_0_OR_GREATER - private Change> CreateChange(IObservable> source, Lock locker) => - new(ListChangeReason.Add, new ClonedListChangeSet(source.Synchronize(locker), equalityComparer)); + private Change> CreateChange(IObservable> source, SharedDeliveryQueue queue) => + new(ListChangeReason.Add, new ClonedListChangeSet(source.SynchronizeSafe(queue), equalityComparer)); // Create a ChangeSet Observable that produces ChangeSets with a single Add event for each new sub-observable - private IObservable>> CreateClonedListObservable(IObservable>> source, Lock locker) => - source.Select(src => new ChangeSet>(new[] { CreateChange(src, locker) })); -#else - private Change> CreateChange(IObservable> source, object locker) => - new(ListChangeReason.Add, new ClonedListChangeSet(source.Synchronize(locker), equalityComparer)); - - // Create a ChangeSet Observable that produces ChangeSets with a single Add event for each new sub-observable - private IObservable>> CreateClonedListObservable(IObservable>> source, object locker) => - source.Select(src => new ChangeSet>(new[] { CreateChange(src, locker) })); -#endif - + private IObservable>> CreateClonedListObservable(IObservable>> source, SharedDeliveryQueue queue) => + source.Select(src => new ChangeSet>(new[] { CreateChange(src, queue) })); } diff --git a/src/DynamicData/List/Internal/Pager.cs b/src/DynamicData/List/Internal/Pager.cs index bd3238d2a..448cde058 100644 --- a/src/DynamicData/List/Internal/Pager.cs +++ b/src/DynamicData/List/Internal/Pager.cs @@ -3,8 +3,11 @@ // See the LICENSE file in the project root for full license information. using System.Collections; +using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.List.Internal; internal sealed class Pager(IObservable> source, IObservable requests) @@ -17,13 +20,16 @@ internal sealed class Pager(IObservable> source, IObservable> Run() => Observable.Create>( observer => { - var locker = InternalEx.NewLock(); + // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(locker) so the + // gate lock is released before downstream OnNext. Closes the cross-cache + // deadlock window. + var queue = new SharedDeliveryQueue(); var all = new List(); var paged = new ChangeAwareList(); IPageRequest parameters = new PageRequest(0, 25); - var requestStream = _requests.Synchronize(locker).Select( + var requestStream = _requests.SynchronizeSafe(queue).Select( request => { parameters = request; @@ -31,14 +37,16 @@ public IObservable> Run() => Observable.Create Page(all, paged, parameters, changes)); - return requestStream - .Merge(dataChanged) + var publisher = requestStream + .UnsynchronizedMerge(dataChanged) .Where(changes => changes is not null && changes.Count != 0) .Select(x => x!) .SubscribeSafe(observer); + + return new CompositeDisposable(publisher, queue); }); private static int CalculatePages(ICollection all, IPageRequest? request) diff --git a/src/DynamicData/List/Internal/Sort.cs b/src/DynamicData/List/Internal/Sort.cs index ff03e2e70..1c5629c62 100644 --- a/src/DynamicData/List/Internal/Sort.cs +++ b/src/DynamicData/List/Internal/Sort.cs @@ -3,8 +3,11 @@ // See the LICENSE file in the project root for full license information. using System.Reactive; +using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.List.Internal; internal sealed class Sort(IObservable> source, IComparer? comparer, SortOptions sortOptions, IObservable? resort, IObservable>? comparerObservable, int resetThreshold) @@ -19,11 +22,15 @@ internal sealed class Sort(IObservable> source, IComparer? c public IObservable> Run() => Observable.Create>( observer => { - var locker = InternalEx.NewLock(); + // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(locker) so the + // lock is released before downstream OnNext. This closes the cross-cache + // deadlock window when a downstream consumer also acquires another cache's + // lock during its OnNext processing. + var queue = new SharedDeliveryQueue(); var original = new List(); var target = new ChangeAwareList(); - var dataChanged = _source.Synchronize(locker).Select( + var dataChanged = _source.SynchronizeSafe(queue).Select( changes => { if (resetThreshold > 1) @@ -33,10 +40,12 @@ public IObservable> Run() => Observable.Create>( return changes.TotalChanges > resetThreshold ? Reset(original, target) : Process(target, changes); }); - var resortSync = _resort.Synchronize(locker).Select(_ => Reorder(target)); - var changeComparer = _comparerObservable.Synchronize(locker).Select(comparer => ChangeComparer(target, comparer)); + var resortSync = _resort.SynchronizeSafe(queue).Select(_ => Reorder(target)); + var changeComparer = _comparerObservable.SynchronizeSafe(queue).Select(comparer => ChangeComparer(target, comparer)); + + var publisher = changeComparer.UnsynchronizedMerge(resortSync, dataChanged).Where(changes => changes.Count != 0).SubscribeSafe(observer); - return changeComparer.Merge(resortSync).Merge(dataChanged).Where(changes => changes.Count != 0).SubscribeSafe(observer); + return new CompositeDisposable(publisher, queue); }); private IChangeSet ChangeComparer(ChangeAwareList target, IComparer comparer) diff --git a/src/DynamicData/List/Internal/Switch.cs b/src/DynamicData/List/Internal/Switch.cs index 9425771dd..a89d1ba7f 100644 --- a/src/DynamicData/List/Internal/Switch.cs +++ b/src/DynamicData/List/Internal/Switch.cs @@ -4,6 +4,9 @@ using System.Reactive.Disposables; using System.Reactive.Linq; +using System.Reactive.Subjects; + +using DynamicData.Internal; namespace DynamicData.List.Internal; @@ -15,21 +18,37 @@ internal sealed class Switch(IObservable>> sources) public IObservable> Run() => Observable.Create>( observer => { - var locker = InternalEx.NewLock(); - + var queue = new SharedDeliveryQueue(); var destination = new SourceList(); + var errors = new Subject>(); + var innerSubscription = new SerialDisposable(); - var populator = Observable.Switch( - _sources.Do( - _ => + // The outer (sources) and every inner are routed through the same SharedDeliveryQueue. + // Both the per-source clear and the per-changeset destination write happen on the drain + // thread, so destination.Connect() emissions and any errors.OnError calls also originate + // from inside the drain. The downstream merge therefore sees pre-serialized inputs and + // uses UnsynchronizedMerge to avoid the ABBA-prone Observable.Merge gate. Inlines what + // Observable.Switch would have done (whose own gate would itself be ABBA-prone). + var sourcesSubscription = _sources + .SynchronizeSafe(queue) + .SubscribeSafe( + onNext: newSource => { - lock (locker) - { - destination.Clear(); - } - })).Synchronize(locker).PopulateInto(destination); + destination.Clear(); + innerSubscription.Disposable = newSource + .SynchronizeSafe(queue) + .SubscribeSafe( + onNext: changes => destination.Edit(updater => updater.Clone(changes)), + onError: errors.OnError); + }, + onError: errors.OnError); - var publisher = destination.Connect().SubscribeSafe(observer); - return new CompositeDisposable(destination, populator, publisher); + return new CompositeDisposable( + destination, + errors, + sourcesSubscription, + innerSubscription, + destination.Connect().UnsynchronizedMerge(errors).SubscribeSafe(observer), + queue); }); } diff --git a/src/DynamicData/List/Internal/TransformMany.cs b/src/DynamicData/List/Internal/TransformMany.cs index d7bb2a69e..418e027be 100644 --- a/src/DynamicData/List/Internal/TransformMany.cs +++ b/src/DynamicData/List/Internal/TransformMany.cs @@ -8,6 +8,7 @@ using System.Reactive.Linq; using DynamicData.Binding; +using DynamicData.Internal; namespace DynamicData.List.Internal; @@ -113,19 +114,23 @@ private IObservable> CreateWithChangeSet() { var result = new ChangeAwareList(); + // One shared queue for the whole operator. Each per-child stream, the initial-state + // stream, and the merged-children stream all attach as sub-queues, so there is a + // single drain point and downstream observer.OnNext is never invoked concurrently + // from any path. + var queue = new SharedDeliveryQueue(); + var transformed = _source.Transform( t => { - var locker = InternalEx.NewLock(); var collection = manySelector(t); - var changes = childChanges(t).Synchronize(locker).Skip(1); + var changes = childChanges(t).SynchronizeSafe(queue).Skip(1); return new ManyContainer(collection, changes); }).Publish(); - var outerLock = new object(); - var initial = transformed.Synchronize(outerLock).Select(changes => new ChangeSet(new DestinationEnumerator(changes, _equalityComparer))); + var initial = transformed.SynchronizeSafe(queue).Select(changes => new ChangeSet(new DestinationEnumerator(changes, _equalityComparer))); - var subsequent = transformed.MergeMany(x => x.Changes).Synchronize(outerLock); + var subsequent = transformed.MergeMany(x => x.Changes).SynchronizeSafe(queue); var init = initial.Select( changes => @@ -141,9 +146,9 @@ private IObservable> CreateWithChangeSet() return result.CaptureChanges(); }); - var allChanges = init.Merge(subsequentSelection); + var allChanges = init.UnsynchronizedMerge(subsequentSelection); - return new CompositeDisposable(allChanges.SubscribeSafe(observer), transformed.Connect()); + return new CompositeDisposable(allChanges.SubscribeSafe(observer), transformed.Connect(), queue); }); } diff --git a/src/DynamicData/List/Internal/Virtualiser.cs b/src/DynamicData/List/Internal/Virtualiser.cs index bb2f894c2..a6aa1b900 100644 --- a/src/DynamicData/List/Internal/Virtualiser.cs +++ b/src/DynamicData/List/Internal/Virtualiser.cs @@ -2,8 +2,11 @@ // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.List.Internal; internal sealed class Virtualiser(IObservable> source, IObservable requests) @@ -16,25 +19,29 @@ internal sealed class Virtualiser(IObservable> source, IObserva public IObservable> Run() => Observable.Create>( observer => { - var locker = InternalEx.NewLock(); + // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(locker) so the + // gate lock is released before downstream OnNext. Closes the cross-cache + // deadlock window. + var queue = new SharedDeliveryQueue(); var all = new List(); var virtualised = new ChangeAwareList(); IVirtualRequest parameters = new VirtualRequest(0, 25); - var requestStream = _requests.Synchronize(locker).Select( + var requestStream = _requests.SynchronizeSafe(queue).Select( request => { parameters = request; return CheckParamsAndVirtualise(all, virtualised, request); }); - var dataChanged = _source.Synchronize(locker).Select(changes => Virtualise(all, virtualised, parameters, changes)); + var dataChanged = _source.SynchronizeSafe(queue).Select(changes => Virtualise(all, virtualised, parameters, changes)); - // TODO: Remove this shared state stuff ie. _parameters - return requestStream.Merge(dataChanged).Where(changes => changes is not null && changes.Count != 0) + var publisher = requestStream.UnsynchronizedMerge(dataChanged).Where(changes => changes is not null && changes.Count != 0) .Select(x => x!) .Select(changes => new VirtualChangeSet(changes, new VirtualResponse(virtualised.Count, parameters.StartIndex, all.Count))).SubscribeSafe(observer); + + return new CompositeDisposable(publisher, queue); }); private static IChangeSet? CheckParamsAndVirtualise(IList all, ChangeAwareList virtualised, IVirtualRequest? request) diff --git a/src/DynamicData/List/ObservableListEx.Adapt.cs b/src/DynamicData/List/ObservableListEx.Adapt.cs index 942d943c7..d5dbaa19b 100644 --- a/src/DynamicData/List/ObservableListEx.Adapt.cs +++ b/src/DynamicData/List/ObservableListEx.Adapt.cs @@ -12,6 +12,7 @@ using System.Reactive.Linq; using DynamicData.Binding; using DynamicData.Cache.Internal; +using DynamicData.Internal; using DynamicData.List.Internal; using DynamicData.List.Linq; @@ -49,7 +50,7 @@ public static IObservable> Adapt(this IObservable observer => { var locker = InternalEx.NewLock(); - return source.Synchronize(locker).Select( + return source.SynchronizeSafe(locker).Select( changes => { adaptor.Adapt(changes); diff --git a/src/DynamicData/List/ObservableListEx.LimitSizeTo.cs b/src/DynamicData/List/ObservableListEx.LimitSizeTo.cs index fd01dcfa6..1a214bb6f 100644 --- a/src/DynamicData/List/ObservableListEx.LimitSizeTo.cs +++ b/src/DynamicData/List/ObservableListEx.LimitSizeTo.cs @@ -12,6 +12,7 @@ using System.Reactive.Linq; using DynamicData.Binding; using DynamicData.Cache.Internal; +using DynamicData.Internal; using DynamicData.List.Internal; using DynamicData.List.Linq; @@ -54,9 +55,18 @@ public static IObservable> LimitSizeTo(this ISourceList sou throw new ArgumentException("sizeLimit cannot be zero", nameof(sizeLimit)); } - var locker = InternalEx.NewLock(); - var limiter = new LimitSizeTo(source, sizeLimit, scheduler ?? GlobalConfig.DefaultScheduler, locker); + var effectiveScheduler = scheduler ?? GlobalConfig.DefaultScheduler; - return limiter.Run().Synchronize(locker).Do(source.RemoveMany); + // The shared queue must be created per subscription, not per call. Sharing one queue + // across all subscribers couples them and never disposes the queue. + return Observable.Create>(observer => + { + var queue = new SharedDeliveryQueue(); + var limiter = new LimitSizeTo(source, sizeLimit, effectiveScheduler, queue); + + var subscription = limiter.Run().SynchronizeSafe(queue).Do(source.RemoveMany).SubscribeSafe(observer); + + return new CompositeDisposable(subscription, queue); + }); } } diff --git a/src/DynamicData/List/SourceList.cs b/src/DynamicData/List/SourceList.cs index 3db42bc11..6dcd7b3a7 100644 --- a/src/DynamicData/List/SourceList.cs +++ b/src/DynamicData/List/SourceList.cs @@ -1,4 +1,4 @@ -// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. @@ -6,7 +6,9 @@ using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; +using System.Threading; +using DynamicData.Internal; using DynamicData.List.Internal; // ReSharper disable once CheckNamespace @@ -20,8 +22,10 @@ namespace DynamicData; public sealed class SourceList : ISourceList where T : notnull { - private readonly ISubject> _changes = new Subject>(); + [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "CA2213:Disposable fields should be disposed", Justification = "Terminated via OnCompleted/OnError delivered through _notifications; explicit Dispose would race with in-flight queue drains.")] + private readonly Subject> _changes = new(); + [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "CA2213:Disposable fields should be disposed", Justification = "Terminated via OnCompleted/OnError delivered through _notifications; explicit Dispose would race with in-flight queue drains.")] private readonly Subject> _changesPreview = new(); private readonly IDisposable _cleanUp; @@ -36,25 +40,36 @@ public sealed class SourceList : ISourceList private readonly ReaderWriter _readerWriter = new(); + [System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "CA2213:Disposable fields should be disposed", Justification = "Terminated via NotifyCompleted in _cleanUp")] + private readonly DeliveryQueue _notifications; + private int _editLevel; + private long _currentVersion; + + private long _currentDeliveryVersion; + + // Set true (under the queue gate) the instant a terminal notification is enqueued, so any + // Edit that lands between enqueue and the drain dequeueing the terminal can suppress its + // preview emission. Without this, a preview subscriber would observe a change whose main + // delivery is cleared from the queue when the terminal item is staged. + private bool _terminalEnqueued; + /// /// Initializes a new instance of the class. /// /// The source. public SourceList(IObservable>? source = null) { + _notifications = new DeliveryQueue(_locker, new ListUpdateObserver(this)); + var loader = source is null ? Disposable.Empty : LoadFromSource(source); _cleanUp = Disposable.Create( () => { loader.Dispose(); - OnCompleted(); - if (_countChanged.IsValueCreated) - { - _countChanged.Value.OnCompleted(); - } + NotifyCompleted(); }); } @@ -66,11 +81,15 @@ public SourceList(IObservable>? source = null) Observable.Create( observer => { - lock (_locker) - { - var source = _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged(); - return source.SubscribeSafe(observer); - } + using var readLock = _notifications.AcquireReadLock(); + + var snapshotVersion = _currentVersion; + var countChanged = readLock.HasPending + ? _countChanged.Value.SkipWhile(_ => Volatile.Read(ref _currentDeliveryVersion) <= snapshotVersion) + : _countChanged.Value; + + var source = countChanged.StartWith(_readerWriter.Count).DistinctUntilChanged(); + return source.SubscribeSafe(observer); }); /// @@ -82,21 +101,27 @@ public IObservable> Connect(Func? predicate = null) var observable = Observable.Create>( observer => { - lock (_locker) - { - if (_readerWriter.Items.Length > 0) - { - observer.OnNext( - new ChangeSet - { - new(ListChangeReason.AddRange, _readerWriter.Items, 0) - }); - } + using var readLock = _notifications.AcquireReadLock(); + + var snapshot = _readerWriter.Items; + var snapshotVersion = _currentVersion; - var source = _changes.Finally(observer.OnCompleted); + var changes = readLock.HasPending + ? _changes.SkipWhile(_ => Volatile.Read(ref _currentDeliveryVersion) <= snapshotVersion) + : (IObservable>)_changes; - return source.SubscribeSafe(observer); + IObservable> result; + if (snapshot.Length > 0) + { + var initial = new ChangeSet { new(ListChangeReason.AddRange, snapshot, 0) }; + result = Observable.Return((IChangeSet)initial).Concat(changes); + } + else + { + result = changes; } + + return result.SubscribeSafe(observer); }); if (predicate is not null) @@ -108,23 +133,21 @@ public IObservable> Connect(Func? predicate = null) } /// - public void Dispose() - { - _cleanUp.Dispose(); - _changesPreview.Dispose(); - } + public void Dispose() => _cleanUp.Dispose(); /// public void Edit(Action> updateAction) { updateAction.ThrowArgumentNullExceptionIfNull(nameof(updateAction)); - lock (_locker) - { - IChangeSet? changes = null; + using var notifications = _notifications.AcquireLock(); - _editLevel++; + IChangeSet? changes = null; + _editLevel++; + + try + { if (_editLevel == 1) { changes = _changesPreview.HasObservers ? _readerWriter.WriteWithPreview(updateAction, InvokeNextPreview) : _readerWriter.Write(updateAction); @@ -133,13 +156,15 @@ public void Edit(Action> updateAction) { _readerWriter.WriteNested(updateAction); } - + } + finally + { _editLevel--; + } - if (changes is not null && _editLevel == 0) - { - InvokeNext(changes); - } + if (changes is not null && changes.Count > 0 && _editLevel == 0) + { + notifications.EnqueueNext(new ListUpdate(changes, _readerWriter.Count, ++_currentVersion)); } } @@ -156,54 +181,89 @@ public IObservable> Preview(Func? predicate = null) return observable; } - private void InvokeNext(IChangeSet changes) + private void InvokeNextPreview(IChangeSet changes) { - if (changes.Count == 0) + if (changes.Count != 0 && !_terminalEnqueued && !_notifications.IsTerminated) { - return; + _changesPreview.OnNext(changes); } + } - lock (_locker) - { - _changes.OnNext(changes); - - if (_countChanged.IsValueCreated) + private IDisposable LoadFromSource(IObservable> source) => + source.Subscribe( + changeSet => { - _countChanged.Value.OnNext(_readerWriter.Count); - } - } - } + IChangeSet? changes = null; + try + { + using var notifications = _notifications.AcquireLock(); + changes = _readerWriter.Write(changeSet); - private void InvokeNextPreview(IChangeSet changes) + if (changes.Count > 0) + { + notifications.EnqueueNext(new ListUpdate(changes, _readerWriter.Count, ++_currentVersion)); + } + } + catch (Exception ex) + { + // Convert ReaderWriter / Write exceptions into a downstream OnError. + // Without this, exceptions thrown by Write would escape the source's + // observer callback and leave subscribers unterminated. + NotifyError(ex); + } + }, + NotifyError, + NotifyCompleted); + + private void NotifyCompleted() { - if (changes.Count == 0) - { - return; - } + using var notifications = _notifications.AcquireLock(); + _terminalEnqueued = true; + notifications.EnqueueCompleted(); + } - lock (_locker) - { - _changesPreview.OnNext(changes); - } + private void NotifyError(Exception exception) + { + using var notifications = _notifications.AcquireLock(); + _terminalEnqueued = true; + notifications.EnqueueError(exception); } - private IDisposable LoadFromSource(IObservable> source) => source.Synchronize(_locker).Finally(OnCompleted).Select(_readerWriter.Write).Subscribe(InvokeNext, OnError, OnCompleted); + private readonly record struct ListUpdate(IChangeSet Changes, int Count, long Version); - private void OnCompleted() + private sealed class ListUpdateObserver(SourceList source) : IObserver { - lock (_locker) + public void OnNext(ListUpdate value) { - _changesPreview.OnCompleted(); - _changes.OnCompleted(); + Volatile.Write(ref source._currentDeliveryVersion, value.Version); + source._changes.OnNext(value.Changes); + + if (source._countChanged.IsValueCreated) + { + source._countChanged.Value.OnNext(value.Count); + } } - } - private void OnError(Exception exception) - { - lock (_locker) + public void OnError(Exception error) { - _changesPreview.OnError(exception); - _changes.OnError(exception); + source._changesPreview.OnError(error); + source._changes.OnError(error); + + if (source._countChanged.IsValueCreated) + { + source._countChanged.Value.OnError(error); + } + } + + public void OnCompleted() + { + source._changes.OnCompleted(); + source._changesPreview.OnCompleted(); + + if (source._countChanged.IsValueCreated) + { + source._countChanged.Value.OnCompleted(); + } } } }