Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
9d6dfb3
test: round-3 coverage expansion across 10 more classes
glennawatson May 18, 2026
406746f
Merge branch 'main' into tests/expand-operator-coverage-round-3
glennawatson May 19, 2026
a1bebf2
test: cover terminal branches of scheduled, debounce and pairing sync…
glennawatson May 19, 2026
bedac54
test: cover retry, throttle, where-select fusion and from-array facto…
glennawatson May 19, 2026
de71560
fix(tests): drop unreliable timing probe in AsyncGate contention test
glennawatson May 19, 2026
6d2681d
fix(tests): wait for resource dispose after scheduled Using completion
glennawatson May 19, 2026
1a3869a
test: cover null-guards, terminal forwarding and disposal edges acros…
glennawatson May 19, 2026
a512a8e
Merge branch 'main' into tests/expand-operator-coverage-round-3
glennawatson May 19, 2026
1cbd241
test: cover partition broadcasts, fused-operator error forwarding, ex…
glennawatson May 19, 2026
0f789b5
test: cover after-terminal sink guards and fused-operator slow paths
glennawatson May 19, 2026
72e2c49
test: cover skip/take-while error paths, partition mid-array remove, …
glennawatson May 19, 2026
4342ffb
test: cover filter-fusion error forwarding, TakeUntil cancellable ove…
glennawatson May 19, 2026
1d751b8
test+ci: cover RunAll/BooleanReduce/MinMax/SampleLatest after-termina…
glennawatson May 19, 2026
e47bc83
refactor(parity-fusions): extract post-delay decision and partition a…
glennawatson May 19, 2026
9355e4c
test+refactor: cover after-terminal sinks on remaining sync operators…
glennawatson May 19, 2026
db59d4e
refactor(merge,conflate): promote nested sinks to internal and extrac…
glennawatson May 19, 2026
e80160d
remove(observer-async,parity-fusions): drop dead try/catches that mod…
glennawatson May 19, 2026
3164849
test+remove: drop redundant OCE catch in Throttle delay loop, add For…
glennawatson May 19, 2026
2bb9441
test: cover DebounceUntilObservable error/completion forwarding and W…
glennawatson May 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/sonarcloud.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
minverMinimumMajorMinor: '2.3'
sonarProjectKey: reactiveui_Extensions
sonarOrganization: reactiveui
sonarExclusions: '**/tests/**,**/tools/**,**/benchmarks/**'
sonarExclusions: '**/tests/**,**/tools/**,**/benchmarks/**,**/TestResults/**'
sonarCoverageExclusions: '**/tests/**,**/tools/**,**/benchmarks/**,**/*Tests/**,**/*Tests.cs,**/Generated/**'
# CombineLatest{N}.cs are template-style arity-specific operator files. The remaining
# duplication after the lifecycle / indexed-observer / base-class extractions is the
Expand Down
17 changes: 5 additions & 12 deletions src/ReactiveUI.Extensions/Async/ObserverAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,18 +157,11 @@ public ValueTask OnErrorResumeAsync(Exception error, CancellationToken cancellat
return default;
}

ValueTask core;
try
{
core = OnErrorResumeAsync_Private(error, scope.Token);
}
catch (Exception e)
{
UnhandledExceptionHandler.OnUnhandledException(e);
scope.Dispose();
ExitOnSomethingCall();
return default;
}
// OnErrorResumeAsync_Private is an async ValueTask method — any sync or async exception
// it raises is captured into the returned ValueTask and surfaces through the await in
// OnErrorResumeAsyncSlow. A try/catch around the invocation expression itself would be
// dead code in modern C# async semantics.
var core = OnErrorResumeAsync_Private(error, scope.Token);

if (core.IsCompletedSuccessfully)
{
Expand Down
92 changes: 70 additions & 22 deletions src/ReactiveUI.Extensions/Async/Operators/Merge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,40 @@ internal void LinkExternalCancellation(CancellationToken external)
_disposeCts);
}

