@bbartling
Last active January 19, 2026 14:46
BACnet Auto-Scan to CSV with bacpypes3, plus an enhanced tester.py built on the bacpypes3 console

BACnet Diagnostic Tools

Two scripts for discovering, auditing, and manually testing BACnet networks.

1. Install Requirements

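Run this once to install the libraries the scanner needs:

pip install bacpypes3 ifaddr rdflib

tester.py also imports pyshacl and ontoenv at startup, so install those as well before using the shell:

pip install pyshacl ontoenv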

2. Auto-Scan (bacnet_autoscan.py)

Scans a range of device IDs and saves a CSV inventory for each device found. Captures names, values, units, and priority arrays.

Run the scan to output CSV file only:

python bacnet_autoscan.py --low-instance 1 --high-instance 3456999 --output-dir autoscan_csv

Output:

  • Check the autoscan_csv/ folder.
  • You will see files like audit_3456789.csv.
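
The audit files are plain CSV, so they are easy to post-process. The following is a minimal sketch, assuming the column headers that bacnet_autoscan.py writes; the sample rows and the helper name `overridden_points` are made up for illustration. It lists the points that currently have an active priority-array slot:

```python
import csv
import io

# Hypothetical sample in the same column layout that bacnet_autoscan.py writes.
SAMPLE = """DeviceID,IP,ObjType,ObjInst,Name,Description,PresentValue,Units/StateText,Reliability,OutOfService,StatusFlags,PriorityArray(Active)
3456789,192.168.1.20,analog-input,1,ZoneTemp,,72.1,degrees-fahrenheit,,0,,
3456789,192.168.1.20,analog-output,1,DamperCmd,,55.0,percent,,0,,{8: 55.0}
"""


def overridden_points(csv_file):
    """Return (name, active slots) for rows whose priority column is non-empty."""
    reader = csv.DictReader(csv_file)
    return [
        (row["Name"], row["PriorityArray(Active)"])
        for row in reader
        if row["PriorityArray(Active)"]
    ]


if __name__ == "__main__":
    for name, slots in overridden_points(io.StringIO(SAMPLE)):
        print(f"{name}: {slots}")
```

To run it against a real scan, pass `open("autoscan_csv/audit_3456789.csv", newline="")` instead of the in-memory sample.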

Run the scan to output CSV and ASHRAE 223P model:

python bacnet_autoscan.py \
  --low-instance 1 \
  --high-instance 3456999 \
  --output-dir autoscan_csv \
  --output-223p autoscan_223p.ttl \
  --model-base-uri urn:my-building

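Output:

  • Per-device CSVs in autoscan_csv/ (e.g., audit_<device-instance>.csv)
  • One combined Turtle file, autoscan_223p.ttl, covering the same Who-Is range

Note: shapes.ttl and the shell's auto-injected bacnet: prefix both assume the default base URI (urn:bacnet-autoscan). With --model-base-uri urn:my-building the model's BACnet namespace becomes urn:my-building/bacnet#, so either update the bacnet: prefix in shapes.ttl and declare your own PREFIX lines in SPARQL queries, or omit --model-base-uri.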
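
The URIs in the Turtle file are derived from --model-base-uri by simple string concatenation. The sketch below mirrors that derivation with plain strings instead of rdflib, so the function names `model_namespaces` and `point_uri` are illustrative rather than part of the script's interface:

```python
def model_namespaces(model_base_uri: str) -> dict:
    """Mirror how the scanner derives its two namespaces from --model-base-uri."""
    base = model_base_uri.rstrip("/")
    return {"bacnet": base + "/bacnet#", "ex": base + "/entity/"}


def point_uri(model_base_uri: str, device_instance: int, obj_type: str, obj_inst: int) -> str:
    """Build the URI string for one point, following Telemetry223PBuilder.point_uri."""
    safe_type = str(obj_type).replace(" ", "_")
    ns = model_namespaces(model_base_uri)
    return f"{ns['ex']}point/{device_instance}/{safe_type}/{obj_inst}"


if __name__ == "__main__":
    print(model_namespaces("urn:my-building"))
    print(point_uri("urn:my-building", 3456789, "analog-input", 1))
```

Any SPARQL query or SHACL shape that refers to bacnet: terms has to use the same namespace string that this derivation produces for your base URI.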

TODO:

  • Validate or enrich the 223P model afterward (BuildingMOTIF, SHACL, inference). The extra tooling can be installed with:

pip install pyshacl ontoenv buildingmotif

3. Interactive Shell (tester.py)

A command-line shell for manually reading, writing, and overriding BACnet points. It also includes integrated RDF tools for querying (SPARQL) and validating (SHACL) your generated ASHRAE 223P models without leaving the console.

Start the shell:

python tester.py

(If you need to bind to a specific IP, add --address 192.168.1.X/24)
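
The /24 suffix matters because a local Who-Is is sent to the subnet's broadcast address, which follows from the prefix length. This standard-library sketch is not bacpypes3 code, and the addresses and the helper name `broadcast_for` are only examples. It shows the calculation the address string implies:

```python
import ipaddress


def broadcast_for(interface_cidr: str) -> str:
    """Return the broadcast address implied by an interface address with a prefix length."""
    iface = ipaddress.ip_interface(interface_cidr)
    return str(iface.network.broadcast_address)


if __name__ == "__main__":
    print(broadcast_for("192.168.1.10/24"))
```

If devices on the same wire never answer whois, a wrong or missing prefix length is one of the first things worth checking.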

Shell Command Cheat Sheet

Once inside the shell (>), type these commands:

Discovery

> whois                     # Find all devices
> whois 1000 2000           # Find devices in range 1000-2000
> objects 192.168.1.20 123  # List all objects on device 123 (falls back to index-by-index reads if the bulk read fails)
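
The fallback used by objects (and by the scanner) follows a common BACnet pattern: try to read the whole object-list array, and if the device refuses, read element 0 for the length and then each element in turn. The sketch below reproduces only the control flow. `FakeDevice` is a hypothetical stand-in for a controller, and `RuntimeError` stands in for the abort or reject that bacpypes3 would raise:

```python
import asyncio


class FakeDevice:
    """Hypothetical stand-in for a controller that may reject bulk object-list reads."""

    def __init__(self, objects, supports_bulk):
        self.objects = objects
        self.supports_bulk = supports_bulk

    async def read_object_list(self, array_index=None):
        if array_index is None:
            if not self.supports_bulk:
                raise RuntimeError("bulk read rejected")
            return list(self.objects)
        if array_index == 0:
            # Index 0 of a BACnet array holds its length.
            return len(self.objects)
        return self.objects[array_index - 1]


async def read_objects_with_fallback(device):
    """Try the bulk read first, then fall back to one read per array element."""
    try:
        return await device.read_object_list()
    except RuntimeError:
        pass
    count = await device.read_object_list(array_index=0)
    return [await device.read_object_list(array_index=i) for i in range(1, count + 1)]


if __name__ == "__main__":
    dev = FakeDevice(["analog-input,1", "binary-output,1"], supports_bulk=False)
    print(asyncio.run(read_objects_with_fallback(dev)))
```

The fallback costs one round trip per object, which is why large devices take noticeably longer to list when it kicks in.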

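Reading & Writing Data

Format: write <IP> <Object> <Property> <Value> [Priority]

Priority is optional for ordinary writes but required when writing null, which releases that priority slot.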

> read 192.168.1.20 analog-input,1 present-value
> write 192.168.1.20 binary-output,1 present-value active 8    # Turn ON (Priority 8)
> write 192.168.1.20 binary-output,1 present-value null 8      # Release Override

Checking Priorities

Always check what is controlling a point before you override it.

> priority 192.168.1.20 binary-output,1
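
For readers new to commandable objects: a BACnet priority array has 16 slots, the lowest-numbered non-null slot wins, and when every slot is null the object falls back to its relinquish default. The sketch below models that rule in plain Python. The function name and the sample values are illustrative, and it does not talk to a device:

```python
from typing import List, Optional


def effective_value(priority_array: List[Optional[float]], relinquish_default: float) -> float:
    """Return the value a commandable object acts on, given its 16 priority slots."""
    for value in priority_array:  # index 0 is priority 1, the highest
        if value is not None:
            return value
    return relinquish_default


if __name__ == "__main__":
    slots: List[Optional[float]] = [None] * 16
    slots[15] = 70.0                      # a schedule writing at priority 16
    print(effective_value(slots, 68.0))   # the schedule's value is in control
    slots[7] = 72.0                       # operator override: write ... 72.0 8
    print(effective_value(slots, 68.0))   # priority 8 now wins
    slots[7] = None                       # release: write ... null 8
    print(effective_value(slots, 68.0))   # control returns to priority 16
```

This is why writing null at the priority you used is the clean way to end an override: it hands control back to whatever sits below instead of leaving a stale value in place.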

Model Tools (RDF / 223P)

Run queries and validation against your generated Turtle model (the file passed to --output-223p, called model.ttl in the examples below).

# Run a SHACL validation report (Pass/Fail)
> shacl model.ttl shapes.ttl

# Run a SPARQL query (Dump first 10 items)
> sparql model.ttl "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10"

# Find all Devices in the model
> sparql model.ttl "SELECT ?device ?name WHERE { ?device a bacnet:BACnetDevice ; rdfs:label ?name }"

# Find all 'Temp' sensors and their values
> sparql model.ttl "SELECT ?point ?val WHERE { ?point a s223:QuantifiableObservableProperty ; rdfs:label ?name ; bacnet:presentValue ?val . FILTER regex(?name, 'Temp', 'i') }"
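
The queries above use s223:, bacnet:, rdf: and rdfs: without declaring them because the shell prepends a default set of PREFIX lines whenever the query text does not already contain the word PREFIX. The sketch below isolates that rule. The helper name `with_default_prefixes` is illustrative, while the prefix strings are the ones tester.py uses:

```python
DEFAULT_PREFIXES = (
    "PREFIX s223: <http://data.ashrae.org/standard223#>\n"
    "PREFIX bacnet: <urn:bacnet-autoscan/bacnet#>\n"
    "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n"
    "PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>\n"
)


def with_default_prefixes(query: str) -> str:
    """Prepend the default prefixes unless the query already mentions PREFIX."""
    if "PREFIX" not in query.upper():
        return DEFAULT_PREFIXES + query
    return query


if __name__ == "__main__":
    print(with_default_prefixes("SELECT ?s WHERE { ?s a s223:ObservableProperty }"))
```

The check is all-or-nothing: a query that declares even one PREFIX of its own gets none of the defaults, so it must declare every prefix it uses.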

Test Bench Hammers (Drills)

Use these sequences to test real hardware response and model accuracy.

Hardware Drill

> whois 1000 3456799
> read 192.168.204.13 analog-input,1 present-value
> priority 192.168.204.14 analog-output,1
> write 192.168.204.14 analog-output,1 present-value 999.8 9
> write 192.168.204.14 analog-output,1 present-value null 9
> priority 192.168.204.14 analog-output,1

Data Drill

> sparql model.ttl "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10"
> sparql model.ttl "SELECT ?point ?val WHERE { ?point a s223:QuantifiableObservableProperty ; bacnet:presentValue ?val }"

Exit

> exit

Under the Hood

The tester.py script runs a single Python process that manages distinct workloads to keep the system responsive:

  1. The BACnet Stack: It maintains an active UDP socket listener on port 47808 using bacpypes3. This runs on the main event loop, allowing it to receive I-Am and COV notifications asynchronously while you type.
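  2. The RDF Engine: When you run sparql or shacl, the script hands the blocking work (parsing the Turtle file, then querying or validating it) to a worker thread with asyncio.to_thread. The event loop stays free to service the BACnet socket while the graph is processed, so validating a large model should not stall I-Am handling or in-flight reads. Two caveats: the model file is re-parsed on every command, and because CPython threads share the GIL, very large graphs can still add some latency to network handling.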
  3. The SHACL Validator: This tool applies the logic defined in shapes.ttl to your data graph. Think of shapes.ttl as a Pydantic Model for your graph. Just as Pydantic enforces that a Python dictionary has the correct keys and value types (e.g., ip: str), SHACL enforces that your graph nodes possess the required RDF properties (e.g., bacnet:deviceInstance). It transforms the graph from a simple collection of data points into a validated, compliant model.
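
The thread hand-off described in item 2 can be seen in isolation with a small experiment. This is a sketch, not code from tester.py: `heavy_graph_job` and `demo` are made-up names, and `time.sleep` stands in for the blocking rdflib or pyshacl call. Since `time.sleep` releases the GIL, real graph work would leave the loop with more jitter than this shows. It requires Python 3.9+ for `asyncio.to_thread`:

```python
import asyncio
import time


def heavy_graph_job(seconds: float) -> str:
    """Stand-in for a blocking SPARQL or SHACL run."""
    time.sleep(seconds)
    return "done"


async def demo() -> tuple:
    """Run the blocking job in a thread while a heartbeat keeps ticking on the loop."""
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    beat = asyncio.create_task(heartbeat())
    result = await asyncio.to_thread(heavy_graph_job, 0.2)
    beat.cancel()
    return result, ticks


if __name__ == "__main__":
    result, ticks = asyncio.run(demo())
    print(f"job result: {result}, heartbeat ticks while it ran: {ticks}")
```

Calling `heavy_graph_job(0.2)` directly inside `demo` instead of through `asyncio.to_thread` would leave the tick count at zero, which is the stall the shell is designed to avoid.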

bacnet_autoscan.py

#!/usr/bin/env python
"""
BACnet Auto-Scan
=======================================================
Scans a range of devices and builds a "Wide" CSV inventory.
Each row is one BACnet Object, containing columns for common
properties useful for network documentation and troubleshooting.

NEW (Optional):
- Can also emit a lightweight "223P-ish" RDF/Turtle model containing:
    * One node per BACnet device
    * One node per BACnet object/point
    * BACnet addressing metadata (IP, device instance, object type/instance)
    * Writability flag (supports priority-array == commandable proxy)

Install:
    pip install rdflib pyshacl ontoenv bacpypes3 ifaddr

This RDF output is intended as a *proto-model* for brownfield workflows:
it captures telemetry + addressing + commandability, and can be enriched later
with proper 223P equipment topology/templates and QUDT unit URIs.

Usage:
    python bacnet_autoscan.py --low-instance 1 --high-instance 3456999 --output-dir autoscan_csv
    python bacnet_autoscan.py --low-instance 1 --high-instance 3456999 --output-dir autoscan_csv --output-223p model.ttl
"""
import asyncio
import csv
import logging
import os
from typing import Any, List, Optional, Tuple, Dict

import bacpypes3
from bacpypes3.argparse import SimpleArgumentParser
from bacpypes3.app import Application
from bacpypes3.apdu import AbortPDU, ErrorRejectAbortNack
from bacpypes3.constructeddata import AnyAtomic
from bacpypes3.pdu import Address
from bacpypes3.primitivedata import ObjectIdentifier, BitString

# Global application instance
app: Optional[Application] = None
log = logging.getLogger(__name__)

# -----------------------------
# Optional RDF/223P builder
# -----------------------------
class Telemetry223PBuilder:
    """
    Minimal RDF builder that writes a "223P-ish" telemetry inventory graph.
    This does NOT attempt to infer HVAC topology; it focuses on points + addressing
    and uses 223P classes where safe (QuantifiableObservableProperty / ObservableProperty).
    """

    def __init__(self, model_base_uri: str):
        # Delay imports so users don't need rdflib unless they enable --output-223p
        from rdflib import Graph, Namespace, Literal, URIRef
        from rdflib.namespace import RDF, RDFS, XSD

        self.Graph = Graph
        self.Namespace = Namespace
        self.Literal = Literal
        self.URIRef = URIRef
        self.RDF = RDF
        self.RDFS = RDFS
        self.XSD = XSD
        self.g = Graph()

        # Namespaces
        self.S223 = Namespace("http://data.ashrae.org/standard223#")
        self.QUDT = Namespace("http://qudt.org/schema/qudt/")
        self.BACNET = Namespace(model_base_uri.rstrip("/") + "/bacnet#")
        self.EX = Namespace(model_base_uri.rstrip("/") + "/entity/")

        # Bind prefixes for readable Turtle
        self.g.bind("s223", self.S223)
        self.g.bind("qudt", self.QUDT)
        self.g.bind("bacnet", self.BACNET)
        self.g.bind("ex", self.EX)

    def device_uri(self, device_instance: int) -> Any:
        return self.EX[f"device/{device_instance}"]

    def point_uri(self, device_instance: int, obj_type: str, obj_inst: int) -> Any:
        safe_type = str(obj_type).replace(" ", "_")
        return self.EX[f"point/{device_instance}/{safe_type}/{obj_inst}"]

    def add_device(self, device_instance: int, ip: str) -> None:
        dev = self.device_uri(device_instance)
        self.g.add((dev, self.RDF.type, self.BACNET.BACnetDevice))
        self.g.add((dev, self.RDFS.label, self.Literal(f"BACnet Device {device_instance}")))
        self.g.add((dev, self.BACNET.deviceInstance, self.Literal(device_instance, datatype=self.XSD.integer)))
        self.g.add((dev, self.BACNET.ipAddress, self.Literal(ip)))

    def add_point(
        self,
        device_instance: int,
        ip: str,
        obj_type: str,
        obj_inst: int,
        name: str,
        description: str,
        units: str,
        present_value: str,
        supports_priority_array: bool,
    ) -> None:
        dev = self.device_uri(device_instance)
        pt = self.point_uri(device_instance, obj_type, obj_inst)

        # Attach device if not already present (the graph is a set, so repeats are harmless)
        self.add_device(device_instance, ip)

        # Type the point in a conservative 223P-friendly way.
        # If we have units, treat as QuantifiableObservableProperty; otherwise ObservableProperty.
        if units:
            self.g.add((pt, self.RDF.type, self.S223.QuantifiableObservableProperty))
        else:
            self.g.add((pt, self.RDF.type, self.S223.ObservableProperty))

        # Link point to device (custom relationship; later enrichment can map to true 223P topology)
        self.g.add((dev, self.BACNET.hasPoint, pt))

        # Human labels
        if name:
            self.g.add((pt, self.RDFS.label, self.Literal(name)))
        if description:
            self.g.add((pt, self.BACNET.description, self.Literal(description)))

        # BACnet addressing metadata
        self.g.add((pt, self.BACNET.objectType, self.Literal(str(obj_type))))
        self.g.add((pt, self.BACNET.objectInstance, self.Literal(int(obj_inst), datatype=self.XSD.integer)))
        self.g.add((pt, self.BACNET.deviceInstance, self.Literal(int(device_instance), datatype=self.XSD.integer)))
        self.g.add((pt, self.BACNET.ipAddress, self.Literal(ip)))

        # Telemetry metadata (strings because BACnet values can be complex / vendor-specific)
        if present_value != "":
            self.g.add((pt, self.BACNET.presentValue, self.Literal(str(present_value))))
        if units:
            # In a "real" 223P model, you'd map this to a QUDT unit URI.
            # Here we keep the raw BACnet EngineeringUnits text for downstream mapping.
            self.g.add((pt, self.BACNET.engineeringUnits, self.Literal(str(units))))

        # Writability/commandability proxy:
        # If the point supports priority-array, it is generally commandable.
        self.g.add(
            (pt, self.BACNET.supportsPriorityArray, self.Literal(bool(supports_priority_array), datatype=self.XSD.boolean))
        )
        self.g.add(
            (pt, self.BACNET.isWritable, self.Literal(bool(supports_priority_array), datatype=self.XSD.boolean))
        )

    def serialize(self, output_path: str) -> None:
        with open(output_path, "wb") as f:
            self.g.serialize(f, format="turtle")

# -----------------------------
# BACnet read helpers
# -----------------------------
async def get_device_object_list(
    device_address: Address,
    device_identifier: ObjectIdentifier,
) -> List[ObjectIdentifier]:
    """
    Reads the object-list from a device.
    INCLUDES FALLBACK: Handles standard arrays and index-by-index reading.
    """
    assert app is not None
    object_list: List[ObjectIdentifier] = []
    log.info(" - Reading object-list from %s...", device_identifier)

    # 1. Try reading entire array at once (Fastest)
    try:
        object_list = await app.read_property(
            device_address, device_identifier, "object-list"
        )
        return object_list
    except (AbortPDU, ErrorRejectAbortNack):
        pass

    # 2. FALLBACK MECHANISM: read the length (index 0), then each element
    try:
        list_len = await app.read_property(
            device_address, device_identifier, "object-list", array_index=0
        )
        log.info(" * Fallback triggered: Reading %s objects one-by-one...", list_len)
        for i in range(list_len):
            obj_id = await app.read_property(
                device_address, device_identifier, "object-list", array_index=i + 1
            )
            object_list.append(obj_id)
            if i % 10 == 0:
                print(".", end="", flush=True)
        print("")
        return object_list
    except Exception as e:
        log.warning(" ! Failed to read object-list even with fallback: %s", e)
        return []


async def read_prop_safe(
    dev_addr: Address,
    obj_id: ObjectIdentifier,
    prop_id: str
) -> str:
    """
    Reads a single property safely. Returns string representation or empty string if failed.
    """
    assert app is not None
    try:
        val = await app.read_property(dev_addr, obj_id, prop_id)
        if isinstance(val, AnyAtomic):
            val = val.get_value()
        if isinstance(val, BitString):
            return str(val)
        if hasattr(val, "attr"):
            return str(val.attr)
        return str(val)
    except (ErrorRejectAbortNack, AbortPDU, AttributeError, ValueError):
        return ""
    except Exception as e:
        log.debug(f"Error reading {prop_id} on {obj_id}: {e}")
        return ""


async def read_priority_array_support(
    dev_addr: Address,
    obj_id: ObjectIdentifier
) -> Tuple[bool, str]:
    """
    Returns:
        (supports_priority_array, active_slots_str)
    - supports_priority_array is True if reading 'priority-array' succeeds (even if all NULL).
    - active_slots_str is a compact "{8: 72.0, 16: 70.0}"-style string of non-NULL slots, else "".
    """
    assert app is not None
    try:
        pa = await app.read_property(dev_addr, obj_id, "priority-array")
    except (ErrorRejectAbortNack, AbortPDU):
        return (False, "")
    except Exception as e:
        log.debug(f"Could not read priority-array: {e}")
        return (False, "")

    # If we got here, the property exists/succeeds => treat as "supports"
    if not pa:
        return (True, "")

    active_slots: Dict[int, Any] = {}
    for idx, item in enumerate(pa):
        if item is None:
            continue
        val_type = getattr(item, "_choice", None)
        # A relinquished slot carries the "null" choice; skip it so that only
        # real overrides are reported (matches the check in tester.py's priority command).
        if val_type is None or val_type == "null":
            continue
        val = getattr(item, val_type, None)
        if isinstance(val, AnyAtomic):
            val = val.get_value()
        if val is not None:
            active_slots[idx + 1] = val

    if not active_slots:
        return (True, "")
    return (True, str(active_slots))


# -----------------------------
# Scanner
# -----------------------------
async def scan_range(
    low: int,
    high: int,
    output_dir: Optional[str],
    output_223p: Optional[str],
    model_base_uri: str,
):
    assert app is not None
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    builder: Optional[Telemetry223PBuilder] = None
    if output_223p:
        builder = Telemetry223PBuilder(model_base_uri=model_base_uri)

    log.info(f"Broadcasting Who-Is {low} - {high}...")
    i_ams = await app.who_is(low, high)
    if not i_ams:
        log.info("No devices found.")
        return

    headers = [
        "DeviceID", "IP", "ObjType", "ObjInst",
        "Name", "Description",
        "PresentValue", "Units/StateText",
        "Reliability", "OutOfService", "StatusFlags",
        "PriorityArray(Active)"
    ]

    for i_am in i_ams:
        dev_id_obj: ObjectIdentifier = i_am.iAmDeviceIdentifier
        dev_addr: Address = i_am.pduSource
        instance = dev_id_obj[1]
        if not (low <= instance <= high):
            continue

        log.info(f"Scanning Device {instance} @ {dev_addr}...")
        obj_list = await get_device_object_list(dev_addr, dev_id_obj)
        if not obj_list:
            continue

        rows = []
        for obj_id in obj_list:
            obj_type, obj_inst = obj_id
            name = await read_prop_safe(dev_addr, obj_id, "object-name")
            desc = await read_prop_safe(dev_addr, obj_id, "description")
            pv = await read_prop_safe(dev_addr, obj_id, "present-value")
            units = await read_prop_safe(dev_addr, obj_id, "units")
            # Keep the real engineering units separate from the CSV's combined
            # Units/StateText column: the RDF builder types a point as Quantifiable
            # only when it has units, and binary state text is not a unit.
            eng_units = units
            if not units:
                active_txt = await read_prop_safe(dev_addr, obj_id, "active-text")
                if active_txt:
                    units = f"Active: {active_txt}"
            rel = await read_prop_safe(dev_addr, obj_id, "reliability")
            oos = await read_prop_safe(dev_addr, obj_id, "out-of-service")
            flags = await read_prop_safe(dev_addr, obj_id, "status-flags")
            supports_pa, pa_str = await read_priority_array_support(dev_addr, obj_id)

            row = [
                instance, str(dev_addr), str(obj_type), obj_inst,
                name, desc,
                pv, units,
                rel, oos, flags,
                pa_str
            ]
            rows.append(row)

            # Add point to optional RDF model
            if builder:
                builder.add_point(
                    device_instance=instance,
                    ip=str(dev_addr),
                    obj_type=str(obj_type),
                    obj_inst=int(obj_inst),
                    name=name,
                    description=desc,
                    units=eng_units,
                    present_value=pv,
                    supports_priority_array=supports_pa,
                )
            print(f" > Found {obj_type}:{obj_inst} | {name} | {pv} {units}")

        if output_dir:
            fname = os.path.join(output_dir, f"audit_{instance}.csv")
            with open(fname, "w", newline="") as f:
                writer = csv.writer(f)
                writer.writerow(headers)
                writer.writerows(rows)
            log.info(f"Saved audit report to {fname}")

    # Write 223P-ish model at end (single combined model)
    if builder and output_223p:
        builder.serialize(output_223p)
        log.info(f"Saved 223P telemetry model to {output_223p}")


async def main():
    global app
    parser = SimpleArgumentParser()
    parser.add_argument("--low-instance", type=int, required=True)
    parser.add_argument("--high-instance", type=int, required=True)
    parser.add_argument("--output-dir", type=str, default="bacnet_audit")
    # NEW:
    parser.add_argument(
        "--output-223p",
        type=str,
        default=None,
        help="Optional: write a Turtle (.ttl) file containing a 223P-ish telemetry inventory graph",
    )
    parser.add_argument(
        "--model-base-uri",
        type=str,
        default="urn:bacnet-autoscan",
        help="Base URI used for generated RDF entities (default: urn:bacnet-autoscan)",
    )
    args = parser.parse_args()
    logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)
    app = Application.from_args(args)
    try:
        await scan_range(
            args.low_instance,
            args.high_instance,
            args.output_dir,
            args.output_223p,
            args.model_base_uri,
        )
    finally:
        app.close()


if __name__ == "__main__":
    asyncio.run(main())

shapes.ttl

@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix bacnet: <urn:bacnet-autoscan/bacnet#> .
@prefix s223: <http://data.ashrae.org/standard223#> .

# Shape 1: Validate Devices
# Every node of type bacnet:BACnetDevice must have exactly one IP and one Instance.
bacnet:DeviceShape
    a sh:NodeShape ;
    sh:targetClass bacnet:BACnetDevice ;
    sh:property [
        sh:path bacnet:ipAddress ;
        sh:datatype xsd:string ;
        sh:minCount 1 ;
        sh:maxCount 1 ;
        sh:message "Device must have exactly one IP address string." ;
    ] ;
    sh:property [
        sh:path bacnet:deviceInstance ;
        sh:datatype xsd:integer ;
        sh:minCount 1 ;
        sh:maxCount 1 ;
        sh:message "Device must have exactly one integer Instance ID." ;
    ] .

# Shape 2: Validate Points
# Every Quantifiable point must have a present value.
bacnet:PointShape
    a sh:NodeShape ;
    sh:targetClass s223:QuantifiableObservableProperty ;
    sh:property [
        sh:path bacnet:presentValue ;
        sh:minCount 1 ;
        sh:message "Real-world points must have a present value!" ;
    ] .

tester.py

from pathlib import Path
from rdflib import Graph
import pyshacl
from ontoenv import OntoEnv
import asyncio
import re
from typing import List, Optional, Tuple
from bacpypes3.pdu import Address
from bacpypes3.comm import bind
from bacpypes3.debugging import bacpypes_debugging
from bacpypes3.argparse import SimpleArgumentParser
from bacpypes3.app import Application
from bacpypes3.console import Console
from bacpypes3.cmd import Cmd
from bacpypes3.primitivedata import Null, ObjectIdentifier
from bacpypes3.npdu import IAmRouterToNetwork
from bacpypes3.constructeddata import AnyAtomic
from bacpypes3.apdu import (
    ErrorRejectAbortNack,
    PropertyReference,
    PropertyIdentifier,
    ErrorType,
    AbortPDU,
    AbortReason
)
from bacpypes3.vendor import get_vendor_info
from bacpypes3.netservice import NetworkAdapter
import sys
import argparse

"""
Test Bench Hammers
    > whois 1000 3456799
    > read 192.168.204.13 analog-input,1 present-value
    > priority 192.168.204.14 analog-output,1
    > write 192.168.204.14 analog-output,1 present-value 999.8 9
    > write 192.168.204.14 analog-output,1 present-value null 9
    > priority 192.168.204.14 analog-output,1

Drill 1: The "Hello World" (Dump everything)
    > sparql model.ttl "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10"

Drill 2: Find all Devices
    > sparql model.ttl "SELECT ?device ?name WHERE { ?device a bacnet:BACnetDevice ; rdfs:label ?name }"

Drill 3: Find "Temp" sensors and their values
    > sparql model.ttl "SELECT ?point ?val WHERE { ?point a s223:QuantifiableObservableProperty ; rdfs:label ?name ; bacnet:presentValue ?val . FILTER regex(?name, 'Temp', 'i') }"
"""

# 'property[index]' matching
property_index_re = re.compile(r"^([A-Za-z-]+)(?:\[([0-9]+)\])?$")

# globals
app: Optional[Application] = None


def load_graph(path: str) -> Graph:
    g = Graph()
    g.parse(path, format="turtle")
    return g


def run_sparql(model_path: str, query_text: str) -> None:
    g = load_graph(model_path)
    res = g.query(query_text)

    # Correctly handle boolean (ASK) vs Table (SELECT) results
    if res.type == "ASK":
        print(res.askAnswer)
        return

    # Handle SELECT results (iterable rows)
    rows = list(res)
    if not rows:
        print("(no results)")
        return

    for row in rows:
        # Clean up output: remove huge URIs for readability if possible
        clean_row = []
        for v in row:
            s = str(v)
            # Optional: Shorten standard prefixes for cleaner shell output
            s = s.replace("http://data.ashrae.org/standard223#", "s223:")
            s = s.replace("urn:bacnet-autoscan/bacnet#", "bacnet:")
            s = s.replace("http://www.w3.org/1999/02/22-rdf-syntax-ns#", "rdf:")
            s = s.replace("http://www.w3.org/2000/01/rdf-schema#", "rdfs:")
            clean_row.append(s)
        print(" | ".join(clean_row))


def run_shacl(model_path: str, shapes_path: str, inplace: bool = False) -> None:
    data_g = load_graph(model_path)
    env = OntoEnv(temporary=True, no_search=True)
    sid = env.add(shapes_path)
    shacl_g = env.get_graph(sid)
    env.import_dependencies(shacl_g)
    valid, report_graph, report_text = pyshacl.validate(
        data_graph=data_g,
        shacl_graph=shacl_g,
        ont_graph=shacl_g,
        advanced=True,
        inplace=inplace,
        js=True,
        allow_warnings=True,
    )
    print(report_text)
    print(f"Valid? {valid}")


@bacpypes_debugging
class InteractiveCmd(Cmd):
    """
    Interactive BACnet Console with added RDF/SHACL capabilities.
    """

    async def do_sparql(self, model_path: str, query: str) -> None:
        """
        Run a SPARQL query on a Turtle model file.
        NOTE: Enclose the query in quotes!

        usage: sparql <model.ttl> <query_string>
        example: sparql model.ttl "SELECT ?s WHERE { ?s rdf:type s223:QuantifiableObservableProperty }"
        """
        # Inject standard prefixes if the query does not declare any itself
        if "PREFIX" not in query.upper():
            query = (
                "PREFIX s223: <http://data.ashrae.org/standard223#>\n"
                "PREFIX bacnet: <urn:bacnet-autoscan/bacnet#>\n"
                "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n"
                "PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>\n"
                + query
            )
        print(f"Running SPARQL on {model_path}...")
        # Run in a separate thread to avoid blocking BACnet traffic
        try:
            await asyncio.to_thread(run_sparql, model_path, query)
        except Exception as e:
            print(f"SPARQL Error: {e}")

    async def do_shacl(self, model_path: str, shapes_path: str) -> None:
        """
        Validate a model against a SHACL shapes file.

        usage: shacl <model.ttl> <shapes.ttl>
        """
        print(f"Validating {model_path} against {shapes_path}...")
        # Run in a separate thread because pyshacl is heavy/blocking
        try:
            await asyncio.to_thread(run_shacl, model_path, shapes_path)
        except Exception as e:
            print(f"SHACL Error: {e}")

    async def do_whois(
        self, low_limit: Optional[int] = None, high_limit: Optional[int] = None
    ) -> None:
        """
        Send a Who-Is request and print responses.

        usage: whois [ low_limit high_limit ]
        """
        # Compare against None so that a limit of 0 is still printed
        low_txt = "" if low_limit is None else low_limit
        high_txt = "" if high_limit is None else high_limit
        print(f"Broadcasting Who-Is {low_txt} {high_txt}...")
        i_ams = await app.who_is(low_limit, high_limit)
        if not i_ams:
            print("No response(s) received")
        else:
            for i_am in i_ams:
                dev_addr: Address = i_am.pduSource
                dev_id: ObjectIdentifier = i_am.iAmDeviceIdentifier
                vendor_id = i_am.vendorID
                print(f"Device {dev_id} @ {dev_addr} (Vendor: {vendor_id})")

    async def do_objects(self, address: Address, instance_id: int) -> None:
        """
        List all objects in a specific device.
        Includes fallback logic if the device does not support bulk object-list reads.

        usage: objects <ip_address> <device_instance_id>
        example: objects 192.168.1.10 1001
        """
        device_identifier = ObjectIdentifier(f"device,{instance_id}")
        print(f"Reading object-list from {device_identifier} @ {address}...")
        object_list = []

        # 1. Try reading entire array at once (Fastest)
        try:
            object_list = await app.read_property(
                address, device_identifier, "object-list"
            )
        except (AbortPDU, ErrorRejectAbortNack) as e:
            print(f"Standard read failed ({e}), attempting fallback method...")
            # 2. FALLBACK: Read Length, then read index-by-index.
            # This runs only when the bulk read fails; otherwise the list would be read twice.
            try:
                list_len = await app.read_property(
                    address, device_identifier, "object-list", array_index=0
                )
                print(f"Device contains {list_len} objects. Reading one by one...")
                for i in range(list_len):
                    obj_id = await app.read_property(
                        address, device_identifier, "object-list", array_index=i + 1
                    )
                    object_list.append(obj_id)
                    if i % 10 == 0:
                        print(".", end="", flush=True)
                print()  # Newline
            except Exception as err:
                print(f"Failed to read object list: {err}")
                return

        print(f"Found {len(object_list)} objects:")
        for obj in object_list:
            # Optional: Try to get the name for a nicer display
            try:
                name = await app.read_property(address, obj, "object-name")
            except Exception:
                # Not a bare except: that would also swallow task cancellation
                name = "???"
            print(f" - {obj} : {name}")

    async def do_read(
        self,
        address: Address,
        object_identifier: ObjectIdentifier,
        property_identifier: str,
    ) -> None:
        """
        Read a single property.

        usage: read <address> <objid> <prop>
        example: read 192.168.1.10 analog-value,1 present-value
        """
        # Split the property identifier and its index
        property_index_match = property_index_re.match(property_identifier)
        if not property_index_match:
            print("Property specification incorrect")
            return
        prop_id, array_index = property_index_match.groups()
        if array_index is not None:
            array_index = int(array_index)

        print(f"Reading {object_identifier} {property_identifier} from {address}...")
        try:
            value = await app.read_property(
                address, object_identifier, prop_id, array_index
            )
            if isinstance(value, AnyAtomic):
                value = value.get_value()
            print(f" = {value}")
        except ErrorRejectAbortNack as err:
            print(f" ! Error: {err}")

    async def do_write(
        self,
        address: Address,
        object_identifier: ObjectIdentifier,
        property_identifier: str,
        value: str,
        priority: int = -1,
    ) -> None:
        """
        Write a property value.

        usage: write <address> <objid> <prop> <value> [priority]
        example: write 192.168.1.10 analog-value,1 present-value 50.0 8
        """
        # Parse property index
        property_index_match = property_index_re.match(property_identifier)
        if not property_index_match:
            print("Property specification incorrect")
            return
        prop_id, array_index = property_index_match.groups()
        if array_index is not None:
            array_index = int(array_index)

        # Handle 'null' for releasing overrides
        if value.lower() == "null":
            if priority == -1:
                print("Error: 'null' can only be used with a specific priority level.")
                return
            value = Null(())

        try:
            print(f"Writing to {object_identifier}...")
            await app.write_property(
                address,
                object_identifier,
                prop_id,
                value,
                array_index,
                priority,
            )
            print(" Write successful (Ack received).")
        except ErrorRejectAbortNack as err:
            print(f" ! Write failed: {err}")

    async def do_priority(
        self,
        address: Address,
        object_identifier: ObjectIdentifier,
    ) -> None:
        """
        Display the Priority Array of an object.

        usage: priority <address> <objid>
        """
        try:
            response = await app.read_property(
                address, object_identifier, "priority-array"
            )
            if not response:
                print("Priority array is empty or None.")
                return

            print(f"Priority Array for {object_identifier}:")
            has_entries = False
            for index, priority_value in enumerate(response):
                val_type = priority_value._choice
                val = getattr(priority_value, val_type, None)
                # Only print slots that are NOT null
                if val_type != "null":
                    has_entries = True
                    if isinstance(val, AnyAtomic):
                        val = val.get_value()
                    print(f" [{index + 1}] : {val} ({val_type})")
            if not has_entries:
                print(" (All slots are NULL/Relinquished)")
        except ErrorRejectAbortNack as err:
            print(f"Error reading priority-array: {err}")

    async def do_rpm(self, address: Address, *args: str) -> None:
        """
        Read Property Multiple (Advanced Debugging).

        usage: rpm <address> ( <objid> ( <prop[indx]> )... )...
        """
        args_list = list(args)

        # Get device info for correct datatype parsing
        device_info = await app.device_info_cache.get_device_info(address)
        vendor_info = get_vendor_info(
            device_info.vendor_identifier if device_info else 0
        )

        parameter_list = []
        while args_list:
            obj_id = vendor_info.object_identifier(args_list.pop(0))
            obj_class = vendor_info.get_object_class(obj_id[0])
            if not obj_class:
                print(f"Unknown object type: {obj_id}")
                return
            parameter_list.append(obj_id)

            # Collect property references until the next argument looks like an object id
            property_reference_list = []
            while args_list:
                prop_ref = PropertyReference(args_list.pop(0), vendor_info=vendor_info)
                property_reference_list.append(prop_ref)
                if args_list and ((":" in args_list[0]) or ("," in args_list[0])):
                    break
            parameter_list.append(property_reference_list)

        if not parameter_list:
            print("Object identifier expected")
            return

        try:
            response = await app.read_property_multiple(address, parameter_list)
            for (obj_id, prop_id, arr_index, prop_value) in response:
                print(f"{obj_id} {prop_id}{f'[{arr_index}]' if arr_index is not None else ''} = {prop_value}")
                if isinstance(prop_value, ErrorType):
                    print(f" Error: {prop_value}")
        except ErrorRejectAbortNack as err:
            print(f"RPM Failed: {err}")

    async def do_whohas(self, *args: str) -> None:
        """
        Find devices containing a specific object ID or Name.

        usage: whohas [ low_limit high_limit ] [ objid ] [ objname ]
        """
        args_list = list(args)
        low_limit = int(args_list.pop(0)) if args_list and args_list[0].isdigit() else None
        high_limit = int(args_list.pop(0)) if args_list and args_list[0].isdigit() else None

        obj_id = None
        obj_name = None
        if args_list:
            try:
                obj_id = ObjectIdentifier(args_list[0])
                args_list.pop(0)
            except ValueError:
                pass
        if args_list:
            obj_name = args_list[0]

        if obj_id is None and obj_name is None:
            print("Usage: whohas [limits] <objid> OR <objname>")
            return

        print(f"Searching for {obj_id if obj_id else ''} {obj_name if obj_name else ''}...")
        i_haves = await app.who_has(low_limit, high_limit, obj_id, obj_name)
        if not i_haves:
            print("No response(s)")
        else:
            for i_have in i_haves:
                print(f"Device {i_have.deviceIdentifier} @ {i_have.pduSource} has {i_have.objectIdentifier} '{i_have.objectName}'")

    async def do_router(self, address: Optional[Address] = None, network: Optional[int] = None) -> None:
        """
        Discover BACnet routers.

        usage: router [address] [network]
        """
        print("Sending Who-Is-Router-To-Network...")
        if not app.nse:
            print("Network Service Element not enabled.")
            return

        result = await app.nse.who_is_router_to_network(destination=address, network=network)
        if not result:
            print("No routers found.")
            return

        for adapter, i_am_router in result:
            # Logic to display router info
            print(f"Router @ {i_am_router.pduSource} serves networks: {i_am_router.iartnNetworkList}")


def build_model_cli_parser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(prog="tester.py model")
    sub = p.add_subparsers(dest="cmd", required=True)

    sp = sub.add_parser("sparql")
    sp.add_argument("--model", required=True)
    g = sp.add_mutually_exclusive_group(required=True)
    g.add_argument("--query", help="SPARQL query string")
    g.add_argument("--query-file", help="Path to .rq file")

    va = sub.add_parser("validate")
    va.add_argument("--model", required=True)
    va.add_argument("--shapes", required=True)
    va.add_argument("--inplace", action="store_true")
    return p


async def main() -> None:
    # NEW: if first arg is "model", run offline tooling and exit
    if len(sys.argv) > 1 and sys.argv[1] == "model":
        p = build_model_cli_parser()
        args = p.parse_args(sys.argv[2:])
        if args.cmd == "sparql":
            if args.query_file:
                query_text = Path(args.query_file).read_text(encoding="utf-8")
            else:
                query_text = args.query
            # optional convenience prefixes if user didn't include them
            if "PREFIX" not in query_text.upper():
                query_text = (
                    "PREFIX s223: <http://data.ashrae.org/standard223#>\n"
                    "PREFIX bacnet: <urn:bacnet-autoscan/bacnet#>\n"
                    "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n"
                    "PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>\n"
                    + query_text
                )
            run_sparql(args.model, query_text)
            return
        if args.cmd == "validate":
            run_shacl(args.model, args.shapes, inplace=args.inplace)
            return

    # otherwise: EXISTING interactive shell behavior (unchanged)
    global app
    parser = SimpleArgumentParser()
    args = parser.parse_args()

    console = Console()
    cmd = InteractiveCmd()
    bind(console, cmd)
    app = Application.from_args(args)

    print("\n--- Interactive BACnet Shell ---")
    print("Type 'help' for commands (whois, read, write, objects, priority, etc.)")
    print("--------------------------------\n")

    try:
        await console.fini.wait()
    except KeyboardInterrupt:
        pass
    finally:
        if app:
            app.close()


if __name__ == "__main__":
    asyncio.run(main())