Skip to content

Instantly share code, notes, and snippets.

@nbuchwitz
Created April 17, 2025 10:15
Show Gist options
  • Save nbuchwitz/eee269388816e4a6d84ad8fb24092573 to your computer and use it in GitHub Desktop.
Save nbuchwitz/eee269388816e4a6d84ad8fb24092573 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import argparse
import random
import subprocess
import sys
import time

import can
verbose = False
def print_verbose(message: str) -> None:
    """Write *message* to stdout only while the module-level ``verbose`` flag is truthy."""
    if not verbose:
        return
    print(message)
def check_interface_exists(interface: str) -> bool:
    """Report whether ``ip link show <interface>`` succeeds.

    The command's stdout and stderr are captured and discarded. A non-zero
    exit status prints an error line and returns ``False``; a zero exit
    status returns ``True``.
    """
    probe_cmd = ["ip", "link", "show", interface]
    try:
        subprocess.run(
            probe_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
        )
    except subprocess.CalledProcessError:
        print(f"Error: Interface {interface} does not exist.")
        return False

    print_verbose(f"Interface {interface} exists.")
    return True
def configure_can_interface(
    interface: str, bitrate: int, termination: bool = None
) -> None:
    """Reconfigure *interface* via ``sudo ip link`` and bring it back up.

    Sequence: link down, set the CAN bitrate, optionally set
    ``termination 120`` (only when *termination* is truthy), link up.
    If any of the commands exits non-zero, an error line is printed and the
    process terminates with exit status 1.
    """
    # Shared argv prefixes for the individual `ip link set ...` invocations.
    link_set = ["sudo", "ip", "link", "set", interface]
    can_type = link_set + ["type", "can"]

    try:
        # The link is taken down before bitrate/termination are changed.
        subprocess.run(link_set + ["down"], check=True)
        subprocess.run(can_type + ["bitrate", str(bitrate)], check=True)

        if termination:
            subprocess.run(can_type + ["termination", "120"], check=True)
            print_verbose(f"Termination enabled on {interface}.")

        subprocess.run(link_set + ["up"], check=True)
        print_verbose(f"Configured {interface} with bitrate {bitrate} bps.")
    except subprocess.CalledProcessError as err:
        print(f"Error: Failed to configure {interface}. {err}")
        sys.exit(1)
def run_host_mode(interface: str) -> None:
    """Echo every received CAN data frame back with its payload reversed.

    Opens *interface* as a SocketCAN channel and loops over incoming frames
    until interrupted with CTRL+C. Each reply reuses the arbitration ID and
    the standard/extended ID format of the frame it answers. Error frames
    are skipped instead of being echoed as if they were data frames.

    A failed send is reported and the loop continues. An ``OSError`` (for
    example when the channel cannot be opened) prints an error line and
    terminates the process with exit status 1.
    """
    try:
        # The `interface=` keyword selects python-can's backend (it replaces
        # the deprecated `bustype=`); this function's own `interface`
        # argument is the channel name, e.g. "can0".
        with can.interface.Bus(channel=interface, interface="socketcan") as bus:
            print_verbose(f"Listening for messages on {interface}...")
            for msg in bus:
                if msg is None:
                    continue
                if msg.is_error_frame:
                    # Bus error notifications carry no payload to echo.
                    print_verbose(f"Ignoring error frame: {msg}")
                    continue

                print_verbose(f"Received message: {msg}")
                response = can.Message(
                    arbitration_id=msg.arbitration_id,
                    data=msg.data[::-1],
                    is_extended_id=msg.is_extended_id,
                )
                try:
                    bus.send(response)
                    print_verbose(f"Responded with reversed message: {response}")
                except can.CanError:
                    print("Error: Failed to send response message.")
    except KeyboardInterrupt:
        print_verbose("Got CTRL+C. Exiting ...")
    except OSError:
        print(
            f"Error: Interface {interface} could not be opened. Make sure it's available and up."
        )
        sys.exit(1)
