Skip to content

Instantly share code, notes, and snippets.

@lukefor
Created January 11, 2019 17:37
Show Gist options
  • Save lukefor/ba98a254e6cf91a1d8fb7efa05d4a9f0 to your computer and use it in GitHub Desktop.
Save lukefor/ba98a254e6cf91a1d8fb7efa05d4a9f0 to your computer and use it in GitHub Desktop.
B910 bluetooth bulb beat detection https://www.youtube.com/watch?v=VolFFilKrfU
from __future__ import print_function
import sys
import numpy
#import pymedia.audio.sound as sound
import math
import queue
from ctypes import POINTER, c_ubyte, c_void_p, c_ulong, cast, c_short, c_ushort, c_float, c_uint, c_int
# From https://github.com/Valodim/python-pulseaudio
from pulseaudio.lib_pulseaudio import *
import sys
import os
import time
import subprocess
import random
import signal
import binascii
import array
import colorsys
random.seed()
os.chdir(os.path.dirname(__file__))
pids = [pid for pid in os.listdir('/proc') if pid.isdigit()]
for pid in pids:
try:
if "python" in open(os.path.join('/proc', pid, 'cmdline'), 'rb').read().decode("utf-8") and int(pid) != int(os.getpid()):
os.kill(int(pid), signal.SIGINT)
except IOError: # proc has already terminated
continue
# edit to match your sink
SINK_NAME = b'alsa_output.pci-0000_03_04.0.analog-stereo'
METER_RATE = 8000
MAX_SAMPLE_VALUE = 127
DISPLAY_SCALE = 2
MAX_SPACES = MAX_SAMPLE_VALUE >> DISPLAY_SCALE
Debugging = False
helperExe = os.path.join(os.path.abspath(os.path.dirname(__file__)), "bluepy-helper")
SEC_LEVEL_LOW = "low"
SEC_LEVEL_MEDIUM = "medium"
SEC_LEVEL_HIGH = "high"
class PeakMonitor(object):
def __init__(self, sink_name, rate):
self.sink_name = sink_name
self.rate = rate
# Wrap callback methods in appropriate ctypefunc instances so
# that the Pulseaudio C API can call them
self._context_notify_cb = pa_context_notify_cb_t(self.context_notify_cb)
self._sink_info_cb = pa_sink_info_cb_t(self.sink_info_cb)
self._stream_read_cb = pa_stream_request_cb_t(self.stream_read_cb)
# stream_read_cb() puts peak samples into this Queue instance
self._samples = queue.Queue()
# Create the mainloop thread and set our context_notify_cb
# method to be called when there's updates relating to the
# connection to Pulseaudio
_mainloop = pa_threaded_mainloop_new()
_mainloop_api = pa_threaded_mainloop_get_api(_mainloop)
context = pa_context_new(_mainloop_api, b"peak_demo")
pa_context_set_state_callback(context, self._context_notify_cb, None)
pa_context_connect(context, None, 0, None)
pa_threaded_mainloop_start(_mainloop)
def __iter__(self):
while True:
yield self._samples.get()
def context_notify_cb(self, context, _):
state = pa_context_get_state(context)
if state == PA_CONTEXT_READY:
print("Pulseaudio connection ready...")
# Connected to Pulseaudio. Now request that sink_info_cb
# be called with information about the available sinks.
o = pa_context_get_sink_info_list(context, self._sink_info_cb, None)
pa_operation_unref(o)
elif state == PA_CONTEXT_FAILED :
print("Connection failed (pulse)")
elif state == PA_CONTEXT_TERMINATED:
print("Connection terminated")
def sink_info_cb(self, context, sink_info_p, _, __):
if not sink_info_p:
return
sink_info = sink_info_p.contents
print('-'* 60)
print('index:', sink_info.index)
print('name:', sink_info.name)
print('description:', sink_info.description)
if sink_info.name == self.sink_name:
# Found the sink we want to monitor for peak levels.
# Tell PA to call stream_read_cb with peak samples.
print('setting up peak recording using', sink_info.monitor_source_name)
samplespec = pa_sample_spec()
samplespec.channels = 1
samplespec.format = PA_SAMPLE_S16LE
samplespec.rate = self.rate
pa_stream = pa_stream_new(context, b"peak detect demo", samplespec, None)
pa_stream_set_read_callback(pa_stream,
self._stream_read_cb,
sink_info.index)
pa_stream_connect_record(pa_stream,
sink_info.monitor_source_name,
None,
PA_STREAM_PEAK_DETECT)
def stream_read_cb(self, stream, length, index_incr):
data = c_void_p()
pa_stream_peek(stream, data, c_ulong(length))
data = cast(data, POINTER(c_short))
for i in iter(range(length)):
# When PA_SAMPLE_U8 is used, samples values range from 128
# to 255 because the underlying audio data is signed but
# it doesn't make sense to return signed peaks.
self._samples.put(data[i])
pa_stream_drop(stream)
def DBG(*args):
if Debugging:
msg = " ".join([str(a) for a in args])
print(msg)
class BTLEException(Exception):
"""BTLE Exception."""
DISCONNECTED = 1
COMM_ERROR = 2
INTERNAL_ERROR = 3
def __init__(self, code, message):
self.code = code
self.message = message
def __str__(self):
return self.message
class UUID:
def __init__(self, val, commonName=None):
'''We accept: 32-digit hex strings, with and without '-' characters,
4 to 8 digit hex strings, and integers'''
if isinstance(val, int):
if (val < 0) or (val > 0xFFFFFFFF):
raise ValueError(
"Short form UUIDs must be in range 0..0xFFFFFFFF")
val = "%04X" % val
elif isinstance(val, self.__class__):
val = str(val)
else:
val = str(val) # Do our best
val = val.replace("-", "")
if len(val) <= 8: # Short form
val = ("0" * (8 - len(val))) + val + "00001000800000805F9B34FB"
self.binVal = binascii.a2b_hex(val)
if len(self.binVal) != 16:
raise ValueError(
"UUID must be 16 bytes, got '%s' (len=%d)" % (val,
len(self.binVal)))
self.commonName = commonName
def __str__(self):
s = binascii.b2a_hex(self.binVal).decode('utf-8')
return "-".join([s[0:8], s[8:12], s[12:16], s[16:20], s[20:32]])
def __eq__(self, other):
return self.binVal == UUID(other).binVal
def __cmp__(self, other):
return cmp(self.binVal, UUID(other).binVal)
def __hash__(self):
return hash(self.binVal)
def getCommonName(self):
s = AssignedNumbers.getCommonName(self)
if s:
return s
s = str(self)
if s.endswith("-0000-1000-8000-00805f9b34fb"):
s = s[0:8]
if s.startswith("0000"):
s = s[4:]
return s
class Service:
def __init__(self, *args):
(self.peripheral, uuidVal, self.hndStart, self.hndEnd) = args
self.uuid = UUID(uuidVal)
self.chars = None
def getCharacteristics(self, forUUID=None):
if not self.chars: # Unset, or empty
self.chars = self.peripheral.getCharacteristics(self.hndStart, self.hndEnd)
if forUUID is not None:
u = UUID(forUUID)
return [ch for ch in self.chars if ch.uuid==u]
return self.chars
def __str__(self):
return "Service <uuid=%s handleStart=%s handleEnd=%s>" % (self.uuid.getCommonName(),
self.hndStart,
self.hndEnd)
class Characteristic:
def __init__(self, *args):
(self.peripheral, uuidVal, self.handle, self.properties, self.valHandle) = args
self.uuid = UUID(uuidVal)
def read(self):
return self.peripheral.readCharacteristic(self.valHandle)
def write(self, val, withResponse=False):
#return False
return self.peripheral.writeCharacteristic(self.valHandle, val, withResponse)
# TODO: descriptors
def __str__(self):
return "Characteristic <%s>" % self.uuid.getCommonName()
class Descriptor:
def __init__(self, *args):
(self.peripheral, uuidVal, self.handle) = args
self.uuid = UUID(uuidVal)
def __str__(self):
return "Descriptor <%s>" % self.uuid.getCommonName()
class Peripheral:
def __init__(self, deviceAddr=None):
self._helper = None
self.services = {} # Indexed by UUID
self.discoveredAllServices = False
if deviceAddr is not None:
self.connect(deviceAddr)
def _startHelper(self):
if self._helper is None:
DBG("Running ", helperExe)
self._helper = subprocess.Popen([helperExe],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
universal_newlines=True)
def _stopHelper(self):
if self._helper is not None:
DBG("Stopping ", helperExe)
self._helper.stdin.write("quit\n")
self._helper.stdin.flush()
self._helper.wait()
self._helper = None
def _writeCmd(self, cmd):
if self._helper is None:
raise BTLEException(BTLEException.INTERNAL_ERROR,
"Helper not started (did you call connect()?)")
DBG("Sent: ", cmd)
self._helper.stdin.write(cmd)
self._helper.stdin.flush()
@staticmethod
def parseResp(line):
resp = {}
for item in line.rstrip().split(' '):
(tag, tval) = item.split('=')
if len(tval)==0:
val = None
elif tval[0]=="$" or tval[0]=="'":
# Both symbols and strings as Python strings
val = tval[1:]
elif tval[0]=="h":
val = int(tval[1:], 16)
elif tval[0]=='b':
val = binascii.a2b_hex(tval[1:])
else:
raise BTLEException(BTLEException.INTERNAL_ERROR,
"Cannot understand response value %s" % repr(tval))
if tag not in resp:
resp[tag] = [val]
else:
resp[tag].append(val)
return resp
def _getResp(self, wantType):
while True:
if self._helper.poll() is not None:
raise BTLEException(BTLEException.INTERNAL_ERROR, "Helper exited")
rv = self._helper.stdout.readline()
DBG("Got:", repr(rv))
if rv.startswith('#'):
continue
resp = Peripheral.parseResp(rv)
if 'rsp' not in resp:
raise BTLEException(BTLEException.INTERNAL_ERROR,
"No response type indicator")
respType = resp['rsp'][0]
if respType == wantType:
return resp
elif respType == 'stat' and resp['state'][0] == 'disc':
self._stopHelper()
raise BTLEException(BTLEException.DISCONNECTED, "Device disconnected")
elif respType == 'err':
errcode=resp['code'][0]
raise BTLEException(BTLEException.COMM_ERROR, "Error from Bluetooth stack (%s)" % errcode)
elif respType == 'ntfy':
DBG("Ignoring notification")
continue
else:
raise BTLEException(BTLEException.INTERNAL_ERROR, "Unexpected response (%s)" % respType)
def status(self):
self._writeCmd("stat\n")
return self._getResp('stat')
def connect(self, addr):
if len(addr.split(":")) != 6:
raise ValueError("Expected MAC address, got %s", repr(addr))
self._startHelper()
self.deviceAddr = addr
self._writeCmd("conn %s\n" % addr)
rsp = self._getResp('stat')
while rsp['state'][0] == 'tryconn':
rsp = self._getResp('stat')
if rsp['state'][0] != 'conn':
self._stopHelper()
raise BTLEException(BTLEException.DISCONNECTED,
"Failed to connect to peripheral %s" % addr)
def disconnect(self):
if self._helper is None:
return
self._writeCmd("disc\n")
self._getResp('stat')
self._stopHelper()
def discoverServices(self):
self._writeCmd("svcs\n")
rsp = self._getResp('find')
starts = rsp['hstart']
ends = rsp['hend']
uuids = rsp['uuid']
nSvcs = len(uuids)
assert(len(starts)==nSvcs and len(ends)==nSvcs)
self.services = {}
for i in range(nSvcs):
self.services[UUID(uuids[i])] = Service(self, uuids[i], starts[i], ends[i])
self.discoveredAllServices = True
return self.services
def getServices(self):
if not self.discoveredAllServices:
self.discoverServices()
return self.services.values()
def getServiceByUUID(self, uuidVal):
uuid = UUID(uuidVal)
if uuid in self.services:
return self.services[uuid]
self._writeCmd("svcs %s\n" % uuid)
rsp = self._getResp('find')
svc = Service(self, uuid, rsp['hstart'][0], rsp['hend'][0])
self.services[uuid] = svc
return svc
def _getIncludedServices(self, startHnd=1, endHnd=0xFFFF):
# TODO: No working example of this yet
self._writeCmd("incl %X %X\n" % (startHnd, endHnd))
return self._getResp('find')
def getCharacteristics(self, startHnd=1, endHnd=0xFFFF, uuid=None):
cmd = 'char %X %X' % (startHnd, endHnd)
if uuid:
cmd += ' %s' % UUID(uuid)
self._writeCmd(cmd + "\n")
rsp = self._getResp('find')
nChars = len(rsp['hnd'])
return [Characteristic(self, rsp['uuid'][i], rsp['hnd'][i],
rsp['props'][i], rsp['vhnd'][i])
for i in range(nChars)]
def getDescriptors(self, startHnd=1, endHnd=0xFFFF):
self._writeCmd("desc %X %X\n" % (startHnd, endHnd) )
resp = self._getResp('desc')
nDesc = len(resp['hnd'])
return [Descriptor(self, resp['uuid'][i], resp['hnd'][i]) for i in
range(nDesc)]
def readCharacteristic(self, handle):
self._writeCmd("rd %X\n" % handle)
resp = self._getResp('rd')
return resp['d'][0]
def _readCharacteristicByUUID(self, uuid, startHnd, endHnd):
# Not used at present
self._writeCmd("rdu %s %X %X\n" % (UUID(uuid), startHnd, endHnd))
return self._getResp('rd')
def writeCharacteristic(self, handle, val, withResponse=False):
cmd = "wrr" if withResponse else "wr"
self._writeCmd("%s %X %s\n" % (cmd, handle, binascii.b2a_hex(val).decode('utf-8')))
return self._getResp('wr')
def setSecurityLevel(self, level):
self._writeCmd("secu %s\n" % level)
return self._getResp('stat')
def setMTU(self, mtu):
self._writeCmd("mtu %x\n" % mtu)
return self._getResp('stat')
def __del__(self):
self.disconnect()
def capitaliseName(descr):
words = descr.split(" ")
capWords = [ words[0].lower() ]
capWords += [ w[0:1].upper() + w[1:].lower() for w in words[1:] ]
return "".join(capWords)
class SimpleBeatDetection:
"""
Simple beat detection algorithm from
http://archive.gamedev.net/archive/reference/programming/features/beatdetection/index.html
"""
def __init__(self, history = 43):
self.local_energy = numpy.zeros(history)
self.local_energy_index = 0
def detect_beat(self, samples):
instant_energy = numpy.dot(samples, samples) / float(0xffffffff)
local_energy_average = self.local_energy.mean()
local_energy_variance = self.local_energy.var()
beat_sensibility = (-0.0025714 * local_energy_variance) + 0.9 #0.75 is good
beat = instant_energy - beat_sensibility * local_energy_average
self.local_energy[self.local_energy_index] = instant_energy
self.local_energy_index -= 1
if self.local_energy_index < 0:
self.local_energy_index = len(self.local_energy) - 1
return beat
class _UUIDNameMap:
# Constructor sets self.currentTimeService, self.txPower, and so on
# from names.
def __init__(self, idList):
self.idMap = {}
for uuid in idList:
attrName = capitaliseName(uuid.commonName)
vars(self) [attrName] = uuid
self.idMap[uuid] = uuid
def getCommonName(self, uuid):
if uuid in self.idMap:
return self.idMap[uuid].commonName
return None
AssignedNumbers = _UUIDNameMap( [
# Service UUIDs
UUID(0x1811, "Alert Notification Service"),
UUID(0x180F, "Battery Service"),
UUID(0x1810, "Blood Pressure"),
UUID(0x1805, "Current Time Service"),
UUID(0x1818, "Cycling Power"),
UUID(0x1816, "Cycling Speed and Cadence"),
UUID(0x180A, "Device Information"),
UUID(0x1800, "Generic Access"),
UUID(0x1801, "Generic Attribute"),
UUID(0x1808, "Glucose"),
UUID(0x1809, "Health Thermometer"),
UUID(0x180D, "Heart Rate"),
UUID(0x1812, "Human Interface Device"),
UUID(0x1802, "Immediate Alert"),
UUID(0x1803, "Link Loss"),
UUID(0x1819, "Location and Navigation"),
UUID(0x1807, "Next DST Change Service"),
UUID(0x180E, "Phone Alert Status Service"),
UUID(0x1806, "Reference Time Update Service"),
UUID(0x1814, "Running Speed and Cadence"),
UUID(0x1813, "Scan Parameters"),
UUID(0x1804, "Tx Power"),
UUID(0x181C, "User Data"),
# Characteristic UUIDs
UUID(0x2A00, "Device Name"),
UUID(0x2A07, "Tx Power Level"),
UUID(0x2A19, "Battery Level"),
UUID(0x2A24, "Model Number String"),
UUID(0x2A25, "Serial Number String"),
UUID(0x2A26, "Firmware Revision String"),
UUID(0x2A27, "Hardware Revision String"),
UUID(0x2A28, "Software Revision String"),
UUID(0x2A29, "Manufacturer Name String"),
])
rgbcolor = array.array('b', [-86, 10, -4, 58, -122, 1, 13, 6, 1, 0, 0, 0, 32, 48, 15, -112, 13])
rgbAddDec = array.array('b', [-86, 10, -4, 58, -122, 1, 11, 1, 9, 5, 54, 13])
lOpen = array.array('b', [ -86, 10, -4, 58, -122, 1, 10, 1, 1, 0, 40, 13])
lClose = array.array('b', [ -86, 10, -4, 58, -122, 1, 10, 1, 0, 1, 40, 13])
brightness = array.array('b', [ -86, 10, -4, 58, -122, 1, 12, 1, 1, 8, 50, 13])
night = array.array('b', [-86, 10, -4, 58, -122, 1, 16, 2, 3, 1, 8, 50, 13])
cct = array.array('b', [-86, 10, -4, 58, -122, 1, 14, 1, 1, 11, 55, 13])
def getCheckSum(arr):
i = 1
total = 0
for char in arr:
if i == 1:
i += 1
continue
if i > len(arr)-2:
break
total += char
if total > 127:
total = -128 + (total - 128)
if total < -128:
total = 128 - abs(total + 128)
#print(total)
i += 1
total += 85
if total > 127:
total = -128 + (total - 128)
if total < -128:
total = 128 - abs(total + 128)
return total
def getRGBColor(r, g, b):
rgbcolor[8] = 1;
rgbcolor[9] = int(r / 2)
rgbcolor[10] = int(g / 2)
rgbcolor[11] = int(b / 2)
rgbcolor[14] = random.randrange(255) - 128;
rgbcolor[15] = getCheckSum(rgbcolor);
return rgbcolor
def getRGBReset():
rgbcolor[8] = 1;
rgbcolor[9] = -128;
rgbcolor[10] = -128;
rgbcolor[11] = -128;
rgbcolor[12] = -128;
rgbcolor[13] = -128;
rgbcolor[14] = 55;
rgbcolor[15] = getCheckSum(rgbcolor);
return rgbcolor
def getWhiteReset():
rgbcolor[8] = 2;
rgbcolor[9] = -128;
rgbcolor[10] = -128;
rgbcolor[11] = -128;
rgbcolor[12] = -128;
rgbcolor[13] = -128;
rgbcolor[14] = random.randrange(255) - 128;
rgbcolor[15] = getCheckSum(rgbcolor);
return rgbcolor
def getRGBAddAndDec(i):
rgbAddDec[8] = i
rgbAddDec[9] = random.randrange(255) - 128;
rgbAddDec[10] = getCheckSum(rgbAddDec);
return rgbAddDec;
def getBrightness(i):
brightness[8] = i + 2
brightness[9] = random.randrange(255) - 128;
brightness[10] = getCheckSum(brightness);
return brightness;
def getCCT(i):
cct[8] = i + 2
cct[9] = random.randrange(255) - 128;
cct[10] = getCheckSum(cct);
return cct;
def getNight():
night[10] = random.randrange(255) - 128;
night[11] = getCheckSum(night);
return night;
def main():
monitor = PeakMonitor(SINK_NAME, METER_RATE)
beatdetect = SimpleBeatDetection()
samples = []
volumes = []
volumesB = []
i = 0
avgVolB = 99
avgVol = 99
lastBright = 0
lastWhite = 0
lastDim = 0
enabled = False
Debugging = False
isWhite = False
devaddr = "D0:39:72:B7:1A:19"
print("Connecting to:", devaddr)
conn = Peripheral(devaddr)
try:
conn.discoverServices()
svc = conn.getServiceByUUID("fff0")
wc = svc.getCharacteristics("fff1")[0]
if(len(sys.argv) > 1 and sys.argv[1] == "on"):
wc.write(getWhiteReset())
time.sleep(0.5)
wc.write(getCCT(9))
time.sleep(0.5)
wc.write(getBrightness(7))
time.sleep(0.5)
#conn.disconnect()
sys.exit(0)
elif(len(sys.argv) > 1 and sys.argv[1] == "dim"):
wc.write(getWhiteReset())
time.sleep(0.5)
wc.write(getCCT(9))
time.sleep(0.5)
wc.write(getBrightness(0))
time.sleep(0.5)
#conn.disconnect()
sys.exit(0)
print("connected, starting peak monitor")
#time.sleep(1)
#spectr = sound.SpectrAnalyzer(1, 300, 512)
for sample in monitor:
if not math.isnan(sample):
samples.append(sample)
if(len(samples) >= 400):
i = i + 1
#samples = monitor[300:]
#unpacked = [x**2 for x in samples]
#volume = sum(unpacked) / float(len(unpacked))
#volume = numpy.sqrt(numpy.mean(unpacked))
# here's the volume
#volume = [20 * numpy.log10(numpy.sqrt(i)) for i in unpacked]
#print(spectr.asBands(25, samples))
#print(volume)
#(man, exp) = math.frexp(volume)
#print(exp)
#fft = numpy.abs(numpy.fft.fft(samples) / 600)
#print(fft[100])
sum_squares = sum(sample**2 for sample in samples)
volume = int(math.sqrt(sum_squares / 300))
volumes.append(volume)
volumesB.append(volume)
if(len(volumes) > 80):
avgVol = sum(volumes) / float(len(volumes))
#print(volume)
if(avgVol > 7500):
enabled = True
else:
enabled = False
wc.write(getWhiteReset())
time.sleep(0.1)
wc.write(getCCT(9))
time.sleep(0.1)
wc.write(getBrightness(4))
time.sleep(0.1)
volumes = []
if(len(volumesB) > 35):
avgVolB = sum(volumesB) / float(len(volumesB))
volumesB = []
#print(volume)
#sample = fft[70] / 10
#sample = sum(fft[30:130]) / float(100) / 10
#print(volume)
if enabled:
beat = beatdetect.detect_beat(samples)
isbeat = beat > 0
if(beat > 9):
print("beat over")
if(volume / avgVolB > 1.5):
print("vol over 1.5")
doWhite = beat > 9 or volume > avgVolB * 1.5
doWhite = False
#print(isbeat)
#print(i - lastBright)
if((isbeat and i - lastDim > 4) or doWhite):
#if(random.randrange(6) < 2):
if((i - lastWhite > 130 and isWhite) or random.randrange(6) == 1 or doWhite):
if(doWhite):
lastWhite = i
avgVol = 9999999999
wc.write(getWhiteReset())
time.sleep(0.1)
wc.write(getCCT(random.randrange(1, 9)))
isWhite = True
else:
wc.write(getRGBReset())
color = colorsys.hsv_to_rgb(random.random(), 1.0, 1.0)
wc.write(getRGBColor(int(color[0] * 255.0), int(color[1] * 255.0), int(color[2] * 255.0)))
#wc.write(getRGBColor(0, 255, 0))
print([int(color[0] * 255.0), int(color[1] * 255.0), int(color[2] * 255.0)])
isWhite = False
time.sleep(0.2)
#wc.write(getRGBColor(random.randrange(255), random.randrange(255), random.randrange(255)))
print("new color")
#else:
wc.write(getBrightness(random.randrange(7, 10)))
lastBright = i
lastDim = 999999999999
print("bright")
#wc.write(getWhiteReset())
elif(i - lastBright > 12):
lastDim = i
lastBright = 999999999999
wc.write(getBrightness(random.randrange(min(4, max(0, int(5-beat))),5)))
print("dim")
#wc.write(getRGBColor(255,0,0))
samples = []
#sample = int( volume / 600) - 14
#if(sample < 0):
# sample = 0
#bar = '>' * sample
#spaces = ' ' * (MAX_SPACES - sample)
#print(' %3d %s%s\r' % (sample, bar, spaces),)
#wc.write(getBrightness(sample))
#sample = sample >> DISPLAY_SCALE
#sample = round(sample / 1000)
#bar = '>' * sample
#spaces = ' ' * (MAX_SPACES - sample)
#print(' %3d %s%s\r' % (sample, bar, spaces),)
#if(sample > 0.2):
#print(sample)
#sys.stdout.flush()
except BTLEException as e:
print(" ->", e)
finally:
#wc.write(getWhiteReset())
#wc.write(getCCT(10))
conn.disconnect()
if __name__ == '__main__':
main()
@lukefor
Copy link
Author

lukefor commented Feb 28, 2021

An upgrade for simple on/off white temperature and colour control

import sys
import numpy

import math
import queue
from ctypes import POINTER, c_ubyte, c_void_p, c_ulong, cast, c_short, c_ushort, c_float, c_uint, c_int


import sys
import os
import time
import subprocess
import random
import signal
import binascii
import array
import colorsys
from bluepy import btle

random.seed()

helperExe = os.path.join(os.path.abspath(os.path.dirname(__file__)), "bluepy-helper")
Debugging = True

def DBG(*args):
        if Debugging:
                msg = " ".join([str(a) for a in args])
                print(msg)

rgbcolor = array.array('b', [-86, 10, -4, 58, -122, 1, 13, 6, 1, 0, 0, 0, 32, 48, 15, -112, 13])
rgbAddDec = array.array('b', [-86, 10, -4, 58, -122, 1, 11, 1, 9, 5, 54, 13])
lOpen = array.array('b',  [ -86, 10, -4, 58, -122, 1, 10, 1, 1, 0, 40, 13])
lClose = array.array('b', [ -86, 10, -4, 58, -122, 1, 10, 1, 0, 1, 40, 13])
brightness = array.array('b', [  -86, 10, -4, 58, -122, 1, 12, 1, 1, 8,  50, 13])
night = array.array('b', [-86, 10, -4, 58, -122, 1, 16, 2, 3, 1, 8, 50, 13])
cct = array.array('b', [-86, 10, -4, 58, -122, 1, 14, 1, 1, 11, 55, 13])

def getCheckSum(arr):
        i = 1
        total = 0
        for char in arr:
                if i == 1:
                        i += 1
                        continue
                if i > len(arr)-2:
                        break
                total += char
                if total > 127:
                        total = -128 + (total - 128)
                if total < -128:
                        total = 128 - abs(total + 128)
                #print(total)
                i += 1  
        total += 85
        if total > 127:
                total = -128 + (total - 128)
        if total < -128:
                total = 128 - abs(total + 128)
        return total


def getRGBColor(r, g, b):
        rgbcolor[8] = 1;
        rgbcolor[9] = int(r / 2)
        rgbcolor[10] = int(g / 2)
        rgbcolor[11] = int(b / 2)
        rgbcolor[14] = random.randrange(255) - 128;
        rgbcolor[15] = getCheckSum(rgbcolor);
        return rgbcolor


def getRGBReset():
        rgbcolor[8] = 1;
        rgbcolor[9] = -128;
        rgbcolor[10] = -128;
        rgbcolor[11] = -128;
        rgbcolor[12] = -128;
        rgbcolor[13] = -128;
        rgbcolor[14] = 55;
        rgbcolor[15] = getCheckSum(rgbcolor);
        return rgbcolor



def getWhiteReset():
        rgbcolor[8] = 2;
        rgbcolor[9] = -128;
        rgbcolor[10] = -128;
        rgbcolor[11] = -128;
        rgbcolor[12] = -128;
        rgbcolor[13] = -128;
        rgbcolor[14] = random.randrange(255) - 128;
        rgbcolor[15] = getCheckSum(rgbcolor);
        return rgbcolor

