<specification_planning>
- Core system architecture & key workflows
a. CLI entry ➔ parse args (depth,breadth,tool_calls, models, etc.).
b. Instantiate Orchestrator (state machine).
c. Step 0 – cheap plan: use fast model to decide whether direct answer possible; otherwise, prepare sub-queries.
d. Step 1 – breadth retrieval fan-out: fire breadth async search jobs, collect results, persist in cache (SQLite + FTS).
e. Step 2 – rank & prune: fast cross-encoder LLM → top K (≤50) docs retained.
f. Step 3 – depth loop: for i in range(depth) while calls_left & confidence<τ:
• smart model reads synthesis_so_far, gaps → targeted queries.
• run search / specialty tasks.
• update doc store; decrement budgets.
g. Step 4 – synthesis & critique with smart model (optionally k candidates).
h. Step 5 – final output (markdown + bibliography, JSON side-car).
Challenges / clarifications
• Budget accounting across parallel async calls.
• LLM context window: need summarisation & chunking.
• Source deduplication, URL normalisation.
• Citation alignment in generated text (token ↔ doc mapping).
• API key management (openai, anthropic, search provider).
• Resilience to network failures / HTTP 429 – back-off & retry.
• Local vs cloud execution; offline mode.
- Project structure & organisation
• src/cli.py (entry)
• src/core/orchestrator.py
• src/core/budget.py
• src/llm/fast.py, src/llm/smart.py (adapters)
• src/retrieval/search.py (pluggable providers)
• src/retrieval/ranker.py
• src/storage/doc_store.py (SQLite FTS)
• src/synthesis/synthesiser.py
• src/utils/…
Tests/, docs/, examples/
- Feature specifications
• Budget knobs with hard ceilings and soft warnings.
• Multiple search back-ends (SerpAPI, Bing, mcp).
• Iterative depth loops.
• Multiple candidate drafts & adjudication.
• Config file override + CLI override precedence.
• Caching layer to avoid duplicate tool calls.
• Streaming progress bar (rich).
Edge cases: 0 depth, no results, exceeded budget mid-iteration.
- DB schema
SQLite: tables for documents (id, url, title, snippet, content, vector, added_at), searches, budgets, runs. Indexes on url, FTS on content.
- Server actions & integrations
Not a web service, but external APIs: search, OpenAI/Anthropic chat completions. Abstract adapter pattern.
- Design system
CLI colours (rich); docs website (mkdocs) if a future UI is added.
- Component architecture
Pure Python modules; the orchestrator coordinates them.
- Auth & authorisation
API keys via env vars or ~/.deep_research/config.toml; file permissions.
- Data flow & state
Orchestrator context object carries query, budgets, doc_store ref; messages flow through it.
- Payment / billing
Stripe out-of-scope for the CLI; placeholder to support a future SaaS daemon.
- Analytics
Optional: PostHog self-hosted URL; events: run_started, search_called, draft_generated.
- Testing strategy
Unit: budget accounting, search adaptor mocking, ranker scoring.
e2e: fixtures with deterministic llm stub.
Potential risks
• Rapid API changes; mitigate with interface layer.
• LLM hallucination; self-critique + citations.
• Cost overruns; strict tool_call ledger.
Open questions
• Preferred search provider? default to SerpAPI.
• PDF extraction needed now or future plug-in? Provide interface stub.
• Concurrency model: asyncio vs multiprocessing? Choose asyncio.
</specification_planning>
• Purpose: A Python CLI that orchestrates configurable, cost-aware, multi-stage deep research on any user query.
• Value proposition: Combines fast/cheap LLMs for orchestration with smart/expensive LLMs for reasoning, enforcing strict budgets while yielding high-quality, citation-rich reports.
• High-level workflow: Plan → Breadth retrieval → Rank & prune → Depth loops → Synthesis & critique → Final output.
• System architecture:
- CLI (Click) → Orchestrator (state machine)
- LLM Adapters (fast/smart)
- Search Adapters (SerpAPI, Bing, mcp)
- Ranker (fast LLM)
- Document Store (SQLite w/ FTS5 + optional FAISS vector index)
- Synthesiser (smart LLM)
- Budget Ledger
deepresearch/
├─ src/
│ ├─ cli.py
│ ├─ core/
│ │ ├─ orchestrator.py
│ │ ├─ budget.py
│ │ └─ config.py
│ ├─ llm/
│ │ ├─ base.py
│ │ ├─ fast.py
│ │ └─ smart.py
│ ├─ retrieval/
│ │ ├─ search.py
│ │ ├─ ranker.py
│ │ └─ pdf_extractor.py
│ ├─ storage/
│ │ ├─ doc_store.py
│ │ └─ schema.sql
│ ├─ synthesis/
│ │ └─ synthesiser.py
│ └─ utils/
│ ├─ logging.py
│ └─ text.py
├─ tests/
├─ examples/
└─ pyproject.toml
• User story: As a researcher, I run deepresearch "prompt" --depth 3 --breadth 5 ... to obtain a report.
• Implementation steps:
- Parse CLI flags with Click.
- Merge with ~/.deep_research/config.toml defaults.
- Validate budgets; error if any negative or conflicting.
- Instantiate Orchestrator with a RunContext.
Edge cases: missing API key → instruct user; invalid numeric flag → exit code 2.
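A minimal sketch of the entry point, assuming Click; the flag names mirror the knobs above, the model defaults are placeholders, and the Orchestrator/RunContext import paths are assumptions matching the project tree:

import asyncio
import sys

import click

from deepresearch.core.config import RunContext          # assumed module path
from deepresearch.core.orchestrator import Orchestrator  # assumed module path


@click.command()
@click.argument("query")
@click.option("--depth", default=1, type=int, help="Number of depth iterations.")
@click.option("--breadth", default=5, type=int, help="Query variants per breadth fan-out.")
@click.option("--tool-calls", default=20, type=int, help="Hard ceiling on external tool calls.")
@click.option("--model-fast", default="gpt-4o-mini", show_default=True)
@click.option("--model-smart", default="gpt-4o", show_default=True)
def main(query, depth, breadth, tool_calls, model_fast, model_smart):
    """Run a budgeted deep-research session for QUERY."""
    if depth < 0 or breadth < 1 or tool_calls < 1:
        click.echo("Invalid numeric flag", err=True)
        sys.exit(2)  # exit code 2 for invalid numeric flags, per the edge case above
    ctx = RunContext(query=query, depth=depth, breadth=breadth, tool_calls=tool_calls,
                     model_fast=model_fast, model_smart=model_smart)
    asyncio.run(Orchestrator(ctx).run())


if __name__ == "__main__":
    main()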
• Tracks counts for tool_calls, consumed tokens (thinking_budget), and wall-time.
• Each external call passes through ledger.reserve(cost); raises BudgetExceeded when a ceiling would be crossed.
Edge cases: concurrent async calls double-booking → use asyncio.Lock.
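A minimal BudgetLedger sketch under those rules; the category names and BudgetExceeded come from this spec, everything else is illustrative (note the Orchestrator pseudocode later also shows a non-raising reserve() used in a truthiness check; reconciling the two styles is an implementation decision):

import asyncio


class BudgetExceeded(RuntimeError):
    pass


class BudgetLedger:
    """Tracks remaining budget per category; an asyncio.Lock prevents
    concurrent reservations from double-booking the same headroom."""

    def __init__(self, tool_calls: int, thinking_budget: int):
        self._remaining = {"tool_calls": tool_calls, "tokens": thinking_budget}
        self._lock = asyncio.Lock()

    def remaining(self, category: str) -> int:
        return self._remaining[category]

    async def reserve(self, category: str, cost: int = 1) -> None:
        async with self._lock:
            if self._remaining[category] < cost:
                raise BudgetExceeded(f"{category} budget exhausted")
            self._remaining[category] -= cost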
• Fast model prompt template decides:
“Given QUERY, can you answer from prior knowledge without the web? Respond YES/NO and a short answer if YES.”
• If YES, skip to Step 4 with answer as synthesis.
Error handling: ambiguous answer → treat as NO.
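A sketch of the plan prompt and its defensive parse, assuming the fast adapter returns plain text; the wording and helper name are illustrative, and anything ambiguous is treated as NO per the rule above:

PLAN_PROMPT = (
    "Given the query below, can you answer it accurately from prior knowledge "
    "without web research? Reply on the first line with YES or NO; if YES, give "
    "a short answer on the following lines.\n\nQUERY: {query}"
)


def parse_plan_response(text: str) -> tuple[bool, str]:
    """Return (answerable, answer); anything ambiguous counts as NO."""
    lines = text.strip().splitlines()
    if lines and lines[0].strip().upper().startswith("YES"):
        return True, "\n".join(lines[1:]).strip()
    return False, ""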
• Fast model generates breadth query variants (temperature=0.7).
• Launch async search tasks: search_api.search(q, top_k); each returns {url,title,snippet}.
• Persist into doc_store.
Challenges: provider rate limits → back-off exponential (max 3 retries).
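A sketch of that retry policy, assuming an httpx-based provider; the endpoint URL and the organic_results field are provider-specific assumptions to confirm against the chosen backend, while the q/num/api_key params follow the SerpAPI-style call described later in this spec:

import asyncio
import os

import httpx

API_KEY = os.environ.get("SERPAPI_KEY", "")


async def search_with_backoff(query: str, top_k: int = 5, max_retries: int = 3) -> list[dict]:
    """Run one provider search, retrying on HTTP 429 with exponential back-off."""
    async with httpx.AsyncClient(timeout=20) as client:
        for attempt in range(max_retries + 1):
            resp = await client.get(
                "https://serpapi.com/search.json",            # assumed provider endpoint
                params={"q": query, "num": top_k, "api_key": API_KEY},
            )
            if resp.status_code == 429 and attempt < max_retries:
                await asyncio.sleep(2 ** attempt)             # 1s, 2s, 4s
                continue
            resp.raise_for_status()
            return [
                {"url": r.get("link"), "title": r.get("title"), "snippet": r.get("snippet")}
                for r in resp.json().get("organic_results", [])   # field name is an assumption
            ]
    return []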
• For each snippet, call fast cross-encoder LLM: “Score 0-10 relevance to original query.”
• Keep top 50 by score.
• Optionally compute embedding & vector similarity for tie-breakers.
Edge: if <5 docs retrieved → skip prune.
Pseudocode outline:
for i in range(depth):
    gaps = smart_llm.identify_gaps(context_summary, citations)
    if not gaps: break
    queries = smart_llm.formulate_queries(gaps, max_q=3)
    for q in queries:
        if ledger.remaining('tool_calls') < 1: break
        results = search_api.search(q, top_k=5)
        doc_store.upsert(results)
        ledger.decrement('tool_calls')
    update_context()
    if confidence > τ: break
Edge cases: no new docs added → break loop early.
• Option --drafts N (default 1).
• For each draft: smart model prompted with curated sources → produce markdown with [n] citations.
• Self-critique: smart_llm.critique(draft) → list issues → smart_llm.revise(draft, issues).
• If N>1: fast judge scores factuality/coherence and selects best.
Error: missing citation marker → regex pass to fix.
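A sketch of that regex pass, assuming citations appear as bracketed integers [n]; loose markers are normalised and ids that match no stored source are reported so the reviser can be re-prompted (helper name is illustrative):

import re


def normalise_citations(draft: str, valid_ids: set[int]) -> tuple[str, list[int]]:
    """Normalise markers like '[ 3 ]' to '[3]' and list ids with no matching source."""
    draft = re.sub(r"\[\s*(\d+)\s*\]", r"[\1]", draft)
    cited = {int(m) for m in re.findall(r"\[(\d+)\]", draft)}
    dangling = sorted(cited - valid_ids)
    return draft, dangling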
• Save report.md and report.json ({body, citations:[{id,url}]}) under ./runs/{timestamp}/
• Print summary stats: tokens used, tool calls, elapsed time, and estimated cost.
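A sketch of the output step, assuming the report.json shape above ({body, citations:[{id,url}]}) and the ./runs/{timestamp}/ layout; the timestamp format is an assumption:

import json
from datetime import datetime
from pathlib import Path


def save_report(body: str, citations: list[dict]) -> Path:
    """Write report.md and its JSON side-car under ./runs/{timestamp}/."""
    run_dir = Path("runs") / datetime.now().strftime("%Y%m%dT%H%M%S")
    run_dir.mkdir(parents=True, exist_ok=True)
    (run_dir / "report.md").write_text(body, encoding="utf-8")
    (run_dir / "report.json").write_text(
        json.dumps({"body": body, "citations": citations}, indent=2),
        encoding="utf-8",
    )
    return run_dir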
documents
- id INTEGER PK
- url TEXT UNIQUE NOT NULL
- title TEXT
- snippet TEXT
- content TEXT
- vector BLOB NULL
- added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
searches
- id INTEGER PK
- query TEXT
- provider TEXT
- run_id TEXT
- executed_at TIMESTAMP
runs
- id TEXT PK (uuid)
- root_query TEXT
- config JSON
- started_at TIMESTAMP
- ended_at TIMESTAMP
- total_tokens INTEGER
- total_tool_calls INTEGER
Indexes:
• UNIQUE(url) on documents.
• FTS5 virtual table documents_fts(content, title, snippet) linked to documents.
• doc_store.upsert(docs) – bulk insert; on conflict update snippet/content.
• doc_store.search_fts(query, k) – SQL: SELECT * FROM documents JOIN documents_fts ... ORDER BY rank LIMIT ?.
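A sketch of schema.sql applied through sqlite3, assuming FTS5 is available in the bundled SQLite build; column names follow the tables above, and the triggers needed to keep an external-content FTS table in sync are left as an implementation detail:

import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS documents (
    id        INTEGER PRIMARY KEY,
    url       TEXT UNIQUE NOT NULL,
    title     TEXT,
    snippet   TEXT,
    content   TEXT,
    vector    BLOB,
    added_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE VIRTUAL TABLE IF NOT EXISTS documents_fts
    USING fts5(content, title, snippet, content='documents', content_rowid='id');
"""


def open_doc_store(path: str = "deepresearch.db") -> sqlite3.Connection:
    """Open (or create) the document store and ensure the schema exists."""
    conn = sqlite3.connect(path)
    conn.executescript(SCHEMA)
    return conn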
• Search API: GET /search (SerpAPI) with params q, num, api_key.
• LLM API: POST /v1/chat/completions (OpenAI) or /v1/complete (Anthropic).
• PDF extraction: pdftotext CLI wrapper or Unstructured lib.
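A sketch of the adapter layer that isolates these external services, assuming abstract base classes; concrete subclasses (OpenAI, Anthropic, SerpAPI, Bing, mcp) would implement the same small surface so providers stay swappable, and the method signatures are assumptions:

from abc import ABC, abstractmethod


class LLMAdapter(ABC):
    """Common surface for the fast and smart chat models."""

    @abstractmethod
    async def chat(self, prompt: str, temperature: float = 0.0) -> str:
        ...


class SearchAdapter(ABC):
    """Common surface for SerpAPI, Bing, mcp and any future backend."""

    @abstractmethod
    async def search(self, query: str, top_k: int = 5) -> list[dict]:
        """Return a list of {url, title, snippet} dicts."""
        ...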
• Colors: primary cyan (#00BCD4), secondary grey (#B0BEC5), error red (#FF5252).
• Typography: monospace font inherits terminal; headings bold.
• Layout: Rich Panels, aligned tables, progress bars.
• ProgressBar(id, total)
• Panel(title, body, style)
• Table(headers, rows, highlight)
Interactive states: spinner (searching), checkmark (completed), warning (budget almost exceeded).
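A sketch of the Rich wiring for those components, assuming a spinner-plus-bar layout; the colour mirrors the palette above and the stage names are illustrative:

from rich.console import Console
from rich.panel import Panel
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn

console = Console()
console.print(Panel("deep research run", title="deepresearch", style="cyan"))

with Progress(
    SpinnerColumn(),
    TextColumn("[progress.description]{task.description}"),
    BarColumn(),
    console=console,
) as progress:
    task = progress.add_task("breadth retrieval", total=5)
    for _ in range(5):
        progress.update(task, advance=1)   # advance as each search job completes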
(Not applicable; CLI only)
• State stored in RunContext dataclass.
• Event callbacks (on_search_complete, on_draft_ready).
• Concurrency: asyncio tasks with asyncio.gather.
• API keys read in order: CLI flag → ENV (OPENAI_API_KEY, SERPAPI_KEY) → config file.
• Permission check: config file chmod 600.
• If missing → prompt once (hidden input) and offer to save.
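A sketch of that resolution order, assuming Python 3.11's tomllib; the [keys] table name inside config.toml is an assumption:

import os
import tomllib
from pathlib import Path

CONFIG_PATH = Path.home() / ".deep_research" / "config.toml"


def resolve_api_key(cli_value: str | None, env_var: str, config_key: str) -> str | None:
    """CLI flag wins, then the environment variable, then the config file."""
    if cli_value:
        return cli_value
    if os.environ.get(env_var):
        return os.environ[env_var]
    if CONFIG_PATH.exists():
        with CONFIG_PATH.open("rb") as fh:
            return tomllib.load(fh).get("keys", {}).get(config_key)
    return None

Usage would look like resolve_api_key(flag_value, "OPENAI_API_KEY", "openai"), falling back in the stated order.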
CLI → Orchestrator
↘ search_api → internet
↘ fast_llm → OpenAI/Anthropic
↘ smart_llm → OpenAI/Anthropic
↘ doc_store → SQLite
RunContext mutable object passed down, gathers tokens and docs.
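A sketch of that RunContext, assuming a dataclass; the field list merges the knobs named throughout this spec and is not exhaustive, and the model defaults are placeholders:

from dataclasses import dataclass, field


@dataclass
class RunContext:
    """Mutable per-run state handed from the CLI down to every stage."""
    query: str
    depth: int = 1
    breadth: int = 5
    tool_calls: int = 20
    thinking_budget: int = 50_000
    model_fast: str = "gpt-4o-mini"      # placeholder default
    model_smart: str = "gpt-4o"          # placeholder default
    drafts_n: int = 1
    drafts: list[str] = field(default_factory=list)
    final: str = ""
    doc_store: object | None = None      # set once the SQLite store is opened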
(Currently N/A, but planned)
• Future daemon could expose /pay endpoints; Stripe checkout session; webhook payment_succeeded marks runs.paid = true.
• Optional --analytics flag.
• Events: run_started, search_performed, llm_call, run_completed.
• Identify by anonymised hash of machine id.
• test_budget.py – ensure exception when limit crossed.
• test_ranker.py – deterministic stub returns ordered scores.
• test_orchestrator_skip_web.py – fast model stub answers YES.
• Scenario: “Who is Ada Lovelace?” depth=0 – expect no web calls.
• Scenario: Solid state batteries query depth=1 – expect output file with ≥5 citations.
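A sketch of test_budget.py, assuming pytest and the BudgetLedger interface sketched earlier; the import path and constructor signature are assumptions:

import asyncio

import pytest

from deepresearch.core.budget import BudgetExceeded, BudgetLedger  # assumed module path


def test_reserve_raises_when_limit_crossed():
    async def scenario():
        ledger = BudgetLedger(tool_calls=2, thinking_budget=100)
        await ledger.reserve("tool_calls")
        await ledger.reserve("tool_calls")
        with pytest.raises(BudgetExceeded):
            await ledger.reserve("tool_calls")  # third call crosses the ceiling

    asyncio.run(scenario())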
class Orchestrator:
    def __init__(self, ctx: RunContext):
        self.ctx = ctx
        self.ledger = BudgetLedger(ctx.tool_calls, ctx.thinking_budget)
        self.fast = FastLLM(ctx.model_fast)
        self.smart = SmartLLM(ctx.model_smart)
        self.search = SearchProvider(ctx.search_backend, self.ledger)

    async def run(self):
        if await self._cheap_plan():
            return await self._output_and_exit()
        await self._breadth_retrieval()
        await self._rank_and_prune()
        await self._depth_loop()
        await self._synthesise()
        await self._finalise()

    async def _cheap_plan(self):
        resp = await self.fast.chat(prompt_plan(self.ctx.query))
        self.ledger.count(resp)
        if resp.answerable:
            self.ctx.drafts = [resp.answer]
            return True
        return False

    async def _breadth_retrieval(self):
        variants = await self.fast.chat(prompt_variants(self.ctx.query, self.ctx.breadth))
        tasks = [self.search.search(v) for v in variants]
        for docs in await asyncio.gather(*tasks):
            self.ctx.doc_store.upsert(docs)

    async def _rank_and_prune(self):
        scored = []
        for doc in self.ctx.doc_store.all():
            score = await self.fast.chat(prompt_score(doc, self.ctx.query))
            scored.append((score, doc))
        top = sorted(scored, key=lambda t: t[0], reverse=True)[:50]
        self.ctx.doc_store.keep([d for _, d in top])

    async def _depth_loop(self):
        for i in range(self.ctx.depth):
            gaps = await self.smart.chat(prompt_identify_gaps(self.ctx))
            if not gaps: break
            queries = await self.smart.chat(prompt_queries(gaps))
            for q in queries:
                if not self.ledger.reserve(1): return
                docs = await self.search.search(q)
                self.ctx.doc_store.upsert(docs)
            if self._confidence_high(): break

    async def _synthesise(self):
        drafts = []
        for _ in range(self.ctx.drafts_n):
            draft = await self.smart.chat(prompt_synth(self.ctx))
            critique = await self.smart.chat(prompt_critique(draft))
            draft = await self.smart.chat(prompt_revise(draft, critique))
            drafts.append(draft)
        if len(drafts) == 1:
            self.ctx.final = drafts[0]
        else:
            scores = [await self.fast.chat(prompt_judge(d, self.ctx.query)) for d in drafts]
            self.ctx.final = drafts[scores.index(max(scores))]

    async def _finalise(self):
        save_report(self.ctx.final, self.ctx.doc_store.citations())
        print_summary(self.ledger, self.ctx)

The specification meets all outlined requirements and is ready for hand-off to the code-generation phase.