
Real-Time Streaming

Stream agent responses to users in real time with progress indicators and cancellation

Users expect instant feedback. Streaming lets you display tokens as they arrive, show tool call progress, and cancel long-running requests. This guide covers production-ready streaming patterns for CLIs, SSE endpoints, and multi-turn sessions.

Basic streaming

basic-stream.ts
import { AzureResponsesModel } from "stratus-sdk";
import { Agent, stream } from "stratus-sdk/core";

const model = new AzureResponsesModel({
  endpoint: process.env.AZURE_ENDPOINT!,
  apiKey: process.env.AZURE_API_KEY!,
  deployment: "gpt-5.2",
});

const agent = new Agent({ name: "writer", model });

const { stream: s, result } = stream(agent, "Write a haiku about TypeScript"); 

for await (const event of s) {
  if (event.type === "content_delta") {
    process.stdout.write(event.content); 
  }
}

const finalResult = await result;
console.log("\n\nTokens used:", finalResult.usage.totalTokens);

The stream() function returns two things: an AsyncGenerator of events and a Promise that resolves to the final RunResult. You must drain the stream before awaiting the result.

Stream event types

Every streaming API in Stratus yields StreamEvent objects. There are five event types:

Event           | Fields                  | When it fires
----------------|-------------------------|---------------------------------------------------
content_delta   | content: string         | Each chunk of text as the model generates it
tool_call_start | toolCall: { id, name }  | The model begins a tool call
tool_call_delta | toolCallId, arguments   | Incremental JSON fragments for tool call arguments
tool_call_done  | toolCallId              | Tool call arguments are fully received
done            | response: ModelResponse | A single model call has completed
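These shapes can be modeled as a TypeScript discriminated union. The following is a sketch based on the field names in the table above; the SDK's actual exported StreamEvent type may differ in detail:

```typescript
// Sketch of the five event shapes from the table above. Field names
// follow the table; the SDK's exported StreamEvent type may differ.
type StreamEvent =
  | { type: "content_delta"; content: string }
  | { type: "tool_call_start"; toolCall: { id: string; name: string } }
  | { type: "tool_call_delta"; toolCallId: string; arguments: string }
  | { type: "tool_call_done"; toolCallId: string }
  | { type: "done"; response: { finishReason?: string } };

// Narrowing on `type` gives each branch the right fields at compile time.
function describeEvent(event: StreamEvent): string {
  switch (event.type) {
    case "content_delta":
      return `text chunk (${event.content.length} chars)`;
    case "tool_call_start":
      return `tool ${event.toolCall.name} starting`;
    case "tool_call_delta":
      return `argument fragment for ${event.toolCallId}`;
    case "tool_call_done":
      return `tool call ${event.toolCallId} complete`;
    case "done":
      return `model call finished (${event.response.finishReason ?? "unknown"})`;
  }
}
```

Because the union discriminates on `type`, an exhaustive switch like this needs no default branch and the compiler flags any event type you forget to handle.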

Handle all five to build a complete streaming UI:

all-events.ts
for await (const event of s) {
  switch (event.type) {
    case "content_delta":
      process.stdout.write(event.content);
      break;
    case "tool_call_start":
      console.log(`\n[tool] ${event.toolCall.name} started`);
      break;
    case "tool_call_delta":
      // Accumulate arguments if you need to display them
      break;
    case "tool_call_done":
      console.log(`[tool] ${event.toolCallId} done`);
      break;
    case "done":
      console.log(`\n[model] Finish reason: ${event.response.finishReason}`); 
      break;
  }
}

When an agent uses tools, streaming events come from multiple model calls. Stratus handles the full tool loop: you see tool events from the first call, then content events from the final response. Each model call emits its own done event.
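The "accumulate arguments" comment in the tool_call_delta branch above can be fleshed out with a small standalone helper. The event shapes are implied by the table earlier; this class is our own sketch, not a Stratus export:

```typescript
// Accumulates tool_call_delta JSON fragments per tool call and parses
// the completed arguments once tool_call_done arrives.
class ToolArgAccumulator {
  private buffers = new Map<string, string>();

  // Append one incremental fragment for a given tool call.
  addDelta(toolCallId: string, fragment: string): void {
    this.buffers.set(toolCallId, (this.buffers.get(toolCallId) ?? "") + fragment);
  }

  // Parse the accumulated JSON and clear the buffer for that call.
  // Returns undefined if no fragments were received.
  finish(toolCallId: string): unknown {
    const raw = this.buffers.get(toolCallId) ?? "";
    this.buffers.delete(toolCallId);
    return raw ? JSON.parse(raw) : undefined;
  }
}
```

In the switch above, you would call `addDelta(event.toolCallId, event.arguments)` in the tool_call_delta branch and `finish(event.toolCallId)` in tool_call_done to display the full arguments.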

Streaming with tools

When the model calls tools, stream events interleave tool and content events across multiple model rounds. Here is a full example with a weather tool:

tool-stream.ts
import { AzureResponsesModel } from "stratus-sdk";
import { Agent, stream, tool } from "stratus-sdk/core";
import { z } from "zod";

const model = new AzureResponsesModel({
  endpoint: process.env.AZURE_ENDPOINT!,
  apiKey: process.env.AZURE_API_KEY!,
  deployment: "gpt-5.2",
});

const getWeather = tool({
  name: "get_weather",
  description: "Get the current weather for a city",
  parameters: z.object({
    city: z.string().describe("City name"),
  }),
  execute: async (_ctx, { city }) => {
    // Simulate API call
    await new Promise((r) => setTimeout(r, 500));
    return `72°F and sunny in ${city}`;
  },
});

const agent = new Agent({
  name: "weather_assistant",
  model,
  instructions: "You are a weather assistant. Use the get_weather tool to answer questions.",
  tools: [getWeather],
});

const { stream: s, result } = stream(agent, "What's the weather in NYC and London?");

for await (const event of s) {
  switch (event.type) {
    case "content_delta":
      process.stdout.write(event.content);
      break;
    case "tool_call_start":
      process.stdout.write(`\n  Calling ${event.toolCall.name}...`); 
      break;
    case "tool_call_done":
      process.stdout.write(" done\n"); 
      break;
    case "done":
      // Fires once per model call; expect two here: one for the tool calls, one for the final response
      break;
  }
}

const finalResult = await result;
console.log("\n\nFull response:", finalResult.output);

The event sequence for a tool-using agent looks like this:

tool_call_start  → tool_call_delta (x N) → tool_call_done   ← first model call
tool_call_start  → tool_call_delta (x N) → tool_call_done
done                                                          ← first model call ends
                                                               (Stratus executes tools)
content_delta (x N)                                           ← second model call
done                                                          ← second model call ends

Building a CLI streaming interface

This complete example builds a polished CLI that prints live tool-call progress indicators on stderr while streaming the model's answer to stdout:

cli-stream.ts
import { AzureResponsesModel } from "stratus-sdk";
import { Agent, stream, tool } from "stratus-sdk/core";
import { z } from "zod";

const model = new AzureResponsesModel({
  endpoint: process.env.AZURE_ENDPOINT!,
  apiKey: process.env.AZURE_API_KEY!,
  deployment: "gpt-5.2",
});

const searchDocs = tool({
  name: "search_docs",
  description: "Search the documentation for relevant articles",
  parameters: z.object({
    query: z.string().describe("Search query"),
  }),
  execute: async (_ctx, { query }) => {
    await new Promise((r) => setTimeout(r, 1000));
    return JSON.stringify([
      { title: "Getting Started", snippet: "Install with npm..." },
      { title: "API Reference", snippet: "The Agent class..." },
    ]);
  },
});

const agent = new Agent({
  name: "docs_assistant",
  model,
  instructions: "You help users find information in our documentation. Use search_docs to look up answers.",
  tools: [searchDocs],
});

async function cliStream(question: string) {
  const { stream: s, result } = stream(agent, question);

  const activeTools = new Map<string, string>(); 

  for await (const event of s) {
    switch (event.type) {
      case "content_delta":
        process.stdout.write(event.content);
        break;

      case "tool_call_start":
        activeTools.set(event.toolCall.id, event.toolCall.name);
        process.stderr.write(
          `\x1b[90m  ... ${event.toolCall.name}\x1b[0m\n`
        );
        break;

      case "tool_call_done": {
        const name = activeTools.get(event.toolCallId) ?? "tool";
        activeTools.delete(event.toolCallId);
        process.stderr.write(
          `\x1b[32m  ✓ ${name} complete\x1b[0m\n`
        );
        break;
      }
    }
  }

  const finalResult = await result;
  process.stderr.write(
    `\x1b[90m\n[${finalResult.usage.totalTokens} tokens]\x1b[0m\n`
  );
}

cliStream("How do I install the SDK?");

Tool call progress goes to stderr so it does not mix with the streamed content on stdout. This lets you pipe the content output cleanly.

Server-Sent Events (SSE) endpoint

Stream agent responses to a frontend over HTTP using Server-Sent Events. This example uses Hono, but the pattern works with any framework:

sse-hono.ts
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import { AzureResponsesModel } from "stratus-sdk";
import { Agent, stream } from "stratus-sdk/core";

const model = new AzureResponsesModel({
  endpoint: process.env.AZURE_ENDPOINT!,
  apiKey: process.env.AZURE_API_KEY!,
  deployment: "gpt-5.2",
});

const agent = new Agent({
  name: "assistant",
  model,
  instructions: "You are a helpful assistant.",
});

const app = new Hono();

app.post("/chat", async (c) => {
  const { message } = await c.req.json<{ message: string }>();
  const { stream: s } = stream(agent, message);

  return streamSSE(c, async (sse) => { 
    for await (const event of s) {
      switch (event.type) {
        case "content_delta":
          await sse.writeSSE({ 
            event: "content",
            data: JSON.stringify({ text: event.content }),
          });
          break;
        case "tool_call_start":
          await sse.writeSSE({
            event: "tool_start",
            data: JSON.stringify({ name: event.toolCall.name }),
          });
          break;
        case "tool_call_done":
          await sse.writeSSE({
            event: "tool_done",
            data: JSON.stringify({ id: event.toolCallId }),
          });
          break;
        case "done":
          await sse.writeSSE({
            event: "done",
            data: JSON.stringify({
              usage: event.response.usage,
              finishReason: event.response.finishReason,
            }),
          });
          break;
      }
    }
  });
});

export default app;
sse-express.ts
import express from "express";
import { AzureResponsesModel } from "stratus-sdk";
import { Agent, stream } from "stratus-sdk/core";

const model = new AzureResponsesModel({
  endpoint: process.env.AZURE_ENDPOINT!,
  apiKey: process.env.AZURE_API_KEY!,
  deployment: "gpt-5.2",
});

const agent = new Agent({
  name: "assistant",
  model,
  instructions: "You are a helpful assistant.",
});

const app = express();
app.use(express.json());

app.post("/chat", async (req, res) => {
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.flushHeaders(); // send headers now so the client sees the stream open immediately

  const { message } = req.body;
  const { stream: s } = stream(agent, message);

  for await (const event of s) {
    switch (event.type) {
      case "content_delta":
        res.write(`event: content\ndata: ${JSON.stringify({ text: event.content })}\n\n`); 
        break;
      case "tool_call_start":
        res.write(`event: tool_start\ndata: ${JSON.stringify({ name: event.toolCall.name })}\n\n`);
        break;
      case "tool_call_done":
        res.write(`event: tool_done\ndata: ${JSON.stringify({ id: event.toolCallId })}\n\n`);
        break;
      case "done":
        res.write(`event: done\ndata: ${JSON.stringify({ usage: event.response.usage })}\n\n`);
        break;
    }
  }

  res.end();
});

app.listen(3000);

On the frontend, consume the stream with the fetch API and a streaming reader (EventSource only supports GET requests, so it cannot call this POST endpoint):

client.ts
const response = await fetch("/chat", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ message: "Hello!" }),
});

const reader = response.body!.getReader();
const decoder = new TextDecoder();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  const text = decoder.decode(value, { stream: true }); // { stream: true } keeps multi-byte characters split across chunks intact
  // Parse SSE events from the text chunk (assumes whole lines per chunk)
  for (const line of text.split("\n")) {
    if (line.startsWith("data: ")) {
      const data = JSON.parse(line.slice(6));
      // Update your UI with data.text
    }
  }
}
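The line-splitting above assumes each network chunk contains whole SSE lines, but chunks can cut an event anywhere, even mid-line. A small buffered parser handles this correctly. This is plain TypeScript following the SSE wire format, with no Stratus dependency:

```typescript
// Buffers partial SSE frames across network chunks and emits only
// complete events. An SSE event frame ends at a blank line ("\n\n").
class SSEBuffer {
  private buffer = "";

  feed(chunk: string): Array<{ event: string; data: string }> {
    this.buffer += chunk;
    const events: Array<{ event: string; data: string }> = [];
    let idx: number;
    // Extract every complete frame currently in the buffer.
    while ((idx = this.buffer.indexOf("\n\n")) !== -1) {
      const frame = this.buffer.slice(0, idx);
      this.buffer = this.buffer.slice(idx + 2);
      let event = "message"; // SSE default event name
      let data = "";
      for (const line of frame.split("\n")) {
        if (line.startsWith("event: ")) event = line.slice(7);
        else if (line.startsWith("data: ")) data += line.slice(6);
      }
      events.push({ event, data });
    }
    return events;
  }
}
```

In the reader loop, call `sseBuffer.feed(decoder.decode(value, { stream: true }))` and handle each returned event; partial frames simply wait in the buffer for the next chunk.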

Session streaming

Sessions combine multi-turn conversation history with streaming. Each send()/stream() cycle appends to the conversation, so the model sees everything from previous turns.

session-stream.ts
import { AzureResponsesModel } from "stratus-sdk";
import { createSession } from "stratus-sdk/core";

const model = new AzureResponsesModel({
  endpoint: process.env.AZURE_ENDPOINT!,
  apiKey: process.env.AZURE_API_KEY!,
  deployment: "gpt-5.2",
});

const session = createSession({
  model,
  instructions: "You are a concise assistant.",
});

// First turn
session.send("What is the capital of France?"); 
for await (const event of session.stream()) {
  if (event.type === "content_delta") {
    process.stdout.write(event.content);
  }
}
console.log(); // newline

// Second turn - model remembers the first turn
session.send("And what about Germany?"); 
for await (const event of session.stream()) {
  if (event.type === "content_delta") {
    process.stdout.write(event.content);
  }
}

const result = await session.result; 
console.log("\nTokens:", result.usage.totalTokens);
console.log("Agent:", result.lastAgent.name);

session.result gives you the RunResult for the most recent stream() call. You can only access it after fully consuming the stream.

Cancellation with AbortSignal

Pass an AbortSignal to cancel a stream mid-response. When aborted, the stream throws a RunAbortedError.

abort-stream.ts
import { AzureResponsesModel } from "stratus-sdk";
import { Agent, stream, RunAbortedError } from "stratus-sdk/core";

const model = new AzureResponsesModel({
  endpoint: process.env.AZURE_ENDPOINT!,
  apiKey: process.env.AZURE_API_KEY!,
  deployment: "gpt-5.2",
});

const agent = new Agent({ name: "writer", model });

const ac = new AbortController();

// Cancel after 3 seconds
setTimeout(() => ac.abort(), 3000); 

const { stream: s, result } = stream(agent, "Write a 2000-word essay on TypeScript", {
  signal: ac.signal, 
});

try {
  for await (const event of s) {
    if (event.type === "content_delta") {
      process.stdout.write(event.content);
    }
  }
} catch (error) {
  if (error instanceof RunAbortedError) { 
    console.log("\n\nStream cancelled.");
  } else {
    throw error;
  }
}

The signal threads through to the model API call and any tool execute functions, so cancellation is immediate. The result promise also rejects with RunAbortedError when aborted.

For HTTP endpoints, cancel when the client disconnects:

abort-on-disconnect.ts
app.post("/chat", async (req, res) => {
  const ac = new AbortController();
  req.on("close", () => ac.abort()); 

  const { stream: s } = stream(agent, req.body.message, {
    signal: ac.signal,
  });

  // ... stream events to response
});
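You can also combine the disconnect signal with a hard server-side timeout. This sketch uses the standard AbortSignal.any and AbortSignal.timeout (available in Node 20+); it is our own composition, not a Stratus API:

```typescript
// Abort when either the client disconnects or a server-side timeout
// elapses, whichever comes first. Requires Node 20+ for
// AbortSignal.any / AbortSignal.timeout.
function requestSignal(disconnect: AbortSignal, timeoutMs: number): AbortSignal {
  return AbortSignal.any([disconnect, AbortSignal.timeout(timeoutMs)]);
}

// In the handler:
//   const ac = new AbortController();
//   req.on("close", () => ac.abort());
//   const { stream: s } = stream(agent, req.body.message, {
//     signal: requestSignal(ac.signal, 60_000),
//   });
```

Either source aborting aborts the combined signal, so one `signal` option covers both disconnects and runaway generations.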

Accessing the final result

The stream() function returns { stream, result }. The result is a Promise<RunResult> that resolves after you fully consume the stream.

stream-result.ts
import { Agent, stream } from "stratus-sdk/core";

const agent = new Agent({ name: "assistant", model });
const { stream: s, result } = stream(agent, "Summarize quantum computing");

// Step 1: Drain the stream
for await (const event of s) { 
  if (event.type === "content_delta") {
    process.stdout.write(event.content);
  }
}

// Step 2: Await the result
const finalResult = await result; 

console.log("\n");
console.log("Output:", finalResult.output);
console.log("Tokens:", finalResult.usage.totalTokens);
console.log("Agent:", finalResult.lastAgent.name);
console.log("Finish:", finalResult.finishReason);

The RunResult contains:

Property     | Type          | Description
-------------|---------------|------------------------------------------------
output       | string        | Raw text output from the model
finalOutput  | TOutput       | Parsed structured output (if outputType is set)
messages     | ChatMessage[] | Full message history
usage        | UsageInfo     | Accumulated token usage across all model calls
lastAgent    | Agent         | The agent that produced the final response
finishReason | string?       | "stop", "tool_calls", etc.

You must drain the stream before the result promise resolves. If you await result without consuming the stream, your program will hang.
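A small generic helper makes the drain-then-await pattern hard to get wrong. It works with any { stream, result } pair shaped like stream()'s return value; the helper and its onDelta callback are our own names, not SDK exports:

```typescript
// Drains an event stream, forwarding text deltas to a callback, then
// resolves with the final result. Works with any { stream, result }
// pair shaped like the return value of stream().
async function drainStream<R>(
  handle: {
    stream: AsyncIterable<{ type: string; content?: string }>;
    result: Promise<R>;
  },
  onDelta: (text: string) => void
): Promise<R> {
  // Step 1: consume every event (this is what lets result resolve).
  for await (const event of handle.stream) {
    if (event.type === "content_delta" && event.content !== undefined) {
      onDelta(event.content);
    }
  }
  // Step 2: the stream is drained, so awaiting result cannot hang.
  return handle.result;
}
```

Usage would look like `const finalResult = await drainStream(stream(agent, "..."), (t) => process.stdout.write(t));`, collapsing the two steps into one awaited call.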
