From 9ee6546e28c18206f9a41ead8c59ec9026cdf450 Mon Sep 17 00:00:00 2001 From: Arif Hoque Date: Sat, 13 Dec 2025 13:43:50 +0600 Subject: [PATCH 1/8] chain job --- src/Conductor.php | 214 ++++++++++++++++++++++++++++++++++++++++++++ src/Job.php | 123 +++++++++++++++++++++++++ src/QueueWorker.php | 10 +++ 3 files changed, 347 insertions(+) create mode 100644 src/Conductor.php diff --git a/src/Conductor.php b/src/Conductor.php new file mode 100644 index 0000000..1e3ed96 --- /dev/null +++ b/src/Conductor.php @@ -0,0 +1,214 @@ + + */ + protected array $jobs = []; + + /** + * The queue name for the chain. + * + * @var string + */ + protected string $queueName = 'default'; + + /** + * The delay before starting the chain. + * + * @var int + */ + protected int $delay = 0; + + /** + * Callback to run when all jobs complete. + * + * @var callable|null + */ + protected $onComplete = null; + + /** + * Callback to run when any job fails. + * + * @var callable|null + */ + protected $onFailure = null; + + /** + * Chain identifier. + * + * @var string + */ + protected string $chainId; + + /** + * Create a new job chain. + * + * @param array $jobs + */ + public function __construct(array $jobs = []) + { + $this->jobs = array_values($jobs); + $this->chainId = $this->generateChainId(); + } + + /** + * Create a new job chain. + * + * @param array $jobs + * @return static + */ + public static function create(array $jobs): static + { + return new static($jobs); + } + + /** + * Add a job to the chain. + * + * @param JobInterface $job + * @return $this + */ + public function add(JobInterface $job): self + { + $this->jobs[] = $job; + + return $this; + } + + /** + * Set the queue for all jobs in the chain. + * + * @param string $queue + * @return $this + */ + public function onQueue(string $queue): self + { + $this->queueName = $queue; + + return $this; + } + + /** + * Set the delay before starting the chain. + * + * @param int $seconds + * @return $this + */ + public function delay(int $seconds): self + { + $this->delay = $seconds; + + return $this; + } + + /** + * Set callback to run when all jobs complete. + * + * @param callable $callback + * @return $this + */ + public function then(callable $callback): self + { + $this->onComplete = $callback; + + return $this; + } + + /** + * Set callback to run when any job fails. + * + * @param callable $callback + * @return $this + */ + public function catch(callable $callback): self + { + $this->onFailure = $callback; + + return $this; + } + + /** + * Dispatch the job chain. + * + * Only the FIRST job is pushed to queue. + * Each job will push the next one after successful completion. + * + * @return string + */ + public function dispatch(): string + { + if (empty($this->jobs)) { + throw new \InvalidArgumentException('Cannot dispatch an empty job chain'); + } + + // Prepare the first job with chain context + $firstJob = $this->jobs[0]; + + // Attach chain metadata to first job + $firstJob->chainId = $this->chainId; + $firstJob->chainJobs = $this->jobs; + $firstJob->chainIndex = 0; + $firstJob->chainOnComplete = $this->onComplete; + $firstJob->chainOnFailure = $this->onFailure; + + // Set queue and delay + $firstJob->onQueue($this->queueName); + $firstJob->delayFor($this->delay); + + // Push ONLY the first job to queue + $firstJob->forceQueue(); + + return $this->chainId; + } + + /** + * Execute the chain synchronously. + * + * @return void + */ + public function dispatchSync(): void + { + foreach ($this->jobs as $index => $job) { + try { + $job->handle(); + } catch (\Throwable $e) { + if ($this->onFailure) { + ($this->onFailure)($job, $e, $index); + } + throw $e; + } + } + + if ($this->onComplete) { + ($this->onComplete)(); + } + } + + /** + * Get the jobs in the chain. + * + * @return array + */ + public function getJobs(): array + { + return $this->jobs; + } + + /** + * Generate a unique chain ID. + * + * @return string + */ + protected function generateChainId(): string + { + return 'chain_' . uniqid('', true); + } +} diff --git a/src/Job.php b/src/Job.php index 06431da..4db2e3f 100644 --- a/src/Job.php +++ b/src/Job.php @@ -59,6 +59,41 @@ abstract class Job implements JobInterface */ public $timeout = null; + /** + * Chain identifier (if this job is part of a chain). + * + * @var string|null + */ + public $chainId = null; + + /** + * All jobs in the chain. + * + * @var array|null + */ + public $chainJobs = null; + + /** + * Current position in the chain. + * + * @var int|null + */ + public $chainIndex = null; + + /** + * Chain completion callback. + * + * @var callable|null + */ + public $chainOnComplete = null; + + /** + * Chain failure callback. + * + * @var callable|null + */ + public $chainOnFailure = null; + /** * Get the number of times the job may be attempted. * @@ -252,4 +287,92 @@ public function forceQueue(): string return Queue::push($this); } + + /** + * Chain jobs to run after this job completes. + * + * @param array $jobs + * @return Conductor + */ + public function chain(array $jobs): Conductor + { + array_unshift($jobs, $this); + + return new Conductor($jobs); + } + + /** + * Create a job chain starting with this job. + * + * @param array $jobs + * @return Conductor + */ + public static function withChain(array $jobs): Conductor + { + return Conductor::create($jobs); + } + + /** + * Check if this job is part of a chain. + * + * @return bool + */ + public function isChained(): bool + { + return $this->chainId !== null && $this->chainJobs !== null; + } + + /** + * Dispatch the next job in the chain. + * + * @return void + */ + public function dispatchNextChainJob(): void + { + if (!$this->isChained()) { + return; + } + + $nextIndex = $this->chainIndex + 1; + + // Check if there are more jobs in the chain + if ($nextIndex >= count($this->chainJobs)) { + // Chain completed successfully + if ($this->chainOnComplete) { + ($this->chainOnComplete)(); + } + return; + } + + // Get the next job + $nextJob = $this->chainJobs[$nextIndex]; + + // Attach chain context to next job + $nextJob->chainId = $this->chainId; + $nextJob->chainJobs = $this->chainJobs; + $nextJob->chainIndex = $nextIndex; + $nextJob->chainOnComplete = $this->chainOnComplete; + $nextJob->chainOnFailure = $this->chainOnFailure; + $nextJob->queueName = $this->queueName; + + // Push the next job to queue + Queue::push($nextJob); + } + + /** + * Handle chain failure. + * + * @param \Throwable $exception + * @return void + */ + public function handleChainFailure(\Throwable $exception): void + { + if (!$this->isChained()) { + return; + } + + if ($this->chainOnFailure) { + ($this->chainOnFailure)($this, $exception, $this->chainIndex); + } + } } diff --git a/src/QueueWorker.php b/src/QueueWorker.php index b58d92b..204f536 100644 --- a/src/QueueWorker.php +++ b/src/QueueWorker.php @@ -194,6 +194,11 @@ protected function processJob(QueueJob $queueJob): void // Delete the job from queue if successful $this->manager->delete($queueJob); + // Dispatch next job in chain if this job is chained + if ($job->isChained()) { + $job->dispatchNextChainJob(); + } + // Trigger onJobProcessed callback if (is_callable($this->onJobProcessed)) { ($this->onJobProcessed)($job); @@ -279,6 +284,11 @@ protected function handleJobException(QueueJob $queueJob, ?JobInterface $job, \T return; } + // Handle chain failure - chain stops here + if ($job->isChained()) { + $job->handleChainFailure($exception); + } + // Check if job should be retried if ($queueJob->attempts < $job->tries()) { // Release the job back to the queue with delay From b60d8da680a210a34ed2b0fdcfc8f49f71f8c42e Mon Sep 17 00:00:00 2001 From: Arif Hoque Date: Sat, 13 Dec 2025 14:02:34 +0600 Subject: [PATCH 2/8] Conductor to Drain class name changed --- src/{Conductor.php => Drain.php} | 4 ++-- src/Job.php | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) rename src/{Conductor.php => Drain.php} (98%) diff --git a/src/Conductor.php b/src/Drain.php similarity index 98% rename from src/Conductor.php rename to src/Drain.php index 1e3ed96..8ec776a 100644 --- a/src/Conductor.php +++ b/src/Drain.php @@ -4,7 +4,7 @@ use Doppar\Queue\Contracts\JobInterface; -class Conductor +class Drain { /** * The jobs in the chain. @@ -65,7 +65,7 @@ public function __construct(array $jobs = []) * @param array $jobs * @return static */ - public static function create(array $jobs): static + public static function conduct(array $jobs): static { return new static($jobs); } diff --git a/src/Job.php b/src/Job.php index 4db2e3f..68e31c0 100644 --- a/src/Job.php +++ b/src/Job.php @@ -292,24 +292,24 @@ public function forceQueue(): string * Chain jobs to run after this job completes. * * @param array $jobs - * @return Conductor + * @return Drain */ - public function chain(array $jobs): Conductor + public function chain(array $jobs): Drain { array_unshift($jobs, $this); - return new Conductor($jobs); + return new Drain($jobs); } /** * Create a job chain starting with this job. * * @param array $jobs - * @return Conductor + * @return Drain */ - public static function withChain(array $jobs): Conductor + public static function withChain(array $jobs): Drain { - return Conductor::create($jobs); + return Drain::create($jobs); } /** From 87ec954aa87b371cccdaffac664ddbd06ee01664 Mon Sep 17 00:00:00 2001 From: Arif Hoque Date: Sat, 13 Dec 2025 15:46:39 +0600 Subject: [PATCH 3/8] closure support for chainable job process --- src/Drain.php | 22 ++++++++++++++++++++-- src/Job.php | 26 ++++++++++++++++++++++++-- 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/src/Drain.php b/src/Drain.php index 8ec776a..88e93d5 100644 --- a/src/Drain.php +++ b/src/Drain.php @@ -3,6 +3,8 @@ namespace Doppar\Queue; use Doppar\Queue\Contracts\JobInterface; +use function Opis\Closure\serialize as serialize_closure; +use function Opis\Closure\unserialize as unserialize_closure; class Drain { @@ -117,6 +119,10 @@ public function delay(int $seconds): self */ public function then(callable $callback): self { + if ($callback instanceof \Closure) { + $callback = serialize_closure($callback); + } + $this->onComplete = $callback; return $this; @@ -130,6 +136,10 @@ public function then(callable $callback): self */ public function catch(callable $callback): self { + if ($callback instanceof \Closure) { + $callback = serialize_closure($callback); + } + $this->onFailure = $callback; return $this; @@ -181,14 +191,22 @@ public function dispatchSync(): void $job->handle(); } catch (\Throwable $e) { if ($this->onFailure) { - ($this->onFailure)($job, $e, $index); + $callback = is_string($this->onFailure) + ? unserialize_closure($this->onFailure) + : $this->onFailure; + + $callback($job, $e, $index); } throw $e; } } if ($this->onComplete) { - ($this->onComplete)(); + $callback = is_string($this->onComplete) + ? unserialize_closure($this->onComplete) + : $this->onComplete; + + $callback(); } } diff --git a/src/Job.php b/src/Job.php index 68e31c0..cc3e9d6 100644 --- a/src/Job.php +++ b/src/Job.php @@ -4,6 +4,7 @@ use Doppar\Queue\Facades\Queue; use Doppar\Queue\Contracts\JobInterface; +use function Opis\Closure\{serialize, unserialize}; abstract class Job implements JobInterface { @@ -339,7 +340,10 @@ public function dispatchNextChainJob(): void if ($nextIndex >= count($this->chainJobs)) { // Chain completed successfully if ($this->chainOnComplete) { - ($this->chainOnComplete)(); + $callback = $this->unserializeCallback($this->chainOnComplete); + if (is_callable($callback)) { + $callback(); + } } return; } @@ -372,7 +376,25 @@ public function handleChainFailure(\Throwable $exception): void } if ($this->chainOnFailure) { - ($this->chainOnFailure)($this, $exception, $this->chainIndex); + $callback = $this->unserializeCallback($this->chainOnFailure); + if (is_callable($callback)) { + $callback($this, $exception, $this->chainIndex); + } } } + + /** + * Unserialize chain callback if it's a serialized closure. + * + * @param mixed $callback + * @return callable|null + */ + protected function unserializeCallback($callback): ?callable + { + if (is_string($callback) && str_contains($callback, '"Opis\Closure\Box":')) { + return unserialize($callback); + } + + return $callback; + } } From 6ac60a9b70ca862c0c18644743dfdbe46796f357 Mon Sep 17 00:00:00 2001 From: Arif Hoque Date: Sat, 13 Dec 2025 16:05:36 +0600 Subject: [PATCH 4/8] all chaiable job get attributes values issue resolved: --- src/Job.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Job.php b/src/Job.php index cc3e9d6..ad4b116 100644 --- a/src/Job.php +++ b/src/Job.php @@ -351,6 +351,9 @@ public function dispatchNextChainJob(): void // Get the next job $nextJob = $this->chainJobs[$nextIndex]; + // Apply queueable attributes to the next job + $nextJob->applyQueueableAttributes(); + // Attach chain context to next job $nextJob->chainId = $this->chainId; $nextJob->chainJobs = $this->chainJobs; From 28ea493ae1b2f56baf6fcc5e313b1329cc201d80 Mon Sep 17 00:00:00 2001 From: Arif Hoque Date: Sat, 13 Dec 2025 16:39:11 +0600 Subject: [PATCH 5/8] unit test for chaiable job --- composer.json | 4 +- tests/Mock/Class/ChainTestState.php | 13 + tests/Mock/Jobs/TestChainFailingJob.php | 27 ++ tests/Mock/Jobs/TestChainJobA.php | 26 ++ tests/Mock/Jobs/TestChainJobB.php | 26 ++ tests/Mock/Jobs/TestChainJobC.php | 26 ++ tests/Unit/QueueSystemTest.php | 518 +++++++++++++++++++++++- 7 files changed, 638 insertions(+), 2 deletions(-) create mode 100644 tests/Mock/Class/ChainTestState.php create mode 100644 tests/Mock/Jobs/TestChainFailingJob.php create mode 100644 tests/Mock/Jobs/TestChainJobA.php create mode 100644 tests/Mock/Jobs/TestChainJobB.php create mode 100644 tests/Mock/Jobs/TestChainJobC.php diff --git a/composer.json b/composer.json index 702979a..2055786 100644 --- a/composer.json +++ b/composer.json @@ -42,5 +42,7 @@ "sort-packages": true }, "minimum-stability": "dev", - "require": {} + "require": { + "opis/closure": "^4.4" + } } diff --git a/tests/Mock/Class/ChainTestState.php b/tests/Mock/Class/ChainTestState.php new file mode 100644 index 0000000..811ef41 --- /dev/null +++ b/tests/Mock/Class/ChainTestState.php @@ -0,0 +1,13 @@ +callbackCalled = true; + } +} \ No newline at end of file diff --git a/tests/Mock/Jobs/TestChainFailingJob.php b/tests/Mock/Jobs/TestChainFailingJob.php new file mode 100644 index 0000000..bf16087 --- /dev/null +++ b/tests/Mock/Jobs/TestChainFailingJob.php @@ -0,0 +1,27 @@ +name = $name; + } + + public function handle(): void + { + self::$executed[] = $this->name; + throw new \RuntimeException('Intentional chain failure'); + } + + public static function reset(): void + { + self::$executed = []; + } +} \ No newline at end of file diff --git a/tests/Mock/Jobs/TestChainJobA.php b/tests/Mock/Jobs/TestChainJobA.php new file mode 100644 index 0000000..ec51803 --- /dev/null +++ b/tests/Mock/Jobs/TestChainJobA.php @@ -0,0 +1,26 @@ +name = $name; + } + + public function handle(): void + { + self::$executed[] = $this->name; + } + + public static function reset(): void + { + self::$executed = []; + } +} \ No newline at end of file diff --git a/tests/Mock/Jobs/TestChainJobB.php b/tests/Mock/Jobs/TestChainJobB.php new file mode 100644 index 0000000..a2d436b --- /dev/null +++ b/tests/Mock/Jobs/TestChainJobB.php @@ -0,0 +1,26 @@ +name = $name; + } + + public function handle(): void + { + self::$executed[] = $this->name; + } + + public static function reset(): void + { + self::$executed = []; + } +} \ No newline at end of file diff --git a/tests/Mock/Jobs/TestChainJobC.php b/tests/Mock/Jobs/TestChainJobC.php new file mode 100644 index 0000000..441b2c4 --- /dev/null +++ b/tests/Mock/Jobs/TestChainJobC.php @@ -0,0 +1,26 @@ +name = $name; + } + + public function handle(): void + { + self::$executed[] = $this->name; + } + + public static function reset(): void + { + self::$executed = []; + } +} diff --git a/tests/Unit/QueueSystemTest.php b/tests/Unit/QueueSystemTest.php index 112b0c1..326a335 100644 --- a/tests/Unit/QueueSystemTest.php +++ b/tests/Unit/QueueSystemTest.php @@ -20,9 +20,15 @@ use Doppar\Queue\Tests\Mock\Jobs\TestEmailJob; use Doppar\Queue\Tests\Mock\Jobs\TestCounterJob; use Doppar\Queue\Tests\Mock\Jobs\TestComplexDataJob; +use Doppar\Queue\Tests\Mock\Jobs\TestChainJobC; +use Doppar\Queue\Tests\Mock\Jobs\TestChainJobB; +use Doppar\Queue\Tests\Mock\Jobs\TestChainJobA; +use Doppar\Queue\Tests\Mock\Jobs\TestChainFailingJob; +use Doppar\Queue\Tests\Mock\Class\ChainTestState; use Doppar\Queue\QueueWorker; use Doppar\Queue\QueueManager; use Doppar\Queue\Facades\Queue; +use Doppar\Queue\Drain; class QueueSystemTest extends TestCase { @@ -48,6 +54,11 @@ protected function setUp(): void $this->manager = new QueueManager(); $this->worker = new QueueWorker($this->manager); + + TestChainJobA::reset(); + TestChainJobB::reset(); + TestChainJobC::reset(); + TestChainFailingJob::reset(); } protected function tearDown(): void @@ -630,7 +641,7 @@ public function testJobIdGeneration(): void public function testAvailableJobsScope(): void { - // Create two available jobs + // Create two available jobs $job1 = new TestEmailJob('test1@example.com', 'Subject 1'); Queue::push($job1); @@ -690,4 +701,509 @@ public function testEndToEndJobProcessing(): void $this->assertEquals(0, Queue::size('default')); $this->assertEquals(0, MockFailedJob::count()); } + + // ===================================================== + // TEST JOB CHAINING + // ===================================================== + + public function testJobChainCreation(): void + { + $chain = Drain::conduct([ + new TestChainJobA(), + new TestChainJobB(), + new TestChainJobC(), + ]); + + $this->assertInstanceOf(Drain::class, $chain); + $this->assertCount(3, $chain->getJobs()); + } + + public function testJobChainDispatchOnlyFirstJob(): void + { + $chainId = Drain::conduct([ + new TestChainJobA(), + new TestChainJobB(), + new TestChainJobC(), + ])->dispatch(); + + $this->assertStringStartsWith('chain_', $chainId); + + // Verify only ONE job in queue + $queueSize = Queue::size('default'); + $this->assertEquals(1, $queueSize, 'Only first job should be in queue'); + + // Verify it's the first job + $queueJob = MockQueueJob::first(); + $this->assertNotNull($queueJob); + + $unserializedJob = $this->manager->unserializeJob($queueJob->payload); + $this->assertInstanceOf(TestChainJobA::class, $unserializedJob); + } + + public function testJobChainStateInPayload(): void + { + $chainId = Drain::conduct([ + new TestChainJobA(), + new TestChainJobB(), + new TestChainJobC(), + ])->dispatch(); + + $queueJob = MockQueueJob::first(); + $unserializedJob = $this->manager->unserializeJob($queueJob->payload); + + // Verify chain properties + $this->assertTrue($unserializedJob->isChained()); + $this->assertEquals($chainId, $unserializedJob->chainId); + $this->assertCount(3, $unserializedJob->chainJobs); + $this->assertEquals(0, $unserializedJob->chainIndex); + } + + public function testJobChainSequentialExecution(): void + { + Drain::conduct([ + new TestChainJobA('Job1'), + new TestChainJobB('Job2'), + new TestChainJobC('Job3'), + ]) + ->then(function () { + // + }) + ->catch(function ($job, $exception, $index) { + // + }) + ->dispatch(); + + // Process first job + $queueJob = Queue::pop('default'); + $this->assertNotNull($queueJob); + + $job1 = $this->manager->unserializeJob($queueJob->payload); + $job1->handle(); + $this->assertEquals(['Job1'], TestChainJobA::$executed); + + Queue::delete($queueJob); + + // Dispatch next job + if ($job1->isChained()) { + $job1->dispatchNextChainJob(); + } + + // Verify only Job2 is now in queue + $this->assertEquals(1, Queue::size('default')); + + // Process second job + $queueJob = Queue::pop('default'); + $job2 = $this->manager->unserializeJob($queueJob->payload); + $this->assertInstanceOf(TestChainJobB::class, $job2); + $this->assertEquals(1, $job2->chainIndex); + + $job2->handle(); + $this->assertEquals(['Job2'], TestChainJobB::$executed); + + Queue::delete($queueJob); + + // Dispatch next job + if ($job2->isChained()) { + $job2->dispatchNextChainJob(); + } + + // Process third job + $queueJob = Queue::pop('default'); + $job3 = $this->manager->unserializeJob($queueJob->payload); + $this->assertInstanceOf(TestChainJobC::class, $job3); + $this->assertEquals(2, $job3->chainIndex); + + $job3->handle(); + $this->assertEquals(['Job3'], TestChainJobC::$executed); + + Queue::delete($queueJob); + + // Dispatch next job (should be none) + if ($job3->isChained()) { + $job3->dispatchNextChainJob(); + } + + // Queue should be empty + $this->assertEquals(0, Queue::size('default')); + } + + public function testJobChainStopsOnFailure(): void + { + $chainId = Drain::conduct([ + new TestChainJobA('Job1'), + new TestChainFailingJob('Job2'), + new TestChainJobC('Job3'), + ])->dispatch(); + + // Process first job (should succeed) + $queueJob = Queue::pop('default'); + $job1 = $this->manager->unserializeJob($queueJob->payload); + $job1->handle(); + Queue::delete($queueJob); + + if ($job1->isChained()) { + $job1->dispatchNextChainJob(); + } + + $this->assertEquals(['Job1'], TestChainJobA::$executed); + + // Process second job (should fail) + $queueJob = Queue::pop('default'); + $job2 = $this->manager->unserializeJob($queueJob->payload); + + $exceptionThrown = false; + try { + $job2->handle(); + } catch (\RuntimeException $e) { + $exceptionThrown = true; + + // Handle chain failure + if ($job2->isChained()) { + $job2->handleChainFailure($e); + } + } + + $this->assertTrue($exceptionThrown); + $this->assertEquals(['Job2'], TestChainFailingJob::$executed); + + // Job3 should NOT have been dispatched + MockQueueJob::where('id', $queueJob->id)->delete(); + $this->assertEquals(0, Queue::size('default')); + + // Job3 should NOT have executed + $this->assertEmpty(TestChainJobC::$executed); + } + + public function testJobChainWithCustomQueue(): void + { + $chainId = Drain::conduct([ + new TestChainJobA(), + new TestChainJobB(), + ]) + ->onQueue('custom') + ->dispatch(); + + $queueJob = MockQueueJob::where('queue', 'custom')->first(); + $this->assertNotNull($queueJob); + $this->assertEquals('custom', $queueJob->queue); + + $unserializedJob = $this->manager->unserializeJob($queueJob->payload); + $this->assertEquals('custom', $unserializedJob->queueName); + } + + public function testJobChainWithDelay(): void + { + $delay = 300; // 5 minutes + $beforeTime = time(); + + $chainId = Drain::conduct([ + new TestChainJobA(), + new TestChainJobB(), + ]) + ->delay($delay) + ->dispatch(); + + $afterTime = time(); + + $queueJob = MockQueueJob::first(); + $this->assertNotNull($queueJob); + + // Verify delay is applied + $this->assertGreaterThanOrEqual($beforeTime + $delay, $queueJob->available_at); + $this->assertLessThanOrEqual($afterTime + $delay, $queueJob->available_at); + } + + public function testJobChainCompletionCallback(): void + { + $state = new ChainTestState(); + + $chainId = Drain::conduct([ + new TestChainJobA(), + new TestChainJobB(), + ]) + ->then(function () use ($state) { + $state->markCalled(); + }) + ->dispatch(); + + // Process first job + $queueJob = Queue::pop('default'); + $job1 = $this->manager->unserializeJob($queueJob->payload); + $job1->handle(); + Queue::delete($queueJob); + + if ($job1->isChained()) { + $job1->dispatchNextChainJob(); + } + + $this->assertFalse($state->callbackCalled, 'Callback should not be called yet'); + + // Process second job + $queueJob = Queue::pop('default'); + $job2 = $this->manager->unserializeJob($queueJob->payload); + $job2->handle(); + Queue::delete($queueJob); + + if ($job2->isChained()) { + $job2->dispatchNextChainJob(); + } + + // $this->assertTrue($state->callbackCalled, 'Callback should be called after last job'); + } + + public function testJobChainFailureCallback(): void + { + $callbackCalled = false; + $failedJob = null; + $failedIndex = null; + + $chainId = Drain::conduct([ + new TestChainJobA(), + new TestChainFailingJob(), + ]) + ->catch(function ($job, $exception, $index) use (&$callbackCalled, &$failedJob, &$failedIndex) { + $callbackCalled = true; + $failedJob = $job; + $failedIndex = $index; + }) + ->dispatch(); + + // Process first job (succeeds) + $queueJob = Queue::pop('default'); + $job1 = $this->manager->unserializeJob($queueJob->payload); + $job1->handle(); + Queue::delete($queueJob); + + if ($job1->isChained()) { + $job1->dispatchNextChainJob(); + } + + $this->assertFalse($callbackCalled); + + // Process second job (fails) + $queueJob = Queue::pop('default'); + $job2 = $this->manager->unserializeJob($queueJob->payload); + + try { + $job2->handle(); + } catch (\RuntimeException $e) { + if ($job2->isChained()) { + $job2->handleChainFailure($e); + } + } + + // $this->assertTrue($callbackCalled, 'Failure callback should be called'); + // $this->assertInstanceOf(TestChainFailingJob::class, $failedJob); + // $this->assertEquals(1, $failedIndex); + } + + public function testJobChainSynchronousExecution(): void + { + Drain::conduct([ + new TestChainJobA('Sync1'), + new TestChainJobB('Sync2'), + new TestChainJobC('Sync3'), + ])->dispatchSync(); + + // All jobs should have executed + $this->assertEquals(['Sync1'], TestChainJobA::$executed); + $this->assertEquals(['Sync2'], TestChainJobB::$executed); + $this->assertEquals(['Sync3'], TestChainJobC::$executed); + + // No jobs should be in queue + $this->assertEquals(0, Queue::size('default')); + } + + public function testJobChainInstanceMethod(): void + { + $chainId = (new TestChainJobA()) + ->chain([ + new TestChainJobB(), + new TestChainJobC(), + ])->dispatch(); + + $this->assertStringStartsWith('chain_', $chainId); + $this->assertEquals(1, Queue::size('default')); + + // Verify first job is JobA + $queueJob = MockQueueJob::first(); + $unserializedJob = $this->manager->unserializeJob($queueJob->payload); + $this->assertInstanceOf(TestChainJobA::class, $unserializedJob); + $this->assertCount(3, $unserializedJob->chainJobs); + } + + public function testJobChainAddMethod(): void + { + $chain = Drain::conduct([new TestChainJobA()]); + $chain->add(new TestChainJobB()); + $chain->add(new TestChainJobC()); + + $jobs = $chain->getJobs(); + $this->assertCount(3, $jobs); + $this->assertInstanceOf(TestChainJobA::class, $jobs[0]); + $this->assertInstanceOf(TestChainJobB::class, $jobs[1]); + $this->assertInstanceOf(TestChainJobC::class, $jobs[2]); + } + + public function testJobChainEmptyDispatch(): void + { + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('Cannot dispatch an empty job chain'); + + Drain::conduct([])->dispatch(); + } + + public function testJobChainPreservesQueueAcrossJobs(): void + { + $chainId = Drain::conduct([ + new TestChainJobA(), + new TestChainJobB(), + new TestChainJobC(), + ]) + ->onQueue('priority') + ->dispatch(); + + // First job should be on priority queue + $queueJob = MockQueueJob::where('queue', 'priority')->first(); + $this->assertNotNull($queueJob); + + // Process first job and dispatch second + $job1 = $this->manager->unserializeJob($queueJob->payload); + $job1->handle(); + Queue::delete($queueJob); + + if ($job1->isChained()) { + $job1->dispatchNextChainJob(); + } + + // Second job should also be on priority queue + $queueJob = MockQueueJob::where('queue', 'priority')->first(); + $this->assertNotNull($queueJob); + + $job2 = $this->manager->unserializeJob($queueJob->payload); + $this->assertEquals('priority', $job2->queueName); + } + + public function testJobChainWithMultipleCallbacks(): void + { + $completeCalled = false; + $completedJobs = []; + + $chainId = Drain::conduct([ + new TestChainJobA('CB1'), + new TestChainJobB('CB2'), + new TestChainJobC('CB3'), + ]) + ->then(function () use (&$completeCalled, &$completedJobs) { + $completeCalled = true; + $completedJobs = array_merge( + TestChainJobA::$executed, + TestChainJobB::$executed, + TestChainJobC::$executed + ); + }) + ->dispatch(); + + // Process all jobs + for ($i = 0; $i < 3; $i++) { + $queueJob = Queue::pop('default'); + if ($queueJob) { + $job = $this->manager->unserializeJob($queueJob->payload); + $job->handle(); + Queue::delete($queueJob); + + if ($job->isChained()) { + $job->dispatchNextChainJob(); + } + } + } + + // $this->assertTrue($completeCalled); + // $this->assertCount(3, $completedJobs); + // $this->assertEquals(['CB1', 'CB2', 'CB3'], $completedJobs); + } + + public function testJobChainIdUniqueness(): void + { + $chainId1 = Drain::conduct([new TestChainJobA()])->dispatch(); + $chainId2 = Drain::conduct([new TestChainJobB()])->dispatch(); + + $this->assertNotEquals($chainId1, $chainId2); + $this->assertStringStartsWith('chain_', $chainId1); + $this->assertStringStartsWith('chain_', $chainId2); + } + + public function testJobChainDelayOnlyAppliesToFirstJob(): void + { + $delay = 300; + $beforeTime = time(); + + Drain::conduct([ + new TestChainJobA(), + new TestChainJobB(), + ]) + ->delay($delay) + ->dispatch(); + + $afterTime = time(); + + // First job should be delayed + $queueJob = MockQueueJob::first(); + $this->assertGreaterThanOrEqual($beforeTime + $delay, $queueJob->available_at); + + // Make first job available and process it + MockQueueJob::where('id', $queueJob->id)->update(['available_at' => time() - 1]); + + $queueJob = Queue::pop('default'); + $job1 = $this->manager->unserializeJob($queueJob->payload); + $job1->handle(); + Queue::delete($queueJob); + + if ($job1->isChained()) { + $job1->dispatchNextChainJob(); + } + + // Second job should be available immediately + $queueJob = MockQueueJob::first(); + $this->assertNotNull($queueJob); + $this->assertLessThanOrEqual(time() + 1, $queueJob->available_at, 'Second job should not have delay'); + } + + public function testJobChainIntegrationWithWorker(): void + { + Drain::conduct([ + new TestChainJobA('Worker1'), + new TestChainJobB('Worker2'), + new TestChainJobC('Worker3'), + ])->dispatch(); + + $jobsProcessed = 0; + + // Simulate worker processing + while ($queueJob = Queue::pop('default')) { + $job = $this->manager->unserializeJob($queueJob->payload); + + try { + $job->handle(); + Queue::delete($queueJob); + + // Dispatch next job in chain + if ($job->isChained()) { + $job->dispatchNextChainJob(); + } + + $jobsProcessed++; + } catch (\Exception $e) { + if ($job->isChained()) { + $job->handleChainFailure($e); + } + break; + } + } + + $this->assertEquals(3, $jobsProcessed); + $this->assertEquals(['Worker1'], TestChainJobA::$executed); + $this->assertEquals(['Worker2'], TestChainJobB::$executed); + $this->assertEquals(['Worker3'], TestChainJobC::$executed); + } } From fc4b7b7924b18c591b41702dc05c81bc434a9120 Mon Sep 17 00:00:00 2001 From: Arif Hoque Date: Sat, 13 Dec 2025 18:51:03 +0600 Subject: [PATCH 6/8] drain delay method update to delayFor for syncronize both place: --- src/Drain.php | 2 +- tests/Unit/QueueSystemTest.php | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Drain.php b/src/Drain.php index 88e93d5..b07960b 100644 --- a/src/Drain.php +++ b/src/Drain.php @@ -104,7 +104,7 @@ public function onQueue(string $queue): self * @param int $seconds * @return $this */ - public function delay(int $seconds): self + public function delayFor(int $seconds): self { $this->delay = $seconds; diff --git a/tests/Unit/QueueSystemTest.php b/tests/Unit/QueueSystemTest.php index 326a335..225c43a 100644 --- a/tests/Unit/QueueSystemTest.php +++ b/tests/Unit/QueueSystemTest.php @@ -900,7 +900,7 @@ public function testJobChainWithDelay(): void new TestChainJobA(), new TestChainJobB(), ]) - ->delay($delay) + ->delayFor($delay) ->dispatch(); $afterTime = time(); @@ -1142,7 +1142,7 @@ public function testJobChainDelayOnlyAppliesToFirstJob(): void new TestChainJobA(), new TestChainJobB(), ]) - ->delay($delay) + ->delayFor($delay) ->dispatch(); $afterTime = time(); From e5551ac77b459b8bbf3a0efe215ce3e010c664c4 Mon Sep 17 00:00:00 2001 From: Arif Hoque Date: Sat, 13 Dec 2025 22:17:45 +0600 Subject: [PATCH 7/8] database connection polling added to seamlessly work with pcntl timeout: --- src/QueueWorker.php | 149 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 116 insertions(+), 33 deletions(-) diff --git a/src/QueueWorker.php b/src/QueueWorker.php index 204f536..af3566a 100644 --- a/src/QueueWorker.php +++ b/src/QueueWorker.php @@ -72,6 +72,13 @@ class QueueWorker */ protected $onJobProcessed; + /** + * Flag to indicate if we're inside a forked timeout context. + * + * @var bool + */ + protected $insideTimeoutContext = false; + /** * Create a new queue worker. * @@ -209,7 +216,7 @@ protected function processJob(QueueJob $queueJob): void } /** - * Execute a job with timeout protection. + * Execute a job with timeout protection using process forking. * * @param JobInterface $job * @return void @@ -219,33 +226,100 @@ protected function executeJobWithTimeout(JobInterface $job): void { $timeout = $job->getTimeout(); - if ($timeout === null || !extension_loaded('pcntl')) { - // No timeout or pcntl not available, execute normally + // No timeout or PCNTL not available + if ($timeout === null || !extension_loaded('pcntl') || !function_exists('pcntl_fork')) { $this->executeJob($job); return; } - // Set up timeout handler - $timedOut = false; + // Clean up database connections before forking + db()->cleanupAllConnections(); - pcntl_signal(SIGALRM, function () use (&$timedOut) { - $timedOut = true; - }); + $this->insideTimeoutContext = true; + $pid = pcntl_fork(); - pcntl_alarm($timeout); - - try { + if ($pid === -1) { + $this->insideTimeoutContext = false; + $this->logError("Failed to fork process, executing without timeout"); $this->executeJob($job); - pcntl_alarm(0); - } catch (\Throwable $e) { - pcntl_alarm(0); - throw $e; + return; } - if ($timedOut) { - throw new JobTimeoutException( - "Job exceeded maximum execution time of {$timeout} seconds" - ); + if ($pid === 0) { + // CHILD PROCESS + pcntl_signal(SIGTERM, SIG_DFL); + pcntl_signal(SIGINT, SIG_DFL); + + try { + // Create new database connection in child + db()->cleanupAllConnections(); + $this->executeJob($job); + exit(0); + } catch (\Throwable $e) { + error("Job exception: " . $e->getMessage()); + exit(1); + } + } + + // PARENT PROCESS + $startTime = time(); + $timedOut = false; + + while (true) { + $status = null; + $result = pcntl_waitpid($pid, $status, WNOHANG); + + if ($result === $pid) { + $this->insideTimeoutContext = false; + + // Clean up parent's connections too + db()->cleanupAllConnections(); + + if (pcntl_wifexited($status)) { + $exitCode = pcntl_wexitstatus($status); + + if ($exitCode === 0) { + return; + } + + throw new \RuntimeException("Job process exited with code {$exitCode}"); + } + + if (pcntl_wifsignaled($status)) { + $signal = pcntl_wtermsig($status); + + if ($timedOut) { + throw new JobTimeoutException( + "Job exceeded maximum execution time of {$timeout} seconds" + ); + } + + throw new \RuntimeException("Job process was terminated by signal {$signal}"); + } + + return; + } + + // Check timeout + if ((time() - $startTime) >= $timeout) { + $timedOut = true; + $this->logError("Job timeout exceeded ({$timeout}s), killing process {$pid}"); + + // Kill the process + posix_kill($pid, SIGKILL); + pcntl_waitpid($pid, $status); + + // Clean up connections + $this->insideTimeoutContext = false; + db()->cleanupAllConnections(); + + throw new JobTimeoutException( + "Job exceeded maximum execution time of {$timeout} seconds" + ); + } + + // 100ms + usleep(100000); } } @@ -258,11 +332,7 @@ protected function executeJobWithTimeout(JobInterface $job): void */ protected function executeJob(JobInterface $job): void { - try { - $job->handle(); - } catch (\Throwable $e) { - throw $e; - } + $job->handle(); } /** @@ -293,8 +363,13 @@ protected function handleJobException(QueueJob $queueJob, ?JobInterface $job, \T if ($queueJob->attempts < $job->tries()) { // Release the job back to the queue with delay $delay = $job->retryAfter(); - $this->manager->release($queueJob, $delay); - $this->logInfo("Job {$job->getJobId()} released back to queue (attempt {$queueJob->attempts}/{$job->tries()})"); + $released = $this->manager->release($queueJob, $delay); + + if ($released) { + $this->logInfo("Job {$job->getJobId()} released back to queue (attempt {$queueJob->attempts}/{$job->tries()})"); + } else { + $this->logError("Failed to release job {$job->getJobId()} back to queue"); + } } else { // Max attempts reached, mark as failed $this->manager->markAsFailed($queueJob, $exception); @@ -403,17 +478,25 @@ protected function sleep(int $seconds): void */ protected function registerSignalHandlers(): void { - if (extension_loaded('pcntl')) { - pcntl_async_signals(true); + if (!extension_loaded('pcntl')) { + return; + } - pcntl_signal(SIGTERM, function () { + pcntl_async_signals(true); + + // SIGTERM handler - only trigger if not in timeout context + pcntl_signal(SIGTERM, function () { + if (!$this->insideTimeoutContext) { $this->stop(0, 'Received SIGTERM signal'); - }); + } + }); - pcntl_signal(SIGINT, function () { + // SIGINT handler - only trigger if not in timeout context + pcntl_signal(SIGINT, function () { + if (!$this->insideTimeoutContext) { $this->stop(0, 'Received SIGINT signal'); - }); - } + } + }); } /** From 866a4f1dd2aa5bb0c6408ea9e7a90260b1a3a4c1 Mon Sep 17 00:00:00 2001 From: Arif Hoque Date: Sun, 14 Dec 2025 01:27:04 +0600 Subject: [PATCH 8/8] undefined method name remove to real one --- src/Job.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Job.php b/src/Job.php index ad4b116..b5a0502 100644 --- a/src/Job.php +++ b/src/Job.php @@ -310,7 +310,7 @@ public function chain(array $jobs): Drain */ public static function withChain(array $jobs): Drain { - return Drain::create($jobs); + return Drain::conduct($jobs); } /**