Compare commits

..

12 Commits

Author SHA1 Message Date
Dave Horton
ee54e4341a update drachtio-srf 2025-02-20 10:17:53 -05:00
Hoan Luu Huu
4bf2f42f33 support ultravox sends createCall response to app (#1091)
* support ultravox sends createCall response to app

* update type issue

Co-authored-by: Matt Hertogs <matthertogs@gmail.com>

---------

Co-authored-by: Matt Hertogs <matthertogs@gmail.com>
2025-02-20 07:07:03 -05:00
Dave Horton
e09c763d3a #1088 ignore UtteranceEnd if we have unprocessed words (#1089)
* #1088 ignore UtteranceEnd if we have unprocessed words

* wip
2025-02-18 16:30:59 -05:00
Dave Horton
e8a7366526 handle exceptions if we invoke _lccCallHook with new url and it rejects for some reason (#1087) 2025-02-18 13:03:34 -05:00
Dave Horton
122d267816 better handling of flush commands (#1081)
* better handling of flush commands

* rework buffering of tokens

* gather: when returning low confidence also provide the transcript

* better error handling in tts:tokens

* special handling of asr timeout for speechmatics

* remove some logs that were excessively wordy
2025-02-18 09:31:11 -05:00
Hoan Luu Huu
33bca8e67c tts stream should save tts.response_time metric (#1086)
* tts stream should save tts.response_time metric

* wip
2025-02-18 08:45:21 -05:00
Hoan Luu Huu
9c05fd3deb fix dialMusic keep running in infinity loop (#1085) 2025-02-18 07:08:19 -05:00
Hoan Luu Huu
7fa0041f6b support deepgram options noDelay (#1083)
* support deepgram options noDelay

* update verb specification version
2025-02-15 16:39:30 -05:00
Hoan Luu Huu
59d9c62cbe support create call with target.proxy (#1075) 2025-02-11 09:24:04 -05:00
Dave Horton
55b408eecb add support for deepgram keyterms (#1071) 2025-02-07 13:12:25 -05:00
Hoan Luu Huu
f241faa871 update speech utils version (#1070) 2025-02-07 08:00:33 -05:00
rammohan-y
65d35c893c Feat/1067 set default language if language is undefined (#1068)
* sending recognition mode channel variable

* change verb-specifications version

* feat/1067 - setting default language to previously set language for the recognizer object if the vendor is default

* added undefined check for fallbackVendor and fallbackLanguage
2025-02-06 08:06:56 -05:00
11 changed files with 380 additions and 202 deletions

View File

@@ -100,6 +100,7 @@ router.post('/',
...(req.body?.application_sid && {'X-Application-Sid': req.body.application_sid}),
...(restDial.fromHost && {'X-Preferred-From-Host': restDial.fromHost}),
...(record_all_calls && {'X-Record-All-Calls': recordOutputFormat}),
...(target.proxy && {'X-SIP-Proxy': target.proxy}),
...target.headers
};

View File

@@ -1799,62 +1799,66 @@ Duration=${duration} `
async updateCall(opts, callSid) {
this.logger.debug(opts, 'CallSession:updateCall');
if (opts.call_status) {
return this._lccCallStatus(opts);
}
if (opts.call_hook || opts.child_call_hook) {
return await this._lccCallHook(opts);
}
if (opts.listen_status || opts.stream_status) {
await this._lccListenStatus(opts);
}
if (opts.transcribe_status) {
await this._lccTranscribeStatus(opts);
}
else if (opts.mute_status) {
await this._lccMuteStatus(opts.mute_status === 'mute', callSid);
}
else if (opts.conf_hold_status) {
await this._lccConfHoldStatus(opts);
}
else if (opts.conf_mute_status) {
await this._lccConfMuteStatus(opts);
}
else if (opts.sip_request) {
const res = await this._lccSipRequest(opts, callSid);
return {status: res.status, reason: res.reason};
} else if (opts.dtmf) {
await this._lccDtmf(opts, callSid);
}
else if (opts.record) {
await this.notifyRecordOptions(opts.record);
}
else if (opts.tag) {
return this._lccTag(opts);
}
else if (opts.conferenceParticipantAction) {
return this._lccConferenceParticipantAction(opts.conferenceParticipantAction);
}
else if (opts.dub) {
return this._lccDub(opts.dub, callSid);
}
else if (opts.boostAudioSignal) {
return this._lccBoostAudioSignal(opts, callSid);
}
else if (opts.media_path) {
return this._lccMediaPath(opts.media_path, callSid);
}
else if (opts.llm_tool_output) {
return this._lccToolOutput(opts.tool_call_id, opts.llm_tool_output, callSid);
}
else if (opts.llm_update) {
return this._lccLlmUpdate(opts.llm_update, callSid);
}
try {
if (opts.call_status) {
return this._lccCallStatus(opts);
}
if (opts.call_hook || opts.child_call_hook) {
return await this._lccCallHook(opts);
}
if (opts.listen_status || opts.stream_status) {
await this._lccListenStatus(opts);
}
if (opts.transcribe_status) {
await this._lccTranscribeStatus(opts);
}
else if (opts.mute_status) {
await this._lccMuteStatus(opts.mute_status === 'mute', callSid);
}
else if (opts.conf_hold_status) {
await this._lccConfHoldStatus(opts);
}
else if (opts.conf_mute_status) {
await this._lccConfMuteStatus(opts);
}
else if (opts.sip_request) {
const res = await this._lccSipRequest(opts, callSid);
return {status: res.status, reason: res.reason};
} else if (opts.dtmf) {
await this._lccDtmf(opts, callSid);
}
else if (opts.record) {
await this.notifyRecordOptions(opts.record);
}
else if (opts.tag) {
return this._lccTag(opts);
}
else if (opts.conferenceParticipantAction) {
return this._lccConferenceParticipantAction(opts.conferenceParticipantAction);
}
else if (opts.dub) {
return this._lccDub(opts.dub, callSid);
}
else if (opts.boostAudioSignal) {
return this._lccBoostAudioSignal(opts, callSid);
}
else if (opts.media_path) {
return this._lccMediaPath(opts.media_path, callSid);
}
else if (opts.llm_tool_output) {
return this._lccToolOutput(opts.tool_call_id, opts.llm_tool_output, callSid);
}
else if (opts.llm_update) {
return this._lccLlmUpdate(opts.llm_update, callSid);
}
// whisper may be the only thing we are asked to do, or it may that
// we are doing a whisper after having muted, paused recording etc..
if (opts.whisper) {
return this._lccWhisper(opts, callSid);
// whisper may be the only thing we are asked to do, or it may that
// we are doing a whisper after having muted, paused recording etc..
if (opts.whisper) {
return this._lccWhisper(opts, callSid);
}
} catch (err) {
this.logger.info({err, opts, callSid}, 'CallSession:updateCall - error updating call');
}
}

View File

@@ -187,18 +187,20 @@ class TaskConfig extends Task {
: cs.speechRecognizerVendor;
cs.speechRecognizerLabel = this.recognizer.label === 'default'
? cs.speechRecognizerLabel : this.recognizer.label;
cs.speechRecognizerLanguage = this.recognizer.language !== 'default'
cs.speechRecognizerLanguage = this.recognizer.language !== undefined && this.recognizer.language !== 'default'
? this.recognizer.language
: cs.speechRecognizerLanguage;
//fallback
cs.fallbackSpeechRecognizerVendor = this.recognizer.fallbackVendor !== 'default'
cs.fallbackSpeechRecognizerVendor = this.recognizer.fallbackVendor !== undefined &&
this.recognizer.fallbackVendor !== 'default'
? this.recognizer.fallbackVendor
: cs.fallbackSpeechRecognizerVendor;
cs.fallbackSpeechRecognizerLabel = this.recognizer.fallbackLabel === 'default' ?
cs.fallbackSpeechRecognizerLabel :
this.recognizer.fallbackLabel;
cs.fallbackSpeechRecognizerLanguage = this.recognizer.fallbackLanguage !== 'default'
cs.fallbackSpeechRecognizerLanguage = this.recognizer.fallbackLanguage !== undefined &&
this.recognizer.fallbackLanguage !== 'default'
? this.recognizer.fallbackLanguage
: cs.fallbackSpeechRecognizerLanguage;

View File

@@ -230,10 +230,10 @@ class TaskDial extends Task {
try {
await this.epOther.play(this.dialMusic);
} catch (err) {
this.logger.error(err, `TaskDial:exec error playing ${this.dialMusic}`);
this.logger.error(err, `TaskDial:exec error playing dialMusic ${this.dialMusic}`);
await sleepFor(1000);
}
} while (!this.killed || !this.bridged);
} while (!this.killed && !this.bridged && this._mediaPath === MediaPath.FullMedia);
})();
}
}

View File

@@ -636,6 +636,13 @@ class TaskGather extends SttTask {
this._asrTimer = setTimeout(() => {
this.logger.debug('_startAsrTimer - asr timer went off');
const evt = this.consolidateTranscripts(this._bufferedTranscripts, 1, this.language, this.vendor);
/* special case for speechmatics - keep listening if we dont have any transcripts */
if (this.vendor === 'speechmatics' && this._bufferedTranscripts.length === 0) {
this.logger.debug('Gather:_startAsrTimer - speechmatics, no transcripts yet, keep listening');
this._startAsrTimer();
return;
}
this._resolve(this._bufferedTranscripts.length > 0 ? 'speech' : 'timeout', evt);
}, this.asrTimeout);
this.logger.debug(`_startAsrTimer: set for ${this.asrTimeout}ms`);
@@ -778,10 +785,16 @@ class TaskGather extends SttTask {
this.logger.debug('Gather:_onTranscription - got UtteranceEnd event from deepgram but no buffered transcripts');
}
else {
this.logger.debug('Gather:_onTranscription - got UtteranceEnd event from deepgram, return buffered transcript');
evt = this.consolidateTranscripts(this._bufferedTranscripts, 1, this.language, this.vendor);
this._bufferedTranscripts = [];
this._resolve('speech', evt);
const utteranceTime = evt.last_word_end;
if (utteranceTime && this._dgTimeOfLastUnprocessedWord && utteranceTime < this._dgTimeOfLastUnprocessedWord) {
this.logger.debug('Gather:_onTranscription - got UtteranceEnd with unprocessed words, continue listening');
}
else {
this.logger.debug('Gather:_onTranscription - got UtteranceEnd from deepgram, return buffered transcript');
evt = this.consolidateTranscripts(this._bufferedTranscripts, 1, this.language, this.vendor);
this._bufferedTranscripts = [];
this._resolve('speech', evt);
}
}
return;
}
@@ -792,7 +805,7 @@ class TaskGather extends SttTask {
evt = this.normalizeTranscription(evt, this.vendor, 1, this.language,
this.shortUtterance, this.data.recognizer.punctuation);
this.logger.debug({evt, bugname, finished, vendor: this.vendor}, 'Gather:_onTranscription normalized transcript');
//this.logger.debug({evt, bugname, finished, vendor: this.vendor}, 'Gather:_onTranscription normalized transcript');
if (evt.alternatives.length === 0) {
this.logger.info({evt}, 'TaskGather:_onTranscription - got empty transcript, continue listening');
@@ -800,8 +813,6 @@ class TaskGather extends SttTask {
}
const confidence = evt.alternatives[0].confidence;
const minConfidence = this.data.recognizer?.minConfidence;
this.logger.debug({evt},
`TaskGather:_onTranscription - confidence (${confidence}), minConfidence (${minConfidence})`);
if (confidence && minConfidence && confidence < minConfidence) {
this.logger.info({evt},
'TaskGather:_onTranscription - Transcript confidence ' +
@@ -917,8 +928,21 @@ class TaskGather extends SttTask {
if (originalEvent.is_final && evt.alternatives[0].transcript !== '') {
this.logger.debug({evt}, 'Gather:_onTranscription - buffering a completed (partial) deepgram transcript');
this._bufferedTranscripts.push(evt);
this._dgTimeOfLastUnprocessedWord = null;
}
if (evt.alternatives[0].transcript === '') {
emptyTranscript = true;
}
else if (!originalEvent.is_final) {
/* Deepgram: we have unprocessed words-save last word end time so we can later compare to UtteranceEnd */
const words = originalEvent.channel.alternatives[0].words;
if (words?.length > 0) {
this._dgTimeOfLastUnprocessedWord = words.slice(-1)[0].end;
this.logger.debug(
`TaskGather:_onTranscription - saving word end time: ${this._dgTimeOfLastUnprocessedWord}`);
}
}
if (evt.alternatives[0].transcript === '') emptyTranscript = true;
}
if (!emptyTranscript) {
@@ -1188,7 +1212,7 @@ class TaskGather extends SttTask {
if (this.parentTask) this.parentTask.emit('stt-low-confidence', evt);
else {
this.emit('stt-low-confidence', evt);
returnedVerbs = await this.performAction({reason: 'stt-low-confidence'});
returnedVerbs = await this.performAction({speech:evt, reason: 'stt-low-confidence'});
}
}
} catch (err) { /*already logged error*/ }

View File

@@ -4,6 +4,7 @@ const {request} = require('undici');
const {LlmEvents_Ultravox} = require('../../../utils/constants');
const ultravox_server_events = [
'createCall',
'pong',
'state',
'transcript',
@@ -88,7 +89,7 @@ class TaskLlmUltravox_S2S extends Task {
throw new Error(`Ultravox Error registering call: ${data.message}`);
}
this.logger.info({joinUrl: data.joinUrl}, 'Ultravox Call registered');
return data.joinUrl;
return data;
}
_unregisterHandlers() {
@@ -105,13 +106,19 @@ class TaskLlmUltravox_S2S extends Task {
async _startListening(cs, ep) {
this._registerHandlers(ep);
const joinUrl = await this.createCall();
const data = await this.createCall();
const {joinUrl} = data;
// split the joinUrl into host and path
const {host, pathname, search} = new URL(joinUrl);
try {
const args = [ep.uuid, 'session.create', host, pathname + search];
await this._api(ep, args);
// Notify the application that the session has been created with detail information
this._sendLlmEvent('createCall', {
type: 'createCall',
...data
});
} catch (err) {
this.logger.error({err}, 'TaskLlmUltraVox_S2S:_startListening');
this.notifyTaskDone();
@@ -190,11 +197,7 @@ class TaskLlmUltravox_S2S extends Task {
}
}
/* check whether we should notify on this event */
if (this.includeEvents.length > 0 ? this.includeEvents.includes(type) : !this.excludeEvents.includes(type)) {
this.parent.sendEventHook(evt)
.catch((err) => this.logger.info({err}, 'TaskLlmUltravox_S2S:_onServerEvent - error sending event hook'));
}
this._sendLlmEvent(type, evt);
if (endConversation) {
this.logger.info({results: this.results},
@@ -203,6 +206,14 @@ class TaskLlmUltravox_S2S extends Task {
}
}
_sendLlmEvent(type, evt) {
/* check whether we should notify on this event */
if (this.includeEvents.length > 0 ? this.includeEvents.includes(type) : !this.excludeEvents.includes(type)) {
this.parent.sendEventHook(evt)
.catch((err) => this.logger.info({err}, 'TaskLlmUltravox_S2S:_onServerEvent - error sending event hook'));
}
}
async processToolOutput(ep, tool_call_id, data) {
try {
this.logger.debug({tool_call_id, data}, 'TaskLlmUltravox_S2S:processToolOutput');

View File

@@ -213,7 +213,7 @@ class TaskSay extends TtsTask {
ep.once('playback-start', (evt) => {
this.logger.debug({evt}, 'Say got playback-start');
if (this.otelSpan) {
this._addStreamingTtsAttributes(this.otelSpan, evt);
this._addStreamingTtsAttributes(this.otelSpan, evt, vendor);
this.otelSpan.end();
this.otelSpan = null;
if (evt.variable_tts_cache_filename) {
@@ -308,7 +308,7 @@ class TaskSay extends TtsTask {
this.notifyTaskDone();
}
_addStreamingTtsAttributes(span, evt) {
_addStreamingTtsAttributes(span, evt, vendor) {
const attrs = {'tts.cached': false};
for (const [key, value] of Object.entries(evt)) {
if (key.startsWith('variable_tts_')) {
@@ -322,6 +322,9 @@ class TaskSay extends TtsTask {
.replace('elevenlabs_', 'elevenlabs.');
if (spanMapping[newKey]) newKey = spanMapping[newKey];
attrs[newKey] = value;
if (key === 'variable_tts_time_to_first_byte_ms' && value) {
this.cs.srf.locals.stats.histogram('tts.response_time', value, [`vendor:${vendor}`]);
}
}
}
delete attrs['cache_filename']; //no value in adding this to the span

View File

@@ -30,6 +30,7 @@ const stickyVars = {
'DEEPGRAM_SPEECH_TIER',
'DEEPGRAM_SPEECH_MODEL',
'DEEPGRAM_SPEECH_ENABLE_SMART_FORMAT',
'DEEPGRAM_SPEECH_ENABLE_NO_DELAY',
'DEEPGRAM_SPEECH_ENABLE_AUTOMATIC_PUNCTUATION',
'DEEPGRAM_SPEECH_PROFANITY_FILTER',
'DEEPGRAM_SPEECH_REDACT',
@@ -44,7 +45,8 @@ const stickyVars = {
'DEEPGRAM_SPEECH_VAD_TURNOFF',
'DEEPGRAM_SPEECH_TAG',
'DEEPGRAM_SPEECH_MODEL_VERSION',
'DEEPGRAM_SPEECH_FILLER_WORDS'
'DEEPGRAM_SPEECH_FILLER_WORDS',
'DEEPGRAM_SPEECH_KEYTERMS',
],
aws: [
'AWS_VOCABULARY_NAME',
@@ -804,6 +806,8 @@ module.exports = (logger) => {
{DEEPGRAM_SPEECH_ENABLE_AUTOMATIC_PUNCTUATION: 1},
...(deepgramOptions.smartFormatting) &&
{DEEPGRAM_SPEECH_ENABLE_SMART_FORMAT: 1},
...(deepgramOptions.noDelay) &&
{DEEPGRAM_SPEECH_ENABLE_NO_DELAY: 1},
...(deepgramOptions.profanityFilter) &&
{DEEPGRAM_SPEECH_PROFANITY_FILTER: 1},
...(deepgramOptions.redact) &&
@@ -841,7 +845,9 @@ module.exports = (logger) => {
...(deepgramOptions.version) &&
{DEEPGRAM_SPEECH_MODEL_VERSION: deepgramOptions.version},
...(deepgramOptions.fillerWords) &&
{DEEPGRAM_SPEECH_FILLER_WORDS: deepgramOptions.fillerWords}
{DEEPGRAM_SPEECH_FILLER_WORDS: deepgramOptions.fillerWords},
...((Array.isArray(deepgramOptions.keyterms) && deepgramOptions.keyterms.length > 0) &&
{DEEPGRAM_SPEECH_KEYTERMS: deepgramOptions.keyterms.join(',')})
};
}
else if ('soniox' === vendor) {

View File

@@ -4,37 +4,50 @@ const {
TtsStreamingEvents,
TtsStreamingConnectionStatus
} = require('../utils/constants');
const MAX_CHUNK_SIZE = 1800;
const HIGH_WATER_BUFFER_SIZE = 1000;
const LOW_WATER_BUFFER_SIZE = 200;
const TIMEOUT_RETRY_MSECS = 3000;
const isWhitespace = (str) => /^\s*$/.test(str);
/**
* Each queue item is an object:
* - { type: 'text', value: '…' } for text tokens.
* - { type: 'flush' } for a flush command.
*/
class TtsStreamingBuffer extends Emitter {
constructor(cs) {
super();
this.cs = cs;
this.logger = cs.logger;
this.tokens = '';
// Use an array to hold our structured items.
this.queue = [];
// Track total number of characters in text items.
this.bufferedLength = 0;
this.eventHandlers = [];
this._isFull = false;
this._connectionStatus = TtsStreamingConnectionStatus.NotConnected;
this._flushPending = false;
this.timer = null;
// Record the last time the text buffer was updated.
this.lastUpdateTime = 0;
}
get isEmpty() {
return this.tokens.length === 0;
return this.queue.length === 0;
}
get size() {
return this.bufferedLength;
}
get isFull() {
return this._isFull;
}
get size() {
return this.tokens.length;
}
get ep() {
return this.cs?.ep;
}
@@ -42,7 +55,8 @@ class TtsStreamingBuffer extends Emitter {
async start() {
assert.ok(
this._connectionStatus === TtsStreamingConnectionStatus.NotConnected,
'TtsStreamingBuffer:start already started, or has failed');
'TtsStreamingBuffer:start already started, or has failed'
);
this.vendor = this.cs.getTsStreamingVendor();
if (!this.vendor) {
@@ -55,9 +69,9 @@ class TtsStreamingBuffer extends Emitter {
this._connectionStatus = TtsStreamingConnectionStatus.Connecting;
try {
if (this.eventHandlers.length === 0) this._initHandlers(this.ep);
await this._api(this.ep, [this.ep.uuid, 'connect']);
await this._api(this.ep, [this.ep.uuid, 'connect']);
} catch (err) {
this.logger.info({err}, 'TtsStreamingBuffer:start Error connecting to TTS streaming');
this.logger.info({ err }, 'TtsStreamingBuffer:start Error connecting to TTS streaming');
this._connectionStatus = TtsStreamingConnectionStatus.Failed;
}
}
@@ -67,204 +81,319 @@ class TtsStreamingBuffer extends Emitter {
this.removeCustomEventListeners();
if (this.ep) {
this._api(this.ep, [this.ep.uuid, 'close'])
.catch((err) => this.logger.info({err}, 'TtsStreamingBuffer:kill Error closing TTS streaming'));
.catch((err) =>
this.logger.info({ err }, 'TtsStreamingBuffer:stop Error closing TTS streaming')
);
}
this.timer = null;
this.tokens = '';
this.queue = [];
this.bufferedLength = 0;
this._connectionStatus = TtsStreamingConnectionStatus.NotConnected;
}
/**
* Add tokens to the buffer and start feeding them to the endpoint if necessary.
* Buffer new text tokens.
*/
async bufferTokens(tokens) {
if (this._connectionStatus === TtsStreamingConnectionStatus.Failed) {
this.logger.info('TtsStreamingBuffer:bufferTokens TTS streaming connection failed, rejecting request');
return {status: 'failed', reason: `connection to ${this.vendor} failed`};
return { status: 'failed', reason: `connection to ${this.vendor} failed` };
}
if (0 === this.bufferedLength && isWhitespace(tokens)) {
this.logger.debug({tokens}, 'TtsStreamingBuffer:bufferTokens discarded whitespace tokens');
return { status: 'ok' };
}
const displayedTokens = tokens.length <= 40 ? tokens : tokens.substring(0, 40);
const totalLength = tokens.length;
/* if we crossed the high water mark, reject the request */
if (this.tokens.length + totalLength > HIGH_WATER_BUFFER_SIZE) {
if (this.bufferedLength + totalLength > HIGH_WATER_BUFFER_SIZE) {
this.logger.info(
`TtsStreamingBuffer throttling: buffer is full, rejecting request to buffer ${totalLength} tokens`);
`TtsStreamingBuffer throttling: buffer is full, rejecting request to buffer ${totalLength} tokens`
);
if (!this._isFull) {
this._isFull = true;
this.emit(TtsStreamingEvents.Pause);
}
return {status: 'failed', reason: 'full'};
return { status: 'failed', reason: 'full' };
}
this.logger.debug(
`TtsStreamingBuffer:bufferTokens "${displayedTokens}" (length: ${totalLength}), starting? ${this.isEmpty}`
`TtsStreamingBuffer:bufferTokens "${displayedTokens}" (length: ${totalLength})`
);
this.tokens += (tokens || '');
this.queue.push({ type: 'text', value: tokens });
this.bufferedLength += totalLength;
// Update the last update time each time new text is buffered.
this.lastUpdateTime = Date.now();
await this._feedTokens();
return {status: 'ok'};
await this._feedQueue();
return { status: 'ok' };
}
/**
* Insert a flush command. If no text is queued, flush immediately.
* Otherwise, append a flush marker so that all text preceding it will be sent
* (regardless of sentence boundaries) before the flush is issued.
*/
flush() {
this.logger.debug('TtsStreamingBuffer:flush');
if (this._connectionStatus === TtsStreamingConnectionStatus.Connecting) {
this.logger.debug('TtsStreamingBuffer:flush TTS stream is not quite ready - wait for connect');
this._flushPending = true;
if (this.queue.length === 0 || this.queue[this.queue.length - 1].type !== 'flush') {
this.queue.push({ type: 'flush' });
}
return;
}
else if (this._connectionStatus === TtsStreamingConnectionStatus.Connected) {
if (this.size === 0) {
if (this.isEmpty) {
this._doFlush();
}
else {
/* we have tokens queued, so flush after they have been sent */
this._pendingFlush = true;
if (this.queue[this.queue.length - 1].type !== 'flush') {
this.queue.push({ type: 'flush' });
this.logger.debug('TtsStreamingBuffer:flush added flush marker to queue');
}
}
}
else {
this.logger.debug(
`TtsStreamingBuffer:flush TTS stream is not connected, status: ${this._connectionStatus}`
);
}
}
clear() {
this.logger.debug('TtsStreamingBuffer:clear');
if (this._connectionStatus !== TtsStreamingConnectionStatus.Connected) return;
clearTimeout(this.timer);
this._api(this.ep, [this.ep.uuid, 'clear'])
.catch((err) => this.logger.info({err}, 'TtsStreamingBuffer:clear Error clearing TTS streaming'));
this.tokens = '';
this._api(this.ep, [this.ep.uuid, 'clear']).catch((err) =>
this.logger.info({ err }, 'TtsStreamingBuffer:clear Error clearing TTS streaming')
);
this.queue = [];
this.bufferedLength = 0;
this.timer = null;
this._isFull = false;
}
/**
* Send tokens to the TTS engine in sentence chunks for best playout
* Process the queue in two phases.
*
* Phase 1: Look for flush markers. When a flush marker is found (even if not at the very front),
* send all text tokens that came before it immediately (ignoring sentence boundaries)
* and then send the flush command. Repeat until there are no flush markers left.
*
* Phase 2: With the remaining queue (now containing only text items), accumulate text
* up to MAX_CHUNK_SIZE and use sentence-boundary logic to determine a chunk.
* Then, remove the exact tokens (or portions thereof) that were consumed.
*/
async _feedTokens(handlingTimeout = false) {
this.logger.debug({tokens: this.tokens}, '_feedTokens');
async _feedQueue(handlingTimeout = false) {
this.logger.debug({ queue: this.queue }, 'TtsStreamingBuffer:_feedQueue');
try {
/* are we in a state where we can feed tokens to the TTS? */
if (!this.cs.isTtsStreamOpen || !this.ep || !this.tokens) {
this.logger.debug('TTS stream is not open or no tokens to send');
return this.tokens?.length || 0;
if (!this.cs.isTtsStreamOpen || !this.ep) {
this.logger.debug('TtsStreamingBuffer:_feedQueue TTS stream is not open or no endpoint available');
return;
}
if (this._connectionStatus === TtsStreamingConnectionStatus.NotConnected ||
this._connectionStatus === TtsStreamingConnectionStatus.Failed) {
this.logger.debug('TtsStreamingBuffer:_feedTokens TTS stream is not connected');
if (
this._connectionStatus === TtsStreamingConnectionStatus.NotConnected ||
this._connectionStatus === TtsStreamingConnectionStatus.Failed
) {
this.logger.debug('TtsStreamingBuffer:_feedQueue TTS stream is not connected');
return;
}
if (this._connectionStatus === TtsStreamingConnectionStatus.Connecting) {
this.logger.debug('TtsStreamingBuffer:_feedTokens TTS stream is not ready, waiting for connect');
// --- Phase 1: Process flush markers ---
// Process any flush marker that isnt in the very first position.
let flushIndex = this.queue.findIndex((item, idx) => item.type === 'flush' && idx > 0);
while (flushIndex !== -1) {
let flushText = '';
// Accumulate all text tokens preceding the flush marker.
for (let i = 0; i < flushIndex; i++) {
if (this.queue[i].type === 'text') {
flushText += this.queue[i].value;
}
}
// Remove those text items.
for (let i = 0; i < flushIndex; i++) {
const item = this.queue.shift();
if (item.type === 'text') {
this.bufferedLength -= item.value.length;
}
}
// Remove the flush marker (now at the front).
if (this.queue.length > 0 && this.queue[0].type === 'flush') {
this.queue.shift();
}
// Immediately send all accumulated text (ignoring sentence boundaries).
if (flushText.length > 0) {
const modifiedFlushText = flushText.replace(/\n\n/g, '\n \n');
try {
await this._api(this.ep, [this.ep.uuid, 'send', modifiedFlushText]);
} catch (err) {
this.logger.info({ err, flushText }, 'TtsStreamingBuffer:_feedQueue Error sending TTS chunk');
}
}
// Send the flush command.
await this._doFlush();
flushIndex = this.queue.findIndex((item, idx) => item.type === 'flush' && idx > 0);
}
// If a flush marker is at the very front, process it.
while (this.queue.length > 0 && this.queue[0].type === 'flush') {
this.queue.shift();
await this._doFlush();
}
// --- Phase 2: Process remaining text tokens ---
if (this.queue.length === 0) {
this._removeTimer();
return;
}
/* must send at least one sentence */
const limit = Math.min(MAX_CHUNK_SIZE, this.tokens.length);
let chunkEnd = findSentenceBoundary(this.tokens, limit);
// Accumulate contiguous text tokens (from the front) up to MAX_CHUNK_SIZE.
let combinedText = '';
for (const item of this.queue) {
if (item.type !== 'text') break;
combinedText += item.value;
if (combinedText.length >= MAX_CHUNK_SIZE) break;
}
if (combinedText.length === 0) {
this._removeTimer();
return;
}
const limit = Math.min(MAX_CHUNK_SIZE, combinedText.length);
let chunkEnd = findSentenceBoundary(combinedText, limit);
if (chunkEnd <= 0) {
if (handlingTimeout) {
/* on a timeout we've left some tokens sitting around, so be more aggressive now in sending them */
chunkEnd = findWordBoundary(this.tokens, limit);
chunkEnd = findWordBoundary(combinedText, limit);
if (chunkEnd <= 0) {
this.logger.debug('TtsStreamingBuffer:_feedTokens: no word boundary found');
this._setTimerIfNeeded();
return;
}
}
else {
/* if we just received tokens, we wont send unless we have at least a full sentence */
this.logger.debug('TtsStreamingBuffer:_feedTokens: no sentence boundary found');
} else {
this._setTimerIfNeeded();
return;
}
}
const chunk = combinedText.slice(0, chunkEnd);
const chunk = this.tokens.slice(0, chunkEnd);
this.tokens = this.tokens.slice(chunkEnd);
// Now we iterate over the queue items
// and deduct their lengths until we've accounted for chunkEnd characters.
let remaining = chunkEnd;
let tokensProcessed = 0;
for (let i = 0; i < this.queue.length; i++) {
const token = this.queue[i];
if (token.type !== 'text') break;
if (remaining >= token.value.length) {
remaining -= token.value.length;
tokensProcessed = i + 1;
} else {
// Partially consumed token: update its value to remove the consumed part.
token.value = token.value.slice(remaining);
tokensProcessed = i;
remaining = 0;
break;
}
}
// Remove the fully consumed tokens from the front of the queue.
this.queue.splice(0, tokensProcessed);
this.bufferedLength -= chunkEnd;
/* freeswitch looks for sequence of 2 newlines to determine end of message, so insert a space */
const modifiedChunk = chunk.replace(/\n\n/g, '\n \n');
await this._api(this.ep, [this.ep.uuid, 'send', modifiedChunk]);
this.logger.debug(`TtsStreamingBuffer:_feedTokens: sent ${chunk.length}, remaining: ${this.tokens.length}`);
this.logger.debug(`TtsStreamingBuffer:_feedQueue sending chunk to tts: ${modifiedChunk}`);
if (this._pendingFlush) {
this._doFlush();
this._pendingFlush = false;
try {
await this._api(this.ep, [this.ep.uuid, 'send', modifiedChunk]);
} catch (err) {
this.logger.info({ err, chunk }, 'TtsStreamingBuffer:_feedQueue Error sending TTS chunk');
}
if (this.isFull && this.tokens.length <= LOW_WATER_BUFFER_SIZE) {
this.logger.info('TtsStreamingBuffer throttling: TTS streaming buffer is no longer full - resuming');
if (this._isFull && this.bufferedLength <= LOW_WATER_BUFFER_SIZE) {
this.logger.info('TtsStreamingBuffer throttling: buffer is no longer full - resuming');
this._isFull = false;
this.emit(TtsStreamingEvents.Resume);
}
} catch (err) {
this.logger.info({err}, 'TtsStreamingBuffer:_feedTokens Error sending TTS chunk');
this.tokens = '';
}
return;
return this._feedQueue();
} catch (err) {
this.logger.info({ err }, 'TtsStreamingBuffer:_feedQueue Error sending TTS chunk');
this.queue = [];
this.bufferedLength = 0;
}
}
async _api(ep, args) {
const apiCmd = `uuid_${this.vendor.startsWith('custom:') ? 'custom' : this.vendor}_tts_streaming`;
const res = await ep.api(apiCmd, `^^|${args.join('|')}`);
if (!res.body?.startsWith('+OK')) {
this.logger.info({args}, `Error calling ${apiCmd}: ${res.body}`);
this.logger.info({ args }, `Error calling ${apiCmd}: ${res.body}`);
throw new Error(`Error calling ${apiCmd}: ${res.body}`);
}
}
_onConnectFailure(vendor) {
this.logger.info(`streaming tts connection failed to ${vendor}`);
this._connectionStatus = TtsStreamingConnectionStatus.Failed;
this.tokens = '';
this.emit(TtsStreamingEvents.ConnectFailure, {vendor});
}
_doFlush() {
this._api(this.ep, [this.ep.uuid, 'flush'])
.catch((err) => this.logger.info({err},
`TtsStreamingBuffer:_doFlush Error flushing TTS streaming: ${JSON.stringify(err)}`));
return this._api(this.ep, [this.ep.uuid, 'flush'])
.then(() => this.logger.debug('TtsStreamingBuffer:_doFlush sent flush command'))
.catch((err) =>
this.logger.info(
{ err },
`TtsStreamingBuffer:_doFlush Error flushing TTS streaming: ${JSON.stringify(err)}`
)
);
}
async _onConnect(vendor) {
this.logger.info(`streaming tts connection made to ${vendor}`);
this.logger.info(`TtsStreamingBuffer:_onConnect streaming tts connection made to ${vendor} successful`);
this._connectionStatus = TtsStreamingConnectionStatus.Connected;
if (this.tokens.length > 0) {
await this._feedTokens();
}
if (this._flushPending) {
this.flush();
this._flushPending = false;
if (this.queue.length > 0) {
await this._feedQueue();
}
}
_onConnectFailure(vendor) {
this.logger.info(`TtsStreamingBuffer:_onConnectFailure streaming tts connection failed to ${vendor}`);
this._connectionStatus = TtsStreamingConnectionStatus.Failed;
this.queue = [];
this.bufferedLength = 0;
this.emit(TtsStreamingEvents.ConnectFailure, { vendor });
}
_setTimerIfNeeded() {
if (this.tokens.length > 0 && !this.timer) {
if (this.bufferedLength > 0 && !this.timer) {
this.logger.debug({queue: this.queue},
`TtsStreamingBuffer:_setTimerIfNeeded setting timer because ${this.bufferedLength} buffered`);
this.timer = setTimeout(this._onTimeout.bind(this), TIMEOUT_RETRY_MSECS);
}
}
_removeTimer() {
if (this.timer) {
this.logger.debug('TtsStreamingBuffer:_removeTimer clearing timer');
clearTimeout(this.timer);
this.timer = null;
}
}
_onTimeout() {
this.logger.info('TtsStreamingBuffer:_onTimeout');
this.logger.debug('TtsStreamingBuffer:_onTimeout Timeout waiting for sentence boundary');
// Check if new text has been added since the timer was set.
const now = Date.now();
if (now - this.lastUpdateTime < TIMEOUT_RETRY_MSECS) {
this.logger.debug('TtsStreamingBuffer:_onTimeout New text received recently; postponing flush.');
this._setTimerIfNeeded();
return;
}
this.timer = null;
this._feedTokens(true);
this._feedQueue(true);
}
_onTtsEmpty(vendor) {
this.emit(TtsStreamingEvents.Empty, {vendor});
this.emit(TtsStreamingEvents.Empty, { vendor });
}
addCustomEventListener(ep, event, handler) {
this.eventHandlers.push({ep, event, handler});
this.eventHandlers.push({ ep, event, handler });
ep.addCustomEventListener(event, handler);
}
@@ -274,7 +403,6 @@ class TtsStreamingBuffer extends Emitter {
_initHandlers(ep) {
[
// DH: add other vendors here as modules are added
'deepgram',
'cartesia',
'elevenlabs',
@@ -293,23 +421,21 @@ class TtsStreamingBuffer extends Emitter {
}
const findSentenceBoundary = (text, limit) => {
// Match traditional sentence boundaries or double newlines
// Look for punctuation or double newline that signals sentence end.
const sentenceEndRegex = /[.!?](?=\s|$)|\n\n/g;
let lastSentenceBoundary = -1;
let match;
while ((match = sentenceEndRegex.exec(text)) && match.index < limit) {
const precedingText = text.slice(0, match.index).trim(); // Extract text before the match and trim whitespace
if (precedingText.length > 0) { // Check if there's actual content
const precedingText = text.slice(0, match.index).trim();
if (precedingText.length > 0) {
if (
match[0] === '\n\n' || // It's a double newline
(match.index === 0 || !/\d$/.test(text[match.index - 1])) // Standard punctuation rules
match[0] === '\n\n' ||
(match.index === 0 || !/\d$/.test(text[match.index - 1]))
) {
lastSentenceBoundary = match.index + (match[0] === '\n\n' ? 2 : 1); // Include the boundary
lastSentenceBoundary = match.index + (match[0] === '\n\n' ? 2 : 1);
}
}
}
return lastSentenceBoundary;
};
@@ -317,7 +443,6 @@ const findWordBoundary = (text, limit) => {
const wordBoundaryRegex = /\s+/g;
let lastWordBoundary = -1;
let match;
while ((match = wordBoundaryRegex.exec(text)) && match.index < limit) {
lastWordBoundary = match.index;
}

24
package-lock.json generated
View File

@@ -15,10 +15,10 @@
"@jambonz/http-health-check": "^0.0.1",
"@jambonz/mw-registrar": "^0.2.7",
"@jambonz/realtimedb-helpers": "^0.8.8",
"@jambonz/speech-utils": "^0.2.2",
"@jambonz/speech-utils": "^0.2.3",
"@jambonz/stats-collector": "^0.1.10",
"@jambonz/time-series": "^0.2.13",
"@jambonz/verb-specifications": "^0.0.95",
"@jambonz/verb-specifications": "^0.0.97",
"@opentelemetry/api": "^1.8.0",
"@opentelemetry/exporter-jaeger": "^1.23.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.50.0",
@@ -32,7 +32,7 @@
"debug": "^4.3.4",
"deepcopy": "^2.1.0",
"drachtio-fsmrf": "^4.0.1",
"drachtio-srf": "^5.0.1",
"drachtio-srf": "^5.0.2",
"express": "^4.19.2",
"express-validator": "^7.0.1",
"moment": "^2.30.1",
@@ -1512,9 +1512,9 @@
}
},
"node_modules/@jambonz/speech-utils": {
"version": "0.2.2",
"resolved": "https://registry.npmjs.org/@jambonz/speech-utils/-/speech-utils-0.2.2.tgz",
"integrity": "sha512-+O+5Ej6RhQZjbZLRbJSA4UT1Es2JcDSDJT24kGRSVWCf8SuG5B3TqKzZP0aaVA297I12b7ztNG9ShWjY0iR7Fg==",
"version": "0.2.3",
"resolved": "https://registry.npmjs.org/@jambonz/speech-utils/-/speech-utils-0.2.3.tgz",
"integrity": "sha512-ffx0TYggGDeEEB5yZ9y+BkF6dtDmKvCEgyK0lmzjkRQ2h3xaIWao3c5VCU9WXhKb+d2o6mLL06GebBMdzWIGhg==",
"license": "MIT",
"dependencies": {
"@aws-sdk/client-polly": "^3.496.0",
@@ -1550,9 +1550,10 @@
}
},
"node_modules/@jambonz/verb-specifications": {
"version": "0.0.95",
"resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.95.tgz",
"integrity": "sha512-36JB/Dc6k2KFqPzaZjREKfF+2sHYkQeJPMQkqeT61V/WHqso+MpWAKxPhBIu2B1E+6D/VS2jUrHRRmahU4nIVQ==",
"version": "0.0.97",
"resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.97.tgz",
"integrity": "sha512-CncykmCwc8YZcDYwFDq88n6IAyoQNae3lSF2BI5etoBKMujzxOty227lq6zgeXun9UYYDy/CONk5MiLO29kcBg==",
"license": "MIT",
"dependencies": {
"debug": "^4.3.4",
"pino": "^8.8.0"
@@ -3646,8 +3647,9 @@
}
},
"node_modules/drachtio-srf": {
"version": "5.0.1",
"license": "MIT",
"version": "5.0.2",
"resolved": "https://registry.npmjs.org/drachtio-srf/-/drachtio-srf-5.0.2.tgz",
"integrity": "sha512-tM4TVNoC3IpdmpNn2gnuIp5AzNF6Ik6rvRTFjmQ25/W4gb4eVzK8cCYntc00rtbENI4HHmrX4Ep+/T+ZVnoTDw==",
"dependencies": {
"debug": "^3.2.7",
"delegates": "^0.1.0",

View File

@@ -31,9 +31,9 @@
"@jambonz/http-health-check": "^0.0.1",
"@jambonz/mw-registrar": "^0.2.7",
"@jambonz/realtimedb-helpers": "^0.8.8",
"@jambonz/speech-utils": "^0.2.2",
"@jambonz/speech-utils": "^0.2.3",
"@jambonz/stats-collector": "^0.1.10",
"@jambonz/verb-specifications": "^0.0.95",
"@jambonz/verb-specifications": "^0.0.97",
"@jambonz/time-series": "^0.2.13",
"@opentelemetry/api": "^1.8.0",
"@opentelemetry/exporter-jaeger": "^1.23.0",
@@ -48,7 +48,7 @@
"debug": "^4.3.4",
"deepcopy": "^2.1.0",
"drachtio-fsmrf": "^4.0.1",
"drachtio-srf": "^5.0.1",
"drachtio-srf": "^5.0.2",
"express": "^4.19.2",
"express-validator": "^7.0.1",
"moment": "^2.30.1",