diff --git a/lib/tasks/gather.js b/lib/tasks/gather.js index e05c66f6..1d70220b 100644 --- a/lib/tasks/gather.js +++ b/lib/tasks/gather.js @@ -20,16 +20,6 @@ const makeTask = require('./make_task'); const assert = require('assert'); const SttTask = require('./stt-task'); -const compileTranscripts = (logger, evt, arr) => { - if (!Array.isArray(arr) || arr.length === 0) return; - let t = ''; - for (const a of arr) { - t += ` ${a.alternatives[0].transcript}`; - } - t += ` ${evt.alternatives[0].transcript}`; - evt.alternatives[0].transcript = t.trim(); -}; - class TaskGather extends SttTask { constructor(logger, opts, parentTask) { super(logger, opts, parentTask); @@ -51,8 +41,10 @@ class TaskGather extends SttTask { /* continuous ASR (i.e. compile transcripts until a special timeout or dtmf key) */ this.asrTimeout = typeof this.data.recognizer.asrTimeout === 'number' ? this.data.recognizer.asrTimeout * 1000 : 0; - if (this.asrTimeout > 0) this.asrDtmfTerminationDigit = this.data.recognizer.asrDtmfTerminationDigit; - this.isContinuousAsr = this.asrTimeout > 0; + if (this.asrTimeout > 0) { + this.isContinuousAsr = true; + this.asrDtmfTerminationDigit = this.data.recognizer.asrDtmfTerminationDigit; + } if (Array.isArray(this.data.recognizer.hints) && 0 == this.data.recognizer.hints.length && JAMBONES_GATHER_CLEAR_GLOBAL_HINTS_ON_EMPTY_HINTS) { @@ -351,6 +343,13 @@ class TaskGather extends SttTask { async _setSpeechHandlers(cs, ep) { if (this._speechHandlersSet) return; this._speechHandlersSet = true; + + /* some special deepgram logic */ + if (this.vendor === 'deepgram') { + if (this.isContinuousAsr) this._doContinuousAsrWithDeepgram(this.asrTimeout); + if (this.data.recognizer?.deepgramOptions?.shortUtterance) this.shortUtterance = true; + } + const opts = this.setChannelVarsForStt(this, this.sttCredentials, this.data.recognizer); switch (this.vendor) { case 'google': @@ -396,6 +395,9 @@ class TaskGather extends SttTask { 
ep.addCustomEventListener(DeepgramTranscriptionEvents.Connect, this._onDeepgramConnect.bind(this, cs, ep)); ep.addCustomEventListener(DeepgramTranscriptionEvents.ConnectFailure, this._onDeepGramConnectFailure.bind(this, cs, ep)); + + /* if app sets deepgramOptions.utteranceEndMs they essentially want continuous asr */ + if (opts.DEEPGRAM_SPEECH_UTTERANCE_END_MS) this.isContinuousAsr = true; break; case 'soniox': @@ -487,6 +489,12 @@ class TaskGather extends SttTask { interim: this.interim, bugname: this.bugname }, 'Gather:_startTranscribing'); + + /** + * Note: we don't need to ask deepgram for interim results, because they + * already send us words as they are finalized (is_final=true) even before + * the utterance is finalized (speech_final=true) + */ ep.startTranscription({ vendor: this.vendor, locale: this.language, @@ -522,11 +530,13 @@ class TaskGather extends SttTask { } _startAsrTimer() { + if (this.vendor === 'deepgram') return; // no need assert(this.isContinuousAsr); this._clearAsrTimer(); this._asrTimer = setTimeout(() => { this.logger.debug('_startAsrTimer - asr timer went off'); - this._resolve(this._bufferedTranscripts.length > 0 ? 'speech' : 'timeout'); + const evt = this.consolidateTranscripts(this._bufferedTranscripts, 1, this.language); + this._resolve(this._bufferedTranscripts.length > 0 ? 'speech' : 'timeout', evt); }, this.asrTimeout); this.logger.debug(`_startAsrTimer: set for ${this.asrTimeout}ms`); } @@ -556,7 +566,8 @@ class TaskGather extends SttTask { this._clearFinalAsrTimer(); this._finalAsrTimer = setTimeout(() => { this.logger.debug('_startFinalAsrTimer - final asr timer went off'); - this._resolve(this._bufferedTranscripts.length > 0 ? 'speech' : 'timeout'); + const evt = this.consolidateTranscripts(this._bufferedTranscripts, 1, this.language); + this._resolve(this._bufferedTranscripts.length > 0 ? 
'speech' : 'timeout', evt); }, 1000); this.logger.debug('_startFinalAsrTimer: set for 1 second'); } @@ -595,11 +606,23 @@ class TaskGather extends SttTask { this.logger.debug({evt, bugname, finished}, `Gather:_onTranscription for vendor ${this.vendor}`); if (bugname && this.bugname !== bugname) return; - if (this.vendor === 'ibm') { - if (evt?.state === 'listening') return; + if (this.vendor === 'ibm' && evt?.state === 'listening') return; + + if (this.vendor === 'deepgram' && evt.type === 'UtteranceEnd') { + /* we will only get this when we have set utterance_end_ms */ + if (this._bufferedTranscripts.length === 0) { + this.logger.debug('Gather:_onTranscription - got UtteranceEnd event from deepgram but no buffered transcripts'); + } + else { + this.logger.debug('Gather:_onTranscription - got UtteranceEnd event from deepgram, return buffered transcript'); + evt = this.consolidateTranscripts(this._bufferedTranscripts, 1, this.language); + this._bufferedTranscripts = []; + this._resolve('speech', evt); + } + return; } - evt = this.normalizeTranscription(evt, this.vendor, 1, this.language); + evt = this.normalizeTranscription(evt, this.vendor, 1, this.language, this.shortUtterance); if (evt.alternatives.length === 0) { this.logger.info({evt}, 'TaskGather:_onTranscription - got empty transcript, continue listening'); return; @@ -621,15 +644,27 @@ class TaskGather extends SttTask { const bufferedWords = this._sonioxTranscripts.length + this._bufferedTranscripts.reduce((count, e) => count + e.alternatives[0]?.transcript.split(' ').length, 0); + let emptyTranscript = false; if (evt.is_final) { if (evt.alternatives[0].transcript === '' && !this.callSession.callGone && !this.killed) { + emptyTranscript = true; if (finished === 'true' && ['microsoft', 'deepgram'].includes(this.vendor)) { this.logger.debug({evt}, 'TaskGather:_onTranscription - got empty transcript from old gather, disregarding'); + return; } - else { + else if (this.vendor !== 'deepgram') { 
this.logger.info({evt}, 'TaskGather:_onTranscription - got empty transcript, continue listening'); + return; + } + else if (this.isContinuousAsr) { + this.logger.info({evt}, + 'TaskGather:_onTranscription - got empty deepgram transcript during continuous asr, continue listening'); + return; + } + else if (this.vendor === 'deepgram' && this._bufferedTranscripts.length > 0) { + this.logger.info({evt}, + 'TaskGather:_onTranscription - got empty transcript from deepgram, return the buffered transcripts'); } - return; } if (this.isContinuousAsr) { @@ -641,14 +676,14 @@ class TaskGather extends SttTask { this.logger.debug('TaskGather:_onTranscription - removing trailing punctuation'); evt.alternatives[0].transcript = t.slice(0, -1); } - else this.logger.debug({t}, 'TaskGather:_onTranscription - no trailing punctuation'); } this.logger.info({evt}, 'TaskGather:_onTranscription - got transcript during continous asr'); this._bufferedTranscripts.push(evt); this._clearTimer(); if (this._finalAsrTimer) { this._clearFinalAsrTimer(); - return this._resolve(this._bufferedTranscripts.length > 0 ? 'speech' : 'timeout'); + const evt = this.consolidateTranscripts(this._bufferedTranscripts, 1, this.language); + return this._resolve(this._bufferedTranscripts.length > 0 ?
'speech' : 'timeout', evt); } this._startAsrTimer(); @@ -670,16 +705,25 @@ class TaskGather extends SttTask { evt = this.compileSonioxTranscripts(this._sonioxTranscripts, 1, this.language); this._sonioxTranscripts = []; } + else if (this.vendor === 'deepgram') { + /* compile transcripts into one */ + if (!emptyTranscript) this._bufferedTranscripts.push(evt); + if (this.data.recognizer?.deepgramOptions?.utteranceEndMs) { + this.logger.debug('TaskGather:_onTranscription - got speech_final waiting for UtteranceEnd event'); + return; + } + this.logger.debug({evt}, 'TaskGather:_onTranscription - compiling deepgram transcripts'); + evt = this.consolidateTranscripts(this._bufferedTranscripts, 1, this.language); + this._bufferedTranscripts = []; + this.logger.debug({evt}, 'TaskGather:_onTranscription - compiled deepgram transcripts'); + } + + /* here is where we return a final transcript */ this._resolve('speech', evt); } } } else { - /* google has a measure of stability: - https://cloud.google.com/speech-to-text/docs/basics#streaming_responses - others do not. - */ - //const isStableEnough = typeof evt.stability === 'undefined' || evt.stability > GATHER_STABILITY_THRESHOLD; this._clearTimer(); this._startTimer(); if (this.bargein && (words + bufferedWords) >= this.minBargeinWordCount) { @@ -705,6 +749,14 @@ class TaskGather extends SttTask { this._sonioxTranscripts.push(evt.vendor.finalWords); } } + /* deepgram can send a non-final transcript but with words that are final, so we need to buffer */ + if (this.vendor === 'deepgram') { + const originalEvent = evt.vendor.evt; + if (originalEvent.is_final && evt.alternatives[0].transcript !== '') { + this.logger.debug({evt}, 'Gather:_onTranscription - buffering a completed (partial) deepgram transcript'); + this._bufferedTranscripts.push(evt); + } + } } } _onEndOfUtterance(cs, ep) { @@ -719,7 +771,7 @@ class TaskGather extends SttTask { * getting a transcription. This can happen if someone coughs or mumbles. 
* For that reason don't ask for a single utterance and we'll terminate the transcribe operation * once we get a final transcript. - * However, if the usr has specified a singleUtterance, then we need to restart here + * However, if the user has specified a singleUtterance, then we need to restart here * since we dont have a final transcript yet. */ if (!this.resolved && !this.killed && !this._bufferedTranscripts.length && this.wantsSingleUtterance) { @@ -858,18 +910,6 @@ class TaskGather extends SttTask { this._clearTimer(); this._clearFastRecognitionTimer(); - if (this.isContinuousAsr && reason.startsWith('speech')) { - evt = { - is_final: true, - transcripts: this._bufferedTranscripts - }; - this.logger.debug({evt}, 'TaskGather:resolve continuous asr'); - } - else if (!this.isContinuousAsr && reason.startsWith('speech') && this._bufferedTranscripts.length) { - compileTranscripts(this.logger, evt, this._bufferedTranscripts); - this.logger.debug({evt}, 'TaskGather:resolve buffered results'); - } - this.span.setAttributes({ channel: 1, 'stt.resolve': reason, diff --git a/lib/tasks/stt-task.js b/lib/tasks/stt-task.js index 9d922ebe..5aa07a7d 100644 --- a/lib/tasks/stt-task.js +++ b/lib/tasks/stt-task.js @@ -16,12 +16,14 @@ class SttTask extends Task { normalizeTranscription, removeSpeechListeners, setSpeechCredentialsAtRuntime, - compileSonioxTranscripts + compileSonioxTranscripts, + consolidateTranscripts } = require('../utils/transcription-utils')(logger); this.setChannelVarsForStt = setChannelVarsForStt; this.normalizeTranscription = normalizeTranscription; this.removeSpeechListeners = removeSpeechListeners; this.compileSonioxTranscripts = compileSonioxTranscripts; + this.consolidateTranscripts = consolidateTranscripts; this.isHandledByPrimaryProvider = true; if (this.data.recognizer) { @@ -138,6 +140,14 @@ class SttTask extends Task { addKey(key, evt.compiled_context, 3600 * 12) .catch((err) => this.logger.info({err}, `Error caching cobalt context for ${key}`)); 
} + + _doContinuousAsrWithDeepgram(asrTimeout) { + /* deepgram has an utterance_end_ms property that simplifies things */ + assert(this.vendor === 'deepgram'); + this.logger.debug(`_doContinuousAsrWithDeepgram - setting utterance_end_ms to ${asrTimeout}`); + const dgOptions = this.data.recognizer.deepgramOptions = this.data.recognizer.deepgramOptions || {}; + dgOptions.utteranceEndMs = dgOptions.utteranceEndMs || asrTimeout; + } } module.exports = SttTask; diff --git a/lib/tasks/transcribe.js b/lib/tasks/transcribe.js index 30dca36e..098e3d90 100644 --- a/lib/tasks/transcribe.js +++ b/lib/tasks/transcribe.js @@ -34,7 +34,9 @@ class TaskTranscribe extends SttTask { // Continuos asr timeout this.asrTimeout = typeof this.data.recognizer.asrTimeout === 'number' ? this.data.recognizer.asrTimeout * 1000 : 0; - this.isContinuousAsr = this.asrTimeout > 0; + if (this.asrTimeout > 0) { + this.isContinuousAsr = true; + } /* buffer speech for continuous asr */ this._bufferedTranscripts = []; } @@ -177,6 +179,12 @@ class TaskTranscribe extends SttTask { async _setSpeechHandlers(cs, ep, channel) { if (this[`_speechHandlersSet_${channel}`]) return; this[`_speechHandlersSet_${channel}`] = true; + + /* some special deepgram logic */ + if (this.vendor === 'deepgram') { + if (this.isContinuousAsr) this._doContinuousAsrWithDeepgram(this.asrTimeout); + } + const opts = this.setChannelVarsForStt(this, this.sttCredentials, this.data.recognizer); switch (this.vendor) { case 'google': @@ -223,6 +231,10 @@ class TaskTranscribe extends SttTask { this._onDeepgramConnect.bind(this, cs, ep, channel)); ep.addCustomEventListener(DeepgramTranscriptionEvents.ConnectFailure, this._onDeepGramConnectFailure.bind(this, cs, ep, channel)); + + /* if app sets deepgramOptions.utteranceEndMs they essentially want continuous asr */ + if (opts.DEEPGRAM_SPEECH_UTTERANCE_END_MS) this.isContinuousAsr = true; + break; case 'soniox': this.bugname = 'soniox_transcribe'; @@ -329,8 +341,20 @@ class TaskTranscribe 
extends SttTask { const bugname = fsEvent.getHeader('media-bugname'); if (bugname && this.bugname !== bugname) return; - if (this.vendor === 'ibm') { - if (evt?.state === 'listening') return; + if (this.vendor === 'ibm' && evt?.state === 'listening') return; + + if (this.vendor === 'deepgram' && evt.type === 'UtteranceEnd') { + /* we will only get this when we have set utterance_end_ms */ + if (this._bufferedTranscripts.length === 0) { + this.logger.debug('TaskTranscribe:_onTranscription - got UtteranceEnd event from deepgram but no buffered transcripts'); + } + else { + this.logger.debug('TaskTranscribe:_onTranscription - got UtteranceEnd event from deepgram, return buffered transcript'); + evt = this.consolidateTranscripts(this._bufferedTranscripts, channel, this.language); + this._bufferedTranscripts = []; + this._resolve(channel, evt); + } + return; } this.logger.debug({evt}, 'TaskTranscribe:_onTranscription - before normalization'); @@ -369,17 +393,6 @@ class TaskTranscribe extends SttTask { } } - _compileTranscripts() { - assert(this._bufferedTranscripts.length); - const evt = this._bufferedTranscripts[0]; - let t = ''; - for (const a of this._bufferedTranscripts) { - t += ` ${a.alternatives[0].transcript}`; - } - evt.alternatives[0].transcript = t.trim(); - return evt; - } - async _resolve(channel, evt) { /* we've got a transcript, so end the otel child span for this channel */ if (this.childSpan[channel - 1] && this.childSpan[channel - 1].span) { @@ -577,11 +590,12 @@ class TaskTranscribe extends SttTask { } _startAsrTimer(channel) { + if (this.vendor === 'deepgram') return; // no need assert(this.isContinuousAsr); this._clearAsrTimer(channel); this._asrTimer = setTimeout(() => { this.logger.debug(`TaskTranscribe:_startAsrTimer - asr timer went off for channel: ${channel}`); - const evt = this._compileTranscripts(); + const evt = this.consolidateTranscripts(this._bufferedTranscripts, channel, this.language); + this._bufferedTranscripts = []; + this._resolve(channel, evt); },
this.asrTimeout); diff --git a/lib/utils/transcription-utils.js b/lib/utils/transcription-utils.js index 8f0ecd6d..111fc741 100644 --- a/lib/utils/transcription-utils.js +++ b/lib/utils/transcription-utils.js @@ -52,6 +52,7 @@ const stickyVars = { 'DEEPGRAM_SPEECH_SEARCH', 'DEEPGRAM_SPEECH_REPLACE', 'DEEPGRAM_SPEECH_ENDPOINTING', + 'DEEPGRAM_SPEECH_UTTERANCE_END_MS', 'DEEPGRAM_SPEECH_VAD_TURNOFF', 'DEEPGRAM_SPEECH_TAG' ], @@ -106,6 +107,53 @@ const stickyVars = { ] }; +const consolidateTranscripts = (bufferedTranscripts, channel, language) => { + if (bufferedTranscripts.length === 1) return bufferedTranscripts[0]; + let totalConfidence = 0; + const finalTranscript = bufferedTranscripts.reduce((acc, evt) => { + totalConfidence += evt.alternatives[0].confidence; + + let newTranscript = evt.alternatives[0].transcript; + + // If new transcript consists only of digits, spaces, and a trailing comma or period + if (newTranscript.match(/^[\d\s]+[,.]?$/)) { + newTranscript = newTranscript.replace(/\s/g, ''); // Remove all spaces + if (newTranscript.endsWith(',')) { + newTranscript = newTranscript.slice(0, -1); // Remove the trailing comma + } else if (newTranscript.endsWith('.')) { + newTranscript = newTranscript.slice(0, -1); // Remove the trailing period + } + } + + const lastChar = acc.alternatives[0].transcript.slice(-1); + const firstChar = newTranscript.charAt(0); + + if (lastChar.match(/\d/) && firstChar.match(/\d/)) { + acc.alternatives[0].transcript += newTranscript; + } else { + acc.alternatives[0].transcript += ` ${newTranscript}`; + } + + return acc; + }, { + language_code: language, + channel_tag: channel, + is_final: true, + alternatives: [{ + transcript: '' + }] + }); + finalTranscript.alternatives[0].confidence = bufferedTranscripts.length === 1 ? 
+ bufferedTranscripts[0].alternatives[0].confidence : + totalConfidence / bufferedTranscripts.length; + finalTranscript.alternatives[0].transcript = finalTranscript.alternatives[0].transcript.trim(); + finalTranscript.vendor = { + name: 'deepgram', + evt: bufferedTranscripts + }; + return finalTranscript; +}; + const compileSonioxTranscripts = (finalWordChunks, channel, language) => { const words = finalWordChunks.flat(); const transcript = words.reduce((acc, word) => { @@ -163,7 +211,7 @@ const normalizeSoniox = (evt, channel, language) => { }; }; -const normalizeDeepgram = (evt, channel, language) => { +const normalizeDeepgram = (evt, channel, language, shortUtterance) => { const copy = JSON.parse(JSON.stringify(evt)); const alternatives = (evt.channel?.alternatives || []) .map((alt) => ({ @@ -171,10 +219,14 @@ const normalizeDeepgram = (evt, channel, language) => { transcript: alt.transcript, })); + /** + * note difference between is_final and speech_final in Deepgram: + * https://developers.deepgram.com/docs/understand-endpointing-interim-results + */ return { language_code: language, channel_tag: channel, - is_final: evt.is_final, + is_final: shortUtterance ? 
evt.is_final : evt.speech_final, alternatives: [alternatives[0]], vendor: { name: 'deepgram', @@ -325,12 +377,12 @@ const normalizeAws = (evt, channel, language) => { module.exports = (logger) => { - const normalizeTranscription = (evt, vendor, channel, language) => { + const normalizeTranscription = (evt, vendor, channel, language, shortUtterance) => { //logger.debug({ evt, vendor, channel, language }, 'normalizeTranscription'); switch (vendor) { case 'deepgram': - return normalizeDeepgram(evt, channel, language); + return normalizeDeepgram(evt, channel, language, shortUtterance); case 'microsoft': return normalizeMicrosoft(evt, channel, language); case 'google': @@ -536,6 +588,8 @@ module.exports = (logger) => { {DEEPGRAM_SPEECH_KEYWORDS: deepgramOptions.keywords.join(',')}, ...('endpointing' in deepgramOptions) && {DEEPGRAM_SPEECH_ENDPOINTING: deepgramOptions.endpointing}, + ...(deepgramOptions.utteranceEndMs) && + {DEEPGRAM_SPEECH_UTTERANCE_END_MS: deepgramOptions.utteranceEndMs}, ...(deepgramOptions.vadTurnoff) && {DEEPGRAM_SPEECH_VAD_TURNOFF: deepgramOptions.vadTurnoff}, ...(deepgramOptions.tag) && @@ -743,6 +797,7 @@ module.exports = (logger) => { setChannelVarsForStt, removeSpeechListeners, setSpeechCredentialsAtRuntime, - compileSonioxTranscripts + compileSonioxTranscripts, + consolidateTranscripts }; };