# 用 Python 做一个可验证的多智能体系统：Mock-first 到真实 LLM 后端

- 原文：Building a Multi-Agent System in Python | Towards Data Science
- 原文链接：https://towardsdatascience.com/building-a-multi-agent-system-in-python/
- 作者：Mahnoor Javed
- 来源：Towards Data Science
- 发布时间：2026-06-07
- 类型：实战教程
- 适用读者：会基础 Python，想把“多 Agent”从概念落到可运行代码的开发者

## 0. 来源边界

### 原文事实

原文用一个“旅游规划系统”解释多智能体系统：把一个旅行计划拆成四个 Agent 顺序协作。

- `Travel Research Agent`：研究目的地、景点、隐藏玩法和旅行提示。
- `Activity Planning Agent`：基于研究结果安排每日活动。
- `Budget Agent`：估算机票、签证、酒店、餐饮、交通和活动费用。
- `Final Travel Assistant`：整合前三个 Agent 的输出，生成最终行程。

原文技术路线：Python、OOP、`openai` 包、OpenRouter.ai 兼容接口、示例模型 `gpt-4.1-mini`。

### 本文提炼

本文保留原文“四角色顺序链”的核心结构，但改成更适合实战的写法：

1. 先跑通离线 Mock 闭环，不依赖 API Key。
2. 用统一 `AgentResult` 返回 envelope，避免 Mock 和真实后端接口漂移。
3. 用 `AgentBackend(Protocol)` 抽象后端，Mock 和 OpenAI/OpenRouter 后端实现同一个接口。
4. 增加 Validator，显式检查失败步骤和实时数据风险。
5. 把真实 LLM 后端标为 `contract-only extension`：代码可编译、接口一致，但本文未真实调用外部模型。

### 实践扩展

本文新增了原文没有完整覆盖的工程边界：

- API Key 只从环境变量读取。
- 非标准库依赖显式列入 `requirements.txt`。
- 结构化输出用 JSON envelope，而不是任意自然语言拼接。
- 每个核心代码块都有文件归属。
- Mock 路径实际执行验证，真实后端只做静态合同验证，不伪造 API 调用结果。

## 1. 最小架构

顺序工作流：

```text
TravelRequest
  → Research Agent
  → Activity Planner Agent
  → Budget Agent
  → Final Travel Assistant
  → Validator Agent
  → 输出结果
```

这个架构不是复杂自治 Agent 系统，而是最小可解释、可测试的多 Agent 顺序链。它适合入门和原型验证。

## 2. 项目结构

```text
python-multi-agent-demo/
├── .env.example
├── .gitignore
├── requirements.txt
└── src/
    ├── agents.py
    ├── backends.py
    ├── contracts.py
    ├── main.py
    ├── validator.py
    └── workflow.py
```

创建目录：

```bash
mkdir python-multi-agent-demo
cd python-multi-agent-demo
mkdir src
```

## 3. 配置文件

### requirements.txt

```text
openai>=1.0.0,<2.0.0
```

说明：

- Mock 路径不需要安装第三方库。
- 只有 `--backend openai` 真实后端路径需要 `openai`。
- `python-dotenv` 不作为默认依赖；如果你习惯自动加载 `.env`，可以自己作为可选便利依赖加入。

### .gitignore

```text
.env
.venv/
__pycache__/
*.pyc
```

### .env.example

```bash
OPENAI_BASE_URL=https://openrouter.ai/api/v1
OPENAI_API_KEY=replace-with-your-key
OPENAI_MODEL=gpt-4.1-mini
```

真实 `.env` 不要提交。API Key 只能放环境变量或本地私有配置。

## 4. 定义统一数据合同

### src/contracts.py

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class TravelRequest:
    origin: str
    destination: str
    days: int
    travelers: int
    budget: str
    interests: str

    def to_prompt(self) -> str:
        return (
            f"Origin: {self.origin}\n"
            f"Destination: {self.destination}\n"
            f"Days: {self.days}\n"
            f"Travelers: {self.travelers}\n"
            f"Budget: {self.budget}\n"
            f"Interests: {self.interests}"
        )


@dataclass
class AgentResult:
    agent_name: str
    ok: bool
    output: dict[str, Any] = field(default_factory=dict)
    warnings: list[str] = field(default_factory=list)
    error: str | None = None
    raw: str = ""
```

关键点：

- `TravelRequest` 是用户输入。
- `AgentResult` 是所有 Agent 的统一返回 envelope。
- Mock 后端和真实 LLM 后端都必须通过 `Agent.run()` 返回 `AgentResult`，避免前后接口不一致。

## 5. 后端抽象：Mock 和真实 LLM 共用同一接口

### src/backends.py

```python
from __future__ import annotations

import os
import time
from typing import Protocol


class AgentBackend(Protocol):
    def generate(self, system_prompt: str, user_prompt: str) -> str:
        pass


class MockBackend:
    def generate(self, system_prompt: str, user_prompt: str) -> str:
        if "research" in system_prompt.lower():
            return (
                '{"summary":"Istanbul is suitable for a 3-day family trip.",'
                '"attractions":["Hagia Sophia","Topkapi Palace","Istanbul Aquarium","KidZania Istanbul"],'
                '"needs_verification":["opening hours","ticket prices"]}'
            )
        if "activity" in system_prompt.lower():
            return (
                '{"days":["Day 1: Historic district","Day 2: Aquarium and KidZania",'
                '"Day 3: Park, museum, and bazaar"],'
                '"warnings":["Do not overpack each day"]}'
            )
        if "budget" in system_prompt.lower():
            return (
                '{"currency":"USD","estimated_total_min":3100,'
                '"estimated_total_max":3800,'
                '"assumptions":["Flights and hotels are estimates, not real-time quotes"]}'
            )
        return (
            '{"itinerary":"3-day family-friendly Istanbul plan assembled from research, activities, and budget.",'
            '"disclaimer":"Visa, flight, hotel, opening-hour, and ticket data need real-time verification."}'
        )


class OpenAIBackend:
    def __init__(self, base_url: str, api_key: str, model: str, max_tokens: int = 1200, retries: int = 2) -> None:
        self.base_url = base_url
        self.api_key = api_key
        self.model = model
        self.max_tokens = max_tokens
        self.retries = retries
        self._client = None

    @classmethod
    def from_env(cls) -> "OpenAIBackend":
        api_key = os.environ["OPENAI_API_KEY"]
        base_url = os.environ.get("OPENAI_BASE_URL", "https://openrouter.ai/api/v1")
        model = os.environ.get("OPENAI_MODEL", "gpt-4.1-mini")
        return cls(base_url=base_url, api_key=api_key, model=model)

    def _get_client(self):
        if self._client is None:
            from openai import OpenAI

            self._client = OpenAI(base_url=self.base_url, api_key=self.api_key, timeout=30)
        return self._client

    def generate(self, system_prompt: str, user_prompt: str) -> str:
        client = self._get_client()
        last_error: Exception | None = None
        for attempt in range(self.retries + 1):
            try:
                response = client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": user_prompt},
                    ],
                    max_tokens=self.max_tokens,
                )
                content = response.choices[0].message.content
                if not content:
                    raise RuntimeError("empty model response")
                return content
            except Exception as exc:
                last_error = exc
                if attempt >= self.retries:
                    break
                time.sleep(1 + attempt)
        raise RuntimeError(f"model call failed: {last_error}")
```

关键点：

- `AgentBackend(Protocol)` 是统一接口。
- `MockBackend` 离线返回 JSON 字符串，用来验证工作流。
- `OpenAIBackend` 使用 OpenAI/OpenRouter 兼容接口，但只从环境变量读取密钥。
- `OpenAIBackend.generate()` 包含最小 retry、timeout、空输出检查。
- `OpenAIBackend` 延迟导入 `openai` 并复用 client；因此只跑 Mock 后端时无需安装或导入 `openai`。

真实后端运行前：

```bash
python3 -m venv .venv
. .venv/bin/activate
pip install -r requirements.txt
export OPENAI_BASE_URL="https://openrouter.ai/api/v1"
export OPENAI_API_KEY="your-api-key"
export OPENAI_MODEL="gpt-4.1-mini"
python3 src/main.py --backend openai
```

本文没有执行真实 API 调用，所以真实后端验证级别是 `real_backend_contract_ok`，不是 `real_backend_smoke_ok`。

## 6. Agent 封装：统一返回 AgentResult

### src/agents.py

```python
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Any

from backends import AgentBackend
from contracts import AgentResult


@dataclass
class Agent:
    name: str
    role: str
    backend: AgentBackend

    def run(self, task: str) -> AgentResult:
        try:
            raw = self.backend.generate(self.role, task)
            return AgentResult(agent_name=self.name, ok=True, output=parse_json_object(raw), raw=raw)
        except Exception as exc:
            return AgentResult(agent_name=self.name, ok=False, error=str(exc))


def parse_json_object(text: str) -> dict[str, Any]:
    try:
        value = json.loads(text)
    except json.JSONDecodeError:
        return {"text": text}
    if isinstance(value, dict):
        return value
    return {"value": value}
```

关键点：

- `Agent.run()` 永远返回 `AgentResult`。
- 模型如果返回 JSON，解析成 dict。
- 模型如果返回普通文本，放入 `{"text": ...}`，不会让工作流崩溃。
- 异常不会静默吞掉，而是进入 `AgentResult(ok=False, error=...)`。

## 7. Validator：不要让失败结果继续伪装成成功

### src/validator.py

```python
from __future__ import annotations

from contracts import AgentResult


class TravelPlanValidator:
    def validate(self, results: list[AgentResult]) -> AgentResult:
        warnings: list[str] = []
        failed = [item for item in results if not item.ok]
        if failed:
            return AgentResult(
                agent_name="Validator Agent",
                ok=False,
                warnings=[f"{item.agent_name} failed" for item in failed],
                error="workflow contains failed agent steps",
            )

        combined = " ".join(item.raw for item in results)
        for keyword in ["Visa", "flight", "hotel", "opening-hour", "ticket"]:
            if keyword.lower() not in combined.lower():
                warnings.append(f"Consider marking {keyword} data as needing real-time verification")

        return AgentResult(
            agent_name="Validator Agent",
            ok=True,
            output={"validation": "passed_with_warnings" if warnings else "passed"},
            warnings=warnings,
        )
```

Validator 做两件事：

1. 如果前面某个 Agent 失败，最终结果标记失败。
2. 对签证、机票、酒店、开放时间、门票等实时信息做提示，避免把模型估算当事实。

这是最小实现，不是生产级事实校验。生产系统应该接官方签证、航班、酒店、地图或票务 API。

## 8. 工作流编排

### src/workflow.py

```python
from __future__ import annotations

import json

from agents import Agent
from backends import AgentBackend
from contracts import AgentResult, TravelRequest
from validator import TravelPlanValidator


def build_agents(backend: AgentBackend) -> list[Agent]:
    return [
        Agent("Research Agent", "You are a travel research agent. Return compact JSON research notes.", backend),
        Agent("Activity Planner Agent", "You are an activity planner. Return compact JSON day-wise activities.", backend),
        Agent("Budget Agent", "You are a budget agent. Return compact JSON budget estimates and assumptions.", backend),
        Agent("Final Travel Assistant", "You combine prior outputs into compact JSON final itinerary and disclaimers.", backend),
    ]


def run_workflow(request: TravelRequest, backend: AgentBackend) -> list[AgentResult]:
    context = request.to_prompt()
    results: list[AgentResult] = []

    for agent in build_agents(backend):
        result = agent.run(context)
        results.append(result)
        if not result.ok:
            break
        context = json.dumps([item.output for item in results], ensure_ascii=False)

    results.append(TravelPlanValidator().validate(results))
    return results


def format_results(results: list[AgentResult]) -> str:
    lines: list[str] = []
    for result in results:
        lines.append(f"## {result.agent_name}")
        lines.append(f"ok: {result.ok}")
        if result.output:
            lines.append(json.dumps(result.output, ensure_ascii=False, indent=2))
        if result.warnings:
            lines.append("warnings: " + "; ".join(result.warnings))
        if result.error:
            lines.append("error: " + result.error)
        lines.append("")
    return "\n".join(lines)
```

关键点：

- 每个 Agent 的输出会进入下一个 Agent 的上下文。
- 一旦某个 Agent 失败，后续业务 Agent 停止，交给 Validator 输出失败原因。
- `format_results()` 统一打印结构化结果，便于日志和调试。

## 9. 运行入口

### src/main.py

```python
from __future__ import annotations

import argparse

from backends import MockBackend, OpenAIBackend
from contracts import TravelRequest
from workflow import format_results, run_workflow


def build_request() -> TravelRequest:
    return TravelRequest(
        origin="Islamabad",
        destination="Istanbul",
        days=3,
        travelers=4,
        budget="$4000",
        interests="kid friendly",
    )


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--backend", choices=["mock", "openai"], default="mock")
    args = parser.parse_args()

    backend = MockBackend() if args.backend == "mock" else OpenAIBackend.from_env()
    results = run_workflow(build_request(), backend)
    print(format_results(results))
    return 0 if results[-1].ok else 1


if __name__ == "__main__":
    raise SystemExit(main())
```

Mock 运行：

```bash
python3 src/main.py --backend mock
```

真实后端运行：

```bash
python3 src/main.py --backend openai
```

真实后端需要环境变量和 `openai` 依赖。本文没有真实调用外部模型，因此真实后端部分是接口一致的 `contract-only extension`。

## 10. Mock 路径实际验证

验证命令：

```bash
python3 -m py_compile src/*.py
python3 src/main.py --backend mock
```

本次实际输出：

```text
## Research Agent
ok: True
{
  "summary": "Istanbul is suitable for a 3-day family trip.",
  "attractions": [
    "Hagia Sophia",
    "Topkapi Palace",
    "Istanbul Aquarium",
    "KidZania Istanbul"
  ],
  "needs_verification": [
    "opening hours",
    "ticket prices"
  ]
}

## Activity Planner Agent
ok: True
{
  "days": [
    "Day 1: Historic district",
    "Day 2: Aquarium and KidZania",
    "Day 3: Park, museum, and bazaar"
  ],
  "warnings": [
    "Do not overpack each day"
  ]
}

## Budget Agent
ok: True
{
  "currency": "USD",
  "estimated_total_min": 3100,
  "estimated_total_max": 3800,
  "assumptions": [
    "Flights and hotels are estimates, not real-time quotes"
  ]
}

## Final Travel Assistant
ok: True
{
  "itinerary": "3-day family-friendly Istanbul plan assembled from research, activities, and budget.",
  "disclaimer": "Visa, flight, hotel, opening-hour, and ticket data need real-time verification."
}

## Validator Agent
ok: True
{
  "validation": "passed"
}
```

这证明：

- 所有 Python 文件语法通过。
- Mock 多 Agent 顺序链能完整跑完。
- `Validator Agent` 参与最终输出。
- 所有 Agent 都通过统一 `AgentResult` envelope 返回。

## 11. 失败处理策略

这套最小实现采用简单规则：

- 单个 Agent 异常：`Agent.run()` 返回 `ok=False`。
- 工作流发现失败：停止后续业务 Agent。
- Validator 发现失败：最终结果 `ok=False`，并标出哪个 Agent 失败。
- 真实模型空响应：`OpenAIBackend.generate()` 抛出 `empty model response`，不会伪造成正常结果。
- 真实模型调用失败：重试后仍失败则返回 `model call failed: ...`。

可以继续扩展：

- 给每个 Agent 增加 trace id。
- 把每一步输入输出写入日志。
- 对 JSON schema 做更严格校验。
- 对预算、签证、航班接入实时 API。

## 12. 和原文示例相比，本文做了哪些增强

- 原文直接写一个 `Agent` 类调用模型；本文把 backend 抽象出来，便于 Mock 和真实模型共用。
- 原文把 API Key 写在示例代码位置；本文只使用环境变量。
- 原文输出自然语言；本文让 Mock 返回 JSON，并统一解析为 dict。
- 原文没有 Validator；本文增加最小 Validator。
- 原文没有失败 envelope；本文所有 Agent 都返回 `AgentResult`。
- 原文没有真实数据源；本文明确标注签证、航班、酒店、开放时间、门票都需要实时验证。

## 13. 常见坑

### 坑 1：Mock 和真实后端返回类型不一致

错误示例：

```python
# Mock 返回 AgentResult，但真实后端返回 str
result = agent.run(task)
print(result.output)  # 如果 result 是 str，这里会崩
```

本文避免方式：`Agent.run()` 统一包装为 `AgentResult`，backend 只负责生成字符串。

### 坑 2：项目树和代码块对不上

如果项目树里没有 `agents.py`，后面却突然出现 `Agent` 类，读者会不知道放哪。

本文每个核心代码块都标明文件路径。

### 坑 3：把发布成功当成教程成功

HTML 发布成功只说明文件生成了，不说明代码真的能跑。

本文把证据拆开：

- `static_publish_ok`：Markdown/HTML 发布成功。
- `mock_code_run_ok`：Mock 路径真实执行成功。
- `real_backend_contract_ok`：真实后端代码接口一致并可编译。
- `real_backend_smoke_ok`：真实外部模型调用成功。

### 坑 4：让模型凭记忆生成实时信息

旅游规划涉及实时信息。不要把模型输出的预算、签证、航班、酒店价格当事实。

真实产品必须接工具或 API，并把来源展示给用户。

## 14. 验收标准

一个合格的入门多 Agent 教程，至少应满足：

- 项目结构完整。
- 每个核心代码块有文件归属。
- Mock 路径不依赖外部服务即可运行。
- 真实后端路径不硬编码 API Key。
- Mock 和真实后端共享同一 Agent 接口。
- 失败不会静默变成成功。
- 最终输出明确标注实时数据风险。
- 发布证据和代码运行证据分开汇报。

## 15. 本文验证级别

- `static_publish_ok`: 发布后验证。
- `mock_code_run_ok`: 已执行 `python3 src/main.py --backend mock`，通过。
- `real_backend_contract_ok`: 已执行 `python3 -m py_compile src/*.py`，真实后端代码与统一接口可编译；未调用外部 API。
- `real_backend_smoke_ok`: 未执行。原因：需要真实 API Key 和外部模型调用，本文不伪造结果。

## 16. 总结

原文适合作为多 Agent 入门：它展示了如何把一个旅行规划任务拆成多个角色。

实战落地时，更关键的是工程边界：

- 先用 Mock 跑通闭环。
- 再抽象 backend。
- 用统一 envelope 固定接口。
- 给失败路径和实时数据风险留出口。
- 最后再接真实模型。

**多 Agent 的重点不是 Agent 数量多，而是职责边界清楚、接口一致、数据流可验证、失败路径可控。**
