From 85f47fd79d3111e87190c46dbc346d1d2769fce6 Mon Sep 17 00:00:00 2001
From: xynstr
Date: Tue, 28 Apr 2026 22:58:42 +0200
Subject: [PATCH 1/2] feat(insights): add OpenAI provider for transcription and translation

Introduces a new "openai" insights provider implemented entirely in Go on top
of github.com/openai/openai-go/v3. It replaces the third-party language-helper
service experimented with in the previous draft PR; nothing outside this Go
module is required to use it.

Capabilities

- Transcription via POST /v1/audio/transcriptions, chunked from PCM16 audio
  supplied through TranscriptionStream.WriteSample. Each chunk is encoded as an
  in-memory WAV and uploaded; one final_result event is emitted per chunk.
  Real-time partials would require the Realtime websocket API, which openai-go
  does not yet cover and most self-hosted backends do not support, so we
  deliberately ship chunked-only.
- Translation via POST /v1/chat/completions with a JSON Schema response format
  whose properties are the requested target language codes. One round-trip
  yields all target translations; Strict mode enforces the schema. Chat
  Completions (rather than the newer Responses API) keeps the provider
  compatible with self-hosted OpenAI-API endpoints (LocalAI, vLLM,
  llama.cpp-server, ...).
- Speech synthesis, AI text chat, and batch summarisation are stubbed out for
  now and can be added in follow-ups.

Configuration (a YAML sketch of these options follows this message)

- providers.openai[].credentials.api_key - required.
- providers.openai[].options.base_url - optional; point it at any
  OpenAI-compatible endpoint to use a self-hosted backend.
- providers.openai[].options.chunk_seconds - transcription chunk duration in
  seconds (default 5).
- services.transcription.options.model / services.translation.options.model
  override the defaults (whisper-1 and gpt-4o-mini respectively).
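For orientation, a configuration sketch assembled from the option paths listed
above. The surrounding YAML layout is an assumption (it should mirror how the
existing azure/google provider accounts are configured), not a verbatim
excerpt from this repo:

    providers:
      openai:
        - credentials:
            api_key: "sk-..."                     # required
          options:
            base_url: "http://localhost:8080/v1"  # optional: any OpenAI-compatible endpoint
            chunk_seconds: 5                      # transcription chunk duration (default 5)
    services:
      transcription:
        options:
          model: "whisper-1"                      # default
      translation:
        options:
          model: "gpt-4o-mini"                    # default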
--- go.mod | 5 + go.sum | 12 + pkg/insights/providers/openai/client.go | 168 +++++++++++++ pkg/insights/providers/openai/languages.go | 79 ++++++ pkg/insights/providers/openai/transcribe.go | 265 ++++++++++++++++++++ pkg/insights/providers/openai/translate.go | 145 +++++++++++ pkg/services/insights/insights_service.go | 3 + 7 files changed, 677 insertions(+) create mode 100644 pkg/insights/providers/openai/client.go create mode 100644 pkg/insights/providers/openai/languages.go create mode 100644 pkg/insights/providers/openai/transcribe.go create mode 100644 pkg/insights/providers/openai/translate.go diff --git a/go.mod b/go.mod index ace11be5..d738f7ee 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/nats-io/jwt/v2 v2.8.1 github.com/nats-io/nats.go v1.51.0 github.com/nats-io/nkeys v0.4.15 + github.com/openai/openai-go/v3 v3.33.0 github.com/pion/webrtc/v4 v4.2.11 github.com/redis/go-redis/v9 v9.18.0 github.com/sirupsen/logrus v1.9.4 @@ -109,6 +110,10 @@ require ( github.com/prometheus/common v0.67.5 // indirect github.com/prometheus/procfs v0.20.1 // indirect github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect + github.com/tidwall/gjson v1.18.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.1 // indirect + github.com/tidwall/sjson v1.2.5 // indirect github.com/twitchtv/twirp v8.1.3+incompatible // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.69.0 // indirect diff --git a/go.sum b/go.sum index 42718f71..26051815 100644 --- a/go.sum +++ b/go.sum @@ -209,6 +209,8 @@ github.com/nats-io/nkeys v0.4.15 h1:JACV5jRVO9V856KOapQ7x+EY8Jo3qw1vJt/9Jpwzkk4= github.com/nats-io/nkeys v0.4.15/go.mod h1:CpMchTXC9fxA5zrMo4KpySxNjiDVvr8ANOSZdiNfUrs= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/openai/openai-go/v3 v3.33.0 h1:aiETRPoLxnk6y3sIakXAdRCvtcLhdhBqHqIvEdOkEuc= +github.com/openai/openai-go/v3 v3.33.0/go.mod h1:cdufnVK14cWcT9qA1rRtrXx4FTRsgbDPW7Ia7SS5cZo= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= @@ -280,6 +282,16 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= +github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= 
+github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/twitchtv/twirp v8.1.3+incompatible h1:+F4TdErPgSUbMZMwp13Q/KgDVuI7HJXP61mNV3/7iuU= github.com/twitchtv/twirp v8.1.3+incompatible/go.mod h1:RRJoFSAmTEh2weEqWtpPE3vFK5YBhA6bqp2l1kfCC5A= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= diff --git a/pkg/insights/providers/openai/client.go b/pkg/insights/providers/openai/client.go new file mode 100644 index 00000000..f851b7b2 --- /dev/null +++ b/pkg/insights/providers/openai/client.go @@ -0,0 +1,168 @@ +// Package openai implements insights.Provider against the OpenAI API and any +// OpenAI-compatible HTTP backend (LocalAI, vLLM, llama.cpp-server, whisper.cpp, +// etc.). The base_url provider option lets operators point this provider at a +// self-hosted endpoint while keeping the same Go code path as for OpenAI cloud. +package openai + +import ( + "context" + "encoding/json" + "fmt" + "io" + + openaisdk "github.com/openai/openai-go/v3" + "github.com/openai/openai-go/v3/option" + + "github.com/mynaparrot/plugnmeet-protocol/plugnmeet" + "github.com/mynaparrot/plugnmeet-server/pkg/config" + "github.com/mynaparrot/plugnmeet-server/pkg/insights" + "github.com/sirupsen/logrus" +) + +const ( + defaultTranscriptionModel = "whisper-1" + defaultChatModel = "gpt-4o-mini" + defaultChunkSeconds = 5.0 + transcriptionSampleRate = 16000 +) + +// Provider implements insights.Provider for OpenAI and OpenAI-compatible APIs. +type Provider struct { + account *config.ProviderAccount + service *config.ServiceConfig + client openaisdk.Client + logger *logrus.Entry +} + +// NewProvider builds a Provider. The api_key credential is required; the +// base_url option is optional and overrides the SDK's default endpoint. +func NewProvider(providerAccount *config.ProviderAccount, serviceConfig *config.ServiceConfig, log *logrus.Entry) (insights.Provider, error) { + if providerAccount == nil { + return nil, fmt.Errorf("openai: provider account is nil") + } + if providerAccount.Credentials.APIKey == "" { + return nil, fmt.Errorf("openai: credentials.api_key is required") + } + + opts := []option.RequestOption{ + option.WithAPIKey(providerAccount.Credentials.APIKey), + } + if baseURL, _ := providerAccount.Options["base_url"].(string); baseURL != "" { + opts = append(opts, option.WithBaseURL(baseURL)) + } + + return &Provider{ + account: providerAccount, + service: serviceConfig, + client: openaisdk.NewClient(opts...), + logger: log.WithField("service", "openai"), + }, nil +} + +// CreateTranscription opens a chunked transcription stream that periodically +// uploads buffered PCM16 audio to /v1/audio/transcriptions and emits a +// final_result event per chunk. +func (p *Provider) CreateTranscription(ctx context.Context, roomId, userId string, options []byte) (insights.TranscriptionStream, error) { + opts := &insights.TranscriptionOptions{} + if len(options) > 0 { + if err := json.Unmarshal(options, opts); err != nil { + return nil, fmt.Errorf("openai: failed to unmarshal transcription options: %w", err) + } + } + + model := p.serviceModel(defaultTranscriptionModel) + chunkSec := p.chunkSeconds() + + return newChunkedStream(ctx, p.client, model, chunkSec, roomId, userId, opts, p.logger) +} + +// TranslateText performs translation via Chat Completions with a JSON-schema +// constrained response. One round-trip handles all target languages. 
+func (p *Provider) TranslateText(ctx context.Context, text, sourceLang string, targetLangs []string) (*plugnmeet.InsightsTextTranslationResult, error) { + if len(targetLangs) == 0 { + return nil, fmt.Errorf("openai: at least one target language is required") + } + model := p.serviceModel(defaultChatModel) + return translateViaChatCompletions(ctx, p.client, model, text, sourceLang, targetLangs, p.logger) +} + +// SynthesizeText is intentionally not implemented: TTS via the OpenAI audio +// endpoint can be added in a follow-up; until then we surface a clear error. +func (p *Provider) SynthesizeText(_ context.Context, _ []byte) (io.ReadCloser, error) { + return nil, fmt.Errorf("openai: speech synthesis not implemented") +} + +// GetSupportedLanguages returns the static language lists for transcription +// and translation. Whisper / OpenAI translation models support far more codes +// than this list; we surface only the subset we exercise in PlugNmeet. +func (p *Provider) GetSupportedLanguages(serviceType insights.ServiceType) []*plugnmeet.InsightsSupportedLangInfo { + if langs, ok := supportedLanguages[serviceType]; ok { + out := make([]*plugnmeet.InsightsSupportedLangInfo, len(langs)) + for i := range langs { + out[i] = &langs[i] + } + return out + } + return nil +} + +// AITextChatStream is not supported by this provider in its current scope. +// Like SynthesizeText, it surfaces an explicit error rather than a nil +// channel with a nil error, which a caller could block on forever. +func (p *Provider) AITextChatStream(_ context.Context, _ string, _ []*plugnmeet.InsightsAITextChatContent) (<-chan *plugnmeet.InsightsAITextChatStreamResult, error) { + return nil, fmt.Errorf("openai: AI text chat not implemented") +} + +// AIChatTextSummarize is not supported by this provider in its current scope. +func (p *Provider) AIChatTextSummarize(_ context.Context, _ string, _ []*plugnmeet.InsightsAITextChatContent) (string, uint32, uint32, error) { + return "", 0, 0, fmt.Errorf("openai: chat summarisation not implemented") +} + +// StartBatchSummarizeAudioFile is not supported by this provider. +func (p *Provider) StartBatchSummarizeAudioFile(_ context.Context, _, _, _ string) (string, string, error) { + return "", "", fmt.Errorf("openai: batch audio summarisation not implemented") +} + +// CheckBatchJobStatus is not supported by this provider. +func (p *Provider) CheckBatchJobStatus(_ context.Context, _ string) (*insights.BatchJobResponse, error) { + return nil, fmt.Errorf("openai: batch jobs not implemented") +} + +// DeleteUploadedFile is a safe no-op: this provider never uploads batch +// files, so there is nothing to delete. +func (p *Provider) DeleteUploadedFile(_ context.Context, _ string) error { + return nil +} + +// serviceModel reads the per-service model name from the service config, +// falling back to the supplied default. The provider account can also pin a +// model via its own options as a coarse default for both services. +func (p *Provider) serviceModel(fallback string) string { + if p.service != nil { + if m, _ := p.service.Options["model"].(string); m != "" { + return m + } + } + if p.account != nil { + if m, _ := p.account.Options["model"].(string); m != "" { + return m + } + } + return fallback +} + +// chunkSeconds reads chunk_seconds from the provider account options. YAML +// numbers arrive as float64 from gopkg.in/yaml.v3; ints are accepted too.
+func (p *Provider) chunkSeconds() float64 { + if p.account == nil { + return defaultChunkSeconds + } + switch v := p.account.Options["chunk_seconds"].(type) { + case float64: + if v > 0 { + return v + } + case int: + if v > 0 { + return float64(v) + } + } + return defaultChunkSeconds +} diff --git a/pkg/insights/providers/openai/languages.go b/pkg/insights/providers/openai/languages.go new file mode 100644 index 00000000..c594639c --- /dev/null +++ b/pkg/insights/providers/openai/languages.go @@ -0,0 +1,79 @@ +package openai + +import ( + "github.com/mynaparrot/plugnmeet-protocol/plugnmeet" + "github.com/mynaparrot/plugnmeet-server/pkg/insights" +) + +// supportedLanguages enumerates the languages we surface for transcription +// and translation. The transcription set tracks Whisper's documented +// coverage; the translation set is the same superset since modern chat +// models (gpt-4o, gpt-4o-mini, Llama-class instruct models) translate +// between any of these pairs comfortably. +var supportedLanguages = map[insights.ServiceType][]plugnmeet.InsightsSupportedLangInfo{ + insights.ServiceTypeTranscription: whisperLanguages(), + insights.ServiceTypeTranslation: whisperLanguages(), +} + +func whisperLanguages() []plugnmeet.InsightsSupportedLangInfo { + return []plugnmeet.InsightsSupportedLangInfo{ + {Code: "af", Name: "Afrikaans", Locale: "af"}, + {Code: "ar", Name: "Arabic", Locale: "ar"}, + {Code: "az", Name: "Azerbaijani", Locale: "az"}, + {Code: "be", Name: "Belarusian", Locale: "be"}, + {Code: "bg", Name: "Bulgarian", Locale: "bg"}, + {Code: "bn", Name: "Bengali", Locale: "bn"}, + {Code: "bs", Name: "Bosnian", Locale: "bs"}, + {Code: "ca", Name: "Catalan", Locale: "ca"}, + {Code: "cs", Name: "Czech", Locale: "cs"}, + {Code: "cy", Name: "Welsh", Locale: "cy"}, + {Code: "da", Name: "Danish", Locale: "da"}, + {Code: "de", Name: "German", Locale: "de"}, + {Code: "el", Name: "Greek", Locale: "el"}, + {Code: "en", Name: "English", Locale: "en"}, + {Code: "es", Name: "Spanish", Locale: "es"}, + {Code: "et", Name: "Estonian", Locale: "et"}, + {Code: "fa", Name: "Persian", Locale: "fa"}, + {Code: "fi", Name: "Finnish", Locale: "fi"}, + {Code: "fr", Name: "French", Locale: "fr"}, + {Code: "gl", Name: "Galician", Locale: "gl"}, + {Code: "he", Name: "Hebrew", Locale: "he"}, + {Code: "hi", Name: "Hindi", Locale: "hi"}, + {Code: "hr", Name: "Croatian", Locale: "hr"}, + {Code: "hu", Name: "Hungarian", Locale: "hu"}, + {Code: "hy", Name: "Armenian", Locale: "hy"}, + {Code: "id", Name: "Indonesian", Locale: "id"}, + {Code: "is", Name: "Icelandic", Locale: "is"}, + {Code: "it", Name: "Italian", Locale: "it"}, + {Code: "ja", Name: "Japanese", Locale: "ja"}, + {Code: "kk", Name: "Kazakh", Locale: "kk"}, + {Code: "kn", Name: "Kannada", Locale: "kn"}, + {Code: "ko", Name: "Korean", Locale: "ko"}, + {Code: "lt", Name: "Lithuanian", Locale: "lt"}, + {Code: "lv", Name: "Latvian", Locale: "lv"}, + {Code: "mi", Name: "Maori", Locale: "mi"}, + {Code: "mk", Name: "Macedonian", Locale: "mk"}, + {Code: "mr", Name: "Marathi", Locale: "mr"}, + {Code: "ms", Name: "Malay", Locale: "ms"}, + {Code: "ne", Name: "Nepali", Locale: "ne"}, + {Code: "nl", Name: "Dutch", Locale: "nl"}, + {Code: "no", Name: "Norwegian", Locale: "no"}, + {Code: "pl", Name: "Polish", Locale: "pl"}, + {Code: "pt", Name: "Portuguese", Locale: "pt"}, + {Code: "ro", Name: "Romanian", Locale: "ro"}, + {Code: "ru", Name: "Russian", Locale: "ru"}, + {Code: "sk", Name: "Slovak", Locale: "sk"}, + {Code: "sl", Name: "Slovenian", Locale: "sl"}, + {Code: "sr", 
Name: "Serbian", Locale: "sr"}, + {Code: "sv", Name: "Swedish", Locale: "sv"}, + {Code: "sw", Name: "Swahili", Locale: "sw"}, + {Code: "ta", Name: "Tamil", Locale: "ta"}, + {Code: "th", Name: "Thai", Locale: "th"}, + {Code: "tl", Name: "Tagalog", Locale: "tl"}, + {Code: "tr", Name: "Turkish", Locale: "tr"}, + {Code: "uk", Name: "Ukrainian", Locale: "uk"}, + {Code: "ur", Name: "Urdu", Locale: "ur"}, + {Code: "vi", Name: "Vietnamese", Locale: "vi"}, + {Code: "zh", Name: "Chinese", Locale: "zh"}, + } +} diff --git a/pkg/insights/providers/openai/transcribe.go b/pkg/insights/providers/openai/transcribe.go new file mode 100644 index 00000000..0b19dd6b --- /dev/null +++ b/pkg/insights/providers/openai/transcribe.go @@ -0,0 +1,265 @@ +package openai + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "io" + "sync" + "sync/atomic" + "time" + + "github.com/livekit/media-sdk" + openaisdk "github.com/openai/openai-go/v3" + "github.com/sirupsen/logrus" + + "github.com/mynaparrot/plugnmeet-protocol/plugnmeet" + "github.com/mynaparrot/plugnmeet-server/pkg/insights" +) + +// chunkedStream implements insights.TranscriptionStream by buffering PCM16 +// audio in memory and POSTing fixed-duration WAV chunks to the OpenAI +// transcription endpoint. Each upload yields a single final_result event. +// +// This trades partials and per-word latency for portability: the same code +// path works against OpenAI cloud and any OpenAI-compatible self-hosted +// transcription server (e.g. faster-whisper behind LocalAI, whisper.cpp's +// HTTP server, vLLM with whisper). Real-time partials would require the +// OpenAI Realtime websocket API, which is not yet covered by openai-go and +// is not supported by most self-hosted backends. +type chunkedStream struct { + client openaisdk.Client + model string + chunkSamples int + language string + userId string + userName string + allowStorage bool + + bufMu sync.Mutex + buf []int16 + + chunks chan []int16 + results chan *insights.TranscriptionEvent + + closed atomic.Bool + wg sync.WaitGroup + cancel context.CancelFunc + + log *logrus.Entry +} + +func newChunkedStream(parentCtx context.Context, client openaisdk.Client, model string, chunkSeconds float64, roomId, userId string, opts *insights.TranscriptionOptions, log *logrus.Entry) (*chunkedStream, error) { + if chunkSeconds <= 0 { + chunkSeconds = defaultChunkSeconds + } + ctx, cancel := context.WithCancel(parentCtx) + + s := &chunkedStream{ + client: client, + model: model, + chunkSamples: int(chunkSeconds * float64(transcriptionSampleRate)), + language: opts.SpokenLang, + userId: userId, + userName: opts.UserName, + allowStorage: opts.AllowedTranscriptionStorage, + chunks: make(chan []int16, 4), + results: make(chan *insights.TranscriptionEvent, 16), + cancel: cancel, + log: log.WithFields(logrus.Fields{ + "roomId": roomId, + "userId": userId, + "lang": opts.SpokenLang, + "model": model, + }), + } + + s.wg.Add(1) + go s.uploadLoop(ctx) + + s.results <- &insights.TranscriptionEvent{Type: insights.EventTypeSessionStarted} + return s, nil +} + +// WriteSample appends PCM16 samples to the in-memory buffer and pushes a chunk +// onto the upload queue once the configured duration has accumulated. +func (s *chunkedStream) WriteSample(sample media.PCM16Sample) error { + if s.closed.Load() { + return fmt.Errorf("stream is closed") + } + + s.bufMu.Lock() + s.buf = append(s.buf, sample...) 
+ var ready []int16 + if len(s.buf) >= s.chunkSamples { + ready = make([]int16, s.chunkSamples) + copy(ready, s.buf[:s.chunkSamples]) + s.buf = s.buf[s.chunkSamples:] + } + s.bufMu.Unlock() + + if ready != nil { + select { + case s.chunks <- ready: + default: + // Upload queue is saturated. Dropping is preferable to blocking + // the audio path; the next chunk has fresh content anyway. + s.log.Warn("openai transcription upload queue full, dropping chunk") + } + } + return nil +} + +// Close flushes any buffered tail audio, signals the upload loop to drain, +// and waits for in-flight uploads to finish before returning. +func (s *chunkedStream) Close() error { + if !s.closed.CompareAndSwap(false, true) { + return nil + } + + s.bufMu.Lock() + tail := s.buf + s.buf = nil + s.bufMu.Unlock() + + if len(tail) > 0 { + select { + case s.chunks <- tail: + default: + s.log.Warn("openai transcription upload queue full at close, dropping tail") + } + } + close(s.chunks) + s.wg.Wait() + s.cancel() + close(s.results) + return nil +} + +// SetProperty is unused; OpenAI's transcription model is selected at session +// creation and does not accept mid-stream property changes. +func (s *chunkedStream) SetProperty(_, _ string) error { return nil } + +// Results exposes the read side of the event channel. +func (s *chunkedStream) Results() <-chan *insights.TranscriptionEvent { return s.results } + +func (s *chunkedStream) uploadLoop(ctx context.Context) { + defer s.wg.Done() + defer func() { + // Best-effort: always emit a stopped event so downstream consumers + // can release any room-level state tied to this stream. + s.safeSend(&insights.TranscriptionEvent{Type: insights.EventTypeSessionStopped}) + }() + + for samples := range s.chunks { + if len(samples) == 0 { + continue + } + text, err := s.transcribeChunk(ctx, samples) + if err != nil { + if ctx.Err() != nil { + return + } + s.log.WithError(err).Warn("openai transcription upload failed") + s.safeSend(&insights.TranscriptionEvent{ + Type: insights.EventTypeError, + Error: err.Error(), + }) + continue + } + if text == "" { + continue + } + s.safeSend(&insights.TranscriptionEvent{ + Type: insights.EventTypeFinalResult, + Result: &plugnmeet.InsightsTranscriptionResult{ + FromUserId: s.userId, + FromUserName: s.userName, + Lang: s.language, + Text: text, + IsPartial: false, + AllowedTranscriptionStorage: s.allowStorage, + Translations: map[string]string{}, + }, + }) + } +} + +func (s *chunkedStream) safeSend(ev *insights.TranscriptionEvent) { + defer func() { + // A late event after Close() races with channel close; swallow the + // resulting panic rather than tearing down the upload goroutine. + _ = recover() + }() + select { + case s.results <- ev: + default: + s.log.Warn("openai transcription results channel full, dropping event") + } +} + +func (s *chunkedStream) transcribeChunk(ctx context.Context, samples []int16) (string, error) { + wav := encodeWAV(samples, transcriptionSampleRate) + + params := openaisdk.AudioTranscriptionNewParams{ + Model: openaisdk.AudioModel(s.model), + File: namedReader{Reader: bytes.NewReader(wav), name: "audio.wav"}, + } + if s.language != "" { + params.Language = openaisdk.String(s.language) + } + + // Each chunk gets a generous-but-bounded deadline so a hung backend + // doesn't pile up requests indefinitely. 
+ reqCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + resp, err := s.client.Audio.Transcriptions.New(reqCtx, params) + if err != nil { + return "", fmt.Errorf("audio.transcriptions.new: %w", err) + } + return resp.Text, nil +} + +// namedReader exposes a filename to the openai-go multipart encoder, which +// uses it to derive the upload's MIME type. Without a recognised audio +// extension Whisper-compatible servers refuse the upload. +type namedReader struct { + io.Reader + name string +} + +func (n namedReader) Name() string { return n.name } + +// encodeWAV serialises 16-bit signed little-endian PCM samples as a minimal +// canonical RIFF/WAVE file (44-byte header + interleaved data). 16 kHz mono +// matches the audio LiveKit hands us via PCM16Sample. +func encodeWAV(samples []int16, sampleRate int) []byte { + const ( + numChannels = 1 + bitsPerSample = 16 + ) + dataSize := len(samples) * 2 + totalSize := 36 + dataSize + + buf := bytes.NewBuffer(make([]byte, 0, 44+dataSize)) + buf.WriteString("RIFF") + _ = binary.Write(buf, binary.LittleEndian, uint32(totalSize)) + buf.WriteString("WAVE") + + buf.WriteString("fmt ") + _ = binary.Write(buf, binary.LittleEndian, uint32(16)) // PCM fmt chunk size + _ = binary.Write(buf, binary.LittleEndian, uint16(1)) // PCM format + _ = binary.Write(buf, binary.LittleEndian, uint16(numChannels)) + _ = binary.Write(buf, binary.LittleEndian, uint32(sampleRate)) + _ = binary.Write(buf, binary.LittleEndian, uint32(sampleRate*numChannels*bitsPerSample/8)) + _ = binary.Write(buf, binary.LittleEndian, uint16(numChannels*bitsPerSample/8)) + _ = binary.Write(buf, binary.LittleEndian, uint16(bitsPerSample)) + + buf.WriteString("data") + _ = binary.Write(buf, binary.LittleEndian, uint32(dataSize)) + _ = binary.Write(buf, binary.LittleEndian, samples) + + return buf.Bytes() +} diff --git a/pkg/insights/providers/openai/translate.go b/pkg/insights/providers/openai/translate.go new file mode 100644 index 00000000..3560d83b --- /dev/null +++ b/pkg/insights/providers/openai/translate.go @@ -0,0 +1,145 @@ +package openai + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + openaisdk "github.com/openai/openai-go/v3" + "github.com/sirupsen/logrus" + + "github.com/mynaparrot/plugnmeet-protocol/plugnmeet" +) + +// translateViaChatCompletions uses the Chat Completions endpoint with a +// dynamically-shaped JSON Schema to translate a single source string into all +// requested target languages in one round-trip. The schema's properties are +// the target language codes, so a Strict-mode response is forced to populate +// exactly the keys we expect. +// +// We deliberately use Chat Completions rather than the newer Responses API: +// every OpenAI-compatible self-hosted backend (llama.cpp-server, LocalAI, +// vLLM, Ollama) implements /v1/chat/completions, while the Responses API is +// OpenAI-specific. Keeping translation portable matters more here than using +// the latest API surface. +func translateViaChatCompletions(parentCtx context.Context, client openaisdk.Client, model, text, sourceLang string, targetLangs []string, log *logrus.Entry) (*plugnmeet.InsightsTextTranslationResult, error) { + if strings.TrimSpace(text) == "" { + // Skip the round-trip; an empty source maps to empty translations + // and the model would either refuse or hallucinate filler. 
+ empty := make(map[string]string, len(targetLangs)) + for _, l := range targetLangs { + empty[l] = "" + } + return &plugnmeet.InsightsTextTranslationResult{ + SourceText: text, + SourceLang: sourceLang, + Translations: empty, + }, nil + } + + schema := buildTranslationSchema(targetLangs) + systemPrompt := "You are a professional translation engine. Translate the user's text faithfully and idiomatically. Preserve meaning, tone, named entities, and numbers. Do not add explanations. Respond ONLY with the JSON object that matches the response schema." + + srcDescriptor := sourceLang + if srcDescriptor == "" { + srcDescriptor = "auto-detect" + } + userPrompt := fmt.Sprintf( + "Source language: %s\nTarget languages (ISO 639-1): %s\n\nText to translate:\n%s", + srcDescriptor, + strings.Join(targetLangs, ", "), + text, + ) + + schemaParam := openaisdk.ResponseFormatJSONSchemaJSONSchemaParam{ + Name: "translations", + Description: openaisdk.String("Map of ISO 639-1 target language code to translated text."), + Schema: schema, + Strict: openaisdk.Bool(true), + } + + ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second) + defer cancel() + + resp, err := client.Chat.Completions.New(ctx, openaisdk.ChatCompletionNewParams{ + Model: openaisdk.ChatModel(model), + Messages: []openaisdk.ChatCompletionMessageParamUnion{ + openaisdk.SystemMessage(systemPrompt), + openaisdk.UserMessage(userPrompt), + }, + ResponseFormat: openaisdk.ChatCompletionNewParamsResponseFormatUnion{ + OfJSONSchema: &openaisdk.ResponseFormatJSONSchemaParam{JSONSchema: schemaParam}, + }, + Temperature: openaisdk.Float(0), + }) + if err != nil { + return nil, fmt.Errorf("chat.completions.new: %w", err) + } + if len(resp.Choices) == 0 { + return nil, fmt.Errorf("openai translation: empty choices in response") + } + + raw := resp.Choices[0].Message.Content + parsed := map[string]string{} + if err := json.Unmarshal([]byte(raw), &parsed); err != nil { + // Some local backends ignore Strict mode and wrap output in markdown + // fences or prose. Try one tolerant unwrap before giving up. + if cleaned := stripJSONNoise(raw); cleaned != raw { + if err2 := json.Unmarshal([]byte(cleaned), &parsed); err2 == nil { + goto OK + } + } + log.WithError(err).WithField("raw", truncate(raw, 256)).Warn("openai translation: failed to parse JSON response") + return nil, fmt.Errorf("failed to parse translation JSON: %w", err) + } +OK: + translations := make(map[string]string, len(targetLangs)) + for _, l := range targetLangs { + translations[l] = parsed[l] + } + + return &plugnmeet.InsightsTextTranslationResult{ + SourceText: text, + SourceLang: sourceLang, + Translations: translations, + }, nil +} + +// buildTranslationSchema produces a JSON Schema requiring exactly the supplied +// language codes as string properties. Strict mode then forces the model to +// emit all of them and only them. +func buildTranslationSchema(targetLangs []string) map[string]any { + properties := make(map[string]any, len(targetLangs)) + required := make([]string, 0, len(targetLangs)) + for _, l := range targetLangs { + properties[l] = map[string]any{"type": "string"} + required = append(required, l) + } + return map[string]any{ + "type": "object", + "additionalProperties": false, + "properties": properties, + "required": required, + } +} + +// stripJSONNoise trims markdown code fences and surrounding whitespace from +// a model response. 
Strict-mode-compliant backends shouldn't need this; it's +// purely a forgiveness layer for OpenAI-compatible servers that don't enforce +// the schema. +func stripJSONNoise(s string) string { + s = strings.TrimSpace(s) + s = strings.TrimPrefix(s, "```json") + s = strings.TrimPrefix(s, "```") + s = strings.TrimSuffix(s, "```") + return strings.TrimSpace(s) +} + +func truncate(s string, n int) string { + if len(s) <= n { + return s + } + return s[:n] + "…" +} diff --git a/pkg/services/insights/insights_service.go b/pkg/services/insights/insights_service.go index 60961541..51588a45 100644 --- a/pkg/services/insights/insights_service.go +++ b/pkg/services/insights/insights_service.go @@ -8,6 +8,7 @@ import ( "github.com/mynaparrot/plugnmeet-server/pkg/insights" "github.com/mynaparrot/plugnmeet-server/pkg/insights/providers/azure" "github.com/mynaparrot/plugnmeet-server/pkg/insights/providers/google" + openaiprovider "github.com/mynaparrot/plugnmeet-server/pkg/insights/providers/openai" natsservice "github.com/mynaparrot/plugnmeet-server/pkg/services/nats" redisservice "github.com/mynaparrot/plugnmeet-server/pkg/services/redis" "github.com/sirupsen/logrus" @@ -23,6 +24,8 @@ func NewProvider(ctx context.Context, providerType string, providerAccount *conf return azure.NewProvider(providerAccount, serviceConfig, log) case "google": return google.NewProvider(ctx, providerAccount, serviceConfig, log) + case "openai": + return openaiprovider.NewProvider(providerAccount, serviceConfig, log) default: return nil, fmt.Errorf("unknown AI provider type: %s", providerType) }

From 8e7c9527c04415378373b8a2f44e8f245f103cf3 Mon Sep 17 00:00:00 2001
From: xynstr
Date: Wed, 29 Apr 2026 02:26:39 +0200
Subject: [PATCH 2/2] feat(insights): add realtime mode to OpenAI provider

Adds a realtime transcription path alongside the existing chunked one,
selectable via the provider account's mode option (default: chunked); a
configuration sketch follows this message.

- chunked (existing): WAV chunks POSTed to /v1/audio/transcriptions. Works
  against any OpenAI-compatible HTTP backend (faster-whisper-server,
  whisper.cpp, vLLM, LocalAI).
- realtime (new): WebSocket to /v1/realtime?intent=transcription with PCM16
  streaming, server VAD, partial_result deltas, and final_result on segment
  completion. Works against OpenAI cloud and Azure OpenAI Realtime.

Also drops the goto fallback in translate.go in favour of a small
parseTranslationJSON helper.
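A configuration sketch for selecting the new mode. The option names and
defaults come from the code below (mode, realtime_url, the
gpt-4o-mini-transcribe fallback); the surrounding YAML layout is an
assumption, not a verbatim excerpt from this repo:

    providers:
      openai:
        - credentials:
            api_key: "sk-..."
          options:
            mode: "realtime"   # default: "chunked"
            # Optional override; defaults to the OpenAI cloud endpoint.
            realtime_url: "wss://api.openai.com/v1/realtime?intent=transcription"

With mode: realtime, a configured model of whisper-1 (or none) is swapped for
gpt-4o-mini-transcribe, since the Realtime transcription endpoint rejects
whisper-1.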
--- go.mod | 2 +- pkg/insights/providers/openai/client.go | 29 +- pkg/insights/providers/openai/transcribe.go | 9 +- .../providers/openai/transcribe_realtime.go | 347 ++++++++++++++++++ pkg/insights/providers/openai/translate.go | 29 +- 5 files changed, 394 insertions(+), 22 deletions(-) create mode 100644 pkg/insights/providers/openai/transcribe_realtime.go diff --git a/go.mod b/go.mod index d738f7ee..478d9d9b 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/gofiber/template/html/v2 v2.1.3 github.com/google/uuid v1.6.0 github.com/google/wire v0.7.0 + github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 github.com/jordic/lti v0.0.0-20160211051708-2c756eacbab9 github.com/livekit/media-sdk v0.0.0-20260401192012-ea94ab340a57 github.com/livekit/protocol v1.45.2-0.20260403151849-8a360e8d0221 @@ -70,7 +71,6 @@ require ( github.com/google/s2a-go v0.1.9 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.14 // indirect github.com/googleapis/gax-go/v2 v2.19.0 // indirect - github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-retryablehttp v0.7.8 // indirect github.com/jinzhu/inflection v1.0.0 // indirect diff --git a/pkg/insights/providers/openai/client.go b/pkg/insights/providers/openai/client.go index f851b7b2..85a9d262 100644 --- a/pkg/insights/providers/openai/client.go +++ b/pkg/insights/providers/openai/client.go @@ -24,6 +24,9 @@ const ( defaultChatModel = "gpt-4o-mini" defaultChunkSeconds = 5.0 transcriptionSampleRate = 16000 + + modeChunked = "chunked" + modeRealtime = "realtime" ) // Provider implements insights.Provider for OpenAI and OpenAI-compatible APIs. @@ -59,9 +62,9 @@ func NewProvider(providerAccount *config.ProviderAccount, serviceConfig *config. }, nil } -// CreateTranscription opens a chunked transcription stream that periodically -// uploads buffered PCM16 audio to /v1/audio/transcriptions and emits a -// final_result event per chunk. +// CreateTranscription opens a transcription stream. mode: "chunked" (default) +// uploads WAV chunks to /v1/audio/transcriptions; mode: "realtime" streams +// PCM16 over WebSocket and surfaces partials. func (p *Provider) CreateTranscription(ctx context.Context, roomId, userId string, options []byte) (insights.TranscriptionStream, error) { opts := &insights.TranscriptionOptions{} if len(options) > 0 { @@ -71,9 +74,13 @@ func (p *Provider) CreateTranscription(ctx context.Context, roomId, userId strin } model := p.serviceModel(defaultTranscriptionModel) - chunkSec := p.chunkSeconds() - return newChunkedStream(ctx, p.client, model, chunkSec, roomId, userId, opts, p.logger) + switch p.transcribeMode() { + case modeRealtime: + return newRealtimeStream(ctx, p.account, model, roomId, userId, opts, p.logger) + default: + return newChunkedStream(ctx, p.client, model, p.chunkSeconds(), roomId, userId, opts, p.logger) + } } // TranslateText performs translation via Chat Completions with a JSON-schema @@ -148,6 +155,18 @@ func (p *Provider) serviceModel(fallback string) string { return fallback } +func (p *Provider) transcribeMode() string { + if p.account == nil { + return modeChunked + } + switch m, _ := p.account.Options["mode"].(string); m { + case modeRealtime: + return modeRealtime + default: + return modeChunked + } +} + // chunkSeconds reads chunk_seconds from the provider account options. YAML // numbers arrive as float64 from gopkg.in/yaml.v3; ints are accepted too. 
func (p *Provider) chunkSeconds() float64 { diff --git a/pkg/insights/providers/openai/transcribe.go b/pkg/insights/providers/openai/transcribe.go index 0b19dd6b..98406f08 100644 --- a/pkg/insights/providers/openai/transcribe.go +++ b/pkg/insights/providers/openai/transcribe.go @@ -22,12 +22,9 @@ import ( // audio in memory and POSTing fixed-duration WAV chunks to the OpenAI // transcription endpoint. Each upload yields a single final_result event. // -// This trades partials and per-word latency for portability: the same code -// path works against OpenAI cloud and any OpenAI-compatible self-hosted -// transcription server (e.g. faster-whisper behind LocalAI, whisper.cpp's -// HTTP server, vLLM with whisper). Real-time partials would require the -// OpenAI Realtime websocket API, which is not yet covered by openai-go and -// is not supported by most self-hosted backends. +// This trades partials for portability: the same code path works against any +// OpenAI-compatible HTTP backend (faster-whisper-server, whisper.cpp, vLLM, +// LocalAI). For partial deltas use mode: "realtime" (realtimeStream). type chunkedStream struct { client openaisdk.Client model string diff --git a/pkg/insights/providers/openai/transcribe_realtime.go b/pkg/insights/providers/openai/transcribe_realtime.go new file mode 100644 index 00000000..bc7bd55f --- /dev/null +++ b/pkg/insights/providers/openai/transcribe_realtime.go @@ -0,0 +1,347 @@ +package openai + +import ( + "context" + "encoding/base64" + "encoding/binary" + "encoding/json" + "fmt" + "net/http" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/gorilla/websocket" + "github.com/livekit/media-sdk" + "github.com/sirupsen/logrus" + + "github.com/mynaparrot/plugnmeet-protocol/plugnmeet" + "github.com/mynaparrot/plugnmeet-server/pkg/config" + "github.com/mynaparrot/plugnmeet-server/pkg/insights" +) + +// realtimeStream implements insights.TranscriptionStream over the OpenAI +// Realtime WebSocket API, surfacing per-segment partial_result and +// final_result events. Requires a backend that speaks the Realtime protocol +// (OpenAI cloud or Azure OpenAI Realtime). +type realtimeStream struct { + conn *websocket.Conn + model string + language string + userId string + userName string + allow bool + + // All sends go through outbound; gorilla/websocket forbids concurrent writes. + outbound chan []byte + results chan *insights.TranscriptionEvent + + pmu sync.Mutex + partials map[string]string + + closed atomic.Bool + wg sync.WaitGroup + cancel context.CancelFunc + log *logrus.Entry +} + +const ( + defaultRealtimeURL = "wss://api.openai.com/v1/realtime?intent=transcription" + defaultRealtimeModel = "gpt-4o-mini-transcribe" + realtimeDialTimeout = 10 * time.Second +) + +func newRealtimeStream(parentCtx context.Context, account *config.ProviderAccount, model, roomId, userId string, opts *insights.TranscriptionOptions, log *logrus.Entry) (*realtimeStream, error) { + if account == nil { + return nil, fmt.Errorf("openai realtime: provider account is nil") + } + if account.Credentials.APIKey == "" { + return nil, fmt.Errorf("openai realtime: credentials.api_key is required") + } + + url := defaultRealtimeURL + if v, _ := account.Options["realtime_url"].(string); v != "" { + url = v + } + // whisper-1 is rejected by the Realtime transcription endpoint. 
+ if model == "" || model == defaultTranscriptionModel { + model = defaultRealtimeModel + } + + header := http.Header{} + header.Set("Authorization", "Bearer "+account.Credentials.APIKey) + header.Set("OpenAI-Beta", "realtime=v1") + + dialer := &websocket.Dialer{HandshakeTimeout: realtimeDialTimeout} + dialCtx, dialCancel := context.WithTimeout(parentCtx, realtimeDialTimeout) + defer dialCancel() + + conn, resp, err := dialer.DialContext(dialCtx, url, header) + if err != nil { + if resp != nil { + return nil, fmt.Errorf("openai realtime: dial %s: %w (http %d)", url, err, resp.StatusCode) + } + return nil, fmt.Errorf("openai realtime: dial %s: %w", url, err) + } + + ctx, cancel := context.WithCancel(parentCtx) + s := &realtimeStream{ + conn: conn, + model: model, + language: opts.SpokenLang, + userId: userId, + userName: opts.UserName, + allow: opts.AllowedTranscriptionStorage, + outbound: make(chan []byte, 64), + results: make(chan *insights.TranscriptionEvent, 16), + partials: make(map[string]string), + cancel: cancel, + log: log.WithFields(logrus.Fields{ + "roomId": roomId, + "userId": userId, + "lang": opts.SpokenLang, + "model": model, + "mode": modeRealtime, + }), + } + + if err := s.sendSessionUpdate(); err != nil { + _ = conn.Close() + cancel() + return nil, err + } + + s.wg.Add(2) + go s.writeLoop(ctx) + go s.readLoop(ctx) + + s.results <- &insights.TranscriptionEvent{Type: insights.EventTypeSessionStarted} + return s, nil +} + +// sendSessionUpdate runs synchronously before writeLoop starts, so it writes +// directly. After that, all sends MUST go through outbound. +func (s *realtimeStream) sendSessionUpdate() error { + session := map[string]any{ + "type": "transcription_session.update", + "session": map[string]any{ + "input_audio_format": "pcm16", + "input_audio_transcription": map[string]any{ + "model": s.model, + "language": s.language, + }, + // Server VAD drives the .completed event we map to final_result. 
+ "turn_detection": map[string]any{ + "type": "server_vad", + "threshold": 0.5, + "prefix_padding_ms": 300, + "silence_duration_ms": 500, + }, + }, + } + payload, err := json.Marshal(session) + if err != nil { + return fmt.Errorf("openai realtime: marshal session.update: %w", err) + } + if err := s.conn.WriteMessage(websocket.TextMessage, payload); err != nil { + return fmt.Errorf("openai realtime: send session.update: %w", err) + } + return nil +} + +func (s *realtimeStream) WriteSample(sample media.PCM16Sample) error { + if s.closed.Load() { + return fmt.Errorf("stream is closed") + } + if len(sample) == 0 { + return nil + } + + raw := make([]byte, len(sample)*2) + for i, v := range sample { + binary.LittleEndian.PutUint16(raw[i*2:], uint16(v)) + } + + event := map[string]string{ + "type": "input_audio_buffer.append", + "audio": base64.StdEncoding.EncodeToString(raw), + } + payload, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("openai realtime: marshal append: %w", err) + } + + select { + case s.outbound <- payload: + default: + s.log.Warn("openai realtime outbound queue full, dropping audio frame") + } + return nil +} + +func (s *realtimeStream) SetProperty(_, _ string) error { return nil } + +func (s *realtimeStream) Results() <-chan *insights.TranscriptionEvent { return s.results } + +func (s *realtimeStream) Close() error { + if !s.closed.CompareAndSwap(false, true) { + return nil + } + + commit, _ := json.Marshal(map[string]string{"type": "input_audio_buffer.commit"}) + select { + case s.outbound <- commit: + default: + } + + close(s.outbound) + s.cancel() + // Close unblocks readLoop with a network error. + _ = s.conn.Close() + + s.wg.Wait() + close(s.results) + return nil +} + +func (s *realtimeStream) writeLoop(ctx context.Context) { + defer s.wg.Done() + for { + select { + case <-ctx.Done(): + return + case msg, ok := <-s.outbound: + if !ok { + return + } + _ = s.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if err := s.conn.WriteMessage(websocket.TextMessage, msg); err != nil { + if !s.closed.Load() { + s.log.WithError(err).Warn("openai realtime write failed") + s.safeSend(&insights.TranscriptionEvent{ + Type: insights.EventTypeError, + Error: err.Error(), + }) + } + return + } + } + } +} + +func (s *realtimeStream) readLoop(ctx context.Context) { + defer s.wg.Done() + defer s.safeSend(&insights.TranscriptionEvent{Type: insights.EventTypeSessionStopped}) + + for { + _, data, err := s.conn.ReadMessage() + if err != nil { + if ctx.Err() != nil || s.closed.Load() { + return + } + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + return + } + s.log.WithError(err).Warn("openai realtime read failed") + s.safeSend(&insights.TranscriptionEvent{ + Type: insights.EventTypeError, + Error: err.Error(), + }) + return + } + s.dispatch(data) + } +} + +type realtimeServerEvent struct { + Type string `json:"type"` + ItemID string `json:"item_id,omitempty"` + Delta string `json:"delta,omitempty"` + Transcript string `json:"transcript,omitempty"` + Error *struct { + Message string `json:"message"` + Code string `json:"code"` + } `json:"error,omitempty"` +} + +func (s *realtimeStream) dispatch(data []byte) { + var ev realtimeServerEvent + if err := json.Unmarshal(data, &ev); err != nil { + s.log.WithError(err).Debug("openai realtime: unparseable server event") + return + } + + switch ev.Type { + case "conversation.item.input_audio_transcription.delta": + text := s.appendPartial(ev.ItemID, ev.Delta) + if text == "" { + return 
+ } + s.safeSend(&insights.TranscriptionEvent{ + Type: insights.EventTypePartialResult, + Result: s.buildResult(text, true), + }) + + case "conversation.item.input_audio_transcription.completed": + text := strings.TrimSpace(ev.Transcript) + s.clearPartial(ev.ItemID) + if text == "" { + return + } + s.safeSend(&insights.TranscriptionEvent{ + Type: insights.EventTypeFinalResult, + Result: s.buildResult(text, false), + }) + + case "error": + msg := "unknown realtime error" + if ev.Error != nil && ev.Error.Message != "" { + msg = ev.Error.Message + } + s.safeSend(&insights.TranscriptionEvent{ + Type: insights.EventTypeError, + Error: msg, + }) + } +} + +func (s *realtimeStream) appendPartial(itemID, delta string) string { + if itemID == "" || delta == "" { + return "" + } + s.pmu.Lock() + defer s.pmu.Unlock() + s.partials[itemID] += delta + return strings.TrimSpace(s.partials[itemID]) +} + +func (s *realtimeStream) clearPartial(itemID string) { + if itemID == "" { + return + } + s.pmu.Lock() + delete(s.partials, itemID) + s.pmu.Unlock() +} + +func (s *realtimeStream) buildResult(text string, isPartial bool) *plugnmeet.InsightsTranscriptionResult { + return &plugnmeet.InsightsTranscriptionResult{ + FromUserId: s.userId, + FromUserName: s.userName, + Lang: s.language, + Text: text, + IsPartial: isPartial, + AllowedTranscriptionStorage: s.allow, + Translations: map[string]string{}, + } +} + +func (s *realtimeStream) safeSend(ev *insights.TranscriptionEvent) { + defer func() { _ = recover() }() + select { + case s.results <- ev: + default: + s.log.Warn("openai realtime results channel full, dropping event") + } +} diff --git a/pkg/insights/providers/openai/translate.go b/pkg/insights/providers/openai/translate.go index 3560d83b..8c3f8cd7 100644 --- a/pkg/insights/providers/openai/translate.go +++ b/pkg/insights/providers/openai/translate.go @@ -82,19 +82,12 @@ func translateViaChatCompletions(parentCtx context.Context, client openaisdk.Cli } raw := resp.Choices[0].Message.Content - parsed := map[string]string{} - if err := json.Unmarshal([]byte(raw), &parsed); err != nil { - // Some local backends ignore Strict mode and wrap output in markdown - // fences or prose. Try one tolerant unwrap before giving up. - if cleaned := stripJSONNoise(raw); cleaned != raw { - if err2 := json.Unmarshal([]byte(cleaned), &parsed); err2 == nil { - goto OK - } - } + parsed, err := parseTranslationJSON(raw) + if err != nil { log.WithError(err).WithField("raw", truncate(raw, 256)).Warn("openai translation: failed to parse JSON response") return nil, fmt.Errorf("failed to parse translation JSON: %w", err) } -OK: + translations := make(map[string]string, len(targetLangs)) for _, l := range targetLangs { translations[l] = parsed[l] @@ -125,6 +118,22 @@ func buildTranslationSchema(targetLangs []string) map[string]any { } } +// parseTranslationJSON unmarshals the response, retrying once with markdown +// fences stripped for backends that ignore Strict mode. +func parseTranslationJSON(raw string) (map[string]string, error) { + parsed := map[string]string{} + err := json.Unmarshal([]byte(raw), &parsed) + if err == nil { + return parsed, nil + } + if cleaned := stripJSONNoise(raw); cleaned != raw { + parsed = map[string]string{} // discard any keys a failed partial parse left behind + if err2 := json.Unmarshal([]byte(cleaned), &parsed); err2 == nil { + return parsed, nil + } + } + // Report the original error; it describes the payload as received. + return nil, err +} + // stripJSONNoise trims markdown code fences and surrounding whitespace from // a model response.
Strict-mode-compliant backends shouldn't need this; it's // purely a forgiveness layer for OpenAI-compatible servers that don't enforce