
Simplify event loop handling and remove deprecated get_event_loop#340

Open
GDYendell wants to merge 2 commits into docs from simplify-event-loop

Conversation

Contributor

@GDYendell GDYendell commented Mar 13, 2026


In the process of writing documentation it became clear that:

  • We are using get_event_loop, which is deprecated and planned for removal in 3.16
  • The event loop handling is overcomplicated

Summary:

  • Remove the loop parameter to FastCS and instead require clients to call serve from an async context with the loop they want it to run in
  • Don't pass a loop to transports and instead retrieve it via get_running_loop only where required (EpicsCA and Tango)
  • Use asyncio.create_task from an async context consistently in tests, rather than run_until_complete or ensure_future
  • Improve tango test handling of DeviceTestContext to fix resource warnings
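
The first two summary points can be sketched roughly as follows. This is a minimal illustration, not the FastCS implementation: `HypotheticalTransport` and `HypotheticalService` are invented names, and the only real API used is `asyncio.get_running_loop()` and `asyncio.run()`.

```python
import asyncio


class HypotheticalTransport:
    """Stand-in for a transport that needs the loop (not a FastCS class)."""

    def __init__(self) -> None:
        self.loop = None

    async def connect(self) -> None:
        # No loop argument: ask asyncio for the loop we are already running in.
        self.loop = asyncio.get_running_loop()


class HypotheticalService:
    """Stand-in for the top-level object; note there is no loop parameter."""

    def __init__(self, transport: HypotheticalTransport) -> None:
        self.transport = transport

    async def serve(self) -> None:
        await self.transport.connect()


async def main() -> HypotheticalTransport:
    transport = HypotheticalTransport()
    # The caller chooses the loop simply by awaiting serve() inside it.
    await HypotheticalService(transport).serve()
    assert transport.loop is asyncio.get_running_loop()
    return transport


transport = asyncio.run(main())
```

A caller that wants a specific loop only has to await `serve()` from a coroutine running in that loop, so nothing needs to be threaded through constructors.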

Summary by CodeRabbit

  • Refactor
    • Simplified event loop configuration by removing explicit loop parameters from FastCS initialization and transport connection methods
    • Event loop management is now handled automatically by the framework
    • Updated async task handling patterns throughout the codebase for improved reliability

@GDYendell GDYendell requested review from coretl and shihab-dls March 13, 2026 16:47

coderabbitai bot commented Mar 13, 2026

📝 Walkthrough


This PR removes explicit event loop parameters from the FastCS API and internal implementations, replacing manual loop passing with implicit asyncio patterns such as asyncio.get_running_loop() and asyncio.run(). The refactoring simplifies constructor signatures across all transport classes and the main control system.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Core Control System: src/fastcs/control_system.py | Removed loop parameter from __init__, replaced explicit loop/task management with asyncio.run(), switched to asyncio.create_task(), enhanced task cancellation in _stop_scan_tasks(), and updated signal handler installation to use the current running loop. |
| Launch and CLI: src/fastcs/launch.py | Simplified FastCS instantiation by removing loop parameter from constructor call; removed asyncio import. |
| EPICS CA Transport: src/fastcs/transports/epics/ca/ioc.py, src/fastcs/transports/epics/ca/transport.py | EpicsCAIOC.run() signature changed to remove loop parameter and now uses asyncio.get_running_loop(); EpicsCATransport.connect() simplified to accept only controller_api and removed loop storage. |
| EPICS PVA Transport: src/fastcs/transports/epics/pva/transport.py | Removed loop parameter from EpicsPVATransport.connect() signature; simplified initialization to rely on implicit event loop handling. |
| GraphQL & REST Transports: src/fastcs/transports/graphql/transport.py, src/fastcs/transports/rest/transport.py | Both transport classes had loop parameter removed from connect() method signatures; simplified server initialization. |
| Tango Transport: src/fastcs/transports/tango/transport.py | Removed loop parameter from connect() and updated TangoDSR initialization to use asyncio.get_running_loop(); simplified serve() to directly await thread execution. |
| Transport Base: src/fastcs/transports/transport.py | Updated abstract connect() method to remove loop parameter and removed asyncio import; updated documentation to indicate implicit loop retrieval. |
| Test Updates: tests/benchmarking/controller.py, tests/test_control_system.py, tests/transports/epics/ca/test_initial_value.py, tests/transports/epics/pva/test_p4p.py, tests/transports/graphQL/test_graphql.py, tests/transports/rest/test_rest.py, tests/transports/tango/test_dsr.py | Updated all test instantiations to remove loop parameters; modernized async test patterns using @pytest.mark.asyncio and asyncio.create_task() instead of ensure_future; adjusted event loop lifecycle management in test setup and teardown. |
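
As a rough illustration of the `asyncio.create_task()` and task-cancellation pattern mentioned in the summary, the sketch below starts a periodic task from an async context and then cancels it cleanly. The `scan` and `stop_tasks` functions are hypothetical and are not taken from `_stop_scan_tasks()`; plain `asyncio.run()` is used here instead of `@pytest.mark.asyncio` so the snippet runs without pytest.

```python
import asyncio


async def scan(period: float, counter: list[int]) -> None:
    """Hypothetical periodic scan task; runs until cancelled."""
    while True:
        counter[0] += 1
        await asyncio.sleep(period)


async def stop_tasks(tasks: list[asyncio.Task]) -> None:
    """Cancel tasks and wait until the cancellations have been processed."""
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects each CancelledError instead of raising it.
    await asyncio.gather(*tasks, return_exceptions=True)


async def main() -> tuple[int, bool]:
    counter = [0]
    # create_task needs a running loop, which asyncio.run() provides.
    task = asyncio.create_task(scan(0.01, counter))
    await asyncio.sleep(0.05)
    await stop_tasks([task])
    return counter[0], task.cancelled()


ticks, was_cancelled = asyncio.run(main())
```

Awaiting the cancelled tasks before returning means no task is left pending when the loop closes, which is one common source of "Task was destroyed but it is pending" warnings.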

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • PR #332: Modifies src/fastcs/control_system.py to remove legacy build_controller_api path and transition to Controller.create_api_and_tasks, which may intersect with event loop initialization changes in this PR.

Suggested reviewers

  • shihab-dls

Poem

🐰 Loop, loop, whence did you go?
Event loops freed from chains below,
Asyncio patterns now take the lead,
Implicit flows are all we need!
Refactored paths run light and free,
The rabbit's work brings harmony! ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 19.15%, which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title accurately summarizes the main objective of the PR: simplifying event loop handling and removing deprecated get_event_loop usage. The changes across all modified files consistently reflect this primary goal. |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |


@GDYendell
Contributor Author

@coderabbitai review


coderabbitai bot commented Mar 13, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/fastcs/control_system.py`:
- Around line 54-55: The code currently installs SIGINT/SIGTERM handlers inside
serve(), which hijacks signal handling when serve() is called from an existing
async runtime; move the signal-handler installation out of serve() and into
run() so handlers are only installed when run() is explicitly invoked, and guard
the installation with a main-thread check (threading.current_thread() is
threading.main_thread()) and ensure the event loop supports add_signal_handler
before calling it (e.g., get the loop from asyncio.get_event_loop() and call
add_signal_handler only when permitted). Modify serve() to no longer call
add_signal_handler, and update run() to install the handlers conditionally for
SIGINT/SIGTERM using the same handler functions currently referenced in serve().
- Around line 164-167: The interactive_shell task created inside run()/wrapper
must be tracked and its exceptions handled: when creating the coroutine
(interactive_shell / asyncio.to_thread(partial(shell.mainloop, ...))) retain the
Task returned by asyncio.create_task and attach a done callback that
catches/logs exceptions and always calls stop_event.set() on error; also ensure
tasks spawned by run() are appended to a local registry so failures are visible
(e.g., add the created Task to a list in run() and on completion remove it), and
update wrapper to create the task via that helper so any raised exceptions won't
silently block await stop_event.wait().

In `@tests/transports/tango/test_dsr.py`:
- Around line 58-71: The current finally block's cleanup iterates
gc.get_objects() and closes any asyncio.AbstractEventLoop it finds, which is
unsafe; change the cleanup to only close loops created by this test/context by
recording references to event loops you create (e.g., the 'loop' variable and
any loops returned or owned by DeviceTestContext) or by snapshotting existing
loops before entering DeviceTestContext and closing only loops that appear
afterwards, and remove the gc.get_objects() iteration; ensure you still call
loop.run_until_complete(asyncio.sleep(0)), loop.close(),
asyncio.set_event_loop(None) and gc.collect() but limit any additional .close()
calls to the specific loop objects you tracked rather than all AbstractEventLoop
instances.
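
The guard described in the first finding (main-thread check plus a loop that supports `add_signal_handler`) might look roughly like the sketch below. `install_stop_handlers` is a hypothetical helper, not FastCS code, and it assumes the handlers only need to set an `asyncio.Event`; where the call belongs (`run()` versus `serve()`) is left to the PR discussion.

```python
import asyncio
import signal
import threading


def install_stop_handlers(stop_event: asyncio.Event) -> bool:
    """Install SIGINT/SIGTERM handlers on the running loop if that is allowed.

    Returns True if handlers were installed. Hypothetical helper for illustration.
    """
    # Signal handlers can only be registered from the main thread.
    if threading.current_thread() is not threading.main_thread():
        return False
    loop = asyncio.get_running_loop()
    try:
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, stop_event.set)
    except NotImplementedError:
        # Some loops (for example on Windows) do not implement add_signal_handler.
        return False
    return True


async def try_install() -> bool:
    return install_stop_handlers(asyncio.Event())


def run_in_worker_thread() -> bool:
    """Call the helper from a non-main thread that runs its own loop."""
    result: list[bool] = []
    worker = threading.Thread(target=lambda: result.append(asyncio.run(try_install())))
    worker.start()
    worker.join()
    return result[0]


installed_in_worker = run_in_worker_thread()
```

Returning a boolean rather than raising lets an embedding application that already owns signal handling call the same code path without having its handlers replaced.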

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: cc07bde6-a8ed-470a-a36d-83ba46c30250

📥 Commits

Reviewing files that changed from the base of the PR and between 6839f1d and b86c206.

📒 Files selected for processing (16)
  • src/fastcs/control_system.py
  • src/fastcs/launch.py
  • src/fastcs/transports/epics/ca/ioc.py
  • src/fastcs/transports/epics/ca/transport.py
  • src/fastcs/transports/epics/pva/transport.py
  • src/fastcs/transports/graphql/transport.py
  • src/fastcs/transports/rest/transport.py
  • src/fastcs/transports/tango/transport.py
  • src/fastcs/transports/transport.py
  • tests/benchmarking/controller.py
  • tests/test_control_system.py
  • tests/transports/epics/ca/test_initial_value.py
  • tests/transports/epics/pva/test_p4p.py
  • tests/transports/graphQL/test_graphql.py
  • tests/transports/rest/test_rest.py
  • tests/transports/tango/test_dsr.py
💤 Files with no reviewable changes (1)
  • tests/transports/epics/ca/test_initial_value.py

Comment on lines 164 to +167
             def wrapper():
                 asyncio.create_task(coro)
 
-            self._loop.call_soon_threadsafe(wrapper)
+            loop.call_soon_threadsafe(wrapper)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
rg -n "create_task\\(|stop_event.wait|call_soon_threadsafe" src/fastcs/control_system.py -C3

Repository: DiamondLightSource/fastcs

Length of output: 904


🏁 Script executed:

# Get the full _interactive_shell method and interactive_shell function
sed -n '155,195p' src/fastcs/control_system.py

Repository: DiamondLightSource/fastcs

Length of output: 1103


🏁 Script executed:

# Also search for the interactive_shell function definition
rg -n "async def interactive_shell" src/fastcs/control_system.py -A 20

Repository: DiamondLightSource/fastcs

Length of output: 684


🏁 Script executed:

# Check for any try-except blocks around stop_event.wait() or in the method
sed -n '157,190p' src/fastcs/control_system.py

Repository: DiamondLightSource/fastcs

Length of output: 1054


🏁 Script executed:

# Check if InteractiveShellEmbed or shell.mainloop might handle exceptions internally
rg -n "class.*_interactive_shell|try:|except|finally:" src/fastcs/control_system.py -B2 -A2

Repository: DiamondLightSource/fastcs

Length of output: 857


🏁 Script executed:

# Check where _interactive_shell is called from
rg -n "_interactive_shell" src/fastcs/control_system.py

Repository: DiamondLightSource/fastcs

Length of output: 200


🏁 Script executed:

# Check if there's any outer error handling that might catch exceptions
rg -n "def.*_interactive_shell|await.*_interactive_shell" src/fastcs/ -B5 -A10

Repository: DiamondLightSource/fastcs

Length of output: 1183


🏁 Script executed:

# Get the full serve method context to understand the full call chain
sed -n '120,156p' src/fastcs/control_system.py

Repository: DiamondLightSource/fastcs

Length of output: 1265


🏁 Script executed:

# Verify if InteractiveShellEmbed could raise exceptions during mainloop
rg -n "InteractiveShellEmbed|from.*IPython" src/fastcs/control_system.py -B2 -A2

Repository: DiamondLightSource/fastcs

Length of output: 596


Handle exceptions from interactive_shell task to prevent blocking and silent failures.

At line 181, the interactive_shell task is created without a retained handle and without error supervision. If asyncio.to_thread(partial(shell.mainloop, ...)) raises an exception, stop_event.set() is never called, causing await stop_event.wait() at line 182 to block indefinitely and the exception to be silently lost.

Additionally, at line 165, tasks created via the run() function are not tracked, preventing visibility into task failures from the interactive shell.

🔧 Suggested fix
     async def _interactive_shell(self, context: dict[str, Any]):
@@
-        def run(coro: Coroutine[None, None, None]):
+        pending_tasks: set[asyncio.Task[Any]] = set()
+
+        def _track(task: asyncio.Task[Any]) -> asyncio.Task[Any]:
+            pending_tasks.add(task)
+            task.add_done_callback(pending_tasks.discard)
+            return task
+
+        def run(coro: Coroutine[None, None, None]):
@@
             def wrapper():
-                asyncio.create_task(coro)
+                _track(asyncio.create_task(coro))
@@
-        stop_event = asyncio.Event()
-        asyncio.create_task(interactive_shell(context, stop_event))
-        await stop_event.wait()
+        stop_event = asyncio.Event()
+        shell_task = _track(asyncio.create_task(interactive_shell(context, stop_event)))
+        wait_task = asyncio.create_task(stop_event.wait())
+        done, pending = await asyncio.wait(
+            {shell_task, wait_task},
+            return_when=asyncio.FIRST_COMPLETED,
+        )
+        for task in pending:
+            task.cancel()
+        if shell_task in done:
+            await shell_task
🧰 Tools
🪛 Ruff (0.15.5)

[warning] 165-165: Store a reference to the return value of asyncio.create_task

(RUF006)
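
The failure mode described above can be reproduced in isolation. The following is a self-contained sketch, with `failing_shell` as a hypothetical stand-in for the shell coroutine that raises before it ever sets the event; it shows how racing the retained task against `stop_event.wait()` surfaces the exception instead of blocking forever.

```python
import asyncio


async def failing_shell(stop_event: asyncio.Event) -> None:
    """Hypothetical stand-in for the shell coroutine; fails before set()."""
    raise RuntimeError("shell crashed")


async def wait_for_shell() -> str:
    stop_event = asyncio.Event()
    # Keep references so neither task can be garbage collected mid-flight.
    shell_task = asyncio.create_task(failing_shell(stop_event))
    wait_task = asyncio.create_task(stop_event.wait())
    done, pending = await asyncio.wait(
        {shell_task, wait_task}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
    if shell_task in done:
        try:
            await shell_task  # re-raises the shell's exception, if any
        except RuntimeError as exc:
            return f"shell failed: {exc}"
    return "stopped cleanly"


outcome = asyncio.run(wait_for_shell())
```

With a bare `await stop_event.wait()` in place of `asyncio.wait(...)`, the same scenario would never return, because nothing sets the event once the shell task has failed.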


Comment on lines +58 to +71
    finally:
        import gc

        # Flush pending callbacks and force GC to close Tango C-level resources
        loop.run_until_complete(asyncio.sleep(0))
        loop.close()
        asyncio.set_event_loop(None)

        # Close any event loops created internally by DeviceTestContext so that
        # gc.collect() does not trigger ResourceWarning from their __del__
        for obj in gc.get_objects():
            if isinstance(obj, asyncio.AbstractEventLoop) and not obj.is_closed():
                obj.close()
        gc.collect()

⚠️ Potential issue | 🟠 Major

Risky cleanup: iterating gc.get_objects() and closing arbitrary event loops may cause instability.

The cleanup logic iterates over all Python objects in memory and closes any unclosed AbstractEventLoop instances. This approach has several issues:

  1. gc.get_objects() is expensive and returns thousands of objects
  2. Closing event loops owned by pytest infrastructure, other fixtures, or the Tango library itself can cause the ValueError: Invalid file descriptor: -1 error seen in pipeline failures
  3. This is a shotgun approach that may mask the root cause rather than fix it

Consider tracking only the loops explicitly created in this test context:

🛠️ Proposed fix: track created loops explicitly
 def create_test_context(tango_controller_api: AssertableControllerAPI):
     loop = asyncio.new_event_loop()
     asyncio.set_event_loop(loop)
     try:
         dsr = TangoDSR(tango_controller_api, loop)
         device = dsr._device
         # https://tango-controls.readthedocs.io/projects/pytango/en/v9.5.1/testing/test_context.html
         with DeviceTestContext(device, debug=0) as proxy:
             yield proxy
     finally:
-        import gc
-
         # Flush pending callbacks and force GC to close Tango C-level resources
         loop.run_until_complete(asyncio.sleep(0))
         loop.close()
         asyncio.set_event_loop(None)
-
-        # Close any event loops created internally by DeviceTestContext so that
-        # gc.collect() does not trigger ResourceWarning from their __del__
-        for obj in gc.get_objects():
-            if isinstance(obj, asyncio.AbstractEventLoop) and not obj.is_closed():
-                obj.close()
-        gc.collect()

If DeviceTestContext creates internal event loops that leak, consider filing an issue with pytango or using a more targeted cleanup such as tracking references before/after the context manager.
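
One possible shape for such targeted cleanup is sketched below. It assumes, without confirmation from pytango, that any leaked loops are created through `asyncio.new_event_loop()`; loops created through other paths (for example `asyncio.run()`, which uses an internal reference) would not be seen. `close_loops_created_within` is a hypothetical helper name.

```python
import asyncio
from contextlib import contextmanager
from unittest import mock


@contextmanager
def close_loops_created_within():
    """Record loops created via asyncio.new_event_loop() and close only those."""
    created: list[asyncio.AbstractEventLoop] = []
    original = asyncio.new_event_loop

    def recording_new_event_loop() -> asyncio.AbstractEventLoop:
        loop = original()
        created.append(loop)
        return loop

    with mock.patch.object(asyncio, "new_event_loop", recording_new_event_loop):
        try:
            yield created
        finally:
            for loop in created:
                if not loop.is_closed():
                    loop.close()


# A loop that exists before the context is entered is left alone.
outer_loop = asyncio.new_event_loop()
with close_loops_created_within() as tracked:
    inner_loop = asyncio.new_event_loop()

inner_closed = inner_loop.is_closed()
outer_closed = outer_loop.is_closed()
outer_loop.close()
```

Unlike scanning `gc.get_objects()`, this never touches loops owned by pytest or other fixtures, at the cost of missing loops that are not created through the patched function.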


Contributor

@coretl coretl left a comment


@ajgdls is now our resident Tango expert, so I'll defer to him to review the Tango bits. Everything else looks fine.
