# Source code for can.notifier

"""
This module contains the implementation of :class:`~can.Notifier`.
"""

import asyncio
import functools
import logging
import threading
import time
from collections.abc import Awaitable, Callable, Iterable
from contextlib import AbstractContextManager
from types import TracebackType
from typing import (
    Any,
    Final,
    NamedTuple,
)

from can.bus import BusABC
from can.listener import Listener
from can.message import Message

logger = logging.getLogger("can.Notifier")

MessageRecipient = Listener | Callable[[Message], Awaitable[None] | None]


class _BusNotifierPair(NamedTuple):
    """Registry entry that ties one bus to the notifier it was registered with."""

    # The registered CAN bus.
    bus: "BusABC"
    # The notifier that ``bus`` was registered for.
    notifier: "Notifier"


class _NotifierRegistry:
    """Bookkeeping of which CAN bus is attached to which Notifier.

    The registry is what prevents a single bus from being handed to
    several active Notifiers at the same time.
    """

    def __init__(self) -> None:
        """Create an empty registry together with the lock that guards it."""
        self.lock = threading.Lock()
        self.pairs: list[_BusNotifierPair] = []

    def register(self, bus: BusABC, notifier: "Notifier") -> None:
        """Record that *bus* is now served by *notifier*.

        The registration is refused when the bus already belongs to a
        :class:`~can.Notifier` that has not been stopped yet.

        :param bus:
            The CAN bus to register.
        :param notifier:
            The :class:`~can.Notifier` instance associated with the bus.
        :raises ValueError:
            If the bus is already assigned to an active Notifier.
        """
        with self.lock:
            already_active = any(
                entry.bus is bus and not entry.notifier.stopped
                for entry in self.pairs
            )
            if already_active:
                raise ValueError(
                    "A bus can not be added to multiple active Notifier instances."
                )
            self.pairs.append(_BusNotifierPair(bus, notifier))

    def unregister(self, bus: BusABC, notifier: "Notifier") -> None:
        """Drop every entry that links *bus* to *notifier*.

        Entries are matched by object identity of both the bus and the
        notifier. Nothing happens if no such entry exists.

        :param bus:
            The CAN bus to unregister.
        :param notifier:
            The :class:`~can.Notifier` instance associated with the bus.
        """
        with self.lock:
            # Collect first, remove afterwards: the list must not shrink
            # while it is being iterated.
            obsolete = [
                entry
                for entry in self.pairs
                if entry.bus is bus and entry.notifier is notifier
            ]
            for entry in obsolete:
                self.pairs.remove(entry)

    def find_instances(self, bus: BusABC) -> tuple["Notifier", ...]:
        """Look up all :class:`~can.Notifier` instances registered for a bus.

        The bus is matched by object identity. Both active and stopped
        notifiers that are still present in the registry are returned.

        :param bus:
            The CAN bus for which to find the associated :class:`~can.Notifier` .
        :return:
            A tuple of :class:`~can.Notifier` instances associated with the
            given bus; empty if the bus is not in the registry.
        """
        with self.lock:
            return tuple(entry.notifier for entry in self.pairs if entry.bus is bus)


class Notifier(AbstractContextManager["Notifier"]):
    # Registry shared by all Notifier instances; it rejects adding a bus
    # that already belongs to another Notifier which has not been stopped.
    _registry: Final = _NotifierRegistry()

    def __init__(
        self,
        bus: BusABC | list[BusABC],
        listeners: Iterable[MessageRecipient],
        timeout: float = 1.0,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """Manages the distribution of :class:`~can.Message` instances to listeners.

        Supports multiple buses and listeners.

        .. Note::

            Remember to call :meth:`~can.Notifier.stop` after all messages are received as
            many listeners carry out flush operations to persist data.

        If adding one of the buses fails, the buses that were already added
        are detached again (readers stopped, registry entries removed) before
        the exception propagates. The listeners are not stopped in that case.

        :param bus:
            A :ref:`bus` or a list of buses to consume messages from.
        :param listeners:
            An iterable of :class:`~can.Listener` or callables that receive a :class:`~can.Message`
            and return nothing.
        :param timeout:
            An optional maximum number of seconds to wait for any :class:`~can.Message`.
        :param loop:
            An :mod:`asyncio` event loop to schedule the ``listeners`` in.
        :raises ValueError:
            If a passed in *bus* is already assigned to an active :class:`~can.Notifier`.
        """
        self.listeners: list[MessageRecipient] = list(listeners)
        self._bus_list: list[BusABC] = []
        self.timeout = timeout
        self._loop = loop

        #: Exception raised in thread
        self.exception: Exception | None = None

        self._stopped = False
        self._lock = threading.Lock()

        # One entry per bus: a file descriptor watched by the event loop,
        # or the thread that polls the bus.
        self._readers: list[int | threading.Thread] = []
        # Strong references to tasks created for coroutine listeners.
        self._tasks: set[asyncio.Task] = set()

        _bus_list: list[BusABC] = bus if isinstance(bus, list) else [bus]
        try:
            for each_bus in _bus_list:
                self.add_bus(each_bus)
        except BaseException:
            # The caller never receives this instance, so it could not call
            # stop() itself. Without this rollback the buses added so far
            # would stay bound to an unreachable, still-active notifier.
            self._stopped = True
            try:
                # Same bound as the default of stop().
                self._halt_readers(5.0)
            finally:
                self._unregister_buses()
            raise

    @property
    def bus(self) -> BusABC | tuple["BusABC", ...]:
        """Return the associated bus or a tuple of buses."""
        if len(self._bus_list) == 1:
            return self._bus_list[0]
        return tuple(self._bus_list)

    def add_bus(self, bus: BusABC) -> None:
        """Add a bus for notification.

        The bus is watched through its file descriptor if an event loop was
        given and the bus provides one; otherwise a daemon thread polling the
        bus is started.

        :param bus:
            CAN bus instance.
        :raises ValueError:
            If the *bus* is already assigned to an active :class:`~can.Notifier`.
        """
        # add bus to notifier registry
        Notifier._registry.register(bus, self)

        # add bus to internal bus list
        self._bus_list.append(bus)

        file_descriptor: int = -1
        try:
            file_descriptor = bus.fileno()
        except NotImplementedError:
            # Bus doesn't support fileno, we fall back to thread based reader
            pass

        if self._loop is not None and file_descriptor >= 0:
            # Use bus file descriptor to watch for messages
            self._loop.add_reader(file_descriptor, self._on_message_available, bus)
            self._readers.append(file_descriptor)
        else:
            reader_thread = threading.Thread(
                target=self._rx_thread,
                args=(bus,),
                name=f'{self.__class__.__qualname__} for bus "{bus.channel_info}"',
            )
            reader_thread.daemon = True
            reader_thread.start()
            self._readers.append(reader_thread)

    def stop(self, timeout: float = 5.0) -> None:
        """Stop notifying Listeners when new :class:`~can.Message` objects arrive
        and call :meth:`~can.Listener.stop` on each Listener.

        The buses are removed from the registry even if stopping a reader or
        a listener raises.

        :param timeout:
            Max time in seconds to wait for receive threads to finish.
            Should be longer than timeout given at instantiation.
        """
        self._stopped = True
        try:
            self._halt_readers(timeout)
            for listener in self.listeners:
                if hasattr(listener, "stop"):
                    listener.stop()
        finally:
            # remove bus from registry
            self._unregister_buses()

    def _halt_readers(self, timeout: float) -> None:
        """Wait for the reader threads and detach file descriptor readers.

        :param timeout:
            Max time in seconds, shared by all thread joins.
        """
        # Monotonic clock: the deadline must not shift when the wall clock
        # is adjusted while we are waiting.
        end_time = time.monotonic() + timeout
        for reader in self._readers:
            if isinstance(reader, threading.Thread):
                now = time.monotonic()
                if now < end_time:
                    reader.join(end_time - now)
            elif self._loop:
                # reader is a file descriptor
                self._loop.remove_reader(reader)

    def _unregister_buses(self) -> None:
        """Remove every bus of this notifier from the shared registry."""
        for bus in self._bus_list:
            Notifier._registry.unregister(bus, self)

    def _rx_thread(self, bus: BusABC) -> None:
        """Poll *bus* until the notifier is stopped and dispatch received messages.

        This is the target of the reader threads started by :meth:`add_bus`.
        With an event loop the dispatch is scheduled on the loop via
        ``call_soon_threadsafe``; without one the listeners are called
        directly from this thread.

        An exception is stored in :attr:`exception`. With an event loop the
        error handlers are scheduled on the loop and the exception is
        re-raised. Without a loop it is re-raised only if no listener
        handled it; otherwise it is logged and polling continues.
        """
        # determine message handling callable early, not inside while loop
        if self._loop:
            handle_message: Callable[[Message], Any] = functools.partial(
                self._loop.call_soon_threadsafe,
                self._on_message_received,  # type: ignore[arg-type]
            )
        else:
            handle_message = self._on_message_received

        while not self._stopped:
            try:
                if msg := bus.recv(self.timeout):
                    with self._lock:
                        handle_message(msg)
            except Exception as exc:  # pylint: disable=broad-except
                self.exception = exc
                if self._loop is not None:
                    self._loop.call_soon_threadsafe(self._on_error, exc)
                    # Raise anyway
                    raise
                elif not self._on_error(exc):
                    # If it was not handled, raise the exception here
                    raise
                else:
                    # It was handled, so only log it
                    logger.debug("suppressed exception: %s", exc)

    def _on_message_available(self, bus: BusABC) -> None:
        """Event loop reader callback: fetch a pending message without blocking."""
        if msg := bus.recv(0):
            self._on_message_received(msg)

    def _on_message_received(self, msg: Message) -> None:
        """Pass *msg* to every listener, scheduling coroutine results on the loop."""
        for callback in self.listeners:
            res = callback(msg)
            if res and self._loop and asyncio.iscoroutine(res):
                # Schedule coroutine and keep a reference to the task
                task = self._loop.create_task(res)
                self._tasks.add(task)
                task.add_done_callback(self._tasks.discard)

    def _on_error(self, exc: Exception) -> bool:
        """Calls ``on_error()`` for all listeners if they implement it.

        :returns: ``True`` if at least one error handler was called.
        """
        was_handled = False

        for listener in self.listeners:
            if hasattr(listener, "on_error"):
                try:
                    listener.on_error(exc)
                except NotImplementedError:
                    pass
                else:
                    was_handled = True

        return was_handled

    def add_listener(self, listener: MessageRecipient) -> None:
        """Add new Listener to the notification list.
        If it is already present, it will be called two times
        each time a message arrives.

        :param listener: Listener to be added to the list to be notified
        """
        self.listeners.append(listener)

    def remove_listener(self, listener: MessageRecipient) -> None:
        """Remove a listener from the notification list. This method
        throws an exception if the given listener is not part of the
        stored listeners.

        :param listener: Listener to be removed from the list to be notified
        :raises ValueError: if `listener` was never added to this notifier
        """
        self.listeners.remove(listener)

    @property
    def stopped(self) -> bool:
        """Return ``True``, if Notifier was properly shut down with :meth:`~can.Notifier.stop`."""
        return self._stopped

    @staticmethod
    def find_instances(bus: BusABC) -> tuple["Notifier", ...]:
        """Find :class:`~can.Notifier` instances associated with a given CAN bus.

        This method searches the registry for the :class:`~can.Notifier`
        that is linked to the specified bus. If the bus is found, the
        corresponding :class:`~can.Notifier` instances are returned. If the bus is not
        found in the registry, an empty tuple is returned.

        :param bus:
            The CAN bus for which to find the associated :class:`~can.Notifier` .
        :return:
            A tuple of :class:`~can.Notifier` instances associated with the given bus.
        """
        return Notifier._registry.find_instances(bus)

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Stop the notifier when the ``with`` block is left, unless already stopped."""
        if not self._stopped:
            self.stop()