从Deer-flow学习LangGraph企业最佳实践

前言:字节跳动开源的超级智能体deer-flow构建于LangGraph,这样岂不是一个天生的LangGraph的最佳实践教程?这是一个很好的学习路径。本文是从源码生成的教程,部署和二次开发则是部署和二次开发的一些说明。

DeerFlow 2.0 是一个基于 LangGraph 和 LangChain 构建的开源 "超级代理 harness",它编排子代理、内存和沙箱来执行复杂任务。其结构良好的实现使其成为在生产就绪环境中学习高级 LangGraph 概念的卓越资源。

本教程将使用 DeerFlow 的实现作为具体示例,引导您学习 LangGraph 概念,遵循从基础概念到高级模式的渐进学习路径。

为什么选择 DeerFlow 学习 LangGraph?

DeerFlow 体现了几个关键的 LangGraph 原则:

  • 生产就绪模式:不仅仅是玩具示例,而是经过实际考验的实现
  • 清晰的关注点分离:不同模块处理状态、工具、技能、内存等
  • 配置驱动行为:无需更改代码即可在运行时灵活配置
  • 可扩展架构:技能、工具和外部服务的插件系统
  • 内置可观测性:追踪、日志和监控考虑
  • 安全第一:沙箱执行和全面的错误处理

学习路径概述

本教程遵循结构化路径,从基础到高级主题:

  1. 代理基础 - 状态管理和代理创建模式
  2. 工具系统 - 使用工具扩展代理能力
  3. 技能架构 - 基于插件的能力扩展
  4. 子代理编排 - 并行执行和委派模式
  5. 沙箱执行 - 安全、隔离的代理环境
  6. 内存系统 - 持久上下文管理
  7. 中间件模式 - 横切关注点处理
  8. 配置驱动开发 - 灵活的代理行为
  9. MCP 集成 - 外部服务连接
  10. 生产模式 - 企业级实现

让我们开始使用 DeerFlow 作为指南来学习 LangGraph 概念。


1. 代理基础:状态管理和创建

目标

了解如何创建和配置具有自定义状态的 LangGraph 代理,以适应您的领域。

DeerFlow 中的关键概念

自定义状态扩展

DeerFlow 扩展 LangGraph 的 AgentState 来创建具有领域特定字段的 ThreadState

# backend/packages/harness/deerflow/agents/thread_state.py
from typing import Sequence
from langgraph.graph import AgentState
from deerflow.sandbox.sandbox import SandboxState
from deerflow.agents.thread_state import ThreadDataState
from deerflow.tools.builtins.todo_tool import Todo

class ThreadState(AgentState):
    sandbox: SandboxState           # 沙箱执行状态
    thread_data: ThreadDataState    # 线程特定数据
    title: str                      # 自动生成的线程标题
    artifacts: dict[str, str]       # 生成的文件/工件
    todos: Sequence[Todo]           # 任务跟踪(当计划模式启用时)
    uploaded_files: list[str]       # 用户上传的文件
    viewed_images: list[str]        # 代理处理的图像

为什么这很重要:在 LangGraph 中,状态表示代理在步骤之间的内存。通过扩展基本状态,DeerFlow 添加了特定领域的功能:

  • 沙箱状态:跟踪隔离的执行环境
  • 线程数据:管理对话上下文
  • 工件:存储生成的输出
  • 待办事项:启用复杂工作流的任务跟踪
  • 文件跟踪:管理用户上传和代理生成的图像

动态配置下的代理创建

DeerFlow 的主代理工厂展示了如何根据运行时配置创建代理:

# backend/packages/harness/deerflow/agents/lead_agent/agent.py
def make_lead_agent(config: RunnableConfig):
    # 从 config.param 中提取配置
    cfg = config.get("configurable", {})
    thinking_enabled = cfg.get("thinking_enabled", True)
    model_name = cfg.get("model_name") or "default"
    is_plan_mode = cfg.get("is_plan_mode", False)
    subagent_enabled = cfg.get("subagent_enabled", False)
    
    # 创建带有自定义状态和中间件的 LangGraph 代理
    return create_agent(
        model=create_chat_model(name=model_name, thinking_enabled=thinking_enabled),
        tools=get_available_tools(model_name=model_name, subagent_enabled=subagent_enabled),
        middleware=_build_middlewares(config, model_name=model_name, agent_name=agent_name),
        system_prompt=apply_prompt_template(
            subagent_enabled=subagent_enabled, 
            max_concurrent_subagents=max_concurrent_subagents,
            agent_name=agent_name
        ),
        state_schema=ThreadState,  # 我们的自定义状态
    )

为什么这很重要:此模式展示了如何:

  1. RunnableConfig 中提取运行时配置
  2. 动态选择模型和功能
  3. 根据配置有条件地启用中间件
  4. 根据启用的功能生成系统提示
  5. 使用 LangGraph 的 create_agent 与自定义状态模式

练习:扩展代理状态

创建一个客户支持代理的自定义状态,包括:

  • 客户资料信息
  • 对话情感跟踪
  • 未解决的支持工单
  • 引用的知识库文章
from typing import Optional, List, Dict
from langgraph.graph import AgentState
from pydantic import BaseModel

class CustomerProfile(BaseModel):
    customer_id: str
    tier: str  # "免费", "高级", "企业"
    issues_resolved: int

class SupportState(AgentState):
    customer_profile: Optional[CustomerProfile]
    sentiment_score: float  # -1.0 到 1.0
    open_tickets: List[str]
    kb_articles: List[Dict[str, str]]  # 标题、内容、相关性分数

练习:可配置的代理工厂

创建一个代理工厂,可以根据配置在不同的代理类型(研究者、助手、编程者)之间切换,每种类型具有不同的工具集和系统提示。

def create_specialized_agent(config: RunnableConfig, agent_type: str):
    # 实现根据 agent_type 参数返回不同代理配置的逻辑
    pass

2. 工具系统:扩展代理能力

目标

学习如何定义和集成扩展 LangGraph 代理功能的工具。

DeerFlow 中的关键概念

具有自动文档的工具定义

DeerFlow 使用 LangChain 的 @tool 装饰器并带有自动文档解析:

# backend/packages/harness/deerflow/tools/builtins/task_tool.py
from langchain.tools import tool
from typing import Annotated, Literal
from langchain.tools import InjectedToolCallId

@tool("task", parse_docstring=True)
def task_tool(
    runtime: ToolRuntime[ContextT, ThreadState],
    description: str,
    prompt: str,
    subagent_type: Literal["general-purpose", "bash"],
    tool_call_id: Annotated[str, InjectedToolCallId],
    max_turns: int | None = None,
) -> str:
    """将任务委托给在其自身上下文中运行的专用子代理。
    
    子代理可以帮助您:
    - 通过保持探索和实施的分离来保持上下文
    - 自治地处理复杂的多步骤任务
    - 在隔离的上下文中执行命令或操作
    
    可用的子代理类型:
    - **通用用途**:能够处理需要探索和行动的复杂多步骤任务的代理。
      当任务需要复杂推理、多个依赖步骤或将从隔离上下文中受益时使用。
    - **bash**:命令执行专家,用于运行 bash 命令。用于
      git 操作、构建过程,或者当命令输出将是冗长的时候。
    
    何时使用此工具:
    - 多步骤或多工具的复杂任务
    - 产生冗长输出的任务
    - 当您希望将上下文与主对话隔离时
    - 并行研究或探索任务
    
    何时不使用此工具:
    - 简单的单步操作(直接使用工具)
    - 需要用户交互或澄清的任务
    
    参数:
        description: 用于日志/显示的任务简短描述(3-5个词)。优先提供此参数。
        prompt: 子代理的任务描述。明确具体地说明需要完成的工作。优先提供此参数。
        subagent_type: 要使用的子代理类型。优先提供此参数。
        max_turns: 可选的最大代理回合数。默认为子代理配置的最大值。
    """
    # 实现...

