diff --git a/go.mod b/go.mod index ace11be5..478d9d9b 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/gofiber/template/html/v2 v2.1.3 github.com/google/uuid v1.6.0 github.com/google/wire v0.7.0 + github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 github.com/jordic/lti v0.0.0-20160211051708-2c756eacbab9 github.com/livekit/media-sdk v0.0.0-20260401192012-ea94ab340a57 github.com/livekit/protocol v1.45.2-0.20260403151849-8a360e8d0221 @@ -24,6 +25,7 @@ require ( github.com/nats-io/jwt/v2 v2.8.1 github.com/nats-io/nats.go v1.51.0 github.com/nats-io/nkeys v0.4.15 + github.com/openai/openai-go/v3 v3.33.0 github.com/pion/webrtc/v4 v4.2.11 github.com/redis/go-redis/v9 v9.18.0 github.com/sirupsen/logrus v1.9.4 @@ -69,7 +71,6 @@ require ( github.com/google/s2a-go v0.1.9 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.14 // indirect github.com/googleapis/gax-go/v2 v2.19.0 // indirect - github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-retryablehttp v0.7.8 // indirect github.com/jinzhu/inflection v1.0.0 // indirect @@ -109,6 +110,10 @@ require ( github.com/prometheus/common v0.67.5 // indirect github.com/prometheus/procfs v0.20.1 // indirect github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect + github.com/tidwall/gjson v1.18.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.1 // indirect + github.com/tidwall/sjson v1.2.5 // indirect github.com/twitchtv/twirp v8.1.3+incompatible // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.69.0 // indirect diff --git a/go.sum b/go.sum index 42718f71..26051815 100644 --- a/go.sum +++ b/go.sum @@ -209,6 +209,8 @@ github.com/nats-io/nkeys v0.4.15 h1:JACV5jRVO9V856KOapQ7x+EY8Jo3qw1vJt/9Jpwzkk4= github.com/nats-io/nkeys v0.4.15/go.mod h1:CpMchTXC9fxA5zrMo4KpySxNjiDVvr8ANOSZdiNfUrs= 
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/openai/openai-go/v3 v3.33.0 h1:aiETRPoLxnk6y3sIakXAdRCvtcLhdhBqHqIvEdOkEuc= +github.com/openai/openai-go/v3 v3.33.0/go.mod h1:cdufnVK14cWcT9qA1rRtrXx4FTRsgbDPW7Ia7SS5cZo= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= @@ -280,6 +282,16 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= +github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/twitchtv/twirp v8.1.3+incompatible h1:+F4TdErPgSUbMZMwp13Q/KgDVuI7HJXP61mNV3/7iuU= github.com/twitchtv/twirp 
v8.1.3+incompatible/go.mod h1:RRJoFSAmTEh2weEqWtpPE3vFK5YBhA6bqp2l1kfCC5A=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
diff --git a/pkg/insights/providers/openai/client.go b/pkg/insights/providers/openai/client.go
new file mode 100644
index 00000000..85a9d262
--- /dev/null
+++ b/pkg/insights/providers/openai/client.go
@@ -0,0 +1,187 @@
// Package openai implements insights.Provider against the OpenAI API and any
// OpenAI-compatible HTTP backend (LocalAI, vLLM, llama.cpp-server, whisper.cpp,
// etc.). The base_url provider option lets operators point this provider at a
// self-hosted endpoint while keeping the same Go code path as for OpenAI cloud.
package openai

import (
	"context"
	"encoding/json"
	"fmt"
	"io"

	openaisdk "github.com/openai/openai-go/v3"
	"github.com/openai/openai-go/v3/option"

	"github.com/mynaparrot/plugnmeet-protocol/plugnmeet"
	"github.com/mynaparrot/plugnmeet-server/pkg/config"
	"github.com/mynaparrot/plugnmeet-server/pkg/insights"
	"github.com/sirupsen/logrus"
)

const (
	// Fallback model names used when neither the service config nor the
	// provider account pins a model (see Provider.serviceModel).
	defaultTranscriptionModel = "whisper-1"
	defaultChatModel          = "gpt-4o-mini"
	// Seconds of audio accumulated per upload in chunked transcription mode;
	// overridable via the account's chunk_seconds option.
	defaultChunkSeconds = 5.0
	// PCM16 sample rate used when encoding WAV chunks — assumes the audio
	// handed to WriteSample is 16 kHz mono (see encodeWAV's comment).
	transcriptionSampleRate = 16000

	// Transcription transport modes selected via the account's "mode" option.
	modeChunked  = "chunked"
	modeRealtime = "realtime"
)

// Provider implements insights.Provider for OpenAI and OpenAI-compatible APIs.
type Provider struct {
	account *config.ProviderAccount // account credentials + per-account options
	service *config.ServiceConfig   // per-service options (e.g. model override)
	client  openaisdk.Client        // shared SDK client for HTTP endpoints
	logger  *logrus.Entry           // pre-tagged with service=openai
}

// NewProvider builds a Provider. The api_key credential is required; the
// base_url option is optional and overrides the SDK's default endpoint.
+func NewProvider(providerAccount *config.ProviderAccount, serviceConfig *config.ServiceConfig, log *logrus.Entry) (insights.Provider, error) { + if providerAccount == nil { + return nil, fmt.Errorf("openai: provider account is nil") + } + if providerAccount.Credentials.APIKey == "" { + return nil, fmt.Errorf("openai: credentials.api_key is required") + } + + opts := []option.RequestOption{ + option.WithAPIKey(providerAccount.Credentials.APIKey), + } + if baseURL, _ := providerAccount.Options["base_url"].(string); baseURL != "" { + opts = append(opts, option.WithBaseURL(baseURL)) + } + + return &Provider{ + account: providerAccount, + service: serviceConfig, + client: openaisdk.NewClient(opts...), + logger: log.WithField("service", "openai"), + }, nil +} + +// CreateTranscription opens a transcription stream. mode: "chunked" (default) +// uploads WAV chunks to /v1/audio/transcriptions; mode: "realtime" streams +// PCM16 over WebSocket and surfaces partials. +func (p *Provider) CreateTranscription(ctx context.Context, roomId, userId string, options []byte) (insights.TranscriptionStream, error) { + opts := &insights.TranscriptionOptions{} + if len(options) > 0 { + if err := json.Unmarshal(options, opts); err != nil { + return nil, fmt.Errorf("openai: failed to unmarshal transcription options: %w", err) + } + } + + model := p.serviceModel(defaultTranscriptionModel) + + switch p.transcribeMode() { + case modeRealtime: + return newRealtimeStream(ctx, p.account, model, roomId, userId, opts, p.logger) + default: + return newChunkedStream(ctx, p.client, model, p.chunkSeconds(), roomId, userId, opts, p.logger) + } +} + +// TranslateText performs translation via Chat Completions with a JSON-schema +// constrained response. One round-trip handles all target languages. 
+func (p *Provider) TranslateText(ctx context.Context, text, sourceLang string, targetLangs []string) (*plugnmeet.InsightsTextTranslationResult, error) { + if len(targetLangs) == 0 { + return nil, fmt.Errorf("openai: at least one target language is required") + } + model := p.serviceModel(defaultChatModel) + return translateViaChatCompletions(ctx, p.client, model, text, sourceLang, targetLangs, p.logger) +} + +// SynthesizeText is intentionally not implemented: TTS via the OpenAI audio +// endpoint can be added in a follow-up; until then we surface a clear error. +func (p *Provider) SynthesizeText(_ context.Context, _ []byte) (io.ReadCloser, error) { + return nil, fmt.Errorf("openai: speech synthesis not implemented") +} + +// GetSupportedLanguages returns the static language lists for transcription +// and translation. Whisper / OpenAI translation models support far more codes +// than this list; we surface only the subset we exercise in PlugNmeet. +func (p *Provider) GetSupportedLanguages(serviceType insights.ServiceType) []*plugnmeet.InsightsSupportedLangInfo { + if langs, ok := supportedLanguages[serviceType]; ok { + out := make([]*plugnmeet.InsightsSupportedLangInfo, len(langs)) + for i := range langs { + out[i] = &langs[i] + } + return out + } + return nil +} + +// AITextChatStream is not supported by this provider in its current scope. +func (p *Provider) AITextChatStream(_ context.Context, _ string, _ []*plugnmeet.InsightsAITextChatContent) (<-chan *plugnmeet.InsightsAITextChatStreamResult, error) { + return nil, nil +} + +// AIChatTextSummarize is not supported by this provider in its current scope. +func (p *Provider) AIChatTextSummarize(_ context.Context, _ string, _ []*plugnmeet.InsightsAITextChatContent) (string, uint32, uint32, error) { + return "", 0, 0, nil +} + +// StartBatchSummarizeAudioFile is not supported by this provider. 
+func (p *Provider) StartBatchSummarizeAudioFile(_ context.Context, _, _, _ string) (string, string, error) { + return "", "", nil +} + +// CheckBatchJobStatus is not supported by this provider. +func (p *Provider) CheckBatchJobStatus(_ context.Context, _ string) (*insights.BatchJobResponse, error) { + return nil, nil +} + +// DeleteUploadedFile is not supported by this provider. +func (p *Provider) DeleteUploadedFile(_ context.Context, _ string) error { + return nil +} + +// serviceModel reads the per-service model name from the service config, +// falling back to the supplied default. The provider account can also pin a +// model via its own options as a coarse default for both services. +func (p *Provider) serviceModel(fallback string) string { + if p.service != nil { + if m, _ := p.service.Options["model"].(string); m != "" { + return m + } + } + if p.account != nil { + if m, _ := p.account.Options["model"].(string); m != "" { + return m + } + } + return fallback +} + +func (p *Provider) transcribeMode() string { + if p.account == nil { + return modeChunked + } + switch m, _ := p.account.Options["mode"].(string); m { + case modeRealtime: + return modeRealtime + default: + return modeChunked + } +} + +// chunkSeconds reads chunk_seconds from the provider account options. YAML +// numbers arrive as float64 from gopkg.in/yaml.v3; ints are accepted too. 
+func (p *Provider) chunkSeconds() float64 { + if p.account == nil { + return defaultChunkSeconds + } + switch v := p.account.Options["chunk_seconds"].(type) { + case float64: + if v > 0 { + return v + } + case int: + if v > 0 { + return float64(v) + } + } + return defaultChunkSeconds +} diff --git a/pkg/insights/providers/openai/languages.go b/pkg/insights/providers/openai/languages.go new file mode 100644 index 00000000..c594639c --- /dev/null +++ b/pkg/insights/providers/openai/languages.go @@ -0,0 +1,79 @@ +package openai + +import ( + "github.com/mynaparrot/plugnmeet-protocol/plugnmeet" + "github.com/mynaparrot/plugnmeet-server/pkg/insights" +) + +// supportedLanguages enumerates the languages we surface for transcription +// and translation. The transcription set tracks Whisper's documented +// coverage; the translation set is the same superset since modern chat +// models (gpt-4o, gpt-4o-mini, Llama-class instruct models) translate +// between any of these pairs comfortably. +var supportedLanguages = map[insights.ServiceType][]plugnmeet.InsightsSupportedLangInfo{ + insights.ServiceTypeTranscription: whisperLanguages(), + insights.ServiceTypeTranslation: whisperLanguages(), +} + +func whisperLanguages() []plugnmeet.InsightsSupportedLangInfo { + return []plugnmeet.InsightsSupportedLangInfo{ + {Code: "af", Name: "Afrikaans", Locale: "af"}, + {Code: "ar", Name: "Arabic", Locale: "ar"}, + {Code: "az", Name: "Azerbaijani", Locale: "az"}, + {Code: "be", Name: "Belarusian", Locale: "be"}, + {Code: "bg", Name: "Bulgarian", Locale: "bg"}, + {Code: "bn", Name: "Bengali", Locale: "bn"}, + {Code: "bs", Name: "Bosnian", Locale: "bs"}, + {Code: "ca", Name: "Catalan", Locale: "ca"}, + {Code: "cs", Name: "Czech", Locale: "cs"}, + {Code: "cy", Name: "Welsh", Locale: "cy"}, + {Code: "da", Name: "Danish", Locale: "da"}, + {Code: "de", Name: "German", Locale: "de"}, + {Code: "el", Name: "Greek", Locale: "el"}, + {Code: "en", Name: "English", Locale: "en"}, + {Code: "es", Name: 
"Spanish", Locale: "es"}, + {Code: "et", Name: "Estonian", Locale: "et"}, + {Code: "fa", Name: "Persian", Locale: "fa"}, + {Code: "fi", Name: "Finnish", Locale: "fi"}, + {Code: "fr", Name: "French", Locale: "fr"}, + {Code: "gl", Name: "Galician", Locale: "gl"}, + {Code: "he", Name: "Hebrew", Locale: "he"}, + {Code: "hi", Name: "Hindi", Locale: "hi"}, + {Code: "hr", Name: "Croatian", Locale: "hr"}, + {Code: "hu", Name: "Hungarian", Locale: "hu"}, + {Code: "hy", Name: "Armenian", Locale: "hy"}, + {Code: "id", Name: "Indonesian", Locale: "id"}, + {Code: "is", Name: "Icelandic", Locale: "is"}, + {Code: "it", Name: "Italian", Locale: "it"}, + {Code: "ja", Name: "Japanese", Locale: "ja"}, + {Code: "kk", Name: "Kazakh", Locale: "kk"}, + {Code: "kn", Name: "Kannada", Locale: "kn"}, + {Code: "ko", Name: "Korean", Locale: "ko"}, + {Code: "lt", Name: "Lithuanian", Locale: "lt"}, + {Code: "lv", Name: "Latvian", Locale: "lv"}, + {Code: "mi", Name: "Maori", Locale: "mi"}, + {Code: "mk", Name: "Macedonian", Locale: "mk"}, + {Code: "mr", Name: "Marathi", Locale: "mr"}, + {Code: "ms", Name: "Malay", Locale: "ms"}, + {Code: "ne", Name: "Nepali", Locale: "ne"}, + {Code: "nl", Name: "Dutch", Locale: "nl"}, + {Code: "no", Name: "Norwegian", Locale: "no"}, + {Code: "pl", Name: "Polish", Locale: "pl"}, + {Code: "pt", Name: "Portuguese", Locale: "pt"}, + {Code: "ro", Name: "Romanian", Locale: "ro"}, + {Code: "ru", Name: "Russian", Locale: "ru"}, + {Code: "sk", Name: "Slovak", Locale: "sk"}, + {Code: "sl", Name: "Slovenian", Locale: "sl"}, + {Code: "sr", Name: "Serbian", Locale: "sr"}, + {Code: "sv", Name: "Swedish", Locale: "sv"}, + {Code: "sw", Name: "Swahili", Locale: "sw"}, + {Code: "ta", Name: "Tamil", Locale: "ta"}, + {Code: "th", Name: "Thai", Locale: "th"}, + {Code: "tl", Name: "Tagalog", Locale: "tl"}, + {Code: "tr", Name: "Turkish", Locale: "tr"}, + {Code: "uk", Name: "Ukrainian", Locale: "uk"}, + {Code: "ur", Name: "Urdu", Locale: "ur"}, + {Code: "vi", Name: "Vietnamese", 
Locale: "vi"}, + {Code: "zh", Name: "Chinese", Locale: "zh"}, + } +} diff --git a/pkg/insights/providers/openai/transcribe.go b/pkg/insights/providers/openai/transcribe.go new file mode 100644 index 00000000..98406f08 --- /dev/null +++ b/pkg/insights/providers/openai/transcribe.go @@ -0,0 +1,262 @@ +package openai + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "io" + "sync" + "sync/atomic" + "time" + + "github.com/livekit/media-sdk" + openaisdk "github.com/openai/openai-go/v3" + "github.com/sirupsen/logrus" + + "github.com/mynaparrot/plugnmeet-protocol/plugnmeet" + "github.com/mynaparrot/plugnmeet-server/pkg/insights" +) + +// chunkedStream implements insights.TranscriptionStream by buffering PCM16 +// audio in memory and POSTing fixed-duration WAV chunks to the OpenAI +// transcription endpoint. Each upload yields a single final_result event. +// +// This trades partials for portability: the same code path works against any +// OpenAI-compatible HTTP backend (faster-whisper-server, whisper.cpp, vLLM, +// LocalAI). For partial deltas use mode: "realtime" (realtimeStream). 
+type chunkedStream struct { + client openaisdk.Client + model string + chunkSamples int + language string + userId string + userName string + allowStorage bool + + bufMu sync.Mutex + buf []int16 + + chunks chan []int16 + results chan *insights.TranscriptionEvent + + closed atomic.Bool + wg sync.WaitGroup + cancel context.CancelFunc + + log *logrus.Entry +} + +func newChunkedStream(parentCtx context.Context, client openaisdk.Client, model string, chunkSeconds float64, roomId, userId string, opts *insights.TranscriptionOptions, log *logrus.Entry) (*chunkedStream, error) { + if chunkSeconds <= 0 { + chunkSeconds = defaultChunkSeconds + } + ctx, cancel := context.WithCancel(parentCtx) + + s := &chunkedStream{ + client: client, + model: model, + chunkSamples: int(chunkSeconds * float64(transcriptionSampleRate)), + language: opts.SpokenLang, + userId: userId, + userName: opts.UserName, + allowStorage: opts.AllowedTranscriptionStorage, + chunks: make(chan []int16, 4), + results: make(chan *insights.TranscriptionEvent, 16), + cancel: cancel, + log: log.WithFields(logrus.Fields{ + "roomId": roomId, + "userId": userId, + "lang": opts.SpokenLang, + "model": model, + }), + } + + s.wg.Add(1) + go s.uploadLoop(ctx) + + s.results <- &insights.TranscriptionEvent{Type: insights.EventTypeSessionStarted} + return s, nil +} + +// WriteSample appends PCM16 samples to the in-memory buffer and pushes a chunk +// onto the upload queue once the configured duration has accumulated. +func (s *chunkedStream) WriteSample(sample media.PCM16Sample) error { + if s.closed.Load() { + return fmt.Errorf("stream is closed") + } + + s.bufMu.Lock() + s.buf = append(s.buf, sample...) + var ready []int16 + if len(s.buf) >= s.chunkSamples { + ready = make([]int16, s.chunkSamples) + copy(ready, s.buf[:s.chunkSamples]) + s.buf = s.buf[s.chunkSamples:] + } + s.bufMu.Unlock() + + if ready != nil { + select { + case s.chunks <- ready: + default: + // Upload queue is saturated. 
Dropping is preferable to blocking + // the audio path; the next chunk has fresh content anyway. + s.log.Warn("openai transcription upload queue full, dropping chunk") + } + } + return nil +} + +// Close flushes any buffered tail audio, signals the upload loop to drain, +// and waits for in-flight uploads to finish before returning. +func (s *chunkedStream) Close() error { + if !s.closed.CompareAndSwap(false, true) { + return nil + } + + s.bufMu.Lock() + tail := s.buf + s.buf = nil + s.bufMu.Unlock() + + if len(tail) > 0 { + select { + case s.chunks <- tail: + default: + s.log.Warn("openai transcription upload queue full at close, dropping tail") + } + } + close(s.chunks) + s.wg.Wait() + s.cancel() + close(s.results) + return nil +} + +// SetProperty is unused; OpenAI's transcription model is selected at session +// creation and does not accept mid-stream property changes. +func (s *chunkedStream) SetProperty(_, _ string) error { return nil } + +// Results exposes the read side of the event channel. +func (s *chunkedStream) Results() <-chan *insights.TranscriptionEvent { return s.results } + +func (s *chunkedStream) uploadLoop(ctx context.Context) { + defer s.wg.Done() + defer func() { + // Best-effort: always emit a stopped event so downstream consumers + // can release any room-level state tied to this stream. 
+ s.safeSend(&insights.TranscriptionEvent{Type: insights.EventTypeSessionStopped}) + }() + + for samples := range s.chunks { + if len(samples) == 0 { + continue + } + text, err := s.transcribeChunk(ctx, samples) + if err != nil { + if ctx.Err() != nil { + return + } + s.log.WithError(err).Warn("openai transcription upload failed") + s.safeSend(&insights.TranscriptionEvent{ + Type: insights.EventTypeError, + Error: err.Error(), + }) + continue + } + if text == "" { + continue + } + s.safeSend(&insights.TranscriptionEvent{ + Type: insights.EventTypeFinalResult, + Result: &plugnmeet.InsightsTranscriptionResult{ + FromUserId: s.userId, + FromUserName: s.userName, + Lang: s.language, + Text: text, + IsPartial: false, + AllowedTranscriptionStorage: s.allowStorage, + Translations: map[string]string{}, + }, + }) + } +} + +func (s *chunkedStream) safeSend(ev *insights.TranscriptionEvent) { + defer func() { + // A late event after Close() races with channel close; swallow the + // resulting panic rather than tearing down the upload goroutine. + _ = recover() + }() + select { + case s.results <- ev: + default: + s.log.Warn("openai transcription results channel full, dropping event") + } +} + +func (s *chunkedStream) transcribeChunk(ctx context.Context, samples []int16) (string, error) { + wav := encodeWAV(samples, transcriptionSampleRate) + + params := openaisdk.AudioTranscriptionNewParams{ + Model: openaisdk.AudioModel(s.model), + File: namedReader{Reader: bytes.NewReader(wav), name: "audio.wav"}, + } + if s.language != "" { + params.Language = openaisdk.String(s.language) + } + + // Each chunk gets a generous-but-bounded deadline so a hung backend + // doesn't pile up requests indefinitely. 
+ reqCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + resp, err := s.client.Audio.Transcriptions.New(reqCtx, params) + if err != nil { + return "", fmt.Errorf("audio.transcriptions.new: %w", err) + } + return resp.Text, nil +} + +// namedReader exposes a filename to the openai-go multipart encoder, which +// uses it to derive the upload's MIME type. Without a recognised audio +// extension Whisper-compatible servers refuse the upload. +type namedReader struct { + io.Reader + name string +} + +func (n namedReader) Name() string { return n.name } + +// encodeWAV serialises 16-bit signed little-endian PCM samples as a minimal +// canonical RIFF/WAVE file (44-byte header + interleaved data). 16 kHz mono +// matches the audio LiveKit hands us via PCM16Sample. +func encodeWAV(samples []int16, sampleRate int) []byte { + const ( + numChannels = 1 + bitsPerSample = 16 + ) + dataSize := len(samples) * 2 + totalSize := 36 + dataSize + + buf := bytes.NewBuffer(make([]byte, 0, 44+dataSize)) + buf.WriteString("RIFF") + _ = binary.Write(buf, binary.LittleEndian, uint32(totalSize)) + buf.WriteString("WAVE") + + buf.WriteString("fmt ") + _ = binary.Write(buf, binary.LittleEndian, uint32(16)) // PCM fmt chunk size + _ = binary.Write(buf, binary.LittleEndian, uint16(1)) // PCM format + _ = binary.Write(buf, binary.LittleEndian, uint16(numChannels)) + _ = binary.Write(buf, binary.LittleEndian, uint32(sampleRate)) + _ = binary.Write(buf, binary.LittleEndian, uint32(sampleRate*numChannels*bitsPerSample/8)) + _ = binary.Write(buf, binary.LittleEndian, uint16(numChannels*bitsPerSample/8)) + _ = binary.Write(buf, binary.LittleEndian, uint16(bitsPerSample)) + + buf.WriteString("data") + _ = binary.Write(buf, binary.LittleEndian, uint32(dataSize)) + _ = binary.Write(buf, binary.LittleEndian, samples) + + return buf.Bytes() +} diff --git a/pkg/insights/providers/openai/transcribe_realtime.go b/pkg/insights/providers/openai/transcribe_realtime.go new file mode 
100644 index 00000000..bc7bd55f --- /dev/null +++ b/pkg/insights/providers/openai/transcribe_realtime.go @@ -0,0 +1,347 @@ +package openai + +import ( + "context" + "encoding/base64" + "encoding/binary" + "encoding/json" + "fmt" + "net/http" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/gorilla/websocket" + "github.com/livekit/media-sdk" + "github.com/sirupsen/logrus" + + "github.com/mynaparrot/plugnmeet-protocol/plugnmeet" + "github.com/mynaparrot/plugnmeet-server/pkg/config" + "github.com/mynaparrot/plugnmeet-server/pkg/insights" +) + +// realtimeStream implements insights.TranscriptionStream over the OpenAI +// Realtime WebSocket API, surfacing per-segment partial_result and +// final_result events. Requires a backend that speaks the Realtime protocol +// (OpenAI cloud or Azure OpenAI Realtime). +type realtimeStream struct { + conn *websocket.Conn + model string + language string + userId string + userName string + allow bool + + // All sends go through outbound; gorilla/websocket forbids concurrent writes. 
+ outbound chan []byte + results chan *insights.TranscriptionEvent + + pmu sync.Mutex + partials map[string]string + + closed atomic.Bool + wg sync.WaitGroup + cancel context.CancelFunc + log *logrus.Entry +} + +const ( + defaultRealtimeURL = "wss://api.openai.com/v1/realtime?intent=transcription" + defaultRealtimeModel = "gpt-4o-mini-transcribe" + realtimeDialTimeout = 10 * time.Second +) + +func newRealtimeStream(parentCtx context.Context, account *config.ProviderAccount, model, roomId, userId string, opts *insights.TranscriptionOptions, log *logrus.Entry) (*realtimeStream, error) { + if account == nil { + return nil, fmt.Errorf("openai realtime: provider account is nil") + } + if account.Credentials.APIKey == "" { + return nil, fmt.Errorf("openai realtime: credentials.api_key is required") + } + + url := defaultRealtimeURL + if v, _ := account.Options["realtime_url"].(string); v != "" { + url = v + } + // whisper-1 is rejected by the Realtime transcription endpoint. + if model == "" || model == defaultTranscriptionModel { + model = defaultRealtimeModel + } + + header := http.Header{} + header.Set("Authorization", "Bearer "+account.Credentials.APIKey) + header.Set("OpenAI-Beta", "realtime=v1") + + dialer := &websocket.Dialer{HandshakeTimeout: realtimeDialTimeout} + dialCtx, dialCancel := context.WithTimeout(parentCtx, realtimeDialTimeout) + defer dialCancel() + + conn, resp, err := dialer.DialContext(dialCtx, url, header) + if err != nil { + if resp != nil { + return nil, fmt.Errorf("openai realtime: dial %s: %w (http %d)", url, err, resp.StatusCode) + } + return nil, fmt.Errorf("openai realtime: dial %s: %w", url, err) + } + + ctx, cancel := context.WithCancel(parentCtx) + s := &realtimeStream{ + conn: conn, + model: model, + language: opts.SpokenLang, + userId: userId, + userName: opts.UserName, + allow: opts.AllowedTranscriptionStorage, + outbound: make(chan []byte, 64), + results: make(chan *insights.TranscriptionEvent, 16), + partials: 
make(map[string]string), + cancel: cancel, + log: log.WithFields(logrus.Fields{ + "roomId": roomId, + "userId": userId, + "lang": opts.SpokenLang, + "model": model, + "mode": modeRealtime, + }), + } + + if err := s.sendSessionUpdate(); err != nil { + _ = conn.Close() + cancel() + return nil, err + } + + s.wg.Add(2) + go s.writeLoop(ctx) + go s.readLoop(ctx) + + s.results <- &insights.TranscriptionEvent{Type: insights.EventTypeSessionStarted} + return s, nil +} + +// sendSessionUpdate runs synchronously before writeLoop starts, so it writes +// directly. After that, all sends MUST go through outbound. +func (s *realtimeStream) sendSessionUpdate() error { + session := map[string]any{ + "type": "transcription_session.update", + "session": map[string]any{ + "input_audio_format": "pcm16", + "input_audio_transcription": map[string]any{ + "model": s.model, + "language": s.language, + }, + // Server VAD drives the .completed event we map to final_result. + "turn_detection": map[string]any{ + "type": "server_vad", + "threshold": 0.5, + "prefix_padding_ms": 300, + "silence_duration_ms": 500, + }, + }, + } + payload, err := json.Marshal(session) + if err != nil { + return fmt.Errorf("openai realtime: marshal session.update: %w", err) + } + if err := s.conn.WriteMessage(websocket.TextMessage, payload); err != nil { + return fmt.Errorf("openai realtime: send session.update: %w", err) + } + return nil +} + +func (s *realtimeStream) WriteSample(sample media.PCM16Sample) error { + if s.closed.Load() { + return fmt.Errorf("stream is closed") + } + if len(sample) == 0 { + return nil + } + + raw := make([]byte, len(sample)*2) + for i, v := range sample { + binary.LittleEndian.PutUint16(raw[i*2:], uint16(v)) + } + + event := map[string]string{ + "type": "input_audio_buffer.append", + "audio": base64.StdEncoding.EncodeToString(raw), + } + payload, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("openai realtime: marshal append: %w", err) + } + + select { + case 
s.outbound <- payload: + default: + s.log.Warn("openai realtime outbound queue full, dropping audio frame") + } + return nil +} + +func (s *realtimeStream) SetProperty(_, _ string) error { return nil } + +func (s *realtimeStream) Results() <-chan *insights.TranscriptionEvent { return s.results } + +func (s *realtimeStream) Close() error { + if !s.closed.CompareAndSwap(false, true) { + return nil + } + + commit, _ := json.Marshal(map[string]string{"type": "input_audio_buffer.commit"}) + select { + case s.outbound <- commit: + default: + } + + close(s.outbound) + s.cancel() + // Close unblocks readLoop with a network error. + _ = s.conn.Close() + + s.wg.Wait() + close(s.results) + return nil +} + +func (s *realtimeStream) writeLoop(ctx context.Context) { + defer s.wg.Done() + for { + select { + case <-ctx.Done(): + return + case msg, ok := <-s.outbound: + if !ok { + return + } + _ = s.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if err := s.conn.WriteMessage(websocket.TextMessage, msg); err != nil { + if !s.closed.Load() { + s.log.WithError(err).Warn("openai realtime write failed") + s.safeSend(&insights.TranscriptionEvent{ + Type: insights.EventTypeError, + Error: err.Error(), + }) + } + return + } + } + } +} + +func (s *realtimeStream) readLoop(ctx context.Context) { + defer s.wg.Done() + defer s.safeSend(&insights.TranscriptionEvent{Type: insights.EventTypeSessionStopped}) + + for { + _, data, err := s.conn.ReadMessage() + if err != nil { + if ctx.Err() != nil || s.closed.Load() { + return + } + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + return + } + s.log.WithError(err).Warn("openai realtime read failed") + s.safeSend(&insights.TranscriptionEvent{ + Type: insights.EventTypeError, + Error: err.Error(), + }) + return + } + s.dispatch(data) + } +} + +type realtimeServerEvent struct { + Type string `json:"type"` + ItemID string `json:"item_id,omitempty"` + Delta string `json:"delta,omitempty"` + Transcript 
string `json:"transcript,omitempty"` + Error *struct { + Message string `json:"message"` + Code string `json:"code"` + } `json:"error,omitempty"` +} + +func (s *realtimeStream) dispatch(data []byte) { + var ev realtimeServerEvent + if err := json.Unmarshal(data, &ev); err != nil { + s.log.WithError(err).Debug("openai realtime: unparseable server event") + return + } + + switch ev.Type { + case "conversation.item.input_audio_transcription.delta": + text := s.appendPartial(ev.ItemID, ev.Delta) + if text == "" { + return + } + s.safeSend(&insights.TranscriptionEvent{ + Type: insights.EventTypePartialResult, + Result: s.buildResult(text, true), + }) + + case "conversation.item.input_audio_transcription.completed": + text := strings.TrimSpace(ev.Transcript) + s.clearPartial(ev.ItemID) + if text == "" { + return + } + s.safeSend(&insights.TranscriptionEvent{ + Type: insights.EventTypeFinalResult, + Result: s.buildResult(text, false), + }) + + case "error": + msg := "unknown realtime error" + if ev.Error != nil && ev.Error.Message != "" { + msg = ev.Error.Message + } + s.safeSend(&insights.TranscriptionEvent{ + Type: insights.EventTypeError, + Error: msg, + }) + } +} + +func (s *realtimeStream) appendPartial(itemID, delta string) string { + if itemID == "" || delta == "" { + return "" + } + s.pmu.Lock() + defer s.pmu.Unlock() + s.partials[itemID] += delta + return strings.TrimSpace(s.partials[itemID]) +} + +func (s *realtimeStream) clearPartial(itemID string) { + if itemID == "" { + return + } + s.pmu.Lock() + delete(s.partials, itemID) + s.pmu.Unlock() +} + +func (s *realtimeStream) buildResult(text string, isPartial bool) *plugnmeet.InsightsTranscriptionResult { + return &plugnmeet.InsightsTranscriptionResult{ + FromUserId: s.userId, + FromUserName: s.userName, + Lang: s.language, + Text: text, + IsPartial: isPartial, + AllowedTranscriptionStorage: s.allow, + Translations: map[string]string{}, + } +} + +func (s *realtimeStream) safeSend(ev *insights.TranscriptionEvent) 
{ + defer func() { _ = recover() }() + select { + case s.results <- ev: + default: + s.log.Warn("openai realtime results channel full, dropping event") + } +} diff --git a/pkg/insights/providers/openai/translate.go b/pkg/insights/providers/openai/translate.go new file mode 100644 index 00000000..8c3f8cd7 --- /dev/null +++ b/pkg/insights/providers/openai/translate.go @@ -0,0 +1,154 @@ +package openai + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + openaisdk "github.com/openai/openai-go/v3" + "github.com/sirupsen/logrus" + + "github.com/mynaparrot/plugnmeet-protocol/plugnmeet" +) + +// translateViaChatCompletions uses the Chat Completions endpoint with a +// dynamically-shaped JSON Schema to translate a single source string into all +// requested target languages in one round-trip. The schema's properties are +// the target language codes, so a Strict-mode response is forced to populate +// exactly the keys we expect. +// +// We deliberately use Chat Completions rather than the newer Responses API: +// every OpenAI-compatible self-hosted backend (llama.cpp-server, LocalAI, +// vLLM, Ollama) implements /v1/chat/completions, while the Responses API is +// OpenAI-specific. Keeping translation portable matters more here than using +// the latest API surface. +func translateViaChatCompletions(parentCtx context.Context, client openaisdk.Client, model, text, sourceLang string, targetLangs []string, log *logrus.Entry) (*plugnmeet.InsightsTextTranslationResult, error) { + if strings.TrimSpace(text) == "" { + // Skip the round-trip; an empty source maps to empty translations + // and the model would either refuse or hallucinate filler. 
+ empty := make(map[string]string, len(targetLangs)) + for _, l := range targetLangs { + empty[l] = "" + } + return &plugnmeet.InsightsTextTranslationResult{ + SourceText: text, + SourceLang: sourceLang, + Translations: empty, + }, nil + } + + schema := buildTranslationSchema(targetLangs) + systemPrompt := "You are a professional translation engine. Translate the user's text faithfully and idiomatically. Preserve meaning, tone, named entities, and numbers. Do not add explanations. Respond ONLY with the JSON object that matches the response schema." + + srcDescriptor := sourceLang + if srcDescriptor == "" { + srcDescriptor = "auto-detect" + } + userPrompt := fmt.Sprintf( + "Source language: %s\nTarget languages (ISO 639-1): %s\n\nText to translate:\n%s", + srcDescriptor, + strings.Join(targetLangs, ", "), + text, + ) + + schemaParam := openaisdk.ResponseFormatJSONSchemaJSONSchemaParam{ + Name: "translations", + Description: openaisdk.String("Map of ISO 639-1 target language code to translated text."), + Schema: schema, + Strict: openaisdk.Bool(true), + } + + ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second) + defer cancel() + + resp, err := client.Chat.Completions.New(ctx, openaisdk.ChatCompletionNewParams{ + Model: openaisdk.ChatModel(model), + Messages: []openaisdk.ChatCompletionMessageParamUnion{ + openaisdk.SystemMessage(systemPrompt), + openaisdk.UserMessage(userPrompt), + }, + ResponseFormat: openaisdk.ChatCompletionNewParamsResponseFormatUnion{ + OfJSONSchema: &openaisdk.ResponseFormatJSONSchemaParam{JSONSchema: schemaParam}, + }, + Temperature: openaisdk.Float(0), + }) + if err != nil { + return nil, fmt.Errorf("chat.completions.new: %w", err) + } + if len(resp.Choices) == 0 { + return nil, fmt.Errorf("openai translation: empty choices in response") + } + + raw := resp.Choices[0].Message.Content + parsed, err := parseTranslationJSON(raw) + if err != nil { + log.WithError(err).WithField("raw", truncate(raw, 256)).Warn("openai translation: failed 
to parse JSON response") + return nil, fmt.Errorf("failed to parse translation JSON: %w", err) + } + + translations := make(map[string]string, len(targetLangs)) + for _, l := range targetLangs { + translations[l] = parsed[l] + } + + return &plugnmeet.InsightsTextTranslationResult{ + SourceText: text, + SourceLang: sourceLang, + Translations: translations, + }, nil +} + +// buildTranslationSchema produces a JSON Schema requiring exactly the supplied +// language codes as string properties. Strict mode then forces the model to +// emit all of them and only them. +func buildTranslationSchema(targetLangs []string) map[string]any { + properties := make(map[string]any, len(targetLangs)) + required := make([]string, 0, len(targetLangs)) + for _, l := range targetLangs { + properties[l] = map[string]any{"type": "string"} + required = append(required, l) + } + return map[string]any{ + "type": "object", + "additionalProperties": false, + "properties": properties, + "required": required, + } +} + +// parseTranslationJSON unmarshals the response, retrying once with markdown +// fences stripped for backends that ignore Strict mode. +func parseTranslationJSON(raw string) (map[string]string, error) { + parsed := map[string]string{} + if err := json.Unmarshal([]byte(raw), &parsed); err == nil { + return parsed, nil + } else if cleaned := stripJSONNoise(raw); cleaned != raw { + if err2 := json.Unmarshal([]byte(cleaned), &parsed); err2 == nil { + return parsed, nil + } + return nil, err + } else { + return nil, err + } +} + +// stripJSONNoise trims markdown code fences and surrounding whitespace from +// a model response. Strict-mode-compliant backends shouldn't need this; it's +// purely a forgiveness layer for OpenAI-compatible servers that don't enforce +// the schema. 
func stripJSONNoise(s string) string {
	// Remove outer whitespace plus the ```json / ``` fences some non-strict
	// backends wrap around the payload, then trim again so the result starts
	// at the JSON itself.
	s = strings.TrimSpace(s)
	s = strings.TrimPrefix(s, "```json")
	s = strings.TrimPrefix(s, "```")
	s = strings.TrimSuffix(s, "```")
	return strings.TrimSpace(s)
}

// truncate shortens s to at most n bytes for log output, appending an
// ellipsis when anything was cut. The cut point is backed up to a UTF-8 rune
// boundary: the previous s[:n] byte-slice could split a multi-byte rune and
// inject invalid UTF-8 (mojibake) into the logs.
func truncate(s string, n int) string {
	if len(s) <= n {
		return s
	}
	// 0b10xxxxxx marks a UTF-8 continuation byte; step back until landing on
	// the first byte of a rune (or the start of the string).
	for n > 0 && s[n]&0xC0 == 0x80 {
		n--
	}
	return s[:n] + "…"
}

// --- NOTE(review): the remainder of this chunk is a patch fragment for a
// --- DIFFERENT file (pkg/services/insights/insights_service.go); it is
// --- preserved verbatim below as comments because it is not Go source
// --- belonging to this file. (The enclosing NewProvider function continues
// --- past the visible chunk.)
//
// diff --git a/pkg/services/insights/insights_service.go b/pkg/services/insights/insights_service.go
// index 60961541..51588a45 100644
// --- a/pkg/services/insights/insights_service.go
// +++ b/pkg/services/insights/insights_service.go
// @@ -8,6 +8,7 @@ import (
//  	"github.com/mynaparrot/plugnmeet-server/pkg/insights"
//  	"github.com/mynaparrot/plugnmeet-server/pkg/insights/providers/azure"
//  	"github.com/mynaparrot/plugnmeet-server/pkg/insights/providers/google"
// +	openaiprovider "github.com/mynaparrot/plugnmeet-server/pkg/insights/providers/openai"
//  	natsservice "github.com/mynaparrot/plugnmeet-server/pkg/services/nats"
//  	redisservice "github.com/mynaparrot/plugnmeet-server/pkg/services/redis"
//  	"github.com/sirupsen/logrus"
// @@ -23,6 +24,8 @@ func NewProvider(ctx context.Context, providerType string, providerAccount *conf
//  		return azure.NewProvider(providerAccount, serviceConfig, log)
//  	case "google":
//  		return google.NewProvider(ctx, providerAccount, serviceConfig, log)
// +	case "openai":
// +		return openaiprovider.NewProvider(providerAccount, serviceConfig, log)
//  	default:
//  		return nil, fmt.Errorf("unknown AI provider type: %s", providerType)
//  	}