企业内网 Linux 运维 AI Agent Tool 落地教程:从只读巡检工具开始

改写后的请求

基于 KDnuggets 文章《Easy Agentic Tool Calling with Gemma 4》,提炼一份适合企业内网 Linux 系统运维团队试点的 AI Agent Tool 落地教程。教程需要包含:最小架构、工具边界、安全控制、可复制的示例代码、样例输入、预期输出、验收标准、失败处理和分阶段上线计划。

改写点

来源与边界

一句话结论

企业内网运维 AI Agent 不应从“自动修复服务器”开始,而应从“只读巡检工具 + 受限计算工具 + 人工确认报告”开始,先证明它能可靠观察系统、正确调用工具、清楚暴露风险,再逐步扩大权限。

适用场景

适合:

不适合第一阶段直接做:

最小可落地架构

运维人员
  |
  |  输入自然语言任务:
  |  “检查这台机器磁盘、失败服务和最近错误日志,生成报告”
  v
AI Agent 编排层
  |
  |-- Tool 1: system_snapshot      只读系统快照
  |-- Tool 2: service_status       只查服务状态
  |-- Tool 3: recent_errors        只读最近错误日志
  |-- Tool 4: safe_list_dir        限定目录浏览
  |-- Tool 5: safe_python_calc     受限计算/格式化
  |
  v
本地/内网模型服务
  - Ollama / vLLM / llama.cpp / OpenAI-compatible endpoint
  - 第一阶段可用 mock 模式验证工具链,不依赖真实模型

输出目录
  ./outputs/
    report-YYYYmmdd-HHMMSS.md
    trace-YYYYmmdd-HHMMSS.jsonl

审计边界
  - 工具调用参数
  - 工具返回摘要
  - 拒绝原因
  - 报告版本

核心原则:

试点目录结构

mkdir -p intranet-ops-agent/{tools,outputs,samples,logs}
cd intranet-ops-agent

建议结构:

intranet-ops-agent/
├── ops_agent.py              # 最小 Agent 与工具实现
├── samples/
│   ├── incident_prompt.txt   # 样例任务
│   └── allowed_paths.txt     # 允许浏览路径说明
├── outputs/                  # 只允许写报告和 trace
├── logs/                     # 本工具自己的日志,不写系统日志
└── README.md

第一步:实现只读工具,不接模型也能跑

下面代码只使用 Python 标准库,适合先在内网测试机验证工具边界。

保存为 ops_agent.py

#!/usr/bin/env python3
from __future__ import annotations

import argparse
import contextlib
import io
import json
import math
import os
import statistics
import subprocess
import time
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Callable

BASE_DIR = Path(__file__).resolve().parent
OUTPUT_DIR = BASE_DIR / "outputs"
SAFE_BASE_DIRS = [Path("/var/log").resolve(), BASE_DIR.resolve()]
COMMAND_TIMEOUT = 8


@dataclass
class ToolResult:
    ok: bool
    tool: str
    data: dict[str, Any]
    error: str = ""


def run_command(argv: list[str]) -> tuple[bool, str]:
    try:
        completed = subprocess.run(
            argv,
            check=False,
            capture_output=True,
            text=True,
            timeout=COMMAND_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        return False, f"command timeout after {COMMAND_TIMEOUT}s"
    except FileNotFoundError:
        return False, f"command not found: {argv[0]}"

    output = (completed.stdout + "\n" + completed.stderr).strip()
    return completed.returncode == 0, output[-6000:]


def system_snapshot(_: dict[str, Any]) -> ToolResult:
    commands = {
        "hostname": ["hostname"],
        "uptime": ["uptime"],
        "disk": ["df", "-hT", "/"],
        "memory": ["free", "-h"],
    }
    data: dict[str, Any] = {}
    for name, argv in commands.items():
        ok, output = run_command(argv)
        data[name] = {"ok": ok, "output": output}
    return ToolResult(ok=True, tool="system_snapshot", data=data)


def service_status(args: dict[str, Any]) -> ToolResult:
    service = str(args.get("service", "")).strip()
    if not service or any(ch in service for ch in " ;|&`$><"):
        return ToolResult(False, "service_status", {}, "invalid service name")

    ok, output = run_command(["systemctl", "status", service, "--no-pager", "--lines", "20"])
    return ToolResult(ok=ok, tool="service_status", data={"service": service, "status": output})


def recent_errors(args: dict[str, Any]) -> ToolResult:
    unit = str(args.get("unit", "")).strip()
    since = str(args.get("since", "1 hour ago")).strip()

    argv = ["journalctl", "-p", "3", "--since", since, "--no-pager", "-n", "80"]
    if unit:
        if any(ch in unit for ch in " ;|&`$><"):
            return ToolResult(False, "recent_errors", {}, "invalid unit name")
        argv.extend(["-u", unit])

    ok, output = run_command(argv)
    return ToolResult(ok=ok, tool="recent_errors", data={"unit": unit or "all", "errors": output})


def is_allowed_path(path: Path) -> bool:
    try:
        requested = path.resolve()
    except FileNotFoundError:
        requested = path.parent.resolve() / path.name

    for base in SAFE_BASE_DIRS:
        if requested == base or str(requested).startswith(str(base) + os.sep):
            return True
    return False


def safe_list_dir(args: dict[str, Any]) -> ToolResult:
    raw_path = str(args.get("path", ".")).strip() or "."
    path = Path(raw_path)
    if not path.is_absolute():
        path = BASE_DIR / path

    if not is_allowed_path(path):
        return ToolResult(False, "safe_list_dir", {}, f"access denied: {raw_path}")
    if not path.exists() or not path.is_dir():
        return ToolResult(False, "safe_list_dir", {}, f"not a directory: {raw_path}")

    entries = []
    for item in sorted(path.iterdir())[:100]:
        stat = item.stat()
        entries.append({
            "name": item.name,
            "type": "dir" if item.is_dir() else "file",
            "size": stat.st_size,
        })
    return ToolResult(True, "safe_list_dir", {"path": str(path), "entries": entries})


def safe_python_calc(args: dict[str, Any]) -> ToolResult:
    code = str(args.get("code", ""))
    if not code.strip():
        return ToolResult(False, "safe_python_calc", {}, "empty code")
    if len(code) > 2000:
        return ToolResult(False, "safe_python_calc", {}, "code too long")

    allowed_builtins = {
        "abs": abs, "all": all, "any": any, "bool": bool, "dict": dict,
        "divmod": divmod, "enumerate": enumerate, "filter": filter,
        "float": float, "int": int, "len": len, "list": list, "map": map,
        "max": max, "min": min, "pow": pow, "print": print, "range": range,
        "repr": repr, "reversed": reversed, "round": round, "set": set,
        "sorted": sorted, "str": str, "sum": sum, "tuple": tuple, "zip": zip,
    }
    globals_dict = {
        "__builtins__": allowed_builtins,
        "math": math,
        "statistics": statistics,
    }
    stdout = io.StringIO()
    try:
        with contextlib.redirect_stdout(stdout):
            exec(code, globals_dict, {})
    except Exception as exc:
        return ToolResult(False, "safe_python_calc", {}, f"execution error: {exc}")

    output = stdout.getvalue().strip()
    if not output:
        return ToolResult(False, "safe_python_calc", {}, "no output; use print()")
    return ToolResult(True, "safe_python_calc", {"output": output})


TOOLS: dict[str, Callable[[dict[str, Any]], ToolResult]] = {
    "system_snapshot": system_snapshot,
    "service_status": service_status,
    "recent_errors": recent_errors,
    "safe_list_dir": safe_list_dir,
    "safe_python_calc": safe_python_calc,
}


def call_tool(name: str, args: dict[str, Any]) -> ToolResult:
    tool = TOOLS.get(name)
    if tool is None:
        return ToolResult(False, name, {}, "unknown tool")
    return tool(args)


def write_report(prompt: str, results: list[ToolResult]) -> Path:
    OUTPUT_DIR.mkdir(exist_ok=True)
    report_path = OUTPUT_DIR / f"report-{time.strftime('%Y%m%d-%H%M%S')}.md"
    lines = [
        "# Linux 运维 AI Agent 只读巡检报告",
        "",
        f"- 任务:{prompt}",
        f"- 生成时间:{time.strftime('%Y-%m-%d %H:%M:%S')}",
        "- 边界:只读巡检;未执行任何修复、重启、删除或配置修改。",
        "",
        "## 工具调用结果",
    ]
    for result in results:
        lines.append(f"\n### {result.tool}")
        lines.append(f"- ok: {result.ok}")
        if result.error:
            lines.append(f"- error: {result.error}")
        lines.append("```json")
        lines.append(json.dumps(result.data, ensure_ascii=False, indent=2))
        lines.append("```")

    lines.extend([
        "",
        "## 人工复核建议",
        "- 若发现磁盘使用率高、服务失败或错误日志集中出现,请由运维人员复核后再执行修复。",
        "- 本报告不得作为自动变更依据。",
    ])
    report_path.write_text("\n".join(lines), encoding="utf-8")
    return report_path


def run_mock_agent(prompt: str) -> Path:
    # 第一阶段不要让模型自由选择任意工具;用固定计划验证工具链。
    plan = [
        ("system_snapshot", {}),
        ("recent_errors", {"since": "1 hour ago"}),
        ("safe_list_dir", {"path": "outputs"}),
        ("safe_python_calc", {"code": "values = [412, 1834, 10786]\nprint(round(sum(values) / 1024, 2), 'KB')"}),
    ]
    results = [call_tool(name, args) for name, args in plan]
    return write_report(prompt, results)


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("prompt", help="运维巡检任务描述")
    parser.add_argument("--mock", action="store_true", help="不连接模型,使用固定计划验证工具")
    args = parser.parse_args()

    if not args.mock:
        raise SystemExit("第一版教程只启用 --mock;接入模型前必须先通过工具边界验收。")

    report_path = run_mock_agent(args.prompt)
    print(report_path)


if __name__ == "__main__":
    main()

运行:

python3 ops_agent.py --mock "检查这台机器的磁盘、内存、错误日志,并生成只读巡检报告"

预期输出:

/path/to/intranet-ops-agent/outputs/report-20260525-153000.md

检查报告:

sed -n '1,220p' outputs/report-*.md

验收标准:

失败处理:

第二步:验证路径沙箱确实拦截越权读取

执行:

python3 - <<'PY'
from ops_agent import call_tool

for path in ["outputs", "/var/log", "/etc", "../../etc"]:
    result = call_tool("safe_list_dir", {"path": path})
    print(path, result.ok, result.error)
PY

预期输出形态:

outputs True
/var/log True
/etc False access denied: /etc
../../etc False access denied: ../../etc

验收标准:

失败处理:

第三步:验证受限计算工具不会读取文件或导入模块

执行:

python3 - <<'PY'
from ops_agent import call_tool

tests = {
    "normal_calc": "values=[1,2,3]\nprint(sum(values))",
    "try_import": "import os\nprint(os.listdir('/'))",
    "try_open": "print(open('/etc/passwd').read())",
    "no_print": "sum([1,2,3])",
}

for name, code in tests.items():
    result = call_tool("safe_python_calc", {"code": code})
    print("---", name)
    print("ok:", result.ok)
    print("error:", result.error)
    print("data:", result.data)
PY

预期输出形态:

--- normal_calc
ok: True
data: {'output': '6'}

--- try_import
ok: False
error: execution error: __import__ not found

--- try_open
ok: False
error: execution error: name 'open' is not defined

--- no_print
ok: False
error: no output; use print()

验收标准:

重要提醒:这只是教学级沙箱。生产环境不能只靠 exec() 白名单,应改为独立低权限子进程、容器、seccomp、只读文件系统和资源限制。

第四步:定义运维 Agent 的工具契约

第一阶段建议只开放以下工具:

[
  {
    "name": "system_snapshot",
    "description": "采集主机名、运行时间、根分区磁盘和内存使用情况,只读。",
    "args": {}
  },
  {
    "name": "service_status",
    "description": "查询指定 systemd 服务状态,只读,不执行 restart。",
    "args": {"service": "nginx"}
  },
  {
    "name": "recent_errors",
    "description": "查询最近错误日志,只读。",
    "args": {"unit": "nginx", "since": "1 hour ago"}
  },
  {
    "name": "safe_list_dir",
    "description": "浏览允许目录下的文件名和大小,不读取文件内容。",
    "args": {"path": "/var/log"}
  },
  {
    "name": "safe_python_calc",
    "description": "执行受限 Python 计算,用于汇总数字和格式化结果。",
    "args": {"code": "values=[1,2,3]\nprint(sum(values))"}
  }
]

禁止第一阶段开放:

shell(command: str)
write_file(path, content)
delete_file(path)
restart_service(name)
kill_process(pid)
execute_sql(sql)
kubectl_delete(resource)
docker_prune()

如果确实需要修复类动作,应先让 Agent 生成候选命令和回滚步骤,由人复制执行;不要让模型直接执行。

第五步:接入内网模型前的系统提示词模板

接入 Ollama、vLLM 或其他 OpenAI-compatible endpoint 后,建议使用下面的系统提示词。

你是企业内网 Linux 运维只读巡检 Agent。

硬性边界:
1. 你只能调用提供的工具,不得要求执行未注册 shell 命令。
2. 当前阶段只允许只读巡检,不允许修复、删除、重启、清理、发布、回滚或修改配置。
3. 工具失败时必须报告失败原因,不得编造系统状态。
4. 对任何生产变更,只能给出候选命令、风险、回滚建议和人工确认要求。
5. 输出必须包含:发现、证据、风险等级、建议动作、是否需要人工确认。

当你不确定时,先调用只读工具收集证据;证据不足时明确说明“不足以判断”。

用户提示词示例:

请检查这台 Linux 主机是否存在明显风险:磁盘、内存、失败服务、最近 1 小时错误日志。只生成报告,不要执行修复。

预期输出结构:

## 结论
- 当前风险:低/中/高
- 是否需要人工处理:是/否

## 证据
- 磁盘:...
- 内存:...
- 服务:...
- 错误日志:...

## 建议
- 只读建议:...
- 如需变更:候选命令 + 风险 + 回滚方式 + 人工确认

## 不确定性
- 哪些信息没有采集到
- 哪些判断不能下结论

三个企业内网试点场景

场景 1:Linux 主机每日只读巡检

目标:每天生成一份主机健康报告,不做自动修复。

样例任务:

检查主机磁盘、内存、失败服务和最近 1 小时错误日志,生成运维日报。

执行命令:

python3 ops_agent.py --mock "每日只读巡检:磁盘、内存、错误日志"

预期输出:

验收标准:

失败处理:

场景 2:服务异常初筛

目标:当某个服务异常时,收集状态和错误日志,给人工排查提供证据。

样例任务:

nginx 访问异常,请只读检查 nginx 服务状态和最近错误日志,不要重启服务。

工具计划:

[
  {"tool": "service_status", "args": {"service": "nginx"}},
  {"tool": "recent_errors", "args": {"unit": "nginx", "since": "2 hours ago"}}
]

预期输出:

风险等级:中
证据:nginx 服务状态输出、最近错误日志摘要
建议:人工确认配置、证书、端口占用;如需重启,先确认影响范围
禁止动作:未执行 systemctl restart

验收标准:

失败处理:

场景 3:目录容量和日志文件初筛

目标:只读取允许目录的文件名和大小,用计算工具汇总容量,辅助定位日志膨胀。

样例任务:

查看 /var/log 下文件大小,计算前 10 个文件总大小,生成风险提示。

工具计划:

[
  {"tool": "safe_list_dir", "args": {"path": "/var/log"}},
  {"tool": "safe_python_calc", "args": {"code": "sizes=[1024,2048,4096]\nprint(round(sum(sizes)/1024/1024, 2), 'MB')"}}
]

预期输出:

发现:/var/log 可访问,已列出文件名与大小。
计算:前 N 个文件总大小 X MB。
建议:如需清理,必须由人工确认具体文件、保留策略和回滚方案。

验收标准:

失败处理:

生产化安全边界

第一阶段必须遵守:

第二阶段可考虑但仍需审批:

第三阶段才讨论:

4 周上线计划

第 1 周:单机工具链验证

交付物:

通过标准:

第 2 周:小团队试用

交付物:

通过标准:

第 3 周:接入内网模型

交付物:

通过标准:

第 4 周:制度化与有限推广

交付物:

通过标准:

验收清单

上线前逐项确认:

常见坑

管理层可读摘要

这个试点不是让 AI 自动接管 Linux 运维,而是让 AI 先成为“只读证据收集与报告助手”。它的价值在于:减少重复巡检、统一报告格式、降低初筛成本;它的边界是:不自动修复、不越权读取、不接触密钥、不修改生产系统。只有在只读阶段稳定、可审计、可回滚后,才讨论更高权限能力。