From 16d7704a692f6f0d02d09f7345764e351c33ca8a Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 4 Jun 2026 16:03:03 +0000 Subject: [PATCH 01/15] feat(realtime): pipeline streaming + disable_thinking config Add a nested pipeline.streaming.{llm,tts,transcription} block plus pipeline.disable_thinking, with StreamLLM/StreamTTS/StreamTranscription/ ThinkingDisabled helpers. Pointer-bools so unset keeps the unary path; existing configs are unaffected. Wiring into the realtime handler follows. Assisted-by: Claude:claude-opus-4-8 go vet Signed-off-by: Ettore Di Giacinto --- core/config/model_config.go | 33 ++++++++++++++++ core/config/pipeline_streaming_test.go | 54 ++++++++++++++++++++++++++ 2 files changed, 87 insertions(+) create mode 100644 core/config/pipeline_streaming_test.go diff --git a/core/config/model_config.go b/core/config/model_config.go index 9980c92e8c80..241ed2d49863 100644 --- a/core/config/model_config.go +++ b/core/config/model_config.go @@ -499,6 +499,16 @@ type Pipeline struct { // the pipeline's LLM without editing the LLM model config. Overrides the LLM's // own reasoning_effort. Unset leaves the LLM model config in charge. ReasoningEffort string `yaml:"reasoning_effort,omitempty" json:"reasoning_effort,omitempty"` + + // Streaming opts each pipeline stage into incremental delivery (LLM tokens, + // TTS audio chunks, transcription text). Unset stages keep the blocking + // unary path, so existing configs are unaffected. + Streaming PipelineStreaming `yaml:"streaming,omitempty" json:"streaming,omitempty"` + + // DisableThinking suppresses reasoning/thinking for the pipeline LLM (maps + // to enable_thinking=false backend metadata) without editing the underlying + // LLM model config. Unset leaves the LLM model config in charge. + DisableThinking *bool `yaml:"disable_thinking,omitempty" json:"disable_thinking,omitempty"` } // ApplyReasoningEffort resolves the effective reasoning effort — a per-request @@ -530,6 +540,29 @@ func (c *ModelConfig) ApplyReasoningEffort(requestEffort string) { } } +// @Description PipelineStreaming toggles incremental delivery per realtime stage. +type PipelineStreaming struct { + LLM *bool `yaml:"llm,omitempty" json:"llm,omitempty"` + TTS *bool `yaml:"tts,omitempty" json:"tts,omitempty"` + Transcription *bool `yaml:"transcription,omitempty" json:"transcription,omitempty"` +} + +// StreamLLM reports whether LLM tokens should be streamed for this pipeline. +func (p Pipeline) StreamLLM() bool { return p.Streaming.LLM != nil && *p.Streaming.LLM } + +// StreamTTS reports whether TTS audio should be streamed for this pipeline. +func (p Pipeline) StreamTTS() bool { return p.Streaming.TTS != nil && *p.Streaming.TTS } + +// StreamTranscription reports whether transcription text should be streamed. +func (p Pipeline) StreamTranscription() bool { + return p.Streaming.Transcription != nil && *p.Streaming.Transcription +} + +// ThinkingDisabled reports whether the pipeline forces the LLM's thinking off. +func (p Pipeline) ThinkingDisabled() bool { + return p.DisableThinking != nil && *p.DisableThinking +} + // @Description File configuration for model downloads type File struct { Filename string `yaml:"filename,omitempty" json:"filename,omitempty"` diff --git a/core/config/pipeline_streaming_test.go b/core/config/pipeline_streaming_test.go new file mode 100644 index 000000000000..a6bec5ee4eb6 --- /dev/null +++ b/core/config/pipeline_streaming_test.go @@ -0,0 +1,54 @@ +package config + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "gopkg.in/yaml.v3" +) + +// The realtime pipeline can stream each stage (LLM tokens, TTS audio, +// transcription text) and can disable model "thinking" for the LLM. These are +// opt-in per pipeline; everything defaults to off so existing configs keep the +// unary behaviour. +var _ = Describe("Pipeline streaming config", func() { + It("defaults every streaming + thinking helper to false when unset", func() { + var p Pipeline + Expect(p.StreamLLM()).To(BeFalse()) + Expect(p.StreamTTS()).To(BeFalse()) + Expect(p.StreamTranscription()).To(BeFalse()) + Expect(p.ThinkingDisabled()).To(BeFalse()) + }) + + It("parses the nested streaming block and disable_thinking from YAML", func() { + var c ModelConfig + err := yaml.Unmarshal([]byte(` +name: gpt-realtime +pipeline: + llm: my-llm + tts: my-tts + transcription: my-stt + streaming: + llm: true + tts: true + transcription: true + disable_thinking: true +`), &c) + Expect(err).ToNot(HaveOccurred()) + Expect(c.Pipeline.StreamLLM()).To(BeTrue()) + Expect(c.Pipeline.StreamTTS()).To(BeTrue()) + Expect(c.Pipeline.StreamTranscription()).To(BeTrue()) + Expect(c.Pipeline.ThinkingDisabled()).To(BeTrue()) + }) + + It("treats an explicit false in the streaming block as disabled", func() { + var c ModelConfig + err := yaml.Unmarshal([]byte(` +name: gpt-realtime +pipeline: + streaming: + tts: false +`), &c) + Expect(err).ToNot(HaveOccurred()) + Expect(c.Pipeline.StreamTTS()).To(BeFalse()) + }) +}) From e0820a11c9f10f37741c7b7918ccadeba6f22eea Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 4 Jun 2026 16:05:30 +0000 Subject: [PATCH 02/15] feat(realtime): sentence segmenter for streamed LLM->TTS pipelining streamSegmenter accumulates streamed LLM tokens and emits complete sentence/clause segments (terminator+whitespace, or newline) so TTS can synthesize each segment as it completes instead of waiting for the whole reply. Pure helper; the streaming handler wiring consumes it next. Assisted-by: Claude:claude-opus-4-8 go vet Signed-off-by: Ettore Di Giacinto --- .../endpoints/openai/realtime_segmenter.go | 61 +++++++++++++++++++ .../openai/realtime_segmenter_test.go | 41 +++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 core/http/endpoints/openai/realtime_segmenter.go create mode 100644 core/http/endpoints/openai/realtime_segmenter_test.go diff --git a/core/http/endpoints/openai/realtime_segmenter.go b/core/http/endpoints/openai/realtime_segmenter.go new file mode 100644 index 000000000000..77116229f157 --- /dev/null +++ b/core/http/endpoints/openai/realtime_segmenter.go @@ -0,0 +1,61 @@ +package openai + +import "strings" + +// streamSegmenter accumulates streamed LLM text and emits complete utterance +// segments (sentence/clause boundaries) so the realtime pipeline can hand each +// segment to TTS as soon as it's complete, overlapping generation, synthesis +// and playback instead of waiting for the whole reply. +// +// A segment is committed when a sentence terminator (. ! ?) is followed by +// whitespace, or at a newline. Terminators not followed by whitespace (e.g. +// decimals like "3.14" mid-stream) stay buffered until more text arrives or the +// stream is flushed. +type streamSegmenter struct { + buf strings.Builder +} + +func isSentenceTerminator(b byte) bool { + return b == '.' || b == '!' || b == '?' +} + +func isSpace(b byte) bool { + return b == ' ' || b == '\t' || b == '\n' || b == '\r' +} + +// Push appends text to the buffer and returns any newly-completed segments, +// trimmed of surrounding whitespace. Incomplete trailing text stays buffered. +func (s *streamSegmenter) Push(text string) []string { + s.buf.WriteString(text) + cur := s.buf.String() + + var segments []string + start := 0 + for i := 0; i < len(cur); i++ { + cut := -1 + switch { + case cur[i] == '\n': + cut = i // segment excludes the newline + case isSentenceTerminator(cur[i]) && i+1 < len(cur) && isSpace(cur[i+1]): + cut = i + 1 // segment includes the terminator + } + if cut >= 0 { + if seg := strings.TrimSpace(cur[start:cut]); seg != "" { + segments = append(segments, seg) + } + start = cut + } + } + + rem := cur[start:] + s.buf.Reset() + s.buf.WriteString(rem) + return segments +} + +// Flush returns the remaining buffered text (trimmed) and clears the buffer. +func (s *streamSegmenter) Flush() string { + seg := strings.TrimSpace(s.buf.String()) + s.buf.Reset() + return seg +} diff --git a/core/http/endpoints/openai/realtime_segmenter_test.go b/core/http/endpoints/openai/realtime_segmenter_test.go new file mode 100644 index 000000000000..13d9a8c78293 --- /dev/null +++ b/core/http/endpoints/openai/realtime_segmenter_test.go @@ -0,0 +1,41 @@ +package openai + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +// streamSegmenter turns a stream of LLM token text into complete sentence/clause +// segments so TTS can start synthesizing before the full reply is generated. +var _ = Describe("streamSegmenter", func() { + It("buffers partial text until a sentence terminator followed by space", func() { + var s streamSegmenter + Expect(s.Push("Hello")).To(BeEmpty()) + Expect(s.Push(" world")).To(BeEmpty()) + Expect(s.Push(". ")).To(Equal([]string{"Hello world."})) + }) + + It("emits each complete sentence and keeps the trailing partial buffered", func() { + var s streamSegmenter + Expect(s.Push("One. Two! Three")).To(Equal([]string{"One.", "Two!"})) + Expect(s.Flush()).To(Equal("Three")) + }) + + It("splits on newlines", func() { + var s streamSegmenter + Expect(s.Push("Line one\nLine two")).To(Equal([]string{"Line one"})) + Expect(s.Flush()).To(Equal("Line two")) + }) + + It("does not split decimals or mid-token punctuation", func() { + var s streamSegmenter + Expect(s.Push("Pi is 3.14 today")).To(BeEmpty()) + Expect(s.Flush()).To(Equal("Pi is 3.14 today")) + }) + + It("flushes to empty when the buffer holds only consumed text", func() { + var s streamSegmenter + s.Push("Done. ") + Expect(s.Flush()).To(Equal("")) + }) +}) From 2ba2216ce20c0b00bc9423521dbdbaef469da90c Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 4 Jun 2026 16:12:27 +0000 Subject: [PATCH 03/15] feat(realtime): streaming TTS/transcription methods on Model interface Add TTSStream and TranscribeStream to the realtime Model interface and implement them on wrappedModel (delegating to backend.ModelTTSStream / ModelTranscriptionStream) and transcriptOnlyModel. ttsStream adapts the backend's WAV-framed stream (44-byte header carrying the sample rate, then PCM) into raw PCM + sample rate for the realtime transports. Handler wiring that consumes these (flag-gated) follows. Assisted-by: Claude:claude-opus-4-8 go vet Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime.go | 6 ++ core/http/endpoints/openai/realtime_model.go | 74 ++++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index 0d638a909d34..76dc9fd3f254 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -235,6 +235,12 @@ type Model interface { Transcribe(ctx context.Context, audio, language string, translate bool, diarize bool, prompt string) (*schema.TranscriptionResult, error) Predict(ctx context.Context, messages schema.Messages, images, videos, audios []string, tokenCallback func(string, backend.TokenUsage) bool, tools []types.ToolUnion, toolChoice *types.ToolChoiceUnion, logprobs *int, topLogprobs *int, logitBias map[string]float64) (func() (backend.LLMResponse, error), error) TTS(ctx context.Context, text, voice, language string) (string, *proto.Result, error) + // TTSStream synthesizes speech incrementally, invoking onAudio with raw PCM + // chunks (and the backend sample rate) as they are produced. + TTSStream(ctx context.Context, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error + // TranscribeStream transcribes audio incrementally, invoking onDelta for each + // transcript text fragment and returning the final aggregated result. + TranscribeStream(ctx context.Context, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error) PredictConfig() *config.ModelConfig } diff --git a/core/http/endpoints/openai/realtime_model.go b/core/http/endpoints/openai/realtime_model.go index b9a3adda92ec..b184393403f5 100644 --- a/core/http/endpoints/openai/realtime_model.go +++ b/core/http/endpoints/openai/realtime_model.go @@ -3,6 +3,7 @@ package openai import ( "context" "crypto/rand" + "encoding/binary" "encoding/hex" "encoding/json" "fmt" @@ -87,6 +88,14 @@ func (m *transcriptOnlyModel) TTS(ctx context.Context, text, voice, language str return "", nil, fmt.Errorf("TTS not supported in transcript-only mode") } +func (m *transcriptOnlyModel) TTSStream(ctx context.Context, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error { + return fmt.Errorf("TTS not supported in transcript-only mode") +} + +func (m *transcriptOnlyModel) TranscribeStream(ctx context.Context, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error) { + return transcribeStream(ctx, m.modelLoader, *m.TranscriptionConfig, m.appConfig, audio, language, translate, diarize, prompt, onDelta) +} + func (m *transcriptOnlyModel) PredictConfig() *config.ModelConfig { return nil } @@ -321,10 +330,75 @@ func (m *wrappedModel) TTS(ctx context.Context, text, voice, language string) (s return backend.ModelTTS(ctx, text, voice, language, "", nil, m.modelLoader, m.appConfig, *m.TTSConfig) } +func (m *wrappedModel) TTSStream(ctx context.Context, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error { + return ttsStream(ctx, m.modelLoader, m.appConfig, *m.TTSConfig, text, voice, language, onAudio) +} + +func (m *wrappedModel) TranscribeStream(ctx context.Context, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error) { + return transcribeStream(ctx, m.modelLoader, *m.TranscriptionConfig, m.appConfig, audio, language, translate, diarize, prompt, onDelta) +} + func (m *wrappedModel) PredictConfig() *config.ModelConfig { return m.LLMConfig } +// wavStreamHeaderBytes is the size of the WAV header that backend.ModelTTSStream +// emits as its first audio callback; the sample rate lives at byte offset 24. +const wavStreamHeaderBytes = 44 + +// ttsStream adapts backend.ModelTTSStream (which emits a WAV stream: a 44-byte +// header carrying the sample rate, then raw PCM) to the realtime onAudio +// callback, which wants raw PCM plus the sample rate. The header is buffered +// until complete, the sample rate is read from it, and subsequent bytes are +// forwarded as PCM. +func ttsStream(ctx context.Context, ml *model.ModelLoader, appConfig *config.ApplicationConfig, ttsConfig config.ModelConfig, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error { + var header []byte + headerDone := false + sampleRate := 0 + return backend.ModelTTSStream(ctx, text, voice, language, "", nil, ml, appConfig, ttsConfig, func(b []byte) error { + if headerDone { + if len(b) == 0 { + return nil + } + return onAudio(b, sampleRate) + } + header = append(header, b...) + if len(header) < wavStreamHeaderBytes { + return nil + } + sampleRate = int(binary.LittleEndian.Uint32(header[24:28])) + headerDone = true + if len(header) > wavStreamHeaderBytes { + return onAudio(header[wavStreamHeaderBytes:], sampleRate) + } + return nil + }) +} + +// transcribeStream adapts backend.ModelTranscriptionStream to the realtime +// onDelta callback, returning the final aggregated transcription result. +func transcribeStream(ctx context.Context, ml *model.ModelLoader, transcriptionConfig config.ModelConfig, appConfig *config.ApplicationConfig, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error) { + var final *schema.TranscriptionResult + err := backend.ModelTranscriptionStream(ctx, backend.TranscriptionRequest{ + Audio: audio, + Language: language, + Translate: translate, + Diarize: diarize, + Prompt: prompt, + }, ml, transcriptionConfig, appConfig, func(chunk backend.TranscriptionStreamChunk) { + if chunk.Delta != "" { + onDelta(chunk.Delta) + } + if chunk.Final != nil { + final = chunk.Final + } + }) + if err != nil { + return nil, err + } + return final, nil +} + func newTranscriptionOnlyModel(pipeline *config.Pipeline, cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig *config.ApplicationConfig) (Model, *config.ModelConfig, error) { cfgVAD, err := cl.LoadModelConfigFileByName(pipeline.VAD, ml.ModelPath) if err != nil { From 2c6fdd0570d0dc75d32827c818f1626eaa461c45 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 4 Jun 2026 16:16:53 +0000 Subject: [PATCH 04/15] feat(realtime): emitSpeech with flag-gated streaming TTS emitSpeech synthesizes a piece of text and forwards audio to the client, streaming one output_audio.delta per backend PCM chunk when the pipeline sets streaming.tts, or one delta for the whole utterance otherwise. WebRTC gets raw PCM (it resamples internally); WebSocket gets base64 PCM at the session rate. It emits no transcript/audio-done events so a streamed reply can be split into multiple spoken segments sharing one response. Adds fakeModel/fakeTransport test doubles for the realtime Model/Transport interfaces, driving streaming assertions deterministically. Assisted-by: Claude:claude-opus-4-8 go vet Signed-off-by: Ettore Di Giacinto --- .../endpoints/openai/realtime_doubles_test.go | 101 ++++++++++++++++++ core/http/endpoints/openai/realtime_speech.go | 86 +++++++++++++++ .../endpoints/openai/realtime_speech_test.go | 68 ++++++++++++ 3 files changed, 255 insertions(+) create mode 100644 core/http/endpoints/openai/realtime_doubles_test.go create mode 100644 core/http/endpoints/openai/realtime_speech.go create mode 100644 core/http/endpoints/openai/realtime_speech_test.go diff --git a/core/http/endpoints/openai/realtime_doubles_test.go b/core/http/endpoints/openai/realtime_doubles_test.go new file mode 100644 index 000000000000..2a54f3dbe47c --- /dev/null +++ b/core/http/endpoints/openai/realtime_doubles_test.go @@ -0,0 +1,101 @@ +package openai + +import ( + "context" + + "github.com/mudler/LocalAI/core/backend" + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/pkg/grpc/proto" +) + +// fakeTransport records the server events and audio sent to a realtime client +// so streaming behaviour can be asserted without a real WebSocket/WebRTC peer. +// It is not a *WebRTCTransport, so handler code takes the WebSocket path. +type fakeTransport struct { + events []types.ServerEvent + audio []fakeAudioChunk +} + +type fakeAudioChunk struct { + pcm []byte + sampleRate int +} + +func (f *fakeTransport) SendEvent(e types.ServerEvent) error { + f.events = append(f.events, e) + return nil +} + +func (f *fakeTransport) ReadEvent() ([]byte, error) { return nil, nil } + +func (f *fakeTransport) SendAudio(_ context.Context, pcm []byte, sampleRate int) error { + f.audio = append(f.audio, fakeAudioChunk{pcm: pcm, sampleRate: sampleRate}) + return nil +} + +func (f *fakeTransport) Close() error { return nil } + +// countEvents returns how many recorded events have the given type. +func (f *fakeTransport) countEvents(et types.ServerEventType) int { + n := 0 + for _, e := range f.events { + if e.ServerEventType() == et { + n++ + } + } + return n +} + +// fakeModel is a configurable Model double. TTSStream replays ttsStreamChunks +// and TranscribeStream replays transcribeDeltas, so the handler's streaming +// paths can be driven deterministically. +type fakeModel struct { + cfg *config.ModelConfig + + ttsFile string + ttsStreamChunks [][]byte + ttsStreamRate int + ttsStreamErr error + + transcribeDeltas []string + transcribeFinal *schema.TranscriptionResult +} + +func (m *fakeModel) VAD(context.Context, *schema.VADRequest) (*schema.VADResponse, error) { + return nil, nil +} + +func (m *fakeModel) Transcribe(context.Context, string, string, bool, bool, string) (*schema.TranscriptionResult, error) { + return m.transcribeFinal, nil +} + +func (m *fakeModel) Predict(context.Context, schema.Messages, []string, []string, []string, func(string, backend.TokenUsage) bool, []types.ToolUnion, *types.ToolChoiceUnion, *int, *int, map[string]float64) (func() (backend.LLMResponse, error), error) { + return nil, nil +} + +func (m *fakeModel) TTS(context.Context, string, string, string) (string, *proto.Result, error) { + return m.ttsFile, &proto.Result{Success: true}, nil +} + +func (m *fakeModel) TTSStream(_ context.Context, _, _, _ string, onAudio func(pcm []byte, sampleRate int) error) error { + if m.ttsStreamErr != nil { + return m.ttsStreamErr + } + for _, c := range m.ttsStreamChunks { + if err := onAudio(c, m.ttsStreamRate); err != nil { + return err + } + } + return nil +} + +func (m *fakeModel) TranscribeStream(_ context.Context, _, _ string, _, _ bool, _ string, onDelta func(text string)) (*schema.TranscriptionResult, error) { + for _, d := range m.transcribeDeltas { + onDelta(d) + } + return m.transcribeFinal, nil +} + +func (m *fakeModel) PredictConfig() *config.ModelConfig { return m.cfg } diff --git a/core/http/endpoints/openai/realtime_speech.go b/core/http/endpoints/openai/realtime_speech.go new file mode 100644 index 000000000000..b8429f7cafb3 --- /dev/null +++ b/core/http/endpoints/openai/realtime_speech.go @@ -0,0 +1,86 @@ +package openai + +import ( + "context" + "encoding/base64" + "fmt" + "os" + + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + laudio "github.com/mudler/LocalAI/pkg/audio" + "github.com/mudler/LocalAI/pkg/sound" +) + +// emitSpeech synthesizes text and sends the audio to the client. When the +// pipeline opts into TTS streaming it forwards each PCM chunk as its own +// response.output_audio.delta as soon as the backend produces it; otherwise it +// synthesizes the whole utterance and sends it as a single delta. +// +// It deliberately does NOT emit transcript or audio-done events: the caller owns +// those so a streamed reply can be split into several spoken segments that share +// one response/item. +func emitSpeech(ctx context.Context, t Transport, session *Session, responseID, itemID, text string) error { + if text == "" { + return nil + } + + _, isWebRTC := t.(*WebRTCTransport) + + // sendChunk hands one PCM buffer to the transport: WebRTC consumes the raw + // PCM directly (it resamples internally); WebSocket gets base64 PCM at the + // session output rate via a JSON delta event. + sendChunk := func(pcm []byte, sampleRate int) error { + if len(pcm) == 0 { + return nil + } + if err := t.SendAudio(ctx, pcm, sampleRate); err != nil { + return err + } + if isWebRTC { + return nil + } + wsPCM := pcm + if sampleRate != 0 && sampleRate != session.OutputSampleRate { + samples := sound.BytesToInt16sLE(pcm) + resampled := sound.ResampleInt16(samples, sampleRate, session.OutputSampleRate) + wsPCM = sound.Int16toBytesLE(resampled) + } + return t.SendEvent(types.ResponseOutputAudioDeltaEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + ItemID: itemID, + OutputIndex: 0, + ContentIndex: 0, + Delta: base64.StdEncoding.EncodeToString(wsPCM), + }) + } + + language := "" + if session.InputAudioTranscription != nil { + language = session.InputAudioTranscription.Language + } + + if session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamTTS() { + return session.ModelInterface.TTSStream(ctx, text, session.Voice, language, sendChunk) + } + + // Unary fallback: synthesize the whole utterance to a file, then emit once. + audioFilePath, res, err := session.ModelInterface.TTS(ctx, text, session.Voice, language) + if err != nil { + return err + } + if res != nil && !res.Success { + return fmt.Errorf("tts generation failed: %s", res.Message) + } + defer func() { _ = os.Remove(audioFilePath) }() + + audioBytes, err := os.ReadFile(audioFilePath) + if err != nil { + return fmt.Errorf("read tts audio: %w", err) + } + pcm, sampleRate := laudio.ParseWAV(audioBytes) + if sampleRate == 0 { + sampleRate = session.OutputSampleRate + } + return sendChunk(pcm, sampleRate) +} diff --git a/core/http/endpoints/openai/realtime_speech_test.go b/core/http/endpoints/openai/realtime_speech_test.go new file mode 100644 index 000000000000..1e7da36c7d02 --- /dev/null +++ b/core/http/endpoints/openai/realtime_speech_test.go @@ -0,0 +1,68 @@ +package openai + +import ( + "context" + "os" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + laudio "github.com/mudler/LocalAI/pkg/audio" +) + +// emitSpeech synthesizes a piece of text and forwards the audio to the client, +// streaming a delta per TTS chunk when the pipeline opts in, or sending the +// whole utterance as one delta otherwise. +var _ = Describe("emitSpeech", func() { + ttsOn := true + + streamingSession := func(m Model) *Session { + return &Session{ + OutputSampleRate: 24000, + ModelInterface: m, + ModelConfig: &config.ModelConfig{ + Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{TTS: &ttsOn}}, + }, + } + } + + It("streams one output_audio.delta per TTS chunk when streaming is enabled", func() { + m := &fakeModel{ + ttsStreamChunks: [][]byte{{1, 2}, {3, 4}, {5, 6}}, + ttsStreamRate: 24000, + } + t := &fakeTransport{} + + err := emitSpeech(context.Background(), t, streamingSession(m), "resp1", "item1", "Hello there.") + + Expect(err).ToNot(HaveOccurred()) + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(3)) + }) + + It("sends a single output_audio.delta in unary mode", func() { + // A minimal real WAV file for the unary TTS path to read + parse. + f, err := os.CreateTemp("", "emit-*.wav") + Expect(err).ToNot(HaveOccurred()) + defer os.Remove(f.Name()) + pcm := make([]byte, 320) // 160 samples of silence + hdr := laudio.NewWAVHeader(uint32(len(pcm))) + Expect(hdr.Write(f)).To(Succeed()) + _, err = f.Write(pcm) + Expect(err).ToNot(HaveOccurred()) + Expect(f.Close()).To(Succeed()) + + session := &Session{ + OutputSampleRate: 24000, + ModelInterface: &fakeModel{ttsFile: f.Name()}, + ModelConfig: &config.ModelConfig{}, // streaming off + } + t := &fakeTransport{} + + err = emitSpeech(context.Background(), t, session, "resp1", "item1", "Hello there.") + + Expect(err).ToNot(HaveOccurred()) + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(1)) + }) +}) From 378d6c25cfea145701ee8ceadffa0ff539f55293 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 4 Jun 2026 16:20:28 +0000 Subject: [PATCH 05/15] feat(realtime): route response audio through emitSpeech (streaming TTS) Replace the inline unary TTS block in the response handler with emitSpeech, which streams a response.output_audio.delta per backend PCM chunk when pipeline.streaming.tts is set and otherwise preserves the single-delta unary behaviour. emitSpeech returns the accumulated base64 audio, stored on the conversation item as before. Transcript and audio-done events stay in the handler so later per-segment streaming can reuse emitSpeech. Assisted-by: Claude:claude-opus-4-8 go vet Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime.go | 84 ++++--------------- core/http/endpoints/openai/realtime_speech.go | 28 +++++-- .../endpoints/openai/realtime_speech_test.go | 7 +- 3 files changed, 44 insertions(+), 75 deletions(-) diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index 76dc9fd3f254..bbb11a681c72 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -1719,64 +1719,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa return } - audioFilePath, res, err := session.ModelInterface.TTS(ctx, finalSpeech, session.Voice, session.InputAudioTranscription.Language) - if err != nil { - if ctx.Err() != nil { - xlog.Debug("TTS cancelled (barge-in)") - sendCancelledResponse() - return - } - xlog.Error("TTS failed", "error", err) - sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID) - return - } - if !res.Success { - xlog.Error("TTS failed", "message", res.Message) - sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %s", res.Message), "", item.Assistant.ID) - return - } - defer func() { _ = os.Remove(audioFilePath) }() - - audioBytes, err := os.ReadFile(audioFilePath) - if err != nil { - xlog.Error("failed to read TTS file", "error", err) - sendError(t, "tts_error", fmt.Sprintf("Failed to read TTS audio: %v", err), "", item.Assistant.ID) - return - } - - // Parse WAV header to get raw PCM and the actual sample rate from the TTS backend. - pcmData, ttsSampleRate := laudio.ParseWAV(audioBytes) - if ttsSampleRate == 0 { - ttsSampleRate = localSampleRate - } - xlog.Debug("TTS audio parsed", "raw_bytes", len(audioBytes), "pcm_bytes", len(pcmData), "sample_rate", ttsSampleRate) - - // SendAudio (WebRTC) passes PCM at the TTS sample rate directly to the - // Opus encoder, which resamples to 48kHz internally. This avoids a - // lossy intermediate resample through 16kHz. - // XXX: This is a noop in websocket mode; it's included in the JSON instead - if err := t.SendAudio(ctx, pcmData, ttsSampleRate); err != nil { - if ctx.Err() != nil { - xlog.Debug("Audio playback cancelled (barge-in)") - sendCancelledResponse() - return - } - xlog.Error("failed to send audio via transport", "error", err) - } - - // For WebSocket clients, resample to the session's output rate and - // deliver audio as base64 in JSON events. WebRTC clients already - // received audio over the RTP track, so skip the base64 payload. - if !isWebRTC { - wsPCM := pcmData - if ttsSampleRate != session.OutputSampleRate { - samples := sound.BytesToInt16sLE(pcmData) - resampled := sound.ResampleInt16(samples, ttsSampleRate, session.OutputSampleRate) - wsPCM = sound.Int16toBytesLE(resampled) - } - audioString = base64.StdEncoding.EncodeToString(wsPCM) - } - + // Transcript of the spoken reply (the audio's text). sendEvent(t, types.ResponseOutputAudioTranscriptDeltaEvent{ ServerEventBase: types.ServerEventBase{}, ResponseID: responseID, @@ -1794,15 +1737,24 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa Transcript: finalSpeech, }) + // Synthesize and send the audio. With pipeline.streaming.tts enabled + // emitSpeech forwards a response.output_audio.delta per backend PCM + // chunk as it's produced; otherwise it sends the whole utterance as a + // single delta. The returned base64 audio is stored on the item below. + var err error + audioString, err = emitSpeech(ctx, t, session, responseID, item.Assistant.ID, finalSpeech) + if err != nil { + if ctx.Err() != nil { + xlog.Debug("TTS cancelled (barge-in)") + sendCancelledResponse() + return + } + xlog.Error("TTS failed", "error", err) + sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID) + return + } + if !isWebRTC { - sendEvent(t, types.ResponseOutputAudioDeltaEvent{ - ServerEventBase: types.ServerEventBase{}, - ResponseID: responseID, - ItemID: item.Assistant.ID, - OutputIndex: 0, - ContentIndex: 0, - Delta: audioString, - }) sendEvent(t, types.ResponseOutputAudioDoneEvent{ ServerEventBase: types.ServerEventBase{}, ResponseID: responseID, diff --git a/core/http/endpoints/openai/realtime_speech.go b/core/http/endpoints/openai/realtime_speech.go index b8429f7cafb3..830777371e2a 100644 --- a/core/http/endpoints/openai/realtime_speech.go +++ b/core/http/endpoints/openai/realtime_speech.go @@ -19,13 +19,20 @@ import ( // It deliberately does NOT emit transcript or audio-done events: the caller owns // those so a streamed reply can be split into several spoken segments that share // one response/item. -func emitSpeech(ctx context.Context, t Transport, session *Session, responseID, itemID, text string) error { +// +// It returns the base64-encoded audio (at the session output rate) accumulated +// across all chunks, which the caller stores on the conversation item. For +// WebRTC the audio goes over the RTP track instead, so the returned string is +// empty. +func emitSpeech(ctx context.Context, t Transport, session *Session, responseID, itemID, text string) (string, error) { if text == "" { - return nil + return "", nil } _, isWebRTC := t.(*WebRTCTransport) + var wsAudio []byte // PCM at the session output rate, accumulated for the item record + // sendChunk hands one PCM buffer to the transport: WebRTC consumes the raw // PCM directly (it resamples internally); WebSocket gets base64 PCM at the // session output rate via a JSON delta event. @@ -45,6 +52,7 @@ func emitSpeech(ctx context.Context, t Transport, session *Session, responseID, resampled := sound.ResampleInt16(samples, sampleRate, session.OutputSampleRate) wsPCM = sound.Int16toBytesLE(resampled) } + wsAudio = append(wsAudio, wsPCM...) return t.SendEvent(types.ResponseOutputAudioDeltaEvent{ ServerEventBase: types.ServerEventBase{}, ResponseID: responseID, @@ -61,26 +69,32 @@ func emitSpeech(ctx context.Context, t Transport, session *Session, responseID, } if session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamTTS() { - return session.ModelInterface.TTSStream(ctx, text, session.Voice, language, sendChunk) + if err := session.ModelInterface.TTSStream(ctx, text, session.Voice, language, sendChunk); err != nil { + return "", err + } + return base64.StdEncoding.EncodeToString(wsAudio), nil } // Unary fallback: synthesize the whole utterance to a file, then emit once. audioFilePath, res, err := session.ModelInterface.TTS(ctx, text, session.Voice, language) if err != nil { - return err + return "", err } if res != nil && !res.Success { - return fmt.Errorf("tts generation failed: %s", res.Message) + return "", fmt.Errorf("tts generation failed: %s", res.Message) } defer func() { _ = os.Remove(audioFilePath) }() audioBytes, err := os.ReadFile(audioFilePath) if err != nil { - return fmt.Errorf("read tts audio: %w", err) + return "", fmt.Errorf("read tts audio: %w", err) } pcm, sampleRate := laudio.ParseWAV(audioBytes) if sampleRate == 0 { sampleRate = session.OutputSampleRate } - return sendChunk(pcm, sampleRate) + if err := sendChunk(pcm, sampleRate); err != nil { + return "", err + } + return base64.StdEncoding.EncodeToString(wsAudio), nil } diff --git a/core/http/endpoints/openai/realtime_speech_test.go b/core/http/endpoints/openai/realtime_speech_test.go index 1e7da36c7d02..268e6a87757e 100644 --- a/core/http/endpoints/openai/realtime_speech_test.go +++ b/core/http/endpoints/openai/realtime_speech_test.go @@ -2,6 +2,7 @@ package openai import ( "context" + "encoding/base64" "os" . "github.com/onsi/ginkgo/v2" @@ -35,10 +36,12 @@ var _ = Describe("emitSpeech", func() { } t := &fakeTransport{} - err := emitSpeech(context.Background(), t, streamingSession(m), "resp1", "item1", "Hello there.") + audio, err := emitSpeech(context.Background(), t, streamingSession(m), "resp1", "item1", "Hello there.") Expect(err).ToNot(HaveOccurred()) Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(3)) + // The returned audio is the base64 of all chunks concatenated. + Expect(audio).To(Equal(base64.StdEncoding.EncodeToString([]byte{1, 2, 3, 4, 5, 6}))) }) It("sends a single output_audio.delta in unary mode", func() { @@ -60,7 +63,7 @@ var _ = Describe("emitSpeech", func() { } t := &fakeTransport{} - err = emitSpeech(context.Background(), t, session, "resp1", "item1", "Hello there.") + _, err = emitSpeech(context.Background(), t, session, "resp1", "item1", "Hello there.") Expect(err).ToNot(HaveOccurred()) Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(1)) From 98ed541b228b73bcab17a332ffc4b36ec80dac6a Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 4 Jun 2026 16:23:04 +0000 Subject: [PATCH 06/15] feat(realtime): streaming transcription text deltas Add emitTranscription and route commitUtterance through it. With pipeline.streaming.transcription set it streams each transcript fragment as a conversation.item.input_audio_transcription.delta via TranscribeStream then a completed event; otherwise it preserves the single completed-event unary behaviour. Returns the final transcript for response generation. Assisted-by: Claude:claude-opus-4-8 go vet Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime.go | 22 ++----- .../openai/realtime_transcription.go | 63 +++++++++++++++++++ .../openai/realtime_transcription_test.go | 54 ++++++++++++++++ 3 files changed, 122 insertions(+), 17 deletions(-) create mode 100644 core/http/endpoints/openai/realtime_transcription.go create mode 100644 core/http/endpoints/openai/realtime_transcription_test.go diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index bbb11a681c72..b14f64b4d09f 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -1260,27 +1260,15 @@ func commitUtterance(ctx context.Context, utt []byte, session *Session, conv *Co // TODO: If we have a real any-to-any model then transcription is optional var transcript string if session.InputAudioTranscription != nil { - tr, err := session.ModelInterface.Transcribe(ctx, f.Name(), session.InputAudioTranscription.Language, false, false, session.InputAudioTranscription.Prompt) + // emitTranscription streams transcript deltas when + // pipeline.streaming.transcription is set, otherwise emits a single + // completed event; either way it returns the final transcript text. + var err error + transcript, err = emitTranscription(ctx, t, session, generateItemID(), f.Name()) if err != nil { sendError(t, "transcription_failed", err.Error(), "", "event_TODO") return - } else if tr == nil { - sendError(t, "transcription_failed", "trancribe result is nil", "", "event_TODO") - return } - - transcript = tr.Text - sendEvent(t, types.ConversationItemInputAudioTranscriptionCompletedEvent{ - ServerEventBase: types.ServerEventBase{ - EventID: "event_TODO", - }, - - ItemID: generateItemID(), - // ResponseID: "resp_TODO", // Not needed for transcription completed event - // OutputIndex: 0, - ContentIndex: 0, - Transcript: transcript, - }) } else { sendNotImplemented(t, "any-to-any models") return diff --git a/core/http/endpoints/openai/realtime_transcription.go b/core/http/endpoints/openai/realtime_transcription.go new file mode 100644 index 000000000000..44456101c44f --- /dev/null +++ b/core/http/endpoints/openai/realtime_transcription.go @@ -0,0 +1,63 @@ +package openai + +import ( + "context" + "fmt" + + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" +) + +// emitTranscription transcribes a committed utterance and emits the transcription +// events for it, returning the final transcript text. With +// pipeline.streaming.transcription enabled it streams each transcript fragment as +// a conversation.item.input_audio_transcription.delta as the backend produces it, +// then a completed event; otherwise it transcribes the whole utterance and emits +// a single completed event. delta and completed events share itemID. +func emitTranscription(ctx context.Context, t Transport, session *Session, itemID, audioPath string) (string, error) { + cfg := session.InputAudioTranscription + + if session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamTranscription() { + final, err := session.ModelInterface.TranscribeStream(ctx, audioPath, cfg.Language, false, false, cfg.Prompt, func(delta string) { + _ = t.SendEvent(types.ConversationItemInputAudioTranscriptionDeltaEvent{ + ServerEventBase: types.ServerEventBase{EventID: "event_TODO"}, + ItemID: itemID, + ContentIndex: 0, + Delta: delta, + }) + }) + if err != nil { + return "", err + } + transcript := "" + if final != nil { + transcript = final.Text + } + if err := t.SendEvent(types.ConversationItemInputAudioTranscriptionCompletedEvent{ + ServerEventBase: types.ServerEventBase{EventID: "event_TODO"}, + ItemID: itemID, + ContentIndex: 0, + Transcript: transcript, + }); err != nil { + return "", err + } + return transcript, nil + } + + // Unary fallback: transcribe the whole utterance, emit one completed event. + tr, err := session.ModelInterface.Transcribe(ctx, audioPath, cfg.Language, false, false, cfg.Prompt) + if err != nil { + return "", err + } + if tr == nil { + return "", fmt.Errorf("transcribe result is nil") + } + if err := t.SendEvent(types.ConversationItemInputAudioTranscriptionCompletedEvent{ + ServerEventBase: types.ServerEventBase{EventID: "event_TODO"}, + ItemID: itemID, + ContentIndex: 0, + Transcript: tr.Text, + }); err != nil { + return "", err + } + return tr.Text, nil +} diff --git a/core/http/endpoints/openai/realtime_transcription_test.go b/core/http/endpoints/openai/realtime_transcription_test.go new file mode 100644 index 000000000000..f3f760fd8a5f --- /dev/null +++ b/core/http/endpoints/openai/realtime_transcription_test.go @@ -0,0 +1,54 @@ +package openai + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + "github.com/mudler/LocalAI/core/schema" +) + +// emitTranscription transcribes a committed utterance, streaming transcript text +// deltas when the pipeline opts in, and returns the final transcript text. +var _ = Describe("emitTranscription", func() { + It("streams transcription deltas then a completed event when streaming is enabled", func() { + on := true + session := &Session{ + InputAudioTranscription: &types.AudioTranscription{}, + ModelConfig: &config.ModelConfig{ + Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{Transcription: &on}}, + }, + ModelInterface: &fakeModel{ + transcribeDeltas: []string{"Hel", "lo", " world"}, + transcribeFinal: &schema.TranscriptionResult{Text: "Hello world"}, + }, + } + t := &fakeTransport{} + + transcript, err := emitTranscription(context.Background(), t, session, "item1", "/tmp/x.wav") + + Expect(err).ToNot(HaveOccurred()) + Expect(transcript).To(Equal("Hello world")) + Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionDelta)).To(Equal(3)) + Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionCompleted)).To(Equal(1)) + }) + + It("emits a single completed event with no deltas in unary mode", func() { + session := &Session{ + InputAudioTranscription: &types.AudioTranscription{}, + ModelConfig: &config.ModelConfig{}, // streaming off + ModelInterface: &fakeModel{transcribeFinal: &schema.TranscriptionResult{Text: "Hi"}}, + } + t := &fakeTransport{} + + transcript, err := emitTranscription(context.Background(), t, session, "item1", "/tmp/x.wav") + + Expect(err).ToNot(HaveOccurred()) + Expect(transcript).To(Equal("Hi")) + Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionDelta)).To(Equal(0)) + Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionCompleted)).To(Equal(1)) + }) +}) From 685e4632d7ef96c7419e7d818380e781da8d97a5 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 4 Jun 2026 16:25:53 +0000 Subject: [PATCH 07/15] feat(realtime): pipeline disable_thinking maps to enable_thinking off applyPipelineThinking forces the LLM's ReasoningConfig.DisableReasoning when pipeline.disable_thinking is set, which gRPCPredictOpts turns into the enable_thinking=false backend metadata. Applied at newModel construction on the per-session LLM config copy, so it doesn't leak to other model users and needs no realtime-specific request plumbing. Assisted-by: Claude:claude-opus-4-8 go vet Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime_model.go | 4 ++- .../endpoints/openai/realtime_thinking.go | 17 ++++++++++++ .../openai/realtime_thinking_test.go | 26 +++++++++++++++++++ 3 files changed, 46 insertions(+), 1 deletion(-) create mode 100644 core/http/endpoints/openai/realtime_thinking.go create mode 100644 core/http/endpoints/openai/realtime_thinking_test.go diff --git a/core/http/endpoints/openai/realtime_model.go b/core/http/endpoints/openai/realtime_model.go index b184393403f5..8281197a3021 100644 --- a/core/http/endpoints/openai/realtime_model.go +++ b/core/http/endpoints/openai/realtime_model.go @@ -528,8 +528,10 @@ func newModel(pipeline *config.Pipeline, cl *config.ModelConfigLoader, ml *model return nil, fmt.Errorf("failed to validate config: %w", err) } - // Let the pipeline set the LLM's reasoning effort (cfgLLM is a per-session copy). + // Let the pipeline set the LLM's reasoning effort and force thinking off + // (cfgLLM is a per-session copy). disable_thinking applies after the effort. applyPipelineReasoning(cfgLLM, *pipeline) + applyPipelineThinking(cfgLLM, *pipeline) cfgTTS, err := cl.LoadModelConfigFileByName(pipeline.TTS, ml.ModelPath) if err != nil { diff --git a/core/http/endpoints/openai/realtime_thinking.go b/core/http/endpoints/openai/realtime_thinking.go new file mode 100644 index 000000000000..41addf963164 --- /dev/null +++ b/core/http/endpoints/openai/realtime_thinking.go @@ -0,0 +1,17 @@ +package openai + +import "github.com/mudler/LocalAI/core/config" + +// applyPipelineThinking forces the LLM's reasoning/thinking off when the realtime +// pipeline sets disable_thinking, mapping to the enable_thinking=false backend +// metadata via ReasoningConfig.DisableReasoning. The LLM config passed in is the +// per-session copy returned by the config loader, so this does not affect other +// users of the same model. When the pipeline does not set disable_thinking the +// LLM config is left untouched. +func applyPipelineThinking(llm *config.ModelConfig, pipeline config.Pipeline) { + if llm == nil || !pipeline.ThinkingDisabled() { + return + } + disable := true + llm.ReasoningConfig.DisableReasoning = &disable +} diff --git a/core/http/endpoints/openai/realtime_thinking_test.go b/core/http/endpoints/openai/realtime_thinking_test.go new file mode 100644 index 000000000000..6a38fa86d411 --- /dev/null +++ b/core/http/endpoints/openai/realtime_thinking_test.go @@ -0,0 +1,26 @@ +package openai + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/config" +) + +// applyPipelineThinking lets a realtime pipeline force the LLM's thinking off +// (enable_thinking=false metadata) without editing the LLM model config. +var _ = Describe("applyPipelineThinking", func() { + It("disables reasoning on the LLM config when the pipeline disables thinking", func() { + disable := true + llm := &config.ModelConfig{} + applyPipelineThinking(llm, config.Pipeline{DisableThinking: &disable}) + Expect(llm.ReasoningConfig.DisableReasoning).ToNot(BeNil()) + Expect(*llm.ReasoningConfig.DisableReasoning).To(BeTrue()) + }) + + It("leaves the LLM config untouched when the pipeline does not set disable_thinking", func() { + llm := &config.ModelConfig{} + applyPipelineThinking(llm, config.Pipeline{}) + Expect(llm.ReasoningConfig.DisableReasoning).To(BeNil()) + }) +}) From ca23d05c66ead2caa8b8dcc679dc22c4d4a31d34 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 4 Jun 2026 16:36:11 +0000 Subject: [PATCH 08/15] feat(realtime): speechStreamer for token-streamed LLM->TTS emitSpeech now returns raw PCM (caller base64-encodes) so streamed segments accumulate correctly. speechStreamer consumes streamed LLM tokens: it strips reasoning via the streaming ReasoningExtractor, emits a transcript delta per content fragment, and sentence-pipes content into emitSpeech so each sentence is synthesized as soon as it's ready. Handler wiring (plain-content turns) follows. Assisted-by: Claude:claude-opus-4-8 go vet Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime.go | 8 +- core/http/endpoints/openai/realtime_speech.go | 25 +++--- .../endpoints/openai/realtime_speech_test.go | 5 +- core/http/endpoints/openai/realtime_stream.go | 86 +++++++++++++++++++ .../endpoints/openai/realtime_stream_test.go | 65 ++++++++++++++ 5 files changed, 170 insertions(+), 19 deletions(-) create mode 100644 core/http/endpoints/openai/realtime_stream.go create mode 100644 core/http/endpoints/openai/realtime_stream_test.go diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index b14f64b4d09f..14971ff1fe51 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -1728,9 +1728,8 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa // Synthesize and send the audio. With pipeline.streaming.tts enabled // emitSpeech forwards a response.output_audio.delta per backend PCM // chunk as it's produced; otherwise it sends the whole utterance as a - // single delta. The returned base64 audio is stored on the item below. - var err error - audioString, err = emitSpeech(ctx, t, session, responseID, item.Assistant.ID, finalSpeech) + // single delta. The returned PCM is stored (base64) on the item below. + pcmAudio, err := emitSpeech(ctx, t, session, responseID, item.Assistant.ID, finalSpeech) if err != nil { if ctx.Err() != nil { xlog.Debug("TTS cancelled (barge-in)") @@ -1741,6 +1740,9 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID) return } + if !isWebRTC { + audioString = base64.StdEncoding.EncodeToString(pcmAudio) + } if !isWebRTC { sendEvent(t, types.ResponseOutputAudioDoneEvent{ diff --git a/core/http/endpoints/openai/realtime_speech.go b/core/http/endpoints/openai/realtime_speech.go index 830777371e2a..2b98b1b4e956 100644 --- a/core/http/endpoints/openai/realtime_speech.go +++ b/core/http/endpoints/openai/realtime_speech.go @@ -20,13 +20,12 @@ import ( // those so a streamed reply can be split into several spoken segments that share // one response/item. // -// It returns the base64-encoded audio (at the session output rate) accumulated -// across all chunks, which the caller stores on the conversation item. For -// WebRTC the audio goes over the RTP track instead, so the returned string is -// empty. -func emitSpeech(ctx context.Context, t Transport, session *Session, responseID, itemID, text string) (string, error) { +// It returns the PCM audio (at the session output rate) accumulated across all +// chunks, which the caller base64-encodes onto the conversation item. For WebRTC +// the audio goes over the RTP track instead, so the returned slice is empty. +func emitSpeech(ctx context.Context, t Transport, session *Session, responseID, itemID, text string) ([]byte, error) { if text == "" { - return "", nil + return nil, nil } _, isWebRTC := t.(*WebRTCTransport) @@ -70,31 +69,31 @@ func emitSpeech(ctx context.Context, t Transport, session *Session, responseID, if session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamTTS() { if err := session.ModelInterface.TTSStream(ctx, text, session.Voice, language, sendChunk); err != nil { - return "", err + return nil, err } - return base64.StdEncoding.EncodeToString(wsAudio), nil + return wsAudio, nil } // Unary fallback: synthesize the whole utterance to a file, then emit once. audioFilePath, res, err := session.ModelInterface.TTS(ctx, text, session.Voice, language) if err != nil { - return "", err + return nil, err } if res != nil && !res.Success { - return "", fmt.Errorf("tts generation failed: %s", res.Message) + return nil, fmt.Errorf("tts generation failed: %s", res.Message) } defer func() { _ = os.Remove(audioFilePath) }() audioBytes, err := os.ReadFile(audioFilePath) if err != nil { - return "", fmt.Errorf("read tts audio: %w", err) + return nil, fmt.Errorf("read tts audio: %w", err) } pcm, sampleRate := laudio.ParseWAV(audioBytes) if sampleRate == 0 { sampleRate = session.OutputSampleRate } if err := sendChunk(pcm, sampleRate); err != nil { - return "", err + return nil, err } - return base64.StdEncoding.EncodeToString(wsAudio), nil + return wsAudio, nil } diff --git a/core/http/endpoints/openai/realtime_speech_test.go b/core/http/endpoints/openai/realtime_speech_test.go index 268e6a87757e..6d09a72174a9 100644 --- a/core/http/endpoints/openai/realtime_speech_test.go +++ b/core/http/endpoints/openai/realtime_speech_test.go @@ -2,7 +2,6 @@ package openai import ( "context" - "encoding/base64" "os" . "github.com/onsi/ginkgo/v2" @@ -40,8 +39,8 @@ var _ = Describe("emitSpeech", func() { Expect(err).ToNot(HaveOccurred()) Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(3)) - // The returned audio is the base64 of all chunks concatenated. - Expect(audio).To(Equal(base64.StdEncoding.EncodeToString([]byte{1, 2, 3, 4, 5, 6}))) + // The returned audio is all chunks concatenated (session output rate). + Expect(audio).To(Equal([]byte{1, 2, 3, 4, 5, 6})) }) It("sends a single output_audio.delta in unary mode", func() { diff --git a/core/http/endpoints/openai/realtime_stream.go b/core/http/endpoints/openai/realtime_stream.go new file mode 100644 index 000000000000..aa3f31d7d057 --- /dev/null +++ b/core/http/endpoints/openai/realtime_stream.go @@ -0,0 +1,86 @@ +package openai + +import ( + "context" + + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + "github.com/mudler/LocalAI/pkg/reasoning" +) + +// speechStreamer consumes streamed LLM tokens and drives the realtime output: +// it strips reasoning incrementally, emits a transcript text delta for each +// content fragment, and — when the pipeline streams TTS — sentence-pipes the +// content so each completed sentence is synthesized as soon as it's ready, +// overlapping generation, synthesis and playback. +// +// It is used only for plain-content turns (no tools): tool-call output can't be +// safely spoken mid-stream, so those turns keep the buffered path. +type speechStreamer struct { + ctx context.Context + t Transport + session *Session + responseID string + itemID string + + extractor *reasoning.ReasoningExtractor + seg streamSegmenter + audio []byte + streamTTS bool + err error +} + +func newSpeechStreamer(ctx context.Context, t Transport, session *Session, responseID, itemID, thinkingStartToken string, reasoningCfg reasoning.Config) *speechStreamer { + return &speechStreamer{ + ctx: ctx, + t: t, + session: session, + responseID: responseID, + itemID: itemID, + extractor: reasoning.NewReasoningExtractor(thinkingStartToken, reasoningCfg), + streamTTS: session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamTTS(), + } +} + +// onToken handles one streamed LLM token. It is shaped to be used directly as +// the backend token callback's text sink. +func (s *speechStreamer) onToken(token string) { + _, content := s.extractor.ProcessToken(token) + if content == "" { + return + } + _ = s.t.SendEvent(types.ResponseOutputAudioTranscriptDeltaEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: s.responseID, + ItemID: s.itemID, + OutputIndex: 0, + ContentIndex: 0, + Delta: content, + }) + if s.streamTTS { + for _, segment := range s.seg.Push(content) { + s.speak(segment) + } + } +} + +func (s *speechStreamer) speak(text string) { + pcm, err := emitSpeech(s.ctx, s.t, s.session, s.responseID, s.itemID, text) + if err != nil { + if s.err == nil { + s.err = err + } + return + } + s.audio = append(s.audio, pcm...) +} + +// finish flushes any buffered sentence to TTS and returns the full cleaned +// content, the accumulated PCM audio, and the first error encountered (if any). +func (s *speechStreamer) finish() (content string, audio []byte, err error) { + if s.streamTTS { + if rem := s.seg.Flush(); rem != "" { + s.speak(rem) + } + } + return s.extractor.CleanedContent(), s.audio, s.err +} diff --git a/core/http/endpoints/openai/realtime_stream_test.go b/core/http/endpoints/openai/realtime_stream_test.go new file mode 100644 index 000000000000..a6d233175ed1 --- /dev/null +++ b/core/http/endpoints/openai/realtime_stream_test.go @@ -0,0 +1,65 @@ +package openai + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + "github.com/mudler/LocalAI/pkg/reasoning" +) + +// speechStreamer consumes streamed LLM tokens: it strips reasoning, emits a +// transcript delta per content fragment, and sentence-pipes content into TTS so +// audio starts before the full reply is generated. +var _ = Describe("speechStreamer", func() { + It("emits a transcript delta per token and speaks each completed sentence", func() { + on := true + m := &fakeModel{ttsStreamChunks: [][]byte{{7}}, ttsStreamRate: 24000} + session := &Session{ + OutputSampleRate: 24000, + ModelInterface: m, + ModelConfig: &config.ModelConfig{ + Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{TTS: &on}}, + }, + } + t := &fakeTransport{} + s := newSpeechStreamer(context.Background(), t, session, "resp1", "item1", "", reasoning.Config{}) + + for _, tok := range []string{"Hello", " world.", " Bye"} { + s.onToken(tok) + } + content, audio, err := s.finish() + + Expect(err).ToNot(HaveOccurred()) + Expect(content).To(Equal("Hello world. Bye")) + // One transcript delta per (non-empty) token. + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(3)) + // Two sentences spoken: "Hello world." mid-stream + "Bye" on flush; one + // chunk each. + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(2)) + Expect(audio).To(Equal([]byte{7, 7})) + }) + + It("does not synthesize audio when TTS streaming is disabled", func() { + m := &fakeModel{ttsStreamChunks: [][]byte{{7}}, ttsStreamRate: 24000} + session := &Session{ + OutputSampleRate: 24000, + ModelInterface: m, + ModelConfig: &config.ModelConfig{}, // streaming.tts off + } + t := &fakeTransport{} + s := newSpeechStreamer(context.Background(), t, session, "resp1", "item1", "", reasoning.Config{}) + + s.onToken("Hello world.") + content, audio, err := s.finish() + + Expect(err).ToNot(HaveOccurred()) + Expect(content).To(Equal("Hello world.")) + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(1)) + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(0)) + Expect(audio).To(BeEmpty()) + }) +}) From 16a5bab71fa9c6e9d2dade8887694c5ca72a37dc Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 4 Jun 2026 16:40:32 +0000 Subject: [PATCH 09/15] feat(realtime): wire streamLLMResponse for token-streamed replies triggerResponseAtTurn takes a streamed path when pipeline.streaming.llm is set, the turn has no tools, and audio is requested: streamLLMResponse announces the assistant item, drives the LLM token callback through a speechStreamer (reasoning-stripped transcript deltas + sentence-piped TTS), and emits the terminal events. Tool turns and non-streaming pipelines keep the existing buffered path unchanged, so this is strictly opt-in. Assisted-by: Claude:claude-opus-4-8 go vet Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime.go | 17 +++ core/http/endpoints/openai/realtime_stream.go | 144 ++++++++++++++++++ 2 files changed, 161 insertions(+) diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index 14971ff1fe51..078cf4a5b999 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -1496,6 +1496,23 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa }, }) + // Streamed LLM path: when the pipeline opts into LLM streaming and the turn + // cannot produce a tool call (no tools), stream tokens straight to the client + // as transcript deltas and sentence-pipe them into TTS. Tool turns fall + // through to the buffered path below, since partial tool-call output can't be + // safely spoken mid-stream. + if config != nil && session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamLLM() && len(tools) == 0 { + var respMods []types.Modality + if overrides != nil { + respMods = overrides.OutputModalities + } + if modalitiesContainAudio(resolveOutputModalities(session.OutputModalities, respMods)) { + if streamLLMResponse(ctx, session, conv, t, responseID, conversationHistory, images, config) { + return + } + } + } + predFunc, err := session.ModelInterface.Predict(ctx, conversationHistory, images, nil, nil, nil, tools, toolChoice, nil, nil, nil) if err != nil { sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", "") // item.Assistant.ID is unknown here diff --git a/core/http/endpoints/openai/realtime_stream.go b/core/http/endpoints/openai/realtime_stream.go index aa3f31d7d057..015f6850efe1 100644 --- a/core/http/endpoints/openai/realtime_stream.go +++ b/core/http/endpoints/openai/realtime_stream.go @@ -2,8 +2,13 @@ package openai import ( "context" + "encoding/base64" + "fmt" + "github.com/mudler/LocalAI/core/backend" + "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + "github.com/mudler/LocalAI/core/schema" "github.com/mudler/LocalAI/pkg/reasoning" ) @@ -84,3 +89,142 @@ func (s *speechStreamer) finish() (content string, audio []byte, err error) { } return s.extractor.CleanedContent(), s.audio, s.err } + +// streamLLMResponse drives a streamed, plain-content (no tools) realtime reply. +// It announces the assistant item before tokens arrive, feeds the LLM token +// callback through a speechStreamer (transcript deltas + sentence-piped TTS), +// then emits the terminal events. It returns true when it has fully handled the +// response so the caller can return; callers must only invoke it for turns with +// no tools and an audio modality (see triggerResponseAtTurn). +func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, history schema.Messages, images []string, llmCfg *config.ModelConfig) bool { + // Announce the assistant item up front so streamed deltas target a known item. + item := types.MessageItemUnion{ + Assistant: &types.MessageItemAssistant{ + ID: generateItemID(), + Status: types.ItemStatusInProgress, + Content: []types.MessageContentOutput{{Type: types.MessageContentTypeOutputAudio}}, + }, + } + conv.Lock.Lock() + conv.Items = append(conv.Items, &item) + conv.Lock.Unlock() + + sendEvent(t, types.ResponseOutputItemAddedEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + OutputIndex: 0, + Item: item, + }) + sendEvent(t, types.ResponseContentPartAddedEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + ItemID: item.Assistant.ID, + OutputIndex: 0, + ContentIndex: 0, + Part: item.Assistant.Content[0], + }) + + cancel := func() { + conv.Lock.Lock() + for i := len(conv.Items) - 1; i >= 0; i-- { + if conv.Items[i].Assistant != nil && conv.Items[i].Assistant.ID == item.Assistant.ID { + conv.Items = append(conv.Items[:i], conv.Items[i+1:]...) + break + } + } + conv.Lock.Unlock() + sendEvent(t, types.ResponseDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + Response: types.Response{ID: responseID, Object: "realtime.response", Status: types.ResponseStatusCancelled}, + }) + } + + var template string + if llmCfg.TemplateConfig.UseTokenizerTemplate { + template = llmCfg.GetModelTemplate() + } else { + template = llmCfg.TemplateConfig.Chat + } + thinkingStartToken := reasoning.DetectThinkingStartToken(template, &llmCfg.ReasoningConfig) + + streamer := newSpeechStreamer(ctx, t, session, responseID, item.Assistant.ID, thinkingStartToken, llmCfg.ReasoningConfig) + cb := func(token string, _ backend.TokenUsage) bool { + if ctx.Err() != nil { + return false + } + streamer.onToken(token) + return true + } + + predFunc, err := session.ModelInterface.Predict(ctx, history, images, nil, nil, cb, nil, nil, nil, nil, nil) + if err != nil { + sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", item.Assistant.ID) + return true + } + if _, err := predFunc(); err != nil { + if ctx.Err() != nil { + cancel() + return true + } + sendError(t, "prediction_failed", fmt.Sprintf("backend error: %v", err), "", item.Assistant.ID) + return true + } + if ctx.Err() != nil { + cancel() + return true + } + + content, audio, err := streamer.finish() + if err != nil { + sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID) + return true + } + + _, isWebRTC := t.(*WebRTCTransport) + + sendEvent(t, types.ResponseOutputAudioTranscriptDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + ItemID: item.Assistant.ID, + OutputIndex: 0, + ContentIndex: 0, + Transcript: content, + }) + if !isWebRTC { + sendEvent(t, types.ResponseOutputAudioDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + ItemID: item.Assistant.ID, + OutputIndex: 0, + ContentIndex: 0, + }) + } + + conv.Lock.Lock() + item.Assistant.Status = types.ItemStatusCompleted + item.Assistant.Content[0].Transcript = content + if !isWebRTC { + item.Assistant.Content[0].Audio = base64.StdEncoding.EncodeToString(audio) + } + conv.Lock.Unlock() + + sendEvent(t, types.ResponseContentPartDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + ItemID: item.Assistant.ID, + OutputIndex: 0, + ContentIndex: 0, + Part: item.Assistant.Content[0], + }) + sendEvent(t, types.ResponseOutputItemDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + OutputIndex: 0, + Item: item, + }) + sendEvent(t, types.ResponseDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + Response: types.Response{ID: responseID, Object: "realtime.response", Status: types.ResponseStatusCompleted}, + }) + return true +} From 658a3efb2030db3af2fddab6875b2f21d14322be Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 4 Jun 2026 17:46:05 +0000 Subject: [PATCH 10/15] docs(realtime): document pipeline streaming + disable_thinking Assisted-by: Claude:claude-opus-4-8 Signed-off-by: Ettore Di Giacinto --- docs/content/features/openai-realtime.md | 35 ++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/docs/content/features/openai-realtime.md b/docs/content/features/openai-realtime.md index 8dba6d419845..b9af56ca200e 100644 --- a/docs/content/features/openai-realtime.md +++ b/docs/content/features/openai-realtime.md @@ -31,6 +31,41 @@ This configuration links the following components: Make sure all referenced models (`silero-vad-ggml`, `whisper-large-turbo`, `qwen3-4b`, `tts-1`) are also installed or defined in your LocalAI instance. +### Streaming the pipeline + +By default each stage runs to completion before the next begins: the whole utterance is transcribed, the full LLM reply is generated, then it is synthesized. Each stage can instead be streamed incrementally, which lowers the time-to-first-audio of a turn: + +```yaml +name: gpt-realtime +pipeline: + vad: silero-vad-ggml + transcription: whisper-large-turbo + llm: qwen3-4b + tts: tts-1 + streaming: + llm: true # stream LLM tokens as transcript deltas + tts: true # emit audio deltas per synthesized chunk + transcription: true # stream transcript text deltas of the user's speech +``` + +- **streaming.tts**: emit a `response.output_audio.delta` per audio chunk the TTS backend produces, instead of one delta for the whole utterance. +- **streaming.transcription**: stream `conversation.item.input_audio_transcription.delta` events as the transcript is produced (requires a transcription backend that supports streaming). +- **streaming.llm**: stream the LLM reply token-by-token as `response.output_audio_transcript.delta` events and, when `streaming.tts` is also enabled, synthesize each completed sentence as soon as it is ready — overlapping generation, synthesis and playback. Streaming is used only for turns that cannot produce a tool call; turns with tools fall back to the buffered path so partial tool-call output is never spoken. + +All streaming flags are off by default, so existing pipelines are unaffected. + +### Disabling thinking + +For reasoning models, you can force the pipeline LLM's thinking off without editing the LLM model config: + +```yaml +pipeline: + llm: qwen3-4b + disable_thinking: true # maps to enable_thinking=false for the realtime LLM +``` + +This is applied only to the realtime session's copy of the LLM config, so it does not affect other users of the same model. Leave it unset to use the LLM model config's own reasoning settings. + ## Transports The Realtime API supports two transports: **WebSocket** and **WebRTC**. From f48344f2ff0b322b6329b87fa638f72eea36fd38 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 4 Jun 2026 23:08:49 +0000 Subject: [PATCH 11/15] fix(realtime): register pipeline streaming/thinking config fields TestAllFieldsHaveRegistryEntries (core/config/meta) requires every config field to have a meta registry entry. The four new pipeline fields (disable_thinking, streaming.{llm,tts,transcription}) had none, failing tests-linux/tests-apple. Add toggle entries for them. Also handle the os.Remove return in realtime_speech_test.go to satisfy errcheck (golangci-lint). Assisted-by: Claude:claude-opus-4-8 go test, golangci-lint Signed-off-by: Ettore Di Giacinto --- core/config/meta/registry.go | 28 +++++++++++++++++++ .../endpoints/openai/realtime_speech_test.go | 2 +- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/core/config/meta/registry.go b/core/config/meta/registry.go index 548b218921ba..565bc2d4e272 100644 --- a/core/config/meta/registry.go +++ b/core/config/meta/registry.go @@ -308,6 +308,34 @@ func DefaultRegistry() map[string]FieldMetaOverride { }, Order: 64, }, + "pipeline.disable_thinking": { + Section: "pipeline", + Label: "Disable Thinking", + Description: "Suppress reasoning/thinking output from the pipeline LLM (sets enable_thinking=false on the underlying model). Use for models that emit blocks you don't want spoken or streamed back to the realtime client.", + Component: "toggle", + Order: 65, + }, + "pipeline.streaming.llm": { + Section: "pipeline", + Label: "Stream LLM", + Description: "Stream LLM tokens to the realtime client as they are generated instead of waiting for the full response. Emits incremental response.output_audio_transcript.delta / text deltas.", + Component: "toggle", + Order: 66, + }, + "pipeline.streaming.tts": { + Section: "pipeline", + Label: "Stream TTS", + Description: "Stream synthesized audio chunks to the realtime client as they are produced (requires a TTS backend that implements TTSStream). Falls back to unary synthesis otherwise.", + Component: "toggle", + Order: 67, + }, + "pipeline.streaming.transcription": { + Section: "pipeline", + Label: "Stream Transcription", + Description: "Stream partial transcription text to the realtime client as the STT backend produces it (requires a transcription backend that implements AudioTranscriptionStream). Falls back to unary transcription otherwise.", + Component: "toggle", + Order: 68, + }, // --- Functions --- "function.grammar.parallel_calls": { diff --git a/core/http/endpoints/openai/realtime_speech_test.go b/core/http/endpoints/openai/realtime_speech_test.go index 6d09a72174a9..a501f946cc84 100644 --- a/core/http/endpoints/openai/realtime_speech_test.go +++ b/core/http/endpoints/openai/realtime_speech_test.go @@ -47,7 +47,7 @@ var _ = Describe("emitSpeech", func() { // A minimal real WAV file for the unary TTS path to read + parse. f, err := os.CreateTemp("", "emit-*.wav") Expect(err).ToNot(HaveOccurred()) - defer os.Remove(f.Name()) + defer func() { _ = os.Remove(f.Name()) }() pcm := make([]byte, 320) // 160 samples of silence hdr := laudio.NewWAVHeader(uint32(len(pcm))) Expect(hdr.Write(f)).To(Succeed()) From cb3609530a8944244f696e6464d1116fd8c93d07 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 4 Jun 2026 23:18:37 +0000 Subject: [PATCH 12/15] fix(realtime): always strip reasoning from spoken output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit disable_thinking maps to ReasoningConfig.DisableReasoning=true on the LLM config, which the backend reads as enable_thinking=false. But the realtime handler reads that SAME config to drive reasoning extraction, and there DisableReasoning=true means "skip stripping". PredictConfig() returns this LLM config, so both the streamed (speechStreamer) and buffered realtime paths stopped stripping exactly when disable_thinking was on — leaking raw reasoning to the client whenever the model ignored the enable_thinking hint (e.g. lfm2.5). Add spokenReasoningConfig() which clears DisableReasoning for extraction (keeping custom tokens/tag pairs) and route both realtime paths through it. Spoken output now always strips reasoning, independent of the backend suppression hint. Assisted-by: Claude:claude-opus-4-8 go test, golangci-lint Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime.go | 4 +-- .../endpoints/openai/realtime_doubles_test.go | 13 ++++++++++ core/http/endpoints/openai/realtime_stream.go | 3 +++ .../endpoints/openai/realtime_stream_test.go | 26 +++++++++++++++++++ .../endpoints/openai/realtime_thinking.go | 18 ++++++++++++- .../openai/realtime_thinking_test.go | 24 +++++++++++++++++ 6 files changed, 85 insertions(+), 3 deletions(-) diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index 078cf4a5b999..bc2a8078534e 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -1590,7 +1590,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa // ExtractReasoningWithConfig is a no-op when no tag pair matches, // so it's safe to apply unconditionally in the no-reasoning branch. if deltaReasoning == "" && deltaContent != "" { - deltaReasoning, deltaContent = reasoning.ExtractReasoningWithConfig(deltaContent, thinkingStartToken, config.ReasoningConfig) + deltaReasoning, deltaContent = reasoning.ExtractReasoningWithConfig(deltaContent, thinkingStartToken, spokenReasoningConfig(config.ReasoningConfig)) } reasoningText = deltaReasoning responseWithoutReasoning = deltaContent @@ -1598,7 +1598,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa cleanedResponse = deltaContent toolCalls = deltaToolCalls } else { - reasoningText, responseWithoutReasoning = reasoning.ExtractReasoningWithConfig(rawResponse, thinkingStartToken, config.ReasoningConfig) + reasoningText, responseWithoutReasoning = reasoning.ExtractReasoningWithConfig(rawResponse, thinkingStartToken, spokenReasoningConfig(config.ReasoningConfig)) textContent = functions.ParseTextContent(responseWithoutReasoning, config.FunctionsConfig) cleanedResponse = functions.CleanupLLMResult(responseWithoutReasoning, config.FunctionsConfig) toolCalls = functions.ParseFunctionCall(cleanedResponse, config.FunctionsConfig) diff --git a/core/http/endpoints/openai/realtime_doubles_test.go b/core/http/endpoints/openai/realtime_doubles_test.go index 2a54f3dbe47c..afb1f5e7a31a 100644 --- a/core/http/endpoints/openai/realtime_doubles_test.go +++ b/core/http/endpoints/openai/realtime_doubles_test.go @@ -2,6 +2,7 @@ package openai import ( "context" + "strings" "github.com/mudler/LocalAI/core/backend" "github.com/mudler/LocalAI/core/config" @@ -48,6 +49,18 @@ func (f *fakeTransport) countEvents(et types.ServerEventType) int { return n } +// transcriptDeltaText concatenates the Delta of every recorded transcript +// delta event — i.e. the text streamed to the client as it is generated. +func (f *fakeTransport) transcriptDeltaText() string { + var b strings.Builder + for _, e := range f.events { + if d, ok := e.(types.ResponseOutputAudioTranscriptDeltaEvent); ok { + b.WriteString(d.Delta) + } + } + return b.String() +} + // fakeModel is a configurable Model double. TTSStream replays ttsStreamChunks // and TranscribeStream replays transcribeDeltas, so the handler's streaming // paths can be driven deterministically. diff --git a/core/http/endpoints/openai/realtime_stream.go b/core/http/endpoints/openai/realtime_stream.go index 015f6850efe1..09526c561fe5 100644 --- a/core/http/endpoints/openai/realtime_stream.go +++ b/core/http/endpoints/openai/realtime_stream.go @@ -35,6 +35,9 @@ type speechStreamer struct { } func newSpeechStreamer(ctx context.Context, t Transport, session *Session, responseID, itemID, thinkingStartToken string, reasoningCfg reasoning.Config) *speechStreamer { + // Spoken output must never contain reasoning, even when disable_thinking set + // DisableReasoning (which would otherwise turn the extractor's stripping off). + reasoningCfg = spokenReasoningConfig(reasoningCfg) return &speechStreamer{ ctx: ctx, t: t, diff --git a/core/http/endpoints/openai/realtime_stream_test.go b/core/http/endpoints/openai/realtime_stream_test.go index a6d233175ed1..d8697c331e9f 100644 --- a/core/http/endpoints/openai/realtime_stream_test.go +++ b/core/http/endpoints/openai/realtime_stream_test.go @@ -43,6 +43,32 @@ var _ = Describe("speechStreamer", func() { Expect(audio).To(Equal([]byte{7, 7})) }) + It("strips leaked reasoning even when reasoning is disabled (disable_thinking safety net)", func() { + // disable_thinking maps to ReasoningConfig.DisableReasoning=true (it tells + // the backend enable_thinking=false). When the model ignores that and emits + // thinking anyway, the spoken stream must still not leak it: the streamer is + // the last line of defence and always strips reasoning from spoken content. + disable := true + session := &Session{ + OutputSampleRate: 24000, + ModelInterface: &fakeModel{}, + ModelConfig: &config.ModelConfig{}, // streaming.tts off + } + t := &fakeTransport{} + s := newSpeechStreamer(context.Background(), t, session, "resp1", "item1", "", + reasoning.Config{DisableReasoning: &disable}) + + s.onToken("secret plan") + s.onToken("The answer is 42.") + content, _, err := s.finish() + + Expect(err).ToNot(HaveOccurred()) + Expect(content).To(Equal("The answer is 42.")) + Expect(content).ToNot(ContainSubstring("secret plan")) + // The text streamed to the client must not carry the reasoning either. + Expect(t.transcriptDeltaText()).ToNot(ContainSubstring("secret plan")) + }) + It("does not synthesize audio when TTS streaming is disabled", func() { m := &fakeModel{ttsStreamChunks: [][]byte{{7}}, ttsStreamRate: 24000} session := &Session{ diff --git a/core/http/endpoints/openai/realtime_thinking.go b/core/http/endpoints/openai/realtime_thinking.go index 41addf963164..8222219afefc 100644 --- a/core/http/endpoints/openai/realtime_thinking.go +++ b/core/http/endpoints/openai/realtime_thinking.go @@ -1,6 +1,9 @@ package openai -import "github.com/mudler/LocalAI/core/config" +import ( + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/pkg/reasoning" +) // applyPipelineThinking forces the LLM's reasoning/thinking off when the realtime // pipeline sets disable_thinking, mapping to the enable_thinking=false backend @@ -15,3 +18,16 @@ func applyPipelineThinking(llm *config.ModelConfig, pipeline config.Pipeline) { disable := true llm.ReasoningConfig.DisableReasoning = &disable } + +// spokenReasoningConfig adapts a model's reasoning config for stripping reasoning +// OUT of realtime spoken output. ReasoningConfig.DisableReasoning is overloaded: +// the backend reads it as the "enable_thinking=false" hint (which pipeline +// disable_thinking sets via applyPipelineThinking), but the reasoning extractor +// reads it as "skip stripping, assume there is no reasoning". Honouring the latter +// when extracting for speech would leak raw whenever the model +// ignores the suppression hint. Spoken output must never contain reasoning, so we +// always strip: clear DisableReasoning while keeping custom tokens/tag pairs. +func spokenReasoningConfig(cfg reasoning.Config) reasoning.Config { + cfg.DisableReasoning = nil + return cfg +} diff --git a/core/http/endpoints/openai/realtime_thinking_test.go b/core/http/endpoints/openai/realtime_thinking_test.go index 6a38fa86d411..a056dd0e7700 100644 --- a/core/http/endpoints/openai/realtime_thinking_test.go +++ b/core/http/endpoints/openai/realtime_thinking_test.go @@ -5,6 +5,7 @@ import ( . "github.com/onsi/gomega" "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/pkg/reasoning" ) // applyPipelineThinking lets a realtime pipeline force the LLM's thinking off @@ -24,3 +25,26 @@ var _ = Describe("applyPipelineThinking", func() { Expect(llm.ReasoningConfig.DisableReasoning).To(BeNil()) }) }) + +// spokenReasoningConfig clears DisableReasoning so realtime spoken output always +// strips reasoning, even though disable_thinking sets DisableReasoning=true on the +// LLM config (which the backend reads as enable_thinking=false). +var _ = Describe("spokenReasoningConfig", func() { + It("clears DisableReasoning so the extractor still strips leaked reasoning", func() { + disable := true + out := spokenReasoningConfig(reasoning.Config{DisableReasoning: &disable}) + Expect(out.DisableReasoning).To(BeNil()) + }) + + It("preserves the other reasoning settings", func() { + disable := true + out := spokenReasoningConfig(reasoning.Config{ + DisableReasoning: &disable, + ThinkingStartTokens: []string{""}, + TagPairs: []reasoning.TagPair{{Start: "", End: ""}}, + }) + Expect(out.ThinkingStartTokens).To(Equal([]string{""})) + Expect(out.TagPairs).To(HaveLen(1)) + Expect(out.TagPairs[0].Start).To(Equal("")) + }) +}) From 9ec1456ec61a18af0e7a79ba4ae2ed2ab4c6a263 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Fri, 5 Jun 2026 07:06:14 +0000 Subject: [PATCH 13/15] fix(realtime): clean TTS temp path before read (gosec G304) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit emitSpeech reads the WAV file the TTS backend wrote. The read moved here from realtime.go, so code-scanning flagged it as a new G304 alert even though the path is backend-controlled (a temp file), not user input. Wrap it in filepath.Clean — a real path normalization that also clears the alert, keeping with the repo's no-#nosec convention. Assisted-by: Claude:claude-opus-4-8 gosec, golangci-lint Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime_speech.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/http/endpoints/openai/realtime_speech.go b/core/http/endpoints/openai/realtime_speech.go index 2b98b1b4e956..ec4bbc4b052d 100644 --- a/core/http/endpoints/openai/realtime_speech.go +++ b/core/http/endpoints/openai/realtime_speech.go @@ -5,6 +5,7 @@ import ( "encoding/base64" "fmt" "os" + "path/filepath" "github.com/mudler/LocalAI/core/http/endpoints/openai/types" laudio "github.com/mudler/LocalAI/pkg/audio" @@ -84,7 +85,9 @@ func emitSpeech(ctx context.Context, t Transport, session *Session, responseID, } defer func() { _ = os.Remove(audioFilePath) }() - audioBytes, err := os.ReadFile(audioFilePath) + // filepath.Clean normalizes the backend-produced temp path before reading + // (also keeps gosec G304 quiet — the path is backend-controlled, not user input). + audioBytes, err := os.ReadFile(filepath.Clean(audioFilePath)) if err != nil { return nil, fmt.Errorf("read tts audio: %w", err) } From 076dcdbed876b68ec2e138434226e6dca2e6d543 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Fri, 5 Jun 2026 07:38:38 +0000 Subject: [PATCH 14/15] refactor(realtime): buffer whole message for TTS, drop sentence segmenter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per review (richiejp): the sentence segmenter pipelined unary TTS by splitting on ASCII .!?/newline, which does nothing for languages without those boundaries (CJK/Thai) — there it already degraded to buffering the whole message anyway. Replace it with a uniform model: stream the LLM transcript live, buffer the full message, then synthesize it once. emitSpeech already streams the audio chunks when the backend implements TTSStream and falls back to a single unary delta otherwise, so this is real streaming TTS where supported and a clean whole-message synthesis elsewhere — no per-sentence emulation, no language assumptions. speechStreamer becomes transcriptStreamer (transcript deltas only); the whole-message synthesis moves into streamLLMResponse. Assisted-by: Claude:claude-opus-4-8 go test, golangci-lint Signed-off-by: Ettore Di Giacinto --- .../endpoints/openai/realtime_doubles_test.go | 28 +++++- .../endpoints/openai/realtime_segmenter.go | 61 ------------ .../openai/realtime_segmenter_test.go | 41 -------- core/http/endpoints/openai/realtime_stream.go | 95 +++++++------------ .../endpoints/openai/realtime_stream_test.go | 89 ++++++++--------- 5 files changed, 102 insertions(+), 212 deletions(-) delete mode 100644 core/http/endpoints/openai/realtime_segmenter.go delete mode 100644 core/http/endpoints/openai/realtime_segmenter_test.go diff --git a/core/http/endpoints/openai/realtime_doubles_test.go b/core/http/endpoints/openai/realtime_doubles_test.go index afb1f5e7a31a..accd6af5168d 100644 --- a/core/http/endpoints/openai/realtime_doubles_test.go +++ b/core/http/endpoints/openai/realtime_doubles_test.go @@ -74,6 +74,15 @@ type fakeModel struct { transcribeDeltas []string transcribeFinal *schema.TranscriptionResult + + // Predict streaming: predictTokens are replayed through the token callback + // (simulating streamed LLM output); predictResp/predictErr are returned by + // the deferred predict function. predictChunkDeltas, when set, are delivered + // per-token via TokenUsage.ChatDeltas to exercise the autoparser path. + predictTokens []string + predictChunkDeltas [][]*proto.ChatDelta + predictResp backend.LLMResponse + predictErr error } func (m *fakeModel) VAD(context.Context, *schema.VADRequest) (*schema.VADResponse, error) { @@ -84,8 +93,23 @@ func (m *fakeModel) Transcribe(context.Context, string, string, bool, bool, stri return m.transcribeFinal, nil } -func (m *fakeModel) Predict(context.Context, schema.Messages, []string, []string, []string, func(string, backend.TokenUsage) bool, []types.ToolUnion, *types.ToolChoiceUnion, *int, *int, map[string]float64) (func() (backend.LLMResponse, error), error) { - return nil, nil +func (m *fakeModel) Predict(_ context.Context, _ schema.Messages, _, _, _ []string, cb func(string, backend.TokenUsage) bool, _ []types.ToolUnion, _ *types.ToolChoiceUnion, _, _ *int, _ map[string]float64) (func() (backend.LLMResponse, error), error) { + if m.predictErr != nil { + return nil, m.predictErr + } + return func() (backend.LLMResponse, error) { + for i, tok := range m.predictTokens { + if cb == nil { + continue + } + usage := backend.TokenUsage{} + if i < len(m.predictChunkDeltas) { + usage.ChatDeltas = m.predictChunkDeltas[i] + } + cb(tok, usage) + } + return m.predictResp, nil + }, nil } func (m *fakeModel) TTS(context.Context, string, string, string) (string, *proto.Result, error) { diff --git a/core/http/endpoints/openai/realtime_segmenter.go b/core/http/endpoints/openai/realtime_segmenter.go deleted file mode 100644 index 77116229f157..000000000000 --- a/core/http/endpoints/openai/realtime_segmenter.go +++ /dev/null @@ -1,61 +0,0 @@ -package openai - -import "strings" - -// streamSegmenter accumulates streamed LLM text and emits complete utterance -// segments (sentence/clause boundaries) so the realtime pipeline can hand each -// segment to TTS as soon as it's complete, overlapping generation, synthesis -// and playback instead of waiting for the whole reply. -// -// A segment is committed when a sentence terminator (. ! ?) is followed by -// whitespace, or at a newline. Terminators not followed by whitespace (e.g. -// decimals like "3.14" mid-stream) stay buffered until more text arrives or the -// stream is flushed. -type streamSegmenter struct { - buf strings.Builder -} - -func isSentenceTerminator(b byte) bool { - return b == '.' || b == '!' || b == '?' -} - -func isSpace(b byte) bool { - return b == ' ' || b == '\t' || b == '\n' || b == '\r' -} - -// Push appends text to the buffer and returns any newly-completed segments, -// trimmed of surrounding whitespace. Incomplete trailing text stays buffered. -func (s *streamSegmenter) Push(text string) []string { - s.buf.WriteString(text) - cur := s.buf.String() - - var segments []string - start := 0 - for i := 0; i < len(cur); i++ { - cut := -1 - switch { - case cur[i] == '\n': - cut = i // segment excludes the newline - case isSentenceTerminator(cur[i]) && i+1 < len(cur) && isSpace(cur[i+1]): - cut = i + 1 // segment includes the terminator - } - if cut >= 0 { - if seg := strings.TrimSpace(cur[start:cut]); seg != "" { - segments = append(segments, seg) - } - start = cut - } - } - - rem := cur[start:] - s.buf.Reset() - s.buf.WriteString(rem) - return segments -} - -// Flush returns the remaining buffered text (trimmed) and clears the buffer. -func (s *streamSegmenter) Flush() string { - seg := strings.TrimSpace(s.buf.String()) - s.buf.Reset() - return seg -} diff --git a/core/http/endpoints/openai/realtime_segmenter_test.go b/core/http/endpoints/openai/realtime_segmenter_test.go deleted file mode 100644 index 13d9a8c78293..000000000000 --- a/core/http/endpoints/openai/realtime_segmenter_test.go +++ /dev/null @@ -1,41 +0,0 @@ -package openai - -import ( - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" -) - -// streamSegmenter turns a stream of LLM token text into complete sentence/clause -// segments so TTS can start synthesizing before the full reply is generated. -var _ = Describe("streamSegmenter", func() { - It("buffers partial text until a sentence terminator followed by space", func() { - var s streamSegmenter - Expect(s.Push("Hello")).To(BeEmpty()) - Expect(s.Push(" world")).To(BeEmpty()) - Expect(s.Push(". ")).To(Equal([]string{"Hello world."})) - }) - - It("emits each complete sentence and keeps the trailing partial buffered", func() { - var s streamSegmenter - Expect(s.Push("One. Two! Three")).To(Equal([]string{"One.", "Two!"})) - Expect(s.Flush()).To(Equal("Three")) - }) - - It("splits on newlines", func() { - var s streamSegmenter - Expect(s.Push("Line one\nLine two")).To(Equal([]string{"Line one"})) - Expect(s.Flush()).To(Equal("Line two")) - }) - - It("does not split decimals or mid-token punctuation", func() { - var s streamSegmenter - Expect(s.Push("Pi is 3.14 today")).To(BeEmpty()) - Expect(s.Flush()).To(Equal("Pi is 3.14 today")) - }) - - It("flushes to empty when the buffer holds only consumed text", func() { - var s streamSegmenter - s.Push("Done. ") - Expect(s.Flush()).To(Equal("")) - }) -}) diff --git a/core/http/endpoints/openai/realtime_stream.go b/core/http/endpoints/openai/realtime_stream.go index 09526c561fe5..9eca643d3b07 100644 --- a/core/http/endpoints/openai/realtime_stream.go +++ b/core/http/endpoints/openai/realtime_stream.go @@ -12,46 +12,36 @@ import ( "github.com/mudler/LocalAI/pkg/reasoning" ) -// speechStreamer consumes streamed LLM tokens and drives the realtime output: -// it strips reasoning incrementally, emits a transcript text delta for each -// content fragment, and — when the pipeline streams TTS — sentence-pipes the -// content so each completed sentence is synthesized as soon as it's ready, -// overlapping generation, synthesis and playback. -// -// It is used only for plain-content turns (no tools): tool-call output can't be -// safely spoken mid-stream, so those turns keep the buffered path. -type speechStreamer struct { +// transcriptStreamer turns streamed LLM tokens into the assistant's spoken +// transcript: it strips reasoning incrementally and sends one +// response.output_audio_transcript.delta per content fragment. It does NOT +// synthesize audio — the caller buffers the full message and synthesizes it +// once (streaming the audio chunks when the TTS backend supports TTSStream), +// which works uniformly for streaming and non-streaming TTS and for languages +// without sentence or word boundaries. +type transcriptStreamer struct { ctx context.Context t Transport - session *Session responseID string itemID string - - extractor *reasoning.ReasoningExtractor - seg streamSegmenter - audio []byte - streamTTS bool - err error + extractor *reasoning.ReasoningExtractor } -func newSpeechStreamer(ctx context.Context, t Transport, session *Session, responseID, itemID, thinkingStartToken string, reasoningCfg reasoning.Config) *speechStreamer { - // Spoken output must never contain reasoning, even when disable_thinking set - // DisableReasoning (which would otherwise turn the extractor's stripping off). - reasoningCfg = spokenReasoningConfig(reasoningCfg) - return &speechStreamer{ +func newTranscriptStreamer(ctx context.Context, t Transport, responseID, itemID, thinkingStartToken string, reasoningCfg reasoning.Config) *transcriptStreamer { + return &transcriptStreamer{ ctx: ctx, t: t, - session: session, responseID: responseID, itemID: itemID, - extractor: reasoning.NewReasoningExtractor(thinkingStartToken, reasoningCfg), - streamTTS: session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamTTS(), + extractor: reasoning.NewReasoningExtractor(thinkingStartToken, spokenReasoningConfig(reasoningCfg)), } } -// onToken handles one streamed LLM token. It is shaped to be used directly as -// the backend token callback's text sink. -func (s *speechStreamer) onToken(token string) { +// onToken handles one streamed unit of model output, sending a transcript delta +// for the new content (reasoning stripped). For plain-content models the unit is +// the raw text token; for autoparser tool turns the backend clears the text and +// delivers content via ChatDeltas, so the caller passes that content here. +func (s *transcriptStreamer) onToken(token string) { _, content := s.extractor.ProcessToken(token) if content == "" { return @@ -64,41 +54,20 @@ func (s *speechStreamer) onToken(token string) { ContentIndex: 0, Delta: content, }) - if s.streamTTS { - for _, segment := range s.seg.Push(content) { - s.speak(segment) - } - } } -func (s *speechStreamer) speak(text string) { - pcm, err := emitSpeech(s.ctx, s.t, s.session, s.responseID, s.itemID, text) - if err != nil { - if s.err == nil { - s.err = err - } - return - } - s.audio = append(s.audio, pcm...) -} - -// finish flushes any buffered sentence to TTS and returns the full cleaned -// content, the accumulated PCM audio, and the first error encountered (if any). -func (s *speechStreamer) finish() (content string, audio []byte, err error) { - if s.streamTTS { - if rem := s.seg.Flush(); rem != "" { - s.speak(rem) - } - } - return s.extractor.CleanedContent(), s.audio, s.err +// content returns the full transcript so far with reasoning stripped. +func (s *transcriptStreamer) content() string { + return s.extractor.CleanedContent() } // streamLLMResponse drives a streamed, plain-content (no tools) realtime reply. -// It announces the assistant item before tokens arrive, feeds the LLM token -// callback through a speechStreamer (transcript deltas + sentence-piped TTS), -// then emits the terminal events. It returns true when it has fully handled the -// response so the caller can return; callers must only invoke it for turns with -// no tools and an audio modality (see triggerResponseAtTurn). +// It announces the assistant item before tokens arrive, streams transcript +// deltas as the LLM generates, then synthesizes the whole buffered message once +// (streaming the audio chunks when the TTS backend supports it, otherwise a +// single unary delta) and emits the terminal events. It returns true when it has +// fully handled the response so the caller can return; callers must only invoke +// it for turns with no tools and an audio modality (see triggerResponseAtTurn). func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, history schema.Messages, images []string, llmCfg *config.ModelConfig) bool { // Announce the assistant item up front so streamed deltas target a known item. item := types.MessageItemUnion{ @@ -150,7 +119,7 @@ func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation } thinkingStartToken := reasoning.DetectThinkingStartToken(template, &llmCfg.ReasoningConfig) - streamer := newSpeechStreamer(ctx, t, session, responseID, item.Assistant.ID, thinkingStartToken, llmCfg.ReasoningConfig) + streamer := newTranscriptStreamer(ctx, t, responseID, item.Assistant.ID, thinkingStartToken, llmCfg.ReasoningConfig) cb := func(token string, _ backend.TokenUsage) bool { if ctx.Err() != nil { return false @@ -177,8 +146,16 @@ func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation return true } - content, audio, err := streamer.finish() + // Buffer the whole message, then synthesize it once. emitSpeech streams the + // audio chunks when the TTS backend supports TTSStream, otherwise it sends a + // single unary delta — no per-sentence segmentation either way. + content := streamer.content() + audio, err := emitSpeech(ctx, t, session, responseID, item.Assistant.ID, content) if err != nil { + if ctx.Err() != nil { + cancel() + return true + } sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID) return true } diff --git a/core/http/endpoints/openai/realtime_stream_test.go b/core/http/endpoints/openai/realtime_stream_test.go index d8697c331e9f..ccdd31cca62f 100644 --- a/core/http/endpoints/openai/realtime_stream_test.go +++ b/core/http/endpoints/openai/realtime_stream_test.go @@ -6,86 +6,77 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/mudler/LocalAI/core/backend" "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/http/endpoints/openai/types" "github.com/mudler/LocalAI/pkg/reasoning" ) -// speechStreamer consumes streamed LLM tokens: it strips reasoning, emits a -// transcript delta per content fragment, and sentence-pipes content into TTS so -// audio starts before the full reply is generated. -var _ = Describe("speechStreamer", func() { - It("emits a transcript delta per token and speaks each completed sentence", func() { - on := true - m := &fakeModel{ttsStreamChunks: [][]byte{{7}}, ttsStreamRate: 24000} - session := &Session{ - OutputSampleRate: 24000, - ModelInterface: m, - ModelConfig: &config.ModelConfig{ - Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{TTS: &on}}, - }, - } +// transcriptStreamer turns streamed LLM tokens into incremental transcript +// deltas, stripping reasoning. Audio is synthesized once from the full message +// by the caller, so there is no per-sentence segmentation. +var _ = Describe("transcriptStreamer", func() { + It("emits one transcript delta per content token", func() { t := &fakeTransport{} - s := newSpeechStreamer(context.Background(), t, session, "resp1", "item1", "", reasoning.Config{}) + s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "", reasoning.Config{}) for _, tok := range []string{"Hello", " world.", " Bye"} { s.onToken(tok) } - content, audio, err := s.finish() - Expect(err).ToNot(HaveOccurred()) - Expect(content).To(Equal("Hello world. Bye")) - // One transcript delta per (non-empty) token. + Expect(s.content()).To(Equal("Hello world. Bye")) Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(3)) - // Two sentences spoken: "Hello world." mid-stream + "Bye" on flush; one - // chunk each. - Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(2)) - Expect(audio).To(Equal([]byte{7, 7})) + Expect(t.transcriptDeltaText()).To(Equal("Hello world. Bye")) }) It("strips leaked reasoning even when reasoning is disabled (disable_thinking safety net)", func() { - // disable_thinking maps to ReasoningConfig.DisableReasoning=true (it tells - // the backend enable_thinking=false). When the model ignores that and emits - // thinking anyway, the spoken stream must still not leak it: the streamer is - // the last line of defence and always strips reasoning from spoken content. + // disable_thinking maps to DisableReasoning=true (enable_thinking=false to + // the backend). If the model emits thinking anyway, the transcript must + // still not leak it: stripping always runs for spoken output. disable := true - session := &Session{ - OutputSampleRate: 24000, - ModelInterface: &fakeModel{}, - ModelConfig: &config.ModelConfig{}, // streaming.tts off - } t := &fakeTransport{} - s := newSpeechStreamer(context.Background(), t, session, "resp1", "item1", "", + s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "", reasoning.Config{DisableReasoning: &disable}) s.onToken("secret plan") s.onToken("The answer is 42.") - content, _, err := s.finish() - Expect(err).ToNot(HaveOccurred()) - Expect(content).To(Equal("The answer is 42.")) - Expect(content).ToNot(ContainSubstring("secret plan")) - // The text streamed to the client must not carry the reasoning either. + Expect(s.content()).To(Equal("The answer is 42.")) + Expect(s.content()).ToNot(ContainSubstring("secret plan")) Expect(t.transcriptDeltaText()).ToNot(ContainSubstring("secret plan")) }) +}) - It("does not synthesize audio when TTS streaming is disabled", func() { - m := &fakeModel{ttsStreamChunks: [][]byte{{7}}, ttsStreamRate: 24000} +// streamLLMResponse drives a full streamed realtime turn: live transcript +// deltas while the LLM generates, then the whole message is synthesized once. +var _ = Describe("streamLLMResponse", func() { + It("streams transcript deltas then synthesizes the whole message once", func() { + on := true + m := &fakeModel{ + predictTokens: []string{"Hello", " world.", " How are you?"}, + predictResp: backend.LLMResponse{Response: "Hello world. How are you?"}, + ttsStreamChunks: [][]byte{{9}}, + ttsStreamRate: 24000, + } session := &Session{ OutputSampleRate: 24000, ModelInterface: m, - ModelConfig: &config.ModelConfig{}, // streaming.tts off + ModelConfig: &config.ModelConfig{ + Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}}, + }, } + conv := &Conversation{} t := &fakeTransport{} - s := newSpeechStreamer(context.Background(), t, session, "resp1", "item1", "", reasoning.Config{}) + llmCfg := &config.ModelConfig{} - s.onToken("Hello world.") - content, audio, err := s.finish() + handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg) - Expect(err).ToNot(HaveOccurred()) - Expect(content).To(Equal("Hello world.")) - Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(1)) - Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(0)) - Expect(audio).To(BeEmpty()) + Expect(handled).To(BeTrue()) + // One live transcript delta per streamed token. + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(3)) + // The whole message is synthesized ONCE (not per sentence): a single + // emitSpeech replays the one TTS stream chunk. + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(1)) + Expect(t.transcriptDeltaText()).To(Equal("Hello world. How are you?")) }) }) From d05d83ff367bcd83ea11243145ea250f5273596a Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Fri, 5 Jun 2026 07:48:00 +0000 Subject: [PATCH 15/15] feat(realtime): stream tool-call turns via tokenizer-template autoparser MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per review (richiejp): tool-call deltas exist, so streaming should work with tools too. It does — for models that use their tokenizer template. The C++ autoparser then clears reply.Message and delivers content + tool calls via ChatDeltas, so the streamed transcript carries only spoken content (no tool-call JSON leak) and the tool calls are parsed from the final response. - Drop the len(tools)==0 gate; stream when no tools OR use_tokenizer_template (grammar-based function calling still buffers, since its call is emitted as JSON in the token stream and would leak into the transcript). - streamLLMResponse takes tools/toolChoice/toolTurn, reads ChatDelta content in the token callback, parses tool calls from the final ChatDeltas, and creates the assistant content item lazily so a content-less tool turn emits only the tool calls. - Extract emitToolCallItems from the buffered path so both paths finalize tool calls, response.done, and server-side assistant-tool follow-ups identically. Assisted-by: Claude:claude-opus-4-8 go test, golangci-lint Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/realtime.go | 51 ++-- core/http/endpoints/openai/realtime_stream.go | 217 +++++++++++------- .../endpoints/openai/realtime_stream_test.go | 70 +++++- docs/content/features/openai-realtime.md | 4 +- 4 files changed, 233 insertions(+), 109 deletions(-) diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index bc2a8078534e..7d76444aa953 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -1496,18 +1496,21 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa }, }) - // Streamed LLM path: when the pipeline opts into LLM streaming and the turn - // cannot produce a tool call (no tools), stream tokens straight to the client - // as transcript deltas and sentence-pipe them into TTS. Tool turns fall - // through to the buffered path below, since partial tool-call output can't be - // safely spoken mid-stream. - if config != nil && session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamLLM() && len(tools) == 0 { + // Streamed LLM path: when the pipeline opts into LLM streaming, stream the + // transcript to the client as it is generated and synthesize the buffered + // message once. Tool turns are supported only when the model uses its + // tokenizer template: the C++ autoparser then delivers content and tool + // calls via ChatDeltas (clearing the text stream), so the spoken transcript + // never leaks tool-call tokens. Grammar-based function calling emits the + // call as JSON in the token stream, so those turns keep the buffered path. + if config != nil && session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamLLM() { + canStream := len(tools) == 0 || config.TemplateConfig.UseTokenizerTemplate var respMods []types.Modality if overrides != nil { respMods = overrides.OutputModalities } - if modalitiesContainAudio(resolveOutputModalities(session.OutputModalities, respMods)) { - if streamLLMResponse(ctx, session, conv, t, responseID, conversationHistory, images, config) { + if canStream && modalitiesContainAudio(resolveOutputModalities(session.OutputModalities, respMods)) { + if streamLLMResponse(ctx, session, conv, t, responseID, conversationHistory, images, config, tools, toolChoice, toolTurn) { return } } @@ -1814,17 +1817,27 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa }) } - // Handle Tool Calls. Two paths: - // - LocalAI Assistant tools (session.AssistantExecutor.IsTool) run - // server-side; we append both the call and its output to conv.Items - // and re-trigger a follow-up response so the model can speak the - // result. The client only sees observability events. - // - All other tools follow the standard OpenAI flow: emit - // function_call_arguments.done and wait for the client to send - // conversation.item.create back. - xlog.Debug("About to handle tool calls", "finalToolCallsCount", len(finalToolCalls)) + // Emit the parsed tool calls, the terminal response.done, and (for + // server-side assistant tools) the follow-up response. Shared with the + // streamed path so both finalize tool calls identically. + emitToolCallItems(ctx, session, conv, t, responseID, finalToolCalls, finalSpeech != "", toolTurn) +} + +// emitToolCallItems emits the realtime function_call items for the parsed tool +// calls, the terminal response.done, and — for server-side LocalAI Assistant +// tools — re-triggers a follow-up response so the model can speak the result. +// hasContent shifts the tool-call output index past the assistant content item +// when the same turn also produced spoken/text content. Two tool paths: +// - LocalAI Assistant tools (session.AssistantExecutor.IsTool) run server-side; +// we append both the call and its output to conv.Items and re-trigger. The +// client only sees observability events. +// - All other tools follow the standard OpenAI flow: emit +// function_call_arguments.done and wait for the client to send +// conversation.item.create back. +func emitToolCallItems(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, toolCalls []functions.FuncCallResults, hasContent bool, toolTurn int) { + xlog.Debug("About to handle tool calls", "finalToolCallsCount", len(toolCalls)) executedAssistantTool := false - for i, tc := range finalToolCalls { + for i, tc := range toolCalls { toolCallID := generateItemID() callID := "call_" + generateUniqueID() // OpenAI uses call_xyz @@ -1844,7 +1857,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa conv.Lock.Unlock() outputIndex := i - if finalSpeech != "" { + if hasContent { outputIndex++ } diff --git a/core/http/endpoints/openai/realtime_stream.go b/core/http/endpoints/openai/realtime_stream.go index 9eca643d3b07..f6c70e82da49 100644 --- a/core/http/endpoints/openai/realtime_stream.go +++ b/core/http/endpoints/openai/realtime_stream.go @@ -9,6 +9,7 @@ import ( "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/http/endpoints/openai/types" "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/pkg/functions" "github.com/mudler/LocalAI/pkg/reasoning" ) @@ -25,6 +26,12 @@ type transcriptStreamer struct { responseID string itemID string extractor *reasoning.ReasoningExtractor + + // announce, if set, is invoked once just before the first transcript delta. + // It lets the caller create the assistant item lazily, so a content-less + // tool-call turn never emits a spurious empty assistant item. + announce func() + announced bool } func newTranscriptStreamer(ctx context.Context, t Transport, responseID, itemID, thinkingStartToken string, reasoningCfg reasoning.Config) *transcriptStreamer { @@ -46,6 +53,12 @@ func (s *transcriptStreamer) onToken(token string) { if content == "" { return } + if !s.announced { + s.announced = true + if s.announce != nil { + s.announce() + } + } _ = s.t.SendEvent(types.ResponseOutputAudioTranscriptDeltaEvent{ ServerEventBase: types.ServerEventBase{}, ResponseID: s.responseID, @@ -61,50 +74,61 @@ func (s *transcriptStreamer) content() string { return s.extractor.CleanedContent() } -// streamLLMResponse drives a streamed, plain-content (no tools) realtime reply. -// It announces the assistant item before tokens arrive, streams transcript -// deltas as the LLM generates, then synthesizes the whole buffered message once -// (streaming the audio chunks when the TTS backend supports it, otherwise a -// single unary delta) and emits the terminal events. It returns true when it has -// fully handled the response so the caller can return; callers must only invoke -// it for turns with no tools and an audio modality (see triggerResponseAtTurn). -func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, history schema.Messages, images []string, llmCfg *config.ModelConfig) bool { - // Announce the assistant item up front so streamed deltas target a known item. +// streamLLMResponse drives a streamed realtime reply. It streams the assistant +// transcript as the LLM generates, then synthesizes the whole buffered message +// once (streaming the audio chunks when the TTS backend supports it, otherwise a +// single unary delta). Tool calls parsed from the autoparser ChatDeltas are +// emitted after the spoken content. The assistant content item is created lazily +// on the first content delta, so a content-less tool-call turn emits only the +// tool calls. It returns true when it has fully handled the response so the +// caller can return; callers must only invoke it for an audio modality, and with +// tools only when the model uses its tokenizer template (see triggerResponseAtTurn). +func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, history schema.Messages, images []string, llmCfg *config.ModelConfig, tools []types.ToolUnion, toolChoice *types.ToolChoiceUnion, toolTurn int) bool { + itemID := generateItemID() item := types.MessageItemUnion{ Assistant: &types.MessageItemAssistant{ - ID: generateItemID(), + ID: itemID, Status: types.ItemStatusInProgress, Content: []types.MessageContentOutput{{Type: types.MessageContentTypeOutputAudio}}, }, } - conv.Lock.Lock() - conv.Items = append(conv.Items, &item) - conv.Lock.Unlock() - sendEvent(t, types.ResponseOutputItemAddedEvent{ - ServerEventBase: types.ServerEventBase{}, - ResponseID: responseID, - OutputIndex: 0, - Item: item, - }) - sendEvent(t, types.ResponseContentPartAddedEvent{ - ServerEventBase: types.ServerEventBase{}, - ResponseID: responseID, - ItemID: item.Assistant.ID, - OutputIndex: 0, - ContentIndex: 0, - Part: item.Assistant.Content[0], - }) + // announce creates the assistant content item lazily, just before the first + // transcript delta — a tool-only turn never produces content, so it stays out + // of the conversation and the client sees only the tool calls. + announced := false + announce := func() { + announced = true + conv.Lock.Lock() + conv.Items = append(conv.Items, &item) + conv.Lock.Unlock() + sendEvent(t, types.ResponseOutputItemAddedEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + OutputIndex: 0, + Item: item, + }) + sendEvent(t, types.ResponseContentPartAddedEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + ItemID: itemID, + OutputIndex: 0, + ContentIndex: 0, + Part: item.Assistant.Content[0], + }) + } cancel := func() { - conv.Lock.Lock() - for i := len(conv.Items) - 1; i >= 0; i-- { - if conv.Items[i].Assistant != nil && conv.Items[i].Assistant.ID == item.Assistant.ID { - conv.Items = append(conv.Items[:i], conv.Items[i+1:]...) - break + if announced { + conv.Lock.Lock() + for i := len(conv.Items) - 1; i >= 0; i-- { + if conv.Items[i].Assistant != nil && conv.Items[i].Assistant.ID == itemID { + conv.Items = append(conv.Items[:i], conv.Items[i+1:]...) + break + } } + conv.Lock.Unlock() } - conv.Lock.Unlock() sendEvent(t, types.ResponseDoneEvent{ ServerEventBase: types.ServerEventBase{}, Response: types.Response{ID: responseID, Object: "realtime.response", Status: types.ResponseStatusCancelled}, @@ -119,26 +143,36 @@ func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation } thinkingStartToken := reasoning.DetectThinkingStartToken(template, &llmCfg.ReasoningConfig) - streamer := newTranscriptStreamer(ctx, t, responseID, item.Assistant.ID, thinkingStartToken, llmCfg.ReasoningConfig) - cb := func(token string, _ backend.TokenUsage) bool { + streamer := newTranscriptStreamer(ctx, t, responseID, itemID, thinkingStartToken, llmCfg.ReasoningConfig) + streamer.announce = announce + cb := func(token string, usage backend.TokenUsage) bool { if ctx.Err() != nil { return false } - streamer.onToken(token) + // Plain-content models stream text via the token; autoparser tool turns + // clear the text and deliver content via ChatDeltas, so prefer the latter + // when present. Either way only content reaches the transcript — tool-call + // deltas are parsed from the final response below. + text := token + if len(usage.ChatDeltas) > 0 { + text = functions.ContentFromChatDeltas(usage.ChatDeltas) + } + streamer.onToken(text) return true } - predFunc, err := session.ModelInterface.Predict(ctx, history, images, nil, nil, cb, nil, nil, nil, nil, nil) + predFunc, err := session.ModelInterface.Predict(ctx, history, images, nil, nil, cb, tools, toolChoice, nil, nil, nil) if err != nil { - sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", item.Assistant.ID) + sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", itemID) return true } - if _, err := predFunc(); err != nil { + pred, err := predFunc() + if err != nil { if ctx.Err() != nil { cancel() return true } - sendError(t, "prediction_failed", fmt.Sprintf("backend error: %v", err), "", item.Assistant.ID) + sendError(t, "prediction_failed", fmt.Sprintf("backend error: %v", err), "", itemID) return true } if ctx.Err() != nil { @@ -146,65 +180,74 @@ func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation return true } - // Buffer the whole message, then synthesize it once. emitSpeech streams the - // audio chunks when the TTS backend supports TTSStream, otherwise it sends a - // single unary delta — no per-sentence segmentation either way. content := streamer.content() - audio, err := emitSpeech(ctx, t, session, responseID, item.Assistant.ID, content) - if err != nil { - if ctx.Err() != nil { - cancel() + toolCalls := functions.ToolCallsFromChatDeltas(pred.ChatDeltas) + + // Finalize the spoken content item only when the turn produced content. A + // tool-only turn skips this entirely (no empty assistant item). + if content != "" { + if !announced { + announce() + } + // Buffer the whole message, then synthesize it once. emitSpeech streams + // the audio chunks when the TTS backend supports TTSStream, otherwise it + // sends a single unary delta — no per-sentence segmentation either way. + audio, err := emitSpeech(ctx, t, session, responseID, itemID, content) + if err != nil { + if ctx.Err() != nil { + cancel() + return true + } + sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", itemID) return true } - sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID) - return true - } - _, isWebRTC := t.(*WebRTCTransport) + _, isWebRTC := t.(*WebRTCTransport) - sendEvent(t, types.ResponseOutputAudioTranscriptDoneEvent{ - ServerEventBase: types.ServerEventBase{}, - ResponseID: responseID, - ItemID: item.Assistant.ID, - OutputIndex: 0, - ContentIndex: 0, - Transcript: content, - }) - if !isWebRTC { - sendEvent(t, types.ResponseOutputAudioDoneEvent{ + sendEvent(t, types.ResponseOutputAudioTranscriptDoneEvent{ ServerEventBase: types.ServerEventBase{}, ResponseID: responseID, - ItemID: item.Assistant.ID, + ItemID: itemID, OutputIndex: 0, ContentIndex: 0, + Transcript: content, }) - } + if !isWebRTC { + sendEvent(t, types.ResponseOutputAudioDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + ItemID: itemID, + OutputIndex: 0, + ContentIndex: 0, + }) + } + + conv.Lock.Lock() + item.Assistant.Status = types.ItemStatusCompleted + item.Assistant.Content[0].Transcript = content + if !isWebRTC { + item.Assistant.Content[0].Audio = base64.StdEncoding.EncodeToString(audio) + } + conv.Lock.Unlock() - conv.Lock.Lock() - item.Assistant.Status = types.ItemStatusCompleted - item.Assistant.Content[0].Transcript = content - if !isWebRTC { - item.Assistant.Content[0].Audio = base64.StdEncoding.EncodeToString(audio) + sendEvent(t, types.ResponseContentPartDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + ItemID: itemID, + OutputIndex: 0, + ContentIndex: 0, + Part: item.Assistant.Content[0], + }) + sendEvent(t, types.ResponseOutputItemDoneEvent{ + ServerEventBase: types.ServerEventBase{}, + ResponseID: responseID, + OutputIndex: 0, + Item: item, + }) } - conv.Lock.Unlock() - sendEvent(t, types.ResponseContentPartDoneEvent{ - ServerEventBase: types.ServerEventBase{}, - ResponseID: responseID, - ItemID: item.Assistant.ID, - OutputIndex: 0, - ContentIndex: 0, - Part: item.Assistant.Content[0], - }) - sendEvent(t, types.ResponseOutputItemDoneEvent{ - ServerEventBase: types.ServerEventBase{}, - ResponseID: responseID, - OutputIndex: 0, - Item: item, - }) - sendEvent(t, types.ResponseDoneEvent{ - ServerEventBase: types.ServerEventBase{}, - Response: types.Response{ID: responseID, Object: "realtime.response", Status: types.ResponseStatusCompleted}, - }) + // Emit any tool calls, the terminal response.done, and (for server-side + // assistant tools) the follow-up turn — shared with the buffered path. + emitToolCallItems(ctx, session, conv, t, responseID, toolCalls, content != "", toolTurn) return true } diff --git a/core/http/endpoints/openai/realtime_stream_test.go b/core/http/endpoints/openai/realtime_stream_test.go index ccdd31cca62f..f7042f7722eb 100644 --- a/core/http/endpoints/openai/realtime_stream_test.go +++ b/core/http/endpoints/openai/realtime_stream_test.go @@ -9,6 +9,7 @@ import ( "github.com/mudler/LocalAI/core/backend" "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/http/endpoints/openai/types" + "github.com/mudler/LocalAI/pkg/grpc/proto" "github.com/mudler/LocalAI/pkg/reasoning" ) @@ -69,7 +70,7 @@ var _ = Describe("streamLLMResponse", func() { t := &fakeTransport{} llmCfg := &config.ModelConfig{} - handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg) + handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0) Expect(handled).To(BeTrue()) // One live transcript delta per streamed token. @@ -79,4 +80,71 @@ var _ = Describe("streamLLMResponse", func() { Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(1)) Expect(t.transcriptDeltaText()).To(Equal("Hello world. How are you?")) }) + + It("streams content deltas and emits tool-call items (autoparser tool turn)", func() { + on := true + // Autoparser path: reply.Message is empty; content + tool calls arrive via + // ChatDeltas. Chunk 1 carries content, chunk 2 carries the tool call. + contentDelta := []*proto.ChatDelta{{Content: "Let me check."}} + toolDelta := []*proto.ChatDelta{{ToolCalls: []*proto.ToolCallDelta{{Index: 0, Name: "get_weather", Arguments: `{"city":"Paris"}`}}}} + m := &fakeModel{ + predictTokens: []string{"", ""}, + predictChunkDeltas: [][]*proto.ChatDelta{contentDelta, toolDelta}, + predictResp: backend.LLMResponse{ChatDeltas: append(append([]*proto.ChatDelta{}, contentDelta...), toolDelta...)}, + ttsStreamChunks: [][]byte{{9}}, + ttsStreamRate: 24000, + } + session := &Session{ + OutputSampleRate: 24000, + ModelInterface: m, + ModelConfig: &config.ModelConfig{ + Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}}, + }, + } + conv := &Conversation{} + t := &fakeTransport{} + llmCfg := &config.ModelConfig{} + llmCfg.TemplateConfig.UseTokenizerTemplate = true + + handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0) + + Expect(handled).To(BeTrue()) + // The spoken content was streamed live. + Expect(t.transcriptDeltaText()).To(Equal("Let me check.")) + // The tool call is emitted as a function_call item. + Expect(t.countEvents(types.ServerEventTypeResponseFunctionCallArgumentsDone)).To(Equal(1)) + // Exactly one terminal response.done. + Expect(t.countEvents(types.ServerEventTypeResponseDone)).To(Equal(1)) + }) + + It("emits only tool-call items for a content-less tool turn (no empty assistant item)", func() { + on := true + toolDelta := []*proto.ChatDelta{{ToolCalls: []*proto.ToolCallDelta{{Index: 0, Name: "get_weather", Arguments: `{"city":"Rome"}`}}}} + m := &fakeModel{ + predictTokens: []string{""}, + predictChunkDeltas: [][]*proto.ChatDelta{toolDelta}, + predictResp: backend.LLMResponse{ChatDeltas: toolDelta}, + } + session := &Session{ + OutputSampleRate: 24000, + ModelInterface: m, + ModelConfig: &config.ModelConfig{ + Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}}, + }, + } + conv := &Conversation{} + t := &fakeTransport{} + llmCfg := &config.ModelConfig{} + llmCfg.TemplateConfig.UseTokenizerTemplate = true + + handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0) + + Expect(handled).To(BeTrue()) + // No content → no transcript deltas and no spurious assistant content item. + Expect(t.transcriptDeltaText()).To(Equal("")) + Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(0)) + // The tool call is still emitted. + Expect(t.countEvents(types.ServerEventTypeResponseFunctionCallArgumentsDone)).To(Equal(1)) + Expect(t.countEvents(types.ServerEventTypeResponseDone)).To(Equal(1)) + }) }) diff --git a/docs/content/features/openai-realtime.md b/docs/content/features/openai-realtime.md index b9af56ca200e..dd152f19cec3 100644 --- a/docs/content/features/openai-realtime.md +++ b/docs/content/features/openai-realtime.md @@ -48,9 +48,9 @@ pipeline: transcription: true # stream transcript text deltas of the user's speech ``` -- **streaming.tts**: emit a `response.output_audio.delta` per audio chunk the TTS backend produces, instead of one delta for the whole utterance. +- **streaming.tts**: emit a `response.output_audio.delta` per audio chunk the TTS backend produces (requires a backend that supports streaming synthesis), instead of one delta for the whole utterance. Falls back to a single unary delta otherwise. - **streaming.transcription**: stream `conversation.item.input_audio_transcription.delta` events as the transcript is produced (requires a transcription backend that supports streaming). -- **streaming.llm**: stream the LLM reply token-by-token as `response.output_audio_transcript.delta` events and, when `streaming.tts` is also enabled, synthesize each completed sentence as soon as it is ready — overlapping generation, synthesis and playback. Streaming is used only for turns that cannot produce a tool call; turns with tools fall back to the buffered path so partial tool-call output is never spoken. +- **streaming.llm**: stream the LLM reply token-by-token as `response.output_audio_transcript.delta` events. The full reply is buffered and synthesized once it is complete — streamed as audio chunks when `streaming.tts` is enabled (and the TTS backend supports it), otherwise as a single unary delta. Reasoning/thinking is always stripped from the spoken transcript. Tool calls are supported while streaming when the LLM uses its tokenizer template (`use_tokenizer_template: true`): the backend's autoparser then delivers content and tool calls separately, so the spoken transcript never leaks tool-call tokens. Grammar-based function calling keeps the buffered path. All streaming flags are off by default, so existing pipelines are unaffected.