diff --git a/CHANGES.md b/CHANGES.md index 74209bb7499c..252684c416ef 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -69,6 +69,7 @@ ## New Features / Improvements * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Python SDK now honors the `disableCounterMetrics`, `disableStringSetMetrics`, and `disableBoundedTrieMetrics` pipeline experiments to opt out of emitting the corresponding user metric kinds, matching Java SDK behavior ([#38746](https://github.com/apache/beam/issues/38746)). ## Breaking Changes diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index b15237cae020..a66eed640be6 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -46,11 +46,13 @@ from apache_beam.metrics.metricbase import Histogram from apache_beam.metrics.metricbase import MetricName from apache_beam.metrics.metricbase import StringSet +from apache_beam.options.pipeline_options import DebugOptions if TYPE_CHECKING: from apache_beam.internal.metrics.metric import MetricLogger from apache_beam.metrics.execution import MetricKey from apache_beam.metrics.metricbase import Metric + from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.utils.histogram import BucketType __all__ = ['Metrics', 'MetricsFilter', 'Lineage'] @@ -58,6 +60,37 @@ _LOGGER = logging.getLogger(__name__) +class MetricsFlag(object): + """Process-wide flags controlling which user metric kinds are emitted.""" + counter_disabled = False + string_set_disabled = False + bounded_trie_disabled = False + _initialized = False + + @classmethod + def set_default_pipeline_options(cls, options: 'PipelineOptions') -> None: + if cls._initialized: + return + debug_options = options.view_as(DebugOptions) + if debug_options.lookup_experiment('disableCounterMetrics'): + cls.counter_disabled = True + _LOGGER.info('Counter metrics are disabled.') + if debug_options.lookup_experiment('disableStringSetMetrics'): + cls.string_set_disabled = True + _LOGGER.info('StringSet metrics are disabled.') + if debug_options.lookup_experiment('disableBoundedTrieMetrics'): + cls.bounded_trie_disabled = True + _LOGGER.info('BoundedTrie metrics are disabled.') + cls._initialized = True + + @classmethod + def reset(cls) -> None: + cls.counter_disabled = False + cls.string_set_disabled = False + cls.bounded_trie_disabled = False + cls._initialized = False + + class Metrics(object): """Lets users create/access metric objects during pipeline execution.""" @staticmethod @@ -204,12 +237,17 @@ class DelegatingCounter(Counter): def __init__( self, metric_name: MetricName, process_wide: bool = False) -> None: super().__init__(metric_name) - self.inc = MetricUpdater( # type: ignore[method-assign] + self._updater = MetricUpdater( cells.CounterCell, metric_name, default_value=1, process_wide=process_wide) + def inc(self, n: int = 1) -> None: + if MetricsFlag.counter_disabled: + return + self._updater(n) + class DelegatingDistribution(Distribution): """Metrics Distribution Delegates functionality to MetricsEnvironment.""" def __init__( @@ -231,13 +269,23 @@ class DelegatingStringSet(StringSet): """Metrics StringSet that Delegates functionality to MetricsEnvironment.""" def __init__(self, metric_name: MetricName) -> None: super().__init__(metric_name) - self.add = MetricUpdater(cells.StringSetCell, metric_name) # type: ignore[method-assign] + self._updater = MetricUpdater(cells.StringSetCell, metric_name) + + def add(self, value: str) -> None: + if MetricsFlag.string_set_disabled: + return + self._updater(value) class DelegatingBoundedTrie(BoundedTrie): - """Metrics StringSet that Delegates functionality to MetricsEnvironment.""" + """Metrics BoundedTrie that Delegates functionality to MetricsEnvironment.""" def __init__(self, metric_name: MetricName) -> None: super().__init__(metric_name) - self.add = MetricUpdater(cells.BoundedTrieCell, metric_name) # type: ignore[method-assign] + self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name) + + def add(self, value) -> None: + if MetricsFlag.bounded_trie_disabled: + return + self._updater(value) class MetricResults(object): diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py index ae66200737b5..6937236a8aad 100644 --- a/sdks/python/apache_beam/metrics/metric_test.py +++ b/sdks/python/apache_beam/metrics/metric_test.py @@ -32,7 +32,9 @@ from apache_beam.metrics.metric import MetricResults from apache_beam.metrics.metric import Metrics from apache_beam.metrics.metric import MetricsFilter +from apache_beam.metrics.metric import MetricsFlag from apache_beam.metrics.metricbase import MetricName +from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner from apache_beam.runners.worker import statesampler from apache_beam.testing.metric_result_matchers import DistributionMatcher @@ -121,6 +123,108 @@ def test_get_namespace_error(self): with self.assertRaises(ValueError): Metrics.get_namespace(object()) + def test_metrics_flag(self): + MetricsFlag.reset() + try: + self.assertFalse(MetricsFlag.counter_disabled) + self.assertFalse(MetricsFlag.string_set_disabled) + self.assertFalse(MetricsFlag.bounded_trie_disabled) + + options = PipelineOptions(['--experiments=disableCounterMetrics']) + MetricsFlag.set_default_pipeline_options(options) + self.assertTrue(MetricsFlag.counter_disabled) + self.assertFalse(MetricsFlag.string_set_disabled) + self.assertFalse(MetricsFlag.bounded_trie_disabled) + + MetricsFlag.reset() + options = PipelineOptions(['--experiments=disableStringSetMetrics']) + MetricsFlag.set_default_pipeline_options(options) + self.assertFalse(MetricsFlag.counter_disabled) + self.assertTrue(MetricsFlag.string_set_disabled) + self.assertFalse(MetricsFlag.bounded_trie_disabled) + + MetricsFlag.reset() + options = PipelineOptions(['--experiments=disableBoundedTrieMetrics']) + MetricsFlag.set_default_pipeline_options(options) + self.assertFalse(MetricsFlag.counter_disabled) + self.assertFalse(MetricsFlag.string_set_disabled) + self.assertTrue(MetricsFlag.bounded_trie_disabled) + + MetricsFlag.reset() + options = PipelineOptions([ + '--experiments=disableCounterMetrics', + '--experiments=disableStringSetMetrics', + '--experiments=disableBoundedTrieMetrics', + ]) + MetricsFlag.set_default_pipeline_options(options) + self.assertTrue(MetricsFlag.counter_disabled) + self.assertTrue(MetricsFlag.string_set_disabled) + self.assertTrue(MetricsFlag.bounded_trie_disabled) + finally: + MetricsFlag.reset() + + def test_disabled_counter_is_noop(self): + sampler = statesampler.StateSampler('', counters.CounterFactory()) + statesampler.set_current_tracker(sampler) + state = sampler.scoped_state( + 'mystep', 'myState', metrics_container=MetricsContainer('mystep')) + MetricsFlag.reset() + try: + sampler.start() + with state: + container = MetricsEnvironment.current_container() + Metrics.counter('ns', 'baseline').inc() + self.assertEqual(len(container.metrics), 1) + options = PipelineOptions(['--experiments=disableCounterMetrics']) + MetricsFlag.set_default_pipeline_options(options) + Metrics.counter('ns', 'after_disable').inc() + Metrics.counter('ns', 'after_disable').inc(5) + Metrics.counter('ns', 'after_disable').dec() + self.assertEqual(len(container.metrics), 1) + finally: + sampler.stop() + MetricsFlag.reset() + + def test_disabled_string_set_is_noop(self): + sampler = statesampler.StateSampler('', counters.CounterFactory()) + statesampler.set_current_tracker(sampler) + state = sampler.scoped_state( + 'mystep', 'myState', metrics_container=MetricsContainer('mystep')) + MetricsFlag.reset() + try: + sampler.start() + with state: + container = MetricsEnvironment.current_container() + Metrics.string_set('ns', 'baseline').add('seed') + self.assertEqual(len(container.metrics), 1) + options = PipelineOptions(['--experiments=disableStringSetMetrics']) + MetricsFlag.set_default_pipeline_options(options) + Metrics.string_set('ns', 'after_disable').add('value') + self.assertEqual(len(container.metrics), 1) + finally: + sampler.stop() + MetricsFlag.reset() + + def test_disabled_bounded_trie_is_noop(self): + sampler = statesampler.StateSampler('', counters.CounterFactory()) + statesampler.set_current_tracker(sampler) + state = sampler.scoped_state( + 'mystep', 'myState', metrics_container=MetricsContainer('mystep')) + MetricsFlag.reset() + try: + sampler.start() + with state: + container = MetricsEnvironment.current_container() + Metrics.bounded_trie('ns', 'baseline').add(['a']) + self.assertEqual(len(container.metrics), 1) + options = PipelineOptions(['--experiments=disableBoundedTrieMetrics']) + MetricsFlag.set_default_pipeline_options(options) + Metrics.bounded_trie('ns', 'after_disable').add(['a', 'b']) + self.assertEqual(len(container.metrics), 1) + finally: + sampler.stop() + MetricsFlag.reset() + def test_counter_empty_name(self): with self.assertRaises(ValueError): Metrics.counter("namespace", "") diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 750868f7443a..594660d9bea9 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -73,6 +73,7 @@ from apache_beam.coders import typecoders from apache_beam.internal import pickler from apache_beam.io.filesystems import FileSystems +from apache_beam.metrics.metric import MetricsFlag from apache_beam.options.pipeline_options import CrossLanguageOptions from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import PipelineOptions @@ -192,6 +193,7 @@ def __init__( self._options = PipelineOptions([]) FileSystems.set_options(self._options) + MetricsFlag.set_default_pipeline_options(self._options) if runner is None: runner = self._options.view_as(StandardOptions).runner diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 754a631eaf33..58beda96d63d 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -33,6 +33,7 @@ from apache_beam.internal import pickler from apache_beam.io import filesystems +from apache_beam.metrics import metric from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions @@ -123,6 +124,7 @@ def create_harness(environment, dry_run=False): RuntimeValueProvider.set_runtime_options(pipeline_options_dict) sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict) filesystems.FileSystems.set_options(sdk_pipeline_options) + metric.MetricsFlag.set_default_pipeline_options(sdk_pipeline_options) pickle_library = sdk_pipeline_options.view_as(SetupOptions).pickle_library pickler.set_library(pickle_library)