Last active
August 14, 2025 18:09
-
-
Save vacmar01/8c1c2387766ba5f07e7d44ab420c6ea5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# /// script | |
# dependencies = [ | |
# "dspy", | |
# "rich" | |
# ] | |
# /// | |
import dspy | |
import os | |
import inspect | |
import subprocess | |
import argparse | |
from typing import Any, Dict, Optional | |
# Core business logic | |
def list_files(path: str): | |
"""List files in a directory""" | |
try: | |
return os.listdir(path) | |
except FileNotFoundError: | |
return f"Error: Directory '{path}' does not exist" | |
except PermissionError: | |
return f"Error: Permission denied accessing directory '{path}'" | |
except NotADirectoryError: | |
return f"Error: '{path}' is not a directory" | |
except Exception as e: | |
return f"Error listing files in '{path}': {str(e)}" | |
def read_file(path: str): | |
"""Read a file""" | |
try: | |
with open(path, 'r') as f: | |
return f.read() | |
except FileNotFoundError: | |
return f"Error: File '{path}' does not exist" | |
except PermissionError: | |
return f"Error: Permission denied reading file '{path}'" | |
except IsADirectoryError: | |
return f"Error: '{path}' is a directory, not a file" | |
except UnicodeDecodeError: | |
return f"Error: Cannot decode file '{path}' - it may be binary or have wrong encoding" | |
except Exception as e: | |
return f"Error reading file '{path}': {str(e)}" | |
def write_file(path: str, content: str): | |
"""Write content to a file""" | |
try: | |
with open(path, 'w') as f: | |
f.write(content) | |
return "Content written successfully" | |
except PermissionError: | |
return f"Error: Permission denied writing to file '{path}'" | |
except IsADirectoryError: | |
return f"Error: '{path}' is a directory, cannot write to it" | |
except FileNotFoundError: | |
return f"Error: Directory containing '{path}' does not exist" | |
except OSError as e: | |
if e.errno == 28: # No space left | |
return f"Error: No space left on device when writing to '{path}'" | |
return f"Error: OS error writing to '{path}': {str(e)}" | |
except Exception as e: | |
return f"Error writing to file '{path}': {str(e)}" | |
def run_cmd(cmd: str): | |
"""Run a command in the shell and return the output.""" | |
try: | |
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30) | |
if result.returncode == 0: | |
return result.stdout.strip() if result.stdout.strip() else "Command executed successfully (no output)" | |
else: | |
error_msg = result.stderr.strip() if result.stderr.strip() else "Unknown error" | |
return f"Command failed (exit code {result.returncode}): {error_msg}" | |
except subprocess.TimeoutExpired: | |
return f"Error: Command '{cmd}' timed out after 30 seconds" | |
except FileNotFoundError: | |
return f"Error: Command not found - '{cmd.split()[0] if cmd.split() else cmd}'" | |
except PermissionError: | |
return f"Error: Permission denied executing command '{cmd}'" | |
except Exception as e: | |
return f"Error executing command '{cmd}': {str(e)}" | |
def finish(answer: str): | |
"""Conclude the trajectory and return the final answer.""" | |
return answer | |
def fn_metadata(func): | |
sig = inspect.signature(func) | |
doc = inspect.getdoc(func) or "No docstring." | |
return dict(function_name=func.__name__, arguments=str(sig), docstring=doc) | |
# UI functions for clean display | |
from rich.console import Console | |
console = Console() | |
def confirm_cmd(cmd: str) -> bool: | |
console.print() | |
console.print("│ Execute command:", style="dim") | |
console.print(f"│ [bold cyan]$ {cmd}[/bold cyan]") | |
resp = console.input("│ Continue? [dim](y/n)[/dim] ") | |
return resp.lower() == 'y' | |
def show_step(item: Dict[str, Any]): | |
fn = item['selected_fn'] | |
args = item['args'] | |
# Function call line | |
console.print(f"\n[dim]→[/dim] [bold]{fn}[/bold]", end="") | |
# Show args inline if simple, otherwise on new line | |
if args: | |
args_str = ', '.join(f"{k}={repr(v)[:50]}" for k, v in args.items()) | |
if len(args_str) < 40: | |
console.print(f"[dim]({args_str})[/dim]") | |
else: | |
console.print() | |
for k, v in args.items(): | |
val_str = repr(v) | |
if len(val_str) > 60: | |
val_str = val_str[:57] + "..." | |
console.print(f" [dim]{k}:[/dim] {val_str}") | |
else: | |
console.print() | |
# Show full reasoning (no truncation) | |
if fn != "finish": | |
reason = item['reasoning'] | |
lines = reason.split('\n') | |
for line in lines: | |
if line.strip(): | |
console.print(f" [dim italic]{line}[/dim italic]") | |
# Show output if it's short and relevant | |
output = str(item.get('fn_output', '')) | |
if output and len(output) < 100 and fn != "finish": | |
console.print(f" [dim]↳[/dim] [green]{output[:80]}[/green]") | |
def get_input() -> Optional[str]: | |
console.print() | |
msg = console.input("[bold]>[/bold] ") | |
return None if msg.lower() in ["/exit", "/quit", "/q"] else msg | |
def show_answer(answer: str): | |
console.print() | |
console.print("─" * 40, style="dim") | |
if len(answer) < 100: | |
console.print(answer) | |
else: | |
lines = answer.split('\n') | |
for line in lines: | |
if line.strip(): | |
console.print(line) | |
console.print("─" * 40, style="dim") | |
def show_prompt(): | |
console.clear() | |
console.print("[bold]DSPy Agent CLI[/bold] [dim]• Type '/exit' to quit[/dim]\n") | |
def main(): | |
# Parse command line arguments | |
parser = argparse.ArgumentParser(description='DSPy Agent CLI') | |
parser.add_argument('--max-steps', type=int, default=10, | |
help='Maximum number of steps for the agent (default: 10)') | |
args = parser.parse_args() | |
# Setup | |
tools = { | |
"list_files": list_files, | |
"read_file": read_file, | |
"write_file": write_file, | |
"run_cmd": run_cmd, | |
"finish": finish | |
} | |
tools_metadata = {nm: fn_metadata(fn) for nm, fn in tools.items()} | |
lm = dspy.LM("groq/moonshotai/kimi-k2-instruct", | |
api_key=os.environ["GROQ_API_KEY"]) | |
dspy.configure(lm=lm) | |
sig = dspy.Signature('question, trajectory, functions, history -> next_selected_fn, args: dict[str, Any]') | |
react = dspy.ChainOfThought(sig) | |
max_steps = args.max_steps | |
# Main loop | |
show_prompt() | |
history = dspy.History(messages=[]) | |
while True: | |
msg = get_input() | |
if msg is None: | |
break | |
# Agent logic moved into main loop | |
traj = [] | |
for _ in range(max_steps): | |
pred = react(question=msg, trajectory=traj, | |
functions=tools_metadata, history=history) | |
fn_name = pred.next_selected_fn.strip('"').strip("'") | |
# Handle command confirmation | |
if fn_name == "run_cmd": | |
if not confirm_cmd(pred.args['cmd']): | |
output = "Command execution cancelled by user" | |
else: | |
output = tools[fn_name](**pred.args) | |
else: | |
output = tools[fn_name](**pred.args) | |
item = dict(reasoning=pred.reasoning, selected_fn=fn_name, | |
args=pred.args, fn_output=output) | |
traj.append(item) | |
show_step(item) | |
if fn_name == "finish": | |
break | |
history.messages.append({"message": msg, "answer": output}) | |
show_answer(output) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
with
uv
installed, run it like this:Click on the raw button on the top right.
Copy the url.
uv run <url>
in your terminal.You need a
GROQ_API_KEY
in your environment (export GROQ_API_KEY=<api_key>
when in doubt).The max_steps of one agent action can be defined as a argument like
--max-steps=30