Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions sdks/python/apache_beam/typehints/row_type_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ class RowTypeTest(unittest.TestCase):
@staticmethod
def _check_key_type_and_count(x) -> int:
key_type = type(x[0])
if not row_type._user_type_is_generated(key_type):
raise RuntimeError("Expect type after GBK to be generated user type")
if row_type._user_type_is_generated(key_type):
raise RuntimeError("Type after GBK not preserved, get generated type")
if not hasattr(key_type, row_type._BEAM_SCHEMA_ID):
raise RuntimeError("Type after GBK missing Beam schema ID")

return len(x[1])

Expand Down
29 changes: 16 additions & 13 deletions sdks/python/apache_beam/typehints/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,19 +587,22 @@ def named_tuple_from_schema(self, schema: schema_pb2.Schema) -> type:
descriptions[field.name] = field.description
subfields.append((field.name, field_py_type))

user_type = NamedTuple(type_name, subfields)

# Define a reduce function, otherwise these types can't be pickled
# (See BEAM-9574)
setattr(
user_type,
'__reduce__',
_named_tuple_reduce_method(schema.SerializeToString()))
setattr(user_type, "_field_descriptions", descriptions)
setattr(user_type, row_type._BEAM_SCHEMA_ID, schema.id)

self.schema_registry.add(user_type, schema)
coders.registry.register_coder(user_type, coders.RowCoder)
if schema.id in self.schema_registry.by_id:
user_type = self.schema_registry.by_id[schema.id][0]
else:
user_type = NamedTuple(type_name, subfields)

# Define a reduce function, otherwise these types can't be pickled
# (See BEAM-9574)
setattr(
user_type,
'__reduce__',
_named_tuple_reduce_method(schema.SerializeToString()))
setattr(user_type, "_field_descriptions", descriptions)
setattr(user_type, row_type._BEAM_SCHEMA_ID, schema.id)

self.schema_registry.add(user_type, schema)
coders.registry.register_coder(user_type, coders.RowCoder)

return user_type

Expand Down
Loading