# A2A

> Trace A2A (Agent2Agent) protocol interactions with Galileo for multi-agent observability

Galileo traces [A2A (Agent2Agent)](https://a2a-protocol.org/) interactions using the `galileo-a2a` instrumentor. The result is a single distributed trace across agents that includes LLM calls, tool use, and cross-agent handoffs.

## Setup

<Steps>
  <Step title="Installation">
    Add the OpenTelemetry packages and the Galileo SDK with its OTel extra to your project:

    <CodeGroup>
      ```bash Terminal theme={null}
      pip install opentelemetry-api opentelemetry-sdk \
                  opentelemetry-exporter-otlp "galileo[otel]"
      ```
    </CodeGroup>

    The `opentelemetry-api` and `opentelemetry-sdk` packages provide the core OpenTelemetry functionality. The `opentelemetry-exporter-otlp` package enables sending traces to Galileo's OTLP endpoint. The `galileo[otel]` extra provides the `galileo.otel` module imported in the later steps.
  </Step>

  <Step title="Create environment variables for your Galileo settings">
    Set environment variables for your Galileo settings, for example in a `.env` file.
    The `GalileoSpanProcessor` reads these variables to authenticate
    and to route traces to the correct Galileo Project and Log stream:

    <CodeGroup>
      ```ini .env theme={null}
      # Your Galileo API key
      GALILEO_API_KEY="your-galileo-api-key"

      # Your Galileo project name
      GALILEO_PROJECT="your-galileo-project-name"

      # The name of the Log stream you want to use for logging
      GALILEO_LOG_STREAM="your-galileo-log-stream"

      # Provide the console URL below if you are using a custom deployment
      # rather than the free tier or app.galileo.ai.
      # It will look something like "console.galileo.yourcompany.com".
      # GALILEO_CONSOLE_URL="your-galileo-console-url"
      ```
    </CodeGroup>
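
    Before starting your agents, it can help to confirm that these settings are present. The sketch below is a hypothetical pre-flight check, not part of the Galileo SDK. It assumes all three variables listed above should be set, and it also flags values with stray whitespace, which can end up inside a quoted `.env` value:

    ```python
    import os

    # Variable names come from the .env example above; treating all three
    # as required is an assumption made for this sketch.
    REQUIRED = ("GALILEO_API_KEY", "GALILEO_PROJECT", "GALILEO_LOG_STREAM")


    def check_galileo_settings(env=os.environ) -> list[str]:
        """Return a list of problems found in the Galileo settings."""
        problems = []
        for name in REQUIRED:
            value = env.get(name)
            if not value:
                problems.append(f"{name} is not set")
            elif value != value.strip():
                problems.append(f"{name} has leading or trailing whitespace")
        return problems


    # Report any problems found in the current process environment.
    for problem in check_galileo_settings():
        print(problem)
    ```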
  </Step>

  <Step title="Self-hosted deployments: Set the OTel endpoint">
    <Note>
      Skip this step if you are using Galileo Cloud.
    </Note>

    The OTel endpoint is different from Galileo's regular API endpoint and is specifically designed to receive telemetry data in the OTLP format.

    If you are using:

    * **Galileo Cloud** at [app.galileo.ai](https://app.galileo.ai), then you don't need to provide a custom OTel endpoint.
      The default endpoint `https://api.galileo.ai/otel/traces` will be used automatically.

    * A **self-hosted Galileo deployment**, then replace the `https://api.galileo.ai/otel/traces` endpoint with your deployment URL. This URL is derived from your console URL by replacing `console` with `api` and appending `/otel/traces`.

    For example:

    * If your console URL is `https://console.galileo.example.com`, the OTel endpoint is `https://api.galileo.example.com/otel/traces`.
    * If your console URL is `https://console-galileo.apps.mycompany.com`, the OTel endpoint is `https://api-galileo.apps.mycompany.com/otel/traces`.
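
    The rule above can be sketched as a small helper. This is only an illustration: `otel_endpoint_from_console_url` is a hypothetical name, not a Galileo SDK function, and it assumes the console hostname starts with `console`, as in both examples:

    ```python
    from urllib.parse import urlsplit, urlunsplit


    def otel_endpoint_from_console_url(console_url: str) -> str:
        """Derive the OTel traces endpoint from a Galileo console URL."""
        parts = urlsplit(console_url)
        host = parts.netloc
        if not host.startswith("console"):
            raise ValueError(f"Expected a hostname starting with 'console': {host!r}")
        # Swap only the leading "console" for "api", then append the OTel path.
        api_host = "api" + host[len("console"):]
        return urlunsplit((parts.scheme, api_host, "/otel/traces", "", ""))


    print(otel_endpoint_from_console_url("https://console.galileo.example.com"))
    # https://api.galileo.example.com/otel/traces
    ```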

    The convention is to store this in the `GALILEO_CONSOLE_URL` environment variable. For example:

    <CodeGroup>
      ```python Python theme={null}
      import os

      os.environ["GALILEO_CONSOLE_URL"] = "https://api.galileo.ai"
      ```
    </CodeGroup>
  </Step>

  <Step title="Initialize and create the Galileo span processor">
    The `GalileoSpanProcessor` automatically configures authentication
    and metadata using your environment variables. It also:

    * Auto-builds OTLP headers using your Galileo credentials
    * Configures the correct OTLP trace endpoint
    * Registers a batch span processor that exports traces to Galileo

    <CodeGroup>
      ```python Python theme={null}
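      from galileo import otel
      from opentelemetry.sdk.trace import TracerProvider

      # GalileoSpanProcessor reads the Galileo API key, Project, and Log stream
      # from environment variables, so no manual OTLP config is required.
      # Make sure to set them first.
      galileo_span_processor = otel.GalileoSpanProcessor(
          # Optional parameters: if not set, the env vars are used
          # project=os.environ["GALILEO_PROJECT"],
          # logstream=os.environ.get("GALILEO_LOG_STREAM"),
      )

      # Attach the processor to the tracer provider that the instrumentors use
      provider = TracerProvider()
      otel.add_galileo_span_processor(provider, galileo_span_processor)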
      ```
    </CodeGroup>
  </Step>

  <Step title="Install and instrument A2A">
    <CodeGroup>
      ```bash Terminal theme={null}
      pip install galileo-a2a opentelemetry-instrumentation-langchain
      ```
    </CodeGroup>

    <CodeGroup>
      ```python Python theme={null}
      from galileo_a2a import A2AInstrumentor
      from opentelemetry.instrumentation.langchain import LangchainInstrumentor

      # Trace A2A protocol calls. `provider` is the TracerProvider that the
      # Galileo span processor is attached to.
      A2AInstrumentor().instrument(tracer_provider=provider, agent_name="my-agent")

      # Trace LangChain/LangGraph agents, LLM calls, and tools
      LangchainInstrumentor().instrument(tracer_provider=provider)
      ```
    </CodeGroup>

    All `a2a-sdk` client and server methods are now traced automatically.
  </Step>
</Steps>

## How it works

When Agent A calls Agent B, trace context is propagated through A2A message metadata. Both agents' spans appear in **one distributed trace**:

```text theme={null}
Agent A: LangGraph orchestrator             Agent B: LangChain researcher
+-----------------------------------+       +------------------------------+
| invoke_agent LangGraph            |       | a2a.server.on_message_send   |
|  +-- plan (LLM)                   |       |  +-- invoke_agent LangGraph  |
|  +-- delegate                     |  a2a  |     +-- LLM + tool_calls     |
|  |    +-- a2a.client.send_message |-----> |     +-- search_kb (tool)     |
|  +-- synthesize (LLM)             |  ctx  |     +-- LLM final answer     |
+-----------------------------------+       +------------------------------+
                            Single trace in Galileo
```
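
To make the propagation idea concrete, here is a toy sketch that uses only the standard library. It is not the `galileo-a2a` implementation, whose wire format this page does not describe. It assumes a W3C Trace Context style `traceparent` string stored under a hypothetical `traceparent` key in the message metadata:

```python
import secrets


def new_traceparent() -> str:
    """Start a trace: version 00, random trace ID and span ID, sampled flag."""
    return f"00-{secrets.token_hex(16)}-{secrets.token_hex(8)}-01"


def child_traceparent(parent: str) -> str:
    """Continue a trace: keep the trace ID, mint a new span ID."""
    version, trace_id, _parent_span_id, flags = parent.split("-")
    return f"{version}-{trace_id}-{secrets.token_hex(8)}-{flags}"


# Agent A (client side): attach its current context to the outgoing message.
agent_a_context = new_traceparent()
message = {
    "text": "Best museums in Paris?",
    "metadata": {"traceparent": agent_a_context},  # hypothetical key name
}

# Agent B (server side): read the context and start its span in the same trace.
agent_b_context = child_traceparent(message["metadata"]["traceparent"])

same_trace = agent_a_context.split("-")[1] == agent_b_context.split("-")[1]
print(same_trace)  # True: both spans carry the same trace ID
```

Because the trace ID survives the hop while each agent mints its own span IDs, a backend can stitch both sides into one trace.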

A2A `context_id` is mapped to `session.id`, grouping all interactions in the same conversation into one Galileo session.
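
As an illustration of that mapping, the sketch below uses plain dictionaries rather than real spans. `span_attributes` and `group_by_session` are hypothetical helpers, not `galileo-a2a` functions. Only the `context_id` to `session.id` mapping comes from the text above:

```python
from collections import defaultdict


def span_attributes(context_id: str) -> dict:
    """Copy the A2A context_id into the session.id span attribute."""
    return {"session.id": context_id}


def group_by_session(spans: list[dict]) -> dict[str, list[str]]:
    """Group span names by their session.id attribute."""
    sessions = defaultdict(list)
    for span in spans:
        sessions[span["attributes"]["session.id"]].append(span["name"])
    return dict(sessions)


spans = [
    {"name": "a2a.client.send_message", "attributes": span_attributes("session-1")},
    {"name": "a2a.server.on_message_send", "attributes": span_attributes("session-1")},
    {"name": "a2a.client.send_message", "attributes": span_attributes("session-2")},
]
print(group_by_session(spans))
```

Spans that share a `context_id` land in the same group, which is the effect the session view relies on.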

## Full example

Two LangGraph agents in one script. Agent A (orchestrator) plans, delegates to Agent B via A2A, then synthesizes. Agent B (researcher) uses a tool and LLM to answer. Copy-paste and run.

<CodeGroup>
  ```bash Terminal theme={null}
  pip install galileo-a2a "galileo[otel]" langchain langchain-openai langgraph \
      opentelemetry-instrumentation-langchain uvicorn starlette sse-starlette \
      python-dotenv
  ```
</CodeGroup>

```ini .env theme={null}
GALILEO_API_KEY=your-galileo-api-key
GALILEO_PROJECT=your-galileo-project
GALILEO_LOG_STREAM=your-log-stream
OPENAI_API_KEY=your-openai-api-key
GALILEO_CONSOLE_URL=your-galileo-cluster-url
```

<CodeGroup>
  ```python Python theme={null}
  import asyncio
  import uuid

  import httpx
  import uvicorn
  from a2a.client import ClientConfig, ClientFactory
  from a2a.server.agent_execution import AgentExecutor, RequestContext
  from a2a.server.apps.jsonrpc.starlette_app import A2AStarletteApplication
  from a2a.server.events import EventQueue, InMemoryQueueManager
  from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
  from a2a.server.tasks import InMemoryTaskStore
  from a2a.types import (
      AgentCapabilities, AgentCard, AgentSkill, Message, Role,
      TaskState, TaskStatus, TaskStatusUpdateEvent, TextPart,
  )
  from dotenv import load_dotenv
  from galileo.otel import GalileoSpanProcessor, add_galileo_span_processor
  from galileo_a2a import A2AInstrumentor
  from langchain.agents import create_agent
  from langchain_core.tools import tool
  from langchain_openai import ChatOpenAI
  from langgraph.graph import END, START, StateGraph
  from opentelemetry.instrumentation.langchain import LangchainInstrumentor
  from opentelemetry.sdk.trace import TracerProvider
  from starlette.applications import Starlette
  from typing_extensions import TypedDict

  # Load GALILEO_API_KEY, GALILEO_CONSOLE_URL, OPENAI_API_KEY, etc. from .env
  # into the process environment before constructing GalileoSpanProcessor.
  load_dotenv()

  # --- Galileo tracing setup ---
  provider = TracerProvider()
  add_galileo_span_processor(provider, GalileoSpanProcessor())
  A2AInstrumentor().instrument(tracer_provider=provider, agent_name="orchestrator")
  LangchainInstrumentor().instrument(tracer_provider=provider)

  llm = ChatOpenAI(model="gpt-4o-mini")


  # --- Agent B: researcher (served over A2A) ---

  @tool
  def search_kb(query: str) -> str:
      """Search the travel knowledge base."""
      if "paris" in query.lower():
          return "Eiffel Tower 330m, Louvre 9.6M visitors/yr, 20 arrondissements."
      return f"No results for: {query}"

  researcher = create_agent(
      llm, [search_kb],
      system_prompt="Use search_kb to find facts, then summarize for a traveler.",
  )

  class ResearcherExecutor(AgentExecutor):
      async def execute(self, ctx: RequestContext, queue: EventQueue) -> None:
          result = await researcher.ainvoke({"messages": [("user", ctx.get_user_input())]})
          await queue.enqueue_event(TaskStatusUpdateEvent(
              task_id=ctx.task_id, context_id=ctx.context_id, final=True,
              status=TaskStatus(
                  state=TaskState.completed,
                  message=Message(
                      message_id=str(uuid.uuid4()), role=Role.agent,
                      parts=[TextPart(text=result["messages"][-1].content or "")],
                  ),
              ),
          ))

      async def cancel(self, ctx: RequestContext, queue: EventQueue) -> None:
          await queue.enqueue_event(TaskStatusUpdateEvent(
              task_id=ctx.task_id, context_id=ctx.context_id, final=True,
              status=TaskStatus(state=TaskState.canceled),
          ))

  CARD = AgentCard(
      name="researcher",
      description="Travel researcher with tool use",
      url="http://localhost:9867",
      version="1.0.0",
      capabilities=AgentCapabilities(streaming=True),
      default_input_modes=["text/plain"],
      default_output_modes=["text/plain"],
      skills=[AgentSkill(id="qa", name="Q&A", description="Answer questions", tags=[])],
  )


  # --- Agent A: orchestrator (LangGraph) ---

  class OrchestratorState(TypedDict):
      user_query: str
      research_query: str
      response: str
      plan: str

  def build_orchestrator(client):
      async def plan(state: OrchestratorState) -> dict:
          prompt = (
              "Formulate a travel research question."
              " Reply with ONLY the question."
          )
          result = await create_agent(
              llm, system_prompt=prompt,
          ).ainvoke({"messages": [("user", state["user_query"])]})
          return {"research_query": result["messages"][-1].content}

      async def delegate(state: OrchestratorState) -> dict:
          msg = Message(
              message_id=str(uuid.uuid4()), role=Role.user,
              parts=[TextPart(text=state["research_query"])],
              context_id="session-1",
          )
          async for event in client.send_message(msg):
              if isinstance(event, tuple):
                  task = event[0]
                  is_done = (
                      task.status
                      and task.status.state == TaskState.completed
                      and task.status.message
                  )
                  if is_done:
                      text = getattr(
                          task.status.message.parts[0].root,
                          "text", "",
                      )
                      return {"response": text}
          return {"response": ""}

      async def synthesize(state: OrchestratorState) -> dict:
          prompt = "Create a brief 3-day itinerary from the research."
          user_msg = (
              f"Research:\n{state['response']}"
              "\n\nCreate itinerary."
          )
          result = await create_agent(
              llm, system_prompt=prompt,
          ).ainvoke({"messages": [("user", user_msg)]})
          return {"plan": result["messages"][-1].content}

      graph = StateGraph(OrchestratorState)
      graph.add_node("plan", plan)
      graph.add_node("delegate", delegate)
      graph.add_node("synthesize", synthesize)
      graph.add_edge(START, "plan")
      graph.add_edge("plan", "delegate")
      graph.add_edge("delegate", "synthesize")
      graph.add_edge("synthesize", END)
      return graph.compile()


  # --- Run both agents ---

  async def main():
      # Start Agent B
      app = Starlette()
      A2AStarletteApplication(
          agent_card=CARD,
          http_handler=DefaultRequestHandler(
              agent_executor=ResearcherExecutor(),
              task_store=InMemoryTaskStore(),
              queue_manager=InMemoryQueueManager(),
          ),
      ).add_routes_to_app(app)
      server = uvicorn.Server(uvicorn.Config(app, port=9867, log_level="warning"))
      server_task = asyncio.create_task(server.serve())
      await asyncio.sleep(1)

      # Run Agent A
      client = ClientFactory(
          config=ClientConfig(
              streaming=True,
              httpx_client=httpx.AsyncClient(
                  timeout=httpx.Timeout(120),
              ),
          ),
      ).create(CARD)
      result = await build_orchestrator(client).ainvoke({
          "user_query": "Plan a 3-day trip to Paris",
          "research_query": "",
          "response": "",
          "plan": "",
      })
      print(result["plan"])

      server.should_exit = True
      await server_task
      provider.shutdown()

  asyncio.run(main())
  ```
</CodeGroup>
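
The example gives the server a fixed one-second head start with `asyncio.sleep(1)`, which may not be enough on a slow machine. One possible alternative, sketched here with the standard library only, is to poll the port until it accepts connections. `wait_for_port` is a hypothetical helper, not part of `uvicorn` or `a2a-sdk`:

```python
import asyncio


async def wait_for_port(host: str, port: int, timeout: float = 10.0) -> bool:
    """Return True once host:port accepts a TCP connection, False on timeout."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        try:
            _reader, writer = await asyncio.open_connection(host, port)
        except OSError:
            await asyncio.sleep(0.1)  # not listening yet, retry shortly
            continue
        # Connection succeeded, so the server is up. Close the probe socket.
        writer.close()
        await writer.wait_closed()
        return True
    return False
```

In `main()`, `await wait_for_port("127.0.0.1", 9867)` could stand in for `await asyncio.sleep(1)`, assuming the server listens on uvicorn's default host.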
