From 83de132088231b2f7fd335182b950179dbf0b5a6 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 4 Jul 2025 16:38:03 -0400 Subject: [PATCH 01/17] Add optional publish acknowledgement --- .github/workflows/tests.yml | 2 + composer.lock | 113 ++++++++++++------------ src/Queue/Broker/AMQP.php | 78 ++++++++++++++-- src/Queue/Broker/AMQPSwoole.php | 20 ++--- tests/Queue/E2E/Adapter/AMQPAckTest.php | 21 +++++ 5 files changed, 159 insertions(+), 75 deletions(-) create mode 100644 tests/Queue/E2E/Adapter/AMQPAckTest.php diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ed5bb80..f870f0a 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -16,6 +16,8 @@ jobs: adapter: [ AMQP, + AMQPAck + AMQPSwoole Pool, SwooleRedisCluster, Swoole, diff --git a/composer.lock b/composer.lock index b0efdc6..fc63597 100644 --- a/composer.lock +++ b/composer.lock @@ -430,16 +430,16 @@ }, { "name": "open-telemetry/api", - "version": "1.3.0", + "version": "1.4.0", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/api.git", - "reference": "4e3bb38e069876fb73c2ce85c89583bf2b28cd86" + "reference": "b3a9286f9c1c8247c83493c5b1fa475cd0cec7f7" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/api/zipball/4e3bb38e069876fb73c2ce85c89583bf2b28cd86", - "reference": "4e3bb38e069876fb73c2ce85c89583bf2b28cd86", + "url": "https://api.github.com/repos/opentelemetry-php/api/zipball/b3a9286f9c1c8247c83493c5b1fa475cd0cec7f7", + "reference": "b3a9286f9c1c8247c83493c5b1fa475cd0cec7f7", "shasum": "" }, "require": { @@ -459,7 +459,7 @@ ] }, "branch-alias": { - "dev-main": "1.1.x-dev" + "dev-main": "1.4.x-dev" } }, "autoload": { @@ -496,7 +496,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-05-07T12:32:21+00:00" + "time": "2025-06-19T23:36:51+00:00" }, { "name": "open-telemetry/context", @@ -559,16 +559,16 @@ }, { "name": "open-telemetry/exporter-otlp", - "version": "1.3.1", + "version": "1.3.2", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/exporter-otlp.git", - "reference": "8b3ca1f86d01429c73b407bf1a2075d9c187001e" + "reference": "196f3a1dbce3b2c0f8110d164232c11ac00ddbb2" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/exporter-otlp/zipball/8b3ca1f86d01429c73b407bf1a2075d9c187001e", - "reference": "8b3ca1f86d01429c73b407bf1a2075d9c187001e", + "url": "https://api.github.com/repos/opentelemetry-php/exporter-otlp/zipball/196f3a1dbce3b2c0f8110d164232c11ac00ddbb2", + "reference": "196f3a1dbce3b2c0f8110d164232c11ac00ddbb2", "shasum": "" }, "require": { @@ -619,7 +619,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-05-21T12:02:20+00:00" + "time": "2025-06-16T00:24:51+00:00" }, { "name": "open-telemetry/gen-otlp-protobuf", @@ -686,22 +686,22 @@ }, { "name": "open-telemetry/sdk", - "version": "1.5.0", + "version": "1.6.0", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/sdk.git", - "reference": "cd0d7367599717fc29e04eb8838ec061e6c2c657" + "reference": "1c0371794e4c0700afd4a9d4d8511cb5e3f78e6a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/cd0d7367599717fc29e04eb8838ec061e6c2c657", - "reference": "cd0d7367599717fc29e04eb8838ec061e6c2c657", + "url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/1c0371794e4c0700afd4a9d4d8511cb5e3f78e6a", + "reference": "1c0371794e4c0700afd4a9d4d8511cb5e3f78e6a", "shasum": "" }, "require": { "ext-json": "*", "nyholm/psr7-server": "^1.1", - "open-telemetry/api": "~1.0 || ~1.1", + "open-telemetry/api": "~1.4.0", "open-telemetry/context": "^1.0", "open-telemetry/sem-conv": "^1.0", "php": "^8.1", @@ -724,6 +724,10 @@ "type": "library", "extra": { "spi": { + "OpenTelemetry\\API\\Configuration\\ConfigEnv\\EnvComponentLoader": [ + "OpenTelemetry\\API\\Instrumentation\\Configuration\\General\\ConfigEnv\\EnvComponentLoaderHttpConfig", + "OpenTelemetry\\API\\Instrumentation\\Configuration\\General\\ConfigEnv\\EnvComponentLoaderPeerConfig" + ], "OpenTelemetry\\API\\Instrumentation\\AutoInstrumentation\\HookManagerInterface": [ "OpenTelemetry\\API\\Instrumentation\\AutoInstrumentation\\ExtensionHookManager" ] @@ -772,20 +776,20 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-05-22T02:33:34+00:00" + "time": "2025-06-19T23:36:51+00:00" }, { "name": "open-telemetry/sem-conv", - "version": "1.32.0", + "version": "1.32.1", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/sem-conv.git", - "reference": "16585cc0dbc3032a318e274043454679430d2ebf" + "reference": "94daa85ea61a8e2b7e1b0af6be0e875bedda7c22" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/sem-conv/zipball/16585cc0dbc3032a318e274043454679430d2ebf", - "reference": "16585cc0dbc3032a318e274043454679430d2ebf", + "url": "https://api.github.com/repos/opentelemetry-php/sem-conv/zipball/94daa85ea61a8e2b7e1b0af6be0e875bedda7c22", + "reference": "94daa85ea61a8e2b7e1b0af6be0e875bedda7c22", "shasum": "" }, "require": { @@ -829,7 +833,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-05-05T03:58:53+00:00" + "time": "2025-06-24T02:32:27+00:00" }, { "name": "paragonie/constant_time_encoding", @@ -1029,16 +1033,16 @@ }, { "name": "phpseclib/phpseclib", - "version": "3.0.45", + "version": "3.0.46", "source": { "type": "git", "url": "https://github.com/phpseclib/phpseclib.git", - "reference": "bd81b90d5963c6b9d87de50357585375223f4dd8" + "reference": "56483a7de62a6c2a6635e42e93b8a9e25d4f0ec6" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpseclib/phpseclib/zipball/bd81b90d5963c6b9d87de50357585375223f4dd8", - "reference": "bd81b90d5963c6b9d87de50357585375223f4dd8", + "url": "https://api.github.com/repos/phpseclib/phpseclib/zipball/56483a7de62a6c2a6635e42e93b8a9e25d4f0ec6", + "reference": "56483a7de62a6c2a6635e42e93b8a9e25d4f0ec6", "shasum": "" }, "require": { @@ -1119,7 +1123,7 @@ ], "support": { "issues": "https://github.com/phpseclib/phpseclib/issues", - "source": "https://github.com/phpseclib/phpseclib/tree/3.0.45" + "source": "https://github.com/phpseclib/phpseclib/tree/3.0.46" }, "funding": [ { @@ -1135,7 +1139,7 @@ "type": "tidelift" } ], - "time": "2025-06-22T22:54:43+00:00" + "time": "2025-06-26T16:29:55+00:00" }, { "name": "psr/container", @@ -1478,21 +1482,20 @@ }, { "name": "ramsey/uuid", - "version": "4.8.1", + "version": "4.9.0", "source": { "type": "git", "url": "https://github.com/ramsey/uuid.git", - "reference": "fdf4dd4e2ff1813111bd0ad58d7a1ddbb5b56c28" + "reference": "4e0e23cc785f0724a0e838279a9eb03f28b092a0" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/ramsey/uuid/zipball/fdf4dd4e2ff1813111bd0ad58d7a1ddbb5b56c28", - "reference": "fdf4dd4e2ff1813111bd0ad58d7a1ddbb5b56c28", + "url": "https://api.github.com/repos/ramsey/uuid/zipball/4e0e23cc785f0724a0e838279a9eb03f28b092a0", + "reference": "4e0e23cc785f0724a0e838279a9eb03f28b092a0", "shasum": "" }, "require": { "brick/math": "^0.8.8 || ^0.9 || ^0.10 || ^0.11 || ^0.12 || ^0.13", - "ext-json": "*", "php": "^8.0", "ramsey/collection": "^1.2 || ^2.0" }, @@ -1551,9 +1554,9 @@ ], "support": { "issues": "https://github.com/ramsey/uuid/issues", - "source": "https://github.com/ramsey/uuid/tree/4.8.1" + "source": "https://github.com/ramsey/uuid/tree/4.9.0" }, - "time": "2025-06-01T06:28:46+00:00" + "time": "2025-06-25T14:20:11+00:00" }, { "name": "symfony/deprecation-contracts", @@ -1624,16 +1627,16 @@ }, { "name": "symfony/http-client", - "version": "v7.3.0", + "version": "v7.3.1", "source": { "type": "git", "url": "https://github.com/symfony/http-client.git", - "reference": "57e4fb86314015a695a750ace358d07a7e37b8a9" + "reference": "4403d87a2c16f33345dca93407a8714ee8c05a64" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/http-client/zipball/57e4fb86314015a695a750ace358d07a7e37b8a9", - "reference": "57e4fb86314015a695a750ace358d07a7e37b8a9", + "url": "https://api.github.com/repos/symfony/http-client/zipball/4403d87a2c16f33345dca93407a8714ee8c05a64", + "reference": "4403d87a2c16f33345dca93407a8714ee8c05a64", "shasum": "" }, "require": { @@ -1645,6 +1648,7 @@ }, "conflict": { "amphp/amp": "<2.5", + "amphp/socket": "<1.1", "php-http/discovery": "<1.15", "symfony/http-foundation": "<6.4" }, @@ -1657,7 +1661,6 @@ "require-dev": { "amphp/http-client": "^4.2.1|^5.0", "amphp/http-tunnel": "^1.0|^2.0", - "amphp/socket": "^1.1", "guzzlehttp/promises": "^1.4|^2.0", "nyholm/psr7": "^1.0", "php-http/httplug": "^1.0|^2.0", @@ -1699,7 +1702,7 @@ "http" ], "support": { - "source": "https://github.com/symfony/http-client/tree/v7.3.0" + "source": "https://github.com/symfony/http-client/tree/v7.3.1" }, "funding": [ { @@ -1715,7 +1718,7 @@ "type": "tidelift" } ], - "time": "2025-05-02T08:23:16+00:00" + "time": "2025-06-28T07:58:39+00:00" }, { "name": "symfony/http-client-contracts", @@ -2037,16 +2040,16 @@ }, { "name": "tbachert/spi", - "version": "v1.0.3", + "version": "v1.0.5", "source": { "type": "git", "url": "https://github.com/Nevay/spi.git", - "reference": "506a79c98e1a51522e76ee921ccb6c62d52faf3a" + "reference": "e7078767866d0a9e0f91d3f9d42a832df5e39002" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/Nevay/spi/zipball/506a79c98e1a51522e76ee921ccb6c62d52faf3a", - "reference": "506a79c98e1a51522e76ee921ccb6c62d52faf3a", + "url": "https://api.github.com/repos/Nevay/spi/zipball/e7078767866d0a9e0f91d3f9d42a832df5e39002", + "reference": "e7078767866d0a9e0f91d3f9d42a832df5e39002", "shasum": "" }, "require": { @@ -2064,7 +2067,7 @@ "extra": { "class": "Nevay\\SPI\\Composer\\Plugin", "branch-alias": { - "dev-main": "0.2.x-dev" + "dev-main": "1.0.x-dev" }, "plugin-optional": true }, @@ -2083,9 +2086,9 @@ ], "support": { "issues": "https://github.com/Nevay/spi/issues", - "source": "https://github.com/Nevay/spi/tree/v1.0.3" + "source": "https://github.com/Nevay/spi/tree/v1.0.5" }, - "time": "2025-04-02T19:38:14+00:00" + "time": "2025-06-29T15:42:06+00:00" }, { "name": "utopia-php/cli", @@ -2510,16 +2513,16 @@ }, { "name": "myclabs/deep-copy", - "version": "1.13.1", + "version": "1.13.2", "source": { "type": "git", "url": "https://github.com/myclabs/DeepCopy.git", - "reference": "1720ddd719e16cf0db4eb1c6eca108031636d46c" + "reference": "d25e62e636b0a9b01e3bdebb7823b474876dd829" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/myclabs/DeepCopy/zipball/1720ddd719e16cf0db4eb1c6eca108031636d46c", - "reference": "1720ddd719e16cf0db4eb1c6eca108031636d46c", + "url": "https://api.github.com/repos/myclabs/DeepCopy/zipball/d25e62e636b0a9b01e3bdebb7823b474876dd829", + "reference": "d25e62e636b0a9b01e3bdebb7823b474876dd829", "shasum": "" }, "require": { @@ -2558,7 +2561,7 @@ ], "support": { "issues": "https://github.com/myclabs/DeepCopy/issues", - "source": "https://github.com/myclabs/DeepCopy/tree/1.13.1" + "source": "https://github.com/myclabs/DeepCopy/tree/1.13.2" }, "funding": [ { @@ -2566,7 +2569,7 @@ "type": "tidelift" } ], - "time": "2025-04-29T12:36:36+00:00" + "time": "2025-07-04T14:07:32+00:00" }, { "name": "nikic/php-parser", diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index f547038..ca5cb0a 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -5,9 +5,13 @@ use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; use PhpAmqpLib\Connection\AMQPStreamConnection; +use PhpAmqpLib\Exception\AMQPChannelClosedException; +use PhpAmqpLib\Exception\AMQPConnectionBlockedException; +use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; +use PhpAmqpLib\Exception\AMQPTimeoutException; use Utopia\Fetch\Client; use Utopia\Queue\Consumer; use Utopia\Queue\Error\Retryable; @@ -20,6 +24,7 @@ class AMQP implements Publisher, Consumer { protected ?AMQPChannel $channel = null; + private array $exchangeArguments = []; private array $queueArguments = []; private array $consumerArguments = []; @@ -43,10 +48,25 @@ public function __construct( protected readonly string $vhost = '/', protected readonly int $heartbeat = 0, protected readonly float $connectTimeout = 3.0, - protected readonly float $readWriteTimeout = 3.0 + protected readonly float $readWriteTimeout = 3.0, + protected float $ackTimeout = 5.0, + protected bool $requireAck = true, ) { } + /** + * Enable or disable waiting for publisher confirms. + */ + public function setRequireAck(bool $require): void + { + $this->requireAck = $require; + } + + public function setAckTimeout(float $timeout): void + { + $this->ackTimeout = $timeout; + } + public function setExchangeArgument(string $key, string $value): void { $this->exchangeArguments[$key] = $value; @@ -134,6 +154,9 @@ public function close(): void $this->channel?->getConnection()?->close(); } + /** + * @throws \Exception + */ public function enqueue(Queue $queue, array $payload): bool { $payload = [ @@ -142,10 +165,44 @@ public function enqueue(Queue $queue, array $payload): bool 'timestamp' => time(), 'payload' => $payload ]; - $message = new AMQPMessage(json_encode($payload), ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); + + $message = new AMQPMessage( + \json_encode($payload), + [ + 'content_type' => 'application/json', + 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, + ] + ); + $this->withChannel(function (AMQPChannel $channel) use ($message, $queue) { - $channel->basic_publish($message, $queue->namespace, routing_key: $queue->name); + try { + $channel->basic_publish( + $message, + exchange: $queue->namespace, + routing_key: $queue->name, + mandatory: $this->requireAck + ); + + if (!$this->requireAck) { + // No need to wait for ack if not required + return; + } + + $channel->wait_for_pending_acks($this->ackTimeout); + } catch ( + AMQPTimeoutException | + AMQPConnectionClosedException | + AMQPChannelClosedException | + AMQPConnectionBlockedException $e + ) { + // Retry sending the message if ack is not received or connection has issues + continue; + } + + // Exit the loop if ack is received + break; }); + return true; } @@ -198,12 +255,19 @@ protected function withChannel(callable $callback): void read_write_timeout: $this->readWriteTimeout, heartbeat: $this->heartbeat, ); - if (is_callable($this->connectionConfigHook)) { - call_user_func($this->connectionConfigHook, $connection); + if (\is_callable($this->connectionConfigHook)) { + ($this->connectionConfigHook)($connection); } + $channel = $connection->channel(); - if (is_callable($this->channelConfigHook)) { - call_user_func($this->channelConfigHook, $channel); + + if (\is_callable($this->channelConfigHook)) { + ($this->channelConfigHook)($channel); + } + + // Enable publisher confirms if required + if ($this->requireAck) { + $channel->confirm_select(); } return $channel; }; diff --git a/src/Queue/Broker/AMQPSwoole.php b/src/Queue/Broker/AMQPSwoole.php index 0803e57..3120d53 100644 --- a/src/Queue/Broker/AMQPSwoole.php +++ b/src/Queue/Broker/AMQPSwoole.php @@ -22,25 +22,19 @@ protected function withChannel(callable $callback): void $this->user, $this->password, $this->vhost, - false, // insist - 'AMQPLAIN', // login_method - 'en_US', // locale - $this->connectTimeout, // connection_timeout - $this->readWriteTimeout, // read_write_timeout - null, // context - false, // keepalive - $this->heartbeat, // heartbeat - 0.0 // channel_rpc_timeout + connection_timeout: $this->connectTimeout, + read_write_timeout: $this->readWriteTimeout, + heartbeat: $this->heartbeat, ); - if (is_callable($this->connectionConfigHook)) { - call_user_func($this->connectionConfigHook, $connection); + if (\is_callable($this->connectionConfigHook)) { + ($this->connectionConfigHook)($connection); } $channel = $connection->channel(); - if (is_callable($this->channelConfigHook)) { - call_user_func($this->channelConfigHook, $channel); + if (\is_callable($this->channelConfigHook)) { + ($this->channelConfigHook)($channel); } return $channel; diff --git a/tests/Queue/E2E/Adapter/AMQPAckTest.php b/tests/Queue/E2E/Adapter/AMQPAckTest.php new file mode 100644 index 0000000..9c06d3d --- /dev/null +++ b/tests/Queue/E2E/Adapter/AMQPAckTest.php @@ -0,0 +1,21 @@ + Date: Fri, 4 Jul 2025 16:39:30 -0400 Subject: [PATCH 02/17] Add enqueue retry loop --- src/Queue/Broker/AMQP.php | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index ca5cb0a..fbd5e2f 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -50,6 +50,7 @@ public function __construct( protected readonly float $connectTimeout = 3.0, protected readonly float $readWriteTimeout = 3.0, protected float $ackTimeout = 5.0, + protected int $maxEnqueueAttempts = 3, protected bool $requireAck = true, ) { } @@ -67,6 +68,11 @@ public function setAckTimeout(float $timeout): void $this->ackTimeout = $timeout; } + public function setMaxEnqueueAttempts(int $maxEnqueueAttempts): void + { + $this->maxEnqueueAttempts = $maxEnqueueAttempts; + } + public function setExchangeArgument(string $key, string $value): void { $this->exchangeArguments[$key] = $value; @@ -175,6 +181,7 @@ public function enqueue(Queue $queue, array $payload): bool ); $this->withChannel(function (AMQPChannel $channel) use ($message, $queue) { + for ($attempts = 0; $attempts < $this->maxEnqueueAttempts; $attempts++) { try { $channel->basic_publish( $message, @@ -201,6 +208,7 @@ public function enqueue(Queue $queue, array $payload): bool // Exit the loop if ack is received break; + } }); return true; From 0be2649f3029757839ab63c7dadb9ca62b366b93 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 4 Jul 2025 16:40:48 -0400 Subject: [PATCH 03/17] Update amqplib --- composer.json | 2 +- composer.lock | 18 ++++++++++-------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/composer.json b/composer.json index 076dad8..5c7986f 100644 --- a/composer.json +++ b/composer.json @@ -25,7 +25,7 @@ }, "require": { "php": ">=8.3", - "appwrite-labs/php-amqplib": "^0.1", + "appwrite-labs/php-amqplib": "fix-memory-dev", "utopia-php/cli": "0.15.*", "utopia-php/framework": "0.33.*", "utopia-php/telemetry": "0.1.*", diff --git a/composer.lock b/composer.lock index fc63597..3d6f77b 100644 --- a/composer.lock +++ b/composer.lock @@ -4,20 +4,20 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "9de2edbb13039237d2a64acf9578bc19", + "content-hash": "aef7a10af70ea69436901bc4f9dd7543", "packages": [ { "name": "appwrite-labs/php-amqplib", - "version": "0.1.1", + "version": "dev-fix-memory", "source": { "type": "git", "url": "https://github.com/appwrite-labs/php-amqplib.git", - "reference": "bd380cbd63c8c0f063a3893b7a0b889d40876861" + "reference": "38f1fa283674ac1ab3156482deefd25f316309f1" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/appwrite-labs/php-amqplib/zipball/bd380cbd63c8c0f063a3893b7a0b889d40876861", - "reference": "bd380cbd63c8c0f063a3893b7a0b889d40876861", + "url": "https://api.github.com/repos/appwrite-labs/php-amqplib/zipball/38f1fa283674ac1ab3156482deefd25f316309f1", + "reference": "38f1fa283674ac1ab3156482deefd25f316309f1", "shasum": "" }, "require": { @@ -95,9 +95,9 @@ "swoole" ], "support": { - "source": "https://github.com/appwrite-labs/php-amqplib/tree/0.1.1" + "source": "https://github.com/appwrite-labs/php-amqplib/tree/fix-memory" }, - "time": "2025-06-24T18:12:57+00:00" + "time": "2025-07-04T20:40:09+00:00" }, { "name": "brick/math", @@ -4356,7 +4356,9 @@ ], "aliases": [], "minimum-stability": "stable", - "stability-flags": {}, + "stability-flags": { + "appwrite-labs/php-amqplib": 20 + }, "prefer-stable": false, "prefer-lowest": false, "platform": { From 1185ef27d9587946a4f0ad720e6dc83729266383 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 4 Jul 2025 16:55:31 -0400 Subject: [PATCH 04/17] Update release --- composer.json | 2 +- composer.lock | 18 ++++++++---------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/composer.json b/composer.json index 5c7986f..cd43171 100644 --- a/composer.json +++ b/composer.json @@ -25,7 +25,7 @@ }, "require": { "php": ">=8.3", - "appwrite-labs/php-amqplib": "fix-memory-dev", + "appwrite-labs/php-amqplib": "0.1.*", "utopia-php/cli": "0.15.*", "utopia-php/framework": "0.33.*", "utopia-php/telemetry": "0.1.*", diff --git a/composer.lock b/composer.lock index 3d6f77b..d9ca559 100644 --- a/composer.lock +++ b/composer.lock @@ -4,20 +4,20 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "aef7a10af70ea69436901bc4f9dd7543", + "content-hash": "6f6f51f98de6804fe085183cc08e6f94", "packages": [ { "name": "appwrite-labs/php-amqplib", - "version": "dev-fix-memory", + "version": "0.1.2", "source": { "type": "git", "url": "https://github.com/appwrite-labs/php-amqplib.git", - "reference": "38f1fa283674ac1ab3156482deefd25f316309f1" + "reference": "c8e043045388ddad5ddab5f48df2b9046ca6873f" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/appwrite-labs/php-amqplib/zipball/38f1fa283674ac1ab3156482deefd25f316309f1", - "reference": "38f1fa283674ac1ab3156482deefd25f316309f1", + "url": "https://api.github.com/repos/appwrite-labs/php-amqplib/zipball/c8e043045388ddad5ddab5f48df2b9046ca6873f", + "reference": "c8e043045388ddad5ddab5f48df2b9046ca6873f", "shasum": "" }, "require": { @@ -95,9 +95,9 @@ "swoole" ], "support": { - "source": "https://github.com/appwrite-labs/php-amqplib/tree/fix-memory" + "source": "https://github.com/appwrite-labs/php-amqplib/tree/0.1.2" }, - "time": "2025-07-04T20:40:09+00:00" + "time": "2025-07-04T20:54:22+00:00" }, { "name": "brick/math", @@ -4356,9 +4356,7 @@ ], "aliases": [], "minimum-stability": "stable", - "stability-flags": { - "appwrite-labs/php-amqplib": 20 - }, + "stability-flags": {}, "prefer-stable": false, "prefer-lowest": false, "platform": { From 2199875652c83d0dcdd4ca71d9ac807ea9e10b4b Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 4 Jul 2025 17:16:38 -0400 Subject: [PATCH 05/17] Fix workflow --- .github/workflows/tests.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index f870f0a..c757ff5 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -16,8 +16,8 @@ jobs: adapter: [ AMQP, - AMQPAck - AMQPSwoole + AMQPAck, + AMQPSwoole, Pool, SwooleRedisCluster, Swoole, From 02c553924d04847178df13a62f9b1a641c1fc496 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 4 Jul 2025 17:21:37 -0400 Subject: [PATCH 06/17] Default ack not required --- src/Queue/Broker/AMQP.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index fbd5e2f..94e7aa7 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -51,7 +51,7 @@ public function __construct( protected readonly float $readWriteTimeout = 3.0, protected float $ackTimeout = 5.0, protected int $maxEnqueueAttempts = 3, - protected bool $requireAck = true, + protected bool $requireAck = false, ) { } From e0d8b726f3fb4bebf1928a0a791edc41f2a4f684 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 4 Jul 2025 17:28:50 -0400 Subject: [PATCH 07/17] Update dev --- composer.json | 10 +++++----- composer.lock | 53 ++++++++++++++++++++++----------------------------- 2 files changed, 28 insertions(+), 35 deletions(-) diff --git a/composer.json b/composer.json index cd43171..56780d9 100644 --- a/composer.json +++ b/composer.json @@ -34,11 +34,11 @@ }, "require-dev": { "ext-redis": "*", - "swoole/ide-helper": "4.8.8", - "phpunit/phpunit": "^9.5.5", - "laravel/pint": "^0.2.3", - "workerman/workerman": "^4.0", - "phpstan/phpstan": "^1.8" + "swoole/ide-helper": "5.1.7", + "phpunit/phpunit": "9.*", + "laravel/pint": "1.*", + "workerman/workerman": "4.*", + "phpstan/phpstan": "1.*" }, "suggest": { "ext-swoole": "Needed to support Swoole.", diff --git a/composer.lock b/composer.lock index d9ca559..d478ad8 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "6f6f51f98de6804fe085183cc08e6f94", + "content-hash": "2b8d3acb921b6eae11d6edd946847d05", "packages": [ { "name": "appwrite-labs/php-amqplib", @@ -2447,16 +2447,16 @@ }, { "name": "laravel/pint", - "version": "v0.2.4", + "version": "v1.23.0", "source": { "type": "git", "url": "https://github.com/laravel/pint.git", - "reference": "45c9fe899abfeeb7794c5a8c4074c140250a84c2" + "reference": "9ab851dba4faa51a3c3223dd3d07044129021024" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/laravel/pint/zipball/45c9fe899abfeeb7794c5a8c4074c140250a84c2", - "reference": "45c9fe899abfeeb7794c5a8c4074c140250a84c2", + "url": "https://api.github.com/repos/laravel/pint/zipball/9ab851dba4faa51a3c3223dd3d07044129021024", + "reference": "9ab851dba4faa51a3c3223dd3d07044129021024", "shasum": "" }, "require": { @@ -2464,22 +2464,25 @@ "ext-mbstring": "*", "ext-tokenizer": "*", "ext-xml": "*", - "php": "^8.0" + "php": "^8.2.0" }, "require-dev": { - "friendsofphp/php-cs-fixer": "^3.8.0", - "illuminate/view": "^9.17.0", - "laravel-zero/framework": "^9.1.1", - "mockery/mockery": "^1.5.0", - "nunomaduro/larastan": "^2.1.11", - "nunomaduro/termwind": "^1.10.1", - "pestphp/pest": "^1.21.3" + "friendsofphp/php-cs-fixer": "^3.76.0", + "illuminate/view": "^11.45.1", + "larastan/larastan": "^3.5.0", + "laravel-zero/framework": "^11.45.0", + "mockery/mockery": "^1.6.12", + "nunomaduro/termwind": "^2.3.1", + "pestphp/pest": "^2.36.0" }, "bin": [ "builds/pint" ], "type": "project", "autoload": { + "files": [ + "overrides/Runner/Parallel/ProcessFactory.php" + ], "psr-4": { "App\\": "app/", "Database\\Seeders\\": "database/seeders/", @@ -2509,7 +2512,7 @@ "issues": "https://github.com/laravel/pint/issues", "source": "https://github.com/laravel/pint" }, - "time": "2022-07-13T17:57:52+00:00" + "time": "2025-07-03T10:37:47+00:00" }, { "name": "myclabs/deep-copy", @@ -4200,16 +4203,16 @@ }, { "name": "swoole/ide-helper", - "version": "4.8.8", + "version": "5.1.7", "source": { "type": "git", "url": "https://github.com/swoole/ide-helper.git", - "reference": "dd87843a5040831f9ad40b68fb57879b7342ef61" + "reference": "c6f9cd0aa1a1e3691ed736253f0cdce381d96cae" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/swoole/ide-helper/zipball/dd87843a5040831f9ad40b68fb57879b7342ef61", - "reference": "dd87843a5040831f9ad40b68fb57879b7342ef61", + "url": "https://api.github.com/repos/swoole/ide-helper/zipball/c6f9cd0aa1a1e3691ed736253f0cdce381d96cae", + "reference": "c6f9cd0aa1a1e3691ed736253f0cdce381d96cae", "shasum": "" }, "type": "library", @@ -4226,19 +4229,9 @@ "description": "IDE help files for Swoole.", "support": { "issues": "https://github.com/swoole/ide-helper/issues", - "source": "https://github.com/swoole/ide-helper/tree/4.8.8" + "source": "https://github.com/swoole/ide-helper/tree/5.1.7" }, - "funding": [ - { - "url": "https://gitee.com/swoole/swoole?donate=true", - "type": "custom" - }, - { - "url": "https://github.com/swoole", - "type": "github" - } - ], - "time": "2022-03-17T18:24:39+00:00" + "time": "2025-03-22T23:53:02+00:00" }, { "name": "theseer/tokenizer", From 71233d7ec07eec966dd107ec83023f5f389739ce Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 4 Jul 2025 17:36:52 -0400 Subject: [PATCH 08/17] Add swoole ack test --- tests/Queue/E2E/Adapter/AMQPAckTest.php | 8 +- tests/Queue/E2E/Adapter/AMQPSwooleAckTest.php | 111 ++++++++++++++++++ tests/Queue/E2E/Adapter/AMQPSwooleTest.php | 7 +- tests/Queue/E2E/Adapter/AMQPTest.php | 7 +- 4 files changed, 130 insertions(+), 3 deletions(-) create mode 100644 tests/Queue/E2E/Adapter/AMQPSwooleAckTest.php diff --git a/tests/Queue/E2E/Adapter/AMQPAckTest.php b/tests/Queue/E2E/Adapter/AMQPAckTest.php index 9c06d3d..bb2029d 100644 --- a/tests/Queue/E2E/Adapter/AMQPAckTest.php +++ b/tests/Queue/E2E/Adapter/AMQPAckTest.php @@ -11,7 +11,13 @@ class AMQPAckTest extends Base { protected function getPublisher(): Publisher { - return new AMQP(host: 'amqp', port: 5672, user: 'amqp', password: 'amqp', requireAck: true); + return new AMQP( + host: 'amqp', + port: 5672, + user: 'amqp', + password: 'amqp', + requireAck: true + ); } protected function getQueue(): Queue diff --git a/tests/Queue/E2E/Adapter/AMQPSwooleAckTest.php b/tests/Queue/E2E/Adapter/AMQPSwooleAckTest.php new file mode 100644 index 0000000..4aeb2c0 --- /dev/null +++ b/tests/Queue/E2E/Adapter/AMQPSwooleAckTest.php @@ -0,0 +1,111 @@ +getPublisher(); + go(function () use ($publisher) { + foreach ($this->payloads as $payload) { + $this->assertTrue($publisher->enqueue($this->getQueue(), $payload)); + } + + sleep(1); + /** @var \Utopia\Queue\Broker\AMQPSwoole $publisher */ + $publisher->close(); + }); + }); + } + + public function testConcurrency(): void + { + run(function () { + $publisher = $this->getPublisher(); + go(function () use ($publisher) { + foreach ($this->payloads as $payload) { + $this->assertTrue($publisher->enqueue($this->getQueue(), $payload)); + } + + sleep(1); + /** @var \Utopia\Queue\Broker\AMQPSwoole $publisher */ + $publisher->close(); + }); + }); + } + + /** + * Override testRetry to run within Swoole coroutines + * @depends testEvents + */ + public function testRetry(): void + { + run(function () { + $publisher = $this->getPublisher(); + go(function () use ($publisher) { + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 1 + ]); + + $this->assertTrue($published); + + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 2 + ]); + + $this->assertTrue($published); + + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 3 + ]); + + $this->assertTrue($published); + + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 4 + ]); + + $this->assertTrue($published); + + sleep(1); + $publisher->retry($this->getQueue()); + sleep(1); + $publisher->retry($this->getQueue(), 2); + sleep(1); + /** @var \Utopia\Queue\Broker\AMQPSwoole $publisher */ + $publisher->close(); + }); + }); + } +} diff --git a/tests/Queue/E2E/Adapter/AMQPSwooleTest.php b/tests/Queue/E2E/Adapter/AMQPSwooleTest.php index 54d1752..5b76886 100644 --- a/tests/Queue/E2E/Adapter/AMQPSwooleTest.php +++ b/tests/Queue/E2E/Adapter/AMQPSwooleTest.php @@ -12,7 +12,12 @@ class AMQPSwooleTest extends Base { protected function getPublisher(): Publisher { - return new AMQPSwoole(host: 'amqp', port: 5672, user: 'amqp', password: 'amqp'); + return new AMQPSwoole( + host: 'amqp', + port: 5672, + user: 'amqp', + password: 'amqp' + ); } protected function getQueue(): Queue diff --git a/tests/Queue/E2E/Adapter/AMQPTest.php b/tests/Queue/E2E/Adapter/AMQPTest.php index e8557a3..a423214 100644 --- a/tests/Queue/E2E/Adapter/AMQPTest.php +++ b/tests/Queue/E2E/Adapter/AMQPTest.php @@ -10,7 +10,12 @@ class AMQPTest extends Base { protected function getPublisher(): Publisher { - return new AMQP(host: 'amqp', port: 5672, user: 'amqp', password: 'amqp'); + return new AMQP( + host: 'amqp', + port: 5672, + user: 'amqp', + password: 'amqp' + ); } protected function getQueue(): Queue From 63ed4d664aab250e90889f0d51978fc9752305bb Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 4 Jul 2025 18:14:04 -0400 Subject: [PATCH 09/17] Single ack --- src/Queue/Broker/AMQP.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 94e7aa7..f703c18 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -113,7 +113,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe $result = $messageCallback($message); match (true) { - $result instanceof Commit => $amqpMessage->ack(true), + $result instanceof Commit => $amqpMessage->ack(), $result instanceof NoCommit => null, default => $amqpMessage->ack() }; From 5d08ed7594b0322a780ca73f6743debf9b759c48 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 4 Jul 2025 18:14:34 -0400 Subject: [PATCH 10/17] Declare exchange on wait ack --- src/Queue/Broker/AMQP.php | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index f703c18..aecd7d1 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -112,11 +112,13 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe $message = new Message($nextMessage); $result = $messageCallback($message); + match (true) { $result instanceof Commit => $amqpMessage->ack(), $result instanceof NoCommit => null, default => $amqpMessage->ack() }; + $successCallback($message); } catch (Retryable $e) { $amqpMessage->nack(requeue: true); @@ -195,6 +197,13 @@ public function enqueue(Queue $queue, array $payload): bool return; } + // Redeclare topology, because the queue might not exist yet + $channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); + $channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); + $channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"]))); + $channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name); + + // Wait for the message to be acknowledged by the broker $channel->wait_for_pending_acks($this->ackTimeout); } catch ( AMQPTimeoutException | From 041d6c2f329bafa943c7845298ffc7a422655dcb Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 4 Jul 2025 18:17:10 -0400 Subject: [PATCH 11/17] Lint --- composer.json | 2 +- tests/Queue/E2E/Adapter/AMQPSwooleAckTest.php | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 56780d9..e661550 100644 --- a/composer.json +++ b/composer.json @@ -19,7 +19,7 @@ }, "scripts":{ "test": "phpunit", - "check": "vendor/bin/phpstan analyse", + "check": "vendor/bin/phpstan --memory-limit=2G analyse", "format": "vendor/bin/pint", "lint": "vendor/bin/pint --test" }, diff --git a/tests/Queue/E2E/Adapter/AMQPSwooleAckTest.php b/tests/Queue/E2E/Adapter/AMQPSwooleAckTest.php index 4aeb2c0..89154e8 100644 --- a/tests/Queue/E2E/Adapter/AMQPSwooleAckTest.php +++ b/tests/Queue/E2E/Adapter/AMQPSwooleAckTest.php @@ -6,6 +6,7 @@ use Utopia\Queue\Broker\AMQPSwoole; use Utopia\Queue\Publisher; use Utopia\Queue\Queue; + use function Co\run; class AMQPSwooleAckTest extends Base From 45448d750bef204233c2a2f61021662f8c48820f Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 4 Jul 2025 18:18:39 -0400 Subject: [PATCH 12/17] Validations --- src/Queue/Broker/AMQP.php | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index aecd7d1..5ffdf3b 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -65,11 +65,17 @@ public function setRequireAck(bool $require): void public function setAckTimeout(float $timeout): void { + if ($timeout <= 0) { + throw new \InvalidArgumentException('Ack timeout must be positive'); + } $this->ackTimeout = $timeout; } public function setMaxEnqueueAttempts(int $maxEnqueueAttempts): void { + if ($maxEnqueueAttempts < 1) { + throw new \InvalidArgumentException('Max enqueue attempts must be at least 1'); + } $this->maxEnqueueAttempts = $maxEnqueueAttempts; } From d77d23121cb2fa83099188ba234ecc56a818f007 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 4 Jul 2025 18:21:15 -0400 Subject: [PATCH 13/17] Remove redundant branch --- src/Queue/Broker/AMQP.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 5ffdf3b..9328273 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -120,7 +120,6 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe $result = $messageCallback($message); match (true) { - $result instanceof Commit => $amqpMessage->ack(), $result instanceof NoCommit => null, default => $amqpMessage->ack() }; From 25f9f57e1a3ce936a98ce7310ab79a78d45ad9bb Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 4 Jul 2025 18:25:50 -0400 Subject: [PATCH 14/17] Lint --- src/Queue/Broker/AMQP.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 9328273..a06448e 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -18,7 +18,6 @@ use Utopia\Queue\Message; use Utopia\Queue\Publisher; use Utopia\Queue\Queue; -use Utopia\Queue\Result\Commit; use Utopia\Queue\Result\NoCommit; class AMQP implements Publisher, Consumer From afa3cfa7946a8e0027024e881ddb869a74bf0b47 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 4 Jul 2025 18:41:23 -0400 Subject: [PATCH 15/17] Channel per coroutine --- src/Queue/Broker/AMQP.php | 35 +++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index a06448e..2fd719b 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -22,7 +22,11 @@ class AMQP implements Publisher, Consumer { - protected ?AMQPChannel $channel = null; + /** + * One channel per coroutine (CID => AMQPChannel). + * Non-coroutine contexts use CID = 0. + */ + protected array $channels = []; private array $exchangeArguments = []; private array $queueArguments = []; @@ -163,7 +167,14 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe public function close(): void { - $this->channel?->getConnection()?->close(); + foreach ($this->channels as $cid => $ch) { + try { + $ch->getConnection()?->close(); + } catch (\Throwable) { + // ignore – connection might already be closed + } + unset($this->channels[$cid]); + } } /** @@ -265,6 +276,10 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int */ protected function withChannel(callable $callback): void { + $cid = \class_exists('\\Swoole\\Coroutine') + ? \Swoole\Coroutine::getCid() + : 0; + $createChannel = function (): AMQPChannel { $connection = new AMQPStreamConnection( $this->host, @@ -293,18 +308,18 @@ protected function withChannel(callable $callback): void return $channel; }; - if (!$this->channel) { - $this->channel = $createChannel(); + if (!isset($this->channels[$cid])) { + $this->channels[$cid] = $createChannel(); } try { - $callback($this->channel); + $callback($this->channels[$cid]); } catch (\Throwable) { - // createChannel() might throw, in that case set the channel to `null` first. - $this->channel = null; - // try creating a new connection once, if this still fails, throw the error - $this->channel = $createChannel(); - $callback($this->channel); + // discard broken channel for this coroutine + unset($this->channels[$cid]); + // create a new channel once; rethrow on second failure + $this->channels[$cid] = $createChannel(); + $callback($this->channels[$cid]); } } } From c032f484921b08eccabb3463d4e174a54a832396 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 4 Jul 2025 18:51:43 -0400 Subject: [PATCH 16/17] Abstract swoole with channel --- composer.json | 3 ++- composer.lock | 6 ++--- src/Queue/Broker/AMQP.php | 7 ++++- src/Queue/Broker/AMQPSwoole.php | 48 ++------------------------------- 4 files changed, 12 insertions(+), 52 deletions(-) diff --git a/composer.json b/composer.json index e661550..b767ab2 100644 --- a/composer.json +++ b/composer.json @@ -33,7 +33,6 @@ "utopia-php/fetch": "0.4.*" }, "require-dev": { - "ext-redis": "*", "swoole/ide-helper": "5.1.7", "phpunit/phpunit": "9.*", "laravel/pint": "1.*", @@ -41,6 +40,8 @@ "phpstan/phpstan": "1.*" }, "suggest": { + "ext-redis": "Needed to support Redis.", + "ext-amqp": "Needed to support AMQP.", "ext-swoole": "Needed to support Swoole.", "workerman/workerman": "Needed to support Workerman." }, diff --git a/composer.lock b/composer.lock index d478ad8..4528bc6 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "2b8d3acb921b6eae11d6edd946847d05", + "content-hash": "350529d3b55908338187ee3a1f284071", "packages": [ { "name": "appwrite-labs/php-amqplib", @@ -4355,8 +4355,6 @@ "platform": { "php": ">=8.3" }, - "platform-dev": { - "ext-redis": "*" - }, + "platform-dev": {}, "plugin-api-version": "2.6.0" } diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 2fd719b..ee327cf 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -58,6 +58,11 @@ public function __construct( ) { } + public function getConnectionType(): string + { + return AMQPStreamConnection::class; + } + /** * Enable or disable waiting for publisher confirms. */ @@ -281,7 +286,7 @@ protected function withChannel(callable $callback): void : 0; $createChannel = function (): AMQPChannel { - $connection = new AMQPStreamConnection( + $connection = new ($this->getConnectionType())( $this->host, $this->port, $this->user, diff --git a/src/Queue/Broker/AMQPSwoole.php b/src/Queue/Broker/AMQPSwoole.php index 3120d53..7be8f17 100644 --- a/src/Queue/Broker/AMQPSwoole.php +++ b/src/Queue/Broker/AMQPSwoole.php @@ -2,56 +2,12 @@ namespace Utopia\Queue\Broker; -use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AMQPSwooleConnection; class AMQPSwoole extends AMQP { - /** - * Override the withChannel method to use AMQPSwooleConnection instead of AMQPStreamConnection - * - * @param callable(AMQPChannel $channel): void $callback - * @throws \Exception - */ - protected function withChannel(callable $callback): void + public function getConnectionType(): string { - $createChannel = function (): AMQPChannel { - $connection = new AMQPSwooleConnection( - $this->host, - $this->port, - $this->user, - $this->password, - $this->vhost, - connection_timeout: $this->connectTimeout, - read_write_timeout: $this->readWriteTimeout, - heartbeat: $this->heartbeat, - ); - - if (\is_callable($this->connectionConfigHook)) { - ($this->connectionConfigHook)($connection); - } - - $channel = $connection->channel(); - - if (\is_callable($this->channelConfigHook)) { - ($this->channelConfigHook)($channel); - } - - return $channel; - }; - - if (!$this->channel) { - $this->channel = $createChannel(); - } - - try { - $callback($this->channel); - } catch (\Throwable) { - // createChannel() might throw, in that case set the channel to `null` first. - $this->channel = null; - // try creating a new connection once, if this still fails, throw the error - $this->channel = $createChannel(); - $callback($this->channel); - } + return AMQPSwooleConnection::class; } } From 2b85380ff37ab454eab6ff9369631227f8c44f85 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Fri, 4 Jul 2025 18:56:37 -0400 Subject: [PATCH 17/17] Early decl exchange --- src/Queue/Broker/AMQP.php | 14 +++++++------- src/Queue/Broker/AMQPSwoole.php | 1 + 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index ee327cf..607c784 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -205,6 +205,12 @@ public function enqueue(Queue $queue, array $payload): bool $this->withChannel(function (AMQPChannel $channel) use ($message, $queue) { for ($attempts = 0; $attempts < $this->maxEnqueueAttempts; $attempts++) { try { + // Redeclare topology, because the queue might not exist yet + $channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); + $channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); + $channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"]))); + $channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name); + $channel->basic_publish( $message, exchange: $queue->namespace, @@ -212,17 +218,11 @@ public function enqueue(Queue $queue, array $payload): bool mandatory: $this->requireAck ); + // No need to wait for ack if not required if (!$this->requireAck) { - // No need to wait for ack if not required return; } - // Redeclare topology, because the queue might not exist yet - $channel->exchange_declare($queue->namespace, AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); - $channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); - $channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"]))); - $channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name); - // Wait for the message to be acknowledged by the broker $channel->wait_for_pending_acks($this->ackTimeout); } catch ( diff --git a/src/Queue/Broker/AMQPSwoole.php b/src/Queue/Broker/AMQPSwoole.php index 7be8f17..c636292 100644 --- a/src/Queue/Broker/AMQPSwoole.php +++ b/src/Queue/Broker/AMQPSwoole.php @@ -6,6 +6,7 @@ class AMQPSwoole extends AMQP { + #[\Override] public function getConnectionType(): string { return AMQPSwooleConnection::class;