Skip to content

Instantly share code, notes, and snippets.

@raoulbia-ai
Created February 4, 2026 12:56
Show Gist options
  • Select an option

  • Save raoulbia-ai/6bb4f930f60a475fecf6ef82b87afc6d to your computer and use it in GitHub Desktop.

Select an option

Save raoulbia-ai/6bb4f930f60a475fecf6ef82b87afc6d to your computer and use it in GitHub Desktop.
Adds A2A (Agent-to-Agent) protocol support to vanilla Langflow. Required for using the A2A Server Agent component.
"""
Langflow A2A Service Patch
==========================
This patch adds A2A (Agent-to-Agent) protocol support to Langflow.
Required for using the A2A Server Agent component.
Installation:
1. Create directory: langflow/services/a2a/
2. Add the files below to that directory
3. Update langflow/services/schema.py (add ServiceType)
4. Update langflow/services/deps.py (add get_a2a_service)
5. Restart Langflow
Requirements: pip install a2a-sdk
Related gists:
- A2A Server Agent: https://gist.github.com/raoulbia-ai/285152d7c789143dfe6b7a848f9dd204
- A2A Agent Connector: https://gist.github.com/raoulbia-ai/934f1fd83f09638dd2343ad152a96782
"""
# =============================================================================
# FILE 1: langflow/services/a2a/__init__.py
# =============================================================================
__init__py = '''
"""A2A service module for Agent2Agent protocol support."""
from langflow.services.a2a.factory import A2AServiceFactory
from langflow.services.a2a.service import A2AService, AgentEndpoint
__all__ = ["A2AService", "A2AServiceFactory", "AgentEndpoint"]
'''
# =============================================================================
# FILE 2: langflow/services/a2a/service.py
# =============================================================================
service_py = '''
"""A2A Service for exposing Langflow agents via Agent2Agent Protocol."""
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING, Any
from loguru import logger
from langflow.services.base import Service
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
from a2a.types import AgentCard, Message as A2AMessage, Task
try:
from a2a.types import AgentCard, AgentSkill, Task, AgentCapabilities
from a2a.types import Message as A2AMessage
A2A_AVAILABLE = True
except ImportError:
A2A_AVAILABLE = False
logger.warning("A2A SDK not available. Install with: pip install a2a-sdk")
class AgentEndpoint:
"""Represents a registered A2A agent endpoint."""
def __init__(
self,
component_id: str,
flow_id: str,
agent_card: AgentCard,
executor_callable: Any,
) -> None:
self.component_id = component_id
self.flow_id = flow_id
self.agent_card = agent_card
self.executor_callable = executor_callable
self.created_at = datetime.now()
class A2AService(Service):
"""Service for managing A2A protocol endpoints and agent registry."""
name = "a2a_service"
def __init__(self) -> None:
super().__init__()
self.agent_registry: dict[str, AgentEndpoint] = {}
self.task_store: dict[str, Task] = {}
logger.info("A2A Service initialized")
def register_agent(
self,
component_id: str,
flow_id: str,
agent_card: AgentCard,
executor_callable: Any,
) -> str:
endpoint = AgentEndpoint(
component_id=component_id,
flow_id=flow_id,
agent_card=agent_card,
executor_callable=executor_callable,
)
self.agent_registry[component_id] = endpoint
url_path = f"/a2a/{flow_id}/{component_id}"
logger.info(f"Registered A2A agent: {agent_card.name} at {url_path}")
return url_path
def unregister_agent(self, component_id: str) -> None:
if component_id in self.agent_registry:
agent = self.agent_registry[component_id]
logger.info(f"Unregistered A2A agent: {agent.agent_card.name}")
del self.agent_registry[component_id]
def get_agent(self, component_id: str) -> AgentEndpoint | None:
return self.agent_registry.get(component_id)
def get_agent_card(self, component_id: str) -> AgentCard | None:
agent = self.get_agent(component_id)
return agent.agent_card if agent else None
def list_agents(self, flow_id: str | None = None) -> list[AgentEndpoint]:
if flow_id:
return [a for a in self.agent_registry.values() if a.flow_id == flow_id]
return list(self.agent_registry.values())
async def handle_message_send(
self,
component_id: str,
message: dict[str, Any],
) -> dict[str, Any]:
agent = self.get_agent(component_id)
if not agent:
raise ValueError(f"Agent not found: {component_id}")
try:
return await agent.executor_callable(message)
except Exception as e:
logger.exception(f"Error executing agent {component_id}: {e}")
raise
def generate_agent_card(
self,
agent_name: str,
description: str,
component_id: str,
flow_id: str,
skills: list[AgentSkill] | None = None,
base_url: str | None = None,
) -> AgentCard:
if not A2A_AVAILABLE:
raise ImportError("A2A SDK not available")
url = base_url or f"http://localhost:7860/a2a/{flow_id}/{component_id}"
return AgentCard(
name=agent_name,
description=description,
url=url,
version="1.0.0",
defaultInputModes=["text/plain", "application/json"],
defaultOutputModes=["text/plain", "application/json"],
capabilities=AgentCapabilities(streaming=True, pushNotifications=False),
skills=skills or [],
)
async def teardown(self) -> None:
logger.info("Shutting down A2A service")
self.agent_registry.clear()
self.task_store.clear()
'''
# =============================================================================
# FILE 3: langflow/services/a2a/factory.py
# =============================================================================
factory_py = '''
"""Factory for creating A2A service instances."""
from typing import TYPE_CHECKING
from typing_extensions import override
from langflow.services.a2a.service import A2AService
from langflow.services.factory import ServiceFactory
if TYPE_CHECKING:
from langflow.services.settings.service import SettingsService
class A2AServiceFactory(ServiceFactory):
"""Factory for creating A2A service instances."""
def __init__(self) -> None:
super().__init__(A2AService)
@override
def create(self, settings_service: "SettingsService") -> A2AService:
return A2AService()
'''
# =============================================================================
# PATCH 1: Add to langflow/services/schema.py (in ServiceType enum)
# =============================================================================
schema_patch = '''
# Add this line to the ServiceType enum in langflow/services/schema.py:
class ServiceType(str, Enum):
# ... existing entries ...
A2A_SERVICE = "a2a_service" # <-- ADD THIS LINE
'''
# =============================================================================
# PATCH 2: Add to langflow/services/deps.py
# =============================================================================
deps_patch = '''
# Add this import at the top of langflow/services/deps.py:
from langflow.services.a2a.service import A2AService
# Add this function to langflow/services/deps.py:
def get_a2a_service() -> A2AService:
"""Retrieves the A2AService instance from the service manager.
Returns:
A2AService: The A2AService instance.
"""
from langflow.services.a2a.factory import A2AServiceFactory
return get_service(ServiceType.A2A_SERVICE, A2AServiceFactory())
'''
# =============================================================================
# INSTALLATION INSTRUCTIONS
# =============================================================================
instructions = '''
INSTALLATION INSTRUCTIONS
=========================
1. Locate your Langflow installation's source code:
- If installed via pip: find with `python -c "import langflow; print(langflow.__file__)"`
- If running from source: navigate to src/backend/base/langflow/
2. Create the A2A service directory:
mkdir -p langflow/services/a2a/
3. Create three files in langflow/services/a2a/:
- __init__.py (copy content from __init__py variable above)
- service.py (copy content from service_py variable above)
- factory.py (copy content from factory_py variable above)
4. Edit langflow/services/schema.py:
- Find the ServiceType enum
- Add: A2A_SERVICE = "a2a_service"
5. Edit langflow/services/deps.py:
- Add import: from langflow.services.a2a.service import A2AService
- Add the get_a2a_service() function (see deps_patch above)
6. Install the A2A SDK:
pip install a2a-sdk
7. Restart Langflow
VERIFICATION
============
After restarting, you should be able to use the A2A Server Agent component
without import errors.
'''
if __name__ == "__main__":
print(instructions)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment