Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
### Testcontainers

- Add constructor/builder parameters to supply the initial Lua script as a string or as a file path, and optional additional script paths copied into the container data directory (`Tarantool2Container`, `CartridgeClusterContainer`, `VshardClusterContainer`); simplify bundled `server.lua` accordingly.
- Upgrade TQE to v3.5.0.

### Documentation

Expand Down Expand Up @@ -65,7 +66,7 @@
- Fix a memory leak due to the use of a local thread pool

### Spring-data
- Add `tarantool-spring-data-35` module with support for Spring Boot 3.5.x and Spring Data 3.5
- Add `tarantool-spring-data-35` module with support for Spring Boot 3.5.x and Spring Data 3.5

### Client
- Add `TupleMapper` utility class for easy tuple-to-POJO mapping using field format
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public interface GrpcContainer<SELF extends GrpcContainer<SELF>>
enum GrpcRole {
CONSUMER("consumer"),

PUBLISHER("publisher");
PRODUCER("producer");

private final String role;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.testcontainers.containers.tqe.configuration.grpc.ConsumerConfig;
import org.testcontainers.containers.tqe.configuration.grpc.GrpcConfiguration;
import org.testcontainers.containers.tqe.configuration.grpc.GrpcListen;
import org.testcontainers.containers.tqe.configuration.grpc.PublisherConfig;
import org.testcontainers.containers.tqe.configuration.grpc.ProducerConfig;
import org.testcontainers.containers.utils.Utils;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
import org.testcontainers.utility.DockerImageName;
Expand Down Expand Up @@ -223,11 +223,10 @@ private static Set<Integer> resolveGrpcPorts(GrpcConfiguration config, Path conf
* (required).
*/
private static Set<GrpcRole> resolveRoles(GrpcConfiguration config, Path configPath) {
final Optional<Boolean> isPublisher =
config.getPublisher().flatMap(PublisherConfig::getEnabled);
final Optional<Boolean> isPublisher = config.getProducer().flatMap(ProducerConfig::getEnabled);
final Set<GrpcRole> roles = new LinkedHashSet<>();
if (isPublisher.isPresent() && isPublisher.get()) {
roles.add(GrpcRole.PUBLISHER);
roles.add(GrpcRole.PRODUCER);
LOGGER.trace("Publisher role is enabled for: {}", configPath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,8 @@ public synchronized void start() {
this.queue().size(),
this.grpc().size());

startParallel(this.configurator.queue(), this.configurator);
startParallel(this.configurator.grpc(), this.configurator);
if (this.configurator.isConfigured()) {
LOGGER.warn(
"TQE cluster [name = {}, queue = {}, grpc = {}] already configured",
this.configurator.clusterName(),
this.configurator.queue().size(),
this.configurator.grpc().size());
return;
}

this.configurator.configure();
startTarantoolCluster();
startGrpcEndpoints();
}

@Override
Expand Down Expand Up @@ -95,6 +85,24 @@ private static void startParallel(
}
}

private void startTarantoolCluster() {
startParallel(this.configurator.queue(), this.configurator);
if (this.configurator.isConfigured()) {
LOGGER.warn(
"TQE cluster [name = {}, queue = {}, grpc = {}] already configured",
this.configurator.clusterName(),
this.configurator.queue().size(),
this.configurator.grpc().size());
return;
}

this.configurator.configure();
}

private void startGrpcEndpoints() {
startParallel(this.configurator.grpc(), this.configurator);
}

@Override
public String clusterName() {
return this.configurator.clusterName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class FileTQEConfigurator implements TQEConfigurator {
private static final String CONFIGURATOR_ERROR_MSG =
"An error occurred when configuring the TQE cluster. See logs for details.";

private static final String TQE_ROUTER_ROLE = "app.roles.api";
private static final String TQE_ROUTER_ROLE = "roles.tqe-router";

/*
/**********************************************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
EtcdAddr string `mapstructure:"etcd_addr"`
Daemon bool `mapstructure:"daemon"`

Publisher PublisherConfig `mapstructure:"publisher"`
Publisher ProducerConfig `mapstructure:"producer"`
Consumer ConsumerConfig `mapstructure:"consumer"`
MockServer MockServerConfig `mapstructure:"mock_server"`

Expand Down Expand Up @@ -72,8 +72,8 @@ public class GrpcConfiguration {
@JsonProperty("daemon")
private final Boolean daemon;

@JsonProperty("publisher")
private final PublisherConfig publisher;
@JsonProperty("producer")
private final ProducerConfig producer;

@JsonProperty("consumer")
private final ConsumerConfig consumer;
Expand All @@ -100,7 +100,7 @@ public GrpcConfiguration(
@JsonProperty("config_path") String configPath,
@JsonProperty("etcd_addr") String etcdAddr,
@JsonProperty("daemon") Boolean daemon,
@JsonProperty("publisher") PublisherConfig publisher,
@JsonProperty("producer") ProducerConfig producer,
@JsonProperty("consumer") ConsumerConfig consumer,
@JsonProperty("mock_server") MockServerConfig mockServer,
@JsonProperty("tracing") TracingConfig tracing,
Expand All @@ -116,7 +116,7 @@ public GrpcConfiguration(
this.configPath = configPath;
this.etcdAddr = etcdAddr;
this.daemon = daemon;
this.publisher = publisher;
this.producer = producer;
this.consumer = consumer;
this.mockServer = mockServer;
this.tracing = tracing;
Expand Down Expand Up @@ -167,8 +167,8 @@ public Optional<Boolean> getDaemon() {
return Optional.ofNullable(this.daemon);
}

public Optional<PublisherConfig> getPublisher() {
return Optional.ofNullable(this.publisher);
public Optional<ProducerConfig> getProducer() {
return Optional.ofNullable(this.producer);
}

public Optional<ConsumerConfig> getConsumer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
import com.fasterxml.jackson.annotation.JsonProperty;

/*
type PublisherConfig struct {
type ProducerConfig struct {
Enabled bool `mapstructure:"enabled"`
LocalRouting bool `mapstructure:"local_routing"`
Tarantool TarantoolClientConfig `mapstructure:"tarantool"`
}
*/
public class PublisherConfig {
public class ProducerConfig {

@JsonProperty("enabled")
private final Boolean enabled;
Expand All @@ -29,7 +29,7 @@ public class PublisherConfig {
private final TarantoolClientConfig tarantool;

@JsonCreator
public PublisherConfig(
public ProducerConfig(
@JsonProperty("enabled") Boolean enabled,
@JsonProperty("local_routing") Boolean localRoutingEnabled,
@JsonProperty("tarantool") TarantoolClientConfig tarantool) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public abstract class CommonTest {
protected static final DockerImageName IMAGE_NAME =
DockerImageName.parse(
System.getenv().getOrDefault("TARANTOOL_REGISTRY", "")
+ "tarantool/message-queue-ee:2.5.3");
+ "tarantool/message-queue-ee:v3.5.0");

protected static final Path SIMPLE_GRPC_CONFIG;
protected static final Path SIMPLE_QUEUE_CONFIG;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ void simpleConfiguration() {
FileTQEConfigurator.builder(IMAGE_NAME, SIMPLE_QUEUE_CONFIG, Set.of(SIMPLE_GRPC_CONFIG))
.build()) {
configurator.queue().values().parallelStream().forEach(Startable::start);
configurator.configure();
configurator.grpc().values().parallelStream().forEach(Startable::start);
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@
import org.testcontainers.containers.tqe.configuration.TQEConfigurator;
import org.testcontainers.containers.utils.pojo.User;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import tarantool.queue_ee.Consumer.SubscriptionNotifications;
import tarantool.queue_ee.Consumer.SubscriptionRequest;
import tarantool.queue_ee.Consumer.SubscriptionStreamRequest;
import tarantool.queue_ee.Consumer.SubscriptionStreamResponse;
import tarantool.queue_ee.ConsumerServiceGrpc;
import tarantool.queue_ee.ConsumerServiceGrpc.ConsumerServiceStub;
import tarantool.queue_ee.Publisher.BatchRequestMessage;
import tarantool.queue_ee.Publisher.PublishBatchRequest;
import tarantool.queue_ee.PublisherServiceGrpc;
import tarantool.queue_ee.PublisherServiceGrpc.PublisherServiceBlockingStub;
import tarantool.queue_ee.ProducerGrpc;
import tarantool.queue_ee.ProducerGrpc.ProducerBlockingStub;
import tarantool.queue_ee.ProducerOuterClass.ProduceMessage;
import tarantool.queue_ee.ProducerOuterClass.ProduceRequest;

class TQEClusterImplTest extends CommonTest {

Expand Down Expand Up @@ -237,7 +238,7 @@ public static Stream<Path> dataForTestInvalidGrpcConfig() {
grpc_listen:
- uri: 'tcp://0.0.0.0:18182'

publisher:
producer:
enabled: true
tarantool:
user: test-super
Expand All @@ -255,13 +256,13 @@ public static Stream<Path> dataForTestInvalidGrpcConfig() {
storage:
- "master:3301"
""",
// no consumers and publishers
// no consumers and producers
"""
core_port: 1111
grpc_listen:
- uri: 'tcp://0.0.0.0:18182'

publisher:
producer:
enabled: false
tarantool:
user: test-super
Expand All @@ -284,7 +285,7 @@ public static Stream<Path> dataForTestInvalidGrpcConfig() {
grpc_listen:
- uri: 'tcp://0.0.0.0:18182'

publisher:
producer:
enabled: true
tarantool:
user: test-super
Expand All @@ -306,7 +307,7 @@ public static Stream<Path> dataForTestInvalidGrpcConfig() {
"""
core_port: 1111

publisher:
producer:
enabled: true
tarantool:
user: test-super
Expand Down Expand Up @@ -369,19 +370,19 @@ void testPublishAndConsumeData() {

final String queueName = "test";

final List<GrpcContainer<?>> publishers =
final List<GrpcContainer<?>> producers =
cluster.grpc().values().stream()
.filter(g -> g.roles().contains(GrpcRole.PUBLISHER))
.filter(g -> g.roles().contains(GrpcRole.PRODUCER))
.toList();
final List<GrpcContainer<?>> consumers =
cluster.grpc().values().stream()
.filter(g -> g.roles().contains(GrpcRole.CONSUMER))
.toList();

Assertions.assertFalse(publishers.isEmpty());
Assertions.assertFalse(producers.isEmpty());
Assertions.assertFalse(consumers.isEmpty());

final Set<InetSocketAddress> grpcAddresses = publishers.get(0).grpcAddresses();
final Set<InetSocketAddress> grpcAddresses = producers.get(0).grpcAddresses();
final Set<InetSocketAddress> consumerAddresses = consumers.get(0).grpcAddresses();

final Optional<InetSocketAddress> publisherAddress = grpcAddresses.stream().findFirst();
Expand All @@ -390,7 +391,7 @@ void testPublishAndConsumeData() {
consumerAddresses.stream().findFirst();
Assertions.assertTrue(consumerAddress.isPresent());

final ManagedChannel publisherChannel =
final ManagedChannel producerChannel =
ManagedChannelBuilder.forAddress(
publisherAddress.get().getHostName(), publisherAddress.get().getPort())
.usePlaintext()
Expand All @@ -402,9 +403,8 @@ void testPublishAndConsumeData() {
.usePlaintext()
.build();

final PublisherServiceBlockingStub pService =
PublisherServiceGrpc.newBlockingStub(publisherChannel);
final ConsumerServiceStub cService = ConsumerServiceGrpc.newStub(consumerChannel);
final ProducerBlockingStub producer = ProducerGrpc.newBlockingStub(producerChannel);
final ConsumerServiceStub consumer = ConsumerServiceGrpc.newStub(consumerChannel);

final List<User> users =
Instancio.ofList(User.class)
Expand All @@ -415,46 +415,52 @@ void testPublishAndConsumeData() {
.generate(Select.field(User::getAge), Generators::ints)
.create();

final PublishBatchRequest.Builder requestBuilder = PublishBatchRequest.newBuilder();
final ProduceRequest.Builder requestBuilder =
ProduceRequest.newBuilder().setQueue(queueName);
for (User user : users) {
requestBuilder.addMessages(
BatchRequestMessage.newBuilder()
ProduceMessage.newBuilder()
.setPayload(ByteString.copyFrom(MAPPER.writeValueAsBytes(user))));
}
final PublishBatchRequest publishRequest = requestBuilder.setQueue(queueName).build();
pService.publishBatch(publishRequest);
final ProduceRequest produceRequest = requestBuilder.build();
producer.produce(produceRequest);

final Set<User> result = new CopyOnWriteArraySet<>();
cService.subscribe(
SubscriptionRequest.newBuilder().setCursor("").setQueue(queueName).build(),
new StreamObserver<>() {
@Override
public void onNext(SubscriptionNotifications value) {
value.getNotificationsList().stream()
.map(
n -> {
try {
return MAPPER.readValue(
n.getMessage().getPayload().toByteArray(), User.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.forEach(result::add);
}

@Override
public void onError(Throwable t) {}

@Override
public void onCompleted() {}
});
StreamObserver<SubscriptionStreamRequest> requestsStream =
consumer.subscribe(
new StreamObserver<SubscriptionStreamResponse>() {
@Override
public void onNext(SubscriptionStreamResponse response) {
response.getNotifications().getNotificationsList().stream()
.map(
n -> {
try {
return MAPPER.readValue(
n.getMessage().getPayload().toByteArray(), User.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.forEach(result::add);
}

@Override
public void onError(Throwable t) {}

@Override
public void onCompleted() {}
});
requestsStream.onNext(
SubscriptionStreamRequest.newBuilder()
.setSubscribeRequest(
SubscriptionRequest.newBuilder().setCursor("").setQueue(queueName))
.build());

Unreliables.retryUntilTrue(
5, TimeUnit.SECONDS, () -> new LinkedHashSet<>(users).size() == result.size());
Assertions.assertEquals(new LinkedHashSet<>(users), result);
consumerChannel.shutdownNow();
publisherChannel.shutdownNow();
producerChannel.shutdownNow();
}
});
}
Expand Down
Loading