Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 216 additions & 0 deletions docs/concurrency.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
# Concurrency model

How flydocs handles concurrent operations safely when you scale workers
horizontally. Everything below is the *production* story — the docs
that describe individual stages still hold.

If you only run one `worker` and one `bbox-worker` container, you can
skip this page. If you ever plan to run more than one of either, read
it once.

---

## Where the work lives

The async path is split across three processes:

| Process | Owns |
| -------------------- | ------------------------------------------------------------------------------------- |
| `flydocs serve` | HTTP API. Persists the job row + publishes `IDPJobSubmitted` on the EDA bus. |
| `flydocs worker` | Main extraction. Subscribes to `IDPJobSubmitted`. Bundles a `JobReaper` sidecar. |
| `flydocs bbox-worker`| Out-of-band bbox grounding. Subscribes to `IDPBboxRefineRequested`. Bundles a `BboxReaper` sidecar. |

Each worker container runs **two cooperating async tasks**: the
consumer loop and the reaper. If either crashes, the container exits
and the orchestrator restarts it — both are reset together.

---

## The four invariants

The whole concurrency story rests on four guarantees, in order from
inside-out:

### 1. Atomic state transitions (`extraction_jobs`)

Every state-changing repository method is a single conditional
`UPDATE ... WHERE id=? AND status IN (legal_predecessors) RETURNING *`.
Two writers racing on the same row are serialised by Postgres'
row-level UPDATE lock; the `WHERE` precondition picks exactly one
winner. The loser gets `None` back — never a partial write, never a
silently-clobbered field.

The legal-predecessor matrix:

| Method | Predecessors |
| ----------------------------- | ----------------------------------------------------------------------- |
| `mark_running` | `QUEUED` OR (`RUNNING` with stale `started_at` past `job_run_lease_s`) |
| `mark_succeeded` | `RUNNING` OR `REFINING_BBOXES` |
| `mark_failed` | `RUNNING` |
| `mark_partial_succeeded` | `RUNNING` |
| `mark_bbox_refining` | `PARTIAL_SUCCEEDED` OR (`REFINING_BBOXES` with stale `bbox_refine_started_at`) |
| `mark_bbox_refined` | `REFINING_BBOXES` |
| `mark_bbox_refine_failed` | `REFINING_BBOXES` |
| `mark_cancelled` | `QUEUED` |
| `requeue_for_retry` | `RUNNING` |
| `requeue_bbox_refine` | `REFINING_BBOXES` |

A worker that receives the same `IDPJobSubmitted` event twice — for
example because the bus redelivered while a peer was still claiming —
calls `mark_running` and gets `None` on the second call. It logs and
bails. No duplicate orchestrator invocation, no duplicate webhook.

### 2. Per-group advisory lock on the EDA drain (`PostgresEventBus`)

The Postgres EDA adapter (in `fireflyframework-pyfly`) wraps every
drain pass in `pg_try_advisory_lock(group_key)`. The key is a
deterministic SHA-256 fold of the consumer-group name. Concurrent
replicas in the same group all attempt the lock; whoever wins drains
the outbox; everyone else returns immediately and waits for the next
`NOTIFY` or poll tick.

This is the layer that *prevents* the duplicate-dispatch problem in
the first place. Layer 1 (atomic claim) is the defense-in-depth that
makes the system correct even if a future bus adapter drops it.

Session-level lock → auto-releases on connection death. A worker that
crashes mid-drain never zombies the group.

### 3. Idempotency-key collision recovery (`SubmitJobHandler`)

The submit path is `SELECT-by-key` then `INSERT`. Two concurrent
requests with the same `Idempotency-Key` can both miss the SELECT and
both attempt the INSERT. One wins; the other hits the partial unique
index `uq_extraction_jobs_idempotency_key` and the SDK / handler
catches `IntegrityError`, re-resolves the winning row, and returns the
idempotent `SubmitJobResponse` shape. The caller sees no difference
between winning and losing.

### 4. Periodic reaper revives orphans (`JobReaper` + `BboxReaper`)

Atomic claim solves *duplicate processing*. The reaper solves the
opposite failure mode: when the event chain breaks and a job is stuck
because **nothing** is going to deliver to it.

The five orphan classes the reaper handles:

| # | Status | Cause | Reaper |
| -- | ---------------------- | ------------------------------------------------------------------------ | ------------ |
| 1 | `QUEUED` | Submit handler crashed between row INSERT and outbox PUBLISH | `JobReaper` |
| 2 | `RUNNING` | Worker crashed mid-extraction past its lease | `JobReaper` |
| 3 | `QUEUED` (post-retry) | Worker's `_delayed_publish` task died before its `asyncio.sleep` completed | `JobReaper` |
| 4 | `PARTIAL_SUCCEEDED` | Main worker crashed between `mark_partial_succeeded` and publish-bbox | `BboxReaper` |
| 5 | `REFINING_BBOXES` | Bbox worker crashed mid-grounding | `BboxReaper` |

Every `reaper_sweep_interval_s` (default 60 s) each reaper:

1. Queries rows that match each orphan signature.
2. Republishes a fresh EDA event for each id.
3. Lets the atomic claim from invariant #1 pick exactly one consumer.

Duplicate republishes from multiple reaper replicas all funnel through
the atomic claim, so running a reaper in every worker container is
safe.

---

## Lease windows

A "lease" is the wall-clock window during which a `RUNNING` (or
`REFINING_BBOXES`) row is considered legitimately owned by its
claimant. Past the lease, the reaper assumes the claimant is dead and
republishes. The next claim succeeds because `mark_running` matches
`RUNNING WITH stale started_at`.

| Setting | Default | What it means |
| -------------------------------- | ------- | ---------------------------------------------------------------------- |
| `FLYDOCS_JOB_RUN_LEASE_S` | 1260 | `async_timeout_s + 60s`. The worker's own `asyncio.wait_for` caps any legitimate run at `async_timeout_s`, so a lease past that means crash. |
| `FLYDOCS_BBOX_REFINE_LEASE_S` | 660 | `bbox_refine_timeout_s + 60s`. Same idea for the bbox leg. |
| `FLYDOCS_REAPER_SWEEP_INTERVAL_S` | 60 | How often each reaper polls for stuck rows. |
| `FLYDOCS_QUEUED_ORPHAN_THRESHOLD_S` | 600 | `2 * retry_max_delay_s`. How long a `QUEUED` row waits before the reaper considers its triggering event lost. |
| `FLYDOCS_PARTIAL_SUCCEEDED_ORPHAN_THRESHOLD_S` | 1320 | `async_timeout_s + 120s`. How long after the main extraction's `started_at` we conclude the bbox-refine event was lost. |

Recovery time after a crash is bounded by `lease + reaper_sweep_interval_s`
≈ 22 min with the shipped defaults and `async_timeout_s=1200`. Lower
`async_timeout_s` for faster recovery on a use case where extraction
should be quick.

---

## What the reaper does NOT do

- **It doesn't dedupe republishes within a single sweep window.**
A backlog of 50 jobs that are legitimately QUEUED for >
`queued_orphan_threshold_s` will get republished on every 60 s
sweep until they're picked up. That's bounded outbox bloat under
heavy load, not unbounded. If this matters in your deployment,
raise the threshold.
- **It doesn't fence multiple reaper replicas.** Two reapers in two
containers both find the same stale rows; both republish; the
atomic claim dedupes the work. Cost is the extra outbox INSERTs
and `NOTIFY` traffic.
- **It doesn't cancel a stuck `RUNNING` job.** Mid-flight cancel is
intentionally not supported (the orchestrator has no cancellation
hook). To kill a stuck job today, wait for the lease + reaper to
revive it, then issue cancel against the redelivered QUEUED entry.

---

## Recipes

### Run more than one worker safely

`docker compose --scale worker=3` (or the equivalent in K8s).
Everything is bounded by the four invariants above. No coordination
config to set.

### Detect orphans manually

```sql
-- Stuck RUNNING (lease expired)
SELECT id, started_at, attempts FROM extraction_jobs
WHERE status='RUNNING'
AND started_at < now() - INTERVAL '21 minutes'
ORDER BY started_at;

-- QUEUED with no event in outbox (submit/retry-publish lost)
SELECT id, created_at FROM extraction_jobs
WHERE status='QUEUED'
AND COALESCE(started_at, created_at) < now() - INTERVAL '10 minutes'
ORDER BY created_at;

-- PARTIAL_SUCCEEDED waiting on a bbox event that may have been lost
SELECT id, started_at FROM extraction_jobs
WHERE status='PARTIAL_SUCCEEDED'
AND bbox_refine_status='pending'
AND bbox_refine_started_at IS NULL
AND started_at < now() - INTERVAL '22 minutes';
```

The reaper would have caught all of these before the next sweep
completes; the queries are useful for ad-hoc audits.

### Force a re-claim now (operator override)

Trim `started_at` to a past instant; the reaper's next sweep picks
the row up:

```sql
UPDATE extraction_jobs
SET started_at = now() - INTERVAL '24 hours'
WHERE id = '<job-id>' AND status='RUNNING';
```

---

## Adapter compatibility

| EDA adapter (`FLYDOCS_EDA_ADAPTER`) | Multi-worker safe? | Notes |
| ----------------------------------- | ------------------ | ----- |
| `postgres` (default) | ✅ Yes | Via per-group `pg_try_advisory_lock` in pyfly's adapter. |
| `redis` | ✅ Yes | Redis Streams `XREADGROUP` is a competitive consumer by design. |
| `kafka` | ✅ Yes | Kafka consumer groups partition delivery across replicas. |
| `memory` | ❌ Single-process only | Process-local queue; not for multi-replica deployments. |

The repository-level atomic claim is adapter-agnostic — even with the
`memory` adapter in tests, the same `mark_running` precondition wins.
7 changes: 4 additions & 3 deletions docs/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,10 @@ outbound_call target=worker op=job.run status=ok latency_ms=42557 job
| Component | Strategy |
| ---------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **API** | Stateless — scale horizontally. Set uvicorn workers via `--workers` (one per CPU is a good default). Sticky sessions are not required. |
| **Worker** | Stateless — scale horizontally. Redis consumer groups shard message delivery. Right-size against peak job arrival; one worker handles ~1 job/min if each takes ~30 s. |
| **Postgres** | A single primary is fine for the `extraction_jobs` table size you'll see. Add a read replica only if `/api/v1/jobs/{id}` reads dominate and you want to offload them. |
| **Redis** | Single primary is fine; the stream durability covers worker restarts. Use a managed offering or a small Sentinel setup for HA. |
| **Worker** | Stateless — scale horizontally. Multi-worker safety is enforced at three layers: a per-group `pg_try_advisory_lock` in the Postgres EDA adapter (or competitive consumer for Redis/Kafka), atomic `UPDATE … WHERE … RETURNING` state transitions on `extraction_jobs`, and a periodic reaper sidecar that republishes orphans whose triggering events were lost to a crash. See [concurrency.md](concurrency.md). Right-size against peak job arrival; one worker handles ~1 job/min if each takes ~30 s. |
| **Bbox worker** | Same scaling story as the main worker, on a separate consumer group (`flydocs-bbox-workers`). Its `BboxReaper` sidecar revives `REFINING_BBOXES` and `PARTIAL_SUCCEEDED` orphans the same way. |
| **Postgres** | A single primary is fine for the `extraction_jobs` table size you'll see. Add a read replica only if `/api/v1/jobs/{id}` reads dominate and you want to offload them. The EDA outbox grows monotonically — schedule a periodic VACUUM of `pyfly_eda_outbox` if you don't already. |
| **Redis** | Only required when `FLYDOCS_EDA_ADAPTER=redis`. Single primary is fine; the stream durability covers worker restarts. Postgres adapter (default) uses the database itself for the outbox — no Redis dependency. |

---

Expand Down
69 changes: 69 additions & 0 deletions docs/troubleshooting.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,75 @@ DAG nodes were changed back to `RuleSpec` objects. Nodes are strings

---

## Stuck jobs (concurrency / orphan recovery)

The reaper sidecar in each worker container revives jobs whose
triggering event was lost. If a job appears stuck, walk this short
checklist first. See [concurrency.md](concurrency.md) for the model.

### Job stuck in `RUNNING` for > 22 minutes

`async_timeout_s` (default 1200 s) caps any legitimate run; the lease
(default 1260 s = `async_timeout_s + 60`) adds a small grace period.
Past the lease, the reaper's next sweep republishes the event:

```sql
SELECT id, started_at, attempts, finished_at
FROM extraction_jobs
WHERE status='RUNNING'
AND started_at < now() - INTERVAL '21 minutes';
```

If the reaper is running but the row stays `RUNNING`, check the worker
container logs for `JobReaper republished job <id>` lines. No
republish over multiple sweep intervals means the reaper isn't seeing
the row -- usually `started_at` was bumped by a heartbeat we don't yet
implement, or the worker isn't actually crashed. Operator override:

```sql
UPDATE extraction_jobs
SET started_at = now() - INTERVAL '24 hours'
WHERE id = '<job-id>' AND status='RUNNING';
```

The next reaper sweep will pick it up.

### Job stuck in `QUEUED` for > 10 minutes

Either the submit handler crashed between row INSERT and outbox
PUBLISH, or a worker's retry-path `_delayed_publish` task died before
its `asyncio.sleep` fired. The reaper republishes after
`FLYDOCS_QUEUED_ORPHAN_THRESHOLD_S` (default 600 s).

Backlog vs. orphan: if many `QUEUED` rows are old, your workers are
saturated, not crashed -- check throughput, not lifecycle. Add worker
replicas (`docker compose --scale worker=N` or the K8s equivalent;
multi-worker safe by design).

### Job stuck in `PARTIAL_SUCCEEDED` with `bbox_refine_status='pending'`

Main extraction finished but the bbox-refine event wasn't published
(main worker crashed between `mark_partial_succeeded` and the publish
call). `BboxReaper` revives it after
`FLYDOCS_PARTIAL_SUCCEEDED_ORPHAN_THRESHOLD_S` (default 1320 s, sized
to `async_timeout_s + 120`). Until then the LLM-bbox result is
already readable via `GET /api/v1/jobs/{id}/result`.

### "Cannot cancel a RUNNING job"

Mid-flight cancellation is intentionally not supported. To stop a
runaway extraction:

1. Wait for the lease to expire (or trim `started_at` per the override
above).
2. The reaper republishes; the row briefly transitions to `QUEUED`
when a worker requeues itself on the next failure, OR stays
`RUNNING` if the redelivery wins again.
3. Issue `DELETE /api/v1/jobs/{id}` while it's `QUEUED` to land in
`CANCELLED`.

---

## Observability

### No metrics on `/actuator/metrics`
Expand Down
29 changes: 29 additions & 0 deletions env_template
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,35 @@ FLYDOCS_SYNC_TIMEOUT_S=60
FLYDOCS_ASYNC_TIMEOUT_S=300
FLYDOCS_JOB_MAX_ATTEMPTS=3

# ----------------------------------------------------------------------------
# Concurrency: lease windows + reaper (see docs/concurrency.md)
# ----------------------------------------------------------------------------
# How long a RUNNING claim is considered fresh. Past this, the row is
# treated as orphaned (worker crashed mid-run) and the reaper republishes
# the triggering event for another worker to pick up. Must be > the
# per-attempt budget the orchestrator uses (asyncio.wait_for ceiling).
# Defaults are sized to ``async_timeout_s + 60`` and ``bbox_refine_timeout_s
# + 60``; tighten them only if you've also tightened the per-attempt
# budgets above.
FLYDOCS_JOB_RUN_LEASE_S=1260
FLYDOCS_BBOX_REFINE_LEASE_S=660

# How often each reaper sweeps for stuck jobs (per worker process).
FLYDOCS_REAPER_SWEEP_INTERVAL_S=60

# How long a QUEUED row can sit before the reaper concludes its triggering
# event was lost (submit handler crashed between row INSERT and outbox
# PUBLISH, or worker's delayed-publish task was killed before its delay
# fired). Sized to ``2 * retry_max_delay_s`` so a legitimate retry is
# never trampled.
FLYDOCS_QUEUED_ORPHAN_THRESHOLD_S=600

# How long after the main extraction's ``started_at`` we conclude the
# bbox-refine event was lost (PARTIAL_SUCCEEDED with bbox_refine_status=
# 'pending' and bbox_refine_started_at IS NULL). Sized to
# ``async_timeout_s + 120``.
FLYDOCS_PARTIAL_SUCCEEDED_ORPHAN_THRESHOLD_S=1320

# Optional pre-extraction text rendering. ``none`` (default) sends only the
# binary to the LLM. ``docling`` runs Docling over the document first and
# splices the resulting Markdown into the user prompt ahead of the binary
Expand Down
Loading
Loading