Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
7d96675
Drop of new internal operator
dwcullop May 29, 2026
a3d5717
Refactor FilterOnObservable and add new test
dwcullop May 29, 2026
51eaec6
Refactor Observable Classes for Clarity
dwcullop May 30, 2026
4231432
Merge branch 'main' into feature/aggregate_many
dwcullop May 31, 2026
524baf7
Refactor cache operators onto Orchestrator composition primitive
dwcullop Jun 1, 2026
51fdc6b
Add CacheChangeHandlerBase, eliminate ScheduleEmit
dwcullop Jun 6, 2026
18b4a7f
Merge branch 'main' into feature/aggregate_many
dwcullop Jun 6, 2026
67b9653
Refactor orchestrator to sub-queue emitter contract
dwcullop Jun 7, 2026
fb471d7
FilterOnObservable: switch buffered path to leading-edge timer
dwcullop Jun 7, 2026
1ec5e4c
Refactor AutoRefresh buffering logic
dwcullop Jun 7, 2026
058754e
AutoRefresh: unify buffered/unbuffered, fix stale-refresh hazard
dwcullop Jun 7, 2026
b2fe104
AutoRefresh: add buffered Update-then-quiet regression test
dwcullop Jun 7, 2026
da3ac61
Revert redundant Update-then-quiet test
dwcullop Jun 7, 2026
d24f102
Migrate MergeMany and MergeManyItems to orchestrator
dwcullop Jun 7, 2026
b244998
Fix three correctness bugs in orchestrator runtime
dwcullop Jun 7, 2026
fd4b588
Add test coverage for OrchestrateMany contract gaps
dwcullop Jun 7, 2026
563a6e4
AutoRefresh: replace TrackAuxiliary with OnDrainComplete(bool) hook
dwcullop Jun 7, 2026
de681f8
Orchestrator: factory-based construction instead of Initialize hook
dwcullop Jun 7, 2026
a07e8fb
SharedDeliveryQueue: handle drain-reentrancy in queue, not consumers
dwcullop Jun 8, 2026
52a4606
Rename OnDrainComplete parameter to isFinal; add contract + queue tests
dwcullop Jun 8, 2026
31da024
Orchestrator: correctness, ergonomics, and devirtualization improvements
dwcullop Jun 11, 2026
48e7264
Add isolation test fixtures for all cache orchestrators
dwcullop Jun 11, 2026
cb5f41d
Address review feedback on orchestrator primitive
dwcullop Jun 14, 2026
ef4ceee
Refactor: unify and rename orchestration primitives
dwcullop Jun 14, 2026
2537e59
Trim verbose comments across orchestrator and queue files
dwcullop Jun 14, 2026
0c69976
Strengthen orchestrator test fixtures
dwcullop Jun 14, 2026
9e45d5c
Address PR feedback: drop redundant Serialize, document MergeMany beh…
Jun 15, 2026
4f99e3c
Make ChangeSetCache and ClonedListChangeSet passive
Jun 15, 2026
acaa9f3
Trim out-of-scope comments from passive ChangeSetCache refactor
Jun 15, 2026
a29d1b0
Propagate per-item observable errors from list MergeMany and remove n…
Jun 15, 2026
527d0ab
Strip PR-narration and verbose noise from comments
Jun 15, 2026
bf0ed30
Delete redundant cache MergeManyListChangeSets wrapper
Jun 15, 2026
994aea4
Merge branch 'main' into feature/aggregate_many
dwcullop Jun 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public static partial class AutoRefreshFixture
public class WithPropertyAccessor
: Base
{
[Fact(Skip = "Existing defect: propertyAccessor is not null checked, throws NRE on first notification, instead")]
[Fact]
public void PropertyAccessorIsNull_ThrowsException()
=> FluentActions.Invoking(() => ObservableCacheEx.AutoRefresh(
source: Observable.Never<IChangeSet<Item, int>>(),
Expand Down
318 changes: 314 additions & 4 deletions src/DynamicData.Tests/Cache/AutoRefreshOnObservableFixture.Base.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,317 @@ public void ChangeSetBufferIsGiven_ReevaluatorNotificationsAreBufferedOnSchedule
results.RecordedItemsByKey.Values.Should().BeEquivalentTo(source.Items, "no items should have changed, within the source");
results.HasCompleted.Should().BeFalse("the source has not completed");
}


[Fact]
public void ChangeSetBufferIsGiven_RemoveDuringWindow_DropsPendingRefresh()
{
// Setup
using var source = new TestSourceCache<Item, int>(Item.SelectId);

using var item1 = new Item() { Id = 1 };
source.AddOrUpdate(item1);

var scheduler = new TestScheduler();


// UUT Initialization
using var subscription = BuildUut(
source: source.Connect(),
reevaluator: Item.ObserveValueChanged,
changeSetBuffer: TimeSpan.FromSeconds(10),
scheduler: scheduler)
.ValidateSynchronization()
.ValidateChangeSets(Item.SelectId)
.RecordCacheItems(out var results);

results.RecordedChangeSets.Count.Should().Be(1, "the initial Add changeset should propagate");


// UUT Action (publish reevaluator notification at T=5)
scheduler.AdvanceTo(TimeSpan.FromSeconds(5).Ticks);
++item1.Value;


// UUT Action (remove the item at T=9, before the buffer window ends at T=15)
scheduler.AdvanceTo(TimeSpan.FromSeconds(9).Ticks);
source.Remove(item1);

results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "the Remove should propagate immediately");
results.RecordedChangeSets.Skip(1).First().Removes.Should().Be(1, "the source removed item #1");


// UUT Action (advance past the buffer window)
scheduler.AdvanceTo(TimeSpan.FromSeconds(20).Ticks);

results.Error.Should().BeNull();
results.RecordedChangeSets.Skip(2).Should().BeEmpty(
"a Refresh for a key the source has already removed is incoherent on arrival");
results.HasCompleted.Should().BeFalse("the source has not completed");
}

[Fact]
public void ChangeSetBufferIsGiven_UpdateDuringWindow_RefreshEmittedForNewInstance()
{
// Setup
using var source = new TestSourceCache<Item, int>(Item.SelectId);

using var item1V1 = new Item() { Id = 1 };
using var item1V2 = new Item() { Id = 1 };
source.AddOrUpdate(item1V1);

var scheduler = new TestScheduler();


// UUT Initialization
using var subscription = BuildUut(
source: source.Connect(),
reevaluator: Item.ObserveValueChanged,
changeSetBuffer: TimeSpan.FromSeconds(10),
scheduler: scheduler)
.ValidateSynchronization()
.ValidateChangeSets(Item.SelectId)
.RecordCacheItems(out var results);

results.RecordedChangeSets.Count.Should().Be(1, "the initial Add changeset should propagate");


// UUT Action (v1's reevaluator fires at T=5)
scheduler.AdvanceTo(TimeSpan.FromSeconds(5).Ticks);
++item1V1.Value;


// UUT Action (Update replaces v1 with v2 at T=9, within the window armed by the v1 refresh)
scheduler.AdvanceTo(TimeSpan.FromSeconds(9).Ticks);
source.AddOrUpdate(item1V2);

results.RecordedChangeSets.Skip(1).Count().Should().Be(1, "the Update should propagate immediately");


// UUT Action (v2's reevaluator fires at T=12, arming a fresh window for T=22)
scheduler.AdvanceTo(TimeSpan.FromSeconds(12).Ticks);
++item1V2.Value;


// UUT Action (advance to T=15, the original v1-armed window boundary)
scheduler.AdvanceTo(TimeSpan.FromSeconds(15).Ticks);

results.RecordedChangeSets.Skip(2).Should().BeEmpty(
"the v1-armed window must not fire after the Update replaced its pending refresh");


// UUT Action (advance past the new buffer window at T=22)
scheduler.AdvanceTo(TimeSpan.FromSeconds(22).Ticks);

results.Error.Should().BeNull();
results.RecordedChangeSets.Skip(2).Count().Should().Be(1, "the v2 buffer window has expired");
results.RecordedChangeSets.Skip(2).First().Refreshes.Should().Be(1, "exactly one Refresh is buffered for the key");
results.RecordedChangeSets.Skip(2).First().First().Current.Should().BeSameAs(item1V2, "a Refresh carries the instance the source currently holds, not a superseded one");
results.HasCompleted.Should().BeFalse("the source has not completed");
}

[Fact]
public void ChangeSetBufferIsGiven_MultipleUpdatesDuringWindow_OnlyLatestRefreshEmitted()
{
// Setup
using var source = new TestSourceCache<Item, int>(Item.SelectId);

using var item1V1 = new Item() { Id = 1 };
using var item1V2 = new Item() { Id = 1 };
using var item1V3 = new Item() { Id = 1 };
source.AddOrUpdate(item1V1);

var scheduler = new TestScheduler();


// UUT Initialization
using var subscription = BuildUut(
source: source.Connect(),
reevaluator: Item.ObserveValueChanged,
changeSetBuffer: TimeSpan.FromSeconds(10),
scheduler: scheduler)
.ValidateSynchronization()
.ValidateChangeSets(Item.SelectId)
.RecordCacheItems(out var results);

results.RecordedChangeSets.Count.Should().Be(1, "the initial Add changeset should propagate");


// UUT Action (v1 reEval at T=2, then Update to v2 at T=4)
scheduler.AdvanceTo(TimeSpan.FromSeconds(2).Ticks);
++item1V1.Value;

scheduler.AdvanceTo(TimeSpan.FromSeconds(4).Ticks);
source.AddOrUpdate(item1V2);


// UUT Action (v2 reEval at T=6, then Update to v3 at T=8)
scheduler.AdvanceTo(TimeSpan.FromSeconds(6).Ticks);
++item1V2.Value;

scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks);
source.AddOrUpdate(item1V3);


// UUT Action (v3 reEval at T=10)
scheduler.AdvanceTo(TimeSpan.FromSeconds(10).Ticks);
++item1V3.Value;

results.RecordedChangeSets.Skip(1).Count().Should().Be(2, "two Updates propagated immediately");


// UUT Action (advance past v3's window at T=20)
scheduler.AdvanceTo(TimeSpan.FromSeconds(20).Ticks);

results.Error.Should().BeNull();
results.RecordedChangeSets.Skip(3).Count().Should().Be(1, "the chain produces a single Refresh changeset");
results.RecordedChangeSets.Skip(3).First().Refreshes.Should().Be(1, "the chain coalesces to a single Refresh");
results.RecordedChangeSets.Skip(3).First().First().Current.Should().BeSameAs(item1V3, "a Refresh carries the instance the source currently holds, not a superseded one");
results.HasCompleted.Should().BeFalse("the source has not completed");
}

[Fact]
public void ChangeSetBufferIsGiven_SourceCompletesBeforeWindowExpires_PendingRefreshIsEmittedBeforeCompletion()
{
// Setup
using var source = new TestSourceCache<Item, int>(Item.SelectId);

using var item = new Item() { Id = 1 };
source.AddOrUpdate(item);

var scheduler = new TestScheduler();


// UUT Initialization
using var subscription = BuildUut(
source: source.Connect(),
reevaluator: Item.ObserveValueChanged,
changeSetBuffer: TimeSpan.FromSeconds(10),
scheduler: scheduler)
.ValidateSynchronization()
.ValidateChangeSets(Item.SelectId)
.RecordCacheItems(out var results);


// UUT Action (reevaluator fires at T=5, arms the buffer window for T=15)
scheduler.AdvanceTo(TimeSpan.FromSeconds(5).Ticks);
++item.Value;


// UUT Action (complete the source and the reevaluator while the buffer window is still open)
source.Complete();
item.Complete();

results.Error.Should().BeNull();
results.RecordedChangeSets.SelectMany(static cs => cs).Where(static c => c.Reason is ChangeReason.Refresh).Should().HaveCount(1,
"a pending buffered refresh must surface before completion, even when source and reevaluator have already completed");
results.HasCompleted.Should().BeTrue(
"completion of all upstream subscriptions triggers an immediate flush of pending refreshes");


// UUT Action (advance past the original window boundary; the timer was cancelled when sources completed)
scheduler.AdvanceTo(TimeSpan.FromSeconds(15).Ticks);

results.RecordedChangeSets.SelectMany(static cs => cs).Where(static c => c.Reason is ChangeReason.Refresh).Should().HaveCount(1,
"the pending timer was disposed when sources completed; no second refresh should fire at the window boundary");
}

[Fact]
public void NoChangeSetBuffer_AddAndRemoveInSameChangeset_NoRefreshEmitted()
{
// Setup
using var source = new TestSourceCache<Item, int>(Item.SelectId);

using var item1 = new Item() { Id = 1 };


// UUT Initialization
using var subscription = BuildUut(
source: source.Connect(),
reevaluator: Item.ObserveValue)
.ValidateSynchronization()
.ValidateChangeSets(Item.SelectId)
.RecordCacheItems(out var results);


// UUT Action (Add + Remove in a single changeset)
source.Edit(updater =>
{
updater.AddOrUpdate(item1);
updater.Remove(item1);
});

results.Error.Should().BeNull();
results.RecordedChangeSets.SelectMany(static cs => cs).Where(static c => c.Reason is ChangeReason.Refresh).Should().BeEmpty(
"a sync reevaluator emission queued during Add must not surface once the same drain removes the item");
results.HasCompleted.Should().BeFalse("the source has not completed");
}

[Fact]
public void NoChangeSetBuffer_AddAndUpdateInSameChangeset_NoRefreshFromObsoleteInstance()
{
// Setup
using var source = new TestSourceCache<Item, int>(Item.SelectId);

using var item1V1 = new Item() { Id = 1 };
using var item1V2 = new Item() { Id = 1 };


// UUT Initialization
using var subscription = BuildUut(
source: source.Connect(),
reevaluator: Item.ObserveValue)
.ValidateSynchronization()
.ValidateChangeSets(Item.SelectId)
.RecordCacheItems(out var results);


// UUT Action (Add v1 + Update to v2 in a single changeset)
source.Edit(updater =>
{
updater.AddOrUpdate(item1V1);
updater.AddOrUpdate(item1V2);
});

results.Error.Should().BeNull();
results.RecordedChangeSets.SelectMany(static cs => cs).Where(static c => c.Reason is ChangeReason.Refresh).Should().BeEmpty(
"a Refresh carrying an instance the source has already replaced is incoherent on arrival");
results.HasCompleted.Should().BeFalse("the source has not completed");
}

[Fact]
public void NoChangeSetBuffer_MultipleUpdatesInSameChangeset_NoRefreshFromIntermediateInstances()
{
// Setup
using var source = new TestSourceCache<Item, int>(Item.SelectId);

using var item1V1 = new Item() { Id = 1 };
using var item1V2 = new Item() { Id = 1 };
using var item1V3 = new Item() { Id = 1 };


// UUT Initialization
using var subscription = BuildUut(
source: source.Connect(),
reevaluator: Item.ObserveValue)
.ValidateSynchronization()
.ValidateChangeSets(Item.SelectId)
.RecordCacheItems(out var results);


// UUT Action (Add v1 + Update v2 + Update v3 in a single changeset)
source.Edit(updater =>
{
updater.AddOrUpdate(item1V1);
updater.AddOrUpdate(item1V2);
updater.AddOrUpdate(item1V3);
});

results.Error.Should().BeNull();
results.RecordedChangeSets.SelectMany(static cs => cs).Where(static c => c.Reason is ChangeReason.Refresh).Should().BeEmpty(
"every Refresh carrying a value the source has already superseded is incoherent on arrival");
results.HasCompleted.Should().BeFalse("the source has not completed");
}

[Fact]
public void ItemIsAdded_SubscribesToReevaluator()
{
Expand Down Expand Up @@ -495,7 +805,7 @@ public void ReevaluatorEmitsAsynchronously_ItemRefreshes()
results.HasCompleted.Should().BeFalse("the source has not completed");
}

[Fact(Skip = "Existing defect, #1099")]
[Fact]
public void ReevaluatorEmitsImmediately_ItemDoesNotRefresh()
{
// Setup
Expand Down Expand Up @@ -523,7 +833,7 @@ public void ReevaluatorEmitsImmediately_ItemDoesNotRefresh()
results.HasCompleted.Should().BeFalse("the source has not completed");
}

[Theory(Skip = "Existing defect. Docs say that ignoring reevaluator exceptions is intentional, but it shouldn't be. Basic RX philosophy is that exceptions should basically always propagate.")]
[Theory]
[InlineData(NotificationStrategy.Immediate)]
[InlineData(NotificationStrategy.Asynchronous)]
public void ReevaluatorFails_ErrorPropagates(NotificationStrategy notificationStrategy)
Expand Down Expand Up @@ -564,7 +874,7 @@ public void ReevaluatorFails_ErrorPropagates(NotificationStrategy notificationSt
}
}

[Fact(Skip = "Existing defect. Docs say that ignoring reevaluator exceptions is intentional, but it shouldn't be. Basic RX philosophy is that exceptions should basically always propagate.")]
[Fact]
public void ReevaluatorThrows_ExceptionPropagates()
{
// Setup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static partial class AutoRefreshOnObservableFixture
public class WithoutKey
: Base
{
[Fact(Skip = "Existing defect: reevaluator is not null checked, throws NRW on first notification, instead")]
[Fact]
public void ReevaluatorIsNull_ThrowsException()
=> FluentActions.Invoking(() => ObservableCacheEx.AutoRefreshOnObservable(
source: Observable.Never<IChangeSet<Item, int>>(),
Expand Down
21 changes: 21 additions & 0 deletions src/DynamicData.Tests/Cache/FilterOnObservableFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,27 @@ public void ObservableFilterDuplicateValuesHaveNoEffect()
filterStats.Summary.Overall.Adds.Should().Be(MagicNumber);
}

[Fact]
public void FilterObservableError_PropagatesDownstream()
{
// arrange: a filter observable that errors on the second emission
var filterSubject = new Subject<bool>();
var error = new InvalidOperationException("filter exploded");
Exception? observed = null;

using var sub = _source.Connect()
.FilterOnObservable(_ => filterSubject)
.Subscribe(_ => { }, ex => observed = ex);

AddPeople(MagicNumber);

// act: trigger the error
filterSubject.OnError(error);

// assert
observed.Should().BeSameAs(error, "errors from per-item filter observables must terminate the downstream stream");
}

[Fact]
public void ObservableFilterChangesCanBeBuffered()
{
Expand Down
Loading
Loading