From 70cc425548375e91f6902ea03840d3441eaad5ee Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Mon, 1 Jun 2026 14:05:20 +0100 Subject: [PATCH 1/2] Sample queue depth via observable gauge Migrate messaging.queue.depth from a synchronous gauge recorded on the message hot path to an observable gauge sampled by the telemetry SDK at each collection interval. The depth now stays fresh even when the queue is idle or stuck, instead of only being re-recorded once per processed message, and the per-message getQueueSize() call leaves the hot path. Requires utopia-php/telemetry 0.4.* for createObservableGauge. Co-Authored-By: Claude Opus 4.8 (1M context) --- composer.json | 2 +- composer.lock | 112 ++++++++++-------- src/Queue/Server.php | 43 ++++--- .../Queue/E2E/Adapter/ServerTelemetryTest.php | 45 ++++--- 4 files changed, 112 insertions(+), 90 deletions(-) diff --git a/composer.json b/composer.json index 46fda8e..a07a1f5 100644 --- a/composer.json +++ b/composer.json @@ -31,7 +31,7 @@ "utopia-php/di": "0.3.*", "utopia-php/servers": "0.4.*", "utopia-php/pools": "1.*", - "utopia-php/telemetry": "0.2.*", + "utopia-php/telemetry": "0.4.*", "utopia-php/validators": "0.2.*" }, "require-dev": { diff --git a/composer.lock b/composer.lock index efb6a4e..0acc091 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "5d9941e9b20a90ae4e6170f6de617073", + "content-hash": "1f8c2839dc26ae8525c7adcca7838164", "packages": [ { "name": "brick/math", @@ -1549,16 +1549,16 @@ }, { "name": "symfony/deprecation-contracts", - "version": "v3.6.0", + "version": "v3.7.0", "source": { "type": "git", "url": "https://github.com/symfony/deprecation-contracts.git", - "reference": "63afe740e99a13ba87ec199bb07bbdee937a5b62" + "reference": "50f59d1f3ca46d41ac911f97a78626b6756af35b" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/deprecation-contracts/zipball/63afe740e99a13ba87ec199bb07bbdee937a5b62", - "reference": "63afe740e99a13ba87ec199bb07bbdee937a5b62", + "url": "https://api.github.com/repos/symfony/deprecation-contracts/zipball/50f59d1f3ca46d41ac911f97a78626b6756af35b", + "reference": "50f59d1f3ca46d41ac911f97a78626b6756af35b", "shasum": "" }, "require": { @@ -1571,7 +1571,7 @@ "name": "symfony/contracts" }, "branch-alias": { - "dev-main": "3.6-dev" + "dev-main": "3.7-dev" } }, "autoload": { @@ -1596,7 +1596,7 @@ "description": "A generic function and convention to trigger deprecation notices", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/deprecation-contracts/tree/v3.6.0" + "source": "https://github.com/symfony/deprecation-contracts/tree/v3.7.0" }, "funding": [ { @@ -1607,25 +1607,29 @@ "url": "https://github.com/fabpot", "type": "github" }, + { + "url": "https://github.com/nicolas-grekas", + "type": "github" + }, { "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", "type": "tidelift" } ], - "time": "2024-09-25T14:21:43+00:00" + "time": "2026-04-13T15:52:40+00:00" }, { "name": "symfony/http-client", - "version": "v7.4.9", + "version": "v7.4.13", "source": { "type": "git", "url": "https://github.com/symfony/http-client.git", - "reference": "7e941c6abf4e3bf7dca160bf0e11ef36a9f832f6" + "reference": "e8a112b8415707265a7e614278136a9d92989a6a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/http-client/zipball/7e941c6abf4e3bf7dca160bf0e11ef36a9f832f6", - "reference": "7e941c6abf4e3bf7dca160bf0e11ef36a9f832f6", + "url": "https://api.github.com/repos/symfony/http-client/zipball/e8a112b8415707265a7e614278136a9d92989a6a", + "reference": "e8a112b8415707265a7e614278136a9d92989a6a", "shasum": "" }, "require": { @@ -1693,7 +1697,7 @@ "http" ], "support": { - "source": "https://github.com/symfony/http-client/tree/v7.4.9" + "source": "https://github.com/symfony/http-client/tree/v7.4.13" }, "funding": [ { @@ -1713,20 +1717,20 @@ "type": "tidelift" } ], - "time": "2026-04-29T13:25:15+00:00" + "time": "2026-05-24T09:57:54+00:00" }, { "name": "symfony/http-client-contracts", - "version": "v3.6.0", + "version": "v3.7.0", "source": { "type": "git", "url": "https://github.com/symfony/http-client-contracts.git", - "reference": "75d7043853a42837e68111812f4d964b01e5101c" + "reference": "4a2d00c37651c0bdc2b9e1c773487a8bf4edb12d" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/http-client-contracts/zipball/75d7043853a42837e68111812f4d964b01e5101c", - "reference": "75d7043853a42837e68111812f4d964b01e5101c", + "url": "https://api.github.com/repos/symfony/http-client-contracts/zipball/4a2d00c37651c0bdc2b9e1c773487a8bf4edb12d", + "reference": "4a2d00c37651c0bdc2b9e1c773487a8bf4edb12d", "shasum": "" }, "require": { @@ -1739,7 +1743,7 @@ "name": "symfony/contracts" }, "branch-alias": { - "dev-main": "3.6-dev" + "dev-main": "3.7-dev" } }, "autoload": { @@ -1775,7 +1779,7 @@ "standards" ], "support": { - "source": "https://github.com/symfony/http-client-contracts/tree/v3.6.0" + "source": "https://github.com/symfony/http-client-contracts/tree/v3.7.0" }, "funding": [ { @@ -1786,25 +1790,29 @@ "url": "https://github.com/fabpot", "type": "github" }, + { + "url": "https://github.com/nicolas-grekas", + "type": "github" + }, { "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", "type": "tidelift" } ], - "time": "2025-04-29T11:18:49+00:00" + "time": "2026-03-06T13:17:50+00:00" }, { "name": "symfony/polyfill-mbstring", - "version": "v1.37.0", + "version": "v1.38.1", "source": { "type": "git", "url": "https://github.com/symfony/polyfill-mbstring.git", - "reference": "6a21eb99c6973357967f6ce3708cd55a6bec6315" + "reference": "14c5439eec4ccff081ac14eca2dc57feb2a66d92" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/6a21eb99c6973357967f6ce3708cd55a6bec6315", - "reference": "6a21eb99c6973357967f6ce3708cd55a6bec6315", + "url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/14c5439eec4ccff081ac14eca2dc57feb2a66d92", + "reference": "14c5439eec4ccff081ac14eca2dc57feb2a66d92", "shasum": "" }, "require": { @@ -1856,7 +1864,7 @@ "shim" ], "support": { - "source": "https://github.com/symfony/polyfill-mbstring/tree/v1.37.0" + "source": "https://github.com/symfony/polyfill-mbstring/tree/v1.38.1" }, "funding": [ { @@ -1876,20 +1884,20 @@ "type": "tidelift" } ], - "time": "2026-04-10T17:25:58+00:00" + "time": "2026-05-26T12:51:13+00:00" }, { "name": "symfony/polyfill-php82", - "version": "v1.37.0", + "version": "v1.38.1", "source": { "type": "git", "url": "https://github.com/symfony/polyfill-php82.git", - "reference": "34808efe3e68f69685796f7c253a2f1d8ea9df59" + "reference": "002dc0cfe5fd4ed6033d48f27d4f19a486c4b04b" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/polyfill-php82/zipball/34808efe3e68f69685796f7c253a2f1d8ea9df59", - "reference": "34808efe3e68f69685796f7c253a2f1d8ea9df59", + "url": "https://api.github.com/repos/symfony/polyfill-php82/zipball/002dc0cfe5fd4ed6033d48f27d4f19a486c4b04b", + "reference": "002dc0cfe5fd4ed6033d48f27d4f19a486c4b04b", "shasum": "" }, "require": { @@ -1936,7 +1944,7 @@ "shim" ], "support": { - "source": "https://github.com/symfony/polyfill-php82/tree/v1.37.0" + "source": "https://github.com/symfony/polyfill-php82/tree/v1.38.1" }, "funding": [ { @@ -1956,20 +1964,20 @@ "type": "tidelift" } ], - "time": "2026-04-10T16:19:22+00:00" + "time": "2026-05-26T12:45:58+00:00" }, { "name": "symfony/polyfill-php83", - "version": "v1.37.0", + "version": "v1.38.1", "source": { "type": "git", "url": "https://github.com/symfony/polyfill-php83.git", - "reference": "3600c2cb22399e25bb226e4a135ce91eeb2a6149" + "reference": "8339098cae28673c15cce00d80734af0453054e2" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/polyfill-php83/zipball/3600c2cb22399e25bb226e4a135ce91eeb2a6149", - "reference": "3600c2cb22399e25bb226e4a135ce91eeb2a6149", + "url": "https://api.github.com/repos/symfony/polyfill-php83/zipball/8339098cae28673c15cce00d80734af0453054e2", + "reference": "8339098cae28673c15cce00d80734af0453054e2", "shasum": "" }, "require": { @@ -2016,7 +2024,7 @@ "shim" ], "support": { - "source": "https://github.com/symfony/polyfill-php83/tree/v1.37.0" + "source": "https://github.com/symfony/polyfill-php83/tree/v1.38.1" }, "funding": [ { @@ -2036,20 +2044,20 @@ "type": "tidelift" } ], - "time": "2026-04-10T17:25:58+00:00" + "time": "2026-05-26T12:51:13+00:00" }, { "name": "symfony/service-contracts", - "version": "v3.6.1", + "version": "v3.7.0", "source": { "type": "git", "url": "https://github.com/symfony/service-contracts.git", - "reference": "45112560a3ba2d715666a509a0bc9521d10b6c43" + "reference": "d25d82433a80eba6aa0e6c24b61d7370d99e444a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/service-contracts/zipball/45112560a3ba2d715666a509a0bc9521d10b6c43", - "reference": "45112560a3ba2d715666a509a0bc9521d10b6c43", + "url": "https://api.github.com/repos/symfony/service-contracts/zipball/d25d82433a80eba6aa0e6c24b61d7370d99e444a", + "reference": "d25d82433a80eba6aa0e6c24b61d7370d99e444a", "shasum": "" }, "require": { @@ -2067,7 +2075,7 @@ "name": "symfony/contracts" }, "branch-alias": { - "dev-main": "3.6-dev" + "dev-main": "3.7-dev" } }, "autoload": { @@ -2103,7 +2111,7 @@ "standards" ], "support": { - "source": "https://github.com/symfony/service-contracts/tree/v3.6.1" + "source": "https://github.com/symfony/service-contracts/tree/v3.7.0" }, "funding": [ { @@ -2123,7 +2131,7 @@ "type": "tidelift" } ], - "time": "2025-07-15T11:30:57+00:00" + "time": "2026-03-28T09:44:51+00:00" }, { "name": "tbachert/spi", @@ -2337,16 +2345,16 @@ }, { "name": "utopia-php/telemetry", - "version": "0.2.0", + "version": "0.4.0", "source": { "type": "git", "url": "https://github.com/utopia-php/telemetry.git", - "reference": "9997ebf59bb77920a7223ad73d834a76b09152c3" + "reference": "e0630df7d8176833cd4882f78814a5b893dcb0e0" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/telemetry/zipball/9997ebf59bb77920a7223ad73d834a76b09152c3", - "reference": "9997ebf59bb77920a7223ad73d834a76b09152c3", + "url": "https://api.github.com/repos/utopia-php/telemetry/zipball/e0630df7d8176833cd4882f78814a5b893dcb0e0", + "reference": "e0630df7d8176833cd4882f78814a5b893dcb0e0", "shasum": "" }, "require": { @@ -2386,9 +2394,9 @@ ], "support": { "issues": "https://github.com/utopia-php/telemetry/issues", - "source": "https://github.com/utopia-php/telemetry/tree/0.2.0" + "source": "https://github.com/utopia-php/telemetry/tree/0.4.0" }, - "time": "2025-12-17T07:56:38+00:00" + "time": "2026-05-29T11:57:04+00:00" }, { "name": "utopia-php/validators", diff --git a/src/Queue/Server.php b/src/Queue/Server.php index c329c2b..172ecbe 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -8,8 +8,8 @@ use Utopia\Servers\Hook; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Telemetry\Adapter\None as NoTelemetry; -use Utopia\Telemetry\Gauge; use Utopia\Telemetry\Histogram; +use Utopia\Telemetry\ObservableGauge; use Utopia\Validator; class Server @@ -68,7 +68,7 @@ class Server private Histogram $jobWaitTime; private Histogram $processDuration; - private Gauge $queueDepth; + private ObservableGauge $queueDepth; /** * Creates an instance of a Queue server. @@ -161,29 +161,31 @@ public function setTelemetry(Telemetry $telemetry): void ], ); - $this->queueDepth = $telemetry->createGauge( + $this->queueDepth = $telemetry->createObservableGauge( 'messaging.queue.depth', '{message}', 'Number of pending messages in the queue.', ); - } - private function recordQueueDepth(): void - { - if (!$this->adapter->consumer instanceof Publisher) { - return; - } + // Sampled by the telemetry SDK at each collection interval rather than + // on the message hot path, so the depth stays fresh even when the queue + // is idle or stuck and isn't re-recorded once per processed message. + $this->queueDepth->observe(function (callable $observe): void { + if (!$this->adapter->consumer instanceof Publisher) { + return; + } - try { - $this->queueDepth->record( - $this->adapter->consumer->getQueueSize($this->adapter->queue), - [ - 'messaging.destination.name' => $this->adapter->queue->name, - 'messaging.destination.namespace' => $this->adapter->queue->namespace, - ], - ); - } catch (Throwable) { - } + try { + $size = $this->adapter->consumer->getQueueSize($this->adapter->queue); + } catch (Throwable) { + return; + } + + $observe($size, [ + 'messaging.destination.name' => $this->adapter->queue->name, + 'messaging.destination.namespace' => $this->adapter->queue->namespace, + ]); + }); } /** @@ -242,8 +244,6 @@ public function start(): self $hook->getAction()(...$this->getArguments($this->getContainer(), $hook)); } - $this->recordQueueDepth(); - $this->adapter->consumer->consume( $this->adapter->queue, function (Message $message) { @@ -297,7 +297,6 @@ function (Message $message) { $processDuration = microtime(true) - $receivedAtTimestamp; $this->processDuration->record($processDuration); - $this->recordQueueDepth(); } }, function (Message $message) { diff --git a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php index 0cb27ae..9a33547 100644 --- a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php +++ b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php @@ -28,11 +28,11 @@ public function testRecordsQueueDepth(): void $server->start(); - $this->assertArrayHasKey('messaging.queue.depth', $telemetry->gauges); - /** @var object{values: array} $queueDepth */ - $queueDepth = $telemetry->gauges['messaging.queue.depth']; - $this->assertObjectHasProperty('values', $queueDepth); - $this->assertSame([3, 2], $queueDepth->values); + $this->assertArrayHasKey('messaging.queue.depth', $telemetry->observableGauges); + // Each collection samples the live queue size, so two collections + // observe the two successive depths reported by the consumer. + $this->assertSame([3], $this->collectObservations($telemetry, 'messaging.queue.depth')); + $this->assertSame([2], $this->collectObservations($telemetry, 'messaging.queue.depth')); } public function testSkipsQueueDepthWhenConsumerCannotReportSize(): void @@ -50,11 +50,8 @@ public function testSkipsQueueDepthWhenConsumerCannotReportSize(): void $server->start(); - $this->assertArrayHasKey('messaging.queue.depth', $telemetry->gauges); - /** @var object{values: array} $queueDepth */ - $queueDepth = $telemetry->gauges['messaging.queue.depth']; - $this->assertObjectHasProperty('values', $queueDepth); - $this->assertSame([], $queueDepth->values); + $this->assertArrayHasKey('messaging.queue.depth', $telemetry->observableGauges); + $this->assertSame([], $this->collectObservations($telemetry, 'messaging.queue.depth')); } public function testSkipsQueueDepthWhenConsumerCannotReadSize(): void @@ -72,13 +69,31 @@ public function testSkipsQueueDepthWhenConsumerCannotReadSize(): void $server->start(); - $this->assertArrayHasKey('messaging.queue.depth', $telemetry->gauges); - /** @var object{values: array} $queueDepth */ - $queueDepth = $telemetry->gauges['messaging.queue.depth']; - $this->assertObjectHasProperty('values', $queueDepth); - $this->assertSame([], $queueDepth->values); + $this->assertArrayHasKey('messaging.queue.depth', $telemetry->observableGauges); + $this->assertSame([], $this->collectObservations($telemetry, 'messaging.queue.depth')); $this->assertArrayNotHasKey('messaging.queue.depth.errors', $telemetry->counters); } + + /** + * Invoke an observable gauge's registered callback once and capture the + * values it reports, mirroring a single telemetry collection cycle. + * + * @return array + */ + private function collectObservations(TestTelemetry $telemetry, string $name): array + { + /** @var object{callback: ?\Closure} $gauge */ + $gauge = $telemetry->observableGauges[$name]; + + $values = []; + if ($gauge->callback !== null) { + ($gauge->callback)(function (float|int $value, iterable $attributes = []) use (&$values): void { + $values[] = $value; + }); + } + + return $values; + } } final class ServerTelemetryAdapter extends Adapter From b55ebf0edb692965193929cdca17615778667355 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Mon, 1 Jun 2026 14:06:43 +0100 Subject: [PATCH 2/2] Drop explanatory comments Co-Authored-By: Claude Opus 4.8 (1M context) --- src/Queue/Server.php | 3 --- tests/Queue/E2E/Adapter/ServerTelemetryTest.php | 5 ----- 2 files changed, 8 deletions(-) diff --git a/src/Queue/Server.php b/src/Queue/Server.php index 172ecbe..f822b6f 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -167,9 +167,6 @@ public function setTelemetry(Telemetry $telemetry): void 'Number of pending messages in the queue.', ); - // Sampled by the telemetry SDK at each collection interval rather than - // on the message hot path, so the depth stays fresh even when the queue - // is idle or stuck and isn't re-recorded once per processed message. $this->queueDepth->observe(function (callable $observe): void { if (!$this->adapter->consumer instanceof Publisher) { return; diff --git a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php index 9a33547..a4d1109 100644 --- a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php +++ b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php @@ -29,8 +29,6 @@ public function testRecordsQueueDepth(): void $server->start(); $this->assertArrayHasKey('messaging.queue.depth', $telemetry->observableGauges); - // Each collection samples the live queue size, so two collections - // observe the two successive depths reported by the consumer. $this->assertSame([3], $this->collectObservations($telemetry, 'messaging.queue.depth')); $this->assertSame([2], $this->collectObservations($telemetry, 'messaging.queue.depth')); } @@ -75,9 +73,6 @@ public function testSkipsQueueDepthWhenConsumerCannotReadSize(): void } /** - * Invoke an observable gauge's registered callback once and capture the - * values it reports, mirroring a single telemetry collection cycle. - * * @return array */ private function collectObservations(TestTelemetry $telemetry, string $name): array