Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions durabletask/internal/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,25 @@ def get_default_host_address() -> str:
return "localhost:4001"


DEFAULT_GRPC_KEEPALIVE_OPTIONS: tuple[tuple[str, int], ...] = (
("grpc.keepalive_time_ms", 30_000),
("grpc.keepalive_timeout_ms", 10_000),
("grpc.http2.max_pings_without_data", 0),
("grpc.keepalive_permit_without_calls", 1),
)


def _merge_grpc_options(
user_options: Optional[Sequence[tuple[str, Any]]],
defaults: Sequence[tuple[str, Any]] = DEFAULT_GRPC_KEEPALIVE_OPTIONS,
) -> list[tuple[str, Any]]:
"""Merge user gRPC options with defaults. User options take precedence."""
merged = dict(defaults)
if user_options:
merged.update(dict(user_options))
return list(merged.items())


def get_grpc_channel(
host_address: Optional[str],
secure_channel: bool = False,
Expand Down Expand Up @@ -81,10 +100,13 @@ def get_grpc_channel(
host_address = host_address[len(protocol) :]
break

merged_options = _merge_grpc_options(options)
Comment thread
CasperGN marked this conversation as resolved.
Outdated
if secure_channel:
channel = grpc.secure_channel(host_address, grpc.ssl_channel_credentials(), options=options)
channel = grpc.secure_channel(
host_address, grpc.ssl_channel_credentials(), options=merged_options
)
else:
channel = grpc.insecure_channel(host_address, options=options)
channel = grpc.insecure_channel(host_address, options=merged_options)

# Apply interceptors ONLY if they exist
if interceptors:
Expand Down
121 changes: 48 additions & 73 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,15 +384,13 @@ async def _async_run_loop(self):
current_stub = None
current_reader_thread = None
conn_retry_count = 0
conn_max_retry_delay = 60
conn_max_retry_delay = 15

def create_fresh_connection():
nonlocal current_channel, current_stub, conn_retry_count
if current_channel:
try:
current_channel.close()
except Exception:
pass
# Don't call channel.close() — in-flight activity RPCs on the
# old stub may still reference the channel from another thread.
# The old channel is GC'd once all references are released.
Comment thread
CasperGN marked this conversation as resolved.
Outdated
current_channel = None
current_stub = None
try:
Expand All @@ -417,31 +415,20 @@ def create_fresh_connection():

def invalidate_connection():
nonlocal current_channel, current_stub, current_reader_thread
# Cancel the response stream first to signal the reader thread to stop
if self._response_stream is not None:
try:
if hasattr(self._response_stream, "call"):
self._response_stream.call.cancel() # type: ignore
else:
self._response_stream.cancel() # type: ignore
except Exception as e:
self._logger.warning(f"Error cancelling response stream: {e}")
self._response_stream = None

# Wait for the reader thread to finish
if current_reader_thread is not None:
current_reader_thread.join(timeout=1)
current_reader_thread = None

# Close the channel
if current_channel:
try:
current_channel.close()
except Exception:
pass
# Null out references so the next iteration creates a fresh connection.
# Do NOT call channel.close() here — in-flight activity RPCs
# (CompleteActivityTask) may still be using the stub on another
# thread. Closing the channel concurrently causes segfaults in the
# gRPC C extension. The old channel is GC'd once all references
# (including captured stub refs in activity threads) are released.
current_channel = None
self._current_channel = None
current_stub = None
self._response_stream = None

if current_reader_thread is not None:
current_reader_thread.join(timeout=5)
current_reader_thread = None

def should_invalidate_connection(rpc_error):
error_code = rpc_error.code() # type: ignore
Expand All @@ -451,6 +438,7 @@ def should_invalidate_connection(rpc_error):
grpc.StatusCode.CANCELLED,
grpc.StatusCode.UNAUTHENTICATED,
grpc.StatusCode.ABORTED,
grpc.StatusCode.INTERNAL, # RST_STREAM from proxy means connection is dead
}
return error_code in connection_level_errors

Expand Down Expand Up @@ -532,7 +520,11 @@ def stream_reader():
break
# Other RPC errors - put in queue for async loop to handle
self._logger.warning(
f"Stream reader: RPC error (code={rpc_error.code()}): {rpc_error}"
"Stream reader: RPC error (code=%s): %s",
rpc_error.code(),
rpc_error.details()
if hasattr(rpc_error, "details")
else rpc_error,
)
break
except Exception as stream_error:
Expand Down Expand Up @@ -654,31 +646,24 @@ def stream_reader():
if should_invalidate:
invalidate_connection()
error_code = rpc_error.code() # type: ignore
error_details = str(rpc_error)
error_detail = (
rpc_error.details() if hasattr(rpc_error, "details") else str(rpc_error)
)

if error_code == grpc.StatusCode.CANCELLED:
self._logger.info(f"Disconnected from {self._host_address}")
break
elif error_code == grpc.StatusCode.UNAVAILABLE:
# Check if this is a connection timeout scenario
if (
"Timeout occurred" in error_details
or "Failed to connect to remote host" in error_details
):
self._logger.warning(
f"Connection timeout to {self._host_address}: {error_details} - will retry with fresh connection"
)
else:
self._logger.warning(
f"The sidecar at address {self._host_address} is unavailable: {error_details} - will continue retrying"
)
elif should_invalidate:
self._logger.warning(
f"Connection-level gRPC error ({error_code}): {rpc_error} - resetting connection"
"Connection error (%s): %s — resetting connection",
error_code,
error_detail,
)
else:
self._logger.warning(
f"Application-level gRPC error ({error_code}): {rpc_error}"
"gRPC error (%s): %s",
Comment thread
CasperGN marked this conversation as resolved.
Outdated
error_code,
error_detail,
)
except RuntimeError as ex:
# RuntimeError often indicates asyncio loop issues (e.g., "cannot schedule new futures after shutdown")
Expand Down Expand Up @@ -744,16 +729,8 @@ def stop(self):
return

self._logger.info("Stopping gRPC worker...")
if self._response_stream is not None:
try:
if hasattr(self._response_stream, "call"):
self._response_stream.call.cancel() # type: ignore
else:
self._response_stream.cancel() # type: ignore
except Exception as e:
self._logger.warning(f"Error cancelling response stream: {e}")
self._shutdown.set()
# Explicitly close the gRPC channel to ensure OTel interceptors and other resources are cleaned up
# Close the channel — propagates cancellation to all streams and cleans up resources
if self._current_channel is not None:
try:
self._current_channel.close()
Expand All @@ -778,32 +755,30 @@ def stop(self):

# TODO: This should be removed in the future as we do handle grpc errs
def _handle_grpc_execution_error(self, rpc_error: grpc.RpcError, request_type: str):
"""Handle a gRPC execution error during shutdown or benign condition."""
# During shutdown or if the instance was terminated, the channel may be close
# or the instance may no longer be recognized by the sidecar. Treat these as benign
# to reduce noisy logging when shutting down.
"""Handle a gRPC execution error during shutdown or connection reset."""
details = str(rpc_error).lower()
benign_errors = {
# These errors are transient — the sidecar will re-dispatch the work item.
transient_errors = {
grpc.StatusCode.CANCELLED,
grpc.StatusCode.UNAVAILABLE,
grpc.StatusCode.UNKNOWN,
grpc.StatusCode.INTERNAL,
}
if (
self._shutdown.is_set()
and rpc_error.code() in benign_errors
or (
"unknown instance id/task id combo" in details
or "channel closed" in details
or "locally cancelled by application" in details
)
):
self._logger.debug(
f"Ignoring gRPC {request_type} execution error during shutdown/benign condition: {rpc_error}"
is_transient = rpc_error.code() in transient_errors
is_benign = (
"unknown instance id/task id combo" in details
or "channel closed" in details
or "locally cancelled by application" in details
)
if is_transient or is_benign or self._shutdown.is_set():
self._logger.warning(
Comment thread
CasperGN marked this conversation as resolved.
"Could not deliver %s result (%s): %s — sidecar will re-dispatch",
request_type,
rpc_error.code(),
rpc_error.details() if hasattr(rpc_error, "details") else rpc_error,
)
else:
self._logger.exception(
f"Failed to execute gRPC {request_type} execution error: {rpc_error}"
)
self._logger.exception(f"Failed to deliver {request_type} result: {rpc_error}")

def _execute_orchestrator(
self,
Expand Down
37 changes: 37 additions & 0 deletions tests/test_shared.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
Comment thread
CasperGN marked this conversation as resolved.
Outdated
Comment thread
CasperGN marked this conversation as resolved.
Outdated

from durabletask.internal.shared import (
DEFAULT_GRPC_KEEPALIVE_OPTIONS,
_merge_grpc_options,
)


class TestMergeGrpcOptions:
    """Unit tests for the gRPC channel-option merging helper."""

    def test_user_options_take_precedence(self):
        """A user-supplied value replaces the default for the same key."""
        merged = dict(
            _merge_grpc_options(
                [
                    ("grpc.keepalive_time_ms", 60_000),
                    ("grpc.custom_option", 42),
                ]
            )
        )

        # The user's override wins over the conflicting default.
        assert merged["grpc.keepalive_time_ms"] == 60_000
        # A key only the user supplied is carried through.
        assert merged["grpc.custom_option"] == 42
        # Defaults the user did not mention survive untouched.
        for key, expected in [
            ("grpc.keepalive_timeout_ms", 10_000),
            ("grpc.http2.max_pings_without_data", 0),
            ("grpc.keepalive_permit_without_calls", 1),
        ]:
            assert merged[key] == expected

    def test_defaults_used_when_no_user_options(self):
        """An empty user sequence yields exactly the defaults."""
        assert _merge_grpc_options([]) == list(DEFAULT_GRPC_KEEPALIVE_OPTIONS)

    def test_none_user_options(self):
        """``None`` yields exactly the defaults."""
        assert _merge_grpc_options(None) == list(DEFAULT_GRPC_KEEPALIVE_OPTIONS)
Loading