From a0508a2494042d5ac91fdd2ea45d59ab42e3e354 Mon Sep 17 00:00:00 2001 From: Dave Horton Date: Wed, 6 May 2020 15:27:24 -0400 Subject: [PATCH] initial support for conference and queues --- .../{start-conference.js => conference.js} | 10 +- lib/http-routes/api/dequeue.js | 41 +++ lib/http-routes/api/enqueue.js | 41 +++ lib/http-routes/api/index.js | 4 +- lib/middleware.js | 2 +- lib/session/call-session.js | 31 +- lib/tasks/conference.js | 40 +-- lib/tasks/dequeue.js | 134 ++++++++ lib/tasks/enqueue.js | 314 ++++++++++++++++++ lib/tasks/leave.js | 22 ++ lib/tasks/make_task.js | 9 + lib/tasks/say.js | 3 +- lib/tasks/specs.json | 26 ++ lib/tasks/task.js | 33 ++ lib/utils/constants.json | 15 + lib/utils/install-srf-locals.js | 14 +- package-lock.json | 38 +-- package.json | 2 +- 18 files changed, 711 insertions(+), 68 deletions(-) rename lib/http-routes/api/{start-conference.js => conference.js} (72%) create mode 100644 lib/http-routes/api/dequeue.js create mode 100644 lib/http-routes/api/enqueue.js create mode 100644 lib/tasks/dequeue.js create mode 100644 lib/tasks/enqueue.js create mode 100644 lib/tasks/leave.js diff --git a/lib/http-routes/api/start-conference.js b/lib/http-routes/api/conference.js similarity index 72% rename from lib/http-routes/api/start-conference.js rename to lib/http-routes/api/conference.js index 07d7d32f..ca283ce5 100644 --- a/lib/http-routes/api/start-conference.js +++ b/lib/http-routes/api/conference.js @@ -12,27 +12,27 @@ function retrieveCallSession(callSid, opts) { if (cs) { const task = cs.currentTask; if (!task || task.name != TaskName.Conference) { - throw new DbErrorUnprocessableRequest(`startConference api failure: indicated call is not waiting: ${task.name}`); + throw new DbErrorUnprocessableRequest(`conference api failure: indicated call is not waiting: ${task.name}`); } } return cs; } /** - * update a call + * notify a waiting session that a conference has started */ router.post('/:callSid', async(req, res) => { const logger = req.app.locals.logger; const callSid = req.params.callSid; - logger.debug({body: req.body}, 'got startConference request'); + logger.debug({body: req.body}, 'got conference request'); try { const cs = retrieveCallSession(callSid, req.body); if (!cs) { - logger.info(`startConference: callSid not found ${callSid}`); + logger.info(`conference: callSid not found ${callSid}`); return res.sendStatus(404); } res.status(202).end(); - cs.notifyStartConference(req.body); + cs.notifyConferenceEvent(req.body); } catch (err) { sysError(logger, res, err); } diff --git a/lib/http-routes/api/dequeue.js b/lib/http-routes/api/dequeue.js new file mode 100644 index 00000000..5bc4f9a5 --- /dev/null +++ b/lib/http-routes/api/dequeue.js @@ -0,0 +1,41 @@ +const router = require('express').Router(); +const sysError = require('./error'); +const sessionTracker = require('../../session/session-tracker'); +const {TaskName} = require('../../utils/constants.json'); +const {DbErrorUnprocessableRequest} = require('../utils/errors'); + +/** + * validate the call state + */ +function retrieveCallSession(callSid, opts) { + const cs = sessionTracker.get(callSid); + if (cs) { + const task = cs.currentTask; + if (!task || task.name != TaskName.Dequeue) { + throw new DbErrorUnprocessableRequest(`dequeue api failure: indicated call is not queued: ${task.name}`); + } + } + return cs; +} + +/** + * notify a session in a dequeue verb of an event + */ +router.post('/:callSid', async(req, res) => { + const logger = req.app.locals.logger; + const callSid = req.params.callSid; + logger.debug({body: req.body}, 'got dequeue event'); + try { + const cs = retrieveCallSession(callSid, req.body); + if (!cs) { + logger.info(`dequeue: callSid not found ${callSid}`); + return res.sendStatus(404); + } + res.status(202).end(); + cs.notifyDequeueEvent(req.body); + } catch (err) { + sysError(logger, res, err); + } +}); + +module.exports = router; diff --git a/lib/http-routes/api/enqueue.js b/lib/http-routes/api/enqueue.js new file mode 100644 index 00000000..9758497f --- /dev/null +++ b/lib/http-routes/api/enqueue.js @@ -0,0 +1,41 @@ +const router = require('express').Router(); +const sysError = require('./error'); +const sessionTracker = require('../../session/session-tracker'); +const {TaskName} = require('../../utils/constants.json'); +const {DbErrorUnprocessableRequest} = require('../utils/errors'); + +/** + * validate the call state + */ +function retrieveCallSession(callSid, opts) { + const cs = sessionTracker.get(callSid); + if (cs) { + const task = cs.currentTask; + if (!task || task.name != TaskName.Enqueue) { + throw new DbErrorUnprocessableRequest(`enqueue api failure: indicated call is not queued: ${task.name}`); + } + } + return cs; +} + +/** + * notify a waiting session that a conference has started + */ +router.post('/:callSid', async(req, res) => { + const logger = req.app.locals.logger; + const callSid = req.params.callSid; + logger.debug({body: req.body}, 'got enqueue event'); + try { + const cs = retrieveCallSession(callSid, req.body); + if (!cs) { + logger.info(`enqueue: callSid not found ${callSid}`); + return res.sendStatus(404); + } + res.status(202).end(); + cs.notifyEnqueueEvent(req.body); + } catch (err) { + sysError(logger, res, err); + } +}); + +module.exports = router; diff --git a/lib/http-routes/api/index.js b/lib/http-routes/api/index.js index 09c1b449..9c11590b 100644 --- a/lib/http-routes/api/index.js +++ b/lib/http-routes/api/index.js @@ -2,7 +2,9 @@ const api = require('express').Router(); api.use('/createCall', require('./create-call')); api.use('/updateCall', require('./update-call')); -api.use('/startConference', require('./start-conference')); +api.use('/conference', require('./conference')); +api.use('/dequeue', require('./dequeue')); +api.use('/enqueue', require('./enqueue')); // health checks api.get('/', (req, res) => res.sendStatus(200)); diff --git a/lib/middleware.js b/lib/middleware.js index 76adad64..49b7f48e 100644 --- a/lib/middleware.js +++ b/lib/middleware.js @@ -10,7 +10,7 @@ module.exports = function(srf, logger) { const {lookupAppByPhoneNumber, lookupAppBySid, lookupAppByRealm} = srf.locals.dbHelpers; function initLocals(req, res, next) { - const callSid = uuidv4(); + const callSid = req.has('X-Retain-Call-Sid') ? req.get('X-Retain-Call-Sid') : uuidv4(); req.locals = { callSid, logger: logger.child({callId: req.get('Call-ID'), callSid}) diff --git a/lib/session/call-session.js b/lib/session/call-session.js index 7099b191..6cddce98 100644 --- a/lib/session/call-session.js +++ b/lib/session/call-session.js @@ -204,6 +204,8 @@ class CallSession extends Emitter { } trackTmpFile(path) { + // TODO: don't add if its already in the list (should we make it a set?) + this.logger.debug(`adding tmp file to track ${path}`); this.tmpFiles.add(path); } @@ -477,6 +479,7 @@ class CallSession extends Emitter { const ep = await this.ms.createEndpoint({remoteSdp: this.req.body}); ep.cs = this; this.ep = ep; + await ep.set('hangup_after_bridge', false); this.logger.debug('allocated endpoint'); @@ -534,6 +537,8 @@ class CallSession extends Emitter { return; } this.ep = await this.ms.createEndpoint({remoteSdp: this.dlg.remote.sdp}); + await this.ep.set('hangup_after_bridge', false); + await this.dlg.modify(this.ep.local.sdp); this.logger.debug('CallSession:replaceEndpoint completed'); return this.ep; @@ -631,6 +636,7 @@ class CallSession extends Emitter { } if (!this.ep) { this.ep = await this.ms.createEndpoint({remoteSdp: this.req.body}); + await this.ep.set('hangup_after_bridge', false); } return {ms: this.ms, ep: this.ep}; } @@ -639,12 +645,32 @@ class CallSession extends Emitter { * A conference that the current task is waiting on has just started * @param {*} opts */ - notifyStartConference(opts) { + notifyConferenceEvent(opts) { if (this.currentTask && typeof this.currentTask.notifyStartConference === 'function') { this.currentTask.notifyStartConference(this, opts); } } + /** + * Notify a session in an Enqueue task of an event + * @param {*} opts + */ + notifyEnqueueEvent(opts) { + if (this.currentTask && typeof this.currentTask.notifyQueueEvent === 'function') { + this.currentTask.notifyQueueEvent(this, opts); + } + } + + /** + * Notify a session in a Dequeue task of an event + * @param {*} opts + */ + notifyDequeueEvent(opts) { + if (this.currentTask && typeof this.currentTask.notifyQueueEvent === 'function') { + this.currentTask.notifyQueueEvent(this, opts); + } + } + /** * Transfer the call to another feature server * @param {uri} sip uri to refer the call to @@ -656,7 +682,8 @@ class CallSession extends Emitter { method: 'REFER', headers: { 'Refer-To': referTo, - 'Referred-By': `sip:${this.srf.locals.localSipAddress}` + 'Referred-By': `sip:${this.srf.locals.localSipAddress}`, + 'X-Retain-Call-Sid': this.callSid } }); return [200, 202].includes(res.status); diff --git a/lib/tasks/conference.js b/lib/tasks/conference.js index 249f43ea..04eba550 100644 --- a/lib/tasks/conference.js +++ b/lib/tasks/conference.js @@ -5,7 +5,6 @@ const {TaskName, TaskPreconditions} = require('../utils/constants'); const normalizeJambones = require('../utils/normalize-jambones'); const makeTask = require('./make_task'); const bent = require('bent'); -const uuidv4 = require('uuid/v4'); const assert = require('assert'); const WAIT = 'wait'; const JOIN = 'join'; @@ -239,7 +238,7 @@ class Conference extends Task { localServer: cs.srf.locals.localSipAddress, confServer: this.joinDetails.conferenceSipAddress }, `Conference:_doJoin: conference ${this.confName} is hosted elsewhere`); - const success = await this._doRefer(cs, this.joinDetails.conferenceSipAddress); + const success = await this.transferCallToFeatureServer(cs, this.joinDetails.conferenceSipAddress); /** * If the REFER succeeded, we will get a BYE from the SBC @@ -277,7 +276,7 @@ class Conference extends Task { this.logger.info({members}, `Conference:doStart - notifying waiting list for ${this.confName}`); for (const url of members) { try { - await bent('POST', 202)(url, {conferenceSipAddress: cs.srf.locals.localSipAddress}); + await bent('POST', 202)(url, {event: 'start', conferenceSipAddress: cs.srf.locals.localSipAddress}); } catch (err) { this.logger.info(err, `Failed notifying ${url} to join ${this.confName}`); } @@ -363,37 +362,6 @@ class Conference extends Task { this.emitter.emit('join', opts); } - async _doRefer(cs, sipAddress) { - const uuid = uuidv4(); - const {addKey} = cs.srf.locals.dbHelpers; - const obj = Object.assign({}, cs.application); - delete obj.requestor; - delete obj.notifier; - obj.tasks = cs.getRemainingTaskData(); - - this.logger.debug({obj}, 'Conference:_doRefer'); - - const success = await addKey(uuid, JSON.stringify(obj), 30); - if (!success) { - this.logger.info(`Conference:_doRefer failed storing task data before REFER for ${this.confName}`); - return; - } - try { - this.logger.info(`Conference:_doRefer: referring call to ${sipAddress} for ${this.confName}`); - this.callMoved = true; - const success = await cs.referCall(`sip:context-${uuid}@${sipAddress}`); - if (!success) { - this.callMoved = false; - this.logger.info('Conference:_doRefer REFER failed'); - return success; - } - this.logger.info('Conference:_doRefer REFER succeeded'); - return success; - } catch (err) { - this.logger.error(err, 'Conference:_doRefer error'); - } - } - /** * Add ourselves to the waitlist of sessions to be notified once * the conference starts @@ -402,7 +370,7 @@ class Conference extends Task { async _addToWaitList(cs) { const {addToSet} = cs.srf.locals.dbHelpers; const setName = getWaitListName(this.confName); - const url = `${cs.srf.locals.serviceUrl}/v1/startConference/${cs.callSid}`; + const url = `${cs.srf.locals.serviceUrl}/v1/conference/${cs.callSid}`; const added = await addToSet(setName, url); if (added !== 1) throw new Error(`failed adding to the waitlist for conference ${this.confName}: ${added}`); this.logger.debug(`successfully added to the waiting list for conference ${this.confName}`); @@ -411,7 +379,7 @@ class Conference extends Task { async _removeFromWaitList(cs) { const {removeFromSet} = cs.srf.locals.dbHelpers; const setName = getWaitListName(this.confName); - const url = `${cs.srf.locals.serviceUrl}/v1/startConference/${cs.callSid}`; + const url = `${cs.srf.locals.serviceUrl}/v1/conference/${cs.callSid}`; try { const count = await removeFromSet(setName, url); this.logger.debug(`Conference:_removeFromWaitList removed ${count} from waiting list`); diff --git a/lib/tasks/dequeue.js b/lib/tasks/dequeue.js new file mode 100644 index 00000000..7ee4f97c --- /dev/null +++ b/lib/tasks/dequeue.js @@ -0,0 +1,134 @@ +const Task = require('./task'); +const {TaskName, TaskPreconditions, DequeueResults} = require('../utils/constants'); +const Emitter = require('events'); +const bent = require('bent'); +const assert = require('assert'); + +const sleepFor = (ms) => new Promise((resolve) => setTimeout(() => resolve(), ms)); + +const getUrl = (cs) => `${cs.srf.locals.serviceUrl}/v1/dequeue/${cs.callSid}`; + +class TaskDequeue extends Task { + constructor(logger, opts, parentTask) { + super(logger, opts); + this.preconditions = TaskPreconditions.Endpoint; + + this.queueName = this.data.name; + this.timeout = this.data.timeout || 0; + + this.emitter = new Emitter(); + this.state = DequeueResults.Timeout; + } + + get name() { return TaskName.Dequeue; } + + async exec(cs, ep) { + await super.exec(cs); + this.queueName = `queue:${cs.accountSid}:${this.queueName}`; + + const url = await this._getMemberFromQueue(cs); + if (!url) this.performAction({dequeueResult: 'timeout'}).catch((err) => {}); + else { + try { + await this._dequeueUrl(cs, ep, url); + this.performAction({dequeueResult: 'complete'}).catch((err) => {}); + } catch (err) { + this.emitter.removeAllListeners(); + this.performAction({dequeueResult: 'hangup'}).catch((err) => {}); + } + } + } + + async kill(cs) { + super.kill(cs); + if (this.state === DequeueResults.Bridged) { + this.logger.info(`TaskDequeue:kill - notifying partner we are going away ${this.partnerUrl}`); + bent('POST', 202)(this.partnerUrl, {event: 'hangup'}).catch((err) => { + this.logger.info(err, 'TaskDequeue:kill error notifying partner of hangup'); + }); + } + this.emitter.emit('kill'); + } + + _getMemberFromQueue(cs) { + const {popFront} = cs.srf.locals.dbHelpers; + + return new Promise(async(resolve) => { + let timer; + let timedout = false, found = false; + if (this.timeout > 0) { + timer = setTimeout(() => { + this.logger.info(`TaskDequeue:_getMemberFromQueue timed out after ${this.timeout}s`); + timedout = true; + resolve(); + }, this.timeout * 1000); + } + + do { + try { + const url = await popFront(this.queueName); + if (url) { + found = true; + clearTimeout(timer); + resolve(url); + } + } catch (err) { + this.logger.debug({err}, 'TaskDequeue:_getMemberFromQueue error popFront'); + } + await sleepFor(5000); + } while (!this.killed && !timedout && !found); + }); + } + + _dequeueUrl(cs, ep, url) { + this.partnerUrl = url; + + return new Promise(async(resolve, reject) => { + let bridgeTimer; + this.emitter + .on('bridged', () => { + clearTimeout(bridgeTimer); + this.state = DequeueResults.Bridged; + }) + .on('hangup', () => { + this.logger.info('TaskDequeue:_dequeueUrl hangup from partner'); + resolve(); + }) + .on('kill', () => { + resolve(); + }); + + // now notify partner to bridge to me + try { + await bent('POST', 202)(url, { + event: 'dequeue', + dequeueSipAddress: cs.srf.locals.localSipAddress, + epUuid: ep.uuid, + notifyUrl: getUrl(cs) + }); + bridgeTimer = setTimeout(() => reject(new Error('bridge timeout')), 20000); + } catch (err) { + this.logger.info({err, url}, `TaskDequeue:_dequeueUrl error dequeueing from ${this.queueName}, try again`); + reject(new Error('bridge failure')); + } + }); + } + + notifyQueueEvent(cs, opts) { + if (opts.event === 'bridged') { + assert(opts.notifyUrl); + this.logger.info({opts}, `TaskDequeue:notifyDequeueEvent: successfully bridged to member from ${this.queueName}`); + this.partnerUrl = opts.notifyUrl; + this.emitter.emit('bridged'); + } + else if (opts.event === 'hangup') { + this.emitter.emit('hangup'); + } + else { + this.logger.error({opts}, 'TaskDequeue:notifyDequeueEvent - unsupported event/payload'); + } + } + +} + +module.exports = TaskDequeue; diff --git a/lib/tasks/enqueue.js b/lib/tasks/enqueue.js new file mode 100644 index 00000000..4c6b64d2 --- /dev/null +++ b/lib/tasks/enqueue.js @@ -0,0 +1,314 @@ +const Task = require('./task'); +const Emitter = require('events'); +const ConfirmCallSession = require('../session/confirm-call-session'); +const normalizeJambones = require('../utils/normalize-jambones'); +const makeTask = require('./make_task'); +const {TaskName, TaskPreconditions, QueueResults} = require('../utils/constants'); +const bent = require('bent'); +const assert = require('assert'); + +const getUrl = (cs) => `${cs.srf.locals.serviceUrl}/v1/enqueue/${cs.callSid}`; + +const getElapsedTime = (from) => Math.floor((Date.now() - from) / 1000); + +class TaskEnqueue extends Task { + constructor(logger, opts) { + super(logger, opts); + this.logger = logger; + this.preconditions = TaskPreconditions.Endpoint; + + this.queueName = this.data.name; + this.waitHook = this.data.waitHook; + + this.emitter = new Emitter(); + this.state = QueueResults.Wait; + + // transferred from another server in order to bridge to a local caller? + if (this.data._) { + this.bridgeNow = true; + this.bridgeDetails = { + epUid: this.data._.epUuid, + notifyUrl: this.data._.notifyUrl + }; + this.waitStartTime = this.data._.waitStartTime; + } + } + + get name() { return TaskName.Enqueue; } + + async exec(cs, ep) { + await super.exec(cs); + const dlg = cs.dlg; + this.queueName = `queue:${cs.accountSid}:${this.queueName}`; + + try { + if (!this.bridgeNow) { + await this._addToQueue(cs, dlg, ep); + await this._doWait(cs, dlg, ep); + } + else { + await this._doBridge(cs, dlg, ep); + } + if (!this.callMoved) await this.performAction(); + + this.logger.debug(`TaskEnqueue:exec - task done queue ${this.queueName}`); + } catch (err) { + this.logger.info(err, `TaskEnqueue:exec - error in enqueue ${this.queueName}`); + } + } + + async kill(cs) { + super.kill(cs); + this.logger.info(`TaskEnqueue:kill ${this.queueName}`); + this.emitter.emit('kill'); + } + + async _addToQueue(cs, dlg) { + const {pushBack} = cs.srf.locals.dbHelpers; + const url = getUrl(cs); + this.waitStartTime = Date.now(); + this.logger.debug({queue: this.queueName, url}, 'pushing url onto queue'); + const members = await pushBack(this.queueName, url); + this.logger.info(`TaskEnqueue:_addToQueue: added to queue, length now ${members}`); + this.notifyUrl = url; + } + + async _removeFromQueue(cs, dlg) { + const {removeFromList} = cs.srf.locals.dbHelpers; + return await removeFromList(this.queueName, getUrl(cs)); + } + + async performAction() { + const params = { + queueSid: this.queueName, + queueTime: getElapsedTime(this.waitStartTime), + queueResult: this.state + }; + await super.performAction(params); + } + + /** + * Add ourselves to the queue with a url that can be invoked to tell us to dequeue + * @param {CallSession} cs + * @param {SipDialog} dlg + */ + async _doWait(cs, dlg, ep) { + return new Promise(async(resolve, reject) => { + this.emitter + .once('dequeue', (opts) => { + this.bridgeDetails = opts; + this.logger.info({bridgeDetails: this.bridgeDetails}, `time to dequeue from ${this.queueName}`); + if (this._playSession) { + this._playSession.kill(); + this._playSession = null; + } + resolve(this._doBridge(cs, dlg, ep)); + }) + .once('kill', () => { + this._removeFromQueue(cs); + if (this._playSession) { + this.logger.debug('killing waitUrl'); + this._playSession.kill(); + this._playSession = null; + } + resolve(); + }); + + if (this.waitHook && !this.killed) { + do { + try { + await ep.play('silence_stream://500'); + const tasks = await this._playHook(cs, dlg, this.waitHook); + if (0 === tasks.length) break; + } catch (err) { + if (!this.bridgeDetails && !this.killed) { + this.logger.info(err, `TaskEnqueue:_doWait: failed retrieving waitHook for ${this.queueName}`); + } + this._playSession = null; + break; + } + } while (!this.killed && !this.bridgeDetails); + } + }); + + } + + /** + * Bridge to another call. + * The call may be homed on this feature server, or another one - + * in the latter case, move the call to the other server via REFER + * Returns a promise that resolves: + * (a) When the call is transferred to the other feature server if the dequeue-er is not local, or + * (b) When either party hangs up the bridged call + * @param {CallSession} cs + * @param {SipDialog} dlg + */ + async _doBridge(cs, dlg, ep) { + assert(this.bridgeNow || this.bridgeDetails.dequeueSipAddress); + if (!this.bridgeNow && cs.srf.locals.localSipAddress !== this.bridgeDetails.dequeueSipAddress) { + this.logger.info({ + localServer: cs.srf.locals.localSipAddress, + otherServer: this.bridgeDetails.dequeueSipAddress + }, `TaskEnqueue:_doBridge: leg for queue ${this.queueName} is hosted elsewhere`); + const success = await this.transferCallToFeatureServer(cs, this.bridgeDetails.dequeueSipAddress, { + waitStartTime: this.waitStartTime, + epUuid: this.bridgeDetails.epUuid, + notifyUrl: this.bridgeDetails.notifyUrl + }); + + /** + * If the REFER succeeded, we will get a BYE from the SBC + * which will trigger kill and the end of the execution of the CallSession + * which is what we want - so do nothing and let that happen. + * If on the other hand, the REFER failed then we are in a bad state + * and need to end the enqueue task with a failure indication and + * allow the application to continue on + */ + if (success) { + this.logger.info(`TaskEnqueue:_doBridge: REFER of ${this.queueName} succeeded`); + return; + } + this.state = QueueResults.Error; + return; + } + this.logger.info(`TaskEnqueue:_doBridge: queue ${this.queueName} is hosted locally`); + await this._bridgeLocal(cs, dlg, ep); + } + + _bridgeLocal(cs, dlg, ep) { + assert(this.bridgeDetails.epUuid && this.bridgeDetails.notifyUrl); + + return new Promise(async(resolve, reject) => { + try { + this.other = {epUuid: this.bridgeDetails.epUuid}; + + // bridge to the dequeuing endpoint + this.logger.debug(`TaskEnqueue:_doBridge: attempting to bridge call to ${this.other.epUuid}`); + await ep.bridge(this.other.epUuid); + this.state = QueueResults.Bridged; + this.logger.info(`TaskEnqueue:_doBridge: successfully bridged to ${this.other.epUuid}`); + + // notify partner we are on - give him our possibly new url + const url = this.bridgeDetails.notifyUrl; + bent('POST', 202)(url, { + event: 'bridged', + notifyUrl: getUrl(cs) + }).catch((err) => { + this.logger.info({err, url}, 'TaskEnqueue:_bridgeLocal error sending bridged event'); + /** + * TODO: this probably means he dropped while we were connecting.... + * should we put this call back to the front of the queue so he gets serviced (?) + */ + this.state = QueueResults.Error; + reject(new Error('bridge failure')); + }); + + // resolve when either side hangs up + this.emitter + .on('hangup', () => { + this.logger.info('TaskEnqueue:_bridgeLocal ending with hangup from dequeue party'); + ep.unbridge().catch((err) => {}); + resolve(); + }) + .on('kill', () => { + this.logger.info('TaskEnqueue:_bridgeLocal ending with hangup from enqeue party'); + ep.unbridge().catch((err) => {}); + + // notify partner that we dropped + bent('POST', 202)(this.bridgeDetails.notifyUrl, {event: 'hangup'}).catch((err) => { + this.logger.info(err, 'TaskEnqueue:_bridgeLocal error sending hangup event to partner'); + }); + resolve(); + }); + } catch (err) { + this.state = QueueResults.Error; + this.logger.error(err, `Failed to bridge to ep ${this.other.epUuid}`); + reject(err); + } + }); + } + + /** + * We are being dequeued and bridged to another call. + * It may be on this server or a different one, and we are + * given instructions how to find it and connect. + * @param {Object} opts + * @param {string} opts.epUuid uuid of the endpoint we need to bridge to + * @param {string} opts.dequeueSipAddress ip:port of the feature server hosting the other call + */ + notifyQueueEvent(cs, opts) { + if (opts.event === 'dequeue') { + if (this.bridgeNow) return; + this.logger.info({opts}, `TaskEnqueue:notifyDequeueEvent: leaving ${this.queueName} because someone wants me`); + assert(opts.dequeueSipAddress && opts.epUuid && opts.notifyUrl); + this.emitter.emit('dequeue', opts); + } + else if (opts.event === 'hangup') { + this.emitter.emit('hangup'); + } + else { + this.logger.error({opts}, 'TaskEnqueue:notifyDequeueEvent - unsupported event/payload'); + } + } + + async _playHook(cs, dlg, hook, allowed = [TaskName.Play, TaskName.Say, TaskName.Pause, TaskName.Leave]) { + const {lengthOfList, getListPosition} = cs.srf.locals.dbHelpers; + + assert(!this._playSession); + if (this.killed) return []; + + const params = { + queueSid: this.queueName, + queueTime: getElapsedTime(this.waitStartTime) + }; + try { + const queueSize = await lengthOfList(this.queueName); + const queuePosition = await getListPosition(this.queueName, this.notifyUrl); + Object.assign(params, {queueSize, queuePosition}); + } catch (err) { + this.logger.error({err}, `TaskEnqueue:_playHook error retrieving list info for queue ${this.queueName}`); + } + const json = await cs.application.requestor.request(hook, params); + + const allowedTasks = json.filter((task) => allowed.includes(task.verb)); + if (json.length !== allowedTasks.length) { + this.logger.debug({json, allowedTasks}, 'unsupported task'); + throw new Error(`unsupported verb in dial enqueue waitHook: only ${JSON.stringify(allowed)}`); + } + this.logger.debug(`TaskEnqueue:_playHook: executing ${json.length} tasks`); + + // check for 'leave' verb and only execute tasks up till then + const tasksToRun = []; + let leave = false; + for (const o of json) { + if (o.verb === TaskName.Leave) { + leave = true; + this.logger.info('waitHook returned a leave task'); + break; + } + tasksToRun.push(o); + } + + if (this.killed) return []; + else if (tasksToRun.length > 0) { + const tasks = normalizeJambones(this.logger, tasksToRun).map((tdata) => makeTask(this.logger, tdata)); + this._playSession = new ConfirmCallSession({ + logger: this.logger, + application: cs.application, + dlg, + ep: cs.ep, + callInfo: cs.callInfo, + tasks + }); + await this._playSession.exec(); + this._playSession = null; + } + if (leave) { + this.state = QueueResults.Leave; + this.kill(cs); + } + return tasksToRun; + } +} + +module.exports = TaskEnqueue; diff --git a/lib/tasks/leave.js b/lib/tasks/leave.js new file mode 100644 index 00000000..9080dd19 --- /dev/null +++ b/lib/tasks/leave.js @@ -0,0 +1,22 @@ +const Task = require('./task'); +const {TaskName} = require('../utils/constants'); + +class TaskLeave extends Task { + constructor(logger, opts, parentTask) { + super(logger, opts); + } + + get name() { return TaskName.Leave; } + + async exec(cs, ep) { + await super.exec(cs); + await this.awaitTaskDone(); + } + + async kill(cs) { + super.kill(cs); + this.notifyTaskDone(); + } +} + +module.exports = TaskLeave; diff --git a/lib/tasks/make_task.js b/lib/tasks/make_task.js index 79622893..586e64ea 100644 --- a/lib/tasks/make_task.js +++ b/lib/tasks/make_task.js @@ -24,9 +24,18 @@ function makeTask(logger, obj, parent) { case TaskName.Dial: const TaskDial = require('./dial'); return new TaskDial(logger, data, parent); + case TaskName.Dequeue: + const TaskDequeue = require('./dequeue'); + return new TaskDequeue(logger, data, parent); + case TaskName.Enqueue: + const TaskEnqueue = require('./enqueue'); + return new TaskEnqueue(logger, data, parent); case TaskName.Hangup: const TaskHangup = require('./hangup'); return new TaskHangup(logger, data, parent); + case TaskName.Leave: + const TaskLeave = require('./leave'); + return new TaskLeave(logger, data, parent); case TaskName.Say: const TaskSay = require('./say'); return new TaskSay(logger, data, parent); diff --git a/lib/tasks/say.js b/lib/tasks/say.js index 285b9bf3..8a735223 100644 --- a/lib/tasks/say.js +++ b/lib/tasks/say.js @@ -29,7 +29,8 @@ class TaskSay extends Task { text: this.text[segment], vendor: cs.speechSynthesisVendor, language: cs.speechSynthesisLanguage, - voice: cs.speechSynthesisVoice + voice: cs.speechSynthesisVoice, + salt: cs.callSid }, this.synthesizer); const path = await synthAudio(opts); filepath.push(path); diff --git a/lib/tasks/specs.json b/lib/tasks/specs.json index 14dc2413..1287ebc8 100644 --- a/lib/tasks/specs.json +++ b/lib/tasks/specs.json @@ -9,6 +9,32 @@ "status" ] }, + "dequeue": { + "properties": { + "name": "string", + "actionHook": "object|string", + "timeout": "number" + }, + "required": [ + "name" + ] + }, + "enqueue": { + "properties": { + "name": "string", + "actionHook": "object|string", + "waitHook": "object|string", + "_": "object" + }, + "required": [ + "name" + ] + }, + "leave": { + "properties": { + + } + }, "hangup": { "properties": { "headers": "object" diff --git a/lib/tasks/task.js b/lib/tasks/task.js index d9c1770d..c140bc30 100644 --- a/lib/tasks/task.js +++ b/lib/tasks/task.js @@ -1,4 +1,5 @@ const Emitter = require('events'); +const uuidv4 = require('uuid/v4'); const debug = require('debug')('jambonz:feature-server'); const assert = require('assert'); const {TaskPreconditions} = require('../utils/constants'); @@ -98,6 +99,38 @@ class Task extends Emitter { } } + async transferCallToFeatureServer(cs, sipAddress, opts) { + const uuid = uuidv4(); + const {addKey} = cs.srf.locals.dbHelpers; + const obj = Object.assign({}, cs.application); + delete obj.requestor; + delete obj.notifier; + obj.tasks = cs.getRemainingTaskData(); + if (opts && obj.tasks.length > 1) obj.tasks[0]._ = opts; + + this.logger.debug({obj}, 'Task:_doRefer'); + + const success = await addKey(uuid, JSON.stringify(obj), 30); + if (!success) { + this.logger.info(`Task:_doRefer failed storing task data before REFER for ${this.queueName}`); + return; + } + try { + this.logger.info(`Task:_doRefer: referring call to ${sipAddress} for ${this.queueName}`); + this.callMoved = true; + const success = await cs.referCall(`sip:context-${uuid}@${sipAddress}`); + if (!success) { + this.callMoved = false; + this.logger.info('Task:_doRefer REFER failed'); + return success; + } + this.logger.info('Task:_doRefer REFER succeeded'); + return success; + } catch (err) { + this.logger.error(err, 'Task:_doRefer error'); + } + } + /** * validate that the JSON task description is valid * @param {string} name - verb name diff --git a/lib/utils/constants.json b/lib/utils/constants.json index 645fda93..6a5aec2e 100644 --- a/lib/utils/constants.json +++ b/lib/utils/constants.json @@ -1,9 +1,12 @@ { "TaskName": { "Conference": "conference", + "Dequeue": "dequeue", "Dial": "dial", + "Enqueue": "enqueue", "Gather": "gather", "Hangup": "hangup", + "Leave": "leave", "Listen": "listen", "Pause": "pause", "Play": "play", @@ -66,5 +69,17 @@ "StandbyEnter": "standby-enter", "StandbyExit": "standby-exit" }, + "QueueResults": { + "Bridged": "bridged", + "Error": "error", + "Wait": "hangup", + "Leave": "leave" + }, + "DequeueResults": { + "Bridged": "bridged", + "Error": "error", + "Hangup": "hangup", + "Timeout": "timeout" + }, "MAX_SIMRINGS": 10 } diff --git a/lib/utils/install-srf-locals.js b/lib/utils/install-srf-locals.js index bebcfa40..d0077b22 100644 --- a/lib/utils/install-srf-locals.js +++ b/lib/utils/install-srf-locals.js @@ -122,7 +122,12 @@ function installSrfLocals(srf, logger) { retrieveKey, retrieveSet, addToSet, - removeFromSet + removeFromSet, + pushBack, + popFront, + removeFromList, + lengthOfList, + getListPosition } = require('jambonz-realtimedb-helpers')({ host: process.env.JAMBONES_REDIS_HOST, port: process.env.JAMBONES_REDIS_PORT || 6379 @@ -145,7 +150,12 @@ function installSrfLocals(srf, logger) { retrieveKey, retrieveSet, addToSet, - removeFromSet + removeFromSet, + pushBack, + popFront, + removeFromList, + lengthOfList, + getListPosition }, parentLogger: logger, ipv4: localIp, diff --git a/package-lock.json b/package-lock.json index 65b4fd26..7aaa7815 100644 --- a/package-lock.json +++ b/package-lock.json @@ -260,9 +260,9 @@ } }, "@grpc/grpc-js": { - "version": "0.7.9", - "resolved": "https://registry.npmjs.org/@grpc/grpc-js/-/grpc-js-0.7.9.tgz", - "integrity": "sha512-ihn9xWOqubMPBlU77wcYpy7FFamGo5xtsK27EAILL/eoOvGEAq29UOrqRvqYPwWfl2+3laFmGKNR7uCdJhKu4Q==", + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/@grpc/grpc-js/-/grpc-js-1.0.3.tgz", + "integrity": "sha512-JKV3f5Bv2TZxK6eJSB9EarsZrnLxrvcFNwI9goq0YRXa3S6NNoCSnI3cG3lkXVIJ03Wng1WXe76kc2JQtRe7AQ==", "requires": { "semver": "^6.2.0" } @@ -368,9 +368,9 @@ "integrity": "sha512-5tXH6Bx/kNGd3MgffdmP4dy2Z+G4eaXw0SE81Tq3BNadtnMR5/ySMzX4SLEzHJzSmPNn4HIdpQsBvXMUykr58w==" }, "@types/node": { - "version": "13.13.2", - "resolved": "https://registry.npmjs.org/@types/node/-/node-13.13.2.tgz", - "integrity": "sha512-LB2R1Oyhpg8gu4SON/mfforE525+Hi/M1ineICEDftqNVTyFg1aRIeGuTvXAoWHc4nbrFncWtJgMmoyRvuGh7A==" + "version": "13.13.4", + "resolved": "https://registry.npmjs.org/@types/node/-/node-13.13.4.tgz", + "integrity": "sha512-x26ur3dSXgv5AwKS0lNfbjpCakGIduWU1DU91Zz58ONRWrIKGunmZBNv4P7N+e27sJkiGDsw/3fT4AtsqQBrBA==" }, "abort-controller": { "version": "3.0.0", @@ -519,9 +519,9 @@ "integrity": "sha1-7GphrlZIDAw8skHJVhjiCJL5Zyo=" }, "aws-sdk": { - "version": "2.663.0", - "resolved": "https://registry.npmjs.org/aws-sdk/-/aws-sdk-2.663.0.tgz", - "integrity": "sha512-xPOszNOaSXTRs8VGXaMbhTKXdlq2TlDRfFRVEGxkZrtow87hEIVZGAUSUme2e3GHqHUDnySwcufrUpUPUizOKQ==", + "version": "2.669.0", + "resolved": "https://registry.npmjs.org/aws-sdk/-/aws-sdk-2.669.0.tgz", + "integrity": "sha512-kuVcSRpDzvkgmeSmMX6Q32eTOb8UeihhUdavMrvUOP6fzSU19cNWS9HAIkYOi/jrEDK85cCZxXjxqE3JGZIGcw==", "requires": { "buffer": "4.9.1", "events": "1.1.1", @@ -1780,11 +1780,11 @@ } }, "google-gax": { - "version": "1.15.2", - "resolved": "https://registry.npmjs.org/google-gax/-/google-gax-1.15.2.tgz", - "integrity": "sha512-yNNiRf9QxWpZNfQQmSPz3rIDTBDDKnLKY/QEsjCaJyDxttespr6v8WRGgU5KrU/6ZM7QRlgBAYXCkxqHhJp0wA==", + "version": "1.15.3", + "resolved": "https://registry.npmjs.org/google-gax/-/google-gax-1.15.3.tgz", + "integrity": "sha512-3JKJCRumNm3x2EksUTw4P1Rad43FTpqrtW9jzpf3xSMYXx+ogaqTM1vGo7VixHB4xkAyATXVIa3OcNSh8H9zsQ==", "requires": { - "@grpc/grpc-js": "^0.7.4", + "@grpc/grpc-js": "~1.0.3", "@grpc/proto-loader": "^0.5.1", "@types/fs-extra": "^8.0.1", "@types/long": "^4.0.0", @@ -1826,9 +1826,9 @@ }, "dependencies": { "mime": { - "version": "2.4.4", - "resolved": "https://registry.npmjs.org/mime/-/mime-2.4.4.tgz", - "integrity": "sha512-LRxmNwziLPT828z+4YkNzloCFC2YM4wrB99k+AV5ZbEyfGNWfG8SO1FUXLmLDBSo89NrJZ4DIWeLjy1CHGhMGA==" + "version": "2.4.5", + "resolved": "https://registry.npmjs.org/mime/-/mime-2.4.5.tgz", + "integrity": "sha512-3hQhEUF027BuxZjQA3s7rIv/7VCQPa27hN9u9g87sEkWaKwQPuXOkVKtOeiyUrnWqTDiOs8Ed2rwg733mB0R5w==" } } }, @@ -2384,9 +2384,9 @@ } }, "jambonz-realtimedb-helpers": { - "version": "0.2.11", - "resolved": "https://registry.npmjs.org/jambonz-realtimedb-helpers/-/jambonz-realtimedb-helpers-0.2.11.tgz", - "integrity": "sha512-/QWjRgCvBcZyzt4NfKKDA54Cm4/tEzXH4QkLN4Yy/ZjGZYHaY2QRd76A6rV8/gnTlgoVDUd2iH0zyapLGZg7Aw==", + "version": "0.2.12", + "resolved": "https://registry.npmjs.org/jambonz-realtimedb-helpers/-/jambonz-realtimedb-helpers-0.2.12.tgz", + "integrity": "sha512-KcuXPSwxc9cHXz9uzuI9HxnHcKRHbwwq7BSFNZT61lF7KZdZDBozenDpP0Tmk/P65SN1gLbtWP4KM2W4FTtv/Q==", "requires": { "@google-cloud/text-to-speech": "^2.2.0", "aws-sdk": "^2.631.0", diff --git a/package.json b/package.json index a8850ed3..c57c6a38 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,7 @@ "ip": "^1.1.5", "jambonz-db-helpers": "^0.3.2", "jambonz-mw-registrar": "^0.1.3", - "jambonz-realtimedb-helpers": "^0.2.11", + "jambonz-realtimedb-helpers": "^0.2.13", "jambonz-stats-collector": "^0.0.3", "moment": "^2.24.0", "parse-url": "^5.0.1",