From 369ecc5e0a11a18f10ad894ab16aa20d8c8af902 Mon Sep 17 00:00:00 2001 From: ancongui Date: Tue, 19 May 2026 12:55:38 +0200 Subject: [PATCH 1/2] chore: clean up ruff findings introduced by #23 / #24 / #25 The Lint check failed on all three merged PRs because the new files tripped ``F401`` (unused ``typing.Any``), ``SIM105`` (replace ``try``/``except``/``pass`` with ``contextlib.suppress``), ``UP041`` (replace ``asyncio.TimeoutError`` with builtin ``TimeoutError``), ``I001`` (import ordering), and ``F841`` (unused local). The other CI jobs (Unit tests, SDK Python, SDK Java, Typecheck, Docling) were all green on each PR; the merges weren't gated on Lint. This is the follow-up sweep so ``ruff check`` is clean on ``main``. 11 errors fixed (8 auto-fixed by ``ruff --fix``, 3 manual). --- src/flydocs/core/services/workers/bbox_reaper.py | 6 ++---- src/flydocs/core/services/workers/job_reaper.py | 6 ++---- tests/integration/test_eda_advisory_lock.py | 2 -- tests/integration/test_reaper_postgres.py | 1 - tests/unit/test_reapers.py | 1 - tests/unit/test_worker_concurrency.py | 1 - 6 files changed, 4 insertions(+), 13 deletions(-) diff --git a/src/flydocs/core/services/workers/bbox_reaper.py b/src/flydocs/core/services/workers/bbox_reaper.py index f636421..35b09cb 100644 --- a/src/flydocs/core/services/workers/bbox_reaper.py +++ b/src/flydocs/core/services/workers/bbox_reaper.py @@ -18,9 +18,9 @@ from __future__ import annotations import asyncio +import contextlib import logging import socket -from typing import Any from pyfly.eda import EventPublisher @@ -65,13 +65,11 @@ async def run_forever(self) -> None: await self._sweep() except Exception: # noqa: BLE001 logger.exception("BboxReaper sweep failed; will retry next interval") - try: + with contextlib.suppress(TimeoutError): await asyncio.wait_for( self._stop.wait(), timeout=max(1, self._settings.reaper_sweep_interval_s), ) - except asyncio.TimeoutError: - pass def stop(self) -> None: self._stop.set() diff --git a/src/flydocs/core/services/workers/job_reaper.py b/src/flydocs/core/services/workers/job_reaper.py index ca00620..941db3e 100644 --- a/src/flydocs/core/services/workers/job_reaper.py +++ b/src/flydocs/core/services/workers/job_reaper.py @@ -26,9 +26,9 @@ from __future__ import annotations import asyncio +import contextlib import logging import socket -from typing import Any from pyfly.eda import EventPublisher @@ -70,13 +70,11 @@ async def run_forever(self) -> None: await self._sweep() except Exception: # noqa: BLE001 logger.exception("JobReaper sweep failed; will retry next interval") - try: + with contextlib.suppress(TimeoutError): await asyncio.wait_for( self._stop.wait(), timeout=max(1, self._settings.reaper_sweep_interval_s), ) - except asyncio.TimeoutError: - pass def stop(self) -> None: self._stop.set() diff --git a/tests/integration/test_eda_advisory_lock.py b/tests/integration/test_eda_advisory_lock.py index 8a2f6ee..2d3063a 100644 --- a/tests/integration/test_eda_advisory_lock.py +++ b/tests/integration/test_eda_advisory_lock.py @@ -14,7 +14,6 @@ import os import pytest - from pyfly.eda.adapters.postgres import PostgresEventBus _PG_URL = os.environ.get("FLYDOCS_TEST_PG_URL") @@ -72,7 +71,6 @@ async def handler_b(envelope) -> None: await bus_b.start() try: # Publish 20 events; each must be delivered exactly once. - published_ids: list[str] = [] for i in range(20): # Use bus_a as the producer; events land in the shared outbox. await bus_a.publish( diff --git a/tests/integration/test_reaper_postgres.py b/tests/integration/test_reaper_postgres.py index 63853e5..c3179c0 100644 --- a/tests/integration/test_reaper_postgres.py +++ b/tests/integration/test_reaper_postgres.py @@ -22,7 +22,6 @@ import os from datetime import UTC, datetime, timedelta -from typing import Any from unittest.mock import AsyncMock, MagicMock import pytest diff --git a/tests/unit/test_reapers.py b/tests/unit/test_reapers.py index c774473..a7d40b4 100644 --- a/tests/unit/test_reapers.py +++ b/tests/unit/test_reapers.py @@ -12,7 +12,6 @@ import asyncio from datetime import UTC, datetime, timedelta -from typing import Any from unittest.mock import AsyncMock, MagicMock import pytest diff --git a/tests/unit/test_worker_concurrency.py b/tests/unit/test_worker_concurrency.py index 4170b11..4242be6 100644 --- a/tests/unit/test_worker_concurrency.py +++ b/tests/unit/test_worker_concurrency.py @@ -26,7 +26,6 @@ from flydocs.core.services.workers.job_worker import JobWorker from flydocs.interfaces.enums.job_status import JobStatus - # --------------------------------------------------------------- shared fixtures From b75675691a73842fafab1281c9fa303d2332da1b Mon Sep 17 00:00:00 2001 From: ancongui Date: Tue, 19 May 2026 12:58:53 +0200 Subject: [PATCH 2/2] chore: apply ``ruff format`` to the same files The Lint job runs both ``ruff check`` and ``ruff format --check``. The previous commit cleared the ``check`` half; this one runs ``ruff format`` over the 8 files in the same change set so the formatter half passes too. No behaviour change. --- .../core/services/jobs/cancel_job_handler.py | 4 +--- .../core/services/workers/job_worker.py | 22 +++++-------------- .../repositories/extraction_job_repository.py | 3 ++- .../integration/test_postgres_concurrency.py | 4 +--- tests/integration/test_reaper_postgres.py | 8 ++----- tests/unit/test_reapers.py | 10 ++------- tests/unit/test_submit_job_handler.py | 4 +--- tests/unit/test_worker_concurrency.py | 8 ++----- 8 files changed, 17 insertions(+), 46 deletions(-) diff --git a/src/flydocs/core/services/jobs/cancel_job_handler.py b/src/flydocs/core/services/jobs/cancel_job_handler.py index 873f6ce..4917565 100644 --- a/src/flydocs/core/services/jobs/cancel_job_handler.py +++ b/src/flydocs/core/services/jobs/cancel_job_handler.py @@ -60,6 +60,4 @@ async def do_handle(self, command: CancelJobCommand) -> JobStatusResponse | None job = await self._repository.get(command.job_id) if job is None: return None - raise JobNotCancellable( - f"Job {job.id!r} cannot be cancelled in status {job.status}" - ) + raise JobNotCancellable(f"Job {job.id!r} cannot be cancelled in status {job.status}") diff --git a/src/flydocs/core/services/workers/job_worker.py b/src/flydocs/core/services/workers/job_worker.py index 759251b..23c89f9 100644 --- a/src/flydocs/core/services/workers/job_worker.py +++ b/src/flydocs/core/services/workers/job_worker.py @@ -151,9 +151,7 @@ async def _process(self, job_id: str) -> None: # (or stale-RUNNING) job. ``None`` means another worker beat us # to it or the job was cancelled between our ``get`` and this # claim -- both are silent no-ops. - claimed = await self._repository.mark_running( - job.id, lease_seconds=self._settings.job_run_lease_s - ) + claimed = await self._repository.mark_running(job.id, lease_seconds=self._settings.job_run_lease_s) if claimed is None: logger.info( "Job %s could not be claimed -- already owned by another worker or " @@ -206,20 +204,15 @@ async def _process(self, job_id: str) -> None: # the bboxes change between PARTIAL_SUCCEEDED and SUCCEEDED. terminal_status = JobStatus.PARTIAL_SUCCEEDED if wants_bbox_refine else JobStatus.SUCCEEDED if wants_bbox_refine: - finalised = await self._repository.mark_partial_succeeded( - job.id, result=result_payload - ) + finalised = await self._repository.mark_partial_succeeded(job.id, result=result_payload) else: - finalised = await self._repository.mark_succeeded( - job.id, result=result_payload - ) + finalised = await self._repository.mark_succeeded(job.id, result=result_payload) if finalised is None: # Another worker (or the bbox leg) already advanced the # row past RUNNING. Our work is duplicate -- don't fire # the webhook a second time, don't republish. logger.info( - "Job %s already finalised by another worker -- " - "discarding our duplicate result", + "Job %s already finalised by another worker -- discarding our duplicate result", job.id, ) return @@ -283,9 +276,7 @@ async def _process(self, job_id: str) -> None: ) if terminal: - failed = await self._repository.mark_failed( - job.id, code=error_code, message=str(exc) - ) + failed = await self._repository.mark_failed(job.id, code=error_code, message=str(exc)) if failed is None: logger.info( "Job %s no longer in RUNNING -- another worker handled the " @@ -323,8 +314,7 @@ async def _process(self, job_id: str) -> None: requeued = await self._repository.requeue_for_retry(job.id) if requeued is None: logger.info( - "Job %s not requeueable (status changed under us) -- " - "skipping retry publish", + "Job %s not requeueable (status changed under us) -- skipping retry publish", job.id, ) else: diff --git a/src/flydocs/models/repositories/extraction_job_repository.py b/src/flydocs/models/repositories/extraction_job_repository.py index d400af1..ec18718 100644 --- a/src/flydocs/models/repositories/extraction_job_repository.py +++ b/src/flydocs/models/repositories/extraction_job_repository.py @@ -169,7 +169,8 @@ async def find_stale_queued( func.coalesce( ExtractionJob.started_at, ExtractionJob.created_at, - ) < cutoff, + ) + < cutoff, ) .limit(limit) ) diff --git a/tests/integration/test_postgres_concurrency.py b/tests/integration/test_postgres_concurrency.py index 2265ff9..c09eb7b 100644 --- a/tests/integration/test_postgres_concurrency.py +++ b/tests/integration/test_postgres_concurrency.py @@ -69,9 +69,7 @@ async def test_postgres_atomic_claim_single_winner(pg_repo: ExtractionJobReposit seeded = await _seed(pg_repo) n = 8 - results = await asyncio.gather( - *(pg_repo.mark_running(seeded.id, lease_seconds=300) for _ in range(n)) - ) + results = await asyncio.gather(*(pg_repo.mark_running(seeded.id, lease_seconds=300) for _ in range(n))) winners = [r for r in results if r is not None] assert len(winners) == 1, f"expected 1 winner, got {len(winners)} (lost-update?!)" assert winners[0].attempts == 1 diff --git a/tests/integration/test_reaper_postgres.py b/tests/integration/test_reaper_postgres.py index c3179c0..fd2b5f7 100644 --- a/tests/integration/test_reaper_postgres.py +++ b/tests/integration/test_reaper_postgres.py @@ -127,9 +127,7 @@ async def test_job_reaper_revives_all_three_job_orphan_classes( await reaper._sweep() - published_ids = [ - c.kwargs["payload"]["job_id"] for c in publisher.publish.await_args_list - ] + published_ids = [c.kwargs["payload"]["job_id"] for c in publisher.publish.await_args_list] assert submit_orphan.id in published_ids assert crashed_runner.id in published_ids assert retry_orphan.id in published_ids @@ -175,9 +173,7 @@ async def test_bbox_reaper_revives_both_bbox_orphan_classes( await reaper._sweep() - published_ids = [ - c.kwargs["payload"]["job_id"] for c in publisher.publish.await_args_list - ] + published_ids = [c.kwargs["payload"]["job_id"] for c in publisher.publish.await_args_list] assert publish_orphan.id in published_ids assert crashed_bbox.id in published_ids assert fresh.id not in published_ids diff --git a/tests/unit/test_reapers.py b/tests/unit/test_reapers.py index a7d40b4..8198fe6 100644 --- a/tests/unit/test_reapers.py +++ b/tests/unit/test_reapers.py @@ -223,10 +223,7 @@ async def test_job_reaper_republishes_stale_running_and_queued() -> None: await reaper._sweep() - published_ids = [ - call.kwargs["payload"]["job_id"] - for call in publisher.publish.await_args_list - ] + published_ids = [call.kwargs["payload"]["job_id"] for call in publisher.publish.await_args_list] assert stale_running.id in published_ids assert orphan_queued.id in published_ids assert fresh_running.id not in published_ids @@ -271,10 +268,7 @@ async def test_bbox_reaper_republishes_stale_refining_and_pending() -> None: await reaper._sweep() - published_ids = [ - call.kwargs["payload"]["job_id"] - for call in publisher.publish.await_args_list - ] + published_ids = [call.kwargs["payload"]["job_id"] for call in publisher.publish.await_args_list] assert stale_refining.id in published_ids assert pending_orphan.id in published_ids assert fresh_refining.id not in published_ids diff --git a/tests/unit/test_submit_job_handler.py b/tests/unit/test_submit_job_handler.py index f4d177d..1b9cd06 100644 --- a/tests/unit/test_submit_job_handler.py +++ b/tests/unit/test_submit_job_handler.py @@ -202,9 +202,7 @@ async def _get_by_key(key: str): ], docs=[_doc_spec()], ) - response = await handler.do_handle( - SubmitJobCommand(request=request, idempotency_key="dupe-key") - ) + response = await handler.do_handle(SubmitJobCommand(request=request, idempotency_key="dupe-key")) assert response.job_id == "winner-job-id" assert response.status is JobStatus.QUEUED diff --git a/tests/unit/test_worker_concurrency.py b/tests/unit/test_worker_concurrency.py index 4242be6..939516d 100644 --- a/tests/unit/test_worker_concurrency.py +++ b/tests/unit/test_worker_concurrency.py @@ -223,9 +223,7 @@ async def test_job_worker_skips_webhook_when_finalise_returns_none() -> None: ], } ], - "documents": [ - {"filename": "a.pdf", "content_base64": "Zm9v", "content_type": "application/pdf"} - ], + "documents": [{"filename": "a.pdf", "content_base64": "Zm9v", "content_type": "application/pdf"}], }, ) repo = _Repo(job, finalise_returns_none=True) @@ -257,9 +255,7 @@ async def test_job_worker_retry_path_uses_atomic_requeue() -> None: ], } ], - "documents": [ - {"filename": "a.pdf", "content_base64": "Zm9v", "content_type": "application/pdf"} - ], + "documents": [{"filename": "a.pdf", "content_base64": "Zm9v", "content_type": "application/pdf"}], }, ) repo = _Repo(job)