From 5833e065a8b0b55b5f8b80115dbc0256b05f4c11 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Wed, 22 Jan 2025 23:09:37 +0800 Subject: [PATCH 1/2] optimize the delay RPC future --- .../rpc/contrib/client/PulsarRpcClient.java | 57 +++++++++ .../contrib/client/PulsarRpcClientImpl.java | 94 +++++++++++---- .../rpc/contrib/client/ReplyListener.java | 12 +- .../pulsar/rpc/contrib/common/Constants.java | 1 + .../rpc/contrib/server/ReplySender.java | 22 ++-- .../rpc/contrib/server/RequestListener.java | 12 +- .../pulsar/rpc/contrib/SimpleRpcCallTest.java | 112 ++++++++++++++++++ .../base/SingletonPulsarContainer.java | 1 + 8 files changed, 270 insertions(+), 41 deletions(-) diff --git a/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/PulsarRpcClient.java b/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/PulsarRpcClient.java index 4925bfb..3da5528 100644 --- a/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/PulsarRpcClient.java +++ b/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/PulsarRpcClient.java @@ -16,6 +16,7 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import lombok.NonNull; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TypedMessageBuilder; @@ -88,6 +89,62 @@ default CompletableFuture requestAsync(String correlationId, T value) { */ CompletableFuture requestAsync(String correlationId, T value, Map config); + /** + * Deliver the message only at or after the specified absolute timestamp. + * Asynchronously sends a request and returns a future that completes with the reply. + * + * @param correlationId A unique identifier for the request. + * @param value The value used to generate the request message + * @param timestamp Absolute timestamp indicating when the message should be delivered to rpc-server. + * @return A CompletableFuture that will complete with the reply value. + */ + default CompletableFuture requestAtAsync(String correlationId, T value, long timestamp) { + return requestAtAsync(correlationId, value, Collections.emptyMap(), timestamp); + } + + /** + * Deliver the message only at or after the specified absolute timestamp. + * Asynchronously sends a request and returns a future that completes with the reply. + * + * @param correlationId A unique identifier for the request. + * @param value The value used to generate the request message + * @param config Configuration map for creating a request producer, + * will call {@link TypedMessageBuilder#loadConf(Map)} + * @param timestamp Absolute timestamp indicating when the message should be delivered to rpc-server. + * @return A CompletableFuture that will complete with the reply value. + */ + CompletableFuture requestAtAsync(String correlationId, T value, Map config, + long timestamp); + + /** + * Request to deliver the message only after the specified relative delay. + * Asynchronously sends a request and returns a future that completes with the reply. + * + * @param correlationId A unique identifier for the request. + * @param value The value used to generate the request message + * @param delay The amount of delay before the message will be delivered. + * @param unit The time unit for the delay. + * @return A CompletableFuture that will complete with the reply value. + */ + default CompletableFuture requestAfterAsync(String correlationId, T value, long delay, TimeUnit unit) { + return requestAfterAsync(correlationId, value, Collections.emptyMap(), delay, unit); + } + + /** + * Request to deliver the message only after the specified relative delay. + * Asynchronously sends a request and returns a future that completes with the reply. + * + * @param correlationId A unique identifier for the request. + * @param value The value used to generate the request message + * @param config Configuration map for creating a request producer, + * will call {@link TypedMessageBuilder#loadConf(Map)} + * @param delay The amount of delay before the message will be delivered. + * @param unit The time unit for the delay. + * @return A CompletableFuture that will complete with the reply value. + */ + CompletableFuture requestAfterAsync(String correlationId, T value, Map config, + long delay, TimeUnit unit); + /** * Removes a request from the tracking map based on its correlation ID. * diff --git a/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/PulsarRpcClientImpl.java b/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/PulsarRpcClientImpl.java index 97fba89..adb5a63 100644 --- a/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/PulsarRpcClientImpl.java +++ b/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/PulsarRpcClientImpl.java @@ -14,6 +14,7 @@ package org.apache.pulsar.rpc.contrib.client; import static lombok.AccessLevel.PACKAGE; +import static org.apache.pulsar.rpc.contrib.common.Constants.REQUEST_DELIVER_AT_TIME; import java.io.IOException; import java.time.Duration; import java.util.Map; @@ -104,6 +105,7 @@ public void close() throws PulsarRpcClientException { } } + @Override public V request(String correlationId, T value, Map config) throws PulsarRpcClientException { try { return requestAsync(correlationId, value, config).get(); @@ -113,36 +115,80 @@ public V request(String correlationId, T value, Map config) thro } } + @Override public CompletableFuture requestAsync(String correlationId, T value, Map config) { + return internalRequest(correlationId, value, config, -1, -1, null); + } + + @Override + public CompletableFuture requestAtAsync(String correlationId, T value, Map config, + long timestamp) { + return internalRequest(correlationId, value, config, timestamp, -1, null); + } + + @Override + public CompletableFuture requestAfterAsync(String correlationId, T value, Map config, + long delay, TimeUnit unit) { + return internalRequest(correlationId, value, config, -1, delay, unit); + } + + private CompletableFuture internalRequest(String correlationId, T value, Map config, + long timestamp, long delay, TimeUnit unit) { CompletableFuture replyFuture = new CompletableFuture<>(); long replyTimeoutMillis = replyTimeout.toMillis(); - replyFuture.orTimeout(replyTimeoutMillis, TimeUnit.MILLISECONDS) - .exceptionally(e -> { - replyFuture.completeExceptionally(new PulsarRpcClientException(e.getMessage())); - callback.onTimeout(correlationId, e); - removeRequest(correlationId); - return null; - }); - pendingRequestsMap.put(correlationId, replyFuture); - TypedMessageBuilder requestMessage = newRequestMessage(correlationId, value, config); + if (timestamp == -1 && delay == -1) { + replyFuture.orTimeout(replyTimeoutMillis, TimeUnit.MILLISECONDS) + .exceptionally(e -> { + replyFuture.completeExceptionally(new PulsarRpcClientException(e.getMessage())); + callback.onTimeout(correlationId, e); + removeRequest(correlationId); + return null; + }); + pendingRequestsMap.put(correlationId, replyFuture); - sender.sendRequest(requestMessage, replyTimeoutMillis) - .thenAccept(requestMessageId -> { - if (replyFuture.isCancelled() || replyFuture.isCompletedExceptionally()) { + sender.sendRequest(requestMessage, replyTimeoutMillis) + .thenAccept(requestMessageId -> { + if (replyFuture.isCancelled() || replyFuture.isCompletedExceptionally()) { + removeRequest(correlationId); + } else { + callback.onSendRequestSuccess(correlationId, requestMessageId); + } + }).exceptionally(ex -> { + if (callback != null) { + callback.onSendRequestError(correlationId, ex, replyFuture); + } else { + replyFuture.completeExceptionally(new PulsarRpcClientException(ex.getMessage())); + } removeRequest(correlationId); - } else { - callback.onSendRequestSuccess(correlationId, requestMessageId); - } - }).exceptionally(ex -> { - if (callback != null) { - callback.onSendRequestError(correlationId, ex, replyFuture); - } else { - replyFuture.completeExceptionally(new PulsarRpcClientException(ex.getMessage())); - } - removeRequest(correlationId); - return null; - }); + return null; + }); + } else { + // Handle Delayed RPC. + if (pendingRequestsMap.containsKey(correlationId)) { + removeRequest(correlationId); + } + + if (timestamp > 0) { + requestMessage.property(REQUEST_DELIVER_AT_TIME, String.valueOf(timestamp)); + requestMessage.deliverAt(timestamp); + } else if (delay > 0 && unit != null) { + String delayedAt = String.valueOf(System.currentTimeMillis() + unit.toMillis(delay)); + requestMessage.property(REQUEST_DELIVER_AT_TIME, delayedAt); + requestMessage.deliverAfter(delay, unit); + } + sender.sendRequest(requestMessage, replyTimeoutMillis).thenAccept(requestMessageId -> { + callback.onSendRequestSuccess(correlationId, requestMessageId); + }).exceptionally(ex -> { + if (callback != null) { + callback.onSendRequestError(correlationId, ex, replyFuture); + } else { + replyFuture.completeExceptionally(new PulsarRpcClientException(ex.getMessage())); + } + return null; + }); + } + return replyFuture; } diff --git a/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/ReplyListener.java b/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/ReplyListener.java index 8948994..326fe68 100644 --- a/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/ReplyListener.java +++ b/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/ReplyListener.java @@ -14,6 +14,8 @@ package org.apache.pulsar.rpc.contrib.client; import static org.apache.pulsar.rpc.contrib.common.Constants.ERROR_MESSAGE; +import static org.apache.pulsar.rpc.contrib.common.Constants.REQUEST_DELIVER_AT_TIME; +import static org.apache.pulsar.rpc.contrib.common.Constants.SERVER_SUB; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import lombok.AccessLevel; @@ -50,17 +52,19 @@ class ReplyListener implements MessageListener { public void received(Consumer consumer, Message msg) { String correlationId = msg.getKey(); try { - if (!pendingRequestsMap.containsKey(correlationId)) { + if (!pendingRequestsMap.containsKey(correlationId) && !msg.hasProperty(REQUEST_DELIVER_AT_TIME)) { log.warn("[{}] [{}] No pending request found for correlationId {}." + " This may indicate the message has already been processed or timed out.", consumer.getTopic(), consumer.getConsumerName(), correlationId); } else { - CompletableFuture future = pendingRequestsMap.get(correlationId); + CompletableFuture future = pendingRequestsMap.computeIfAbsent(correlationId, + key -> new CompletableFuture<>()); String errorMessage = msg.getProperty(ERROR_MESSAGE); + String serverSub = msg.getProperty(SERVER_SUB); if (errorMessage != null) { - callBack.onReplyError(correlationId, consumer.getSubscription(), errorMessage, future); + callBack.onReplyError(correlationId, serverSub, errorMessage, future); } else { - callBack.onReplySuccess(correlationId, consumer.getSubscription(), msg.getValue(), future); + callBack.onReplySuccess(correlationId, serverSub, msg.getValue(), future); } } } finally { diff --git a/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/common/Constants.java b/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/common/Constants.java index 476bd34..b51d943 100644 --- a/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/common/Constants.java +++ b/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/common/Constants.java @@ -21,4 +21,5 @@ public class Constants { public static final String REPLY_TOPIC = "replyTopic"; public static final String ERROR_MESSAGE = "errorMessage"; public static final String SERVER_SUB = "serverSub"; + public static final String REQUEST_DELIVER_AT_TIME = "deliverAt"; } diff --git a/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/server/ReplySender.java b/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/server/ReplySender.java index 7ed9b67..efb44e3 100644 --- a/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/server/ReplySender.java +++ b/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/server/ReplySender.java @@ -14,6 +14,7 @@ package org.apache.pulsar.rpc.contrib.server; import static org.apache.pulsar.rpc.contrib.common.Constants.ERROR_MESSAGE; +import static org.apache.pulsar.rpc.contrib.common.Constants.REQUEST_DELIVER_AT_TIME; import static org.apache.pulsar.rpc.contrib.common.Constants.SERVER_SUB; import java.util.function.BiConsumer; import lombok.AccessLevel; @@ -50,8 +51,9 @@ class ReplySender { * @param sub The subscriber name involved in this interaction. */ @SneakyThrows - void sendReply(String topic, String correlationId, V reply, T value, String sub) { - onSend(topic, correlationId, msg -> msg.value(reply), value, sub); + void sendReply(String topic, String correlationId, V reply, T value, String sub, + long delayedAt) { + onSend(topic, correlationId, msg -> msg.value(reply), value, sub, delayedAt); } /** @@ -64,8 +66,10 @@ void sendReply(String topic, String correlationId, V reply, T value, String sub) * @param sub The subscriber name involved in this interaction. */ @SneakyThrows - void sendErrorReply(String topic, String correlationId, String errorMessage, T value, String sub) { - onSend(topic, correlationId, msg -> msg.property(ERROR_MESSAGE, errorMessage).value(null), value, sub); + void sendErrorReply(String topic, String correlationId, String errorMessage, T value, String sub, + long delayedAt) { + onSend(topic, correlationId, msg -> msg.property(ERROR_MESSAGE, errorMessage).value(null), + value, sub, delayedAt); } /** @@ -79,17 +83,17 @@ void sendErrorReply(String topic, String correlationId, String errorMessage, T v * @param sub The subscriber name to be included in the message metadata. */ @SneakyThrows - void onSend(String topic, - String correlationId, - java.util.function.Consumer> consumer, - T value, - String sub) { + void onSend(String topic, String correlationId, java.util.function.Consumer> consumer, + T value, String sub, long delayedAt) { log.debug("Sending {}", correlationId); Producer producer = pool.borrowObject(topic); try { TypedMessageBuilder builder = producer.newMessage() .key(correlationId) .property(SERVER_SUB, sub); + if (delayedAt > 0) { + builder.property(REQUEST_DELIVER_AT_TIME, String.valueOf(delayedAt)); + } consumer.accept(builder); builder.sendAsync() .exceptionally(e -> { diff --git a/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/server/RequestListener.java b/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/server/RequestListener.java index e06930a..3f0f0f7 100644 --- a/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/server/RequestListener.java +++ b/pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/server/RequestListener.java @@ -14,6 +14,7 @@ package org.apache.pulsar.rpc.contrib.server; import static org.apache.pulsar.rpc.contrib.common.Constants.REPLY_TOPIC; +import static org.apache.pulsar.rpc.contrib.common.Constants.REQUEST_DELIVER_AT_TIME; import static org.apache.pulsar.rpc.contrib.common.Constants.REQUEST_TIMEOUT_MILLIS; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -53,7 +54,7 @@ class RequestListener implements MessageListener { public void received(Consumer consumer, Message msg) { long replyTimeout = Long.parseLong(msg.getProperty(REQUEST_TIMEOUT_MILLIS)) - (System.currentTimeMillis() - msg.getPublishTime()); - if (replyTimeout <= 0) { + if (replyTimeout <= 0 && !msg.hasProperty(REQUEST_DELIVER_AT_TIME)) { consumer.acknowledgeAsync(msg); return; } @@ -62,12 +63,15 @@ public void received(Consumer consumer, Message msg) { String requestSubscription = consumer.getSubscription(); String replyTopic = msg.getProperty(REPLY_TOPIC); T value = msg.getValue(); - + long delayedAt = msg.hasProperty(REQUEST_DELIVER_AT_TIME) + ? Long.parseLong(msg.getProperty(REQUEST_DELIVER_AT_TIME)) + + Long.parseLong(msg.getProperty(REQUEST_TIMEOUT_MILLIS)) + : 0; try { requestFunction.apply(value) .orTimeout(replyTimeout, TimeUnit.MILLISECONDS) .thenAccept(reply -> { - sender.sendReply(replyTopic, correlationId, reply, value, requestSubscription); + sender.sendReply(replyTopic, correlationId, reply, value, requestSubscription, delayedAt); }) .get(); } catch (ExecutionException e) { @@ -79,7 +83,7 @@ public void received(Consumer consumer, Message msg) { log.error("[{}] Error processing request", correlationId, e); sender.sendErrorReply(replyTopic, correlationId, cause.getClass().getName() + ": " + cause.getMessage(), - value, requestSubscription); + value, requestSubscription, delayedAt); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/SimpleRpcCallTest.java b/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/SimpleRpcCallTest.java index 8acffe1..b8839de 100644 --- a/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/SimpleRpcCallTest.java +++ b/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/SimpleRpcCallTest.java @@ -436,6 +436,118 @@ public void testRpcCallProcessFailedOnServerSide() throws Exception { Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> resultMap.size() == messageNum * 2); } + @Test + public void testDelayedRpcAt() throws Exception { + setupTopic("testDelayedRpcAt"); + Map requestProducerConfigMap = new HashMap<>(); + requestProducerConfigMap.put("producerName", "requestProducer"); + requestProducerConfigMap.put("messageRoutingMode", MessageRoutingMode.RoundRobinPartition); + + Map resultMap = new ConcurrentHashMap<>(); + final int ackNums = 2; + + RequestCallBack callBack = new RequestCallBack<>() { + @Override + public void onSendRequestSuccess(String correlationId, MessageId messageId) { + log.info(" CorrelationId[{}] Send request message success. MessageId: {}", + correlationId, messageId); + } + + @Override + public void onSendRequestError(String correlationId, Throwable t, + CompletableFuture replyFuture) { + log.warn(" CorrelationId[{}] Send request message failed. {}", + correlationId, t.getMessage()); + replyFuture.completeExceptionally(t); + } + + @Override + public void onReplySuccess(String correlationId, String subscription, + TestReply value, CompletableFuture replyFuture) { + log.info(" CorrelationId[{}] Subscription[{}] Receive reply message success. Value: {}", + correlationId, subscription, value); + if (resultMap.get(correlationId).getAndIncrement() == ackNums - 1) { + rpcClient.removeRequest(correlationId); + } + replyFuture.complete(value); + } + + @Override + public void onReplyError(String correlationId, String subscription, + String errorMessage, CompletableFuture replyFuture) { + log.warn(" CorrelationId[{}] Subscription[{}] Receive reply message failed. {}", + correlationId, subscription, errorMessage); + replyFuture.completeExceptionally(new Exception(errorMessage)); + } + + @Override + public void onTimeout(String correlationId, Throwable t) { + log.warn(" CorrelationId[{}] Receive reply message timed out. {}", + correlationId, t.getMessage()); + } + + @Override + public void onReplyMessageAckFailed(String correlationId, Consumer consumer, + Message msg, Throwable t) { + consumer.acknowledgeAsync(msg.getMessageId()).exceptionally(ex -> { + log.warn(" [{}] [{}] Acknowledging message {} failed again.", + msg.getTopicName(), correlationId, msg.getMessageId(), ex); + return null; + }); + } + }; + + rpcClient = createPulsarRpcClient(pulsarClient, requestProducerConfigMap, null, callBack); + + final int defaultEpoch = 1; + AtomicInteger epoch = new AtomicInteger(defaultEpoch); + // What do we do when we receive the request message + requestFunction = request -> { + epoch.getAndIncrement(); + return CompletableFuture.completedFuture(new TestReply(request.value() + "-----------done")); + }; + // If the server side is stateful, an error occurs after the server side executes 3-1, and a mechanism for + // checking and rolling back needs to be provided. + rollbackFunction = (id, request) -> { + if (epoch.get() != defaultEpoch) { + epoch.set(defaultEpoch); + } + }; + PulsarRpcServer rpcServer1 = createPulsarRpcServer(pulsarClient, requestSubBase + "-1", + requestFunction, rollbackFunction, null); + PulsarRpcServer rpcServer2 = createPulsarRpcServer(pulsarClient, requestSubBase + "-2", + requestFunction, rollbackFunction, null); + PulsarRpcServer rpcServer3 = createPulsarRpcServer(pulsarClient, requestSubBase + "-3", + requestFunction, rollbackFunction, null); + + long delayedTime = 5000; + + Map requestMessageConfigMap = new HashMap<>(); + requestMessageConfigMap.put(TypedMessageBuilder.CONF_DISABLE_REPLICATION, true); + for (int i = 0; i < messageNum; i++) { + String correlationId = correlationIdSupplier.get(); + TestRequest message = new TestRequest(asynchronousMessage + i); + long eventTime = System.currentTimeMillis(); + requestMessageConfigMap.put(TypedMessageBuilder.CONF_EVENT_TIME, eventTime); + resultMap.put(correlationId, new AtomicInteger()); + rpcClient.requestAfterAsync(correlationId, message, requestMessageConfigMap, + delayedTime, TimeUnit.MILLISECONDS); + } + long current = System.currentTimeMillis(); + + Awaitility.await().atMost(20, TimeUnit.SECONDS).until(() -> { + AtomicInteger success = new AtomicInteger(); + resultMap.forEach((__, count) -> success.getAndAdd(count.get())); + if (System.currentTimeMillis() - current < delayedTime && success.get() > 0) { + return false; + } + return success.get() >= messageNum * ackNums && System.currentTimeMillis() - current >= delayedTime; + }); + rpcServer1.close(); + rpcServer2.close(); + rpcServer3.close(); + } + private PulsarRpcClient createPulsarRpcClient( PulsarClient pulsarClient, Map requestProducerConfigMap, Pattern replyTopicsPattern, RequestCallBack callBack) throws PulsarRpcClientException { diff --git a/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/base/SingletonPulsarContainer.java b/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/base/SingletonPulsarContainer.java index be34654..c418ab9 100644 --- a/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/base/SingletonPulsarContainer.java +++ b/pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/base/SingletonPulsarContainer.java @@ -31,6 +31,7 @@ public class SingletonPulsarContainer { static { PULSAR_CONTAINER = new PulsarContainer(getPulsarImage()) .withEnv("PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled", "true") + .withEnv("PULSAR_PREFIX_delayedDeliveryEnabled", "true") .withStartupTimeout(Duration.ofMinutes(3)); PULSAR_CONTAINER.start(); } From e9ae275be15813deffb35606893167b30b21629d Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Wed, 29 Jan 2025 23:23:06 +0800 Subject: [PATCH 2/2] PCIP-4 Improve delayed RPC message handling in pulsar-rpc --- pcip/pcip-4.md | 342 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 342 insertions(+) create mode 100644 pcip/pcip-4.md diff --git a/pcip/pcip-4.md b/pcip/pcip-4.md new file mode 100644 index 0000000..ebbdb6b --- /dev/null +++ b/pcip/pcip-4.md @@ -0,0 +1,342 @@ +# PCIP-4: Improve delayed RPC message handling in pulsar-rpc + +# Background knowledge + +In the Pulsar RPC framework, the client typically places an RPC request into a `pendingRequestsMap`. When the server processes this request, it sends back a reply, which in turn matches the original request in `pendingRequestsMap`. If there is no corresponding pending request, the reply is “discarded” and no result is returned to the caller. + +However, when an RPC request is delayed (e.g., not delivered until a scheduled time in the future), the original design would require leaving the request in the `pendingRequestsMap` for an extended period. This can cause potential resource and management overhead, especially when the delay is long. + +# Motivation + +We want to enable the **delayed sending** of RPC requests, where: + +- The request can be sent immediately, but only delivered to the server at or after a certain timestamp +- The request can be sent to the server only after a specified relative delay. + +In such cases, holding the request in the `pendingRequestsMap` for a long time (until the server receives and processes the message) is suboptimal and leads to potential inefficiencies or timeouts. Therefore, we introduce a change in behavior to avoid adding these delayed requests to the `pendingRequestsMap` immediately on the client side. + +When the delayed request finally arrives at the server and the server sends a reply, the reply still needs to be matched to the original request ID. Hence, we create a mechanism so that, upon receiving the delayed reply on the client side, the client reconstructs the pending request context on demand (i.e., adding it into the `pendingRequestsMap` just in time) and completes the callback flow. + +# Goals + +## In Scope + +- Provide APIs to schedule an RPC request at a specified absolute time (`requestAtAsync`) or after a specified relative delay (`requestAfterAsync`). +- Adjust the Pulsar RPC client flow so that **delayed** requests are **not** tracked in `pendingRequestsMap` immediately, thereby reducing resource usage. +- Ensure the **reply** for a delayed request can still be correlated, even if it arrives much later. + +## Out of Scope + +- Enhancements to Pulsar’s core delayed-delivery mechanism. The changes here rely on Pulsar’s existing delayed-delivery support. +- Detailed transaction or rollback logic on the client side for delayed requests. That remains part of user-defined logic. + +# High Level Design + +When the user calls: +- `requestAtAsync(correlationId, value, timestamp)` or +- `requestAfterAsync(correlationId, value, delay, TimeUnit unit)` + +the client **immediately** sends a message to the request topic with additional properties indicating delayed delivery (`deliverAt(...)` or `deliverAfter(...)`). However, it **does not** store the request in the `pendingRequestsMap`. Instead, it simply sends the message to Pulsar with the appropriate delay/timestamp. + +When the server eventually processes the message and responds, the client’s `ReplyListener` checks if this reply corresponds to a **delayed** request by looking at specific properties (e.g. `REQUEST_DELIVER_AT_TIME`). If so, it **lazily** puts the request into the `pendingRequestsMap`, completes any callback logic, and removes the request. + +# Detailed Design + +## Design & Implementation Details + +Below is an overview of the **key changes** from the code: + +1. **New Methods in `PulsarRpcClient`:** + +- requestAtAsync(String correlationId, T value, long timestamp) +- requestAfterAsync(String correlationId, T value, long delay, TimeUnit unit) +- And their overloads that accept Map config. + +These methods allow the client to specify absolute (requestAtAsync) or relative (requestAfterAsync) delivery times. + +2. **Implementation in `PulsarRpcClientImpl`:** + +```java + @Override + public CompletableFuture requestAtAsync(String correlationId, T value, Map config, + long timestamp) { + return internalRequest(correlationId, value, config, timestamp, -1, null); + } + + @Override + public CompletableFuture requestAfterAsync(String correlationId, T value, Map config, + long delay, TimeUnit unit) { + return internalRequest(correlationId, value, config, -1, delay, unit); + } + + private CompletableFuture internalRequest(String correlationId, T value, Map config, + long timestamp, long delay, TimeUnit unit) { + CompletableFuture replyFuture = new CompletableFuture<>(); + // ... + if (timestamp == -1 && delay == -1) { + // Normal, non-delayed behavior: add request to pendingRequestsMap and handle as before. + } else { + // Delayed behavior: do NOT immediately add to pendingRequestsMap. + // Instead, set deliverAt or deliverAfter properties. Send message. + // The actual correlation is done when a reply arrives. + } + return replyFuture; + } +``` + +- If not delayed (both timestamp == -1 and delay == -1), the client uses the old path of tracking in pendingRequestsMap. +- If the request is delayed, it sets special properties (e.g. REQUEST_DELIVER_AT_TIME) and does not add the request to pendingRequestsMap. + +3. Server-Side and Reply Handling: + +- A new property REQUEST_DELIVER_AT_TIME is used to record the scheduled delivery time. +- If the server sees a request containing REQUEST_DELIVER_AT_TIME, it processes it after the actual delivery time is reached (relying on the broker’s delayed-delivery feature). +- When the server responds, it includes the same property for the client’s reference. +- On the client side, ReplyListener checks if REQUEST_DELIVER_AT_TIME is present: + +```java + if (!pendingRequestsMap.containsKey(correlationId) && !msg.hasProperty(REQUEST_DELIVER_AT_TIME)) { + // This is a normal message with no matching pending request + ... + } else { + // If it’s a delayed message, we computeIfAbsent in the map and proceed with callback logic + } +``` + +Delayed replies are inserted into the map, and the user’s callback (`RequestCallBack.onReplySuccess`, etc.) is triggered. + +## Public-facing Changes + +### Public API + +New methods in PulsarRpcClient: + +```java + /** + * Deliver the message only at or after the specified absolute timestamp. + * Asynchronously sends a request and returns a future that completes with the reply. + * + * @param correlationId A unique identifier for the request. + * @param value The value used to generate the request message + * @param timestamp Absolute timestamp indicating when the message should be delivered to rpc-server. + * @return A CompletableFuture that will complete with the reply value. + */ + default CompletableFuture requestAtAsync(String correlationId, T value, long timestamp) { + return requestAtAsync(correlationId, value, Collections.emptyMap(), timestamp); + } + + /** + * Deliver the message only at or after the specified absolute timestamp. + * Asynchronously sends a request and returns a future that completes with the reply. + * + * @param correlationId A unique identifier for the request. + * @param value The value used to generate the request message + * @param config Configuration map for creating a request producer, + * will call {@link TypedMessageBuilder#loadConf(Map)} + * @param timestamp Absolute timestamp indicating when the message should be delivered to rpc-server. + * @return A CompletableFuture that will complete with the reply value. + */ + CompletableFuture requestAtAsync(String correlationId, T value, Map config, + long timestamp); + + /** + * Request to deliver the message only after the specified relative delay. + * Asynchronously sends a request and returns a future that completes with the reply. + * + * @param correlationId A unique identifier for the request. + * @param value The value used to generate the request message + * @param delay The amount of delay before the message will be delivered. + * @param unit The time unit for the delay. + * @return A CompletableFuture that will complete with the reply value. + */ + default CompletableFuture requestAfterAsync(String correlationId, T value, long delay, TimeUnit unit) { + return requestAfterAsync(correlationId, value, Collections.emptyMap(), delay, unit); + } + + /** + * Request to deliver the message only after the specified relative delay. + * Asynchronously sends a request and returns a future that completes with the reply. + * + * @param correlationId A unique identifier for the request. + * @param value The value used to generate the request message + * @param config Configuration map for creating a request producer, + * will call {@link TypedMessageBuilder#loadConf(Map)} + * @param delay The amount of delay before the message will be delivered. + * @param unit The time unit for the delay. + * @return A CompletableFuture that will complete with the reply value. + */ + CompletableFuture requestAfterAsync(String correlationId, T value, Map config, + long delay, TimeUnit unit); +``` + +### Configuration + +### CLI + +# Get started + +## Quick Start + +Below is a practical example of delayed RPC using requestAfterAsync, referencing a test scenario similar to testDelayedRpcAt from our SimpleRpcCallTest. + +1. Create a PulsarClient, define schemas, and initialize topics: + +```java + PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl("pulsar://localhost:6650") + .build(); + + Schema requestSchema = Schema.JSON(TestRequest.class); + Schema replySchema = Schema.JSON(TestReply.class); + + String requestTopic = "testDelayedRpcAt-request"; + String replySubscription = "testDelayedRpcAt-reply-sub"; + + final int ackNums = 2; // For demonstration: we want to remove the request after we see replies multiple times. + final int messageNum = 5; // Example count of messages to send + final long delayedTime = 5000; // 5 seconds delay +``` + +2. Implement a RequestCallBack to handle events like “send success”, “reply success/error”, and “timeout”: + +```java +// Example records: +public record TestRequest(String value) { } +public record TestReply(String value) { } + +Map resultMap = new ConcurrentHashMap<>(); + +// This callback will track how many times each correlationId sees a reply. +RequestCallBack callBack = new RequestCallBack<>() { + @Override + public void onSendRequestSuccess(String correlationId, MessageId messageId) { + log.info(" CorrelationId[{}] Send request message success. MessageId: {}", + correlationId, messageId); + } + + @Override + public void onSendRequestError(String correlationId, Throwable t, + CompletableFuture replyFuture) { + log.warn(" CorrelationId[{}] failed. {}", + correlationId, t.getMessage()); + replyFuture.completeExceptionally(t); + } + + @Override + public void onReplySuccess(String correlationId, String subscription, + TestReply value, CompletableFuture replyFuture) { + log.info(" CorrelationId[{}] Subscription[{}] Value: {}", + correlationId, subscription, value); + // Count the number of times we've successfully received a reply for this correlationId. + if (resultMap.get(correlationId).getAndIncrement() == ackNums - 1) { + // Once we hit ackNums, remove the request from the pending map + rpcClient.removeRequest(correlationId); + } + replyFuture.complete(value); + } + + @Override + public void onReplyError(String correlationId, String subscription, + String errorMessage, CompletableFuture replyFuture) { + log.warn(" CorrelationId[{}] Subscription[{}] Error: {}", + correlationId, subscription, errorMessage); + replyFuture.completeExceptionally(new Exception(errorMessage)); + } + + @Override + public void onTimeout(String correlationId, Throwable t) { + log.warn(" CorrelationId[{}] Timed out. {}", correlationId, t.getMessage()); + } + + @Override + public void onReplyMessageAckFailed(String correlationId, Consumer consumer, + Message msg, Throwable t) { + consumer.acknowledgeAsync(msg.getMessageId()).exceptionally(ex -> { + log.warn(" Acknowledging message {} failed again.", msg.getMessageId(), ex); + return null; + }); + } +}; +``` + +3. Create the RPC Client using requestCallBack(callBack): + +```java +Map requestProducerConfigMap = new HashMap<>(); +requestProducerConfigMap.put("producerName", "requestProducer"); +requestProducerConfigMap.put("messageRoutingMode", MessageRoutingMode.RoundRobinPartition); + +// Build the RPC client +PulsarRpcClient rpcClient = + PulsarRpcClient.builder(requestSchema, replySchema) + .requestTopic(requestTopic) + .requestProducerConfig(requestProducerConfigMap) + .replySubscription(replySubscription) + .replyTimeout(Duration.ofSeconds(10)) + .requestCallBack(callBack) // Set the callback + .build(pulsarClient); +``` + +4. Create an RPC Server (or multiple servers) that subscribes to the request topic, processes incoming messages, and sends replies: + +```java +// Example of request processing logic: +AtomicInteger epoch = new AtomicInteger(1); + +Function> requestFunction = request -> { + epoch.getAndIncrement(); + return CompletableFuture.completedFuture(new TestReply(request.value() + "-----------done")); +}; + +BiConsumer rollbackFunction = (id, request) -> { + // Example rollback logic + epoch.set(1); +}; + +// Create 1 or more RPC servers listening on the same subscription but different consumer names +PulsarRpcServer rpcServer = PulsarRpcServer + .builder(requestSchema, replySchema) + .requestTopic(requestTopic) + .requestSubscription("myServerSub-1") + .build(pulsarClient, requestFunction, rollbackFunction); +``` + +5. Send delayed requests with requestAfterAsync (relative delay). For each request, we use a unique correlationId and track its replies in resultMap: + +```java + // For each message, we store an AtomicInteger to track how many replies we've received for that correlationId + Map requestMessageConfigMap = new HashMap<>(); + requestMessageConfigMap.put(TypedMessageBuilder.CONF_DISABLE_REPLICATION, true); + + for (int i = 0; i < messageNum; i++) { + String correlationId = "corr-" + i; + TestRequest message = new TestRequest("DelayedRPC-" + i); + resultMap.put(correlationId, new AtomicInteger()); + rpcClient.requestAfterAsync(correlationId, message, requestMessageConfigMap, + delayedTime, TimeUnit.MILLISECONDS); + } +``` + +This sends each request immediately to Pulsar but tells the broker to delay actual delivery to the server by 5 seconds. + +6. Verify results after waiting enough time (e.g., using Awaitility in tests): + +```java + long current = System.currentTimeMillis(); + + Awaitility.await().atMost(20, TimeUnit.SECONDS).until(() -> { + AtomicInteger success = new AtomicInteger(); + resultMap.forEach((__, count) -> success.getAndAdd(count.get())); + if (System.currentTimeMillis() - current < delayedTime && success.get() > 0) { + return false; + } + return success.get() >= messageNum * ackNums && System.currentTimeMillis() - current >= delayedTime; + }); + + // Close servers if needed + rpcServer.close(); +``` + +7. Confirm that each request’s callback logic (onReplySuccess, onTimeout, etc.) behaves as expected. Once ackNums replies have been processed (or you decide your “success” threshold), the RPC client can call removeRequest(correlationId) to stop tracking it. + +This flow demonstrates how to schedule or delay Pulsar RPC calls without holding them in memory for the entire delay period, yet still receive normal request–reply handling after the delay expires.