Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
50ad68b
Eliminate Rx Merge gate in queue-serialized operators
May 27, 2026
7bfe343
Fix UnsynchronizedMerge dropping terminal notifications
May 27, 2026
860adb5
Introduce DeliveryQueueMerge to combine SDQ routing and gate-free merge
May 27, 2026
f4e983a
Scope DeliveryQueueMerge to AutoRefresh's same-type case
May 27, 2026
0cab88e
Exercise subject-driven branches in DeadlockTortureTest
May 27, 2026
9d526a3
Use typed DeliveryQueue<T> for DeliveryQueueMerge
May 27, 2026
b362db9
Trim AllDangerous_Stacked pusher load to fit per-iteration timeout
May 27, 2026
07e202f
Raise DeadlockTortureTest per-iteration timeout to 60s
May 27, 2026
33df67d
Merge branch 'main' into fix/operator-merge-gate-deadlock
dwcullop May 28, 2026
1b3a03d
Apply UnsynchronizedMerge to SortAndPage/SortAndVirtualize and add fo…
May 28, 2026
d007b2b
Merge branch 'main' into fix/operator-merge-gate-deadlock
dwcullop May 29, 2026
0af69ea
Merge branch 'main' into fix/operator-merge-gate-deadlock
dwcullop May 30, 2026
07a3823
Refactor comments and improve observer handling
dwcullop May 30, 2026
30d5400
Extend Rx gate elimination to 6 additional cache operators
dwcullop May 30, 2026
2aaceb5
Merge branch 'main' into fix/operator-merge-gate-deadlock
dwcullop Jun 14, 2026
baa049e
Add canonical Rx contract rules reference
dwcullop Jun 14, 2026
103f851
Document citation format for Rx contract rule IDs
dwcullop Jun 14, 2026
9383308
Link to original Rx Design Guidelines PDF in rx-contract reference
dwcullop Jun 14, 2026
d9c6a4e
Dedupe section-structure blurb in rx-contract reference
dwcullop Jun 14, 2026
2fc623a
Rename "Quick reference by consumer type" to "Document structure"
dwcullop Jun 14, 2026
a243b18
Restore PDF section order in rx-contract reference
dwcullop Jun 14, 2026
4eb91eb
Rename rx-contract to rx-design-guide; complete PDF distillation
dwcullop Jun 14, 2026
a4df581
Tighten rx-design-guide.instructions.md from 798 to 301 lines
dwcullop Jun 14, 2026
1d77314
Drop CacheParentSubscription mention from rx-design-guide
dwcullop Jun 14, 2026
faf6b05
Remove DynamicData references from rx-design-guide
dwcullop Jun 14, 2026
31bdda9
Virtualise: collapse per-branch null filters into single post-merge f…
dwcullop Jun 14, 2026
78fc364
GroupOn/GroupOnImmutable: collapse per-branch Where into post-merge
dwcullop Jun 14, 2026
5efb0d0
Merge branch 'main' into fix/operator-merge-gate-deadlock
dwcullop Jun 15, 2026
146c6e9
Trim PR-narration from gate-elimination comments
Jun 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/copilot-instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ When optimizing, measure allocation rates and lock contention, not just wall-clo

DynamicData operators compose — the output of one is the input of the next. If any operator violates the Rx contract (e.g., concurrent `OnNext` calls, calls after `OnCompleted`), every downstream operator can corrupt its internal state. This is not a crash — it's silent data corruption that manifests as wrong results, missing items, or phantom entries. In a reactive UI, this means the user sees stale or incorrect data with no error message.

See `.github/instructions/rx.instructions.md` for comprehensive Rx contract rules, scheduler usage, disposable patterns, and a complete standard Rx operator reference.
See `.github/instructions/rx.instructions.md` for the DynamicData-flavored practical guide (hot/cold, schedulers, disposable helpers, custom operator patterns, common pitfalls), and `.github/instructions/rx-design-guide.instructions.md` for the canonical rule reference (a complete distillation of the Microsoft Rx Design Guidelines, with stable `§X.Y` IDs to cite in PRs, code reviews, and commit messages, e.g. "§6.6", "§5.2").

## Breaking Changes

Expand Down
300 changes: 300 additions & 0 deletions .github/instructions/rx-design-guide.instructions.md

Large diffs are not rendered by default.

144 changes: 59 additions & 85 deletions .github/instructions/rx.instructions.md

Large diffs are not rendered by default.

193 changes: 176 additions & 17 deletions src/DynamicData.Tests/Cache/DeadlockTortureTest.cs

Large diffs are not rendered by default.

232 changes: 232 additions & 0 deletions src/DynamicData.Tests/Internal/DeliveryQueueMergeFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

using DynamicData.Internal;

using FluentAssertions;

using Xunit;

namespace DynamicData.Tests.Internal;

