Transcribe background task (#576)

* first draft

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* update verb-specification

* fix comment reviews

* provide bugname when stopping transcription, otherwise it will continue

---------

Co-authored-by: Dave Horton <daveh@beachdognet.com>
This commit is contained in:
Hoan Luu Huu
2023-12-27 09:50:51 +07:00
committed by GitHub
parent 3e8474867f
commit 9d8291f892
9 changed files with 342 additions and 176 deletions

View File

@@ -20,10 +20,8 @@ const WsRequestor = require('../utils/ws-requestor');
const {
JAMBONES_INJECT_CONTENT,
AWS_REGION,
JAMBONZ_RECORD_WS_BASE_URL,
JAMBONZ_RECORD_WS_USERNAME,
JAMBONZ_RECORD_WS_PASSWORD,
} = require('../config');
const BackgroundTaskManager = require('../utils/background-task-manager');
const BADPRECONDITIONS = 'preconditions not met';
const CALLER_CANCELLED_ERR_MSG = 'Response not sent due to unknown transaction';
@@ -66,6 +64,11 @@ class CallSession extends Emitter {
this.callGone = false;
this.notifiedComplete = false;
this.rootSpan = rootSpan;
this.backgroundTaskManager = new BackgroundTaskManager({
cs: this,
logger,
rootSpan
});
this._origRecognizerSettings = {
vendor: this.application?.speech_recognizer_vendor,
@@ -136,8 +139,7 @@ class CallSession extends Emitter {
}
get isBackGroundListen() {
return !(this.backgroundListenTask === null ||
this.backgroundListenTask === undefined);
return this.backgroundTaskManager.isTaskRunning('listen');
}
/**
@@ -379,11 +381,11 @@ class CallSession extends Emitter {
}
get isBotModeEnabled() {
return this.backgroundGatherTask;
return this.backgroundTaskManager.isTaskRunning('bargeIn');
}
get isListenEnabled() {
return this.backgroundListenTask;
return this.backgroundTaskManager.isTaskRunning('listen');
}
get b3() {
@@ -612,117 +614,41 @@ class CallSession extends Emitter {
}
}
async startBackgroundListen(opts, bugname) {
if (this.isListenEnabled) {
this.logger.info('CallSession:startBackgroundListen - listen is already enabled, ignoring request');
return;
}
try {
this.logger.debug({opts}, 'CallSession:startBackgroundListen');
const t = normalizeJambones(this.logger, [opts]);
this.backgroundListenTask = makeTask(this.logger, t[0]);
this.backgroundListenTask.bugname = bugname;
// Remove unneeded customer data to be sent to api server.
this.backgroundListenTask.ignoreCustomerData = true;
const resources = await this._evaluatePreconditions(this.backgroundListenTask);
const {span, ctx} = this.rootSpan.startChildSpan(`background-listen:${this.backgroundListenTask.summary}`);
this.backgroundListenTask.span = span;
this.backgroundListenTask.ctx = ctx;
this.backgroundListenTask.exec(this, resources)
.then(() => {
this.logger.info('CallSession:startBackgroundListen: listen completed');
this.backgroundListenTask && this.backgroundListenTask.removeAllListeners();
this.backgroundListenTask && this.backgroundListenTask.span.end();
this.backgroundListenTask = null;
return;
})
.catch((err) => {
this.logger.info({err}, 'CallSession:startBackgroundListen: listen threw error');
this.backgroundListenTask && this.backgroundListenTask.removeAllListeners();
this.backgroundListenTask && this.backgroundListenTask.span.end();
this.backgroundListenTask = null;
});
} catch (err) {
this.logger.info({err, opts}, 'CallSession:startBackgroundListen - Error creating listen task');
}
}
async stopBackgroundListen() {
this.logger.debug('CallSession:stopBackgroundListen');
try {
if (this.backgroundListenTask) {
this.backgroundListenTask.removeAllListeners();
this.backgroundListenTask.kill().catch(() => {});
}
} catch (err) {
this.logger.info({err}, 'CallSession:stopBackgroundListen - Error stopping listen task');
}
}
async enableBotMode(gather, autoEnable) {
try {
const t = normalizeJambones(this.logger, [gather]);
const task = makeTask(this.logger, t[0]);
let task;
if (this.isBotModeEnabled) {
const currInput = this.backgroundGatherTask.input;
const newInput = task.input;
task = this.backgroundTaskManager.getTask('bargeIn');
const currInput = task.input;
const t = normalizeJambones(this.logger, [gather]);
const tmpTask = makeTask(this.logger, t[0]);
const newInput = tmpTask.input;
if (JSON.stringify(currInput) === JSON.stringify(newInput)) {
this.logger.info('CallSession:enableBotMode - bot mode currently enabled, ignoring request to start again');
return;
}
else {
} else {
this.logger.info({currInput, newInput},
'CallSession:enableBotMode - restarting background gather to apply new input type');
this.backgroundGatherTask.sticky = false;
'CallSession:enableBotMode - restarting background bargeIn to apply new input type');
task.sticky = false;
await this.disableBotMode();
}
}
this.backgroundGatherTask = task;
this._bargeInEnabled = true;
this.backgroundGatherTask
.once('dtmf', this._clearTasks.bind(this, this.backgroundGatherTask))
.once('vad', this._clearTasks.bind(this, this.backgroundGatherTask))
.once('transcription', this._clearTasks.bind(this, this.backgroundGatherTask))
.once('timeout', this._clearTasks.bind(this, this.backgroundGatherTask));
this.logger.info({gather}, 'CallSession:enableBotMode - starting background gather');
const resources = await this._evaluatePreconditions(this.backgroundGatherTask);
const {span, ctx} = this.rootSpan.startChildSpan(`background-gather:${this.backgroundGatherTask.summary}`);
this.backgroundGatherTask.span = span;
this.backgroundGatherTask.ctx = ctx;
this.backgroundGatherTask.sticky = autoEnable;
this.backgroundGatherTask.exec(this, resources)
.then(() => {
this.logger.info('CallSession:enableBotMode: gather completed');
this.backgroundGatherTask && this.backgroundGatherTask.removeAllListeners();
this.backgroundGatherTask && this.backgroundGatherTask.span.end();
const sticky = this.backgroundGatherTask?.sticky;
this.backgroundGatherTask = null;
if (sticky && !this.callGone && !this._stopping && this._bargeInEnabled) {
this.logger.info('CallSession:enableBotMode: restarting background gather');
setImmediate(() => this.enableBotMode(gather, true));
}
return;
})
.catch((err) => {
this.logger.info({err}, 'CallSession:enableBotMode: gather threw error');
this.backgroundGatherTask && this.backgroundGatherTask.removeAllListeners();
this.backgroundGatherTask && this.backgroundGatherTask.span.end();
this.backgroundGatherTask = null;
});
task = await this.backgroundTaskManager.newTask('bargeIn', gather);
task.sticky = autoEnable;
task.once('bargeIn-done', () => {
if (this.requestor instanceof WsRequestor) {
try {
this.kill(true);
} catch (err) {}
}
});
this.logger.info({gather}, 'CallSession:enableBotMode - starting background bargeIn');
} catch (err) {
this.logger.info({err, gather}, 'CallSession:enableBotMode - Error creating gather task');
this.logger.info({err, gather}, 'CallSession:enableBotMode - Error creating bargeIn task');
}
}
async disableBotMode() {
this._bargeInEnabled = false;
if (this.backgroundGatherTask) {
try {
this.backgroundGatherTask.removeAllListeners();
await this.backgroundGatherTask.kill();
} catch (err) {}
this.backgroundGatherTask = null;
}
this.backgroundTaskManager.stop('bargeIn');
}
setConferenceDetails(memberId, confName, confUuid) {
@@ -906,7 +832,7 @@ class CallSession extends Emitter {
let skip = false;
this.currentTask = task;
if (TaskName.Gather === task.name && this.isBotModeEnabled) {
if (this.backgroundGatherTask.updateTaskInProgress(task) !== false) {
if (this.backgroundTaskManager.getTask('bargeIn').updateTaskInProgress(task) !== false) {
this.logger.info(`CallSession:exec skipping #${stackNum}:${taskNum}: ${task.name}`);
skip = true;
}
@@ -942,7 +868,7 @@ class CallSession extends Emitter {
this.requestor instanceof WsRequestor &&
!this.requestor.closedGracefully &&
!this.callGone &&
!this.disableWaitCommand
!this.isConfirmCallSession
) {
try {
await this._awaitCommandsOrHangup();
@@ -1394,7 +1320,6 @@ Duration=${duration} `
get the full transcription.
*/
delete t.bargeIn.enable;
this._bargeInEnabled = false;
this.logger.info('CallSession:kill - found bargein disabled in the stack, clearing to that point');
break;
}
@@ -1700,6 +1625,8 @@ Duration=${duration} `
this.notifier && this.notifier.close();
this.rootSpan && this.rootSpan.end();
// close all background tasks
this.backgroundTaskManager.stopAll();
}
/**
@@ -2016,12 +1943,14 @@ Duration=${duration} `
async _notifyCallStatusChange({callStatus, sipStatus, sipReason, duration}) {
if (this.callMoved) return;
// manage record all call.
if (callStatus === CallStatus.InProgress) {
// nice, call is in progress, good time to enable record
await this.enableRecordAllCall();
} else if (callStatus == CallStatus.Completed && this.isBackGroundListen) {
this.stopBackgroundListen().catch((err) => this.logger.error(
{err}, 'CallSession:_notifyCallStatusChange - error stopping background listen'));
if (this.accountInfo.account.record_all_calls ||
this.application.record_all_calls) {
this.backgroundTaskManager.newTask('record');
}
} else if (callStatus == CallStatus.Completed) {
this.backgroundTaskManager.stop('record');
}
/* race condition: we hang up at the same time as the caller */
@@ -2058,29 +1987,6 @@ Duration=${duration} `
}
}
async enableRecordAllCall() {
if (this.accountInfo.account.record_all_calls || this.application.record_all_calls) {
if (!JAMBONZ_RECORD_WS_BASE_URL || !this.accountInfo.account.bucket_credential) {
this.logger.error('Record all calls: invalid configuration');
return;
}
const listenOpts = {
url: `${JAMBONZ_RECORD_WS_BASE_URL}/record/${this.accountInfo.account.bucket_credential.vendor}`,
disableBidirectionalAudio: true,
mixType : 'stereo',
passDtmf: true
};
if (JAMBONZ_RECORD_WS_USERNAME && JAMBONZ_RECORD_WS_PASSWORD) {
listenOpts.wsAuth = {
username: JAMBONZ_RECORD_WS_USERNAME,
password: JAMBONZ_RECORD_WS_PASSWORD
};
}
this.logger.debug({listenOpts}, 'Record all calls: enabling listen');
await this.startBackgroundListen({verb: 'listen', ...listenOpts}, 'jambonz-session-record');
}
}
_configMsEndpoint() {
if (this.onHoldMusic) {
this.ep.set({hold_music: `shout://${this.onHoldMusic.replace(/^https?:\/\//, '')}`});
@@ -2123,6 +2029,18 @@ Duration=${duration} `
} catch (err) {}
}
}
/**
* startBackgroundTask - Start background task
*/
async startBackgroundTask(type, opts) {
await this.backgroundTaskManager.newTask(type, opts);
}
stopBackgroundTask(type) {
this.backgroundTaskManager.stop(type);
}
}
module.exports = CallSession;

View File

@@ -23,7 +23,6 @@ class ConfirmCallSession extends CallSession {
});
this.dlg = dlg;
this.ep = ep;
this.disableWaitCommand = true;
}
/**

View File

@@ -9,7 +9,8 @@ class TaskConfig extends Task {
'recognizer',
'bargeIn',
'record',
'listen'
'listen',
'transcribe'
].forEach((k) => this[k] = this.data[k] || {});
if ('notifyEvents' in this.data) {
@@ -30,6 +31,13 @@ class TaskConfig extends Task {
if (this.bargeIn[k]) this.gatherOpts[k] = this.bargeIn[k];
});
}
if (this.transcribe?.enable) {
this.transcribeOpts = {
verb: 'transcribe',
...this.transcribe
};
delete this.transcribeOpts.enable;
}
if (this.data.reset) {
if (typeof this.data.reset === 'string') this.data.reset = [this.data.reset];
@@ -37,7 +45,11 @@ class TaskConfig extends Task {
else this.data.reset = [];
if (this.bargeIn.sticky) this.autoEnable = true;
this.preconditions = (this.bargeIn.enable || this.record?.action || this.listen?.url || this.data.amd) ?
this.preconditions = (this.bargeIn.enable ||
this.record?.action ||
this.listen?.url ||
this.data.amd ||
this.transcribe?.enable) ?
TaskPreconditions.Endpoint :
TaskPreconditions.None;
@@ -50,6 +62,7 @@ class TaskConfig extends Task {
get hasRecognizer() { return Object.keys(this.recognizer).length; }
get hasRecording() { return Object.keys(this.record).length; }
get hasListen() { return Object.keys(this.listen).length; }
get hasTranscribe() { return Object.keys(this.transcribe).length; }
get summary() {
const phrase = [];
@@ -72,6 +85,9 @@ class TaskConfig extends Task {
if (this.hasListen) {
phrase.push(this.listen.enable ? `listen ${this.listen.url}` : 'stop listen');
}
if (this.hasTranscribe) {
phrase.push(this.transcribe.enable ? `transcribe ${this.transcribe.transcriptionHook}` : 'stop transcribe');
}
if (this.data.amd) phrase.push('enable amd');
if (this.notifyEvents) phrase.push(`event notification ${this.notifyEvents ? 'on' : 'off'}`);
if (this.onHoldMusic) phrase.push(`onHoldMusic: ${this.onHoldMusic}`);
@@ -212,10 +228,25 @@ class TaskConfig extends Task {
const {enable, ...opts} = this.listen;
if (enable) {
this.logger.debug({opts}, 'Config: enabling listen');
cs.startBackgroundListen({verb: 'listen', ...opts});
cs.startBackgroundTask('listen', {verb: 'listen', ...opts});
} else {
this.logger.info('Config: disabling listen');
cs.stopBackgroundListen();
cs.stopBackgroundTask('listen');
}
}
if (this.hasTranscribe) {
if (this.transcribe.enable) {
this.transcribeOpts.recognizer = this.hasRecognizer ?
this.recognizer :
{
vendor: cs.speechRecognizerVendor,
language: cs.speechRecognizerLanguage
};
this.logger.debug(this.transcribeOpts, 'Config: enabling transcribe');
cs.startBackgroundTask('transcribe', this.transcribeOpts);
} else {
this.logger.info('Config: disabling transcribe');
cs.stopBackgroundTask('transcribe');
}
}
if (this.data.sipRequestWithinDialogHook) {

View File

@@ -71,6 +71,7 @@ class TaskGather extends SttTask {
/* buffer speech for continuous asr */
this._bufferedTranscripts = [];
this.partialTranscriptsCount = 0;
this.bugname_prefix = 'gather_';
}
get name() { return TaskName.Gather; }
@@ -261,7 +262,10 @@ class TaskGather extends SttTask {
if (this.digitBuffer.length === 0 && this.needsStt) {
// DTMF is higher priority than STT.
this.removeSpeechListeners(ep);
ep.stopTranscription({vendor: this.vendor})
ep.stopTranscription({
vendor: this.vendor,
bugname: this.bugname,
})
.catch((err) => this.logger.error({err},
` Received DTMF, Error stopping transcription for vendor ${this.vendor}`));
}
@@ -300,7 +304,7 @@ class TaskGather extends SttTask {
const opts = this.setChannelVarsForStt(this, this.sttCredentials, this.data.recognizer);
switch (this.vendor) {
case 'google':
this.bugname = 'google_transcribe';
this.bugname = `${this.bugname_prefix}google_transcribe`;
ep.addCustomEventListener(GoogleTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(GoogleTranscriptionEvents.EndOfUtterance, this._onEndOfUtterance.bind(this, cs, ep));
ep.addCustomEventListener(GoogleTranscriptionEvents.VadDetected, this._onVadDetected.bind(this, cs, ep));
@@ -308,19 +312,19 @@ class TaskGather extends SttTask {
case 'aws':
case 'polly':
this.bugname = 'aws_transcribe';
this.bugname = `${this.bugname_prefix}aws_transcribe`;
ep.addCustomEventListener(AwsTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(AwsTranscriptionEvents.VadDetected, this._onVadDetected.bind(this, cs, ep));
break;
case 'microsoft':
this.bugname = 'azure_transcribe';
this.bugname = `${this.bugname_prefix}azure_transcribe`;
ep.addCustomEventListener(AzureTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(AzureTranscriptionEvents.NoSpeechDetected,
this._onNoSpeechDetected.bind(this, cs, ep));
ep.addCustomEventListener(AzureTranscriptionEvents.VadDetected, this._onVadDetected.bind(this, cs, ep));
break;
case 'nuance':
this.bugname = 'nuance_transcribe';
this.bugname = `${this.bugname_prefix}nuance_transcribe`;
ep.addCustomEventListener(NuanceTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(NuanceTranscriptionEvents.StartOfSpeech,
@@ -337,7 +341,7 @@ class TaskGather extends SttTask {
break;
case 'deepgram':
this.bugname = 'deepgram_transcribe';
this.bugname = `${this.bugname_prefix}deepgram_transcribe`;
ep.addCustomEventListener(DeepgramTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(DeepgramTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(DeepgramTranscriptionEvents.ConnectFailure,
@@ -348,12 +352,12 @@ class TaskGather extends SttTask {
break;
case 'soniox':
this.bugname = 'soniox_transcribe';
this.bugname = `${this.bugname_prefix}soniox_transcribe`;
ep.addCustomEventListener(SonioxTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
break;
case 'cobalt':
this.bugname = 'cobalt_transcribe';
this.bugname = `${this.bugname_prefix}cobalt_transcribe`;
ep.addCustomEventListener(CobaltTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
/* cobalt doesnt have language, it has model, which is required */
@@ -383,7 +387,7 @@ class TaskGather extends SttTask {
break;
case 'ibm':
this.bugname = 'ibm_transcribe';
this.bugname = `${this.bugname_prefix}ibm_transcribe`;
ep.addCustomEventListener(IbmTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(IbmTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(IbmTranscriptionEvents.ConnectFailure,
@@ -391,7 +395,7 @@ class TaskGather extends SttTask {
break;
case 'nvidia':
this.bugname = 'nvidia_transcribe';
this.bugname = `${this.bugname_prefix}nvidia_transcribe`;
ep.addCustomEventListener(NvidiaTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(NvidiaTranscriptionEvents.StartOfSpeech,
@@ -408,7 +412,7 @@ class TaskGather extends SttTask {
break;
case 'assemblyai':
this.bugname = 'assemblyai_transcribe';
this.bugname = `${this.bugname_prefix}assemblyai_transcribe`;
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
@@ -418,7 +422,7 @@ class TaskGather extends SttTask {
break;
default:
if (this.vendor.startsWith('custom:')) {
this.bugname = `${this.vendor}_transcribe`;
this.bugname = `${this.bugname_prefix}${this.vendor}_transcribe`;
ep.addCustomEventListener(JambonzTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(JambonzTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
ep.addCustomEventListener(JambonzTranscriptionEvents.ConnectFailure,
@@ -752,7 +756,10 @@ class TaskGather extends SttTask {
async _onJambonzError(cs, ep, evt) {
this.logger.info({evt}, 'TaskGather:_onJambonzError');
if (this.isHandledByPrimaryProvider && this.fallbackVendor) {
ep.stopTranscription({vendor: this.vendor})
ep.stopTranscription({
vendor: this.vendor,
bugname: this.bugname
})
.catch((err) => this.logger.error({err}, `Error stopping transcription for primary vendor ${this.vendor}`));
const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, cs.srf);
try {
@@ -833,7 +840,10 @@ class TaskGather extends SttTask {
'stt.result': JSON.stringify(evt)
});
if (this.needsStt && this.ep && this.ep.connected) {
this.ep.stopTranscription({vendor: this.vendor})
this.ep.stopTranscription({
vendor: this.vendor,
bugname: this.bugname
})
.catch((err) => this.logger.error({err}, 'Error stopping transcription'));
}

View File

@@ -49,6 +49,8 @@ class SttTask extends Task {
/* buffer for soniox transcripts */
this._sonioxTranscripts = [];
/*bug name prefix */
this.bugname_prefix = '';
}

View File

@@ -33,13 +33,14 @@ class TaskTranscribe extends SttTask {
this.childSpan = [null, null];
// Continuos asr timeout
// Continuous asr timeout
this.asrTimeout = typeof this.data.recognizer.asrTimeout === 'number' ? this.data.recognizer.asrTimeout * 1000 : 0;
if (this.asrTimeout > 0) {
this.isContinuousAsr = true;
}
/* buffer speech for continuous asr */
this._bufferedTranscripts = [];
this.bugname_prefix = 'transcribe_';
}
get name() { return TaskName.Transcribe; }
@@ -86,7 +87,10 @@ class TaskTranscribe extends SttTask {
let stopTranscription = false;
if (this.ep?.connected) {
stopTranscription = true;
this.ep.stopTranscription({vendor: this.vendor})
this.ep.stopTranscription({
vendor: this.vendor,
bugname: this.bugname
})
.catch((err) => this.logger.info(err, 'Error TaskTranscribe:kill'));
}
if (this.separateRecognitionPerChannel && this.ep2 && this.ep2.connected) {
@@ -137,7 +141,7 @@ class TaskTranscribe extends SttTask {
const opts = this.setChannelVarsForStt(this, this.sttCredentials, this.data.recognizer);
switch (this.vendor) {
case 'google':
this.bugname = 'google_transcribe';
this.bugname = `${this.bugname_prefix}google_transcribe`;
ep.addCustomEventListener(GoogleTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(GoogleTranscriptionEvents.NoAudioDetected,
@@ -148,7 +152,7 @@ class TaskTranscribe extends SttTask {
case 'aws':
case 'polly':
this.bugname = 'aws_transcribe';
this.bugname = `${this.bugname_prefix}aws_transcribe`;
ep.addCustomEventListener(AwsTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(AwsTranscriptionEvents.NoAudioDetected,
@@ -157,19 +161,19 @@ class TaskTranscribe extends SttTask {
this._onMaxDurationExceeded.bind(this, cs, ep, channel));
break;
case 'microsoft':
this.bugname = 'azure_transcribe';
this.bugname = `${this.bugname_prefix}azure_transcribe`;
ep.addCustomEventListener(AzureTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(AzureTranscriptionEvents.NoSpeechDetected,
this._onNoAudio.bind(this, cs, ep, channel));
break;
case 'nuance':
this.bugname = 'nuance_transcribe';
this.bugname = `${this.bugname_prefix}nuance_transcribe`;
ep.addCustomEventListener(NuanceTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
break;
case 'deepgram':
this.bugname = 'deepgram_transcribe';
this.bugname = `${this.bugname_prefix}deepgram_transcribe`;
ep.addCustomEventListener(DeepgramTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(DeepgramTranscriptionEvents.Connect,
@@ -182,12 +186,12 @@ class TaskTranscribe extends SttTask {
break;
case 'soniox':
this.bugname = 'soniox_transcribe';
this.bugname = `${this.bugname_prefix}soniox_transcribe`;
ep.addCustomEventListener(SonioxTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
break;
case 'cobalt':
this.bugname = 'cobalt_transcribe';
this.bugname = `${this.bugname_prefix}cobalt_transcribe`;
ep.addCustomEventListener(CobaltTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
@@ -217,7 +221,7 @@ class TaskTranscribe extends SttTask {
break;
case 'ibm':
this.bugname = 'ibm_transcribe';
this.bugname = `${this.bugname_prefix}ibm_transcribe`;
ep.addCustomEventListener(IbmTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(IbmTranscriptionEvents.Connect,
@@ -227,13 +231,13 @@ class TaskTranscribe extends SttTask {
break;
case 'nvidia':
this.bugname = 'nvidia_transcribe';
this.bugname = `${this.bugname_prefix}nvidia_transcribe`;
ep.addCustomEventListener(NvidiaTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
break;
case 'assemblyai':
this.bugname = 'assemblyai_transcribe';
this.bugname = `${this.bugname_prefix}assemblyai_transcribe`;
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(AssemblyAiTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
@@ -244,7 +248,7 @@ class TaskTranscribe extends SttTask {
default:
if (this.vendor.startsWith('custom:')) {
this.bugname = `${this.vendor}_transcribe`;
this.bugname = `${this.bugname_prefix}${this.vendor}_transcribe`;
ep.addCustomEventListener(JambonzTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
ep.addCustomEventListener(JambonzTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
@@ -275,6 +279,8 @@ class TaskTranscribe extends SttTask {
}
async _transcribe(ep) {
this.logger.debug(
`TaskTranscribe:_transcribe - starting transcription vendor ${this.vendor} bugname ${this.bugname}`);
await ep.startTranscription({
vendor: this.vendor,
interim: this.interim ? true : false,
@@ -434,7 +440,10 @@ class TaskTranscribe extends SttTask {
async _onJambonzError(cs, _ep, evt) {
this.logger.info({evt}, 'TaskTranscribe:_onJambonzError');
if (this.isHandledByPrimaryProvider && this.fallbackVendor) {
_ep.stopTranscription({vendor: this.vendor})
_ep.stopTranscription({
vendor: this.vendor,
bugname: this.bugname
})
.catch((err) => this.logger.error({err}, `Error stopping transcription for primary vendor ${this.vendor}`));
const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, cs.srf);
try {

View File

@@ -0,0 +1,197 @@
const { normalizeJambones } = require('@jambonz/verb-specifications');
const makeTask = require('../tasks/make_task');
const { JAMBONZ_RECORD_WS_BASE_URL, JAMBONZ_RECORD_WS_USERNAME, JAMBONZ_RECORD_WS_PASSWORD } = require('../config');
const Emitter = require('events');
class BackgroundTaskManager extends Emitter {
constructor({cs, logger, rootSpan}) {
super();
this.tasks = new Map();
this.cs = cs;
this.logger = logger;
this.rootSpan = rootSpan;
}
isTaskRunning(type) {
return this.tasks.has(type);
}
getTask(type) {
if (this.tasks.has(type)) {
return this.tasks.get(type);
}
}
count() {
return this.tasks.size;
}
async newTask(type, taskOpts) {
this.logger.info({taskOpts}, `initiating Background task ${type}`);
if (this.tasks.has(type)) {
this.logger.info(`Background task ${type} is running, skiped`);
return;
}
let task;
switch (type) {
case 'listen':
task = await this._initListen(taskOpts);
break;
case 'bargeIn':
task = await this._initBargeIn(taskOpts);
break;
case 'record':
task = await this._initRecord();
break;
case 'transcribe':
task = await this._initTranscribe(taskOpts);
break;
default:
break;
}
if (task) {
this.tasks.set(type, task);
}
return task;
}
stop(type) {
const task = this.getTask(type);
if (task) {
this.logger.info(`stopping background task: ${type}`);
task.removeAllListeners();
task.span.end();
task.kill();
// Remove task from managed List
this.tasks.delete(type);
} else {
this.logger.info(`stopping background task, ${type} is not running, skipped`);
}
}
stopAll() {
this.logger.info('BackgroundTaskManager:stopAll');
for (const key of this.tasks.keys()) {
this.stop(key);
}
}
// Initiate Task
// Initiate Listen
async _initListen(opts, bugname = 'jambonz-background-listen', ignoreCustomerData = false, type = 'listen') {
let task;
try {
const t = normalizeJambones(this.logger, [opts]);
task = makeTask(this.logger, t[0]);
task.bugname = bugname;
task.ignoreCustomerData = ignoreCustomerData;
const resources = await this.cs._evaluatePreconditions(task);
const {span, ctx} = this.rootSpan.startChildSpan(`background-${type}:${task.summary}`);
task.span = span;
task.ctx = ctx;
task.exec(this.cs, resources)
.then(this._taskCompleted.bind(this, type, task))
.catch(this._taskError.bind(this, type, task));
} catch (err) {
this.logger.info({err, opts}, `BackgroundTaskManager:_initListen - Error creating ${bugname} task`);
}
return task;
}
// Initiate Gather
async _initBargeIn(opts) {
let task;
try {
const t = normalizeJambones(this.logger, [opts]);
task = makeTask(this.logger, t[0]);
task
.once('dtmf', this._bargeInTaskCompleted.bind(this))
.once('vad', this._bargeInTaskCompleted.bind(this))
.once('transcription', this._bargeInTaskCompleted.bind(this))
.once('timeout', this._bargeInTaskCompleted.bind(this));
const resources = await this.cs._evaluatePreconditions(task);
const {span, ctx} = this.rootSpan.startChildSpan(`background-bargeIn:${task.summary}`);
task.span = span;
task.ctx = ctx;
task.bugname_prefix = 'background_bargeIn_';
task.exec(this.cs, resources)
.then(() => {
this._taskCompleted('bargeIn', task);
if (task.sticky && !this.cs.callGone && !this.cs._stopping) {
this.logger.info('BackgroundTaskManager:_initBargeIn: restarting background bargeIn');
this.newTask('bargeIn', opts);
}
return;
})
.catch(this._taskError.bind(this, 'bargeIn', task));
} catch (err) {
this.logger.info(err, 'BackgroundTaskManager:_initGather - Error creating bargeIn task');
}
return task;
}
// Initiate Record
async _initRecord() {
if (this.cs.accountInfo.account.record_all_calls || this.cs.application.record_all_calls) {
if (!JAMBONZ_RECORD_WS_BASE_URL || !this.cs.accountInfo.account.bucket_credential) {
this.logger.error(`_initRecord: invalid configuration,
missing JAMBONZ_RECORD_WS_BASE_URL or bucket configuration`);
return undefined;
}
const listenOpts = {
url: `${JAMBONZ_RECORD_WS_BASE_URL}/record/${this.cs.accountInfo.account.bucket_credential.vendor}`,
disableBidirectionalAudio: true,
mixType : 'stereo',
passDtmf: true
};
if (JAMBONZ_RECORD_WS_USERNAME && JAMBONZ_RECORD_WS_PASSWORD) {
listenOpts.wsAuth = {
username: JAMBONZ_RECORD_WS_USERNAME,
password: JAMBONZ_RECORD_WS_PASSWORD
};
}
this.logger.debug({listenOpts}, '_initRecord: enabling listen');
return await this._initListen({verb: 'listen', ...listenOpts}, 'jambonz-session-record', true, 'record');
}
}
// Initiate Transcribe
async _initTranscribe(opts) {
let task;
try {
const t = normalizeJambones(this.logger, [opts]);
task = makeTask(this.logger, t[0]);
const resources = await this.cs._evaluatePreconditions(task);
const {span, ctx} = this.rootSpan.startChildSpan(`background-transcribe:${task.summary}`);
task.span = span;
task.ctx = ctx;
task.bugname_prefix = 'background_transcribe_';
task.exec(this.cs, resources)
.then(this._taskCompleted.bind(this, 'transcribe', task))
.catch(this._taskError.bind(this, 'transcribe', task));
} catch (err) {
this.logger.info(err, 'BackgroundTaskManager:_initTranscribe - Error creating transcribe task');
}
return task;
}
_taskCompleted(type, task) {
this.logger.info({type, task}, 'BackgroundTaskManager:_taskCompleted: task completed');
task.removeAllListeners();
task.span.end();
this.tasks.delete(type);
}
_taskError(type, task, error) {
this.logger.info({type, task, error}, 'BackgroundTaskManager:_taskError: task Error');
task.removeAllListeners();
task.span.end();
this.tasks.delete(type);
}
_bargeInTaskCompleted(evt) {
this.logger.info({evt}, 'BackgroundTaskManager:_bargeInTaskCompleted on event from background bargeIn');
this.emit('bargeIn-done', evt);
}
}
module.exports = BackgroundTaskManager;

14
package-lock.json generated
View File

@@ -18,7 +18,7 @@
"@jambonz/speech-utils": "^0.0.33",
"@jambonz/stats-collector": "^0.1.9",
"@jambonz/time-series": "^0.2.8",
"@jambonz/verb-specifications": "^0.0.46",
"@jambonz/verb-specifications": "^0.0.50",
"@opentelemetry/api": "^1.4.0",
"@opentelemetry/exporter-jaeger": "^1.9.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.35.0",
@@ -4347,9 +4347,9 @@
}
},
"node_modules/@jambonz/verb-specifications": {
"version": "0.0.46",
"resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.46.tgz",
"integrity": "sha512-wf/Pb1Iyg+waepYv9z138dwIKR4V7gB/HRlpxjl5lzogFT8VxWCciKJYfDT+xrvZXPBDfMLCZNhmjmN4dZx12g==",
"version": "0.0.50",
"resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.50.tgz",
"integrity": "sha512-lJurJWbGbh8BgJEQ4mS4ViHG4d23yB0LnjnxTAB7OFsehs4gyS8L2bXHcgPckGCwc+h5HV6Aq85u01I5vJM9qA==",
"dependencies": {
"debug": "^4.3.4",
"pino": "^8.8.0"
@@ -15572,9 +15572,9 @@
}
},
"@jambonz/verb-specifications": {
"version": "0.0.46",
"resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.46.tgz",
"integrity": "sha512-wf/Pb1Iyg+waepYv9z138dwIKR4V7gB/HRlpxjl5lzogFT8VxWCciKJYfDT+xrvZXPBDfMLCZNhmjmN4dZx12g==",
"version": "0.0.50",
"resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.50.tgz",
"integrity": "sha512-lJurJWbGbh8BgJEQ4mS4ViHG4d23yB0LnjnxTAB7OFsehs4gyS8L2bXHcgPckGCwc+h5HV6Aq85u01I5vJM9qA==",
"requires": {
"debug": "^4.3.4",
"pino": "^8.8.0"

View File

@@ -34,7 +34,7 @@
"@jambonz/speech-utils": "^0.0.33",
"@jambonz/stats-collector": "^0.1.9",
"@jambonz/time-series": "^0.2.8",
"@jambonz/verb-specifications": "^0.0.46",
"@jambonz/verb-specifications": "^0.0.50",
"@opentelemetry/api": "^1.4.0",
"@opentelemetry/exporter-jaeger": "^1.9.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.35.0",