AI Engineering · 18 min

Deep Research Agent Series — Blog 2: Multi-Agent Orchestration

Build a parallel multi-agent research pipeline using Strands Agents SDK, asyncio.gather, and Amazon Bedrock. Learn how orchestrator, researcher, and critique agents work together to produce comprehensive cited reports.


A single AI agent is smart. A team of specialized agents (a planner, four parallel researchers, a critic, and a report synthesizer) is transformative. In this post, we build the multi-agent orchestration layer that turns a research question into a comprehensive cited report in roughly 20 seconds. No frameworks bolted on top of frameworks: just Strands Agents SDK, Python's asyncio.gather, and clean architecture.


Series Navigation

| Part | Topic | Status |
|------|-------|--------|
| 1 | Architecture & Vision | Published |
| 2 | Multi-Agent Orchestration | You are here |
| 3 | Smart Search & Source Intelligence | Published |
| 4 | Real-Time Streaming & WebSocket | Published |
| 5 | Cloud-Native Infrastructure on AWS | Published |
| 6 | Security & Production Hardening | Published |

The Agent Architecture

The research pipeline is built around three core agent types, each with a single responsibility (plus two tool-less helpers, a planner and a report synthesizer, covered later):

  1. Orchestrator — The entry point. Classifies incoming messages as simple chat or complex research, then either responds directly or kicks off the full pipeline.
  2. Researcher — A disposable sub-agent spawned per sub-query. Four of these run in parallel, each with its own tools, context, and source list. They never share state.
  3. Critique — A quality gate. Reviews the consolidated findings and issues a PASS, REFINE, or FAIL verdict before synthesis begins.

This separation matters. The orchestrator never touches raw search results. The researchers never see each other's findings. The critic never generates content. Each agent does one thing well.


Setting Up Strands SDK with Bedrock

Every agent in the system starts with the same model factory. This is the actual code from the project:

```python
from strands import Agent
from strands.models.bedrock import BedrockModel

from config import settings


def _create_model() -> BedrockModel:
    """Create a Bedrock model config."""
    kwargs = {
        "model_id": settings.bedrock_model_id,
        "region_name": settings.aws_region,
    }
    if settings.bedrock_guardrail_id:
        kwargs["guardrail_id"] = settings.bedrock_guardrail_id
        kwargs["guardrail_version"] = settings.bedrock_guardrail_version
    return BedrockModel(**kwargs)
```

The model_id is us.anthropic.claude-haiku-4-5-20251001-v1:0, which is Claude Haiku 4.5 via Amazon Bedrock's cross-region inference. We chose Haiku for two reasons: speed (sub-second first token) and cost (about a third of Sonnet 4.5's per-token price, at $1 versus $3 per million input tokens and $5 versus $15 per million output tokens). For a pipeline that makes seven agent calls per research query, cost discipline at the model layer is non-negotiable.

Guardrails are optional but recommended for production. When a bedrock_guardrail_id is configured, every agent call passes through Bedrock Guardrails for PII detection and content filtering — no per-agent opt-in required.
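
Because every agent comes from this one factory, the guardrail decision lives in exactly one place. The sketch below isolates the keyword-argument logic from the AWS client so it can be checked without credentials. The `Settings` dataclass, its default region, and the `"DRAFT"` guardrail version are illustrative assumptions standing in for the project's `config.settings`; passing the returned dict to `BedrockModel(**kwargs)` would mirror `_create_model()`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Settings:
    # Hypothetical stand-in for the project's config.settings object.
    bedrock_model_id: str = "us.anthropic.claude-haiku-4-5-20251001-v1:0"
    aws_region: str = "us-east-1"                # assumed default region
    bedrock_guardrail_id: Optional[str] = None   # unset means no guardrail
    bedrock_guardrail_version: str = "DRAFT"     # assumed default version


def build_model_kwargs(settings: Settings) -> dict:
    """Return BedrockModel keyword arguments, adding guardrail keys only when configured."""
    kwargs = {
        "model_id": settings.bedrock_model_id,
        "region_name": settings.aws_region,
    }
    if settings.bedrock_guardrail_id:
        kwargs["guardrail_id"] = settings.bedrock_guardrail_id
        kwargs["guardrail_version"] = settings.bedrock_guardrail_version
    return kwargs


if __name__ == "__main__":
    print(build_model_kwargs(Settings()))
    print(build_model_kwargs(Settings(bedrock_guardrail_id="gr-example",
                                      bedrock_guardrail_version="1")))
```

Keeping the check in the factory (rather than in each agent constructor) is what makes guardrails an all-or-nothing deployment setting.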


The Orchestrator Agent

The orchestrator is the only agent the WebSocket handler touches directly. It decides whether a message needs a simple chat response or the full research pipeline:

```python
def create_orchestrator() -> Agent:
    """Create the main orchestrator agent for chat-mode responses."""
    return Agent(
        model=_create_model(),
        system_prompt=ORCHESTRATOR_PROMPT,
        tools=[tavily_search, plan_research, score_credibility, save_report],
        callback_handler=None,
    )
```

The callback_handler=None is intentional — we handle streaming ourselves through the WebSocket layer rather than using Strands' built-in callback system. This gives us full control over the event format sent to the frontend.

Here is the full orchestrator system prompt from agents/prompts.py:

```python
ORCHESTRATOR_PROMPT = """\
You are a Deep Research Agent — a conversational AI that helps users
research complex topics.

Your capabilities:
1. **Chat**: Answer simple questions directly from your knowledge.
2. **Research**: For complex questions, decompose them into sub-queries,
   research each in parallel, critique findings, and produce a
   comprehensive cited report.

Behavior:
- Classify each user message as CHAT (simple) or RESEARCH (complex,
  multi-faceted).
- For CHAT: respond directly, concisely, and helpfully.
- For RESEARCH: use plan_research to decompose, then dispatch to the
  research swarm.
- Always cite sources with URLs when providing research findings.
- Stream progress updates to keep the user informed.
- If a research query is ambiguous, ask clarifying questions before
  proceeding.
- Respect the token budget — never exceed the per-query cost limit.
"""
```

The CHAT vs RESEARCH classification happens at the LLM level. Simple questions ("What is RAG?") get a direct response. Multi-faceted questions ("Compare LangChain, CrewAI, and Strands SDK for production AI agents in 2026") trigger the full pipeline. The agent makes this judgment based on the prompt — no regex routing or keyword matching.


Query Decomposition

The first step of the research pipeline breaks a broad question into four focused, non-overlapping sub-queries. This uses a dedicated planner agent with no tools — pure reasoning:

```python
planner = Agent(
    model=_create_model(),
    system_prompt="You decompose research questions into focused sub-queries. "
                  "Output ONLY valid JSON.",
    tools=[],
    callback_handler=None,
)

decompose_prompt = (
    f"Decompose this research question into 4 specific, non-overlapping "
    f"search queries that together cover all aspects. Output ONLY a JSON "
    f"array, no other text.\n\n"
    f"Question: {query}\n\n"
    f'Format: ["query1", "query2", "query3", "query4"]'
)
```

The JSON parsing includes a fallback for when the LLM wraps the array in markdown or adds commentary:

```python
try:
    start = sub_queries_text.index("[")
    end = sub_queries_text.rindex("]") + 1
    sub_queries = json.loads(sub_queries_text[start:end])
except (ValueError, json.JSONDecodeError):
    logger.warning("failed_to_parse_sub_queries", raw=sub_queries_text)
    sub_queries = [query]  # Fallback: research the original query
```

The bracket-finding approach is deliberately simple. We tried structured output schemas, but Haiku 4.5 reliably produces clean JSON arrays for this prompt, so the extra machinery wasn't needed. The fallback to a single query means a parse failure never crashes the pipeline; it just produces a less comprehensive report.
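
If you want the fallback to guard against more than malformed JSON, a slightly stricter variant can also reject arrays that parse but contain no usable strings, and cap the list at the four sub-queries the prompt asks for. This is a hypothetical hardening of the snippet above, not the project's code; `parse_sub_queries` and its `limit` parameter are illustrative names.

```python
import json


def parse_sub_queries(raw: str, query: str, limit: int = 4) -> list[str]:
    """Extract sub-queries from raw LLM text, falling back to [query].

    Slices from the first '[' to the last ']' (as in the original snippet),
    then keeps only non-empty strings, capped at `limit`.
    """
    try:
        start = raw.index("[")
        end = raw.rindex("]") + 1
        parsed = json.loads(raw[start:end])
    except ValueError:  # covers str.index/rindex misses and json.JSONDecodeError
        return [query]
    # A slice that starts with '[' and parses is always a list, but its items
    # may be numbers, objects, or blank strings; drop those.
    cleaned = [q.strip() for q in parsed if isinstance(q, str) and q.strip()]
    return cleaned[:limit] or [query]


if __name__ == "__main__":
    print(parse_sub_queries('Here you go:\n["pricing", "features"]\nHope this helps.', "q"))
    print(parse_sub_queries("Sorry, I can't do that.", "original question"))
```

Catching `ValueError` alone is enough because `json.JSONDecodeError` is a subclass of it.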


Parallel Research with asyncio.gather

This is the core innovation of the pipeline. Instead of researching sub-queries sequentially (4x ~12 seconds = ~48 seconds), we spawn four independent agent instances and run them concurrently:

```python
from dataclasses import dataclass, field


@dataclass
class ResearchResult:
    """Result from a single researcher agent."""
    sub_query: str
    findings: str = ""
    sources: list[dict] = field(default_factory=list)
    error: str = ""
```

Each researcher gets its own pair of source-capturing tool wrappers. This is what makes structured source extraction possible without parsing the agent's text output:

```python
async def _research_sub_query(sub_query: str, index: int) -> ResearchResult:
    """Research a single sub-query with source capture."""
    result = ResearchResult(sub_query=sub_query)
    captured_sources: list[dict] = []

    try:
        search_tool, cred_tool = _create_source_capturing_tools(captured_sources)

        agent = Agent(
            model=_create_model(),
            system_prompt=f"{RESEARCHER_PROMPT}\n\nYour assigned sub-query: {sub_query}",
            tools=[search_tool, cred_tool],
            callback_handler=None,
        )

        research_prompt = (
            f"Research this thoroughly. Use capturing_tavily_search to find "
            f"information, then capturing_score_credibility on each source. "
            f"Provide a detailed summary with source URLs.\n\nQuery: {sub_query}"
        )

        agent_result = await agent.invoke_async(research_prompt)
        result.findings = str(agent_result)
        result.sources = captured_sources
    except Exception as e:
        result.error = str(e)
        logger.error("sub_query_failed", index=index, error=str(e))
    return result
```
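
The ~48 s versus ~12 s comparison comes down to how `asyncio.gather` schedules awaitables: while one coroutine waits on I/O, the others make progress, so wall-clock time tracks the slowest call rather than the sum. The self-contained sketch below replaces each agent call with a hypothetical `fake_researcher` that sleeps for a fixed delay. It assumes the real `invoke_async` call awaits network I/O rather than blocking the event loop; a blocking call inside a coroutine would serialize the work again.

```python
import asyncio
import time


async def fake_researcher(sub_query: str, delay: float) -> str:
    # Hypothetical stand-in for an agent call; sleeping yields to the event loop
    # the same way awaiting a network response does.
    await asyncio.sleep(delay)
    return f"findings for {sub_query}"


async def run_sequential(queries: list[str], delay: float) -> float:
    """Await each call in turn; returns elapsed seconds."""
    start = time.perf_counter()
    for q in queries:
        await fake_researcher(q, delay)
    return time.perf_counter() - start


async def run_parallel(queries: list[str], delay: float) -> float:
    """Run all calls concurrently with gather; returns elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_researcher(q, delay) for q in queries))
    return time.perf_counter() - start


if __name__ == "__main__":
    qs = ["features", "pricing", "ecosystem", "production use"]
    seq = asyncio.run(run_sequential(qs, 0.2))
    par = asyncio.run(run_parallel(qs, 0.2))
    print(f"sequential ~{seq:.2f}s, parallel ~{par:.2f}s")
```

With four 0.2 s calls, the sequential run takes roughly 0.8 s and the gathered run roughly 0.2 s, the same 4x ratio as the pipeline's estimate.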

Next comes the parallel execution, wrapped in a keepalive loop that prevents CloudFront from dropping the WebSocket:

```python
# This runs inside the pipeline's async generator, so `yield` streams
# events back to the WebSocket handler.
research_tasks = [
    _research_sub_query(sq, i)
    for i, sq in enumerate(sub_queries)
]
gather_future = asyncio.gather(*research_tasks)

# Keepalive loop: send pings every 10s while research runs
while not gather_future.done():
    try:
        research_results = await asyncio.wait_for(
            asyncio.shield(gather_future), timeout=10.0
        )
        break
    except asyncio.TimeoutError:
        yield {"type": "keepalive"}
else:
    # Loop ended without `break`: the gather finished between timeouts.
    research_results = gather_future.result()
```

The asyncio.shield call is critical. Without it, wait_for would cancel the gather future on timeout. With shield, the timeout only breaks out of the wait — the research tasks keep running. Each 10-second cycle sends a keepalive event to the client, preventing CloudFront's 60-second idle timeout from killing the connection.
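
The same shield-plus-timeout loop can be exercised end to end without Bedrock or CloudFront. In this scaled-down sketch, `slow_task` is a hypothetical stand-in for `_research_sub_query`, the ping interval is 0.1 s instead of 10 s, and the keepalive events use the `{"type": "keepalive"}` shape from the pipeline; the final `{"type": "results", ...}` event is an assumption for illustration.

```python
import asyncio
from typing import AsyncIterator


async def slow_task(name: str, delay: float) -> str:
    # Hypothetical stand-in for _research_sub_query.
    await asyncio.sleep(delay)
    return name


async def research_with_keepalive(
    delays: list[float], ping_every: float
) -> AsyncIterator[dict]:
    """Yield keepalive events until every task finishes, then one results event."""
    gather_future = asyncio.gather(
        *(slow_task(f"r{i}", d) for i, d in enumerate(delays))
    )
    while not gather_future.done():
        try:
            # shield() means a timeout cancels only the outer wait, not the gather.
            results = await asyncio.wait_for(
                asyncio.shield(gather_future), timeout=ping_every
            )
            break
        except asyncio.TimeoutError:
            yield {"type": "keepalive"}
    else:
        # Finished between timeouts: the future is done, so result() is safe.
        results = gather_future.result()
    yield {"type": "results", "data": results}


async def collect(delays: list[float], ping_every: float) -> list[dict]:
    """Drain the generator into a list (a real handler would send each event)."""
    return [event async for event in research_with_keepalive(delays, ping_every)]


if __name__ == "__main__":
    for event in asyncio.run(collect([0.35, 0.1, 0.2], ping_every=0.1)):
        print(event)
```

With delays of 0.35, 0.1, and 0.2 seconds and a 0.1 s interval, the generator emits a few keepalive events and then a single results event. The results stay in task order because `gather` preserves argument order, not completion order.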

After the gather completes, we separate successes from failures:

```python
successful = [r for r in research_results if r.findings and not r.error]
failed = [r for r in research_results if r.error]
```

If one researcher fails (network error, Bedrock throttling, malformed response), the others continue. The final report is built from whatever succeeded. This graceful degradation means a 75% success rate still produces a useful report.
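
The graceful degradation depends on where the exception is caught. `asyncio.gather` (without `return_exceptions=True`) propagates the first exception raised by any awaitable, so the try/except inside `_research_sub_query` is what turns a failure into data. The sketch below demonstrates both behaviors with a hypothetical `flaky_research` coroutine that fails on one sub-query; the simulated throttling message is illustrative.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Result:
    sub_query: str
    findings: str = ""
    error: str = ""


async def flaky_research(sub_query: str) -> str:
    # Hypothetical researcher: the "pricing" sub-query simulates a throttling error.
    await asyncio.sleep(0.01)
    if sub_query == "pricing":
        raise RuntimeError("ThrottlingException (simulated)")
    return f"findings for {sub_query}"


async def safe_research(sub_query: str) -> Result:
    # Mirrors _research_sub_query: the exception is recorded, never re-raised.
    result = Result(sub_query=sub_query)
    try:
        result.findings = await flaky_research(sub_query)
    except Exception as e:
        result.error = str(e)
    return result


async def run_all(queries: list[str]) -> tuple[list[Result], list[Result]]:
    results = await asyncio.gather(*(safe_research(q) for q in queries))
    successful = [r for r in results if r.findings and not r.error]
    failed = [r for r in results if r.error]
    return successful, failed


async def run_unguarded(queries: list[str]) -> list[str]:
    # Without the try/except, gather re-raises the first exception to the caller.
    return await asyncio.gather(*(flaky_research(q) for q in queries))


if __name__ == "__main__":
    ok, bad = asyncio.run(run_all(["features", "pricing", "ecosystem", "production use"]))
    print([r.sub_query for r in ok], [r.error for r in bad])
```

Passing `return_exceptions=True` to `gather` is an alternative, but it hands back raw exception objects mixed with results; catching inside the coroutine keeps every element a `Result`.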


Source-Capturing Tool Wrappers

Each researcher gets its own captured_sources list via closure. The tool wrappers intercept Tavily results and credibility scores, extracting structured metadata without parsing the agent's natural language output:

```python
import re

from strands import tool

# tavily_search and score_credibility are the project's existing tool functions.


def _create_source_capturing_tools(captured_sources: list[dict]):
    @tool
    def capturing_tavily_search(query: str, max_results: int = 5) -> dict:
        """Search the web using Tavily API."""
        result = tavily_search(query=query, max_results=max_results)
        for r in result.get("results", []):
            url = r.get("url", "")
            if url and not any(s["url"] == url for s in captured_sources):
                domain_match = re.match(r'https?://(?:www\.)?([^/]+)', url)
                captured_sources.append({
                    "url": url,
                    "title": r.get("title", ""),
                    "domain": domain_match.group(1) if domain_match else url,
                    "snippet": r.get("content", "")[:200],
                    # Defaults until capturing_score_credibility updates them.
                    "credibility_score": 50,
                    "credibility_tier": "tier3",
                })
        return result

    @tool
    def capturing_score_credibility(url: str, domain: str,
                                    content_snippet: str) -> dict:
        """Score source credibility."""
        result = score_credibility(url=url, domain=domain,
                                   content_snippet=content_snippet)
        for src in captured_sources:
            if src["url"] == url:
                src["credibility_score"] = result.get("score", 50)
                src["credibility_tier"] = result.get("tier", "tier3")
                break
        return result

    return capturing_tavily_search, capturing_score_credibility
```

This pattern — wrapping existing tools with a closure-based side channel — is far more reliable than regex-parsing agent output. The agent sees normal tool responses. We get structured metadata as a byproduct.
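
The same idea generalizes to any tool: wrap the function, hand its result to a side-channel hook, and return the result untouched. The sketch below is a framework-agnostic variant, not the project's code; `with_side_channel`, `capture_sources`, and `fake_search` are hypothetical names. It uses `functools.wraps` so the wrapper keeps the original function's name, docstring, and annotations; whether Strands' `@tool` decorator builds the same tool spec from such a wrapper is an assumption to verify before relying on it.

```python
import functools
import re
from typing import Any, Callable


def with_side_channel(
    fn: Callable[..., dict], on_result: Callable[[dict], None]
) -> Callable[..., dict]:
    """Wrap a tool function so each result is also passed to on_result."""
    @functools.wraps(fn)  # keep fn's name, docstring, and annotations
    def wrapper(*args: Any, **kwargs: Any) -> dict:
        result = fn(*args, **kwargs)
        on_result(result)  # structured side channel for the pipeline
        return result      # the agent sees the unmodified result
    return wrapper


def capture_sources(captured: list[dict]) -> Callable[[dict], None]:
    """Build a hook that records each new URL from a Tavily-style payload."""
    def hook(result: dict) -> None:
        for r in result.get("results", []):
            url = r.get("url", "")
            if url and not any(s["url"] == url for s in captured):
                m = re.match(r"https?://(?:www\.)?([^/]+)", url)
                captured.append({
                    "url": url,
                    "title": r.get("title", ""),
                    "domain": m.group(1) if m else url,
                })
    return hook


def fake_search(query: str, max_results: int = 5) -> dict:
    """Search the web (hypothetical stand-in for tavily_search)."""
    results = [
        {"url": "https://www.example.com/a", "title": "A"},
        {"url": "https://docs.example.org/b", "title": "B"},
    ]
    return {"query": query, "results": results[:max_results]}
```

Each call to `capture_sources` closes over its own list, so two researchers built this way never see each other's sources. One consequence of keeping the original function name is that the researcher prompt would refer to `fake_search` (or `tavily_search`) rather than a `capturing_` name.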


The Critique Agent

Before synthesis, a critique agent reviews the consolidated findings. The prompt is tight — we want a fast verdict, not a dissertation:

```python
CRITIQUE_PROMPT = """\
You are a Research Critique Agent. Review research findings for quality
and completeness.

Evaluate:
1. **Coverage**: Are all aspects of the sub-query addressed?
2. **Accuracy**: Do findings align across multiple sources?
3. **Recency**: Is the information current and relevant?
4. **Bias**: Are findings balanced, not one-sided?
5. **Gaps**: What important aspects are missing?

Output:
- PASS: Findings are sufficient quality. Brief quality summary.
- REFINE: Findings need improvement. Specify what to research further.
- FAIL: Findings are inadequate. Explain why and suggest alternatives.
"""
```

The critique call truncates findings to 4000 characters to keep costs low:

```python
critique_agent = _create_critique_agent()
critique_prompt = (
    f"Review the following research findings for quality and completeness. "
    f"Be concise — 2-3 sentences max.\n"
    f"Original question: {query}\n\n"
    f"Findings:\n{combined_findings[:4000]}\n\n"
    f"Respond with: PASS, REFINE, or FAIL followed by a brief reason."
)
```

In practice, findings almost always PASS. The critique agent's real value is catching edge cases — a researcher that returned only one source, findings that contradict each other, or a sub-query that was misinterpreted. The verdict is passed to the synthesizer so it can acknowledge limitations in the final report.
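
How the verdict is extracted from the critique text is not spelled out here. One hypothetical approach is a small regex that finds the first PASS, REFINE, or FAIL token and keeps the remainder as the reason; `parse_verdict` is an illustrative name. Treating an unparseable response as REFINE (rather than PASS) is a design assumption, chosen so an ambiguous critique surfaces as a limitation instead of being silently ignored.

```python
import re

# First standalone PASS/REFINE/FAIL token, then skip punctuation such as
# ":", "-", "." or markdown "**" before capturing the reason.
_VERDICT_RE = re.compile(r"\b(PASS|REFINE|FAIL)\b[\s:*.\-]*(.*)", re.DOTALL)


def parse_verdict(text: str) -> tuple[str, str]:
    """Return (verdict, reason) from a free-text critique response."""
    m = _VERDICT_RE.search(text)
    if m is None:
        # Assumption: an unparseable critique is treated as REFINE, not PASS.
        return "REFINE", text.strip()
    return m.group(1), m.group(2).strip()


if __name__ == "__main__":
    print(parse_verdict("PASS: covers all four angles."))
    print(parse_verdict("**REFINE** - only one source for pricing"))
```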


Report Synthesis

The final agent produces the report. It receives the combined findings, the critique verdict, and a numbered source reference list:

```python
source_refs = "\n".join(
    # `or` falls back to the domain when the title is missing or empty.
    f"[{i+1}] {s.get('title') or s['domain']} - {s['url']} "
    f"(credibility: {s.get('credibility_score', 50)}%)"
    for i, s in enumerate(unique_sources[:8])
)
```

```python
synthesizer = Agent(
    model=_create_model(),
    system_prompt=(
        "You are a research report writer. Produce well-structured, "
        "comprehensive markdown reports with inline citations [1], [2], "
        "etc. Be thorough but concise."
    ),
    tools=[],
    callback_handler=None,
)
```

The synthesizer streams directly to the WebSocket: each token appears in the client's chat window in real time. No buffering, no waiting for the complete report.
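
A minimal sketch of that relay, assuming the synthesizer is driven through Strands' `stream_async()` and that text chunks arrive under each event's `"data"` key (check this against your SDK version). A forwarder then only needs an async iterator of events and a `send` coroutine. `fake_stream`, `forward_tokens`, and the `{"type": "token"}` message shape are hypothetical stand-ins so the example runs without Bedrock or a live WebSocket.

```python
import asyncio
import json
from typing import AsyncIterator, Awaitable, Callable


async def fake_stream(text: str) -> AsyncIterator[dict]:
    # Hypothetical stand-in for synthesizer.stream_async(prompt).
    for word in text.split(" "):
        yield {"data": word + " "}
    yield {"result": "final"}  # non-text event: ignored by the forwarder


async def forward_tokens(
    events: AsyncIterator[dict], send: Callable[[str], Awaitable[None]]
) -> str:
    """Relay each text chunk to the client immediately; return the full report."""
    parts: list[str] = []
    async for event in events:
        chunk = event.get("data")
        if isinstance(chunk, str) and chunk:
            parts.append(chunk)
            await send(json.dumps({"type": "token", "content": chunk}))
    return "".join(parts)


async def _demo() -> None:
    async def send(message: str) -> None:
        print(message)  # a real handler would write to the WebSocket here

    report = await forward_tokens(fake_stream("Findings cite [1] and [2]."), send)
    print(repr(report))


if __name__ == "__main__":
    asyncio.run(_demo())
```

Accumulating the chunks while forwarding them means the complete report is still available afterwards, for example to save it, without delaying any token.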

Sources are deduplicated by URL before synthesis. When the same URL appears across multiple researchers, we keep the entry with the highest credibility score:

```python
seen_urls: dict[str, dict] = {}
for src in all_sources:
    url = src["url"]
    # Keep the highest-credibility entry for each URL.
    if url not in seen_urls or (
        src.get("credibility_score", 0) > seen_urls[url].get("credibility_score", 0)
    ):
        seen_urls[url] = src

unique_sources = sorted(
    seen_urls.values(),
    key=lambda s: s.get("credibility_score", 0),
    reverse=True,
)
```
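
Putting deduplication and the numbered reference list together gives a runnable sketch of the citation step. `dedupe_sources` and `format_refs` are hypothetical helper names wrapping the logic above. The `or` fallback matters because the capturing wrapper stores an empty string when a search result has no title, and `dict.get` would return that empty string rather than the domain.

```python
def dedupe_sources(all_sources: list[dict]) -> list[dict]:
    """Keep the highest-scoring entry per URL, then sort by score, descending."""
    seen: dict[str, dict] = {}
    for src in all_sources:
        url = src["url"]
        if url not in seen or (
            src.get("credibility_score", 0) > seen[url].get("credibility_score", 0)
        ):
            seen[url] = src
    # sorted() is stable, so equal scores keep first-seen order.
    return sorted(seen.values(), key=lambda s: s.get("credibility_score", 0), reverse=True)


def format_refs(sources: list[dict], limit: int = 8) -> str:
    """Render a numbered reference list for the synthesizer prompt."""
    lines = []
    for i, s in enumerate(sources[:limit], start=1):
        title = s.get("title") or s["domain"]  # empty title falls back to domain
        lines.append(
            f"[{i}] {title} - {s['url']} "
            f"(credibility: {s.get('credibility_score', 50)}%)"
        )
    return "\n".join(lines)


if __name__ == "__main__":
    sources = [
        {"url": "https://a.com/x", "domain": "a.com", "title": "", "credibility_score": 40},
        {"url": "https://b.org/y", "domain": "b.org", "title": "B doc", "credibility_score": 70},
        {"url": "https://a.com/x", "domain": "a.com", "title": "", "credibility_score": 90},
    ]
    print(format_refs(dedupe_sources(sources)))
```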

The Full Pipeline at a Glance

```text
Query → Decompose → [R1 | R2 | R3 | R4] → Consolidate → Critique → Synthesize → Stream
         ~2s            ~12s (parallel)        ~1s          ~2s         ~3s
                                                                    Total: ~20s
```

Seven agent calls (planner, four researchers, critic, synthesizer), four of them in parallel. One WebSocket connection. Roughly 20 seconds for a comprehensive cited report.


Key Design Decisions

1. Independent agent instances, not shared context. Each researcher gets a fresh Agent() with its own system prompt, tools, and conversation history. This prevents context pollution: researcher 3's findings about pricing never leak into researcher 1's analysis of features. The tradeoff is more Bedrock API calls, but at Haiku 4.5's $1 per million input tokens the extra cost is negligible.

2. Source-capturing tool wrappers over output parsing. We wrap the real tavily_search and score_credibility tools with closures that intercept structured data. This gives us reliable metadata (URL, title, domain, credibility score) without fragile regex parsing of agent prose.

3. Graceful degradation on partial failure. If one of four researchers throws an exception, asyncio.gather still completes. We filter out failed results and synthesize from what succeeded. A 3-out-of-4 report is better than a crash.

4. Budget control via configuration. The Settings class caps max_cost_per_query at $2.00 and max_tokens_per_query at 200,000. Combined with Haiku 4.5's pricing, a typical research query costs under $0.05.

5. Keepalive during long operations. CloudFront drops idle WebSocket connections after 60 seconds. The asyncio.shield + wait_for pattern sends keepalive events every 10 seconds during the parallel research phase, keeping the connection alive without blocking the gather.
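
The limits in item 4 can be made concrete with a little arithmetic. The sketch assumes Claude Haiku 4.5 list prices of $1 per million input tokens and $5 per million output tokens (verify current Bedrock pricing for your region) and uses hypothetical token counts. `Budget` mirrors the two `Settings` fields named in item 4 but is an illustrative stand-in, as are `query_cost` and `within_budget`.

```python
from dataclasses import dataclass

# Assumed list prices (USD per million tokens) for Claude Haiku 4.5;
# confirm against current Amazon Bedrock pricing before relying on them.
INPUT_PRICE_PER_M = 1.00
OUTPUT_PRICE_PER_M = 5.00


@dataclass
class Budget:
    # Illustrative stand-in mirroring max_cost_per_query / max_tokens_per_query.
    max_cost_per_query: float = 2.00
    max_tokens_per_query: int = 200_000


def query_cost(input_tokens: int, output_tokens: int) -> float:
    """Dollar cost of one query given total input and output tokens."""
    return (input_tokens * INPUT_PRICE_PER_M
            + output_tokens * OUTPUT_PRICE_PER_M) / 1_000_000


def within_budget(input_tokens: int, output_tokens: int, budget: Budget) -> bool:
    """True if the query stays under both the token cap and the cost cap."""
    if input_tokens + output_tokens > budget.max_tokens_per_query:
        return False
    return query_cost(input_tokens, output_tokens) <= budget.max_cost_per_query


if __name__ == "__main__":
    # Hypothetical token counts summed across all seven agent calls.
    print(f"${query_cost(25_000, 4_000):.3f}")
```

One thing the numbers show: at these prices even 200,000 output tokens cost $1.00, so the token cap binds before the $2.00 cost cap can.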


What's Next

In Blog 3: Smart Search & Source Intelligence, we'll dive into the Tavily search integration — how the tool wrapper handles rate limiting with a circuit breaker pattern, how credibility scoring works across domain tiers, and why structured tool results beat raw API responses for agent consumption.


All code is open source: github.com/MinhQuanBuiSco/deep-research-agent