/// <summary>
/// Focused behavioural tests for <see cref="DeliveryQueueMergeExtensions.DeliveryQueueMerge{T}(IObservable{T}, IObservable{T}[])"/>.
/// Verifies the Rx Merge-compatible terminal semantics and the queue's serialization guarantee
/// for concurrent producers.
/// </summary>
public sealed class DeliveryQueueMergeFixture
{
[Fact]
public void OnNext_FromAllSources_IsForwardedInArrivalOrder()
{
using var a = new Subject<int>();
using var b = new Subject<int>();
using var c = new Subject<int>();

var received = new List<int>();
using var sub = a.DeliveryQueueMerge(b, c).Subscribe(received.Add);

a.OnNext(1);
b.OnNext(2);
c.OnNext(3);
a.OnNext(4);

received.Should().Equal(1, 2, 3, 4);
}

[Fact]
public void OnCompleted_FiresOnlyAfterEverySourceCompletes()
{
using var a = new Subject<int>();
using var b = new Subject<int>();
using var c = new Subject<int>();

var completed = false;
using var sub = a.DeliveryQueueMerge(b, c).Subscribe(_ => { }, () => completed = true);

a.OnCompleted();
completed.Should().BeFalse();

b.OnCompleted();
completed.Should().BeFalse();

c.OnCompleted();
completed.Should().BeTrue();
}

[Fact]
public void OnError_FromAnySource_TerminatesImmediately()
{
using var a = new Subject<int>();
using var b = new Subject<int>();

Exception? captured = null;
var completed = false;
using var sub = a.DeliveryQueueMerge(b).Subscribe(_ => { }, e => captured = e, () => completed = true);

var error = new InvalidOperationException();
a.OnError(error);

captured.Should().BeSameAs(error);
completed.Should().BeFalse();
}

[Fact]
public void OnError_AfterFirstError_IsDroppedByQueue()
{
using var a = new Subject<int>();
using var b = new Subject<int>();

Exception? captured = null;
using var sub = a.DeliveryQueueMerge(b).Subscribe(_ => { }, e => captured = e, () => { });

var first = new InvalidOperationException("first");
var second = new InvalidOperationException("second");
a.OnError(first);
b.OnError(second);

captured.Should().BeSameAs(first);
}

[Fact]
public void OnCompleted_AfterError_IsDroppedByQueue()
{
using var a = new Subject<int>();
using var b = new Subject<int>();

Exception? captured = null;
var completed = false;
using var sub = a.DeliveryQueueMerge(b).Subscribe(_ => { }, e => captured = e, () => completed = true);

var error = new InvalidOperationException();
a.OnError(error);
b.OnCompleted();

captured.Should().BeSameAs(error);
completed.Should().BeFalse();
}

[Fact]
public void SynchronousTerminal_AtSubscribe_IsCountedTowardCompletion()
{
var immediate = Observable.Empty<int>();
using var live = new Subject<int>();

var completed = false;
using var sub = immediate.DeliveryQueueMerge(live).Subscribe(_ => { }, () => completed = true);

completed.Should().BeFalse();
live.OnCompleted();
completed.Should().BeTrue();
}

[Fact]
public void SynchronousError_AtSubscribe_PropagatesImmediately()
{
var error = new InvalidOperationException();
var immediate = Observable.Throw<int>(error);
using var live = new Subject<int>();

Exception? captured = null;
using var sub = immediate.DeliveryQueueMerge(live).Subscribe(_ => { }, e => captured = e);

captured.Should().BeSameAs(error);
}

[Fact]
public async Task ConcurrentOnNext_FromManyProducers_IsSerializedToObserver()
{
// The queue's contract is that the downstream observer never sees concurrent OnNext calls,
// regardless of how many producers are racing on the inputs. Subscribe to two sources via
// two concurrent tasks, push interleaved items, and verify that no two OnNext calls overlap
// and every item is delivered exactly once.
const int itemsPerProducer = 1_000;

using var a = new Subject<int>();
using var b = new Subject<int>();

var inFlight = 0;
var maxInFlight = 0;
var received = new ConcurrentQueue<int>();

using var sub = a.DeliveryQueueMerge(b).Subscribe(v =>
{
var now = Interlocked.Increment(ref inFlight);
var prev = Volatile.Read(ref maxInFlight);
while (now > prev && Interlocked.CompareExchange(ref maxInFlight, now, prev) != prev)
{
prev = Volatile.Read(ref maxInFlight);
}
received.Enqueue(v);
Interlocked.Decrement(ref inFlight);
});

using var barrier = new Barrier(2);
var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < itemsPerProducer; i++) a.OnNext(i); });
var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < itemsPerProducer; i++) b.OnNext(itemsPerProducer + i); });

await Task.WhenAll(taskA, taskB);

received.Count.Should().Be(itemsPerProducer * 2);
maxInFlight.Should().Be(1, "concurrent OnNext to the observer must be serialized by the queue");

var expected = Enumerable.Range(0, itemsPerProducer * 2).ToHashSet();
received.Should().BeEquivalentTo(expected);
}

[Fact]
public void Subscription_OccursInArgumentOrder()
{
var subscribed = new List<int>();
var first = Observable.Create<int>(o => { subscribed.Add(0); return () => { }; });
var second = Observable.Create<int>(o => { subscribed.Add(1); return () => { }; });
var third = Observable.Create<int>(o => { subscribed.Add(2); return () => { }; });

using var sub = first.DeliveryQueueMerge(second, third).Subscribe(_ => { });

subscribed.Should().Equal(0, 1, 2);
}

[Fact]
public void Dispose_StopsForwardingFromAnySource()
{
using var a = new Subject<int>();
using var b = new Subject<int>();

var received = new List<int>();
var sub = a.DeliveryQueueMerge(b).Subscribe(received.Add);

a.OnNext(1);
sub.Dispose();
a.OnNext(2);
b.OnNext(3);

received.Should().Equal(1);
}

[Fact]
public void NoOthers_FallsBackToFirstAlone()
{
using var a = new Subject<int>();
var received = new List<int>();
var completed = false;
using var sub = a.DeliveryQueueMerge().Subscribe(received.Add, () => completed = true);

a.OnNext(7);
a.OnNext(11);
a.OnCompleted();

received.Should().Equal(7, 11);
completed.Should().BeTrue();
}
}
Loading
Loading