From a2581eaeb4d42686b86d79fd1dc4106e77aa4145 Mon Sep 17 00:00:00 2001 From: Dave Horton Date: Sat, 4 Jan 2025 16:34:01 -0500 Subject: [PATCH] tts throttling and send user_interruption event (#1019) * tts throttling and send user_interruption event * tts streaming: if we get a flush with tokens pending, send the flush after the tokens * wip --- lib/session/call-session.js | 2 ++ lib/tasks/llm/llms/voice_agent_s2s.js | 2 +- lib/utils/tts-streaming-buffer.js | 30 ++++++++++++++++++++------- 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/lib/session/call-session.js b/lib/session/call-session.js index 9e28bb83..fd335d1d 100644 --- a/lib/session/call-session.js +++ b/lib/session/call-session.js @@ -866,6 +866,8 @@ class CallSession extends Emitter { } } clearTtsStream() { + this.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'user_interruption'}) + .catch((err) => this.logger.info({err}, 'CallSession:clearTtsStream - Error sending user_interruption')); this.ttsStreamingBuffer?.clear(); } diff --git a/lib/tasks/llm/llms/voice_agent_s2s.js b/lib/tasks/llm/llms/voice_agent_s2s.js index 9c179403..6ff3e5a9 100644 --- a/lib/tasks/llm/llms/voice_agent_s2s.js +++ b/lib/tasks/llm/llms/voice_agent_s2s.js @@ -98,7 +98,7 @@ class TaskLlmVoiceAgent_S2S extends Task { async _api(ep, args) { const res = await ep.api('uuid_voice_agent_s2s', `^^|${args.join('|')}`); if (!res.body?.startsWith('+OK')) { - throw new Error({args}, `Error calling uuid_voice_agent_s2s: ${JSON.stringify(res.body)}`); + throw new Error(`Error calling uuid_voice_agent_s2s: ${JSON.stringify(res.body)}`); } } diff --git a/lib/utils/tts-streaming-buffer.js b/lib/utils/tts-streaming-buffer.js index c3bf2fa6..deda1aa3 100644 --- a/lib/utils/tts-streaming-buffer.js +++ b/lib/utils/tts-streaming-buffer.js @@ -5,8 +5,8 @@ const { TtsStreamingConnectionStatus } = require('../utils/constants'); const MAX_CHUNK_SIZE = 1800; -const HIGH_WATER_BUFFER_SIZE = 5000; -const LOW_WATER_BUFFER_SIZE = 1000; +const HIGH_WATER_BUFFER_SIZE = 1000; +const LOW_WATER_BUFFER_SIZE = 200; const TIMEOUT_RETRY_MSECS = 3000; class TtsStreamingBuffer extends Emitter { @@ -90,7 +90,7 @@ class TtsStreamingBuffer extends Emitter { /* if we crossed the high water mark, reject the request */ if (this.tokens.length + totalLength > HIGH_WATER_BUFFER_SIZE) { this.logger.info( - `TtsStreamingBuffer:bufferTokensTTS buffer is full, rejecting request to buffer ${totalLength} tokens`); + `TtsStreamingBuffer throttling: buffer is full, rejecting request to buffer ${totalLength} tokens`); if (!this._isFull) { this._isFull = true; @@ -117,9 +117,14 @@ class TtsStreamingBuffer extends Emitter { return; } else if (this._connectionStatus === TtsStreamingConnectionStatus.Connected) { - this._api(this.ep, [this.ep.uuid, 'flush']) - .catch((err) => this.logger.info({err}, - `TtsStreamingBuffer:flush Error flushing TTS streaming: ${JSON.stringify(err)}`)); + + if (this.size === 0) { + this._doFlush(); + } + else { + /* we have tokens queued, so flush after they have been sent */ + this._pendingFlush = true; + } } } @@ -190,8 +195,13 @@ class TtsStreamingBuffer extends Emitter { await this._api(this.ep, [this.ep.uuid, 'send', modifiedChunk]); this.logger.debug(`TtsStreamingBuffer:_feedTokens: sent ${chunk.length}, remaining: ${this.tokens.length}`); + if (this._pendingFlush) { + this._doFlush(); + this._pendingFlush = false; + } + if (this.isFull && this.tokens.length <= LOW_WATER_BUFFER_SIZE) { - this.logger.info('TtsStreamingBuffer:_feedTokens TTS streaming buffer is no longer full'); + this.logger.info('TtsStreamingBuffer throttling: TTS streaming buffer is no longer full - resuming'); this._isFull = false; this.emit(TtsStreamingEvents.Resume); } @@ -219,6 +229,12 @@ class TtsStreamingBuffer extends Emitter { this.emit(TtsStreamingEvents.ConnectFailure, {vendor}); } + _doFlush() { + this._api(this.ep, [this.ep.uuid, 'flush']) + .catch((err) => this.logger.info({err}, + `TtsStreamingBuffer:_doFlush Error flushing TTS streaming: ${JSON.stringify(err)}`)); + } + async _onConnect(vendor) { this.logger.info(`streaming tts connection made to ${vendor}`); this._connectionStatus = TtsStreamingConnectionStatus.Connected;