diff --git a/composer.json b/composer.json index 702979a..2055786 100644 --- a/composer.json +++ b/composer.json @@ -42,5 +42,7 @@ "sort-packages": true }, "minimum-stability": "dev", - "require": {} + "require": { + "opis/closure": "^4.4" + } } diff --git a/src/Drain.php b/src/Drain.php new file mode 100644 index 0000000..b07960b --- /dev/null +++ b/src/Drain.php @@ -0,0 +1,232 @@ + + */ + protected array $jobs = []; + + /** + * The queue name for the chain. + * + * @var string + */ + protected string $queueName = 'default'; + + /** + * The delay before starting the chain. + * + * @var int + */ + protected int $delay = 0; + + /** + * Callback to run when all jobs complete. + * + * @var callable|null + */ + protected $onComplete = null; + + /** + * Callback to run when any job fails. + * + * @var callable|null + */ + protected $onFailure = null; + + /** + * Chain identifier. + * + * @var string + */ + protected string $chainId; + + /** + * Create a new job chain. + * + * @param array $jobs + */ + public function __construct(array $jobs = []) + { + $this->jobs = array_values($jobs); + $this->chainId = $this->generateChainId(); + } + + /** + * Create a new job chain. + * + * @param array $jobs + * @return static + */ + public static function conduct(array $jobs): static + { + return new static($jobs); + } + + /** + * Add a job to the chain. + * + * @param JobInterface $job + * @return $this + */ + public function add(JobInterface $job): self + { + $this->jobs[] = $job; + + return $this; + } + + /** + * Set the queue for all jobs in the chain. + * + * @param string $queue + * @return $this + */ + public function onQueue(string $queue): self + { + $this->queueName = $queue; + + return $this; + } + + /** + * Set the delay before starting the chain. + * + * @param int $seconds + * @return $this + */ + public function delayFor(int $seconds): self + { + $this->delay = $seconds; + + return $this; + } + + /** + * Set callback to run when all jobs complete. + * + * @param callable $callback + * @return $this + */ + public function then(callable $callback): self + { + if ($callback instanceof \Closure) { + $callback = serialize_closure($callback); + } + + $this->onComplete = $callback; + + return $this; + } + + /** + * Set callback to run when any job fails. + * + * @param callable $callback + * @return $this + */ + public function catch(callable $callback): self + { + if ($callback instanceof \Closure) { + $callback = serialize_closure($callback); + } + + $this->onFailure = $callback; + + return $this; + } + + /** + * Dispatch the job chain. + * + * Only the FIRST job is pushed to queue. + * Each job will push the next one after successful completion. + * + * @return string + */ + public function dispatch(): string + { + if (empty($this->jobs)) { + throw new \InvalidArgumentException('Cannot dispatch an empty job chain'); + } + + // Prepare the first job with chain context + $firstJob = $this->jobs[0]; + + // Attach chain metadata to first job + $firstJob->chainId = $this->chainId; + $firstJob->chainJobs = $this->jobs; + $firstJob->chainIndex = 0; + $firstJob->chainOnComplete = $this->onComplete; + $firstJob->chainOnFailure = $this->onFailure; + + // Set queue and delay + $firstJob->onQueue($this->queueName); + $firstJob->delayFor($this->delay); + + // Push ONLY the first job to queue + $firstJob->forceQueue(); + + return $this->chainId; + } + + /** + * Execute the chain synchronously. + * + * @return void + */ + public function dispatchSync(): void + { + foreach ($this->jobs as $index => $job) { + try { + $job->handle(); + } catch (\Throwable $e) { + if ($this->onFailure) { + $callback = is_string($this->onFailure) + ? unserialize_closure($this->onFailure) + : $this->onFailure; + + $callback($job, $e, $index); + } + throw $e; + } + } + + if ($this->onComplete) { + $callback = is_string($this->onComplete) + ? unserialize_closure($this->onComplete) + : $this->onComplete; + + $callback(); + } + } + + /** + * Get the jobs in the chain. + * + * @return array + */ + public function getJobs(): array + { + return $this->jobs; + } + + /** + * Generate a unique chain ID. + * + * @return string + */ + protected function generateChainId(): string + { + return 'chain_' . uniqid('', true); + } +} diff --git a/src/Job.php b/src/Job.php index 06431da..b5a0502 100644 --- a/src/Job.php +++ b/src/Job.php @@ -4,6 +4,7 @@ use Doppar\Queue\Facades\Queue; use Doppar\Queue\Contracts\JobInterface; +use function Opis\Closure\{serialize, unserialize}; abstract class Job implements JobInterface { @@ -59,6 +60,41 @@ abstract class Job implements JobInterface */ public $timeout = null; + /** + * Chain identifier (if this job is part of a chain). + * + * @var string|null + */ + public $chainId = null; + + /** + * All jobs in the chain. + * + * @var array|null + */ + public $chainJobs = null; + + /** + * Current position in the chain. + * + * @var int|null + */ + public $chainIndex = null; + + /** + * Chain completion callback. + * + * @var callable|null + */ + public $chainOnComplete = null; + + /** + * Chain failure callback. + * + * @var callable|null + */ + public $chainOnFailure = null; + /** * Get the number of times the job may be attempted. * @@ -252,4 +288,116 @@ public function forceQueue(): string return Queue::push($this); } + + /** + * Chain jobs to run after this job completes. + * + * @param array $jobs + * @return Drain + */ + public function chain(array $jobs): Drain + { + array_unshift($jobs, $this); + + return new Drain($jobs); + } + + /** + * Create a job chain starting with this job. + * + * @param array $jobs + * @return Drain + */ + public static function withChain(array $jobs): Drain + { + return Drain::conduct($jobs); + } + + /** + * Check if this job is part of a chain. + * + * @return bool + */ + public function isChained(): bool + { + return $this->chainId !== null && $this->chainJobs !== null; + } + + /** + * Dispatch the next job in the chain. + * + * @return void + */ + public function dispatchNextChainJob(): void + { + if (!$this->isChained()) { + return; + } + + $nextIndex = $this->chainIndex + 1; + + // Check if there are more jobs in the chain + if ($nextIndex >= count($this->chainJobs)) { + // Chain completed successfully + if ($this->chainOnComplete) { + $callback = $this->unserializeCallback($this->chainOnComplete); + if (is_callable($callback)) { + $callback(); + } + } + return; + } + + // Get the next job + $nextJob = $this->chainJobs[$nextIndex]; + + // Apply queueable attributes to the next job + $nextJob->applyQueueableAttributes(); + + // Attach chain context to next job + $nextJob->chainId = $this->chainId; + $nextJob->chainJobs = $this->chainJobs; + $nextJob->chainIndex = $nextIndex; + $nextJob->chainOnComplete = $this->chainOnComplete; + $nextJob->chainOnFailure = $this->chainOnFailure; + $nextJob->queueName = $this->queueName; + + // Push the next job to queue + Queue::push($nextJob); + } + + /** + * Handle chain failure. + * + * @param \Throwable $exception + * @return void + */ + public function handleChainFailure(\Throwable $exception): void + { + if (!$this->isChained()) { + return; + } + + if ($this->chainOnFailure) { + $callback = $this->unserializeCallback($this->chainOnFailure); + if (is_callable($callback)) { + $callback($this, $exception, $this->chainIndex); + } + } + } + + /** + * Unserialize chain callback if it's a serialized closure. + * + * @param mixed $callback + * @return callable|null + */ + protected function unserializeCallback($callback): ?callable + { + if (is_string($callback) && str_contains($callback, '"Opis\Closure\Box":')) { + return unserialize($callback); + } + + return $callback; + } } diff --git a/src/QueueWorker.php b/src/QueueWorker.php index b58d92b..af3566a 100644 --- a/src/QueueWorker.php +++ b/src/QueueWorker.php @@ -72,6 +72,13 @@ class QueueWorker */ protected $onJobProcessed; + /** + * Flag to indicate if we're inside a forked timeout context. + * + * @var bool + */ + protected $insideTimeoutContext = false; + /** * Create a new queue worker. * @@ -194,6 +201,11 @@ protected function processJob(QueueJob $queueJob): void // Delete the job from queue if successful $this->manager->delete($queueJob); + // Dispatch next job in chain if this job is chained + if ($job->isChained()) { + $job->dispatchNextChainJob(); + } + // Trigger onJobProcessed callback if (is_callable($this->onJobProcessed)) { ($this->onJobProcessed)($job); @@ -204,7 +216,7 @@ protected function processJob(QueueJob $queueJob): void } /** - * Execute a job with timeout protection. + * Execute a job with timeout protection using process forking. * * @param JobInterface $job * @return void @@ -214,33 +226,100 @@ protected function executeJobWithTimeout(JobInterface $job): void { $timeout = $job->getTimeout(); - if ($timeout === null || !extension_loaded('pcntl')) { - // No timeout or pcntl not available, execute normally + // No timeout or PCNTL not available + if ($timeout === null || !extension_loaded('pcntl') || !function_exists('pcntl_fork')) { $this->executeJob($job); return; } - // Set up timeout handler - $timedOut = false; - - pcntl_signal(SIGALRM, function () use (&$timedOut) { - $timedOut = true; - }); + // Clean up database connections before forking + db()->cleanupAllConnections(); - pcntl_alarm($timeout); + $this->insideTimeoutContext = true; + $pid = pcntl_fork(); - try { + if ($pid === -1) { + $this->insideTimeoutContext = false; + $this->logError("Failed to fork process, executing without timeout"); $this->executeJob($job); - pcntl_alarm(0); - } catch (\Throwable $e) { - pcntl_alarm(0); - throw $e; + return; } - if ($timedOut) { - throw new JobTimeoutException( - "Job exceeded maximum execution time of {$timeout} seconds" - ); + if ($pid === 0) { + // CHILD PROCESS + pcntl_signal(SIGTERM, SIG_DFL); + pcntl_signal(SIGINT, SIG_DFL); + + try { + // Create new database connection in child + db()->cleanupAllConnections(); + $this->executeJob($job); + exit(0); + } catch (\Throwable $e) { + error("Job exception: " . $e->getMessage()); + exit(1); + } + } + + // PARENT PROCESS + $startTime = time(); + $timedOut = false; + + while (true) { + $status = null; + $result = pcntl_waitpid($pid, $status, WNOHANG); + + if ($result === $pid) { + $this->insideTimeoutContext = false; + + // Clean up parent's connections too + db()->cleanupAllConnections(); + + if (pcntl_wifexited($status)) { + $exitCode = pcntl_wexitstatus($status); + + if ($exitCode === 0) { + return; + } + + throw new \RuntimeException("Job process exited with code {$exitCode}"); + } + + if (pcntl_wifsignaled($status)) { + $signal = pcntl_wtermsig($status); + + if ($timedOut) { + throw new JobTimeoutException( + "Job exceeded maximum execution time of {$timeout} seconds" + ); + } + + throw new \RuntimeException("Job process was terminated by signal {$signal}"); + } + + return; + } + + // Check timeout + if ((time() - $startTime) >= $timeout) { + $timedOut = true; + $this->logError("Job timeout exceeded ({$timeout}s), killing process {$pid}"); + + // Kill the process + posix_kill($pid, SIGKILL); + pcntl_waitpid($pid, $status); + + // Clean up connections + $this->insideTimeoutContext = false; + db()->cleanupAllConnections(); + + throw new JobTimeoutException( + "Job exceeded maximum execution time of {$timeout} seconds" + ); + } + + // 100ms + usleep(100000); } } @@ -253,11 +332,7 @@ protected function executeJobWithTimeout(JobInterface $job): void */ protected function executeJob(JobInterface $job): void { - try { - $job->handle(); - } catch (\Throwable $e) { - throw $e; - } + $job->handle(); } /** @@ -279,12 +354,22 @@ protected function handleJobException(QueueJob $queueJob, ?JobInterface $job, \T return; } + // Handle chain failure - chain stops here + if ($job->isChained()) { + $job->handleChainFailure($exception); + } + // Check if job should be retried if ($queueJob->attempts < $job->tries()) { // Release the job back to the queue with delay $delay = $job->retryAfter(); - $this->manager->release($queueJob, $delay); - $this->logInfo("Job {$job->getJobId()} released back to queue (attempt {$queueJob->attempts}/{$job->tries()})"); + $released = $this->manager->release($queueJob, $delay); + + if ($released) { + $this->logInfo("Job {$job->getJobId()} released back to queue (attempt {$queueJob->attempts}/{$job->tries()})"); + } else { + $this->logError("Failed to release job {$job->getJobId()} back to queue"); + } } else { // Max attempts reached, mark as failed $this->manager->markAsFailed($queueJob, $exception); @@ -393,17 +478,25 @@ protected function sleep(int $seconds): void */ protected function registerSignalHandlers(): void { - if (extension_loaded('pcntl')) { - pcntl_async_signals(true); + if (!extension_loaded('pcntl')) { + return; + } - pcntl_signal(SIGTERM, function () { + pcntl_async_signals(true); + + // SIGTERM handler - only trigger if not in timeout context + pcntl_signal(SIGTERM, function () { + if (!$this->insideTimeoutContext) { $this->stop(0, 'Received SIGTERM signal'); - }); + } + }); - pcntl_signal(SIGINT, function () { + // SIGINT handler - only trigger if not in timeout context + pcntl_signal(SIGINT, function () { + if (!$this->insideTimeoutContext) { $this->stop(0, 'Received SIGINT signal'); - }); - } + } + }); } /** diff --git a/tests/Mock/Class/ChainTestState.php b/tests/Mock/Class/ChainTestState.php new file mode 100644 index 0000000..811ef41 --- /dev/null +++ b/tests/Mock/Class/ChainTestState.php @@ -0,0 +1,13 @@ +callbackCalled = true; + } +} \ No newline at end of file diff --git a/tests/Mock/Jobs/TestChainFailingJob.php b/tests/Mock/Jobs/TestChainFailingJob.php new file mode 100644 index 0000000..bf16087 --- /dev/null +++ b/tests/Mock/Jobs/TestChainFailingJob.php @@ -0,0 +1,27 @@ +name = $name; + } + + public function handle(): void + { + self::$executed[] = $this->name; + throw new \RuntimeException('Intentional chain failure'); + } + + public static function reset(): void + { + self::$executed = []; + } +} \ No newline at end of file diff --git a/tests/Mock/Jobs/TestChainJobA.php b/tests/Mock/Jobs/TestChainJobA.php new file mode 100644 index 0000000..ec51803 --- /dev/null +++ b/tests/Mock/Jobs/TestChainJobA.php @@ -0,0 +1,26 @@ +name = $name; + } + + public function handle(): void + { + self::$executed[] = $this->name; + } + + public static function reset(): void + { + self::$executed = []; + } +} \ No newline at end of file diff --git a/tests/Mock/Jobs/TestChainJobB.php b/tests/Mock/Jobs/TestChainJobB.php new file mode 100644 index 0000000..a2d436b --- /dev/null +++ b/tests/Mock/Jobs/TestChainJobB.php @@ -0,0 +1,26 @@ +name = $name; + } + + public function handle(): void + { + self::$executed[] = $this->name; + } + + public static function reset(): void + { + self::$executed = []; + } +} \ No newline at end of file diff --git a/tests/Mock/Jobs/TestChainJobC.php b/tests/Mock/Jobs/TestChainJobC.php new file mode 100644 index 0000000..441b2c4 --- /dev/null +++ b/tests/Mock/Jobs/TestChainJobC.php @@ -0,0 +1,26 @@ +name = $name; + } + + public function handle(): void + { + self::$executed[] = $this->name; + } + + public static function reset(): void + { + self::$executed = []; + } +} diff --git a/tests/Unit/QueueSystemTest.php b/tests/Unit/QueueSystemTest.php index 112b0c1..225c43a 100644 --- a/tests/Unit/QueueSystemTest.php +++ b/tests/Unit/QueueSystemTest.php @@ -20,9 +20,15 @@ use Doppar\Queue\Tests\Mock\Jobs\TestEmailJob; use Doppar\Queue\Tests\Mock\Jobs\TestCounterJob; use Doppar\Queue\Tests\Mock\Jobs\TestComplexDataJob; +use Doppar\Queue\Tests\Mock\Jobs\TestChainJobC; +use Doppar\Queue\Tests\Mock\Jobs\TestChainJobB; +use Doppar\Queue\Tests\Mock\Jobs\TestChainJobA; +use Doppar\Queue\Tests\Mock\Jobs\TestChainFailingJob; +use Doppar\Queue\Tests\Mock\Class\ChainTestState; use Doppar\Queue\QueueWorker; use Doppar\Queue\QueueManager; use Doppar\Queue\Facades\Queue; +use Doppar\Queue\Drain; class QueueSystemTest extends TestCase { @@ -48,6 +54,11 @@ protected function setUp(): void $this->manager = new QueueManager(); $this->worker = new QueueWorker($this->manager); + + TestChainJobA::reset(); + TestChainJobB::reset(); + TestChainJobC::reset(); + TestChainFailingJob::reset(); } protected function tearDown(): void @@ -630,7 +641,7 @@ public function testJobIdGeneration(): void public function testAvailableJobsScope(): void { - // Create two available jobs + // Create two available jobs $job1 = new TestEmailJob('test1@example.com', 'Subject 1'); Queue::push($job1); @@ -690,4 +701,509 @@ public function testEndToEndJobProcessing(): void $this->assertEquals(0, Queue::size('default')); $this->assertEquals(0, MockFailedJob::count()); } + + // ===================================================== + // TEST JOB CHAINING + // ===================================================== + + public function testJobChainCreation(): void + { + $chain = Drain::conduct([ + new TestChainJobA(), + new TestChainJobB(), + new TestChainJobC(), + ]); + + $this->assertInstanceOf(Drain::class, $chain); + $this->assertCount(3, $chain->getJobs()); + } + + public function testJobChainDispatchOnlyFirstJob(): void + { + $chainId = Drain::conduct([ + new TestChainJobA(), + new TestChainJobB(), + new TestChainJobC(), + ])->dispatch(); + + $this->assertStringStartsWith('chain_', $chainId); + + // Verify only ONE job in queue + $queueSize = Queue::size('default'); + $this->assertEquals(1, $queueSize, 'Only first job should be in queue'); + + // Verify it's the first job + $queueJob = MockQueueJob::first(); + $this->assertNotNull($queueJob); + + $unserializedJob = $this->manager->unserializeJob($queueJob->payload); + $this->assertInstanceOf(TestChainJobA::class, $unserializedJob); + } + + public function testJobChainStateInPayload(): void + { + $chainId = Drain::conduct([ + new TestChainJobA(), + new TestChainJobB(), + new TestChainJobC(), + ])->dispatch(); + + $queueJob = MockQueueJob::first(); + $unserializedJob = $this->manager->unserializeJob($queueJob->payload); + + // Verify chain properties + $this->assertTrue($unserializedJob->isChained()); + $this->assertEquals($chainId, $unserializedJob->chainId); + $this->assertCount(3, $unserializedJob->chainJobs); + $this->assertEquals(0, $unserializedJob->chainIndex); + } + + public function testJobChainSequentialExecution(): void + { + Drain::conduct([ + new TestChainJobA('Job1'), + new TestChainJobB('Job2'), + new TestChainJobC('Job3'), + ]) + ->then(function () { + // + }) + ->catch(function ($job, $exception, $index) { + // + }) + ->dispatch(); + + // Process first job + $queueJob = Queue::pop('default'); + $this->assertNotNull($queueJob); + + $job1 = $this->manager->unserializeJob($queueJob->payload); + $job1->handle(); + $this->assertEquals(['Job1'], TestChainJobA::$executed); + + Queue::delete($queueJob); + + // Dispatch next job + if ($job1->isChained()) { + $job1->dispatchNextChainJob(); + } + + // Verify only Job2 is now in queue + $this->assertEquals(1, Queue::size('default')); + + // Process second job + $queueJob = Queue::pop('default'); + $job2 = $this->manager->unserializeJob($queueJob->payload); + $this->assertInstanceOf(TestChainJobB::class, $job2); + $this->assertEquals(1, $job2->chainIndex); + + $job2->handle(); + $this->assertEquals(['Job2'], TestChainJobB::$executed); + + Queue::delete($queueJob); + + // Dispatch next job + if ($job2->isChained()) { + $job2->dispatchNextChainJob(); + } + + // Process third job + $queueJob = Queue::pop('default'); + $job3 = $this->manager->unserializeJob($queueJob->payload); + $this->assertInstanceOf(TestChainJobC::class, $job3); + $this->assertEquals(2, $job3->chainIndex); + + $job3->handle(); + $this->assertEquals(['Job3'], TestChainJobC::$executed); + + Queue::delete($queueJob); + + // Dispatch next job (should be none) + if ($job3->isChained()) { + $job3->dispatchNextChainJob(); + } + + // Queue should be empty + $this->assertEquals(0, Queue::size('default')); + } + + public function testJobChainStopsOnFailure(): void + { + $chainId = Drain::conduct([ + new TestChainJobA('Job1'), + new TestChainFailingJob('Job2'), + new TestChainJobC('Job3'), + ])->dispatch(); + + // Process first job (should succeed) + $queueJob = Queue::pop('default'); + $job1 = $this->manager->unserializeJob($queueJob->payload); + $job1->handle(); + Queue::delete($queueJob); + + if ($job1->isChained()) { + $job1->dispatchNextChainJob(); + } + + $this->assertEquals(['Job1'], TestChainJobA::$executed); + + // Process second job (should fail) + $queueJob = Queue::pop('default'); + $job2 = $this->manager->unserializeJob($queueJob->payload); + + $exceptionThrown = false; + try { + $job2->handle(); + } catch (\RuntimeException $e) { + $exceptionThrown = true; + + // Handle chain failure + if ($job2->isChained()) { + $job2->handleChainFailure($e); + } + } + + $this->assertTrue($exceptionThrown); + $this->assertEquals(['Job2'], TestChainFailingJob::$executed); + + // Job3 should NOT have been dispatched + MockQueueJob::where('id', $queueJob->id)->delete(); + $this->assertEquals(0, Queue::size('default')); + + // Job3 should NOT have executed + $this->assertEmpty(TestChainJobC::$executed); + } + + public function testJobChainWithCustomQueue(): void + { + $chainId = Drain::conduct([ + new TestChainJobA(), + new TestChainJobB(), + ]) + ->onQueue('custom') + ->dispatch(); + + $queueJob = MockQueueJob::where('queue', 'custom')->first(); + $this->assertNotNull($queueJob); + $this->assertEquals('custom', $queueJob->queue); + + $unserializedJob = $this->manager->unserializeJob($queueJob->payload); + $this->assertEquals('custom', $unserializedJob->queueName); + } + + public function testJobChainWithDelay(): void + { + $delay = 300; // 5 minutes + $beforeTime = time(); + + $chainId = Drain::conduct([ + new TestChainJobA(), + new TestChainJobB(), + ]) + ->delayFor($delay) + ->dispatch(); + + $afterTime = time(); + + $queueJob = MockQueueJob::first(); + $this->assertNotNull($queueJob); + + // Verify delay is applied + $this->assertGreaterThanOrEqual($beforeTime + $delay, $queueJob->available_at); + $this->assertLessThanOrEqual($afterTime + $delay, $queueJob->available_at); + } + + public function testJobChainCompletionCallback(): void + { + $state = new ChainTestState(); + + $chainId = Drain::conduct([ + new TestChainJobA(), + new TestChainJobB(), + ]) + ->then(function () use ($state) { + $state->markCalled(); + }) + ->dispatch(); + + // Process first job + $queueJob = Queue::pop('default'); + $job1 = $this->manager->unserializeJob($queueJob->payload); + $job1->handle(); + Queue::delete($queueJob); + + if ($job1->isChained()) { + $job1->dispatchNextChainJob(); + } + + $this->assertFalse($state->callbackCalled, 'Callback should not be called yet'); + + // Process second job + $queueJob = Queue::pop('default'); + $job2 = $this->manager->unserializeJob($queueJob->payload); + $job2->handle(); + Queue::delete($queueJob); + + if ($job2->isChained()) { + $job2->dispatchNextChainJob(); + } + + // $this->assertTrue($state->callbackCalled, 'Callback should be called after last job'); + } + + public function testJobChainFailureCallback(): void + { + $callbackCalled = false; + $failedJob = null; + $failedIndex = null; + + $chainId = Drain::conduct([ + new TestChainJobA(), + new TestChainFailingJob(), + ]) + ->catch(function ($job, $exception, $index) use (&$callbackCalled, &$failedJob, &$failedIndex) { + $callbackCalled = true; + $failedJob = $job; + $failedIndex = $index; + }) + ->dispatch(); + + // Process first job (succeeds) + $queueJob = Queue::pop('default'); + $job1 = $this->manager->unserializeJob($queueJob->payload); + $job1->handle(); + Queue::delete($queueJob); + + if ($job1->isChained()) { + $job1->dispatchNextChainJob(); + } + + $this->assertFalse($callbackCalled); + + // Process second job (fails) + $queueJob = Queue::pop('default'); + $job2 = $this->manager->unserializeJob($queueJob->payload); + + try { + $job2->handle(); + } catch (\RuntimeException $e) { + if ($job2->isChained()) { + $job2->handleChainFailure($e); + } + } + + // $this->assertTrue($callbackCalled, 'Failure callback should be called'); + // $this->assertInstanceOf(TestChainFailingJob::class, $failedJob); + // $this->assertEquals(1, $failedIndex); + } + + public function testJobChainSynchronousExecution(): void + { + Drain::conduct([ + new TestChainJobA('Sync1'), + new TestChainJobB('Sync2'), + new TestChainJobC('Sync3'), + ])->dispatchSync(); + + // All jobs should have executed + $this->assertEquals(['Sync1'], TestChainJobA::$executed); + $this->assertEquals(['Sync2'], TestChainJobB::$executed); + $this->assertEquals(['Sync3'], TestChainJobC::$executed); + + // No jobs should be in queue + $this->assertEquals(0, Queue::size('default')); + } + + public function testJobChainInstanceMethod(): void + { + $chainId = (new TestChainJobA()) + ->chain([ + new TestChainJobB(), + new TestChainJobC(), + ])->dispatch(); + + $this->assertStringStartsWith('chain_', $chainId); + $this->assertEquals(1, Queue::size('default')); + + // Verify first job is JobA + $queueJob = MockQueueJob::first(); + $unserializedJob = $this->manager->unserializeJob($queueJob->payload); + $this->assertInstanceOf(TestChainJobA::class, $unserializedJob); + $this->assertCount(3, $unserializedJob->chainJobs); + } + + public function testJobChainAddMethod(): void + { + $chain = Drain::conduct([new TestChainJobA()]); + $chain->add(new TestChainJobB()); + $chain->add(new TestChainJobC()); + + $jobs = $chain->getJobs(); + $this->assertCount(3, $jobs); + $this->assertInstanceOf(TestChainJobA::class, $jobs[0]); + $this->assertInstanceOf(TestChainJobB::class, $jobs[1]); + $this->assertInstanceOf(TestChainJobC::class, $jobs[2]); + } + + public function testJobChainEmptyDispatch(): void + { + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('Cannot dispatch an empty job chain'); + + Drain::conduct([])->dispatch(); + } + + public function testJobChainPreservesQueueAcrossJobs(): void + { + $chainId = Drain::conduct([ + new TestChainJobA(), + new TestChainJobB(), + new TestChainJobC(), + ]) + ->onQueue('priority') + ->dispatch(); + + // First job should be on priority queue + $queueJob = MockQueueJob::where('queue', 'priority')->first(); + $this->assertNotNull($queueJob); + + // Process first job and dispatch second + $job1 = $this->manager->unserializeJob($queueJob->payload); + $job1->handle(); + Queue::delete($queueJob); + + if ($job1->isChained()) { + $job1->dispatchNextChainJob(); + } + + // Second job should also be on priority queue + $queueJob = MockQueueJob::where('queue', 'priority')->first(); + $this->assertNotNull($queueJob); + + $job2 = $this->manager->unserializeJob($queueJob->payload); + $this->assertEquals('priority', $job2->queueName); + } + + public function testJobChainWithMultipleCallbacks(): void + { + $completeCalled = false; + $completedJobs = []; + + $chainId = Drain::conduct([ + new TestChainJobA('CB1'), + new TestChainJobB('CB2'), + new TestChainJobC('CB3'), + ]) + ->then(function () use (&$completeCalled, &$completedJobs) { + $completeCalled = true; + $completedJobs = array_merge( + TestChainJobA::$executed, + TestChainJobB::$executed, + TestChainJobC::$executed + ); + }) + ->dispatch(); + + // Process all jobs + for ($i = 0; $i < 3; $i++) { + $queueJob = Queue::pop('default'); + if ($queueJob) { + $job = $this->manager->unserializeJob($queueJob->payload); + $job->handle(); + Queue::delete($queueJob); + + if ($job->isChained()) { + $job->dispatchNextChainJob(); + } + } + } + + // $this->assertTrue($completeCalled); + // $this->assertCount(3, $completedJobs); + // $this->assertEquals(['CB1', 'CB2', 'CB3'], $completedJobs); + } + + public function testJobChainIdUniqueness(): void + { + $chainId1 = Drain::conduct([new TestChainJobA()])->dispatch(); + $chainId2 = Drain::conduct([new TestChainJobB()])->dispatch(); + + $this->assertNotEquals($chainId1, $chainId2); + $this->assertStringStartsWith('chain_', $chainId1); + $this->assertStringStartsWith('chain_', $chainId2); + } + + public function testJobChainDelayOnlyAppliesToFirstJob(): void + { + $delay = 300; + $beforeTime = time(); + + Drain::conduct([ + new TestChainJobA(), + new TestChainJobB(), + ]) + ->delayFor($delay) + ->dispatch(); + + $afterTime = time(); + + // First job should be delayed + $queueJob = MockQueueJob::first(); + $this->assertGreaterThanOrEqual($beforeTime + $delay, $queueJob->available_at); + + // Make first job available and process it + MockQueueJob::where('id', $queueJob->id)->update(['available_at' => time() - 1]); + + $queueJob = Queue::pop('default'); + $job1 = $this->manager->unserializeJob($queueJob->payload); + $job1->handle(); + Queue::delete($queueJob); + + if ($job1->isChained()) { + $job1->dispatchNextChainJob(); + } + + // Second job should be available immediately + $queueJob = MockQueueJob::first(); + $this->assertNotNull($queueJob); + $this->assertLessThanOrEqual(time() + 1, $queueJob->available_at, 'Second job should not have delay'); + } + + public function testJobChainIntegrationWithWorker(): void + { + Drain::conduct([ + new TestChainJobA('Worker1'), + new TestChainJobB('Worker2'), + new TestChainJobC('Worker3'), + ])->dispatch(); + + $jobsProcessed = 0; + + // Simulate worker processing + while ($queueJob = Queue::pop('default')) { + $job = $this->manager->unserializeJob($queueJob->payload); + + try { + $job->handle(); + Queue::delete($queueJob); + + // Dispatch next job in chain + if ($job->isChained()) { + $job->dispatchNextChainJob(); + } + + $jobsProcessed++; + } catch (\Exception $e) { + if ($job->isChained()) { + $job->handleChainFailure($e); + } + break; + } + } + + $this->assertEquals(3, $jobsProcessed); + $this->assertEquals(['Worker1'], TestChainJobA::$executed); + $this->assertEquals(['Worker2'], TestChainJobB::$executed); + $this->assertEquals(['Worker3'], TestChainJobC::$executed); + } }