Skip to content

Instantly share code, notes, and snippets.

@Moraxyc
Created June 2, 2025 05:49
Show Gist options
  • Save Moraxyc/8280f6eb69e88a3bb9c1b360ed6bf12e to your computer and use it in GitHub Desktop.
Save Moraxyc/8280f6eb69e88a3bb9c1b360ed6bf12e to your computer and use it in GitHub Desktop.
Pinching with five fingers to activate Niri Overview
import json
import logging
import os
import signal
import socket
import sys
import time
from select import select
from evdev import InputDevice, ecodes, list_devices
logging.basicConfig(
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)
exit_flag = False
def signal_handler(signum, _):
global exit_flag
logger.info(f"Received signal {signum}, exiting...")
exit_flag = True
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def send_niri_ipc_request(request: dict):
socket_path = os.environ.get("NIRI_SOCKET")
if not socket_path:
raise EnvironmentError("Environment variable NIRI_SOCKET is not set")
try:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
client.connect(socket_path)
client.sendall((json.dumps(request) + "\n").encode("utf-8"))
response = b""
while True:
chunk = client.recv(4096)
if not chunk or b"\n" in chunk:
response += chunk
break
response += chunk
return json.loads(response.decode("utf-8").strip())
except (socket.error, json.JSONDecodeError) as e:
logger.error(f"IPC request failed: {e}")
return None
def get_touch_devices():
devices = [InputDevice(path) for path in list_devices()]
return [
dev
for dev in devices
if ecodes.EV_ABS in dev.capabilities() and
ecodes.ABS_MT_POSITION_X in dict(dev.capabilities()[ecodes.EV_ABS])
]
def fingerprint_touch_devices():
return sorted(list_devices())
def restart_self():
logger.info("Input device list changed, restarting self...")
os.execv(sys.executable, [sys.executable] + sys.argv)
def calc_spread(slot_positions: dict[int, list[int]]) -> int:
xs = [pos[0] for pos in slot_positions.values()]
ys = [pos[1] for pos in slot_positions.values()]
return (max(xs) - min(xs)) + (max(ys) - min(ys))
def handle_events(devices, previous_fingerprint):
fds = {dev.fd: dev for dev in devices}
slot_positions = {}
current_slot = 0
initial_spread = None
last_check_time = 0
last_device_check = time.time()
try:
while not exit_flag:
if time.time() - last_device_check > 3:
current_fingerprint = fingerprint_touch_devices()
if current_fingerprint != previous_fingerprint:
restart_self()
last_device_check = time.time()
r, _, _ = select(fds.keys(), [], [], 0.03)
for fd in r:
dev = fds[fd]
try:
for event in dev.read():
if event.type == ecodes.EV_ABS:
if event.code == ecodes.ABS_MT_SLOT:
current_slot = event.value
elif event.code == ecodes.ABS_MT_POSITION_X:
slot_positions.setdefault(current_slot, [0, 0])[
0
] = event.value
elif event.code == ecodes.ABS_MT_POSITION_Y:
slot_positions.setdefault(current_slot, [0, 0])[
1
] = event.value
elif (
event.code == ecodes.ABS_MT_TRACKING_ID and
event.value == -1
):
slot_positions.pop(current_slot, None)
except Exception as e:
logger.warning(f"Failed to read from device: {e}")
now = time.time()
if len(slot_positions) == 5:
spread = calc_spread(slot_positions)
if initial_spread is None:
initial_spread = spread
last_check_time = now
elif now - last_check_time > 0.1:
delta = spread - initial_spread
logger.debug(f"Detected delta: {delta}")
if delta < -150:
logger.info("Pinch-in gesture detected")
send_niri_ipc_request({"Action": {"OpenOverview": {}}})
elif delta > 200:
logger.info("Pinch-out gesture detected")
send_niri_ipc_request({"Action": {"CloseOverview": {}}})
initial_spread = None
slot_positions.clear()
else:
initial_spread = None
finally:
logger.info("Cleaning up devices")
for dev in devices:
dev.close()
if __name__ == "__main__":
devices = get_touch_devices()
if not devices:
logger.error("No touch devices detected")
sys.exit(1)
fingerprint = fingerprint_touch_devices()
handle_events(devices, fingerprint)
@Moraxyc
Copy link
Author

Moraxyc commented Jun 2, 2025

Home Manager users can use the following code to create a systemd user service.

{ lib, pkgs, ... }:
let
  script =
    pkgs.writers.writePython3Bin "pinching-five-fingers-niri-overview"
      {
        libraries = with pkgs.python3Packages; [ evdev ];
        flakeIgnore = [
          "E501"
          "W504"
        ];
      }
      ''
        # TODO: Replace me to the script
      '';
in
{
  systemd.user.services = {
    pinching-five-fingers-niri-overview = {
      Unit = {
        After = [ "graphical-session.target" ];
      };
      Install.WantedBy = [ "graphical-session.target" ];

      Service = {
        ExecStart = lib.getExe script;
        Restart = "on-failure";
        RestartSec = 1;
      };
    };
  };
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment