Skip to content

Instantly share code, notes, and snippets.

@ActiveTK
Created October 12, 2025 13:52
Show Gist options
  • Select an option

  • Save ActiveTK/db8583a30fbaa8ed1e97ff883644951d to your computer and use it in GitHub Desktop.

Select an option

Save ActiveTK/db8583a30fbaa8ed1e97ff883644951d to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import json
import os
import re
import subprocess
import sys
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
RESULTS_JSON = os.path.join(SCRIPT_DIR, "results.json")
INSTANCE_JSON = os.path.join(SCRIPT_DIR, "instance.json")
DEFAULT_IMAGE = "vastai/base-image:@vastai-automatic-tag"
DEFAULT_DISK_GB = 8
DEFAULT_ENV = ""
SSH_FLAG_DEFAULT = True
USE_DIRECT_DEFAULT = True
MAX_WORKERS = 75
INIT_TIMEOUT = 1800.0
POLL_INTERVAL = 3.0
DESTROY_TIMEOUT = 120.0
UI_TICK = 0.5
TABLE_MONITOR_INTERVAL = 1.0
TABLE_MONITOR_TIMEOUT = 30
DEBUG = False
ST_INIT = "init"
ST_STOP = "stopped"
ST_RESTART = "restart"
ST_DONE = "done"
ST_DEST = "destroy"
CANCEL_EVENT = threading.Event()
CANCEL_EVENT_2 = threading.Event()
INIT_COMMANDS: List[str] = []
def _newline_after_status() -> None:
active = getattr(print_status_line, "_active", False)
if active:
sys.stderr.write("\n")
sys.stderr.flush()
print_status_line._active = False
print_status_line._last_len = 0
def log(msg: str) -> None:
_newline_after_status()
print(msg, file=sys.stderr, flush=True)
def dlog(msg: str) -> None:
if DEBUG:
_newline_after_status()
print(msg, file=sys.stderr, flush=True)
def run_cmd(cmd: List[str], timeout: Optional[float] = None, check: bool = False) -> subprocess.CompletedProcess:
dlog(f"[CMD] {' '.join(cmd)}")
try:
cp = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, check=check)
if DEBUG:
if cp.stdout:
dlog(f"[STDOUT]\n{cp.stdout.strip()}")
if cp.stderr:
dlog(f"[STDERR]\n{cp.stderr.strip()}")
return cp
except subprocess.TimeoutExpired:
dlog("[ERROR] command timeout")
raise
except subprocess.CalledProcessError as e:
if DEBUG:
dlog(f"[CALLED-PROC-ERROR] rc={e.returncode}")
if e.stdout:
dlog(f"[STDOUT]\n{e.stdout.strip()}")
if e.stderr:
dlog(f"[STDERR]\n{e.stderr.strip()}")
raise
def _extract_instance_id(obj: dict) -> Optional[int]:
for k in ("id", "instance_id", "contract", "new_contract"):
if k in obj:
try:
return int(obj[k])
except Exception:
continue
return None
def list_all_instance_ids() -> List[int]:
try:
cp = run_cmd(["vastai", "show", "instances", "--raw"], timeout=30)
data = (cp.stdout or "").strip()
if not data:
return []
obj = json.loads(data)
if isinstance(obj, dict):
obj = [obj]
if not isinstance(obj, list):
return []
ids: List[int] = []
for it in obj:
if isinstance(it, dict):
iid = _extract_instance_id(it)
if iid is not None:
ids.append(iid)
return sorted(set(ids))
except Exception as e:
log(f"[ERR ] failed to list instances: {e}")
return []
def show_instance_raw(instance_id: int) -> Optional[dict]:
try:
cp = run_cmd(["vastai", "show", "instance", str(instance_id), "--raw"], timeout=20)
data = (cp.stdout or "").strip()
if not data:
return None
obj = json.loads(data)
if isinstance(obj, list) and obj:
return obj[0]
if isinstance(obj, dict):
return obj
return None
except Exception:
return None
def maybe_start_instance(instance_id: int) -> None:
ins = show_instance_raw(instance_id)
if not ins:
return
st = (ins.get("status") or ins.get("cur_state") or "").lower()
intended = (ins.get("intended_status") or "").lower()
if "stop" in st or intended == "stopped":
try:
run_cmd(["vastai", "start", "instance", str(instance_id)], timeout=20)
except subprocess.CalledProcessError:
dlog(f"[WARN] start instance {instance_id} failed")
def start_instance(instance_id: int) -> None:
try:
run_cmd(["vastai", "start", "instance", str(instance_id)], timeout=20)
except subprocess.CalledProcessError:
dlog(f"[WARN] start failed id={instance_id}")
def stop_instance(instance_id: int) -> None:
try:
run_cmd(["vastai", "stop", "instance", str(instance_id)], timeout=30)
except subprocess.CalledProcessError:
dlog(f"[WARN] stop failed id={instance_id}")
def destroy_instance(instance_id: int, must_confirm: bool = True) -> bool:
try:
run_cmd(["vastai", "destroy", "instance", str(instance_id)], timeout=30)
except subprocess.CalledProcessError:
dlog(f"[ERROR] destroy failed id={instance_id}")
if not must_confirm:
return True
deadline = time.time() + DESTROY_TIMEOUT
while time.time() < deadline:
ins = show_instance_raw(instance_id)
if ins is None:
return True
time.sleep(POLL_INTERVAL)
return False
def parse_instances_table(timeout: int = 30) -> List[dict]:
try:
cp = subprocess.run(["vastai", "show", "instances"], capture_output=True, text=True, timeout=timeout)
table = (cp.stdout or "").strip()
except Exception as e:
dlog(f"[TABLE] fetch error: {e}")
return []
lines = [ln.rstrip("\n") for ln in table.splitlines() if ln.strip()]
if not lines:
return []
header = lines[0].strip()
headers = re.split(r"\s{2,}", header)
idx = {name: i for i, name in enumerate(headers)}
rows: List[dict] = []
for line in lines[1:]:
if line.lower().startswith("totals:"):
continue
cols = re.split(r"\s{2,}", line.strip())
if len(cols) < len(headers):
continue
row = {h: cols[i] if i < len(cols) else "" for h, i in idx.items()}
rows.append(row)
return rows
def cmd_cleanup() -> None:
rows = parse_instances_table(timeout=30)
if not rows:
print("No instances found (table empty).", file=sys.stderr)
return
targets: List[int] = []
for r in rows:
iid = (r.get("ID", "") or "").strip()
st = (r.get("Status", "") or "").strip().lower()
st = st.rstrip("*").strip()
if iid and st != "running":
try:
targets.append(int(iid))
except Exception:
continue
if not targets:
print("All instances are Running — nothing to clean up.", file=sys.stderr)
return
total = len(targets)
print(f"Destroying {total} non-running instances (max_workers={MAX_WORKERS}) …")
ok = 0
ng = 0
start_time = time.time()
def status_line():
elapsed = int(time.time() - start_time)
m, s = divmod(elapsed, 60)
el = f"{m}m{s}s" if m else f"{s}s"
line = f"elapsed: {el}, destroyed={ok}, failed={ng}, pending={total - ok - ng}"
print("\r" + line + " " * 8, end="", file=sys.stderr, flush=True)
status_line()
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
fut2id = {ex.submit(destroy_instance, iid, True): iid for iid in targets}
pending = set(fut2id.keys())
while pending:
done_now = [f for f in list(pending) if f.done()]
if not done_now:
time.sleep(UI_TICK)
status_line()
continue
for f in done_now:
pending.remove(f)
iid = fut2id[f]
try:
res = f.result()
except Exception as e:
dlog(f"[CLEANUP-EXC] id={iid} {e}")
res = False
if res:
ok += 1
print(f"\n -> destroyed id={iid}")
else:
ng += 1
print(f"\n -> FAILED to destroy id={iid}", file=sys.stderr)
status_line()
print("\nSummary: destroyed={}, failed={}, total={}".format(ok, ng, total))
class TableMonitor:
def __init__(self, interval: float = TABLE_MONITOR_INTERVAL, timeout: int = TABLE_MONITOR_TIMEOUT):
self.interval = max(0.2, float(interval))
self.timeout = int(timeout)
self._lock = threading.Lock()
self._cond = threading.Condition(self._lock)
self._by_id: Dict[str, dict] = {}
self._streak: Dict[str, int] = {}
self._stop = False
self._thread: Optional[threading.Thread] = None
def start(self) -> None:
if self._thread is not None:
return
self._thread = threading.Thread(target=self._run, name="vast-table-monitor", daemon=True)
self._thread.start()
def stop(self) -> None:
with self._lock:
self._stop = True
self._cond.notify_all()
if self._thread:
self._thread.join(timeout=2.0)
def _run(self) -> None:
while True:
with self._lock:
if self._stop:
return
rows = self._fetch_and_parse()
with self._lock:
by_id = {}
for r in rows:
iid = (r.get("ID", "") or "").strip()
if iid:
by_id[iid] = r
for iid, row in by_id.items():
st = (row.get("Status", "") or "").strip().lower()
if st == "running":
self._streak[iid] = self._streak.get(iid, 0) + 1
else:
self._streak[iid] = 0
for iid in set(self._streak.keys()) - set(by_id.keys()):
self._streak[iid] = 0
self._by_id = by_id
self._cond.notify_all()
time.sleep(self.interval)
def _fetch_and_parse(self) -> List[dict]:
try:
cp = subprocess.run(["vastai", "show", "instances"], capture_output=True, text=True, timeout=self.timeout)
table = (cp.stdout or "").strip()
except Exception as e:
dlog(f"[TABLE] fetch error: {e}")
return []
lines = [ln.rstrip("\n") for ln in table.splitlines() if ln.strip()]
if not lines:
return []
header = lines[0].strip()
headers = re.split(r"\s{2,}", header)
idx = {name: i for i, name in enumerate(headers)}
rows: List[dict] = []
for line in lines[1:]:
if line.lower().startswith("totals:"):
continue
cols = re.split(r"\s{2,}", line.strip())
if len(cols) < len(headers):
continue
row = {h: cols[i] if i < len(cols) else "" for h, i in idx.items()}
rows.append(row)
return rows
def wait_for_running(self, instance_id: int, consecutive: int, timeout: float, cancel_event: threading.Event) -> bool:
end_by = time.time() + timeout
iid = str(instance_id)
with self._lock:
while time.time() < end_by:
if cancel_event.is_set():
return False
if self._streak.get(iid, 0) >= consecutive:
return True
remaining = end_by - time.time()
if remaining <= 0:
break
self._cond.wait(timeout=min(remaining, max(0.5, self.interval * 1.5)))
return False
_TABLE_MONITOR: Optional[TableMonitor] = None
def _build_onstart_from_init_commands() -> str:
pieces = ["set -e"]
pieces.extend(INIT_COMMANDS)
return "; ".join(pieces)
def _build_init_commands(screen_specs: List[str], raw_onstart: Optional[str]) -> None:
INIT_COMMANDS.clear()
base = [
"dpkg --configure -a || true",
"apt-get -f -y install || true",
"apt-get update && apt-get install -y screen curl",
"mkdir -p /work",
]
INIT_COMMANDS.extend(base)
if raw_onstart:
INIT_COMMANDS.append(raw_onstart)
for spec in screen_specs:
if ":" in spec:
name, cmd = spec.split(":", 1)
else:
name, cmd = "main", spec
name = re.sub(r"[^A-Za-z0-9_-]", "_", name.strip() or "main")
safe = cmd.strip()
if not safe:
continue
INIT_COMMANDS.append(f"screen -S {name} -dm bash -lc '{safe} >>/work/{name}.log 2>&1'")
def create_instance(offer_id: int, args) -> Optional[int]:
cmd = [
"vastai",
"create",
"instance",
str(offer_id),
"--image",
args.image,
"--disk",
str(args.disk),
"--onstart-cmd",
_build_onstart_from_init_commands(),
"--raw",
]
if args.env:
cmd.extend(["--env", args.env])
if args.ssh:
cmd.append("--ssh")
if args.direct:
cmd.append("--direct")
try:
cp = run_cmd(cmd, timeout=60)
try:
resp = json.loads(cp.stdout or "{}")
except json.JSONDecodeError:
resp = {}
new_id = None
if isinstance(resp, dict) and "new_contract" in resp:
try:
new_id = int(resp["new_contract"])
except Exception:
pass
if new_id is None:
log(f"[ERR ] create failed to return new_contract (offer={offer_id})")
return None
return new_id
except subprocess.TimeoutExpired:
log(f"[ERR ] create timeout (offer={offer_id})")
return None
except subprocess.CalledProcessError:
log(f"[ERR ] create failed (offer={offer_id})")
return None
def worker_flow_phase1(offer: dict, report: Dict[str, str], args, consecutive_ok: int = 2) -> Optional[int]:
if CANCEL_EVENT.is_set():
return None
offer_id = int(offer["id"])
report_key = None
log(f"[TRY] offer={offer_id}")
inst_id = create_instance(offer_id, args)
if inst_id is None:
log(f"[NG ] offer={offer_id} create failed")
return None
report_key = str(inst_id)
report[report_key] = ST_INIT
if CANCEL_EVENT.is_set():
return None
maybe_start_instance(inst_id)
ok = False
if _TABLE_MONITOR is not None:
ok = _TABLE_MONITOR.wait_for_running(inst_id, consecutive=consecutive_ok, timeout=INIT_TIMEOUT, cancel_event=CANCEL_EVENT)
else:
tmp_mon = TableMonitor(interval=TABLE_MONITOR_INTERVAL, timeout=TABLE_MONITOR_TIMEOUT)
tmp_mon.start()
try:
ok = tmp_mon.wait_for_running(inst_id, consecutive=consecutive_ok, timeout=INIT_TIMEOUT, cancel_event=CANCEL_EVENT)
finally:
tmp_mon.stop()
if CANCEL_EVENT.is_set():
return None
if ok:
log(f"[OK ] offer={offer_id} inst={inst_id} (Running x{consecutive_ok}) -> stop")
stop_instance(inst_id)
report[report_key] = ST_STOP
return inst_id
report[report_key] = ST_DEST
log(f"[TIMEOUT] offer={offer_id} inst={inst_id} not Running after {int(INIT_TIMEOUT)}s -> destroy")
destroy_instance(inst_id, must_confirm=True)
return None
def human_elapsed(sec: float) -> str:
m = int(sec // 60)
s = int(sec % 60)
return f"{m}m{s}s" if m else f"{s}s"
def print_status_line(start_time: float, report: Dict[str, str]) -> None:
now = time.time()
elapsed = human_elapsed(now - start_time)
vals = list(report.values())
n_init = vals.count(ST_INIT)
n_stop = vals.count(ST_STOP)
n_restart = vals.count(ST_RESTART)
n_done = vals.count(ST_DONE)
n_destroy = vals.count(ST_DEST)
loaded = n_done
line = (
f"elapsed: {elapsed}, loaded={loaded}, destroy={n_destroy}; "
f"[init]={n_init}, [stopped]={n_stop}, [restart]={n_restart}, [done]={n_done}"
)
last_len = int(getattr(print_status_line, "_last_len", 0))
pad = max(0, last_len - len(line)) + 2
sys.stderr.write("\r" + line + (" " * pad))
sys.stderr.flush()
print_status_line._active = True
print_status_line._last_len = len(line)
def _destroy_all_in_stage(report: Dict[str, str], stage: str) -> None:
ids = [int(iid) for iid, st in list(report.items()) if st == stage]
if not ids:
log(f"[CLEANUP] no instances in stage '{stage}' to destroy")
return
log(f"[CLEANUP] destroying {len(ids)} instances in stage '{stage}' …")
ok = 0
ng = 0
start_time = time.time()
def status_line():
elapsed = int(time.time() - start_time)
m, s = divmod(elapsed, 60)
el = f"{m}m{s}s" if m else f"{s}s"
line = f"elapsed: {el}, destroyed={ok}, failed={ng}, pending={len(ids) - ok - ng}"
print("\r" + line + " " * 8, end="", file=sys.stderr, flush=True)
status_line()
with ThreadPoolExecutor(max_workers=min(MAX_WORKERS, 64)) as ex:
fut2id = {ex.submit(destroy_instance, iid, True): iid for iid in ids}
pending = set(fut2id.keys())
while pending:
done_now = [f for f in list(pending) if f.done()]
if not done_now:
time.sleep(UI_TICK)
status_line()
continue
for f in done_now:
pending.remove(f)
iid = fut2id[f]
try:
res = f.result()
except Exception as e:
dlog(f"[CLEANUP-EXC] id={iid} {e}")
res = False
if res:
ok += 1
report[str(iid)] = ST_DEST
print(f"\n -> destroyed id={iid}")
else:
ng += 1
print(f"\n -> FAILED to destroy id={iid}", file=sys.stderr)
status_line()
print()
def _restart_and_wait(ids: List[int], report: Dict[str, str], consecutive_ok: int = 2) -> List[int]:
seen = set()
unique_ids: List[int] = []
for x in ids:
if x not in seen:
seen.add(x)
unique_ids.append(x)
for iid in unique_ids:
try:
start_instance(iid)
report[str(iid)] = ST_RESTART
except Exception:
dlog(f"[RESTART] start failed id={iid}")
successes: List[int] = []
logged_ok: set = set()
total = len(unique_ids)
try:
with ThreadPoolExecutor(max_workers=min(MAX_WORKERS, 64)) as ex:
fut2id = {
ex.submit(_TABLE_MONITOR.wait_for_running, iid, consecutive_ok, INIT_TIMEOUT, CANCEL_EVENT_2): iid
for iid in unique_ids
}
pending = set(fut2id.keys())
while pending:
time.sleep(UI_TICK)
if CANCEL_EVENT_2.is_set():
log("[CTRL-C] restart cancel received — destroying pending instances…")
to_destroy = [fut2id[f] for f in pending]
for iid in to_destroy:
destroy_instance(iid, must_confirm=True)
report[str(iid)] = ST_DEST
pending.clear()
break
done_now = [f for f in list(pending) if f.done()]
for f in done_now:
pending.remove(f)
iid = fut2id[f]
ok = False
try:
ok = f.result()
except Exception as e:
dlog(f"[RESTART-WAIT] id={iid} exc={e}")
if ok and iid not in logged_ok:
report[str(iid)] = ST_DONE
successes.append(iid)
logged_ok.add(iid)
log(f"[OK ] restart inst={iid} (Running x{consecutive_ok})")
if len(successes) >= total:
log(f"[WRAPUP] all {total} restarted and Running — proceeding to save …")
break
except KeyboardInterrupt:
CANCEL_EVENT_2.set()
log("[CTRL-C] restart cancel (outer) — destroying pending and wrapping up …")
to_destroy = [int(iid) for iid, st in report.items() if st == ST_RESTART]
for iid in to_destroy:
destroy_instance(iid, must_confirm=True)
report[str(iid)] = ST_DEST
return successes
def cmd_start(n: int, args) -> None:
global _TABLE_MONITOR
if not os.path.exists(RESULTS_JSON):
print("results.json not found. Run search.py first.", file=sys.stderr)
sys.exit(1)
with open(RESULTS_JSON, "r", encoding="utf-8") as f:
offers = json.load(f)
if not isinstance(offers, list) or len(offers) == 0:
print("results.json is empty.", file=sys.stderr)
sys.exit(1)
targets = offers[:n]
total_vcpu = sum(float(o.get("vcpus") or 0.0) for o in targets)
total_ram = sum(float(o.get("ram_gb") or 0.0) for o in targets)
total_dph = sum(float(o.get("dph_total") or 0.0) for o in targets)
print(f"Targets: {len(targets)} offers")
print(f"Totals (reference): vCPUs={int(total_vcpu)}, RAM={int(total_ram)} GB, $/hr={total_dph:.3f}")
ans = input("Proceed? [y/N]: ").strip().lower()
if ans != "y":
print("Cancelled.")
return
_TABLE_MONITOR = TableMonitor(interval=TABLE_MONITOR_INTERVAL, timeout=TABLE_MONITOR_TIMEOUT)
_TABLE_MONITOR.start()
report: Dict[str, str] = {}
start_time = time.time()
stopped_ids: List[int] = []
stopped_set: set = set()
final_successes: List[Dict[str, Optional[int]]] = []
try:
try:
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
futs = [ex.submit(worker_flow_phase1, o, report, args) for o in targets]
pending = set(futs)
while pending:
try:
print_status_line(start_time, report)
time.sleep(UI_TICK)
done_now = [f for f in list(pending) if f.done()]
for f in done_now:
pending.remove(f)
iid = f.result()
if iid and iid not in stopped_set:
stopped_set.add(iid)
stopped_ids.append(iid)
except KeyboardInterrupt:
print("\n[CTRL-C] cancel received — destroying init-stage instances and moving to restart phase…", file=sys.stderr)
CANCEL_EVENT.set()
break
for f in futs:
if f.done():
iid = f.result()
if iid and iid not in stopped_set:
stopped_set.add(iid)
stopped_ids.append(iid)
finally:
_destroy_all_in_stage(report, ST_INIT)
CANCEL_EVENT.clear()
CANCEL_EVENT_2.clear()
restart_ids = list(stopped_set)
if restart_ids:
log(f"[RESTART] starting {len(restart_ids)} previously stopped instances …")
try:
restarted_ok = _restart_and_wait(restart_ids, report, consecutive_ok=2)
except KeyboardInterrupt:
CANCEL_EVENT_2.set()
log("[CTRL-C] restart cancel — destroying pending and proceeding to save …")
to_destroy = [int(iid) for iid, st in report.items() if st == ST_RESTART]
for iid in to_destroy:
destroy_instance(iid, must_confirm=True)
report[str(iid)] = ST_DEST
restarted_ok = [int(iid) for iid, st in report.items() if st == ST_DONE]
still_pending = [int(iid) for iid, st in list(report.items()) if st == ST_RESTART]
for iid in still_pending:
log(f"[TIMEOUT] restart inst={iid} not Running after {int(INIT_TIMEOUT)}s -> destroy")
destroy_instance(iid, must_confirm=True)
report[str(iid)] = ST_DEST
log("[WRAPUP] building output list …")
uniq_ok: List[int] = []
seen_ok = set()
for iid in restarted_ok:
if iid not in seen_ok:
seen_ok.add(iid)
uniq_ok.append(iid)
for iid in uniq_ok:
final_successes.append({"instance_id": iid, "offer_id": None})
finally:
if _TABLE_MONITOR:
_TABLE_MONITOR.stop()
try:
print("\n[WRAPUP] Saving instance.json …", file=sys.stderr, flush=True)
with open(INSTANCE_JSON, "w", encoding="utf-8") as f:
json.dump(final_successes, f, ensure_ascii=False, indent=2)
print(f"Saved instance.json ({len(final_successes)} instances): {INSTANCE_JSON}", file=sys.stderr, flush=True)
except Exception as e:
print(f"[ERROR] failed to write {INSTANCE_JSON}: {e}", file=sys.stderr, flush=True)
def cmd_status() -> None:
def to_float(s: str) -> float:
if s is None:
return 0.0
m = re.search(r"-?\d+(?:,\d{3})*(?:\.\d+)?", s)
if not m:
return 0.0
return float(m.group(0).replace(",", ""))
try:
cp = subprocess.run(["vastai", "show", "instances"], capture_output=True, text=True, timeout=30)
table = cp.stdout or ""
sys.stdout.write(table)
sys.stdout.flush()
except KeyboardInterrupt:
return
lines = [ln.rstrip("\n") for ln in table.splitlines() if ln.strip()]
if not lines:
print("\nTotals: instances=0, vCPUs=0, RAM=0.0 GB, $/hr=0.000")
return
header = lines[0].strip()
headers = re.split(r"\s{2,}", header)
idx = {name: i for i, name in enumerate(headers)}
required = ("vCPUs", "RAM", "$\/hr")
missing = [k for k in required if k not in idx]
if missing:
dlog(f"[STATUS-TOTALS] missing columns in header: {missing}")
print("\nTotals: (failed to compute — header mismatch)", file=sys.stderr)
return
total_instances = 0
total_vcpus = 0.0
total_ram_gb = 0.0
total_dph = 0.0
for line in lines[1:]:
if line.lower().startswith("totals:"):
continue
cols = re.split(r"\s{2,}", line.strip())
if len(cols) <= max(idx.values()):
continue
total_instances += 1
total_vcpus += to_float(cols[idx["vCPUs"]])
total_ram_gb += to_float(cols[idx["RAM"]])
total_dph += to_float(cols[idx["$/hr"]])
print("\nTotals: instances={}, vCPUs={}, RAM={:.1f} GB, $/hr={:.3f}".format(total_instances, int(total_vcpus), total_ram_gb, total_dph))
def cmd_destroy() -> None:
ids = list_all_instance_ids()
if not ids:
print("No instances to destroy (0 found via `vastai show instances`).", file=sys.stderr)
return
total = len(ids)
print(f"Destroying {total} instances in parallel (max_workers={MAX_WORKERS}) …")
ok = 0
ng = 0
start_time = time.time()
def status_line():
elapsed = int(time.time() - start_time)
m, s = divmod(elapsed, 60)
el = f"{m}m{s}s" if m else f"{s}s"
line = f"elapsed: {el}, destroyed={ok}, failed={ng}, pending={total - ok - ng}"
print("\r" + line + " " * 8, end="", file=sys.stderr, flush=True)
status_line()
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
fut2id = {ex.submit(destroy_instance, iid, True): iid for iid in ids}
pending = set(fut2id.keys())
while pending:
done_now = [f for f in list(pending) if f.done()]
if not done_now:
time.sleep(UI_TICK)
status_line()
continue
for f in done_now:
pending.remove(f)
iid = fut2id[f]
try:
res = f.result()
except Exception as e:
dlog(f"[DESTROY-EXC] id={iid} {e}")
res = False
if res:
ok += 1
print(f"\n -> destroyed id={iid}")
else:
ng += 1
print(f"\n -> FAILED to destroy id={iid}", file=sys.stderr)
status_line()
print("\nSummary: destroyed={}, failed={}, total={}".format(ok, ng, total))
def main() -> None:
global DEBUG
ap = argparse.ArgumentParser(description="Vast.ai loader template (table-based readiness; run arbitrary screen sessions or raw onstart)")
g = ap.add_mutually_exclusive_group(required=True)
g.add_argument("--start", type=int, help="Create & wait -> stop; Ctrl+C: destroy inits & restart all stopped; second Ctrl+C: destroy non-restarted; finally write instance.json")
g.add_argument("--status", action="store_true", help="Show current instances (vastai show instances)")
g.add_argument("--destroy", action="store_true", help="Destroy all live instances (queried via show instances)")
g.add_argument("--cleanup", action="store_true", help="Destroy all instances that are not currently Status=Running")
ap.add_argument("--debug", action="store_true", help="Verbose logs")
ap.add_argument("--image", default=DEFAULT_IMAGE, help="Image name for creation")
ap.add_argument("--env", default=DEFAULT_ENV, help="Raw --env string for port/env (optional)")
ap.add_argument("--disk", type=int, default=DEFAULT_DISK_GB, help=f"Disk size GB (default: {DEFAULT_DISK_GB})")
ap.add_argument("--ssh", action="store_true", default=SSH_FLAG_DEFAULT, help="Pass --ssh on create (default: on)")
ap.add_argument("--no-ssh", dest="ssh", action="store_false", help="Disable --ssh on create")
ap.add_argument("--direct", action="store_true", default=USE_DIRECT_DEFAULT, help="Pass --direct on create (default: on)")
ap.add_argument("--no-direct", dest="direct", action="store_false", help="Disable --direct on create")
ap.add_argument("--screen", action="append", default=[], help="Specify screen sessions as 'name:command' or just 'command' (repeatable)")
ap.add_argument("--onstart", default=None, help="Raw command to append to on-start before screens")
ap.add_argument("--ssh-key", default=os.environ.get("VASTAI_SSH_KEY", None) or (os.path.expandvars(r"%USERPROFILE%\.ssh\vastai") if os.name == "nt" else os.path.expanduser("~/.ssh/vastai")), help="Path to SSH key (not used for readiness)")
args = ap.parse_args()
DEBUG = bool(args.debug)
_build_init_commands(args.screen, args.onstart)
if args.start is not None:
cmd_start(args.start, args)
elif args.status:
cmd_status()
elif args.destroy:
cmd_destroy()
elif args.cleanup:
cmd_cleanup()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment