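Build custom middleware by implementing hooks that run at specific points in the agent's execution flow.
Middleware provides two styles of hooks for intercepting agent execution:
Node-style hooks
Run sequentially at specific execution points. Use them for logging, validation, and state updates.
Choose the hooks your middleware needs. You can choose between node-style hooks and wrap-style hooks.
Node-style hooks run at specific execution points:
| Hook | When it runs |
|---|---|
| before_agent | Before the agent starts (once per invocation) |
| before_model | Before each model call |
| after_model | After each model response |
| after_agent | After the agent completes (once per invocation) |
Wrap-style hooks run around each call and give you control over execution:
| Hook | When it runs |
|---|---|
| wrap_model_call | Around each model call |
| wrap_tool_call | Around each tool call |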
Example (decorator):

from langchain.agents.middleware import before_model, after_model, AgentState
from langchain.messages import AIMessage
from langgraph.runtime import Runtime
from typing import Any


@before_model(can_jump_to=["end"])
def check_message_limit(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    # End the run once the conversation reaches 50 messages.
    if len(state["messages"]) >= 50:
        return {
            "messages": [AIMessage("Conversation limit reached.")],
            "jump_to": "end",
        }
    return None


@after_model
def log_response(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    print(f"Model returned: {state['messages'][-1].content}")
    return None

Example (class):

from langchain.agents.middleware import AgentMiddleware, AgentState, hook_config
from langchain.messages import AIMessage
from langgraph.runtime import Runtime
from typing import Any


class MessageLimitMiddleware(AgentMiddleware):
    def __init__(self, max_messages: int = 50):
        super().__init__()
        self.max_messages = max_messages

    @hook_config(can_jump_to=["end"])
    def before_model(self, state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
        # End the run once the conversation reaches the configured limit.
        if len(state["messages"]) >= self.max_messages:
            return {
                "messages": [AIMessage("Conversation limit reached.")],
                "jump_to": "end",
            }
        return None

    def after_model(self, state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
        print(f"Model returned: {state['messages'][-1].content}")
        return None
Wrap-style hooks
Intercept execution and control when the handler is called. Use them for retries, caching, and transformation.
You decide whether the handler is called zero times (short-circuit), once (normal flow), or multiple times (retry logic).
Available hooks:
- wrap_model_call: around each model call
- wrap_tool_call: around each tool call

Example (decorator):

from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from typing import Callable


@wrap_model_call
def retry_model(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    # Try the model call up to three times; re-raise on the final failure.
    for attempt in range(3):
        try:
            return handler(request)
        except Exception as e:
            if attempt == 2:
                raise
            print(f"Retry {attempt + 1}/3 after error: {e}")

Example (class):

from langchain.agents.middleware import AgentMiddleware, ModelRequest, ModelResponse
from typing import Callable


class RetryMiddleware(AgentMiddleware):
    def __init__(self, max_retries: int = 3):
        super().__init__()
        self.max_retries = max_retries

    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse],
    ) -> ModelResponse:
        # Try the model call up to max_retries times; re-raise on the final failure.
        for attempt in range(self.max_retries):
            try:
                return handler(request)
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise
                print(f"Retry {attempt + 1}/{self.max_retries} after error: {e}")
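The three handler-call patterns described above (zero, one, or several calls) can be modeled without LangChain. The following is a minimal plain-Python sketch, not the library's API: requests and responses are plain strings, and `with_cache`, `with_retry`, and `flaky_model` are hypothetical names introduced only for illustration.

```python
from typing import Callable

# Hypothetical stand-ins: a request and a response are both plain strings.
Handler = Callable[[str], str]


def with_cache(handler: Handler, cache: dict[str, str]) -> Handler:
    """Call the handler zero times on a cache hit (short-circuit)."""
    def wrapped(request: str) -> str:
        if request not in cache:
            cache[request] = handler(request)
        return cache[request]
    return wrapped


def with_retry(handler: Handler, max_retries: int = 3) -> Handler:
    """Call the handler up to max_retries times, re-raising the last error."""
    def wrapped(request: str) -> str:
        for attempt in range(max_retries):
            try:
                return handler(request)
            except Exception:
                if attempt == max_retries - 1:
                    raise
        raise RuntimeError("max_retries must be at least 1")
    return wrapped


calls = {"count": 0}


def flaky_model(request: str) -> str:
    """Fail on the first call, succeed on every later call."""
    calls["count"] += 1
    if calls["count"] == 1:
        raise ConnectionError("transient failure")
    return f"echo: {request}"


wrapped_model = with_cache(with_retry(flaky_model), cache={})
first = wrapped_model("hi")   # handler runs twice: one failure, one success
second = wrapped_model("hi")  # handler runs zero times: served from the cache
```

Placing the cache outside the retry layer means a cached answer skips the retry loop entirely, which mirrors how an outer wrap-style hook can short-circuit everything beneath it.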
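State updates
Both node-style and wrap-style hooks can update agent state. The mechanism differs:
- Node-style hooks (before_agent, before_model, after_model, after_agent): return a dictionary directly. The dictionary is applied to agent state through the graph's reducers.
- Wrap-style hooks (wrap_model_call, wrap_tool_call): for model calls, return an ExtendedModelResponse with a Command to inject state updates alongside the model response. For tool calls, return a Command directly. Use these when you need to track or update state based on logic that runs during the model or tool call, such as summarization trigger points, usage metadata, or custom fields computed from the request or response.
Node-style hooks
Return a dictionary from a node-style hook to merge updates into agent state. The dictionary keys map to state fields.

from langchain.agents.middleware import after_model, AgentState
from langgraph.runtime import Runtime
from typing import Any
from typing_extensions import NotRequired


class TrackingState(AgentState):
    model_call_count: NotRequired[int]


@after_model(state_schema=TrackingState)
def increment_after_model(state: TrackingState, runtime: Runtime) -> dict[str, Any] | None:
    # The returned dictionary is merged into agent state.
    return {"model_call_count": state.get("model_call_count", 0) + 1}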
Wrap-style hooks
Return an ExtendedModelResponse with a Command from wrap_model_call to inject state updates from the model call layer:

from typing import Callable
from langchain.agents.middleware import (
    wrap_model_call,
    ModelRequest,
    ModelResponse,
    AgentState,
    ExtendedModelResponse,
)
from langgraph.types import Command
from typing_extensions import NotRequired


class UsageTrackingState(AgentState):
    """Agent state with token usage tracking."""

    last_model_call_tokens: NotRequired[int]


@wrap_model_call(state_schema=UsageTrackingState)
def track_usage(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ExtendedModelResponse:
    response = handler(request)
    # 150 is a placeholder value for the token count.
    return ExtendedModelResponse(
        model_response=response,
        command=Command(update={"last_model_call_tokens": 150}),
    )

The Command passes through the graph's reducers, so updates are applied correctly: messages are added to existing state rather than replacing it.
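To make the reducer behavior concrete, here is a minimal plain-Python model of how an update might be merged into state. It is a sketch under stated assumptions, not LangGraph's implementation: `REDUCERS`, `add_messages`, and `apply_update` are hypothetical names, and messages are plain strings.

```python
from typing import Any, Callable

Reducer = Callable[[Any, Any], Any]


def add_messages(existing: list[str], new: list[str]) -> list[str]:
    """Additive reducer: append new messages to the existing ones."""
    return existing + new


# Hypothetical schema: only the "messages" field has a reducer.
REDUCERS: dict[str, Reducer] = {"messages": add_messages}


def apply_update(state: dict[str, Any], update: dict[str, Any]) -> dict[str, Any]:
    """Return a new state with the update merged through per-field reducers."""
    merged = dict(state)
    for key, value in update.items():
        if key in REDUCERS and key in merged:
            merged[key] = REDUCERS[key](merged[key], value)
        else:
            merged[key] = value  # no reducer: the new value replaces the old one
    return merged


state = {"messages": ["user: hello"], "last_model_call_tokens": 90}
state = apply_update(state, {"messages": ["ai: hi"], "last_model_call_tokens": 150})
```

In this model the message list grows while the token count is overwritten, which is the distinction the paragraph above draws between reducer fields and plain fields.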
Composing multiple middleware
When multiple middleware layers return an ExtendedModelResponse, their commands compose:
- Commands are applied through reducers: each Command becomes a separate state update. For messages, this means they are additive.
- Outer wins on conflicts: for non-reducer state fields, commands are applied from the inner layer first, then from the outer layer. The outermost middleware's value wins on conflicting keys.
- Retry-safe: if an outer middleware implements logic that may call handler() multiple times (for example, retry logic), commands from earlier calls are discarded.

from typing import Annotated, Callable
from langchain.agents.middleware import (
    AgentMiddleware,
    AgentState,
    ExtendedModelResponse,
    ModelRequest,
    ModelResponse,
)
from langchain.messages import SystemMessage
from langgraph.types import Command
from typing_extensions import NotRequired


def _last_wins(_a: str, b: str) -> str:
    """Reducer: the last writer wins (outer overrides inner)."""
    return b


class CustomMiddlewareState(AgentState):
    """Agent state: trace_layer is last-wins (outer wins); messages uses the additive reducer."""

    # Last-wins field: both middleware write to it; the outermost value wins.
    trace_layer: NotRequired[Annotated[str, _last_wins]]


class OuterMiddleware(AgentMiddleware):
    """Adds trace_layer and a message after the inner layers have run."""

    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse],
    ) -> ExtendedModelResponse:
        response = handler(request)
        return ExtendedModelResponse(
            model_response=response,
            command=Command(update={
                "trace_layer": "outer",
                "messages": [SystemMessage(content="[outer ran]")],
            }),
        )


class InnerMiddleware(AgentMiddleware):
    """Adds trace_layer and a message. Outer writes the same keys; trace_layer: outer wins, messages: additive."""

    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse],
    ) -> ExtendedModelResponse:
        response = handler(request)
        return ExtendedModelResponse(
            model_response=response,
            command=Command(update={
                "trace_layer": "inner",
                "messages": [SystemMessage(content="[inner ran]")],
            }),
        )
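The composition rules above (inner applied first, outer winning on conflicts, commands from earlier handler calls dropped) can be traced with a small plain-Python model. This is only an illustrative sketch and does not reflect how the library is implemented: `layer`, `call_twice`, and `apply_all` are hypothetical helpers, and a command is modeled as a plain dictionary.

```python
from typing import Any, Callable

Update = dict[str, Any]
# A handler returns the model response plus the commands collected so far.
Handler = Callable[[str], tuple[str, list[Update]]]


def model(request: str) -> tuple[str, list[Update]]:
    return f"response to {request}", []


def layer(name: str, handler: Handler) -> Handler:
    """Wrap a handler and append this layer's update after the inner ones."""
    def wrapped(request: str) -> tuple[str, list[Update]]:
        response, commands = handler(request)
        update = {"trace_layer": name, "messages": [f"[{name} ran]"]}
        return response, commands + [update]
    return wrapped


def call_twice(handler: Handler) -> Handler:
    """Simulate a retry: call the handler twice, keep only the last result."""
    def wrapped(request: str) -> tuple[str, list[Update]]:
        handler(request)  # first result and its commands are discarded
        return handler(request)
    return wrapped


def apply_all(commands: list[Update]) -> Update:
    """Apply commands in order: messages accumulate, trace_layer is last-wins."""
    state: Update = {"messages": []}
    for update in commands:
        state["messages"] = state["messages"] + update["messages"]
        state["trace_layer"] = update["trace_layer"]
    return state


stack = layer("outer", call_twice(layer("inner", model)))
response, commands = stack("hi")
final_state = apply_all(commands)
```

Even though the inner layer runs twice here, only one inner update survives, and the outer value of `trace_layer` is the one left in the final state.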
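Creating middleware
You can create middleware in two ways:
- Decorator-based middleware: quick and simple, suited to single-hook middleware. Use a decorator to wrap a single function.
- Class-based middleware: more powerful, suited to complex middleware with multiple hooks or configuration.
Decorator-based middleware
Quick and simple, suited to single-hook middleware. Use a decorator to wrap a single function.
Available decorators:
- Node-style: @before_agent, @before_model, @after_model, @after_agent
- Wrap-style: @wrap_model_call, @wrap_tool_call
- Convenience: @dynamic_prompt
Example: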
from langchain.agents.middleware import (
    before_model,
    wrap_model_call,
    AgentState,
    ModelRequest,
    ModelResponse,
)
from langchain.agents import create_agent
from langgraph.runtime import Runtime
from typing import Any, Callable


@before_model
def log_before_model(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    print(f"About to call the model with {len(state['messages'])} messages")
    return None


@wrap_model_call
def retry_model(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    # Try the model call up to three times; re-raise on the final failure.
    for attempt in range(3):
        try:
            return handler(request)
        except Exception as e:
            if attempt == 2:
                raise
            print(f"Retry {attempt + 1}/3 after error: {e}")


agent = create_agent(
    model="gpt-5.4",
    middleware=[log_before_model, retry_model],
    tools=[...],
)

When to use decorators: when the middleware needs only a single hook and no complex configuration.
Class-based middleware
More powerful, suited to complex middleware with multiple hooks or configuration. Use a class when you need to define both sync and async implementations of the same hook, or when you want to combine multiple hooks in a single middleware.
Example:

from langchain.agents import create_agent
from langchain.agents.middleware import (
    AgentMiddleware,
    AgentState,
    ModelRequest,
    ModelResponse,
)
from langgraph.runtime import Runtime
from typing import Any, Callable


class LoggingMiddleware(AgentMiddleware):
    def before_model(self, state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
        print(f"About to call the model with {len(state['messages'])} messages")
        return None

    def after_model(self, state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
        print(f"Model returned: {state['messages'][-1].content}")
        return None

    async def abefore_model(
        self, state: AgentState, runtime: Runtime
    ) -> dict[str, Any] | None:
        # Async version of before_model
        return None

    async def aafter_model(
        self, state: AgentState, runtime: Runtime
    ) -> dict[str, Any] | None:
        # Async version of after_model
        print(f"Model returned: {state['messages'][-1].content}")
        return None


agent = create_agent(
    model="gpt-5.4",
    middleware=[LoggingMiddleware()],
    tools=[...],
)

When to use classes:
- You need both sync and async implementations of the same hook
- A single middleware needs multiple hooks
- Complex configuration is required (for example, configurable thresholds or custom models)
- You want to reuse the middleware across projects with init-time configuration
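Custom state schema
If your middleware needs to track state across hooks, it can extend the agent's state with custom attributes. This lets middleware:
- Track state across execution: maintain counters, flags, or other values that persist throughout the agent's execution lifecycle
- Share data between hooks: pass information between before_model and after_model, or between different middleware instances
- Implement cross-cutting concerns: add features such as rate limiting, usage tracking, user context, or audit logging without modifying core agent logic
- Make conditional decisions: use accumulated state to decide whether to continue execution, jump to a different node, or modify behavior dynamically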
Using decorators:

from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.agents.middleware import AgentState, before_model, after_model
from typing_extensions import NotRequired
from typing import Any
from langgraph.runtime import Runtime


class CustomState(AgentState):
    model_call_count: NotRequired[int]
    user_id: NotRequired[str]


@before_model(state_schema=CustomState, can_jump_to=["end"])
def check_call_limit(state: CustomState, runtime: Runtime) -> dict[str, Any] | None:
    count = state.get("model_call_count", 0)
    if count > 10:
        return {"jump_to": "end"}
    return None


@after_model(state_schema=CustomState)
def increment_counter(state: CustomState, runtime: Runtime) -> dict[str, Any] | None:
    return {"model_call_count": state.get("model_call_count", 0) + 1}


agent = create_agent(
    model="gpt-5.4",
    middleware=[check_call_limit, increment_counter],
    tools=[],
)

# Invoke with custom state
result = agent.invoke({
    "messages": [HumanMessage("Hello")],
    "model_call_count": 0,
    "user_id": "user-123",
})

Using a class:

from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.agents.middleware import AgentState, AgentMiddleware, hook_config
from langgraph.runtime import Runtime
from typing_extensions import NotRequired
from typing import Any


class CustomState(AgentState):
    model_call_count: NotRequired[int]
    user_id: NotRequired[str]


class CallCounterMiddleware(AgentMiddleware[CustomState]):
    state_schema = CustomState

    # Declare the jump target, as the decorator version does with can_jump_to.
    @hook_config(can_jump_to=["end"])
    def before_model(self, state: CustomState, runtime: Runtime) -> dict[str, Any] | None:
        count = state.get("model_call_count", 0)
        if count > 10:
            return {"jump_to": "end"}
        return None

    def after_model(self, state: CustomState, runtime: Runtime) -> dict[str, Any] | None:
        return {"model_call_count": state.get("model_call_count", 0) + 1}


agent = create_agent(
    model="gpt-5.4",
    middleware=[CallCounterMiddleware()],
    tools=[],
)

# Invoke with custom state
result = agent.invoke({
    "messages": [HumanMessage("Hello")],
    "model_call_count": 0,
    "user_id": "user-123",
})
Execution order
When you use multiple middleware, it helps to understand the order in which they run:

agent = create_agent(
    model="gpt-5.4",
    middleware=[middleware1, middleware2, middleware3],
    tools=[...],
)

Before hooks run in order:
middleware1.before_agent()
middleware2.before_agent()
middleware3.before_agent()
Agent loop starts
middleware1.before_model()
middleware2.before_model()
middleware3.before_model()
Wrap hooks nest like function calls:
middleware1.wrap_model_call() → middleware2.wrap_model_call() → middleware3.wrap_model_call() → model
After hooks run in reverse order:
middleware3.after_model()
middleware2.after_model()
middleware1.after_model()
Agent loop ends
middleware3.after_agent()
middleware2.after_agent()
middleware1.after_agent()
Key rules:
- before_* hooks: first to last
- after_* hooks: last to first (reverse order)
- wrap_* hooks: nested (the first middleware wraps all the others)
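The ordering rules above can be reproduced with a small plain-Python simulation. This is a sketch of the ordering only, not of the library's scheduler: `TracingMiddleware` and `run_model_step` are hypothetical names, and the hooks take no state or runtime arguments.

```python
from functools import partial
from typing import Callable


class TracingMiddleware:
    """Hypothetical middleware that records when each hook runs."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    def before_model(self) -> None:
        self.log.append(f"{self.name}.before_model")

    def after_model(self) -> None:
        self.log.append(f"{self.name}.after_model")

    def wrap_model_call(self, handler: Callable[[], str]) -> str:
        self.log.append(f"{self.name}.wrap enter")
        result = handler()
        self.log.append(f"{self.name}.wrap exit")
        return result


def run_model_step(middleware: list[TracingMiddleware], log: list[str]) -> None:
    # before_* hooks: first to last.
    for m in middleware:
        m.before_model()

    def call_model() -> str:
        log.append("model")
        return "response"

    # wrap_* hooks: build from the last middleware inward so that the
    # first middleware ends up as the outermost wrapper.
    handler: Callable[[], str] = call_model
    for m in reversed(middleware):
        handler = partial(m.wrap_model_call, handler)
    handler()

    # after_* hooks: last to first.
    for m in reversed(middleware):
        m.after_model()


log: list[str] = []
run_model_step([TracingMiddleware("m1", log), TracingMiddleware("m2", log)], log)
```

Printing `log` after the run shows `m1` entering its wrapper first and exiting last, with the two `after_model` entries in reverse order.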
Agent jumps
To exit early from middleware, return a dictionary containing jump_to.
Available jump targets:
- 'end': jump to the end of agent execution (or the first after_agent hook)
- 'tools': jump to the tools node
- 'model': jump to the model node (or the first before_model hook)

Using a decorator:

from langchain.agents.middleware import after_model, hook_config, AgentState
from langchain.messages import AIMessage
from langgraph.runtime import Runtime
from typing import Any


@after_model
@hook_config(can_jump_to=["end"])
def check_for_blocked(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    last_message = state["messages"][-1]
    if "BLOCKED" in last_message.content:
        return {
            "messages": [AIMessage("I cannot respond to that request.")],
            "jump_to": "end",
        }
    return None

Using a class:

from langchain.agents.middleware import AgentMiddleware, hook_config, AgentState
from langchain.messages import AIMessage
from langgraph.runtime import Runtime
from typing import Any


class BlockedContentMiddleware(AgentMiddleware):
    @hook_config(can_jump_to=["end"])
    def after_model(self, state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
        last_message = state["messages"][-1]
        if "BLOCKED" in last_message.content:
            return {
                "messages": [AIMessage("I cannot respond to that request.")],
                "jump_to": "end",
            }
        return None
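Best practices
- Keep middleware focused: each middleware should do one thing well
- Handle errors gracefully: do not let middleware errors crash the agent
- Use the appropriate hook type:
  - Node-style for sequential logic (logging, validation)
  - Wrap-style for control flow (retries, fallbacks, caching)
- Clearly document any custom state attributes
- Unit test middleware in isolation before integrating it
- Consider execution order: place critical middleware earlier in the list
- Use built-in middleware where possible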
Dynamic prompts
Modify the system prompt dynamically at runtime to inject context, user-specific instructions, or other information before each model call. This is one of the most common middleware use cases.
Use the system_message field on ModelRequest to read and modify the system prompt. It contains a SystemMessage object (even if the agent was created with a string system_prompt).

Using a decorator:

from collections.abc import Callable
from langchain.agents.middleware import ModelRequest, ModelResponse, wrap_model_call
from langchain.messages import SystemMessage


@wrap_model_call
def add_context(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    # Append a new text block to the existing system prompt blocks.
    new_content = list(request.system_message.content_blocks) + [
        {"type": "text", "text": "Additional context."}
    ]
    new_system_message = SystemMessage(content=new_content)
    return handler(request.override(system_message=new_system_message))

Using a class:

from collections.abc import Callable
from langchain.agents.middleware import AgentMiddleware, ModelRequest, ModelResponse
from langchain.messages import SystemMessage


class ContextMiddleware(AgentMiddleware):
    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse],
    ) -> ModelResponse:
        # Append a new text block to the existing system prompt blocks.
        new_content = list(request.system_message.content_blocks) + [
            {"type": "text", "text": "Additional context."}
        ]
        new_system_message = SystemMessage(content=new_content)
        return handler(request.override(system_message=new_system_message))

- ModelRequest.system_message is always a SystemMessage object, even if the agent was created with system_prompt="string"
- Use SystemMessage.content_blocks to access the content as a list of blocks, regardless of whether the original content was a string or a list
- When modifying the system message, use content_blocks and append new blocks to preserve the existing structure
- You can pass a SystemMessage object directly to the system_prompt parameter of create_agent for advanced use cases such as cache control
Dynamic model selection

Using a decorator:

from collections.abc import Callable
from langchain.agents.middleware import ModelRequest, ModelResponse, wrap_model_call
from langchain.chat_models import init_chat_model

complex_model = init_chat_model("claude-sonnet-4-6")
simple_model = init_chat_model("claude-haiku-4-5-20251001")


@wrap_model_call
def dynamic_model(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    # Route longer conversations to the more capable model.
    if len(request.messages) > 10:
        model = complex_model
    else:
        model = simple_model
    return handler(request.override(model=model))

Using a class:

from collections.abc import Callable
from langchain.agents.middleware import AgentMiddleware, ModelRequest, ModelResponse
from langchain.chat_models import init_chat_model

complex_model = init_chat_model("claude-sonnet-4-6")
simple_model = init_chat_model("claude-haiku-4-5-20251001")


class DynamicModelMiddleware(AgentMiddleware):
    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse],
    ) -> ModelResponse:
        # Route longer conversations to the more capable model.
        if len(request.messages) > 10:
            model = complex_model
        else:
            model = simple_model
        return handler(request.override(model=model))
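Dynamic tool selection
Select relevant tools at runtime to improve performance and accuracy. This section covers filtering pre-registered tools. To register tools discovered at runtime (for example, tools from an MCP server), see runtime tool registration.
Benefits:
- Shorter prompts: reduce complexity by exposing only relevant tools
- Better accuracy: the model chooses more correctly from fewer options
- Permission control: filter tools dynamically based on user access

Using a decorator:

from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from typing import Callable


@wrap_model_call
def select_tools(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    """Middleware that selects relevant tools based on state/context."""
    # Select a small, relevant subset of tools based on state/context.
    # select_relevant_tools is a placeholder you define yourself.
    relevant_tools = select_relevant_tools(request.state, request.runtime)
    return handler(request.override(tools=relevant_tools))


agent = create_agent(
    model="gpt-5.4",
    tools=all_tools,  # All available tools must be registered up front
    middleware=[select_tools],
)

Using a class:

from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware, ModelRequest, ModelResponse
from typing import Callable


class ToolSelectorMiddleware(AgentMiddleware):
    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse],
    ) -> ModelResponse:
        """Middleware that selects relevant tools based on state/context."""
        # Select a small, relevant subset of tools based on state/context.
        # select_relevant_tools is a placeholder you define yourself.
        relevant_tools = select_relevant_tools(request.state, request.runtime)
        return handler(request.override(tools=relevant_tools))


agent = create_agent(
    model="gpt-5.4",
    tools=all_tools,  # All available tools must be registered up front
    middleware=[ToolSelectorMiddleware()],
)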
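The examples above call `select_relevant_tools` without defining it. One possible shape for such a helper, focused on the permission-control benefit, is sketched below in plain Python. Everything here is an assumption made for illustration: `ToolSpec`, its `required_role` field, and the `user_roles` state key are hypothetical and are not part of LangChain.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ToolSpec:
    """Hypothetical stand-in for a registered tool."""

    name: str
    required_role: str


all_tools = [
    ToolSpec("search_docs", required_role="viewer"),
    ToolSpec("update_record", required_role="editor"),
    ToolSpec("delete_record", required_role="admin"),
]


def select_relevant_tools(state: dict[str, Any], runtime: Any = None) -> list[ToolSpec]:
    """Keep only the tools whose required role the current user holds."""
    roles = set(state.get("user_roles", []))
    return [tool for tool in all_tools if tool.required_role in roles]


selected = select_relevant_tools({"user_roles": ["viewer", "editor"]})
selected_names = [tool.name for tool in selected]
```

Filtering from the pre-registered list, rather than constructing new tools, keeps the sketch consistent with the requirement that all tools be registered up front.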
Tool call monitoring

Using a decorator:

from collections.abc import Callable
from langchain.agents.middleware import wrap_tool_call
from langchain.messages import ToolMessage
from langchain.tools.tool_node import ToolCallRequest
from langgraph.types import Command


@wrap_tool_call
def monitor_tool(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command:
    print(f"Executing tool: {request.tool_call['name']}")
    print(f"Arguments: {request.tool_call['args']}")
    try:
        result = handler(request)
        print("Tool completed successfully")
        return result
    except Exception as e:
        # Log the failure, then let the error propagate.
        print(f"Tool failed: {e}")
        raise

Using a class:

from collections.abc import Callable
from langchain.agents.middleware import AgentMiddleware
from langchain.messages import ToolMessage
from langchain.tools.tool_node import ToolCallRequest
from langgraph.types import Command


class ToolMonitoringMiddleware(AgentMiddleware):
    def wrap_tool_call(
        self,
        request: ToolCallRequest,
        handler: Callable[[ToolCallRequest], ToolMessage | Command],
    ) -> ToolMessage | Command:
        print(f"Executing tool: {request.tool_call['name']}")
        print(f"Arguments: {request.tool_call['args']}")
        try:
            result = handler(request)
            print("Tool completed successfully")
            return result
        except Exception as e:
            # Log the failure, then let the error propagate.
            print(f"Tool failed: {e}")
            raise
Prompt caching (Anthropic)
When using Anthropic models, use structured content blocks with cache control directives to cache large system prompts:

Using a decorator:

from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from langchain.messages import SystemMessage
from typing import Callable


@wrap_model_call
def add_cached_context(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    # Always work with content blocks
    new_content = list(request.system_message.content_blocks) + [
        {
            "type": "text",
            "text": "Here is a large document to analyze:\n\n<document>...</document>",
            # Content up to this point is cached
            "cache_control": {"type": "ephemeral"},
        }
    ]
    new_system_message = SystemMessage(content=new_content)
    return handler(request.override(system_message=new_system_message))

Using a class:

from langchain.agents.middleware import AgentMiddleware, ModelRequest, ModelResponse
from langchain.messages import SystemMessage
from typing import Callable


class CachedContextMiddleware(AgentMiddleware):
    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse],
    ) -> ModelResponse:
        # Always work with content blocks
        new_content = list(request.system_message.content_blocks) + [
            {
                "type": "text",
                "text": "Here is a large document to analyze:\n\n<document>...</document>",
                "cache_control": {"type": "ephemeral"},  # This content will be cached
            }
        ]
        new_system_message = SystemMessage(content=new_content)
        return handler(request.override(system_message=new_system_message))

Notes:
- ModelRequest.system_message is always a SystemMessage object, even if the agent was created with system_prompt="string"
- Use SystemMessage.content_blocks to access the content as a list of blocks, regardless of whether the original content was a string or a list
- When modifying the system message, use content_blocks and append new blocks to preserve the existing structure
- You can pass a SystemMessage object directly to the system_prompt parameter of create_agent for advanced use cases such as cache control