Skip to main content

Documentation Index

Fetch the complete documentation index at: https://nvd-54.mintlify.app/llms.txt

Use this file to discover all available pages before exploring further.

中断允许你在特定点暂停图的执行,等待外部输入后再继续。这启用了需要外部输入才能继续的人机协作模式。当中断被触发时,LangGraph 使用其持久化层保存图状态,并无限期等待直到你恢复执行。 中断通过在图节点中的任意位置调用 interrupt() 函数来工作。该函数接受任何 JSON 可序列化的值,并将其呈现给调用者。当你准备继续时,使用 Command 重新调用图来恢复执行,Command 随后成为节点内 interrupt() 调用的返回值。 与静态断点(在特定节点之前或之后暂停)不同,中断是动态的:它们可以放置在代码的任何位置,并可以基于应用逻辑进行条件判断。
  • 检查点保持你的位置: 检查点器写入精确的图状态,因此你可以在之后恢复,即使处于错误状态。
  • thread_id 是你的指针: 设置 config={"configurable": {"thread_id": ...}} 来告诉检查点器要加载哪个状态。
  • 中断载荷通过 chunk["interrupts"] 呈现: 当使用 version="v2" 流式输出时,你传给 interrupt() 的值会出现在 values 流式部分的 interrupts 字段中,让你知道图正在等什么。
你选择的 thread_id 实际上是你的持久化游标。重复使用它会恢复同一检查点;使用新值会启动一个具有空状态的全新线程。

使用 interrupt 暂停

interrupt 函数暂停图执行并向调用者返回一个值。当你在节点中调用 interrupt 时,LangGraph 保存当前图状态并等待你使用输入恢复执行。 使用 interrupt 需要:
  1. 一个检查点器来持久化图状态(生产环境中使用持久化检查点器)
  2. 配置中的线程 ID,以便运行时知道从哪个状态恢复
  3. 在你想暂停的地方调用 interrupt()(载荷必须是 JSON 可序列化的)
from langgraph.types import interrupt

def approval_node(state: State):
    # 暂停并请求批准
    approved = interrupt("Do you approve this action?")

    # 恢复时,Command(resume=...) 在此返回该值
    return {"approved": approved}
当你调用 interrupt 时,会发生以下事情:
  1. 图执行被挂起——恰好在调用 interrupt 的位置
  2. 状态被保存——使用检查点器以便稍后恢复执行。在生产环境中,这应该是持久化检查点器(例如由数据库支持)
  3. 值被返回——给调用者,位于 __interrupt__ 下;可以是任何 JSON 可序列化值(字符串、对象、数组等)
  4. 图无限期等待——直到你使用响应恢复执行
  5. 响应被传回——恢复时传入节点,成为 interrupt() 调用的返回值

恢复中断

在中断暂停执行后,你通过使用包含恢复值的 Command 重新调用图来恢复它。恢复值被传回 interrupt 调用,允许节点使用外部输入继续执行。
from langgraph.types import Command

# 初始运行——遇到中断并暂停
# thread_id 是持久化指针(生产环境中存储稳定 ID)
config = {"configurable": {"thread_id": "thread-1"}}
result = graph.invoke({"input": "data"}, config=config, version="v2")

# result 是具有 .value 和 .interrupts 的 GraphOutput
# .interrupts 包含传给 interrupt() 的载荷
print(result.interrupts)
# > (Interrupt(value='Do you approve this action?'),)

# 使用人类的响应恢复
# 恢复载荷成为节点内 interrupt() 的返回值
graph.invoke(Command(resume=True), config=config, version="v2")
关于恢复的要点:
  • 恢复时必须使用与中断发生时相同的线程 ID
  • 传给 Command(resume=...) 的值成为 interrupt 调用的返回值
  • 恢复时节点从调用 interrupt节点开头重新启动,因此 interrupt 之前的任何代码会再次运行
  • 可以传递任何 JSON 可序列化值作为恢复值
Command(resume=...)唯一用于 invoke()/stream() 输入的 Command 模式。其他 Command 参数(updategotograph)设计用于从节点函数返回。不要传递 Command(update=...) 作为输入来继续多轮对话——请传递普通输入字典。

常见模式

中断解锁的关键能力是暂停执行并等待外部输入。这适用于多种用例,包括:

使用人机协作(HITL)中断进行流式输出

构建具有人机协作工作流的交互式智能体时,你可以同时流式输出消息块和节点更新,在处理中断的同时提供实时反馈。 使用多种流模式("messages""updates")配合 subgraphs=True(如果存在子图)来:
  • 实时流式输出 AI 响应
  • 检测图何时遇到中断
  • 处理用户输入并无缝恢复执行
async for chunk in graph.astream(
    initial_input,
    stream_mode=["messages", "updates"],
    subgraphs=True,
    config=config,
    version="v2",
):
    if chunk["type"] == "messages":
        # 处理流式消息内容
        msg, _ = chunk["data"]
        if isinstance(msg, AIMessageChunk) and msg.content:
            display_streaming_content(msg.content)

    elif chunk["type"] == "updates":
        # 检查更新数据中的中断
        if "__interrupt__" in chunk["data"]:
            interrupt_info = chunk["data"]["__interrupt__"][0].value
            user_response = get_user_input(interrupt_info)
            initial_input = Command(resume=user_response)
            break
        else:
            current_node = list(chunk["data"].keys())[0]
  • version="v2":所有块都是具有 typensdata 键的 StreamPart 字典
  • chunk["type"]:根据流模式("messages""updates" 等)缩窄以进行类型推断
  • chunk["ns"]:标识源图(根图为空元组,子图时有填充)
  • subgraphs=True:嵌套图中的中断检测需要此选项
  • Command(resume=...):使用用户提供的数据恢复图执行

处理多个中断

当并行分支同时中断时(例如,扇出到多个节点,每个调用 interrupt()),你可能需要在单次调用中恢复多个中断。恢复多个中断时,将每个中断 ID 映射到其恢复值。这确保每个响应在运行时与正确的中断配对。
from typing import Annotated, TypedDict
import operator

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, END, StateGraph
from langgraph.types import Command, interrupt


class State(TypedDict):
    vals: Annotated[list[str], operator.add]


def node_a(state):
    answer = interrupt("question_a")
    return {"vals": [f"a:{answer}"]}


def node_b(state):
    answer = interrupt("question_b")
    return {"vals": [f"b:{answer}"]}


graph = (
    StateGraph(State)
    .add_node("a", node_a)
    .add_node("b", node_b)
    .add_edge(START, "a")
    .add_edge(START, "b")
    .add_edge("a", END)
    .add_edge("b", END)
    .compile(checkpointer=InMemorySaver())
)

config = {"configurable": {"thread_id": "1"}}

# 第 1 步:调用——两个并行节点都触发 interrupt() 并暂停
interrupted_result = graph.invoke({"vals": []}, config)
print(interrupted_result)
"""
{
    'vals': [],
    '__interrupt__': [
        Interrupt(value='question_a', id='bd4f3183600f2c41dddafbf8f0f7be7b'),
        Interrupt(value='question_b', id='29963e3d3585f0cef025dd0f14323f55')
    ]
}
"""

# 第 2 步:一次性恢复所有待处理的中断
resume_map = {
    i.id: f"answer for {i.value}"
    for i in interrupted_result["__interrupt__"]
}
result = graph.invoke(Command(resume=resume_map), config)

print("最终状态:", result)
#> 最终状态: {'vals': ['a:answer for question_a', 'b:answer for question_b']}

批准或拒绝

中断最常见的用途之一是在关键操作前暂停并请求批准。例如,你可能想让人类批准 API 调用、数据库更改或其他重要决策。
from typing import Literal
from langgraph.types import interrupt, Command

def approval_node(state: State) -> Command[Literal["proceed", "cancel"]]:
    # 暂停执行;载荷显示在 result["__interrupt__"] 中
    is_approved = interrupt({
        "question": "Do you want to proceed with this action?",
        "details": state["action_details"]
    })

    # 根据响应路由
    if is_approved:
        return Command(goto="proceed")  # 提供恢复载荷后运行
    else:
        return Command(goto="cancel")
恢复图时,传递 True 批准或 False 拒绝:
# 批准
graph.invoke(Command(resume=True), config=config)

# 拒绝
graph.invoke(Command(resume=False), config=config)
from typing import Literal, Optional, TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class ApprovalState(TypedDict):
    action_details: str
    status: Optional[Literal["pending", "approved", "rejected"]]


def approval_node(state: ApprovalState) -> Command[Literal["proceed", "cancel"]]:
    # 暴露详情以便调用者在 UI 中渲染
    decision = interrupt({
        "question": "Approve this action?",
        "details": state["action_details"],
    })

    # 恢复后路由到适当节点
    return Command(goto="proceed" if decision else "cancel")


def proceed_node(state: ApprovalState):
    return {"status": "approved"}


def cancel_node(state: ApprovalState):
    return {"status": "rejected"}


builder = StateGraph(ApprovalState)
builder.add_node("approval", approval_node)
builder.add_node("proceed", proceed_node)
builder.add_node("cancel", cancel_node)
builder.add_edge(START, "approval")
builder.add_edge("proceed", END)
builder.add_edge("cancel", END)

# 生产环境中使用更持久的检查点器
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "approval-123"}}
initial = graph.invoke(
    {"action_details": "Transfer $500", "status": "pending"},
    config=config,
)
print(initial["__interrupt__"])  # -> [Interrupt(value={'question': ..., 'details': ...})]

# 使用决定恢复;True 路由到 proceed,False 路由到 cancel
resumed = graph.invoke(Command(resume=True), config=config)
print(resumed["status"])  # -> "approved"

审查和编辑状态

有时你想让人类在继续之前审查和编辑部分图状态。这对于纠正 LLM 输出、添加缺失信息或进行调整很有用。
from langgraph.types import interrupt

def review_node(state: State):
    # 暂停并显示当前内容以供审查(显示在 result["__interrupt__"] 中)
    edited_content = interrupt({
        "instruction": "Review and edit this content",
        "content": state["generated_text"]
    })

    # 使用编辑后的版本更新状态
    return {"generated_text": edited_content}
恢复时,提供编辑后的内容:
graph.invoke(
    Command(resume="The edited and improved text"),  # 值成为 interrupt() 的返回值
    config=config
)
import sqlite3
from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class ReviewState(TypedDict):
    generated_text: str


def review_node(state: ReviewState):
    # 请求审阅者编辑生成的内容
    updated = interrupt({
        "instruction": "Review and edit this content",
        "content": state["generated_text"],
    })
    return {"generated_text": updated}


builder = StateGraph(ReviewState)
builder.add_node("review", review_node)
builder.add_edge(START, "review")
builder.add_edge("review", END)

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "review-42"}}
initial = graph.invoke({"generated_text": "Initial draft"}, config=config)
print(initial["__interrupt__"])  # -> [Interrupt(value={'instruction': ..., 'content': ...})]

# 使用审阅者编辑后的文本恢复
final_state = graph.invoke(
    Command(resume="Improved draft after review"),
    config=config,
)
print(final_state["generated_text"])  # -> "Improved draft after review"

工具中的中断

你也可以直接在工具函数中放置中断。这使工具本身在被调用时暂停以等待批准,并允许在执行前对工具调用进行人工审查和编辑。 首先,定义一个使用 interrupt 的工具:
from langchain.tools import tool
from langgraph.types import interrupt

@tool
def send_email(to: str, subject: str, body: str):
    """Send an email to a recipient."""

    # 发送前暂停;载荷显示在 result["__interrupt__"] 中
    response = interrupt({
        "action": "send_email",
        "to": to,
        "subject": subject,
        "body": body,
        "message": "Approve sending this email?"
    })

    if response.get("action") == "approve":
        # 恢复值可以在执行前覆盖输入
        final_to = response.get("to", to)
        final_subject = response.get("subject", subject)
        final_body = response.get("body", body)
        return f"Email sent to {final_to} with subject '{final_subject}'"
    return "Email cancelled by user"
这种方式在你希望批准逻辑与工具本身一起存在时很有用,使其在图的不同部分中可重用。LLM 可以自然地调用工具,中断会在工具被调用时暂停执行,允许你批准、编辑或取消操作。
import sqlite3
from typing import TypedDict

from langchain.tools import tool
from langchain_anthropic import ChatAnthropic
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class AgentState(TypedDict):
    messages: list[dict]


@tool
def send_email(to: str, subject: str, body: str):
    """Send an email to a recipient."""

    # 发送前暂停;载荷显示在 result["__interrupt__"] 中
    response = interrupt({
        "action": "send_email",
        "to": to,
        "subject": subject,
        "body": body,
        "message": "Approve sending this email?",
    })

    if response.get("action") == "approve":
        final_to = response.get("to", to)
        final_subject = response.get("subject", subject)
        final_body = response.get("body", body)

        # 实际发送邮件(你的实现在此处)
        print(f"[send_email] to={final_to} subject={final_subject} body={final_body}")
        return f"Email sent to {final_to}"

    return "Email cancelled by user"


model = ChatAnthropic(model="claude-sonnet-4-6").bind_tools([send_email])


def agent_node(state: AgentState):
    # LLM 可能决定调用工具;中断在发送前暂停
    result = model.invoke(state["messages"])
    return {"messages": state["messages"] + [result]}


builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.add_edge(START, "agent")
builder.add_edge("agent", END)

checkpointer = SqliteSaver(sqlite3.connect("tool-approval.db"))
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "email-workflow"}}
initial = graph.invoke(
    {
        "messages": [
            {"role": "user", "content": "Send an email to alice@example.com about the meeting"}
        ]
    },
    config=config,
)
print(initial["__interrupt__"])  # -> [Interrupt(value={'action': 'send_email', ...})]

# 使用批准和可选编辑的参数恢复
resumed = graph.invoke(
    Command(resume={"action": "approve", "subject": "Updated subject"}),
    config=config,
)
print(resumed["messages"][-1])  # -> send_email 返回的工具结果

验证人类输入

有时你需要验证人类输入,如果无效则再次询问。可以在循环中使用多个 interrupt 调用来实现。
from langgraph.types import interrupt

def get_age_node(state: State):
    prompt = "What is your age?"

    while True:
        answer = interrupt(prompt)  # 载荷显示在 result["__interrupt__"] 中

        # 验证输入
        if isinstance(answer, int) and answer > 0:
            # 有效输入——继续
            break
        else:
            # 无效输入——使用更具体的提示再次询问
            prompt = f"'{answer}' is not a valid age. Please enter a positive number."

    return {"age": answer}
每次使用无效输入恢复图时,它会以更清晰的消息再次询问。一旦提供有效输入,节点完成,图继续执行。
import sqlite3
from typing import TypedDict

from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt


class FormState(TypedDict):
    age: int | None


def get_age_node(state: FormState):
    prompt = "What is your age?"

    while True:
        answer = interrupt(prompt)  # 载荷显示在 result["__interrupt__"] 中

        if isinstance(answer, int) and answer > 0:
            return {"age": answer}

        prompt = f"'{answer}' is not a valid age. Please enter a positive number."


