深度智能体构建在 LangGraph 的流式基础设施之上,对子智能体流提供一流支持。当深度智能体将工作委托给子智能体时,你可以独立地流式输出每个子智能体的更新——实时跟踪进度、LLM Token 和工具调用。 深度智能体流式输出可以实现:Documentation Index
Fetch the complete documentation index at: https://nvd-54.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
- 流式输出子智能体进度——跟踪每个子智能体在并行运行时的执行。
- 流式输出 LLM Token——从主智能体和每个子智能体流式输出 Token。
- 流式输出工具调用——查看子智能体执行中的工具调用和结果。
- 流式输出自定义更新——从子智能体节点内部发出用户定义的信号。
启用子图流式输出
深度智能体使用 LangGraph 的子图流式输出来呈现子智能体执行中的事件。要接收子智能体事件,请在流式输出时启用stream_subgraphs。
import { createDeepAgent } from "deepagents";
const agent = createDeepAgent({
systemPrompt: "You are a helpful research assistant",
subagents: [
{
name: "researcher",
description: "Researches a topic in depth",
systemPrompt: "You are a thorough researcher.",
},
],
});
for await (const [namespace, chunk] of await agent.stream(
{ messages: [{ role: "user", content: "Research quantum computing advances" }] },
{
streamMode: "updates",
subgraphs: true,
}
)) {
if (namespace.length > 0) {
// 子智能体事件 - namespace 标识来源
console.log(`[subagent: ${namespace.join("|")}]`);
} else {
// 主智能体事件
console.log("[main agent]");
}
console.log(chunk);
}
命名空间
当启用subgraphs 时,每个流式事件包含一个命名空间,标识哪个智能体产生了它。命名空间是代表智能体层级的节点名称和任务 ID 的路径。
| 命名空间 | 来源 |
|---|---|
() (空) | 主智能体 |
("tools:abc123",) | 由主智能体的 task 工具调用 abc123 生成的子智能体 |
("tools:abc123", "model_request:def456") | 子智能体内部的模型请求节点 |
for await (const [namespace, chunk] of await agent.stream(
{ messages: [{ role: "user", content: "Plan my vacation" }] },
{ streamMode: "updates", subgraphs: true }
)) {
// 检查此事件是否来自子智能体
const isSubagent = namespace.some(
(segment: string) => segment.startsWith("tools:")
);
if (isSubagent) {
// 从命名空间中提取工具调用 ID
const toolCallId = namespace
.find((s: string) => s.startsWith("tools:"))
?.split(":")[1];
console.log(`Subagent ${toolCallId}:`, chunk);
} else {
console.log("Main agent:", chunk);
}
}
子智能体进度
使用stream_mode="updates" 在每个步骤完成时跟踪子智能体进度。这对于显示哪些子智能体处于活动状态以及它们完成了什么工作很有用。
import { createDeepAgent } from "deepagents";
const agent = createDeepAgent({
systemPrompt:
"You are a project coordinator. Always delegate research tasks " +
"to your researcher subagent using the task tool. Keep your final response to one sentence.",
subagents: [
{
name: "researcher",
description: "Researches topics thoroughly",
systemPrompt:
"You are a thorough researcher. Research the given topic " +
"and provide a concise summary in 2-3 sentences.",
},
],
});
for await (const [namespace, chunk] of await agent.stream(
{
messages: [
{ role: "user", content: "Write a short summary about AI safety" },
],
},
{ streamMode: "updates", subgraphs: true },
)) {
// 主智能体更新(空命名空间)
if (namespace.length === 0) {
for (const [nodeName, data] of Object.entries(chunk)) {
if (nodeName === "tools") {
// 子智能体结果返回给主智能体
for (const msg of (data as any).messages ?? []) {
if (msg.type === "tool") {
console.log(`\n子智能体完成: ${msg.name}`);
console.log(` 结果: ${String(msg.content).slice(0, 200)}...`);
}
}
} else {
console.log(`[主智能体] 步骤: ${nodeName}`);
}
}
}
// 子智能体更新(非空命名空间)
else {
for (const [nodeName] of Object.entries(chunk)) {
console.log(` [${namespace[0]}] 步骤: ${nodeName}`);
}
}
}
输出
Main agent step: model_request
[tools:call_abc123] step: model_request
[tools:call_abc123] step: tools
[tools:call_abc123] step: model_request
Subagent complete: task
Result: ## AI Safety Report...
Main agent step: model_request
[tools:call_def456] step: model_request
[tools:call_def456] step: model_request
Subagent complete: task
Result: # Comprehensive Report on AI Safety...
Main agent step: model_request
LLM Token
使用stream_mode="messages" 从主智能体和子智能体流式输出单个 Token。每个消息事件包含标识源智能体的元数据。
let currentSource = "";
for await (const [namespace, chunk] of await agent.stream(
{
messages: [
{
role: "user",
content: "Research quantum computing advances",
},
],
},
{ streamMode: "messages", subgraphs: true },
)) {
const [message] = chunk;
// 检查此事件是否来自子智能体(命名空间包含 "tools:")
const isSubagent = namespace.some((s: string) => s.startsWith("tools:"));
if (isSubagent) {
// 来自子智能体的 Token
const subagentNs = namespace.find((s: string) => s.startsWith("tools:"))!;
if (subagentNs !== currentSource) {
process.stdout.write(`\n\n--- [subagent: ${subagentNs}] ---\n`);
currentSource = subagentNs;
}
if (message.text) {
process.stdout.write(message.text);
}
} else {
// 来自主智能体的 Token
if ("main" !== currentSource) {
process.stdout.write(`\n\n--- [main agent] ---\n`);
currentSource = "main";
}
if (message.text) {
process.stdout.write(message.text);
}
}
}
process.stdout.write("\n");
工具调用
当子智能体使用工具时,你可以流式输出工具调用事件来显示每个子智能体在做什么。工具调用块出现在messages 流模式中。
import { AIMessageChunk, ToolMessage } from "langchain";
for await (const [namespace, chunk] of await agent.stream(
{
messages: [
{
role: "user",
content: "Research recent quantum computing advances",
},
],
},
{ streamMode: "messages", subgraphs: true },
)) {
const [message] = chunk;
// 标识来源:"main" 或子智能体命名空间段
const isSubagent = namespace.some((s: string) => s.startsWith("tools:"));
const source = isSubagent
? namespace.find((s: string) => s.startsWith("tools:"))!
: "main";
// 工具调用块(流式工具调用)
if (AIMessageChunk.isInstance(message) && message.tool_call_chunks?.length) {
for (const tc of message.tool_call_chunks) {
if (tc.name) {
console.log(`\n[${source}] Tool call: ${tc.name}`);
}
// 参数以块的形式流入 - 增量写入
if (tc.args) {
process.stdout.write(tc.args);
}
}
}
// 工具结果
if (ToolMessage.isInstance(message)) {
console.log(
`\n[${source}] Tool result [${message.name}]: ${message.text?.slice(0, 150)}`,
);
}
// 常规 AI 内容(跳过工具调用消息)
if (
AIMessageChunk.isInstance(message) &&
message.text &&
!message.tool_call_chunks?.length
) {
process.stdout.write(message.text);
}
}
process.stdout.write("\n");
自定义更新
在你的子智能体工具中使用config.writer 来发出自定义进度事件:
import { createDeepAgent } from "deepagents";
import { tool, type ToolRuntime } from "langchain";
import { z } from "zod";
/**
* 一个通过 config.writer 发出自定义进度事件的工具。
* writer 将数据发送到 "custom" 流模式。
*/
const analyzeData = tool(
async ({ topic }: { topic: string }, config: ToolRuntime) => {
const writer = config.writer;
writer?.({ status: "starting", topic, progress: 0 });
await new Promise((r) => setTimeout(r, 500));
writer?.({ status: "analyzing", progress: 50 });
await new Promise((r) => setTimeout(r, 500));
writer?.({ status: "complete", progress: 100 });
return `Analysis of "${topic}": Customer sentiment is 85% positive, driven by product quality and support response times.`;
},
{
name: "analyze_data",
description:
"Run a data analysis on a given topic. " +
"This tool performs the actual analysis and emits progress updates. " +
"You MUST call this tool for any analysis request.",
schema: z.object({
topic: z.string().describe("The topic or subject to analyze"),
}),
},
);
const agent = createDeepAgent({
systemPrompt:
"You are a coordinator. For any analysis request, you MUST delegate " +
"to the analyst subagent using the task tool. Never try to answer directly. " +
"After receiving the result, summarize it in one sentence.",
subagents: [
{
name: "analyst",
description: "Performs data analysis with real-time progress tracking",
systemPrompt:
"You are a data analyst. You MUST call the analyze_data tool " +
"for every analysis request. Do not use any other tools. " +
"After the analysis completes, report the result.",
tools: [analyzeData],
},
],
});
for await (const [namespace, chunk] of await agent.stream(
{
messages: [
{
role: "user",
content: "Analyze customer satisfaction trends",
},
],
},
{ streamMode: "custom", subgraphs: true },
)) {
const isSubagent = namespace.some((s: string) => s.startsWith("tools:"));
if (isSubagent) {
const subagentNs = namespace.find((s: string) => s.startsWith("tools:"))!;
console.log(`[${subagentNs}]`, chunk);
} else {
console.log("[main]", chunk);
}
}
输出
[tools:call_abc123] { status: 'fetching', progress: 0 }
[tools:call_abc123] { status: 'analyzing', progress: 50 }
[tools:call_abc123] { status: 'complete', progress: 100 }
流式输出多种模式
结合多种流模式以获取智能体执行的完整视图:// 跳过内部中间件步骤 - 仅显示有意义的节点名称
const INTERESTING_NODES = new Set(["model_request", "tools"]);
let lastSource = "";
let midLine = false; // 当我们写了没有尾部换行的 Token 时为 true
for await (const [namespace, mode, data] of await agent.stream(
{
messages: [
{
role: "user",
content: "Analyze the impact of remote work on team productivity",
},
],
},
{ streamMode: ["updates", "messages", "custom"], subgraphs: true },
)) {
const isSubagent = namespace.some((s: string) => s.startsWith("tools:"));
const source = isSubagent ? "subagent" : "main";
if (mode === "updates") {
for (const nodeName of Object.keys(data)) {
if (!INTERESTING_NODES.has(nodeName)) continue;
if (midLine) {
process.stdout.write("\n");
midLine = false;
}
console.log(`[${source}] step: ${nodeName}`);
}
} else if (mode === "messages") {
const [message] = data;
if (message.text) {
// 当来源变化时打印标题
if (source !== lastSource) {
if (midLine) {
process.stdout.write("\n");
midLine = false;
}
process.stdout.write(`\n[${source}] `);
lastSource = source;
}
process.stdout.write(message.text);
midLine = true;
}
} else if (mode === "custom") {
if (midLine) {
process.stdout.write("\n");
midLine = false;
}
console.log(`[${source}] custom event:`, data);
}
}
process.stdout.write("\n");
常见模式
跟踪子智能体生命周期
监控子智能体何时启动、运行和完成:for await (const [namespace, chunk] of await agent.stream(
{
messages: [
{ role: "user", content: "Research the latest AI safety developments" },
],
},
{ streamMode: "updates", subgraphs: true },
)) {
for (const [nodeName, data] of Object.entries(chunk)) {
// --- 阶段 1:检测子智能体启动 ---
// 当主智能体的 model_request 包含 task 工具调用时,
// 说明已生成子智能体。
if (namespace.length === 0 && nodeName === "model_request") {
for (const msg of (data as any).messages ?? []) {
for (const tc of msg.tool_calls ?? []) {
if (tc.name === "task") {
activeSubagents.set(tc.id, {
type: tc.args?.subagent_type,
description: tc.args?.description?.slice(0, 80),
status: "pending",
});
console.log(
`[lifecycle] PENDING → subagent "${tc.args?.subagent_type}" (${tc.id})`,
);
}
}
}
}
// --- 阶段 2:检测子智能体运行 ---
// 当我们从 tools:UUID 命名空间接收事件时,
// 该子智能体正在活跃执行。
if (namespace.length > 0 && namespace[0].startsWith("tools:")) {
const pregelId = namespace[0].split(":")[1];
for (const [id, sub] of activeSubagents) {
if (sub.status === "pending") {
sub.status = "running";
console.log(
`[lifecycle] RUNNING → subagent "${sub.type}" (pregel: ${pregelId})`,
);
break;
}
}
}
// --- 阶段 3:检测子智能体完成 ---
// 当主智能体的 tools 节点返回工具消息时,
// 子智能体已完成并返回其结果。
if (namespace.length === 0 && nodeName === "tools") {
for (const msg of (data as any).messages ?? []) {
if (msg.type === "tool") {
const subagent = activeSubagents.get(msg.tool_call_id);
if (subagent) {
subagent.status = "complete";
console.log(
`[lifecycle] COMPLETE → subagent "${subagent.type}" (${msg.tool_call_id})`,
);
console.log(
` Result preview: ${String(msg.content).slice(0, 120)}...`,
);
}
}
}
}
}
}
// 打印最终状态
console.log("\n--- Final subagent states ---");
for (const [id, sub] of activeSubagents) {
console.log(` ${sub.type}: ${sub.status}`);
}
相关资源
- 子智能体——配置和使用深度智能体中的子智能体
- 前端流式输出——使用
useStream为深度智能体构建 React UI - LangChain 流式输出概述——LangChain 智能体的通用流式输出概念
连接这些文档到 Claude、VSCode 等工具,通过 MCP 获取实时解答。

