Skip to content

Instantly share code, notes, and snippets.

@chrisboyle
Last active December 28, 2019 00:32
Show Gist options
  • Select an option

  • Save chrisboyle/33ef42e1f880ab126391cf17a6acd18e to your computer and use it in GitHub Desktop.

Select an option

Save chrisboyle/33ef42e1f880ab126391cf17a6acd18e to your computer and use it in GitHub Desktop.
Custom LED / mic button behaviour and battery checking on Corsair Virtuoso
#!/usr/bin/env python3
# Custom LED / mic button behaviour and battery checking on Corsair Virtuoso
# Suggested udev rules to put in e.g. /etc/udev/rules.d/52-corsair.rules
# SUBSYSTEM=="usb", ATTR{idVendor}=="1b1c", ATTR{idProduct}=="0a41", MODE="0660", GROUP="plugdev"
# SUBSYSTEM=="usb", ATTR{idVendor}=="1b1c", ATTR{idProduct}=="0a42", MODE="0660", GROUP="plugdev"
import alsaaudio
import enum
import hid
import re
import sys
import time
from pynput.keyboard import Key, KeyCode, Controller, Listener
PLAY_PAUSE_KEY = KeyCode.from_vk(0x1008ff14) # press once for this
ATC_KEY = Key.scroll_lock # press and hold to hold this key
TEAMSPEAK_KEY = Key.shift_r # double press and hold to hold this key
DOUBLE_TAP_SECS = 0.15 # you get 2x this to complete the whole double-tap
REOPEN_SECS = 0.5 # how soon to retry if the headset is missing
REOPEN_TIMEOUTS = 5 # how many timeouts means start again
VENDOR_CORSAIR = 0x1b1c
PRODUCTS_AND_WIRELESS_BIT = [(0x0a41, 0), (0x0a42, 1)]
DATA_LENGTH = 64
CHARGE_MAP = 0xe2a8a2a02ee2e3aee03a8eee2ee3a000
def to_hex(inp):
return ' '.join(['{:02x}'.format(b) for b in inp])
def from_hex6(s):
if not re.match(r'^[0-9a-f]{6}$', s):
return None
return tuple(int(s[i:i+2], 16) for i in (0, 2, 4))
def drop_trailing_zeros(inp):
drop = next((i for i, x in enumerate(inp[::-1]) if x), len(inp))
return inp[:-drop]
def print_without_trail(prefix, inp):
print(prefix + ' ' + to_hex(drop_trailing_zeros(inp)))
def rainbow():
return map(lambda offset: int(max(0, min(1, abs(((time.time() - 2*offset) % 6) - 3) - 1)) * 255), [0, 1, 2])
def pad(*x):
return list(x) + [0x00] * (DATA_LENGTH - len(x))
class DoubleTapState(enum.Enum):
IDLE = 0
PRESSED_ONCE = 1
RELEASED_ONCE = 2
TX_ATC = 3 # pressed and held, we are holding ATC key
TX_TEAMSPEAK = 4 # double pressed and held, we are holding Teamspeak key
doubleTapState = DoubleTapState.IDLE
doubleTapExpiry = None
def handle_mic_button(is_press):
global doubleTapState, doubleTapExpiry, atc_pressed, ts_pressed
if is_press:
if doubleTapState == DoubleTapState.IDLE:
doubleTapState = DoubleTapState.PRESSED_ONCE
doubleTapExpiry = time.time() + DOUBLE_TAP_SECS
else:
doubleTapState = DoubleTapState.TX_TEAMSPEAK
doubleTapExpiry = None
keyboard.press(TEAMSPEAK_KEY)
ts_pressed = True
else:
if doubleTapState == DoubleTapState.PRESSED_ONCE:
doubleTapState = DoubleTapState.RELEASED_ONCE
doubleTapExpiry = time.time() + DOUBLE_TAP_SECS
elif doubleTapState == DoubleTapState.TX_ATC:
doubleTapState = DoubleTapState.IDLE
keyboard.release(ATC_KEY)
atc_pressed = False
elif doubleTapState == DoubleTapState.TX_TEAMSPEAK:
doubleTapState = DoubleTapState.IDLE
keyboard.release(TEAMSPEAK_KEY)
ts_pressed = False
def check_double_tap_expiry():
global doubleTapState, doubleTapExpiry, atc_pressed, ts_pressed
if doubleTapExpiry is not None and time.time() > doubleTapExpiry:
doubleTapExpiry = None
if doubleTapState == DoubleTapState.PRESSED_ONCE:
doubleTapState = DoubleTapState.TX_ATC
keyboard.press(ATC_KEY)
atc_pressed = True
else: # pressed/release once, and then nothing
keyboard.press(PLAY_PAUSE_KEY)
keyboard.release(PLAY_PAUSE_KEY)
doubleTapState = DoubleTapState.IDLE
def mute_sidetone(mute):
try:
sidetone = alsaaudio.Mixer('Sidetone', device='hw:Gamin')
sidetone.setmute([0,1][mute])
except:
pass # Likely unpatched kernel with indistinguishable Sidetone and Headset; never mind
def key_pressed(key, pressed):
global atc_pressed, ts_pressed
if key == ATC_KEY:
if atc_pressed == pressed:
return
atc_pressed = pressed
elif key == TEAMSPEAK_KEY:
if ts_pressed == pressed:
return
ts_pressed = pressed
else:
return
# Muting the capture channel here might save battery life (debatable; it
# probably knows nothing's reading) but it eventually silently breaks
# capture entirely, so don't do that. Settle for toggling sidetone.
mute_sidetone(not(atc_pressed or ts_pressed))
last_battery_check = None
last_battery = None
is_plugged_in = None
def check_battery():
global last_battery_check
if last_battery_check is None or (time.time() - last_battery_check > 180):
last_battery_check = time.time()
communicate(0x02, 0x10) # request plugged in or not
communicate(0x02, 0x0f) # request level
def battery_result(result):
global last_battery, last_battery_check, is_plugged_in
last_battery_check = time.time()
if result == 1: # TODO actually track what's being replied to instead
is_plugged_in = True
elif result == 2:
is_plugged_in = False
elif result != last_battery:
last_battery = result
print('Battery: %d%%%s' % (last_battery/10.0, ', charging' if is_plugged_in else ''))
def communicate(*out, expect = None):
global timeouts
out = pad(0x02, 0x08 | wireless_bit, *out)
if expect:
if not hasattr(expect[0], '__len__'):
expect = [expect]
expect = map(lambda e: pad(0x01, wireless_bit, *e), expect)
device.write(out)
inp = device.read(DATA_LENGTH, INTERVAL_MS)
while len(inp) > 0:
if inp in [MUTE_PRESS, MUTE_REL]:
handle_mic_button(inp == MUTE_PRESS)
elif inp[:4] == [0x01, wireless_bit, 0x02, 0x00]: # battery reply
battery_result((inp[5] << 8) + inp[4])
elif inp[:5] == [0x03, wireless_bit, 0x01, 0x0f, 0x00]: # unsolicited battery
battery_result((inp[6] << 8) + inp[5])
elif inp[:5] == [0x03, wireless_bit, 0x01, 0x10, 0x00]: # unsolicited plug/unplug
battery_result(inp[5])
elif inp in [pad(0x01, wireless_bit, 0x05, 0x01),
pad(0x01, wireless_bit, 0x06, 0x03),
pad(0x01, wireless_bit, 0x07, 0x03)]:
# One of these typically happens in reply to the first command of first connect after power-on.
# No idea what they mean, but the expected ack comes after them so *shrug*?
pass
elif expect and (inp not in expect):
print_without_trail('>>', out)
print_without_trail('<!', inp)
sys.exit('Unexpected response, try again or powercycle headset' +
(' and remove/insert dongle' if wireless_bit else ''))
else:
expect = None
inp = device.read(DATA_LENGTH, INTERVAL_MS)
if expect:
timeouts += 1
#print('Expected output not seen:')
#for e in expect:
# print_without_trail('??', e)
if __name__ == "__main__":
fixed_rgb = from_hex6(sys.argv[1]) if len(sys.argv) >= 2 else None
if len(sys.argv) > 2 or (len(sys.argv) == 2 and fixed_rgb is None):
sys.exit('Usage: %s [rrggbb]\nDefaults to rainbow if no colour specified' % sys.argv[0])
# Snoop key presses to light the LED if we're transmitting on ATC/Teamspeak
keyboard = Controller()
atc_pressed = False
ts_pressed = False
listener = Listener(on_press=lambda k: key_pressed(k, True), on_release=lambda k: key_pressed(k, False))
listener.start()
connected = True
while True:
try:
device = None
timeouts = 0
for (prod, w) in PRODUCTS_AND_WIRELESS_BIT:
e = hid.enumerate(VENDOR_CORSAIR, prod)
if len(e) >= 2:
d = hid.device()
d.open_path(e[1]['path'])
device = d
wireless_bit = w
break
if not device:
raise OSError()
if not connected: # it's a retry
time.sleep(5)
INTERVAL_MS = 300 if wireless_bit else 100
MUTE_PRESS = pad(0x03, wireless_bit, 0x02, 0x01)
MUTE_REL = pad(0x03, wireless_bit, 0x02, 0x00)
# Connect to ALSA; mute when the button isn't pressed
mute_sidetone(True)
# Clear default LED state and mute button behaviour, allowing custom control
communicate(0x01, 0x03, 0x00, 0x02, expect = [0x01]) # lights go off
communicate(0x0d, 0x00, 0x01, expect = [[0x0d], [0x0d, 0x03]]) # lights come back
if timeouts > 0:
raise OSError()
print('Headset connected')
connected = True
chg = "{0:b}".format(CHARGE_MAP)
while True:
if timeouts >= REOPEN_TIMEOUTS:
raise OSError()
check_double_tap_expiry()
check_battery()
idle = not atc_pressed and not ts_pressed and doubleTapState == DoubleTapState.IDLE
(logo_red, logo_green, logo_blue) = fixed_rgb or rainbow()
(charge_red, charge_green, charge_blue) = (0x00, int(chg[int(time.time() * 10/3) % len(chg)]) * 0xff, 0x00)
(mic_red, mic_green, mic_blue) = (0x80 if idle else 0x00,
0xff if atc_pressed else 0x00,
0xff if ts_pressed else 0x00)
communicate(0x06, 0x00, 0x09, 0x00, 0x00, 0x00,
logo_red, charge_red, mic_red,
logo_green, charge_green, mic_green,
logo_blue, charge_blue, mic_blue,
expect = [0x06])
except OSError:
if connected:
print('Headset not connected')
connected = False
if device is not None:
device.close()
device = None
time.sleep(REOPEN_SECS)
continue # reopen the device
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment