Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"php": ">=8.3",
"php-amqplib/php-amqplib": "^3.7",
"utopia-php/di": "0.3.*",
"utopia-php/lock": "0.2.*",
"utopia-php/servers": "0.4.*",
"utopia-php/pools": "1.*",
"utopia-php/telemetry": "0.4.*",
Expand Down
53 changes: 52 additions & 1 deletion composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

152 changes: 152 additions & 0 deletions src/Queue/Connection/Locking.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
<?php

namespace Utopia\Queue\Connection;

use Utopia\Lock\Lock;
use Utopia\Lock\Mutex;
use Utopia\Queue\Connection;

/**
* Wraps any {@see Connection} and serializes every command behind a single
* lock, so that concurrent coroutines sharing one connection cannot interleave
* their requests and responses on the same socket.
*
* Outside of a coroutine there is no preemption, so the lock degrades to a
* plain in-process flag (see {@see Mutex}).
*/
class Locking implements Connection
Comment thread
loks0n marked this conversation as resolved.
{
/**
* Wait forever when acquiring the lock; a command should never be dropped
* just because the connection is momentarily busy.
*/
private const float ACQUIRE_TIMEOUT = -1;

public function __construct(
protected readonly Connection $connection,
protected readonly Lock $lock = new Mutex(),
) {
}

/**
* Run a command while holding the lock, ensuring only one runs at a time.
*
* @template T
* @param callable(): T $command
* @return T
*/
protected function synchronize(callable $command): mixed
{
return $this->lock->withLock($command, self::ACQUIRE_TIMEOUT);
}

public function rightPushArray(string $queue, array $payload): bool
{
return $this->synchronize(fn () => $this->connection->rightPushArray($queue, $payload));
}

public function rightPopArray(string $queue, int $timeout): array|false
{
return $this->synchronize(fn () => $this->connection->rightPopArray($queue, $timeout));
}

public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false
{
return $this->synchronize(fn () => $this->connection->rightPopLeftPushArray($queue, $destination, $timeout));
}

public function leftPushArray(string $queue, array $payload): bool
{
return $this->synchronize(fn () => $this->connection->leftPushArray($queue, $payload));
}

public function leftPopArray(string $queue, int $timeout): array|false
{
return $this->synchronize(fn () => $this->connection->leftPopArray($queue, $timeout));
}

public function rightPush(string $queue, string $payload): bool
{
return $this->synchronize(fn () => $this->connection->rightPush($queue, $payload));
}

public function rightPop(string $queue, int $timeout): string|false
{
return $this->synchronize(fn () => $this->connection->rightPop($queue, $timeout));
}

public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false
{
return $this->synchronize(fn () => $this->connection->rightPopLeftPush($queue, $destination, $timeout));
}

public function leftPush(string $queue, string $payload): bool
{
return $this->synchronize(fn () => $this->connection->leftPush($queue, $payload));
}

public function leftPop(string $queue, int $timeout): string|false
{
return $this->synchronize(fn () => $this->connection->leftPop($queue, $timeout));
}
Comment on lines +73 to +91
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Blocking pops starve all other operations while waiting

rightPop, leftPop, and rightPopLeftPush (and their Array variants) hold the mutex for their full blocking timeout. In practice this means a worker coroutine sitting in a blocking pop for up to 2 s will prevent any other coroutine sharing this Locking instance from executing ping(), set(), get(), or any other command during that window. This is documented in the PR description as a known limitation, but callers using the same Locking instance for both queue polling and metadata operations (e.g. periodic heartbeats or stats updates) should be aware they will silently stall for the full pop timeout.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!


public function listRemove(string $queue, string $key): bool
{
return $this->synchronize(fn () => $this->connection->listRemove($queue, $key));
}

public function listSize(string $key): int
{
return $this->synchronize(fn () => $this->connection->listSize($key));
}

public function listRange(string $key, int $total, int $offset): array
{
return $this->synchronize(fn () => $this->connection->listRange($key, $total, $offset));
}

public function remove(string $key): bool
{
return $this->synchronize(fn () => $this->connection->remove($key));
}

public function move(string $queue, string $destination): bool
{
return $this->synchronize(fn () => $this->connection->move($queue, $destination));
}

public function set(string $key, string $value, int $ttl = 0): bool
{
return $this->synchronize(fn () => $this->connection->set($key, $value, $ttl));
}

public function get(string $key): array|string|null
{
return $this->synchronize(fn () => $this->connection->get($key));
}

public function setArray(string $key, array $value, int $ttl = 0): bool
{
return $this->synchronize(fn () => $this->connection->setArray($key, $value, $ttl));
}

public function increment(string $key): int
{
return $this->synchronize(fn () => $this->connection->increment($key));
}

public function decrement(string $key): int
{
return $this->synchronize(fn () => $this->connection->decrement($key));
}

public function ping(): bool
{
return $this->synchronize(fn () => $this->connection->ping());
}

public function close(): void
{
$this->synchronize(fn () => $this->connection->close());
}
}
Loading
Loading