Skip to main content

Documentation Index

Fetch the complete documentation index at: https://nvd-54.mintlify.app/llms.txt

Use this file to discover all available pages before exploring further.

消息队列允许用户在不等待智能体处理完当前消息的情况下快速连续发送多条消息。每条消息在服务器端排队并按顺序处理,你可以完全查看和控制待处理队列。
This feature requires the LangGraph Agent Server. Run your agent locally with langgraph dev or deploy it to LangSmith to use this pattern.

为什么需要消息队列?

在典型的聊天界面中,用户必须等待智能体回复完毕后才能发送下一条消息。这在多个场景中造成了不便:
  • 批量提问:用户想一次性提出五个相关问题,而不是逐个等待回答
  • 追问链:在智能体仍在工作时提交澄清或额外上下文
  • 自动化测试序列:以编程方式发送一系列提示词来验证智能体行为
  • 数据输入工作流:逐个提供结构化输入进行处理
消息队列通过立即接受所有提交并按顺序处理来解决此问题。

工作原理

底层,LangGraph 使用 multitaskStrategy: "enqueue" 来管理并发提交。当智能体正在处理时提交消息,新消息会被添加到服务器端队列。当前运行完成后,下一条排队的消息会自动被处理。 useStream hook 暴露了一个 queue 属性,提供对待处理消息的实时可见性:
属性类型描述
queue.entriesQueueEntry[]所有待处理队列条目的数组
queue.sizenumber当前队列中的条目数量
queue.cancel(id)(id: string) => Promise<void>按 ID 取消特定的排队条目
queue.clear()() => Promise<void>取消所有排队条目
每个 QueueEntry 对象包含:
字段类型描述
idstring此队列条目的唯一标识符
valuesobject提交的输入值(包括消息)
optionsobject提交时传递的任何附加选项
createdAtstring条目创建的 ISO 时间戳

设置 useStream

定义一个与你的智能体状态 schema 匹配的 TypeScript 接口,并将其作为类型参数传递给 useStream,以获得类型安全的状态值访问。在以下示例中,将 typeof myAgent 替换为你的接口名称:
import type { BaseMessage } from "@langchain/core/messages";

interface AgentState {
  messages: BaseMessage[];
}
import { useStream } from "@langchain/react";

function Chat() {
  const stream = useStream<typeof myAgent>({
    apiUrl: "http://localhost:2024",
    assistantId: "message_queue",
  });

  const handleSubmit = (text: string) => {
    stream.submit({
      messages: [{ type: "human", content: text }],
    });
  };

  // 访问队列状态
  const pendingCount = stream.queue.size;
  const entries = stream.queue.entries;

  return (
    <div>
      <MessageList messages={stream.messages} />
      {pendingCount > 0 && <QueueList entries={entries} queue={stream.queue} />}
      <ChatInput onSubmit={handleSubmit} />
    </div>
  );
}

显示队列

构建一个 QueueList 组件,显示每条待处理消息和一个取消按钮。这让用户可以看到什么在等待,并能移除不再需要的项目。
function QueueList({ entries, queue }) {
  return (
    <div className="queue-panel">
      <div className="queue-header">
        <span>排队的消息 ({entries.length})</span>
        <button onClick={() => queue.clear()}>全部清除</button>
      </div>
      <ul className="queue-entries">
        {entries.map((entry) => {
          const text = entry.values?.messages?.[0]?.content ?? "未知";
          return (
            <li key={entry.id} className="queue-entry">
              <span className="queue-text">{text}</span>
              <span className="queue-time">
                {new Date(entry.createdAt).toLocaleTimeString()}
              </span>
              <button
                className="queue-cancel"
                onClick={() => queue.cancel(entry.id)}
              >
                取消
              </button>
            </li>
          );
        })}
      </ul>
    </div>
  );
}
显示每条排队消息的前几个字符作为预览,这样用户可以快速识别要取消哪些项目,而无需阅读完整消息。

取消排队消息

你有两个级别的取消操作:

取消单个条目

按 ID 从队列中移除特定消息。智能体会跳过它并处理下一个条目。
await queue.cancel(entryId);

清除整个队列

一次性移除所有待处理消息。适用于用户更改上下文或想重新开始的场景。
await queue.clear();
取消队列条目只影响尚未开始处理的消息。如果智能体已经在处理某条消息,从队列中取消它不会产生效果。使用 stream.stop() 中断当前运行。

使用 onCreated 链接追问提交

onCreated 回调在新运行创建时触发,为你提供了以编程方式提交追问消息的钩子。这对于构建多步工作流很有用,其中下一个问题取决于前一个提交被接受。
stream.submit(
  { messages: [{ type: "human", content: "什么是量子计算?" }] },
  {
    onCreated(run) {
      console.log("运行已创建:", run.run_id);
      // 链接一个追问
      stream.submit({
        messages: [{ type: "human", content: "给我一个简单的类比。" }],
      });
    },
  }
);
这种模式自然地填充队列。第一条消息立即开始处理,追问排在其后面。

开始新线程

当用户想要开始全新的对话时,使用 switchThread(null) 创建新线程。这会清除当前的消息历史和队列。
function NewThreadButton() {
  const stream = useStream<typeof myAgent>({ /* ... */ });

  return (
    <button onClick={() => stream.switchThread(null)}>
      新对话
    </button>
  );
}

完整示例

将所有内容整合在一起,这是一个带有队列管理的完整聊天组件:
function QueueChat() {
  const stream = useStream<typeof myAgent>({
    apiUrl: "http://localhost:2024",
    assistantId: "message_queue",
  });

  const [input, setInput] = useState("");

  const handleSubmit = () => {
    if (!input.trim()) return;
    stream.submit({
      messages: [{ type: "human", content: input.trim() }],
    });
    setInput("");
  };

  return (
    <div className="chat-container">
      <header>
        <h2>队列聊天</h2>
        <button onClick={() => stream.switchThread(null)}>新线程</button>
      </header>

      <div className="messages">
        {stream.messages.map((msg, i) => (
          <MessageBubble key={i} message={msg} />
        ))}
        {stream.isLoading && <TypingIndicator />}
      </div>

      {stream.queue.size > 0 && (
        <div className="queue-panel">
          <strong>排队中 ({stream.queue.size})</strong>
          <button onClick={() => stream.queue.clear()}>全部清除</button>
          {stream.queue.entries.map((entry) => (
            <div key={entry.id} className="queue-item">
              <span>{entry.values?.messages?.[0]?.content}</span>
              <button onClick={() => stream.queue.cancel(entry.id)}>×</button>
            </div>
          ))}
        </div>
      )}

      <form onSubmit={(e) => { e.preventDefault(); handleSubmit(); }}>
        <input
          value={input}
          onChange={(e) => setInput(e.target.value)}
          placeholder="输入消息(可以连续发送多条!)"
        />
        <button type="submit">发送</button>
      </form>
    </div>
  );
}

最佳实践

  • 限制队列大小:虽然客户端没有硬性的队列大小限制,但请注意非常大的队列会降低用户体验。考虑在队列超过合理阈值(如 10 个项目)时显示警告。
  • 显示队列位置:为每个排队项目编号,让用户知道处理顺序。
  • 保持输入焦点:提交后保持输入字段聚焦,使用户可以立即输入下一条消息。
  • 添加过渡动画:在项目从队列面板移入消息列表时进行平滑动画处理。
  • 优雅处理错误:如果排队消息处理失败,显示错误而不阻塞后续队列条目。
  • 对快速提交进行防抖:对于自动化或编程提交,在消息之间添加小延迟以避免服务器过载。