From 4ba599c3ffb3e7760982ca38d2f1305a2501669c Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Wed, 8 Apr 2026 12:51:30 +0200 Subject: [PATCH] [Drain] Expose drain to dofn processElement and onTimer - add missing implementation for SDF and WindowExpiration processContext --- .../java/org/apache/beam/fn/harness/FnApiDoFnRunner.java | 5 +++++ .../fn/harness/SplittablePairWithRestrictionDoFnRunner.java | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 2eb5816a864f..4d7819ce9b42 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -2422,6 +2422,11 @@ public BoundedWindow window() { return currentWindow; } + @Override + public CausedByDrain causedByDrain(DoFn doFn) { + return currentTimer.causedByDrain(); + } + @Override public Instant timestamp(DoFn doFn) { return currentTimer.getHoldTimestamp(); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java index e83db404cb7e..fc31f7ca1b58 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.construction.PTransformTranslation; import org.apache.beam.sdk.util.construction.ParDoTranslation; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -278,6 +279,11 @@ public PaneInfo paneInfo(DoFn doFn) { return getCurrentElementOrFail().getPaneInfo(); } + @Override + public CausedByDrain causedByDrain(DoFn doFn) { + return getCurrentElementOrFail().causedByDrain(); + } + @Override public Object schemaElement(int index) { SerializableFunction converter =