并发

直接分发到多个节点

这是Langgraph原生能力,

  • 一个节点指向多个节点,就并发了
  • 多个节点指向是一个节点,就能等待了

\@task与实现并发

# 可并发的执行单元
@task
def generate_paragraph(topic: str) -> str:
    response = llm.invoke([
        {"role": "system", "content": "你是一个会写科普文的智能助手"},
        {"role": "user", "content": (
            f"写一个关于{topic}的段落,大约100字。"
            "你写的内容只是整篇文章的一个段落"
            "因此请聚焦给定话题,不需要概述性文字")}
    ])
    return response.content

# 创建持久化检查点
checkpointer = InMemorySaver()

# 工作流入口函数,可并发地调用被 @task 装饰的函数
@entrypoint(checkpointer=checkpointer)
def workflow(topics: list[str]) -> str:
    # 将 topic 分发到多个任务节点,并发执行
    futures = [generate_paragraph(topic) for topic in topics]
    paragraphs = [f.result() for f in futures]
    return "\n\n".join(paragraphs)

# 运行工作流
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
result = workflow.invoke(["海胆豆腐是什么", "海胆豆腐有多美味", "海胆豆腐的原料和做法"], config=config)
print(result)

@task@entrypoint是LangGraph的并发能力,虽然设计为调用大模型,但其实可以执行任何函数。

你的理解非常透彻,精准地抓住了 LangGraph Functional API(函数式 API)的设计意图。我们可以把这两个装饰器的作用看作是“协议转换”“任务单元化”

1. \@entrypoint:实现协议与上下文

你说的“实现了 invoke”是非常准确的。当一个普通 Python 函数被 @entrypoint 包装后,它就不再是一个简单的函数,而变成了一个 LangGraph 对象(Runnable)

  • 统一接口:它确实实现了 .invoke().stream().astream()。这意味着你可以像调用一个复杂的 StateGraph 一样调用它。
  • 上下文注入:它为内部代码提供了一个“沙盒”。在这个沙盒里,LangGraph 会自动注入 Checkpointer(持久化/记忆)Configuration(配置)Writer(流式输出)
  • 状态管理:即使你没有显式定义 Graph,@entrypoint 也会帮你管理整个任务的生命周期,确保数据能正确流向内部的各个 @task

2. @task:实现任务单元化与异步编排 (Private)

  • 并发句柄(Future):调用被 @task 装饰的函数时,它会立即返回一个 TaskFuture 对象。这让你能写出 [task_a(i) for i in items] 这种极其简洁的并发代码,而不需要手动去写 asyncio.gatherThreadPoolExecutor
  • 结果提取:你通过 f.result() 来获取值。这背后的巧妙之处在于,LangGraph 引擎会接管这些任务的调度,自动利用底层的并发能力(如 ThreadPoolExecutor)来运行它们。
  • 可观测性的锚点:每个 @taskLangSmith 追踪中都是一个独立的分支节点。如果没有这个包装,并行的任务在日志里会乱成一团;有了它,你能清晰看到每个并发任务的开始、结束和耗时。

Map-Reduce

为什么绘制的图里看不到条件边?

因为continue_to_response节点只有一个去向,所以绘图时简化了。

continue_to_response的具体实现里,通过Send函数事实上指向了generate_response,如果不在add_conditional_edges里配置第三个参数,只是影响编译时判断continue_to_response的去向,但是不影响实际的去向吧?还是判断不了就直接报错?如果是这样,我随便写个去向通过编译,但真实去向由函数体决定,可以吗?

  • 不配第三个参数:不行,无法通过验证。
  • 乱配骗编译:不行,运行时会因为不在白名单而拦截。
  • 函数体决定去向:是的,函数体决定哪一个去向被激活,但所有可能的去向必须提前在 add_conditional_edges 中报备。

因此如果人的条件节点Send了多个去向,需要在add_conditional_edges里配全(此时起到一个白名单的作用)。

  1. 普通条件边,是节点会返回多个预设的值,每个值对应一个节点(在第三个参数里映射),所以第三个参数应该是个字典对吧?
  2. 如果省略三参数,那么条件节点里应该直接返回目标节点对吧?
  3. 如果第三参数只有一个单值而不是映射表,如本例,那么绘图时可能会省掉这条边直接指向这个节点对吧?
  4. 条件节点也可以用Send来调用不同节点吧?这个时候就要在第三参数里全部体现?
  5. 最后,send函数的参数generate_response到底是函数名,还是节点名?因为起成了一样的名字

1. 第三个参数(Path Map)的形式

  • 标准形式是字典add_conditional_edges(source, path_function, {"option_a": "node_a", "option_b": "node_b"})
  • path_function 返回字符串 "option_a"
  • 字典负责将其路由到物理节点 "node_a"
  • 省略字典的情况:如果 path_function 直接返回目标节点的名称(字符串),那么第三个参数可以省略。但前提是编译器能确定这些节点确实存在。
  • 本例中的列表形式:在本页 Map-reduce 例子中,它传的是一个列表 ["generate_response"]。这是专为Send函数优化的场景,它的功能从路由退化成了白名单
    • ["generate_response"] 实际上是{"generate_response": "generate_response"} 的简化形式。

如果条件节点是普通节点(返节点或字符串,而不是Send函数), 第三参数却写成了字符串列表呢? 比如["node_a", "node_b"]

编译器不会报错,但会迷路

  • LangGraph 它会自动把列表转换成一个 {"node_a": "node_a", "node_b": "node_b"} 的字典。
  • 如果你的节点返回 "node_a":正常运行。因为在它自动生成的“字典”里,"node_a" 确实指向 "node_a"。
  • 如果你的节点返回 "success":报错。 运行时会提示:KeyError: 'success' is not in ['node_a', 'node_b']。

2. 为什么绘图时省掉了这条边?

是的,这正是可视化工具的优化策略。add_conditional_edges 的目标映射(第三个参数)只有一个固定值时,拓扑结构上它是“确定性”的。

  • 虽然逻辑上它是“条件边”(因为要通过函数计算发几个 Send),但在图的静态结构里,它只有一条路可走。
  • 为了保持图的简洁,Mermaid 渲染时通常会忽略那个“路由函数”的小方块,直接从起点连向终点。

3. 条件节点可以用 Send 调用不同节点吗?

可以,而且必须在第三个参数里报备。 如果你想在 continue_to_responses 里根据逻辑,有的发给 node_a,有的发给 node_b

  • 代码实现:函数内返回 [Send("node_a", ...), Send("node_b", ...)]
  • 配置要求add_conditional_edges 的第三个参数必须包含这两个目标:["node_a", "node_b"](或对应的映射字典)。
  • 原因:LangGraph 需要在启动并发任务前,确认所有 Send 的目标都在预设的“白名单”里,否则会报非法路径错误。

4. Send 函数的第一个参数到底是啥?

是节点名(Node Name),而不是函数名。 在 LangGraph 中,节点名和函数名经常起成一样的(比如 builder.add_node("generate_response", generate_response)),但这会导致混淆。

  • Send("generate_response", ...) 指向的是 add_node 时定义的那个 Key(字符串)。
  • 如果你定义成 builder.add_node("worker_node", generate_response),那么你的 Send 必须写成 Send("worker_node", ...)

真正的并行是由哪个api实现的?Send函数吗?Send函数还保证了它下游的节点跑的是另一套state schema?

在 LangGraph 的设计中,Send 确实是实现“动态并行”的灵魂,但它背后依赖的是一套状态隔离(State Isolation)自动合并(Reducer)的数学逻辑。

1. 真正的并行(并发)是由谁实现的?

严格来说,Send 函数本身只是一个“指令(Instruction)”

  • 它的角色:它告诉 LangGraph 引擎:“请把这部分数据,分发给某个节点的一个新实例”。
  • 真正的执行者:真正的并行(在 Python 中主要是基于 asyncioThreadPoolExecutor并发)是由 LangGraph 运行时(Runtime/Compiler) 实现的。
  • 底层 API:当你调用 graph.invoke() 时,LangGraph 引擎会扫描所有的 Send 对象。如果发现有多个 Send 指向同一个或不同的节点,引擎会启动并发任务流(Concurrent tasks)。

关键点Send 实现了 Map(分发),而并发执行是引擎在拿到 Send 列表后的默认行为。


2. Send 如何保证下游运行“另一套 State Schema”?

你的观察非常敏锐。这正是 Send 最神奇的地方:局部状态隔离

Map-reduce 例子中:

  • 全局 StateOverall(包含所有人的回复列表)。
  • 下游节点 generate_response** 的输入声明是 Role(只包含一个字符串)。 可见,在节点定义的时候就已经把不同的State Schema区分开了。

它是如何保证不冲突的?

  1. 独立副本:每一个 Send("node_name", payload) 都会触发目标节点的一个全新、独立的执行环境
  2. 输入裁剪Send 的第二个参数 payload 会被作为该节点执行时的初始 State。它不再看全局的 Overall,而是只盯着你传给它的那个小 JSON。
  3. 避免竞态:因为每个并行的节点都在操作自己私有的、裁剪后的 State 副本,所以它们之间不存在内存竞争。这解决了并发编程中最头疼的同步问题。

3. “发出去”容易,“收回来”才是技术活

既然每个 generate_response 都在跑自己的小 Schema(只返回一个回复),那最后怎么变回全局的大 Schema(所有回复的列表)呢?

这依赖于我们在 1)创建上下文的 schema 里定义的 Reducer

class Overall(TypedDict):
    # Annotated[类型, 处理函数]
    responses: Annotated[list, operator.add] 

这是一个reducer的语法糖,我们碰到过多次了,包括AgentStatemessages也是这样,说明各子任务返回的responses最终都会集合起来,这不是神奇的地方,神奇的在于这样一个小State进,大State出的机制: 你的子任务既然构造为是Role State ,return的时候却是往Overall里塞,不单独给你指出来,很容易忽略掉这个关键机制,这正是 LangGraph Map-Reduce 架构中最精妙的“魔术”:状态的非对称流动

子图

原教程创建一个工作流里,做了一个子图:

# 定义ReAct子图的全局Schema
class ReActState(TypedDict):
    roles: list[str]
    n: int
    new_roles: list[str]
    responses: list[str]

# 定义ReAct子图的输出Schema
class ReActOutput(TypedDict):
    new_roles: list[str]
    responses: list[str]

# 创建图
react_builder = StateGraph(
    state_schema=ReActState,
    output_schema=ReActOutput
)

与上面的不对称相反,上面是子任务里裁剪输入,输出与大部队保持一致,这里显然定义为输入是大State,输出则裁剪为部分字段。目的是为了隐藏子图的细节,只给出答案即可,同时,也是避免子图与外部大图产生同名的键的冲突。

当然,例子并没有举好,因为它的子图直接对接到END了,没有任何需要“净化”输出的必要。

需要知道的是, output_schema只影响最终输出,整个图的状态流转一定是state_schema

本页两个工作流组成一个大图的例子中:

  1. doge_builder图是用Send功能实现隐式并发(map),以及输入裁剪。在下一个节点里reduce.react_build则是用ThreadPoolExecutor来手动实现并发,也没有map-reduce的概念。
  2. doge图生成单条回复是一个节点,react图则是一个纯方法(因为是在一个节点里直接并发的).分析对比一下这两个实现的优劣和适用场景(功能是一样的,都是为一组角色生成角色的反应)。
  3. 两个图能接到一个父图,潜在要求入口state schema必须是父schema的子集,这里文章没说,但从精心构建的三个schema能看出来。

你的分析非常透彻,已经完全摸清了 LangGraph 在处理并发时的“工程美学”和“权衡取舍”。针对你提出的四个点,我们逐一进行深度复盘和对比。

1. 并发实现方式对比:Send vs ThreadPoolExecutor

维度doge_builder (Send / Map-Reduce)react_builder (ThreadPoolExecutor)
颗粒度节点级。每个并发任务都是 Graph 中的一个独立节点实例。函数级。所有并发任务缩在同一个节点函数内部。
可观测性极高。在 LangSmith 或可视化图中能看到每一个角色的处理过程。。外部只看到一个节点在运行,内部并发是黑盒。
状态管理自动。利用 Reducer (operator.add) 自动缝合。手动。需要自己处理 executor.map 的结果汇总。
错误恢复支持断点重试。如果 10 个角色中 1 个失败,可以只重试那一个。全灭。函数内报错通常导致整个节点失败,需从头再来。

适用场景建议:

  • 使用 Send (Map-Reduce):任务耗时长(如长文本生成)、需要高可靠性、或者你希望在可视化界面监控每个子任务的进度。
  • 使用 `ThreadPoolExecutor`**:任务极短(毫秒级 API 调用)、不需要复杂的重试机制、追求极致的精简代码。

2. 节点 vs 纯方法:架构意义上的区别

你提到 doge 生成回复是一个节点,而 react 是在节点内并发。

  • doge 的“节点化”:意味着它拥有自己的 Checkpoint(检查点)。如果程序断电,它能记住哪些角色回复了,哪些还没回。
  • react 的“纯方法化”:它把并发当成了工具函数。虽然写起来爽(像普通 Python 开发),但它丢掉了 LangGraph 作为“状态机”的核心优势——长时运行的容错性

3. 父子图的 Schema 契约(子集关系)

你的观察极其敏锐。虽然文章没明说,但这是 Sub-graphs 成功的潜规则:

  • 输入匹配:当父图调用 fa_builder.add_node("doge_graph", doge_builder.compile()) 时,父图必须保证传给这个节点的数据(EntryGraphState)能够覆盖子图所需的 Overall 字段。
  • 字段对齐:子图不需要知道父图的所有字段,但父图传给子图的字典必须包含子图 START 节点所需的 Key。
  • 本质:这是一种结构化映射。只要父 State 包含 roles 字段,且类型一致,子图就能无缝运行。

总结

作者特意构建了两个截然不同的子图来形成对比:

  • doge_graph:它是标准的 Agent 编排模式(Agentic 工作流)。利用 LangGraph 原生的 Send 机制实现并发,每个角色都是图里的一个独立节点。这演示了“如何用框架自带的工具做 Map-Reduce”。
  • react_graph:它是传统的 Python 并发模式。它在节点内部直接用了 ThreadPoolExecutor。这演示了“如何把一段现有的、非 LangGraph 原生的并发代码封装成一个子图”。

这个例子最精妙的地方在于,父图在调用它们时,代码是完全一致的:

fa_builder.add_node("doge_graph", doge_builder.compile())
fa_builder.add_node("react_graph", react_builder.compile())

对于父图来说,它根本不在乎 doge 里面是用 Send 还是用 while 循环,也不在乎 react 里面是用线程池还是协程。它只关注接口契约

  • 输入:你得给我 roles
  • 输出:你得还我 responses

Backlinks