# 企业内网 Linux 运维 AI Agent Tool 落地教程：从只读巡检工具开始

## 改写后的请求

基于 KDnuggets 文章《Easy Agentic Tool Calling with Gemma 4》，提炼一份适合企业内网 Linux 系统运维团队试点的 AI Agent Tool 落地教程。教程需要包含：最小架构、工具边界、安全控制、可复制的示例代码、样例输入、预期输出、验收标准、失败处理和分阶段上线计划。

## 改写点

- 明确受众：企业内网 Linux 运维团队，而不是泛泛的 AI Agent 爱好者。
- 明确范围：第一阶段只做只读巡检和报告生成，不做自动修复、删除、重启、发布等高风险动作。
- 明确交付物：教程必须能落地执行，包含目录结构、命令、代码、样例、验收和失败处理。
- 明确安全姿态：默认离线/内网、路径受限、命令白名单、日志脱敏、人工确认。

## 来源与边界

- 原文：<https://www.kdnuggets.com/easy-agentic-tool-calling-with-gemma-4>
- 摘要路径：`/home/lin/.hermes/projects/hermes-gsummary-workflow/runs/outputs/20260525-152532-Easy-Agentic-Tool-Calling-with-Gemma-4-KDnuggets-400847-851710280-summary.md`
- 来源事实：原文展示了 Gemma 4 本地模型、文件浏览工具、受限 Python 执行工具、路径沙箱、JSON Schema 工具调用与链式调用。
- 本教程扩展：企业内网 Linux 运维场景、安全边界、目录规范、验收清单和上线计划是面向落地的实践补充，不是原文直接给出的方案。

## 一句话结论

企业内网运维 AI Agent 不应从“自动修复服务器”开始，而应从“只读巡检工具 + 受限计算工具 + 人工确认报告”开始，先证明它能可靠观察系统、正确调用工具、清楚暴露风险，再逐步扩大权限。

## 适用场景

适合：

- 企业内网 Linux 主机巡检
- 运维日报/周报生成
- 故障初筛和证据收集
- 服务状态、磁盘、日志、端口的只读分析
- 离线或半离线环境中的本地 LLM 试点

不适合第一阶段直接做：

- 自动执行 `rm`、`kill`、`systemctl restart`、`kubectl delete`
- 自动修改生产配置
- 自动连接生产数据库执行 SQL
- 自动发布、回滚、扩容、清理磁盘
- 无审计地读取任意目录或日志

## 最小可落地架构

```text
运维人员
  |
  |  输入自然语言任务：
  |  “检查这台机器磁盘、失败服务和最近错误日志，生成报告”
  v
AI Agent 编排层
  |
  |-- Tool 1: system_snapshot      只读系统快照
  |-- Tool 2: service_status       只查服务状态
  |-- Tool 3: recent_errors        只读最近错误日志
  |-- Tool 4: safe_list_dir        限定目录浏览
  |-- Tool 5: safe_python_calc     受限计算/格式化
  |
  v
本地/内网模型服务
  - Ollama / vLLM / llama.cpp / OpenAI-compatible endpoint
  - 第一阶段可用 mock 模式验证工具链，不依赖真实模型

输出目录
  ./outputs/
    report-YYYYmmdd-HHMMSS.md
    trace-YYYYmmdd-HHMMSS.jsonl

审计边界
  - 工具调用参数
  - 工具返回摘要
  - 拒绝原因
  - 报告版本
```

核心原则：

- 模型只做判断和编排，不直接拿 shell。
- 工具只开放最小只读能力。
- 所有工具都有输入校验、超时、返回结构和审计记录。
- 第一阶段只写 `outputs/`，不写系统目录。
- 任何生产变更必须由人执行或审批后执行。

## 试点目录结构

```bash
mkdir -p intranet-ops-agent/{tools,outputs,samples,logs}
cd intranet-ops-agent
```

建议结构：

```text
intranet-ops-agent/
├── ops_agent.py              # 最小 Agent 与工具实现
├── samples/
│   ├── incident_prompt.txt   # 样例任务
│   └── allowed_paths.txt     # 允许浏览路径说明
├── outputs/                  # 只允许写报告和 trace
├── logs/                     # 本工具自己的日志，不写系统日志
└── README.md
```

## 第一步：实现只读工具，不接模型也能跑

下面代码只使用 Python 标准库，适合先在内网测试机验证工具边界。

保存为 `ops_agent.py`：

```python
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import contextlib
import io
import json
import math
import os
import statistics
import subprocess
import time
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Callable

BASE_DIR = Path(__file__).resolve().parent
OUTPUT_DIR = BASE_DIR / "outputs"
SAFE_BASE_DIRS = [Path("/var/log").resolve(), BASE_DIR.resolve()]
COMMAND_TIMEOUT = 8


@dataclass
class ToolResult:
    ok: bool
    tool: str
    data: dict[str, Any]
    error: str = ""


def run_command(argv: list[str]) -> tuple[bool, str]:
    try:
        completed = subprocess.run(
            argv,
            check=False,
            capture_output=True,
            text=True,
            timeout=COMMAND_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        return False, f"command timeout after {COMMAND_TIMEOUT}s"
    except FileNotFoundError:
        return False, f"command not found: {argv[0]}"

    output = (completed.stdout + "\n" + completed.stderr).strip()
    return completed.returncode == 0, output[-6000:]


def system_snapshot(_: dict[str, Any]) -> ToolResult:
    commands = {
        "hostname": ["hostname"],
        "uptime": ["uptime"],
        "disk": ["df", "-hT", "/"],
        "memory": ["free", "-h"],
    }
    data: dict[str, Any] = {}
    for name, argv in commands.items():
        ok, output = run_command(argv)
        data[name] = {"ok": ok, "output": output}
    return ToolResult(ok=True, tool="system_snapshot", data=data)


def service_status(args: dict[str, Any]) -> ToolResult:
    service = str(args.get("service", "")).strip()
    if not service or any(ch in service for ch in " ;|&`$><"):
        return ToolResult(False, "service_status", {}, "invalid service name")

    ok, output = run_command(["systemctl", "status", service, "--no-pager", "--lines", "20"])
    return ToolResult(ok=ok, tool="service_status", data={"service": service, "status": output})


def recent_errors(args: dict[str, Any]) -> ToolResult:
    unit = str(args.get("unit", "")).strip()
    since = str(args.get("since", "1 hour ago")).strip()

    argv = ["journalctl", "-p", "3", "--since", since, "--no-pager", "-n", "80"]
    if unit:
        if any(ch in unit for ch in " ;|&`$><"):
            return ToolResult(False, "recent_errors", {}, "invalid unit name")
        argv.extend(["-u", unit])

    ok, output = run_command(argv)
    return ToolResult(ok=ok, tool="recent_errors", data={"unit": unit or "all", "errors": output})


def is_allowed_path(path: Path) -> bool:
    try:
        requested = path.resolve()
    except FileNotFoundError:
        requested = path.parent.resolve() / path.name

    for base in SAFE_BASE_DIRS:
        if requested == base or str(requested).startswith(str(base) + os.sep):
            return True
    return False


def safe_list_dir(args: dict[str, Any]) -> ToolResult:
    raw_path = str(args.get("path", ".")).strip() or "."
    path = Path(raw_path)
    if not path.is_absolute():
        path = BASE_DIR / path

    if not is_allowed_path(path):
        return ToolResult(False, "safe_list_dir", {}, f"access denied: {raw_path}")
    if not path.exists() or not path.is_dir():
        return ToolResult(False, "safe_list_dir", {}, f"not a directory: {raw_path}")

    entries = []
    for item in sorted(path.iterdir())[:100]:
        stat = item.stat()
        entries.append({
            "name": item.name,
            "type": "dir" if item.is_dir() else "file",
            "size": stat.st_size,
        })
    return ToolResult(True, "safe_list_dir", {"path": str(path), "entries": entries})


