Real-Time Streaming
Stream agent responses to users in real-time with progress indicators and cancellation
Users expect instant feedback. Streaming lets you display tokens as they arrive, show tool call progress, and cancel long-running requests. This guide covers production-ready streaming patterns for CLIs, SSE endpoints, and multi-turn sessions.
Basic streaming
```ts
import { AzureResponsesModel } from "stratus-sdk";
import { Agent, stream } from "stratus-sdk/core";

const model = new AzureResponsesModel({
  endpoint: process.env.AZURE_ENDPOINT!,
  apiKey: process.env.AZURE_API_KEY!,
  deployment: "gpt-5.2",
});

const agent = new Agent({ name: "writer", model });

const { stream: s, result } = stream(agent, "Write a haiku about TypeScript");

for await (const event of s) {
  if (event.type === "content_delta") {
    process.stdout.write(event.content);
  }
}

const finalResult = await result;
console.log("\n\nTokens used:", finalResult.usage.totalTokens);
```

The stream() function returns two things: an AsyncGenerator of events and a Promise that resolves to the final RunResult. You must drain the stream before awaiting the result.
Stream event types
Every streaming API in Stratus yields StreamEvent objects. There are five event types:
| Event | Fields | When it fires |
|---|---|---|
| `content_delta` | `content: string` | Each chunk of text as the model generates it |
| `tool_call_start` | `toolCall: { id, name }` | The model begins a tool call |
| `tool_call_delta` | `toolCallId`, `arguments` | Incremental JSON fragments for tool call arguments |
| `tool_call_done` | `toolCallId` | Tool call arguments are fully received |
| `done` | `response: ModelResponse` | A single model call has completed |
Handle all five to build a complete streaming UI:
```ts
for await (const event of s) {
  switch (event.type) {
    case "content_delta":
      process.stdout.write(event.content);
      break;
    case "tool_call_start":
      console.log(`\n[tool] ${event.toolCall.name} started`);
      break;
    case "tool_call_delta":
      // Accumulate arguments if you need to display them
      break;
    case "tool_call_done":
      console.log(`[tool] ${event.toolCallId} done`);
      break;
    case "done":
      console.log(`\n[model] Finish reason: ${event.response.finishReason}`);
      break;
  }
}
```

When an agent uses tools, streaming events come from multiple model calls. Stratus handles the full tool loop: you see tool events from the first call, then content events from the final response. Each model call emits its own done event.
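The `tool_call_delta` case is where incremental argument fragments arrive. A minimal accumulator sketch (plain TypeScript; the event shapes follow the table above, and the helper itself is hypothetical, not part of the SDK) rebuilds the argument objects so you can display them:

```typescript
// Rebuild tool-call arguments from incremental JSON fragments.
// Event shapes follow the StreamEvent table above.
type ToolEvent =
  | { type: "tool_call_start"; toolCall: { id: string; name: string } }
  | { type: "tool_call_delta"; toolCallId: string; arguments: string }
  | { type: "tool_call_done"; toolCallId: string };

function createArgAccumulator() {
  const buffers = new Map<string, string>();
  return {
    // Feed every tool event; returns parsed arguments once the call is done.
    feed(event: ToolEvent): unknown {
      switch (event.type) {
        case "tool_call_start":
          buffers.set(event.toolCall.id, "");
          return undefined;
        case "tool_call_delta":
          buffers.set(
            event.toolCallId,
            (buffers.get(event.toolCallId) ?? "") + event.arguments,
          );
          return undefined;
        case "tool_call_done": {
          // Arguments are complete, valid JSON only once tool_call_done fires.
          const json = buffers.get(event.toolCallId) ?? "";
          buffers.delete(event.toolCallId);
          return JSON.parse(json);
        }
      }
    },
  };
}
```

Because fragments are raw JSON text, avoid parsing until `tool_call_done`; intermediate buffers are usually not valid JSON.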
Streaming with tools
When the model calls tools, stream events interleave tool and content events across multiple model rounds. Here is a full example with a weather tool:
```ts
import { AzureResponsesModel } from "stratus-sdk";
import { Agent, stream, tool } from "stratus-sdk/core";
import { z } from "zod";

const model = new AzureResponsesModel({
  endpoint: process.env.AZURE_ENDPOINT!,
  apiKey: process.env.AZURE_API_KEY!,
  deployment: "gpt-5.2",
});

const getWeather = tool({
  name: "get_weather",
  description: "Get the current weather for a city",
  parameters: z.object({
    city: z.string().describe("City name"),
  }),
  execute: async (_ctx, { city }) => {
    // Simulate an API call
    await new Promise((r) => setTimeout(r, 500));
    return `72°F and sunny in ${city}`;
  },
});

const agent = new Agent({
  name: "weather_assistant",
  model,
  instructions: "You are a weather assistant. Use the get_weather tool to answer questions.",
  tools: [getWeather],
});

const { stream: s, result } = stream(agent, "What's the weather in NYC and London?");

for await (const event of s) {
  switch (event.type) {
    case "content_delta":
      process.stdout.write(event.content);
      break;
    case "tool_call_start":
      process.stdout.write(`\n  Calling ${event.toolCall.name}...`);
      break;
    case "tool_call_done":
      process.stdout.write(" done\n");
      break;
    case "done":
      // Fires once per model call; expect two here: one for the tool calls, one for the final response
      break;
  }
}

const finalResult = await result;
console.log("\n\nFull response:", finalResult.output);
```

The event sequence for a tool-using agent looks like this:
```
tool_call_start → tool_call_delta (x N) → tool_call_done   ← first model call
tool_call_start → tool_call_delta (x N) → tool_call_done
done                                                       ← first model call ends
(Stratus executes tools)
content_delta (x N)                                        ← second model call
done                                                       ← second model call ends
```

Building a CLI streaming interface
This complete example builds a CLI that streams content to stdout and reports tool-call progress on stderr:
```ts
import { AzureResponsesModel } from "stratus-sdk";
import { Agent, stream, tool } from "stratus-sdk/core";
import { z } from "zod";

const model = new AzureResponsesModel({
  endpoint: process.env.AZURE_ENDPOINT!,
  apiKey: process.env.AZURE_API_KEY!,
  deployment: "gpt-5.2",
});

const searchDocs = tool({
  name: "search_docs",
  description: "Search the documentation for relevant articles",
  parameters: z.object({
    query: z.string().describe("Search query"),
  }),
  execute: async (_ctx, { query }) => {
    // Simulate a search backend
    await new Promise((r) => setTimeout(r, 1000));
    return JSON.stringify([
      { title: "Getting Started", snippet: "Install with npm..." },
      { title: "API Reference", snippet: "The Agent class..." },
    ]);
  },
});

const agent = new Agent({
  name: "docs_assistant",
  model,
  instructions: "You help users find information in our documentation. Use search_docs to look up answers.",
  tools: [searchDocs],
});

async function cliStream(question: string) {
  const { stream: s, result } = stream(agent, question);
  const activeTools = new Map<string, string>();

  for await (const event of s) {
    switch (event.type) {
      case "content_delta":
        process.stdout.write(event.content);
        break;
      case "tool_call_start":
        activeTools.set(event.toolCall.id, event.toolCall.name);
        process.stderr.write(`\x1b[90m  ... ${event.toolCall.name}\x1b[0m\n`);
        break;
      case "tool_call_done": {
        const name = activeTools.get(event.toolCallId) ?? "tool";
        activeTools.delete(event.toolCallId);
        process.stderr.write(`\x1b[32m  ✓ ${name} complete\x1b[0m\n`);
        break;
      }
    }
  }

  const finalResult = await result;
  process.stderr.write(`\x1b[90m\n[${finalResult.usage.totalTokens} tokens]\x1b[0m\n`);
}

cliStream("How do I install the SDK?");
```

Tool call progress goes to stderr so it does not mix with the streamed content on stdout. This lets you pipe the content output cleanly.
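If you also want a typewriter-style typed effect, a small re-pacer can sit between the stream and stdout. This sketch is purely cosmetic and SDK-independent; feed it the `event.content` strings from `content_delta` events:

```typescript
// Cosmetic "typewriter" pacing: re-emit streamed chunks one character at a
// time with a small delay between characters.
async function typewriter(
  chunks: AsyncIterable<string>,
  write: (s: string) => void,
  delayMs = 8,
): Promise<void> {
  for await (const chunk of chunks) {
    for (const ch of chunk) {
      write(ch);
      await new Promise((r) => setTimeout(r, delayMs));
    }
  }
}
```

Keep `delayMs` small; if the display falls behind the model, buffered text piles up and the effect stops feeling live.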
Server-Sent Events (SSE) endpoint
Stream agent responses to a frontend over HTTP using Server-Sent Events. This example uses Hono, but the pattern works with any framework:
```ts
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import { AzureResponsesModel } from "stratus-sdk";
import { Agent, stream } from "stratus-sdk/core";

const model = new AzureResponsesModel({
  endpoint: process.env.AZURE_ENDPOINT!,
  apiKey: process.env.AZURE_API_KEY!,
  deployment: "gpt-5.2",
});

const agent = new Agent({
  name: "assistant",
  model,
  instructions: "You are a helpful assistant.",
});

const app = new Hono();

app.post("/chat", async (c) => {
  const { message } = await c.req.json<{ message: string }>();
  const { stream: s } = stream(agent, message);

  return streamSSE(c, async (sse) => {
    for await (const event of s) {
      switch (event.type) {
        case "content_delta":
          await sse.writeSSE({
            event: "content",
            data: JSON.stringify({ text: event.content }),
          });
          break;
        case "tool_call_start":
          await sse.writeSSE({
            event: "tool_start",
            data: JSON.stringify({ name: event.toolCall.name }),
          });
          break;
        case "tool_call_done":
          await sse.writeSSE({
            event: "tool_done",
            data: JSON.stringify({ id: event.toolCallId }),
          });
          break;
        case "done":
          await sse.writeSSE({
            event: "done",
            data: JSON.stringify({
              usage: event.response.usage,
              finishReason: event.response.finishReason,
            }),
          });
          break;
      }
    }
  });
});

export default app;
```

The same pattern with Express:

```ts
import express from "express";
import { AzureResponsesModel } from "stratus-sdk";
import { Agent, stream } from "stratus-sdk/core";

const model = new AzureResponsesModel({
  endpoint: process.env.AZURE_ENDPOINT!,
  apiKey: process.env.AZURE_API_KEY!,
  deployment: "gpt-5.2",
});

const agent = new Agent({
  name: "assistant",
  model,
  instructions: "You are a helpful assistant.",
});

const app = express();
app.use(express.json());

app.post("/chat", async (req, res) => {
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  const { message } = req.body;
  const { stream: s } = stream(agent, message);

  for await (const event of s) {
    switch (event.type) {
      case "content_delta":
        res.write(`event: content\ndata: ${JSON.stringify({ text: event.content })}\n\n`);
        break;
      case "tool_call_start":
        res.write(`event: tool_start\ndata: ${JSON.stringify({ name: event.toolCall.name })}\n\n`);
        break;
      case "tool_call_done":
        res.write(`event: tool_done\ndata: ${JSON.stringify({ id: event.toolCallId })}\n\n`);
        break;
      case "done":
        res.write(`event: done\ndata: ${JSON.stringify({ usage: event.response.usage })}\n\n`);
        break;
    }
  }

  res.end();
});

app.listen(3000);
```

On the frontend, consume the stream with the fetch API (EventSource only supports GET requests, so it cannot call this POST endpoint):
```ts
const response = await fetch("/chat", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ message: "Hello!" }),
});

const reader = response.body!.getReader();
const decoder = new TextDecoder();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  const text = decoder.decode(value, { stream: true });
  // Parse SSE events from the text chunk
  for (const line of text.split("\n")) {
    if (line.startsWith("data: ")) {
      const data = JSON.parse(line.slice(6));
      // Update your UI with data.text
    }
  }
}
```

This naive parser is fine for a demo, but a network chunk can end mid-line; in production, buffer partial lines across reads (or use an SSE client library) so a split event is not dropped.

Session streaming
Sessions combine multi-turn conversation history with streaming. Each send()/stream() cycle appends to the conversation, so the model sees everything from previous turns.
```ts
import { AzureResponsesModel } from "stratus-sdk";
import { createSession } from "stratus-sdk/core";

const model = new AzureResponsesModel({
  endpoint: process.env.AZURE_ENDPOINT!,
  apiKey: process.env.AZURE_API_KEY!,
  deployment: "gpt-5.2",
});

const session = createSession({
  model,
  instructions: "You are a concise assistant.",
});

// First turn
session.send("What is the capital of France?");
for await (const event of session.stream()) {
  if (event.type === "content_delta") {
    process.stdout.write(event.content);
  }
}
console.log(); // newline

// Second turn: the model remembers the first turn
session.send("And what about Germany?");
for await (const event of session.stream()) {
  if (event.type === "content_delta") {
    process.stdout.write(event.content);
  }
}

const result = await session.result;
console.log("\nTokens:", result.usage.totalTokens);
console.log("Agent:", result.lastAgent.name);
```

session.result gives you the RunResult for the most recent stream() call. You can only access it after fully consuming the stream.
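Each turn above repeats the same drain loop. A small helper can tidy that up; this sketch is hypothetical (not part of stratus-sdk) and assumes only the documented `content_delta` event shape:

```typescript
// Hypothetical helper: drain one turn's events, echoing each content_delta
// through a writer and returning the turn's full text.
type TextEvent = { type: string; content?: string };

async function printTurn(
  events: AsyncIterable<TextEvent>,
  write: (s: string) => void = (s) => process.stdout.write(s),
): Promise<string> {
  let text = "";
  for await (const event of events) {
    if (event.type === "content_delta" && event.content !== undefined) {
      text += event.content;
      write(event.content);
    }
  }
  return text;
}

// Each turn then becomes:
// session.send("What is the capital of France?");
// await printTurn(session.stream());
```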
Cancellation with AbortSignal
Pass an AbortSignal to cancel a stream mid-response. When aborted, the stream throws a RunAbortedError.
```ts
import { AzureResponsesModel } from "stratus-sdk";
import { Agent, stream, RunAbortedError } from "stratus-sdk/core";

const model = new AzureResponsesModel({
  endpoint: process.env.AZURE_ENDPOINT!,
  apiKey: process.env.AZURE_API_KEY!,
  deployment: "gpt-5.2",
});

const agent = new Agent({ name: "writer", model });

const ac = new AbortController();
// Cancel after 3 seconds
setTimeout(() => ac.abort(), 3000);

const { stream: s, result } = stream(agent, "Write a 2000-word essay on TypeScript", {
  signal: ac.signal,
});

try {
  for await (const event of s) {
    if (event.type === "content_delta") {
      process.stdout.write(event.content);
    }
  }
} catch (error) {
  if (error instanceof RunAbortedError) {
    console.log("\n\nStream cancelled.");
  } else {
    throw error;
  }
}
```

The signal threads through to the model API call and any tool execute functions, so cancellation is immediate. The result promise also rejects with RunAbortedError when aborted.
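Because cancellation is a standard AbortSignal, you can combine sources with the platform's own helpers. This sketch uses AbortSignal.any and AbortSignal.timeout (Node 20+); the variable names are illustrative:

```typescript
// Combine user-initiated cancellation with a 30-second hard deadline.
// Whichever fires first aborts the signal passed to stream().
const userCancel = new AbortController();
const signal = AbortSignal.any([
  userCancel.signal,
  AbortSignal.timeout(30_000),
]);

// Pass the combined signal exactly as in the example above:
// const { stream: s, result } = stream(agent, prompt, { signal });
```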
For HTTP endpoints, cancel when the client disconnects:
```ts
app.post("/chat", async (req, res) => {
  const ac = new AbortController();
  req.on("close", () => ac.abort());

  const { stream: s } = stream(agent, req.body.message, {
    signal: ac.signal,
  });

  // ... stream events to the response
});
```

Accessing the final result
The stream() function returns { stream, result }. The result is a Promise<RunResult> that resolves after you fully consume the stream.
```ts
import { Agent, stream } from "stratus-sdk/core";

const agent = new Agent({ name: "assistant", model });

const { stream: s, result } = stream(agent, "Summarize quantum computing");

// Step 1: Drain the stream
for await (const event of s) {
  if (event.type === "content_delta") {
    process.stdout.write(event.content);
  }
}

// Step 2: Await the result
const finalResult = await result;
console.log("\n");
console.log("Output:", finalResult.output);
console.log("Tokens:", finalResult.usage.totalTokens);
console.log("Agent:", finalResult.lastAgent.name);
console.log("Finish:", finalResult.finishReason);
```

The RunResult contains:
| Property | Type | Description |
|---|---|---|
| `output` | string | Raw text output from the model |
| `finalOutput` | TOutput | Parsed structured output (if outputType is set) |
| `messages` | ChatMessage[] | Full message history |
| `usage` | UsageInfo | Accumulated token usage across all model calls |
| `lastAgent` | Agent | The agent that produced the final response |
| `finishReason` | string? | "stop", "tool_calls", etc. |
You must drain the stream before the result promise resolves. If you await result without consuming the stream, your program will hang.
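If the drain-then-await ordering feels error-prone, a tiny wrapper can enforce it. This helper is hypothetical (not part of the SDK); it just assumes the documented `{ stream, result }` shape:

```typescript
// Hypothetical convenience: consume every event (invoking an optional
// handler), then hand back the resolved result. Because the stream is fully
// drained first, awaiting the result here can never hang.
async function drainToResult<E, R>(
  events: AsyncIterable<E>,
  result: Promise<R>,
  onEvent?: (event: E) => void,
): Promise<R> {
  for await (const event of events) onEvent?.(event);
  return result;
}

// Usage with the documented API:
// const { stream: s, result } = stream(agent, "Summarize quantum computing");
// const finalResult = await drainToResult(s, result, (e) => { /* render e */ });
```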