import argparse
import re
from collections.abc import Sequence
from typing import Any
import can
from can.typechecking import CanFilter, TAdditionalCliArgs
from can.util import _dict2timing, cast_from_string
[docs]
def add_bus_arguments(
parser: argparse.ArgumentParser,
*,
filter_arg: bool = False,
prefix: str | None = None,
group_title: str | None = None,
) -> None:
"""Adds CAN bus configuration options to an argument parser.
:param parser:
The argument parser to which the options will be added.
:param filter_arg:
Whether to include the filter argument.
:param prefix:
An optional prefix for the argument names, allowing configuration of multiple buses.
:param group_title:
The title of the argument group. If not provided, a default title will be generated
based on the prefix. For example, "bus arguments (prefix)" if a prefix is specified,
or "bus arguments" otherwise.
"""
if group_title is None:
group_title = f"bus arguments ({prefix})" if prefix else "bus arguments"
group = parser.add_argument_group(group_title)
flags = [f"--{prefix}-channel"] if prefix else ["-c", "--channel"]
dest = f"{prefix}_channel" if prefix else "channel"
group.add_argument(
*flags,
dest=dest,
default=argparse.SUPPRESS,
metavar="CHANNEL",
help=r"Most backend interfaces require some sort of channel. For "
r"example with the serial interface the channel might be a rfcomm"
r' device: "/dev/rfcomm0". With the socketcan interface valid '
r'channel examples include: "can0", "vcan0".',
)
flags = [f"--{prefix}-interface"] if prefix else ["-i", "--interface"]
dest = f"{prefix}_interface" if prefix else "interface"
group.add_argument(
*flags,
dest=dest,
default=argparse.SUPPRESS,
choices=sorted(can.VALID_INTERFACES),
help="""Specify the backend CAN interface to use. If left blank,
fall back to reading from configuration files.""",
)
flags = [f"--{prefix}-bitrate"] if prefix else ["-b", "--bitrate"]
dest = f"{prefix}_bitrate" if prefix else "bitrate"
group.add_argument(
*flags,
dest=dest,
type=int,
default=argparse.SUPPRESS,
metavar="BITRATE",
help="Bitrate to use for the CAN bus.",
)
flags = [f"--{prefix}-fd"] if prefix else ["--fd"]
dest = f"{prefix}_fd" if prefix else "fd"
group.add_argument(
*flags,
dest=dest,
default=argparse.SUPPRESS,
action="store_true",
help="Activate CAN-FD support",
)
flags = [f"--{prefix}-data-bitrate"] if prefix else ["--data-bitrate"]
dest = f"{prefix}_data_bitrate" if prefix else "data_bitrate"
group.add_argument(
*flags,
dest=dest,
type=int,
default=argparse.SUPPRESS,
metavar="DATA_BITRATE",
help="Bitrate to use for the data phase in case of CAN-FD.",
)
flags = [f"--{prefix}-timing"] if prefix else ["--timing"]
dest = f"{prefix}_timing" if prefix else "timing"
group.add_argument(
*flags,
dest=dest,
action=_BitTimingAction,
nargs=argparse.ONE_OR_MORE,
default=argparse.SUPPRESS,
metavar="TIMING_ARG",
help="Configure bit rate and bit timing. For example, use "
"`--timing f_clock=8_000_000 tseg1=5 tseg2=2 sjw=2 brp=2 nof_samples=1` for classical CAN "
"or `--timing f_clock=80_000_000 nom_tseg1=119 nom_tseg2=40 nom_sjw=40 nom_brp=1 "
"data_tseg1=29 data_tseg2=10 data_sjw=10 data_brp=1` for CAN FD. "
"Check the python-can documentation to verify whether your "
"CAN interface supports the `timing` argument.",
)
if filter_arg:
flags = [f"--{prefix}-filter"] if prefix else ["--filter"]
dest = f"{prefix}_can_filters" if prefix else "can_filters"
group.add_argument(
*flags,
dest=dest,
nargs=argparse.ONE_OR_MORE,
action=_CanFilterAction,
default=argparse.SUPPRESS,
metavar="{<can_id>:<can_mask>,<can_id>~<can_mask>}",
help="R|Space separated CAN filters for the given CAN interface:"
"\n <can_id>:<can_mask> (matches when <received_can_id> & mask =="
" can_id & mask)"
"\n <can_id>~<can_mask> (matches when <received_can_id> & mask !="
" can_id & mask)"
"\nFx to show only frames with ID 0x100 to 0x103 and 0x200 to 0x20F:"
"\n python -m can.viewer --filter 100:7FC 200:7F0"
"\nNote that the ID and mask are always interpreted as hex values",
)
flags = [f"--{prefix}-bus-kwargs"] if prefix else ["--bus-kwargs"]
dest = f"{prefix}_bus_kwargs" if prefix else "bus_kwargs"
group.add_argument(
*flags,
dest=dest,
action=_BusKwargsAction,
nargs=argparse.ONE_OR_MORE,
default=argparse.SUPPRESS,
metavar="BUS_KWARG",
help="Pass keyword arguments down to the instantiation of the bus class. "
"For example, `-i vector -c 1 --bus-kwargs app_name=MyCanApp serial=1234` is equivalent "
"to opening the bus with `can.Bus('vector', channel=1, app_name='MyCanApp', serial=1234)",
)
[docs]
def create_bus_from_namespace(
namespace: argparse.Namespace,
*,
prefix: str | None = None,
**kwargs: Any,
) -> can.BusABC:
"""Creates and returns a CAN bus instance based on the provided namespace and arguments.
:param namespace:
The namespace containing parsed arguments.
:param prefix:
An optional prefix for the argument names, enabling support for multiple buses.
:param kwargs:
Additional keyword arguments to configure the bus.
:return:
A CAN bus instance.
"""
config: dict[str, Any] = {"single_handle": True, **kwargs}
for keyword in (
"channel",
"interface",
"bitrate",
"fd",
"data_bitrate",
"can_filters",
"timing",
"bus_kwargs",
):
prefixed_keyword = f"{prefix}_{keyword}" if prefix else keyword
if prefixed_keyword in namespace:
value = getattr(namespace, prefixed_keyword)
if keyword == "bus_kwargs":
config.update(value)
else:
config[keyword] = value
try:
return can.Bus(**config)
except Exception as exc:
err_msg = f"Unable to instantiate bus from arguments {vars(namespace)}."
raise argparse.ArgumentError(None, err_msg) from exc
class _CanFilterAction(argparse.Action):
def __call__(
self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: str | Sequence[Any] | None,
option_string: str | None = None,
) -> None:
if not isinstance(values, list):
raise argparse.ArgumentError(self, "Invalid filter argument")
print(f"Adding filter(s): {values}")
can_filters: list[CanFilter] = []
for filt in values:
if ":" in filt:
parts = filt.split(":")
can_id = int(parts[0], base=16)
can_mask = int(parts[1], base=16)
elif "~" in filt:
parts = filt.split("~")
can_id = int(parts[0], base=16) | 0x20000000 # CAN_INV_FILTER
can_mask = int(parts[1], base=16) & 0x20000000 # socket.CAN_ERR_FLAG
else:
raise argparse.ArgumentError(self, "Invalid filter argument")
can_filters.append({"can_id": can_id, "can_mask": can_mask})
setattr(namespace, self.dest, can_filters)
class _BitTimingAction(argparse.Action):
def __call__(
self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: str | Sequence[Any] | None,
option_string: str | None = None,
) -> None:
if not isinstance(values, list):
raise argparse.ArgumentError(self, "Invalid --timing argument")
timing_dict: dict[str, int] = {}
for arg in values:
try:
key, value_string = arg.split("=")
value = int(value_string)
timing_dict[key] = value
except ValueError:
raise argparse.ArgumentError(
self, f"Invalid timing argument: {arg}"
) from None
if not (timing := _dict2timing(timing_dict)):
err_msg = "Invalid --timing argument. Incomplete parameters."
raise argparse.ArgumentError(self, err_msg)
setattr(namespace, self.dest, timing)
print(timing)
class _BusKwargsAction(argparse.Action):
def __call__(
self,
parser: argparse.ArgumentParser,
namespace: argparse.Namespace,
values: str | Sequence[Any] | None,
option_string: str | None = None,
) -> None:
if not isinstance(values, list):
raise argparse.ArgumentError(self, "Invalid --bus-kwargs argument")
bus_kwargs: dict[str, str | int | float | bool] = {}
for arg in values:
try:
match = re.match(
r"^(?P<name>[_a-zA-Z][_a-zA-Z0-9]*)=(?P<value>\S*?)$",
arg,
)
if not match:
raise ValueError
key = match["name"].replace("-", "_")
string_val = match["value"]
bus_kwargs[key] = cast_from_string(string_val)
except ValueError:
raise argparse.ArgumentError(
self,
f"Unable to parse bus keyword argument '{arg}'",
) from None
setattr(namespace, self.dest, bus_kwargs)
def _add_extra_args(
parser: argparse.ArgumentParser | argparse._ArgumentGroup,
) -> None:
parser.add_argument(
"extra_args",
nargs=argparse.REMAINDER,
help="The remaining arguments will be used for logger/player initialisation. "
"For example, `can_logger -i virtual -c test -f logfile.blf --compression-level=9` "
"passes the keyword argument `compression_level=9` to the BlfWriter.",
)
def _parse_additional_config(unknown_args: Sequence[str]) -> TAdditionalCliArgs:
for arg in unknown_args:
if not re.match(r"^--[a-zA-Z][a-zA-Z0-9\-]*=\S*?$", arg):
raise ValueError(f"Parsing argument {arg} failed")
def _split_arg(_arg: str) -> tuple[str, str]:
left, right = _arg.split("=", 1)
return left.lstrip("-").replace("-", "_"), right
args: dict[str, str | int | float | bool] = {}
for key, string_val in map(_split_arg, unknown_args):
args[key] = cast_from_string(string_val)
return args
def _set_logging_level_from_namespace(namespace: argparse.Namespace) -> None:
if "verbosity" in namespace:
logging_level_names = [
"critical",
"error",
"warning",
"info",
"debug",
"subdebug",
]
can.set_logging_level(logging_level_names[min(5, namespace.verbosity)])