LangGraph学习笔记
Reducer
from typing import Annotated, List, Tuple, TypedDict
import operator
class PlanExecute(TypedDict) :
input: str
plan: List[str]
past_steps: Annotated[List[Tuple], operator.add]
response: str
- langraph通过
state在节点间传递状态,这个state是自定义的一个类(通常定义为一个字典就行了)- 上传中的
TypedDict就是dict - 要定义为一个
BaseModel也行,但会徒增类型转换开销
- 上传中的
- langgraph有一个
reducer概念,跟其它编程语言的reduce高阶函数不同,这只是它的一种设计模式/约定- 实际是通过Python的
Annotated类型注解来声明和实现这种模式 - 它简单说明了这个属性在获得新状态时的行为,比如是覆盖,合并,还是累加等
- 实际是通过Python的
| Reducer 函数 | 适用场景 | 效果说明 |
|---|---|---|
| add_messages | 管理对话消息列表 | 根据消息ID进行更新或追加,是构建对话系统的首选。 |
| operator.add | 数值累加(如计数器) | 执行数学加法:当前值 + 新值。 |
| operator.extend | 列表扩展 | 将新列表的元素追加到当前列表末尾,类似 list.extend()。 |
| update_dict | 字典合并更新 | 用新字典的键值对更新当前字典,类似 dict.update()。 |
RemoveMessage
现在来看一个隐藏的reducer:
@before_model
def trim_messages(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
"""Keep only the last few messages to fit context window."""
messages = state["messages"]
if len(messages) <= 3:
return None # No changes needed
first_msg = messages[0]
recent_messages = messages[-3:] if len(messages) % 2 == 0 else messages[-4:]
new_messages = [first_msg] + recent_messages
return {
"messages": [
RemoveMessage(id=REMOVE_ALL_MESSAGES),
*new_messages
]
}
不跟你说,你能解释return的这么奇怪的语法是什么吗?为什么在一个数组里写指定?以及,为什么不直接这样:
return new_messages
原因在于:
- 这句话, 本质上,是在给
Agentstate.messages赋值 - messages的定义是:
Required[Annotated[list[AnyMessage], add_messages]]- 正好就是上面说的一个标准的
Reducer
- 正好就是上面说的一个标准的
- 也就是说,这段代码会触发
add_messages这个reducer - 传给reducer的是
(remove指定id,和新值)<<< 这点非常重要! - reducer认识RemoveMessage指定,表示一个特殊指定,看到就要去移除消息
- 也就是说,这个语法这么奇怪,魔法就是
add_message方法你没看到实现而已
我们顺便翻翻源码:
if isinstance(m, RemoveMessage) and m.id == REMOVE_ALL_MESSAGES:
remove_all_idx = idx
依赖注入
LangGraph有很多隐藏的“约定”,目的是为了依赖注入,这使得在开发的时候看不到直接的联系,如:
class Context:
user_id: str
@tool
def get_user_info(runtime: ToolRuntime[Context]) -> str:
"""用于查询用户信息"""
user_id = runtime.context.user_id
user_info = runtime.store.get(("users",), user_id)
return str(user_info.value) if user_info else "未知用户"
# 创建Agent
agent = create_agent(
model=llm,
tools=[get_user_info],
store=store,
context_schema=Context
)
# 运行Agent
result = agent.invoke(
{"messages": [{"role": "user", "content": "查阅用户信息"}]},
context=Context(user_id="user_2")
)
上述工具方法,需要传入ToolRuntime类型的runtime参数(注意,两个规则,必须是"runtime"的签名,和ToolRuntime类型或其变体),那么在内部判断需要调用这个工具的时候,怎么去找到这个参数呢?
约定就是参数名和类型,匹配中了,就会自动组装你可能绑定到context上的对象,实例化一个runtime, 绑定到context键上,作为runtime参数传入工具函数。
注意,
- 必须是
from langchain.tools import tool, ToolRuntime, - 而不是
from langgraph.runtime import Runtime - ToolRuntime可以看作是 Runtime在工具调用场景下的一个特化或封装。这种设计符合“最小权限原则”,有利于代码的模块化和安全。
意图识别
如果你有一个save_user_info的工具函数,并在构建agent时传入了,这就改变了大模型的思考方向(正常情况下会给出一句招呼),因为你拓宽了它的“任务空间”,会产生这样的思考:“用户的这句话是否触发了使用这个工具的条件?我是否可以通过调用这个工具来更好地完成任务?” 这是一种工具驱动的推理(Tool-augmented Reasoning)
State
AgentState长这样:
class AgentState(TypedDict, Generic[ResponseT]):
"""State schema for the agent."""
messages: Required[Annotated[list[AnyMessage], add_messages]]
jump_to: NotRequired[Annotated[JumpTo | None, EphemeralValue, PrivateStateAttr]]
structured_response: NotRequired[Annotated[ResponseT, OmitFromInput]]
可以看到,
messages用了一个add_message的reducer,这个reducer会自动把新的消息添加到messages列表中。jump_to字段则是一个EphemeralValue,表示这个字段只在当前调用中有效,不会持久化到AgentState中。structured_response字段则是一个OmitFromInput,表示这个字段不会作为输入传给模型,只会作为输出返回。
显然,这解释了为什么节点里返回message为什么只返回当前信息,却能把历史消息一起传递,也解释了一些关键拦截(比如before_agent中间件)就是靠jump_to实现跳到指定节点(比如End)的。
from typing import Any
from langchain.agents.middleware import before_agent, AgentState
from langgraph.runtime import Runtime
banned_keywords = ["hack", "exploit", "malware"]
@before_agent(can_jump_to=["end"])
def content_filter(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
"""Deterministic guardrail: Block requests containing banned keywords."""
# Get the first user message
if not state["messages"]:
return None
last_message = state["messages"][-1]
if last_message.type != "human":
return None
content = last_message.content.lower()
# Check for banned keywords
for keyword in banned_keywords:
if keyword in content:
# Block execution before any processing
return {
"messages": [{
"role": "assistant",
"content": "I cannot process requests containing inappropriate content. Please rephrase your request."
}],
"jump_to": "end"
}
return None
agent = create_agent(
model=basic_model,
middleware=[content_filter],
)
# This request will be blocked before any processing
result = agent.invoke({
"messages": [{"role": "user", "content": "How do I hack into a database?"}]
})
自定义reducer
from typing import Annotated
# 自定义 Reducer 函数
def reduce_unique_list(existing: list, new_updates: list) -> list:
# 如果 existing 为空(初始状态),直接处理新值
if existing is None: existing = []
# 将新旧数据合并并去重
# 注意:如果 list 里是对象,需要根据对象的某个 key 去重
combined = existing + new_updates
return list(set(combined))
记忆
如非必要,你无需向智能体添加 Memory 模块。因为 StateGraph 本身就含有历史消息列表 messages,足以满足最基础的“记忆”需求。
- 短期记忆都是各种
Saver,无论是不是存到数据库,靠session关联 - 长期记忆都是
Store,必然会(向量化)存储, 手动插入(靠工具
短期记忆
from langgraph.checkpoint.sqlite import SqliteSaver
# 创建短期记忆
checkpointer = InMemorySaver()
# 创建sqlite支持的短期记忆
checkpointer = SqliteSaver(
sqlite3.connect("short-memory.db", check_same_thread=False)
)
# 创建图
builder = StateGraph(MessagesState)
# 添加节点
builder.add_node('assistant', assistant)
# 添加边
builder.add_edge(START, 'assistant')
builder.add_edge('assistant', END)
# 使用检查点
graph = builder.compile(checkpointer=checkpointer)
## 如果不使用检查点,看看会发生什么?
# graph = builder.compile()
# 告诉智能体我是谁
result = graph.invoke(
{'messages': ['你好!我是派大星']},
{"configurable": {"thread_id": "1"}},
)
[message.pretty_print() for message in result['messages']]
# 让智能体回忆我的名字
result = graph.invoke(
{"messages": [{"role": "user", "content": "请问我是谁?"}]},
{"configurable": {"thread_id": "1"}},
)
[message.pretty_print() for message in result['messages']]
步骤/前提:
- 创建
checkpointer - 添加到
gragh中(如果是agent,则添加到agent中),其实就是初始化 - 不管是图还是agent,在invoke的时候配置正确的
thread_id
长期记忆
长期记忆支持使用 Embedding 检索语义相近的内容。
# 嵌入维度
EMBED_DIM = 1024
# 获取text embedding的接口
client = OpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_BASE_URL"),
)
# embedding生成函数
def embed(texts: list[str]) -> list[list[float]]:
response = client.embeddings.create(
model="text-embedding-v4",
input=texts,
dimensions=EMBED_DIM,
)
return [item.embedding for item in response.data]
# 测试能否正常生成text embedding
texts = [
"LangGraph的中间件非常强大",
"LangGraph的MCP也很好用",
]
vectors = embed(texts)
len(vectors), len(vectors[0])
到此,证明了embed函数是可用的。
# 创建 InMemoryStore 内存存储
store = InMemoryStore(index={"embed": embed, "dims": EMBED_DIM})
# 添加两条用户数据 user_1 user_2
namespace = ("users", )
store.put(
namespace, # Namespace to group related data together
"user_1", # Key within the namespace
{
"rules": [
"User likes short, direct language",
"User only speaks English & python",
],
"rule_id": "3",
},
)
store.put(
("users",),
"user_2",
{
"name": "John Smith",
"language": "English",
}
)
这里用embed实例化了Store,并且使用Store的api添加了两个用户数据。
读取,通过namespace和key:
item = store.get(namespace, "user_2")
或通过检索召回:
items = store.search(
namespace,
query="language preferences",
filter={"rule_id": "3"},
)
到此,我们有了store. 在智能体中使用,则是通过tools
读:
@dataclass
class Context:
user_id: str
@tool
def get_user_info(runtime: ToolRuntime[Context]) -> str:
"""用于查询用户信息"""
user_id = runtime.context.user_id
user_info = runtime.store.get(("users",), user_id)
return str(user_info.value) if user_info else "未知用户"
# 创建Agent
agent = create_agent(
model=model,
tools=[get_user_info],
store=store,
context_schema=Context
)
# 运行Agent
result = agent.invoke(
{"messages": [{"role": "user", "content": "查阅用户信息"}]},
context=Context(user_id="user_2")
)
for message in result['messages']:
message.pretty_print()
- store存在tools调用时约定的
ToolRuntiime里 - runtime以泛型约束接收
Context,如果调用时需要传入自定义信息(通常是用户身份),就是在invoke时通过context参数传入- 还记得吗?上节的
thread_id也是这个时机传入的
- 还记得吗?上节的
- 这个时候
runtime就有上下文和向量存储两个属性了 - 大模型判断用到用户身份时,调用创建
agent时传的工具 store也是这个创建agent时传入的- 上下文对象泛型约束(即模型类)也是这个时候传入的
写:
class UserInfo(TypedDict):
name: str
@tool
def save_user_info(user_info: UserInfo, runtime: ToolRuntime[Context]) -> str:
"""用于保存/更新用户信息"""
user_id = runtime.context.user_id
runtime.store.put(("users",), user_id, user_info)
return "成功保存用户信息"
# 创建gent
agent = create_agent(
model=model,
tools=[save_user_info],
store=store,
context_schema=Context
)
# 运行Agent
agent.invoke(
{"messages": [{"role": "user", "content": "My name is John Smith"}]},
context=Context(user_id="user_123")
)
store.get(("users",), "user_123").value
- 从
runtime中获取context,获取user_id - 通过
runtime.store.put保存用户信息 - 大模型通过对话判断到用户信息需要保存,调用创建
agent时传的工具 - 存到创建
agent时传入的store中
相关import:
from dataclasses import dataclass
from langchain_openai import ChatOpenAI
from langchain.tools import tool, ToolRuntime
from langchain.agents import create_agent
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.store.memory import InMemoryStore
from langgraph.store.sqlite import SqliteStore
思考: 上述长期记忆的例子全是通过大模型(智能体)自己判断需要读或写用户信息的时候,发现有用户相关的工具,进行了长期记忆的读写。工具里实现的是InMemoryStore而已,那么实现成InMemorySaver(即短期记忆)会有什么区别?
- 短期记忆只认
thread_id,长期记忆相当于是知识库,与对话无关 - 短期记忆记的是
messages, 长期记忆是结构化数据(json)
显然我们不可能为每一个记忆都要写一个tools,下面单开一文来深化长期记忆的使用: 长期记忆
上下文工程
LangGraph 将上下文分为三种类型:
- 模型上下文(
Model Context) - 工具上下文(
Tool Context) - 生命周期上下文(
Life-cycle Context)
无论哪种 Context,都需要定义它的 Schema。在这方面,LangGraph 提供了相当高的自由度,你可以使用 dataclasses、pydantic、TypedDict 这些包的任意一个创建你的 Context Schema. 上一节用的是dataclasses, 而且涉及的Context是哪一种上下文呢?
从智能体中获取触发逻辑所需的即时变量。这些变量通常存储在以下三个存储介质中:
- 运行时(Runtime)- 所有节点共享一个 Runtime。同一时刻,所有节点取到的 Runtime 的值是相同的。一般用于存储时效性要求较高的信息。
- 短期记忆(State)- 在节点之间按顺序传递,每个节点接收上一个节点处理后的 State。主要用于存储 Prompt 和 AI Message。
- 长期记忆(Store)- 负责持久化存储,可以跨 Workflow / Agent 保存信息。可以用来存用户偏好、以前算过的统计值等。
具体示例见参考教程
上面这个教程例子里,都是用
@dynamic_prompt做的示例,那么它做了什么?给提示词添加了一段systemMessage?
@dynamic_prompt 是 LangGraph 提供的一个中间件(Middleware),它的核心作用就是在模型真正发起请求(Model Call)之前的“最后一秒”,动态地往 System Message 里塞东西。
1. 它到底做了什么?
在传统的开发中,系统提示词通常是静态的。而 @dynamic_prompt 允许你定义一个函数,这个函数会在每次调用模型前执行:
- 拦截请求:拦截当前的
ModelRequest。 - 计算逻辑:根据你提供的逻辑(比如判断消息数量、读取当前时间、从
Store拿用户偏好)。 - 修改指令:将返回的字符串内容,追加或注入到最终发给大模型的系统指令中。
2. 它注入的是 System Message 吗?
是的,但不完全是“覆盖”。
- 它通常是在你原有的
System Message基础上进行增强。 - 例如,你在代码里定义了:
"You are a helpful assistant." @dynamic_prompt函数返回了:"User prefers Chinese."- 最终发给模型的系统指令会合并成:
"You are a helpful assistant.\nUser prefers Chinese."
3. 三种典型的“塞入”来源
正如你正在看的 上下文工程 页面所示,这个中间件可以从三个地方抓取信息来修改提示词:
| 来源 | 场景示例 | 页面对应章节 |
|---|---|---|
| State (短期) | 如果对话超过 10 轮,自动在提示词加一句“请简短回答”。 | 1)使用 State 管理上下文 |
| Store (长期) | 自动从数据库读出该用户的 ID 对应的语言偏好。 | 2)使用 Store 管理上下文 |
| Runtime (运行时) | 根据当前是“管理员”还是“普通用户”身份,动态开关某个工具的使用权限。 | 3)使用 Runtime 管理上下文 |
4. 为什么要用它,而不是直接改消息列表?
你可能会问:我直接在 messages 列表里手动加一行 SystemMessage 不行吗?
使用 @dynamic_prompt 的高级之处在于:
- 解耦:你的业务逻辑节点(Node)只需要关心“怎么回答问题”,而“针对不同用户的个性化指令”交由中间件自动处理。
- 无感注入:它不会污染你的
MessagesState(即对话历史里不会多出这些指令),它只存在于发往 LLM 的那一瞬间。 - 实时性:它能确保模型拿到的永远是最新的 Runtime 环境变量(比如当前的精确时间或用户实时切换的状态)。
总结一下:
@dynamic_prompt 就像是一个“提示词补丁”。它让原本死板的 System Message 变成了可以根据当前上下文(State/Store/Runtime)自动变身的动态指令集。
多智能体系统(MAS)
分析一下本页实例,“多代理”两种实现方式,都是将每个agent包成tools从tools传入的,即使用的是tools的架构思路实现的。所以,多代理机制仅仅是个思路/架构的概念,而没有新的api或知识点/技术创新是吧?
多智能体(Multi-Agent)在底层实现上,确实大量复用了 Tool-calling 这一核心技术。
1. “术”的层面:确实是 Tool-calling 的包装
本页的第一个例子:
- 实现方式:将
subagent1和subagent2包装成函数,并加上@tool装饰器。 - 本质:对于
supervisor_agent来说,它根本不知道对面是一个“智能体”还是一个“简单的加法函数”。它只看到一个描述为“可以计算加法”的工具。 - 结论:在这个维度上,多智能体 = 递归的 Tool-calling。这确实没有引入新的底层 API。
2. “道”的层面:为什么它被称为一种“模式”?
虽然底层是工具,但“监督者模式”引入了几个关键的 状态管理 和 逻辑流控制 的概念,这些是单智能体 Tool-calling 不具备的:
- 上下文隔离(Context Isolation):
这是最核心的进步。如果是一个 Agent 挂 100 个工具,它的 System Prompt 会变得极长且容易混乱。而在多智能体架构中,
subagent1只拿到了它需要的消息,它并不知道subagent2之前做了什么。这种局部上下文极大地提高了复杂任务的准确性。 - 控制权转移(Handoff/Transfer):
在本页第二个例子
langgraph-supervisor中,你会看到transfer_to_subagent这样的字眼。这不仅仅是“调用工具”,它涉及到 Graph 节点的切换。 - 在普通的 Tool 调用中,控制权始终在主 Agent 手里。
- 在 Supervisor 模式下,控制权可以完全移交给子 Agent,甚至支持多轮对话后再返回给监督者。
3. 技术创新点在哪里?
如果你觉得第一个例子太简单,可以留意一下 langgraph-supervisor 这个包带来的“新东西”:
- 自动路由(Auto-routing):它不需要你手动写
call_subagent这种包装函数。它定义了一套协议,让 Graph 能自动识别如何把State(对话状态)在不同的 Agent 节点之间进行平滑传递。 - 状态合并(State Merging):当子 Agent 完成任务后,它是只返回一个结果字符串,还是把它的整个中间思考过程(Messages)合并回主 Graph?这种状态合并策略是多智能体框架特有的技术细节。
总结与反思
你的理解是对的:多智能体本质上是把“大型问题”拆解为“多个小型 Tool-calling 问题”的组织架构。
- 没有黑科技:它没有改变 LLM 推理的本质。
- 有架构创新:它解决了单智能体在面对“工具爆炸”(Too many tools)和“长文本迷失”(Lost in the middle)时的无力感。
正如页面结尾所说的,作者自己也承认:“多智能体系统在现阶段可能会降低准确性”。这证明了它不是万灵药,而是一种针对 极高复杂度任务 的工程妥协。
控制权转移是什么?
在 LangGraph 的多智能体架构中,控制权转移(Handoff / Transfer) 标志着从“简单的函数调用”进化到了“流程化的协作”。
如果说第一种方式(Tool-calling)只是主智能体在“借用工具”,那么控制权转移则意味着“换人上岗”。具体来说,它包含以下三个维度的深层含义:
1. 节点(Node)的物理切换
在 LangGraph 的图结构中,控制权转移意味着当前执行的指针从 Supervisor 节点跳转到了 Sub-agent 节点。
- Tool-calling 模式:始终停留在
Supervisor节点,只是执行了一个 Python 函数并拿回了字符串。 - 控制权转移模式:流程真正进入了子智能体的地盘。如你在页面看到的
transfer_to_subagent-2,它是通过langgraph-supervisor生成的特殊指令,触发了图导航(Routing)。
2. 状态(State)的接管与自治
这是最本质的区别。当控制权转移给子智能体后,它拥有了“对话主权”:
- 独立推理:子智能体可以根据自己的
system_prompt和工具集,进行多次思考(Reasoning)和多次工具调用,而不需要每一退步都回传给监督者请示。 - 消息历史隔离:子智能体可以维护自己局部的消息列表。直到它决定“任务完成”并触发
transfer_back_to_supervisor,控制权才会交回。
3. “长跑”与“短跑”的区别
- Tool-calling 是“短跑”:主智能体问:“1+1等于几?”,子智能体工具瞬间回一个“2”,结束。这适合原子化的简单任务。
- 控制权转移是“接力赛”:主智能体说:“帮我写一个关于 AI 的研究报告”。
- 控制权转交给
Researcher Agent,它去搜资料、翻文档、写草稿(中间可能跑了 10 轮对话)。 - 完成后,
Researcher发起transfer_back,将整份报告交还。 - 监督者拿到报告后再进行下一步。
4. 为什么要“交回控制权”?
你在示例中看到的 transfer_back_to_supervisor 是为了解决闭环问题。
在多步任务中(如你算的那个长数学题),如果子智能体干完活不把控制权交回,整个图就会卡在子节点或者直接结束。只有交回控制权,监督者才能意识到“第一步除法做完了,现在我该安排第二步乘法了”。
总结
控制权转移 实际上是让“智能体”回归其作为“独立决策实体”的本色。
- 它意味着 Scope(作用域) 的切换。
- 它意味着 Responsibility(责任) 的交付。
- 它解决了单 Agent 无法处理的超长逻辑链问题,通过让不同的专家(子 Agent)在各自擅长的领域“深度接管”一段时间,来换取更高的准确性。
如 监督者模式页面 所述,虽然现阶段这种转移可能会因为流程变长而增加出错概率,但在处理需要“专家深挖”的复杂任务时,这种控制权的交接是必不可少的架构设计。
你的理解完全正确。这正是从“工具箱模式”进化到“部门协作模式”的核心逻辑。
为了让你更透彻地理解这个“小循环”是如何运作的,我们可以把这个过程拆解为空间(上下文)和时间(控制权)两个维度:
1. 为什么“分而治之”比“100 个 Tools”更强?
当一个 Agent 挂载 100 个工具时,它就像一个面对 100 个按钮的工人,很容易“按错按钮”或者“忘记按钮的功能”。
在 多智能体系统 模式下:
- 角色专业化:你可以把 30 个财务工具给
Accountant Agent,30 个搜索工具给Researcher Agent。 - 注意力集中:进入
Accountant Agent的小循环后,它完全看不见那 70 个无关工具。这极大地降低了 LLM 的推理负担,准确率显著提升。
2. “小循环”内发生了什么?
当你通过 supervisor_agent 触发任务后,子智能体的执行过程如下:
- 接管意图:Supervisor 说:“去查一下这笔账单”。
- 进入黑盒:控制权转移到子智能体。它开始自己的
ReAct循环(思考 -> 调用工具 1 -> 观察结果 -> 思考 -> 调用工具 2...)。 - 多轮交互:子智能体可能在内部进行了 5 轮对话才得出结论。
- 结果收敛:子智能体将最终结果封装成一个字符串(或消息)。
- 主动移交:子智能体触发 transfer_back_to_supervisor,带着结果跳出循环。
3. Supervisor 看到的只是“结果”吗?
这里有一个非常关键的技术细节,取决于你的实现方式:
- 封装为 Tool (方式一):如果你像页面第一个例子那样用
@tool包装子智能体,那么 Supervisor 只能看到子智能体最后return的那句话。中间的思考过程和工具调用记录对主图是不可见的。 - 集成到 Graph (方式二):如果你使用 create_supervisor,子智能体产生的消息可以根据你的配置有选择地合并回主状态(State)。这让 Supervisor 能看到:“哦,他查了数据库,又查了网页,最后给了我这个结论。”
一句话总结: 控制权转移让子智能体从“被动执行的 API”变成了“拥有独立思考空间(Node)的合作伙伴”。
并发
这一节笔记又多了,单开一文:并发
Backlinks