[Python] Honor disableCounterMetrics, disableStringSetMetrics, and disableBoundedTrieMetrics experiments #38749
…bleBoundedTrieMetrics experiments Mirrors the Java SDK Metrics.MetricsFlag behavior so high-throughput Python pipelines can opt out of user metric kinds that add pressure to metric backends. Adds a process-wide MetricsFlag singleton initialized once at worker harness startup from pipeline experiments. When a flag is set, the corresponding Delegating* class short-circuits its update path so no MetricCell is touched and no monitoring info is emitted. For apache#38746.
Ports Java MetricsTest.testMetricsFlag and adds three smoke tests confirming the disabled Counter / StringSet / BoundedTrie short-circuit without raising when called on a no-current-tracker path. For apache#38746.
Summary of Changes (Gemini Code Assist)
This pull request adds support for three pipeline experiments (disableCounterMetrics, disableStringSetMetrics, and disableBoundedTrieMetrics) to the Python SDK. By enabling these flags, users can prevent the emission of specific metric kinds, which is particularly useful for optimizing performance in high-throughput pipelines. The changes ensure that the Python SDK mirrors the existing behavior found in the Java SDK, providing consistent control over metric backends.
Code Review
This pull request introduces MetricsFlag to the Python SDK, allowing it to honor pipeline experiments (disableCounterMetrics, disableStringSetMetrics, and disableBoundedTrieMetrics) to opt out of emitting specific user metric kinds, matching the Java SDK behavior. The feedback suggests optimizing performance on the hot path of metric increments by exposing the disabled flags as public class attributes directly on MetricsFlag instead of using @classmethod getter methods. This avoids method call overhead and aligns with PEP 8 recommendations. The suggestions cover refactoring the MetricsFlag class, updating the delegating metric classes, and adjusting the corresponding unit tests.
| _counter_disabled = False | ||
| _string_set_disabled = False | ||
| _bounded_trie_disabled = False | ||
| _initialized = False | ||
|
|
||
| @classmethod | ||
| def set_default_pipeline_options(cls, options: 'PipelineOptions') -> None: | ||
| """Initialize the flags from pipeline experiments. Idempotent.""" | ||
| if cls._initialized: | ||
| return | ||
| debug_options = options.view_as(DebugOptions) | ||
| if debug_options.lookup_experiment('disableCounterMetrics'): | ||
| cls._counter_disabled = True | ||
| _LOGGER.info('Counter metrics are disabled.') | ||
| if debug_options.lookup_experiment('disableStringSetMetrics'): | ||
| cls._string_set_disabled = True | ||
| _LOGGER.info('StringSet metrics are disabled.') | ||
| if debug_options.lookup_experiment('disableBoundedTrieMetrics'): | ||
| cls._bounded_trie_disabled = True | ||
| _LOGGER.info('BoundedTrie metrics are disabled.') | ||
| cls._initialized = True | ||
|
|
||
| @classmethod | ||
| def reset(cls) -> None: | ||
| """Reset flags. Test-only.""" | ||
| cls._counter_disabled = False | ||
| cls._string_set_disabled = False | ||
| cls._bounded_trie_disabled = False | ||
| cls._initialized = False | ||
|
|
||
| @classmethod | ||
| def counter_disabled(cls) -> bool: | ||
| return cls._counter_disabled | ||
|
|
||
| @classmethod | ||
| def string_set_disabled(cls) -> bool: | ||
| return cls._string_set_disabled | ||
|
|
||
| @classmethod | ||
| def bounded_trie_disabled(cls) -> bool: | ||
| return cls._bounded_trie_disabled |
To avoid the overhead of @classmethod descriptor lookup and function calls on the hot path of metric increments, we can simplify MetricsFlag by exposing the disabled flags as public class attributes directly. This aligns with PEP 8 recommendations to avoid redundant getter/setter methods in Python and improves performance.
  counter_disabled = False
  string_set_disabled = False
  bounded_trie_disabled = False
  _initialized = False

  @classmethod
  def set_default_pipeline_options(cls, options: 'PipelineOptions') -> None:
    """Initialize the flags from pipeline experiments. Idempotent."""
    if cls._initialized:
      return
    debug_options = options.view_as(DebugOptions)
    if debug_options.lookup_experiment('disableCounterMetrics'):
      cls.counter_disabled = True
      _LOGGER.info('Counter metrics are disabled.')
    if debug_options.lookup_experiment('disableStringSetMetrics'):
      cls.string_set_disabled = True
      _LOGGER.info('StringSet metrics are disabled.')
    if debug_options.lookup_experiment('disableBoundedTrieMetrics'):
      cls.bounded_trie_disabled = True
      _LOGGER.info('BoundedTrie metrics are disabled.')
    cls._initialized = True

  @classmethod
  def reset(cls) -> None:
    """Reset flags. Test-only."""
    cls.counter_disabled = False
    cls.string_set_disabled = False
    cls.bounded_trie_disabled = False
    cls._initialized = False

References
- PEP 8 recommends avoiding accessor/getter/setter methods for simple attribute access in Python, preferring direct attribute access instead.
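The difference between the two gate styles discussed in this comment can be sketched without Beam. The snippet below is only an illustration: FlagWithGetter and FlagWithAttribute are hypothetical stand-ins, not the real MetricsFlag, and the timing it prints will vary by interpreter and machine, so it should be read as a way to measure the gap rather than as a claim about its size.

```python
import timeit


class FlagWithGetter:
  """Hypothetical stand-in: flag hidden behind a classmethod getter."""
  _counter_disabled = True

  @classmethod
  def counter_disabled(cls) -> bool:
    return cls._counter_disabled


class FlagWithAttribute:
  """Hypothetical stand-in: flag exposed as a public class attribute."""
  counter_disabled = True


def gate_via_getter() -> bool:
  # Looks up the classmethod, binds it to the class, then calls it.
  return FlagWithGetter.counter_disabled()


def gate_via_attribute() -> bool:
  # Reads the class attribute directly, with no extra call.
  return FlagWithAttribute.counter_disabled


if __name__ == '__main__':
  for fn in (gate_via_getter, gate_via_attribute):
    seconds = timeit.timeit(fn, number=1_000_000)
    print(f'{fn.__name__}: {seconds:.3f}s per 1M calls')
```

Both gates return the same value; only the access path differs, which is why the suggestion changes call sites from `counter_disabled()` to `counter_disabled` without altering behavior.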
| def inc(self, n: int = 1) -> None: | ||
| if MetricsFlag.counter_disabled(): | ||
| return | ||
| self._updater(n) |
Access the counter_disabled class attribute directly instead of calling a classmethod getter to avoid method call overhead on the hot path.
| def inc(self, n: int = 1) -> None: | |
| if MetricsFlag.counter_disabled(): | |
| return | |
| self._updater(n) | |
| def inc(self, n: int = 1) -> None: | |
| if MetricsFlag.counter_disabled: | |
| return | |
| self._updater(n) |
| def add(self, value: str) -> None: | ||
| if MetricsFlag.string_set_disabled(): | ||
| return | ||
| self._updater(value) | ||
|
|
||
| class DelegatingBoundedTrie(BoundedTrie): | ||
| """Metrics StringSet that Delegates functionality to MetricsEnvironment.""" | ||
| """Metrics BoundedTrie that Delegates functionality to MetricsEnvironment.""" | ||
| def __init__(self, metric_name: MetricName) -> None: | ||
| super().__init__(metric_name) | ||
| self.add = MetricUpdater(cells.BoundedTrieCell, metric_name) # type: ignore[method-assign] | ||
| self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name) | ||
|
|
||
| def add(self, value) -> None: | ||
| if MetricsFlag.bounded_trie_disabled(): | ||
| return | ||
| self._updater(value) |
Access the string_set_disabled and bounded_trie_disabled class attributes directly instead of calling classmethod getters to avoid method call overhead on the hot path.
| def add(self, value: str) -> None: | |
| if MetricsFlag.string_set_disabled(): | |
| return | |
| self._updater(value) | |
| class DelegatingBoundedTrie(BoundedTrie): | |
| """Metrics StringSet that Delegates functionality to MetricsEnvironment.""" | |
| """Metrics BoundedTrie that Delegates functionality to MetricsEnvironment.""" | |
| def __init__(self, metric_name: MetricName) -> None: | |
| super().__init__(metric_name) | |
| self.add = MetricUpdater(cells.BoundedTrieCell, metric_name) # type: ignore[method-assign] | |
| self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name) | |
| def add(self, value) -> None: | |
| if MetricsFlag.bounded_trie_disabled(): | |
| return | |
| self._updater(value) | |
| def add(self, value: str) -> None: | |
| if MetricsFlag.string_set_disabled: | |
| return | |
| self._updater(value) | |
| class DelegatingBoundedTrie(BoundedTrie): | |
| """Metrics BoundedTrie that Delegates functionality to MetricsEnvironment.""" | |
| def __init__(self, metric_name: MetricName) -> None: | |
| super().__init__(metric_name) | |
| self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name) | |
| def add(self, value) -> None: | |
| if MetricsFlag.bounded_trie_disabled: | |
| return | |
| self._updater(value) |
| def test_metrics_flag(self): | ||
| """Mirrors Java MetricsTest.testMetricsFlag for the three disable* experiments.""" | ||
| MetricsFlag.reset() | ||
| self.assertFalse(MetricsFlag.counter_disabled()) | ||
| self.assertFalse(MetricsFlag.string_set_disabled()) | ||
| self.assertFalse(MetricsFlag.bounded_trie_disabled()) | ||
|
|
||
| options = PipelineOptions(['--experiments=disableCounterMetrics']) | ||
| MetricsFlag.set_default_pipeline_options(options) | ||
| self.assertTrue(MetricsFlag.counter_disabled()) | ||
| self.assertFalse(MetricsFlag.string_set_disabled()) | ||
| self.assertFalse(MetricsFlag.bounded_trie_disabled()) | ||
|
|
||
| MetricsFlag.reset() | ||
| options = PipelineOptions(['--experiments=disableStringSetMetrics']) | ||
| MetricsFlag.set_default_pipeline_options(options) | ||
| self.assertFalse(MetricsFlag.counter_disabled()) | ||
| self.assertTrue(MetricsFlag.string_set_disabled()) | ||
| self.assertFalse(MetricsFlag.bounded_trie_disabled()) | ||
|
|
||
| MetricsFlag.reset() | ||
| options = PipelineOptions(['--experiments=disableBoundedTrieMetrics']) | ||
| MetricsFlag.set_default_pipeline_options(options) | ||
| self.assertFalse(MetricsFlag.counter_disabled()) | ||
| self.assertFalse(MetricsFlag.string_set_disabled()) | ||
| self.assertTrue(MetricsFlag.bounded_trie_disabled()) | ||
|
|
||
| MetricsFlag.reset() | ||
| options = PipelineOptions([ | ||
| '--experiments=disableCounterMetrics', | ||
| '--experiments=disableStringSetMetrics', | ||
| '--experiments=disableBoundedTrieMetrics', | ||
| ]) | ||
| MetricsFlag.set_default_pipeline_options(options) | ||
| self.assertTrue(MetricsFlag.counter_disabled()) | ||
| self.assertTrue(MetricsFlag.string_set_disabled()) | ||
| self.assertTrue(MetricsFlag.bounded_trie_disabled()) | ||
|
|
||
| MetricsFlag.reset() |
Update the tests to assert on the public class attributes directly instead of calling the removed getter methods.
  def test_metrics_flag(self):
    """Mirrors Java MetricsTest.testMetricsFlag for the three disable* experiments."""
    MetricsFlag.reset()
    self.assertFalse(MetricsFlag.counter_disabled)
    self.assertFalse(MetricsFlag.string_set_disabled)
    self.assertFalse(MetricsFlag.bounded_trie_disabled)

    options = PipelineOptions(['--experiments=disableCounterMetrics'])
    MetricsFlag.set_default_pipeline_options(options)
    self.assertTrue(MetricsFlag.counter_disabled)
    self.assertFalse(MetricsFlag.string_set_disabled)
    self.assertFalse(MetricsFlag.bounded_trie_disabled)

    MetricsFlag.reset()
    options = PipelineOptions(['--experiments=disableStringSetMetrics'])
    MetricsFlag.set_default_pipeline_options(options)
    self.assertFalse(MetricsFlag.counter_disabled)
    self.assertTrue(MetricsFlag.string_set_disabled)
    self.assertFalse(MetricsFlag.bounded_trie_disabled)

    MetricsFlag.reset()
    options = PipelineOptions(['--experiments=disableBoundedTrieMetrics'])
    MetricsFlag.set_default_pipeline_options(options)
    self.assertFalse(MetricsFlag.counter_disabled)
    self.assertFalse(MetricsFlag.string_set_disabled)
    self.assertTrue(MetricsFlag.bounded_trie_disabled)

    MetricsFlag.reset()
    options = PipelineOptions([
        '--experiments=disableCounterMetrics',
        '--experiments=disableStringSetMetrics',
        '--experiments=disableBoundedTrieMetrics',
    ])
    MetricsFlag.set_default_pipeline_options(options)
    self.assertTrue(MetricsFlag.counter_disabled)
    self.assertTrue(MetricsFlag.string_set_disabled)
    self.assertTrue(MetricsFlag.bounded_trie_disabled)

    MetricsFlag.reset()

Drops the classmethod getter wrapper around each disabled flag and exposes counter_disabled / string_set_disabled / bounded_trie_disabled directly as public class attributes. The gate runs on every metric update, so swapping a descriptor lookup + function call for a single attribute load matters in exactly the high-throughput pipelines these experiments are designed for. Idiomatic Python; Java parity is about behavior, not internal API shape. Addresses review feedback. For apache#38746.
Assigning reviewers: R: @jrmccluskey for label python.
| class MetricsFlag(object): | ||
| """Process-wide flags controlling which user metric kinds are emitted. | ||
|
|
||
| Mirrors the Java SDK ``Metrics.MetricsFlag`` behavior. The flags are read |
We don't need to mention "Mirrors the Java SDK..." in the Python pydoc, as we don't assume Beam Python users have Java knowledge (or are even aware of the Java SDK). In general, please simplify the pydocs, as MetricsFlag is an internal API.
Done in 7839e72. Dropped the Java reference and trimmed method docstrings.
| self.assertTrue(MetricsFlag.string_set_disabled) | ||
| self.assertTrue(MetricsFlag.bounded_trie_disabled) | ||
|
|
||
| MetricsFlag.reset() |
Consider wrapping the test in a try block, followed by
finally:
  MetricsFlag.reset()
as below
Done in 7839e72. Wrapped the body in try and moved reset into finally.
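Why the reset belongs in finally can be shown with a small sketch. It assumes a hypothetical ToyFlag class standing in for MetricsFlag and a check that fails on purpose; none of these names come from the PR.

```python
class ToyFlag:
  """Hypothetical process-wide flag, standing in for MetricsFlag."""
  disabled = False

  @classmethod
  def reset(cls) -> None:
    cls.disabled = False


def check_with_flag_enabled(check) -> None:
  """Runs `check` with the flag set, restoring it even if `check` raises."""
  ToyFlag.disabled = True
  try:
    check()
  finally:
    # Runs on success and on failure, so later tests see a clean flag.
    ToyFlag.reset()


def failing_check() -> None:
  raise AssertionError('simulated test failure')
```

With a trailing `reset()` call instead of finally, the simulated failure would skip the reset and leave the flag set for whichever test runs next in the same process.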
| finally: | ||
| MetricsFlag.reset() | ||
|
|
||
| def test_disabled_bounded_trie_is_noop(self): |
These ..._is_noop tests aren't checking anything.
Done in 859715d. Each test now sets up a StateSampler with a MetricsContainer, runs a baseline inc/add to confirm the container grows to 1, then enables the disable experiment and runs another inc/add and asserts the count is still 1, proving no MetricCell was created.
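The baseline-then-disabled assertion described in this reply can be sketched with toy classes. ToyFlag, ToyContainer, and ToyCounter below are assumptions made for illustration; the actual tests use Beam's StateSampler and MetricsContainer, which are not reproduced here.

```python
from typing import Dict, Tuple


class ToyFlag:
  """Hypothetical stand-in for the process-wide disable flag."""
  counter_disabled = False


class ToyContainer:
  """Creates a cell lazily the first time a metric name is updated."""
  def __init__(self) -> None:
    self.cells: Dict[str, int] = {}

  def update(self, name: str, n: int) -> None:
    self.cells[name] = self.cells.get(name, 0) + n


class ToyCounter:
  def __init__(self, name: str, container: ToyContainer) -> None:
    self._name = name
    self._container = container

  def inc(self, n: int = 1) -> None:
    if ToyFlag.counter_disabled:
      return  # Short-circuit before any cell is touched.
    self._container.update(self._name, n)


def cells_after_baseline_and_disabled() -> Tuple[int, int]:
  """Returns the cell count after a baseline inc and after a disabled inc."""
  container = ToyContainer()
  ToyCounter('baseline', container).inc()
  baseline = len(container.cells)
  ToyFlag.counter_disabled = True
  try:
    ToyCounter('suppressed', container).inc()
    return baseline, len(container.cells)
  finally:
    ToyFlag.counter_disabled = False
```

The baseline step matters: without it, a count of zero after the disabled call could also mean the container was never wired up, which is the gap the reviewer pointed out.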
…onor disable* experiments The previous init only ran in the gRPC SDK harness (sdk_worker_main), so DirectRunner and other in-process runners silently ignored the disableCounterMetrics / disableStringSetMetrics / disableBoundedTrieMetrics experiments. Initializing in Pipeline.__init__ matches the pattern of FileSystems.set_options at the same call site and mirrors the Java init point (PipelineRunner.run).
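Because the same idempotent initializer is now reached from more than one call site, it may help to see how a first-caller-wins guard behaves. This is a minimal sketch with a hypothetical ToyMetricsFlag that takes a plain list of experiment names; it is not Beam's MetricsFlag and does not use PipelineOptions.

```python
from typing import Iterable


class ToyMetricsFlag:
  """Hypothetical stand-in for a process-wide flag holder."""
  counter_disabled = False
  _initialized = False

  @classmethod
  def set_from_experiments(cls, experiments: Iterable[str]) -> None:
    # First caller wins: later calls return before reading anything.
    if cls._initialized:
      return
    if 'disableCounterMetrics' in experiments:
      cls.counter_disabled = True
    cls._initialized = True

  @classmethod
  def reset(cls) -> None:
    cls.counter_disabled = False
    cls._initialized = False


def simulate_two_call_sites() -> bool:
  ToyMetricsFlag.reset()
  try:
    # Call site 1, e.g. pipeline construction in an in-process runner.
    ToyMetricsFlag.set_from_experiments(['disableCounterMetrics'])
    # Call site 2 in the same process: a no-op because of the guard.
    ToyMetricsFlag.set_from_experiments([])
    return ToyMetricsFlag.counter_disabled
  finally:
    ToyMetricsFlag.reset()
```

One consequence visible in the sketch is that a second initialization in the same process cannot change the flags, whatever experiments it is given, until reset is called.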
Also updated the PR description with a note that disableBoundedTrieMetrics will suppress Lineage data in Python because Python's Lineage uses DelegatingBoundedTrie unconditionally. In Java that path is gated by lineageRollupEnabled, but porting the dual-backend split is out of scope for this issue. |
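The Lineage side effect follows from the gate living inside the shared metric class. The toy model below illustrates that coupling under stated assumptions: ToyFlag, ToyBoundedTrie, and ToyLineage are hypothetical and do not reflect the real Lineage API, only the idea that a caller with no separate backend inherits the gate.

```python
class ToyFlag:
  """Hypothetical stand-in for the process-wide disable flag."""
  bounded_trie_disabled = False


class ToyBoundedTrie:
  """Hypothetical gated metric: drops updates while the flag is set."""
  def __init__(self) -> None:
    self.paths = set()

  def add(self, segments) -> None:
    if ToyFlag.bounded_trie_disabled:
      return
    self.paths.add(tuple(segments))


class ToyLineage:
  """Hypothetical lineage recorder that always writes through the gated trie."""
  def __init__(self) -> None:
    self._trie = ToyBoundedTrie()

  def add_source(self, system: str, *segments: str) -> None:
    self._trie.add((system, *segments))

  def sources(self) -> set:
    return set(self._trie.paths)
```

In this model, setting the flag silences add_source as well as direct user calls to the trie, which matches the behavior the note warns about.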
Adds Python SDK support for the disableCounterMetrics, disableStringSetMetrics, and disableBoundedTrieMetrics pipeline experiments. High-throughput Python pipelines previously had no way to opt out of user metric kinds that add pressure to metric backends; Java already honored these three experiments, and this PR brings parity to Python.

What changes
- sdks/python/apache_beam/metrics/metric.py: new internal MetricsFlag class with idempotent set_default_pipeline_options. DelegatingCounter.inc, DelegatingStringSet.add, and DelegatingBoundedTrie.add now consult the flag and early-return when disabled, so no MetricCell is touched and no monitoring info is emitted.
- sdks/python/apache_beam/pipeline.py: calls MetricsFlag.set_default_pipeline_options(self._options) in Pipeline.__init__, right after FileSystems.set_options, so DirectRunner and other in-process runners honor the experiments.
- sdks/python/apache_beam/runners/worker/sdk_worker_main.py: calls the same init during harness startup so worker subprocesses (which do not inherit Python class state across fork/spawn) re-read their own options.
- sdks/python/apache_beam/metrics/metric_test.py: tests for the flag values and per-kind no-op behavior (asserts no MetricCell is created in the container when the kind is disabled).
- CHANGES.md: entry under 2.75.0 New Features / Improvements.

Note on Lineage
disableBoundedTrieMetrics will also suppress Lineage data in Python, because Python's Lineage uses DelegatingBoundedTrie unconditionally. The Java SDK avoids this by routing Lineage through StringSetMetricsLineage unless enableLineageRollup is set, and a Lineage-specific backend split is out of scope for this issue.

Fixes #38746.
R: @Abacn