
Claude API Streaming: Build Real-Time AI Responses in Python and TypeScript (2026)

Complete tutorial on Claude API streaming with Server-Sent Events. Build real-time AI responses in Python, TypeScript, FastAPI, and Next.js with production-ready code.


Nothing kills an AI-powered app's UX faster than a blank screen with a spinner. Your users stare at nothing for 10–20 seconds, then wonder if something broke — even when the model is generating brilliant output the entire time.

Streaming fixes this by sending each token as it's generated, so the response appears word by word, just as ChatGPT and Claude.ai render it natively. This tutorial walks through every layer: the protocol, the SDK helpers, a production FastAPI endpoint, and a Next.js streaming route — all using Claude's API.

What Is Streaming and Why It Matters

By default, Claude's API waits until the entire response is complete before returning it. For a 500-word response, that can be 15–25 seconds of silence. Streaming switches to Server-Sent Events (SSE) — a one-way HTTP channel where the server pushes incremental chunks to the client as they're generated.

The user experience difference is dramatic:

| Mode | Perceived wait time (500-word response) | First-word latency |
| --- | --- | --- |
| Default (batch) | 15–25 seconds | 15–25 seconds |
| Streaming | < 1 second | 300–800 ms |

Streaming doesn't make Claude faster at generating tokens — it just stops hiding the work while it's happening. For most chat, summarization, and document analysis use cases, this alone makes your app feel 10× more responsive.

Streaming is also required for:

  • Real-time chatbots — users expect to see text appear as it's typed
  • Long-form generation — articles, reports, code files where 30-second silences are unacceptable
  • Progressive disclosure — show early parts of an answer so users can start reading before generation finishes
  • Cost observability — usage stats arrive in stream events, letting you track token counts before the full response is assembled

How Claude Streaming Works: The SSE Protocol

Claude's streaming API follows the Server-Sent Events specification. Each event is an event: line naming the event type, a data: line carrying a JSON payload, and a blank line as a separator:

```
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":", world"}}

event: message_stop
data: {"type":"message_stop"}
```

The main event types you need to handle:

| Event type | When it fires | Contains |
| --- | --- | --- |
| message_start | Once at the start | Full Message object with empty content |
| content_block_start | Before each content block | Block index, block type (text or tool_use) |
| content_block_delta | Each token or JSON chunk | text_delta (text) or input_json_delta (tool calls) |
| content_block_stop | End of each block | Block index |
| message_delta | End of message | Final stop_reason, accumulated usage stats |
| message_stop | Terminates the stream | Nothing — just signals completion |

The SDKs handle all of this for you — you rarely need to parse raw SSE unless you're building a proxy or a low-level integration.
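
If you are building one of those low-level integrations, the raw exchange is easy to see from a script. Here's a minimal sketch that requests a stream and parses the SSE lines by hand; it assumes the third-party requests library and an ANTHROPIC_API_KEY environment variable, and is illustrative rather than production code:

```python
import json
import os

import requests

# Ask the Messages API for a streamed response by setting "stream": true
response = requests.post(
    "https://api.anthropic.com/v1/messages",
    headers={
        "x-api-key": os.environ["ANTHROPIC_API_KEY"],
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    },
    json={
        "model": "claude-sonnet-4-6",
        "max_tokens": 256,
        "stream": True,
        "messages": [{"role": "user", "content": "Say hello."}],
    },
    stream=True,  # iterate the body as it arrives instead of buffering it
)

for line in response.iter_lines(decode_unicode=True):
    # Skip blank separators and the event: lines; the data: payload
    # repeats the event type anyway
    if not line or not line.startswith("data: "):
        continue
    event = json.loads(line[len("data: "):])
    if event["type"] == "content_block_delta" and event["delta"]["type"] == "text_delta":
        print(event["delta"]["text"], end="", flush=True)
```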

Python Streaming Tutorial

Installation

```bash
pip install anthropic
```

Basic streaming with the context manager

The cleanest way to stream in Python is with client.messages.stream(), which returns a context manager that exposes .text_stream — a simple iterator over text chunks:

```python
import anthropic

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from environment

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Explain how transformers work in 5 paragraphs."}
    ],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # Grab the full final message before the context manager closes
    final_message = stream.get_final_message()

print(f"\n\nInput tokens: {final_message.usage.input_tokens}")
print(f"Output tokens: {final_message.usage.output_tokens}")
```

The flush=True is important — without it, Python's output buffering can hold chunks and defeat the purpose of streaming.

Streaming with raw events for advanced control

If you need to react to tool calls, track block boundaries, or handle thinking blocks, iterate over raw events instead:

```python
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=2048,
    messages=[{"role": "user", "content": "Write a Python class for a binary search tree."}],
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
        elif event.type == "message_stop":
            print("\n[Stream complete]")
```

Async streaming for production services

In async contexts (FastAPI, async Django, etc.), use the async client:

```python
import asyncio
import anthropic

async def stream_response(prompt: str):
    client = anthropic.AsyncAnthropic()

    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:
            yield text

        # An async generator can't return a value, so read usage here
        final = await stream.get_final_message()
        print(f"\n\nOutput tokens: {final.usage.output_tokens}", flush=True)

# Usage
async def main():
    async for chunk in stream_response("Summarize the history of machine learning"):
        print(chunk, end="", flush=True)

asyncio.run(main())
```

TypeScript Streaming Tutorial

Installation

```bash
npm install @anthropic-ai/sdk
```

Basic streaming

```typescript
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic(); // reads ANTHROPIC_API_KEY from environment

async function streamResponse() {
  const stream = await client.messages.create({
    model: "claude-sonnet-4-6",
    max_tokens: 1024,
    stream: true,
    messages: [
      {
        role: "user",
        content: "Write a TypeScript function to debounce any async function.",
      },
    ],
  });

  for await (const event of stream) {
    if (
      event.type === "content_block_delta" &&
      event.delta.type === "text_delta"
    ) {
      process.stdout.write(event.delta.text);
    }
  }

  console.log("\n[Done]");
}

streamResponse().catch(console.error);
```

Using the helper stream class

The SDK also provides a stream() helper with convenience methods:

```typescript
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

async function streamWithHelper() {
  const stream = client.messages.stream({
    model: "claude-sonnet-4-6",
    max_tokens: 1024,
    messages: [{ role: "user", content: "Explain async/await in JavaScript." }],
  });

  // Listen to text events
  stream.on("text", (text) => {
    process.stdout.write(text);
  });

  // Wait for completion and get the full message
  const finalMessage = await stream.finalMessage();
  console.log(`\nTotal output tokens: ${finalMessage.usage.output_tokens}`);
}

streamWithHelper().catch(console.error);
```

The .on("text", ...) pattern is useful when you want to process text as it arrives without managing an async iterator manually — for example, piping chunks into a WebSocket or a UI state update.

Building a Streaming API Endpoint with FastAPI

This is the pattern you'll use in a real Python backend — expose a /chat endpoint that proxies Claude's stream to your frontend as SSE:

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic
import json

app = FastAPI()
client = anthropic.AsyncAnthropic()

class ChatRequest(BaseModel):
    message: str
    system: str = "You are a helpful AI assistant."

async def claude_stream_generator(message: str, system: str):
    """Generator that yields SSE-formatted events from Claude."""
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        system=system,
        messages=[{"role": "user", "content": message}],
    ) as stream:
        async for text in stream.text_stream:
            # Format as SSE
            data = json.dumps({"text": text})
            yield f"data: {data}\n\n"

        # Signal completion
        yield "data: [DONE]\n\n"

@app.post("/api/chat")
async def chat_endpoint(request: ChatRequest):
    return StreamingResponse(
        claude_stream_generator(request.message, request.system),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
            "Connection": "keep-alive",
        },
    )
```

Critical header: X-Accel-Buffering: no tells nginx (and similar reverse proxies) not to buffer the response. Without it, your SSE chunks may be held and flushed in batches, losing the streaming effect entirely.
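
Before wiring up a frontend, you can smoke-test the endpoint from a script. A sketch using the httpx library, assuming the app is served locally on port 8000 (for example via uvicorn main:app):

```python
import httpx

# Stream the SSE response from the local FastAPI endpoint
with httpx.stream(
    "POST",
    "http://localhost:8000/api/chat",
    json={"message": "Hello!"},
    timeout=None,  # don't let the client cut off a long generation
) as response:
    for line in response.iter_lines():
        if line.startswith("data: ") and line != "data: [DONE]":
            print(line[len("data: "):])
```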

Consuming SSE in the browser

```javascript
async function streamChat(message) {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  const outputEl = document.getElementById("output");
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // stream: true handles multi-byte characters split across chunks
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop(); // keep any partial line for the next chunk

    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const data = line.slice(6);
        if (data === "[DONE]") return;

        try {
          const parsed = JSON.parse(data);
          outputEl.textContent += parsed.text;
        } catch {
          // Ignore malformed chunks
        }
      }
    }
  }
}
```

Building a Streaming Route in Next.js (App Router)

Next.js 15's App Router has first-class streaming support via ReadableStream. Here's a production-ready API route:

```typescript
// app/api/chat/route.ts
import Anthropic from "@anthropic-ai/sdk";
import { NextRequest } from "next/server";

const anthropic = new Anthropic();

export async function POST(request: NextRequest) {
  const { message, system } = await request.json();

  const stream = new ReadableStream({
    async start(controller) {
      const encoder = new TextEncoder();

      try {
        const claudeStream = anthropic.messages.stream({
          model: "claude-sonnet-4-6",
          max_tokens: 2048,
          system: system ?? "You are a helpful assistant.",
          messages: [{ role: "user", content: message }],
        });

        claudeStream.on("text", (text) => {
          const data = `data: ${JSON.stringify({ text })}\n\n`;
          controller.enqueue(encoder.encode(data));
        });

        await claudeStream.finalMessage();
        controller.enqueue(encoder.encode("data: [DONE]\n\n"));
        controller.close();
      } catch (error) {
        controller.error(error);
      }
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}
```

React hook for consuming the stream

```typescript
// hooks/useClaudeStream.ts
import { useState, useCallback } from "react";

export function useClaudeStream() {
  const [text, setText] = useState("");
  const [isStreaming, setIsStreaming] = useState(false);

  const sendMessage = useCallback(async (message: string) => {
    setText("");
    setIsStreaming(true);

    const response = await fetch("/api/chat", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ message }),
    });

    const reader = response.body!.getReader();
    const decoder = new TextDecoder();
    let buffer = "";

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split("\n");
      buffer = lines.pop()!; // keep any partial line for the next chunk

      for (const line of lines) {
        if (!line.startsWith("data: ")) continue;
        const data = line.slice(6);
        if (data === "[DONE]") { setIsStreaming(false); return; }

        try {
          const { text: delta } = JSON.parse(data);
          setText((prev) => prev + delta);
        } catch { /* ignore malformed chunks */ }
      }
    }

    setIsStreaming(false);
  }, []);

  return { text, isStreaming, sendMessage };
}
```

Production Considerations

Timeout configuration

This is the most common production pitfall. Typical HTTP timeouts in proxies, load balancers, and serverless platforms (30–60 seconds) will abort Claude mid-response on long generations. Configure your client and infrastructure accordingly:

```python
# Python
client = anthropic.AsyncAnthropic(
    timeout=300.0  # 5 minutes for long documents
)
```

```typescript
// TypeScript
const client = new Anthropic({
  timeout: 300_000, // 5 minutes in milliseconds
});
```

For FastAPI behind nginx, also set:

```nginx
proxy_read_timeout 300s;
proxy_send_timeout 300s;
```

Error handling and reconnection

Streams can fail mid-response due to network issues. The SDK throws an anthropic.APIStatusError on HTTP errors and anthropic.APIConnectionError on network failures. Implement a retry wrapper for production:

```python
import asyncio
import anthropic
from anthropic import APIConnectionError, APIStatusError

client = anthropic.AsyncAnthropic()

async def stream_with_retry(prompt: str, max_retries: int = 3):
    # Note: if a stream dies after yielding text, a retry restarts the
    # response from the beginning, so the caller may see duplicated text.
    for attempt in range(max_retries):
        try:
            async with client.messages.stream(
                model="claude-sonnet-4-6",
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}],
            ) as stream:
                async for text in stream.text_stream:
                    yield text
                return  # Success — exit retry loop
        except APIConnectionError:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
        except APIStatusError as e:
            if e.status_code == 529 and attempt < max_retries - 1:  # Overloaded
                await asyncio.sleep(5)
            else:
                raise
```
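
Consuming the wrapper looks like any other async generator:

```python
async def main():
    async for chunk in stream_with_retry("Explain SSE in one paragraph"):
        print(chunk, end="", flush=True)

asyncio.run(main())
```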

Token usage and cost tracking

Input token counts arrive in the message_start event; cumulative output counts arrive in the message_delta event near the end of the stream. The SDK accumulates both into the final message, which you can read after the stream completes:

```python
with client.messages.stream(...) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    final = stream.get_final_message()

input_cost = (final.usage.input_tokens / 1_000_000) * 3.00    # Sonnet input pricing
output_cost = (final.usage.output_tokens / 1_000_000) * 15.00  # Sonnet output pricing
print(f"Request cost: ${input_cost + output_cost:.6f}")
```

Which model to use for streaming

| Use case | Recommended model | Reason |
| --- | --- | --- |
| Real-time chat | claude-sonnet-4-6 | Best speed/quality balance |
| Document analysis | claude-sonnet-4-6 | 200K context, fast enough |
| Complex reasoning | claude-opus-4-6 | Best quality, slower streaming |
| High-volume, simple tasks | claude-haiku-4-5 | Fastest tokens per second, lowest cost |

Key Takeaways

  • Streaming transforms UX — first-word latency drops from 15+ seconds to under 1 second
  • Use .text_stream for simple cases — the SDK's context manager handles all SSE parsing
  • Set X-Accel-Buffering: no on your reverse proxy or chunks will be batched
  • Configure timeouts to 5+ minutes for production — typical 30–60-second proxy and client defaults kill long responses
  • Async is essential for server-side streaming — don't block your event loop with sync calls
  • Handle APIConnectionError with retry/backoff — network issues mid-stream are real

Start Building AI Apps — and Prove You Know How

Streaming is one of the core patterns covered in the Claude Certified Architect (CCA-F) exam. If you're looking to validate your Claude API skills with a credential that holds weight in 2026, the CCA certification is the most focused path.

AI for Anything offers a free CCA practice question bank and study guides that cover streaming, tool use, multi-agent patterns, and everything else in the exam blueprint.

Start with the Claude API beginner guide if you're new to the API, then come back here once you're comfortable with basic messages. From here, the natural next step is multi-agent orchestration — where streaming becomes even more important as you chain agents together in real time.
