Skip to content

Commit c6ab2f5

Browse files
committed
rename field
1 parent 16e646b commit c6ab2f5

4 files changed

Lines changed: 16 additions & 9 deletions

File tree

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,8 @@ static void ensureEOSSupport() {
162162
@Override
163163
public PCollection<Void> expand(PCollection<ProducerRecord<K, V>> input) {
164164
String topic = Preconditions.checkStateNotNull(spec.getTopic());
165-
int numElements = spec.getNumElements();
166-
Duration timeout = spec.getTimeout();
165+
int numElements = spec.getEosTriggerNumElements();
166+
Duration timeout = spec.getEosTriggerTimeout();
167167
int numShards = spec.getNumShards();
168168
if (numShards <= 0) {
169169
try (Consumer<?, ?> consumer = openConsumer(spec)) {

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -648,8 +648,8 @@ public static <K, V> WriteRecords<K, V> writeRecords() {
648648
return new AutoValue_KafkaIO_WriteRecords.Builder<K, V>()
649649
.setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES)
650650
.setEOS(false)
651-
.setNumElements(100)
652-
.setTimeout(Duration.standardSeconds(1))
651+
.setEosTriggerNumElements(100)
652+
.setEosTriggerTimeout(Duration.standardSeconds(1))
653653
.setNumShards(0)
654654
.setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
655655
.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
@@ -3187,9 +3187,9 @@ public abstract static class WriteRecords<K, V>
31873187
@Pure
31883188
public abstract boolean isEOS();
31893189

3190-
public abstract int getNumElements();
3190+
public abstract int getEosTriggerNumElements();
31913191

3192-
public abstract Duration getTimeout();
3192+
public abstract Duration getEosTriggerTimeout();
31933193

31943194
@Pure
31953195
public abstract @Nullable String getSinkGroupId();
@@ -3227,9 +3227,9 @@ abstract Builder<K, V> setPublishTimestampFunction(
32273227

32283228
abstract Builder<K, V> setEOS(boolean eosEnabled);
32293229

3230-
abstract Builder<K, V> setNumElements(int numElements);
3230+
abstract Builder<K, V> setEosTriggerNumElements(int numElements);
32313231

3232-
abstract Builder<K, V> setTimeout(Duration timeout);
3232+
abstract Builder<K, V> setEosTriggerTimeout(Duration timeout);
32333233

32343234
abstract Builder<K, V> setSinkGroupId(String sinkGroupId);
32353235

@@ -3381,7 +3381,7 @@ public WriteRecords<K, V> withEOS(int numShards, String sinkGroupId) {
33813381
public WriteRecords<K, V> withEOSTriggerConfig(int numElements, Duration timeout) {
33823382
checkArgument(numElements >= 1, "numElements should be >= 1");
33833383
checkArgument(timeout != null, "timeout is required for exactly-once sink");
3384-
return toBuilder().setNumElements(numElements).setTimeout(timeout).build();
3384+
return toBuilder().setEosTriggerNumElements(numElements).setEosTriggerTimeout(timeout).build();
33853385
}
33863386

33873387
/**

sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,8 @@ static class KafkaIOWriteTranslator implements TransformPayloadTranslator<Write<
479479
.addNullableByteArrayField("producer_factory_fn")
480480
.addNullableByteArrayField("publish_timestamp_fn")
481481
.addBooleanField("eos")
482+
.addInt32Field("eos_trigger_num_elements")
483+
.addInt64Field("eos_trigger_timeout_ms")
482484
.addInt32Field("num_shards")
483485
.addNullableStringField("sink_group_id")
484486
.addNullableByteArrayField("consumer_factory_fn")
@@ -547,6 +549,9 @@ public Row toConfigRow(Write<?, ?> transform) {
547549
}
548550

549551
fieldValues.put("eos", writeRecordsTransform.isEOS());
552+
fieldValues.put(
553+
"eos_trigger_timeout_ms", writeRecordsTransform.getEosTriggerTimeout().getMillis());
554+
fieldValues.put("eos_trigger_num_elements", writeRecordsTransform.getEosTriggerNumElements());
550555
fieldValues.put("num_shards", writeRecordsTransform.getNumShards());
551556

552557
if (writeRecordsTransform.getSinkGroupId() != null) {

sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ public class KafkaIOTranslationTest {
9494
WRITE_TRANSFORM_SCHEMA_MAPPING.put("getValueSerializer", "value_serializer");
9595
WRITE_TRANSFORM_SCHEMA_MAPPING.put("getPublishTimestampFunction", "publish_timestamp_fn");
9696
WRITE_TRANSFORM_SCHEMA_MAPPING.put("isEOS", "eos");
97+
WRITE_TRANSFORM_SCHEMA_MAPPING.put("getEosTriggerTimeout", "eos_trigger_timeout_ms");
98+
WRITE_TRANSFORM_SCHEMA_MAPPING.put("getEosTriggerNumElements", "eos_trigger_num_elements");
9799
WRITE_TRANSFORM_SCHEMA_MAPPING.put("getSinkGroupId", "sink_group_id");
98100
WRITE_TRANSFORM_SCHEMA_MAPPING.put("getNumShards", "num_shards");
99101
WRITE_TRANSFORM_SCHEMA_MAPPING.put("getConsumerFactoryFn", "consumer_factory_fn");

0 commit comments

Comments
 (0)