Skip to content

feat(realtime): stream the LLM / TTS / transcription pipeline stages#10176

Open
localai-bot wants to merge 15 commits into
masterfrom
feat/realtime-streaming
Open

feat(realtime): stream the LLM / TTS / transcription pipeline stages#10176
localai-bot wants to merge 15 commits into
masterfrom
feat/realtime-streaming

Conversation

@localai-bot
Copy link
Copy Markdown
Collaborator

Summary

Adds opt-in streaming to each stage of the realtime pipeline (LLM tokens, TTS audio, transcription text), plus a per-pipeline disable-thinking toggle. By default every stage runs to completion before the next begins; these flags overlap generation, synthesis and playback to lower a turn's time-to-first-audio. Everything is off by default, so existing pipelines are unaffected.

name: gpt-realtime
pipeline:
  vad: silero-vad-ggml
  transcription: whisper-large-turbo
  llm: qwen3-4b
  tts: tts-1
  streaming:
    llm: true            # stream LLM tokens as transcript deltas + sentence-piped TTS
    tts: true            # emit response.output_audio.delta per synthesized chunk
    transcription: true  # stream conversation.item.input_audio_transcription.delta
  disable_thinking: true # maps to enable_thinking=false for the realtime LLM

What changed

  • Config (core/config): nested pipeline.streaming.{llm,tts,transcription} block + pipeline.disable_thinking, with StreamLLM()/StreamTTS()/StreamTranscription()/ThinkingDisabled() helpers. Pointer-bools so unset preserves the unary path.
  • Model interface: TTSStream / TranscribeStream on the realtime Model, delegating to the existing backend.ModelTTSStream / ModelTranscriptionStream. ttsStream adapts the backend's WAV-framed stream into raw PCM + sample rate.
  • TTS (emitSpeech): response audio flows through emitSpeech; with streaming.tts it sends one response.output_audio.delta per backend chunk, else one delta for the whole utterance.
  • Transcription (emitTranscription): with streaming.transcription, streams …transcription.delta events; else a single …completed (backends that don't stream emit one terminal event).
  • LLM (streamSegmenter + speechStreamer + streamLLMResponse): a real token callback strips reasoning via the streaming ReasoningExtractor, emits transcript text deltas, and sentence-pipes content into emitSpeech so each sentence is synthesized as soon as it's ready. Used only for plain-content turns (no tools) — tool turns keep the buffered path, since partial tool-call output can't be safely spoken mid-stream.
  • disable_thinking (applyPipelineThinking): forces ReasoningConfig.DisableReasoning on the per-session LLM config copy (no leak to other model users).
  • Docs: docs/content/features/openai-realtime.md.

Tests

TDD throughout — new unit tests for the config flags, the segmenter, emitSpeech (streaming vs unary), emitTranscription, speechStreamer, and applyPipelineThinking, with fakeModel/fakeTransport doubles. core/config and core/http/endpoints/openai suites pass; the unchanged buffered path is regression-guarded.

Validation note

Verified by deploying this branch onto a real distributed cluster: the controller drives the realtime pipeline end-to-end (WebSocket session, VAD, STT transcribed real speech correctly). The streaming.llm/streaming.tts delta paths are covered by the unit tests; a full live streaming turn additionally needs a worker that can load the chosen LLM/TTS.

Comment thread core/http/endpoints/openai/realtime_speech.go Fixed
@mudler mudler force-pushed the feat/realtime-streaming branch 3 times, most recently from cdfa782 to e14e7d7 Compare June 4, 2026 23:18
@mudler mudler requested a review from richiejp June 4, 2026 23:20
Copy link
Copy Markdown
Collaborator

@richiejp richiejp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this supposed to take advantage of real streaming models or as like a fallback when transcription and TTS don't actually support streaming? Looking at the code it seems to be the later, in which case it should probably be called "streaming emulation" or something like that

Comment thread core/http/endpoints/openai/realtime.go Outdated
// cannot produce a tool call (no tools), stream tokens straight to the client
// as transcript deltas and sentence-pipe them into TTS. Tool turns fall
// through to the buffered path below, since partial tool-call output can't be
// safely spoken mid-stream.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are tool call deltas IIRC, so it should be possible to stream when tool calls are pesent.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done (2615934). Streaming now handles tool turns when the LLM uses its tokenizer template — there the C++ autoparser clears reply.Message and delivers content + tool_calls via ChatDeltas, so the spoken transcript only ever carries content and the tool calls are parsed from the final deltas. Grammar-based function calling keeps the buffered path, since there the call is emitted as JSON in the token stream and would leak into the transcript.

buf strings.Builder
}

func isSentenceTerminator(b byte) bool {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about languages that don't have sentences or word boundaries even? I'm not sure this should even exist for real streaming TTS and for non-streaming TTS I think it is better to buffer an entire message and get the LLM to produce short messages.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — removed the segmenter entirely (1010f20). As you said, for non-streaming TTS it's better to buffer the whole message (and prompt the model for short replies), and the .!?/newline splitting did nothing for languages without those boundaries anyway. Now the full reply is buffered and synthesized once: streamed as audio chunks via TTSStream where the backend supports it, a single unary delta otherwise.

@mudler mudler force-pushed the feat/realtime-streaming branch 2 times, most recently from b604758 to 2615934 Compare June 5, 2026 07:48
@localai-bot
Copy link
Copy Markdown
Collaborator Author

Thanks for the review! Addressed both points:

Streaming vs. emulation — good call, the per-sentence path was emulation and it's gone. streaming.llm streams the transcript token-by-token (real, via PredictStream), and TTS is real streaming where the backend supports TTSStream (whole message buffered, synthesized once, chunked audio) and a clean single-shot synthesis otherwise. No segmentation, no language assumptions. (1010f20)

Tool-call streaming — implemented for tokenizer-template models, where the autoparser separates content from tool calls so the transcript can't leak tool-call tokens; grammar-based function calling still buffers. (2615934)

Also fixed the gosec G304 the bot flagged (backend-controlled TTS temp path, normalized with filepath.Clean), and a related find along the way: disable_thinking was disabling reasoning extraction as well as setting enable_thinking=false, so leaked <think> could reach the client — spoken output now always strips reasoning regardless. (e81ff0c)

// per-session copy returned by the config loader, so this does not affect other
// users of the same model. When the pipeline does not set disable_thinking the
// LLM config is left untouched.
func applyPipelineThinking(llm *config.ModelConfig, pipeline config.Pipeline) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This creates two layers of thinking config and I wonder if it is not better to be able to fork model configs so users can quickly create different profiles for a model and change any of the settings? Or have a generic override mechanism for realtime pipelines. So we can override any setting for any of the contained models. Or introduce model overlays that take their settings from base model and merge overrides into it?

In the last case you could have one base model then N overlays for that model which inherit the base settings.

@mudler

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — the per-pipeline override fields don't scale, and model overlays (one base model + N overlays that inherit and merge overrides) are the right general mechanism. They'd subsume both pipeline.disable_thinking here and pipeline.reasoning_effort (which already merged in #10184), and they're useful well beyond realtime.

I've opened #10185 to track the overlay mechanism. Proposing to keep the targeted fields for now (one's already in master) and deprecate them in favor of overlays once that lands, so this PR isn't blocked on the larger design. Sound reasonable?

mudler added 15 commits June 5, 2026 14:03
Add a nested pipeline.streaming.{llm,tts,transcription} block plus
pipeline.disable_thinking, with StreamLLM/StreamTTS/StreamTranscription/
ThinkingDisabled helpers. Pointer-bools so unset keeps the unary path;
existing configs are unaffected. Wiring into the realtime handler follows.

Assisted-by: Claude:claude-opus-4-8 go vet
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
streamSegmenter accumulates streamed LLM tokens and emits complete
sentence/clause segments (terminator+whitespace, or newline) so TTS can
synthesize each segment as it completes instead of waiting for the whole
reply. Pure helper; the streaming handler wiring consumes it next.

Assisted-by: Claude:claude-opus-4-8 go vet
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Add TTSStream and TranscribeStream to the realtime Model interface and
implement them on wrappedModel (delegating to backend.ModelTTSStream /
ModelTranscriptionStream) and transcriptOnlyModel. ttsStream adapts the
backend's WAV-framed stream (44-byte header carrying the sample rate, then
PCM) into raw PCM + sample rate for the realtime transports. Handler wiring
that consumes these (flag-gated) follows.

Assisted-by: Claude:claude-opus-4-8 go vet
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
emitSpeech synthesizes a piece of text and forwards audio to the client,
streaming one output_audio.delta per backend PCM chunk when the pipeline
sets streaming.tts, or one delta for the whole utterance otherwise. WebRTC
gets raw PCM (it resamples internally); WebSocket gets base64 PCM at the
session rate. It emits no transcript/audio-done events so a streamed reply
can be split into multiple spoken segments sharing one response.

Adds fakeModel/fakeTransport test doubles for the realtime Model/Transport
interfaces, driving streaming assertions deterministically.

Assisted-by: Claude:claude-opus-4-8 go vet
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Replace the inline unary TTS block in the response handler with emitSpeech,
which streams a response.output_audio.delta per backend PCM chunk when
pipeline.streaming.tts is set and otherwise preserves the single-delta unary
behaviour. emitSpeech returns the accumulated base64 audio, stored on the
conversation item as before. Transcript and audio-done events stay in the
handler so later per-segment streaming can reuse emitSpeech.

Assisted-by: Claude:claude-opus-4-8 go vet
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Add emitTranscription and route commitUtterance through it. With
pipeline.streaming.transcription set it streams each transcript fragment as
a conversation.item.input_audio_transcription.delta via TranscribeStream
then a completed event; otherwise it preserves the single completed-event
unary behaviour. Returns the final transcript for response generation.

Assisted-by: Claude:claude-opus-4-8 go vet
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
applyPipelineThinking forces the LLM's ReasoningConfig.DisableReasoning when
pipeline.disable_thinking is set, which gRPCPredictOpts turns into the
enable_thinking=false backend metadata. Applied at newModel construction on
the per-session LLM config copy, so it doesn't leak to other model users and
needs no realtime-specific request plumbing.

Assisted-by: Claude:claude-opus-4-8 go vet
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
emitSpeech now returns raw PCM (caller base64-encodes) so streamed segments
accumulate correctly. speechStreamer consumes streamed LLM tokens: it strips
reasoning via the streaming ReasoningExtractor, emits a transcript delta per
content fragment, and sentence-pipes content into emitSpeech so each sentence
is synthesized as soon as it's ready. Handler wiring (plain-content turns)
follows.

Assisted-by: Claude:claude-opus-4-8 go vet
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
triggerResponseAtTurn takes a streamed path when pipeline.streaming.llm is
set, the turn has no tools, and audio is requested: streamLLMResponse
announces the assistant item, drives the LLM token callback through a
speechStreamer (reasoning-stripped transcript deltas + sentence-piped TTS),
and emits the terminal events. Tool turns and non-streaming pipelines keep
the existing buffered path unchanged, so this is strictly opt-in.

Assisted-by: Claude:claude-opus-4-8 go vet
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
TestAllFieldsHaveRegistryEntries (core/config/meta) requires every config
field to have a meta registry entry. The four new pipeline fields
(disable_thinking, streaming.{llm,tts,transcription}) had none, failing
tests-linux/tests-apple. Add toggle entries for them.

Also handle the os.Remove return in realtime_speech_test.go to satisfy
errcheck (golangci-lint).

Assisted-by: Claude:claude-opus-4-8 go test, golangci-lint
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
disable_thinking maps to ReasoningConfig.DisableReasoning=true on the LLM
config, which the backend reads as enable_thinking=false. But the realtime
handler reads that SAME config to drive reasoning extraction, and there
DisableReasoning=true means "skip stripping". PredictConfig() returns this
LLM config, so both the streamed (speechStreamer) and buffered realtime
paths stopped stripping <think>…</think> exactly when disable_thinking was
on — leaking raw reasoning to the client whenever the model ignored the
enable_thinking hint (e.g. lfm2.5).

Add spokenReasoningConfig() which clears DisableReasoning for extraction
(keeping custom tokens/tag pairs) and route both realtime paths through it.
Spoken output now always strips reasoning, independent of the backend
suppression hint.

Assisted-by: Claude:claude-opus-4-8 go test, golangci-lint
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
emitSpeech reads the WAV file the TTS backend wrote. The read moved here
from realtime.go, so code-scanning flagged it as a new G304 alert even
though the path is backend-controlled (a temp file), not user input.
Wrap it in filepath.Clean — a real path normalization that also clears
the alert, keeping with the repo's no-#nosec convention.

Assisted-by: Claude:claude-opus-4-8 gosec, golangci-lint
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
…nter

Per review (richiejp): the sentence segmenter pipelined unary TTS by
splitting on ASCII .!?/newline, which does nothing for languages without
those boundaries (CJK/Thai) — there it already degraded to buffering the
whole message anyway.

Replace it with a uniform model: stream the LLM transcript live, buffer the
full message, then synthesize it once. emitSpeech already streams the audio
chunks when the backend implements TTSStream and falls back to a single
unary delta otherwise, so this is real streaming TTS where supported and a
clean whole-message synthesis elsewhere — no per-sentence emulation, no
language assumptions. speechStreamer becomes transcriptStreamer (transcript
deltas only); the whole-message synthesis moves into streamLLMResponse.

Assisted-by: Claude:claude-opus-4-8 go test, golangci-lint
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Per review (richiejp): tool-call deltas exist, so streaming should work with
tools too. It does — for models that use their tokenizer template. The C++
autoparser then clears reply.Message and delivers content + tool calls via
ChatDeltas, so the streamed transcript carries only spoken content (no
tool-call JSON leak) and the tool calls are parsed from the final response.

- Drop the len(tools)==0 gate; stream when no tools OR use_tokenizer_template
  (grammar-based function calling still buffers, since its call is emitted as
  JSON in the token stream and would leak into the transcript).
- streamLLMResponse takes tools/toolChoice/toolTurn, reads ChatDelta content
  in the token callback, parses tool calls from the final ChatDeltas, and
  creates the assistant content item lazily so a content-less tool turn emits
  only the tool calls.
- Extract emitToolCallItems from the buffered path so both paths finalize tool
  calls, response.done, and server-side assistant-tool follow-ups identically.

Assisted-by: Claude:claude-opus-4-8 go test, golangci-lint
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants