Skip to content

Instantly share code, notes, and snippets.

@mrbid
Last active January 26, 2026 07:25
Show Gist options
  • Select an option

  • Save mrbid/6ead544f56a752c2912327b6275ce46b to your computer and use it in GitHub Desktop.

Select an option

Save mrbid/6ead544f56a752c2912327b6275ce46b to your computer and use it in GitHub Desktop.
LMStudio IRC BOT

Exposes LMStudio over IRC using the LMStudio API Server

Created by Test_User

  • go.sh
rm llmbot.sock
xterm -e "python3 llm.py" > /dev/null 2>&1 &
xterm -e "python3 irc.py" > /dev/null 2>&1 &
  • irc.py
#!/usr/bin/env python3

import socket
import ssl
import threading
import time
import os
import sys

bot_nick = b"llmbot"
if len(sys.argv) > 1: bot_nick = sys.argv[1].encode()

channel = b"#llm"
prefix = b"!"
llm_socket_path="./llmbot.sock"

irc_ready = False
llm_ready = False

irc_socket = None
llm_socket = None

state_lock = threading.Lock()

sfi = os.path.join(os.path.dirname(__file__), "llmbot.sock")
if os.path.isfile(sfi): os.remove(sfi)

def llm_side():
	global state_lock

	global irc_ready
	global irc_socket

	global llm_ready
	global llm_socket

	main_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
	main_socket.bind(llm_socket_path)
	main_socket.listen(1)

	while True:
		tmp, _ = main_socket.accept()
		state_lock.acquire()
		llm_socket = tmp
		llm_ready = True
		state_lock.release()

		msg = b""
		while True:
			error = False
			while True:
				newmsg = llm_socket.recv(4096)

				if newmsg == b"":
					error = True
					break

				oldmsg = msg
				msg = msg + newmsg

				if (oldmsg[-1:-1] == b"\r" and newmsg[0:0] == b"\n") or newmsg.find(b"\r\n") != -1:
					break

			if error:
				break

			split_lines = msg.split(b"\r\n")
			msg = split_lines[-1]
			split_lines = split_lines[:-1]

			state_lock.acquire()
			if not irc_ready:
				state_lock.release()
				continue

			for line in split_lines:
				irc_socket.send(line + b"\r\n")
			state_lock.release()

		state_lock.acquire()
		llm_ready = False
		llm_socket.close()
		state_lock.release()

def irc_side():
	global state_lock

	global irc_ready
	global irc_socket

	global llm_ready
	global llm_socket

	state_lock.acquire()

	irc_socket = ssl.create_default_context().wrap_socket(socket.socket(socket.AF_INET, socket.SOCK_STREAM), server_hostname="xeroxirc.net")

	try:
		irc_socket.connect(("us.xeroxirc.net", 6697))
	except Exception:
		return

	irc_socket.sendall(b"USER ai * * :some sus bot\r\nNICK " + bot_nick + b"\r\n")

	irc_socket.settimeout(60)

	msg = b""
	while True:
		state_lock.release()

		fails = 0
		while True:
			try:
				newmsg = irc_socket.recv(4096)
			except Exception as e:
				if fails < 1:
					state_lock.acquire()
					irc_socket.sendall(b"PING e\r\n")
					state_lock.release()
				else:
					return
				fails = fails + 1

			if newmsg == b"":
				return

			oldmsg = msg
			msg = oldmsg + newmsg

			if (oldmsg[-1:-1] == b"\r" and newmsg[0:0] == b"\n") or newmsg.find(b"\r\n") != -1:
				break

		# last is partial
		split_lines = msg.split(b"\r\n")
		msg = split_lines[-1]
		split_lines = split_lines[:-1]

		state_lock.acquire()

		for line in split_lines:
			source = b""
			lastarg = None
			if line.startswith(b":"):
				source = line.split(b" ", maxsplit=1)[0][1:]
				line = line.split(b" ", maxsplit=1)[1]

			command = line.split(b" ", maxsplit=1)[0]
			line = line.split(b" ", maxsplit=1)[1]

			if line.startswith(b":"):
				line = b" " + line
			if len(line.split(b" :", maxsplit=1)) > 1:
				lastarg = line.split(b" :", maxsplit=1)[1]
				line = line.split(b" :")[0]

			args = [arg for arg in line.split(b" ") if arg != b""]
			if lastarg is not None:
				args.append(lastarg)

			if command == b"PING":
				if len(args) == 0:
					irc_socket.sendall(b"PONG\r\n")
				elif len(args) == 1:
					irc_socket.sendall(b"PONG :" + args[0] + b"\r\n")
				else:
					irc_socket.sendall(b"PONG " + args[1] + b" " + args[0] + b"\r\n")

			elif command == b"001":
				irc_ready = True
				irc_socket.sendall(b"JOIN " + channel + b"\r\n")

			elif command == b"PRIVMSG" and irc_ready:
				if len(args) < 2:
					continue

				if args[0] != channel:
					continue

				if not args[1].startswith(prefix):
					continue

				privmsg = args[1][len(prefix):]

				if llm_ready:
					llm_socket.sendall(privmsg + b"\n")

threading.Thread(target=llm_side).start()

last_time = time.time() - 60
while True:
	current_time = time.time()
	if last_time + 60 > current_time:
		time.sleep(last_time + 60 - current_time)
	last_time = current_time

	irc_side()

	state_lock.acquire()

	irc_ready = False
	irc_socket.close()

	state_lock.release()
  • llm.py
#!/usr/bin/env python3

import lmstudio as lms
import socket
import ssl
import time
import traceback

# ---------- IRC / LLM CONFIG ----------
channel = b"#llm"

# ---------- CHAT STATE (LM Studio Chat API) ----------
chat_mode_enabled = False # toggled via !chaton / !chatoff
chat_sessions = {}        # {channel_bytes: lms.Chat}

def get_chat(ch: bytes) -> "lms.Chat":
	"""Get or create a Chat session for the channel."""
	if ch not in chat_sessions:
		chat_sessions[ch] = lms.Chat("You are a helpful assistant.")
	return chat_sessions[ch]

def reset_chat(ch: bytes | None = None):
	"""Clear chat state for a channel (or all)."""
	if ch is None:
		chat_sessions.clear()
	else:
		chat_sessions.pop(ch, None)

def strip_llm_markup(raw: str) -> str:
	if "<|message|>" in raw:
		return raw.split("<|message|>")[-1].strip()
	return raw.strip()

# ---------- MODEL ----------
model = lms.llm()

def respond_stateful(ch: bytes, user_text: str) -> str:
	chat = get_chat(ch)
	chat.add_user_message(user_text)
	result = model.respond(chat)
	raw = getattr(result, "content", str(result))
	reply = strip_llm_markup(raw)
	chat.add_assistant_response(reply)
	return reply

def respond_stateless(user_text: str) -> str:
	result = model.respond(user_text)
	raw = getattr(result, "content", str(result))
	return strip_llm_markup(raw)

def run(): # goto or labeled continue would've been nice
	global chat_mode_enabled

	s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
	s.connect("./llmbot.sock")
	while True:
		msg = s.recv(4096)
		while not msg.endswith(b"\n"):
			newmsg = s.recv(4096)
			if newmsg == b"":
				s.close()
				return
			msg = msg + newmsg

		for line in [line for line in msg.split(b"\n") if line != b""]:
			payload = line.decode("utf-8", "surrogateescape")
			lower = payload.lower()

			# ----- Commands -----
			if lower == "chaton":
				chat_mode_enabled = True
				s.sendall(b"PRIVMSG " + channel + b" :Chat mode: ON.\r\n")
				continue

			if lower == "chatoff":
				chat_mode_enabled = False
				#reset_chat(channel)  # clear channel context
				s.sendall(b"PRIVMSG " + channel + b" :Chat mode: OFF.\r\n")
				continue

			if lower == "reset":
				# Delete current channel session, then immediately re-create a fresh one.
				reset_chat(channel)
				get_chat(channel)  # ensures a new chat object exists right away
				s.sendall(b"PRIVMSG " + channel + b" :Chat context reset and re-initialized for this channel.\r\n")
				continue

			# ----- Not a command: treat as prompt -----
			try:
				if chat_mode_enabled:
					reply = respond_stateful(channel, payload)
				else:
					reply = respond_stateless(payload)
			except Exception as e:
				tb = traceback.format_exc(limit=1)
				reply = f"\x0304Unexpected error: {e} | {tb}."

			for res in reply.replace("\r", "").split("\n"):
				s.sendall(b"PRIVMSG " + channel + b" :" + res.encode("utf-8", "surrogateescape") + b"\r\n")

while True:
	run() # >_>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment