Skip to content

Instantly share code, notes, and snippets.

@un1tz3r0
Created June 8, 2026 14:17
Show Gist options
  • Select an option

  • Save un1tz3r0/3e0025e70c1c3f1657a4ff34e78757fa to your computer and use it in GitHub Desktop.

Select an option

Save un1tz3r0/3e0025e70c1c3f1657a4ff34e78757fa to your computer and use it in GitHub Desktop.
Python Ollama Scan
"""
This script reads the output file written by [this script](https://gist.github.com/un1tz3r0/8ebd1295aa3a9b85eb65ece1d226b3a8),
and then attempts to connect and query the models available on each host found by the leakix.net service port search.
By default it appends per-host results to connect-results.jsonl and rewrites a per-model summary in scan-results.jsonl, both in the current directory.
Depending on where you live, this may be illegal, and is certainly frowned upon by many.
It is my stern recommendation that under no circumstances should you or anyone else use this, for any purpose.
I take no responsibility for any consequences of disregarding this warning. This is purely informative content,
and is furnished as-is with no warranty or license of any kind. Please don't be a dick, it's just not polite.
- V.M.O.C., June 8th, 2026
"""
import asyncio
import json
import time
from datetime import datetime, timezone
from pathlib import Path
import aiofiles
import aiohttp
import click
from rich.console import Group
from rich.live import Live
from rich.progress import (
BarColumn,
MofNCompleteColumn,
Progress,
SpinnerColumn,
TextColumn,
TimeElapsedColumn,
TimeRemainingColumn,
)
from rich.table import Table
class WorkerState:
    """Mutable status record for one worker, read by the status display.

    Attributes:
        wid: Numeric worker id.
        host: Host currently being handled, or "" when idle.
        stage: Current stage label ("idle" when not working).
        started: ``time.monotonic()`` reading taken when the current host
            was begun, or ``None`` when idle.
        last_result: Summary text passed to the most recent ``finish`` call.
    """

    __slots__ = ("wid", "host", "stage", "started", "last_result")

    def __init__(self, wid: int) -> None:
        self.wid = wid
        self.last_result = ""
        self._go_idle()

    def _go_idle(self) -> None:
        """Clear the per-host fields."""
        self.host, self.stage = "", "idle"
        self.started: float | None = None

    def begin(self, host: str) -> None:
        """Mark the worker as starting on ``host`` and start its clock."""
        self.host, self.stage = host, "connect"
        self.started = time.monotonic()

    def to(self, stage: str) -> None:
        """Switch to the given stage label."""
        self.stage = stage

    def finish(self, summary: str) -> None:
        """Return to idle, remembering ``summary`` as the last result."""
        self._go_idle()
        self.last_result = summary
async def call_endpoint(
    session: aiohttp.ClientSession,
    url: str,
    timeout: aiohttp.ClientTimeout,
) -> tuple[object | None, int]:
    """GET ``url`` and return ``(decoded_json_or_None, elapsed_ms)``.

    The first element is ``None`` whenever the status is not 200, the body
    cannot be decoded as JSON, or the request fails with a client, timeout
    or OS-level error.  The elapsed time is measured in every case.
    """
    began = time.monotonic()
    body: object | None = None
    try:
        async with session.get(url, timeout=timeout) as response:
            if response.status == 200:
                try:
                    # content_type=None: decode regardless of the declared
                    # Content-Type header.
                    body = await response.json(content_type=None)
                except (
                    json.JSONDecodeError,
                    aiohttp.ContentTypeError,
                    UnicodeDecodeError,
                ):
                    body = None
    except (aiohttp.ClientError, asyncio.TimeoutError, OSError):
        body = None
    return body, int((time.monotonic() - began) * 1000)
async def post_endpoint(
    session: aiohttp.ClientSession,
    url: str,
    payload: dict,
    timeout: aiohttp.ClientTimeout,
) -> tuple[object | None, int]:
    """POST ``payload`` as JSON to ``url``; return ``(decoded_json_or_None, elapsed_ms)``.

    The first element is ``None`` whenever the status is not 200, the body
    cannot be decoded as JSON, or the request fails with a client, timeout
    or OS-level error.  The elapsed time is measured in every case.
    """
    began = time.monotonic()
    body: object | None = None
    try:
        async with session.post(url, json=payload, timeout=timeout) as response:
            if response.status == 200:
                try:
                    # content_type=None: decode regardless of the declared
                    # Content-Type header.
                    body = await response.json(content_type=None)
                except (
                    json.JSONDecodeError,
                    aiohttp.ContentTypeError,
                    UnicodeDecodeError,
                ):
                    body = None
    except (aiohttp.ClientError, asyncio.TimeoutError, OSError):
        body = None
    return body, int((time.monotonic() - began) * 1000)
def extract_model_names(result: object | None) -> list[str]:
if not isinstance(result, dict):
return []
models = result.get("models")
if not isinstance(models, list):
return []
names: list[str] = []
for m in models:
if not isinstance(m, dict):
continue
name = m.get("name") or m.get("model")
if isinstance(name, str) and name:
names.append(name)
return names
def _record_sightings(
    seen_models: dict,
    names: list[str],
    host: str,
    slot: int,
    needs_show: list[str],
) -> None:
    """Add ``host`` to set number ``slot`` of each named model's registry entry.

    Entries are created on first sight as ``[list_hosts, ps_hosts, show_info]``.
    Names whose ``show_info`` is still ``None`` are appended to ``needs_show``
    (at most once each).  The caller is expected to hold the registry lock.
    """
    for name in names:
        entry = seen_models.setdefault(name, [set(), set(), None])
        entry[slot].add(host)
        if entry[2] is None and name not in needs_show:
            needs_show.append(name)


async def probe_host(
    host: str,
    port: int,
    connect_timeout: float,
    read_timeout: float,
    state: WorkerState,
    seen_models: dict,
    models_lock: asyncio.Lock,
) -> dict:
    """Query one host's ``/api/tags`` and ``/api/ps`` and update the model registry.

    Args:
        host: Host name or IP literal.  A bare IPv6 literal is wrapped in
            brackets when the URL is built.
        port: TCP port to connect on.
        connect_timeout: Per-request socket connect timeout, in seconds.
        read_timeout: Per-request socket read timeout, in seconds.
        state: Status record; moved through the "list", "ps" and (when
            needed) "show" stages.
        seen_models: Shared registry mapping a model name to
            ``[hosts_listing_it, hosts_running_it, show_info_or_None]``.
        models_lock: Lock guarding ``seen_models``.

    Returns:
        A dict with the keys ``host``, ``list_result``, ``list_time``,
        ``ps_result``, ``ps_time`` and ``timestamp`` (UTC, ISO 8601).
        Result and time values stay ``None`` for any step that was not
        reached.
    """
    # An IPv6 literal must be bracketed inside a URL, otherwise its colons
    # are read as the port separator.
    authority = f"[{host}]" if ":" in host and not host.startswith("[") else host
    base = f"http://{authority}:{port}"
    timeout = aiohttp.ClientTimeout(
        total=None,
        sock_connect=connect_timeout,
        sock_read=read_timeout,
    )
    list_result: object | None = None
    list_time: int | None = None
    ps_result: object | None = None
    ps_time: int | None = None
    connector = aiohttp.TCPConnector(limit=2, force_close=True)
    try:
        async with aiohttp.ClientSession(connector=connector) as session:
            state.to("list")
            list_result, list_time = await call_endpoint(
                session, f"{base}/api/tags", timeout
            )
            state.to("ps")
            ps_result, ps_time = await call_endpoint(
                session, f"{base}/api/ps", timeout
            )
            list_names = extract_model_names(list_result)
            ps_names = extract_model_names(ps_result)
            needs_show: list[str] = []
            if list_names or ps_names:
                async with models_lock:
                    _record_sightings(seen_models, list_names, host, 0, needs_show)
                    _record_sightings(seen_models, ps_names, host, 1, needs_show)
            if needs_show:
                state.to("show")
                for name in needs_show:
                    info, _ = await post_endpoint(
                        session, f"{base}/api/show", {"model": name}, timeout
                    )
                    if not isinstance(info, dict):
                        continue
                    async with models_lock:
                        entry = seen_models.get(name)
                        # Keep the first description stored; another worker
                        # may have filled it in while this request ran.
                        if entry is not None and entry[2] is None:
                            entry[2] = info
    except Exception:
        # Deliberate best-effort: the response bodies are untrusted, and any
        # unexpected failure must not discard results already collected for
        # this host.  Whatever was gathered so far is still returned below.
        pass
    return {
        "host": host,
        "list_result": list_result,
        "list_time": list_time,
        "ps_result": ps_result,
        "ps_time": ps_time,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
async def worker(
    state: WorkerState,
    queue: "asyncio.Queue[str | None]",
    write_q: "asyncio.Queue[dict | None]",
    counters: dict,
    port: int,
    connect_timeout: float,
    read_timeout: float,
    seen_models: dict,
    models_lock: asyncio.Lock,
) -> None:
    """Consume hosts from ``queue`` until a ``None`` sentinel is received.

    Each host is probed, its result record is put on ``write_q``, and the
    ``ok``/``fail``/``done`` entries of ``counters`` are updated.  A host
    counts as "ok" when either the list or the ps result is not ``None``.
    If probing raises, an all-``None`` record is written for that host.
    """
    while (host := await queue.get()) is not None:
        state.begin(host)
        try:
            result = await probe_host(
                host,
                port,
                connect_timeout,
                read_timeout,
                state,
                seen_models,
                models_lock,
            )
            answered = (
                result["list_result"] is not None
                or result["ps_result"] is not None
            )
            outcome = "ok" if answered else "fail"
            counters[outcome] += 1
            state.finish(f"{outcome} {host}")
        except Exception as e:
            counters["fail"] += 1
            state.finish(f"err {host}: {type(e).__name__}")
            result = {
                "host": host,
                "list_result": None,
                "list_time": None,
                "ps_result": None,
                "ps_time": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }
        await write_q.put(result)
        counters["done"] += 1
        queue.task_done()
    # Acknowledge the sentinel itself before exiting.
    queue.task_done()
async def writer_task(
    write_q: "asyncio.Queue[dict | None]", path: Path
) -> None:
    """Append every record taken from ``write_q`` to ``path`` as one JSON line.

    Runs until a ``None`` sentinel is received.  The file is flushed after
    each record so that results already written survive an interrupted run.

    Args:
        write_q: Queue of result dicts, terminated by ``None``.
        path: JSONL file to append to (created if missing).
    """
    # Records are serialised with ensure_ascii=False, so the file must be
    # opened as UTF-8 explicitly; relying on the locale default can raise
    # UnicodeEncodeError and kill the writer on non-UTF-8 systems.
    async with aiofiles.open(path, "a", encoding="utf-8") as f:
        while True:
            item = await write_q.get()
            if item is None:
                write_q.task_done()
                return
            await f.write(json.dumps(item, ensure_ascii=False) + "\n")
            await f.flush()
            write_q.task_done()
async def dump_models(
    seen_models: dict,
    models_lock: asyncio.Lock,
    path: Path,
    backup: Path,
) -> None:
    """Write the model registry to ``path`` as JSON lines.

    Each line has the form ``[name, [list_hosts, ps_hosts, show_info]]``
    with both host collections sorted.  Nothing is written, and no file is
    touched, when the registry is empty.

    The new content is first written to ``<path>.tmp`` and then moved over
    ``path`` in a single replace, so a failed or interrupted write never
    leaves a truncated ``path`` behind.

    Args:
        seen_models: Registry mapping a model name to
            ``[list_hosts, ps_hosts, show_info_or_None]``.
        models_lock: Lock guarding ``seen_models``; held only while the
            snapshot is taken, not during file I/O.
        path: Destination file, overwritten on each call.
        backup: Backup location; a file left there is removed once the new
            dump is in place.
    """
    async with models_lock:
        snapshot = [
            (name, sorted(entry[0]), sorted(entry[1]), entry[2])
            for name, entry in seen_models.items()
        ]
    if not snapshot:
        return
    lines = [
        json.dumps(
            [name, [list_servers, ps_servers, info]],
            ensure_ascii=False,
        )
        for name, list_servers, ps_servers, info in snapshot
    ]
    staging = Path(str(path) + ".tmp")
    # ensure_ascii=False above means non-ASCII text may be present, so the
    # encoding is fixed rather than left to the locale default.
    async with aiofiles.open(staging, "w", encoding="utf-8") as f:
        await f.write("\n".join(lines) + "\n")
        await f.flush()
    # Path.replace overwrites an existing destination, so the previous dump
    # stays readable until the new one is complete.
    staging.replace(path)
    backup.unlink(missing_ok=True)
async def models_dumper(
    seen_models: dict,
    models_lock: asyncio.Lock,
    path: Path,
    interval: float,
    stop_event: asyncio.Event,
) -> None:
    """Dump the model registry every ``interval`` seconds until stopped.

    The backup file name is ``path`` with ``.bak`` appended.  Once
    ``stop_event`` is set, the periodic loop ends and one final dump is
    performed before returning.
    """
    backup = Path(f"{path}.bak")
    deadline = time.monotonic() + interval
    while not stop_event.is_set():
        remaining = max(0.0, deadline - time.monotonic())
        try:
            # Sleep until the next dump is due, waking early on stop.
            await asyncio.wait_for(stop_event.wait(), timeout=remaining)
        except asyncio.TimeoutError:
            pass
        if not stop_event.is_set():
            await dump_models(seen_models, models_lock, path, backup)
            deadline = time.monotonic() + interval
    await dump_models(seen_models, models_lock, path, backup)
def render(
    progress: Progress,
    workers: list[WorkerState],
    counters: dict,
) -> Group:
    """Build the status display: progress bar, totals line and worker table.

    Args:
        progress: Progress bar renderable shown at the top.
        workers: One status record per worker; each becomes a table row.
        counters: Dict providing the ``ok``, ``fail``, ``done`` and
            ``total`` values for the summary line.

    Returns:
        A ``Group`` of the progress bar, the summary line and the table.
    """
    # Imported here so this function carries its own dependency.
    from rich.markup import escape

    table = Table(expand=True, show_edge=False, pad_edge=False)
    table.add_column("W", justify="right", style="cyan", no_wrap=True, width=4)
    table.add_column("Host", no_wrap=True, width=22)
    table.add_column("Stage", no_wrap=True, width=8)
    table.add_column("Elapsed", justify="right", no_wrap=True, width=8)
    table.add_column("Last", overflow="ellipsis", no_wrap=True)
    now = time.monotonic()
    stage_style = {
        "idle": "dim",
        "connect": "yellow",
        "list": "blue",
        "ps": "magenta",
        "show": "cyan",
    }
    for w in workers:
        # Compare against None explicitly: a clock reading of 0.0 is falsy
        # but is still a valid start time.
        elapsed = f"{(now - w.started):.2f}s" if w.started is not None else "-"
        color = stage_style.get(w.stage, "")
        stage_cell = f"[{color}]{w.stage}[/]" if color else w.stage
        last_color = ""
        if w.last_result.startswith("ok"):
            last_color = "green"
        elif w.last_result.startswith(("fail", "err")):
            last_color = "red"
        # Host names and result text come from input data; escape them so a
        # stray "[" cannot be parsed as console markup.
        last_text = escape(w.last_result)
        last_cell = f"[{last_color}]{last_text}[/]" if last_color else last_text
        host_cell = escape(w.host) if w.host else "-"
        table.add_row(str(w.wid), host_cell, stage_cell, elapsed, last_cell)
    summary = (
        f"[green]ok:[/] {counters['ok']} "
        f"[red]fail:[/] {counters['fail']} "
        f"[cyan]done:[/] {counters['done']}/{counters['total']}"
    )
    return Group(progress, summary, table)
async def refresher(
    live: Live, interval: float, render_fn
) -> None:
    """Redraw ``live`` with ``render_fn()`` every ``interval`` seconds.

    Loops until the task is cancelled; the cancellation is absorbed so the
    coroutine then returns normally.
    """
    while True:
        try:
            live.update(render_fn(), refresh=True)
            await asyncio.sleep(interval)
        except asyncio.CancelledError:
            return
async def amain(
    input_path: Path,
    output_path: Path,
    models_output_path: Path,
    concurrency: int,
    connect_timeout: float,
    read_timeout: float,
    port: int,
    update_interval: float,
    dump_interval: float,
) -> None:
    """Load hosts from ``input_path``, probe them concurrently and record results.

    Args:
        input_path: JSONL file; the ``"ip"`` string of each JSON object is
            used as a host.  Blank lines, ``#`` lines, non-JSON lines,
            non-object values and duplicate hosts are skipped.
        output_path: JSONL file that per-host records are appended to.
        models_output_path: File that receives the per-model aggregation.
        concurrency: Requested number of workers; clamped to the range
            ``1..len(hosts)``.
        connect_timeout: Per-request socket connect timeout, in seconds.
        read_timeout: Per-request socket read timeout, in seconds.
        port: TCP port to connect on.
        update_interval: Seconds between status display refreshes.
        dump_interval: Seconds between rewrites of ``models_output_path``.

    Raises:
        Whatever the writer or model-dumper task raised, re-raised after
        the remaining background tasks have been shut down.
    """
    hosts: list[str] = []
    seen: set[str] = set()
    for line in input_path.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        try:
            obj = json.loads(line)
        except json.JSONDecodeError:
            click.echo(f"skipping non-JSON line: {line[:80]}", err=True)
            continue
        ip = obj.get("ip") if isinstance(obj, dict) else None
        # Require a string: an unhashable value such as a list would
        # otherwise raise TypeError in the set lookup below.
        if not isinstance(ip, str) or not ip or ip in seen:
            continue
        seen.add(ip)
        hosts.append(ip)
    if not hosts:
        click.echo("no hosts to scan", err=True)
        return
    # At least one worker is needed to make progress, and more workers than
    # hosts would only sit idle.
    concurrency = max(1, min(concurrency, len(hosts)))
    queue: asyncio.Queue[str | None] = asyncio.Queue()
    for h in hosts:
        queue.put_nowait(h)
    # One sentinel per worker so every worker sees exactly one.
    for _ in range(concurrency):
        queue.put_nowait(None)
    write_q: asyncio.Queue[dict | None] = asyncio.Queue()
    counters = {"ok": 0, "fail": 0, "done": 0, "total": len(hosts)}
    workers = [WorkerState(i) for i in range(concurrency)]
    seen_models: dict = {}
    models_lock = asyncio.Lock()
    stop_event = asyncio.Event()
    progress = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TextColumn("•"),
        TimeElapsedColumn(),
        TextColumn("eta"),
        TimeRemainingColumn(),
        expand=True,
    )
    task_id = progress.add_task("scanning", total=len(hosts))

    def render_fn() -> Group:
        progress.update(task_id, completed=counters["done"])
        return render(progress, workers, counters)

    with Live(render_fn(), auto_refresh=False) as live:
        ref = asyncio.create_task(refresher(live, update_interval, render_fn))
        writer = asyncio.create_task(writer_task(write_q, output_path))
        dumper = asyncio.create_task(
            models_dumper(
                seen_models, models_lock, models_output_path,
                dump_interval, stop_event,
            )
        )
        worker_tasks = [
            asyncio.create_task(
                worker(
                    state,
                    queue,
                    write_q,
                    counters,
                    port,
                    connect_timeout,
                    read_timeout,
                    seen_models,
                    models_lock,
                )
            )
            for state in workers
        ]
        try:
            await asyncio.gather(*worker_tasks)
        finally:
            # After a failure some workers may still be running; stop them
            # so nothing is queued behind the writer's sentinel.  Cancelling
            # an already finished task is a no-op.
            for task in worker_tasks:
                task.cancel()
            await asyncio.gather(*worker_tasks, return_exceptions=True)
            stop_event.set()
            await write_q.put(None)
            # Collect both background tasks before reporting any failure, so
            # an error in one cannot leave the other (or the refresher)
            # running.
            outcomes = await asyncio.gather(
                dumper, writer, return_exceptions=True
            )
            ref.cancel()
            try:
                await ref
            except asyncio.CancelledError:
                pass
            live.update(render_fn(), refresh=True)
        for outcome in outcomes:
            if isinstance(outcome, BaseException):
                raise outcome
    click.echo(
        f"\nDone. ok={counters['ok']} fail={counters['fail']} "
        f"total={counters['total']} output={output_path} "
        f"models={len(seen_models)} -> {models_output_path}"
    )
# Numeric options use range-checked types so that values the run cannot work
# with (no workers, non-positive timeouts or intervals, an out-of-range port)
# are rejected with a usage error before anything starts.
@click.command()
@click.option(
    "--input",
    "-i",
    "input_file",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    default=Path("results-port-11434.jsonl"),
    show_default=True,
    help="Input JSONL file (from 1_search.py); the 'ip' field is extracted from each line.",
)
@click.option(
    "--output",
    "-o",
    "output_file",
    type=click.Path(dir_okay=False, path_type=Path),
    default=Path("connect-results.jsonl"),
    show_default=True,
    help="Output JSONL file (appended).",
)
@click.option(
    "--concurrency",
    "-c",
    type=click.IntRange(min=1),
    default=50,
    show_default=True,
    help="Number of concurrent worker tasks.",
)
@click.option(
    "--connect-timeout",
    type=click.FloatRange(min=0, min_open=True),
    default=5.0,
    show_default=True,
    help="Per-request TCP connect timeout in seconds.",
)
@click.option(
    "--read-timeout",
    type=click.FloatRange(min=0, min_open=True),
    default=10.0,
    show_default=True,
    help="Per-request socket read timeout in seconds.",
)
@click.option(
    "--port",
    "-p",
    type=click.IntRange(1, 65535),
    default=11434,
    show_default=True,
    help="TCP port to connect on.",
)
@click.option(
    "--update-interval",
    type=click.FloatRange(min=0, min_open=True),
    default=0.5,
    show_default=True,
    help="Status display refresh interval in seconds.",
)
@click.option(
    "--models-output",
    "models_output_file",
    type=click.Path(dir_okay=False, path_type=Path),
    default=Path("scan-results.jsonl"),
    show_default=True,
    help="Per-model aggregation output (overwritten each dump).",
)
@click.option(
    "--dump-interval",
    type=click.FloatRange(min=0, min_open=True),
    default=5.0,
    show_default=True,
    help="How often (seconds) to rewrite the models-output file.",
)
def main(
    input_file: Path,
    output_file: Path,
    concurrency: int,
    connect_timeout: float,
    read_timeout: float,
    port: int,
    update_interval: float,
    models_output_file: Path,
    dump_interval: float,
) -> None:
    """Query each host in the input file and record the models it reports."""
    asyncio.run(
        amain(
            input_file,
            output_file,
            models_output_file,
            concurrency,
            connect_timeout,
            read_timeout,
            port,
            update_interval,
            dump_interval,
        )
    )


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment