Skip to content

Add Locking connection decorator for coroutine-safe access#82

Merged
loks0n merged 2 commits into
mainfrom
broker-concurrency
Jun 1, 2026
Merged

Add Locking connection decorator for coroutine-safe access#82
loks0n merged 2 commits into
mainfrom
broker-concurrency

Conversation

@loks0n
Copy link
Copy Markdown
Contributor

@loks0n loks0n commented Jun 1, 2026

What

Adds Utopia\Queue\Connection\Locking — a decorator that wraps any Connection and serializes every command behind a single lock.

$connection = new Locking(new Redis('localhost'));
// optionally inject a shared/custom lock:
$connection = new Locking($redis, $mutex);

Why

A single \Redis/\RedisCluster socket cannot be shared by concurrent coroutines — their commands interleave on the wire and corrupt the request/response pairing. The decorator ensures only one command runs on the connection at a time.

How

  • Uses Utopia\Lock\Mutex from the new utopia-php/lock dependency, which gates on actual coroutine context (Coroutine::getCid() > 0) and degrades to a plain in-process flag outside Swoole (tests, Workerman, CLI).
  • Acquires with a -1 (wait-forever) timeout so commands queue rather than fail when the connection is momentarily busy.
  • Implemented as opt-in composition (like the Pool adapter) — Redis and RedisCluster are untouched. Each wrapper method calls the inner connection directly, so there's no lock reentrancy/self-deadlock even for the composite *Array methods.

Notes

  • Blocking pops (rightPop/leftPop/rightPopLeftPush) hold the lock for their full timeout (POP_TIMEOUT = 2s), serializing other commands during the wait — inherent to sharing one socket.

Verification

  • pint --test — PSR-12 pass (30 files)
  • phpstan analyse — no errors

🤖 Generated with Claude Code

Wraps any Connection and serializes every command behind a mutex
(utopia-php/lock) so concurrent coroutines sharing one connection
cannot interleave requests and responses on the same socket.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@loks0n loks0n force-pushed the broker-concurrency branch from e1c26ea to 51fb3a5 Compare June 1, 2026 18:54
@greptile-apps
Copy link
Copy Markdown

greptile-apps Bot commented Jun 1, 2026

Greptile Summary

Introduces Utopia\Queue\Connection\Locking, a decorator that wraps any Connection and serializes all commands behind a Utopia\Lock\Mutex, preventing coroutine command interleaving on a shared socket. The utopia-php/lock package is added as a new dependency.

  • All 22 Connection interface methods are delegated to the inner connection inside synchronize(), which acquires the lock with a wait-forever (-1) timeout.
  • Tests are thorough: per-operation ordering assertions, timeout value verification, exception propagation with guaranteed lock release, default lock type check, and a reflection-based regression guard that fails if new interface methods ship without coverage.

Confidence Score: 5/5

The change is purely additive — existing adapters are untouched and the decorator is opt-in — so there is no regression surface on current callers.

The decorator correctly delegates all interface methods, the lock is acquired with a consistent wait-forever timeout, and the test suite covers ordering, argument forwarding, exception handling, and interface completeness. No logic gaps or correctness issues were found.

No files require special attention.

Important Files Changed

Filename Overview
src/Queue/Connection/Locking.php New decorator that wraps any Connection and serializes every command behind a Mutex; all 22 interface methods delegated correctly.
tests/Queue/E2E/Adapter/LockingTest.php Comprehensive unit tests covering per-method synchronization, timeout value, exception propagation with lock release, default lock type, and interface coverage regression guard.
composer.json Adds utopia-php/lock 0.2.* dependency needed by the Locking decorator.

Reviews (2): Last reviewed commit: "Add unit tests for Locking connection de..." | Re-trigger Greptile

Comment on lines +73 to +91
public function rightPop(string $queue, int $timeout): string|false
{
return $this->synchronize(fn () => $this->connection->rightPop($queue, $timeout));
}

public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false
{
return $this->synchronize(fn () => $this->connection->rightPopLeftPush($queue, $destination, $timeout));
}

public function leftPush(string $queue, string $payload): bool
{
return $this->synchronize(fn () => $this->connection->leftPush($queue, $payload));
}

public function leftPop(string $queue, int $timeout): string|false
{
return $this->synchronize(fn () => $this->connection->leftPop($queue, $timeout));
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Blocking pops starve all other operations while waiting

rightPop, leftPop, and rightPopLeftPush (and their Array variants) hold the mutex for their full blocking timeout. In practice this means a worker coroutine sitting in a blocking pop for up to 2 s will prevent any other coroutine sharing this Locking instance from executing ping(), set(), get(), or any other command during that window. This is documented in the PR description as a known limitation, but callers using the same Locking instance for both queue polling and metadata operations (e.g. periodic heartbeats or stats updates) should be aware they will silently stall for the full pop timeout.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Comment thread src/Queue/Connection/Locking.php
Spy Lock + spy Connection verify every Connection method runs its
inner call wrapped in a single acquire/release pair, forwards its
arguments, and returns the inner result. Also covers wait-forever
acquire timeout, lock release on inner exception, the default Mutex,
and a reflection guard so new Connection methods cannot ship without
synchronization coverage.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@loks0n loks0n merged commit 954cd37 into main Jun 1, 2026
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant