Agent Observability Series — Blog 2: Tracing Multi-Agent Research Pipelines#
4 parallel researchers, each making 5 Tavily searches and 2 Bedrock calls. That's 28 operations in 16 seconds. Without tracing, debugging is guesswork.
In Blog 1, we built the SpanData class and the trace_span() context manager — the foundation for wrapping any operation in a timed, attributed span. Now we need a place to put those spans. A single research pipeline produces 15-30 spans. We need to collect them into a trace, persist them to disk, load them on restart, and serve them to the dashboard via REST API.
This post covers the TraceCollector — the storage layer that turns ephemeral spans into queryable trace history.
The Agent Observability Series#
| Part | Title | Focus |
|---|---|---|
| 1 | Architecture & OpenTelemetry Foundations | System design, gen_ai.* conventions, span model |
| 2 | Tracing Multi-Agent Research Pipelines (this post) | Hierarchical traces, TraceCollector, dashboard API |
| 3 | Automated Evaluation with LLM-as-Judge | 5-criteria scoring, quality trends |
| 4 | Token Cost Tracking & Budget Alerts | Per-model pricing, stage breakdown, budget alerts |
| 5 | Production Dashboard & Anomaly Detection | 4-panel dashboard, Z-score anomaly detection |
Hierarchical Trace Structure#
A single research query produces a tree of spans. Here's what a COMPLEX query looks like:
research_pipeline (12,400ms)
├── decomposition (800ms)
│ └── bedrock_call (780ms)
│ gen_ai.request.model: claude-haiku-4.5
│ gen_ai.usage.input_tokens: 420
│ gen_ai.usage.output_tokens: 180
├── researcher_1 (4,200ms)
│ ├── tavily_search (1,100ms)
│ ├── tavily_search (900ms)
│ ├── tavily_search (1,050ms)
│ └── bedrock_call (1,150ms)
│ gen_ai.usage.input_tokens: 2,800
│ gen_ai.usage.output_tokens: 650
├── researcher_2 (3,800ms)
│ ├── tavily_search (950ms)
│ ├── tavily_search (1,200ms)
│ └── bedrock_call (1,100ms)
├── researcher_3 (3,600ms)
│ ├── tavily_search (800ms)
│ ├── tavily_search (1,000ms)
│ └── bedrock_call (980ms)
├── researcher_4 (4,100ms)
│ ├── tavily_search (1,050ms)
│ ├── tavily_search (1,150ms)
│ └── bedrock_call (1,200ms)
├── deduplication (12ms)
├── critique (1,100ms)
│ └── bedrock_call (1,080ms)
└── synthesis (2,100ms)
└── bedrock_call (2,050ms)
gen_ai.usage.input_tokens: 4,200
gen_ai.usage.output_tokens: 1,800
The researchers run in parallel (asyncio tasks), so the total wall time is ~12.4 seconds even though the sum of all spans is ~32 seconds. The trace captures both — individual span durations for identifying slow operations, and the parent span duration for the actual wall time.
Each level in the hierarchy answers a different debugging question:
| Level | Question It Answers |
|---|---|
research_pipeline | How long did the entire request take? |
researcher_N | Which researcher was slowest? Why? |
tavily_search | Is one search query returning slowly? |
bedrock_call | How many tokens did this LLM call use? |
critique / synthesis | Where is post-processing time going? |
The TraceRecord Dataclass#
While SpanData represents a single operation, TraceRecord represents an entire research run — all its spans, plus metadata:
@dataclass class TraceRecord: """A complete trace with all its spans.""" trace_id: str query: str start_time: str end_time: str = "" duration_ms: float = 0.0 total_tokens: int = 0 total_cost: float = 0.0 spans: list[dict] = field(default_factory=list) evaluation: dict = field(default_factory=dict) complexity: str = "" status: str = "ok"
A TraceRecord is the unit of storage. After each research pipeline completes, app.py creates one from the accumulated spans and hands it to the collector:
collector.store_trace(TraceRecord( trace_id=trace_id, query=message, start_time=datetime.fromtimestamp(research_start, tz=timezone.utc).isoformat(), end_time=datetime.now(timezone.utc).isoformat(), duration_ms=round(research_duration, 1), total_tokens=engineered_tokens, total_cost=request_cost.get("total_cost", 0), spans=[s.to_dict() for s in spans], evaluation=evaluation, complexity=complexity.level.value, ))
The spans field stores serialized SpanData dicts, so each trace file is self-contained — you can read a single JSON file and reconstruct the entire trace tree.
The TraceCollector#
The collector is the storage layer. It persists traces to JSON files and keeps the most recent 100 in memory for fast dashboard queries:
class TraceCollector: """Collects and persists traces to local JSON files.""" def __init__(self, storage_dir: str = "./data/traces"): self.storage_dir = Path(storage_dir) self.storage_dir.mkdir(parents=True, exist_ok=True) self._recent_traces: list[TraceRecord] = [] self._max_recent = 100 self._load_recent() def store_trace(self, record: TraceRecord) -> None: """Store a trace record to disk and keep in memory.""" # Save to file filename = f"{record.trace_id}.json" filepath = self.storage_dir / filename try: filepath.write_text(json.dumps(asdict(record), indent=2, default=str)) except Exception as e: logger.warning("trace_save_failed", error=str(e)) # Keep in recent memory self._recent_traces.insert(0, record) if len(self._recent_traces) > self._max_recent: self._recent_traces = self._recent_traces[:self._max_recent]
Each trace becomes a JSON file named by its trace_id — for example, data/traces/a3f2b8c1e4d7.json. The files are human-readable, git-friendly, and trivially inspectable with cat or jq. No database, no schema migrations, no connection pooling.
The in-memory list (_recent_traces) is an LRU-style buffer. New traces are inserted at position 0, and the list is trimmed to 100. This means the dashboard always has the 100 most recent traces available without touching disk.
Retrieving Traces#
The collector supports two access patterns — list (for the dashboard table) and get (for the detail view):
def get_trace(self, trace_id: str) -> TraceRecord | None: """Get a specific trace by ID.""" # Check memory first for t in self._recent_traces: if t.trace_id == trace_id: return t # Check disk filepath = self.storage_dir / f"{trace_id}.json" if filepath.exists(): try: data = json.loads(filepath.read_text()) return TraceRecord(**data) except Exception as e: logger.warning("trace_load_failed", trace_id=trace_id, error=str(e)) return None def list_traces(self, limit: int = 20, offset: int = 0) -> list[dict]: """List recent traces with summary info.""" traces = self._recent_traces[offset:offset + limit] return [ { "trace_id": t.trace_id, "query": t.query[:80], "duration_ms": t.duration_ms, "total_tokens": t.total_tokens, "total_cost": t.total_cost, "complexity": t.complexity, "span_count": len(t.spans), "status": t.status, "start_time": t.start_time, "evaluation_score": t.evaluation.get("overall_score", None), } for t in traces ]
list_traces() returns summary dicts — just enough data for the dashboard table without loading every span. get_trace() checks memory first (O(n) scan of 100 items — microseconds), then falls back to disk. For a local dev tool, this is more than fast enough.
Aggregated Statistics#
The collector computes statistics across all recent traces — the kind of aggregate view you'd normally need Prometheus or CloudWatch for:
def get_stats(self) -> dict: """Get aggregated statistics across all recent traces.""" if not self._recent_traces: return {"total_traces": 0} durations = [t.duration_ms for t in self._recent_traces] tokens = [t.total_tokens for t in self._recent_traces] costs = [t.total_cost for t in self._recent_traces] scores = [ t.evaluation.get("overall_score", 0) for t in self._recent_traces if t.evaluation.get("overall_score") ] return { "total_traces": len(self._recent_traces), "avg_duration_ms": round(sum(durations) / len(durations), 1), "p50_duration_ms": round(sorted(durations)[len(durations) // 2], 1), "p95_duration_ms": round( sorted(durations)[int(len(durations) * 0.95)], 1 ) if len(durations) >= 20 else None, "avg_tokens": round(sum(tokens) / len(tokens)), "total_cost": round(sum(costs), 4), "avg_cost": round(sum(costs) / len(costs), 4), "avg_quality_score": round(sum(scores) / len(scores), 2) if scores else None, }
This gives you the numbers that matter:
- p50 duration: "A typical request takes 11.2 seconds."
- p95 duration: "95% of requests complete within 24.8 seconds." (Only computed when you have 20+ traces for statistical significance.)
- avg quality score: "Our average quality score is 3.6 out of 5."
- total cost: "We've spent $0.47 across 28 research runs today."
These stats are computed on the fly from the in-memory buffer. No pre-aggregation, no materialized views. When you have 100 traces, sorting 100 numbers takes microseconds.
Loading Traces on Startup#
When the server restarts, the collector needs to rebuild its in-memory buffer from disk. The _load_recent() method handles this:
def _load_recent(self) -> None: """Load recent traces from disk on startup.""" try: files = sorted( self.storage_dir.glob("*.json"), key=lambda f: f.stat().st_mtime, reverse=True, ) for f in files[:self._max_recent]: try: data = json.loads(f.read_text()) self._recent_traces.append(TraceRecord(**data)) except Exception: continue logger.info("traces_loaded", count=len(self._recent_traces)) except Exception as e: logger.warning("traces_load_failed", error=str(e))
It globs for JSON files, sorts by modification time (most recent first), and loads up to 100. Corrupt files are silently skipped — defensive parsing is critical for a tool that stores data on a developer's local disk. Files get corrupted, directories get moved, format changes happen. The collector just keeps working with whatever it can load.
The Dashboard API#
The dashboard needs data from multiple sources — traces from the collector, cost data from the cost tracker, anomalies from the anomaly detector. The dashboard_api.py module bundles these into clean REST endpoints:
router = APIRouter(prefix="/api/dashboard", tags=["dashboard"]) @router.get("/traces") async def list_traces(limit: int = 20, offset: int = 0): """List recent traces with summary info.""" collector = get_collector() return { "traces": collector.list_traces(limit=limit, offset=offset), "total": len(collector._recent_traces), } @router.get("/traces/{trace_id}") async def get_trace(trace_id: str): """Get a specific trace with all spans.""" collector = get_collector() trace = collector.get_trace(trace_id) if not trace: return {"error": "Trace not found"} from dataclasses import asdict return asdict(trace) @router.get("/stats") async def get_stats(): """Get aggregated statistics.""" _seed_anomaly_detector() collector = get_collector() cost_tracker = get_cost_tracker() return { "traces": collector.get_stats(), "cost": cost_tracker.get_summary(), "anomalies": _anomaly_detector.get_stats(), }
The /stats endpoint is the one the dashboard calls on load. It returns a combined view: trace stats (p50, p95, avg duration), cost stats (daily total, average per request), and anomaly stats (total detected, breakdown by type). One API call, one render.
The Seeding Pattern#
The anomaly detector uses Z-scores to flag outliers, but Z-scores need a baseline — you need at least 5-10 data points before "this request is 3 standard deviations above the mean" is meaningful. When the server restarts, the anomaly detector's history is empty.
The _seed_anomaly_detector() function solves this by feeding historical trace data into the detector on first access:
def _seed_anomaly_detector(): """Seed the anomaly detector with existing trace data on first access.""" global _seeded if _seeded: return _seeded = True collector = get_collector() cost_tracker = get_cost_tracker() for trace in reversed(collector._recent_traces): _anomaly_detector.check("latency", trace.duration_ms, query=trace.query[:80]) _anomaly_detector.check("cost", trace.total_cost, query=trace.query[:80]) if trace.evaluation and trace.evaluation.get("overall_score"): _anomaly_detector.check("quality", trace.evaluation["overall_score"], query=trace.query[:80]) # Clear any anomalies from seeding — we only want new ones _anomaly_detector._anomalies.clear()
The key detail: after seeding, it clears the anomaly list. The seeding is purely to build up the running mean and standard deviation — we don't want 50 false "anomalies" from historical data cluttering the alert panel. Only new requests that deviate from the historical baseline will trigger alerts.
The traces are processed in reverse order (reversed()) because the collector stores most-recent-first, but the anomaly detector needs chronological order to build its running statistics correctly.
What a Trace Looks Like on Disk#
Here's a real trace JSON file from data/traces/a3f2b8c1e4d7.json:
{ "trace_id": "a3f2b8c1e4d7", "query": "Compare the top 3 Python web frameworks for building APIs in 2026", "start_time": "2026-04-10T14:23:08.112000+00:00", "end_time": "2026-04-10T14:23:20.534000+00:00", "duration_ms": 12422.0, "total_tokens": 8420, "total_cost": 0.0312, "complexity": "complex", "status": "ok", "spans": [ { "span_id": "f8a2c1b3d4e5", "parent_id": "", "trace_id": "a3f2b8c1e4d7", "name": "research_pipeline", "start_time": 1712758988.112, "end_time": 1712758999.534, "duration_ms": 12422.0, "status": "ok", "attributes": { "gen_ai.request.model": "claude-haiku-4.5", "query": "Compare the top 3 Python web frameworks...", "complexity": "complex" }, "events": [] }, { "span_id": "b2c3d4e5f6a7", "parent_id": "f8a2c1b3d4e5", "trace_id": "a3f2b8c1e4d7", "name": "researcher_1", "duration_ms": 4200.0, "attributes": { "gen_ai.usage.input_tokens": 2800, "gen_ai.usage.output_tokens": 650 } } ], "evaluation": { "overall_score": 3.8, "accuracy": {"score": 4, "reason": "Facts are well-supported by cited sources"}, "completeness": {"score": 3, "reason": "Covers main frameworks but misses ASGI details"}, "citations": {"score": 4, "reason": "All claims have URL citations"}, "coherence": {"score": 4, "reason": "Well-structured comparison format"}, "relevance": {"score": 4, "reason": "Directly answers the comparison question"} } }
Everything in one file. The trace, its spans, the evaluation scores, the cost. You can grep across these files, pipe them through jq, or load them into a Jupyter notebook for analysis. No database needed.
What's Next#
In Blog 3: Automated Evaluation with LLM-as-Judge, we'll build the evaluation system that scores every research output on 5 criteria — accuracy, completeness, citations, coherence, and relevance. We'll walk through the judge prompt, the evaluate_research() function, and how the evaluation integrates into the pipeline at ~$0.001 per assessment.
This project is a fork of the Context Engine, which is a fork of the Deep Research Agent — read those series first for the base pipeline.
All code is open source: github.com/MinhQuanBuiSco/agent-observability