Documentation Index Fetch the complete documentation index at: https://nvd-54.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
Pregel 实现了 LangGraph 的运行时,管理 LangGraph 应用的执行。
编译 StateGraph 或创建 entrypoint 会生成一个 Pregel 实例,可以通过输入来调用。
本指南从高层次解释运行时,并提供直接使用 Pregel 实现应用的说明。
注意: Pregel 运行时以 Google 的 Pregel 算法 命名,该算法描述了一种使用图进行大规模并行计算的高效方法。
在 LangGraph 中,Pregel 将 actors 和 channels 组合到单个应用中。Actors 从 channels 读取数据并向 channels 写入数据。Pregel 按照 Pregel 算法 /批量同步并行 模型将应用的执行组织为多个步骤。
每个步骤包含三个阶段:
计划 :确定此步骤中要执行哪些 actors 。例如,在第一步中,选择订阅特殊 input channels 的 actors ;在后续步骤中,选择订阅上一步中更新的 channels 的 actors 。
执行 :并行执行所有选中的 actors ,直到全部完成、某个失败或达到超时。在此阶段,channel 更新对 actors 不可见,直到下一步。
更新 :使用此步骤中 actors 写入的值更新 channels。
重复以上过程,直到没有 actors 被选择执行,或达到最大步骤数。
Actors
actor 是一个 PregelNode。它订阅 channels,从中读取数据,并向其写入数据。可以将其视为 Pregel 算法中的 actor 。PregelNodes 实现了 LangChain 的 Runnable 接口。
Channels
Channels 用于在 actors(PregelNodes)之间通信。每个 channel 有一个值类型、一个更新类型和一个更新函数——该函数接收一系列更新并修改存储的值。Channels 可用于从一个链向另一个链发送数据,或从一个链在未来步骤中向自身发送数据。
LastValue
LastValue 是默认的 channel 类型。它存储写入的最后一个值,覆盖任何先前的值。用于输入和输出值,或在步骤之间传递数据。
import { LastValue } from "@langchain/langgraph/channels" ;
const channel = new LastValue < number > () ;
Topic
Topic 是一个可配置的发布/订阅 channel,适用于在 actors 之间发送多个值或跨步骤累积输出。可以配置为去重值或累积运行期间写入的所有值。
import { Topic } from "@langchain/langgraph/channels" ;
// 累积跨步骤写入的所有值
const channel = new Topic < string > ( { accumulate : true } ) ;
BinaryOperatorAggregate
BinaryOperatorAggregate 存储一个持久值,通过对当前值和每个新更新应用二元运算符来更新。用于计算跨步骤的运行聚合。
import { BinaryOperatorAggregate } from "@langchain/langgraph/channels" ;
// 运行总计:每次写入将值加到当前值上
const total = new BinaryOperatorAggregate < number > ( { operator : ( a , b ) => a + b } ) ;
虽然大多数用户会通过 StateGraph API 或 entrypoint 装饰器与 Pregel 交互,但也可以直接与 Pregel 交互。
以下是几个不同的示例,让您了解 Pregel API。
单节点
多节点
Topic
BinaryOperatorAggregate
循环
import { EphemeralValue } from "@langchain/langgraph/channels" ;
import { Pregel , NodeBuilder } from "@langchain/langgraph/pregel" ;
const node1 = new NodeBuilder ()
. subscribeOnly ( "a" )
. do ( ( x : string ) => x + x)
. writeTo ( "b" ) ;
const app = new Pregel ( {
nodes : { node1 },
channels : {
a : new EphemeralValue < string > () ,
b : new EphemeralValue < string > () ,
},
inputChannels : [ "a" ] ,
outputChannels : [ "b" ] ,
} ) ;
await app . invoke ( { a : "foo" } ) ;
import { LastValue , EphemeralValue } from "@langchain/langgraph/channels" ;
import { Pregel , NodeBuilder } from "@langchain/langgraph/pregel" ;
const node1 = new NodeBuilder ()
. subscribeOnly ( "a" )
. do ( ( x : string ) => x + x)
. writeTo ( "b" ) ;
const node2 = new NodeBuilder ()
. subscribeOnly ( "b" )
. do ( ( x : string ) => x + x)
. writeTo ( "c" ) ;
const app = new Pregel ( {
nodes : { node1 , node2 },
channels : {
a : new EphemeralValue < string > () ,
b : new LastValue < string > () ,
c : new EphemeralValue < string > () ,
},
inputChannels : [ "a" ] ,
outputChannels : [ "b" , "c" ] ,
} ) ;
await app . invoke ( { a : "foo" } ) ;
{ b: 'foofoo', c: 'foofoofoofoo' }
import { EphemeralValue , Topic } from "@langchain/langgraph/channels" ;
import { Pregel , NodeBuilder } from "@langchain/langgraph/pregel" ;
const node1 = new NodeBuilder ()
. subscribeOnly ( "a" )
. do ( ( x : string ) => x + x)
. writeTo ( "b" , "c" ) ;
const node2 = new NodeBuilder ()
. subscribeTo ( "b" )
. do ( ( x : { b : string }) => x . b + x . b)
. writeTo ( "c" ) ;
const app = new Pregel ( {
nodes : { node1 , node2 },
channels : {
a : new EphemeralValue < string > () ,
b : new EphemeralValue < string > () ,
c : new Topic < string > ( { accumulate : true } ) ,
},
inputChannels : [ "a" ] ,
outputChannels : [ "c" ] ,
} ) ;
await app . invoke ( { a : "foo" } ) ;
{ c: ['foofoo', 'foofoofoofoo'] }
此示例演示如何使用 BinaryOperatorAggregate channel 来实现 reducer。 import { EphemeralValue , BinaryOperatorAggregate } from "@langchain/langgraph/channels" ;
import { Pregel , NodeBuilder } from "@langchain/langgraph/pregel" ;
const node1 = new NodeBuilder ()
. subscribeOnly ( "a" )
. do ( ( x : string ) => x + x)
. writeTo ( "b" , "c" ) ;
const node2 = new NodeBuilder ()
. subscribeOnly ( "b" )
. do ( ( x : string ) => x + x)
. writeTo ( "c" ) ;
const reducer = ( current : string , update : string ) => {
if (current) {
return current + " | " + update ;
} else {
return update ;
}
};
const app = new Pregel ( {
nodes : { node1 , node2 },
channels : {
a : new EphemeralValue < string > () ,
b : new EphemeralValue < string > () ,
c : new BinaryOperatorAggregate < string > ( { operator : reducer } ) ,
},
inputChannels : [ "a" ] ,
outputChannels : [ "c" ] ,
} ) ;
await app . invoke ( { a : "foo" } ) ;
此示例演示如何在图中引入循环,方法是让一个链写入它所订阅的 channel。执行将继续直到向 channel 写入 null 值。 import { EphemeralValue } from "@langchain/langgraph/channels" ;
import { Pregel , NodeBuilder , ChannelWriteEntry } from "@langchain/langgraph/pregel" ;
const exampleNode = new NodeBuilder ()
. subscribeOnly ( "value" )
. do ( ( x : string ) => x . length < 10 ? x + x : null )
. writeTo ( new ChannelWriteEntry ( "value" , { skipNone : true } )) ;
const app = new Pregel ( {
nodes : { exampleNode },
channels : {
value : new EphemeralValue < string > () ,
},
inputChannels : [ "value" ] ,
outputChannels : [ "value" ] ,
} ) ;
await app . invoke ( { value : "a" } ) ;
{ value: 'aaaaaaaaaaaaaaaa' }
高级 API
LangGraph 提供了两个高级 API 来创建 Pregel 应用:StateGraph(图 API) 和函数式 API 。
StateGraph(图 API)
函数式 API
StateGraph(图 API) 是一个更高级别的抽象,简化了 Pregel 应用的创建。它允许您定义节点和边的图。当您编译图时,StateGraph API 会自动为您创建 Pregel 应用。import { START , StateGraph } from "@langchain/langgraph" ;
interface Essay {
topic : string ;
content ?: string ;
score ?: number ;
}
const writeEssay = ( essay : Essay ) => {
return {
content : `Essay about ${ essay . topic } ` ,
};
};
const scoreEssay = ( essay : Essay ) => {
return {
score : 10
};
};
const builder = new StateGraph < Essay > ( {
channels : {
topic : null ,
content : null ,
score : null ,
}
} )
. addNode ( "writeEssay" , writeEssay)
. addNode ( "scoreEssay" , scoreEssay)
. addEdge (START , "writeEssay" )
. addEdge ( "writeEssay" , "scoreEssay" ) ;
// 编译图。
// 这将返回一个 Pregel 实例。
const graph = builder . compile () ;
编译后的 Pregel 实例将关联一组节点和 channels。您可以通过打印来检查节点和 channels。 console . log (graph . nodes) ;
您将看到类似以下内容: {
__start__: PregelNode { ... },
writeEssay: PregelNode { ... },
scoreEssay: PregelNode { ... }
}
console . log (graph . channels) ;
您应该看到类似以下内容 {
topic: LastValue { ... },
content: LastValue { ... },
score: LastValue { ... },
__start__: EphemeralValue { ... },
writeEssay: EphemeralValue { ... },
scoreEssay: EphemeralValue { ... },
'branch:__start__:__self__:writeEssay': EphemeralValue { ... },
'branch:__start__:__self__:scoreEssay': EphemeralValue { ... },
'branch:writeEssay:__self__:writeEssay': EphemeralValue { ... },
'branch:writeEssay:__self__:scoreEssay': EphemeralValue { ... },
'branch:scoreEssay:__self__:writeEssay': EphemeralValue { ... },
'branch:scoreEssay:__self__:scoreEssay': EphemeralValue { ... },
'start:writeEssay': EphemeralValue { ... }
}
在函数式 API 中,您可以使用 entrypoint 来创建 Pregel 应用。entrypoint 装饰器允许您定义一个接收输入并返回输出的函数。 import { MemorySaver } from "@langchain/langgraph" ;
import { entrypoint } from "@langchain/langgraph/func" ;
interface Essay {
topic : string ;
content ?: string ;
score ?: number ;
}
const checkpointer = new MemorySaver () ;
const writeEssay = entrypoint (
{ checkpointer , name : "writeEssay" },
async ( essay : Essay ) => {
return {
content : `Essay about ${ essay . topic } ` ,
};
}
) ;
console . log ( "Nodes: " ) ;
console . log (writeEssay . nodes) ;
console . log ( "Channels: " ) ;
console . log (writeEssay . channels) ;
Nodes:
{ writeEssay: PregelNode { ... } }
Channels:
{
__start__: EphemeralValue { ... },
__end__: LastValue { ... },
__previous__: LastValue { ... }
}
将这些文档连接 到 Claude、VSCode 等,通过 MCP 获取实时答案。