Tests for async result processing#1129
Conversation
✅ Deploy Preview for antenna-ssec canceled.
|
✅ Deploy Preview for antenna-preview canceled.
|
📝 WalkthroughWalkthroughAdds a VS Code debug attach config and a helper script for running tests with debugpy, a new test module covering error scenarios in pipeline result processing, and small TaskStateManager changes (lock key helper and a get_progress read-only view plus in-place pending updates). Changes
Sequence Diagram(s)sequenceDiagram
participant NATS
participant Worker as Celery Worker
participant State as TaskStateManager (Redis)
participant DB
participant API
NATS->>Worker: publish pipeline result (success/error)
Worker->>State: acquire lock (using _lock_key(job_id))
Worker->>State: _get_progress -> update pending, compute progress
State-->>Worker: progress snapshot
alt result has image_id and detections
Worker->>DB: save detections
else result is error or missing image_id
Worker->>DB: record failure / skip save
end
Worker->>State: update_state / persist progress
Worker->>API: (optionally) call result endpoint / enqueue follow-up task
Worker-->>NATS: ack
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3 | ❌ 1❌ Failed checks (1 inconclusive)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
No actionable comments were generated in the recent review. 🎉 🧹 Recent nitpick comments
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
This pull request adds comprehensive test coverage for async result processing in the jobs module, specifically focusing on error handling paths when PipelineResultsError is received. The PR also includes developer tooling improvements with a debug helper script and VS Code launch configuration for debugging tests.
Changes:
- Added comprehensive E2E tests for error handling in
process_nats_pipeline_resulttask - Refactored lock key generation in
TaskStateManagerto a separate function for testability - Added debugging utilities (shell script and VS Code configuration) to facilitate test debugging
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| scripts/debug_tests.sh | New helper script to launch tests with debugpy for VS Code debugging |
| ami/ml/orchestration/task_state.py | Extracted _lock_key function for reusability in tests |
| ami/jobs/test_tasks.py | New comprehensive test suite covering error handling scenarios in pipeline result processing |
| .vscode/launch.json | Added "Attach: Tests" debug configuration for test debugging |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@ami/jobs/test_tasks.py`:
- Around line 396-404: The test incorrectly assumes apply_async positional args;
update the assertions to extract kwargs robustly from mock_apply_async.call_args
by handling both calling conventions: inspect call =
mock_apply_async.call_args[0] and call_kwargs = mock_apply_async.call_args[1],
then determine the task kwargs either from call_kwargs.get('kwargs') or (if the
old convention used) from the second positional element; assert on
process_nats_pipeline_result.delay parameters by checking that the resolved
task_kwargs contains job_id == self.job.pk, reply_subject ==
"test.reply.error.1", and that "error" is in task_kwargs["result_data"]
(referencing mock_apply_async, process_nats_pipeline_result.delay, call_args,
and task_kwargs to locate the code).
In `@scripts/debug_tests.sh`:
- Around line 1-7: Add a shebang to the top of the debug_tests.sh script to
define the shell interpreter and replace the unquoted "$*" usage with the safe
"$@" form in the manage.py test invocation; update the script that defines the
docker run command (the file debug_tests.sh and the command invoking python -m
debugpy ... manage.py test $*) to start with a proper shebang (e.g.,
#!/usr/bin/env bash) and pass arguments as "$@" so arguments containing spaces
are preserved.
🧹 Nitpick comments (2)
ami/jobs/test_tasks.py (2)
240-240: Unusedmock_manager_classparameter — consider suppressing or documenting.The
@patchdecorator injects this argument, but the test doesn't use NATS (it tests lock contention before NATS is reached). Either add a brief comment explaining why it's unused or rename to_mock_manager_classto signal intent.Proposed fix
`@patch`("ami.jobs.tasks.TaskQueueManager") - def test_process_nats_pipeline_result_error_concurrent_locking(self, mock_manager_class): + def test_process_nats_pipeline_result_error_concurrent_locking(self, _mock_manager_class):
89-97:_get_progresshas side effects — calling it in assertions mutates Redis state.
_get_progressinternally callscache.set(...)to update the pending images list (Line 106 in task_state.py). While the empty set passed here means the list is effectively unchanged, this couples the assertion helper to internal implementation details and could cause subtle ordering issues if tests are extended later.Consider adding a brief comment in
_assert_progress_updatednoting this side effect, so future test authors are aware.
Add a read-only get_progress(stage) method that returns a progress snapshot without acquiring a lock or mutating state. Use it in test_tasks.py instead of calling the private _get_progress() directly. Co-Authored-By: Claude <noreply@anthropic.com>
Three reviewers were confused by how mock.call_args works here. .delay(**kw) passes ((), kw) as two positional args to apply_async, which is different from apply_async(kwargs=kw). Co-Authored-By: Claude <noreply@anthropic.com>
mihow
left a comment
There was a problem hiding this comment.
yay! thanks for the end to end tests
Clarify naming to distinguish mutating vs read-only methods: - _commit_update(): private, writes mutations to Redis, returns progress - get_progress(): public, read-only snapshot (added in #1129) - update_state(): public API, acquires lock, calls _commit_update() Co-Authored-By: Claude <noreply@anthropic.com>
* merge * Update ML job counts in async case * Update date picker version and tweak layout logic (#1105) * fix: update date picker version and tweak layout logic * feat: set start month based on selected date * fix: Properly handle async job state with celery tasks (#1114) * merge * fix: Properly handle async job state with celery tasks * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Delete implemented plan --------- Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * PSv2: Implement queue clean-up upon job completion (#1113) * merge * feat: PSv2 - Queue/redis clean-up upon job completion * fix: catch specific exception * chore: move tests to a subdir --------- Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com> Co-authored-by: Michael Bunsen <notbot@gmail.com> * fix: PSv2: Workers should not try to fetch tasks from v1 jobs (#1118) Introduces the dispatch_mode field on the Job model to track how each job dispatches its workload. This allows API clients (including the AMI worker) to filter jobs by dispatch mode — for example, fetching only async_api jobs so workers don't pull synchronous or internal jobs. JobDispatchMode enum (ami/jobs/models.py): internal — work handled entirely within the platform (Celery worker, no external calls). Default for all jobs. sync_api — worker calls an external processing service API synchronously and waits for each response. async_api — worker publishes items to NATS for external processing service workers to pick up independently. Database and Model Changes: Added dispatch_mode CharField with TextChoices, defaulting to internal, with the migration in ami/jobs/migrations/0019_job_dispatch_mode.py. ML jobs set dispatch_mode = async_api when the project's async_pipeline_workers feature flag is enabled. ML jobs set dispatch_mode = sync_api on the synchronous processing path (previously unset). API and Filtering: dispatch_mode is exposed (read-only) in job list and detail serializers. Filterable via query parameter: ?dispatch_mode=async_api The /tasks endpoint now returns 400 for non-async_api jobs, since only those have NATS tasks to fetch. Architecture doc: docs/claude/job-dispatch-modes.md documents the three modes, naming decisions, and per-job-type mapping. --------- Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com> Co-authored-by: Michael Bunsen <notbot@gmail.com> Co-authored-by: Claude <noreply@anthropic.com> * PSv2 cleanup: use is_complete() and dispatch_mode in job progress handler (#1125) * refactor: use is_complete() and dispatch_mode in job progress handler Replace hardcoded `stage == "results"` check with `job.progress.is_complete()` which verifies ALL stages are done, making it work for any job type. Replace feature flag check in cleanup with `dispatch_mode == ASYNC_API` which is immutable for the job's lifetime and more correct than re-reading a mutable flag that could change between job creation and completion. Co-Authored-By: Claude <noreply@anthropic.com> * test: update cleanup tests for is_complete() and dispatch_mode checks Set dispatch_mode=ASYNC_API on test jobs to match the new cleanup guard. Complete all stages (collect, process, results) in the completion test since is_complete() correctly requires all stages to be done. Co-Authored-By: Claude <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com> * track captures and failures * Update tests, CR feedback, log error images * CR feedback * fix type checking * refactor: rename _get_progress to _commit_update in TaskStateManager Clarify naming to distinguish mutating vs read-only methods: - _commit_update(): private, writes mutations to Redis, returns progress - get_progress(): public, read-only snapshot (added in #1129) - update_state(): public API, acquires lock, calls _commit_update() Co-Authored-By: Claude <noreply@anthropic.com> * fix: unify FAILURE_THRESHOLD and convert TaskProgress to dataclass - Single FAILURE_THRESHOLD constant in tasks.py, imported by models.py - Fix async path to use `> FAILURE_THRESHOLD` (was `>=`) to match the sync path's boundary behavior at exactly 50% - Convert TaskProgress from namedtuple to dataclass with defaults, so new fields don't break existing callers Co-Authored-By: Claude <noreply@anthropic.com> * refactor: rename TaskProgress to JobStateProgress Clarify that this dataclass tracks job-level progress in Redis, not individual task/image progress. Aligns with the naming of JobProgress (the Django/Pydantic model equivalent). Co-Authored-By: Claude <noreply@anthropic.com> * docs: update NATS todo and planning docs with session learnings Mark connection handling as done (PR #1130), add worktree/remote mapping and docker testing notes for future sessions. Co-Authored-By: Claude <noreply@anthropic.com> * Rename TaskStateManager to AsyncJobStateManager * Track results counts in the job itself vs Redis * small simplification * Reset counts to 0 on reset * chore: remove local planning docs from PR branch Co-Authored-By: Claude <noreply@anthropic.com> * docs: clarify three-layer job state architecture in docstrings Explain the relationship between AsyncJobStateManager (Redis), JobProgress (JSONB), and JobState (enum). Clarify that all counts in JobStateProgress refer to source images (captures). Co-Authored-By: Claude <noreply@anthropic.com> --------- Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com> Co-authored-by: Anna Viklund <annamariaviklund@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Michael Bunsen <notbot@gmail.com> Co-authored-by: Claude <noreply@anthropic.com>
* merge * Tests for async result processing * fix formatting * CR feedback * refactor: add public get_progress() to TaskStateManager Add a read-only get_progress(stage) method that returns a progress snapshot without acquiring a lock or mutating state. Use it in test_tasks.py instead of calling the private _get_progress() directly. Co-Authored-By: Claude <noreply@anthropic.com> * docs: clarify Celery .delay() vs .apply_async() calling convention Three reviewers were confused by how mock.call_args works here. .delay(**kw) passes ((), kw) as two positional args to apply_async, which is different from apply_async(kwargs=kw). Co-Authored-By: Claude <noreply@anthropic.com> --------- Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com> Co-authored-by: Michael Bunsen <notbot@gmail.com> Co-authored-by: Claude <noreply@anthropic.com>
* merge * Update ML job counts in async case * Update date picker version and tweak layout logic (RolnickLab#1105) * fix: update date picker version and tweak layout logic * feat: set start month based on selected date * fix: Properly handle async job state with celery tasks (RolnickLab#1114) * merge * fix: Properly handle async job state with celery tasks * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Delete implemented plan --------- Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * PSv2: Implement queue clean-up upon job completion (RolnickLab#1113) * merge * feat: PSv2 - Queue/redis clean-up upon job completion * fix: catch specific exception * chore: move tests to a subdir --------- Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com> Co-authored-by: Michael Bunsen <notbot@gmail.com> * fix: PSv2: Workers should not try to fetch tasks from v1 jobs (RolnickLab#1118) Introduces the dispatch_mode field on the Job model to track how each job dispatches its workload. This allows API clients (including the AMI worker) to filter jobs by dispatch mode — for example, fetching only async_api jobs so workers don't pull synchronous or internal jobs. JobDispatchMode enum (ami/jobs/models.py): internal — work handled entirely within the platform (Celery worker, no external calls). Default for all jobs. sync_api — worker calls an external processing service API synchronously and waits for each response. async_api — worker publishes items to NATS for external processing service workers to pick up independently. Database and Model Changes: Added dispatch_mode CharField with TextChoices, defaulting to internal, with the migration in ami/jobs/migrations/0019_job_dispatch_mode.py. ML jobs set dispatch_mode = async_api when the project's async_pipeline_workers feature flag is enabled. ML jobs set dispatch_mode = sync_api on the synchronous processing path (previously unset). API and Filtering: dispatch_mode is exposed (read-only) in job list and detail serializers. Filterable via query parameter: ?dispatch_mode=async_api The /tasks endpoint now returns 400 for non-async_api jobs, since only those have NATS tasks to fetch. Architecture doc: docs/claude/job-dispatch-modes.md documents the three modes, naming decisions, and per-job-type mapping. --------- Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com> Co-authored-by: Michael Bunsen <notbot@gmail.com> Co-authored-by: Claude <noreply@anthropic.com> * PSv2 cleanup: use is_complete() and dispatch_mode in job progress handler (RolnickLab#1125) * refactor: use is_complete() and dispatch_mode in job progress handler Replace hardcoded `stage == "results"` check with `job.progress.is_complete()` which verifies ALL stages are done, making it work for any job type. Replace feature flag check in cleanup with `dispatch_mode == ASYNC_API` which is immutable for the job's lifetime and more correct than re-reading a mutable flag that could change between job creation and completion. Co-Authored-By: Claude <noreply@anthropic.com> * test: update cleanup tests for is_complete() and dispatch_mode checks Set dispatch_mode=ASYNC_API on test jobs to match the new cleanup guard. Complete all stages (collect, process, results) in the completion test since is_complete() correctly requires all stages to be done. Co-Authored-By: Claude <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com> * track captures and failures * Update tests, CR feedback, log error images * CR feedback * fix type checking * refactor: rename _get_progress to _commit_update in TaskStateManager Clarify naming to distinguish mutating vs read-only methods: - _commit_update(): private, writes mutations to Redis, returns progress - get_progress(): public, read-only snapshot (added in RolnickLab#1129) - update_state(): public API, acquires lock, calls _commit_update() Co-Authored-By: Claude <noreply@anthropic.com> * fix: unify FAILURE_THRESHOLD and convert TaskProgress to dataclass - Single FAILURE_THRESHOLD constant in tasks.py, imported by models.py - Fix async path to use `> FAILURE_THRESHOLD` (was `>=`) to match the sync path's boundary behavior at exactly 50% - Convert TaskProgress from namedtuple to dataclass with defaults, so new fields don't break existing callers Co-Authored-By: Claude <noreply@anthropic.com> * refactor: rename TaskProgress to JobStateProgress Clarify that this dataclass tracks job-level progress in Redis, not individual task/image progress. Aligns with the naming of JobProgress (the Django/Pydantic model equivalent). Co-Authored-By: Claude <noreply@anthropic.com> * docs: update NATS todo and planning docs with session learnings Mark connection handling as done (PR RolnickLab#1130), add worktree/remote mapping and docker testing notes for future sessions. Co-Authored-By: Claude <noreply@anthropic.com> * Rename TaskStateManager to AsyncJobStateManager * Track results counts in the job itself vs Redis * small simplification * Reset counts to 0 on reset * chore: remove local planning docs from PR branch Co-Authored-By: Claude <noreply@anthropic.com> * docs: clarify three-layer job state architecture in docstrings Explain the relationship between AsyncJobStateManager (Redis), JobProgress (JSONB), and JobState (enum). Clarify that all counts in JobStateProgress refer to source images (captures). Co-Authored-By: Claude <noreply@anthropic.com> --------- Co-authored-by: Carlos Garcia Jurado Suarez <carlos@irreverentlabs.com> Co-authored-by: Anna Viklund <annamariaviklund@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Michael Bunsen <notbot@gmail.com> Co-authored-by: Claude <noreply@anthropic.com>
Summary
This pull request adds tests for the result processing of async tasks.
It also adds a helper script and launch target for debugging tests.
Checklist
Summary by CodeRabbit
Tests
Chores
Refactor