Skip to main content

Documentation Index

Fetch the complete documentation index at: https://nvd-54.mintlify.app/llms.txt

Use this file to discover all available pages before exploring further.

Functional API 让你可以用最少的代码改动将 LangGraph 的核心功能(持久化记忆人机协作流式输出)添加到你的现有应用中。
有关 Functional API 的概念信息,请参阅 Functional API

创建简单工作流

定义 entrypoint 时,输入仅限于函数的第一个参数。要传递多个输入,可以使用字典。
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    value = inputs["value"]
    another_value = inputs["another_value"]
    ...

my_workflow.invoke({"value": 1, "another_value": 2})
from langchain_core.utils.uuid import uuid7
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

# 检查一个数是否为偶数的任务
@task
def is_even(number: int) -> bool:
    return number % 2 == 0

# 格式化消息的任务
@task
def format_message(is_even: bool) -> str:
    return "The number is even." if is_even else "The number is odd."

# 创建持久化检查点器
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(inputs: dict) -> str:
    """分类一个数字的简单工作流。"""
    even = is_even(inputs["number"]).result()
    return format_message(even).result()

# 使用唯一线程 ID 运行工作流
config = {"configurable": {"thread_id": str(uuid7())}}
result = workflow.invoke({"number": 7}, config=config)
print(result)
此示例演示如何在语法上使用 @task@entrypoint 装饰器。由于提供了检查点器,工作流结果将持久化存储在检查点器中。
import uuid
from langchain.chat_models import init_chat_model
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

model = init_chat_model('gpt-3.5-turbo')

# 任务:使用 LLM 生成文章
@task
def compose_essay(topic: str) -> str:
    """生成关于给定主题的文章。"""
    return model.invoke([
        {"role": "system", "content": "You are a helpful assistant that writes essays."},
        {"role": "user", "content": f"Write an essay about {topic}."}
    ]).content

# 创建持久化检查点器
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(topic: str) -> str:
    """使用 LLM 生成文章的简单工作流。"""
    return compose_essay(topic).result()

# 执行工作流
config = {"configurable": {"thread_id": str(uuid7())}}
result = workflow.invoke("the history of flight", config=config)
print(result)

并行执行

任务可以通过并发调用并等待结果来并行执行。这对于提高 IO 密集型任务(例如调用 LLM API)的性能非常有用。
@task
def add_one(number: int) -> int:
    return number + 1

@entrypoint(checkpointer=checkpointer)
def graph(numbers: list[int]) -> list[str]:
    futures = [add_one(i) for i in numbers]
    return [f.result() for f in futures]
此示例演示如何使用 @task 并行运行多个 LLM 调用。每个调用生成一个关于不同主题的段落,结果合并成一个文本输出。
import uuid
from langchain.chat_models import init_chat_model
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

# 初始化 LLM 模型
model = init_chat_model("gpt-3.5-turbo")

# 生成关于给定主题的段落的任务
@task
def generate_paragraph(topic: str) -> str:
    response = model.invoke([
        {"role": "system", "content": "You are a helpful assistant that writes educational paragraphs."},
        {"role": "user", "content": f"Write a paragraph about {topic}."}
    ])
    return response.content

# 创建持久化检查点器
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(topics: list[str]) -> str:
    """并行生成多个段落并合并它们。"""
    futures = [generate_paragraph(topic) for topic in topics]
    paragraphs = [f.result() for f in futures]
    return "\n\n".join(paragraphs)

# 运行工作流
config = {"configurable": {"thread_id": str(uuid7())}}
result = workflow.invoke(["quantum computing", "climate change", "history of aviation"], config=config)
print(result)
此示例使用 LangGraph 的并发模型来提高执行时间,特别是当任务涉及 I/O(如 LLM 补全)时。

调用图

Functional APIGraph API 可以在同一应用中一起使用,因为它们共享相同的底层运行时。
from langgraph.func import entrypoint
from langgraph.graph import StateGraph

builder = StateGraph()
...
some_graph = builder.compile()

@entrypoint()
def some_workflow(some_input: dict) -> int:
    # 调用使用 Graph API 定义的图
    result_1 = some_graph.invoke(...)
    # 调用另一个使用 Graph API 定义的图
    result_2 = another_graph.invoke(...)
    return {
        "result_1": result_1,
        "result_2": result_2
    }
import uuid
from typing import TypedDict
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph

# 定义共享状态类型
class State(TypedDict):
    foo: int

# 定义一个简单的转换节点
def double(state: State) -> State:
    return {"foo": state["foo"] * 2}

# 使用 Graph API 构建图
builder = StateGraph(State)
builder.add_node("double", double)
builder.set_entry_point("double")
graph = builder.compile()

# 定义 Functional API 工作流
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(x: int) -> dict:
    result = graph.invoke({"foo": x})
    return {"bar": result["foo"]}

# 执行工作流
config = {"configurable": {"thread_id": str(uuid7())}}
print(workflow.invoke(5, config=config))  # 输出:{'bar': 10}

调用其他入口点

你可以从入口点任务内部调用其他入口点
@entrypoint() # 将自动使用父入口点的检查点器
def some_other_workflow(inputs: dict) -> int:
    return inputs["value"]

@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    value = some_other_workflow.invoke({"value": 1})
    return value
import uuid
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver

# 初始化检查点器
checkpointer = InMemorySaver()

# 一个可复用的子工作流,用于乘法运算
@entrypoint()
def multiply(inputs: dict) -> int:
    return inputs["a"] * inputs["b"]

# 调用子工作流的主工作流
@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> dict:
    result = multiply.invoke({"a": inputs["x"], "b": inputs["y"]})
    return {"product": result}

# 执行主工作流
config = {"configurable": {"thread_id": str(uuid7())}}
print(main.invoke({"x": 6, "y": 7}, config=config))  # 输出:{'product': 42}

流式输出

Functional API 使用与 Graph API 相同的流式输出机制。请阅读流式输出指南了解更多详情。 使用流式 API 同时流式输出更新和自定义数据的示例。
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.config import get_stream_writer   

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> int:
    writer = get_stream_writer()
    writer("Started processing")
    result = inputs["x"] * 2
    writer(f"Result is {result}")
    return result

config = {"configurable": {"thread_id": "abc"}}

for mode, chunk in main.stream(
    {"x": 5},
    stream_mode=["custom", "updates"],
    config=config
):
    print(f"{mode}: {chunk}")
  1. langgraph.config 导入 get_stream_writer
  2. 在入口点内获取流写入器实例。
  3. 在计算开始前发出自定义数据。
  4. 在计算结果后发出另一条自定义消息。
  5. 使用 .stream() 处理流式输出。
  6. 指定使用哪些流模式。
('updates', {'add_one': 2})
('updates', {'add_two': 3})
('custom', 'hello')
('custom', 'world')
('updates', {'main': 5})
Python < 3.11 的异步 如果使用 Python < 3.11 并编写异步代码,使用 get_stream_writer 将不起作用。请直接使用 StreamWriter 类。详见 Python < 3.11 的异步
from langgraph.types import StreamWriter

@entrypoint(checkpointer=checkpointer)
async def main(inputs: dict, writer: StreamWriter) -> int:
...

重试策略

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import RetryPolicy

# 此变量仅用于演示目的,模拟网络故障。
# 这不是你在实际代码中会有的东西。
attempts = 0

# 让我们配置 RetryPolicy 在 ValueError 时重试。
# 默认的 RetryPolicy 针对特定网络错误进行了优化重试。
retry_policy = RetryPolicy(retry_on=ValueError)

@task(retry_policy=retry_policy)
def get_info():
    global attempts
    attempts += 1

    if attempts < 2:
        raise ValueError('Failure')
    return "OK"

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def main(inputs, writer):
    return get_info().result()

config = {
    "configurable": {
        "thread_id": "1"
    }
}

main.invoke({'any_input': 'foobar'}, config=config)
'OK'

设置任务和入口点超时

使用 @task@entrypointtimeout 参数来限制单个异步尝试可以运行的时间。以秒为单位或 datetime.timedelta 提供超时。
import asyncio

from langgraph.errors import NodeTimeoutError
from langgraph.func import entrypoint, task
from langgraph.types import RetryPolicy


@task(
    timeout=1.0,
    retry_policy=RetryPolicy(retry_on=NodeTimeoutError),
)
async def call_api(url: str) -> str:
    await asyncio.sleep(2)
    return f"result from {url}"


@entrypoint(timeout=5.0)
async def workflow(inputs: dict) -> str:
    return await call_api(inputs["url"])


try:
    await workflow.ainvoke({"url": "https://example.com"})
except NodeTimeoutError:
    print("Task timed out")
超时仅支持异步任务和入口点。如果你在同步函数上设置 timeout,LangGraph 会在声明任务或入口点时抛出错误。 当任务或入口点超过超时时间时,LangGraph 会抛出 NodeTimeoutError(继承自 Python 内置的 TimeoutError)。如果重试策略配置为重试 TimeoutErrorNodeTimeoutError,超时的尝试会被重试。超时独立应用于每次尝试,因此计时器会在每次重试时重置。

缓存任务

import time
from langgraph.cache.memory import InMemoryCache
from langgraph.func import entrypoint, task
from langgraph.types import CachePolicy


@task(cache_policy=CachePolicy(ttl=120))
def slow_add(x: int) -> int:
    time.sleep(1)
    return x * 2


@entrypoint(cache=InMemoryCache())
def main(inputs: dict) -> dict[str, int]:
    result1 = slow_add(inputs["x"]).result()
    result2 = slow_add(inputs["x"]).result()
    return {"result1": result1, "result2": result2}


for chunk in main.stream({"x": 5}, stream_mode="updates"):
    print(chunk)

#> {'slow_add': 10}
#> {'slow_add': 10, '__metadata__': {'cached': True}}
#> {'main': {'result1': 10, 'result2': 10}}
  1. ttl 以秒为单位指定。缓存将在此时间后失效。

错误后恢复

import time
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import StreamWriter

# 此变量仅用于演示目的,模拟网络故障。
# 这不是你在实际代码中会有的东西。
attempts = 0

@task()
def get_info():
    """
    模拟一个在成功前失败一次的任务。
    第一次尝试时抛出异常,后续尝试返回 "OK"。
    """
    global attempts
    attempts += 1

    if attempts < 2:
        raise ValueError("Failure")  # 在第一次尝试时模拟失败
    return "OK"

# 初始化内存检查点器用于持久化
checkpointer = InMemorySaver()

@task
def slow_task():
    """
    通过引入 1 秒延迟来模拟慢运行任务。
    """
    time.sleep(1)
    return "Ran slow task."

@entrypoint(checkpointer=checkpointer)
def main(inputs, writer: StreamWriter):
    """
    主工作流函数,顺序运行 slow_task 和 get_info 任务。

    参数:
    - inputs:包含工作流输入值的字典。
    - writer:用于流式自定义数据的 StreamWriter。

    工作流首先执行 `slow_task`,然后尝试执行 `get_info`,
    该任务在第一次调用时会失败。
    """
    slow_task_result = slow_task().result()  # 阻塞调用 slow_task
    get_info().result()  # 第一次尝试时此处会抛出异常
    return slow_task_result

# 带有唯一线程标识符的工作流执行配置
config = {
    "configurable": {
        "thread_id": "1"  # 用于跟踪工作流执行的唯一标识符
    }
}

# 此调用由于 slow_task 执行将耗时约 1 秒
try:
    # 第一次调用会因 `get_info` 任务失败而抛出异常
    main.invoke({'any_input': 'foobar'}, config=config)
except ValueError:
    pass  # 优雅地处理失败
当我们恢复执行时,不需要重新运行 slow_task,因为其结果已保存在检查点中。
main.invoke(None, config=config)
'Ran slow task.'

人机协作

Functional API 支持使用 interrupt 函数和 Command 原语的人机协作工作流。

基本人机协作工作流

我们将创建三个任务
  1. 追加 "bar"
  2. 暂停等待人工输入。恢复时追加人工输入。
  3. 追加 "qux"
from langgraph.func import entrypoint, task
from langgraph.types import Command, interrupt


@task
def step_1(input_query):
    """追加 bar。"""
    return f"{input_query} bar"


@task
def human_feedback(input_query):
    """追加用户输入。"""
    feedback = interrupt(f"Please provide feedback: {input_query}")
    return f"{input_query} {feedback}"


@task
def step_3(input_query):
    """追加 qux。"""
    return f"{input_query} qux"
我们现在可以将这些任务组合在一个入口点中:
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()


@entrypoint(checkpointer=checkpointer)
def graph(input_query):
    result_1 = step_1(input_query).result()
    result_2 = human_feedback(result_1).result()
    result_3 = step_3(result_2).result()

    return result_3
interrupt() 在任务内部调用,使人类能够审核和编辑前一个任务的输出。先前任务的结果——在此例中是 step_1——会被持久化,因此在 interrupt 之后不会再次运行。 让我们发送一个查询字符串:
config = {"configurable": {"thread_id": "1"}}

for event in graph.stream("foo", config):
    print(event)
    print("\n")
注意我们已在 step_1 之后通过 interrupt 暂停。中断提供了恢复运行的说明。要恢复,我们发出一个包含 human_feedback 任务所期望数据的 Command
# 继续执行
for event in graph.stream(Command(resume="baz"), config):
    print(event)
    print("\n")
恢复后,运行会继续通过剩余步骤并按预期终止。

审核工具调用

要在执行前审核工具调用,我们添加一个调用 interruptreview_tool_call 函数。当调用此函数时,执行将暂停直到我们发出恢复命令。 给定一个工具调用,我们的函数将 interrupt 以进行人工审核。此时我们可以:
  • 接受工具调用
  • 修改工具调用并继续
  • 生成自定义工具消息(例如指示模型重新格式化其工具调用)
from typing import Union

def review_tool_call(tool_call: ToolCall) -> Union[ToolCall, ToolMessage]:
    """审核工具调用,返回验证后的版本。"""
    human_review = interrupt(
        {
            "question": "Is this correct?",
            "tool_call": tool_call,
        }
    )
    review_action = human_review["action"]
    review_data = human_review.get("data")
    if review_action == "continue":
        return tool_call
    elif review_action == "update":
        updated_tool_call = {**tool_call, **{"args": review_data}}
        return updated_tool_call
    elif review_action == "feedback":
        return ToolMessage(
            content=review_data, name=tool_call["name"], tool_call_id=tool_call["id"]
        )
我们现在可以更新我们的入口点来审核生成的工具调用。如果工具调用被接受或修改,我们像之前一样执行。否则,我们只追加人工提供的 ToolMessage。先前任务的结果——在此例中是初始模型调用——会被持久化,因此在 interrupt 之后不会再次运行。
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph.message import add_messages
from langgraph.types import Command, interrupt


checkpointer = InMemorySaver()


@entrypoint(checkpointer=checkpointer)
def agent(messages, previous):
    if previous is not None:
        messages = add_messages(previous, messages)

    model_response = call_model(messages).result()
    while True:
        if not model_response.tool_calls:
            break

        # 审核工具调用
        tool_results = []
        tool_calls = []
        for i, tool_call in enumerate(model_response.tool_calls):
            review = review_tool_call(tool_call)
            if isinstance(review, ToolMessage):
                tool_results.append(review)
            else:  # 是验证过的工具调用
                tool_calls.append(review)
                if review != tool_call:
                    model_response.tool_calls[i] = review  # 更新消息

        # 执行剩余的工具调用
        tool_result_futures = [call_tool(tool_call) for tool_call in tool_calls]
        remaining_tool_results = [fut.result() for fut in tool_result_futures]

        # 追加到消息列表
        messages = add_messages(
            messages,
            [model_response, *tool_results, *remaining_tool_results],
        )

        # 再次调用模型
        model_response = call_model(messages).result()

    # 生成最终响应
    messages = add_messages(messages, model_response)
    return entrypoint.final(value=model_response, save=messages)

短期记忆

短期记忆允许在同一线程 ID 的不同调用之间存储信息。详见短期记忆

管理检查点

你可以查看和删除检查点器存储的信息。

查看线程状态

config = {
    "configurable": {
        "thread_id": "1",
        # 可选地提供特定检查点的 ID,
        # 否则显示最新的检查点
        # "checkpoint_id": "1f029ca3-1f5b-6704-8004-820c16b69a5a"  #

    }
}
graph.get_state(config)
StateSnapshot(
    values={'messages': [...]}, next=(),
    config={...},
    metadata={...},
    created_at='2025-05-05T16:01:24.680462+00:00',
    parent_config={...},
    tasks=(),
    interrupts=()
)

查看线程历史

config = {
    "configurable": {
        "thread_id": "1"
    }
}
list(graph.get_state_history(config))
[
    StateSnapshot(values={...}, next=(), config={...}, ...),
    StateSnapshot(values={...}, next=('call_model',), config={...}, ...),
    ...
]

解耦返回值和保存值

使用 entrypoint.final 将返回给调用者的值与持久化到检查点中的值解耦。这在以下情况下很有用:
  • 你想返回一个计算结果(例如摘要或状态),但保存一个不同的内部值供下次调用时使用。
  • 你需要控制下次运行时传递给 previous 参数的内容。
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def accumulate(n: int, *, previous: int | None) -> entrypoint.final[int, int]:
    previous = previous or 0
    total = previous + n
    # 将*先前*的值返回给调用者,但将*新*的总计保存到检查点。
    return entrypoint.final(value=previous, save=total)

config = {"configurable": {"thread_id": "my-thread"}}

print(accumulate.invoke(1, config=config))  # 0
print(accumulate.invoke(2, config=config))  # 1
print(accumulate.invoke(3, config=config))  # 3

聊天机器人示例

使用 Functional API 和 InMemorySaver 检查点器的简单聊天机器人示例。 该机器人能够记住之前的对话并从中断处继续。
from langchain.messages import BaseMessage
from langgraph.graph import add_messages
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-sonnet-4-6")

@task
def call_model(messages: list[BaseMessage]):
    response = model.invoke(messages)
    return response

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(inputs: list[BaseMessage], *, previous: list[BaseMessage]):
    if previous:
        inputs = add_messages(previous, inputs)

    response = call_model(inputs).result()
    return entrypoint.final(value=response, save=add_messages(inputs, response))

config = {"configurable": {"thread_id": "1"}}
input_message = {"role": "user", "content": "hi! I'm bob"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()

input_message = {"role": "user", "content": "what's my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()

长期记忆

长期记忆允许跨不同线程 ID 存储信息。这对于在一个对话中学习某个用户的信息并在另一个对话中使用非常有用。

工作流

与其他库集成