Stream chat completions end-to-end

The problem

Your application needs to render LLM tokens as they arrive: in a chat UI, a CLI tool, or a code editor's "ghost text" suggestion. A non-streaming request returns the full response only once generation finishes; with Server-Sent Events (SSE), you receive each delta as the model produces it.

The reference Streaming page describes the wire protocol DVARA emits. This page shows working consumers in three languages and the few SSE-specific edge cases worth handling at the call site.

Want runnable copies?

Each snippet below ships as an executable script in the streaming-end-to-end/ directory of dvarahq/dvara-examples — clone, set DVARA_URL + DVARA_API_KEY, and run.

Prerequisites

  • A running DVARA instance (Quickstart)
  • A DVARA API key for a tenant
  • One of: Python 3.10+ with openai>=1.30, Node.js 18+ with openai>=4.50, or JDK 21+ (with JBang for the single-file Java demo)

Python (OpenAI SDK)

The OpenAI Python SDK handles SSE framing for you — iterate the stream and check delta.content:

from openai import OpenAI

client = OpenAI(
    api_key="<your-dvara-api-key>",
    base_url="http://localhost:8080/v1",
)

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a haiku about programming."}],
    stream=True,
)

for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)
    if chunk.choices[0].finish_reason == "content_filter":
        print("\n[stream stopped: governance enforcement]")
print()

finish_reason: content_filter is DVARA's signal that a PII or guardrail rule terminated the stream. Treat it as a graceful end-of-stream — don't retry; the same content would trigger the same rule again. See Governance on streaming responses.

Node.js (OpenAI SDK)

import OpenAI from "openai";

const client = new OpenAI({
  apiKey: process.env.DVARA_API_KEY,
  baseURL: "http://localhost:8080/v1",
});

const stream = await client.chat.completions.create({
  model: "gpt-4o",
  messages: [{ role: "user", content: "Write a haiku about programming." }],
  stream: true,
});

for await (const chunk of stream) {
  const delta = chunk.choices[0]?.delta;
  if (delta?.content) process.stdout.write(delta.content);
  if (chunk.choices[0]?.finish_reason === "content_filter") {
    process.stdout.write("\n[stream stopped: governance enforcement]");
  }
}
console.log();

The stream returned by the SDK is an async iterable that handles SSE framing, including data: [DONE] termination, transparently; for await simply consumes it.

Java (HttpClient + line iterator)

The JDK has no built-in SSE client, but the protocol is simple enough to parse with HttpResponse.BodyHandlers.ofLines. Each data: … line carries one chunk; data: [DONE] ends the stream.

// Requires jackson-databind on the classpath.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.stream.Stream;

public class StreamingExample {

    private static final ObjectMapper JSON = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8080/v1/chat/completions"))
                .header("Content-Type", "application/json")
                .header("Authorization", "Bearer " + System.getenv("DVARA_API_KEY"))
                .POST(HttpRequest.BodyPublishers.ofString("""
                        {
                          "model": "gpt-4o",
                          "messages": [{"role": "user", "content": "Write a haiku about programming."}],
                          "stream": true
                        }
                        """))
                .build();

        HttpResponse<Stream<String>> response = client.send(request, HttpResponse.BodyHandlers.ofLines());

        // Without this check, a non-200 response with no "data: " lines would print nothing.
        if (response.statusCode() != 200) {
            System.err.println("Request failed with HTTP " + response.statusCode());
            return;
        }

        response.body()
                .filter(line -> line.startsWith("data: "))             // keep only SSE data lines
                .map(line -> line.substring("data: ".length()))        // strip the field prefix
                .takeWhile(payload -> !"[DONE]".equals(payload))       // stop at the terminator
                .forEach(payload -> {
                    try {
                        JsonNode chunk = JSON.readTree(payload);
                        String delta = chunk.path("choices").path(0).path("delta").path("content").asText("");
                        System.out.print(delta);
                        System.out.flush();
                        String finish = chunk.path("choices").path(0).path("finish_reason").asText("");
                        if ("content_filter".equals(finish)) {
                            System.out.print("\n[stream stopped: governance enforcement]");
                        }
                    } catch (Exception e) {
                        // Report malformed chunks instead of silently dropping them.
                        System.err.println("\n[skipped unparseable chunk: " + e.getMessage() + "]");
                    }
                });
        System.out.println();
    }
}
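
If you need the same SDK-free parse outside Java, the line-oriented approach above can be sketched in Python. This is a minimal illustration, not DVARA's reference client: iter_sse_payloads and text_of are hypothetical helper names, and the sketch assumes each event arrives as a single data: line carrying one JSON chunk, as described above. It works on any iterable of text lines, so it can be exercised without a network connection.

```python
import json


def iter_sse_payloads(lines):
    """Yield decoded JSON chunks from an iterable of SSE lines until [DONE]."""
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line.startswith("data:"):
            # Skip blank event separators, comment lines (": ..."), and other fields.
            continue
        payload = line[len("data:"):]
        if payload.startswith(" "):
            payload = payload[1:]  # a single leading space after the colon is optional
        if payload == "[DONE]":
            return  # terminator: ignore anything that follows
        yield json.loads(payload)


def text_of(chunk):
    """Return the content delta of a chunk, or '' when there is none."""
    choices = chunk.get("choices") or []
    if not choices:
        return ""
    return (choices[0].get("delta") or {}).get("content") or ""


# Hand-written sample lines standing in for a real response body.
sample = [
    'data: {"choices":[{"delta":{"content":"Hel"},"finish_reason":null}]}',
    "",
    ": keep-alive",
    'data: {"choices":[{"delta":{"content":"lo"},"finish_reason":null}]}',
    "data: [DONE]",
]
print("".join(text_of(c) for c in iter_sse_payloads(sample)))
```

Keeping the parser independent of the transport means the same function can be fed from whatever your HTTP client exposes as a line iterator.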

Streaming json_schema: accumulating partial JSON

When you ask for response_format: json_schema and the request lands on a provider that needs a tool-use rewrite (Anthropic, Bedrock), the gateway flattens the upstream's tool-use stream into normal content deltas. JSON bytes arrive progressively — but the full document isn't valid until the last delta. You have two strategies:

Wait for the stream to complete, then parse:

import json
from openai import OpenAI

client = OpenAI(api_key="<your-dvara-api-key>", base_url="http://localhost:8080/v1")

stream = client.chat.completions.create(
    model="claude-sonnet-4-5",
    messages=[{"role": "user", "content": "Extract: John is 30."}],
    response_format={
        "type": "json_schema",
        "json_schema": {
            "name": "person",
            "schema": {
                "type": "object",
                "properties": {"name": {"type": "string"}, "age": {"type": "integer"}},
                "required": ["name", "age"],
            },
            "strict": True,
        },
    },
    stream=True,
)

buffer = []
for chunk in stream:
    if chunk.choices[0].delta.content:
        buffer.append(chunk.choices[0].delta.content)

result = json.loads("".join(buffer))
print(result["name"], result["age"])

Or render partial JSON progressively with a forgiving parser like json_repair or partial-json-parser — useful when the UI needs to show each field as it arrives, not just the final document. Standard json.loads will reject anything mid-stream.
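
To make the progressive strategy concrete, here is a standard-library sketch of the idea behind such forgiving parsers: close whatever strings and brackets are still open, try to parse, and keep the last snapshot that succeeded. It is a simplified stand-in and does not use json_repair or partial-json-parser; try_parse_partial and snapshots are hypothetical names, and a dedicated library will likely handle more edge cases.

```python
import json


def try_parse_partial(text):
    """Best-effort parse of a JSON prefix; returns None if it cannot be repaired."""
    closers = []       # closing brackets still owed, innermost last
    in_string = False
    escaped = False
    for ch in text:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "{":
            closers.append("}")
        elif ch == "[":
            closers.append("]")
        elif ch in "}]" and closers:
            closers.pop()
    # Close an unterminated string, then every open object or array.
    candidate = text + ('"' if in_string else "") + "".join(reversed(closers))
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        return None  # e.g. cut off mid-key or right after a comma


def snapshots(deltas):
    """Yield each new parseable view of the document as deltas accumulate."""
    buffer, last = "", None
    for delta in deltas:
        buffer += delta
        parsed = try_parse_partial(buffer)
        if parsed is not None and parsed != last:
            last = parsed
            yield parsed


# Hand-written deltas standing in for chunk.choices[0].delta.content values.
for view in snapshots(['{"na', 'me": "Jo', 'hn", "a', 'ge": 3', '0}']):
    print(view)
```

Intermediate snapshots are provisional: in this run the age field is briefly parsed as 3 before the final delta completes it to 30, so a UI should treat values as final only once the stream ends.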

Reconnect on idle disconnects

Long streams over flaky connections can be cut short by a proxy or load balancer. DVARA's data plane keeps the connection open as long as deltas are arriving, but if your network drops the socket the SDK raises a transport error.

A simple wrapper makes up to three attempts at an idempotent prompt (the initial request plus two retries), with exponential backoff between attempts:

import time
from openai import OpenAI, APIConnectionError

client = OpenAI(api_key="<your-dvara-api-key>", base_url="http://localhost:8080/v1")

def stream_with_retry(messages, attempts=3, base_backoff=0.5):
    # Each retry starts a new completion from the beginning; chunks already
    # yielded before a mid-stream failure are not rolled back, so the caller
    # should be prepared to discard partial output.
    for attempt in range(attempts):
        try:
            for chunk in client.chat.completions.create(
                model="gpt-4o", messages=messages, stream=True,
            ):
                yield chunk
            return
        except APIConnectionError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the transport error
            time.sleep(base_backoff * (2 ** attempt))  # 0.5s, then 1s

for chunk in stream_with_retry([{"role": "user", "content": "Hello!"}]):
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

Don't retry on finish_reason: content_filter — that's an enforcement decision, not a transport error. The retry handler should only catch connection-class failures.
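
One way to keep those two cases apart is to buffer each attempt separately and return the finish reason alongside the text, so a failed attempt leaves nothing behind and content_filter comes back as an ordinary result. The sketch below is an illustration under stated assumptions, not part of DVARA or the OpenAI SDK: collect_with_retry is a hypothetical helper, open_stream is any zero-argument callable that starts a fresh request and yields (content, finish_reason) pairs, and the built-in ConnectionError stands in for whichever transport exceptions your client raises.

```python
import time


def collect_with_retry(open_stream, retryable=(ConnectionError,), attempts=3,
                       base_backoff=0.5, sleep=time.sleep):
    """Return (text, finish_reason) from the first attempt that completes.

    Only exceptions listed in `retryable` trigger another attempt; a stream
    that ends with finish_reason == "content_filter" is returned as-is.
    """
    for attempt in range(attempts):
        parts = []            # reset per attempt so partial output is discarded
        finish_reason = None
        try:
            for content, finish in open_stream():
                if content:
                    parts.append(content)
                if finish:
                    finish_reason = finish
            return "".join(parts), finish_reason
        except retryable:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the transport error
            sleep(base_backoff * (2 ** attempt))


# Simulated stream: the first attempt drops mid-response, the second succeeds.
state = {"calls": 0}

def flaky_stream():
    state["calls"] += 1
    if state["calls"] == 1:
        yield ("Hel", None)
        raise ConnectionError("socket dropped")
    yield ("Hello", None)
    yield ("!", "stop")

text, reason = collect_with_retry(flaky_stream, sleep=lambda seconds: None)
print(text, reason)
```

With the OpenAI SDK, open_stream could be a small function that calls client.chat.completions.create(..., stream=True) and yields (chunk.choices[0].delta.content, chunk.choices[0].finish_reason) for each chunk, with retryable set to the connection-class exceptions you want to cover. The trade-off is that buffering delays rendering until an attempt completes; a UI that renders live would instead need to clear its partial output before each retry.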

What's next