Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion src/Model/Table/QueuedJobsTable.php
Original file line number Diff line number Diff line change
Expand Up @@ -276,14 +276,31 @@ protected function jobTask(string $jobType): string {
}

/**
* Whether a not-yet-completed job exists for the given reference.
*
* By default any row with `completed IS NULL` counts — including a job
* that was fetched by a worker which then died (OOM, timeout, kill)
* without ever marking the row completed or failed. Such a row stays
* `completed IS NULL` indefinitely, so callers that gate on `isQueued()`
* (e.g. a non-concurrent scheduler) can wedge permanently behind a job
* that will never make progress.
*
* Pass `$staleTimeout` (seconds) to discount those abandoned rows: a row
* that was fetched longer ago than the timeout and is still not completed
* is presumed dead and no longer counts as queued. Rows that have not been
* fetched yet, or were fetched within the window, still count. The default
* `null` preserves the original behaviour and is fully backward compatible.
*
* @param string $reference
* @param string|null $jobTask
* @param int|null $staleTimeout Seconds after which a fetched-but-not-completed
* row is treated as abandoned and excluded. `null` (default) disables the check.
*
* @throws \InvalidArgumentException
*
* @return bool
*/
public function isQueued(string $reference, ?string $jobTask = null): bool {
public function isQueued(string $reference, ?string $jobTask = null, ?int $staleTimeout = null): bool {
if (!$reference) {
throw new InvalidArgumentException('A reference is needed');
}
Expand All @@ -295,6 +312,12 @@ public function isQueued(string $reference, ?string $jobTask = null): bool {
if ($jobTask) {
$conditions['job_task'] = $jobTask;
}
if ($staleTimeout !== null) {
$conditions['OR'] = [
'fetched IS' => null,
'fetched >=' => $this->getDateTime()->subSeconds($staleTimeout),
];
}

return (bool)$this->find()->where($conditions)->select(['id'])->first();
}
Expand Down
41 changes: 41 additions & 0 deletions tests/TestCase/Model/Table/QueuedJobsTableTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,47 @@ public function testIsQueued() {
$this->assertFalse($result);
}

/**
* A job fetched longer ago than the stale timeout, but never completed,
* is presumed abandoned and excluded once `$staleTimeout` is given.
*
* @return void
*/
public function testIsQueuedStaleTimeout() {
$queuedJob = $this->QueuedJobs->newEntity([
'key' => 'key',
'job_task' => 'FooBar',
'reference' => 'foo-bar',
'fetched' => (new DateTime())->subSeconds(120),
]);
$this->QueuedJobs->saveOrFail($queuedJob);

// Without the timeout the abandoned row still counts (unchanged default).
$this->assertTrue($this->QueuedJobs->isQueued('foo-bar'));

// Fetched 120s ago, timeout 60s => presumed dead, excluded.
$this->assertFalse($this->QueuedJobs->isQueued('foo-bar', null, 60));

// Fetched 120s ago, timeout 300s => still within window, counts.
$this->assertTrue($this->QueuedJobs->isQueued('foo-bar', null, 300));
}

/**
* A not-yet-fetched job always counts as queued, regardless of timeout.
*
* @return void
*/
public function testIsQueuedStaleTimeoutIgnoresUnfetched() {
$queuedJob = $this->QueuedJobs->newEntity([
'key' => 'key',
'job_task' => 'FooBar',
'reference' => 'foo-bar',
]);
$this->QueuedJobs->saveOrFail($queuedJob);

$this->assertTrue($this->QueuedJobs->isQueued('foo-bar', null, 1));
}

/**
* @return void
*/
Expand Down
Loading