builder = StateGraph(FormState)
builder.add_node("collect_age", get_age_node)
builder.add_edge(START, "collect_age")
builder.add_edge("collect_age", END)

checkpointer = SqliteSaver(sqlite3.connect("forms.db"))
graph = builder.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "form-1"}}
first = graph.invoke({"age": None}, config=config)
print(first["__interrupt__"])  # -> [Interrupt(value='What is your age?', ...)]

# 提供无效数据;节点重新提示
retry = graph.invoke(Command(resume="thirty"), config=config)
print(retry["__interrupt__"])  # -> [Interrupt(value="'thirty' is not a valid age...", ...)]

# 提供有效数据;循环退出,状态更新
final = graph.invoke(Command(resume=30), config=config)
print(final["age"])  # -> 30

中断规则

当你在节点中调用 interrupt 时,LangGraph 通过抛出异常来挂起执行,该异常信号运行时暂停。此异常通过调用栈向上传播并被运行时捕获,通知图保存当前状态并等待外部输入。 当执行恢复时(在你提供请求的输入后),运行时从节点的开头重新启动整个节点——不会从调用 interrupt 的确切行恢复。这意味着 interrupt 之前运行的任何代码都会再次执行。因此,使用中断时需要遵循一些重要规则以确保行为符合预期。

不要将 interrupt 调用包装在 try/except 中

interrupt 通过抛出特殊异常来在调用点暂停执行。如果你将 interrupt 调用包装在 try/except 块中,你会捕获此异常,中断将不会传回图。
  • ✅ 将 interrupt 调用与容易出错的代码分开
  • ✅ 在 try/except 块中使用特定异常类型
def node_a(state: State):
    # ✅ 好的做法:先中断,然后单独处理
    # 错误条件
    interrupt("What's your name?")
    try:
        fetch_data()  # 这可能失败
    except Exception as e:
        print(e)
    return state
  • 🔴 不要将 interrupt 调用包装在裸的 try/except 块中
def node_a(state: State):
    # ❌ 不好的做法:将 interrupt 包装在裸的 try/except 中
    # 会捕获中断异常
    try:
        interrupt("What's your name?")
    except Exception as e:
        print(e)
    return state

不要在节点内重新排序 interrupt 调用

在单个节点中使用多个中断很常见,但如果处理不当可能导致意外行为。 当节点包含多个 interrupt 调用时,LangGraph 维护一个特定于执行该节点的任务的恢复值列表。每当执行恢复时,它从节点开头开始。对于遇到的每个 interrupt,LangGraph 检查任务恢复列表中是否存在匹配值。匹配是严格基于索引的,因此节点内 interrupt 调用的顺序很重要。
  • ✅ 保持 interrupt 调用在节点执行间保持一致
def node_a(state: State):
    # ✅ 好的做法:interrupt 调用每次都以相同顺序发生
    name = interrupt("What's your name?")
    age = interrupt("What's your age?")
    city = interrupt("What's your city?")

    return {
        "name": name,
        "age": age,
        "city": city
    }
  • 🔴 不要在节点内有条件地跳过 interrupt 调用
  • 🔴 不要使用在执行间不确定的逻辑循环 interrupt 调用
def node_a(state: State):
    # ❌ 不好的做法:有条件地跳过中断会改变顺序
    name = interrupt("What's your name?")

    # 第一次运行可能跳过此中断
    # 恢复时可能不跳过——导致索引不匹配
    if state.get("needs_age"):
        age = interrupt("What's your age?")

    city = interrupt("What's your city?")

    return {"name": name, "city": city}

不要在 interrupt 调用中返回复杂值

根据使用的检查点器,复杂值可能无法序列化(例如无法序列化函数)。为了使你的图适应任何部署,最佳实践是只使用可合理序列化的值。
  • ✅ 向 interrupt 传递简单的 JSON 可序列化类型
  • ✅ 传递具有简单值的字典/对象
def node_a(state: State):
    # ✅ 好的做法:传递可序列化的简单类型
    name = interrupt("What's your name?")
    count = interrupt(42)
    approved = interrupt(True)

    return {"name": name, "count": count, "approved": approved}
  • 🔴 不要向 interrupt 传递函数、类实例或其他复杂对象
def validate_input(value):
    return len(value) > 0

def node_a(state: State):
    # ❌ 不好的做法:向 interrupt 传递函数
    # 函数无法被序列化
    response = interrupt({
        "question": "What's your name?",
        "validator": validate_input  # 这会失败
    })
    return {"name": response}

interrupt 之前调用的副作用必须是幂等的

因为中断通过重新运行所在节点来工作,在 interrupt 之前调用的副作用(理想情况下)应该是幂等的。幂等性意味着同一操作可以多次应用而不会改变初始执行之外的结果。 例如,你可能在节点内有更新记录的 API 调用。如果在该调用完成后调用 interrupt,当节点恢复时它会被多次重新运行,可能覆盖初始更新或创建重复记录。
  • ✅ 在 interrupt 之前使用幂等操作
  • ✅ 将副作用放在 interrupt 调用之后
  • ✅ 尽可能将副作用分离到单独的节点中
def node_a(state: State):
    # ✅ 好的做法:使用 upsert 操作,它是幂等的
    # 多次运行会有相同结果
    db.upsert_user(
        user_id=state["user_id"],
        status="pending_approval"
    )

    approved = interrupt("Approve this change?")

    return {"approved": approved}
  • 🔴 不要在 interrupt 之前执行非幂等操作
  • 🔴 不要在不检查记录是否存在的情况下创建新记录
def node_a(state: State):
    # ❌ 不好的做法:在中断前创建新记录
    # 每次恢复都会创建重复记录
    audit_id = db.create_audit_log({
        "user_id": state["user_id"],
        "action": "pending_approval",
        "timestamp": datetime.now()
    })

    approved = interrupt("Approve this change?")

    return {"approved": approved, "audit_id": audit_id}

与作为函数调用的子图一起使用

在节点内调用子图时,父图将从调用子图并触发 interrupt节点开头恢复执行。同样,子图也将从调用 interrupt 的节点开头恢复。
def node_in_parent_graph(state: State):
    some_code()  # <-- 恢复时会重新执行
    # 将子图作为函数调用
    # 子图包含一个 `interrupt` 调用
    subgraph_result = subgraph.invoke(some_input)
    # ...

def node_in_subgraph(state: State):
    some_other_code()  # <-- 恢复时也会重新执行
    result = interrupt("What's your name?")
    # ...

使用中断进行调试

要调试和测试图,你可以使用静态中断作为断点,逐节点步进图的执行。静态中断在节点执行之前或之后的定义点触发。你可以在编译图时通过指定 interrupt_beforeinterrupt_after 来设置。
静态中断推荐用于人机协作工作流。请改用 interrupt 函数。
graph = builder.compile(
    interrupt_before=["node_a"],
    interrupt_after=["node_b", "node_c"],
    checkpointer=checkpointer,
)

# 向图传递线程 ID
config = {
    "configurable": {
        "thread_id": "some_thread"
    }
}

# 运行图直到断点
graph.invoke(inputs, config=config)

# 恢复图
graph.invoke(None, config=config)
  1. 断点在 compile 时设置。
  2. interrupt_before 指定在节点执行前暂停的节点。
  3. interrupt_after 指定在节点执行后暂停的节点。
  4. 需要检查点器来启用断点。
  5. 图运行直到第一个断点被触发。
  6. 通过传入 None 作为输入来恢复图。将运行直到下一个断点被触发。
要调试你的中断,请使用 LangSmith

使用 LangSmith Studio

你可以使用 LangSmith Studio 在运行图之前在 UI 中设置静态中断。你还可以使用 UI 在执行的任何时间点检查图状态。 image