diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/Message.java b/src/main/java/io/numaproj/numaflow/batchmapper/Message.java index 99452369..7f54a159 100644 --- a/src/main/java/io/numaproj/numaflow/batchmapper/Message.java +++ b/src/main/java/io/numaproj/numaflow/batchmapper/Message.java @@ -1,14 +1,17 @@ package io.numaproj.numaflow.batchmapper; +import io.numaproj.numaflow.shared.NackOptions; import lombok.Getter; /** Message is used to wrap the data returned by Mapper. */ @Getter public class Message { private static final String[] DROP_TAGS = {"U+005C__DROP__"}; + private static final String[] NACK_TAGS = {"U+005C__NACK__"}; private final String[] keys; private final byte[] value; private final String[] tags; + private final NackOptions nackOptions; /** * used to create Message with value, keys and tags(used for conditional forwarding) @@ -18,10 +21,15 @@ public class Message { * @param tags message tags which will be used for conditional forwarding */ public Message(byte[] value, String[] keys, String[] tags) { + this(value, keys, tags, (NackOptions) null); + } + + private Message(byte[] value, String[] keys, String[] tags, NackOptions nackOptions) { // defensive copy - once the Message is created, the caller should not be able to modify it. this.keys = keys == null ? null : keys.clone(); this.value = value == null ? null : value.clone(); this.tags = tags == null ? null : tags.clone(); + this.nackOptions = nackOptions; } /** @@ -51,4 +59,14 @@ public Message(byte[] value, String[] keys) { public static Message toDrop() { return new Message(new byte[0], null, DROP_TAGS); } + + /** + * creates a Message that negatively acknowledges the input message, requesting redelivery. + * + * @param nackOptions optional redelivery options (may be null) + * @return the Message which will be nacked + */ + public static Message toNack(NackOptions nackOptions) { + return new Message(new byte[0], null, NACK_TAGS, nackOptions); + } } diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java index 98e2f792..7cf1442d 100644 --- a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java +++ b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java @@ -137,18 +137,15 @@ private void buildAndStreamResponse( responses.getItems().forEach(message -> { List mapResponseResult = new ArrayList<>(); message.getItems().forEach(res -> { - mapResponseResult.add( - MapOuterClass.MapResponse.Result - .newBuilder() - .setValue(res.getValue() - == null ? ByteString.EMPTY : ByteString.copyFrom( - res.getValue())) - .addAllKeys(res.getKeys() - == null ? new ArrayList<>() : Arrays.asList(res.getKeys())) - .addAllTags(res.getTags() - == null ? new ArrayList<>() : Arrays.asList(res.getTags())) - .build() - ); + MapOuterClass.MapResponse.Result.Builder resultBuilder = MapOuterClass.MapResponse.Result + .newBuilder() + .setValue(res.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(res.getValue())) + .addAllKeys(res.getKeys() == null ? new ArrayList<>() : Arrays.asList(res.getKeys())) + .addAllTags(res.getTags() == null ? new ArrayList<>() : Arrays.asList(res.getTags())); + if (res.getNackOptions() != null) { + resultBuilder.setNackOptions(res.getNackOptions().toProto()); + } + mapResponseResult.add(resultBuilder.build()); }); MapOuterClass.MapResponse singleRequestResponse = MapOuterClass.MapResponse .newBuilder() diff --git a/src/main/java/io/numaproj/numaflow/mapper/MapperActor.java b/src/main/java/io/numaproj/numaflow/mapper/MapperActor.java index a517e816..489fef0d 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/MapperActor.java +++ b/src/main/java/io/numaproj/numaflow/mapper/MapperActor.java @@ -87,7 +87,7 @@ private MapOuterClass.MapResponse buildResponse(MessageList messageList, String .newBuilder(); messageList.getMessages().forEach(message -> { - responseBuilder.addResults(MapOuterClass.MapResponse.Result.newBuilder() + MapOuterClass.MapResponse.Result.Builder resultBuilder = MapOuterClass.MapResponse.Result.newBuilder() .setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom( message.getValue())) .addAllKeys(message.getKeys() @@ -95,8 +95,11 @@ private MapOuterClass.MapResponse buildResponse(MessageList messageList, String .addAllTags(message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags())) .setMetadata(message.getUserMetadata() - == null ? MetadataOuterClass.Metadata.getDefaultInstance() : message.getUserMetadata().toProto()) - .build()); + == null ? MetadataOuterClass.Metadata.getDefaultInstance() : message.getUserMetadata().toProto()); + if (message.getNackOptions() != null) { + resultBuilder.setNackOptions(message.getNackOptions().toProto()); + } + responseBuilder.addResults(resultBuilder.build()); }); return responseBuilder.setId(ID).build(); } diff --git a/src/main/java/io/numaproj/numaflow/mapper/Message.java b/src/main/java/io/numaproj/numaflow/mapper/Message.java index 518a6f62..83c7cfef 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/Message.java +++ b/src/main/java/io/numaproj/numaflow/mapper/Message.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.mapper; +import io.numaproj.numaflow.shared.NackOptions; import io.numaproj.numaflow.shared.UserMetadata; import lombok.Getter; @@ -11,10 +12,12 @@ @Getter public class Message { private static final String[] DROP_TAGS = {"U+005C__DROP__"}; + private static final String[] NACK_TAGS = {"U+005C__NACK__"}; private final String[] keys; private final byte[] value; private final String[] tags; private final UserMetadata userMetadata; + private final NackOptions nackOptions; /** * used to create Message with value, keys, tags(used for conditional forwarding) and userMetadata @@ -25,12 +28,17 @@ public class Message { * @param userMetadata user metadata, this is used to pass user defined metadata to the next vertex */ public Message(byte[] value, String[] keys, String[] tags, UserMetadata userMetadata) { + this(value, keys, tags, userMetadata, null); + } + + private Message(byte[] value, String[] keys, String[] tags, UserMetadata userMetadata, NackOptions nackOptions) { // defensive copy - once the Message is created, the caller should not be able to modify it. this.keys = keys == null ? null : keys.clone(); this.value = value == null ? null : value.clone(); this.tags = tags == null ? null : tags.clone(); // Copy the data using copy constructor to prevent mutation this.userMetadata = userMetadata == null ? null : new UserMetadata(userMetadata); + this.nackOptions = nackOptions; } /** @@ -71,4 +79,14 @@ public Message(byte[] value, String[] keys, String[] tags) { public static Message toDrop() { return new Message(new byte[0], null, DROP_TAGS, null); } + + /** + * creates a Message that negatively acknowledges the input message, requesting redelivery. + * + * @param nackOptions optional redelivery options (may be null) + * @return the Message which will be nacked + */ + public static Message toNack(NackOptions nackOptions) { + return new Message(new byte[0], null, NACK_TAGS, null, nackOptions); + } } diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/Message.java b/src/main/java/io/numaproj/numaflow/mapstreamer/Message.java index e378fc8a..4632d9e5 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/Message.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/Message.java @@ -1,14 +1,17 @@ package io.numaproj.numaflow.mapstreamer; +import io.numaproj.numaflow.shared.NackOptions; import lombok.Getter; /** Message is used to wrap the data returned by MapStreamer. */ @Getter public class Message { private static final String[] DROP_TAGS = {"U+005C__DROP__"}; + private static final String[] NACK_TAGS = {"U+005C__NACK__"}; private final String[] keys; private final byte[] value; private final String[] tags; + private final NackOptions nackOptions; /** * used to create Message with value, keys and tags(used for conditional forwarding) @@ -18,10 +21,15 @@ public class Message { * @param tags message tags which will be used for conditional forwarding */ public Message(byte[] value, String[] keys, String[] tags) { + this(value, keys, tags, (NackOptions) null); + } + + private Message(byte[] value, String[] keys, String[] tags, NackOptions nackOptions) { // defensive copy - once the Message is created, the caller should not be able to modify it. this.keys = keys == null ? null : keys.clone(); this.value = value == null ? null : value.clone(); this.tags = tags == null ? null : tags.clone(); + this.nackOptions = nackOptions; } /** @@ -51,4 +59,14 @@ public Message(byte[] value, String[] keys) { public static Message toDrop() { return new Message(new byte[0], null, DROP_TAGS); } + + /** + * creates a Message that negatively acknowledges the input message, requesting redelivery. + * + * @param nackOptions optional redelivery options (may be null) + * @return the Message which will be nacked + */ + public static Message toNack(NackOptions nackOptions) { + return new Message(new byte[0], null, NACK_TAGS, nackOptions); + } } diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/OutputObserverImpl.java b/src/main/java/io/numaproj/numaflow/mapstreamer/OutputObserverImpl.java index 0a008b48..7dfc5b80 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/OutputObserverImpl.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/OutputObserverImpl.java @@ -29,17 +29,17 @@ public void send(Message message) { if (message == null) { return; } + MapOuterClass.MapResponse.Result.Builder resultBuilder = MapOuterClass.MapResponse.Result.newBuilder() + .setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(message.getValue())) + .addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList(message.getKeys())) + .addAllTags(message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags())); + if (message.getNackOptions() != null) { + resultBuilder.setNackOptions(message.getNackOptions().toProto()); + } MapOuterClass.MapResponse response = MapOuterClass.MapResponse.newBuilder() .setId(requestID) - .addResults(MapOuterClass.MapResponse.Result.newBuilder() - .setValue( - message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom( - message.getValue())) - .addAllKeys(message.getKeys() - == null ? new ArrayList<>() : Arrays.asList(message.getKeys())) - .addAllTags(message.getTags() - == null ? new ArrayList<>() : Arrays.asList(message.getTags())) - .build()).build(); + .addResults(resultBuilder.build()) + .build(); supervisorActor.tell(response, ActorRef.noSender()); } diff --git a/src/main/java/io/numaproj/numaflow/shared/NackOptions.java b/src/main/java/io/numaproj/numaflow/shared/NackOptions.java new file mode 100644 index 00000000..12fb35ec --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/shared/NackOptions.java @@ -0,0 +1,53 @@ +package io.numaproj.numaflow.shared; + +import lombok.Builder; +import lombok.Getter; + +/** + * NackOptions carries per-message redelivery options for a negative acknowledgement (nack). + * All fields are optional; a null value means unset. + */ +@Getter +@Builder(builderMethodName = "newBuilder") +public class NackOptions { + /** redelivery delay in milliseconds. */ + private final Long delay; + /** maximum number of redelivery attempts. */ + private final Integer maxDeliveries; + /** human-readable reason for the nack. */ + private final String reason; + + /** Converts to the outgoing proto type, setting only the fields that are present. */ + public common.NackOptionsOuterClass.NackOptions toProto() { + common.NackOptionsOuterClass.NackOptions.Builder b = + common.NackOptionsOuterClass.NackOptions.newBuilder(); + if (delay != null) { + b.setDelay(delay); + } + if (maxDeliveries != null) { + b.setMaxDeliveries(maxDeliveries); + } + if (reason != null) { + b.setReason(reason); + } + return b.build(); + } + + /** Converts from the incoming proto type. Returns null for null input. */ + public static NackOptions fromProto(common.NackOptionsOuterClass.NackOptions p) { + if (p == null) { + return null; + } + NackOptionsBuilder b = NackOptions.newBuilder(); + if (p.hasDelay()) { + b.delay(p.getDelay()); + } + if (p.hasMaxDeliveries()) { + b.maxDeliveries(p.getMaxDeliveries()); + } + if (p.hasReason()) { + b.reason(p.getReason()); + } + return b.build(); + } +} diff --git a/src/main/java/io/numaproj/numaflow/sinker/Response.java b/src/main/java/io/numaproj/numaflow/sinker/Response.java index f7f57673..ab6b16ee 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Response.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Response.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.sinker; +import io.numaproj.numaflow.shared.NackOptions; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; @@ -20,6 +21,8 @@ public class Response { private final byte[] serveResponse; private final Boolean onSuccess; private final Message onSuccessMessage; + private final Boolean nack; + private final NackOptions nackOptions; /** * Static method to create response for successful message processing. @@ -28,7 +31,7 @@ public class Response { * @return Response object with success status */ public static Response responseOK(String id) { - return new Response(id, true, null, false, false, null, false, null); + return new Response(id, true, null, false, false, null, false, null, false, null); } /** @@ -39,7 +42,7 @@ public static Response responseOK(String id) { * @return Response object with failure status and error message */ public static Response responseFailure(String id, String errMsg) { - return new Response(id, false, errMsg, false, false, null, false, null); + return new Response(id, false, errMsg, false, false, null, false, null, false, null); } /** @@ -50,7 +53,7 @@ public static Response responseFailure(String id, String errMsg) { * @return Response object with fallback status */ public static Response responseFallback(String id) { - return new Response(id, false, null, true, false, null, false, null); + return new Response(id, false, null, true, false, null, false, null, false, null); } /** @@ -63,7 +66,7 @@ public static Response responseFallback(String id) { * @return Response object with serve status and serve response */ public static Response responseServe(String id, byte[] serveResponse) { - return new Response(id, false, null, false, true, serveResponse, false, null); + return new Response(id, false, null, false, true, serveResponse, false, null, false, null); } /** @@ -76,6 +79,18 @@ public static Response responseServe(String id, byte[] serveResponse) { * @return Response object with onSuccess status and onSuccess message */ public static Response responseOnSuccess(String id, Message onSuccessMessage) { - return new Response(id, false, null, false, false, null, true, onSuccessMessage); + return new Response(id, false, null, false, false, null, true, onSuccessMessage, false, null); + } + + /** + * Static method to create a nack response, indicating the message should be negatively + * acknowledged and redelivered. nackOptions may be null. + * + * @param id id of the message + * @param nackOptions optional redelivery options + * @return Response object with nack status + */ + public static Response responseNack(String id, NackOptions nackOptions) { + return new Response(id, false, null, false, false, null, false, null, true, nackOptions); } } diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index 3c51df58..822fb14b 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -161,6 +161,14 @@ private SinkOuterClass.SinkResponse.Result buildResult(Response response) { .setStatus(SinkOuterClass.Status.ON_SUCCESS) .setOnSuccessMsg(Message.toProto(response.getOnSuccessMessage())) .build(); + } else if (response.getNack() != null && response.getNack()) { + SinkOuterClass.SinkResponse.Result.Builder b = SinkOuterClass.SinkResponse.Result.newBuilder() + .setId(response.getId() == null ? "" : response.getId()) + .setStatus(SinkOuterClass.Status.NACK); + if (response.getNackOptions() != null) { + b.setNackOptions(response.getNackOptions().toProto()); + } + return b.build(); } else { // FIXME: Return error when error message is not set? return SinkOuterClass.SinkResponse.Result.newBuilder() diff --git a/src/main/java/io/numaproj/numaflow/sourcer/NackRequest.java b/src/main/java/io/numaproj/numaflow/sourcer/NackRequest.java index a7c23fd0..dd9534b3 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/NackRequest.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/NackRequest.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.sourcer; +import io.numaproj.numaflow.shared.NackOptions; import java.util.List; @@ -11,4 +12,9 @@ public interface NackRequest { * @return the list of offsets to be negatively acknowledged. */ List getOffsets(); + + /** + * @return the redelivery options for this nack, or null if none were provided. + */ + NackOptions getNackOptions(); } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/NackRequestImpl.java b/src/main/java/io/numaproj/numaflow/sourcer/NackRequestImpl.java index 29a423cd..abdeb237 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/NackRequestImpl.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/NackRequestImpl.java @@ -1,5 +1,6 @@ package io.numaproj.numaflow.sourcer; +import io.numaproj.numaflow.shared.NackOptions; import lombok.AllArgsConstructor; import java.util.List; @@ -10,9 +11,15 @@ @AllArgsConstructor class NackRequestImpl implements NackRequest { private final List offsets; + private final NackOptions nackOptions; @Override public List getOffsets() { return this.offsets; } + + @Override + public NackOptions getNackOptions() { + return this.nackOptions; + } } diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Service.java b/src/main/java/io/numaproj/numaflow/sourcer/Service.java index 6f76a5d7..ff4cf7da 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Service.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Service.java @@ -5,6 +5,7 @@ import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.shared.ExceptionUtils; +import io.numaproj.numaflow.shared.NackOptions; import io.numaproj.numaflow.source.v1.SourceGrpc; import io.numaproj.numaflow.source.v1.SourceOuterClass; import lombok.AllArgsConstructor; @@ -198,7 +199,10 @@ public void nackFn( offset.getOffset().toByteArray(), offset.getPartitionId())); } - NackRequest nackRequestImpl = new NackRequestImpl(offsets); + NackOptions nackOptions = nackRequest.getRequest().hasNackOptions() + ? NackOptions.fromProto(nackRequest.getRequest().getNackOptions()) + : null; + NackRequest nackRequestImpl = new NackRequestImpl(offsets, nackOptions); this.sourcer.nack(nackRequestImpl); SourceOuterClass.NackResponse nackResponse = SourceOuterClass.NackResponse .newBuilder() diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/Message.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/Message.java index f1c3244d..183572dc 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/Message.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/Message.java @@ -2,6 +2,7 @@ import java.time.Instant; +import io.numaproj.numaflow.shared.NackOptions; import io.numaproj.numaflow.shared.UserMetadata; import lombok.Getter; @@ -9,11 +10,13 @@ @Getter public class Message { private static final String[] DROP_TAGS = {"U+005C__DROP__"}; + private static final String[] NACK_TAGS = {"U+005C__NACK__"}; private final String[] keys; private final byte[] value; private final Instant eventTime; private final String[] tags; private final UserMetadata userMetadata; + private final NackOptions nackOptions; /** * used to create Message with value, eventTime, keys, tags(used for conditional forwarding) and userMetadata @@ -25,6 +28,10 @@ public class Message { * @param userMetadata user metadata */ public Message(byte[] value, Instant eventTime, String[] keys, String[] tags, UserMetadata userMetadata) { + this(value, eventTime, keys, tags, userMetadata, null); + } + + private Message(byte[] value, Instant eventTime, String[] keys, String[] tags, UserMetadata userMetadata, NackOptions nackOptions) { // defensive copy - once the Message is created, the caller should not be able to modify it. this.keys = keys == null ? null : keys.clone(); this.value = value == null ? null : value.clone(); @@ -32,6 +39,7 @@ public Message(byte[] value, Instant eventTime, String[] keys, String[] tags, Us // The Instant class in Java is already immutable. this.eventTime = eventTime; this.userMetadata = userMetadata == null ? null : new UserMetadata(userMetadata); + this.nackOptions = nackOptions; } /** @@ -78,4 +86,17 @@ public Message(byte[] value, Instant eventTime, String[] keys) { public static Message toDrop(Instant eventTime) { return new Message(new byte[0], eventTime, null, DROP_TAGS, null); } + + /** + * creates a Message that negatively acknowledges the input message, requesting redelivery. + * eventTime is required: even though the message is nacked, it is considered processed for + * watermark purposes. + * + * @param eventTime message eventTime + * @param nackOptions optional redelivery options (may be null) + * @return the Message which will be nacked + */ + public static Message toNack(Instant eventTime, NackOptions nackOptions) { + return new Message(new byte[0], eventTime, null, NACK_TAGS, null, nackOptions); + } } diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformerActor.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformerActor.java index 4fd9a7ae..a81f952d 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformerActor.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformerActor.java @@ -110,22 +110,19 @@ private Sourcetransformer.SourceTransformResponse buildResponse( .newBuilder(); messageList.getMessages().forEach(message -> { - responseBuilder.addResults(Sourcetransformer.SourceTransformResponse.Result.newBuilder() - .setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom( - message.getValue())) + Sourcetransformer.SourceTransformResponse.Result.Builder resultBuilder = Sourcetransformer.SourceTransformResponse.Result.newBuilder() + .setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(message.getValue())) .setEventTime(Timestamp.newBuilder() - .setSeconds(message - .getEventTime() - .getEpochSecond()) + .setSeconds(message.getEventTime().getEpochSecond()) .setNanos(message.getEventTime().getNano())) - .addAllKeys(message.getKeys() - == null ? new ArrayList<>() : Arrays.asList(message.getKeys())) - .addAllTags(message.getTags() - == null ? new ArrayList<>() : Arrays.asList(message.getTags())) - .setMetadata(message.getUserMetadata() - == null ? MetadataOuterClass.Metadata.getDefaultInstance() - : message.getUserMetadata().toProto()) - .build()); + .addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList(message.getKeys())) + .addAllTags(message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags())) + .setMetadata(message.getUserMetadata() == null ? MetadataOuterClass.Metadata.getDefaultInstance() + : message.getUserMetadata().toProto()); + if (message.getNackOptions() != null) { + resultBuilder.setNackOptions(message.getNackOptions().toProto()); + } + responseBuilder.addResults(resultBuilder.build()); }); return responseBuilder.setId(ID).build(); } diff --git a/src/main/proto/common/nack_options.proto b/src/main/proto/common/nack_options.proto new file mode 100644 index 00000000..28fd72d6 --- /dev/null +++ b/src/main/proto/common/nack_options.proto @@ -0,0 +1,9 @@ +syntax = "proto3"; +option go_package = "github.com/numaproj/numaflow/pkg/apis/proto/common"; +package common; + +message NackOptions { + optional string reason = 1; + optional uint32 max_deliveries = 2; + optional uint64 delay = 3; +} diff --git a/src/main/proto/map/v1/map.proto b/src/main/proto/map/v1/map.proto index e72acf3e..b337ad92 100644 --- a/src/main/proto/map/v1/map.proto +++ b/src/main/proto/map/v1/map.proto @@ -5,6 +5,7 @@ option java_package = "io.numaproj.numaflow.map.v1"; import "google/protobuf/timestamp.proto"; import "google/protobuf/empty.proto"; import "common/metadata.proto"; +import "common/nack_options.proto"; package map.v1; @@ -59,6 +60,7 @@ message MapResponse { bytes value = 2; repeated string tags = 3; common.Metadata metadata = 4; + optional common.NackOptions nack_options = 5; } repeated Result results = 1; // This ID is used to refer the responses to the request it corresponds to. diff --git a/src/main/proto/sink/v1/sink.proto b/src/main/proto/sink/v1/sink.proto index 4fb516b4..9413066c 100644 --- a/src/main/proto/sink/v1/sink.proto +++ b/src/main/proto/sink/v1/sink.proto @@ -5,6 +5,7 @@ option java_package = "io.numaproj.numaflow.sink.v1"; import "google/protobuf/timestamp.proto"; import "google/protobuf/empty.proto"; import "common/metadata.proto"; +import "common/nack_options.proto"; package sink.v1; @@ -69,6 +70,7 @@ enum Status { FALLBACK = 2; SERVE = 3; ON_SUCCESS = 4; + NACK = 5; } /** @@ -90,6 +92,7 @@ message SinkResponse { optional bytes serve_response = 4; // on_success_msg is the message to be sent to on_success sink. optional Message on_success_msg = 5; + optional common.NackOptions nack_options = 6; } repeated Result results = 1; optional Handshake handshake = 2; diff --git a/src/main/proto/source/v1/source.proto b/src/main/proto/source/v1/source.proto index aae5a7c4..feb7c6dd 100644 --- a/src/main/proto/source/v1/source.proto +++ b/src/main/proto/source/v1/source.proto @@ -5,6 +5,7 @@ option java_package = "io.numaproj.numaflow.source.v1"; import "google/protobuf/timestamp.proto"; import "google/protobuf/empty.proto"; import "common/metadata.proto"; +import "common/nack_options.proto"; package source.v1; @@ -152,6 +153,7 @@ message NackRequest { message Request { // Required field holding the offsets to be nacked repeated Offset offsets = 1; + optional common.NackOptions nack_options = 2; } // Required field holding the request. The list will be ordered and will have the same order as the original Read response. Request request = 1; diff --git a/src/main/proto/sourcetransform/v1/sourcetransformer.proto b/src/main/proto/sourcetransform/v1/sourcetransformer.proto index dda7cc6a..658344fe 100644 --- a/src/main/proto/sourcetransform/v1/sourcetransformer.proto +++ b/src/main/proto/sourcetransform/v1/sourcetransformer.proto @@ -5,6 +5,7 @@ option java_package = "io.numaproj.numaflow.sourcetransformer.v1"; import "google/protobuf/timestamp.proto"; import "google/protobuf/empty.proto"; import "common/metadata.proto"; +import "common/nack_options.proto"; package sourcetransformer.v1; @@ -54,6 +55,7 @@ message SourceTransformResponse { google.protobuf.Timestamp event_time = 3; repeated string tags = 4; common.Metadata metadata = 5; + optional common.NackOptions nack_options = 6; } repeated Result results = 1; // This ID is used to refer the responses to the request it corresponds to. diff --git a/src/test/java/io/numaproj/numaflow/batchmapper/ServerNackTest.java b/src/test/java/io/numaproj/numaflow/batchmapper/ServerNackTest.java new file mode 100644 index 00000000..13c7ac3b --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/batchmapper/ServerNackTest.java @@ -0,0 +1,102 @@ +package io.numaproj.numaflow.batchmapper; + +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.numaproj.numaflow.map.v1.MapGrpc; +import io.numaproj.numaflow.map.v1.MapOuterClass; +import io.numaproj.numaflow.shared.NackOptions; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ServerNackTest { + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private Server server; + private ManagedChannel inProcessChannel; + + @Before + public void setUp() throws Exception { + String serverName = InProcessServerBuilder.generateName(); + GRPCConfig grpcServerConfig = GRPCConfig.newBuilder() + .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) + .socketPath(Constants.DEFAULT_SOCKET_PATH) + .infoFilePath("/tmp/numaflow-test-server-info)") + .build(); + server = new Server(grpcServerConfig, new NackBatchMapFn(), null, serverName); + server.start(); + inProcessChannel = grpcCleanup.register( + InProcessChannelBuilder.forName(serverName).directExecutor().build()); + } + + @After + public void tearDown() throws Exception { + server.stop(); + } + + @Test + public void batchMapNack() { + // expect: handshake resp + 1 per-id response + 1 EOT = 3 + BatchMapOutputStreamObserver outputStreamObserver = new BatchMapOutputStreamObserver(3); + StreamObserver in = MapGrpc.newStub(inProcessChannel) + .mapFn(outputStreamObserver); + in.onNext(MapOuterClass.MapRequest.newBuilder() + .setHandshake(MapOuterClass.Handshake.newBuilder().setSot(true)).build()); + in.onNext(MapOuterClass.MapRequest.newBuilder() + .setRequest(MapOuterClass.MapRequest.Request.newBuilder() + .setValue(ByteString.copyFromUtf8("x")).addKeys("k").build()) + .setId("id-1").build()); + in.onNext(MapOuterClass.MapRequest.newBuilder() + .setStatus(MapOuterClass.TransmissionStatus.newBuilder().setEot(true)).build()); + in.onCompleted(); + try { + outputStreamObserver.done.get(); + } catch (InterruptedException | ExecutionException e) { + fail("Error in getting done signal " + e.getMessage()); + } + List result = outputStreamObserver.getMapResponses(); + MapOuterClass.MapResponse.Result r = result.stream() + .filter(resp -> resp.getResultsCount() > 0) + .findFirst().orElseThrow(() -> new AssertionError("no result")).getResults(0); + assertEquals(Arrays.asList("U+005C__NACK__"), r.getTagsList()); + assertTrue(r.hasNackOptions()); + assertEquals(500L, r.getNackOptions().getDelay()); + assertEquals("retry", r.getNackOptions().getReason()); + } + + private static class NackBatchMapFn extends BatchMapper { + @Override + public BatchResponses processMessage(DatumIterator datumStream) { + BatchResponses batchResponses = new BatchResponses(); + while (true) { + Datum datum; + try { + datum = datumStream.next(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + continue; + } + if (datum == null) { + break; + } + BatchResponse batchResponse = new BatchResponse(datum.getId()); + batchResponse.append(Message.toNack(NackOptions.newBuilder().delay(500L).reason("retry").build())); + batchResponses.append(batchResponse); + } + return batchResponses; + } + } +} diff --git a/src/test/java/io/numaproj/numaflow/mapper/ServerNackTest.java b/src/test/java/io/numaproj/numaflow/mapper/ServerNackTest.java new file mode 100644 index 00000000..80717156 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/mapper/ServerNackTest.java @@ -0,0 +1,84 @@ +package io.numaproj.numaflow.mapper; + +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.testing.GrpcCleanupRule; +import io.numaproj.numaflow.map.v1.MapGrpc; +import io.numaproj.numaflow.map.v1.MapOuterClass; +import io.numaproj.numaflow.shared.NackOptions; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ServerNackTest { + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private Server server; + private ManagedChannel inProcessChannel; + + @Before + public void setUp() throws Exception { + String serverName = InProcessServerBuilder.generateName(); + GRPCConfig grpcServerConfig = GRPCConfig.newBuilder() + .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) + .socketPath(Constants.DEFAULT_SOCKET_PATH) + .infoFilePath("/tmp/numaflow-test-server-info)") + .build(); + server = new Server(grpcServerConfig, new NackMapFn(), null, serverName); + server.start(); + inProcessChannel = grpcCleanup.register( + InProcessChannelBuilder.forName(serverName).directExecutor().build()); + } + + @After + public void tearDown() throws Exception { + server.stop(); + } + + @Test + public void mapperNack() { + MapOuterClass.MapRequest handshake = MapOuterClass.MapRequest.newBuilder() + .setHandshake(MapOuterClass.Handshake.newBuilder().setSot(true)).build(); + MapOuterClass.MapRequest inDatum = MapOuterClass.MapRequest.newBuilder() + .setRequest(MapOuterClass.MapRequest.Request.newBuilder() + .setValue(ByteString.copyFromUtf8("x")).addKeys("k").build()).build(); + + MapOutputStreamObserver responseObserver = new MapOutputStreamObserver(2); + var stub = MapGrpc.newStub(inProcessChannel); + var requestStreamObserver = stub.mapFn(responseObserver); + requestStreamObserver.onNext(handshake); + requestStreamObserver.onNext(inDatum); + try { + responseObserver.done.get(); + } catch (InterruptedException | ExecutionException e) { + fail("Error while waiting for response" + e.getMessage()); + } + List responses = responseObserver.getMapResponses().subList(1, 2); + MapOuterClass.MapResponse.Result r = responses.get(0).getResults(0); + assertEquals(Arrays.asList("U+005C__NACK__"), r.getTagsList()); + assertTrue(r.hasNackOptions()); + assertEquals(500L, r.getNackOptions().getDelay()); + assertEquals("retry", r.getNackOptions().getReason()); + requestStreamObserver.onCompleted(); + } + + private static class NackMapFn extends Mapper { + @Override + public MessageList processMessage(String[] keys, Datum datum) { + return MessageList.newBuilder() + .addMessage(Message.toNack(NackOptions.newBuilder().delay(500L).reason("retry").build())) + .build(); + } + } +} diff --git a/src/test/java/io/numaproj/numaflow/mapstreamer/ServiceNackTest.java b/src/test/java/io/numaproj/numaflow/mapstreamer/ServiceNackTest.java new file mode 100644 index 00000000..fad1bf80 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/mapstreamer/ServiceNackTest.java @@ -0,0 +1,82 @@ +package io.numaproj.numaflow.mapstreamer; + +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.testing.GrpcCleanupRule; +import io.numaproj.numaflow.map.v1.MapGrpc; +import io.numaproj.numaflow.map.v1.MapOuterClass; +import io.numaproj.numaflow.shared.NackOptions; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ServiceNackTest { + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private Service service; + private ManagedChannel inProcessChannel; + + @Before + public void setUp() throws Exception { + String serverName = InProcessServerBuilder.generateName(); + CompletableFuture shutdownSignal = new CompletableFuture<>(); + service = new Service(new NackMapStreamer(), shutdownSignal); + grpcCleanup.register(InProcessServerBuilder.forName(serverName).directExecutor() + .addService(service).build().start()); + inProcessChannel = grpcCleanup.register( + InProcessChannelBuilder.forName(serverName).directExecutor().build()); + } + + @After + public void tearDown() { + inProcessChannel.shutdownNow(); + } + + @Test + public void mapStreamerNack() { + MapOuterClass.MapRequest handshake = MapOuterClass.MapRequest.newBuilder() + .setHandshake(MapOuterClass.Handshake.newBuilder().setSot(true)).build(); + MapOuterClass.MapRequest inDatum = MapOuterClass.MapRequest.newBuilder() + .setRequest(MapOuterClass.MapRequest.Request.newBuilder() + .setValue(ByteString.copyFromUtf8("x")).addKeys("k").build()).build(); + + // expect: handshake resp + 1 result + 1 EOT = 3 responses + MapStreamOutputStreamObserver responseObserver = new MapStreamOutputStreamObserver(3); + var stub = MapGrpc.newStub(inProcessChannel); + var requestStreamObserver = stub.mapFn(responseObserver); + requestStreamObserver.onNext(handshake); + requestStreamObserver.onNext(inDatum); + try { + responseObserver.done.get(); + } catch (Exception e) { + fail("Error while waiting for response" + e.getMessage()); + } + List responses = responseObserver.getMapResponses(); + MapOuterClass.MapResponse.Result r = responses.stream() + .filter(resp -> resp.getResultsCount() > 0) + .findFirst().orElseThrow(() -> new AssertionError("no result")).getResults(0); + assertEquals(Arrays.asList("U+005C__NACK__"), r.getTagsList()); + assertTrue(r.hasNackOptions()); + assertEquals(500L, r.getNackOptions().getDelay()); + assertEquals("retry", r.getNackOptions().getReason()); + requestStreamObserver.onCompleted(); + } + + private static class NackMapStreamer extends MapStreamer { + @Override + public void processMessage(String[] keys, Datum datum, OutputObserver outputObserver) { + outputObserver.send(Message.toNack(NackOptions.newBuilder().delay(500L).reason("retry").build())); + } + } +} diff --git a/src/test/java/io/numaproj/numaflow/shared/NackOptionsTest.java b/src/test/java/io/numaproj/numaflow/shared/NackOptionsTest.java new file mode 100644 index 00000000..0af26ab3 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/shared/NackOptionsTest.java @@ -0,0 +1,47 @@ +package io.numaproj.numaflow.shared; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class NackOptionsTest { + + @Test + public void toProto_allFields() { + NackOptions n = NackOptions.newBuilder().delay(500L).maxDeliveries(3).reason("retry").build(); + common.NackOptionsOuterClass.NackOptions p = n.toProto(); + assertTrue(p.hasDelay()); + assertEquals(500L, p.getDelay()); + assertTrue(p.hasMaxDeliveries()); + assertEquals(3, p.getMaxDeliveries()); + assertTrue(p.hasReason()); + assertEquals("retry", p.getReason()); + } + + @Test + public void toProto_partialFields() { + NackOptions n = NackOptions.newBuilder().delay(100L).build(); + common.NackOptionsOuterClass.NackOptions p = n.toProto(); + assertTrue(p.hasDelay()); + assertFalse(p.hasMaxDeliveries()); + assertFalse(p.hasReason()); + } + + @Test + public void fromProto_roundTrip() { + common.NackOptionsOuterClass.NackOptions p = common.NackOptionsOuterClass.NackOptions.newBuilder() + .setDelay(500L).setMaxDeliveries(3).setReason("retry").build(); + NackOptions n = NackOptions.fromProto(p); + assertEquals(Long.valueOf(500L), n.getDelay()); + assertEquals(Integer.valueOf(3), n.getMaxDeliveries()); + assertEquals("retry", n.getReason()); + } + + @Test + public void fromProto_null() { + assertNull(NackOptions.fromProto(null)); + } +} diff --git a/src/test/java/io/numaproj/numaflow/sinker/ServerNackTest.java b/src/test/java/io/numaproj/numaflow/sinker/ServerNackTest.java new file mode 100644 index 00000000..023d6b8f --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/sinker/ServerNackTest.java @@ -0,0 +1,98 @@ +package io.numaproj.numaflow.sinker; + +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.numaproj.numaflow.shared.NackOptions; +import io.numaproj.numaflow.sink.v1.SinkGrpc; +import io.numaproj.numaflow.sink.v1.SinkOuterClass; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ServerNackTest { + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private Server server; + private ManagedChannel inProcessChannel; + + @Before + public void setUp() throws Exception { + String serverName = InProcessServerBuilder.generateName(); + GRPCConfig grpcServerConfig = GRPCConfig.newBuilder() + .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) + .socketPath(Constants.DEFAULT_SOCKET_PATH) + .infoFilePath("/tmp/numaflow-test-server-info)") + .build(); + server = new Server(grpcServerConfig, new NackSinkFn(), null, serverName); + server.start(); + inProcessChannel = grpcCleanup.register( + InProcessChannelBuilder.forName(serverName).directExecutor().build()); + } + + @After + public void tearDown() throws Exception { + server.stop(); + } + + @Test + public void sinkerNack() { + SinkOutputStreamObserver outputStreamObserver = new SinkOutputStreamObserver(); + StreamObserver in = + SinkGrpc.newStub(inProcessChannel).sinkFn(outputStreamObserver); + + in.onNext(SinkOuterClass.SinkRequest.newBuilder() + .setHandshake(SinkOuterClass.Handshake.newBuilder().setSot(true).build()).build()); + in.onNext(SinkOuterClass.SinkRequest.newBuilder() + .setRequest(SinkOuterClass.SinkRequest.Request.newBuilder() + .setValue(ByteString.copyFromUtf8("x")).setId("nack-1").build()).build()); + in.onNext(SinkOuterClass.SinkRequest.newBuilder() + .setStatus(SinkOuterClass.TransmissionStatus.newBuilder().setEot(true).build()).build()); + in.onCompleted(); + + while (!outputStreamObserver.completed.get()) { + // busy-wait, matching the existing sinker ServerTest pattern + } + List responses = outputStreamObserver.getSinkResponse(); + SinkOuterClass.SinkResponse.Result r = responses.stream() + .flatMap(resp -> resp.getResultsList().stream()) + .filter(res -> res.getId().equals("nack-1")) + .findFirst().orElseThrow(() -> new AssertionError("no result for nack-1")); + assertEquals(SinkOuterClass.Status.NACK, r.getStatus()); + assertTrue(r.hasNackOptions()); + assertEquals(500L, r.getNackOptions().getDelay()); + assertEquals("retry", r.getNackOptions().getReason()); + } + + private static class NackSinkFn extends Sinker { + @Override + public ResponseList processMessages(DatumIterator datumStream) { + ResponseList.ResponseListBuilder builder = ResponseList.newBuilder(); + while (true) { + Datum datum; + try { + datum = datumStream.next(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + continue; + } + if (datum == null) { + break; + } + builder.addResponse(Response.responseNack( + datum.getId(), + NackOptions.newBuilder().delay(500L).reason("retry").build())); + } + return builder.build(); + } + } +} diff --git a/src/test/java/io/numaproj/numaflow/sourcer/ServerNackOptionsTest.java b/src/test/java/io/numaproj/numaflow/sourcer/ServerNackOptionsTest.java new file mode 100644 index 00000000..e050a281 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/sourcer/ServerNackOptionsTest.java @@ -0,0 +1,88 @@ +package io.numaproj.numaflow.sourcer; + +import io.grpc.ManagedChannel; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.numaproj.numaflow.shared.NackOptions; +import io.numaproj.numaflow.source.v1.SourceGrpc; +import io.numaproj.numaflow.source.v1.SourceOuterClass; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class ServerNackOptionsTest { + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private Server server; + private ManagedChannel inProcessChannel; + private final AtomicReference captured = new AtomicReference<>(); + + @Before + public void setUp() throws Exception { + String serverName = InProcessServerBuilder.generateName(); + GRPCConfig grpcServerConfig = GRPCConfig.newBuilder() + .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) + .socketPath(Constants.DEFAULT_SOCKET_PATH) + .infoFilePath("/tmp/numaflow-test-server-info)") + .build(); + server = new Server(grpcServerConfig, new CapturingSourcer(captured), null, serverName); + server.start(); + inProcessChannel = grpcCleanup.register( + InProcessChannelBuilder.forName(serverName).directExecutor().build()); + } + + @After + public void tearDown() throws Exception { + server.stop(); + } + + @Test + public void nackFnForwardsOptions() throws Exception { + CountDownLatch done = new CountDownLatch(1); + SourceOuterClass.NackRequest req = SourceOuterClass.NackRequest.newBuilder() + .setRequest(SourceOuterClass.NackRequest.Request.newBuilder() + .addOffsets(SourceOuterClass.Offset.newBuilder() + .setOffset(com.google.protobuf.ByteString.copyFromUtf8("o1")) + .setPartitionId(0).build()) + .setNackOptions(common.NackOptionsOuterClass.NackOptions.newBuilder() + .setDelay(500L).setMaxDeliveries(3).setReason("retry").build()) + .build()) + .build(); + SourceGrpc.newStub(inProcessChannel).nackFn(req, new StreamObserver<>() { + @Override public void onNext(SourceOuterClass.NackResponse v) { } + @Override public void onError(Throwable t) { done.countDown(); } + @Override public void onCompleted() { done.countDown(); } + }); + assertEquals(true, done.await(5, TimeUnit.SECONDS)); + + NackRequest got = captured.get(); + assertNotNull(got); + NackOptions opts = got.getNackOptions(); + assertNotNull(opts); + assertEquals(Long.valueOf(500L), opts.getDelay()); + assertEquals(Integer.valueOf(3), opts.getMaxDeliveries()); + assertEquals("retry", opts.getReason()); + } + + private static class CapturingSourcer extends Sourcer { + private final AtomicReference sink; + CapturingSourcer(AtomicReference sink) { this.sink = sink; } + @Override public void read(ReadRequest request, OutputObserver observer) { } + @Override public void ack(AckRequest request) { } + @Override public void nack(NackRequest request) { sink.set(request); } + @Override public long getPending() { return 0; } + @Override public List getActivePartitions() { return java.util.Collections.emptyList(); } + @Override public Integer getTotalPartitions() { return null; } + } +} diff --git a/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerNackTest.java b/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerNackTest.java new file mode 100644 index 00000000..d310e2a1 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerNackTest.java @@ -0,0 +1,89 @@ +package io.numaproj.numaflow.sourcetransformer; + +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.testing.GrpcCleanupRule; +import io.numaproj.numaflow.shared.NackOptions; +import io.numaproj.numaflow.sourcetransformer.v1.SourceTransformGrpc; +import io.numaproj.numaflow.sourcetransformer.v1.Sourcetransformer; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class ServerNackTest { + private static final Instant TEST_EVENT_TIME = Instant.ofEpochMilli(1000L); + + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private Server server; + private ManagedChannel inProcessChannel; + + @Before + public void setUp() throws Exception { + String serverName = InProcessServerBuilder.generateName(); + GRPCConfig grpcServerConfig = GRPCConfig.newBuilder() + .maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE) + .socketPath(Constants.DEFAULT_SOCKET_PATH) + .infoFilePath("/tmp/numaflow-test-server-info)") + .build(); + server = new Server(grpcServerConfig, new NackTransformer(), null, serverName); + server.start(); + inProcessChannel = grpcCleanup.register( + InProcessChannelBuilder.forName(serverName).directExecutor().build()); + } + + @After + public void tearDown() throws Exception { + server.stop(); + } + + @Test + public void transformerNack() { + Sourcetransformer.SourceTransformRequest handshake = Sourcetransformer.SourceTransformRequest.newBuilder() + .setHandshake(Sourcetransformer.Handshake.newBuilder().setSot(true).build()).build(); + Sourcetransformer.SourceTransformRequest req = Sourcetransformer.SourceTransformRequest.newBuilder() + .setRequest(Sourcetransformer.SourceTransformRequest.Request.newBuilder() + .setValue(ByteString.copyFromUtf8("x")).addKeys("k").build()).build(); + + TransformerOutputStreamObserver responseObserver = new TransformerOutputStreamObserver(2); + var stub = SourceTransformGrpc.newStub(inProcessChannel); + var requestStreamObserver = stub.sourceTransformFn(responseObserver); + requestStreamObserver.onNext(handshake); + requestStreamObserver.onNext(req); + try { + responseObserver.done.get(); + } catch (InterruptedException | ExecutionException e) { + fail("Error while waiting for response" + e.getMessage()); + } + List responses = responseObserver.getResponses().subList(1, 2); + Sourcetransformer.SourceTransformResponse.Result r = responses.get(0).getResults(0); + assertEquals(Arrays.asList("U+005C__NACK__"), r.getTagsList()); + assertEquals(TEST_EVENT_TIME.getEpochSecond(), r.getEventTime().getSeconds()); + assertTrue(r.hasNackOptions()); + assertEquals(500L, r.getNackOptions().getDelay()); + assertEquals("retry", r.getNackOptions().getReason()); + requestStreamObserver.onCompleted(); + } + + private static class NackTransformer extends SourceTransformer { + @Override + public MessageList processMessage(String[] keys, Datum datum) { + return MessageList.newBuilder() + .addMessage(Message.toNack(TEST_EVENT_TIME, + NackOptions.newBuilder().delay(500L).reason("retry").build())) + .build(); + } + } +}