Skip to content

Commit e1c26ea

Browse files
loks0nclaude
andcommitted
Add Locking connection decorator for coroutine-safe access
Wraps any Connection and serializes every command behind a mutex (utopia-php/lock) so concurrent coroutines sharing one connection cannot interleave requests and responses on the same socket. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 08e361d commit e1c26ea

3 files changed

Lines changed: 206 additions & 2 deletions

File tree

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
"php-amqplib/php-amqplib": "^3.7",
2929
"utopia-php/servers": "0.2.*",
3030
"utopia-php/fetch": "0.5.*",
31+
"utopia-php/lock": "0.2.*",
3132
"utopia-php/pools": "1.*",
3233
"utopia-php/telemetry": "0.2.*",
3334
"utopia-php/validators": "0.2.*"

composer.lock

Lines changed: 53 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/Queue/Connection/Locking.php

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
<?php
2+
3+
namespace Utopia\Queue\Connection;
4+
5+
use Utopia\Lock\Lock;
6+
use Utopia\Lock\Mutex;
7+
use Utopia\Queue\Connection;
8+
9+
/**
10+
* Wraps any {@see Connection} and serializes every command behind a single
11+
* lock, so that concurrent coroutines sharing one connection cannot interleave
12+
* their requests and responses on the same socket.
13+
*
14+
* Outside of a coroutine there is no preemption, so the lock degrades to a
15+
* plain in-process flag (see {@see Mutex}).
16+
*/
17+
class Locking implements Connection
18+
{
19+
/**
20+
* Wait forever when acquiring the lock; a command should never be dropped
21+
* just because the connection is momentarily busy.
22+
*/
23+
private const float ACQUIRE_TIMEOUT = -1;
24+
25+
public function __construct(
26+
protected readonly Connection $connection,
27+
protected readonly Lock $lock = new Mutex(),
28+
) {
29+
}
30+
31+
/**
32+
* Run a command while holding the lock, ensuring only one runs at a time.
33+
*
34+
* @template T
35+
* @param callable(): T $command
36+
* @return T
37+
*/
38+
protected function synchronize(callable $command): mixed
39+
{
40+
return $this->lock->withLock($command, self::ACQUIRE_TIMEOUT);
41+
}
42+
43+
public function rightPushArray(string $queue, array $payload): bool
44+
{
45+
return $this->synchronize(fn () => $this->connection->rightPushArray($queue, $payload));
46+
}
47+
48+
public function rightPopArray(string $queue, int $timeout): array|false
49+
{
50+
return $this->synchronize(fn () => $this->connection->rightPopArray($queue, $timeout));
51+
}
52+
53+
public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false
54+
{
55+
return $this->synchronize(fn () => $this->connection->rightPopLeftPushArray($queue, $destination, $timeout));
56+
}
57+
58+
public function leftPushArray(string $queue, array $payload): bool
59+
{
60+
return $this->synchronize(fn () => $this->connection->leftPushArray($queue, $payload));
61+
}
62+
63+
public function leftPopArray(string $queue, int $timeout): array|false
64+
{
65+
return $this->synchronize(fn () => $this->connection->leftPopArray($queue, $timeout));
66+
}
67+
68+
public function rightPush(string $queue, string $payload): bool
69+
{
70+
return $this->synchronize(fn () => $this->connection->rightPush($queue, $payload));
71+
}
72+
73+
public function rightPop(string $queue, int $timeout): string|false
74+
{
75+
return $this->synchronize(fn () => $this->connection->rightPop($queue, $timeout));
76+
}
77+
78+
public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false
79+
{
80+
return $this->synchronize(fn () => $this->connection->rightPopLeftPush($queue, $destination, $timeout));
81+
}
82+
83+
public function leftPush(string $queue, string $payload): bool
84+
{
85+
return $this->synchronize(fn () => $this->connection->leftPush($queue, $payload));
86+
}
87+
88+
public function leftPop(string $queue, int $timeout): string|false
89+
{
90+
return $this->synchronize(fn () => $this->connection->leftPop($queue, $timeout));
91+
}
92+
93+
public function listRemove(string $queue, string $key): bool
94+
{
95+
return $this->synchronize(fn () => $this->connection->listRemove($queue, $key));
96+
}
97+
98+
public function listSize(string $key): int
99+
{
100+
return $this->synchronize(fn () => $this->connection->listSize($key));
101+
}
102+
103+
public function listRange(string $key, int $total, int $offset): array
104+
{
105+
return $this->synchronize(fn () => $this->connection->listRange($key, $total, $offset));
106+
}
107+
108+
public function remove(string $key): bool
109+
{
110+
return $this->synchronize(fn () => $this->connection->remove($key));
111+
}
112+
113+
public function move(string $queue, string $destination): bool
114+
{
115+
return $this->synchronize(fn () => $this->connection->move($queue, $destination));
116+
}
117+
118+
public function set(string $key, string $value, int $ttl = 0): bool
119+
{
120+
return $this->synchronize(fn () => $this->connection->set($key, $value, $ttl));
121+
}
122+
123+
public function get(string $key): array|string|null
124+
{
125+
return $this->synchronize(fn () => $this->connection->get($key));
126+
}
127+
128+
public function setArray(string $key, array $value, int $ttl = 0): bool
129+
{
130+
return $this->synchronize(fn () => $this->connection->setArray($key, $value, $ttl));
131+
}
132+
133+
public function increment(string $key): int
134+
{
135+
return $this->synchronize(fn () => $this->connection->increment($key));
136+
}
137+
138+
public function decrement(string $key): int
139+
{
140+
return $this->synchronize(fn () => $this->connection->decrement($key));
141+
}
142+
143+
public function ping(): bool
144+
{
145+
return $this->synchronize(fn () => $this->connection->ping());
146+
}
147+
148+
public function close(): void
149+
{
150+
$this->synchronize(fn () => $this->connection->close());
151+
}
152+
}

0 commit comments

Comments
 (0)