Merged
189 changes: 186 additions & 3 deletions docs/pipeline.md
@@ -7,6 +7,17 @@ workflows. It supports parallel execution, conditional branching, retries, timeo
and fan-out/fan-in patterns -- everything needed to model real-world enterprise
processing pipelines.

`PipelineBuilder` has two modes:

* **Port-based** (legacy, parallel) — nodes communicate via `output_key` /
`input_key` edge ports and run concurrently within each topological level.
Best for ETL-shaped DAGs. Documented in the bulk of this guide.
* **State-based** — opt-in via `PipelineBuilder("name", state=SomeModel)`.
Nodes become `async (state) -> dict` over a typed shared state. One
`.branch(source, router)` call covers conditional routing; `Send(target, payload)`
covers runtime fan-out; a `Checkpointer` enables resume after failure. Best for
agentic workflows and ReAct-style loops. See [State-Based Pipelines](#state-based-pipelines).

---

## Concepts
@@ -88,15 +99,181 @@ The framework provides these built-in executors:
- **CallableStep** -- Wraps any `async` function `(context, inputs) -> output`.
- **BatchLLMStep** -- Processes multiple prompts concurrently through an agent for
cost optimization. See [Batch Processing](#batch-processing-batchllmstep) below.
- **BranchStep** -- Routes execution to one of several downstream paths based on
a predicate (see [Conditional Branching](#conditional-branching-branchstep) below).
- **FanOutStep** -- Splits input into a list for parallel downstream processing.
- **BranchStep** _(deprecated)_ -- Routes execution to one of several downstream paths based on
a predicate. Use `.branch(...)` in [State-Based Pipelines](#state-based-pipelines) instead.
- **FanOutStep** _(deprecated)_ -- Splits input into a list for parallel downstream processing.
Use `Send` in [State-Based Pipelines](#runtime-fan-out-via-send) instead.
- **FanInStep** -- Merges outputs from multiple upstream nodes.

---

## State-Based Pipelines

Set `state=` on `PipelineBuilder` to switch to a declarative API designed for
agentic workflows. Nodes become `async (state) -> dict | None` functions over
a typed shared-state object; the engine reduces each node's partial-update
dict back into the state.

```python
from typing import Annotated
from pydantic import BaseModel
from fireflyframework_agentic.pipeline import PipelineBuilder, append


class AgentState(BaseModel):
    messages: Annotated[list[str], append] = []  # reducer: append
    intent: str | None = None                    # default reducer: replace
    answer: str | None = None


async def classify(state: AgentState) -> dict:
    return {"intent": "complaint" if "refund" in state.messages[-1] else "general"}


async def answer(state: AgentState) -> dict:
    return {"answer": "Here is your answer."}


async def escalate(state: AgentState) -> dict:
    return {"answer": "Escalated to human."}


def route(state: AgentState) -> str:
    return "escalate" if state.intent == "complaint" else "answer"


pipeline = (
    PipelineBuilder("support-agent", state=AgentState)
    .add_node(classify)          # node id derived from fn.__name__
    .add_node(answer)
    .add_node(escalate)
    .branch(classify, route)     # router returns target node id
    .build()
)
result = await pipeline.invoke(AgentState(messages=["I want a refund"]))
print(result.state.answer)
```

### Reducers

Reducers are declared as `Annotated[T, reducer_fn]` on the state schema. The
built-ins live in `fireflyframework_agentic.pipeline.reducers`:

| Reducer | Semantics |
|---------------|-------------------------------------------------|
| `replace` | Last-write-wins (the default for any field). |
| `append` | Append a single item to a list. |
| `extend` | Concatenate two iterables. |
| `merge_dict` | Shallow-merge two dicts; update wins on conflict. |

Custom reducers are any callable `(current, update) -> merged`.

### Branching

`.branch(source, router, mapping=None)` registers a synchronous
`(state) -> str | Send | list[Send]` router on `source`:

* Returning a node id (string) routes to that node directly.
* Passing `mapping={"label": target_node, ...}` lets the router return an
abstract label instead of a node id.
* Returning a `Send` or `list[Send]` triggers runtime fan-out (see below).
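
The three router return shapes can be pictured as one normalisation step. The sketch below is an illustration under assumptions, not the engine's code: `resolve` is a hypothetical helper, the local `Send` dataclass is a stand-in for the framework's class, and treating every string as a label whenever a mapping is passed is a guess about the real behaviour.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Send:
    """Stand-in for the framework's Send: a target node id plus a payload."""
    target: str
    payload: dict[str, Any]


def resolve(result, mapping=None):
    """Normalise a router result to a list of (target, payload) pairs."""
    if isinstance(result, Send):
        return [(result.target, result.payload)]
    if isinstance(result, list):
        return [(send.target, send.payload) for send in result]
    # Assumption: with a mapping, the string is a label; without one, a node id.
    target = mapping[result] if mapping is not None else result
    return [(target, {})]


direct = resolve("answer")
labelled = resolve("yes", mapping={"yes": "escalate", "no": "answer"})
fanned = resolve([Send("worker", {"item": 1}), Send("worker", {"item": 2})])
```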

### Checkpoint + Resume

Pass a `Checkpointer` to persist state after each successful node. The
filesystem implementation ships in the package; Redis/Postgres backends are
straightforward to plug in via the `Checkpointer` Protocol.

```python
from fireflyframework_agentic.pipeline import FileCheckpointer, PipelineBuilder

pipeline = (
    PipelineBuilder(
        "software-factory",
        state=BuildState,
        checkpointer=FileCheckpointer("./checkpoints"),
    )
    .add_node(architect)
    .add_node(python_dev)
    .add_node(deployer)
    .add_node(evaluator)
    .chain(architect, python_dev, deployer, evaluator)
    .build()
)

# Fresh run
result = await pipeline.invoke(BuildState(requirements="user-mgmt service"))

# Resume after a crash: picks up at the failed node, skips completed ones
result = await pipeline.invoke(run_id=result.run_id)

# Or jump into a specific node with explicit state
result = await pipeline.invoke(state=loaded_state, start_at=deployer)
```

### Cycles and `recursion_limit`

State pipelines permit cycles for ReAct loops and retry-with-critique patterns.
The builder accepts `recursion_limit` (default 25) as a safety net — a runaway
loop surfaces as `result.success=False` with a clean error, not an infinite hang.

```python
def route(state):
    return "done" if state.counter >= 3 else "step"


# Parentheses let the chained calls span several lines.
pipeline = (
    PipelineBuilder("loop", state=LoopState, recursion_limit=25)
    .add_node(step)
    .add_node(done)
    .branch(step, route)
    .build()
)
```

### Runtime Fan-Out via `Send`

A router may return `list[Send(target, payload)]` to dispatch multiple
invocations of the same (or different) workers concurrently. Each Send's
payload is applied to a copy of the current state before its target runs;
results reduce back into shared state. Replaces the legacy `FanOutStep`.

```python
from fireflyframework_agentic.pipeline import Send


def dispatch(state):
    return [Send("worker", {"item": x}) for x in state.items]


# Parentheses let the chained calls span several lines.
pipeline = (
    PipelineBuilder("mapreduce", state=MapReduceState)
    .add_node(planner)
    .add_node(worker)
    .add_node(collect)
    .add_edge(worker, collect)
    .branch(planner, dispatch)
    .build()
)
```

When all worker targets share a common successor, the engine continues there
once the fan-out completes; the aggregator runs once with all results in
shared state.
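
The copy, run, and reduce cycle described above can be approximated with `asyncio.gather`. The sketch is illustrative only: `fan_out` is a hypothetical helper, the local `Send` dataclass stands in for the framework's class, state is a plain dict, and the extend-style handling of `results` is hard-coded rather than driven by declared reducers.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Send:
    """Stand-in for the framework's Send."""
    target: str
    payload: dict[str, Any]


async def worker(state: dict) -> dict:
    await asyncio.sleep(0)  # yield control so the workers interleave
    return {"results": [state["item"] * 2]}


async def fan_out(state: dict, sends: list[Send], nodes: dict) -> dict:
    """Run each Send on its own state copy, then fold the updates back in."""

    async def run_one(send: Send) -> dict:
        local = {**state, **send.payload}  # payload applied to a copy
        return await nodes[send.target](local)

    # gather returns results in the order the Sends were listed.
    updates = await asyncio.gather(*(run_one(send) for send in sends))
    merged = dict(state)
    for update in updates:
        for key, value in update.items():
            if key == "results":  # extend-style reduction for this field
                merged[key] = [*merged.get(key, []), *value]
            else:  # replace-style reduction for everything else
                merged[key] = value
    return merged


state = {"items": [1, 2, 3], "results": []}
sends = [Send("worker", {"item": x}) for x in state["items"]]
final = asyncio.run(fan_out(state, sends, {"worker": worker}))
```

Because each worker sees only its own copy, the per-item `item` key never leaks into the shared state; only the returned updates do.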

### Mermaid Export

`StatePipeline.to_mermaid()` and `DAG.to_mermaid()` render the topology as a
Mermaid flowchart. Branch edges declared with an explicit mapping show their
label; dynamic routers are noted as such.
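
For orientation, the kind of text such an export produces can be sketched in a few lines. The function below is a hypothetical stand-alone helper, not the signature or output format of either `to_mermaid()` method; it only shows how labelled and unlabelled edges map onto Mermaid flowchart syntax.

```python
def to_mermaid(edges, direction="TD"):
    """Render (source, target, label) triples as a Mermaid flowchart."""
    lines = [f"graph {direction}"]
    for source, target, label in edges:
        # Labelled edges use the `-->|label|` form; others a plain arrow.
        arrow = f"-->|{label}|" if label else "-->"
        lines.append(f"    {source} {arrow} {target}")
    return "\n".join(lines)


diagram = to_mermaid(
    [
        ("classify", "escalate", "complaint"),
        ("classify", "answer", "general"),
        ("answer", "done", None),
    ]
)
print(diagram)
```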

### When to use which mode

| Use port-based when… | Use state-based when… |
|----------------------|------------------------|
| Pure ETL: parallel, fan-out/fan-in, no shared state | Agentic workflow: classify → branch → respond / loop / retry |
| Each step's input is a single value from the previous step | Multiple agents reading/writing different fields of a shared object |
| You want the engine to run independent nodes concurrently | You want resume-after-failure and start-from-middle semantics |
| You're happy with `BranchStep` + per-node `condition` lambdas | You want one `.branch(...)` call and inspectable routing |

See [`examples/pipeline_state.py`](../examples/pipeline_state.py) for a
runnable demo covering branching, software-factory checkpoint/resume, and
map-reduce fan-out.

---

## Parallel Execution (Fan-Out / Fan-In)

> **`FanOutStep` is deprecated.** For runtime fan-out (one dispatch per item,
> arbitrary count), prefer `Send` from [State-Based Pipelines](#runtime-fan-out-via-send).
> `FanOutStep` still works for now (it emits a `DeprecationWarning` on
> construction); `FanInStep` is not deprecated.

```mermaid
graph TD
SPLIT[Fan-Out] --> W1[Worker 1]
@@ -248,6 +425,12 @@ dag.add_node(DAGNode(

### Conditional Branching (BranchStep)

> **Deprecated.** Prefer [State-Based Pipelines](#state-based-pipelines) with
> `.branch(source, router)` — one call instead of `BranchStep` + per-node
> `condition` lambdas, and the topology becomes inspectable as data.
> `BranchStep` still works (it emits a `DeprecationWarning` on construction);
> removal will be tracked in a follow-up issue once internal callers migrate.

`BranchStep` provides router-based conditional branching. The router callable
receives the node's input and returns a string key. Downstream nodes use
condition gates to check the branch key and execute only the matching path.