RAG架构和向量数据库的原始文件存储
RAG架构
RAG 有三种主流架构:
| 架构 | 描述 |
|---|---|
| 2-Step RAG | 先检索,再生成 |
| Agentic | RAG 使用 Agent 控制检索的时机与方式 |
| Hybrid RAG | 在 Agentic RAG 的基础上,增加用户 query 改写,确认召回文本的相关性等步骤 |
一定要看官方文档
2-Step RAG
在两步 RAG 中,检索步骤总是在生成步骤之前执行。这种架构简单且可预测,适用于许多应用,其中检索相关文档是生成答案的明确前提条件。
装饰器的写法:
from langchain.agents.middleware import dynamic_prompt, ModelRequest
@dynamic_prompt
def prompt_with_context(request: ModelRequest) -> str:
"""Inject context into state messages."""
last_query = request.state["messages"][-1].text
retrieved_docs = vector_store.similarity_search(last_query)
docs_content = "\n\n".join(doc.page_content for doc in retrieved_docs)
system_message = (
"You are a helpful assistant. Use the following context in your response:"
f"\n\n{docs_content}"
)
return system_message
agent = create_agent(model, tools=[], middleware=[prompt_with_context])
使用
query = "What is task decomposition?"
for step in agent.stream(
{"messages": [{"role": "user", "content": query}]},
stream_mode="values",
):
step["messages"][-1].pretty_print()
注意一下里面的stream_mode,values就等于普通agent invoke的result了。
输出:
================================ Human Message =================================
What is task decomposition?
================================== Ai Message ==================================
Task decomposition is...
新语法,可以进行流程控制了:
from typing import Any
from langchain_core.documents import Document
from langchain.agents.middleware import AgentMiddleware, AgentState
class State(AgentState):
context: list[Document]
class RetrieveDocumentsMiddleware(AgentMiddleware[State]):
state_schema = State
def before_model(self, state: AgentState) -> dict[str, Any] | None:
last_message = state["messages"][-1]
retrieved_docs = vector_store.similarity_search(last_message.text)
docs_content = "\n\n".join(doc.page_content for doc in retrieved_docs)
augmented_message_content = (
f"{last_message.text}\n\n"
"Use the following context to answer the query:\n"
f"{docs_content}"
)
return {
"messages": [last_message.model_copy(update={"content": augmented_message_content})],
"context": retrieved_docs,
}
agent = create_agent(
model,
tools=[],
middleware=[RetrieveDocumentsMiddleware()],
)
首先,不关注这个语法(用类而不是装饰器了),它送入的是实例,支持构造函数,还能改写state(增加了原文context到state中,显然可以传递到后面的流程去了)
Agentic RAG
代理式检索增强生成(RAG)结合了检索增强生成和基于代理的推理的优势。它不是在回答之前检索文档,而是一个代理(由 LLM 驱动)逐步推理,并在交互过程中决定何时以及如何检索信息。
Hybrid RAG
混合式 RAG 结合了 2-Step RAG 和 Agentic RAG 的特点。它引入了查询预处理、检索验证和生成后检查等中间步骤。这些系统比固定流程提供了更高的灵活性,同时仍保持对执行过程的控制。
注意
在官方教程中,使用init_chat_model比较多,它只代表一次对话,而三方教程基本上都是无脑create_agent,显然官网的更权威,它更能体验流程自控的思路,特别是建立graph的时候,每一个节点如果是一个agent,那将有太多不可控的。
备注一个知识眯,自行构建messages,下面的例子模拟了一次对话,一个tool_calls回应,及一次tools调用:
from langchain_core.messages import convert_to_messages
input = {
"messages": convert_to_messages(
[
{
"role": "user",
"content": "What does Lilian Weng say about types of reward hacking?",
},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "1",
"name": "retrieve_blog_posts",
"args": {"query": "types of reward hacking"},
}
],
},
{"role": "tool", "content": "meow", "tool_call_id": "1"},
]
)
}
grade_documents(input)
简单来说, 就是:
- user: 我说了句话
- assistant: 我发现可以调这个工具,参数是blabla
- tools: 这是结果
教程里大量这么用,其实相当于是“单元测试”,即写了个逻辑后,模拟一段上下文迅速验证一下,这是非常值得借鉴的一种方式,特别是你写的是一个个的小节点,就不需要串起来才能联调了。
tools_condition和ToolNode
看这个节点:
from langgraph.graph import MessagesState
from langchain.chat_models import init_chat_model
response_model = init_chat_model("gpt-4.1", temperature=0)
def generate_query_or_respond(state: MessagesState):
"""Call the model to generate a response based on the current state. Given
the question, it will decide to retrieve using the retriever tool, or simply respond to the user.
"""
response = (
response_model
.bind_tools([retriever_tool]).invoke(state["messages"])
)
return {"messages": [response]}
为一个chatmodel达到了一个查向量库的工具,在制图的代码里:
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode, tools_condition
workflow = StateGraph(MessagesState)
# Define the nodes we will cycle between
workflow.add_node(generate_query_or_respond)
workflow.add_node("retrieve", ToolNode([retriever_tool]))
workflow.add_node(rewrite_question)
workflow.add_node(generate_answer)
workflow.add_edge(START, "generate_query_or_respond")
# Decide whether to retrieve
workflow.add_conditional_edges(
"generate_query_or_respond",
# Assess LLM decision (call `retriever_tool` tool or respond to the user)
tools_condition,
{
# Translate the condition outputs to nodes in our graph
"tools": "retrieve",
END: END,
},
)
# Edges taken after the `action` node is called.
workflow.add_conditional_edges(
"retrieve",
# Assess agent decision
grade_documents,
)
workflow.add_edge("generate_answer", END)
workflow.add_edge("rewrite_question", "generate_query_or_respond")
# Compile
graph = workflow.compile()
问题:
- 为什么
generate_query_or_respond成了一个条件边节点?- 为什么
retriever_tool成了ToolNode?
这同样源于其它教程太懒,直接用agent来做节点,反而把基础给模糊了。大模型的本质,是一问一答,不要被bind_tools给误导了,bind了tools就能调工具,那已经是一个agent了。
- 所以,事实上一个bind了tools的chat model,只能要么直接答复,要么给你一个
tool_calls的消息。 - 所以这种节点,必然要对接一个条件边
- 从示例中可以看出,这个条件边叫:从我到tools_call,出路是(end和tools),这都是隐性的。
- 也所以,对于返回了
tool_calls的情况,就必须手动用ToolNode来处理,不然你的图就少了环节。
这种例子才是真正的手写代理。
文件存储
无论是向量检索还是分词检索,它们的核心使命都是定位。定位的结果就是一个 ID(像是一个门牌号)。至于这个门牌号后面住着谁(原文)、住在哪个小区(原文件路径),则由数据库的存储模块负责。
现代数据库设计的命脉:“索引归索引,存储归存储”。这种“先找 ID,再取内容”的策略,在计算机科学中被称为 “二级索引(Secondary Index)” 架构。
文件存储数据库
每一个成熟的向量数据库(如 Chroma都会内置一个“小型关系数据库”。这个数据库(通常是隐藏的 SQLite 或 Parquet)存了一张大表:
| 文档 ID (Key) | 原文内容 (Content) | 原文件物理路径 (Source Path) | 其他属性 (Metadata) |
|---|---|---|---|
001 | "当前的 Agent 能力..." | /Users/data/blog.pdf | page: 5, author: LC |
结论:检索是基于 ID 的,但 ID 指向了一个包含 路径 和 原文 的结构化对象。并不是“基于路径检索”,而是检索到 ID 后,反查出了路径。
检索器只能查出ID
- 倒排索引告诉你:词“Agent”出现在
ID: 001。 - 向量索引告诉你:你的问题与
ID: 001最像。 此时,系统只拿到了一个逻辑符号(ID)。
串起来
市面上大多数入门教程为了降低门槛,往往把 vector_store.similarity_search() 演示得像一个黑盒“魔法”:输入问题 掉出文字,事实上他们讲解的,仅仅是“掉出ID”,后面的部分是具体工程实现。
- 分词/向量 算出最相关的 ID。
- ID 去数据库的 Metadata 表 捞取数据。
- Metadata 表 吐出 原文内容(给 LLM)和 原文件路径(给用户看)。
关于清理
清理包括清理向量数据库,倒排索引表和文件存储数据库,及真实的物理文件。
在真实的生产环境(Production)中,清理向量数据库(Vector Store)的数据绝不是简单的 rm -rf,而是一套涉及生命周期管理 (TTL)、数据一致性和存储成本控制的工程体系。
以下是除了逻辑删除之外,在高级工程应用中常见的清理实践:
1. 基于 TTL(生存时间)的自动清理
在日志分析或实时客服 Agent 中,数据具有很强的时效性。
- 实现方式:在存储文档时,在
metadata中植入expire_at(过期时间戳)。 - 工程做法:启动一个后台“看门狗”服务(Cron Job),每隔一小时运行一次,调用数据库的删除接口:
# 伪代码:删除所有当前时间大于过期时间的数据
current_ts = time.time()
vector_store.delete(where={"expire_at": {"$lt": current_ts}})
- 优点:无需人工干预,存储空间始终维持在健康水平。
2. 引用计数与级联删除 (Reference Counting)
在复杂的 RAG 系统中,同一个原始文档可能被切分为几十个 Chunk(分块)。
- 工程风险:如果你只删除了向量库里的数据,而没删原始对象存储(如 S3)里的文件,或者反之,就会产生“孤儿数据”。
- 实践:建立一个中间层(Metadata DB,如 PostgreSQL)。
- 删除时,先在管理后台发起删除指令。
- 系统先去中间层查出该文档对应的所有
chunk_ids。 - 事务操作:同时删除向量库中的索引、DocStore 中的文本以及物理磁盘上的原文件。
3. 影子库切换与“蓝绿清理” (Shadow Indexing)
当你发现向量库目录因为长期的增删产生了严重的碎片(明明删了数据,磁盘占用却没减),或者你想更换 Embedding 模型时,会使用此方法。
- 步骤:
- 全量同步:在后台启动一个新的数据库实例(新目录)。
- 重新索引:从原始可靠数据源重新跑一遍 RAG 流程。
- 热切:代码逻辑修改指向新目录。
- 物理清理:直接删除旧的整个物理文件夹。
- 场景:这是最彻底的清理方式,也是模型升级时的标准动作。
4. 存储压缩与压实 (Compaction)
很多工业级向量数据库(如 Milvus, Weaviate)模仿了传统数据库的 LSM Tree 结构。
- 原理:当你调用
delete时,系统只是在数据上打了一个“墓碑标记”(Soft Delete),并没有真的擦除。 - 工程应用:
- Compact 触发:当“墓碑”占比超过 30% 时,手动或自动触发
Compact操作。 - 动作:数据库会将有效的数据重新排列、紧凑地写成新文件,并把包含大量空洞的旧文件物理删除。
- 作用:在不停止服务的情况下,显著降低磁盘占用。
5. 基于相似度的去重清理 (Deduping)
有时候垃圾不是因为“过期”,而是因为“重复”。
- 实践:在写入新数据前,先做一次 Top-1 检索。
- 阈值判断:如果发现最相似的结果分值(Score)高达 0.999,说明内容高度重复。
- 策略:
- 丢弃新数据(节省空间)。
- 或者更新旧数据的
last_seen时间戳(延长其 TTL)。
真实工程总结表
| 清理级别 | 常用手段 | 适用场景 |
|---|---|---|
| 行级 | vector_store.delete(ids=[...]) | 用户主动删除某条信息。 |
| 批次级 | 根据 metadata 过滤删除 | 清理特定日期、特定客户的数据。 |
| 物理级 | 蓝绿切换 / 目录重建 | 磁盘满额、模型升级、严重的逻辑碎片。 |
| 系统级 | 后台自动 Compaction | 企业级向量数据库的日常维护。 |
Backlinks