Atomic provides a queue manager under Engine\Atomic\Queue\Managers\Manager with database and Redis drivers.
use Engine\Atomic\Queue\Managers\Manager;
$queue = new Manager('default');
$queue->push(
[App\Jobs\SendEmail::class, 'handle'],
['email' => 'user@example.com', 'subject' => 'Hello']
);
$queue->push(
[App\Jobs\SendEmail::class, 'handle'],
['user@example.com', 'Hello']
);
$queue->push(
[App\Jobs\SendEmail::class, 'handle'],
['email' => 'user@example.com'],
[
'delay' => 60,
'priority' => 1,
'timeout' => 30,
'max_attempts' => 3,
'retry_delay' => 10,
'ttl' => 3600,
]
);push() signature:
push(array $payload, array $data = [], array $options = [], string $uuid = ''): boolHandler rules:
- handler format must be
[ClassName::class, 'method'] - the class and method are validated before enqueueing
- indexed
datais passed to the handler positionally - associative
datais matched to handler parameter names via reflection
Required queue options are:
delayprioritytimeoutmax_attemptsretry_delayttl
If an option is omitted in push(), the manager fills it from the configured queue definition.
The manager also exposes these runtime methods:
pop_batch()process_job(array $job)release(array $job, int $delay)mark_failed(array $job, \Throwable $exception)mark_completed(array $job)cancel(string $uuid)Redis onlyis_cancel_requested(string $uuid)Redis onlymark_cancelled(array $job, ?string $reason = null)Redis onlyretry()retry_by_uuid(string $uuid)delete_job(string $uuid)
Queue telemetry is emitted automatically through TelemetryManager when jobs are created, fetched, retried, completed, failed, recovered, cancellation-requested, or cancelled.
Queue job state values are:
pendingrunningcancel_requestedcancelledcompletedfailed
Use state as the standard queue job state field. UI/API filters should use state.
Telemetry state filters:
?state=pending
?state=running
?state=cancel_requested
?state=cancelled
?state=completed
?state=failed
Redis stores every active and finished job in its registry with a state field and keeps state-specific sorted indexes. The database driver maps rows in the active jobs table to pending or running, rows in the failed table to failed, and rows in the completed table to completed. Database cancellation states are not available.
php atomic queue/db
php atomic queue/worker <queue_name>
php atomic queue/monitor
php atomic queue/retry
php atomic queue/retry <job_uuid>
php atomic queue/retry <queue_name>
php atomic queue/delete <job_uuid>Test helpers exposed by Engine\Atomic\App\System:
php atomic queue/test success
php atomic queue/test failed
php atomic queue/test timeout
php atomic queue/test cancel_requested
php atomic queue/test cancelled
php atomic queue/test all
php atomic queue/test/monitor
php atomic queue/test/monitor defaultqueue/test enqueues Engine\Atomic\Queue\Tests\TestJob::<method> on the
default queue.
Supported types: success, failed, timeout, cancel_requested, cancelled,
all (aliases: completed, failure, cancel, canceled, request_cancelled,
request_canceled).
queue/test/monitor seeds stuck or in-progress test jobs for the current queue
driver so queue/monitor can be validated.
Job cancellation is supported by the Redis queue driver. The database queue
driver throws a RuntimeException for cancellation APIs, and
Manager::supports_cancel() returns false.
Cancel by UUID:
$cancelled_or_requested = $queue->cancel($uuid);Flow:
pendingjobs move directly tocancelled.runningjobs move tocancel_requested, the optionalcancel_handlerruns, the worker PID receivesSIGUSR1, then the worker marks the jobcancelled.completed,failed, andcancelledjobs are no-op.- Cancelled jobs are never retried.
Cancellation is cooperative for running jobs. The framework requests cancellation and signals the worker, but PHP code that is blocked in I/O, an HTTP request, or a long CPU loop may not observe the signal until control returns to PHP. Long-running jobs should be split into chunks and should keep their own cleanup in try/catch/finally blocks.
State guarantees for running jobs:
- If the worker receives
SIGUSR1andJobCancelledExceptionreaches the worker, the worker marks the jobcancelled. - If job code catches
JobCancelledException, cleans up, and rethrows it, the worker marks the jobcancelled. - If job code catches
JobCancelledExceptionand returns normally,mark_completed()still checks the queue state and converts the job tocancelledinstead ofcompleted. - If a cancel-requested job becomes stuck, the monitor treats it as cancelled instead of retrying it.
Recommended local cleanup pattern:
use Engine\Atomic\Queue\Exceptions\JobCancelledException;
class ImportJob
{
public function handle(int $import_id): void
{
try {
foreach ($this->chunks($import_id) as $chunk) {
$this->process_chunk($chunk);
}
} catch (JobCancelledException $e) {
$this->cleanup_local_state($import_id);
throw $e;
} finally {
$this->close_open_resources();
}
}
}Add optional cancel handler when pushing:
$queue->push(
[ImportJob::class, 'handle'],
['import_id' => 10],
['cancel_handler' => [ImportJob::class, 'cancel']]
);Short form uses same class as main queued handler:
$queue->push(
[ImportJob::class, 'handle'],
['import_id' => 10],
['cancel_handler' => 'cancel']
);Cancel handler receives original job data. If it accepts $job, add named job parameter.
cancel_handler runs in the process that calls $queue->cancel($uuid), not inside the already-running worker job. Use it for external cleanup such as marking application state as cancelling, deleting shared temporary files, or calling a provider API to cancel remote work. Use try/catch/finally in the job handler for resources owned by the running worker process.
Local cancel ($queue->cancel($uuid)) updates Atomic queue state and signals local worker PID. Remote APIs owned by job code must be cancelled by your cancel_handler (for example, call provider API to stop remote import/transcode/task).
Queue options must be serializable. Closures are not allowed in cancel_handler or other queue options; use [ClassName::class, 'method'] or 'method'.