diff --git a/lib/session/call-session.js b/lib/session/call-session.js index 8e195f18..c5b1322b 100644 --- a/lib/session/call-session.js +++ b/lib/session/call-session.js @@ -1,15 +1,22 @@ const Emitter = require('events'); const fs = require('fs'); -const {CallDirection, TaskPreconditions, CallStatus, TaskName} = require('../utils/constants'); +const {CallDirection, TaskPreconditions, CallStatus, TaskName, KillReason} = require('../utils/constants'); const moment = require('moment'); const assert = require('assert'); const sessionTracker = require('./session-tracker'); const makeTask = require('../tasks/make_task'); const normalizeJambones = require('../utils/normalize-jambones'); const listTaskNames = require('../utils/summarize-tasks'); +const Requestor = require('../utils/requestor'); const BADPRECONDITIONS = 'preconditions not met'; const CALLER_CANCELLED_ERR_MSG = 'Response not sent due to unknown transaction'; +const sqlRetrieveQueueEventHook = `SELECT * FROM webhooks +WHERE webhook_sid = +( + SELECT queue_event_hook_sid FROM accounts where account_sid = ? +)`; + /** * @classdesc Represents the execution context for a call. * It holds the resources, such as the sip dialog and media server endpoint @@ -49,6 +56,8 @@ class CallSession extends Emitter { if (!this.isConfirmCallSession && !this.isSmsCallSession && !this.isAdultingCallSession) { sessionTracker.add(this.callSid, this); } + + this._pool = srf.locals.dbHelpers.pool; } /** @@ -393,7 +402,7 @@ class CallSession extends Emitter { } else { /* we started a new app on the child leg, but nothing given for parent so hang him up */ - this.currentTask.kill(); + this.currentTask.kill(this); } } @@ -530,7 +539,7 @@ class CallSession extends Emitter { this.logger.info({tasks: listTaskNames(tasks)}, `CallSession:replaceApplication reset with ${tasks.length} new tasks, stack depth is ${this.stackIdx}`); if (this.currentTask) { - this.currentTask.kill(); + this.currentTask.kill(this, KillReason.Replaced); this.currentTask = null; } } @@ -539,7 +548,7 @@ class CallSession extends Emitter { if (this.isConfirmCallSession) this.logger.debug('CallSession:kill (ConfirmSession)'); else this.logger.info('CallSession:kill'); if (this.currentTask) { - this.currentTask.kill(); + this.currentTask.kill(this); this.currentTask = null; } this.tasks = []; @@ -759,6 +768,41 @@ class CallSession extends Emitter { return {ms: this.ms, ep: this.ep}; } + /** + * If account was queue event webhook, send notification + * @param {*} obj - data to notify + */ + async performQueueWebhook(obj) { + if (typeof this.queueEventHookRequestor === 'undefined') { + const pp = this._pool.promise(); + try { + this.logger.info({accountSid: this.accountSid}, 'performQueueWebhook: looking up account'); + const [r] = await pp.query(sqlRetrieveQueueEventHook, this.accountSid); + if (0 === r.length) { + this.logger.info({accountSid: this.accountSid}, 'performQueueWebhook: no webhook provisioned'); + this.queueEventHookRequestor = null; + } + else { + this.logger.info({accountSid: this.accountSid, webhook: r[0]}, 'performQueueWebhook: webhook found'); + this.queueEventHookRequestor = new Requestor(this.logger, r[0]); + this.queueEventHook = r[0]; + } + } catch (err) { + this.logger.error({err, accountSid: this.accountSid}, 'Error retrieving event hook'); + this.queueEventHookRequestor = null; + } + } + if (null === this.queueEventHookRequestor) return; + + /* send webhook */ + const params = {...obj, ...this.callInfo.toJSON()}; + this.logger.info({accountSid: this.accountSid, params}, 'performQueueWebhook: sending webhook'); + this.queueEventHookRequestor.request(this.queueEventHook, params) + .catch((err) => { + this.logger.info({err, accountSid: this.accountSid, obj}, 'Error sending queue notification event'); + }); + } + /** * A conference that the current task is waiting on has just started * @param {*} opts diff --git a/lib/tasks/conference.js b/lib/tasks/conference.js index 55afc864..4a7a6200 100644 --- a/lib/tasks/conference.js +++ b/lib/tasks/conference.js @@ -356,7 +356,7 @@ class Conference extends Task { } if (typeof this.maxParticipants === 'number' && this.maxParticipants > 1) { - this.endpoint.api('conference', `${this.confName} set max_members ${this.maxParticipants}`) + this.ep.api('conference', `${this.confName} set max_members ${this.maxParticipants}`) .catch((err) => this.logger.error(err, `Error setting max participants to ${this.maxParticipants}`)); } } @@ -448,16 +448,16 @@ class Conference extends Task { async _playHook(cs, dlg, hook, allowed = [TaskName.Play, TaskName.Say, TaskName.Pause]) { assert(!this._playSession); const json = await cs.application.requestor.request(hook, cs.callInfo); + const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata)); - const allowedTasks = json.filter((task) => allowed.includes(task.verb)); - if (json.length !== allowedTasks.length) { - this.logger.debug({json, allowedTasks}, 'unsupported task'); - throw new Error(`unsupported verb in dial conference wait/enterHook: only ${JSON.stringify(allowed)}`); + const allowedTasks = tasks.filter((t) => allowed.includes(t.name)); + if (tasks.length !== allowedTasks.length) { + this.logger.debug({tasks, allowedTasks}, 'unsupported task'); + throw new Error(`unsupported verb in conference waitHook: only ${JSON.stringify(allowed)}`); } - this.logger.debug(`Conference:_playHook: executing ${json.length} tasks`); + this.logger.debug(`Conference:_playHook: executing ${tasks.length} tasks`); - if (json.length > 0) { - const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata)); + if (tasks.length > 0) { this._playSession = new ConfirmCallSession({ logger: this.logger, application: cs.application, @@ -514,9 +514,6 @@ class Conference extends Task { const functionName = `_on${capitalize(camelize(action))}`; (Conference.prototype[functionName] || unhandled).bind(this, this.logger, cs, evt)() ; } - else { - this.logger.debug(`Conference#__onConferenceEvent: got unhandled custom event: ${eventName}`) ; - } } // conference event handlers diff --git a/lib/tasks/dequeue.js b/lib/tasks/dequeue.js index 03afd795..b8655118 100644 --- a/lib/tasks/dequeue.js +++ b/lib/tasks/dequeue.js @@ -110,7 +110,8 @@ class TaskDequeue extends Task { event: 'dequeue', dequeueSipAddress: cs.srf.locals.localSipAddress, epUuid: ep.uuid, - notifyUrl: getUrl(cs) + notifyUrl: getUrl(cs), + dequeuer: cs.callInfo.toJSON() }); this.logger.info(`TaskDequeue:_dequeueUrl successfully sent POST to ${url}`); bridgeTimer = setTimeout(() => reject(new Error('bridge timeout')), 20000); diff --git a/lib/tasks/dial.js b/lib/tasks/dial.js index cfe0ed52..f3a1504f 100644 --- a/lib/tasks/dial.js +++ b/lib/tasks/dial.js @@ -1,6 +1,13 @@ const Task = require('./task'); const makeTask = require('./make_task'); -const {CallStatus, CallDirection, TaskName, TaskPreconditions, MAX_SIMRINGS} = require('../utils/constants'); +const { + CallStatus, + CallDirection, + TaskName, + TaskPreconditions, + MAX_SIMRINGS, + KillReason +} = require('../utils/constants'); const assert = require('assert'); const placeCall = require('../utils/place-outdial'); const sessionTracker = require('../session/session-tracker'); @@ -138,8 +145,9 @@ class TaskDial extends Task { if (this.epOther) this._installDtmfDetection(cs, this.epOther, this.parentDtmfCollector); await this._attemptCalls(cs); await this.awaitTaskDone(); - await this.performAction(this.results); - if (this.epOther) this._removeDtmfDetection(cs, this.epOther); + this.logger.debug({callSid: this.cs.callSid}, 'Dial:exec task is done, sending actionHook if any'); + await this.performAction(this.results, this.killReason !== KillReason.Replaced); + this._removeDtmfDetection(cs, this.epOther); this._removeDtmfDetection(cs, this.ep); } catch (err) { this.logger.error({err}, 'TaskDial:exec terminating with error'); @@ -147,9 +155,10 @@ class TaskDial extends Task { } } - async kill(cs) { + async kill(cs, reason) { super.kill(cs); - if (this.epOther) this._removeDtmfDetection(this.cs, this.epOther); + this.killReason = reason || KillReason.Hangup; + this._removeDtmfDetection(this.cs, this.epOther); this._removeDtmfDetection(this.cs, this.ep); this._killOutdials(); if (this.sd) { diff --git a/lib/tasks/enqueue.js b/lib/tasks/enqueue.js index 65f3267b..8174593e 100644 --- a/lib/tasks/enqueue.js +++ b/lib/tasks/enqueue.js @@ -3,7 +3,7 @@ const Emitter = require('events'); const ConfirmCallSession = require('../session/confirm-call-session'); const normalizeJambones = require('../utils/normalize-jambones'); const makeTask = require('./make_task'); -const {TaskName, TaskPreconditions, QueueResults} = require('../utils/constants'); +const {TaskName, TaskPreconditions, QueueResults, KillReason} = require('../utils/constants'); const bent = require('bent'); const assert = require('assert'); @@ -61,10 +61,11 @@ class TaskEnqueue extends Task { } } - async kill(cs) { + async kill(cs, reason) { super.kill(cs); - this.logger.info(`TaskEnqueue:kill ${this.queueName}`); - this.emitter.emit('kill'); + this.killReason = reason || KillReason.Hangup; + this.logger.info(`TaskEnqueue:kill ${this.queueName} with reason ${this.killReason}`); + this.emitter.emit('kill', reason || KillReason.Hangup); this.notifyTaskDone(); } @@ -76,11 +77,20 @@ class TaskEnqueue extends Task { const members = await pushBack(this.queueName, url); this.logger.info(`TaskEnqueue:_addToQueue: added to queue, length now ${members}`); this.notifyUrl = url; + + /* invoke account-level webhook for queue event notifications */ + cs.performQueueWebhook({ + event: 'join', + queue: this.data.name, + length: members, + joinTime: this.waitStartTime + }); } - async _removeFromQueue(cs, dlg) { - const {removeFromList} = cs.srf.locals.dbHelpers; - return await removeFromList(this.queueName, getUrl(cs)); + async _removeFromQueue(cs) { + const {removeFromList, lengthOfList} = cs.srf.locals.dbHelpers; + await removeFromList(this.queueName, getUrl(cs)); + return await lengthOfList(this.queueName); } async performAction() { @@ -89,7 +99,7 @@ class TaskEnqueue extends Task { queueTime: getElapsedTime(this.waitStartTime), queueResult: this.state }; - await super.performAction(params); + await super.performAction(params, this.killReason !== KillReason.Replaced); } /** @@ -109,8 +119,22 @@ class TaskEnqueue extends Task { } resolve(this._doBridge(cs, dlg, ep)); }) - .once('kill', () => { - this._removeFromQueue(cs); + .once('kill', async() => { + + /* invoke account-level webhook for queue event notifications */ + if (!this.dequeued) { + try { + const members = await this._removeFromQueue(cs); + cs.performQueueWebhook({ + event: 'leave', + queue: this.data.name, + length: members, + leaveReason: 'abandoned', + leaveTime: Date.now() + }); + } catch (err) {} + } + if (this._playSession) { this.logger.debug('killing waitUrl'); this._playSession.kill(); @@ -215,8 +239,9 @@ class TaskEnqueue extends Task { ep.unbridge().catch((err) => {}); resolve(); }) - .on('kill', () => { - this.logger.info('TaskEnqueue:_bridgeLocal ending with hangup from enqeue party'); + .on('kill', (reason) => { + this.killReason = reason; + this.logger.info(`TaskEnqueue:_bridgeLocal ending with ${this.killReason}`); ep.unbridge().catch((err) => {}); // notify partner that we dropped @@ -242,12 +267,26 @@ class TaskEnqueue extends Task { * @param {string} opts.epUuid uuid of the endpoint we need to bridge to * @param {string} opts.dequeueSipAddress ip:port of the feature server hosting the other call */ - notifyQueueEvent(cs, opts) { + async notifyQueueEvent(cs, opts) { if (opts.event === 'dequeue') { if (this.bridgeNow) return; this.logger.info({opts}, `TaskEnqueue:notifyDequeueEvent: leaving ${this.queueName} because someone wants me`); assert(opts.dequeueSipAddress && opts.epUuid && opts.notifyUrl); this.emitter.emit('dequeue', opts); + + try { + const {lengthOfList} = cs.srf.locals.dbHelpers; + const members = await lengthOfList(this.queueName); + this.dequeued = true; + cs.performQueueWebhook({ + event: 'leave', + queue: this.data.name, + length: Math.max(members - 1, 0), + leaveReason: 'dequeued', + leaveTime: Date.now(), + dequeuer: opts.dequeuer + }); + } catch (err) {} } else if (opts.event === 'hangup') { this.emitter.emit('hangup'); @@ -277,7 +316,7 @@ class TaskEnqueue extends Task { const json = await cs.application.requestor.request(hook, params); const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata)); - const allowedTasks = tasks.filter((t) => allowed.includes(t.verb)); + const allowedTasks = tasks.filter((t) => allowed.includes(t.name)); if (tasks.length !== allowedTasks.length) { this.logger.debug({tasks, allowedTasks}, 'unsupported task'); throw new Error(`unsupported verb in enqueue waitHook: only ${JSON.stringify(allowed)}`); @@ -288,7 +327,7 @@ class TaskEnqueue extends Task { const tasksToRun = []; let leave = false; for (const o of tasks) { - if (o.verb === TaskName.Leave) { + if (o.name === TaskName.Leave) { leave = true; this.logger.info('waitHook returned a leave task'); break; @@ -304,7 +343,7 @@ class TaskEnqueue extends Task { dlg, ep: cs.ep, callInfo: cs.callInfo, - tasksToRun + tasks: tasksToRun }); await this._playSession.exec(); this._playSession = null; diff --git a/lib/utils/constants.json b/lib/utils/constants.json index 06d5cded..4961d9ea 100644 --- a/lib/utils/constants.json +++ b/lib/utils/constants.json @@ -92,6 +92,10 @@ "Hangup": "hangup", "Timeout": "timeout" }, + "KillReason": { + "Hangup": "hangup", + "Replaced": "replaced" + }, "MAX_SIMRINGS": 10, "BONG_TONE": "tone_stream://v=-7;%(100,0,941.0,1477.0);v=-7;>=2;+=.1;%(1400,0,350,440)" }