From e337df76e9395c9b1dd48ec07350cc146de55b55 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Fri, 6 Feb 2026 17:40:14 -0600 Subject: [PATCH 1/5] fix: use correct type + use proper close method Signed-off-by: Samantha Coyle --- durabletask/worker.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index 0ec2f66..89732aa 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -12,7 +12,7 @@ from datetime import datetime, timedelta from threading import Event, Thread from types import GeneratorType -from typing import Any, Generator, Optional, Sequence, TypeVar, Union +from typing import Any, Generator, Iterator, Optional, Sequence, TypeVar import grpc from google.protobuf import empty_pb2 @@ -30,7 +30,8 @@ # If `opentelemetry-sdk` is available, enable the tracer try: from opentelemetry import trace - from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator + from opentelemetry.trace.propagation.tracecontext import \ + TraceContextTextMapPropagator otel_propagator = TraceContextTextMapPropagator() otel_tracer = trace.get_tracer(__name__) @@ -283,7 +284,7 @@ class TaskHubGrpcWorker: activity function. """ - _response_stream: Optional[grpc.Future] = None + _response_stream: Optional[Iterator[Any]] = None _interceptors: Optional[list[shared.ClientInterceptor]] = None def __init__( @@ -418,10 +419,10 @@ def create_fresh_connection(): def invalidate_connection(): nonlocal current_channel, current_stub, current_reader_thread - # Cancel the response stream first to signal the reader thread to stop + # Close the response stream first to signal the reader thread to stop if self._response_stream is not None: try: - self._response_stream.cancel() + self._response_stream.close() except Exception: pass self._response_stream = None @@ -740,7 +741,10 @@ def stop(self): self._logger.info("Stopping gRPC worker...") if self._response_stream is not None: - self._response_stream.cancel() + try: + self._response_stream.close() + except Exception as e: + self._logger.exception(f"Error stopping response stream: {e}") self._shutdown.set() # Explicitly close the gRPC channel to ensure OTel interceptors and other resources are cleaned up if self._current_channel is not None: From 566781ee11e0eb754be456d68e09c75321893501 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Fri, 6 Feb 2026 17:45:16 -0600 Subject: [PATCH 2/5] style: appease linter Signed-off-by: Samantha Coyle --- durabletask/worker.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index 89732aa..e10538c 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -12,7 +12,7 @@ from datetime import datetime, timedelta from threading import Event, Thread from types import GeneratorType -from typing import Any, Generator, Iterator, Optional, Sequence, TypeVar +from typing import Any, Generator, Iterator, Optional, Sequence, TypeVar, Union import grpc from google.protobuf import empty_pb2 @@ -30,8 +30,7 @@ # If `opentelemetry-sdk` is available, enable the tracer try: from opentelemetry import trace - from opentelemetry.trace.propagation.tracecontext import \ - TraceContextTextMapPropagator + from opentelemetry.trace.propagation.tracecontext TraceContextTextMapPropagator otel_propagator = TraceContextTextMapPropagator() otel_tracer = trace.get_tracer(__name__) From 35e5b8d6809c93fb6da9a94d93dafe9011c29d16 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Mon, 16 Mar 2026 08:48:15 -0500 Subject: [PATCH 3/5] fix: proper handle and fail for resource exhausted due to large payloads Signed-off-by: Samantha Coyle --- durabletask/worker.py | 66 +++++++++++++++++++++++++++++- tests/durabletask/test_registry.py | 5 ++- 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index e10538c..c1a0aec 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -201,6 +201,18 @@ class ActivityNotRegisteredError(ValueError): pass +def _is_message_too_large(rpc_error: grpc.RpcError) -> bool: + """Return True if a RESOURCE_EXHAUSTED error is due to a message exceeding the gRPC size limit. + + RESOURCE_EXHAUSTED is also used for rate limiting / quota errors, which are transient and + should not be treated the same as a permanent message-size violation. + """ + return ( + rpc_error.code() == grpc.StatusCode.RESOURCE_EXHAUSTED + and "received message larger than max" in (rpc_error.details() or "").lower() + ) + + # TODO: refactor this to closely match durabletask-go/client/worker_grpc.go instead of this. class TaskHubGrpcWorker: """A gRPC-based worker for processing durable task orchestrations and activities. @@ -841,7 +853,34 @@ def _execute_orchestrator( try: stub.CompleteOrchestratorTask(res) except grpc.RpcError as rpc_error: # type: ignore - self._handle_grpc_execution_error(rpc_error, "orchestrator") + if _is_message_too_large(rpc_error): + # Response is too large to deliver - fail the orchestration immediately. + # This can only be fixed with infrastructure changes (increasing gRPC max message size). + self._logger.error( + f"Orchestrator response for '{req.instanceId}' is too large to deliver " + f"(RESOURCE_EXHAUSTED). Failing the orchestration task: {rpc_error.details()}" + ) + failure_actions = [ + ph.new_complete_orchestration_action( + -1, pb.ORCHESTRATION_STATUS_FAILED, "", + ph.new_failure_details(RuntimeError( + f"Orchestrator response exceeds gRPC max message size: {rpc_error.details()}" + )) + ) + ] + failure_res = pb.OrchestratorResponse( + instanceId=req.instanceId, + actions=failure_actions, + completionToken=completionToken, + ) + try: + stub.CompleteOrchestratorTask(failure_res) + except Exception as ex: + self._logger.exception( + f"Failed to deliver orchestrator failure response for '{req.instanceId}': {ex}" + ) + else: + self._handle_grpc_execution_error(rpc_error, "orchestrator") except Exception as ex: self._logger.exception( f"Failed to deliver orchestrator response for '{req.instanceId}' to sidecar: {ex}" @@ -889,7 +928,30 @@ def _execute_activity( try: stub.CompleteActivityTask(res) except grpc.RpcError as rpc_error: # type: ignore - self._handle_grpc_execution_error(rpc_error, "activity") + if _is_message_too_large(rpc_error): + # Result is too large to deliver - fail the activity immediately. + # This can only be fixed with infrastructure changes (increasing gRPC max message size). + self._logger.error( + f"Activity '{req.name}#{req.taskId}' result is too large to deliver " + f"(RESOURCE_EXHAUSTED). Failing the activity task: {rpc_error.details()}" + ) + failure_res = pb.ActivityResponse( + instanceId=instance_id, + taskId=req.taskId, + failureDetails=ph.new_failure_details(RuntimeError( + f"Activity result exceeds gRPC max message size: {rpc_error.details()}" + )), + completionToken=completionToken, + ) + try: + stub.CompleteActivityTask(failure_res) + except Exception as ex: + self._logger.exception( + f"Failed to deliver activity failure response for '{req.name}#{req.taskId}' " + f"of orchestration ID '{instance_id}': {ex}" + ) + else: + self._handle_grpc_execution_error(rpc_error, "activity") except Exception as ex: self._logger.exception( f"Failed to deliver activity response for '{req.name}#{req.taskId}' of orchestration ID '{instance_id}' to sidecar: {ex}" diff --git a/tests/durabletask/test_registry.py b/tests/durabletask/test_registry.py index b5fcfa9..f5cf319 100644 --- a/tests/durabletask/test_registry.py +++ b/tests/durabletask/test_registry.py @@ -165,6 +165,7 @@ def activity2(ctx, input): assert registry.get_activity(name1) is activity1 assert registry.get_activity(name2) is activity2 + def test_registry_add_named_versioned_orchestrators(): """Test adding versioned orchestrators.""" registry = worker._Registry() @@ -179,7 +180,9 @@ def orchestrator3(ctx, input): return "two" registry.add_named_orchestrator(name="orchestrator", fn=orchestrator1, version_name="v1") - registry.add_named_orchestrator(name="orchestrator", fn=orchestrator2, version_name="v2", is_latest=True) + registry.add_named_orchestrator( + name="orchestrator", fn=orchestrator2, version_name="v2", is_latest=True + ) registry.add_named_orchestrator(name="orchestrator", fn=orchestrator3, version_name="v3") orquestrator, version = registry.get_orchestrator(name="orchestrator") From 61b049cfd498d8822fd11b98d949bc964d6fbe43 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Mon, 16 Mar 2026 09:00:24 -0500 Subject: [PATCH 4/5] style: address feedback Signed-off-by: Samantha Coyle --- durabletask/worker.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index aeca44a..9f176cf 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -31,7 +31,7 @@ # If `opentelemetry-sdk` is available, enable the tracer try: from opentelemetry import trace - from opentelemetry.trace.propagation.tracecontext TraceContextTextMapPropagator + from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator otel_propagator = TraceContextTextMapPropagator() otel_tracer = trace.get_tracer(__name__) @@ -207,9 +207,13 @@ def _is_message_too_large(rpc_error: grpc.RpcError) -> bool: RESOURCE_EXHAUSTED is also used for rate limiting / quota errors, which are transient and should not be treated the same as a permanent message-size violation. """ + if rpc_error.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: + return False + details = (rpc_error.details() or "").lower() return ( - rpc_error.code() == grpc.StatusCode.RESOURCE_EXHAUSTED - and "received message larger than max" in (rpc_error.details() or "").lower() + "message larger than max" in details + or "received message larger than max" in details + or "sent message larger than max" in details ) From 8afc5c09c632ed712c90daab68887d81748ea57c Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Mon, 16 Mar 2026 11:05:49 -0500 Subject: [PATCH 5/5] fix: address feedback Signed-off-by: Samantha Coyle --- durabletask/worker.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/durabletask/worker.py b/durabletask/worker.py index 9f176cf..9aebec9 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -202,19 +202,12 @@ class ActivityNotRegisteredError(ValueError): def _is_message_too_large(rpc_error: grpc.RpcError) -> bool: - """Return True if a RESOURCE_EXHAUSTED error is due to a message exceeding the gRPC size limit. + """Return True if the gRPC error is RESOURCE_EXHAUSTED. - RESOURCE_EXHAUSTED is also used for rate limiting / quota errors, which are transient and - should not be treated the same as a permanent message-size violation. + All RESOURCE_EXHAUSTED errors are treated as a permanent message-size violation + so the sidecar always receives an acknowledgment and avoids infinite redelivery. """ - if rpc_error.code() != grpc.StatusCode.RESOURCE_EXHAUSTED: - return False - details = (rpc_error.details() or "").lower() - return ( - "message larger than max" in details - or "received message larger than max" in details - or "sent message larger than max" in details - ) + return rpc_error.code() == grpc.StatusCode.RESOURCE_EXHAUSTED # TODO: refactor this to closely match durabletask-go/client/worker_grpc.go instead of this.