Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 6d94454

Browse files
authored
Reconnect to GRPC when the sidecar restarts (#45)
Signed-off-by: Albert Callarisa <albert@diagrid.io>
1 parent a7ff4a1 commit 6d94454

1 file changed

Lines changed: 23 additions & 6 deletions

File tree

durabletask/worker.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -636,12 +636,15 @@ def stream_reader():
636636

637637
if self._shutdown.is_set():
638638
self._logger.info(f"Disconnected from {self._host_address}")
639-
else:
640-
self._logger.info("Work item stream ended normally")
641-
# When stream ends (SHUTDOWN_SENTINEL received), always break outer loop
642-
# The stream reader has exited, so we should exit too, not reconnect
643-
# This matches Go SDK behavior where stream ending causes the listener to exit
644-
break
639+
break
640+
641+
# Stream ended without shutdown being requested - reconnect
642+
self._logger.info(
643+
f"Work item stream ended. Will attempt to reconnect to {self._host_address}..."
644+
)
645+
invalidate_connection()
646+
# Fall through to the top of the outer loop, which will
647+
# create a fresh connection (with retry/backoff if needed)
645648
except grpc.RpcError as rpc_error:
646649
# Check shutdown first - if shutting down, exit immediately
647650
if self._shutdown.is_set():
@@ -847,6 +850,13 @@ def _execute_orchestrator(
847850
stub.CompleteOrchestratorTask(res)
848851
except grpc.RpcError as rpc_error: # type: ignore
849852
self._handle_grpc_execution_error(rpc_error, "orchestrator")
853+
except ValueError:
854+
# gRPC raises ValueError when the underlying channel has been closed (e.g. during reconnection).
855+
self._logger.debug(
856+
f"Could not deliver orchestrator response for '{req.instanceId}': "
857+
f"channel was closed (likely due to reconnection). "
858+
f"The sidecar will re-dispatch this work item."
859+
)
850860
except Exception as ex:
851861
self._logger.exception(
852862
f"Failed to deliver orchestrator response for '{req.instanceId}' to sidecar: {ex}"
@@ -897,6 +907,13 @@ def _execute_activity(
897907
stub.CompleteActivityTask(res)
898908
except grpc.RpcError as rpc_error: # type: ignore
899909
self._handle_grpc_execution_error(rpc_error, "activity")
910+
except ValueError:
911+
# gRPC raises ValueError when the underlying channel has been closed (e.g. during reconnection).
912+
self._logger.debug(
913+
f"Could not deliver activity response for '{req.name}#{req.taskId}' of "
914+
f"orchestration ID '{instance_id}': channel was closed (likely due to "
915+
f"reconnection). The sidecar will re-dispatch this work item."
916+
)
900917
except Exception as ex:
901918
self._logger.exception(
902919
f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {ex}"

0 commit comments

Comments
 (0)