Claude API Streaming: Build Real-Time AI Responses in Python and TypeScript (2026)
Complete tutorial on Claude API streaming with Server-Sent Events. Build real-time AI responses in Python, TypeScript, FastAPI, and Next.js with production-ready code.
Nothing kills an AI-powered app's UX faster than a blank screen with a spinner. Your users stare at nothing for 10–20 seconds, then wonder if something broke — even when the model is generating brilliant output the entire time.
Streaming fixes this by sending each token as it's generated, so the response appears word-by-word, just like ChatGPT and Claude.ai do it natively. This tutorial walks through every level: the protocol, the SDK helpers, a production FastAPI endpoint, and a Next.js streaming route — all using Claude's API.
What Is Streaming and Why It Matters
By default, Claude's API waits until the entire response is complete before returning it. For a 500-word response, that can be 15–25 seconds of silence. Streaming switches to server-sent events (SSE) — a one-way HTTP channel where the server pushes incremental chunks to the client as they're generated.
The user experience difference is dramatic:
| Mode | Perceived wait time (500-word response) | First-word latency |
|---|---|---|
| Default (batch) | 15–25 seconds | 15–25 seconds |
| Streaming | < 1 second | 300–800ms |
Streaming doesn't make Claude faster at generating tokens — it just stops hiding the work while it's happening. For most chat, summarization, and document analysis use cases, this alone makes your app feel 10× more responsive.
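You can check those first-word numbers yourself by timing time-to-first-token (TTFT) against total generation time. A minimal sketch using the Python SDK covered below (the model and prompt are just examples):
```python
# Sketch: measure time-to-first-token (TTFT) vs. total generation time.
import time

import anthropic

client = anthropic.Anthropic()

start = time.perf_counter()
first_token_at = None

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=512,
    messages=[{"role": "user", "content": "Write 300 words about rivers."}],
) as stream:
    for text in stream.text_stream:
        if first_token_at is None:
            first_token_at = time.perf_counter()  # first chunk arrived
        print(text, end="", flush=True)

total = time.perf_counter() - start
print(f"\n\nTTFT: {first_token_at - start:.2f}s | total: {total:.2f}s")
```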
Streaming is also required for:
- Real-time chatbots — users expect to see text appear as it's typed
- Long-form generation — articles, reports, code files where 30-second silences are unacceptable
- Progressive disclosure — show early parts of an answer so users can start reading before generation finishes
- Cost observability — inspect token counts mid-stream before the full response commits
How Claude Streaming Works: The SSE Protocol
Claude's streaming API follows the Server-Sent Events specification. Each event is a data: line followed by a blank line:
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":", world"}}
data: {"type":"message_stop"}The main event types you need to handle:
| Event type | When it fires | Contains |
|---|---|---|
| `message_start` | Once at the start | Full `Message` object with empty content |
| `content_block_start` | Before each content block | Block index, block type (`text` or `tool_use`) |
| `content_block_delta` | Each token or JSON chunk | `text_delta` (text) or `input_json_delta` (tool calls) |
| `content_block_stop` | End of each block | Block index |
| `message_delta` | End of message | Final `stop_reason`, accumulated usage stats |
| `message_stop` | Terminates the stream | Nothing — just signals completion |
The SDKs handle all of this for you — you rarely need to parse raw SSE unless you're building a proxy or a low-level integration.
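If you do end up on the proxy path, raw parsing is only a few lines. A hedged sketch using `httpx` (the library choice is an assumption; any streaming-capable HTTP client works, and the headers and event shapes are the ones shown above):
```python
# Sketch: consume Claude's SSE stream without the SDK, using httpx.
import json
import os

import httpx

with httpx.stream(
    "POST",
    "https://api.anthropic.com/v1/messages",
    headers={
        "x-api-key": os.environ["ANTHROPIC_API_KEY"],
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    },
    json={
        "model": "claude-sonnet-4-6",
        "max_tokens": 256,
        "stream": True,
        "messages": [{"role": "user", "content": "Say hello."}],
    },
    timeout=None,
) as response:
    for line in response.iter_lines():
        if not line.startswith("data: "):
            continue  # skip `event:` lines and blank separators
        event = json.loads(line[len("data: "):])
        if (
            event["type"] == "content_block_delta"
            and event["delta"]["type"] == "text_delta"
        ):
            print(event["delta"]["text"], end="", flush=True)
```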
Python Streaming Tutorial
Installation
```bash
pip install anthropic
```
Basic streaming with the context manager
The cleanest way to stream in Python is with client.messages.stream(), which returns a context manager that exposes .text_stream — a simple iterator over text chunks:
```python
import anthropic

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from environment

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Explain how transformers work in 5 paragraphs."}
    ],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # After the stream is exhausted, get the full accumulated message:
    final_message = stream.get_final_message()

print(f"\n\nInput tokens: {final_message.usage.input_tokens}")
print(f"Output tokens: {final_message.usage.output_tokens}")
```
The `flush=True` is important — without it, Python's output buffering can hold chunks and defeat the purpose of streaming.
Streaming with raw events for advanced control
If you need to react to tool calls, track block boundaries, or handle thinking blocks, iterate over raw events instead:
```python
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=2048,
    messages=[{"role": "user", "content": "Write a Python class for a binary search tree."}],
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
        elif event.type == "message_stop":
            print("\n[Stream complete]")
```
Async streaming for production services
In async contexts (FastAPI, async Django, etc.), use the async client:
```python
import asyncio

import anthropic

async def stream_response(prompt: str):
    client = anthropic.AsyncAnthropic()
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:
            yield text
        # Async generators can't `return` a value, so grab the final
        # message here (e.g. for usage stats) instead of returning it.
        final = await stream.get_final_message()
        print(f"\n[{final.usage.output_tokens} output tokens]")

# Usage
async def main():
    async for chunk in stream_response("Summarize the history of machine learning"):
        print(chunk, end="", flush=True)

asyncio.run(main())
```
TypeScript Streaming Tutorial
Installation
```bash
npm install @anthropic-ai/sdk
```
Basic streaming
```typescript
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic(); // reads ANTHROPIC_API_KEY from environment

async function streamResponse() {
  const stream = await client.messages.create({
    model: "claude-sonnet-4-6",
    max_tokens: 1024,
    stream: true,
    messages: [
      {
        role: "user",
        content: "Write a TypeScript function to debounce any async function.",
      },
    ],
  });

  for await (const event of stream) {
    if (
      event.type === "content_block_delta" &&
      event.delta.type === "text_delta"
    ) {
      process.stdout.write(event.delta.text);
    }
  }
  console.log("\n[Done]");
}

streamResponse().catch(console.error);
```
Using the helper stream class
The SDK also provides a stream() helper with convenience methods:
```typescript
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

async function streamWithHelper() {
  const stream = client.messages.stream({
    model: "claude-sonnet-4-6",
    max_tokens: 1024,
    messages: [{ role: "user", content: "Explain async/await in JavaScript." }],
  });

  // Listen to text events
  stream.on("text", (text) => {
    process.stdout.write(text);
  });

  // Wait for completion and get the full message
  const finalMessage = await stream.finalMessage();
  console.log(`\nTotal tokens: ${finalMessage.usage.output_tokens}`);
}

streamWithHelper().catch(console.error);
```
The `.on("text", ...)` pattern is useful when you want to process text as it arrives without managing an async iterator manually — for example, piping chunks into a WebSocket or a UI state update.
Building a Streaming API Endpoint with FastAPI
This is the pattern you'll use in a real Python backend — expose a /chat endpoint that proxies Claude's stream to your frontend as SSE:
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic
import json

app = FastAPI()
client = anthropic.AsyncAnthropic()

class ChatRequest(BaseModel):
    message: str
    system: str = "You are a helpful AI assistant."

async def claude_stream_generator(message: str, system: str):
    """Generator that yields SSE-formatted events from Claude."""
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        system=system,
        messages=[{"role": "user", "content": message}],
    ) as stream:
        async for text in stream.text_stream:
            # Format as SSE
            data = json.dumps({"text": text})
            yield f"data: {data}\n\n"
    # Signal completion
    yield "data: [DONE]\n\n"

@app.post("/api/chat")
async def chat_endpoint(request: ChatRequest):
    return StreamingResponse(
        claude_stream_generator(request.message, request.system),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
            "Connection": "keep-alive",
        },
    )
```
`X-Accel-Buffering: no` tells nginx (and similar reverse proxies) not to buffer the response. Without it, your SSE chunks may be held and flushed in batches, losing the streaming effect entirely.
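Before wiring up a browser client, it's worth sanity-checking the endpoint directly. A small test sketch using `httpx` (the library choice is an assumption; it presumes the app above is running locally on port 8000, e.g. via `uvicorn`):
```python
# Sketch: consume the FastAPI SSE endpoint defined above from Python.
import json

import httpx

with httpx.stream(
    "POST",
    "http://localhost:8000/api/chat",
    json={"message": "Explain streaming in one paragraph."},
    timeout=None,
) as response:
    for line in response.iter_lines():
        if not line.startswith("data: "):
            continue
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break
        print(json.loads(payload)["text"], end="", flush=True)
```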
Consuming SSE in the browser
```javascript
async function streamChat(message) {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  const outputEl = document.getElementById("output");

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // `stream: true` keeps multi-byte characters intact across chunk boundaries
    const chunk = decoder.decode(value, { stream: true });
    const lines = chunk.split("\n");

    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const data = line.slice(6);
        if (data === "[DONE]") return;
        try {
          const parsed = JSON.parse(data);
          outputEl.textContent += parsed.text;
        } catch {
          // Ignore partial or malformed chunks
        }
      }
    }
  }
}
```
Building a Streaming Route in Next.js (App Router)
Next.js 15's App Router has first-class streaming support via ReadableStream. Here's a production-ready API route:
```typescript
// app/api/chat/route.ts
import Anthropic from "@anthropic-ai/sdk";
import { NextRequest } from "next/server";

const anthropic = new Anthropic();

export async function POST(request: NextRequest) {
  const { message, system } = await request.json();

  const stream = new ReadableStream({
    async start(controller) {
      const encoder = new TextEncoder();
      try {
        const claudeStream = anthropic.messages.stream({
          model: "claude-sonnet-4-6",
          max_tokens: 2048,
          system: system ?? "You are a helpful assistant.",
          messages: [{ role: "user", content: message }],
        });

        claudeStream.on("text", (text) => {
          const data = `data: ${JSON.stringify({ text })}\n\n`;
          controller.enqueue(encoder.encode(data));
        });

        await claudeStream.finalMessage();
        controller.enqueue(encoder.encode("data: [DONE]\n\n"));
        controller.close();
      } catch (error) {
        controller.error(error);
      }
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}
```
React hook for consuming the stream
```typescript
// hooks/useClaudeStream.ts
import { useState, useCallback } from "react";

export function useClaudeStream() {
  const [text, setText] = useState("");
  const [isStreaming, setIsStreaming] = useState(false);

  const sendMessage = useCallback(async (message: string) => {
    setText("");
    setIsStreaming(true);

    const response = await fetch("/api/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ message }),
    });

    const reader = response.body!.getReader();
    const decoder = new TextDecoder();

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      const chunk = decoder.decode(value, { stream: true });
      for (const line of chunk.split("\n")) {
        if (!line.startsWith("data: ")) continue;
        const data = line.slice(6);
        if (data === "[DONE]") { setIsStreaming(false); return; }
        try {
          // `delta` avoids shadowing the outer `chunk` variable
          const { text: delta } = JSON.parse(data);
          setText((prev) => prev + delta);
        } catch { /* ignore partial lines */ }
      }
    }
    setIsStreaming(false);
  }, []);

  return { text, isStreaming, sendMessage };
}
```
Production Considerations
Timeout configuration
This is the most common production pitfall. Default HTTP timeouts (30–60 seconds) will abort Claude mid-response on long generations. Configure your client accordingly:
```python
# Python
client = anthropic.AsyncAnthropic(
    timeout=300.0  # 5 minutes for long documents
)
```
```typescript
// TypeScript
const client = new Anthropic({
  timeout: 300_000, // 5 minutes in milliseconds
});
```
For FastAPI behind nginx, also set:
```nginx
proxy_read_timeout 300s;
proxy_send_timeout 300s;
```
Error handling and reconnection
Streams can fail mid-response due to network issues. The SDK raises `anthropic.APIStatusError` on HTTP errors and `anthropic.APIConnectionError` on network failures. Implement a retry wrapper for production:
```python
import asyncio

from anthropic import APIConnectionError, APIStatusError

async def stream_with_retry(prompt: str, max_retries: int = 3):
    # Caveat: if a stream fails after some text has been yielded, a retry
    # restarts generation from scratch, so dedupe on the consumer side if
    # partial output has already been shown to the user.
    for attempt in range(max_retries):
        try:
            async with client.messages.stream(
                model="claude-sonnet-4-6",
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}],
            ) as stream:
                async for text in stream.text_stream:
                    yield text
            return  # Success — exit retry loop
        except APIConnectionError:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
        except APIStatusError as e:
            if e.status_code == 529 and attempt < max_retries - 1:  # Overloaded
                await asyncio.sleep(5)
            else:
                raise
```
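Consuming the wrapper works like any other async generator. A usage sketch, assuming the `client` defined earlier in this tutorial:
```python
# Usage sketch for stream_with_retry
async def main():
    async for chunk in stream_with_retry("Summarize SSE in two sentences."):
        print(chunk, end="", flush=True)

asyncio.run(main())
```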
Token usage and cost tracking
Streaming includes token usage in the `message_delta` event near the end. Access it after the stream completes:
```python
with client.messages.stream(...) as stream:
    for text in stream.text_stream:
        yield text  # inside your streaming generator
    final = stream.get_final_message()

input_cost = (final.usage.input_tokens / 1_000_000) * 3.00  # Sonnet pricing
output_cost = (final.usage.output_tokens / 1_000_000) * 15.00
print(f"Request cost: ${input_cost + output_cost:.6f}")
```
Which model to use for streaming
| Use case | Recommended model | Reason |
|---|---|---|
| Real-time chat | claude-sonnet-4-6 | Best speed/quality balance |
| Document analysis | claude-sonnet-4-6 | 200K context, fast enough |
| Complex reasoning | claude-opus-4-6 | Best quality, slower streaming |
| High-volume, simple tasks | claude-haiku-4-5 | Fastest tokens-per-second, lowest cost |
Key Takeaways
- Streaming transforms UX — first-word latency drops from 15+ seconds to under 1 second
- Use `.text_stream` for simple cases — the SDK's context manager handles all SSE parsing
- Set `X-Accel-Buffering: no` on your reverse proxy or chunks will be batched
- Configure timeouts to 5+ minutes for production — default 60s kills long responses
- Async is essential for server-side streaming — don't block your event loop with sync calls
- Handle `APIConnectionError` with retry/backoff — network issues mid-stream are real
Start Building AI Apps — and Prove You Know How
Streaming is one of the core patterns covered in the Claude Certified Architect (CCA-F) exam. If you're looking to validate your Claude API skills with a credential that holds weight in 2026, the CCA certification is the most focused path.
AI for Anything offers a free CCA practice question bank and study guides that cover streaming, tool use, multi-agent patterns, and everything else in the exam blueprint.
Start with the Claude API beginner guide if you're new to the API, then come back here once you're comfortable with basic messages. From here, the natural next step is multi-agent orchestration — where streaming becomes even more important as you chain agents together in real time.