流式输出
流式输出让 Agent 在 LLM 边生成边把 token 推给用户,UX 上消除”等几秒空白”
的尴尬。这一页讲 StreamSession 的全部 API、加密语义、与 OpenAI/Anthropic
等流式 LLM 的集成方式、以及超时与中止处理。
StreamSession API
interface StreamSession { delta(text: string): Promise<void>; // 推送一段增量文本 done(finalPayload?: SendPayload): Promise<void>; // 完成;可附完整 payload abort(reason?: string): Promise<void>; // 主动中止 on(event: "ack" | "error", cb: (...) => void): void;}最小用法
const session = agent.createStreamSession(conversationId);// SDK 立即发送一个 stream.start 帧(含幂等 key)
await session.delta("你");await session.delta("好");await session.delta(",");await session.delta("我是");await session.delta(" Agent。");
await session.done({ type: "text", text: "你好,我是 Agent。", // 完整文本,用于服务端持久化});整个过程客户端看到的渲染:
t=0: 收到 stream.start → 显示空气泡 + 闪烁光标t=1: 收到 delta="你" → "你"t=2: 收到 delta="好" → "你好"t=3: 收到 delta="," → "你好,"t=4: 收到 delta="我是" → "你好,我是"t=5: 收到 delta=" Agent。" → "你好,我是 Agent。"t=6: 收到 stream.done → 光标停止,气泡定稿与 OpenAI 集成
import OpenAI from "openai";const openai = new OpenAI();
agent.addMessageHandler(async (msg) => { if (msg.payload?.type !== "text") return;
const session = agent.createStreamSession(msg.conversation_id);
try { const stream = await openai.chat.completions.create({ model: "gpt-4o-mini", messages: [{ role: "user", content: msg.payload.text }], stream: true, });
let full = ""; for await (const chunk of stream) { const delta = chunk.choices[0]?.delta?.content; if (delta) { full += delta; await session.delta(delta); } }
await session.done({ type: "text", text: full }); } catch (err) { await session.abort(`llm error: ${err.message}`); throw err; }});与 Anthropic 集成
import Anthropic from "@anthropic-ai/sdk";const anthropic = new Anthropic();
agent.addMessageHandler(async (msg) => { if (msg.payload?.type !== "text") return; const session = agent.createStreamSession(msg.conversation_id);
try { const stream = anthropic.messages.stream({ model: "claude-sonnet-4-6", max_tokens: 1024, messages: [{ role: "user", content: msg.payload.text }], });
let full = ""; for await (const event of stream) { if (event.type === "content_block_delta" && event.delta.type === "text_delta") { full += event.delta.text; await session.delta(event.delta.text); } }
await session.done({ type: "text", text: full }); } catch (err) { await session.abort(`llm error: ${err.message}`); throw err; }});加密语义
每个 delta 是一个独立加密帧:
- 每帧自己的 CEK(一次性)
- 自己的 Layer 2 wrap
- 自己的 Layer 3 签名
- 客户端逐帧解密 + 累加
为什么不复用同一 CEK?— 一致性安全:每帧独立 → 即使中间某帧密文损坏, 别的帧仍可解密;不会因一个 CEK 失效导致整段流崩溃。
最终 stream.done 帧带完整 payload —— 用于后端持久化(让历史聊天回看时
不需要重放所有 delta,直接显示完整消息)。
节奏控制
delta() 是 async 但不要紧接着 await——会 serialize 卡住整个流:
// ❌ 慢:每个 delta 等 50ms WS RTTfor (const c of fullText) { await session.delta(c);}
// ✅ 快:批量 batchlet buf = "";for (const c of fullText) { buf += c; if (buf.length >= 16) { void session.delta(buf); // 不 await buf = ""; }}if (buf) void session.delta(buf);await session.done({ type: "text", text: fullText });但太快也不好——客户端打字效果会瞬间结束,UX 不佳。SDK 内部对 delta 批量 合并(每 30ms 或 100 字符 flush),所以你可以放心高频 delta。
中止
LLM 错误 / 用户取消 / 业务中断 → 调 abort:
session.abort("user_cancelled");// SDK 发送 stream.abort 帧// 客户端收到后丢弃部分内容,恢复正常输入框中止后不要再调 delta / done(throw StreamAbortedError)。
超时
| 场景 | SDK 行为 |
|---|---|
| 5 分钟无 done | 服务端发 stream.error reason=stream_timeout;session 进入 abort 态 |
| 30 秒无 delta(孤儿检测) | 同上 |
| 客户端断开 / 重连 | session 状态保留,重连后客户端继续接收剩余 delta |
session.on("error", (err) => { console.warn("[stream] aborted by server:", err.reason);});群聊里的流式
V1 在大群(201-500 成员)禁用流式 —— 因为成员端到达顺序不可控, “边输入边渲染”对其他成员是干扰。
const session = agent.createStreamSession(conversationId);// 如果 conversation 是 large group → throw STREAM_DISABLED_LARGE_GROUP业务侧 fallback:用 agent.send 一次性发完整文本。
文本以外的流式
V1 仅支持 type: "text" 流式输出。Artifact / 文件 / 媒体 不支持流式
(artifact 自有 artifact_update 增量机制,文件用大对象 R2 一次性传输)。
stream.error 事件枚举
| reason | 触发 |
|---|---|
stream_timeout | 5 分钟无 done |
orphan_detected | 30 秒无 delta |
client_offline | 客户端长时间断开(V2 路线,V1 暂不主动判定) |
peer_revoked | 流式过程中关系被撤销 |
rate_limited | 触发流式限速 |
性能与限速
| 项 | 上限 |
|---|---|
| 单 stream 最大持续时间 | 5 分钟 |
| 单 stream 最大 delta 数 | 10000 |
| 同一 Agent 并发 stream 数 | 10 |
| delta 频率 | 100 / 秒(超过自动降频) |
演示视频
视频文字版逐节描述
- 00:00 – 00:08 — Hashee app 用户问”详细介绍下端到端加密”。Agent 终端
inbound text=...。 - 00:08 – 00:18 — Agent 调 OpenAI streaming API;终端 console 每 ~50ms
打印一次
delta sent: 16 chars。 - 00:18 – 00:35 — Hashee app 显示 “正在输入…” 紧接着出现回复气泡, 文字像打字机一样逐段浮现(每段约 30ms 间隔)。约 1500 字符的回复在 约 17 秒内全部出来。
- 00:35 – 00:42 — 最后一帧 done,光标消失,气泡定稿。终端
done sent, total 1487 chars。 - 00:42 – 00:55 — 演示中止:用户在 Agent 还在输出时按”取消”按钮(V1
目前没有 UI 按钮,需手动触发)。Agent 终端
received cancel signal, aborting stream。Hashee app 当前部分内容保留显示但灰色置 “已取消”。
下一步
- 发送回复 — 非流式发送
- Artifact 高级 — artifact_update 是另一种”流式” UX
- WebSocket 事件 —
stream.start/stream.delta/stream.done/stream.error帧定义