Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/main/java/io/numaproj/numaflow/batchmapper/Message.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package io.numaproj.numaflow.batchmapper;

import io.numaproj.numaflow.shared.NackOptions;
import lombok.Getter;

/** Message is used to wrap the data returned by Mapper. */
@Getter
public class Message {
private static final String[] DROP_TAGS = {"U+005C__DROP__"};
private static final String[] NACK_TAGS = {"U+005C__NACK__"};
private final String[] keys;
private final byte[] value;
private final String[] tags;
private final NackOptions nackOptions;

/**
* used to create Message with value, keys and tags(used for conditional forwarding)
Expand All @@ -18,10 +21,15 @@ public class Message {
* @param tags message tags which will be used for conditional forwarding
*/
public Message(byte[] value, String[] keys, String[] tags) {
this(value, keys, tags, (NackOptions) null);
}

private Message(byte[] value, String[] keys, String[] tags, NackOptions nackOptions) {
// defensive copy - once the Message is created, the caller should not be able to modify it.
this.keys = keys == null ? null : keys.clone();
this.value = value == null ? null : value.clone();
this.tags = tags == null ? null : tags.clone();
this.nackOptions = nackOptions;
}

/**
Expand Down Expand Up @@ -51,4 +59,14 @@ public Message(byte[] value, String[] keys) {
public static Message toDrop() {
return new Message(new byte[0], null, DROP_TAGS);
}

/**
* creates a Message that negatively acknowledges the input message, requesting redelivery.
*
* @param nackOptions optional redelivery options (may be null)
* @return the Message which will be nacked
*/
public static Message toNack(NackOptions nackOptions) {
return new Message(new byte[0], null, NACK_TAGS, nackOptions);
}
}
21 changes: 9 additions & 12 deletions src/main/java/io/numaproj/numaflow/batchmapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,15 @@ private void buildAndStreamResponse(
responses.getItems().forEach(message -> {
List<MapOuterClass.MapResponse.Result> mapResponseResult = new ArrayList<>();
message.getItems().forEach(res -> {
mapResponseResult.add(
MapOuterClass.MapResponse.Result
.newBuilder()
.setValue(res.getValue()
== null ? ByteString.EMPTY : ByteString.copyFrom(
res.getValue()))
.addAllKeys(res.getKeys()
== null ? new ArrayList<>() : Arrays.asList(res.getKeys()))
.addAllTags(res.getTags()
== null ? new ArrayList<>() : Arrays.asList(res.getTags()))
.build()
);
MapOuterClass.MapResponse.Result.Builder resultBuilder = MapOuterClass.MapResponse.Result
.newBuilder()
.setValue(res.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(res.getValue()))
.addAllKeys(res.getKeys() == null ? new ArrayList<>() : Arrays.asList(res.getKeys()))
.addAllTags(res.getTags() == null ? new ArrayList<>() : Arrays.asList(res.getTags()));
if (res.getNackOptions() != null) {
resultBuilder.setNackOptions(res.getNackOptions().toProto());
}
mapResponseResult.add(resultBuilder.build());
});
MapOuterClass.MapResponse singleRequestResponse = MapOuterClass.MapResponse
.newBuilder()
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/io/numaproj/numaflow/mapper/MapperActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,19 @@ private MapOuterClass.MapResponse buildResponse(MessageList messageList, String
.newBuilder();

messageList.getMessages().forEach(message -> {
responseBuilder.addResults(MapOuterClass.MapResponse.Result.newBuilder()
MapOuterClass.MapResponse.Result.Builder resultBuilder = MapOuterClass.MapResponse.Result.newBuilder()
.setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
message.getValue()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
.addAllTags(message.getTags()
== null ? new ArrayList<>() : Arrays.asList(message.getTags()))
.setMetadata(message.getUserMetadata()
== null ? MetadataOuterClass.Metadata.getDefaultInstance() : message.getUserMetadata().toProto())
.build());
== null ? MetadataOuterClass.Metadata.getDefaultInstance() : message.getUserMetadata().toProto());
if (message.getNackOptions() != null) {
resultBuilder.setNackOptions(message.getNackOptions().toProto());
}
responseBuilder.addResults(resultBuilder.build());
});
return responseBuilder.setId(ID).build();
}
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/io/numaproj/numaflow/mapper/Message.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.mapper;

import io.numaproj.numaflow.shared.NackOptions;
import io.numaproj.numaflow.shared.UserMetadata;
import lombok.Getter;

Expand All @@ -11,10 +12,12 @@
@Getter
public class Message {
private static final String[] DROP_TAGS = {"U+005C__DROP__"};
private static final String[] NACK_TAGS = {"U+005C__NACK__"};
private final String[] keys;
private final byte[] value;
private final String[] tags;
private final UserMetadata userMetadata;
private final NackOptions nackOptions;

/**
* used to create Message with value, keys, tags(used for conditional forwarding) and userMetadata
Expand All @@ -25,12 +28,17 @@ public class Message {
* @param userMetadata user metadata, this is used to pass user defined metadata to the next vertex
*/
public Message(byte[] value, String[] keys, String[] tags, UserMetadata userMetadata) {
this(value, keys, tags, userMetadata, null);
}

private Message(byte[] value, String[] keys, String[] tags, UserMetadata userMetadata, NackOptions nackOptions) {
// defensive copy - once the Message is created, the caller should not be able to modify it.
this.keys = keys == null ? null : keys.clone();
this.value = value == null ? null : value.clone();
this.tags = tags == null ? null : tags.clone();
// Copy the data using copy constructor to prevent mutation
this.userMetadata = userMetadata == null ? null : new UserMetadata(userMetadata);
this.nackOptions = nackOptions;
}

/**
Expand Down Expand Up @@ -71,4 +79,14 @@ public Message(byte[] value, String[] keys, String[] tags) {
public static Message toDrop() {
return new Message(new byte[0], null, DROP_TAGS, null);
}

/**
* creates a Message that negatively acknowledges the input message, requesting redelivery.
*
* @param nackOptions optional redelivery options (may be null)
* @return the Message which will be nacked
*/
public static Message toNack(NackOptions nackOptions) {
return new Message(new byte[0], null, NACK_TAGS, null, nackOptions);
}
}
18 changes: 18 additions & 0 deletions src/main/java/io/numaproj/numaflow/mapstreamer/Message.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package io.numaproj.numaflow.mapstreamer;

import io.numaproj.numaflow.shared.NackOptions;
import lombok.Getter;

/** Message is used to wrap the data returned by MapStreamer. */
@Getter
public class Message {
private static final String[] DROP_TAGS = {"U+005C__DROP__"};
private static final String[] NACK_TAGS = {"U+005C__NACK__"};
private final String[] keys;
private final byte[] value;
private final String[] tags;
private final NackOptions nackOptions;

/**
* used to create Message with value, keys and tags(used for conditional forwarding)
Expand All @@ -18,10 +21,15 @@ public class Message {
* @param tags message tags which will be used for conditional forwarding
*/
public Message(byte[] value, String[] keys, String[] tags) {
this(value, keys, tags, (NackOptions) null);
}

private Message(byte[] value, String[] keys, String[] tags, NackOptions nackOptions) {
// defensive copy - once the Message is created, the caller should not be able to modify it.
this.keys = keys == null ? null : keys.clone();
this.value = value == null ? null : value.clone();
this.tags = tags == null ? null : tags.clone();
this.nackOptions = nackOptions;
}

/**
Expand Down Expand Up @@ -51,4 +59,14 @@ public Message(byte[] value, String[] keys) {
public static Message toDrop() {
return new Message(new byte[0], null, DROP_TAGS);
}

/**
* creates a Message that negatively acknowledges the input message, requesting redelivery.
*
* @param nackOptions optional redelivery options (may be null)
* @return the Message which will be nacked
*/
public static Message toNack(NackOptions nackOptions) {
return new Message(new byte[0], null, NACK_TAGS, nackOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ public void send(Message message) {
if (message == null) {
return;
}
MapOuterClass.MapResponse.Result.Builder resultBuilder = MapOuterClass.MapResponse.Result.newBuilder()
.setValue(message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(message.getValue()))
.addAllKeys(message.getKeys() == null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
.addAllTags(message.getTags() == null ? new ArrayList<>() : Arrays.asList(message.getTags()));
if (message.getNackOptions() != null) {
resultBuilder.setNackOptions(message.getNackOptions().toProto());
}
MapOuterClass.MapResponse response = MapOuterClass.MapResponse.newBuilder()
.setId(requestID)
.addResults(MapOuterClass.MapResponse.Result.newBuilder()
.setValue(
message.getValue() == null ? ByteString.EMPTY : ByteString.copyFrom(
message.getValue()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>() : Arrays.asList(message.getKeys()))
.addAllTags(message.getTags()
== null ? new ArrayList<>() : Arrays.asList(message.getTags()))
.build()).build();
.addResults(resultBuilder.build())
.build();
supervisorActor.tell(response, ActorRef.noSender());
}

Expand Down
53 changes: 53 additions & 0 deletions src/main/java/io/numaproj/numaflow/shared/NackOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.numaproj.numaflow.shared;

import lombok.Builder;
import lombok.Getter;

/**
* NackOptions carries per-message redelivery options for a negative acknowledgement (nack).
* All fields are optional; a null value means unset.
*/
@Getter
@Builder(builderMethodName = "newBuilder")
public class NackOptions {
/** redelivery delay in milliseconds. */
private final Long delay;
/** maximum number of redelivery attempts. */
private final Integer maxDeliveries;
/** human-readable reason for the nack. */
private final String reason;

/** Converts to the outgoing proto type, setting only the fields that are present. */
public common.NackOptionsOuterClass.NackOptions toProto() {
common.NackOptionsOuterClass.NackOptions.Builder b =
common.NackOptionsOuterClass.NackOptions.newBuilder();
if (delay != null) {
b.setDelay(delay);
}
if (maxDeliveries != null) {
b.setMaxDeliveries(maxDeliveries);
}
if (reason != null) {
b.setReason(reason);
}
return b.build();
}

/** Converts from the incoming proto type. Returns null for null input. */
public static NackOptions fromProto(common.NackOptionsOuterClass.NackOptions p) {
if (p == null) {
return null;
}
NackOptionsBuilder b = NackOptions.newBuilder();
if (p.hasDelay()) {
b.delay(p.getDelay());
}
if (p.hasMaxDeliveries()) {
b.maxDeliveries(p.getMaxDeliveries());
}
if (p.hasReason()) {
b.reason(p.getReason());
}
return b.build();
}
}
25 changes: 20 additions & 5 deletions src/main/java/io/numaproj/numaflow/sinker/Response.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.sinker;

import io.numaproj.numaflow.shared.NackOptions;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
Expand All @@ -20,6 +21,8 @@ public class Response {
private final byte[] serveResponse;
private final Boolean onSuccess;
private final Message onSuccessMessage;
private final Boolean nack;
private final NackOptions nackOptions;

/**
* Static method to create response for successful message processing.
Expand All @@ -28,7 +31,7 @@ public class Response {
* @return Response object with success status
*/
public static Response responseOK(String id) {
return new Response(id, true, null, false, false, null, false, null);
return new Response(id, true, null, false, false, null, false, null, false, null);
}

/**
Expand All @@ -39,7 +42,7 @@ public static Response responseOK(String id) {
* @return Response object with failure status and error message
*/
public static Response responseFailure(String id, String errMsg) {
return new Response(id, false, errMsg, false, false, null, false, null);
return new Response(id, false, errMsg, false, false, null, false, null, false, null);
}

/**
Expand All @@ -50,7 +53,7 @@ public static Response responseFailure(String id, String errMsg) {
* @return Response object with fallback status
*/
public static Response responseFallback(String id) {
return new Response(id, false, null, true, false, null, false, null);
return new Response(id, false, null, true, false, null, false, null, false, null);
}

/**
Expand All @@ -63,7 +66,7 @@ public static Response responseFallback(String id) {
* @return Response object with serve status and serve response
*/
public static Response responseServe(String id, byte[] serveResponse) {
return new Response(id, false, null, false, true, serveResponse, false, null);
return new Response(id, false, null, false, true, serveResponse, false, null, false, null);
}

/**
Expand All @@ -76,6 +79,18 @@ public static Response responseServe(String id, byte[] serveResponse) {
* @return Response object with onSuccess status and onSuccess message
*/
public static Response responseOnSuccess(String id, Message onSuccessMessage) {
return new Response(id, false, null, false, false, null, true, onSuccessMessage);
return new Response(id, false, null, false, false, null, true, onSuccessMessage, false, null);
}

/**
* Static method to create a nack response, indicating the message should be negatively
* acknowledged and redelivered. nackOptions may be null.
*
* @param id id of the message
* @param nackOptions optional redelivery options
* @return Response object with nack status
*/
public static Response responseNack(String id, NackOptions nackOptions) {
return new Response(id, false, null, false, false, null, false, null, true, nackOptions);
}
}
8 changes: 8 additions & 0 deletions src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ private SinkOuterClass.SinkResponse.Result buildResult(Response response) {
.setStatus(SinkOuterClass.Status.ON_SUCCESS)
.setOnSuccessMsg(Message.toProto(response.getOnSuccessMessage()))
.build();
} else if (response.getNack() != null && response.getNack()) {
SinkOuterClass.SinkResponse.Result.Builder b = SinkOuterClass.SinkResponse.Result.newBuilder()
.setId(response.getId() == null ? "" : response.getId())
.setStatus(SinkOuterClass.Status.NACK);
if (response.getNackOptions() != null) {
b.setNackOptions(response.getNackOptions().toProto());
}
return b.build();
} else {
// FIXME: Return error when error message is not set?
return SinkOuterClass.SinkResponse.Result.newBuilder()
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/io/numaproj/numaflow/sourcer/NackRequest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.sourcer;

import io.numaproj.numaflow.shared.NackOptions;

import java.util.List;

Expand All @@ -11,4 +12,9 @@ public interface NackRequest {
* @return the list of offsets to be negatively acknowledged.
*/
List<Offset> getOffsets();

/**
* @return the redelivery options for this nack, or null if none were provided.
*/
NackOptions getNackOptions();
}
Loading
Loading