/// <summary>
/// Re-checks the disposed flag inside the serialization gate and forwards the value to
/// downstream if still alive. Extracted as an <see langword="internal"/> method so the
/// inside-gate after-dispose decision is directly unit-testable without racing the gate.
/// </summary>
/// <param name="value">The value to forward.</param>
/// <returns>A task representing the asynchronous forward operation.</returns>
internal ValueTask ForwardOnNextLocked(T value)
{
if (DisposalHelper.IsDisposed(_disposed))
{
return default;
}

return _observer.OnNextAsync(value, DisposedCancellationToken);
}

/// <summary>
/// Re-checks the disposed flag inside the serialization gate and forwards the error to
/// downstream if still alive. Extracted as an <see langword="internal"/> method for
/// direct unit testing.
/// </summary>
/// <param name="exception">The error to forward.</param>
/// <returns>A task representing the asynchronous forward operation.</returns>
internal ValueTask ForwardOnErrorResumeLocked(Exception exception)
{
if (DisposalHelper.IsDisposed(_disposed))
{
return default;
}

return _observer.OnErrorResumeAsync(exception, DisposedCancellationToken);
}

/// <summary>
/// Forwards a value to the downstream observer under the serialization gate.
/// </summary>
Expand All @@ -244,12 +278,7 @@ protected internal async ValueTask ForwardOnNext(T value, CancellationToken canc

using (await _onSomethingGate.LockAsync(DisposedCancellationToken).ConfigureAwait(false))
{
if (DisposalHelper.IsDisposed(_disposed))
{
return;
}

await _observer.OnNextAsync(value, DisposedCancellationToken).ConfigureAwait(false);
await ForwardOnNextLocked(value).ConfigureAwait(false);
}
}

Expand All @@ -271,12 +300,7 @@ protected internal async ValueTask ForwardOnErrorResume(

using (await _onSomethingGate.LockAsync(DisposedCancellationToken).ConfigureAwait(false))
{
if (DisposalHelper.IsDisposed(_disposed))
{
return;
}

await _observer.OnErrorResumeAsync(exception, DisposedCancellationToken).ConfigureAwait(false);
await ForwardOnErrorResumeLocked(exception).ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -649,13 +673,25 @@ internal async ValueTask OnNextAsync(T value, CancellationToken token)

using (await _onSomethingGate.LockAsync(_disposedCancellationToken).ConfigureAwait(false))
{
if (DisposalHelper.IsDisposed(_disposed))
{
return;
}
await OnNextAsyncLocked(value).ConfigureAwait(false);
}
}

await _observer.OnNextAsync(value, _disposedCancellationToken).ConfigureAwait(false);
/// <summary>
/// Re-checks the disposed flag inside the serialization gate and forwards the value
/// to downstream if still alive. Extracted as an <see langword="internal"/> method for
/// direct unit testing of the inside-gate after-dispose decision.
/// </summary>
/// <param name="value">The value to forward.</param>
/// <returns>A task representing the asynchronous forward operation.</returns>
internal ValueTask OnNextAsyncLocked(T value)
{
if (DisposalHelper.IsDisposed(_disposed))
{
return default;
}

return _observer.OnNextAsync(value, _disposedCancellationToken);
}

/// <summary>
Expand All @@ -674,13 +710,25 @@ internal async ValueTask OnErrorResumeAsync(Exception ex, CancellationToken toke

using (await _onSomethingGate.LockAsync(_disposedCancellationToken).ConfigureAwait(false))
{
if (DisposalHelper.IsDisposed(_disposed))
{
return;
}
await OnErrorResumeAsyncLocked(ex).ConfigureAwait(false);
}
}

await _observer.OnErrorResumeAsync(ex, _disposedCancellationToken).ConfigureAwait(false);
/// <summary>
/// Re-checks the disposed flag inside the serialization gate and forwards the error
/// to downstream if still alive. Extracted as an <see langword="internal"/> method
/// for direct unit testing of the inside-gate after-dispose decision.
/// </summary>
/// <param name="ex">The error to forward.</param>
/// <returns>A task representing the asynchronous forward operation.</returns>
internal ValueTask OnErrorResumeAsyncLocked(Exception ex)
{
if (DisposalHelper.IsDisposed(_disposed))
{
return default;
}

return _observer.OnErrorResumeAsync(ex, _disposedCancellationToken);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,39 @@ protected override ValueTask<IAsyncDisposable> SubscribeAsyncCore(
internal sealed class ObserveOnObserver(IObserverAsync<T> observer, AsyncContext asyncContext, bool forceYielding)
: ObserverAsync<T>
{
/// <summary>Slow path: switch to the target context then forward the value.
/// Exposed as <see langword="internal"/> so tests can invoke the slow-path body
/// directly without needing to race the current-context check.</summary>
/// <param name="value">The value to forward.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that completes after the context switch and downstream forward.</returns>
internal async ValueTask SwitchThenForwardAsync(T value, CancellationToken cancellationToken)
{
await asyncContext.SwitchContextAsync(forceYielding, cancellationToken);
await observer.OnNextAsync(value, cancellationToken).ConfigureAwait(false);
}

/// <summary>Slow path: switch to the target context then forward the error.
/// Exposed as <see langword="internal"/> for direct unit testing.</summary>
/// <param name="error">The error to forward.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that completes after the context switch and downstream forward.</returns>
internal async ValueTask SwitchThenErrorAsync(Exception error, CancellationToken cancellationToken)
{
await asyncContext.SwitchContextAsync(forceYielding, cancellationToken);
await observer.OnErrorResumeAsync(error, cancellationToken).ConfigureAwait(false);
}

/// <summary>Slow path: switch to the target context then forward completion.
/// Exposed as <see langword="internal"/> for direct unit testing.</summary>
/// <param name="result">The completion result.</param>
/// <returns>A task that completes after the context switch and downstream forward.</returns>
internal async ValueTask SwitchThenCompletedAsync(Result result)
{
await asyncContext.SwitchContextAsync(forceYielding, CancellationToken.None);
await observer.OnCompletedAsync(result).ConfigureAwait(false);
}

/// <inheritdoc/>
protected override ValueTask OnNextAsyncCore(T value, CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -68,34 +101,5 @@ protected override ValueTask OnCompletedAsyncCore(Result result)

return SwitchThenCompletedAsync(result);
}

/// <summary>Slow path: switch to the target context then forward the value.</summary>
/// <param name="value">The value to forward.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that completes after the context switch and downstream forward.</returns>
private async ValueTask SwitchThenForwardAsync(T value, CancellationToken cancellationToken)
{
await asyncContext.SwitchContextAsync(forceYielding, cancellationToken);
await observer.OnNextAsync(value, cancellationToken).ConfigureAwait(false);
}

/// <summary>Slow path: switch to the target context then forward the error.</summary>
/// <param name="error">The error to forward.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that completes after the context switch and downstream forward.</returns>
private async ValueTask SwitchThenErrorAsync(Exception error, CancellationToken cancellationToken)
{
await asyncContext.SwitchContextAsync(forceYielding, cancellationToken);
await observer.OnErrorResumeAsync(error, cancellationToken).ConfigureAwait(false);
}

/// <summary>Slow path: switch to the target context then forward completion.</summary>
/// <param name="result">The completion result.</param>
/// <returns>A task that completes after the context switch and downstream forward.</returns>
private async ValueTask SwitchThenCompletedAsync(Result result)
{
await asyncContext.SwitchContextAsync(forceYielding, CancellationToken.None);
await observer.OnCompletedAsync(result).ConfigureAwait(false);
}
}
}
Loading
Loading