[mqtt] Add Python Xlang Messaging PostCommit with MQTT integration tests#38966
[mqtt] Add Python Xlang Messaging PostCommit with MQTT integration tests#38966tkaymak wants to merge 1 commit into
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request establishes a robust testing infrastructure for cross-language MQTT IO transforms in Apache Beam. By setting up a dedicated PostCommit suite and leveraging testcontainers for local broker simulation, it ensures the reliability of ReadFromMqtt and WriteToMqtt operations across both batch and streaming pipelines. The changes also include necessary build configuration updates and documentation improvements to support these new portable messaging capabilities. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Ignored Files
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces integration tests and build configurations for cross-language MQTT IO transforms (ReadFromMqtt / WriteToMqtt) in Python, served by the messaging expansion service. The review feedback suggests replacing a bare except clause with except Exception to avoid catching system-exiting exceptions, and ensuring that any partially started MQTT broker container is stopped during retries to prevent container leaks.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
Sets up beam_PostCommit_Python_Xlang_Messaging_Direct, modeled on the
Xlang IO/GCP Direct PostCommits, to exercise the cross-language MQTT
transforms served by the messaging expansion service:
- New workflow + trigger file + workflows README entry.
- New CrossLanguageTask "messagingCrossLanguage" (collect marker
uses_messaging_java_expansion_service) wired into xlangTasks, and a
messagingCrossLanguagePostCommit aggregator task.
- New integration test xlang_mqttio_it_test.py: starts an Eclipse
Mosquitto broker via testcontainers and covers
- bounded ReadFromMqtt on the DirectRunner (max_num_records, with a
continuous mosquitto_pub publisher inside the container since MQTT
has no retention),
- WriteToMqtt on the DirectRunner (verified with a mosquitto_sub
subscriber),
- the unbounded (streaming) path on the Prism runner: streaming
ReadFromMqtt feeding WriteToMqtt on a second topic, observed via
mosquitto_sub and then cancelled.
The tests skip on Dataflow, where a testcontainers broker would not be
reachable from the workers (a Dataflow variant would need a remotely
hosted MQTT broker, like the hosted Kafka cluster used by the Xlang IO
Dataflow suite).
- Re-adds the CHANGES.md announcement that was deferred from apache#38493.
Per review feedback from @Abacn on apache#38493.
bb3954c to
8387d94
Compare
|
Addressed @gemini-code-assist's two comments in The two red checks are both pre-existing master breakages, unrelated to this PR (which only adds the messaging expansion service + its IT/workflow):
The MQTT integration tests in this PR run under |
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
Abacn
left a comment
There was a problem hiding this comment.
Thanks!
There is merge conflict for CHANGES.md. Let's revert this file and add annoucement when the tests are confirmed green
What
Follow-up to #38493 (now merged). Sets up a PostCommit suite for the cross-language MQTT transforms (
ReadFromMqtt/WriteToMqtt) served by the messaging expansion service, and re-adds the CHANGES.md announcement that was deferred from #38493.How
Clones the existing
beam_PostCommit_Python_Xlang_IO_Directpattern for the messaging expansion service:beam_PostCommit_Python_Xlang_Messaging_Direct.yml+ trigger file +.github/workflows/README.mdrow.messagingCrossLanguageCrossLanguageTask(collect markeruses_messaging_java_expansion_service) insdks/python/test-suites/xlang/build.gradle, plus amessagingCrossLanguagePostCommitaggregator task and the pytest marker registration.apache_beam/io/external/xlang_mqttio_it_test.pythat starts an Eclipse Mosquitto broker via testcontainers and covers:ReadFromMqtton the DirectRunner (max_num_records, with a continuousmosquitto_pubpublisher inside the container since MQTT has no message retention),WriteToMqtton the DirectRunner (verified with amosquitto_subsubscriber),ReadFromMqttfeedingWriteToMqtton a second topic, observed viamosquitto_sub, then cancelled.Testing
All three integration tests pass locally against the messaging expansion-service shadowJar
If the binary deployment of Prism to CI workers proves unreliable, the streaming test can be aborted.
R: @Abacn
This addresses your post-approval comment on #38493 about setting up the Xlang Messaging PostCommit.