Last active
January 24, 2025 19:17
-
-
Save lanmower/2c0328fce83eef3a2f7a0ee34106e024 to your computer and use it in GitHub Desktop.
apc25 driver
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# -*- coding: utf-8 -*- | |
import time | |
import signal | |
import jack | |
from bisect import bisect | |
from copy import deepcopy | |
from functools import partial | |
import multiprocessing as mp | |
from threading import Thread, RLock, Event | |
import liblo | |
from threading import Timer | |
import os | |
from zynlibs.zynseq import zynseq | |
from zyncoder.zyncore import lib_zyncore | |
from zyngine.zynthian_signal_manager import zynsigman | |
from zynconf import ServerPort | |
from .zynthian_ctrldev_base import ( | |
zynthian_ctrldev_zynmixer, zynthian_ctrldev_zynpad | |
) | |
from .zynthian_ctrldev_base_extended import RunTimer, KnobSpeedControl, ButtonTimer, CONST | |
# SooperLooper State Codes | |
SL_STATE_UNKNOWN = -1 | |
SL_STATE_OFF = 0 | |
SL_STATE_WAIT_START = 1 | |
SL_STATE_RECORDING = 2 | |
SL_STATE_WAIT_STOP = 3 | |
SL_STATE_PLAYING = 4 | |
SL_STATE_OVERDUBBING = 5 | |
SL_STATE_MULTIPLYING = 6 | |
SL_STATE_INSERTING = 7 | |
SL_STATE_REPLACING = 8 | |
SL_STATE_DELAY = 9 | |
SL_STATE_MUTED = 10 | |
SL_STATE_SCRATCHING = 11 | |
SL_STATE_ONESHOT = 12 | |
SL_STATE_SUBSTITUTE = 13 | |
SL_STATE_PAUSED = 14 | |
# LED Colors | |
LED_OFF = 0 | |
LED_GREEN = 1 | |
LED_GREEN_BLINK = 2 | |
LED_RED = 3 | |
LED_RED_BLINK = 4 | |
LED_YELLOW = 5 | |
LED_YELLOW_BLINK = 6 | |
# Looper pad mappings - first column is mute, last column is status | |
MUTE_LIGHT_NOTES = [34, 26, 18, 10, 2] # Left column - mute status (same as mute_on buttons) | |
STATUS_LIGHT_NOTES = [39, 31, 23, 15, 7] # Right column - looper status | |
# SooperLooper action mappings for each row (top to bottom) | |
LOOPER_ACTIONS = { | |
0: { # First row (bottom) | |
36: 'reverse', | |
37: 'multiply', | |
38: 'overdub', | |
39: 'record', | |
34: 'mute_on', | |
35: 'mute_off' | |
}, | |
1: { # Second row | |
28: 'reverse', | |
29: 'multiply', | |
30: 'overdub', | |
31: 'record', | |
26: 'mute_on', | |
27: 'mute_off' | |
}, | |
2: { # Third row | |
20: 'reverse', | |
21: 'multiply', | |
22: 'overdub', | |
23: 'record', | |
18: 'mute_on', | |
19: 'mute_off' | |
}, | |
3: { # Fourth row | |
12: 'reverse', | |
13: 'multiply', | |
14: 'overdub', | |
15: 'record', | |
10: 'mute_on', | |
11: 'mute_off' | |
}, | |
4: { # Fifth row (top) | |
4: 'reverse', | |
5: 'multiply', | |
6: 'overdub', | |
7: 'record', | |
2: 'mute_on', | |
3: 'mute_off' | |
} | |
} | |
# APC Key25 buttons | |
BTN_SHIFT = 0x62 | |
BTN_STOP_ALL_CLIPS = 0x51 | |
BTN_PLAY = 0x5B | |
BTN_RECORD = 0x5D | |
BTN_SYNC_MIDI = 25 # Button to set sync source to MIDI | |
BTN_SYNC_LOOP1 = 33 # Button to set sync source to Loop 1 | |
BTN_PAD_START = 0x00 | |
BTN_PAD_END = 0x27 | |
# Button mappings | |
SNAPSHOT_NOTES = range(82, 88) # Notes 82-87 for snapshot selection | |
ZS3_TOGGLE_NOTES = [64, 65] # Special toggle pair | |
ZS3_REGULAR_NOTES = range(66, 72) # Regular ZS3 notes | |
ALL_ZS3_NOTES = list(ZS3_TOGGLE_NOTES) + list(ZS3_REGULAR_NOTES) | |
# LED states | |
LED_ON = 1 | |
class zynthian_ctrldev_akai_apc_key25(zynthian_ctrldev_zynmixer, zynthian_ctrldev_zynpad): | |
"""Zynthian Controller Device: Akai APC Key 25""" | |
_autoload = True | |
dev_ids = ["APC Key 25 MIDI 1", "APC Key 25 IN 1"] | |
SL_PORT = ServerPort["sooperlooper_osc"] | |
APC_OSC_PORT = 18796 | |
MAX_LOOPS = 5 | |
def __init__(self, state_manager=None, idev_in=None, idev_out=None, *args, **kwargs): | |
"""Initialize the controller with required parameters""" | |
print("\n=================================================================") | |
print("APC Key 25 __init__ starting...") | |
print(f"state_manager: {state_manager}") | |
print(f"idev_in: {idev_in}, type: {type(idev_in)}") | |
print(f"idev_out: {idev_out}, type: {type(idev_out)}") | |
# Call both parent class initializers explicitly | |
print("Calling parent class __init__...") | |
zynthian_ctrldev_zynmixer.__init__(self, state_manager, idev_in, idev_out) | |
zynthian_ctrldev_zynpad.__init__(self, state_manager, idev_in, idev_out) | |
print("Parent class __init__ completed") | |
print(f"self.idev_out after parent init: {self.idev_out}") | |
self._init_complete = False | |
self._shutting_down = False | |
self._leds = None | |
self.osc_server = None | |
self.osc_target = None | |
self._loop_states = {} | |
self._init_time = 0 | |
# Toggle state management for action buttons | |
self.toggle_states = {action: False for actions in LOOPER_ACTIONS.values() for action in actions.values()} | |
# Add state tracking for record and play | |
self._record_state = False | |
self._play_state = False | |
if self._leds is None: | |
print("Initializing LED controller...") | |
self._leds = FeedbackLEDs(self.idev_out) | |
print("LED controller initialized") | |
self.selected_snapshot = 0 # First snapshot (note 82) | |
self.selected_zs3_toggle = 0 # For 64/65 toggle state | |
self.selected_zs3_regular = 0 # For regular ZS3 selection | |
self.light_up_initial_state() | |
print("APC Key 25 __init__ completed") | |
print("=================================================================\n") | |
def init(self): | |
print("Starting APC Key 25 init sequence") | |
"""Initialize the device""" | |
if self._shutting_down: | |
print("Skipping initialization - device is shutting down") | |
return | |
self._init_complete = False | |
super().init() | |
if not self._shutting_down: | |
print("Initializing zynthian_ctrldev_akai_apc_key25...") | |
# Initialize LED controller | |
if self._leds is None: | |
print("Initializing LED controller...") | |
self._leds = FeedbackLEDs(self.idev_out) | |
print("LED controller initialized") | |
# Initialize OSC server | |
try: | |
print("Creating OSC server...") | |
self.osc_server = liblo.ServerThread() | |
self.osc_server_port = self.osc_server.get_port() | |
self.osc_server_url = f"osc.udp://localhost:{self.osc_server_port}" | |
print(f"OSC server initialized on port {self.osc_server_port}") | |
# Register OSC methods | |
print("Registering OSC methods...") | |
self.osc_server.add_method("/sl/0/down", 'sf', self._cb_osc_monitor) | |
self.osc_server.add_method("/sl/0/up", 'sf', self._cb_osc_monitor) | |
self.osc_server.add_method("/sl/-1/down", 'sf', self._cb_osc_monitor) | |
self.osc_server.add_method("/sl/-1/up", 'sf', self._cb_osc_monitor) | |
self.osc_server.add_method("/sl/0/state", 'f', self._cb_osc_state) | |
self.osc_server.add_method("/sl/0/next_state", 'f', self._cb_osc_next_state) | |
self.osc_server.add_method("/sl/0/waiting", 'f', self._cb_osc_waiting) | |
self.osc_server.add_method("/info", 'ssi', self._cb_osc_info) | |
self.osc_server.add_method("/monitor", 'isf', self._cb_osc_monitor) | |
self.osc_server.add_method(None, None, self._cb_osc_fallback) | |
# Start the OSC server | |
print("Starting OSC server...") | |
self.osc_server.start() | |
print("OSC server started successfully") | |
print("Attempting to connect to SooperLooper...") | |
# Start connection attempt timer | |
self._init_time = time.time() | |
self._try_connect_to_sooperlooper() | |
except liblo.ServerError as err: | |
print(f"Error initializing OSC: {err}") | |
self.osc_server = None | |
# After establishing connection to SooperLooper, query current sync source | |
if self.osc_target is not None: | |
# Query current sync source | |
self.osc_server.send(self.osc_target, "/get", "sync_source", | |
self.osc_server_url, "/sync_source") | |
# Add method to handle sync source response | |
self.osc_server.add_method("/sync_source", 'i', self._cb_sync_source) | |
def cleanup(self): | |
"""Clean up resources before shutting down""" | |
if self._shutting_down: | |
return | |
print("Shutting down APC Key 25...") | |
self._shutting_down = True | |
if self.osc_server: | |
try: | |
self.osc_server.stop() | |
self.osc_server = None | |
print("OSC server stopped") | |
except Exception as e: | |
print(f"Error stopping OSC server: {e}") | |
if self._leds: | |
self._leds.all_off() | |
print("All LEDs turned off") | |
super().cleanup() | |
print("APC Key 25 shutdown complete") | |
def end(self): | |
self._shutting_down = True | |
if self.osc_server: | |
self.osc_server.stop() | |
self.osc_server = None | |
if self._leds is not None: | |
try: | |
self._leds.all_off() | |
print("All LEDs turned off during end") | |
except Exception as e: | |
print(f"Warning: Error turning off LEDs: {e}") | |
super().end() | |
print("Device ended.") | |
def _try_connect_to_sooperlooper(self): | |
"""Attempt to connect to SooperLooper via OSC after initial delay""" | |
if self._init_complete: | |
return True | |
# Check if enough time has passed since initialization | |
elapsed = time.time() - self._init_time | |
if elapsed < 20: | |
print(f"Waiting for sooperlooper to start... ({elapsed:.1f}s)") | |
# Schedule next attempt | |
Timer(1.0, self._try_connect_to_sooperlooper).start() | |
return False | |
if self.osc_server is None: | |
print("OSC server not initialized") | |
return False | |
try: | |
print(f"Attempting to connect to SooperLooper on port {self.SL_PORT}...") | |
self.osc_target = liblo.Address(self.SL_PORT) | |
print("Successfully connected to SooperLooper via OSC") | |
# Configure global settings | |
print("Configuring global settings...") | |
self.osc_server.send(self.osc_target, "/set", "sync_source", -2) # Set sync source to MIDI | |
self.osc_server.send(self.osc_target, "/set", "eighth_per_cycle", 16) # Set cycle length | |
# Clear all loopers | |
self.osc_server.send(self.osc_target, '/sl/-1/hit', ('s', 'undo_all')) | |
self.osc_server.send(self.osc_target, "/sl/-1/hit", ('s', 'mute_off')) | |
print("Registering for automatic updates...") | |
self._register_auto_updates() | |
self._init_complete = True | |
return True | |
except Exception as e: | |
print(f"Failed to connect to SooperLooper: {e}") | |
# Retry after delay if still within reasonable time | |
if elapsed < 30: # Try for up to 30 seconds | |
print("Scheduling retry...") | |
Timer(2.0, self._try_connect_to_sooperlooper).start() | |
return False | |
def _setup_osc_server(self): | |
if self.osc_server is None: | |
print("OSC server not initialized") | |
return False | |
if self.osc_target is None: | |
try: | |
print(f"Attempting to connect to SooperLooper on port {self.SL_PORT}...") | |
self.osc_target = liblo.Address(self.SL_PORT) | |
print("Successfully connected to SooperLooper via OSC") | |
print("Registering for automatic updates...") | |
self._register_auto_updates() | |
return True | |
except Exception as e: | |
print(f"Failed to connect to SooperLooper: {e}") | |
return False | |
return True | |
def _cb_osc_monitor(self, path, args, types, src): | |
"""Callback for monitor messages from SooperLooper""" | |
try: | |
if len(args) == 3: # Handle the standard monitor message format | |
loop_num, param, value = args | |
print(f"Received monitor update: loop={loop_num} {param}={value}") | |
if param == "state": | |
# Update loop state | |
state = int(float(value)) | |
if loop_num not in self._loop_states: | |
self._loop_states[loop_num] = {} | |
self._loop_states[loop_num]['state'] = state | |
print(f"Updated state for loop {loop_num} to {state}") | |
self._update_loop_led(loop_num) | |
elif param == "mute": | |
# Update mute state | |
mute_state = int(float(value)) | |
if loop_num not in self._loop_states: | |
self._loop_states[loop_num] = {} | |
self._loop_states[loop_num]['mute'] = mute_state | |
print(f"Updated mute state for loop {loop_num} to {mute_state}") | |
# Update LED for mute state | |
if loop_num < len(MUTE_LIGHT_NOTES): | |
mute_note = MUTE_LIGHT_NOTES[loop_num] | |
self._leds.led_state(mute_note, LED_RED if mute_state else LED_GREEN) | |
elif param == "rate_output": | |
self._handle_rate_output(value) | |
elif len(args) == 2: # Handle the up/down message format | |
param, value = args | |
print(f"Received up/down update: {param}={value}") | |
# Handle up/down messages if needed | |
except Exception as e: | |
print(f"Error in monitor callback: {e}") | |
def _cb_osc_state(self, path, args): | |
"""Callback for state updates from SooperLooper""" | |
try: | |
if len(args) >= 1: | |
state = int(args[0]) | |
loop_num = int(path.split('/')[2]) # Extract loop number from path | |
print(f"Updating loop {loop_num} with state {state}") | |
self._loop_states[loop_num] = {'state': state} | |
self._update_loop_led(loop_num) | |
except Exception as e: | |
print(f"Error in state callback: {e}") | |
def _cb_osc_next_state(self, path, args): | |
"""Callback for next_state updates from SooperLooper""" | |
try: | |
if len(args) >= 1: | |
next_state = int(args[0]) | |
loop_num = int(path.split('/')[2]) | |
print(f"Setting next_state[{loop_num}] = {next_state}") | |
if loop_num in self._loop_states: | |
self._loop_states[loop_num]['next_state'] = next_state | |
else: | |
self._loop_states[loop_num] = {'next_state': next_state} | |
self._update_loop_led(loop_num) | |
except Exception as e: | |
print(f"Error in next_state callback: {e}") | |
def _cb_osc_waiting(self, path, args): | |
"""Callback for waiting updates from SooperLooper""" | |
try: | |
if len(args) >= 1: | |
waiting = int(args[0]) | |
loop_num = int(path.split('/')[2]) | |
print(f"Setting waiting[{loop_num}] = {waiting}") | |
if loop_num in self._loop_states: | |
self._loop_states[loop_num]['waiting'] = waiting | |
else: | |
self._loop_states[loop_num] = {'waiting': waiting} | |
self._update_loop_led(loop_num) | |
except Exception as e: | |
print(f"Error in waiting callback: {e}") | |
def _cb_osc_info(self, path, args): | |
"""Callback for info messages from SooperLooper""" | |
try: | |
if len(args) >= 3: | |
hosturl, version, loopcount = args[:3] | |
if loopcount > 0: | |
# Configure global settings | |
liblo.send(self.osc_target, "/set", "sync_source", -2) # Set sync source to MIDI | |
liblo.send(self.osc_target, "/set", "eighth_per_cycle", 16) # Set cycle length to 4 8ths per cycle | |
# Configure settings for all loops | |
liblo.send(self.osc_target, '/sl/-1/hit', ('s', 'undo_all')) # Clear all loopers | |
liblo.send(self.osc_target, "/sl/-1/hit", ('s', 'mute_off')) | |
liblo.send(self.osc_target, "/sl/-1/set", "quantize", 1) # Set quantize to cycle | |
liblo.send(self.osc_target, "/sl/-1/set", "playback_sync", 1) # Set playback_sync on | |
liblo.send(self.osc_target, "/sl/-1/set", "sync", 1) # Set sync on | |
liblo.send(self.osc_target, "/sl/-1/set", "relative_sync", 0) # Set rel_sync off | |
# Register for updates for each loop | |
for i in range(min(loopcount, self.MAX_LOOPS)): | |
self._register_loop_updates(i) | |
# Update loop count if changed | |
if loopcount != len(self._loop_states): | |
old_count = len(self._loop_states) | |
print(f"Loop count changed: {old_count}, new loop count: {loopcount}") | |
# Update controller labels if needed | |
labels = ['Internal', 'MidiClock', 'Jack/Host', 'None'] | |
for i in range(loopcount): | |
labels.append(f'Loop {i+1}') | |
print(f"Setting labels for controllers: {labels}") | |
# Register auto-updates for new loops | |
for i in range(old_count, loopcount): | |
print(f"Registering auto update for new loop {i}") | |
self._register_loop_updates(i) | |
except Exception as e: | |
print(f"Error in info callback: {e}") | |
def _cb_osc_fallback(self, path, args, types, src): | |
"""Fallback callback for unhandled OSC messages""" | |
print(f"Received unhandled OSC message: {path} {args}") | |
def _handle_rate_output(self, value): | |
"""Handle rate output changes""" | |
try: | |
if self.osc_server and self.osc_target: | |
self.osc_server.send(self.osc_target, "/sl/0/set", "rate", value) | |
elif not self.osc_target: | |
print("Cannot send rate change - SooperLooper connection not ready") | |
except Exception as e: | |
print(f"Error handling rate output: {e}") | |
def _update_loop_led(self, loop_num): | |
"""Update individual loop LED based on state""" | |
if self._leds is None: | |
print("LED controller not initialized, skipping LED update") | |
return | |
try: | |
# Get the mute and status light notes for this loop | |
if loop_num >= len(MUTE_LIGHT_NOTES): | |
print(f"Loop {loop_num} is out of LED range") | |
return | |
mute_note = MUTE_LIGHT_NOTES[loop_num] | |
status_note = STATUS_LIGHT_NOTES[loop_num] | |
state = self._loop_states[loop_num].get('state', SL_STATE_UNKNOWN) | |
mute_state = self._loop_states[loop_num].get('mute', 0) | |
print(f"Updating LEDs for loop {loop_num} (state={state}, mute={mute_state})") | |
print(f"Using mute_note={mute_note}, status_note={status_note}") | |
# Update mute button LED based on actual mute state from SooperLooper | |
if state == SL_STATE_MUTED: | |
self._leds.led_state(mute_note, LED_RED) # Red when muted | |
else: | |
self._leds.led_state(mute_note, LED_GREEN) # Green when unmuted | |
# Update status LED (right column) | |
if state == SL_STATE_MUTED: | |
print(f"Setting status LED {status_note} to RED_BLINK (muted)") | |
self._leds.led_state(status_note, LED_RED_BLINK) | |
elif state == SL_STATE_RECORDING: | |
print(f"Setting status LED {status_note} to RED (recording)") | |
self._leds.led_state(status_note, LED_RED) | |
elif state == SL_STATE_PLAYING: | |
print(f"Setting status LED {status_note} to GREEN (playing)") | |
self._leds.led_state(status_note, LED_GREEN) | |
elif state == SL_STATE_OVERDUBBING: | |
print(f"Setting status LED {status_note} to GREEN_BLINK (overdubbing)") | |
self._leds.led_state(status_note, LED_GREEN_BLINK) | |
elif state == SL_STATE_MULTIPLYING: | |
print(f"Setting status LED {status_note} to RED_BLINK (multiplying)") | |
self._leds.led_state(status_note, LED_RED_BLINK) | |
elif state == SL_STATE_PAUSED: | |
print(f"Setting status LED {status_note} to RED_BLINK (paused)") | |
self._leds.led_state(status_note, LED_RED_BLINK) | |
else: | |
print(f"Setting status LED {status_note} to YELLOW (other state: {state})") | |
self._leds.led_state(status_note, LED_YELLOW) | |
except Exception as e: | |
print(f"Error updating loop LED: {e}") | |
def _register_auto_updates(self): | |
"""Register for automatic updates from SooperLooper""" | |
try: | |
# Register for state updates for all loops | |
for i in range(self.MAX_LOOPS): | |
# Register for state and mute updates | |
self.osc_server.send(self.osc_target, f"/sl/{i}/register_auto_update", | |
('s', 'state'), ('i', 250), ('s', self.osc_server_url), ('s', '/monitor')) | |
self.osc_server.send(self.osc_target, f'/sl/{i}/register_auto_update', | |
('s', 'mute'), ('i', 250), ('s', self.osc_server_url), ('s', '/monitor')) | |
# Request initial loop count | |
self.osc_server.send(self.osc_target, '/ping', self.osc_server_url, '/info') | |
except Exception as e: | |
print(f"Error registering for updates: {e}") | |
def _register_loop_updates(self, loop_num): | |
"""Register for automatic updates for a specific loop""" | |
try: | |
if self.osc_server and self.osc_target: | |
# Configure sync settings for this loop | |
self.osc_server.send(self.osc_target, f"/sl/{loop_num}/set", "playback_sync", 1) | |
self.osc_server.send(self.osc_target, f"/sl/{loop_num}/set", "sync", 1) | |
self.osc_server.send(self.osc_target, f"/sl/{loop_num}/set", "quantize", 1) | |
self.osc_server.send(self.osc_target, f"/sl/{loop_num}/set", "relative_sync", 0) | |
print(f"Configured sync settings for loop {loop_num}") | |
# Register for state updates | |
self.osc_server.send(self.osc_target, | |
f"/sl/{loop_num}/register_auto_update", "state", 250) | |
# Register for next_state updates | |
self.osc_server.send(self.osc_target, | |
f"/sl/{loop_num}/register_auto_update", "next_state", 250) | |
# Register for waiting updates | |
self.osc_server.send(self.osc_target, | |
f"/sl/{loop_num}/register_auto_update", "waiting", 250) | |
# Register for mute updates | |
self.osc_server.send(self.osc_target, | |
f"/sl/{loop_num}/register_auto_update", "mute", 250) | |
except Exception as e: | |
print(f"Error registering updates for loop {loop_num}: {e}") | |
def update_loop_states(self): | |
"""Update loop states using OSC communication""" | |
if not self._try_connect_to_sooperlooper(): | |
print("No SooperLooper connection available") | |
return | |
try: | |
# Request current states for all loops | |
for i in range(self.MAX_LOOPS): | |
self.osc_server.send(self.osc_target, f'/sl/{i}/get', | |
'state', self.osc_server_url, '/monitor') | |
except Exception as e: | |
print(f"Error updating loop states: {e}") | |
def send_sooperlooper_action(self, action, row): | |
"""Send an action to SooperLooper for a specific row""" | |
try: | |
if self.osc_server and self.osc_target: | |
loop_num = row | |
if loop_num < self.MAX_LOOPS: | |
# Special handling for mute actions | |
if action in ['mute_on', 'mute_off']: | |
# Just send mute hit command to toggle mute state | |
self.osc_server.send(self.osc_target, f'/sl/{loop_num}/hit', ('s', 'mute')) | |
print(f"Sent mute toggle for loop {loop_num}") | |
# Request current mute state | |
self.osc_server.send(self.osc_target, f'/sl/{loop_num}/get', | |
('s', 'mute'), ('s', self.osc_server_url), ('s', '/monitor')) | |
else: | |
self.osc_server.send(self.osc_target, f'/sl/{loop_num}/hit', ('s', action)) | |
print(f"Sent action '{action}' to loop {loop_num}") | |
elif not self.osc_target: | |
print(f"Cannot send action '{action}' - SooperLooper connection not ready") | |
except Exception as e: | |
print(f"Error sending SooperLooper action: {e}") | |
def light_up_initial_state(self): | |
"""Set initial LED states""" | |
# Turn off all buttons first | |
for note in SNAPSHOT_NOTES: | |
self._leds.led_state(note, LED_OFF) | |
for note in ALL_ZS3_NOTES: | |
self._leds.led_state(note, LED_OFF) | |
# Light up first button in each section | |
self._leds.led_state(82, LED_ON) # First snapshot | |
self._leds.led_state(64, LED_ON) # First toggle state | |
self._leds.led_state(66, LED_ON) # First regular zs3 | |
def handle_snapshot_selection(self, note): | |
"""Handle snapshot button press (notes 82-86)""" | |
snapshot_index = note - 82 | |
self.selected_snapshot = snapshot_index | |
# Update snapshot LEDs - only selected one is lit | |
for n in SNAPSHOT_NOTES: | |
self._leds.led_state(n, LED_ON if n == note else LED_OFF) | |
# Keep current zs3 LED lit | |
self._leds.led_state(64 + self.selected_zs3_toggle, LED_ON) | |
# Load the snapshot | |
self.load_snapshot(snapshot_index) | |
def handle_zs3_selection(self, note): | |
"""Handle zs3 button press with separate toggle and regular functionality""" | |
if note in ZS3_TOGGLE_NOTES: | |
# Handle toggle pair (64/65) | |
if note == 64: | |
return # Ignore note 64 presses | |
# Toggle between 0 and 1 when pressing note 65 | |
self.selected_zs3_toggle = 1 if self.selected_zs3_toggle == 0 else 0 | |
# Update toggle LEDs | |
self._leds.led_state(64, LED_ON if self.selected_zs3_toggle == 0 else LED_OFF) | |
self._leds.led_state(65, LED_ON if self.selected_zs3_toggle == 1 else LED_OFF) | |
# Load the toggle zs3 subsnap | |
self.load_zs3_snapshot(self.selected_zs3_toggle) | |
elif note in ZS3_REGULAR_NOTES: | |
# Handle regular ZS3 notes (66-71) | |
self.selected_zs3_regular = note - 66 # Convert to 0-based index | |
# Update regular ZS3 LEDs | |
for n in ZS3_REGULAR_NOTES: | |
self._leds.led_state(n, LED_ON if n == note else LED_OFF) | |
# Keep current toggle LED state | |
toggle_note = 65 if self.selected_zs3_toggle == 1 else 64 | |
self._leds.led_state(toggle_note, LED_ON) | |
# Load the regular zs3 subsnap (offset by 2 to account for toggle states) | |
self.load_zs3_snapshot(self.selected_zs3_regular + 2) | |
# Keep current snapshot LED lit | |
self._leds.led_state(82 + self.selected_snapshot, LED_ON) | |
def handle_control_change(self, control, value): | |
"""Handle MIDI control change messages""" | |
pass | |
def midi_event(self, event): | |
"""Handle MIDI input for snapshot and zs3 selection""" | |
evtype = (event[0] >> 4) & 0x0F | |
note = event[1] & 0x7F | |
velocity = event[2] & 0x7F | |
channel = event[0] & 0x0F | |
# Only process Note On messages with non-zero velocity on channel 1 | |
if evtype != CONST.MIDI_NOTE_ON or channel != 0 or velocity == 0: | |
return super().midi_event(event) | |
# Add sync source control handling | |
if note == BTN_SYNC_MIDI: | |
if self.osc_target is not None: | |
print("Setting sync source to MIDI") | |
self.osc_server.send(self.osc_target, "/set", "sync_source", -2) # -2 is MIDI sync source | |
# Ensure quantize is set to cycle for all loops | |
self.osc_server.send(self.osc_target, "/sl/-1/set", "quantize", 1) | |
# Update LEDs to show current state | |
self._leds.led_state(BTN_SYNC_MIDI, LED_GREEN) | |
self._leds.led_state(BTN_SYNC_LOOP1, LED_OFF) | |
return True | |
elif note == BTN_SYNC_LOOP1: | |
if self.osc_target is not None: | |
print("Setting sync source to Loop 1") | |
self.osc_server.send(self.osc_target, "/set", "sync_source", 1) # 0 is first loop | |
# Ensure quantize is set to cycle for all loops | |
self.osc_server.send(self.osc_target, "/sl/-1/set", "quantize", 1) | |
# Update LEDs to show current state | |
self._leds.led_state(BTN_SYNC_MIDI, LED_OFF) | |
self._leds.led_state(BTN_SYNC_LOOP1, LED_GREEN) | |
return True | |
# Handle snapshot selection (notes 82-86) first | |
if note in SNAPSHOT_NOTES: | |
self.handle_snapshot_selection(note) | |
return True | |
# Handle zs3 selection (notes 64-71) next | |
if note in ALL_ZS3_NOTES: | |
self.handle_zs3_selection(note) | |
return True | |
found_action = False | |
print(f"Debug: Starting action search for note {note}") | |
for row, actions in LOOPER_ACTIONS.items(): | |
print(f"Debug: Checking row {row} with actions {actions}") | |
if note in actions: | |
action = actions[note] | |
print(f"Debug: Found row action: {action} for row {row}") | |
self.send_sooperlooper_action(action, row) | |
print(f"Debug: Sent action '{action}' for row {row}") | |
found_action = True | |
break | |
if not found_action: | |
print(f"Debug: No action found for note {note}") | |
# Handle system restart | |
if note == 0: | |
print("Triggering system restart...") | |
if self.osc_target is not None: | |
self.osc_server.send(self.osc_target, '/sl/-1/hit', ('s', 'mute_on')) # Mute all loops before restart | |
# Flash LED briefly then turn off | |
self._leds.led_state(0, LED_RED) | |
time.sleep(0.5) # Brief delay | |
self._leds.led_state(0, LED_OFF) | |
os.system("systemctl restart zynthian") | |
return True | |
# Handle system reboot | |
if note == 8: | |
print("Triggering system reboot...") | |
if self.osc_target is not None: | |
self.osc_server.send(self.osc_target, '/sl/-1/hit', ('s', 'mute_on')) # Mute all loops before reboot | |
# Flash LED briefly then turn off | |
self._leds.led_state(8, LED_RED) | |
time.sleep(0.5) # Brief delay | |
self._leds.led_state(8, LED_OFF) | |
os.system("systemctl reboot") | |
return True | |
# Handle special buttons | |
if note == BTN_RECORD: | |
self.state_manager.send_cuia("TOGGLE_RECORD") | |
self._record_state = not self._record_state | |
self._leds.led_state(0, LED_RED if self._record_state else LED_OFF) | |
return True | |
elif note == BTN_PLAY: | |
self.state_manager.send_cuia("TOGGLE_PLAY") | |
self._play_state = not self._play_state | |
self._leds.led_state(1, LED_GREEN if self._play_state else LED_OFF) | |
return True | |
elif note == BTN_STOP_ALL_CLIPS: | |
if self.osc_target is not None: | |
self.osc_server.send(self.osc_target, '/sl/-1/hit', ('s', 'undo_all')) # Clear all loopers | |
else: | |
print("Cannot send OSC message - SooperLooper connection not ready") | |
return True | |
# If we handled a looper action, return True | |
if found_action: | |
return True | |
# If we didn't handle the event, let the parent class try | |
return super().midi_event(event) | |
def load_snapshot(self, snapshot_index): | |
"""Load a main snapshot""" | |
try: | |
print(f"Attempting to load snapshot {snapshot_index}") | |
# Set the bank to 000 (default bank) if not already set | |
if self.state_manager.snapshot_bank is None: | |
print("No bank set, setting to bank 0") | |
self.state_manager.set_snapshot_midi_bank(0) | |
print(f"Current bank: {self.state_manager.snapshot_bank}") | |
print(f"Loading snapshot with index: {snapshot_index}") | |
self.state_manager.load_snapshot_by_prog(snapshot_index) | |
return True | |
except Exception as e: | |
print(f"Error loading snapshot {snapshot_index}: {str(e)}") | |
print(f"State manager: {self.state_manager}") | |
return False | |
def load_zs3_snapshot(self, zs3_index): | |
"""Load a zs3 subsnapshot""" | |
try: | |
zs3_id = f"zs3-{zs3_index+1}" | |
zs3_state = self.state_manager.zs3.get(zs3_id) | |
if zs3_state is None: | |
print(f"ZS3 snapshot {zs3_id} not found") | |
return False | |
self.state_manager.load_zs3(zs3_state) | |
return True | |
except Exception as e: | |
print(f"Error loading zs3 snapshot {zs3_index}: {str(e)}") | |
return False | |
def refresh(self): | |
"""Refresh device state""" | |
if self._try_connect_to_sooperlooper(): | |
self.update_loop_states() | |
def _cb_sync_source(self, path, args): | |
"""Callback for sync source updates""" | |
try: | |
if len(args) >= 1: | |
sync_source = args[0] | |
# Update LEDs based on current sync source | |
if sync_source == -2: # MIDI | |
self._leds.led_state(BTN_SYNC_MIDI, LED_GREEN) | |
self._leds.led_state(BTN_SYNC_LOOP1, LED_OFF) | |
elif sync_source == 1: # Loop 1 | |
self._leds.led_state(BTN_SYNC_MIDI, LED_OFF) | |
self._leds.led_state(BTN_SYNC_LOOP1, LED_GREEN) | |
except Exception as e: | |
print(f"Error in sync source callback: {e}") | |
class FeedbackLEDs: | |
def __init__(self, idev): | |
print("Initializing FeedbackLEDs class") | |
self._idev = idev | |
self._state = {} | |
self._timer = RunTimer() | |
print("FeedbackLEDs initialization complete") | |
print("Initialized FeedbackLEDs.") | |
def all_off(self): | |
print("Turning all control LEDs off.") | |
self.control_leds_off() | |
# Removed the call to pad_leds_off() since it does not exist | |
def control_leds_off(self): | |
buttons = [ | |
BTN_STOP_ALL_CLIPS, BTN_PLAY, BTN_RECORD, | |
] | |
for btn in buttons: | |
print(f"Turning off control LED: {btn}") | |
self.led_state(btn, 0) | |
def led_state(self, led, color): | |
print(f"Turning on LED: {led}, color: {color}") | |
print(f"MIDI device status - idev: {self._idev}, type: {type(self._idev)}") | |
if self._idev is None: | |
print("MIDI output device is None!") | |
return | |
self._timer.remove(led) | |
try: | |
print(f"Sending MIDI note_on message - channel: 0, note: {led}, velocity: {color}") | |
lib_zyncore.dev_send_note_on(self._idev, 0, led, color) | |
print(f"Successfully sent MIDI message for LED {led}") | |
except Exception as e: | |
print(f"Failed to send MIDI message: {e}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment