Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/flydocs/core/services/jobs/cancel_job_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,4 @@ async def do_handle(self, command: CancelJobCommand) -> JobStatusResponse | None
job = await self._repository.get(command.job_id)
if job is None:
return None
raise JobNotCancellable(
f"Job {job.id!r} cannot be cancelled in status {job.status}"
)
raise JobNotCancellable(f"Job {job.id!r} cannot be cancelled in status {job.status}")
6 changes: 2 additions & 4 deletions src/flydocs/core/services/workers/bbox_reaper.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
from __future__ import annotations

import asyncio
import contextlib
import logging
import socket
from typing import Any

from pyfly.eda import EventPublisher

Expand Down Expand Up @@ -65,13 +65,11 @@ async def run_forever(self) -> None:
await self._sweep()
except Exception: # noqa: BLE001
logger.exception("BboxReaper sweep failed; will retry next interval")
try:
with contextlib.suppress(TimeoutError):
await asyncio.wait_for(
self._stop.wait(),
timeout=max(1, self._settings.reaper_sweep_interval_s),
)
except asyncio.TimeoutError:
pass

def stop(self) -> None:
self._stop.set()
Expand Down
6 changes: 2 additions & 4 deletions src/flydocs/core/services/workers/job_reaper.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
from __future__ import annotations

import asyncio
import contextlib
import logging
import socket
from typing import Any

from pyfly.eda import EventPublisher

Expand Down Expand Up @@ -70,13 +70,11 @@ async def run_forever(self) -> None:
await self._sweep()
except Exception: # noqa: BLE001
logger.exception("JobReaper sweep failed; will retry next interval")
try:
with contextlib.suppress(TimeoutError):
await asyncio.wait_for(
self._stop.wait(),
timeout=max(1, self._settings.reaper_sweep_interval_s),
)
except asyncio.TimeoutError:
pass

def stop(self) -> None:
self._stop.set()
Expand Down
22 changes: 6 additions & 16 deletions src/flydocs/core/services/workers/job_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,7 @@ async def _process(self, job_id: str) -> None:
# (or stale-RUNNING) job. ``None`` means another worker beat us
# to it or the job was cancelled between our ``get`` and this
# claim -- both are silent no-ops.
claimed = await self._repository.mark_running(
job.id, lease_seconds=self._settings.job_run_lease_s
)
claimed = await self._repository.mark_running(job.id, lease_seconds=self._settings.job_run_lease_s)
if claimed is None:
logger.info(
"Job %s could not be claimed -- already owned by another worker or "
Expand Down Expand Up @@ -206,20 +204,15 @@ async def _process(self, job_id: str) -> None:
# the bboxes change between PARTIAL_SUCCEEDED and SUCCEEDED.
terminal_status = JobStatus.PARTIAL_SUCCEEDED if wants_bbox_refine else JobStatus.SUCCEEDED
if wants_bbox_refine:
finalised = await self._repository.mark_partial_succeeded(
job.id, result=result_payload
)
finalised = await self._repository.mark_partial_succeeded(job.id, result=result_payload)
else:
finalised = await self._repository.mark_succeeded(
job.id, result=result_payload
)
finalised = await self._repository.mark_succeeded(job.id, result=result_payload)
if finalised is None:
# Another worker (or the bbox leg) already advanced the
# row past RUNNING. Our work is duplicate -- don't fire
# the webhook a second time, don't republish.
logger.info(
"Job %s already finalised by another worker -- "
"discarding our duplicate result",
"Job %s already finalised by another worker -- discarding our duplicate result",
job.id,
)
return
Expand Down Expand Up @@ -283,9 +276,7 @@ async def _process(self, job_id: str) -> None:
)

if terminal:
failed = await self._repository.mark_failed(
job.id, code=error_code, message=str(exc)
)
failed = await self._repository.mark_failed(job.id, code=error_code, message=str(exc))
if failed is None:
logger.info(
"Job %s no longer in RUNNING -- another worker handled the "
Expand Down Expand Up @@ -323,8 +314,7 @@ async def _process(self, job_id: str) -> None:
requeued = await self._repository.requeue_for_retry(job.id)
if requeued is None:
logger.info(
"Job %s not requeueable (status changed under us) -- "
"skipping retry publish",
"Job %s not requeueable (status changed under us) -- skipping retry publish",
job.id,
)
else:
Expand Down
3 changes: 2 additions & 1 deletion src/flydocs/models/repositories/extraction_job_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ async def find_stale_queued(
func.coalesce(
ExtractionJob.started_at,
ExtractionJob.created_at,
) < cutoff,
)
< cutoff,
)
.limit(limit)
)
Expand Down
2 changes: 0 additions & 2 deletions tests/integration/test_eda_advisory_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import os

import pytest

from pyfly.eda.adapters.postgres import PostgresEventBus

_PG_URL = os.environ.get("FLYDOCS_TEST_PG_URL")
Expand Down Expand Up @@ -72,7 +71,6 @@ async def handler_b(envelope) -> None:
await bus_b.start()
try:
# Publish 20 events; each must be delivered exactly once.
published_ids: list[str] = []
for i in range(20):
# Use bus_a as the producer; events land in the shared outbox.
await bus_a.publish(
Expand Down
4 changes: 1 addition & 3 deletions tests/integration/test_postgres_concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ async def test_postgres_atomic_claim_single_winner(pg_repo: ExtractionJobReposit
seeded = await _seed(pg_repo)

n = 8
results = await asyncio.gather(
*(pg_repo.mark_running(seeded.id, lease_seconds=300) for _ in range(n))
)
results = await asyncio.gather(*(pg_repo.mark_running(seeded.id, lease_seconds=300) for _ in range(n)))
winners = [r for r in results if r is not None]
assert len(winners) == 1, f"expected 1 winner, got {len(winners)} (lost-update?!)"
assert winners[0].attempts == 1
Expand Down
9 changes: 2 additions & 7 deletions tests/integration/test_reaper_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import os
from datetime import UTC, datetime, timedelta
from typing import Any
from unittest.mock import AsyncMock, MagicMock

import pytest
Expand Down Expand Up @@ -128,9 +127,7 @@ async def test_job_reaper_revives_all_three_job_orphan_classes(

await reaper._sweep()

published_ids = [
c.kwargs["payload"]["job_id"] for c in publisher.publish.await_args_list
]
published_ids = [c.kwargs["payload"]["job_id"] for c in publisher.publish.await_args_list]
assert submit_orphan.id in published_ids
assert crashed_runner.id in published_ids
assert retry_orphan.id in published_ids
Expand Down Expand Up @@ -176,9 +173,7 @@ async def test_bbox_reaper_revives_both_bbox_orphan_classes(

await reaper._sweep()

published_ids = [
c.kwargs["payload"]["job_id"] for c in publisher.publish.await_args_list
]
published_ids = [c.kwargs["payload"]["job_id"] for c in publisher.publish.await_args_list]
assert publish_orphan.id in published_ids
assert crashed_bbox.id in published_ids
assert fresh.id not in published_ids
Expand Down
11 changes: 2 additions & 9 deletions tests/unit/test_reapers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

import asyncio
from datetime import UTC, datetime, timedelta
from typing import Any
from unittest.mock import AsyncMock, MagicMock

import pytest
Expand Down Expand Up @@ -224,10 +223,7 @@ async def test_job_reaper_republishes_stale_running_and_queued() -> None:

await reaper._sweep()

published_ids = [
call.kwargs["payload"]["job_id"]
for call in publisher.publish.await_args_list
]
published_ids = [call.kwargs["payload"]["job_id"] for call in publisher.publish.await_args_list]
assert stale_running.id in published_ids
assert orphan_queued.id in published_ids
assert fresh_running.id not in published_ids
Expand Down Expand Up @@ -272,10 +268,7 @@ async def test_bbox_reaper_republishes_stale_refining_and_pending() -> None:

await reaper._sweep()

published_ids = [
call.kwargs["payload"]["job_id"]
for call in publisher.publish.await_args_list
]
published_ids = [call.kwargs["payload"]["job_id"] for call in publisher.publish.await_args_list]
assert stale_refining.id in published_ids
assert pending_orphan.id in published_ids
assert fresh_refining.id not in published_ids
Expand Down
4 changes: 1 addition & 3 deletions tests/unit/test_submit_job_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,7 @@ async def _get_by_key(key: str):
],
docs=[_doc_spec()],
)
response = await handler.do_handle(
SubmitJobCommand(request=request, idempotency_key="dupe-key")
)
response = await handler.do_handle(SubmitJobCommand(request=request, idempotency_key="dupe-key"))

assert response.job_id == "winner-job-id"
assert response.status is JobStatus.QUEUED
Expand Down
9 changes: 2 additions & 7 deletions tests/unit/test_worker_concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from flydocs.core.services.workers.job_worker import JobWorker
from flydocs.interfaces.enums.job_status import JobStatus


# --------------------------------------------------------------- shared fixtures


Expand Down Expand Up @@ -224,9 +223,7 @@ async def test_job_worker_skips_webhook_when_finalise_returns_none() -> None:
],
}
],
"documents": [
{"filename": "a.pdf", "content_base64": "Zm9v", "content_type": "application/pdf"}
],
"documents": [{"filename": "a.pdf", "content_base64": "Zm9v", "content_type": "application/pdf"}],
},
)
repo = _Repo(job, finalise_returns_none=True)
Expand Down Expand Up @@ -258,9 +255,7 @@ async def test_job_worker_retry_path_uses_atomic_requeue() -> None:
],
}
],
"documents": [
{"filename": "a.pdf", "content_base64": "Zm9v", "content_type": "application/pdf"}
],
"documents": [{"filename": "a.pdf", "content_base64": "Zm9v", "content_type": "application/pdf"}],
},
)
repo = _Repo(job)
Expand Down
Loading