Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 146 additions & 0 deletions docs/workflow-message-queue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Workflow Message Queue (WMQ)

Send messages to a running workflow and process them inside the workflow using the
`PULL_WORKFLOW_MESSAGES` system task.

## Server requirement

WMQ must be enabled on the Conductor server:

```properties
conductor.workflow-message-queue.enabled=true
```

---

## Sending a message

After starting (or executing) a workflow you can push any JSON-serialisable dict
to it using `executor.send_message` or `workflow_client.send_message`.

```python
from conductor.client.configuration.configuration import Configuration
from conductor.client.http.models import StartWorkflowRequest
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor

config = Configuration() # reads CONDUCTOR_SERVER_URL / CONDUCTOR_AUTH_TOKEN from env
executor = WorkflowExecutor(config)

# --- start a workflow that has a PULL_WORKFLOW_MESSAGES task in it ---
workflow_id = executor.start_workflow(
StartWorkflowRequest(name="order_processing", input={"orderId": "ORD-42"})
)

# --- send a message to the running workflow ---
message_id = executor.send_message(
workflow_id,
{"event": "payment_confirmed", "amount": 99.99, "currency": "USD"},
)
print(f"Message enqueued: {message_id}")
```

You can call `send_message` multiple times; each call returns a unique UUID.

```python
# Send a batch of status updates
for status in ["PICKED", "SHIPPED", "OUT_FOR_DELIVERY"]:
executor.send_message(workflow_id, {"status": status})
```

---

## Defining a workflow that receives messages

Use `PullWorkflowMessagesTask` inside your workflow definition to consume the queue.

```python
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.pull_workflow_messages_task import PullWorkflowMessagesTask
from conductor.client.workflow.task.simple_task import SimpleTask

# Pull up to 5 messages at a time
pull = PullWorkflowMessagesTask(task_ref_name="pull_messages", batch_size=5)

process = SimpleTask(
task_def_name="process_message_worker",
task_reference_name="process_message",
)
# Pass pulled messages to the next task via input parameter references
process.input_parameters["messages"] = "${pull_messages.output.messages}"

wf = (
ConductorWorkflow(executor=executor, name="order_processing", version=1)
.add(pull)
.add(process)
)

wf.register(overwrite=True)
```

### Task output shape

When messages are available the `PULL_WORKFLOW_MESSAGES` task output looks like:

```json
{
"messages": [
{
"id": "f3c2a1b0-...",
"workflowId": "<workflow-instance-id>",
"payload": { "event": "payment_confirmed", "amount": 99.99 },
"receivedAt": "2024-01-01T12:00:00Z"
}
],
"count": 1
}
```

Reference individual fields in subsequent tasks:

```python
next_task.input_parameters["firstMessage"] = "${pull_messages.output.messages[0].payload}"
```

---

## Using the low-level client directly

If you prefer the `WorkflowClient` directly:

```python
from conductor.client.orkes_clients import OrkesClients

clients = OrkesClients(config)
workflow_client = clients.get_workflow_client()

message_id = workflow_client.send_message(
workflow_id,
{"type": "notification", "text": "Hello from outside the workflow"},
)
```

---

## Error handling

| HTTP status | Reason | What to do |
|-------------|--------|------------|
| `404 Not Found` | Workflow ID does not exist | Verify the workflow was started successfully |
| `409 Conflict` | Workflow is not `RUNNING` | Check workflow status before sending |
| `429 Too Many Requests` | Queue is at capacity (default 1,000 messages) | Back off and retry, or increase `conductor.workflow-message-queue.maxQueueSize` |

```python
from conductor.client.http.rest import ApiException

try:
executor.send_message(workflow_id, {"ping": True})
except ApiException as e:
if e.status == 404:
print("Workflow not found")
elif e.status == 409:
print("Workflow is not running")
elif e.status == 429:
print("Queue full — back off and retry")
else:
raise
```
90 changes: 90 additions & 0 deletions src/conductor/client/http/api/workflow_resource_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3178,4 +3178,94 @@ def execute_workflow_with_return_strategy_with_http_info(self, body, name, versi
_return_http_data_only=params.get('_return_http_data_only'),
_preload_content=params.get('_preload_content', True),
_request_timeout=params.get('_request_timeout'),
collection_formats=collection_formats)

