This reference describes the concurrency types exposed through Hugo.Go. Each section lists primary APIs, behaviour, and diagnostics emitted via GoDiagnostics.
- WaitGroup
- Mutex
- RwMutex
- Channels
- Task queue leasing
- Select / fan-in utilities
- Result orchestration helpers
- Timers
- Deterministic utilities
Best practice: Configure `GoDiagnostics` (or call `AddHugoDiagnostics`) before constructing these primitives so counters, histograms, and activity sources capture the full lifecycle.
## WaitGroup

Tracks asynchronous operations and delays shutdown until every task completes.
- `WaitGroup.Add(int delta)` / `Add(Task task)`
- `WaitGroup.Go(Func<Task> work, CancellationToken cancellationToken = default, TaskScheduler? scheduler = null, TaskCreationOptions creationOptions = TaskCreationOptions.DenyChildAttach)`
- `WaitGroup.Go(Task task)` / `WaitGroup.Go(ValueTask task)`
- `WaitGroup.Done()`
- `WaitGroup.WaitAsync(CancellationToken cancellationToken = default)`
- `WaitGroup.WaitAsync(TimeSpan timeout, TimeProvider? provider = null, CancellationToken cancellationToken = default)` (returns `bool`)
- `GoWaitGroupExtensions.Go(Func<CancellationToken, Task> work, CancellationToken cancellationToken, TaskScheduler? scheduler = null, TaskCreationOptions creationOptions = TaskCreationOptions.DenyChildAttach)` when you prefer to pass an explicit token along with scheduling hints
```csharp
var wg = new WaitGroup();
wg.Go(async () =>
{
    await Task.Delay(50, cancellationToken);
});

var completed = await wg.WaitAsync(
    timeout: TimeSpan.FromMilliseconds(250),
    provider: timeProvider,
    cancellationToken: cancellationToken);
```

- `WaitAsync(TimeSpan, ...)` returns `false` when a timeout elapses; the parameterless overload completes when the counter reaches zero.
- Cancellation surfaces as `Error.Canceled` in result pipelines and `OperationCanceledException` otherwise.
- Diagnostics: `waitgroup.additions`, `waitgroup.completions`, `waitgroup.outstanding`.
- Prefer the scheduler-aware `Go` overloads when you need `TaskCreationOptions.LongRunning` or a custom `TaskScheduler`, and use `Go(Task)` / `Go(ValueTask)` (or `Go.Run(ValueTask)` + `WaitGroup.Add`) when you already have a running operation that should be tracked without an extra `Task.Run`.
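The counter-plus-signal idea behind a wait group can be sketched with plain BCL types. This is an illustrative approximation, not Hugo's `WaitGroup` implementation: `Add` increments an outstanding-task counter, `Done` decrements it, and a `TaskCompletionSource` releases the waiter when the counter returns to zero.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var outstanding = 0;
var allDone = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

// Register a unit of work before anything can complete (same rule as Go's wg.Add).
void Add() => Interlocked.Increment(ref outstanding);

// Mark a unit of work finished; the last Done releases the waiter.
void Done()
{
    if (Interlocked.Decrement(ref outstanding) == 0)
        allDone.TrySetResult();
}

for (var i = 0; i < 3; i++) Add();

for (var i = 0; i < 3; i++)
{
    _ = Task.Run(async () =>
    {
        await Task.Delay(10); // simulated work
        Done();
    });
}

await allDone.Task;             // resumes only after every Done() has run
Console.WriteLine(outstanding); // 0
```

As with Go's `sync.WaitGroup`, all `Add` calls happen before the wait begins so the counter can never transiently reach zero early.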
## Mutex

Provides mutual exclusion with synchronous (`EnterScope`) and asynchronous (`LockAsync`) releasers.
```csharp
var mutex = new Mutex();

await using (await mutex.LockAsync(cancellationToken))
{
    // Exclusive access
}
```

- Pending waiters honour cancellation tokens.
- Dispose the releaser (via `await using` / `using`) to release the lock.
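The releaser pattern can be approximated with the BCL's `SemaphoreSlim` (a sketch, not Hugo's `Mutex`): `WaitAsync` acquires the lock, and a `finally` block stands in for the `IAsyncDisposable` releaser that `await using` disposes.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var gate = new SemaphoreSlim(1, 1);
var counter = 0;

async Task CriticalSectionAsync()
{
    await gate.WaitAsync();
    try
    {
        var read = counter;   // exclusive access: no interleaving across the await
        await Task.Delay(1);
        counter = read + 1;
    }
    finally
    {
        gate.Release();       // equivalent to disposing the releaser
    }
}

// Ten concurrent read-modify-write cycles, zero lost updates.
await Task.WhenAll(Enumerable.Range(0, 10).Select(_ => CriticalSectionAsync()));
Console.WriteLine(counter); // 10
```

Without the semaphore, the `await` inside the critical section would let writers interleave and lose increments.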
## RwMutex

Reader/writer lock that allows concurrent readers or a single writer.
```csharp
var rwMutex = new RwMutex();

await using (await rwMutex.RLockAsync(ct))
{
    // Multiple readers permitted
}

await using (await rwMutex.LockAsync(ct))
{
    // Exclusive writer
}
```

- Writer acquisition blocks new readers until released.
- Cancellation of a pending reader/writer propagates `OperationCanceledException`.
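The reader-count technique behind an async reader/writer gate can be sketched with two semaphores (illustrative only, not Hugo's `RwMutex`): the first reader takes the write gate on behalf of all readers, and the last reader returns it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var writeGate = new SemaphoreSlim(1, 1); // held by the writer, or by the reader group
var readGate = new SemaphoreSlim(1, 1);  // protects the reader counter
var readers = 0;

async Task EnterReadAsync()
{
    await readGate.WaitAsync();
    try
    {
        if (++readers == 1)
            await writeGate.WaitAsync(); // first reader blocks writers
    }
    finally { readGate.Release(); }
}

async Task ExitReadAsync()
{
    await readGate.WaitAsync();
    try
    {
        if (--readers == 0)
            writeGate.Release();         // last reader lets writers in
    }
    finally { readGate.Release(); }
}

await EnterReadAsync();
await EnterReadAsync();                                    // readers share entry
var writerBlocked = !await writeGate.WaitAsync(TimeSpan.Zero); // writer cannot enter yet
await ExitReadAsync();
await ExitReadAsync();

await writeGate.WaitAsync();                               // now exclusive
writeGate.Release();
Console.WriteLine(writerBlocked); // True
```

A production implementation also has to unwind the reader count when a pending first reader is cancelled, which this sketch omits for brevity.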
## Channels

`Go.MakeChannel<T>` creates unbounded or bounded channels for message passing. Fluent builders provide DI-friendly factories when you need to customise options once and share the channel through dependency injection.
- `MakeChannel<T>(int? capacity = null)`
- `MakeChannel<T>(BoundedChannelOptions options)`
- `MakePrioritizedChannel<T>(int priorityLevels, int defaultPriority)`
- `Go.BoundedChannel<T>(int capacity)`
- `Go.PrioritizedChannel<T>()` / `Go.PrioritizedChannel<T>(int priorityLevels)`
```csharp
var channel = Go.MakeChannel<string>(capacity: 32);
await channel.Writer.WriteAsync("message", ct);
var value = await channel.Reader.ReadAsync(ct);
```

- Bounded channels respect `FullMode` from `BoundedChannelOptions`.
- `TryComplete(Exception?)` propagates faults to readers.
```csharp
services.AddBoundedChannel<Job>(capacity: 64, builder => builder
    .SingleWriter()
    .WithFullMode(BoundedChannelFullMode.DropOldest));

services.AddPrioritizedChannel<Job>(priorityLevels: 3, builder => builder
    .WithCapacityPerLevel(32)
    .WithPrefetchPerPriority(2)
    .WithDefaultPriority(1));
```

- `AddBoundedChannel<T>` registers a `Channel<T>` alongside its reader and writer.
- `AddPrioritizedChannel<T>` registers `PrioritizedChannel<T>`, the prioritized reader/writer helpers, and the base `ChannelReader<T>` / `ChannelWriter<T>` facades.
- Prioritized channels prefetch at most `PrefetchPerPriority` items per lane (default 1). Tune it through `PrioritizedChannelOptions.PrefetchPerPriority` or `WithPrefetchPerPriority` to balance throughput against backpressure.
- Perf tip: When only one consumer drains the channel, set `SingleReader = true` (or call `SingleReader()` on the builder). Hugo then uses a lightweight per-lane buffer instead of multi-producer queues, reducing allocations and contention while remaining Native AOT friendly. Keep it `false` if multiple readers may observe the channel.
- The prioritized reader's slow path now reuses wait registrations instead of `Task.WhenAny` arrays, so `WaitToReadAsync` stays effectively allocation-free when lanes run hot.
- Exceptions or cancellations from individual priority lanes are observed immediately and surfaced through the unified reader, preventing `UnobservedTaskException` warnings when a single lane faults.
- Builders expose `.Build()` when you need an inline channel instance without DI.
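The `DropOldest` full mode shown in the builder example comes straight from `System.Threading.Channels`, so its eviction behaviour can be demonstrated with the BCL alone: when a write would exceed capacity, the oldest buffered item is silently evicted.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 2)
{
    FullMode = BoundedChannelFullMode.DropOldest,
    SingleReader = true,
    SingleWriter = true
});

// The third write evicts the oldest buffered item (1) instead of blocking.
foreach (var n in new[] { 1, 2, 3 })
    await channel.Writer.WriteAsync(n);

channel.Writer.TryComplete();

var survivors = new List<int>();
await foreach (var n in channel.Reader.ReadAllAsync())
    survivors.Add(n);

Console.WriteLine(string.Join(",", survivors)); // 2,3
```

`DropOldest` trades loss of stale data for non-blocking producers, which suits telemetry-style streams where the freshest values matter most.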
## Task queue leasing

`TaskQueue<T>` layers cooperative leasing semantics on top of channels. Producers enqueue work items, workers lease them for a configurable duration, and can heartbeat, complete, or fail each lease. Options configure queue naming, buffer size, lease duration, heartbeat cadence, sweep interval, requeue delay, maximum delivery attempts, and optional backpressure callbacks.
```csharp
var options = new TaskQueueOptions
{
    Name = "telemetry-queue",
    LeaseDuration = TimeSpan.FromSeconds(10),
    HeartbeatInterval = TimeSpan.FromSeconds(2),
    RequeueDelay = TimeSpan.FromMilliseconds(250),
    Backpressure = new TaskQueueBackpressureOptions
    {
        HighWatermark = 256,
        LowWatermark = 64,
        Cooldown = TimeSpan.FromSeconds(5),
        StateChanged = state =>
        {
            if (state.IsActive)
            {
                logger.LogWarning("Throttling appends at depth {Depth}", state.PendingCount);
            }
            else
            {
                logger.LogInformation("Backpressure cleared at depth {Depth}", state.PendingCount);
            }
        }
    }
};

await using var queue = new TaskQueue<Job>(options);
await queue.EnqueueAsync(new Job("alpha"), ct);

var lease = await queue.LeaseAsync(ct);
try
{
    logger.LogDebug("Ownership token {Token}", lease.OwnershipToken);
    await ProcessAsync(lease.Value, ct);
    await lease.CompleteAsync(ct);
}
catch (Exception ex)
{
    await lease.FailAsync(Error.FromException(ex), requeue: true, ct);
}
```

When draining a node for maintenance, capture the outstanding work and restore it on the new process:
```csharp
IReadOnlyList<TaskQueuePendingItem<Job>> pending = await queue.DrainPendingItemsAsync(ct);
await durableStore.SaveAsync(pending, ct);

// Later, or on another instance
await queue.RestorePendingItemsAsync(await durableStore.LoadAsync(ct), ct);
```

- Heartbeats extend the lease without handing work to another worker while still respecting `HeartbeatInterval` throttling.
- `FailAsync` captures the provided `Error`, increments the attempt count, and either requeues or dead-letters when `MaxDeliveryAttempts` is exceeded.
- Expired leases are detected by a background sweep and automatically requeued with an `error.taskqueue.lease_expired` payload.
- Every lease exposes a monotonic `OwnershipToken` (`sequence`, `attempt`, `leaseId`) that you can log or persist as a fencing token.
- `DrainPendingItemsAsync` / `RestorePendingItemsAsync` make rolling upgrades safe by snapshotting in-flight work without losing metadata.
- `ConfigureBackpressure` or `TaskQueueBackpressureMonitor<T>` keep backlog transitions observable without recreating queues; `QueueName` exposes the resolved name for diagnostics.
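How a downstream store might use such a monotonic token for fencing can be sketched in a few lines. This is a hypothetical consumer, not part of Hugo's API: the store remembers the highest token seen per key and rejects writes from stale lease holders, with tuple comparison providing the ordering (sequence first, then attempt).

```csharp
using System;
using System.Collections.Generic;

var highestSeen = new Dictionary<string, (long Sequence, int Attempt)>();

// Apply a write only if its fencing token is newer than anything seen for the key.
bool TryApply(string key, (long Sequence, int Attempt) token)
{
    if (highestSeen.TryGetValue(key, out var seen) && token.CompareTo(seen) <= 0)
        return false;          // stale or duplicate holder: fenced off
    highestSeen[key] = token;
    return true;
}

var first = TryApply("job-42", (Sequence: 7, Attempt: 1));       // fresh holder
var redelivered = TryApply("job-42", (Sequence: 7, Attempt: 2)); // later attempt wins
var stale = TryApply("job-42", (Sequence: 7, Attempt: 1));       // old holder rejected
Console.WriteLine((first, redelivered, stale)); // (True, True, False)
```

This is the classic fencing-token pattern: even if an expired lease holder wakes up late, its stale token can no longer mutate state.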
Hook `TaskQueueHealthCheck<T>` into ASP.NET Core health probes to block rollouts whenever backlog exceeds a threshold:
```csharp
builder.Services.AddTaskQueueHealthCheck<Job>(
    name: "job-queue",
    queueFactory: sp => sp.GetRequiredService<TaskQueue<Job>>(),
    configure: options =>
    {
        options.PendingDegradedThreshold = 512;
        options.PendingUnhealthyThreshold = 1024;
        options.ActiveLeaseDegradedThreshold = 32;
    });
```

Expose `/health/ready` via `app.MapHealthChecks` so orchestrators wait for pending work to drain before terminating replicas.
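The two pending thresholds form a simple three-state classification, which can be sketched as a pure function (an assumption about the mapping for illustration; the real health check also weighs active leases and other signals):

```csharp
using System;

// Depth below degradedAt is Healthy, below unhealthyAt is Degraded, else Unhealthy.
string Classify(int pending, int degradedAt, int unhealthyAt) =>
    pending >= unhealthyAt ? "Unhealthy"
    : pending >= degradedAt ? "Degraded"
    : "Healthy";

Console.WriteLine(Classify(100, 512, 1024));  // Healthy
Console.WriteLine(Classify(600, 512, 1024));  // Degraded
Console.WriteLine(Classify(2048, 512, 1024)); // Unhealthy
```

Orchestrators typically keep routing traffic to a Degraded replica but stop new rollouts, while Unhealthy removes it from rotation.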
`TaskQueueBackpressureMonitor<T>` exports `SafeTaskQueue`-compatible backpressure signals, rate limiter switches, diagnostics channels, and `WaitForDrainingAsync` helpers so producers can await relief before enqueueing more work. Monitors reconfigure the queue's backpressure thresholds at runtime and publish typed `TaskQueueBackpressureSignal` instances whenever depth crosses the configured watermarks.
```csharp
await using var queue = new TaskQueue<Job>(new TaskQueueOptions { Name = "telemetry", Capacity = 2048 });
await using var monitor = new TaskQueueBackpressureMonitor<Job>(queue, new TaskQueueBackpressureMonitorOptions
{
    HighWatermark = 512,
    LowWatermark = 128,
    Cooldown = TimeSpan.FromSeconds(5)
});

await using var diagnostics = new TaskQueueBackpressureDiagnosticsListener(capacity: 256);
using var diagnosticsSubscription = monitor.RegisterListener(diagnostics);

var limiter = new BackpressureAwareRateLimiter(
    unthrottledLimiter: new ConcurrencyLimiter(new ConcurrencyLimiterOptions(permitLimit: 256, queueLimit: 0, queueProcessingOrder: QueueProcessingOrder.OldestFirst)),
    backpressureLimiter: new ConcurrencyLimiter(new ConcurrencyLimiterOptions(permitLimit: 16, queueLimit: 512, queueProcessingOrder: QueueProcessingOrder.OldestFirst)),
    disposeUnthrottledLimiter: true,
    disposeBackpressureLimiter: true);
using var limiterSubscription = monitor.RegisterListener(limiter);

builder.Services.AddRateLimiter(options =>
{
    options.RejectionStatusCode = StatusCodes.Status429TooManyRequests;
    options.GlobalLimiter = PartitionedRateLimiter.Create<HttpContext, string>(_ =>
        RateLimitPartition.Get("taskqueue", _ => limiter.LimiterSelector()));
});

app.UseRateLimiter();
app.MapGet("/control-plane/backpressure", async context =>
{
    await foreach (var signal in diagnostics.Reader.ReadAllAsync(context.RequestAborted))
    {
        await context.Response.WriteAsJsonAsync(signal, context.RequestAborted);
    }
});

// Enqueuers can await relief instead of blindly retrying.
if (monitor.IsActive)
{
    await monitor.WaitForDrainingAsync(ct);
}
```

Metrics emitted via `GoDiagnostics` include `hugo.taskqueue.backpressure.active` (up/down gauge per queue), `hugo.taskqueue.backpressure.pending` (histogram of pending depth), `hugo.taskqueue.backpressure.transitions` (counter), and `hugo.taskqueue.backpressure.duration` (histogram of state durations). Attach a `TaskQueueBackpressureDiagnosticsListener` to expose current/streamed signals over HTTP or gRPC for control-plane automation.
`TaskQueueChannelAdapter<T>` surfaces queue leases through an ordinary channel reader:

```csharp
await using var queue = new TaskQueue<EventPayload>();
await using var adapter = TaskQueueChannelAdapter<EventPayload>.Create(queue, concurrency: 4);

await queue.EnqueueAsync(payload, ct);

await foreach (var lease in adapter.Reader.ReadAllAsync(ct))
{
    try
    {
        await HandleAsync(lease.Value, ct);
        await lease.CompleteAsync(ct);
    }
    catch (Exception ex)
    {
        await lease.FailAsync(Error.FromException(ex), requeue: true, ct);
    }
}
```

- Pumps run in the background and publish leases to the channel reader. If the channel is closed or cancellation triggers before delivery, the adapter requeues the lease with `Error.Canceled` metadata.
- `concurrency` controls both the number of pumps and the number of outstanding leases; the default channel is bounded to this limit so slow consumers cannot cause `_queue._leases` to grow unchecked. Provide a custom bounded `Channel<TaskQueueLease<T>>` if you need different buffering semantics.
- Disposing the adapter waits for pumps to finish; when `ownsQueue` is `true`, the underlying queue is disposed as well.
`SafeTaskQueueWrapper<T>` converts the exception-heavy surface of `TaskQueue<T>` into `Result<Unit>` and `Result<SafeTaskQueueLease<T>>` responses so producers and consumers can branch without try/catch blocks.
```csharp
await using var queue = new TaskQueue<Job>();
await using var safeQueue = new SafeTaskQueueWrapper<Job>(queue);

var enqueue = await safeQueue.EnqueueAsync(new Job("alpha"), ct);
if (enqueue.IsFailure)
{
    logger.LogWarning("Queue enqueue failed: {Error}", enqueue.Error);
    return;
}

Result<SafeTaskQueueLease<Job>> leaseResult = await safeQueue.LeaseAsync(ct);
if (leaseResult.IsFailure)
{
    logger.LogWarning("Lease failed: {Error}", leaseResult.Error);
    return;
}

SafeTaskQueueLease<Job> lease = leaseResult.Value;
var complete = await lease.CompleteAsync(ct);
if (complete.IsFailure)
{
    logger.LogWarning("Completion failed: {Error}", complete.Error);
}
```

- `EnqueueAsync` and `LeaseAsync` translate `OperationCanceledException`, `ObjectDisposedException`, and other failures into structured `Error` codes such as `error.taskqueue.disposed`.
- `Wrap(TaskQueueLease<T>)` adapts existing leases (for example those surfaced by `TaskQueueChannelAdapter`) into `SafeTaskQueueLease<T>` instances.
- `DisposeAsync` optionally disposes the underlying queue when `ownsQueue` is `true`.
- `SafeTaskQueueLease<T>` methods (`CompleteAsync`, `HeartbeatAsync`, `FailAsync`) return `Result<Unit>` and normalise cancellations and inactive lease states to `error.taskqueue.lease_inactive`.
- `SafeTaskQueueLease<T>` surfaces the underlying `SequenceId` and `OwnershipToken` so producers/consumers can log the same fencing metadata as the raw lease.
## Select / fan-in utilities

Await whichever channel case becomes ready first.
- `Go.SelectAsync<TResult>(params ChannelCase<TResult>[] cases)` to await the first ready case.
- `Go.SelectAsync<TResult>(TimeSpan timeout, TimeProvider? provider = null, CancellationToken cancellationToken = default, params ChannelCase<TResult>[] cases)` for deadline-aware selects.
- `Go.Select<TResult>(TimeProvider? provider = null, CancellationToken cancellationToken = default)` / `Go.Select<TResult>(TimeSpan timeout, TimeProvider? provider = null, CancellationToken cancellationToken = default)` fluent builders.
- `ChannelCase.Create<T, TResult>(ChannelReader<T>, Func<T, CancellationToken, ValueTask<Result<TResult>>>)` plus overloads for ValueTask callbacks and `ChannelCase.CreateDefault<TResult>(...)`.
```csharp
var result = await Go.SelectAsync(
    timeout: TimeSpan.FromSeconds(1),
    cases: new[]
    {
        ChannelCase.Create(textChannel.Reader, async (text, ct) =>
        {
            Console.WriteLine($"text: {text}");
            return Result.Ok(Go.Unit.Value);
        }),
        ChannelCase.Create(signal.Reader, (unit, _) =>
        {
            Console.WriteLine("signal received");
            return Task.FromResult(Result.Ok(Go.Unit.Value));
        })
    },
    cancellationToken: ct);
```

- Diagnostics capture attempts, completions, latency, and cancellations.
- Timeouts use `TimeProvider` when supplied.
- Cancelled tokens surface as `Error.Canceled` with originating token metadata.
- `SelectBuilder` supports `.Default(...)` fallbacks, per-case `priority` ordering, and `.Deadline(...)` helpers for timer-driven outcomes.
- `Go.SelectFanInAsync(...)` repeatedly selects until all cases complete, returning `Result<Unit>` so caller code can surface errors. When you pass a timeout, the helper captures a single absolute deadline (using the supplied `TimeProvider` when present) and enforces it across the entire fan-in session rather than per-iteration waits. Use `Go.SelectFanInValueTaskAsync(...)` when your continuations already return `ValueTask` so the loop can stay allocation free.
- `Go.FanInAsync(IEnumerable<ChannelReader<T>>, ChannelWriter<T>, bool completeDestination, TimeSpan? timeout = null, TimeProvider? provider = null, CancellationToken cancellationToken = default)` merges multiple readers into an existing writer and returns `Result<Unit>`.
- `Go.FanIn(IEnumerable<ChannelReader<T>>, TimeSpan? timeout = null, TimeProvider? provider = null, CancellationToken cancellationToken = default)` returns a new reader and internally manages the destination channel lifecycle.
See Coordinate fan-in workflows for step-by-step usage.
## Result orchestration helpers

- `Go.FanOutAsync(IEnumerable<Func<CancellationToken, Task<Result<T>>>> operations, ResultExecutionPolicy? policy = null, CancellationToken cancellationToken = default, TimeProvider? timeProvider = null)` executes delegates concurrently and aggregates values through `Result.WhenAll`. Prefer `Go.FanOutValueTaskAsync(IEnumerable<Func<CancellationToken, ValueTask<Result<T>>>> ...)` when your delegates already return `ValueTask<Result<T>>`.
- `Go.RaceAsync(IEnumerable<Func<CancellationToken, Task<Result<T>>>> operations, ResultExecutionPolicy? policy = null, CancellationToken cancellationToken = default, TimeProvider? timeProvider = null)` returns the first successful result (`Result.WhenAny` under the covers) and compensates secondary successes. Use `Go.RaceValueTaskAsync(...)` for ValueTask-based delegates.
- `Go.WithTimeoutAsync(Func<CancellationToken, Task<Result<T>>> operation, TimeSpan timeout, TimeProvider? timeProvider = null, CancellationToken cancellationToken = default)` produces `Error.Timeout` when the deadline elapses, returns `Error.Canceled` if the supplied token fires first, and otherwise forwards the inner result. `Go.WithTimeoutValueTaskAsync(...)` mirrors the behaviour for ValueTask-returning delegates.
- `Go.RetryAsync(Func<int, CancellationToken, Task<Result<T>>> operation, int maxAttempts = 3, TimeSpan? initialDelay = null, TimeProvider? timeProvider = null, ILogger? logger = null, CancellationToken cancellationToken = default)` applies exponential backoff using `Result.RetryWithPolicyAsync`, propagates structured retry metadata, and halts immediately when the delegate throws or returns an `Error.Canceled`, regardless of which linked token triggered it. Reach for `Go.RetryValueTaskAsync(...)` when the retried delegate returns `ValueTask<Result<T>>`.
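The first-success race can be expressed in plain TPL terms. The sketch below is not `Go.RaceAsync` itself (Hugo adds `Result<T>` plumbing, execution policies, and compensation of late winners); it only shows the `Task.WhenAny` loop at the heart of the pattern: keep racing while operations fail, cancel the losers once one succeeds.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

async Task<T> RaceFirstSuccessAsync<T>(IEnumerable<Func<CancellationToken, Task<T>>> ops)
{
    using var cts = new CancellationTokenSource();
    var pending = ops.Select(op => op(cts.Token)).ToList();
    var failures = new List<Exception>();

    while (pending.Count > 0)
    {
        var finished = await Task.WhenAny(pending);
        pending.Remove(finished);
        try
        {
            var value = await finished;
            cts.Cancel();        // winner found: cancel the remaining losers
            return value;
        }
        catch (Exception ex)
        {
            failures.Add(ex);    // this contender lost; keep racing the rest
        }
    }

    throw new AggregateException("all operations failed", failures);
}

var winner = await RaceFirstSuccessAsync(new Func<CancellationToken, Task<string>>[]
{
    async ct => { await Task.Delay(200, ct); return "slow"; },
    async ct => { await Task.Delay(5, ct); throw new InvalidOperationException("boom"); },
    async ct => { await Task.Delay(20, ct); return "fast"; }
});
Console.WriteLine(winner); // fast
```

Note that a failed operation does not end the race; only the first *successful* completion does, matching the `Result.WhenAny` semantics described above.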
## Timers

Timer primitives mirror Go semantics while honouring `TimeProvider`.
- `Go.DelayAsync(TimeSpan delay, TimeProvider? provider = null, CancellationToken cancellationToken = default)`
- `Go.After(TimeSpan delay, TimeProvider? provider = null, CancellationToken cancellationToken = default)`
- `Go.AfterAsync(TimeSpan delay, TimeProvider? provider = null, CancellationToken cancellationToken = default)`
- `Go.AfterValueTaskAsync(TimeSpan delay, TimeProvider? provider = null, CancellationToken cancellationToken = default)`
- `Go.NewTicker(TimeSpan period, TimeProvider? provider = null, CancellationToken cancellationToken = default)`
- `Go.Tick(TimeSpan period, TimeProvider? provider = null, CancellationToken cancellationToken = default)`
```csharp
var provider = new FakeTimeProvider();
var once = await Go.AfterAsync(TimeSpan.FromSeconds(5), provider, ct);

await using var ticker = Go.NewTicker(TimeSpan.FromSeconds(1), provider, ct);
var tick = await ticker.ReadAsync(ct);
```

- `GoTicker.Stop()` / `StopAsync()` dispose timers and complete the channel.
- Use fake time providers in tests for deterministic scheduling.
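The ticker loop pattern can be sketched with the BCL's `PeriodicTimer` (Hugo's `GoTicker` additionally exposes ticks through a channel and honours `TimeProvider`): each `WaitForNextTickAsync` awaits the next period, and disposing the timer ends the stream, much like `Stop()`.

```csharp
using System;
using System.Threading;

// Consume exactly three ticks of a 20ms periodic timer, then stop.
using var ticker = new PeriodicTimer(TimeSpan.FromMilliseconds(20));
var ticks = 0;

while (await ticker.WaitForNextTickAsync())
{
    ticks++;
    if (ticks == 3)
        break;               // dispose (via using) plays the role of Stop()
}

Console.WriteLine(ticks); // 3
```

Unlike `Task.Delay` in a loop, a periodic timer does not accumulate drift from the body's execution time between ticks.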
## Deterministic utilities

These helpers keep workflow-style orchestrations deterministic across replays by persisting decisions externally.
- `VersionGate` records version markers for long-lived change management using optimistic inserts. Call `Require(changeId, minVersion, maxVersion)` to retrieve the persisted version or create one deterministically. Supply a custom provider when you need to phase rollouts; concurrent writers that lose the insert return `error.version.conflict` so callers can fall back.
- `DeterministicEffectStore` captures side-effect results once and replays them. Use a durable implementation of `IDeterministicStateStore` in production and the provided `InMemoryDeterministicStateStore` inside tests.
```csharp
var store = new InMemoryDeterministicStateStore();
var gate = new VersionGate(store);

var decision = gate.Require("workflow.stepA", VersionGate.DefaultVersion, maxSupportedVersion: 2);
if (decision.IsFailure)
{
    return decision.Error!;
}

switch (decision.Value.Version)
{
    case VersionGate.DefaultVersion:
        return await RunLegacyAsync(ct);
    case 2:
        return await RunV2Async(ct);
    default:
        throw new InvalidOperationException();
}
```

```csharp
var effectStore = new DeterministicEffectStore(store);
var payload = await effectStore.CaptureAsync("side-effect.payment", async token =>
{
    var response = await httpClient.PostAsync("/payments", content, token);
    return response.IsSuccessStatusCode
        ? Result.Ok(await response.Content.ReadFromJsonAsync<Receipt>(cancellationToken: token))
        : Result.Fail<Receipt>(Error.From("payment failed", ErrorCodes.Validation));
});

if (payload.IsFailure)
{
    return payload.Error!;
}

return payload.Value;
```
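The capture-once semantics behind an effect store can be sketched with a plain dictionary (illustrative only; the real `DeterministicEffectStore` persists through `IDeterministicStateStore` and carries `Result<T>` values): the first call runs the side effect and records its outcome, and every replay returns the recorded value without re-executing the effect.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var recorded = new Dictionary<string, string>();
var executions = 0;

async Task<string> CaptureOnceAsync(string key, Func<Task<string>> effect)
{
    if (recorded.TryGetValue(key, out var replayed))
        return replayed;          // replay: the side effect does not run again

    var value = await effect();
    recorded[key] = value;        // first run: persist the observed result
    return value;
}

var first = await CaptureOnceAsync("side-effect.demo",
    async () => { executions++; await Task.Yield(); return "receipt-1"; });
var second = await CaptureOnceAsync("side-effect.demo",
    async () => { executions++; await Task.Yield(); return "receipt-2"; });

Console.WriteLine($"{first} {second} {executions}"); // receipt-1 receipt-1 1
```

This is what keeps replayed workflows deterministic: even if the second delegate would have produced a different value, the recorded first result always wins.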