Skip to main content

Documentation Index

Fetch the complete documentation index at: https://nvd-54.mintlify.app/llms.txt

Use this file to discover all available pages before exploring further.

函数式 API 允许你以最少的代码改动将 LangGraph 的关键功能(持久化记忆人机协作流式输出)添加到你的应用程序中。 它旨在将这些功能集成到可能使用标准语言原语(如 if 语句、for 循环和函数调用)进行分支和控制流的现有代码中。与许多需要将代码重构为显式管道或 DAG 的数据编排框架不同,函数式 API 允许你在不强制使用刚性执行模型的情况下引入这些能力。 函数式 API 使用两个关键构建块:
  • @entrypoint:将函数标记为工作流的起点,封装逻辑并管理执行流程,包括处理长时间运行的任务和中断。
  • @task:表示一个离散的工作单元,如 API 调用或数据处理步骤,可以在入口点内异步执行。任务返回一个类似 future 的对象,可以被等待或同步解析。
这提供了一个用于构建具有状态管理和流式输出的工作流的最小抽象层。
有关如何使用函数式 API 的信息,请参阅使用函数式 API

函数式 API 与图 API

对于更偏好声明式方法的用户,LangGraph 的图 API 允许你使用图范式定义工作流。两种 API 共享相同的底层运行时,因此你可以在同一应用程序中同时使用它们。 以下是一些关键区别:
  • 控制流:函数式 API 不需要考虑图结构。你可以使用标准 Python 构造来定义工作流。这通常会减少你需要编写的代码量。
  • 短期记忆图 API 需要声明状态,并可能需要定义 reducer 来管理对图状态的更新。@entrypoint@task 不需要显式的状态管理,因为它们的状态作用域限于函数,不在函数间共享。
  • 检查点:两种 API 都生成和使用检查点。在图 API 中,每个超级步后生成新的检查点。在函数式 API 中,当任务执行时,其结果保存到与给定入口点关联的现有检查点中,而不是创建新的检查点。
  • 可视化:图 API 使将工作流可视化为图变得容易,这对调试、理解工作流和与他人分享很有用。函数式 API 不支持可视化,因为图是在运行时动态生成的。

示例

下面我们演示一个简单的应用程序,它撰写一篇文章并中断以请求人工审核。
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import interrupt

@task
def write_essay(topic: str) -> str:
    """撰写一篇关于给定主题的文章。"""
    time.sleep(1) # 模拟长时间运行任务的占位符。
    return f"An essay about topic: {topic}"

@entrypoint(checkpointer=InMemorySaver())
def workflow(topic: str) -> dict:
    """一个简单的工作流,撰写文章并请求审核。"""
    essay = write_essay("cat").result()
    is_approved = interrupt({
        # 作为参数提供给 interrupt 的任何可 JSON 序列化的载荷。
        # 它将在客户端作为 Interrupt 在流式传输工作流数据时浮现。
        "essay": essay, # 我们希望被审核的文章。
        # 我们可以添加任何需要的附加信息。
        # 例如,引入一个名为 "action" 的键并附带一些指令。
        "action": "Please approve/reject the essay",
    })

    return {
        "essay": essay, # 生成的文章
        "is_approved": is_approved, # 来自人机协作的响应
    }
此工作流将撰写一篇关于”cat”主题的文章,然后暂停等待人工审核。工作流可以无限期中断,直到提供审核。当工作流恢复时,它从最开始执行,但由于 writeEssay 任务的结果已经保存,任务结果将从检查点加载而不是重新计算。
import time
from langchain_core.utils.uuid import uuid7
from langgraph.func import entrypoint, task
from langgraph.types import interrupt
from langgraph.checkpoint.memory import InMemorySaver


@task
def write_essay(topic: str) -> str:
    """撰写一篇关于给定主题的文章。"""
    time.sleep(1)  # 这是模拟长时间运行任务的占位符。
    return f"An essay about topic: {topic}"

@entrypoint(checkpointer=InMemorySaver())
def workflow(topic: str) -> dict:
    """一个简单的工作流,撰写文章并请求审核。"""
    essay = write_essay("cat").result()
    is_approved = interrupt(
        {
            "essay": essay,
            "action": "Please approve/reject the essay",
        }
    )
    return {
        "essay": essay,
        "is_approved": is_approved,
    }


thread_id = str(uuid7())
config = {"configurable": {"thread_id": thread_id}}
for item in workflow.stream("cat", config):
    print(item)
# > {'write_essay': 'An essay about topic: cat'}
# > {
# >     '__interrupt__': (
# >        Interrupt(
# >            value={
# >                'essay': 'An essay about topic: cat',
# >                'action': 'Please approve/reject the essay'
# >            },
# >            id='b9b2b9d788f482663ced6dc755c9e981'
# >        ),
# >    )
# > }
文章已撰写完毕并准备好接受审核。一旦提供了审核,我们可以恢复工作流:
from langgraph.types import Command

# 从用户获取审核(例如,通过 UI)
# 在此例中,我们使用布尔值,但可以是任何可 JSON 序列化的值。
human_review = True

for item in workflow.stream(Command(resume=human_review), config):
    print(item)
{'workflow': {'essay': 'An essay about topic: cat', 'is_approved': False}}
工作流已完成,审核已添加到文章中。

入口点

@entrypoint 装饰器可用于从函数创建工作流。它封装工作流逻辑并管理执行流程,包括处理_长时间运行的任务_和中断

定义

入口点通过使用 @entrypoint 装饰器装饰函数来定义。 函数必须接受单个位置参数,该参数作为工作流输入。如果你需要传递多个数据,请使用字典作为第一个参数的输入类型。 使用 entrypoint 装饰函数会生成一个 Pregel 实例,它帮助管理工作流的执行(例如,处理流式输出、恢复和检查点)。 你通常会希望向 @entrypoint 装饰器传递一个检查点器以启用持久化并使用人机协作等功能。
from langgraph.func import entrypoint

@entrypoint(checkpointer=checkpointer)
def my_workflow(some_input: dict) -> int:
    # 可能涉及长时间运行任务(如 API 调用)的逻辑,
    # 并且可能因人机协作而中断。
    ...
    return result
序列化 入口点的输入输出必须是可 JSON 序列化的,以支持检查点。请参阅序列化部分了解更多详情。

可注入参数

声明 entrypoint 时,你可以请求访问将在运行时自动注入的附加参数。这些参数包括:
参数描述
previous访问给定线程的前一个 checkpoint 关联的状态。参见短期记忆
store[BaseStore][langgraph.store.base.BaseStore] 的实例。用于长期记忆
writer在 Async Python < 3.11 时用于访问 StreamWriter。详情请参阅函数式 API 的流式输出
config用于访问运行时配置。参见 RunnableConfig 了解信息。
使用适当的名称和类型注解声明参数。
from langchain_core.runnables import RunnableConfig
from langgraph.func import entrypoint
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import StreamWriter

in_memory_checkpointer = InMemorySaver(...)
in_memory_store = InMemoryStore(...)  # 用于长期记忆的 InMemoryStore 实例

@entrypoint(
    checkpointer=in_memory_checkpointer,  # 指定检查点器
    store=in_memory_store  # 指定存储
)
def my_workflow(
    some_input: dict,  # 输入(例如通过 `invoke` 传递)
    *,
    previous: Any = None, # 用于短期记忆
    store: BaseStore,  # 用于长期记忆
    writer: StreamWriter,  # 用于流式输出自定义数据
    config: RunnableConfig  # 用于访问传递给入口点的配置
) -> ...:

执行

使用 @entrypoint 会产生一个 Pregel 对象,可以使用 invokeainvokestreamastream 方法执行。
config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}
my_workflow.invoke(some_input, config)  # 同步等待结果

恢复

中断后恢复执行可以通过向 Command 原语传递 resume 值来完成。
from langgraph.types import Command

config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

my_workflow.invoke(Command(resume=some_resume_value), config)
错误后恢复 要在错误后恢复,使用 None 和相同的 thread id(config)运行 entrypoint 这假设底层错误已解决,执行可以成功继续。

config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

my_workflow.invoke(None, config)

短期记忆

entrypoint 使用 checkpointer 定义时,它会在同一 thread id 上的连续调用之间将信息存储在检查点中。 这允许使用 previous 参数访问上一次调用的状态。 默认情况下,previous 参数是上一次调用的返回值。
@entrypoint(checkpointer=checkpointer)
def my_workflow(number: int, *, previous: Any = None) -> int:
    previous = previous or 0
    return number + previous

config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

my_workflow.invoke(1, config)  # 1(previous 为 None)
my_workflow.invoke(2, config)  # 3(previous 为上次调用的 1)

entrypoint.final

entrypoint.final 是一个特殊原语,可以从入口点返回,允许将保存在检查点中的值入口点的返回值****解耦 第一个值是入口点的返回值,第二个值是将保存在检查点中的值。类型注解为 entrypoint.final[return_type, save_type]
@entrypoint(checkpointer=checkpointer)
def my_workflow(number: int, *, previous: Any = None) -> entrypoint.final[int, int]:
    previous = previous or 0
    # 这将向调用者返回 previous 值,将
    # 2 * number 保存到检查点,在下次调用时
    # 用于 `previous` 参数。
    return entrypoint.final(value=previous, save=2 * number)

config = {
    "configurable": {
        "thread_id": "1"
    }
}

my_workflow.invoke(3, config)  # 0(previous 为 None)
my_workflow.invoke(1, config)  # 6(previous 为上次调用的 3 * 2)

任务

任务表示一个离散的工作单元,如 API 调用或数据处理步骤。它有两个关键特征:
  • 异步执行:任务设计为异步执行,允许多个操作并发运行而不阻塞。
  • 检查点:任务结果保存到检查点,使工作流能够从上次保存的状态恢复。(详情请参阅持久化)。

定义

任务使用 @task 装饰器定义,它包装一个普通的 Python 函数。
from langgraph.func import task