def send_workflow_message(self, body, workflow_id, **kwargs):  # noqa: E501
    """Push a message into a running workflow's message queue (WMQ).  # noqa: E501

    Synchronous by default; pass ``async_req=True`` for an asynchronous
    request, in which case a request thread is returned:
    >>> thread = api.send_workflow_message(body, workflow_id, async_req=True)
    >>> result = thread.get()

    :param async_req bool
    :param dict(str, object) body: Arbitrary JSON payload (required)
    :param str workflow_id: (required)
    :return: str — the UUID assigned to the pushed message
        If the method is called asynchronously,
        returns the request thread.
    """
    # Always unwrap the (data, status, headers) tuple to just the data;
    # the helper handles both the sync and async_req cases identically.
    kwargs['_return_http_data_only'] = True
    return self.send_workflow_message_with_http_info(body, workflow_id, **kwargs)  # noqa: E501

def send_workflow_message_with_http_info(self, body, workflow_id, **kwargs):  # noqa: E501
    """Push a message into a running workflow's message queue (WMQ).  # noqa: E501

    Issues POST /workflow/{workflowId}/messages with *body* as the JSON
    request payload.

    :param async_req bool
    :param dict(str, object) body: Arbitrary JSON payload (required)
    :param str workflow_id: (required)
    :return: str — the UUID assigned to the pushed message
    """
    # Positional parameters plus the standard transport-control kwargs
    # understood by the generated client layer.
    accepted = {
        'body', 'workflow_id', 'async_req',
        '_return_http_data_only', '_preload_content', '_request_timeout',
    }
    for key in kwargs:
        if key not in accepted:
            raise TypeError(
                "Got an unexpected keyword argument '%s'"
                " to method send_workflow_message" % key
            )

    if body is None:
        raise ValueError("Missing the required parameter `body` when calling `send_workflow_message`")  # noqa: E501
    if workflow_id is None:
        raise ValueError("Missing the required parameter `workflow_id` when calling `send_workflow_message`")  # noqa: E501

    path_params = {'workflowId': workflow_id}

    header_params = {
        'Accept': self.api_client.select_header_accept(['text/plain']),  # noqa: E501
        'Content-Type': self.api_client.select_header_content_type(['application/json']),  # noqa: E501
    }

    return self.api_client.call_api(
        '/workflow/{workflowId}/messages', 'POST',
        path_params,
        [],  # no query parameters
        header_params,
        body=body,
        post_params=[],  # no form parameters
        files={},
        response_type='str',  # noqa: E501
        auth_settings=['api_key'],  # noqa: E501
        async_req=kwargs.get('async_req'),
        _return_http_data_only=kwargs.get('_return_http_data_only'),
        _preload_content=kwargs.get('_preload_content', True),
        _request_timeout=kwargs.get('_request_timeout'),
        collection_formats={})
3 changes: 2 additions & 1 deletion src/conductor/client/http/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,5 @@
from conductor.client.http.models.circuit_breaker_transition_response import CircuitBreakerTransitionResponse
from conductor.client.http.models.signal_response import SignalResponse, TaskStatus
from conductor.client.http.models.authentication_config import AuthenticationConfig
from conductor.client.http.models.tag_object import TagObject
from conductor.client.http.models.tag_object import TagObject
from conductor.client.http.models.workflow_message import WorkflowMessage
41 changes: 41 additions & 0 deletions src/conductor/client/http/models/workflow_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class WorkflowMessage:
    """Represents a message pushed into a running workflow's queue (WMQ).

    Attributes:
        id: UUID assigned by the server on push.
        workflow_id: The workflow instance that owns this message.
        payload: Arbitrary JSON payload supplied by the caller.
        received_at: ISO-8601 UTC timestamp set at ingestion time.
    """

    id: Optional[str] = field(default=None)
    workflow_id: Optional[str] = field(default=None)
    payload: Optional[Dict[str, object]] = field(default=None)
    received_at: Optional[str] = field(default=None)

    # Swagger-style metadata used by the generated HTTP layer:
    # python attribute -> wire type, and python attribute -> JSON field name.
    # Unannotated, so the dataclass machinery treats them as class attributes.
    swagger_types = {
        'id': 'str',
        'workflow_id': 'str',
        'payload': 'dict(str, object)',
        'received_at': 'str',
    }

    attribute_map = {
        'id': 'id',
        'workflow_id': 'workflowId',
        'payload': 'payload',
        'received_at': 'receivedAt',
    }

    def to_dict(self) -> dict:
        """Serialise to a JSON-ready dict keyed by wire (camelCase) names.

        Attributes that are ``None`` are omitted so partially populated
        messages round-trip cleanly through the API.
        """
        result = {}
        # Iterate the attribute names directly; the declared wire types
        # are not needed here (was: .items() with the value discarded).
        for attr in self.swagger_types:
            value = getattr(self, attr)
            if value is not None:
                result[self.attribute_map[attr]] = value
        return result

    @classmethod
    def from_dict(cls, data: Optional[dict]) -> "WorkflowMessage":
        """Build a WorkflowMessage from a server-response dict.

        Inverse of :meth:`to_dict`: keys are looked up by wire name.
        Unknown keys are ignored; missing keys default to ``None``.
        """
        data = data or {}
        return cls(**{attr: data.get(key) for attr, key in cls.attribute_map.items()})
14 changes: 14 additions & 0 deletions src/conductor/client/orkes/orkes_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,17 @@ def update_state(self, workflow_id: str, update_requesst: WorkflowStateUpdate,
kwargs["wait_for_seconds"] = wait_for_seconds

return self.workflowResourceApi.update_workflow_and_task_state(update_requesst=update_requesst, workflow_id=workflow_id, **kwargs)

def send_message(self, workflow_id: str, message: Dict[str, object]) -> str:
    """Deliver *message* to the WMQ of a running workflow.

    The server must be started with
    conductor.workflow-message-queue.enabled=true.

    Args:
        workflow_id: ID of the running workflow instance.
        message: JSON-serialisable dict to enqueue for the workflow.

    Returns:
        The UUID string the server assigned to the enqueued message.
    """
    # Note the underlying generated API takes (body, workflow_id) order.
    message_id = self.workflowResourceApi.send_workflow_message(message, workflow_id)
    return message_id
19 changes: 19 additions & 0 deletions src/conductor/client/workflow/executor/workflow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,25 @@ def signal_async(self, workflow_id: str, status: str, body: Dict[str, Any]) -> N
body=body
)

def send_message(self, workflow_id: str, message: Dict[str, Any]) -> str:
    """Deliver *message* to the message queue (WMQ) of a running workflow.

    Messages are consumed inside the workflow by a PULL_WORKFLOW_MESSAGES
    task; the server must be started with
    conductor.workflow-message-queue.enabled=true.

    Args:
        workflow_id: ID of the running workflow instance.
        message: JSON-serialisable dict to enqueue for the workflow.

    Returns:
        The UUID string the server assigned to the enqueued message.

    Raises:
        ApiException: 404 if the workflow is not found, 409 if not in
            RUNNING state, 429 if the queue is at capacity.
    """
    # Thin delegation to the workflow client, which wraps the HTTP call.
    message_id = self.workflow_client.send_message(workflow_id, message)
    return message_id

def __get_task_result(self, task_id: str, workflow_id: str, task_output: Dict[str, Any], status: str) -> TaskResult:
return TaskResult(
workflow_instance_id=workflow_id,
Expand Down
30 changes: 30 additions & 0 deletions src/conductor/client/workflow/task/pull_workflow_messages_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from __future__ import annotations

from typing_extensions import Self

from conductor.client.workflow.task.task import TaskInterface
from conductor.client.workflow.task.task_type import TaskType


class PullWorkflowMessagesTask(TaskInterface):
    """Consume messages from the workflow's message queue (WMQ).

    When messages are available the task completes with:
        output.messages — list of WorkflowMessage objects
        output.count — number of messages returned

    When the queue is empty the task stays IN_PROGRESS and is re-evaluated
    after ~1 second (non-blocking polling behavior).

    Args:
        task_ref_name: Unique task reference name within the workflow.
        batch_size: Maximum number of messages to dequeue per execution (default 1,
            server cap is typically 100).
    """

    # Fix: __init__ always returns None; annotating it -> Self is incorrect
    # (PEP 484 requires `-> None` on __init__).
    def __init__(self, task_ref_name: str, batch_size: int = 1) -> None:
        super().__init__(
            task_reference_name=task_ref_name,
            task_type=TaskType.PULL_WORKFLOW_MESSAGES,
        )
        # The server reads the batch size from the task's input parameters.
        self.input_parameters["batchSize"] = batch_size
1 change: 1 addition & 0 deletions src/conductor/client/workflow/task/task_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ class TaskType(str, Enum):
LLM_SEARCH_EMBEDDINGS = "LLM_SEARCH_EMBEDDINGS"
LIST_MCP_TOOLS = "LIST_MCP_TOOLS"
CALL_MCP_TOOL = "CALL_MCP_TOOL"
PULL_WORKFLOW_MESSAGES = "PULL_WORKFLOW_MESSAGES"
Loading
Loading