def safe_python_calc(args: dict[str, Any]) -> ToolResult:
    code = str(args.get("code", ""))
    if not code.strip():
        return ToolResult(False, "safe_python_calc", {}, "empty code")
    if len(code) > 2000:
        return ToolResult(False, "safe_python_calc", {}, "code too long")

    allowed_builtins = {
        "abs": abs, "all": all, "any": any, "bool": bool, "dict": dict,
        "divmod": divmod, "enumerate": enumerate, "filter": filter,
        "float": float, "int": int, "len": len, "list": list, "map": map,
        "max": max, "min": min, "pow": pow, "print": print, "range": range,
        "repr": repr, "reversed": reversed, "round": round, "set": set,
        "sorted": sorted, "str": str, "sum": sum, "tuple": tuple, "zip": zip,
    }
    globals_dict = {
        "__builtins__": allowed_builtins,
        "math": math,
        "statistics": statistics,
    }
    stdout = io.StringIO()
    try:
        with contextlib.redirect_stdout(stdout):
            exec(code, globals_dict, {})
    except Exception as exc:
        return ToolResult(False, "safe_python_calc", {}, f"execution error: {exc}")

    output = stdout.getvalue().strip()
    if not output:
        return ToolResult(False, "safe_python_calc", {}, "no output; use print()")
    return ToolResult(True, "safe_python_calc", {"output": output})


TOOLS: dict[str, Callable[[dict[str, Any]], ToolResult]] = {
    "system_snapshot": system_snapshot,
    "service_status": service_status,
    "recent_errors": recent_errors,
    "safe_list_dir": safe_list_dir,
    "safe_python_calc": safe_python_calc,
}


def call_tool(name: str, args: dict[str, Any]) -> ToolResult:
    tool = TOOLS.get(name)
    if tool is None:
        return ToolResult(False, name, {}, "unknown tool")
    return tool(args)


def write_report(prompt: str, results: list[ToolResult]) -> Path:
    OUTPUT_DIR.mkdir(exist_ok=True)
    report_path = OUTPUT_DIR / f"report-{time.strftime('%Y%m%d-%H%M%S')}.md"
    lines = [
        "# Linux 运维 AI Agent 只读巡检报告",
        "",
        f"- 任务：{prompt}",
        f"- 生成时间：{time.strftime('%Y-%m-%d %H:%M:%S')}",
        "- 边界：只读巡检；未执行任何修复、重启、删除或配置修改。",
        "",
        "## 工具调用结果",
    ]
    for result in results:
        lines.append(f"\n### {result.tool}")
        lines.append(f"- ok: {result.ok}")
        if result.error:
            lines.append(f"- error: {result.error}")
        lines.append("```json")
        lines.append(json.dumps(result.data, ensure_ascii=False, indent=2))
        lines.append("```")

    lines.extend([
        "",
        "## 人工复核建议",
        "- 若发现磁盘使用率高、服务失败或错误日志集中出现，请由运维人员复核后再执行修复。",
        "- 本报告不得作为自动变更依据。",
    ])
    report_path.write_text("\n".join(lines), encoding="utf-8")
    return report_path


def run_mock_agent(prompt: str) -> Path:
    # 第一阶段不要让模型自由选择任意工具；用固定计划验证工具链。
    plan = [
        ("system_snapshot", {}),
        ("recent_errors", {"since": "1 hour ago"}),
        ("safe_list_dir", {"path": "outputs"}),
        ("safe_python_calc", {"code": "values = [412, 1834, 10786]\nprint(round(sum(values) / 1024, 2), 'KB')"}),
    ]
    results = [call_tool(name, args) for name, args in plan]
    return write_report(prompt, results)


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("prompt", help="运维巡检任务描述")
    parser.add_argument("--mock", action="store_true", help="不连接模型，使用固定计划验证工具")
    args = parser.parse_args()

    if not args.mock:
        raise SystemExit("第一版教程只启用 --mock；接入模型前必须先通过工具边界验收。")

    report_path = run_mock_agent(args.prompt)
    print(report_path)


if __name__ == "__main__":
    main()
```

运行：

```bash
python3 ops_agent.py --mock "检查这台机器的磁盘、内存、错误日志，并生成只读巡检报告"
```

预期输出：

```text
/path/to/intranet-ops-agent/outputs/report-20260525-153000.md
```

检查报告：

```bash
sed -n '1,220p' outputs/report-*.md
```

验收标准：

- 命令能生成 `outputs/report-*.md`。
- 报告中包含 `system_snapshot`、`recent_errors`、`safe_list_dir`、`safe_python_calc` 四类结果。
- 没有执行重启、删除、清理、写系统配置等动作。
- `safe_list_dir` 只能读取 `/var/log` 或项目目录，不能读取 `/etc`、`/root`、`/home/其他用户`。

失败处理：

- `journalctl` 不可用：保留错误输出，报告中标记 `ok: false`，不要让模型编造日志。
- `systemctl` 在容器里不可用：这是环境限制，不是 Agent 失败；在验收记录里注明。
- Python 工具无输出：返回 `no output; use print()`，要求调用方重试，而不是把空结果当成功。

## 第二步：验证路径沙箱确实拦截越权读取

执行：

```bash
python3 - <<'PY'
from ops_agent import call_tool

for path in ["outputs", "/var/log", "/etc", "../../etc"]:
    result = call_tool("safe_list_dir", {"path": path})
    print(path, result.ok, result.error)
PY
```

预期输出形态：

```text
outputs True
/var/log True
/etc False access denied: /etc
../../etc False access denied: ../../etc
```

验收标准：

- `/etc` 和 `../../etc` 必须被拒绝。
- 拒绝必须发生在真正读取目录之前。
- 错误信息要清晰，但不要泄露敏感路径内容。

失败处理：

- 如果 `/etc` 被允许，立即停止试点，不要接入模型。
- 检查 `SAFE_BASE_DIRS` 是否误加入了 `/`。
- 检查是否用了字符串拼接代替 `Path.resolve()` 或 `os.path.abspath()`。

## 第三步：验证受限计算工具不会读取文件或导入模块

执行：

```bash
python3 - <<'PY'
from ops_agent import call_tool

tests = {
    "normal_calc": "values=[1,2,3]\nprint(sum(values))",
    "try_import": "import os\nprint(os.listdir('/'))",
    "try_open": "print(open('/etc/passwd').read())",
    "no_print": "sum([1,2,3])",
}

for name, code in tests.items():
    result = call_tool("safe_python_calc", {"code": code})
    print("---", name)
    print("ok:", result.ok)
    print("error:", result.error)
    print("data:", result.data)
PY
```

预期输出形态：

```text
--- normal_calc
ok: True
data: {'output': '6'}

--- try_import
ok: False
error: execution error: __import__ not found

--- try_open
ok: False
error: execution error: name 'open' is not defined

--- no_print
ok: False
error: no output; use print()
```

验收标准：

- 正常数学计算可用。
- `import os` 被拒绝。
- `open('/etc/passwd')` 被拒绝。
- 没有 `print()` 的代码不能算成功。

重要提醒：这只是教学级沙箱。生产环境不能只靠 `exec()` 白名单，应改为独立低权限子进程、容器、seccomp、只读文件系统和资源限制。

## 第四步：定义运维 Agent 的工具契约

第一阶段建议只开放以下工具：

```json
[
  {
    "name": "system_snapshot",
    "description": "采集主机名、运行时间、根分区磁盘和内存使用情况，只读。",
    "args": {}
  },
  {
    "name": "service_status",
    "description": "查询指定 systemd 服务状态，只读，不执行 restart。",
    "args": {"service": "nginx"}
  },
  {
    "name": "recent_errors",
    "description": "查询最近错误日志，只读。",
    "args": {"unit": "nginx", "since": "1 hour ago"}
  },
  {
    "name": "safe_list_dir",
    "description": "浏览允许目录下的文件名和大小，不读取文件内容。",
    "args": {"path": "/var/log"}
  },
  {
    "name": "safe_python_calc",
    "description": "执行受限 Python 计算，用于汇总数字和格式化结果。",
    "args": {"code": "values=[1,2,3]\nprint(sum(values))"}
  }
]
```

禁止第一阶段开放：

```text
shell(command: str)
write_file(path, content)
delete_file(path)
restart_service(name)
kill_process(pid)
execute_sql(sql)
kubectl_delete(resource)
docker_prune()
```

如果确实需要修复类动作，应先让 Agent 生成候选命令和回滚步骤，由人复制执行；不要让模型直接执行。

## 第五步：接入内网模型前的系统提示词模板

接入 Ollama、vLLM 或其他 OpenAI-compatible endpoint 后，建议使用下面的系统提示词。

```text
你是企业内网 Linux 运维只读巡检 Agent。

硬性边界：
1. 你只能调用提供的工具，不得要求执行未注册 shell 命令。
2. 当前阶段只允许只读巡检，不允许修复、删除、重启、清理、发布、回滚或修改配置。
3. 工具失败时必须报告失败原因，不得编造系统状态。
4. 对任何生产变更，只能给出候选命令、风险、回滚建议和人工确认要求。
5. 输出必须包含：发现、证据、风险等级、建议动作、是否需要人工确认。

当你不确定时，先调用只读工具收集证据；证据不足时明确说明“不足以判断”。
```

用户提示词示例：

```text
请检查这台 Linux 主机是否存在明显风险：磁盘、内存、失败服务、最近 1 小时错误日志。只生成报告，不要执行修复。
```

预期输出结构：

```text
## 结论
- 当前风险：低/中/高
- 是否需要人工处理：是/否

## 证据
- 磁盘：...
- 内存：...
- 服务：...
- 错误日志：...

## 建议
- 只读建议：...
- 如需变更：候选命令 + 风险 + 回滚方式 + 人工确认

## 不确定性
- 哪些信息没有采集到
- 哪些判断不能下结论
```

## 三个企业内网试点场景

### 场景 1：Linux 主机每日只读巡检

目标：每天生成一份主机健康报告，不做自动修复。

样例任务：

```text
检查主机磁盘、内存、失败服务和最近 1 小时错误日志，生成运维日报。
```

执行命令：

```bash
python3 ops_agent.py --mock "每日只读巡检：磁盘、内存、错误日志"
```

预期输出：

- `outputs/report-*.md`
- 包含磁盘、内存、错误日志工具结果
- 明确写出“未执行任何修复”

验收标准：

- 连续运行 5 次不报错。
- 报告不包含密钥、Token、业务用户数据。
- Agent 不建议直接执行破坏性命令。
- 工具失败时报告失败，不编造成功状态。

失败处理：

- 如果报告出现“已修复”“已清理”等措辞，调整系统提示词和输出模板。
- 如果日志太长，限制 `journalctl -n` 条数并只保留最近关键错误。

### 场景 2：服务异常初筛

目标：当某个服务异常时，收集状态和错误日志，给人工排查提供证据。

样例任务：

```text
nginx 访问异常，请只读检查 nginx 服务状态和最近错误日志，不要重启服务。
```

工具计划：

```json
[
  {"tool": "service_status", "args": {"service": "nginx"}},
  {"tool": "recent_errors", "args": {"unit": "nginx", "since": "2 hours ago"}}
]
```

预期输出：

```text
风险等级：中
证据：nginx 服务状态输出、最近错误日志摘要
建议：人工确认配置、证书、端口占用；如需重启，先确认影响范围
禁止动作：未执行 systemctl restart
```

验收标准：

- 服务名中包含 `;`、`|`、`&` 等字符时必须拒绝。
- 报告必须区分“服务确实失败”和“systemctl 在当前环境不可用”。
- 不自动重启服务。

失败处理：

- 如果 `systemctl` 不存在，改为记录环境限制，并提示在真实 systemd 主机验证。
- 如果日志为空，不得推断“没有问题”，只能说“未采集到错误日志”。

### 场景 3：目录容量和日志文件初筛

目标：只读取允许目录的文件名和大小，用计算工具汇总容量，辅助定位日志膨胀。

样例任务：

```text
查看 /var/log 下文件大小，计算前 10 个文件总大小，生成风险提示。
```

工具计划：

```json
[
  {"tool": "safe_list_dir", "args": {"path": "/var/log"}},
  {"tool": "safe_python_calc", "args": {"code": "sizes=[1024,2048,4096]\nprint(round(sum(sizes)/1024/1024, 2), 'MB')"}}
]
```

预期输出：

```text
发现：/var/log 可访问，已列出文件名与大小。
计算：前 N 个文件总大小 X MB。
建议：如需清理，必须由人工确认具体文件、保留策略和回滚方案。
```

验收标准：

- 只列文件名、类型、大小，不读取日志正文。
- 不执行 `truncate`、`rm`、`logrotate`。
- 如果 `/var/log` 权限不足，报告权限不足，不绕过权限。

失败处理：

- 如果目录项超过 100 个，截断并提示“只展示前 100 个”。
- 如果需要更精细排序，应新增只读工具，而不是让模型拿 shell 自由执行。

## 生产化安全边界

第一阶段必须遵守：

- 只读工具优先，写操作全部禁用。
- 输出只写项目 `outputs/` 目录。
- 工具参数必须校验，不能拼接成 shell 字符串。
- `subprocess.run()` 必须使用参数列表，禁止 `shell=True`。
- 每个命令必须有超时。
- 每次工具调用必须记录 trace。
- 日志和报告中不得保存密钥、Token、Cookie、客户隐私数据。
- 所有修复建议必须进入人工确认。
- 不允许模型访问任意路径。
- 不允许模型直接读取 `.env`、SSH key、数据库配置、证书私钥。

第二阶段可考虑但仍需审批：

- 允许生成候选修复命令，但不执行。
- 允许创建工单草稿。
- 允许读取白名单配置文件，但要做脱敏。
- 允许对非生产测试机执行有限动作。

第三阶段才讨论：

- 自动执行低风险修复。
- 自动重启非关键服务。
- 与 CMDB、监控、工单系统集成。
- 多主机集中调度。

## 4 周上线计划

### 第 1 周：单机工具链验证

交付物：

- `ops_agent.py`
- 只读巡检报告
- 路径越权测试记录
- 受限计算测试记录

通过标准：

- 工具链可重复运行。
- 越权路径被拒绝。
- 不可用命令被明确报告。
- 未发生任何系统变更。

### 第 2 周：小团队试用

交付物：

- 3 台测试机报告样例
- 常见失败案例清单
- 运维人员反馈

通过标准：

- 报告能减少人工收集证据时间。
- 没有误导性“已修复”表述。
- 没有泄露敏感信息。

### 第 3 周：接入内网模型

交付物：

- 模型 endpoint 配置说明
- 系统提示词
- 工具调用 trace
- 人工复核样例

通过标准：

- 模型只调用白名单工具。
- 工具失败时模型不编造结果。
- 输出结构稳定。

### 第 4 周：制度化与有限推广

交付物：

- 工具白名单
- 禁止动作清单
- 报告模板
- 审计字段规范
- 回滚和停用方案

通过标准：

- 新增工具必须经过评审。
- 高风险动作仍需人工确认。
- 试点范围没有扩大到生产自动变更。

## 验收清单

上线前逐项确认：

- [ ] 所有工具默认只读。
- [ ] 没有 `shell=True`。
- [ ] 所有命令都有超时。
- [ ] 路径访问使用绝对路径解析和白名单前缀校验。
- [ ] 输出只写 `outputs/`。
- [ ] 工具失败不会被包装成成功。
- [ ] 报告包含证据和不确定性。
- [ ] 敏感信息不进入报告和 trace。
- [ ] 修复动作只生成候选建议，不自动执行。
- [ ] 运维人员知道如何停用该 Agent。

## 常见坑

- 一开始就开放通用 shell：风险过高，模型会把“建议”变成“执行”。
- 只做提示词约束，不做工具层约束：提示词不能替代权限控制。
- 把 `exec()` 当生产沙箱：教学可以，生产不够。
- 日志无限制返回给模型：容易泄露敏感信息，也会撑爆上下文。
- 工具返回空字符串：模型容易自信编造，必须明确失败或要求重试。
- 自动清理磁盘：企业环境中最容易误删业务数据，必须人工确认。

## 管理层可读摘要

这个试点不是让 AI 自动接管 Linux 运维，而是让 AI 先成为“只读证据收集与报告助手”。它的价值在于：减少重复巡检、统一报告格式、降低初筛成本；它的边界是：不自动修复、不越权读取、不接触密钥、不修改生产系统。只有在只读阶段稳定、可审计、可回滚后，才讨论更高权限能力。
