diff --git a/config_sample.yaml b/config_sample.yaml index 66e5a4dc..6aa6f476 100644 --- a/config_sample.yaml +++ b/config_sample.yaml @@ -252,6 +252,18 @@ insights: credentials: api_key: "YOUR_GEMINI_API_KEY_HERE" + # Self-hosted transcription & translation. Talks to a companion + # service over WebSocket (PCM16). Reference implementation: + # https://github.com/xynstr/plugnmeet-local-insights + # Useful for GDPR-sensitive or air-gapped deployments. Setup guide: + # docs/providers/local.md + #local: + # - id: "local-01" + # credentials: { api_key: "", region: "" } + # options: + # whisper_url: "ws://whisper-local:8002/ws/transcribe" + # translate_url: "http://whisper-local:8002/translate" + # 2. Define the services that USE the providers. # The key ("transcription", "translation", "ai_text_chat", "meeting_summarizing") is the service name. services: diff --git a/docs/providers/local.md b/docs/providers/local.md new file mode 100644 index 00000000..48b2e650 --- /dev/null +++ b/docs/providers/local.md @@ -0,0 +1,104 @@ +# Local (Self-Hosted) Insights Provider + +The `local` provider enables self-hosted transcription and translation +without sending audio to a cloud vendor. It is useful for GDPR-sensitive +deployments, air-gapped environments, or cost-sensitive operators. + +## Architecture + +``` +LiveKit audio ──► plugNmeet-server ──► local provider + │ + ├─ WebSocket (PCM16 16kHz mono) + │ + ▼ + companion service (Python) + ├─ faster-whisper (STT) + └─ NLLB-200 (translation, optional) +``` + +The `local` provider is a thin Go client that speaks a simple WebSocket +protocol. The heavy lifting (speech recognition, translation) happens in a +separate companion service written in Python. The reference +implementation is available at: + +**** (MIT licensed) + +## Configuration + +Add this block to your `config.yaml`: + +```yaml +insights: + enabled: true + providers: + local: + - id: "local-01" + credentials: + api_key: "" + region: "" + options: + whisper_url: "ws://whisper-local:8002/ws/transcribe" + translate_url: "http://whisper-local:8002/translate" + services: + transcription: + provider: "local" + id: "local-01" + options: {} + translation: + provider: "local" + id: "local-01" + options: {} +``` + +`whisper_url` is the WebSocket endpoint of the companion service. +`translate_url` is optional and only needed when translation is used. + +## Running the companion service + +```bash +docker run -d --name whisper-local \ + --network plugnmeet_net \ + -p 8002:8002 \ + ghcr.io/xynstr/plugnmeet-local-insights:latest +``` + +Or build from source — see the companion repo's README for details. + +## Supported languages + +Transcription (via faster-whisper): +`de`, `en`, `ar`, `uk`, `ru`, `fr`, `es`, `it`, `pl`, `tr`, `fa`, `zh`, +`ja`, `ko`, `pt`, `nl`. + +Translation (via NLLB-200, optional, non-commercial license — see +companion repo for details): same list. + +## Hardware notes + +The reference implementation defaults to CPU with int8 quantization. +It is tested on: + +- ARM64 (Neoverse-N1, 10 cores) — `small` Whisper model, real-time + transcription feasible with VAD and 500 ms chunks. +- x86_64 — similar performance profile. + +For GPU, switch the companion service to `device=cuda` via environment +variables (see companion repo). + +## Protocol + +The WebSocket protocol is intentionally minimal: + +``` +Client → Server {"type":"start","lang":"de","transLangs":["en"]} +Client → Server +Server → Client {"type":"partial","text":"...","lang":"de"} +Server → Client {"type":"final","text":"...","lang":"de"} +Server → Client {"type":"error","error":"..."} +Client → Server {"type":"end"} +``` + +Anyone implementing a different backend (e.g., whisper.cpp, Vosk, Deepgram +self-hosted) can replace the companion service without changing any Go +code, as long as this protocol is honored. diff --git a/pkg/insights/providers/local/client.go b/pkg/insights/providers/local/client.go new file mode 100644 index 00000000..ef233cb4 --- /dev/null +++ b/pkg/insights/providers/local/client.go @@ -0,0 +1,161 @@ +package local + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "time" + + "github.com/mynaparrot/plugnmeet-protocol/plugnmeet" + "github.com/mynaparrot/plugnmeet-server/pkg/config" + "github.com/mynaparrot/plugnmeet-server/pkg/insights" + "github.com/sirupsen/logrus" +) + +// LocalProvider implements insights.Provider using local faster-whisper + NLLB translation. +type LocalProvider struct { + account *config.ProviderAccount + service *config.ServiceConfig + logger *logrus.Entry + httpClient *http.Client +} + +// NewProvider creates a new LocalProvider. +func NewProvider(providerAccount *config.ProviderAccount, serviceConfig *config.ServiceConfig, log *logrus.Entry) (insights.Provider, error) { + return &LocalProvider{ + account: providerAccount, + service: serviceConfig, + logger: log.WithField("service", "local"), + httpClient: &http.Client{Timeout: 15 * time.Second}, + }, nil +} + +// CreateTranscription opens a WebSocket connection to the local whisper service. +func (p *LocalProvider) CreateTranscription(ctx context.Context, roomId, userId string, options []byte) (insights.TranscriptionStream, error) { + opts := &insights.TranscriptionOptions{} + if len(options) > 0 { + if err := json.Unmarshal(options, opts); err != nil { + return nil, fmt.Errorf("failed to unmarshal transcription options: %w", err) + } + } + + whisperURL, _ := p.account.Options["whisper_url"].(string) + if whisperURL == "" { + return nil, fmt.Errorf("local provider: whisper_url not configured in account options") + } + + return newTranscribeStream(ctx, whisperURL, roomId, userId, opts, p.logger) +} + +// TranslateText calls the NLLB translation proxy (Azure Translation API-compatible). +func (p *LocalProvider) TranslateText(ctx context.Context, text, sourceLang string, targetLangs []string) (*plugnmeet.InsightsTextTranslationResult, error) { + translateURL, _ := p.account.Options["translate_url"].(string) + if translateURL == "" { + return nil, fmt.Errorf("local provider: translate_url not configured in account options") + } + if len(targetLangs) == 0 { + return nil, fmt.Errorf("at least one target language is required") + } + + u, err := url.Parse(translateURL) + if err != nil { + return nil, fmt.Errorf("failed to parse translate_url: %w", err) + } + q := u.Query() + q.Add("from", sourceLang) + for _, l := range targetLangs { + q.Add("to", l) + } + u.RawQuery = q.Encode() + + requestBody, err := json.Marshal([]struct { + Text string `json:"Text"` + }{{Text: text}}) + if err != nil { + return nil, fmt.Errorf("failed to marshal translation request: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "POST", u.String(), bytes.NewBuffer(requestBody)) + if err != nil { + return nil, fmt.Errorf("failed to create translation request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := p.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("translation request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + // Bound the error body read to keep a misbehaving companion + // from exhausting memory on a very large response. + bodyBytes, _ := io.ReadAll(io.LimitReader(resp.Body, 1024)) + return nil, fmt.Errorf("translation request failed with status %d: %s", resp.StatusCode, string(bodyBytes)) + } + + var response []struct { + Translations []struct { + Text string `json:"text"` + To string `json:"to"` + } `json:"translations"` + } + if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { + return nil, fmt.Errorf("failed to decode translation response: %w", err) + } + if len(response) == 0 || len(response[0].Translations) == 0 { + return nil, fmt.Errorf("empty translation response from local proxy") + } + + translations := make(map[string]string) + for _, t := range response[0].Translations { + translations[t.To] = t.Text + } + + return &plugnmeet.InsightsTextTranslationResult{ + SourceText: text, + SourceLang: sourceLang, + Translations: translations, + }, nil +} + +// SynthesizeText is not supported by the local provider. +func (p *LocalProvider) SynthesizeText(_ context.Context, _ []byte) (io.ReadCloser, error) { + return nil, fmt.Errorf("speech synthesis not supported by local provider") +} + +// GetSupportedLanguages returns the list of supported languages. +func (p *LocalProvider) GetSupportedLanguages(serviceType insights.ServiceType) []*plugnmeet.InsightsSupportedLangInfo { + if langs, ok := supportedLanguages[serviceType]; ok { + result := make([]*plugnmeet.InsightsSupportedLangInfo, len(langs)) + for i := range langs { + result[i] = &langs[i] + } + return result + } + return make([]*plugnmeet.InsightsSupportedLangInfo, 0) +} + +func (p *LocalProvider) AITextChatStream(_ context.Context, _ string, _ []*plugnmeet.InsightsAITextChatContent) (<-chan *plugnmeet.InsightsAITextChatStreamResult, error) { + return nil, nil +} + +func (p *LocalProvider) AIChatTextSummarize(_ context.Context, _ string, _ []*plugnmeet.InsightsAITextChatContent) (string, uint32, uint32, error) { + return "", 0, 0, nil +} + +func (p *LocalProvider) StartBatchSummarizeAudioFile(_ context.Context, _, _, _ string) (string, string, error) { + return "", "", nil +} + +func (p *LocalProvider) CheckBatchJobStatus(_ context.Context, _ string) (*insights.BatchJobResponse, error) { + return nil, nil +} + +func (p *LocalProvider) DeleteUploadedFile(_ context.Context, _ string) error { + return nil +} diff --git a/pkg/insights/providers/local/languages.go b/pkg/insights/providers/local/languages.go new file mode 100644 index 00000000..27cdbdd9 --- /dev/null +++ b/pkg/insights/providers/local/languages.go @@ -0,0 +1,47 @@ +package local + +import ( + "github.com/mynaparrot/plugnmeet-protocol/plugnmeet" + "github.com/mynaparrot/plugnmeet-server/pkg/insights" +) + +// supportedLanguages lists languages supported by faster-whisper (transcription) +// and NLLB-200 (translation). These are common language codes used in PlugNmeet. +var supportedLanguages = map[insights.ServiceType][]plugnmeet.InsightsSupportedLangInfo{ + insights.ServiceTypeTranscription: { + {Code: "de", Name: "German", Locale: "de"}, + {Code: "en", Name: "English", Locale: "en"}, + {Code: "ar", Name: "Arabic", Locale: "ar"}, + {Code: "uk", Name: "Ukrainian", Locale: "uk"}, + {Code: "ru", Name: "Russian", Locale: "ru"}, + {Code: "fr", Name: "French", Locale: "fr"}, + {Code: "es", Name: "Spanish", Locale: "es"}, + {Code: "it", Name: "Italian", Locale: "it"}, + {Code: "pl", Name: "Polish", Locale: "pl"}, + {Code: "tr", Name: "Turkish", Locale: "tr"}, + {Code: "fa", Name: "Persian", Locale: "fa"}, + {Code: "zh", Name: "Chinese", Locale: "zh"}, + {Code: "ja", Name: "Japanese", Locale: "ja"}, + {Code: "ko", Name: "Korean", Locale: "ko"}, + {Code: "pt", Name: "Portuguese", Locale: "pt"}, + {Code: "nl", Name: "Dutch", Locale: "nl"}, + }, + insights.ServiceTypeTranslation: { + {Code: "de", Name: "German", Locale: "de"}, + {Code: "en", Name: "English", Locale: "en"}, + {Code: "ar", Name: "Arabic", Locale: "ar"}, + {Code: "uk", Name: "Ukrainian", Locale: "uk"}, + {Code: "ru", Name: "Russian", Locale: "ru"}, + {Code: "fr", Name: "French", Locale: "fr"}, + {Code: "es", Name: "Spanish", Locale: "es"}, + {Code: "it", Name: "Italian", Locale: "it"}, + {Code: "pl", Name: "Polish", Locale: "pl"}, + {Code: "tr", Name: "Turkish", Locale: "tr"}, + {Code: "fa", Name: "Persian", Locale: "fa"}, + {Code: "zh", Name: "Chinese", Locale: "zh"}, + {Code: "ja", Name: "Japanese", Locale: "ja"}, + {Code: "ko", Name: "Korean", Locale: "ko"}, + {Code: "pt", Name: "Portuguese", Locale: "pt"}, + {Code: "nl", Name: "Dutch", Locale: "nl"}, + }, +} diff --git a/pkg/insights/providers/local/transcribe.go b/pkg/insights/providers/local/transcribe.go new file mode 100644 index 00000000..cbc7137b --- /dev/null +++ b/pkg/insights/providers/local/transcribe.go @@ -0,0 +1,238 @@ +package local + +import ( + "context" + "encoding/binary" + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/gorilla/websocket" + "github.com/livekit/media-sdk" + "github.com/mynaparrot/plugnmeet-protocol/plugnmeet" + "github.com/mynaparrot/plugnmeet-server/pkg/insights" + "github.com/sirupsen/logrus" +) + +type whisperMsg struct { + Type string `json:"type"` + Text string `json:"text,omitempty"` + Lang string `json:"lang,omitempty"` + Error string `json:"error,omitempty"` +} + +// audioBufPool reuses PCM-encoding scratch buffers. WriteSample is called +// at audio-frame rate (tens of Hz per active speaker), so avoiding per-call +// allocation materially reduces GC pressure on busy servers. Initial cap is +// sized for typical LiveKit opus frame expansions. +var audioBufPool = sync.Pool{ + New: func() interface{} { + b := make([]byte, 0, 4096) + return &b + }, +} + +type localTranscribeStream struct { + conn *websocket.Conn + cancel context.CancelFunc + results chan *insights.TranscriptionEvent + mu sync.Mutex + closed bool + opts *insights.TranscriptionOptions + userId string + log *logrus.Entry +} + +func newTranscribeStream(mainCtx context.Context, wsURL, roomId, userId string, opts *insights.TranscriptionOptions, log *logrus.Entry) (*localTranscribeStream, error) { + l := log.WithFields(logrus.Fields{ + "roomId": roomId, + "userId": userId, + "lang": opts.SpokenLang, + }) + + dialer := websocket.Dialer{HandshakeTimeout: 10 * time.Second} + conn, _, err := dialer.Dial(wsURL, nil) + if err != nil { + return nil, fmt.Errorf("failed to connect to whisper service at %s: %w", wsURL, err) + } + + // Send handshake so the server knows the language and target translations. + handshake := map[string]interface{}{ + "type": "start", + "lang": opts.SpokenLang, + "transLangs": opts.TransLangs, + "userName": opts.UserName, + } + if err := conn.WriteJSON(handshake); err != nil { + conn.Close() + return nil, fmt.Errorf("failed to send handshake to whisper service: %w", err) + } + + resultsChan := make(chan *insights.TranscriptionEvent, 32) + ctx, cancel := context.WithCancel(mainCtx) + + s := &localTranscribeStream{ + conn: conn, + cancel: cancel, + results: resultsChan, + opts: opts, + userId: userId, + log: l, + } + + go s.readLoop(ctx, resultsChan) + + return s, nil +} + +func (s *localTranscribeStream) readLoop(ctx context.Context, resultsChan chan *insights.TranscriptionEvent) { + safeClose := sync.OnceFunc(func() { + s.mu.Lock() + s.closed = true + s.mu.Unlock() + close(resultsChan) + }) + defer safeClose() + + // emit sends an event or aborts if the context is done. Returning false + // tells the caller to unwind — the consumer is gone and further sends + // would block forever on a full channel. + emit := func(ev *insights.TranscriptionEvent) bool { + select { + case resultsChan <- ev: + return true + case <-ctx.Done(): + return false + } + } + + if !emit(&insights.TranscriptionEvent{Type: insights.EventTypeSessionStarted}) { + return + } + + for { + select { + case <-ctx.Done(): + emit(&insights.TranscriptionEvent{Type: insights.EventTypeSessionStopped}) + return + default: + } + + _, msgBytes, err := s.conn.ReadMessage() + if err != nil { + if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + s.log.WithError(err).Warn("whisper WebSocket read error") + if !emit(&insights.TranscriptionEvent{ + Type: insights.EventTypeError, + Error: err.Error(), + }) { + return + } + } + emit(&insights.TranscriptionEvent{Type: insights.EventTypeSessionStopped}) + return + } + + var msg whisperMsg + if err := json.Unmarshal(msgBytes, &msg); err != nil { + s.log.WithError(err).Warn("failed to parse whisper message") + continue + } + + switch msg.Type { + case "partial": + if !emit(&insights.TranscriptionEvent{ + Type: insights.EventTypePartialResult, + Result: &plugnmeet.InsightsTranscriptionResult{ + FromUserId: s.userId, + FromUserName: s.opts.UserName, + Lang: msg.Lang, + Text: msg.Text, + IsPartial: true, + AllowedTranscriptionStorage: s.opts.AllowedTranscriptionStorage, + Translations: make(map[string]string), + }, + }) { + return + } + case "final": + if !emit(&insights.TranscriptionEvent{ + Type: insights.EventTypeFinalResult, + Result: &plugnmeet.InsightsTranscriptionResult{ + FromUserId: s.userId, + FromUserName: s.opts.UserName, + Lang: msg.Lang, + Text: msg.Text, + IsPartial: false, + AllowedTranscriptionStorage: s.opts.AllowedTranscriptionStorage, + Translations: make(map[string]string), + }, + }) { + return + } + case "translation": + // Server can optionally send pre-translated results embedded in the message. + // Handled separately if the whisper service does its own translation. + case "error": + if !emit(&insights.TranscriptionEvent{ + Type: insights.EventTypeError, + Error: msg.Error, + }) { + return + } + } + } +} + +// WriteSample converts PCM16 samples to bytes and sends them to the whisper service. +func (s *localTranscribeStream) WriteSample(sample media.PCM16Sample) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.closed { + return fmt.Errorf("stream is closed") + } + + need := len(sample) * 2 + bufPtr := audioBufPool.Get().(*[]byte) + defer audioBufPool.Put(bufPtr) + + if cap(*bufPtr) < need { + *bufPtr = make([]byte, need) + } else { + *bufPtr = (*bufPtr)[:need] + } + for i, v := range sample { + binary.LittleEndian.PutUint16((*bufPtr)[i*2:], uint16(v)) + } + + // gorilla/websocket WriteMessage copies/consumes the buffer synchronously, + // so returning it to the pool via defer is safe. + return s.conn.WriteMessage(websocket.BinaryMessage, *bufPtr) +} + +// Close signals the stream end, sends a WebSocket close frame, and cancels the context. +func (s *localTranscribeStream) Close() error { + s.cancel() + s.mu.Lock() + defer s.mu.Unlock() + if !s.closed { + _ = s.conn.WriteControl( + websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), + time.Now().Add(2*time.Second), + ) + return s.conn.Close() + } + return nil +} + +// SetProperty is a no-op for this provider; all properties are set via the handshake. +func (s *localTranscribeStream) SetProperty(_, _ string) error { + return nil +} + +// Results returns the channel of transcription events. +func (s *localTranscribeStream) Results() <-chan *insights.TranscriptionEvent { + return s.results +} diff --git a/pkg/services/insights/insights_service.go b/pkg/services/insights/insights_service.go index 60961541..107cdb28 100644 --- a/pkg/services/insights/insights_service.go +++ b/pkg/services/insights/insights_service.go @@ -8,6 +8,7 @@ import ( "github.com/mynaparrot/plugnmeet-server/pkg/insights" "github.com/mynaparrot/plugnmeet-server/pkg/insights/providers/azure" "github.com/mynaparrot/plugnmeet-server/pkg/insights/providers/google" + "github.com/mynaparrot/plugnmeet-server/pkg/insights/providers/local" natsservice "github.com/mynaparrot/plugnmeet-server/pkg/services/nats" redisservice "github.com/mynaparrot/plugnmeet-server/pkg/services/redis" "github.com/sirupsen/logrus" @@ -23,6 +24,8 @@ func NewProvider(ctx context.Context, providerType string, providerAccount *conf return azure.NewProvider(providerAccount, serviceConfig, log) case "google": return google.NewProvider(ctx, providerAccount, serviceConfig, log) + case "local": + return local.NewProvider(providerAccount, serviceConfig, log) default: return nil, fmt.Errorf("unknown AI provider type: %s", providerType) }