def getRGBAddAndDec(i):
        rgbAddDec[8] = i
        rgbAddDec[9] = random.randrange(255) - 128;
        rgbAddDec[10] = getCheckSum(rgbAddDec);
        return rgbAddDec;


def getBrightness(i):
        brightness[8] = i + 2
        brightness[9] = random.randrange(255) - 128;
        brightness[10] = getCheckSum(brightness);
        return brightness;



def getCCT(i):
        cct[8] = i + 2
        cct[9] = random.randrange(255) - 128;
        cct[10] = getCheckSum(cct);
        return cct;

def getNight():
        night[10] = random.randrange(255) - 128;
        night[11] = getCheckSum(night);
        return night;






def main():
        enabled = False
        #Debugging = False
        isWhite = False
        devaddr = "D0:39:72:B7:1A:19"
        conn = btle.Peripheral(devaddr)
        try:
                conn.discoverServices()
                svc = conn.getServiceByUUID("fff0")
                wc = svc.getCharacteristics("fff1")[0]
                
                if sys.argv[1] == "off":
                    wc.write(lClose)
                else:
                    wc.write(lOpen)
                    time.sleep(0.5)
                    wc.write(getWhiteReset())
                    time.sleep(0.5)
                    wc.write(getCCT(int(sys.argv[1])))
                    time.sleep(0.5)
                    wc.write(getBrightness(int(sys.argv[2])))

                time.sleep(0.5)
        except BTLEException as e:
                print("    ->", e)
        finally:
                conn.disconnect()

if __name__ == '__main__':
        main()```

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment