Documentation Index Fetch the complete documentation index at: https://nvd-54.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
Pregel 实现了 LangGraph 的运行时,管理 LangGraph 应用的执行。
编译 StateGraph 或创建 @entrypoint 会生成一个可以用输入调用的 Pregel 实例。
本指南从高层次解释运行时,并提供直接使用 Pregel 实现应用的说明。
注意: Pregel 运行时以 Google 的 Pregel 算法 命名,该算法描述了一种使用图进行大规模并行计算的高效方法。
在 LangGraph 中,Pregel 将Actor 和通道 组合到单个应用中。Actor 从通道读取数据并向通道写入数据。Pregel 按照 Pregel 算法 /批量同步并行 模型将应用的执行组织为多个步骤。
每个步骤由三个阶段组成:
规划 :确定在此步骤中执行哪些 Actor 。例如,在第一步中,选择订阅了特殊输入 通道的 Actor ;在后续步骤中,选择订阅了上一步更新的通道的 Actor 。
执行 :并行执行所有选中的 Actor ,直到全部完成、其中一个失败或达到超时。在此阶段,通道更新对 Actor 不可见,直到下一步。
更新 :使用 Actor 在此步骤中写入的值更新通道。
重复执行直到没有 Actor 被选中执行,或达到最大步数。
Actor
Actor 是一个 PregelNode。它订阅通道、从通道读取数据并向通道写入数据。可以将其视为 Pregel 算法中的 Actor 。PregelNodes 实现了 LangChain 的 Runnable 接口。
通道用于 Actor(PregelNode)之间的通信。每个通道都有一个值类型、一个更新类型和一个更新函数——它接收一系列更新并修改存储的值。通道可用于从一个链向另一个链发送数据,或从一个链向自己在未来步骤中发送数据。
LastValue
LastValue 是默认的通道类型。它存储写入的最后一个值,覆盖任何先前的值。用于输入和输出值,或用于在步骤之间传递数据。
from langgraph . channels import LastValue
channel : LastValue [ int ] = LastValue ( int )
Topic
Topic 是一个可配置的发布/订阅通道,用于在 Actor 之间发送多个值或在步骤之间累积输出。它可以配置为去重值或累积运行期间写入的所有值。
from langgraph . channels import Topic
# 累积跨步骤写入的所有值
channel : Topic [ str ] = Topic ( str , accumulate = True )
BinaryOperatorAggregate
BinaryOperatorAggregate 存储一个持久值,通过将二元运算符应用于当前值和每个新更新来更新。用于在步骤之间计算运行聚合。
import operator
from langgraph . channels import BinaryOperatorAggregate
# 运行总计:每次写入都添加到当前值
total = BinaryOperatorAggregate ( int , operator . add )
DeltaChannel(测试版)
DeltaChannel 需要 langgraph>=1.2,目前处于测试版。API 可能在未来版本中更改。
DeltaChannel 仅在每个步骤存储增量差异,而不是完整的累积值。这对于频繁写入并随时间累积大量值的通道最有用——例如,长时间运行线程中的对话消息列表。如果不使用增量存储,完整列表会在每个检查点中重新序列化;使用 DeltaChannel,只存储每个步骤中写入的新消息。
当通道既频繁写入又随时间增长很大时,考虑使用 DeltaChannel。一个好的信号:如果你注意到某个特定通道的检查点大小随线程长度线性增长,DeltaChannel 可能很合适。
在 Annotated 类型注解中使用 DeltaChannel,方式与使用普通归约器相同:
from typing import Annotated , Sequence
from typing_extensions import TypedDict
from langgraph . channels import DeltaChannel
def my_reducer ( state : list [ str ], writes : Sequence [ list [ str ]]) -> list [ str ]:
result = list ( state )
for write in writes :
result . extend ( write )
return result
class State ( TypedDict ):
messages : Annotated [ list [ str ], DeltaChannel ( my_reducer )]
批量归约器要求
传递给 DeltaChannel 的 reducer 是一个批量归约器 :它在单次调用中接收当前状态和当前步骤中所有写入的序列 ——而不是像标准归约器那样成对调用。这与 StateGraph 中 Annotated 使用的每键归约器不同,后者每次更新调用一次。
批量归约器必须是结合性的 (批处理不变的): reducer(reducer(state, [xs]), [ys]) == reducer(state, [xs, ys])
如果你的归约器不是结合性的,重建的状态可能会因 LangGraph 跨步骤批处理写入的方式不同而不同,产生不一致的行为。
归约器在重建时运行,而不是在写入时。 与 BinaryOperatorAggregate 不同(后者的归约器在写入时调用,因此组合值是序列化到检查点中的内容),DeltaChannel 的归约器在通道值从其持久化写入重建 时调用。原始的每步写入是被序列化的内容;归约器仅在值被具体化时调用——在下一次读取时、在下一步的 Actor 中或在重放历史时。设计归约器时的实际影响:
使其成为 (state, writes) 的纯函数。 任何副作用、随机性或墙钟时间读取(例如 uuid.uuid4()、datetime.now())在每次值重建时执行,并在每次重放时产生不同的结果。它们不会 被固化到持久化的写入中。
不要依赖对传入写入的变异被持久化。 如果你的归约器变异了一个写入对象(例如,为到达时没有 ID 的项分配一个稳定的 ID),该变异只存在于重建的值中。存储的写入仍然具有原始形状,因此下次重建将再次看到未变异的输入。
在上游附加标识和其他稳定元数据。 如果下游代码需要在回合之间按 ID 引用项(例如,稍后更新或删除它),请在值写入通道之前分配该 ID——而不是在归约器内部。
以下是两种最常见情况的批量归约器:
from typing import Any , Sequence
# 列表:按顺序追加所有写入
def list_reducer ( state : list [ Any ], writes : Sequence [ list [ Any ]]) -> list [ Any ]:
result = list ( state )
for write in writes :
result . extend ( write )
return result
# 字典:合并所有写入,键冲突时最后一次写入获胜
def dict_reducer (
state : dict [ str , Any ], writes : Sequence [ dict [ str , Any ]]
) -> dict [ str , Any ]:
result = dict ( state )
for write in writes :
result . update ( write )
return result
两者都是结合性的:逐批应用和一起应用产生相同的结果。
使用 snapshot_frequency 限定读取延迟
如果没有快照,读取 DeltaChannel 的值需要重放完整的写入历史——对于有 N 步的线程是 O(N)。设置 snapshot_frequency=K 每 K 个 Pregel 步骤写入一个完整快照,将读取深度限制为最多 K 步:
class State ( TypedDict ):
messages : Annotated [
list [ str ],
DeltaChannel ( my_reducer , snapshot_frequency = 5 ),
]
较高的 snapshot_frequency 值减少存储开销但增加读取延迟。较低的值更紧密地限制延迟,但代价是更大的检查点。None(默认值)完全跳过快照——适用于读取稀少或线程较短的情况。
虽然大多数用户会通过 StateGraph API 或 @entrypoint 装饰器与 Pregel 交互,但也可以直接与 Pregel 交互。
以下是几个不同的示例,让你了解 Pregel API。
单节点
多节点
Topic
BinaryOperatorAggregate
循环
from langgraph . channels import EphemeralValue
from langgraph . pregel import Pregel , NodeBuilder
node1 = (
NodeBuilder (). subscribe_only ( "a" )
. do ( lambda x : x + x )
. write_to ( "b" )
)
app = Pregel (
nodes = { "node1" : node1 },
channels = {
"a" : EphemeralValue ( str ),
"b" : EphemeralValue ( str ),
},
input_channels = [ "a" ],
output_channels = [ "b" ],
)
app . invoke ({ "a" : "foo" })
from langgraph . channels import LastValue , EphemeralValue
from langgraph . pregel import Pregel , NodeBuilder
node1 = (
NodeBuilder (). subscribe_only ( "a" )
. do ( lambda x : x + x )
. write_to ( "b" )
)
node2 = (
NodeBuilder (). subscribe_only ( "b" )
. do ( lambda x : x + x )
. write_to ( "c" )
)
app = Pregel (
nodes = { "node1" : node1 , "node2" : node2 },
channels = {
"a" : EphemeralValue ( str ),
"b" : LastValue ( str ),
"c" : EphemeralValue ( str ),
},
input_channels = [ "a" ],
output_channels = [ "b" , "c" ],
)
app . invoke ({ "a" : "foo" })
{'b': 'foofoo', 'c': 'foofoofoofoo'}
from langgraph . channels import EphemeralValue , Topic
from langgraph . pregel import Pregel , NodeBuilder
node1 = (
NodeBuilder (). subscribe_only ( "a" )
. do ( lambda x : x + x )
. write_to ( "b" , "c" )
)
node2 = (
NodeBuilder (). subscribe_to ( "b" )
. do ( lambda x : x [ " b " ] + x [ " b " ])
. write_to ( "c" )
)
app = Pregel (
nodes = { "node1" : node1 , "node2" : node2 },
channels = {
"a" : EphemeralValue ( str ),
"b" : EphemeralValue ( str ),
"c" : Topic ( str , accumulate = True ),
},
input_channels = [ "a" ],
output_channels = [ "c" ],
)
app . invoke ({ "a" : "foo" })
{'c': ['foofoo', 'foofoofoofoo']}
此示例演示如何使用 BinaryOperatorAggregate 通道实现归约器。 from langgraph . channels import EphemeralValue , BinaryOperatorAggregate
from langgraph . pregel import Pregel , NodeBuilder
node1 = (
NodeBuilder (). subscribe_only ( "a" )
. do ( lambda x : x + x )
. write_to ( "b" , "c" )
)
node2 = (
NodeBuilder (). subscribe_only ( "b" )
. do ( lambda x : x + x )
. write_to ( "c" )
)
def reducer ( current , update ):
if current :
return current + " | " + update
else :
return update
app = Pregel (
nodes = { "node1" : node1 , "node2" : node2 },
channels = {
"a" : EphemeralValue ( str ),
"b" : EphemeralValue ( str ),
"c" : BinaryOperatorAggregate ( str , operator = reducer ),
},
input_channels = [ "a" ],
output_channels = [ "c" ],
)
app . invoke ({ "a" : "foo" })
此示例演示如何在图中引入循环,让一个链写入它订阅的通道。执行将持续进行,直到向通道写入 None 值。 from langgraph . channels import EphemeralValue
from langgraph . pregel import Pregel , NodeBuilder , ChannelWriteEntry
example_node = (
NodeBuilder (). subscribe_only ( "value" )
. do ( lambda x : x + x if len ( x ) < 10 else None )
. write_to ( ChannelWriteEntry ( "value" , skip_none = True ))
)
app = Pregel (
nodes = { "example_node" : example_node },
channels = {
"value" : EphemeralValue ( str ),
},
input_channels = [ "value" ],
output_channels = [ "value" ],
)
app . invoke ({ "value" : "a" })
{'value': 'aaaaaaaaaaaaaaaa'}
高级 API
LangGraph 提供两种高级 API 来创建 Pregel 应用:StateGraph(Graph API) 和 Functional API 。
StateGraph(Graph API)
Functional API
StateGraph(Graph API) 是一个更高级的抽象,简化了 Pregel 应用的创建。它允许你定义节点和边的图。当你编译图时,StateGraph API 会自动为你创建 Pregel 应用。from typing import TypedDict
from langgraph . constants import START
from langgraph . graph import StateGraph
class Essay ( TypedDict ):
topic : str
content : str | None
score : float | None
def write_essay ( essay : Essay ):
return {
"content" : f "Essay about { essay [ ' topic ' ] } " ,
}
def score_essay ( essay : Essay ):
return {
"score" : 10
}
builder = StateGraph ( Essay )
builder . add_node ( write_essay )
builder . add_node ( score_essay )
builder . add_edge ( START , "write_essay" )
builder . add_edge ( "write_essay" , "score_essay" )
# 编译图。
# 这将返回一个 Pregel 实例。
graph = builder . compile ()
编译后的 Pregel 实例将与节点和通道列表关联。你可以通过打印它们来检查节点和通道。 你将看到类似这样的内容: {'__start__': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1810>,
'write_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba14d0>,
'score_essay': <langgraph.pregel.read.PregelNode at 0x7d05e3ba1710>}
你应该看到类似这样的内容: {'topic': <langgraph.channels.last_value.LastValue at 0x7d05e3294d80>,
'content': <langgraph.channels.last_value.LastValue at 0x7d05e3295040>,
'score': <langgraph.channels.last_value.LastValue at 0x7d05e3295980>,
'__start__': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3297e00>,
'write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32960c0>,
'score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ab80>,
'branch:__start__:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e32941c0>,
'branch:__start__:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d88800>,
'branch:write_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e3295ec0>,
'branch:write_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8ac00>,
'branch:score_essay:__self__:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d89700>,
'branch:score_essay:__self__:score_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b400>,
'start:write_essay': <langgraph.channels.ephemeral_value.EphemeralValue at 0x7d05e2d8b280>}
在 Functional API 中,你可以使用 @entrypoint 创建 Pregel 应用。entrypoint 装饰器允许你定义一个接收输入并返回输出的函数。 from typing import TypedDict
from langgraph . checkpoint . memory import InMemorySaver
from langgraph . func import entrypoint
class Essay ( TypedDict ):
topic : str
content : str | None
score : float | None
checkpointer = InMemorySaver ()
@entrypoint ( checkpointer = checkpointer )
def write_essay ( essay : Essay ):
return {
"content" : f "Essay about { essay [ ' topic ' ] } " ,
}
print ( "节点:" )
print ( write_essay . nodes )
print ( "通道:" )
print ( write_essay . channels )
节点:
{'write_essay': <langgraph.pregel.read.PregelNode object at 0x7d05e2f9aad0>}
通道:
{'__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x7d05e2c906c0>, '__end__': <langgraph.channels.last_value.LastValue object at 0x7d05e2c90c40>, '__previous__': <langgraph.channels.last_value.LastValue object at 0x7d05e1007280>}
将这些文档连接 到 Claude、VSCode 等工具,通过 MCP 获取实时答案。