从Deer-flow学习LangGraph企业最佳实践
前言:字节跳动开源的超级智能体deer-flow构建于
LangGraph,这样岂不是一个天生的LangGraph的最佳实践教程?这是一个很好的学习路径。本文是从源码生成的教程,部署和二次开发则是部署和二次开发的一些说明。
DeerFlow 2.0 是一个基于 LangGraph 和 LangChain 构建的开源 "超级代理 harness",它编排子代理、内存和沙箱来执行复杂任务。其结构良好的实现使其成为在生产就绪环境中学习高级 LangGraph 概念的卓越资源。
本教程将使用 DeerFlow 的实现作为具体示例,引导您学习 LangGraph 概念,遵循从基础概念到高级模式的渐进学习路径。
为什么选择 DeerFlow 学习 LangGraph?
DeerFlow 体现了几个关键的 LangGraph 原则:
- 生产就绪模式:不仅仅是玩具示例,而是经过实际考验的实现
- 清晰的关注点分离:不同模块处理状态、工具、技能、内存等
- 配置驱动行为:无需更改代码即可在运行时灵活配置
- 可扩展架构:技能、工具和外部服务的插件系统
- 内置可观测性:追踪、日志和监控考虑
- 安全第一:沙箱执行和全面的错误处理
学习路径概述
本教程遵循结构化路径,从基础到高级主题:
- 代理基础 - 状态管理和代理创建模式
- 工具系统 - 使用工具扩展代理能力
- 技能架构 - 基于插件的能力扩展
- 子代理编排 - 并行执行和委派模式
- 沙箱执行 - 安全、隔离的代理环境
- 内存系统 - 持久上下文管理
- 中间件模式 - 横切关注点处理
- 配置驱动开发 - 灵活的代理行为
- MCP 集成 - 外部服务连接
- 生产模式 - 企业级实现
让我们开始使用 DeerFlow 作为指南来学习 LangGraph 概念。
1. 代理基础:状态管理和创建
目标
了解如何创建和配置具有自定义状态的 LangGraph 代理,以适应您的领域。
DeerFlow 中的关键概念
自定义状态扩展
DeerFlow 扩展 LangGraph 的 AgentState 来创建具有领域特定字段的 ThreadState:
# backend/packages/harness/deerflow/agents/thread_state.py
from typing import Sequence
from langgraph.graph import AgentState
from deerflow.sandbox.sandbox import SandboxState
from deerflow.agents.thread_state import ThreadDataState
from deerflow.tools.builtins.todo_tool import Todo
class ThreadState(AgentState):
sandbox: SandboxState # 沙箱执行状态
thread_data: ThreadDataState # 线程特定数据
title: str # 自动生成的线程标题
artifacts: dict[str, str] # 生成的文件/工件
todos: Sequence[Todo] # 任务跟踪(当计划模式启用时)
uploaded_files: list[str] # 用户上传的文件
viewed_images: list[str] # 代理处理的图像
为什么这很重要:在 LangGraph 中,状态表示代理在步骤之间的内存。通过扩展基本状态,DeerFlow 添加了特定领域的功能:
- 沙箱状态:跟踪隔离的执行环境
- 线程数据:管理对话上下文
- 工件:存储生成的输出
- 待办事项:启用复杂工作流的任务跟踪
- 文件跟踪:管理用户上传和代理生成的图像
动态配置下的代理创建
DeerFlow 的主代理工厂展示了如何根据运行时配置创建代理:
# backend/packages/harness/deerflow/agents/lead_agent/agent.py
def make_lead_agent(config: RunnableConfig):
# 从 config.param 中提取配置
cfg = config.get("configurable", {})
thinking_enabled = cfg.get("thinking_enabled", True)
model_name = cfg.get("model_name") or "default"
is_plan_mode = cfg.get("is_plan_mode", False)
subagent_enabled = cfg.get("subagent_enabled", False)
# 创建带有自定义状态和中间件的 LangGraph 代理
return create_agent(
model=create_chat_model(name=model_name, thinking_enabled=thinking_enabled),
tools=get_available_tools(model_name=model_name, subagent_enabled=subagent_enabled),
middleware=_build_middlewares(config, model_name=model_name, agent_name=agent_name),
system_prompt=apply_prompt_template(
subagent_enabled=subagent_enabled,
max_concurrent_subagents=max_concurrent_subagents,
agent_name=agent_name
),
state_schema=ThreadState, # 我们的自定义状态
)
为什么这很重要:此模式展示了如何:
- 从
RunnableConfig中提取运行时配置 - 动态选择模型和功能
- 根据配置有条件地启用中间件
- 根据启用的功能生成系统提示
- 使用 LangGraph 的
create_agent与自定义状态模式
练习:扩展代理状态
创建一个客户支持代理的自定义状态,包括:
- 客户资料信息
- 对话情感跟踪
- 未解决的支持工单
- 引用的知识库文章
from typing import Optional, List, Dict
from langgraph.graph import AgentState
from pydantic import BaseModel
class CustomerProfile(BaseModel):
customer_id: str
tier: str # "免费", "高级", "企业"
issues_resolved: int
class SupportState(AgentState):
customer_profile: Optional[CustomerProfile]
sentiment_score: float # -1.0 到 1.0
open_tickets: List[str]
kb_articles: List[Dict[str, str]] # 标题、内容、相关性分数
练习:可配置的代理工厂
创建一个代理工厂,可以根据配置在不同的代理类型(研究者、助手、编程者)之间切换,每种类型具有不同的工具集和系统提示。
def create_specialized_agent(config: RunnableConfig, agent_type: str):
# 实现根据 agent_type 参数返回不同代理配置的逻辑
pass
2. 工具系统:扩展代理能力
目标
学习如何定义和集成扩展 LangGraph 代理功能的工具。
DeerFlow 中的关键概念
具有自动文档的工具定义
DeerFlow 使用 LangChain 的 @tool 装饰器并带有自动文档解析:
# backend/packages/harness/deerflow/tools/builtins/task_tool.py
from langchain.tools import tool
from typing import Annotated, Literal
from langchain.tools import InjectedToolCallId
@tool("task", parse_docstring=True)
def task_tool(
runtime: ToolRuntime[ContextT, ThreadState],
description: str,
prompt: str,
subagent_type: Literal["general-purpose", "bash"],
tool_call_id: Annotated[str, InjectedToolCallId],
max_turns: int | None = None,
) -> str:
"""将任务委托给在其自身上下文中运行的专用子代理。
子代理可以帮助您:
- 通过保持探索和实施的分离来保持上下文
- 自治地处理复杂的多步骤任务
- 在隔离的上下文中执行命令或操作
可用的子代理类型:
- **通用用途**:能够处理需要探索和行动的复杂多步骤任务的代理。
当任务需要复杂推理、多个依赖步骤或将从隔离上下文中受益时使用。
- **bash**:命令执行专家,用于运行 bash 命令。用于
git 操作、构建过程,或者当命令输出将是冗长的时候。
何时使用此工具:
- 多步骤或多工具的复杂任务
- 产生冗长输出的任务
- 当您希望将上下文与主对话隔离时
- 并行研究或探索任务
何时不使用此工具:
- 简单的单步操作(直接使用工具)
- 需要用户交互或澄清的任务
参数:
description: 用于日志/显示的任务简短描述(3-5个词)。优先提供此参数。
prompt: 子代理的任务描述。明确具体地说明需要完成的工作。优先提供此参数。
subagent_type: 要使用的子代理类型。优先提供此参数。
max_turns: 可选的最大代理回合数。默认为子代理配置的最大值。
"""
# 实现...
为什么这很重要:parse_docstring=True 参数会自动提取:
- 装饰器中的工具名称(
"task") - 文档字符串中的描述
- Args 部分中的参数定义
- 使用示例和指南
这创建了自文档化的工具,LLM 可以理解和有效使用。
工具组和配置
工具通过 YAML 进行逻辑组织和配置:
# config.example.yaml
tools:
- name: web_search
use: deerflow.community.tavily:TavilySearchTool
group: research
- name: bash
use: deerflow.sandbox.tools:bash
group: system
- name: read_file
use: deerflow.sandbox.tools:read_file
group: file_ops
tool_groups:
- name: research
description: 用于收集信息的工具
- name: system
description: 用于系统操作的工具
- name: file_ops
description: 用于文件系统操作的工具
为什么这很重要:工具组使以下成为可能:
- 基于角色的工具访问:不同的代理类型获得不同的工具组
- 逻辑组织:相关的工具被组合在一起
- 配置驱动行为:无需代码更改即可启用/禁用组
- 权限系统:限制对危险工具的访问
动态工具解析
DeerFlow 在运行时根据配置解析工具:
# backend/packages/harness/deerflow/config/app_config.py
def get_tool_config(self, name: str) -> ToolConfig | None:
"""根据名称获取工具配置。"""
return next((tool for tool in self.tools if tool.name == name), None)
def get_available_tools(
model_name: str | None = None,
groups: list[str] | None = None,
subagent_enabled: bool = False
) -> list[BaseTool]:
"""根据配置组装可用工具。"""
# 1. 获取配置定义的工具
# 2. 如果指定,按组过滤
# 3. 如果启用,添加 MCP 工具
# 4. 添加内置工具
# 5. 如果启用,添加子代理工具
# 6. 返回最终工具列表
为什么这很重要:此模式允许:
- 特定环境的工具集:开发和生产环境中的不同工具
- 功能标志:安全地启用实验性工具
- 代理专业化:研究人员获得网络工具,编程者获得开发工具
- 安全控制:在某些情况下限制危险工具
练习:构建数据库工具包
创建一组用于与 PostgreSQL 数据库交互的工具:
db_query:执行 SELECT 查询并返回结果db_execute:执行 INSERT/UPDATE/DELETE 语句db_schema:获取表结构信息db_transaction:在事务中执行多个语句
将它们组织到一个 "database" 工具组中并创建配置示例。
3. 技能架构:基于插件的能力扩展
目标
了解如何在 LangGraph 应用程序中使用技能创建可扩展的能力系统。
DeerFlow 中的关键概念
技能格式
技能是带有 YAML 前言的 Markdown 文件,提供工作流说明:
# skills/public/deep-research/SKILL.md
name: deep-research
description: 一项用于对任何主题进行深入研究的技能
license: MIT
# 深度研究技能
## 概述
此技能提供了一个框架,用于对任何主题进行彻底研究。
## 何时使用
- 需要多个来源的复杂主题
- 当你需要全面覆盖时
- 用于生成带有引用的报告
## 工作流程
1. **分解**:将主题分解为 3-5 个子问题
2. **研究**:使用网络搜索调查每个子问题
3. **综合**:将发现组合成连贯的叙述
4. **引用**:正确引用所有来源
5. **审查**:检查空白和不一致之处
## 最佳实践
- 总是从多个来源验证信息
- 尽可能使用最近的来源(过去一年内)
- 区分事实和观点
- 记录所有来源以便引用
为什么这很重要:技能提供:
- 结构化工作流程:复杂任务的逐步指导
- 最佳实践:编码的专业知识以获得可靠的结果
- 可发现性:代理可以自动加载相关技能
- 渐进式加载:仅在需要时加载技能以节省上下文
- 可共享性:易于在项目之间分发和重用
技能加载系统
DeerFlow 的技能加载器扫描目录并解析技能文件:
# backend/packages/harness/deerflow/skills/loader.py
def load_skills(skills_path: Path | None = None, use_config: bool = True, enabled_only: bool = False) -> list[Skill]:
# 获取技能目录路径
if skills_path is None:
if use_config:
skills_path = get_app_config().skills.get_skills_path()
else:
skills_path = get_skills_root_path()
skills = []
# 扫描公共和自定义目录
for category in ["public", "custom"]:
category_path = skills_path / category
if not category_path.exists() or not category_path.is_dir():
continue
# 遍历目录结构
for current_root, dir_names, file_names in os.walk(category_path):
# 跳过隐藏目录
dir_names[:] = sorted(name for name in dir_names if not name.startswith("."))
if "SKILL.md" not in file_names:
continue
# 解析技能文件
skill_file = Path(current_root) / "SKILL.md"
relative_path = skill_file.parent.relative_to(category_path)
skill = parse_skill_file(skill_file, category=category, relative_path=relative_path)
if skill:
skills.append(skill)
# 从配置加载启用状态
try:
extensions_config = ExtensionsConfig.from_file()
for skill in skills:
skill.enabled = extensions_config.is_skill_enabled(skill.name, skill.category)
except Exception:
# 如果配置失败,默认为启用
pass
# 过滤和排序
if enabled_only:
skills = [skill for skill in skills if skill.enabled]
skills.sort(key=lambda s: s.name)
return skills
为什么这很重要:技能加载系统使以下成为可能:
- 零配置发现:自动检测新技能
- 关注点分离:技能元数据与启用状态分离
- 环境隔离:公共(提交)与自定义(本地)技能
- 运行时更新:更改无需重新启动即可生效
- 性能优化:仅加载所需的技能
技能提示注入
技能被注入到代理提示中以供 LLM 消费:
# backend/packages/harness/deerflow/agents/lead_agent/prompt.py
def get_skills_prompt_section(available_skills: set[str] | None = None) -> str:
"""生成带有可用技能列表的技能提示部分。"""
skills = load_skills(enabled_only=True)
# 如果指定,则按可用技能过滤
if available_skills is not None:
skills = [skill for skill in skills if skill.name in available_skills]
if not skills:
return ""
# 构建技能 XML 部分
skill_items = "\n".join(
f" <skill>\n <name>{skill.name}</name>\n <description>{skill.description}</description>\n <location>{skill.get_container_file_path(container_base_path)}</location>\n </skill>"
for skill in skills
)
skills_list = f"<available_skills>\n{skill_items}\n</available_skills>"
return f"""<skill_system>
您可以访问针对特定任务提供优化工作流程的技能。每个技能包含最佳实践、框架和参考资源。
**渐进式加载模式:**
1. 当用户查询匹配技能的用例时,立即使用技能标签下提供的路径属性调用 `read_file` 读取技能的主文件
2. 阅读并理解技能的工作流程和说明
3. 技能文件在同一文件夹下包含对外部资源的引用
4. 仅在执行过程中需要时加载引用的资源
5. 精确地按照技能的说明操作
**技能位于:** {container_base_path}
{skills_list}
</skill_system>"""
为什么这很重要:这种方法确保:
- 上下文效率:技能被引用,除非需要否则不会完全加载
- 及时加载:技能在其工作流程相关时加载
- 清晰说明:代理确切知道如何使用每个技能
- 资源引用:技能可以指向额外的材料
- 渐进式披露:简单任务不会被技能文档压垮
练习:构建数据分析技能系统
创建一个数据分析技能系统,包括:
- 数据清洁技能:处理缺失值、异常值和格式
- 探索性分析技能:描述性统计和可视化
- 建模技能:算法选择、训练和评估
- 报告技能:洞察提取、可视化选择和叙事创作
每个技能应包含:
- 带有元数据的 YAML 前言
- 清晰的工作流程描述
- 最佳实践和常见陷阱
- 相关工具和技术的参考
4. 子代理编排:并行执行模式
目标
掌握 LangGraph 的子代理委派和并行执行模式,用于复杂任务分解。
DeerFlow 中的关键概念
子代理隔离模式
子代理在具有自己的上下文、工具和配置的情况下运行:
# backend/packages/harness/deerflow/subagents/executor.py
class SubagentExecutor:
def __init__(
self,
config: SubagentConfig,
tools: list[BaseTool],
parent_model: str | None = None,
sandbox_state: SandboxState | None = None,
thread_data: ThreadDataState | None = None,
thread_id: str | None = None,
trace_id: str | None = None,
):
self.config = config
self.parent_model = parent_model
self.sandbox_state = sandbox_state
self.thread_data = thread_data
self.thread_id = thread_id
self.trace_id = trace_id or str(uuid.uuid4())[:8] # 如果未提供则生成
# 根据子代理配置过滤工具
self.tools = _filter_tools(
tools,
config.tools, # 允许的工具(None = 所有)
config.disallowed_tools, # 拒绝的工具(总是包含 "task")
)
logger.info(f"[trace={self.trace_id}] SubagentExecutor 初始化:{config.name},拥有 {len(self.tools)} 个工具")
为什么这很重要:子代理隔离提供:
- 上下文分离:防止任务之间的污染
- 资源限制:每个子代理获得自己的限制
- 专业化:不同的子代理可以具有不同的工具/模型
- 故障隔离:一个子代理的故障不会导致其他子代理崩溃
- 安全性:具有受控工具访问的沙箱执行
双线程池模式以提高效率
DeerFlow 使用两个线程池来防止阻塞:
# backend/packages/harness/deerflow/subagents/executor.py
# 用于后台任务调度和编排的线程池
_scheduler_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-scheduler-")
# 用于实际子代理执行(带超时支持)的线程池
_execution_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-exec-")
为什么这很重要:这种分离防止:
- 调度器阻塞:长时间运行的任务不会延迟新任务的提交
- 资源饥饿:执行池可以独立调整大小
- 超时管理:调度与执行的不同超时策略
- 清晰分离:编排逻辑与执行逻辑的分离
带实时更新的后台任务执行
任务以异步方式运行并带有进度报告:
# backend/packages/harness/deerflow/subagents/executor.py
def execute_async(self, task: str, task_id: str | None = None) -> str:
"""在后台启动任务执行。"""
# 创建初始待处理结果
result = SubagentResult(
task_id=task_id or str(uuid.uuid4())[:8],
trace_id=self.trace_id,
status=SubagentStatus.PENDING,
)
# 存储结果以供轮询
with _background_tasks_lock:
_background_tasks[task_id] = result
# 提交到调度器池
def run_task():
# 更新状态为 RUNNING
with _background_tasks_lock:
_background_tasks[task_id].status = SubagentStatus.RUNNING
_background_tasks[task_id].started_at = datetime.now()
result_holder = _background_tasks[task_id]
try:
# 执行带超时
execution_future = _execution_pool.submit(self.execute, task, result_holder)
exec_result = execution_future.result(timeout=self.config.timeout_seconds)
# 更新最终结果
with _background_tasks_lock:
_background_tasks[task_id].status = exec_result.status
_background_tasks[task_id].result = exec_result.result
_background_tasks[task_id].error = exec_result.error
_background_tasks[task_id].completed_at = datetime.now()
_background_tasks[task_id].ai_messages = exec_result.ai_messages
except Exception as e:
# 处理执行失败
with _background_tasks_lock:
_background_tasks[task_id].status = SubagentStatus.FAILED
_background_tasks[task_id].error = str(e)
_background_tasks[task_id].completed_at = datetime.now()
_scheduler_pool.submit(run_task)
return task_id
为什么这很重要:此模式使以下成为可能:
- 非阻塞提交:立即返回带任务 ID
- 实时进度:可以在执行期间发送更新
- 适当的超时处理:将调度超时与执行超时分开
- 结果持久性:存储结果以便以后检索
- 错误隔离:故障不会影响主代理的执行
并发管理和批处理
严格的限制通过智能批处理防止资源耗尽:
# backend/packages/harness/deerflow/agents/lead_agent/prompt.py(子代理部分)
**⛔ 硬并发限制:每次响应最多 {n} 次 `task` 调用。这是强制性的。**
- 每次响应,您最多可以包含 {n} 次 `task` 工具调用。任何多余的调用将被系统**静默丢弃**
- **在启动子代理之前,您必须在思考中计算子任务的数量:**
- 如果数量 ≤ {n}:在此响应中启动所有子任务。
- 如果数量 > {n}:**在此回合中选择最重要/基础的 {n} 个子任务。** 将剩余的保存到下一回合。
- **多批次执行**(对于 >{n} 个子任务):
- 回合 1:并行启动子任务 1-{n} → 等待结果
- 回合 2:并行启动下一批子任务 → 等待结果
- ... 继续直到所有子任务完成
- 最终回合:将所有结果综合成一个连贯的答案
为什么这很重要:这种方法防止:
- 资源耗尽:限制并发执行
- 上下文溢出:太多并行结果会压倒 LLM
- 速率限制问题:避免让外部 API 不堪重负
- 成本控制:防止过度使用令牌
- 用户体验:提供可预测的响应时间
批处理策略使以下成为可能:
- 水平缩放:处理任意复杂的任务
- 渐进式细化:早期结果为后期批次提供信息
- 容错性:一批中的故障不会导致所有工作丢失
- 资源优化:最优利用可用计算资源
练习:构建映射-减少子代理系统
创建一个实现映射-减少模式的子代理系统:
- 映射子代理:并行处理单个数据块
- 减少子代理:将所有映射器的结果组合起来
- 编排器:管理批处理、容错性和结果综合
该系统应处理:
- 可变大小的输入数据集
- 带有重试逻辑的映射器故障
- 当一些映射器失败时的部分结果
- 内存高效的结果组合
5. 沙箱执行:安全的代理环境
目标
了解如何为 LangGraph 中的代理创建安全、隔离的执行环境。
DeerFlow 中的关键概念
抽象沙箱接口
所有沙箱实现一个通用接口供代理交互:
# backend/packages/harness/deerflow/sandbox/sandbox.py
from abc import ABC, abstractmethod
class Sandbox(ABC):
"""沙箱环境的抽象基类"""
_id: str
def __init__(self, id: str):
self._id = id
@property
def id(self) -> str:
return self._id
@abstractmethod
def execute_command(self, command: str) -> str:
"""在沙箱中执行 bash 命令。
参数:
command: 要执行的命令。
返回:
命令的标准输出或错误输出。
"""
pass
@abstractmethod
def read_file(self, path: str) -> str:
"""读取文件的内容。
参数:
path: 要读取的文件的绝对路径。
返回:
文件的内容。
"""
pass
@abstractmethod
def list_dir(self, path: str, max_depth=2) -> list[str]:
"""列出目录的内容。
参数:
path: 要列出的目录的绝对路径。
max_depth: 要遍历的最大深度。默认为 2。
返回:
目录的内容。
"""
pass
@abstractmethod
def write_file(self, path: str, content: str, append: bool = False) -> None:
"""写入内容到文件。
参数:
path: 要写入的文件的绝对路径。
content: 要写入的文本内容。
append: 如果为 False,则创建或覆盖文件;如果为 True,则追加内容。
"""
pass
@abstractmethod
def update_file(self, path: str, content: bytes) -> None:
"""用二进制内容更新文件。
参数:
path: 要更新的文件的绝对路径。
content: 要写入的二进制内容。
"""
pass
为什么这很重要:抽象接口使以下成为可能:
- 实现灵活性:不同的沙箱类型(本地、Docker、Firecracker 等)
- 代理一致性:代理以相同的方式与所有沙箱交互
- 轻松切换:更改沙箱实现而无需更改代理
- 测试便利性:使用模拟沙箱进行单元测试
- 安全边界:明确定义代理的能力
虚拟路径系统以实现安全性
代理看到映射到主机路径的容器路径,防止路径遍历攻击:
# backend/packages/harness/deerflow/sandbox/middleware.py
def replace_virtual_path(path: str) -> str:
"""将虚拟路径替换为实际路径。"""
if path.startswith("/mnt/user-data/"):
return str(get_paths().user_data_dir / path[len("/mnt/user-data/"):])
elif path.startswith("/mnt/skills/"):
return str(get_paths().skills_dir / path[len("/mnt/skills/"):])
return path
def replace_virtual_paths_in_command(command: str) -> str:
"""将命令中的所有虚拟路径替换为实际路径。"""
# 将命令拆分为标记,在每个标记中替换路径
# 正确处理引用字符串
pass
为什么这很重要:虚拟路径提供:
- 路径遍历保护:代理无法通过
../../etc/passwd访问/etc/passwd - 环境隔离:每个代理只能看到自己的文件空间
- 一致的接口:
/mnt/user-data/workspace始终指向正确的位置 - 轻松清理:临时文件包含在已知位置中
- 多租户:多个代理可以同时运行而不会相互干扰
沙箱中间件集成
沙箱生命周期通过中间件管理:
# backend/packages/harness/deerflow/agents/lead_agent/agent.py
# 在 _build_middlewares 中的中间件执行顺序
middlewares = build_lead_runtime_middlewares(lazy_init=True)
# SandboxMiddleware - 获取沙箱,存储 sandbox_id 在状态中
# 必须在 ThreadDataMiddleware 之后以访问 thread_id
# 必须在需要沙箱访问的工具之前
sandbox_middleware = SandboxMiddleware()
middlewares.append(sandbox_middleware)
# ... 其他依赖于沙箱访问的中间件
为什么这很重要:基于中间件的沙箱管理确保:
- 自动获取:在代理执行之前就绪沙箱
- 适当的清理:执行后释放资源
- 线程隔离:每个线程获得自己的沙箱
- 错误处理:获取/释放过程中的故障得到处理
- 性能优化:在安全的情况下重用沙箱
工具路径翻译以实现透明性
工具自动处理虚拟路径和实际路径之间的翻译:
# backend/packages/harness/deerflow/sandbox/tools.py
def read_file(path: str) -> str:
"""读取文件内容,可选的行范围"""
actual_path = replace_virtual_path(path)
# 安全检查:确保路径在允许的边界内
if not is_path_allowed(actual_path):
raise ValueError(f"访问路径被拒绝:{path}")
# 从实际路径读取
with open(actual_path, "r", encoding="utf-8") as f:
return f.read()
def write_file(path: str, content: str, append: bool = False) -> None:
"""写入/追加到文件,创建目录"""
actual_path = replace_virtual_path(path)
# 安全检查
if not is_path_allowed(actual_path):
raise ValueError(f"访问路径被拒绝:{path}")
# 确保目录存在
Path(actual_path).parent.mkdir(parents=True, exist_ok=True)
# 写入实际路径
mode = "a" if append else "w"
with open(actual_path, mode, encoding="utf-8") as f:
f.write(content)
为什么这很重要:自动路径翻译提供:
- 开发者透明度:代理使用直观的路径,如
/mnt/user-data/workspace/file.txt - 安全执行:所有路径在实际文件访问前进行验证
- 一致行为:相同的代码在所有沙箱实现中工作
- 减少认知负担:无需记住复杂的路径映射
- 审计能力:所有文件访问可以被记录和监控
练习:构建 Firecracker 沙箱提供程序
创建一个使用 AWS Firecracker 微型虚拟机的沙箱提供程序,以实现比 Docker 更强的隔离:
- 实现
Sandbox抽象基类 - 添加 Firecracker 特定的初始化和清理
- 确保适当的资源限制(CPU、内存、磁盘 I/O)
- 实现用于快速启动的快照/恢复功能
- 添加网络限制以增强安全性
- 创建不同安全级别的配置示例
6. 内存系统:持久上下文管理
目标
为代理实现长期内存并在 LangGraph 应用程序中有效管理上下文。
DeerFlow 中的关键概念
层次化内存结构
内存组织为用户上下文、历史和离散事实:
# backend/packages/harness/deerflow/agents/memory/updater.py
def _create_empty_memory() -> dict[str, Any]:
"""创建一个空的内存结构。"""
return {
"version": "1.0",
"lastUpdated": datetime.utcnow().isoformat() + "Z",
"user": {
"workContext": {"summary": "", "updatedAt": ""},
"personalContext": {"summary": "", "updatedAt": ""},
"topOfMind": {"summary": "", "updatedAt": ""},
},
"history": {
"recentMonths": {"summary": "", "updatedAt": ""},
"earlierContext": {"summary": "", "updatedAt": ""},
"longTermBackground": {"summary": "", "updatedAt": ""},
},
"facts": [], # 带有元数据的离散事实
}
为什么这很重要:此结构提供:
- 以用户为中心的信息:偏好、工作上下文、即时思考
- 时间意识:最近的与遥远的上下文,具有适当的详细程度
- 基于事实的知识:离散的、可验证的信息,带有置信度分数
- 选择性遗忘:不同记忆类型的不同衰减率
- 上下文丰富性: zarówno 叙事摘要和具体事实
内存更新管道与 LLM 处理
对话以异步方式处理以更新内存:
为什么这很重要:此管道使以下成为可能:
- 非阻塞操作:内存更新不会减慢对话
- 智能处理:LLM 提取人类可能错过的见解
- 选择性保留:只有有价值的信息被长期存储
- 上下文适宜性:不同的时间框架具有适当的详细程度
- 持续改进:代理随着时间的推移更好地理解用户
带有去重和置信度的事实管理
新事实根据现有事实进行检查,并基于置信度进行替换:
# backend/packages/harness/deerflow/agents/memory/updater.py
def _apply_updates(
self,
current_memory: dict[str, Any],
update_data: dict[str, Any],
thread_id: str | None = None,
) -> dict[str, Any]:
config = get_memory_config()
now = datetime.utcnow().isoformat() + "Z"
# 跟踪现有事实内容以防止重复
existing_fact_keys = {
_fact_content_key(fact.get("content"))
for fact in current_memory.get("facts", [])
if _fact_content_key(fact.get("content")) is not None
}
# 添加新事实并进行置信度过滤
for fact in new_facts:
confidence = fact.get("confidence", 0.5)
if confidence >= config.fact_confidence_threshold:
raw_content = fact.get("content", "")
normalized_content = raw_content.strip()
fact_key = _fact_content_key(normalized_content)
# 如果我们已经有这个事实(除非新事实有更高置信度),则跳过
if fact_key is not None and fact_key in existing_fact_keys:
existing_fact = next(
(f for f in current_memory["facts"]
if _fact_content_key(f.get("content")) == fact_key),
None
)
if existing_fact and existing_fact.get("confidence", 0) >= confidence:
continue # 保留具有等于或更高置信度的现有事实
# 添加新事实
fact_entry = {
"id": f"fact_{uuid.uuid4().hex[:8]}",
"content": normalized_content,
"category": fact.get("category", "context"),
"confidence": confidence,
"createdAt": now,
"source": thread_id or "unknown",
}
current_memory["facts"].append(fact_entry)
if fact_key is not None:
existing_fact_keys.add(fact_key)
# 通过保留最高置信度的事实来执行最大事实限制
if len(current_memory["facts"]) > config.max_facts:
current_memory["facts"] = sorted(
current_memory["facts"],
key=lambda f: f.get("confidence", 0),
reverse=True,
)[: config.max_facts]
return current_memory
为什么这很重要:此事实管理系统提供:
- 知识质量:仅保留高置信度的事实
- 冲突解决:更新/更好的证据可以替换较旧的事实
- 来源追踪:知道信息来自何处
- 时间意识:较新的事实权重更高
- 存储效率:防止无界内存增长
供LLM消费的上下文注入
内存被格式化并注入到代理提示中:
# backend/packages/harness/deerflow/agents/lead_agent/prompt.py
def _get_memory_context(agent_name: str | None = None) -> str:
"""获取用于注入到系统提示中的内存上下文。"""
try:
from deerflow.agents.memory import format_memory_for_injection, get_memory_data
from deerflow.config.memory_config import get_memory_config
config = get_memory_config()
if not config.enabled or not config.injection_enabled:
return ""
memory_data = get_memory_data(agent_name)
memory_content = format_memory_for_injection(memory_data, max_tokens=config.max_injection_tokens)
if not memory_content.strip():
return ""
return f"""<memory>
{memory_content}
</memory>
"""
except Exception as e:
print(f"Failed to load memory context: {e}")
return ""
为什么这很重要:上下文注入使以下成为可能:
- 个性化:代理记住用户偏好和历史
- 连续性:对话建立在之前的互动之上
- 效率:重复使用之前计算出的见解
- 个人风格:代理展示对用户上下文的理解
- 更好的结果:更相关和准确的响应
内存维护和卫生
自动清理防止内存污染:
# backend/packages/harness/deerflow/agents/memory/updater.py
_UPLOAD_SENTENCE_RE = re.compile(
r"[^.!?]*\b(?:"
r"upload(?:ed|ing)?(?:\s+\w+){0,3}\s+(?:file|files?|document|documents?|attachment|attachments?)"
r"|file\s+upload"
r"|/mnt/user-data/uploads/"
r"|<uploaded_files>"
r")[^.!?]*[.!?]?\s*",
re.IGNORECASE,
)
def _strip_upload_mentions_from_memory(memory_data: dict[str, Any]) -> dict[str, Any]:
"""从所有内存摘要和事实中删除关于文件上传的句子。"""
# 清洗用户/历史部分的摘要
for section in ("user", "history"):
section_data = memory_data.get(section, {})
for _key, val in section_data.items():
if isinstance(val, dict) and "summary" in val:
cleaned = _UPLOAD_SENTENCE_RE.sub("", val["summary"]).strip()
cleaned = re.sub(r" +", " ", cleaned)
val["summary"] = cleaned
# 还要删除任何描述上传事件的事实
facts = memory_data.get("facts", [])
if facts:
memory_data["facts"] = [f for f in facts if not _UPLOAD_SENTENCE_RE.search(f.get("content", ""))]
return memory_data
为什么这很重要:内存卫生防止:
- 陈旧信息:会话特定数据不适当地持续存在
- 错误关联:上传事件不会创建错误的用户偏好
- 上下文污染:无关事实不会稀释重要知识
- 存储膨胀:瞬态数据不会消耗永久存储
- 用户信任:代理不会“记住”它不应该记住的事情
练习:构建时间感知内存系统
通过时间意识增强内存系统:
- 衰减函数:不同内存类型的不同衰减率
- 强化机制:频繁访问的事实获得置信度
- 上下文检索:检索与当前对话主题相关的事实
- 内存摘要:定期将相关事实合并为更高层次的洞察
- 隐私控制:用户可配置的数据保留策略
- 内存调试:用于检查和编辑存储内存的工具
7. 中间件模式:横切关注点
目标
掌握用于在代理系统中处理横切关注点的中间件。
DeerFlow 中的关键概念
有序中间件链执行
每个中间件在一个明确定义的序列中处理特定的问题:
# backend/packages/harness/deerflow/agents/lead_agent/agent.py
def _build_middlewares(config: RunnableConfig, model_name: str | None, agent_name: str | None = None):
"""根据运行时配置构建中间件链。"""
middlewares = build_lead_runtime_middlewares(lazy_init=True)
# 1. 摘要 - 在接近限制时减少上下文
summarization_middleware = _create_summarization_middleware()
if summarization_middleware is not None:
middlewares.append(summarization_middleware)
# 2. 待办事项列表 - 复杂工作流的任务跟踪
todo_list_middleware = _create_todo_list_middleware(is_plan_mode)
if todo_list_middleware is not None:
middlewares.append(todo_list_middleware)
# 3. TitleMiddleware - 自动生成线程标题
middlewares.append(TitleMiddleware())
# 4. MemoryMiddleware - 将对话排队以进行异步内存更新
middlewares.append(MemoryMiddleware(agent_name=agent_name))
# 5. ViewImageMiddleware - 在 LLM 调用前注入 base64 图像数据
# (仅当模型支持视觉时添加)
if model_config is not None and model_config.supports_vision:
middlewares.append(ViewImageMiddleware())
# 6. DeferredToolFilterMiddleware - 隐藏延迟工具模式
if app_config.tool_search.enabled:
middlewares.append(DeferredToolFilterMiddleware())
# 7. SubagentLimitMiddleware - 强制执行并发限制
if subagent_enabled:
middlewares.append(SubagentLimitMiddleware(max_concurrent=max_concurrent_subagents))
# 8. LoopDetectionMiddleware - 防止无限工具调用循环
middlewares.append(LoopDetectionMiddleware())
# 9. ClarificationMiddleware - 处理澄清请求(必须最后)
middlewares.append(ClarificationMiddleware())
return middlewares
为什么这很重要:特定的顺序确保:
- 首先进行摘要:在其他处理之前减少令牌负载
- 尽早进行任务跟踪:在执行开始之前跟踪意图
- 标题生成:在第一次交换后但内存处理之前
- 内存排队:捕获对话以供以后处理
- 视觉支持:在 LLM 查看之前注入图像
- 工具过滤:隐藏复杂工具直至需要时
- 并发限制:防止资源耗尽
- 循环检测:早期捕获问题模式
- 澄清最后:在需要时中断执行
用于横切关注点的中间件
每个中间件解决一个特定问题:
- ThreadDataMiddleware:为文件隔离创建每个线程的目录
- UploadsMiddleware:跟踪并使代理可用的用户上传
- SandboxMiddleware:管理沙箱获取/释放生命周期
- DanglingToolCallMiddleware:处理来自中断的不完整工具调用
- GuardrailMiddleware:安全/合规的预执行授权
- SummarizationMiddleware:当令牌限制被接近时的上下文管理
- TodoListMiddleware:通过显式任务跟踪启用计划模式
- TitleMiddleware:自动生成有意义的对话标题
- MemoryMiddleware:实现持久长期内存
- ViewImageMiddleware:处理视觉能力模型的图像
- DeferredToolFilterMiddleware:优化工具模式呈现
- SubagentLimitMiddleware:防止由于过多子代理导致的资源耗尽
- LoopDetectionMiddleware:检测并中断重复的工具调用模式
- ClarificationMiddleware:处理用户澄清请求
为什么这很重要:这种中间件方法提供:
- 关注点分离:每个中间件有一个明确的责任
- 可组合性:为不同代理类型混合和匹配中间件
- 运行时可配置性:无需代码更改即可启用/禁用功能
- 可测试性:每个中间件可以独立进行测试
- 可重用性:将常见模式提取为可共享的组件
- 可维护性:对一个问题的更改不会影响其他问题
中间件接口和通信
中间件通过 LangGraph 中间件系统进行交互:
# 示例:简化的中间件结构
class BaseMiddleware:
async def __call__(self, call):
# 预处理逻辑
# ...
# 调用链中的下一个中间件
response = await call
# 后处理逻辑
# ...
return response
为什么这很重要:此模式使以下成为可能:
- 预/后处理:在核心逻辑之前和之后处理问题
- 短路:在需要时阻止执行(例如,澄清)
- 错误处理:统一捕获和处理异常
- 状态修改:读取和写入代理状态
- 副作用:执行日志、度量、外部调用
- 链控制:决定是否继续到下一个中间件
练习:构建度量和监控中间件
创建一个收集和报告代理性能指标的中间件:
- 请求计时:测量端到端响应时间
- 工具使用情况:跟踪工具调用的频率和持续时间
- 令牌消耗:监控输入/输出令牌使用情况
- 错误率:分类和跟踪不同的失败类型
- 资源利用率:监控执行期间的内存和 CPU 使用情况
- 仪表板集成:通过 Prometheus 或类似工具公开度量
- 警报:在出现异常行为或 SLA 违规时通知
8. 配置驱动开发
目标
学习如何创建能够适应不同环境和需求的灵活、可配置的代理系统。
DeerFlow 中的关键概念
带环境变量解析的层次化配置
配置使用带有智能解析的 Pydantic 模型:
# backend/packages/harness/deerflow/config/app_config.py
class AppConfig(BaseModel):
models: list[ModelConfig] = Field(default_factory=list, description="可用模型")
sandbox: SandboxConfig = Field(description="沙箱配置")
tools: list[ToolConfig] = Field(default_factory=list, description="可用工具")
tool_groups: list[ToolGroupConfig] = Field(default_factory=list, description="可用工具组")
skills: SkillsConfig = Field(default_factory=SkillsConfig, description="技能配置")
extensions: ExtensionsConfig = Field(default_factory=ExtensionsConfig, description="扩展配置")
tool_search: ToolSearchConfig = Field(default_factory=ToolSearchConfig, description="工具搜索配置")
# ... 其他部分
@classmethod
def resolve_env_variables(cls, config: Any) -> Any:
"""递归解析配置中的环境变量。"""
if isinstance(config, str):
if config.startswith("$"):
env_value = os.getenv(config[1:])
if env_value is None:
raise ValueError(f"环境变量 {config[1:]} 未找到,用于配置值 {config}")
return env_value
return config
elif isinstance(config, dict):
return {k: cls.resolve_env_variables(v) for k, v in config.items()}
elif isinstance(config, list):
return [cls.resolve_env_variables(item) for item in config]
return config
为什么这很重要:此配置系统提供:
- 类型安全:Pydantic 在加载时验证配置
- 环境灵活性:不同部署环境的不同值
- 密钥管理:将凭据保留在配置文件之外
- 层次覆盖:命令行 > 环境变量 > 文件 > 默认值
- 文档生成:从模型定义自动生成文档
- IDE 支持:在编辑器中提供自动补全和验证
- 版本跟踪:检测和警告过时的配置
无需重启的配置重新加载
智能重新加载检测文件更改并更新配置:
# backend/packages/harness/deerflow/config/app_config.py
_app_config: AppConfig | None = None
_app_config_path: Path | None = None
_app_config_mtime: float | None = None
def get_app_config() -> AppConfig:
"""获取带有自动重新加载的 DeerFlow 配置实例。"""
global _app_config, _app_config_path, _app_config_mtime
# 检查我们是否需要根据路径或修改时间重新加载
resolved_path = AppConfig.resolve_config_path()
current_mtime = _get_config_mtime(resolved_path)
should_reload = (
_app_config is None
or _app_config_path != resolved_path
or _app_config_mtime != current_mtime
)
if should_reload:
# 重新加载并更新缓存
_app_config = _load_and_cache_app_config(str(resolved_path))
_app_config_path = resolved_path
_app_config_mtime = current_mtime
return _app_config
为什么这很重要:自动重新加载使以下成为可能:
- 零停机更新:在不重启服务的情况下更改行为
- 快速实验:快速尝试不同的配置
- 生产调整:根据观察到的行为调整参数
- 应急响应:快速更改行为以响应事件
- 开发效率:在开发过程中立即看到更改
- 配置漂移检测:识别文件和内存何时不同
模块化配置部分
复杂的配置被分解为专注的、可重用的部分:
# backend/packages/harness/deerflow/config/model_config.py
class ModelConfig(BaseModel):
"""单个 LLM 模型的配置。"""
name: str = Field(description="内部标识符")
display_name: str = Field(description="人类可读名称")
use: str = Field(description="LangChain 类路径")
model: str = Field(description="API 的模型标识符")
api_key: Optional[str] = Field(description="API 密钥(推荐使用环境变量)")
max_tokens: Optional[int] = Field(description="每次请求的最大标记数")
temperature: float = Field(description="采样温度", default=0.7)
# ... 思考/视觉支持标志
supports_thinking: bool = Field(description="模型是否支持思考模式", default=False)
supports_vision: bool = Field(description="模型是否支持图像输入", default=False)
# 提供者特定字段
base_url: Optional[str] = Field(description="自定义 API 基础 URL", default=None)
use_responses_api: bool = Field(description="使用 OpenAI 响应 API", default=False)
output_version: Optional[str] = Field(description="响应 API 的输出版本", default=None)
为什么这很重要:模块化配置提供:
- 关注点分离:每个部分处理配置的一个方面
- 可重用性:提取常见模式(例如,用于不同提供者的模型配置)
- 可维护性:对一个部分的更改不会影响其他部分
- 可测试性:每个配置部分可以独立测试
- 文档清晰度:用户可以快速找到相关设置
- 团队协作:不同团队可以拥有不同的部分
运行时配置覆盖
配置可以在特定请求时被覆盖:
# backend/packages/harness/deerflow/agents/lead_agent/agent.py
def make_lead_agent(config: RunnableConfig):
cfg = config.get("configurable", {})
# 提取运行时覆盖
thinking_enabled = cfg.get("thinking_enabled", True)
model_name = cfg.get("model_name") or cfg.get("model")
is_plan_mode = cfg.get("is_plan_mode", False)
subagent_enabled = cfg.get("subagent_enabled", False)
max_concurrent_subagents = cfg.get("max_concurrent_subagents", 3)
# ... 使用这些值来配置代理
为什么这很重要:运行时覆盖使以下成为可能:
- 个性化:不同用户获得不同的体验
- 上下文适应:行为基于对话主题而改变
- A/B 测试:在生产中比较不同的配置
- 功能标志:逐步推出新功能
- 紧急覆盖:在事件期间临时更改行为
- 实验:在不影响默认设置的情况下尝试新配置
练习:构建功能标志系统
创建一个支持复杂功能标志的配置系统:
- 百分比推出:逐步为一定比例的用户启用功能
- 有针对性的发布:为特定用户群体启用功能
- 基于时间的激活:功能在特定时间开启/关闭
- 依赖管理:需要其他功能的功能
- 杀死开关:立即禁用有问题的功能
- 实验跟踪:衡量不同配置的影响
- 即时代码:将标志与代码一起存储在版本控制中
9. MCP 集成:外部服务连接
目标
学习如何使用模型上下文协议(MCP)在 LangGraph 应用程序中将代理连接到外部服务。
DeerFlow 中的关键概念
MCP 客户端架构
DeerFlow 使用 langchain-mcp-adapters 进行 MCP 集成:
# backend/packages/harness/deerflow/mcp/(概念性 - 实际实现可能有所不同)
from langchain_mcp_adapters.client import MultiServerMCPClient
class MCPManager:
def __init__(self):
self.client = MultiServerMCPClient()
self._tools_cache = {}
self._cache_timestamps = {}
async def get_tools(self, server_names: list[str] | None = None) -> list[BaseTool]:
"""获取 MCP 工具,尽可能使用缓存。"""
# 检查缓存有效性
# 如果缓存过期或缺失,则从服务器获取新工具
# 返回合并的工具列表
def _should_refresh_cache(self, server_name: str) -> bool:
"""检查服务器的缓存是否需要刷新。"""
# 比较配置文件修改时间与缓存时间戳
pass
async def _refresh_server_tools(self, server_name: str) -> list[BaseTool]:
"""从特定的 MCP 服务器获取工具。"""
# 连接到服务器
# 列出可用工具
# 将其转换为 LangChain 工具格式
# 返回工具列表
为什么这很重要:MCP 提供:
- 标准化界面:一种一致的方式来连接到多样化的服务
- 服务发现:自动查找可用的功能
- 认证处理:内置支持各种认证方法
- 错误标准化:跨服务的一致错误处理
- 元数据丰富性:工具带有描述、示例等
- 社区生态系统:流行服务的 MCP 服务器数量不断增长
懒惰初始化和缓存
工具在需要时按需加载,并带有智能缓存失效:
# backend/packages/harness/deerflow/mcp/client.py(概念性)
def get_cached_mcp_tools() -> list[BaseTool]:
"""获取具有懒惰初始化和缓存失效的 MCP 工具。"""
# 检查我们是否有缓存的工具
# 如果没有,或者如果配置已更改,则从服务器刷新
# 返回缓存的或新鲜加载的工具
# 基于以下因素进行缓存失效:
# 1. 配置文件修改时间
# 2. 显式缓存 TTL 过期
# 3. 服务器可用性变化
为什么这很重要:这种方法提供:
- 启动性能:只有在需要时才加载工具
- 内存效率:只有当工具有用时才保持工具在内存中
- 自动更新:当服务器发生变化时选择新工具
- 故障韧性:如果服务器暂时不可用,则继续工作
- 资源优化:不要在未使用的服务器上浪费连接
- 开发者体验:快速的开发周期
传输无关设计
支持多种传输机制:
# 展示不同传输的配置示例
# extensions_config.json
{
"mcpServers": {
"filesystem": {
"command": "npx",
"args": [-y, "@modelcontextprotocol/server-filesystem", "/path/to/allowed/files"],
"transport": "stdio"
},
"browser": {
"url": "http://localhost:3000/mcp",
"transport": "HTTP"
},
"knowledge-base": {
"url": "https://api.example.com/mcp",
"transport": "SSE",
"headers": {
"Authorization": "Bearer $MCP_API_TOKEN"
}
}
}
}
为什么这很重要:传输灵活性使以下成为可能:
- 本地开发:stdio 用于轻松的测试和调试
- 生产部署:HTTP/SSE 用于可扩展的网络化服务
- 防火墙灵活性:选择在您的网络环境中有效的传输
- 性能优化:stdio 用于低延迟本地工具,HTTP 用于远程工具
- 安全选项:不同的传输具有不同的安全特性
- 供应商支持:使用服务提供商提供的任何传输
OAuth 令牌流管理
HTTP/SSE 传输的自动认证处理:
# backend/packages/harness/deerflow/mcp/client.py(概念性)
class MCPClientWithOAuth:
def __init__(self, server_url: str, oauth_config: dict):
self.server_url = server_url
self.oauth_config = oauth_config
self.access_token = None
self.refresh_token = None
self.token_expires_at = None
async def ensure_valid_token(self):
"""确保我们有有效的访问令牌,如果需要则刷新它。"""
if self._is_token_valid():
return
if self._can_refresh():
await self._refresh_access_token()
else:
await self._perform_full_oauth_flow()
async def _refresh_access_token(self):
"""使用刷新令牌获取新的访问令牌。"""
# 发出令牌刷新请求
# 更新 access_token 和 expires_at
# 如果提供了新的刷新令牌,则存储它
async def _perform_full_oauth_flow(self):
"""执行完整的 OAuth 授权码流程。"""
# 如果需要,生成 PKCE 挑战
# 将用户重定向到授权 URL
# 处理带有授权码的回调
# 用代码交换令牌
为什么这很重要:OAuth 管理提供:
- 无缝用户体验:不需要手动处理令牌
- 安全最佳实践:适当的令牌存储和轮换
- 减少摩擦:用户不需要管理凭据
- 服务兼容性:适用于标准 OAuth 提供者
- 令牌卫生:自动清理过期的令牌
- 审计能力:跟踪令牌使用和刷新模式
练习:为内部服务构建自定义 MCP 服务器
创建一个向代理暴露您的内部服务的 MCP 服务器:
- 服务识别:发现并记录可用的内部 API
- 工具创建:将 API 端点转换为带有适当描述的 MCP 工具
- 认证集成:连接到您的内部认证系统
- 速率限制:保护内部服务免受过度使用
- 数据转换:在内部格式和 MCP 期望之间进行转换
- 错误处理:将内部错误映射到适当的 MCP 错误响应
- 文档编写:提供清晰的示例和使用指南
- 安全审查:确保适当的访问控制和数据保护
10. 生产模式:企业级实现
目标
实现具有时效性、可扩展性和可维护性考虑的企业级代理系统。
DeerFlow 中的关键概念
可观测性和监控
生产代理需要全面的可见性:
# 基于 DeerFlow 模式的概念实现
class ObservabilityMiddleware:
async def __call__(self, call):
# 开始计时器
start_time = time.time()
# 提取请求信息
request_id = generate_request_id()
user_id = extract_user_id_from_context()
try:
# 处理请求
response = await call
# 记录成功指标
record_metrics(
request_id=request_id,
user_id=user_id,
duration=time.time() - start_time,
status="success",
tool_calls=extract_tool_calls(response),
token_usage=extract_token_usage(response)
)
return response
except Exception as e:
# 记录失败指标
record_metrics(
request_id=request_id,
user_id=user_id,
duration=time.time() - start_time,
status="error",
error_type=type(e).__name__,
error_message=str(e)
)
# 为正常的错误处理重新抛出
raise
为什么这很重要:可观察性使以下成为可能:
- 性能优化:识别瓶颈和缓慢的操作
- 容量规划:了解资源使用模式
- 故障排除:快速诊断和解决问题
- SLA 监控:确保服务级别目标得到实现
- 业务洞察:了解用户如何与代理交互
- 持续改进:数据驱动的代理行为增强
优雅降级和熔断器
生产系统以优雅的方式处理故障:
# 概念性熔断器实现
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.failure_count = 0
self.last_failure_time = None
self.state = "CLOSED" # CLOSED, OPEN, HALF_OPEN
async def call(self, func, *args, **kwargs):
if self.state == "OPEN":
if self._should_attempt_reset():
self.state = "HALF_OPEN"
else:
raise CircuitBreakerOpenException()
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
self.failure_count = 0
self.state = "CLOSED"
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "OPEN"
def _should_attempt_reset(self):
return (
self.state == "OPEN" and
time.time() - self.last_failure_time >= self.timeout
)
为什么这很重要:韧性模式提供:
- 故障隔离:防止级联故障
- 优雅降级:在部分中断期间保持核心功能
- 自动恢复:在问题解决时恢复正常操作
- 用户体验:提供有意义的反馈而不是神秘的错误
- 系统稳定性:在故障期间防止资源耗尽
- 运营简便性:减少故障期间的人工干预
安全和合规特性
生产代理必须满足安全和合规要求:
# 概念性安全中间件
class SecurityMiddleware:
async def __call__(self, call):
# 输入验证和清理
sanitized_input = self._sanitize_input(call.input)
# 访问控制检查
if not self._is_authorized(call.user, call.request):
raise AuthorizationError("权限不足")
# 速率限制
if not self._check_rate_limit(call.user):
raise RateLimitExceededException()
# 内容过滤
if self._contains_prohibited_content(sanitized_input):
raise ContentPolicyViolation()
# 进行清理、授权的请求
try:
response = await call
# 输出验证
safe_output = self._sanitize_output(response)
# 审计日志
self._log_audit_event(call.user, sanitized_input, safe_output)
return safe_output
except Exception as e:
# 记录与安全相关的异常
self._log_security_event(call.user, e)
raise
为什么这很重要:安全特性确保:
- 数据保护:防止未经授权访问敏感信息
- 法规遵从:满足 GDPR、HIPAA、SOC2 等要求
- 滥用防范:挫败自动攻击和刮取
- 用户信任:展示对隐私和安全的承诺
- 法律保护:减少因滥用或泄漏而产生的责任
- 声誉管理:避免损害品牌的安全事件
部署和运营考虑因素
生产就绪的代理需要适当的部署模式:
# 展示生产考虑因素的 Kubernetes 部署示例
apiVersion: apps/v1
kind: Deployment
metadata:
name: deerflow-agent
labels:
app: deerflow-agent
spec:
replicas: 3
selector:
matchLabels:
app: deerflow-agent
template:
metadata:
labels:
app: deerflow-agent
spec:
containers:
- name: agent
image: deerflow-agent:latest
ports:
- containerPort: 8000
env:
- name: LOG_LEVEL
valueFrom:
configMapKeyRef:
name: deerflow-config
key: log_level
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: deerflow-secrets
key: database_url
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "1Gi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
volumeMounts:
- name: tmp-volume
mountPath: /tmp
- name: cache-volume
mountPath: /var/cache/deerflow
volumes:
- name: tmp-volume
emptyDir: {}
- name: cache-volume
emptyDir:
sizeLimit: 5Gi
为什么这很重要:生产部署注意事项包括:
- 可伸缩性:水平缩放以处理负载变化
- 资源管理:防止容器挨饿或过度供应
- 健康检查:自动重启不健康的实例
- 配置管理:将配置与容器镜像分离
- 密钥管理:安全地处理凭据和证书
- 日志和监控:集中式可观测性
- 备份和灾难恢复:防止数据丢失
- 更新策略:零停机部署和回滚
练习:构建生产就绪的代理平台
创建一个包含以下内容的生产就绪代理平台:
- 多租户:安全地隔离不同的客户或团队
- 使用量计量:跟踪和计费资源消耗
- API 网关:速率限制、身份验证和请求路由
- 服务网格:可观测性、流量管理和安全性
- 灾难恢复:备份策略和跨区域复制
- 合规报告:为监管机构生成审计报告
- 性能优化:缓存、连接池和查询优化
- 开发者门户:文档、SDK 和沙箱环境
- 警报和通知:主动问题检测和升级
- 混沌工程:有意注入故障以测试韧性
综合:将所有内容结合在一起
让我们看看这些概念如何在典型的 DeerFlow 交互中协同工作:
典型请求流程
-
请求接收
- 用户通过网页界面、API 或 IM 通道发送消息
- 请求击中 Nginx,被路由到适当的服务
-
配置加载
get_app_config()加载带有环境变量解析的配置- 中间件链根据运行时标志构建(
is_plan_mode、subagent_enabled等)
-
代理初始化
- 带有自定义
ThreadState的主代理被创建 - 根据配置和组加载适当的工具
- 技能在系统提示中被引用(尚未完全加载)
- 内存上下文被检索并注入到提示中
- 带有自定义
-
处理开始
- ThreadDataMiddleware 创建每个线程的目录
- UploadsMiddleware 处理任何文件上传
- SandboxMiddleware 获取沙箱执行环境
- 各种中间件为 LLM 调用准备状态
-
LLM 推理
- LLM 收到包含以下内容的提示:
- 系统说明(包括技能引用)
- 内存上下文(用户偏好、历史、事实)
- 对话历史
- 可用工具描述
- LLM 使用链式思维推理来思考问题
- LLM 决定行动方案(工具使用、澄清等)
- LLM 收到包含以下内容的提示:
-
工具执行(如果需要)
- 如果调用了工具:
- 适当的中间件处理它(SandboxMiddleware 确保正确的路径)
- 工具在隔离的沙箱环境中执行
- 结果返回到代理状态
- ViewImageMiddleware 处理视觉模型的任何生成图像
- 如果调用了工具:
-
子代理委派(对于复杂任务)
- 如果调用了
task工具:- TaskTool 验证参数并创建 SubagentExecutor
- 执行器创建带有适当工具/配置的隔离子代理
- 子代理在带有超时保护的后台线程池中运行
- 主代理根据执行模式继续或等待
- 结果在就绪时被收集和综合
- 如果调用了
-
内存更新
- MemoryMiddleware 将对话排队以进行异步处理
- 后台线程运行 LLM 提取洞察和事实
- 内存更新器应用带有去重和置信度评分的更改
- 从长期记忆中剥离上传引用
- 更新的内存被保存并缓存以供将来使用
-
响应生成
- LLM 基于更新的状态生成最终响应
- 根据 response_style 指南格式化响应
- 为任何使用的外部信息添加引用
- 通过适当的渠道将响应返回给用户
-
后处理
- TitleMiddleware 可能根据对话更新线程标题
- 执行任何剩余的清理
- 系统准备好进行下一次交互
DeerFlow 演示的关键架构原则
DeerFlow 体现了几个重要的 LangGraph 和代理架构原则:
-
关注点分离
- 不同的组件处理状态、工具、技能、内存、沙箱等
- 每个问题可以独立开发、测试和维护
-
组合优于继承
- 通过组合简单、专注的组件来构建复杂行为
- 中间件链展示了如何组合横切关注点
- 工具系统展示了如何组合能力
-
配置驱动行为
- 运行时配置在不更改代码的情况下启用灵活性
- 功能标志允许安全的实验和逐步推出
- 特定环境的配置支持开发/测试/生产差异
-
渐进式加载和上下文效率
- 技能、工具和记忆仅在需要时加载
- 虚拟路径提供一致的接口而不暴露复杂性
- 后台处理保持主线程的响应能力
-
隔离和安全
- 沙箱执行防止代理逃逸或主机污染
- 子代理隔离防止上下文污染和资源争用
- 虚拟路径和输入验证防止安全问题
-
可观察性和可调试性
- 贯穿系统的结构化日志和追踪
- 清晰的关注点分离有助于问题隔离
- 配置版本控制有助于检测漂移
-
可扩展性和插件架构
- 技能系统允许在不更改核心的情况下添加新工作流程
- 工具系统允许添加新功能
- MCP 集成允许连接到外部服务
- 中间件系统允许添加新的横切关注点
您的 LangGraph 学习之旅的下一步
现在您已经看到了 DeerFlow 如何实现高级 LangGraph 概念,以下是一些继续学习的方法:
-
加深理解
- 彻底阅读 LangGraph 文档
- 研究 LangChain 代理和工具文档
- 探索 langgraph-api 和 langgraph-js 存储库
-
实验修改
- 分叉 DeerFlow 并尝试在每个部分实现练习
- 从简单的更改开始(如添加一个新工具),逐步过渡到复杂的更改(如实现一个新的沙箱类型)
- 用单元测试和集成测试彻底测试您的更改
-
构建您自己的代理系统
- 将您学到的模式应用到您自己的用例中
- 从针对特定领域的专注代理开始
- 随着对需求的更好理解,逐步添加复杂性
-
为社区做出贡献
- 与 DeerFlow 社区分享您的技能和工具
- 参与关于最佳实践和模式的讨论
- 通过回答问题和创建教程来帮助他人学习
-
保持最新
- 关注 LangGraph 和 LangChain 生态系统的发展
- 注意该领域出现的新模式和技术
- 考虑新兴技术(如更好的推理模型、改进的工具使用等)可能如何影响代理架构
请记住,目标不是完全复制 DeerFlow,而是理解其背后的原则和模式,以便将它们应用到您自己的代理系统中。最好的代理架构是适合您特定需求、约束和用例的架构。
祝您构建顺利,愿您的代理始终有帮助且富有洞察力!
Children