Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public CommittableCollector(SinkCommitterMetricGroup metricGroup) {
SinkCommitterMetricGroup metricGroup) {
this.checkpointCommittables = new TreeMap<>(checkNotNull(checkpointCommittables));
this.metricGroup = metricGroup;
this.metricGroup.setCurrentPendingCommittablesGauge(this::getNumPending);
}

private int getNumPending() {
Expand All @@ -82,7 +81,9 @@ private int getNumPending() {
* @return {@link CommittableCollector}
*/
public static <CommT> CommittableCollector<CommT> of(SinkCommitterMetricGroup metricGroup) {
return new CommittableCollector<>(metricGroup);
CommittableCollector<CommT> collector = new CommittableCollector<>(metricGroup);
metricGroup.setCurrentPendingCommittablesGauge(collector::getNumPending);
return collector;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,17 @@

package org.apache.flink.runtime.metrics.groups;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.metrics.MetricNames;

import java.util.concurrent.atomic.AtomicInteger;

/** Util class to create metric groups for SinkV2 tests. */
public class MetricsGroupTestUtils {
Expand All @@ -46,4 +54,88 @@ public static InternalSinkCommitterMetricGroup mockCommitterMetricGroup() {
new UnregisteredMetricsGroup(),
UnregisteredMetricsGroup.createOperatorIOMetricGroup());
}

public static TrackableCommitterMetricGroup mockTrackableCommitterMetricGroup() {
return new TrackableCommitterMetricGroup(
new UnregisteredMetricsGroup(),
UnregisteredMetricsGroup.createOperatorIOMetricGroup());
}

public static class TrackableCommitterMetricGroup extends ProxyMetricGroup<MetricGroup>
implements SinkCommitterMetricGroup {

private final AtomicInteger gaugeCallCount = new AtomicInteger(0);

private final Counter numCommittablesTotal;
private final Counter numCommittablesFailure;
private final Counter numCommittablesRetry;
private final Counter numCommitatblesSuccess;
private final Counter numCommitatblesAlreadyCommitted;
private final OperatorIOMetricGroup operatorIOMetricGroup;

@VisibleForTesting
public TrackableCommitterMetricGroup(
MetricGroup parentMetricGroup, OperatorIOMetricGroup operatorIOMetricGroup) {
super(parentMetricGroup);
numCommittablesTotal = parentMetricGroup.counter(MetricNames.TOTAL_COMMITTABLES);
numCommittablesFailure = parentMetricGroup.counter(MetricNames.FAILED_COMMITTABLES);
numCommittablesRetry = parentMetricGroup.counter(MetricNames.RETRIED_COMMITTABLES);
numCommitatblesSuccess = parentMetricGroup.counter(MetricNames.SUCCESSFUL_COMMITTABLES);
numCommitatblesAlreadyCommitted =
parentMetricGroup.counter(MetricNames.ALREADY_COMMITTED_COMMITTABLES);

this.operatorIOMetricGroup = operatorIOMetricGroup;
}

public static TrackableCommitterMetricGroup wrap(OperatorMetricGroup operatorMetricGroup) {
return new TrackableCommitterMetricGroup(
operatorMetricGroup, operatorMetricGroup.getIOMetricGroup());
}

@Override
public OperatorIOMetricGroup getIOMetricGroup() {
return operatorIOMetricGroup;
}

@Override
public Counter getNumCommittablesTotalCounter() {
return numCommittablesTotal;
}

@Override
public Counter getNumCommittablesFailureCounter() {
return numCommittablesFailure;
}

@Override
public Counter getNumCommittablesRetryCounter() {
return numCommittablesRetry;
}

@Override
public Counter getNumCommittablesSuccessCounter() {
return numCommitatblesSuccess;
}

@Override
public Counter getNumCommittablesAlreadyCommittedCounter() {
return numCommitatblesAlreadyCommitted;
}

@Override
public void setCurrentPendingCommittablesGauge(
Gauge<Integer> currentPendingCommittablesGauge) {
gaugeCallCount.incrementAndGet();
parentMetricGroup.gauge(
MetricNames.PENDING_COMMITTABLES, currentPendingCommittablesGauge);
}

public int getGaugeCallCount() {
return gaugeCallCount.get();
}

public void resetGaugeCallCount() {
gaugeCallCount.set(0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,22 @@

package org.apache.flink.streaming.runtime.operators.sink.committables;

import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class CommittableCollectorTest {
private static final SinkCommitterMetricGroup METRIC_GROUP =
MetricsGroupTestUtils.mockCommitterMetricGroup();
private static final MetricsGroupTestUtils.TrackableCommitterMetricGroup METRIC_GROUP =
MetricsGroupTestUtils.mockTrackableCommitterMetricGroup();

@BeforeEach
public void setUp() {
METRIC_GROUP.resetGaugeCallCount();
}

@Test
void testGetCheckpointCommittablesUpTo() {
Expand All @@ -42,4 +47,16 @@ void testGetCheckpointCommittablesUpTo() {

assertThat(committableCollector.getCheckpointCommittablesUpTo(2)).hasSize(2);
}

@Test
void testSetPendingGaugeNotCalledOnCopy() {
final CommittableCollector<Integer> committableCollector =
CommittableCollector.of(METRIC_GROUP);

assertThat(METRIC_GROUP.getGaugeCallCount()).isEqualTo(1);

committableCollector.copy();

assertThat(METRIC_GROUP.getGaugeCallCount()).isEqualTo(1);
}
}