From ba235d09404e86cc2c5508b205511e3548086afc Mon Sep 17 00:00:00 2001 From: ArtDu Date: Tue, 31 Mar 2026 15:58:11 +0300 Subject: [PATCH] feat(tracing): add request/response handlers Add Handlers class with 4 lifecycle callbacks: - onBeforeSend: called before sending request - onSuccess: called on successful response - onTimeout: called when request times out - onIgnoredResponse: called when response arrives after timeout Handlers are configured at client level via withHandlers() builder method and propagated through: Builder -> ClientImpl -> Pool -> PoolEntry -> IProtoClient Co-Authored-By: Claude Opus 4.6 --- .../factory/TarantoolBoxClientBuilder.java | 16 + .../factory/TarantoolBoxClientImpl.java | 3 + .../client/factory/TarantoolClientImpl.java | 4 + .../factory/TarantoolCrudClientBuilder.java | 9 + .../factory/TarantoolCrudClientImpl.java | 4 + .../TarantoolDataGridClientBuilder.java | 16 + .../factory/TarantoolDataGridClientImpl.java | 3 + .../integration/TarantoolBoxClientTest.java | 297 +++++++++++++++++- .../java/io/tarantool/core/IProtoClient.java | 3 + .../io/tarantool/core/IProtoClientImpl.java | 17 +- .../io/tarantool/core/protocol/Handlers.java | 145 +++++++++ .../core/protocol/IProtoRequestOpts.java | 10 + .../protocol/fsm/RequestStateMachine.java | 28 +- .../tarantool/pool/IProtoClientPoolImpl.java | 66 +++- .../java/io/tarantool/pool/PoolEntry.java | 7 + 15 files changed, 617 insertions(+), 11 deletions(-) create mode 100644 tarantool-core/src/main/java/io/tarantool/core/protocol/Handlers.java diff --git a/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolBoxClientBuilder.java b/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolBoxClientBuilder.java index e463b407..0ecff871 100644 --- a/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolBoxClientBuilder.java +++ b/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolBoxClientBuilder.java @@ -34,6 +34,7 @@ import io.tarantool.client.box.TarantoolBoxClient; import io.tarantool.core.ManagedResource; import io.tarantool.core.WatcherOptions; +import io.tarantool.core.protocol.Handlers; import io.tarantool.core.protocol.IProtoResponse; import io.tarantool.pool.HeartbeatOpts; import io.tarantool.pool.InstanceConnectionGroup; @@ -159,6 +160,9 @@ public class TarantoolBoxClientBuilder { /** Optional listener for pool events. */ private PoolEventListener poolEventListener; + /** Custom handlers for protocol operations. */ + private Handlers handlers; + /** * Getter for {@link #options}. * @@ -819,6 +823,17 @@ public TarantoolBoxClientBuilder withPoolEventListener(PoolEventListener listene return this; } + /** + * Sets custom handlers for protocol operations. + * + * @param handlers handlers instance + * @return {@link TarantoolBoxClientBuilder} object. + */ + public TarantoolBoxClientBuilder withHandlers(Handlers handlers) { + this.handlers = handlers; + return this; + } + /** * Builds specific {@link TarantoolBoxClient} class instance with parameters. * @@ -859,6 +874,7 @@ public TarantoolBoxClient build() throws Exception { reconnectAfter, metricsRegistry, ignoredPacketsHandler, + handlers, sslContext, poolEventListener); } diff --git a/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolBoxClientImpl.java b/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolBoxClientImpl.java index 6b377755..72e7b540 100644 --- a/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolBoxClientImpl.java +++ b/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolBoxClientImpl.java @@ -23,6 +23,7 @@ import io.tarantool.core.ManagedResource; import io.tarantool.core.WatcherOptions; import io.tarantool.core.exceptions.ClientException; +import io.tarantool.core.protocol.Handlers; import io.tarantool.core.protocol.IProtoResponse; import io.tarantool.pool.HeartbeatOpts; import io.tarantool.pool.InstanceConnectionGroup; @@ -119,6 +120,7 @@ final class TarantoolBoxClientImpl extends TarantoolClientImpl implements Tarant long reconnectAfter, MeterRegistry metricsRegistry, TripleConsumer ignoredPacketsHandler, + Handlers handlers, SslContext sslContext, PoolEventListener poolEventListener) throws InvocationTargetException, @@ -138,6 +140,7 @@ final class TarantoolBoxClientImpl extends TarantoolClientImpl implements Tarant reconnectAfter, metricsRegistry, ignoredPacketsHandler, + handlers, sslContext, !fetchSchema, poolEventListener); diff --git a/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolClientImpl.java b/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolClientImpl.java index 9ca8749a..9b337710 100644 --- a/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolClientImpl.java +++ b/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolClientImpl.java @@ -33,6 +33,7 @@ import io.tarantool.core.WatcherOptions; import io.tarantool.core.connection.ConnectionFactory; import io.tarantool.core.exceptions.ServerException; +import io.tarantool.core.protocol.Handlers; import io.tarantool.core.protocol.IProtoRequestOpts; import io.tarantool.core.protocol.IProtoResponse; import io.tarantool.mapping.TarantoolJacksonMapping; @@ -94,6 +95,7 @@ abstract class TarantoolClientImpl implements TarantoolClient { * @param reconnectAfter time after which reconnect occurs * @param metricsRegistry micrometer {@link TarantoolClientImpl#metricsRegistry} * @param ignoredPacketsHandler handler for ignored IProto-packets. + * @param handlers handlers for request/response lifecycle events. * @param sslContext SslContext with settings for establishing SSL/TLS connection between * Tarantool. * @param useTupleExtension Use TUPLE_EXT feature if true. @@ -123,6 +125,7 @@ protected TarantoolClientImpl( long reconnectAfter, MeterRegistry metricsRegistry, TripleConsumer ignoredPacketsHandler, + Handlers handlers, SslContext sslContext, boolean useTupleExtension, PoolEventListener poolEventListener) @@ -144,6 +147,7 @@ protected TarantoolClientImpl( watcherOpts, this.metricsRegistry, ignoredPacketsHandler, + handlers, useTupleExtension, poolEventListener); pool.setGroups(groups); diff --git a/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolCrudClientBuilder.java b/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolCrudClientBuilder.java index 8fe89b00..0b74b67a 100644 --- a/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolCrudClientBuilder.java +++ b/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolCrudClientBuilder.java @@ -33,6 +33,7 @@ import io.tarantool.client.crud.TarantoolCrudClient; import io.tarantool.core.ManagedResource; import io.tarantool.core.WatcherOptions; +import io.tarantool.core.protocol.Handlers; import io.tarantool.core.protocol.IProtoResponse; import io.tarantool.pool.HeartbeatOpts; import io.tarantool.pool.InstanceConnectionGroup; @@ -148,6 +149,8 @@ public class TarantoolCrudClientBuilder { /** Optional listener for pool lifecycle events. */ private PoolEventListener poolEventListener; + private Handlers handlers; + public Map, Object> getOptions() { return options; } @@ -762,6 +765,11 @@ public TarantoolCrudClientBuilder withPoolEventListener(PoolEventListener listen return this; } + public TarantoolCrudClientBuilder withHandlers(Handlers handlers) { + this.handlers = handlers; + return this; + } + /** * Builds specific {@link TarantoolCrudClient} class instance with parameters. * @@ -800,6 +808,7 @@ public TarantoolCrudClient build() throws Exception { reconnectAfter, metricsRegistry, ignoredPacketsHandler, + handlers, sslContext, useTupleExtension, poolEventListener); diff --git a/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolCrudClientImpl.java b/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolCrudClientImpl.java index 8b420e42..b5dcbb45 100644 --- a/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolCrudClientImpl.java +++ b/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolCrudClientImpl.java @@ -20,6 +20,7 @@ import io.tarantool.client.crud.options.CrudOptions; import io.tarantool.core.ManagedResource; import io.tarantool.core.WatcherOptions; +import io.tarantool.core.protocol.Handlers; import io.tarantool.core.protocol.IProtoResponse; import io.tarantool.pool.HeartbeatOpts; import io.tarantool.pool.InstanceConnectionGroup; @@ -70,6 +71,7 @@ final class TarantoolCrudClientImpl extends TarantoolClientImpl implements Taran * @param reconnectAfter see reconnectAfter in{@link TarantoolCrudClientBuilder}. * @param metricsRegistry see metricsRegistry in{@link TarantoolCrudClientBuilder}. * @param ignoredPacketsHandler see ignoredPacketsHandler in{@link TarantoolCrudClientBuilder}. + * @param handlers see handlers in{@link TarantoolCrudClientBuilder}. * @param sslContext see sslContext in{@link TarantoolCrudClientBuilder}. * @param useTupleExtension see useTupleExtension in{@link TarantoolCrudClientBuilder}. * @throws NoSuchMethodException if a matching method is not found. @@ -96,6 +98,7 @@ final class TarantoolCrudClientImpl extends TarantoolClientImpl implements Taran long reconnectAfter, MeterRegistry metricsRegistry, TripleConsumer ignoredPacketsHandler, + Handlers handlers, SslContext sslContext, boolean useTupleExtension, PoolEventListener poolEventListener) @@ -116,6 +119,7 @@ final class TarantoolCrudClientImpl extends TarantoolClientImpl implements Taran reconnectAfter, metricsRegistry, ignoredPacketsHandler, + handlers, sslContext, useTupleExtension, poolEventListener); diff --git a/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolDataGridClientBuilder.java b/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolDataGridClientBuilder.java index dedc00fc..cf65378f 100644 --- a/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolDataGridClientBuilder.java +++ b/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolDataGridClientBuilder.java @@ -33,6 +33,7 @@ import io.tarantool.client.tdg.TarantoolDataGridClient; import io.tarantool.core.ManagedResource; import io.tarantool.core.WatcherOptions; +import io.tarantool.core.protocol.Handlers; import io.tarantool.core.protocol.IProtoResponse; import io.tarantool.pool.HeartbeatOpts; import io.tarantool.pool.InstanceConnectionGroup; @@ -158,6 +159,9 @@ public class TarantoolDataGridClientBuilder { /** Optional listener for pool lifecycle events. */ private PoolEventListener poolEventListener; + /** Handlers for protocol messages. */ + private Handlers handlers; + public Map, Object> getOptions() { return options; } @@ -840,6 +844,17 @@ public TarantoolDataGridClientBuilder withCredentials(Map creden return this; } + /** + * Sets the {@link #handlers} parameter when constructing an instance of a builder class. + * + * @param handlers see {@link #handlers} field. + * @return {@link TarantoolDataGridClientBuilder} object. + */ + public TarantoolDataGridClientBuilder withHandlers(Handlers handlers) { + this.handlers = handlers; + return this; + } + /** * Builds specific {@link TarantoolDataGridClient} class instance with parameters. * @@ -878,6 +893,7 @@ public TarantoolDataGridClient build() throws Exception { reconnectAfter, metricsRegistry, ignoredPacketsHandler, + handlers, sslContext, useTupleExtension, useTdg1Context, diff --git a/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolDataGridClientImpl.java b/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolDataGridClientImpl.java index 396ed67d..08d9f9a9 100644 --- a/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolDataGridClientImpl.java +++ b/tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolDataGridClientImpl.java @@ -19,6 +19,7 @@ import io.tarantool.client.tdg.TarantoolDataGridSpace; import io.tarantool.core.ManagedResource; import io.tarantool.core.WatcherOptions; +import io.tarantool.core.protocol.Handlers; import io.tarantool.core.protocol.IProtoResponse; import io.tarantool.pool.HeartbeatOpts; import io.tarantool.pool.InstanceConnectionGroup; @@ -70,6 +71,7 @@ protected TarantoolDataGridClientImpl( long reconnectAfter, MeterRegistry metricsRegistry, TripleConsumer ignoredPacketsHandler, + Handlers handlers, SslContext sslContext, boolean useTupleExtension, boolean useTdg1Context, @@ -92,6 +94,7 @@ protected TarantoolDataGridClientImpl( reconnectAfter, metricsRegistry, ignoredPacketsHandler, + handlers, sslContext, useTupleExtension, poolEventListener); diff --git a/tarantool-client/src/test/java/io/tarantool/client/integration/TarantoolBoxClientTest.java b/tarantool-client/src/test/java/io/tarantool/client/integration/TarantoolBoxClientTest.java index b8ebf045..3b052760 100644 --- a/tarantool-client/src/test/java/io/tarantool/client/integration/TarantoolBoxClientTest.java +++ b/tarantool-client/src/test/java/io/tarantool/client/integration/TarantoolBoxClientTest.java @@ -13,6 +13,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeoutException; @@ -42,11 +43,20 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessageUnpacker; +import org.msgpack.value.Value; +import org.msgpack.value.ValueFactory; import org.testcontainers.containers.tarantool.TarantoolContainer; import org.testcontainers.containers.utils.TarantoolContainerClientHelper; import org.testcontainers.shaded.com.google.common.base.CaseFormat; import static io.tarantool.client.box.TarantoolBoxSpace.WITHOUT_ENABLED_FETCH_SCHEMA_OPTION_FOR_TARANTOOL_LESS_3_0_0; +import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_DATA; +import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_SYNC_ID; +import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_CALL; +import static io.tarantool.mapping.BaseTarantoolJacksonMapping.objectMapper; import io.tarantool.client.BaseOptions; import io.tarantool.client.ClientType; import io.tarantool.client.Options; @@ -61,7 +71,11 @@ import io.tarantool.client.operation.Operations; import io.tarantool.core.IProtoClient; import io.tarantool.core.protocol.BoxIterator; +import io.tarantool.core.protocol.ByteBodyValueWrapper; +import io.tarantool.core.protocol.Handlers; +import io.tarantool.core.protocol.IProtoRequest; import io.tarantool.core.protocol.IProtoResponse; +import io.tarantool.mapping.BaseTarantoolJacksonMapping; import io.tarantool.mapping.NilErrorResponse; import io.tarantool.mapping.SelectResponse; import io.tarantool.mapping.TarantoolResponse; @@ -248,25 +262,304 @@ public void testCallAndEval() { @Test public void testCallTimeoutWithIgnoredPacketsHandler() throws Exception { + List sentCallRequests = new ArrayList<>(); + List timedOutRequests = new ArrayList<>(); + List successResponses = new ArrayList<>(); + List ignoredResponses = new ArrayList<>(); + List> localTriplets = new ArrayList<>(); + + TarantoolBoxClient testClient = + TarantoolFactory.box() + .withUser(API_USER) + .withPassword(CREDS.get(API_USER)) + .withHost(tt.getHost()) + .withPort(tt.getFirstMappedPort()) + .withHandlers( + Handlers.builder() + .onBeforeSend( + request -> { + // Filter only CALL requests (ignore auth, schema fetch, etc.) + if (request.getRequestType() == IPROTO_TYPE_CALL) { + synchronized (sentCallRequests) { + sentCallRequests.add(request); + } + } + }) + .onSuccess( + response -> { + synchronized (successResponses) { + successResponses.add(response); + } + }) + .onTimeout( + request -> { + synchronized (timedOutRequests) { + timedOutRequests.add(request); + } + }) + .onIgnoredResponse( + response -> { + synchronized (ignoredResponses) { + ignoredResponses.add(response); + } + }) + .build()) + .withIgnoredPacketsHandler( + (tag, index, packet) -> { + synchronized (localTriplets) { + localTriplets.add(Arrays.asList(tag, index, packet)); + } + }) + .build(); + Options options = BaseOptions.builder().withTimeout(1_000L).build(); Exception ex = assertThrows( CompletionException.class, - () -> client.call("slow_echo", Arrays.asList(1, true), options).join()); + () -> testClient.call("slow_echo", Arrays.asList(1, true), options).join()); Throwable cause = ex.getCause(); assertEquals(TimeoutException.class, cause.getClass()); + + // Verify onBeforeSend was called for our CALL request + assertEquals( + 1, sentCallRequests.size(), "onBeforeSend handler should be called once for CALL request"); + IProtoRequest sentRequest = sentCallRequests.get(0); + assertEquals(IPROTO_TYPE_CALL, sentRequest.getRequestType()); + Thread.sleep(600); - assertEquals(1, triplets.size()); + assertEquals(1, localTriplets.size()); + + // Verify onSuccess was NOT called for timed out request + // Filter success responses for our CALL request syncId only + long sentSyncId = sentRequest.getSyncId(); + List matchingSuccessResponses = new ArrayList<>(); + synchronized (successResponses) { + for (IProtoResponse response : successResponses) { + if (response.hasSyncId() && response.getSyncId() == sentSyncId) { + matchingSuccessResponses.add(response); + } + } + } + assertEquals( + 0, + matchingSuccessResponses.size(), + "onSuccess should NOT be called for timed out request (syncId=" + sentSyncId + ")"); + + // Verify onBeforeSend, onTimeout, onIgnoredResponse and ignored packets all have matching + // syncId + long timeoutSyncId = assertRequestHandler(timedOutRequests, "onTimeout"); + long ignoredResponseSyncId = assertIgnoredResponseHandler(ignoredResponses); + long tripletSyncId = assertIgnoredPackets(localTriplets); + assertEquals(sentSyncId, timeoutSyncId, "onBeforeSend and onTimeout should have same syncId"); + assertEquals( + sentSyncId, + ignoredResponseSyncId, + "onBeforeSend and onIgnoredResponse should have same syncId"); + assertEquals(sentSyncId, tripletSyncId, "Request and triplet response syncId should match"); + + testClient.close(); + } + @Test + public void testCallSuccessWithHandlers() throws Exception { + List sentCallRequests = new ArrayList<>(); + List timedOutRequests = new ArrayList<>(); + List successResponses = new ArrayList<>(); + List ignoredResponses = new ArrayList<>(); + List> localTriplets = new ArrayList<>(); + + TarantoolBoxClient testClient = + TarantoolFactory.box() + .withUser(API_USER) + .withPassword(CREDS.get(API_USER)) + .withHost(tt.getHost()) + .withPort(tt.getFirstMappedPort()) + .withHandlers( + Handlers.builder() + .onBeforeSend( + request -> { + // Filter only CALL requests (ignore auth, schema fetch, etc.) + if (request.getRequestType() == IPROTO_TYPE_CALL) { + synchronized (sentCallRequests) { + sentCallRequests.add(request); + } + } + }) + .onSuccess( + response -> { + synchronized (successResponses) { + successResponses.add(response); + } + }) + .onTimeout( + request -> { + synchronized (timedOutRequests) { + timedOutRequests.add(request); + } + }) + .onIgnoredResponse( + response -> { + synchronized (ignoredResponses) { + ignoredResponses.add(response); + } + }) + .build()) + .withIgnoredPacketsHandler( + (tag, index, packet) -> { + synchronized (localTriplets) { + localTriplets.add(Arrays.asList(tag, index, packet)); + } + }) + .build(); + + // Call non-slow function that should succeed + List result = testClient.call("echo", Arrays.asList(1, true)).join().get(); + assertEquals(Arrays.asList(1, true), result); + + // Verify onBeforeSend was called for our CALL request + assertEquals( + 1, sentCallRequests.size(), "onBeforeSend handler should be called once for CALL request"); + IProtoRequest sentRequest = sentCallRequests.get(0); + assertEquals(IPROTO_TYPE_CALL, sentRequest.getRequestType()); + + // Verify onSuccess was called for our CALL request + long sentSyncId = sentRequest.getSyncId(); + List matchingSuccessResponses = new ArrayList<>(); + synchronized (successResponses) { + for (IProtoResponse response : successResponses) { + if (response.hasSyncId() && response.getSyncId() == sentSyncId) { + matchingSuccessResponses.add(response); + } + } + } + assertEquals( + 1, + matchingSuccessResponses.size(), + "onSuccess should be called for successful request (syncId=" + sentSyncId + ")"); + + // Verify onTimeout was NOT called + assertEquals( + 0, timedOutRequests.size(), "onTimeout should NOT be called for successful request"); + + // Verify onIgnoredResponse was NOT called + assertEquals( + 0, + ignoredResponses.size(), + "onIgnoredResponse should NOT be called for successful request"); + + // Verify triplet handler was NOT called + assertEquals( + 0, + localTriplets.size(), + "withIgnoredPacketsHandler should NOT be called for successful request"); + + // Verify request body in onBeforeSend + byte[] packetBytes = sentRequest.getPacket(MessagePack.newDefaultBufferPacker()); + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(packetBytes); + unpacker.unpackInt(); // Skip size prefix + Value headerValue = unpacker.unpackValue(); + Value bodyValue = unpacker.unpackValue(); + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(bodyValue); + byte[] bodyBytes = packer.toByteArray(); + Map body = + objectMapper.readValue(bodyBytes, new TypeReference>() {}); + assertEquals("echo", body.get(0x22)); // IPROTO_FUNCTION_NAME + assertEquals(Arrays.asList(1, true), body.get(0x21)); // IPROTO_TUPLE + + // Verify response body in onSuccess + IProtoResponse successResponse = matchingSuccessResponses.get(0); + assertEquals( + sentSyncId, + successResponse.getSyncId(), + "onSuccess response should have same syncId as sent request"); + Map responseBody = new HashMap<>(); + Map byteBodyValues = successResponse.getByteBodyValues(); + for (Map.Entry entry : byteBodyValues.entrySet()) { + responseBody.put( + entry.getKey(), BaseTarantoolJacksonMapping.readValue(entry.getValue(), Object.class)); + } + assertEquals(Arrays.asList(1, true), responseBody.get(IPROTO_DATA)); + + testClient.close(); + } + + private long assertRequestHandler(List requests, String handlerName) + throws Exception { + assertEquals( + 1, requests.size(), handlerName + " handler should receive exactly one CALL request"); + IProtoRequest request = requests.get(0); + assertEquals(IPROTO_TYPE_CALL, request.getRequestType()); + + byte[] packetBytes = request.getPacket(MessagePack.newDefaultBufferPacker()); + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(packetBytes); + unpacker.unpackInt(); // Skip size prefix + + Value headerValue = unpacker.unpackValue(); + Value bodyValue = unpacker.unpackValue(); + + // Extract syncId from header (key 0x01) + long syncId = + headerValue + .asMapValue() + .map() + .get(ValueFactory.newInteger(IPROTO_SYNC_ID)) + .asIntegerValue() + .asLong(); + + // Convert body to bytes and parse with objectMapper + MessageBufferPacker packer = MessagePack.newDefaultBufferPacker(); + packer.packValue(bodyValue); + byte[] bodyBytes = packer.toByteArray(); + Map body = + objectMapper.readValue(bodyBytes, new TypeReference>() {}); + + assertEquals("slow_echo", body.get(0x22)); // IPROTO_FUNCTION_NAME + assertEquals(Arrays.asList(1, true), body.get(0x21)); // IPROTO_TUPLE + + return syncId; + } + + private long assertIgnoredResponseHandler(List responses) throws Exception { + assertEquals( + 1, responses.size(), "onIgnoredResponse handler should receive exactly one response"); + return assertIProtoResponse(responses.get(0)); + } + + private long assertIgnoredPackets(List> triplets) throws Exception { Set tags = new HashSet<>(); Set indexes = new HashSet<>(); + long syncId = -1; + for (List item : triplets) { tags.add((String) item.get(0)); indexes.add((int) item.get(1)); assertInstanceOf(IProtoResponse.class, item.get(2)); + syncId = assertIProtoResponse((IProtoResponse) item.get(2)); } + assertEquals(new HashSet<>(Collections.singletonList("default")), tags); assertEquals(Collections.singleton(0), indexes); + + return syncId; + } + + private long assertIProtoResponse(IProtoResponse response) throws Exception { + assertFalse(response.isError()); + assertTrue(response.hasSyncId()); + long syncId = response.getSyncId(); + assertTrue(syncId > 0); + + Map bodyAsObjects = new HashMap<>(); + Map byteBodyValues = response.getByteBodyValues(); + for (Map.Entry entry : byteBodyValues.entrySet()) { + bodyAsObjects.put( + entry.getKey(), BaseTarantoolJacksonMapping.readValue(entry.getValue(), Object.class)); + } + assertEquals(byteBodyValues.size(), bodyAsObjects.size()); + assertEquals(Arrays.asList(1, true), bodyAsObjects.get(IPROTO_DATA)); + + return syncId; } @Test diff --git a/tarantool-core/src/main/java/io/tarantool/core/IProtoClient.java b/tarantool-core/src/main/java/io/tarantool/core/IProtoClient.java index 63f50358..3c3d2d8e 100644 --- a/tarantool-core/src/main/java/io/tarantool/core/IProtoClient.java +++ b/tarantool-core/src/main/java/io/tarantool/core/IProtoClient.java @@ -17,6 +17,7 @@ import static io.tarantool.core.protocol.requests.IProtoAuth.AuthType; import io.tarantool.core.connection.ConnectionCloseEvent; import io.tarantool.core.protocol.BoxIterator; +import io.tarantool.core.protocol.Handlers; import io.tarantool.core.protocol.IProtoRequestOpts; import io.tarantool.core.protocol.IProtoResponse; import io.tarantool.core.protocol.TransactionIsolationLevel; @@ -241,6 +242,8 @@ CompletableFuture id( IProtoClient onIgnoredPacket(Consumer handler); + IProtoClient withHandlers(Handlers handlers); + boolean isConnected(); Integer getClientProtocolVersion(); diff --git a/tarantool-core/src/main/java/io/tarantool/core/IProtoClientImpl.java b/tarantool-core/src/main/java/io/tarantool/core/IProtoClientImpl.java index d8a80aa1..cc26b958 100644 --- a/tarantool-core/src/main/java/io/tarantool/core/IProtoClientImpl.java +++ b/tarantool-core/src/main/java/io/tarantool/core/IProtoClientImpl.java @@ -46,6 +46,7 @@ import io.tarantool.core.exceptions.ClientException; import io.tarantool.core.exceptions.ShutdownException; import io.tarantool.core.protocol.BoxIterator; +import io.tarantool.core.protocol.Handlers; import io.tarantool.core.protocol.IProtoRequest; import io.tarantool.core.protocol.IProtoRequestOpts; import io.tarantool.core.protocol.IProtoResponse; @@ -108,6 +109,7 @@ public class IProtoClientImpl implements IProtoClient { private Counter responseErrorCounter; private Counter ignoredResponsesCounter; private Consumer ignoredPacketsHandler; + private Handlers handlers; public IProtoClientImpl(ConnectionFactory factory, Timer timerService) { this(factory, timerService, DEFAULT_WATCHER_OPTS, null, null, false); @@ -717,6 +719,12 @@ public IProtoClient onIgnoredPacket(Consumer handler) { return this; } + @Override + public IProtoClient withHandlers(Handlers handlers) { + this.handlers = handlers; + return this; + } + @Override public boolean isConnected() { return connection.isConnected(); @@ -836,6 +844,7 @@ private void handleMessage(IProtoResponse message) { IProtoStateMachine fsm = fsmRegistry.get(syncId); if (fsm == null) { processIgnoredResponse(message); + log.error("Client cannot handle this message: {}", message); } else if (fsm.process(message)) { fsmRegistry.remove(syncId); if (fsm.hasNextAction() && fsm.next() != null) { @@ -880,7 +889,7 @@ private CompletableFuture runRequest( long syncId = allocateSyncIds(1); RequestStateMachine stateContext = new RequestStateMachine( - connection, syncId, request, resultPromise, opts, fsmRegistry, timerService); + connection, syncId, request, resultPromise, opts, fsmRegistry, timerService, handlers); // when completed stop timeout timer and metrics LongTaskTimer.Sample finalCurrentRequest = currentRequest; @@ -912,6 +921,12 @@ private void failAllRequests(Throwable ex) { private void processIgnoredResponse(IProtoResponse message) { ignoredPacketsHandler.accept(message); + + // Call onIgnoredResponse handler if present + if (handlers != null && handlers.getOnIgnoredResponse() != null) { + handlers.getOnIgnoredResponse().accept(message); + } + if (ignoredResponsesCounter != null) { ignoredResponsesCounter.increment(); } diff --git a/tarantool-core/src/main/java/io/tarantool/core/protocol/Handlers.java b/tarantool-core/src/main/java/io/tarantool/core/protocol/Handlers.java new file mode 100644 index 00000000..54331834 --- /dev/null +++ b/tarantool-core/src/main/java/io/tarantool/core/protocol/Handlers.java @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2025 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY + * All Rights Reserved. + */ + +package io.tarantool.core.protocol; + +import java.util.function.Consumer; + +/** + * Container for request/response lifecycle handlers. + * + *

Used for tracing and monitoring request lifecycle: + * + *

    + *
  • {@link #onBeforeSend} - called before sending request + *
  • {@link #onSuccess} - called on successful response + *
  • {@link #onTimeout} - called when request times out + *
  • {@link #onIgnoredResponse} - called when response arrives after timeout + *
+ * + * @author Artyom Dubinin + */ +public class Handlers { + + private final Consumer onBeforeSend; + private final Consumer onSuccess; + private final Consumer onTimeout; + private final Consumer onIgnoredResponse; + + private Handlers(Builder builder) { + this.onBeforeSend = builder.onBeforeSend; + this.onSuccess = builder.onSuccess; + this.onTimeout = builder.onTimeout; + this.onIgnoredResponse = builder.onIgnoredResponse; + } + + /** + * Creates a new builder for Handlers. + * + * @return Builder instance + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Returns handler called before sending request. + * + * @return handler or null if not set + */ + public Consumer getOnBeforeSend() { + return onBeforeSend; + } + + /** + * Returns handler called on successful response. + * + * @return handler or null if not set + */ + public Consumer getOnSuccess() { + return onSuccess; + } + + /** + * Returns handler called when request times out. + * + * @return handler or null if not set + */ + public Consumer getOnTimeout() { + return onTimeout; + } + + /** + * Returns handler called when response arrives after timeout. + * + * @return handler or null if not set + */ + public Consumer getOnIgnoredResponse() { + return onIgnoredResponse; + } + + /** Builder for Handlers. */ + public static class Builder { + private Consumer onBeforeSend; + private Consumer onSuccess; + private Consumer onTimeout; + private Consumer onIgnoredResponse; + + private Builder() {} + + /** + * Sets handler called before sending request. + * + * @param handler the handler + * @return this builder + */ + public Builder onBeforeSend(Consumer handler) { + this.onBeforeSend = handler; + return this; + } + + /** + * Sets handler called on successful response. + * + * @param handler the handler + * @return this builder + */ + public Builder onSuccess(Consumer handler) { + this.onSuccess = handler; + return this; + } + + /** + * Sets handler called when request times out. + * + * @param handler the handler + * @return this builder + */ + public Builder onTimeout(Consumer handler) { + this.onTimeout = handler; + return this; + } + + /** + * Sets handler called when response arrives after timeout. + * + * @param handler the handler + * @return this builder + */ + public Builder onIgnoredResponse(Consumer handler) { + this.onIgnoredResponse = handler; + return this; + } + + /** + * Builds Handlers instance. + * + * @return new Handlers instance + */ + public Handlers build() { + return new Handlers(this); + } + } +} diff --git a/tarantool-core/src/main/java/io/tarantool/core/protocol/IProtoRequestOpts.java b/tarantool-core/src/main/java/io/tarantool/core/protocol/IProtoRequestOpts.java index 3f91eefb..3b81d4a5 100644 --- a/tarantool-core/src/main/java/io/tarantool/core/protocol/IProtoRequestOpts.java +++ b/tarantool-core/src/main/java/io/tarantool/core/protocol/IProtoRequestOpts.java @@ -10,6 +10,7 @@ public class IProtoRequestOpts { private Consumer pushHandler; + private Handlers handlers; private long requestTimeoutMs; private Long streamId; @@ -53,4 +54,13 @@ public Long getStreamId() { public Consumer getPushHandler() { return this.pushHandler; } + + public IProtoRequestOpts withHandlers(Handlers handlers) { + this.handlers = handlers; + return this; + } + + public Handlers getHandlers() { + return this.handlers; + } } diff --git a/tarantool-core/src/main/java/io/tarantool/core/protocol/fsm/RequestStateMachine.java b/tarantool-core/src/main/java/io/tarantool/core/protocol/fsm/RequestStateMachine.java index 152ba929..1d84a25a 100644 --- a/tarantool-core/src/main/java/io/tarantool/core/protocol/fsm/RequestStateMachine.java +++ b/tarantool-core/src/main/java/io/tarantool/core/protocol/fsm/RequestStateMachine.java @@ -18,6 +18,7 @@ import io.tarantool.core.connection.Connection; import io.tarantool.core.exceptions.BoxError; +import io.tarantool.core.protocol.Handlers; import io.tarantool.core.protocol.IProtoMessage; import io.tarantool.core.protocol.IProtoRequest; import io.tarantool.core.protocol.IProtoRequestOpts; @@ -27,6 +28,7 @@ public class RequestStateMachine extends AbstractIProtoStateMachine { private static final Logger log = LoggerFactory.getLogger(RequestStateMachine.class); private final Consumer pushConsumer; + private final Handlers handlers; private final Timer timerService; private final long timeout; private final CompletableFuture promise; @@ -39,10 +41,12 @@ public RequestStateMachine( CompletableFuture promise, IProtoRequestOpts opts, Map fsmRegistry, - Timer timerService) { + Timer timerService, + Handlers requestHandlers) { super(connection, request, fsmRegistry); request.setSyncId(syncId); this.pushConsumer = opts.getPushHandler(); + this.handlers = requestHandlers; this.timerService = timerService; this.timeout = opts.getRequestTimeout(); this.promise = promise; @@ -51,14 +55,23 @@ public RequestStateMachine( @Override public void start() { + // Call onBeforeSend handler before sending the request + if (handlers != null && handlers.getOnBeforeSend() != null) { + handlers.getOnBeforeSend().accept(request); + } + // TODO: remove string format this.timer = runAfter( timeout, - () -> - kill( - new TimeoutException( - String.format("Request timeout: %s; timeout = %sms", request, timeout)))); + () -> { + if (handlers != null && handlers.getOnTimeout() != null) { + handlers.getOnTimeout().accept(request); + } + kill( + new TimeoutException( + String.format("Request timeout: %s; timeout = %sms", request, timeout))); + }); promise.whenComplete((iProtoResponse, throwable) -> this.timer.cancel()); @@ -86,6 +99,11 @@ public boolean process(IProtoResponse message) { return false; } + // Call onSuccess handler when response is received successfully + if (handlers != null && handlers.getOnSuccess() != null) { + handlers.getOnSuccess().accept(message); + } + if (message.isError()) { promise.completeExceptionally(BoxError.fromIProtoMessage(message)); } else { diff --git a/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java b/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java index 8c360e3c..2820a28c 100644 --- a/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java +++ b/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java @@ -28,6 +28,7 @@ import io.tarantool.core.ManagedResource; import io.tarantool.core.WatcherOptions; import io.tarantool.core.connection.ConnectionFactory; +import io.tarantool.core.protocol.Handlers; import io.tarantool.core.protocol.IProtoResponse; import io.tarantool.pool.exceptions.PoolClosedException; @@ -153,6 +154,8 @@ public class IProtoClientPoolImpl implements IProtoClientPool { */ private final TripleConsumer ignoredPacketsHandler; + private final Handlers handlers; + private final Object connectionPoolLock = new Object(); /** @@ -170,7 +173,7 @@ public class IProtoClientPoolImpl implements IProtoClientPool { * @param timerResource managed timer resource (ownership is defined by the caller) */ public IProtoClientPoolImpl(ConnectionFactory factory, ManagedResource timerResource) { - this(factory, timerResource, true, null, null, null, null, false, null); + this(factory, timerResource, true, null, null, null, null, null, false, null); } /** @@ -200,7 +203,7 @@ public IProtoClientPoolImpl(ConnectionFactory factory, Timer timerService) { */ public IProtoClientPoolImpl( ConnectionFactory factory, ManagedResource timerResource, boolean gracefulShutdown) { - this(factory, timerResource, gracefulShutdown, null, null, null, null, false, null); + this(factory, timerResource, gracefulShutdown, null, null, null, null, null, false, null); } /** @@ -223,7 +226,17 @@ public IProtoClientPoolImpl( ManagedResource timerResource, boolean gracefulShutdown, HeartbeatOpts heartbeatOpts) { - this(factory, timerResource, gracefulShutdown, heartbeatOpts, null, null, null, false, null); + this( + factory, + timerResource, + gracefulShutdown, + heartbeatOpts, + null, + null, + null, + null, + false, + null); } /** @@ -253,6 +266,7 @@ public IProtoClientPoolImpl( watcherOpts, metricsRegistry, null, + null, false, null); } @@ -273,6 +287,49 @@ public IProtoClientPoolImpl( * arguments: a first one is a tag of connection, the second one is an index of connection in * group and the third argument is a packet. * @param useTupleExtension Use TUPLE_EXT feature if true. + * @param poolEventListener optional pool event listener + */ + public IProtoClientPoolImpl( + ConnectionFactory factory, + ManagedResource timerResource, + boolean gracefulShutdown, + HeartbeatOpts heartbeatOpts, + WatcherOptions watcherOpts, + MeterRegistry metricsRegistry, + TripleConsumer ignoredPacketsHandler, + boolean useTupleExtension, + PoolEventListener poolEventListener) { + this( + factory, + timerResource, + gracefulShutdown, + heartbeatOpts, + watcherOpts, + metricsRegistry, + ignoredPacketsHandler, + null, + useTupleExtension, + poolEventListener); + } + + /** + * Constructor for pool instance. + * + * @param factory the bootstrap + * @param timerResource managed timer resource (ownership is defined by the caller) + * @param gracefulShutdown a boolean flag switching gracefulShutdown facility + * @param heartbeatOpts an object with options for heartbeats. If presented heartbeats will be + * used. + * @param watcherOpts an object with options for watchers + * @param metricsRegistry an instance of MeterRegistry containing all necessary counters and + * gauges. + * @param ignoredPacketsHandler a lambda for accepting ignored packets and handling them somehow. + * It is an instance of {@link io.tarantool.pool.TripleConsumer} which accepts three + * arguments: a first one is a tag of connection, the second one is an index of connection in + * group and the third argument is a packet. + * @param handlers handlers for request/response lifecycle events + * @param useTupleExtension Use TUPLE_EXT feature if true. + * @param poolEventListener optional pool event listener */ public IProtoClientPoolImpl( ConnectionFactory factory, @@ -282,6 +339,7 @@ public IProtoClientPoolImpl( WatcherOptions watcherOpts, MeterRegistry metricsRegistry, TripleConsumer ignoredPacketsHandler, + Handlers handlers, boolean useTupleExtension, PoolEventListener poolEventListener) { this.factory = factory; @@ -299,6 +357,7 @@ public IProtoClientPoolImpl( this.totalSize = 0; this.metricsRegistry = metricsRegistry; this.ignoredPacketsHandler = ignoredPacketsHandler; + this.handlers = handlers; this.useTupleExtension = useTupleExtension; this.poolEventListener = poolEventListener; @@ -501,6 +560,7 @@ private void expandGroup(List connects, InstanceConnectionGroup group reconnecting, metricsRegistry, ignoredPacketsHandler, + handlers, useTupleExtension, poolEventListener)); } diff --git a/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java b/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java index dafbadaf..344489ba 100644 --- a/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java +++ b/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java @@ -28,6 +28,7 @@ import io.tarantool.core.connection.ConnectionCloseEvent; import io.tarantool.core.connection.ConnectionFactory; import io.tarantool.core.connection.exceptions.ConnectionException; +import io.tarantool.core.protocol.Handlers; import io.tarantool.core.protocol.IProtoRequestOpts; import io.tarantool.core.protocol.IProtoResponse; @@ -86,6 +87,9 @@ final class PoolEntry { */ private final IProtoRequestOpts heartbeatPingOpts; + /** Handlers for request/response lifecycle events. */ + private final Handlers handlers; + /** Instance of {@link io.tarantool.pool.InstanceConnectionGroup}. */ private final InstanceConnectionGroup group; @@ -230,9 +234,11 @@ public PoolEntry( AtomicInteger reconnecting, MeterRegistry registry, TripleConsumer ignoredPacketsHandler, + Handlers handlers, boolean useTupleExtension, PoolEventListener poolEventListener) { this.metricsRegistry = registry; + this.handlers = handlers; this.client = new IProtoClientImpl( factory, @@ -241,6 +247,7 @@ public PoolEntry( registry, group.getFlushConsolidationHandler(), useTupleExtension); + this.client.withHandlers(handlers); // heartbeat related this.isHeartbeatStarted = false; this.lastHeartbeatEvent = HeartbeatEvent.KILL;