Compare commits

..

2 Commits

Author SHA1 Message Date
Dave Horton
a23dc50c20 lint 2024-05-03 08:56:55 -04:00
Dave Horton
888fddff37 possible fix for race condition in gather which ends but lets transcription continue 2024-05-03 08:55:16 -04:00
8 changed files with 118 additions and 289 deletions

View File

@@ -113,13 +113,11 @@ class CallSession extends Emitter {
this.requestor.on('command', this._onCommand.bind(this));
this.requestor.on('connection-dropped', this._onWsConnectionDropped.bind(this));
this.requestor.on('handover', handover.bind(this));
this.requestor.on('reconnect-error', this._onSessionReconnectError.bind(this));
};
this.requestor.on('command', this._onCommand.bind(this));
this.requestor.on('connection-dropped', this._onWsConnectionDropped.bind(this));
this.requestor.on('handover', handover.bind(this));
this.requestor.on('reconnect-error', this._onSessionReconnectError.bind(this));
}
/**
@@ -190,24 +188,6 @@ class CallSession extends Emitter {
this._synthesizer = synth;
}
/**
* ASR TTS fallback
*/
get hasFallbackAsr() {
return this._hasFallbackAsr || false;
}
set hasFallbackAsr(i) {
this._hasFallbackAsr = i;
}
get hasFallbackTts() {
return this._hasFallbackTts || false;
}
set hasFallbackTts(i) {
this._hasFallbackTts = i;
}
/**
* default vendor to use for speech synthesis if not provided in the app
*/
@@ -1577,22 +1557,6 @@ Duration=${duration} `
}, 'CallSession:_injectTasks - completed');
}
async _onSessionReconnectError(err) {
const {writeAlerts, AlertType} = this.srf.locals;
const sid = this.accountInfo.account.account_sid;
this.logger.info({err}, `_onSessionReconnectError for account ${sid}`);
try {
await writeAlerts({
alert_type: AlertType.WEBHOOK_CONNECTION_FAILURE,
account_sid: this.accountSid,
detail: `Session:reconnect error ${err}`
});
} catch (error) {
this.logger.error({error}, 'Error writing WEBHOOK_CONNECTION_FAILURE alert');
}
this._jambonzHangup();
}
_onCommand({msgid, command, call_sid, queueCommand, data}) {
this.logger.info({msgid, command, queueCommand, data}, 'CallSession:_onCommand - received command');
let resolution;
@@ -1868,7 +1832,6 @@ Duration=${duration} `
/**
* called when the caller has hung up. Provided for subclasses to override
* in order to apply logic at this point if needed.
* return true if success fallback, return false if not
*/
_callerHungup() {
assert(false, 'subclass responsibility to override this method');
@@ -2306,7 +2269,6 @@ Duration=${duration} `
_startActionHookNoResponseTimer(options) {
this._clearActionHookNoResponseTimer();
this._actionHookDelayResolved = false;
if (options.noResponseTimeoutMs) {
this.logger.debug(`CallSession:_startActionHookNoResponseTimer ${options.noResponseTimeoutMs}`);
this._actionHookNoResponseTimer = setTimeout(() => {
@@ -2320,9 +2282,7 @@ Duration=${duration} `
if (t.length) {
t[0].on('playDone', (err) => {
if (err) this.logger.error({err}, `Call-Session:exec Error delay action, play ${verb}`);
if (!this._actionHookDelayResolved) {
this._startActionHookNoResponseTimer(options);
}
this._startActionHookNoResponseTimer(options);
});
}
this.tasks.push(...t);
@@ -2340,16 +2300,7 @@ Duration=${duration} `
_clearActionHookNoResponseTimer() {
if (this._actionHookNoResponseTimer) {
// Action Hook delay is solved.
this._actionHookDelayResolved = true;
clearTimeout(this._actionHookNoResponseTimer);
// if delay action is enabled
// and bot has responded with list of new verbs
// Only kill current running play task.
//https://github.com/jambonz/jambonz-feature-server/issues/710
if (this.currentTask?.name === TaskName.Play) {
this.currentTask.kill(this);
}
}
this._actionHookNoResponseTimer = null;
}

View File

@@ -250,7 +250,8 @@ class TaskConfig extends Task {
cs.stopBackgroundTask('transcribe');
}
}
if (Object.keys(this.actionHookDelayAction).length !== 0) {
if (this.actionHookDelayAction) {
cs.actionHookDelayEnabled = this.actionHookDelayAction.enabled || false;
cs.actionHookNoResponseTimeout = this.actionHookDelayAction.noResponseTimeout || 0;
cs.actionHookNoResponseGiveUpTimeout = this.actionHookDelayAction.noResponseGiveUpTimeout || 0;

View File

@@ -174,7 +174,12 @@ class TaskGather extends SttTask {
this._startTranscribing(ep);
return updateSpeechCredentialLastUsed(this.sttCredentials.speech_credential_sid);
} catch (e) {
await this._startFallback(cs, ep, {error: e});
if (this.fallbackVendor && this.isHandledByPrimaryProvider) {
await this._fallback();
startListening(cs, ep);
} else {
this.logger.error({error: e}, 'error in initSpeech');
}
}
}
};
@@ -182,7 +187,12 @@ class TaskGather extends SttTask {
try {
if (this.sayTask) {
const {span, ctx} = this.startChildSpan(`nested:${this.sayTask.summary}`);
const process = () => {
this.sayTask.span = span;
this.sayTask.ctx = ctx;
this.sayTask.exec(cs, {ep}); // kicked off, _not_ waiting for it to complete
this.sayTask.on('playDone', (err) => {
span.end();
if (err) this.logger.error({err}, 'Gather:exec Error playing tts');
this.logger.debug('Gather: nested say task completed');
if (!this.killed) {
startListening(cs, ep);
@@ -193,22 +203,16 @@ class TaskGather extends SttTask {
});
}
}
};
this.sayTask.span = span;
this.sayTask.ctx = ctx;
this.sayTask.exec(cs, {ep}) // kicked off, _not_ waiting for it to complete
.catch((err) => {
process();
});
this.sayTask.on('playDone', (err) => {
span.end();
if (err) this.logger.error({err}, 'Gather:exec Error playing tts');
process();
});
}
else if (this.playTask) {
const {span, ctx} = this.startChildSpan(`nested:${this.playTask.summary}`);
const process = () => {
this.playTask.span = span;
this.playTask.ctx = ctx;
this.playTask.exec(cs, {ep}); // kicked off, _not_ waiting for it to complete
this.playTask.on('playDone', (err) => {
span.end();
if (err) this.logger.error({err}, 'Gather:exec Error playing url');
this.logger.debug('Gather: nested play task completed');
if (!this.killed) {
startListening(cs, ep);
@@ -219,17 +223,6 @@ class TaskGather extends SttTask {
});
}
}
};
this.playTask.span = span;
this.playTask.ctx = ctx;
this.playTask.exec(cs, {ep}) // kicked off, _not_ waiting for it to complete
.catch((err) => {
process();
});
this.playTask.on('playDone', (err) => {
span.end();
if (err) this.logger.error({err}, 'Gather:exec Error playing url');
process();
});
}
else {
@@ -901,9 +894,9 @@ class TaskGather extends SttTask {
_onTranscriptionComplete(cs, ep) {
this.logger.debug('TaskGather:_onTranscriptionComplete');
}
async _startFallback(cs, ep, evt) {
if (this.canFallback) {
async _onJambonzError(cs, ep, evt) {
this.logger.info({evt}, 'TaskGather:_onJambonzError');
if (this.isHandledByPrimaryProvider && this.fallbackVendor) {
ep.stopTranscription({
vendor: this.vendor,
bugname: this.bugname
@@ -911,35 +904,17 @@ class TaskGather extends SttTask {
.catch((err) => this.logger.error({err}, `Error stopping transcription for primary vendor ${this.vendor}`));
const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, cs.srf);
try {
this.logger.debug('gather:_startFallback');
this.notifyError({ msg: 'ASR error',
details:`STT Vendor ${this.vendor} error: ${evt.error || evt.reason}`, failover: 'in progress'});
await this._initFallback();
this._speechHandlersSet = false;
await this._setSpeechHandlers(cs, ep);
await this._fallback();
await this._initSpeech(cs, ep);
this._startTranscribing(ep);
updateSpeechCredentialLastUsed(this.sttCredentials.speech_credential_sid);
return true;
return;
} catch (error) {
this.logger.info({error}, `There is error while falling back to ${this.fallbackVendor}`);
this.notifyError({ msg: 'ASR error',
details:`STT Vendor ${this.vendor} error: ${evt.error || evt.reason}`, failover: 'not available'});
}
} else {
this.logger.debug('gather:_startFallback no condition for falling back');
this.notifyError({ msg: 'ASR error',
details:`STT Vendor ${this.vendor} error: ${evt.error || evt.reason}`, failover: 'not available'});
}
return false;
}
async _onJambonzError(cs, ep, evt) {
if (this.vendor === 'google' && evt.error_code === 0) {
this.logger.info({evt}, 'TaskTranscribe:_onJambonzError - ignoring google error code 0');
return;
}
this.logger.info({evt}, 'TaskGather:_onJambonzError');
const {writeAlerts, AlertType} = cs.srf.locals;
if (this.vendor === 'nuance') {
const {code, error} = evt;
if (code === 404 && error === 'No speech') return this._resolve('timeout');
@@ -952,23 +927,17 @@ class TaskGather extends SttTask {
message: `Custom speech vendor ${this.vendor} error: ${evt.error}`,
vendor: this.vendor,
}).catch((err) => this.logger.info({err}, 'Error generating alert for jambonz custom connection failure'));
if (!(await this._startFallback(cs, ep, evt))) {
this.notifyTaskDone();
}
this.notifyError({msg: 'ASR error', details:`Custom speech vendor ${this.vendor} error: ${evt.error}`});
}
async _onVendorConnectFailure(cs, _ep, evt) {
_onVendorConnectFailure(cs, _ep, evt) {
super._onVendorConnectFailure(cs, _ep, evt);
if (!(await this._startFallback(cs, _ep, evt))) {
this.notifyTaskDone();
}
this.notifyTaskDone();
}
async _onVendorError(cs, _ep, evt) {
_onVendorError(cs, _ep, evt) {
super._onVendorError(cs, _ep, evt);
if (!(await this._startFallback(cs, _ep, evt))) {
this._resolve('stt-error', evt);
}
this._resolve('stt-error', evt);
}
_onVadDetected(cs, ep) {
@@ -995,6 +964,16 @@ class TaskGather extends SttTask {
async _resolve(reason, evt) {
this.logger.debug(`TaskGather:resolve with reason ${reason}`);
if (this.needsStt && this.ep && this.ep.connected) {
this.ep.stopTranscription({
vendor: this.vendor,
bugname: this.bugname
})
.catch((err) => {
if (this.resolved) return;
this.logger.error({err}, 'Error stopping transcription');
});
}
if (this.resolved) return;
this.resolved = true;
@@ -1012,13 +991,6 @@ class TaskGather extends SttTask {
'stt.resolve': reason,
'stt.result': JSON.stringify(evt)
});
if (this.needsStt && this.ep && this.ep.connected) {
this.ep.stopTranscription({
vendor: this.vendor,
bugname: this.bugname
})
.catch((err) => this.logger.error({err}, 'Error stopping transcription'));
}
if (this.callSession && this.callSession.callGone) {
this.logger.debug('TaskGather:_resolve - call is gone, not invoking web callback');

View File

@@ -117,6 +117,10 @@ class TaskSay extends Task {
alert_type: AlertType.TTS_NOT_PROVISIONED,
vendor
}).catch((err) => this.logger.info({err}, 'Error generating alert for no tts'));
this.notifyError({
msg: 'TTS error',
details:`No speech credentials provisioned for selected vendor ${vendor}`
});
throw new Error('no provisioned speech credentials for TTS');
}
// synthesize all of the text elements
@@ -188,6 +192,7 @@ class TaskSay extends Task {
vendor,
detail: err.message
}).catch((err) => this.logger.info({err}, 'Error generating alert for tts failure'));
this.notifyError({msg: 'TTS error', details: err.message || err});
throw err;
}
};
@@ -210,16 +215,16 @@ class TaskSay extends Task {
await super.exec(cs);
this.ep = ep;
let vendor = this.synthesizer.vendor && this.synthesizer.vendor !== 'default' ?
const vendor = this.synthesizer.vendor && this.synthesizer.vendor !== 'default' ?
this.synthesizer.vendor :
cs.speechSynthesisVendor;
let language = this.synthesizer.language && this.synthesizer.language !== 'default' ?
const language = this.synthesizer.language && this.synthesizer.language !== 'default' ?
this.synthesizer.language :
cs.speechSynthesisLanguage ;
let voice = this.synthesizer.voice && this.synthesizer.voice !== 'default' ?
const voice = this.synthesizer.voice && this.synthesizer.voice !== 'default' ?
this.synthesizer.voice :
cs.speechSynthesisVoice;
let label = this.synthesizer.label && this.synthesizer.label !== 'default' ?
const label = this.synthesizer.label && this.synthesizer.label !== 'default' ?
this.synthesizer.label :
cs.speechSynthesisLabel;
@@ -236,22 +241,12 @@ class TaskSay extends Task {
this.synthesizer.fallbackLabel :
cs.fallbackSpeechSynthesisLabel;
if (cs.hasFallbackTts) {
vendor = fallbackVendor;
language = fallbackLanguage;
voice = fallbackVoice;
label = fallbackLabel;
}
let filepath;
try {
filepath = await this._synthesizeWithSpecificVendor(cs, ep, {vendor, language, voice, label});
} catch (error) {
if (fallbackVendor && this.isHandledByPrimaryProvider && !cs.hasFallbackTts) {
this.notifyError(
{ msg: 'TTS error', details:`TTS vendor ${vendor} error: ${error}`, failover: 'in progress'});
if (fallbackVendor && this.isHandledByPrimaryProvider) {
this.isHandledByPrimaryProvider = false;
cs.hasFallbackTts = true;
this.logger.info(`Synthesize error, fallback to ${fallbackVendor}`);
filepath = await this._synthesizeWithSpecificVendor(cs, ep,
{
@@ -261,8 +256,6 @@ class TaskSay extends Task {
label: fallbackLabel
});
} else {
this.notifyError(
{ msg: 'TTS error', details:`TTS vendor ${vendor} error: ${error}`, failover: 'not available'});
throw error;
}
}

View File

@@ -102,13 +102,6 @@ class SttTask extends Task {
this.fallbackLabel = cs.fallbackSpeechRecognizerLabel;
if (this.data.recognizer) this.data.recognizer.fallbackLabel = this.fallbackLabel;
}
// If call is already fallback to 2nd ASR vendor
// use that.
if (cs.hasFallbackAsr) {
this.vendor = this.fallbackVendor;
this.language = this.fallbackLanguage;
this.label = this.fallbackLabel;
}
if (!this.data.recognizer.vendor) {
this.data.recognizer.vendor = this.vendor;
}
@@ -126,19 +119,9 @@ class SttTask extends Task {
try {
this.sttCredentials = await this._initSpeechCredentials(this.cs, this.vendor, this.label);
} catch (error) {
if (this.canFallback) {
this.notifyError(
{
msg: 'ASR error', details:`Invalid vendor ${this.vendor}, Error: ${error}`,
failover: 'in progress'
});
await this._initFallback();
if (this.fallbackVendor && this.isHandledByPrimaryProvider) {
await this._fallback();
} else {
this.notifyError(
{
msg: 'ASR error', details:`Invalid vendor ${this.vendor}, Error: ${error}`,
failover: 'not available'
});
throw error;
}
}
@@ -181,6 +164,11 @@ class SttTask extends Task {
alert_type: AlertType.STT_NOT_PROVISIONED,
vendor
}).catch((err) => this.logger.info({err}, 'Error generating alert for no stt'));
// Notify application that STT vender is wrong.
this.notifyError({
msg: 'ASR error',
details: `No speech-to-text service credentials for ${vendor} have been configured`
});
this.notifyTaskDone();
throw new Error(`No speech-to-text service credentials for ${vendor} have been configured`);
}
@@ -202,14 +190,9 @@ class SttTask extends Task {
return credentials;
}
get canFallback() {
return this.fallbackVendor && this.isHandledByPrimaryProvider && !this.cs.hasFallbackAsr;
}
async _initFallback() {
async _fallback() {
assert(this.fallbackVendor, 'fallback failed without fallbackVendor configuration');
this.isHandledByPrimaryProvider = false;
this.cs.hasFallbackAsr = true;
this.logger.info(`Failed to use primary STT provider, fallback to ${this.fallbackVendor}`);
this.vendor = this.fallbackVendor;
this.language = this.fallbackLanguage;
@@ -218,8 +201,6 @@ class SttTask extends Task {
this.data.recognizer.language = this.language;
this.data.recognizer.label = this.label;
this.sttCredentials = await this._initSpeechCredentials(this.cs, this.vendor, this.label);
// cleanup previous listener from previous vendor
this.removeCustomEventListeners();
}
async compileHintsForCobalt(ep, hostport, model, token, hints) {
@@ -282,6 +263,7 @@ class SttTask extends Task {
detail: evt.error,
vendor: this.vendor,
}).catch((err) => this.logger.info({err}, `Error generating alert for ${this.vendor} connection failure`));
this.notifyError({msg: 'ASR error', details:`Failed connecting to speech vendor ${this.vendor}: ${evt.error}`});
}
_onVendorConnectFailure(cs, _ep, evt) {
@@ -294,6 +276,7 @@ class SttTask extends Task {
message: `Failed connecting to ${this.vendor} speech recognizer: ${reason}`,
vendor: this.vendor,
}).catch((err) => this.logger.info({err}, `Error generating alert for ${this.vendor} connection failure`));
this.notifyError({msg: 'ASR error', details:`Failed connecting to speech vendor ${this.vendor}: ${reason}`});
}
}

View File

@@ -44,7 +44,7 @@ class TaskTranscribe extends SttTask {
this.isContinuousAsr = true;
}
/* buffer speech for continuous asr */
this._bufferedTranscripts = [ [], [] ]; // for channel 1 and 2
this._bufferedTranscripts = [];
this.bugname_prefix = 'transcribe_';
this.paused = false;
}
@@ -80,15 +80,12 @@ class TaskTranscribe extends SttTask {
updateSpeechCredentialLastUsed(this.sttCredentials.speech_credential_sid)
.catch(() => {/*already logged error */});
await this.awaitTaskDone();
} catch (err) {
if (!(await this._startFallback(cs, ep, {error: err}))) {
this.logger.info(err, 'TaskTranscribe:exec - error');
this.parentTask && this.parentTask.emit('error', err);
this.removeCustomEventListeners();
return;
}
this.logger.info(err, 'TaskTranscribe:exec - error');
this.parentTask && this.parentTask.emit('error', err);
}
await this.awaitTaskDone();
this.removeCustomEventListeners();
}
@@ -307,7 +304,6 @@ class TaskTranscribe extends SttTask {
// make sure this is not a transcript from answering machine detection
const bugname = fsEvent.getHeader('media-bugname');
const finished = fsEvent.getHeader('transcription-session-finished');
const bufferedTranscripts = this._bufferedTranscripts[channel - 1];
if (bugname && this.bugname !== bugname) return;
if (this.paused) {
this.logger.debug({evt}, 'TaskTranscribe:_onTranscription - paused, ignoring transcript');
@@ -317,14 +313,14 @@ class TaskTranscribe extends SttTask {
if (this.vendor === 'deepgram' && evt.type === 'UtteranceEnd') {
/* we will only get this when we have set utterance_end_ms */
if (bufferedTranscripts.length === 0) {
if (this._bufferedTranscripts.length === 0) {
this.logger.debug('Gather:_onTranscription - got UtteranceEnd event from deepgram but no buffered transcripts');
}
else {
this.logger.debug('Gather:_onTranscription - got UtteranceEnd event from deepgram, return buffered transcript');
evt = this.consolidateTranscripts(bufferedTranscripts, channel, this.language, this.vendor);
evt = this.consolidateTranscripts(this._bufferedTranscripts, 1, this.language, this.vendor);
evt.is_final = true;
this._bufferedTranscripts[channel - 1] = [];
this._bufferedTranscripts = [];
this._resolve(channel, evt);
}
return;
@@ -341,11 +337,11 @@ class TaskTranscribe extends SttTask {
let emptyTranscript = false;
if (evt.is_final) {
if (evt.alternatives.length === 0 || evt.alternatives[0].transcript === '' && !cs.callGone && !this.killed) {
if (evt.alternatives[0].transcript === '' && !cs.callGone && !this.killed) {
emptyTranscript = true;
if (finished === 'true' &&
['microsoft', 'deepgram'].includes(this.vendor) &&
bufferedTranscripts.length === 0) {
this._bufferedTranscripts.length === 0) {
this.logger.debug({evt}, 'TaskGather:_onTranscription - got empty transcript from old gather, disregarding');
return;
}
@@ -358,7 +354,7 @@ class TaskTranscribe extends SttTask {
'TaskGather:_onTranscription - got empty deepgram transcript during continous asr, continue listening');
return;
}
else if (this.vendor === 'deepgram' && bufferedTranscripts.length > 0) {
else if (this.vendor === 'deepgram' && this._bufferedTranscripts.length > 0) {
this.logger.info({evt},
'TaskGather:_onTranscription - got empty transcript from deepgram, return the buffered transcripts');
}
@@ -374,7 +370,7 @@ class TaskTranscribe extends SttTask {
}
}
this.logger.info({evt}, 'TaskGather:_onTranscription - got transcript during continous asr');
bufferedTranscripts.push(evt);
this._bufferedTranscripts.push(evt);
this._startAsrTimer(channel);
/* some STT engines will keep listening after a final response, so no need to restart */
@@ -390,12 +386,12 @@ class TaskTranscribe extends SttTask {
}
else if (this.vendor === 'deepgram') {
/* compile transcripts into one */
if (!emptyTranscript) bufferedTranscripts.push(evt);
if (!emptyTranscript) this._bufferedTranscripts.push(evt);
/* deepgram can send an empty and final transcript; only if we have any buffered should we resolve */
if (bufferedTranscripts.length === 0) return;
evt = this.consolidateTranscripts(bufferedTranscripts, channel, this.language);
this._bufferedTranscripts[channel - 1] = [];
if (this._bufferedTranscripts.length === 0) return;
evt = this.consolidateTranscripts(this._bufferedTranscripts, channel, this.language);
this._bufferedTranscripts = [];
}
/* here is where we return a final transcript */
@@ -414,7 +410,7 @@ class TaskTranscribe extends SttTask {
const originalEvent = evt.vendor.evt;
if (originalEvent.is_final && evt.alternatives[0].transcript !== '') {
this.logger.debug({evt}, 'Gather:_onTranscription - buffering a completed (partial) deepgram transcript');
bufferedTranscripts.push(evt);
this._bufferedTranscripts.push(evt);
}
}
@@ -515,8 +511,10 @@ class TaskTranscribe extends SttTask {
}
}
async _startFallback(cs, _ep, evt) {
if (this.canFallback) {
async _onJambonzError(cs, _ep, evt) {
this.logger.info({evt}, 'TaskTranscribe:_onJambonzError');
if (this.paused) return;
if (this.isHandledByPrimaryProvider && this.fallbackVendor) {
_ep.stopTranscription({
vendor: this.vendor,
bugname: this.bugname
@@ -524,57 +522,37 @@ class TaskTranscribe extends SttTask {
.catch((err) => this.logger.error({err}, `Error stopping transcription for primary vendor ${this.vendor}`));
const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, cs.srf);
try {
this.notifyError({ msg: 'ASR error',
details:`STT Vendor ${this.vendor} error: ${evt.error || evt.reason}`, failover: 'in progress'});
await this._initFallback();
await this._fallback();
let channel = 1;
if (this.ep !== _ep) {
channel = 2;
}
this[`_speechHandlersSet_${channel}`] = false;
this._startTranscribing(cs, _ep, channel);
updateSpeechCredentialLastUsed(this.sttCredentials.speech_credential_sid);
return true;
return;
} catch (error) {
this.notifyError({ msg: 'ASR error',
details:`STT Vendor ${this.vendor} error: ${evt.error || evt.reason}`, failover: 'not available'});
this.logger.info({error}, `There is error while falling back to ${this.fallbackVendor}`);
}
} else {
this.logger.debug('transcribe:_startFallback no condition for falling back');
this.notifyError({ msg: 'ASR error',
details:`STT Vendor ${this.vendor} error: ${evt.error || evt.reason}`, failover: 'not available'});
}
return false;
}
const {writeAlerts, AlertType} = cs.srf.locals;
async _onJambonzError(cs, _ep, evt) {
if (this.vendor === 'google' && evt.error_code === 0) {
this.logger.info({evt}, 'TaskTranscribe:_onJambonzError - ignoring google error code 0');
return;
}
this.logger.info({evt}, 'TaskTranscribe:_onJambonzError');
if (this.paused) return;
const {writeAlerts, AlertType} = cs.srf.locals;
if (this.vendor === 'nuance') {
const {code, error} = evt;
if (code === 404 && error === 'No speech') return this._resolve('timeout');
if (code === 413 && error === 'Too much speech') return this._resolve('timeout');
}
this.logger.info({evt}, 'TaskTranscribe:_onJambonzError');
writeAlerts({
account_sid: cs.accountSid,
alert_type: AlertType.STT_FAILURE,
message: `Custom speech vendor ${this.vendor} error: ${evt.error}`,
vendor: this.vendor,
}).catch((err) => this.logger.info({err}, 'Error generating alert for jambonz custom connection failure'));
if (!(await this._startFallback(cs, _ep, evt))) {
this.notifyTaskDone();
if (this.vendor === 'nuance') {
const {code, error} = evt;
if (code === 404 && error === 'No speech') return this._resolve('timeout');
if (code === 413 && error === 'Too much speech') return this._resolve('timeout');
}
this.logger.info({evt}, 'TaskTranscribe:_onJambonzError');
writeAlerts({
account_sid: cs.accountSid,
alert_type: AlertType.STT_FAILURE,
message: `Custom speech vendor ${this.vendor} error: ${evt.error}`,
vendor: this.vendor,
}).catch((err) => this.logger.info({err}, 'Error generating alert for jambonz custom connection failure'));
this.notifyError({msg: 'ASR error', details:`Custom speech vendor ${this.vendor} error: ${evt.error}`});
}
}
async _onVendorConnectFailure(cs, _ep, channel, evt) {
_onVendorConnectFailure(cs, _ep, channel, evt) {
super._onVendorConnectFailure(cs, _ep, evt);
if (this.childSpan[channel - 1] && this.childSpan[channel - 1].span) {
this.childSpan[channel - 1].span.setAttributes({
@@ -583,9 +561,7 @@ class TaskTranscribe extends SttTask {
});
this.childSpan[channel - 1].span.end();
}
if (!(await this._startFallback(cs, _ep, evt))) {
this.notifyTaskDone();
}
this.notifyTaskDone();
}
_startAsrTimer(channel) {
@@ -594,9 +570,8 @@ class TaskTranscribe extends SttTask {
this._clearAsrTimer(channel);
this._asrTimer = setTimeout(() => {
this.logger.debug(`TaskTranscribe:_startAsrTimer - asr timer went off for channel: ${channel}`);
const evt = this.consolidateTranscripts(
this._bufferedTranscripts[channel - 1], channel, this.language, this.vendor);
this._bufferedTranscripts[channel - 1] = [];
const evt = this.consolidateTranscripts(this._bufferedTranscripts, channel, this.language, this.vendor);
this._bufferedTranscripts = [];
this._resolve(channel, evt);
}, this.asrTimeout);
this.logger.debug(`TaskTranscribe:_startAsrTimer: set for ${this.asrTimeout}ms for channel ${channel}`);

View File

@@ -270,7 +270,7 @@ const normalizeDeepgram = (evt, channel, language, shortUtterance) => {
language_code: language,
channel_tag: channel,
is_final: shortUtterance ? evt.is_final : evt.speech_final,
alternatives: alternatives.length ? [alternatives[0]] : [],
alternatives: [alternatives[0]],
vendor: {
name: 'deepgram',
evt: copy

View File

@@ -56,12 +56,6 @@ class WsRequestor extends BaseRequestor {
}
if (type === 'session:new') this.call_sid = params.callSid;
if (type === 'session:reconnect') {
this._reconnectPromise = new Promise((resolve, reject) => {
this._reconnectResolve = resolve;
this._reconnectReject = reject;
});
}
/* if we have an absolute url, and it is http then do a standard webhook */
if (this._isAbsoluteUrl(url) && url.startsWith('http')) {
@@ -77,23 +71,20 @@ class WsRequestor extends BaseRequestor {
}
/* connect if necessary */
const queueMsg = () => {
this.logger.debug(
`WsRequestor:request(${this.id}) - queueing ${type} message since we are connecting`);
if (wantsAck) {
const p = new Promise((resolve, reject) => {
this.queuedMsg.push({type, hook, params, httpHeaders, promise: {resolve, reject}});
});
return p;
}
else {
this.queuedMsg.push({type, hook, params, httpHeaders});
}
return;
};
if (!this.ws) {
if (this.connectInProgress) {
return queueMsg();
this.logger.debug(
`WsRequestor:request(${this.id}) - queueing ${type} message since we are connecting`);
if (wantsAck) {
const p = new Promise((resolve, reject) => {
this.queuedMsg.push({type, hook, params, httpHeaders, promise: {resolve, reject}});
});
return p;
}
else {
this.queuedMsg.push({type, hook, params, httpHeaders});
}
return;
}
this.connectInProgress = true;
this.logger.debug(`WsRequestor:request(${this.id}) - connecting since we do not have a connection for ${type}`);
@@ -111,10 +102,6 @@ class WsRequestor extends BaseRequestor {
return Promise.reject(err);
}
}
// If jambonz wait for ack from reconnect, queue the msg until reconnect is acked
if (type !== 'session:reconnect' && this._reconnectPromise) {
return queueMsg();
}
assert(this.ws);
/* prepare and send message */
@@ -152,18 +139,6 @@ class WsRequestor extends BaseRequestor {
}
};
const rejectQueuedMsgs = (err) => {
if (this.queuedMsg.length > 0) {
for (const {promise} of this.queuedMsg) {
this.logger.debug(`WsRequestor:request - preparing queued ${type} for rejectQueuedMsgs`);
if (promise) {
promise.reject(err);
}
}
this.queuedMsg.length = 0;
}
};
//this.logger.debug({obj}, `websocket: sending (${url})`);
/* special case: reconnecting before we received ack to session:new */
@@ -204,37 +179,16 @@ class WsRequestor extends BaseRequestor {
this.logger.debug({response}, `WsRequestor:request ${url} succeeded in ${rtt}ms`);
this.stats.histogram('app.hook.ws_response_time', rtt, ['hook_type:app']);
resolve(response);
if (this._reconnectResolve) {
this._reconnectResolve();
}
},
failure: (err) => {
if (this._reconnectReject) {
this._reconnectReject(err);
}
clearTimeout(timer);
reject(err);
}
});
/* send the message */
this.ws.send(JSON.stringify(obj), async() => {
this.ws.send(JSON.stringify(obj), () => {
this.logger.debug({obj}, `WsRequestor:request websocket: sent (${url})`);
// If session:reconnect is waiting for ack, hold here until ack to send queuedMsgs
if (this._reconnectPromise) {
try {
await this._reconnectPromise;
} catch (err) {
// bad thing happened to session:recconnect
rejectQueuedMsgs(err);
this.emit('reconnect-error');
return;
} finally {
this._reconnectPromise = null;
this._reconnectResolve = null;
this._reconnectReject = null;
}
}
sendQueuedMsgs();
});
});