为什么这很重要parse_docstring=True 参数会自动提取:

  • 装饰器中的工具名称("task"
  • 文档字符串中的描述
  • Args 部分中的参数定义
  • 使用示例和指南

这创建了自文档化的工具,LLM 可以理解和有效使用。

工具组和配置

工具通过 YAML 进行逻辑组织和配置:

# config.example.yaml
tools:
  - name: web_search
    use: deerflow.community.tavily:TavilySearchTool
    group: research
  - name: bash
    use: deerflow.sandbox.tools:bash
    group: system
  - name: read_file
    use: deerflow.sandbox.tools:read_file
    group: file_ops
    
tool_groups:
  - name: research
    description: 用于收集信息的工具
  - name: system
    description: 用于系统操作的工具
  - name: file_ops
    description: 用于文件系统操作的工具

为什么这很重要:工具组使以下成为可能:

  1. 基于角色的工具访问:不同的代理类型获得不同的工具组
  2. 逻辑组织:相关的工具被组合在一起
  3. 配置驱动行为:无需代码更改即可启用/禁用组
  4. 权限系统:限制对危险工具的访问

动态工具解析

DeerFlow 在运行时根据配置解析工具:

# backend/packages/harness/deerflow/config/app_config.py
def get_tool_config(self, name: str) -> ToolConfig | None:
    """根据名称获取工具配置。"""
    return next((tool for tool in self.tools if tool.name == name), None)

def get_available_tools(
    model_name: str | None = None, 
    groups: list[str] | None = None, 
    subagent_enabled: bool = False
) -> list[BaseTool]:
    """根据配置组装可用工具。"""
    # 1. 获取配置定义的工具
    # 2. 如果指定,按组过滤
    # 3. 如果启用,添加 MCP 工具
    # 4. 添加内置工具
    # 5. 如果启用,添加子代理工具
    # 6. 返回最终工具列表

为什么这很重要:此模式允许:

  • 特定环境的工具集:开发和生产环境中的不同工具
  • 功能标志:安全地启用实验性工具
  • 代理专业化:研究人员获得网络工具,编程者获得开发工具
  • 安全控制:在某些情况下限制危险工具

练习:构建数据库工具包

创建一组用于与 PostgreSQL 数据库交互的工具:

  1. db_query:执行 SELECT 查询并返回结果
  2. db_execute:执行 INSERT/UPDATE/DELETE 语句
  3. db_schema:获取表结构信息
  4. db_transaction:在事务中执行多个语句

将它们组织到一个 "database" 工具组中并创建配置示例。


3. 技能架构:基于插件的能力扩展

目标

了解如何在 LangGraph 应用程序中使用技能创建可扩展的能力系统。

DeerFlow 中的关键概念

技能格式

技能是带有 YAML 前言的 Markdown 文件,提供工作流说明:

# skills/public/deep-research/SKILL.md
name: deep-research
description: 一项用于对任何主题进行深入研究的技能
license: MIT
# 深度研究技能

## 概述
此技能提供了一个框架,用于对任何主题进行彻底研究。

## 何时使用
- 需要多个来源的复杂主题
- 当你需要全面覆盖时
- 用于生成带有引用的报告

## 工作流程
1. **分解**:将主题分解为 3-5 个子问题
2. **研究**:使用网络搜索调查每个子问题
3. **综合**:将发现组合成连贯的叙述
4. **引用**:正确引用所有来源
5. **审查**:检查空白和不一致之处

## 最佳实践
- 总是从多个来源验证信息
- 尽可能使用最近的来源(过去一年内)
- 区分事实和观点
- 记录所有来源以便引用

为什么这很重要:技能提供:

  1. 结构化工作流程:复杂任务的逐步指导
  2. 最佳实践:编码的专业知识以获得可靠的结果
  3. 可发现性:代理可以自动加载相关技能
  4. 渐进式加载:仅在需要时加载技能以节省上下文
  5. 可共享性:易于在项目之间分发和重用

技能加载系统

DeerFlow 的技能加载器扫描目录并解析技能文件:

# backend/packages/harness/deerflow/skills/loader.py
def load_skills(skills_path: Path | None = None, use_config: bool = True, enabled_only: bool = False) -> list[Skill]:
    # 获取技能目录路径
    if skills_path is None:
        if use_config:
            skills_path = get_app_config().skills.get_skills_path()
        else:
            skills_path = get_skills_root_path()
    
    skills = []
    
    # 扫描公共和自定义目录
    for category in ["public", "custom"]:
        category_path = skills_path / category
        if not category_path.exists() or not category_path.is_dir():
            continue
        
        # 遍历目录结构
        for current_root, dir_names, file_names in os.walk(category_path):
            # 跳过隐藏目录
            dir_names[:] = sorted(name for name in dir_names if not name.startswith("."))
            if "SKILL.md" not in file_names:
                continue
            
            # 解析技能文件
            skill_file = Path(current_root) / "SKILL.md"
            relative_path = skill_file.parent.relative_to(category_path)
            skill = parse_skill_file(skill_file, category=category, relative_path=relative_path)
            
            if skill:
                skills.append(skill)
    
    # 从配置加载启用状态
    try:
        extensions_config = ExtensionsConfig.from_file()
        for skill in skills:
            skill.enabled = extensions_config.is_skill_enabled(skill.name, skill.category)
    except Exception:
        # 如果配置失败,默认为启用
        pass
    
    # 过滤和排序
    if enabled_only:
        skills = [skill for skill in skills if skill.enabled]
    skills.sort(key=lambda s: s.name)
    
    return skills

为什么这很重要:技能加载系统使以下成为可能:

  1. 零配置发现:自动检测新技能
  2. 关注点分离:技能元数据与启用状态分离
  3. 环境隔离:公共(提交)与自定义(本地)技能
  4. 运行时更新:更改无需重新启动即可生效
  5. 性能优化:仅加载所需的技能

技能提示注入

技能被注入到代理提示中以供 LLM 消费:

# backend/packages/harness/deerflow/agents/lead_agent/prompt.py
def get_skills_prompt_section(available_skills: set[str] | None = None) -> str:
    """生成带有可用技能列表的技能提示部分。"""
    skills = load_skills(enabled_only=True)
    
    # 如果指定,则按可用技能过滤
    if available_skills is not None:
        skills = [skill for skill in skills if skill.name in available_skills]
    
    if not skills:
        return ""
    
    # 构建技能 XML 部分
    skill_items = "\n".join(
        f"    <skill>\n        <name>{skill.name}</name>\n        <description>{skill.description}</description>\n        <location>{skill.get_container_file_path(container_base_path)}</location>\n    </skill>" 
        for skill in skills
    )
    skills_list = f"<available_skills>\n{skill_items}\n</available_skills>"
    
    return f"""<skill_system>
您可以访问针对特定任务提供优化工作流程的技能。每个技能包含最佳实践、框架和参考资源。

**渐进式加载模式:**
1. 当用户查询匹配技能的用例时,立即使用技能标签下提供的路径属性调用 `read_file` 读取技能的主文件
2. 阅读并理解技能的工作流程和说明
3. 技能文件在同一文件夹下包含对外部资源的引用
4. 仅在执行过程中需要时加载引用的资源
5. 精确地按照技能的说明操作

**技能位于:** {container_base_path}

{skills_list}

</skill_system>"""

为什么这很重要:这种方法确保:

  1. 上下文效率:技能被引用,除非需要否则不会完全加载
  2. 及时加载:技能在其工作流程相关时加载
  3. 清晰说明:代理确切知道如何使用每个技能
  4. 资源引用:技能可以指向额外的材料
  5. 渐进式披露:简单任务不会被技能文档压垮

练习:构建数据分析技能系统

创建一个数据分析技能系统,包括:

  1. 数据清洁技能:处理缺失值、异常值和格式
  2. 探索性分析技能:描述性统计和可视化
  3. 建模技能:算法选择、训练和评估
  4. 报告技能:洞察提取、可视化选择和叙事创作

每个技能应包含:

  • 带有元数据的 YAML 前言
  • 清晰的工作流程描述
  • 最佳实践和常见陷阱
  • 相关工具和技术的参考

4. 子代理编排:并行执行模式

目标

掌握 LangGraph 的子代理委派和并行执行模式,用于复杂任务分解。

DeerFlow 中的关键概念

子代理隔离模式

子代理在具有自己的上下文、工具和配置的情况下运行:

# backend/packages/harness/deerflow/subagents/executor.py
class SubagentExecutor:
    def __init__(
        self,
        config: SubagentConfig,
        tools: list[BaseTool],
        parent_model: str | None = None,
        sandbox_state: SandboxState | None = None,
        thread_data: ThreadDataState | None = None,
        thread_id: str | None = None,
        trace_id: str | None = None,
    ):
        self.config = config
        self.parent_model = parent_model
        self.sandbox_state = sandbox_state
        self.thread_data = thread_data
        self.thread_id = thread_id
        self.trace_id = trace_id or str(uuid.uuid4())[:8]  # 如果未提供则生成
        
        # 根据子代理配置过滤工具
        self.tools = _filter_tools(
            tools,
            config.tools,          # 允许的工具(None = 所有)
            config.disallowed_tools, # 拒绝的工具(总是包含 "task")
        )
        
        logger.info(f"[trace={self.trace_id}] SubagentExecutor 初始化:{config.name},拥有 {len(self.tools)} 个工具")

为什么这很重要:子代理隔离提供:

  1. 上下文分离:防止任务之间的污染
  2. 资源限制:每个子代理获得自己的限制
  3. 专业化:不同的子代理可以具有不同的工具/模型
  4. 故障隔离:一个子代理的故障不会导致其他子代理崩溃
  5. 安全性:具有受控工具访问的沙箱执行

双线程池模式以提高效率

DeerFlow 使用两个线程池来防止阻塞:

# backend/packages/harness/deerflow/subagents/executor.py
# 用于后台任务调度和编排的线程池
_scheduler_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-scheduler-")

# 用于实际子代理执行(带超时支持)的线程池
_execution_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-exec-")

为什么这很重要:这种分离防止:

  1. 调度器阻塞:长时间运行的任务不会延迟新任务的提交
  2. 资源饥饿:执行池可以独立调整大小
  3. 超时管理:调度与执行的不同超时策略
  4. 清晰分离:编排逻辑与执行逻辑的分离

带实时更新的后台任务执行

任务以异步方式运行并带有进度报告:

# backend/packages/harness/deerflow/subagents/executor.py
def execute_async(self, task: str, task_id: str | None = None) -> str:
    """在后台启动任务执行。"""
    # 创建初始待处理结果
    result = SubagentResult(
        task_id=task_id or str(uuid.uuid4())[:8],
        trace_id=self.trace_id,
        status=SubagentStatus.PENDING,
    )
    
    # 存储结果以供轮询
    with _background_tasks_lock:
        _background_tasks[task_id] = result
    
    # 提交到调度器池
    def run_task():
        # 更新状态为 RUNNING
        with _background_tasks_lock:
            _background_tasks[task_id].status = SubagentStatus.RUNNING
            _background_tasks[task_id].started_at = datetime.now()
            result_holder = _background_tasks[task_id]
        
        try:
            # 执行带超时
            execution_future = _execution_pool.submit(self.execute, task, result_holder)
            exec_result = execution_future.result(timeout=self.config.timeout_seconds)
            
            # 更新最终结果
            with _background_tasks_lock:
                _background_tasks[task_id].status = exec_result.status
                _background_tasks[task_id].result = exec_result.result
                _background_tasks[task_id].error = exec_result.error
                _background_tasks[task_id].completed_at = datetime.now()
                _background_tasks[task_id].ai_messages = exec_result.ai_messages
        except Exception as e:
            # 处理执行失败
            with _background_tasks_lock:
                _background_tasks[task_id].status = SubagentStatus.FAILED
                _background_tasks[task_id].error = str(e)
                _background_tasks[task_id].completed_at = datetime.now()
    
    _scheduler_pool.submit(run_task)
    return task_id

为什么这很重要:此模式使以下成为可能:

  1. 非阻塞提交:立即返回带任务 ID
  2. 实时进度:可以在执行期间发送更新
  3. 适当的超时处理:将调度超时与执行超时分开
  4. 结果持久性:存储结果以便以后检索
  5. 错误隔离:故障不会影响主代理的执行

并发管理和批处理

严格的限制通过智能批处理防止资源耗尽:

# backend/packages/harness/deerflow/agents/lead_agent/prompt.py(子代理部分)
**⛔ 硬并发限制:每次响应最多 {n} 次 `task` 调用。这是强制性的。**
- 每次响应,您最多可以包含 {n} 次 `task` 工具调用。任何多余的调用将被系统**静默丢弃**
- **在启动子代理之前,您必须在思考中计算子任务的数量:**
  - 如果数量 ≤ {n}:在此响应中启动所有子任务。
  - 如果数量 > {n}**在此回合中选择最重要/基础的 {n} 个子任务。** 将剩余的保存到下一回合。
- **多批次执行**(对于 >{n} 个子任务):
  - 回合 1:并行启动子任务 1-{n} → 等待结果
  - 回合 2:并行启动下一批子任务 → 等待结果
  - ... 继续直到所有子任务完成
  - 最终回合:将所有结果综合成一个连贯的答案

为什么这很重要:这种方法防止:

  1. 资源耗尽:限制并发执行
  2. 上下文溢出:太多并行结果会压倒 LLM
  3. 速率限制问题:避免让外部 API 不堪重负
  4. 成本控制:防止过度使用令牌
  5. 用户体验:提供可预测的响应时间

批处理策略使以下成为可能:

  • 水平缩放:处理任意复杂的任务
  • 渐进式细化:早期结果为后期批次提供信息
  • 容错性:一批中的故障不会导致所有工作丢失
  • 资源优化:最优利用可用计算资源

练习:构建映射-减少子代理系统

创建一个实现映射-减少模式的子代理系统:

  1. 映射子代理:并行处理单个数据块
  2. 减少子代理:将所有映射器的结果组合起来
  3. 编排器:管理批处理、容错性和结果综合

该系统应处理:

  • 可变大小的输入数据集
  • 带有重试逻辑的映射器故障
  • 当一些映射器失败时的部分结果
  • 内存高效的结果组合

5. 沙箱执行:安全的代理环境

目标

了解如何为 LangGraph 中的代理创建安全、隔离的执行环境。

DeerFlow 中的关键概念

抽象沙箱接口

所有沙箱实现一个通用接口供代理交互:

# backend/packages/harness/deerflow/sandbox/sandbox.py
from abc import ABC, abstractmethod

class Sandbox(ABC):
    """沙箱环境的抽象基类"""
    
    _id: str
    
    def __init__(self, id: str):
        self._id = id
    
    @property
    def id(self) -> str:
        return self._id
    
    @abstractmethod
    def execute_command(self, command: str) -> str:
        """在沙箱中执行 bash 命令。
        
        参数:
            command: 要执行的命令。
            
        返回:
            命令的标准输出或错误输出。
        """
        pass
    
    @abstractmethod
    def read_file(self, path: str) -> str:
        """读取文件的内容。
        
        参数:
            path: 要读取的文件的绝对路径。
            
        返回:
            文件的内容。
        """
        pass
    
    @abstractmethod
    def list_dir(self, path: str, max_depth=2) -> list[str]:
        """列出目录的内容。
        
        参数:
            path: 要列出的目录的绝对路径。
            max_depth: 要遍历的最大深度。默认为 2。
            
        返回:
            目录的内容。
        """
        pass
    
    @abstractmethod
    def write_file(self, path: str, content: str, append: bool = False) -> None:
        """写入内容到文件。
        
        参数:
            path: 要写入的文件的绝对路径。
            content: 要写入的文本内容。
            append: 如果为 False,则创建或覆盖文件;如果为 True,则追加内容。
        """
        pass
    
    @abstractmethod
    def update_file(self, path: str, content: bytes) -> None:
        """用二进制内容更新文件。
        
        参数:
            path: 要更新的文件的绝对路径。
            content: 要写入的二进制内容。
        """
        pass

为什么这很重要:抽象接口使以下成为可能:

  1. 实现灵活性:不同的沙箱类型(本地、Docker、Firecracker 等)
  2. 代理一致性:代理以相同的方式与所有沙箱交互
  3. 轻松切换:更改沙箱实现而无需更改代理
  4. 测试便利性:使用模拟沙箱进行单元测试
  5. 安全边界:明确定义代理的能力

虚拟路径系统以实现安全性

代理看到映射到主机路径的容器路径,防止路径遍历攻击:

# backend/packages/harness/deerflow/sandbox/middleware.py
def replace_virtual_path(path: str) -> str:
    """将虚拟路径替换为实际路径。"""
    if path.startswith("/mnt/user-data/"):
        return str(get_paths().user_data_dir / path[len("/mnt/user-data/"):])
    elif path.startswith("/mnt/skills/"):
        return str(get_paths().skills_dir / path[len("/mnt/skills/"):])
    return path

def replace_virtual_paths_in_command(command: str) -> str:
    """将命令中的所有虚拟路径替换为实际路径。"""
    # 将命令拆分为标记,在每个标记中替换路径
    # 正确处理引用字符串
    pass

为什么这很重要:虚拟路径提供:

  1. 路径遍历保护:代理无法通过 ../../etc/passwd 访问 /etc/passwd
  2. 环境隔离:每个代理只能看到自己的文件空间
  3. 一致的接口/mnt/user-data/workspace 始终指向正确的位置
  4. 轻松清理:临时文件包含在已知位置中
  5. 多租户:多个代理可以同时运行而不会相互干扰

沙箱中间件集成

沙箱生命周期通过中间件管理:

# backend/packages/harness/deerflow/agents/lead_agent/agent.py
# 在 _build_middlewares 中的中间件执行顺序
middlewares = build_lead_runtime_middlewares(lazy_init=True)

# SandboxMiddleware - 获取沙箱,存储 sandbox_id 在状态中
# 必须在 ThreadDataMiddleware 之后以访问 thread_id
# 必须在需要沙箱访问的工具之前
sandbox_middleware = SandboxMiddleware()
middlewares.append(sandbox_middleware)

# ... 其他依赖于沙箱访问的中间件

为什么这很重要:基于中间件的沙箱管理确保:

  1. 自动获取:在代理执行之前就绪沙箱
  2. 适当的清理:执行后释放资源
  3. 线程隔离:每个线程获得自己的沙箱
  4. 错误处理:获取/释放过程中的故障得到处理
  5. 性能优化:在安全的情况下重用沙箱

工具路径翻译以实现透明性

工具自动处理虚拟路径和实际路径之间的翻译:

# backend/packages/harness/deerflow/sandbox/tools.py
def read_file(path: str) -> str:
    """读取文件内容,可选的行范围"""
    actual_path = replace_virtual_path(path)
    # 安全检查:确保路径在允许的边界内
    if not is_path_allowed(actual_path):
        raise ValueError(f"访问路径被拒绝:{path}")
    
    # 从实际路径读取
    with open(actual_path, "r", encoding="utf-8") as f:
        return f.read()

def write_file(path: str, content: str, append: bool = False) -> None:
    """写入/追加到文件,创建目录"""
    actual_path = replace_virtual_path(path)
    # 安全检查
    if not is_path_allowed(actual_path):
        raise ValueError(f"访问路径被拒绝:{path}")
    
    # 确保目录存在
    Path(actual_path).parent.mkdir(parents=True, exist_ok=True)
    
    # 写入实际路径
    mode = "a" if append else "w"
    with open(actual_path, mode, encoding="utf-8") as f:
        f.write(content)

为什么这很重要:自动路径翻译提供:

  1. 开发者透明度:代理使用直观的路径,如 /mnt/user-data/workspace/file.txt
  2. 安全执行:所有路径在实际文件访问前进行验证
  3. 一致行为:相同的代码在所有沙箱实现中工作
  4. 减少认知负担:无需记住复杂的路径映射
  5. 审计能力:所有文件访问可以被记录和监控

练习:构建 Firecracker 沙箱提供程序

创建一个使用 AWS Firecracker 微型虚拟机的沙箱提供程序,以实现比 Docker 更强的隔离:

  1. 实现 Sandbox 抽象基类
  2. 添加 Firecracker 特定的初始化和清理
  3. 确保适当的资源限制(CPU、内存、磁盘 I/O)
  4. 实现用于快速启动的快照/恢复功能
  5. 添加网络限制以增强安全性
  6. 创建不同安全级别的配置示例

6. 内存系统:持久上下文管理

目标

为代理实现长期内存并在 LangGraph 应用程序中有效管理上下文。

DeerFlow 中的关键概念

层次化内存结构

内存组织为用户上下文、历史和离散事实:

# backend/packages/harness/deerflow/agents/memory/updater.py
def _create_empty_memory() -> dict[str, Any]:
    """创建一个空的内存结构。"""
    return {
        "version": "1.0",
        "lastUpdated": datetime.utcnow().isoformat() + "Z",
        "user": {
            "workContext": {"summary": "", "updatedAt": ""},
            "personalContext": {"summary": "", "updatedAt": ""},
            "topOfMind": {"summary": "", "updatedAt": ""},
        },
        "history": {
            "recentMonths": {"summary": "", "updatedAt": ""},
            "earlierContext": {"summary": "", "updatedAt": ""},
            "longTermBackground": {"summary": "", "updatedAt": ""},
        },
        "facts": [],  # 带有元数据的离散事实
    }

为什么这很重要:此结构提供:

  1. 以用户为中心的信息:偏好、工作上下文、即时思考
  2. 时间意识:最近的与遥远的上下文,具有适当的详细程度
  3. 基于事实的知识:离散的、可验证的信息,带有置信度分数
  4. 选择性遗忘:不同记忆类型的不同衰减率
  5. 上下文丰富性: zarówno 叙事摘要和具体事实

内存更新管道与 LLM 处理

对话以异步方式处理以更新内存:

graph TD A[对话消息] --> B[内存中间件] B --> C{内存已启用?} C -->|是| D[内存队列] D --> E[后台线程] E --> F[LLM 处理] F --> G[内存更新] G --> H[原子文件保存] H --> I[缓存失效] I --> J[下次对话看到更新的内存]

为什么这很重要:此管道使以下成为可能:

  1. 非阻塞操作:内存更新不会减慢对话
  2. 智能处理:LLM 提取人类可能错过的见解
  3. 选择性保留:只有有价值的信息被长期存储
  4. 上下文适宜性:不同的时间框架具有适当的详细程度
  5. 持续改进:代理随着时间的推移更好地理解用户

带有去重和置信度的事实管理

新事实根据现有事实进行检查,并基于置信度进行替换:

# backend/packages/harness/deerflow/agents/memory/updater.py
def _apply_updates(
    self,
    current_memory: dict[str, Any],
    update_data: dict[str, Any],
    thread_id: str | None = None,
) -> dict[str, Any]:
    config = get_memory_config()
    now = datetime.utcnow().isoformat() + "Z"
    
    # 跟踪现有事实内容以防止重复
    existing_fact_keys = {
        _fact_content_key(fact.get("content"))
        for fact in current_memory.get("facts", [])
        if _fact_content_key(fact.get("content")) is not None
    }
    
    # 添加新事实并进行置信度过滤
    for fact in new_facts:
        confidence = fact.get("confidence", 0.5)
        if confidence >= config.fact_confidence_threshold:
            raw_content = fact.get("content", "")
            normalized_content = raw_content.strip()
            fact_key = _fact_content_key(normalized_content)
            
            # 如果我们已经有这个事实(除非新事实有更高置信度),则跳过
            if fact_key is not None and fact_key in existing_fact_keys:
                existing_fact = next(
                    (f for f in current_memory["facts"] 
                     if _fact_content_key(f.get("content")) == fact_key),
                    None
                )
                if existing_fact and existing_fact.get("confidence", 0) >= confidence:
                    continue  # 保留具有等于或更高置信度的现有事实
            
            # 添加新事实
            fact_entry = {
                "id": f"fact_{uuid.uuid4().hex[:8]}",
                "content": normalized_content,
                "category": fact.get("category", "context"),
                "confidence": confidence,
                "createdAt": now,
                "source": thread_id or "unknown",
            }
            current_memory["facts"].append(fact_entry)
            
            if fact_key is not None:
                existing_fact_keys.add(fact_key)
    
    # 通过保留最高置信度的事实来执行最大事实限制
    if len(current_memory["facts"]) > config.max_facts:
        current_memory["facts"] = sorted(
            current_memory["facts"],
            key=lambda f: f.get("confidence", 0),
            reverse=True,
        )[: config.max_facts]
    
    return current_memory

为什么这很重要:此事实管理系统提供:

  1. 知识质量:仅保留高置信度的事实
  2. 冲突解决:更新/更好的证据可以替换较旧的事实
  3. 来源追踪:知道信息来自何处
  4. 时间意识:较新的事实权重更高
  5. 存储效率:防止无界内存增长

供LLM消费的上下文注入

内存被格式化并注入到代理提示中:

# backend/packages/harness/deerflow/agents/lead_agent/prompt.py
def _get_memory_context(agent_name: str | None = None) -> str:
    """获取用于注入到系统提示中的内存上下文。"""
    try:
        from deerflow.agents.memory import format_memory_for_injection, get_memory_data
        from deerflow.config.memory_config import get_memory_config
        
        config = get_memory_config()
        if not config.enabled or not config.injection_enabled:
            return ""
        
        memory_data = get_memory_data(agent_name)
        memory_content = format_memory_for_injection(memory_data, max_tokens=config.max_injection_tokens)
        
        if not memory_content.strip():
            return ""
        
        return f"""<memory>
{memory_content}
</memory>
"""
    except Exception as e:
        print(f"Failed to load memory context: {e}")
        return ""

为什么这很重要:上下文注入使以下成为可能:

  1. 个性化:代理记住用户偏好和历史
  2. 连续性:对话建立在之前的互动之上
  3. 效率:重复使用之前计算出的见解
  4. 个人风格:代理展示对用户上下文的理解
  5. 更好的结果:更相关和准确的响应

内存维护和卫生

自动清理防止内存污染:

# backend/packages/harness/deerflow/agents/memory/updater.py
_UPLOAD_SENTENCE_RE = re.compile(
    r"[^.!?]*\b(?:"
    r"upload(?:ed|ing)?(?:\s+\w+){0,3}\s+(?:file|files?|document|documents?|attachment|attachments?)"
    r"|file\s+upload"
    r"|/mnt/user-data/uploads/"
    r"|<uploaded_files>"
    r")[^.!?]*[.!?]?\s*",
    re.IGNORECASE,
)

def _strip_upload_mentions_from_memory(memory_data: dict[str, Any]) -> dict[str, Any]:
    """从所有内存摘要和事实中删除关于文件上传的句子。"""
    # 清洗用户/历史部分的摘要
    for section in ("user", "history"):
        section_data = memory_data.get(section, {})
        for _key, val in section_data.items():
            if isinstance(val, dict) and "summary" in val:
                cleaned = _UPLOAD_SENTENCE_RE.sub("", val["summary"]).strip()
                cleaned = re.sub(r"  +", " ", cleaned)
                val["summary"] = cleaned
    
    # 还要删除任何描述上传事件的事实
    facts = memory_data.get("facts", [])
    if facts:
        memory_data["facts"] = [f for f in facts if not _UPLOAD_SENTENCE_RE.search(f.get("content", ""))]
    
    return memory_data

为什么这很重要:内存卫生防止:

  1. 陈旧信息:会话特定数据不适当地持续存在
  2. 错误关联:上传事件不会创建错误的用户偏好
  3. 上下文污染:无关事实不会稀释重要知识
  4. 存储膨胀:瞬态数据不会消耗永久存储
  5. 用户信任:代理不会“记住”它不应该记住的事情

练习:构建时间感知内存系统

通过时间意识增强内存系统:

  1. 衰减函数:不同内存类型的不同衰减率
  2. 强化机制:频繁访问的事实获得置信度
  3. 上下文检索:检索与当前对话主题相关的事实
  4. 内存摘要:定期将相关事实合并为更高层次的洞察
  5. 隐私控制:用户可配置的数据保留策略
  6. 内存调试:用于检查和编辑存储内存的工具

7. 中间件模式:横切关注点

目标

掌握用于在代理系统中处理横切关注点的中间件。

DeerFlow 中的关键概念

有序中间件链执行

每个中间件在一个明确定义的序列中处理特定的问题:

# backend/packages/harness/deerflow/agents/lead_agent/agent.py
def _build_middlewares(config: RunnableConfig, model_name: str | None, agent_name: str | None = None):
    """根据运行时配置构建中间件链。"""
    middlewares = build_lead_runtime_middlewares(lazy_init=True)
    
    # 1. 摘要 - 在接近限制时减少上下文
    summarization_middleware = _create_summarization_middleware()
    if summarization_middleware is not None:
        middlewares.append(summarization_middleware)
    
    # 2. 待办事项列表 - 复杂工作流的任务跟踪
    todo_list_middleware = _create_todo_list_middleware(is_plan_mode)
    if todo_list_middleware is not None:
        middlewares.append(todo_list_middleware)
    
    # 3. TitleMiddleware - 自动生成线程标题
    middlewares.append(TitleMiddleware())
    
    # 4. MemoryMiddleware - 将对话排队以进行异步内存更新
    middlewares.append(MemoryMiddleware(agent_name=agent_name))
    
    # 5. ViewImageMiddleware - 在 LLM 调用前注入 base64 图像数据
    # (仅当模型支持视觉时添加)
    if model_config is not None and model_config.supports_vision:
        middlewares.append(ViewImageMiddleware())
    
    # 6. DeferredToolFilterMiddleware - 隐藏延迟工具模式
    if app_config.tool_search.enabled:
        middlewares.append(DeferredToolFilterMiddleware())
    
    # 7. SubagentLimitMiddleware - 强制执行并发限制
    if subagent_enabled:
        middlewares.append(SubagentLimitMiddleware(max_concurrent=max_concurrent_subagents))
    
    # 8. LoopDetectionMiddleware - 防止无限工具调用循环
    middlewares.append(LoopDetectionMiddleware())
    
    # 9. ClarificationMiddleware - 处理澄清请求(必须最后)
    middlewares.append(ClarificationMiddleware())
    
    return middlewares

为什么这很重要:特定的顺序确保:

  1. 首先进行摘要:在其他处理之前减少令牌负载
  2. 尽早进行任务跟踪:在执行开始之前跟踪意图
  3. 标题生成:在第一次交换后但内存处理之前
  4. 内存排队:捕获对话以供以后处理
  5. 视觉支持:在 LLM 查看之前注入图像
  6. 工具过滤:隐藏复杂工具直至需要时
  7. 并发限制:防止资源耗尽
  8. 循环检测:早期捕获问题模式
  9. 澄清最后:在需要时中断执行

用于横切关注点的中间件

每个中间件解决一个特定问题:

  1. ThreadDataMiddleware:为文件隔离创建每个线程的目录
  2. UploadsMiddleware:跟踪并使代理可用的用户上传
  3. SandboxMiddleware:管理沙箱获取/释放生命周期
  4. DanglingToolCallMiddleware:处理来自中断的不完整工具调用
  5. GuardrailMiddleware:安全/合规的预执行授权
  6. SummarizationMiddleware:当令牌限制被接近时的上下文管理
  7. TodoListMiddleware:通过显式任务跟踪启用计划模式
  8. TitleMiddleware:自动生成有意义的对话标题
  9. MemoryMiddleware:实现持久长期内存
  10. ViewImageMiddleware:处理视觉能力模型的图像
  11. DeferredToolFilterMiddleware:优化工具模式呈现
  12. SubagentLimitMiddleware:防止由于过多子代理导致的资源耗尽
  13. LoopDetectionMiddleware:检测并中断重复的工具调用模式
  14. ClarificationMiddleware:处理用户澄清请求

为什么这很重要:这种中间件方法提供:

  1. 关注点分离:每个中间件有一个明确的责任
  2. 可组合性:为不同代理类型混合和匹配中间件
  3. 运行时可配置性:无需代码更改即可启用/禁用功能
  4. 可测试性:每个中间件可以独立进行测试
  5. 可重用性:将常见模式提取为可共享的组件
  6. 可维护性:对一个问题的更改不会影响其他问题

中间件接口和通信

中间件通过 LangGraph 中间件系统进行交互:

# 示例:简化的中间件结构
class BaseMiddleware:
    async def __call__(self, call):
        # 预处理逻辑
        # ...
        
        # 调用链中的下一个中间件
        response = await call
        
        # 后处理逻辑
        # ...
        
        return response

为什么这很重要:此模式使以下成为可能:

  1. 预/后处理:在核心逻辑之前和之后处理问题
  2. 短路:在需要时阻止执行(例如,澄清)
  3. 错误处理:统一捕获和处理异常
  4. 状态修改:读取和写入代理状态
  5. 副作用:执行日志、度量、外部调用
  6. 链控制:决定是否继续到下一个中间件

练习:构建度量和监控中间件

创建一个收集和报告代理性能指标的中间件:

  1. 请求计时:测量端到端响应时间
  2. 工具使用情况:跟踪工具调用的频率和持续时间
  3. 令牌消耗:监控输入/输出令牌使用情况
  4. 错误率:分类和跟踪不同的失败类型
  5. 资源利用率:监控执行期间的内存和 CPU 使用情况
  6. 仪表板集成:通过 Prometheus 或类似工具公开度量
  7. 警报:在出现异常行为或 SLA 违规时通知

8. 配置驱动开发

目标

学习如何创建能够适应不同环境和需求的灵活、可配置的代理系统。

DeerFlow 中的关键概念

带环境变量解析的层次化配置

配置使用带有智能解析的 Pydantic 模型:

# backend/packages/harness/deerflow/config/app_config.py
class AppConfig(BaseModel):
    models: list[ModelConfig] = Field(default_factory=list, description="可用模型")
    sandbox: SandboxConfig = Field(description="沙箱配置")
    tools: list[ToolConfig] = Field(default_factory=list, description="可用工具")
    tool_groups: list[ToolGroupConfig] = Field(default_factory=list, description="可用工具组")
    skills: SkillsConfig = Field(default_factory=SkillsConfig, description="技能配置")
    extensions: ExtensionsConfig = Field(default_factory=ExtensionsConfig, description="扩展配置")
    tool_search: ToolSearchConfig = Field(default_factory=ToolSearchConfig, description="工具搜索配置")
    # ... 其他部分
    
    @classmethod
    def resolve_env_variables(cls, config: Any) -> Any:
        """递归解析配置中的环境变量。"""
        if isinstance(config, str):
            if config.startswith("$"):
                env_value = os.getenv(config[1:])
                if env_value is None:
                    raise ValueError(f"环境变量 {config[1:]} 未找到,用于配置值 {config}")
                return env_value
            return config
        elif isinstance(config, dict):
            return {k: cls.resolve_env_variables(v) for k, v in config.items()}
        elif isinstance(config, list):
            return [cls.resolve_env_variables(item) for item in config]
        return config

为什么这很重要:此配置系统提供:

  1. 类型安全:Pydantic 在加载时验证配置
  2. 环境灵活性:不同部署环境的不同值
  3. 密钥管理:将凭据保留在配置文件之外
  4. 层次覆盖:命令行 > 环境变量 > 文件 > 默认值
  5. 文档生成:从模型定义自动生成文档
  6. IDE 支持:在编辑器中提供自动补全和验证
  7. 版本跟踪:检测和警告过时的配置

无需重启的配置重新加载

智能重新加载检测文件更改并更新配置:

# backend/packages/harness/deerflow/config/app_config.py
_app_config: AppConfig | None = None
_app_config_path: Path | None = None
_app_config_mtime: float | None = None

def get_app_config() -> AppConfig:
    """获取带有自动重新加载的 DeerFlow 配置实例。"""
    global _app_config, _app_config_path, _app_config_mtime
    
    # 检查我们是否需要根据路径或修改时间重新加载
    resolved_path = AppConfig.resolve_config_path()
    current_mtime = _get_config_mtime(resolved_path)
    
    should_reload = (
        _app_config is None
        or _app_config_path != resolved_path
        or _app_config_mtime != current_mtime
    )
    
    if should_reload:
        # 重新加载并更新缓存
        _app_config = _load_and_cache_app_config(str(resolved_path))
        _app_config_path = resolved_path
        _app_config_mtime = current_mtime
    
    return _app_config

为什么这很重要:自动重新加载使以下成为可能:

  1. 零停机更新:在不重启服务的情况下更改行为
  2. 快速实验:快速尝试不同的配置
  3. 生产调整:根据观察到的行为调整参数
  4. 应急响应:快速更改行为以响应事件
  5. 开发效率:在开发过程中立即看到更改
  6. 配置漂移检测:识别文件和内存何时不同

模块化配置部分

复杂的配置被分解为专注的、可重用的部分:

# backend/packages/harness/deerflow/config/model_config.py
class ModelConfig(BaseModel):
    """单个 LLM 模型的配置。"""
    name: str = Field(description="内部标识符")
    display_name: str = Field(description="人类可读名称")
    use: str = Field(description="LangChain 类路径")
    model: str = Field(description="API 的模型标识符")
    api_key: Optional[str] = Field(description="API 密钥(推荐使用环境变量)")
    max_tokens: Optional[int] = Field(description="每次请求的最大标记数")
    temperature: float = Field(description="采样温度", default=0.7)
    # ... 思考/视觉支持标志
    supports_thinking: bool = Field(description="模型是否支持思考模式", default=False)
    supports_vision: bool = Field(description="模型是否支持图像输入", default=False)
    # 提供者特定字段
    base_url: Optional[str] = Field(description="自定义 API 基础 URL", default=None)
    use_responses_api: bool = Field(description="使用 OpenAI 响应 API", default=False)
    output_version: Optional[str] = Field(description="响应 API 的输出版本", default=None)

为什么这很重要:模块化配置提供:

  1. 关注点分离:每个部分处理配置的一个方面
  2. 可重用性:提取常见模式(例如,用于不同提供者的模型配置)
  3. 可维护性:对一个部分的更改不会影响其他部分
  4. 可测试性:每个配置部分可以独立测试
  5. 文档清晰度:用户可以快速找到相关设置
  6. 团队协作:不同团队可以拥有不同的部分

运行时配置覆盖

配置可以在特定请求时被覆盖:

# backend/packages/harness/deerflow/agents/lead_agent/agent.py
def make_lead_agent(config: RunnableConfig):
    cfg = config.get("configurable", {})
    
    # 提取运行时覆盖
    thinking_enabled = cfg.get("thinking_enabled", True)
    model_name = cfg.get("model_name") or cfg.get("model")
    is_plan_mode = cfg.get("is_plan_mode", False)
    subagent_enabled = cfg.get("subagent_enabled", False)
    max_concurrent_subagents = cfg.get("max_concurrent_subagents", 3)
    
    # ... 使用这些值来配置代理

为什么这很重要:运行时覆盖使以下成为可能:

  1. 个性化:不同用户获得不同的体验
  2. 上下文适应:行为基于对话主题而改变
  3. A/B 测试:在生产中比较不同的配置
  4. 功能标志:逐步推出新功能
  5. 紧急覆盖:在事件期间临时更改行为
  6. 实验:在不影响默认设置的情况下尝试新配置

练习:构建功能标志系统

创建一个支持复杂功能标志的配置系统:

  1. 百分比推出:逐步为一定比例的用户启用功能
  2. 有针对性的发布:为特定用户群体启用功能
  3. 基于时间的激活:功能在特定时间开启/关闭
  4. 依赖管理:需要其他功能的功能
  5. 杀死开关:立即禁用有问题的功能
  6. 实验跟踪:衡量不同配置的影响
  7. 即时代码:将标志与代码一起存储在版本控制中

9. MCP 集成:外部服务连接

目标

学习如何使用模型上下文协议(MCP)在 LangGraph 应用程序中将代理连接到外部服务。

DeerFlow 中的关键概念

MCP 客户端架构

DeerFlow 使用 langchain-mcp-adapters 进行 MCP 集成:

# backend/packages/harness/deerflow/mcp/(概念性 - 实际实现可能有所不同)
from langchain_mcp_adapters.client import MultiServerMCPClient

class MCPManager:
    def __init__(self):
        self.client = MultiServerMCPClient()
        self._tools_cache = {}
        self._cache_timestamps = {}
    
    async def get_tools(self, server_names: list[str] | None = None) -> list[BaseTool]:
        """获取 MCP 工具,尽可能使用缓存。"""
        # 检查缓存有效性
        # 如果缓存过期或缺失,则从服务器获取新工具
        # 返回合并的工具列表
    
    def _should_refresh_cache(self, server_name: str) -> bool:
        """检查服务器的缓存是否需要刷新。"""
        # 比较配置文件修改时间与缓存时间戳
        pass
    
    async def _refresh_server_tools(self, server_name: str) -> list[BaseTool]:
        """从特定的 MCP 服务器获取工具。"""
        # 连接到服务器
        # 列出可用工具
        # 将其转换为 LangChain 工具格式
        # 返回工具列表

为什么这很重要:MCP 提供:

  1. 标准化界面:一种一致的方式来连接到多样化的服务
  2. 服务发现:自动查找可用的功能
  3. 认证处理:内置支持各种认证方法
  4. 错误标准化:跨服务的一致错误处理
  5. 元数据丰富性:工具带有描述、示例等
  6. 社区生态系统:流行服务的 MCP 服务器数量不断增长

懒惰初始化和缓存

工具在需要时按需加载,并带有智能缓存失效:

# backend/packages/harness/deerflow/mcp/client.py(概念性)
def get_cached_mcp_tools() -> list[BaseTool]:
    """获取具有懒惰初始化和缓存失效的 MCP 工具。"""
    # 检查我们是否有缓存的工具
    # 如果没有,或者如果配置已更改,则从服务器刷新
    # 返回缓存的或新鲜加载的工具
    
    # 基于以下因素进行缓存失效:
    # 1. 配置文件修改时间
    # 2. 显式缓存 TTL 过期
    # 3. 服务器可用性变化

为什么这很重要:这种方法提供:

  1. 启动性能:只有在需要时才加载工具
  2. 内存效率:只有当工具有用时才保持工具在内存中
  3. 自动更新:当服务器发生变化时选择新工具
  4. 故障韧性:如果服务器暂时不可用,则继续工作
  5. 资源优化:不要在未使用的服务器上浪费连接
  6. 开发者体验:快速的开发周期

传输无关设计

支持多种传输机制:

# 展示不同传输的配置示例
# extensions_config.json
{
  "mcpServers": {
    "filesystem": {
      "command": "npx",
      "args": [-y, "@modelcontextprotocol/server-filesystem", "/path/to/allowed/files"],
      "transport": "stdio"
    },
    "browser": {
      "url": "http://localhost:3000/mcp",
      "transport": "HTTP"
    },
    "knowledge-base": {
      "url": "https://api.example.com/mcp",
      "transport": "SSE",
      "headers": {
        "Authorization": "Bearer $MCP_API_TOKEN"
      }
    }
  }
}

为什么这很重要:传输灵活性使以下成为可能:

  1. 本地开发:stdio 用于轻松的测试和调试
  2. 生产部署:HTTP/SSE 用于可扩展的网络化服务
  3. 防火墙灵活性:选择在您的网络环境中有效的传输
  4. 性能优化:stdio 用于低延迟本地工具,HTTP 用于远程工具
  5. 安全选项:不同的传输具有不同的安全特性
  6. 供应商支持:使用服务提供商提供的任何传输

OAuth 令牌流管理

HTTP/SSE 传输的自动认证处理:

# backend/packages/harness/deerflow/mcp/client.py(概念性)
class MCPClientWithOAuth:
    def __init__(self, server_url: str, oauth_config: dict):
        self.server_url = server_url
        self.oauth_config = oauth_config
        self.access_token = None
        self.refresh_token = None
        self.token_expires_at = None
    
    async def ensure_valid_token(self):
        """确保我们有有效的访问令牌,如果需要则刷新它。"""
        if self._is_token_valid():
            return
        
        if self._can_refresh():
            await self._refresh_access_token()
        else:
            await self._perform_full_oauth_flow()
    
    async def _refresh_access_token(self):
        """使用刷新令牌获取新的访问令牌。"""
        # 发出令牌刷新请求
        # 更新 access_token 和 expires_at
        # 如果提供了新的刷新令牌,则存储它
    
    async def _perform_full_oauth_flow(self):
        """执行完整的 OAuth 授权码流程。"""
        # 如果需要,生成 PKCE 挑战
        # 将用户重定向到授权 URL
        # 处理带有授权码的回调
        # 用代码交换令牌

为什么这很重要:OAuth 管理提供:

  1. 无缝用户体验:不需要手动处理令牌
  2. 安全最佳实践:适当的令牌存储和轮换
  3. 减少摩擦:用户不需要管理凭据
  4. 服务兼容性:适用于标准 OAuth 提供者
  5. 令牌卫生:自动清理过期的令牌
  6. 审计能力:跟踪令牌使用和刷新模式

练习:为内部服务构建自定义 MCP 服务器

创建一个向代理暴露您的内部服务的 MCP 服务器:

  1. 服务识别:发现并记录可用的内部 API
  2. 工具创建:将 API 端点转换为带有适当描述的 MCP 工具
  3. 认证集成:连接到您的内部认证系统
  4. 速率限制:保护内部服务免受过度使用
  5. 数据转换:在内部格式和 MCP 期望之间进行转换
  6. 错误处理:将内部错误映射到适当的 MCP 错误响应
  7. 文档编写:提供清晰的示例和使用指南
  8. 安全审查:确保适当的访问控制和数据保护

10. 生产模式:企业级实现

目标

实现具有时效性、可扩展性和可维护性考虑的企业级代理系统。

DeerFlow 中的关键概念

可观测性和监控

生产代理需要全面的可见性:

# 基于 DeerFlow 模式的概念实现
class ObservabilityMiddleware:
    async def __call__(self, call):
        # 开始计时器
        start_time = time.time()
        
        # 提取请求信息
        request_id = generate_request_id()
        user_id = extract_user_id_from_context()
        
        try:
            # 处理请求
            response = await call
            
            # 记录成功指标
            record_metrics(
                request_id=request_id,
                user_id=user_id,
                duration=time.time() - start_time,
                status="success",
                tool_calls=extract_tool_calls(response),
                token_usage=extract_token_usage(response)
            )
            
            return response
            
        except Exception as e:
            # 记录失败指标
            record_metrics(
                request_id=request_id,
                user_id=user_id,
                duration=time.time() - start_time,
                status="error",
                error_type=type(e).__name__,
                error_message=str(e)
            )
            
            # 为正常的错误处理重新抛出
            raise

为什么这很重要:可观察性使以下成为可能:

  1. 性能优化:识别瓶颈和缓慢的操作
  2. 容量规划:了解资源使用模式
  3. 故障排除:快速诊断和解决问题
  4. SLA 监控:确保服务级别目标得到实现
  5. 业务洞察:了解用户如何与代理交互
  6. 持续改进:数据驱动的代理行为增强

优雅降级和熔断器

生产系统以优雅的方式处理故障:

# 概念性熔断器实现
class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
    
    async def call(self, func, *args, **kwargs):
        if self.state == "OPEN":
            if self._should_attempt_reset():
                self.state = "HALF_OPEN"
            else:
                raise CircuitBreakerOpenException()
        
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise
    
    def _on_success(self):
        self.failure_count = 0
        self.state = "CLOSED"
    
    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"
    
    def _should_attempt_reset(self):
        return (
            self.state == "OPEN" and
            time.time() - self.last_failure_time >= self.timeout
        )

为什么这很重要:韧性模式提供:

  1. 故障隔离:防止级联故障
  2. 优雅降级:在部分中断期间保持核心功能
  3. 自动恢复:在问题解决时恢复正常操作
  4. 用户体验:提供有意义的反馈而不是神秘的错误
  5. 系统稳定性:在故障期间防止资源耗尽
  6. 运营简便性:减少故障期间的人工干预

安全和合规特性

生产代理必须满足安全和合规要求:

# 概念性安全中间件
class SecurityMiddleware:
    async def __call__(self, call):
        # 输入验证和清理
        sanitized_input = self._sanitize_input(call.input)
        
        # 访问控制检查
        if not self._is_authorized(call.user, call.request):
            raise AuthorizationError("权限不足")
        
        # 速率限制
        if not self._check_rate_limit(call.user):
            raise RateLimitExceededException()
        
        # 内容过滤
        if self._contains_prohibited_content(sanitized_input):
            raise ContentPolicyViolation()
        
        # 进行清理、授权的请求
        try:
            response = await call
            
            # 输出验证
            safe_output = self._sanitize_output(response)
            
            # 审计日志
            self._log_audit_event(call.user, sanitized_input, safe_output)
            
            return safe_output
            
        except Exception as e:
            # 记录与安全相关的异常
            self._log_security_event(call.user, e)
            raise

为什么这很重要:安全特性确保:

  1. 数据保护:防止未经授权访问敏感信息
  2. 法规遵从:满足 GDPR、HIPAA、SOC2 等要求
  3. 滥用防范:挫败自动攻击和刮取
  4. 用户信任:展示对隐私和安全的承诺
  5. 法律保护:减少因滥用或泄漏而产生的责任
  6. 声誉管理:避免损害品牌的安全事件

部署和运营考虑因素

生产就绪的代理需要适当的部署模式:

# 展示生产考虑因素的 Kubernetes 部署示例
apiVersion: apps/v1
kind: Deployment
metadata:
  name: deerflow-agent
  labels:
    app: deerflow-agent
spec:
  replicas: 3
  selector:
    matchLabels:
      app: deerflow-agent
  template:
    metadata:
      labels:
        app: deerflow-agent
    spec:
      containers:
      - name: agent
        image: deerflow-agent:latest
        ports:
        - containerPort: 8000
        env:
        - name: LOG_LEVEL
          valueFrom:
            configMapKeyRef:
              name: deerflow-config
              key: log_level
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: deerflow-secrets
              key: database_url
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
        volumeMounts:
        - name: tmp-volume
          mountPath: /tmp
        - name: cache-volume
          mountPath: /var/cache/deerflow
      volumes:
      - name: tmp-volume
        emptyDir: {}
      - name: cache-volume
        emptyDir:
          sizeLimit: 5Gi

为什么这很重要:生产部署注意事项包括:

  1. 可伸缩性:水平缩放以处理负载变化
  2. 资源管理:防止容器挨饿或过度供应
  3. 健康检查:自动重启不健康的实例
  4. 配置管理:将配置与容器镜像分离
  5. 密钥管理:安全地处理凭据和证书
  6. 日志和监控:集中式可观测性
  7. 备份和灾难恢复:防止数据丢失
  8. 更新策略:零停机部署和回滚

练习:构建生产就绪的代理平台

创建一个包含以下内容的生产就绪代理平台:

  1. 多租户:安全地隔离不同的客户或团队
  2. 使用量计量:跟踪和计费资源消耗
  3. API 网关:速率限制、身份验证和请求路由
  4. 服务网格:可观测性、流量管理和安全性
  5. 灾难恢复:备份策略和跨区域复制
  6. 合规报告:为监管机构生成审计报告
  7. 性能优化:缓存、连接池和查询优化
  8. 开发者门户:文档、SDK 和沙箱环境
  9. 警报和通知:主动问题检测和升级
  10. 混沌工程:有意注入故障以测试韧性

综合:将所有内容结合在一起

让我们看看这些概念如何在典型的 DeerFlow 交互中协同工作:

典型请求流程

  1. 请求接收

    • 用户通过网页界面、API 或 IM 通道发送消息
    • 请求击中 Nginx,被路由到适当的服务
  2. 配置加载

    • get_app_config() 加载带有环境变量解析的配置
    • 中间件链根据运行时标志构建(is_plan_modesubagent_enabled 等)
  3. 代理初始化

    • 带有自定义 ThreadState 的主代理被创建
    • 根据配置和组加载适当的工具
    • 技能在系统提示中被引用(尚未完全加载)
    • 内存上下文被检索并注入到提示中
  4. 处理开始

    • ThreadDataMiddleware 创建每个线程的目录
    • UploadsMiddleware 处理任何文件上传
    • SandboxMiddleware 获取沙箱执行环境
    • 各种中间件为 LLM 调用准备状态
  5. LLM 推理

    • LLM 收到包含以下内容的提示:
      • 系统说明(包括技能引用)
      • 内存上下文(用户偏好、历史、事实)
      • 对话历史
      • 可用工具描述
    • LLM 使用链式思维推理来思考问题
    • LLM 决定行动方案(工具使用、澄清等)
  6. 工具执行(如果需要)

    • 如果调用了工具:
      • 适当的中间件处理它(SandboxMiddleware 确保正确的路径)
      • 工具在隔离的沙箱环境中执行
      • 结果返回到代理状态
      • ViewImageMiddleware 处理视觉模型的任何生成图像
  7. 子代理委派(对于复杂任务)

    • 如果调用了 task 工具:
      • TaskTool 验证参数并创建 SubagentExecutor
      • 执行器创建带有适当工具/配置的隔离子代理
      • 子代理在带有超时保护的后台线程池中运行
      • 主代理根据执行模式继续或等待
      • 结果在就绪时被收集和综合
  8. 内存更新

    • MemoryMiddleware 将对话排队以进行异步处理
    • 后台线程运行 LLM 提取洞察和事实
    • 内存更新器应用带有去重和置信度评分的更改
    • 从长期记忆中剥离上传引用
    • 更新的内存被保存并缓存以供将来使用
  9. 响应生成

    • LLM 基于更新的状态生成最终响应
    • 根据 response_style 指南格式化响应
    • 为任何使用的外部信息添加引用
    • 通过适当的渠道将响应返回给用户
  10. 后处理

    • TitleMiddleware 可能根据对话更新线程标题
    • 执行任何剩余的清理
    • 系统准备好进行下一次交互

DeerFlow 演示的关键架构原则

DeerFlow 体现了几个重要的 LangGraph 和代理架构原则:

  1. 关注点分离

    • 不同的组件处理状态、工具、技能、内存、沙箱等
    • 每个问题可以独立开发、测试和维护
  2. 组合优于继承

    • 通过组合简单、专注的组件来构建复杂行为
    • 中间件链展示了如何组合横切关注点
    • 工具系统展示了如何组合能力
  3. 配置驱动行为

    • 运行时配置在不更改代码的情况下启用灵活性
    • 功能标志允许安全的实验和逐步推出
    • 特定环境的配置支持开发/测试/生产差异
  4. 渐进式加载和上下文效率

    • 技能、工具和记忆仅在需要时加载
    • 虚拟路径提供一致的接口而不暴露复杂性
    • 后台处理保持主线程的响应能力
  5. 隔离和安全

    • 沙箱执行防止代理逃逸或主机污染
    • 子代理隔离防止上下文污染和资源争用
    • 虚拟路径和输入验证防止安全问题
  6. 可观察性和可调试性

    • 贯穿系统的结构化日志和追踪
    • 清晰的关注点分离有助于问题隔离
    • 配置版本控制有助于检测漂移
  7. 可扩展性和插件架构

    • 技能系统允许在不更改核心的情况下添加新工作流程
    • 工具系统允许添加新功能
    • MCP 集成允许连接到外部服务
    • 中间件系统允许添加新的横切关注点

您的 LangGraph 学习之旅的下一步

现在您已经看到了 DeerFlow 如何实现高级 LangGraph 概念,以下是一些继续学习的方法:

  1. 加深理解

    • 彻底阅读 LangGraph 文档
    • 研究 LangChain 代理和工具文档
    • 探索 langgraph-api 和 langgraph-js 存储库
  2. 实验修改

    • 分叉 DeerFlow 并尝试在每个部分实现练习
    • 从简单的更改开始(如添加一个新工具),逐步过渡到复杂的更改(如实现一个新的沙箱类型)
    • 用单元测试和集成测试彻底测试您的更改
  3. 构建您自己的代理系统

    • 将您学到的模式应用到您自己的用例中
    • 从针对特定领域的专注代理开始
    • 随着对需求的更好理解,逐步添加复杂性
  4. 为社区做出贡献

    • 与 DeerFlow 社区分享您的技能和工具
    • 参与关于最佳实践和模式的讨论
    • 通过回答问题和创建教程来帮助他人学习
  5. 保持最新

    • 关注 LangGraph 和 LangChain 生态系统的发展
    • 注意该领域出现的新模式和技术
    • 考虑新兴技术(如更好的推理模型、改进的工具使用等)可能如何影响代理架构

请记住,目标不是完全复制 DeerFlow,而是理解其背后的原则和模式,以便将它们应用到您自己的代理系统中。最好的代理架构是适合您特定需求、约束和用例的架构。

祝您构建顺利,愿您的代理始终有帮助且富有洞察力!


Children
  1. 部署和二次开发