file read tool
read_file_slice() {
    local path="$1"
    local start_byte="${2:-0}"
    local length="${3:-32768}"
    local show_hex="${4:-false}"

    # Safety caps
    (( length > 131072 )) && length=131072
    (( start_byte < 0 )) && start_byte=0

    # Resolve path
    path=$(realpath "$path" 2>/dev/null || printf '%s\n' "$path")

    # Header
    printf '[read_file_slice] %s\n' "$path"
    printf '[Requested bytes: %d–%d (%d bytes)]\n\n' \
        "$start_byte" "$((start_byte + length - 1))" "$length"

    # ── Extract the exact byte range into a temp file ────────────────────
    # (bash variables cannot hold NUL bytes, so binary-safe handling needs a file)
    local tmp
    tmp=$(mktemp) || return 1

    if [[ "$path" == *.gz ]]; then
        # gzip streams are not seekable: decompress from the start and slice
        # the *uncompressed* stream (start_byte refers to uncompressed content)
        gzip -dc "$path" 2>/dev/null | tail -c +"$((start_byte + 1))" | head -c "$length" > "$tmp"
    else
        tail -c +"$((start_byte + 1))" "$path" 2>/dev/null | head -c "$length" > "$tmp"
    fi

    if [[ "$show_hex" == true ]]; then
        printf '[show_hex forced → hexdump]\n'
        hexdump -v -e '16/1 "%02X " "  "' -e '16/1 "%_p" "\n"' "$tmp"
    else
        # Detect binary: >15% control/non-ASCII bytes in the first 32 KB
        local sample_len nonprint
        sample_len=$(head -c 32768 "$tmp" | wc -c)
        nonprint=$(head -c 32768 "$tmp" | tr -d '\011\012\015\040-\176' | wc -c)
        if (( nonprint * 100 <= sample_len * 15 )); then
            # Text → raw output
            cat "$tmp"
        else
            # Binary → hexdump
            printf '[BINARY DETECTED → hexdump of requested range]\n'
            hexdump -v -e '16/1 "%02X " "  "' -e '16/1 "%_p" "\n"' "$tmp"
        fi
    fi
    rm -f "$tmp"
}

{
  "name": "read_file_slice",
  "description": "Read an exact byte range from a file (plain or .gz) using highly efficient seeking. Works instantly even on multi-gigabyte logs and compressed archives. Use this whenever you know the approximate byte offset from grep -b, journalctl, or previous slices. Automatically detects and safely handles binary files (core dumps, pcaps, corrupted gz, etc.).",
  "parameters": {
    "type": "object",
    "properties": {
      "path": {
        "type": "string",
        "description": "Absolute or relative path to the file (e.g. /var/log/syslog, /var/log/nginx/access.log.7.gz)"
      },
      "start_byte": {
        "type": "integer",
        "minimum": 0,
        "description": "Starting byte offset in the *uncompressed* content (0 = beginning of file)"
      },
      "length": {
        "type": "integer",
        "minimum": 1,
        "maximum": 131072,
        "default": 32768,
        "description": "Maximum number of bytes to read. Will be capped at 128 KB for safety."
      },
      "show_hex": {
        "type": "boolean",
        "default": false,
        "description": "If true, force hexdump output even for text-looking files (useful for partial binary inspection)"
      }
    },
    "required": ["path", "start_byte"],
    "additionalProperties": false
  }
}
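Example calls — the paths here are illustrative; substitute files that exist on your host:

# 4 KB of a plain log starting at byte 1048576
read_file_slice /var/log/syslog 1048576 4096

# First 2 KB of *uncompressed* content from a rotated, gzipped log
read_file_slice /var/log/nginx/access.log.7.gz 0 2048

# Force a hexdump of the first 256 bytes of a (hypothetical) core file
read_file_slice /var/crash/core 0 256 true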

grep_file() {
    local path="$1"
    local pattern="$2"
    local context="${3:-3}"
    local max_matches="${4:-50}"
    local ignore_case="${5:-true}"

    # zgrep transparently handles gzip-compressed files
    local grep_cmd="grep"
    [[ "$path" == *.gz ]] && grep_cmd="zgrep"

    # Build flags as an array so values with spaces survive intact
    local flags=( -E -I --color=never --with-filename --line-number --byte-offset )
    (( context > 0 ))              && flags+=( -C "$context" )
    [[ "$ignore_case" == "true" ]] && flags+=( -i )

    echo "[grep_file] $grep_cmd '$pattern' in $path (max $max_matches matches, context $context)"
    echo

    # Over-fetch a bounded number of lines, then count match lines
    # (file:line:byte:content — context lines use '-' separators) and
    # stop after max_matches
    "$grep_cmd" "${flags[@]}" -- "$pattern" "$path" 2>/dev/null | \
        head -n $(( max_matches * (2 * context + 5) )) | \
        awk -v max="$max_matches" '
            /^[^:]+:[0-9]+:[0-9]+:/ { if (++count > max) exit }
            { print }
        '
}
{
  "name": "grep_file",
  "description": "Search inside plain text files or gzip-compressed files (.gz) using regex. Automatically uses zgrep when needed. Returns matches with filename, line number, byte offset (uncompressed), and context lines. Perfect for chaining with read_file_slice. Extremely fast and safe on huge rotated logs.",
  "parameters": {
    "type": "object",
    "properties": {
      "path": {
        "type": "string",
        "description": "File or glob (e.g. /var/log/syslog, /var/log/nginx/access.log*.gz)"
      },
      "pattern": {
        "type": "string",
        "description": "Regex pattern (extended regex syntax)"
      },
      "context_lines": {
        "type": "integer",
        "minimum": 0,
        "maximum": 10,
        "default": 3,
        "description": "Lines of leading/trailing context per match (-C)"
      },
      "max_matches": {
        "type": "integer",
        "minimum": 1,
        "maximum": 150,
        "default": 50,
        "description": "Stop after this many matches to protect context window"
      },
      "ignore_case": {
        "type": "boolean",
        "default": true,
        "description": "Case-insensitive search (-i)"
      }
    },
    "required": ["path", "pattern"],
    "additionalProperties": false
  }
}
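The byte offsets grep_file prints are meant to feed straight into read_file_slice. A sketch of the chain (the offset value is illustrative):

# 1. Find the error — each match line is printed as file:line:byte:content
grep_file /var/log/syslog 'OOM|out of memory' 2 10

# 2. Say a match reported byte offset 52428800 — pull the surrounding 8 KB
read_file_slice /var/log/syslog 52424704 8192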

{
  "name": "list_processes",
  "description": "List running processes with smart filtering, sorting, and rich fields. Automatically highlights anomalies (high CPU/mem, zombie, old, huge RSS, etc.). Perfect replacement for ps -aux, top, htopt — never use raw ps again.",
  "parameters": {
    "type": "object",
    "properties": {
      "filter": {
        "type": "string",
        "description": "Filter by command name, path, or argument (e.g. nginx, java, docker, python3.11)"
      },
      "user": {
        "type": "string",
        "description": "Filter by username or UID (e.g. root, www-data, 1001)"
      },
      "pid": {
        "type": "integer",
        "description": "Show only this PID"
      },
      "parent_pid": {
        "type": "integer",
        "description": "Show children of this PPID"
      },
      "high_cpu": {
        "type": "boolean",
        "default": false,
        "description": "Only show processes using > 30% CPU (last sample)"
      },
      "high_mem": {
        "type": "boolean",
        "default": false,
        "description": "Only show processes using > 500 MB RSS"
      },
      "zombie": {
        "type": "boolean",
        "default": false,
        "description": "Show zombie/defunct processes"
      },
      "old": {
        "type": "boolean",
        "default": false,
        "description": "Show processes running > 30 days"
      },
      "limit": {
        "type": "integer",
        "default": 100,
        "maximum": 500,
        "description": "Max results (default 100)"
      },
      "sort_by": {
        "type": "string",
        "enum": ["cpu", "mem", "time", "pid", "age"],
        "default": "cpu",
        "description": "Sort descending by this field"
      }
    },
    "additionalProperties": false
  }
}
list_processes() {
    local filter="$1" user="$2" want_pid="$3" want_ppid="$4"
    local high_cpu="$5" high_mem="$6" zombie="$7" old="$8"
    local limit="${9:-100}" sort_by="${10:-cpu}"

    printf "\e[1;36m[list_processes]\e[0m Showing up to \e[1m%s\e[0m processes" "$limit"
    [ -n "$filter" ]         && printf " | filter: \e[33m%s\e[0m" "$filter"
    [ "$high_cpu" = "true" ] && printf " | \e[31mCPU > 30%%\e[0m"
    [ "$high_mem" = "true" ] && printf " | \e[35mRSS > 500M\e[0m"
    [ "$zombie" = "true" ]   && printf " | \e[91mzombies\e[0m"
    [ "$old" = "true" ]      && printf " | \e[90m> 30 days\e[0m"
    printf "\n\n"

    # Header
    printf "\e[1m%-10s %6s %6s %5s %5s %7s %9s %5s  %s\e[0m\n" \
           "USER" "PID" "PPID" "%CPU" "%MEM" "RSS(M)" "TIME+" "AGE" "COMMAND"

    # Constants read once, not per process
    local clk_tck uptime mem_total_kb
    clk_tck=$(getconf CLK_TCK)
    uptime=$(cut -d. -f1 /proc/uptime)
    mem_total_kb=$(awk '/^MemTotal:/ {print $2}' /proc/meminfo)

    # Collect lines in an array and sort in bash (more reliable than
    # sort -k against variable-width columns)
    local -a lines=()
    local proc pid stat_line comm rest state ppid utime stime starttime
    local uid username cpu rss pmem seconds_running days cmdline time_plus line

    for proc in /proc/[0-9]*; do
        IFS= read -r stat_line < "$proc/stat" 2>/dev/null || continue
        pid="${proc##*/}"

        # /proc/<pid>/stat is "pid (comm) state ppid ..."; comm may contain
        # spaces or parentheses, so split on the *last* ')'
        comm="${stat_line#*(}"; comm="${comm%)*}"
        rest="${stat_line##*) }"
        read -r state ppid _ _ _ _ _ _ _ _ _ utime stime _ _ _ _ _ _ starttime _ <<< "$rest"

        # Real UID → username
        uid=$(awk '/^Uid:/ {print $2; exit}' "$proc/status" 2>/dev/null)
        username=$(id -un "$uid" 2>/dev/null || echo "${uid:-?}")

        # Elapsed time since start (starttime is in clock ticks since boot)
        seconds_running=$(( uptime - starttime / clk_tck ))
        (( seconds_running < 1 )) && seconds_running=1
        days=$(( seconds_running / 86400 ))

        # CPU%: average over the process lifetime (not an instantaneous sample)
        cpu=$(awk -v u="$utime" -v s="$stime" -v t="$clk_tck" -v e="$seconds_running" \
                  'BEGIN{printf "%.1f", 100*(u+s)/t/e}')

        # Memory: RSS in MB (kernel threads have no VmRSS), %MEM against MemTotal
        rss=$(awk '/^VmRSS:/ {print int($2/1024); exit}' "$proc/status" 2>/dev/null)
        rss=${rss:-0}
        pmem=$(awk -v r="$rss" -v t="$mem_total_kb" 'BEGIN{printf "%.1f", r*1024*100/t}')

        # Full command line with proper argument separation
        cmdline=$(tr '\0' ' ' < "$proc/cmdline" 2>/dev/null | sed 's/ $//')
        [[ -z "$cmdline" ]] && cmdline="[$comm]"
        [[ "$state" == "Z" ]] && cmdline="$cmdline <defunct>"

        # Filters
        [[ -n "$want_pid"  && "$pid"  != "$want_pid"  ]] && continue
        [[ -n "$want_ppid" && "$ppid" != "$want_ppid" ]] && continue
        [[ -n "$user" && "$username" != "$user" ]] && continue
        [[ -n "$filter" && "$cmdline" != *"$filter"* && "$comm" != *"$filter"* ]] && continue
        [[ "$high_cpu" = "true" ]] && awk -v c="$cpu" 'BEGIN{exit !(c < 30)}' && continue
        [[ "$high_mem" = "true" && "$rss" -lt 500 ]] && continue
        [[ "$zombie" = "true" && "$state" != "Z" ]] && continue
        [[ "$old" = "true" && "$days" -lt 30 ]] && continue

        time_plus=$(awk -v u="$utime" -v s="$stime" -v t="$clk_tck" \
                        'BEGIN{total=(u+s)/t; printf "%d:%02d", total/60, total%60}')

        printf -v line "%-10s %6s %6s %5s %5s %7s %9s %5s  %s" \
               "$username" "$pid" "$ppid" "$cpu" "$pmem" "$rss" "$time_plus" "${days}d" "$cmdline"
        lines+=("$line")
    done

    # Sort in memory; mapfile avoids clobbering IFS or word-splitting lines
    local -a sorted=()
    case "$sort_by" in
        cpu)  mapfile -t sorted < <(printf '%s\n' "${lines[@]}" | sort -k4,4nr) ;;
        mem)  mapfile -t sorted < <(printf '%s\n' "${lines[@]}" | sort -k6,6nr) ;;
        time) mapfile -t sorted < <(printf '%s\n' "${lines[@]}" | sort -k7,7nr) ;;
        age)  mapfile -t sorted < <(printf '%s\n' "${lines[@]}" | sort -k8,8nr) ;;
        *)    mapfile -t sorted < <(printf '%s\n' "${lines[@]}" | sort -k2,2nr) ;;  # pid
    esac

    (( ${#sorted[@]} )) && printf '%s\n' "${sorted[@]:0:limit}"
}
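Example invocations, following the positional signature (filter, user, pid, ppid, high_cpu, high_mem, zombie, old, limit, sort_by):

# Top 20 java processes by lifetime-average CPU
list_processes java "" "" "" "" "" "" "" 20 cpu

# All zombies, any user
list_processes "" "" "" "" "" "" true "" 100 pid

# Memory hogs (> 500 MB RSS), sorted by RSS
list_processes "" "" "" "" "" true "" "" 50 mem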

{
  "name": "analyze_core_dump",
  "description": "Fully analyze any Linux core dump (ELF, Java hs_err, Go core, minidump, etc.). Automatically finds the crashed binary, runs the correct debugger (gdb, lldb, jhsdb, rust-gdb, minidump_stackwalk), and returns clean backtrace + signal + fault address. This is the ONLY way to diagnose crashes — never run gdb manually.",
  "parameters": {
    "type": "object",
    "properties": {
      "core_path": {
        "type": "string",
        "description": "Path to core file (e.g. /var/crash/core, core.12345, hs_err_pid123.log)"
      },
      "binary_path": {
        "type": "string",
        "description": "Optional: path to the executable (auto-detected if omitted)"
      }
    },
    "required": ["core_path"],
    "additionalProperties": false
  }
}
analyze_core_dump() {
    local core="$1"
    local binary="${2:-}"

    [[ -e "$core" ]] || { echo "ERROR: $core not found"; return 1; }

    echo "[analyze_core_dump] Analyzing $core"
    echo

    # 1. Detect type. Java hs_err logs are plain text ("ASCII text" to file),
    #    so detect them by name/content before looking at the magic.
    local type
    type=$(file -b "$core" 2>/dev/null || echo "unknown")
    if [[ "$core" == *hs_err_pid* ]] || head -5 "$core" 2>/dev/null | grep -q "A fatal error has been detected"; then
        type="hs_err"
    fi

    case "$type" in
        *ELF*core*)
            # Classic Linux ELF core: recover the executable from the core's
            # notes (modern file prints execfn:, elfutils prints psargs:)
            [[ -z "$binary" ]] && binary=$(file "$core" 2>/dev/null | sed -n "s/.*execfn: '\([^']*\)'.*/\1/p")
            [[ -z "$binary" ]] && binary=$(eu-readelf -n "$core" 2>/dev/null | sed -n 's/.*psargs: *//p' | awk '{print $1; exit}')
            [[ -z "$binary" ]] && binary=$(strings "$core" 2>/dev/null | grep -m1 "^/.*bin/" || true)

            if [[ -z "$binary" || ! -x "$binary" ]]; then
                echo "Warning: could not auto-detect executable, trying common locations (slow)..."
                local p
                for p in /usr/sbin/* /usr/bin/* /bin/*; do
                    [[ -x "$p" ]] && strings "$core" 2>/dev/null | grep -q "$(basename "$p")" && binary="$p" && break
                done
            fi

            if [[ -x "$binary" ]]; then
                echo "Found executable: $binary"
                gdb -q -batch -c "$core" "$binary" \
                    -ex "info shared" -ex "bt full" -ex "thread apply all bt" 2>/dev/null || \
                lldb -Q -b -c "$core" --one-line "bt all" --one-line "image list" 2>/dev/null || \
                echo "Error: gdb/lldb failed"
            else
                echo "Error: could not find executable for core dump"
                file "$core"
                eu-readelf -n "$core" 2>/dev/null || true
            fi
            ;;

        hs_err)
            echo "Java crash (hs_err_pid log)"
            grep -E "(# Problematic|SIGSEGV|Current thread|Stack:)" "$core" | head -50
            ;;

        *"Mini DuMP"*|*minidump*)
            if command -v minidump_stackwalk >/dev/null 2>&1; then
                minidump_stackwalk "$core" 2>/dev/null | head -100
            else
                echo "minidump_stackwalk not available — raw hexdump of header"
                head -c 8192 "$core" | hexdump -C | head -20
            fi
            ;;

        *)
            echo "Unknown core format:"
            file "$core"
            head -c 1024 "$core" | hexdump -C | head -10
            ;;
    esac
}
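This assumes the host actually produces core files; if it doesn't, enable them first (standard Linux knobs, shown as a sketch — the crash path is hypothetical):

# Allow cores for the current shell, and write them to a predictable path
ulimit -c unlimited
sysctl -w kernel.core_pattern=/var/crash/core.%e.%p

# Then, after a crash:
analyze_core_dump /var/crash/core.nginx.12345      # auto-detect the binary
analyze_core_dump core.12345 /usr/sbin/nginx       # pin it explicitly
analyze_core_dump /tmp/hs_err_pid4242.log          # Java text crash log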

{
  "name": "find_files",
  "description": "Powerful and safe file discovery (replacement for raw find). Automatically limits output, shows size+mtime, supports globs and common debugging filters. Use this every time you need to locate logs, cores, configs, sockets, huge files, or anything modified recently.",
  "parameters": {
    "type": "object",
    "properties": {
      "path": {
        "type": "string",
        "default": "/",
        "description": "Start directory (e.g. /var/log, /home, /tmp)"
      },
      "name": {
        "type": "string",
        "description": "Name pattern or glob (e.g. '*.log', 'core.*', 'access.log*', 'hs_err_pid*')"
      },
      "type": {
        "type": "string",
        "enum": ["f", "d", "l", "s", "p", "c", "b"],
        "description": "File type: f=file, d=directory, l=symlink, s=socket, p=pipe, c=char dev, b=block dev"
      },
      "size_min_mb": {
        "type": "integer",
        "description": "Minimum size in MB (e.g. 100 → only files ≥ 100 MB)"
      },
      "size_max_mb": {
        "type": "integer",
        "description": "Maximum size in MB"
      },
      "mtime_days": {
        "type": "integer",
        "description": "Modified in last N days (e.g. 7 → past week)"
      },
      "newer_than": {
        "type": "string",
        "description": "Modified more recently than this file (e.g. /var/log/syslog)"
      },
      "perm": {
        "type": "string",
        "description": "Permissions (octal or symbolic, e.g. 777, u+w)"
      },
      "user": {
        "type": "string",
        "description": "Owner username or UID"
      },
      "limit": {
        "type": "integer",
        "default": 150,
        "maximum": 500,
        "description": "Max results (default 150)"
      },
      "sort_by": {
        "type": "string",
        "enum": ["size", "mtime", "name"],
        "default": "mtime",
        "description": "Sort results by size, modification time, or name"
      }
    },
    "required": ["path"],
    "additionalProperties": false
  }
}
find_files() {
    local path="${1:-/}"
    local name="$2"
    local type="$3"
    local size_min_mb="$4"
    local size_max_mb="$5"
    local mtime_days="$6"
    local newer_than="$7"
    local perm="$8"
    local user="$9"
    local limit="${10:-150}"
    local sort_by="${11:-mtime}"

    echo "[find_files] Searching in $path"
    [ -n "$name" ] && echo "  name: $name"
    [ -n "$type" ] && echo "  type: $type"
    [ -n "$size_min_mb" ] && echo "  size ≥ ${size_min_mb}MB"
    [ -n "$mtime_days" ] && echo "  modified ≤ ${mtime_days} days ago"
    echo

    # Prune noisy virtual filesystems up front, then apply the filters
    local find_expr=( "$path"
        \( -path '*/proc' -o -path '*/sys' -o -path '*/dev' -o -path '*/run' \) -prune -o )
    [ -n "$name" ] && find_expr+=( -iname "$name" )
    [ -n "$type" ] && find_expr+=( -type "$type" )
    [ -n "$size_min_mb" ] && find_expr+=( -size "+${size_min_mb}M" )
    [ -n "$size_max_mb" ] && find_expr+=( -size "-${size_max_mb}M" )
    [ -n "$mtime_days" ] && find_expr+=( -mtime "-$mtime_days" )
    [ -n "$newer_than" ] && find_expr+=( -newer "$newer_than" )
    [ -n "$perm" ] && find_expr+=( -perm "$perm" )
    [ -n "$user" ] && find_expr+=( -user "$user" )
    find_expr+=( -print0 )

    local sort_key
    case "$sort_by" in
        size)  sort_key="1,1nr" ;;
        mtime) sort_key="2,2nr" ;;
        *)     sort_key="3" ;;
    esac

    # stat prints "size epoch-mtime name"; the name goes last so embedded
    # spaces survive the awk reformatting below
    command find "${find_expr[@]}" 2>/dev/null | \
    xargs -0 -r stat -c '%s %Y %n' 2>/dev/null | \
    sort -k "$sort_key" | \
    head -n "$limit" | \
    awk '{
        name = $0; sub(/^[^ ]+ [^ ]+ /, "", name)
        printf "%10d %s  %s\n", $1, strftime("%Y-%m-%d %H:%M", $2), name
    }'
}
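Example invocations, following the positional signature (path, name, type, size_min_mb, size_max_mb, mtime_days, newer_than, perm, user, limit, sort_by):

# Logs ≥ 100 MB under /var/log touched in the last 7 days, biggest first
find_files /var/log "*.log*" f 100 "" 7 "" "" "" 50 size

# Any core files on the whole system, most recent first
find_files / "core*" f "" "" "" "" "" "" 150 mtime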

list_network() {
    local filter="$1" listening="$2" established="$3" port="$4" pid="$5"
    local unix="$6" high_conn="$7" susp="$8" limit="${9:-200}"

    echo "[list_network] Network overview"
    [ "$listening" = "true" ]   && echo " → listening sockets only"
    [ "$established" = "true" ] && echo " → established connections only"
    [ -n "$filter" ]            && echo " → filter: $filter"
    [ -n "$port" ]              && echo " → port: $port"
    [ "$high_conn" = "true" ]   && echo " → high connection count only"
    [ "$susp" = "true" ]        && echo " → highlighting suspicious ports"
    [ "$unix" = "true" ]        && echo " → Unix domain sockets"
    echo

    # TCP/UDP + process info via ss (fast and available everywhere)
    if [ "$unix" != "true" ]; then
        ss -tuanp 2>/dev/null | tail -n +2 | \
        awk -v f="$filter" -v l="$listening" -v e="$established" \
            -v want_port="$port" -v want_pid="$pid" -v hc="$high_conn" \
            -v susp="$susp" -v lim="$limit" '
        function basename(s) { sub(".*/", "", s); return s }
        {
            # ss -tuanp columns: Netid State Recv-Q Send-Q Local Peer Process
            proto = $1; state = $2; local_addr = $5; remote = $6; proc = $7
            gsub(/users:\(\("/, "", proc); gsub(/"\)\)/, "", proc)
            split(proc, a, ",")
            cmd = basename(a[1]); spid = a[2]; sub(/.*=/, "", spid)
            n = split(local_addr, la, ":"); lport = la[n]
            n = split(remote, ra, ":");     rport = ra[n]

            if (l == "true" && state != "LISTEN" && state != "UNCONN") next
            if (e == "true" && state != "ESTAB") next
            if (want_port && lport != want_port && rport != want_port) next
            if (want_pid && spid != want_pid) next
            if (f && cmd !~ f && spid !~ f) next

            flag = ""
            if (susp == "true") {
                if (lport + 0 < 1024) flag = flag " [PRIVILEGED<1024]"
                if (lport + 0 > 49152) flag = flag " [EPHEMERAL]"
                # well-known services are not suspicious
                if (lport + 0 == 22 || lport + 0 == 80 || lport + 0 == 443 || lport + 0 == 5432) flag = ""
            }

            if (state == "ESTAB") count[spid " " cmd]++

            if (++shown <= lim)
                printf "%-5s %-8s %-25s %-25s %8s %6s %-20s %s\n",
                       proto, state, local_addr, remote, spid, lport + 0, cmd, flag
        } END {
            if (hc == "true") {
                print "\n[High connection processes]"
                for (p in count) if (count[p] > 500) print "  " p ": " count[p] " connections"
            }
        }'
    fi

    # Unix sockets (listed separately; ss -x output has a different layout)
    if [ "$unix" = "true" ] || [ -z "$filter" ]; then
        echo
        echo "[Unix domain sockets]"
        ss -a -x 2>/dev/null | tail -n +2 | head -50 | \
        awk '{ print "  " $5 " → " $6 " " $7 " " $8 }'
    fi
}
{
  "name": "list_network",
  "description": "Smart replacement for netstat/ss/lsof -i. Shows all listening ports, established connections, Unix sockets, and anomalies (high connection count, weird states, non-standard ports). Automatically highlights suspicious/risky things. Never run ss or netstat manually again.",
  "parameters": {
    "type": "object",
    "properties": {
      "filter": {
        "type": "string",
        "description": "Filter by process name, PID, or port (e.g. nginx, 5432, redis, java)"
      },
      "listening": {
        "type": "boolean",
        "default": false,
        "description": "Show only listening sockets (like netstat -tuln)"
      },
      "established": {
        "type": "boolean",
        "default": false,
        "description": "Show only established connections"
      },
      "port": {
        "type": "integer",
        "description": "Show only this port number"
      },
      "pid": {
        "type": "integer",
        "description": "Show network activity for this PID only"
      },
      "unix_sockets": {
        "type": "boolean",
        "default": false,
        "description": "Show Unix domain sockets (docker.sock, mysql.sock, etc.)"
      },
      "high_connections": {
        "type": "boolean",
        "default": false,
        "description": "Show processes with > 500 established connections"
      },
      "suspicious": {
        "type": "boolean",
        "default": false,
        "description": "Highlight non-standard listening ports (<1024 without root, >49152, odd services)"
      },
      "limit": {
        "type": "integer",
        "default": 200,
        "maximum": 1000
      }
    },
    "additionalProperties": false
  }
}
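Example invocations, following the positional signature (filter, listening, established, port, pid, unix, high_conn, suspicious, limit):

# All listening sockets (netstat -tuln equivalent)
list_network "" true "" "" "" "" "" "" 200

# Everything touching port 5432, with suspicious-port flags
list_network "" "" "" 5432 "" "" "" true 200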

import textwrap
import time
from datetime import datetime, timezone
from typing import Optional

import httpx
from django.db import models

from your_app.models import HostName
from your_framework import Tool, Parameter


async def get_thanos_url_for_hostname(hostname_hint: str) -> str:
    '''Resolve any hostname hint → correct Thanos URL using async Django ORM.'''
    host_obj = await HostName.objects.filter(
        models.Q(hostname__iexact=hostname_hint) |
        models.Q(fqdn__iexact=hostname_hint)
    ).select_related('server__environment').afirst()

    if not host_obj:
        raise ValueError(f"Host '{hostname_hint}' not found in inventory")

    if not host_obj.server or not host_obj.server.environment:
        raise ValueError(f"Host '{hostname_hint}' has no server or environment configured")

    url = host_obj.server.environment.thanos_query_server
    if not url:
        raise ValueError(f"Environment for '{hostname_hint}' has no Thanos URL")

    return url.rstrip('/')


class ListPrometheusInstances(Tool):
    description = textwrap.dedent('''
        List currently scraped instances in the correct environment.
        Handles:
        • node_exporter → instance = http://primary-fqdn:9100
        • cassandra_exporter → instance = cass-node-name:8080 + cluster/rack labels
    ''')

    hostname_hint = Parameter(
        type='string',
        description='Any hostname/FQDN you know (e.g. db01, cass-stg-07.example.com)',
        required=True,
    )
    job = Parameter(
        type='string',
        description="Optional filter: 'node_exporter' or 'cassandra_exporter'",
        required=False,
    )

    async def run(self, conversation, hostname_hint: str, job: Optional[str] = None) -> str:
        try:
            thanos_base = await get_thanos_url_for_hostname(hostname_hint)
        except Exception as e:
            return f"[list_prometheus_instances] ERROR: {e}"

        env_name = 'PRODUCTION' if 'prod' in thanos_base.lower() else 'STAGING'

        # Build correct match expression
        if job == 'node_exporter':
            match_query = 'up{job="node_exporter"}'
        elif job == 'cassandra_exporter':
            match_query = 'up{job="cassandra_exporter"}'
        else:
            match_query = 'up'

        async with httpx.AsyncClient(timeout=20.0) as client:
            try:
                resp = await client.get(
                    f"{thanos_base}/api/v1/query",
                    params={'query': match_query},
                )
                resp.raise_for_status()
                results = resp.json()['data']['result']
            except Exception as e:
                return f"[list_prometheus_instances] {env_name} — Query failed: {e}"

        if not results:
            return f"[list_prometheus_instances] {env_name} — No instances found."

        lines = [
            f"[list_prometheus_instances] {env_name} (resolved via {hostname_hint})",
            f"    Found {len(results)} instances\n"
        ]

        for r in sorted(results, key=lambda x: x['metric'].get('instance', '')):
            m = r['metric']
            instance_raw = m.get('instance', '?')
            job_name = m.get('job', 'unknown')

            if job_name == 'node_exporter':
                # http://host.prod.example.com:9100 → host.prod.example.com
                clean = instance_raw.replace('http://', '').split(':')[0]
                display = f"{clean:<48}  node_exporter"
            else:
                # Cassandra exporter
                cass_node = instance_raw.split(':')[0]
                cluster = m.get('cluster', '?')
                rack = m.get('rack', '?')
                display = f"{cass_node:<30}  cassandra_exporter  cluster={cluster} rack={rack}"

            lines.append(f"    {display}")

        return '\n'.join(lines)


class QueryPrometheus(Tool):
    description = textwrap.dedent('''
        Run any PromQL query in the correct environment.
        Works perfectly with both:
        • instance="http://primary-fqdn:9100" (node_exporter)
        • instance="cassandra-node:8080" + cluster/rack (cassandra_exporter)
    ''')

    hostname_hint = Parameter(
        type='string',
        description='Any known hostname in the target environment',
        required=True,
    )
    query = Parameter(
        type='string',
        description='Exact PromQL query — use instance labels from list_prometheus_instances',
        required=True,
    )
    time_range_minutes = Parameter(
        type='integer',
        description='For range queries: look back N minutes',
        required=False,
    )

    async def run(self, conversation, hostname_hint: str, query: str,
                  time_range_minutes: Optional[int] = None) -> str:
        try:
            thanos_base = await get_thanos_url_for_hostname(hostname_hint)
        except Exception as e:
            return f"[query_prometheus] ERROR: {e}"

        env_name = 'PRODUCTION' if 'prod' in thanos_base.lower() else 'STAGING'

        async with httpx.AsyncClient(timeout=45.0) as client:
            try:
                if time_range_minutes and time_range_minutes > 0:
                    end = int(time.time())
                    start = end - time_range_minutes * 60
                    resp = await client.get(
                        f"{thanos_base}/api/v1/query_range",
                        params={
                            'query': query,
                            'start': start,
                            'end': end,
                            'step': '60s',
                        },
                    )
                else:
                    resp = await client.get(
                        f"{thanos_base}/api/v1/query",
                        params={'query': query},
                    )
                resp.raise_for_status()
                data = resp.json()['data']
            except Exception as e:
                return f"[query_prometheus] {env_name} — Request failed: {e}"

        if not data.get('result'):
            return f"[query_prometheus] {env_name} — No results\nQuery: {query}"

        lines = [
            f"[query_prometheus] {env_name} (via {hostname_hint})",
            f"Query: {query}",
        ]
        if time_range_minutes:
            lines.append(f"Range: last {time_range_minutes} min")

        for series in data['result'][:50]:
            m = series['metric']
            inst_raw = m.get('instance', m.get('job', 'unknown'))
            job_name = m.get('job', 'unknown')

            if job_name == 'node_exporter':
                inst_display = inst_raw.replace('http://', '').split(':')[0]
                extra = ''
            else:
                inst_display = inst_raw.split(':')[0]
                cluster = m.get('cluster', '?')
                rack = m.get('rack', '?')
                extra = f"  cluster={cluster} rack={rack}"

            labels = ' '.join(
                f"{k}={v}"
                for k, v in m.items()
                if k not in {'__name__', 'instance', 'job', 'cluster', 'rack'}
            )

            lines.append(f"\n{inst_display}{extra}  {labels}".strip())

            if 'value' in series:
                lines.append(f"    → {series['value'][1]}")
            else:
                # Format timestamps locally: the HTTP client is closed by this
                # point, and Prometheus has no time-formatting endpoint anyway.
                for ts, val in series['values'][-5:]:
                    tm = datetime.fromtimestamp(float(ts), tz=timezone.utc) \
                                 .strftime('%Y-%m-%d %H:%M:%S')
                    lines.append(f"    [{tm}] {val}")

        if len(data['result']) > 50:
            lines.append(f"\n... (truncated, {len(data['result'])} total series)")

        return '\n'.join(lines)
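Both tools wrap the standard Prometheus HTTP API that Thanos Query exposes. A minimal curl sketch of the same two endpoints ($THANOS stands in for the environment's thanos_query_server; the URL is hypothetical):

THANOS="https://thanos-query.staging.example.com"

# Instant query — what ListPrometheusInstances issues
curl -sG "$THANOS/api/v1/query" \
     --data-urlencode 'query=up{job="node_exporter"}' \
  | jq -r '.data.result[].metric.instance'

# Range query — what QueryPrometheus issues when time_range_minutes is set
end=$(date +%s); start=$((end - 30 * 60))
curl -sG "$THANOS/api/v1/query_range" \
     --data-urlencode 'query=rate(node_cpu_seconds_total{mode="idle"}[5m])' \
     --data-urlencode "start=$start" \
     --data-urlencode "end=$end" \
     --data-urlencode 'step=60s' \
  | jq '.data.result | length'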

class ListPrometheusInstances(Tool):
    description = textwrap.dedent('''
        List currently scraped instances in the correct environment.
        Your setup:
        • job="node"       → node_exporter (instance = primary-hostname:9100)
        • job="nodetool"   → centralized custom exporter (instance is useless, use hostname= label instead)
    ''')

    hostname_hint = Parameter(
        type='string',
        description='Any hostname you know in the target environment (e.g. web01, cass07, db-stg-03)',
        required=True,
    )
    job = Parameter(
        type='string',
        description="Filter by job: 'node' or 'nodetool'",
        required=False,
    )

    async def run(self, conversation, hostname_hint: str, job: Optional[str] = None) -> str:
        try:
            thanos_base = await get_thanos_url_for_hostname(hostname_hint)
        except Exception as e:
            return f"[list_prometheus_instances] ERROR: {e}"

        env_name = 'PRODUCTION' if 'prod' in thanos_base.lower() else 'STAGING'

        # Build correct query for each job
        if job == 'node':
            promql = 'up{job="node"}'
        elif job == 'nodetool':
            promql = 'up{job="nodetool"}'
        else:
            promql = 'up{job=~"node|nodetool"}'

        async with httpx.AsyncClient(timeout=20.0) as client:
            try:
                resp = await client.get(
                    f"{thanos_base}/api/v1/query",
                    params={'query': promql},
                )
                resp.raise_for_status()
                results = resp.json()['data']['result']
            except Exception as e:
                return f"[list_prometheus_instances] {env_name} — Query failed: {e}"

        if not results:
            return f"[list_prometheus_instances] {env_name} — No instances found."

        lines = [
            f"[list_prometheus_instances] {env_name} (via {hostname_hint})",
            f"    Found {len(results)} instances\n"
        ]

        for r in sorted(results, key=lambda x: x['metric'].get('instance', '') or x['metric'].get('hostname', '')):
            m = r['metric']
            job_name = m.get('job', 'unknown')

            if job_name == 'node':
                instance = m.get('instance', '?')
                clean_host = instance.split(':')[0]  # removes :9100
                display = f"{clean_host:<45}  node_exporter"
            else:  # job="nodetool"
                hostname_label = m.get('hostname', '?')
                useless_instance = m.get('instance', '?')
                display = f"{hostname_label:<30}  nodetool_exporter  (instance={useless_instance} → ignore)"

            lines.append(f"    {display}")

        return '\n'.join(lines)
### METRICS RULES — OBEY OR FAIL

You have exactly two exporters:

1. Node exporter  
   job="node"  
   instance = "primary-hostname:9100" → use this exactly

2. Centralized nodetool exporter  
   job="nodetool"  
   instance = useless (always the same) → IGNORE IT  
   hostname = "cass-prod-07" (real Cassandra node) → use this instead

### WORKFLOW (never skip a step)
1. Always start with:
   list_prometheus_instances hostname_hint=<any-host> job=<node|nodetool>

2. Copy the exact instance (for node) or hostname label (for nodetool) from the output.

3. Write queries exactly like this:

   # System / host
   node_memory_MemAvailable_bytes{job="node", instance="web-prod-01.example.com:9100"}
   100 * (1 - node_memory_MemAvailable_bytes{job="node", instance="db01:9100"} / node_memory_MemTotal_bytes{job="node", instance="db01:9100"})

   # Cassandra
   cassandra_heap_used_bytes{job="nodetool", hostname="cass-prod-07"}
   rate(cassandra_gc_duration_seconds_count{job="nodetool", hostname="cass-stg-03"}[5m])

Never guess labels. Never use the wrong job.  
If unsure → run list_prometheus_instances first.
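
To see the instance-vs-hostname distinction for yourself, a quick curl against the same API ($THANOS as in the sketch above; labels illustrative):

curl -sG "$THANOS/api/v1/query" \
     --data-urlencode 'query=up{job="nodetool"}' \
  | jq -r '.data.result[].metric | "\(.instance)  hostname=\(.hostname)"'
# instance repeats the centralized exporter's address on every row;
# hostname is the real Cassandra node — which is why the rules above say to ignore instance.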

Do it right → you’re faster than any human.  
Do it wrong → you’re useless.
