# 用 Python + NumPy 从零实现向量搜索：实操教程

本文基于 KDnuggets 文章《How to Build Vector Search From Scratch in Python》的核心思路，整理成一份可以直接动手运行的中文教程。

## 目标

做一个本地向量搜索 demo：

- 输入一批商品文本
- 给每个商品准备一个向量 embedding
- 建立一个简单向量索引
- 输入查询向量
- 用余弦相似度找最相近的商品
- 可视化向量空间和相似度分布

先用模拟 embedding 理解原理，最后再替换成真实 embedding。

---

## 1. 创建项目

```bash
mkdir vector-search-demo
cd vector-search-demo
uv init --python 3.12
uv add numpy matplotlib
```

创建代码目录和文件：

```bash
mkdir -p src
touch src/vector_search_demo.py
```

---

## 2. 写入完整代码

把下面代码放进 `src/vector_search_demo.py`：

```python
from __future__ import annotations

import numpy as np
import matplotlib.pyplot as plt


def normalize(vectors: np.ndarray) -> np.ndarray:
    """对每一行向量做 L2 归一化。"""
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    norms = np.where(norms == 0, 1e-10, norms)
    return vectors / norms


class VectorIndex:
    """一个最小向量索引：保存归一化向量，并支持 top-k 搜索。"""

    def __init__(self) -> None:
        self.vectors: np.ndarray | None = None
        self.labels: list[str] | None = None

    def add(self, vectors: np.ndarray, labels: list[str]) -> None:
        if len(vectors) != len(labels):
            raise ValueError("vectors 和 labels 数量必须一致")

        self.vectors = normalize(vectors)
        self.labels = labels

        print(f"Indexed {len(labels)} items with {vectors.shape[1]}-dimensional embeddings.")

    def search(self, query_vector: np.ndarray, top_k: int = 3) -> list[tuple[str, float]]:
        if self.vectors is None or self.labels is None:
            raise RuntimeError("index 为空，请先调用 add()")

        query_norm = normalize(query_vector.reshape(1, -1))

        # 归一化后，余弦相似度 = 点积
        scores = self.vectors @ query_norm.T
        scores = scores.flatten()

        top_indices = np.argsort(scores)[::-1][:top_k]

        return [(self.labels[i], float(scores[i])) for i in top_indices]

    def all_scores(self, query_vector: np.ndarray) -> np.ndarray:
        if self.vectors is None:
            raise RuntimeError("index 为空，请先调用 add()")

        query_norm = normalize(query_vector.reshape(1, -1))
        return (self.vectors @ query_norm.T).flatten()


def build_demo_data() -> tuple[list[str], np.ndarray, dict[str, np.ndarray]]:
    """构造一个模拟电商商品数据集。"""

    np.random.seed(42)

    products = [
        "Wireless noise-cancelling headphones with 30-hour battery",
        "Bluetooth speaker with waterproof design",
        "USB-C hub with 7 ports and power delivery",
        "4K HDMI cable 6ft braided",
        "Mechanical keyboard with RGB backlight",
        "Men's slim-fit chino pants navy blue",
        "Women's merino wool turtleneck sweater",
        "Unisex running jacket lightweight windbreaker",
        "Leather chelsea boots for men",
        "Organic cotton crew neck t-shirt",
        "Solid oak dining table seats 6",
        "Ergonomic mesh office chair lumbar support",
        "Linen sofa 3-seater natural beige",
        "Bamboo bookshelf 5-tier adjustable",
        "Memory foam mattress queen size medium firm",
    ]

    # 三个语义簇：电子产品、服装、家具
    electronics_center = np.array([0.9, 0.1, 0.2, 0.8, 0.1, 0.3, 0.7, 0.2])
    clothing_center = np.array([0.1, 0.8, 0.7, 0.1, 0.9, 0.2, 0.1, 0.8])
    furniture_center = np.array([0.2, 0.3, 0.9, 0.2, 0.1, 0.9, 0.3, 0.1])

    n_per_cluster = 5
    noise = 0.08

    embeddings = np.vstack(
        [
            electronics_center + np.random.randn(n_per_cluster, 8) * noise,
            clothing_center + np.random.randn(n_per_cluster, 8) * noise,
            furniture_center + np.random.randn(n_per_cluster, 8) * noise,
        ]
    )

    centers = {
        "audio equipment": electronics_center,
        "casual wear": clothing_center,
        "home furniture": furniture_center,
    }

    return products, embeddings, centers


def make_query(center: np.ndarray, noise_scale: float = 0.05) -> np.ndarray:
    """用簇中心加一点噪声，模拟查询 embedding。"""
    return center + np.random.randn(8) * noise_scale


def pca_2d(vectors: np.ndarray) -> np.ndarray:
    """用 NumPy 做一个最小 PCA，把高维向量降到 2D。"""
    centered = vectors - vectors.mean(axis=0)

    covariance = np.cov(centered, rowvar=False)
    eigenvalues, eigenvectors = np.linalg.eigh(covariance)

    top_two = eigenvectors[:, np.argsort(eigenvalues)[::-1][:2]]
    return centered @ top_two


def plot_embedding_space(
    products: list[str],
    embeddings: np.ndarray,
    queries: dict[str, np.ndarray],
) -> None:
    """画出 embedding 的二维 PCA 投影。"""

    all_vectors = np.vstack([embeddings, *queries.values()])
    projected = pca_2d(all_vectors)

    product_points = projected[: len(products)]
    query_points = projected[len(products) :]

    colors = ["#4A90D9"] * 5 + ["#E8734A"] * 5 + ["#5BAD72"] * 5

    plt.figure(figsize=(10, 6))

    plt.scatter(
        product_points[:, 0],
        product_points[:, 1],
        c=colors,
        s=90,
        edgecolors="white",
        linewidths=0.7,
        label="Products",
    )

    for i, product in enumerate(products):
        short_label = product[:22] + "..." if len(product) > 22 else product
        plt.annotate(short_label, product_points[i], fontsize=7, alpha=0.75)

    for (query_name, _query_vector), point in zip(queries.items(), query_points):
        plt.scatter(
            point[0],
            point[1],
            marker="*",
            s=250,
            color="gold",
            edgecolors="#333333",
            linewidths=0.8,
        )
        plt.annotate(f"query: {query_name}", point, fontsize=9, color="#333333")

    plt.title("Vector Search Embedding Space - PCA 2D Projection")
    plt.xlabel("PC 1")
    plt.ylabel("PC 2")
    plt.grid(True, linestyle="--", alpha=0.3)
    plt.tight_layout()
    plt.savefig("embedding_space.png", dpi=150)
    print("Saved chart: embedding_space.png")


def plot_score_distribution(
    products: list[str],
    index: VectorIndex,
    query_vector: np.ndarray,
    query_name: str,
) -> None:
    """画出某个查询对所有商品的相似度分布。"""

    scores = index.all_scores(query_vector)
    sorted_indices = np.argsort(scores)[::-1]

    sorted_scores = scores[sorted_indices]
    sorted_labels = [
        products[i][:34] + "..." if len(products[i]) > 34 else products[i]
        for i in sorted_indices
    ]

    plt.figure(figsize=(10, 6))

    bars = plt.barh(
        list(reversed(sorted_labels)),
        list(reversed(sorted_scores)),
        color="#5BAD72",
        edgecolor="white",
    )

    for bar, score in zip(bars, reversed(sorted_scores)):
        plt.text(
            bar.get_width(),
            bar.get_y() + bar.get_height() / 2,
            f" {score:.4f}",
            va="center",
            fontsize=8,
        )

    plt.title(f"Similarity Score Distribution - Query: {query_name}")
    plt.xlabel("Cosine Similarity")
    plt.grid(axis="x", linestyle="--", alpha=0.3)
    plt.tight_layout()
    plt.savefig("score_distribution.png", dpi=150)
    print("Saved chart: score_distribution.png")


def main() -> None:
    products, embeddings, centers = build_demo_data()

    print(f"Embeddings shape: {embeddings.shape}")

    index = VectorIndex()
    index.add(embeddings, products)

    queries = {name: make_query(center) for name, center in centers.items()}

    print("\nSearch results:")
    for query_name, query_vector in queries.items():
        print(f"\nQuery: {query_name}")
        results = index.search(query_vector, top_k=3)

        for rank, (label, score) in enumerate(results, start=1):
            print(f"{rank}. [{score:.4f}] {label}")

    plot_embedding_space(products, embeddings, queries)

    furniture_query = queries["home furniture"]
    plot_score_distribution(
        products=products,
        index=index,
        query_vector=furniture_query,
        query_name="home furniture",
    )


if __name__ == "__main__":
    main()
```

---

## 3. 运行 demo

```bash
uv run python src/vector_search_demo.py
```

你应该看到类似输出：

```text
Embeddings shape: (15, 8)
Indexed 15 items with 8-dimensional embeddings.

Search results:

Query: audio equipment
1. [0.9856] Wireless noise-cancelling headphones with 30-hour battery
2. [0.9840] USB-C hub with 7 ports and power delivery
3. [0.9829] Mechanical keyboard with RGB backlight

Query: casual wear
1. [0.9960] Men's slim-fit chino pants navy blue
2. [0.9958] Leather chelsea boots for men
3. [0.9916] Women's merino wool turtleneck sweater

Query: home furniture
1. [0.9929] Bamboo bookshelf 5-tier adjustable
2. [0.9902] Linen sofa 3-seater natural beige
3. [0.9881] Solid oak dining table seats 6

Saved chart: embedding_space.png
Saved chart: score_distribution.png
```

同时目录下会生成两个图：

```text
embedding_space.png
score_distribution.png
```

---

## 4. 理解核心逻辑

这个 demo 的核心只有三步。

### 第一步：把文本变成向量

真实系统里，一般用 embedding 模型：

```text
文本 → embedding 模型 → 向量
```

但为了看清原理，教程里先手工模拟了 8 维向量。

例如：

```python
electronics_center = np.array([0.9, 0.1, 0.2, 0.8, 0.1, 0.3, 0.7, 0.2])
```

然后给它加一点随机噪声，模拟同一类别商品之间的差异。

### 第二步：归一化向量

代码：

```python
def normalize(vectors: np.ndarray) -> np.ndarray:
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    norms = np.where(norms == 0, 1e-10, norms)
    return vectors / norms
```

为什么要归一化？

因为归一化后：

```text
余弦相似度 = 点积
```

这样原本要算夹角的问题，就变成一次矩阵乘法：

```python
scores = self.vectors @ query_norm.T
```

这就是最小版向量搜索的核心。

### 第三步：排序返回 top-k

代码：

```python
top_indices = np.argsort(scores)[::-1][:top_k]
```

含义：

- `scores` 是查询向量和所有商品向量的相似度
- 分数越高，语义越接近
- 排序后取前 `top_k` 个结果

---

## 5. 如何判断检索结果是否可靠

不要只看 top-1。

更实用的做法是看 top-k 的分数分布。

比如：

```text
0.9929
0.9902
0.9881
0.8120
0.8012
```

如果前三个结果明显高于后面，说明这次查询比较确定。

但如果是：

```text
0.7210
0.7198
0.7182
0.7175
0.7169
```

说明系统其实没找到明确匹配项，只是在一堆差不多的结果里硬排了个名次。

所以生产系统里通常要加阈值：

```python
def search_with_threshold(
    index: VectorIndex,
    query_vector: np.ndarray,
    top_k: int = 3,
    threshold: float = 0.85,
) -> list[tuple[str, float]]:
    results = index.search(query_vector, top_k=top_k)
    return [(label, score) for label, score in results if score >= threshold]
```

---

## 6. 替换成真实 embedding

上面的 demo 用的是模拟向量。真正有用时，要把文本交给 embedding 模型。

可以用 `sentence-transformers`。

安装：

```bash
uv add sentence-transformers
```

新建文件：

```bash
touch src/real_embedding_demo.py
```

写入：

```python
from __future__ import annotations

import numpy as np
from sentence_transformers import SentenceTransformer

from vector_search_demo import VectorIndex


def main() -> None:
    documents = [
        "How to deploy a FastAPI service with Docker",
        "A tutorial about semantic search and vector databases",
        "Family travel planning for Japan",
        "Python guide for data analysis with pandas",
        "How to build a RAG pipeline with local documents",
        "Best practices for unit testing Python projects",
    ]

    model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

    embeddings = model.encode(documents)
    embeddings = np.asarray(embeddings)

    index = VectorIndex()
    index.add(embeddings, documents)

    query = "I want to search documents by meaning, not keywords"
    query_embedding = model.encode(query)
    query_embedding = np.asarray(query_embedding)

    results = index.search(query_embedding, top_k=3)

    print(f"Query: {query}\n")
    for rank, (doc, score) in enumerate(results, start=1):
        print(f"{rank}. [{score:.4f}] {doc}")


if __name__ == "__main__":
    main()
```

运行：

```bash
uv run python src/real_embedding_demo.py
```

你应该会看到和 “semantic search / vector databases / RAG” 相关的文档排在前面。

---

## 7. 从 demo 到真实项目

如果要把它变成真正可用的个人知识库检索，可以按这个路线推进。

### 阶段 1：本地小规模检索

适合几百到几千条文本。

做法：

- 用 `sentence-transformers` 生成 embedding
- 用 NumPy 保存向量
- 用 JSON 保存原始文本和 metadata
- 查询时全量矩阵乘法

适合：

- 个人笔记
- 小型文档集
- demo / prototype

### 阶段 2：加持久化

保存向量：

```python
np.save("embeddings.npy", embeddings)
```

保存文本：

```python
import json

with open("documents.json", "w", encoding="utf-8") as f:
    json.dump(documents, f, ensure_ascii=False, indent=2)
```

加载：

```python
embeddings = np.load("embeddings.npy")

with open("documents.json", encoding="utf-8") as f:
    documents = json.load(f)
```

### 阶段 3：加分块

如果文档较长，不要整篇文章生成一个 embedding。

应该切成 chunks：

```text
文档 → 分段 → 每段生成 embedding → 检索相关段落
```

一个简单 chunk 策略：

```python
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 80) -> list[str]:
    chunks = []
    start = 0

    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        start = end - overlap

    return chunks
```

### 阶段 4：换成向量数据库

当数据量超过几十万条，NumPy 全量检索会变慢。

可以换成：

- `faiss`：本地高性能向量检索
- `hnswlib`：轻量近似最近邻
- `pgvector`：PostgreSQL 里做向量检索
- `Qdrant`：独立向量数据库
- `Milvus`：大规模向量数据库

---

## 8. 应该记住的最小闭环

向量搜索最小闭环是：

```text
文本
→ embedding
→ 向量归一化
→ 存入索引
→ 查询文本转向量
→ 查询向量归一化
→ 点积 / 余弦相似度
→ 排序
→ 返回 top-k
```

生产化之后再考虑：

- embedding 模型质量
- chunk 策略
- metadata 过滤
- 相似度阈值
- rerank
- 向量数据库
- RAG prompt 拼接
- 召回评测集

先把这个最小 demo 跑通，再考虑工程化。
