-
Notifications
You must be signed in to change notification settings - Fork 4
Add Locking connection decorator for coroutine-safe access #82
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,152 @@ | ||
| <?php | ||
|
|
||
| namespace Utopia\Queue\Connection; | ||
|
|
||
| use Utopia\Lock\Lock; | ||
| use Utopia\Lock\Mutex; | ||
| use Utopia\Queue\Connection; | ||
|
|
||
| /** | ||
| * Wraps any {@see Connection} and serializes every command behind a single | ||
| * lock, so that concurrent coroutines sharing one connection cannot interleave | ||
| * their requests and responses on the same socket. | ||
| * | ||
| * Outside of a coroutine there is no preemption, so the lock degrades to a | ||
| * plain in-process flag (see {@see Mutex}). | ||
| */ | ||
| class Locking implements Connection | ||
| { | ||
| /** | ||
| * Wait forever when acquiring the lock; a command should never be dropped | ||
| * just because the connection is momentarily busy. | ||
| */ | ||
| private const float ACQUIRE_TIMEOUT = -1; | ||
|
|
||
| public function __construct( | ||
| protected readonly Connection $connection, | ||
| protected readonly Lock $lock = new Mutex(), | ||
| ) { | ||
| } | ||
|
|
||
| /** | ||
| * Run a command while holding the lock, ensuring only one runs at a time. | ||
| * | ||
| * @template T | ||
| * @param callable(): T $command | ||
| * @return T | ||
| */ | ||
| protected function synchronize(callable $command): mixed | ||
| { | ||
| return $this->lock->withLock($command, self::ACQUIRE_TIMEOUT); | ||
| } | ||
|
|
||
| public function rightPushArray(string $queue, array $payload): bool | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->rightPushArray($queue, $payload)); | ||
| } | ||
|
|
||
| public function rightPopArray(string $queue, int $timeout): array|false | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->rightPopArray($queue, $timeout)); | ||
| } | ||
|
|
||
| public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->rightPopLeftPushArray($queue, $destination, $timeout)); | ||
| } | ||
|
|
||
| public function leftPushArray(string $queue, array $payload): bool | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->leftPushArray($queue, $payload)); | ||
| } | ||
|
|
||
| public function leftPopArray(string $queue, int $timeout): array|false | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->leftPopArray($queue, $timeout)); | ||
| } | ||
|
|
||
| public function rightPush(string $queue, string $payload): bool | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->rightPush($queue, $payload)); | ||
| } | ||
|
|
||
| public function rightPop(string $queue, int $timeout): string|false | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->rightPop($queue, $timeout)); | ||
| } | ||
|
|
||
| public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->rightPopLeftPush($queue, $destination, $timeout)); | ||
| } | ||
|
|
||
| public function leftPush(string $queue, string $payload): bool | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->leftPush($queue, $payload)); | ||
| } | ||
|
|
||
| public function leftPop(string $queue, int $timeout): string|false | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->leftPop($queue, $timeout)); | ||
| } | ||
|
Comment on lines
+73
to
+91
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time! |
||
|
|
||
| public function listRemove(string $queue, string $key): bool | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->listRemove($queue, $key)); | ||
| } | ||
|
|
||
| public function listSize(string $key): int | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->listSize($key)); | ||
| } | ||
|
|
||
| public function listRange(string $key, int $total, int $offset): array | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->listRange($key, $total, $offset)); | ||
| } | ||
|
|
||
| public function remove(string $key): bool | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->remove($key)); | ||
| } | ||
|
|
||
| public function move(string $queue, string $destination): bool | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->move($queue, $destination)); | ||
| } | ||
|
|
||
| public function set(string $key, string $value, int $ttl = 0): bool | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->set($key, $value, $ttl)); | ||
| } | ||
|
|
||
| public function get(string $key): array|string|null | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->get($key)); | ||
| } | ||
|
|
||
| public function setArray(string $key, array $value, int $ttl = 0): bool | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->setArray($key, $value, $ttl)); | ||
| } | ||
|
|
||
| public function increment(string $key): int | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->increment($key)); | ||
| } | ||
|
|
||
| public function decrement(string $key): int | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->decrement($key)); | ||
| } | ||
|
|
||
| public function ping(): bool | ||
| { | ||
| return $this->synchronize(fn () => $this->connection->ping()); | ||
| } | ||
|
|
||
| public function close(): void | ||
| { | ||
| $this->synchronize(fn () => $this->connection->close()); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.