Back
AI Engineering··16 min

Agent Observability Series — Blog 2: Tracing Multi-Agent Research Pipelines

4 parallel researchers, each making 5 Tavily searches and 2 Bedrock calls. That's 28 operations in 16 seconds. Without tracing, debugging is guesswork. Build hierarchical traces that show every span.

Agent Observability Series — Blog 2: Tracing Multi-Agent Research Pipelines#

4 parallel researchers, each making 5 Tavily searches and 2 Bedrock calls. That's 28 operations in 16 seconds. Without tracing, debugging is guesswork.

In Blog 1, we built the SpanData class and the trace_span() context manager — the foundation for wrapping any operation in a timed, attributed span. Now we need a place to put those spans. A single research pipeline produces 15-30 spans. We need to collect them into a trace, persist them to disk, load them on restart, and serve them to the dashboard via REST API.

This post covers the TraceCollector — the storage layer that turns ephemeral spans into queryable trace history.


The Agent Observability Series#

PartTitleFocus
1Architecture & OpenTelemetry FoundationsSystem design, gen_ai.* conventions, span model
2Tracing Multi-Agent Research Pipelines (this post)Hierarchical traces, TraceCollector, dashboard API
3Automated Evaluation with LLM-as-Judge5-criteria scoring, quality trends
4Token Cost Tracking & Budget AlertsPer-model pricing, stage breakdown, budget alerts
5Production Dashboard & Anomaly Detection4-panel dashboard, Z-score anomaly detection

Hierarchical Trace Structure#

A single research query produces a tree of spans. Here's what a COMPLEX query looks like:

research_pipeline (12,400ms)
├── decomposition (800ms)
│   └── bedrock_call (780ms)
│       gen_ai.request.model: claude-haiku-4.5
│       gen_ai.usage.input_tokens: 420
│       gen_ai.usage.output_tokens: 180
├── researcher_1 (4,200ms)
│   ├── tavily_search (1,100ms)
│   ├── tavily_search (900ms)
│   ├── tavily_search (1,050ms)
│   └── bedrock_call (1,150ms)
│       gen_ai.usage.input_tokens: 2,800
│       gen_ai.usage.output_tokens: 650
├── researcher_2 (3,800ms)
│   ├── tavily_search (950ms)
│   ├── tavily_search (1,200ms)
│   └── bedrock_call (1,100ms)
├── researcher_3 (3,600ms)
│   ├── tavily_search (800ms)
│   ├── tavily_search (1,000ms)
│   └── bedrock_call (980ms)
├── researcher_4 (4,100ms)
│   ├── tavily_search (1,050ms)
│   ├── tavily_search (1,150ms)
│   └── bedrock_call (1,200ms)
├── deduplication (12ms)
├── critique (1,100ms)
│   └── bedrock_call (1,080ms)
└── synthesis (2,100ms)
    └── bedrock_call (2,050ms)
        gen_ai.usage.input_tokens: 4,200
        gen_ai.usage.output_tokens: 1,800

The researchers run in parallel (asyncio tasks), so the total wall time is ~12.4 seconds even though the sum of all spans is ~32 seconds. The trace captures both — individual span durations for identifying slow operations, and the parent span duration for the actual wall time.

Each level in the hierarchy answers a different debugging question:

LevelQuestion It Answers
research_pipelineHow long did the entire request take?
researcher_NWhich researcher was slowest? Why?
tavily_searchIs one search query returning slowly?
bedrock_callHow many tokens did this LLM call use?
critique / synthesisWhere is post-processing time going?

The TraceRecord Dataclass#

While SpanData represents a single operation, TraceRecord represents an entire research run — all its spans, plus metadata:

@dataclass
class TraceRecord:
    """A complete trace with all its spans."""
    trace_id: str
    query: str
    start_time: str
    end_time: str = ""
    duration_ms: float = 0.0
    total_tokens: int = 0
    total_cost: float = 0.0
    spans: list[dict] = field(default_factory=list)
    evaluation: dict = field(default_factory=dict)
    complexity: str = ""
    status: str = "ok"

A TraceRecord is the unit of storage. After each research pipeline completes, app.py creates one from the accumulated spans and hands it to the collector:

collector.store_trace(TraceRecord(
    trace_id=trace_id,
    query=message,
    start_time=datetime.fromtimestamp(research_start, tz=timezone.utc).isoformat(),
    end_time=datetime.now(timezone.utc).isoformat(),
    duration_ms=round(research_duration, 1),
    total_tokens=engineered_tokens,
    total_cost=request_cost.get("total_cost", 0),
    spans=[s.to_dict() for s in spans],
    evaluation=evaluation,
    complexity=complexity.level.value,
))

The spans field stores serialized SpanData dicts, so each trace file is self-contained — you can read a single JSON file and reconstruct the entire trace tree.


The TraceCollector#

The collector is the storage layer. It persists traces to JSON files and keeps the most recent 100 in memory for fast dashboard queries:

class TraceCollector:
    """Collects and persists traces to local JSON files."""

    def __init__(self, storage_dir: str = "./data/traces"):
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(parents=True, exist_ok=True)
        self._recent_traces: list[TraceRecord] = []
        self._max_recent = 100
        self._load_recent()

    def store_trace(self, record: TraceRecord) -> None:
        """Store a trace record to disk and keep in memory."""
        # Save to file
        filename = f"{record.trace_id}.json"
        filepath = self.storage_dir / filename
        try:
            filepath.write_text(json.dumps(asdict(record), indent=2, default=str))
        except Exception as e:
            logger.warning("trace_save_failed", error=str(e))

        # Keep in recent memory
        self._recent_traces.insert(0, record)
        if len(self._recent_traces) > self._max_recent:
            self._recent_traces = self._recent_traces[:self._max_recent]

Each trace becomes a JSON file named by its trace_id — for example, data/traces/a3f2b8c1e4d7.json. The files are human-readable, git-friendly, and trivially inspectable with cat or jq. No database, no schema migrations, no connection pooling.

The in-memory list (_recent_traces) is an LRU-style buffer. New traces are inserted at position 0, and the list is trimmed to 100. This means the dashboard always has the 100 most recent traces available without touching disk.

Retrieving Traces#

The collector supports two access patterns — list (for the dashboard table) and get (for the detail view):

def get_trace(self, trace_id: str) -> TraceRecord | None:
    """Get a specific trace by ID."""
    # Check memory first
    for t in self._recent_traces:
        if t.trace_id == trace_id:
            return t

    # Check disk
    filepath = self.storage_dir / f"{trace_id}.json"
    if filepath.exists():
        try:
            data = json.loads(filepath.read_text())
            return TraceRecord(**data)
        except Exception as e:
            logger.warning("trace_load_failed", trace_id=trace_id, error=str(e))
    return None

def list_traces(self, limit: int = 20, offset: int = 0) -> list[dict]:
    """List recent traces with summary info."""
    traces = self._recent_traces[offset:offset + limit]
    return [
        {
            "trace_id": t.trace_id,
            "query": t.query[:80],
            "duration_ms": t.duration_ms,
            "total_tokens": t.total_tokens,
            "total_cost": t.total_cost,
            "complexity": t.complexity,
            "span_count": len(t.spans),
            "status": t.status,
            "start_time": t.start_time,
            "evaluation_score": t.evaluation.get("overall_score", None),
        }
        for t in traces
    ]

list_traces() returns summary dicts — just enough data for the dashboard table without loading every span. get_trace() checks memory first (O(n) scan of 100 items — microseconds), then falls back to disk. For a local dev tool, this is more than fast enough.


Aggregated Statistics#

The collector computes statistics across all recent traces — the kind of aggregate view you'd normally need Prometheus or CloudWatch for:

def get_stats(self) -> dict:
    """Get aggregated statistics across all recent traces."""
    if not self._recent_traces:
        return {"total_traces": 0}

    durations = [t.duration_ms for t in self._recent_traces]
    tokens = [t.total_tokens for t in self._recent_traces]
    costs = [t.total_cost for t in self._recent_traces]
    scores = [
        t.evaluation.get("overall_score", 0)
        for t in self._recent_traces
        if t.evaluation.get("overall_score")
    ]

    return {
        "total_traces": len(self._recent_traces),
        "avg_duration_ms": round(sum(durations) / len(durations), 1),
        "p50_duration_ms": round(sorted(durations)[len(durations) // 2], 1),
        "p95_duration_ms": round(
            sorted(durations)[int(len(durations) * 0.95)], 1
        ) if len(durations) >= 20 else None,
        "avg_tokens": round(sum(tokens) / len(tokens)),
        "total_cost": round(sum(costs), 4),
        "avg_cost": round(sum(costs) / len(costs), 4),
        "avg_quality_score": round(sum(scores) / len(scores), 2) if scores else None,
    }

This gives you the numbers that matter:

  • p50 duration: "A typical request takes 11.2 seconds."
  • p95 duration: "95% of requests complete within 24.8 seconds." (Only computed when you have 20+ traces for statistical significance.)
  • avg quality score: "Our average quality score is 3.6 out of 5."
  • total cost: "We've spent $0.47 across 28 research runs today."

These stats are computed on the fly from the in-memory buffer. No pre-aggregation, no materialized views. When you have 100 traces, sorting 100 numbers takes microseconds.


Loading Traces on Startup#

When the server restarts, the collector needs to rebuild its in-memory buffer from disk. The _load_recent() method handles this:

def _load_recent(self) -> None:
    """Load recent traces from disk on startup."""
    try:
        files = sorted(
            self.storage_dir.glob("*.json"),
            key=lambda f: f.stat().st_mtime,
            reverse=True,
        )
        for f in files[:self._max_recent]:
            try:
                data = json.loads(f.read_text())
                self._recent_traces.append(TraceRecord(**data))
            except Exception:
                continue
        logger.info("traces_loaded", count=len(self._recent_traces))
    except Exception as e:
        logger.warning("traces_load_failed", error=str(e))

It globs for JSON files, sorts by modification time (most recent first), and loads up to 100. Corrupt files are silently skipped — defensive parsing is critical for a tool that stores data on a developer's local disk. Files get corrupted, directories get moved, format changes happen. The collector just keeps working with whatever it can load.


The Dashboard API#

The dashboard needs data from multiple sources — traces from the collector, cost data from the cost tracker, anomalies from the anomaly detector. The dashboard_api.py module bundles these into clean REST endpoints:

router = APIRouter(prefix="/api/dashboard", tags=["dashboard"])

@router.get("/traces")
async def list_traces(limit: int = 20, offset: int = 0):
    """List recent traces with summary info."""
    collector = get_collector()
    return {
        "traces": collector.list_traces(limit=limit, offset=offset),
        "total": len(collector._recent_traces),
    }

@router.get("/traces/{trace_id}")
async def get_trace(trace_id: str):
    """Get a specific trace with all spans."""
    collector = get_collector()
    trace = collector.get_trace(trace_id)
    if not trace:
        return {"error": "Trace not found"}
    from dataclasses import asdict
    return asdict(trace)

@router.get("/stats")
async def get_stats():
    """Get aggregated statistics."""
    _seed_anomaly_detector()
    collector = get_collector()
    cost_tracker = get_cost_tracker()
    return {
        "traces": collector.get_stats(),
        "cost": cost_tracker.get_summary(),
        "anomalies": _anomaly_detector.get_stats(),
    }

The /stats endpoint is the one the dashboard calls on load. It returns a combined view: trace stats (p50, p95, avg duration), cost stats (daily total, average per request), and anomaly stats (total detected, breakdown by type). One API call, one render.

The Seeding Pattern#

The anomaly detector uses Z-scores to flag outliers, but Z-scores need a baseline — you need at least 5-10 data points before "this request is 3 standard deviations above the mean" is meaningful. When the server restarts, the anomaly detector's history is empty.

The _seed_anomaly_detector() function solves this by feeding historical trace data into the detector on first access:

def _seed_anomaly_detector():
    """Seed the anomaly detector with existing trace data on first access."""
    global _seeded
    if _seeded:
        return
    _seeded = True

    collector = get_collector()
    cost_tracker = get_cost_tracker()

    for trace in reversed(collector._recent_traces):
        _anomaly_detector.check("latency", trace.duration_ms, query=trace.query[:80])
        _anomaly_detector.check("cost", trace.total_cost, query=trace.query[:80])
        if trace.evaluation and trace.evaluation.get("overall_score"):
            _anomaly_detector.check("quality", trace.evaluation["overall_score"], query=trace.query[:80])

    # Clear any anomalies from seeding — we only want new ones
    _anomaly_detector._anomalies.clear()

The key detail: after seeding, it clears the anomaly list. The seeding is purely to build up the running mean and standard deviation — we don't want 50 false "anomalies" from historical data cluttering the alert panel. Only new requests that deviate from the historical baseline will trigger alerts.

The traces are processed in reverse order (reversed()) because the collector stores most-recent-first, but the anomaly detector needs chronological order to build its running statistics correctly.


What a Trace Looks Like on Disk#

Here's a real trace JSON file from data/traces/a3f2b8c1e4d7.json:

{
  "trace_id": "a3f2b8c1e4d7",
  "query": "Compare the top 3 Python web frameworks for building APIs in 2026",
  "start_time": "2026-04-10T14:23:08.112000+00:00",
  "end_time": "2026-04-10T14:23:20.534000+00:00",
  "duration_ms": 12422.0,
  "total_tokens": 8420,
  "total_cost": 0.0312,
  "complexity": "complex",
  "status": "ok",
  "spans": [
    {
      "span_id": "f8a2c1b3d4e5",
      "parent_id": "",
      "trace_id": "a3f2b8c1e4d7",
      "name": "research_pipeline",
      "start_time": 1712758988.112,
      "end_time": 1712758999.534,
      "duration_ms": 12422.0,
      "status": "ok",
      "attributes": {
        "gen_ai.request.model": "claude-haiku-4.5",
        "query": "Compare the top 3 Python web frameworks...",
        "complexity": "complex"
      },
      "events": []
    },
    {
      "span_id": "b2c3d4e5f6a7",
      "parent_id": "f8a2c1b3d4e5",
      "trace_id": "a3f2b8c1e4d7",
      "name": "researcher_1",
      "duration_ms": 4200.0,
      "attributes": {
        "gen_ai.usage.input_tokens": 2800,
        "gen_ai.usage.output_tokens": 650
      }
    }
  ],
  "evaluation": {
    "overall_score": 3.8,
    "accuracy": {"score": 4, "reason": "Facts are well-supported by cited sources"},
    "completeness": {"score": 3, "reason": "Covers main frameworks but misses ASGI details"},
    "citations": {"score": 4, "reason": "All claims have URL citations"},
    "coherence": {"score": 4, "reason": "Well-structured comparison format"},
    "relevance": {"score": 4, "reason": "Directly answers the comparison question"}
  }
}

Everything in one file. The trace, its spans, the evaluation scores, the cost. You can grep across these files, pipe them through jq, or load them into a Jupyter notebook for analysis. No database needed.


What's Next#

In Blog 3: Automated Evaluation with LLM-as-Judge, we'll build the evaluation system that scores every research output on 5 criteria — accuracy, completeness, citations, coherence, and relevance. We'll walk through the judge prompt, the evaluate_research() function, and how the evaluation integrates into the pipeline at ~$0.001 per assessment.


This project is a fork of the Context Engine, which is a fork of the Deep Research Agent — read those series first for the base pipeline.

All code is open source: github.com/MinhQuanBuiSco/agent-observability