mirror of
https://github.com/jambonz/jambonz-feature-server.git
synced 2025-12-20 16:50:39 +00:00
support stt latency metrics (#1252)
* support stt latency metrics * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * enable stt latency calculator by config verb * wip * wip * wip * fix jslint * fixed gather timeout does not have latency calculation * upadte verb specification to use notifySttLatency * move stt latency metric from call session to stt-latency calculator * wip
This commit is contained in:
@@ -24,6 +24,9 @@ class TaskConfig extends Task {
|
||||
if ('notifyEvents' in this.data) {
|
||||
this.notifyEvents = !!this.data.notifyEvents;
|
||||
}
|
||||
if (this.hasNotifySttLatency) {
|
||||
this.notifySttLatency = !!this.data.notifySttLatency;
|
||||
}
|
||||
|
||||
if (this.bargeIn.enable) {
|
||||
this.gatherOpts = {
|
||||
@@ -83,6 +86,7 @@ class TaskConfig extends Task {
|
||||
get hasVad() { return Object.keys(this.vad).length; }
|
||||
get hasFillerNoise() { return Object.keys(this.fillerNoise).length; }
|
||||
get hasReferHook() { return Object.keys(this.data).includes('referHook'); }
|
||||
get hasNotifySttLatency() { return Object.keys(this.data).includes('notifySttLatency'); }
|
||||
get hasTtsStream() { return Object.keys(this.ttsStream).length; }
|
||||
|
||||
get summary() {
|
||||
@@ -112,6 +116,8 @@ class TaskConfig extends Task {
|
||||
if (this.hasFillerNoise) phrase.push(`fillerNoise ${this.fillerNoise.enable ? 'on' : 'off'}`);
|
||||
if (this.data.amd) phrase.push('enable amd');
|
||||
if (this.notifyEvents) phrase.push(`event notification ${this.notifyEvents ? 'on' : 'off'}`);
|
||||
if (this.hasNotifySttLatency) phrase.push(
|
||||
`notifySttLatency ${this.notifySttLatency ? 'on' : 'off'}`);
|
||||
if (this.onHoldMusic) phrase.push(`onHoldMusic: ${this.onHoldMusic}`);
|
||||
if ('boostAudioSignal' in this.data) phrase.push(`setGain ${this.data.boostAudioSignal}`);
|
||||
if (this.hasReferHook) phrase.push('set referHook');
|
||||
@@ -130,6 +136,11 @@ class TaskConfig extends Task {
|
||||
cs.notifyEvents = !!this.data.notifyEvents;
|
||||
}
|
||||
|
||||
if (this.hasNotifySttLatency) {
|
||||
this.logger.debug(`turning notifySttLatency ${this.notifySttLatency ? 'on' : 'off'}`);
|
||||
cs.notifySttLatencyEnabled = this.notifySttLatency;
|
||||
}
|
||||
|
||||
if (this.onHoldMusic) {
|
||||
cs.onHoldMusic = this.onHoldMusic;
|
||||
}
|
||||
@@ -318,7 +329,10 @@ class TaskConfig extends Task {
|
||||
voiceMs: this.vad.voiceMs || 250,
|
||||
silenceMs: this.vad.silenceMs || 150,
|
||||
strategy: this.vad.strategy || 'one-shot',
|
||||
mode: (this.vad.mode !== undefined && this.vad.mode !== null) ? this.vad.mode : 2
|
||||
mode: (this.vad.mode !== undefined && this.vad.mode !== null) ? this.vad.mode : 2,
|
||||
vendor: this.vad.vendor || 'silero',
|
||||
threshold: this.vad.threshold || 0.5,
|
||||
speechPadMs: this.vad.speechPadMs || 30,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -351,6 +351,7 @@ class TaskGather extends SttTask {
|
||||
this.sayTask?.span.end();
|
||||
this._stopVad();
|
||||
this._resolve('killed');
|
||||
cs.stopSttLatencyVad();
|
||||
}
|
||||
|
||||
updateTaskInProgress(opts) {
|
||||
@@ -390,16 +391,7 @@ class TaskGather extends SttTask {
|
||||
if (this.digitBuffer.length === 0 && this.needsStt) {
|
||||
// DTMF is higher priority than STT.
|
||||
this.removeCustomEventListeners();
|
||||
// Fix for https://github.com/jambonz/jambonz-feature-server/issues/1281
|
||||
// We should immediately call stop transcription from gather
|
||||
// so that next gather can start transcription immediately
|
||||
ep.stopTranscription({
|
||||
vendor: this.vendor,
|
||||
bugname: this.bugname,
|
||||
gracefulShutdown: false
|
||||
})
|
||||
.catch((err) => this.logger.error({err},
|
||||
` Received DTMF, Error stopping transcription for vendor ${this.vendor}`));
|
||||
this._stopTranscribing(ep);
|
||||
}
|
||||
this.digitBuffer += evt.dtmf;
|
||||
const len = this.digitBuffer.length;
|
||||
@@ -686,6 +678,9 @@ class TaskGather extends SttTask {
|
||||
target_sid: this.cs.callSid
|
||||
});
|
||||
}).catch((err) => this.logger.info({err}, 'Error generating alert for tts failure'));
|
||||
|
||||
// Some vendor use single connection, that we cannot use onConnect event to track transcription start
|
||||
this.cs.emit('transcribe-start');
|
||||
}
|
||||
|
||||
_startTimer() {
|
||||
@@ -869,6 +864,10 @@ class TaskGather extends SttTask {
|
||||
if (finished === 'true') return;
|
||||
|
||||
if (this.vendor === 'ibm' && evt?.state === 'listening') return;
|
||||
|
||||
// emit an event to the call session to track the time transcription is received
|
||||
cs.emit('on-transcription');
|
||||
|
||||
if (this.vendor === 'deepgram' && evt.type === 'UtteranceEnd') {
|
||||
/* we will only get this when we have set utterance_end_ms */
|
||||
if (this._bufferedTranscripts.length === 0) {
|
||||
@@ -952,6 +951,12 @@ class TaskGather extends SttTask {
|
||||
}
|
||||
}
|
||||
|
||||
// receive a final transcript, calculate the stt latency for this transcript
|
||||
const sttLatency = this.cs.calculateSttLatency();
|
||||
if (!emptyTranscript && sttLatency) {
|
||||
this.stt_latency_ms += `${sttLatency.stt_latency_ms},`;
|
||||
}
|
||||
|
||||
if (this.isContinuousAsr) {
|
||||
/* append the transcript and start listening again for asrTimeout */
|
||||
const t = evt.alternatives[0].transcript;
|
||||
@@ -1103,12 +1108,7 @@ class TaskGather extends SttTask {
|
||||
|
||||
async _startFallback(cs, ep, evt) {
|
||||
if (this.canFallback) {
|
||||
ep.stopTranscription({
|
||||
vendor: this.vendor,
|
||||
bugname: this.bugname,
|
||||
gracefulShutdown: false
|
||||
})
|
||||
.catch((err) => this.logger.error({err}, `Error stopping transcription for primary vendor ${this.vendor}`));
|
||||
this._stopTranscribing(ep);
|
||||
try {
|
||||
this.logger.debug('gather:_startFallback');
|
||||
this.notifyError({ msg: 'ASR error',
|
||||
@@ -1237,21 +1237,26 @@ class TaskGather extends SttTask {
|
||||
}
|
||||
}
|
||||
|
||||
async _stopTranscribing(ep) {
|
||||
// Fix for https://github.com/jambonz/jambonz-feature-server/issues/1281
|
||||
// We should immediately call stop transcription from gather
|
||||
// so that next gather can start transcription immediately
|
||||
ep.stopTranscription({
|
||||
vendor: this.vendor,
|
||||
bugname: this.bugname,
|
||||
gracefulShutdown: false
|
||||
})
|
||||
.catch((err) => {
|
||||
if (this.resolved) return;
|
||||
this.logger.error({err}, 'Error stopping transcription');
|
||||
});
|
||||
this.cs.emit('transcribe-stop');
|
||||
}
|
||||
|
||||
async _resolve(reason, evt) {
|
||||
this.logger.info({evt}, `TaskGather:resolve with reason ${reason}`);
|
||||
if (this.needsStt && this.ep && this.ep.connected) {
|
||||
// Fix for https://github.com/jambonz/jambonz-feature-server/issues/1281
|
||||
// We should immediately call stop transcription from gather
|
||||
// so that next gather can start transcription immediately
|
||||
this.ep.stopTranscription({
|
||||
vendor: this.vendor,
|
||||
bugname: this.bugname,
|
||||
gracefulShutdown: false
|
||||
})
|
||||
.catch((err) => {
|
||||
if (this.resolved) return;
|
||||
this.logger.error({err}, 'Error stopping transcription');
|
||||
});
|
||||
this._stopTranscribing(this.ep);
|
||||
}
|
||||
if (this.resolved) {
|
||||
this.logger.debug('TaskGather:_resolve - already resolved');
|
||||
@@ -1270,11 +1275,28 @@ class TaskGather extends SttTask {
|
||||
this._clearAsrTimer();
|
||||
this._clearFinalAsrTimer();
|
||||
|
||||
let sttLatencyMetrics = {};
|
||||
if (this.needsStt) {
|
||||
const sttLatency = this.cs.calculateSttLatency();
|
||||
if (sttLatency) {
|
||||
this.stt_latency_ms = this.stt_latency_ms.endsWith(',') ?
|
||||
this.stt_latency_ms.slice(0, -1) : this.stt_latency_ms;
|
||||
sttLatencyMetrics = {
|
||||
'stt.latency_ms': this.stt_latency_ms,
|
||||
'stt.talkspurts': JSON.stringify(sttLatency.talkspurts),
|
||||
'stt.start_time': sttLatency.stt_start_time,
|
||||
'stt.stop_time': sttLatency.stt_stop_time,
|
||||
'stt.usage': sttLatency.stt_usage,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
this.span.setAttributes({
|
||||
channel: 1,
|
||||
'stt.label': this.label || 'None',
|
||||
'stt.resolve': reason,
|
||||
'stt.result': JSON.stringify(evt)
|
||||
'stt.result': JSON.stringify(evt),
|
||||
...sttLatencyMetrics
|
||||
});
|
||||
|
||||
if (this.callSession && this.callSession.callGone) {
|
||||
@@ -1302,6 +1324,9 @@ class TaskGather extends SttTask {
|
||||
|
||||
let returnedVerbs = false;
|
||||
try {
|
||||
const latencies = Object.fromEntries(
|
||||
Object.entries(sttLatencyMetrics).map(([key, value]) => [key.replace('stt.', 'stt_'), value])
|
||||
);
|
||||
if (reason.startsWith('dtmf')) {
|
||||
if (this.parentTask) this.parentTask.emit('dtmf', evt);
|
||||
else {
|
||||
@@ -1315,7 +1340,7 @@ class TaskGather extends SttTask {
|
||||
else {
|
||||
this.emit('transcription', evt);
|
||||
this.logger.debug('TaskGather:_resolve - invoking performAction');
|
||||
returnedVerbs = await this.performAction({speech: evt, reason: 'speechDetected'});
|
||||
returnedVerbs = await this.performAction({speech: evt, reason: 'speechDetected', ...latencies});
|
||||
this.logger.debug({returnedVerbs}, 'TaskGather:_resolve - back from performAction');
|
||||
}
|
||||
}
|
||||
@@ -1323,20 +1348,20 @@ class TaskGather extends SttTask {
|
||||
if (this.parentTask) this.parentTask.emit('timeout', evt);
|
||||
else {
|
||||
this.emit('timeout', evt);
|
||||
returnedVerbs = await this.performAction({reason: 'timeout'});
|
||||
returnedVerbs = await this.performAction({reason: 'timeout', ...latencies});
|
||||
}
|
||||
}
|
||||
else if (reason.startsWith('stt-error')) {
|
||||
if (this.parentTask) this.parentTask.emit('stt-error', evt);
|
||||
else {
|
||||
this.emit('stt-error', evt);
|
||||
returnedVerbs = await this.performAction({reason: 'error', details: evt.error});
|
||||
returnedVerbs = await this.performAction({reason: 'error', details: evt.error, ...latencies});
|
||||
}
|
||||
} else if (reason.startsWith('stt-low-confidence')) {
|
||||
if (this.parentTask) this.parentTask.emit('stt-low-confidence', evt);
|
||||
else {
|
||||
this.emit('stt-low-confidence', evt);
|
||||
returnedVerbs = await this.performAction({speech:evt, reason: 'stt-low-confidence'});
|
||||
returnedVerbs = await this.performAction({speech:evt, reason: 'stt-low-confidence', ...latencies});
|
||||
}
|
||||
}
|
||||
} catch (err) { /*already logged error*/ }
|
||||
|
||||
@@ -4,6 +4,7 @@ const crypto = require('crypto');
|
||||
const { TaskPreconditions, CobaltTranscriptionEvents } = require('../utils/constants');
|
||||
const { SpeechCredentialError } = require('../utils/error');
|
||||
const {JAMBONES_AWS_TRANSCRIBE_USE_GRPC} = require('../config');
|
||||
const {TaskName} = require('../utils/constants.json');
|
||||
|
||||
/**
|
||||
* "Please insert turns here: {{turns:4}}"
|
||||
@@ -84,6 +85,9 @@ class SttTask extends Task {
|
||||
/*bug name prefix */
|
||||
this.bugname_prefix = '';
|
||||
|
||||
// stt latency calculator
|
||||
this.stt_latency_ms = '';
|
||||
|
||||
}
|
||||
|
||||
async exec(cs, {ep, ep2}) {
|
||||
@@ -91,6 +95,12 @@ class SttTask extends Task {
|
||||
this.ep = ep;
|
||||
this.ep2 = ep2;
|
||||
|
||||
// start vad from stt latency calculator
|
||||
if (this.name !== TaskName.Gather ||
|
||||
this.name === TaskName.Gather && this.needsStt) {
|
||||
cs.startSttLatencyVad();
|
||||
}
|
||||
|
||||
// use session preferences if we don't have specific verb-level settings.
|
||||
if (cs.recognizer) {
|
||||
for (const k in cs.recognizer) {
|
||||
@@ -400,7 +410,7 @@ class SttTask extends Task {
|
||||
dgOptions.utteranceEndMs = dgOptions.utteranceEndMs || asrTimeout;
|
||||
}
|
||||
|
||||
_onVendorConnect(_cs, _ep) {
|
||||
_onVendorConnect(cs, _ep) {
|
||||
this.logger.debug(`TaskGather:_on${this.vendor}Connect`);
|
||||
}
|
||||
|
||||
|
||||
@@ -152,12 +152,15 @@ class TaskTranscribe extends SttTask {
|
||||
.catch((err) => this.logger.info(err, 'Error TaskTranscribe:kill'));
|
||||
}
|
||||
|
||||
this.cs.emit('transcribe-stop');
|
||||
|
||||
return stopTranscription;
|
||||
}
|
||||
|
||||
async kill(cs) {
|
||||
super.kill(cs);
|
||||
const stopTranscription = this._stopTranscription();
|
||||
cs.stopSttLatencyVad();
|
||||
// hangup after 1 sec if we don't get a final transcription
|
||||
if (stopTranscription) this._timer = setTimeout(() => this.notifyTaskDone(), 1500);
|
||||
else this.notifyTaskDone();
|
||||
@@ -423,6 +426,9 @@ class TaskTranscribe extends SttTask {
|
||||
bugname: this.bugname,
|
||||
hostport: this.hostport
|
||||
});
|
||||
|
||||
// Some vendor use single connection, that we cannot use onConnect event to track transcription start
|
||||
this.cs.emit('transcribe-start');
|
||||
}
|
||||
|
||||
async _onTranscription(cs, ep, channel, evt, fsEvent) {
|
||||
@@ -441,6 +447,9 @@ class TaskTranscribe extends SttTask {
|
||||
|
||||
if (this.vendor === 'ibm' && evt?.state === 'listening') return;
|
||||
|
||||
// emit an event to the call session to track the time transcription is received
|
||||
cs.emit('on-transcription');
|
||||
|
||||
if (this.vendor === 'deepgram' && evt.type === 'UtteranceEnd') {
|
||||
/* we will only get this when we have set utterance_end_ms */
|
||||
|
||||
@@ -602,14 +611,28 @@ class TaskTranscribe extends SttTask {
|
||||
}
|
||||
|
||||
async _resolve(channel, evt) {
|
||||
let sttLatencyMetrics = {};
|
||||
if (evt.is_final) {
|
||||
const sttLatency = this.cs.calculateSttLatency();
|
||||
if (sttLatency) {
|
||||
sttLatencyMetrics = {
|
||||
'stt.latency_ms': `${sttLatency.stt_latency_ms}`,
|
||||
'stt.talkspurts': JSON.stringify(sttLatency.talkspurts),
|
||||
'stt.start_time': sttLatency.stt_start_time,
|
||||
'stt.stop_time': sttLatency.stt_stop_time,
|
||||
'stt.usage': sttLatency.stt_usage,
|
||||
};
|
||||
}
|
||||
// time to reset the stt latency
|
||||
this.cs.emit('transcribe-start');
|
||||
/* we've got a final transcript, so end the otel child span for this channel */
|
||||
if (this.childSpan[channel - 1] && this.childSpan[channel - 1].span) {
|
||||
this.childSpan[channel - 1].span.setAttributes({
|
||||
channel,
|
||||
'stt.label': this.label || 'None',
|
||||
'stt.resolve': 'transcript',
|
||||
'stt.result': JSON.stringify(evt)
|
||||
'stt.result': JSON.stringify(evt),
|
||||
...sttLatencyMetrics
|
||||
});
|
||||
this.childSpan[channel - 1].span.end();
|
||||
}
|
||||
@@ -618,9 +641,13 @@ class TaskTranscribe extends SttTask {
|
||||
if (this.transcriptionHook) {
|
||||
const b3 = this.getTracingPropagation();
|
||||
const httpHeaders = b3 && {b3};
|
||||
const latencies = Object.fromEntries(
|
||||
Object.entries(sttLatencyMetrics).map(([key, value]) => [key.replace('stt.', 'stt_'), value])
|
||||
);
|
||||
const payload = {
|
||||
...this.cs.callInfo,
|
||||
...httpHeaders,
|
||||
...latencies,
|
||||
...(evt.alternatives && {speech: evt}),
|
||||
...(evt.type && {speechEvent: evt})
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user