LangGraph学习笔记

Reducer

from typing import Annotated, List, Tuple, TypedDict
import operator

class PlanExecute(TypedDict) :
    input: str
    plan: List[str]
    past_steps: Annotated[List[Tuple], operator.add]
    response: str
  • langraph通过state在节点间传递状态,这个state是自定义的一个类(通常定义为一个字典就行了)
    • 上传中的TypedDict就是dict
    • 要定义为一个BaseModel也行,但会徒增类型转换开销
  • langgraph有一个reducer概念,跟其它编程语言的reduce高阶函数不同,这只是它的一种设计模式/约定
    • 实际是通过Python的Annotated类型注解来声明和实现这种模式
    • 它简单说明了这个属性在获得新状态时的行为,比如是覆盖,合并,还是累加等
Reducer 函数适用场景效果说明
add_messages​管理对话消息列表根据消息ID进行更新或追加,是构建对话系统的首选。
operator.add​数值累加(如计数器)执行数学加法:当前值 + 新值。
operator.extend​列表扩展将新列表的元素追加到当前列表末尾,类似 list.extend()。
update_dict​字典合并更新用新字典的键值对更新当前字典,类似 dict.update()。

RemoveMessage

现在来看一个隐藏的reducer:

@before_model
def trim_messages(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    """Keep only the last few messages to fit context window."""
    messages = state["messages"]

    if len(messages) <= 3:
        return None  # No changes needed

    first_msg = messages[0]
    recent_messages = messages[-3:] if len(messages) % 2 == 0 else messages[-4:]
    new_messages = [first_msg] + recent_messages

    return {
        "messages": [
            RemoveMessage(id=REMOVE_ALL_MESSAGES),
            *new_messages
        ]
    }

不跟你说,你能解释return的这么奇怪的语法是什么吗?为什么在一个数组里写指定?以及,为什么不直接这样:

return new_messages

原因在于:

  • 这句话, 本质上,是在给Agentstate.messages赋值
  • messages的定义是: Required[Annotated[list[AnyMessage], add_messages]]
    • 正好就是上面说的一个标准的Reducer
  • 也就是说,这段代码会触发add_messages这个reducer
  • 传给reducer的是(remove指定id,和新值) <<< 这点非常重要
  • reducer认识RemoveMessage指定,表示一个特殊指定,看到就要去移除消息
  • 也就是说,这个语法这么奇怪,魔法就是add_message方法你没看到实现而已

我们顺便翻翻源码:

if isinstance(m, RemoveMessage) and m.id == REMOVE_ALL_MESSAGES:
    remove_all_idx = idx

依赖注入

LangGraph有很多隐藏的“约定”,目的是为了依赖注入,这使得在开发的时候看不到直接的联系,如:

class Context:
    user_id: str

@tool
def get_user_info(runtime: ToolRuntime[Context]) -> str:
    """用于查询用户信息"""
    user_id = runtime.context.user_id
    user_info = runtime.store.get(("users",), user_id) 
    return str(user_info.value) if user_info else "未知用户"

# 创建Agent
agent = create_agent(
    model=llm,
    tools=[get_user_info],
    store=store, 
    context_schema=Context
)

# 运行Agent
result = agent.invoke(
    {"messages": [{"role": "user", "content": "查阅用户信息"}]},
    context=Context(user_id="user_2") 
)

上述工具方法,需要传入ToolRuntime类型的runtime参数(注意,两个规则,必须是"runtime"的签名,和ToolRuntime类型或其变体),那么在内部判断需要调用这个工具的时候,怎么去找到这个参数呢?

flowchart TD A[开发者调用agent.invoke] --> B[传入context参数] B --> C[LangGraph构建运行时环境] C --> D[智能体决策调用工具] D --> E[框架分析工具函数签名] E --> F[识别ToolRuntime参数需求] F --> G[自动注入运行时实例为工具入参] G --> H[工具函数执行] H --> I[返回结果给智能体]

约定就是参数名和类型,匹配中了,就会自动组装你可能绑定到context上的对象,实例化一个runtime, 绑定到context键上,作为runtime参数传入工具函数。

注意,

  • 必须是from langchain.tools import tool, ToolRuntime,
  • 而不是from langgraph.runtime import Runtime
  • ToolRuntime可以看作是 Runtime在工具调用场景下的一个特化或封装。这种设计符合“最小权限原则”,有利于代码的模块化和安全。

意图识别

如果你有一个save_user_info的工具函数,并在构建agent时传入了,这就改变了大模型的思考方向(正常情况下会给出一句招呼),因为你拓宽了它的“任务空间”,会产生这样的思考:“用户的这句话是否触发了使用这个工具的条件?我是否可以通过调用这个工具来更好地完成任务?” 这是一种工具驱动的推理(Tool-augmented Reasoning

State

AgentState长这样:

class AgentState(TypedDict, Generic[ResponseT]):
    """State schema for the agent."""

    messages: Required[Annotated[list[AnyMessage], add_messages]]
    jump_to: NotRequired[Annotated[JumpTo | None, EphemeralValue, PrivateStateAttr]]
    structured_response: NotRequired[Annotated[ResponseT, OmitFromInput]]

可以看到,

  • messages用了一个add_message的reducer,这个reducer会自动把新的消息添加到messages列表中。
  • jump_to字段则是一个EphemeralValue,表示这个字段只在当前调用中有效,不会持久化到AgentState中。
  • structured_response字段则是一个OmitFromInput,表示这个字段不会作为输入传给模型,只会作为输出返回。

显然,这解释了为什么节点里返回message为什么只返回当前信息,却能把历史消息一起传递,也解释了一些关键拦截(比如before_agent中间件)就是靠jump_to实现跳到指定节点(比如End)的。

from typing import Any

from langchain.agents.middleware import before_agent, AgentState
from langgraph.runtime import Runtime

banned_keywords = ["hack", "exploit", "malware"]

@before_agent(can_jump_to=["end"])
def content_filter(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    """Deterministic guardrail: Block requests containing banned keywords."""
    # Get the first user message
    if not state["messages"]:
        return None

    last_message = state["messages"][-1]
    if last_message.type != "human":
        return None

    content = last_message.content.lower()

    # Check for banned keywords
    for keyword in banned_keywords:
        if keyword in content:
            # Block execution before any processing
            return {
                "messages": [{
                    "role": "assistant",
                    "content": "I cannot process requests containing inappropriate content. Please rephrase your request."
                }],
                "jump_to": "end"
            }

    return None

agent = create_agent(
    model=basic_model,
    middleware=[content_filter],
)

# This request will be blocked before any processing
result = agent.invoke({
    "messages": [{"role": "user", "content": "How do I hack into a database?"}]
})

自定义reducer

from typing import Annotated

# 自定义 Reducer 函数
def reduce_unique_list(existing: list, new_updates: list) -> list:
    # 如果 existing 为空(初始状态),直接处理新值
    if existing is None: existing = []
    # 将新旧数据合并并去重
    # 注意:如果 list 里是对象,需要根据对象的某个 key 去重
    combined = existing + new_updates
    return list(set(combined))

记忆

如非必要,你无需向智能体添加 Memory 模块。因为 StateGraph 本身就含有历史消息列表 messages,足以满足最基础的“记忆”需求。

  • 短期记忆都是各种Saver,无论是不是存到数据库,靠session关联
  • 长期记忆都是Store,必然会(向量化)存储, 手动插入(靠工具

短期记忆

from langgraph.checkpoint.sqlite import SqliteSaver

# 创建短期记忆
checkpointer = InMemorySaver()
# 创建sqlite支持的短期记忆
checkpointer = SqliteSaver(
    sqlite3.connect("short-memory.db", check_same_thread=False)
)

# 创建图
builder = StateGraph(MessagesState)

# 添加节点
builder.add_node('assistant', assistant)

# 添加边
builder.add_edge(START, 'assistant')
builder.add_edge('assistant', END)

# 使用检查点
graph = builder.compile(checkpointer=checkpointer)

## 如果不使用检查点,看看会发生什么? 
# graph = builder.compile()

# 告诉智能体我是谁
result = graph.invoke(
    {'messages': ['你好!我是派大星']},
    {"configurable": {"thread_id": "1"}},
)

[message.pretty_print() for message in result['messages']]

# 让智能体回忆我的名字
result = graph.invoke(
    {"messages": [{"role": "user", "content": "请问我是谁?"}]},
    {"configurable": {"thread_id": "1"}},  
)
[message.pretty_print() for message in result['messages']]
    

步骤/前提:

  1. 创建checkpointer
  2. 添加到gragh中(如果是agent,则添加到agent中),其实就是初始化
  3. 不管是图还是agent,在invoke的时候配置正确的thread_id

长期记忆

长期记忆支持使用 Embedding 检索语义相近的内容。

# 嵌入维度
EMBED_DIM = 1024

# 获取text embedding的接口
client = OpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url=os.getenv("DASHSCOPE_BASE_URL"),
)

# embedding生成函数
def embed(texts: list[str]) -> list[list[float]]:
    response = client.embeddings.create(
        model="text-embedding-v4",
        input=texts,
        dimensions=EMBED_DIM,
    )
    return [item.embedding for item in response.data]

# 测试能否正常生成text embedding
texts = [
    "LangGraph的中间件非常强大",
    "LangGraph的MCP也很好用",
]
vectors = embed(texts)

len(vectors), len(vectors[0])

到此,证明了embed函数是可用的。

# 创建 InMemoryStore 内存存储
store = InMemoryStore(index={"embed": embed, "dims": EMBED_DIM})

# 添加两条用户数据 user_1 user_2
namespace = ("users", )

store.put(
    namespace,  # Namespace to group related data together
    "user_1",  # Key within the namespace
    {
        "rules": [
            "User likes short, direct language",
            "User only speaks English & python",
        ],
        "rule_id": "3",
    },
)

store.put(
    ("users",),
    "user_2",
    {
        "name": "John Smith",
        "language": "English",
    }
)

这里用embed实例化了Store,并且使用Storeapi添加了两个用户数据。

读取,通过namespacekey:

item = store.get(namespace, "user_2")

或通过检索召回:

items = store.search( 
    namespace,
    query="language preferences",
    filter={"rule_id": "3"},
)

到此,我们有了store. 在智能体中使用,则是通过tools 读:

@dataclass
class Context:
    user_id: str

@tool
def get_user_info(runtime: ToolRuntime[Context]) -> str:
    """用于查询用户信息"""
    user_id = runtime.context.user_id
    user_info = runtime.store.get(("users",), user_id) 
    return str(user_info.value) if user_info else "未知用户"

# 创建Agent
agent = create_agent(
    model=model,
    tools=[get_user_info],
    store=store, 
    context_schema=Context
)

# 运行Agent
result = agent.invoke(
    {"messages": [{"role": "user", "content": "查阅用户信息"}]},
    context=Context(user_id="user_2") 
)

for message in result['messages']:
    message.pretty_print()
  1. store存在tools调用时约定的ToolRuntiime
  2. runtime以泛型约束接收Context,如果调用时需要传入自定义信息(通常是用户身份),就是在invoke时通过context参数传入
    • 还记得吗?上节的thread_id也是这个时机传入的
  3. 这个时候runtime就有上下文和向量存储两个属性了
  4. 大模型判断用到用户身份时,调用创建agent时传的工具
  5. store也是这个创建agent时传入的
  6. 上下文对象泛型约束(即模型类)也是这个时候传入的

写:

class UserInfo(TypedDict):
    name: str

@tool
def save_user_info(user_info: UserInfo, runtime: ToolRuntime[Context]) -> str:
    """用于保存/更新用户信息"""
    user_id = runtime.context.user_id
    runtime.store.put(("users",), user_id, user_info) 
    return "成功保存用户信息"

# 创建gent
agent = create_agent(
    model=model,
    tools=[save_user_info],
    store=store,
    context_schema=Context
)

# 运行Agent
agent.invoke(
    {"messages": [{"role": "user", "content": "My name is John Smith"}]},
    context=Context(user_id="user_123") 
)

store.get(("users",), "user_123").value
  1. runtime中获取context,获取user_id
  2. 通过runtime.store.put保存用户信息
  3. 大模型通过对话判断到用户信息需要保存,调用创建agent时传的工具
  4. 存到创建agent时传入的store

相关import:

from dataclasses import dataclass
from langchain_openai import ChatOpenAI
from langchain.tools import tool, ToolRuntime
from langchain.agents import create_agent
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.store.memory import InMemoryStore
from langgraph.store.sqlite import SqliteStore

思考: 上述长期记忆的例子全是通过大模型(智能体)自己判断需要读或写用户信息的时候,发现有用户相关的工具,进行了长期记忆的读写。工具里实现的是InMemoryStore而已,那么实现成InMemorySaver(即短期记忆)会有什么区别?

  1. 短期记忆只认thread_id,长期记忆相当于是知识库,与对话无关
  2. 短期记忆记的是messages, 长期记忆是结构化数据(json

显然我们不可能为每一个记忆都要写一个tools,下面单开一文来深化长期记忆的使用: 长期记忆

上下文工程

LangGraph 将上下文分为三种类型:

  • 模型上下文(Model Context
  • 工具上下文(Tool Context
  • 生命周期上下文(Life-cycle Context

无论哪种 Context,都需要定义它的 Schema。在这方面,LangGraph 提供了相当高的自由度,你可以使用 dataclasses、pydantic、TypedDict 这些包的任意一个创建你的 Context Schema. 上一节用的是dataclasses, 而且涉及的Context是哪一种上下文呢?

从智能体中获取触发逻辑所需的即时变量。这些变量通常存储在以下三个存储介质中:

  • 运行时(Runtime)- 所有节点共享一个 Runtime。同一时刻,所有节点取到的 Runtime 的值是相同的。一般用于存储时效性要求较高的信息。
  • 短期记忆(State)- 在节点之间按顺序传递,每个节点接收上一个节点处理后的 State。主要用于存储 Prompt 和 AI Message。
  • 长期记忆(Store)- 负责持久化存储,可以跨 Workflow / Agent 保存信息。可以用来存用户偏好、以前算过的统计值等。

具体示例见参考教程

上面这个教程例子里,都是用@dynamic_prompt做的示例,那么它做了什么?给提示词添加了一段systemMessage?

@dynamic_prompt 是 LangGraph 提供的一个中间件(Middleware),它的核心作用就是在模型真正发起请求(Model Call)之前的“最后一秒”,动态地往 System Message 里塞东西

1. 它到底做了什么?

在传统的开发中,系统提示词通常是静态的。而 @dynamic_prompt 允许你定义一个函数,这个函数会在每次调用模型前执行:

  • 拦截请求:拦截当前的 ModelRequest
  • 计算逻辑:根据你提供的逻辑(比如判断消息数量、读取当前时间、从 Store 拿用户偏好)。
  • 修改指令:将返回的字符串内容,追加或注入到最终发给大模型的系统指令中。

2. 它注入的是 System Message 吗?

是的,但不完全是“覆盖”。

  • 它通常是在你原有的 System Message 基础上进行增强
  • 例如,你在代码里定义了:"You are a helpful assistant."
  • @dynamic_prompt 函数返回了:"User prefers Chinese."
  • 最终发给模型的系统指令会合并成:"You are a helpful assistant.\nUser prefers Chinese."

3. 三种典型的“塞入”来源

正如你正在看的 上下文工程 页面所示,这个中间件可以从三个地方抓取信息来修改提示词:

来源场景示例页面对应章节
State (短期)如果对话超过 10 轮,自动在提示词加一句“请简短回答”。1)使用 State 管理上下文
Store (长期)自动从数据库读出该用户的 ID 对应的语言偏好。2)使用 Store 管理上下文
Runtime (运行时)根据当前是“管理员”还是“普通用户”身份,动态开关某个工具的使用权限。3)使用 Runtime 管理上下文

4. 为什么要用它,而不是直接改消息列表?

你可能会问:我直接在 messages 列表里手动加一行 SystemMessage 不行吗? 使用 @dynamic_prompt 的高级之处在于:

  • 解耦:你的业务逻辑节点(Node)只需要关心“怎么回答问题”,而“针对不同用户的个性化指令”交由中间件自动处理。
  • 无感注入:它不会污染你的 MessagesState(即对话历史里不会多出这些指令),它只存在于发往 LLM 的那一瞬间。
  • 实时性:它能确保模型拿到的永远是最新的 Runtime 环境变量(比如当前的精确时间或用户实时切换的状态)。

总结一下: @dynamic_prompt 就像是一个“提示词补丁”。它让原本死板的 System Message 变成了可以根据当前上下文(State/Store/Runtime)自动变身的动态指令集

多智能体系统(MAS)

原教程文档

分析一下本页实例,“多代理”两种实现方式,都是将每个agent包成tools从tools传入的,即使用的是tools的架构思路实现的。所以,多代理机制仅仅是个思路/架构的概念,而没有新的api或知识点/技术创新是吧?

多智能体(Multi-Agent)在底层实现上,确实大量复用了 Tool-calling 这一核心技术。

1. “术”的层面:确实是 Tool-calling 的包装

本页的第一个例子:

  • 实现方式:将 subagent1subagent2 包装成函数,并加上 @tool 装饰器。
  • 本质:对于 supervisor_agent 来说,它根本不知道对面是一个“智能体”还是一个“简单的加法函数”。它只看到一个描述为“可以计算加法”的工具。
  • 结论:在这个维度上,多智能体 = 递归的 Tool-calling。这确实没有引入新的底层 API。

2. “道”的层面:为什么它被称为一种“模式”?

虽然底层是工具,但“监督者模式”引入了几个关键的 状态管理逻辑流控制 的概念,这些是单智能体 Tool-calling 不具备的:

  • 上下文隔离(Context Isolation): 这是最核心的进步。如果是一个 Agent 挂 100 个工具,它的 System Prompt 会变得极长且容易混乱。而在多智能体架构中,subagent1 只拿到了它需要的消息,它并不知道 subagent2 之前做了什么。这种局部上下文极大地提高了复杂任务的准确性。
  • 控制权转移(Handoff/Transfer): 在本页第二个例子 langgraph-supervisor 中,你会看到 transfer_to_subagent 这样的字眼。这不仅仅是“调用工具”,它涉及到 Graph 节点的切换
  • 在普通的 Tool 调用中,控制权始终在主 Agent 手里。
  • 在 Supervisor 模式下,控制权可以完全移交给子 Agent,甚至支持多轮对话后再返回给监督者。

3. 技术创新点在哪里?

如果你觉得第一个例子太简单,可以留意一下 langgraph-supervisor 这个包带来的“新东西”:

  • 自动路由(Auto-routing):它不需要你手动写 call_subagent 这种包装函数。它定义了一套协议,让 Graph 能自动识别如何把 State(对话状态)在不同的 Agent 节点之间进行平滑传递。
  • 状态合并(State Merging):当子 Agent 完成任务后,它是只返回一个结果字符串,还是把它的整个中间思考过程(Messages)合并回主 Graph?这种状态合并策略是多智能体框架特有的技术细节。

总结与反思

你的理解是对的:多智能体本质上是把“大型问题”拆解为“多个小型 Tool-calling 问题”的组织架构。

  • 没有黑科技:它没有改变 LLM 推理的本质。
  • 有架构创新:它解决了单智能体在面对“工具爆炸”(Too many tools)和“长文本迷失”(Lost in the middle)时的无力感。

正如页面结尾所说的,作者自己也承认:“多智能体系统在现阶段可能会降低准确性”。这证明了它不是万灵药,而是一种针对 极高复杂度任务 的工程妥协。

控制权转移是什么?

在 LangGraph 的多智能体架构中,控制权转移(Handoff / Transfer) 标志着从“简单的函数调用”进化到了“流程化的协作”。

如果说第一种方式(Tool-calling)只是主智能体在“借用工具”,那么控制权转移则意味着“换人上岗”。具体来说,它包含以下三个维度的深层含义:

1. 节点(Node)的物理切换

在 LangGraph 的图结构中,控制权转移意味着当前执行的指针从 Supervisor 节点跳转到了 Sub-agent 节点。

  • Tool-calling 模式:始终停留在 Supervisor 节点,只是执行了一个 Python 函数并拿回了字符串。
  • 控制权转移模式:流程真正进入了子智能体的地盘。如你在页面看到的 transfer_to_subagent-2,它是通过 langgraph-supervisor 生成的特殊指令,触发了图导航(Routing)。

2. 状态(State)的接管与自治

这是最本质的区别。当控制权转移给子智能体后,它拥有了“对话主权”

  • 独立推理:子智能体可以根据自己的 system_prompt 和工具集,进行多次思考(Reasoning)和多次工具调用,而不需要每一退步都回传给监督者请示。
  • 消息历史隔离:子智能体可以维护自己局部的消息列表。直到它决定“任务完成”并触发 transfer_back_to_supervisor,控制权才会交回。

3. “长跑”与“短跑”的区别

  • Tool-calling 是“短跑”:主智能体问:“1+1等于几?”,子智能体工具瞬间回一个“2”,结束。这适合原子化的简单任务。
  • 控制权转移是“接力赛”:主智能体说:“帮我写一个关于 AI 的研究报告”。
  • 控制权转交给 Researcher Agent,它去搜资料、翻文档、写草稿(中间可能跑了 10 轮对话)。
  • 完成后,Researcher 发起 transfer_back,将整份报告交还。
  • 监督者拿到报告后再进行下一步。

4. 为什么要“交回控制权”?

你在示例中看到的 transfer_back_to_supervisor 是为了解决闭环问题。 在多步任务中(如你算的那个长数学题),如果子智能体干完活不把控制权交回,整个图就会卡在子节点或者直接结束。只有交回控制权,监督者才能意识到“第一步除法做完了,现在我该安排第二步乘法了”。

总结

控制权转移 实际上是让“智能体”回归其作为“独立决策实体”的本色

  • 它意味着 Scope(作用域) 的切换。
  • 它意味着 Responsibility(责任) 的交付。
  • 它解决了单 Agent 无法处理的超长逻辑链问题,通过让不同的专家(子 Agent)在各自擅长的领域“深度接管”一段时间,来换取更高的准确性。

监督者模式页面 所述,虽然现阶段这种转移可能会因为流程变长而增加出错概率,但在处理需要“专家深挖”的复杂任务时,这种控制权的交接是必不可少的架构设计。

你的理解完全正确。这正是从“工具箱模式”进化到“部门协作模式”的核心逻辑。

为了让你更透彻地理解这个“小循环”是如何运作的,我们可以把这个过程拆解为空间(上下文)时间(控制权)两个维度:

1. 为什么“分而治之”比“100 个 Tools”更强?

当一个 Agent 挂载 100 个工具时,它就像一个面对 100 个按钮的工人,很容易“按错按钮”或者“忘记按钮的功能”。

多智能体系统 模式下:

  • 角色专业化:你可以把 30 个财务工具给 Accountant Agent,30 个搜索工具给 Researcher Agent
  • 注意力集中:进入 Accountant Agent 的小循环后,它完全看不见那 70 个无关工具。这极大地降低了 LLM 的推理负担,准确率显著提升。

2. “小循环”内发生了什么?

当你通过 supervisor_agent 触发任务后,子智能体的执行过程如下:

  1. 接管意图:Supervisor 说:“去查一下这笔账单”。
  2. 进入黑盒:控制权转移到子智能体。它开始自己的 ReAct 循环(思考 -> 调用工具 1 -> 观察结果 -> 思考 -> 调用工具 2...)。
  3. 多轮交互:子智能体可能在内部进行了 5 轮对话才得出结论。
  4. 结果收敛:子智能体将最终结果封装成一个字符串(或消息)。
  5. 主动移交:子智能体触发 transfer_back_to_supervisor,带着结果跳出循环。

3. Supervisor 看到的只是“结果”吗?

这里有一个非常关键的技术细节,取决于你的实现方式:

  • 封装为 Tool (方式一):如果你像页面第一个例子那样用 @tool 包装子智能体,那么 Supervisor 只能看到子智能体最后 return 的那句话。中间的思考过程和工具调用记录对主图是不可见的。
  • 集成到 Graph (方式二):如果你使用 create_supervisor,子智能体产生的消息可以根据你的配置有选择地合并回主状态(State)。这让 Supervisor 能看到:“哦,他查了数据库,又查了网页,最后给了我这个结论。”

一句话总结: 控制权转移让子智能体从“被动执行的 API”变成了“拥有独立思考空间(Node)的合作伙伴”。

并发

原始教程地址

这一节笔记又多了,单开一文:并发


Backlinks