mirror of
https://github.com/jambonz/jambonz-feature-server.git
synced 2025-12-21 17:17:58 +00:00
initial changes for queue webhooks
This commit is contained in:
@@ -7,9 +7,16 @@ const sessionTracker = require('./session-tracker');
|
|||||||
const makeTask = require('../tasks/make_task');
|
const makeTask = require('../tasks/make_task');
|
||||||
const normalizeJambones = require('../utils/normalize-jambones');
|
const normalizeJambones = require('../utils/normalize-jambones');
|
||||||
const listTaskNames = require('../utils/summarize-tasks');
|
const listTaskNames = require('../utils/summarize-tasks');
|
||||||
|
const Requestor = require('../utils/requestor');
|
||||||
const BADPRECONDITIONS = 'preconditions not met';
|
const BADPRECONDITIONS = 'preconditions not met';
|
||||||
const CALLER_CANCELLED_ERR_MSG = 'Response not sent due to unknown transaction';
|
const CALLER_CANCELLED_ERR_MSG = 'Response not sent due to unknown transaction';
|
||||||
|
|
||||||
|
const sqlRetrieveQueueEventHook = `SELECT * FROM webhooks
|
||||||
|
WHERE webhook_sid =
|
||||||
|
(
|
||||||
|
SELECT queue_event_hook_sid FROM accounts where account_sid = ?
|
||||||
|
)`;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @classdesc Represents the execution context for a call.
|
* @classdesc Represents the execution context for a call.
|
||||||
* It holds the resources, such as the sip dialog and media server endpoint
|
* It holds the resources, such as the sip dialog and media server endpoint
|
||||||
@@ -48,6 +55,8 @@ class CallSession extends Emitter {
|
|||||||
if (!this.isConfirmCallSession && !this.isSmsCallSession && !this.isAdultingCallSession) {
|
if (!this.isConfirmCallSession && !this.isSmsCallSession && !this.isAdultingCallSession) {
|
||||||
sessionTracker.add(this.callSid, this);
|
sessionTracker.add(this.callSid, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this._pool = srf.locals.dbHelpers.pool;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -708,6 +717,41 @@ class CallSession extends Emitter {
|
|||||||
return {ms: this.ms, ep: this.ep};
|
return {ms: this.ms, ep: this.ep};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If account was queue event webhook, send notification
|
||||||
|
* @param {*} obj - data to notify
|
||||||
|
*/
|
||||||
|
async performQueueWebhook(obj) {
|
||||||
|
if (typeof this.queueEventHookRequestor === 'undefined') {
|
||||||
|
const pp = this._pool.promise();
|
||||||
|
try {
|
||||||
|
this.logger.info({accountSid: this.accountSid}, 'performQueueWebhook: looking up account');
|
||||||
|
const [r] = await pp.query(sqlRetrieveQueueEventHook, this.accountSid);
|
||||||
|
if (0 === r.length) {
|
||||||
|
this.logger.info({accountSid: this.accountSid}, 'performQueueWebhook: no webhook provisioned');
|
||||||
|
this.queueEventHookRequestor = null;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
this.logger.info({accountSid: this.accountSid, webhook: r[0]}, 'performQueueWebhook: webhook found');
|
||||||
|
this.queueEventHookRequestor = new Requestor(this.logger, r[0]);
|
||||||
|
this.queueEventHook = r[0];
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
this.logger.error({err, accountSid: this.accountSid}, 'Error retrieving event hook');
|
||||||
|
this.queueEventHookRequestor = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (null === this.queueEventHookRequestor) return;
|
||||||
|
|
||||||
|
/* send webhook */
|
||||||
|
const params = {...obj, ...this.callInfo.toJSON()};
|
||||||
|
this.logger.info({accountSid: this.accountSid, params}, 'performQueueWebhook: sending webhook');
|
||||||
|
this.queueEventHookRequestor.request(this.queueEventHook, params)
|
||||||
|
.catch((err) => {
|
||||||
|
this.logger.info({err, accountSid: this.accountSid, obj}, 'Error sending queue notification event');
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A conference that the current task is waiting on has just started
|
* A conference that the current task is waiting on has just started
|
||||||
* @param {*} opts
|
* @param {*} opts
|
||||||
|
|||||||
@@ -76,11 +76,20 @@ class TaskEnqueue extends Task {
|
|||||||
const members = await pushBack(this.queueName, url);
|
const members = await pushBack(this.queueName, url);
|
||||||
this.logger.info(`TaskEnqueue:_addToQueue: added to queue, length now ${members}`);
|
this.logger.info(`TaskEnqueue:_addToQueue: added to queue, length now ${members}`);
|
||||||
this.notifyUrl = url;
|
this.notifyUrl = url;
|
||||||
|
|
||||||
|
/* invoke account-level webhook for queue event notifications */
|
||||||
|
cs.performQueueWebhook({
|
||||||
|
event: 'join',
|
||||||
|
queue: this.data.name,
|
||||||
|
length: members,
|
||||||
|
joinTime: this.waitStartTime
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async _removeFromQueue(cs, dlg) {
|
async _removeFromQueue(cs) {
|
||||||
const {removeFromList} = cs.srf.locals.dbHelpers;
|
const {removeFromList, lengthOfList} = cs.srf.locals.dbHelpers;
|
||||||
return await removeFromList(this.queueName, getUrl(cs));
|
await removeFromList(this.queueName, getUrl(cs));
|
||||||
|
return await lengthOfList(this.queueName);
|
||||||
}
|
}
|
||||||
|
|
||||||
async performAction() {
|
async performAction() {
|
||||||
@@ -109,8 +118,20 @@ class TaskEnqueue extends Task {
|
|||||||
}
|
}
|
||||||
resolve(this._doBridge(cs, dlg, ep));
|
resolve(this._doBridge(cs, dlg, ep));
|
||||||
})
|
})
|
||||||
.once('kill', () => {
|
.once('kill', async() => {
|
||||||
this._removeFromQueue(cs);
|
try {
|
||||||
|
const members = await this._removeFromQueue(cs);
|
||||||
|
|
||||||
|
/* invoke account-level webhook for queue event notifications */
|
||||||
|
cs.performQueueWebhook({
|
||||||
|
event: 'leave',
|
||||||
|
queue: this.data.name,
|
||||||
|
length: members,
|
||||||
|
leaveReason: 'abandoned',
|
||||||
|
leaveTime: Date.now()
|
||||||
|
});
|
||||||
|
} catch (err) {}
|
||||||
|
|
||||||
if (this._playSession) {
|
if (this._playSession) {
|
||||||
this.logger.debug('killing waitUrl');
|
this.logger.debug('killing waitUrl');
|
||||||
this._playSession.kill();
|
this._playSession.kill();
|
||||||
@@ -277,7 +298,7 @@ class TaskEnqueue extends Task {
|
|||||||
const json = await cs.application.requestor.request(hook, params);
|
const json = await cs.application.requestor.request(hook, params);
|
||||||
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
|
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
|
||||||
|
|
||||||
const allowedTasks = tasks.filter((t) => allowed.includes(t.verb));
|
const allowedTasks = tasks.filter((t) => allowed.includes(t.name));
|
||||||
if (tasks.length !== allowedTasks.length) {
|
if (tasks.length !== allowedTasks.length) {
|
||||||
this.logger.debug({tasks, allowedTasks}, 'unsupported task');
|
this.logger.debug({tasks, allowedTasks}, 'unsupported task');
|
||||||
throw new Error(`unsupported verb in enqueue waitHook: only ${JSON.stringify(allowed)}`);
|
throw new Error(`unsupported verb in enqueue waitHook: only ${JSON.stringify(allowed)}`);
|
||||||
@@ -288,7 +309,7 @@ class TaskEnqueue extends Task {
|
|||||||
const tasksToRun = [];
|
const tasksToRun = [];
|
||||||
let leave = false;
|
let leave = false;
|
||||||
for (const o of tasks) {
|
for (const o of tasks) {
|
||||||
if (o.verb === TaskName.Leave) {
|
if (o.name === TaskName.Leave) {
|
||||||
leave = true;
|
leave = true;
|
||||||
this.logger.info('waitHook returned a leave task');
|
this.logger.info('waitHook returned a leave task');
|
||||||
break;
|
break;
|
||||||
@@ -304,7 +325,7 @@ class TaskEnqueue extends Task {
|
|||||||
dlg,
|
dlg,
|
||||||
ep: cs.ep,
|
ep: cs.ep,
|
||||||
callInfo: cs.callInfo,
|
callInfo: cs.callInfo,
|
||||||
tasksToRun
|
tasks: tasksToRun
|
||||||
});
|
});
|
||||||
await this._playSession.exec();
|
await this._playSession.exec();
|
||||||
this._playSession = null;
|
this._playSession = null;
|
||||||
|
|||||||
@@ -99,6 +99,7 @@ function installSrfLocals(srf, logger) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const {
|
const {
|
||||||
|
pool,
|
||||||
lookupAppByPhoneNumber,
|
lookupAppByPhoneNumber,
|
||||||
lookupAppBySid,
|
lookupAppBySid,
|
||||||
lookupAppByRealm,
|
lookupAppByRealm,
|
||||||
@@ -138,6 +139,7 @@ function installSrfLocals(srf, logger) {
|
|||||||
|
|
||||||
Object.assign(srf.locals, {
|
Object.assign(srf.locals, {
|
||||||
dbHelpers: {
|
dbHelpers: {
|
||||||
|
pool,
|
||||||
lookupAppByPhoneNumber,
|
lookupAppByPhoneNumber,
|
||||||
lookupAppBySid,
|
lookupAppBySid,
|
||||||
lookupAppByRealm,
|
lookupAppByRealm,
|
||||||
|
|||||||
Reference in New Issue
Block a user