RAG架构和向量数据库的原始文件存储

RAG架构

RAG 有三种主流架构:

架构描述
2-Step RAG先检索,再生成
AgenticRAG 使用 Agent 控制检索的时机与方式
Hybrid RAG在 Agentic RAG 的基础上,增加用户 query 改写,确认召回文本的相关性等步骤

一定要看官方文档

2-Step RAG

在两步 RAG 中,检索步骤总是在生成步骤之前执行。这种架构简单且可预测,适用于许多应用,其中检索相关文档是生成答案的明确前提条件。

装饰器的写法:

from langchain.agents.middleware import dynamic_prompt, ModelRequest

@dynamic_prompt
def prompt_with_context(request: ModelRequest) -> str:
    """Inject context into state messages."""
    last_query = request.state["messages"][-1].text
    retrieved_docs = vector_store.similarity_search(last_query)

    docs_content = "\n\n".join(doc.page_content for doc in retrieved_docs)

    system_message = (
        "You are a helpful assistant. Use the following context in your response:"
        f"\n\n{docs_content}"
    )

    return system_message


agent = create_agent(model, tools=[], middleware=[prompt_with_context])

使用

query = "What is task decomposition?"
for step in agent.stream(
    {"messages": [{"role": "user", "content": query}]},
    stream_mode="values",
):
    step["messages"][-1].pretty_print()

注意一下里面的stream_modevalues就等于普通agent invoke的result了。 输出:

================================ Human Message =================================

What is task decomposition?
================================== Ai Message ==================================

Task decomposition is...

新语法,可以进行流程控制了:

from typing import Any
from langchain_core.documents import Document
from langchain.agents.middleware import AgentMiddleware, AgentState


class State(AgentState):
    context: list[Document]


class RetrieveDocumentsMiddleware(AgentMiddleware[State]):
    state_schema = State

    def before_model(self, state: AgentState) -> dict[str, Any] | None:
        last_message = state["messages"][-1]
        retrieved_docs = vector_store.similarity_search(last_message.text)

        docs_content = "\n\n".join(doc.page_content for doc in retrieved_docs)

        augmented_message_content = (
            f"{last_message.text}\n\n"
            "Use the following context to answer the query:\n"
            f"{docs_content}"
        )
        return {
            "messages": [last_message.model_copy(update={"content": augmented_message_content})],
            "context": retrieved_docs,
        }


agent = create_agent(
    model,
    tools=[],
    middleware=[RetrieveDocumentsMiddleware()],
)

首先,不关注这个语法(用类而不是装饰器了),它送入的是实例,支持构造函数,还能改写state(增加了原文context到state中,显然可以传递到后面的流程去了)

Agentic RAG

代理式检索增强生成(RAG)结合了检索增强生成和基于代理的推理的优势。它不是在回答之前检索文档,而是一个代理(由 LLM 驱动)逐步推理,并在交互过程中决定何时以及如何检索信息。

Hybrid RAG

混合式 RAG 结合了 2-Step RAG 和 Agentic RAG 的特点。它引入了查询预处理、检索验证和生成后检查等中间步骤。这些系统比固定流程提供了更高的灵活性,同时仍保持对执行过程的控制。

注意 在官方教程中,使用init_chat_model比较多,它只代表一次对话,而三方教程基本上都是无脑create_agent,显然官网的更权威,它更能体验流程自控的思路,特别是建立graph的时候,每一个节点如果是一个agent,那将有太多不可控的。

备注一个知识眯,自行构建messages,下面的例子模拟了一次对话,一个tool_calls回应,及一次tools调用:

from langchain_core.messages import convert_to_messages

input = {
    "messages": convert_to_messages(
        [
            {
                "role": "user",
                "content": "What does Lilian Weng say about types of reward hacking?",
            },
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {
                        "id": "1",
                        "name": "retrieve_blog_posts",
                        "args": {"query": "types of reward hacking"},
                    }
                ],
            },
            {"role": "tool", "content": "meow", "tool_call_id": "1"},
        ]
    )
}
grade_documents(input)

简单来说, 就是:

  • user: 我说了句话
  • assistant: 我发现可以调这个工具,参数是blabla
  • tools: 这是结果

教程里大量这么用,其实相当于是“单元测试”,即写了个逻辑后,模拟一段上下文迅速验证一下,这是非常值得借鉴的一种方式,特别是你写的是一个个的小节点,就不需要串起来才能联调了。

tools_condition和ToolNode

看这个节点:

from langgraph.graph import MessagesState
from langchain.chat_models import init_chat_model

response_model = init_chat_model("gpt-4.1", temperature=0)


def generate_query_or_respond(state: MessagesState):
    """Call the model to generate a response based on the current state. Given
    the question, it will decide to retrieve using the retriever tool, or simply respond to the user.
    """
    response = (
        response_model
        .bind_tools([retriever_tool]).invoke(state["messages"])
    )
    return {"messages": [response]}

为一个chatmodel达到了一个查向量库的工具,在制图的代码里:

from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode, tools_condition

workflow = StateGraph(MessagesState)

# Define the nodes we will cycle between
workflow.add_node(generate_query_or_respond)
workflow.add_node("retrieve", ToolNode([retriever_tool]))
workflow.add_node(rewrite_question)
workflow.add_node(generate_answer)

workflow.add_edge(START, "generate_query_or_respond")

# Decide whether to retrieve
workflow.add_conditional_edges(
    "generate_query_or_respond",
    # Assess LLM decision (call `retriever_tool` tool or respond to the user)
    tools_condition,
    {
        # Translate the condition outputs to nodes in our graph
        "tools": "retrieve",
        END: END,
    },
)

# Edges taken after the `action` node is called.
workflow.add_conditional_edges(
    "retrieve",
    # Assess agent decision
    grade_documents,
)
workflow.add_edge("generate_answer", END)
workflow.add_edge("rewrite_question", "generate_query_or_respond")

# Compile
graph = workflow.compile()

问题:

  1. 为什么generate_query_or_respond成了一个条件边节点?
  2. 为什么retriever_tool成了ToolNode

这同样源于其它教程太懒,直接用agent来做节点,反而把基础给模糊了。大模型的本质,是一问一答,不要被bind_tools给误导了,bind了tools就能调工具,那已经是一个agent了。

  • 所以,事实上一个bind了tools的chat model,只能要么直接答复,要么给你一个tool_calls的消息。
  • 所以这种节点,必然要对接一个条件边
    • 从示例中可以看出,这个条件边叫:从我到tools_call,出路是(end和tools),这都是隐性的。
  • 也所以,对于返回了tool_calls的情况,就必须手动用ToolNode来处理,不然你的图就少了环节。

这种例子才是真正的手写代理。

文件存储

无论是向量检索还是分词检索,它们的核心使命都是定位。定位的结果就是一个 ID(像是一个门牌号)。至于这个门牌号后面住着谁(原文)、住在哪个小区(原文件路径),则由数据库的存储模块负责。

现代数据库设计的命脉:“索引归索引,存储归存储”。这种“先找 ID,再取内容”的策略,在计算机科学中被称为 “二级索引(Secondary Index)” 架构。

文件存储数据库

每一个成熟的向量数据库(如 Chroma都会内置一个“小型关系数据库”。这个数据库(通常是隐藏的 SQLite 或 Parquet)存了一张大表:

文档 ID (Key)原文内容 (Content)原文件物理路径 (Source Path)其他属性 (Metadata)
001"当前的 Agent 能力..."/Users/data/blog.pdfpage: 5, author: LC

结论:检索是基于 ID 的,但 ID 指向了一个包含 路径原文 的结构化对象。并不是“基于路径检索”,而是检索到 ID 后,反查出了路径。

检索器只能查出ID

  • 倒排索引告诉你:词“Agent”出现在 ID: 001
  • 向量索引告诉你:你的问题与 ID: 001 最像。 此时,系统只拿到了一个逻辑符号(ID)。

串起来

市面上大多数入门教程为了降低门槛,往往把 vector_store.similarity_search() 演示得像一个黑盒“魔法”:输入问题 \rightarrow 掉出文字,事实上他们讲解的,仅仅是“掉出ID”,后面的部分是具体工程实现。

  1. 分词/向量 \rightarrow 算出最相关的 ID
  2. ID \rightarrow 去数据库的 Metadata 表 捞取数据。
  3. Metadata 表 \rightarrow 吐出 原文内容(给 LLM)和 原文件路径(给用户看)。

关于清理

清理包括清理向量数据库,倒排索引表和文件存储数据库,及真实的物理文件。

在真实的生产环境(Production)中,清理向量数据库(Vector Store)的数据绝不是简单的 rm -rf,而是一套涉及生命周期管理 (TTL)数据一致性存储成本控制的工程体系。

以下是除了逻辑删除之外,在高级工程应用中常见的清理实践:


1. 基于 TTL(生存时间)的自动清理

在日志分析或实时客服 Agent 中,数据具有很强的时效性。

  • 实现方式:在存储文档时,在 metadata 中植入 expire_at(过期时间戳)。
  • 工程做法:启动一个后台“看门狗”服务(Cron Job),每隔一小时运行一次,调用数据库的删除接口:
# 伪代码:删除所有当前时间大于过期时间的数据
current_ts = time.time()
vector_store.delete(where={"expire_at": {"$lt": current_ts}})

  • 优点:无需人工干预,存储空间始终维持在健康水平。

2. 引用计数与级联删除 (Reference Counting)

在复杂的 RAG 系统中,同一个原始文档可能被切分为几十个 Chunk(分块)

  • 工程风险:如果你只删除了向量库里的数据,而没删原始对象存储(如 S3)里的文件,或者反之,就会产生“孤儿数据”。
  • 实践:建立一个中间层(Metadata DB,如 PostgreSQL)。
  • 删除时,先在管理后台发起删除指令。
  • 系统先去中间层查出该文档对应的所有 chunk_ids
  • 事务操作:同时删除向量库中的索引、DocStore 中的文本以及物理磁盘上的原文件。

3. 影子库切换与“蓝绿清理” (Shadow Indexing)

当你发现向量库目录因为长期的增删产生了严重的碎片(明明删了数据,磁盘占用却没减),或者你想更换 Embedding 模型时,会使用此方法。

  • 步骤
  1. 全量同步:在后台启动一个新的数据库实例(新目录)。
  2. 重新索引:从原始可靠数据源重新跑一遍 RAG 流程
  3. 热切:代码逻辑修改指向新目录。
  4. 物理清理:直接删除旧的整个物理文件夹。
  • 场景:这是最彻底的清理方式,也是模型升级时的标准动作。

4. 存储压缩与压实 (Compaction)

很多工业级向量数据库(如 Milvus, Weaviate)模仿了传统数据库的 LSM Tree 结构。

  • 原理:当你调用 delete 时,系统只是在数据上打了一个“墓碑标记”(Soft Delete),并没有真的擦除。
  • 工程应用
  • Compact 触发:当“墓碑”占比超过 30% 时,手动或自动触发 Compact 操作。
  • 动作:数据库会将有效的数据重新排列、紧凑地写成新文件,并把包含大量空洞的旧文件物理删除。
  • 作用:在不停止服务的情况下,显著降低磁盘占用。

5. 基于相似度的去重清理 (Deduping)

有时候垃圾不是因为“过期”,而是因为“重复”。

  • 实践:在写入新数据前,先做一次 Top-1 检索
  • 阈值判断:如果发现最相似的结果分值(Score)高达 0.999,说明内容高度重复。
  • 策略
  • 丢弃新数据(节省空间)。
  • 或者更新旧数据的 last_seen 时间戳(延长其 TTL)。

真实工程总结表

清理级别常用手段适用场景
行级vector_store.delete(ids=[...])用户主动删除某条信息。
批次级根据 metadata 过滤删除清理特定日期、特定客户的数据。
物理级蓝绿切换 / 目录重建磁盘满额、模型升级、严重的逻辑碎片。
系统级后台自动 Compaction企业级向量数据库的日常维护。

Backlinks