Created
January 17, 2021 19:48
-
-
Save olieidel/25c1072b2fa8a53528971c18e5a5997a to your computer and use it in GitHub Desktop.
Ancient Python code which I wrote ages ago to analyse air quality with the BME680 and SGP30 sensors. It has some nifty calibration logic in there which I couldn't find anywhere else, at least not in this combination. It was part of a Django app, which is why it imports the "models"; that is also how it persists data.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
import time | |
import threading | |
import adafruit_sgp30 | |
import bme680 | |
import board | |
import busio | |
from piaq.monitor.models import SensorReading, SensorSettings | |
def absolute_humidity(temperature, humidity):
    """Convert a temperature / relative-humidity pair to absolute humidity.

    The expression comes from a pull request to the Adafruit SGP30 Arduino
    library (https://github.com/adafruit/Adafruit_SGP30/pull/4), which in
    turn follows the formula given in the SGP30 driver integration guide:
    https://www.sensirion.com/fileadmin/user_upload/customers/sensirion/Dokumente/9_Gas_Sensors/Sensirion_Gas_Sensors_SGP30_Driver-Integration-Guide_SW_I2C.pdf

    Args:
        temperature: Air temperature. The ``273.15 + t`` term implies
            degrees Celsius.
        humidity: Relative humidity, divided by 100 in the formula, i.e.
            expected as a percentage.

    Returns:
        The absolute humidity as a float (presumably g/m^3, per the linked
        guide -- confirm against the SGP30 documentation).
    """
    # Magnus-style exponent of the saturation vapour pressure term.
    exponent = (17.62 * temperature) / (243.12 + temperature)
    # Actual vapour pressure: relative humidity scaled by saturation pressure.
    vapour_pressure = (humidity / 100.0) * 6.112 * math.exp(exponent)
    # Scale by 216.7 and divide by the absolute temperature.
    return 216.7 * vapour_pressure / (273.15 + temperature)
class PollingThread(threading.Thread):
    """Background thread that polls the BME680 and SGP30 sensors.

    Each cycle reads the BME680, stores one ``SensorReading`` row combining
    the BME680 values with the SGP30 air-quality values and baselines, and
    then feeds the current humidity back into the SGP30 as compensation.
    The loop ends when ``SensorSettings.is_started`` is false or when
    ``join()`` has been called.
    """

    def __init__(self, polling_interval):
        """Set up both sensors and prepare the thread (does not start it).

        Args:
            polling_interval: Seconds to wait between two polling cycles.
        """
        # Initialise the Thread machinery first so the object is a valid
        # thread before any sensor I/O (which may raise) is attempted.
        super().__init__()
        self.polling_interval = polling_interval
        self.stoprequest = threading.Event()
        self.bme680 = None
        self.sgp30 = None
        self._init_bme680()
        self._init_sgp30()

    def _init_bme680(self):
        """Open the BME680 and configure oversampling, filter and gas heater."""
        try:
            sensor = bme680.BME680(bme680.I2C_ADDR_PRIMARY)
        except IOError:
            # Not found on the primary I2C address: try the secondary one.
            sensor = bme680.BME680(bme680.I2C_ADDR_SECONDARY)

        sensor.set_humidity_oversample(bme680.OS_2X)
        sensor.set_pressure_oversample(bme680.OS_4X)
        sensor.set_temperature_oversample(bme680.OS_8X)
        sensor.set_filter(bme680.FILTER_SIZE_3)

        sensor.set_gas_status(bme680.ENABLE_GAS_MEAS)
        # NOTE(review): presumably degrees Celsius and milliseconds per the
        # bme680 library -- confirm against its documentation.
        sensor.set_gas_heater_temperature(320)
        sensor.set_gas_heater_duration(150)
        sensor.select_gas_heater_profile(0)

        self.bme680 = sensor

    def _init_sgp30(self):
        """Open the SGP30 over I2C, start IAQ mode and restore its baseline."""
        i2c = busio.I2C(board.SCL, board.SDA, frequency=100000)
        sgp30 = adafruit_sgp30.Adafruit_SGP30(i2c)
        sgp30.iaq_init()
        self.sgp30 = sgp30
        self._set_sgp30_baseline()

    def _set_sgp30_baseline(self):
        """Restore the SGP30 baseline from the most recent usable reading.

        Does nothing when no suitable reading has been stored yet.
        """
        last_reading = SensorReading.objects.exclude(
            # Exclude readings where both values are 0, this can
            # happen when the sensor is warming up
            baseline_eCO2=0,
            baseline_TVOC=0,
        ).order_by('-time').first()

        if last_reading:
            self.sgp30.set_iaq_baseline(
                eCO2=last_reading.baseline_eCO2,
                TVOC=last_reading.baseline_TVOC,
            )

    def _set_sgp30_humidity_compensation(self, humidity, temperature):
        """Update humidity compensation on the SGP30.

        Args:
            humidity: Relative humidity as read from the BME680.
            temperature: Temperature as read from the BME680.
        """
        self.sgp30.set_iaq_humidity(
            gramsPM3=absolute_humidity(
                humidity=humidity,
                temperature=temperature,
            ),
        )

    def _should_stop(self):
        """Return True when the polling loop has to end.

        The loop ends when ``join()`` requested a stop, when the stored
        settings say polling is not started, or when no settings row
        exists at all (treated as "not started" rather than crashing the
        thread with an ``AttributeError``).
        """
        # Check the in-memory flag first; no need to hit the database
        # once a stop has been requested.
        if self.stoprequest.is_set():
            return True
        settings = SensorSettings.objects.first()
        return settings is None or not settings.is_started

    def _read_and_save(self):
        """Take one measurement, persist it and refresh SGP30 compensation.

        Returns early without storing anything when the BME680 has no new
        data available.
        """
        got_data = self.bme680.get_sensor_data()
        if not got_data:
            return

        temperature = self.bme680.data.temperature
        humidity = self.bme680.data.humidity

        SensorReading.objects.create(
            temperature=temperature,
            humidity=humidity,
            pressure=self.bme680.data.pressure,
            gas_resistance=self.bme680.data.gas_resistance,
            eCO2=self.sgp30.eCO2,
            TVOC=self.sgp30.TVOC,
            baseline_eCO2=self.sgp30.baseline_eCO2,
            baseline_TVOC=self.sgp30.baseline_TVOC,
        )

        # Pass by keyword: the helper's parameter order is
        # (humidity, temperature), so a positional
        # (temperature, humidity) call would swap the two values.
        self._set_sgp30_humidity_compensation(
            humidity=humidity,
            temperature=temperature,
        )

    def join(self, timeout=None):
        """Request the polling loop to stop, then wait for the thread.

        Args:
            timeout: Optional maximum number of seconds to wait, passed
                through to ``threading.Thread.join``.
        """
        self.stoprequest.set()
        super().join(timeout)

    def run(self):
        """Poll the sensors until a stop condition is met."""
        while not self._should_stop():
            self._read_and_save()
            # Wait on the stop event instead of sleeping so that join()
            # does not have to sit out the rest of the polling interval.
            self.stoprequest.wait(self.polling_interval)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment