Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
7539382
[WIP] Migrate to Google Cloud Dataflow Client
jrmccluskey Feb 18, 2026
6931f9e
Trigger relevant postcommits
jrmccluskey Feb 18, 2026
d44155f
base image update
jrmccluskey Feb 18, 2026
9b2dc4b
fix camel case
jrmccluskey Feb 18, 2026
e0457d9
Merge branch 'master' into dataflowClientTesting
jrmccluskey Feb 18, 2026
e1905bf
update dataflow runner + tests
jrmccluskey Feb 18, 2026
1cd8bb6
slide import to avoid triggering unit tests
jrmccluskey Feb 18, 2026
5bd5181
yapf stuff
jrmccluskey Feb 18, 2026
7480bef
remove extra print
jrmccluskey Feb 18, 2026
c7fdc59
further spec structs, fix incorrect pipelineUrl option, remove old cli…
jrmccluskey Feb 19, 2026
14be09f
Merge branch 'master' into dataflowClientTesting
jrmccluskey Feb 19, 2026
04598a5
suppress line-too-longs
jrmccluskey Feb 19, 2026
08febd4
formatting
jrmccluskey Feb 19, 2026
1d9f176
linting, tweak metrics tests
jrmccluskey Feb 24, 2026
25b7b2c
Proto-specific changes to metric processing tests
jrmccluskey Feb 24, 2026
849a2ec
try to dump logging
jrmccluskey Feb 24, 2026
db10c20
handle more straightforward metrics values
jrmccluskey Feb 24, 2026
c25f905
add skips since the unit tests now depend on the proto library
jrmccluskey Feb 24, 2026
a7a4015
testing if there's a disconnect between proto behavior locally and in…
jrmccluskey Feb 25, 2026
24d92fe
correct scalar access
jrmccluskey Feb 25, 2026
487f1e8
clean up dist accesses
jrmccluskey Feb 25, 2026
935ac90
linting, various fixes
jrmccluskey Feb 25, 2026
bcd8f2f
fix unit test setup for direct accesses
jrmccluskey Feb 25, 2026
6847567
linting
jrmccluskey Feb 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
55 changes: 15 additions & 40 deletions sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def _get_match(proto, filter_fn):


# V1b3 MetricStructuredName keys to accept and copy to the MetricKey labels.
STEP_LABEL = 'step'
STRUCTURED_NAME_LABELS = set(
['execution_step', 'original_name', 'output_user_name'])

Expand Down Expand Up @@ -113,7 +112,7 @@ def _translate_step_name(self, internal_name):
step = _get_match(
self._job_graph.proto.steps, lambda x: x.name == internal_name)
user_step_name = _get_match(
step.properties.additionalProperties,
step.properties.properties,
lambda x: x.key == 'user_name').value.string_value
except ValueError:
pass # Exception is handled below.
Expand All @@ -135,24 +134,22 @@ def _get_metric_key(self, metric):
# step name (only happens for unstructured-named metrics).
# 2. Unable to unpack [step] or [namespace]; which should only happen
# for unstructured names.
step = _get_match(
metric.name.context.additionalProperties,
lambda x: x.key == STEP_LABEL).value
step = metric.name.context['step']
step = self._translate_step_name(step)
except ValueError:
pass

namespace = "dataflow/v1b3" # Try to extract namespace or add a default.
try:
namespace = _get_match(
metric.name.context.additionalProperties,
lambda x: x.key == 'namespace').value
carried_namespace = metric.name.context['namespace']
if carried_namespace:
namespace = carried_namespace
except ValueError:
pass

for kv in metric.name.context.additionalProperties:
if kv.key in STRUCTURED_NAME_LABELS:
labels[kv.key] = kv.value
for key in metric.name.context:
if key in STRUCTURED_NAME_LABELS:
labels[key] = metric.name.context[key]
# Package everything besides namespace and name into the labels as well,
# including unmodified step names, to assist in integrating the exact
# unmodified values which come from Dataflow.
Expand Down Expand Up @@ -185,10 +182,7 @@ def _populate_metrics(self, response, result, user_metrics=False):
# in the service.
# The second way is only useful for the UI, and should be ignored.
continue
is_tentative = [
prop for prop in metric.name.context.additionalProperties
if prop.key == 'tentative' and prop.value == 'true'
]
is_tentative = metric.name.context['tentative']
tentative_or_committed = 'tentative' if is_tentative else 'committed'

metric_key = self._get_metric_key(metric)
Expand All @@ -209,32 +203,13 @@ def _get_metric_value(self, metric):
return None

if metric.scalar is not None:
return metric.scalar.integer_value
# This will always be a single value if there is any data in the field.
return metric.scalar
elif metric.distribution is not None:
dist_count = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'count').value.integer_value
dist_min = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'min').value.integer_value
dist_max = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'max').value.integer_value
dist_sum = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'sum').value.integer_value
if dist_sum is None:
# distribution metric is not meant to be used on large values, but in case
# it is, the value can overflow and become double_value, so the correctness
# of the value may not be guaranteed.
_LOGGER.info(
"Distribution metric sum value seems to have "
"overflowed integer_value range, the correctness of sum or mean "
"value may not be guaranteed: %s" % metric.distribution)
dist_sum = int(
_get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'sum').value.double_value)
dist_count = metric.distribution['count']
dist_min = metric.distribution['min']
dist_max = metric.distribution['max']
dist_sum = metric.distribution['sum']
return DistributionResult(
DistributionData(dist_sum, dist_count, dist_min, dist_max))
#TODO(https://github.com/apache/beam/issues/31788) support StringSet after
Expand Down
Loading
Loading