Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 61 additions & 21 deletions Docs/durable-execution-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -1279,10 +1279,9 @@ public class CallbackConfig
/// </summary>
public TimeSpan HeartbeatTimeout { get; set; } = TimeSpan.Zero;

/// <summary>
/// Custom serializer for callback result.
/// </summary>
public ICheckpointSerializer? Serializer { get; set; }
// Note: there is no Serializer property here. Custom serializers are
// supplied via the AOT-safe CreateCallbackAsync(..., ICheckpointSerializer<T>, ...)
// overload, matching the pattern established by StepAsync.
}

/// <summary>
Expand All @@ -1307,14 +1306,14 @@ public class InvokeConfig
public TimeSpan Timeout { get; set; } = TimeSpan.Zero;

/// <summary>
/// Custom serializer for the payload.
/// Optional tenant identifier propagated to the chained invocation.
/// Matches the tenantId field on Python/JS/Java InvokeConfig.
/// </summary>
public ICheckpointSerializer? PayloadSerializer { get; set; }
public string? TenantId { get; set; }

/// <summary>
/// Custom serializer for the result.
/// </summary>
public ICheckpointSerializer? ResultSerializer { get; set; }
// Note: payload and result serializers are supplied via the AOT-safe
// InvokeAsync(..., ICheckpointSerializer<TPayload>, ICheckpointSerializer<TResult>, ...)
// overload, matching the pattern established by StepAsync.
}

/// <summary>
Expand Down Expand Up @@ -1429,10 +1428,9 @@ public class CompletionConfig
/// </summary>
public class ChildContextConfig
{
/// <summary>
/// Custom serializer for the child context's return value.
/// </summary>
public ICheckpointSerializer? Serializer { get; set; }
// Note: there is no Serializer property here. Custom serializers are
// supplied via the AOT-safe RunInChildContextAsync(..., ICheckpointSerializer<T>, ...)
// overload, matching the pattern established by StepAsync.

/// <summary>
/// Operation sub-type label for observability (e.g., in test runner output).
Expand Down Expand Up @@ -1473,34 +1471,54 @@ public class WaitForConditionConfig<TState>
public interface IBatchResult<T>
{
/// <summary>
/// All items (succeeded and failed).
/// All items, in original index order.
/// </summary>
IReadOnlyList<IBatchItem<T>> All { get; }

/// <summary>
/// Only successful items.
/// Items whose Status is Succeeded.
/// </summary>
IReadOnlyList<IBatchItem<T>> Succeeded { get; }

/// <summary>
/// Only failed items.
/// Items whose Status is Failed.
/// </summary>
IReadOnlyList<IBatchItem<T>> Failed { get; }

/// <summary>
/// Get all successful results. Throws if any failed.
/// Items still in flight when the batch resolved (CompletionConfig short-circuit).
/// </summary>
IReadOnlyList<IBatchItem<T>> Started { get; }

/// <summary>
/// Get all successful results in original index order. Throws if any failed.
/// </summary>
IReadOnlyList<T> GetResults();

/// <summary>
/// Throw an exception if any item failed.
/// Get all errors from failed items.
/// </summary>
IReadOnlyList<DurableExecutionException> GetErrors();

/// <summary>
/// Throw a single aggregated exception if any item failed.
/// </summary>
void ThrowIfError();

/// <summary>
/// Why the operation completed.
/// True if any item is in the Failed state.
/// </summary>
bool HasFailure { get; }

/// <summary>
/// Why the batch resolved.
/// </summary>
CompletionReason CompletionReason { get; }

int SuccessCount { get; }
int FailureCount { get; }
int StartedCount { get; }
int TotalCount { get; }
}

public interface IBatchItem<T>
Expand All @@ -1511,7 +1529,29 @@ public interface IBatchItem<T>
DurableExecutionException? Error { get; }
}

public enum BatchItemStatus { Succeeded, Failed, Cancelled }
/// <summary>
/// Status of an individual item in a batch result.
/// Mirrors the wire-state observed at the time the batch resolved — items still
/// running when a CompletionConfig short-circuits remain in <see cref="Started"/>.
/// </summary>
public enum BatchItemStatus
{
/// <summary>
/// The branch ran to completion and produced a result.
/// </summary>
Succeeded,

/// <summary>
/// The branch ran to completion and threw.
/// </summary>
Failed,

/// <summary>
/// The branch was still in flight when the batch's CompletionConfig
/// resolved (e.g., FirstSuccessful returned before this branch finished).
/// </summary>
Started
}
public enum CompletionReason { AllCompleted, MinSuccessfulReached, FailureToleranceExceeded }

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,25 @@ internal static class OperationStatuses
public const string Cancelled = "CANCELLED";
public const string Ready = "READY";
public const string Stopped = "STOPPED";
public const string TimedOut = "TIMED_OUT";
}

/// <summary>
/// Wire-format <see cref="Operation.SubType"/> string constants. Subtypes are
/// observability labels mapped from the user-facing context method that
/// produced the operation. The service does not interpret them; downstream
/// consumers (test runner, traces, console) display them as-is.
/// </summary>
internal static class OperationSubTypes
{
public const string Step = "Step";
public const string Wait = "Wait";
public const string Callback = "Callback";
public const string WaitForCallback = "WaitForCallback";
public const string Invoke = "Invoke";
public const string WaitForCondition = "WaitForCondition";
public const string Parallel = "Parallel";
public const string ParallelBranch = "ParallelBranch";
public const string Map = "Map";
public const string MapIteration = "MapIteration";
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private async Task<T> ExecuteFunc(int attemptNumber, CancellationToken cancellat
Id = OperationId,
Type = OperationTypes.Step,
Action = "START",
SubType = "Step",
SubType = OperationSubTypes.Step,
Name = Name
};

Expand All @@ -196,7 +196,7 @@ await EnqueueAsync(new SdkOperationUpdate
Id = OperationId,
Type = OperationTypes.Step,
Action = "SUCCEED",
SubType = "Step",
SubType = OperationSubTypes.Step,
Name = Name,
Payload = SerializeResult(result)
}, cancellationToken);
Expand Down Expand Up @@ -233,7 +233,7 @@ await EnqueueAsync(new SdkOperationUpdate
Id = OperationId,
Type = OperationTypes.Step,
Action = "RETRY",
SubType = "Step",
SubType = OperationSubTypes.Step,
Name = Name,
Error = ToSdkError(ex),
StepOptions = new SdkStepOptions { NextAttemptDelaySeconds = delaySeconds }
Expand All @@ -248,7 +248,7 @@ await EnqueueAsync(new SdkOperationUpdate
Id = OperationId,
Type = OperationTypes.Step,
Action = "FAIL",
SubType = "Step",
SubType = OperationSubTypes.Step,
Name = Name,
Error = ToSdkError(ex)
}, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ await EnqueueAsync(new SdkOperationUpdate
Id = OperationId,
Type = OperationTypes.Wait,
Action = "START",
SubType = "Wait",
SubType = OperationSubTypes.Wait,
Name = Name,
WaitOptions = new SdkWaitOptions { WaitSeconds = _waitSeconds }
}, cancellationToken);
Expand Down
Loading