diff --git a/composer.json b/composer.json index 46fda8e..a07a1f5 100644 --- a/composer.json +++ b/composer.json @@ -31,7 +31,7 @@ "utopia-php/di": "0.3.*", "utopia-php/servers": "0.4.*", "utopia-php/pools": "1.*", - "utopia-php/telemetry": "0.2.*", + "utopia-php/telemetry": "0.4.*", "utopia-php/validators": "0.2.*" }, "require-dev": { diff --git a/composer.lock b/composer.lock index efb6a4e..0acc091 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "5d9941e9b20a90ae4e6170f6de617073", + "content-hash": "1f8c2839dc26ae8525c7adcca7838164", "packages": [ { "name": "brick/math", @@ -1549,16 +1549,16 @@ }, { "name": "symfony/deprecation-contracts", - "version": "v3.6.0", + "version": "v3.7.0", "source": { "type": "git", "url": "https://github.com/symfony/deprecation-contracts.git", - "reference": "63afe740e99a13ba87ec199bb07bbdee937a5b62" + "reference": "50f59d1f3ca46d41ac911f97a78626b6756af35b" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/deprecation-contracts/zipball/63afe740e99a13ba87ec199bb07bbdee937a5b62", - "reference": "63afe740e99a13ba87ec199bb07bbdee937a5b62", + "url": "https://api.github.com/repos/symfony/deprecation-contracts/zipball/50f59d1f3ca46d41ac911f97a78626b6756af35b", + "reference": "50f59d1f3ca46d41ac911f97a78626b6756af35b", "shasum": "" }, "require": { @@ -1571,7 +1571,7 @@ "name": "symfony/contracts" }, "branch-alias": { - "dev-main": "3.6-dev" + "dev-main": "3.7-dev" } }, "autoload": { @@ -1596,7 +1596,7 @@ "description": "A generic function and convention to trigger deprecation notices", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/deprecation-contracts/tree/v3.6.0" + "source": "https://github.com/symfony/deprecation-contracts/tree/v3.7.0" }, "funding": [ { @@ -1607,25 +1607,29 @@ "url": "https://github.com/fabpot", "type": "github" }, + { + "url": "https://github.com/nicolas-grekas", + "type": "github" + }, { "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", "type": "tidelift" } ], - "time": "2024-09-25T14:21:43+00:00" + "time": "2026-04-13T15:52:40+00:00" }, { "name": "symfony/http-client", - "version": "v7.4.9", + "version": "v7.4.13", "source": { "type": "git", "url": "https://github.com/symfony/http-client.git", - "reference": "7e941c6abf4e3bf7dca160bf0e11ef36a9f832f6" + "reference": "e8a112b8415707265a7e614278136a9d92989a6a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/http-client/zipball/7e941c6abf4e3bf7dca160bf0e11ef36a9f832f6", - "reference": "7e941c6abf4e3bf7dca160bf0e11ef36a9f832f6", + "url": "https://api.github.com/repos/symfony/http-client/zipball/e8a112b8415707265a7e614278136a9d92989a6a", + "reference": "e8a112b8415707265a7e614278136a9d92989a6a", "shasum": "" }, "require": { @@ -1693,7 +1697,7 @@ "http" ], "support": { - "source": "https://github.com/symfony/http-client/tree/v7.4.9" + "source": "https://github.com/symfony/http-client/tree/v7.4.13" }, "funding": [ { @@ -1713,20 +1717,20 @@ "type": "tidelift" } ], - "time": "2026-04-29T13:25:15+00:00" + "time": "2026-05-24T09:57:54+00:00" }, { "name": "symfony/http-client-contracts", - "version": "v3.6.0", + "version": "v3.7.0", "source": { "type": "git", "url": "https://github.com/symfony/http-client-contracts.git", - "reference": "75d7043853a42837e68111812f4d964b01e5101c" + "reference": "4a2d00c37651c0bdc2b9e1c773487a8bf4edb12d" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/http-client-contracts/zipball/75d7043853a42837e68111812f4d964b01e5101c", - "reference": "75d7043853a42837e68111812f4d964b01e5101c", + "url": "https://api.github.com/repos/symfony/http-client-contracts/zipball/4a2d00c37651c0bdc2b9e1c773487a8bf4edb12d", + "reference": "4a2d00c37651c0bdc2b9e1c773487a8bf4edb12d", "shasum": "" }, "require": { @@ -1739,7 +1743,7 @@ "name": "symfony/contracts" }, "branch-alias": { - "dev-main": "3.6-dev" + "dev-main": "3.7-dev" } }, "autoload": { @@ -1775,7 +1779,7 @@ "standards" ], "support": { - "source": "https://github.com/symfony/http-client-contracts/tree/v3.6.0" + "source": "https://github.com/symfony/http-client-contracts/tree/v3.7.0" }, "funding": [ { @@ -1786,25 +1790,29 @@ "url": "https://github.com/fabpot", "type": "github" }, + { + "url": "https://github.com/nicolas-grekas", + "type": "github" + }, { "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", "type": "tidelift" } ], - "time": "2025-04-29T11:18:49+00:00" + "time": "2026-03-06T13:17:50+00:00" }, { "name": "symfony/polyfill-mbstring", - "version": "v1.37.0", + "version": "v1.38.1", "source": { "type": "git", "url": "https://github.com/symfony/polyfill-mbstring.git", - "reference": "6a21eb99c6973357967f6ce3708cd55a6bec6315" + "reference": "14c5439eec4ccff081ac14eca2dc57feb2a66d92" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/6a21eb99c6973357967f6ce3708cd55a6bec6315", - "reference": "6a21eb99c6973357967f6ce3708cd55a6bec6315", + "url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/14c5439eec4ccff081ac14eca2dc57feb2a66d92", + "reference": "14c5439eec4ccff081ac14eca2dc57feb2a66d92", "shasum": "" }, "require": { @@ -1856,7 +1864,7 @@ "shim" ], "support": { - "source": "https://github.com/symfony/polyfill-mbstring/tree/v1.37.0" + "source": "https://github.com/symfony/polyfill-mbstring/tree/v1.38.1" }, "funding": [ { @@ -1876,20 +1884,20 @@ "type": "tidelift" } ], - "time": "2026-04-10T17:25:58+00:00" + "time": "2026-05-26T12:51:13+00:00" }, { "name": "symfony/polyfill-php82", - "version": "v1.37.0", + "version": "v1.38.1", "source": { "type": "git", "url": "https://github.com/symfony/polyfill-php82.git", - "reference": "34808efe3e68f69685796f7c253a2f1d8ea9df59" + "reference": "002dc0cfe5fd4ed6033d48f27d4f19a486c4b04b" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/polyfill-php82/zipball/34808efe3e68f69685796f7c253a2f1d8ea9df59", - "reference": "34808efe3e68f69685796f7c253a2f1d8ea9df59", + "url": "https://api.github.com/repos/symfony/polyfill-php82/zipball/002dc0cfe5fd4ed6033d48f27d4f19a486c4b04b", + "reference": "002dc0cfe5fd4ed6033d48f27d4f19a486c4b04b", "shasum": "" }, "require": { @@ -1936,7 +1944,7 @@ "shim" ], "support": { - "source": "https://github.com/symfony/polyfill-php82/tree/v1.37.0" + "source": "https://github.com/symfony/polyfill-php82/tree/v1.38.1" }, "funding": [ { @@ -1956,20 +1964,20 @@ "type": "tidelift" } ], - "time": "2026-04-10T16:19:22+00:00" + "time": "2026-05-26T12:45:58+00:00" }, { "name": "symfony/polyfill-php83", - "version": "v1.37.0", + "version": "v1.38.1", "source": { "type": "git", "url": "https://github.com/symfony/polyfill-php83.git", - "reference": "3600c2cb22399e25bb226e4a135ce91eeb2a6149" + "reference": "8339098cae28673c15cce00d80734af0453054e2" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/polyfill-php83/zipball/3600c2cb22399e25bb226e4a135ce91eeb2a6149", - "reference": "3600c2cb22399e25bb226e4a135ce91eeb2a6149", + "url": "https://api.github.com/repos/symfony/polyfill-php83/zipball/8339098cae28673c15cce00d80734af0453054e2", + "reference": "8339098cae28673c15cce00d80734af0453054e2", "shasum": "" }, "require": { @@ -2016,7 +2024,7 @@ "shim" ], "support": { - "source": "https://github.com/symfony/polyfill-php83/tree/v1.37.0" + "source": "https://github.com/symfony/polyfill-php83/tree/v1.38.1" }, "funding": [ { @@ -2036,20 +2044,20 @@ "type": "tidelift" } ], - "time": "2026-04-10T17:25:58+00:00" + "time": "2026-05-26T12:51:13+00:00" }, { "name": "symfony/service-contracts", - "version": "v3.6.1", + "version": "v3.7.0", "source": { "type": "git", "url": "https://github.com/symfony/service-contracts.git", - "reference": "45112560a3ba2d715666a509a0bc9521d10b6c43" + "reference": "d25d82433a80eba6aa0e6c24b61d7370d99e444a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/service-contracts/zipball/45112560a3ba2d715666a509a0bc9521d10b6c43", - "reference": "45112560a3ba2d715666a509a0bc9521d10b6c43", + "url": "https://api.github.com/repos/symfony/service-contracts/zipball/d25d82433a80eba6aa0e6c24b61d7370d99e444a", + "reference": "d25d82433a80eba6aa0e6c24b61d7370d99e444a", "shasum": "" }, "require": { @@ -2067,7 +2075,7 @@ "name": "symfony/contracts" }, "branch-alias": { - "dev-main": "3.6-dev" + "dev-main": "3.7-dev" } }, "autoload": { @@ -2103,7 +2111,7 @@ "standards" ], "support": { - "source": "https://github.com/symfony/service-contracts/tree/v3.6.1" + "source": "https://github.com/symfony/service-contracts/tree/v3.7.0" }, "funding": [ { @@ -2123,7 +2131,7 @@ "type": "tidelift" } ], - "time": "2025-07-15T11:30:57+00:00" + "time": "2026-03-28T09:44:51+00:00" }, { "name": "tbachert/spi", @@ -2337,16 +2345,16 @@ }, { "name": "utopia-php/telemetry", - "version": "0.2.0", + "version": "0.4.0", "source": { "type": "git", "url": "https://github.com/utopia-php/telemetry.git", - "reference": "9997ebf59bb77920a7223ad73d834a76b09152c3" + "reference": "e0630df7d8176833cd4882f78814a5b893dcb0e0" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/telemetry/zipball/9997ebf59bb77920a7223ad73d834a76b09152c3", - "reference": "9997ebf59bb77920a7223ad73d834a76b09152c3", + "url": "https://api.github.com/repos/utopia-php/telemetry/zipball/e0630df7d8176833cd4882f78814a5b893dcb0e0", + "reference": "e0630df7d8176833cd4882f78814a5b893dcb0e0", "shasum": "" }, "require": { @@ -2386,9 +2394,9 @@ ], "support": { "issues": "https://github.com/utopia-php/telemetry/issues", - "source": "https://github.com/utopia-php/telemetry/tree/0.2.0" + "source": "https://github.com/utopia-php/telemetry/tree/0.4.0" }, - "time": "2025-12-17T07:56:38+00:00" + "time": "2026-05-29T11:57:04+00:00" }, { "name": "utopia-php/validators", diff --git a/src/Queue/Server.php b/src/Queue/Server.php index c329c2b..f822b6f 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -8,8 +8,8 @@ use Utopia\Servers\Hook; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Telemetry\Adapter\None as NoTelemetry; -use Utopia\Telemetry\Gauge; use Utopia\Telemetry\Histogram; +use Utopia\Telemetry\ObservableGauge; use Utopia\Validator; class Server @@ -68,7 +68,7 @@ class Server private Histogram $jobWaitTime; private Histogram $processDuration; - private Gauge $queueDepth; + private ObservableGauge $queueDepth; /** * Creates an instance of a Queue server. @@ -161,29 +161,28 @@ public function setTelemetry(Telemetry $telemetry): void ], ); - $this->queueDepth = $telemetry->createGauge( + $this->queueDepth = $telemetry->createObservableGauge( 'messaging.queue.depth', '{message}', 'Number of pending messages in the queue.', ); - } - private function recordQueueDepth(): void - { - if (!$this->adapter->consumer instanceof Publisher) { - return; - } + $this->queueDepth->observe(function (callable $observe): void { + if (!$this->adapter->consumer instanceof Publisher) { + return; + } - try { - $this->queueDepth->record( - $this->adapter->consumer->getQueueSize($this->adapter->queue), - [ - 'messaging.destination.name' => $this->adapter->queue->name, - 'messaging.destination.namespace' => $this->adapter->queue->namespace, - ], - ); - } catch (Throwable) { - } + try { + $size = $this->adapter->consumer->getQueueSize($this->adapter->queue); + } catch (Throwable) { + return; + } + + $observe($size, [ + 'messaging.destination.name' => $this->adapter->queue->name, + 'messaging.destination.namespace' => $this->adapter->queue->namespace, + ]); + }); } /** @@ -242,8 +241,6 @@ public function start(): self $hook->getAction()(...$this->getArguments($this->getContainer(), $hook)); } - $this->recordQueueDepth(); - $this->adapter->consumer->consume( $this->adapter->queue, function (Message $message) { @@ -297,7 +294,6 @@ function (Message $message) { $processDuration = microtime(true) - $receivedAtTimestamp; $this->processDuration->record($processDuration); - $this->recordQueueDepth(); } }, function (Message $message) { diff --git a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php index 0cb27ae..a4d1109 100644 --- a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php +++ b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php @@ -28,11 +28,9 @@ public function testRecordsQueueDepth(): void $server->start(); - $this->assertArrayHasKey('messaging.queue.depth', $telemetry->gauges); - /** @var object{values: array} $queueDepth */ - $queueDepth = $telemetry->gauges['messaging.queue.depth']; - $this->assertObjectHasProperty('values', $queueDepth); - $this->assertSame([3, 2], $queueDepth->values); + $this->assertArrayHasKey('messaging.queue.depth', $telemetry->observableGauges); + $this->assertSame([3], $this->collectObservations($telemetry, 'messaging.queue.depth')); + $this->assertSame([2], $this->collectObservations($telemetry, 'messaging.queue.depth')); } public function testSkipsQueueDepthWhenConsumerCannotReportSize(): void @@ -50,11 +48,8 @@ public function testSkipsQueueDepthWhenConsumerCannotReportSize(): void $server->start(); - $this->assertArrayHasKey('messaging.queue.depth', $telemetry->gauges); - /** @var object{values: array} $queueDepth */ - $queueDepth = $telemetry->gauges['messaging.queue.depth']; - $this->assertObjectHasProperty('values', $queueDepth); - $this->assertSame([], $queueDepth->values); + $this->assertArrayHasKey('messaging.queue.depth', $telemetry->observableGauges); + $this->assertSame([], $this->collectObservations($telemetry, 'messaging.queue.depth')); } public function testSkipsQueueDepthWhenConsumerCannotReadSize(): void @@ -72,13 +67,28 @@ public function testSkipsQueueDepthWhenConsumerCannotReadSize(): void $server->start(); - $this->assertArrayHasKey('messaging.queue.depth', $telemetry->gauges); - /** @var object{values: array} $queueDepth */ - $queueDepth = $telemetry->gauges['messaging.queue.depth']; - $this->assertObjectHasProperty('values', $queueDepth); - $this->assertSame([], $queueDepth->values); + $this->assertArrayHasKey('messaging.queue.depth', $telemetry->observableGauges); + $this->assertSame([], $this->collectObservations($telemetry, 'messaging.queue.depth')); $this->assertArrayNotHasKey('messaging.queue.depth.errors', $telemetry->counters); } + + /** + * @return array + */ + private function collectObservations(TestTelemetry $telemetry, string $name): array + { + /** @var object{callback: ?\Closure} $gauge */ + $gauge = $telemetry->observableGauges[$name]; + + $values = []; + if ($gauge->callback !== null) { + ($gauge->callback)(function (float|int $value, iterable $attributes = []) use (&$values): void { + $values[] = $value; + }); + } + + return $values; + } } final class ServerTelemetryAdapter extends Adapter