Simplify event loop handling and remove deprecated get_event_loop#340
Simplify event loop handling and remove deprecated get_event_loop#340
Conversation
📝 WalkthroughWalkthroughThis PR removes explicit event loop parameters from the FastCS API and internal implementations, replacing manual loop passing with implicit asyncio patterns such as Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
📝 Coding Plan
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/fastcs/control_system.py`:
- Around line 54-55: The code currently installs SIGINT/SIGTERM handlers inside
serve(), which hijacks signal handling when serve() is called from an existing
async runtime; move the signal-handler installation out of serve() and into
run() so handlers are only installed when run() is explicitly invoked, and guard
the installation with a main-thread check (threading.current_thread() is
threading.main_thread()) and ensure the event loop supports add_signal_handler
before calling it (e.g., get the loop from asyncio.get_event_loop() and call
add_signal_handler only when permitted). Modify serve() to no longer call
add_signal_handler, and update run() to install the handlers conditionally for
SIGINT/SIGTERM using the same handler functions currently referenced in serve().
- Around line 164-167: The interactive_shell task created inside run()/wrapper
must be tracked and its exceptions handled: when creating the coroutine
(interactive_shell / asyncio.to_thread(partial(shell.mainloop, ...))) retain the
Task returned by asyncio.create_task and attach a done callback that
catches/logs exceptions and always calls stop_event.set() on error; also ensure
tasks spawned by run() are appended to a local registry so failures are visible
(e.g., add the created Task to a list in run() and on completion remove it), and
update wrapper to create the task via that helper so any raised exceptions won't
silently block await stop_event.wait().
In `@tests/transports/tango/test_dsr.py`:
- Around line 58-71: The current finally block's cleanup iterates
gc.get_objects() and closes any asyncio.AbstractEventLoop it finds, which is
unsafe; change the cleanup to only close loops created by this test/context by
recording references to event loops you create (e.g., the 'loop' variable and
any loops returned or owned by DeviceTestContext) or by snapshotting existing
loops before entering DeviceTestContext and closing only loops that appear
afterwards, and remove the gc.get_objects() iteration; ensure you still call
loop.run_until_complete(asyncio.sleep(0)), loop.close(),
asyncio.set_event_loop(None) and gc.collect() but limit any additional .close()
calls to the specific loop objects you tracked rather than all AbstractEventLoop
instances.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: cc07bde6-a8ed-470a-a36d-83ba46c30250
📒 Files selected for processing (16)
src/fastcs/control_system.pysrc/fastcs/launch.pysrc/fastcs/transports/epics/ca/ioc.pysrc/fastcs/transports/epics/ca/transport.pysrc/fastcs/transports/epics/pva/transport.pysrc/fastcs/transports/graphql/transport.pysrc/fastcs/transports/rest/transport.pysrc/fastcs/transports/tango/transport.pysrc/fastcs/transports/transport.pytests/benchmarking/controller.pytests/test_control_system.pytests/transports/epics/ca/test_initial_value.pytests/transports/epics/pva/test_p4p.pytests/transports/graphQL/test_graphql.pytests/transports/rest/test_rest.pytests/transports/tango/test_dsr.py
💤 Files with no reviewable changes (1)
- tests/transports/epics/ca/test_initial_value.py
| def wrapper(): | ||
| asyncio.create_task(coro) | ||
|
|
||
| self._loop.call_soon_threadsafe(wrapper) | ||
| loop.call_soon_threadsafe(wrapper) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
rg -n "create_task\\(|stop_event.wait|call_soon_threadsafe" src/fastcs/control_system.py -C3Repository: DiamondLightSource/fastcs
Length of output: 904
🏁 Script executed:
# Get the full _interactive_shell method and interactive_shell function
sed -n '155,195p' src/fastcs/control_system.pyRepository: DiamondLightSource/fastcs
Length of output: 1103
🏁 Script executed:
# Also search for the interactive_shell function definition
rg -n "async def interactive_shell" src/fastcs/control_system.py -A 20Repository: DiamondLightSource/fastcs
Length of output: 684
🏁 Script executed:
# Check for any try-except blocks around stop_event.wait() or in the method
sed -n '157,190p' src/fastcs/control_system.pyRepository: DiamondLightSource/fastcs
Length of output: 1054
🏁 Script executed:
# Check if InteractiveShellEmbed or shell.mainloop might handle exceptions internally
rg -n "class.*_interactive_shell|try:|except|finally:" src/fastcs/control_system.py -B2 -A2Repository: DiamondLightSource/fastcs
Length of output: 857
🏁 Script executed:
# Check where _interactive_shell is called from
rg -n "_interactive_shell" src/fastcs/control_system.pyRepository: DiamondLightSource/fastcs
Length of output: 200
🏁 Script executed:
# Check if there's any outer error handling that might catch exceptions
rg -n "def.*_interactive_shell|await.*_interactive_shell" src/fastcs/ -B5 -A10Repository: DiamondLightSource/fastcs
Length of output: 1183
🏁 Script executed:
# Get the full serve method context to understand the full call chain
sed -n '120,156p' src/fastcs/control_system.pyRepository: DiamondLightSource/fastcs
Length of output: 1265
🏁 Script executed:
# Verify if InteractiveShellEmbed could raise exceptions during mainloop
rg -n "InteractiveShellEmbed|from.*IPython" src/fastcs/control_system.py -B2 -A2Repository: DiamondLightSource/fastcs
Length of output: 596
Handle exceptions from interactive_shell task to prevent blocking and silent failures.
At line 181, the interactive_shell task is created without a retained handle and without error supervision. If asyncio.to_thread(partial(shell.mainloop, ...)) raises an exception, stop_event.set() is never called, causing await stop_event.wait() at line 182 to block indefinitely and the exception to be silently lost.
Additionally, at line 165, tasks created via the run() function are not tracked, preventing visibility into task failures from the interactive shell.
🔧 Suggested fix
async def _interactive_shell(self, context: dict[str, Any]):
@@
- def run(coro: Coroutine[None, None, None]):
+ pending_tasks: set[asyncio.Task[Any]] = set()
+
+ def _track(task: asyncio.Task[Any]) -> asyncio.Task[Any]:
+ pending_tasks.add(task)
+ task.add_done_callback(pending_tasks.discard)
+ return task
+
+ def run(coro: Coroutine[None, None, None]):
@@
def wrapper():
- asyncio.create_task(coro)
+ _track(asyncio.create_task(coro))
@@
- stop_event = asyncio.Event()
- asyncio.create_task(interactive_shell(context, stop_event))
- await stop_event.wait()
+ stop_event = asyncio.Event()
+ shell_task = _track(asyncio.create_task(interactive_shell(context, stop_event)))
+ wait_task = asyncio.create_task(stop_event.wait())
+ done, pending = await asyncio.wait(
+ {shell_task, wait_task},
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+ for task in pending:
+ task.cancel()
+ if shell_task in done:
+ await shell_task🧰 Tools
🪛 Ruff (0.15.5)
[warning] 165-165: Store a reference to the return value of asyncio.create_task
(RUF006)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/fastcs/control_system.py` around lines 164 - 167, The interactive_shell
task created inside run()/wrapper must be tracked and its exceptions handled:
when creating the coroutine (interactive_shell /
asyncio.to_thread(partial(shell.mainloop, ...))) retain the Task returned by
asyncio.create_task and attach a done callback that catches/logs exceptions and
always calls stop_event.set() on error; also ensure tasks spawned by run() are
appended to a local registry so failures are visible (e.g., add the created Task
to a list in run() and on completion remove it), and update wrapper to create
the task via that helper so any raised exceptions won't silently block await
stop_event.wait().
| finally: | ||
| import gc | ||
|
|
||
| # Flush pending callbacks and force GC to close Tango C-level resources | ||
| loop.run_until_complete(asyncio.sleep(0)) | ||
| loop.close() | ||
| asyncio.set_event_loop(None) | ||
|
|
||
| # Close any event loops created internally by DeviceTestContext so that | ||
| # gc.collect() does not trigger ResourceWarning from their __del__ | ||
| for obj in gc.get_objects(): | ||
| if isinstance(obj, asyncio.AbstractEventLoop) and not obj.is_closed(): | ||
| obj.close() | ||
| gc.collect() |
There was a problem hiding this comment.
Risky cleanup: iterating gc.get_objects() and closing arbitrary event loops may cause instability.
The cleanup logic iterates over all Python objects in memory and closes any unclosed AbstractEventLoop instances. This approach has several issues:
gc.get_objects()is expensive and returns thousands of objects- Closing event loops owned by pytest infrastructure, other fixtures, or the Tango library itself can cause the
ValueError: Invalid file descriptor: -1error seen in pipeline failures - This is a shotgun approach that may mask the root cause rather than fix it
Consider tracking only the loops explicitly created in this test context:
🛠️ Proposed fix: track created loops explicitly
def create_test_context(tango_controller_api: AssertableControllerAPI):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
dsr = TangoDSR(tango_controller_api, loop)
device = dsr._device
# https://tango-controls.readthedocs.io/projects/pytango/en/v9.5.1/testing/test_context.html
with DeviceTestContext(device, debug=0) as proxy:
yield proxy
finally:
- import gc
-
# Flush pending callbacks and force GC to close Tango C-level resources
loop.run_until_complete(asyncio.sleep(0))
loop.close()
asyncio.set_event_loop(None)
-
- # Close any event loops created internally by DeviceTestContext so that
- # gc.collect() does not trigger ResourceWarning from their __del__
- for obj in gc.get_objects():
- if isinstance(obj, asyncio.AbstractEventLoop) and not obj.is_closed():
- obj.close()
- gc.collect()If DeviceTestContext creates internal event loops that leak, consider filing an issue with pytango or using a more targeted cleanup such as tracking references before/after the context manager.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/transports/tango/test_dsr.py` around lines 58 - 71, The current finally
block's cleanup iterates gc.get_objects() and closes any
asyncio.AbstractEventLoop it finds, which is unsafe; change the cleanup to only
close loops created by this test/context by recording references to event loops
you create (e.g., the 'loop' variable and any loops returned or owned by
DeviceTestContext) or by snapshotting existing loops before entering
DeviceTestContext and closing only loops that appear afterwards, and remove the
gc.get_objects() iteration; ensure you still call
loop.run_until_complete(asyncio.sleep(0)), loop.close(),
asyncio.set_event_loop(None) and gc.collect() but limit any additional .close()
calls to the specific loop objects you tracked rather than all AbstractEventLoop
instances.
In the process of writing documentation it became clear that
Summary:
loopparameter toFastCSand instead require clients to callservefrom an async context with the loop they want it to run inloopto transports and instead retrieve it viaget_running_looponly where required (EpicaCA and Tango)asyncio.create_taskfrom an async context consistently in tests, rather thanrun_until_completeorensure_futureSummary by CodeRabbit