Created
July 26, 2024 11:11
-
-
Save Langerz82/f7d08822974fe3860d4428e5728667bc to your computer and use it in GitHub Desktop.
EE - emuelec-bluetooth - add accept authorization request.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python -u | |
################################################################################ | |
# SPDX-License-Identifier: GPL-2.0-or-later | |
# Copyright (C) 2023-present ebeem (https://github.com/ebeem) | |
# Modifications made by: | |
# Langerz82 (https://github.com/Langerz82) | |
# wang80919 (https://github.com/wang80919) | |
# Testing done by: | |
# junm6802030 (https://github.com/junm6802030) | |
# Special thanks to everyone who worked on this. | |
################################################################################ | |
from subprocess import Popen, PIPE, STDOUT | |
from dataclasses import dataclass | |
import time | |
import sys | |
import os | |
from multiprocessing import Process | |
DEBUG = True | |
@dataclass | |
class BluetoothDevice: | |
mac: str | |
name: str | |
alias: str | |
clas: str | |
icon: str | |
paired: bool | |
bounded: bool | |
trusted: bool | |
blocked: bool | |
connected: bool | |
wake_allowed: bool | |
legacy_pairing: bool | |
rssi: int | |
class ShellIO: | |
@staticmethod | |
def execute(cmd: list[str], debug=DEBUG) -> list[str]: | |
if debug: | |
print("> " + " ".join(x for x in cmd)) | |
out: list[str] = [] | |
with Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True) as p: | |
for line in p.stdout: | |
out.append(line) | |
if debug: | |
print(line) | |
return out | |
@staticmethod | |
def execute_lookup(cmd: list[str], match: str, debug=DEBUG) -> str: | |
out = ShellIO.execute(cmd, debug=debug) | |
for o in out: | |
if match in o: | |
return o | |
return "" | |
@staticmethod | |
def execute_async(cmd: list[str]): | |
Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True) | |
class BluetoothCTL: | |
async_bluetooth_process: "Popen | None" | |
def __init__(self): | |
self.async_bluetooth_process = None | |
def _parse_device_info(self, mac: str): | |
info = ShellIO.execute(["bluetoothctl", "info", mac], debug=False) | |
dev = BluetoothDevice( | |
mac=mac, | |
name="", | |
alias="", | |
clas="", | |
icon="", | |
paired=False, | |
bounded=False, | |
trusted=False, | |
blocked=False, | |
connected=False, | |
wake_allowed=False, | |
legacy_pairing=False, | |
rssi=0, | |
) | |
for i in info: | |
i = i.strip() | |
if i.startswith("Name:"): | |
dev.name = i.split(":")[1].strip() | |
if i.startswith("Alias:"): | |
dev.alias = i.split(":")[1].strip() | |
if i.startswith("Class:"): | |
dev.clas = i.split(":")[1].strip() | |
if i.startswith("Icon:"): | |
dev.icon = i.split(":")[1].strip() | |
if i.startswith("Paired:"): | |
dev.paired = i.split(":")[1].strip() == "yes" | |
if i.startswith("Bounded:"): | |
dev.bounded = i.split(":")[1].strip() == "yes" | |
if i.startswith("Trusted:"): | |
dev.trusted = i.split(":")[1].strip() == "yes" | |
if i.startswith("Blocked:"): | |
dev.blocked = i.split(":")[1].strip() == "yes" | |
if i.startswith("Connected:"): | |
dev.connected = i.split(":")[1].strip() == "yes" | |
if i.startswith("WakeAllowed:"): | |
dev.wake_allowed = i.split(":")[1].strip() == "yes" | |
if i.startswith("LegacyPairing:"): | |
dev.legacy_pairing = i.split(":")[1].strip() == "yes" | |
if i.startswith("RSSI:"): | |
# Following code tested on bluez 5.66, and should work with 5.72 too. | |
dev.rssi = int(i.split(" ")[-1].replace("(","").replace(")","").strip()) | |
return dev | |
@property | |
def power(self) -> bool: | |
return "yes" in ShellIO.execute_lookup(["bluetoothctl", "show"], "Pairable", debug=False) | |
@property | |
def discoverable(self) -> bool: | |
return "yes" in ShellIO.execute_lookup(["bluetoothctl", "show"], "Discoverable", debug=False) | |
@property | |
def pairable(self) -> bool: | |
return "yes" in ShellIO.execute_lookup(["bluetoothctl", "show"], "Pairable", debug=False) | |
@power.setter | |
def power(self, value): | |
ShellIO.execute(["bluetoothctl", "power", "on" if value else "off"]) | |
@discoverable.setter | |
def discoverable(self, value): | |
ShellIO.execute(["bluetoothctl", "discoverable", "on" if value else "off"]) | |
@pairable.setter | |
def pairable(self, value): | |
ShellIO.execute(["bluetoothctl", "pairable", "on" if value else "off"]) | |
@pairable.setter | |
def agent(self, value): | |
ShellIO.execute(["bluetoothctl", "agent", "on" if value else "off"]) | |
def scan(self, timeout=10): | |
ShellIO.execute(["bluetoothctl", "--timeout", str(timeout), "scan", "on"]) | |
def scan_async(self, timeout=10): | |
# CODE START - LANGERZ82 - BT AUTHORIZE AGENT | |
# using stdbuf to flush buffer and get output immediately (bluetoothctl issue) | |
# Removed the scan in the command line start as needed to be done later so auth agent appears. | |
cmd = ( | |
"stdbuf -oL bluetoothctl --timeout " + str(timeout) | |
if timeout > 0 | |
else "stdbuf -oL bluetoothctl" | |
) | |
process = Popen("bluetoothctl", stdout=PIPE, stdin=PIPE, stderr=PIPE, universal_newlines=True) | |
process.stdin.write('agent on\n') | |
process.stdin.flush() | |
process.stdin.write('scan on\n') | |
process.stdin.flush() | |
# CODE END | |
while True: | |
line = process.stdout.readline() | |
print(line) | |
if line == "" and process.poll() is not None: | |
break | |
if line: | |
line = line.strip().rstrip() | |
if DEBUG: | |
print("< ", line) | |
# CODE START - LANGERZ82 - BT AUTHORIZE AGENT | |
# The code should be something like this, however entering just yes comes up with bluetoothctl invalid cmd. | |
if ("[agent]" in line.split()[0] and "Authorize service" in line): | |
print("authorize agent") | |
process.stdin.write('yes\n') | |
process.stdin.flush() | |
# CODE END | |
if ( | |
len(line.split()) >= 3 | |
and len(line.split()[2]) == 17 | |
and ( | |
"NEW" in line.split()[0] | |
or "CHG" in line.split()[0] | |
or "DEL" in line.split()[0] | |
) | |
): | |
if DEBUG: | |
print(bt.devices) | |
devs = [x for x in bt.devices if x.mac == line.split()[2]] | |
if len(devs) > 0: | |
if "DEL" in line.split()[0]: | |
on_device_removed(devs[0]) | |
elif "NEW" in line.split()[0] or ( | |
"CHG" in line.split()[0] and "RSSI:" in line | |
): | |
if DEBUG: | |
print(devs) | |
if not on_device_discovered(devs[0]): | |
print("stopping thread") | |
break | |
def scan_stop(self): | |
ShellIO.execute_async(["pkill", "-f", "bluetoothctl"]) | |
@property | |
def scanning(self) -> bool: | |
return "yes" in ShellIO.execute_lookup(["bluetoothctl", "show"], "Discovering", debug=False) | |
@property | |
def controllers(self) -> list[str]: | |
controllers = ShellIO.execute(["bluetoothctl", "list"], debug=False) | |
return [ctrl.split()[1] for ctrl in controllers] | |
@property | |
def devices(self) -> list[BluetoothDevice]: | |
devs = ShellIO.execute(["bluetoothctl", "devices"], debug=False) | |
return [self._parse_device_info(dev.split()[1]) for dev in devs] | |
def trust(self, dev: BluetoothDevice) -> bool: | |
return "succeeded" in ShellIO.execute_lookup( | |
["bluetoothctl", "trust", dev.mac], "trust succeeded" | |
) | |
def untrust(self, dev: BluetoothDevice) -> bool: | |
return "succeeded" in ShellIO.execute_lookup( | |
["bluetoothctl", "untrust", dev.mac], "untrust succeeded" | |
) | |
def pair(self, dev: BluetoothDevice) -> bool: | |
return "successful" in ShellIO.execute_lookup( | |
["bluetoothctl", "pair", dev.mac], "Pairing successful" | |
) | |
def disconnect(self, dev: BluetoothDevice) -> bool: | |
return "successful" in ShellIO.execute_lookup( | |
["bluetoothctl", "disconnect", dev.mac], "Disconnection successful" | |
) | |
def connect(self, dev: BluetoothDevice) -> bool: | |
return "successful" in ShellIO.execute_lookup( | |
["bluetoothctl", "connect", dev.mac], "Connection successful" | |
) | |
def forget(self, dev: BluetoothDevice) -> bool: | |
return "removed" in ShellIO.execute_lookup( | |
["bluetoothctl", "remove", dev.mac], "Device has been removed" | |
) | |
# returns true if a game is running via the emuelecRunEmu script | |
def is_in_game(): | |
return ShellIO.execute_lookup(["ps", "-ef"], "emuelecRunEmu.sh", debug=False) | |
# returns true if another process (not current) is running infinite scan | |
def is_bluetooth_running(): | |
out = ShellIO.execute(["ps", "-ef"], debug=False) | |
for o in out: | |
if "emuelec-bluetooth -1" in o: | |
return False if str(os.getpid()) in o else True | |
return False | |
def on_device_discovered(dev: BluetoothDevice) -> bool: | |
if DEBUG: | |
print(dev) | |
if dev.blocked: | |
return True | |
if (dev.trusted and dev.paired and not dev.connected and bt.connect(dev)): | |
return True | |
if ( | |
not dev.connected # exclude already connected devices | |
and dev.rssi != 0 # exclude inactive devices (saved) | |
and "input-" in dev.icon # exclude any non-input device | |
): | |
print( | |
"found device {}, mac: {}, icon: {}, paired: {}".format( | |
dev.name, dev.mac, dev.icon, dev.paired | |
) | |
) | |
if bt.trust(dev) and bt.pair(dev) and bt.connect(dev): | |
print("connected device {}".format(dev.name)) | |
return False | |
else: | |
bt.forget(dev) | |
print("forgot device {}".format(dev.name)) | |
elif ( | |
dev.connected and not dev.trusted | |
# and dev.rssi == 0 | |
# and "input-" in dev.icon | |
): | |
print( | |
"trusting and pairing device {}, mac: {}, icon: {}, paired: {}".format( | |
dev.name, dev.mac, dev.icon, dev.paired | |
) | |
) | |
if bt.trust(dev): | |
bt.pair(dev) | |
print("connect cable.") | |
return True | |
def on_device_removed(dev: BluetoothDevice): | |
pass | |
SCAN_TIME = 60 | |
# ELAPSED_TIME = 0 | |
UPDATE_INTERVAL = 1 | |
BEEN_IN_GAME = False | |
BT_THREAD: Process = None | |
if __name__ == "__main__": | |
if len(sys.argv) >= 2: | |
SCAN_TIME = int(sys.argv[1]) | |
print("running scan for " + str(SCAN_TIME) + " seconds.") | |
bt = BluetoothCTL() | |
if is_bluetooth_running(): | |
print("bluetooth is already running, exiting.") | |
exit() | |
if len(bt.controllers) == 0: | |
print("no controller found, exiting.") | |
exit() | |
bt.power = True | |
bt.agent = True | |
bt.discoverable = True | |
bt.pairable = True | |
start_time = time.perf_counter() | |
while True: | |
diff_time = time.perf_counter() - start_time | |
# if SCAN_TIME >= 0: | |
# if ELAPSED_TIME > SCAN_TIME: | |
# print(diff_time) | |
if SCAN_TIME != -1 and diff_time > SCAN_TIME: | |
exit() | |
if not BEEN_IN_GAME: | |
BEEN_IN_GAME = is_in_game() | |
connected_devs = [dev for dev in bt.devices if dev.connected] | |
if not bt.scanning and ( | |
not BEEN_IN_GAME or len(connected_devs) == 0 or SCAN_TIME > 0 | |
): | |
print("resuming scan") | |
BT_THREAD = Process(target=bt.scan_async, args=(SCAN_TIME,)) | |
BT_THREAD.start() | |
time.sleep(5) | |
elif ( | |
bt.scanning and BEEN_IN_GAME and (len(connected_devs) > 0 or SCAN_TIME < 0) | |
): | |
print("stopping scan") | |
if BT_THREAD is not None: | |
BT_THREAD.terminate() | |
bt.scan_stop() | |
time.sleep(UPDATE_INTERVAL) | |
# ELAPSED_TIME += UPDATE_INTERVAL |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment