Compare commits

...

16 Commits

Author SHA1 Message Date
Dave Horton
b9b70debc7 bugfix: 302 response in rest outdial caused restart 2021-09-27 10:38:49 -04:00
Dave Horton
beb9c8c81d fix bug in createCall 2021-09-27 08:42:17 -04:00
Dave Horton
90ed866404 add support for overrideTo and 302 redirect on rest outdial 2021-09-24 09:59:27 -04:00
Dave Horton
4d5876a6a0 race condition: dial call killed just as called party picks up 2021-08-10 11:00:47 -04:00
Dave Horton
2cd3e6e724 bugfix: enqueue queue_result = bridged if queued call was answered 2021-08-04 08:53:08 -04:00
Dave Horton
0df937fb85 bugfix: if waitUrl of enqueue task includes leave but caller is dequeued before leave is reached, ignore leave 2021-08-03 16:36:17 -04:00
Dave Horton
3226cae04b minor logging 2021-07-06 14:57:24 -04:00
Dave Horton
29440a1c9d bugfix for transferring conferene 2021-07-02 13:22:55 -04:00
Dave Horton
2b782bc5f3 bugfix for enqueue and dial when these tasks are replaced by LCC 2021-06-29 11:04:12 -04:00
Dave Horton
9bd406fcba bugfix: undefined reference to endpoint and validation fix for wait tasks 2021-06-29 11:03:20 -04:00
Dave Horton
28df651c44 bugfix: transferring queued party to dequeuer on other FS fails if only 1 task 2021-06-28 16:40:33 -04:00
Dave Horton
05ae2c6039 bugfix: sns notifications do not require aws secrets in the env 2021-06-26 18:07:24 -04:00
Dave Horton
dc92f75529 linting 2021-06-23 11:24:40 -04:00
Dave Horton
c0a29c7a64 bugfix: if enqeue task is killed because it is being replaced with new app supplied by LCC, ignore any app returned from the actionHook as LCC takes precedence 2021-06-23 11:14:19 -04:00
Dave Horton
08c53aa586 send queue leave webhook when dequeued 2021-06-17 15:52:46 -04:00
Dave Horton
bb0ec8e184 initial changes for queue webhooks 2021-06-17 15:06:52 -04:00
13 changed files with 155 additions and 42 deletions

1
.gitignore vendored
View File

@@ -39,3 +39,4 @@ examples/*
ecosystem.config.js
.vscode
run-tests.sh

View File

@@ -43,6 +43,11 @@ router.post('/', async(req, res) => {
case 'user':
uri = `sip:${target.name}`;
to = target.name;
if (target.overrideTo) {
Object.assign(opts.headers, {
'X-Override-To': target.overrideTo
});
}
break;
case 'sip':
uri = target.sipUri;
@@ -89,8 +94,11 @@ router.post('/', async(req, res) => {
/* now launch the outdial */
try {
const dlg = await srf.createUAC(uri, opts, {
const dlg = await srf.createUAC(uri, {...opts, followRedirects: true, keepUriOnRedirect: true}, {
cbRequest: (err, inviteReq) => {
/* in case of 302 redirect, this gets called twice, ignore the second */
if (res.headersSent) return;
if (err) {
logger.error(err, 'createCall Error creating call');
res.status(500).send('Call Failure');

View File

@@ -1,15 +1,22 @@
const Emitter = require('events');
const fs = require('fs');
const {CallDirection, TaskPreconditions, CallStatus, TaskName} = require('../utils/constants');
const {CallDirection, TaskPreconditions, CallStatus, TaskName, KillReason} = require('../utils/constants');
const moment = require('moment');
const assert = require('assert');
const sessionTracker = require('./session-tracker');
const makeTask = require('../tasks/make_task');
const normalizeJambones = require('../utils/normalize-jambones');
const listTaskNames = require('../utils/summarize-tasks');
const Requestor = require('../utils/requestor');
const BADPRECONDITIONS = 'preconditions not met';
const CALLER_CANCELLED_ERR_MSG = 'Response not sent due to unknown transaction';
const sqlRetrieveQueueEventHook = `SELECT * FROM webhooks
WHERE webhook_sid =
(
SELECT queue_event_hook_sid FROM accounts where account_sid = ?
)`;
/**
* @classdesc Represents the execution context for a call.
* It holds the resources, such as the sip dialog and media server endpoint
@@ -48,6 +55,8 @@ class CallSession extends Emitter {
if (!this.isConfirmCallSession && !this.isSmsCallSession && !this.isAdultingCallSession) {
sessionTracker.add(this.callSid, this);
}
this._pool = srf.locals.dbHelpers.pool;
}
/**
@@ -342,7 +351,7 @@ class CallSession extends Emitter {
}
else {
/* we started a new app on the child leg, but nothing given for parent so hang him up */
this.currentTask.kill();
this.currentTask.kill(this);
}
}
@@ -479,7 +488,7 @@ class CallSession extends Emitter {
this.logger.info({tasks: listTaskNames(tasks)},
`CallSession:replaceApplication reset with ${tasks.length} new tasks, stack depth is ${this.stackIdx}`);
if (this.currentTask) {
this.currentTask.kill();
this.currentTask.kill(this, KillReason.Replaced);
this.currentTask = null;
}
}
@@ -488,7 +497,7 @@ class CallSession extends Emitter {
if (this.isConfirmCallSession) this.logger.debug('CallSession:kill (ConfirmSession)');
else this.logger.info('CallSession:kill');
if (this.currentTask) {
this.currentTask.kill();
this.currentTask.kill(this);
this.currentTask = null;
}
this.tasks = [];
@@ -708,6 +717,41 @@ class CallSession extends Emitter {
return {ms: this.ms, ep: this.ep};
}
/**
* If account was queue event webhook, send notification
* @param {*} obj - data to notify
*/
async performQueueWebhook(obj) {
if (typeof this.queueEventHookRequestor === 'undefined') {
const pp = this._pool.promise();
try {
this.logger.info({accountSid: this.accountSid}, 'performQueueWebhook: looking up account');
const [r] = await pp.query(sqlRetrieveQueueEventHook, this.accountSid);
if (0 === r.length) {
this.logger.info({accountSid: this.accountSid}, 'performQueueWebhook: no webhook provisioned');
this.queueEventHookRequestor = null;
}
else {
this.logger.info({accountSid: this.accountSid, webhook: r[0]}, 'performQueueWebhook: webhook found');
this.queueEventHookRequestor = new Requestor(this.logger, r[0]);
this.queueEventHook = r[0];
}
} catch (err) {
this.logger.error({err, accountSid: this.accountSid}, 'Error retrieving event hook');
this.queueEventHookRequestor = null;
}
}
if (null === this.queueEventHookRequestor) return;
/* send webhook */
const params = {...obj, ...this.callInfo.toJSON()};
this.logger.info({accountSid: this.accountSid, params}, 'performQueueWebhook: sending webhook');
this.queueEventHookRequestor.request(this.queueEventHook, params)
.catch((err) => {
this.logger.info({err, accountSid: this.accountSid, obj}, 'Error sending queue notification event');
});
}
/**
* A conference that the current task is waiting on has just started
* @param {*} opts

View File

@@ -27,7 +27,7 @@ function camelize(str) {
function unhandled(logger, cs, evt) {
this.participantCount = parseInt(evt.getHeader('Conference-Size'));
logger.debug({evt}, `unhandled conference event: ${evt.getHeader('Action')}`) ;
logger.debug(`unhandled conference event: ${evt.getHeader('Action')}`) ;
}
function capitalize(s) {
@@ -356,7 +356,7 @@ class Conference extends Task {
}
if (typeof this.maxParticipants === 'number' && this.maxParticipants > 1) {
this.endpoint.api('conference', `${this.confName} set max_members ${this.maxParticipants}`)
this.ep.api('conference', `${this.confName} set max_members ${this.maxParticipants}`)
.catch((err) => this.logger.error(err, `Error setting max participants to ${this.maxParticipants}`));
}
}
@@ -448,16 +448,16 @@ class Conference extends Task {
async _playHook(cs, dlg, hook, allowed = [TaskName.Play, TaskName.Say, TaskName.Pause]) {
assert(!this._playSession);
const json = await cs.application.requestor.request(hook, cs.callInfo);
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
const allowedTasks = json.filter((task) => allowed.includes(task.verb));
if (json.length !== allowedTasks.length) {
this.logger.debug({json, allowedTasks}, 'unsupported task');
throw new Error(`unsupported verb in dial conference wait/enterHook: only ${JSON.stringify(allowed)}`);
const allowedTasks = tasks.filter((t) => allowed.includes(t.name));
if (tasks.length !== allowedTasks.length) {
this.logger.debug({tasks, allowedTasks}, 'unsupported task');
throw new Error(`unsupported verb in conference waitHook: only ${JSON.stringify(allowed)}`);
}
this.logger.debug(`Conference:_playHook: executing ${json.length} tasks`);
this.logger.debug(`Conference:_playHook: executing ${tasks.length} tasks`);
if (json.length > 0) {
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
if (tasks.length > 0) {
this._playSession = new ConfirmCallSession({
logger: this.logger,
application: cs.application,
@@ -514,9 +514,6 @@ class Conference extends Task {
const functionName = `_on${capitalize(camelize(action))}`;
(Conference.prototype[functionName] || unhandled).bind(this, this.logger, cs, evt)() ;
}
else {
this.logger.debug(`Conference#__onConferenceEvent: got unhandled custom event: ${eventName}`) ;
}
}
// conference event handlers

View File

@@ -110,7 +110,8 @@ class TaskDequeue extends Task {
event: 'dequeue',
dequeueSipAddress: cs.srf.locals.localSipAddress,
epUuid: ep.uuid,
notifyUrl: getUrl(cs)
notifyUrl: getUrl(cs),
dequeuer: cs.callInfo.toJSON()
});
this.logger.info(`TaskDequeue:_dequeueUrl successfully sent POST to ${url}`);
bridgeTimer = setTimeout(() => reject(new Error('bridge timeout')), 20000);

View File

@@ -1,6 +1,13 @@
const Task = require('./task');
const makeTask = require('./make_task');
const {CallStatus, CallDirection, TaskName, TaskPreconditions, MAX_SIMRINGS} = require('../utils/constants');
const {
CallStatus,
CallDirection,
TaskName,
TaskPreconditions,
MAX_SIMRINGS,
KillReason
} = require('../utils/constants');
const assert = require('assert');
const placeCall = require('../utils/place-outdial');
const sessionTracker = require('../session/session-tracker');
@@ -140,7 +147,7 @@ class TaskDial extends Task {
await this.awaitTaskDone();
this.logger.debug({callSid: this.cs.callSid}, 'Dial:exec task is done, sending actionHook if any');
await this.performAction(this.results);
await this.performAction(this.results, this.killReason !== KillReason.Replaced);
this._removeDtmfDetection(cs, this.epOther);
this._removeDtmfDetection(cs, this.ep);
} catch (err) {
@@ -149,8 +156,9 @@ class TaskDial extends Task {
}
}
async kill(cs) {
async kill(cs, reason) {
super.kill(cs);
this.killReason = reason || KillReason.Hangup;
this._removeDtmfDetection(this.cs, this.epOther);
this._removeDtmfDetection(this.cs, this.ep);
this.logger.debug({callSid: this.cs.callSid}, 'Dial:kill removed dtmf listeners');

View File

@@ -3,7 +3,7 @@ const Emitter = require('events');
const ConfirmCallSession = require('../session/confirm-call-session');
const normalizeJambones = require('../utils/normalize-jambones');
const makeTask = require('./make_task');
const {TaskName, TaskPreconditions, QueueResults} = require('../utils/constants');
const {TaskName, TaskPreconditions, QueueResults, KillReason} = require('../utils/constants');
const bent = require('bent');
const assert = require('assert');
@@ -61,9 +61,10 @@ class TaskEnqueue extends Task {
}
}
async kill(cs) {
async kill(cs, reason) {
super.kill(cs);
this.logger.info(`TaskEnqueue:kill ${this.queueName}`);
this.killReason = reason || KillReason.Hangup;
this.logger.info(`TaskEnqueue:kill ${this.queueName} with reason ${this.killReason}`);
this.emitter.emit('kill');
this.notifyTaskDone();
}
@@ -76,11 +77,20 @@ class TaskEnqueue extends Task {
const members = await pushBack(this.queueName, url);
this.logger.info(`TaskEnqueue:_addToQueue: added to queue, length now ${members}`);
this.notifyUrl = url;
/* invoke account-level webhook for queue event notifications */
cs.performQueueWebhook({
event: 'join',
queue: this.data.name,
length: members,
joinTime: this.waitStartTime
});
}
async _removeFromQueue(cs, dlg) {
const {removeFromList} = cs.srf.locals.dbHelpers;
return await removeFromList(this.queueName, getUrl(cs));
async _removeFromQueue(cs) {
const {removeFromList, lengthOfList} = cs.srf.locals.dbHelpers;
await removeFromList(this.queueName, getUrl(cs));
return await lengthOfList(this.queueName);
}
async performAction() {
@@ -89,7 +99,7 @@ class TaskEnqueue extends Task {
queueTime: getElapsedTime(this.waitStartTime),
queueResult: this.state
};
await super.performAction(params);
await super.performAction(params, this.killReason !== KillReason.Replaced);
}
/**
@@ -104,13 +114,28 @@ class TaskEnqueue extends Task {
this.bridgeDetails = opts;
this.logger.info({bridgeDetails: this.bridgeDetails}, `time to dequeue from ${this.queueName}`);
if (this._playSession) {
this._leave = false;
this._playSession.kill();
this._playSession = null;
}
resolve(this._doBridge(cs, dlg, ep));
})
.once('kill', () => {
this._removeFromQueue(cs);
.once('kill', async() => {
/* invoke account-level webhook for queue event notifications */
if (!this.dequeued) {
try {
const members = await this._removeFromQueue(cs);
cs.performQueueWebhook({
event: 'leave',
queue: this.data.name,
length: members,
leaveReason: this.killReason !== KillReason.Replaced ? 'abandoned' : 'redirected',
leaveTime: Date.now()
});
} catch (err) {}
}
if (this._playSession) {
this.logger.debug('killing waitUrl');
this._playSession.kill();
@@ -209,6 +234,7 @@ class TaskEnqueue extends Task {
});
// resolve when either side hangs up
this.state = QueueResults.Bridged;
this.emitter
.on('hangup', () => {
this.logger.info('TaskEnqueue:_bridgeLocal ending with hangup from dequeue party');
@@ -216,7 +242,7 @@ class TaskEnqueue extends Task {
resolve();
})
.on('kill', () => {
this.logger.info('TaskEnqueue:_bridgeLocal ending with hangup from enqeue party');
this.logger.info(`TaskEnqueue:_bridgeLocal ending with ${this.killReason}`);
ep.unbridge().catch((err) => {});
// notify partner that we dropped
@@ -242,12 +268,26 @@ class TaskEnqueue extends Task {
* @param {string} opts.epUuid uuid of the endpoint we need to bridge to
* @param {string} opts.dequeueSipAddress ip:port of the feature server hosting the other call
*/
notifyQueueEvent(cs, opts) {
async notifyQueueEvent(cs, opts) {
if (opts.event === 'dequeue') {
if (this.bridgeNow) return;
this.logger.info({opts}, `TaskEnqueue:notifyDequeueEvent: leaving ${this.queueName} because someone wants me`);
assert(opts.dequeueSipAddress && opts.epUuid && opts.notifyUrl);
this.emitter.emit('dequeue', opts);
try {
const {lengthOfList} = cs.srf.locals.dbHelpers;
const members = await lengthOfList(this.queueName);
this.dequeued = true;
cs.performQueueWebhook({
event: 'leave',
queue: this.data.name,
length: Math.max(members - 1, 0),
leaveReason: 'dequeued',
leaveTime: Date.now(),
dequeuer: opts.dequeuer
});
} catch (err) {}
}
else if (opts.event === 'hangup') {
this.emitter.emit('hangup');
@@ -277,7 +317,7 @@ class TaskEnqueue extends Task {
const json = await cs.application.requestor.request(hook, params);
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
const allowedTasks = tasks.filter((t) => allowed.includes(t.verb));
const allowedTasks = tasks.filter((t) => allowed.includes(t.name));
if (tasks.length !== allowedTasks.length) {
this.logger.debug({tasks, allowedTasks}, 'unsupported task');
throw new Error(`unsupported verb in enqueue waitHook: only ${JSON.stringify(allowed)}`);
@@ -286,10 +326,9 @@ class TaskEnqueue extends Task {
// check for 'leave' verb and only execute tasks up till then
const tasksToRun = [];
let leave = false;
for (const o of tasks) {
if (o.verb === TaskName.Leave) {
leave = true;
if (o.name === TaskName.Leave) {
this._leave = true;
this.logger.info('waitHook returned a leave task');
break;
}
@@ -304,12 +343,12 @@ class TaskEnqueue extends Task {
dlg,
ep: cs.ep,
callInfo: cs.callInfo,
tasksToRun
tasks: tasksToRun
});
await this._playSession.exec();
this._playSession = null;
}
if (leave) {
if (this._leave) {
this.state = QueueResults.Leave;
this.kill(cs);
}

View File

@@ -92,7 +92,8 @@
"waitHook": "object|string",
"statusEvents": "array",
"statusHook": "object|string",
"enterHook": "object|string"
"enterHook": "object|string",
"_": "object"
},
"required": [
"name"

View File

@@ -106,7 +106,7 @@ class Task extends Emitter {
delete obj.requestor;
delete obj.notifier;
obj.tasks = cs.getRemainingTaskData();
if (opts && obj.tasks.length > 1) {
if (opts && obj.tasks.length > 0) {
const key = Object.keys(obj.tasks[0])[0];
Object.assign(obj.tasks[0][key], {_: opts});
}

View File

@@ -92,6 +92,10 @@
"Hangup": "hangup",
"Timeout": "timeout"
},
"KillReason": {
"Hangup": "hangup",
"Replaced": "replaced"
},
"MAX_SIMRINGS": 10,
"BONG_TONE": "tone_stream://v=-7;%(100,0,941.0,1477.0);v=-7;>=2;+=.1;%(1400,0,350,440)"
}

View File

@@ -99,6 +99,7 @@ function installSrfLocals(srf, logger) {
}
const {
pool,
lookupAppByPhoneNumber,
lookupAppBySid,
lookupAppByRealm,
@@ -138,6 +139,7 @@ function installSrfLocals(srf, logger) {
Object.assign(srf.locals, {
dbHelpers: {
pool,
lookupAppByPhoneNumber,
lookupAppBySid,
lookupAppByRealm,

View File

@@ -186,6 +186,15 @@ class SingleDialer extends Emitter {
this.logger.debug(`SingleDialer:exec call connected: ${this.callSid}`);
const connectTime = this.dlg.connectTime = moment();
/* race condition: we were killed just as call was answered */
if (this.killed) {
this.logger.info(`SingleDialer:exec race condition - we were killed as call connected: ${this.callSid}`);
const duration = moment().diff(connectTime, 'seconds');
this.emit('callStatusChange', {callStatus: CallStatus.Completed, duration});
if (this.ep) this.ep.destroy();
return;
}
this.dlg
.on('destroy', () => {
const duration = moment().diff(connectTime, 'seconds');

View File

@@ -18,8 +18,7 @@ module.exports = (logger) => {
// listen for SNS lifecycle changes
let lifecycleEmitter = new Emitter();
let dryUpCalls = false;
if (process.env.AWS_SNS_TOPIC_ARM &&
process.env.AWS_ACCESS_KEY_ID && process.env.AWS_SECRET_ACCESS_KEY && process.env.AWS_REGION) {
if (process.env.AWS_SNS_TOPIC_ARM && process.env.AWS_REGION) {
(async function() {
try {