Skip to content

Instantly share code, notes, and snippets.

@francisrstokes
Last active October 20, 2024 21:43
Show Gist options
  • Save francisrstokes/647bd9e5eb8536db4732e1ce197876e0 to your computer and use it in GitHub Desktop.
Save francisrstokes/647bd9e5eb8536db4732e1ce197876e0 to your computer and use it in GitHub Desktop.
Decodes SPI transactions for the ENC28J60 chip, recorded with and exported from the Saleae Logic 2 software
import pandas as pd
import sys
from enum import Enum
""" protocol-decoder.py <saleae csv file>
Takes a CSV file exported from the Saleae Logic 2 software, where the SPI analyzer was used to trace communication with an ENC28J60 Ethernet
chip from Microchip, and prints the instructions and responses that the communication represents. The following are decoded:
- Software resets
- Registers and their bitfields, for reads, writes, bit sets, and bit clears
- Buffer memory reads and writes
- PHY register writes
"""
# SPI instruction opcodes. The decoder obtains these by shifting the first
# MOSI byte of a transaction right by five bits.
RCR = 0b000
RBM = 0b001
WCR = 0b010
WBM = 0b011
BFS = 0b100
BFC = 0b101
SRC = 0b111

# Addresses of the registers listed under "bank_independent" in `regs`.
EIE = 0x1B
EIR = 0x1C
ESTAT = 0x1D
ECON2 = 0x1E
ECON1 = 0x1F

# Registers the decoder watches (while PHY_BANK is selected) in order to
# reassemble a PHY register write.
MIREGADR = 0x14
MIWRL = 0x16
MIWRH = 0x17
PHY_BANK = 2

# PHY register names keyed by the address written to MIREGADR.
phy_regs = [
    {"name": "PHCON1", "value": 0x00},
    {"name": "PHSTAT1", "value": 0x01},
    {"name": "PHID1", "value": 0x02},
    {"name": "PHID2", "value": 0x03},
    {"name": "PHCON2", "value": 0x10},
    {"name": "PHSTAT2", "value": 0x11},
    {"name": "PHIE", "value": 0x12},
    {"name": "PHIR", "value": 0x13},
    {"name": "PHLCON", "value": 0x14},
]
def find_phy_reg(reg):
    """Return the name of the PHY register whose address is `reg`.

    Looks the address up in the module-level `phy_regs` table.

    Raises:
        Exception: if no entry in `phy_regs` has that address.
    """
    name = next(
        (entry["name"] for entry in phy_regs if entry["value"] == reg),
        None,
    )
    if name is None:
        raise Exception(f"No such PHY register 0x{reg:02x}")
    return name
def get_mask_shift(mask):
    """Return the bit position of the least-significant set bit of `mask`.

    This is the amount a masked value must be shifted right so that the
    field selected by `mask` starts at bit 0.

    Args:
        mask: Non-zero integer bit mask.

    Returns:
        The zero-based index of the lowest set bit.

    Raises:
        ValueError: if `mask` is 0. A zero mask has no set bit, and scanning
            for one would never terminate.
    """
    if mask == 0:
        raise ValueError("mask must have at least one bit set")
    # `mask & -mask` isolates the lowest set bit; its bit_length is that
    # bit's index plus one.
    return (mask & -mask).bit_length() - 1
# Bit-field tables. Every entry is a (field name, mask) pair; the mask is
# written in binary so the position and width of the field can be read off
# directly. Fields are listed from the most significant bit downwards.

ERXFCON_BITS = [
    ("UCEN", 0b1000_0000),
    ("ANDOR", 0b0100_0000),
    ("CRCEN", 0b0010_0000),
    ("PMEN", 0b0001_0000),
    ("MPEN", 0b0000_1000),
    ("HTEN", 0b0000_0100),
    ("MCEN", 0b0000_0010),
    ("BCEN", 0b0000_0001),
]

MACON1_BITS = [
    ("TXPAUS", 0b0000_1000),
    ("RXPAUS", 0b0000_0100),
    ("PASSALL", 0b0000_0010),
    ("MARXEN", 0b0000_0001),
]

MACON3_BITS = [
    ("PADCFG", 0b1110_0000),  # three-bit field
    ("TXCRCEN", 0b0001_0000),
    ("PHDREN", 0b0000_1000),
    ("HFRMEN", 0b0000_0100),
    ("FRMLNEN", 0b0000_0010),
    ("FULDPX", 0b0000_0001),
]

MICMD_BITS = [
    ("MIISCAN", 0b0000_0010),
    ("MIIRD", 0b0000_0001),
]

ECON1_BITS = [
    ("TXRST", 0b1000_0000),
    ("RXRST", 0b0100_0000),
    ("DMAST", 0b0010_0000),
    ("CSUMEN", 0b0001_0000),
    ("TXRTS", 0b0000_1000),
    ("RXEN", 0b0000_0100),
    ("BSEL", 0b0000_0011),  # two-bit field
]

ECON2_BITS = [
    ("AUTOINC", 0b1000_0000),
    ("PKTDEC", 0b0100_0000),
    ("PWRSV", 0b0010_0000),
    ("VRPS", 0b0000_1000),
]

ESTAT_BITS = [
    ("INT", 0b1000_0000),
    ("BUFER", 0b0100_0000),
    ("LATECOL", 0b0001_0000),
    ("RXBUSY", 0b0000_0100),
    ("TXABRT", 0b0000_0010),
    ("CLKRDY", 0b0000_0001),
]

EIE_BITS = [
    ("INTIE", 0b1000_0000),
    ("PKTIE", 0b0100_0000),
    ("DMAIE", 0b0010_0000),
    ("LINKIE", 0b0001_0000),
    ("TXIE", 0b0000_1000),
    ("TXERIE", 0b0000_0010),
    ("RXERIE", 0b0000_0001),
]

EIR_BITS = [
    ("PKTIF", 0b0100_0000),
    ("DMAIF", 0b0010_0000),
    ("LINKIF", 0b0001_0000),
    ("TXIF", 0b0000_1000),
    ("TXERIF", 0b0000_0010),
    ("RXERIF", 0b0000_0001),
]

EBSTCON_BITS = [
    ("PSV2", 0b1000_0000),
    ("PSV1", 0b0100_0000),
    ("PSV0", 0b0010_0000),
    ("PSEL", 0b0001_0000),
    ("TMSEL1", 0b0000_1000),
    ("TMSEL0", 0b0000_0100),
    ("TME", 0b0000_0010),
    ("BISTST", 0b0000_0001),
]

MISTAT_BITS = [
    ("NVALID", 0b0000_0100),
    ("SCAN", 0b0000_0010),
    ("BUSY", 0b0000_0001),
]
class RegDef():
    """Static description of one control register.

    Attributes are plain data:
      name       -- register mnemonic used in the printed output
      value      -- register address within its bank
      is_mac_mii -- True when a read of this register is preceded by a dummy
                    byte (the decoder skips one extra byte for these)
      bit_defs   -- list of (field name, mask) pairs, possibly empty
    """

    def __init__(self, name, value, is_mac_mii, bit_defs=None):
        """Store the register description.

        Args:
            name: Register mnemonic.
            value: Register address.
            is_mac_mii: Whether reads return a dummy byte before the data.
            bit_defs: Optional list of (field name, mask) pairs. When omitted,
                the register gets its own empty list.
        """
        self.name = name
        self.value = value
        self.is_mac_mii = is_mac_mii
        # A `[]` default would be created once and shared by every register
        # built without bit definitions; allocate a fresh list instead.
        self.bit_defs = [] if bit_defs is None else bit_defs

    def has_bit_defs(self):
        """Return True when at least one bit field is defined."""
        return len(self.bit_defs) > 0

    def value_to_bits(self, value):
        """Render `value` as comma-separated "FIELD=0x<hex>" entries.

        Each field is masked out of `value` and shifted down so that it
        starts at bit 0 before being formatted.
        """
        return ",".join(
            f"{field}=0x{(mask & value) >> get_mask_shift(mask):x}"
            for field, mask in self.bit_defs
        )
# Register map used by the decoder.
#   "bank_independent": registers searched first, whichever bank is selected.
#   "banks": one list per bank, indexed by the currently selected bank number.
# The third RegDef argument is the is_mac_mii flag.
regs = {
    "bank_independent": [
        RegDef("EIE", 0x1B, False, bit_defs=EIE_BITS),
        RegDef("EIR", 0x1C, False, bit_defs=EIR_BITS),
        RegDef("ESTAT", 0x1D, False, bit_defs=ESTAT_BITS),
        RegDef("ECON2", 0x1E, False, bit_defs=ECON2_BITS),
        RegDef("ECON1", 0x1F, False, bit_defs=ECON1_BITS),
    ],
    "banks": [
        # Bank 0
        [
            RegDef("ERDPTL", 0x00, False),
            RegDef("ERDPTH", 0x01, False),
            RegDef("EWRPTL", 0x02, False),
            RegDef("EWRPTH", 0x03, False),
            RegDef("ETXSTL", 0x04, False),
            RegDef("ETXSTH", 0x05, False),
            RegDef("ETXNDL", 0x06, False),
            RegDef("ETXNDH", 0x07, False),
            RegDef("ERXSTL", 0x08, False),
            RegDef("ERXSTH", 0x09, False),
            RegDef("ERXNDL", 0x0A, False),
            RegDef("ERXNDH", 0x0B, False),
            RegDef("ERXRDPTL", 0x0C, False),
            RegDef("ERXRDPTH", 0x0D, False),
            RegDef("ERXWRPTL", 0x0E, False),
            RegDef("ERXWRPTH", 0x0F, False),
            RegDef("EDMASTL", 0x10, False),
            RegDef("EDMASTH", 0x11, False),
            RegDef("EDMANDL", 0x12, False),
            RegDef("EDMANDH", 0x13, False),
            RegDef("EDMADSTL", 0x14, False),
            RegDef("EDMADSTH", 0x15, False),
            RegDef("EDMACSL", 0x16, False),
            RegDef("EDMACSH", 0x17, False),
        ],
        # Bank 1
        [
            RegDef("EHT0", 0x00, False),
            RegDef("EHT1", 0x01, False),
            RegDef("EHT2", 0x02, False),
            RegDef("EHT3", 0x03, False),
            RegDef("EHT4", 0x04, False),
            RegDef("EHT5", 0x05, False),
            RegDef("EHT6", 0x06, False),
            RegDef("EHT7", 0x07, False),
            RegDef("EPMM0", 0x08, False),
            RegDef("EPMM1", 0x09, False),
            RegDef("EPMM2", 0x0A, False),
            RegDef("EPMM3", 0x0B, False),
            RegDef("EPMM4", 0x0C, False),
            RegDef("EPMM5", 0x0D, False),
            RegDef("EPMM6", 0x0E, False),
            RegDef("EPMM7", 0x0F, False),
            RegDef("EPMCSL", 0x10, False),
            RegDef("EPMCSH", 0x11, False),
            RegDef("EPMOL", 0x14, False),
            RegDef("EPMOH", 0x15, False),
            RegDef("EWOLIE", 0x16, False),
            RegDef("EWOLIR", 0x17, False),
            RegDef("ERXFCON", 0x18, False, bit_defs=ERXFCON_BITS),
            RegDef("EPKTCNT", 0x19, False),
        ],
        # Bank 2
        [
            RegDef("MACON1", 0x00, True, bit_defs=MACON1_BITS),
            RegDef("MACON2", 0x01, True),
            RegDef("MACON3", 0x02, True, bit_defs=MACON3_BITS),
            RegDef("MACON4", 0x03, True),
            RegDef("MABBIPG", 0x04, True),
            RegDef("MAIPGL", 0x06, True),
            RegDef("MAIPGH", 0x07, True),
            RegDef("MACLCON1", 0x08, True),
            RegDef("MACLCON2", 0x09, True),
            RegDef("MAMXFLL", 0x0A, True),
            RegDef("MAMXFLH", 0x0B, True),
            RegDef("MAPHSUP", 0x0D, True),
            RegDef("MICON", 0x11, True),
            RegDef("MICMD", 0x12, True, bit_defs=MICMD_BITS),
            RegDef("MIREGADR", 0x14, True),
            RegDef("MIWRL", 0x16, True),
            RegDef("MIWRH", 0x17, True),
            RegDef("MIRDL", 0x18, True),
            RegDef("MIRDH", 0x19, True),
        ],
        # Bank 3
        [
            RegDef("MAADR1", 0x00, True),
            RegDef("MAADR0", 0x01, True),
            RegDef("MAADR3", 0x02, True),
            RegDef("MAADR2", 0x03, True),
            RegDef("MAADR5", 0x04, True),
            RegDef("MAADR4", 0x05, True),
            RegDef("EBSTSD", 0x06, False),
            RegDef("EBSTCON", 0x07, False, bit_defs=EBSTCON_BITS),
            RegDef("EBSTCSL", 0x08, False),
            RegDef("EBSTCSH", 0x09, False),
            RegDef("MISTAT", 0x0A, True, bit_defs=MISTAT_BITS),
            RegDef("EREVID", 0x12, False),
            RegDef("ECOCON", 0x15, False),
            RegDef("EFLOCON", 0x17, False),
            RegDef("EPAUSL", 0x18, False),
            RegDef("EPAUSH", 0x19, False),
        ],
    ],
}
# The path to the exported capture is the only (required) argument.
if len(sys.argv) < 2:
    print(f"usage: {sys.argv[0]} <capture file>")
    # sys.exit() is always available; the bare exit() helper is only injected
    # by the `site` module and is meant for interactive sessions.
    sys.exit(1)

