Skip to content

Instantly share code, notes, and snippets.

@mwinkler
Created February 19, 2021 23:21
Show Gist options
  • Save mwinkler/35aae5d00d77230f192bdae451d685f2 to your computer and use it in GitHub Desktop.
Save mwinkler/35aae5d00d77230f192bdae451d685f2 to your computer and use it in GitHub Desktop.
# thanks to Vinz1911 (https://github.com/Vinz1911/PrimePoweredUP) for the BLE handling
from spike import PrimeHub, LightMatrix, Button, StatusLight, ForceSensor, MotionSensor, Speaker, ColorSensor, App, DistanceSensor, Motor, MotorPair
from spike.control import wait_for_seconds, wait_until, Timer
from math import *
import utime
import ubluetooth
import ubinascii
import struct
from micropython import const
import time
# Hub object; the code below uses its light matrix, status light and left button.
hub = PrimeHub()
class BLE_Handler:
    """BLE central that scans for, connects to and talks to a HID peripheral.

    Advertisements listing the HID service (0x1812) are cached during a scan.
    After connecting, the HID service's Report characteristic (0x2A4D) is
    discovered and its value handle is used for reads, writes and
    notifications.

    Note: the internal reset that runs on disconnect also clears every
    registered callback, so callbacks must be registered again before a
    reconnect.
    """

    # Event codes compared against the ``event`` argument of the IRQ handler.
    __IRQ_SCAN_RESULT = 1 << 4
    __IRQ_SCAN_COMPLETE = 1 << 5
    __IRQ_PERIPHERAL_CONNECT = 1 << 6
    __IRQ_PERIPHERAL_DISCONNECT = 1 << 7
    __IRQ_GATTC_SERVICE_RESULT = 1 << 8
    __IRQ_GATTC_CHARACTERISTIC_RESULT = 1 << 9
    __IRQ_GATTC_READ_RESULT = 1 << 11
    __IRQ_GATTC_NOTIFY = 1 << 13

    def __init__(self):
        # All state is initialised before the IRQ handler is registered so
        # that no event can ever observe a half-built instance.
        self.debug = False
        self.__SERVICE_HID_UUID = ubluetooth.UUID(0x1812)  # HID
        self.__CHAR_REPORT_UUID = ubluetooth.UUID(0x2a4d)  # Report
        self.__CHAR_REPORT_MAP_UUID = ubluetooth.UUID(0x2a4b)  # Report Map
        self.__decoder = _Decoder()
        self.__reset()
        self.__ble = ubluetooth.BLE()
        self.__ble.active(True)
        self.__ble.irq(handler=self.__irq)

    def __reset(self):
        """Forget the cached device, the connection and all callbacks."""
        # cached data
        self.__addr = None
        self.__addr_type = None
        self.__adv_type = None
        self.__services = None
        self.__man_data = None
        self.__name = None
        self.__conn_handle = None
        self.__value_handle = None
        # reserved callbacks
        self.__scan_callback = None
        self.__read_callback = None
        self.__notify_callback = None
        self.__connected_callback = None
        self.__disconnected_callback = None

    def __log(self, *args):
        """Print ``args`` space-separated when ``self.debug`` is set."""
        if self.debug:
            print(*args)

    def scan_start(self, timeout, callback):
        """Start scanning for BLE devices.

        ``callback(addr_type, addr, man_data)`` is invoked once when the scan
        completes; all three values are None if no HID device was seen.
        """
        self.__log("start scanning for", self.__SERVICE_HID_UUID)
        self.__scan_callback = callback
        self.__ble.gap_scan(timeout, 30000, 30000)

    def scan_stop(self):
        """Stop the current scan."""
        self.__ble.gap_scan(None)

    def write(self, data, adv_value=None):
        """Write ``data`` to the peripheral.

        ``adv_value`` optionally names the value handle to write to; without
        it the discovered Report handle is used. Does nothing while
        disconnected or while no handle is known yet.
        """
        if not self.__is_connected():
            return
        handle = adv_value if adv_value else self.__value_handle
        if handle is None:
            # Characteristic discovery has not finished yet.
            return
        self.__ble.gattc_write(self.__conn_handle, handle, data)

    def read(self, callback):
        """Request a read of the Report characteristic.

        ``callback(char_data)`` is invoked when the read result arrives.
        Does nothing while disconnected or before the handle is discovered.
        """
        if not self.__is_connected() or self.__value_handle is None:
            return
        self.__read_callback = callback
        self.__ble.gattc_read(self.__conn_handle, self.__value_handle)

    def connect(self, addr_type, addr):
        """Connect to the BLE device with the given address."""
        self.__ble.gap_connect(addr_type, addr)

    def disconnect(self):
        """Disconnect from the peripheral and reset all cached state."""
        if not self.__is_connected():
            return
        self.__ble.gap_disconnect(self.__conn_handle)
        self.__reset()

    def on_notify(self, callback):
        """Register ``callback(notify_data)`` for incoming notifications."""
        self.__notify_callback = callback

    def on_connect(self, callback):
        """Register ``callback()`` run when the Report handle is discovered."""
        self.__connected_callback = callback

    def on_disconnect(self, callback):
        """Register ``callback()`` run when a peripheral disconnects."""
        self.__disconnected_callback = callback

    # MARK: - Private Functions

    def __is_connected(self):
        """Return True while a connection handle is held."""
        return self.__conn_handle is not None

    def __irq(self, event, data):
        """BLE event handler; dispatches on the event code."""
        # called for every result of a ble scan
        if event == self.__IRQ_SCAN_RESULT:
            addr_type, addr, adv_type, rssi, adv_data = data
            # Decode once and reuse: this runs for every advertisement seen.
            services = self.__decoder.decode_services(adv_data)
            self.__log("__IRQ_SCAN_RESULT -> uuid:", services)
            if self.__SERVICE_HID_UUID in services:
                self.__addr_type = addr_type
                self.__addr = bytes(addr)
                self.__adv_type = adv_type
                self.__name = self.__decoder.decode_name(adv_data)
                self.__services = services
                self.__man_data = self.__decoder.decode_manufacturer(adv_data)
                self.scan_stop()
        # called after a ble scan is finished
        elif event == self.__IRQ_SCAN_COMPLETE:
            if self.__scan_callback:
                self.__scan_callback(self.__addr_type, self.__addr, self.__man_data)
                self.__scan_callback = None
        # called if a peripheral device is connected
        elif event == self.__IRQ_PERIPHERAL_CONNECT:
            self.__log("__IRQ_PERIPHERAL_CONNECT")
            conn_handle, addr_type, addr = data
            self.__conn_handle = conn_handle
            self.__ble.gattc_discover_services(self.__conn_handle)
        # called if a peripheral device is disconnected
        elif event == self.__IRQ_PERIPHERAL_DISCONNECT:
            self.__log("__IRQ_PERIPHERAL_DISCONNECT")
            conn_handle, _, _ = data
            if self.__disconnected_callback:
                self.__disconnected_callback()
            if conn_handle == self.__conn_handle:
                self.__reset()
        # called if a service is returned
        elif event == self.__IRQ_GATTC_SERVICE_RESULT:
            conn_handle, start_handle, end_handle, uuid = data
            self.__log("__IRQ_GATTC_SERVICE_RESULT -> uuid:", uuid)
            if conn_handle == self.__conn_handle and uuid == self.__SERVICE_HID_UUID:
                self.__ble.gattc_discover_characteristics(self.__conn_handle, start_handle, end_handle)
        # called if a characteristic is returned
        elif event == self.__IRQ_GATTC_CHARACTERISTIC_RESULT:
            conn_handle, def_handle, value_handle, properties, uuid = data
            self.__log("__IRQ_GATTC_CHARACTERISTIC_RESULT -> uuid:", uuid)
            if conn_handle == self.__conn_handle and uuid == self.__CHAR_REPORT_UUID:
                self.__value_handle = value_handle
                # finished discovering, connecting finished; the callback is
                # optional, so it is only invoked when one was registered
                if self.__connected_callback:
                    self.__connected_callback()
        # called if data was successfully read
        elif event == self.__IRQ_GATTC_READ_RESULT:
            conn_handle, value_handle, char_data = data
            self.__log("__IRQ_GATTC_READ_RESULT -> char_data:")
            if self.__read_callback:
                self.__read_callback(char_data)
        # called if a notification appears
        elif event == self.__IRQ_GATTC_NOTIFY:
            conn_handle, value_handle, notify_data = data
            self.__log("__IRQ_GATTC_NOTIFY -> notify_data:")
            if self.__notify_callback:
                self.__notify_callback(notify_data)
class _Decoder:
    """Decode fields of a BLE advertising payload (adv_data).

    The payload is treated as a sequence of structures, each laid out as
    ``[length][type][data...]`` where ``length`` counts the type byte plus
    the data bytes.
    """

    # Advertising data type codes looked up by the decode_* methods.
    __ADV_TYPE_UUID16 = 0x03
    __ADV_TYPE_UUID32 = 0x05
    __ADV_TYPE_UUID128 = 0x07
    __ADV_TYPE_NAME = 0x09
    __ADV_TYPE_MANUFACTURER = 0xFF

    def __init__(self):
        self.__COMPANY_IDENTIFIER_CODES = {"0397": "LEGO System A/S"}

    def decode_manufacturer(self, payload):
        """Return ``[company_id_hex, company_name, company_data]``.

        ``company_id_hex`` is the 16-bit company identifier as four lowercase
        hex digits, ``company_name`` is "?" for unknown identifiers. Returns
        an empty list when the payload has no manufacturer field or the field
        is too short to hold an identifier.
        """
        fields = self.__decode_field(payload, self.__ADV_TYPE_MANUFACTURER)
        if not fields or len(fields[0]) < 2:
            return []
        # Copy out of the payload: callers keep the result after the buffer
        # that was handed to the IRQ handler may have been reused.
        raw = bytes(fields[0])
        # The identifier is stored little-endian in the first two bytes.
        company_identifier = "{:04x}".format(struct.unpack("<H", raw[:2])[0])
        company_name = self.__COMPANY_IDENTIFIER_CODES.get(company_identifier, "?")
        return [company_identifier, company_name, raw[2:]]

    def decode_name(self, payload):
        """Return the advertised name (type 0x09), or "parsing failed!"."""
        fields = self.__decode_field(payload, self.__ADV_TYPE_NAME)
        return str(fields[0], "utf-8") if fields else "parsing failed!"

    def decode_services(self, payload):
        """Return every 16/32/128-bit service UUID listed in the payload.

        A single field may carry several UUIDs back to back, so each field is
        split into fixed-size chunks; trailing bytes that do not make up a
        whole UUID are ignored.
        """
        services = []
        for field in self.__decode_field(payload, self.__ADV_TYPE_UUID16):
            for off in range(0, len(field) - 1, 2):
                # Unsigned, so UUIDs >= 0x8000 do not come out negative.
                services.append(ubluetooth.UUID(struct.unpack("<H", field[off:off + 2])[0]))
        for field in self.__decode_field(payload, self.__ADV_TYPE_UUID32):
            for off in range(0, len(field) - 3, 4):
                # Passed as raw bytes like the 128-bit case below.
                # NOTE(review): UUID() appears to reject ints above 0xFFFF -
                # confirm against the firmware's ubluetooth.
                services.append(ubluetooth.UUID(bytes(field[off:off + 4])))
        for field in self.__decode_field(payload, self.__ADV_TYPE_UUID128):
            for off in range(0, len(field) - 15, 16):
                services.append(ubluetooth.UUID(bytes(field[off:off + 16])))
        return services

    def __decode_field(self, payload, adv_type):
        """Return the data part of every structure whose type is ``adv_type``."""
        i = 0
        result = []
        while i + 1 < len(payload):
            length = payload[i]
            if length == 0:
                # A zero length marks the end of the significant data.
                break
            if payload[i + 1] == adv_type:
                result.append(payload[i + 2: i + length + 1])
            i += 1 + length
        return result
