From 8f6a3e2ff4b59f7e1c49a81cdf4d8d6d385c1a49 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Thu, 29 Jan 2026 16:55:49 -0600 Subject: [PATCH 1/8] fix: signal when dt reader stream is ready within wf client start call Signed-off-by: Samantha Coyle --- .../dapr/ext/workflow/workflow_runtime.py | 109 ++++++++++++++++-- 1 file changed, 98 insertions(+), 11 deletions(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index 593e55c68..4b28310ff 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -14,6 +14,7 @@ """ import inspect +import time from functools import wraps from typing import Optional, Sequence, TypeVar, Union @@ -56,6 +57,7 @@ def __init__( maximum_thread_pool_workers: Optional[int] = None, ): self._logger = Logger('WorkflowRuntime', logger_options) + metadata = tuple() if settings.DAPR_API_TOKEN: metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),) @@ -86,10 +88,19 @@ def register_workflow(self, fn: Workflow, *, name: Optional[str] = None): def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None): """Responsible to call Workflow function in orchestrationWrapper""" - daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options()) - if inp is None: - return fn(daprWfContext) - return fn(daprWfContext, inp) + instance_id = getattr(ctx, 'instance_id', 'unknown') + + try: + daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options()) + if inp is None: + result = fn(daprWfContext) + else: + result = fn(daprWfContext, inp) + return result + except Exception as e: + self._logger.exception(f"Workflow execution failed - " + f"instance_id: {instance_id}, error: {e}") + raise if hasattr(fn, '_workflow_registered'): # whenever a workflow is registered, it has a _dapr_alternate_name attribute @@ -116,10 +127,19 @@ def register_activity(self, fn: Activity, *, name: Optional[str] = None): def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None): """Responsible to call Activity function in activityWrapper""" - wfActivityContext = WorkflowActivityContext(ctx) - if inp is None: - return fn(wfActivityContext) - return fn(wfActivityContext, inp) + activity_id = getattr(ctx, 'task_id', 'unknown') + + try: + wfActivityContext = WorkflowActivityContext(ctx) + if inp is None: + result = fn(wfActivityContext) + else: + result = fn(wfActivityContext, inp) + return result + except Exception as e: + self._logger.exception(f"Activity execution failed - " + f"task_id: {activity_id}, error: {e}") + raise if hasattr(fn, '_activity_registered'): # whenever an activity is registered, it has a _dapr_alternate_name attribute @@ -138,13 +158,80 @@ def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None): ) fn.__dict__['_activity_registered'] = True + def wait_for_worker_ready(self, timeout: float = 30.0) -> bool: + """ + Wait for the worker's gRPC stream to become ready to receive work items. + + This method polls the worker's is_worker_ready() method until it returns True + or the timeout is reached. + + Args: + timeout: Maximum time in seconds to wait for the worker to be ready. + Defaults to 30 seconds. + + Returns: + True if the worker's gRPC stream is ready to receive work items, False if timeout. + """ + if not hasattr(self.__worker, 'is_worker_ready'): + return False + + elapsed = 0.0 + poll_interval = 0.1 # 100ms + + while elapsed < timeout: + if self.__worker.is_worker_ready(): + return True + time.sleep(poll_interval) + elapsed += poll_interval + + self._logger.warning( + f"WorkflowRuntime worker readiness check timed out after {timeout} seconds" + ) + return False + def start(self): - """Starts the listening for work items on a background thread.""" - self.__worker.start() + """Starts the listening for work items on a background thread. + + This method waits for the worker's gRPC stream to be fully initialized + before returning, ensuring that workflows can be scheduled immediately + after start() completes. + """ + try: + try: + self.__worker.start() + except Exception as start_error: + self._logger.exception( + f"WorkflowRuntime worker did not start: {start_error}" + ) + raise + + # Verify the worker and its stream reader are ready + if hasattr(self.__worker, 'is_worker_ready'): + try: + is_ready = self.wait_for_worker_ready(timeout=5.0) + if not is_ready: + raise RuntimeError("WorkflowRuntime worker and its stream are not ready") + else: + self._logger.debug("WorkflowRuntime worker is ready and its stream can receive work items") + except Exception as ready_error: + self._logger.exception( + f"WorkflowRuntime wait_for_worker_ready() raised exception: {ready_error}" + ) + raise ready_error + else: + self._logger.warning( + "Unable to verify stream readiness. Workflows scheduled immediately may not be received." + ) + except Exception: + raise + def shutdown(self): """Stops the listening for work items on a background thread.""" - self.__worker.stop() + try: + self.__worker.stop() + except Exception: + raise def workflow(self, __fn: Workflow = None, *, name: Optional[str] = None): """Decorator to register a workflow function. From bace1e478c83a723b17ae71cc7257229bb499132 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Thu, 29 Jan 2026 17:02:13 -0600 Subject: [PATCH 2/8] style: appease linter Signed-off-by: Samantha Coyle --- .../dapr/ext/workflow/workflow_runtime.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index 9def0900b..bd28187f4 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -57,7 +57,7 @@ def __init__( maximum_thread_pool_workers: Optional[int] = None, ): self._logger = Logger('WorkflowRuntime', logger_options) - + metadata = tuple() if settings.DAPR_API_TOKEN: metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),) @@ -89,7 +89,7 @@ def register_workflow(self, fn: Workflow, *, name: Optional[str] = None): def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None): """Responsible to call Workflow function in orchestrationWrapper""" instance_id = getattr(ctx, 'instance_id', 'unknown') - + try: daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options()) if inp is None: @@ -164,7 +164,7 @@ def register_activity(self, fn: Activity, *, name: Optional[str] = None): def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None): """Responsible to call Activity function in activityWrapper""" activity_id = getattr(ctx, 'task_id', 'unknown') - + try: wfActivityContext = WorkflowActivityContext(ctx) if inp is None: @@ -210,16 +210,16 @@ def wait_for_worker_ready(self, timeout: float = 30.0) -> bool: """ if not hasattr(self.__worker, 'is_worker_ready'): return False - + elapsed = 0.0 poll_interval = 0.1 # 100ms - + while elapsed < timeout: if self.__worker.is_worker_ready(): return True time.sleep(poll_interval) elapsed += poll_interval - + self._logger.warning( f"WorkflowRuntime worker readiness check timed out after {timeout} seconds" ) @@ -240,7 +240,7 @@ def start(self): f"WorkflowRuntime worker did not start: {start_error}" ) raise - + # Verify the worker and its stream reader are ready if hasattr(self.__worker, 'is_worker_ready'): try: @@ -260,7 +260,7 @@ def start(self): ) except Exception: raise - + def shutdown(self): """Stops the listening for work items on a background thread.""" From 3a88a6b074612e5da5db8cb2bbe2c806df2f6279 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Tue, 3 Feb 2026 09:51:19 -0600 Subject: [PATCH 3/8] fix: enable configurability + lint fixes Signed-off-by: Samantha Coyle --- .../dapr/ext/workflow/workflow_runtime.py | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index bd28187f4..9f5edb2b4 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -55,8 +55,10 @@ def __init__( maximum_concurrent_activity_work_items: Optional[int] = None, maximum_concurrent_orchestration_work_items: Optional[int] = None, maximum_thread_pool_workers: Optional[int] = None, + worker_ready_timeout: Optional[float] = None, ): self._logger = Logger('WorkflowRuntime', logger_options) + self._worker_ready_timeout = 30.0 if worker_ready_timeout is None else worker_ready_timeout metadata = tuple() if settings.DAPR_API_TOKEN: @@ -98,8 +100,9 @@ def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = result = fn(daprWfContext, inp) return result except Exception as e: - self._logger.exception(f"Workflow execution failed - " - f"instance_id: {instance_id}, error: {e}") + self._logger.exception( + f'Workflow execution failed - instance_id: {instance_id}, error: {e}' + ) raise if hasattr(fn, '_workflow_registered'): @@ -173,8 +176,9 @@ def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None): result = fn(wfActivityContext, inp) return result except Exception as e: - self._logger.exception(f"Activity execution failed - " - f"task_id: {activity_id}, error: {e}") + self._logger.exception( + f'Activity execution failed - task_id: {activity_id}, error: {e}' + ) raise if hasattr(fn, '_activity_registered'): @@ -197,7 +201,6 @@ def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None): def wait_for_worker_ready(self, timeout: float = 30.0) -> bool: """ Wait for the worker's gRPC stream to become ready to receive work items. - This method polls the worker's is_worker_ready() method until it returns True or the timeout is reached. @@ -221,13 +224,12 @@ def wait_for_worker_ready(self, timeout: float = 30.0) -> bool: elapsed += poll_interval self._logger.warning( - f"WorkflowRuntime worker readiness check timed out after {timeout} seconds" + f'WorkflowRuntime worker readiness check timed out after {timeout} seconds' ) return False def start(self): """Starts the listening for work items on a background thread. - This method waits for the worker's gRPC stream to be fully initialized before returning, ensuring that workflows can be scheduled immediately after start() completes. @@ -236,32 +238,31 @@ def start(self): try: self.__worker.start() except Exception as start_error: - self._logger.exception( - f"WorkflowRuntime worker did not start: {start_error}" - ) + self._logger.exception(f'WorkflowRuntime worker did not start: {start_error}') raise # Verify the worker and its stream reader are ready if hasattr(self.__worker, 'is_worker_ready'): try: - is_ready = self.wait_for_worker_ready(timeout=5.0) + is_ready = self.wait_for_worker_ready(timeout=self._worker_ready_timeout) if not is_ready: - raise RuntimeError("WorkflowRuntime worker and its stream are not ready") + raise RuntimeError('WorkflowRuntime worker and its stream are not ready') else: - self._logger.debug("WorkflowRuntime worker is ready and its stream can receive work items") + self._logger.debug( + 'WorkflowRuntime worker is ready and its stream can receive work items' + ) except Exception as ready_error: self._logger.exception( - f"WorkflowRuntime wait_for_worker_ready() raised exception: {ready_error}" + f'WorkflowRuntime wait_for_worker_ready() raised exception: {ready_error}' ) raise ready_error else: self._logger.warning( - "Unable to verify stream readiness. Workflows scheduled immediately may not be received." + 'Unable to verify stream readiness. Workflows scheduled immediately may not be received.' ) except Exception: raise - def shutdown(self): """Stops the listening for work items on a background thread.""" try: From e91a923bd9fee04f7523b576f7f0858a17951d88 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Tue, 3 Feb 2026 10:44:48 -0600 Subject: [PATCH 4/8] fix(build): add tests and fix bug Signed-off-by: Samantha Coyle --- .../dapr/ext/workflow/logger/logger.py | 3 + .../tests/test_workflow_runtime.py | 63 +++++++++++++++++++ 2 files changed, 66 insertions(+) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/logger/logger.py b/ext/dapr-ext-workflow/dapr/ext/workflow/logger/logger.py index b93e7074f..dd33cab86 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/logger/logger.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/logger/logger.py @@ -32,5 +32,8 @@ def warning(self, msg, *args, **kwargs): def error(self, msg, *args, **kwargs): self._logger.error(msg, *args, **kwargs) + def exception(self, msg, *args, **kwargs): + self._logger.exception(msg, *args, **kwargs) + def critical(self, msg, *args, **kwargs): self._logger.critical(msg, *args, **kwargs) diff --git a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py index bf18cd689..14cb0b5ca 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py @@ -171,3 +171,66 @@ def test_decorator_register_optinal_name(self): wanted_activity = ['test_act'] assert listActivities == wanted_activity assert client_act._dapr_alternate_name == 'test_act' + + +class WorkflowRuntimeWorkerReadyTest(unittest.TestCase): + """Tests for wait_for_worker_ready() and start() stream readiness.""" + + def setUp(self): + listActivities.clear() + listOrchestrators.clear() + mock.patch('durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker()).start() + self.runtime = WorkflowRuntime() + + def test_wait_for_worker_ready_returns_false_when_no_is_worker_ready(self): + mock_worker = mock.MagicMock(spec=['start', 'stop', '_registry']) + del mock_worker.is_worker_ready + self.runtime._WorkflowRuntime__worker = mock_worker + self.assertFalse(self.runtime.wait_for_worker_ready(timeout=0.1)) + + def test_wait_for_worker_ready_returns_true_when_ready(self): + mock_worker = mock.MagicMock() + mock_worker.is_worker_ready.return_value = True + self.runtime._WorkflowRuntime__worker = mock_worker + self.assertTrue(self.runtime.wait_for_worker_ready(timeout=1.0)) + mock_worker.is_worker_ready.assert_called() + + def test_wait_for_worker_ready_returns_false_on_timeout(self): + mock_worker = mock.MagicMock() + mock_worker.is_worker_ready.return_value = False + self.runtime._WorkflowRuntime__worker = mock_worker + self.assertFalse(self.runtime.wait_for_worker_ready(timeout=0.2)) + + def test_start_succeeds_when_worker_ready(self): + mock_worker = mock.MagicMock() + mock_worker.is_worker_ready.return_value = True + self.runtime._WorkflowRuntime__worker = mock_worker + self.runtime.start() + mock_worker.start.assert_called_once() + mock_worker.is_worker_ready.assert_called() + + def test_start_raises_when_worker_not_ready(self): + listActivities.clear() + listOrchestrators.clear() + mock.patch('durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker()).start() + runtime = WorkflowRuntime(worker_ready_timeout=0.2) + mock_worker = mock.MagicMock() + mock_worker.is_worker_ready.return_value = False + runtime._WorkflowRuntime__worker = mock_worker + with self.assertRaises(RuntimeError) as ctx: + runtime.start() + self.assertIn('not ready', str(ctx.exception)) + + def test_start_logs_warning_when_no_is_worker_ready(self): + mock_worker = mock.MagicMock(spec=['start', 'stop', '_registry']) + del mock_worker.is_worker_ready + self.runtime._WorkflowRuntime__worker = mock_worker + self.runtime.start() + mock_worker.start.assert_called_once() + + def test_worker_ready_timeout_init(self): + listActivities.clear() + listOrchestrators.clear() + mock.patch('durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker()).start() + rt = WorkflowRuntime(worker_ready_timeout=15.0) + self.assertEqual(rt._worker_ready_timeout, 15.0) From a69d9953a9fdcb093caba78b969dc77aa96addb7 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Tue, 3 Feb 2026 11:34:55 -0600 Subject: [PATCH 5/8] fix(build): add more tests Signed-off-by: Samantha Coyle --- .../tests/test_workflow_runtime.py | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py index 14cb0b5ca..a2f8a87cf 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py @@ -18,6 +18,7 @@ from unittest import mock from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext +from dapr.ext.workflow.logger import Logger from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, alternate_name @@ -26,11 +27,17 @@ class FakeTaskHubGrpcWorker: + def __init__(self): + self._orchestrator_fns = {} + self._activity_fns = {} + def add_named_orchestrator(self, name: str, fn): listOrchestrators.append(name) + self._orchestrator_fns[name] = fn def add_named_activity(self, name: str, fn): listActivities.append(name) + self._activity_fns[name] = fn class WorkflowRuntimeTest(unittest.TestCase): @@ -234,3 +241,31 @@ def test_worker_ready_timeout_init(self): mock.patch('durabletask.worker._Registry', return_value=FakeTaskHubGrpcWorker()).start() rt = WorkflowRuntime(worker_ready_timeout=15.0) self.assertEqual(rt._worker_ready_timeout, 15.0) + + def test_start_raises_when_worker_start_fails(self): + mock_worker = mock.MagicMock() + mock_worker.is_worker_ready.return_value = True + mock_worker.start.side_effect = RuntimeError('start failed') + self.runtime._WorkflowRuntime__worker = mock_worker + with self.assertRaises(RuntimeError) as ctx: + self.runtime.start() + self.assertIn('start failed', str(ctx.exception)) + mock_worker.start.assert_called_once() + + def test_start_raises_when_wait_for_worker_ready_raises(self): + mock_worker = mock.MagicMock() + mock_worker.start.return_value = None + mock_worker.is_worker_ready.side_effect = ValueError('ready check failed') + self.runtime._WorkflowRuntime__worker = mock_worker + with self.assertRaises(ValueError) as ctx: + self.runtime.start() + self.assertIn('ready check failed', str(ctx.exception)) + + def test_shutdown_raises_when_worker_stop_fails(self): + mock_worker = mock.MagicMock() + mock_worker.stop.side_effect = RuntimeError('stop failed') + self.runtime._WorkflowRuntime__worker = mock_worker + with self.assertRaises(RuntimeError) as ctx: + self.runtime.shutdown() + self.assertIn('stop failed', str(ctx.exception)) + From 34a770dc43aa484dfcefdcea950fd8a9e54539ca Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Tue, 3 Feb 2026 11:46:35 -0600 Subject: [PATCH 6/8] style: appease linter Signed-off-by: Samantha Coyle --- ext/dapr-ext-workflow/tests/test_workflow_runtime.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py index a2f8a87cf..61f8c2126 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py @@ -18,7 +18,6 @@ from unittest import mock from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext -from dapr.ext.workflow.logger import Logger from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, alternate_name @@ -268,4 +267,3 @@ def test_shutdown_raises_when_worker_stop_fails(self): with self.assertRaises(RuntimeError) as ctx: self.runtime.shutdown() self.assertIn('stop failed', str(ctx.exception)) - From 075f491b4ad9e7dedc365f73bfffff25ea564e5a Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Tue, 3 Feb 2026 13:45:56 -0600 Subject: [PATCH 7/8] fix(build): add another test Signed-off-by: Samantha Coyle --- ext/dapr-ext-workflow/tests/test_workflow_runtime.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py index 61f8c2126..bc4d46463 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py @@ -201,6 +201,14 @@ def test_wait_for_worker_ready_returns_true_when_ready(self): self.assertTrue(self.runtime.wait_for_worker_ready(timeout=1.0)) mock_worker.is_worker_ready.assert_called() + def test_wait_for_worker_ready_returns_true_after_poll(self): + """Worker becomes ready on second poll.""" + mock_worker = mock.MagicMock() + mock_worker.is_worker_ready.side_effect = [False, True] + self.runtime._WorkflowRuntime__worker = mock_worker + self.assertTrue(self.runtime.wait_for_worker_ready(timeout=1.0)) + self.assertEqual(mock_worker.is_worker_ready.call_count, 2) + def test_wait_for_worker_ready_returns_false_on_timeout(self): mock_worker = mock.MagicMock() mock_worker.is_worker_ready.return_value = False From 810b53ca83917c352068e50a0fb8a5b09af5d3b5 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Tue, 3 Feb 2026 14:08:10 -0600 Subject: [PATCH 8/8] test: add even more tests for build to pass lol Signed-off-by: Samantha Coyle --- .../tests/test_workflow_runtime.py | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py index bc4d46463..16eb4946f 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py @@ -223,6 +223,29 @@ def test_start_succeeds_when_worker_ready(self): mock_worker.start.assert_called_once() mock_worker.is_worker_ready.assert_called() + def test_start_logs_debug_when_worker_stream_ready(self): + """start() logs at debug when worker and stream are ready.""" + mock_worker = mock.MagicMock() + mock_worker.is_worker_ready.return_value = True + self.runtime._WorkflowRuntime__worker = mock_worker + with mock.patch.object(self.runtime._logger, 'debug') as mock_debug: + self.runtime.start() + mock_debug.assert_called_once() + call_args = mock_debug.call_args[0][0] + self.assertIn('ready', call_args) + self.assertIn('stream', call_args) + + def test_start_logs_exception_when_worker_start_fails(self): + """start() logs exception when worker.start() raises.""" + mock_worker = mock.MagicMock() + mock_worker.start.side_effect = RuntimeError('start failed') + self.runtime._WorkflowRuntime__worker = mock_worker + with mock.patch.object(self.runtime._logger, 'exception') as mock_exception: + with self.assertRaises(RuntimeError): + self.runtime.start() + mock_exception.assert_called_once() + self.assertIn('did not start', mock_exception.call_args[0][0]) + def test_start_raises_when_worker_not_ready(self): listActivities.clear() listOrchestrators.clear()