diff --git a/src/DynamicData.Tests/List/DeadlockTortureTest.cs b/src/DynamicData.Tests/List/DeadlockTortureTest.cs
new file mode 100644
index 000000000..0abfad01a
--- /dev/null
+++ b/src/DynamicData.Tests/List/DeadlockTortureTest.cs
@@ -0,0 +1,133 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System;
+using System.Reactive.Subjects;
+using System.Threading;
+using System.Threading.Tasks;
+
+using DynamicData.Binding;
+using DynamicData.Tests.Domain;
+
+using FluentAssertions;
+
+using Xunit;
+
+namespace DynamicData.Tests.List;
+
+///
+/// Deadlock torture test for list operators. Every operator that previously used
+/// Synchronize(locker) (holding the lock during downstream delivery) is wired into a
+/// bidirectional cross-list pipeline. Two threads write simultaneously, creating the ABBA
+/// lock cycle:
+/// Thread A: sourceA._locker -> operator lock -> PopulateInto -> sourceB._locker
+/// Thread B: sourceB._locker -> operator lock -> PopulateInto -> sourceA._locker
+///
+/// On origin/main (Synchronize(lock)): deadlocks reliably within seconds.
+/// On the PR branch (SynchronizeSafe queue-drain): no deadlock possible.
+///
+public sealed class DeadlockTortureTest
+{
+ private const int ItemCount = 200;
+ private const int Iterations = 50;
+ private const int TimeoutSeconds = 15;
+
+ private static async Task RunBidirectionalDeadlockTest(
+ Func>, IObservable>> pipeline,
+ int iterations = Iterations)
+ {
+ for (var iter = 0; iter < iterations; iter++)
+ {
+ using var sourceA = new SourceList();
+ using var sourceB = new SourceList();
+
+ // Filter prefixes prevent the feedback loop: items from A flowing into B keep their
+ // "A" prefix, so the B-to-A filter rejects them. Likewise for the reverse direction.
+ using var aToB = pipeline(sourceA.Connect().Filter(p => p.Name.StartsWith("A"))).PopulateInto(sourceB);
+ using var bToA = pipeline(sourceB.Connect().Filter(p => p.Name.StartsWith("B"))).PopulateInto(sourceA);
+
+ using var barrier = new Barrier(2);
+ var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.Add(new Person("A-" + iter + "-" + i, i)); });
+ var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.Add(new Person("B-" + iter + "-" + i, i)); });
+
+ var completed = Task.WhenAll(taskA, taskB);
+ if (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds))) != completed)
+ return false;
+ }
+ return true;
+ }
+
+ [Fact] public async Task Sort_DoesNotDeadlock() =>
+ (await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)))).Should().BeTrue();
+
+ [Fact] public async Task AutoRefresh_DoesNotDeadlock() =>
+ (await RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age))).Should().BeTrue();
+
+ [Fact] public async Task Page_DoesNotDeadlock()
+ {
+ using var req = new BehaviorSubject(new PageRequest(1, 50));
+ (await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(req))).Should().BeTrue();
+ }
+
+ [Fact] public async Task Virtualise_DoesNotDeadlock()
+ {
+ using var req = new BehaviorSubject(new VirtualRequest(0, 50));
+ (await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(req))).Should().BeTrue();
+ }
+
+ [Fact] public async Task FilterDynamic_DoesNotDeadlock()
+ {
+ using var pred = new BehaviorSubject>(_ => true);
+ (await RunBidirectionalDeadlockTest(s => s.Filter(pred))).Should().BeTrue();
+ }
+
+ [Fact] public async Task BufferIf_DoesNotDeadlock() =>
+ (await RunBidirectionalDeadlockTest(s => s.BufferIf(new BehaviorSubject(false), false, (TimeSpan?)null))).Should().BeTrue();
+
+ [Fact] public async Task DisposeMany_DoesNotDeadlock() =>
+ (await RunBidirectionalDeadlockTest(s => s.DisposeMany())).Should().BeTrue();
+
+ [Fact] public async Task GroupOn_DoesNotDeadlock() =>
+ (await RunBidirectionalDeadlockTest(s => s.GroupOn(p => p.Age % 3).MergeMany(g => g.List.Connect()))).Should().BeTrue();
+
+ [Fact] public async Task TransformMany_DoesNotDeadlock() =>
+ (await RunBidirectionalDeadlockTest(s => s.TransformMany(p => new[] { p }))).Should().BeTrue();
+
+ [Fact] public async Task AllDangerous_Stacked_DoNotDeadlock()
+ {
+ using var pageReq = new BehaviorSubject(new PageRequest(1, 100));
+ using var pred = new BehaviorSubject>(_ => true);
+ (await RunBidirectionalDeadlockTest(
+ s => s.AutoRefresh(p => p.Age)
+ .Filter(pred)
+ .DisposeMany()
+ .Sort(SortExpressionComparer.Ascending(p => p.Age))
+ .Page(pageReq),
+ iterations: Iterations * 2)).Should().BeTrue();
+ }
+
+ [Fact] public async Task ThreeWayCircular_DoesNotDeadlock()
+ {
+ for (var iter = 0; iter < Iterations; iter++)
+ {
+ using var a = new SourceList();
+ using var b = new SourceList();
+ using var c = new SourceList();
+
+ using var ab = a.Connect().Filter(p => p.Name.StartsWith("A")).Sort(SortExpressionComparer.Ascending(p => p.Age)).PopulateInto(b);
+ using var bc = b.Connect().Filter(p => p.Name.StartsWith("A")).AutoRefresh(p => p.Age).PopulateInto(c);
+ using var ca = c.Connect().Filter(p => p.Name.StartsWith("A")).Transform(p => new Person("C-" + p.Name, p.Age)).Filter(p => p.Name.StartsWith("C")).PopulateInto(a);
+
+ using var barrier = new Barrier(3);
+ var tasks = new[]
+ {
+ Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) a.Add(new Person("A-" + iter + "-" + i, i)); }),
+ Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) b.Add(new Person("B-" + iter + "-" + i, i)); }),
+ Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) c.Add(new Person("CC-" + iter + "-" + i, i)); }),
+ };
+ var completed = Task.WhenAll(tasks);
+ (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds)))).Should().BeSameAs(completed, "iteration " + iter);
+ }
+ }
+}
diff --git a/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs
new file mode 100644
index 000000000..43b3d94b3
--- /dev/null
+++ b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs
@@ -0,0 +1,58 @@
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Roland Pheasant licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for full license information.
+
+using System.Reactive;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+
+namespace DynamicData.Internal;
+
+// Same-type Rx merge that owns a DeliveryQueue. Serializes notifications from
+// every input through the queue, which releases its gate before delivering, so
+// downstream observers that walk into another cache's writer lock cannot deadlock
+// with this operator's serialization point. Used where every input has the same
+// element type and no per-input projection is needed inside the drain. When element
+// types differ or per-input projections are required, route each input through
+// SharedDeliveryQueue with SynchronizeSafe and combine them with UnsynchronizedMerge.
+internal static class DeliveryQueueMergeExtensions
+{
+ // Functionally equivalent to Observable.Merge: completes only after every source
+ // completes, the first error terminates, subscription occurs in argument order.
+ public static IObservable DeliveryQueueMerge(this IObservable first, params IObservable[] others) =>
+ Observable.Create(observer =>
+ {
+ var queue = new DeliveryQueue(observer);
+ var remainingSources = others.Length + 1;
+ var subscriptions = new CompositeDisposable(remainingSources + 1);
+
+ subscriptions.Add(first.SubscribeSafe(CreateInner()));
+ foreach (var source in others)
+ {
+ subscriptions.Add(source.SubscribeSafe(CreateInner()));
+ }
+
+ // Subscription first so any terminal notification produced during Rx's disposal
+ // cascade still flows through the still-active queue. Queue last as cleanup.
+ subscriptions.Add(queue);
+ return subscriptions;
+
+ // Each source needs its own inner observer instance because Rx's ObserverBase
+ // sets a one-shot stopped flag on the first OnCompleted or OnError. A single
+ // shared observer would silently drop terminal notifications from every source
+ // after the first. OnNext and OnError forward straight to the queue (the queue's
+ // gate serializes concurrent calls). OnCompleted is counter-gated so only the
+ // last surviving source's completion terminates the merged stream.
+ IObserver CreateInner() =>
+ Observer.Create(
+ queue.OnNext,
+ queue.OnError,
+ () =>
+ {
+ if (Interlocked.Decrement(ref remainingSources) == 0)
+ {
+ queue.OnCompleted();
+ }
+ });
+ });
+}
diff --git a/src/DynamicData/Internal/SynchronizeSafeExtensions.cs b/src/DynamicData/Internal/SynchronizeSafeExtensions.cs
index ace8ed4a1..603f545c7 100644
--- a/src/DynamicData/Internal/SynchronizeSafeExtensions.cs
+++ b/src/DynamicData/Internal/SynchronizeSafeExtensions.cs
@@ -2,55 +2,45 @@
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.
+using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
namespace DynamicData.Internal;
-///
-/// Provides SynchronizeSafe extension methods, drop-in replacements
-/// for Synchronize(lock) that release the lock before downstream delivery.
-///
-///
-/// Disposal ordering matters. CompositeDisposable disposes in
-/// declaration order. The queue and the source subscription have different roles:
-///
-/// -
-/// Subscription-first (gate and SDQ overloads)
-/// The queue is the IObserver that the source sends notifications to.
-/// Disposing the subscription first allows any final terminal notification (OnCompleted/OnError
-/// triggered by Rx's disposal cascade or a Finally operator) to flow through the
-/// still-active queue. The queue is disposed last as cleanup.
-///
-/// -
-/// Queue-first (parameterless overload)
-/// Used by operators with teardown side effects (DisposeMany, OnBeingRemoved).
-/// The queue is terminated first via , which ensures
-/// all in-flight deliveries complete before the subscription is disposed and teardown logic
-/// (e.g., disposing removed items) runs. Terminal notifications are not needed because
-/// the subscriber is explicitly tearing down.
-///
-///
-///
+// Drop-in replacements for Observable.Synchronize(lock) that release the lock before
+// downstream delivery, plus UnsynchronizedMerge for combining streams whose inputs are
+// already serialized through the same queue.
+//
+// Disposal ordering matters. CompositeDisposable disposes in declaration order, and the
+// queue and the source subscription have different roles:
+//
+// Subscription-first (gate and SDQ overloads): the queue is the IObserver that the
+// source sends notifications to. Disposing the subscription first allows any final
+// terminal notification (OnCompleted or OnError triggered by Rx's disposal cascade
+// or a Finally operator) to flow through the still-active queue. The queue is
+// disposed last as cleanup.
+//
+// Queue-first (parameterless overload): used by operators with teardown side effects
+// (DisposeMany, OnBeingRemoved). The queue is terminated first via DeliveryQueue.Dispose,
+// which ensures all in-flight deliveries complete before the subscription is disposed
+// and teardown logic (e.g. disposing removed items) runs. Terminal notifications are
+// not needed because the subscriber is explicitly tearing down.
internal static class SynchronizeSafeExtensions
{
- ///
- /// Synchronizes the source observable through a .
- /// Use when multiple sources of different types share a gate.
- ///
+ // Routes the source through a SharedDeliveryQueue. Use when multiple sources of
+ // different types share a gate.
public static IObservable SynchronizeSafe(this IObservable source, SharedDeliveryQueue queue) =>
Observable.Create(observer =>
{
var subQueue = queue.CreateQueue(observer);
- // Subscription first: terminal notifications flow through the still-active sub-queue
+ // Subscription first: terminal notifications flow through the still-active sub-queue.
return new CompositeDisposable(source.SubscribeSafe(subQueue), subQueue);
});
- ///
- /// Synchronizes the source observable through an implicitly created .
- /// Drop-in replacement for Synchronize(locker).
- ///
+ // Routes the source through an implicitly created DeliveryQueue. Drop-in replacement
+ // for Observable.Synchronize(locker).
#if NET9_0_OR_GREATER
public static IObservable SynchronizeSafe(this IObservable source, Lock gate) =>
#else
@@ -60,22 +50,166 @@ public static IObservable SynchronizeSafe(this IObservable source, obje
{
var queue = new DeliveryQueue(gate, observer);
- // Subscription first: terminal notifications flow through the still-active queue
+ // Subscription first: terminal notifications flow through the still-active queue.
return new CompositeDisposable(source.SubscribeSafe(queue), queue);
});
- ///
- /// Synchronizes the source observable through an implicitly created
- /// with automatic delivery completion on dispose. The queue is terminated and drained
- /// before the source subscription is disposed, ensuring all in-flight notifications
- /// are delivered before teardown.
- ///
+ // Routes the source through an implicitly created DeliveryQueue with automatic
+ // delivery completion on dispose. The queue is terminated and drained before the
+ // source subscription is disposed, ensuring all in-flight notifications are delivered
+ // before teardown.
public static IObservable SynchronizeSafe(this IObservable source) =>
Observable.Create(observer =>
{
var queue = new DeliveryQueue(observer);
- // Queue first: ensures in-flight deliveries complete before teardown side effects run
+ // Queue first: ensures in-flight deliveries complete before teardown side effects run.
return new CompositeDisposable(queue, source.SubscribeSafe(queue));
});
+
+ // Merges every input into a single observable without taking any synchronization gate.
+ // Functionally equivalent to Observable.Merge: completes only after every source completes,
+ // the first error terminates, subscription occurs in argument order.
+ //
+ // The caller MUST ensure that delivery from every source is already serialized. In this
+ // library the precondition is satisfied by routing every source through the same
+ // SharedDeliveryQueue via SynchronizeSafe(queue). The shared queue's drain loop guarantees
+ // that at most one notification is in flight to the downstream observer at a time, so the
+ // additional gate that Observable.Merge would install is redundant.
+ //
+ // Removing that gate matters in cross-cache pipelines: Observable.Merge holds its private
+ // _gate for the entire duration of downstream delivery, and when downstream delivery walks
+ // into another cache's writer lock, two such gates on two operators form an ABBA cycle that
+ // the queue-drain design is meant to prevent.
+ //
+ // Without the external serialization precondition, concurrent OnNext calls into the shared
+ // observer will race. Do not use as a general-purpose Observable.Merge replacement.
+ public static IObservable UnsynchronizedMerge(this IObservable first, params IObservable[] others) =>
+ Observable.Create(observer =>
+ {
+ var remainingSources = others.Length + 1;
+ var subscriptions = new CompositeDisposable(remainingSources);
+ var terminated = 0;
+
+ subscriptions.Add(first.SubscribeSafe(CreateInner()));
+ foreach (var source in others)
+ {
+ subscriptions.Add(source.SubscribeSafe(CreateInner()));
+ }
+
+ return subscriptions;
+
+ // Each source needs its own inner observer instance because Rx's ObserverBase sets
+ // a one-shot stopped flag on the first OnCompleted or OnError. A single shared
+ // observer would silently drop terminal notifications from every source after the
+ // first. The OnNext/OnError/OnCompleted actions close over the shared remainingSources
+ // and terminated counters so cross-source coordination still works.
+ IObserver CreateInner() => Observer.Create(OnNextSafe, OnErrorSafe, OnCompletedSafe);
+
+ void OnNextSafe(T value)
+ {
+ if (Volatile.Read(ref terminated) == 0)
+ {
+ observer.OnNext(value);
+ }
+ }
+
+ void OnErrorSafe(Exception error)
+ {
+ if (Interlocked.Exchange(ref terminated, 1) == 0)
+ {
+ observer.OnError(error);
+ }
+ }
+
+ void OnCompletedSafe()
+ {
+ if (Interlocked.Decrement(ref remainingSources) == 0 && Interlocked.Exchange(ref terminated, 1) == 0)
+ {
+ observer.OnCompleted();
+ }
+ }
+ });
+
+ // Two-input CombineLatest variant that does NOT install a gate. Functionally equivalent
+ // to Observable.CombineLatest: holds the most-recent value from each source, emits a
+ // resultSelector output whenever either source fires (provided the other has also fired
+ // at least once), the first error terminates, completes when both sources complete.
+ //
+ // Same precondition as UnsynchronizedMerge: delivery from BOTH sources must already be
+ // serialized through the same external gate before reaching this operator. In this library
+ // that is satisfied by routing both inputs through the same SharedDeliveryQueue via
+ // SynchronizeSafe(queue). Under that precondition no two OnNext calls overlap, so the
+ // latest-value state needs no internal locking, and the gate that
+ // Observable.CombineLatest installs becomes redundant.
+ //
+ // The Rx gate matters here for the same reason as Merge: Observable.CombineLatest holds
+ // its private _gate for the entire downstream delivery, and any operator-level lock held
+ // across a cross-cache write reconstructs the ABBA cycle the queue-drain design eliminated.
+ //
+ // Without the external serialization precondition, concurrent OnNext calls would race the
+ // latest-value state and could produce torn reads. Do not use as a general-purpose
+ // Observable.CombineLatest replacement.
+ public static IObservable UnsynchronizedCombineLatest(
+ this IObservable first,
+ IObservable second,
+ Func resultSelector)
+ where TFirst : notnull
+ where TSecond : notnull =>
+ Observable.Create(observer =>
+ {
+ var firstLatest = Optional.None();
+ var secondLatest = Optional.None();
+ var remainingSources = 2;
+ var terminated = 0;
+
+ var subscriptions = new CompositeDisposable(2);
+ subscriptions.Add(first.SubscribeSafe(Observer.Create(OnFirstNext, OnErrorSafe, OnCompletedSafe)));
+ subscriptions.Add(second.SubscribeSafe(Observer.Create(OnSecondNext, OnErrorSafe, OnCompletedSafe)));
+ return subscriptions;
+
+ void OnFirstNext(TFirst value)
+ {
+ if (Volatile.Read(ref terminated) != 0)
+ {
+ return;
+ }
+
+ firstLatest = value;
+ if (secondLatest.HasValue)
+ {
+ observer.OnNext(resultSelector(value, secondLatest.Value));
+ }
+ }
+
+ void OnSecondNext(TSecond value)
+ {
+ if (Volatile.Read(ref terminated) != 0)
+ {
+ return;
+ }
+
+ secondLatest = value;
+ if (firstLatest.HasValue)
+ {
+ observer.OnNext(resultSelector(firstLatest.Value, value));
+ }
+ }
+
+ void OnErrorSafe(Exception error)
+ {
+ if (Interlocked.Exchange(ref terminated, 1) == 0)
+ {
+ observer.OnError(error);
+ }
+ }
+
+ void OnCompletedSafe()
+ {
+ if (Interlocked.Decrement(ref remainingSources) == 0 && Interlocked.Exchange(ref terminated, 1) == 0)
+ {
+ observer.OnCompleted();
+ }
+ }
+ });
}
diff --git a/src/DynamicData/List/Internal/BufferIf.cs b/src/DynamicData/List/Internal/BufferIf.cs
index 8f7d1993c..fb85adf4e 100644
--- a/src/DynamicData/List/Internal/BufferIf.cs
+++ b/src/DynamicData/List/Internal/BufferIf.cs
@@ -7,6 +7,8 @@
using System.Reactive.Linq;
using System.Reactive.Subjects;
+using DynamicData.Internal;
+
namespace DynamicData.List.Internal;
internal sealed class BufferIf(IObservable> source, IObservable pauseIfTrueSelector, bool initialPauseState = false, TimeSpan? timeOut = null, IScheduler? scheduler = null)
@@ -23,13 +25,16 @@ internal sealed class BufferIf(IObservable> source, IObservable
public IObservable> Run() => Observable.Create>(
observer =>
{
- var locker = InternalEx.NewLock();
+ // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(locker) so the
+ // gate lock is released before downstream OnNext. Closes the cross-cache
+ // deadlock window.
+ var queue = new SharedDeliveryQueue();
var paused = initialPauseState;
var buffer = new ChangeSet();
var timeoutSubscriber = new SerialDisposable();
var timeoutSubject = new Subject();
- var bufferSelector = Observable.Return(initialPauseState).Concat(_pauseIfTrueSelector.Merge(timeoutSubject)).ObserveOn(_scheduler).Synchronize(locker).Publish();
+ var bufferSelector = Observable.Return(initialPauseState).Concat(_pauseIfTrueSelector.DeliveryQueueMerge(timeoutSubject)).ObserveOn(_scheduler).SynchronizeSafe(queue).Publish();
var pause = bufferSelector.Where(state => state).Subscribe(
_ =>
@@ -61,7 +66,7 @@ public IObservable> Run() => Observable.Create>(
timeoutSubscriber.Disposable = Disposable.Empty;
});
- var updateSubscriber = _source.Synchronize(locker).Subscribe(
+ var updateSubscriber = _source.SynchronizeSafe(queue).Subscribe(
updates =>
{
if (paused)
@@ -85,6 +90,7 @@ public IObservable> Run() => Observable.Create>(
updateSubscriber.Dispose();
timeoutSubject.OnCompleted();
timeoutSubscriber.Dispose();
+ queue.Dispose();
});
});
}
diff --git a/src/DynamicData/List/Internal/Combiner.cs b/src/DynamicData/List/Internal/Combiner.cs
index 64882a3ba..0c509a047 100644
--- a/src/DynamicData/List/Internal/Combiner.cs
+++ b/src/DynamicData/List/Internal/Combiner.cs
@@ -1,4 +1,4 @@
-// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.
@@ -6,48 +6,47 @@
using System.Reactive.Linq;
using DynamicData.Cache.Internal;
+using DynamicData.Internal;
namespace DynamicData.List.Internal;
internal sealed class Combiner(ICollection>> source, CombineOperator type)
where T : notnull
{
-#if NET9_0_OR_GREATER
- private readonly Lock _locker = new();
-#else
- private readonly object _locker = new();
-#endif
-
private readonly ICollection>> _source = source ?? throw new ArgumentNullException(nameof(source));
public IObservable> Run() => Observable.Create>(
observer =>
{
var disposable = new CompositeDisposable();
-
var resultList = new ChangeAwareListWithRefCounts();
+ var sourceLists = Enumerable.Range(0, _source.Count).Select(_ => new ReferenceCountTracker()).ToList();
- lock (_locker)
- {
- var sourceLists = Enumerable.Range(0, _source.Count).Select(_ => new ReferenceCountTracker()).ToList();
+ // Shared queue serializes the multiple source streams so they appear as a
+ // single sequence to the combiner, but without holding a lock during the
+ // downstream observer.OnNext call.
+ var queue = new SharedDeliveryQueue();
- foreach (var pair in _source.Zip(sourceLists, (item, list) => new { Item = item, List = list }))
- {
- disposable.Add(
- pair.Item.Synchronize(_locker).Subscribe(
- changes =>
+ foreach (var pair in _source.Zip(sourceLists, (item, list) => new { Item = item, List = list }))
+ {
+ disposable.Add(
+ pair.Item.SynchronizeSafe(queue).Subscribe(
+ changes =>
+ {
+ CloneSourceList(pair.List, changes);
+
+ var notifications = UpdateResultList(changes, sourceLists, resultList);
+ if (notifications.Count != 0)
{
- CloneSourceList(pair.List, changes);
-
- var notifications = UpdateResultList(changes, sourceLists, resultList);
- if (notifications.Count != 0)
- {
- observer.OnNext(notifications);
- }
- }));
- }
+ observer.OnNext(notifications);
+ }
+ },
+ observer.OnError));
}
+ // Dispose subscriptions first (stops further notifications from arriving),
+ // then drain/terminate the shared queue.
+ disposable.Add(queue);
return disposable;
});
diff --git a/src/DynamicData/List/Internal/DisposeMany.cs b/src/DynamicData/List/Internal/DisposeMany.cs
index 046e19e4d..cc7b21dd9 100644
--- a/src/DynamicData/List/Internal/DisposeMany.cs
+++ b/src/DynamicData/List/Internal/DisposeMany.cs
@@ -6,6 +6,8 @@
using System.Reactive.Disposables;
using System.Reactive.Linq;
+using DynamicData.Internal;
+
namespace DynamicData.List.Internal;
internal sealed class DisposeMany(IObservable> source)
@@ -14,11 +16,13 @@ internal sealed class DisposeMany(IObservable> source)
public IObservable> Run()
=> Observable.Create>(observer =>
{
- // Will be locking on cachedItems directly, instead of using an anonymous gate object. This is acceptable, since it's a privately-held object, there's no risk of deadlock from other consumers locking on it.
var cachedItems = new List();
+ // SynchronizeSafe with queue-first disposal: the DeliveryQueue is terminated and
+ // drained before the source subscription is disposed, ensuring no notifications
+ // can fire while the teardown (per-item Dispose) is in progress.
var sourceSubscription = source
- .Synchronize(cachedItems)
+ .SynchronizeSafe()
.SubscribeSafe(Observer.Create>(
onNext: changeSet =>
{
@@ -76,11 +80,7 @@ public IObservable> Run()
return Disposable.Create(() =>
{
sourceSubscription.Dispose();
-
- lock (cachedItems)
- {
- ProcessFinalization(cachedItems);
- }
+ ProcessFinalization(cachedItems);
});
});
diff --git a/src/DynamicData/List/Internal/DynamicCombiner.cs b/src/DynamicData/List/Internal/DynamicCombiner.cs
index bb3091378..2d7dd9470 100644
--- a/src/DynamicData/List/Internal/DynamicCombiner.cs
+++ b/src/DynamicData/List/Internal/DynamicCombiner.cs
@@ -6,32 +6,32 @@
using System.Reactive.Linq;
using DynamicData.Cache.Internal;
+using DynamicData.Internal;
namespace DynamicData.List.Internal;
internal sealed class DynamicCombiner(IObservableList>> source, CombineOperator type)
where T : notnull
{
-#if NET9_0_OR_GREATER
- private readonly Lock _locker = new();
-#else
- private readonly object _locker = new();
-#endif
-
private readonly IObservableList>> _source = source ?? throw new ArgumentNullException(nameof(source));
public IObservable> Run() => Observable.Create>(
observer =>
{
+ // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(_locker) so the
+ // gate is released before downstream OnNext. Closes the cross-cache deadlock
+ // window.
+ var queue = new SharedDeliveryQueue();
+
// this is the resulting list which produces all notifications
var resultList = new ChangeAwareListWithRefCounts();
// Transform to a merge container.
// This populates a RefTracker when the original source is subscribed to
- var sourceLists = _source.Connect().Synchronize(_locker).Transform(changeSet => new MergeContainer(changeSet)).AsObservableList();
+ var sourceLists = _source.Connect().SynchronizeSafe(queue).Transform(changeSet => new MergeContainer(changeSet)).AsObservableList();
// merge the items back together
- var allChanges = sourceLists.Connect().MergeMany(mc => mc.Source).Synchronize(_locker).Subscribe(
+ var allChanges = sourceLists.Connect().MergeMany(mc => mc.Source).SynchronizeSafe(queue).Subscribe(
changes =>
{
// Populate result list and check for changes
@@ -86,7 +86,7 @@ public IObservable> Run() => Observable.Create>(
}
}).Subscribe();
- return new CompositeDisposable(sourceLists, allChanges, removedItem, sourceChanged);
+ return new CompositeDisposable(sourceLists, allChanges, removedItem, sourceChanged, queue);
});
private bool MatchesConstraint(MergeContainer[] sourceLists, T item)
diff --git a/src/DynamicData/List/Internal/ExpireAfter.cs b/src/DynamicData/List/Internal/ExpireAfter.cs
index 97a39c84e..ffc9e72d3 100644
--- a/src/DynamicData/List/Internal/ExpireAfter.cs
+++ b/src/DynamicData/List/Internal/ExpireAfter.cs
@@ -39,8 +39,15 @@ public static IObservable> Create(
private abstract class SubscriptionBase
: IDisposable
{
- private readonly List _expirationDueTimes;
- private readonly List _expiringIndexesBuffer;
+ // Shadow of the source maintained from OnSourceNext, carrying both the item and its
+ // expiration time per occurrence. Item identity (by value) is used during expiration
+ // because the shadow may be stale relative to the source: the new SourceList uses
+ // queue-based delivery, so notifications drain after the producing Edit releases its
+ // lock. A concurrent ManageExpirations cannot rely on shadow indices matching source
+ // indices; matching by item value via updater.IndexOf tolerates this divergence and
+ // simply skips items that were already removed externally.
+ private readonly List _shadow;
+ private readonly List _expiringShadowIndexesBuffer;
private readonly IObserver> _observer;
private readonly Action> _onEditingSource;
private readonly IScheduler _scheduler;
@@ -65,8 +72,8 @@ protected SubscriptionBase(
_onEditingSource = OnEditingSource;
- _expirationDueTimes = new();
- _expiringIndexesBuffer = new();
+ _shadow = new();
+ _expiringShadowIndexesBuffer = new();
_sourceSubscription = source
.Connect()
@@ -94,7 +101,7 @@ protected IScheduler Scheduler
// Instead of using a dedicated _synchronizationGate object, we can save an allocation by using any object that is never exposed to public consumers.
protected object SynchronizationGate
- => _expirationDueTimes;
+ => _shadow;
protected abstract DateTimeOffset? GetNextManagementDueTime();
@@ -102,9 +109,9 @@ protected object SynchronizationGate
{
var result = null as DateTimeOffset?;
- foreach (var dueTime in _expirationDueTimes)
+ foreach (var entry in _shadow)
{
- if ((dueTime is { } value) && ((result is null) || (value < result)))
+ if ((entry.DueTime is { } value) && ((result is null) || (value < result)))
result = value;
}
@@ -141,40 +148,85 @@ private void OnEditingSource(IExtendedList updater)
var now = Scheduler.Now;
- // One major note here: we are NOT updating our internal state, except to mark items as no longer needing to expire.
- // Once we're done with the source.Edit() here, it will fire of a changeset for the removals, which will get handled by OnSourceNext(),
- // thus bringing all of our internal state back into sync.
+ // We are NOT updating our shadow here, except to mark items as no longer needing to expire.
+ // Once source.Edit() returns, the resulting changeset will reach OnSourceNext and bring _shadow
+ // back into sync. The shadow may have been stale on entry (concurrent edits could be queued
+ // behind a pending drain), so we identify removals by item value instead of by shadow index.
- // Buffer removals, so we can eliminate the need for index adjustments as we update the source
- for (var i = 0; i < _expirationDueTimes.Count; ++i)
+ for (var i = 0; i < _shadow.Count; ++i)
{
- if ((_expirationDueTimes[i] is { } dueTime) && (dueTime <= now))
+ if ((_shadow[i].DueTime is { } dueTime) && (dueTime <= now))
{
- _expiringIndexesBuffer.Add(i);
+ _expiringShadowIndexesBuffer.Add(i);
- // This shouldn't be necessary, but it guarantees we don't accidentally expire an item more than once,
- // in the event of a race condition or something we haven't predicted.
- _expirationDueTimes[i] = null;
+ // Mark as processed so we don't expire the same shadow slot twice if reentered.
+ _shadow[i] = new ItemEntry(_shadow[i].Item, null);
}
}
- // I'm pretty sure it shouldn't be possible to end up with no removals here, but it costs basically nothing to check.
- if (_expiringIndexesBuffer.Count is not 0)
+ if (_expiringShadowIndexesBuffer.Count is not 0)
{
- // Processing removals in reverse-index order eliminates the need for us to adjust index of each .RemoveAt() call, as we go.
- _expiringIndexesBuffer.Sort(static (x, y) => y.CompareTo(x));
+ var removedItems = new List(_expiringShadowIndexesBuffer.Count);
+
+ // Validate the shadow against the live updater all the way up to (and including)
+ // the LAST expiring index. The previous version of this code only checked the
+ // prefix up to the FIRST expiring index, which can falsely declare the shadow
+ // in sync when a concurrent external mutation has moved/replaced an item at a
+ // position between the first and last expiring index. The subsequent reverse-
+ // index removal would then remove an unrelated item. If the shadow is stale we
+ // fall back to value-based IndexOf, which may remove a different equal
+ // occurrence than originally scheduled but at least keeps the source consistent
+ // with what subscribers see.
+ var lastExpiringShadowIdx = _expiringShadowIndexesBuffer[_expiringShadowIndexesBuffer.Count - 1];
+ var shadowInSync = _shadow.Count == updater.Count;
+ if (shadowInSync)
+ {
+ for (var i = 0; i <= lastExpiringShadowIdx && i < updater.Count; ++i)
+ {
+ if (!EqualityComparer.Default.Equals(_shadow[i].Item, updater[i]))
+ {
+ shadowInSync = false;
+ break;
+ }
+ }
+ }
- var removedItems = new T[_expiringIndexesBuffer.Count];
- for (var i = 0; i < _expiringIndexesBuffer.Count; ++i)
+ if (shadowInSync)
{
- var removedIndex = _expiringIndexesBuffer[i];
- removedItems[i] = updater[removedIndex];
- updater.RemoveAt(removedIndex);
+ // Index-based removal in REVERSE shadow order so earlier indices stay
+ // valid as we remove later items. This matches the legacy behaviour and
+ // correctly distinguishes between equal duplicate occurrences with
+ // different expiration times.
+ for (var i = _expiringShadowIndexesBuffer.Count - 1; i >= 0; --i)
+ {
+ var shadowIdx = _expiringShadowIndexesBuffer[i];
+ removedItems.Add(updater[shadowIdx]);
+ updater.RemoveAt(shadowIdx);
+ }
+ }
+ else
+ {
+ // Shadow is stale (concurrent external mutation has been queued but not
+ // yet delivered to OnSourceNext). Fall back to value-based search.
+ // Iterate shadow forward; for each expiring entry remove the first
+ // matching live occurrence. Silently skip items that have already been
+ // removed externally; the pending OnSourceNext will reconcile the shadow.
+ for (var i = 0; i < _expiringShadowIndexesBuffer.Count; ++i)
+ {
+ var item = _shadow[_expiringShadowIndexesBuffer[i]].Item;
+ var idx = updater.IndexOf(item);
+ if (idx >= 0)
+ {
+ updater.RemoveAt(idx);
+ removedItems.Add(item);
+ }
+ }
}
- _observer.OnNext(removedItems);
+ if (removedItems.Count > 0)
+ _observer.OnNext(removedItems);
- _expiringIndexesBuffer.Clear();
+ _expiringShadowIndexesBuffer.Clear();
}
OnExpirationsManaged(thisScheduledManagement.DueTime);
@@ -250,9 +302,9 @@ private void OnSourceNext(IChangeSet changes)
{
var dueTime = now + _timeSelector.Invoke(change.Item.Current);
- _expirationDueTimes.Insert(
+ _shadow.Insert(
index: change.Item.CurrentIndex,
- item: dueTime);
+ item: new ItemEntry(change.Item.Current, dueTime));
haveExpirationDueTimesChanged |= dueTime is not null;
}
@@ -260,16 +312,16 @@ private void OnSourceNext(IChangeSet changes)
case ListChangeReason.AddRange:
{
- _expirationDueTimes.EnsureCapacity(_expirationDueTimes.Count + change.Range.Count);
+ _shadow.EnsureCapacity(_shadow.Count + change.Range.Count);
var itemIndex = change.Range.Index;
foreach (var item in change.Range)
{
var dueTime = now + _timeSelector.Invoke(item);
- _expirationDueTimes.Insert(
+ _shadow.Insert(
index: itemIndex,
- item: dueTime);
+ item: new ItemEntry(item, dueTime));
haveExpirationDueTimesChanged |= dueTime is not null;
@@ -279,37 +331,37 @@ private void OnSourceNext(IChangeSet changes)
break;
case ListChangeReason.Clear:
- foreach (var dueTime in _expirationDueTimes)
+ foreach (var entry in _shadow)
{
- if (dueTime is not null)
+ if (entry.DueTime is not null)
{
haveExpirationDueTimesChanged = true;
break;
}
}
- _expirationDueTimes.Clear();
+ _shadow.Clear();
break;
case ListChangeReason.Moved:
{
- var expirationDueTime = _expirationDueTimes[change.Item.PreviousIndex];
+ var entry = _shadow[change.Item.PreviousIndex];
- _expirationDueTimes.RemoveAt(change.Item.PreviousIndex);
- _expirationDueTimes.Insert(
+ _shadow.RemoveAt(change.Item.PreviousIndex);
+ _shadow.Insert(
index: change.Item.CurrentIndex,
- item: expirationDueTime);
+ item: entry);
}
break;
case ListChangeReason.Remove:
{
- if (_expirationDueTimes[change.Item.CurrentIndex] is not null)
+ if (_shadow[change.Item.CurrentIndex].DueTime is not null)
{
haveExpirationDueTimesChanged = true;
}
- _expirationDueTimes.RemoveAt(change.Item.CurrentIndex);
+ _shadow.RemoveAt(change.Item.CurrentIndex);
}
break;
@@ -318,25 +370,25 @@ private void OnSourceNext(IChangeSet changes)
var rangeEndIndex = change.Range.Index + change.Range.Count - 1;
for (var i = change.Range.Index; i <= rangeEndIndex; ++i)
{
- if (_expirationDueTimes[i] is not null)
+ if (_shadow[i].DueTime is not null)
{
haveExpirationDueTimesChanged = true;
break;
}
}
- _expirationDueTimes.RemoveRange(change.Range.Index, change.Range.Count);
+ _shadow.RemoveRange(change.Range.Index, change.Range.Count);
}
break;
case ListChangeReason.Replace:
{
- var oldDueTime = _expirationDueTimes[change.Item.CurrentIndex];
+ var oldDueTime = _shadow[change.Item.CurrentIndex].DueTime;
var newDueTime = now + _timeSelector.Invoke(change.Item.Current);
// Ignoring the possibility that the item's index has changed as well, because ISourceList does not allow for this.
- _expirationDueTimes[change.Item.CurrentIndex] = newDueTime;
+ _shadow[change.Item.CurrentIndex] = new ItemEntry(change.Item.Current, newDueTime);
haveExpirationDueTimesChanged |= newDueTime != oldDueTime;
}
@@ -369,6 +421,8 @@ private readonly record struct ScheduledManagement
public required DateTimeOffset DueTime { get; init; }
}
+
+ private readonly record struct ItemEntry(T Item, DateTimeOffset? DueTime);
}
private sealed class OnDemandSubscription
diff --git a/src/DynamicData/List/Internal/Filter.Dynamic.cs b/src/DynamicData/List/Internal/Filter.Dynamic.cs
index 8d9b5e8dc..7b0b8c0dd 100644
--- a/src/DynamicData/List/Internal/Filter.Dynamic.cs
+++ b/src/DynamicData/List/Internal/Filter.Dynamic.cs
@@ -4,6 +4,8 @@
using System.Reactive.Linq;
+using DynamicData.Internal;
+
namespace DynamicData.List.Internal;
internal static partial class Filter
@@ -36,7 +38,10 @@ public Dynamic(IObservable> source, Func predicate, ListF
public IObservable> Run() => Observable.Create>(
observer =>
{
- var locker = InternalEx.NewLock();
+ // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(locker) so
+ // the gate lock is released before downstream OnNext. Closes the cross-
+ // cache deadlock window.
+ var queue = new SharedDeliveryQueue();
Func predicate = _ => false;
var all = new List();
@@ -57,7 +62,7 @@ public IObservable> Run() => Observable.Create>(
throw new InvalidOperationException("The predicates is not set and the change is not a immutableFilter.");
}
- predicateChanged = _predicates.Synchronize(locker).Select(
+ predicateChanged = _predicates.SynchronizeSafe(queue).Select(
newPredicate =>
{
predicate = newPredicate;
@@ -72,7 +77,7 @@ public IObservable> Run() => Observable.Create>(
*/
// Need to get item by index and store it in the transform
- var filteredResult = _source.Synchronize(locker).Transform(
+ var filteredResult = _source.SynchronizeSafe(queue).Transform(
(t, previous) =>
{
var wasMatch = previous.ConvertOr(p => p!.IsMatch, () => false);
@@ -90,9 +95,11 @@ public IObservable> Run() => Observable.Create>(
return Process(filtered, changes);
});
- return predicateChanged.Merge(filteredResult).NotEmpty()
+ var publisher = predicateChanged.UnsynchronizedMerge(filteredResult).NotEmpty()
.Select(changes => changes.Transform(iwm => iwm.Item)) // use convert, not transform
.SubscribeSafe(observer);
+
+ return new System.Reactive.Disposables.CompositeDisposable(publisher, queue);
});
private static IChangeSet Process(ChangeAwareList filtered, IChangeSet changes)
diff --git a/src/DynamicData/List/Internal/GroupOn.cs b/src/DynamicData/List/Internal/GroupOn.cs
index 6ea7deccd..70bb10b30 100644
--- a/src/DynamicData/List/Internal/GroupOn.cs
+++ b/src/DynamicData/List/Internal/GroupOn.cs
@@ -6,6 +6,8 @@
using System.Reactive.Disposables;
using System.Reactive.Linq;
+using DynamicData.Internal;
+
namespace DynamicData.List.Internal;
internal sealed class GroupOn(IObservable> source, Func groupSelector, IObservable? regrouper)
@@ -27,19 +29,22 @@ public IObservable>> Run() => Observable.C
// capture the grouping up front which has the benefit that the group key is only selected once
var itemsWithGroup = _source.Transform((t, previous) => new ItemWithGroupKey(t, _groupSelector(t), previous.Convert(p => p.Group)), true);
- var locker = InternalEx.NewLock();
- var shared = itemsWithGroup.Synchronize(locker).Publish();
+ // Shared queue serializes the source changesets with the optional regrouper signal
+ // so they appear as a single sequence to the combiner, without holding a lock
+ // during the downstream observer.OnNext call.
+ var queue = new SharedDeliveryQueue();
+ var shared = itemsWithGroup.SynchronizeSafe(queue).Publish();
var grouper = shared.Select(changes => Process(groupings, groupCache, changes));
var regrouperFunc = _regrouper is null ?
Observable.Never>>() :
- _regrouper.Synchronize(locker).CombineLatest(shared.ToCollection(), (_, collection) => Regroup(groupings, groupCache, collection));
+ _regrouper.SynchronizeSafe(queue).UnsynchronizedCombineLatest(shared.ToCollection(), (_, collection) => Regroup(groupings, groupCache, collection));
- var publisher = grouper.Merge(regrouperFunc).DisposeMany() // dispose removes as the grouping is disposable
+ var publisher = grouper.UnsynchronizedMerge(regrouperFunc).DisposeMany() // dispose removes as the grouping is disposable
.NotEmpty().SubscribeSafe(observer);
- return new CompositeDisposable(publisher, shared.Connect());
+ return new CompositeDisposable(publisher, shared.Connect(), queue);
});
private static GroupWithAddIndicator GetCache(IDictionary> groupCaches, TGroupKey key)
diff --git a/src/DynamicData/List/Internal/GroupOnImmutable.cs b/src/DynamicData/List/Internal/GroupOnImmutable.cs
index 2b4a08be8..dab161800 100644
--- a/src/DynamicData/List/Internal/GroupOnImmutable.cs
+++ b/src/DynamicData/List/Internal/GroupOnImmutable.cs
@@ -6,6 +6,8 @@
using System.Reactive.Disposables;
using System.Reactive.Linq;
+using DynamicData.Internal;
+
namespace DynamicData.List.Internal;
internal sealed class GroupOnImmutable(IObservable> source, Func groupSelector, IObservable? reGrouper)
@@ -24,24 +26,23 @@ public IObservable>> Run() => Observabl
var groupings = new ChangeAwareList>();
var groupCache = new Dictionary();
- // var itemsWithGroup = _source
- // .Transform(t => new ItemWithValue(t, _groupSelector(t)));
-
// capture the grouping up front which has the benefit that the group key is only selected once
var itemsWithGroup = _source.Transform((t, previous) => new ItemWithGroupKey(t, _groupSelector(t), previous.Convert(p => p.Group)), true);
- var locker = InternalEx.NewLock();
- var shared = itemsWithGroup.Synchronize(locker).Publish();
+ // Shared queue serializes the source changesets with the optional regrouper signal
+ // without holding a lock during downstream delivery.
+ var queue = new SharedDeliveryQueue();
+ var shared = itemsWithGroup.SynchronizeSafe(queue).Publish();
var grouper = shared.Select(changes => Process(groupings, groupCache, changes));
var reGroupFunc = _reGrouper is null ?
Observable.Never>>() :
- _reGrouper.Synchronize(locker).CombineLatest(shared.ToCollection(), (_, collection) => Regroup(groupings, groupCache, collection));
+ _reGrouper.SynchronizeSafe(queue).UnsynchronizedCombineLatest(shared.ToCollection(), (_, collection) => Regroup(groupings, groupCache, collection));
- var publisher = grouper.Merge(reGroupFunc).NotEmpty().SubscribeSafe(observer);
+ var publisher = grouper.UnsynchronizedMerge(reGroupFunc).NotEmpty().SubscribeSafe(observer);
- return new CompositeDisposable(publisher, shared.Connect());
+ return new CompositeDisposable(publisher, shared.Connect(), queue);
});
private static IChangeSet> CreateChangeSet(ChangeAwareList> result, IDictionary allGroupings, IDictionary> initialStateOfGroups)
diff --git a/src/DynamicData/List/Internal/LimitSizeTo.cs b/src/DynamicData/List/Internal/LimitSizeTo.cs
index c2ed74896..f66baafe9 100644
--- a/src/DynamicData/List/Internal/LimitSizeTo.cs
+++ b/src/DynamicData/List/Internal/LimitSizeTo.cs
@@ -5,14 +5,11 @@
using System.Reactive.Concurrency;
using System.Reactive.Linq;
-namespace DynamicData.List.Internal;
+using DynamicData.Internal;
-#if NET9_0_OR_GREATER
-internal sealed class LimitSizeTo(ISourceList sourceList, int sizeLimit, IScheduler scheduler, Lock locker)
-#else
-internal sealed class LimitSizeTo(ISourceList sourceList, int sizeLimit, IScheduler scheduler, object locker)
-#endif
+namespace DynamicData.List.Internal;
+internal sealed class LimitSizeTo(ISourceList sourceList, int sizeLimit, IScheduler scheduler, SharedDeliveryQueue queue)
where T : notnull
{
private readonly IScheduler _scheduler = scheduler ?? throw new ArgumentNullException(nameof(scheduler));
@@ -23,7 +20,7 @@ public IObservable> Run()
var emptyResult = new List();
long orderItemWasAdded = -1;
- return _sourceList.Connect().ObserveOn(_scheduler).Synchronize(locker).Transform(t => new ExpirableItem(t, _scheduler.Now.UtcDateTime, Interlocked.Increment(ref orderItemWasAdded))).ToCollection().Select(
+ return _sourceList.Connect().ObserveOn(_scheduler).SynchronizeSafe(queue).Transform(t => new ExpirableItem(t, _scheduler.Now.UtcDateTime, Interlocked.Increment(ref orderItemWasAdded))).ToCollection().Select(
list =>
{
var numberToExpire = list.Count - sizeLimit;
diff --git a/src/DynamicData/List/Internal/MergeChangeSets.cs b/src/DynamicData/List/Internal/MergeChangeSets.cs
index 480c69b3a..6ede1188c 100644
--- a/src/DynamicData/List/Internal/MergeChangeSets.cs
+++ b/src/DynamicData/List/Internal/MergeChangeSets.cs
@@ -3,8 +3,11 @@
// See the LICENSE file in the project root for full license information.
using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
using System.Reactive.Linq;
+using DynamicData.Internal;
+
namespace DynamicData.List.Internal;
///
@@ -21,19 +24,24 @@ public MergeChangeSets(IEnumerable>> source, IEq
public IObservable> Run() => Observable.Create>(
observer =>
{
- var locker = InternalEx.NewLock();
+ // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(locker) so the
+ // gate is released before downstream OnNext. Closes the cross-cache deadlock
+ // window.
+ var queue = new SharedDeliveryQueue();
// This is manages all of the changes
var changeTracker = new ChangeSetMergeTracker();
// Merge all of the changeset streams together and Process them with the change tracker which will emit the results
- return CreateClonedListObservable(source, locker)
- .Synchronize(locker)
+ var publisher = CreateClonedListObservable(source, queue)
+ .SynchronizeSafe(queue)
.MergeMany(clonedList => clonedList.Source.RemoveIndex().Do(static _ => { }, observer.OnError))
.Subscribe(
changes => changeTracker.ProcessChangeSet(changes, observer),
observer.OnError,
observer.OnCompleted);
+
+ return new CompositeDisposable(publisher, queue);
});
private static IObservable>> CreateObservable(IEnumerable>> source, bool completable, IScheduler? scheduler)
@@ -48,21 +56,10 @@ private static IObservable>> CreateObservable(IE
return obs;
}
- // Can optimize for the Add case because that's the only one that applies
-#if NET9_0_OR_GREATER
- private Change> CreateChange(IObservable> source, Lock locker) =>
- new(ListChangeReason.Add, new ClonedListChangeSet(source.Synchronize(locker), equalityComparer));
+ private Change> CreateChange(IObservable> source, SharedDeliveryQueue queue) =>
+ new(ListChangeReason.Add, new ClonedListChangeSet(source.SynchronizeSafe(queue), equalityComparer));
// Create a ChangeSet Observable that produces ChangeSets with a single Add event for each new sub-observable
- private IObservable>> CreateClonedListObservable(IObservable>> source, Lock locker) =>
- source.Select(src => new ChangeSet>(new[] { CreateChange(src, locker) }));
-#else
- private Change> CreateChange(IObservable> source, object locker) =>
- new(ListChangeReason.Add, new ClonedListChangeSet(source.Synchronize(locker), equalityComparer));
-
- // Create a ChangeSet Observable that produces ChangeSets with a single Add event for each new sub-observable
- private IObservable>> CreateClonedListObservable(IObservable>> source, object locker) =>
- source.Select(src => new ChangeSet>(new[] { CreateChange(src, locker) }));
-#endif
-
+ private IObservable>> CreateClonedListObservable(IObservable>> source, SharedDeliveryQueue queue) =>
+ source.Select(src => new ChangeSet>(new[] { CreateChange(src, queue) }));
}
diff --git a/src/DynamicData/List/Internal/Pager.cs b/src/DynamicData/List/Internal/Pager.cs
index bd3238d2a..448cde058 100644
--- a/src/DynamicData/List/Internal/Pager.cs
+++ b/src/DynamicData/List/Internal/Pager.cs
@@ -3,8 +3,11 @@
// See the LICENSE file in the project root for full license information.
using System.Collections;
+using System.Reactive.Disposables;
using System.Reactive.Linq;
+using DynamicData.Internal;
+
namespace DynamicData.List.Internal;
internal sealed class Pager(IObservable> source, IObservable requests)
@@ -17,13 +20,16 @@ internal sealed class Pager(IObservable> source, IObservable> Run() => Observable.Create>(
observer =>
{
- var locker = InternalEx.NewLock();
+ // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(locker) so the
+ // gate lock is released before downstream OnNext. Closes the cross-cache
+ // deadlock window.
+ var queue = new SharedDeliveryQueue();
var all = new List();
var paged = new ChangeAwareList();
IPageRequest parameters = new PageRequest(0, 25);
- var requestStream = _requests.Synchronize(locker).Select(
+ var requestStream = _requests.SynchronizeSafe(queue).Select(
request =>
{
parameters = request;
@@ -31,14 +37,16 @@ public IObservable> Run() => Observable.Create Page(all, paged, parameters, changes));
- return requestStream
- .Merge(dataChanged)
+ var publisher = requestStream
+ .UnsynchronizedMerge(dataChanged)
.Where(changes => changes is not null && changes.Count != 0)
.Select(x => x!)
.SubscribeSafe(observer);
+
+ return new CompositeDisposable(publisher, queue);
});
private static int CalculatePages(ICollection all, IPageRequest? request)
diff --git a/src/DynamicData/List/Internal/Sort.cs b/src/DynamicData/List/Internal/Sort.cs
index ff03e2e70..1c5629c62 100644
--- a/src/DynamicData/List/Internal/Sort.cs
+++ b/src/DynamicData/List/Internal/Sort.cs
@@ -3,8 +3,11 @@
// See the LICENSE file in the project root for full license information.
using System.Reactive;
+using System.Reactive.Disposables;
using System.Reactive.Linq;
+using DynamicData.Internal;
+
namespace DynamicData.List.Internal;
internal sealed class Sort(IObservable> source, IComparer? comparer, SortOptions sortOptions, IObservable? resort, IObservable>? comparerObservable, int resetThreshold)
@@ -19,11 +22,15 @@ internal sealed class Sort(IObservable> source, IComparer? c
public IObservable> Run() => Observable.Create>(
observer =>
{
- var locker = InternalEx.NewLock();
+ // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(locker) so the
+ // lock is released before downstream OnNext. This closes the cross-cache
+ // deadlock window when a downstream consumer also acquires another cache's
+ // lock during its OnNext processing.
+ var queue = new SharedDeliveryQueue();
var original = new List();
var target = new ChangeAwareList();
- var dataChanged = _source.Synchronize(locker).Select(
+ var dataChanged = _source.SynchronizeSafe(queue).Select(
changes =>
{
if (resetThreshold > 1)
@@ -33,10 +40,12 @@ public IObservable> Run() => Observable.Create>(
return changes.TotalChanges > resetThreshold ? Reset(original, target) : Process(target, changes);
});
- var resortSync = _resort.Synchronize(locker).Select(_ => Reorder(target));
- var changeComparer = _comparerObservable.Synchronize(locker).Select(comparer => ChangeComparer(target, comparer));
+ var resortSync = _resort.SynchronizeSafe(queue).Select(_ => Reorder(target));
+ var changeComparer = _comparerObservable.SynchronizeSafe(queue).Select(comparer => ChangeComparer(target, comparer));
+
+ var publisher = changeComparer.UnsynchronizedMerge(resortSync, dataChanged).Where(changes => changes.Count != 0).SubscribeSafe(observer);
- return changeComparer.Merge(resortSync).Merge(dataChanged).Where(changes => changes.Count != 0).SubscribeSafe(observer);
+ return new CompositeDisposable(publisher, queue);
});
private IChangeSet ChangeComparer(ChangeAwareList target, IComparer comparer)
diff --git a/src/DynamicData/List/Internal/Switch.cs b/src/DynamicData/List/Internal/Switch.cs
index 9425771dd..a89d1ba7f 100644
--- a/src/DynamicData/List/Internal/Switch.cs
+++ b/src/DynamicData/List/Internal/Switch.cs
@@ -4,6 +4,9 @@
using System.Reactive.Disposables;
using System.Reactive.Linq;
+using System.Reactive.Subjects;
+
+using DynamicData.Internal;
namespace DynamicData.List.Internal;
@@ -15,21 +18,37 @@ internal sealed class Switch(IObservable>> sources)
public IObservable> Run() => Observable.Create>(
observer =>
{
- var locker = InternalEx.NewLock();
-
+ var queue = new SharedDeliveryQueue();
var destination = new SourceList();
+ var errors = new Subject>();
+ var innerSubscription = new SerialDisposable();
- var populator = Observable.Switch(
- _sources.Do(
- _ =>
+ // The outer (sources) and every inner are routed through the same SharedDeliveryQueue.
+ // Both the per-source clear and the per-changeset destination write happen on the drain
+ // thread, so destination.Connect() emissions and any errors.OnError calls also originate
+ // from inside the drain. The downstream merge therefore sees pre-serialized inputs and
+ // uses UnsynchronizedMerge to avoid the ABBA-prone Observable.Merge gate. Inlines what
+ // Observable.Switch would have done (whose own gate would itself be ABBA-prone).
+ var sourcesSubscription = _sources
+ .SynchronizeSafe(queue)
+ .SubscribeSafe(
+ onNext: newSource =>
{
- lock (locker)
- {
- destination.Clear();
- }
- })).Synchronize(locker).PopulateInto(destination);
+ destination.Clear();
+ innerSubscription.Disposable = newSource
+ .SynchronizeSafe(queue)
+ .SubscribeSafe(
+ onNext: changes => destination.Edit(updater => updater.Clone(changes)),
+ onError: errors.OnError);
+ },
+ onError: errors.OnError);
- var publisher = destination.Connect().SubscribeSafe(observer);
- return new CompositeDisposable(destination, populator, publisher);
+ return new CompositeDisposable(
+ destination,
+ errors,
+ sourcesSubscription,
+ innerSubscription,
+ destination.Connect().UnsynchronizedMerge(errors).SubscribeSafe(observer),
+ queue);
});
}
diff --git a/src/DynamicData/List/Internal/TransformMany.cs b/src/DynamicData/List/Internal/TransformMany.cs
index d7bb2a69e..418e027be 100644
--- a/src/DynamicData/List/Internal/TransformMany.cs
+++ b/src/DynamicData/List/Internal/TransformMany.cs
@@ -8,6 +8,7 @@
using System.Reactive.Linq;
using DynamicData.Binding;
+using DynamicData.Internal;
namespace DynamicData.List.Internal;
@@ -113,19 +114,23 @@ private IObservable> CreateWithChangeSet()
{
var result = new ChangeAwareList();
+ // One shared queue for the whole operator. Each per-child stream, the initial-state
+ // stream, and the merged-children stream all attach as sub-queues, so there is a
+ // single drain point and downstream observer.OnNext is never invoked concurrently
+ // from any path.
+ var queue = new SharedDeliveryQueue();
+
var transformed = _source.Transform(
t =>
{
- var locker = InternalEx.NewLock();
var collection = manySelector(t);
- var changes = childChanges(t).Synchronize(locker).Skip(1);
+ var changes = childChanges(t).SynchronizeSafe(queue).Skip(1);
return new ManyContainer(collection, changes);
}).Publish();
- var outerLock = new object();
- var initial = transformed.Synchronize(outerLock).Select(changes => new ChangeSet(new DestinationEnumerator(changes, _equalityComparer)));
+ var initial = transformed.SynchronizeSafe(queue).Select(changes => new ChangeSet(new DestinationEnumerator(changes, _equalityComparer)));
- var subsequent = transformed.MergeMany(x => x.Changes).Synchronize(outerLock);
+ var subsequent = transformed.MergeMany(x => x.Changes).SynchronizeSafe(queue);
var init = initial.Select(
changes =>
@@ -141,9 +146,9 @@ private IObservable> CreateWithChangeSet()
return result.CaptureChanges();
});
- var allChanges = init.Merge(subsequentSelection);
+ var allChanges = init.UnsynchronizedMerge(subsequentSelection);
- return new CompositeDisposable(allChanges.SubscribeSafe(observer), transformed.Connect());
+ return new CompositeDisposable(allChanges.SubscribeSafe(observer), transformed.Connect(), queue);
});
}
diff --git a/src/DynamicData/List/Internal/Virtualiser.cs b/src/DynamicData/List/Internal/Virtualiser.cs
index bb2f894c2..a6aa1b900 100644
--- a/src/DynamicData/List/Internal/Virtualiser.cs
+++ b/src/DynamicData/List/Internal/Virtualiser.cs
@@ -2,8 +2,11 @@
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.
+using System.Reactive.Disposables;
using System.Reactive.Linq;
+using DynamicData.Internal;
+
namespace DynamicData.List.Internal;
internal sealed class Virtualiser(IObservable> source, IObservable requests)
@@ -16,25 +19,29 @@ internal sealed class Virtualiser(IObservable> source, IObserva
public IObservable> Run() => Observable.Create>(
observer =>
{
- var locker = InternalEx.NewLock();
+ // SharedDeliveryQueue + SynchronizeSafe replaces Synchronize(locker) so the
+ // gate lock is released before downstream OnNext. Closes the cross-cache
+ // deadlock window.
+ var queue = new SharedDeliveryQueue();
var all = new List();
var virtualised = new ChangeAwareList();
IVirtualRequest parameters = new VirtualRequest(0, 25);
- var requestStream = _requests.Synchronize(locker).Select(
+ var requestStream = _requests.SynchronizeSafe(queue).Select(
request =>
{
parameters = request;
return CheckParamsAndVirtualise(all, virtualised, request);
});
- var dataChanged = _source.Synchronize(locker).Select(changes => Virtualise(all, virtualised, parameters, changes));
+ var dataChanged = _source.SynchronizeSafe(queue).Select(changes => Virtualise(all, virtualised, parameters, changes));
- // TODO: Remove this shared state stuff ie. _parameters
- return requestStream.Merge(dataChanged).Where(changes => changes is not null && changes.Count != 0)
+ var publisher = requestStream.UnsynchronizedMerge(dataChanged).Where(changes => changes is not null && changes.Count != 0)
.Select(x => x!)
.Select(changes => new VirtualChangeSet(changes, new VirtualResponse(virtualised.Count, parameters.StartIndex, all.Count))).SubscribeSafe(observer);
+
+ return new CompositeDisposable(publisher, queue);
});
private static IChangeSet? CheckParamsAndVirtualise(IList all, ChangeAwareList virtualised, IVirtualRequest? request)
diff --git a/src/DynamicData/List/ObservableListEx.Adapt.cs b/src/DynamicData/List/ObservableListEx.Adapt.cs
index 942d943c7..d5dbaa19b 100644
--- a/src/DynamicData/List/ObservableListEx.Adapt.cs
+++ b/src/DynamicData/List/ObservableListEx.Adapt.cs
@@ -12,6 +12,7 @@
using System.Reactive.Linq;
using DynamicData.Binding;
using DynamicData.Cache.Internal;
+using DynamicData.Internal;
using DynamicData.List.Internal;
using DynamicData.List.Linq;
@@ -49,7 +50,7 @@ public static IObservable> Adapt(this IObservable
observer =>
{
var locker = InternalEx.NewLock();
- return source.Synchronize(locker).Select(
+ return source.SynchronizeSafe(locker).Select(
changes =>
{
adaptor.Adapt(changes);
diff --git a/src/DynamicData/List/ObservableListEx.LimitSizeTo.cs b/src/DynamicData/List/ObservableListEx.LimitSizeTo.cs
index fd01dcfa6..1a214bb6f 100644
--- a/src/DynamicData/List/ObservableListEx.LimitSizeTo.cs
+++ b/src/DynamicData/List/ObservableListEx.LimitSizeTo.cs
@@ -12,6 +12,7 @@
using System.Reactive.Linq;
using DynamicData.Binding;
using DynamicData.Cache.Internal;
+using DynamicData.Internal;
using DynamicData.List.Internal;
using DynamicData.List.Linq;
@@ -54,9 +55,18 @@ public static IObservable> LimitSizeTo(this ISourceList sou
throw new ArgumentException("sizeLimit cannot be zero", nameof(sizeLimit));
}
- var locker = InternalEx.NewLock();
- var limiter = new LimitSizeTo(source, sizeLimit, scheduler ?? GlobalConfig.DefaultScheduler, locker);
+ var effectiveScheduler = scheduler ?? GlobalConfig.DefaultScheduler;
- return limiter.Run().Synchronize(locker).Do(source.RemoveMany);
+ // The shared queue must be created per subscription, not per call. Sharing one queue
+ // across all subscribers couples them and never disposes the queue.
+ return Observable.Create>(observer =>
+ {
+ var queue = new SharedDeliveryQueue();
+ var limiter = new LimitSizeTo(source, sizeLimit, effectiveScheduler, queue);
+
+ var subscription = limiter.Run().SynchronizeSafe(queue).Do(source.RemoveMany).SubscribeSafe(observer);
+
+ return new CompositeDisposable(subscription, queue);
+ });
}
}
diff --git a/src/DynamicData/List/SourceList.cs b/src/DynamicData/List/SourceList.cs
index 3db42bc11..6dcd7b3a7 100644
--- a/src/DynamicData/List/SourceList.cs
+++ b/src/DynamicData/List/SourceList.cs
@@ -1,4 +1,4 @@
-// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
+// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.
@@ -6,7 +6,9 @@
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
+using System.Threading;
+using DynamicData.Internal;
using DynamicData.List.Internal;
// ReSharper disable once CheckNamespace
@@ -20,8 +22,10 @@ namespace DynamicData;
public sealed class SourceList : ISourceList
where T : notnull
{
- private readonly ISubject