diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 95d5c3c41..8d9261e1d 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -39,7 +39,7 @@ When optimizing, measure allocation rates and lock contention, not just wall-clo DynamicData operators compose — the output of one is the input of the next. If any operator violates the Rx contract (e.g., concurrent `OnNext` calls, calls after `OnCompleted`), every downstream operator can corrupt its internal state. This is not a crash — it's silent data corruption that manifests as wrong results, missing items, or phantom entries. In a reactive UI, this means the user sees stale or incorrect data with no error message. -See `.github/instructions/rx.instructions.md` for comprehensive Rx contract rules, scheduler usage, disposable patterns, and a complete standard Rx operator reference. +See `.github/instructions/rx.instructions.md` for the DynamicData-flavored practical guide (hot/cold, schedulers, disposable helpers, custom operator patterns, common pitfalls), and `.github/instructions/rx-design-guide.instructions.md` for the canonical rule reference (a complete distillation of the Microsoft Rx Design Guidelines, with stable `§X.Y` IDs to cite in PRs, code reviews, and commit messages, e.g. "§6.6", "§5.2"). ## Breaking Changes diff --git a/.github/instructions/rx-design-guide.instructions.md b/.github/instructions/rx-design-guide.instructions.md new file mode 100644 index 000000000..0f08c7851 --- /dev/null +++ b/.github/instructions/rx-design-guide.instructions.md @@ -0,0 +1,300 @@ +--- +applyTo: "**/*.cs" +--- +# Rx Design Guide + +Complete distillation of the **[Microsoft Rx Design Guidelines v1.0 (October 2010)](https://go.microsoft.com/fwlink/?LinkID=205219)**. This is the authoritative reference for the Rx contract and the rules for using Rx and implementing Rx operators. + +**Code samples and API names have been updated to current Rx.NET (modernized from the 2010 spec).** Treat the conventions here as current. + +**Every operator added or modified must self-audit against these rules. Every bug fix must explicitly state which rules were verified.** + +Rules use the original document's `§X.Y` numbering. Cite rule IDs in code reviews, PR descriptions, and commit messages (e.g., "Fixes §6.6 violation in `BatchIf`"). Always use the literal `§` character — never ASCII substitutes (`S`, `SS`, `sec`, etc.). + +--- + +## §4: The Rx contract + +`IObservable` and `IObserver` only specify their methods' arguments and return types. The Rx library makes additional assumptions about these interfaces that are not expressible in the .NET type system. These assumptions form a contract that **all producers and consumers** of Rx types must follow. + +### §4.1. Assume the Rx Grammar + +Messages follow `OnNext* (OnCompleted | OnError)?`: +- Zero or more `OnNext`, optionally followed by exactly one terminal notification. +- `OnError` and `OnCompleted` are **mutually exclusive**. +- After a terminal, **no further notifications of any kind**, not even another terminal. + +The single terminal lets consumers deterministically clean up; the single-failure rule supports abort semantics in multi-source operators (§6.6). + +**When to ignore:** only for non-conforming `IObservable` sources. Restore conformance with `Synchronize()` (§5.8). + +### §4.2. Assume observer instances are called in a serialized fashion + +`OnNext`, `OnError`, and `OnCompleted` calls to a single observer **MUST never execute concurrently.** Only the operators that produce multi-source sequences are required to serialize (§6.7); consumers can safely assume serialization. + +This is the most-violated rule in practice. Violations produce silent state corruption, not exceptions. + +**When to ignore:** for a custom `IObservable` that doesn't serialize, wrap with `Synchronize()` to restore the guarantee. + +### §4.3. Assume resources are cleaned up after an OnError or OnCompleted message + +After a terminal, the operator MUST immediately release its resources. `Observable.Using` / `Finally` fire deterministically. + +### §4.4. Assume a best effort to stop all outstanding work on Unsubscribe + +When `Dispose` is called: +- Queued work that has not started is cancelled. +- Work already in progress may complete, but its results **MUST NOT** be signaled to the unsubscribed observer. +- Messages may arrive **during** the `Dispose` call itself (Dispose races with `OnNext`). +- After `Dispose` returns: no more messages arrive. +- The unsubscription process may continue asynchronously on a different context after `Dispose` returns. + +--- + +## §5: Using Rx + +Rules for code that consumes Rx. Apply recursively inside operator implementations (§6.20). + +### §5.1. Consider drawing a Marble-diagram + +Sketching the inputs and outputs over time often makes the operator choice obvious (delay-then-call → `Throttle`; new sequence per input → `SelectMany`). + +**When to ignore:** when you're comfortable enough without one. + +### §5.2. Consider passing multiple arguments to Subscribe + +`Subscribe(onNext)` **rethrows OnError on the source thread, crashing the app.** The default `OnCompleted` is a no-op. Provide all three handlers unless: +- The sequence is guaranteed not to complete (e.g. a UI event) +- The sequence is guaranteed not to error +- The default behavior is genuinely desired + +### §5.3. Consider using LINQ query expression syntax + +Rx implements the query-expression pattern; `SelectMany`-based pipelines often read better as LINQ. Skip when many of your operators have no query-syntax equivalent. + +### §5.4. Consider passing a specific scheduler to concurrency-introducing operators + +Better to introduce concurrency on the right scheduler from the start than to fix it up with `ObserveOn`: +```csharp +keyUp.Throttle(TimeSpan.FromSeconds(1), DispatcherScheduler.Current); +``` +Without the scheduler, the default `Throttle` overload would deliver on the ThreadPool. + +**When to ignore:** when combining many sources from different contexts, use §5.5 (one `ObserveOn` at the end). + +### §5.5. Call the ObserveOn operator as late and in as few places as possible + +`ObserveOn` schedules per-message work. Placing it after filters avoids scheduling work for messages that get filtered out. Skip entirely when no specific context is required. + +### §5.6. Consider limiting buffers + +`Replay`, `Buffer`, etc. without size/time limits cause unbounded memory growth: `Replay(10000, TimeSpan.FromHours(1))`. + +### §5.7. Make side-effects explicit using the Do operator + +Side effects buried in selector/predicate lambdas are unauditable, and they run **per subscription** (unless shared via §5.10). Hoist them into `Do(...)`: +```csharp +xs.Where(x => x.Failed).Do(x => Log(x)).Subscribe(...); +``` + +**When to ignore:** when the side effect needs data unavailable to `Do`. + +### §5.8. Use the Synchronize operator only to "fix" custom IObservable implementations + +Operators created by Rx already satisfy §4.1 / §4.2. Calling `Synchronize()` on one of them is redundant and counterproductive. Only use it on external sources that don't follow the contract. + +> NOTE: this refers to the **single-argument `Synchronize()`** for non-conforming sources. The **gate-based `Synchronize(gate)`** in multi-source operators (§6.7) is a different pattern and is valid. + +### §5.9. Assume messages can come through until unsubscribe has completed + +Messages can be in flight while `Dispose` is being called and may still arrive during the `Dispose` call. After `Dispose` returns control, no more messages arrive. Unsubscription itself may still be running on another context. + +### §5.10. Use the Publish operator to share side-effects + +Most observables are cold: each subscription replays side effects. When side effects must happen only once, share via `Publish(shared => ...)` or `Publish().RefCount()`. + +**When to ignore:** when subscriptions have no side effects, or when repeating them is harmless. The extra machinery is unnecessary. + +--- + +## §6: Operator implementations + +### §6.1. Implement new operators by composing existing operators + +Composition reuses the corner-case handling the Rx team built into base operators: +```csharp +public static IObservable SelectMany( + this IObservable source, + Func> selector) + => source.Select(selector).Merge(); +``` +`Select` already protects against selector exceptions (§6.4); `Merge` already serializes (§6.7). + +**When to ignore:** no appropriate base operators exist, OR profiling proves the composed form is too slow. + +### §6.2. Implement custom operators using Observable.Create + +When composition isn't enough, use `Observable.Create`. It enforces grammar compliance: auto-unsubscribe on terminal, single-terminal enforcement. + +**When to ignore:** the operator must return a non-conforming sequence (rare; usually testing), or the return type must implement more than `IObservable` (e.g. `ISubject`). + +### §6.3. Implement operators for existing observable sequences as generic extension methods + +Extension methods → IntelliSense on every sequence. Generics → applicable to any element type. + +**When to ignore:** the operator doesn't work on a source sequence, or genuinely cannot be generic. + +### §6.4. Protect calls to user code from within an operator + +Wrap every user-provided delegate in try/catch and route exceptions to `observer.OnError`: +- Selectors, predicates, comparers, key selectors +- Action callbacks (`Do`, `OnItemRemoved`, `SubscribeMany`) +- Calls to dictionaries / lists / hashsets that use a user-provided comparer + +```csharp +source.Subscribe( + x => + { + TResult result; + try { result = selector(x); } + catch (Exception ex) { observer.OnError(ex); return; } + observer.OnNext(result); + }, + observer.OnError, + observer.OnCompleted); +``` + +**Edge of the monad** (do NOT wrap): `Subscribe`, `Dispose`, `OnNext`, `OnError`, `OnCompleted`. Calling `OnError` from these places is undefined behavior. + +**Exception:** for `IScheduler` calls, protect inside the scheduler implementation, not at every call site. + +**When to ignore:** for calls to user code made **before** creating the observable (outside `Observable.Create`). Those run on the current execution context and follow normal control flow. + +### §6.5. Subscribe implementations should not throw + +`Subscribe` may be called asynchronously (e.g. the second source of `Concat` after the first completes). A throw crashes the app because no observer is in scope. Route errors via `observer.OnError(...)` then `return Disposable.Empty;`. + +**When to ignore:** when the error is catastrophic and should bring the program down anyway. + +### §6.6. OnError messages should have abort semantics + +Once `OnError` arrives, the operator MUST emit no further messages — **not even buffered or aggregated state.** The canonical violation: a buffering operator that "salvages" its buffer into a final `OnNext` on error. The buffer must be **discarded**. Aggregating operators (Sort, Group, Page, etc.) must not salvage state on error. + +### §6.7. Serialize calls to IObserver methods within observable sequence implementations + +When combining multiple sources into one output, serialize **all three notification types** (OnNext, OnError, OnCompleted) through a shared gate: +```csharp +var gate = new object(); +source1.Synchronize(gate).Subscribe(observer); +source2.Synchronize(gate).Subscribe(observer); +``` + +**When to ignore:** +- Single-source operator (§6.8 applies) +- No concurrency introduced +- Other constraints guarantee no concurrency + +> NOTE: if a source breaks the contract, the consumer can fix it with `Synchronize()` (§5.8) before passing it in. + +### §6.8. Avoid serializing operators + +Per §6.7 every operator already serializes; downstream operators can **assume** serialized input. Adding `Synchronize` "just in case" clutters code, harms performance, and signals misunderstanding of the contract. Fix non-conforming sources at the consumer boundary (§5.8), not inside operators. + +### §6.9. Parameterize concurrency by providing a scheduler argument + +Concurrency-introducing operators take an `IScheduler`: +```csharp +public static IObservable Return(TValue value, IScheduler scheduler) + => Observable.Create(observer => + scheduler.Schedule(() => { observer.OnNext(value); observer.OnCompleted(); })); +``` + +**When to ignore:** the operator doesn't control concurrency creation (e.g. event-to-observable wrappers), or must use a specific scheduler internally. + +### §6.10. Provide a default scheduler + +In most cases there is a good default. Provide it as an overload so callers can stay succinct. Per §6.12, prefer `Scheduler.Immediate`. + +**When to ignore:** when no good default exists. + +### §6.11. The scheduler should be the last argument to the operator + +Makes the operator fluent in IntelliSense; combined with §6.10's overload, callers can add or omit the scheduler without changing argument order. + +**When to ignore:** `params T[]` operators require `params` last. Make the scheduler the **second-to-last** argument instead. + +### §6.12. Avoid introducing concurrency + +Adding concurrency changes timeliness, and **delivery time is itself observable data** — concurrency skews that data. Defaults should be `Scheduler.Immediate` where possible; only introduce concurrency when essential to the operator's semantics. + +> NOTE: with `Scheduler.Immediate`, `Subscribe` becomes blocking. Expensive computation in that situation is a candidate for introducing concurrency. + +### §6.13. Hand out all disposable instances created inside the operator to consumers + +Every `IDisposable` created inside an operator (subscriptions, scheduled actions, resources) MUST be reachable through the disposable returned to the subscriber. Compose via the `System.Reactive.Disposables` family: + +| Type | Purpose | +|---|---| +| `CompositeDisposable` | Groups multiple disposables; dispose together | +| `SerialDisposable` | Replaceable holder; assignment disposes the previous | +| `SingleAssignmentDisposable` | Assignable once; throws on second assignment | +| `RefCountDisposable` | Underlying disposed only when all dependents released | +| `BooleanDisposable` | Exposes `IsDisposed` state | +| `CancellationDisposable` | Bridges `IDisposable` and `CancellationToken` | +| `ContextDisposable` | Disposes on a specified `SynchronizationContext` | +| `ScheduledDisposable` | Disposes via a scheduler | + +Hidden disposables leak at unsubscribe time and break cleanup. + +### §6.14. Operators should not block + +Return `IObservable`, never `T`. Aggregation operators like `Sum` return `IObservable`; callers escape via `First*`/`Last*`/`Single*` when they need a value. + +### §6.15. Avoid deep stacks caused by recursion in operators + +Stack depth at operator invocation is unknown; recursive operators blow the stack faster than expected. Two solutions: +- The recursive `IScheduler.Schedule(self => ...)` overload +- `yield`-based `IEnumerable>` + `Concat` + +### §6.16. Argument validation should occur outside Observable.Create + +Per §6.5 the subscribe lambda must not throw. Therefore null checks and other validation belong **before** the `Observable.Create(...)` call: +```csharp +if (source == null) throw new ArgumentNullException(nameof(source)); +if (selector == null) throw new ArgumentNullException(nameof(selector)); +return Observable.Create(observer => /* ... */); +``` + +**When to ignore:** when validation genuinely requires the subscription to be live (rare). + +### §6.17. Unsubscription should be idempotent + +The `IDisposable` returned from `Subscribe` doesn't expose state. Consumers may dispose defensively. First `Dispose` runs cleanup; subsequent calls are no-ops. Use a `_disposedValue` flag or equivalent guard. + +### §6.18. Unsubscription should not throw + +Disposal cascades through compositions. A throw crashes the app, and the observer is already unsubscribed so `OnError` can't route the exception. +- If cleanup can fail: swallow + log; never propagate. +- When disposing multiple children: wrap each in try/catch so one failure doesn't skip the rest. + +### §6.19. Custom IObservable implementations should follow the Rx contract + +When you can't use `Observable.Create` (§6.2), your custom `IObservable` must still satisfy all of §4. You take on the burden the library would otherwise carry. + +**When to ignore:** only for sequences that intentionally break the contract (e.g. testing how downstream code behaves under broken contracts). + +### §6.20. Operator implementations should follow guidelines for Rx usage + +Operators internally compose other operators (§6.1). The §5 rules apply **recursively** inside operator implementations. + +--- + +## Maintaining this document + +Derivative of the [Microsoft Rx Design Guidelines v1.0 (October 2010)](https://go.microsoft.com/fwlink/?LinkID=205219). The Microsoft document has not been republished since v1.0 and the underlying Rx contract is stable. **Consult the PDF when revising rule text or resolving questions about original intent.** + +**Do NOT modify rule IDs.** External code reviews, PRs, and commit messages cite `§X.Y` IDs; renumbering breaks references. + +**DO add new rules** in a new section if the codebase discovers patterns the Microsoft document doesn't cover. Use a non-numeric prefix (e.g., `X-1`) to make clear they are not from the original spec. + +**See also:** `rx.instructions.md` for practical Rx.NET reference material and codebase-specific patterns (Defer pattern, disposable code samples, modern operator catalog, common pitfalls). diff --git a/.github/instructions/rx.instructions.md b/.github/instructions/rx.instructions.md index f5f3a21c8..edd557f0d 100644 --- a/.github/instructions/rx.instructions.md +++ b/.github/instructions/rx.instructions.md @@ -1,29 +1,33 @@ --- applyTo: "**/*.cs" --- -# Reactive Extensions (Rx) — Comprehensive Guide +# Reactive Extensions (Rx) — DynamicData Practical Guide Reference: [ReactiveX Observable Contract](http://reactivex.io/documentation/contract.html) | [Rx.NET GitHub](https://github.com/dotnet/reactive) | [IntroToRx.com](http://introtorx.com/) +**Authoritative reference: [`rx-design-guide.instructions.md`](./rx-design-guide.instructions.md)** is the complete distillation of the Microsoft Rx Design Guidelines (October 2010), with stable `§X.Y` rule IDs. Cite those IDs in PR descriptions, code reviews, and commit messages. This file covers practical, DynamicData-flavored material that complements the design guide: hot vs cold semantics, the modern Rx.NET scheduler/disposable APIs, an operator quick-reference, custom-operator patterns specific to this codebase, and common pitfalls. + ## Core Concepts -### Observables are Composable +### Composition -Rx's power comes from composition. Every operator returns a new `IObservable`, enabling fluent chaining: +Rx's power comes from composition (per [§6.1](./rx-design-guide.instructions.md#61-implement-new-operators-by-composing-existing-operators)). Every operator returns a new `IObservable`, enabling fluent chaining: ```csharp source - .Where(x => x.IsValid) // filter - .Select(x => x.Transform()) // project - .DistinctUntilChanged() // deduplicate + .Where(x => x.IsValid) // filter + .Select(x => x.Transform()) // project + .DistinctUntilChanged() // deduplicate .ObserveOn(RxApp.MainThreadScheduler) // marshal to UI thread - .Subscribe(x => UpdateUI(x)); // consume + .Subscribe(x => UpdateUI(x)); // consume ``` -Each operator in the chain is a separate subscription. Disposing the final subscription cascades disposal upstream through the entire chain. This composability is what makes Rx powerful — and what makes contract violations devastating, since a bug in any operator corrupts the entire downstream chain. +Each operator is a separate subscription. Disposing the final subscription cascades disposal upstream through the entire chain. This composability is what makes Rx powerful, and what makes contract violations devastating: a bug in any operator corrupts the entire downstream chain. ### Hot vs Cold Observables +Not covered by the PDF. Critical to understand for DynamicData consumers because most cache pipelines are cold and incorrectly assuming hot semantics is a common source of duplicated work. + **Cold**: starts producing items when subscribed to. Each subscriber gets its own sequence. Created with `Observable.Create`, `Observable.Defer`, `Observable.Return`, etc. ```csharp @@ -38,7 +42,7 @@ var cold = Observable.FromAsync(() => httpClient.GetAsync(url)); var hot = Observable.FromEventPattern(button, nameof(button.Click)); ``` -**Converting**: `Publish()` + `Connect()` or `Publish().RefCount()` converts cold to hot (shared). +**Converting**: `Publish()` + `Connect()` or `Publish().RefCount()` converts cold to hot (shared, see [§5.10](./rx-design-guide.instructions.md#510-use-the-publish-operator-to-share-side-effects)). ```csharp var shared = coldSource.Publish().RefCount(); // auto-connect on first sub, auto-disconnect on last unsub @@ -46,52 +50,14 @@ var shared = coldSource.Publish().RefCount(); // auto-connect on first sub, auto ## The Observable Contract -### 1. Serialized Notifications (THE critical rule) - -`OnNext`, `OnError`, and `OnCompleted` calls MUST be serialized — they must never execute concurrently. This is the most commonly violated rule and causes the most insidious bugs. +The full contract lives in [§4](./rx-design-guide.instructions.md#4-the-rx-contract) of the design guide. Highlights: -```csharp -// WRONG: two sources can call OnNext concurrently -source1.Subscribe(x => observer.OnNext(Process(x))); // thread A -source2.Subscribe(x => observer.OnNext(Process(x))); // thread B — RACE! - -// RIGHT: use Synchronize to serialize -source1.Synchronize(gate).Subscribe(observer); -source2.Synchronize(gate).Subscribe(observer); +- **[§4.1](./rx-design-guide.instructions.md#41-assume-the-rx-grammar)** Grammar: `OnNext* (OnCompleted | OnError)?`. Mutually-exclusive terminals. No notifications after a terminal. +- **[§4.2](./rx-design-guide.instructions.md#42-assume-observer-instances-are-called-in-a-serialized-fashion)** Serialized notifications. The single most-violated rule in practice. Violation produces silent state corruption. +- **[§4.3](./rx-design-guide.instructions.md#43-assume-resources-are-cleaned-up-after-an-onerror-or-oncompleted-message)** Resource cleanup on terminal. +- **[§4.4](./rx-design-guide.instructions.md#44-assume-a-best-effort-to-stop-all-outstanding-work-on-unsubscribe)** Unsubscribe semantics: best-effort stop, in-flight results suppressed. -// RIGHT: use Merge (serializes internally) -source1.Merge(source2).Subscribe(observer); - -// RIGHT: use Subject (serializes OnNext calls via Synchronize) -var subject = new Subject(); -source1.Subscribe(subject); // Subject.OnNext is NOT thread-safe by default! -// Use Subject with Synchronize if multiple threads call OnNext -``` - -**Why it matters**: operators maintain mutable internal state (caches, dictionaries, counters). Concurrent `OnNext` calls corrupt this state silently — no exception, just wrong data. - -### 2. Terminal Notifications - -``` -Grammar: OnNext* (OnError | OnCompleted)? -``` - -- Zero or more `OnNext`, followed by at most one terminal notification -- `OnError` and `OnCompleted` are **mutually exclusive** — emit one or neither, never both -- After a terminal notification, **no further notifications** of any kind -- Operators receiving a terminal notification should release resources - -### 3. Subscription Lifecycle - -- `Subscribe` returns `IDisposable` — disposing it **unsubscribes** -- After disposal, no further notifications should be delivered -- Disposal must be **idempotent** (safe to call multiple times) and **thread-safe** -- Operators should stop producing when their subscription is disposed - -### 4. Error Handling - -- Exceptions thrown inside `OnNext` handlers propagate synchronously to the producing operator -- Use `SubscribeSafe` instead of `Subscribe` to route subscriber exceptions to `OnError`: +DynamicData-specific: subscribers that throw inside `OnNext` propagate exceptions to the producing operator. Use `SubscribeSafe` to route subscriber exceptions to `OnError`: ```csharp // Subscribe: exception in handler crashes the source @@ -105,7 +71,7 @@ source.SubscribeSafe(Observer.Create( ## Schedulers -Schedulers control **when** and **where** work executes. They are Rx's abstraction over threading. +Schedulers control **when** and **where** work executes; they are Rx's abstraction over threading. The design guide's [§5.4](./rx-design-guide.instructions.md#54-consider-passing-a-specific-scheduler-to-concurrency-introducing-operators), [§5.5](./rx-design-guide.instructions.md#55-call-the-observeon-operator-as-late-and-in-as-few-places-as-possible), [§6.9–6.12](./rx-design-guide.instructions.md#69-parameterize-concurrency-by-providing-a-scheduler-argument) cover the rules. This section is the modern Rx.NET reference for which scheduler to actually use. ### Common Schedulers @@ -140,7 +106,7 @@ source.SubscribeOn(TaskPoolScheduler.Default) // subscribe on background thread ### Scheduler Injection for Testability -**Always inject schedulers** instead of using defaults. This enables deterministic testing: +DynamicData convention (not in the PDF): **always inject schedulers** instead of using defaults. This enables deterministic testing via `TestScheduler`: ```csharp // WRONG: hardcoded scheduler — untestable time-dependent behavior @@ -161,7 +127,7 @@ results.Should().HaveCount(1); ## Disposable Helpers -Rx provides several `IDisposable` implementations for managing subscription lifecycles: +The design guide's [§6.13](./rx-design-guide.instructions.md#613-hand-out-all-disposable-instances-created-inside-the-operator-to-consumers) lists the disposable family and the requirement to hand all internal disposables back to consumers. This section is the practical reference with code samples for each. ### Disposable.Create @@ -178,7 +144,7 @@ var cleanup = Disposable.Create(() => ### Disposable.Empty -A no-op disposable. Useful as a default or placeholder. +A no-op disposable. Useful as a default or placeholder (required by [§6.5](./rx-design-guide.instructions.md#65-subscribe-implementations-should-not-throw) when routing an error from `Subscribe`). ```csharp public IDisposable Subscribe(IObservable source) => @@ -215,7 +181,7 @@ Observable.Create(observer => ### SerialDisposable -Holds a single disposable that can be **replaced**. Disposing the previous value when a new one is set. Useful for "switch" patterns. +Holds a single disposable that can be **replaced**. Disposing the previous value when a new one is set. Useful for "switch" patterns. (PDF calls this `MutableDisposable`.) ```csharp var serial = new SerialDisposable(); @@ -231,7 +197,7 @@ serial.Dispose(); ### SingleAssignmentDisposable -Like SerialDisposable but can only be assigned **once**. Throws on second assignment. Useful when a subscription is created asynchronously but disposal might happen before it's ready. +Like SerialDisposable but can only be assigned **once**. Throws on second assignment. Useful when a subscription is created asynchronously but disposal might happen before it's ready. (Modern addition, not in the PDF.) ```csharp var holder = new SingleAssignmentDisposable(); @@ -249,7 +215,7 @@ holder.Dispose(); ### RefCountDisposable -Tracks multiple "dependent" disposables. The underlying resource is only disposed when **all** dependents (plus the primary) are disposed. +Tracks multiple "dependent" disposables. The underlying resource is only disposed when **all** dependents (plus the primary) are disposed. (Modern addition, not in the PDF.) ```csharp var primary = new RefCountDisposable(expensiveResource); @@ -279,6 +245,8 @@ cd.Dispose(); // triggers cancellation ## Standard Rx Operators Reference +Not in the PDF. A quick catalog of modern Rx.NET operators by category. For each operator, the design guide rules in [§5](./rx-design-guide.instructions.md#5-using-rx) and [§6](./rx-design-guide.instructions.md#6-operator-implementations) tell you how to use it correctly. + ### Creation | Operator | Description | @@ -387,38 +355,40 @@ cd.Dispose(); // triggers cancellation | `SubscribeOn(scheduler)` | Subscribe (and produce) on specified scheduler | | `Delay(timeSpan)` | Delay each notification by a time span | | `Timeout(timeSpan)` | Error if no notification within timeout | -| `Synchronize()` | Serialize notifications with internal gate | -| `Synchronize(gate)` | Serialize notifications with external gate object | +| `Synchronize()` | Serialize notifications with internal gate (per [§5.8](./rx-design-guide.instructions.md#58-use-the-synchronize-operator-only-to-fix-custom-iobservable-implementations) — only for non-conforming sources) | +| `Synchronize(gate)` | Serialize notifications with external gate object (the multi-source pattern from [§6.7](./rx-design-guide.instructions.md#67-serialize-calls-to-iobserver-methods-within-observable-sequence-implementations)) | ### Utility | Operator | Description | |----------|-------------| -| `Do(action)` | Perform side effect for each notification | +| `Do(action)` | Perform side effect for each notification (see [§5.7](./rx-design-guide.instructions.md#57-make-side-effects-explicit-using-the-do-operator)) | | `Publish().RefCount()` | Share a subscription among multiple subscribers | | `Replay(bufferSize).RefCount()` | Share with replay | | `AsObservable()` | Hide the implementation type (e.g., Subject → IObservable) | | `Subscribe(observer)` | Subscribe with an IObserver | -| `Subscribe(onNext, onError, onCompleted)` | Subscribe with callbacks | +| `Subscribe(onNext, onError, onCompleted)` | Subscribe with callbacks (see [§5.2](./rx-design-guide.instructions.md#52-consider-passing-multiple-arguments-to-subscribe)) | | `SubscribeSafe(observer)` | Subscribe with exception routing to OnError | | `ForEachAsync(action)` | Async iteration (returns Task) | -| `Wait()` | Block until complete (avoid on UI thread) | +| `Wait()` | Block until complete (avoid on UI thread; see [§6.14](./rx-design-guide.instructions.md#614-operators-should-not-block)) | | `ToTask()` | Convert to Task (last value) | ## Writing Custom Operators +[§6.1](./rx-design-guide.instructions.md#61-implement-new-operators-by-composing-existing-operators) requires composition over `Observable.Create`. [§6.2](./rx-design-guide.instructions.md#62-implement-custom-operators-using-observablecreate) governs how to use `Observable.Create` when composition isn't enough. This section is the DynamicData-specific elaboration of those rules: the Defer pattern for per-subscription state without `Observable.Create`, the canonical `Observable.Create` skeleton, and a multi-source pattern. + ### Composition First — Observable.Create is a Last Resort -**The Rx contracts are axioms, not guidelines.** `Merge` subscribes sequentially. `Defer` evaluates at subscription time. `Do` fires synchronously during delivery. `Concat` subscribes to the second source only after the first completes. These guarantees are unconditional — they hold in every case, on every scheduler, under every threading model. If they didn't, nothing in Rx would work. +**The Rx contracts are axioms, not guidelines.** `Merge` subscribes sequentially. `Defer` evaluates at subscription time. `Do` fires synchronously during delivery. `Concat` subscribes to the second source only after the first completes. These guarantees are unconditional: they hold in every case, on every scheduler, under every threading model. If they didn't, nothing in Rx would work. **Trust the contracts completely.** When you compose operators, you can reason about ordering, state, and lifecycle *because* the contracts are absolute. The moment you doubt them and add "safety" wrappers, you've abandoned the very thing that makes Rx code correct by construction. -**Before reaching for `Observable.Create`, ask: can this be expressed as a composition of existing operators?** Rx operators already handle subscription lifecycle, error propagation, disposal, and serialization. Manual observer forwarding inside `Observable.Create` reimplements all of that — and introduces bugs that the operators would have prevented. +**Before reaching for `Observable.Create`, ask: can this be expressed as a composition of existing operators?** Rx operators already handle subscription lifecycle, error propagation, disposal, and serialization. Manual observer forwarding inside `Observable.Create` reimplements all of that, and introduces bugs that the operators would have prevented. -**The smell:** If you're writing `observer.OnNext(x)` / `observer.OnError(ex)` / `observer.OnCompleted()` inside `Observable.Create`, you're manually reimplementing what `Subscribe` already does. Stop and look for the composition. +**The smell:** if you're writing `observer.OnNext(x)` / `observer.OnError(ex)` / `observer.OnCompleted()` inside `Observable.Create`, you're manually reimplementing what `Subscribe` already does. Stop and look for the composition. ```csharp -// ❌ WRONG: imperative Observable.Create with manual forwarding +// WRONG: imperative Observable.Create with manual forwarding // This reimplements Subscribe, adds boilerplate, and is harder to reason about. return Observable.Create>(observer => { @@ -439,7 +409,7 @@ return Observable.Create>(observer => return sub; }); -// ✅ RIGHT: declarative composition using existing operators +// RIGHT: declarative composition using existing operators // Each operator does one thing. The intent is immediately clear. return Observable.Defer(() => { @@ -496,7 +466,7 @@ Each subscription gets its own `index` / `hasEmitted` — cold observable semant ### The Observable.Create Pattern -When you genuinely need `Observable.Create`, follow this structure: +When you genuinely need `Observable.Create`, follow this structure. Notice the [§6.4](./rx-design-guide.instructions.md#64-protect-calls-to-user-code-from-within-an-operator) try/catch around the user selector and the [§6.16](./rx-design-guide.instructions.md#616-argument-validation-should-occur-outside-observablecreate) argument validation that would be required at the public extension method boundary. ```csharp public static IObservable MyOperator( @@ -526,7 +496,7 @@ public static IObservable MyOperator( ### Multi-Source Operator Pattern -When combining multiple sources, serialize their notifications: +When combining multiple sources, serialize their notifications through a shared gate (per [§6.7](./rx-design-guide.instructions.md#67-serialize-calls-to-iobserver-methods-within-observable-sequence-implementations)): ```csharp public static IObservable MyMerge( @@ -545,23 +515,27 @@ public static IObservable MyMerge( } ``` -**Note**: `Synchronize(gate)` holds the lock during downstream `OnNext` delivery. This ensures serialization but means the lock is held for the duration of all downstream processing. Keep downstream chains lightweight when using shared gates. +**Note**: `Synchronize(gate)` holds the lock during downstream `OnNext` delivery. This ensures serialization but means the lock is held for the duration of all downstream processing. Keep downstream chains lightweight when using shared gates. DynamicData's `SynchronizeSafe` + `SharedDeliveryQueue` is the in-library alternative that releases the lock before downstream delivery (used to prevent cross-cache deadlocks). ### Operator Checklist -When writing or reviewing an Rx operator: - -- [ ] **Serialized delivery**: can `OnNext` be called concurrently? If multiple sources, are they serialized? -- [ ] **Terminal semantics**: does `OnError`/`OnCompleted` propagate correctly? No notifications after terminal? -- [ ] **Disposal**: does disposing the subscription clean up all resources? Is it idempotent? -- [ ] **Error handling**: does `SubscribeSafe` catch subscriber exceptions? Are errors propagated, not swallowed? -- [ ] **Back-pressure**: does the operator buffer unboundedly? Could it cause memory issues? -- [ ] **Scheduler**: are time-dependent operations using an injectable scheduler? -- [ ] **Cold/Hot**: is the observable cold (deferred via `Observable.Create`)? If hot, is sharing handled correctly? +When writing or reviewing an Rx operator, walk this checklist alongside the rule audits in the design guide: + +- [ ] **Serialized delivery** ([§4.2](./rx-design-guide.instructions.md#42-assume-observer-instances-are-called-in-a-serialized-fashion) / [§6.7](./rx-design-guide.instructions.md#67-serialize-calls-to-iobserver-methods-within-observable-sequence-implementations)): can `OnNext` be called concurrently? If multiple sources, are they serialized through the same gate? +- [ ] **Terminal semantics** ([§4.1](./rx-design-guide.instructions.md#41-assume-the-rx-grammar) / [§6.6](./rx-design-guide.instructions.md#66-onerror-messages-should-have-abort-semantics)): does `OnError`/`OnCompleted` propagate correctly? No notifications after terminal? No buffer flush on error? +- [ ] **Disposal** ([§4.4](./rx-design-guide.instructions.md#44-assume-a-best-effort-to-stop-all-outstanding-work-on-unsubscribe) / [§6.13](./rx-design-guide.instructions.md#613-hand-out-all-disposable-instances-created-inside-the-operator-to-consumers) / [§6.17](./rx-design-guide.instructions.md#617-unsubscription-should-be-idempotent) / [§6.18](./rx-design-guide.instructions.md#618-unsubscription-should-not-throw)): does disposing the subscription clean up all resources? Are all internal disposables exposed to the subscriber? Idempotent and non-throwing? +- [ ] **User code protection** ([§6.4](./rx-design-guide.instructions.md#64-protect-calls-to-user-code-from-within-an-operator)): are user-provided selectors / predicates / comparers wrapped in try/catch with errors routed to `OnError`? +- [ ] **Subscribe doesn't throw** ([§6.5](./rx-design-guide.instructions.md#65-subscribe-implementations-should-not-throw)): error conditions detected in subscribe go through `observer.OnError(...)` + `return Disposable.Empty;`? +- [ ] **Argument validation** ([§6.16](./rx-design-guide.instructions.md#616-argument-validation-should-occur-outside-observablecreate)): null checks happen before `Observable.Create`, not inside the subscribe lambda? +- [ ] **Back-pressure / buffers** ([§5.6](./rx-design-guide.instructions.md#56-consider-limiting-buffers)): does the operator buffer unboundedly? Could it cause memory issues? +- [ ] **Scheduler** ([§5.4](./rx-design-guide.instructions.md#54-consider-passing-a-specific-scheduler-to-concurrency-introducing-operators) / [§6.9–6.12](./rx-design-guide.instructions.md#69-parameterize-concurrency-by-providing-a-scheduler-argument)): time-dependent operations take a scheduler argument? Default uses `Scheduler.Immediate` where possible? +- [ ] **Cold/Hot**: is the observable cold (deferred via `Observable.Create` / `Observable.Defer`)? If hot, is sharing handled correctly via `Publish().RefCount()` (per [§5.10](./rx-design-guide.instructions.md#510-use-the-publish-operator-to-share-side-effects))? - [ ] **Thread safety**: is mutable state protected? Are there race conditions between subscribe/dispose/OnNext? ## Common Pitfalls +Practical pitfalls that don't have direct PDF analogues but appear repeatedly in DynamicData code review. + ### 1. Subscribing Multiple Times to a Cold Observable ```csharp @@ -570,7 +544,7 @@ var data = Observable.FromAsync(() => httpClient.GetAsync(url)); data.Subscribe(handler1); // call 1 data.Subscribe(handler2); // call 2 — probably not intended -// RIGHT: share the result +// RIGHT: share the result (per §5.10) var shared = data.Publish().RefCount(); shared.Subscribe(handler1); // shares shared.Subscribe(handler2); // same result @@ -590,7 +564,7 @@ _cleanup.Add(source.Subscribe(x => UpdateUI(x))); ### 3. Blocking on Rx (sync-over-async) ```csharp -// WRONG: blocks the thread, can hang on UI thread +// WRONG: blocks the thread, can hang on UI thread (see §6.14) var result = source.FirstAsync().Wait(); // RIGHT: use async/await @@ -611,7 +585,7 @@ public IObservable Values => _values.AsObservable(); ### 5. Not Handling OnError ```csharp -// WRONG: unhandled OnError crashes the app (routes to DefaultExceptionHandler) +// WRONG: unhandled OnError crashes the app (routes to DefaultExceptionHandler — see §5.2) source.Subscribe(x => Process(x)); // RIGHT: always handle errors diff --git a/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs b/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs index 4f4ddf9a4..e986b6858 100644 --- a/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs +++ b/src/DynamicData.Tests/Cache/DeadlockTortureTest.cs @@ -30,10 +30,11 @@ public sealed class DeadlockTortureTest { private const int ItemCount = 200; private const int Iterations = 50; - private const int TimeoutSeconds = 15; + private const int TimeoutSeconds = 60; private static async Task RunBidirectionalDeadlockTest( Func>, IObservable>> pipeline, + Action? subjectPusher = null, int iterations = Iterations) { for (var iter = 0; iter < iterations; iter++) @@ -44,11 +45,13 @@ private static async Task RunBidirectionalDeadlockTest( using var aToB = pipeline(sourceA.Connect().Filter(x => x.Name.StartsWith("A"))).PopulateInto(sourceB); using var bToA = pipeline(sourceB.Connect().Filter(x => x.Name.StartsWith("B"))).PopulateInto(sourceA); - using var barrier = new Barrier(2); + var participants = subjectPusher is null ? 2 : 3; + using var barrier = new Barrier(participants); var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.AddOrUpdate(new Person("A-" + iter + "-" + i, i)); }); var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.AddOrUpdate(new Person("B-" + iter + "-" + i, i)); }); + var taskC = subjectPusher is null ? null : Task.Run(() => { barrier.SignalAndWait(); subjectPusher(); }); - var completed = Task.WhenAll(taskA, taskB); + var completed = taskC is null ? Task.WhenAll(taskA, taskB) : Task.WhenAll(taskA, taskB, taskC); if (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds))) != completed) return false; } @@ -64,26 +67,116 @@ [Fact] public async Task AutoRefresh_DoesNotDeadlock() => [Fact] public async Task GroupOn_DoesNotDeadlock() => (await RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()))).Should().BeTrue(); + [Fact] public async Task GroupWithImmutableState_DoesNotDeadlock() => + (await RunBidirectionalDeadlockTest(s => s.GroupWithImmutableState(p => p.Age % 3).TransformMany(g => g.Items, p => p.UniqueKey))).Should().BeTrue(); + + [Fact] public async Task GroupOnWithRegrouper_DoesNotDeadlock() + { + using var regrouper = new System.Reactive.Subjects.Subject(); + (await RunBidirectionalDeadlockTest( + s => s.Group(p => p.Age % 3, regrouper).MergeMany(g => g.Cache.Connect()), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) regrouper.OnNext(System.Reactive.Unit.Default); })).Should().BeTrue(); + } + + [Fact] public async Task GroupOnDynamicSelector_DoesNotDeadlock() + { + using var selector = new BehaviorSubject>((p, _) => p.Age % 3); + using var regrouper = new System.Reactive.Subjects.Subject(); + (await RunBidirectionalDeadlockTest( + s => s.Group(selector, regrouper).MergeMany(g => g.Cache.Connect()), + subjectPusher: () => + { + for (var j = 0; j < ItemCount; j++) + { + selector.OnNext((p, _) => p.Age % (2 + (j % 4))); + regrouper.OnNext(System.Reactive.Unit.Default); + } + })).Should().BeTrue(); + } + + [Fact] public async Task TransformAsyncWithForce_DoesNotDeadlock() + { + using var force = new System.Reactive.Subjects.Subject>(); + (await RunBidirectionalDeadlockTest( + s => s.TransformAsync(p => Task.FromResult(new Person("T-" + p.Name, p.Age)), force), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) force.OnNext(static (_, _) => true); })).Should().BeTrue(); + } + [Fact] public async Task Page_DoesNotDeadlock() { using var req = new BehaviorSubject(new PageRequest(1, 50)); - (await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(req))).Should().BeTrue(); + (await RunBidirectionalDeadlockTest( + s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(req), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); })).Should().BeTrue(); + } + + [Fact] public async Task SortAndPage_DoesNotDeadlock() + { + using var req = new BehaviorSubject(new PageRequest(1, 50)); + (await RunBidirectionalDeadlockTest( + s => s.SortAndPage(SortExpressionComparer.Ascending(p => p.Age), req), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); })).Should().BeTrue(); } [Fact] public async Task Virtualise_DoesNotDeadlock() { using var req = new BehaviorSubject(new VirtualRequest(0, 50)); - (await RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(req))).Should().BeTrue(); + (await RunBidirectionalDeadlockTest( + s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(req), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); })).Should().BeTrue(); + } + + [Fact] public async Task SortAndVirtualize_DoesNotDeadlock() + { + using var req = new BehaviorSubject(new VirtualRequest(0, 50)); + (await RunBidirectionalDeadlockTest( + s => s.SortAndVirtualize(SortExpressionComparer.Ascending(p => p.Age), req), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) req.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); })).Should().BeTrue(); + } + + [Fact] public async Task QueryWhenChanged_DoesNotDeadlock() + { + for (var iter = 0; iter < Iterations; iter++) + { + using var sourceA = new SourceCache(p => p.UniqueKey); + using var sourceB = new SourceCache(p => p.UniqueKey); + + // QueryWhenChanged with an itemChangedTrigger exercises the Merge branch. + // A side-channel write into the other cache closes the same ABBA cycle that + // PopulateInto would close for changeset-shaped operators. + using var aToB = sourceA.Connect() + .Filter(p => p.Name.StartsWith("A")) + .QueryWhenChanged(p => p.WhenPropertyChanged(x => x.Age)) + .Subscribe(_ => sourceB.AddOrUpdate(new Person("A-marker", 0))); + using var bToA = sourceB.Connect() + .Filter(p => p.Name.StartsWith("B")) + .QueryWhenChanged(p => p.WhenPropertyChanged(x => x.Age)) + .Subscribe(_ => sourceA.AddOrUpdate(new Person("B-marker", 0))); + + using var barrier = new Barrier(2); + var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.AddOrUpdate(new Person("A-" + iter + "-" + i, i)); }); + var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.AddOrUpdate(new Person("B-" + iter + "-" + i, i)); }); + + var completed = Task.WhenAll(taskA, taskB); + (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds)))).Should().BeSameAs(completed, "iteration " + iter); + } } [Fact] public async Task TransformWithForce_DoesNotDeadlock() { using var force = new Subject>(); - (await RunBidirectionalDeadlockTest(s => s.Transform((p, k) => new Person("T-" + p.Name, p.Age), force))).Should().BeTrue(); + (await RunBidirectionalDeadlockTest( + s => s.Transform((p, k) => new Person("T-" + p.Name, p.Age), force), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) force.OnNext(static (p, _) => true); })).Should().BeTrue(); } - [Fact] public async Task BatchIf_DoesNotDeadlock() => - (await RunBidirectionalDeadlockTest(s => s.BatchIf(new BehaviorSubject(false), false, (TimeSpan?)null))).Should().BeTrue(); + [Fact] public async Task BatchIf_DoesNotDeadlock() + { + using var pause = new BehaviorSubject(false); + (await RunBidirectionalDeadlockTest( + s => s.BatchIf(pause, false, (TimeSpan?)null), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pause.OnNext(j % 2 == 0); })).Should().BeTrue(); + } [Fact] public async Task DisposeMany_DoesNotDeadlock() => (await RunBidirectionalDeadlockTest(s => s.DisposeMany())).Should().BeTrue(); @@ -94,31 +187,65 @@ [Fact] public async Task OnItemRemoved_DoesNotDeadlock() => [Fact] public async Task AllDangerous_Stacked_DoNotDeadlock() { using var pageReq = new BehaviorSubject(new PageRequest(1, 100)); + using var virtReq = new BehaviorSubject(new VirtualRequest(0, 100)); using var force = new Subject>(); (await RunBidirectionalDeadlockTest( - s => s.AutoRefresh(p => p.Age) + s => s.GroupWithImmutableState(p => p.Age % 3) + .TransformMany(g => g.Items, p => p.UniqueKey) + .AutoRefresh(p => p.Age) .Filter(p => p.Age >= 0) .Transform((p, k) => new Person("X-" + p.Name, p.Age), force) .OnItemRemoved(_ => { }) .DisposeMany() .Sort(SortExpressionComparer.Ascending(p => p.Age)) + .Virtualise(virtReq) .Page(pageReq), + subjectPusher: () => + { + for (var j = 0; j < ItemCount; j++) + { + force.OnNext(static (p, _) => true); + pageReq.OnNext(new PageRequest(1 + (j % 4), 50 + (j % 4) * 50)); + virtReq.OnNext(new VirtualRequest(j * 5, 50 + (j % 4) * 50)); + } + }, iterations: Iterations * 2)).Should().BeTrue(); } [Fact] public async Task MultiplePairs_Simultaneous_NoDeadlock() { using var pageReq = new BehaviorSubject(new PageRequest(1, 50)); + using var pageReq2 = new BehaviorSubject(new PageRequest(1, 50)); using var virtReq = new BehaviorSubject(new VirtualRequest(0, 50)); + using var virtReq2 = new BehaviorSubject(new VirtualRequest(0, 50)); + using var pause = new BehaviorSubject(false); var results = await Task.WhenAll( - RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)), 30), - RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age), 30), - RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()), 30), - RunBidirectionalDeadlockTest(s => s.OnItemRemoved(_ => { }), 30), - RunBidirectionalDeadlockTest(s => s.DisposeMany(), 30), - RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(pageReq), 30), - RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(virtReq), 30), - RunBidirectionalDeadlockTest(s => s.BatchIf(new BehaviorSubject(false), false, (TimeSpan?)null), 30)); + RunBidirectionalDeadlockTest(s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)), iterations: 30), + RunBidirectionalDeadlockTest(s => s.AutoRefresh(p => p.Age), iterations: 30), + RunBidirectionalDeadlockTest(s => s.Group(p => p.Age % 3).MergeMany(g => g.Cache.Connect()), iterations: 30), + RunBidirectionalDeadlockTest(s => s.GroupWithImmutableState(p => p.Age % 3).TransformMany(g => g.Items, p => p.UniqueKey), iterations: 30), + RunBidirectionalDeadlockTest(s => s.OnItemRemoved(_ => { }), iterations: 30), + RunBidirectionalDeadlockTest(s => s.DisposeMany(), iterations: 30), + RunBidirectionalDeadlockTest( + s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Page(pageReq), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pageReq.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); }, + iterations: 30), + RunBidirectionalDeadlockTest( + s => s.SortAndPage(SortExpressionComparer.Ascending(p => p.Age), pageReq2), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pageReq2.OnNext(new PageRequest(1 + (j % 4), 25 + (j % 4) * 25)); }, + iterations: 30), + RunBidirectionalDeadlockTest( + s => s.Sort(SortExpressionComparer.Ascending(p => p.Age)).Virtualise(virtReq), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) virtReq.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); }, + iterations: 30), + RunBidirectionalDeadlockTest( + s => s.SortAndVirtualize(SortExpressionComparer.Ascending(p => p.Age), virtReq2), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) virtReq2.OnNext(new VirtualRequest(j * 5, 25 + (j % 4) * 25)); }, + iterations: 30), + RunBidirectionalDeadlockTest( + s => s.BatchIf(pause, false, (TimeSpan?)null), + subjectPusher: () => { for (var j = 0; j < ItemCount; j++) pause.OnNext(j % 2 == 0); }, + iterations: 30)); results.Should().AllSatisfy(r => r.Should().BeTrue()); } @@ -145,4 +272,36 @@ [Fact] public async Task ThreeWayCircular_DoesNotDeadlock() (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds)))).Should().BeSameAs(completed, "iteration " + iter); } } + + [Fact] public async Task TransformToTree_DoesNotDeadlock() + { + // Cross-cache cycle is closed via a side-channel Subscribe that writes a marker + // into the other cache for every tree changeset. + for (var iter = 0; iter < Iterations; iter++) + { + using var sourceA = new SourceCache(p => p.UniqueKey); + using var sourceB = new SourceCache(p => p.UniqueKey); + + // The pivotOn function returns the parent's key (or the item's own key for roots). Half the + // items become children of "A-{iter}-0" / "B-{iter}-0", populating the inner tree structure. + using var aToB = sourceA.Connect() + .TransformToTree(p => p.Age == 0 ? p.UniqueKey : "A-" + iter + "-0") + .Subscribe(_ => sourceB.AddOrUpdate(new Person("from-a-tree-" + iter, 0))); + using var bToA = sourceB.Connect() + .TransformToTree(p => p.Age == 0 ? p.UniqueKey : "B-" + iter + "-0") + .Subscribe(_ => sourceA.AddOrUpdate(new Person("from-b-tree-" + iter, 0))); + + using var barrier = new Barrier(2); + var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceA.AddOrUpdate(new Person("A-" + iter + "-" + i, i)); }); + var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < ItemCount; i++) sourceB.AddOrUpdate(new Person("B-" + iter + "-" + i, i)); }); + + var completed = Task.WhenAll(taskA, taskB); + (await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(TimeoutSeconds)))).Should().BeSameAs(completed, "iteration " + iter); + } + } + + [Fact] public async Task Switch_DoesNotDeadlock() => + // Observable.Return(s).Switch() drives exactly one outer notification, which is enough + // to wire up the destination cache and exercise its inner-change delivery path. + (await RunBidirectionalDeadlockTest(s => System.Reactive.Linq.Observable.Return(s).Switch())).Should().BeTrue(); } diff --git a/src/DynamicData.Tests/Internal/DeliveryQueueMergeFixture.cs b/src/DynamicData.Tests/Internal/DeliveryQueueMergeFixture.cs new file mode 100644 index 000000000..fe67508a6 --- /dev/null +++ b/src/DynamicData.Tests/Internal/DeliveryQueueMergeFixture.cs @@ -0,0 +1,232 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading; +using System.Threading.Tasks; + +using DynamicData.Internal; + +using FluentAssertions; + +using Xunit; + +namespace DynamicData.Tests.Internal; + +/// +/// Focused behavioural tests for . +/// Verifies the Rx Merge-compatible terminal semantics and the queue's serialization guarantee +/// for concurrent producers. +/// +public sealed class DeliveryQueueMergeFixture +{ + [Fact] + public void OnNext_FromAllSources_IsForwardedInArrivalOrder() + { + using var a = new Subject(); + using var b = new Subject(); + using var c = new Subject(); + + var received = new List(); + using var sub = a.DeliveryQueueMerge(b, c).Subscribe(received.Add); + + a.OnNext(1); + b.OnNext(2); + c.OnNext(3); + a.OnNext(4); + + received.Should().Equal(1, 2, 3, 4); + } + + [Fact] + public void OnCompleted_FiresOnlyAfterEverySourceCompletes() + { + using var a = new Subject(); + using var b = new Subject(); + using var c = new Subject(); + + var completed = false; + using var sub = a.DeliveryQueueMerge(b, c).Subscribe(_ => { }, () => completed = true); + + a.OnCompleted(); + completed.Should().BeFalse(); + + b.OnCompleted(); + completed.Should().BeFalse(); + + c.OnCompleted(); + completed.Should().BeTrue(); + } + + [Fact] + public void OnError_FromAnySource_TerminatesImmediately() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.DeliveryQueueMerge(b).Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException(); + a.OnError(error); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse(); + } + + [Fact] + public void OnError_AfterFirstError_IsDroppedByQueue() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + using var sub = a.DeliveryQueueMerge(b).Subscribe(_ => { }, e => captured = e, () => { }); + + var first = new InvalidOperationException("first"); + var second = new InvalidOperationException("second"); + a.OnError(first); + b.OnError(second); + + captured.Should().BeSameAs(first); + } + + [Fact] + public void OnCompleted_AfterError_IsDroppedByQueue() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.DeliveryQueueMerge(b).Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException(); + a.OnError(error); + b.OnCompleted(); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse(); + } + + [Fact] + public void SynchronousTerminal_AtSubscribe_IsCountedTowardCompletion() + { + var immediate = Observable.Empty(); + using var live = new Subject(); + + var completed = false; + using var sub = immediate.DeliveryQueueMerge(live).Subscribe(_ => { }, () => completed = true); + + completed.Should().BeFalse(); + live.OnCompleted(); + completed.Should().BeTrue(); + } + + [Fact] + public void SynchronousError_AtSubscribe_PropagatesImmediately() + { + var error = new InvalidOperationException(); + var immediate = Observable.Throw(error); + using var live = new Subject(); + + Exception? captured = null; + using var sub = immediate.DeliveryQueueMerge(live).Subscribe(_ => { }, e => captured = e); + + captured.Should().BeSameAs(error); + } + + [Fact] + public async Task ConcurrentOnNext_FromManyProducers_IsSerializedToObserver() + { + // The queue's contract is that the downstream observer never sees concurrent OnNext calls, + // regardless of how many producers are racing on the inputs. Subscribe to two sources via + // two concurrent tasks, push interleaved items, and verify that no two OnNext calls overlap + // and every item is delivered exactly once. + const int itemsPerProducer = 1_000; + + using var a = new Subject(); + using var b = new Subject(); + + var inFlight = 0; + var maxInFlight = 0; + var received = new ConcurrentQueue(); + + using var sub = a.DeliveryQueueMerge(b).Subscribe(v => + { + var now = Interlocked.Increment(ref inFlight); + var prev = Volatile.Read(ref maxInFlight); + while (now > prev && Interlocked.CompareExchange(ref maxInFlight, now, prev) != prev) + { + prev = Volatile.Read(ref maxInFlight); + } + received.Enqueue(v); + Interlocked.Decrement(ref inFlight); + }); + + using var barrier = new Barrier(2); + var taskA = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < itemsPerProducer; i++) a.OnNext(i); }); + var taskB = Task.Run(() => { barrier.SignalAndWait(); for (var i = 0; i < itemsPerProducer; i++) b.OnNext(itemsPerProducer + i); }); + + await Task.WhenAll(taskA, taskB); + + received.Count.Should().Be(itemsPerProducer * 2); + maxInFlight.Should().Be(1, "concurrent OnNext to the observer must be serialized by the queue"); + + var expected = Enumerable.Range(0, itemsPerProducer * 2).ToHashSet(); + received.Should().BeEquivalentTo(expected); + } + + [Fact] + public void Subscription_OccursInArgumentOrder() + { + var subscribed = new List(); + var first = Observable.Create(o => { subscribed.Add(0); return () => { }; }); + var second = Observable.Create(o => { subscribed.Add(1); return () => { }; }); + var third = Observable.Create(o => { subscribed.Add(2); return () => { }; }); + + using var sub = first.DeliveryQueueMerge(second, third).Subscribe(_ => { }); + + subscribed.Should().Equal(0, 1, 2); + } + + [Fact] + public void Dispose_StopsForwardingFromAnySource() + { + using var a = new Subject(); + using var b = new Subject(); + + var received = new List(); + var sub = a.DeliveryQueueMerge(b).Subscribe(received.Add); + + a.OnNext(1); + sub.Dispose(); + a.OnNext(2); + b.OnNext(3); + + received.Should().Equal(1); + } + + [Fact] + public void NoOthers_FallsBackToFirstAlone() + { + using var a = new Subject(); + var received = new List(); + var completed = false; + using var sub = a.DeliveryQueueMerge().Subscribe(received.Add, () => completed = true); + + a.OnNext(7); + a.OnNext(11); + a.OnCompleted(); + + received.Should().Equal(7, 11); + completed.Should().BeTrue(); + } +} \ No newline at end of file diff --git a/src/DynamicData.Tests/Internal/UnsynchronizedCombineLatestFixture.cs b/src/DynamicData.Tests/Internal/UnsynchronizedCombineLatestFixture.cs new file mode 100644 index 000000000..256937218 --- /dev/null +++ b/src/DynamicData.Tests/Internal/UnsynchronizedCombineLatestFixture.cs @@ -0,0 +1,217 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.Reactive.Subjects; + +using DynamicData.Internal; + +using FluentAssertions; + +using Xunit; + +namespace DynamicData.Tests.Internal; + +/// +/// Focused behavioural tests for . +/// Covers the contract the helper has to honour as a drop-in +/// replacement: emits only after both sources have produced at least one value, then on every subsequent OnNext from either side; first error terminates; +/// completes only after both sources complete. +/// +public sealed class UnsynchronizedCombineLatestFixture +{ + [Fact] + public void OnNext_DoesNotEmit_UntilBothSourcesHaveProduced() + { + using var a = new Subject(); + using var b = new Subject(); + + var received = new List(); + using var sub = a.UnsynchronizedCombineLatest(b, (x, y) => $"{x}:{y}").Subscribe(received.Add); + + a.OnNext(1); + received.Should().BeEmpty("only the first source has produced"); + + a.OnNext(2); + received.Should().BeEmpty("the second source still has not produced"); + + b.OnNext("first"); + received.Should().Equal("2:first"); + } + + [Fact] + public void OnNext_AfterBothHaveProduced_EmitsOnEverySubsequentValue() + { + using var a = new Subject(); + using var b = new Subject(); + + var received = new List(); + using var sub = a.UnsynchronizedCombineLatest(b, (x, y) => $"{x}:{y}").Subscribe(received.Add); + + a.OnNext(1); + b.OnNext("x"); + a.OnNext(2); + b.OnNext("y"); + b.OnNext("z"); + a.OnNext(3); + + received.Should().Equal("1:x", "2:x", "2:y", "2:z", "3:z"); + } + + [Fact] + public void OnCompleted_FiresOnlyAfterBothSourcesComplete() + { + using var a = new Subject(); + using var b = new Subject(); + + var completed = false; + using var sub = a.UnsynchronizedCombineLatest(b, (x, y) => $"{x}:{y}").Subscribe(_ => { }, () => completed = true); + + a.OnCompleted(); + completed.Should().BeFalse("the second source is still live"); + + b.OnCompleted(); + completed.Should().BeTrue(); + } + + [Fact] + public void OnError_FromAnySource_TerminatesImmediately() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.UnsynchronizedCombineLatest(b, (x, y) => $"{x}:{y}").Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException("first"); + b.OnError(error); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse("OnCompleted must not fire after OnError"); + } + + [Fact] + public void OnError_AfterFirstError_IsIgnored() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + using var sub = a.UnsynchronizedCombineLatest(b, (x, y) => $"{x}:{y}").Subscribe(_ => { }, e => captured = e, () => { }); + + var first = new InvalidOperationException("first"); + var second = new InvalidOperationException("second"); + a.OnError(first); + b.OnError(second); + + captured.Should().BeSameAs(first, "first error wins; subsequent errors from other sources must be dropped"); + } + + [Fact] + public void OnNext_AfterError_IsIgnored() + { + using var a = new Subject(); + using var b = new Subject(); + + var received = new List(); + Exception? captured = null; + using var sub = a.UnsynchronizedCombineLatest(b, (x, y) => $"{x}:{y}").Subscribe(received.Add, e => captured = e); + + a.OnNext(1); + b.OnNext("x"); + received.Should().Equal("1:x"); + + var error = new InvalidOperationException(); + a.OnError(error); + + a.OnNext(2); + b.OnNext("y"); + received.Should().Equal(new[] { "1:x" }, "no further OnNext must arrive after OnError has fired"); + captured.Should().BeSameAs(error); + } + + [Fact] + public void OnCompleted_AfterError_IsIgnored() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.UnsynchronizedCombineLatest(b, (x, y) => $"{x}:{y}").Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException(); + a.OnError(error); + b.OnCompleted(); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse("a late OnCompleted from a surviving source must not arrive after OnError"); + } + + [Fact] + public void ResultSelector_ReceivesMostRecentValueFromEachSource() + { + using var a = new Subject(); + using var b = new Subject(); + + var received = new List(); + using var sub = a.UnsynchronizedCombineLatest(b, (x, y) => x * 10 + y).Subscribe(received.Add); + + a.OnNext(1); + b.OnNext(2); + received.Should().Equal(12); + + a.OnNext(3); + received.Should().Equal(new[] { 12, 32 }, "the second source's most recent value (2) must still be in effect"); + + b.OnNext(4); + received.Should().Equal(12, 32, 34); + } + + [Fact] + public void SynchronousValues_AtSubscribeTime_AreCombinedCorrectly() + { + // Behaviour subjects deliver their initial value synchronously at Subscribe time. + // The helper must capture the first source's value before subscribing to the second, + // and immediately emit when the second source's initial value arrives. + using var a = new System.Reactive.Subjects.BehaviorSubject(7); + using var b = new System.Reactive.Subjects.BehaviorSubject(11); + + var received = new List(); + using var sub = a.UnsynchronizedCombineLatest(b, (x, y) => x + y).Subscribe(received.Add); + + received.Should().Equal(new[] { 18 }, "both subjects delivered synchronously at subscribe time"); + } + + [Fact] + public void SynchronousCompletion_BeforeOther_StillCompletesOnlyAfterBoth() + { + var immediate = System.Reactive.Linq.Observable.Empty(); + using var live = new Subject(); + + var completed = false; + using var sub = immediate.UnsynchronizedCombineLatest(live, (x, y) => x + y).Subscribe(_ => { }, () => completed = true); + + completed.Should().BeFalse("the live source has not completed yet"); + + live.OnCompleted(); + + completed.Should().BeTrue(); + } + + [Fact] + public void SynchronousError_BeforeOther_TerminatesImmediately() + { + var error = new InvalidOperationException(); + var immediate = System.Reactive.Linq.Observable.Throw(error); + using var live = new Subject(); + + Exception? captured = null; + using var sub = immediate.UnsynchronizedCombineLatest(live, (x, y) => x + y).Subscribe(_ => { }, e => captured = e); + + captured.Should().BeSameAs(error); + } +} diff --git a/src/DynamicData.Tests/Internal/UnsynchronizedMergeFixture.cs b/src/DynamicData.Tests/Internal/UnsynchronizedMergeFixture.cs new file mode 100644 index 000000000..85b95b8ea --- /dev/null +++ b/src/DynamicData.Tests/Internal/UnsynchronizedMergeFixture.cs @@ -0,0 +1,175 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.Reactive.Subjects; + +using DynamicData.Internal; + +using FluentAssertions; + +using Xunit; + +namespace DynamicData.Tests.Internal; + +/// +/// Focused behavioural tests for . +/// Covers the contract the helper has to honour as a drop-in +/// replacement: subscription order, all-must-complete OnCompleted, first-error-wins OnError, and synchronous terminal +/// notifications. +/// +public sealed class UnsynchronizedMergeFixture +{ + [Fact] + public void OnNext_FromBothSources_IsForwardedInArrivalOrder() + { + using var a = new Subject(); + using var b = new Subject(); + + var received = new List(); + using var sub = a.UnsynchronizedMerge(b).Subscribe(received.Add); + + a.OnNext(1); + b.OnNext(2); + a.OnNext(3); + b.OnNext(4); + + received.Should().Equal(1, 2, 3, 4); + } + + [Fact] + public void OnCompleted_FiresOnlyAfterAllSourcesComplete() + { + using var a = new Subject(); + using var b = new Subject(); + using var c = new Subject(); + + var completed = false; + using var sub = a.UnsynchronizedMerge(b, c).Subscribe(_ => { }, () => completed = true); + + a.OnCompleted(); + completed.Should().BeFalse("a single source completion must not terminate the merged stream"); + + b.OnCompleted(); + completed.Should().BeFalse("two of three completions still leave one source live"); + + c.OnCompleted(); + completed.Should().BeTrue("after every source has completed the merged stream must emit OnCompleted"); + } + + [Fact] + public void OnError_FromAnySource_TerminatesImmediately() + { + using var a = new Subject(); + using var b = new Subject(); + using var c = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.UnsynchronizedMerge(b, c).Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException("first"); + b.OnError(error); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse("OnCompleted must not fire after OnError"); + } + + [Fact] + public void OnError_AfterFirstError_IsIgnored() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + using var sub = a.UnsynchronizedMerge(b).Subscribe(_ => { }, e => captured = e, () => { }); + + var first = new InvalidOperationException("first"); + var second = new InvalidOperationException("second"); + a.OnError(first); + b.OnError(second); + + captured.Should().BeSameAs(first, "first error wins; subsequent errors from other sources must be dropped"); + } + + [Fact] + public void OnCompleted_AfterError_IsIgnored() + { + using var a = new Subject(); + using var b = new Subject(); + + Exception? captured = null; + var completed = false; + using var sub = a.UnsynchronizedMerge(b).Subscribe(_ => { }, e => captured = e, () => completed = true); + + var error = new InvalidOperationException(); + a.OnError(error); + b.OnCompleted(); + + captured.Should().BeSameAs(error); + completed.Should().BeFalse("a late OnCompleted from a surviving source must not arrive after OnError has fired"); + } + + [Fact] + public void Subscription_OccursInArgumentOrder() + { + var subscribed = new List(); + var first = System.Reactive.Linq.Observable.Create(o => { subscribed.Add(0); return () => { }; }); + var second = System.Reactive.Linq.Observable.Create(o => { subscribed.Add(1); return () => { }; }); + var third = System.Reactive.Linq.Observable.Create(o => { subscribed.Add(2); return () => { }; }); + + using var sub = first.UnsynchronizedMerge(second, third).Subscribe(_ => { }); + + subscribed.Should().Equal(0, 1, 2); + } + + [Fact] + public void SynchronousTerminal_BeforeOtherSourcesSubscribe_IsHandled() + { + // A source that completes synchronously at subscribe time decrements the pending counter immediately. + // If the helper miscounted, the merged stream would either complete prematurely or never complete. + var immediate = System.Reactive.Linq.Observable.Empty(); + using var live = new Subject(); + + var completed = false; + using var sub = immediate.UnsynchronizedMerge(live).Subscribe(_ => { }, () => completed = true); + + completed.Should().BeFalse("the live source has not completed yet"); + + live.OnCompleted(); + + completed.Should().BeTrue(); + } + + [Fact] + public void SynchronousError_BeforeOtherSourcesSubscribe_TerminatesImmediately() + { + var error = new InvalidOperationException(); + var immediate = System.Reactive.Linq.Observable.Throw(error); + using var live = new Subject(); + + Exception? captured = null; + using var sub = immediate.UnsynchronizedMerge(live).Subscribe(_ => { }, e => captured = e); + + captured.Should().BeSameAs(error); + } + + [Fact] + public void NoOthers_FallsBackToFirstAlone() + { + // Boundary: zero entries in the params array. Behaviour must mirror Observable.Merge over a single source. + using var a = new Subject(); + var received = new List(); + var completed = false; + using var sub = a.UnsynchronizedMerge().Subscribe(received.Add, () => completed = true); + + a.OnNext(7); + a.OnNext(11); + a.OnCompleted(); + + received.Should().Equal(7, 11); + completed.Should().BeTrue(); + } +} \ No newline at end of file diff --git a/src/DynamicData/Cache/Internal/AutoRefresh.cs b/src/DynamicData/Cache/Internal/AutoRefresh.cs index ee81fe581..f1f17d677 100644 --- a/src/DynamicData/Cache/Internal/AutoRefresh.cs +++ b/src/DynamicData/Cache/Internal/AutoRefresh.cs @@ -32,9 +32,8 @@ public IObservable> Run() => Observable.Create list.Count > 0).Select(items => new ChangeSet(items)); // publish refreshes and underlying changes - var queue = new SharedDeliveryQueue(); - var publisher = shared.SynchronizeSafe(queue).Merge(refreshChanges.SynchronizeSafe(queue)).SubscribeSafe(observer); + var publisher = shared.DeliveryQueueMerge(refreshChanges).SubscribeSafe(observer); - return new CompositeDisposable(publisher, shared.Connect(), queue); + return new CompositeDisposable(publisher, shared.Connect()); }); } diff --git a/src/DynamicData/Cache/Internal/EditDiffChangeSetOptional.cs b/src/DynamicData/Cache/Internal/EditDiffChangeSetOptional.cs index f65df1b96..17afafebd 100644 --- a/src/DynamicData/Cache/Internal/EditDiffChangeSetOptional.cs +++ b/src/DynamicData/Cache/Internal/EditDiffChangeSetOptional.cs @@ -17,35 +17,35 @@ internal sealed class EditDiffChangeSetOptional(IObservable _keySelector = keySelector ?? throw new ArgumentNullException(nameof(keySelector)); public IObservable> Run() => Observable.Create>(observer => - { - var previous = Optional.None(); + { + var previous = Optional.None(); - return _source.Synchronize().Subscribe( - nextValue => - { - var current = nextValue.Convert(val => new ValueContainer(val, _keySelector(val))); + return _source.Subscribe( + nextValue => + { + var current = nextValue.Convert(val => new ValueContainer(val, _keySelector(val))); - // Determine the changes - var changes = (previous.HasValue, current.HasValue) switch - { - (true, true) => CreateUpdateChanges(previous.Value, current.Value), - (false, true) => [new Change(ChangeReason.Add, current.Value.Key, current.Value.Object)], - (true, false) => [new Change(ChangeReason.Remove, previous.Value.Key, previous.Value.Object)], - (false, false) => [], - }; + // Determine the changes + var changes = (previous.HasValue, current.HasValue) switch + { + (true, true) => CreateUpdateChanges(previous.Value, current.Value), + (false, true) => [new Change(ChangeReason.Add, current.Value.Key, current.Value.Object)], + (true, false) => [new Change(ChangeReason.Remove, previous.Value.Key, previous.Value.Object)], + (false, false) => [], + }; - // Save the value for the next round - previous = current; + // Save the value for the next round + previous = current; - // If there are changes, emit as a ChangeSet - if (changes.Length > 0) - { - observer.OnNext(new ChangeSet(changes)); - } - }, - observer.OnError, - observer.OnCompleted); - }); + // If there are changes, emit as a ChangeSet + if (changes.Length > 0) + { + observer.OnNext(new ChangeSet(changes)); + } + }, + observer.OnError, + observer.OnCompleted); + }); private Change[] CreateUpdateChanges(in ValueContainer prev, in ValueContainer curr) { diff --git a/src/DynamicData/Cache/Internal/GroupOn.cs b/src/DynamicData/Cache/Internal/GroupOn.cs index fd0936fe7..9cf8ca0e2 100644 --- a/src/DynamicData/Cache/Internal/GroupOn.cs +++ b/src/DynamicData/Cache/Internal/GroupOn.cs @@ -6,6 +6,8 @@ using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.Cache.Internal; internal sealed class GroupOn(IObservable> source, Func groupSelectorKey, IObservable? regrouper) @@ -25,11 +27,11 @@ public IObservable> Run() => Observabl var queue = new SharedDeliveryQueue(); var grouper = new Grouper(_groupSelectorKey); - var groups = _source.SynchronizeSafe(queue).Finally(observer.OnCompleted).Select(grouper.Update).Where(changes => changes.Count != 0); + var groups = _source.SynchronizeSafe(queue).Finally(observer.OnCompleted).Select(grouper.Update); - var regroup = _regrouper.SynchronizeSafe(queue).Select(_ => grouper.Regroup()).Where(changes => changes.Count != 0); + var regroup = _regrouper.SynchronizeSafe(queue).Select(_ => grouper.Regroup()); - var published = groups.Merge(regroup).Publish(); + var published = groups.UnsynchronizedMerge(regroup).Where(changes => changes.Count != 0).Publish(); var subscriber = published.SubscribeSafe(observer); var disposer = published.DisposeMany().Subscribe(); diff --git a/src/DynamicData/Cache/Internal/GroupOnDynamic.cs b/src/DynamicData/Cache/Internal/GroupOnDynamic.cs index 0e11bc594..91efad22f 100644 --- a/src/DynamicData/Cache/Internal/GroupOnDynamic.cs +++ b/src/DynamicData/Cache/Internal/GroupOnDynamic.cs @@ -6,6 +6,8 @@ using System.Reactive.Disposables; using System.Reactive.Linq; +using DynamicData.Internal; + namespace DynamicData.Cache.Internal; internal sealed class GroupOnDynamic(IObservable> source, IObservable> selectGroupObservable, IObservable? regrouper = null) @@ -71,8 +73,11 @@ public IObservable> Run() => Observabl }, onError: observer.OnError); - // Create an observable that completes when all 3 inputs complete so the downstream can be completed as well - var subOnComplete = Observable.Merge(sharedSource.ToUnit(), sharedGroupSelector.ToUnit(), sharedRegrouper) + // All three inputs are routed through the same SharedDeliveryQueue so their notifications + // are already serialized; the merge is only here to coalesce their completion into a single + // downstream OnCompleted. UnsynchronizedMerge avoids the ABBA-prone gate that Observable.Merge + // would hold across the downstream observer.OnCompleted/observer.OnError call. + var subOnComplete = sharedSource.ToUnit().UnsynchronizedMerge(sharedGroupSelector.ToUnit(), sharedRegrouper) .IgnoreElements() .SubscribeSafe(observer.OnError, observer.OnCompleted); diff --git a/src/DynamicData/Cache/Internal/GroupOnImmutable.cs b/src/DynamicData/Cache/Internal/GroupOnImmutable.cs index ba33db031..1f0923af7 100644 --- a/src/DynamicData/Cache/Internal/GroupOnImmutable.cs +++ b/src/DynamicData/Cache/Internal/GroupOnImmutable.cs @@ -25,11 +25,11 @@ public IObservable> Run() => var queue = new SharedDeliveryQueue(); var grouper = new Grouper(_groupSelectorKey); - var groups = _source.SynchronizeSafe(queue).Select(grouper.Update).Where(changes => changes.Count != 0); + var groups = _source.SynchronizeSafe(queue).Select(grouper.Update); - var regroup = _regrouper.SynchronizeSafe(queue).Select(_ => grouper.Regroup()).Where(changes => changes.Count != 0); + var regroup = _regrouper.SynchronizeSafe(queue).Select(_ => grouper.Regroup()); - return new CompositeDisposable(groups.Merge(regroup).SubscribeSafe(observer), queue); + return new CompositeDisposable(groups.UnsynchronizedMerge(regroup).Where(changes => changes.Count != 0).SubscribeSafe(observer), queue); }); private sealed class Grouper(Func groupSelectorKey) diff --git a/src/DynamicData/Cache/Internal/Page.cs b/src/DynamicData/Cache/Internal/Page.cs index e41595871..2d91ac29d 100644 --- a/src/DynamicData/Cache/Internal/Page.cs +++ b/src/DynamicData/Cache/Internal/Page.cs @@ -19,7 +19,7 @@ public IObservable> Run() => Observable.Create updates is not null) .Select(x => x!) .SubscribeSafe(observer), queue); diff --git a/src/DynamicData/Cache/Internal/QueryWhenChanged.cs b/src/DynamicData/Cache/Internal/QueryWhenChanged.cs index 01828e85e..7e9764244 100644 --- a/src/DynamicData/Cache/Internal/QueryWhenChanged.cs +++ b/src/DynamicData/Cache/Internal/QueryWhenChanged.cs @@ -49,7 +49,7 @@ public IObservable> Run() return cache; }).Select(list => new AnonymousQuery(list)); - return new CompositeDisposable(sourceChanged.Merge(inlineChange).SubscribeSafe(observer), shared.Connect(), queue); + return new CompositeDisposable(sourceChanged.UnsynchronizedMerge(inlineChange).SubscribeSafe(observer), shared.Connect(), queue); }); } } diff --git a/src/DynamicData/Cache/Internal/Sort.cs b/src/DynamicData/Cache/Internal/Sort.cs index 11b39035e..6afb9632f 100644 --- a/src/DynamicData/Cache/Internal/Sort.cs +++ b/src/DynamicData/Cache/Internal/Sort.cs @@ -57,7 +57,7 @@ public IObservable> Run() => Observable.Create result is not null).Select(x => x!).SubscribeSafe(observer), queue); + return new CompositeDisposable(comparerChanged.UnsynchronizedMerge(dataChanged, sortAgain).Where(result => result is not null).Select(x => x!).SubscribeSafe(observer), queue); }); private sealed class Sorter(SortOptimisations optimisations, IComparer? comparer = null, int resetThreshold = -1) diff --git a/src/DynamicData/Cache/Internal/SortAndPage.cs b/src/DynamicData/Cache/Internal/SortAndPage.cs index 2d612a9b6..02ef996dc 100644 --- a/src/DynamicData/Cache/Internal/SortAndPage.cs +++ b/src/DynamicData/Cache/Internal/SortAndPage.cs @@ -111,10 +111,10 @@ public IObservable>> Run() => return ApplyPagedChanges(changes); }); - return new CompositeDisposable(Observable.Merge( - comparerChanged.Skip(1), - paramsChanged.Where(changes => changes.Count is not 0), - dataChange.Where(changes => changes.Count is not 0)) + return new CompositeDisposable(comparerChanged.Skip(1) + .UnsynchronizedMerge( + paramsChanged.Where(changes => changes.Count is not 0), + dataChange.Where(changes => changes.Count is not 0)) .SubscribeSafe(observer), queue); ChangeSet> ApplyPagedChanges(IChangeSet? changeSet = null) diff --git a/src/DynamicData/Cache/Internal/SortAndVirtualize.cs b/src/DynamicData/Cache/Internal/SortAndVirtualize.cs index 44c6d3dc3..0e6ae5508 100644 --- a/src/DynamicData/Cache/Internal/SortAndVirtualize.cs +++ b/src/DynamicData/Cache/Internal/SortAndVirtualize.cs @@ -113,8 +113,7 @@ public IObservable>> Run() => return new CompositeDisposable( comparerChanged - .Merge(paramsChanged) - .Merge(dataChange) + .UnsynchronizedMerge(paramsChanged, dataChange) .Where(changes => changes.Count is not 0) .SubscribeSafe(observer), queue); diff --git a/src/DynamicData/Cache/Internal/Switch.cs b/src/DynamicData/Cache/Internal/Switch.cs index 766f9f14a..d5c2302d1 100644 --- a/src/DynamicData/Cache/Internal/Switch.cs +++ b/src/DynamicData/Cache/Internal/Switch.cs @@ -6,6 +6,8 @@ using System.Reactive.Linq; using System.Reactive.Subjects; +using DynamicData.Internal; + namespace DynamicData.Cache.Internal; internal sealed class Switch(IObservable>> sources) @@ -18,29 +20,35 @@ public IObservable> Run() => Observable.Create { var queue = new SharedDeliveryQueue(); - var destination = new LockFreeObservableCache(); - var errors = new Subject>(); - - var populator = Observable.Switch( - _sources - .SynchronizeSafe(queue) - .Do(onNext: _ => destination.Clear(), - onError: error => errors.OnError(error))) + var innerSubscription = new SerialDisposable(); + + // The outer (sources) and every inner are routed through the same SharedDeliveryQueue. + // Both the per-source clear and the per-changeset destination write happen on the drain + // thread, so destination.Connect() emissions and any errors.OnError calls also originate + // from inside the drain. The downstream merge therefore sees pre-serialized inputs and + // uses UnsynchronizedMerge to avoid the ABBA-prone Observable.Merge gate. + var sourcesSubscription = _sources .SynchronizeSafe(queue) - .Do(onNext: static _ => { }, - onError: error => errors.OnError(error)) - .PopulateInto(destination); + .SubscribeSafe( + onNext: newSource => + { + destination.Clear(); + innerSubscription.Disposable = newSource + .SynchronizeSafe(queue) + .SubscribeSafe( + onNext: changes => destination.Edit(updater => updater.Clone(changes)), + onError: errors.OnError); + }, + onError: errors.OnError); return new CompositeDisposable( destination, errors, - populator, - destination - .Connect() - .Merge(errors) - .SubscribeSafe(observer), + sourcesSubscription, + innerSubscription, + destination.Connect().UnsynchronizedMerge(errors).SubscribeSafe(observer), queue); }); } diff --git a/src/DynamicData/Cache/Internal/TransformAsync.cs b/src/DynamicData/Cache/Internal/TransformAsync.cs index 73b6f62a9..66918d9ee 100644 --- a/src/DynamicData/Cache/Internal/TransformAsync.cs +++ b/src/DynamicData/Cache/Internal/TransformAsync.cs @@ -6,6 +6,8 @@ using System.Reactive.Linq; using System.Reactive.Threading.Tasks; +using DynamicData.Internal; + namespace DynamicData.Cache.Internal; internal class TransformAsync( @@ -32,7 +34,7 @@ public IObservable> Run() => var forced = forceTransform.SynchronizeSafe(queue) .Select(shouldTransform => DoTransform(cache, shouldTransform)).Concat(); - transformer = transformer.SynchronizeSafe(queue).Merge(forced); + transformer = transformer.SynchronizeSafe(queue).UnsynchronizedMerge(forced); return new CompositeDisposable(transformer.SubscribeSafe(observer), queue); } diff --git a/src/DynamicData/Cache/Internal/TransformMany.cs b/src/DynamicData/Cache/Internal/TransformMany.cs index f38853c08..a356d09c9 100644 --- a/src/DynamicData/Cache/Internal/TransformMany.cs +++ b/src/DynamicData/Cache/Internal/TransformMany.cs @@ -8,6 +8,7 @@ using System.Reactive.Linq; using DynamicData.Binding; +using DynamicData.Internal; namespace DynamicData.Cache.Internal; @@ -122,7 +123,7 @@ private IObservable> CreateWithChangeS var subsequent = transformed.MergeMany(x => x.Changes).SynchronizeSafe(queue); - var allChanges = initial.Merge(subsequent).Select( + var allChanges = initial.UnsynchronizedMerge(subsequent).Select( changes => { result.Clone(changes); diff --git a/src/DynamicData/Cache/Internal/TransformWithForcedTransform.cs b/src/DynamicData/Cache/Internal/TransformWithForcedTransform.cs index c7c95aa66..4bce97ac4 100644 --- a/src/DynamicData/Cache/Internal/TransformWithForcedTransform.cs +++ b/src/DynamicData/Cache/Internal/TransformWithForcedTransform.cs @@ -25,7 +25,7 @@ public IObservable> Run() => Observable.Create CaptureChanges(cache, selector)).Select(changes => new ChangeSet(changes)).NotEmpty(); - var sourceAndRefreshes = shared.Merge(refresher); + var sourceAndRefreshes = shared.UnsynchronizedMerge(refresher); // do raw transform var transform = new Transform(sourceAndRefreshes, transformFactory, exceptionCallback, true).Run(); diff --git a/src/DynamicData/Cache/Internal/TreeBuilder.cs b/src/DynamicData/Cache/Internal/TreeBuilder.cs index bf379ef99..d94783e87 100644 --- a/src/DynamicData/Cache/Internal/TreeBuilder.cs +++ b/src/DynamicData/Cache/Internal/TreeBuilder.cs @@ -7,6 +7,8 @@ using System.Reactive.Linq; using System.Reactive.Subjects; +using DynamicData.Internal; + namespace DynamicData.Cache.Internal; internal sealed class TreeBuilder(IObservable> source, Func pivotOn, IObservable, bool>>? predicateChanged) @@ -197,7 +199,10 @@ void UpdateChildren(Node parentNode) reFilterObservable.OnNext(Unit.Default); }).DisposeMany().Subscribe(); - var filter = _predicateChanged.SynchronizeSafe(queue).CombineLatest(reFilterObservable, (predicate, _) => predicate); + // Both inputs are routed through the same SharedDeliveryQueue so their delivery is + // serialized; UnsynchronizedCombineLatest avoids the ABBA-prone gate that + // Observable.CombineLatest would hold across downstream delivery. + var filter = _predicateChanged.SynchronizeSafe(queue).UnsynchronizedCombineLatest(reFilterObservable.SynchronizeSafe(queue), (predicate, _) => predicate); var result = allNodes.Connect().Filter(filter).SubscribeSafe(observer); return new CompositeDisposable(result, parentSetter, allData, allNodes, groupedByPivot, Disposable.Create(() => reFilterObservable.OnCompleted()), queue); diff --git a/src/DynamicData/Cache/Internal/Virtualise.cs b/src/DynamicData/Cache/Internal/Virtualise.cs index 9a5fefbcc..8eb827a28 100644 --- a/src/DynamicData/Cache/Internal/Virtualise.cs +++ b/src/DynamicData/Cache/Internal/Virtualise.cs @@ -21,9 +21,13 @@ public IObservable> Run() => Observable.Create< var virtualiser = new Virtualiser(); var queue = new SharedDeliveryQueue(); - var request = _virtualRequests.SynchronizeSafe(queue).Select(virtualiser.Virtualise).Where(x => x is not null).Select(x => x!); - var dataChange = _source.SynchronizeSafe(queue).Select(virtualiser.Update).Where(x => x is not null).Select(x => x!); - return new CompositeDisposable(request.Merge(dataChange).Where(updates => updates is not null).SubscribeSafe(observer), queue); + var request = _virtualRequests.SynchronizeSafe(queue).Select(virtualiser.Virtualise); + var dataChange = _source.SynchronizeSafe(queue).Select(virtualiser.Update); + + return new CompositeDisposable(request.UnsynchronizedMerge(dataChange) + .Where(updates => updates is not null) + .Select(x => x!) + .SubscribeSafe(observer), queue); }); private sealed class Virtualiser(VirtualRequest? request = null) diff --git a/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs new file mode 100644 index 000000000..43b3d94b3 --- /dev/null +++ b/src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs @@ -0,0 +1,58 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive; +using System.Reactive.Disposables; +using System.Reactive.Linq; + +namespace DynamicData.Internal; + +// Same-type Rx merge that owns a DeliveryQueue. Serializes notifications from +// every input through the queue, which releases its gate before delivering, so +// downstream observers that walk into another cache's writer lock cannot deadlock +// with this operator's serialization point. Used where every input has the same +// element type and no per-input projection is needed inside the drain. When element +// types differ or per-input projections are required, route each input through +// SharedDeliveryQueue with SynchronizeSafe and combine them with UnsynchronizedMerge. +internal static class DeliveryQueueMergeExtensions +{ + // Functionally equivalent to Observable.Merge: completes only after every source + // completes, the first error terminates, subscription occurs in argument order. + public static IObservable DeliveryQueueMerge(this IObservable first, params IObservable[] others) => + Observable.Create(observer => + { + var queue = new DeliveryQueue(observer); + var remainingSources = others.Length + 1; + var subscriptions = new CompositeDisposable(remainingSources + 1); + + subscriptions.Add(first.SubscribeSafe(CreateInner())); + foreach (var source in others) + { + subscriptions.Add(source.SubscribeSafe(CreateInner())); + } + + // Subscription first so any terminal notification produced during Rx's disposal + // cascade still flows through the still-active queue. Queue last as cleanup. + subscriptions.Add(queue); + return subscriptions; + + // Each source needs its own inner observer instance because Rx's ObserverBase + // sets a one-shot stopped flag on the first OnCompleted or OnError. A single + // shared observer would silently drop terminal notifications from every source + // after the first. OnNext and OnError forward straight to the queue (the queue's + // gate serializes concurrent calls). OnCompleted is counter-gated so only the + // last surviving source's completion terminates the merged stream. + IObserver CreateInner() => + Observer.Create( + queue.OnNext, + queue.OnError, + () => + { + if (Interlocked.Decrement(ref remainingSources) == 0) + { + queue.OnCompleted(); + } + }); + }); +} diff --git a/src/DynamicData/Internal/SharedDeliveryQueue.cs b/src/DynamicData/Internal/SharedDeliveryQueue.cs index 6eab28894..9ec57a8e1 100644 --- a/src/DynamicData/Internal/SharedDeliveryQueue.cs +++ b/src/DynamicData/Internal/SharedDeliveryQueue.cs @@ -1,4 +1,4 @@ -// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. diff --git a/src/DynamicData/Internal/SynchronizeSafeExtensions.cs b/src/DynamicData/Internal/SynchronizeSafeExtensions.cs index ace8ed4a1..063e9270a 100644 --- a/src/DynamicData/Internal/SynchronizeSafeExtensions.cs +++ b/src/DynamicData/Internal/SynchronizeSafeExtensions.cs @@ -2,55 +2,45 @@ // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using System.Reactive; using System.Reactive.Disposables; using System.Reactive.Linq; namespace DynamicData.Internal; -/// -/// Provides SynchronizeSafe extension methods, drop-in replacements -/// for Synchronize(lock) that release the lock before downstream delivery. -/// -/// -/// Disposal ordering matters. CompositeDisposable disposes in -/// declaration order. The queue and the source subscription have different roles: -/// -/// -/// Subscription-first (gate and SDQ overloads) -/// The queue is the IObserver that the source sends notifications to. -/// Disposing the subscription first allows any final terminal notification (OnCompleted/OnError -/// triggered by Rx's disposal cascade or a Finally operator) to flow through the -/// still-active queue. The queue is disposed last as cleanup. -/// -/// -/// Queue-first (parameterless overload) -/// Used by operators with teardown side effects (DisposeMany, OnBeingRemoved). -/// The queue is terminated first via , which ensures -/// all in-flight deliveries complete before the subscription is disposed and teardown logic -/// (e.g., disposing removed items) runs. Terminal notifications are not needed because -/// the subscriber is explicitly tearing down. -/// -/// -/// +// Drop-in replacements for Observable.Synchronize(lock) that release the lock before +// downstream delivery, plus UnsynchronizedMerge for combining streams whose inputs are +// already serialized through the same queue. +// +// Disposal ordering matters. CompositeDisposable disposes in declaration order, and the +// queue and the source subscription have different roles: +// +// Subscription-first (gate and SDQ overloads): the queue is the IObserver that the +// source sends notifications to. Disposing the subscription first allows any final +// terminal notification (OnCompleted or OnError triggered by Rx's disposal cascade +// or a Finally operator) to flow through the still-active queue. The queue is +// disposed last as cleanup. +// +// Queue-first (parameterless overload): used by operators with teardown side effects +// (DisposeMany, OnBeingRemoved). The queue is terminated first via DeliveryQueue.Dispose, +// which ensures all in-flight deliveries complete before the subscription is disposed +// and teardown logic (e.g. disposing removed items) runs. Terminal notifications are +// not needed because the subscriber is explicitly tearing down. internal static class SynchronizeSafeExtensions { - /// - /// Synchronizes the source observable through a . - /// Use when multiple sources of different types share a gate. - /// + // Routes the source through a SharedDeliveryQueue. Use when multiple sources of + // different types share a gate. public static IObservable SynchronizeSafe(this IObservable source, SharedDeliveryQueue queue) => Observable.Create(observer => { var subQueue = queue.CreateQueue(observer); - // Subscription first: terminal notifications flow through the still-active sub-queue + // Subscription first: terminal notifications flow through the still-active sub-queue. return new CompositeDisposable(source.SubscribeSafe(subQueue), subQueue); }); - /// - /// Synchronizes the source observable through an implicitly created . - /// Drop-in replacement for Synchronize(locker). - /// + // Routes the source through an implicitly created DeliveryQueue. Drop-in replacement + // for Observable.Synchronize(locker). #if NET9_0_OR_GREATER public static IObservable SynchronizeSafe(this IObservable source, Lock gate) => #else @@ -60,22 +50,167 @@ public static IObservable SynchronizeSafe(this IObservable source, obje { var queue = new DeliveryQueue(gate, observer); - // Subscription first: terminal notifications flow through the still-active queue + // Subscription first: terminal notifications flow through the still-active queue. return new CompositeDisposable(source.SubscribeSafe(queue), queue); }); - /// - /// Synchronizes the source observable through an implicitly created - /// with automatic delivery completion on dispose. The queue is terminated and drained - /// before the source subscription is disposed, ensuring all in-flight notifications - /// are delivered before teardown. - /// + // Routes the source through an implicitly created DeliveryQueue with automatic + // delivery completion on dispose. The queue is terminated and drained before the + // source subscription is disposed, ensuring all in-flight notifications are delivered + // before teardown. public static IObservable SynchronizeSafe(this IObservable source) => Observable.Create(observer => { var queue = new DeliveryQueue(observer); - // Queue first: ensures in-flight deliveries complete before teardown side effects run + // Queue first: ensures in-flight deliveries complete before teardown side effects run. return new CompositeDisposable(queue, source.SubscribeSafe(queue)); }); + + // Merges every input into a single observable without taking any synchronization gate. + // Functionally equivalent to Observable.Merge: completes only after every source completes, + // the first error terminates, subscription occurs in argument order. + // + // The caller MUST ensure that delivery from every source is already serialized. In this + // library the precondition is satisfied by routing every source through the same + // SharedDeliveryQueue via SynchronizeSafe(queue). The shared queue's drain loop guarantees + // that at most one notification is in flight to the downstream observer at a time, so the + // additional gate that Observable.Merge would install is redundant. + // + // The gate omission matters in cross-cache pipelines: Observable.Merge holds its private + // _gate for the entire duration of downstream delivery, and when downstream delivery walks + // into another cache's writer lock, two such gates on two operators form an ABBA cycle that + // the queue-drain design is meant to prevent. + // + // Without the external serialization precondition, concurrent OnNext calls into the shared + // observer will race. Do not use as a general-purpose Observable.Merge replacement. + public static IObservable UnsynchronizedMerge(this IObservable first, params IObservable[] others) => + Observable.Create(observer => + { + var remainingSources = others.Length + 1; + var subscriptions = new CompositeDisposable(remainingSources); + var terminated = 0; + + subscriptions.Add(first.SubscribeSafe(CreateInner())); + foreach (var source in others) + { + subscriptions.Add(source.SubscribeSafe(CreateInner())); + } + + return subscriptions; + + // Each source needs its own inner observer instance because Rx's ObserverBase sets + // a one-shot stopped flag on the first OnCompleted or OnError. A single shared + // observer would silently drop terminal notifications from every source after the + // first. The OnNext/OnError/OnCompleted actions close over the shared remainingSources + // and terminated counters so cross-source coordination still works. + IObserver CreateInner() => Observer.Create(OnNextSafe, OnErrorSafe, OnCompletedSafe); + + void OnNextSafe(T value) + { + if (Volatile.Read(ref terminated) == 0) + { + observer.OnNext(value); + } + } + + void OnErrorSafe(Exception error) + { + if (Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnError(error); + } + } + + void OnCompletedSafe() + { + if (Interlocked.Decrement(ref remainingSources) == 0 && Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnCompleted(); + } + } + }); + + // Two-input CombineLatest variant that does NOT install a gate. Functionally equivalent + // to Observable.CombineLatest: holds the most-recent value from each source, emits a + // resultSelector output whenever either source fires (provided the other has also fired + // at least once), the first error terminates, completes when both sources complete. + // + // Same precondition as UnsynchronizedMerge: delivery from BOTH sources must already be + // serialized through the same external gate before reaching this operator. In this library + // that is satisfied by routing both inputs through the same SharedDeliveryQueue via + // SynchronizeSafe(queue). Under that precondition no two OnNext calls overlap, so the + // latest-value state needs no internal locking, and the gate that + // Observable.CombineLatest installs becomes redundant. + // + // The Rx gate matters here for the same reason as Merge: Observable.CombineLatest holds + // its private _gate for the entire downstream delivery, and any operator-level lock held + // across a cross-cache write reconstructs the ABBA cycle the queue-drain design is meant + // to prevent. + // + // Without the external serialization precondition, concurrent OnNext calls would race the + // latest-value state and could produce torn reads. Do not use as a general-purpose + // Observable.CombineLatest replacement. + public static IObservable UnsynchronizedCombineLatest( + this IObservable first, + IObservable second, + Func resultSelector) + where TFirst : notnull + where TSecond : notnull => + Observable.Create(observer => + { + var firstLatest = Optional.None(); + var secondLatest = Optional.None(); + var remainingSources = 2; + var terminated = 0; + + var subscriptions = new CompositeDisposable(2); + subscriptions.Add(first.SubscribeSafe(Observer.Create(OnFirstNext, OnErrorSafe, OnCompletedSafe))); + subscriptions.Add(second.SubscribeSafe(Observer.Create(OnSecondNext, OnErrorSafe, OnCompletedSafe))); + return subscriptions; + + void OnFirstNext(TFirst value) + { + if (Volatile.Read(ref terminated) != 0) + { + return; + } + + firstLatest = value; + if (secondLatest.HasValue) + { + observer.OnNext(resultSelector(value, secondLatest.Value)); + } + } + + void OnSecondNext(TSecond value) + { + if (Volatile.Read(ref terminated) != 0) + { + return; + } + + secondLatest = value; + if (firstLatest.HasValue) + { + observer.OnNext(resultSelector(firstLatest.Value, value)); + } + } + + void OnErrorSafe(Exception error) + { + if (Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnError(error); + } + } + + void OnCompletedSafe() + { + if (Interlocked.Decrement(ref remainingSources) == 0 && Interlocked.Exchange(ref terminated, 1) == 0) + { + observer.OnCompleted(); + } + } + }); }