Galileo traces Google A2A interactions using the galileo-a2a instrumentor — giving you a single distributed trace across agents, including LLM calls, tool use, and cross-agent handoffs.

Setup

1. Installation

Add the OpenTelemetry packages to your project:
pip install opentelemetry-api opentelemetry-sdk \
            opentelemetry-exporter-otlp
The opentelemetry-api and opentelemetry-sdk packages provide the core OpenTelemetry functionality. The opentelemetry-exporter-otlp package enables sending traces to Galileo’s OTLP endpoint.
2. Create environment variables for your Galileo settings

Set environment variables for your Galileo settings, for example in a .env file. These variables are consumed by the GalileoSpanProcessor to authenticate and route traces to the correct Galileo project and Log stream:
# Your Galileo API key
GALILEO_API_KEY="your-galileo-api-key"

# Your Galileo project name
GALILEO_PROJECT="your-galileo-project-name"

# The name of the Log stream you want to use for logging
GALILEO_LOG_STREAM="your-galileo-log-stream"

# Provide the console URL below if you are using a custom
# deployment rather than the free tier at app.galileo.ai.
# It will look something like "console.galileo.yourcompany.com".
# GALILEO_CONSOLE_URL="your-galileo-console-url"
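If tracing is not reaching Galileo, a quick pre-flight check can confirm the required variables are actually set. This is a hypothetical helper for illustration, not part of the Galileo SDK:

```python
import os

def missing_galileo_settings() -> list[str]:
    """Names of required Galileo settings that are not set.

    Hypothetical pre-flight helper; not part of the Galileo SDK.
    """
    required = ["GALILEO_API_KEY", "GALILEO_PROJECT", "GALILEO_LOG_STREAM"]
    return [name for name in required if not os.environ.get(name)]

if __name__ == "__main__":
    missing = missing_galileo_settings()
    if missing:
        raise SystemExit(f"Missing Galileo settings: {', '.join(missing)}")
```

Run it once before initializing the span processor to fail fast with a clear message instead of silently dropping traces.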
3. Self-hosted deployments: set the OTel endpoint

Skip this step if you are using Galileo Cloud.
The OTel endpoint is different from Galileo’s regular API endpoint and is specifically designed to receive telemetry data in the OTLP format. If you are using:
  • Galileo Cloud at app.galileo.ai, then you don’t need to provide a custom OTel endpoint. The default endpoint https://api.galileo.ai/otel/traces will be used automatically.
  • A self-hosted Galileo deployment, replace the https://api.galileo.ai/otel/traces endpoint with your deployment URL. The format of this URL is based on your console URL, replacing console with api and appending /otel/traces.
For example:
  • If your console URL is https://console.galileo.example.com, the OTel endpoint would be https://api.galileo.example.com/otel/traces
  • If your console URL is https://console-galileo.apps.mycompany.com, the OTel endpoint would be https://api-galileo.apps.mycompany.com/otel/traces
The convention is to store this in the GALILEO_CONSOLE_URL environment variable. For example:
import os

os.environ["GALILEO_CONSOLE_URL"] = "https://api.galileo.ai"
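The console-to-endpoint transformation described above can be sketched with a small helper. This is purely illustrative (the span processor configures the endpoint for you, per the next step):

```python
def otel_endpoint(console_url: str) -> str:
    """Derive the Galileo OTel endpoint from a console URL.

    Illustrative only: replace the first "console" in the host with "api"
    and append /otel/traces, per the convention described above.
    """
    scheme, host = console_url.rstrip("/").split("://", 1)
    return f"{scheme}://{host.replace('console', 'api', 1)}/otel/traces"
```

For example, `otel_endpoint("https://console.galileo.example.com")` yields `https://api.galileo.example.com/otel/traces`.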
4. Initialize and create the Galileo span processor

The GalileoSpanProcessor automatically configures authentication and metadata using your environment variables. It also:
  • Auto-builds OTLP headers using your Galileo credentials
  • Configures the correct OTLP trace endpoint
  • Registers a batch span processor that exports traces to Galileo
from opentelemetry.sdk.trace import TracerProvider

from galileo import otel

# GalileoSpanProcessor needs no manual OTLP configuration: it reads the
# Galileo API key, project, and Log stream from environment variables,
# so make sure those are set first.
galileo_span_processor = otel.GalileoSpanProcessor(
    # Optional parameters; if not set, the corresponding env vars are used:
    # project=os.environ["GALILEO_PROJECT"],
    # logstream=os.environ.get("GALILEO_LOG_STREAM"),
)

# Attach the processor to a tracer provider for the instrumentors to use
provider = TracerProvider()
otel.add_galileo_span_processor(provider, galileo_span_processor)
5. Install and instrument A2A

pip install galileo-a2a opentelemetry-instrumentation-langchain
from galileo_a2a import A2AInstrumentor
from opentelemetry.instrumentation.langchain import LangchainInstrumentor

# `provider` is an OpenTelemetry TracerProvider with the Galileo span
# processor attached (see the full example below)

# Trace the A2A protocol
A2AInstrumentor().instrument(tracer_provider=provider, agent_name="my-agent")

# Trace LangChain/LangGraph agents, LLM calls, and tools
LangchainInstrumentor().instrument(tracer_provider=provider)
All a2a-sdk client and server methods are now traced automatically.

How it works

When Agent A calls Agent B, trace context is propagated through A2A message metadata. Both agents’ spans appear in one distributed trace:
Agent A: LangGraph orchestrator             Agent B: LangChain researcher
+-----------------------------------+       +------------------------------+
| invoke_agent LangGraph            |       | a2a.server.on_message_send   |
|  +-- plan (LLM)                   |       |  +-- invoke_agent LangGraph  |
|  +-- delegate                     |  a2a  |     +-- LLM + tool_calls     |
|  |    +-- a2a.client.send_message |-----> |     +-- search_kb (tool)     |
|  +-- synthesize (LLM)             |  ctx  |     +-- LLM final answer     |
+-----------------------------------+       +------------------------------+
                            Single trace in Galileo
A2A context_id is mapped to session.id, grouping all interactions in the same conversation into one Galileo session.

Full example

Two LangGraph agents in one script. Agent A (orchestrator) plans, delegates to Agent B via A2A, then synthesizes. Agent B (researcher) uses a tool and LLM to answer. Copy-paste and run.
pip install galileo-a2a "galileo[otel]" langchain langchain-openai langgraph \
    opentelemetry-instrumentation-langchain uvicorn starlette sse-starlette
.env
GALILEO_API_KEY=your-galileo-api-key
GALILEO_PROJECT=your-galileo-project
GALILEO_LOG_STREAM=your-log-stream
OPENAI_API_KEY=your-openai-api-key
GALILEO_CONSOLE_URL=your-galileo-console-url
import asyncio
import uuid

import httpx
import uvicorn
from a2a.client import ClientConfig, ClientFactory
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps.jsonrpc.starlette_app import A2AStarletteApplication
from a2a.server.events import EventQueue, InMemoryQueueManager
from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
    AgentCapabilities, AgentCard, AgentSkill, Message, Role,
    TaskState, TaskStatus, TaskStatusUpdateEvent, TextPart,
)
from galileo.otel import GalileoSpanProcessor, add_galileo_span_processor
from galileo_a2a import A2AInstrumentor
from langchain.agents import create_agent
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import END, START, StateGraph
from opentelemetry.instrumentation.langchain import LangchainInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from starlette.applications import Starlette
from typing_extensions import TypedDict

# --- Galileo tracing setup ---
provider = TracerProvider()
add_galileo_span_processor(provider, GalileoSpanProcessor())
A2AInstrumentor().instrument(tracer_provider=provider, agent_name="orchestrator")
LangchainInstrumentor().instrument(tracer_provider=provider)

llm = ChatOpenAI(model="gpt-4o-mini")


# --- Agent B: researcher (served over A2A) ---

@tool
def search_kb(query: str) -> str:
    """Search the travel knowledge base."""
    if "paris" in query.lower():
        return "Eiffel Tower 330m, Louvre 9.6M visitors/yr, 20 arrondissements."
    return f"No results for: {query}"

researcher = create_agent(
    llm, [search_kb],
    system_prompt="Use search_kb to find facts, then summarize for a traveler.",
)

class ResearcherExecutor(AgentExecutor):
    async def execute(self, ctx: RequestContext, queue: EventQueue) -> None:
        result = await researcher.ainvoke({"messages": [("user", ctx.get_user_input())]})
        await queue.enqueue_event(TaskStatusUpdateEvent(
            task_id=ctx.task_id, context_id=ctx.context_id, final=True,
            status=TaskStatus(
                state=TaskState.completed,
                message=Message(
                    message_id=str(uuid.uuid4()), role=Role.agent,
                    parts=[TextPart(text=result["messages"][-1].content or "")],
                ),
            ),
        ))

    async def cancel(self, ctx: RequestContext, queue: EventQueue) -> None:
        await queue.enqueue_event(TaskStatusUpdateEvent(
            task_id=ctx.task_id, context_id=ctx.context_id, final=True,
            status=TaskStatus(state=TaskState.canceled),
        ))

CARD = AgentCard(
    name="researcher",
    description="Travel researcher with tool use",
    url="http://localhost:9867",
    version="1.0.0",
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text/plain"],
    default_output_modes=["text/plain"],
    skills=[AgentSkill(id="qa", name="Q&A", description="Answer questions", tags=[])],
)


# --- Agent A: orchestrator (LangGraph) ---

class OrchestratorState(TypedDict):
    user_query: str
    research_query: str
    response: str
    plan: str

def build_orchestrator(client):
    async def plan(state: OrchestratorState) -> dict:
        prompt = (
            "Formulate a travel research question."
            " Reply with ONLY the question."
        )
        result = await create_agent(
            llm, system_prompt=prompt,
        ).ainvoke({"messages": [("user", state["user_query"])]})
        return {"research_query": result["messages"][-1].content}

    async def delegate(state: OrchestratorState) -> dict:
        msg = Message(
            message_id=str(uuid.uuid4()), role=Role.user,
            parts=[TextPart(text=state["research_query"])],
            context_id="session-1",
        )
        async for event in client.send_message(msg):
            if isinstance(event, tuple):
                task = event[0]
                is_done = (
                    task.status
                    and task.status.state == TaskState.completed
                    and task.status.message
                )
                if is_done:
                    text = getattr(
                        task.status.message.parts[0].root,
                        "text", "",
                    )
                    return {"response": text}
        return {"response": ""}

    async def synthesize(state: OrchestratorState) -> dict:
        prompt = "Create a brief 3-day itinerary from the research."
        user_msg = (
            f"Research:\n{state['response']}"
            "\n\nCreate itinerary."
        )
        result = await create_agent(
            llm, system_prompt=prompt,
        ).ainvoke({"messages": [("user", user_msg)]})
        return {"plan": result["messages"][-1].content}

    graph = StateGraph(OrchestratorState)
    graph.add_node("plan", plan)
    graph.add_node("delegate", delegate)
    graph.add_node("synthesize", synthesize)
    graph.add_edge(START, "plan")
    graph.add_edge("plan", "delegate")
    graph.add_edge("delegate", "synthesize")
    graph.add_edge("synthesize", END)
    return graph.compile()


# --- Run both agents ---

async def main():
    # Start Agent B
    app = Starlette()
    A2AStarletteApplication(
        agent_card=CARD,
        http_handler=DefaultRequestHandler(
            agent_executor=ResearcherExecutor(),
            task_store=InMemoryTaskStore(),
            queue_manager=InMemoryQueueManager(),
        ),
    ).add_routes_to_app(app)
    server = uvicorn.Server(uvicorn.Config(app, port=9867, log_level="warning"))
    server_task = asyncio.create_task(server.serve())
    await asyncio.sleep(1)

    # Run Agent A
    client = ClientFactory(
        config=ClientConfig(
            streaming=True,
            httpx_client=httpx.AsyncClient(
                timeout=httpx.Timeout(120),
            ),
        ),
    ).create(CARD)
    result = await build_orchestrator(client).ainvoke({
        "user_query": "Plan a 3-day trip to Paris",
        "research_query": "",
        "response": "",
        "plan": "",
    })
    print(result["plan"])

    server.should_exit = True
    await server_task
    provider.shutdown()

asyncio.run(main())