From 7eeed78cf90cd14483ac56f4afd31bf68be14385 Mon Sep 17 00:00:00 2001 From: Tobias Kaymak Date: Fri, 12 Jun 2026 13:15:55 +0000 Subject: [PATCH] [mqtt] Add Python Xlang Messaging PostCommit with MQTT integration tests Sets up beam_PostCommit_Python_Xlang_Messaging_Direct, modeled on the Xlang IO/GCP Direct PostCommits, to exercise the cross-language MQTT transforms served by the messaging expansion service: - New workflow + trigger file + workflows README entry. - New CrossLanguageTask "messagingCrossLanguage" (collect marker uses_messaging_java_expansion_service) wired into xlangTasks, and a messagingCrossLanguagePostCommit aggregator task. - New integration test xlang_mqttio_it_test.py: starts an Eclipse Mosquitto broker via testcontainers and covers - bounded ReadFromMqtt on the DirectRunner (max_num_records, with a continuous mosquitto_pub publisher inside the container since MQTT has no retention), - WriteToMqtt on the DirectRunner (verified with a mosquitto_sub subscriber), - the unbounded (streaming) path on the Prism runner: streaming ReadFromMqtt feeding WriteToMqtt on a second topic, observed via mosquitto_sub and then cancelled. The tests skip on Dataflow, where a testcontainers broker would not be reachable from the workers (a Dataflow variant would need a remotely hosted MQTT broker, like the hosted Kafka cluster used by the Xlang IO Dataflow suite). Per review feedback from @Abacn on #38493. --- ...tCommit_Python_Xlang_Messaging_Direct.json | 4 + .github/workflows/README.md | 1 + ...stCommit_Python_Xlang_Messaging_Direct.yml | 97 ++++++ .../io/external/xlang_mqttio_it_test.py | 300 ++++++++++++++++++ sdks/python/pytest.ini | 1 + sdks/python/test-suites/direct/build.gradle | 6 + sdks/python/test-suites/xlang/build.gradle | 8 +- 7 files changed, 416 insertions(+), 1 deletion(-) create mode 100644 .github/trigger_files/beam_PostCommit_Python_Xlang_Messaging_Direct.json create mode 100644 .github/workflows/beam_PostCommit_Python_Xlang_Messaging_Direct.yml create mode 100644 sdks/python/apache_beam/io/external/xlang_mqttio_it_test.py diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Messaging_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Messaging_Direct.json new file mode 100644 index 000000000000..e3d6056a5de9 --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Messaging_Direct.json @@ -0,0 +1,4 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run", + "modification": 1 +} diff --git a/.github/workflows/README.md b/.github/workflows/README.md index 0785b12a1d12..1715365f4ec6 100644 --- a/.github/workflows/README.md +++ b/.github/workflows/README.md @@ -414,6 +414,7 @@ PostCommit Jobs run in a schedule against master branch and generally do not get | [ PostCommit Java ValidatesRunner Dataflow Streaming Engine ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.yml) | N/A |`beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json`| [![.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.yml?query=event%3Aschedule) | | [ PostCommit Python Portable Flink ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Python_Portable_Flink.yml) | N/A |`beam_PostCommit_Python_Portable_Flink.json`| [![.github/workflows/beam_PostCommit_Python_Portable_Flink.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Python_Portable_Flink.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Python_Portable_Flink.yml?query=event%3Aschedule) | | [ PostCommit Python Xlang IO Direct ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Python_Xlang_IO_Direct.yml) | N/A |`beam_PostCommit_Python_Xlang_IO_Direct.json`| [![.github/workflows/beam_PostCommit_Python_Xlang_IO_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Python_Xlang_IO_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Python_Xlang_IO_Direct.yml?query=event%3Aschedule) | +| [ PostCommit Python Xlang Messaging Direct ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Python_Xlang_Messaging_Direct.yml) | N/A |`beam_PostCommit_Python_Xlang_Messaging_Direct.json`| [![.github/workflows/beam_PostCommit_Python_Xlang_Messaging_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Python_Xlang_Messaging_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Python_Xlang_Messaging_Direct.yml?query=event%3Aschedule) | ### PerformanceTests and Benchmark Jobs diff --git a/.github/workflows/beam_PostCommit_Python_Xlang_Messaging_Direct.yml b/.github/workflows/beam_PostCommit_Python_Xlang_Messaging_Direct.yml new file mode 100644 index 000000000000..943dfbaffd94 --- /dev/null +++ b/.github/workflows/beam_PostCommit_Python_Xlang_Messaging_Direct.yml @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: PostCommit Python Xlang Messaging Direct + +on: + schedule: + - cron: '45 5/6 * * *' + pull_request_target: + paths: ['release/trigger_all_tests.json', '.github/trigger_files/beam_PostCommit_Python_Xlang_Messaging_Direct.json'] + workflow_dispatch: + +#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event +permissions: + actions: write + pull-requests: write + checks: write + contents: read + deployments: read + id-token: none + issues: write + discussions: read + packages: read + pages: read + repository-projects: read + security-events: read + statuses: read + +# This allows a subsequently queued workflow run to interrupt previous runs +concurrency: + group: '${{ github.workflow }} @ ${{ github.event.pull_request.number || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}' + cancel-in-progress: true + +env: + DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} + GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} + GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} + +jobs: + beam_PostCommit_Python_Xlang_Messaging_Direct: + if: | + github.event_name == 'workflow_dispatch' || + github.event_name == 'pull_request_target' || + (github.event_name == 'schedule' && github.repository == 'apache/beam') || + github.event.comment.body == 'Run Python_Xlang_Messaging_Direct PostCommit' + runs-on: [self-hosted, ubuntu-24.04, main] + timeout-minutes: 100 + name: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + strategy: + matrix: + job_name: ["beam_PostCommit_Python_Xlang_Messaging_Direct"] + job_phrase: ["Run Python_Xlang_Messaging_Direct PostCommit"] + steps: + - uses: actions/checkout@v6 + - name: Setup repository + uses: ./.github/actions/setup-action + with: + comment_phrase: ${{ matrix.job_phrase }} + github_token: ${{ secrets.GITHUB_TOKEN }} + github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + - name: Setup environment + uses: ./.github/actions/setup-environment-action + with: + python-version: | + 3.10 + 3.14 + - name: run PostCommit Python Xlang Messaging Direct script + uses: ./.github/actions/gradle-command-self-hosted-action + with: + gradle-command: :sdks:python:test-suites:direct:messagingCrossLanguagePostCommit + arguments: -PuseWheelDistribution + - name: Archive Python Test Results + uses: actions/upload-artifact@v7 + if: failure() + with: + name: Python Test Results + path: '**/pytest*.xml' + - name: Publish Python Test Results + uses: EnricoMi/publish-unit-test-result-action@v2 + if: always() + with: + commit: '${{ env.prsha || env.GITHUB_SHA }}' + comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }} + files: '**/pytest*.xml' + large_files: true diff --git a/sdks/python/apache_beam/io/external/xlang_mqttio_it_test.py b/sdks/python/apache_beam/io/external/xlang_mqttio_it_test.py new file mode 100644 index 000000000000..cae6ccdfacad --- /dev/null +++ b/sdks/python/apache_beam/io/external/xlang_mqttio_it_test.py @@ -0,0 +1,300 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Integration tests for the cross-language MQTT IO transforms +(ReadFromMqtt / WriteToMqtt), served by the messaging expansion service. + +Runs against an MQTT broker (Eclipse Mosquitto) started via testcontainers. +The DirectRunner tests use reads bounded with max_num_records; unbounded +(streaming) reads require a portable streaming runner (see the +MqttReadSchemaTransformProvider description) and are exercised by the +Prism runner test below. +""" + +import logging +import threading +import time +import unittest + +import pytest + +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import BeamAssertException +from apache_beam.testing.util import assert_that +from apache_beam.typehints.row_type import RowTypeConstraint + +# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports +try: + from apache_beam.io import ReadFromMqtt + from apache_beam.io import WriteToMqtt +except ImportError: + ReadFromMqtt = None + WriteToMqtt = None + +try: + from testcontainers.core.container import DockerContainer + from testcontainers.core.waiting_utils import wait_for_logs +except ImportError: + DockerContainer = None + +NUM_RECORDS = 3 +BYTES_ROW = RowTypeConstraint.from_fields([('bytes', bytes)]) + + +def _payload_count_and_prefix_matcher(expected_count, expected_prefix): + """Matches a bounded read of a continuous publisher: exactly + expected_count payloads, each starting with expected_prefix (the absolute + sequence numbers depend on when the reader subscribed).""" + def _matcher(actual): + actual = list(actual) + if len(actual) != expected_count: + raise BeamAssertException( + 'Expected %d payloads, got %d: %s' % + (expected_count, len(actual), actual)) + for payload in actual: + if not payload.startswith(expected_prefix): + raise BeamAssertException('Unexpected payload: %s' % payload) + + return _matcher + + +@pytest.mark.uses_messaging_java_expansion_service +@unittest.skipIf( + DockerContainer is None, 'testcontainers package is not installed') +@unittest.skipIf( + ReadFromMqtt is None or WriteToMqtt is None, + 'MQTT cross-language wrappers are not generated') +@unittest.skipIf( + TestPipeline().get_pipeline_options().view_as(StandardOptions).runner + is None, + 'Do not run this test on precommit suites.') +@unittest.skipIf( + 'Dataflow' in ( + TestPipeline().get_pipeline_options().view_as(StandardOptions).runner or + ''), + 'The testcontainers broker is not reachable from Dataflow workers; ' + 'a Dataflow variant would need a remotely hosted MQTT broker.') +class CrossLanguageMqttIOTest(unittest.TestCase): + def setUp(self): + self.start_mqtt_container(retries=3) + host = self.broker.get_container_host_ip() + port = self.broker.get_exposed_port(1883) + self.server_uri = 'tcp://%s:%s' % (host, port) + + def tearDown(self): + # Sometimes stopping the container raises ReadTimeout. We can ignore it + # here to avoid the test failure. + try: + self.broker.stop() + except Exception: + logging.error('Could not stop the MQTT broker container.') + + # Creating a container with testcontainers sometimes raises ReadTimeout + # error, so retry a couple of times. + def start_mqtt_container(self, retries): + for i in range(retries): + try: + # /mosquitto-no-auth.conf ships with the image and enables an + # anonymous listener on port 1883. + self.broker = DockerContainer('eclipse-mosquitto:2').with_command( + 'mosquitto -c /mosquitto-no-auth.conf').with_exposed_ports(1883) + self.broker.start() + wait_for_logs(self.broker, 'mosquitto version .* running', timeout=30) + break + except Exception as e: + # If start() succeeded but a later step (e.g. wait_for_logs) failed, + # stop the partially started container so the next retry / the raised + # error does not leak a running Docker container. + try: + self.broker.stop() + except Exception: + pass + if i == retries - 1: + logging.error('Unable to initialize the MQTT broker container.') + raise e + + def _connection_configuration(self, topic, client_id): + return { + 'server_uri': self.server_uri, 'topic': topic, 'client_id': client_id + } + + def test_xlang_mqtt_read(self): + topic = 'xlang-mqtt-read-topic' + # MQTT has no message retention for regular messages, so publish + # continuously (via mosquitto_pub inside the broker container) until the + # bounded read collected what it needs. + stop_publishing = threading.Event() + + def publish_loop(): + container = self.broker.get_wrapped_container() + i = 0 + while not stop_publishing.is_set(): + container.exec_run( + ['mosquitto_pub', '-t', topic, '-m', 'msg-%d' % i, '-q', '1']) + i += 1 + time.sleep(0.5) + + publisher = threading.Thread(target=publish_loop, daemon=True) + publisher.start() + try: + with TestPipeline() as p: + p.not_use_test_runner_api = True + payloads = ( + p + | 'ReadFromMqtt' >> ReadFromMqtt( + connection_configuration=self._connection_configuration( + topic, 'xlang-mqtt-read'), + max_num_records=NUM_RECORDS, + max_read_time_seconds=120) + | 'ExtractBytes' >> beam.Map(lambda row: row.bytes)) + assert_that( + payloads, _payload_count_and_prefix_matcher(NUM_RECORDS, b'msg-')) + finally: + stop_publishing.set() + publisher.join() + + def test_xlang_mqtt_write(self): + topic = 'xlang-mqtt-write-topic' + expected_payloads = [b'msg-%d' % i for i in range(NUM_RECORDS)] + subscriber_result = {} + + def subscribe(): + # mosquitto_sub exits after receiving NUM_RECORDS messages (-C) or + # after the timeout (-W), printing one payload per line. + container = self.broker.get_wrapped_container() + exit_code, output = container.exec_run([ + 'mosquitto_sub', + '-t', + topic, + '-q', + '1', + '-C', + str(NUM_RECORDS), + '-W', + '120' + ]) + subscriber_result['exit_code'] = exit_code + subscriber_result['output'] = output + + subscriber = threading.Thread(target=subscribe, daemon=True) + subscriber.start() + # Give the subscriber time to connect before publishing. + time.sleep(5) + + with TestPipeline() as p: + p.not_use_test_runner_api = True + _ = ( + p + | 'CreatePayloads' >> beam.Create(expected_payloads) + | 'ToRow' >> beam.Map(lambda payload: beam.Row(bytes=payload)). + with_output_types(BYTES_ROW) + | 'WriteToMqtt' >> WriteToMqtt( + connection_configuration=self._connection_configuration( + topic, 'xlang-mqtt-write'))) + + subscriber.join(timeout=150) + self.assertEqual(subscriber_result.get('exit_code'), 0) + received = sorted(subscriber_result.get('output', b'').split()) + self.assertEqual(sorted(expected_payloads), received) + + def test_xlang_mqtt_read_write_streaming_on_prism(self): + """Exercises the unbounded (streaming) path on the Prism runner, which + the legacy DirectRunner cannot run: an unbounded ReadFromMqtt feeding a + WriteToMqtt on a second topic. The result is observed with a + mosquitto_sub subscriber on the output topic, after which the + (never-terminating) pipeline is cancelled.""" + source_topic = 'xlang-mqtt-streaming-source' + sink_topic = 'xlang-mqtt-streaming-sink' + stop_publishing = threading.Event() + subscriber_result = {} + + def publish_loop(): + container = self.broker.get_wrapped_container() + i = 0 + while not stop_publishing.is_set(): + container.exec_run([ + 'mosquitto_pub', '-t', source_topic, '-m', 'msg-%d' % i, '-q', '1' + ]) + i += 1 + time.sleep(0.5) + + def subscribe(): + container = self.broker.get_wrapped_container() + exit_code, output = container.exec_run([ + 'mosquitto_sub', + '-t', + sink_topic, + '-q', + '1', + '-C', + str(NUM_RECORDS), + '-W', + '180' + ]) + subscriber_result['exit_code'] = exit_code + subscriber_result['output'] = output + + publisher = threading.Thread(target=publish_loop, daemon=True) + subscriber = threading.Thread(target=subscribe, daemon=True) + publisher.start() + subscriber.start() + + options = PipelineOptions([ + '--runner=PrismRunner', + '--environment_type=LOOPBACK', + '--streaming', + ]) + p = beam.Pipeline(options=options) + _ = ( + p + | 'ReadFromMqtt' >> ReadFromMqtt( + connection_configuration=self._connection_configuration( + source_topic, 'xlang-mqtt-streaming-read')) + | 'Passthrough' >> beam.Map( + lambda row: beam.Row(bytes=row.bytes)).with_output_types(BYTES_ROW) + | 'WriteToMqtt' >> WriteToMqtt( + connection_configuration=self._connection_configuration( + sink_topic, 'xlang-mqtt-streaming-write'))) + result = p.run() + try: + # The subscriber exits once NUM_RECORDS messages flowed through the + # streaming pipeline (or fails the assertions below on its timeout). + subscriber.join(timeout=200) + finally: + stop_publishing.set() + publisher.join() + try: + result.cancel() + except Exception: # pylint: disable=broad-except + # The unbounded pipeline never finishes on its own; cancellation + # after the assertion data was collected is best-effort. + logging.warning('Ignoring error while cancelling the pipeline.') + + self.assertEqual(subscriber_result.get('exit_code'), 0) + payloads = subscriber_result.get('output', b'').split() + self.assertEqual(NUM_RECORDS, len(payloads)) + for payload in payloads: + self.assertTrue( + payload.startswith(b'msg-'), 'Unexpected payload: %s' % payload) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/pytest.ini b/sdks/python/pytest.ini index e2f5a70cf353..082c3ed634b1 100644 --- a/sdks/python/pytest.ini +++ b/sdks/python/pytest.ini @@ -34,6 +34,7 @@ markers = uses_java_expansion_service: collect Cross Language Java transforms test runs uses_python_expansion_service: collect Cross Language Python transforms test runs uses_io_java_expansion_service: collect Cross Language IO Java transform test runs (with Kafka bootstrap server) + uses_messaging_java_expansion_service: collect Cross Language Messaging IO Java transform test runs xlang_wrapper_generation: collect tests that validate Cross Language wrapper generation uses_transform_service: collect Cross Language test runs that uses the Transform Service xlang_sql_expansion_service: collect for Cross Language with SQL expansion service test runs diff --git a/sdks/python/test-suites/direct/build.gradle b/sdks/python/test-suites/direct/build.gradle index 4b1025343985..d1fe45683a83 100644 --- a/sdks/python/test-suites/direct/build.gradle +++ b/sdks/python/test-suites/direct/build.gradle @@ -43,6 +43,12 @@ task ioCrossLanguagePostCommit { } } +task messagingCrossLanguagePostCommit { + getVersionsAsList('cross_language_validates_py_versions').each { + dependsOn.add(":sdks:python:test-suites:direct:py${getVersionSuffix(it)}:messagingCrossLanguagePythonUsingJava") + } +} + task crossLanguageWrapperValidationPreCommit { // Different python versions may output types that look different and lead to // false failures. To be consistent, we test on the lowest version only diff --git a/sdks/python/test-suites/xlang/build.gradle b/sdks/python/test-suites/xlang/build.gradle index 1cbbaa0db534..458e90be292e 100644 --- a/sdks/python/test-suites/xlang/build.gradle +++ b/sdks/python/test-suites/xlang/build.gradle @@ -42,6 +42,12 @@ def ioXlang = new CrossLanguageTask().tap { additionalEnvs = ["KAFKA_BOOTSTRAP_SERVER":project.findProperty('kafkaBootstrapServer')] } +def messagingXlang = new CrossLanguageTask().tap { + name = "messagingCrossLanguage" + expansionProjectPaths = [messagingExpansionPath] + collectMarker = "uses_messaging_java_expansion_service" +} + // This list should include all expansion service targets in sdks/python/standard_expansion_services.yaml def servicesToGenerateFrom = [ioExpansionPath, messagingExpansionPath, gcpExpansionPath] def xlangWrapperValidation = new CrossLanguageTask().tap { @@ -54,6 +60,6 @@ def xlangWrapperValidation = new CrossLanguageTask().tap { // List of task metadata objects to create cross-language tasks from. // Each object contains the minimum relevant metadata. -def xlangTasks = [gcpXlang, ioXlang, xlangWrapperValidation] +def xlangTasks = [gcpXlang, ioXlang, messagingXlang, xlangWrapperValidation] ext.xlangTasks = xlangTasks \ No newline at end of file