diff --git a/lib/config.js b/lib/config.js index 664ebb0d..de98764d 100644 --- a/lib/config.js +++ b/lib/config.js @@ -231,5 +231,5 @@ module.exports = { JAMBONES_DIAL_SBC_FOR_REGISTERED_USER, JAMBONES_MEDIA_TIMEOUT_MS, JAMBONES_MEDIA_HOLD_TIMEOUT_MS, - JAMBONES_TRANSCRIBE_EP_DESTROY_DELAY_MS + JAMBONES_TRANSCRIBE_EP_DESTROY_DELAY_MS, }; diff --git a/lib/session/call-session.js b/lib/session/call-session.js index 3912ec2b..fcf055c0 100644 --- a/lib/session/call-session.js +++ b/lib/session/call-session.js @@ -38,6 +38,7 @@ const BADPRECONDITIONS = 'preconditions not met'; const CALLER_CANCELLED_ERR_MSG = 'Response not sent due to unknown transaction'; const { NonFatalTaskError} = require('../utils/error'); const { createMediaEndpoint } = require('../utils/media-endpoint'); +const SttLatencyCalculator = require('../utils/stt-latency-calculator'); const sqlRetrieveQueueEventHook = `SELECT * FROM webhooks WHERE webhook_sid = ( @@ -147,6 +148,30 @@ class CallSession extends Emitter { this.conversationTurns = []; this.on('userSaid', this._onUserSaid.bind(this)); this.on('botSaid', this._onBotSaid.bind(this)); + /** + * Support STT latency + */ + this.sttLatencyCalculator = new SttLatencyCalculator({ + logger, + cs: this + }); + this.on('transcribe-start', () => { + this.sttLatencyCalculator.resetTime(); + }); + this.on('on-transcription', () => { + this.sttLatencyCalculator.onTranscriptionReceived(); + }); + this.on('transcribe-stop', () => { + this.sttLatencyCalculator.onTranscribeStop(); + }); + } + + get notifySttLatencyEnabled() { + return this._notifySttLatencyEnabled || false; + } + + set notifySttLatencyEnabled(enabled) { + this._notifySttLatencyEnabled = enabled; } /** @@ -2480,6 +2505,8 @@ Duration=${duration} ` this.clearOrRestoreActionHookDelayProcessor().catch((err) => {}); this.ttsStreamingBuffer?.stop(); + + this.sttLatencyCalculator?.stop(); } /** @@ -3127,6 +3154,20 @@ Duration=${duration} ` return `assistant: ${t.text}`; }).join('\n'); } + + startSttLatencyVad() { + if (this.notifySttLatencyEnabled) { + this.sttLatencyCalculator.start(); + } + } + + stopSttLatencyVad() { + this.sttLatencyCalculator.stop(); + } + + calculateSttLatency() { + return this.sttLatencyCalculator.calculateLatency(); + } } module.exports = CallSession; diff --git a/lib/tasks/config.js b/lib/tasks/config.js index fcc4cace..64b2822a 100644 --- a/lib/tasks/config.js +++ b/lib/tasks/config.js @@ -24,6 +24,9 @@ class TaskConfig extends Task { if ('notifyEvents' in this.data) { this.notifyEvents = !!this.data.notifyEvents; } + if (this.hasNotifySttLatency) { + this.notifySttLatency = !!this.data.notifySttLatency; + } if (this.bargeIn.enable) { this.gatherOpts = { @@ -83,6 +86,7 @@ class TaskConfig extends Task { get hasVad() { return Object.keys(this.vad).length; } get hasFillerNoise() { return Object.keys(this.fillerNoise).length; } get hasReferHook() { return Object.keys(this.data).includes('referHook'); } + get hasNotifySttLatency() { return Object.keys(this.data).includes('notifySttLatency'); } get hasTtsStream() { return Object.keys(this.ttsStream).length; } get summary() { @@ -112,6 +116,8 @@ class TaskConfig extends Task { if (this.hasFillerNoise) phrase.push(`fillerNoise ${this.fillerNoise.enable ? 'on' : 'off'}`); if (this.data.amd) phrase.push('enable amd'); if (this.notifyEvents) phrase.push(`event notification ${this.notifyEvents ? 'on' : 'off'}`); + if (this.hasNotifySttLatency) phrase.push( + `notifySttLatency ${this.notifySttLatency ? 'on' : 'off'}`); if (this.onHoldMusic) phrase.push(`onHoldMusic: ${this.onHoldMusic}`); if ('boostAudioSignal' in this.data) phrase.push(`setGain ${this.data.boostAudioSignal}`); if (this.hasReferHook) phrase.push('set referHook'); @@ -130,6 +136,11 @@ class TaskConfig extends Task { cs.notifyEvents = !!this.data.notifyEvents; } + if (this.hasNotifySttLatency) { + this.logger.debug(`turning notifySttLatency ${this.notifySttLatency ? 'on' : 'off'}`); + cs.notifySttLatencyEnabled = this.notifySttLatency; + } + if (this.onHoldMusic) { cs.onHoldMusic = this.onHoldMusic; } @@ -318,7 +329,10 @@ class TaskConfig extends Task { voiceMs: this.vad.voiceMs || 250, silenceMs: this.vad.silenceMs || 150, strategy: this.vad.strategy || 'one-shot', - mode: (this.vad.mode !== undefined && this.vad.mode !== null) ? this.vad.mode : 2 + mode: (this.vad.mode !== undefined && this.vad.mode !== null) ? this.vad.mode : 2, + vendor: this.vad.vendor || 'silero', + threshold: this.vad.threshold || 0.5, + speechPadMs: this.vad.speechPadMs || 30, }; } diff --git a/lib/tasks/gather.js b/lib/tasks/gather.js index 925e14c3..ee0fb5ee 100644 --- a/lib/tasks/gather.js +++ b/lib/tasks/gather.js @@ -351,6 +351,7 @@ class TaskGather extends SttTask { this.sayTask?.span.end(); this._stopVad(); this._resolve('killed'); + cs.stopSttLatencyVad(); } updateTaskInProgress(opts) { @@ -390,16 +391,7 @@ class TaskGather extends SttTask { if (this.digitBuffer.length === 0 && this.needsStt) { // DTMF is higher priority than STT. this.removeCustomEventListeners(); - // Fix for https://github.com/jambonz/jambonz-feature-server/issues/1281 - // We should immediately call stop transcription from gather - // so that next gather can start transcription immediately - ep.stopTranscription({ - vendor: this.vendor, - bugname: this.bugname, - gracefulShutdown: false - }) - .catch((err) => this.logger.error({err}, - ` Received DTMF, Error stopping transcription for vendor ${this.vendor}`)); + this._stopTranscribing(ep); } this.digitBuffer += evt.dtmf; const len = this.digitBuffer.length; @@ -686,6 +678,9 @@ class TaskGather extends SttTask { target_sid: this.cs.callSid }); }).catch((err) => this.logger.info({err}, 'Error generating alert for tts failure')); + + // Some vendor use single connection, that we cannot use onConnect event to track transcription start + this.cs.emit('transcribe-start'); } _startTimer() { @@ -869,6 +864,10 @@ class TaskGather extends SttTask { if (finished === 'true') return; if (this.vendor === 'ibm' && evt?.state === 'listening') return; + + // emit an event to the call session to track the time transcription is received + cs.emit('on-transcription'); + if (this.vendor === 'deepgram' && evt.type === 'UtteranceEnd') { /* we will only get this when we have set utterance_end_ms */ if (this._bufferedTranscripts.length === 0) { @@ -952,6 +951,12 @@ class TaskGather extends SttTask { } } + // receive a final transcript, calculate the stt latency for this transcript + const sttLatency = this.cs.calculateSttLatency(); + if (!emptyTranscript && sttLatency) { + this.stt_latency_ms += `${sttLatency.stt_latency_ms},`; + } + if (this.isContinuousAsr) { /* append the transcript and start listening again for asrTimeout */ const t = evt.alternatives[0].transcript; @@ -1103,12 +1108,7 @@ class TaskGather extends SttTask { async _startFallback(cs, ep, evt) { if (this.canFallback) { - ep.stopTranscription({ - vendor: this.vendor, - bugname: this.bugname, - gracefulShutdown: false - }) - .catch((err) => this.logger.error({err}, `Error stopping transcription for primary vendor ${this.vendor}`)); + this._stopTranscribing(ep); try { this.logger.debug('gather:_startFallback'); this.notifyError({ msg: 'ASR error', @@ -1237,21 +1237,26 @@ class TaskGather extends SttTask { } } + async _stopTranscribing(ep) { + // Fix for https://github.com/jambonz/jambonz-feature-server/issues/1281 + // We should immediately call stop transcription from gather + // so that next gather can start transcription immediately + ep.stopTranscription({ + vendor: this.vendor, + bugname: this.bugname, + gracefulShutdown: false + }) + .catch((err) => { + if (this.resolved) return; + this.logger.error({err}, 'Error stopping transcription'); + }); + this.cs.emit('transcribe-stop'); + } + async _resolve(reason, evt) { this.logger.info({evt}, `TaskGather:resolve with reason ${reason}`); if (this.needsStt && this.ep && this.ep.connected) { - // Fix for https://github.com/jambonz/jambonz-feature-server/issues/1281 - // We should immediately call stop transcription from gather - // so that next gather can start transcription immediately - this.ep.stopTranscription({ - vendor: this.vendor, - bugname: this.bugname, - gracefulShutdown: false - }) - .catch((err) => { - if (this.resolved) return; - this.logger.error({err}, 'Error stopping transcription'); - }); + this._stopTranscribing(this.ep); } if (this.resolved) { this.logger.debug('TaskGather:_resolve - already resolved'); @@ -1270,11 +1275,28 @@ class TaskGather extends SttTask { this._clearAsrTimer(); this._clearFinalAsrTimer(); + let sttLatencyMetrics = {}; + if (this.needsStt) { + const sttLatency = this.cs.calculateSttLatency(); + if (sttLatency) { + this.stt_latency_ms = this.stt_latency_ms.endsWith(',') ? + this.stt_latency_ms.slice(0, -1) : this.stt_latency_ms; + sttLatencyMetrics = { + 'stt.latency_ms': this.stt_latency_ms, + 'stt.talkspurts': JSON.stringify(sttLatency.talkspurts), + 'stt.start_time': sttLatency.stt_start_time, + 'stt.stop_time': sttLatency.stt_stop_time, + 'stt.usage': sttLatency.stt_usage, + }; + } + } + this.span.setAttributes({ channel: 1, 'stt.label': this.label || 'None', 'stt.resolve': reason, - 'stt.result': JSON.stringify(evt) + 'stt.result': JSON.stringify(evt), + ...sttLatencyMetrics }); if (this.callSession && this.callSession.callGone) { @@ -1302,6 +1324,9 @@ class TaskGather extends SttTask { let returnedVerbs = false; try { + const latencies = Object.fromEntries( + Object.entries(sttLatencyMetrics).map(([key, value]) => [key.replace('stt.', 'stt_'), value]) + ); if (reason.startsWith('dtmf')) { if (this.parentTask) this.parentTask.emit('dtmf', evt); else { @@ -1315,7 +1340,7 @@ class TaskGather extends SttTask { else { this.emit('transcription', evt); this.logger.debug('TaskGather:_resolve - invoking performAction'); - returnedVerbs = await this.performAction({speech: evt, reason: 'speechDetected'}); + returnedVerbs = await this.performAction({speech: evt, reason: 'speechDetected', ...latencies}); this.logger.debug({returnedVerbs}, 'TaskGather:_resolve - back from performAction'); } } @@ -1323,20 +1348,20 @@ class TaskGather extends SttTask { if (this.parentTask) this.parentTask.emit('timeout', evt); else { this.emit('timeout', evt); - returnedVerbs = await this.performAction({reason: 'timeout'}); + returnedVerbs = await this.performAction({reason: 'timeout', ...latencies}); } } else if (reason.startsWith('stt-error')) { if (this.parentTask) this.parentTask.emit('stt-error', evt); else { this.emit('stt-error', evt); - returnedVerbs = await this.performAction({reason: 'error', details: evt.error}); + returnedVerbs = await this.performAction({reason: 'error', details: evt.error, ...latencies}); } } else if (reason.startsWith('stt-low-confidence')) { if (this.parentTask) this.parentTask.emit('stt-low-confidence', evt); else { this.emit('stt-low-confidence', evt); - returnedVerbs = await this.performAction({speech:evt, reason: 'stt-low-confidence'}); + returnedVerbs = await this.performAction({speech:evt, reason: 'stt-low-confidence', ...latencies}); } } } catch (err) { /*already logged error*/ } diff --git a/lib/tasks/stt-task.js b/lib/tasks/stt-task.js index 7f601188..ad40b605 100644 --- a/lib/tasks/stt-task.js +++ b/lib/tasks/stt-task.js @@ -4,6 +4,7 @@ const crypto = require('crypto'); const { TaskPreconditions, CobaltTranscriptionEvents } = require('../utils/constants'); const { SpeechCredentialError } = require('../utils/error'); const {JAMBONES_AWS_TRANSCRIBE_USE_GRPC} = require('../config'); +const {TaskName} = require('../utils/constants.json'); /** * "Please insert turns here: {{turns:4}}" @@ -84,6 +85,9 @@ class SttTask extends Task { /*bug name prefix */ this.bugname_prefix = ''; + // stt latency calculator + this.stt_latency_ms = ''; + } async exec(cs, {ep, ep2}) { @@ -91,6 +95,12 @@ class SttTask extends Task { this.ep = ep; this.ep2 = ep2; + // start vad from stt latency calculator + if (this.name !== TaskName.Gather || + this.name === TaskName.Gather && this.needsStt) { + cs.startSttLatencyVad(); + } + // use session preferences if we don't have specific verb-level settings. if (cs.recognizer) { for (const k in cs.recognizer) { @@ -400,7 +410,7 @@ class SttTask extends Task { dgOptions.utteranceEndMs = dgOptions.utteranceEndMs || asrTimeout; } - _onVendorConnect(_cs, _ep) { + _onVendorConnect(cs, _ep) { this.logger.debug(`TaskGather:_on${this.vendor}Connect`); } diff --git a/lib/tasks/transcribe.js b/lib/tasks/transcribe.js index 16b68b20..d0dd9c56 100644 --- a/lib/tasks/transcribe.js +++ b/lib/tasks/transcribe.js @@ -152,12 +152,15 @@ class TaskTranscribe extends SttTask { .catch((err) => this.logger.info(err, 'Error TaskTranscribe:kill')); } + this.cs.emit('transcribe-stop'); + return stopTranscription; } async kill(cs) { super.kill(cs); const stopTranscription = this._stopTranscription(); + cs.stopSttLatencyVad(); // hangup after 1 sec if we don't get a final transcription if (stopTranscription) this._timer = setTimeout(() => this.notifyTaskDone(), 1500); else this.notifyTaskDone(); @@ -423,6 +426,9 @@ class TaskTranscribe extends SttTask { bugname: this.bugname, hostport: this.hostport }); + + // Some vendor use single connection, that we cannot use onConnect event to track transcription start + this.cs.emit('transcribe-start'); } async _onTranscription(cs, ep, channel, evt, fsEvent) { @@ -441,6 +447,9 @@ class TaskTranscribe extends SttTask { if (this.vendor === 'ibm' && evt?.state === 'listening') return; + // emit an event to the call session to track the time transcription is received + cs.emit('on-transcription'); + if (this.vendor === 'deepgram' && evt.type === 'UtteranceEnd') { /* we will only get this when we have set utterance_end_ms */ @@ -602,14 +611,28 @@ class TaskTranscribe extends SttTask { } async _resolve(channel, evt) { + let sttLatencyMetrics = {}; if (evt.is_final) { + const sttLatency = this.cs.calculateSttLatency(); + if (sttLatency) { + sttLatencyMetrics = { + 'stt.latency_ms': `${sttLatency.stt_latency_ms}`, + 'stt.talkspurts': JSON.stringify(sttLatency.talkspurts), + 'stt.start_time': sttLatency.stt_start_time, + 'stt.stop_time': sttLatency.stt_stop_time, + 'stt.usage': sttLatency.stt_usage, + }; + } + // time to reset the stt latency + this.cs.emit('transcribe-start'); /* we've got a final transcript, so end the otel child span for this channel */ if (this.childSpan[channel - 1] && this.childSpan[channel - 1].span) { this.childSpan[channel - 1].span.setAttributes({ channel, 'stt.label': this.label || 'None', 'stt.resolve': 'transcript', - 'stt.result': JSON.stringify(evt) + 'stt.result': JSON.stringify(evt), + ...sttLatencyMetrics }); this.childSpan[channel - 1].span.end(); } @@ -618,9 +641,13 @@ class TaskTranscribe extends SttTask { if (this.transcriptionHook) { const b3 = this.getTracingPropagation(); const httpHeaders = b3 && {b3}; + const latencies = Object.fromEntries( + Object.entries(sttLatencyMetrics).map(([key, value]) => [key.replace('stt.', 'stt_'), value]) + ); const payload = { ...this.cs.callInfo, ...httpHeaders, + ...latencies, ...(evt.alternatives && {speech: evt}), ...(evt.type && {speechEvent: evt}) }; diff --git a/lib/utils/constants.json b/lib/utils/constants.json index 1d777bc2..008ac349 100644 --- a/lib/utils/constants.json +++ b/lib/utils/constants.json @@ -177,6 +177,9 @@ "VadDetection": { "Detection": "vad_detect:detection" }, + "SileroVadDetection": { + "Detection": "vad_silero:detect" + }, "ListenEvents": { "Connect": "mod_audio_fork::connect", "ConnectFailure": "mod_audio_fork::connect_failed", diff --git a/lib/utils/stt-latency-calculator.js b/lib/utils/stt-latency-calculator.js new file mode 100644 index 00000000..7130b503 --- /dev/null +++ b/lib/utils/stt-latency-calculator.js @@ -0,0 +1,199 @@ +const { assert } = require('console'); +const Emitter = require('events'); +const { + VadDetection, + SileroVadDetection +} = require('../utils/constants.json'); + +class SttLatencyCalculator extends Emitter { + constructor({ logger, cs}) { + super(); + this.logger = logger; + this.cs = cs; + this.isRunning = false; + this.isInTalkSpurt = false; + this.start_talking_time = 0; + this.talkspurts = []; + this.vendor = this.cs.vad?.vendor || 'silero'; + this.stt_start_time = 0; + this.stt_stop_time = 0; + this.stt_on_transcription_time = 0; + } + + set sttStartTime(time) { + this.stt_start_time = time; + } + + get sttStartTime() { + return this.stt_start_time || 0; + } + + set sttStopTime(time) { + this.stt_stop_time = time; + } + + get sttStopTime() { + return this.stt_stop_time || 0; + } + + set sttOnTranscriptionTime(time) { + this.stt_on_transcription_time = time; + } + + get sttOnTranscriptionTime() { + return this.stt_on_transcription_time || 0; + } + + _onVadDetected(_ep, _evt, fsEvent) { + if (fsEvent.getHeader('detected-event') === 'stop_talking') { + if (this.isInTalkSpurt) { + this.talkspurts.push({ + start: this.start_talking_time, + stop: Date.now() + }); + } + + this.start_talking_time = 0; + this.isInTalkSpurt = false; + } else if (fsEvent.getHeader('detected-event') === 'start_talking') { + this.start_talking_time = Date.now(); + this.isInTalkSpurt = true; + } + } + + _startVad() { + assert(!this.isRunning, 'Latency calculator is already running'); + assert(this.cs.ep, 'Callsession has no endpoint to start the latency calculator'); + const ep = this.cs.ep; + if (!ep.sttLatencyVadHandler) { + ep.sttLatencyVadHandler = this._onVadDetected.bind(this, ep); + if (this.vendor === 'silero') { + ep.addCustomEventListener(SileroVadDetection.Detection, ep.sttLatencyVadHandler); + } else { + ep.addCustomEventListener(VadDetection.Detection, ep.sttLatencyVadHandler); + } + } + this.stop_talking_time = 0; + this.start_talking_time = 0; + this.vad = { + ...(this.cs.vad || {}), + strategy: 'continuous', + bugname: 'stt-latency-calculator-vad', + vendor: this.vendor + }; + + ep.startVadDetection(this.vad); + this.isRunning = true; + } + + _stopVad() { + if (this.isRunning) { + this.logger.warn('Latency calculator is still running, stopping VAD detection'); + const ep = this.cs.ep; + ep.stopVadDetection(this.vad); + if (ep.sttLatencyVadHandler) { + if (this.vendor === 'silero') { + this.ep?.removeCustomEventListener(SileroVadDetection.Detection, ep.sttLatencyVadHandler); + } else { + this.ep?.removeCustomEventListener(VadDetection.Detection, ep.sttLatencyVadHandler); + } + ep.sttLatencyVadHandler = null; + } + this.isRunning = false; + this.logger.info('STT Latency Calculator stopped'); + } else { + this.logger.warn('Latency calculator is not running, no VAD detection to stop'); + } + } + + start() { + if (this.isRunning) { + this.logger.warn('Latency calculator is already running'); + return; + } + if (!this.cs.ep) { + this.logger.error('Callsession has no endpoint to start the latency calculator'); + return; + } + this._startVad(); + this.logger.info('STT Latency Calculator started'); + } + + stop() { + this._stopVad(); + } + + toUnixTimestamp(date) { + return Math.floor(date / 1000); + } + + calculateLatency() { + if (!this.isRunning) { + this.logger.debug('Latency calculator is not running, cannot calculate latency, returning default values'); + return null; + } + + const stt_stop_time = this.stt_stop_time || Date.now(); + if (this.isInTalkSpurt) { + this.talkspurts.push({ + start: this.start_talking_time, + stop: stt_stop_time + }); + this.isInTalkSpurt = false; + this.start_talking_time = 0; + } + const stt_on_transcription_time = this.stt_on_transcription_time || stt_stop_time; + const start_talking_time = this.talkspurts[0]?.start; + let lastIdx = this.talkspurts.length - 1; + lastIdx = lastIdx < 0 ? 0 : lastIdx; + const stop_talking_time = this.talkspurts[lastIdx]?.stop || stt_stop_time; + + return { + stt_start_time: this.toUnixTimestamp(this.stt_start_time), + stt_stop_time: this.toUnixTimestamp(stt_stop_time), + start_talking_time: this.toUnixTimestamp(start_talking_time), + stop_talking_time: this.toUnixTimestamp(stop_talking_time), + stt_latency: parseFloat((Math.abs(stt_on_transcription_time - stop_talking_time)) / 1000).toFixed(2), + stt_latency_ms: Math.abs(stt_on_transcription_time - stop_talking_time), + stt_usage: parseFloat((stt_stop_time - this.stt_start_time) / 1000).toFixed(2), + talkspurts: this.talkspurts.map((ts) => + ([this.toUnixTimestamp(ts.start || 0), this.toUnixTimestamp(ts.stop || 0)])) + }; + } + + resetTime() { + if (!this.isRunning) { + return; + } + this.stt_start_time = Date.now(); + this.stt_stop_time = 0; + this.stt_on_transcription_time = 0; + this.clearTalkspurts(); + this.logger.info('STT Latency Calculator reset'); + } + + onTranscriptionReceived() { + if (!this.isRunning) { + return; + } + this.stt_on_transcription_time = Date.now(); + this.logger.debug(`CallSession:on-transcription set to ${this.stt_on_transcription_time}`); + } + + onTranscribeStop() { + if (!this.isRunning) { + return; + } + this.stt_stop_time = Date.now(); + this.logger.debug(`CallSession:transcribe-stop set to ${this.stt_stop_time}`); + } + + clearTalkspurts() { + this.talkspurts = []; + if (!this.isInTalkSpurt) { + this.start_talking_time = 0; + } + } +} + +module.exports = SttLatencyCalculator;