Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ message RemoteGrpcPort {
service BeamFnControl {
// Instructions sent by the runner to the SDK requesting different types
// of work.
//
// Header metadata has the specified keys pairs:
// - "worker_id": the id of the sdk
rpc Control(
// A stream of responses to instructions the SDK was asked to be
// performed.
Expand All @@ -130,6 +133,9 @@ service BeamFnControl {

// Used to get the full process bundle descriptors for bundles one
// is asked to process.
//
// Header metadata has the specified keys pairs:
// - "worker_id": the id of the sdk
rpc GetProcessBundleDescriptor(GetProcessBundleDescriptorRequest) returns (
ProcessBundleDescriptor) {}
}
Expand Down Expand Up @@ -416,14 +422,22 @@ message ProcessBundleRequest {
// at https://s.apache.org/beam-fn-api-control-data-embedding.
Elements elements = 3;

// indicates that the runner has no stare for the keys in this bundle
// Indicates that the runner has no state for the keys in this bundle
// so SDk can safely begin stateful processing with a locally-generated
// initial empty state
// initial empty state.
bool has_no_state = 4;

// indicates that the runner will never process another bundle for the keys
// Indicates that the runner will never process another bundle for the keys
// in this bundle so state need not be included in the bundle commit.
bool only_bundle_for_keys = 5;

// (Optional) If non-empty, the ID of the data stream to use for all data
// requests related to this bundle. See comments at BeamFnData.Data for
// more details.
//
// The runner should only populate this field if the sdk advertises the
// beam:protocol:named_data_streams:v1 capability.
string data_stream_id = 6;
}

message ProcessBundleResponse {
Expand Down Expand Up @@ -834,7 +848,15 @@ message Elements {

// Stable
service BeamFnData {
// Used to send data between harnesses.
// Used to send data between harnesses. Sdks default to using an unnamed data stream
// (without "data_stream_id" header value) for bundles unless the runner requests another named stream to be
// used for a bundle. SDKs can advertise that they support named data streams with the capability
// `beam:protocol:named_data_streams:v1`.
//
// Header metadata has the specified keys pairs:
// - "worker_id": value is the id of the sdk
// - "data_stream_id": value is the id of the data stream, distinguishing it from other data streams from the same
// sdk. This field should only be populated if requested in a received ProcessBundleRequest from the runner.
rpc Data(
// A stream of data representing input.
stream Elements)
Expand Down Expand Up @@ -900,6 +922,9 @@ message StateResponse {

service BeamFnState {
// Used to get/append/clear state stored by the runner on behalf of the SDK.
//
// Header metadata has the specified keys pairs:
// - "worker_id": the id of the sdk
rpc State(
// A stream of state instructions requested of the runner.
stream StateRequest)
Expand Down Expand Up @@ -1295,6 +1320,9 @@ message LogControl {}
service BeamFnLogging {
// Allows for the SDK to emit log entries which the runner can
// associate with the active job.
//
// Header metadata has the specified keys pairs:
// - "worker_id": the id of the sdk
rpc Logging(
// A stream of log entries batched into lists emitted by the SDK harness.
stream LogEntry.List)
Expand Down Expand Up @@ -1356,6 +1384,8 @@ message WorkerStatusResponse {

// API for SDKs to report debug-related statuses to runner during pipeline execution.
service BeamFnWorkerStatus {
// Header metadata has the specified keys pairs:
// - "worker_id": the id of the sdk
rpc WorkerStatus (stream WorkerStatusResponse)
returns (stream WorkerStatusRequest) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1689,6 +1689,10 @@ message StandardProtocols {
// Indicates whether the SDK supports multimap state.
MULTIMAP_STATE = 12
[(beam_urn) = "beam:protocol:multimap_state:v1"];

// Indicates whether the SDK supports data stream ids being requested by the runner in
// ProcessBundleRequests.
NAMED_DATA_STREAMS = 13 [(beam_urn) = "beam:protocol:named_data_streams:v1"];
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public ActiveBundle newBundle(
ImmutableMap.Builder<LogicalEndpoint, FnDataReceiver<?>> receiverBuilder =
ImmutableMap.builder();
BeamFnDataOutboundAggregator beamFnDataOutboundAggregator =
fnApiDataService.createOutboundAggregator(() -> bundleId, false);
fnApiDataService.createOutboundAggregator(bundleId, false);
for (RemoteInputDestination remoteInput : remoteInputs) {
LogicalEndpoint endpoint = LogicalEndpoint.data(bundleId, remoteInput.getPTransformId());
receiverBuilder.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.fnexecution.data;

import java.util.function.Supplier;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
Expand Down Expand Up @@ -69,5 +68,5 @@ public interface FnDataService {
* <p>The returned aggregator is not thread safe.
*/
BeamFnDataOutboundAggregator createOutboundAggregator(
Supplier<String> processBundleRequestIdSupplier, boolean collectElementsIfNoFlushes);
String processBundleId, boolean collectElementsIfNoFlushes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
Expand Down Expand Up @@ -175,13 +174,13 @@ public void unregisterReceiver(String instructionId) {

@Override
public BeamFnDataOutboundAggregator createOutboundAggregator(
Supplier<String> processBundleRequestIdSupplier, boolean collectElementsIfNoFlushes) {
String instructionId, boolean collectElementsIfNoFlushes) {
try {
return new BeamFnDataOutboundAggregator(
options,
processBundleRequestIdSupplier,
connectedClient.get(3, TimeUnit.MINUTES).getOutboundObserver(),
collectElementsIfNoFlushes);
BeamFnDataOutboundAggregator aggregator =
new BeamFnDataOutboundAggregator(options, collectElementsIfNoFlushes);
aggregator.prepareForInstruction(
instructionId, connectedClient.get(3, TimeUnit.MINUTES).getOutboundObserver());
return aggregator;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void testMessageReceivedBySingleClientWhenThereAreMultipleClients() throw
for (int i = 0; i < 3; ++i) {
final String instructionId = Integer.toString(i);
BeamFnDataOutboundAggregator aggregator =
service.createOutboundAggregator(() -> instructionId, false);
service.createOutboundAggregator(instructionId, false);
aggregator.start();
FnDataReceiver<WindowedValue<String>> consumer =
aggregator.registerOutputDataLocation(TRANSFORM_ID, CODER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class BeamFnDataGrpcMultiplexer implements AutoCloseable {
private final Cache</*instructionId=*/ String, /*unused=*/ Boolean> poisonedInstructionIds;

private static class PoisonedException extends RuntimeException {
public PoisonedException() {
private PoisonedException() {
super("Instruction poisoned");
}
};
Expand Down
Loading
Loading