diff --git a/src/Model/Table/QueuedJobsTable.php b/src/Model/Table/QueuedJobsTable.php index d94e07ae..9065fac9 100644 --- a/src/Model/Table/QueuedJobsTable.php +++ b/src/Model/Table/QueuedJobsTable.php @@ -276,14 +276,31 @@ protected function jobTask(string $jobType): string { } /** + * Whether a not-yet-completed job exists for the given reference. + * + * By default any row with `completed IS NULL` counts — including a job + * that was fetched by a worker which then died (OOM, timeout, kill) + * without ever marking the row completed or failed. Such a row stays + * `completed IS NULL` indefinitely, so callers that gate on `isQueued()` + * (e.g. a non-concurrent scheduler) can wedge permanently behind a job + * that will never make progress. + * + * Pass `$staleTimeout` (seconds) to discount those abandoned rows: a row + * that was fetched longer ago than the timeout and is still not completed + * is presumed dead and no longer counts as queued. Rows that have not been + * fetched yet, or were fetched within the window, still count. The default + * `null` preserves the original behaviour and is fully backward compatible. + * * @param string $reference * @param string|null $jobTask + * @param int|null $staleTimeout Seconds after which a fetched-but-not-completed + * row is treated as abandoned and excluded. `null` (default) disables the check. * * @throws \InvalidArgumentException * * @return bool */ - public function isQueued(string $reference, ?string $jobTask = null): bool { + public function isQueued(string $reference, ?string $jobTask = null, ?int $staleTimeout = null): bool { if (!$reference) { throw new InvalidArgumentException('A reference is needed'); } @@ -295,6 +312,12 @@ public function isQueued(string $reference, ?string $jobTask = null): bool { if ($jobTask) { $conditions['job_task'] = $jobTask; } + if ($staleTimeout !== null) { + $conditions['OR'] = [ + 'fetched IS' => null, + 'fetched >=' => $this->getDateTime()->subSeconds($staleTimeout), + ]; + } return (bool)$this->find()->where($conditions)->select(['id'])->first(); } diff --git a/tests/TestCase/Model/Table/QueuedJobsTableTest.php b/tests/TestCase/Model/Table/QueuedJobsTableTest.php index 3dd24c0d..3843c69d 100644 --- a/tests/TestCase/Model/Table/QueuedJobsTableTest.php +++ b/tests/TestCase/Model/Table/QueuedJobsTableTest.php @@ -834,6 +834,47 @@ public function testIsQueued() { $this->assertFalse($result); } + /** + * A job fetched longer ago than the stale timeout, but never completed, + * is presumed abandoned and excluded once `$staleTimeout` is given. + * + * @return void + */ + public function testIsQueuedStaleTimeout() { + $queuedJob = $this->QueuedJobs->newEntity([ + 'key' => 'key', + 'job_task' => 'FooBar', + 'reference' => 'foo-bar', + 'fetched' => (new DateTime())->subSeconds(120), + ]); + $this->QueuedJobs->saveOrFail($queuedJob); + + // Without the timeout the abandoned row still counts (unchanged default). + $this->assertTrue($this->QueuedJobs->isQueued('foo-bar')); + + // Fetched 120s ago, timeout 60s => presumed dead, excluded. + $this->assertFalse($this->QueuedJobs->isQueued('foo-bar', null, 60)); + + // Fetched 120s ago, timeout 300s => still within window, counts. + $this->assertTrue($this->QueuedJobs->isQueued('foo-bar', null, 300)); + } + + /** + * A not-yet-fetched job always counts as queued, regardless of timeout. + * + * @return void + */ + public function testIsQueuedStaleTimeoutIgnoresUnfetched() { + $queuedJob = $this->QueuedJobs->newEntity([ + 'key' => 'key', + 'job_task' => 'FooBar', + 'reference' => 'foo-bar', + ]); + $this->QueuedJobs->saveOrFail($queuedJob); + + $this->assertTrue($this->QueuedJobs->isQueued('foo-bar', null, 1)); + } + /** * @return void */