Deep Research Agent Series, Blog 3: Smart Search & Source Intelligence
An AI research agent is only as good as its sources. You can have the most sophisticated orchestrator, the cleverest prompt engineering, and the fastest streaming pipeline — but if your search layer returns garbage, you get a garbage report. In this post, we build the search and source intelligence layer — Tavily web search with a circuit breaker for fault tolerance, domain-based credibility scoring, and a clever source-capturing pattern that gives us structured metadata without parsing LLM output.
Series Navigation
| Part | Topic | Status |
|---|---|---|
| Blog 1 | Architecture & Vision | Published |
| Blog 2 | Multi-Agent Orchestration | Published |
| Blog 3 | Smart Search & Source Intelligence | You are here |
| Blog 4 | Real-Time Streaming with WebSocket | Published |
| Blog 5 | Cloud-Native Infrastructure on AWS | Published |
| Blog 6 | Security & Production Hardening | Published |
The Search Pipeline
Our search layer isn't a single function call — it's a pipeline of four components working together:
- Tavily Search — web search API that returns structured results with content snippets
- Circuit Breaker — fault tolerance so a flaky API doesn't take down the whole agent
- Credibility Scorer — domain-based quality rating for every source
- Source-Capturing Wrappers — structured metadata extraction at invocation time
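The search tool, the credibility scorer, and the capturing wrappers are each implemented as a Strands SDK @tool, which means the LLM can invoke them directly. The circuit breaker is plain module-level state inside the search tool. Let's walk through each one.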
Tavily Search with Circuit Breaker
Tavily is a search API designed specifically for AI agents: it returns clean, structured results with content snippets, URLs, and titles. But any external API can fail, and when your research agent is making a dozen or more search calls across parallel sub-queries, you need resilience.
The circuit breaker pattern prevents cascading failures. Instead of hammering a failing API and waiting for timeouts, we fail fast once we detect a problem.
Here's the full implementation:

```python
"""Web search tool with circuit breaker pattern."""

import time

import structlog
from strands import tool

logger = structlog.get_logger()

# Circuit breaker state (module-level for persistence across calls)
_failure_count = 0
_circuit_open_until = 0.0
_FAILURE_THRESHOLD = 3
_RECOVERY_TIMEOUT = 60


@tool
def tavily_search(query: str, max_results: int = 5) -> dict:
    """Search the web for information on a given query using Tavily API."""
    global _failure_count, _circuit_open_until

    # Circuit breaker: if open, fail fast
    if _failure_count >= _FAILURE_THRESHOLD:
        if time.time() < _circuit_open_until:
            return {
                "results": [],
                "error": "Search temporarily unavailable",
                "circuit_breaker": True,
            }
        # Half-open: timeout expired, reset and try again
        _failure_count = 0

    try:
        from tavily import TavilyClient
        import boto3
        import json
        from config import settings

        # Retrieve API key from Secrets Manager or environment
        if settings.tavily_api_key_secret_arn:
            secrets = boto3.client(
                "secretsmanager", region_name=settings.aws_region
            )
            secret = secrets.get_secret_value(
                SecretId=settings.tavily_api_key_secret_arn
            )
            api_key = json.loads(secret["SecretString"])["TAVILY_API_KEY"]
        else:
            import os
            api_key = os.environ.get("TAVILY_API_KEY", "")

        client = TavilyClient(api_key=api_key)
        response = client.search(query=query, max_results=max_results)

        # Success: reset failure count
        _failure_count = 0

        results = []
        for r in response.get("results", []):
            results.append({
                "title": r.get("title", ""),
                "url": r.get("url", ""),
                "content": r.get("content", ""),
                "score": r.get("score", 0),
            })

        logger.info("tavily_search_success", query=query, count=len(results))
        return {"results": results, "query": query}

    except Exception as e:
        _failure_count += 1
        if _failure_count >= _FAILURE_THRESHOLD:
            _circuit_open_until = time.time() + _RECOVERY_TIMEOUT
            logger.warning(
                "circuit_breaker_opened",
                failures=_failure_count,
                recovery_seconds=_RECOVERY_TIMEOUT,
            )
        logger.error("tavily_search_error", error=str(e), query=query)
        return {"results": [], "error": str(e)}
```
Circuit Breaker State Machine
The circuit breaker has three states:

```text
┌──────────┐   3 failures    ┌───────────┐
│  CLOSED  │ ──────────────→ │   OPEN    │
│ (normal) │                 │(fail fast)│
└──────────┘                 └─────┬─────┘
     ↑                             │
     │                             │ 60s timeout
     │                       ┌─────▼─────┐
     └────── success ─────── │ HALF-OPEN │
                             │ (try one) │
                             └───────────┘
```
- Closed: Normal operation. Every successful call resets the counter to zero. Failures increment _failure_count.
- Open: After 3 consecutive failures, the circuit opens. All calls return immediately with "circuit_breaker": True for 60 seconds. No API calls are made.
- Half-Open: After the 60-second timeout, the next call resets _failure_count to zero and actually hits the API. If it succeeds, we're back to Closed. If it fails, the counter restarts at 1, so with the code above the circuit reopens only after three consecutive failures, not on the first failed probe.
Why does this matter? When the agent is running 4 parallel researchers, each making 3-5 search calls, a single Tavily outage would mean 12-20 hanging requests. The circuit breaker detects the pattern after just 3 failures and prevents the other calls from even trying.
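With several researchers sharing one breaker, it can help to keep the state behind a lock rather than in bare module globals. The sketch below is one way to do that, not the code used in this series: the CircuitBreaker class, its method names, and the injectable clock are hypothetical. It also makes one behavioural choice explicit: a failed probe after the timeout reopens the circuit immediately, which is stricter than resetting the counter to zero.

```python
import threading
import time
from typing import Callable


class CircuitBreaker:
    """Thread-safe circuit breaker with a strict half-open state (sketch)."""

    def __init__(
        self,
        failure_threshold: int = 3,
        recovery_timeout: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock  # injectable so tests can control time
        self._lock = threading.Lock()
        self._failures = 0
        self._open_until = 0.0

    def allow_request(self) -> bool:
        """Return False while the circuit is open, True otherwise."""
        with self._lock:
            if self._failures < self.failure_threshold:
                return True  # closed: normal operation
            # Open until the timeout expires, then half-open (probes allowed).
            return self._clock() >= self._open_until

    def record_success(self) -> None:
        """A success closes the circuit and clears the failure count."""
        with self._lock:
            self._failures = 0

    def record_failure(self) -> None:
        """Count a failure; at or above the threshold, (re)open the circuit."""
        with self._lock:
            self._failures += 1
            if self._failures >= self.failure_threshold:
                self._open_until = self._clock() + self.recovery_timeout
```

A caller would check allow_request() before the API call and report the outcome with record_success() or record_failure(). Note that this sketch lets every caller through once the timeout expires, so limiting the half-open state to a single probe would need an extra flag.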
Notice we also integrate with AWS Secrets Manager for the API key. In production, secrets should never live in environment variables on the container — Secrets Manager provides rotation, auditing, and fine-grained IAM access control.
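One thing to watch: the tool above creates a Secrets Manager client and fetches the secret on every search call. A small in-memory cache with a time-to-live is one possible way to avoid that while still picking up rotated keys. This is a sketch under assumptions: CachedSecret and its loader argument are hypothetical names, and the loader would wrap whatever call actually retrieves the key.

```python
import time
from typing import Callable, Optional


class CachedSecret:
    """Cache a secret value in memory and refresh it after a TTL (sketch)."""

    def __init__(
        self,
        loader: Callable[[], str],
        ttl_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._loader = loader  # hypothetical: fetches the secret from its store
        self._ttl = ttl_seconds
        self._clock = clock
        self._value: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Return the cached value, reloading it once the TTL has elapsed."""
        now = self._clock()
        if self._value is None or now >= self._expires_at:
            self._value = self._loader()
            self._expires_at = now + self._ttl
        return self._value
```

The TTL is a trade-off: a longer value means fewer Secrets Manager calls, a shorter one means a rotated key is picked up sooner.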
Credibility Scoring
Not all sources are created equal. A result from nature.com or arxiv.org carries more weight than a random blog post. The credibility scorer assigns a tier and numeric score to every source based on its domain.

```python
"""Source credibility scoring tool."""

from strands import tool


def _matches(domain: str, known: set[str]) -> bool:
    """True if domain equals a known domain or is a subdomain of one.

    Suffix matching avoids false positives that plain substring checks
    allow, e.g. "notbbc.com" or "bbc.com.example.net" matching "bbc.com".
    """
    return any(domain == d or domain.endswith("." + d) for d in known)


@tool
def score_credibility(url: str, domain: str, content_snippet: str) -> dict:
    """Score the credibility of a source based on domain reputation."""
    # content_snippet is accepted for the tool schema but not used in scoring.
    tier1_domains = {
        "reuters.com", "apnews.com", "nature.com", "science.org",
        "arxiv.org", "scholar.google.com", "pubmed.ncbi.nlm.nih.gov",
        "bbc.com", "nytimes.com", "washingtonpost.com",
    }
    tier2_domains = {
        "wikipedia.org", "medium.com", "techcrunch.com",
        "arstechnica.com", "theverge.com", "wired.com",
        "aws.amazon.com", "cloud.google.com", "learn.microsoft.com",
        "stackoverflow.com", "github.com",
    }

    domain_lower = domain.lower()

    if _matches(domain_lower, tier1_domains):
        score, tier = 90, "tier1"
        reasoning = "Established authoritative source"
    elif _matches(domain_lower, tier2_domains):
        score, tier = 70, "tier2"
        reasoning = "Reputable secondary source"
    elif domain_lower.endswith(".gov") or domain_lower.endswith(".edu"):
        score, tier = 85, "tier1"
        reasoning = "Government or academic institution"
    else:
        score, tier = 50, "tier3"
        reasoning = "Unknown domain, treat with caution"

    return {
        "url": url,
        "domain": domain,
        "score": score,
        "tier": tier,
        "reasoning": reasoning,
    }
```
The Tier System
| Tier | Score | Examples | Rationale |
|---|---|---|---|
| Tier 1 | 85-90 | reuters.com, nature.com, arxiv.org, .gov, .edu | Established authorities, peer-reviewed, government data |
| Tier 2 | 70 | wikipedia.org, techcrunch.com, aws.amazon.com | Reputable but editorial or community-driven |
| Tier 3 | 50 | Unknown domains | No reputation data — default caution score |
This is intentionally simple. A more sophisticated system could factor in content freshness, citation counts, or even run a secondary LLM evaluation. But for our use case of fast research reports, domain-based scoring captures most of the value at negligible latency cost, since it is a set lookup with no network call. The score is a numeric value the synthesis agent can use to weight its citations: tier 1 sources get cited first, tier 3 sources get mentioned with caveats.
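To make the "cited first" and "mentioned with caveats" idea concrete, here is a minimal sketch of how scored sources might be turned into an ordered citation list. The function name order_for_citation and the caveat wording are assumptions for illustration; the credibility_score and credibility_tier keys follow the source dictionaries used elsewhere in this post.

```python
def order_for_citation(sources: list[dict]) -> list[str]:
    """Sort sources by credibility and flag tier 3 entries with a caveat."""
    ranked = sorted(
        sources,
        key=lambda s: s.get("credibility_score", 0),
        reverse=True,  # highest score first
    )
    lines = []
    for src in ranked:
        caveat = ""
        if src.get("credibility_tier", "tier3") == "tier3":
            caveat = " (unverified source, treat with caution)"
        score = src.get("credibility_score", 0)
        lines.append(f"[{score}] {src['url']}{caveat}")
    return lines
```

How the synthesis agent actually consumes the scores is up to its prompt; this only shows that the ordering and the caveat can be derived mechanically from the two fields.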
The Source-Capturing Pattern
This is the most interesting design decision in the search layer. Here's the problem: after the research agents finish, we need a structured list of all sources with their URLs, titles, credibility scores, and snippets. The naive approach would be to parse the LLM's text output with regex, looking for URLs and extracting metadata. That's fragile and error-prone.
Instead, we wrap the tools to capture structured data as they're invoked. The LLM never needs to output source metadata in a specific format — we intercept it at the tool boundary.
```python
import re

from strands import tool

# tavily_search and score_credibility are the two tools defined earlier;
# import them from wherever those modules live in your project.


def _create_source_capturing_tools(captured_sources: list[dict]):
    """Create tool wrappers that capture source metadata on invocation."""

    @tool
    def capturing_tavily_search(
        query: str, max_results: int = 5
    ) -> dict:
        """Search the web and capture source metadata."""
        result = tavily_search(query=query, max_results=max_results)
        for r in result.get("results", []):
            url = r.get("url", "")
            # Deduplicate by URL
            if url and not any(s["url"] == url for s in captured_sources):
                domain_match = re.match(
                    r"https?://(?:www\.)?([^/]+)", url
                )
                captured_sources.append({
                    "url": url,
                    "title": r.get("title", ""),
                    "domain": (
                        domain_match.group(1) if domain_match else url
                    ),
                    "snippet": r.get("content", "")[:200],
                    # Defaults until the credibility tool scores this source
                    "credibility_score": 50,
                    "credibility_tier": "tier3",
                })
        return result

    @tool
    def capturing_score_credibility(
        url: str, domain: str, content_snippet: str
    ) -> dict:
        """Score credibility and update captured source metadata."""
        result = score_credibility(
            url=url, domain=domain, content_snippet=content_snippet
        )
        # Update the captured source with its credibility score
        for src in captured_sources:
            if src["url"] == url:
                src["credibility_score"] = result.get("score", 50)
                src["credibility_tier"] = result.get("tier", "tier3")
                break
        return result

    return capturing_tavily_search, capturing_score_credibility
```
Why This Pattern Is Powerful
The source-capturing wrapper solves several problems at once:
- No regex parsing of LLM output. We capture structured data at the tool boundary, where it's already well-formed. The LLM can describe sources however it wants in its natural language response; we don't depend on its formatting.
- Deduplication is built in. The not any(s["url"] == url ...) check ensures each URL appears only once in the captured sources list, even if multiple sub-queries return the same result.
- Credibility scores attach automatically. When the agent calls capturing_score_credibility, the score is written back to the existing source entry. By the time research is complete, every source has both its metadata and its credibility assessment.
- The captured_sources list is a shared mutable reference. We pass a single list into the factory function, and both wrappers write to it. After the agent finishes, the orchestrator simply reads this list: no parsing, no extraction, no post-processing.
This is a general pattern worth remembering: when you need structured side-channel data from an agentic workflow, capture it at the tool boundary, not from the LLM's text output.
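The same idea works outside any particular agent framework. As a minimal, framework-free sketch (capture_calls is a hypothetical helper, not part of Strands), a wrapper can record every invocation of a function into a caller-owned list:

```python
import functools
from typing import Any, Callable


def capture_calls(fn: Callable[..., Any], log: list[dict]) -> Callable[..., Any]:
    """Wrap fn so each call and its result are appended to log."""

    @functools.wraps(fn)  # keep the original name and docstring
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        result = fn(*args, **kwargs)
        log.append({
            "tool": fn.__name__,
            "args": args,
            "kwargs": kwargs,
            "result": result,
        })
        return result

    return wrapper
```

The caller keeps a reference to log, hands the wrapped function to the agent, and reads the list afterwards. The structured data never passes through the model's text output.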
Source Consolidation
After all parallel researchers complete, we consolidate their captured sources into a single deduplicated, ranked list. If the same URL was found by multiple researchers, we keep the entry with the highest credibility score:
```python
def _consolidate_sources(all_sources: list[dict]) -> list[dict]:
    """Deduplicate and rank sources by credibility score."""
    seen_urls: dict[str, dict] = {}
    for src in all_sources:
        url = src["url"]
        # Keep the first entry for a URL, or replace it with a higher-scored one
        if (
            url not in seen_urls
            or src.get("credibility_score", 0)
            > seen_urls[url].get("credibility_score", 0)
        ):
            seen_urls[url] = src

    unique_sources = list(seen_urls.values())
    unique_sources.sort(
        key=lambda s: s.get("credibility_score", 0), reverse=True
    )
    return unique_sources
```
The result is a clean list ordered from most credible to least. The synthesis agent uses this ordering to prioritize which sources to cite first in the final report.
The @tool Decorator
You'll notice every function above is decorated with @tool from the Strands Agents SDK. This decorator does something clever: it inspects the function's signature and docstring and automatically generates a JSON schema that the LLM can understand.
When you write:
```python
@tool
def tavily_search(query: str, max_results: int = 5) -> dict:
    """Search the web for information on a given query using Tavily API."""
```
Strands generates a tool definition like:
```json
{
  "name": "tavily_search",
  "description": "Search the web for information on a given query using Tavily API.",
  "input_schema": {
    "type": "object",
    "properties": {
      "query": {"type": "string"},
      "max_results": {"type": "integer", "default": 5}
    },
    "required": ["query"]
  }
}
```
This schema is passed to the model in the tool configuration, so the LLM knows exactly what parameters to provide. No manual schema writing, no separate tool definition files — just Python functions with type hints and docstrings. When we create the capturing wrappers, they get their own schemas too, which is why the LLM can seamlessly call capturing_tavily_search instead of the raw tavily_search.
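To get a feel for what signature-based schema generation involves, here is a rough sketch using only the standard library. It is not how Strands implements @tool; build_tool_schema and the type mapping are assumptions for illustration, and it handles only simple annotations.

```python
import inspect
from typing import Any, Callable, get_type_hints

# Map a few Python annotations to JSON Schema type names.
_JSON_TYPES = {
    str: "string",
    int: "integer",
    float: "number",
    bool: "boolean",
    dict: "object",
    list: "array",
}


def build_tool_schema(fn: Callable[..., Any]) -> dict:
    """Derive a tool definition from a function's signature and docstring."""
    hints = get_type_hints(fn)
    properties: dict[str, dict] = {}
    required: list[str] = []
    for name, param in inspect.signature(fn).parameters.items():
        # Unknown or complex annotations fall back to "string" in this sketch.
        prop: dict[str, Any] = {"type": _JSON_TYPES.get(hints.get(name), "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default means the argument is required
        else:
            prop["default"] = param.default
        properties[name] = prop
    return {
        "name": fn.__name__,
        "description": inspect.getdoc(fn) or "",
        "input_schema": {
            "type": "object",
            "properties": properties,
            "required": required,
        },
    }
```

Applied to an undecorated function with the same signature and docstring as tavily_search, this produces a dictionary of the shape shown above.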
Putting It All Together
Here's how these components connect in the orchestrator:
- The orchestrator creates a captured_sources list and passes it to _create_source_capturing_tools
- Each parallel researcher agent gets the capturing wrappers as its tools
- As researchers call capturing_tavily_search, sources accumulate in the shared list
- Researchers call capturing_score_credibility to evaluate key sources
- After all researchers complete, _consolidate_sources produces the final ranked list
- The synthesis agent receives the consolidated sources and uses them for inline citations
The entire flow is transparent to the LLM — it just calls tools normally. All the metadata capture, deduplication, and scoring happens in the Python wrapper layer.
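The steps above can be simulated without any LLM or network access. The sketch below is a stand-in, not the orchestrator from this series: fake_search, run_researcher, consolidate, and research are hypothetical names, the search results are canned, and it assumes one captured list per researcher that is merged at the end.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_search(query: str) -> list[dict]:
    """Stand-in for a real search tool; returns canned, pre-scored results."""
    slug = query.replace(" ", "-")
    return [
        {"url": "https://example.org/shared", "credibility_score": 70},
        {"url": f"https://example.com/{slug}", "credibility_score": 50},
    ]


def run_researcher(sub_query: str) -> list[dict]:
    """One researcher: search, then capture each new URL once."""
    captured: list[dict] = []
    for r in fake_search(sub_query):
        if not any(s["url"] == r["url"] for s in captured):
            captured.append(dict(r))
    return captured


def consolidate(all_sources: list[dict]) -> list[dict]:
    """Keep the best-scored entry per URL, ranked from highest to lowest."""
    best: dict[str, dict] = {}
    for src in all_sources:
        current = best.get(src["url"])
        if current is None or src["credibility_score"] > current["credibility_score"]:
            best[src["url"]] = src
    return sorted(best.values(), key=lambda s: s["credibility_score"], reverse=True)


def research(sub_queries: list[str]) -> list[dict]:
    """Run researchers in parallel, then merge and rank their sources."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        per_researcher = list(pool.map(run_researcher, sub_queries))
    merged = [src for sources in per_researcher for src in sources]
    return consolidate(merged)
```

Every researcher here finds the same shared URL, so the consolidated list contains it once, ranked ahead of the lower-scored query-specific results.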
What's Next
In Blog 4: Real-Time Streaming, we'll tackle how research progress, generated text, and source metadata are streamed to the frontend over WebSocket. We'll cover the custom callback handler that intercepts Strands SDK events, the message protocol for different event types, and how CloudFront handles WebSocket connections through the ALB.
The search layer gives us data. The streaming layer gives us the user experience.