diff --git a/docs/workflow-message-queue.md b/docs/workflow-message-queue.md new file mode 100644 index 000000000..cef36c09f --- /dev/null +++ b/docs/workflow-message-queue.md @@ -0,0 +1,146 @@ +# Workflow Message Queue (WMQ) + +Send messages to a running workflow and process them inside the workflow using the +`PULL_WORKFLOW_MESSAGES` system task. + +## Server requirement + +WMQ must be enabled on the Conductor server: + +```properties +conductor.workflow-message-queue.enabled=true +``` + +--- + +## Sending a message + +After starting (or executing) a workflow you can push any JSON-serialisable dict +to it using `executor.send_message` or `workflow_client.send_message`. + +```python +from conductor.client.configuration.configuration import Configuration +from conductor.client.http.models import StartWorkflowRequest +from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor + +config = Configuration() # reads CONDUCTOR_SERVER_URL / CONDUCTOR_AUTH_TOKEN from env +executor = WorkflowExecutor(config) + +# --- start a workflow that has a PULL_WORKFLOW_MESSAGES task in it --- +workflow_id = executor.start_workflow( + StartWorkflowRequest(name="order_processing", input={"orderId": "ORD-42"}) +) + +# --- send a message to the running workflow --- +message_id = executor.send_message( + workflow_id, + {"event": "payment_confirmed", "amount": 99.99, "currency": "USD"}, +) +print(f"Message enqueued: {message_id}") +``` + +You can call `send_message` multiple times; each call returns a unique UUID. + +```python +# Send a batch of status updates +for status in ["PICKED", "SHIPPED", "OUT_FOR_DELIVERY"]: + executor.send_message(workflow_id, {"status": status}) +``` + +--- + +## Defining a workflow that receives messages + +Use `PullWorkflowMessagesTask` inside your workflow definition to consume the queue. 
+ +```python +from conductor.client.workflow.conductor_workflow import ConductorWorkflow +from conductor.client.workflow.task.pull_workflow_messages_task import PullWorkflowMessagesTask +from conductor.client.workflow.task.simple_task import SimpleTask + +# Pull up to 5 messages at a time +pull = PullWorkflowMessagesTask(task_ref_name="pull_messages", batch_size=5) + +process = SimpleTask( + task_def_name="process_message_worker", + task_reference_name="process_message", +) +# Pass pulled messages to the next task via input parameter references +process.input_parameters["messages"] = "${pull_messages.output.messages}" + +wf = ( + ConductorWorkflow(executor=executor, name="order_processing", version=1) + .add(pull) + .add(process) +) + +wf.register(overwrite=True) +``` + +### Task output shape + +When messages are available the `PULL_WORKFLOW_MESSAGES` task output looks like: + +```json +{ + "messages": [ + { + "id": "f3c2a1b0-...", + "workflowId": "<workflow-id>", + "payload": { "event": "payment_confirmed", "amount": 99.99 }, + "receivedAt": "2024-01-01T12:00:00Z" + } + ], + "count": 1 +} +``` + +Reference individual fields in subsequent tasks: + +```python +next_task.input_parameters["firstMessage"] = "${pull_messages.output.messages[0].payload}" +``` + +--- + +## Using the low-level client directly + +If you prefer the `WorkflowClient` directly: + +```python +from conductor.client.orkes_clients import OrkesClients + +clients = OrkesClients(config) +workflow_client = clients.get_workflow_client() + +message_id = workflow_client.send_message( + workflow_id, + {"type": "notification", "text": "Hello from outside the workflow"}, +) +``` + +--- + +## Error handling + +| HTTP status | Reason | What to do | +|-------------|--------|------------| +| `404 Not Found` | Workflow ID does not exist | Verify the workflow was started successfully | +| `409 Conflict` | Workflow is not `RUNNING` | Check workflow status before sending | +| `429 Too Many Requests` | Queue is at capacity 
(default 1 000 messages) | Back off and retry, or increase `conductor.workflow-message-queue.maxQueueSize` | + +```python +from conductor.client.http.rest import ApiException + +try: + executor.send_message(workflow_id, {"ping": True}) +except ApiException as e: + if e.status == 404: + print("Workflow not found") + elif e.status == 409: + print("Workflow is not running") + elif e.status == 429: + print("Queue full — back off and retry") + else: + raise +``` diff --git a/src/conductor/client/http/api/workflow_resource_api.py b/src/conductor/client/http/api/workflow_resource_api.py index 063104b04..8e01c9fcf 100644 --- a/src/conductor/client/http/api/workflow_resource_api.py +++ b/src/conductor/client/http/api/workflow_resource_api.py @@ -3178,4 +3178,94 @@ def execute_workflow_with_return_strategy_with_http_info(self, body, name, versi _return_http_data_only=params.get('_return_http_data_only'), _preload_content=params.get('_preload_content', True), _request_timeout=params.get('_request_timeout'), + collection_formats=collection_formats) + + def send_workflow_message(self, body, workflow_id, **kwargs): # noqa: E501 + """Push a message into a running workflow's message queue (WMQ). # noqa: E501 + + This method makes a synchronous HTTP request by default. To make an + asynchronous HTTP request, please pass async_req=True + >>> thread = api.send_workflow_message(body, workflow_id, async_req=True) + >>> result = thread.get() + + :param async_req bool + :param dict(str, object) body: Arbitrary JSON payload (required) + :param str workflow_id: (required) + :return: str — the UUID assigned to the pushed message + If the method is called asynchronously, + returns the request thread. 
+ """ + kwargs['_return_http_data_only'] = True + if kwargs.get('async_req'): + return self.send_workflow_message_with_http_info(body, workflow_id, **kwargs) # noqa: E501 + else: + (data) = self.send_workflow_message_with_http_info(body, workflow_id, **kwargs) # noqa: E501 + return data + + def send_workflow_message_with_http_info(self, body, workflow_id, **kwargs): # noqa: E501 + """Push a message into a running workflow's message queue (WMQ). # noqa: E501 + + :param async_req bool + :param dict(str, object) body: Arbitrary JSON payload (required) + :param str workflow_id: (required) + :return: str — the UUID assigned to the pushed message + """ + + all_params = ['body', 'workflow_id'] # noqa: E501 + all_params.append('async_req') + all_params.append('_return_http_data_only') + all_params.append('_preload_content') + all_params.append('_request_timeout') + + params = locals() + for key, val in six.iteritems(params['kwargs']): + if key not in all_params: + raise TypeError( + "Got an unexpected keyword argument '%s'" + " to method send_workflow_message" % key + ) + params[key] = val + del params['kwargs'] + + if ('body' not in params or params['body'] is None): + raise ValueError("Missing the required parameter `body` when calling `send_workflow_message`") # noqa: E501 + if ('workflow_id' not in params or params['workflow_id'] is None): + raise ValueError("Missing the required parameter `workflow_id` when calling `send_workflow_message`") # noqa: E501 + + collection_formats = {} + + path_params = {} + if 'workflow_id' in params: + path_params['workflowId'] = params['workflow_id'] # noqa: E501 + + query_params = [] + header_params = {} + form_params = [] + local_var_files = {} + + body_params = None + if 'body' in params: + body_params = params['body'] + + header_params['Accept'] = self.api_client.select_header_accept( + ['text/plain']) # noqa: E501 + header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501 + ['application/json']) # 
noqa: E501 + + auth_settings = ['api_key'] # noqa: E501 + + return self.api_client.call_api( + '/workflow/{workflowId}/messages', 'POST', + path_params, + query_params, + header_params, + body=body_params, + post_params=form_params, + files=local_var_files, + response_type='str', # noqa: E501 + auth_settings=auth_settings, + async_req=params.get('async_req'), + _return_http_data_only=params.get('_return_http_data_only'), + _preload_content=params.get('_preload_content', True), + _request_timeout=params.get('_request_timeout'), collection_formats=collection_formats) \ No newline at end of file diff --git a/src/conductor/client/http/models/__init__.py b/src/conductor/client/http/models/__init__.py index 377cadcd1..cc2c52103 100644 --- a/src/conductor/client/http/models/__init__.py +++ b/src/conductor/client/http/models/__init__.py @@ -66,4 +66,5 @@ from conductor.client.http.models.circuit_breaker_transition_response import CircuitBreakerTransitionResponse from conductor.client.http.models.signal_response import SignalResponse, TaskStatus from conductor.client.http.models.authentication_config import AuthenticationConfig -from conductor.client.http.models.tag_object import TagObject \ No newline at end of file +from conductor.client.http.models.tag_object import TagObject +from conductor.client.http.models.workflow_message import WorkflowMessage \ No newline at end of file diff --git a/src/conductor/client/http/models/workflow_message.py b/src/conductor/client/http/models/workflow_message.py new file mode 100644 index 000000000..3b78153a2 --- /dev/null +++ b/src/conductor/client/http/models/workflow_message.py @@ -0,0 +1,41 @@ +from dataclasses import dataclass, field +from typing import Dict, Optional + + +@dataclass +class WorkflowMessage: + """Represents a message pushed into a running workflow's queue (WMQ). + + Attributes: + id: UUID assigned by the server on push. + workflow_id: The workflow instance that owns this message. 
+ payload: Arbitrary JSON payload supplied by the caller. + received_at: ISO-8601 UTC timestamp set at ingestion time. + """ + + id: Optional[str] = field(default=None) + workflow_id: Optional[str] = field(default=None) + payload: Optional[Dict[str, object]] = field(default=None) + received_at: Optional[str] = field(default=None) + + swagger_types = { + 'id': 'str', + 'workflow_id': 'str', + 'payload': 'dict(str, object)', + 'received_at': 'str', + } + + attribute_map = { + 'id': 'id', + 'workflow_id': 'workflowId', + 'payload': 'payload', + 'received_at': 'receivedAt', + } + + def to_dict(self) -> dict: + result = {} + for attr, _ in self.swagger_types.items(): + value = getattr(self, attr) + if value is not None: + result[self.attribute_map[attr]] = value + return result diff --git a/src/conductor/client/orkes/orkes_workflow_client.py b/src/conductor/client/orkes/orkes_workflow_client.py index bba497658..24472260e 100644 --- a/src/conductor/client/orkes/orkes_workflow_client.py +++ b/src/conductor/client/orkes/orkes_workflow_client.py @@ -215,3 +215,17 @@ def update_state(self, workflow_id: str, update_requesst: WorkflowStateUpdate, kwargs["wait_for_seconds"] = wait_for_seconds return self.workflowResourceApi.update_workflow_and_task_state(update_requesst=update_requesst, workflow_id=workflow_id, **kwargs) + + def send_message(self, workflow_id: str, message: Dict[str, object]) -> str: + """Push a message into the message queue of a running workflow (WMQ). + + Requires conductor.workflow-message-queue.enabled=true on the server. + + Args: + workflow_id: The running workflow instance ID. + message: Arbitrary JSON-serialisable dict to deliver to the workflow. + + Returns: + The UUID string assigned to the message by the server. 
+ """ + return self.workflowResourceApi.send_workflow_message(message, workflow_id) diff --git a/src/conductor/client/workflow/executor/workflow_executor.py b/src/conductor/client/workflow/executor/workflow_executor.py index ba723e54d..20074de09 100644 --- a/src/conductor/client/workflow/executor/workflow_executor.py +++ b/src/conductor/client/workflow/executor/workflow_executor.py @@ -283,6 +283,25 @@ def signal_async(self, workflow_id: str, status: str, body: Dict[str, Any]) -> N body=body ) + def send_message(self, workflow_id: str, message: Dict[str, Any]) -> str: + """Push a message into the message queue of a running workflow (WMQ). + + The workflow must have a PULL_WORKFLOW_MESSAGES task to consume messages. + Requires conductor.workflow-message-queue.enabled=true on the server. + + Args: + workflow_id: The running workflow instance ID. + message: Arbitrary JSON-serialisable dict to deliver to the workflow. + + Returns: + The UUID string assigned to the message by the server. + + Raises: + ApiException: 404 if the workflow is not found, 409 if not in RUNNING state, + 429 if the queue is at capacity. + """ + return self.workflow_client.send_message(workflow_id, message) + def __get_task_result(self, task_id: str, workflow_id: str, task_output: Dict[str, Any], status: str) -> TaskResult: return TaskResult( workflow_instance_id=workflow_id, diff --git a/src/conductor/client/workflow/task/pull_workflow_messages_task.py b/src/conductor/client/workflow/task/pull_workflow_messages_task.py new file mode 100644 index 000000000..fc9c5e4fb --- /dev/null +++ b/src/conductor/client/workflow/task/pull_workflow_messages_task.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from typing_extensions import Self + +from conductor.client.workflow.task.task import TaskInterface +from conductor.client.workflow.task.task_type import TaskType + + +class PullWorkflowMessagesTask(TaskInterface): + """Consume messages from the workflow's message queue (WMQ). 
+ + When messages are available the task completes with: + output.messages — list of WorkflowMessage objects + output.count — number of messages returned + + When the queue is empty the task stays IN_PROGRESS and is re-evaluated + after ~1 second (non-blocking polling behavior). + + Args: + task_ref_name: Unique task reference name within the workflow. + batch_size: Maximum number of messages to dequeue per execution (default 1, + server cap is typically 100). + """ + + def __init__(self, task_ref_name: str, batch_size: int = 1) -> None: + super().__init__( + task_reference_name=task_ref_name, + task_type=TaskType.PULL_WORKFLOW_MESSAGES, + ) + self.input_parameters["batchSize"] = batch_size diff --git a/src/conductor/client/workflow/task/task_type.py b/src/conductor/client/workflow/task/task_type.py index 38ebb16ad..da9755534 100644 --- a/src/conductor/client/workflow/task/task_type.py +++ b/src/conductor/client/workflow/task/task_type.py @@ -40,3 +40,4 @@ class TaskType(str, Enum): LLM_SEARCH_EMBEDDINGS = "LLM_SEARCH_EMBEDDINGS" LIST_MCP_TOOLS = "LIST_MCP_TOOLS" CALL_MCP_TOOL = "CALL_MCP_TOOL" + PULL_WORKFLOW_MESSAGES = "PULL_WORKFLOW_MESSAGES" diff --git a/src/conductor/client/workflow_client.py b/src/conductor/client/workflow_client.py index 4e3e61a60..fe619662f 100644 --- a/src/conductor/client/workflow_client.py +++ b/src/conductor/client/workflow_client.py @@ -3,7 +3,7 @@ from typing import Optional, List, Dict from conductor.client.http.models import WorkflowRun, SkipTaskRequest, WorkflowStatus, \ - ScrollableSearchResultWorkflowSummary, SignalResponse + ScrollableSearchResultWorkflowSummary, SignalResponse, WorkflowMessage from conductor.client.http.models.correlation_ids_search_request import CorrelationIdsSearchRequest from conductor.client.http.models.rerun_workflow_request import RerunWorkflowRequest from conductor.client.http.models.start_workflow_request import StartWorkflowRequest @@ -120,3 +120,18 @@ def update_variables(self, workflow_id: str, 
variables: Optional[Dict[str, objec def update_state(self, workflow_id: str, update_requesst: WorkflowStateUpdate, wait_until_task_ref_names: Optional[List[str]] = None, wait_for_seconds: Optional[int] = None) -> WorkflowRun: pass + + @abstractmethod + def send_message(self, workflow_id: str, message: Dict[str, object]) -> str: + """Push a message into the message queue of a running workflow (WMQ). + + Requires conductor.workflow-message-queue.enabled=true on the server. + + Args: + workflow_id: The running workflow instance ID. + message: Arbitrary JSON-serialisable dict to deliver to the workflow. + + Returns: + The UUID string assigned to the message by the server. + """ + pass