"""
Interface for slcan compatible interfaces (win32/linux).
"""
import io
import logging
import time
import warnings
from queue import SimpleQueue
from typing import Any, cast
from can import BitTiming, BitTimingFd, BusABC, CanProtocol, Message, typechecking
from can.exceptions import (
CanInitializationError,
CanInterfaceNotImplementedError,
CanOperationError,
error_check,
)
from can.util import (
CAN_FD_DLC,
check_or_adjust_timing_clock,
deprecated_args_alias,
len2dlc,
)
logger = logging.getLogger(__name__)
try:
import serial
except ImportError:
logger.warning(
"You won't be able to use the slcan can backend without "
"the serial module installed!"
)
serial = None
[docs]
class slcanBus(BusABC):
"""
slcan interface
"""
# the supported bitrates and their commands
_BITRATES = {
10000: "S0",
20000: "S1",
50000: "S2",
100000: "S3",
125000: "S4",
250000: "S5",
500000: "S6",
750000: "S7",
1000000: "S8",
83300: "S9",
}
_DATA_BITRATES = {
0: "",
2000000: "Y2",
5000000: "Y5",
}
_SLEEP_AFTER_SERIAL_OPEN = 2 # in seconds
_OK = b"\r"
_ERROR = b"\a"
LINE_TERMINATOR = b"\r"
@deprecated_args_alias(
deprecation_start="4.5.0",
deprecation_end="5.0.0",
ttyBaudrate="tty_baudrate",
)
def __init__(
self,
channel: typechecking.ChannelStr,
tty_baudrate: int = 115200,
bitrate: int | None = None,
timing: BitTiming | BitTimingFd | None = None,
sleep_after_open: float = _SLEEP_AFTER_SERIAL_OPEN,
rtscts: bool = False,
listen_only: bool = False,
timeout: float = 0.001,
**kwargs: Any,
) -> None:
"""
:param str channel:
port of underlying serial or usb device (e.g. ``/dev/ttyUSB0``, ``COM8``, ...)
Must not be empty. Can also end with ``@115200`` (or similarly) to specify the baudrate.
:param int tty_baudrate:
baudrate of underlying serial or usb device (Ignored if set via the ``channel`` parameter)
:param bitrate:
Bitrate in bit/s
:param timing:
Optional :class:`~can.BitTiming` instance to use for custom bit timing setting.
If this argument is set then it overrides the bitrate and btr arguments. The
`f_clock` value of the timing instance must be set to 8_000_000 (8MHz)
for standard CAN.
CAN FD and the :class:`~can.BitTimingFd` class have partial support according to the non-standard
slcan protocol implementation in the CANABLE 2.0 firmware: currently only data rates of 2M and 5M.
:param poll_interval:
Poll interval in seconds when reading messages
:param sleep_after_open:
Time to wait in seconds after opening serial connection
:param rtscts:
turn hardware handshake (RTS/CTS) on and off
:param listen_only:
If True, open interface/channel in listen mode with ``L`` command.
Otherwise, the (default) ``O`` command is still used. See ``open`` method.
:param timeout:
Timeout for the serial or usb device in seconds (default 0.001)
:raise ValueError: if both ``bitrate`` and ``btr`` are set or the channel is invalid
:raise CanInterfaceNotImplementedError: if the serial module is missing
:raise CanInitializationError: if the underlying serial connection could not be established
"""
self._listen_only = listen_only
if serial is None:
raise CanInterfaceNotImplementedError("The serial module is not installed")
btr: str | None = kwargs.get("btr", None)
if btr is not None:
warnings.warn(
"The 'btr' argument is deprecated since python-can v4.5.0 "
"and scheduled for removal in v5.0.0. "
"Use the 'timing' argument instead.",
DeprecationWarning,
stacklevel=1,
)
if not channel: # if None or empty
raise ValueError("Must specify a serial port.")
if "@" in channel:
channel, baudrate = channel.split("@")
tty_baudrate = int(baudrate)
with error_check(exception_type=CanInitializationError):
self.serialPortOrig = serial.serial_for_url(
channel,
baudrate=tty_baudrate,
rtscts=rtscts,
timeout=timeout,
)
self._queue: SimpleQueue[str] = SimpleQueue()
self._buffer = bytearray()
self._can_protocol = CanProtocol.CAN_20
time.sleep(sleep_after_open)
with error_check(exception_type=CanInitializationError):
if isinstance(timing, BitTiming):
timing = check_or_adjust_timing_clock(timing, valid_clocks=[8_000_000])
self.set_bitrate_reg(f"{timing.btr0:02X}{timing.btr1:02X}")
elif isinstance(timing, BitTimingFd):
self.set_bitrate(timing.nom_bitrate, timing.data_bitrate)
else:
if bitrate is not None and btr is not None:
raise ValueError("Bitrate and btr mutually exclusive.")
if bitrate is not None:
self.set_bitrate(bitrate)
if btr is not None:
self.set_bitrate_reg(btr)
self.open()
super().__init__(channel, **kwargs)
[docs]
def set_bitrate(self, bitrate: int, data_bitrate: int | None = None) -> None:
"""
:param bitrate:
Bitrate in bit/s
:param data_bitrate:
Data Bitrate in bit/s for FD frames
:raise ValueError: if ``bitrate`` is not among the possible values
"""
if bitrate in self._BITRATES:
bitrate_code = self._BITRATES[bitrate]
else:
bitrates = ", ".join(str(k) for k in self._BITRATES.keys())
raise ValueError(f"Invalid bitrate, choose one of {bitrates}.")
# If data_bitrate is None, we set it to 0 which means no data bitrate
if data_bitrate is None:
data_bitrate = 0
if data_bitrate in self._DATA_BITRATES:
dbitrate_code = self._DATA_BITRATES[data_bitrate]
else:
dbitrates = ", ".join(str(k) for k in self._DATA_BITRATES.keys())
raise ValueError(f"Invalid data bitrate, choose one of {dbitrates}.")
self.close()
self._write(bitrate_code)
self._write(dbitrate_code)
self.open()
[docs]
def set_bitrate_reg(self, btr: str) -> None:
"""
:param btr:
BTR register value to set custom can speed as a string `xxyy` where
xx is the BTR0 value in hex and yy is the BTR1 value in hex.
"""
self.close()
self._write("s" + btr)
self.open()
def _write(self, string: str) -> None:
with error_check("Could not write to serial device"):
self.serialPortOrig.write(string.encode() + self.LINE_TERMINATOR)
self.serialPortOrig.flush()
def _read(self, timeout: float | None) -> str | None:
_timeout = serial.Timeout(timeout)
with error_check("Could not read from serial device"):
while True:
# Due to accessing `serialPortOrig.in_waiting` too often will reduce the performance.
# We read the `serialPortOrig.in_waiting` only once here.
size = self.serialPortOrig.in_waiting or 1
self._buffer.extend(self.serialPortOrig.read(size))
for i, byte in enumerate(self._buffer):
if byte in (self._OK[0], self._ERROR[0]):
string = self._buffer[: i + 1].decode()
del self._buffer[: i + 1]
return string
if _timeout.expired():
break
return None
def flush(self) -> None:
self._buffer.clear()
with error_check("Could not flush"):
self.serialPortOrig.reset_input_buffer()
def open(self) -> None:
if self._listen_only:
self._write("L")
else:
self._write("O")
def close(self) -> None:
self._write("C")
def _recv_internal(self, timeout: float | None) -> tuple[Message | None, bool]:
canId = None
remote = False
extended = False
data = None
isFd = False
fdBrs = False
if self._queue.qsize():
string: str | None = self._queue.get_nowait()
else:
string = self._read(timeout)
if not string:
pass
elif string[0] in (
"T",
"x", # x is an alternative extended message identifier for CANDapter
):
# extended frame
canId = int(string[1:9], 16)
dlc = int(string[9])
extended = True
data = bytearray.fromhex(string[10 : 10 + dlc * 2])
elif string[0] == "t":
# normal frame
canId = int(string[1:4], 16)
dlc = int(string[4])
data = bytearray.fromhex(string[5 : 5 + dlc * 2])
elif string[0] == "r":
# remote frame
canId = int(string[1:4], 16)
dlc = int(string[4])
remote = True
elif string[0] == "R":
# remote extended frame
canId = int(string[1:9], 16)
dlc = int(string[9])
extended = True
remote = True
elif string[0] == "d":
# FD standard frame
canId = int(string[1:4], 16)
dlc = int(string[4], 16)
isFd = True
data = bytearray.fromhex(string[5 : 5 + CAN_FD_DLC[dlc] * 2])
elif string[0] == "D":
# FD extended frame
canId = int(string[1:9], 16)
dlc = int(string[9], 16)
extended = True
isFd = True
data = bytearray.fromhex(string[10 : 10 + CAN_FD_DLC[dlc] * 2])
elif string[0] == "b":
# FD with bitrate switch
canId = int(string[1:4], 16)
dlc = int(string[4], 16)
isFd = True
fdBrs = True
data = bytearray.fromhex(string[5 : 5 + CAN_FD_DLC[dlc] * 2])
elif string[0] == "B":
# FD extended with bitrate switch
canId = int(string[1:9], 16)
dlc = int(string[9], 16)
extended = True
isFd = True
fdBrs = True
data = bytearray.fromhex(string[10 : 10 + CAN_FD_DLC[dlc] * 2])
if canId is not None:
msg = Message(
arbitration_id=canId,
is_extended_id=extended,
timestamp=time.time(), # Better than nothing...
is_remote_frame=remote,
is_fd=isFd,
bitrate_switch=fdBrs,
dlc=CAN_FD_DLC[dlc],
data=data,
)
return msg, False
return None, False
[docs]
def send(self, msg: Message, timeout: float | None = None) -> None:
if timeout != self.serialPortOrig.write_timeout:
self.serialPortOrig.write_timeout = timeout
if msg.is_remote_frame:
if msg.is_extended_id:
sendStr = f"R{msg.arbitration_id:08X}{msg.dlc:d}"
else:
sendStr = f"r{msg.arbitration_id:03X}{msg.dlc:d}"
elif msg.is_fd:
fd_dlc = len2dlc(msg.dlc)
if msg.bitrate_switch:
if msg.is_extended_id:
sendStr = f"B{msg.arbitration_id:08X}{fd_dlc:X}"
else:
sendStr = f"b{msg.arbitration_id:03X}{fd_dlc:X}"
sendStr += msg.data.hex().upper()
else:
if msg.is_extended_id:
sendStr = f"D{msg.arbitration_id:08X}{fd_dlc:X}"
else:
sendStr = f"d{msg.arbitration_id:03X}{fd_dlc:X}"
sendStr += msg.data.hex().upper()
else:
if msg.is_extended_id:
sendStr = f"T{msg.arbitration_id:08X}{msg.dlc:d}"
else:
sendStr = f"t{msg.arbitration_id:03X}{msg.dlc:d}"
sendStr += msg.data.hex().upper()
self._write(sendStr)
[docs]
def shutdown(self) -> None:
super().shutdown()
self.close()
with error_check("Could not close serial socket"):
self.serialPortOrig.close()
def fileno(self) -> int:
try:
return cast("int", self.serialPortOrig.fileno())
except io.UnsupportedOperation:
raise NotImplementedError(
"fileno is not implemented using current CAN bus on this platform"
) from None
except Exception as exception:
raise CanOperationError("Cannot fetch fileno") from exception
[docs]
def get_version(self, timeout: float | None) -> tuple[int | None, int | None]:
"""Get HW and SW version of the slcan interface.
:param timeout:
seconds to wait for version or None to wait indefinitely
:returns: tuple (hw_version, sw_version)
WHERE
int hw_version is the hardware version or None on timeout
int sw_version is the software version or None on timeout
"""
_timeout = serial.Timeout(timeout)
cmd = "V"
self._write(cmd)
while True:
if string := self._read(_timeout.time_left()):
if string[0] == cmd:
# convert ASCII coded version
hw_version = int(string[1:3])
sw_version = int(string[3:5])
return hw_version, sw_version
else:
self._queue.put_nowait(string)
if _timeout.expired():
break
return None, None
[docs]
def get_serial_number(self, timeout: float | None) -> str | None:
"""Get serial number of the slcan interface.
:param timeout:
seconds to wait for serial number or :obj:`None` to wait indefinitely
:return:
:obj:`None` on timeout or a :class:`str` object.
"""
_timeout = serial.Timeout(timeout)
cmd = "N"
self._write(cmd)
while True:
if string := self._read(_timeout.time_left()):
if string[0] == cmd:
serial_number = string[1:-1]
return serial_number
else:
self._queue.put_nowait(string)
if _timeout.expired():
break
return None