Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 45 additions & 26 deletions can/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@
from collections.abc import Awaitable, Callable, Iterable
from contextlib import AbstractContextManager
from types import TracebackType
from typing import (
Any,
Final,
NamedTuple,
)
from typing import Any, Final, NamedTuple

from can import CanError
from can.bus import BusABC
from can.listener import Listener
from can.message import Message
Expand Down Expand Up @@ -101,7 +98,6 @@ def find_instances(self, bus: BusABC) -> tuple["Notifier", ...]:


class Notifier(AbstractContextManager["Notifier"]):

_registry: Final = _NotifierRegistry()

def __init__(
Expand Down Expand Up @@ -158,28 +154,21 @@ def bus(self) -> BusABC | tuple["BusABC", ...]:
return tuple(self._bus_list)

def add_bus(self, bus: BusABC) -> None:
"""Add a bus for notification.
"""Add a bus for notification."""
self._bus_list.append(bus)
self._start_reader(bus)

:param bus:
CAN bus instance.
:raises ValueError:
If the *bus* is already assigned to an active :class:`~can.Notifier`.
"""
# add bus to notifier registry
def _start_reader(self, bus: BusABC) -> None:
"""Internal helper to spin up the actual background worker for a bus."""
Notifier._registry.register(bus, self)

# add bus to internal bus list
self._bus_list.append(bus)

file_descriptor: int = -1
try:
file_descriptor = bus.fileno()
except NotImplementedError:
# Bus doesn't support fileno, we fall back to thread based reader
pass

if self._loop is not None and file_descriptor >= 0:
# Use bus file descriptor to watch for messages
self._loop.add_reader(file_descriptor, self._on_message_available, bus)
self._readers.append(file_descriptor)
else:
Expand Down Expand Up @@ -218,6 +207,10 @@ def stop(self, timeout: float = 5.0) -> None:
for bus in self._bus_list:
Notifier._registry.unregister(bus, self)

self._readers = []
# Clear any pending asyncio tasks to prevent stale references
self._tasks.clear()

def _rx_thread(self, bus: BusABC) -> None:
# determine message handling callable early, not inside while loop
if self._loop:
Expand All @@ -233,18 +226,20 @@ def _rx_thread(self, bus: BusABC) -> None:
if msg := bus.recv(self.timeout):
with self._lock:
handle_message(msg)
except Exception as exc: # pylint: disable=broad-except
except CanError as exc:
self.exception = exc
# Always notify the system when the bus fails
if self._loop is not None:
self._loop.call_soon_threadsafe(self._on_error, exc)
# Raise anyway
raise
elif not self._on_error(exc):
# If it was not handled, raise the exception here
raise
else:
# It was handled, so only log it
logger.debug("suppressed exception: %s", exc)
self._on_error(exc)
logger.error("CAN error in notifier thread: %s", exc)
except Exception as exc:
# Catching other runtime errors to prevent silent thread death
self.exception = exc
logger.critical("Unexpected error in notifier thread: %s", exc)
self._on_error(exc)
raise # Re-raise unexpected non-CAN errors

def _on_message_available(self, bus: BusABC) -> None:
if msg := bus.recv(0):
Expand Down Expand Up @@ -317,6 +312,30 @@ def find_instances(bus: BusABC) -> tuple["Notifier", ...]:
"""
return Notifier._registry.find_instances(bus)

def restart(self) -> None:
"""Restarts the Notifier if it has been stopped.

:raises RuntimeWarning: If the notifier is already running.
"""
with self._lock:
if not self._stopped:
raise RuntimeWarning("Notifier is already running.")

self._stopped = False
self.exception = None
# Note: _bus_list is preserved from previous run

for bus in self._bus_list:
self._start_reader(bus)

# Re-trigger listeners if they have a start method
for listener in self.listeners:
if hasattr(listener, "start"):
try:
listener.start()
except (AttributeError, RuntimeError, TypeError, ValueError) as e:
logger.error("Failed to restart listener: %s", e)

def __exit__(
self,
exc_type: type[BaseException] | None,
Expand Down
1 change: 1 addition & 0 deletions doc/changelog.d/2055.added.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added new Notifier Restart method.
83 changes: 83 additions & 0 deletions test/notifier_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,59 @@ def test_registry(self):
# find_instance must return the existing instance
self.assertEqual(can.Notifier.find_instances(bus), (notifier,))

def test_restart(self):
with can.Bus("test", interface="virtual", receive_own_messages=True) as bus:
reader = can.BufferedReader()
notifier = can.Notifier(bus, [reader], 0.1)

bus.send(can.Message(arbitration_id=0x123))
self.assertIsNotNone(reader.get_message(1))

notifier.stop()
self.assertTrue(notifier.stopped)

new_reader = can.BufferedReader()
notifier.listeners = [new_reader]

notifier.restart()
self.assertFalse(notifier.stopped)

# Small settling time for the thread to actually start
time.sleep(0.2)

# Send and Verify
msg = can.Message(arbitration_id=0xABC)
bus.send(msg)

recv = new_reader.get_message(1)
self.assertIsNotNone(recv, "Failed to receive message after restart")
self.assertEqual(recv.arbitration_id, 0xABC)

notifier.stop()

def test_restart_registry_lifecycle(self):
with can.Bus("test", interface="virtual", receive_own_messages=True) as bus:
notifier = can.Notifier(bus, [], 0.1)
notifier.stop()

# Verify it is removed from registry
self.assertEqual(can.Notifier.find_instances(bus), ())

notifier.restart()

# Verify it is back in the registry and blocking others
self.assertEqual(can.Notifier.find_instances(bus), (notifier,))
self.assertRaises(ValueError, can.Notifier, bus, [], 0.1)

notifier.stop()

def test_double_restart_warning(self):
with can.Bus("test", interface="virtual", receive_own_messages=True) as bus:
with can.Notifier(bus, [], 0.1) as notifier:
# Attempting to restart an already running notifier should warn
with self.assertRaises(RuntimeWarning):
notifier.restart()


class AsyncNotifierTest(unittest.TestCase):
def test_asyncio_notifier(self):
Expand All @@ -88,6 +141,36 @@ async def run_it():

asyncio.run(run_it())

def test_asyncio_notifier_restart(self):
async def run_it():
with can.Bus("test", interface="virtual", receive_own_messages=True) as bus:
reader = can.AsyncBufferedReader()
notifier = can.Notifier(
bus, [reader], 0.1, loop=asyncio.get_running_loop()
)

notifier.stop()

# In some cases, creating a new listener is the safest path for a restart
new_reader = can.AsyncBufferedReader()
notifier.listeners = [new_reader]

notifier.restart()

# Small yield to let the selector register the FD
await asyncio.sleep(0.1)

bus.send(can.Message(arbitration_id=0x123))

recv_msg = await asyncio.wait_for(new_reader.get_message(), 1.5)

self.assertIsNotNone(recv_msg)
self.assertEqual(recv_msg.arbitration_id, 0x123)

notifier.stop()

asyncio.run(run_it())


if __name__ == "__main__":
unittest.main()
Loading