Feature/queue webhooks (#34)

* initial changes for queue webhooks

* send queue leave webhook when dequeued

* bugfix: if enqeue task is killed because it is being replaced with new app supplied by LCC, ignore any app returned from the actionHook as LCC takes precedence

* remove leftover merge brackets
This commit is contained in:
Dave Horton
2021-07-31 13:32:40 -04:00
committed by GitHub
parent 02f5efba48
commit 1a2aaf9845
6 changed files with 131 additions and 37 deletions

View File

@@ -3,7 +3,7 @@ const Emitter = require('events');
const ConfirmCallSession = require('../session/confirm-call-session');
const normalizeJambones = require('../utils/normalize-jambones');
const makeTask = require('./make_task');
const {TaskName, TaskPreconditions, QueueResults} = require('../utils/constants');
const {TaskName, TaskPreconditions, QueueResults, KillReason} = require('../utils/constants');
const bent = require('bent');
const assert = require('assert');
@@ -61,10 +61,11 @@ class TaskEnqueue extends Task {
}
}
async kill(cs) {
async kill(cs, reason) {
super.kill(cs);
this.logger.info(`TaskEnqueue:kill ${this.queueName}`);
this.emitter.emit('kill');
this.killReason = reason || KillReason.Hangup;
this.logger.info(`TaskEnqueue:kill ${this.queueName} with reason ${this.killReason}`);
this.emitter.emit('kill', reason || KillReason.Hangup);
this.notifyTaskDone();
}
@@ -76,11 +77,20 @@ class TaskEnqueue extends Task {
const members = await pushBack(this.queueName, url);
this.logger.info(`TaskEnqueue:_addToQueue: added to queue, length now ${members}`);
this.notifyUrl = url;
/* invoke account-level webhook for queue event notifications */
cs.performQueueWebhook({
event: 'join',
queue: this.data.name,
length: members,
joinTime: this.waitStartTime
});
}
async _removeFromQueue(cs, dlg) {
const {removeFromList} = cs.srf.locals.dbHelpers;
return await removeFromList(this.queueName, getUrl(cs));
async _removeFromQueue(cs) {
const {removeFromList, lengthOfList} = cs.srf.locals.dbHelpers;
await removeFromList(this.queueName, getUrl(cs));
return await lengthOfList(this.queueName);
}
async performAction() {
@@ -89,7 +99,7 @@ class TaskEnqueue extends Task {
queueTime: getElapsedTime(this.waitStartTime),
queueResult: this.state
};
await super.performAction(params);
await super.performAction(params, this.killReason !== KillReason.Replaced);
}
/**
@@ -109,8 +119,22 @@ class TaskEnqueue extends Task {
}
resolve(this._doBridge(cs, dlg, ep));
})
.once('kill', () => {
this._removeFromQueue(cs);
.once('kill', async() => {
/* invoke account-level webhook for queue event notifications */
if (!this.dequeued) {
try {
const members = await this._removeFromQueue(cs);
cs.performQueueWebhook({
event: 'leave',
queue: this.data.name,
length: members,
leaveReason: 'abandoned',
leaveTime: Date.now()
});
} catch (err) {}
}
if (this._playSession) {
this.logger.debug('killing waitUrl');
this._playSession.kill();
@@ -215,8 +239,9 @@ class TaskEnqueue extends Task {
ep.unbridge().catch((err) => {});
resolve();
})
.on('kill', () => {
this.logger.info('TaskEnqueue:_bridgeLocal ending with hangup from enqeue party');
.on('kill', (reason) => {
this.killReason = reason;
this.logger.info(`TaskEnqueue:_bridgeLocal ending with ${this.killReason}`);
ep.unbridge().catch((err) => {});
// notify partner that we dropped
@@ -242,12 +267,26 @@ class TaskEnqueue extends Task {
* @param {string} opts.epUuid uuid of the endpoint we need to bridge to
* @param {string} opts.dequeueSipAddress ip:port of the feature server hosting the other call
*/
notifyQueueEvent(cs, opts) {
async notifyQueueEvent(cs, opts) {
if (opts.event === 'dequeue') {
if (this.bridgeNow) return;
this.logger.info({opts}, `TaskEnqueue:notifyDequeueEvent: leaving ${this.queueName} because someone wants me`);
assert(opts.dequeueSipAddress && opts.epUuid && opts.notifyUrl);
this.emitter.emit('dequeue', opts);
try {
const {lengthOfList} = cs.srf.locals.dbHelpers;
const members = await lengthOfList(this.queueName);
this.dequeued = true;
cs.performQueueWebhook({
event: 'leave',
queue: this.data.name,
length: Math.max(members - 1, 0),
leaveReason: 'dequeued',
leaveTime: Date.now(),
dequeuer: opts.dequeuer
});
} catch (err) {}
}
else if (opts.event === 'hangup') {
this.emitter.emit('hangup');
@@ -277,7 +316,7 @@ class TaskEnqueue extends Task {
const json = await cs.application.requestor.request(hook, params);
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
const allowedTasks = tasks.filter((t) => allowed.includes(t.verb));
const allowedTasks = tasks.filter((t) => allowed.includes(t.name));
if (tasks.length !== allowedTasks.length) {
this.logger.debug({tasks, allowedTasks}, 'unsupported task');
throw new Error(`unsupported verb in enqueue waitHook: only ${JSON.stringify(allowed)}`);
@@ -288,7 +327,7 @@ class TaskEnqueue extends Task {
const tasksToRun = [];
let leave = false;
for (const o of tasks) {
if (o.verb === TaskName.Leave) {
if (o.name === TaskName.Leave) {
leave = true;
this.logger.info('waitHook returned a leave task');
break;
@@ -304,7 +343,7 @@ class TaskEnqueue extends Task {
dlg,
ep: cs.ep,
callInfo: cs.callInfo,
tasksToRun
tasks: tasksToRun
});
await this._playSession.exec();
this._playSession = null;