-
-
Save ofekp/539ce199a96e6a9ace2c1511cc7409ce to your computer and use it in GitHub Desktop.
| # ReachView code is placed under the GPL license. | |
| # Written by Egor Fedorov ([email protected]) | |
| # Copyright (c) 2015, Emlid Limited | |
| # All rights reserved. | |
| # If you are interested in using ReachView code as a part of a | |
| # closed source project, please contact Emlid Limited ([email protected]). | |
| # This file is part of ReachView. | |
| # ReachView is free software: you can redistribute it and/or modify | |
| # it under the terms of the GNU General Public License as published by | |
| # the Free Software Foundation, either version 3 of the License, or | |
| # (at your option) any later version. | |
| # ReachView is distributed in the hope that it will be useful, | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| # GNU General Public License for more details. | |
| # You should have received a copy of the GNU General Public License | |
| # along with ReachView. If not, see <http://www.gnu.org/licenses/>. | |
| import time | |
| import pexpect | |
| import subprocess | |
| import sys | |
| import re | |
| class BluetoothctlError(Exception): | |
| """This exception is raised, when bluetoothctl fails to start.""" | |
| pass | |
| class Bluetoothctl: | |
| """A wrapper for bluetoothctl utility.""" | |
| def __init__(self): | |
| out = subprocess.check_output("rfkill unblock bluetooth", shell = True) | |
| self.child = pexpect.spawn("bluetoothctl", echo = False) | |
| def get_output(self, command, pause = 0): | |
| """Run a command in bluetoothctl prompt, return output as a list of lines.""" | |
| self.child.send(command + "\n") | |
| time.sleep(pause) | |
| start_failed = self.child.expect(["bluetooth", pexpect.EOF]) | |
| if start_failed: | |
| raise BluetoothctlError("Bluetoothctl failed after running " + command) | |
| return self.child.before.split("\r\n") | |
| def start_scan(self): | |
| """Start bluetooth scanning process.""" | |
| try: | |
| out = self.get_output("scan on") | |
| except BluetoothctlError, e: | |
| print(e) | |
| return None | |
| def make_discoverable(self): | |
| """Make device discoverable.""" | |
| try: | |
| out = self.get_output("discoverable on") | |
| except BluetoothctlError, e: | |
| print(e) | |
| return None | |
| def parse_device_info(self, info_string): | |
| """Parse a string corresponding to a device.""" | |
| device = {} | |
| block_list = ["[\x1b[0;", "removed"] | |
| string_valid = not any(keyword in info_string for keyword in block_list) | |
| if string_valid: | |
| try: | |
| device_position = info_string.index("Device") | |
| except ValueError: | |
| pass | |
| else: | |
| if device_position > -1: | |
| attribute_list = info_string[device_position:].split(" ", 2) | |
| device = { | |
| "mac_address": attribute_list[1], | |
| "name": attribute_list[2] | |
| } | |
| return device | |
| def get_available_devices(self): | |
| """Return a list of tuples of paired and discoverable devices.""" | |
| try: | |
| out = self.get_output("devices") | |
| except BluetoothctlError, e: | |
| print(e) | |
| return None | |
| else: | |
| available_devices = [] | |
| for line in out: | |
| device = self.parse_device_info(line) | |
| if device: | |
| available_devices.append(device) | |
| return available_devices | |
| def get_paired_devices(self): | |
| """Return a list of tuples of paired devices.""" | |
| try: | |
| out = self.get_output("paired-devices") | |
| except BluetoothctlError, e: | |
| print(e) | |
| return None | |
| else: | |
| paired_devices = [] | |
| for line in out: | |
| device = self.parse_device_info(line) | |
| if device: | |
| paired_devices.append(device) | |
| return paired_devices | |
| def get_discoverable_devices(self): | |
| """Filter paired devices out of available.""" | |
| available = self.get_available_devices() | |
| paired = self.get_paired_devices() | |
| return [d for d in available if d not in paired] | |
| def get_device_info(self, mac_address): | |
| """Get device info by mac address.""" | |
| try: | |
| out = self.get_output("info " + mac_address) | |
| except BluetoothctlError, e: | |
| print(e) | |
| return None | |
| else: | |
| return out | |
| def get_connectable_devices(self): | |
| """Get a list of connectable devices. | |
| Must install 'sudo apt-get install bluez blueztools' to use this""" | |
| try: | |
| res = [] | |
| out = subprocess.check_output(["hcitool", "scan"]) # Requires 'apt-get install bluez' | |
| out = out.split("\n") | |
| device_name_re = re.compile("^\t([0-9,:,A-F]{17})\t(.*)$") | |
| for line in out: | |
| device_name = device_name_re.match(line) | |
| if device_name != None: | |
| res.append({ | |
| "mac_address": device_name.group(1), | |
| "name": device_name.group(2) | |
| }) | |
| except BluetoothctlError, e: | |
| print(e) | |
| return None | |
| else: | |
| return res | |
| def is_connected(self): | |
| """Returns True if there is a current connection to any device, otherwise returns False""" | |
| try: | |
| res = False | |
| out = subprocess.check_output(["hcitool", "con"]) # Requires 'apt-get install bluez' | |
| out = out.split("\n") | |
| mac_addr_re = re.compile("^.*([0-9,:,A-F]{17}).*$") | |
| for line in out: | |
| mac_addr = mac_addr_re.match(line) | |
| if mac_addr != None: | |
| res = True | |
| except BluetoothctlError, e: | |
| print(e) | |
| return None | |
| else: | |
| return res | |
| def pair(self, mac_address): | |
| """Try to pair with a device by mac address.""" | |
| try: | |
| out = self.get_output("pair " + mac_address, 4) | |
| except BluetoothctlError, e: | |
| print(e) | |
| return None | |
| else: | |
| res = self.child.expect(["Failed to pair", "Pairing successful", pexpect.EOF]) | |
| success = True if res == 1 else False | |
| return success | |
| def remove(self, mac_address): | |
| """Remove paired device by mac address, return success of the operation.""" | |
| try: | |
| out = self.get_output("remove " + mac_address, 3) | |
| except BluetoothctlError, e: | |
| print(e) | |
| return None | |
| else: | |
| res = self.child.expect(["not available", "Device has been removed", pexpect.EOF]) | |
| success = True if res == 1 else False | |
| return success | |
| def connect(self, mac_address): | |
| """Try to connect to a device by mac address.""" | |
| try: | |
| out = self.get_output("connect " + mac_address, 2) | |
| except BluetoothctlError, e: | |
| print(e) | |
| return None | |
| else: | |
| res = self.child.expect(["Failed to connect", "Connection successful", pexpect.EOF]) | |
| success = True if res == 1 else False | |
| return success | |
| def disconnect(self, mac_address): | |
| """Try to disconnect to a device by mac address.""" | |
| try: | |
| out = self.get_output("disconnect " + mac_address, 2) | |
| except BluetoothctlError, e: | |
| print(e) | |
| return None | |
| else: | |
| res = self.child.expect(["Failed to disconnect", "Successful disconnected", pexpect.EOF]) | |
| success = True if res == 1 else False | |
| return success | |
| def trust(self, mac_address): | |
| """Trust the device with the given MAC address""" | |
| try: | |
| out = self.get_output("trust " + mac_address, 4) | |
| except BluetoothctlError, e: | |
| print(e) | |
| return None | |
| else: | |
| res = self.child.expect(["not available", "trust succeeded", pexpect.EOF]) | |
| success = True if res == 1 else False | |
| return success | |
| def start_agent(self): | |
| """Start agent""" | |
| try: | |
| out = self.get_output("agent on") | |
| except BluetoothctlError, e: | |
| print(e) | |
| return None | |
| def default_agent(self): | |
| """Start default agent""" | |
| try: | |
| out = self.get_output("default-agent") | |
| except BluetoothctlError, e: | |
| print(e) | |
| return None | |
| if __name__ == "__main__": | |
| print("Init bluetooth...") | |
| bl = Bluetoothctl() | |
| print("Ready!") | |
| bl.start_scan() | |
| print("Scanning for 10 seconds...") | |
| for i in range(0, 10): | |
| print(i) | |
| time.sleep(1) | |
| print(bl.get_discoverable_devices()) |
Hello,
Thank you very much for this code.
I am using python3 and there are some errors to compile it.
Is it possible to get this wrapper for python3?
Thanks in advance
If I start the agent and default-agent, how can I get the pi to answer the pair request, so that it can pair with the device?
I am getting this error when I try to connect to a device that is connected to another device, it should return "Failed to connect" but it's a TIMEOUT exception.
raise TIMEOUT(msg)
pexpect.exceptions.TIMEOUT: Timeout exceeded.
<pexpect.pty_spawn.spawn object at 0xb62e3ab0>
command: /usr/bin/bluetoothctl
args: ['/usr/bin/bluetoothctl']
buffer (last 100 chars): 'mCHG\x1b[0m] Device 98:5F:D3:57:1A:BD RSSI: -47\r\n[\x 1b[0;93mCHG\x1b[0m] Device 98:5F:D3:57:1A:BD TxPower: 4\r\n'
before (last 100 chars): 'mCHG\x1b[0m] Device 98:5F:D3:57:1A:BD RSSI: -47\r\n[\x 1b[0;93mCHG\x1b[0m] Device 98:5F:D3:57:1A:BD TxPower: 4\r\n'
after: <class 'pexpect.exceptions.TIMEOUT'>
match: None
match_index: None
exitstatus: None
flag_eof: False
pid: 544
child_fd: 5
closed: False
timeout: 30
delimiter: <class 'pexpect.exceptions.EOF'>
logfile: None
logfile_read: None
logfile_send: None
maxread: 2000
ignorecase: False
searchwindowsize: None
delaybeforesend: 0.05
delayafterclose: 0.1
delayafterterminate: 0.1
searcher: searcher_re:
0: re.compile("Failed to connect")
1: re.compile("Connection successful")
2: EOF
EDIT2: changed "start_failed = self.child.expect(["bluetooth", pexpect.EOF])" to "start_failed = self.child.expect( [ "#", pexpect.EOF ] )", and now it works even if you have another bluetooth deviced connected.
EDIT: I think I know where the problem is. the line "start_failed = self.child.expect(["bluetooth", pexpect.EOF])" is waiting for the bluetoothctl prompt to be "[bluetooth]", and when a device is already connected, the prompt is "[NAME OF THE DEVICE]". Now I'm figuring out how to make it ignore that.
Thanks for the code. Not sure if it's a bug, but if there's any other device already connected, the script gets stuck on "Ready!" message, and after a few second it stops and prints this:
Traceback (most recent call last):
File "scripts/bluetoothctl.py", line 270, in
bl.start_scan()
File "scripts/bluetoothctl.py", line 60, in start_scan
out = self.get_output("scan on")
File "scripts/bluetoothctl.py", line 50, in get_output
start_failed = self.child.expect(["bluetooth", pexpect.EOF])
File "/home/pi/.local/lib/python2.7/site-packages/pexpect/spawnbase.py", line 341, in expect
timeout, searchwindowsize, async_)
File "/home/pi/.local/lib/python2.7/site-packages/pexpect/spawnbase.py", line 369, in expect_list
return exp.expect_loop(timeout)
File "/home/pi/.local/lib/python2.7/site-packages/pexpect/expect.py", line 119, in expect_loop
return self.timeout(e)
File "/home/pi/.local/lib/python2.7/site-packages/pexpect/expect.py", line 82, in timeout
raise TIMEOUT(msg)
pexpect.exceptions.TIMEOUT: Timeout exceeded.
<pexpect.pty_spawn.spawn object at 0x769f98f0>
command: /usr/bin/bluetoothctl
args: ['/usr/bin/bluetoothctl']
buffer (last 100 chars): 'M30 Modkit]\x1b[0m# Discovery started\r\n[\x1b[0;93mCHG\x1b[0m] Controller B8:27:EB:94:DE:6D Discovering: yes\r\n'
before (last 100 chars): 'M30 Modkit]\x1b[0m# Discovery started\r\n[\x1b[0;93mCHG\x1b[0m] Controller B8:27:EB:94:DE:6D Discovering: yes\r\n'
after: <class 'pexpect.exceptions.TIMEOUT'>
match: None
match_index: None
exitstatus: None
flag_eof: False
pid: 688
child_fd: 5
closed: False
timeout: 30
delimiter: <class 'pexpect.exceptions.EOF'>
logfile: None
logfile_read: None
logfile_send: None
maxread: 1
ignorecase: False
searchwindowsize: None
delaybeforesend: 0.05
delayafterclose: 0.1
delayafterterminate: 0.1
searcher: searcher_re:
0: re.compile('bluetooth')
1: EOF
If you replace "bluetooth" with "(\x1b\\[0;94m)\\[.+\\](\x1b\\[0m)" it will work when connected to a device. This checks for any text between a set of brackets that's wrapped in the ansi color code bluetoothctl uses for the prompt text.
Thank you @ShibaSama that works great !
If pairing is not working, try calling the "connect" function this will automatically try attempting to pair the device to its client. This worked for me
I know the use of hcitool is not exactly warping the bluetoothctl, but bluetoothctl just does not have all the needed functionality. 😃