Skip to content

Instantly share code, notes, and snippets.

@Ramblurr
Created July 17, 2025 16:12
Show Gist options
  • Save Ramblurr/46bac043f2d0fc52f17dbc6055e5093e to your computer and use it in GitHub Desktop.
Save Ramblurr/46bac043f2d0fc52f17dbc6055e5093e to your computer and use it in GitHub Desktop.
fake systemd notify socket listener with proper barrier support
#!/usr/bin/env python3
# sd-notify-listener.py - fake systemd notify socket listener with proper barrier support
# this represents the outer process waiting for the ready signal, usually this role is played by systemd
# but if you are trying to manage a process that uses sd_notify/systemd-notify and you aren't using systemd... well here you go
import socket
import os
import struct
import array
sock_path = "/tmp/fake-notify.sock"
# Clean up old socket
try:
os.unlink(sock_path)
except:
pass
# Create datagram socket
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.bind(sock_path)
print(f"Listening on {sock_path}...")
while True:
# Use recvmsg to receive both message data and ancillary data (file descriptors)
# Buffer for the message data
bufsize = 4096
# Buffer for ancillary data (file descriptors)
# We expect at most one file descriptor
ancbufsize = socket.CMSG_SPACE(struct.calcsize("i"))
try:
msg, ancdata, flags, addr = sock.recvmsg(bufsize, ancbufsize)
except Exception as e:
print(f"Error receiving message: {e}")
continue
msg_str = msg.decode('utf-8')
print(f"Received: {msg_str.strip()}")
# Check for ready signal
if "READY=1" in msg_str:
pass # do something
# Handle BARRIER=1 properly according to sd_notify protocol
if "BARRIER=1" in msg_str:
# Extract file descriptor from ancillary data
for cmsg_level, cmsg_type, cmsg_data in ancdata:
if cmsg_level == socket.SOL_SOCKET and cmsg_type == socket.SCM_RIGHTS:
# Extract the file descriptor
fd_array = array.array("i")
fd_array.frombytes(cmsg_data)
for fd in fd_array:
print(f"Received file descriptor {fd} with BARRIER=1, closing it to acknowledge")
# Close the fd immediately to signal barrier acknowledgment
os.close(fd)
print("Barrier acknowledged by closing fd")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment