initial changes for queue webhooks

This commit is contained in:
Dave Horton
2021-06-17 15:06:52 -04:00
parent 02f5efba48
commit 8f7b5c959b
2 changed files with 73 additions and 8 deletions

View File

@@ -7,9 +7,16 @@ const sessionTracker = require('./session-tracker');
const makeTask = require('../tasks/make_task'); const makeTask = require('../tasks/make_task');
const normalizeJambones = require('../utils/normalize-jambones'); const normalizeJambones = require('../utils/normalize-jambones');
const listTaskNames = require('../utils/summarize-tasks'); const listTaskNames = require('../utils/summarize-tasks');
const Requestor = require('../utils/requestor');
const BADPRECONDITIONS = 'preconditions not met'; const BADPRECONDITIONS = 'preconditions not met';
const CALLER_CANCELLED_ERR_MSG = 'Response not sent due to unknown transaction'; const CALLER_CANCELLED_ERR_MSG = 'Response not sent due to unknown transaction';
const sqlRetrieveQueueEventHook = `SELECT * FROM webhooks
WHERE webhook_sid =
(
SELECT queue_event_hook_sid FROM accounts where account_sid = ?
)`;
/** /**
* @classdesc Represents the execution context for a call. * @classdesc Represents the execution context for a call.
* It holds the resources, such as the sip dialog and media server endpoint * It holds the resources, such as the sip dialog and media server endpoint
@@ -49,6 +56,8 @@ class CallSession extends Emitter {
if (!this.isConfirmCallSession && !this.isSmsCallSession && !this.isAdultingCallSession) { if (!this.isConfirmCallSession && !this.isSmsCallSession && !this.isAdultingCallSession) {
sessionTracker.add(this.callSid, this); sessionTracker.add(this.callSid, this);
} }
this._pool = srf.locals.dbHelpers.pool;
} }
/** /**
@@ -759,6 +768,41 @@ class CallSession extends Emitter {
return {ms: this.ms, ep: this.ep}; return {ms: this.ms, ep: this.ep};
} }
/**
* If account was queue event webhook, send notification
* @param {*} obj - data to notify
*/
async performQueueWebhook(obj) {
if (typeof this.queueEventHookRequestor === 'undefined') {
const pp = this._pool.promise();
try {
this.logger.info({accountSid: this.accountSid}, 'performQueueWebhook: looking up account');
const [r] = await pp.query(sqlRetrieveQueueEventHook, this.accountSid);
if (0 === r.length) {
this.logger.info({accountSid: this.accountSid}, 'performQueueWebhook: no webhook provisioned');
this.queueEventHookRequestor = null;
}
else {
this.logger.info({accountSid: this.accountSid, webhook: r[0]}, 'performQueueWebhook: webhook found');
this.queueEventHookRequestor = new Requestor(this.logger, r[0]);
this.queueEventHook = r[0];
}
} catch (err) {
this.logger.error({err, accountSid: this.accountSid}, 'Error retrieving event hook');
this.queueEventHookRequestor = null;
}
}
if (null === this.queueEventHookRequestor) return;
/* send webhook */
const params = {...obj, ...this.callInfo.toJSON()};
this.logger.info({accountSid: this.accountSid, params}, 'performQueueWebhook: sending webhook');
this.queueEventHookRequestor.request(this.queueEventHook, params)
.catch((err) => {
this.logger.info({err, accountSid: this.accountSid, obj}, 'Error sending queue notification event');
});
}
/** /**
* A conference that the current task is waiting on has just started * A conference that the current task is waiting on has just started
* @param {*} opts * @param {*} opts

View File

@@ -76,11 +76,20 @@ class TaskEnqueue extends Task {
const members = await pushBack(this.queueName, url); const members = await pushBack(this.queueName, url);
this.logger.info(`TaskEnqueue:_addToQueue: added to queue, length now ${members}`); this.logger.info(`TaskEnqueue:_addToQueue: added to queue, length now ${members}`);
this.notifyUrl = url; this.notifyUrl = url;
/* invoke account-level webhook for queue event notifications */
cs.performQueueWebhook({
event: 'join',
queue: this.data.name,
length: members,
joinTime: this.waitStartTime
});
} }
async _removeFromQueue(cs, dlg) { async _removeFromQueue(cs) {
const {removeFromList} = cs.srf.locals.dbHelpers; const {removeFromList, lengthOfList} = cs.srf.locals.dbHelpers;
return await removeFromList(this.queueName, getUrl(cs)); await removeFromList(this.queueName, getUrl(cs));
return await lengthOfList(this.queueName);
} }
async performAction() { async performAction() {
@@ -109,8 +118,20 @@ class TaskEnqueue extends Task {
} }
resolve(this._doBridge(cs, dlg, ep)); resolve(this._doBridge(cs, dlg, ep));
}) })
.once('kill', () => { .once('kill', async() => {
this._removeFromQueue(cs); try {
const members = await this._removeFromQueue(cs);
/* invoke account-level webhook for queue event notifications */
cs.performQueueWebhook({
event: 'leave',
queue: this.data.name,
length: members,
leaveReason: 'abandoned',
leaveTime: Date.now()
});
} catch (err) {}
if (this._playSession) { if (this._playSession) {
this.logger.debug('killing waitUrl'); this.logger.debug('killing waitUrl');
this._playSession.kill(); this._playSession.kill();
@@ -277,7 +298,7 @@ class TaskEnqueue extends Task {
const json = await cs.application.requestor.request(hook, params); const json = await cs.application.requestor.request(hook, params);
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata)); const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
const allowedTasks = tasks.filter((t) => allowed.includes(t.verb)); const allowedTasks = tasks.filter((t) => allowed.includes(t.name));
if (tasks.length !== allowedTasks.length) { if (tasks.length !== allowedTasks.length) {
this.logger.debug({tasks, allowedTasks}, 'unsupported task'); this.logger.debug({tasks, allowedTasks}, 'unsupported task');
throw new Error(`unsupported verb in enqueue waitHook: only ${JSON.stringify(allowed)}`); throw new Error(`unsupported verb in enqueue waitHook: only ${JSON.stringify(allowed)}`);
@@ -288,7 +309,7 @@ class TaskEnqueue extends Task {
const tasksToRun = []; const tasksToRun = [];
let leave = false; let leave = false;
for (const o of tasks) { for (const o of tasks) {
if (o.verb === TaskName.Leave) { if (o.name === TaskName.Leave) {
leave = true; leave = true;
this.logger.info('waitHook returned a leave task'); this.logger.info('waitHook returned a leave task');
break; break;
@@ -304,7 +325,7 @@ class TaskEnqueue extends Task {
dlg, dlg,
ep: cs.ep, ep: cs.ep,
callInfo: cs.callInfo, callInfo: cs.callInfo,
tasksToRun tasks: tasksToRun
}); });
await this._playSession.exec(); await this._playSession.exec();
this._playSession = null; this._playSession = null;