add queueing of tasks

This commit is contained in:
akirilyuk
2022-02-01 14:19:28 +01:00
parent db6f56a068
commit 28cde62d5d
4 changed files with 87 additions and 24 deletions

View File

@@ -3,6 +3,7 @@ const {TaskName, TaskPreconditions} = require('../../utils/constants');
const makeTask = require('../make_task');
const { SocketClient } = require('@cognigy/socket-client');
const SpeechConfig = require('./speech-config');
const queue = require('queue');
const parseGallery = (obj = {}) => {
const {_default} = obj;
@@ -48,6 +49,9 @@ class Cognigy extends Task {
this.prompts = [];
this.retry = {};
this.timeoutCount = 0;
// create a task queue so we can execute our taskss subsequently
// also executing tasks whenever they come in
this.taskQueue = queue({concurrency: 1, autostart: true});
}
get name() { return TaskName.Cognigy; }
@@ -56,6 +60,27 @@ class Cognigy extends Task {
return this.reportedFinalAction || this.isReplacingApplication;
}
async _enqueueTask(task) {
let resolver;
let rejector;
const taskPromise = new Promise(async(resolve, reject) => {
resolver = resolve;
rejector = reject;
});
this.taskQueue.push(async(cb) => {
try {
const result = await task.bind(this)();
resolver(result);
cb(result);
} catch (err) {
rejector(err);
cb(err);
}
});
this.taskQueue.lastPromise = taskPromise;
return taskPromise;
}
async exec(cs, ep) {
await super.exec(cs);
@@ -119,12 +144,16 @@ class Cognigy extends Task {
super.kill(cs);
this.logger.debug('Cognigy:kill');
this.removeAllListeners();
this.transcribeTask && this.transcribeTask.kill();
this.client.removeAllListeners();
if (this.client && this.client.connected) this.client.disconnect();
// end the task queue AFTER we have removed all listeneres since now we cannot get new stuff inside the queue
this.taskQueue.end();
if (!this.hasReportedFinalAction) {
this.reportedFinalAction = true;
this.performAction({cognigyResult: 'caller hungup'})
@@ -176,6 +205,17 @@ class Cognigy extends Task {
return refer;
}
/* if we need to interrupt the currently-running say task(s), call this */
_killSayTasks(ep) {
// this will also remove all other upcoming tasks after the say task
// maybe we need a flow to kill only one say tasks and keep others executitng need to discuss this further
// this.taskQueue.end();
if (ep && ep.connected) {
ep.api('uuid_break', this.ep.uuid)
.catch((err) => this.logger.info({err}, 'Cognigy:_killSayTasks - error killing audio for current say task'));
}
}
async _onBotError(cs, ep, evt) {
this.logger.info({evt}, 'Cognigy:_onBotError');
this.performAction({cognigyResult: 'botError', message: evt.message });
@@ -184,16 +224,10 @@ class Cognigy extends Task {
}
async _onBotFinalPing(cs, ep) {
this.logger.info({}, 'TEST FROM ALEX');
this.logger.info({prompts: this.prompts}, 'Cognigy:_onBotFinalPing');
if (this.prompts.length) {
const text = this.prompts.join('.');
if (text && !this.killed) {
this.gatherTask = this._makeGatherTask({textPrompt: text});
this.gatherTask.exec(cs, ep, this)
.catch((err) => this.logger.info({err}, 'Cognigy gather task returned error'));
}
}
this.gatherTask = this._makeGatherTask({textPrompt: ""});
this.gatherTask.exec(cs, ep, this)
.catch((err) => this.logger.info({err}, 'Cognigy gather task returned error'));
this.prompts = [];
}
@@ -215,22 +249,36 @@ class Cognigy extends Task {
});
}
const text = parseBotText(evt);
if (evt.data) this.config.update(evt.data);
if (text) {
await this._enqueueTask(async() => {
await this._makeSayTask(text);
});
}
if (evt && evt.data && evt.data.type) {
try {
switch (evt.data.type) {
case 'hangup':
await this._makeHangupTask(evt.data.reason);
this.performAction({cognigyResult: 'hangup Succeeded'});
this.reportedFinalAction = true;
this.notifyTaskDone();
this.kill(cs);
await this._enqueueTask(async() => {
await this._makeHangupTask(evt.data.reason);
this.performAction({cognigyResult: 'hangup Succeeded'});
this.reportedFinalAction = true;
this.notifyTaskDone();
this.kill(cs);
});
return;
case 'refer':
await this._makeReferTask(evt.data.number);
this.performAction({cognigyResult: 'refer succeeded'});
this.reportedFinalAction = true;
this.notifyTaskDone();
this.kill(cs);
await this._enqueueTask(async() => {
await this._makeReferTask(evt.data.number);
this.performAction({cognigyResult: 'refer succeeded'});
this.reportedFinalAction = true;
this.notifyTaskDone();
this.kill(cs);
});
return;
default:
break;
@@ -241,12 +289,7 @@ class Cognigy extends Task {
this.reportedFinalAction = true;
this.notifyTaskDone();
}
}
const text = parseBotText(evt);
if (evt.data) this.config.update(evt.data);
if (text) this.prompts.push(text);
}
async _onTranscription(cs, ep, evt) {