Skip to content

[mqtt] Add Python Xlang Messaging PostCommit with MQTT integration tests#38966

Open
tkaymak wants to merge 1 commit into
apache:masterfrom
tkaymak:mqtt-xlang-postcommit
Open

[mqtt] Add Python Xlang Messaging PostCommit with MQTT integration tests#38966
tkaymak wants to merge 1 commit into
apache:masterfrom
tkaymak:mqtt-xlang-postcommit

Conversation

@tkaymak

@tkaymak tkaymak commented Jun 15, 2026

Copy link
Copy Markdown
Contributor

What

Follow-up to #38493 (now merged). Sets up a PostCommit suite for the cross-language MQTT transforms (ReadFromMqtt / WriteToMqtt) served by the messaging expansion service, and re-adds the CHANGES.md announcement that was deferred from #38493.

How

Clones the existing beam_PostCommit_Python_Xlang_IO_Direct pattern for the messaging expansion service:

  • New workflow beam_PostCommit_Python_Xlang_Messaging_Direct.yml + trigger file + .github/workflows/README.md row.
  • New messagingCrossLanguage CrossLanguageTask (collect marker uses_messaging_java_expansion_service) in sdks/python/test-suites/xlang/build.gradle, plus a messagingCrossLanguagePostCommit aggregator task and the pytest marker registration.
  • New integration test apache_beam/io/external/xlang_mqttio_it_test.py that starts an Eclipse Mosquitto broker via testcontainers and covers:
    • bounded ReadFromMqtt on the DirectRunner (max_num_records, with a continuous mosquitto_pub publisher inside the container since MQTT has no message retention),
    • WriteToMqtt on the DirectRunner (verified with a mosquitto_sub subscriber),
    • the unbounded (streaming) path on the Prism runner: a streaming ReadFromMqtt feeding WriteToMqtt on a second topic, observed via mosquitto_sub, then cancelled.
  • The tests skip on Dataflow, where a testcontainers broker is not reachable from the workers. A Dataflow variant would need a remotely-hosted broker (like the hosted Kafka cluster used by the Xlang IO Dataflow suite) and is out of scope here.

Testing

All three integration tests pass locally against the messaging expansion-service shadowJar

If the binary deployment of Prism to CI workers proves unreliable, the streaming test can be aborted.
R: @Abacn

This addresses your post-approval comment on #38493 about setting up the Xlang Messaging PostCommit.

@gemini-code-assist

Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request establishes a robust testing infrastructure for cross-language MQTT IO transforms in Apache Beam. By setting up a dedicated PostCommit suite and leveraging testcontainers for local broker simulation, it ensures the reliability of ReadFromMqtt and WriteToMqtt operations across both batch and streaming pipelines. The changes also include necessary build configuration updates and documentation improvements to support these new portable messaging capabilities.

Highlights

  • New PostCommit Suite: Introduced a new CI workflow 'beam_PostCommit_Python_Xlang_Messaging_Direct' to validate cross-language MQTT transforms.
  • Integration Testing: Added comprehensive integration tests in 'xlang_mqttio_it_test.py' covering bounded and unbounded MQTT operations using testcontainers.
  • Build Infrastructure: Configured new Gradle tasks and pytest markers to support the messaging expansion service in cross-language test suites.
  • Documentation: Updated CHANGES.md to reflect the availability of portable ReadFromMqtt and WriteToMqtt transforms in Python.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Ignored Files
  • Ignored by pattern: .github/workflows/** (2)
    • .github/workflows/README.md
    • .github/workflows/beam_PostCommit_Python_Xlang_Messaging_Direct.yml
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces integration tests and build configurations for cross-language MQTT IO transforms (ReadFromMqtt / WriteToMqtt) in Python, served by the messaging expansion service. The review feedback suggests replacing a bare except clause with except Exception to avoid catching system-exiting exceptions, and ensuring that any partially started MQTT broker container is stopped during retries to prevent container leaks.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread sdks/python/apache_beam/io/external/xlang_mqttio_it_test.py Outdated
Comment thread sdks/python/apache_beam/io/external/xlang_mqttio_it_test.py
Sets up beam_PostCommit_Python_Xlang_Messaging_Direct, modeled on the
Xlang IO/GCP Direct PostCommits, to exercise the cross-language MQTT
transforms served by the messaging expansion service:

- New workflow + trigger file + workflows README entry.
- New CrossLanguageTask "messagingCrossLanguage" (collect marker
  uses_messaging_java_expansion_service) wired into xlangTasks, and a
  messagingCrossLanguagePostCommit aggregator task.
- New integration test xlang_mqttio_it_test.py: starts an Eclipse
  Mosquitto broker via testcontainers and covers
  - bounded ReadFromMqtt on the DirectRunner (max_num_records, with a
    continuous mosquitto_pub publisher inside the container since MQTT
    has no retention),
  - WriteToMqtt on the DirectRunner (verified with a mosquitto_sub
    subscriber),
  - the unbounded (streaming) path on the Prism runner: streaming
    ReadFromMqtt feeding WriteToMqtt on a second topic, observed via
    mosquitto_sub and then cancelled.
  The tests skip on Dataflow, where a testcontainers broker would not be
  reachable from the workers (a Dataflow variant would need a remotely
  hosted MQTT broker, like the hosted Kafka cluster used by the Xlang IO
  Dataflow suite).
- Re-adds the CHANGES.md announcement that was deferred from apache#38493.

Per review feedback from @Abacn on apache#38493.
@tkaymak tkaymak force-pushed the mqtt-xlang-postcommit branch from bb3954c to 8387d94 Compare June 15, 2026 18:15
@tkaymak

tkaymak commented Jun 15, 2026

Copy link
Copy Markdown
Contributor Author

Addressed @gemini-code-assist's two comments in 8387d94710b (bare except:except Exception:; the broker-start retry now stops a partially-started container to avoid leaks).

The two red checks are both pre-existing master breakages, unrelated to this PR (which only adds the messaging expansion service + its IT/workflow):

The MQTT integration tests in this PR run under beam_PostCommit_Python_Xlang_Messaging_Direct (PostCommit), which isn't part of the PreCommit set.
I verified all three locally against the messaging shadowJar.

@github-actions

Copy link
Copy Markdown
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@Abacn Abacn left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

There is merge conflict for CHANGES.md. Let's revert this file and add annoucement when the tests are confirmed green

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants