diff --git a/can/notifier.py b/can/notifier.py index cb91cf7b4..e52cd6857 100644 --- a/can/notifier.py +++ b/can/notifier.py @@ -10,12 +10,9 @@ from collections.abc import Awaitable, Callable, Iterable from contextlib import AbstractContextManager from types import TracebackType -from typing import ( - Any, - Final, - NamedTuple, -) +from typing import Any, Final, NamedTuple +from can import CanError from can.bus import BusABC from can.listener import Listener from can.message import Message @@ -101,7 +98,6 @@ def find_instances(self, bus: BusABC) -> tuple["Notifier", ...]: class Notifier(AbstractContextManager["Notifier"]): - _registry: Final = _NotifierRegistry() def __init__( @@ -158,28 +154,21 @@ def bus(self) -> BusABC | tuple["BusABC", ...]: return tuple(self._bus_list) def add_bus(self, bus: BusABC) -> None: - """Add a bus for notification. + """Add a bus for notification.""" + self._bus_list.append(bus) + self._start_reader(bus) - :param bus: - CAN bus instance. - :raises ValueError: - If the *bus* is already assigned to an active :class:`~can.Notifier`. - """ - # add bus to notifier registry + def _start_reader(self, bus: BusABC) -> None: + """Internal helper to spin up the actual background worker for a bus.""" Notifier._registry.register(bus, self) - # add bus to internal bus list - self._bus_list.append(bus) - file_descriptor: int = -1 try: file_descriptor = bus.fileno() except NotImplementedError: - # Bus doesn't support fileno, we fall back to thread based reader pass if self._loop is not None and file_descriptor >= 0: - # Use bus file descriptor to watch for messages self._loop.add_reader(file_descriptor, self._on_message_available, bus) self._readers.append(file_descriptor) else: @@ -218,6 +207,10 @@ def stop(self, timeout: float = 5.0) -> None: for bus in self._bus_list: Notifier._registry.unregister(bus, self) + self._readers = [] + # Clear any pending asyncio tasks to prevent stale references + self._tasks.clear() + def _rx_thread(self, bus: BusABC) -> None: # determine message handling callable early, not inside while loop if self._loop: @@ -233,18 +226,20 @@ def _rx_thread(self, bus: BusABC) -> None: if msg := bus.recv(self.timeout): with self._lock: handle_message(msg) - except Exception as exc: # pylint: disable=broad-except + except CanError as exc: self.exception = exc + # Always notify the system when the bus fails if self._loop is not None: self._loop.call_soon_threadsafe(self._on_error, exc) - # Raise anyway - raise - elif not self._on_error(exc): - # If it was not handled, raise the exception here - raise else: - # It was handled, so only log it - logger.debug("suppressed exception: %s", exc) + self._on_error(exc) + logger.error("CAN error in notifier thread: %s", exc) + except Exception as exc: + # Catching other runtime errors to prevent silent thread death + self.exception = exc + logger.critical("Unexpected error in notifier thread: %s", exc) + self._on_error(exc) + raise # Re-raise unexpected non-CAN errors def _on_message_available(self, bus: BusABC) -> None: if msg := bus.recv(0): @@ -317,6 +312,30 @@ def find_instances(bus: BusABC) -> tuple["Notifier", ...]: """ return Notifier._registry.find_instances(bus) + def restart(self) -> None: + """Restarts the Notifier if it has been stopped. + + :raises RuntimeWarning: If the notifier is already running. + """ + with self._lock: + if not self._stopped: + raise RuntimeWarning("Notifier is already running.") + + self._stopped = False + self.exception = None + # Note: _bus_list is preserved from previous run + + for bus in self._bus_list: + self._start_reader(bus) + + # Re-trigger listeners if they have a start method + for listener in self.listeners: + if hasattr(listener, "start"): + try: + listener.start() + except (AttributeError, RuntimeError, TypeError, ValueError) as e: + logger.error("Failed to restart listener: %s", e) + def __exit__( self, exc_type: type[BaseException] | None, diff --git a/doc/changelog.d/2055.added.rst b/doc/changelog.d/2055.added.rst new file mode 100644 index 000000000..7586aaa26 --- /dev/null +++ b/doc/changelog.d/2055.added.rst @@ -0,0 +1 @@ +Added new Notifier Restart method. diff --git a/test/notifier_test.py b/test/notifier_test.py index d8512a00b..24a68c1b5 100644 --- a/test/notifier_test.py +++ b/test/notifier_test.py @@ -72,6 +72,59 @@ def test_registry(self): # find_instance must return the existing instance self.assertEqual(can.Notifier.find_instances(bus), (notifier,)) + def test_restart(self): + with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: + reader = can.BufferedReader() + notifier = can.Notifier(bus, [reader], 0.1) + + bus.send(can.Message(arbitration_id=0x123)) + self.assertIsNotNone(reader.get_message(1)) + + notifier.stop() + self.assertTrue(notifier.stopped) + + new_reader = can.BufferedReader() + notifier.listeners = [new_reader] + + notifier.restart() + self.assertFalse(notifier.stopped) + + # Small settling time for the thread to actually start + time.sleep(0.2) + + # Send and Verify + msg = can.Message(arbitration_id=0xABC) + bus.send(msg) + + recv = new_reader.get_message(1) + self.assertIsNotNone(recv, "Failed to receive message after restart") + self.assertEqual(recv.arbitration_id, 0xABC) + + notifier.stop() + + def test_restart_registry_lifecycle(self): + with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: + notifier = can.Notifier(bus, [], 0.1) + notifier.stop() + + # Verify it is removed from registry + self.assertEqual(can.Notifier.find_instances(bus), ()) + + notifier.restart() + + # Verify it is back in the registry and blocking others + self.assertEqual(can.Notifier.find_instances(bus), (notifier,)) + self.assertRaises(ValueError, can.Notifier, bus, [], 0.1) + + notifier.stop() + + def test_double_restart_warning(self): + with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: + with can.Notifier(bus, [], 0.1) as notifier: + # Attempting to restart an already running notifier should warn + with self.assertRaises(RuntimeWarning): + notifier.restart() + class AsyncNotifierTest(unittest.TestCase): def test_asyncio_notifier(self): @@ -88,6 +141,36 @@ async def run_it(): asyncio.run(run_it()) + def test_asyncio_notifier_restart(self): + async def run_it(): + with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: + reader = can.AsyncBufferedReader() + notifier = can.Notifier( + bus, [reader], 0.1, loop=asyncio.get_running_loop() + ) + + notifier.stop() + + # In some cases, creating a new listener is the safest path for a restart + new_reader = can.AsyncBufferedReader() + notifier.listeners = [new_reader] + + notifier.restart() + + # Small yield to let the selector register the FD + await asyncio.sleep(0.1) + + bus.send(can.Message(arbitration_id=0x123)) + + recv_msg = await asyncio.wait_for(new_reader.get_message(), 1.5) + + self.assertIsNotNone(recv_msg) + self.assertEqual(recv_msg.arbitration_id, 0x123) + + notifier.stop() + + asyncio.run(run_it()) + if __name__ == "__main__": unittest.main()