@task()
def slow_computation(input_value):
    # 模拟长时间运行的操作
    ...
    return result
序列化 任务的输出必须是可 JSON 序列化的,以支持检查点。

执行

任务只能从入口点、另一个任务状态图节点内部调用。 任务_不能_直接从主应用程序代码调用。 当你调用一个任务时,它会_立即_返回一个 future 对象。future 是稍后可用结果的占位符。 要获取任务的结果,你可以同步等待(使用 result())或异步等待(使用 await)。
@entrypoint(checkpointer=checkpointer)
def my_workflow(some_input: int) -> int:
    future = slow_computation(some_input)
    return future.result()  # 同步等待结果

何时使用任务

任务在以下场景中很有用:
  • 检查点:当你需要将长时间运行操作的结果保存到检查点时,这样在恢复工作流时就不需要重新计算。
  • 人机协作:如果你正在构建需要人工干预的工作流,你必须使用任务来封装任何随机性(例如 API 调用),以确保工作流可以正确恢复。详情请参阅确定性部分。
  • 并行执行:对于 I/O 密集型任务,任务支持并行执行,允许多个操作并发运行而不阻塞(例如,调用多个 API)。
  • 可观测性:将操作包装在任务中提供了一种使用 LangSmith 跟踪工作流进度和监控单个操作执行的方式。
  • 可重试工作:当工作需要重试以处理故障或不一致时,任务提供了一种封装和管理重试逻辑的方式。

序列化

LangGraph 中序列化有两个关键方面:
  1. entrypoint 的输入和输出必须是可 JSON 序列化的。
  2. task 的输出必须是可 JSON 序列化的。
这些要求对于启用检查点和工作流恢复是必要的。使用 Python 原语(如字典、列表、字符串、数字和布尔值)来确保你的输入和输出是可序列化的。 序列化确保工作流状态(如任务结果和中间值)可以被可靠地保存和恢复。这对于启用人机协作交互、容错和并行执行至关重要。 当工作流配置了检查点器时,提供不可序列化的输入或输出将导致运行时错误。

确定性

要利用人机协作等功能,任何随机性都应封装在任务内部。这保证了当执行停止(例如用于人机协作)然后恢复时,它将遵循相同的_步骤序列_,即使任务结果是不确定的。 LangGraph 通过在任务子图执行时持久化其结果来实现此行为。设计良好的工作流确保恢复执行遵循_相同的步骤序列_,允许正确检索先前计算的结果而无需重新执行。这对于长时间运行的任务或具有不确定性结果的任务特别有用,因为它避免了重复先前完成的工作,并允许从基本相同的位置恢复。 虽然工作流的不同运行可以产生不同的结果,但恢复一个特定运行应始终遵循相同的已记录步骤序列。这允许 LangGraph 高效地查找在图中断之前执行的任务子图结果,并避免重新计算它们。

幂等性

幂等性确保多次运行相同操作产生相同结果。这有助于防止在步骤因故障而重新运行时出现重复的 API 调用和冗余处理。始终将 API 调用放在任务函数内以进行检查点,并设计它们在重新执行时是幂等的。如果任务启动但未成功完成,则可能发生重新执行。然后,如果工作流恢复,任务将再次运行。使用幂等性键或验证现有结果以避免重复。

常见陷阱

处理副作用

将副作用(例如写入文件、发送邮件)封装在任务中,以确保恢复工作流时它们不会被多次执行。
在此示例中,副作用(写入文件)直接包含在工作流中,因此在恢复工作流时会第二次执行。
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    # 此代码在恢复工作流时会第二次执行。
    # 这很可能不是你想要的。
    with open("output.txt", "w") as f:
        f.write("Side effect executed")
    value = interrupt("question")
    return value

非确定性控制流

可能每次给出不同结果的操作(如获取当前时间或随机数)应封装在任务中,以确保恢复时返回相同的结果。
  • 在任务中:获取随机数 (5) -> 中断 -> 恢复 -> (再次返回 5) -> …
  • 不在任务中:获取随机数 (5) -> 中断 -> 恢复 -> 获取新随机数 (7) -> …
这在使用包含多个中断调用的人机协作工作流时尤其重要。LangGraph 为每个任务/入口点维护一个恢复值列表。当遇到中断时,它与相应的恢复值匹配。此匹配严格基于索引,因此恢复值的顺序应与中断的顺序匹配。 如果恢复时执行顺序未保持,一个 interrupt 调用可能与错误的 resume 值匹配,导致不正确的结果。 更多详情请阅读确定性部分。
在此示例中,工作流使用当前时间来确定执行哪个任务。这是不确定的,因为工作流的结果取决于执行时的时间。
from langgraph.func import entrypoint

@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    t0 = inputs["t0"]
    t1 = time.time()

    delta_t = t1 - t0

    if delta_t > 1:
        result = slow_task(1).result()
        value = interrupt("question")
    else:
        result = slow_task(2).result()
        value = interrupt("question")

    return {
        "result": result,
        "value": value
    }

了解更多