Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
342 changes: 342 additions & 0 deletions pcip/pcip-4.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
Expand Down Expand Up @@ -88,6 +89,62 @@ default CompletableFuture<V> requestAsync(String correlationId, T value) {
*/
CompletableFuture<V> requestAsync(String correlationId, T value, Map<String, Object> config);

/**
* Deliver the message only at or after the specified absolute timestamp.
* Asynchronously sends a request and returns a future that completes with the reply.
*
* @param correlationId A unique identifier for the request.
* @param value The value used to generate the request message
* @param timestamp Absolute timestamp indicating when the message should be delivered to rpc-server.
* @return A CompletableFuture that will complete with the reply value.
*/
default CompletableFuture<V> requestAtAsync(String correlationId, T value, long timestamp) {
return requestAtAsync(correlationId, value, Collections.emptyMap(), timestamp);
}

/**
* Deliver the message only at or after the specified absolute timestamp.
* Asynchronously sends a request and returns a future that completes with the reply.
*
* @param correlationId A unique identifier for the request.
* @param value The value used to generate the request message
* @param config Configuration map for creating a request producer,
* will call {@link TypedMessageBuilder#loadConf(Map)}
* @param timestamp Absolute timestamp indicating when the message should be delivered to rpc-server.
* @return A CompletableFuture that will complete with the reply value.
*/
CompletableFuture<V> requestAtAsync(String correlationId, T value, Map<String, Object> config,
long timestamp);

/**
* Request to deliver the message only after the specified relative delay.
* Asynchronously sends a request and returns a future that completes with the reply.
*
* @param correlationId A unique identifier for the request.
* @param value The value used to generate the request message
* @param delay The amount of delay before the message will be delivered.
* @param unit The time unit for the delay.
* @return A CompletableFuture that will complete with the reply value.
*/
default CompletableFuture<V> requestAfterAsync(String correlationId, T value, long delay, TimeUnit unit) {
return requestAfterAsync(correlationId, value, Collections.emptyMap(), delay, unit);
}

/**
* Request to deliver the message only after the specified relative delay.
* Asynchronously sends a request and returns a future that completes with the reply.
*
* @param correlationId A unique identifier for the request.
* @param value The value used to generate the request message
* @param config Configuration map for creating a request producer,
* will call {@link TypedMessageBuilder#loadConf(Map)}
* @param delay The amount of delay before the message will be delivered.
* @param unit The time unit for the delay.
* @return A CompletableFuture that will complete with the reply value.
*/
CompletableFuture<V> requestAfterAsync(String correlationId, T value, Map<String, Object> config,
long delay, TimeUnit unit);

/**
* Removes a request from the tracking map based on its correlation ID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.apache.pulsar.rpc.contrib.client;

import static lombok.AccessLevel.PACKAGE;
import static org.apache.pulsar.rpc.contrib.common.Constants.REQUEST_DELIVER_AT_TIME;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
Expand Down Expand Up @@ -104,6 +105,7 @@ public void close() throws PulsarRpcClientException {
}
}

@Override
public V request(String correlationId, T value, Map<String, Object> config) throws PulsarRpcClientException {
try {
return requestAsync(correlationId, value, config).get();
Expand All @@ -113,36 +115,80 @@ public V request(String correlationId, T value, Map<String, Object> config) thro
}
}

@Override
public CompletableFuture<V> requestAsync(String correlationId, T value, Map<String, Object> config) {
return internalRequest(correlationId, value, config, -1, -1, null);
}

@Override
public CompletableFuture<V> requestAtAsync(String correlationId, T value, Map<String, Object> config,
long timestamp) {
return internalRequest(correlationId, value, config, timestamp, -1, null);
}

@Override
public CompletableFuture<V> requestAfterAsync(String correlationId, T value, Map<String, Object> config,
long delay, TimeUnit unit) {
return internalRequest(correlationId, value, config, -1, delay, unit);
}

private CompletableFuture<V> internalRequest(String correlationId, T value, Map<String, Object> config,
long timestamp, long delay, TimeUnit unit) {
CompletableFuture<V> replyFuture = new CompletableFuture<>();
long replyTimeoutMillis = replyTimeout.toMillis();
replyFuture.orTimeout(replyTimeoutMillis, TimeUnit.MILLISECONDS)
.exceptionally(e -> {
replyFuture.completeExceptionally(new PulsarRpcClientException(e.getMessage()));
callback.onTimeout(correlationId, e);
removeRequest(correlationId);
return null;
});
pendingRequestsMap.put(correlationId, replyFuture);

TypedMessageBuilder<T> requestMessage = newRequestMessage(correlationId, value, config);
if (timestamp == -1 && delay == -1) {
replyFuture.orTimeout(replyTimeoutMillis, TimeUnit.MILLISECONDS)
.exceptionally(e -> {
replyFuture.completeExceptionally(new PulsarRpcClientException(e.getMessage()));
callback.onTimeout(correlationId, e);
removeRequest(correlationId);
return null;
});
pendingRequestsMap.put(correlationId, replyFuture);

sender.sendRequest(requestMessage, replyTimeoutMillis)
.thenAccept(requestMessageId -> {
if (replyFuture.isCancelled() || replyFuture.isCompletedExceptionally()) {
sender.sendRequest(requestMessage, replyTimeoutMillis)
.thenAccept(requestMessageId -> {
if (replyFuture.isCancelled() || replyFuture.isCompletedExceptionally()) {
removeRequest(correlationId);
} else {
callback.onSendRequestSuccess(correlationId, requestMessageId);
}
}).exceptionally(ex -> {
if (callback != null) {
callback.onSendRequestError(correlationId, ex, replyFuture);
} else {
replyFuture.completeExceptionally(new PulsarRpcClientException(ex.getMessage()));
}
removeRequest(correlationId);
} else {
callback.onSendRequestSuccess(correlationId, requestMessageId);
}
}).exceptionally(ex -> {
if (callback != null) {
callback.onSendRequestError(correlationId, ex, replyFuture);
} else {
replyFuture.completeExceptionally(new PulsarRpcClientException(ex.getMessage()));
}
removeRequest(correlationId);
return null;
});
return null;
});
} else {
// Handle Delayed RPC.
if (pendingRequestsMap.containsKey(correlationId)) {
removeRequest(correlationId);
}

if (timestamp > 0) {
requestMessage.property(REQUEST_DELIVER_AT_TIME, String.valueOf(timestamp));
requestMessage.deliverAt(timestamp);
} else if (delay > 0 && unit != null) {
String delayedAt = String.valueOf(System.currentTimeMillis() + unit.toMillis(delay));
requestMessage.property(REQUEST_DELIVER_AT_TIME, delayedAt);
requestMessage.deliverAfter(delay, unit);
}
sender.sendRequest(requestMessage, replyTimeoutMillis).thenAccept(requestMessageId -> {
callback.onSendRequestSuccess(correlationId, requestMessageId);
}).exceptionally(ex -> {
if (callback != null) {
callback.onSendRequestError(correlationId, ex, replyFuture);
} else {
replyFuture.completeExceptionally(new PulsarRpcClientException(ex.getMessage()));
}
return null;
});
}

return replyFuture;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package org.apache.pulsar.rpc.contrib.client;

import static org.apache.pulsar.rpc.contrib.common.Constants.ERROR_MESSAGE;
import static org.apache.pulsar.rpc.contrib.common.Constants.REQUEST_DELIVER_AT_TIME;
import static org.apache.pulsar.rpc.contrib.common.Constants.SERVER_SUB;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import lombok.AccessLevel;
Expand Down Expand Up @@ -50,17 +52,19 @@ class ReplyListener<V> implements MessageListener<V> {
public void received(Consumer<V> consumer, Message<V> msg) {
String correlationId = msg.getKey();
try {
if (!pendingRequestsMap.containsKey(correlationId)) {
if (!pendingRequestsMap.containsKey(correlationId) && !msg.hasProperty(REQUEST_DELIVER_AT_TIME)) {
log.warn("[{}] [{}] No pending request found for correlationId {}."
+ " This may indicate the message has already been processed or timed out.",
consumer.getTopic(), consumer.getConsumerName(), correlationId);
} else {
CompletableFuture<V> future = pendingRequestsMap.get(correlationId);
CompletableFuture<V> future = pendingRequestsMap.computeIfAbsent(correlationId,
key -> new CompletableFuture<>());
String errorMessage = msg.getProperty(ERROR_MESSAGE);
String serverSub = msg.getProperty(SERVER_SUB);
if (errorMessage != null) {
callBack.onReplyError(correlationId, consumer.getSubscription(), errorMessage, future);
callBack.onReplyError(correlationId, serverSub, errorMessage, future);
} else {
callBack.onReplySuccess(correlationId, consumer.getSubscription(), msg.getValue(), future);
callBack.onReplySuccess(correlationId, serverSub, msg.getValue(), future);
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ public class Constants {
public static final String REPLY_TOPIC = "replyTopic";
public static final String ERROR_MESSAGE = "errorMessage";
public static final String SERVER_SUB = "serverSub";
public static final String REQUEST_DELIVER_AT_TIME = "deliverAt";
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.apache.pulsar.rpc.contrib.server;

import static org.apache.pulsar.rpc.contrib.common.Constants.ERROR_MESSAGE;
import static org.apache.pulsar.rpc.contrib.common.Constants.REQUEST_DELIVER_AT_TIME;
import static org.apache.pulsar.rpc.contrib.common.Constants.SERVER_SUB;
import java.util.function.BiConsumer;
import lombok.AccessLevel;
Expand Down Expand Up @@ -50,8 +51,9 @@ class ReplySender<T, V> {
* @param sub The subscriber name involved in this interaction.
*/
@SneakyThrows
void sendReply(String topic, String correlationId, V reply, T value, String sub) {
onSend(topic, correlationId, msg -> msg.value(reply), value, sub);
void sendReply(String topic, String correlationId, V reply, T value, String sub,
long delayedAt) {
onSend(topic, correlationId, msg -> msg.value(reply), value, sub, delayedAt);
}

/**
Expand All @@ -64,8 +66,10 @@ void sendReply(String topic, String correlationId, V reply, T value, String sub)
* @param sub The subscriber name involved in this interaction.
*/
@SneakyThrows
void sendErrorReply(String topic, String correlationId, String errorMessage, T value, String sub) {
onSend(topic, correlationId, msg -> msg.property(ERROR_MESSAGE, errorMessage).value(null), value, sub);
void sendErrorReply(String topic, String correlationId, String errorMessage, T value, String sub,
long delayedAt) {
onSend(topic, correlationId, msg -> msg.property(ERROR_MESSAGE, errorMessage).value(null),
value, sub, delayedAt);
}

/**
Expand All @@ -79,17 +83,17 @@ void sendErrorReply(String topic, String correlationId, String errorMessage, T v
* @param sub The subscriber name to be included in the message metadata.
*/
@SneakyThrows
void onSend(String topic,
String correlationId,
java.util.function.Consumer<TypedMessageBuilder<V>> consumer,
T value,
String sub) {
void onSend(String topic, String correlationId, java.util.function.Consumer<TypedMessageBuilder<V>> consumer,
T value, String sub, long delayedAt) {
log.debug("Sending {}", correlationId);
Producer<V> producer = pool.borrowObject(topic);
try {
TypedMessageBuilder<V> builder = producer.newMessage()
.key(correlationId)
.property(SERVER_SUB, sub);
if (delayedAt > 0) {
builder.property(REQUEST_DELIVER_AT_TIME, String.valueOf(delayedAt));
}
consumer.accept(builder);
builder.sendAsync()
.exceptionally(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.apache.pulsar.rpc.contrib.server;

import static org.apache.pulsar.rpc.contrib.common.Constants.REPLY_TOPIC;
import static org.apache.pulsar.rpc.contrib.common.Constants.REQUEST_DELIVER_AT_TIME;
import static org.apache.pulsar.rpc.contrib.common.Constants.REQUEST_TIMEOUT_MILLIS;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -53,7 +54,7 @@ class RequestListener<T, V> implements MessageListener<T> {
public void received(Consumer<T> consumer, Message<T> msg) {
long replyTimeout = Long.parseLong(msg.getProperty(REQUEST_TIMEOUT_MILLIS))
- (System.currentTimeMillis() - msg.getPublishTime());
if (replyTimeout <= 0) {
if (replyTimeout <= 0 && !msg.hasProperty(REQUEST_DELIVER_AT_TIME)) {
consumer.acknowledgeAsync(msg);
return;
}
Expand All @@ -62,12 +63,15 @@ public void received(Consumer<T> consumer, Message<T> msg) {
String requestSubscription = consumer.getSubscription();
String replyTopic = msg.getProperty(REPLY_TOPIC);
T value = msg.getValue();

long delayedAt = msg.hasProperty(REQUEST_DELIVER_AT_TIME)
? Long.parseLong(msg.getProperty(REQUEST_DELIVER_AT_TIME))
+ Long.parseLong(msg.getProperty(REQUEST_TIMEOUT_MILLIS))
: 0;
try {
requestFunction.apply(value)
.orTimeout(replyTimeout, TimeUnit.MILLISECONDS)
.thenAccept(reply -> {
sender.sendReply(replyTopic, correlationId, reply, value, requestSubscription);
sender.sendReply(replyTopic, correlationId, reply, value, requestSubscription, delayedAt);
})
.get();
} catch (ExecutionException e) {
Expand All @@ -79,7 +83,7 @@ public void received(Consumer<T> consumer, Message<T> msg) {
log.error("[{}] Error processing request", correlationId, e);
sender.sendErrorReply(replyTopic, correlationId,
cause.getClass().getName() + ": " + cause.getMessage(),
value, requestSubscription);
value, requestSubscription, delayedAt);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down
Loading