diff --git a/CHANGELOG.md b/CHANGELOG.md index f8b06609..2ff3face 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ ### Testcontainers - Add constructor/builder parameters to supply the initial Lua script as a string or as a file path, and optional additional script paths copied into the container data directory (`Tarantool2Container`, `CartridgeClusterContainer`, `VshardClusterContainer`); simplify bundled `server.lua` accordingly. +- Upgrade TQE to v3.5.0. ### Documentation @@ -65,7 +66,7 @@ - Fix a memory leak due to the use of a local thread pool ### Spring-data -- Add `tarantool-spring-data-35` module with support for Spring Boot 3.5.x and Spring Data 3.5 +- Add `tarantool-spring-data-35` module with support for Spring Boot 3.5.x and Spring Data 3.5 ### Client - Add `TupleMapper` utility class for easy tuple-to-POJO mapping using field format diff --git a/testcontainers/src/main/java/org/testcontainers/containers/tqe/GrpcContainer.java b/testcontainers/src/main/java/org/testcontainers/containers/tqe/GrpcContainer.java index 2f6ac768..74883e51 100644 --- a/testcontainers/src/main/java/org/testcontainers/containers/tqe/GrpcContainer.java +++ b/testcontainers/src/main/java/org/testcontainers/containers/tqe/GrpcContainer.java @@ -50,7 +50,7 @@ public interface GrpcContainer> enum GrpcRole { CONSUMER("consumer"), - PUBLISHER("publisher"); + PRODUCER("producer"); private final String role; diff --git a/testcontainers/src/main/java/org/testcontainers/containers/tqe/GrpcContainerImpl.java b/testcontainers/src/main/java/org/testcontainers/containers/tqe/GrpcContainerImpl.java index 8bc3594b..aba179a4 100644 --- a/testcontainers/src/main/java/org/testcontainers/containers/tqe/GrpcContainerImpl.java +++ b/testcontainers/src/main/java/org/testcontainers/containers/tqe/GrpcContainerImpl.java @@ -26,7 +26,7 @@ import org.testcontainers.containers.tqe.configuration.grpc.ConsumerConfig; import org.testcontainers.containers.tqe.configuration.grpc.GrpcConfiguration; import org.testcontainers.containers.tqe.configuration.grpc.GrpcListen; -import org.testcontainers.containers.tqe.configuration.grpc.PublisherConfig; +import org.testcontainers.containers.tqe.configuration.grpc.ProducerConfig; import org.testcontainers.containers.utils.Utils; import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; import org.testcontainers.utility.DockerImageName; @@ -223,11 +223,10 @@ private static Set resolveGrpcPorts(GrpcConfiguration config, Path conf * (required). */ private static Set resolveRoles(GrpcConfiguration config, Path configPath) { - final Optional isPublisher = - config.getPublisher().flatMap(PublisherConfig::getEnabled); + final Optional isPublisher = config.getProducer().flatMap(ProducerConfig::getEnabled); final Set roles = new LinkedHashSet<>(); if (isPublisher.isPresent() && isPublisher.get()) { - roles.add(GrpcRole.PUBLISHER); + roles.add(GrpcRole.PRODUCER); LOGGER.trace("Publisher role is enabled for: {}", configPath); } diff --git a/testcontainers/src/main/java/org/testcontainers/containers/tqe/TQEClusterImpl.java b/testcontainers/src/main/java/org/testcontainers/containers/tqe/TQEClusterImpl.java index f7607ad6..bcc85aa9 100644 --- a/testcontainers/src/main/java/org/testcontainers/containers/tqe/TQEClusterImpl.java +++ b/testcontainers/src/main/java/org/testcontainers/containers/tqe/TQEClusterImpl.java @@ -41,18 +41,8 @@ public synchronized void start() { this.queue().size(), this.grpc().size()); - startParallel(this.configurator.queue(), this.configurator); - startParallel(this.configurator.grpc(), this.configurator); - if (this.configurator.isConfigured()) { - LOGGER.warn( - "TQE cluster [name = {}, queue = {}, grpc = {}] already configured", - this.configurator.clusterName(), - this.configurator.queue().size(), - this.configurator.grpc().size()); - return; - } - - this.configurator.configure(); + startTarantoolCluster(); + startGrpcEndpoints(); } @Override @@ -95,6 +85,24 @@ private static void startParallel( } } + private void startTarantoolCluster() { + startParallel(this.configurator.queue(), this.configurator); + if (this.configurator.isConfigured()) { + LOGGER.warn( + "TQE cluster [name = {}, queue = {}, grpc = {}] already configured", + this.configurator.clusterName(), + this.configurator.queue().size(), + this.configurator.grpc().size()); + return; + } + + this.configurator.configure(); + } + + private void startGrpcEndpoints() { + startParallel(this.configurator.grpc(), this.configurator); + } + @Override public String clusterName() { return this.configurator.clusterName(); diff --git a/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/FileTQEConfigurator.java b/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/FileTQEConfigurator.java index 66f69f47..49defe6e 100644 --- a/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/FileTQEConfigurator.java +++ b/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/FileTQEConfigurator.java @@ -67,7 +67,7 @@ public class FileTQEConfigurator implements TQEConfigurator { private static final String CONFIGURATOR_ERROR_MSG = "An error occurred when configuring the TQE cluster. See logs for details."; - private static final String TQE_ROUTER_ROLE = "app.roles.api"; + private static final String TQE_ROUTER_ROLE = "roles.tqe-router"; /* /********************************************************** diff --git a/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/grpc/GrpcConfiguration.java b/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/grpc/GrpcConfiguration.java index 22c00797..0594fd3d 100644 --- a/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/grpc/GrpcConfiguration.java +++ b/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/grpc/GrpcConfiguration.java @@ -28,7 +28,7 @@ EtcdAddr string `mapstructure:"etcd_addr"` Daemon bool `mapstructure:"daemon"` - Publisher PublisherConfig `mapstructure:"publisher"` + Publisher ProducerConfig `mapstructure:"producer"` Consumer ConsumerConfig `mapstructure:"consumer"` MockServer MockServerConfig `mapstructure:"mock_server"` @@ -72,8 +72,8 @@ public class GrpcConfiguration { @JsonProperty("daemon") private final Boolean daemon; - @JsonProperty("publisher") - private final PublisherConfig publisher; + @JsonProperty("producer") + private final ProducerConfig producer; @JsonProperty("consumer") private final ConsumerConfig consumer; @@ -100,7 +100,7 @@ public GrpcConfiguration( @JsonProperty("config_path") String configPath, @JsonProperty("etcd_addr") String etcdAddr, @JsonProperty("daemon") Boolean daemon, - @JsonProperty("publisher") PublisherConfig publisher, + @JsonProperty("producer") ProducerConfig producer, @JsonProperty("consumer") ConsumerConfig consumer, @JsonProperty("mock_server") MockServerConfig mockServer, @JsonProperty("tracing") TracingConfig tracing, @@ -116,7 +116,7 @@ public GrpcConfiguration( this.configPath = configPath; this.etcdAddr = etcdAddr; this.daemon = daemon; - this.publisher = publisher; + this.producer = producer; this.consumer = consumer; this.mockServer = mockServer; this.tracing = tracing; @@ -167,8 +167,8 @@ public Optional getDaemon() { return Optional.ofNullable(this.daemon); } - public Optional getPublisher() { - return Optional.ofNullable(this.publisher); + public Optional getProducer() { + return Optional.ofNullable(this.producer); } public Optional getConsumer() { diff --git a/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/grpc/PublisherConfig.java b/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/grpc/ProducerConfig.java similarity index 93% rename from testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/grpc/PublisherConfig.java rename to testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/grpc/ProducerConfig.java index 99a5aab0..59290288 100644 --- a/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/grpc/PublisherConfig.java +++ b/testcontainers/src/main/java/org/testcontainers/containers/tqe/configuration/grpc/ProducerConfig.java @@ -11,13 +11,13 @@ import com.fasterxml.jackson.annotation.JsonProperty; /* -type PublisherConfig struct { +type ProducerConfig struct { Enabled bool `mapstructure:"enabled"` LocalRouting bool `mapstructure:"local_routing"` Tarantool TarantoolClientConfig `mapstructure:"tarantool"` } */ -public class PublisherConfig { +public class ProducerConfig { @JsonProperty("enabled") private final Boolean enabled; @@ -29,7 +29,7 @@ public class PublisherConfig { private final TarantoolClientConfig tarantool; @JsonCreator - public PublisherConfig( + public ProducerConfig( @JsonProperty("enabled") Boolean enabled, @JsonProperty("local_routing") Boolean localRoutingEnabled, @JsonProperty("tarantool") TarantoolClientConfig tarantool) { diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/CommonTest.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/CommonTest.java index 78db72b5..134d35e6 100644 --- a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/CommonTest.java +++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/CommonTest.java @@ -20,7 +20,7 @@ public abstract class CommonTest { protected static final DockerImageName IMAGE_NAME = DockerImageName.parse( System.getenv().getOrDefault("TARANTOOL_REGISTRY", "") - + "tarantool/message-queue-ee:2.5.3"); + + "tarantool/message-queue-ee:v3.5.0"); protected static final Path SIMPLE_GRPC_CONFIG; protected static final Path SIMPLE_QUEUE_CONFIG; diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/FileTQEConfiguratorTest.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/FileTQEConfiguratorTest.java index 4bf4f112..25fb4c07 100644 --- a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/FileTQEConfiguratorTest.java +++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/FileTQEConfiguratorTest.java @@ -30,6 +30,7 @@ void simpleConfiguration() { FileTQEConfigurator.builder(IMAGE_NAME, SIMPLE_QUEUE_CONFIG, Set.of(SIMPLE_GRPC_CONFIG)) .build()) { configurator.queue().values().parallelStream().forEach(Startable::start); + configurator.configure(); configurator.grpc().values().parallelStream().forEach(Startable::start); } catch (Exception e) { throw new RuntimeException(e); diff --git a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterImplTest.java b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterImplTest.java index 20990794..7633380c 100644 --- a/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterImplTest.java +++ b/testcontainers/src/test/java/org/testcontainers/containers/integration/tqe/TQEClusterImplTest.java @@ -43,14 +43,15 @@ import org.testcontainers.containers.tqe.configuration.TQEConfigurator; import org.testcontainers.containers.utils.pojo.User; import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; -import tarantool.queue_ee.Consumer.SubscriptionNotifications; import tarantool.queue_ee.Consumer.SubscriptionRequest; +import tarantool.queue_ee.Consumer.SubscriptionStreamRequest; +import tarantool.queue_ee.Consumer.SubscriptionStreamResponse; import tarantool.queue_ee.ConsumerServiceGrpc; import tarantool.queue_ee.ConsumerServiceGrpc.ConsumerServiceStub; -import tarantool.queue_ee.Publisher.BatchRequestMessage; -import tarantool.queue_ee.Publisher.PublishBatchRequest; -import tarantool.queue_ee.PublisherServiceGrpc; -import tarantool.queue_ee.PublisherServiceGrpc.PublisherServiceBlockingStub; +import tarantool.queue_ee.ProducerGrpc; +import tarantool.queue_ee.ProducerGrpc.ProducerBlockingStub; +import tarantool.queue_ee.ProducerOuterClass.ProduceMessage; +import tarantool.queue_ee.ProducerOuterClass.ProduceRequest; class TQEClusterImplTest extends CommonTest { @@ -237,7 +238,7 @@ public static Stream dataForTestInvalidGrpcConfig() { grpc_listen: - uri: 'tcp://0.0.0.0:18182' - publisher: + producer: enabled: true tarantool: user: test-super @@ -255,13 +256,13 @@ public static Stream dataForTestInvalidGrpcConfig() { storage: - "master:3301" """, - // no consumers and publishers + // no consumers and producers """ core_port: 1111 grpc_listen: - uri: 'tcp://0.0.0.0:18182' - publisher: + producer: enabled: false tarantool: user: test-super @@ -284,7 +285,7 @@ public static Stream dataForTestInvalidGrpcConfig() { grpc_listen: - uri: 'tcp://0.0.0.0:18182' - publisher: + producer: enabled: true tarantool: user: test-super @@ -306,7 +307,7 @@ public static Stream dataForTestInvalidGrpcConfig() { """ core_port: 1111 - publisher: + producer: enabled: true tarantool: user: test-super @@ -369,19 +370,19 @@ void testPublishAndConsumeData() { final String queueName = "test"; - final List> publishers = + final List> producers = cluster.grpc().values().stream() - .filter(g -> g.roles().contains(GrpcRole.PUBLISHER)) + .filter(g -> g.roles().contains(GrpcRole.PRODUCER)) .toList(); final List> consumers = cluster.grpc().values().stream() .filter(g -> g.roles().contains(GrpcRole.CONSUMER)) .toList(); - Assertions.assertFalse(publishers.isEmpty()); + Assertions.assertFalse(producers.isEmpty()); Assertions.assertFalse(consumers.isEmpty()); - final Set grpcAddresses = publishers.get(0).grpcAddresses(); + final Set grpcAddresses = producers.get(0).grpcAddresses(); final Set consumerAddresses = consumers.get(0).grpcAddresses(); final Optional publisherAddress = grpcAddresses.stream().findFirst(); @@ -390,7 +391,7 @@ void testPublishAndConsumeData() { consumerAddresses.stream().findFirst(); Assertions.assertTrue(consumerAddress.isPresent()); - final ManagedChannel publisherChannel = + final ManagedChannel producerChannel = ManagedChannelBuilder.forAddress( publisherAddress.get().getHostName(), publisherAddress.get().getPort()) .usePlaintext() @@ -402,9 +403,8 @@ void testPublishAndConsumeData() { .usePlaintext() .build(); - final PublisherServiceBlockingStub pService = - PublisherServiceGrpc.newBlockingStub(publisherChannel); - final ConsumerServiceStub cService = ConsumerServiceGrpc.newStub(consumerChannel); + final ProducerBlockingStub producer = ProducerGrpc.newBlockingStub(producerChannel); + final ConsumerServiceStub consumer = ConsumerServiceGrpc.newStub(consumerChannel); final List users = Instancio.ofList(User.class) @@ -415,46 +415,52 @@ void testPublishAndConsumeData() { .generate(Select.field(User::getAge), Generators::ints) .create(); - final PublishBatchRequest.Builder requestBuilder = PublishBatchRequest.newBuilder(); + final ProduceRequest.Builder requestBuilder = + ProduceRequest.newBuilder().setQueue(queueName); for (User user : users) { requestBuilder.addMessages( - BatchRequestMessage.newBuilder() + ProduceMessage.newBuilder() .setPayload(ByteString.copyFrom(MAPPER.writeValueAsBytes(user)))); } - final PublishBatchRequest publishRequest = requestBuilder.setQueue(queueName).build(); - pService.publishBatch(publishRequest); + final ProduceRequest produceRequest = requestBuilder.build(); + producer.produce(produceRequest); final Set result = new CopyOnWriteArraySet<>(); - cService.subscribe( - SubscriptionRequest.newBuilder().setCursor("").setQueue(queueName).build(), - new StreamObserver<>() { - @Override - public void onNext(SubscriptionNotifications value) { - value.getNotificationsList().stream() - .map( - n -> { - try { - return MAPPER.readValue( - n.getMessage().getPayload().toByteArray(), User.class); - } catch (IOException e) { - throw new RuntimeException(e); - } - }) - .forEach(result::add); - } - - @Override - public void onError(Throwable t) {} - - @Override - public void onCompleted() {} - }); + StreamObserver requestsStream = + consumer.subscribe( + new StreamObserver() { + @Override + public void onNext(SubscriptionStreamResponse response) { + response.getNotifications().getNotificationsList().stream() + .map( + n -> { + try { + return MAPPER.readValue( + n.getMessage().getPayload().toByteArray(), User.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach(result::add); + } + + @Override + public void onError(Throwable t) {} + + @Override + public void onCompleted() {} + }); + requestsStream.onNext( + SubscriptionStreamRequest.newBuilder() + .setSubscribeRequest( + SubscriptionRequest.newBuilder().setCursor("").setQueue(queueName)) + .build()); Unreliables.retryUntilTrue( 5, TimeUnit.SECONDS, () -> new LinkedHashSet<>(users).size() == result.size()); Assertions.assertEquals(new LinkedHashSet<>(users), result); consumerChannel.shutdownNow(); - publisherChannel.shutdownNow(); + producerChannel.shutdownNow(); } }); } diff --git a/testcontainers/src/test/proto/messages/cursor.proto b/testcontainers/src/test/proto/messages/cursor.proto new file mode 100644 index 00000000..7d616758 --- /dev/null +++ b/testcontainers/src/test/proto/messages/cursor.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package tarantool.queue_ee; + +option go_package = "gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v3/server/protocol"; + +// Cursor — позиция внутри очереди consumer-group с привязкой к партиции. +// Схема Lua-оркестратора представляет это же значение как массив из двух +// элементов [partition_id, message_id]; см. client/consumer/group/cursor.go — +// in-process Go-аналог (consumer.Cursor). +message Cursor { + // Идентификатор партиции, на которую указывает курсор. + uint32 partition_id = 1; + + // Идентификатор сообщения внутри партиции. + uint64 message_id = 2; +} diff --git a/testcontainers/src/test/proto/messages/message.proto b/testcontainers/src/test/proto/messages/message.proto index a9526ac8..a6e255bd 100644 --- a/testcontainers/src/test/proto/messages/message.proto +++ b/testcontainers/src/test/proto/messages/message.proto @@ -2,15 +2,15 @@ syntax = "proto3"; package tarantool.queue_ee; -option go_package = "gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v2/server/protocol"; +option go_package = "gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v3/server/protocol"; // Пара ключ-значение message Pair { // Ключ пары - string key = 1; + bytes key = 1; // Значение пары - string value = 2; + bytes value = 2; } // Сообщение в очереди @@ -24,28 +24,24 @@ message QueueMessage { // Ключ маршрутизации сообщения (тип сообщения) // необходим для фильтрации сообщений из очереди на консьюмерах - optional string routing_key = 3; + optional bytes routing_key = 3; // Ключ шардирования // необходим для распределения данных в системе - optional string sharding_key = 4; + optional bytes sharding_key = 4; // Ключ дедупликации // необходим для проверки повторных сообщений, // если не указан, то проверка не производится - optional string deduplication_key = 5; + optional bytes deduplication_key = 5; // Произвольные данные в бинарном формате, содержит тело сообщения bytes payload = 6; - // Произвольные данные в бинарном формате, - // содержит дополнительные для сообщения данные, - // необходимые для отладки и трассировки - map metadata = 7 [deprecated = true]; + // Произвольные данные в формате списка из пар ключ-значение. + repeated Pair metadata = 7; // Время вставки сообщения в очередь в наносекундах int64 timestamp = 8; - // Произвольные данные в формате списка из пар ключ-значения - repeated Pair metadata_pairs = 9; } diff --git a/testcontainers/src/test/proto/services/consumer.proto b/testcontainers/src/test/proto/services/consumer.proto index c7eaf0b1..b095c5fa 100644 --- a/testcontainers/src/test/proto/services/consumer.proto +++ b/testcontainers/src/test/proto/services/consumer.proto @@ -2,14 +2,31 @@ syntax = "proto3"; package tarantool.queue_ee; +import "messages/cursor.proto"; import "messages/message.proto"; -option go_package = "gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v2/server/protocol"; +option go_package = "gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v3/server/protocol"; // Сервер подписок на сообщения брокера очередей service ConsumerService { // Подписка на сообщения с фильтром - rpc Subscribe(SubscriptionRequest) returns (stream SubscriptionNotifications); + rpc Subscribe(stream SubscriptionStreamRequest) + returns (stream SubscriptionStreamResponse); + + // Подписка на сообщения Consumer Group + rpc ConsumeGroup(stream ConsumeGroupStreamRequest) + returns (stream ConsumeGroupStreamResponse); +} + +// Поток запросов подписки. +message SubscriptionStreamRequest { + oneof request { + // Запрос на подписку + SubscriptionRequest subscribe_request = 1; + + // Запрос на обновление состояния подписки. + CommitRequest commit_request = 2; + } } // Запрос на подписку @@ -38,6 +55,32 @@ message SubscriptionRequest { // Ключи шардирования позволяют производить фильтрацию по нескольким ключам // шардирования в рамках одной подписки repeated string sharding_keys = 5; + + // Идентификатор состояния подписки. + optional string consume_id = 6; + + // Время жизни состояние подписки. + optional float ttl = 7; + + // Флаг чтения с последнего сохраненного состояния. + optional bool restore = 8; +} + +// Запрос на обновление состояния подписки. +message CommitRequest { + // Указатель на последнее обработанное сообщение. + string cursor = 1; +} + +// Поток ответов подписки. +message SubscriptionStreamResponse { + oneof response { + // Сообщение в стриме подписки + SubscriptionNotifications notifications = 1; + + // Ответ на обновление состояния подписки. + CommitResponse commit_response = 2; + } } // Сообщение в стриме подписки @@ -46,7 +89,7 @@ message SubscriptionNotifications { repeated SubscriptionNotification notifications = 1; } -// Уведомление клиента о новых сообщения в очереди +// Уведомление клиента о новых сообщениях в очереди message SubscriptionNotification { // Строка-указатель сообщения string cursor = 1; @@ -54,3 +97,73 @@ message SubscriptionNotification { // Сообщение QueueMessage message = 2; } + +// Ответ на обновление состояния подписки. +message CommitResponse { + // Обновленная строка-указатель сообщения. + string cursor = 1; +} + +// Поток запросов к Consumer Group +message ConsumeGroupStreamRequest { + oneof request { + // Запрос к Consumer Group + ConsumeGroupRequest consume_request = 1; + + // Запрос на подтверждение обработки сообщения + CommitGroupRequest commit_request = 2; + } +} + +// Поток ответов Consumer Group +message ConsumeGroupStreamResponse { + oneof response { + // Сообщение в стриме подписки + ConsumeGroupResponse consume_response = 1; + + // Ответ на подтверждение обработки сообщения + CommitGroupResponse commit_response = 2; + } +} + +// Запрос в Consumer Group +message ConsumeGroupRequest { + // Идентификатор группы потребителей + string group_id = 1; + + // Название очереди + string queue = 2; + + // Опциональный ключ фильтрации + optional string routing_key = 3; + + // Время жизни состояния подписки + optional float ttl = 4; +} + +// Ответ с потребленными сообщениями группы +message ConsumeGroupResponse { + // Список потребленных сообщений + repeated ConsumeGroupNotification notifications = 1; +} + +// Уведомление о сообщении в Consumer Group +message ConsumeGroupNotification { + // Указатель на текущее сообщение + Cursor cursor = 1; + + // Сообщение + QueueMessage message = 2; +} + +// Запрос на подтверждение обработки сообщения Consumer Group +message CommitGroupRequest { + // Указатель на сообщение + Cursor cursor = 1; +} + +// Ответ на подтверждение обработки сообщения Consumer Group +message CommitGroupResponse { + // Обновленный указатель сообщения + Cursor cursor = 1; +} diff --git a/testcontainers/src/test/proto/services/publisher.proto b/testcontainers/src/test/proto/services/producer.proto similarity index 50% rename from testcontainers/src/test/proto/services/publisher.proto rename to testcontainers/src/test/proto/services/producer.proto index 6269daab..47d860a2 100644 --- a/testcontainers/src/test/proto/services/publisher.proto +++ b/testcontainers/src/test/proto/services/producer.proto @@ -4,23 +4,18 @@ package tarantool.queue_ee; import "messages/message.proto"; -option go_package = "gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v2/server/protocol"; +option go_package = "gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v3/server/protocol"; // Сервер публикации сообщений брокера очередей -service PublisherService { - // Публикация сообщения в очередь - rpc Publish(PublishRequest) returns (PublishResponse); - // Публикация сообщений в очередь через двусторонний стрим - rpc PublishStream(stream PublishStreamRequest) returns (stream PublishStreamResponse); - +service Producer { // Публикация набора сообщений в очередь - rpc PublishBatch(PublishBatchRequest) returns (PublishBatchResponse); + rpc Produce(ProduceRequest) returns (ProduceResponse); // Публикация набора сообщений в очередь через двусторонний стрим - rpc PublishBatchStream( - stream PublishBatchStreamRequest - ) returns ( - stream PublishBatchStreamResponse - ); + rpc ProduceStream( + stream ProduceStreamRequest + ) returns ( + stream ProduceStreamResponse + ); // Публикация сообщения на указанные шарды очереди rpc Broadcast(BroadcastRequest) returns (BroadcastResponse); @@ -35,144 +30,68 @@ enum Deduplication { DEDUPLICATION_KEEP_LATEST = 4; } -// Запрос на публикацию сообщения в очередь -message PublishRequest { - // Название очереди в которой необходимо опубликовать сообщение - string queue = 1; - - // Ключ маршрутизации сообщения (тип сообщения) - // необходим для фильтрации сообщений из очереди на консьюмерах - optional string routing_key = 2; - - // Ключ шардирования - // необходим для распределения данных в системе - optional string sharding_key = 3; - - // Ключ дедупликации - // необходим для проверки повторных сообщений, - // если не указан, то проверка не производится - optional string deduplication_key = 4; - - // Произвольные данные в бинарном формате, содержит тело сообщения - bytes payload = 5; - - // Произвольные данные в бинарном формате, - // содержит дополнительные для сообщения данные, - // необходимые для отладки и трассировки - map metadata = 6 [deprecated = true]; - - // Произвольные данные в формате списка из пар ключ-значения - repeated Pair metadata_pairs = 7; - - // Режим дедупликации сообщения - Deduplication deduplication = 8; -} - // Запрос на публикация набора сообщений в очередь -message PublishBatchRequest { +message ProduceRequest { // Название очереди в которой необходимо опубликовать сообщения string queue = 1; // Ключ шардирования // необходим для распределения данных в системе - optional string sharding_key = 2; + optional bytes sharding_key = 2; // Набор сообщений - repeated BatchRequestMessage messages = 3; + repeated ProduceMessage messages = 3; - // Содержит дополнительные данные необходимые для отладки и трассировки - map metadata = 4 [deprecated = true]; - - // Произвольные данные в формате списка из пар ключ-значения - repeated Pair metadata_pairs = 5; + // Произвольные данные в формате списка из пар ключ-значение. + repeated Pair metadata = 4; // Режим дедупликации сообщения - Deduplication deduplication = 8; + Deduplication deduplication = 5; } -// Набор сообщений -message BatchRequestMessage { +// Сообщение для публикации +message ProduceMessage { // Ключ маршрутизации сообщения (тип сообщения) // необходим для фильтрации сообщений из очереди на консьюмерах - optional string routing_key = 1; + optional bytes routing_key = 1; // Ключ дедупликации // Необходим для проверки повторных сообщений, // если не указан, то проверка не производится - optional string deduplication_key = 2; + optional bytes deduplication_key = 2; // Произвольные данные в бинарном формате, содержит тело сообщения bytes payload = 3; - // Произвольные данные в бинарном формате, - // содержит дополнительные для сообщения данные, - // необходимые для отладки и трассировки - map metadata = 4 [deprecated = true]; - - // Произвольные данные в формате списка из пар ключ-значения - repeated Pair metadata_pairs = 5; + // Произвольные данные в формате списка из пар ключ-значение. + repeated Pair metadata = 4; } // Ответ на публикацию набора сообщений -message PublishBatchResponse { +message ProduceResponse { // Идентификаторы сообщений repeated uint64 ids = 1; - // Содержит дополнительные данные необходимые для отладки и трассировки - map metadata = 2 [deprecated = true]; // Флаги наличия дубликатов - repeated bool is_duplicates = 3; -} - -// Ответ на публикацию сообщения -message PublishResponse { - // Идентификатор сообщения добавленного в очередь - // (возможно не нужно) - uint64 id = 1; - // Содержит дополнительные данные необходимые для отладки и трассировки - map metadata = 2 [deprecated = true]; - // Если true, то был дубликат сообщения - bool is_duplicate = 3; -} - -// Зарос на публикацию сообщения через двусторонний стрим -message PublishStreamRequest { - // Идентификатор запроса на публикацию сообщения - uint64 request_id = 1; - - // Запрос на публикацию сообщения - PublishRequest request = 2; -} - -// Ответ на публикацию сообщения через двусторонний стрим -message PublishStreamResponse { - // Идентификатор запроса на публикацию сообщения - uint64 request_id = 1; - - oneof result { - // Сообщение об успешной публикации - PublishResponse success = 2; - // Сообщение об ошибке публикации - Error error = 3; - } + repeated bool is_duplicates = 2; } // Запрос на публикацию набора сообщений через двусторонний стрим -message PublishBatchStreamRequest { +message ProduceStreamRequest { // Идентификатор запроса на публикацию сообщения uint64 request_id = 1; // Запрос на публикацию набора сообщений - PublishBatchRequest request = 2; + ProduceRequest request = 2; } // Ответ на публикацию набора сообщений через двусторонний стрим -message PublishBatchStreamResponse { +message ProduceStreamResponse { // Идентификатор запроса на публикацию сообщения uint64 request_id = 1; oneof result { // Сообщение об успешной публикации - PublishBatchResponse success = 2; + ProduceResponse success = 2; // Сообщение об ошибке публикации Error error = 3; } @@ -185,20 +104,18 @@ message BroadcastRequest { // Ключ маршрутизации сообщения (тип сообщения) // необходим для фильтрации сообщений из очереди на консьюмерах - optional string routing_key = 2; + optional bytes routing_key = 2; // Ключ дедупликации // необходим для проверки повторных сообщений, // если не указан, то проверка не производится - optional string deduplication_key = 3; + optional bytes deduplication_key = 3; // Произвольные данные в бинарном формате, содержит тело сообщения bytes payload = 4; - // Произвольные данные в бинарном формате, - // содержит дополнительные для сообщения данные, - // необходимые для отладки и трассировки - map metadata = 5 [deprecated = true]; + // Произвольные данные в формате списка из пар ключ-значение. + repeated Pair metadata = 5; // Список с названиями репликасетов, на которые нужно опубликовать сообщение. // По умолчанию рассылка происходит на все шарды. @@ -207,11 +124,8 @@ message BroadcastRequest { // Максимальное время на рассылку сообщения optional uint64 timeout = 7; - // Произвольные данные в формате списка из пар ключ-значения - repeated Pair metadata_pairs = 8; - // Режим дедупликации сообщения - Deduplication deduplication = 9; + Deduplication deduplication = 8; } // Сообщение об успешной публикации @@ -255,7 +169,4 @@ message BroadcastResponse { // Набор ответов с шардов map replicasets = 3; - - // Содержит дополнительные данные необходимые для отладки и трассировки - map metadata = 4 [deprecated = true]; } diff --git a/testcontainers/src/test/resources/tqe/simple-config/simple-grpc.yml b/testcontainers/src/test/resources/tqe/simple-config/simple-grpc.yml index d8044bac..2c4c609f 100644 --- a/testcontainers/src/test/resources/tqe/simple-config/simple-grpc.yml +++ b/testcontainers/src/test/resources/tqe/simple-config/simple-grpc.yml @@ -1,21 +1,29 @@ core_port: 1111 grpc_listen: - - uri: 'tcp://0.0.0.0:18182' +- uri: 'tcp://0.0.0.0:18182' -publisher: +producer: enabled: true tarantool: user: test-super pass: test + initial_connection: + retry_count: 30 + retry_timeout: 120s + retry_delay: 5s connections: routers: - - "router:3301" + - "router:3301" consumer: enabled: true tarantool: user: test-super pass: test + initial_connection: + retry_count: 30 + retry_timeout: 120s + retry_delay: 5s connections: storage: - - "master:3301" + - "master:3301" diff --git a/testcontainers/src/test/resources/tqe/simple-config/simple-queue.yml b/testcontainers/src/test/resources/tqe/simple-config/simple-queue.yml index 2b9e3813..8827657f 100644 --- a/testcontainers/src/test/resources/tqe/simple-config/simple-queue.yml +++ b/testcontainers/src/test/resources/tqe/simple-config/simple-queue.yml @@ -3,15 +3,19 @@ credentials: users: test-super: password: 'test' - roles: [ super ] + roles: + - super admin: password: 'secret-cluster-cookie' - roles: [ super ] + roles: + - super replicator: password: 'secret' - roles: [ replication ] + roles: + - replication storage: - roles: [ sharding ] + roles: + - sharding password: storage # advertise configs for all nodes @@ -23,45 +27,51 @@ iproto: login: storage password: storage -roles: [ roles.metrics-export ] +roles: +- roles.metrics-export + # queues configs roles_cfg: - app.roles.queue: + roles.tqe-storage: queues: - - name: test - deduplication_mode: keep_latest - disabled_filters_by: [ sharding_key ] + - name: test + deduplication_mode: keep_latest + disabled_filters_by: [ sharding_key ] roles.metrics-export: http: - - listen: 8081 - endpoints: - - format: prometheus - path: '/metrics' + - listen: 8081 + endpoints: + - format: prometheus + path: '/metrics' groups: routers: replicasets: - r-1: + router-1: sharding: - roles: [ router ] - roles: [ app.roles.api ] + roles: + - router + roles: + - roles.tqe-router instances: router: iproto: listen: - - uri: router:3301 + - uri: router:3301 + storages: replicasets: - shard-1: + storage-1: replication: failover: manual sharding: - roles: [ storage ] - roles: [ app.roles.queue ] + roles: + - storage + roles: + - roles.tqe-storage leader: master instances: master: iproto: listen: - - uri: master:3301 - + - uri: master:3301