diff --git a/src/conductor/client/automator/task_handler.py b/src/conductor/client/automator/task_handler.py index ff43ecbc4..7a525e52d 100644 --- a/src/conductor/client/automator/task_handler.py +++ b/src/conductor/client/automator/task_handler.py @@ -270,9 +270,11 @@ def __init__( self._next_restart_at: List[float] = [0.0 for _ in self.workers] # Lock to protect process list during concurrent access (monitor thread vs main thread) self._process_lock = threading.Lock() + self._processes_started = False logger.info("TaskHandler initialized") def __enter__(self): + self.start_processes() return self def __exit__(self, exc_type, exc_value, traceback): @@ -286,11 +288,15 @@ def stop_processes(self) -> None: with self._process_lock: self.__stop_task_runner_processes() self.__stop_metrics_provider_process() + self._processes_started = False logger.info("Stopped worker processes...") self.queue.put(None) self.logger_process.terminate() def start_processes(self) -> None: + if self._processes_started: + return + self._processes_started = True logger.info("Starting worker processes...") freeze_support() self._monitor_stop_event.clear() diff --git a/src/conductor/client/workflow/executor/workflow_executor.py b/src/conductor/client/workflow/executor/workflow_executor.py index ba723e54d..220c70133 100644 --- a/src/conductor/client/workflow/executor/workflow_executor.py +++ b/src/conductor/client/workflow/executor/workflow_executor.py @@ -1,7 +1,10 @@ from __future__ import annotations +import logging import uuid from typing import Any, Dict, List, Optional +logger = logging.getLogger(__name__) + from typing_extensions import Self from conductor.client.configuration.configuration import Configuration @@ -106,12 +109,26 @@ def execute(self, name: str, version: Optional[int] = None, workflow_input: Any if domain is not None: request.task_to_domain = {"*": domain} - return self.workflow_client.execute_workflow( + workflow_run = self.workflow_client.execute_workflow( start_workflow_request=request, request_id=request_id, wait_until_task_ref=wait_until_task_ref, wait_for_seconds=wait_for_seconds, ) + if getattr(workflow_run, 'status', None) == 'RUNNING': + try: + full = self.workflow_client.get_workflow(workflow_run.workflow_id, include_tasks=True) + failed = [t for t in (full.tasks or []) if t.status in ('FAILED', 'FAILED_WITH_TERMINAL_ERROR')] + if failed: + t = failed[0] + reason = getattr(t, 'reason_for_incompletion', None) or 'see server logs' + logger.warning( + "Workflow %s returned RUNNING after timeout, but task '%s' has status %s: %s", + workflow_run.workflow_id, t.reference_task_name, t.status, reason, + ) + except Exception: + pass + return workflow_run def remove_workflow(self, workflow_id: str, archive_workflow: Optional[bool] = None) -> None: """Removes the workflow permanently from the system""" diff --git a/tests/unit/automator/test_task_handler_coverage.py b/tests/unit/automator/test_task_handler_coverage.py index b97e00838..dd85af223 100644 --- a/tests/unit/automator/test_task_handler_coverage.py +++ b/tests/unit/automator/test_task_handler_coverage.py @@ -663,12 +663,19 @@ def test_context_manager_enter(self, mock_process_class, mock_import, mock_loggi @patch('conductor.client.automator.task_handler._setup_logging_queue') @patch('importlib.import_module') - def test_context_manager_exit(self, mock_import, mock_logging): + @patch('conductor.client.automator.task_handler.Process') + def test_context_manager_exit(self, mock_process_class, mock_import, mock_logging): """Test context manager __exit__ method.""" mock_queue = Mock() mock_logger_process = Mock() mock_logging.return_value = (mock_logger_process, mock_queue) + mock_process = Mock() + mock_process.terminate = Mock() + mock_process.kill = Mock() + mock_process.is_alive = Mock(return_value=False) + mock_process_class.return_value = mock_process + worker = ClassWorker('test_task') handler = TaskHandler( workers=[worker], @@ -679,10 +686,16 @@ def test_context_manager_exit(self, mock_import, mock_logging): # Override the queue and logger_process with fresh mocks handler.queue = Mock() handler.logger_process = Mock() + handler.logger_process.is_alive = Mock(return_value=False) + handler.metrics_provider_process = Mock() + handler.metrics_provider_process.terminate = Mock() + handler.metrics_provider_process.is_alive = Mock(return_value=False) # Mock terminate on all processes for process in handler.task_runner_processes: process.terminate = Mock() + process.kill = Mock() + process.is_alive = Mock(return_value=False) with handler: pass