Source code for can.interfaces.socketcand.socketcand

"""
Interface to socketcand
see https://github.com/linux-can/socketcand

Authors: Marvin Seiler, Gerrit Telkamp

Copyright (C) 2021  DOMOLOGIC GmbH
http://www.domologic.de
"""

import logging
import os
import select
import socket
import time
import traceback
import urllib.parse as urlparselib
import xml.etree.ElementTree as ET
from collections import deque

import can

log = logging.getLogger(__name__)

DEFAULT_SOCKETCAND_DISCOVERY_ADDRESS = ""
DEFAULT_SOCKETCAND_DISCOVERY_PORT = 42000


[docs] def detect_beacon(timeout_ms: int = 3100) -> list[can.typechecking.AutoDetectedConfig]: """ Detects socketcand servers This is what :meth:`can.detect_available_configs` ends up calling to search for available socketcand servers with a default timeout of 3100ms (socketcand sends a beacon packet every 3000ms). Using this method directly allows for adjusting the timeout. Extending the timeout beyond the default time period could be useful if UDP packet loss is a concern. :param timeout_ms: Timeout in milliseconds to wait for socketcand beacon packets :return: See :meth:`~can.detect_available_configs` """ with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: sock.bind( (DEFAULT_SOCKETCAND_DISCOVERY_ADDRESS, DEFAULT_SOCKETCAND_DISCOVERY_PORT) ) log.info( "Listening on for socketcand UDP advertisement on %s:%s", DEFAULT_SOCKETCAND_DISCOVERY_ADDRESS, DEFAULT_SOCKETCAND_DISCOVERY_PORT, ) now = time.time() * 1000 end_time = now + timeout_ms while (time.time() * 1000) < end_time: try: # get all sockets that are ready (can be a list with a single value # being self.socket or an empty list if self.socket is not ready) ready_receive_sockets, _, _ = select.select([sock], [], [], 1) if not ready_receive_sockets: log.debug("No advertisement received") continue msg = sock.recv(1024).decode("utf-8") root = ET.fromstring(msg) if root.tag != "CANBeacon": log.debug("Unexpected message received over UDP") continue det_devs = [] det_host = None det_port = None for child in root: if child.tag == "Bus": bus_name = child.attrib["name"] det_devs.append(bus_name) elif child.tag == "URL": url = urlparselib.urlparse(child.text) det_host = url.hostname det_port = url.port if not det_devs: log.debug( "Got advertisement, but no SocketCAN devices advertised by socketcand" ) continue if (det_host is None) or (det_port is None): det_host = None det_port = None log.debug( "Got advertisement, but no SocketCAN URL advertised by socketcand" ) continue log.info(f"Found SocketCAN devices: {det_devs}") return [ { "interface": "socketcand", "host": det_host, "port": det_port, "channel": channel, } for channel in det_devs ] except ET.ParseError: log.debug("Unexpected message received over UDP") continue except Exception as exc: # something bad happened (e.g. the interface went down) log.error(f"Failed to detect beacon: {exc} {traceback.format_exc()}") raise OSError( f"Failed to detect beacon: {exc} {traceback.format_exc()}" ) from exc return []
def convert_ascii_message_to_can_message(ascii_msg: str) -> can.Message: if not ascii_msg.endswith(" >"): log.warning(f"Missing ending character in ascii message: {ascii_msg}") return None if ascii_msg.startswith("< frame "): # frame_string = ascii_msg.removeprefix("< frame ").removesuffix(" >") frame_string = ascii_msg[8:-2] parts = frame_string.split(" ", 3) can_id, timestamp = int(parts[0], 16), float(parts[1]) is_ext = len(parts[0]) != 3 data = bytearray.fromhex(parts[2]) can_dlc = len(data) can_message = can.Message( timestamp=timestamp, arbitration_id=can_id, data=data, dlc=can_dlc, is_extended_id=is_ext, is_rx=True, ) return can_message if ascii_msg.startswith("< error "): frame_string = ascii_msg[8:-2] parts = frame_string.split(" ", 3) can_id, timestamp = int(parts[0], 16), float(parts[1]) is_ext = len(parts[0]) != 3 # socketcand sends no data in the error message so we don't have information # about the error details, therefore the can frame is created with one # data byte set to zero data = bytearray([0]) can_dlc = len(data) can_message = can.Message( timestamp=timestamp, arbitration_id=can_id & 0x1FFFFFFF, is_error_frame=True, data=data, dlc=can_dlc, is_extended_id=True, is_rx=True, ) return can_message log.warning(f"Could not parse ascii message: {ascii_msg}") return None def convert_can_message_to_ascii_message(can_message: can.Message) -> str: # Note: socketcan bus adds extended flag, remote_frame_flag & error_flag to id # not sure if that is necessary here can_id = can_message.arbitration_id if can_message.is_extended_id: can_id_string = f"{(can_id&0x1FFFFFFF):08X}" else: can_id_string = f"{(can_id&0x7FF):03X}" # Note: seems like we cannot add CANFD_BRS (bitrate_switch) and CANFD_ESI (error_state_indicator) flags data = can_message.data length = can_message.dlc bytes_string = " ".join(f"{x:x}" for x in data[0:length]) return f"< send {can_id_string} {length:X} {bytes_string} >" def connect_to_server(s, host, port): timeout_ms = 10000 now = time.time() * 1000 end_time = now + timeout_ms while now < end_time: try: s.connect((host, port)) return except Exception as e: log.warning(f"Failed to connect to server: {type(e)} Message: {e}") now = time.time() * 1000 raise TimeoutError( f"connect_to_server: Failed to connect server for {timeout_ms} ms" )
[docs] class SocketCanDaemonBus(can.BusABC): def __init__(self, channel, host, port, tcp_tune=False, can_filters=None, **kwargs): """Connects to a CAN bus served by socketcand. It implements :meth:`can.BusABC._detect_available_configs` to search for available interfaces. It will attempt to connect to the server for up to 10s, after which a TimeoutError exception will be thrown. If the handshake with the socketcand server fails, a CanError exception is thrown. :param channel: The can interface name served by socketcand. An example channel would be 'vcan0' or 'can0'. :param host: The host address of the socketcand server. :param port: The port of the socketcand server. :param tcp_tune: This tunes the TCP socket for low latency (TCP_NODELAY, and TCP_QUICKACK). This option is not available under windows. :param can_filters: See :meth:`can.BusABC.set_filters`. """ self.__host = host self.__port = port self.__tcp_tune = tcp_tune self.__socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if self.__tcp_tune: if os.name == "nt": self.__tcp_tune = False log.warning("'tcp_tune' not available in Windows. Setting to False") else: self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self.__message_buffer = deque() self.__receive_buffer = "" # i know string is not the most efficient here self.channel = channel self.channel_info = f"socketcand on {channel}@{host}:{port}" connect_to_server(self.__socket, self.__host, self.__port) self._expect_msg("< hi >") log.info( f"SocketCanDaemonBus: connected with address {self.__socket.getsockname()}" ) self._tcp_send(f"< open {channel} >") self._expect_msg("< ok >") self._tcp_send("< rawmode >") self._expect_msg("< ok >") super().__init__(channel=channel, can_filters=can_filters, **kwargs) def _recv_internal(self, timeout): if len(self.__message_buffer) != 0: can_message = self.__message_buffer.popleft() return can_message, False try: # get all sockets that are ready (can be a list with a single value # being self.socket or an empty list if self.socket is not ready) ready_receive_sockets, _, _ = select.select( [self.__socket], [], [], timeout ) except OSError as exc: # something bad happened (e.g. the interface went down) log.error(f"Failed to receive: {exc}") raise can.CanError(f"Failed to receive: {exc}") from exc try: if not ready_receive_sockets: # socket wasn't readable or timeout occurred log.debug("Socket not ready") return None, False ascii_msg = self.__socket.recv(1024).decode( "ascii" ) # may contain multiple messages if self.__tcp_tune: self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1) self.__receive_buffer += ascii_msg log.debug(f"Received Ascii Message: {ascii_msg}") buffer_view = self.__receive_buffer chars_processed_successfully = 0 while True: if len(buffer_view) == 0: break start = buffer_view.find("<") if start == -1: log.warning( f"Bad data: No opening < found => discarding entire buffer '{buffer_view}'" ) chars_processed_successfully = len(self.__receive_buffer) break end = buffer_view.find(">") if end == -1: log.warning("Got incomplete message => waiting for more data") if len(buffer_view) > 200: log.warning( "Incomplete message exceeds 200 chars => Discarding" ) chars_processed_successfully = len(self.__receive_buffer) break chars_processed_successfully += end + 1 single_message = buffer_view[start : end + 1] parsed_can_message = convert_ascii_message_to_can_message( single_message ) if parsed_can_message is None: log.warning(f"Invalid Frame: {single_message}") else: parsed_can_message.channel = self.channel self.__message_buffer.append(parsed_can_message) buffer_view = buffer_view[end + 1 :] self.__receive_buffer = self.__receive_buffer[chars_processed_successfully:] can_message = ( None if len(self.__message_buffer) == 0 else self.__message_buffer.popleft() ) return can_message, False except Exception as exc: log.error(f"Failed to receive: {exc} {traceback.format_exc()}") raise can.CanError( f"Failed to receive: {exc} {traceback.format_exc()}" ) from exc def _tcp_send(self, msg: str): log.debug(f"Sending TCP Message: '{msg}'") self.__socket.sendall(msg.encode("ascii")) if self.__tcp_tune: self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1) def _expect_msg(self, msg): ascii_msg = self.__socket.recv(256).decode("ascii") if self.__tcp_tune: self.__socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1) if not ascii_msg == msg: raise can.CanError(f"Expected '{msg}' got: '{ascii_msg}'")
[docs] def send(self, msg, timeout=None): """Transmit a message to the CAN bus. :param msg: A message object. :param timeout: Ignored """ ascii_msg = convert_can_message_to_ascii_message(msg) self._tcp_send(ascii_msg)
[docs] def shutdown(self): """Stops all active periodic tasks and closes the socket.""" super().shutdown() self.__socket.close()
@staticmethod def _detect_available_configs() -> list[can.typechecking.AutoDetectedConfig]: try: return detect_beacon() except Exception as e: log.warning(f"Could not detect socketcand beacon: {e}") return []