Last active
September 18, 2023 14:58
-
-
Save bbartling/88e95557b0b297bea721c0f285657d53 to your computer and use it in GitHub Desktop.
simple BAC0 scripts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import BAC0 | |
import time | |
import random | |
bacnet = BAC0.lite() | |
while True: | |
# MSTP device hardware address 2 on MSTP network 12345 | |
read_sensor_str = f'12345:2 analogInput 2 presentValue' | |
print("Executing read_sensor_str statement:", read_sensor_str) | |
sensor = bacnet.read(read_sensor_str) | |
print("sensor: ",sensor) | |
# write to rasp pi BACnet server running bacpypes | |
write_vals = f'192.168.0.101/24 analogValue 1 presentValue {sensor} - 10' | |
bacnet.write(write_vals) | |
print("Executed write_vals statement:", write_vals) | |
# read to rasp pi BACnet server running bacpypes calc's | |
read_future_data_str = f'192.168.0.101/24 analogValue 2 presentValue' | |
print("Executing read_future_data_str statement:", read_future_data_str) | |
future_data = bacnet.read(read_future_data_str) | |
print(future_data) | |
# read to rasp pi BACnet server running bacpypes calc's | |
read_rate_of_change_str = f'192.168.0.101/24 analogValue 3 presentValue' | |
print("Executing read_rate_of_change_str statement:", read_rate_of_change_str) | |
rate_of_change = bacnet.read(read_rate_of_change_str) | |
print(rate_of_change) | |
time.sleep(60) | |
bacnet.disconnect() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import BAC0, time, random | |
bacnet = BAC0.lite() | |
time.sleep(2) | |
devices = bacnet.whois(global_broadcast=True) | |
device_mapping = {} | |
addresses = [] | |
for device in devices: | |
if isinstance(device, tuple): | |
device_mapping[device[1]] = device[0] | |
print("Detected device %s with address %s" % (str(device[1]), str(device[0]))) | |
print(device_mapping) | |
print((str(len(device_mapping)) + " devices discovered on network.")) | |
for bacnet_inst,address in device_mapping.items(): | |
if bacnet_inst < 100: #JCI boxes have inst ID less than 100 on this site | |
addresses.append(address) | |
print("addresses is: ",addresses) | |
''' | |
addresses = [ | |
"11:2","11:4","11:5","12:22","11:7", | |
"12:23","11:8","11:9","12:24","11:10", | |
"12:25","11:11","12:26","12:27","11:12", | |
"12:28","12:29","11:13","12:30","11:14", | |
"12:31","11:15","12:32","11:16","12:33", | |
"11:17","12:34","12:35","11:18","11:39", | |
"11:19","12:36","11:20","12:40", | |
"11:21","12:41","11:37","11:38", | |
] | |
''' | |
# change VAV box space setpoint MIN MAX VALUES | |
object_type = 'analogValue' | |
object_instance = '1103' | |
priority = 'default' | |
min_value = 55.0 | |
max_value = 85.0 | |
for address in addresses: | |
try: | |
print("Doing DEVICE: ",address) | |
read_minPresValue = f'{address} {object_type} {object_instance} minPresValue' | |
check_min = bacnet.read(read_minPresValue) | |
print("check_min: ",check_min) | |
if check_min != min_value: | |
write_minPresValue = f'{address} {object_type} {object_instance} minPresValue {min_value} - {priority}' | |
write_min = bacnet.write(write_minPresValue) | |
print(write_min) | |
read_minPresValue = f'{address} {object_type} {object_instance} minPresValue' | |
check_min = bacnet.read(read_minPresValue) | |
print("check_min: ",check_min) | |
else: | |
print("MINS are good on device: {address}") | |
read_maxPresValue = f'{address} {object_type} {object_instance} maxPresValue' | |
check_max = bacnet.read(read_maxPresValue) | |
print("check_max: ",check_max) | |
if check_max != max_value: | |
write_maxPresValue = f'{address} {object_type} {object_instance} maxPresValue {max_value} - {priority}' | |
write_max = bacnet.write(write_maxPresValue) | |
print(write_max) | |
read_maxPresValue = f'{address} {object_type} {object_instance} maxPresValue' | |
check_max = bacnet.read(read_maxPresValue) | |
print("check_max: ",check_max) | |
else: | |
print("MAX are good on device: {address}") | |
print("*********************************") | |
except Exception as error: | |
print(f"OOF error on device {address}: {error}") | |
print("ALL DIGITY DONE!!!") | |
bacnet.disconnect() | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import BAC0, time, random | |
bacnet = BAC0.lite() | |
time.sleep(2) | |
devices = bacnet.whois(global_broadcast=True) | |
device_mapping = {} | |
addresses = [] | |
for device in devices: | |
if isinstance(device, tuple): | |
device_mapping[device[1]] = device[0] | |
print("Detected device %s with address %s" % (str(device[1]), str(device[0]))) | |
print(device_mapping) | |
print((str(len(device_mapping)) + " devices discovered on network.")) | |
for bacnet_inst,address in device_mapping.items(): | |
if bacnet_inst < 100: #JCI boxes have inst ID less than 100 on this site | |
addresses.append(address) | |
print("addresses is: ",addresses) | |
''' | |
addresses = [ | |
"11:2","11:4","11:5","12:22","11:7", | |
"12:23","11:8","11:9","12:24","11:10", | |
"12:25","11:11","12:26","12:27","11:12", | |
"12:28","12:29","11:13","12:30","11:14", | |
"12:31","11:15","12:32","11:16","12:33", | |
"11:17","12:34","12:35","11:18","11:39", | |
"11:19","12:36","11:20","12:40", | |
"11:21","12:41","11:37","11:38", | |
] | |
''' | |
# change VAV box space setpoint MIN MAX VALUES | |
object_type = 'analogOutput' | |
object_instance = '2014' | |
priority = '10' | |
for address in addresses: | |
try: | |
print("Doing DEVICE: ",address) | |
release_me = f'{address} {object_type} {object_instance} presentValue null - {priority}' | |
bacnet.write(release_me) | |
except Exception as error: | |
print(f"error! {error}") | |
print("ALL DIGITY DONE!!!") | |
bacnet.disconnect() | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import schedule | |
import time | |
from datetime import datetime, timedelta | |
import BAC0 | |
bacnet = BAC0.lite() | |
address = '12345:2' | |
sensor_object_type = 'analogInput' | |
sensor_object_instance = '2' | |
cmd_object_type = 'analogValue' | |
cmd_object_instance = '302' | |
priority = 10 | |
# adr event | |
hi_zone_temp_release = 80.0 | |
duration_minutes = 30 | |
start_time = datetime.now() | |
end_time = start_time + timedelta(minutes=duration_minutes) | |
def half_time_job(): | |
print("Checking BAS sensor!!!!") | |
read_vals = f'{address} {sensor_object_type} {sensor_object_instance} presentValue' | |
check = bacnet.read(read_vals) | |
print("half_time_job sensor value is: ",check) | |
if check >= hi_zone_temp_release: | |
# create write statement and run | |
write_vals = f'{address} {cmd_object_type} {cmd_object_instance} presentValue null - {priority}' | |
print("Release triggered half_time_job:", write_vals) | |
bacnet.write(write_vals) | |
print('Release triggered half_time_jo GOOD!!!') | |
print("ON START WRITING OVERRIDES TO THE BAS!!!") | |
# create write statement and run | |
write_vals = f'{address} {cmd_object_type} {cmd_object_instance} presentValue 0 - {priority}' | |
print("Excecuting write_vals statement:", write_vals) | |
bacnet.write(write_vals) | |
print('EVENT START OVERRIDES GOOD!!!') | |
""" | |
Schedule reoccuring tasks | |
""" | |
print("scheduling HALF TIME EVENT!!!!") | |
schedule.every(1).minutes.do(half_time_job) | |
# if current time is greater than endtime, break | |
while datetime.now() < end_time: | |
schedule.run_pending() | |
time.sleep(1) | |
print("HALF TIME OVER!!!!") | |
write_vals = f'{address} {cmd_object_type} {cmd_object_instance} presentValue null - {priority}' | |
print("End EVENT RELEASE!!!!:", write_vals) | |
bacnet.write(write_vals) | |
print('End EVENT RELEASE!!!! GOOD!!!') |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import BAC0, time | |
bacnet = BAC0.lite(ip="10.7.6.202/24") | |
address = "10.7.6.201/24" | |
object_type = "analogValue" | |
object_instance = "1" | |
while True: | |
dr_signal = f"{address} {object_type} {object_instance} presentValue" | |
check = bacnet.read(dr_signal) | |
print(f"The signal level is {check}") | |
time.sleep(60) | |
bacnet.disconnect() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import BAC0 | |
import time | |
import random | |
bacnet = BAC0.lite() | |
address = "192.168.0.101/24" | |
object_type = "analogValue" | |
object_instance = "1" | |
priority = 10 # BACnet priority | |
# Replace with the actual latitude and longitude of the desired location | |
location = '46.7867,-92.1005' | |
def get_current_temperature(location): | |
points_url = f"https://api.weather.gov/points/{location}" | |
response = requests.get(points_url) | |
if response.status_code != 200: | |
print("Failed to retrieve location data.") | |
return None | |
data = response.json() | |
forecast_url = data["properties"]["forecast"] | |
response = requests.get(forecast_url) | |
if response.status_code != 200: | |
print("Failed to retrieve forecast data.") | |
return None | |
data = response.json() | |
temperature = data["properties"]["periods"][0]["temperature"] | |
return temperature | |
while True: | |
temperature = get_current_temperature(location) | |
if temperature is not None: | |
print(f"Current temperature for the location: {temperature}°F") | |
# Write the retrieved temperature to the BACnet device | |
write_vals = f'{address} {object_type} {object_instance} presentValue {temperature} - {priority}' | |
bacnet.write(write_vals) | |
else: | |
print("Temperature data not available.") | |
time.sleep(60) | |
bacnet.disconnect() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment