Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/Attributes/Queueable.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ class Queueable
* @param int|null $retryAfter
* @param int|null $delayFor
* @param string|null $onQueue
* @param int|null $timeout
*/
public function __construct(
public ?int $tries = null,
public ?int $retryAfter = null,
public ?int $delayFor = null,
public ?string $onQueue = null
public ?string $onQueue = null,
public ?int $timeout = null
) {}
}
7 changes: 7 additions & 0 deletions src/Contracts/JobInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,11 @@ public function getJobId(): ?string;
* @return void
*/
public function setJobId(string $id): void;

/**
* Get the timeout in seconds.
*
* @return int|null
*/
public function getTimeout(): ?int;
}
8 changes: 8 additions & 0 deletions src/Exceptions/JobTimeoutException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

namespace Doppar\Queue\Exceptions;

class JobTimeoutException extends \RuntimeException
{
//
}
4 changes: 4 additions & 0 deletions src/InteractsWithQueueableAttributes.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ protected function applyQueueableAttributes(): void
if ($attribute->onQueue !== null) {
$this->queueName = $attribute->onQueue;
}

if ($attribute->timeout !== null) {
$this->timeout = $attribute->timeout;
}
}
}
}
17 changes: 17 additions & 0 deletions src/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ abstract class Job implements JobInterface
*/
public $attempts = 0;

/**
* Maximum execution time in seconds.
*
* @var int|null
*/
public $timeout = null;

/**
* Get the number of times the job may be attempted.
*
Expand Down Expand Up @@ -150,6 +157,16 @@ public function delayFor(int $delay): self
return $this;
}

/**
* Get the timeout in seconds.
*
* @return int|null
*/
public function getTimeout(): ?int
{
return $this->timeout;
}

/**
* Dispatch the job to the queue.
*
Expand Down
46 changes: 44 additions & 2 deletions src/QueueWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Doppar\Queue;

use Doppar\Queue\Models\QueueJob;
use Doppar\Queue\Exceptions\JobTimeoutException;
use Doppar\Queue\Contracts\JobInterface;

class QueueWorker
Expand Down Expand Up @@ -187,8 +188,8 @@ protected function processJob(QueueJob $queueJob): void
($this->onJobProcessing)($job);
}

// Execute the job
$this->executeJob($job);
// Execute the job with timeout
$this->executeJobWithTimeout($job);

// Delete the job from queue if successful
$this->manager->delete($queueJob);
Expand All @@ -202,6 +203,47 @@ protected function processJob(QueueJob $queueJob): void
}
}

/**
* Execute a job with timeout protection.
*
* @param JobInterface $job
* @return void
* @throws \Throwable
*/
protected function executeJobWithTimeout(JobInterface $job): void
{
$timeout = $job->getTimeout();

if ($timeout === null || !extension_loaded('pcntl')) {
// No timeout or pcntl not available, execute normally
$this->executeJob($job);
return;
}

// Set up timeout handler
$timedOut = false;

pcntl_signal(SIGALRM, function () use (&$timedOut) {
$timedOut = true;
});

pcntl_alarm($timeout);

try {
$this->executeJob($job);
pcntl_alarm(0);
} catch (\Throwable $e) {
pcntl_alarm(0);
throw $e;
}

if ($timedOut) {
throw new JobTimeoutException(
"Job exceeded maximum execution time of {$timeout} seconds"
);
}
}

/**
* Execute a job.
*
Expand Down