Module
Distributed tracing middleware for Starlette-based applications.
This middleware automatically extracts distributed tracing headers from incoming HTTP requests
and makes them available to the Galileo logger within request handlers.
Works with any ASGI framework built on Starlette:
- FastAPI
- Starlette
- Any other Starlette-based framework
Example usage with FastAPI:
from fastapi import FastAPI
from galileo.middleware import TracingMiddleware, get_request_logger

app = FastAPI()
app.add_middleware(TracingMiddleware)

@app.post("/process")
async def process_request(data: dict):
    # Logger automatically continues the distributed trace
    logger = get_request_logger()
    logger.add_workflow_span(input=str(data), name="process_workflow")
    # ... process request ...
    logger.conclude(output="done")
    return {"status": "success"}
Example usage with Starlette:
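from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.responses import JSONResponse
from starlette.routing import Route
from galileo.middleware import TracingMiddleware, get_request_logger

async def homepage(request):
    logger = get_request_logger()
    logger.add_workflow_span(input="homepage", name="homepage_handler")
    logger.conclude(output="success")
    # Starlette endpoints must return a Response object, not a plain dict
    return JSONResponse({"status": "ok"})

app = Starlette(
    routes=[Route("/", homepage)],
    # Starlette expects middleware classes wrapped in Middleware(...)
    middleware=[Middleware(TracingMiddleware)],
)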
TracingMiddleware
Middleware that extracts distributed tracing headers from incoming requests.
This middleware looks for the following headers in incoming HTTP requests:
- X-Galileo-Trace-ID: The root trace ID
- X-Galileo-Parent-ID: The parent span/trace ID to attach to
These values are stored in context variables, making them available to request
handlers via the get_request_logger() function.
The middleware is compatible with FastAPI and any Starlette-based framework.
Note: Project and log_stream are configured per service via environment variables
(GALILEO_PROJECT and GALILEO_LOG_STREAM). They are not propagated via headers,
following standard distributed tracing patterns.
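For context, the upstream (calling) service is the side that attaches these two headers to its outgoing request. The following is a minimal sketch using only the standard library. The helper names `build_tracing_headers` and `build_request`, the URL, and the ID values are hypothetical and chosen for illustration; in a real deployment the IDs would come from the upstream service's own active trace.

```python
import json
import urllib.request


def build_tracing_headers(trace_id: str, parent_id: str) -> dict:
    """Return the two headers TracingMiddleware looks for (hypothetical helper)."""
    return {
        "X-Galileo-Trace-ID": trace_id,
        "X-Galileo-Parent-ID": parent_id,
    }


def build_request(url: str, payload: dict, trace_id: str, parent_id: str) -> urllib.request.Request:
    """Build (but do not send) a POST request that carries the tracing headers."""
    headers = {"Content-Type": "application/json"}
    headers.update(build_tracing_headers(trace_id, parent_id))
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers=headers,
        method="POST",
    )


# Placeholder URL and IDs, for illustration only.
request = build_request(
    "http://localhost:8000/process",
    {"text": "hello"},
    trace_id="example-trace-id",
    parent_id="example-parent-id",
)
# urllib.request.urlopen(request) would send it to the downstream service.
```

Project and log stream are deliberately absent from the headers: each service reads its own GALILEO_PROJECT and GALILEO_LOG_STREAM values.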
dispatch
async def dispatch(self,
                   request: Request,
                   call_next: RequestResponseEndpoint) -> Response
Process the request and extract tracing headers.
Arguments
request (Request): The incoming HTTP request
call_next (RequestResponseEndpoint): The next middleware or route handler
Returns
Response: The HTTP response
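The pattern of extracting headers into context variables can be illustrated with the standard library alone. This is a sketch of the general technique, not the library's actual implementation: the variable names `_trace_id` and `_parent_id` and the functions `extract_tracing_headers`, `current_trace_context`, and `handle` are assumptions made for the example.

```python
import contextvars

# Hypothetical context variables; the real middleware's internals may differ.
_trace_id = contextvars.ContextVar("trace_id", default=None)
_parent_id = contextvars.ContextVar("parent_id", default=None)


def extract_tracing_headers(headers):
    """Copy the tracing headers into context variables (names matched case-insensitively)."""
    lowered = {name.lower(): value for name, value in headers.items()}
    _trace_id.set(lowered.get("x-galileo-trace-id"))
    _parent_id.set(lowered.get("x-galileo-parent-id"))


def current_trace_context():
    """Read the values back, as a request-scoped logger factory might."""
    return _trace_id.get(), _parent_id.get()


def handle(headers):
    """Simulate one request: extract the headers, then read them in the handler."""
    extract_tracing_headers(headers)
    return current_trace_context()


# Run each simulated request in its own context copy so values cannot leak between requests.
with_headers = contextvars.copy_context().run(
    handle, {"X-Galileo-Trace-ID": "t-1", "X-Galileo-Parent-ID": "s-1"}
)
without_headers = contextvars.copy_context().run(handle, {})
```

Because each request runs in its own context, a handler only ever sees the trace IDs of the request it is serving, and a request without headers sees the defaults.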
get_request_logger
def get_request_logger() -> GalileoLogger
Get a request-scoped GalileoLogger configured for distributed mode.
Note: Distributed mode enables distributed tracing across services by propagating
trace context and sending updates immediately to the backend.
This function should be called within a request handler after the TracingMiddleware has
been registered. It creates a new GalileoLogger instance per request that automatically
continues the distributed trace from the upstream service.
The logger is configured using trace context extracted by the middleware:
- X-Galileo-Trace-ID: Root trace ID
- X-Galileo-Parent-ID: Parent span/trace ID to attach to
Project and log_stream are configured per service via environment variables
(GALILEO_PROJECT and GALILEO_LOG_STREAM), not propagated via headers, following
standard distributed tracing patterns.
If no tracing headers were present in the request, a regular logger is returned
(using GALILEO_PROJECT and GALILEO_LOG_STREAM env vars).
Note: This creates a new logger per request, unlike the decorator’s get_logger_instance()
which uses a singleton pattern.
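The difference between the two patterns can be sketched in plain Python. `DemoLogger`, `get_singleton_logger`, and `get_per_request_logger` are hypothetical stand-ins written for this illustration; they are not part of the Galileo SDK.

```python
from typing import Optional


class DemoLogger:
    """Stand-in for a logger; this is not the GalileoLogger class."""

    def __init__(self, trace_id: Optional[str] = None) -> None:
        self.trace_id = trace_id


_singleton: Optional[DemoLogger] = None


def get_singleton_logger() -> DemoLogger:
    """Singleton pattern: every caller shares the same instance."""
    global _singleton
    if _singleton is None:
        _singleton = DemoLogger()
    return _singleton


def get_per_request_logger(trace_id: Optional[str]) -> DemoLogger:
    """Per-request pattern: each call builds a fresh instance bound to one trace."""
    return DemoLogger(trace_id=trace_id)


shared_a = get_singleton_logger()
shared_b = get_singleton_logger()
request_a = get_per_request_logger("trace-a")
request_b = get_per_request_logger("trace-b")
```

A per-request instance keeps the trace state of concurrent requests separate, which is presumably why the request-scoped function does not reuse a shared logger.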
Returns
GalileoLogger: A logger instance configured for the current request’s trace context
Examples
@app.post("/process")
async def process_request(data: dict):
    logger = get_request_logger()
    # This span will be attached to the distributed trace
    logger.add_workflow_span(input=str(data), name="process_workflow")
    result = await process(data)
    logger.conclude(output=str(result))
    logger.flush()
    logger.terminate()
    return {"result": result}

@app.post("/retrieve")
async def retrieve_endpoint(query: str):
    # Get logger with trace context from upstream service
    logger = get_request_logger()
    # If trace context exists, this creates a workflow span
    # Otherwise, it starts a new trace
    if logger.trace_id:
        logger.add_workflow_span(input=query, name="retrieval_service")
    else:
        logger.start_trace(input=query, name="retrieval_service")
    results = retrieve(query, logger)
    logger.conclude(output=str(results))
    logger.flush()
    logger.terminate()
    return {"results": results}