Compare commits

...

5 Commits

Author SHA1 Message Date
Dave Horton
8938bf25dc race condition where gather transcribe is restarted after final transcript 2024-05-21 19:04:55 -04:00
Dave Horton
f7134d8fe7 more logging on restart of transcribing during gather 2024-05-10 14:12:34 -04:00
Dave Horton
a23dc50c20 lint 2024-05-03 08:56:55 -04:00
Dave Horton
888fddff37 possible fix for race condition in gather which ends but lets transcription continue 2024-05-03 08:55:16 -04:00
Dave Horton
e1497f90a8 update with various deepgram fixes, including for #700 2024-04-01 13:03:52 -04:00
3 changed files with 123 additions and 39 deletions

View File

@@ -877,6 +877,7 @@ class CallSession extends Emitter {
const taskNum = ++this.taskIdx;
const stackNum = this.stackIdx;
const task = this.tasks.shift();
task._stackNum = `${stackNum}:${taskNum}`;
this.logger.info(`CallSession:exec starting task #${stackNum}:${taskNum}: ${task.name}`);
this._notifyTaskStatus(task, {event: 'starting'});
// Register verbhook span wait for end

View File

@@ -107,7 +107,7 @@ class TaskGather extends SttTask {
}
async exec(cs, {ep}) {
this.logger.debug({options: this.data}, 'Gather:exec');
this.logger.debug({options: this.data}, `Gather:exec ${this.stackNum}`);
await super.exec(cs, {ep});
const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, cs.srf);
@@ -171,6 +171,7 @@ class TaskGather extends SttTask {
this.logger.info('Gather:exec - task was quickly killed so do not transcribe');
return;
}
this.logger.debug('Gather:exec - going to start transcribing (startListening)');
this._startTranscribing(ep);
return updateSpeechCredentialLastUsed(this.sttCredentials.speech_credential_sid);
} catch (e) {
@@ -235,9 +236,15 @@ class TaskGather extends SttTask {
if (this.input.includes('speech') && this.listenDuringPrompt) {
await this._setSpeechHandlers(cs, ep);
this._startTranscribing(ep);
updateSpeechCredentialLastUsed(this.sttCredentials.speech_credential_sid)
.catch(() => {/*already logged error */});
if (!this.resolved && !this.killed) {
this.logger.debug(`Gather:exec ${this.stackNum} - going to start transcribing (listenDuringPrompt)`);
this._startTranscribing(ep);
updateSpeechCredentialLastUsed(this.sttCredentials.speech_credential_sid)
.catch(() => {/*already logged error */});
}
else {
this.logger.info(`Gather:exec ${this.stackNum} - task was killed or resolved before starting transcription`);
}
}
if (this.input.includes('digits') || this.dtmfBargein || this.asrDtmfTerminationDigit) {
@@ -482,7 +489,7 @@ class TaskGather extends SttTask {
locale: this.language,
interim: this.interim,
bugname: this.bugname
}, 'Gather:_startTranscribing');
}, `Gather:_startTranscribing ${this.stackNum}`);
/**
* Note: we don't need to ask deepgram for interim results, because they
@@ -700,7 +707,12 @@ class TaskGather extends SttTask {
// make sure this is not a transcript from answering machine detection
const bugname = fsEvent.getHeader('media-bugname');
const finished = fsEvent.getHeader('transcription-session-finished');
this.logger.debug({evt, bugname, finished, vendor: this.vendor}, 'Gather:_onTranscription raw transcript');
this.logger.debug({
evt,
bugname,
finished,
vendor: this.vendor
}, `Gather:_onTranscription ${this.stackNum} raw transcript`);
if (bugname && this.bugname !== bugname) return;
if (this.vendor === 'ibm' && evt?.state === 'listening') return;
@@ -793,14 +805,21 @@ class TaskGather extends SttTask {
this._startAsrTimer();
/* some STT engines will keep listening after a final response, so no need to restart */
if (!['soniox', 'aws', 'microsoft', 'deepgram'].includes(this.vendor)) this._startTranscribing(ep);
if (!['soniox', 'aws', 'microsoft', 'deepgram'].includes(this.vendor)) {
this.logger.debug('Gather:_onTranscription - going to start transcribing again (continuous asr)');
this._startTranscribing(ep);
}
}
else {
if (this.bargein && (words + bufferedWords) < this.minBargeinWordCount) {
this.logger.debug({evt, words, bufferedWords},
'TaskGather:_onTranscription - final transcript but < min barge words');
this._bufferedTranscripts.push(evt);
if (!['soniox', 'aws', 'microsoft', 'deepgram'].includes(this.vendor)) this._startTranscribing(ep);
if (!['soniox', 'aws', 'microsoft', 'deepgram'].includes(this.vendor)) {
this.logger.debug(
`Gather:_onTranscription - start transcribing again (min bargein words=${this.minBargeinWordCount}`);
this._startTranscribing(ep);
}
return;
}
else {
@@ -866,7 +885,7 @@ class TaskGather extends SttTask {
}
}
_onEndOfUtterance(cs, ep) {
this.logger.debug('TaskGather:_onEndOfUtterance');
this.logger.debug(`TaskGather:_onEndOfUtterance ${this.stackNum}`);
if (this.bargein && this.minBargeinWordCount === 0) {
this._killAudio(cs);
}
@@ -881,6 +900,7 @@ class TaskGather extends SttTask {
* since we dont have a final transcript yet.
*/
if (!this.resolved && !this.killed && !this._bufferedTranscripts.length && this.wantsSingleUtterance) {
this.logger.debug('Gather:_onEndOfUtterance - start transcribing again (end of utterance/wantsSingleUtterance)');
this._startTranscribing(ep);
}
}
@@ -906,6 +926,7 @@ class TaskGather extends SttTask {
try {
await this._fallback();
await this._initSpeech(cs, ep);
this.logger.debug('Gather:_onJambonzError - going to start transcribing again (jambonz error)');
this._startTranscribing(ep);
updateSpeechCredentialLastUsed(this.sttCredentials.speech_credential_sid);
return;
@@ -955,7 +976,7 @@ class TaskGather extends SttTask {
this.logger.debug('TaskGather:_onNoSpeechDetected for old gather, ignoring');
}
else {
this.logger.debug('TaskGather:_onNoSpeechDetected - listen again');
this.logger.debug('Gather:_onNoSpeechDetected - going to start transcribing again');
this._startTranscribing(ep);
}
return;
@@ -963,7 +984,17 @@ class TaskGather extends SttTask {
}
async _resolve(reason, evt) {
this.logger.debug(`TaskGather:resolve with reason ${reason}`);
this.logger.debug(`TaskGather:resolve ${this.stackNum} with reason ${reason}`);
if (this.needsStt && this.ep && this.ep.connected) {
this.ep.stopTranscription({
vendor: this.vendor,
bugname: this.bugname
})
.catch((err) => {
if (this.resolved) return;
this.logger.error({err}, 'Error stopping transcription');
});
}
if (this.resolved) return;
this.resolved = true;
@@ -981,13 +1012,6 @@ class TaskGather extends SttTask {
'stt.resolve': reason,
'stt.result': JSON.stringify(evt)
});
if (this.needsStt && this.ep && this.ep.connected) {
this.ep.stopTranscription({
vendor: this.vendor,
bugname: this.bugname
})
.catch((err) => this.logger.error({err}, 'Error stopping transcription'));
}
if (this.callSession && this.callSession.callGone) {
this.logger.debug('TaskGather:_resolve - call is gone, not invoking web callback');

View File

@@ -303,12 +303,12 @@ class TaskTranscribe extends SttTask {
async _onTranscription(cs, ep, channel, evt, fsEvent) {
// make sure this is not a transcript from answering machine detection
const bugname = fsEvent.getHeader('media-bugname');
const finished = fsEvent.getHeader('transcription-session-finished');
if (bugname && this.bugname !== bugname) return;
if (this.paused) {
this.logger.debug({evt}, 'TaskTranscribe:_onTranscription - paused, ignoring transcript');
}
if (this.vendor === 'ibm' && evt?.state === 'listening') return;
if (this.vendor === 'deepgram' && evt.type === 'UtteranceEnd') {
@@ -319,8 +319,9 @@ class TaskTranscribe extends SttTask {
else {
this.logger.debug('Gather:_onTranscription - got UtteranceEnd event from deepgram, return buffered transcript');
evt = this.consolidateTranscripts(this._bufferedTranscripts, 1, this.language, this.vendor);
evt.is_final = true;
this._bufferedTranscripts = [];
this._resolve('speech', evt);
this._resolve(channel, evt);
}
return;
}
@@ -334,31 +335,89 @@ class TaskTranscribe extends SttTask {
return;
}
if (evt.alternatives[0]?.transcript === '' && !cs.callGone && !this.killed) {
if (['microsoft', 'deepgram'].includes(this.vendor)) {
this.logger.info({evt}, 'TaskTranscribe:_onTranscription - got empty transcript, continue listening');
let emptyTranscript = false;
if (evt.is_final) {
if (evt.alternatives[0].transcript === '' && !cs.callGone && !this.killed) {
emptyTranscript = true;
if (finished === 'true' &&
['microsoft', 'deepgram'].includes(this.vendor) &&
this._bufferedTranscripts.length === 0) {
this.logger.debug({evt}, 'TaskGather:_onTranscription - got empty transcript from old gather, disregarding');
return;
}
else if (this.vendor !== 'deepgram') {
this.logger.info({evt}, 'TaskGather:_onTranscription - got empty transcript, continue listening');
return;
}
else if (this.isContinuousAsr) {
this.logger.info({evt},
'TaskGather:_onTranscription - got empty deepgram transcript during continous asr, continue listening');
return;
}
else if (this.vendor === 'deepgram' && this._bufferedTranscripts.length > 0) {
this.logger.info({evt},
'TaskGather:_onTranscription - got empty transcript from deepgram, return the buffered transcripts');
}
}
if (this.isContinuousAsr) {
/* append the transcript and start listening again for asrTimeout */
const t = evt.alternatives[0].transcript;
if (t) {
/* remove trailing punctuation */
if (/[,;:\.!\?]$/.test(t)) {
this.logger.debug('TaskGather:_onTranscription - removing trailing punctuation');
evt.alternatives[0].transcript = t.slice(0, -1);
}
}
this.logger.info({evt}, 'TaskGather:_onTranscription - got transcript during continous asr');
this._bufferedTranscripts.push(evt);
this._startAsrTimer(channel);
/* some STT engines will keep listening after a final response, so no need to restart */
if (!['soniox', 'aws', 'microsoft', 'deepgram', 'google']
.includes(this.vendor)) this._startTranscribing(cs, ep, channel);
}
else {
this.logger.info({evt}, 'TaskTranscribe:_onTranscription - got empty transcript, listen again');
this._transcribe(ep);
}
return;
}
if (this.vendor === 'soniox') {
/* compile transcripts into one */
this._sonioxTranscripts.push(evt.vendor.finalWords);
evt = this.compileSonioxTranscripts(this._sonioxTranscripts, 1, this.language);
this._sonioxTranscripts = [];
}
else if (this.vendor === 'deepgram') {
/* compile transcripts into one */
if (!emptyTranscript) this._bufferedTranscripts.push(evt);
if (this.vendor === 'soniox') {
/* compile transcripts into one */
this._sonioxTranscripts.push(evt.vendor.finalWords);
if (evt.is_final) {
evt = this.compileSonioxTranscripts(this._sonioxTranscripts, 1, this.language);
this._sonioxTranscripts = [];
/* deepgram can send an empty and final transcript; only if we have any buffered should we resolve */
if (this._bufferedTranscripts.length === 0) return;
evt = this.consolidateTranscripts(this._bufferedTranscripts, channel, this.language);
this._bufferedTranscripts = [];
}
/* here is where we return a final transcript */
this.logger.debug({evt}, 'TaskTranscribe:_onTranscription - sending final transcript');
this._resolve(channel, evt);
/* some STT engines will keep listening after a final response, so no need to restart */
if (!['soniox', 'aws', 'microsoft', 'deepgram', 'google']
.includes(this.vendor)) this._startTranscribing(cs, ep, channel);
}
}
else {
/* interim transcript */
if (this.isContinuousAsr && evt.is_final) {
this._bufferedTranscripts.push(evt);
this._startAsrTimer(channel);
} else {
await this._resolve(channel, evt);
/* deepgram can send a non-final transcript but with words that are final, so we need to buffer */
if (this.vendor === 'deepgram') {
const originalEvent = evt.vendor.evt;
if (originalEvent.is_final && evt.alternatives[0].transcript !== '') {
this.logger.debug({evt}, 'Gather:_onTranscription - buffering a completed (partial) deepgram transcript');
this._bufferedTranscripts.push(evt);
}
}
if (this.interim) {
this.logger.debug({evt}, 'TaskTranscribe:_onTranscription - sending interim transcript');
this._resolve(channel, evt);
}
}
}