capture_file = sys.argv[1]
df = pd.read_csv(capture_file)

# Rename the columns positionally; this assumes the export has exactly these
# six columns in this order.
df.columns = ["protocol", "event_type", "start", "duration", "mosi", "miso"]

# Rows without a data value (presumably the enable/disable events -- verify
# against an export) become 0x00 so that every row yields one byte and the
# byte streams stay index-aligned with the event column.
df["mosi"] = df["mosi"].fillna("0x00")
df["miso"] = df["miso"].fillna("0x00")

# Extract two bytestreams from the csv data
mosi = bytearray(df["mosi"].apply(lambda x: int(x, 16)))
miso = bytearray(df["miso"].apply(lambda x: int(x, 16)))
events = df["event_type"]
class PHYPhase(Enum):
    """Which register write the decoder expects next in a PHY write sequence.

    The decoder steps through the members in order (MIREGADR, then MIWRL,
    then MIWRH) and returns to MIREGADR once MIWRH has been written.
    """

    MIREGADR = 0  # waiting for the PHY register address
    MIWRL = 1     # waiting for the low byte of the value
    MIWRH = 2     # waiting for the high byte of the value
class ENC28J60ProtocolDecoder():
    """Decode a captured SPI conversation and print each operation.

    The decoder walks three index-aligned sequences (MOSI bytes, MISO bytes
    and analyzer event names) with a single cursor, `stream_index`. It tracks
    the currently selected register bank so that register addresses can be
    resolved to names, and it reassembles PHY register writes from the
    MIREGADR / MIWRL / MIWRH write sequence.
    """

    def __init__(self, mosi, miso, events, log_bank_switches = False):
        """Create a decoder positioned at the start of the capture.

        Args:
            mosi: Indexable sequence of bytes sent to the chip.
            miso: Indexable sequence of bytes received from the chip.
            events: Indexable sequence of event names ("enable", "result",
                "disable"), aligned with `mosi` and `miso`.
            log_bank_switches: When True, print a line every time the
                tracked bank changes.
        """
        self.stream_index = 0
        self.current_bank = 0
        self.in_transmission = False
        self.log_bank_switches = log_bank_switches
        self.mosi = mosi
        self.miso = miso
        self.events = events
        self.phy_phase = PHYPhase.MIREGADR
        self.last_phy_addr = 0x00
        self.last_phy_value_l = 0x00
        self.last_phy_value_h = 0x00

    def _set_bank(self, new_bank):
        """Record a new bank selection, logging the switch when enabled."""
        prev_bank = self.current_bank
        self.current_bank = new_bank
        if prev_bank != self.current_bank and self.log_bank_switches:
            print(f" bank switch [{prev_bank} -> {self.current_bank}]")

    def _print_reg_op(self, label, reg_def, value, suffix=""):
        """Print one register operation line, with bit fields when defined."""
        line = f"{label:<13}{reg_def.name:<10}\t0x{value:02x}"
        if reg_def.has_bit_defs():
            line += f"\t[{reg_def.value_to_bits(value)}]"
        print(line + suffix)

    def _collect_until_disable(self, stream):
        """Skip the opcode byte, then gather bytes of `stream` up to CS disable.

        Returns the bytes formatted as comma-separated "0x.." strings. The
        cursor is left on the "disable" event.
        """
        self.stream_index += 1
        collected = []
        while self.events[self.stream_index] != "disable":
            collected.append(stream[self.stream_index])
            self.stream_index += 1
        return ",".join(f"0x{b:02x}" for b in collected)

    def find_reg(self, reg):
        """Resolve a 5-bit register address to its RegDef.

        Bank-independent registers are searched first, then the registers of
        the currently selected bank.

        Raises:
            Exception: if the address is not defined in either place.
        """
        for group in (regs["bank_independent"], regs["banks"][self.current_bank]):
            for reg_def in group:
                if reg_def.value == reg:
                    return reg_def
        raise Exception(f"Reg {reg:02x} in bank {self.current_bank} not found")

    def read_reg(self, reg):
        """Decode a register read; leaves the cursor after the data byte."""
        found_reg = self.find_reg(reg)
        # Skip the opcode byte, plus the dummy byte that precedes the data
        # when reading a MAC/MII register.
        self.stream_index += 2 if found_reg.is_mac_mii else 1
        read_value = self.miso[self.stream_index]
        self.stream_index += 1
        self._print_reg_op("[Read]:", found_reg, read_value)

    def write_reg(self, reg):
        """Decode a register write; leaves the cursor after the data byte.

        Writes to ECON1 update the tracked bank. Writes to MIREGADR, MIWRL
        and MIWRH (in that order, in the PHY bank) are combined into a PHY
        register write that is appended to the MIWRH line.
        """
        found_reg = self.find_reg(reg)
        self.stream_index += 1
        written_value = self.mosi[self.stream_index]
        was_phy_write = False
        in_phy_bank = self.current_bank == PHY_BANK

        if reg == ECON1:
            self._set_bank(written_value & 0x3)
        elif reg == MIREGADR and in_phy_bank:
            self.last_phy_addr = written_value
            self.phy_phase = PHYPhase.MIWRL
        elif reg == MIWRL and in_phy_bank and self.phy_phase == PHYPhase.MIWRL:
            self.last_phy_value_l = written_value
            self.phy_phase = PHYPhase.MIWRH
        elif reg == MIWRH and in_phy_bank and self.phy_phase == PHYPhase.MIWRH:
            self.last_phy_value_h = written_value
            self.phy_phase = PHYPhase.MIREGADR
            was_phy_write = True

        self.stream_index += 1

        suffix = ""
        if was_phy_write:
            phy_value = (self.last_phy_value_h << 8) | self.last_phy_value_l
            suffix = f"\t<(PHY) {find_phy_reg(self.last_phy_addr)} 0x{phy_value:04x}>"
        self._print_reg_op("[Write]:", found_reg, written_value, suffix)

    def clear_bits_reg(self, reg):
        """Decode a bit-field-clear; leaves the cursor after the data byte."""
        found_reg = self.find_reg(reg)
        self.stream_index += 1
        bits_cleared = self.mosi[self.stream_index]
        if reg == ECON1:
            # Clearing removes the given bits from the current selection.
            self._set_bank(self.current_bank & ~bits_cleared & 0x3)
        self.stream_index += 1
        self._print_reg_op("[Bit clear]:", found_reg, bits_cleared)

    def set_bits_reg(self, reg):
        """Decode a bit-field-set; leaves the cursor after the data byte."""
        found_reg = self.find_reg(reg)
        self.stream_index += 1
        bits_set = self.mosi[self.stream_index]
        if reg == ECON1:
            # Setting ORs the given bits into the current selection; bank
            # bits that are already set stay set.
            self._set_bank(self.current_bank | (bits_set & 0x3))
        self.stream_index += 1
        self._print_reg_op("[Bit set]:", found_reg, bits_set)

    def consume_cs_disable(self):
        """Require a "disable" event at the cursor and step past it.

        Raises:
            Exception: if the event at the cursor is not "disable".
        """
        event = self.events[self.stream_index]
        if event != "disable":
            raise Exception(f"Expected CS disable at stream index: {self.stream_index}")
        self.stream_index += 1
        self.in_transmission = False

    def write_buffer_memory(self):
        """Decode a buffer-memory write: every MOSI byte up to CS disable."""
        bytes_formatted = self._collect_until_disable(self.mosi)
        print(f"{'[Buf write]:':<13}\t\t{bytes_formatted}")

    def read_buffer_memory(self):
        """Decode a buffer-memory read: every MISO byte up to CS disable."""
        bytes_formatted = self._collect_until_disable(self.miso)
        print(f"{'[Buf read]:':<13}\t\t{bytes_formatted}")

    def parse(self):
        """Decode the whole capture, printing one line per operation.

        Raises:
            Exception: on an event that is not valid at the current position,
                an unknown opcode, or an unknown register address.
        """
        while self.stream_index < len(self.mosi):
            event = self.events[self.stream_index]

            if not self.in_transmission:
                if event != "enable":
                    raise Exception(f"Expected enable event, got '{event}' @ index {self.stream_index}")
                self.in_transmission = True
                self.stream_index += 1
                continue

            # In transmission
            if event == "disable":
                self.in_transmission = False
                self.stream_index += 1
                continue

            if event != "result":
                # Nothing below would advance the cursor for such an event,
                # so fail loudly instead of looping forever.
                raise Exception(f"Unexpected event '{event}' @ index {self.stream_index}")

            # First byte of a transaction: 3-bit opcode, 5-bit argument.
            command = self.mosi[self.stream_index]
            opcode = command >> 5
            argument = command & 0x1f

            if opcode == RCR:
                self.read_reg(argument)
                self.consume_cs_disable()
            elif opcode == RBM:
                self.read_buffer_memory()
                self.consume_cs_disable()
            elif opcode == WCR:
                self.write_reg(argument)
                self.consume_cs_disable()
            elif opcode == WBM:
                self.write_buffer_memory()
                self.consume_cs_disable()
            elif opcode == BFS:
                self.set_bits_reg(argument)
                self.consume_cs_disable()
            elif opcode == BFC:
                self.clear_bits_reg(argument)
                self.consume_cs_disable()
            elif opcode == SRC:
                # A soft reset returns the chip's registers to their reset
                # values, so the selected bank goes back to 0 and any
                # half-finished PHY write sequence is abandoned.
                self._set_bank(0)
                self.phy_phase = PHYPhase.MIREGADR
                print("[Reset]")
                # The following "disable" event is handled by the loop.
                self.stream_index += 1
            else:
                raise Exception(f"Invalid opcode: {opcode}")
# Decode the loaded capture and print every operation to stdout.
enc28j60_protocol_decoder = ENC28J60ProtocolDecoder(
    mosi=mosi,
    miso=miso,
    events=events,
)
enc28j60_protocol_decoder.parse()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment