Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 22 additions & 25 deletions src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,37 +150,37 @@ public bool IsOutOfOrderMessage(MessageData message)
return false;
}

// The first five times a message for a nonexistant instance is dequeued, give the message the benefit
// of the doubt and assume that the instance hasn't had its history table populated yet. After the
// fifth execution, ~30 seconds have passed and the most likely scenario is that this is a zombie event.
// This means the history table for the message's orchestration no longer exists, either due to an explicit
// PurgeHistory request or due to a ContinueAsNew call cleaning the old execution's history.
bool nonExistentInstance = this.IsNonexistantInstance() && message.OriginalQueueMessage.DequeueCount <= 5;

// If the instance does not exist, even if the message has ben dequeued > 5 times, the next check for trying
// to find the corresponding task scheduled message will always fail so we will endlessly abandon the message.
// To avoid this we return early here.
if (this.IsNonexistantInstance() && message.OriginalQueueMessage.DequeueCount > 5)
{
// The first five times a message for a nonexistant instance is dequeued, give the message the benefit
// of the doubt and assume that the instance hasn't had its history table populated yet. After the
// fifth execution, ~30 seconds have passed and the most likely scenario is that this is a zombie event.
// This means the history table for the message's orchestration no longer exists, either due to an explicit
// PurgeHistory request or due to a ContinueAsNew call cleaning the old execution's history.
return false;
}

if (this.LastCheckpointTime > message.TaskMessage.Event.Timestamp)
{
// LastCheckpointTime represents the time at which the most recent history checkpoint completed.
// The checkpoint is written to the history table only *after* all queue messages are sent.
// A message is out of order when its timestamp *preceeds* the most recent checkpoint timestamp.
// In this case, we see that the checkpoint came *after* the message, so there is no out-of-order
// concern. Note that this logic only applies for messages sent by orchestrations to themselves.
// The next check considers the other cases (activities, sub-orchestrations, etc.).
// Orchestration checkpoint time information was added only after v1.6.4.
return false;
}
// LastCheckpointTime represents the time at which the most recent history checkpoint completed.
// The checkpoint is written to the history table only *after* all queue messages are sent.
// A message is out of order when its timestamp *preceeds* the most recent checkpoint timestamp.
// In this case, we see that the checkpoint came *after* the message, so there is no out-of-order
// concern. Note that this logic only applies for messages sent by orchestrations to themselves.
// The next check considers the other cases (activities, sub-orchestrations, etc.).
// Orchestration checkpoint time information was added only after v1.6.4.
bool isStaleCheckpoint = this.LastCheckpointTime <= message.TaskMessage.Event.Timestamp;

bool triggeringTaskDoesNotExist = true;
if (Utils.TryGetTaskScheduledId(message.TaskMessage.Event, out int taskScheduledId))
{
// This message is a response to a task. Search the history to make sure that we've recorded the fact that
// this task was scheduled.
HistoryEvent mostRecentTaskEvent = this.RuntimeState.Events.LastOrDefault(e => e.EventId == taskScheduledId);
if (mostRecentTaskEvent != null)
{
return false;
}
triggeringTaskDoesNotExist = mostRecentTaskEvent == null;
}

if (message.TaskMessage.Event.EventType == EventType.EventRaised)
Expand All @@ -190,15 +190,12 @@ public bool IsOutOfOrderMessage(MessageData message)
if (requestId != null)
{
HistoryEvent mostRecentTaskEvent = this.RuntimeState.Events.FirstOrDefault(e => e.EventType == EventType.EventSent && FindRequestId(((EventSentEvent)e).Input)?.ToString() == requestId);
if (mostRecentTaskEvent != null)
{
return false;
}
triggeringTaskDoesNotExist = mostRecentTaskEvent == null;
}
}

// The message is out of order and cannot be handled by the current session.
return true;
return nonExistentInstance || isStaleCheckpoint || triggeringTaskDoesNotExist;
}

Guid? FindRequestId(string input)
Expand Down
Loading