-
Notifications
You must be signed in to change notification settings - Fork 6
Simplify event loop handling and remove deprecated get_event_loop #340
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: docs
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,11 +32,9 @@ def __init__( | |
| self, | ||
| controller: Controller, | ||
| transports: Sequence[Transport], | ||
| loop: asyncio.AbstractEventLoop | None = None, | ||
| ): | ||
| self._controller = controller | ||
| self._transports = transports | ||
| self._loop = loop or asyncio.get_event_loop() | ||
|
|
||
| self._scan_coros: list[ScanCallback] = [] | ||
| self._initial_coros: list[ScanCallback] = [] | ||
|
|
@@ -47,24 +45,20 @@ def run(self, interactive: bool = True): | |
| """Run the application | ||
|
|
||
| This is a convenience method to call `serve` in a synchronous context. | ||
| To use in an async context, call `serve` directly. | ||
|
|
||
| Args: | ||
| interactive: Whether to create an interactive IPython shell | ||
|
|
||
| """ | ||
| serve = asyncio.ensure_future(self.serve(interactive=interactive)) | ||
|
|
||
| if os.name != "nt": | ||
| self._loop.add_signal_handler(signal.SIGINT, serve.cancel) | ||
| self._loop.add_signal_handler(signal.SIGTERM, serve.cancel) | ||
| self._loop.run_until_complete(serve) | ||
| asyncio.run(self.serve(interactive=interactive)) | ||
|
|
||
| async def _run_initial_coros(self): | ||
| for coro in self._initial_coros: | ||
| await coro() | ||
|
|
||
| async def _start_scan_tasks(self): | ||
| self._scan_tasks = {self._loop.create_task(coro()) for coro in self._scan_coros} | ||
| self._scan_tasks = {asyncio.create_task(coro()) for coro in self._scan_coros} | ||
|
|
||
| def _stop_scan_tasks(self): | ||
| for task in self._scan_tasks: | ||
|
|
@@ -93,6 +87,13 @@ async def serve(self, interactive: bool = True) -> None: | |
| interactive: Whether to create an interactive IPython shell | ||
|
|
||
| """ | ||
| loop = asyncio.get_running_loop() | ||
| if os.name != "nt": | ||
| task = asyncio.current_task() | ||
| if task is not None: | ||
| loop.add_signal_handler(signal.SIGINT, task.cancel) | ||
| loop.add_signal_handler(signal.SIGTERM, task.cancel) | ||
|
|
||
| await self._controller.initialise() | ||
| self._controller.post_initialise() | ||
|
|
||
|
|
@@ -110,7 +111,7 @@ async def serve(self, interactive: bool = True) -> None: | |
|
|
||
| coros: list[Coroutine] = [] | ||
| for transport in self._transports: | ||
| transport.connect(controller_api=self.controller_api, loop=self._loop) | ||
| transport.connect(controller_api=self.controller_api) | ||
| coros.append(transport.serve()) | ||
| common_context = context.keys() & transport.context.keys() | ||
| if common_context: | ||
|
|
@@ -155,14 +156,15 @@ async def block_forever(): | |
|
|
||
| async def _interactive_shell(self, context: dict[str, Any]): | ||
| """Spawn interactive shell in another thread and wait for it to complete.""" | ||
| loop = asyncio.get_running_loop() | ||
|
|
||
| def run(coro: Coroutine[None, None, None]): | ||
| """Run coroutine on FastCS event loop from IPython thread.""" | ||
|
|
||
| def wrapper(): | ||
| asyncio.create_task(coro) | ||
|
|
||
| self._loop.call_soon_threadsafe(wrapper) | ||
| loop.call_soon_threadsafe(wrapper) | ||
|
Comment on lines
164
to
+167
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
rg -n "create_task\\(|stop_event.wait|call_soon_threadsafe" src/fastcs/control_system.py -C3Repository: DiamondLightSource/fastcs Length of output: 904 🏁 Script executed: # Get the full _interactive_shell method and interactive_shell function
sed -n '155,195p' src/fastcs/control_system.pyRepository: DiamondLightSource/fastcs Length of output: 1103 🏁 Script executed: # Also search for the interactive_shell function definition
rg -n "async def interactive_shell" src/fastcs/control_system.py -A 20Repository: DiamondLightSource/fastcs Length of output: 684 🏁 Script executed: # Check for any try-except blocks around stop_event.wait() or in the method
sed -n '157,190p' src/fastcs/control_system.pyRepository: DiamondLightSource/fastcs Length of output: 1054 🏁 Script executed: # Check if InteractiveShellEmbed or shell.mainloop might handle exceptions internally
rg -n "class.*_interactive_shell|try:|except|finally:" src/fastcs/control_system.py -B2 -A2Repository: DiamondLightSource/fastcs Length of output: 857 🏁 Script executed: # Check where _interactive_shell is called from
rg -n "_interactive_shell" src/fastcs/control_system.pyRepository: DiamondLightSource/fastcs Length of output: 200 🏁 Script executed: # Check if there's any outer error handling that might catch exceptions
rg -n "def.*_interactive_shell|await.*_interactive_shell" src/fastcs/ -B5 -A10Repository: DiamondLightSource/fastcs Length of output: 1183 🏁 Script executed: # Get the full serve method context to understand the full call chain
sed -n '120,156p' src/fastcs/control_system.pyRepository: DiamondLightSource/fastcs Length of output: 1265 🏁 Script executed: # Verify if InteractiveShellEmbed could raise exceptions during mainloop
rg -n "InteractiveShellEmbed|from.*IPython" src/fastcs/control_system.py -B2 -A2Repository: DiamondLightSource/fastcs Length of output: 596 Handle exceptions from At line 181, the Additionally, at line 165, tasks created via the 🔧 Suggested fix async def _interactive_shell(self, context: dict[str, Any]):
@@
- def run(coro: Coroutine[None, None, None]):
+ pending_tasks: set[asyncio.Task[Any]] = set()
+
+ def _track(task: asyncio.Task[Any]) -> asyncio.Task[Any]:
+ pending_tasks.add(task)
+ task.add_done_callback(pending_tasks.discard)
+ return task
+
+ def run(coro: Coroutine[None, None, None]):
@@
def wrapper():
- asyncio.create_task(coro)
+ _track(asyncio.create_task(coro))
@@
- stop_event = asyncio.Event()
- asyncio.create_task(interactive_shell(context, stop_event))
- await stop_event.wait()
+ stop_event = asyncio.Event()
+ shell_task = _track(asyncio.create_task(interactive_shell(context, stop_event)))
+ wait_task = asyncio.create_task(stop_event.wait())
+ done, pending = await asyncio.wait(
+ {shell_task, wait_task},
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+ for task in pending:
+ task.cancel()
+ if shell_task in done:
+ await shell_task🧰 Tools🪛 Ruff (0.15.5)[warning] 165-165: Store a reference to the return value of (RUF006) 🤖 Prompt for AI Agents |
||
|
|
||
| async def interactive_shell( | ||
| context: dict[str, object], stop_event: asyncio.Event | ||
|
|
@@ -176,7 +178,7 @@ async def interactive_shell( | |
| context["run"] = run | ||
|
|
||
| stop_event = asyncio.Event() | ||
| self._loop.create_task(interactive_shell(context, stop_event)) | ||
| asyncio.create_task(interactive_shell(context, stop_event)) | ||
| await stop_event.wait() | ||
|
|
||
| def __del__(self): | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.