Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
40302fe
Implement enhanced worker crash handling in JobDispatcher
Nov 28, 2025
7a5bb52
agentknob changes for worker crash
Nov 28, 2025
c05c057
Merge branch 'master' into users/raujaiswal/worker-crash
raujaiswal Nov 28, 2025
f2f4138
enhanced worker crash reporting for listner and added telemetry
Dec 3, 2025
95aedec
Merge branch 'users/raujaiswal/worker-crash' of https://github.com/mi…
Dec 3, 2025
f0a3af4
Merge branch 'master' into users/raujaiswal/worker-crash
raujaiswal Dec 3, 2025
e18c663
removed feature flag telemetry
Dec 4, 2025
425e29b
Merge branch 'users/raujaiswal/worker-crash' of https://github.com/mi…
Dec 4, 2025
af10a40
cleanup the code
Dec 5, 2025
1d3c339
Merge branch 'master' into users/raujaiswal/worker-crash
raujaiswal Dec 5, 2025
1944d29
refactored the chnages
Dec 10, 2025
eedd2e4
Merge branch 'users/raujaiswal/worker-crash' of https://github.com/mi…
Dec 10, 2025
ea914c3
refactored the changes
Dec 10, 2025
50fe7d7
refactored the changes
Dec 10, 2025
8e617cf
refactored the changes
Dec 10, 2025
f2699ce
refactored the changes
Dec 10, 2025
e67f638
refactored the changes
Dec 10, 2025
0448035
added retry mechanism
Dec 11, 2025
b56bfc8
Merge branch 'master' into users/raujaiswal/worker-crash
raujaiswal Dec 12, 2025
3a093df
Merge branch 'master' into users/raujaiswal/worker-crash
raujaiswal Dec 15, 2025
7342cee
added try-catch
Dec 16, 2025
be0f03a
Merge branch 'master' into users/raujaiswal/worker-crash
raujaiswal Dec 16, 2025
3c5cd62
Merge branch 'users/raujaiswal/worker-crash' of https://github.com/mi…
Dec 16, 2025
641d8a0
added tracepoint in telemetry
Dec 16, 2025
7aee989
Merge branch 'master' into users/raujaiswal/worker-crash
raujaiswal Dec 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/Agent.Listener/Agent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -371,13 +371,21 @@ private async Task InitializeRuntimeFeatures()
var enhancedLoggingFlag = await featureFlagProvider.GetFeatureFlagAsync(HostContext, "DistributedTask.Agent.UseEnhancedLogging", Trace);
bool enhancedLoggingEnabled = string.Equals(enhancedLoggingFlag?.EffectiveState, "On", StringComparison.OrdinalIgnoreCase);

// Check enhanced worker crash handling feature flag
var enhancedWorkerCrashHandlingFlag = await featureFlagProvider.GetFeatureFlagAsync(HostContext, "DistributedTask.Agent.EnhancedWorkerCrashHandling", Trace);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This FF needs to be defined in TFS or else. this might cause exceptions in server. See ICM 723787941

bool enhancedWorkerCrashHandlingEnabled = string.Equals(enhancedWorkerCrashHandlingFlag?.EffectiveState, "On", StringComparison.OrdinalIgnoreCase);

Trace.Info($"Enhanced logging feature flag is {(enhancedLoggingEnabled ? "enabled" : "disabled")}");
// Set the result on TraceManager - this automatically switches all trace sources
traceManager.SetEnhancedLoggingEnabled(enhancedLoggingEnabled);

// Ensure child processes (worker/plugin) pick up enhanced logging via knob
Environment.SetEnvironmentVariable("AZP_USE_ENHANCED_LOGGING", enhancedLoggingEnabled ? "true" : null);

Trace.Info($"Enhanced worker crash handling feature flag is {(enhancedWorkerCrashHandlingEnabled ? "enabled" : "disabled")}");
// Ensure child processes (worker/plugin) pick up enhanced crash handling via knob
Environment.SetEnvironmentVariable("AZP_ENHANCED_WORKER_CRASH_HANDLING", enhancedWorkerCrashHandlingEnabled ? "true" : null);

Trace.Info("Runtime features initialization completed successfully");
}
catch (Exception ex)
Expand Down
181 changes: 144 additions & 37 deletions src/Agent.Listener/JobDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,10 @@ await processChannel.SendAsync(
detailInfo = string.Join(Environment.NewLine, workerOutput);
Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result.");
await LogWorkerProcessUnhandledException(message, detailInfo, agentCertManager.SkipServerCertificateValidation);

// Publish worker crash telemetry for Kusto analysis
var telemetryPublisher = HostContext.GetService<IWorkerCrashTelemetryPublisher>();
await telemetryPublisher.PublishWorkerCrashTelemetryAsync(HostContext, message.JobId, returnCode, "200");
}

TaskResult result = TaskResultUtil.TranslateFromReturnCode(returnCode);
Expand All @@ -641,8 +645,20 @@ await processChannel.SendAsync(
await renewJobRequest;

Trace.Info($"Job request completion initiated - Completing job request for job: {message.JobId}");
// complete job request
await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);

if (ShouldUseEnhancedCrashHandling(message, returnCode))
{
// Direct plan event reporting for Plan v8+ worker crashes
await ReportJobCompletionEventAsync(message, result, agentCertManager.SkipServerCertificateValidation);
Trace.Info("Plan event reporting executed successfully for worker crash");
}
else
{
// Standard completion for Plan v7 or normal Plan v8+ scenarios, or when enhanced handling is disabled
await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);
Trace.Info("Standard completion executed successfully");
}

Trace.Info("Job request completion completed");

// print out unhandled exception happened in worker after we complete job request.
Expand Down Expand Up @@ -971,55 +987,146 @@ private async Task CompleteJobRequestAsync(int poolId, Pipelines.AgentJobRequest
throw new AggregateException(exceptions);
}

// log an error issue to job level timeline record
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Maintainability", "CA2000:Dispose objects before losing scope", MessageId = "jobServer")]
private async Task LogWorkerProcessUnhandledException(Pipelines.AgentJobRequestMessage message, string errorMessage, bool skipServerCertificateValidation = false)
// Determines if enhanced crash handling should be used for Plan v8+ worker crashes
private bool ShouldUseEnhancedCrashHandling(Pipelines.AgentJobRequestMessage message, int returnCode)
{
try
if (!AgentKnobs.EnhancedWorkerCrashHandling.GetValue(UtilKnobValueContext.Instance()).AsBoolean())
return false;

bool isPlanV8Plus = PlanUtil.GetFeatures(message.Plan).HasFlag(PlanFeatures.JobCompletedPlanEvent);
bool isWorkerCrash = !TaskResultUtil.IsValidReturnCode(returnCode);

return isPlanV8Plus && isWorkerCrash;
}

// Creates a job server connection with proper URL normalization for OnPremises servers
private async Task<VssConnection> CreateJobServerConnectionAsync(Pipelines.AgentJobRequestMessage message, bool skipServerCertificateValidation = false)
{
Trace.Info("Creating job server connection");

var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection));
ArgUtil.NotNull(systemConnection, nameof(systemConnection));

var jobServer = HostContext.GetService<IJobServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
Uri jobServerUrl = systemConnection.Url;

Trace.Verbose($"Initial connection details [JobId:{message.JobId}, OriginalUrl:{jobServerUrl}]");

// Make sure SystemConnection Url match Config Url base for OnPremises server
if (!message.Variables.ContainsKey(Constants.Variables.System.ServerType) ||
string.Equals(message.Variables[Constants.Variables.System.ServerType]?.Value, "OnPremises", StringComparison.OrdinalIgnoreCase))
{
var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection));
ArgUtil.NotNull(systemConnection, nameof(systemConnection));
try
{
Uri urlResult = null;
Uri configUri = new Uri(_agentSetting.ServerUrl);
if (Uri.TryCreate(new Uri(configUri.GetComponents(UriComponents.SchemeAndServer, UriFormat.Unescaped)), jobServerUrl.PathAndQuery, out urlResult))
{
//replace the schema and host portion of messageUri with the host from the
//server URI (which was set at config time)
Trace.Info($"URL replacement for OnPremises server - Original: {jobServerUrl}, New: {urlResult}");
jobServerUrl = urlResult;
}
}
catch (InvalidOperationException ex)
{
Trace.Error(ex);
}
catch (UriFormatException ex)
{
Trace.Error(ex);
}
}

var jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, trace: Trace, skipServerCertificateValidation);
await jobServer.ConnectAsync(jobConnection);
Trace.Info($"Job server connection established successfully");

return jobConnection;
}

var jobServer = HostContext.GetService<IJobServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
Uri jobServerUrl = systemConnection.Url;
// Reports job completion to server via plan event (similar to how worker reports)
// Used for Plan v8+ scenarios where listener needs to notify server of job completion
private async Task ReportJobCompletionEventAsync(Pipelines.AgentJobRequestMessage message, TaskResult result, bool skipServerCertificateValidation = false)
{
Trace.Info($"Plan event reporting initiated - Sending job completion event to server");

// Make sure SystemConnection Url match Config Url base for OnPremises server
if (!message.Variables.ContainsKey(Constants.Variables.System.ServerType) ||
string.Equals(message.Variables[Constants.Variables.System.ServerType]?.Value, "OnPremises", StringComparison.OrdinalIgnoreCase))
try
{
using (var jobConnection = await CreateJobServerConnectionAsync(message, skipServerCertificateValidation))
{
try
var jobServer = HostContext.GetService<IJobServer>();
// Create job completed event (similar to worker)
var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, result);

// Send plan event with retry logic (similar to worker pattern)
int retryLimit = 5;
var exceptions = new List<Exception>();

while (retryLimit-- > 0)
{
Uri result = null;
Uri configUri = new Uri(_agentSetting.ServerUrl);
if (Uri.TryCreate(new Uri(configUri.GetComponents(UriComponents.SchemeAndServer, UriFormat.Unescaped)), jobServerUrl.PathAndQuery, out result))
try
{
//replace the schema and host portion of messageUri with the host from the
//server URI (which was set at config time)
jobServerUrl = result;
await jobServer.RaisePlanEventAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, CancellationToken.None);
Trace.Info($"Plan event reporting completed successfully [JobId:{message.JobId}, Result:{result}]");
return;
}
catch (TaskOrchestrationPlanNotFoundException ex)
{
Trace.Error($"TaskOrchestrationPlanNotFoundException during plan event reporting for job {message.JobId}");
Trace.Error(ex);
return; // No point retrying
}
catch (TaskOrchestrationPlanSecurityException ex)
{
Trace.Error($"TaskOrchestrationPlanSecurityException during plan event reporting for job {message.JobId}");
Trace.Error(ex);
return; // No point retrying
}
catch (Exception ex)
{
Trace.Error(ex);
exceptions.Add(ex);
}

// delay 5 seconds before next retry
Trace.Info($"Plan event reporting retry delay - Waiting 5 seconds before retry {5 - retryLimit}/5");
await Task.Delay(TimeSpan.FromSeconds(5));
}
catch (InvalidOperationException ex)
{
//cannot parse the Uri - not a fatal error
Trace.Error(ex);
}
catch (UriFormatException ex)

// If we get here, all retries failed
Trace.Warning($"Plan event reporting failed after all retries [JobId:{message.JobId}, TotalExceptions:{exceptions.Count}]");
foreach (var ex in exceptions)
{
//cannot parse the Uri - not a fatal error
Trace.Error(ex);
}
}
}
catch (Exception ex)
{
Trace.Error("Critical error during plan event reporting setup");
Trace.Error(ex);
}
}

var jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, trace: Trace, skipServerCertificateValidation);
await jobServer.ConnectAsync(jobConnection);
var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None);
ArgUtil.NotNull(timeline, nameof(timeline));
TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job");
ArgUtil.NotNull(jobRecord, nameof(jobRecord));
jobRecord.ErrorCount++;
jobRecord.Issues.Add(new Issue() { Type = IssueType.Error, Message = errorMessage });
await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None);
// log an error issue to job level timeline record
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Maintainability", "CA2000:Dispose objects before losing scope", MessageId = "jobServer")]
private async Task LogWorkerProcessUnhandledException(Pipelines.AgentJobRequestMessage message, string errorMessage, bool skipServerCertificateValidation = false)
{
try
{
using (var jobConnection = await CreateJobServerConnectionAsync(message, skipServerCertificateValidation))
{
var jobServer = HostContext.GetService<IJobServer>();
var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None);
ArgUtil.NotNull(timeline, nameof(timeline));
TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job");
ArgUtil.NotNull(jobRecord, nameof(jobRecord));
jobRecord.ErrorCount++;
jobRecord.Issues.Add(new Issue() { Type = IssueType.Error, Message = errorMessage });
await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None);
}
}
catch (SocketException ex)
{
Expand Down
50 changes: 50 additions & 0 deletions src/Agent.Listener/Telemetry/WorkerCrashTelemetryPublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.VisualStudio.Services.Agent.Util;
using Microsoft.TeamFoundation.DistributedTask.WebApi;
using Newtonsoft.Json;

namespace Microsoft.VisualStudio.Services.Agent.Listener.Telemetry
{
[ServiceLocator(Default = typeof(WorkerCrashTelemetryPublisher))]
public interface IWorkerCrashTelemetryPublisher : IAgentService
{
Task PublishWorkerCrashTelemetryAsync(IHostContext hostContext, Guid jobId, int exitCode, string tracePoint);
}

public sealed class WorkerCrashTelemetryPublisher : AgentService, IWorkerCrashTelemetryPublisher
{
public async Task PublishWorkerCrashTelemetryAsync(IHostContext hostContext, Guid jobId, int exitCode, string tracePoint)
{
try
{
var telemetryPublisher = hostContext.GetService<IAgenetListenerTelemetryPublisher>();

var telemetryData = new Dictionary<string, object>
{
["JobId"] = jobId.ToString(),
["ExitCode"] = exitCode.ToString(),
["TracePoint"] = tracePoint
};

var command = new Command("telemetry", "publish")
{
Data = JsonConvert.SerializeObject(telemetryData)
};
command.Properties.Add("area", "AzurePipelinesAgent");
command.Properties.Add("feature", "WorkerCrash");

await telemetryPublisher.PublishEvent(hostContext, command);
Trace.Info($"Published worker crash telemetry for job {jobId} with exit code {exitCode}");
}
catch (Exception ex)
{
Trace.Warning($"Failed to publish worker crash telemetry: {ex}");
}
}
}
}
6 changes: 6 additions & 0 deletions src/Agent.Sdk/Knob/AgentKnobs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,12 @@ public class AgentKnobs
new EnvironmentKnobSource("FAIL_JOB_WHEN_AGENT_DIES"),
new BuiltInDefaultKnobSource("false"));

public static readonly Knob EnhancedWorkerCrashHandling = new Knob(
nameof(EnhancedWorkerCrashHandling),
"If true, enables enhanced worker crash handling with forced completion for Plan v8+ scenarios where worker crashes cannot send completion events",
new EnvironmentKnobSource("AZP_ENHANCED_WORKER_CRASH_HANDLING"),
new BuiltInDefaultKnobSource("false"));

public static readonly Knob AllowWorkDirectoryRepositories = new Knob(
nameof(AllowWorkDirectoryRepositories),
"Allows repositories to be checked out below work directory level on self hosted agents.",
Expand Down
Loading
Loading