From fc15d1aef1c79e7557134d6a9c59576a452085d2 Mon Sep 17 00:00:00 2001 From: Christy Jacob Date: Mon, 23 Jun 2025 20:31:05 +0400 Subject: [PATCH 01/11] refactor: update AMQP connection handling to support Swoole and enhance test coverage - Replaced php-amqplib/php-amqplib with appwrite-labs/php-amqplib in composer.json for Swoole support. - Added connection class flexibility in AMQP broker to allow using AMQPSwooleConnection. - Enhanced tests to cover both AMQPStreamConnection and AMQPSwooleConnection. - Updated various dependencies in composer.lock for compatibility and improvements. --- composer.json | 8 +- composer.lock | 480 ++++++++++++++------------- src/Queue/Broker/AMQP.php | 11 +- tests/Queue/E2E/Adapter/AMQPTest.php | 101 +++++- tests/Queue/E2E/Adapter/Base.php | 78 ----- 5 files changed, 363 insertions(+), 315 deletions(-) diff --git a/composer.json b/composer.json index 7de9530..7e3f969 100644 --- a/composer.json +++ b/composer.json @@ -25,7 +25,7 @@ }, "require": { "php": ">=8.3", - "php-amqplib/php-amqplib": "^3.7", + "appwrite-labs/php-amqplib": "dev-master", "utopia-php/cli": "0.15.*", "utopia-php/framework": "0.33.*", "utopia-php/telemetry": "0.1.*", @@ -44,6 +44,12 @@ "ext-swoole": "Needed to support Swoole.", "workerman/workerman": "Needed to support Workerman." }, + "repositories": [ + { + "type": "vcs", + "url": "https://github.com/appwrite-labs/php-amqplib" + } + ], "config": { "allow-plugins": { "php-http/discovery": true, diff --git a/composer.lock b/composer.lock index 126551c..c4cdf8d 100644 --- a/composer.lock +++ b/composer.lock @@ -4,20 +4,119 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "574ad3b103f97c1668af99674784aae8", + "content-hash": "23b5f3bf9991a1527ac430fd3d2a0357", "packages": [ + { + "name": "appwrite-labs/php-amqplib", + "version": "dev-master", + "source": { + "type": "git", + "url": "https://github.com/appwrite-labs/php-amqplib.git", + "reference": "30d709df7510e784d16ad7a7f817d26fe79615a0" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/appwrite-labs/php-amqplib/zipball/30d709df7510e784d16ad7a7f817d26fe79615a0", + "reference": "30d709df7510e784d16ad7a7f817d26fe79615a0", + "shasum": "" + }, + "require": { + "ext-mbstring": "*", + "ext-sockets": "*", + "php": "^7.2||^8.0", + "phpseclib/phpseclib": "^2.0|^3.0" + }, + "conflict": { + "php": "7.4.0 - 7.4.1" + }, + "replace": { + "php-amqplib/php-amqplib": "self.version", + "videlalvaro/php-amqplib": "self.version" + }, + "require-dev": { + "ext-curl": "*", + "nategood/httpful": "^0.2.20", + "phpunit/phpunit": "^7.5|^9.5", + "squizlabs/php_codesniffer": "^3.6", + "swoole/ide-helper": "^5.0" + }, + "suggest": { + "ext-swoole": "For Swoole coroutine support" + }, + "default-branch": true, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "3.0-dev" + } + }, + "autoload": { + "psr-4": { + "PhpAmqpLib\\": "PhpAmqpLib/" + } + }, + "autoload-dev": { + "psr-4": { + "PhpAmqpLib\\Tests\\Functional\\": "tests/Functional", + "PhpAmqpLib\\Tests\\Unit\\": "tests/Unit" + } + }, + "license": [ + "LGPL-2.1-or-later" + ], + "authors": [ + { + "name": "Appwrite Labs", + "email": "team@appwrite.io", + "role": "Fork Maintainer" + }, + { + "name": "Alvaro Videla", + "role": "Original Maintainer" + }, + { + "name": "Raúl Araya", + "email": "nubeiro@gmail.com", + "role": "Maintainer" + }, + { + "name": "Luke Bakken", + "email": "luke@bakken.io", + "role": "Maintainer" + }, + { + "name": "Ramūnas Dronga", + "email": "github@ramuno.lt", + "role": "Maintainer" + } + ], + "description": "Fork of php-amqplib with Swoole coroutine support. A pure PHP implementation of the AMQP protocol tested against RabbitMQ.", + "homepage": "https://github.com/appwrite-labs/php-amqplib/", + "keywords": [ + "async", + "coroutine", + "message", + "queue", + "rabbitmq", + "swoole" + ], + "support": { + "source": "https://github.com/appwrite-labs/php-amqplib/tree/0.1.0" + }, + "time": "2025-06-13T21:45:04+00:00" + }, { "name": "brick/math", - "version": "0.12.3", + "version": "0.13.1", "source": { "type": "git", "url": "https://github.com/brick/math.git", - "reference": "866551da34e9a618e64a819ee1e01c20d8a588ba" + "reference": "fc7ed316430118cc7836bf45faff18d5dfc8de04" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/brick/math/zipball/866551da34e9a618e64a819ee1e01c20d8a588ba", - "reference": "866551da34e9a618e64a819ee1e01c20d8a588ba", + "url": "https://api.github.com/repos/brick/math/zipball/fc7ed316430118cc7836bf45faff18d5dfc8de04", + "reference": "fc7ed316430118cc7836bf45faff18d5dfc8de04", "shasum": "" }, "require": { @@ -56,7 +155,7 @@ ], "support": { "issues": "https://github.com/brick/math/issues", - "source": "https://github.com/brick/math/tree/0.12.3" + "source": "https://github.com/brick/math/tree/0.13.1" }, "funding": [ { @@ -64,7 +163,7 @@ "type": "github" } ], - "time": "2025-02-28T13:11:00+00:00" + "time": "2025-03-29T13:50:30+00:00" }, { "name": "composer/semver", @@ -149,16 +248,16 @@ }, { "name": "google/protobuf", - "version": "v4.30.2", + "version": "v4.31.1", "source": { "type": "git", "url": "https://github.com/protocolbuffers/protobuf-php.git", - "reference": "a4c4d8565b40b9f76debc9dfeb221412eacb8ced" + "reference": "2b028ce8876254e2acbeceea7d9b573faad41864" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/protocolbuffers/protobuf-php/zipball/a4c4d8565b40b9f76debc9dfeb221412eacb8ced", - "reference": "a4c4d8565b40b9f76debc9dfeb221412eacb8ced", + "url": "https://api.github.com/repos/protocolbuffers/protobuf-php/zipball/2b028ce8876254e2acbeceea7d9b573faad41864", + "reference": "2b028ce8876254e2acbeceea7d9b573faad41864", "shasum": "" }, "require": { @@ -187,9 +286,9 @@ "proto" ], "support": { - "source": "https://github.com/protocolbuffers/protobuf-php/tree/v4.30.2" + "source": "https://github.com/protocolbuffers/protobuf-php/tree/v4.31.1" }, - "time": "2025-03-26T18:01:50+00:00" + "time": "2025-05-28T18:52:35+00:00" }, { "name": "nyholm/psr7", @@ -337,16 +436,16 @@ }, { "name": "open-telemetry/api", - "version": "1.2.3", + "version": "1.3.0", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/api.git", - "reference": "199d7ddda88f5f5619fa73463f1a5a7149ccd1f1" + "reference": "4e3bb38e069876fb73c2ce85c89583bf2b28cd86" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/api/zipball/199d7ddda88f5f5619fa73463f1a5a7149ccd1f1", - "reference": "199d7ddda88f5f5619fa73463f1a5a7149ccd1f1", + "url": "https://api.github.com/repos/opentelemetry-php/api/zipball/4e3bb38e069876fb73c2ce85c89583bf2b28cd86", + "reference": "4e3bb38e069876fb73c2ce85c89583bf2b28cd86", "shasum": "" }, "require": { @@ -403,20 +502,20 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-03-05T21:42:54+00:00" + "time": "2025-05-07T12:32:21+00:00" }, { "name": "open-telemetry/context", - "version": "1.1.0", + "version": "1.2.1", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/context.git", - "reference": "0cba875ea1953435f78aec7f1d75afa87bdbf7f3" + "reference": "1eb2b837ee9362db064a6b65d5ecce15a9f9f020" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/context/zipball/0cba875ea1953435f78aec7f1d75afa87bdbf7f3", - "reference": "0cba875ea1953435f78aec7f1d75afa87bdbf7f3", + "url": "https://api.github.com/repos/opentelemetry-php/context/zipball/1eb2b837ee9362db064a6b65d5ecce15a9f9f020", + "reference": "1eb2b837ee9362db064a6b65d5ecce15a9f9f020", "shasum": "" }, "require": { @@ -462,20 +561,20 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2024-08-21T00:29:20+00:00" + "time": "2025-05-07T23:36:50+00:00" }, { "name": "open-telemetry/exporter-otlp", - "version": "1.2.1", + "version": "1.3.1", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/exporter-otlp.git", - "reference": "b7580440b7481a98da97aceabeb46e1b276c8747" + "reference": "8b3ca1f86d01429c73b407bf1a2075d9c187001e" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/exporter-otlp/zipball/b7580440b7481a98da97aceabeb46e1b276c8747", - "reference": "b7580440b7481a98da97aceabeb46e1b276c8747", + "url": "https://api.github.com/repos/opentelemetry-php/exporter-otlp/zipball/8b3ca1f86d01429c73b407bf1a2075d9c187001e", + "reference": "8b3ca1f86d01429c73b407bf1a2075d9c187001e", "shasum": "" }, "require": { @@ -526,7 +625,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-03-06T23:21:56+00:00" + "time": "2025-05-21T12:02:20+00:00" }, { "name": "open-telemetry/gen-otlp-protobuf", @@ -593,16 +692,16 @@ }, { "name": "open-telemetry/sdk", - "version": "1.2.4", + "version": "1.5.0", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/sdk.git", - "reference": "47fcb66ae5328c5a799195247b1dce551d85873e" + "reference": "cd0d7367599717fc29e04eb8838ec061e6c2c657" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/47fcb66ae5328c5a799195247b1dce551d85873e", - "reference": "47fcb66ae5328c5a799195247b1dce551d85873e", + "url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/cd0d7367599717fc29e04eb8838ec061e6c2c657", + "reference": "cd0d7367599717fc29e04eb8838ec061e6c2c657", "shasum": "" }, "require": { @@ -679,20 +778,20 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-04-15T07:02:07+00:00" + "time": "2025-05-22T02:33:34+00:00" }, { "name": "open-telemetry/sem-conv", - "version": "1.30.0", + "version": "1.32.0", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/sem-conv.git", - "reference": "4178c9f390da8e4dbca9b181a9d1efd50cf7ee0a" + "reference": "16585cc0dbc3032a318e274043454679430d2ebf" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/sem-conv/zipball/4178c9f390da8e4dbca9b181a9d1efd50cf7ee0a", - "reference": "4178c9f390da8e4dbca9b181a9d1efd50cf7ee0a", + "url": "https://api.github.com/repos/opentelemetry-php/sem-conv/zipball/16585cc0dbc3032a318e274043454679430d2ebf", + "reference": "16585cc0dbc3032a318e274043454679430d2ebf", "shasum": "" }, "require": { @@ -736,7 +835,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-02-06T00:21:48+00:00" + "time": "2025-05-05T03:58:53+00:00" }, { "name": "paragonie/constant_time_encoding", @@ -855,87 +954,6 @@ }, "time": "2020-10-15T08:29:30+00:00" }, - { - "name": "php-amqplib/php-amqplib", - "version": "v3.7.3", - "source": { - "type": "git", - "url": "https://github.com/php-amqplib/php-amqplib.git", - "reference": "9f50fe69a9f1a19e2cb25596a354d705de36fe59" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/php-amqplib/php-amqplib/zipball/9f50fe69a9f1a19e2cb25596a354d705de36fe59", - "reference": "9f50fe69a9f1a19e2cb25596a354d705de36fe59", - "shasum": "" - }, - "require": { - "ext-mbstring": "*", - "ext-sockets": "*", - "php": "^7.2||^8.0", - "phpseclib/phpseclib": "^2.0|^3.0" - }, - "conflict": { - "php": "7.4.0 - 7.4.1" - }, - "replace": { - "videlalvaro/php-amqplib": "self.version" - }, - "require-dev": { - "ext-curl": "*", - "nategood/httpful": "^0.2.20", - "phpunit/phpunit": "^7.5|^9.5", - "squizlabs/php_codesniffer": "^3.6" - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-master": "3.0-dev" - } - }, - "autoload": { - "psr-4": { - "PhpAmqpLib\\": "PhpAmqpLib/" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "LGPL-2.1-or-later" - ], - "authors": [ - { - "name": "Alvaro Videla", - "role": "Original Maintainer" - }, - { - "name": "Raúl Araya", - "email": "nubeiro@gmail.com", - "role": "Maintainer" - }, - { - "name": "Luke Bakken", - "email": "luke@bakken.io", - "role": "Maintainer" - }, - { - "name": "Ramūnas Dronga", - "email": "github@ramuno.lt", - "role": "Maintainer" - } - ], - "description": "Formerly videlalvaro/php-amqplib. This library is a pure PHP implementation of the AMQP protocol. It's been tested against RabbitMQ.", - "homepage": "https://github.com/php-amqplib/php-amqplib/", - "keywords": [ - "message", - "queue", - "rabbitmq" - ], - "support": { - "issues": "https://github.com/php-amqplib/php-amqplib/issues", - "source": "https://github.com/php-amqplib/php-amqplib/tree/v3.7.3" - }, - "time": "2025-02-18T20:11:13+00:00" - }, { "name": "php-http/discovery", "version": "1.20.0", @@ -1017,16 +1035,16 @@ }, { "name": "phpseclib/phpseclib", - "version": "3.0.43", + "version": "3.0.45", "source": { "type": "git", "url": "https://github.com/phpseclib/phpseclib.git", - "reference": "709ec107af3cb2f385b9617be72af8cf62441d02" + "reference": "bd81b90d5963c6b9d87de50357585375223f4dd8" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpseclib/phpseclib/zipball/709ec107af3cb2f385b9617be72af8cf62441d02", - "reference": "709ec107af3cb2f385b9617be72af8cf62441d02", + "url": "https://api.github.com/repos/phpseclib/phpseclib/zipball/bd81b90d5963c6b9d87de50357585375223f4dd8", + "reference": "bd81b90d5963c6b9d87de50357585375223f4dd8", "shasum": "" }, "require": { @@ -1107,7 +1125,7 @@ ], "support": { "issues": "https://github.com/phpseclib/phpseclib/issues", - "source": "https://github.com/phpseclib/phpseclib/tree/3.0.43" + "source": "https://github.com/phpseclib/phpseclib/tree/3.0.45" }, "funding": [ { @@ -1123,7 +1141,7 @@ "type": "tidelift" } ], - "time": "2024-12-14T21:12:59+00:00" + "time": "2025-06-22T22:54:43+00:00" }, { "name": "psr/container", @@ -1466,20 +1484,20 @@ }, { "name": "ramsey/uuid", - "version": "4.7.6", + "version": "4.8.1", "source": { "type": "git", "url": "https://github.com/ramsey/uuid.git", - "reference": "91039bc1faa45ba123c4328958e620d382ec7088" + "reference": "fdf4dd4e2ff1813111bd0ad58d7a1ddbb5b56c28" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/ramsey/uuid/zipball/91039bc1faa45ba123c4328958e620d382ec7088", - "reference": "91039bc1faa45ba123c4328958e620d382ec7088", + "url": "https://api.github.com/repos/ramsey/uuid/zipball/fdf4dd4e2ff1813111bd0ad58d7a1ddbb5b56c28", + "reference": "fdf4dd4e2ff1813111bd0ad58d7a1ddbb5b56c28", "shasum": "" }, "require": { - "brick/math": "^0.8.8 || ^0.9 || ^0.10 || ^0.11 || ^0.12", + "brick/math": "^0.8.8 || ^0.9 || ^0.10 || ^0.11 || ^0.12 || ^0.13", "ext-json": "*", "php": "^8.0", "ramsey/collection": "^1.2 || ^2.0" @@ -1488,26 +1506,23 @@ "rhumsaa/uuid": "self.version" }, "require-dev": { - "captainhook/captainhook": "^5.10", + "captainhook/captainhook": "^5.25", "captainhook/plugin-composer": "^5.3", - "dealerdirect/phpcodesniffer-composer-installer": "^0.7.0", - "doctrine/annotations": "^1.8", - "ergebnis/composer-normalize": "^2.15", - "mockery/mockery": "^1.3", + "dealerdirect/phpcodesniffer-composer-installer": "^1.0", + "ergebnis/composer-normalize": "^2.47", + "mockery/mockery": "^1.6", "paragonie/random-lib": "^2", - "php-mock/php-mock": "^2.2", - "php-mock/php-mock-mockery": "^1.3", - "php-parallel-lint/php-parallel-lint": "^1.1", - "phpbench/phpbench": "^1.0", - "phpstan/extension-installer": "^1.1", - "phpstan/phpstan": "^1.8", - "phpstan/phpstan-mockery": "^1.1", - "phpstan/phpstan-phpunit": "^1.1", - "phpunit/phpunit": "^8.5 || ^9", - "ramsey/composer-repl": "^1.4", - "slevomat/coding-standard": "^8.4", - "squizlabs/php_codesniffer": "^3.5", - "vimeo/psalm": "^4.9" + "php-mock/php-mock": "^2.6", + "php-mock/php-mock-mockery": "^1.5", + "php-parallel-lint/php-parallel-lint": "^1.4.0", + "phpbench/phpbench": "^1.2.14", + "phpstan/extension-installer": "^1.4", + "phpstan/phpstan": "^2.1", + "phpstan/phpstan-mockery": "^2.0", + "phpstan/phpstan-phpunit": "^2.0", + "phpunit/phpunit": "^9.6", + "slevomat/coding-standard": "^8.18", + "squizlabs/php_codesniffer": "^3.13" }, "suggest": { "ext-bcmath": "Enables faster math with arbitrary-precision integers using BCMath.", @@ -1542,32 +1557,22 @@ ], "support": { "issues": "https://github.com/ramsey/uuid/issues", - "source": "https://github.com/ramsey/uuid/tree/4.7.6" + "source": "https://github.com/ramsey/uuid/tree/4.8.1" }, - "funding": [ - { - "url": "https://github.com/ramsey", - "type": "github" - }, - { - "url": "https://tidelift.com/funding/github/packagist/ramsey/uuid", - "type": "tidelift" - } - ], - "time": "2024-04-27T21:32:50+00:00" + "time": "2025-06-01T06:28:46+00:00" }, { "name": "symfony/deprecation-contracts", - "version": "v3.5.1", + "version": "v3.6.0", "source": { "type": "git", "url": "https://github.com/symfony/deprecation-contracts.git", - "reference": "74c71c939a79f7d5bf3c1ce9f5ea37ba0114c6f6" + "reference": "63afe740e99a13ba87ec199bb07bbdee937a5b62" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/deprecation-contracts/zipball/74c71c939a79f7d5bf3c1ce9f5ea37ba0114c6f6", - "reference": "74c71c939a79f7d5bf3c1ce9f5ea37ba0114c6f6", + "url": "https://api.github.com/repos/symfony/deprecation-contracts/zipball/63afe740e99a13ba87ec199bb07bbdee937a5b62", + "reference": "63afe740e99a13ba87ec199bb07bbdee937a5b62", "shasum": "" }, "require": { @@ -1580,7 +1585,7 @@ "name": "symfony/contracts" }, "branch-alias": { - "dev-main": "3.5-dev" + "dev-main": "3.6-dev" } }, "autoload": { @@ -1605,7 +1610,7 @@ "description": "A generic function and convention to trigger deprecation notices", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/deprecation-contracts/tree/v3.5.1" + "source": "https://github.com/symfony/deprecation-contracts/tree/v3.6.0" }, "funding": [ { @@ -1621,20 +1626,20 @@ "type": "tidelift" } ], - "time": "2024-09-25T14:20:29+00:00" + "time": "2024-09-25T14:21:43+00:00" }, { "name": "symfony/http-client", - "version": "v7.2.4", + "version": "v7.3.0", "source": { "type": "git", "url": "https://github.com/symfony/http-client.git", - "reference": "78981a2ffef6437ed92d4d7e2a86a82f256c6dc6" + "reference": "57e4fb86314015a695a750ace358d07a7e37b8a9" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/http-client/zipball/78981a2ffef6437ed92d4d7e2a86a82f256c6dc6", - "reference": "78981a2ffef6437ed92d4d7e2a86a82f256c6dc6", + "url": "https://api.github.com/repos/symfony/http-client/zipball/57e4fb86314015a695a750ace358d07a7e37b8a9", + "reference": "57e4fb86314015a695a750ace358d07a7e37b8a9", "shasum": "" }, "require": { @@ -1700,7 +1705,7 @@ "http" ], "support": { - "source": "https://github.com/symfony/http-client/tree/v7.2.4" + "source": "https://github.com/symfony/http-client/tree/v7.3.0" }, "funding": [ { @@ -1716,20 +1721,20 @@ "type": "tidelift" } ], - "time": "2025-02-13T10:27:23+00:00" + "time": "2025-05-02T08:23:16+00:00" }, { "name": "symfony/http-client-contracts", - "version": "v3.5.2", + "version": "v3.6.0", "source": { "type": "git", "url": "https://github.com/symfony/http-client-contracts.git", - "reference": "ee8d807ab20fcb51267fdace50fbe3494c31e645" + "reference": "75d7043853a42837e68111812f4d964b01e5101c" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/http-client-contracts/zipball/ee8d807ab20fcb51267fdace50fbe3494c31e645", - "reference": "ee8d807ab20fcb51267fdace50fbe3494c31e645", + "url": "https://api.github.com/repos/symfony/http-client-contracts/zipball/75d7043853a42837e68111812f4d964b01e5101c", + "reference": "75d7043853a42837e68111812f4d964b01e5101c", "shasum": "" }, "require": { @@ -1742,7 +1747,7 @@ "name": "symfony/contracts" }, "branch-alias": { - "dev-main": "3.5-dev" + "dev-main": "3.6-dev" } }, "autoload": { @@ -1778,7 +1783,7 @@ "standards" ], "support": { - "source": "https://github.com/symfony/http-client-contracts/tree/v3.5.2" + "source": "https://github.com/symfony/http-client-contracts/tree/v3.6.0" }, "funding": [ { @@ -1794,23 +1799,24 @@ "type": "tidelift" } ], - "time": "2024-12-07T08:49:48+00:00" + "time": "2025-04-29T11:18:49+00:00" }, { "name": "symfony/polyfill-mbstring", - "version": "v1.31.0", + "version": "v1.32.0", "source": { "type": "git", "url": "https://github.com/symfony/polyfill-mbstring.git", - "reference": "85181ba99b2345b0ef10ce42ecac37612d9fd341" + "reference": "6d857f4d76bd4b343eac26d6b539585d2bc56493" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/85181ba99b2345b0ef10ce42ecac37612d9fd341", - "reference": "85181ba99b2345b0ef10ce42ecac37612d9fd341", + "url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/6d857f4d76bd4b343eac26d6b539585d2bc56493", + "reference": "6d857f4d76bd4b343eac26d6b539585d2bc56493", "shasum": "" }, "require": { + "ext-iconv": "*", "php": ">=7.2" }, "provide": { @@ -1858,7 +1864,7 @@ "shim" ], "support": { - "source": "https://github.com/symfony/polyfill-mbstring/tree/v1.31.0" + "source": "https://github.com/symfony/polyfill-mbstring/tree/v1.32.0" }, "funding": [ { @@ -1874,11 +1880,11 @@ "type": "tidelift" } ], - "time": "2024-09-09T11:45:10+00:00" + "time": "2024-12-23T08:48:59+00:00" }, { "name": "symfony/polyfill-php82", - "version": "v1.31.0", + "version": "v1.32.0", "source": { "type": "git", "url": "https://github.com/symfony/polyfill-php82.git", @@ -1934,7 +1940,7 @@ "shim" ], "support": { - "source": "https://github.com/symfony/polyfill-php82/tree/v1.31.0" + "source": "https://github.com/symfony/polyfill-php82/tree/v1.32.0" }, "funding": [ { @@ -1954,16 +1960,16 @@ }, { "name": "symfony/service-contracts", - "version": "v3.5.1", + "version": "v3.6.0", "source": { "type": "git", "url": "https://github.com/symfony/service-contracts.git", - "reference": "e53260aabf78fb3d63f8d79d69ece59f80d5eda0" + "reference": "f021b05a130d35510bd6b25fe9053c2a8a15d5d4" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/service-contracts/zipball/e53260aabf78fb3d63f8d79d69ece59f80d5eda0", - "reference": "e53260aabf78fb3d63f8d79d69ece59f80d5eda0", + "url": "https://api.github.com/repos/symfony/service-contracts/zipball/f021b05a130d35510bd6b25fe9053c2a8a15d5d4", + "reference": "f021b05a130d35510bd6b25fe9053c2a8a15d5d4", "shasum": "" }, "require": { @@ -1981,7 +1987,7 @@ "name": "symfony/contracts" }, "branch-alias": { - "dev-main": "3.5-dev" + "dev-main": "3.6-dev" } }, "autoload": { @@ -2017,7 +2023,7 @@ "standards" ], "support": { - "source": "https://github.com/symfony/service-contracts/tree/v3.5.1" + "source": "https://github.com/symfony/service-contracts/tree/v3.6.0" }, "funding": [ { @@ -2033,7 +2039,7 @@ "type": "tidelift" } ], - "time": "2024-09-25T14:20:29+00:00" + "time": "2025-04-25T09:37:31+00:00" }, { "name": "tbachert/spi", @@ -2184,16 +2190,16 @@ }, { "name": "utopia-php/fetch", - "version": "0.4.1", + "version": "0.4.2", "source": { "type": "git", "url": "https://github.com/utopia-php/fetch.git", - "reference": "65095dac14037db0c822fb5e209e5bd3187a0303" + "reference": "83986d1be75a2fae4e684107fe70dd78a8e19b77" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/fetch/zipball/65095dac14037db0c822fb5e209e5bd3187a0303", - "reference": "65095dac14037db0c822fb5e209e5bd3187a0303", + "url": "https://api.github.com/repos/utopia-php/fetch/zipball/83986d1be75a2fae4e684107fe70dd78a8e19b77", + "reference": "83986d1be75a2fae4e684107fe70dd78a8e19b77", "shasum": "" }, "require": { @@ -2217,22 +2223,22 @@ "description": "A simple library that provides an interface for making HTTP Requests.", "support": { "issues": "https://github.com/utopia-php/fetch/issues", - "source": "https://github.com/utopia-php/fetch/tree/0.4.1" + "source": "https://github.com/utopia-php/fetch/tree/0.4.2" }, - "time": "2025-04-14T07:34:27+00:00" + "time": "2025-04-25T13:48:02+00:00" }, { "name": "utopia-php/framework", - "version": "0.33.19", + "version": "0.33.20", "source": { "type": "git", "url": "https://github.com/utopia-php/http.git", - "reference": "64c7b7bb8a8595ffe875fa8d4b7705684dbf46c0" + "reference": "e1c7ab4e0b5b0a9a70256b1e00912e101e76a131" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/http/zipball/64c7b7bb8a8595ffe875fa8d4b7705684dbf46c0", - "reference": "64c7b7bb8a8595ffe875fa8d4b7705684dbf46c0", + "url": "https://api.github.com/repos/utopia-php/http/zipball/e1c7ab4e0b5b0a9a70256b1e00912e101e76a131", + "reference": "e1c7ab4e0b5b0a9a70256b1e00912e101e76a131", "shasum": "" }, "require": { @@ -2264,9 +2270,9 @@ ], "support": { "issues": "https://github.com/utopia-php/http/issues", - "source": "https://github.com/utopia-php/http/tree/0.33.19" + "source": "https://github.com/utopia-php/http/tree/0.33.20" }, - "time": "2025-03-06T11:37:49+00:00" + "time": "2025-05-18T23:51:21+00:00" }, { "name": "utopia-php/pools", @@ -2510,16 +2516,16 @@ }, { "name": "myclabs/deep-copy", - "version": "1.13.0", + "version": "1.13.1", "source": { "type": "git", "url": "https://github.com/myclabs/DeepCopy.git", - "reference": "024473a478be9df5fdaca2c793f2232fe788e414" + "reference": "1720ddd719e16cf0db4eb1c6eca108031636d46c" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/myclabs/DeepCopy/zipball/024473a478be9df5fdaca2c793f2232fe788e414", - "reference": "024473a478be9df5fdaca2c793f2232fe788e414", + "url": "https://api.github.com/repos/myclabs/DeepCopy/zipball/1720ddd719e16cf0db4eb1c6eca108031636d46c", + "reference": "1720ddd719e16cf0db4eb1c6eca108031636d46c", "shasum": "" }, "require": { @@ -2558,7 +2564,7 @@ ], "support": { "issues": "https://github.com/myclabs/DeepCopy/issues", - "source": "https://github.com/myclabs/DeepCopy/tree/1.13.0" + "source": "https://github.com/myclabs/DeepCopy/tree/1.13.1" }, "funding": [ { @@ -2566,20 +2572,20 @@ "type": "tidelift" } ], - "time": "2025-02-12T12:17:51+00:00" + "time": "2025-04-29T12:36:36+00:00" }, { "name": "nikic/php-parser", - "version": "v5.4.0", + "version": "v5.5.0", "source": { "type": "git", "url": "https://github.com/nikic/PHP-Parser.git", - "reference": "447a020a1f875a434d62f2a401f53b82a396e494" + "reference": "ae59794362fe85e051a58ad36b289443f57be7a9" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/447a020a1f875a434d62f2a401f53b82a396e494", - "reference": "447a020a1f875a434d62f2a401f53b82a396e494", + "url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/ae59794362fe85e051a58ad36b289443f57be7a9", + "reference": "ae59794362fe85e051a58ad36b289443f57be7a9", "shasum": "" }, "require": { @@ -2622,9 +2628,9 @@ ], "support": { "issues": "https://github.com/nikic/PHP-Parser/issues", - "source": "https://github.com/nikic/PHP-Parser/tree/v5.4.0" + "source": "https://github.com/nikic/PHP-Parser/tree/v5.5.0" }, - "time": "2024-12-30T11:07:19+00:00" + "time": "2025-05-31T08:24:38+00:00" }, { "name": "phar-io/manifest", @@ -2746,16 +2752,16 @@ }, { "name": "phpstan/phpstan", - "version": "1.12.24", + "version": "1.12.27", "source": { "type": "git", "url": "https://github.com/phpstan/phpstan.git", - "reference": "338b92068f58d9f8035b76aed6cf2b9e5624c025" + "reference": "3a6e423c076ab39dfedc307e2ac627ef579db162" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/338b92068f58d9f8035b76aed6cf2b9e5624c025", - "reference": "338b92068f58d9f8035b76aed6cf2b9e5624c025", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/3a6e423c076ab39dfedc307e2ac627ef579db162", + "reference": "3a6e423c076ab39dfedc307e2ac627ef579db162", "shasum": "" }, "require": { @@ -2800,7 +2806,7 @@ "type": "github" } ], - "time": "2025-04-16T13:01:53+00:00" + "time": "2025-05-21T20:51:45+00:00" }, { "name": "phpunit/php-code-coverage", @@ -3123,16 +3129,16 @@ }, { "name": "phpunit/phpunit", - "version": "9.6.22", + "version": "9.6.23", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/phpunit.git", - "reference": "f80235cb4d3caa59ae09be3adf1ded27521d1a9c" + "reference": "43d2cb18d0675c38bd44982a5d1d88f6d53d8d95" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/f80235cb4d3caa59ae09be3adf1ded27521d1a9c", - "reference": "f80235cb4d3caa59ae09be3adf1ded27521d1a9c", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/43d2cb18d0675c38bd44982a5d1d88f6d53d8d95", + "reference": "43d2cb18d0675c38bd44982a5d1d88f6d53d8d95", "shasum": "" }, "require": { @@ -3143,7 +3149,7 @@ "ext-mbstring": "*", "ext-xml": "*", "ext-xmlwriter": "*", - "myclabs/deep-copy": "^1.12.1", + "myclabs/deep-copy": "^1.13.1", "phar-io/manifest": "^2.0.4", "phar-io/version": "^3.2.1", "php": ">=7.3", @@ -3206,7 +3212,7 @@ "support": { "issues": "https://github.com/sebastianbergmann/phpunit/issues", "security": "https://github.com/sebastianbergmann/phpunit/security/policy", - "source": "https://github.com/sebastianbergmann/phpunit/tree/9.6.22" + "source": "https://github.com/sebastianbergmann/phpunit/tree/9.6.23" }, "funding": [ { @@ -3217,12 +3223,20 @@ "url": "https://github.com/sebastianbergmann", "type": "github" }, + { + "url": "https://liberapay.com/sebastianbergmann", + "type": "liberapay" + }, + { + "url": "https://thanks.dev/u/gh/sebastianbergmann", + "type": "thanks_dev" + }, { "url": "https://tidelift.com/funding/github/packagist/phpunit/phpunit", "type": "tidelift" } ], - "time": "2024-12-05T13:48:26+00:00" + "time": "2025-05-02T06:40:34+00:00" }, { "name": "sebastian/cli-parser", @@ -4345,7 +4359,9 @@ ], "aliases": [], "minimum-stability": "stable", - "stability-flags": {}, + "stability-flags": { + "appwrite-labs/php-amqplib": 20 + }, "prefer-stable": false, "prefer-lowest": false, "platform": { diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 91afd58..a1f65c7 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -5,9 +5,11 @@ use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; use PhpAmqpLib\Connection\AMQPStreamConnection; +use PhpAmqpLib\Connection\AMQPSwooleConnection; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; +use PhpAmqpLib\Wire\IO\SwooleIO; use Utopia\Fetch\Client; use Utopia\Queue\Consumer; use Utopia\Queue\Error\Retryable; @@ -44,7 +46,11 @@ public function __construct( private readonly int $heartbeat = 0, private readonly float $connectTimeout = 3.0, private readonly float $readWriteTimeout = 3.0, + private readonly string $connectionClass = AMQPStreamConnection::class ) { + if (!is_subclass_of($this->connectionClass, AbstractConnection::class)) { + throw new \InvalidArgumentException('Connection class must be a subclass of AbstractConnection'); + } } public function setExchangeArgument(string $key, string $value): void @@ -185,7 +191,7 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int private function withChannel(callable $callback): void { $createChannel = function (): AMQPChannel { - $connection = new AMQPStreamConnection( + $connection = new ($this->connectionClass)( $this->host, $this->port, $this->user, @@ -193,8 +199,9 @@ private function withChannel(callable $callback): void $this->vhost, connection_timeout: $this->connectTimeout, read_write_timeout: $this->readWriteTimeout, - heartbeat: $this->heartbeat, + heartbeat: $this->heartbeat ); + if (is_callable($this->connectionConfigHook)) { call_user_func($this->connectionConfigHook, $connection); } diff --git a/tests/Queue/E2E/Adapter/AMQPTest.php b/tests/Queue/E2E/Adapter/AMQPTest.php index e8557a3..f45cd99 100644 --- a/tests/Queue/E2E/Adapter/AMQPTest.php +++ b/tests/Queue/E2E/Adapter/AMQPTest.php @@ -2,19 +2,116 @@ namespace Tests\E2E\Adapter; +use PhpAmqpLib\Connection\AMQPStreamConnection; +use PhpAmqpLib\Connection\AMQPSwooleConnection; use Utopia\Queue\Broker\AMQP; use Utopia\Queue\Publisher; use Utopia\Queue\Queue; +use function Co\run; + class AMQPTest extends Base { - protected function getPublisher(): Publisher + public function connectionsProvider(): array { - return new AMQP(host: 'amqp', port: 5672, user: 'amqp', password: 'amqp'); + return [ + 'AMQPStreamConnection' => [AMQPStreamConnection::class], + 'AMQPSwooleConnection' => [AMQPSwooleConnection::class], + ]; + } + + /** + * @dataProvider connectionsProvider + */ + protected function getPublisher(string $connectionClass): Publisher + { + if ($connectionClass === AMQPSwooleConnection::class && !extension_loaded('swoole')) { + $this->markTestSkipped('Swoole extension is not available'); + } + + return new AMQP(host: 'amqp', port: 5672, user: 'amqp', password: 'amqp', connectionClass: $connectionClass); } protected function getQueue(): Queue { return new Queue('amqp'); } + + /** + * @dataProvider connectionsProvider + */ + public function testEvents(string $connectionClass): void + { + $publisher = $this->getPublisher($connectionClass); + + foreach ($this->payloads as $payload) { + $this->assertTrue($publisher->enqueue($this->getQueue(), $payload)); + } + + sleep(1); + } + + /** + * @dataProvider connectionsProvider + */ + public function testConcurrency(string $connectionClass): void + { + if (!extension_loaded('swoole')) { + $this->markTestSkipped('Swoole extension is not available'); + } + + run(function () use ($connectionClass) { + $publisher = $this->getPublisher($connectionClass); + go(function () use ($publisher) { + foreach ($this->payloads as $payload) { + $this->assertTrue($publisher->enqueue($this->getQueue(), $payload)); + } + + sleep(1); + }); + }); + } + + /** + * @dataProvider connectionsProvider + * @depends testEvents + */ + public function testRetry(string $connectionClass): void + { + $publisher = $this->getPublisher($connectionClass); + + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 1 + ]); + + $this->assertTrue($published); + + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 2 + ]); + + $this->assertTrue($published); + + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 3 + ]); + + $this->assertTrue($published); + + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 4 + ]); + + $this->assertTrue($published); + + sleep(1); + $publisher->retry($this->getQueue()); + sleep(1); + $publisher->retry($this->getQueue(), 2); + sleep(1); + } } diff --git a/tests/Queue/E2E/Adapter/Base.php b/tests/Queue/E2E/Adapter/Base.php index e507d0d..3acfe8a 100644 --- a/tests/Queue/E2E/Adapter/Base.php +++ b/tests/Queue/E2E/Adapter/Base.php @@ -3,10 +3,6 @@ namespace Tests\E2E\Adapter; use PHPUnit\Framework\TestCase; -use Utopia\Queue\Publisher; -use Utopia\Queue\Queue; - -use function Co\run; abstract class Base extends TestCase { @@ -53,78 +49,4 @@ public function setUp(): void ] ]; } - - /** - * @return Publisher - */ - abstract protected function getPublisher(): Publisher; - - abstract protected function getQueue(): Queue; - - public function testEvents(): void - { - $publisher = $this->getPublisher(); - - foreach ($this->payloads as $payload) { - $this->assertTrue($publisher->enqueue($this->getQueue(), $payload)); - } - - sleep(1); - } - - public function testConcurrency(): void - { - run(function () { - $publisher = $this->getPublisher(); - go(function () use ($publisher) { - foreach ($this->payloads as $payload) { - $this->assertTrue($publisher->enqueue($this->getQueue(), $payload)); - } - - sleep(1); - }); - }); - } - - /** - * @depends testEvents - */ - public function testRetry(): void - { - $publisher = $this->getPublisher(); - - $published = $publisher->enqueue($this->getQueue(), [ - 'type' => 'test_exception', - 'id' => 1 - ]); - - $this->assertTrue($published); - - $published = $publisher->enqueue($this->getQueue(), [ - 'type' => 'test_exception', - 'id' => 2 - ]); - - $this->assertTrue($published); - - $published = $publisher->enqueue($this->getQueue(), [ - 'type' => 'test_exception', - 'id' => 3 - ]); - - $this->assertTrue($published); - - $published = $publisher->enqueue($this->getQueue(), [ - 'type' => 'test_exception', - 'id' => 4 - ]); - - $this->assertTrue($published); - - sleep(1); - $publisher->retry($this->getQueue()); - sleep(1); - $publisher->retry($this->getQueue(), 2); - sleep(1); - } } From c119b29c30c18f0a45eb7deab74c0f6e0e0475b5 Mon Sep 17 00:00:00 2001 From: Christy Jacob Date: Mon, 23 Jun 2025 20:35:36 +0400 Subject: [PATCH 02/11] chore: update docker-compose healthcheck configuration for Redis and RabbitMQ - Added start_period to healthcheck for Redis cluster services to ensure proper startup timing. - Updated RabbitMQ service healthcheck to include start_period for improved reliability. --- docker-compose.yml | 7 ++++++- src/Queue/Broker/AMQP.php | 2 -- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 5db1285..8fafb09 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -77,6 +77,7 @@ services: healthcheck: test: [ "CMD", "redis-cli", "-h", "localhost", "-p", "6379", "ping" ] start_interval: 1s + start_period: 0s redis-cluster-1: image: docker.io/bitnami/redis-cluster:7.4 @@ -87,6 +88,7 @@ services: healthcheck: test: [ "CMD", "redis-cli", "-h", "localhost", "-p", "6379", "ping" ] start_interval: 1s + start_period: 0s redis-cluster-2: image: docker.io/bitnami/redis-cluster:7.4 @@ -97,6 +99,7 @@ services: healthcheck: test: [ "CMD", "redis-cli", "-h", "localhost", "-p", "6379", "ping" ] start_interval: 1s + start_period: 0s redis-cluster-3: image: docker.io/bitnami/redis-cluster:7.4 @@ -107,6 +110,7 @@ services: healthcheck: test: [ "CMD", "redis-cli", "-h", "localhost", "-p", "6379", "ping" ] start_interval: 1s + start_period: 0s amqp: image: rabbitmq:4 @@ -116,4 +120,5 @@ services: RABBITMQ_DEFAULT_VHOST: "/" healthcheck: test: [ "CMD", "rabbitmqctl", "node_health_check" ] - start_interval: 1s \ No newline at end of file + start_interval: 1s + start_period: 0s \ No newline at end of file diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index a1f65c7..f143fa7 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -5,11 +5,9 @@ use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; use PhpAmqpLib\Connection\AMQPStreamConnection; -use PhpAmqpLib\Connection\AMQPSwooleConnection; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; -use PhpAmqpLib\Wire\IO\SwooleIO; use Utopia\Fetch\Client; use Utopia\Queue\Consumer; use Utopia\Queue\Error\Retryable; From ad824a3c7f65b8cf6af5a9ef46393bf4610fcdf0 Mon Sep 17 00:00:00 2001 From: Christy Jacob Date: Mon, 23 Jun 2025 21:42:44 +0400 Subject: [PATCH 03/11] refactor: enhance AMQP tests to utilize connection class and improve structure - Introduced a private property for connection class in AMQPTest to streamline publisher retrieval. - Updated test methods to use the connection class directly, improving clarity and maintainability. - Renamed test methods for better readability and consistency. - Consolidated publisher retrieval logic into the Base class for reuse across tests. --- tests/Queue/E2E/Adapter/AMQPTest.php | 81 +++++++-------------------- tests/Queue/E2E/Adapter/Base.php | 82 ++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 61 deletions(-) diff --git a/tests/Queue/E2E/Adapter/AMQPTest.php b/tests/Queue/E2E/Adapter/AMQPTest.php index f45cd99..6e8108f 100644 --- a/tests/Queue/E2E/Adapter/AMQPTest.php +++ b/tests/Queue/E2E/Adapter/AMQPTest.php @@ -12,6 +12,8 @@ class AMQPTest extends Base { + private string $connectionClass; + public function connectionsProvider(): array { return [ @@ -23,13 +25,13 @@ public function connectionsProvider(): array /** * @dataProvider connectionsProvider */ - protected function getPublisher(string $connectionClass): Publisher + protected function getPublisher(): Publisher { - if ($connectionClass === AMQPSwooleConnection::class && !extension_loaded('swoole')) { + if ($this->connectionClass === AMQPSwooleConnection::class && !extension_loaded('swoole')) { $this->markTestSkipped('Swoole extension is not available'); } - return new AMQP(host: 'amqp', port: 5672, user: 'amqp', password: 'amqp', connectionClass: $connectionClass); + return new AMQP(host: 'amqp', port: 5672, user: 'amqp', password: 'amqp', connectionClass: $this->connectionClass); } protected function getQueue(): Queue @@ -40,78 +42,35 @@ protected function getQueue(): Queue /** * @dataProvider connectionsProvider */ - public function testEvents(string $connectionClass): void + public function testEventsWithConnection(string $connectionClass): void { - $publisher = $this->getPublisher($connectionClass); - - foreach ($this->payloads as $payload) { - $this->assertTrue($publisher->enqueue($this->getQueue(), $payload)); - } - - sleep(1); + $this->connectionClass = $connectionClass; + parent::testEvents(); } /** * @dataProvider connectionsProvider */ - public function testConcurrency(string $connectionClass): void + public function testConcurrencyWithConnection(string $connectionClass): void { - if (!extension_loaded('swoole')) { - $this->markTestSkipped('Swoole extension is not available'); - } + $this->connectionClass = $connectionClass; - run(function () use ($connectionClass) { - $publisher = $this->getPublisher($connectionClass); - go(function () use ($publisher) { - foreach ($this->payloads as $payload) { - $this->assertTrue($publisher->enqueue($this->getQueue(), $payload)); - } + if ($this->connectionClass !== AMQPSwooleConnection::class) { + $this->markTestSkipped('Concurrency test can only be run with Swoole'); + } - sleep(1); - }); - }); + parent::testConcurrency(); } /** * @dataProvider connectionsProvider - * @depends testEvents */ - public function testRetry(string $connectionClass): void + public function testRetryWithConnection(string $connectionClass): void { - $publisher = $this->getPublisher($connectionClass); - - $published = $publisher->enqueue($this->getQueue(), [ - 'type' => 'test_exception', - 'id' => 1 - ]); - - $this->assertTrue($published); - - $published = $publisher->enqueue($this->getQueue(), [ - 'type' => 'test_exception', - 'id' => 2 - ]); - - $this->assertTrue($published); - - $published = $publisher->enqueue($this->getQueue(), [ - 'type' => 'test_exception', - 'id' => 3 - ]); - - $this->assertTrue($published); - - $published = $publisher->enqueue($this->getQueue(), [ - 'type' => 'test_exception', - 'id' => 4 - ]); - - $this->assertTrue($published); - - sleep(1); - $publisher->retry($this->getQueue()); - sleep(1); - $publisher->retry($this->getQueue(), 2); - sleep(1); + $this->connectionClass = $connectionClass; + // The "depends" annotation will not work with a dataprovider. + // We need to manually call the dependency. + parent::testEvents(); + parent::testRetry(); } } diff --git a/tests/Queue/E2E/Adapter/Base.php b/tests/Queue/E2E/Adapter/Base.php index 3acfe8a..1a47d59 100644 --- a/tests/Queue/E2E/Adapter/Base.php +++ b/tests/Queue/E2E/Adapter/Base.php @@ -3,6 +3,10 @@ namespace Tests\E2E\Adapter; use PHPUnit\Framework\TestCase; +use Utopia\Queue\Publisher; +use Utopia\Queue\Queue; + +use function Co\run; abstract class Base extends TestCase { @@ -49,4 +53,82 @@ public function setUp(): void ] ]; } + + /** + * @return Publisher + */ + abstract protected function getPublisher(): Publisher; + + abstract protected function getQueue(): Queue; + + public function testEvents(): void + { + $publisher = $this->getPublisher(); + + foreach ($this->payloads as $payload) { + $this->assertTrue($publisher->enqueue($this->getQueue(), $payload)); + } + + sleep(1); + } + + public function testConcurrency(): void + { + if (!extension_loaded('swoole')) { + $this->markTestSkipped('Swoole extension is not available'); + } + + run(function () { + $publisher = $this->getPublisher(); + go(function () use ($publisher) { + foreach ($this->payloads as $payload) { + $this->assertTrue($publisher->enqueue($this->getQueue(), $payload)); + } + + sleep(1); + }); + }); + } + + /** + * @depends testEvents + */ + public function testRetry(): void + { + $publisher = $this->getPublisher(); + + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 1 + ]); + + $this->assertTrue($published); + + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 2 + ]); + + $this->assertTrue($published); + + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 3 + ]); + + $this->assertTrue($published); + + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 4 + ]); + + $this->assertTrue($published); + + sleep(1); + $publisher->retry($this->getQueue()); + sleep(1); + $publisher->retry($this->getQueue(), 2); + sleep(1); + } } From 57033c071d3de1c7f85ae590857a6ac8856a6f5e Mon Sep 17 00:00:00 2001 From: Christy Jacob Date: Mon, 23 Jun 2025 21:51:57 +0400 Subject: [PATCH 04/11] refactor: simplify AMQP tests by commenting out unused connection class and adding debug output - Commented out the AMQPSwooleConnection in the connection class array to streamline test configuration. - Added a debug output statement in the testEventsWithConnection method for troubleshooting purposes. - Removed the Swoole extension check from the testConcurrency method to avoid unnecessary test skips. --- tests/Queue/E2E/Adapter/AMQPTest.php | 3 ++- tests/Queue/E2E/Adapter/Base.php | 4 ---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/Queue/E2E/Adapter/AMQPTest.php b/tests/Queue/E2E/Adapter/AMQPTest.php index 6e8108f..d5e4fd7 100644 --- a/tests/Queue/E2E/Adapter/AMQPTest.php +++ b/tests/Queue/E2E/Adapter/AMQPTest.php @@ -18,7 +18,7 @@ public function connectionsProvider(): array { return [ 'AMQPStreamConnection' => [AMQPStreamConnection::class], - 'AMQPSwooleConnection' => [AMQPSwooleConnection::class], + // 'AMQPSwooleConnection' => [AMQPSwooleConnection::class], ]; } @@ -44,6 +44,7 @@ protected function getQueue(): Queue */ public function testEventsWithConnection(string $connectionClass): void { + var_dump("Hello"); $this->connectionClass = $connectionClass; parent::testEvents(); } diff --git a/tests/Queue/E2E/Adapter/Base.php b/tests/Queue/E2E/Adapter/Base.php index 1a47d59..e507d0d 100644 --- a/tests/Queue/E2E/Adapter/Base.php +++ b/tests/Queue/E2E/Adapter/Base.php @@ -74,10 +74,6 @@ public function testEvents(): void public function testConcurrency(): void { - if (!extension_loaded('swoole')) { - $this->markTestSkipped('Swoole extension is not available'); - } - run(function () { $publisher = $this->getPublisher(); go(function () use ($publisher) { From 346599d51509a39244a6b14924f8fdbbe16dcefa Mon Sep 17 00:00:00 2001 From: Christy Jacob Date: Mon, 23 Jun 2025 22:38:12 +0400 Subject: [PATCH 05/11] chore: update AMQP dependency and refactor connection handling - Replaced appwrite-labs/php-amqplib with appwrite/php-amqplib version 0.1.0 in composer.json and composer.lock for improved stability. - Changed visibility of properties and methods in the AMQP class from private to protected to enhance extensibility. - Removed unused connection class references and streamlined the AMQPTest class for better clarity and maintainability. --- composer.json | 8 +--- composer.lock | 16 ++----- src/Queue/Broker/AMQP.php | 35 +++++++--------- src/Queue/Broker/AMQPSwoole.php | 63 ++++++++++++++++++++++++++++ tests/Queue/E2E/Adapter/AMQPTest.php | 59 +------------------------- 5 files changed, 84 insertions(+), 97 deletions(-) create mode 100644 src/Queue/Broker/AMQPSwoole.php diff --git a/composer.json b/composer.json index 7e3f969..c367905 100644 --- a/composer.json +++ b/composer.json @@ -25,7 +25,7 @@ }, "require": { "php": ">=8.3", - "appwrite-labs/php-amqplib": "dev-master", + "appwrite/php-amqplib": "0.1.0", "utopia-php/cli": "0.15.*", "utopia-php/framework": "0.33.*", "utopia-php/telemetry": "0.1.*", @@ -44,12 +44,6 @@ "ext-swoole": "Needed to support Swoole.", "workerman/workerman": "Needed to support Workerman." }, - "repositories": [ - { - "type": "vcs", - "url": "https://github.com/appwrite-labs/php-amqplib" - } - ], "config": { "allow-plugins": { "php-http/discovery": true, diff --git a/composer.lock b/composer.lock index c4cdf8d..48d8bdf 100644 --- a/composer.lock +++ b/composer.lock @@ -4,11 +4,11 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "23b5f3bf9991a1527ac430fd3d2a0357", + "content-hash": "434886be4fd421b224a7d4a9164c22fb", "packages": [ { "name": "appwrite-labs/php-amqplib", - "version": "dev-master", + "version": "0.1.0", "source": { "type": "git", "url": "https://github.com/appwrite-labs/php-amqplib.git", @@ -43,7 +43,6 @@ "suggest": { "ext-swoole": "For Swoole coroutine support" }, - "default-branch": true, "type": "library", "extra": { "branch-alias": { @@ -55,12 +54,7 @@ "PhpAmqpLib\\": "PhpAmqpLib/" } }, - "autoload-dev": { - "psr-4": { - "PhpAmqpLib\\Tests\\Functional\\": "tests/Functional", - "PhpAmqpLib\\Tests\\Unit\\": "tests/Unit" - } - }, + "notification-url": "https://packagist.org/downloads/", "license": [ "LGPL-2.1-or-later" ], @@ -4359,9 +4353,7 @@ ], "aliases": [], "minimum-stability": "stable", - "stability-flags": { - "appwrite-labs/php-amqplib": 20 - }, + "stability-flags": {}, "prefer-stable": false, "prefer-lowest": false, "platform": { diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index f143fa7..8808b1d 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -19,7 +19,7 @@ class AMQP implements Publisher, Consumer { - private ?AMQPChannel $channel = null; + protected ?AMQPChannel $channel = null; private array $exchangeArguments = []; private array $queueArguments = []; private array $consumerArguments = []; @@ -27,28 +27,24 @@ class AMQP implements Publisher, Consumer /** * @var callable(AbstractConnection $connection): void */ - private $connectionConfigHook; + protected $connectionConfigHook; /** * @var callable(AMQPChannel $channel): void */ - private $channelConfigHook; + protected $channelConfigHook; public function __construct( - private readonly string $host, - private readonly int $port = 5672, - private readonly int $httpPort = 15672, - private readonly ?string $user = null, - private readonly ?string $password = null, - private readonly string $vhost = '/', - private readonly int $heartbeat = 0, - private readonly float $connectTimeout = 3.0, - private readonly float $readWriteTimeout = 3.0, - private readonly string $connectionClass = AMQPStreamConnection::class + protected readonly string $host, + protected readonly int $port = 5672, + protected readonly int $httpPort = 15672, + protected readonly ?string $user = null, + protected readonly ?string $password = null, + protected readonly string $vhost = '/', + protected readonly int $heartbeat = 0, + protected readonly float $connectTimeout = 3.0, + protected readonly float $readWriteTimeout = 3.0, ) { - if (!is_subclass_of($this->connectionClass, AbstractConnection::class)) { - throw new \InvalidArgumentException('Connection class must be a subclass of AbstractConnection'); - } } public function setExchangeArgument(string $key, string $value): void @@ -186,10 +182,10 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int * @param callable(AMQPChannel $channel): void $callback * @throws \Exception */ - private function withChannel(callable $callback): void + protected function withChannel(callable $callback): void { $createChannel = function (): AMQPChannel { - $connection = new ($this->connectionClass)( + $connection = new AMQPStreamConnection( $this->host, $this->port, $this->user, @@ -197,9 +193,8 @@ private function withChannel(callable $callback): void $this->vhost, connection_timeout: $this->connectTimeout, read_write_timeout: $this->readWriteTimeout, - heartbeat: $this->heartbeat + heartbeat: $this->heartbeat, ); - if (is_callable($this->connectionConfigHook)) { call_user_func($this->connectionConfigHook, $connection); } diff --git a/src/Queue/Broker/AMQPSwoole.php b/src/Queue/Broker/AMQPSwoole.php new file mode 100644 index 0000000..3f0ceac --- /dev/null +++ b/src/Queue/Broker/AMQPSwoole.php @@ -0,0 +1,63 @@ +host, + $this->port, + $this->user, + $this->password, + $this->vhost, + false, // insist + 'AMQPLAIN', // login_method + 'en_US', // locale + $this->connectTimeout, // connection_timeout + $this->readWriteTimeout, // read_write_timeout + null, // context + false, // keepalive + $this->heartbeat, // heartbeat + 0.0 // channel_rpc_timeout + ); + + if (is_callable($this->connectionConfigHook)) { + call_user_func($this->connectionConfigHook, $connection); + } + + $channel = $connection->channel(); + + if (is_callable($this->channelConfigHook)) { + call_user_func($this->channelConfigHook, $channel); + } + + return $channel; + }; + + if (!$this->channel) { + $this->channel = $createChannel(); + } + + try { + $callback($this->channel); + } catch (\Throwable) { + // createChannel() might throw, in that case set the channel to `null` first. + $this->channel = null; + // try creating a new connection once, if this still fails, throw the error + $this->channel = $createChannel(); + $callback($this->channel); + } + } +} \ No newline at end of file diff --git a/tests/Queue/E2E/Adapter/AMQPTest.php b/tests/Queue/E2E/Adapter/AMQPTest.php index d5e4fd7..e8557a3 100644 --- a/tests/Queue/E2E/Adapter/AMQPTest.php +++ b/tests/Queue/E2E/Adapter/AMQPTest.php @@ -2,76 +2,19 @@ namespace Tests\E2E\Adapter; -use PhpAmqpLib\Connection\AMQPStreamConnection; -use PhpAmqpLib\Connection\AMQPSwooleConnection; use Utopia\Queue\Broker\AMQP; use Utopia\Queue\Publisher; use Utopia\Queue\Queue; -use function Co\run; - class AMQPTest extends Base { - private string $connectionClass; - - public function connectionsProvider(): array - { - return [ - 'AMQPStreamConnection' => [AMQPStreamConnection::class], - // 'AMQPSwooleConnection' => [AMQPSwooleConnection::class], - ]; - } - - /** - * @dataProvider connectionsProvider - */ protected function getPublisher(): Publisher { - if ($this->connectionClass === AMQPSwooleConnection::class && !extension_loaded('swoole')) { - $this->markTestSkipped('Swoole extension is not available'); - } - - return new AMQP(host: 'amqp', port: 5672, user: 'amqp', password: 'amqp', connectionClass: $this->connectionClass); + return new AMQP(host: 'amqp', port: 5672, user: 'amqp', password: 'amqp'); } protected function getQueue(): Queue { return new Queue('amqp'); } - - /** - * @dataProvider connectionsProvider - */ - public function testEventsWithConnection(string $connectionClass): void - { - var_dump("Hello"); - $this->connectionClass = $connectionClass; - parent::testEvents(); - } - - /** - * @dataProvider connectionsProvider - */ - public function testConcurrencyWithConnection(string $connectionClass): void - { - $this->connectionClass = $connectionClass; - - if ($this->connectionClass !== AMQPSwooleConnection::class) { - $this->markTestSkipped('Concurrency test can only be run with Swoole'); - } - - parent::testConcurrency(); - } - - /** - * @dataProvider connectionsProvider - */ - public function testRetryWithConnection(string $connectionClass): void - { - $this->connectionClass = $connectionClass; - // The "depends" annotation will not work with a dataprovider. - // We need to manually call the dependency. - parent::testEvents(); - parent::testRetry(); - } } From d46c4f2ecf1999da6a2027044d2fcd8a7252f67e Mon Sep 17 00:00:00 2001 From: Christy Jacob Date: Mon, 23 Jun 2025 22:38:52 +0400 Subject: [PATCH 06/11] refactor: clean up whitespace in AMQPSwoole class - Removed unnecessary whitespace in the AMQPSwoole class to improve code readability. - Ensured consistent formatting by adding a newline at the end of the file. --- src/Queue/Broker/AMQPSwoole.php | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Queue/Broker/AMQPSwoole.php b/src/Queue/Broker/AMQPSwoole.php index 3f0ceac..0803e57 100644 --- a/src/Queue/Broker/AMQPSwoole.php +++ b/src/Queue/Broker/AMQPSwoole.php @@ -9,7 +9,7 @@ class AMQPSwoole extends AMQP { /** * Override the withChannel method to use AMQPSwooleConnection instead of AMQPStreamConnection - * + * * @param callable(AMQPChannel $channel): void $callback * @throws \Exception */ @@ -32,17 +32,17 @@ protected function withChannel(callable $callback): void $this->heartbeat, // heartbeat 0.0 // channel_rpc_timeout ); - + if (is_callable($this->connectionConfigHook)) { call_user_func($this->connectionConfigHook, $connection); } - + $channel = $connection->channel(); - + if (is_callable($this->channelConfigHook)) { call_user_func($this->channelConfigHook, $channel); } - + return $channel; }; @@ -60,4 +60,4 @@ protected function withChannel(callable $callback): void $callback($this->channel); } } -} \ No newline at end of file +} From 12808fdf46fea7c6349bb48f49b9cc12ee22fb43 Mon Sep 17 00:00:00 2001 From: Christy Jacob Date: Mon, 23 Jun 2025 23:29:02 +0400 Subject: [PATCH 07/11] chore: update dependencies and enhance docker-compose configuration - Replaced appwrite/php-amqplib with appwrite-labs/php-amqplib in composer.json for better compatibility. - Added amqp-swoole service to docker-compose.yml for improved testing capabilities. - Updated phpunit.xml to enable process isolation and stop on failure for more robust test execution. --- composer.json | 2 +- docker-compose.yml | 13 +++ docs/AMQPSwoole.md | 114 +++++++++++++++++++++ phpunit.xml | 4 +- tests/Queue/E2E/Adapter/AMQPSwooleTest.php | 108 +++++++++++++++++++ tests/Queue/servers/AMQPSwoole/Dockerfile | 3 + tests/Queue/servers/AMQPSwoole/worker.php | 32 ++++++ 7 files changed, 273 insertions(+), 3 deletions(-) create mode 100644 docs/AMQPSwoole.md create mode 100644 tests/Queue/E2E/Adapter/AMQPSwooleTest.php create mode 100644 tests/Queue/servers/AMQPSwoole/Dockerfile create mode 100644 tests/Queue/servers/AMQPSwoole/worker.php diff --git a/composer.json b/composer.json index c367905..5379dce 100644 --- a/composer.json +++ b/composer.json @@ -25,7 +25,7 @@ }, "require": { "php": ">=8.3", - "appwrite/php-amqplib": "0.1.0", + "appwrite-labs/php-amqplib": "0.1.0", "utopia-php/cli": "0.15.*", "utopia-php/framework": "0.33.*", "utopia-php/telemetry": "0.1.*", diff --git a/docker-compose.yml b/docker-compose.yml index 8fafb09..3caf1c4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,9 +5,11 @@ services: volumes: - ./src:/usr/local/src/src - ./tests:/usr/local/src/tests + - ./phpunit.xml:/usr/local/src/phpunit.xml depends_on: - swoole - swoole-amqp + - amqp-swoole - swoole-redis-cluster - workerman @@ -43,6 +45,17 @@ services: amqp: condition: service_healthy + amqp-swoole: + container_name: amqp-swoole + build: ./tests/Queue/servers/AMQPSwoole/. + command: php /usr/src/code/tests/Queue/servers/AMQPSwoole/worker.php + volumes: + - ./src:/usr/local/src/src + - ./tests:/usr/local/src/tests + depends_on: + amqp: + condition: service_healthy + workerman: container_name: workerman build: ./tests/Queue/servers/Workerman/. diff --git a/docs/AMQPSwoole.md b/docs/AMQPSwoole.md new file mode 100644 index 0000000..2a67790 --- /dev/null +++ b/docs/AMQPSwoole.md @@ -0,0 +1,114 @@ +# AMQPSwoole Broker + +## Overview + +The `AMQPSwoole` class is a specialized AMQP broker implementation designed for use in Swoole environments. It extends the base `AMQP` class and resolves compatibility issues that occur when using the standard AMQP broker in Swoole coroutines. + +## Why AMQPSwoole is Needed + +When using the standard `AMQP` broker in Swoole environments, you may encounter errors like: + +``` +Fatal error: Uncaught Swoole\Error: API must be called in the coroutine +``` + +This happens because: +- The standard AMQP broker uses `AMQPStreamConnection` with `StreamIO` +- `StreamIO` calls `stream_select()` which is not allowed outside Swoole coroutines +- This causes fatal errors during connection cleanup and heartbeat operations + +## Solution + +The `AMQPSwoole` class solves this by: +- Using `AMQPSwooleConnection` instead of `AMQPStreamConnection` +- Leveraging `SwooleIO` which is designed for Swoole environments +- Properly handling coroutine-based I/O operations + +## Usage + +### Basic Usage + +```php +use Utopia\Queue\Broker\AMQPSwoole; +use Utopia\Queue\Queue; + +// Create broker instance +$broker = new AMQPSwoole( + host: 'localhost', + port: 5672, + user: 'guest', + password: 'guest' +); + +// Create queue +$queue = new Queue('my-queue'); + +// Enqueue a message +$broker->enqueue($queue, ['message' => 'Hello World']); +``` + +### Consumer Usage + +```php +use Utopia\Queue\Consumer; +use Utopia\Queue\Message; + +$broker->consume( + $queue, + function (Message $message) { + // Process the message + $payload = $message->getPayload(); + echo "Processing: " . json_encode($payload) . PHP_EOL; + + // Return result (optional) + return new \Utopia\Queue\Result\Commit(); + }, + function (Message $message) { + echo "Message processed successfully" . PHP_EOL; + }, + function (?Message $message, \Throwable $error) { + echo "Error processing message: " . $error->getMessage() . PHP_EOL; + } +); +``` + +## Configuration + +The `AMQPSwoole` class accepts the same configuration parameters as the base `AMQP` class: + +- `host`: RabbitMQ server hostname +- `port`: RabbitMQ server port (default: 5672) +- `httpPort`: RabbitMQ management HTTP port (default: 15672) +- `user`: Username for authentication +- `password`: Password for authentication +- `vhost`: Virtual host (default: '/') +- `heartbeat`: Heartbeat interval in seconds (default: 0) +- `connectTimeout`: Connection timeout in seconds (default: 3.0) +- `readWriteTimeout`: Read/write timeout in seconds (default: 3.0) + +## Testing + +The AMQPSwoole broker includes comprehensive tests that verify: +- Basic message enqueueing and processing +- Concurrency handling in Swoole environments +- Error handling and retry mechanisms +- Queue size reporting + +Run tests with: +```bash +composer test +``` + +## Migration from AMQP to AMQPSwoole + +If you're currently using the `AMQP` broker in a Swoole environment and experiencing errors, migration is simple: + +```php +// Before +$broker = new \Utopia\Queue\Broker\AMQP($host, $port, $httpPort, $user, $password); + +// After +$broker = new \Utopia\Queue\Broker\AMQPSwoole($host, $port, $httpPort, $user, $password); +``` + +All other APIs remain exactly the same. \ No newline at end of file diff --git a/phpunit.xml b/phpunit.xml index 1b8f40d..61f6f70 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -5,8 +5,8 @@ convertErrorsToExceptions="true" convertNoticesToExceptions="true" convertWarningsToExceptions="true" - processIsolation="false" - stopOnFailure="false" + processIsolation="true" + stopOnFailure="true" > diff --git a/tests/Queue/E2E/Adapter/AMQPSwooleTest.php b/tests/Queue/E2E/Adapter/AMQPSwooleTest.php new file mode 100644 index 0000000..9893b7a --- /dev/null +++ b/tests/Queue/E2E/Adapter/AMQPSwooleTest.php @@ -0,0 +1,108 @@ +getPublisher(); + + foreach ($this->payloads as $payload) { + $this->assertTrue($publisher->enqueue($this->getQueue(), $payload)); + } + + // Allow some time for async processing (if any) + sleep(1); + + /** @var \Utopia\Queue\Broker\AMQPSwoole $publisher */ + $publisher->close(); + }); + }); + } + + public function testConcurrency(): void + { + run(function () { + go(function () { + $publisher = $this->getPublisher(); + + foreach ($this->payloads as $payload) { + $this->assertTrue($publisher->enqueue($this->getQueue(), $payload)); + } + + sleep(1); + + /** @var \Utopia\Queue\Broker\AMQPSwoole $publisher */ + $publisher->close(); + }); + }); + } + + /** + * Override testRetry to run within Swoole coroutines + * @depends testEvents + */ + public function testRetry(): void + { + run(function () { + go(function () { + $publisher = $this->getPublisher(); + + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 1 + ]); + + $this->assertTrue($published); + + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 2 + ]); + + $this->assertTrue($published); + + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 3 + ]); + + $this->assertTrue($published); + + $published = $publisher->enqueue($this->getQueue(), [ + 'type' => 'test_exception', + 'id' => 4 + ]); + + $this->assertTrue($published); + + sleep(1); + + /** @var \Utopia\Queue\Broker\AMQPSwoole $publisher */ + $publisher->close(); + }); + }); + } +} \ No newline at end of file diff --git a/tests/Queue/servers/AMQPSwoole/Dockerfile b/tests/Queue/servers/AMQPSwoole/Dockerfile new file mode 100644 index 0000000..ef72435 --- /dev/null +++ b/tests/Queue/servers/AMQPSwoole/Dockerfile @@ -0,0 +1,3 @@ +FROM phpswoole/swoole:php8.3-alpine + +RUN apk add autoconf build-base \ No newline at end of file diff --git a/tests/Queue/servers/AMQPSwoole/worker.php b/tests/Queue/servers/AMQPSwoole/worker.php new file mode 100644 index 0000000..fe17173 --- /dev/null +++ b/tests/Queue/servers/AMQPSwoole/worker.php @@ -0,0 +1,32 @@ +job() + ->inject('message') + ->action(function (Message $message) { + handleRequest($message); + }); + +$server + ->error() + ->inject('error') + ->action(function ($th) { + echo $th->getMessage() . PHP_EOL; + }); + +$server + ->workerStart() + ->action(function () { + echo "Worker Started" . PHP_EOL; + }); + +$server->start(); \ No newline at end of file From 7343eeba980d5de848ce176dacffaf13fa8e32c6 Mon Sep 17 00:00:00 2001 From: Christy Jacob Date: Mon, 23 Jun 2025 23:35:39 +0400 Subject: [PATCH 08/11] refactor: streamline AMQPSwoole tests and improve readability - Simplified test methods by removing redundant go function declarations and utilizing the publisher directly. - Added retry logic in the testRetry method for enhanced testing of message retries. - Ensured consistent formatting by adding a newline at the end of the worker.php file. --- tests/Queue/E2E/Adapter/AMQPSwooleTest.php | 31 ++++++++++++---------- tests/Queue/servers/AMQPSwoole/worker.php | 2 +- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/tests/Queue/E2E/Adapter/AMQPSwooleTest.php b/tests/Queue/E2E/Adapter/AMQPSwooleTest.php index 9893b7a..7c2f539 100644 --- a/tests/Queue/E2E/Adapter/AMQPSwooleTest.php +++ b/tests/Queue/E2E/Adapter/AMQPSwooleTest.php @@ -26,34 +26,35 @@ protected function getQueue(): Queue public function testEvents(): void { run(function () { - go(function () { - $publisher = $this->getPublisher(); - + $publisher = $this->getPublisher(); + go(function () use ($publisher) { foreach ($this->payloads as $payload) { $this->assertTrue($publisher->enqueue($this->getQueue(), $payload)); } - // Allow some time for async processing (if any) sleep(1); - /** @var \Utopia\Queue\Broker\AMQPSwoole $publisher */ $publisher->close(); }); }); } + // public function tearDown(): void + // { + // $publisher = $this->getPublisher(); + // /** @var \Utopia\Queue\Broker\AMQPSwoole $publisher */ + // } + public function testConcurrency(): void { run(function () { - go(function () { - $publisher = $this->getPublisher(); - + $publisher = $this->getPublisher(); + go(function () use ($publisher) { foreach ($this->payloads as $payload) { $this->assertTrue($publisher->enqueue($this->getQueue(), $payload)); } sleep(1); - /** @var \Utopia\Queue\Broker\AMQPSwoole $publisher */ $publisher->close(); }); @@ -67,9 +68,8 @@ public function testConcurrency(): void public function testRetry(): void { run(function () { - go(function () { - $publisher = $this->getPublisher(); - + $publisher = $this->getPublisher(); + go(function () use ($publisher) { $published = $publisher->enqueue($this->getQueue(), [ 'type' => 'test_exception', 'id' => 1 @@ -99,10 +99,13 @@ public function testRetry(): void $this->assertTrue($published); sleep(1); - + $publisher->retry($this->getQueue()); + sleep(1); + $publisher->retry($this->getQueue(), 2); + sleep(1); /** @var \Utopia\Queue\Broker\AMQPSwoole $publisher */ $publisher->close(); }); }); } -} \ No newline at end of file +} diff --git a/tests/Queue/servers/AMQPSwoole/worker.php b/tests/Queue/servers/AMQPSwoole/worker.php index fe17173..107b4db 100644 --- a/tests/Queue/servers/AMQPSwoole/worker.php +++ b/tests/Queue/servers/AMQPSwoole/worker.php @@ -29,4 +29,4 @@ echo "Worker Started" . PHP_EOL; }); -$server->start(); \ No newline at end of file +$server->start(); From baa8c074ce1d73769cab61d2a1c2c4a9e4f97896 Mon Sep 17 00:00:00 2001 From: Christy Jacob Date: Mon, 23 Jun 2025 23:50:06 +0400 Subject: [PATCH 09/11] refactor: update AMQP service configuration and remove AMQPSwoole documentation - Changed the build path for the amqp-swoole service in docker-compose.yml to reflect the new directory structure. - Deleted the AMQPSwoole.md documentation file as it is no longer needed. - Added exchange declaration in the AMQP class to ensure proper message routing. --- docker-compose.yml | 2 +- docs/AMQPSwoole.md | 114 ---------------------- src/Queue/Broker/AMQP.php | 7 ++ tests/Queue/servers/AMQPSwoole/Dockerfile | 3 - 4 files changed, 8 insertions(+), 118 deletions(-) delete mode 100644 docs/AMQPSwoole.md delete mode 100644 tests/Queue/servers/AMQPSwoole/Dockerfile diff --git a/docker-compose.yml b/docker-compose.yml index 3caf1c4..20cb2c0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -47,7 +47,7 @@ services: amqp-swoole: container_name: amqp-swoole - build: ./tests/Queue/servers/AMQPSwoole/. + build: ./tests/Queue/servers/AMQP/. command: php /usr/src/code/tests/Queue/servers/AMQPSwoole/worker.php volumes: - ./src:/usr/local/src/src diff --git a/docs/AMQPSwoole.md b/docs/AMQPSwoole.md deleted file mode 100644 index 2a67790..0000000 --- a/docs/AMQPSwoole.md +++ /dev/null @@ -1,114 +0,0 @@ -# AMQPSwoole Broker - -## Overview - -The `AMQPSwoole` class is a specialized AMQP broker implementation designed for use in Swoole environments. It extends the base `AMQP` class and resolves compatibility issues that occur when using the standard AMQP broker in Swoole coroutines. - -## Why AMQPSwoole is Needed - -When using the standard `AMQP` broker in Swoole environments, you may encounter errors like: - -``` -Fatal error: Uncaught Swoole\Error: API must be called in the coroutine -``` - -This happens because: -- The standard AMQP broker uses `AMQPStreamConnection` with `StreamIO` -- `StreamIO` calls `stream_select()` which is not allowed outside Swoole coroutines -- This causes fatal errors during connection cleanup and heartbeat operations - -## Solution - -The `AMQPSwoole` class solves this by: -- Using `AMQPSwooleConnection` instead of `AMQPStreamConnection` -- Leveraging `SwooleIO` which is designed for Swoole environments -- Properly handling coroutine-based I/O operations - -## Usage - -### Basic Usage - -```php -use Utopia\Queue\Broker\AMQPSwoole; -use Utopia\Queue\Queue; - -// Create broker instance -$broker = new AMQPSwoole( - host: 'localhost', - port: 5672, - user: 'guest', - password: 'guest' -); - -// Create queue -$queue = new Queue('my-queue'); - -// Enqueue a message -$broker->enqueue($queue, ['message' => 'Hello World']); -``` - -### Consumer Usage - -```php -use Utopia\Queue\Consumer; -use Utopia\Queue\Message; - -$broker->consume( - $queue, - function (Message $message) { - // Process the message - $payload = $message->getPayload(); - echo "Processing: " . json_encode($payload) . PHP_EOL; - - // Return result (optional) - return new \Utopia\Queue\Result\Commit(); - }, - function (Message $message) { - echo "Message processed successfully" . PHP_EOL; - }, - function (?Message $message, \Throwable $error) { - echo "Error processing message: " . $error->getMessage() . PHP_EOL; - } -); -``` - -## Configuration - -The `AMQPSwoole` class accepts the same configuration parameters as the base `AMQP` class: - -- `host`: RabbitMQ server hostname -- `port`: RabbitMQ server port (default: 5672) -- `httpPort`: RabbitMQ management HTTP port (default: 15672) -- `user`: Username for authentication -- `password`: Password for authentication -- `vhost`: Virtual host (default: '/') -- `heartbeat`: Heartbeat interval in seconds (default: 0) -- `connectTimeout`: Connection timeout in seconds (default: 3.0) -- `readWriteTimeout`: Read/write timeout in seconds (default: 3.0) - -## Testing - -The AMQPSwoole broker includes comprehensive tests that verify: -- Basic message enqueueing and processing -- Concurrency handling in Swoole environments -- Error handling and retry mechanisms -- Queue size reporting - -Run tests with: -```bash -composer test -``` - -## Migration from AMQP to AMQPSwoole - -If you're currently using the `AMQP` broker in a Swoole environment and experiencing errors, migration is simple: - -```php -// Before -$broker = new \Utopia\Queue\Broker\AMQP($host, $port, $httpPort, $user, $password); - -// After -$broker = new \Utopia\Queue\Broker\AMQPSwoole($host, $port, $httpPort, $user, $password); -``` - -All other APIs remain exactly the same. \ No newline at end of file diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 8808b1d..b6fc0c3 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -145,6 +145,13 @@ public function enqueue(Queue $queue, array $payload): bool ]; $message = new AMQPMessage(json_encode($payload), ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $this->withChannel(function (AMQPChannel $channel) use ($message, $queue) { + $channel->exchange_declare( + $queue->namespace, + AMQPExchangeType::TOPIC, + durable: true, + auto_delete: false, + arguments: new AMQPTable($this->exchangeArguments) + ); $channel->basic_publish($message, $queue->namespace, routing_key: $queue->name); }); return true; diff --git a/tests/Queue/servers/AMQPSwoole/Dockerfile b/tests/Queue/servers/AMQPSwoole/Dockerfile deleted file mode 100644 index ef72435..0000000 --- a/tests/Queue/servers/AMQPSwoole/Dockerfile +++ /dev/null @@ -1,3 +0,0 @@ -FROM phpswoole/swoole:php8.3-alpine - -RUN apk add autoconf build-base \ No newline at end of file From 080639c0cd05abf5d897c1486825cf74c1ad1961 Mon Sep 17 00:00:00 2001 From: Christy Jacob Date: Tue, 24 Jun 2025 14:55:47 +0400 Subject: [PATCH 10/11] refactor: update PHPUnit configuration and clean up docker-compose - Removed phpunit.xml from docker-compose volumes to streamline configuration. - Disabled process isolation and stop on failure in phpunit.xml for improved test execution. - Commented out unused tearDown method in AMQPSwooleTest to enhance code clarity. --- docker-compose.yml | 1 - phpunit.xml | 4 ++-- tests/Queue/E2E/Adapter/AMQPSwooleTest.php | 6 ------ 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 20cb2c0..76740b6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,7 +5,6 @@ services: volumes: - ./src:/usr/local/src/src - ./tests:/usr/local/src/tests - - ./phpunit.xml:/usr/local/src/phpunit.xml depends_on: - swoole - swoole-amqp diff --git a/phpunit.xml b/phpunit.xml index 61f6f70..1b8f40d 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -5,8 +5,8 @@ convertErrorsToExceptions="true" convertNoticesToExceptions="true" convertWarningsToExceptions="true" - processIsolation="true" - stopOnFailure="true" + processIsolation="false" + stopOnFailure="false" > diff --git a/tests/Queue/E2E/Adapter/AMQPSwooleTest.php b/tests/Queue/E2E/Adapter/AMQPSwooleTest.php index 7c2f539..54d1752 100644 --- a/tests/Queue/E2E/Adapter/AMQPSwooleTest.php +++ b/tests/Queue/E2E/Adapter/AMQPSwooleTest.php @@ -39,12 +39,6 @@ public function testEvents(): void }); } - // public function tearDown(): void - // { - // $publisher = $this->getPublisher(); - // /** @var \Utopia\Queue\Broker\AMQPSwoole $publisher */ - // } - public function testConcurrency(): void { run(function () { From 91579cd1f6b09c14e99faa53de3c52a54ce52282 Mon Sep 17 00:00:00 2001 From: Christy Jacob Date: Tue, 24 Jun 2025 22:48:20 +0400 Subject: [PATCH 11/11] chore: update appwrite-labs/php-amqplib dependency version - Changed the version constraint for appwrite-labs/php-amqplib in composer.json to allow for minor updates. - Updated composer.lock to reflect the new version 0.1.1 of appwrite-labs/php-amqplib. - Removed unnecessary exchange declaration code in the AMQP class to streamline message publishing. --- composer.json | 2 +- composer.lock | 14 +++++++------- src/Queue/Broker/AMQP.php | 9 +-------- 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/composer.json b/composer.json index 5379dce..076dad8 100644 --- a/composer.json +++ b/composer.json @@ -25,7 +25,7 @@ }, "require": { "php": ">=8.3", - "appwrite-labs/php-amqplib": "0.1.0", + "appwrite-labs/php-amqplib": "^0.1", "utopia-php/cli": "0.15.*", "utopia-php/framework": "0.33.*", "utopia-php/telemetry": "0.1.*", diff --git a/composer.lock b/composer.lock index 48d8bdf..b0efdc6 100644 --- a/composer.lock +++ b/composer.lock @@ -4,20 +4,20 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "434886be4fd421b224a7d4a9164c22fb", + "content-hash": "9de2edbb13039237d2a64acf9578bc19", "packages": [ { "name": "appwrite-labs/php-amqplib", - "version": "0.1.0", + "version": "0.1.1", "source": { "type": "git", "url": "https://github.com/appwrite-labs/php-amqplib.git", - "reference": "30d709df7510e784d16ad7a7f817d26fe79615a0" + "reference": "bd380cbd63c8c0f063a3893b7a0b889d40876861" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/appwrite-labs/php-amqplib/zipball/30d709df7510e784d16ad7a7f817d26fe79615a0", - "reference": "30d709df7510e784d16ad7a7f817d26fe79615a0", + "url": "https://api.github.com/repos/appwrite-labs/php-amqplib/zipball/bd380cbd63c8c0f063a3893b7a0b889d40876861", + "reference": "bd380cbd63c8c0f063a3893b7a0b889d40876861", "shasum": "" }, "require": { @@ -95,9 +95,9 @@ "swoole" ], "support": { - "source": "https://github.com/appwrite-labs/php-amqplib/tree/0.1.0" + "source": "https://github.com/appwrite-labs/php-amqplib/tree/0.1.1" }, - "time": "2025-06-13T21:45:04+00:00" + "time": "2025-06-24T18:12:57+00:00" }, { "name": "brick/math", diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index b6fc0c3..1811302 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -43,7 +43,7 @@ public function __construct( protected readonly string $vhost = '/', protected readonly int $heartbeat = 0, protected readonly float $connectTimeout = 3.0, - protected readonly float $readWriteTimeout = 3.0, + protected readonly float $readWriteTimeout = 3.0 ) { } @@ -145,13 +145,6 @@ public function enqueue(Queue $queue, array $payload): bool ]; $message = new AMQPMessage(json_encode($payload), ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $this->withChannel(function (AMQPChannel $channel) use ($message, $queue) { - $channel->exchange_declare( - $queue->namespace, - AMQPExchangeType::TOPIC, - durable: true, - auto_delete: false, - arguments: new AMQPTable($this->exchangeArguments) - ); $channel->basic_publish($message, $queue->namespace, routing_key: $queue->name); }); return true;