Skip to main content

Documentation Index

Fetch the complete documentation index at: https://nvd-54.mintlify.app/llms.txt

Use this file to discover all available pages before exploring further.

Pregel 实现了 LangGraph 的运行时,管理 LangGraph 应用的执行。 编译 StateGraph 或创建 @entrypoint 会生成一个可以用输入调用的 Pregel 实例。 本指南从高层次解释运行时,并提供直接使用 Pregel 实现应用的说明。
注意: Pregel 运行时以 Google 的 Pregel 算法命名,该算法描述了一种使用图进行大规模并行计算的高效方法。

概述

在 LangGraph 中,Pregel 将Actor通道组合到单个应用中。Actor 从通道读取数据并向通道写入数据。Pregel 按照 Pregel 算法/批量同步并行模型将应用的执行组织为多个步骤。 每个步骤由三个阶段组成:
  • 规划:确定在此步骤中执行哪些 Actor。例如,在第一步中,选择订阅了特殊输入通道的 Actor;在后续步骤中,选择订阅了上一步更新的通道的 Actor
  • 执行:并行执行所有选中的 Actor,直到全部完成、其中一个失败或达到超时。在此阶段,通道更新对 Actor 不可见,直到下一步。
  • 更新:使用 Actor 在此步骤中写入的值更新通道。
重复执行直到没有 Actor 被选中执行,或达到最大步数。

Actor

Actor 是一个 PregelNode。它订阅通道、从通道读取数据并向通道写入数据。可以将其视为 Pregel 算法中的 ActorPregelNodes 实现了 LangChain 的 Runnable 接口。

通道

通道用于 Actor(PregelNode)之间的通信。每个通道都有一个值类型、一个更新类型和一个更新函数——它接收一系列更新并修改存储的值。通道可用于从一个链向另一个链发送数据,或从一个链向自己在未来步骤中发送数据。

LastValue

LastValue 是默认的通道类型。它存储写入的最后一个值,覆盖任何先前的值。用于输入和输出值,或用于在步骤之间传递数据。
from langgraph.channels import LastValue

channel: LastValue[int] = LastValue(int)

Topic

Topic 是一个可配置的发布/订阅通道,用于在 Actor 之间发送多个值或在步骤之间累积输出。它可以配置为去重值或累积运行期间写入的所有值。
from langgraph.channels import Topic

# 累积跨步骤写入的所有值
channel: Topic[str] = Topic(str, accumulate=True)

BinaryOperatorAggregate

BinaryOperatorAggregate 存储一个持久值,通过将二元运算符应用于当前值和每个新更新来更新。用于在步骤之间计算运行聚合。
import operator
from langgraph.channels import BinaryOperatorAggregate

# 运行总计:每次写入都添加到当前值
total = BinaryOperatorAggregate(int, operator.add)

DeltaChannel(测试版)

DeltaChannel 需要 langgraph>=1.2,目前处于测试版。API 可能在未来版本中更改。
DeltaChannel 仅在每个步骤存储增量差异,而不是完整的累积值。这对于频繁写入并随时间累积大量值的通道最有用——例如,长时间运行线程中的对话消息列表。如果不使用增量存储,完整列表会在每个检查点中重新序列化;使用 DeltaChannel,只存储每个步骤中写入的新消息。
当通道既频繁写入又随时间增长很大时,考虑使用 DeltaChannel。一个好的信号:如果你注意到某个特定通道的检查点大小随线程长度线性增长,DeltaChannel 可能很合适。
Annotated 类型注解中使用 DeltaChannel,方式与使用普通归约器相同:
from typing import Annotated, Sequence
from typing_extensions import TypedDict
from langgraph.channels import DeltaChannel


def my_reducer(state: list[str], writes: Sequence[list[str]]) -> list[str]:
    result = list(state)
    for write in writes:
        result.extend(write)
    return result


class State(TypedDict):
    messages: Annotated[list[str], DeltaChannel(my_reducer)]

批量归约器要求

传递给 DeltaChannelreducer 是一个批量归约器:它在单次调用中接收当前状态和当前步骤中所有写入的序列——而不是像标准归约器那样成对调用。这与 StateGraphAnnotated 使用的每键归约器不同,后者每次更新调用一次。
批量归约器必须是结合性的(批处理不变的):
reducer(reducer(state, [xs]), [ys]) == reducer(state, [xs, ys])
如果你的归约器不是结合性的,重建的状态可能会因 LangGraph 跨步骤批处理写入的方式不同而不同,产生不一致的行为。
归约器在重建时运行,而不是在写入时。BinaryOperatorAggregate 不同(后者的归约器在写入时调用,因此组合值是序列化到检查点中的内容),DeltaChannel 的归约器在通道值从其持久化写入重建时调用。原始的每步写入是被序列化的内容;归约器仅在值被具体化时调用——在下一次读取时、在下一步的 Actor 中或在重放历史时。设计归约器时的实际影响:
  • 使其成为 (state, writes) 的纯函数。 任何副作用、随机性或墙钟时间读取(例如 uuid.uuid4()datetime.now())在每次值重建时执行,并在每次重放时产生不同的结果。它们不会被固化到持久化的写入中。
  • 不要依赖对传入写入的变异被持久化。 如果你的归约器变异了一个写入对象(例如,为到达时没有 ID 的项分配一个稳定的 ID),该变异只存在于重建的值中。存储的写入仍然具有原始形状,因此下次重建将再次看到未变异的输入。
  • 在上游附加标识和其他稳定元数据。 如果下游代码需要在回合之间按 ID 引用项(例如,稍后更新或删除它),请在值写入通道之前分配该 ID——而不是在归约器内部。
以下是两种最常见情况的批量归约器:
from typing import Any, Sequence


# 列表:按顺序追加所有写入
def list_reducer(state: list[Any], writes: Sequence[list[Any]]) -> list[Any]:
    result = list(state)
    for write in writes:
        result.extend(write)
    return result


# 字典:合并所有写入,键冲突时最后一次写入获胜
def dict_reducer(
    state: dict[str, Any], writes: Sequence[dict[str, Any]]
) -> dict[str, Any]:
    result = dict(state)
    for write in writes:
        result.update(write)
    return result
两者都是结合性的:逐批应用和一起应用产生相同的结果。

使用 snapshot_frequency 限定读取延迟

如果没有快照,读取 DeltaChannel 的值需要重放完整的写入历史——对于有 N 步的线程是 O(N)。设置 snapshot_frequency=K 每 K 个 Pregel 步骤写入一个完整快照,将读取深度限制为最多 K 步:
class State(TypedDict):
    messages: Annotated[
        list[str],
        DeltaChannel(my_reducer, snapshot_frequency=5),
    ]
较高的 snapshot_frequency 值减少存储开销但增加读取延迟。较低的值更紧密地限制延迟,但代价是更大的检查点。None(默认值)完全跳过快照——适用于读取稀少或线程较短的情况。

示例

虽然大多数用户会通过 StateGraph API 或 @entrypoint 装饰器与 Pregel 交互,但也可以直接与 Pregel 交互。 以下是几个不同的示例,让你了解 Pregel API。
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
)

app = Pregel(
    nodes={"node1": node1},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b"],
)

app.invoke({"a": "foo"})
{'b': 'foofoo'}

高级 API

LangGraph 提供两种高级 API 来创建 Pregel 应用:StateGraph(Graph API)Functional API
StateGraph(Graph API) 是一个更高级的抽象,简化了 Pregel 应用的创建。它允许你定义节点和边的图。当你编译图时,StateGraph API 会自动为你创建 Pregel 应用。
from typing import TypedDict

from langgraph.constants import START
from langgraph.graph import StateGraph

class Essay(TypedDict):
    topic: str
    content: str | None
    score: float | None

def write_essay(essay: Essay):
    return {
        "content": f"Essay about {essay['topic']}",
    }

def score_essay(essay: Essay):
    return {
        "score": 10
    }

builder = StateGraph(Essay)
builder.add_node(write_essay)
builder.add_node(score_essay)
builder.add_edge(START, "write_essay")
builder.add_edge("write_essay", "score_essay")

# 编译图。
# 这将返回一个 Pregel 实例。
graph = builder.compile()
编译后的 Pregel 实例将与节点和通道列表关联。你可以通过打印它们来检查节点和通道。
print(graph.nodes)
你将看到类似这样的内容:
{'__start__': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1810>,
 'write_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba14d0>,
 'score_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1710>}
print(graph.channels)
你应该看到类似这样的内容:
{'topic': <langgraph.channels.last_value.LastValue at 0x7d05e3294d80>,
 'content': <langgraph.channels.last_value.LastValue at 0x7d05e3295040>,
 'score': <langgraph.channels.last_value.LastValue at 0x7d05e3295980>,
 '__start__': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3297e00>,
 'write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32960c0>,
 'score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ab80>,
 'branch:__start__:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32941c0>,
 'branch:__start__:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d88800>,
 'branch:write_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3295ec0>,
 'branch:write_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ac00>,
 'branch:score_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d89700>,
 'branch:score_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b400>,
 'start:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b280>}