Created
May 24, 2020 16:39
-
-
Save olieidel/edc78949bef6106beb077f92deb85393 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8
# A combined poller / calibrator for reading data from the BME680 and
# SGP30 sensors with a Raspberry Pi.
import csv
import datetime
import math
import os
import time

import adafruit_sgp30
import bme680
import board
import busio
# CSV file that readings are appended to.
DATA_FILE = 'data-2.csv'
# Pause between polling iterations; passed to time.sleep(), so in seconds.
POLLING_INTERVAL = 20
# When true, each reading is printed instead of being written to DATA_FILE.
DEBUG = False
def absolute_humidity(temperature, humidity):
    """Return the absolute humidity for a temperature / humidity reading.

    Taken from a BME280 library (see below). This ignores the pressure
    but should be accurate enough.

    https://github.com/finitespace/BME280/blob/7a211f03aa3ac5567b14e2cbc12ac21e185b6e0b/src/EnvironmentCalculations.cpp#L65

    Args:
        temperature: Temperature in degrees Celsius (273.15 is added
            below to convert it to kelvin).
        humidity: Relative humidity; presumably in percent, as in the
            referenced implementation.

    Returns:
        The absolute humidity; grams of water per cubic metre when the
        inputs use the units above.

    Raises:
        ZeroDivisionError: If ``temperature`` is exactly -243.5.
    """
    mw = 18.01534  # molar mass of water g/mol
    r = 8.31447215  # Universal gas constant J/mol/K

    # Exponential term of the saturation vapour pressure approximation.
    # math.exp is used rather than pow() with a truncated literal for e.
    saturation_factor = math.exp(
        (17.67 * temperature) / (temperature + 243.5)
    )

    return (
        (6.112 * saturation_factor * humidity * mw)
        / ((273.15 + temperature) * r)
    )
# SGP30: open the I2C bus at 100 kHz and attach the sensor driver to it.
i2c = busio.I2C(board.SCL, board.SDA, frequency=100000)
sgp30 = adafruit_sgp30.Adafruit_SGP30(i2c)

# Show the sensor's serial number as a list of hex strings.
print("SGP30 serial #", list(map(hex, sgp30.serial)))

sgp30.iaq_init()
# TODO: Set baseline
# sgp30.set_iaq_baseline(0x8973, 0x8aae)
# BME680: try the primary I2C address first and fall back to the
# secondary address when the first attempt fails with an I/O error.
try:
    sensor = bme680.BME680(bme680.I2C_ADDR_PRIMARY)
except OSError:  # IOError is an alias of OSError in Python 3
    sensor = bme680.BME680(bme680.I2C_ADDR_SECONDARY)
# Print every public integer attribute of the calibration data.
# This dump is purely informational and can be commented out if desired.
print('Calibration data:')
calibration = sensor.calibration_data
for attr in dir(calibration):
    if attr.startswith('_'):
        continue
    constant = getattr(calibration, attr)
    if isinstance(constant, int):
        print('{}: {}'.format(attr, constant))
# Oversampling, filter and gas-measurement settings, applied in order.
# Tweak the oversampling values to change the balance between accuracy
# and noise in the data.
for configure, setting in (
    (sensor.set_humidity_oversample, bme680.OS_2X),
    (sensor.set_pressure_oversample, bme680.OS_4X),
    (sensor.set_temperature_oversample, bme680.OS_8X),
    (sensor.set_filter, bme680.FILTER_SIZE_3),
    (sensor.set_gas_status, bme680.ENABLE_GAS_MEAS),
):
    configure(setting)
# Print every public attribute currently held in sensor.data.
print('\n\nInitial reading:')
for attr in dir(sensor.data):
    reading = getattr(sensor.data, attr)
    if attr.startswith('_'):
        continue
    print('{}: {}'.format(attr, reading))
# Configure the gas heater (temperature 320, duration 150) and select
# heater profile 0.
sensor.set_gas_heater_temperature(320)
sensor.set_gas_heater_duration(150)
sensor.select_gas_heater_profile(0)

# Up to 10 heater profiles can be configured, each with its own
# temperature and duration, for example:
# sensor.set_gas_heater_profile(200, 150, nb_profile=1)
# sensor.select_gas_heater_profile(1)
# Column order of the CSV file; the keys of every logged entry must match.
fieldnames = ['Datetime', 'Temperature', 'Pressure', 'Humidity',
              'eCO2', 'TVOC', 'Gas Resistance', 'Baseline eCO2',
              'Baseline TVOC']

# Append mode creates the file when it is missing and never truncates
# existing data. A position of 0 right after opening means the file is
# empty (new, or left empty by an earlier run), so it still needs its
# header row. This avoids the check-then-create race of testing for the
# file before opening it.
with open(DATA_FILE, 'a', newline='') as f:
    if f.tell() == 0:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
print('\n\nPolling:')

# Main loop: read both sensors, feed the SGP30 its humidity compensation
# and log one CSV row per successful BME680 reading.
# NOTE(review): the SGP30 datasheet asks for IAQ measurements at 1 s
# intervals for its baseline algorithm - confirm that polling every
# POLLING_INTERVAL seconds is acceptable here.
try:
    while True:
        # print("**** Baseline values: eCO2 = 0x%x, TVOC = 0x%x"
        #       % (sgp30.baseline_eCO2, sgp30.baseline_TVOC))

        if sensor.get_sensor_data():
            temperature = sensor.data.temperature
            humidity = sensor.data.humidity

            entry = {
                'Datetime': datetime.datetime.now(),
                'Temperature': temperature,
                'Pressure': sensor.data.pressure,
                'Humidity': humidity,
                'eCO2': sgp30.eCO2,
                'TVOC': sgp30.TVOC,
                'Gas Resistance': sensor.data.gas_resistance,
                'Baseline eCO2': sgp30.baseline_eCO2,
                'Baseline TVOC': sgp30.baseline_TVOC,
            }

            if DEBUG:
                print(entry)

            # Update humidity compensation on the SGP30
            sgp30.set_iaq_humidity(
                gramsPM3=absolute_humidity(
                    humidity=humidity,
                    temperature=temperature,
                ),
            )

            if not DEBUG:
                # Open the file only for the write itself, so it is not
                # held open during sensor I/O or touched in DEBUG mode.
                with open(DATA_FILE, 'a', newline='') as f:
                    writer = csv.DictWriter(f, fieldnames=fieldnames)
                    writer.writerow(entry)

        # Sleep on every iteration, including when no new data was
        # available, so the loop never busy-spins.
        time.sleep(POLLING_INTERVAL)
except KeyboardInterrupt:
    # Ctrl-C is the intended way to stop the poller; exit quietly.
    pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment