Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Test/DurableTask.Core.Tests/TaskOrchestrationContextTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,29 @@ public async Task ScheduleTask_WithFailure_ShouldPropagateException()
}
}

[TestMethod]
public void GetOpenTasksSummary_WithNoOpenTasks_ReturnsEmptyString()
{
string summary = context.GetOpenTasksSummary();
Assert.AreEqual(string.Empty, summary);
}

[TestMethod]
public void GetOpenTasksSummary_WithManyOpenTasks_UsesBoundedSummary()
{
for (int i = 0; i < 100; i++)
{
_ = context.ScheduleTask<int>(new string('A', 100), "1.0", i);
}

Assert.AreEqual(100, context.OpenTaskCount);

string summary = context.GetOpenTasksSummary();

Assert.IsTrue(summary.Length <= 1024, "Summary should be capped to the configured maximum length.");
Assert.IsTrue(summary.Contains("more task(s))"), "Summary should indicate omitted tasks when capped.");
}

private class MockTaskOrchestrationContext : TaskOrchestrationContext
{
public List<ScheduledTaskInfo> ScheduledTasks { get; } = new List<ScheduledTaskInfo>();
Expand Down
29 changes: 26 additions & 3 deletions src/DurableTask.Core/Logging/LogEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -929,12 +929,19 @@ void IEventSourceEvent.WriteEventSource() =>
/// </summary>
internal class OrchestrationExecuted : StructuredLogEvent, IEventSourceEvent
{
public OrchestrationExecuted(OrchestrationInstance instance, string name, int actionCount)
public OrchestrationExecuted(
OrchestrationInstance instance,
string name,
int actionCount,
int openTaskCount,
string openTaskNames)
{
this.InstanceId = instance.InstanceId;
this.ExecutionId = instance.ExecutionId ?? string.Empty;
this.Name = name;
this.ActionCount = actionCount;
this.OpenTaskCount = openTaskCount;
this.OpenTaskNames = openTaskNames ?? string.Empty;
}

[StructuredLogField]
Expand All @@ -949,21 +956,37 @@ public OrchestrationExecuted(OrchestrationInstance instance, string name, int ac
[StructuredLogField]
public int ActionCount { get; }

[StructuredLogField]
public int OpenTaskCount { get; }

[StructuredLogField]
public string OpenTaskNames { get; }

public override EventId EventId => new EventId(
EventIds.OrchestrationExecuted,
nameof(EventIds.OrchestrationExecuted));

public override LogLevel Level => LogLevel.Information;

protected override string CreateLogMessage() =>
$"{this.InstanceId}: Orchestration '{this.Name}' awaited and scheduled {this.ActionCount} durable operation(s).";
protected override string CreateLogMessage()
{
string message = $"{this.InstanceId}: Orchestration '{this.Name}' awaited and scheduled {this.ActionCount} durable operation(s).";
if (this.OpenTaskCount > 0)
{
message += $" {this.OpenTaskCount} pending task(s): {this.OpenTaskNames}";
}

return message;
}

void IEventSourceEvent.WriteEventSource() =>
StructuredEventSource.Log.OrchestrationExecuted(
this.InstanceId,
this.ExecutionId,
this.Name,
this.ActionCount,
this.OpenTaskCount,
this.OpenTaskNames,
Utils.AppName,
Utils.PackageVersion);
}
Expand Down
9 changes: 7 additions & 2 deletions src/DurableTask.Core/Logging/LogHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -432,14 +432,19 @@ internal void OrchestrationExecuting(OrchestrationInstance instance, string name
/// <param name="instance">The orchestration instance that was executed.</param>
/// <param name="name">The name of the orchestration.</param>
/// <param name="actions">The actions taken by the orchestration</param>
/// <param name="openTaskCount">The number of pending tasks awaiting completion.</param>
/// <param name="openTaskNames">A summary of the pending task names.</param>
internal void OrchestrationExecuted(
OrchestrationInstance instance,
string name,
IReadOnlyList<OrchestratorAction> actions)
IReadOnlyList<OrchestratorAction> actions,
int openTaskCount,
string openTaskNames)
{
if (this.IsStructuredLoggingEnabled)
{
this.WriteStructuredLog(new LogEvents.OrchestrationExecuted(instance, name, actions.Count));
this.WriteStructuredLog(new LogEvents.OrchestrationExecuted(
instance, name, actions.Count, openTaskCount, openTaskNames));
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/DurableTask.Core/Logging/StructuredEventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,8 @@ internal void OrchestrationExecuted(
string ExecutionId,
string Name,
int ActionCount,
int OpenTaskCount,
string OpenTaskNames,
string AppName,
string ExtensionVersion)
{
Expand All @@ -508,6 +510,8 @@ internal void OrchestrationExecuted(
ExecutionId,
Name,
ActionCount,
OpenTaskCount,
OpenTaskNames,
AppName,
ExtensionVersion);
}
Expand Down
4 changes: 4 additions & 0 deletions src/DurableTask.Core/OrchestrationExecutionCursor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,9 @@ public OrchestrationExecutionCursor(
public TaskOrchestrationExecutor OrchestrationExecutor { get; }

public IEnumerable<OrchestratorAction> LatestDecisions { get; set; }

public int OpenTaskCount { get; set; }

public string OpenTaskNames { get; set; } = string.Empty;
}
}
14 changes: 14 additions & 0 deletions src/DurableTask.Core/OrchestratorExecutionResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,20 @@ public class OrchestratorExecutionResult
[JsonProperty("customStatus")]
public string? CustomStatus { get; set; }

/// <summary>
/// The number of tasks that the orchestration is currently waiting on.
/// This is used for diagnostic purposes and is not serialized.
/// </summary>
[JsonIgnore]
internal int OpenTaskCount { get; set; }

/// <summary>
/// A summary of the open tasks the orchestration is waiting on (e.g., "GetData#3, Timer#5").
/// This is used for diagnostic purposes and is not serialized.
/// </summary>
[JsonIgnore]
internal string OpenTaskNames { get; set; } = string.Empty;

/// <summary>
/// Creates an orchestrator failure result with a specified message and exception.
/// </summary>
Expand Down
72 changes: 72 additions & 0 deletions src/DurableTask.Core/TaskOrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace DurableTask.Core
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core.Command;
Expand All @@ -30,6 +31,8 @@ namespace DurableTask.Core

internal class TaskOrchestrationContext : OrchestrationContext
{
private const int MaxOpenTaskSummaryEntries = 32;
private const int MaxOpenTaskSummaryLength = 1024;
private readonly IDictionary<int, OpenTaskInfo> openTasks;
private readonly IDictionary<int, OrchestratorAction> orchestratorActionsMap;
private OrchestrationCompleteOrchestratorAction continueAsNew;
Expand Down Expand Up @@ -75,6 +78,75 @@ public TaskOrchestrationContext(

public bool HasOpenTasks => this.openTasks.Count > 0;

internal int OpenTaskCount => this.openTasks.Count;

internal string GetOpenTasksSummary()
{
int openTaskCount = this.openTasks.Count;
if (openTaskCount == 0)
{
return string.Empty;
}

var summaryBuilder = new StringBuilder(Math.Min(openTaskCount * 16, MaxOpenTaskSummaryLength));
int includedEntries = 0;
bool summaryTruncatedByLength = false;

foreach (KeyValuePair<int, OpenTaskInfo> openTask in this.openTasks)
{
if (includedEntries >= MaxOpenTaskSummaryEntries)
{
break;
}

string entry = $"{openTask.Value.Name ?? "Timer"}#{openTask.Key}";
int separatorLength = includedEntries > 0 ? 2 : 0;
int requiredLength = separatorLength + entry.Length;

if (summaryBuilder.Length + requiredLength > MaxOpenTaskSummaryLength)
{
summaryTruncatedByLength = true;
break;
}

if (separatorLength > 0)
{
summaryBuilder.Append(", ");
}

summaryBuilder.Append(entry);
includedEntries++;
}

int omittedTaskCount = openTaskCount - includedEntries;
if (omittedTaskCount > 0 || summaryTruncatedByLength)
{
string suffix = $"... (+{omittedTaskCount} more task(s))";
int separatorLength = summaryBuilder.Length > 0 ? 2 : 0;
int requiredLength = separatorLength + suffix.Length;

if (summaryBuilder.Length + requiredLength > MaxOpenTaskSummaryLength)
{
int maxPrefixLength = MaxOpenTaskSummaryLength - requiredLength;
if (maxPrefixLength < 0)
{
return suffix.Substring(0, MaxOpenTaskSummaryLength);
}

summaryBuilder.Length = Math.Min(summaryBuilder.Length, maxPrefixLength);
}

if (separatorLength > 0)
{
summaryBuilder.Append(", ");
}

summaryBuilder.Append(suffix);
}

return summaryBuilder.ToString();
}
Comment thread
solankisamir marked this conversation as resolved.

internal void ClearPendingActions()
{
this.orchestratorActionsMap.Clear();
Expand Down
20 changes: 16 additions & 4 deletions src/DurableTask.Core/TaskOrchestrationDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -464,10 +464,15 @@ protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
}
}

int openTaskCount = workItem.Cursor?.OpenTaskCount ?? 0;
string openTaskNames = workItem.Cursor?.OpenTaskNames ?? string.Empty;

this.logHelper.OrchestrationExecuted(
runtimeState.OrchestrationInstance!,
runtimeState.Name,
decisions);
decisions,
openTaskCount,
openTaskNames);
TraceHelper.TraceInstance(
TraceEventType.Information,
"TaskOrchestrationDispatcher-ExecuteUserOrchestration-End",
Expand Down Expand Up @@ -825,10 +830,13 @@ await this.dispatchPipeline.RunAsync(dispatchContext, _ =>
});

var result = dispatchContext.GetProperty<OrchestratorExecutionResult>();
IEnumerable<OrchestratorAction> decisions = result?.Actions ?? Enumerable.Empty<OrchestratorAction>();
var decisions = (result?.Actions ?? Enumerable.Empty<OrchestratorAction>()).ToList();
runtimeState.Status = result?.CustomStatus;

return new OrchestrationExecutionCursor(runtimeState, taskOrchestration, executor, decisions);
var cursor = new OrchestrationExecutionCursor(runtimeState, taskOrchestration, executor, decisions);
cursor.OpenTaskCount = result?.OpenTaskCount ?? 0;
cursor.OpenTaskNames = result?.OpenTaskNames ?? string.Empty;
return cursor;
Comment thread
solankisamir marked this conversation as resolved.
}

async Task ResumeOrchestrationAsync(TaskOrchestrationWorkItem workItem)
Expand Down Expand Up @@ -860,8 +868,12 @@ await this.dispatchPipeline.RunAsync(dispatchContext, _ =>
});

var result = dispatchContext.GetProperty<OrchestratorExecutionResult>();
cursor.LatestDecisions = result?.Actions ?? Enumerable.Empty<OrchestratorAction>();
var decisions = (result?.Actions ?? Enumerable.Empty<OrchestratorAction>()).ToList();
cursor.RuntimeState.Status = result?.CustomStatus;

cursor.LatestDecisions = decisions;
cursor.OpenTaskCount = result?.OpenTaskCount ?? 0;
cursor.OpenTaskNames = result?.OpenTaskNames ?? string.Empty;
}

/// <summary>
Expand Down
2 changes: 2 additions & 0 deletions src/DurableTask.Core/TaskOrchestrationExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ void ProcessEvents(IEnumerable<HistoryEvent> events)
{
Actions = this.context.OrchestratorActions,
CustomStatus = this.taskOrchestration.GetStatus(),
OpenTaskCount = this.context.OpenTaskCount,
OpenTaskNames = this.context.GetOpenTasksSummary(),
};
}
finally
Expand Down
Loading