def _wait_for_reply(bus, request, timeout):
    """Return the first data frame answering *request*, or ``None`` on timeout.

    Error frames and frames whose arbitration ID / ID format differ from
    *request* are ignored. *timeout* is the total time budget in seconds;
    ``None`` waits indefinitely.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        if deadline is None:
            remaining = None
        else:
            # Clamp at zero so an exhausted budget still does one final poll.
            remaining = max(deadline - time.monotonic(), 0.0)

        frame = bus.recv(timeout=remaining)
        if frame is None:
            return None

        is_reply = (
            not frame.is_error_frame
            and frame.arbitration_id == request.arbitration_id
            and frame.is_extended_id == request.is_extended_id
        )
        if is_reply:
            return frame

        print_verbose(f"Ignoring unrelated frame: {frame}")
        # Stop even if unrelated traffic keeps the receive queue non-empty.
        if deadline is not None and time.monotonic() >= deadline:
            return None


def run_dut_mode(interface: str, timeout: int = 5) -> bool:
    """Send one random CAN frame and verify that the payload comes back reversed.

    Args:
        interface: SocketCAN channel name, e.g. ``"can0"``.
        timeout: Total number of seconds to wait for the matching reply.

    Returns:
        ``True`` when a frame with the same arbitration ID arrives within
        *timeout* and its payload equals the reversed request payload;
        ``False`` when no such frame arrives, the payload differs, or a
        ``can.CanError`` is raised.

    An ``OSError`` (for example when the channel cannot be opened) prints an
    error line and terminates the process with exit status 1.
    """
    try:
        # The `interface=` keyword selects python-can's backend (it replaces
        # the deprecated `bustype=`); this function's own `interface`
        # argument is the channel name.
        with can.interface.Bus(channel=interface, interface="socketcan") as bus:
            # Payload layout: 0x00 0x00, 1-4 random bytes, 0xff 0xff (5-8 bytes).
            # The leading and trailing runs of identical bits are there so
            # that stuff bits and the end of frame are exercised.
            # See https://de.wikipedia.org/wiki/Controller_Area_Network#Frame-Aufbau
            data = bytearray(b"\x00\x00")
            data += bytearray(random.getrandbits(8) for _ in range(random.randint(1, 4)))
            data += bytearray(b"\xff\xff")

            message = can.Message(
                arbitration_id=random.randint(0, 0x7FF), data=data, is_extended_id=False
            )
            print_verbose(f"Sending message: {message}")
            bus.send(message)

            expected_data = data[::-1]

            # Skip error frames and unrelated traffic rather than judging
            # whichever frame happens to arrive first.
            response = _wait_for_reply(bus, message, timeout)
            if response is None:
                print("Error: No response received.")
                return False

            print_verbose(f"Received response: {response}")

            if response.data == expected_data:
                print_verbose("Success: Received reversed message.")
                return True

            print("Error: Response data does not match expected reversed message.")
            return False
    except can.CanError:
        print("Error: Failed to send CAN message.")
        return False
    except OSError:
        print(
            f"Error: Interface {interface} could not be opened. Make sure it's available and up."
        )
        sys.exit(1)
def main(mode: str, interface: str, bitrate: int, termination: bool) -> None:
    """Print the chosen settings, configure the interface and run *mode*.

    Args:
        mode: ``"dut"`` to send one test frame and check the reply, or
            ``"host"`` to echo reversed frames until interrupted.
        interface: CAN interface name, e.g. ``"can0"``.
        bitrate: Bitrate in bit/s to configure on the interface.
        termination: Whether to enable termination on the interface.

    Exits the process with status 1 when the interface does not exist or
    when the DUT test fails, so calling scripts can rely on the exit code.
    """
    print(f"Mode: {mode}")
    print(f"Interface: {interface}")
    print(f"Bitrate: {bitrate} bps")
    print(f"Termination: {'Enabled' if termination else 'Disabled'}")

    if not check_interface_exists(interface):
        sys.exit(1)

    configure_can_interface(interface, bitrate, termination)

    if mode == "dut":
        if run_dut_mode(interface):
            print("SUCCESS: DUT test passed")
        else:
            print("ERROR: DUT test failed")
            # A failed test must not look like success to the calling shell.
            sys.exit(1)
    elif mode == "host":
        run_host_mode(interface)
if __name__ == "__main__":
    # Command-line entry point: parse the options, publish the verbosity
    # flag at module level, then delegate to main().
    cli = argparse.ArgumentParser(description="Test CAN interface on DUT or Host.")
    cli.add_argument(
        "mode",
        choices=["dut", "host"],
        help="Mode to operate in: 'dut' or 'host'",
    )
    cli.add_argument(
        "-i",
        "--interface",
        default="can0",
        help="Interface name (default: can0)",
    )
    cli.add_argument(
        "-b",
        "--bitrate",
        type=int,
        default=1000000,
        help="Bitrate for CAN interface (default: 1000000 bps)",
    )
    cli.add_argument(
        "-t",
        "--termination",
        action="store_true",
        help="Enable termination on the CAN interface.",
    )
    cli.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Enable verbose output.",
    )
    opts = cli.parse_args()

    # print_verbose() reads this module-level flag.
    verbose = opts.verbose

    main(opts.mode, opts.interface, opts.bitrate, opts.termination)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment