From 43800b8347ea1479d720ec35b45019a3dd56d02d Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 26 Jan 2026 21:02:46 +0000 Subject: [PATCH 1/4] Add cancellation check for Redis queue workflows Check for job cancellation before each command execution in CommandExecutor. This allows workflows running in Redis queue mode to stop at the next command boundary when the user presses Stop, rather than requiring multiple presses. https://claude.ai/code/session_013VFNJ1Sznugpony3r2Mgxt --- src/workflow/CommandExecutor.py | 32 +++++++++++++++++++++++++++ src/workflow/tasks.py | 38 +++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) diff --git a/src/workflow/CommandExecutor.py b/src/workflow/CommandExecutor.py index 0225c9fc..4346b02f 100644 --- a/src/workflow/CommandExecutor.py +++ b/src/workflow/CommandExecutor.py @@ -4,12 +4,19 @@ import subprocess import threading from pathlib import Path +from typing import Callable, Optional from .Logger import Logger from .ParameterManager import ParameterManager import sys import importlib.util import json + +class WorkflowCancelled(Exception): + """Raised when a workflow is cancelled by the user.""" + pass + + class CommandExecutor: """ Manages the execution of external shell commands such as OpenMS TOPP tools within a Streamlit application. @@ -24,6 +31,27 @@ def __init__(self, workflow_dir: Path, logger: Logger, parameter_manager: Parame self.pid_dir = Path(workflow_dir, "pids") self.logger = logger self.parameter_manager = parameter_manager + self._should_stop: Optional[Callable[[], bool]] = None + + def set_cancellation_check(self, should_stop_func: Callable[[], bool]) -> None: + """ + Set a function that checks if the workflow should be cancelled. + + Args: + should_stop_func: A callable that returns True if the workflow should stop. + """ + self._should_stop = should_stop_func + + def _check_cancellation(self) -> None: + """ + Check if the workflow was cancelled and raise WorkflowCancelled if so. + + This is called before each command execution to allow stopping workflows + between commands when running in Redis queue mode. + """ + if self._should_stop and self._should_stop(): + self.logger.log("Workflow cancelled by user") + raise WorkflowCancelled("Workflow was cancelled") def run_multiple_commands( self, commands: list[str] @@ -81,8 +109,12 @@ def run_command(self, command: list[str]) -> bool: command (list[str]): The shell command to execute, provided as a list of strings. Raises: + WorkflowCancelled: If the workflow was cancelled by the user. Exception: If the command execution results in any errors. """ + # Check for cancellation before starting the command + self._check_cancellation() + # Ensure all command parts are strings command = [str(c) for c in command] diff --git a/src/workflow/tasks.py b/src/workflow/tasks.py index 7c283bc6..def51ea4 100644 --- a/src/workflow/tasks.py +++ b/src/workflow/tasks.py @@ -11,6 +11,8 @@ import traceback from pathlib import Path +from .CommandExecutor import WorkflowCancelled + def execute_workflow( workflow_dir: str, @@ -74,6 +76,16 @@ def execute_workflow( executor = CommandExecutor(workflow_path, logger, parameter_manager) executor.pid_dir.mkdir(parents=True, exist_ok=True) + # Set up cancellation check for Redis queue mode + if job is not None: + def should_stop(): + try: + job.refresh() # Get latest status from Redis + return job.is_stopped + except Exception: + return False + executor.set_cancellation_check(should_stop) + _update_progress(job, 0.1, "Starting workflow execution...") # Create workflow instance @@ -118,6 +130,32 @@ def execute_workflow( "message": "Workflow completed successfully" } + except WorkflowCancelled: + # Handle user-initiated cancellation cleanly + try: + log_dir = workflow_path / "logs" + log_dir.mkdir(parents=True, exist_ok=True) + for log_name in ["minimal.log", "commands-and-run-times.log", "all.log"]: + log_file = log_dir / log_name + with open(log_file, "a") as f: + f.write("\n\nWorkflow cancelled by user\n") + except Exception: + pass + + # Clean up pid directory + try: + pid_dir = workflow_path / "pids" + shutil.rmtree(pid_dir, ignore_errors=True) + except Exception: + pass + + return { + "success": False, + "workflow_dir": str(workflow_path), + "cancelled": True, + "error": "Workflow cancelled by user" + } + except Exception as e: error_msg = f"Workflow failed: {str(e)}\n{traceback.format_exc()}" From 55b461d0bae19e99dfdfc4a9bb69c10924661046 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 29 Jan 2026 21:09:07 +0000 Subject: [PATCH 2/4] Revert "Add cancellation check for Redis queue workflows" This reverts commit 43800b8347ea1479d720ec35b45019a3dd56d02d. --- src/workflow/CommandExecutor.py | 32 --------------------------- src/workflow/tasks.py | 38 --------------------------------- 2 files changed, 70 deletions(-) diff --git a/src/workflow/CommandExecutor.py b/src/workflow/CommandExecutor.py index 4346b02f..0225c9fc 100644 --- a/src/workflow/CommandExecutor.py +++ b/src/workflow/CommandExecutor.py @@ -4,19 +4,12 @@ import subprocess import threading from pathlib import Path -from typing import Callable, Optional from .Logger import Logger from .ParameterManager import ParameterManager import sys import importlib.util import json - -class WorkflowCancelled(Exception): - """Raised when a workflow is cancelled by the user.""" - pass - - class CommandExecutor: """ Manages the execution of external shell commands such as OpenMS TOPP tools within a Streamlit application. @@ -31,27 +24,6 @@ def __init__(self, workflow_dir: Path, logger: Logger, parameter_manager: Parame self.pid_dir = Path(workflow_dir, "pids") self.logger = logger self.parameter_manager = parameter_manager - self._should_stop: Optional[Callable[[], bool]] = None - - def set_cancellation_check(self, should_stop_func: Callable[[], bool]) -> None: - """ - Set a function that checks if the workflow should be cancelled. - - Args: - should_stop_func: A callable that returns True if the workflow should stop. - """ - self._should_stop = should_stop_func - - def _check_cancellation(self) -> None: - """ - Check if the workflow was cancelled and raise WorkflowCancelled if so. - - This is called before each command execution to allow stopping workflows - between commands when running in Redis queue mode. - """ - if self._should_stop and self._should_stop(): - self.logger.log("Workflow cancelled by user") - raise WorkflowCancelled("Workflow was cancelled") def run_multiple_commands( self, commands: list[str] @@ -109,12 +81,8 @@ def run_command(self, command: list[str]) -> bool: command (list[str]): The shell command to execute, provided as a list of strings. Raises: - WorkflowCancelled: If the workflow was cancelled by the user. Exception: If the command execution results in any errors. """ - # Check for cancellation before starting the command - self._check_cancellation() - # Ensure all command parts are strings command = [str(c) for c in command] diff --git a/src/workflow/tasks.py b/src/workflow/tasks.py index def51ea4..7c283bc6 100644 --- a/src/workflow/tasks.py +++ b/src/workflow/tasks.py @@ -11,8 +11,6 @@ import traceback from pathlib import Path -from .CommandExecutor import WorkflowCancelled - def execute_workflow( workflow_dir: str, @@ -76,16 +74,6 @@ def execute_workflow( executor = CommandExecutor(workflow_path, logger, parameter_manager) executor.pid_dir.mkdir(parents=True, exist_ok=True) - # Set up cancellation check for Redis queue mode - if job is not None: - def should_stop(): - try: - job.refresh() # Get latest status from Redis - return job.is_stopped - except Exception: - return False - executor.set_cancellation_check(should_stop) - _update_progress(job, 0.1, "Starting workflow execution...") # Create workflow instance @@ -130,32 +118,6 @@ def should_stop(): "message": "Workflow completed successfully" } - except WorkflowCancelled: - # Handle user-initiated cancellation cleanly - try: - log_dir = workflow_path / "logs" - log_dir.mkdir(parents=True, exist_ok=True) - for log_name in ["minimal.log", "commands-and-run-times.log", "all.log"]: - log_file = log_dir / log_name - with open(log_file, "a") as f: - f.write("\n\nWorkflow cancelled by user\n") - except Exception: - pass - - # Clean up pid directory - try: - pid_dir = workflow_path / "pids" - shutil.rmtree(pid_dir, ignore_errors=True) - except Exception: - pass - - return { - "success": False, - "workflow_dir": str(workflow_path), - "cancelled": True, - "error": "Workflow cancelled by user" - } - except Exception as e: error_msg = f"Workflow failed: {str(e)}\n{traceback.format_exc()}" From d4effbab18e76166c078c2b60574861ee41a0e8c Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 29 Jan 2026 21:14:34 +0000 Subject: [PATCH 3/4] Fix double-click required to stop Redis queue workflows Don't clear the .job_id file immediately after calling cancel_job(). This allows get_workflow_status() to see the "canceled" state and correctly report the workflow as stopped. Previously, clearing the file caused a fallback to PID checking, which showed "running" if commands were still executing. https://claude.ai/code/session_013VFNJ1Sznugpony3r2Mgxt --- src/workflow/WorkflowManager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/workflow/WorkflowManager.py b/src/workflow/WorkflowManager.py index e9601d01..a0eb7f81 100644 --- a/src/workflow/WorkflowManager.py +++ b/src/workflow/WorkflowManager.py @@ -184,7 +184,8 @@ def stop_workflow(self) -> bool: if job_id: success = self._queue_manager.cancel_job(job_id) if success: - self._queue_manager.clear_job_id(self.workflow_dir) + # Don't clear job_id here - let get_workflow_status() see the + # "canceled" state. The file is cleaned up when the job expires. return True # Fallback: stop local process From 49faaaf00f95b97fe9b8acde8c4c627df13806c7 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 29 Jan 2026 21:21:42 +0000 Subject: [PATCH 4/4] Revert "Fix double-click required to stop Redis queue workflows" This reverts commit d4effbab18e76166c078c2b60574861ee41a0e8c. --- src/workflow/WorkflowManager.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/workflow/WorkflowManager.py b/src/workflow/WorkflowManager.py index a0eb7f81..e9601d01 100644 --- a/src/workflow/WorkflowManager.py +++ b/src/workflow/WorkflowManager.py @@ -184,8 +184,7 @@ def stop_workflow(self) -> bool: if job_id: success = self._queue_manager.cancel_job(job_id) if success: - # Don't clear job_id here - let get_workflow_status() see the - # "canceled" state. The file is cleaned up when the job expires. + self._queue_manager.clear_job_id(self.workflow_dir) return True # Fallback: stop local process