# Motor created for 'B'; notify() sets its power from incoming pad reports.
moto = Motor('B')
def blink_pixel(x, y, brightness=100, duration_ms=50):
    """Flash one pixel of the hub's light matrix, then switch it off.

    x, y: pixel coordinates passed to ``set_pixel``.
    brightness: value the pixel is set to while lit (default 100).
    duration_ms: how long the pixel stays lit, in milliseconds (default 50).

    Blocks the caller for ``duration_ms``.
    """
    hub.light_matrix.set_pixel(x, y, brightness)
    time.sleep_ms(duration_ms)
    hub.light_matrix.set_pixel(x, y, 0)
def scan_finished(addr_type, addr, man_data):
    """Scan-complete callback: connect to the device the scan found, if any.

    ``addr`` is falsy when the scan ended without a match, in which case
    nothing happens. ``man_data`` is accepted but not used.
    """
    if not addr:
        return
    handler.connect(addr_type, addr)
def connected():
    """Connect callback: switch the hub status light to blue."""
    status_light = hub.status_light
    status_light.on('blue')
def notify(notify_data):
    """Notification callback: map one pad report onto pixels and the motor.

    Byte 0 of the report is used as an on/off multiplier and bytes 8-9 are
    read as a little-endian signed 16-bit value that is rescaled to 0..99.
    NOTE(review): presumably byte 0 is a button and bytes 8-9 a stick axis;
    the layout depends on the controller's HID report - confirm.

    Reports shorter than 10 bytes are ignored, because raising inside the
    BLE event handler would only abort the callback.
    """
    if len(notify_data) < 10:
        return
    pad_b = notify_data[0]
    # Shift the signed axis value into 0..65535 before scaling.
    pad_x = struct.unpack('<h', notify_data[8:10])[0] + 32768
    pad_x_norm = pad_x * 100 // 65536
    # Both products are capped at 100 so a byte-0 value above 1 cannot push
    # brightness or motor power past full scale.
    hub.light_matrix.set_pixel(1, 0, min(pad_b * 100, 100))
    hub.light_matrix.set_pixel(1, 1, pad_x_norm)
    moto.start_at_power(min(pad_x_norm * pad_b, 100))
def read(data):
    """Read-result callback: flash pixel (4, 4); ``data`` itself is unused."""
    column, row = 4, 4
    blink_pixel(column, row)
# Program start: white status light, register the callbacks, then scan.
hub.status_light.on('white')
handler = BLE_Handler()
handler.on_connect(connected)
handler.on_notify(notify)
handler.scan_start(3000, callback=scan_finished)

# Poll the left button; pressing it disconnects and ends the program.
while True:
    if hub.left_button.was_pressed():
        handler.disconnect()
        # notify() may have left the motor running; stop it before exiting.
        moto.stop()
        hub.status_light.on('white')
        raise SystemExit
    #if hub.right_button.was_pressed():
    #handler.read(read)
    #print(pad_x_norm)
    # Short pause so the loop does not spin at full speed between polls.
    time.sleep_ms(20)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment