diff --git a/pyproject.toml b/pyproject.toml index f178a375c..b0e80707e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -92,7 +92,14 @@ addopts = """ """ # https://iscinumpy.gitlab.io/post/bound-version-constraints/#watch-for-warnings # https://github.com/DiamondLightSource/FastCS/issues/230 -filterwarnings = "error" +filterwarnings = [ + "error", + # DeviceTestContext (pytango) leaks Unix IPC sockets and its internal asyncio + # event loop on some Python 3.11.x versions. These cannot be closed from + # Python because they are Tango C-level resources. Suppress rather than fail. + "ignore:unclosed None: interactive: Whether to create an interactive IPython shell """ + loop = asyncio.get_running_loop() + if os.name != "nt": + task = asyncio.current_task() + if task is not None: + loop.add_signal_handler(signal.SIGINT, task.cancel) + loop.add_signal_handler(signal.SIGTERM, task.cancel) + await self._controller.initialise() self._controller.post_initialise() @@ -110,7 +111,7 @@ async def serve(self, interactive: bool = True) -> None: coros: list[Coroutine] = [] for transport in self._transports: - transport.connect(controller_api=self.controller_api, loop=self._loop) + transport.connect(controller_api=self.controller_api) coros.append(transport.serve()) common_context = context.keys() & transport.context.keys() if common_context: @@ -155,6 +156,7 @@ async def block_forever(): async def _interactive_shell(self, context: dict[str, Any]): """Spawn interactive shell in another thread and wait for it to complete.""" + loop = asyncio.get_running_loop() def run(coro: Coroutine[None, None, None]): """Run coroutine on FastCS event loop from IPython thread.""" @@ -162,7 +164,7 @@ def run(coro: Coroutine[None, None, None]): def wrapper(): asyncio.create_task(coro) - self._loop.call_soon_threadsafe(wrapper) + loop.call_soon_threadsafe(wrapper) async def interactive_shell( context: dict[str, object], stop_event: asyncio.Event @@ -176,7 +178,7 @@ async def interactive_shell( context["run"] = run stop_event = asyncio.Event() - self._loop.create_task(interactive_shell(context, stop_event)) + asyncio.create_task(interactive_shell(context, stop_event)) await stop_event.wait() def __del__(self): diff --git a/src/fastcs/launch.py b/src/fastcs/launch.py index 99874cb9d..e6ef5171b 100644 --- a/src/fastcs/launch.py +++ b/src/fastcs/launch.py @@ -1,4 +1,3 @@ -import asyncio import inspect import json from pathlib import Path @@ -158,9 +157,7 @@ def run( else: controller = controller_class() - instance = FastCS( - controller, instance_options.transport, loop=asyncio.get_event_loop() - ) + instance = FastCS(controller, instance_options.transport) instance.run() diff --git a/src/fastcs/transports/epics/ca/ioc.py b/src/fastcs/transports/epics/ca/ioc.py index 3f510a1b7..9e031a1eb 100644 --- a/src/fastcs/transports/epics/ca/ioc.py +++ b/src/fastcs/transports/epics/ca/ioc.py @@ -41,11 +41,8 @@ def __init__( _create_and_link_attribute_pvs(pv_prefix, controller_api) _create_and_link_command_pvs(pv_prefix, controller_api) - def run( - self, - loop: asyncio.AbstractEventLoop, - ) -> None: - dispatcher = AsyncioDispatcher(loop) # Needs running loop + def run(self) -> None: + dispatcher = AsyncioDispatcher(asyncio.get_running_loop()) builder.LoadDatabase() softioc.iocInit(dispatcher) diff --git a/src/fastcs/transports/epics/ca/transport.py b/src/fastcs/transports/epics/ca/transport.py index cc51b5d66..40da3547c 100644 --- a/src/fastcs/transports/epics/ca/transport.py +++ b/src/fastcs/transports/epics/ca/transport.py @@ -1,4 +1,3 @@ -import asyncio from dataclasses import dataclass, field from typing import Any @@ -28,13 +27,8 @@ class EpicsCATransport(Transport): gui: EpicsGUIOptions | None = None """Options for the GUI. If not set, no GUI will be created.""" - def connect( - self, - controller_api: ControllerAPI, - loop: asyncio.AbstractEventLoop, - ) -> None: + def connect(self, controller_api: ControllerAPI) -> None: self._controller_api = controller_api - self._loop = loop self._pv_prefix = self.epicsca.pv_prefix self._ioc = EpicsCAIOC(self.epicsca.pv_prefix, controller_api) @@ -47,7 +41,7 @@ def connect( async def serve(self) -> None: """Serve `ControllerAPI` over EPICS Channel Access""" logger.info("Running IOC", pv_prefix=self._pv_prefix) - self._ioc.run(self._loop) + self._ioc.run() @property def context(self) -> dict[str, Any]: diff --git a/src/fastcs/transports/epics/pva/transport.py b/src/fastcs/transports/epics/pva/transport.py index 5aff45916..a656a8e5c 100644 --- a/src/fastcs/transports/epics/pva/transport.py +++ b/src/fastcs/transports/epics/pva/transport.py @@ -1,13 +1,8 @@ -import asyncio from dataclasses import dataclass, field from fastcs.controllers import ControllerAPI from fastcs.logging import logger -from fastcs.transports.epics import ( - EpicsDocsOptions, - EpicsGUIOptions, - EpicsIOCOptions, -) +from fastcs.transports.epics import EpicsDocsOptions, EpicsGUIOptions, EpicsIOCOptions from fastcs.transports.epics.docs import EpicsDocs from fastcs.transports.epics.pva.gui import PvaEpicsGUI from fastcs.transports.transport import Transport @@ -23,11 +18,7 @@ class EpicsPVATransport(Transport): docs: EpicsDocsOptions | None = None gui: EpicsGUIOptions | None = None - def connect( - self, - controller_api: ControllerAPI, - loop: asyncio.AbstractEventLoop, - ) -> None: + def connect(self, controller_api: ControllerAPI) -> None: self._controller_api = controller_api self._pv_prefix = self.epicspva.pv_prefix self._ioc = P4PIOC(self.epicspva.pv_prefix, controller_api) diff --git a/src/fastcs/transports/graphql/transport.py b/src/fastcs/transports/graphql/transport.py index a590c062f..94b935d89 100644 --- a/src/fastcs/transports/graphql/transport.py +++ b/src/fastcs/transports/graphql/transport.py @@ -1,4 +1,3 @@ -import asyncio from dataclasses import dataclass, field from fastcs.controllers import ControllerAPI @@ -14,11 +13,7 @@ class GraphQLTransport(Transport): graphql: GraphQLServerOptions = field(default_factory=GraphQLServerOptions) - def connect( - self, - controller_api: ControllerAPI, - loop: asyncio.AbstractEventLoop, - ): + def connect(self, controller_api: ControllerAPI): self._server = GraphQLServer(controller_api) async def serve(self) -> None: diff --git a/src/fastcs/transports/rest/transport.py b/src/fastcs/transports/rest/transport.py index d89a89b9e..c4d9224ad 100644 --- a/src/fastcs/transports/rest/transport.py +++ b/src/fastcs/transports/rest/transport.py @@ -1,4 +1,3 @@ -import asyncio from dataclasses import dataclass, field from fastcs.controllers import ControllerAPI @@ -14,11 +13,7 @@ class RestTransport(Transport): rest: RestServerOptions = field(default_factory=RestServerOptions) - def connect( - self, - controller_api: ControllerAPI, - loop: asyncio.AbstractEventLoop, - ): + def connect(self, controller_api: ControllerAPI): self._server = RestServer(controller_api) async def serve(self) -> None: diff --git a/src/fastcs/transports/tango/transport.py b/src/fastcs/transports/tango/transport.py index 6ce3b102b..424165ece 100644 --- a/src/fastcs/transports/tango/transport.py +++ b/src/fastcs/transports/tango/transport.py @@ -13,13 +13,8 @@ class TangoTransport(Transport): tango: TangoDSROptions = field(default_factory=TangoDSROptions) - def connect( - self, - controller_api: ControllerAPI, - loop: asyncio.AbstractEventLoop, - ): - self._dsr = TangoDSR(controller_api, loop) + def connect(self, controller_api: ControllerAPI): + self._dsr = TangoDSR(controller_api, asyncio.get_running_loop()) async def serve(self) -> None: - coro = asyncio.to_thread(self._dsr.run, self.tango) - await coro + await asyncio.to_thread(self._dsr.run, self.tango) diff --git a/src/fastcs/transports/transport.py b/src/fastcs/transports/transport.py index 705880cec..bd9921887 100644 --- a/src/fastcs/transports/transport.py +++ b/src/fastcs/transports/transport.py @@ -1,4 +1,3 @@ -import asyncio from abc import abstractmethod from dataclasses import dataclass from typing import Any, ClassVar, Union @@ -24,13 +23,12 @@ def union(cls): return Union[tuple(cls.subclasses)] # noqa: UP007 @abstractmethod - def connect( - self, controller_api: ControllerAPI, loop: asyncio.AbstractEventLoop - ) -> None: + def connect(self, controller_api: ControllerAPI) -> None: """Connect the ``Transport`` to the control system - The `ControllerAPI` should be exposed over the transport. The provided event - loop should be used where required instead of creating a new one. + The `ControllerAPI` should be exposed over the transport. Transports that + require the event loop should retrieve it with `asyncio.get_running_loop`, + as this method is called from within an async context. """ pass diff --git a/tests/benchmarking/controller.py b/tests/benchmarking/controller.py index 7ffc2238a..11fedaec6 100644 --- a/tests/benchmarking/controller.py +++ b/tests/benchmarking/controller.py @@ -1,5 +1,3 @@ -import asyncio - from fastcs import FastCS from fastcs.attributes import AttrR, AttrW from fastcs.controllers import Controller @@ -25,7 +23,7 @@ def run(): ), TangoTransport(tango=TangoDSROptions(dev_name="MY/BENCHMARK/DEVICE")), ] - instance = FastCS(MyTestController(), transport_options, asyncio.get_event_loop()) + instance = FastCS(MyTestController(), transport_options) instance.run() diff --git a/tests/test_control_system.py b/tests/test_control_system.py index c231b136b..523448910 100644 --- a/tests/test_control_system.py +++ b/tests/test_control_system.py @@ -13,9 +13,8 @@ @pytest.mark.asyncio async def test_scan_tasks(controller): - loop = asyncio.get_event_loop() transport_options = [] - fastcs = FastCS(controller, transport_options, loop) + fastcs = FastCS(controller, transport_options) asyncio.create_task(fastcs.serve(interactive=False)) await asyncio.sleep(0.1) @@ -43,9 +42,8 @@ async def do_nothing_static(self): pass controller = MyTestController() - loop = asyncio.get_event_loop() transport_options = [] - fastcs = FastCS(controller, transport_options, loop) + fastcs = FastCS(controller, transport_options) asyncio.create_task(fastcs.serve(interactive=False)) await asyncio.sleep(0.1) @@ -79,9 +77,7 @@ class MyController(Controller): ) controller = MyController(ios=[AttributeIOTimesCalled()]) - loop = asyncio.get_event_loop() - - fastcs = FastCS(controller, [], loop) + fastcs = FastCS(controller, []) assert controller.update_quickly.get() == 0 assert controller.update_once.get() == 0 @@ -108,9 +104,7 @@ async def disconnect(self): self.connected = False controller = MyTestController() - - loop = asyncio.get_event_loop() - fastcs = FastCS(controller, [], loop) + fastcs = FastCS(controller, []) task = asyncio.create_task(fastcs.serve(interactive=False)) diff --git a/tests/transports/epics/ca/test_initial_value.py b/tests/transports/epics/ca/test_initial_value.py index 727cb0d7d..137ff8caf 100644 --- a/tests/transports/epics/ca/test_initial_value.py +++ b/tests/transports/epics/ca/test_initial_value.py @@ -51,12 +51,10 @@ class InitialValuesController(Controller): async def test_initial_values_set_in_ca(mocker): pv_prefix = "SOFTIOC_INITIAL_DEVICE" - loop = asyncio.get_event_loop() controller = InitialValuesController() fastcs = FastCS( controller, [EpicsCATransport(epicsca=EpicsIOCOptions(pv_prefix=pv_prefix))], - loop, ) record_spy = mocker.spy(ca_ioc, "_make_in_record") diff --git a/tests/transports/epics/pva/test_p4p.py b/tests/transports/epics/pva/test_p4p.py index 3a1a06aca..f4bbcffd9 100644 --- a/tests/transports/epics/pva/test_p4p.py +++ b/tests/transports/epics/pva/test_p4p.py @@ -216,7 +216,8 @@ def make_fastcs(pv_prefix: str, controller: Controller) -> FastCS: ) -def test_read_signal_set(): +@pytest.mark.asyncio +async def test_read_signal_set(): class SomeController(Controller): a: AttrRW = AttrRW(Int(max=400_000, max_alarm=40_000)) b: AttrR = AttrR(Float(min=-1, min_alarm=-0.5, prec=2)) @@ -240,12 +241,10 @@ async def _wait_and_set_attr_r(): a_values, b_values = [], [] a_monitor = ctxt.monitor(f"{pv_prefix}:A_RBV", a_values.append) b_monitor = ctxt.monitor(f"{pv_prefix}:B", b_values.append) - serve = asyncio.ensure_future(fastcs.serve(interactive=False)) - wait_and_set_attr_r = asyncio.ensure_future(_wait_and_set_attr_r()) + serve = asyncio.create_task(fastcs.serve(interactive=False)) + wait_and_set_attr_r = asyncio.create_task(_wait_and_set_attr_r()) try: - asyncio.get_event_loop().run_until_complete( - asyncio.wait_for(asyncio.gather(serve, wait_and_set_attr_r), timeout=0.2) - ) + await asyncio.wait_for(asyncio.gather(serve, wait_and_set_attr_r), timeout=0.2) except TimeoutError: ... finally: @@ -257,7 +256,8 @@ async def _wait_and_set_attr_r(): assert b_values == [0.0, -0.99, -0.91] # Last is -0.91 because of prec -def test_pvi_grouping(): +@pytest.mark.asyncio +async def test_pvi_grouping(): class ChildChildController(Controller): attr_e: AttrRW = AttrRW(Int()) attr_f: AttrR = AttrR(String()) @@ -321,12 +321,10 @@ class SomeController(Controller): child_child_child_controller_monitor = ctxt.monitor( f"{pv_prefix}Child:0:ChildChild:PVI", child_child_child_controller_pvi.append ) - serve = asyncio.ensure_future(fastcs.serve(interactive=False)) + serve = asyncio.create_task(fastcs.serve(interactive=False)) try: - asyncio.get_event_loop().run_until_complete( - asyncio.wait_for(serve, timeout=0.2) - ) + await asyncio.wait_for(serve, timeout=0.2) except TimeoutError: ... finally: @@ -525,7 +523,8 @@ async def _wait_and_put_pvs(): ) -def test_command_method_put_twice(caplog): +@pytest.mark.asyncio +async def test_command_method_put_twice(caplog): class SomeController(Controller): command_runs_for_a_while_times = [] command_spawns_a_task_times = [] @@ -572,11 +571,9 @@ async def put_pvs(): ) assert expected_error_string in caplog.text - serve = asyncio.ensure_future(fastcs.serve(interactive=False)) + serve = asyncio.create_task(fastcs.serve(interactive=False)) try: - asyncio.get_event_loop().run_until_complete( - asyncio.wait_for(asyncio.gather(serve, put_pvs()), timeout=1) - ) + await asyncio.wait_for(asyncio.gather(serve, put_pvs()), timeout=1) except TimeoutError: ... serve.cancel() @@ -613,7 +610,8 @@ async def put_pvs(): ) -def test_block_flag_waits_for_callback_completion(): +@pytest.mark.asyncio +async def test_block_flag_waits_for_callback_completion(): class SomeController(Controller): @command() async def command_runs_for_a_while(self): @@ -635,14 +633,9 @@ async def put_pvs(): ) command_runs_for_a_while_times.append((start_time, datetime.now())) - serve = asyncio.ensure_future(fastcs.serve(interactive=False)) + serve = asyncio.create_task(fastcs.serve(interactive=False)) try: - asyncio.get_event_loop().run_until_complete( - asyncio.wait_for( - asyncio.gather(serve, put_pvs()), - timeout=0.5, - ) - ) + await asyncio.wait_for(asyncio.gather(serve, put_pvs()), timeout=0.5) except TimeoutError: ... serve.cancel() diff --git a/tests/transports/graphQL/test_graphql.py b/tests/transports/graphQL/test_graphql.py index 24f1c7c16..2cb95448e 100644 --- a/tests/transports/graphQL/test_graphql.py +++ b/tests/transports/graphQL/test_graphql.py @@ -1,4 +1,3 @@ -import asyncio import copy import json from typing import Any @@ -66,7 +65,7 @@ def nest_response(path: list[str], value: Any) -> dict: def create_test_client(gql_controller_api: AssertableControllerAPI) -> TestClient: graphql_transport = GraphQLTransport() - graphql_transport.connect(gql_controller_api, asyncio.AbstractEventLoop()) + graphql_transport.connect(gql_controller_api) return TestClient(graphql_transport._server._app) diff --git a/tests/transports/rest/test_rest.py b/tests/transports/rest/test_rest.py index 2be0b21b9..2cf870fc4 100644 --- a/tests/transports/rest/test_rest.py +++ b/tests/transports/rest/test_rest.py @@ -1,4 +1,3 @@ -import asyncio import enum import numpy as np @@ -32,7 +31,7 @@ def rest_controller_api(class_mocker: MockerFixture): def create_test_client(rest_controller_api: ControllerAPI) -> TestClient: rest_transport = RestTransport() - rest_transport.connect(rest_controller_api, asyncio.AbstractEventLoop()) + rest_transport.connect(rest_controller_api) return TestClient(rest_transport._server._app) diff --git a/tests/transports/tango/test_dsr.py b/tests/transports/tango/test_dsr.py index b269ce5a8..aeeed58b2 100644 --- a/tests/transports/tango/test_dsr.py +++ b/tests/transports/tango/test_dsr.py @@ -13,7 +13,7 @@ from fastcs.attributes import AttrR, AttrRW, AttrW from fastcs.datatypes import Bool, Enum, Float, Int, String, Waveform -from fastcs.transports.tango.transport import TangoTransport +from fastcs.transports.tango.dsr import TangoDSR async def patch_run_threadsafe_blocking(coro, loop): @@ -47,15 +47,34 @@ def tango_controller_api(class_mocker: MockerFixture) -> AssertableControllerAPI def create_test_context(tango_controller_api: AssertableControllerAPI): - tango_transport = TangoTransport() - tango_transport.connect( - tango_controller_api, - asyncio.get_event_loop(), - ) - device = tango_transport._dsr._device - # https://tango-controls.readthedocs.io/projects/pytango/en/v9.5.1/testing/test_context.html - with DeviceTestContext(device, debug=0) as proxy: - yield proxy + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + dsr = TangoDSR(tango_controller_api, loop) + device = dsr._device + # https://tango-controls.readthedocs.io/projects/pytango/en/v9.5.1/testing/test_context.html + with DeviceTestContext(device, debug=0) as proxy: + yield proxy + finally: + import gc + + # Flush pending callbacks and force GC to close Tango C-level resources + loop.run_until_complete(asyncio.sleep(0)) + loop.close() + asyncio.set_event_loop(None) + + # Close any event loops created internally by DeviceTestContext so that + # gc.collect() does not trigger ResourceWarning from their __del__. + # Use try/except because on some Python 3.11.x versions the loop's + # internal self-pipe may already be invalidated (fd=-1) even though + # is_closed() returns False, causing a ValueError in close(). + for obj in gc.get_objects(): + if isinstance(obj, asyncio.AbstractEventLoop) and not obj.is_closed(): + try: + obj.close() + except Exception: + pass + gc.collect() class TestTangoDevice: