Skip to content

Instantly share code, notes, and snippets.

@MunifTanjim
Created May 28, 2025 13:57
Show Gist options
  • Save MunifTanjim/c9cc0263a0cef85b7c441d28a3083a8f to your computer and use it in GitHub Desktop.
Save MunifTanjim/c9cc0263a0cef85b7c441d28a3083a8f to your computer and use it in GitHub Desktop.
RTN over a Unix socket
/**
 * Usage example: obtain a client from a `PythonHelper` and send one
 * `parse_torrent_title` request over it.
 *
 * Both the helper and the client are scoped resources (`await using` /
 * `using`), so they are disposed when the inner function exits, whether it
 * completes or throws. Any failure is logged and then re-thrown to the caller.
 */
async function process() {
  const parseTitles = async () => {
    await using helper = new PythonHelper("dmm-hashlist-sync");
    using py = await helper.client();
    const res = await py.send({
      cmd: "parse_torrent_title",
      data: [{ hash: '', title: '' }],
    });
    // ...
  };

  return parseTitles().catch((err) => {
    // Log for visibility, but keep the failure observable by the caller.
    console.error(err);
    throw err;
  });
}
import { resolve as resolvePath } from "node:path";
import fs from "node:fs/promises";
import net from "node:net";
import { spawn, ChildProcessWithoutNullStreams } from "node:child_process";
import { socketExists } from "./fs";
/**
 * Parsed metadata for a single torrent title.
 *
 * This is the shape of the `data` object attached to every item of the
 * `parse_torrent_title` response (see `ResponseByCommand`).
 *
 * NOTE(review): the field list presumably mirrors the model dumped by the
 * Python-side parser; it is declared by hand here, so verify it against the
 * parser version in use whenever that dependency is upgraded.
 */
export type ParsedTorrent = {
// Title variants. NOTE(review): presumably input / cleaned / normalised forms — confirm.
raw_title: string;
parsed_title: string;
normalized_title: string;
trash: boolean;
adult: boolean;
// Nullable fields are `null` (not omitted) when no value is available.
year: number | null;
resolution: string | null;
// List-valued fields are always arrays; an empty array means "none".
seasons: number[];
episodes: number[];
complete: boolean;
volumes: string[];
languages: string[];
quality: string | null;
hdr: string[];
codec: string | null;
audio: string[];
channels: string[];
dubbed: boolean;
subbed: boolean;
// NOTE(review): typed as a string, format not visible here — confirm before parsing.
date: string | null;
group: string | null;
edition: string | null;
bit_depth: string | null;
bitrate: string | null;
network: string | null;
extended: boolean;
converted: boolean;
hardcoded: boolean;
region: string | null;
ppv: boolean;
site: string | null;
// NOTE(review): a string, not a byte count — presumably human-readable; confirm.
size: string | null;
proper: boolean;
repack: boolean;
retail: boolean;
upscaled: boolean;
remastered: boolean;
unrated: boolean;
documentary: boolean;
episode_code: string | null;
country: string | null;
container: string | null;
extension: string | null;
extras: string[];
torrent: boolean;
scene: boolean;
// Media kind; restricted to these two literals.
type: "movie" | "show";
};
/**
 * Maps each command name to the type of the `data` value sent with it.
 *
 * Used as the `Payload` type argument of `SocketClient`, whose `send()`
 * accepts `{ cmd, data }` with `data` typed by the entry for `cmd`.
 */
type PayloadByCommand = {
// `unknown`: any value is accepted as `data` for these two commands.
print: unknown;
shutdown: unknown;
// A batch of titles; `hash` identifies each entry in the response.
parse_torrent_title: Array<{ hash: string; title: string }>;
};
/**
 * Maps each command name to the type `SocketClient.send()` resolves with for
 * that command (in addition to `null`, which `send()` may always return).
 *
 * Must have the same keys as `PayloadByCommand`.
 */
type ResponseByCommand = {
// These commands carry no result value.
print: void;
shutdown: void;
// One entry per parsed title, keyed by `hash`.
parse_torrent_title: { items: Array<{ hash: string; data: ParsedTorrent }> };
};
/**
 * Minimal request/response client for a JSON-over-Unix-socket protocol.
 *
 * Every message (in both directions) is a UTF-8 JSON document followed by the
 * terminator `\r\n\r\n`. A response is either empty, `{ "data": ... }` or
 * `{ "error": ... }`.
 *
 * The client is single-flight: call `send()` again only after the previous
 * call has settled, because responses carry no correlation id.
 *
 * @typeParam Payload  Maps command name to the type of its `data` value.
 * @typeParam Response Maps command name to the type of its response value.
 */
class SocketClient<
  Payload extends Record<string, unknown>,
  Response extends Record<keyof Payload, unknown>,
> implements Disposable
{
  path: string;
  #conn: net.Socket | null = null;
  #encoder: TextEncoder;
  /** Marks the end of a message in both directions. */
  readonly #terminator = "\r\n\r\n";

  /** @param path Filesystem path of the Unix domain socket to connect to. */
  constructor(path: string) {
    this.path = path;
    this.#encoder = new TextEncoder();
  }

  /** Destroys the connection, if any. Safe to call on a never-connected client. */
  [Symbol.dispose]() {
    this.#conn?.destroy();
    this.#conn = null;
  }

  /**
   * Opens the connection (once) and resolves when it is established.
   *
   * @returns This client, for chaining.
   * @throws The socket error if the connection cannot be established.
   */
  async connect() {
    if (this.#conn) {
      return this;
    }
    const conn = net.createConnection(this.path);
    this.#conn = conn;
    try {
      await new Promise<void>((resolve, reject) => {
        const onConnect = () => {
          conn.off("error", onError);
          resolve();
        };
        const onError = (err: Error) => {
          conn.off("connect", onConnect);
          reject(err);
        };
        conn.once("connect", onConnect);
        conn.once("error", onError);
      });
    } catch (err) {
      // Forget the dead socket so a later connect() can retry.
      conn.destroy();
      if (this.#conn === conn) {
        this.#conn = null;
      }
      throw err;
    }
    return this;
  }

  /**
   * The underlying socket.
   *
   * @throws Error("not connected yet") before `connect()` has been called.
   */
  get conn() {
    if (!this.#conn) {
      throw new Error("not connected yet");
    }
    return this.#conn;
  }

  /**
   * Sends one command and waits for its response.
   *
   * @param payload Command name and its `data` value.
   * @returns The response's `data`, or `null` when the response is empty.
   * @throws Error carrying the JSON-encoded `error` object when the server
   *   reports an error; the socket error on I/O failure; an Error when the
   *   connection closes before a full response arrived or the response is
   *   not a JSON object.
   */
  async send<C extends keyof Payload>(payload: {
    cmd: C;
    data: NoInfer<Payload[C]>;
  }): Promise<Response[C] | null> {
    // Listen before writing so that a fast reply cannot be missed.
    const frame = this.#receiveFrame();
    try {
      const [, raw] = await Promise.all([
        this.#write(payload),
        frame.promise,
      ]);
      return this.#parse<C>(raw);
    } finally {
      // Idempotent; guarantees no listener survives a failed write.
      frame.cancel();
    }
  }

  /** Serialises `payload`, appends the terminator and writes it to the socket. */
  #write(payload: unknown): Promise<void> {
    const bytes = this.#encoder.encode(
      JSON.stringify(payload) + this.#terminator,
    );
    return new Promise<void>((resolve, reject) => {
      this.conn.write(bytes, (err) => (err ? reject(err) : resolve()));
    });
  }

  /**
   * Collects incoming data until the terminator is seen.
   *
   * @returns `promise` resolving with the message text (terminator removed)
   *   and `cancel`, which detaches every listener this call attached.
   */
  #receiveFrame(): { promise: Promise<string>; cancel: () => void } {
    const conn = this.conn;
    const terminator = this.#terminator;
    // A fresh streaming decoder per message: `stream: true` keeps a multi-byte
    // character that straddles two chunks intact, and a new instance ensures
    // no partial bytes leak over from a previously failed read.
    const decoder = new TextDecoder();
    let text = "";
    let cancel = () => {};

    const promise = new Promise<string>((resolve, reject) => {
      const onData = (chunk: Buffer) => {
        text += decoder.decode(chunk, { stream: true });
        if (text.endsWith(terminator)) {
          cancel();
          resolve(text.slice(0, -terminator.length));
        }
      };
      const onError = (err: Error) => {
        cancel();
        reject(err);
      };
      // Without this, a peer that disconnects mid-response would leave the
      // caller waiting forever.
      const onClose = () => {
        cancel();
        reject(
          new Error("connection closed before a complete response was received"),
        );
      };
      cancel = () => {
        conn.off("data", onData);
        conn.off("error", onError);
        conn.off("close", onClose);
      };
      conn.on("data", onData);
      conn.on("error", onError);
      conn.on("close", onClose);
    });

    return { promise, cancel };
  }

  /** Decodes a response message into the command's result (or throws). */
  #parse<C extends keyof Payload>(raw: string): Response[C] | null {
    if (!raw) {
      return null;
    }
    const result: unknown = JSON.parse(raw);
    if (typeof result !== "object" || result === null) {
      throw new Error(`malformed response: ${raw}`);
    }
    if ("error" in result) {
      throw new Error(JSON.stringify(result.error));
    }
    return (result as { data: Response[C] }).data;
  }
}
/**
 * Spawned helper processes keyed by helper id. Module-wide so that an id is
 * served by at most one child process at a time.
 */
const process_by_id: Record<string, ChildProcessWithoutNullStreams> = {};

/**
 * Owns a Python socket-server child process and hands out clients for it.
 *
 * The child is started lazily on the first `client()` call and is shut down
 * when the helper is disposed (`await using`).
 */
export class PythonHelper implements AsyncDisposable {
  #id: string;
  #socket_path: string;
  /** In-flight or completed startup, shared by concurrent `client()` calls. */
  #ready: Promise<void> | null = null;

  /**
   * @param id Unique helper id; also names the socket file `./python/<id>.sock`.
   * @throws Error if a process is already registered under `id`.
   */
  constructor(id: string) {
    this.#id = id;
    this.#socket_path = resolvePath("./python", `${id}.sock`);
    if (process_by_id[this.#id]) {
      throw new Error(`already in use: ${this.#id}`);
    }
  }

  /**
   * Asks the server to shut down, then kills the child and frees the id.
   *
   * The kill and the registry cleanup always run, even when the shutdown
   * request fails; the request's error is still propagated to the caller.
   */
  async [Symbol.asyncDispose]() {
    const child = process_by_id[this.#id];
    if (!child) {
      return;
    }
    try {
      using c = await this.client();
      await c.send({ cmd: "shutdown", data: null });
    } finally {
      child.kill();
      this.#forget(child);
    }
  }

  /** Drops `child` from the registry (if still registered) and resets startup state. */
  #forget(child: ChildProcessWithoutNullStreams) {
    if (process_by_id[this.#id] === child) {
      delete process_by_id[this.#id];
    }
    this.#ready = null;
  }

  /** Starts the child once; concurrent callers share the same startup promise. */
  #start = (): Promise<void> => {
    if (!this.#ready) {
      this.#ready = this.#launch().catch((err: unknown) => {
        // Allow a later call to retry after a failed startup.
        this.#ready = null;
        throw err;
      });
    }
    return this.#ready;
  };

  /**
   * Spawns the server and waits until its socket file exists.
   *
   * @throws The spawn error, or an Error if the child exits before the socket
   *   appears.
   */
  #launch = async (): Promise<void> => {
    if (process_by_id[this.#id]) {
      return;
    }
    // A stale socket file would make the readiness poll below succeed early.
    if (await socketExists(this.#socket_path)) {
      await fs.rm(this.#socket_path);
    }

    const child = spawn(
      "poetry",
      ["run", "python", "-m", "server", this.#socket_path],
      { cwd: "./python" },
    );
    process_by_id[this.#id] = child;

    const startup: { failure: Error | null } = { failure: null };
    child.on("error", (err) => {
      if (!startup.failure) {
        startup.failure = err;
      }
      this.#forget(child);
    });
    child.once("exit", (code, signal) => {
      if (!startup.failure) {
        startup.failure = new Error(
          `python helper exited before it was ready (code=${code}, signal=${signal})`,
        );
      }
      this.#forget(child);
    });

    // The pipes must be drained, otherwise the child blocks on write once the
    // OS pipe buffer fills up. The output itself is discarded.
    child.stdout.resume();
    child.stderr.resume();

    while (!(await socketExists(this.#socket_path))) {
      if (startup.failure) {
        throw startup.failure;
      }
      await new Promise((resolve) => setTimeout(resolve, 100));
    }
  };

  /**
   * Ensures the server is running and returns a connected client for it.
   * The caller owns the client and should dispose it (`using`).
   */
  client = async () => {
    await this.#start();
    const c = new SocketClient<PayloadByCommand, ResponseByCommand>(
      this.#socket_path,
    );
    return await c.connect();
  };
}
import asyncio
import os
import traceback
from json import JSONDecodeError
from signal import Signals
from sys import argv
from RTN import ParsedData, parse
from RTN.models import json
class SocketServer:
socket_path: str
def __init__(self, socket_path: str):
if os.path.exists(socket_path):
raise Exception("socket already exists")
self.socket_path = socket_path
self.loop = asyncio.get_event_loop()
for s in (Signals.SIGQUIT, Signals.SIGTERM, Signals.SIGINT):
self.loop.add_signal_handler(
s, lambda s=s: asyncio.create_task(self.shutdown(signal=s))
)
self.loop.set_exception_handler(lambda loop, context: self.shutdown())
async def shutdown(self, signal: Signals | None = None):
if signal:
print(f"received exit signal: {signal.name}")
if os.path.exists(self.socket_path):
os.remove(self.socket_path)
tasks = [
task
for task in asyncio.all_tasks(self.loop)
if task is not asyncio.current_task(self.loop)
]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
self.loop.stop()
async def cmd_shutdown(self):
await self.shutdown()
async def cmd_print(self, data):
print(data)
async def cmd_parse_torrent_title(self, data):
if not isinstance(data, list):
return {
"error": {
"message": "invalid data",
"code": "BAD_REQUEST",
"status_code": 400,
}
}
items = data
res_items = []
for item in items:
result: ParsedData = parse(item["title"])
data = result.model_dump()
data["type"] = result.type
res_items.append(
{
"hash": item["hash"],
"data": data,
}
)
return {
"data": {
"items": res_items,
}
}
async def handle_client(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
try:
while True:
data = bytes()
while True:
chunk = await reader.read(1024)
if not chunk:
break
data += chunk
if data.endswith(b"\r\n\r\n"):
data = data[:-4]
break
if not data:
break
res = ""
try:
input = json.loads(data.decode("utf-8"))
cmd = input["cmd"]
if input == "shutdown":
await self.cmd_shutdown()
break
input_data = input["data"]
if cmd == "print":
res = await self.cmd_print(input_data)
if cmd == "parse_torrent_title":
res = await self.cmd_parse_torrent_title(input_data)
except Exception as e:
traceback.print_exception(e)
if isinstance(e, JSONDecodeError) or isinstance(e, TypeError):
res = {
"error": {
"message": "invalid data",
"code": "BAD_REQUEST",
"status_code": 400,
}
}
else:
res = {
"error": {
"message": "Internal Server Error",
"code": "INTERNAL_SERVER_ERROR",
"status_code": 500,
}
}
if not res:
res = ""
elif isinstance(res, dict):
res = json.dumps(res)
writer.write((res + "\r\n\r\n").encode("utf-8"))
await writer.drain()
except asyncio.CancelledError:
pass
finally:
writer.close()
await writer.wait_closed()
async def __start(self):
socket = await asyncio.start_unix_server(self.handle_client, self.socket_path)
await socket.serve_forever()
def start(self):
try:
self.loop.create_task(self.__start())
self.loop.run_forever()
finally:
self.loop.run_until_complete(self.loop.shutdown_asyncgens())
self.loop.close()
def run_socket(socket_path: str = "buddy.sock"):
    """Create a ``SocketServer`` for ``socket_path`` and run it.

    Blocks until the server's ``start()`` returns.
    """
    SocketServer(socket_path).start()
if __name__ == "__main__":
    # The first command-line argument, when given, overrides the default path.
    if len(argv) > 1:
        cli_socket_path = argv[1]
    else:
        cli_socket_path = "buddy.sock"
    run_socket(cli_socket_path)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment