This analysis examines the core Python files in the open_deep_research
repository, with particular focus on state management architecture and human feedback mechanisms that enable interactive research workflows.
src/open_deep_research/
├── state.py           # State object definitions and TypedDict schemas
├── configuration.py   # Configuration management and model initialization
├── graph.py           # Main graph-based workflow implementation
├── multi_agent.py     # Multi-agent parallel implementation
├── prompts.py         # Prompt templates and LLM interactions
└── utils.py           # Utility functions and helpers
Based on the LangGraph implementation patterns and current best practices, the state management in open_deep_research
follows a hierarchical TypedDict structure:
# Core state objects for the research workflow
from typing import TypedDict, Optional, List, Dict, Any
from typing_extensions import Annotated
class ReportStateInput(TypedDict):
"""Initial input state for starting research workflow"""
topic: str # Research topic from user
report_structure: Optional[str] # Custom report format
search_api: Optional[str] # Preferred search API
search_api_config: Optional[Dict[str, Any]] # API-specific configuration
class ReportState(TypedDict):
"""Main state object passed through graph nodes"""
# Input data
topic: str # Original research topic
report_structure: Optional[str] # Report format requirements
# Planning phase
report_plan: Optional[Dict[str, Any]] # Generated research plan
plan_approved: bool # Human approval status
feedback: Optional[str] # Human feedback on plan
# Research phase
sections: List[Dict[str, Any]] # Report sections with metadata
current_section_index: Optional[int] # Active section being processed
completed_sections: List[str] # Finished section content
# Search configuration
search_api: str # Active search API
search_api_config: Dict[str, Any] # Search API configuration
search_queries: List[str] # Generated search queries
search_results: List[Dict[str, Any]] # Search results with metadata
# Quality control
quality_scores: Dict[str, float] # Section quality evaluations
revision_feedback: Optional[str] # Feedback for content revision
# Output
final_report: Optional[str] # Compiled final report
# Metadata
workflow_type: str # "graph" or "multi_agent"
start_time: Optional[str] # Workflow start timestamp
errors: List[Dict[str, Any]] # Structured error records (see handle_workflow_errors)
metadata: Dict[str, Any] # Additional workflow metadata
class SectionState(TypedDict):
"""State for individual section processing subgraph"""
section_info: Dict[str, Any] # Section title, description, requirements
search_queries: List[str] # Section-specific search queries
search_results: List[Dict[str, Any]] # Search results for this section
content: Optional[str] # Written section content
quality_score: Optional[float] # Content quality evaluation (0-1)
revision_feedback: Optional[str] # Feedback for improvement
search_depth: int # Current search iteration
max_search_depth: int # Maximum allowed iterations
sources: List[Dict[str, Any]] # Source citations and metadata
class SectionOutputState(TypedDict):
"""Output state from section builder subgraph"""
section_title: str # Final section title
section_content: str # Completed section content
quality_passed: bool # Whether quality requirements met
sources: List[Dict[str, Any]] # Citation information
completion_time: str # Section completion timestamp
word_count: int # Content length metrics
The state management follows LangGraph best practices for immutable updates and type safety:
# Example state update patterns used in nodes
def update_report_state(current_state: ReportState, updates: Dict[str, Any]) -> ReportState:
"""Immutable state update following LangGraph patterns"""
return {
**current_state,
**updates,
"metadata": {
**current_state.get("metadata", {}),
**updates.get("metadata", {})
}
}
# State reducer for section aggregation
def aggregate_section_results(
main_state: ReportState,
section_outputs: List[SectionOutputState]
) -> ReportState:
"""Combine parallel section results into main state"""
completed_sections = [section["section_content"] for section in section_outputs]
return {
**main_state,
"completed_sections": completed_sections,
"sections": [
{
"title": section["section_title"],
"content": section["section_content"],
"quality_passed": section["quality_passed"],
"sources": section["sources"]
}
for section in section_outputs
]
}
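As an aside, LangGraph can perform this kind of aggregation automatically through reducer annotations on state keys; the sketch below is illustrative and not the repository's actual schema.
import operator
from typing import Annotated, Any, Dict, List, TypedDict

class ParallelReportState(TypedDict):
    """Illustrative variant of ReportState using LangGraph reducer annotations."""
    topic: str
    # Each parallel section builder returns only its own slice, e.g.
    # {"completed_sections": ["..."]}; operator.add concatenates the lists
    # instead of overwriting the key when updates are merged.
    completed_sections: Annotated[List[str], operator.add]
    sections: Annotated[List[Dict[str, Any]], operator.add]
With reducers in place, each parallel section builder returns only its own partial update and LangGraph merges the lists.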
The configuration system supports multiple LLM providers with dynamic initialization:
# Configuration schema based on current LangGraph patterns
from dataclasses import dataclass
from typing import Optional, Dict, Any, List, Literal
from langchain.chat_models import init_chat_model
@dataclass
class ModelConfig:
"""LLM model configuration"""
provider: str # "openai", "anthropic", "groq", etc.
model: str # Model name
temperature: float = 0.1 # Generation temperature
max_tokens: Optional[int] = None # Token limit
kwargs: Optional[Dict[str, Any]] = None # Provider-specific parameters
@dataclass
class SearchConfig:
"""Search API configuration"""
primary_api: str = "tavily" # Primary search provider
fallback_apis: Optional[List[str]] = None # Fallback options
config: Optional[Dict[str, Any]] = None # API-specific configuration
@dataclass
class WorkflowConfig:
"""Complete workflow configuration"""
# Model configurations
planner_model: ModelConfig # For plan generation
writer_model: ModelConfig # For content writing
evaluator_model: ModelConfig # For quality evaluation
# Search configuration
search_config: SearchConfig
# Workflow parameters
max_search_depth: int = 2 # Maximum search iterations
number_of_queries: int = 2 # Queries per section
quality_threshold: float = 0.8 # Minimum acceptable quality
# Human feedback settings
require_plan_approval: bool = True # Require human plan approval
approval_timeout: Optional[int] = None # Timeout for human response
def create_configured_llm(config: ModelConfig):
"""Create LLM instance with dynamic configuration"""
return init_chat_model(
f"{config.provider}:{config.model}",
temperature=config.temperature,
max_tokens=config.max_tokens,
**(config.kwargs or {})
)
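A minimal usage sketch; the provider and model identifiers below are placeholders rather than repository defaults:
# Assemble a workflow configuration; model names are illustrative assumptions.
workflow_config = WorkflowConfig(
    planner_model=ModelConfig(provider="anthropic", model="claude-3-5-sonnet-latest"),
    writer_model=ModelConfig(provider="openai", model="gpt-4o", temperature=0.3),
    evaluator_model=ModelConfig(provider="openai", model="gpt-4o-mini"),
    search_config=SearchConfig(primary_api="tavily", fallback_apis=["exa"]),
)

# Instantiate the planner LLM from its configuration
planner_llm = create_configured_llm(workflow_config.planner_model)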
The graph workflow implements sophisticated human feedback mechanisms using LangGraph's interrupt system:
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt
from langgraph.checkpoint.memory import MemorySaver
def human_feedback_node(state: ReportState) -> Command:
"""
Critical node that interrupts graph execution for human review
This implements the human-in-the-loop pattern where:
1. Generated plan is presented to user
2. Graph execution is paused waiting for human input
3. Workflow routing is determined by user feedback
"""
# Format plan for human review
plan_data = {
"topic": state["topic"],
"sections": state["report_plan"]["sections"],
"introduction": state["report_plan"]["introduction"],
"conclusion": state["report_plan"]["conclusion"],
"estimated_time": calculate_estimated_time(state["report_plan"]),
"search_strategy": state["report_plan"]["search_strategy"]
}
# Interrupt execution and wait for human input.
# interrupt() takes a single JSON-serializable payload and returns the value
# supplied when the graph is resumed with Command(resume=...).
user_response = interrupt({
"type": "plan_review",
"plan": plan_data,
"options": {
"approve": "Approve plan and continue research",
"revise": "Request revisions to the plan",
"cancel": "Cancel research workflow"
}
})
# Route based on human feedback
if user_response.get("action") == "approve":
return Command(
goto="build_sections",
update={"plan_approved": True}
)
elif user_response.get("action") == "revise":
return Command(
goto="generate_report_plan",
update={
"plan_approved": False,
"feedback": user_response.get("feedback", ""),
"revision_feedback": user_response.get("feedback", "")
}
)
else: # cancel
return Command(goto=END)
def generate_report_plan(state: ReportState) -> ReportState:
"""Generate initial research plan using configured LLM"""
planner_llm = create_configured_llm(config.planner_model)  # config: the WorkflowConfig supplied when building the graph
# Include revision feedback if available
feedback_context = ""
if state.get("revision_feedback"):
feedback_context = f"\n\nPrevious feedback to address: {state['revision_feedback']}"
plan_prompt = f"""
Generate a comprehensive research plan for the topic: {state['topic']}
Requirements:
- Create 5-7 main sections with clear descriptions
- Define specific search queries for each section
- Outline introduction and conclusion approaches
- Consider the target report structure: {state.get('report_structure', 'standard academic format')}
{feedback_context}
Return a structured plan with sections, search strategy, and timeline.
"""
plan_response = planner_llm.invoke([{"role": "user", "content": plan_prompt}])
return {
**state,
"report_plan": parse_plan_response(plan_response.content),
"plan_approved": False, # Requires human approval
"current_step": "awaiting_approval"
}
# Graph construction with human feedback integration
def create_research_graph(config: WorkflowConfig) -> StateGraph:
"""Create the main research workflow graph"""
graph = StateGraph(ReportState)
# Add core nodes
graph.add_node("generate_report_plan", generate_report_plan)
graph.add_node("human_feedback", human_feedback_node)
graph.add_node("build_sections", build_sections_parallel)
graph.add_node("compile_report", compile_final_report)
# Define workflow edges
graph.add_edge(START, "generate_report_plan")
graph.add_edge("generate_report_plan", "human_feedback")
# Routing after human feedback is handled by the Command returned from
# human_feedback_node (approve -> "build_sections", revise ->
# "generate_report_plan", cancel -> END), so no conditional edges are needed here.
graph.add_edge("build_sections", "compile_report")
graph.add_edge("compile_report", END)
return graph
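To make the interrupt pattern concrete, here is a hedged sketch of driving this graph: compile it with a checkpointer (required for interrupt), stream until the human_feedback node pauses, then resume with Command(resume=...). The thread ID, topic, and resume payload are illustrative.
from langgraph.types import Command
from langgraph.checkpoint.memory import MemorySaver

# Compile with a checkpointer so interrupt() can pause and resume the thread.
graph = create_research_graph(workflow_config).compile(checkpointer=MemorySaver())
thread = {"configurable": {"thread_id": "research-session-1"}}

# Run until human_feedback_node calls interrupt(); execution pauses there.
for event in graph.stream({"topic": "State management in LangGraph"}, config=thread):
    print(event)

# Resume the paused workflow with the human decision.
for event in graph.stream(Command(resume={"action": "approve"}), config=thread):
    print(event)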
The graph implements parallel processing using LangGraph's Send
command pattern:
from langgraph.types import Send
def build_sections_parallel(state: ReportState) -> Command:
"""
Distribute section building across parallel subgraphs
This demonstrates LangGraph's parallel processing capabilities
using the Send command to create multiple concurrent workflows
"""
sections = state["report_plan"]["sections"]
# Create parallel send commands for each section
section_commands = [
Send(
"section_builder_subgraph",
SectionState(
section_info=section,
search_queries=[],
search_results=[],
content=None,
quality_score=None,
search_depth=0,
max_search_depth=state.get("max_search_depth", 2),
sources=[]
)
)
for section in sections
]
return Command(goto=section_commands)
def section_builder_subgraph() -> StateGraph:
"""Subgraph for processing individual sections"""
subgraph = StateGraph(SectionState)
subgraph.add_node("generate_queries", generate_section_queries)
subgraph.add_node("search_web", execute_web_search)
subgraph.add_node("write_content", write_section_content)
subgraph.add_node("evaluate_quality", evaluate_content_quality)
# Iterative improvement loop
subgraph.add_edge(START, "generate_queries")
subgraph.add_edge("generate_queries", "search_web")
subgraph.add_edge("search_web", "write_content")
subgraph.add_edge("write_content", "evaluate_quality")
# Quality-based routing
subgraph.add_conditional_edges(
"evaluate_quality",
quality_check_router,
{
"quality_passed": END,
"needs_improvement": "generate_queries",
"max_depth_reached": END
}
)
return subgraph
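The quality_check_router referenced above is not shown in this excerpt; a plausible sketch, assuming a fixed quality threshold that would normally come from WorkflowConfig:
def quality_check_router(state: SectionState) -> str:
    """Route the section subgraph based on the latest quality evaluation (sketch)."""
    quality_threshold = 0.8  # assumed default; normally taken from WorkflowConfig
    if (state.get("quality_score") or 0.0) >= quality_threshold:
        return "quality_passed"
    if state["search_depth"] >= state["max_search_depth"]:
        return "max_depth_reached"
    return "needs_improvement"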
The multi-agent system uses a different state management pattern optimized for parallel autonomous agents:
from langgraph.prebuilt import create_react_agent
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
class MultiAgentState(TypedDict):
"""State for multi-agent workflow"""
messages: Annotated[List[BaseMessage], add_messages]
topic: str
assigned_sections: Dict[str, str] # agent_id -> section_title
completed_sections: Dict[str, str] # section_title -> content
supervisor_feedback: List[str] # Coordinator feedback
research_status: Dict[str, str] # agent_id -> status
final_report: Optional[str]
def create_supervisor_agent(config: WorkflowConfig):
"""Create coordinator agent for multi-agent system"""
supervisor_tools = [
assign_research_task,
review_section_quality,
compile_final_report,
request_section_revision
]
supervisor_prompt = """
You are a research supervisor coordinating a team of researchers.
Your responsibilities:
1. Break down research topics into manageable sections
2. Assign sections to available researchers
3. Review completed work for quality and coherence
4. Compile final reports from researcher outputs
5. Request revisions when needed
Work efficiently and ensure high-quality research outputs.
"""
return create_react_agent(
model=create_configured_llm(config.planner_model),
tools=supervisor_tools,
state_modifier=supervisor_prompt
)
def create_researcher_agent(agent_id: str, config: WorkflowConfig):
"""Create individual researcher agent"""
researcher_tools = [
tavily_search,
exa_search,
arxiv_search,
pubmed_search,
write_section_content,
cite_sources
]
researcher_prompt = f"""
You are Researcher {agent_id}, specializing in thorough research and clear writing.
Your capabilities:
1. Conduct comprehensive web searches on assigned topics
2. Analyze and synthesize information from multiple sources
3. Write well-structured, informative content
4. Properly cite all sources
5. Collaborate with supervisor and other researchers
Always strive for accuracy, clarity, and comprehensive coverage.
"""
return create_react_agent(
model=create_configured_llm(config.writer_model),
tools=researcher_tools,
state_modifier=researcher_prompt
)
# Multi-agent workflow coordination
def multi_agent_workflow(config: WorkflowConfig) -> StateGraph:
"""Create multi-agent research workflow"""
graph = StateGraph(MultiAgentState)
# Instantiate the coordinator and worker agents from the configuration
supervisor_agent = create_supervisor_agent(config)
researcher_agents = [create_researcher_agent(f"researcher_{i}", config) for i in range(1, 4)]
# Add agent nodes
graph.add_node("supervisor", supervisor_agent)
graph.add_node("researcher_1", researcher_agents[0])
graph.add_node("researcher_2", researcher_agents[1])
graph.add_node("researcher_3", researcher_agents[2])
# Supervisor coordinates the workflow
graph.add_edge(START, "supervisor")
# Dynamic routing based on supervisor decisions
graph.add_conditional_edges(
"supervisor",
route_supervisor_decision,
{
"assign_tasks": ["researcher_1", "researcher_2", "researcher_3"],
"review_work": "supervisor",
"compile_report": END
}
)
# Researchers report back to supervisor
for researcher in ["researcher_1", "researcher_2", "researcher_3"]:
graph.add_edge(researcher, "supervisor")
return graph
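The route_supervisor_decision function referenced above is likewise not shown; a minimal sketch, assuming the supervisor's next move can be inferred from the shared state:
def route_supervisor_decision(state: MultiAgentState) -> str:
    """Decide the supervisor's next move from the shared state (illustrative sketch)."""
    assigned = state.get("assigned_sections", {})
    completed = state.get("completed_sections", {})
    # All assigned sections are back: hand off to final compilation.
    if assigned and len(completed) >= len(assigned):
        return "compile_report"
    # Work has been assigned but not all of it is finished: keep reviewing.
    if assigned:
        return "review_work"
    # Nothing assigned yet: fan research tasks out to the researchers.
    return "assign_tasks"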
The prompts module contains sophisticated templates optimized for research workflows:
from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate
# Plan generation prompt with human feedback integration
PLAN_GENERATION_PROMPT = ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template("""
You are an expert research planner. Generate comprehensive research plans
that will result in high-quality, well-structured reports.
Your plan must include:
1. Clear section breakdown with specific research questions
2. Search strategy for each section
3. Quality criteria and success metrics
4. Estimated timeline and resource requirements
Consider the user's feedback and requirements carefully.
{feedback_context}
"""),
("human", """
Topic: {topic}
Report Structure: {report_structure}
Search APIs Available: {available_apis}
Quality Requirements: {quality_requirements}
{revision_feedback}
Generate a detailed research plan.
""")
])
# Section writing prompt with quality guidelines
SECTION_WRITING_PROMPT = ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template("""
You are an expert research writer. Create comprehensive, well-researched
content based on the provided sources and requirements.
Writing Guidelines:
- Use clear, academic language appropriate for the target audience
- Integrate information from multiple sources seamlessly
- Include proper citations for all claims
- Maintain logical flow and structure
- Address the research questions thoroughly
Quality Standards:
- Minimum {min_words} words
- At least {min_sources} credible sources
- Clear thesis and supporting evidence
- Proper conclusion that synthesizes findings
"""),
("human", """
Section: {section_title}
Research Questions: {research_questions}
Sources Available:
{search_results}
Requirements:
{section_requirements}
Write a comprehensive section addressing all research questions.
""")
])
# Quality evaluation prompt for content assessment
QUALITY_EVALUATION_PROMPT = ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template("""
You are a research quality evaluator. Assess the provided content
against academic and professional standards.
Evaluation Criteria:
1. Factual Accuracy (0-100): Are claims supported by evidence?
2. Source Quality (0-100): Are sources credible and relevant?
3. Writing Quality (0-100): Is the writing clear and well-structured?
4. Completeness (0-100): Are research questions fully addressed?
5. Citation Quality (0-100): Are sources properly cited?
Provide scores and specific feedback for improvement.
"""),
("human", """
Content to Evaluate:
{content}
Section Requirements:
{requirements}
Research Questions:
{research_questions}
Provide detailed quality assessment with scores and feedback.
""")
])
# Human feedback prompt for plan review
HUMAN_FEEDBACK_PROMPT = """
Research Plan Review Required
Topic: {topic}
Proposed Plan:
{plan_details}
Sections ({section_count}):
{section_list}
Estimated Timeline: {timeline}
Search Strategy: {search_strategy}
Please review this research plan and provide your feedback:
Options:
1. APPROVE - Proceed with this plan
2. REVISE - Request modifications (please specify changes needed)
3. CANCEL - Cancel the research
Your choice and feedback:
"""
import re
from typing import List, Dict, Any
from datetime import datetime, timedelta
def validate_state_transitions(current_state: ReportState, next_state: ReportState) -> bool:
"""Validate that state transitions follow expected workflow patterns"""
# Ensure required fields are preserved
required_fields = ["topic", "workflow_type", "start_time"]
for field in required_fields:
if field in current_state and field not in next_state:
raise ValueError(f"Required field {field} missing in state transition")
# Validate workflow stage progression
valid_transitions = {
"planning": ["plan_review", "researching"],
"plan_review": ["planning", "researching"],
"researching": ["writing", "quality_review"],
"writing": ["quality_review", "completed"],
"quality_review": ["writing", "completed"]
}
current_stage = current_state.get("current_stage")
next_stage = next_state.get("current_stage")
if current_stage and next_stage:
if next_stage not in valid_transitions.get(current_stage, []):
raise ValueError(f"Invalid transition from {current_stage} to {next_stage}")
return True
def calculate_quality_score(content: str, sources: List[Dict], requirements: Dict) -> float:
"""Calculate comprehensive quality score for research content"""
scores = {}
# Length assessment
word_count = len(content.split())
min_words = requirements.get("min_words", 500)
scores["length"] = min(word_count / min_words, 1.0)
# Source quality assessment
source_count = len(sources)
min_sources = requirements.get("min_sources", 3)
scores["sources"] = min(source_count / min_sources, 1.0)
# Citation assessment
citation_pattern = r'\[(\d+)\]|\(.*?\d{4}.*?\)'
citations = re.findall(citation_pattern, content)
scores["citations"] = min(len(citations) / max(source_count, 1), 1.0)
# Structure assessment (headers, paragraphs)
headers = re.findall(r'^#+\s+.+$', content, re.MULTILINE)
paragraphs = [p.strip() for p in content.split('\n\n') if p.strip()]
scores["structure"] = min(
(len(headers) * 0.2 + len(paragraphs) * 0.1) / 1.0,
1.0
)
# Overall weighted score
weights = {
"length": 0.2,
"sources": 0.3,
"citations": 0.3,
"structure": 0.2
}
overall_score = sum(scores[key] * weights[key] for key in scores)
return overall_score
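As a worked example of the weighting: a 400-word draft against a 500-word minimum scores 0.8 on length, 2 of 3 required sources gives 0.67, one citation per source gives 1.0, and 2 headers plus 5 paragraphs gives 0.9 on structure, for an overall score of roughly 0.2 × 0.8 + 0.3 × 0.67 + 0.3 × 1.0 + 0.2 × 0.9 ≈ 0.84.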
def format_research_progress(state: ReportState) -> Dict[str, Any]:
"""Format current research progress for display"""
total_sections = len(state.get("sections", []))
completed_sections = len(state.get("completed_sections", []))
progress_percentage = (completed_sections / total_sections * 100) if total_sections > 0 else 0
return {
"topic": state["topic"],
"workflow_type": state["workflow_type"],
"current_stage": state.get("current_stage", "unknown"),
"progress": {
"percentage": progress_percentage,
"completed_sections": completed_sections,
"total_sections": total_sections,
"current_section": state.get("current_section_index")
},
"plan_approved": state.get("plan_approved", False),
"estimated_completion": estimate_completion_time(state),
"quality_metrics": {
"average_quality": calculate_average_quality(state),
"sections_passed": count_quality_passed_sections(state)
}
}
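The progress helpers referenced above (estimate_completion_time, calculate_average_quality, count_quality_passed_sections) are not defined in this excerpt; minimal sketches, assuming per-section scores live in quality_scores and start_time is an ISO timestamp:
from typing import Optional

def estimate_completion_time(state: ReportState) -> Optional[str]:
    """Rough ETA from average time per completed section (sketch)."""
    total = len(state.get("sections", []))
    done = len(state.get("completed_sections", []))
    if not state.get("start_time") or done == 0 or total == 0:
        return None
    elapsed = datetime.utcnow() - datetime.fromisoformat(state["start_time"])
    remaining = elapsed / done * (total - done)
    return (datetime.utcnow() + remaining).isoformat()

def calculate_average_quality(state: ReportState) -> float:
    """Mean of recorded section quality scores (sketch)."""
    scores = list(state.get("quality_scores", {}).values())
    return sum(scores) / len(scores) if scores else 0.0

def count_quality_passed_sections(state: ReportState, threshold: float = 0.8) -> int:
    """Number of sections at or above the quality threshold (sketch)."""
    return sum(1 for score in state.get("quality_scores", {}).values() if score >= threshold)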
def handle_workflow_errors(state: ReportState, error: Exception) -> ReportState:
"""Handle errors gracefully while preserving workflow state"""
error_info = {
"timestamp": datetime.utcnow().isoformat(),
"error_type": type(error).__name__,
"error_message": str(error),
"current_stage": state.get("current_stage"),
"recovery_suggestions": generate_recovery_suggestions(error, state)
}
return {
**state,
"errors": state.get("errors", []) + [error_info],
"current_stage": "error_recovery",
"metadata": {
**state.get("metadata", {}),
"last_error": error_info
}
}
def optimize_search_queries(topic: str, section_info: Dict, previous_results: List[Dict]) -> List[str]:
"""Generate optimized search queries based on context and previous results"""
# Extract key terms from topic and section
key_terms = extract_key_terms(topic + " " + section_info.get("description", ""))
# Analyze previous results to avoid duplication
covered_terms = set()
for result in previous_results:
covered_terms.update(extract_key_terms(result.get("title", "") + result.get("snippet", "")))
# Generate diverse query variations
base_queries = [
f"{topic} {section_info['title']}",
f"{section_info['description']} research",
f"{topic} {section_info.get('focus_area', '')}"
]
# Add uncovered key terms
uncovered_terms = [term for term in key_terms if term not in covered_terms]
if uncovered_terms:
base_queries.append(f"{topic} {' '.join(uncovered_terms[:3])}")
return base_queries[:4]
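The extract_key_terms helper used here is not shown; a simple frequency-based sketch, with the stop-word list being an assumption:
import re
from collections import Counter
from typing import List

# A small stop-word list; in practice this would be more comprehensive.
_STOP_WORDS = {"the", "a", "an", "and", "or", "of", "for", "to", "in", "on", "with", "research"}

def extract_key_terms(text: str, max_terms: int = 10) -> List[str]:
    """Pull the most frequent non-trivial words from free text (sketch)."""
    words = re.findall(r"[a-zA-Z]{4,}", text.lower())
    counts = Counter(word for word in words if word not in _STOP_WORDS)
    return [term for term, _ in counts.most_common(max_terms)]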
The system implements sophisticated human-in-the-loop patterns using LangGraph's interrupt mechanism:
graph TD
A[Start Research] --> B[Generate Plan]
B --> C[Human Feedback Node]
C --> D{User Decision}
D -->|Approve| E[Build Sections Parallel]
D -->|Revise| F[Update Plan with Feedback]
D -->|Cancel| G[End Workflow]
F --> B
E --> H[Quality Review]
H --> I{Quality Check}
I -->|Pass| J[Compile Report]
I -->|Fail| K[Revision Loop]
K --> E
J --> L[Final Report]
style C fill:#ffeb3b
style D fill:#ff9800
style I fill:#2196f3
# State checkpoint and recovery mechanisms
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.postgres import PostgresSaver
class ResearchStateManager:
"""Manages state persistence and recovery for research workflows"""
def __init__(self, checkpointer_type: str = "memory"):
if checkpointer_type == "postgres":
self.checkpointer = PostgresSaver.from_conn_string(DATABASE_URL)
else:
self.checkpointer = MemorySaver()
def save_checkpoint(self, state: ReportState, thread_id: str) -> str:
"""Save workflow state at checkpoint"""
checkpoint_id = self.checkpointer.put(
config={"configurable": {"thread_id": thread_id}},
checkpoint={
"state": state,
"timestamp": datetime.utcnow().isoformat(),
"version": "1.0"
}
)
return checkpoint_id
def restore_from_checkpoint(self, thread_id: str) -> Optional[ReportState]:
"""Restore workflow state from checkpoint"""
checkpoint = self.checkpointer.get(
config={"configurable": {"thread_id": thread_id}}
)
if checkpoint:
return checkpoint["state"]
return None
def handle_interrupt_recovery(self, thread_id: str, user_input: Dict[str, Any]) -> ReportState:
"""Handle recovery after human feedback interrupt"""
state = self.restore_from_checkpoint(thread_id)
if not state:
raise ValueError(f"No checkpoint found for thread {thread_id}")
# Update state with human feedback
updated_state = {
**state,
"feedback": user_input.get("feedback", ""),
"plan_approved": user_input.get("action") == "approve",
"human_input_timestamp": datetime.utcnow().isoformat()
}
return updated_state
The implementation leverages current LangGraph best practices and emerging patterns:
from langgraph.types import Command
def section_quality_router(state: SectionState) -> Command:
"""Route based on quality assessment using Command pattern"""
quality_score = state.get("quality_score", 0)
max_depth_reached = state["search_depth"] >= state["max_search_depth"]
if quality_score >= 0.8:
return Command(
goto="complete_section",
update={"status": "completed", "quality_passed": True}
)
elif max_depth_reached:
return Command(
goto="complete_section",
update={"status": "completed", "quality_passed": False}
)
else:
return Command(
goto="improve_content",
update={
"search_depth": state["search_depth"] + 1,
"revision_needed": True
}
)
from langgraph.config import get_stream_writer
def write_section_with_streaming(state: SectionState) -> SectionState:
"""Write section content with real-time streaming updates"""
writer = get_stream_writer()
# Stream progress updates
writer({"type": "progress", "message": "Starting content generation..."})
content_chunks = []
for i, source in enumerate(state["search_results"]):
writer({
"type": "progress",
"message": f"Processing source {i+1}/{len(state['search_results'])}"
})
chunk = process_source_content(source)
content_chunks.append(chunk)
# Stream intermediate content
writer({
"type": "content_chunk",
"chunk": chunk,
"source_id": source["id"]
})
final_content = synthesize_content_chunks(content_chunks)
writer({
"type": "section_complete",
"content": final_content,
"word_count": len(final_content.split())
})
return {
**state,
"content": final_content,
"status": "draft_completed"
}
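Events emitted through get_stream_writer surface under the "custom" stream mode; a consumer sketch, where section_graph is assumed to be a compiled graph containing the node above and the input values are placeholders:
# Consume the custom events emitted by write_section_with_streaming.
for mode, event in section_graph.stream(
    {"section_info": {"title": "Background"}, "search_results": []},
    stream_mode=["custom", "updates"],
):
    # Custom events carry the dictionaries passed to the stream writer.
    if mode == "custom" and event.get("type") == "progress":
        print(event["message"])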
from langgraph.prebuilt import ToolNode
from langchain_core.tools import tool
class DynamicSearchTools:
"""Dynamic tool configuration based on search API availability"""
def __init__(self, config: SearchConfig):
self.config = config
self.available_tools = self._initialize_tools()
def _initialize_tools(self) -> Dict[str, Any]:
"""Initialize search tools based on configuration"""
tools = {}
# Enabled APIs are derived from the primary and fallback settings in SearchConfig
enabled_apis = [self.config.primary_api] + (self.config.fallback_apis or [])
if "tavily" in enabled_apis:
tools["tavily"] = self._create_tavily_tool()
if "exa" in enabled_apis:
tools["exa"] = self._create_exa_tool()
if "arxiv" in enabled_apis:
tools["arxiv"] = self._create_arxiv_tool()
return tools
def create_tool_node(self) -> ToolNode:
"""Create tool node with the configured search tools plus an adaptive router"""
available_tools = self.available_tools
# @tool works on plain functions, not bound methods, so the adaptive
# router is defined as a closure over the configured tools.
@tool
def adaptive_search(query: str, search_type: str = "general") -> Dict[str, Any]:
"""Adaptive search that selects the best tool for the query type."""
# Select the search tool based on simple query heuristics
if "academic" in query.lower() or "research" in query.lower():
return available_tools["arxiv"].invoke(query)
elif "recent" in query.lower() or "news" in query.lower():
return available_tools["tavily"].invoke(query)
else:
return available_tools["exa"].invoke(query)
return ToolNode(list(available_tools.values()) + [adaptive_search])
from typing import AsyncGenerator
class StreamingStateManager:
"""Efficient state management for large research workflows"""
def __init__(self, max_state_size: int = 10_000_000): # 10MB limit
self.max_state_size = max_state_size
self.state_compression_enabled = True
def optimize_state_size(self, state: ReportState) -> ReportState:
"""Optimize state size by compressing large content"""
# Compress completed sections to save memory
if len(str(state)) > self.max_state_size:
compressed_sections = []
for section in state.get("completed_sections", []):
if len(section) > 5000: # Compress large sections
compressed_sections.append({
"content_summary": section[:500] + "...",
"word_count": len(section.split()),
"compressed": True
})
else:
compressed_sections.append(section)
return {
**state,
"completed_sections": compressed_sections,
"compression_applied": True
}
return state
async def stream_state_updates(
self,
graph,
initial_state: ReportState,
config: Dict[str, Any]
) -> AsyncGenerator[Dict[str, Any], None]:
"""Stream state updates with memory management"""
async for chunk in graph.astream(
initial_state,
config=config,
stream_mode=["values", "updates"]
):
# With multiple stream modes, each chunk is a (mode, data) tuple
mode, data = chunk
if mode == "values":
# Optimize state size before yielding full state snapshots
yield {"type": "state", "data": self.optimize_state_size(data)}
elif mode == "updates":
yield {"type": "update", "data": data}
from enum import Enum
from dataclasses import dataclass
class QualityLevel(Enum):
EXCELLENT = "excellent"
GOOD = "good"
ACCEPTABLE = "acceptable"
NEEDS_IMPROVEMENT = "needs_improvement"
UNACCEPTABLE = "unacceptable"
@dataclass
class QualityMetrics:
"""Comprehensive quality assessment metrics"""
content_score: float
source_quality: float
citation_accuracy: float
factual_consistency: float
writing_clarity: float
completeness: float
overall_score: float
quality_level: QualityLevel
feedback: List[str]
improvement_suggestions: List[str]
def comprehensive_quality_assessment(
content: str,
sources: List[Dict],
requirements: Dict,
evaluator_llm
) -> QualityMetrics:
"""Perform comprehensive quality assessment using LLM evaluation"""
evaluation_prompt = f"""
Evaluate this research content comprehensively:
Content: {content}
Sources Used: {len(sources)} sources
Requirements: {requirements}
Provide scores (0-100) for:
1. Content Quality - depth, accuracy, relevance
2. Source Quality - credibility, diversity, relevance
3. Citation Accuracy - proper attribution, format
4. Factual Consistency - claims supported by evidence
5. Writing Clarity - structure, flow, readability
6. Completeness - addresses all requirements
Also provide:
- Overall assessment level
- Specific feedback points
- Concrete improvement suggestions
"""
evaluation = evaluator_llm.invoke([{"role": "user", "content": evaluation_prompt}])
# Parse evaluation response (implementation would include actual parsing)
scores = parse_evaluation_scores(evaluation.content)
overall_score = sum(scores.values()) / len(scores)
quality_level = determine_quality_level(overall_score)
return QualityMetrics(
content_score=scores["content_quality"],
source_quality=scores["source_quality"],
citation_accuracy=scores["citation_accuracy"],
factual_consistency=scores["factual_consistency"],
writing_clarity=scores["writing_clarity"],
completeness=scores["completeness"],
overall_score=overall_score,
quality_level=quality_level,
feedback=parse_feedback_points(evaluation.content),
improvement_suggestions=parse_improvement_suggestions(evaluation.content)
)
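determine_quality_level and the parse_* helpers are referenced but not shown; a plausible mapping from the aggregate 0-100 score to the QualityLevel enum:
def determine_quality_level(overall_score: float) -> QualityLevel:
    """Map a 0-100 aggregate score onto the QualityLevel scale (sketch)."""
    if overall_score >= 90:
        return QualityLevel.EXCELLENT
    if overall_score >= 75:
        return QualityLevel.GOOD
    if overall_score >= 60:
        return QualityLevel.ACCEPTABLE
    if overall_score >= 40:
        return QualityLevel.NEEDS_IMPROVEMENT
    return QualityLevel.UNACCEPTABLE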
import copy
from typing import Protocol
class WorkflowAdapter(Protocol):
"""Protocol for adaptive workflow execution"""
def adapt_to_content_complexity(self, state: ReportState) -> WorkflowConfig:
"""Adapt workflow parameters based on content complexity"""
...
def handle_resource_constraints(self, state: ReportState) -> ReportState:
"""Adapt to resource limitations"""
...
class IntelligentWorkflowManager:
"""Manages adaptive workflow execution based on context"""
def __init__(self, base_config: WorkflowConfig):
self.base_config = base_config
self.performance_history = []
def analyze_complexity(self, topic: str, requirements: Dict) -> str:
"""Analyze research topic complexity"""
complexity_indicators = [
len(topic.split()),
requirements.get("required_sources", 0),
requirements.get("word_count", 0),
len(requirements.get("research_questions", []))
]
complexity_score = sum(complexity_indicators) / 4
if complexity_score > 50:
return "high"
elif complexity_score > 25:
return "medium"
else:
return "low"
def adapt_workflow_config(self, state: ReportState) -> WorkflowConfig:
"""Dynamically adapt workflow configuration"""
complexity = self.analyze_complexity(
state["topic"],
state.get("requirements", {})
)
# Deep-copy the base config so adaptations don't mutate the shared default
adapted_config = copy.deepcopy(self.base_config)
if complexity == "high":
# Increase resources for complex topics
adapted_config.max_search_depth = 3
adapted_config.number_of_queries = 4
adapted_config.quality_threshold = 0.9
elif complexity == "low":
# Optimize for speed on simple topics
adapted_config.max_search_depth = 1
adapted_config.number_of_queries = 2
adapted_config.quality_threshold = 0.7
return adapted_config
def monitor_performance(self, execution_metrics: Dict) -> None:
"""Monitor and learn from workflow performance"""
self.performance_history.append({
"timestamp": datetime.utcnow().isoformat(),
"metrics": execution_metrics,
"config_used": execution_metrics.get("config")
})
# Implement learning algorithms here
if len(self.performance_history) > 10:
self.optimize_future_configs()
def optimize_future_configs(self) -> None:
"""Optimize configuration based on historical performance"""
# Analyze patterns in performance history
# Adjust base configuration parameters
# This would implement actual machine learning optimization
pass
- LangGraph State Management: https://langchain-ai.github.io/langgraph/concepts/low_level/#state - TypedDict patterns and state flow management
- Human-in-the-Loop: https://langchain-ai.github.io/langgraph/concepts/human_in_the_loop/ - Interrupt mechanisms and user feedback integration
- LangGraph Streaming: https://langchain-ai.github.io/langgraph/how-tos/streaming/ - Real-time progress updates and content streaming
- Multi-Agent Patterns: https://langchain-ai.github.io/langgraph/concepts/multi_agent/ - Supervisor-worker architectures
- Command Pattern: https://langchain-ai.github.io/langgraph/concepts/low_level/#command - State updates and workflow routing
- Subgraph Patterns: https://langchain-ai.github.io/langgraph/concepts/subgraphs/ - Parallel processing and modular workflows
- Checkpointing: https://langchain-ai.github.io/langgraph/concepts/persistence/ - State persistence and recovery
- LangGraph Implementation Guide: https://deepwiki.com/langchain-ai/open_deep_research/3.1-langgraph-implementation - Detailed system architecture analysis
- Open Deep Research Repository: https://github.com/langchain-ai/open_deep_research - Source code and implementation details
- Tool Calling Best Practices: https://langchain-ai.github.io/langgraph/how-tos/tool-calling/ - Dynamic tool configuration and usage
- Functional API Patterns: https://langchain-ai.github.io/langgraph/how-tos/use-functional-api/ - Alternative workflow patterns
- Error Handling: https://langchain-ai.github.io/langgraph/concepts/low_level/#error-handling - Robust error management strategies
- Hierarchical TypedDict Structure: Provides type safety and clear data contracts
- Immutable State Updates: Follows functional programming principles for reliability
- Context Preservation: Maintains research context across workflow interruptions
- Strategic Interrupts: Places human input at critical decision points
- Flexible Routing: Adapts workflow based on user feedback
- State Persistence: Maintains workflow continuity across human interactions
- Quality Control: Multi-layered assessment and improvement loops
- Error Recovery: Graceful handling of failures with state preservation
- Performance Optimization: Memory-efficient state management and adaptive execution
This analysis demonstrates how open_deep_research
implements sophisticated state management and human feedback patterns using current LangGraph best practices, providing a robust foundation for interactive AI research workflows.