Created
June 15, 2022 06:13
-
-
Save JoelBender/df2a4d365ee4bf2b842002a1670f1d56 to your computer and use it in GitHub Desktop.
Put in a "fuzz" layer in an application stack
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
This application presents a 'console' prompt to the user asking for Who-Is | |
commands which create the related APDUs and sends them through a "fuzz" layer | |
for additional processing. | |
""" | |
from bacpypes.debugging import bacpypes_debugging, ModuleLogger | |
from bacpypes.consolelogging import ConfigArgumentParser | |
from bacpypes.consolecmd import ConsoleCmd | |
from bacpypes.core import run, enable_sleeping | |
from bacpypes.pdu import Address, GlobalBroadcast | |
from bacpypes.comm import bind, Client, Server | |
from bacpypes.app import ApplicationIOController | |
from bacpypes.appservice import StateMachineAccessPoint, ApplicationServiceAccessPoint | |
from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement | |
from bacpypes.bvllservice import BIPSimple, AnnexJCodec, UDPMultiplexer | |
from bacpypes.local.device import LocalDeviceObject | |
from bacpypes.service.device import WhoIsIAmServices | |
# some debugging | |
_debug = 1 | |
_log = ModuleLogger(globals()) | |
# globals | |
this_device = None | |
this_application = None | |
@bacpypes_debugging | |
class Fuzz(Client, Server): | |
def __init__(self, cid=None, sid=None): | |
if _debug: | |
Fuzz._debug("__init__ cid=%r sid=%r", cid, sid) | |
Client.__init__(self, cid) | |
Server.__init__(self, sid) | |
def confirmation(self, *args, **kwargs): | |
print("Fuzz.confirmation") | |
for i, arg in enumerate(args): | |
print(" - args[{:d}]: {!r}".format(i, arg)) | |
if hasattr(arg, "debug_contents"): | |
arg.debug_contents(2) | |
for key, value in kwargs.items(): | |
print(" - kwargs[{!r}]: {!r}".format(key, value)) | |
if hasattr(value, "debug_contents"): | |
value.debug_contents(2) | |
if self.serverPeer: | |
self.response(*args, **kwargs) | |
def indication(self, *args, **kwargs): | |
print("Fuzz.indication") | |
for i, arg in enumerate(args): | |
print(" - args[{:d}]: {!r}".format(i, arg)) | |
if hasattr(arg, "debug_contents"): | |
arg.debug_contents(2) | |
for key, value in kwargs.items(): | |
print(" - kwargs[{!r}]: {!r}".format(key, value)) | |
if hasattr(value, "debug_contents"): | |
value.debug_contents(2) | |
if self.clientPeer: | |
self.request(*args, **kwargs) | |
# | |
# FuzzApplication | |
# | |
@bacpypes_debugging | |
class FuzzApplication(ApplicationIOController, WhoIsIAmServices): | |
def __init__(self, localDevice, localAddress, deviceInfoCache=None, aseID=None): | |
if _debug: | |
FuzzApplication._debug( | |
"__init__ %r %r deviceInfoCache=%r aseID=%r", | |
localDevice, | |
localAddress, | |
deviceInfoCache, | |
aseID, | |
) | |
ApplicationIOController.__init__( | |
self, localDevice, localAddress, deviceInfoCache, aseID=aseID | |
) | |
# local address might be useful for subclasses | |
if isinstance(localAddress, Address): | |
self.localAddress = localAddress | |
else: | |
self.localAddress = Address(localAddress) | |
# include a application decoder | |
self.asap = ApplicationServiceAccessPoint() | |
# pass the device object to the state machine access point so it | |
# can know if it should support segmentation | |
self.smap = StateMachineAccessPoint(localDevice) | |
# the segmentation state machines need access to the same device | |
# information cache as the application | |
self.smap.deviceInfoCache = self.deviceInfoCache | |
# a network service access point will be needed | |
self.nsap = NetworkServiceAccessPoint() | |
# give the NSAP a generic network layer service element | |
self.nse = NetworkServiceElement() | |
bind(self.nse, self.nsap) | |
# bind the top layers | |
bind(self, self.asap, self.smap, self.nsap) | |
# create a generic BIP stack, bound to the Annex J server | |
# on the UDP multiplexer | |
self.bip = BIPSimple() | |
self.annexj = AnnexJCodec() | |
# create a fuzzing layer | |
self.fuzz = Fuzz() | |
# layer just above reading/writing sockets | |
self.mux = UDPMultiplexer(self.localAddress) | |
# bind the bottom layers | |
bind(self.bip, self.annexj, self.fuzz, self.mux.annexJ) | |
# bind the BIP stack to the network, no network number | |
self.nsap.bind(self.bip, address=self.localAddress) | |
def request(self, apdu): | |
if _debug: | |
FuzzApplication._debug("request %r", apdu) | |
# forward it along | |
ApplicationIOController.request(self, apdu) | |
def indication(self, apdu): | |
if _debug: | |
FuzzApplication._debug("indication %r", apdu) | |
# forward it along | |
ApplicationIOController.indication(self, apdu) | |
def confirmation(self, apdu): | |
if _debug: | |
FuzzApplication._debug("confirmation %r", apdu) | |
# forward it along | |
ApplicationIOController.confirmation(self, apdu) | |
def close_socket(self): | |
if _debug: | |
FuzzApplication._debug("close_socket") | |
# pass to the multiplexer, then down to the sockets | |
self.mux.close_socket() | |
# | |
# WhoIsIAmConsoleCmd | |
# | |
@bacpypes_debugging | |
class WhoIsIAmConsoleCmd(ConsoleCmd): | |
def do_whois(self, args): | |
"""whois [ <addr> ] [ <lolimit> <hilimit> ]""" | |
args = args.split() | |
if _debug: | |
WhoIsIAmConsoleCmd._debug("do_whois %r", args) | |
try: | |
# gather the parameters | |
if (len(args) == 1) or (len(args) == 3): | |
addr = Address(args[0]) | |
del args[0] | |
else: | |
addr = GlobalBroadcast() | |
if len(args) == 2: | |
lolimit = int(args[0]) | |
hilimit = int(args[1]) | |
else: | |
lolimit = hilimit = None | |
# code lives in the device service | |
this_application.who_is(lolimit, hilimit, addr) | |
except Exception as error: | |
WhoIsIAmConsoleCmd._exception("exception: %r", error) | |
def do_iam(self, args): | |
"""iam""" | |
args = args.split() | |
if _debug: | |
WhoIsIAmConsoleCmd._debug("do_iam %r", args) | |
# code lives in the device service | |
this_application.i_am() | |
# | |
# __main__ | |
# | |
def main(): | |
global this_device | |
global this_application | |
# parse the command line arguments | |
args = ConfigArgumentParser(description=__doc__).parse_args() | |
if _debug: | |
_log.debug("initialization") | |
if _debug: | |
_log.debug(" - args: %r", args) | |
# make a device object | |
this_device = LocalDeviceObject( | |
objectName=args.ini.objectname, | |
objectIdentifier=int(args.ini.objectidentifier), | |
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), | |
segmentationSupported=args.ini.segmentationsupported, | |
vendorIdentifier=int(args.ini.vendoridentifier), | |
) | |
# make a simple application | |
this_application = FuzzApplication(this_device, args.ini.address) | |
# make a console | |
this_console = WhoIsIAmConsoleCmd() | |
if _debug: | |
_log.debug(" - this_console: %r", this_console) | |
# enable sleeping will help with threads | |
enable_sleeping() | |
_log.debug("running") | |
run() | |
_log.debug("fini") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment