用 Python + NumPy 从零实现向量搜索:实操教程

本文基于 KDnuggets 文章《How to Build Vector Search From Scratch in Python》的核心思路,整理成一份可以直接动手运行的中文教程。

目标

做一个本地向量搜索 demo:用 NumPy 保存归一化后的向量,用点积计算余弦相似度并返回 top-k 结果,再用 Matplotlib 画出 embedding 空间和相似度分布两张图。

先用模拟 embedding 理解原理,最后再替换成真实 embedding。

---

1. 创建项目

mkdir vector-search-demo
cd vector-search-demo
uv init --python 3.12
uv add numpy matplotlib

创建代码目录和文件:

mkdir -p src
touch src/vector_search_demo.py

---

2. 写入完整代码

把下面的代码放进 src/vector_search_demo.py:

from __future__ import annotations

import numpy as np
import matplotlib.pyplot as plt


def normalize(vectors: np.ndarray) -> np.ndarray:
    """Scale every row of a 2-D array to unit L2 length.

    Args:
        vectors: Array with one vector per row.

    Returns:
        An array of the same shape whose rows have been divided by their
        own L2 norm. All-zero rows stay all-zero.
    """
    row_lengths = np.linalg.norm(vectors, axis=1, keepdims=True)
    # A zero row has no direction; dividing by a tiny epsilon instead of 0
    # leaves it as zeros rather than producing NaN.
    safe_lengths = np.where(row_lengths == 0, 1e-10, row_lengths)
    return vectors / safe_lengths


class VectorIndex:
    """Minimal in-memory vector index with brute-force top-k cosine search.

    Vectors are L2-normalized when they are added, so the cosine similarity
    between a stored vector and a normalized query is a plain dot product.
    """

    def __init__(self) -> None:
        # (n_items, dim) matrix of normalized vectors; None until add() runs.
        self.vectors: np.ndarray | None = None
        # labels[i] belongs to row i of ``vectors``; None until add() runs.
        self.labels: list[str] | None = None

    def add(self, vectors: np.ndarray, labels: list[str]) -> None:
        """Normalize ``vectors`` and append them, with their labels, to the index.

        Repeated calls accumulate items instead of replacing the ones that
        were added earlier.

        Args:
            vectors: 2-D array with one embedding per row.
            labels: One label per row of ``vectors``.

        Raises:
            ValueError: If the number of rows and labels differ, if
                ``vectors`` is not 2-D, or if its embedding dimension does
                not match the vectors already in the index.
        """
        vectors = np.asarray(vectors)
        if len(vectors) != len(labels):
            raise ValueError("vectors 和 labels 数量必须一致")
        if vectors.ndim != 2:
            raise ValueError("vectors 必须是二维数组,形状为 (数量, 维度)")

        normalized = normalize(vectors)

        if self.vectors is None or self.labels is None:
            self.vectors = normalized
            # Copy so that later mutation of the caller's list cannot
            # desynchronize labels and rows.
            self.labels = list(labels)
        else:
            if normalized.shape[1] != self.vectors.shape[1]:
                raise ValueError(
                    f"向量维度不一致:索引为 {self.vectors.shape[1]} 维,"
                    f"新向量为 {normalized.shape[1]} 维"
                )
            self.vectors = np.vstack([self.vectors, normalized])
            self.labels = [*self.labels, *labels]

        print(f"Indexed {len(labels)} items with {vectors.shape[1]}-dimensional embeddings.")

    def search(self, query_vector: np.ndarray, top_k: int = 3) -> list[tuple[str, float]]:
        """Return the ``top_k`` most similar items, best match first.

        Args:
            query_vector: Query embedding; it is flattened to a single vector.
            top_k: Maximum number of results to return.

        Returns:
            ``(label, cosine_similarity)`` pairs in descending score order.

        Raises:
            RuntimeError: If nothing has been added to the index yet.
            ValueError: If ``top_k`` is negative or the query dimension does
                not match the index.
        """
        if self.vectors is None or self.labels is None:
            raise RuntimeError("index 为空,请先调用 add()")
        if top_k < 0:
            # A negative value would slice off the worst items instead of
            # selecting the best ones.
            raise ValueError("top_k 不能为负数")

        scores = self.all_scores(query_vector)
        top_indices = np.argsort(scores)[::-1][:top_k]

        return [(self.labels[i], float(scores[i])) for i in top_indices]

    def all_scores(self, query_vector: np.ndarray) -> np.ndarray:
        """Return the cosine similarity of the query against every stored vector.

        Args:
            query_vector: Query embedding; it is flattened to a single vector.

        Returns:
            1-D array with one score per indexed item, in insertion order.

        Raises:
            RuntimeError: If nothing has been added to the index yet.
            ValueError: If the query dimension does not match the index.
        """
        if self.vectors is None:
            raise RuntimeError("index 为空,请先调用 add()")

        query = np.asarray(query_vector).reshape(1, -1)
        if query.shape[1] != self.vectors.shape[1]:
            raise ValueError(
                f"查询向量维度 {query.shape[1]} 与索引维度 {self.vectors.shape[1]} 不一致"
            )

        # Both sides are unit length, so the dot product is the cosine similarity.
        return (self.vectors @ normalize(query).T).flatten()


def build_demo_data() -> tuple[list[str], np.ndarray, dict[str, np.ndarray]]:
    """Build a synthetic e-commerce catalog with simulated embeddings.

    Returns:
        A tuple ``(products, embeddings, centers)``: the 15 product names,
        a ``(15, 8)`` array of simulated embeddings (5 per cluster, in the
        same order as the names), and a mapping from query name to the
        cluster center used to generate each group.
    """
    # Seed NumPy's global RNG so that every run produces the same data.
    np.random.seed(42)

    products = [
        # Electronics
        "Wireless noise-cancelling headphones with 30-hour battery",
        "Bluetooth speaker with waterproof design",
        "USB-C hub with 7 ports and power delivery",
        "4K HDMI cable 6ft braided",
        "Mechanical keyboard with RGB backlight",
        # Clothing
        "Men's slim-fit chino pants navy blue",
        "Women's merino wool turtleneck sweater",
        "Unisex running jacket lightweight windbreaker",
        "Leather chelsea boots for men",
        "Organic cotton crew neck t-shirt",
        # Furniture
        "Solid oak dining table seats 6",
        "Ergonomic mesh office chair lumbar support",
        "Linen sofa 3-seater natural beige",
        "Bamboo bookshelf 5-tier adjustable",
        "Memory foam mattress queen size medium firm",
    ]

    # One center per semantic cluster, keyed by the query it stands for.
    # Insertion order matches the product order above.
    centers = {
        "audio equipment": np.array([0.9, 0.1, 0.2, 0.8, 0.1, 0.3, 0.7, 0.2]),
        "casual wear": np.array([0.1, 0.8, 0.7, 0.1, 0.9, 0.2, 0.1, 0.8]),
        "home furniture": np.array([0.2, 0.3, 0.9, 0.2, 0.1, 0.9, 0.3, 0.1]),
    }

    items_per_cluster = 5
    jitter = 0.08

    # Each cluster is its center plus Gaussian noise, drawn cluster by cluster.
    clusters = [
        center + np.random.randn(items_per_cluster, 8) * jitter
        for center in centers.values()
    ]
    embeddings = np.vstack(clusters)

    return products, embeddings, centers


def make_query(center: np.ndarray, noise_scale: float = 0.05) -> np.ndarray:
    """Simulate a query embedding as a cluster center plus Gaussian noise.

    The noise is drawn from NumPy's global RNG, so results are reproducible
    after ``np.random.seed``.

    Args:
        center: Cluster center, of any dimensionality.
        noise_scale: Standard deviation of the added noise.

    Returns:
        An array with the same shape as ``center``.
    """
    # Size the noise from the center rather than hard-coding 8 dimensions.
    noise = np.random.randn(*np.shape(center))
    return center + noise * noise_scale


def pca_2d(vectors: np.ndarray) -> np.ndarray:
    """Project row vectors onto their two leading principal components.

    Args:
        vectors: Array with one sample per row and one feature per column.

    Returns:
        An ``(n_samples, 2)`` array of coordinates along the two directions
        with the largest variance.
    """
    offsets = vectors - vectors.mean(axis=0)

    eigvals, eigvecs = np.linalg.eigh(np.cov(offsets, rowvar=False))

    # eigh returns eigenvalues in ascending order; reverse to rank by variance.
    by_variance = np.argsort(eigvals)[::-1]
    basis = eigvecs[:, by_variance[:2]]
    return offsets @ basis


def plot_embedding_space(
    products: list[str],
    embeddings: np.ndarray,
    queries: dict[str, np.ndarray],
    output_path: str = "embedding_space.png",
    cluster_size: int = 5,
) -> None:
    """Save a 2-D PCA scatter plot of product embeddings and query vectors.

    Args:
        products: Product names, one per row of ``embeddings``.
        embeddings: Product embeddings, one per row.
        queries: Mapping from query name to query vector; each query is
            drawn as a gold star.
        output_path: Where the PNG is written.
        cluster_size: Products are colored in consecutive groups of this
            size, cycling through a three-color palette.

    Raises:
        ValueError: If ``products`` and ``embeddings`` differ in length or
            ``cluster_size`` is smaller than 1.
    """
    n_products = len(products)
    if n_products != len(embeddings):
        raise ValueError("products 和 embeddings 数量必须一致")
    if cluster_size < 1:
        raise ValueError("cluster_size 必须大于 0")

    # Project products and queries together so they share one PCA basis.
    all_vectors = np.vstack([embeddings, *queries.values()])
    projected = pca_2d(all_vectors)

    product_points = projected[:n_products]
    query_points = projected[n_products:]

    palette = ["#4A90D9", "#E8734A", "#5BAD72"]
    colors = [palette[(i // cluster_size) % len(palette)] for i in range(n_products)]

    fig, ax = plt.subplots(figsize=(10, 6))
    try:
        ax.scatter(
            product_points[:, 0],
            product_points[:, 1],
            c=colors,
            s=90,
            edgecolors="white",
            linewidths=0.7,
            label="Products",
        )

        for product, point in zip(products, product_points):
            short_label = product[:22] + "..." if len(product) > 22 else product
            ax.annotate(short_label, point, fontsize=7, alpha=0.75)

        for query_name, point in zip(queries, query_points):
            ax.scatter(
                point[0],
                point[1],
                marker="*",
                s=250,
                color="gold",
                edgecolors="#333333",
                linewidths=0.8,
            )
            ax.annotate(f"query: {query_name}", point, fontsize=9, color="#333333")

        ax.set_title("Vector Search Embedding Space - PCA 2D Projection")
        ax.set_xlabel("PC 1")
        ax.set_ylabel("PC 2")
        ax.grid(True, linestyle="--", alpha=0.3)
        fig.tight_layout()
        fig.savefig(output_path, dpi=150)
    finally:
        # pyplot keeps every figure alive until it is closed explicitly.
        plt.close(fig)

    print(f"Saved chart: {output_path}")


def plot_score_distribution(
    products: list[str],
    index: VectorIndex,
    query_vector: np.ndarray,
    query_name: str,
    output_path: str = "score_distribution.png",
) -> None:
    """Save a horizontal bar chart of one query's similarity to every product.

    Args:
        products: Product names, in the same order as the indexed vectors.
        index: Index that already contains the product vectors.
        query_vector: Query embedding to score against the index.
        query_name: Human-readable query name shown in the chart title.
        output_path: Where the PNG is written.
    """
    scores = index.all_scores(query_vector)

    # Ascending order: barh draws the first entry at the bottom, so the best
    # match ends up at the top of the chart.
    ascending = np.argsort(scores)
    values = scores[ascending]
    labels = [
        products[i][:34] + "..." if len(products[i]) > 34 else products[i]
        for i in ascending
    ]
    # Numeric positions keep one bar per product even when two truncated
    # labels happen to be identical.
    positions = np.arange(len(labels))

    fig, ax = plt.subplots(figsize=(10, 6))
    try:
        bars = ax.barh(positions, values, color="#5BAD72", edgecolor="white")
        ax.set_yticks(positions)
        ax.set_yticklabels(labels)

        for bar, score in zip(bars, values):
            ax.text(
                bar.get_width(),
                bar.get_y() + bar.get_height() / 2,
                f" {score:.4f}",
                va="center",
                fontsize=8,
            )

        ax.set_title(f"Similarity Score Distribution - Query: {query_name}")
        ax.set_xlabel("Cosine Similarity")
        ax.grid(axis="x", linestyle="--", alpha=0.3)
        fig.tight_layout()
        fig.savefig(output_path, dpi=150)
    finally:
        # pyplot keeps every figure alive until it is closed explicitly.
        plt.close(fig)

    print(f"Saved chart: {output_path}")


def main() -> None:
    """Run the simulated-embedding demo end to end.

    Builds the synthetic catalog, indexes it, prints the top-3 results for
    one query per cluster, and saves the two charts.
    """
    catalog, vectors, cluster_centers = build_demo_data()
    print(f"Embeddings shape: {vectors.shape}")

    search_index = VectorIndex()
    search_index.add(vectors, catalog)

    # One noisy query per cluster, generated before any searching.
    queries = {}
    for name, center in cluster_centers.items():
        queries[name] = make_query(center)

    print("\nSearch results:")
    for name, vector in queries.items():
        print(f"\nQuery: {name}")
        hits = search_index.search(vector, top_k=3)
        for rank, (label, score) in enumerate(hits, start=1):
            print(f"{rank}. [{score:.4f}] {label}")

    plot_embedding_space(catalog, vectors, queries)

    # The score-distribution chart is drawn for the furniture query only.
    chart_query = "home furniture"
    plot_score_distribution(
        products=catalog,
        index=search_index,
        query_vector=queries[chart_query],
        query_name=chart_query,
    )


# Run the demo only when this file is executed as a script; importing the
# module (as real_embedding_demo.py does for VectorIndex) must not trigger it.
if __name__ == "__main__":
    main()

---

3. 运行 demo

uv run python src/vector_search_demo.py

你应该看到类似输出:

Embeddings shape: (15, 8)
Indexed 15 items with 8-dimensional embeddings.

Search results:

Query: audio equipment
1. [0.9856] Wireless noise-cancelling headphones with 30-hour battery
2. [0.9840] USB-C hub with 7 ports and power delivery
3. [0.9829] Mechanical keyboard with RGB backlight

Query: casual wear
1. [0.9960] Men's slim-fit chino pants navy blue
2. [0.9958] Leather chelsea boots for men
3. [0.9916] Women's merino wool turtleneck sweater

Query: home furniture
1. [0.9929] Bamboo bookshelf 5-tier adjustable
2. [0.9902] Linen sofa 3-seater natural beige
3. [0.9881] Solid oak dining table seats 6

Saved chart: embedding_space.png
Saved chart: score_distribution.png

同时目录下会生成两个图:

embedding_space.png
score_distribution.png

---

4. 理解核心逻辑

这个 demo 的核心只有三步。

第一步:把文本变成向量

真实系统里,一般用 embedding 模型:

文本 → embedding 模型 → 向量

但为了看清原理,教程里先手工模拟了 8 维向量。

例如:

electronics_center = np.array([0.9, 0.1, 0.2, 0.8, 0.1, 0.3, 0.7, 0.2])

然后给它加一点随机噪声,模拟同一类别商品之间的差异。

第二步:归一化向量

代码:

def normalize(vectors: np.ndarray) -> np.ndarray:
    """Scale every row of a 2-D array to unit L2 length.

    Args:
        vectors: Array with one vector per row.

    Returns:
        An array of the same shape whose rows have been divided by their
        own L2 norm. All-zero rows stay all-zero.
    """
    row_lengths = np.linalg.norm(vectors, axis=1, keepdims=True)
    # A zero row has no direction; dividing by a tiny epsilon instead of 0
    # leaves it as zeros rather than producing NaN.
    safe_lengths = np.where(row_lengths == 0, 1e-10, row_lengths)
    return vectors / safe_lengths

为什么要归一化?

因为归一化后:

余弦相似度 = 点积

这样原本要算夹角的问题,就变成一次矩阵乘法:

scores = self.vectors @ query_norm.T

这就是最小版向量搜索的核心。

第三步:排序返回 top-k

代码:

top_indices = np.argsort(scores)[::-1][:top_k]

含义:np.argsort(scores) 返回按分数从小到大排序的下标,[::-1] 把顺序反转成从大到小,[:top_k] 只保留前 top_k 个,也就是分数最高的 top_k 条结果。

---

5. 如何判断检索结果是否可靠

不要只看 top-1。

更实用的做法是看 top-k 的分数分布。

比如:

0.9929
0.9902
0.9881
0.8120
0.8012

如果前三个结果明显高于后面,说明这次查询比较确定。

但如果是:

0.7210
0.7198
0.7182
0.7175
0.7169

说明系统其实没找到明确匹配项,只是在一堆差不多的结果里硬排了个名次。

所以生产系统里通常要加阈值:

def search_with_threshold(
    index: VectorIndex,
    query_vector: np.ndarray,
    top_k: int = 3,
    threshold: float = 0.85,
) -> list[tuple[str, float]]:
    """Run a top-k search and drop every hit scoring below ``threshold``.

    Args:
        index: Index to search.
        query_vector: Query embedding.
        top_k: Number of candidates requested from the index.
        threshold: Minimum score a hit needs in order to be kept.

    Returns:
        The surviving ``(label, score)`` pairs in the order the index
        returned them; possibly an empty list.
    """
    kept: list[tuple[str, float]] = []
    for hit in index.search(query_vector, top_k=top_k):
        label, score = hit
        if score >= threshold:
            kept.append((label, score))
    return kept

---

6. 替换成真实 embedding

上面的 demo 用的是模拟向量。真正有用时,要把文本交给 embedding 模型。

可以用 sentence-transformers 库。

安装:

uv add sentence-transformers

新建文件:

touch src/real_embedding_demo.py

写入:

from __future__ import annotations

import numpy as np
from sentence_transformers import SentenceTransformer

from vector_search_demo import VectorIndex


def main() -> None:
    """Index a few documents with a real embedding model and run one query.

    Encodes the documents and the query with the
    ``sentence-transformers/all-MiniLM-L6-v2`` model, then prints the three
    best matches with their scores.
    """
    corpus = [
        "How to deploy a FastAPI service with Docker",
        "A tutorial about semantic search and vector databases",
        "Family travel planning for Japan",
        "Python guide for data analysis with pandas",
        "How to build a RAG pipeline with local documents",
        "Best practices for unit testing Python projects",
    ]

    encoder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

    corpus_vectors = np.asarray(encoder.encode(corpus))

    doc_index = VectorIndex()
    doc_index.add(corpus_vectors, corpus)

    query = "I want to search documents by meaning, not keywords"
    query_vector = np.asarray(encoder.encode(query))

    hits = doc_index.search(query_vector, top_k=3)

    print(f"Query: {query}\n")
    for position, hit in enumerate(hits, start=1):
        text, similarity = hit
        print(f"{position}. [{similarity:.4f}] {text}")

# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

运行:

uv run python src/real_embedding_demo.py

你应该会看到和 “semantic search / vector databases / RAG” 相关的文档排在前面。

---

7. 从 demo 到真实项目

如果要把它变成真正可用的个人知识库检索,可以按这个路线推进。

阶段 1:本地小规模检索

适合几百到几千条文本。

做法:把所有文本的 embedding 放进一个 NumPy 数组并常驻内存,查询时做一次全量点积再排序(也就是上面的 VectorIndex)。

适合:个人笔记、小型文档集这类数据量不大的场景。

阶段 2:加持久化

保存向量:

# Write the embedding matrix to disk in NumPy's binary .npy format.
np.save("embeddings.npy", embeddings)

保存文本:

import json

# Store the texts as UTF-8 JSON. ensure_ascii=False writes non-ASCII
# characters as-is instead of \uXXXX escapes, which keeps the file readable.
with open("documents.json", "w", encoding="utf-8") as f:
    json.dump(documents, f, ensure_ascii=False, indent=2)

加载:

# Load the vectors and the texts back from disk.
# NOTE(review): assumes row i of embeddings still belongs to documents[i],
# i.e. both files were written from the same run — confirm before indexing.
embeddings = np.load("embeddings.npy")

with open("documents.json", encoding="utf-8") as f:
    documents = json.load(f)

阶段 3:加分块

如果文档较长,不要整篇文章生成一个 embedding。

应该切成 chunks:

文档 → 分段 → 每段生成 embedding → 检索相关段落

一个简单 chunk 策略:

def chunk_text(text: str, chunk_size: int = 500, overlap: int = 80) -> list[str]:
    """Split ``text`` into fixed-size character windows that overlap.

    Args:
        text: The text to split.
        chunk_size: Maximum number of characters per chunk.
        overlap: Number of characters each chunk shares with the previous one.

    Returns:
        The chunks in document order; an empty list for empty text. The
        last chunk may be shorter than ``chunk_size``.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1 or ``overlap`` is
            not in the range ``0 <= overlap < chunk_size``.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size 必须大于 0")
    if not 0 <= overlap < chunk_size:
        # overlap >= chunk_size would stop the window from ever advancing;
        # a negative overlap would silently skip text between chunks.
        raise ValueError("overlap 必须满足 0 <= overlap < chunk_size")

    stride = chunk_size - overlap
    chunks: list[str] = []

    for start in range(0, len(text), stride):
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= len(text):
            # This window already reaches the end of the text; another one
            # would only repeat part of its tail.
            break

    return chunks

阶段 4:换成向量数据库

当数据量超过几十万条,NumPy 全量检索会变慢。

可以换成支持近似最近邻(ANN)检索的向量库或向量数据库,例如 FAISS、Chroma、Qdrant、Milvus、pgvector。

---

8. 应该记住的最小闭环

向量搜索最小闭环是:

文本
→ embedding
→ 向量归一化
→ 存入索引
→ 查询文本转向量
→ 查询向量归一化
→ 点积 / 余弦相似度
→ 排序
→ 返回 top-k

生产化之后再考虑:相似度阈值、持久化、文本分块,以及数据量变大后换用向量数据库。

先把这个最小 demo 跑通,再考虑工程化。