Skip to content

Commit 5dfc57f

Browse files
committed
Match other rewind implementations better
1 parent 9ae8aa0 commit 5dfc57f

2 files changed

Lines changed: 743 additions & 2 deletions

File tree

durabletask/worker.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1443,13 +1443,27 @@ def _build_rewind_result(
14431443
# failed results so the orchestration can replay successfully.
14441444
all_events = list(old_events) + list(new_events)
14451445

1446+
# Extract the executionRewound event from new_events so we can
1447+
# read its parentExecutionId (set when this is a sub-orchestration
1448+
# being rewound by its parent).
1449+
rewind_event: pb.ExecutionRewoundEvent | None = None
1450+
for e in new_events:
1451+
if e.HasField("executionRewound"):
1452+
rewind_event = e.executionRewound
1453+
break
1454+
1455+
# Generate a new execution ID for the rewound execution.
1456+
new_execution_id = uuid.uuid4().hex
1457+
14461458
# First pass: collect the task-scheduled IDs that correspond to
1447-
# failed activities so we can remove the matching taskScheduled
1448-
# events in the second pass.
1459+
# failed activities / sub-orchestrations so we can remove the
1460+
# matching taskScheduled events in the second pass.
14491461
failed_task_ids: set[int] = set()
14501462
for event in all_events:
14511463
if event.HasField("taskFailed"):
14521464
failed_task_ids.add(event.taskFailed.taskScheduledId)
1465+
elif event.HasField("subOrchestrationInstanceFailed"):
1466+
failed_task_ids.add(event.subOrchestrationInstanceFailed.taskScheduledId)
14531467

14541468
# Second pass: build the clean history.
14551469
clean_history: list[pb.HistoryEvent] = []
@@ -1462,6 +1476,23 @@ def _build_rewind_result(
14621476
continue
14631477
if event.HasField("executionCompleted"):
14641478
continue
1479+
1480+
# Modify the executionStarted event: assign a fresh
1481+
# execution ID and, for sub-orchestrations, update the
1482+
# parent's execution ID so it matches the parent's new run.
1483+
if event.HasField("executionStarted"):
1484+
event_copy = pb.HistoryEvent()
1485+
event_copy.CopyFrom(event)
1486+
event_copy.executionStarted.orchestrationInstance.executionId.CopyFrom(
1487+
ph.get_string_value(new_execution_id))
1488+
if (rewind_event is not None
1489+
and rewind_event.HasField("parentExecutionId")
1490+
and rewind_event.parentExecutionId.value):
1491+
event_copy.executionStarted.parentInstance.orchestrationInstance.executionId.CopyFrom(
1492+
rewind_event.parentExecutionId)
1493+
clean_history.append(event_copy)
1494+
continue
1495+
14651496
clean_history.append(event)
14661497

14671498
rewind_action = pb.RewindOrchestrationAction(newHistory=clean_history)

0 commit comments

Comments
 (0)