Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ public W getWindow() {
return window;
}

public Coder<W> getWindowCoder() {
return windowCoder;
}

@Override
public String stringKey() {
try {
Expand Down Expand Up @@ -170,6 +174,10 @@ public W getWindow() {
return window;
}

public Coder<W> getWindowCoder() {
return windowCoder;
}

public int getTriggerIndex() {
return triggerIndex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ private StateTags() {}

private interface SystemStateTag<StateT extends State> {
StateTag<StateT> asKind(StateKind kind);

StateKind getKind();
}

/** Create a state tag for the given id and spec. */
Expand Down Expand Up @@ -243,6 +245,16 @@ public static <StateT extends State> StateTag<StateT> makeSystemTagInternal(
return typedTag.asKind(StateKind.SYSTEM);
}

/*
* Returns true if the tag is a system internal tag.
*/
public static <StateT extends State> boolean isSystemTagInternal(StateTag<StateT> tag) {
if (!(tag instanceof SystemStateTag)) {
return false;
}
return StateKind.SYSTEM.equals(((SystemStateTag<?>) tag).getKind());
}

public static <InputT, AccumT, OutputT> StateTag<BagState<AccumT>> convertToBagTagInternal(
StateTag<CombiningState<InputT, AccumT, OutputT>> combiningTag) {
return new SimpleStateTag<>(
Expand Down Expand Up @@ -358,6 +370,11 @@ public StateTag<StateT> asKind(StateKind kind) {
return new SimpleStateTag<>(id.asKind(kind), spec);
}

@Override
public StateKind getKind() {
return id.kind;
}

@Override
public boolean equals(@Nullable Object other) {
if (!(other instanceof SimpleStateTag)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache.ForComputation;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateInternals;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV2;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
Expand Down Expand Up @@ -154,13 +156,14 @@ public StreamingModeExecutionContext(
String computationId,
ReaderCache readerCache,
Map<String, String> stateNameMap,
WindmillStateCache.ForComputation stateCache,
ForComputation stateCache,
MetricsContainerRegistry<StreamingStepMetricsContainer> metricsContainerRegistry,
DataflowExecutionStateTracker executionStateTracker,
StreamingModeExecutionStateRegistry executionStateRegistry,
StreamingGlobalConfigHandle globalConfigHandle,
long sinkByteLimit,
boolean throwExceptionOnLargeOutput) {
boolean throwExceptionOnLargeOutput,
boolean enableWindmillTagEncodingV2) {
super(
counterFactory,
metricsContainerRegistry,
Expand All @@ -171,7 +174,10 @@ public StreamingModeExecutionContext(
this.readerCache = readerCache;
this.globalConfigHandle = globalConfigHandle;
this.sideInputCache = new HashMap<>();
this.windmillTagEncoding = WindmillTagEncodingV1.instance();
this.windmillTagEncoding =
enableWindmillTagEncodingV2
? WindmillTagEncodingV2.instance()
: WindmillTagEncodingV1.instance();
this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
this.stateCache = stateCache;
this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
// Setting a timer, clear any prior hold and set to the new value
outputBuilder
.addWatermarkHoldsBuilder()
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData))
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag()))
.setStateFamily(stateFamily)
.setReset(true)
.addTimestamps(
Expand All @@ -210,7 +210,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
// Clear the hold in case a previous iteration of this timer set one.
outputBuilder
.addWatermarkHoldsBuilder()
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData))
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag()))
.setStateFamily(stateFamily)
.setReset(true);
}
Expand All @@ -225,7 +225,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
// We are deleting timer; clear the hold
outputBuilder
.addWatermarkHoldsBuilder()
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData))
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag()))
.setStateFamily(stateFamily)
.setReset(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ public abstract class WindmillTagEncoding {
/**
* Produce a state tag that is guaranteed to be unique for the given timer, to add a watermark
* hold that is only freed after the timer fires.
*
* @param timerTag tag of the timer that maps to the hold.
*/
public abstract ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData timerData);
public abstract ByteString timerHoldTag(
WindmillNamespacePrefix prefix, TimerData timerData, ByteString timerTag);

/**
* Produce a tag that is guaranteed to be unique for the given prefix, namespace, domain and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public InternedByteString stateTag(StateNamespace namespace, StateTag<?> address

/** {@inheritDoc} */
@Override
public ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData timerData) {
public ByteString timerHoldTag(
WindmillNamespacePrefix prefix, TimerData timerData, ByteString unusedTimerTag) {
String tagString;
if ("".equals(timerData.getTimerFamilyId())) {
tagString =
Expand Down
Loading
Loading