Created
October 19, 2019 19:41
-
-
Save ChristianTremblay/9bbd655775982730e62e538e44512771 to your computer and use it in GitHub Desktop.
Example of using BAC0 to create a BACnet device. (Meteo parser not included)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import atoma, requests | |
import BAC0 | |
import re | |
from meteo_parser import MeteoGC | |
from bacpypes.basetypes import EngineeringUnits, DateTime | |
from bacpypes.primitivedata import CharacterString, Date, Time | |
from BAC0.core.devices.create_objects import ( | |
create_AV, | |
create_MV, | |
create_BV, | |
create_AI, | |
create_BI, | |
create_AO, | |
create_BO, | |
create_CharStrValue, | |
create_DateTimeValue, | |
) | |
from BAC0.tasks.RecurringTask import RecurringTask | |
import time | |
def start_device(): | |
print("Starting BACnet device") | |
new_device = BAC0.lite(deviceId=10031) | |
time.sleep(1) | |
# Analog Values | |
av = [] | |
current_temp = create_AV(oid=1, name="Current_Temp", pv=0, pv_writable=False) | |
current_temp.units = EngineeringUnits("degreesCelsius") | |
current_temp.description = CharacterString("Current Temperature in degC") | |
av.append(current_temp) | |
current_pressure = create_AV( | |
oid=2, name="Current_Pressure", pv=0, pv_writable=False | |
) | |
current_pressure.units = EngineeringUnits("kilopascals") | |
current_pressure.description = CharacterString("Current Pressure in kPa") | |
av.append(current_pressure) | |
current_dewpoint = create_AV( | |
oid=3, name="Current_DewPoint", pv=0, pv_writable=False | |
) | |
current_dewpoint.units = EngineeringUnits("degreesCelsius") | |
current_dewpoint.description = CharacterString("Current Dew Point in degC") | |
av.append(current_dewpoint) | |
current_humidity = create_AV( | |
oid=4, name="Current_Humidity", pv=0, pv_writable=False | |
) | |
current_humidity.units = EngineeringUnits("percentRelativeHumidity") | |
current_humidity.description = CharacterString( | |
"Current Humidity in percent relative humidity" | |
) | |
av.append(current_humidity) | |
current_wind_speed = create_AV( | |
oid=5, name="Current_Wind_Speed", pv=0, pv_writable=False | |
) | |
current_wind_speed.units = EngineeringUnits("kilometersPerHour") | |
current_wind_speed.description = CharacterString("Current Wind Speed in km/h") | |
av.append(current_wind_speed) | |
current_visibility = create_AV( | |
oid=6, name="Current_Visibility", pv=0, pv_writable=False | |
) | |
current_visibility.units = EngineeringUnits("kilometers") | |
current_visibility.description = CharacterString("Current Visibility in km") | |
av.append(current_visibility) | |
# CharacterStringValues | |
charstr = [] | |
default_pv = CharacterString("empty") | |
location = create_CharStrValue(oid=1, name="Location", pv=default_pv) | |
charstr.append(location) | |
current_pressure_tend = create_CharStrValue( | |
oid=2, name="Current_Press_Tend", pv=default_pv | |
) | |
charstr.append(current_pressure_tend) | |
current_conditions = create_CharStrValue( | |
oid=3, name="Current_Conditions", pv=default_pv | |
) | |
charstr.append(current_conditions) | |
current_wind_direction = create_CharStrValue( | |
oid=4, name="Current_Wind_Dir", pv=default_pv | |
) | |
charstr.append(current_wind_direction) | |
# DateTimeValueObjects | |
dtv = [] | |
updated = create_DateTimeValue( | |
oid=1, name="Current_Conditions_Update", date=None, time=None | |
) | |
dtv.append(updated) | |
for each in av: | |
# print(each) | |
new_device.this_application.add_object(each) | |
for each in charstr: | |
new_device.this_application.add_object(each) | |
for each in dtv: | |
new_device.this_application.add_object(each) | |
return new_device | |
class App: | |
dev = start_device() | |
meteo = MeteoGC(city="qc-147", lang="e") | |
app = App() | |
def update(): | |
# print(app.meteo.actual_conditions) | |
app.meteo.update() | |
current_temp = app.dev.this_application.get_object_id(("analogValue", 1)) | |
current_pressure = app.dev.this_application.get_object_id(("analogValue", 2)) | |
current_dewpoint = app.dev.this_application.get_object_id(("analogValue", 3)) | |
current_humidity = app.dev.this_application.get_object_id(("analogValue", 4)) | |
current_wind_speed = app.dev.this_application.get_object_id(("analogValue", 5)) | |
current_visibility = app.dev.this_application.get_object_id(("analogValue", 6)) | |
location = app.dev.this_application.get_object_id(("characterstringValue", 1)) | |
current_pressure_tend = app.dev.this_application.get_object_id( | |
("characterstringValue", 2) | |
) | |
current_conditions = app.dev.this_application.get_object_id( | |
("characterstringValue", 3) | |
) | |
current_wind_direction = app.dev.this_application.get_object_id( | |
("characterstringValue", 4) | |
) | |
updated = app.dev.this_application.get_object_id(("datetimeValue", 1)) | |
current_temp.presentValue = app.meteo.actual_conditions["temperature"][0] | |
current_pressure.presentValue = app.meteo.actual_conditions["pressure / tendency"][ | |
0 | |
] | |
current_dewpoint.presentValue = app.meteo.actual_conditions.get( | |
"dewpoint", ("", "") | |
)[0] | |
current_humidity.presentValue = app.meteo.actual_conditions["humidity"][0] | |
current_wind_speed.presentValue = app.meteo.actual_conditions["wind"][1] | |
current_visibility.presentValue = app.meteo.actual_conditions["visibility"][0] | |
location.presentValue = CharacterString(app.meteo.actual_conditions["location"]) | |
current_pressure_tend.presentValue = CharacterString( | |
app.meteo.actual_conditions["pressure / tendency"][2] | |
) | |
current_conditions.presentValue = CharacterString( | |
app.meteo.actual_conditions["condition"] | |
) | |
current_wind_direction.presentValue = CharacterString( | |
app.meteo.actual_conditions["wind"][0] | |
) | |
updated.presentValue.date = Date( | |
app.meteo.actual_conditions["date"].date().isoformat() | |
) | |
updated.presentValue.time = Time( | |
app.meteo.actual_conditions["date"].time().isoformat() | |
) | |
def main(): | |
task_device = RecurringTask(update, delay=300) | |
task_device.start() | |
while True: | |
pass | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment