AutoGPT implements a sophisticated feedback loop system through its interaction loop architecture, which closely follows the observe-reason-act pattern. Here's a detailed breakdown:

AutoGPT observes the results of previous actions through the ActionResult system:
```python
class AssistantThoughts(ModelWithSummary):
    observations: str = Field(
        description="Relevant observations from your last action (if any)"
    )
```
Key Observation Sources:
- Action Results: Success/error status from previous tool executions
- Environment Changes: File system modifications, API responses
- User Feedback: Human input and corrections
- Context History: Message history and event tracking
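A minimal sketch of how these observation sources might be represented. The class names follow the snippets in this document, but the field layout and the `summarize_result` helper are illustrative assumptions, not the actual AutoGPT API:

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ActionSuccessResult:
    """Successful tool execution; outputs feed the next observation."""
    outputs: Any
    status: str = "success"


@dataclass
class ActionErrorResult:
    """Failed tool execution; the error message becomes an observation."""
    reason: str
    status: str = "error"

    @classmethod
    def from_exception(cls, e: Exception) -> "ActionErrorResult":
        return cls(reason=str(e))


def summarize_result(result) -> str:
    # Turn a result into an observation string for the next reasoning cycle
    if result.status == "success":
        return f"Action succeeded with output: {result.outputs!r}"
    return f"Action failed: {result.reason}"
```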
```python
while cycles_remaining > 0:
    # Observe current state and previous results
    if not (_ep := agent.event_history.current_episode) or _ep.result:
        # Fresh observation cycle
        action_proposal = await agent.propose_action()
    else:
        # Continue with previous proposal
        action_proposal = _ep.action
```
AutoGPT implements reasoning through the AssistantThoughts structure:
```python
class AssistantThoughts(ModelWithSummary):
    observations: str    # What was observed
    text: str            # Current thoughts
    reasoning: str       # Logic behind decisions
    self_criticism: str  # Self-reflection
    plan: list[str]      # Strategic planning
    speak: str           # Communication intent
```
Key Reasoning Components:
- Observation Analysis: Processing previous action results
- Strategic Planning: Long-term goal decomposition
- Self-Criticism: Error analysis and improvement
- Constraint Validation: Checking against AI directives
```python
async def propose_action(self) -> OneShotAgentActionProposal:
    # Reasoning phase produces an action proposal
    proposal = ActionProposal(
        thoughts=assistant_thoughts,  # Reasoning output
        use_tool=selected_tool,       # Action to execute
        raw_message=llm_response,     # Full context
    )
    return proposal
```
```python
async def execute(self, proposal: OneShotAgentActionProposal) -> ActionResult:
    try:
        return_value = await self._execute_tool(proposal.use_tool)
        result = ActionSuccessResult(outputs=return_value)
    except Exception as e:
        result = ActionErrorResult.from_exception(e)
    return result
```
AutoGPT implements feedback through cycle budget management:
```python
cycle_budget = cycles_remaining = _get_cycle_budget(
    app_config.continuous_mode, app_config.continuous_limit
)

while cycles_remaining > 0:
    # 1. OBSERVE: Get current state
    # 2. REASON: Generate action proposal
    # 3. ACT: Execute proposed action
    # 4. FEEDBACK: Update state and continue
    cycles_remaining -= 1

# Register action results for the next observation cycle
agent.event_history.register_result(tool_result)

# Accumulate context for reasoning
messages = await self.run_pipeline(MessageProvider.get_messages)
```
```python
async def get_user_feedback() -> tuple[UserFeedback, str, int | None]:
    # Human-in-the-loop feedback
    if console_input.lower().strip() == config.authorise_key:
        user_feedback = UserFeedback.AUTHORIZE
    else:
        user_feedback = UserFeedback.TEXT
        user_input = console_input  # Feedback for next reasoning cycle
```
```python
consecutive_failures = 0
try:
    action_proposal = await agent.propose_action()
except InvalidAgentResponseError as e:
    consecutive_failures += 1
    if consecutive_failures >= 3:
        # Break infinite loops
        raise AgentTerminated("Too many consecutive failures")
    continue  # Return to observation phase
```
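The failure-budget pattern above can be isolated into a small, self-contained driver. Everything here is a sketch of the pattern, not AutoGPT's actual loop; `ValueError` stands in for `InvalidAgentResponseError`:

```python
class AgentTerminated(Exception):
    """Raised when the agent gives up after repeated invalid responses."""


def run_with_failure_budget(propose, execute, max_failures: int = 3, cycles: int = 10):
    """Drive an observe-reason-act loop, aborting after too many
    consecutive proposal failures."""
    consecutive_failures = 0
    results = []
    for _ in range(cycles):
        try:
            proposal = propose()
        except ValueError:  # stand-in for InvalidAgentResponseError
            consecutive_failures += 1
            if consecutive_failures >= max_failures:
                raise AgentTerminated("Too many consecutive failures")
            continue  # return to the observation phase
        consecutive_failures = 0  # reset the budget on any success
        results.append(execute(proposal))
    return results
```

Resetting the counter on success is the key design choice: only an unbroken run of failures terminates the agent.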
In the modern platform, feedback loops are implemented through graph execution:
```python
# Node execution creates feedback for next nodes
execution_data = {
    "input_data": previous_node_outputs,  # OBSERVE
    "node_processing": reasoning_logic,   # REASON
    "output_data": execution_results,     # ACT
    "next_nodes": dependent_nodes,        # FEEDBACK
}
```
```typescript
// WebSocket updates provide real-time feedback
if (["QUEUED", "RUNNING"].includes(execution.status)) {
  // Continue monitoring the feedback loop
  setUpdateQueue(prev => [...prev, ...execution.node_executions]);
}
```
Key Feedback Loop Features:
- Persistent Context: Each cycle builds on previous observations
- Self-Reflection: Built-in criticism and improvement mechanisms
- Human Oversight: User feedback integration at each cycle
- Error Recovery: Automatic handling of failed actions
- Budget Control: Prevents infinite loops through cycle limits
- State Persistence: Maintains context across sessions
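The properties above can be seen together in a toy version of the loop. All names here are illustrative, not the real AutoGPT API:

```python
def run_loop(agent, cycle_budget: int):
    """Toy observe-reason-act loop with a cycle budget and persistent context."""
    cycles_remaining = cycle_budget
    history = []  # persistent context carried across cycles
    while cycles_remaining > 0:
        observation = history[-1] if history else None  # OBSERVE
        proposal = agent.propose(observation)           # REASON
        result = agent.execute(proposal)                # ACT
        history.append(result)                          # FEEDBACK
        cycles_remaining -= 1
    return history
```

Each cycle's result becomes the next cycle's observation, while the budget guarantees termination.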
This feedback loop system ensures that AutoGPT continuously incorporates the results of its actions, adapts to changing conditions, and maintains coherent long-term behavior while preventing degradation or infinite loops.
The relationship between AutoGPT's autonomy and graph-based design represents a strategic architectural evolution that addresses the limitations of pure autonomous agents while preserving AI decision-making capabilities.
The original AutoGPT (Classic) implemented pure autonomy with these challenges:
```python
# Classic approach: unpredictable autonomous execution
while cycles_remaining > 0:
    action_proposal = await agent.propose_action()  # AI decides everything
    result = await agent.execute(action_proposal)   # Unpredictable outcomes
    cycles_remaining -= 1  # Only the cycle budget provides control
```
Problems with Pure Autonomy:
- Unpredictable Execution Paths: AI could choose any action sequence
- Difficult Debugging: Hard to trace why specific decisions were made
- Inconsistent Results: Same task could produce different workflows
- Limited Scalability: Hard to compose complex, reliable systems
The modern platform addresses these issues through structured autonomy:
```python
# Modern approach: controlled autonomous execution within a graph structure
class GraphExecution:
    async def execute_node(self, node_id: str):
        # AI autonomy operates WITHIN predefined node boundaries
        node_result = await self.execute_block(node_id, input_data)
        # Graph structure determines WHAT happens next
        next_nodes = self.get_dependent_nodes(node_id, node_result)
        return next_nodes
```
The graph structure provides:
- Predictable Workflow Skeleton: Defines the overall execution structure
- Reproducible Results: Same graph produces consistent behavior
- Easy Debugging: Clear visualization of execution flow
- Composability: Nodes can be reused and combined

AI autonomy still operates at several levels:
- Individual Nodes: AI makes decisions within block boundaries
- Dynamic Parameters: AI chooses tool parameters and strategies
- Content Generation: AI creates outputs based on inputs
- Error Recovery: AI handles failures within node scope
```python
# Graph manages the overall feedback loop structure
class GraphExecutionManager:
    async def execute_graph(self, graph_exec: GraphExecutionEntry):
        queue = ExecutionQueue[NodeExecutionEntry]()
        while not queue.empty():
            node_exec = queue.get()
            # Execute node with AI autonomy
            result = await self.execute_node(node_exec)
            # Graph determines next steps based on result
            next_nodes = self.get_next_nodes(node_exec, result)
            for next_node in next_nodes:
                queue.add(next_node)  # Structured feedback
```
```python
# AI autonomy within individual blocks
class LLMBlock(Block):
    async def run(self, input_data: Input) -> BlockOutput:
        # AI makes autonomous decisions within this scope
        response = await self.llm_provider.create_completion(
            prompt=self.build_prompt(input_data),
            # AI chooses the response strategy autonomously
        )
        return self.process_response(response)
```
Graphs provide:
- Reliability: Predictable execution paths
- Maintainability: Visual workflows are easier to understand and modify
- Scalability: Nodes can be distributed and parallelized
- Debugging: Clear execution traces and state management

AI within nodes provides:
- Decision Making: AI still makes intelligent choices within nodes
- Adaptability: AI can handle varying inputs and contexts
- Creativity: AI generates dynamic content and responses
- Learning: AI can improve performance within node boundaries
```python
# Easy to create complex workflows
workflow = Graph([
    InputNode("user_query"),
    LLMNode("analyze_query"),           # AI autonomy here
    ConditionalNode("route_decision"),  # AI decides routing
    ParallelNodes([
        WebSearchNode("search_web"),    # AI chooses search terms
        DatabaseNode("query_db"),       # AI constructs queries
    ]),
    LLMNode("synthesize_results"),      # AI combines results
    OutputNode("final_response"),
])
```
For end users, this means:
- Predictable Behavior: Know what the workflow will do
- Customizable: Can modify graph structure without coding
- Reliable: Consistent results for similar inputs
- Transparent: Can see exactly what happened
AutoGPT even provides an AgentExecutorBlock that allows embedding autonomous agents within graph workflows:
```python
class AgentExecutorBlock(Block):
    async def run(self, input_data: Input) -> BlockOutput:
        # Execute a full autonomous agent as a single graph node
        graph_exec = execution_utils.add_graph_execution(
            graph_id=input_data.graph_id,
            user_id=input_data.user_id,
            inputs=input_data.inputs,
        )
        # The autonomous agent runs within this controlled context
        return await self.execute_agent_graph(graph_exec)
```
- Graphs provide the "WHAT" and "WHEN": Define workflow structure and execution order
- AI provides the "HOW": Makes intelligent decisions within each step
- Together they enable: Reliable, intelligent, and maintainable systems
- Pure Autonomy: Great for research and exploration
- Pure Structure: Great for simple, repetitive tasks
- Hybrid Approach: Great for production systems that need both reliability and intelligence
The graph-based design doesn't eliminate autonomy; it channels and focuses it where it's most valuable while providing the structural guarantees needed for production systems. This represents AutoGPT's evolution from a research prototype to an enterprise-ready platform.
Next, let's examine whether the AutoGPT platform supports inter-node feedback loops, where nodes can send feedback or data back to previous nodes in the execution flow. The analysis below draws on the platform codebase, including classic/benchmark/agbenchmark/utils/dependencies/graphs.py.
The AutoGPT platform has limited support for inter-node feedback loops. While it supports some feedback mechanisms, it does not support true inter-node feedback loops, where nodes send data backward to previously executed nodes.
The platform supports nodes connecting to themselves:

```python
# Example from block_autogen.py - constant block loopback
Link(
    source_id=input_query_constant.id,
    sink_id=input_query_constant.id,  # Same node
    source_name="output",
    sink_name="data",
)
Link(  # Loopback for constant block
    source_id=search_result_constant.id,
    sink_id=search_result_constant.id,  # Self-connection
    source_name="output",
    sink_name="data",
)
```
```python
# Static links allow data reuse without consumption
class Link(BaseDbModel):
    source_id: str
    sink_id: str
    source_name: str
    sink_name: str
    is_static: bool = False  # When True, data persists for reuse
```
```python
# The same node can execute multiple times with different inputs
for iexec in db_client.get_node_executions(
    node_id=next_node_id,
    graph_exec_id=graph_exec_id,
    statuses=[ExecutionStatus.INCOMPLETE],
):
    # Re-execute with updated static link data
    enqueued_executions.append(add_enqueued_execution(...))
```
The platform explicitly prevents cycles between different nodes:
```python
# From graphs.py - cycle detection algorithm
def is_circular(graph):
    def dfs(node, visited, stack, parent_map):
        visited.add(node)
        stack.add(node)
        for edge in graph["edges"]:
            if edge["from"] == node:
                if edge["to"] in stack:
                    # Detected a cycle - THIS IS PREVENTED
                    cycle_path = []
                    # ... build cycle path for error reporting
                    return cycle_path
        return None
    # ... (traversal over all nodes elided)

def validate_skill_tree(graph, skill_tree_name):
    cycle_path = is_circular(graph)
    if cycle_path:
        cycle_str = " -> ".join(cycle_path)
        raise ValueError(
            f"{skill_tree_name} skill tree is circular! "
            f"Detected circular path: {cycle_str}."
        )
```
The execution model is strictly forward-only:
```python
# Execution follows DAG topology - no backward flow
def _enqueue_next_nodes(...) -> list[NodeExecutionEntry]:
    return [
        execution
        for link in node.output_links  # Only forward links
        for execution in register_next_executions(link)
    ]
```
```python
# Graph validation ensures no cycles
@staticmethod
def _validate_graph(graph: BaseGraph, for_run: bool = False):
    # Validates that all links form a valid DAG
    # Prevents any circular dependencies between nodes
    ...

# Execution follows dependency order
while not queue.empty():
    queued_node_exec = queue.get()
    # Execute node and enqueue dependent nodes
    # No mechanism for backward feedback
```
```python
# From block_autogen.py - error feedback triggers re-execution
Link(
    source_id=block_installation.id,
    sink_id=prompt_text_formatter.id,
    source_name="error",                    # Error output
    sink_name="values_#_previous_attempt",  # Feeds back as input
),
Link(  # Re-trigger search result on error
    source_id=block_installation.id,
    sink_id=search_result_constant.id,
    source_name="error",
    sink_name="input",
),
```
Note: This is not a true feedback loop but rather error-triggered re-execution of the same workflow.
```python
# Embed autonomous agents within graph nodes
class AgentExecutorBlock(Block):
    async def run(self, input_data: Input) -> BlockOutput:
        # Execute a full autonomous agent as a single node;
        # the agent can have internal feedback loops
        graph_exec = execution_utils.add_graph_execution(...)
        return await self.execute_agent_graph(graph_exec)
```
- Predictable Execution: DAG ensures deterministic execution order
- Debugging: Clear execution traces without circular dependencies
- Termination Guarantees: No infinite loops between nodes
- Parallel Execution: Independent nodes can run in parallel
- Resource Management: Predictable resource usage patterns
- State Management: Simpler state tracking without cycles
- Reproducible Results: Same graph produces consistent outcomes
- Error Isolation: Failures don't create cascading feedback loops
- Monitoring: Clear execution flow for observability
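Forward-only DAG execution can be sketched with a standard topological traversal (Kahn's algorithm). This is an illustrative model, not the platform's actual executor:

```python
from collections import deque


def execute_dag(nodes: dict, edges: list):
    """Run each node exactly once, after all of its dependencies.

    nodes maps node id -> callable taking a list of upstream outputs;
    edges are (source_id, sink_id) pairs and must form a DAG.
    """
    indegree = {n: 0 for n in nodes}
    for _, sink in edges:
        indegree[sink] += 1
    queue = deque(n for n, d in indegree.items() if d == 0)
    outputs, order = {}, []
    while queue:
        node = queue.popleft()
        upstream = [outputs[src] for src, snk in edges if snk == node]
        outputs[node] = nodes[node](upstream)
        order.append(node)
        for src, snk in edges:
            if src == node:
                indegree[snk] -= 1
                if indegree[snk] == 0:
                    queue.append(snk)
    if len(order) < len(nodes):
        raise ValueError("cycle detected; graph is not a DAG")
    return outputs, order
```

Because data only ever flows from sources to sinks, every node runs at most once per execution; there is no path back to an earlier node.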
| Feature | Classic AutoGPT | Modern Platform |
|---|---|---|
| Inter-Agent Feedback | ✅ Full support | ❌ Not supported |
| Intra-Node Feedback | ✅ Built-in | ✅ Via self-connections |
| Cycle Prevention | ❌ None | ✅ DAG enforcement |
| Predictability | ❌ Unpredictable | ✅ Deterministic |
| Production Ready | ❌ Research-focused | ✅ Enterprise-ready |
The AutoGPT platform prioritizes reliability and predictability over flexible feedback loops. While it supports some feedback mechanisms (self-connections, static links, error-triggered re-execution), it does not support true inter-node feedback loops where nodes send data backward to previously executed nodes.
This design choice reflects the platform's evolution from a research prototype to an enterprise-ready system that values:
- Deterministic behavior over flexible autonomy
- Production reliability over experimental features
- Maintainable workflows over complex feedback systems
For use cases requiring complex feedback loops, users can either:
- Use Agent Executor blocks to embed autonomous agents
- Design multiple sequential graphs with external orchestration
- Implement feedback logic within individual nodes rather than between them
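The second workaround, external orchestration of sequential graphs, might look like the sketch below. The `run_graph` callable is a hypothetical stand-in for the platform's execution API:

```python
def orchestrate(run_graph, graph_ids, initial_inputs, max_rounds: int = 5,
                is_done=lambda out: out.get("done", False)):
    """Chain several graph executions, feeding each run's outputs into the
    next, until a termination condition or the round budget is hit. This
    moves the feedback loop *outside* the DAG, where cycles are allowed."""
    data = initial_inputs
    for _ in range(max_rounds):
        for graph_id in graph_ids:
            data = run_graph(graph_id, data)  # hypothetical executor call
        if is_done(data):
            break
    return data
```

Each individual graph run stays a clean DAG; only the orchestrator loops.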
Next, let's examine how intra-node feedback loops are implemented in the AutoGPT framework, that is, how individual nodes can have internal feedback mechanisms. The analysis below draws on the platform code, including autogpt_platform/backend/backend/blocks/basic.py.
AutoGPT implements intra-node feedback loops through several sophisticated mechanisms that allow individual nodes to maintain internal state and create self-referential data flows.
The most common intra-node feedback implementation uses the StoreValueBlock:
```python
class StoreValueBlock(Block):
    class Input(BlockSchema):
        input: Any = SchemaField(description="Trigger the block to produce output")
        data: Any = SchemaField(description="Constant data retained in block", default=None)

    class Output(BlockSchema):
        output: Any = SchemaField(description="The stored data retained in the block")

    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        yield "output", input_data.data or input_data.input
```
Self-Connection Pattern:

```jsonc
{
  "source_id": "store_block_id",
  "sink_id": "store_block_id",  // Same node - self-connection
  "source_name": "output",
  "sink_name": "data",
  "is_static": true             // Enables persistent feedback
}
```
Chat and conversation blocks implement sophisticated self-referential patterns:

```jsonc
// Discord chatbot example with history feedback
{
  "source_id": "a33a4e7f-70fa-43ca-a0dd-713411ec4c9b",
  "sink_id": "a33a4e7f-70fa-43ca-a0dd-713411ec4c9b",  // Self-connection
  "source_name": "updated_list",
  "sink_name": "list",
  "is_static": false
}
```
```python
class Block:
    def __init__(self, static_output: bool = False):
        self.static_output = static_output  # Enables persistent data flow
```
Static Link Behavior:
- Non-Consuming: Data persists after being read
- Multiple Access: Same output can feed multiple inputs
- State Retention: Maintains data across execution cycles
```python
class Link(BaseDbModel):
    source_id: str
    sink_id: str
    source_name: str
    sink_name: str
    is_static: bool = False  # Key flag for feedback loops
```
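The consumption semantics of `is_static` can be illustrated with a small input-buffer model. This is a sketch of the behavior described above, not the platform's actual queue code:

```python
class InputPin:
    """Models one node input fed by a link: static data persists,
    non-static data is consumed on read."""

    def __init__(self, is_static: bool = False):
        self.is_static = is_static
        self._values = []

    def push(self, value):
        self._values.append(value)

    def pop(self):
        if not self._values:
            raise LookupError("no data available on this input")
        if self.is_static:
            return self._values[-1]  # reusable: latest value persists
        return self._values.pop(0)   # consumed: each value is read once
```

A static pin can therefore feed the same value into many downstream executions, which is what makes self-connections useful for retained state.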
Each node maintains internal state through multiple mechanisms:
```typescript
// Frontend node state
export type CustomNodeData = {
  hardcodedValues: { [key: string]: any };   // Persistent input values
  executionResults?: NodeExecutionResult[];  // Execution history
  connections: ConnectionData;               // Connection state
  isOutputStatic: boolean;                   // Static output flag
};
```
```python
# Backend execution state
class NodeExecutionEntry:
    node_exec_id: str
    node_id: str
    inputs: dict[str, Any]   # Input state
    outputs: dict[str, Any]  # Output state
    status: ExecutionStatus  # Execution status
```
```python
class SmartDecisionMakerBlock(Block):
    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        # Accumulate conversation history through self-feedback
        input_data.conversation_history = input_data.conversation_history or []
        # Process pending tool calls from history
        pending_tool_calls = get_pending_tool_calls(input_data.conversation_history)
        # Add new responses to history for the next iteration
        prompt.extend(tool_output)
```
```python
class AddMemoryBlock(Block):
    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        # Memory blocks can reference their own previous outputs
        if input_data.limit_memory_to_run:
            params["run_id"] = graph_exec_id  # Self-referential context
```
```python
class Block:
    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        try:
            # Normal execution
            yield "output", result
        except Exception as e:
            # Error feedback for self-correction
            yield "error", str(e)
```
```jsonc
// Error output feeds back to trigger a retry
{
  "source_id": "block_id",
  "sink_id": "block_id",  // Self-connection
  "source_name": "error",
  "sink_name": "retry_input",
  "is_static": false
}
```
```typescript
const isInputHandleConnected = (key: string) => {
  return data.connections?.some((conn: any) => {
    return conn.target === id && conn.targetHandle === key;
  });
};

const isOutputHandleConnected = (key: string) => {
  return data.connections?.some((conn: any) => {
    return conn.source === id && conn.sourceHandle === key;
  });
};
```
```typescript
const onConnect: OnConnect = useCallback((connection: Connection) => {
  // Allow self-connections for feedback loops
  if (connection.source === connection.target) {
    // Self-connection logic
    const newEdge: CustomEdge = {
      id: formatEdgeID(connection),
      data: { isStatic: sourceNode!.data.isOutputStatic },
      ...connection,
    };
  }
}, []);
```
- Self-Referential Links: Nodes can connect outputs to their own inputs
- Static Data Persistence: The `is_static` flag enables non-consuming data flow
- State Accumulation: Internal state builds up over multiple executions
- Context Preservation: Conversation history and memory persist across cycles
- Error Recovery: Error outputs can trigger self-correction mechanisms
```python
# Intra-node feedback execution pattern
while not queue.empty():
    node_exec = queue.get()
    # Execute node with current state
    result = await execute_node(node_exec)
    # Check for self-connections
    if has_self_connections(node_exec.node_id):
        # Update internal state with output
        update_node_state(node_exec.node_id, result)
        # Re-enqueue if feedback conditions are met
        if should_continue_feedback(result):
            queue.add(create_feedback_execution(node_exec))
```
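A runnable, self-contained version of this pattern, with stand-in helpers; the step callable and iteration cap are illustrative assumptions:

```python
from collections import deque


def run_self_feedback_node(step, initial_state, max_iterations: int = 10):
    """Execute one node repeatedly, feeding its output back into its own
    input via a simulated self-connection, until `step` says to stop.

    `step` takes the current input and returns (new_state, should_continue).
    """
    queue = deque([initial_state])
    state = initial_state
    iterations = 0
    while queue and iterations < max_iterations:
        current = queue.popleft()
        state, should_continue = step(current)  # node execution
        iterations += 1
        if should_continue:
            queue.append(state)  # self-connection: output becomes next input
    return state
```

The iteration cap plays the same role as the platform's termination guarantees: even a node that always asks to continue cannot loop forever.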
Multi-turn conversations:
- Conversation blocks accumulate message history
- Each response feeds back into context for the next turn
- Maintains coherent multi-turn dialogue

Iterative refinement:
- Blocks can refine their outputs through self-feedback
- Error correction through retry mechanisms
- Progressive improvement of results

Stateful processing:
- Nodes can implement state machine behavior
- Internal state transitions through self-connections
- Complex decision-making patterns
AutoGPT's intra-node feedback loops provide a sophisticated mechanism for:
- Internal State Management: Nodes maintain persistent state across executions
- Context Accumulation: Building up conversation history and memory
- Self-Correction: Error recovery through feedback mechanisms
- Iterative Processing: Progressive refinement of outputs
This design enables complex, stateful behavior within individual nodes while maintaining the overall DAG structure of the graph execution system.
Next, here are 3 concrete complex workflow examples, presented as ASCII diagrams, that showcase the AutoGPT platform's capabilities.
Scenario: Automated customer support system that handles inquiries, escalates issues, and manages follow-ups.
┌─────────────────────────────────────────────────────────────────────────────────────────┐
│ E-COMMERCE CUSTOMER SUPPORT WORKFLOW │
└─────────────────────────────────────────────────────────────────────────────────────────┘
┌──────────────────┐ ┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Email/Chat │ │ Extract Info │ │ AI Sentiment │ │ Route Decision │
│ Webhook │───▶│ Block │───▶│ Analysis │───▶│ Block │
│ Trigger │ │ │ │ │ │ │
└──────────────────┘ └─────────────────┘ └──────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Customer Info │ │ Urgency Score │ │ Category │
│ Lookup │ │ (1-10) │ │ Classification │
│ │ │ │ │ │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │ │
└────────────┬───────────┴────────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ ROUTING LOGIC │
│ IF urgency > 8 OR sentiment < -0.5 → HUMAN ESCALATION │
│ ELIF category = "refund" → AUTOMATED REFUND FLOW │
│ ELIF category = "shipping" → TRACKING LOOKUP │
│ ELSE → AI RESPONSE GENERATION │
└─────────────────────────────────────────────────────────┘
│
┌────────────────────┼────────────────────┐
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ HUMAN AGENT │ │ AUTOMATED │ │ AI RESPONSE │
│ ESCALATION │ │ PROCESSES │ │ GENERATION │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Create Ticket │ │ Process Refund │ │ Knowledge Base │
│ in Zendesk │ │ Update Order │ │ Search │
│ Notify Agent │ │ Send Tracking │ │ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
│ │ ▼
│ │ ┌─────────────────┐
│ │ │ Generate AI │
│ │ │ Response │
│ │ │ │
│ │ └─────────────────┘
│ │ │
└────────────────────┼────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ RESPONSE DELIVERY │
└─────────────────────────────────────────────────────────┘
│
┌────────────────────┼────────────────────┐
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Send Email │ │ Update Chat │ │ Log to CRM │
│ Response │ │ Platform │ │ System │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
└────────────────────┼────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ FOLLOW-UP AUTOMATION │
│ Schedule satisfaction survey (24h delay) │
│ Set reminder for agent follow-up (if escalated) │
│ Update customer profile with interaction history │
└─────────────────────────────────────────────────────────┘
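The routing box in the diagram above reduces to a plain decision function. The thresholds come from the diagram; the function itself is illustrative, not platform code:

```python
def route_inquiry(urgency: int, sentiment: float, category: str) -> str:
    """Pick a downstream branch for a customer support inquiry."""
    if urgency > 8 or sentiment < -0.5:
        return "HUMAN_ESCALATION"
    if category == "refund":
        return "AUTOMATED_REFUND_FLOW"
    if category == "shipping":
        return "TRACKING_LOOKUP"
    return "AI_RESPONSE_GENERATION"
```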
┌─────────────────────────────────────────────────────────────────────────────────────────┐
│ CONTENT MARKETING AUTOMATION PIPELINE │
└─────────────────────────────────────────────────────────────────────────────────────────┘
┌──────────────────┐ ┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Content Idea │ │ Trend Analysis │ │ Competitor │ │ Topic Research │
│ Input/Schedule │───▶│ (Google │───▶│ Analysis │───▶│ & Validation │
│ │ │ Trends API) │ │ (Web Scraping) │ │ │
└──────────────────┘ └─────────────────┘ └──────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Trending │ │ Competitor │ │ SEO Keywords │
│ Keywords │ │ Content Gaps │ │ Research │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │ │
└────────────┬───────────┴────────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ CONTENT STRATEGY AI │
│ Analyze data → Generate content brief → Set targets │
│ • Target audience • Content type • Distribution plan │
└─────────────────────────────────────────────────────────┘
│
┌────────────────────┼────────────────────┐
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ BLOG POST │ │ SOCIAL MEDIA │ │ VIDEO SCRIPT │
│ GENERATION │ │ CONTENT │ │ CREATION │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ AI Blog Writer │ │ Platform- │ │ AI Script │
│ (GPT-4) │ │ Specific │ │ Generator │
│ + SEO Optimize │ │ Adaptations │ │ + Storyboard │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Grammar Check │ │ Image/GIF │ │ AI Voice │
│ Plagiarism │ │ Generation │ │ Generation │
│ Detection │ │ (DALL-E/Midj.) │ │ (ElevenLabs) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
└────────────────────┼────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ QUALITY ASSURANCE │
│ • Content scoring • Brand compliance • Fact checking │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ APPROVAL WORKFLOW │
│ IF score > 8.5 → Auto-publish │
│ ELIF score > 7.0 → Send for review │
│ ELSE → Flag for major revision │
└─────────────────────────────────────────────────────────┘
│
┌────────────────────┼────────────────────┐
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ AUTO-PUBLISH │ │ HUMAN REVIEW │ │ REVISION │
│ │ │ │ │ REQUIRED │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ │
┌─────────────────┐ ┌─────────────────┐ │
│ DISTRIBUTION │ │ Review Queue │ │
│ AUTOMATION │ │ Notification │ │
└─────────────────┘ └─────────────────┘ │
│ │ │
▼ ▼ │
┌─────────────────────────────────────────────────────────────┐ │
│ MULTI-PLATFORM PUBLISHING │ │
└─────────────────────────────────────────────────────────────┘ │
│ │
┌───────────────┼───────────────┬───────────────┬───────────────┐ │
▼ ▼ ▼ ▼ ▼ │
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ WordPress │ │ LinkedIn │ │ Twitter/X │ │ YouTube │ │ Medium │ │
│ Blog │ │ Articles │ │ Threads │ │ Videos │ │ Stories │ │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │ │
└───────────────┼───────────────┴───────────────┴───────────────┘ │
▼ │
┌─────────────────────────────────────────────────────────────┐ │
│ PERFORMANCE TRACKING │ │
│ • Engagement metrics • SEO rankings • Conversion rates │ │
│ • A/B testing results • Audience feedback │ │
└─────────────────────────────────────────────────────────────┘ │
│ │
▼ │
┌─────────────────────────────────────────────────────────────┐ │
│ OPTIMIZATION FEEDBACK LOOP │ │
│ Analyze performance → Update content strategy → │ │
│ Retrain AI models → Improve future content │◀───────────┘
└─────────────────────────────────────────────────────────────┘
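The approval gate in the pipeline above is a simple threshold function. The score bands come from the diagram; the rest is illustrative:

```python
def approval_decision(content_score: float) -> str:
    """Map a content quality score to a publishing branch."""
    if content_score > 8.5:
        return "AUTO_PUBLISH"
    if content_score > 7.0:
        return "HUMAN_REVIEW"
    return "REVISION_REQUIRED"
```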
Scenario: Real-time financial transaction monitoring, risk assessment, and regulatory compliance automation.
┌─────────────────────────────────────────────────────────────────────────────────────────┐
│ FINANCIAL RISK MANAGEMENT & COMPLIANCE SYSTEM │
└─────────────────────────────────────────────────────────────────────────────────────────┘
┌──────────────────┐ ┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Transaction │ │ Data │ │ Customer │ │ External Data │
│ Stream │───▶│ Validation │───▶│ Profile │───▶│ Enrichment │
│ (Real-time) │ │ & Parsing │ │ Lookup │ │ │
└──────────────────┘ └─────────────────┘ └──────────────────┘ └─────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌──────────────────┐ ┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Payment APIs │ │ Schema │ │ KYC Database │ │ Sanctions │
│ Bank Feeds │ │ Validation │ │ Credit History │ │ Lists (OFAC) │
│ Card Networks │ │ Error Handling │ │ Risk Profile │ │ PEP Lists │
└──────────────────┘ └─────────────────┘ └──────────────────┘ └─────────────────┘
│ │ │
└────────────┬───────────┴────────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ RISK SCORING ENGINE │
│ ML Models: Fraud Detection + AML + Credit Risk │
└─────────────────────────────────────────────────────────┘
│
┌────────────────────┼────────────────────┐
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ FRAUD RISK │ │ AML RISK │ │ CREDIT RISK │
│ ASSESSMENT │ │ ASSESSMENT │ │ ASSESSMENT │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Pattern │ │ Suspicious │ │ Exposure │
│ Analysis │ │ Activity │ │ Calculation │
│ Anomaly Det. │ │ Detection │ │ Limit Checks │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
└────────────────────┼────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ RISK DECISION MATRIX │
│ │
│ HIGH RISK (Score > 8.0): │
│ → Block transaction + Manual review │
│ → Generate SAR/STR report │
│ → Notify compliance team │
│ │
│ MEDIUM RISK (Score 5.0-8.0): │
│ → Additional verification required │
│ → Enhanced monitoring │
│ → Customer notification │
│ │
│ LOW RISK (Score < 5.0): │
│ → Auto-approve transaction │
│ → Standard monitoring │
└─────────────────────────────────────────────────────────┘
│
┌────────────────────┼────────────────────┐
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ HIGH RISK │ │ MEDIUM RISK │ │ LOW RISK │
│ WORKFLOW │ │ WORKFLOW │ │ WORKFLOW │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Block Trans. │ │ Request Add'l │ │ Approve & │
│ Create Alert │ │ Documentation │ │ Process │
│ Queue Review │ │ Set Monitoring │ │ Transaction │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Generate │ │ Customer │ │ Update │
│ Compliance │ │ Communication │ │ Transaction │
│ Reports │ │ (SMS/Email) │ │ History │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
└────────────────────┼────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ REGULATORY REPORTING │
└─────────────────────────────────────────────────────────┘
│
┌────────────────────┼────────────────────┐
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ SAR/STR │ │ CTR Reports │ │ FBAR/FATCA │
│ Generation │ │ (>$10K) │ │ Reporting │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ FinCEN │ │ IRS Filing │ │ International │
│ Submission │ │ Automation │ │ Regulatory │
│ │ │ │ │ Bodies │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
└────────────────────┼────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ MONITORING & ANALYTICS │
└─────────────────────────────────────────────────────────┘
│
┌────────────────────┼────────────────────┐
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Real-time │ │ Compliance │ │ Risk Model │
│ Dashboards │ │ Audit Trail │ │ Performance │
│ │ │ │ │ Monitoring │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Alert │ │ Regulatory │ │ Model │
│ Management │ │ Examination │ │ Retraining │
│ System │ │ Preparation │ │ Pipeline │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
└────────────────────┼────────────────────┘
▼
┌─────────────────────────────────────────────────────────┐
│ CONTINUOUS IMPROVEMENT │
│ • False positive analysis • Model drift detection │
│ • Regulatory updates • Performance optimization │
│ • Feedback incorporation • Automated model updates │
└─────────────────────────────────────────────────────────┘
- Multiple data sources feeding into single decision points
- Real-time and batch processing combinations
- External API integrations
- Risk-based routing decisions
- Multi-criteria evaluation systems
- Escalation pathways
- Simultaneous analysis across different dimensions
- Independent workflow branches that merge later
- Concurrent external service calls
- Performance monitoring feeding back into optimization
- Quality scores influencing future processing
- Continuous learning and model improvement
- Multiple external systems (CRM, payment processors, regulatory bodies)
- API orchestration and error handling
- Data transformation between systems
- Regulatory reporting automation
- Complete transaction logging
- Audit-ready documentation generation
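The risk-based routing at the heart of the diagram above can be sketched in a few lines. This is a hypothetical illustration: the threshold values and the tier labels are assumptions chosen to mirror the three workflow columns, not values from any real compliance system.

```python
def route_transaction(risk_score: float) -> str:
    """Map a composite risk score to one of the three workflow tiers.

    Thresholds are illustrative assumptions.
    """
    if risk_score >= 0.8:
        return "HIGH"    # block transaction, create alert, queue review
    if risk_score >= 0.4:
        return "MEDIUM"  # request documentation, set monitoring
    return "LOW"         # approve and process

# All three tiers later converge on regulatory reporting,
# mirroring the fan-in at the bottom of the diagram.
```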
These examples showcase AutoGPT's ability to handle enterprise-level complexity while maintaining visual clarity and operational reliability.
I'll search through the AutoGPT platform codebase to understand how conditional edges and dynamic routing are implemented. Based on a comprehensive analysis of the AutoGPT platform codebase, I can provide a detailed answer about its conditional edge and dynamic routing capabilities.
I'll create 3 concrete complex LinkedIn workflow examples using ASCII diagrams. Based on an exploration of the AutoGPT platform structure, it offers blocks for AI text generation, web requests, content creation, and social media automation that can be combined into LinkedIn workflows.
Here are 3 concrete complex LinkedIn workflow examples for AutoGPT platform:
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ LinkedIn Content Creator Workflow │
└─────────────────────────────────────────────────────────────────────────────────────┘
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Agent Input │ │ Extract Website │ │ AI Text │ │ AI Text │
│ │───▶│ Content │───▶│ Summarizer │───▶│ Generator │
│ Industry Topic │ │ │ │ │ │ │
│ Target URL │ │ Scrape articles │ │ Key insights │ │ LinkedIn post │
└─────────────────┘ └─────────────────┘ └─────────────────┘ └─────────────────┘
│ │
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Send Web │ │ Store Value │ │ AI List │ │ Fill Text │
│ Request │◀───│ │◀───│ Generator │◀───│ Template │
│ │ │ Post content │ │ │ │ │
│ POST to │ │ + hashtags │ │ Trending │ │ Add hashtags │
│ LinkedIn API │ │ │ │ hashtags │ │ & CTA │
└─────────────────┘ └─────────────────┘ └─────────────────┘ └─────────────────┘
│ │
▼ │
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ Condition │ │ Agent Output │ │ Get Current │ │
│ Block │───▶│ │ │ Time │──────────────┘
│ │ │ Success/Error │ │ │
│ Check response │ │ Message │ │ Optimal posting │
│ status │ │ │ │ time check │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Flow Description:
1. Input industry topic and source URL
2. Scrape content from target website
3. Summarize key insights using AI
4. Generate LinkedIn-optimized post content
5. Create trending hashtags list
6. Format post with hashtags and CTA
7. Check optimal posting time
8. Post to LinkedIn via API
9. Return success/error status
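The linear portion of the flow above (steps 2 through 4) can be sketched as a pipeline of callables. The step functions below are stand-ins for AutoGPT blocks, not real platform APIs; the string transformations are purely illustrative.

```python
def run_pipeline(topic: str, steps) -> str:
    """Thread a value through a sequence of block-like callables."""
    data = topic
    for step in steps:
        data = step(data)
    return data

# Hypothetical stand-ins for the blocks named in the diagram
steps = [
    lambda t: f"scraped:{t}",   # Extract Website Content
    lambda s: f"summary:{s}",   # AI Text Summarizer
    lambda s: f"post:{s}",      # AI Text Generator
]

result = run_pipeline("ai-trends", steps)
```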
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ LinkedIn Lead Generation Workflow │
└─────────────────────────────────────────────────────────────────────────────────────┘
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Agent Input │ │ Send Web │ │ Extract Website │ │ AI Structured │
│ │───▶│ Request │───▶│ Content │───▶│ Response Gen │
│ Company domain │ │ │ │ │ │ │
│ Job titles │ │ LinkedIn search │ │ Parse profiles │ │ Extract contact │
│ Industry │ │ API calls │ │ & company info │ │ information │
└─────────────────┘ └─────────────────┘ └─────────────────┘ └─────────────────┘
│ │
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Add to List │ │ AI Text │ │ Find in │ │ Data Sampling │
│ │◀───│ Generator │◀───│ Dictionary │◀───│ │
│ Qualified │ │ │ │ │ │ Filter top │
│ leads list │ │ Personalized │ │ Match criteria │ │ prospects │
│ │ │ message │ │ & scoring │ │ │
└─────────────────┘ └─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ │
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ Send Web │ │ Countdown Timer │ │ AI Conversation │ │
│ Request │ │ │ │ │──────────────┘
│ │ │ Rate limiting │ │ Follow-up │
│ LinkedIn │ │ between │ │ sequence │
│ message API │ │ messages │ │ generator │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Store Value │ │ Agent Output │ │ Read CSV │
│ │───▶│ │ │ │
│ Track sent │ │ Campaign │ │ Import existing │
│ messages & │ │ results & │ │ contact list │
│ responses │ │ metrics │ │ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Flow Description:
1. Input target company domains and job criteria
2. Search LinkedIn for matching profiles
3. Extract and parse profile information
4. Structure contact data and company details
5. Filter and sample top prospects
6. Score leads based on criteria matching
7. Generate personalized outreach messages
8. Send messages with rate limiting
9. Track responses and follow-up sequences
10. Export campaign results and metrics
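Step 8's rate-limited sending can be illustrated with a small helper. The Countdown Timer block is simulated here with `time.sleep()`; the delay value and the `send` callable are assumptions, not platform APIs.

```python
import time

def send_with_rate_limit(messages, send, delay_s=0.0):
    """Send each message in order, pausing between sends."""
    sent = []
    for msg in messages:
        send(msg)            # e.g. a LinkedIn message API call (assumed)
        sent.append(msg)
        time.sleep(delay_s)  # Countdown Timer stand-in
    return sent
```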
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ LinkedIn Analytics & Monitoring Workflow │
└─────────────────────────────────────────────────────────────────────────────────────┘
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Agent Input │ │ Send Web │ │ Extract Website │ │ AI List │
│ │───▶│ Request │───▶│ Content │───▶│ Generator │
│ Competitor │ │ │ │ │ │ │
│ LinkedIn URLs │ │ Fetch company │ │ Parse posts, │ │ Extract key │
│ Keywords │ │ pages & posts │ │ engagement data │ │ topics & trends │
└─────────────────┘ └─────────────────┘ └─────────────────┘ └─────────────────┘
│ │
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Calculator │ │ AI Text │ │ Match Text │ │ Add to │
│ │◀───│ Summarizer │◀───│ Pattern │◀───│ Dictionary │
│ Engagement │ │ │ │ │ │ │
│ rate & reach │ │ Competitor │ │ Filter by │ │ Organize data │
│ calculations │ │ analysis │ │ keywords │ │ by competitor │
└─────────────────┘ └─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │ │
▼ ▼ ▼ │
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ Ideogram Model │ │ AI Structured │ │ Get Current │ │
│ │ │ Response Gen │ │ Date │──────────────────┘
│ Generate │ │ │ │ │
│ charts & │ │ Insights & │ │ Timestamp │
│ visualizations │ │ recommendations │ │ analysis │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Send Email │ │ Store Value │ │ Condition │
│ │ │ │ │ Block │
│ Weekly report │ │ Historical │ │ │
│ to stakeholders │ │ data tracking │ │ Alert on │
│ │ │ │ │ significant │
└─────────────────┘ └─────────────────┘ │ changes │
│ │ └─────────────────┘
▼ ▼ │
┌─────────────────┐ ┌─────────────────┐ ▼
│ Agent Output │ │ Read CSV │ ┌─────────────────┐
│ │◀───│ │ │ AI Text │
│ Complete │ │ Load previous │ │ Generator │
│ analytics │ │ data for │ │ │
│ dashboard │ │ comparison │ │ Alert messages │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Flow Description:
1. Input competitor LinkedIn URLs and monitoring keywords
2. Fetch competitor company pages and recent posts
3. Extract post content, engagement metrics, and follower data
4. Organize data by competitor and filter by keywords
5. Identify trending topics and content patterns
6. Calculate engagement rates and reach metrics
7. Generate comparative analysis and insights
8. Create visualizations and charts
9. Set up alerts for significant changes
10. Generate weekly reports and dashboard
11. Store historical data for trend analysis
12. Email reports to stakeholders
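Step 6's engagement-rate calculation can be sketched as follows. Interactions divided by follower count is one common convention; the exact metric any given analytics tool uses is an assumption here.

```python
def engagement_rate(likes: int, comments: int, shares: int,
                    followers: int) -> float:
    """Engagement as a fraction of follower count (one common convention)."""
    if followers == 0:
        return 0.0
    return (likes + comments + shares) / followers
```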
These workflows demonstrate how AutoGPT's block-based architecture can create sophisticated LinkedIn automation systems by combining AI text generation, web scraping, data processing, and API integration blocks. Each workflow addresses different LinkedIn use cases:
- Content Creation: Automated content generation and publishing
- Lead Generation: Prospect identification and outreach automation
- Analytics: Competitive intelligence and performance monitoring
The workflows use available AutoGPT blocks like AI Text Generator, Send Web Request, Extract Website Content, Data Sampling, and various utility blocks to create comprehensive LinkedIn automation solutions.
Yes, the AutoGPT platform supports conditional edges and dynamic routing through multiple mechanisms:
The platform's primary mechanism for conditional routing is the SmartDecisionMakerBlock, which uses AI to intelligently decide which tool/node to execute next.
class SmartDecisionMakerBlock(Block):
    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        # Creates function signatures for all connected tools
        tool_functions = self._create_function_signature(node_id)

        # Uses the LLM to decide which tool to call based on context
        response = llm.llm_call(
            credentials=credentials,
            llm_model=input_data.model,
            prompt=prompt,
            tools=tool_functions,  # Available next nodes
            parallel_tool_calls=False,
        )

        # Routes to the selected tool with arguments
        for tool_call in response.tool_calls:
            tool_name = tool_call.function.name
            tool_args = json.loads(tool_call.function.arguments)
            for arg_name, arg_value in tool_args.items():
                yield f"tools_^_{tool_name}_~_{arg_name}", arg_value
- AI Decision Making: Uses GPT/Claude to analyze context and choose appropriate next node
- Multiple Tool Options: Can connect to multiple downstream nodes and select one dynamically
- Context-Aware: Considers conversation history and current state
- Function Signatures: Automatically generates tool descriptions for LLM decision-making
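The dynamic output-pin names the block yields (`tools_^_<tool>_~_<arg>`) can be decoded with a small helper. This parsing is inferred purely from the `yield` statement shown above; the platform's actual parsing code lives elsewhere and may differ.

```python
def parse_tool_pin(pin: str):
    """Return (tool_name, arg_name) from a dynamic pin name, or None
    if the pin does not follow the tools_^_..._~_... convention."""
    prefix = "tools_^_"
    if not pin.startswith(prefix):
        return None
    tool, _, arg = pin[len(prefix):].partition("_~_")
    return tool, arg
```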
For deterministic conditional logic, AutoGPT provides the ConditionBlock:
class ConditionBlock(Block):
    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        # Evaluate the condition
        result = comparison_funcs[operator](value1, value2)
        yield "result", result

        # Route based on the condition result
        if result:
            yield "yes_output", yes_value
        else:
            yield "no_output", no_value
- Comparison Operators: ==, !=, >, <, >=, <=
- Multiple Output Paths: yes_output and no_output pins
- Type Flexible: Works with numbers, strings, booleans
- Branching Logic: Connect different nodes to yes/no outputs
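A minimal sketch of the comparison dispatch implied by `comparison_funcs[operator](value1, value2)` above, using Python's `operator` module keyed by symbol; the ConditionBlock's real internals are assumed, not quoted.

```python
import operator

comparison_funcs = {
    "==": operator.eq, "!=": operator.ne,
    ">": operator.gt, "<": operator.lt,
    ">=": operator.ge, "<=": operator.le,
}

def evaluate(value1, op: str, value2) -> bool:
    """Apply one of the six supported comparison operators."""
    return comparison_funcs[op](value1, value2)
```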
The execution engine supports dynamic routing through the link resolution system:
def _enqueue_next_nodes(node: Node, output: BlockData, ...) -> list[NodeExecutionEntry]:
    def register_next_executions(node_link: Link) -> list[NodeExecutionEntry]:
        # Parse dynamic output names
        next_data = parse_execution_output(output, next_output_name)

        # Route to the next node based on the output data
        if next_data is None:
            return []  # Skip this path

        # Enqueue execution for the matched path
        return [add_enqueued_execution(...)]

    # Process all possible output links
    return [execution for link in node.output_links
            for execution in register_next_executions(link)]
The visual builder supports conditional edge creation:
// Smart Decision Maker special routing
if (
  sourceNode?.data.block_id === SpecialBlockID.SMART_DECISION &&
  sourceName.toLowerCase() === "tools"
) {
  sourceName = `tools_^_${normalizeToolName(getToolFuncName(edge.target))}_~_${normalizeToolName(edge.targetHandle || "")}`;
}
[User Input] → [Smart Decision Maker] → [Email Tool]
→ [Slack Tool]
→ [Database Tool]
[Data Input] → [Condition Block] → [Process A] (if condition true)
→ [Process B] (if condition false)
[Context] → [Smart Decision Maker] → [Web Search] (if needs info)
→ [Calculator] (if needs math)
→ [File Writer] (if needs storage)
✅ AI-Powered Routing: Smart Decision Maker uses LLM to choose next nodes
✅ Logic-Based Branching: Condition Block for deterministic routing
✅ Multiple Output Paths: Nodes can have multiple conditional outputs
✅ Dynamic Tool Selection: Runtime decision of which tool to execute
✅ Context-Aware Decisions: Considers conversation history and state
✅ Visual Builder Support: Drag-and-drop conditional connections
✅ Function Call Integration: Seamless integration with tool calling
❌ No Custom Functions: Can't write custom routing functions (only AI or predefined logic)
❌ Limited Condition Types: Condition block only supports basic comparisons
❌ No Complex Logic: No support for AND/OR/NOT combinations in single block
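The missing AND/OR support can be worked around by chaining condition blocks: wire the second block to the first block's yes-path, so both comparisons must pass. A purely illustrative sketch of that chaining, not platform code:

```python
def chained_and(value, cond1, cond2) -> str:
    """Emulate (cond1 AND cond2) with two single-condition stages."""
    if not cond1(value):
        return "no_output"   # first block's no-path
    if not cond2(value):
        return "no_output"   # second block's no-path
    return "yes_output"      # both conditions held
```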
In summary, AutoGPT platform provides robust conditional edge support through AI-powered decision making (Smart Decision Maker) and logic-based branching (Condition Block), enabling sophisticated dynamic workflow routing based on context, data, and intelligent decision-making.
I'll help you understand how to implement conditional edges in Apache Flink DAG-based jobs. Based on research into Apache Flink's capabilities, here is a detailed answer about implementing conditional edges in Flink DAG-based jobs.
Apache Flink provides several powerful mechanisms to implement conditional edges and dynamic routing in DAG-based jobs. Here are the main approaches:
Flink's built-in Split and Select operators provide the most direct way to implement conditional edges in older Flink releases (note that split/select has been deprecated and was removed in Flink 1.12; side outputs, covered below, are the recommended replacement):
// Split stream based on conditions
SplitStream<Integer> split = dataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});

// Route to different processing paths
DataStream<Integer> evenStream = split.select("even");
DataStream<Integer> oddStream = split.select("odd");
DataStream<Integer> allStream = split.select("even", "odd");
SplitStream<Transaction> transactionSplit = transactionStream.split(
    new OutputSelector<Transaction>() {
        @Override
        public Iterable<String> select(Transaction transaction) {
            List<String> tags = new ArrayList<>();
            if (transaction.getAmount() > 10000) {
                tags.add("high-value");
            }
            if (transaction.getRegion().equals("US")) {
                tags.add("domestic");
            } else {
                tags.add("international");
            }
            if (transaction.isSuspicious()) {
                tags.add("fraud-check");
            }
            return tags;
        }
    }
);

// Route to different processing pipelines
DataStream<Transaction> highValueStream = transactionSplit.select("high-value");
DataStream<Transaction> fraudCheckStream = transactionSplit.select("fraud-check");
DataStream<Transaction> internationalStream = transactionSplit.select("international");
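The key behavior above is that one element can carry several tags and therefore flow into several downstream branches at once. That multi-tag semantics can be illustrated in plain Python (this is not Flink API code; the field names are assumptions):

```python
def select_tags(txn: dict) -> list:
    """Mirror the OutputSelector above: return every tag that applies."""
    tags = []
    if txn["amount"] > 10000:
        tags.append("high-value")
    tags.append("domestic" if txn["region"] == "US" else "international")
    if txn["suspicious"]:
        tags.append("fraud-check")
    return tags

def split_stream(stream, selector):
    """Group elements by tag; an element may land in multiple groups."""
    branches = {}
    for element in stream:
        for tag in selector(element):
            branches.setdefault(tag, []).append(element)
    return branches
```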
For more complex conditional logic, use ProcessFunction with Side Outputs:
public class ConditionalRoutingProcessFunction extends ProcessFunction<Event, Event> {

    // Output tags must be accessible where getSideOutput() is called,
    // so they are declared public static here
    public static final OutputTag<Event> highPriorityTag = new OutputTag<Event>("high-priority"){};
    public static final OutputTag<Event> lowPriorityTag = new OutputTag<Event>("low-priority"){};
    public static final OutputTag<Event> errorTag = new OutputTag<Event>("error"){};

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) {
        try {
            // Complex conditional logic
            if (event.getPriority() > 8) {
                // Route to high priority processing
                ctx.output(highPriorityTag, event);
            } else if (event.getPriority() > 3) {
                // Continue in main stream
                out.collect(event);
            } else {
                // Route to low priority processing
                ctx.output(lowPriorityTag, event);
            }
        } catch (Exception e) {
            // Route errors to error handling
            ctx.output(errorTag, event);
        }
    }
}

// Apply the function and extract side outputs
SingleOutputStreamOperator<Event> mainStream = dataStream
    .process(new ConditionalRoutingProcessFunction());

DataStream<Event> highPriorityStream = mainStream.getSideOutput(ConditionalRoutingProcessFunction.highPriorityTag);
DataStream<Event> lowPriorityStream = mainStream.getSideOutput(ConditionalRoutingProcessFunction.lowPriorityTag);
DataStream<Event> errorStream = mainStream.getSideOutput(ConditionalRoutingProcessFunction.errorTag);
For runtime-configurable conditional logic, use Broadcast State Pattern:
// Define routing rules
public class RoutingRule {
    private String ruleId;
    private String condition;
    private String targetStream;
    private Map<String, Object> parameters;
    // getters/setters, including getRuleId()
}
public class DynamicRoutingProcessFunction
        extends BroadcastProcessFunction<Event, RoutingRule, Event> {

    private final MapStateDescriptor<String, RoutingRule> rulesStateDescriptor =
        new MapStateDescriptor<>("routing-rules", String.class, RoutingRule.class);

    private final OutputTag<Event> streamA = new OutputTag<Event>("stream-a"){};
    private final OutputTag<Event> streamB = new OutputTag<Event>("stream-b"){};
    private final OutputTag<Event> streamC = new OutputTag<Event>("stream-c"){};

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<Event> out) throws Exception {
        // Get current routing rules from broadcast state
        ReadOnlyBroadcastState<String, RoutingRule> rulesState =
            ctx.getBroadcastState(rulesStateDescriptor);

        // Apply routing rules
        for (Map.Entry<String, RoutingRule> entry : rulesState.immutableEntries()) {
            RoutingRule rule = entry.getValue();
            if (evaluateCondition(event, rule)) {
                routeToTarget(event, rule.getTargetStream(), ctx, out);
                return; // First match wins
            }
        }

        // Default routing
        out.collect(event);
    }

    @Override
    public void processBroadcastElement(RoutingRule rule, Context ctx, Collector<Event> out) throws Exception {
        // Update routing rules
        BroadcastState<String, RoutingRule> rulesState =
            ctx.getBroadcastState(rulesStateDescriptor);
        rulesState.put(rule.getRuleId(), rule);
    }

    private boolean evaluateCondition(Event event, RoutingRule rule) {
        // Implement your condition evaluation logic here;
        // expression engines like MVEL or SpEL could be used
        return true; // Simplified
    }

    private void routeToTarget(Event event, String target, ReadOnlyContext ctx, Collector<Event> out) {
        switch (target) {
            case "stream-a":
                ctx.output(streamA, event);
                break;
            case "stream-b":
                ctx.output(streamB, event);
                break;
            case "stream-c":
                ctx.output(streamC, event);
                break;
            default:
                out.collect(event);
        }
    }
}
Use multiple Filter operators for simple conditional routing:
// Original stream
DataStream<Transaction> transactions = ...;
// Create conditional branches
DataStream<Transaction> highValueTransactions = transactions
.filter(t -> t.getAmount() > 10000);
DataStream<Transaction> internationalTransactions = transactions
.filter(t -> !t.getRegion().equals("US"));
DataStream<Transaction> suspiciousTransactions = transactions
.filter(t -> t.isSuspicious());
DataStream<Transaction> normalTransactions = transactions
.filter(t -> t.getAmount() <= 10000 &&
t.getRegion().equals("US") &&
!t.isSuspicious());
Use ConnectedStreams for conditional processing based on control streams:
// Control stream with routing decisions
DataStream<RoutingDecision> controlStream = ...;
DataStream<Event> dataStream = ...;
// Connect streams
ConnectedStreams<Event, RoutingDecision> connectedStreams =
dataStream.connect(controlStream);
// Process with conditional logic
// Process with conditional logic; note that ValueState requires a keyed
// context, so both streams must be keyed (keyBy) before connecting
SingleOutputStreamOperator<Event> result = connectedStreams
    .process(new CoProcessFunction<Event, RoutingDecision, Event>() {

        private transient ValueState<String> currentRoutingMode;

        @Override
        public void open(Configuration parameters) {
            // State must be initialized before use
            currentRoutingMode = getRuntimeContext().getState(
                new ValueStateDescriptor<>("routing-mode", String.class));
        }

        @Override
        public void processElement1(Event event, Context ctx, Collector<Event> out) throws Exception {
            String mode = currentRoutingMode.value();
            if ("priority".equals(mode)) {
                // Route based on priority
                if (event.getPriority() > 5) {
                    out.collect(event);
                }
            } else if ("region".equals(mode)) {
                // Route based on region
                if ("US".equals(event.getRegion())) {
                    out.collect(event);
                }
            }
        }

        @Override
        public void processElement2(RoutingDecision decision, Context ctx, Collector<Event> out) throws Exception {
            // Update routing mode
            currentRoutingMode.update(decision.getMode());
        }
    });
Here's a complete example implementing conditional edges for fraud detection:
public class FraudDetectionPipeline {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Input stream
        DataStream<Transaction> transactions = env.addSource(new TransactionSource());

        // Split based on transaction characteristics
        SplitStream<Transaction> splitTransactions = transactions.split(
            new OutputSelector<Transaction>() {
                @Override
                public Iterable<String> select(Transaction transaction) {
                    List<String> tags = new ArrayList<>();
                    if (transaction.getAmount() > 10000) {
                        tags.add("high-value");
                    }
                    if (transaction.isInternational()) {
                        tags.add("international");
                    }
                    if (transaction.isOffHours()) {
                        tags.add("off-hours");
                    }
                    if (tags.isEmpty()) {
                        tags.add("normal");
                    }
                    return tags;
                }
            }
        );

        // Route to different processing pipelines
        DataStream<Alert> highValueAlerts = splitTransactions
            .select("high-value")
            .process(new HighValueFraudDetector());

        DataStream<Alert> internationalAlerts = splitTransactions
            .select("international")
            .process(new InternationalFraudDetector());

        DataStream<Alert> offHoursAlerts = splitTransactions
            .select("off-hours")
            .process(new OffHoursFraudDetector());

        DataStream<Transaction> normalTransactions = splitTransactions
            .select("normal")
            .process(new NormalTransactionProcessor());

        // Combine all alerts
        DataStream<Alert> allAlerts = highValueAlerts
            .union(internationalAlerts)
            .union(offHoursAlerts);

        // Output results
        allAlerts.addSink(new AlertSink());
        normalTransactions.addSink(new TransactionSink());

        env.execute("Fraud Detection with Conditional Routing");
    }
}
- Minimize Data Duplication: Use side outputs instead of multiple filters on the same stream
- Lazy Evaluation: Use process functions for complex conditions to avoid unnecessary computations
- State Management: Keep broadcast state small for dynamic routing rules
- Parallelism: Ensure conditional branches maintain good parallelism distribution
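The first best practice can be made concrete with a plain-Python illustration (not Flink code): applying N independent filters traverses the source N times, while one routing pass traverses it once.

```python
def make_stream(data, counter):
    """Yield the data while counting how many times it is traversed."""
    counter["passes"] += 1
    yield from data

counter = {"passes": 0}
data = [1, 2, 3, 4]

# Filter approach: one full traversal per branch
evens = [x for x in make_stream(data, counter) if x % 2 == 0]
odds = [x for x in make_stream(data, counter) if x % 2 == 1]

# Single-pass routing: one traversal serves all branches
counter2 = {"passes": 0}
branches = {"even": [], "odd": []}
for x in make_stream(data, counter2):
    branches["even" if x % 2 == 0 else "odd"].append(x)
```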
// Efficient conditional routing with minimal state
// Efficient conditional routing with minimal state
public class EfficientRoutingFunction extends ProcessFunction<Event, Event> {

    private final OutputTag<Event> batchTag = new OutputTag<Event>("batch"){};
    private transient ValueState<Long> lastProcessedTime;

    @Override
    public void open(Configuration parameters) {
        // State must be initialized before use
        lastProcessedTime = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-processed", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        // Stateful conditional logic with cleanup
        long currentTime = ctx.timestamp();
        Long lastTime = lastProcessedTime.value();

        if (lastTime == null || currentTime - lastTime > 60000) {
            // Route to batch processing
            ctx.output(batchTag, event);
            lastProcessedTime.update(currentTime);
        } else {
            // Route to real-time processing
            out.collect(event);
        }
    }
}
Flink provides multiple approaches for implementing conditional edges:
- Split/Select: Best for simple, static conditional routing
- Process Functions + Side Outputs: Ideal for complex conditional logic
- Broadcast State: Perfect for dynamic, runtime-configurable routing
- Filter Operations: Simple but can lead to data duplication
- Connected Streams: Great for control-flow based routing
Choose the approach based on your specific requirements for complexity, performance, and dynamic configurability. The Split/Select pattern is often the most straightforward starting point, while Process Functions with Side Outputs provide the most flexibility for complex scenarios.