Documentation Index
Fetch the complete documentation index at: https://nvd-54.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
持久执行是一种技术,在这种技术中,进程或工作流在关键节点保存其进度,使其能够暂停并在稍后从中断处精确恢复。这在需要人机协作的场景中特别有用——用户可以在继续之前检查、验证或修改流程,以及在可能遇到中断或错误(例如,LLM 调用超时)的长时间运行任务中。通过保留已完成的工作,持久执行使进程能够在不重新处理先前步骤的情况下恢复——即使在很长的延迟之后(例如,一周后)。
LangGraph 内置的持久化层为工作流提供持久执行,确保每个执行步骤的状态被保存到持久化存储中。此功能保证如果工作流被中断——无论是由于系统故障还是人机协作交互——它都可以从最后记录的状态恢复。
前提条件
要在 LangGraph 中利用持久执行,你需要:
-
通过指定一个检查点器来在工作流中启用持久化,它将保存工作流进度。
-
在执行工作流时指定线程标识符。这将跟踪工作流特定实例的执行历史。
-
将任何非确定性操作(例如随机数生成)或具有副作用的操作(例如文件写入、API 调用)包装在 task 中,以确保工作流恢复时,这些操作不会针对特定运行重复执行,而是从持久化层检索其结果。更多信息请参阅确定性和一致性重放。
确定性和一致性重放
当你恢复工作流运行时,代码不会从执行停止的同一行代码处恢复;相反,它会确定一个适当的起点来接续之前的工作。这意味着工作流将从起点重放所有步骤,直到它到达停止的位置。
因此,当你为持久执行编写工作流时,必须将任何非确定性操作(例如随机数生成)和任何具有副作用的操作(例如文件写入、API 调用)包装在 task 或节点中。
为了确保你的工作流是确定性的并且可以一致地重放,请遵循以下准则:
- 避免重复工作:如果一个节点包含多个具有副作用的操作(例如日志记录、文件写入或网络调用),请将每个操作包装在单独的 task 中。这确保工作流恢复时,这些操作不会重复执行,而是从持久化层检索其结果。
- 封装非确定性操作: 将任何可能产生非确定性结果的代码(例如随机数生成)包装在 task 或节点中。这确保在恢复时,工作流以相同的结果遵循精确记录的步骤序列。
- 使用幂等操作:尽可能确保副作用(例如 API 调用、文件写入)是幂等的。这意味着如果操作在工作流中失败后重试,它将产生与首次执行相同的效果。这对于导致数据写入的操作尤其重要。如果 task 启动但未能成功完成,工作流的恢复将重新运行该 task,依赖记录的结果来保持一致性。使用幂等键或验证现有结果以避免意外重复,确保工作流执行顺畅且可预测。
有关需要避免的陷阱示例,请参阅 Functional API 中的常见陷阱部分,其中展示了如何使用 task 来构建代码以避免这些问题。相同的原则适用于 StateGraph(Graph API)。
持久化模式
LangGraph 支持三种持久化模式,允许你根据应用需求在性能和数据一致性之间取得平衡。更高的持久化模式会为工作流执行增加更多开销。你可以在调用任何图执行方法时指定持久化模式:
graph.stream(
{"input": "test"},
durability="sync"
)
持久化模式从最低到最高持久性排列如下:
"exit":LangGraph 仅在图执行退出时(无论是成功、错误还是由于人机协作中断)持久化更改。这为长时间运行的图提供最佳性能,但意味着中间状态不会保存,因此你无法从执行过程中发生的系统故障(如进程崩溃)中恢复。
"async":LangGraph 在下一步执行时异步持久化更改。这提供了良好的性能和持久性,但存在较小的风险——如果进程在执行期间崩溃,LangGraph 可能不会写入检查点。
"sync":LangGraph 在下一步开始之前同步持久化更改。这确保 LangGraph 在继续执行之前写入每个检查点,以一定的性能开销为代价提供高持久性。
在节点中使用 task
如果一个节点包含多个操作,你可能会发现将每个操作转换为 task 比将操作重构为单独的节点更容易。
from typing import NotRequired
from typing_extensions import TypedDict
from langchain_core.utils.uuid import uuid7
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
import requests
# 定义一个 TypedDict 来表示状态
class State(TypedDict):
url: str
result: NotRequired[str]
def call_api(state: State):
"""发起 API 请求的示例节点。"""
result = requests.get(state['url']).text[:100] # 副作用 #
return {
"result": result
}
# 创建 StateGraph 构建器并为 call_api 函数添加节点
builder = StateGraph(State)
builder.add_node("call_api", call_api)
# 将起始节点和结束节点连接到 call_api 节点
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)
# 指定检查点器
checkpointer = InMemorySaver()
# 使用检查点器编译图
graph = builder.compile(checkpointer=checkpointer)
# 定义一个带有线程 ID 的配置。
thread_id = str(uuid7())
config = {"configurable": {"thread_id": thread_id}}
# 调用图
graph.invoke({"url": "https://www.example.com"}, config)
from typing import NotRequired
from typing_extensions import TypedDict
from langchain_core.utils.uuid import uuid7
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import task
from langgraph.graph import StateGraph, START, END
import requests
# 定义一个 TypedDict 来表示状态
class State(TypedDict):
urls: list[str]
result: NotRequired[list[str]]
@task
def _make_request(url: str):
"""发起请求。"""
return requests.get(url).text[:100]
def call_api(state: State):
"""发起 API 请求的示例节点。"""
requests = [_make_request(url) for url in state['urls']]
results = [request.result() for request in requests]
return {
"results": results
}
# 创建 StateGraph 构建器并为 call_api 函数添加节点
builder = StateGraph(State)
builder.add_node("call_api", call_api)
# 将起始节点和结束节点连接到 call_api 节点
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)
# 指定检查点器
checkpointer = InMemorySaver()
# 使用检查点器编译图
graph = builder.compile(checkpointer=checkpointer)
# 定义一个带有线程 ID 的配置。
thread_id = str(uuid7())
config = {"configurable": {"thread_id": thread_id}}
# 调用图
graph.invoke({"urls": ["https://www.example.com"]}, config)
恢复工作流
一旦你在工作流中启用了持久执行,你可以在以下场景中恢复执行:
- 暂停和恢复工作流: 使用 interrupt 函数在特定点暂停工作流,使用
Command 原语以更新的状态恢复它。详见中断。
- 从故障中恢复: 在异常(例如 LLM 提供商故障)后从最后一个成功检查点自动恢复工作流。这涉及使用相同的线程标识符执行工作流,并提供
None 作为输入值(参见使用 Functional API 的示例)。
恢复工作流的起点
优雅关闭
优雅关闭允许你协作地停止正在进行的图运行——在当前超步完成之后——并保存一个可恢复的检查点。这对于处理 SIGTERM 信号或任何需要在不丢失工作的情况下回收资源的外部监督程序非常有用。
创建一个 RunControl 并将其作为 control= 传递给 invoke 或 stream。从任何线程调用 request_drain() 来发出运行应该停止的信号:
from langgraph.runtime import RunControl
from langgraph.errors import GraphDrained
control = RunControl()
# 在信号处理器或监督程序中:
# control.request_drain("sigterm")
try:
result = graph.invoke(inputs, config, control=control)
except GraphDrained as e:
# 图提前停止并保存了检查点。
# 稍后使用相同的配置恢复。
print(f"已排空:{e.reason}")
排空是协作式的,在超步之间操作,永远不会抢占已经在运行的工作:
| 场景 | 行为 |
|---|
| 节点正在执行中 | 运行到完成。排空在下一个超步生效。 |
| 节点具有重试策略且当前正在重试 | 重试循环运行到耗尽或成功。排空在之后生效。 |
| 图在与排空相同的时刻自然完成 | 正常返回。检查 control.drain_requested 以区分正常运行。 |
| 还有更多超步 | 抛出 GraphDrained(reason)。检查点已保存且可恢复。 |
| 子图请求排空 | GraphDrained 通过父图向上冒泡,并在其自己的下一个超步边界处停止。 |
排空后恢复
使用相同的 thread_id 通过 invoke(None, config) 恢复已排空的运行:
result = graph.invoke(None, config)
在节点内读取排空状态
通过 runtime 参数访问排空状态,以在超步边界到达之前调整节点行为:
from langgraph.runtime import Runtime
async def my_node(state: State, runtime: Runtime) -> State:
if runtime.drain_requested:
# 跳过昂贵的工作,返回最小结果
return {"status": "skipped", "reason": runtime.drain_reason}
return {"status": await do_work()}
SIGTERM 钩子模式
处理进程关闭的推荐模式:
import signal
from langgraph.runtime import RunControl
from langgraph.errors import GraphDrained
control = RunControl()
signal.signal(signal.SIGTERM, lambda *_: control.request_drain("sigterm"))
try:
result = graph.invoke(inputs, config, control=control)
except GraphDrained as e:
log.info("图已排空:%s", e.reason)
# 在下次启动时使用相同的配置恢复
request_drain() 不会取消正在运行的 asyncio 任务或终止线程。如需硬性上限,请将排空与优雅超时和任务取消配合使用。
将这些文档连接到 Claude、VSCode 等工具,通过 MCP 获取实时答案。