跳转到内容

流式输出

流式输出让 Agent 在 LLM 边生成边把 token 推给用户,UX 上消除”等几秒空白” 的尴尬。这一页讲 StreamSession 的全部 API、加密语义、与 OpenAI/Anthropic 等流式 LLM 的集成方式、以及超时与中止处理。

StreamSession API

interface StreamSession {
delta(text: string): Promise<void>; // 推送一段增量文本
done(finalPayload?: SendPayload): Promise<void>; // 完成;可附完整 payload
abort(reason?: string): Promise<void>; // 主动中止
on(event: "ack" | "error", cb: (...) => void): void;
}

最小用法

const session = agent.createStreamSession(conversationId);
// SDK 立即发送一个 stream.start 帧(含幂等 key)
await session.delta("");
await session.delta("");
await session.delta("");
await session.delta("我是");
await session.delta(" Agent。");
await session.done({
type: "text",
text: "你好,我是 Agent。", // 完整文本,用于服务端持久化
});

整个过程客户端看到的渲染:

t=0: 收到 stream.start → 显示空气泡 + 闪烁光标
t=1: 收到 delta="你" → "你"
t=2: 收到 delta="好" → "你好"
t=3: 收到 delta="," → "你好,"
t=4: 收到 delta="我是" → "你好,我是"
t=5: 收到 delta=" Agent。" → "你好,我是 Agent。"
t=6: 收到 stream.done → 光标停止,气泡定稿

与 OpenAI 集成

import OpenAI from "openai";
const openai = new OpenAI();
agent.addMessageHandler(async (msg) => {
if (msg.payload?.type !== "text") return;
const session = agent.createStreamSession(msg.conversation_id);
try {
const stream = await openai.chat.completions.create({
model: "gpt-4o-mini",
messages: [{ role: "user", content: msg.payload.text }],
stream: true,
});
let full = "";
for await (const chunk of stream) {
const delta = chunk.choices[0]?.delta?.content;
if (delta) {
full += delta;
await session.delta(delta);
}
}
await session.done({ type: "text", text: full });
} catch (err) {
await session.abort(`llm error: ${err.message}`);
throw err;
}
});

与 Anthropic 集成

import Anthropic from "@anthropic-ai/sdk";
const anthropic = new Anthropic();
agent.addMessageHandler(async (msg) => {
if (msg.payload?.type !== "text") return;
const session = agent.createStreamSession(msg.conversation_id);
try {
const stream = anthropic.messages.stream({
model: "claude-sonnet-4-6",
max_tokens: 1024,
messages: [{ role: "user", content: msg.payload.text }],
});
let full = "";
for await (const event of stream) {
if (event.type === "content_block_delta" && event.delta.type === "text_delta") {
full += event.delta.text;
await session.delta(event.delta.text);
}
}
await session.done({ type: "text", text: full });
} catch (err) {
await session.abort(`llm error: ${err.message}`);
throw err;
}
});

加密语义

每个 delta 是一个独立加密帧

  • 每帧自己的 CEK(一次性)
  • 自己的 Layer 2 wrap
  • 自己的 Layer 3 签名
  • 客户端逐帧解密 + 累加

为什么不复用同一 CEK?— 一致性安全:每帧独立 → 即使中间某帧密文损坏, 别的帧仍可解密;不会因一个 CEK 失效导致整段流崩溃。

最终 stream.done 帧带完整 payload —— 用于后端持久化(让历史聊天回看时 不需要重放所有 delta,直接显示完整消息)。

节奏控制

delta() 是 async 但不要紧接着 await——会 serialize 卡住整个流:

// ❌ 慢:每个 delta 等 50ms WS RTT
for (const c of fullText) {
await session.delta(c);
}
// ✅ 快:批量 batch
let buf = "";
for (const c of fullText) {
buf += c;
if (buf.length >= 16) {
void session.delta(buf); // 不 await
buf = "";
}
}
if (buf) void session.delta(buf);
await session.done({ type: "text", text: fullText });

但太快也不好——客户端打字效果会瞬间结束,UX 不佳。SDK 内部对 delta 批量 合并(每 30ms 或 100 字符 flush),所以你可以放心高频 delta。

中止

LLM 错误 / 用户取消 / 业务中断 → 调 abort

session.abort("user_cancelled");
// SDK 发送 stream.abort 帧
// 客户端收到后丢弃部分内容,恢复正常输入框

中止后不要再调 delta / done(throw StreamAbortedError)。

超时

场景SDK 行为
5 分钟无 done服务端发 stream.error reason=stream_timeout;session 进入 abort 态
30 秒无 delta(孤儿检测)同上
客户端断开 / 重连session 状态保留,重连后客户端继续接收剩余 delta
session.on("error", (err) => {
console.warn("[stream] aborted by server:", err.reason);
});

群聊里的流式

V1 在大群(201-500 成员)禁用流式 —— 因为成员端到达顺序不可控, “边输入边渲染”对其他成员是干扰。

const session = agent.createStreamSession(conversationId);
// 如果 conversation 是 large group → throw STREAM_DISABLED_LARGE_GROUP

业务侧 fallback:用 agent.send 一次性发完整文本。

文本以外的流式

V1 仅支持 type: "text" 流式输出。Artifact / 文件 / 媒体 不支持流式 (artifact 自有 artifact_update 增量机制,文件用大对象 R2 一次性传输)。

stream.error 事件枚举

reason触发
stream_timeout5 分钟无 done
orphan_detected30 秒无 delta
client_offline客户端长时间断开(V2 路线,V1 暂不主动判定)
peer_revoked流式过程中关系被撤销
rate_limited触发流式限速

性能与限速

上限
单 stream 最大持续时间5 分钟
单 stream 最大 delta 数10000
同一 Agent 并发 stream 数10
delta 频率100 / 秒(超过自动降频)

演示视频

视频文字版逐节描述
  • 00:00 – 00:08 — Hashee app 用户问”详细介绍下端到端加密”。Agent 终端 inbound text=...
  • 00:08 – 00:18 — Agent 调 OpenAI streaming API;终端 console 每 ~50ms 打印一次 delta sent: 16 chars
  • 00:18 – 00:35 — Hashee app 显示 “正在输入…” 紧接着出现回复气泡, 文字像打字机一样逐段浮现(每段约 30ms 间隔)。约 1500 字符的回复在 约 17 秒内全部出来。
  • 00:35 – 00:42 — 最后一帧 done,光标消失,气泡定稿。终端 done sent, total 1487 chars
  • 00:42 – 00:55 — 演示中止:用户在 Agent 还在输出时按”取消”按钮(V1 目前没有 UI 按钮,需手动触发)。Agent 终端 received cancel signal, aborting stream。Hashee app 当前部分内容保留显示但灰色置 “已取消”。

下一步