From 3bd0f51cf6f3b5803b3759fa2b27202c98730b93 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 8 Apr 2026 14:46:43 -0400 Subject: [PATCH 1/4] Use registered type for Row --- .../apache_beam/typehints/row_type_test.py | 6 ++-- sdks/python/apache_beam/typehints/schemas.py | 29 ++++++++++--------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/typehints/row_type_test.py b/sdks/python/apache_beam/typehints/row_type_test.py index 97012d9561d7..aabf9c946a2d 100644 --- a/sdks/python/apache_beam/typehints/row_type_test.py +++ b/sdks/python/apache_beam/typehints/row_type_test.py @@ -33,8 +33,10 @@ class RowTypeTest(unittest.TestCase): @staticmethod def _check_key_type_and_count(x) -> int: key_type = type(x[0]) - if not row_type._user_type_is_generated(key_type): - raise RuntimeError("Expect type after GBK to be generated user type") + if row_type._user_type_is_generated(key_type): + raise RuntimeError("Type after GBK not preserved, get generated type") + if not hasattr(key_type, row_type._BEAM_SCHEMA_ID): + raise RuntimeError("Type after GBK missing Beam schema ID") return len(x[1]) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index d2c4db8cabca..108e75aac9c0 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -587,19 +587,22 @@ def named_tuple_from_schema(self, schema: schema_pb2.Schema) -> type: descriptions[field.name] = field.description subfields.append((field.name, field_py_type)) - user_type = NamedTuple(type_name, subfields) - - # Define a reduce function, otherwise these types can't be pickled - # (See BEAM-9574) - setattr( - user_type, - '__reduce__', - _named_tuple_reduce_method(schema.SerializeToString())) - setattr(user_type, "_field_descriptions", descriptions) - setattr(user_type, row_type._BEAM_SCHEMA_ID, schema.id) - - self.schema_registry.add(user_type, schema) - coders.registry.register_coder(user_type, coders.RowCoder) + if schema.id in self.schema_registry.by_id: + user_type = self.schema_registry.by_id[schema.id][0] + else: + user_type = NamedTuple(type_name, subfields) + + # Define a reduce function, otherwise these types can't be pickled + # (See BEAM-9574) + setattr( + user_type, + '__reduce__', + _named_tuple_reduce_method(schema.SerializeToString())) + setattr(user_type, "_field_descriptions", descriptions) + setattr(user_type, row_type._BEAM_SCHEMA_ID, schema.id) + + self.schema_registry.add(user_type, schema) + coders.registry.register_coder(user_type, coders.RowCoder) return user_type From ef026682d65868c2c46ae4aeb1f5b2cd1d30a641 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 9 Apr 2026 11:05:04 -0400 Subject: [PATCH 2/4] trigger postcommit tests --- .../beam_PostCommit_Python_Xlang_Gcp_Dataflow.json | 2 +- .../trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +- .../trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json | 2 +- .../trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json index bb5da04014ec..83346d34aee0 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 15 + "modification": 16 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index 83346d34aee0..c5309eebb070 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 16 + "modification": 17 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json index b60f5c4cc3c8..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 0 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json index b26833333238..c537844dc84a 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 3 } From 1e00d27af635d3e23ad26269409ec32dfe6f5e6a Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 9 Apr 2026 19:08:35 -0400 Subject: [PATCH 3/4] Revert "trigger postcommit tests" This reverts commit ef026682d65868c2c46ae4aeb1f5b2cd1d30a641. Tests passed --- .../beam_PostCommit_Python_Xlang_Gcp_Dataflow.json | 2 +- .../trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +- .../trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json | 2 +- .../trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json index 83346d34aee0..bb5da04014ec 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 16 + "modification": 15 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index c5309eebb070..83346d34aee0 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 17 + "modification": 16 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json index e3d6056a5de9..b60f5c4cc3c8 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1 + "modification": 0 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json index c537844dc84a..b26833333238 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 3 + "modification": 2 } From 4e49db9ff58dfc0da5f9e4eb26e809b4e90a9cc1 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 10 Apr 2026 14:09:28 -0400 Subject: [PATCH 4/4] Introduce register_row to register with both coder and schema registry Save schema registry id->type mapping --- ...beam_PostCommit_Python_Xlang_Gcp_Dataflow.json | 2 +- .../beam_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +- sdks/python/apache_beam/coders/typecoders.py | 15 +++++++++++++++ .../apache_beam/internal/cloudpickle_pickler.py | 13 ++++++++++++- .../io/external/xlang_jdbcio_it_test.py | 6 +++--- .../python/apache_beam/typehints/row_type_test.py | 12 ++++-------- .../apache_beam/typehints/schema_registry.py | 11 ++++++++++- 7 files changed, 46 insertions(+), 15 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json index bb5da04014ec..83346d34aee0 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 15 + "modification": 16 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json index 83346d34aee0..c5309eebb070 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 16 + "modification": 17 } diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index 9683e00f0c2a..9bf58091bb6c 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -124,6 +124,7 @@ def _normalize_typehint_type(typehint_type): def register_coder( self, typehint_type: Any, typehint_coder_class: Type[coders.Coder]) -> None: + "Register a user type with a coder" if not isinstance(typehint_coder_class, type): raise TypeError( 'Coder registration requires a coder class object. ' @@ -133,6 +134,20 @@ def register_coder( self._register_coder_internal( self._normalize_typehint_type(typehint_type), typehint_coder_class) + def register_row(self, typehint_type: Any) -> None: + """ + Register a user type with a Beam Row. + + This registers the type with a RowCoder and register its schema. + """ + from apache_beam.typehints.schemas import typing_to_runner_api + from apache_beam.coders import RowCoder + # Register with row coder + self.register_coder(typehint_type, RowCoder) + # This call generated a schema id for the type and register it with + # schema registry + typing_to_runner_api(typehint_type) + def get_coder(self, typehint: Any) -> coders.Coder: if typehint and typehint.__module__ == '__main__': # See https://github.com/apache/beam/issues/21541 diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index acdcc46cd40d..cea4f01f803c 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -256,20 +256,27 @@ def dump_session(file_path): # dump supported Beam Registries (currently only logical type registry) from apache_beam.coders import typecoders from apache_beam.typehints import schemas + from apache_beam.typehints.schema_registry import SCHEMA_REGISTRY with _pickle_lock, open(file_path, 'wb') as file: coder_reg = typecoders.registry.get_custom_type_coder_tuples() logical_type_reg = schemas.LogicalType._known_logical_types.copy_custom() + schema_reg = SCHEMA_REGISTRY.get_registered_typings() pickler = cloudpickle.CloudPickler(file) # TODO(https://github.com/apache/beam/issues/18500) add file system registry # once implemented - pickler.dump({"coder": coder_reg, "logical_type": logical_type_reg}) + pickler.dump({ + "coder": coder_reg, + "logical_type": logical_type_reg, + "schema": schema_reg + }) def load_session(file_path): from apache_beam.coders import typecoders from apache_beam.typehints import schemas + from apache_beam.typehints.schema_registry import SCHEMA_REGISTRY with _pickle_lock, open(file_path, 'rb') as file: registries = cloudpickle.load(file) @@ -284,3 +291,7 @@ def load_session(file_path): schemas.LogicalType._known_logical_types.load(registries["logical_type"]) else: _LOGGER.warning('No logical type registry found in saved session') + if "schema" in registries: + SCHEMA_REGISTRY.load_registered_typings(registries["schema"]) + else: + _LOGGER.warning('No schema registry found in saved session') diff --git a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py index 069f13e11bfb..4c07d4db9300 100644 --- a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py @@ -64,7 +64,7 @@ ("f_timestamp", Timestamp), ("f_decimal", Decimal), ("f_date", datetime.date), ("f_time", datetime.time)], ) -coders.registry.register_coder(JdbcTestRow, coders.RowCoder) +coders.registry.register_row(JdbcTestRow) CustomSchemaRow = typing.NamedTuple( "CustomSchemaRow", @@ -82,11 +82,11 @@ ("renamed_time", datetime.time), ], ) -coders.registry.register_coder(CustomSchemaRow, coders.RowCoder) +coders.registry.register_row(CustomSchemaRow) SimpleRow = typing.NamedTuple( "SimpleRow", [("id", int), ("name", str), ("value", float)]) -coders.registry.register_coder(SimpleRow, coders.RowCoder) +coders.registry.register_row(SimpleRow) @pytest.mark.uses_gcp_java_expansion_service diff --git a/sdks/python/apache_beam/typehints/row_type_test.py b/sdks/python/apache_beam/typehints/row_type_test.py index aabf9c946a2d..54e64caf6fa7 100644 --- a/sdks/python/apache_beam/typehints/row_type_test.py +++ b/sdks/python/apache_beam/typehints/row_type_test.py @@ -44,8 +44,7 @@ def test_group_by_key_namedtuple(self): MyNamedTuple = typing.NamedTuple( "MyNamedTuple", [("id", int), ("name", str)]) - beam.coders.typecoders.registry.register_coder( - MyNamedTuple, beam.coders.RowCoder) + beam.coders.typecoders.registry.register_row(MyNamedTuple) def generate(num: int): for i in range(100): @@ -69,8 +68,7 @@ class MyDataClass: id: int name: str - beam.coders.typecoders.registry.register_coder( - MyDataClass, beam.coders.RowCoder) + beam.coders.typecoders.registry.register_row(MyDataClass) def generate(num: int): for i in range(100): @@ -122,10 +120,8 @@ class DataClassInt: class DataClassStr(DataClassInt): name: str - beam.coders.typecoders.registry.register_coder( - DataClassInt, beam.coders.RowCoder) - beam.coders.typecoders.registry.register_coder( - DataClassStr, beam.coders.RowCoder) + beam.coders.typecoders.registry.register_row(DataClassInt) + beam.coders.typecoders.registry.register_row(DataClassStr) def generate(num: int): for i in range(10): diff --git a/sdks/python/apache_beam/typehints/schema_registry.py b/sdks/python/apache_beam/typehints/schema_registry.py index 7d8cdcf57d3f..684bf8734a5f 100644 --- a/sdks/python/apache_beam/typehints/schema_registry.py +++ b/sdks/python/apache_beam/typehints/schema_registry.py @@ -26,7 +26,7 @@ class SchemaTypeRegistry(object): def __init__(self): self.by_id = {} - self.by_typing = {} + self.by_typing = {} # currently not used def generate_new_id(self): for _ in range(100): @@ -43,6 +43,15 @@ def add(self, typing, schema): if schema.id: self.by_id[schema.id] = (typing, schema) + def load_registered_typings(self, by_id): + for id, typing in by_id.items(): + if id not in self.by_id: + self.by_id[id] = (typing, None) + + def get_registered_typings(self): + # Used by save_main_session, as pb2.schema isn't picklable + return {k: v[0] for k, v in self.by_id.items()} + def get_typing_by_id(self, unique_id): if not unique_id: return None