Skip to content

Instantly share code, notes, and snippets.

@g023
Created June 29, 2026 00:40
Show Gist options
  • Select an option

  • Save g023/7f6ec74f7634d939ca8066342fb95970 to your computer and use it in GitHub Desktop.

Select an option

Save g023/7f6ec74f7634d939ca8066342fb95970 to your computer and use it in GitHub Desktop.
DeepSeek v4 Harness - g023's DS Code
#!/usr/bin/env python3
"""
DeepSeek Harness – A single‑file agentic orchestrator for DeepSeek V4+ models.
Supports caching, subagents, skills, memory (PEEK‑style compression), tool calling,
wait/sleep, and background process management.
Usage:
python script.py goal "Your goal" [--model MODEL] [--reasoning LEVEL] [--no-cache]
python script.py init
python script.py --interactive [--no-cache]
API key is read from, in order of preference:
1. ../K_DS.dat (relative to this script)
2. the API_KEY variable below
3. the DEEPSEEK_API_KEY environment variable
Base URL can be overridden via DEEPSEEK_BASE_URL.
"""
from __future__ import annotations # for nicer type hints on 3.9+
import asyncio
import aiohttp
import json
import os
import sys
import hashlib
import time
import re
import traceback
import logging
import argparse
import uuid
from pathlib import Path
from typing import Dict, List, Any, Optional, Callable, Union
import venv as _venv # avoid clashing with local variable name
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Module-wide tunables.  Everything below is read by the classes and helpers
# further down in this file; nothing here is re-read at runtime.
API_KEY = "" # set directly if desired; consulted by get_api_key() after ../K_DS.dat and before the env var
DEFAULT_MODEL = "deepseek-v4-flash" # adjust to actual model name
DEFAULT_REASONING = "high" # low / medium / high / none ("none" omits reasoning_effort from the request payload)
WORK_DIR_NAME = ".deepseek-harness" # working folder inside project root (the same literal is listed in EXCLUDED_DIRS)
CACHE_TTL = 3600 * 24 * 7 # 7 days, in seconds; cached API responses older than this are ignored
MAX_CONTEXT_TOKENS = 256000 # adjust per model; PeekMemory starts summarising at 80% of this
MAX_OUTPUT_TOKENS = 32767 # default max_tokens sent by DeepSeekClient.chat_completion
RATE_LIMIT_RPM = 10 # requests per minute (token-bucket refill rate)
MAX_CONCURRENT = 10 # passed to RateLimiter as the token-bucket capacity, i.e. the allowed burst size
RETRY_DELAY = 2.0 # base delay in seconds; retries back off as RETRY_DELAY * 2**n
MAX_RETRIES = 2 # total HTTP retries (outer loop)
MAX_RATE_ATTEMPTS = 3 # attempts per retry for 429 / transient (so up to MAX_RETRIES * MAX_RATE_ATTEMPTS requests)
REQUEST_TIMEOUT = 60 * 5 # seconds for HTTP requests (aiohttp total timeout)
MAX_CONSECUTIVE_API_ERRORS = 3 # stop loop after this many consecutive API failures
MAX_ITERATIONS = 999 # default max iterations for the main agent loop
SUMMARY_INTERVAL = 60.0 # seconds between summarization attempts
MAX_SEARCH_FILES = 1000 # default cap on files inspected by the search_files tool
# Directories / files that tools should never touch.
# _should_skip_path() compares every path component against EXCLUDED_DIRS;
# entries that start with "*." are treated as suffix wildcards.
EXCLUDED_DIRS = {
".git", "__pycache__", ".venv", "venv", "node_modules",
".deepseek-harness", ".mypy_cache", ".pytest_cache",
"dist", "build", "*.egg-info"
}
# Exact file names (final path component) that tools must skip.
EXCLUDED_FILES = {".DS_Store", "Thumbs.db"}
# Minimum Python version required (asyncio.to_thread is 3.9+)
# NOTE(review): the version check itself is not in this part of the file - confirm it exists.
REQUIRED_PYTHON = (3, 9)
# Configure root logging once at import time and create the module logger used throughout.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Utilities
# ---------------------------------------------------------------------------
def get_api_key() -> str:
    """Return the API key from the first source that provides a non-empty value.

    Sources, in order: the ``K_DS.dat`` file one directory above this script,
    the module-level ``API_KEY`` constant, and the ``DEEPSEEK_API_KEY``
    environment variable.

    Raises:
        ValueError: if no source yields a key.
    """
    file_key = ""
    try:
        key_file = Path(__file__).resolve().parent.parent / "K_DS.dat"
        if key_file.is_file():
            file_key = key_file.read_text().strip()
    except Exception:
        # Best effort: an unreadable key file simply falls through to the other sources.
        file_key = ""

    for candidate in (file_key, API_KEY, os.environ.get("DEEPSEEK_API_KEY")):
        if candidate:
            return candidate

    raise ValueError(
        "API key not set. Place key in ../K_DS.dat, set API_KEY, or set DEEPSEEK_API_KEY env var."
    )
def get_base_url() -> str:
    """Return the API base URL, honouring a ``DEEPSEEK_BASE_URL`` override if set."""
    override = os.environ.get("DEEPSEEK_BASE_URL")
    if override is None:
        return "https://api.deepseek.com"
    return override
def get_working_dir(project_root: Optional[Path] = None) -> Path:
    """Create (if needed) and return the harness working directory.

    The directory is ``<project_root>/<WORK_DIR_NAME>``; ``project_root``
    defaults to the current working directory.  Each of the standard
    sub-folders is created and given a ``.gitkeep`` marker file.
    """
    root = Path.cwd() if project_root is None else project_root
    work_dir = root / WORK_DIR_NAME
    work_dir.mkdir(exist_ok=True, parents=True)
    for folder_name in ("cache", "memory", "subagents", "tools", "skills"):
        folder = work_dir / folder_name
        folder.mkdir(exist_ok=True)
        (folder / ".gitkeep").touch(exist_ok=True)
    return work_dir
# Token counting
_tiktoken_available = False
_tiktoken_warned = False
_tiktoken_encoder = None  # cl100k_base encoder, cached after the first successful load


def count_tokens(text: str) -> int:
    """Return the number of tokens in ``text``.

    Uses tiktoken's ``cl100k_base`` encoding when it can be loaded.  If
    tiktoken cannot be imported or initialised, falls back to the
    ``len(text) // 4`` approximation and logs one warning for the process.

    The encoder is loaded once and reused, and special-token strings that
    happen to appear in ``text`` (for example inside tool output) are counted
    as ordinary text instead of raising ``ValueError``.
    """
    global _tiktoken_available, _tiktoken_warned, _tiktoken_encoder
    if _tiktoken_encoder is None:
        try:
            import tiktoken
            encoder = tiktoken.get_encoding("cl100k_base")
            encoder.encode("test")  # smoke test: proves the vocabulary is actually usable
        except Exception:
            if not _tiktoken_warned:
                logger.warning(
                    "tiktoken not available; using len(text)//4 approximation for token counts."
                )
                _tiktoken_warned = True
            return len(text) // 4
        _tiktoken_encoder = encoder
        _tiktoken_available = True
    # disallowed_special=() turns off tiktoken's default of raising when the
    # input contains text that looks like a special token.
    return len(_tiktoken_encoder.encode(text, disallowed_special=()))
def tokenize_messages(msgs: List[Dict[str, Any]]) -> int:
    """Return the token count of ``msgs`` serialised as JSON (non-ASCII kept verbatim)."""
    serialized = json.dumps(msgs, ensure_ascii=False)
    return count_tokens(serialized)
def truncate_text(text: str, max_tokens: int) -> str:
    """Return ``text`` cut down to at most ``max_tokens`` tokens.

    With tiktoken available the cut is made on ``cl100k_base`` token
    boundaries; on any failure the text is cut to ``max_tokens * 4``
    characters instead.
    """
    try:
        import tiktoken
        encoder = tiktoken.get_encoding("cl100k_base")
        token_ids = encoder.encode(text)
        fits = len(token_ids) <= max_tokens
        return text if fits else encoder.decode(token_ids[:max_tokens])
    except Exception:
        # Character-based approximation (roughly four characters per token).
        char_limit = max_tokens * 4
        return text[:char_limit]
def _get_venv_python(venv_path: Path) -> Path:
if os.name == 'nt':
return venv_path / "Scripts" / "python.exe"
else:
return venv_path / "bin" / "python"
def _should_skip_path(path: Path, project_root: Path, work_dir: Path) -> bool:
    """Return True if ``path`` must be hidden from the tools.

    A path is skipped when it lies inside ``work_dir``, when any component
    below ``project_root`` matches an entry of ``EXCLUDED_DIRS`` (entries
    starting with ``"*."`` are suffix wildcards), or when its file name is in
    ``EXCLUDED_FILES``.

    Callers pass already-resolved paths together with an unresolved
    ``project_root``; when the plain ``relative_to`` fails, both sides are
    resolved and the comparison is retried.

    Raises:
        ValueError: if ``path`` is not under ``project_root`` even after
            resolving both.
    """
    resolved = path.resolve()
    # Never access the working directory
    try:
        resolved.relative_to(work_dir.resolve())
        return True
    except ValueError:
        pass

    try:
        parts = path.relative_to(project_root).parts
    except ValueError:
        # Mixed resolved / unresolved inputs (symlinked or relative root).
        parts = resolved.relative_to(project_root.resolve()).parts

    # ".egg-info" for "*.egg-info"; computed once instead of per component.
    wildcard_suffixes = tuple(
        pattern[1:] for pattern in EXCLUDED_DIRS if pattern.startswith("*.")
    )
    for part in parts:
        if part in EXCLUDED_DIRS or part.endswith(wildcard_suffixes):
            return True

    # Check filename exclusion
    return path.name in EXCLUDED_FILES
# ---------------------------------------------------------------------------
# Rate limiter
# ---------------------------------------------------------------------------
class TokenBucket:
    """Token bucket that refills continuously at ``rate`` tokens per second.

    The bucket starts full (``capacity`` tokens).  ``acquire`` takes one token,
    sleeping until one has been refilled when the bucket is empty.
    """

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated_at = time.monotonic()
        self.lock = asyncio.Lock()

    def _take_or_delay(self):
        """Refill, then take a token; return ``None`` on success or the seconds to wait."""
        current = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (current - self.updated_at) * self.rate)
        self.updated_at = current
        if self.tokens < 1.0:
            return (1.0 - self.tokens) / self.rate
        self.tokens -= 1.0
        return None

    async def acquire(self):
        """Block until one token has been taken from the bucket."""
        while True:
            async with self.lock:
                delay = self._take_or_delay()
            if delay is None:
                return
            # Sleep outside the lock so other waiters can make progress.
            if delay > 0:
                await asyncio.sleep(delay)
class RateLimiter:
    """Thin wrapper that throttles callers with a ``TokenBucket``.

    ``rpm`` is converted to a per-second refill rate; ``max_concurrent`` is
    used as the bucket capacity.
    """

    def __init__(self, rpm: int, max_concurrent: int):
        per_second = rpm / 60.0
        self.bucket = TokenBucket(rate=per_second, capacity=max_concurrent)

    async def acquire(self):
        """Wait until the underlying bucket grants one token."""
        await self.bucket.acquire()
# ---------------------------------------------------------------------------
# DeepSeek API client
# ---------------------------------------------------------------------------
class DeepSeekClient:
    """Asynchronous client for the DeepSeek chat completions API.

    Use it as an async context manager (``async with DeepSeekClient(...)``) so
    the ``aiohttp`` session is opened and closed.  Every request goes through
    a ``RateLimiter``.  When a cache directory is supplied and caching is
    enabled, responses are stored on disk under a SHA-256 hash of the request
    payload and reused for ``CACHE_TTL`` seconds.
    """

    def __init__(
        self,
        api_key: str,
        model: str = DEFAULT_MODEL,
        reasoning_level: str = DEFAULT_REASONING,
        cache_dir: Optional[Path] = None,
        base_url: Optional[str] = None,
        cache_enabled: bool = True,
    ):
        self.api_key = api_key
        self.model = model
        self.reasoning_level = reasoning_level
        self.base_url = base_url or get_base_url()
        self.session: Optional[aiohttp.ClientSession] = None
        self.rate_limiter = RateLimiter(RATE_LIMIT_RPM, MAX_CONCURRENT)
        self.cache_dir = cache_dir
        # Caching needs somewhere to write, so it is off without a cache_dir.
        self.cache_enabled = cache_enabled and cache_dir is not None

    def _ensure_session(self) -> None:
        """Raise ``RuntimeError`` if the HTTP session has not been opened."""
        if self.session is None:
            raise RuntimeError(
                "DeepSeekClient must be used inside an 'async with' block to manage the HTTP session."
            )

    async def __aenter__(self):
        timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)
        self.session = aiohttp.ClientSession(timeout=timeout)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if self.session:
            await self.session.close()
            # Drop the closed session so later use fails in _ensure_session
            # with a clear message instead of inside aiohttp.
            self.session = None

    async def check_connectivity(self) -> bool:
        """Return True if ``GET {base_url}/models`` answers with HTTP 200."""
        self._ensure_session()
        try:
            async with self.session.get(
                f"{self.base_url}/models",
                headers={"Authorization": f"Bearer {self.api_key}"}
            ) as resp:
                if resp.status == 200:
                    logger.info("API connectivity OK.")
                    return True
                text = await resp.text()
                logger.error(f"API health check failed: {resp.status} {text}")
                return False
        except Exception as e:
            logger.error(f"API health check error: {e}")
            return False

    @staticmethod
    def _payload_hash(payload: dict) -> str:
        """Return the SHA-256 hex digest of the key-sorted JSON form of ``payload``."""
        return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()

    def _cache_path(self, payload: dict) -> Path:
        """Return the cache file path for ``payload`` (requires ``cache_dir``)."""
        return self.cache_dir / f"api_{self._payload_hash(payload)}.json"

    def clear_cache(self) -> None:
        """Delete every cached API response in ``cache_dir``."""
        if self.cache_dir and self.cache_dir.exists():
            for f in self.cache_dir.glob("api_*.json"):
                f.unlink()
            logger.info("API response cache cleared.")

    async def _cached_request(self, payload: dict) -> dict:
        """Return the response for ``payload``, served from the disk cache when fresh."""
        if not self.cache_enabled:
            return await self._do_request(payload)

        digest = self._payload_hash(payload)  # hashed once, used for file name and record
        cache_file = self.cache_dir / f"api_{digest}.json"
        if cache_file.exists():
            try:
                cached = json.loads(cache_file.read_text())
                if time.time() - cached.get("timestamp", 0) < CACHE_TTL:
                    logger.debug("Using cached API response")
                    return cached["response"]
            except Exception as e:
                # A corrupt or unreadable entry is treated as a cache miss.
                logger.debug(f"Ignoring unreadable cache file {cache_file}: {e}")

        response = await self._do_request(payload)
        try:
            cache_file.parent.mkdir(parents=True, exist_ok=True)
            # Write to a temporary file and rename so readers never see a partial entry.
            tmp_file = cache_file.with_suffix(".tmp")
            tmp_file.write_text(json.dumps({
                "timestamp": time.time(),
                "payload_hash": digest,
                "response": response
            }))
            tmp_file.replace(cache_file)
        except Exception as e:
            logger.warning(f"Failed to write cache file {cache_file}: {e}")
        return response

    async def _do_request(self, payload: dict) -> dict:
        """POST ``payload`` to ``/chat/completions`` with retries and return the JSON body.

        Up to ``MAX_RETRIES`` rounds of ``MAX_RATE_ATTEMPTS`` attempts are made.
        HTTP 429 waits for ``Retry-After`` (plus one second); 5xx responses and
        transport errors back off exponentially from ``RETRY_DELAY``; any other
        4xx is raised immediately.

        Raises:
            aiohttp.ClientResponseError: for a non-retryable 4xx, or as the
                last error once all attempts are exhausted.
            Exception: the last transport-level error once attempts are exhausted.
            RuntimeError: if the retry constants allow no attempt at all.
        """
        self._ensure_session()
        url = f"{self.base_url}/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        last_exc: Optional[Exception] = None
        for retry in range(MAX_RETRIES):
            for attempt in range(MAX_RATE_ATTEMPTS):
                try:
                    await self.rate_limiter.acquire()
                    # "async with" releases the connection on every path,
                    # including the 429 branch that never needs the body.
                    async with self.session.post(url, headers=headers, json=payload) as resp:
                        if resp.status == 200:
                            return await resp.json()
                        body = await resp.text()
                        retry_after = resp.headers.get("Retry-After", "5")
                        error = aiohttp.ClientResponseError(
                            resp.request_info, resp.history, status=resp.status, message=body
                        )
                    if error.status != 429:
                        raise error
                    # Remember the 429 so exhausting every attempt raises a real
                    # exception instead of "raise None".
                    last_exc = error
                    try:
                        delay = float(retry_after) + 1
                    except ValueError:
                        # Retry-After may be an HTTP-date; use the default wait.
                        delay = 6.0
                    logger.warning(f"Rate limited (429), waiting {delay}s")
                    await asyncio.sleep(delay)
                    continue
                except aiohttp.ClientResponseError as e:
                    if 400 <= e.status < 500 and e.status != 429:
                        raise  # non-retryable
                    last_exc = e
                    if attempt < MAX_RATE_ATTEMPTS - 1:
                        delay = RETRY_DELAY * (2 ** attempt)
                        logger.warning(
                            f"API error (status {e.status}), retrying in {delay}s: {e}"
                        )
                        await asyncio.sleep(delay)
                    else:
                        break
                except Exception as e:
                    last_exc = e
                    if attempt < MAX_RATE_ATTEMPTS - 1:
                        delay = RETRY_DELAY * (2 ** attempt)
                        logger.warning(
                            f"API error ({type(e).__name__}), retrying in {delay}s: {e}"
                        )
                        await asyncio.sleep(delay)
                    else:
                        break
            if retry < MAX_RETRIES - 1:
                delay = RETRY_DELAY * (2 ** retry)
                logger.warning(f"API request retry {retry+1}/{MAX_RETRIES} in {delay}s: {last_exc}")
                await asyncio.sleep(delay)
            elif last_exc is not None:
                raise last_exc
        raise RuntimeError("API request failed: no attempt was made or no error was recorded.")

    async def chat_completion(
        self,
        messages: List[Dict[str, Any]],
        tools: Optional[List[Dict[str, Any]]] = None,
        tool_choice: Optional[str] = None,
        max_tokens: int = MAX_OUTPUT_TOKENS,
        temperature: float = 0.7,
        model: Optional[str] = None,
        reasoning_effort: Optional[str] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """Send a chat completion request and return the decoded JSON response.

        ``model`` and ``reasoning_effort`` default to the values given at
        construction; a reasoning effort of ``"none"`` (any case) is left out
        of the payload.  ``tool_choice`` is only sent together with ``tools``.
        Extra keyword arguments are copied into the payload unchanged.
        """
        payload: Dict[str, Any] = {
            "model": model or self.model,
            "messages": messages,
            "max_tokens": max_tokens,
            "temperature": temperature,
            **kwargs
        }
        if tools:
            payload["tools"] = tools
            # tool_choice is meaningless without tools, so it is only sent with them.
            if tool_choice:
                payload["tool_choice"] = tool_choice
        effort = reasoning_effort if reasoning_effort is not None else self.reasoning_level
        if effort and effort.lower() != "none":
            payload["reasoning_effort"] = effort
        return await self._cached_request(payload)
# ---------------------------------------------------------------------------
# Memory with PEEK‑inspired compression
# ---------------------------------------------------------------------------
class PeekMemory:
    """Conversation memory with PEEK-inspired rolling summarisation.

    All messages are kept in ``self.messages``.  Once the history exceeds 80%
    of ``max_tokens`` (and the summary interval has elapsed), everything
    outside the ``keep_recent`` most recent messages is folded into
    ``compressed_summary``.  The context handed to the model is the summary
    followed by every message that has not been summarised yet, so nothing is
    dropped before it has been captured in the summary.
    """

    def __init__(
        self,
        client: DeepSeekClient,
        max_tokens: int = MAX_CONTEXT_TOKENS,
        keep_recent: int = 15,
        summary_max_tokens: int = 2000,
        summary_interval: float = SUMMARY_INTERVAL,
    ):
        self.client = client
        self.max_tokens = max_tokens
        self.keep_recent = keep_recent
        self.summary_max_tokens = summary_max_tokens
        self.messages: List[Dict[str, Any]] = []
        self.compressed_summary: Optional[str] = None
        # Index of the first message NOT yet covered by compressed_summary.
        self.last_summary_index = 0
        self.last_summary_time = 0.0
        self.summary_interval = summary_interval
        self._lock = asyncio.Lock()

    async def add_message(
        self, role: str, *, content: Optional[str] = None, **kwargs
    ) -> None:
        """Append a message; only the message fields listed in ``allowed_keys`` are stored."""
        allowed_keys = {"role", "content", "tool_calls", "tool_call_id", "name", "reasoning_content"}
        entry: Dict[str, Any] = {"role": role}
        if content is not None:
            entry["content"] = content
        entry.update(
            (k, v) for k, v in kwargs.items() if k != "content" and k in allowed_keys
        )
        async with self._lock:
            self.messages.append(entry)

    async def add_tool_result(self, tool_call_id: str, content: str) -> None:
        """Append a ``tool`` message for ``tool_call_id``, truncated to 2000 tokens."""
        # Truncate tool output to save tokens
        truncated = truncate_text(content, 2000)
        async with self._lock:
            self.messages.append({
                "role": "tool",
                "tool_call_id": tool_call_id,
                "content": truncated
            })

    async def get_context_messages(self) -> List[Dict[str, Any]]:
        """Return the messages to send to the model, summarising first if needed.

        The result starts with a system message carrying the summary (or a
        placeholder), followed by all messages not yet summarised, trimmed to
        the token budget and stripped of tool results whose call is missing.
        """
        old_msgs: List[Dict[str, Any]] = []
        async with self._lock:
            total_tokens = tokenize_messages(self.messages)
            now = time.monotonic()
            summary_due = (
                not self.compressed_summary
                or (now - self.last_summary_time) > self.summary_interval
            )
            # Summarize if tokens exceed threshold and enough time has passed since last summary
            if total_tokens > self.max_tokens * 0.8 and summary_due:
                start_idx = self.last_summary_index
                end_idx = max(0, len(self.messages) - self.keep_recent)
                if start_idx < end_idx:
                    old_msgs = self.messages[start_idx:end_idx].copy()
                else:
                    # Nothing new outside the recent window: re-summarise
                    # everything before it (empty when history is short).
                    old_msgs = self.messages[:end_idx].copy()
                self.last_summary_index = end_idx

        if old_msgs:
            # The API call happens outside the lock so writers are not blocked.
            new_summary = await self._generate_summary_async(
                old_msgs, self.compressed_summary
            )
            async with self._lock:
                self.compressed_summary = new_summary
                self.last_summary_time = time.monotonic()

        async with self._lock:
            # Build the context
            if self.compressed_summary:
                system_msg = {
                    "role": "system",
                    "content": f"[Previous conversation summary]:\n{self.compressed_summary}"
                }
            else:
                system_msg = {"role": "system", "content": "No previous summary."}
            # Send everything that is not represented in the summary.  Slicing
            # a fixed "last keep_recent" window here would silently drop the
            # system prompt and the goal long before any summary exists.
            pending = self.messages[self.last_summary_index:]
            combined = [system_msg] + pending
            # Enforce token budget
            combined = await self._enforce_token_budget(combined)
            combined = self._remove_orphaned_tool_messages(combined)
            return combined

    def _remove_orphaned_tool_messages(
        self, messages: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Drop ``tool`` messages whose ``tool_call_id`` has no assistant tool call in ``messages``."""
        valid_ids = {
            tc["id"]
            for m in messages
            if m.get("role") == "assistant" and "tool_calls" in m
            for tc in m["tool_calls"]
        }
        return [
            m for m in messages
            if m.get("role") != "tool" or m.get("tool_call_id") in valid_ids
        ]

    async def _enforce_token_budget(
        self, messages: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Shrink ``messages`` in place until it fits ``max_tokens`` and return it.

        The leading system message is truncated first; if that is not enough,
        the oldest messages after it are removed one by one.
        """
        # First truncate system message if over budget
        total = tokenize_messages(messages)
        if total > self.max_tokens and messages:
            sys_msg = messages[0]
            excess = total - self.max_tokens
            current_tokens = count_tokens(sys_msg["content"])
            new_len = max(0, current_tokens - excess)
            sys_msg["content"] = truncate_text(sys_msg["content"], new_len)
            total = tokenize_messages(messages)
        # If still over, remove oldest recent messages (after system)
        while total > self.max_tokens and len(messages) > 1:
            removed = messages.pop(1)
            total -= tokenize_messages([removed])
        return messages

    async def _generate_summary_async(
        self, new_msgs: List[Dict[str, Any]], previous_summary: Optional[str]
    ) -> str:
        """Return a summary of ``new_msgs``, extending ``previous_summary`` when given.

        Falls back to the truncated raw text if the API call fails or the
        model returns no content, so the result is never empty for non-empty input.
        """
        if previous_summary:
            prompt = (
                "You are continuing a conversation summary. Below is the previous summary, "
                "followed by new messages. Update the summary to include all important facts, "
                "decisions, and the current state.\n\n"
                f"Previous summary: {previous_summary}\n\nNew messages:\n"
            )
        else:
            prompt = (
                "Summarise the following conversation history in a concise paragraph. "
                "Retain all important facts, decisions, and the current state.\n\n"
            )
        text_parts = []
        for m in new_msgs:
            role = m["role"]
            content = m.get("content", "")
            if role == "tool":
                text_parts.append(
                    f"Tool result (id={m.get('tool_call_id','')}): {content}"
                )
            else:
                text_parts.append(f"{role}: {content}")
        full_text = "\n".join(text_parts)
        # Truncate to a reasonable length (larger than before)
        prompt += truncate_text(full_text, 8000)
        try:
            resp = await self.client.chat_completion(
                messages=[{"role": "user", "content": prompt}],
                max_tokens=self.summary_max_tokens,
                temperature=0.3,
                reasoning_effort="none",  # no need for reasoning on summary
                model=self.client.model,  # use main model but could be overridden
            )
            summary = resp["choices"][0]["message"]["content"]
        except Exception as e:
            logger.error(f"Summary generation failed: {e}. Falling back to truncation.")
            return truncate_text(full_text, self.summary_max_tokens)
        if not summary:
            # An empty summary would erase the summarised history from context.
            logger.error("Summary generation returned no content. Falling back to truncation.")
            return truncate_text(full_text, self.summary_max_tokens)
        return summary

    async def reset(self) -> None:
        """Forget all messages and any existing summary."""
        async with self._lock:
            self.messages.clear()
            self.compressed_summary = None
            self.last_summary_index = 0
            self.last_summary_time = 0.0
# ---------------------------------------------------------------------------
# Tool registry
# ---------------------------------------------------------------------------
class ToolRegistry:
    """Maps tool names to callables and keeps their function-calling definitions."""

    def __init__(self) -> None:
        self.tools: Dict[str, Callable] = {}
        self.definitions: List[Dict[str, Any]] = []

    def register(
        self, name: str, func: Callable, description: str, parameters: Dict[str, Any]
    ) -> None:
        """Register ``func`` under ``name``.

        Registering a name again replaces both the callable and its
        definition, so the definition list never holds duplicates.
        """
        definition = {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": parameters
            }
        }
        for index, existing in enumerate(self.definitions):
            if existing["function"]["name"] == name:
                self.definitions[index] = definition
                break
        else:
            self.definitions.append(definition)
        self.tools[name] = func

    def get_definitions(self) -> List[Dict[str, Any]]:
        """Return the list of tool definitions in registration order."""
        return self.definitions

    async def execute(self, tool_call: Dict[str, Any]) -> str:
        """Run the tool named in ``tool_call`` and return its result as a string.

        ``tool_call["function"]["arguments"]`` is a JSON object encoded as a
        string; an empty or missing value means "no arguments".  Problems
        (unknown tool, malformed arguments, an exception inside the tool) are
        returned as error strings rather than raised, so the caller can hand
        them back to the model.  Coroutine functions are awaited; plain
        functions run in a worker thread.  Non-string results are JSON-encoded.
        """
        name = tool_call["function"]["name"]
        func = self.tools.get(name)
        if func is None:
            return f"Error: tool '{name}' not found"

        raw_args = tool_call["function"].get("arguments") or "{}"
        try:
            args = json.loads(raw_args) if isinstance(raw_args, str) else raw_args
        except json.JSONDecodeError as e:
            return f"Error: invalid JSON arguments for tool '{name}': {e}"
        if args is None:
            args = {}
        if not isinstance(args, dict):
            return f"Error: arguments for tool '{name}' must be a JSON object"

        try:
            if asyncio.iscoroutinefunction(func):
                result = await func(**args)
            else:
                result = await asyncio.to_thread(func, **args)
            return result if isinstance(result, str) else json.dumps(result, default=str)
        except Exception:
            return f"Tool error: {traceback.format_exc()}"
# ---------------------------------------------------------------------------
# Background process manager
# ---------------------------------------------------------------------------
class BackgroundProcessManager:
    """Starts shell commands in the background and tracks them by short job id.

    Each job record holds the process, the task collecting its output, the
    combined output once finished, and a status of ``running``, ``completed``
    or ``error``.
    """

    def __init__(self) -> None:
        self.jobs: Dict[str, Dict[str, Any]] = {}

    async def start(self, command: str) -> str:
        """Launch ``command`` through the shell and return the new job id."""
        job_id = uuid.uuid4().hex[:8]
        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        async def collect() -> str:
            # stdout and stderr are concatenated, separated by a newline.
            out, err = await process.communicate()
            return out.decode(errors='replace') + "\n" + err.decode(errors='replace')

        task = asyncio.create_task(collect())
        self.jobs[job_id] = {
            "process": process,
            "task": task,
            "output": None,
            "status": "running"
        }

        def on_done(finished: asyncio.Task) -> None:
            # Record the outcome on the job once the collector task ends.
            try:
                text, state = finished.result(), "completed"
            except Exception as e:
                text, state = f"Exception: {e}", "error"
            job = self.jobs[job_id]
            job["output"] = text
            job["status"] = state

        task.add_done_callback(on_done)
        return job_id

    def status(self, job_id: str) -> Optional[str]:
        """Return the job's status string, or ``None`` for an unknown id."""
        job = self.jobs.get(job_id)
        if not job:
            return None
        return job["status"]

    def output(self, job_id: str) -> Optional[str]:
        """Return the job's output once finished, a placeholder while running, ``None`` if unknown."""
        job = self.jobs.get(job_id)
        if not job:
            return None
        finished = job["status"] in ("completed", "error")
        return job["output"] if finished else "Job still running."

    async def kill(self, job_id: str) -> str:
        """Kill the job's process and return a message describing the outcome."""
        job = self.jobs.get(job_id)
        if not job:
            return "Job not found."
        target = job["process"]
        try:
            target.kill()
            await target.wait()
        except Exception as e:
            return f"Error killing process: {e}"
        return "Process killed."
# ---------------------------------------------------------------------------
# Subagent manager
# ---------------------------------------------------------------------------
class SubagentManager:
    """Runs subagents: short-lived agents with their own memory and goal."""

    def __init__(self, main_agent: "Agent", client: DeepSeekClient) -> None:
        self.main_agent = main_agent
        self.client = client

    async def run_subagent(self, goal: str, instructions: str = "", max_iter: int = 15) -> str:
        """Run a subagent on ``goal`` and return its final answer.

        The subagent gets a fresh ``PeekMemory`` and its own
        ``BackgroundProcessManager`` but shares the main agent's client,
        directories and tool registry.  ``instructions`` becomes its system
        message when non-empty; ``max_iter`` caps its loop iterations.
        """
        sub_memory = PeekMemory(client=self.client, max_tokens=MAX_CONTEXT_TOKENS)
        if instructions:
            # An empty system message carries no information, so it is not sent.
            await sub_memory.add_message("system", content=instructions)
        await sub_memory.add_message("user", content=f"Goal: {goal}")

        # The registry object is shared as-is, so its tool functions remain
        # bound to whichever agent they were built for.
        # NOTE(review): background tools reached through the shared registry
        # therefore presumably still use that agent's background manager, not
        # the fresh one passed below - confirm whether that is intended.
        sub_agent = Agent(
            client=self.client,
            working_dir=self.main_agent.working_dir,
            project_root=self.main_agent.project_root,
            tool_registry=self.main_agent.tool_registry,  # share tools
            memory=sub_memory,
            model=self.client.model,
            reasoning_level=self.client.reasoning_level,
            background_manager=BackgroundProcessManager(),
        )
        return await sub_agent._run_loop(max_iterations=max_iter)
# ---------------------------------------------------------------------------
# Skill manager
# ---------------------------------------------------------------------------
class SkillManager:
    """Loads and stores skills: Markdown files in ``skills_dir`` keyed by file stem."""

    def __init__(self, skills_dir: Path) -> None:
        self.skills_dir = skills_dir
        self.skills: Dict[str, str] = {}
        self.load_skills()

    def load_skills(self) -> None:
        """Read every ``*.md`` file in ``skills_dir`` into ``self.skills``.

        Files are visited in sorted order.  A file that cannot be read or
        decoded as UTF-8 is skipped with a warning so that one bad file does
        not prevent the manager from being created.
        """
        if not self.skills_dir.exists():
            return
        for skill_file in sorted(self.skills_dir.glob("*.md")):
            try:
                self.skills[skill_file.stem] = skill_file.read_text(encoding="utf-8")
            except (OSError, UnicodeDecodeError) as e:
                logger.warning(f"Skipping unreadable skill file {skill_file}: {e}")

    def get(self, name: str) -> Optional[str]:
        """Return the content of skill ``name``, or ``None`` if it is unknown."""
        return self.skills.get(name)

    def list_names(self) -> List[str]:
        """Return the names of all loaded skills."""
        return list(self.skills.keys())

    def install(self, name: str, content: str) -> str:
        """Write a new skill file and register it.

        Characters in ``name`` other than word characters and ``-`` are
        replaced by ``_``.  The skills directory is created when missing.
        """
        safe_name = re.sub(r'[^\w\-]', '_', name)
        self.skills_dir.mkdir(parents=True, exist_ok=True)
        path = self.skills_dir / f"{safe_name}.md"
        path.write_text(content, encoding='utf-8')
        self.skills[safe_name] = content
        return f"Skill '{safe_name}' installed."
# ---------------------------------------------------------------------------
# Main Agent (orchestrator)
# ---------------------------------------------------------------------------
class Agent:
    """Main orchestrator: drives the model/tool loop for a goal.

    Each iteration sends the current memory context to the model, stores the
    reply, executes any requested tool calls and records their results.  The
    loop ends when the model answers without tool calls, after too many
    consecutive API failures, or when the iteration limit is reached.
    """

    def __init__(
        self,
        client: DeepSeekClient,
        working_dir: Path,
        project_root: Path,
        tool_registry: ToolRegistry,
        memory: Optional[PeekMemory] = None,
        model: str = DEFAULT_MODEL,
        reasoning_level: str = DEFAULT_REASONING,
        background_manager: Optional[BackgroundProcessManager] = None,
    ) -> None:
        self.client = client
        self.working_dir = working_dir
        self.project_root = project_root
        self.tool_registry = tool_registry
        self.memory = memory or PeekMemory(client=client)
        # NOTE(review): model / reasoning_level are stored but the visible loop
        # relies on the client's own defaults - confirm that is intended.
        self.model = model
        self.reasoning_level = reasoning_level
        self.skill_manager = SkillManager(working_dir / "skills")
        self.subagent_manager = SubagentManager(self, client)
        self.background_manager = background_manager or BackgroundProcessManager()
        self._last_token_report = 0.0

    async def set_system_prompt(self) -> None:
        """Add the system prompt (project paths, tools, skills, usage hints) to memory."""
        tools_desc = "\n".join(
            f"- {d['function']['name']}: {d['function']['description']}"
            for d in self.tool_registry.get_definitions()
        )
        skills = self.skill_manager.list_names()
        skills_desc = ", ".join(skills) if skills else "(none)"
        prompt = (
            f"You are an autonomous AI agent working on the project at {self.project_root}.\n"
            f"Your internal workspace is at {self.working_dir} (hidden from you).\n"
            f"Available tools:\n{tools_desc}\n\n"
            f"Available skills (use load_skill): {skills_desc}\n\n"
            "You can spawn subagents with spawn_subagent.\n"
            "When using shell_exec for commands that may produce large output, set summarize=true to save tokens.\n"
            "Plan your steps using reasoning, and provide a final answer when the task is complete.\n"
            "If the API returns an error, the error will be reported to you; you may wait or try a different approach."
        )
        await self.memory.add_message("system", content=prompt)

    def _resolve_safe_path(self, path_str: str) -> Optional[Path]:
        """Resolve ``path_str`` against the project root.

        Returns the resolved path, or ``None`` when it falls outside the
        project root, inside the working directory, or under an excluded
        directory or file name.
        """
        # Resolve the root once so every comparison below uses the same form;
        # mixing a resolved path with an unresolved root makes relative_to fail.
        root = self.project_root.resolve()
        wd_resolved = self.working_dir.resolve()
        full = (root / path_str).resolve()
        try:
            full.relative_to(root)
        except ValueError:
            return None
        # Block access to working directory
        if full == wd_resolved or wd_resolved in full.parents:
            return None
        # Additionally, skip paths that would be excluded by our directory policy
        if _should_skip_path(full, root, self.working_dir):
            return None
        return full

    async def run_goal(self, goal: str, max_iterations: int = MAX_ITERATIONS) -> str:
        """Record ``goal`` as a user message and run the agent loop on it."""
        await self.memory.add_message("user", content=f"Goal: {goal}")
        return await self._run_loop(max_iterations)

    async def _run_loop(self, max_iterations: int = MAX_ITERATIONS) -> str:
        """Run the model/tool loop and return the final answer or a status message."""
        iteration = 0
        consecutive_errors = 0
        while iteration < max_iterations:
            iteration += 1
            logger.info(f"Iteration {iteration}")
            context = await self.memory.get_context_messages()
            tool_defs = self.tool_registry.get_definitions() or None
            try:
                resp = await self.client.chat_completion(
                    messages=context,
                    tools=tool_defs,
                    # tool_choice is only meaningful when tools are offered.
                    tool_choice="auto" if tool_defs else None
                )
                # A response without choices is handled like any other API failure.
                choice = resp["choices"][0]
                # Log token usage
                usage = resp.get("usage", {})
                if usage:
                    prompt_tokens = usage.get("prompt_tokens", 0)
                    completion_tokens = usage.get("completion_tokens", 0)
                    total_tokens = usage.get("total_tokens", prompt_tokens + completion_tokens)
                    logger.info(
                        f"Tokens: prompt={prompt_tokens}, completion={completion_tokens}, total={total_tokens}"
                    )
                consecutive_errors = 0
            except Exception as e:
                logger.error(f"API call failed: {e}")
                consecutive_errors += 1
                error_msg = f"API error during iteration {iteration}: {e}"
                await self.memory.add_message("system", content=error_msg)
                if consecutive_errors >= MAX_CONSECUTIVE_API_ERRORS:
                    return f"Too many consecutive API errors ({consecutive_errors}). Last error: {e}"
                continue

            msg = choice.get("message", {})
            reasoning = msg.get("reasoning_content")
            content = msg.get("content", "")
            tool_calls = msg.get("tool_calls", [])
            if reasoning:
                logger.info(f"Reasoning: {reasoning[:200]}...")
            assistant_kwargs: Dict[str, Any] = {"content": content}
            if reasoning:
                assistant_kwargs["reasoning_content"] = reasoning
            if tool_calls:
                assistant_kwargs["tool_calls"] = tool_calls
            await self.memory.add_message("assistant", **assistant_kwargs)

            if not tool_calls:
                if content:
                    logger.info("Agent provided final answer.")
                    return content
                logger.warning("No tool calls and no content; ending.")
                return "Model did not produce a response."

            for tc in tool_calls:
                tool_id = tc["id"]
                tool_name = tc["function"]["name"]
                args_str = tc["function"]["arguments"]
                logger.info(f"Executing tool: {tool_name} with args: {args_str[:200]}")
                try:
                    result = await self.tool_registry.execute(tc)
                except Exception as e:
                    # Every tool call must receive a result message; report the
                    # failure to the model instead of aborting the whole run.
                    result = f"Tool error: {e}"
                # Log truncated result for monitoring
                logger.info(f"Tool {tool_name} result: {result[:200]}...")
                await self.memory.add_tool_result(tool_id, result)
        return "Max iterations reached without final answer."
# ---------------------------------------------------------------------------
# Tool definitions
# ---------------------------------------------------------------------------
def _build_tool_registry(agent: Agent) -> ToolRegistry:
registry = ToolRegistry()
# File read
def read_file(path: str, start_line: Optional[int] = None, end_line: Optional[int] = None) -> str:
    """Return the UTF-8 text of a project file, optionally limited to a line range.

    ``start_line`` and ``end_line`` are 1-indexed and inclusive.  Every
    problem (unsafe path, missing file, directory, bad range) is reported as
    an ``Error: ...`` string.
    """
    full = agent._resolve_safe_path(path)
    if not full:
        return "Error: path outside project root, excluded, or in working directory"
    if not full.exists():
        return f"Error: file {path} not found"
    if not full.is_file():
        return f"Error: {path} is not a regular file"
    if start_line is not None and start_line < 1:
        return "Error: start_line must be >= 1"
    # A non-positive end_line would slice from the wrong end of the file.
    if end_line is not None and end_line < 1:
        return "Error: end_line must be >= 1"
    if end_line is not None and start_line is not None and end_line < start_line:
        return "Error: end_line must be >= start_line"
    content = full.read_text(encoding="utf-8")
    if start_line is None and end_line is None:
        return content
    lines = content.splitlines()
    first = (start_line or 1) - 1
    last = len(lines) if end_line is None else end_line
    return "\n".join(lines[first:last])
registry.register("read_file", read_file, "Read a file from the project. Optionally specify line range.", {
"type": "object",
"properties": {
"path": {"type": "string", "description": "Relative path"},
"start_line": {"type": "integer", "description": "First line (1-indexed, optional)"},
"end_line": {"type": "integer", "description": "Last line (inclusive, optional)"}
},
"required": ["path"]
})
# File write
def write_file(path: str, content: str) -> str:
    """Overwrite (or create) a project file with ``content`` encoded as UTF-8.

    Missing parent directories are created.  Returns a confirmation message,
    or an error string when the path is not allowed.
    """
    target = agent._resolve_safe_path(path)
    if target:
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding="utf-8")
        return f"Successfully wrote to {path}"
    return "Error: path outside project root, excluded, or in working directory"
registry.register("write_file", write_file, "Write content to a file (overwrite)", {
"type": "object",
"properties": {
"path": {"type": "string"},
"content": {"type": "string"}
},
"required": ["path", "content"]
})
# List files with metadata
def list_files(directory: str = ".") -> str:
    """List the entries of a project directory, one per line.

    Directories appear as ``[DIR] name/``; files carry a human-readable size
    and modification time.  Excluded paths are omitted.  Returns ``(empty)``
    when nothing is listed and an ``Error: ...`` string for an unsafe path or
    a target that is not a directory.
    """
    full = agent._resolve_safe_path(directory)
    if not full:
        return "Error: path outside project root or excluded"
    if not full.is_dir():
        return f"Error: {directory} is not a directory"
    # The entries derive from a resolved path, so compare against the resolved root.
    root = agent.project_root.resolve()
    entries: List[str] = []
    for p in sorted(full.iterdir()):
        # Skip excluded paths
        if _should_skip_path(p, root, agent.working_dir):
            continue
        try:
            stat = p.stat()
            size = stat.st_size
            mtime = time.strftime('%Y-%m-%d %H:%M', time.localtime(stat.st_mtime))
            if p.is_dir():
                entries.append(f"[DIR] {p.name}/")
                continue
            # Format size
            if size < 1024:
                size_str = f"{size}B"
            elif size < 1024*1024:
                size_str = f"{size/1024:.1f}K"
            else:
                size_str = f"{size/(1024*1024):.1f}M"
            entries.append(f"[FILE] {p.name} {size_str:>6} {mtime}")
        except Exception:
            # stat() failed (e.g. broken symlink): list the entry without metadata.
            entries.append(f"{'[DIR]' if p.is_dir() else '[FILE]'} {p.name}")
    return "\n".join(entries) if entries else "(empty)"
registry.register("list_files", list_files, "List directory contents with sizes and modification times", {
"type": "object",
"properties": {"directory": {"type": "string", "description": "Relative directory (default: root)"}}
})
# Search files (respect exclusion)
def search_files(pattern: str, directory: str = ".", max_files: int = MAX_SEARCH_FILES) -> str:
    """Return the project-relative paths of files whose text matches ``pattern``.

    ``pattern`` is a regular expression applied with ``re.search`` to each
    file read as UTF-8.  At most ``max_files`` non-excluded files are
    inspected; unreadable files are skipped.  Returns ``No matches`` when
    nothing matches and an ``Error: ...`` string for an unsafe directory or
    an invalid pattern.
    """
    full = agent._resolve_safe_path(directory)
    if not full:
        return "Error: path outside project root or excluded"
    try:
        # Compile once so a bad pattern is reported clearly instead of per file.
        regex = re.compile(pattern)
    except re.error as e:
        return f"Error: invalid regular expression: {e}"
    # rglob yields paths under the resolved directory, so use the resolved root.
    root = agent.project_root.resolve()
    results = []
    count = 0
    for p in full.rglob("*"):
        if not p.is_file():
            continue
        if _should_skip_path(p, root, agent.working_dir):
            continue
        count += 1
        if count > max_files:
            break
        try:
            text = p.read_text(encoding="utf-8")
        except Exception:
            # Binary or unreadable file: ignore it.
            continue
        if regex.search(text):
            results.append(str(p.relative_to(root)))
    return "\n".join(results) if results else "No matches"
registry.register("search_files", search_files, "Search files using regex (limited file count, respects exclusions)", {
"type": "object",
"properties": {
"pattern": {"type": "string"},
"directory": {"type": "string", "description": "Relative directory (default: root)"},
"max_files": {"type": "integer", "description": "Maximum files to inspect (default: 1000)"}
},
"required": ["pattern"]
})
# Shell execution
async def shell_exec(command: str, summarize: bool = False, timeout: int = 30) -> str:
    """Run a shell command in the project root and return its combined output.

    Args:
        command: Command line handed to the shell.
        summarize: When true and the output exceeds 2000 characters, the first
            4000 characters are sent to the chat API and a summary is returned.
        timeout: Seconds to wait for the command before giving up.

    Returns:
        stdout and stderr joined by a newline (truncated to 10000 characters
        when not summarizing), a summary, ``"Command timed out."``, or an
        ``"Error executing command: ..."`` string.
    """
    try:
        proc = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=agent.project_root
        )
        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            # wait_for() only cancels communicate(); without an explicit kill the
            # child keeps running after the tool has already reported a timeout.
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # the process exited on its own in the meantime
            try:
                # Bounded reap: descendants of the shell may still hold the pipes
                # open, and the agent must not block on them indefinitely.
                await asyncio.wait_for(proc.wait(), timeout=5)
            except asyncio.TimeoutError:
                pass
            return "Command timed out."
        output = stdout.decode(errors='replace') + "\n" + stderr.decode(errors='replace')
        if not summarize:
            if len(output) > 10000:
                return output[:10000] + "\n... (output truncated to 10000 characters)"
            return output
        # Only call summarization API if output is truly large
        if len(output) <= 2000:
            return output
        # Use flash model and no reasoning for summarization
        resp = await agent.client.chat_completion(
            messages=[{"role": "user", "content": f"Summarize this command output concisely:\n\n{output[:4000]}"}],
            max_tokens=500,
            temperature=0.3,
            model="deepseek-v4-flash",  # use flash for cheap summarization
            reasoning_effort="none"
        )
        summary = resp["choices"][0]["message"]["content"]
        return f"Summary: {summary}\n\n(Full output length: {len(output)} chars)"
    except Exception as e:
        return f"Error executing command: {e}"
registry.register("shell_exec", shell_exec, "Execute a shell command. Set summarize=true for concise output.", {
    "type": "object",
    "properties": {
        "command": {"type": "string"},
        "summarize": {"type": "boolean"},
        "timeout": {"type": "integer", "default": 30}
    },
    "required": ["command"]
})
# Wait tool
async def wait(seconds: float) -> str:
    """Sleep without blocking the event loop, then report how long was waited.

    Args:
        seconds: Duration of the pause.

    Returns:
        A confirmation message containing ``seconds``.

    Raises:
        ValueError: If ``seconds`` is negative.
    """
    is_negative = seconds < 0
    if is_negative:
        raise ValueError("seconds must be non-negative")
    await asyncio.sleep(seconds)
    confirmation = f"Waited {seconds} second(s)."
    return confirmation
# Register the `wait` coroutine under the tool name "wait"; the schema declares
# a single required numeric argument, "seconds".
registry.register("wait", wait, "Pause for a given number of seconds.", {
"type": "object",
"properties": {"seconds": {"type": "number", "description": "Duration in seconds"}},
"required": ["seconds"]
})
# Background process tools
async def background_shell(command: str) -> str:
    """Hand ``command`` to the agent's background manager and report the job ID."""
    new_job = await agent.background_manager.start(command)
    return f"Background job started with ID: {new_job}"
registry.register(
    "background_shell",
    background_shell,
    "Start a shell command in the background, returning a job ID.",
    {
        "type": "object",
        "properties": {"command": {"type": "string"}},
        "required": ["command"],
    },
)
def background_status(job_id: str) -> str:
    """Describe the state the background manager reports for ``job_id``.

    Returns ``"Job ID not found."`` when the manager returns ``None``.
    """
    state = agent.background_manager.status(job_id)
    return "Job ID not found." if state is None else f"Job {job_id} status: {state}"
registry.register(
    "background_status",
    background_status,
    "Check the status of a background job (running/completed/error).",
    {
        "type": "object",
        "properties": {"job_id": {"type": "string"}},
        "required": ["job_id"],
    },
)
def background_output(job_id: str) -> str:
    """Return the output the background manager holds for ``job_id``.

    Falls back to a fixed message when the manager returns ``None``.
    """
    captured = agent.background_manager.output(job_id)
    if captured is not None:
        return captured
    return "Job not found or still running."
registry.register(
    "background_output",
    background_output,
    "Get the output of a completed background job.",
    {
        "type": "object",
        "properties": {"job_id": {"type": "string"}},
        "required": ["job_id"],
    },
)
async def background_kill(job_id: str) -> str:
    """Ask the background manager to kill ``job_id`` and relay its reply."""
    outcome = await agent.background_manager.kill(job_id)
    return outcome
registry.register(
    "background_kill",
    background_kill,
    "Kill a running background job.",
    {
        "type": "object",
        "properties": {"job_id": {"type": "string"}},
        "required": ["job_id"],
    },
)
# Create virtual environment
def create_venv(path: str) -> str:
    """Create a Python virtual environment (with pip) at ``path``.

    Args:
        path: Target location, resolved through ``agent._resolve_safe_path``.
            It must not exist yet.

    Returns:
        A success message, or an ``"Error..."`` string describing why the
        environment could not be created.
    """
    import shutil  # local import: only needed to clean up after a failed build

    full = agent._resolve_safe_path(path)
    if not full:
        return "Error: invalid path for venv"
    if full.exists():
        return f"Error: path '{path}' already exists"
    created = False
    try:
        _venv.create(full, with_pip=True)
        created = True
        python_exe = _get_venv_python(full)
        if not python_exe.exists():
            return "Error: virtual environment created but Python executable not found."
        return f"Virtual environment created at {path}"
    except Exception as e:
        if not created:
            # The target did not exist before this call, so whatever is there now
            # is a half-built environment. Remove it; otherwise every retry would
            # be rejected by the "already exists" check above.
            shutil.rmtree(full, ignore_errors=True)
        if "ensurepip" in str(e).lower():
            return "Error: ensurepip not available. Install python3-venv or use --without-pip and install pip manually."
        return f"Error creating venv: {e}"
registry.register("create_venv", create_venv, "Create a Python virtual environment inside the project", {
    "type": "object",
    "properties": {"path": {"type": "string"}},
    "required": ["path"]
})
# Install pip package
async def install_package(package: str, venv_path: str = "", timeout: int = 120) -> str:
    """Install a package with ``pip``, optionally inside a virtual environment.

    Args:
        package: Requirement passed to ``pip install`` as a single argument.
        venv_path: Virtual environment whose interpreter should run pip; when
            empty, the interpreter running this script is used.
        timeout: Seconds to wait for pip before giving up.

    Returns:
        A success message, pip's stderr on failure, a timeout message, or an
        ``"Error: ..."`` string.
    """
    if venv_path:
        venv_full = agent._resolve_safe_path(venv_path)
        if not venv_full:
            return "Error: invalid venv path"
        python_exe = _get_venv_python(venv_full)
        if not python_exe.exists():
            return f"Error: python not found in venv at {venv_path}"
        cmd = [str(python_exe), "-m", "pip", "install", package]
    else:
        cmd = [sys.executable, "-m", "pip", "install", package]
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=agent.project_root,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        try:
            _stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            # wait_for() only cancels communicate(); kill pip explicitly so it does
            # not keep modifying the environment after the timeout was reported.
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # pip finished on its own in the meantime
            try:
                await asyncio.wait_for(proc.wait(), timeout=5)
            except asyncio.TimeoutError:
                pass  # never block the agent on reaping
            return f"Installation timed out after {timeout} seconds."
        if proc.returncode == 0:
            return f"Successfully installed {package}"
        # errors='replace': undecodable bytes must not mask the real pip error.
        return f"Error installing {package}:\n{stderr.decode(errors='replace')}"
    except Exception as e:
        return f"Error: {e}"
registry.register("install_package", install_package, "Install a pip package (optionally into a venv)", {
    "type": "object",
    "properties": {
        "package": {"type": "string"},
        "venv_path": {"type": "string", "default": ""},
        "timeout": {"type": "integer", "default": 120}
    },
    "required": ["package"]
})
# Execute Python code
async def python_exec(code: str, venv_path: str = "", timeout: int = 30) -> str:
    """Run a snippet with ``python -c`` in the project root.

    Args:
        code: Python source passed to the interpreter via ``-c``.
        venv_path: Virtual environment whose interpreter should be used; when
            empty, the interpreter running this script is used.
        timeout: Seconds to wait for the snippet before giving up.

    Returns:
        stdout on exit code 0; otherwise an error message containing the exit
        code, stderr and stdout. A timeout or launch failure yields a short
        error string.
    """
    if venv_path:
        venv_full = agent._resolve_safe_path(venv_path)
        if not venv_full:
            return "Error: invalid venv path"
        venv_python = _get_venv_python(venv_full)
        # Same up-front check as install_package, so a missing interpreter gives
        # a clear message instead of a raw OS error.
        if not venv_python.exists():
            return f"Error: python not found in venv at {venv_path}"
        python_exe = str(venv_python)
    else:
        python_exe = sys.executable
    try:
        proc = await asyncio.create_subprocess_exec(
            python_exe, "-c", code,
            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
            cwd=agent.project_root
        )
        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            # wait_for() only cancels communicate(); a runaway snippet would keep
            # running in the background unless it is killed here.
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # the interpreter exited on its own in the meantime
            try:
                await asyncio.wait_for(proc.wait(), timeout=5)
            except asyncio.TimeoutError:
                pass  # never block the agent on reaping
            return "Python code timed out."
        out = stdout.decode(errors='replace')
        err = stderr.decode(errors='replace')
        if proc.returncode == 0:
            return out
        return f"Error (exit {proc.returncode}):\n{err}\n{out}"
    except Exception as e:
        return f"Error: {e}"
registry.register("python_exec", python_exec, "Execute Python code, optionally inside a venv", {
    "type": "object",
    "properties": {
        "code": {"type": "string"},
        "venv_path": {"type": "string", "default": ""},
        "timeout": {"type": "integer", "default": 30}
    },
    "required": ["code"]
})
# Search and replace
def search_replace(file_path: str, old_text: str, new_text: str, count: int = 1) -> str:
    """Replace literal occurrences of ``old_text`` with ``new_text`` in a file.

    Args:
        file_path: File to edit, resolved through ``agent._resolve_safe_path``.
        old_text: Non-empty literal text to look for.
        new_text: Replacement text.
        count: Maximum number of replacements; any negative value (the
            documented form is ``-1``) replaces every occurrence.

    Returns:
        A message with the number of replacements made, or a message explaining
        why nothing was changed.
    """
    full = agent._resolve_safe_path(file_path)
    if not full:
        return "Error: invalid path"
    if not full.is_file():
        return "File not found."
    if not old_text:
        # str.count("") is len + 1 and str.replace("", x) inserts x between every
        # character, which would silently wreck the file.
        return "Error: old_text must not be empty."
    if count == 0:
        return "Error: count must be a positive integer or -1 for all."
    try:
        content = full.read_text(encoding='utf-8')
    except (OSError, UnicodeDecodeError) as e:
        return f"Error reading file: {e}"
    available = content.count(old_text)
    if available == 0:
        return "No occurrence found."
    if count < 0:
        # str.replace treats every negative count as "all"; report it that way
        # instead of echoing a negative number back.
        new_content = content.replace(old_text, new_text)
        replaced = available
    else:
        new_content = content.replace(old_text, new_text, count)
        replaced = min(count, available)
    if new_content == content:
        # With a non-empty match this only happens when old_text == new_text.
        return "No changes made: new_text is identical to old_text."
    try:
        full.write_text(new_content, encoding='utf-8')
    except OSError as e:
        return f"Error writing file: {e}"
    return f"Replaced {replaced} occurrence(s)."
registry.register("search_replace", search_replace, "Replace literal text in a file. count=-1 for all.", {
    "type": "object",
    "properties": {
        "file_path": {"type": "string"},
        "old_text": {"type": "string"},
        "new_text": {"type": "string"},
        "count": {"type": "integer", "default": 1}
    },
    "required": ["file_path", "old_text", "new_text"]
})
# Spawn subagent
async def spawn_subagent(goal: str, instructions: str = "") -> str:
    """Run a subagent on ``goal`` through the subagent manager and wrap its result."""
    outcome = await agent.subagent_manager.run_subagent(goal, instructions)
    return f"Subagent result: {outcome}"
registry.register(
    "spawn_subagent",
    spawn_subagent,
    "Launch a subagent for a subgoal",
    {
        "type": "object",
        "properties": {
            "goal": {"type": "string"},
            "instructions": {"type": "string", "default": ""},
        },
        "required": ["goal"],
    },
)
# Load skill
def load_skill(name: str) -> str:
    """Return the skill stored under ``name``.

    When the skill manager has nothing truthy for ``name``, the reply lists the
    available skill names instead.
    """
    found = agent.skill_manager.get(name)
    if not found:
        return f"Skill '{name}' not found. Available: {', '.join(agent.skill_manager.list_names())}"
    return found
registry.register(
    "load_skill",
    load_skill,
    "Load a skill by name",
    {
        "type": "object",
        "properties": {"name": {"type": "string"}},
        "required": ["name"],
    },
)
# List skills
def list_skills() -> str:
    """Return the skill names as a comma-separated string, or a notice if there are none."""
    available = agent.skill_manager.list_names()
    if not available:
        return "No skills installed"
    return ", ".join(available)
registry.register(
    "list_skills",
    list_skills,
    "List all available skills",
    {
        "type": "object",
        "properties": {},
    },
)
# Install skill at runtime
def install_skill(name: str, content: str) -> str:
    """Pass ``name`` and ``content`` to the skill manager's ``install`` and relay its reply."""
    reply = agent.skill_manager.install(name, content)
    return reply
registry.register(
    "install_skill",
    install_skill,
    "Install a new skill (markdown content)",
    {
        "type": "object",
        "properties": {
            "name": {"type": "string"},
            "content": {"type": "string"},
        },
        "required": ["name", "content"],
    },
)
return registry
# ---------------------------------------------------------------------------
# CLI handling
# ---------------------------------------------------------------------------
def create_agents_md(project_root: Path) -> None:
    """Write a starter ``AGENTS.md`` into ``project_root`` unless one exists.

    The template lists up to ten non-hidden top-level directories and, when a
    top-level file has a recognised suffix, a detected language (Python takes
    precedence over JavaScript/TypeScript, which takes precedence over Java).

    Args:
        project_root: Directory that receives the ``AGENTS.md`` file.
    """
    path = project_root / "AGENTS.md"
    if path.exists():
        print("AGENTS.md already exists.")
        return
    # Auto-detect project structure (simple). A single directory scan feeds both
    # the directory list and the language guess.
    dirs = []
    suffixes = set()
    try:
        for entry in project_root.iterdir():
            if entry.is_dir():
                if not entry.name.startswith('.'):
                    dirs.append(entry.name)
            elif entry.is_file() and entry.suffix:
                suffixes.add(entry.suffix)
    except OSError:
        # Detection is best effort; an unreadable directory yields a bare template.
        dirs = []
        suffixes = set()
    dirs.sort()
    lang_hint = ""
    if '.py' in suffixes:
        lang_hint = "Python"
    elif suffixes & {'.js', '.ts'}:
        lang_hint = "JavaScript/TypeScript"
    elif '.java' in suffixes:
        lang_hint = "Java"
    # add more as needed
    top_dirs = ", ".join(dirs[:10]) if dirs else "(none)"
    lang_line = f"- Detected language: {lang_hint}\n" if lang_hint else ""
    template = f"""# AGENTS.md
This file provides context for the DeepSeek agent.
## Project Overview
(describe your project)
## Key Directories
- Top‑level directories: {top_dirs}
{lang_line}
## Conventions
- Language & version requirements
- Coding style and testing expectations
## Agent Instructions
- Before making changes, read relevant files.
- Use tools to search and modify code.
- For complex tasks, spawn subagents.
- Provide reasoning before actions.
"""
    # The template contains a non-ASCII hyphen (U+2011); the platform default
    # encoding (e.g. cp1252) cannot always encode it, so write UTF-8 explicitly.
    path.write_text(template, encoding="utf-8")
    print(f"Created {path}")
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse the process arguments.

    Global options are ``--model``, ``--reasoning``, ``--interactive``/``-i``
    and ``--no-cache``; the optional subcommand (stored as ``command``) is
    either ``goal <goal_text>`` or ``init``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    usage_examples = (
        "\n"
        "Examples:\n"
        "python script.py goal \"Implement a feature\"\n"
        "python script.py init\n"
        "python script.py --interactive\n"
    )
    cli = argparse.ArgumentParser(
        description="DeepSeek Harness – single‑file agentic orchestrator",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )

    # Options shared by every mode of operation.
    cli.add_argument("--model", default=DEFAULT_MODEL, help="Model name")
    cli.add_argument(
        "--reasoning",
        default=DEFAULT_REASONING,
        choices=["low", "medium", "high", "none"],
        help="Reasoning effort level",
    )
    cli.add_argument("--interactive", "-i", action="store_true", help="Enter interactive mode")
    cli.add_argument("--no-cache", action="store_true", help="Disable API response caching")

    # Subcommands; the chosen one ends up in ``command``.
    commands = cli.add_subparsers(dest="command", help="Available commands")
    goal_command = commands.add_parser("goal", help="Run a goal")
    goal_command.add_argument("goal_text", help="The goal description")
    commands.add_parser("init", help="Create AGENTS.md and working directory")

    return cli.parse_args()
async def interactive_loop(agent: Agent) -> None:
    """Read slash commands from stdin and dispatch them until the user leaves.

    Supported commands are ``/goal <text>``, ``/init``, ``/exit`` and
    ``/help``. The loop ends on ``/exit``, end-of-input or Ctrl+C at the prompt.

    Args:
        agent: Agent whose ``run_goal`` executes goals and whose
            ``project_root`` is used by ``/init``.
    """
    print("DeepSeek Harness – interactive mode. Type /goal <text>, /init, /exit, or /help.")
    while True:
        try:
            # input() blocks, so it runs in a worker thread to keep the loop free.
            cmd = await asyncio.to_thread(input, "> ")
        except (EOFError, KeyboardInterrupt):
            break
        cmd = cmd.strip()
        if not cmd:
            continue
        # The line is already stripped, so a goal-less command arrives as the bare
        # word "/goal". Matching only on the "/goal " prefix made the
        # "Please provide a goal." hint unreachable and reported an unknown command.
        if cmd == "/goal" or cmd.startswith("/goal "):
            goal = cmd[len("/goal"):].strip()
            if not goal:
                print("Please provide a goal.")
                continue
            print(f"Running goal: {goal}")
            result = await agent.run_goal(goal)
            print("Result:")
            print(result)
        elif cmd == "/init":
            create_agents_md(agent.project_root)
        elif cmd == "/exit":
            break
        elif cmd == "/help":
            print("Commands: /goal <text>, /init, /exit, /help")
        else:
            print("Unknown command. Use /goal, /init, or /exit.")
async def main() -> None:
    """Entry coroutine: check the interpreter, parse the CLI and run the chosen mode.

    ``init`` only creates AGENTS.md and returns. Otherwise a ``DeepSeekClient``
    is opened and checked for connectivity, an ``Agent`` is built with its tool
    registry and system prompt, and either the ``goal`` text is run once or the
    interactive loop is started. Exits with status 1 on an unsupported Python
    version or a failed connectivity check.
    """
    # Enforce minimum Python version
    if sys.version_info < REQUIRED_PYTHON:
        major, minor = REQUIRED_PYTHON[0], REQUIRED_PYTHON[1]
        print(f"Python {major}.{minor} or higher is required.", file=sys.stderr)
        sys.exit(1)

    options = parse_args()
    # NOTE(review): the key is fetched before the `init` branch below, so `init`
    # depends on whatever get_api_key() does when no key is configured - confirm
    # that this is intended.
    key = get_api_key()
    root = Path.cwd()
    work_dir = get_working_dir(root)

    if options.command == "init":
        create_agents_md(root)
        return

    client_context = DeepSeekClient(
        key, options.model, options.reasoning,
        cache_dir=work_dir / "cache",
        cache_enabled=not options.no_cache,
    )
    async with client_context as client:
        reachable = await client.check_connectivity()
        if not reachable:
            print(
                "Failed to connect to the DeepSeek API. Check your key and network.",
                file=sys.stderr,
            )
            sys.exit(1)

        # The tool registry needs the agent it serves, so the agent starts with an
        # empty registry that is replaced right after construction.
        agent = Agent(
            client=client,
            working_dir=work_dir,
            project_root=root,
            tool_registry=ToolRegistry(),
            model=options.model,
            reasoning_level=options.reasoning,
        )
        agent.tool_registry = _build_tool_registry(agent)
        await agent.set_system_prompt()

        if options.command != "goal":
            await interactive_loop(agent)
        else:
            outcome = await agent.run_goal(options.goal_text)
            print(outcome)
# Script entry point: run the async main() and turn failures into exit codes.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # KeyboardInterrupt derives from BaseException, so the generic handler
        # below never sees it; without this branch Ctrl+C during a run ends in a
        # raw traceback. 130 is the conventional exit status for SIGINT.
        print("Interrupted.", file=sys.stderr)
        sys.exit(130)
    except Exception as e:
        print(f"Fatal error: {e}", file=sys.stderr)
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment