
Streaming Responses

When you send a message to a session, the response is delivered as a Server-Sent Events (SSE) stream. Events arrive in real time as the agent thinks, uses tools, and generates text.

SSE Event Types

The stream carries three event types:

Event     Description
message   An SDK message from the agent. Contains assistant text, tool use, tool results, or stream deltas.
error     An error occurred during processing.
done      The agent's turn is complete.
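The three event types form a natural discriminated union on the client. A minimal sketch, assuming the payload shapes shown in the frame examples on this page (error events carry an error string, done events carry a sessionId). The StreamEvent type and describeEvent function are hypothetical and are not exported by the SDK:

```typescript
// Hypothetical client-side model of the three SSE event types.
// `error` and `done` payloads follow the frame examples on this page;
// `message` data is left as `unknown` because it is a raw SDK message.
type StreamEvent =
  | { type: 'message'; data: unknown }
  | { type: 'error'; data: { error: string } }
  | { type: 'done'; data: { sessionId: string } };

// Exhaustive handler: the `never` check makes the compiler flag any
// event type added to the union but not handled here.
function describeEvent(event: StreamEvent): string {
  switch (event.type) {
    case 'message':
      return 'message';
    case 'error':
      return `error: ${event.data.error}`;
    case 'done':
      return `done (session ${event.data.sessionId})`;
    default: {
      const unreachable: never = event;
      return unreachable;
    }
  }
}

console.log(describeEvent({ type: 'error', data: { error: 'sandbox crashed' } }));
// prints "error: sandbox crashed"
```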

Each SSE frame has the format:

event: message
data: {"type": "assistant", "message": {"content": [{"type": "text", "text": "Hello!"}]}}

event: done
data: {"sessionId": "a1b2c3d4-..."}
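Frames are separated by a blank line, and each line inside a frame is a "field: value" pair. A minimal sketch of parsing one complete frame, assuming every data payload is JSON as in the examples above. parseSseFrame is a hypothetical helper, not part of the SDK:

```typescript
interface SseFrame {
  event: string;
  data: unknown;
}

// Hypothetical helper: parse one complete SSE frame (text between blank lines).
// One space after the colon is optional and is stripped; multiple `data:`
// lines are joined with "\n" before JSON parsing (assumes JSON payloads).
function parseSseFrame(frame: string): SseFrame | null {
  let event = 'message'; // SSE default when no `event:` line is present
  const dataLines: string[] = [];

  for (const rawLine of frame.split('\n')) {
    const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine;
    const colon = line.indexOf(':');
    // Skip blank lines, comments (lines starting with ':'), and lines without a colon
    if (colon <= 0) continue;

    const field = line.slice(0, colon);
    let value = line.slice(colon + 1);
    if (value.startsWith(' ')) value = value.slice(1);

    if (field === 'event') event = value;
    else if (field === 'data') dataLines.push(value);
  }

  if (dataLines.length === 0) return null; // nothing to dispatch
  return { event, data: JSON.parse(dataLines.join('\n')) };
}
```

Per the SSE format, a frame without an event line is dispatched as message, which matches how the stream above labels assistant output.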

The data field of message events carries raw SDK message objects passed through from the Claude Code SDK. The shape varies by message type (assistant, user, result, stream_event).
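Because the shape depends on the type field, code that handles raw messages usually branches on it first. A minimal sketch that reads the field defensively from untyped JSON. messageKind and SdkMessageKind are hypothetical names, and anything outside the four types listed above is reported as 'other' rather than assumed away:

```typescript
// Hypothetical: the SDK message types named above, plus a catch-all.
type SdkMessageKind = 'assistant' | 'user' | 'result' | 'stream_event' | 'other';

const KNOWN_KINDS = new Set<string>(['assistant', 'user', 'result', 'stream_event']);

// Read `type` defensively, since message data arrives as untyped JSON.
function messageKind(data: unknown): SdkMessageKind {
  if (typeof data === 'object' && data !== null && 'type' in data) {
    const t = (data as { type: unknown }).type;
    if (typeof t === 'string' && KNOWN_KINDS.has(t)) {
      return t as SdkMessageKind;
    }
  }
  return 'other';
}
```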

Basic Streaming

The sendMessageStream method returns an async generator of typed events:

import { AshClient } from '@ash-ai/sdk';
import { extractTextFromEvent, extractDisplayItems } from '@ash-ai/shared';

const client = new AshClient({ serverUrl: 'http://localhost:4100', apiKey: process.env.ASH_API_KEY });
const session = await client.createSession('my-agent');

for await (const event of client.sendMessageStream(session.id, 'Explain TCP in one paragraph.')) {
  switch (event.type) {
    case 'message': {
      const text = extractTextFromEvent(event.data);
      if (text) {
        process.stdout.write(text);
      }
      break;
    }
    case 'error':
      console.error('Error:', event.data.error);
      break;
    case 'done':
      console.log('\nDone.');
      break;
  }
}
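When you need the whole reply as a string (for example, to store it) rather than printing it, the same loop can accumulate text instead. A minimal sketch under stated assumptions: StreamEvent is a hypothetical local type mirroring the event fields used above, and collectText takes the text extractor as a parameter so it can be exercised without a server:

```typescript
// Hypothetical local type mirroring the event fields used in the example above.
type StreamEvent<D> =
  | { type: 'message'; data: D }
  | { type: 'error'; data: { error: string } }
  | { type: 'done'; data: { sessionId: string } };

// Accumulate the text of one turn instead of writing it to stdout.
// `extract` is the text extractor to apply to each message's data.
async function collectText<D>(
  events: AsyncIterable<StreamEvent<D>>,
  extract: (data: D) => string | null,
): Promise<string> {
  let text = '';
  for await (const event of events) {
    if (event.type === 'message') {
      text += extract(event.data) ?? '';
    } else if (event.type === 'error') {
      throw new Error(`Agent error: ${event.data.error}`);
    } else if (event.type === 'done') {
      break; // the turn is complete; stop reading
    }
  }
  return text;
}
```

With the client above, something like collectText(client.sendMessageStream(session.id, 'Explain TCP in one paragraph.'), extractTextFromEvent) should work, provided the SDK's event type lines up with this local union; adjust the type if it does not.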

Display Items

For richer output that includes tool use and tool results, use extractDisplayItems:

for await (const event of client.sendMessageStream(session.id, 'List files in /tmp')) {
  if (event.type === 'message') {
    const items = extractDisplayItems(event.data);
    if (items) {
      for (const item of items) {
        switch (item.type) {
          case 'text':
            console.log(item.content);
            break;
          case 'tool_use':
            console.log(`[Tool: ${item.toolName}] ${item.toolInput}`);
            break;
          case 'tool_result':
            console.log(`[Result] ${item.content}`);
            break;
        }
      }
    }
  }
}
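If tool input arrives as an object rather than a string, interpolating it directly prints [object Object]. A minimal formatting sketch, assuming a hypothetical local DisplayItem type that mirrors only the fields used above (the type exported by @ash-ai/shared may have more or differently typed fields), stringifies non-string input:

```typescript
// Hypothetical local mirror of the DisplayItem fields used above.
type DisplayItem =
  | { type: 'text'; content: string }
  | { type: 'tool_use'; toolName: string; toolInput: unknown }
  | { type: 'tool_result'; content: string };

// Render one item as a single log line.
function formatDisplayItem(item: DisplayItem): string {
  if (item.type === 'text') return item.content;
  if (item.type === 'tool_result') return `[Result] ${item.content}`;
  // Remaining case: tool_use. JSON-stringify objects so they stay readable.
  const input = typeof item.toolInput === 'string' ? item.toolInput : JSON.stringify(item.toolInput);
  return `[Tool: ${item.toolName}] ${input}`;
}
```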

Partial Messages (Real-Time Streaming)

By default, message events contain complete SDK messages. To receive incremental text deltas as the agent types, enable includePartialMessages:

import { extractStreamDelta } from '@ash-ai/shared';

for await (const event of client.sendMessageStream(
  session.id,
  'Write a haiku about servers.',
  { includePartialMessages: true },
)) {
  if (event.type === 'message') {
    const delta = extractStreamDelta(event.data);
    if (delta) {
      process.stdout.write(delta); // Write each text chunk as soon as it arrives
    }
  }
}

The extractStreamDelta helper extracts text from content_block_delta stream events. It returns null for non-delta events, so you can safely call it on every message.
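To make that behavior concrete: in the Anthropic Messages streaming format, text arrives as content_block_delta events whose delta has type text_delta. A minimal sketch of that check, assuming partial messages are SDK stream_event objects that wrap the raw streaming event in an event field. extractDeltaSketch is a hypothetical stand-in for illustration, not the source of extractStreamDelta, which remains the helper to use:

```typescript
// Assumed shape of a partial-message event: an SDK `stream_event` wrapping a
// raw Messages API streaming event. Illustrative only; not the real helper.
interface StreamEventMessage {
  type: 'stream_event';
  event: { type: string; delta?: { type: string; text?: unknown } };
}

function isStreamEventMessage(data: unknown): data is StreamEventMessage {
  if (typeof data !== 'object' || data === null) return false;
  const d = data as { type?: unknown; event?: unknown };
  return d.type === 'stream_event' && typeof d.event === 'object' && d.event !== null;
}

// Return the text of a text delta, or null for every other event.
function extractDeltaSketch(data: unknown): string | null {
  if (!isStreamEventMessage(data)) return null;
  if (data.event.type !== 'content_block_delta') return null;
  const delta = data.event.delta;
  if (!delta || delta.type !== 'text_delta') return null;
  const text = delta.text;
  return typeof text === 'string' ? text : null;
}
```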

Browser (Raw Fetch)

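The browser's built-in EventSource API only issues GET requests and cannot send a request body or a custom Authorization header. Browser applications that do not use the SDK should therefore POST with fetch and parse the SSE stream from the response's ReadableStream: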

const response = await fetch('http://localhost:4100/api/sessions/SESSION_ID/messages', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer YOUR_API_KEY',
  },
  body: JSON.stringify({ content: 'Hello!' }),
});

// Fail fast on HTTP errors (401, 404, ...) instead of trying to read a stream
if (!response.ok || !response.body) {
  throw new Error(`Request failed: ${response.status}`);
}

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
let currentEvent = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  // Append the decoded chunk; keep any incomplete trailing line in the buffer
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n');
  buffer = lines.pop() || '';

  for (const rawLine of lines) {
    const line = rawLine.replace(/\r$/, ''); // tolerate CRLF line endings
    if (line === '') {
      currentEvent = ''; // a blank line ends the current frame
    } else if (line.startsWith('event: ')) {
      currentEvent = line.slice(7).trim();
    } else if (line.startsWith('data: ')) {
      const data = JSON.parse(line.slice(6));
      if (currentEvent === 'message') {
        // Handle message
        console.log(data);
      } else if (currentEvent === 'done') {
        console.log('Stream complete');
      } else if (currentEvent === 'error') {
        console.error(data.error);
      }
    }
  }
}
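The loop above splits on single newlines and tracks the event name by hand. An alternative is to buffer until a complete frame (ending in a blank line) has arrived and then parse the frame as a unit. A minimal sketch of that buffering step. SseFrameSplitter is a hypothetical helper; it handles LF and CRLF line endings but not the bare-CR endings the SSE format also permits:

```typescript
// Hypothetical helper: accumulates decoded text and returns complete SSE frames.
class SseFrameSplitter {
  private buffer = '';

  // Feed one decoded chunk of any size; returns every frame completed so far.
  push(chunk: string): string[] {
    // Normalize CRLF to LF. A trailing '\r' stays buffered until its '\n'
    // arrives in the next chunk, so CRLF pairs split across reads still work.
    this.buffer = (this.buffer + chunk).replace(/\r\n/g, '\n');

    const frames: string[] = [];
    let end = this.buffer.indexOf('\n\n');
    while (end !== -1) {
      const frame = this.buffer.slice(0, end);
      if (frame.trim() !== '') frames.push(frame); // skip empties from extra blank lines
      this.buffer = this.buffer.slice(end + 2);
      end = this.buffer.indexOf('\n\n');
    }
    return frames; // an incomplete frame stays in the buffer
  }
}
```

In the read loop, each decoder.decode(value, { stream: true }) result would be passed to push, and each returned frame parsed for its event and data lines.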

Error Handling

Errors can arrive at two levels: connection errors (network failure, server restart) throw exceptions, and agent errors (sandbox crash, SDK error) arrive as error events within the stream. Handle both:

try {
  for await (const event of client.sendMessageStream(sessionId, 'Hello')) {
    if (event.type === 'message') {
      const text = extractTextFromEvent(event.data);
      if (text) process.stdout.write(text);
    } else if (event.type === 'error') {
      // Agent-level error (sandbox crash, OOM, SDK error)
      console.error('Agent error:', event.data.error);
    } else if (event.type === 'done') {
      console.log('\nDone.');
    }
  }
} catch (err) {
  // Connection-level error (network failure, server restart, 404)
  console.error('Connection error:', (err as Error).message);
}
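If you would rather handle both levels in a single catch block, one option is to convert in-stream error events into exceptions of a distinct class. A minimal sketch; AgentStreamError, throwOnAgentError and the local StreamEvent type are hypothetical names, not SDK exports:

```typescript
// Hypothetical local type mirroring the event fields used above.
type StreamEvent<D = unknown> =
  | { type: 'message'; data: D }
  | { type: 'error'; data: { error: string } }
  | { type: 'done'; data: { sessionId: string } };

// Hypothetical error class marking failures reported by the agent itself.
class AgentStreamError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'AgentStreamError';
    // Keep `instanceof` working even when compiled to older JS targets
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

// Re-yield every event, but turn an in-stream `error` event into a thrown AgentStreamError.
async function* throwOnAgentError<D>(
  events: AsyncIterable<StreamEvent<D>>,
): AsyncGenerator<StreamEvent<D>> {
  for await (const event of events) {
    if (event.type === 'error') {
      throw new AgentStreamError(event.data.error);
    }
    yield event;
  }
}
```

Iterating over throwOnAgentError(client.sendMessageStream(sessionId, 'Hello')) lets one catch block tell the two levels apart: err instanceof AgentStreamError means the agent reported a failure, and anything else came from the connection.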

Reconnection with Retry

When an SSE stream disconnects (server restart, network blip, load balancer timeout), retry with exponential backoff. If the session's sandbox was destroyed, resume it before retrying.

import { AshClient } from '@ash-ai/sdk';
import { extractTextFromEvent } from '@ash-ai/shared';

const client = new AshClient({
  serverUrl: 'http://localhost:4100',
  apiKey: process.env.ASH_API_KEY,
});

function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function streamWithRetry(
  sessionId: string,
  content: string,
  maxRetries = 3,
): Promise<string> {
  let fullText = '';

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    // Each attempt re-sends the message, so discard text from a failed attempt
    fullText = '';

    try {
      for await (const event of client.sendMessageStream(sessionId, content)) {
        if (event.type === 'message') {
          const text = extractTextFromEvent(event.data);
          if (text) {
            fullText += text;
            process.stdout.write(text);
          }
        } else if (event.type === 'error') {
          throw new Error(`Agent error: ${event.data.error}`);
        }
      }
      return fullText; // Stream completed successfully
    } catch (err) {
      console.warn(`Stream attempt ${attempt + 1} failed: ${(err as Error).message}`);

      if (attempt === maxRetries - 1) throw err;

      // Check if the session needs recovery before retrying
      try {
        const session = await client.getSession(sessionId);
        if (session.status === 'paused' || session.status === 'error') {
          await client.resumeSession(sessionId);
          console.log('Session resumed after disconnect');
        }
      } catch {
        // Server might be temporarily unreachable; back off and retry anyway
      }

      // Exponential backoff: 1s, 2s, 4s, ... (with maxRetries = 3, only the 1s and 2s waits occur)
      await sleep(Math.pow(2, attempt) * 1000);
    }
  }

  return fullText;
}

// Usage
const session = await client.createSession('my-agent');
const result = await streamWithRetry(session.id, 'Analyze this code');
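A fixed doubling schedule makes every client that lost its connection in the same server restart retry at the same moments. A common refinement, swapped in here rather than taken from Ash, is exponential backoff with full jitter: each wait is a random value between zero and the exponential ceiling, and the ceiling is capped. A minimal sketch; backoffDelayMs and its defaults (1 s base, 30 s cap) are hypothetical:

```typescript
// Hypothetical helper: exponential backoff with "full jitter".
// Picks a random delay in [0, min(capMs, baseMs * 2^attempt)).
// `random` is injectable so the function can be tested deterministically.
function backoffDelayMs(
  attempt: number,
  baseMs = 1000,
  capMs = 30000,
  random: () => number = Math.random,
): number {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}
```

To use it, replace the sleep(Math.pow(2, attempt) * 1000) call in streamWithRetry with sleep(backoffDelayMs(attempt)).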

Backpressure

Ash handles backpressure automatically on the server side. When your client reads the SSE stream slowly, the server pauses the upstream agent rather than buffering unbounded data in memory.

What this means for your client:

  • You do not need to implement client-side backpressure. Read the stream at whatever pace you can handle. If you process events slowly, the server waits.
  • Memory is bounded. The server buffers at most one SSE frame plus the kernel TCP send buffer (typically 128 KB to 1 MB). There is no application-level buffering.
  • Slow clients get disconnected after 30 seconds. If your client stops reading for more than 30 seconds, the server closes the stream with a timeout error. Reconnect and resume the session to continue.

See SSE Backpressure for the full server-side implementation.
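The same pull-based behavior can be seen in miniature with a JavaScript async generator, which only runs when its consumer asks for the next value. This is an in-process analogy, not Ash's server code: producer, delay and slowConsumer are hypothetical functions, and the log shows production never gets ahead of consumption:

```typescript
// Hypothetical producer: records each value it generates. An async generator
// only resumes when the consumer requests the next value, so it cannot race ahead.
async function* producer(log: string[], count: number): AsyncGenerator<number> {
  for (let i = 0; i < count; i++) {
    log.push(`produce ${i}`);
    yield i;
  }
}

function delay(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Hypothetical slow consumer: waits before handling each value.
async function slowConsumer(log: string[]): Promise<void> {
  for await (const value of producer(log, 3)) {
    await delay(10);
    log.push(`consume ${value}`);
  }
}

const log: string[] = [];
slowConsumer(log).then(() => console.log(log.join(', ')));
// prints "produce 0, consume 0, produce 1, consume 1, produce 2, consume 2"
```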

Helper Functions Reference

The @ash-ai/shared package exports three helper functions for extracting content from stream events:

Function                      Purpose                                                  Returns
extractTextFromEvent(data)    Extract text content from assistant messages             string | null
extractDisplayItems(data)     Extract structured items (text, tool use, tool results)  DisplayItem[] | null
extractStreamDelta(data)      Extract incremental text from partial stream events      string | null

All three accept the data field from a message event and return null for events that do not match their expected type.