tts throttling and send user_interruption event (#1019)

* tts throttling and send user_interruption event

* tts streaming: if a flush arrives while tokens are still pending, send the flush only after those tokens have been sent

* wip
Dave Horton
2025-01-04 16:34:01 -05:00
committed by GitHub
parent 3706aa4d98
commit a2581eaeb4
3 changed files with 26 additions and 8 deletions

@@ -866,6 +866,8 @@ class CallSession extends Emitter {
     }
   }
   clearTtsStream() {
+    this.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'user_interruption'})
+      .catch((err) => this.logger.info({err}, 'CallSession:clearTtsStream - Error sending user_interruption'));
     this.ttsStreamingBuffer?.clear();
   }
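
The new lines in clearTtsStream() notify the application that the caller barged in while TTS was streaming. A minimal sketch of how an application might react on receipt of the event; the frame shape and the onUserInterruption callback are illustrative assumptions, not a documented jambonz payload:

// Hypothetical application-side handler; the frame layout below is an
// assumption for illustration - check the jambonz docs for the exact
// payload delivered with tts:streaming-event.
function onJambonzEvent(frame, {onUserInterruption}) {
  if (frame.type === 'tts:streaming-event' &&
    frame.data?.event_type === 'user_interruption') {
    onUserInterruption();  // e.g. abort the in-flight LLM completion
  }
}

// usage: the caller spoke over the bot while TTS was streaming
onJambonzEvent(
  {type: 'tts:streaming-event', data: {event_type: 'user_interruption'}},
  {onUserInterruption: () => console.log('stop generating tokens')}
);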

@@ -98,7 +98,7 @@ class TaskLlmVoiceAgent_S2S extends Task {
   async _api(ep, args) {
     const res = await ep.api('uuid_voice_agent_s2s', `^^|${args.join('|')}`);
     if (!res.body?.startsWith('+OK')) {
-      throw new Error({args}, `Error calling uuid_voice_agent_s2s: ${JSON.stringify(res.body)}`);
+      throw new Error(`Error calling uuid_voice_agent_s2s: ${JSON.stringify(res.body)}`);
     }
   }
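
The change above fixes a misuse of the Error constructor: its first parameter is the message string, so passing {args} produced an error whose message was '[object Object]', while the template string (a second, non-options argument) was silently ignored. A quick demonstration of the old behavior:

// Error stringifies a non-string first argument, and a second argument
// only matters as an options object ({cause}), so the real message was lost.
const e = new Error({args: ['flush']}, 'Error calling uuid_voice_agent_s2s');
console.log(e.message);  // => '[object Object]'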

@@ -5,8 +5,8 @@ const {
   TtsStreamingConnectionStatus
 } = require('../utils/constants');
 const MAX_CHUNK_SIZE = 1800;
-const HIGH_WATER_BUFFER_SIZE = 5000;
-const LOW_WATER_BUFFER_SIZE = 1000;
+const HIGH_WATER_BUFFER_SIZE = 1000;
+const LOW_WATER_BUFFER_SIZE = 200;
 const TIMEOUT_RETRY_MSECS = 3000;
 class TtsStreamingBuffer extends Emitter {
@@ -90,7 +90,7 @@ class TtsStreamingBuffer extends Emitter {
     /* if we crossed the high water mark, reject the request */
     if (this.tokens.length + totalLength > HIGH_WATER_BUFFER_SIZE) {
       this.logger.info(
-        `TtsStreamingBuffer:bufferTokensTTS buffer is full, rejecting request to buffer ${totalLength} tokens`);
+        `TtsStreamingBuffer throttling: buffer is full, rejecting request to buffer ${totalLength} tokens`);
       if (!this._isFull) {
         this._isFull = true;
@@ -117,9 +117,14 @@ class TtsStreamingBuffer extends Emitter {
       return;
     }
     else if (this._connectionStatus === TtsStreamingConnectionStatus.Connected) {
-      this._api(this.ep, [this.ep.uuid, 'flush'])
-        .catch((err) => this.logger.info({err},
-          `TtsStreamingBuffer:flush Error flushing TTS streaming: ${JSON.stringify(err)}`));
+      if (this.size === 0) {
+        this._doFlush();
+      }
+      else {
+        /* we have tokens queued, so flush after they have been sent */
+        this._pendingFlush = true;
+      }
     }
   }
@@ -190,8 +195,13 @@ class TtsStreamingBuffer extends Emitter {
       await this._api(this.ep, [this.ep.uuid, 'send', modifiedChunk]);
       this.logger.debug(`TtsStreamingBuffer:_feedTokens: sent ${chunk.length}, remaining: ${this.tokens.length}`);
+      if (this._pendingFlush) {
+        this._doFlush();
+        this._pendingFlush = false;
+      }
       if (this.isFull && this.tokens.length <= LOW_WATER_BUFFER_SIZE) {
-        this.logger.info('TtsStreamingBuffer:_feedTokens TTS streaming buffer is no longer full');
+        this.logger.info('TtsStreamingBuffer throttling: TTS streaming buffer is no longer full - resuming');
         this._isFull = false;
         this.emit(TtsStreamingEvents.Resume);
       }
@@ -219,6 +229,12 @@ class TtsStreamingBuffer extends Emitter {
     this.emit(TtsStreamingEvents.ConnectFailure, {vendor});
   }
+  _doFlush() {
+    this._api(this.ep, [this.ep.uuid, 'flush'])
+      .catch((err) => this.logger.info({err},
+        `TtsStreamingBuffer:_doFlush Error flushing TTS streaming: ${JSON.stringify(err)}`));
+  }
   async _onConnect(vendor) {
     this.logger.info(`streaming tts connection made to ${vendor}`);
     this._connectionStatus = TtsStreamingConnectionStatus.Connected;
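
Taken together, the TtsStreamingBuffer changes tighten the watermarks (reject new tokens above the 1000 high-water mark, resume at 200) and guarantee a flush never overtakes queued tokens. A self-contained sketch of the combined pattern, with simplified names and a stubbed send() standing in for the freeswitch API calls (illustrative only, not the actual class):

const {EventEmitter} = require('events');

// Distilled model of watermark throttling + deferred flush;
// send() stands in for the uuid_*_s2s freeswitch API.
const HIGH_WATER = 1000, LOW_WATER = 200;
class MiniTtsBuffer extends EventEmitter {
  constructor(send) {
    super();
    this.send = send;           // async (chunk | 'flush') => void
    this.tokens = '';
    this._isFull = false;
    this._pendingFlush = false;
  }
  buffer(text) {
    if (this.tokens.length + text.length > HIGH_WATER) {
      this._isFull = true;      // throttle until we drain below LOW_WATER
      return false;
    }
    this.tokens += text;
    return true;
  }
  flush() {
    if (this.tokens.length === 0) this.send('flush');
    else this._pendingFlush = true;       // defer: tokens go out first
  }
  async drain() {
    while (this.tokens.length) {
      const chunk = this.tokens.slice(0, 100);
      this.tokens = this.tokens.slice(100);
      await this.send(chunk);
      if (this._pendingFlush && this.tokens.length === 0) {
        this._pendingFlush = false;
        this.send('flush');               // flush only after the last chunk
      }
      if (this._isFull && this.tokens.length <= LOW_WATER) {
        this._isFull = false;
        this.emit('resume');              // producer may buffer again
      }
    }
  }
}

A producer (the LLM token stream) would check buffer()'s return value, pause on false, and wait for the 'resume' event before feeding more tokens, mirroring the Resume event the real class emits.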