Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions core/config/meta/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,34 @@ func DefaultRegistry() map[string]FieldMetaOverride {
},
Order: 64,
},
"pipeline.disable_thinking": {
Section: "pipeline",
Label: "Disable Thinking",
Description: "Suppress reasoning/thinking output from the pipeline LLM (sets enable_thinking=false on the underlying model). Use for models that emit <think> blocks you don't want spoken or streamed back to the realtime client.",
Component: "toggle",
Order: 65,
},
"pipeline.streaming.llm": {
Section: "pipeline",
Label: "Stream LLM",
Description: "Stream LLM tokens to the realtime client as they are generated instead of waiting for the full response. Emits incremental response.output_audio_transcript.delta / text deltas.",
Component: "toggle",
Order: 66,
},
"pipeline.streaming.tts": {
Section: "pipeline",
Label: "Stream TTS",
Description: "Stream synthesized audio chunks to the realtime client as they are produced (requires a TTS backend that implements TTSStream). Falls back to unary synthesis otherwise.",
Component: "toggle",
Order: 67,
},
"pipeline.streaming.transcription": {
Section: "pipeline",
Label: "Stream Transcription",
Description: "Stream partial transcription text to the realtime client as the STT backend produces it (requires a transcription backend that implements AudioTranscriptionStream). Falls back to unary transcription otherwise.",
Component: "toggle",
Order: 68,
},

// --- Functions ---
"function.grammar.parallel_calls": {
Expand Down
33 changes: 33 additions & 0 deletions core/config/model_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,16 @@ type Pipeline struct {
// the pipeline's LLM without editing the LLM model config. Overrides the LLM's
// own reasoning_effort. Unset leaves the LLM model config in charge.
ReasoningEffort string `yaml:"reasoning_effort,omitempty" json:"reasoning_effort,omitempty"`

// Streaming opts each pipeline stage into incremental delivery (LLM tokens,
// TTS audio chunks, transcription text). Unset stages keep the blocking
// unary path, so existing configs are unaffected.
Streaming PipelineStreaming `yaml:"streaming,omitempty" json:"streaming,omitempty"`

// DisableThinking suppresses reasoning/thinking for the pipeline LLM (maps
// to enable_thinking=false backend metadata) without editing the underlying
// LLM model config. Unset leaves the LLM model config in charge.
DisableThinking *bool `yaml:"disable_thinking,omitempty" json:"disable_thinking,omitempty"`
}

// ApplyReasoningEffort resolves the effective reasoning effort — a per-request
Expand Down Expand Up @@ -530,6 +540,29 @@ func (c *ModelConfig) ApplyReasoningEffort(requestEffort string) {
}
}

// @Description PipelineStreaming toggles incremental delivery per realtime stage.
type PipelineStreaming struct {
LLM *bool `yaml:"llm,omitempty" json:"llm,omitempty"`
TTS *bool `yaml:"tts,omitempty" json:"tts,omitempty"`
Transcription *bool `yaml:"transcription,omitempty" json:"transcription,omitempty"`
}

// StreamLLM reports whether LLM tokens should be streamed for this pipeline.
func (p Pipeline) StreamLLM() bool { return p.Streaming.LLM != nil && *p.Streaming.LLM }

// StreamTTS reports whether TTS audio should be streamed for this pipeline.
func (p Pipeline) StreamTTS() bool { return p.Streaming.TTS != nil && *p.Streaming.TTS }

// StreamTranscription reports whether transcription text should be streamed.
func (p Pipeline) StreamTranscription() bool {
return p.Streaming.Transcription != nil && *p.Streaming.Transcription
}

// ThinkingDisabled reports whether the pipeline forces the LLM's thinking off.
func (p Pipeline) ThinkingDisabled() bool {
return p.DisableThinking != nil && *p.DisableThinking
}

// @Description File configuration for model downloads
type File struct {
Filename string `yaml:"filename,omitempty" json:"filename,omitempty"`
Expand Down
54 changes: 54 additions & 0 deletions core/config/pipeline_streaming_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package config

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"gopkg.in/yaml.v3"
)

// The realtime pipeline can stream each stage (LLM tokens, TTS audio,
// transcription text) and can disable model "thinking" for the LLM. These are
// opt-in per pipeline; everything defaults to off so existing configs keep the
// unary behaviour.
var _ = Describe("Pipeline streaming config", func() {
It("defaults every streaming + thinking helper to false when unset", func() {
var p Pipeline
Expect(p.StreamLLM()).To(BeFalse())
Expect(p.StreamTTS()).To(BeFalse())
Expect(p.StreamTranscription()).To(BeFalse())
Expect(p.ThinkingDisabled()).To(BeFalse())
})

It("parses the nested streaming block and disable_thinking from YAML", func() {
var c ModelConfig
err := yaml.Unmarshal([]byte(`
name: gpt-realtime
pipeline:
llm: my-llm
tts: my-tts
transcription: my-stt
streaming:
llm: true
tts: true
transcription: true
disable_thinking: true
`), &c)
Expect(err).ToNot(HaveOccurred())
Expect(c.Pipeline.StreamLLM()).To(BeTrue())
Expect(c.Pipeline.StreamTTS()).To(BeTrue())
Expect(c.Pipeline.StreamTranscription()).To(BeTrue())
Expect(c.Pipeline.ThinkingDisabled()).To(BeTrue())
})

It("treats an explicit false in the streaming block as disabled", func() {
var c ModelConfig
err := yaml.Unmarshal([]byte(`
name: gpt-realtime
pipeline:
streaming:
tts: false
`), &c)
Expect(err).ToNot(HaveOccurred())
Expect(c.Pipeline.StreamTTS()).To(BeFalse())
})
})
170 changes: 74 additions & 96 deletions core/http/endpoints/openai/realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ type Model interface {
Transcribe(ctx context.Context, audio, language string, translate bool, diarize bool, prompt string) (*schema.TranscriptionResult, error)
Predict(ctx context.Context, messages schema.Messages, images, videos, audios []string, tokenCallback func(string, backend.TokenUsage) bool, tools []types.ToolUnion, toolChoice *types.ToolChoiceUnion, logprobs *int, topLogprobs *int, logitBias map[string]float64) (func() (backend.LLMResponse, error), error)
TTS(ctx context.Context, text, voice, language string) (string, *proto.Result, error)
// TTSStream synthesizes speech incrementally, invoking onAudio with raw PCM
// chunks (and the backend sample rate) as they are produced.
TTSStream(ctx context.Context, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error
// TranscribeStream transcribes audio incrementally, invoking onDelta for each
// transcript text fragment and returning the final aggregated result.
TranscribeStream(ctx context.Context, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error)
PredictConfig() *config.ModelConfig
}

Expand Down Expand Up @@ -1254,27 +1260,15 @@ func commitUtterance(ctx context.Context, utt []byte, session *Session, conv *Co
// TODO: If we have a real any-to-any model then transcription is optional
var transcript string
if session.InputAudioTranscription != nil {
tr, err := session.ModelInterface.Transcribe(ctx, f.Name(), session.InputAudioTranscription.Language, false, false, session.InputAudioTranscription.Prompt)
// emitTranscription streams transcript deltas when
// pipeline.streaming.transcription is set, otherwise emits a single
// completed event; either way it returns the final transcript text.
var err error
transcript, err = emitTranscription(ctx, t, session, generateItemID(), f.Name())
if err != nil {
sendError(t, "transcription_failed", err.Error(), "", "event_TODO")
return
} else if tr == nil {
sendError(t, "transcription_failed", "trancribe result is nil", "", "event_TODO")
return
}

transcript = tr.Text
sendEvent(t, types.ConversationItemInputAudioTranscriptionCompletedEvent{
ServerEventBase: types.ServerEventBase{
EventID: "event_TODO",
},

ItemID: generateItemID(),
// ResponseID: "resp_TODO", // Not needed for transcription completed event
// OutputIndex: 0,
ContentIndex: 0,
Transcript: transcript,
})
} else {
sendNotImplemented(t, "any-to-any models")
return
Expand Down Expand Up @@ -1502,6 +1496,26 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
},
})

// Streamed LLM path: when the pipeline opts into LLM streaming, stream the
// transcript to the client as it is generated and synthesize the buffered
// message once. Tool turns are supported only when the model uses its
// tokenizer template: the C++ autoparser then delivers content and tool
// calls via ChatDeltas (clearing the text stream), so the spoken transcript
// never leaks tool-call tokens. Grammar-based function calling emits the
// call as JSON in the token stream, so those turns keep the buffered path.
if config != nil && session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamLLM() {
canStream := len(tools) == 0 || config.TemplateConfig.UseTokenizerTemplate
var respMods []types.Modality
if overrides != nil {
respMods = overrides.OutputModalities
}
if canStream && modalitiesContainAudio(resolveOutputModalities(session.OutputModalities, respMods)) {
if streamLLMResponse(ctx, session, conv, t, responseID, conversationHistory, images, config, tools, toolChoice, toolTurn) {
return
}
}
}

predFunc, err := session.ModelInterface.Predict(ctx, conversationHistory, images, nil, nil, nil, tools, toolChoice, nil, nil, nil)
if err != nil {
sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", "") // item.Assistant.ID is unknown here
Expand Down Expand Up @@ -1579,15 +1593,15 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
// ExtractReasoningWithConfig is a no-op when no tag pair matches,
// so it's safe to apply unconditionally in the no-reasoning branch.
if deltaReasoning == "" && deltaContent != "" {
deltaReasoning, deltaContent = reasoning.ExtractReasoningWithConfig(deltaContent, thinkingStartToken, config.ReasoningConfig)
deltaReasoning, deltaContent = reasoning.ExtractReasoningWithConfig(deltaContent, thinkingStartToken, spokenReasoningConfig(config.ReasoningConfig))
}
reasoningText = deltaReasoning
responseWithoutReasoning = deltaContent
textContent = deltaContent
cleanedResponse = deltaContent
toolCalls = deltaToolCalls
} else {
reasoningText, responseWithoutReasoning = reasoning.ExtractReasoningWithConfig(rawResponse, thinkingStartToken, config.ReasoningConfig)
reasoningText, responseWithoutReasoning = reasoning.ExtractReasoningWithConfig(rawResponse, thinkingStartToken, spokenReasoningConfig(config.ReasoningConfig))
textContent = functions.ParseTextContent(responseWithoutReasoning, config.FunctionsConfig)
cleanedResponse = functions.CleanupLLMResult(responseWithoutReasoning, config.FunctionsConfig)
toolCalls = functions.ParseFunctionCall(cleanedResponse, config.FunctionsConfig)
Expand Down Expand Up @@ -1713,64 +1727,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
return
}

audioFilePath, res, err := session.ModelInterface.TTS(ctx, finalSpeech, session.Voice, session.InputAudioTranscription.Language)
if err != nil {
if ctx.Err() != nil {
xlog.Debug("TTS cancelled (barge-in)")
sendCancelledResponse()
return
}
xlog.Error("TTS failed", "error", err)
sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID)
return
}
if !res.Success {
xlog.Error("TTS failed", "message", res.Message)
sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %s", res.Message), "", item.Assistant.ID)
return
}
defer func() { _ = os.Remove(audioFilePath) }()

audioBytes, err := os.ReadFile(audioFilePath)
if err != nil {
xlog.Error("failed to read TTS file", "error", err)
sendError(t, "tts_error", fmt.Sprintf("Failed to read TTS audio: %v", err), "", item.Assistant.ID)
return
}

// Parse WAV header to get raw PCM and the actual sample rate from the TTS backend.
pcmData, ttsSampleRate := laudio.ParseWAV(audioBytes)
if ttsSampleRate == 0 {
ttsSampleRate = localSampleRate
}
xlog.Debug("TTS audio parsed", "raw_bytes", len(audioBytes), "pcm_bytes", len(pcmData), "sample_rate", ttsSampleRate)

// SendAudio (WebRTC) passes PCM at the TTS sample rate directly to the
// Opus encoder, which resamples to 48kHz internally. This avoids a
// lossy intermediate resample through 16kHz.
// XXX: This is a noop in websocket mode; it's included in the JSON instead
if err := t.SendAudio(ctx, pcmData, ttsSampleRate); err != nil {
if ctx.Err() != nil {
xlog.Debug("Audio playback cancelled (barge-in)")
sendCancelledResponse()
return
}
xlog.Error("failed to send audio via transport", "error", err)
}

// For WebSocket clients, resample to the session's output rate and
// deliver audio as base64 in JSON events. WebRTC clients already
// received audio over the RTP track, so skip the base64 payload.
if !isWebRTC {
wsPCM := pcmData
if ttsSampleRate != session.OutputSampleRate {
samples := sound.BytesToInt16sLE(pcmData)
resampled := sound.ResampleInt16(samples, ttsSampleRate, session.OutputSampleRate)
wsPCM = sound.Int16toBytesLE(resampled)
}
audioString = base64.StdEncoding.EncodeToString(wsPCM)
}

// Transcript of the spoken reply (the audio's text).
sendEvent(t, types.ResponseOutputAudioTranscriptDeltaEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
Expand All @@ -1788,15 +1745,26 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
Transcript: finalSpeech,
})

// Synthesize and send the audio. With pipeline.streaming.tts enabled
// emitSpeech forwards a response.output_audio.delta per backend PCM
// chunk as it's produced; otherwise it sends the whole utterance as a
// single delta. The returned PCM is stored (base64) on the item below.
pcmAudio, err := emitSpeech(ctx, t, session, responseID, item.Assistant.ID, finalSpeech)
if err != nil {
if ctx.Err() != nil {
xlog.Debug("TTS cancelled (barge-in)")
sendCancelledResponse()
return
}
xlog.Error("TTS failed", "error", err)
sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID)
return
}
if !isWebRTC {
audioString = base64.StdEncoding.EncodeToString(pcmAudio)
}

if !isWebRTC {
sendEvent(t, types.ResponseOutputAudioDeltaEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
ItemID: item.Assistant.ID,
OutputIndex: 0,
ContentIndex: 0,
Delta: audioString,
})
sendEvent(t, types.ResponseOutputAudioDoneEvent{
ServerEventBase: types.ServerEventBase{},
ResponseID: responseID,
Expand Down Expand Up @@ -1849,17 +1817,27 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
})
}

// Handle Tool Calls. Two paths:
// - LocalAI Assistant tools (session.AssistantExecutor.IsTool) run
// server-side; we append both the call and its output to conv.Items
// and re-trigger a follow-up response so the model can speak the
// result. The client only sees observability events.
// - All other tools follow the standard OpenAI flow: emit
// function_call_arguments.done and wait for the client to send
// conversation.item.create back.
xlog.Debug("About to handle tool calls", "finalToolCallsCount", len(finalToolCalls))
// Emit the parsed tool calls, the terminal response.done, and (for
// server-side assistant tools) the follow-up response. Shared with the
// streamed path so both finalize tool calls identically.
emitToolCallItems(ctx, session, conv, t, responseID, finalToolCalls, finalSpeech != "", toolTurn)
}

// emitToolCallItems emits the realtime function_call items for the parsed tool
// calls, the terminal response.done, and — for server-side LocalAI Assistant
// tools — re-triggers a follow-up response so the model can speak the result.
// hasContent shifts the tool-call output index past the assistant content item
// when the same turn also produced spoken/text content. Two tool paths:
// - LocalAI Assistant tools (session.AssistantExecutor.IsTool) run server-side;
// we append both the call and its output to conv.Items and re-trigger. The
// client only sees observability events.
// - All other tools follow the standard OpenAI flow: emit
// function_call_arguments.done and wait for the client to send
// conversation.item.create back.
func emitToolCallItems(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, toolCalls []functions.FuncCallResults, hasContent bool, toolTurn int) {
xlog.Debug("About to handle tool calls", "finalToolCallsCount", len(toolCalls))
executedAssistantTool := false
for i, tc := range finalToolCalls {
for i, tc := range toolCalls {
toolCallID := generateItemID()
callID := "call_" + generateUniqueID() // OpenAI uses call_xyz

Expand All @@ -1879,7 +1857,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
conv.Lock.Unlock()

outputIndex := i
if finalSpeech != "" {
if hasContent {
outputIndex++
}

Expand Down
Loading
Loading