-
Notifications
You must be signed in to change notification settings - Fork 4.6k
[Python] Add UnboundedSource SDF wrapper (#19137) #38724
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
4786d94
3bf67cb
9a71653
32af9e3
97dadea
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -919,7 +919,11 @@ def __init__(self, source: SourceBase) -> None: | |
| """Initializes a Read transform. | ||
|
|
||
| Args: | ||
| source: Data source to read from. | ||
| source: Data source to read from. A ``BoundedSource`` is wrapped in the | ||
| bounded SDF reader; an ``UnboundedSource`` is dispatched through | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pydoc audiences are Beam users, not necessarily mentioning the implement details for BoundedSource or UnboundedSource (like bounded SDF reader etc) However it's worthwhile to mention source can be a BoundedSource, UnboundedSource, PTransform, or treat Read itself as a primitive (relay to runner implementation) |
||
| :class:`apache_beam.io.unbounded_source.ReadFromUnboundedSource` with | ||
| the default poll interval (users wanting a custom poll cadence must | ||
| instantiate ``ReadFromUnboundedSource`` directly). | ||
| """ | ||
| super().__init__() | ||
| self.source = source | ||
|
|
@@ -945,6 +949,16 @@ def expand(self, pbegin): | |
| | 'EmitSource' >> | ||
| core.Map(lambda _: self.source).with_output_types(BoundedSource) | ||
| | SDFBoundedSourceReader(display_data)) | ||
| # Lazy import to break the iobase <-> unbounded_source cycle: the | ||
| # unbounded_source module imports iobase (UnboundedSource extends | ||
| # SourceBase). Pattern matches the _PubSubSource lazy import below. | ||
| from apache_beam.io.unbounded_source import ReadFromUnboundedSource | ||
| from apache_beam.io.unbounded_source import UnboundedSource | ||
| if isinstance(self.source, UnboundedSource): | ||
| # Delegate to the dedicated SDF PTransform; identical to the user | ||
| # writing `p | ReadFromUnboundedSource(self.source)` directly. Custom | ||
| # poll_interval_seconds requires using ReadFromUnboundedSource directly. | ||
| return pbegin | ReadFromUnboundedSource(self.source) | ||
| elif isinstance(self.source, ptransform.PTransform): | ||
| # The Read transform can also admit a full PTransform as an input | ||
| # rather than an anctual source. If the input is a PTransform, then | ||
|
|
@@ -986,7 +1000,15 @@ def to_runner_api_parameter( | |
| timestamp_attribute=self.source.timestamp_attribute, | ||
| with_attributes=self.source.with_attributes, | ||
| id_attribute=self.source.id_label)) | ||
| if isinstance(self.source, BoundedSource): | ||
| # Lazy import to avoid the iobase <-> unbounded_source cycle. | ||
| from apache_beam.io.unbounded_source import UnboundedSource | ||
| if isinstance(self.source, (BoundedSource, UnboundedSource)): | ||
| # READ.urn covers both source flavours; the IsBounded enum distinguishes | ||
| # them. NB: today the bundle_processor.py IMPULSE_READ_TRANSFORM handler | ||
| # only consumes BOUNDED - the UNBOUNDED branch round-trips correctly | ||
| # through the protobuf graph but execution still flows through this | ||
| # composite's expanded sub-transforms (Impulse | Map | SDF-ParDo), not | ||
| # through the URN-handler. Runner-side UNBOUNDED dispatch is W2 work. | ||
| return ( | ||
| common_urns.deprecated_primitives.READ.urn, | ||
| beam_runner_api_pb2.ReadPayload( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,13 +21,12 @@ | |
|
|
||
| import unittest | ||
|
|
||
| import mock | ||
|
|
||
| import apache_beam as beam | ||
| from apache_beam.io.concat_source import ConcatSource | ||
| from apache_beam.io.concat_source_test import RangeSource | ||
| import mock | ||
| from apache_beam.io import iobase | ||
| from apache_beam.io import range_trackers | ||
| from apache_beam.io.concat_source import ConcatSource | ||
| from apache_beam.io.concat_source_test import RangeSource | ||
| from apache_beam.io.iobase import SourceBundle | ||
| from apache_beam.options.pipeline_options import DebugOptions | ||
| from apache_beam.testing.util import assert_that | ||
|
|
@@ -220,5 +219,82 @@ def test_sdf_wrap_range_source(self): | |
| self._run_sdf_wrapper_pipeline(RangeSource(0, 4), [0, 1, 2, 3]) | ||
|
|
||
|
|
||
| class UseSdfUnboundedSourcesTests(unittest.TestCase): | ||
| """Mirrors UseSdfBoundedSourcesTests for the new UnboundedSource branch in | ||
| iobase.Read.expand(). Uses CountingSource from unbounded_source_test as the | ||
| fake finite UnboundedSource (avoids dragging the network in). | ||
| """ | ||
| def test_read_dispatches_to_read_from_unbounded_source(self): | ||
| from apache_beam.io.unbounded_source_test import CountingSource | ||
| with mock.patch( | ||
| 'apache_beam.io.unbounded_source.ReadFromUnboundedSource.expand' | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test does nothing. It mocks expand essentially manipulate the test to pass. We should exercise expand and assert for something not trivial. |
||
| ) as mock_expand: | ||
| mock_expand.side_effect = ( | ||
| lambda pbegin: pbegin | beam.Impulse() | beam.Map(lambda _: 'fake')) | ||
| with beam.Pipeline() as p: | ||
| out = p | beam.io.Read(CountingSource(3)) | ||
| assert_that(out, equal_to(['fake'])) | ||
| mock_expand.assert_called_once() | ||
|
|
||
| def test_read_end_to_end_unbounded(self): | ||
| from apache_beam.io.unbounded_source_test import CountingSource | ||
| with beam.Pipeline() as p: | ||
| out = p | beam.io.Read(CountingSource(5)) | ||
| assert_that(out, equal_to([0, 1, 2, 3, 4])) | ||
|
|
||
| def test_read_unbounded_pcollection_is_unbounded(self): | ||
| from apache_beam.io.unbounded_source_test import CountingSource | ||
| with beam.Pipeline() as p: | ||
| out = p | beam.io.Read(CountingSource(3)) | ||
| self.assertFalse(out.is_bounded) | ||
|
|
||
| def test_to_runner_api_emits_unbounded_read_payload(self): | ||
| """``Read.to_runner_api_parameter`` must serialize an UnboundedSource as | ||
| ``READ.urn`` with ``IsBounded.UNBOUNDED``. The runner-side handler is W2 | ||
| and ignores this enum today, but the wire format must round-trip | ||
| consistently for pipeline persistence / cross-runner submission. | ||
| """ | ||
| from apache_beam.io.unbounded_source_test import CountingSource | ||
| from apache_beam.portability import common_urns | ||
| from apache_beam.portability.api import beam_runner_api_pb2 | ||
| from apache_beam.runners.pipeline_context import PipelineContext | ||
|
|
||
| read = beam.io.Read(CountingSource(5)) | ||
| urn, payload = read.to_runner_api_parameter(PipelineContext()) | ||
|
|
||
| self.assertEqual(urn, common_urns.deprecated_primitives.READ.urn) | ||
| self.assertIsInstance(payload, beam_runner_api_pb2.ReadPayload) | ||
| self.assertEqual( | ||
| payload.is_bounded, beam_runner_api_pb2.IsBounded.UNBOUNDED) | ||
| # The source field must be populated -- a non-empty FunctionSpec proto. | ||
| self.assertTrue(payload.source.urn) | ||
|
|
||
| def test_read_unbounded_round_trips_through_runner_api(self): | ||
| """Encode then decode via the runner-API protobuf. The restored | ||
| transform must be a ``Read`` wrapping an equivalent UnboundedSource. | ||
| """ | ||
| from apache_beam.io.unbounded_source import UnboundedSource | ||
| from apache_beam.io.unbounded_source_test import CountingSource | ||
| from apache_beam.portability import common_urns | ||
| from apache_beam.portability.api import beam_runner_api_pb2 | ||
| from apache_beam.runners.pipeline_context import PipelineContext | ||
|
|
||
| original = beam.io.Read(CountingSource(7)) | ||
| context = PipelineContext() | ||
| urn, payload = original.to_runner_api_parameter(context) | ||
|
|
||
| transform_proto = beam_runner_api_pb2.PTransform() | ||
| transform_proto.spec.urn = urn | ||
| restored = iobase.Read.from_runner_api_parameter( | ||
| transform_proto, payload, context) | ||
|
|
||
| self.assertIsInstance(restored, iobase.Read) | ||
| self.assertIsInstance(restored.source, UnboundedSource) | ||
| self.assertIsInstance(restored.source, CountingSource) | ||
| self.assertFalse(restored.source.is_bounded()) | ||
| # Verify the source's internal state survived pickle round-trip. | ||
| self.assertEqual(restored.source._count, 7) | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| unittest.main() | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can defer the announcement after milestones completed (at least ValidatesRunner tests confirmed passing)