Created
February 13, 2023 13:43
-
-
Save ov1d1u/df2fc630579ccdce66e655b17a2eebb3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import queue | |
from homeassistant.components import bluetooth | |
from bleak import BleakClient, exc | |
queues = {} | |
disconnectTasks = {} | |
class MessageQueue(queue.Queue): | |
def get(self, block=False, timeout=None): | |
try: | |
return super(MessageQueue, self).get(block, timeout) | |
except queue.Empty: | |
return None | |
class BLEMessage: | |
def __init__(self, address, characteristic, data, isHex, repeats, ttl): | |
self.address = address | |
self.characteristic = characteristic | |
self.data = data | |
self.isHex = isHex | |
self.repeats = repeats | |
self.ttl = ttl | |
self.retryCount = 0 | |
def ble_get_client(address): | |
present = bluetooth.async_address_present(hass, address, connectable=True) | |
if not present: | |
log.error("Device with address {0} is not present".format(address)) | |
return | |
device = bluetooth.async_ble_device_from_address(hass, address) | |
if not device: | |
log.error("Device with address {0} is not available".format(address)) | |
return | |
client = BleakClient(device) | |
return client | |
def ble_delayed_disconnect(client): | |
task.wait_until(timeout=5) | |
address = client.address | |
log.info("Disconnecting from {0}".format(address)) | |
del disconnectTasks[address] | |
client.disconnect() | |
def ble_write_client(client, msg): | |
if msg.isHex: | |
data = bytes.fromhex(msg.data) | |
else: | |
data = data.encode() | |
for i in range(msg.repeats+1): | |
client.write_gatt_char(msg.characteristic, data) | |
task.sleep(1) | |
def process_device_queue(address): | |
messageQueue = queues.get(address) | |
while msg := messageQueue.get(): | |
log.info("Got message of length {0} for charac: {1}, retry {2}".format(len(msg.data), msg.characteristic, msg.retryCount)) | |
client = ble_get_client(msg.address) | |
if not client: | |
log.error("BLE client with address {0} not found".format(msg.address)) | |
continue | |
try: | |
if not client.is_connected: | |
log.info("Connecting to {0}".format(msg.address)) | |
client.connect() | |
else: | |
# Cancel any previous disconnection task for this device | |
if prevTaskId := disconnectTasks.get(msg.address): | |
task.cancel(prevTaskId) | |
log.info("Reusing connection with {0}".format(msg.address)) | |
ble_write_client(client, msg) | |
except Exception as e: # (exc.BleakDBusError, TimeoutError) as e: | |
log.error("BLE exception: {0}".format(e)) | |
msg.retryCount += 1 | |
if msg.retryCount < msg.ttl: | |
messageQueue.put(msg) | |
# Delay disconnection in case other requests are coming in | |
disconnectTasks[address] = task.create(ble_delayed_disconnect, client) | |
log.info('Clearing queue for device {0}'.format(address)) | |
del queues[address] | |
@service | |
def ble_write(address: str, characteristic: str, data: bytes, hex=False, repeats=0, ttl=1): | |
"""yaml | |
name: BLE Write | |
description: Write data to a BLE characteristic | |
fields: | |
address: | |
description: Bluetooth address of the device | |
example: 11:22:33:44:55:66 | |
required: true | |
selector: | |
text: | |
characteristic: | |
description: BLE characteristic to write onto | |
example: 00002a19-0000-1000-8000-00805f9b34fb | |
required: true | |
selector: | |
text: | |
data: | |
description: Data to write | |
example: bf626d541868626d4e189a62724900ff | |
required: true | |
selector: | |
text: | |
hex: | |
description: If data is in hex format | |
example: true | |
required: false | |
selector: | |
boolean: | |
repeats: | |
description: How many times should the write be repeated. | |
example: 1 | |
required: false | |
selector: | |
number: | |
min: 0 | |
max: 10 | |
ttl: | |
description: How many times should try writing to the BLE device. | |
example: 1 | |
required: false | |
selector: | |
number: | |
min: 1 | |
max: 10 | |
""" | |
msg = BLEMessage(address, characteristic, data, hex, repeats, ttl) | |
messageQueue = queues.get(address) | |
if messageQueue: | |
messageQueue.put(msg) | |
else: | |
messageQueue = MessageQueue() | |
messageQueue.put(msg) | |
queues[address] = messageQueue | |
process_device_queue(address) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment