From 1163c115bc045b71550253decdd4d5357b90cfd4 Mon Sep 17 00:00:00 2001 From: Anuragp22 Date: Sat, 30 May 2026 11:45:20 +0530 Subject: [PATCH 1/7] [Python] Honor disableCounterMetrics / disableStringSetMetrics / disableBoundedTrieMetrics experiments Mirrors the Java SDK Metrics.MetricsFlag behavior so high-throughput Python pipelines can opt out of user metric kinds that add pressure to metric backends. Adds a process-wide MetricsFlag singleton initialized once at worker harness startup from pipeline experiments. When a flag is set, the corresponding Delegating* class short-circuits its update path so no MetricCell is touched and no monitoring info is emitted. For #38746. --- sdks/python/apache_beam/metrics/metric.py | 75 ++++++++++++++++++- .../runners/worker/sdk_worker_main.py | 2 + 2 files changed, 73 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index b15237cae020..1b15e939aca1 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -46,11 +46,13 @@ from apache_beam.metrics.metricbase import Histogram from apache_beam.metrics.metricbase import MetricName from apache_beam.metrics.metricbase import StringSet +from apache_beam.options.pipeline_options import DebugOptions if TYPE_CHECKING: from apache_beam.internal.metrics.metric import MetricLogger from apache_beam.metrics.execution import MetricKey from apache_beam.metrics.metricbase import Metric + from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.utils.histogram import BucketType __all__ = ['Metrics', 'MetricsFilter', 'Lineage'] @@ -58,6 +60,56 @@ _LOGGER = logging.getLogger(__name__) +class MetricsFlag(object): + """Process-wide flags controlling which user metric kinds are emitted. + + Mirrors the Java SDK ``Metrics.MetricsFlag`` behavior. The flags are read + once at worker harness initialization from pipeline experiments and apply + for the lifetime of the worker. + """ + _counter_disabled = False + _string_set_disabled = False + _bounded_trie_disabled = False + _initialized = False + + @classmethod + def set_default_pipeline_options(cls, options: 'PipelineOptions') -> None: + """Initialize the flags from pipeline experiments. Idempotent.""" + if cls._initialized: + return + debug_options = options.view_as(DebugOptions) + if debug_options.lookup_experiment('disableCounterMetrics'): + cls._counter_disabled = True + _LOGGER.info('Counter metrics are disabled.') + if debug_options.lookup_experiment('disableStringSetMetrics'): + cls._string_set_disabled = True + _LOGGER.info('StringSet metrics are disabled.') + if debug_options.lookup_experiment('disableBoundedTrieMetrics'): + cls._bounded_trie_disabled = True + _LOGGER.info('BoundedTrie metrics are disabled.') + cls._initialized = True + + @classmethod + def reset(cls) -> None: + """Reset flags. Test-only.""" + cls._counter_disabled = False + cls._string_set_disabled = False + cls._bounded_trie_disabled = False + cls._initialized = False + + @classmethod + def counter_disabled(cls) -> bool: + return cls._counter_disabled + + @classmethod + def string_set_disabled(cls) -> bool: + return cls._string_set_disabled + + @classmethod + def bounded_trie_disabled(cls) -> bool: + return cls._bounded_trie_disabled + + class Metrics(object): """Lets users create/access metric objects during pipeline execution.""" @staticmethod @@ -204,12 +256,17 @@ class DelegatingCounter(Counter): def __init__( self, metric_name: MetricName, process_wide: bool = False) -> None: super().__init__(metric_name) - self.inc = MetricUpdater( # type: ignore[method-assign] + self._updater = MetricUpdater( cells.CounterCell, metric_name, default_value=1, process_wide=process_wide) + def inc(self, n: int = 1) -> None: + if MetricsFlag.counter_disabled(): + return + self._updater(n) + class DelegatingDistribution(Distribution): """Metrics Distribution Delegates functionality to MetricsEnvironment.""" def __init__( @@ -231,13 +288,23 @@ class DelegatingStringSet(StringSet): """Metrics StringSet that Delegates functionality to MetricsEnvironment.""" def __init__(self, metric_name: MetricName) -> None: super().__init__(metric_name) - self.add = MetricUpdater(cells.StringSetCell, metric_name) # type: ignore[method-assign] + self._updater = MetricUpdater(cells.StringSetCell, metric_name) + + def add(self, value: str) -> None: + if MetricsFlag.string_set_disabled(): + return + self._updater(value) class DelegatingBoundedTrie(BoundedTrie): - """Metrics StringSet that Delegates functionality to MetricsEnvironment.""" + """Metrics BoundedTrie that Delegates functionality to MetricsEnvironment.""" def __init__(self, metric_name: MetricName) -> None: super().__init__(metric_name) - self.add = MetricUpdater(cells.BoundedTrieCell, metric_name) # type: ignore[method-assign] + self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name) + + def add(self, value) -> None: + if MetricsFlag.bounded_trie_disabled(): + return + self._updater(value) class MetricResults(object): diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 754a631eaf33..58beda96d63d 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -33,6 +33,7 @@ from apache_beam.internal import pickler from apache_beam.io import filesystems +from apache_beam.metrics import metric from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions @@ -123,6 +124,7 @@ def create_harness(environment, dry_run=False): RuntimeValueProvider.set_runtime_options(pipeline_options_dict) sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict) filesystems.FileSystems.set_options(sdk_pipeline_options) + metric.MetricsFlag.set_default_pipeline_options(sdk_pipeline_options) pickle_library = sdk_pipeline_options.view_as(SetupOptions).pickle_library pickler.set_library(pickle_library) From 93c99a9d596e67f2e5f88def6d6d981018fa54e6 Mon Sep 17 00:00:00 2001 From: Anuragp22 Date: Sat, 30 May 2026 11:45:37 +0530 Subject: [PATCH 2/7] [Python] Add unit tests for MetricsFlag Ports Java MetricsTest.testMetricsFlag and adds three smoke tests confirming the disabled Counter / StringSet / BoundedTrie short-circuit without raising when called on a no-current-tracker path. For #38746. --- .../python/apache_beam/metrics/metric_test.py | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py index ae66200737b5..6e44fc45570c 100644 --- a/sdks/python/apache_beam/metrics/metric_test.py +++ b/sdks/python/apache_beam/metrics/metric_test.py @@ -32,7 +32,9 @@ from apache_beam.metrics.metric import MetricResults from apache_beam.metrics.metric import Metrics from apache_beam.metrics.metric import MetricsFilter +from apache_beam.metrics.metric import MetricsFlag from apache_beam.metrics.metricbase import MetricName +from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner from apache_beam.runners.worker import statesampler from apache_beam.testing.metric_result_matchers import DistributionMatcher @@ -121,6 +123,78 @@ def test_get_namespace_error(self): with self.assertRaises(ValueError): Metrics.get_namespace(object()) + def test_metrics_flag(self): + """Mirrors Java MetricsTest.testMetricsFlag for the three disable* experiments.""" + MetricsFlag.reset() + self.assertFalse(MetricsFlag.counter_disabled()) + self.assertFalse(MetricsFlag.string_set_disabled()) + self.assertFalse(MetricsFlag.bounded_trie_disabled()) + + options = PipelineOptions(['--experiments=disableCounterMetrics']) + MetricsFlag.set_default_pipeline_options(options) + self.assertTrue(MetricsFlag.counter_disabled()) + self.assertFalse(MetricsFlag.string_set_disabled()) + self.assertFalse(MetricsFlag.bounded_trie_disabled()) + + MetricsFlag.reset() + options = PipelineOptions(['--experiments=disableStringSetMetrics']) + MetricsFlag.set_default_pipeline_options(options) + self.assertFalse(MetricsFlag.counter_disabled()) + self.assertTrue(MetricsFlag.string_set_disabled()) + self.assertFalse(MetricsFlag.bounded_trie_disabled()) + + MetricsFlag.reset() + options = PipelineOptions(['--experiments=disableBoundedTrieMetrics']) + MetricsFlag.set_default_pipeline_options(options) + self.assertFalse(MetricsFlag.counter_disabled()) + self.assertFalse(MetricsFlag.string_set_disabled()) + self.assertTrue(MetricsFlag.bounded_trie_disabled()) + + MetricsFlag.reset() + options = PipelineOptions([ + '--experiments=disableCounterMetrics', + '--experiments=disableStringSetMetrics', + '--experiments=disableBoundedTrieMetrics', + ]) + MetricsFlag.set_default_pipeline_options(options) + self.assertTrue(MetricsFlag.counter_disabled()) + self.assertTrue(MetricsFlag.string_set_disabled()) + self.assertTrue(MetricsFlag.bounded_trie_disabled()) + + MetricsFlag.reset() + + def test_disabled_counter_is_noop(self): + MetricsFlag.reset() + options = PipelineOptions(['--experiments=disableCounterMetrics']) + MetricsFlag.set_default_pipeline_options(options) + try: + counter = Metrics.counter('ns', 'disabled_counter') + counter.inc() + counter.inc(5) + counter.dec() + finally: + MetricsFlag.reset() + + def test_disabled_string_set_is_noop(self): + MetricsFlag.reset() + options = PipelineOptions(['--experiments=disableStringSetMetrics']) + MetricsFlag.set_default_pipeline_options(options) + try: + string_set = Metrics.string_set('ns', 'disabled_set') + string_set.add('value') + finally: + MetricsFlag.reset() + + def test_disabled_bounded_trie_is_noop(self): + MetricsFlag.reset() + options = PipelineOptions(['--experiments=disableBoundedTrieMetrics']) + MetricsFlag.set_default_pipeline_options(options) + try: + bounded_trie = Metrics.bounded_trie('ns', 'disabled_trie') + bounded_trie.add(['a', 'b']) + finally: + MetricsFlag.reset() + def test_counter_empty_name(self): with self.assertRaises(ValueError): Metrics.counter("namespace", "") From f860d86b3fd57d443250bc132e1713b070ed0e63 Mon Sep 17 00:00:00 2001 From: Anuragp22 Date: Sat, 30 May 2026 11:49:59 +0530 Subject: [PATCH 3/7] [Python] Note disable*Metrics support in CHANGES.md For #38746. --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 74209bb7499c..252684c416ef 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -69,6 +69,7 @@ ## New Features / Improvements * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Python SDK now honors the `disableCounterMetrics`, `disableStringSetMetrics`, and `disableBoundedTrieMetrics` pipeline experiments to opt out of emitting the corresponding user metric kinds, matching Java SDK behavior ([#38746](https://github.com/apache/beam/issues/38746)). ## Breaking Changes From 222c8d6c3f1eb3db6d3768361169ecdf936d8ed5 Mon Sep 17 00:00:00 2001 From: Anuragp22 Date: Sat, 30 May 2026 12:20:37 +0530 Subject: [PATCH 4/7] [Python] Use public class attrs on MetricsFlag for hot-path reads Drops the classmethod getter wrapper around each disabled flag and exposes counter_disabled / string_set_disabled / bounded_trie_disabled directly as public class attributes. The gate runs on every metric update, so swapping a descriptor lookup + function call for a single attribute load matters in exactly the high-throughput pipelines these experiments are designed for. Idiomatic Python; java parity is about behavior, not internal API shape. Addresses review feedback. For #38746. --- sdks/python/apache_beam/metrics/metric.py | 40 +++++++------------ .../python/apache_beam/metrics/metric_test.py | 30 +++++++------- 2 files changed, 30 insertions(+), 40 deletions(-) diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index 1b15e939aca1..535ac7e9344c 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -65,11 +65,13 @@ class MetricsFlag(object): Mirrors the Java SDK ``Metrics.MetricsFlag`` behavior. The flags are read once at worker harness initialization from pipeline experiments and apply - for the lifetime of the worker. + for the lifetime of the worker. Exposed as public class attributes so + Delegating* gates can read them with a single attribute load on the hot + path of metric emission. """ - _counter_disabled = False - _string_set_disabled = False - _bounded_trie_disabled = False + counter_disabled = False + string_set_disabled = False + bounded_trie_disabled = False _initialized = False @classmethod @@ -79,36 +81,24 @@ def set_default_pipeline_options(cls, options: 'PipelineOptions') -> None: return debug_options = options.view_as(DebugOptions) if debug_options.lookup_experiment('disableCounterMetrics'): - cls._counter_disabled = True + cls.counter_disabled = True _LOGGER.info('Counter metrics are disabled.') if debug_options.lookup_experiment('disableStringSetMetrics'): - cls._string_set_disabled = True + cls.string_set_disabled = True _LOGGER.info('StringSet metrics are disabled.') if debug_options.lookup_experiment('disableBoundedTrieMetrics'): - cls._bounded_trie_disabled = True + cls.bounded_trie_disabled = True _LOGGER.info('BoundedTrie metrics are disabled.') cls._initialized = True @classmethod def reset(cls) -> None: """Reset flags. Test-only.""" - cls._counter_disabled = False - cls._string_set_disabled = False - cls._bounded_trie_disabled = False + cls.counter_disabled = False + cls.string_set_disabled = False + cls.bounded_trie_disabled = False cls._initialized = False - @classmethod - def counter_disabled(cls) -> bool: - return cls._counter_disabled - - @classmethod - def string_set_disabled(cls) -> bool: - return cls._string_set_disabled - - @classmethod - def bounded_trie_disabled(cls) -> bool: - return cls._bounded_trie_disabled - class Metrics(object): """Lets users create/access metric objects during pipeline execution.""" @@ -263,7 +253,7 @@ def __init__( process_wide=process_wide) def inc(self, n: int = 1) -> None: - if MetricsFlag.counter_disabled(): + if MetricsFlag.counter_disabled: return self._updater(n) @@ -291,7 +281,7 @@ def __init__(self, metric_name: MetricName) -> None: self._updater = MetricUpdater(cells.StringSetCell, metric_name) def add(self, value: str) -> None: - if MetricsFlag.string_set_disabled(): + if MetricsFlag.string_set_disabled: return self._updater(value) @@ -302,7 +292,7 @@ def __init__(self, metric_name: MetricName) -> None: self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name) def add(self, value) -> None: - if MetricsFlag.bounded_trie_disabled(): + if MetricsFlag.bounded_trie_disabled: return self._updater(value) diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py index 6e44fc45570c..b67056b42f78 100644 --- a/sdks/python/apache_beam/metrics/metric_test.py +++ b/sdks/python/apache_beam/metrics/metric_test.py @@ -126,29 +126,29 @@ def test_get_namespace_error(self): def test_metrics_flag(self): """Mirrors Java MetricsTest.testMetricsFlag for the three disable* experiments.""" MetricsFlag.reset() - self.assertFalse(MetricsFlag.counter_disabled()) - self.assertFalse(MetricsFlag.string_set_disabled()) - self.assertFalse(MetricsFlag.bounded_trie_disabled()) + self.assertFalse(MetricsFlag.counter_disabled) + self.assertFalse(MetricsFlag.string_set_disabled) + self.assertFalse(MetricsFlag.bounded_trie_disabled) options = PipelineOptions(['--experiments=disableCounterMetrics']) MetricsFlag.set_default_pipeline_options(options) - self.assertTrue(MetricsFlag.counter_disabled()) - self.assertFalse(MetricsFlag.string_set_disabled()) - self.assertFalse(MetricsFlag.bounded_trie_disabled()) + self.assertTrue(MetricsFlag.counter_disabled) + self.assertFalse(MetricsFlag.string_set_disabled) + self.assertFalse(MetricsFlag.bounded_trie_disabled) MetricsFlag.reset() options = PipelineOptions(['--experiments=disableStringSetMetrics']) MetricsFlag.set_default_pipeline_options(options) - self.assertFalse(MetricsFlag.counter_disabled()) - self.assertTrue(MetricsFlag.string_set_disabled()) - self.assertFalse(MetricsFlag.bounded_trie_disabled()) + self.assertFalse(MetricsFlag.counter_disabled) + self.assertTrue(MetricsFlag.string_set_disabled) + self.assertFalse(MetricsFlag.bounded_trie_disabled) MetricsFlag.reset() options = PipelineOptions(['--experiments=disableBoundedTrieMetrics']) MetricsFlag.set_default_pipeline_options(options) - self.assertFalse(MetricsFlag.counter_disabled()) - self.assertFalse(MetricsFlag.string_set_disabled()) - self.assertTrue(MetricsFlag.bounded_trie_disabled()) + self.assertFalse(MetricsFlag.counter_disabled) + self.assertFalse(MetricsFlag.string_set_disabled) + self.assertTrue(MetricsFlag.bounded_trie_disabled) MetricsFlag.reset() options = PipelineOptions([ @@ -157,9 +157,9 @@ def test_metrics_flag(self): '--experiments=disableBoundedTrieMetrics', ]) MetricsFlag.set_default_pipeline_options(options) - self.assertTrue(MetricsFlag.counter_disabled()) - self.assertTrue(MetricsFlag.string_set_disabled()) - self.assertTrue(MetricsFlag.bounded_trie_disabled()) + self.assertTrue(MetricsFlag.counter_disabled) + self.assertTrue(MetricsFlag.string_set_disabled) + self.assertTrue(MetricsFlag.bounded_trie_disabled) MetricsFlag.reset() From 7839e72f4bb48b901485e61863d72df7223b396a Mon Sep 17 00:00:00 2001 From: Anuragp22 Date: Sun, 31 May 2026 00:40:03 +0530 Subject: [PATCH 5/7] [Python] Address PR review: simplify MetricsFlag pydoc and reset in finally --- sdks/python/apache_beam/metrics/metric.py | 11 +--- .../python/apache_beam/metrics/metric_test.py | 66 +++++++++---------- 2 files changed, 34 insertions(+), 43 deletions(-) diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index 535ac7e9344c..a66eed640be6 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -61,14 +61,7 @@ class MetricsFlag(object): - """Process-wide flags controlling which user metric kinds are emitted. - - Mirrors the Java SDK ``Metrics.MetricsFlag`` behavior. The flags are read - once at worker harness initialization from pipeline experiments and apply - for the lifetime of the worker. Exposed as public class attributes so - Delegating* gates can read them with a single attribute load on the hot - path of metric emission. - """ + """Process-wide flags controlling which user metric kinds are emitted.""" counter_disabled = False string_set_disabled = False bounded_trie_disabled = False @@ -76,7 +69,6 @@ class MetricsFlag(object): @classmethod def set_default_pipeline_options(cls, options: 'PipelineOptions') -> None: - """Initialize the flags from pipeline experiments. Idempotent.""" if cls._initialized: return debug_options = options.view_as(DebugOptions) @@ -93,7 +85,6 @@ def set_default_pipeline_options(cls, options: 'PipelineOptions') -> None: @classmethod def reset(cls) -> None: - """Reset flags. Test-only.""" cls.counter_disabled = False cls.string_set_disabled = False cls.bounded_trie_disabled = False diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py index b67056b42f78..1e84e8cc8e9a 100644 --- a/sdks/python/apache_beam/metrics/metric_test.py +++ b/sdks/python/apache_beam/metrics/metric_test.py @@ -124,44 +124,44 @@ def test_get_namespace_error(self): Metrics.get_namespace(object()) def test_metrics_flag(self): - """Mirrors Java MetricsTest.testMetricsFlag for the three disable* experiments.""" MetricsFlag.reset() - self.assertFalse(MetricsFlag.counter_disabled) - self.assertFalse(MetricsFlag.string_set_disabled) - self.assertFalse(MetricsFlag.bounded_trie_disabled) + try: + self.assertFalse(MetricsFlag.counter_disabled) + self.assertFalse(MetricsFlag.string_set_disabled) + self.assertFalse(MetricsFlag.bounded_trie_disabled) - options = PipelineOptions(['--experiments=disableCounterMetrics']) - MetricsFlag.set_default_pipeline_options(options) - self.assertTrue(MetricsFlag.counter_disabled) - self.assertFalse(MetricsFlag.string_set_disabled) - self.assertFalse(MetricsFlag.bounded_trie_disabled) + options = PipelineOptions(['--experiments=disableCounterMetrics']) + MetricsFlag.set_default_pipeline_options(options) + self.assertTrue(MetricsFlag.counter_disabled) + self.assertFalse(MetricsFlag.string_set_disabled) + self.assertFalse(MetricsFlag.bounded_trie_disabled) - MetricsFlag.reset() - options = PipelineOptions(['--experiments=disableStringSetMetrics']) - MetricsFlag.set_default_pipeline_options(options) - self.assertFalse(MetricsFlag.counter_disabled) - self.assertTrue(MetricsFlag.string_set_disabled) - self.assertFalse(MetricsFlag.bounded_trie_disabled) - - MetricsFlag.reset() - options = PipelineOptions(['--experiments=disableBoundedTrieMetrics']) - MetricsFlag.set_default_pipeline_options(options) - self.assertFalse(MetricsFlag.counter_disabled) - self.assertFalse(MetricsFlag.string_set_disabled) - self.assertTrue(MetricsFlag.bounded_trie_disabled) + MetricsFlag.reset() + options = PipelineOptions(['--experiments=disableStringSetMetrics']) + MetricsFlag.set_default_pipeline_options(options) + self.assertFalse(MetricsFlag.counter_disabled) + self.assertTrue(MetricsFlag.string_set_disabled) + self.assertFalse(MetricsFlag.bounded_trie_disabled) - MetricsFlag.reset() - options = PipelineOptions([ - '--experiments=disableCounterMetrics', - '--experiments=disableStringSetMetrics', - '--experiments=disableBoundedTrieMetrics', - ]) - MetricsFlag.set_default_pipeline_options(options) - self.assertTrue(MetricsFlag.counter_disabled) - self.assertTrue(MetricsFlag.string_set_disabled) - self.assertTrue(MetricsFlag.bounded_trie_disabled) + MetricsFlag.reset() + options = PipelineOptions(['--experiments=disableBoundedTrieMetrics']) + MetricsFlag.set_default_pipeline_options(options) + self.assertFalse(MetricsFlag.counter_disabled) + self.assertFalse(MetricsFlag.string_set_disabled) + self.assertTrue(MetricsFlag.bounded_trie_disabled) - MetricsFlag.reset() + MetricsFlag.reset() + options = PipelineOptions([ + '--experiments=disableCounterMetrics', + '--experiments=disableStringSetMetrics', + '--experiments=disableBoundedTrieMetrics', + ]) + MetricsFlag.set_default_pipeline_options(options) + self.assertTrue(MetricsFlag.counter_disabled) + self.assertTrue(MetricsFlag.string_set_disabled) + self.assertTrue(MetricsFlag.bounded_trie_disabled) + finally: + MetricsFlag.reset() def test_disabled_counter_is_noop(self): MetricsFlag.reset() From 859715d223d68ac25a7cd98eb4e22757c3f565e9 Mon Sep 17 00:00:00 2001 From: Anuragp22 Date: Sun, 31 May 2026 01:12:16 +0530 Subject: [PATCH 6/7] [Python] Address PR review: assert no MetricCell created when disabled --- .../python/apache_beam/metrics/metric_test.py | 58 ++++++++++++++----- 1 file changed, 44 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py index 1e84e8cc8e9a..6937236a8aad 100644 --- a/sdks/python/apache_beam/metrics/metric_test.py +++ b/sdks/python/apache_beam/metrics/metric_test.py @@ -164,35 +164,65 @@ def test_metrics_flag(self): MetricsFlag.reset() def test_disabled_counter_is_noop(self): + sampler = statesampler.StateSampler('', counters.CounterFactory()) + statesampler.set_current_tracker(sampler) + state = sampler.scoped_state( + 'mystep', 'myState', metrics_container=MetricsContainer('mystep')) MetricsFlag.reset() - options = PipelineOptions(['--experiments=disableCounterMetrics']) - MetricsFlag.set_default_pipeline_options(options) try: - counter = Metrics.counter('ns', 'disabled_counter') - counter.inc() - counter.inc(5) - counter.dec() + sampler.start() + with state: + container = MetricsEnvironment.current_container() + Metrics.counter('ns', 'baseline').inc() + self.assertEqual(len(container.metrics), 1) + options = PipelineOptions(['--experiments=disableCounterMetrics']) + MetricsFlag.set_default_pipeline_options(options) + Metrics.counter('ns', 'after_disable').inc() + Metrics.counter('ns', 'after_disable').inc(5) + Metrics.counter('ns', 'after_disable').dec() + self.assertEqual(len(container.metrics), 1) finally: + sampler.stop() MetricsFlag.reset() def test_disabled_string_set_is_noop(self): + sampler = statesampler.StateSampler('', counters.CounterFactory()) + statesampler.set_current_tracker(sampler) + state = sampler.scoped_state( + 'mystep', 'myState', metrics_container=MetricsContainer('mystep')) MetricsFlag.reset() - options = PipelineOptions(['--experiments=disableStringSetMetrics']) - MetricsFlag.set_default_pipeline_options(options) try: - string_set = Metrics.string_set('ns', 'disabled_set') - string_set.add('value') + sampler.start() + with state: + container = MetricsEnvironment.current_container() + Metrics.string_set('ns', 'baseline').add('seed') + self.assertEqual(len(container.metrics), 1) + options = PipelineOptions(['--experiments=disableStringSetMetrics']) + MetricsFlag.set_default_pipeline_options(options) + Metrics.string_set('ns', 'after_disable').add('value') + self.assertEqual(len(container.metrics), 1) finally: + sampler.stop() MetricsFlag.reset() def test_disabled_bounded_trie_is_noop(self): + sampler = statesampler.StateSampler('', counters.CounterFactory()) + statesampler.set_current_tracker(sampler) + state = sampler.scoped_state( + 'mystep', 'myState', metrics_container=MetricsContainer('mystep')) MetricsFlag.reset() - options = PipelineOptions(['--experiments=disableBoundedTrieMetrics']) - MetricsFlag.set_default_pipeline_options(options) try: - bounded_trie = Metrics.bounded_trie('ns', 'disabled_trie') - bounded_trie.add(['a', 'b']) + sampler.start() + with state: + container = MetricsEnvironment.current_container() + Metrics.bounded_trie('ns', 'baseline').add(['a']) + self.assertEqual(len(container.metrics), 1) + options = PipelineOptions(['--experiments=disableBoundedTrieMetrics']) + MetricsFlag.set_default_pipeline_options(options) + Metrics.bounded_trie('ns', 'after_disable').add(['a', 'b']) + self.assertEqual(len(container.metrics), 1) finally: + sampler.stop() MetricsFlag.reset() def test_counter_empty_name(self): From 80c13c3ba489337cad8a003b8b8272560d3b0d8e Mon Sep 17 00:00:00 2001 From: Anuragp22 Date: Sun, 31 May 2026 01:12:28 +0530 Subject: [PATCH 7/7] [Python] Initialize MetricsFlag from Pipeline so in-process runners honor disable* experiments The previous init only ran in the gRPC SDK harness (sdk_worker_main), so DirectRunner and other in-process runners silently ignored the disableCounterMetrics / disableStringSetMetrics / disableBoundedTrieMetrics experiments. Initializing in Pipeline.__init__ matches the pattern of FileSystems.set_options at the same call site and mirrors the Java init point (PipelineRunner.run). --- sdks/python/apache_beam/pipeline.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 750868f7443a..594660d9bea9 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -73,6 +73,7 @@ from apache_beam.coders import typecoders from apache_beam.internal import pickler from apache_beam.io.filesystems import FileSystems +from apache_beam.metrics.metric import MetricsFlag from apache_beam.options.pipeline_options import CrossLanguageOptions from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import PipelineOptions @@ -192,6 +193,7 @@ def __init__( self._options = PipelineOptions([]) FileSystems.set_options(self._options) + MetricsFlag.set_default_pipeline_options(self._options) if runner is None: runner = self._options.view_as(StandardOptions).runner