# Source code for can.interfaces.gs_usb

import logging
from typing import Any

import usb
from gs_usb.constants import CAN_EFF_FLAG, CAN_ERR_FLAG, CAN_MAX_DLC, CAN_RTR_FLAG
from gs_usb.gs_usb import GsUsb
from gs_usb.gs_usb_frame import GS_USB_NONE_ECHO_ID, GsUsbFrame

import can

from ..exceptions import CanInitializationError, CanOperationError

logger = logging.getLogger(__name__)


def _find_gs_usb_devices(
    bus: int | None = None, address: int | None = None
) -> list[usb.core.Device]:
    """Enumerate raw gs_usb USB devices, letting ``pyusb`` pick the backend.

    :meth:`GsUsb.scan` pins the ``libusb1`` backend; this helper does not, so
    ``pyusb`` auto-detects the best available backend. That makes WinUSB
    usable on Windows in addition to libusbK.

    :param bus: number of the bus that the device is connected to; not used
        as a filter when ``None``
    :param address: address of the device on the bus it is connected to; not
        used as a filter when ``None``
    :return: a list of found raw USB devices (empty if nothing matched)
    """
    # Forward only the location filters the caller actually supplied.
    location = {
        key: value
        for key, value in (("bus", bus), ("address", address))
        if value is not None
    }

    matches = usb.core.find(
        find_all=True,
        custom_match=GsUsb.is_gs_usb_device,
        **location,
    )
    # Fall back to an empty list should the lookup yield a falsy result.
    return list(matches or [])


class GsUsbBus(can.BusABC):
    """CAN bus backed by a gs_usb USB adapter.

    The device is located either by its position in the scan result
    (``index`` / ``channel``) or by its USB ``bus`` / ``address``, configured
    for the requested bitrate and started on construction.
    """

    def __init__(
        self,
        channel: can.typechecking.Channel,
        bitrate: int = 500_000,
        index: int | None = None,
        bus: int | None = None,
        address: int | None = None,
        can_filters: can.typechecking.CanFilters | None = None,
        **kwargs: Any,
    ) -> None:
        """
        :param channel: usb device name. When neither ``index`` nor
            ``bus``/``address`` is given, it is used as the device index.
        :param index: device number if using automatic scan, starting from 0.
            If specified, bus/address shall not be provided.
        :param bus: number of the bus that the device is connected to
        :param address: address of the device on the bus it is connected to
        :param can_filters: not supported
        :param bitrate: CAN network bandwidth (bits/s)
        :raises CanInitializationError: if ``index`` is combined with
            ``bus``/``address``, if the index cannot be converted to an
            integer, or if no matching device is found
        """
        # Compare each value against None explicitly: ``bus or address``
        # would treat a legitimate ``bus=0`` as "not provided".
        if index is not None and (bus is not None or address is not None):
            raise CanInitializationError(
                "index and bus/address cannot be used simultaneously"
            )

        # Without any explicit locator, the channel doubles as the index.
        if index is None and address is None and bus is None:
            _index: Any = channel
        else:
            _index = index

        # Stays None when the device was selected by bus/address; shutdown()
        # uses it to decide whether the device can be looked up again.
        self._index: int | None = None
        if _index is not None:
            if not isinstance(_index, int):
                try:
                    _index = int(_index)
                except (ValueError, TypeError):
                    raise CanInitializationError(
                        f"index must be an integer, but got {type(_index).__name__} ({_index})"
                    ) from None

            devs = _find_gs_usb_devices()
            if len(devs) <= _index:
                raise CanInitializationError(
                    f"Cannot find device {_index}. Devices found: {len(devs)}"
                )
            gs_usb_dev = devs[_index]
            self._index = _index
        else:
            devs = _find_gs_usb_devices(bus=bus, address=address)
            if not devs:
                raise CanInitializationError(f"Cannot find device {channel}")
            gs_usb_dev = devs[0]

        self.gs_usb = GsUsb(gs_usb_dev)
        self.channel_info = str(channel)
        self._can_protocol = can.CanProtocol.CAN_20

        # Derive the timing from the device's CAN clock with the sample point
        # at 87.5 %.
        bit_timing = can.BitTiming.from_sample_point(
            f_clock=self.gs_usb.device_capability.fclk_can,
            bitrate=bitrate,
            sample_point=87.5,
        )
        # The device takes prop_seg and phase_seg1 separately: use a fixed
        # propagation segment of 1 and give the rest of tseg1 to phase_seg1.
        props_seg = 1
        self.gs_usb.set_timing(
            prop_seg=props_seg,
            phase_seg1=bit_timing.tseg1 - props_seg,
            phase_seg2=bit_timing.tseg2,
            sjw=bit_timing.sjw,
            brp=bit_timing.brp,
        )
        self.gs_usb.start()
        # Remembered so that shutdown() can re-apply it to the device.
        self._bitrate = bitrate

        super().__init__(
            channel=channel,
            can_filters=can_filters,
            **kwargs,
        )

    def send(self, msg: can.Message, timeout: float | None = None) -> None:
        """Transmit a message to the CAN bus.

        The message itself is not modified; its payload is zero-padded to
        ``CAN_MAX_DLC`` bytes on a copy.

        :param Message msg: A message object.
        :param timeout: timeout is not supported.
            The function won't return until message is sent or exception is raised.

        :raises CanOperationError:
            if the message could not be sent
        """
        can_id = msg.arbitration_id

        if msg.is_extended_id:
            can_id |= CAN_EFF_FLAG

        if msg.is_remote_frame:
            can_id |= CAN_RTR_FLAG

        if msg.is_error_frame:
            can_id |= CAN_ERR_FLAG

        # Pad a copy of the payload so the caller's message keeps its
        # original data length.
        payload = list(msg.data)
        payload.extend([0x00] * (CAN_MAX_DLC - len(payload)))

        frame = GsUsbFrame()
        frame.can_id = can_id
        frame.can_dlc = msg.dlc
        frame.timestamp_us = 0  # timestamp frame field is only useful on receive
        frame.data = payload

        try:
            self.gs_usb.send(frame)
        except usb.core.USBError as exc:
            raise CanOperationError("The message could not be sent") from exc

    def _recv_internal(self, timeout: float | None) -> tuple[can.Message | None, bool]:
        """
        Read a message from the bus and tell whether it was filtered.
        This method may be called by :meth:`~can.BusABC.recv`
        to read a message multiple times if the filters set by
        :meth:`~can.BusABC.set_filters` do not match and the call has
        not yet timed out.

        :param float timeout: seconds to wait for a message,
            see :meth:`~can.BusABC.send`.
            ``None`` is passed to the device as 0 ms, the value the
            non-``None`` path avoids because it blocks
            (NOTE(review): confirm the exact semantics against gs_usb/pyusb).
            Any other value is converted to whole milliseconds with a
            minimum of 1 ms.

        :return:
            1.  a message that was read or None on timeout
            2.  a bool that is True if message filtering has already
                been done and else False. In this interface it is always False
                since filtering is not available
        """
        frame = GsUsbFrame()

        if timeout is None:
            timeout_ms = 0
        else:
            # Never hand 0 to the device here, to avoid blocking: clamp to
            # 1 ms so that a zero or sub-millisecond timeout does not round
            # down to 0.
            timeout_ms = max(1, round(timeout * 1000))

        if not self.gs_usb.read(frame=frame, timeout_ms=timeout_ms):
            return None, False

        msg = can.Message(
            timestamp=frame.timestamp,
            arbitration_id=frame.arbitration_id,
            is_extended_id=frame.is_extended_id,
            is_remote_frame=frame.is_remote_frame,
            is_error_frame=frame.is_error_frame,
            channel=self.channel_info,
            dlc=frame.can_dlc,
            # The frame buffer may be longer than the payload; keep dlc bytes.
            data=bytearray(frame.data)[0 : frame.can_dlc],
            # Frames without an echo id are treated as received traffic.
            is_rx=frame.echo_id == GS_USB_NONE_ECHO_ID,
        )

        return msg, False

    def shutdown(self) -> None:
        """Stop the device and leave it in a state that allows reopening.

        Calling this more than once is safe: the device is only touched on
        the first call.
        """
        already_shutdown = self._is_shutdown
        super().shutdown()
        if already_shutdown:
            return

        self.gs_usb.stop()
        if self._index is not None:
            # Avoid errors on a subsequent __init__() by repeating the device
            # lookup and .start() that would otherwise fail the next time the
            # device is opened in __init__()
            devs = _find_gs_usb_devices()
            if self._index < len(devs):
                gs_usb = GsUsb(devs[self._index])
                try:
                    gs_usb.set_bitrate(self._bitrate)
                    gs_usb.start()
                    gs_usb.stop()
                except usb.core.USBError:
                    # Best effort only: a failure here must not break shutdown.
                    logger.debug(
                        "Could not reset gs_usb device %s during shutdown",
                        self._index,
                        exc_info=True,
                    )