diff --git a/composer.json b/composer.json index a07a1f5..a50802d 100644 --- a/composer.json +++ b/composer.json @@ -29,6 +29,7 @@ "php": ">=8.3", "php-amqplib/php-amqplib": "^3.7", "utopia-php/di": "0.3.*", + "utopia-php/lock": "0.2.*", "utopia-php/servers": "0.4.*", "utopia-php/pools": "1.*", "utopia-php/telemetry": "0.4.*", diff --git a/composer.lock b/composer.lock index 0acc091..87cfa7d 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "1f8c2839dc26ae8525c7adcca7838164", + "content-hash": "1d5f6649fb727b85e109128125f7ff9d", "packages": [ { "name": "brick/math", @@ -2236,6 +2236,57 @@ }, "time": "2026-03-21T07:42:10+00:00" }, + { + "name": "utopia-php/lock", + "version": "0.2.0", + "source": { + "type": "git", + "url": "https://github.com/utopia-php/lock.git", + "reference": "49317c9493d8f747e4299aa24c22862aa5f6e106" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/utopia-php/lock/zipball/49317c9493d8f747e4299aa24c22862aa5f6e106", + "reference": "49317c9493d8f747e4299aa24c22862aa5f6e106", + "shasum": "" + }, + "require": { + "php": ">=8.3" + }, + "require-dev": { + "laravel/pint": "1.*", + "phpstan/phpstan": "2.*", + "phpunit/phpunit": "11.*", + "swoole/ide-helper": "*" + }, + "suggest": { + "ext-pcntl": "Required to run the File lock tests", + "ext-redis": "Required for the Distributed lock", + "ext-swoole": "Required for the Mutex and Semaphore locks (>=6.0)" + }, + "type": "library", + "autoload": { + "psr-4": { + "Utopia\\Lock\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Appwrite Team", + "email": "team@appwrite.io" + } + ], + "description": "Mutex, semaphore, file and distributed locks for PHP — one interface, four backends.", + "support": { + "issues": "https://github.com/utopia-php/lock/issues", + "source": "https://github.com/utopia-php/lock/tree/0.2.0" + }, + "time": "2026-04-24T10:47:56+00:00" + }, { "name": "utopia-php/pools", "version": "1.0.3", diff --git a/src/Queue/Connection/Locking.php b/src/Queue/Connection/Locking.php new file mode 100644 index 0000000..d8c935a --- /dev/null +++ b/src/Queue/Connection/Locking.php @@ -0,0 +1,152 @@ +lock->withLock($command, self::ACQUIRE_TIMEOUT); + } + + public function rightPushArray(string $queue, array $payload): bool + { + return $this->synchronize(fn () => $this->connection->rightPushArray($queue, $payload)); + } + + public function rightPopArray(string $queue, int $timeout): array|false + { + return $this->synchronize(fn () => $this->connection->rightPopArray($queue, $timeout)); + } + + public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false + { + return $this->synchronize(fn () => $this->connection->rightPopLeftPushArray($queue, $destination, $timeout)); + } + + public function leftPushArray(string $queue, array $payload): bool + { + return $this->synchronize(fn () => $this->connection->leftPushArray($queue, $payload)); + } + + public function leftPopArray(string $queue, int $timeout): array|false + { + return $this->synchronize(fn () => $this->connection->leftPopArray($queue, $timeout)); + } + + public function rightPush(string $queue, string $payload): bool + { + return $this->synchronize(fn () => $this->connection->rightPush($queue, $payload)); + } + + public function rightPop(string $queue, int $timeout): string|false + { + return $this->synchronize(fn () => $this->connection->rightPop($queue, $timeout)); + } + + public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false + { + return $this->synchronize(fn () => $this->connection->rightPopLeftPush($queue, $destination, $timeout)); + } + + public function leftPush(string $queue, string $payload): bool + { + return $this->synchronize(fn () => $this->connection->leftPush($queue, $payload)); + } + + public function leftPop(string $queue, int $timeout): string|false + { + return $this->synchronize(fn () => $this->connection->leftPop($queue, $timeout)); + } + + public function listRemove(string $queue, string $key): bool + { + return $this->synchronize(fn () => $this->connection->listRemove($queue, $key)); + } + + public function listSize(string $key): int + { + return $this->synchronize(fn () => $this->connection->listSize($key)); + } + + public function listRange(string $key, int $total, int $offset): array + { + return $this->synchronize(fn () => $this->connection->listRange($key, $total, $offset)); + } + + public function remove(string $key): bool + { + return $this->synchronize(fn () => $this->connection->remove($key)); + } + + public function move(string $queue, string $destination): bool + { + return $this->synchronize(fn () => $this->connection->move($queue, $destination)); + } + + public function set(string $key, string $value, int $ttl = 0): bool + { + return $this->synchronize(fn () => $this->connection->set($key, $value, $ttl)); + } + + public function get(string $key): array|string|null + { + return $this->synchronize(fn () => $this->connection->get($key)); + } + + public function setArray(string $key, array $value, int $ttl = 0): bool + { + return $this->synchronize(fn () => $this->connection->setArray($key, $value, $ttl)); + } + + public function increment(string $key): int + { + return $this->synchronize(fn () => $this->connection->increment($key)); + } + + public function decrement(string $key): int + { + return $this->synchronize(fn () => $this->connection->decrement($key)); + } + + public function ping(): bool + { + return $this->synchronize(fn () => $this->connection->ping()); + } + + public function close(): void + { + $this->synchronize(fn () => $this->connection->close()); + } +} diff --git a/tests/Queue/E2E/Adapter/LockingTest.php b/tests/Queue/E2E/Adapter/LockingTest.php new file mode 100644 index 0000000..a4b3607 --- /dev/null +++ b/tests/Queue/E2E/Adapter/LockingTest.php @@ -0,0 +1,463 @@ + $args + */ + #[DataProvider('operationProvider')] + public function testOperationIsSynchronized(string $method, array $args, mixed $expected): void + { + $recorder = new Recorder(); + $locking = new Locking( + new RecordingConnection($recorder), + new RecordingLock($recorder), + ); + + $result = $locking->$method(...$args); + + $this->assertSame($expected, $result, "{$method}() should return the inner connection's value"); + $this->assertSame(['acquire', $method, 'release'], $recorder->events, "{$method}() must run inside the lock"); + $this->assertSame([$method => $args], $recorder->calls, "{$method}() must forward its arguments"); + } + + /** + * The lock must be acquired with a wait-forever timeout, so a command is + * queued rather than dropped when the connection is momentarily busy. + */ + public function testLockIsAcquiredWithWaitForeverTimeout(): void + { + $recorder = new Recorder(); + $locking = new Locking( + new RecordingConnection($recorder), + new RecordingLock($recorder), + ); + + $locking->ping(); + + $this->assertSame(-1.0, $recorder->lastTimeout); + } + + /** + * The lock must be released even when the inner command throws, otherwise + * a single failure would deadlock every other command on the connection. + */ + public function testLockIsReleasedWhenInnerCommandThrows(): void + { + $recorder = new Recorder(); + $locking = new Locking( + new ThrowingConnection(), + new RecordingLock($recorder), + ); + + try { + $locking->ping(); + $this->fail('Expected the inner exception to propagate.'); + } catch (\RuntimeException $e) { + $this->assertSame('boom', $e->getMessage()); + } + + $this->assertSame(['acquire', 'release'], $recorder->events); + } + + /** + * Defaults to a coroutine-aware Mutex when no lock is injected. + */ + public function testDefaultLockIsAMutex(): void + { + $locking = new Locking(new RecordingConnection(new Recorder())); + + $lock = (new \ReflectionProperty(Locking::class, 'lock'))->getValue($locking); + + $this->assertInstanceOf(Mutex::class, $lock); + } + + /** + * Regression guard: every method on the Connection interface must be + * exercised by operationProvider(), so newly added methods cannot ship + * without verifying they are synchronized. + */ + public function testEveryConnectionMethodIsCovered(): void + { + $declared = \array_map( + static fn (\ReflectionMethod $method): string => $method->getName(), + (new \ReflectionClass(Connection::class))->getMethods(), + ); + + $covered = \array_map( + static fn (array $case): string => $case[0], + \iterator_to_array($this->operationProvider(), false), + ); + + \sort($declared); + \sort($covered); + + $this->assertSame($declared, $covered, 'Every Connection method must be covered by the Locking test.'); + } + + /** + * @return iterable, 2: mixed}> + */ + public static function operationProvider(): iterable + { + yield 'rightPushArray' => ['rightPushArray', ['queue', ['a' => 1]], true]; + yield 'rightPopArray' => ['rightPopArray', ['queue', 5], ['popped' => 'right']]; + yield 'rightPopLeftPushArray' => ['rightPopLeftPushArray', ['queue', 'dest', 5], ['rpoplpush' => true]]; + yield 'leftPushArray' => ['leftPushArray', ['queue', ['a' => 1]], true]; + yield 'leftPopArray' => ['leftPopArray', ['queue', 5], ['popped' => 'left']]; + yield 'rightPush' => ['rightPush', ['queue', 'value'], true]; + yield 'rightPop' => ['rightPop', ['queue', 5], 'right-pop']; + yield 'rightPopLeftPush' => ['rightPopLeftPush', ['queue', 'dest', 5], 'rpoplpush']; + yield 'leftPush' => ['leftPush', ['queue', 'value'], true]; + yield 'leftPop' => ['leftPop', ['queue', 5], 'left-pop']; + yield 'listRemove' => ['listRemove', ['queue', 'key'], true]; + yield 'listSize' => ['listSize', ['key'], 7]; + yield 'listRange' => ['listRange', ['key', 10, 0], ['a', 'b']]; + yield 'remove' => ['remove', ['key'], true]; + yield 'move' => ['move', ['queue', 'dest'], true]; + yield 'set' => ['set', ['key', 'value', 60], true]; + yield 'get' => ['get', ['key'], 'value']; + yield 'setArray' => ['setArray', ['key', ['a' => 1], 60], true]; + yield 'increment' => ['increment', ['key'], 3]; + yield 'decrement' => ['decrement', ['key'], 2]; + yield 'ping' => ['ping', [], true]; + yield 'close' => ['close', [], null]; + } +} + +/** + * Shared event log written to by both the spy lock and the spy connection, so + * tests can assert the ordering of acquire/command/release across them. + */ +class Recorder +{ + /** @var list */ + public array $events = []; + + /** @var array> */ + public array $calls = []; + + public ?float $lastTimeout = null; +} + +class RecordingLock implements Lock +{ + public function __construct(private readonly Recorder $recorder) + { + } + + public function acquire(float $timeout = 0.0): bool + { + return true; + } + + public function tryAcquire(): bool + { + return true; + } + + public function release(): void + { + } + + public function withLock(callable $callback, float $timeout = 0.0): mixed + { + $this->recorder->lastTimeout = $timeout; + $this->recorder->events[] = 'acquire'; + + try { + return $callback(); + } finally { + $this->recorder->events[] = 'release'; + } + } +} + +class RecordingConnection implements Connection +{ + public function __construct(private readonly Recorder $recorder) + { + } + + private function record(string $method, array $args): void + { + $this->recorder->events[] = $method; + $this->recorder->calls[$method] = $args; + } + + public function rightPushArray(string $queue, array $payload): bool + { + $this->record('rightPushArray', [$queue, $payload]); + + return true; + } + + public function rightPopArray(string $queue, int $timeout): array|false + { + $this->record('rightPopArray', [$queue, $timeout]); + + return ['popped' => 'right']; + } + + public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false + { + $this->record('rightPopLeftPushArray', [$queue, $destination, $timeout]); + + return ['rpoplpush' => true]; + } + + public function leftPushArray(string $queue, array $payload): bool + { + $this->record('leftPushArray', [$queue, $payload]); + + return true; + } + + public function leftPopArray(string $queue, int $timeout): array|false + { + $this->record('leftPopArray', [$queue, $timeout]); + + return ['popped' => 'left']; + } + + public function rightPush(string $queue, string $payload): bool + { + $this->record('rightPush', [$queue, $payload]); + + return true; + } + + public function rightPop(string $queue, int $timeout): string|false + { + $this->record('rightPop', [$queue, $timeout]); + + return 'right-pop'; + } + + public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false + { + $this->record('rightPopLeftPush', [$queue, $destination, $timeout]); + + return 'rpoplpush'; + } + + public function leftPush(string $queue, string $payload): bool + { + $this->record('leftPush', [$queue, $payload]); + + return true; + } + + public function leftPop(string $queue, int $timeout): string|false + { + $this->record('leftPop', [$queue, $timeout]); + + return 'left-pop'; + } + + public function listRemove(string $queue, string $key): bool + { + $this->record('listRemove', [$queue, $key]); + + return true; + } + + public function listSize(string $key): int + { + $this->record('listSize', [$key]); + + return 7; + } + + public function listRange(string $key, int $total, int $offset): array + { + $this->record('listRange', [$key, $total, $offset]); + + return ['a', 'b']; + } + + public function remove(string $key): bool + { + $this->record('remove', [$key]); + + return true; + } + + public function move(string $queue, string $destination): bool + { + $this->record('move', [$queue, $destination]); + + return true; + } + + public function set(string $key, string $value, int $ttl = 0): bool + { + $this->record('set', [$key, $value, $ttl]); + + return true; + } + + public function get(string $key): array|string|null + { + $this->record('get', [$key]); + + return 'value'; + } + + public function setArray(string $key, array $value, int $ttl = 0): bool + { + $this->record('setArray', [$key, $value, $ttl]); + + return true; + } + + public function increment(string $key): int + { + $this->record('increment', [$key]); + + return 3; + } + + public function decrement(string $key): int + { + $this->record('decrement', [$key]); + + return 2; + } + + public function ping(): bool + { + $this->record('ping', []); + + return true; + } + + public function close(): void + { + $this->record('close', []); + } +} + +class ThrowingConnection implements Connection +{ + public function rightPushArray(string $queue, array $payload): bool + { + return true; + } + + public function rightPopArray(string $queue, int $timeout): array|false + { + return false; + } + + public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false + { + return false; + } + + public function leftPushArray(string $queue, array $payload): bool + { + return true; + } + + public function leftPopArray(string $queue, int $timeout): array|false + { + return false; + } + + public function rightPush(string $queue, string $payload): bool + { + return true; + } + + public function rightPop(string $queue, int $timeout): string|false + { + return false; + } + + public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false + { + return false; + } + + public function leftPush(string $queue, string $payload): bool + { + return true; + } + + public function leftPop(string $queue, int $timeout): string|false + { + return false; + } + + public function listRemove(string $queue, string $key): bool + { + return true; + } + + public function listSize(string $key): int + { + return 0; + } + + public function listRange(string $key, int $total, int $offset): array + { + return []; + } + + public function remove(string $key): bool + { + return true; + } + + public function move(string $queue, string $destination): bool + { + return true; + } + + public function set(string $key, string $value, int $ttl = 0): bool + { + return true; + } + + public function get(string $key): array|string|null + { + return null; + } + + public function setArray(string $key, array $value, int $ttl = 0): bool + { + return true; + } + + public function increment(string $key): int + { + return 1; + } + + public function decrement(string $key): int + { + return 0; + } + + public function ping(): bool + { + throw new \RuntimeException('boom'); + } + + public function close(): void + { + } +}