initial support for conference and queues

This commit is contained in:
Dave Horton
2020-05-06 15:27:24 -04:00
parent 419c5ea9fd
commit a0508a2494
18 changed files with 711 additions and 68 deletions

View File

@@ -5,7 +5,6 @@ const {TaskName, TaskPreconditions} = require('../utils/constants');
const normalizeJambones = require('../utils/normalize-jambones');
const makeTask = require('./make_task');
const bent = require('bent');
const uuidv4 = require('uuid/v4');
const assert = require('assert');
const WAIT = 'wait';
const JOIN = 'join';
@@ -239,7 +238,7 @@ class Conference extends Task {
localServer: cs.srf.locals.localSipAddress,
confServer: this.joinDetails.conferenceSipAddress
}, `Conference:_doJoin: conference ${this.confName} is hosted elsewhere`);
const success = await this._doRefer(cs, this.joinDetails.conferenceSipAddress);
const success = await this.transferCallToFeatureServer(cs, this.joinDetails.conferenceSipAddress);
/**
* If the REFER succeeded, we will get a BYE from the SBC
@@ -277,7 +276,7 @@ class Conference extends Task {
this.logger.info({members}, `Conference:doStart - notifying waiting list for ${this.confName}`);
for (const url of members) {
try {
await bent('POST', 202)(url, {conferenceSipAddress: cs.srf.locals.localSipAddress});
await bent('POST', 202)(url, {event: 'start', conferenceSipAddress: cs.srf.locals.localSipAddress});
} catch (err) {
this.logger.info(err, `Failed notifying ${url} to join ${this.confName}`);
}
@@ -363,37 +362,6 @@ class Conference extends Task {
this.emitter.emit('join', opts);
}
async _doRefer(cs, sipAddress) {
const uuid = uuidv4();
const {addKey} = cs.srf.locals.dbHelpers;
const obj = Object.assign({}, cs.application);
delete obj.requestor;
delete obj.notifier;
obj.tasks = cs.getRemainingTaskData();
this.logger.debug({obj}, 'Conference:_doRefer');
const success = await addKey(uuid, JSON.stringify(obj), 30);
if (!success) {
this.logger.info(`Conference:_doRefer failed storing task data before REFER for ${this.confName}`);
return;
}
try {
this.logger.info(`Conference:_doRefer: referring call to ${sipAddress} for ${this.confName}`);
this.callMoved = true;
const success = await cs.referCall(`sip:context-${uuid}@${sipAddress}`);
if (!success) {
this.callMoved = false;
this.logger.info('Conference:_doRefer REFER failed');
return success;
}
this.logger.info('Conference:_doRefer REFER succeeded');
return success;
} catch (err) {
this.logger.error(err, 'Conference:_doRefer error');
}
}
/**
* Add ourselves to the waitlist of sessions to be notified once
* the conference starts
@@ -402,7 +370,7 @@ class Conference extends Task {
async _addToWaitList(cs) {
const {addToSet} = cs.srf.locals.dbHelpers;
const setName = getWaitListName(this.confName);
const url = `${cs.srf.locals.serviceUrl}/v1/startConference/${cs.callSid}`;
const url = `${cs.srf.locals.serviceUrl}/v1/conference/${cs.callSid}`;
const added = await addToSet(setName, url);
if (added !== 1) throw new Error(`failed adding to the waitlist for conference ${this.confName}: ${added}`);
this.logger.debug(`successfully added to the waiting list for conference ${this.confName}`);
@@ -411,7 +379,7 @@ class Conference extends Task {
async _removeFromWaitList(cs) {
const {removeFromSet} = cs.srf.locals.dbHelpers;
const setName = getWaitListName(this.confName);
const url = `${cs.srf.locals.serviceUrl}/v1/startConference/${cs.callSid}`;
const url = `${cs.srf.locals.serviceUrl}/v1/conference/${cs.callSid}`;
try {
const count = await removeFromSet(setName, url);
this.logger.debug(`Conference:_removeFromWaitList removed ${count} from waiting list`);

134
lib/tasks/dequeue.js Normal file
View File

@@ -0,0 +1,134 @@
const Task = require('./task');
const {TaskName, TaskPreconditions, DequeueResults} = require('../utils/constants');
const Emitter = require('events');
const bent = require('bent');
const assert = require('assert');
const sleepFor = (ms) => new Promise((resolve) => setTimeout(() => resolve(), ms));
const getUrl = (cs) => `${cs.srf.locals.serviceUrl}/v1/dequeue/${cs.callSid}`;
class TaskDequeue extends Task {
constructor(logger, opts, parentTask) {
super(logger, opts);
this.preconditions = TaskPreconditions.Endpoint;
this.queueName = this.data.name;
this.timeout = this.data.timeout || 0;
this.emitter = new Emitter();
this.state = DequeueResults.Timeout;
}
get name() { return TaskName.Dequeue; }
async exec(cs, ep) {
await super.exec(cs);
this.queueName = `queue:${cs.accountSid}:${this.queueName}`;
const url = await this._getMemberFromQueue(cs);
if (!url) this.performAction({dequeueResult: 'timeout'}).catch((err) => {});
else {
try {
await this._dequeueUrl(cs, ep, url);
this.performAction({dequeueResult: 'complete'}).catch((err) => {});
} catch (err) {
this.emitter.removeAllListeners();
this.performAction({dequeueResult: 'hangup'}).catch((err) => {});
}
}
}
async kill(cs) {
super.kill(cs);
if (this.state === DequeueResults.Bridged) {
this.logger.info(`TaskDequeue:kill - notifying partner we are going away ${this.partnerUrl}`);
bent('POST', 202)(this.partnerUrl, {event: 'hangup'}).catch((err) => {
this.logger.info(err, 'TaskDequeue:kill error notifying partner of hangup');
});
}
this.emitter.emit('kill');
}
_getMemberFromQueue(cs) {
const {popFront} = cs.srf.locals.dbHelpers;
return new Promise(async(resolve) => {
let timer;
let timedout = false, found = false;
if (this.timeout > 0) {
timer = setTimeout(() => {
this.logger.info(`TaskDequeue:_getMemberFromQueue timed out after ${this.timeout}s`);
timedout = true;
resolve();
}, this.timeout * 1000);
}
do {
try {
const url = await popFront(this.queueName);
if (url) {
found = true;
clearTimeout(timer);
resolve(url);
}
} catch (err) {
this.logger.debug({err}, 'TaskDequeue:_getMemberFromQueue error popFront');
}
await sleepFor(5000);
} while (!this.killed && !timedout && !found);
});
}
_dequeueUrl(cs, ep, url) {
this.partnerUrl = url;
return new Promise(async(resolve, reject) => {
let bridgeTimer;
this.emitter
.on('bridged', () => {
clearTimeout(bridgeTimer);
this.state = DequeueResults.Bridged;
})
.on('hangup', () => {
this.logger.info('TaskDequeue:_dequeueUrl hangup from partner');
resolve();
})
.on('kill', () => {
resolve();
});
// now notify partner to bridge to me
try {
await bent('POST', 202)(url, {
event: 'dequeue',
dequeueSipAddress: cs.srf.locals.localSipAddress,
epUuid: ep.uuid,
notifyUrl: getUrl(cs)
});
bridgeTimer = setTimeout(() => reject(new Error('bridge timeout')), 20000);
} catch (err) {
this.logger.info({err, url}, `TaskDequeue:_dequeueUrl error dequeueing from ${this.queueName}, try again`);
reject(new Error('bridge failure'));
}
});
}
notifyQueueEvent(cs, opts) {
if (opts.event === 'bridged') {
assert(opts.notifyUrl);
this.logger.info({opts}, `TaskDequeue:notifyDequeueEvent: successfully bridged to member from ${this.queueName}`);
this.partnerUrl = opts.notifyUrl;
this.emitter.emit('bridged');
}
else if (opts.event === 'hangup') {
this.emitter.emit('hangup');
}
else {
this.logger.error({opts}, 'TaskDequeue:notifyDequeueEvent - unsupported event/payload');
}
}
}
module.exports = TaskDequeue;

314
lib/tasks/enqueue.js Normal file
View File

@@ -0,0 +1,314 @@
const Task = require('./task');
const Emitter = require('events');
const ConfirmCallSession = require('../session/confirm-call-session');
const normalizeJambones = require('../utils/normalize-jambones');
const makeTask = require('./make_task');
const {TaskName, TaskPreconditions, QueueResults} = require('../utils/constants');
const bent = require('bent');
const assert = require('assert');
const getUrl = (cs) => `${cs.srf.locals.serviceUrl}/v1/enqueue/${cs.callSid}`;
const getElapsedTime = (from) => Math.floor((Date.now() - from) / 1000);
class TaskEnqueue extends Task {
constructor(logger, opts) {
super(logger, opts);
this.logger = logger;
this.preconditions = TaskPreconditions.Endpoint;
this.queueName = this.data.name;
this.waitHook = this.data.waitHook;
this.emitter = new Emitter();
this.state = QueueResults.Wait;
// transferred from another server in order to bridge to a local caller?
if (this.data._) {
this.bridgeNow = true;
this.bridgeDetails = {
epUid: this.data._.epUuid,
notifyUrl: this.data._.notifyUrl
};
this.waitStartTime = this.data._.waitStartTime;
}
}
get name() { return TaskName.Enqueue; }
async exec(cs, ep) {
await super.exec(cs);
const dlg = cs.dlg;
this.queueName = `queue:${cs.accountSid}:${this.queueName}`;
try {
if (!this.bridgeNow) {
await this._addToQueue(cs, dlg, ep);
await this._doWait(cs, dlg, ep);
}
else {
await this._doBridge(cs, dlg, ep);
}
if (!this.callMoved) await this.performAction();
this.logger.debug(`TaskEnqueue:exec - task done queue ${this.queueName}`);
} catch (err) {
this.logger.info(err, `TaskEnqueue:exec - error in enqueue ${this.queueName}`);
}
}
async kill(cs) {
super.kill(cs);
this.logger.info(`TaskEnqueue:kill ${this.queueName}`);
this.emitter.emit('kill');
}
async _addToQueue(cs, dlg) {
const {pushBack} = cs.srf.locals.dbHelpers;
const url = getUrl(cs);
this.waitStartTime = Date.now();
this.logger.debug({queue: this.queueName, url}, 'pushing url onto queue');
const members = await pushBack(this.queueName, url);
this.logger.info(`TaskEnqueue:_addToQueue: added to queue, length now ${members}`);
this.notifyUrl = url;
}
async _removeFromQueue(cs, dlg) {
const {removeFromList} = cs.srf.locals.dbHelpers;
return await removeFromList(this.queueName, getUrl(cs));
}
async performAction() {
const params = {
queueSid: this.queueName,
queueTime: getElapsedTime(this.waitStartTime),
queueResult: this.state
};
await super.performAction(params);
}
/**
* Add ourselves to the queue with a url that can be invoked to tell us to dequeue
* @param {CallSession} cs
* @param {SipDialog} dlg
*/
async _doWait(cs, dlg, ep) {
return new Promise(async(resolve, reject) => {
this.emitter
.once('dequeue', (opts) => {
this.bridgeDetails = opts;
this.logger.info({bridgeDetails: this.bridgeDetails}, `time to dequeue from ${this.queueName}`);
if (this._playSession) {
this._playSession.kill();
this._playSession = null;
}
resolve(this._doBridge(cs, dlg, ep));
})
.once('kill', () => {
this._removeFromQueue(cs);
if (this._playSession) {
this.logger.debug('killing waitUrl');
this._playSession.kill();
this._playSession = null;
}
resolve();
});
if (this.waitHook && !this.killed) {
do {
try {
await ep.play('silence_stream://500');
const tasks = await this._playHook(cs, dlg, this.waitHook);
if (0 === tasks.length) break;
} catch (err) {
if (!this.bridgeDetails && !this.killed) {
this.logger.info(err, `TaskEnqueue:_doWait: failed retrieving waitHook for ${this.queueName}`);
}
this._playSession = null;
break;
}
} while (!this.killed && !this.bridgeDetails);
}
});
}
/**
* Bridge to another call.
* The call may be homed on this feature server, or another one -
* in the latter case, move the call to the other server via REFER
* Returns a promise that resolves:
* (a) When the call is transferred to the other feature server if the dequeue-er is not local, or
* (b) When either party hangs up the bridged call
* @param {CallSession} cs
* @param {SipDialog} dlg
*/
async _doBridge(cs, dlg, ep) {
assert(this.bridgeNow || this.bridgeDetails.dequeueSipAddress);
if (!this.bridgeNow && cs.srf.locals.localSipAddress !== this.bridgeDetails.dequeueSipAddress) {
this.logger.info({
localServer: cs.srf.locals.localSipAddress,
otherServer: this.bridgeDetails.dequeueSipAddress
}, `TaskEnqueue:_doBridge: leg for queue ${this.queueName} is hosted elsewhere`);
const success = await this.transferCallToFeatureServer(cs, this.bridgeDetails.dequeueSipAddress, {
waitStartTime: this.waitStartTime,
epUuid: this.bridgeDetails.epUuid,
notifyUrl: this.bridgeDetails.notifyUrl
});
/**
* If the REFER succeeded, we will get a BYE from the SBC
* which will trigger kill and the end of the execution of the CallSession
* which is what we want - so do nothing and let that happen.
* If on the other hand, the REFER failed then we are in a bad state
* and need to end the enqueue task with a failure indication and
* allow the application to continue on
*/
if (success) {
this.logger.info(`TaskEnqueue:_doBridge: REFER of ${this.queueName} succeeded`);
return;
}
this.state = QueueResults.Error;
return;
}
this.logger.info(`TaskEnqueue:_doBridge: queue ${this.queueName} is hosted locally`);
await this._bridgeLocal(cs, dlg, ep);
}
_bridgeLocal(cs, dlg, ep) {
assert(this.bridgeDetails.epUuid && this.bridgeDetails.notifyUrl);
return new Promise(async(resolve, reject) => {
try {
this.other = {epUuid: this.bridgeDetails.epUuid};
// bridge to the dequeuing endpoint
this.logger.debug(`TaskEnqueue:_doBridge: attempting to bridge call to ${this.other.epUuid}`);
await ep.bridge(this.other.epUuid);
this.state = QueueResults.Bridged;
this.logger.info(`TaskEnqueue:_doBridge: successfully bridged to ${this.other.epUuid}`);
// notify partner we are on - give him our possibly new url
const url = this.bridgeDetails.notifyUrl;
bent('POST', 202)(url, {
event: 'bridged',
notifyUrl: getUrl(cs)
}).catch((err) => {
this.logger.info({err, url}, 'TaskEnqueue:_bridgeLocal error sending bridged event');
/**
* TODO: this probably means he dropped while we were connecting....
* should we put this call back to the front of the queue so he gets serviced (?)
*/
this.state = QueueResults.Error;
reject(new Error('bridge failure'));
});
// resolve when either side hangs up
this.emitter
.on('hangup', () => {
this.logger.info('TaskEnqueue:_bridgeLocal ending with hangup from dequeue party');
ep.unbridge().catch((err) => {});
resolve();
})
.on('kill', () => {
this.logger.info('TaskEnqueue:_bridgeLocal ending with hangup from enqeue party');
ep.unbridge().catch((err) => {});
// notify partner that we dropped
bent('POST', 202)(this.bridgeDetails.notifyUrl, {event: 'hangup'}).catch((err) => {
this.logger.info(err, 'TaskEnqueue:_bridgeLocal error sending hangup event to partner');
});
resolve();
});
} catch (err) {
this.state = QueueResults.Error;
this.logger.error(err, `Failed to bridge to ep ${this.other.epUuid}`);
reject(err);
}
});
}
/**
* We are being dequeued and bridged to another call.
* It may be on this server or a different one, and we are
* given instructions how to find it and connect.
* @param {Object} opts
* @param {string} opts.epUuid uuid of the endpoint we need to bridge to
* @param {string} opts.dequeueSipAddress ip:port of the feature server hosting the other call
*/
notifyQueueEvent(cs, opts) {
if (opts.event === 'dequeue') {
if (this.bridgeNow) return;
this.logger.info({opts}, `TaskEnqueue:notifyDequeueEvent: leaving ${this.queueName} because someone wants me`);
assert(opts.dequeueSipAddress && opts.epUuid && opts.notifyUrl);
this.emitter.emit('dequeue', opts);
}
else if (opts.event === 'hangup') {
this.emitter.emit('hangup');
}
else {
this.logger.error({opts}, 'TaskEnqueue:notifyDequeueEvent - unsupported event/payload');
}
}
async _playHook(cs, dlg, hook, allowed = [TaskName.Play, TaskName.Say, TaskName.Pause, TaskName.Leave]) {
const {lengthOfList, getListPosition} = cs.srf.locals.dbHelpers;
assert(!this._playSession);
if (this.killed) return [];
const params = {
queueSid: this.queueName,
queueTime: getElapsedTime(this.waitStartTime)
};
try {
const queueSize = await lengthOfList(this.queueName);
const queuePosition = await getListPosition(this.queueName, this.notifyUrl);
Object.assign(params, {queueSize, queuePosition});
} catch (err) {
this.logger.error({err}, `TaskEnqueue:_playHook error retrieving list info for queue ${this.queueName}`);
}
const json = await cs.application.requestor.request(hook, params);
const allowedTasks = json.filter((task) => allowed.includes(task.verb));
if (json.length !== allowedTasks.length) {
this.logger.debug({json, allowedTasks}, 'unsupported task');
throw new Error(`unsupported verb in dial enqueue waitHook: only ${JSON.stringify(allowed)}`);
}
this.logger.debug(`TaskEnqueue:_playHook: executing ${json.length} tasks`);
// check for 'leave' verb and only execute tasks up till then
const tasksToRun = [];
let leave = false;
for (const o of json) {
if (o.verb === TaskName.Leave) {
leave = true;
this.logger.info('waitHook returned a leave task');
break;
}
tasksToRun.push(o);
}
if (this.killed) return [];
else if (tasksToRun.length > 0) {
const tasks = normalizeJambones(this.logger, tasksToRun).map((tdata) => makeTask(this.logger, tdata));
this._playSession = new ConfirmCallSession({
logger: this.logger,
application: cs.application,
dlg,
ep: cs.ep,
callInfo: cs.callInfo,
tasks
});
await this._playSession.exec();
this._playSession = null;
}
if (leave) {
this.state = QueueResults.Leave;
this.kill(cs);
}
return tasksToRun;
}
}
module.exports = TaskEnqueue;

22
lib/tasks/leave.js Normal file
View File

@@ -0,0 +1,22 @@
const Task = require('./task');
const {TaskName} = require('../utils/constants');
class TaskLeave extends Task {
constructor(logger, opts, parentTask) {
super(logger, opts);
}
get name() { return TaskName.Leave; }
async exec(cs, ep) {
await super.exec(cs);
await this.awaitTaskDone();
}
async kill(cs) {
super.kill(cs);
this.notifyTaskDone();
}
}
module.exports = TaskLeave;

View File

@@ -24,9 +24,18 @@ function makeTask(logger, obj, parent) {
case TaskName.Dial:
const TaskDial = require('./dial');
return new TaskDial(logger, data, parent);
case TaskName.Dequeue:
const TaskDequeue = require('./dequeue');
return new TaskDequeue(logger, data, parent);
case TaskName.Enqueue:
const TaskEnqueue = require('./enqueue');
return new TaskEnqueue(logger, data, parent);
case TaskName.Hangup:
const TaskHangup = require('./hangup');
return new TaskHangup(logger, data, parent);
case TaskName.Leave:
const TaskLeave = require('./leave');
return new TaskLeave(logger, data, parent);
case TaskName.Say:
const TaskSay = require('./say');
return new TaskSay(logger, data, parent);

View File

@@ -29,7 +29,8 @@ class TaskSay extends Task {
text: this.text[segment],
vendor: cs.speechSynthesisVendor,
language: cs.speechSynthesisLanguage,
voice: cs.speechSynthesisVoice
voice: cs.speechSynthesisVoice,
salt: cs.callSid
}, this.synthesizer);
const path = await synthAudio(opts);
filepath.push(path);

View File

@@ -9,6 +9,32 @@
"status"
]
},
"dequeue": {
"properties": {
"name": "string",
"actionHook": "object|string",
"timeout": "number"
},
"required": [
"name"
]
},
"enqueue": {
"properties": {
"name": "string",
"actionHook": "object|string",
"waitHook": "object|string",
"_": "object"
},
"required": [
"name"
]
},
"leave": {
"properties": {
}
},
"hangup": {
"properties": {
"headers": "object"

View File

@@ -1,4 +1,5 @@
const Emitter = require('events');
const uuidv4 = require('uuid/v4');
const debug = require('debug')('jambonz:feature-server');
const assert = require('assert');
const {TaskPreconditions} = require('../utils/constants');
@@ -98,6 +99,38 @@ class Task extends Emitter {
}
}
async transferCallToFeatureServer(cs, sipAddress, opts) {
const uuid = uuidv4();
const {addKey} = cs.srf.locals.dbHelpers;
const obj = Object.assign({}, cs.application);
delete obj.requestor;
delete obj.notifier;
obj.tasks = cs.getRemainingTaskData();
if (opts && obj.tasks.length > 1) obj.tasks[0]._ = opts;
this.logger.debug({obj}, 'Task:_doRefer');
const success = await addKey(uuid, JSON.stringify(obj), 30);
if (!success) {
this.logger.info(`Task:_doRefer failed storing task data before REFER for ${this.queueName}`);
return;
}
try {
this.logger.info(`Task:_doRefer: referring call to ${sipAddress} for ${this.queueName}`);
this.callMoved = true;
const success = await cs.referCall(`sip:context-${uuid}@${sipAddress}`);
if (!success) {
this.callMoved = false;
this.logger.info('Task:_doRefer REFER failed');
return success;
}
this.logger.info('Task:_doRefer REFER succeeded');
return success;
} catch (err) {
this.logger.error(err, 'Task:_doRefer error');
}
}
/**
* validate that the JSON task description is valid
* @param {string} name - verb name