并发
直接分发到多个节点
这是Langgraph原生能力,
- 一个节点指向多个节点,就并发了
- 多个节点指向是一个节点,就能等待了
\@task与实现并发
# 可并发的执行单元
@task
def generate_paragraph(topic: str) -> str:
response = llm.invoke([
{"role": "system", "content": "你是一个会写科普文的智能助手"},
{"role": "user", "content": (
f"写一个关于{topic}的段落,大约100字。"
"你写的内容只是整篇文章的一个段落"
"因此请聚焦给定话题,不需要概述性文字")}
])
return response.content
# 创建持久化检查点
checkpointer = InMemorySaver()
# 工作流入口函数,可并发地调用被 @task 装饰的函数
@entrypoint(checkpointer=checkpointer)
def workflow(topics: list[str]) -> str:
# 将 topic 分发到多个任务节点,并发执行
futures = [generate_paragraph(topic) for topic in topics]
paragraphs = [f.result() for f in futures]
return "\n\n".join(paragraphs)
# 运行工作流
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = workflow.invoke(["海胆豆腐是什么", "海胆豆腐有多美味", "海胆豆腐的原料和做法"], config=config)
print(result)
@task与@entrypoint是LangGraph的并发能力,虽然设计为调用大模型,但其实可以执行任何函数。
你的理解非常透彻,精准地抓住了 LangGraph Functional API(函数式 API)的设计意图。我们可以把这两个装饰器的作用看作是“协议转换”和“任务单元化”:
1. \@entrypoint:实现协议与上下文
你说的“实现了 invoke”是非常准确的。当一个普通 Python 函数被 @entrypoint 包装后,它就不再是一个简单的函数,而变成了一个 LangGraph 对象(Runnable)。
- 统一接口:它确实实现了
.invoke()、.stream()和.astream()。这意味着你可以像调用一个复杂的StateGraph一样调用它。 - 上下文注入:它为内部代码提供了一个“沙盒”。在这个沙盒里,LangGraph 会自动注入 Checkpointer(持久化/记忆)、Configuration(配置) 和 Writer(流式输出)。
- 状态管理:即使你没有显式定义 Graph,
@entrypoint也会帮你管理整个任务的生命周期,确保数据能正确流向内部的各个@task。
2. @task:实现任务单元化与异步编排 (Private)
- 并发句柄(Future):调用被
@task装饰的函数时,它会立即返回一个 TaskFuture 对象。这让你能写出[task_a(i) for i in items]这种极其简洁的并发代码,而不需要手动去写asyncio.gather或ThreadPoolExecutor。 - 结果提取:你通过
f.result()来获取值。这背后的巧妙之处在于,LangGraph 引擎会接管这些任务的调度,自动利用底层的并发能力(如 ThreadPoolExecutor)来运行它们。 - 可观测性的锚点:每个
@task在 LangSmith 追踪中都是一个独立的分支节点。如果没有这个包装,并行的任务在日志里会乱成一团;有了它,你能清晰看到每个并发任务的开始、结束和耗时。
Map-Reduce
为什么绘制的图里看不到条件边?
因为continue_to_response节点只有一个去向,所以绘图时简化了。
continue_to_response的具体实现里,通过Send函数事实上指向了generate_response,如果不在add_conditional_edges里配置第三个参数,只是影响编译时判断continue_to_response的去向,但是不影响实际的去向吧?还是判断不了就直接报错?如果是这样,我随便写个去向通过编译,但真实去向由函数体决定,可以吗?
- 不配第三个参数:不行,无法通过验证。
- 乱配骗编译:不行,运行时会因为不在白名单而拦截。
- 函数体决定去向:是的,函数体决定哪一个去向被激活,但所有可能的去向必须提前在
add_conditional_edges中报备。
因此如果人的条件节点Send了多个去向,需要在add_conditional_edges里配全(此时起到一个白名单的作用)。
- 普通条件边,是节点会返回多个预设的值,每个值对应一个节点(在第三个参数里映射),所以第三个参数应该是个字典对吧?
- 如果省略三参数,那么条件节点里应该直接返回目标节点对吧?
- 如果第三参数只有一个单值而不是映射表,如本例,那么绘图时可能会省掉这条边直接指向这个节点对吧?
- 条件节点也可以用Send来调用不同节点吧?这个时候就要在第三参数里全部体现?
- 最后,send函数的参数
generate_response到底是函数名,还是节点名?因为起成了一样的名字
1. 第三个参数(Path Map)的形式
- 标准形式是字典:
add_conditional_edges(source, path_function, {"option_a": "node_a", "option_b": "node_b"})。 path_function返回字符串"option_a"。- 字典负责将其路由到物理节点
"node_a"。 - 省略字典的情况:如果
path_function直接返回目标节点的名称(字符串),那么第三个参数可以省略。但前提是编译器能确定这些节点确实存在。 - 本例中的列表形式:在本页 Map-reduce 例子中,它传的是一个列表
["generate_response"]。这是专为Send函数优化的场景,它的功能从路由退化成了白名单。["generate_response"]实际上是{"generate_response": "generate_response"}的简化形式。
如果条件节点是普通节点(返节点或字符串,而不是
Send函数), 第三参数却写成了字符串列表呢? 比如["node_a", "node_b"]
编译器不会报错,但会迷路
- LangGraph 它会自动把列表转换成一个
{"node_a": "node_a", "node_b": "node_b"}的字典。 - 如果你的节点返回 "
node_a":正常运行。因为在它自动生成的“字典”里,"node_a" 确实指向 "node_a"。 - 如果你的节点返回 "
success":报错。 运行时会提示:KeyError: 'success' is not in ['node_a', 'node_b']。
2. 为什么绘图时省掉了这条边?
是的,这正是可视化工具的优化策略。
当 add_conditional_edges 的目标映射(第三个参数)只有一个固定值时,拓扑结构上它是“确定性”的。
- 虽然逻辑上它是“条件边”(因为要通过函数计算发几个
Send),但在图的静态结构里,它只有一条路可走。 - 为了保持图的简洁,Mermaid 渲染时通常会忽略那个“路由函数”的小方块,直接从起点连向终点。
3. 条件节点可以用 Send 调用不同节点吗?
可以,而且必须在第三个参数里报备。
如果你想在 continue_to_responses 里根据逻辑,有的发给 node_a,有的发给 node_b:
- 代码实现:函数内返回
[Send("node_a", ...), Send("node_b", ...)]。 - 配置要求:
add_conditional_edges的第三个参数必须包含这两个目标:["node_a", "node_b"](或对应的映射字典)。 - 原因:LangGraph 需要在启动并发任务前,确认所有
Send的目标都在预设的“白名单”里,否则会报非法路径错误。
4. Send 函数的第一个参数到底是啥?
是节点名(Node Name),而不是函数名。
在 LangGraph 中,节点名和函数名经常起成一样的(比如 builder.add_node("generate_response", generate_response)),但这会导致混淆。
Send("generate_response", ...)指向的是add_node时定义的那个 Key(字符串)。- 如果你定义成
builder.add_node("worker_node", generate_response),那么你的Send必须写成Send("worker_node", ...)。
真正的并行是由哪个api实现的?Send函数吗?Send函数还保证了它下游的节点跑的是另一套state schema?
在 LangGraph 的设计中,Send 确实是实现“动态并行”的灵魂,但它背后依赖的是一套状态隔离(State Isolation)与自动合并(Reducer)的数学逻辑。
1. 真正的并行(并发)是由谁实现的?
严格来说,Send 函数本身只是一个“指令(Instruction)”。
- 它的角色:它告诉 LangGraph 引擎:“请把这部分数据,分发给某个节点的一个新实例”。
- 真正的执行者:真正的并行(在 Python 中主要是基于
asyncio或ThreadPoolExecutor的并发)是由 LangGraph 运行时(Runtime/Compiler) 实现的。 - 底层 API:当你调用
graph.invoke()时,LangGraph 引擎会扫描所有的Send对象。如果发现有多个Send指向同一个或不同的节点,引擎会启动并发任务流(Concurrent tasks)。
关键点:
Send实现了 Map(分发),而并发执行是引擎在拿到Send列表后的默认行为。
2. Send 如何保证下游运行“另一套 State Schema”?
你的观察非常敏锐。这正是 Send 最神奇的地方:局部状态隔离。
在 Map-reduce 例子中:
- 全局 State 是
Overall(包含所有人的回复列表)。 - 下游节点
generate_response**的输入声明是Role(只包含一个字符串)。 可见,在节点定义的时候就已经把不同的State Schema区分开了。
它是如何保证不冲突的?
- 独立副本:每一个
Send("node_name", payload)都会触发目标节点的一个全新、独立的执行环境。 - 输入裁剪:
Send的第二个参数payload会被作为该节点执行时的初始State。它不再看全局的Overall,而是只盯着你传给它的那个小 JSON。 - 避免竞态:因为每个并行的节点都在操作自己私有的、裁剪后的 State 副本,所以它们之间不存在内存竞争。这解决了并发编程中最头疼的同步问题。
3. “发出去”容易,“收回来”才是技术活
既然每个 generate_response 都在跑自己的小 Schema(只返回一个回复),那最后怎么变回全局的大 Schema(所有回复的列表)呢?
这依赖于我们在 1)创建上下文的 schema 里定义的 Reducer:
class Overall(TypedDict):
# Annotated[类型, 处理函数]
responses: Annotated[list, operator.add]
这是一个reducer的语法糖,我们碰到过多次了,包括AgentState的messages也是这样,说明各子任务返回的responses最终都会集合起来,这不是神奇的地方,神奇的在于这样一个小State进,大State出的机制: 你的子任务既然构造为是Role State
,return的时候却是往Overall里塞,不单独给你指出来,很容易忽略掉这个关键机制,这正是 LangGraph Map-Reduce 架构中最精妙的“魔术”:状态的非对称流动。
子图
原教程创建一个工作流里,做了一个子图:
# 定义ReAct子图的全局Schema
class ReActState(TypedDict):
roles: list[str]
n: int
new_roles: list[str]
responses: list[str]
# 定义ReAct子图的输出Schema
class ReActOutput(TypedDict):
new_roles: list[str]
responses: list[str]
# 创建图
react_builder = StateGraph(
state_schema=ReActState,
output_schema=ReActOutput
)
与上面的不对称相反,上面是子任务里裁剪输入,输出与大部队保持一致,这里显然定义为输入是大State,输出则裁剪为部分字段。目的是为了隐藏子图的细节,只给出答案即可,同时,也是避免子图与外部大图产生同名的键的冲突。
当然,例子并没有举好,因为它的子图直接对接到END了,没有任何需要“净化”输出的必要。
需要知道的是, output_schema只影响最终输出,整个图的状态流转一定是state_schema
本页两个工作流组成一个大图的例子中:
- doge_builder图是用Send功能实现隐式并发(map),以及输入裁剪。在下一个节点里reduce.react_build则是用ThreadPoolExecutor来手动实现并发,也没有map-reduce的概念。
- doge图生成单条回复是一个节点,react图则是一个纯方法(因为是在一个节点里直接并发的).分析对比一下这两个实现的优劣和适用场景(功能是一样的,都是为一组角色生成角色的反应)。
- 两个图能接到一个父图,潜在要求入口state schema必须是父schema的子集,这里文章没说,但从精心构建的三个schema能看出来。
你的分析非常透彻,已经完全摸清了 LangGraph 在处理并发时的“工程美学”和“权衡取舍”。针对你提出的四个点,我们逐一进行深度复盘和对比。
1. 并发实现方式对比:Send vs ThreadPoolExecutor
| 维度 | doge_builder (Send / Map-Reduce) | react_builder (ThreadPoolExecutor) |
|---|---|---|
| 颗粒度 | 节点级。每个并发任务都是 Graph 中的一个独立节点实例。 | 函数级。所有并发任务缩在同一个节点函数内部。 |
| 可观测性 | 极高。在 LangSmith 或可视化图中能看到每一个角色的处理过程。 | 低。外部只看到一个节点在运行,内部并发是黑盒。 |
| 状态管理 | 自动。利用 Reducer (operator.add) 自动缝合。 | 手动。需要自己处理 executor.map 的结果汇总。 |
| 错误恢复 | 支持断点重试。如果 10 个角色中 1 个失败,可以只重试那一个。 | 全灭。函数内报错通常导致整个节点失败,需从头再来。 |
适用场景建议:
- 使用
Send(Map-Reduce):任务耗时长(如长文本生成)、需要高可靠性、或者你希望在可视化界面监控每个子任务的进度。 - 使用 `ThreadPoolExecutor`**:任务极短(毫秒级 API 调用)、不需要复杂的重试机制、追求极致的精简代码。
2. 节点 vs 纯方法:架构意义上的区别
你提到 doge 生成回复是一个节点,而 react 是在节点内并发。
doge的“节点化”:意味着它拥有自己的 Checkpoint(检查点)。如果程序断电,它能记住哪些角色回复了,哪些还没回。react的“纯方法化”:它把并发当成了工具函数。虽然写起来爽(像普通 Python 开发),但它丢掉了 LangGraph 作为“状态机”的核心优势——长时运行的容错性。
3. 父子图的 Schema 契约(子集关系)
你的观察极其敏锐。虽然文章没明说,但这是 Sub-graphs 成功的潜规则:
- 输入匹配:当父图调用
fa_builder.add_node("doge_graph", doge_builder.compile())时,父图必须保证传给这个节点的数据(EntryGraphState)能够覆盖子图所需的Overall字段。 - 字段对齐:子图不需要知道父图的所有字段,但父图传给子图的字典必须包含子图
START节点所需的 Key。 - 本质:这是一种结构化映射。只要父 State 包含
roles字段,且类型一致,子图就能无缝运行。
总结
作者特意构建了两个截然不同的子图来形成对比:
- doge_graph:它是标准的 Agent 编排模式(Agentic 工作流)。利用 LangGraph 原生的
Send机制实现并发,每个角色都是图里的一个独立节点。这演示了“如何用框架自带的工具做 Map-Reduce”。 - react_graph:它是传统的 Python 并发模式。它在节点内部直接用了
ThreadPoolExecutor。这演示了“如何把一段现有的、非 LangGraph 原生的并发代码封装成一个子图”。
这个例子最精妙的地方在于,父图在调用它们时,代码是完全一致的:
fa_builder.add_node("doge_graph", doge_builder.compile())
fa_builder.add_node("react_graph", react_builder.compile())
对于父图来说,它根本不在乎 doge 里面是用 Send 还是用 while 循环,也不在乎 react 里面是用线程池还是协程。它只关注接口契约:
- 输入:你得给我
roles。 - 输出:你得还我
responses。
Backlinks