add support for live call control

This commit is contained in:
Dave Horton
2020-02-07 10:26:35 -05:00
parent 2811e35c6b
commit 3ca2d982cc
18 changed files with 332 additions and 73 deletions

View File

@@ -11,21 +11,51 @@ const Srf = require('drachtio-srf');
const drachtio = config.get('outdials.drachtio');
const sbcs = config.get('outdials.sbc');
const Mrf = require('drachtio-fsmrf');
const installSrfLocals = require('../../utils/install-srf-locals');
let idxDrachtio = 0;
let idxSbc = 0;
let srfs = [];
let initializedSrfs = false;
const srfs = drachtio.map((d) => {
const srf = new Srf();
srf.connect(d);
srf
.on('connect', (err, hp) => {
if (!err) console.log(`Connected to drachtio at ${hp} for REST outdials`);
else console.log(`Error connecting to drachtio for outdials: ${err}`);
srf.locals.mrf = new Mrf(srf);
})
.on('error', (err) => console.log(err));
return srf;
});
/**
* Connect to a single drachtio server, returning a Promise when connected.
* Upon connect, add ourselves to the list of active servers, removing if we lose the connection
*/
function connectSrf(logger, d) {
return new Promise((resolve, reject) => {
const srf = new Srf();
srf.connect(d);
srf
.on('connect', (err, hp) => {
if (!err) logger.info(`connectSrf: Connected to drachtio at ${hp} for REST outdials`);
else logger.error(`connectSrf: Error connecting to drachtio for outdials: ${err}`);
srf.locals.mrf = new Mrf(srf);
installSrfLocals(srf, logger);
srfs.push(srf);
resolve(srf);
})
.on('error', (err) => {
logger.error(err, 'connectSrf error');
srfs = srfs.filter((s) => s !== srf);
reject(err);
});
});
}
/**
* Retrieve a connection to a drachtio server, lazily creating when first called
*/
function getSrfForOutdial(logger) {
return new Promise((resolve, reject) => {
if (srfs.length === 0 && initializedSrfs) return reject('no available drachtio servers for outdial');
else if (srfs.length > 0) return resolve(srfs[idxDrachtio++ % srfs.length]);
else {
logger.debug(drachtio, 'getSrfForOutdial - attempting to connect');
initializedSrfs = true;
resolve(Promise.race(drachtio.map((d) => connectSrf(logger, d))));
}
});
}
async function validate(logger, payload) {
const data = Object.assign({}, {
@@ -57,7 +87,7 @@ router.post('/', async(req, res) => {
let uri, cs, to;
const restDial = await validate(logger, req.body);
const sbcAddress = sbcs[idxSbc++ % sbcs.length];
const srf = srfs[idxDrachtio++ % srfs.length];
const srf = await getSrfForOutdial(logger);
const target = restDial.to;
const opts = {
'callingNumber': restDial.from

View File

@@ -1,5 +1,6 @@
const api = require('express').Router();
api.use('/createCall', require('./create-call'));
api.use('/updateCall', require('./update-call'));
module.exports = api;

View File

@@ -0,0 +1,22 @@
const router = require('express').Router();
const sysError = require('./error');
const sessionTracker = require('../../session/session-tracker');
router.post('/:callSid', async(req, res) => {
const logger = req.app.locals.logger;
const callSid = req.params.callSid;
logger.debug({body: req.body}, 'got upateCall request');
try {
const cs = sessionTracker.get(callSid);
if (!cs) {
logger.info(`updateCall: callSid not found ${callSid}`);
return res.sendStatus(404);
}
res.sendStatus(202);
cs.updateCall(req.body);
} catch (err) {
sysError(logger, res, err);
}
});
module.exports = router;

View File

@@ -16,7 +16,7 @@ class CallInfo {
this.callId = req.get('Call-ID');
this.sipStatus = 100;
this.callStatus = CallStatus.Trying;
this.originatingSipIP = req.get('X-Forwarded-For');
this.originatingSipIp = req.get('X-Forwarded-For');
this.originatingSipTrunkName = req.get('X-Originating-Carrier');
}
else if (opts.parentCallInfo) {
@@ -74,7 +74,7 @@ class CallInfo {
accountSid: this.accountSid,
applicationSid: this.applicationSid
};
['parentCallSid', 'originatingSipIP', 'originatingSipTrunkName'].forEach((prop) => {
['parentCallSid', 'originatingSipIp', 'originatingSipTrunkName'].forEach((prop) => {
if (this[prop]) obj[prop] = this[prop];
});
if (typeof this.duration === 'number') obj.duration = this.duration;

View File

@@ -4,6 +4,7 @@ const {CallDirection, TaskPreconditions, CallStatus} = require('../utils/constan
const hooks = require('../utils/notifiers');
const moment = require('moment');
const assert = require('assert');
const sessionTracker = require('./session-tracker');
const BADPRECONDITIONS = 'preconditions not met';
class CallSession extends Emitter {
@@ -18,9 +19,14 @@ class CallSession extends Emitter {
const {notifyHook} = hooks(this.logger, this.callInfo);
this.notifyHook = notifyHook;
this.updateCallStatus = srf.locals.dbHelpers.updateCallStatus;
this.serviceUrl = srf.locals.serviceUrl;
this.taskIdx = 0;
this.stackIdx = 0;
this.callGone = false;
sessionTracker.add(this.callSid, this);
}
get callSid() {
@@ -87,6 +93,8 @@ class CallSession extends Emitter {
this._onTasksDone();
this._clearCalls();
this.ms && this.ms.destroy();
sessionTracker.remove(this.callSid);
}
_onTasksDone() {
@@ -96,7 +104,42 @@ class CallSession extends Emitter {
_callReleased() {
this.logger.debug('CallSession:_callReleased - caller hung up');
this.callGone = true;
if (this.currentTask) this.currentTask.kill();
if (this.currentTask) {
this.currentTask.kill();
this.currentTask = null;
}
}
normalizeUrl(url, method, auth) {
const hook = {url, method};
if (auth && auth.username && auth.password) Object.assign(hook, auth);
if (url.startsWith('/')) {
const or = this.originalRequest;
if (or) {
hook.url = `${or.baseUrl}${url}`;
hook.method = hook.method || or.method || 'POST';
if (!hook.auth && or.auth) Object.assign(hook, or.auth);
}
}
this.logger.debug({hook}, 'Task:normalizeUrl');
return hook;
}
async updateCall(opts) {
this.logger.debug(opts, 'CallSession:updateCall');
if (opts.call_status === 'completed' && this.dlg) {
this.logger.info('CallSession:updateCall hanging up call due to request from api');
this._callerHungup();
}
else if (opts.call_hook && opts.call_hook.url) {
const hook = this.normalizeUrl(opts.call_hook.url, opts.call_hook.method, opts.call_hook.auth);
this.logger.info({hook}, 'CallSession:updateCall replacing application due to request from api');
const {actionHook} = hooks(this.logger, this.callInfo);
const tasks = await actionHook(hook);
this.logger.info({tasks}, 'CallSession:updateCall new task list');
this.replaceApplication(tasks);
}
}
/**
@@ -105,9 +148,14 @@ class CallSession extends Emitter {
*/
replaceApplication(tasks) {
this.tasks = tasks;
this.logger.info({tasks}, `CallSession:replaceApplication - reset application with ${tasks.length} new tasks`);
this.taskIdx = 0;
this.stackIdx++;
this.logger.info({tasks},
`CallSession:replaceApplication reset with ${tasks.length} new tasks, stack depth is ${this.stackIdx}`);
if (this.currentTask) {
this.currentTask.kill();
this.currentTask = null;
}
}
_evaluatePreconditions(task) {
switch (task.preconditions) {
@@ -227,6 +275,9 @@ class CallSession extends Emitter {
} catch (err) {
this.logger.info(err, `CallSession:_notifyCallStatusChange error sending ${callStatus} ${sipStatus}`);
}
// update calls db
this.updateCallStatus(this.callInfo, this.serviceUrl).catch((err) => this.logger.error(err, 'redis error'));
}
}

View File

@@ -0,0 +1,42 @@
const Emitter = require('events');
const assert = require('assert');
class SessionTracker extends Emitter {
constructor() {
super();
this.sessions = new Map();
}
get logger() {
if (!this._logger) {
const {logger} = require('../../app');
this._logger = logger;
}
return this._logger;
}
add(callSid, callSession) {
assert(callSid);
this.sessions.set(callSid, callSession);
this.logger.info(`SessionTracker:add callSid ${callSid}, we have ${this.sessions.size} session being tracked`);
}
remove(callSid) {
assert(callSid);
this.sessions.delete(callSid);
this.logger.info(`SessionTracker:remove callSid ${callSid}, we have ${this.sessions.size} being tracked`);
}
has(callSid) {
return this.sessions.has(callSid);
}
get(callSid) {
return this.sessions.get(callSid);
}
}
const singleton = new SessionTracker();
module.exports = singleton;

View File

@@ -56,10 +56,10 @@ class TaskDial extends Task {
this.confirmMethod = this.data.confirmMethod;
if (this.data.listen) {
this.listenTask = makeTask(logger, {'listen': this.data.listen});
this.listenTask = makeTask(logger, {'listen': this.data.listen}, this);
}
if (this.data.transcribe) {
this.transcribeTask = makeTask(logger, {'transcribe' : this.data.transcribe});
this.transcribeTask = makeTask(logger, {'transcribe' : this.data.transcribe}, this);
}
this.results = {};
@@ -131,7 +131,7 @@ class TaskDial extends Task {
const sbcAddress = cs.direction === CallDirection.Inbound ?
`${req.source_address}:${req.source_port}` :
config.get('sbcAddress');
config.get('outdials.sbc');
const opts = {
headers: req && req.has('X-CID') ? Object.assign(this.headers, {'X-CID': req.get('X-CID')}) : this.headers,
proxy: `sip:${sbcAddress}`,
@@ -268,8 +268,8 @@ class TaskDial extends Task {
dialCallSid: sd.callSid,
});
if (this.transcribeTask) this.transcribeTask.exec(cs, this.ep, this);
if (this.listenTask) this.listenTask.exec(cs, this.ep, this);
if (this.transcribeTask) this.transcribeTask.exec(cs, this.ep);
if (this.listenTask) this.listenTask.exec(cs, this.ep);
}
_bridgeEarlyMedia(sd) {

View File

@@ -16,10 +16,11 @@ class TaskListen extends Task {
this.mixType = this.mixType || 'mono';
this.sampleRate = this.sampleRate || 8000;
this.earlyMedia = this.data.earlyMedia === true;
this.hook = this.normalizeUrl(this.url, 'GET', this.wsAuth);
this.nested = typeof parentTask !== 'undefined';
this.parentTask = parentTask;
this.nested = parentTask instanceof Task;
this.results = {};
this.ranToCompletion = false;
if (this.transcribe) this.transcribeTask = makeTask(logger, {'transcribe': opts.transcribe}, this);
@@ -31,15 +32,18 @@ class TaskListen extends Task {
async exec(cs, ep) {
super.exec(cs);
this.ep = ep;
try {
this.hook = this.normalizeUrl(this.url, 'GET', this.wsAuth);
if (this.playBeep) await this._playBeep(ep);
if (this.transcribeTask) {
this.logger.debug('TaskListen:exec - starting nested transcribe task');
this.transcribeTask.exec(cs, ep, this);
this.transcribeTask.exec(cs, ep);
}
await this._startListening(cs, ep);
await this.awaitTaskDone();
if (this.action) await this.performAction(this.method, this.auth, this.results, !this.nested);
const acceptNewApp = !this.nested && this.ranToCompletion;
if (this.action) await this.performAction(this.method, this.auth, this.results, acceptNewApp);
} catch (err) {
this.logger.info(err, `TaskListen:exec - error ${this.url}`);
}
@@ -73,9 +77,8 @@ class TaskListen extends Task {
this._initListeners(ep);
const metadata = Object.assign(
{sampleRate: this.sampleRate, mixType: this.mixType},
cs.callInfo.toJSON(),
this.nested ? this.parentTask.sd.callInfo : cs.callInfo.toJSON(),
this.metadata);
this.logger.debug({metadata, hook: this.hook}, 'TaskListen:_startListening');
if (this.hook.username && this.hook.password) {
this.logger.debug({username: this.hook.username, password: this.hook.password},
'TaskListen:_startListening basic auth');
@@ -94,6 +97,7 @@ class TaskListen extends Task {
if (this.maxLength) {
this._timer = setTimeout(() => {
this.logger.debug(`TaskListen terminating task due to timeout of ${this.timeout}s reached`);
this.ranToCompletion = true;
this.kill();
}, this.maxLength * 1000);
}
@@ -121,6 +125,7 @@ class TaskListen extends Task {
if (evt.dtmf === this.finishOnKey) {
this.logger.info(`TaskListen:_onDtmf terminating task due to dtmf ${evt.dtmf}`);
this.results.digits = evt.dtmf;
this.ranToCompletion = true;
this.kill();
}
}

View File

@@ -2,7 +2,7 @@ const Task = require('./task');
const {TaskName} = require('../utils/constants');
const errBadInstruction = new Error('malformed jambonz application payload');
function makeTask(logger, obj) {
function makeTask(logger, obj, parent) {
const keys = Object.keys(obj);
if (!keys || keys.length !== 1) {
throw errBadInstruction;
@@ -17,40 +17,40 @@ function makeTask(logger, obj) {
switch (name) {
case TaskName.SipDecline:
const TaskSipDecline = require('./sip_decline');
return new TaskSipDecline(logger, data);
return new TaskSipDecline(logger, data, parent);
case TaskName.Dial:
const TaskDial = require('./dial');
return new TaskDial(logger, data);
return new TaskDial(logger, data, parent);
case TaskName.Hangup:
const TaskHangup = require('./hangup');
return new TaskHangup(logger, data);
return new TaskHangup(logger, data, parent);
case TaskName.Say:
const TaskSay = require('./say');
return new TaskSay(logger, data);
return new TaskSay(logger, data, parent);
case TaskName.Play:
const TaskPlay = require('./play');
return new TaskPlay(logger, data);
return new TaskPlay(logger, data, parent);
case TaskName.Pause:
const TaskPause = require('./pause');
return new TaskPause(logger, data);
return new TaskPause(logger, data, parent);
case TaskName.Gather:
const TaskGather = require('./gather');
return new TaskGather(logger, data);
return new TaskGather(logger, data, parent);
case TaskName.Transcribe:
const TaskTranscribe = require('./transcribe');
return new TaskTranscribe(logger, data);
return new TaskTranscribe(logger, data, parent);
case TaskName.Listen:
const TaskListen = require('./listen');
return new TaskListen(logger, data);
return new TaskListen(logger, data, parent);
case TaskName.Redirect:
const TaskRedirect = require('./redirect');
return new TaskRedirect(logger, data);
return new TaskRedirect(logger, data, parent);
case TaskName.RestDial:
const TaskRestDial = require('./rest_dial');
return new TaskRestDial(logger, data);
return new TaskRestDial(logger, data, parent);
case TaskName.Tag:
const TaskTag = require('./tag');
return new TaskTag(logger, data);
return new TaskTag(logger, data, parent);
}
// should never reach

View File

@@ -57,26 +57,14 @@ class Task extends Emitter {
}
normalizeUrl(url, method, auth) {
const hook = {url, method};
if (auth && auth.username && auth.password) Object.assign(hook, auth);
if (url.startsWith('/')) {
const or = this.callSession.originalRequest;
if (or) {
hook.url = `${or.baseUrl}${url}`;
hook.method = hook.method || or.method || 'POST';
if (!hook.auth && or.auth) Object.assign(hook, or.auth);
}
}
this.logger.debug({hook}, 'Task:normalizeUrl');
return hook;
return this.callSession.normalizeUrl(url, method, auth);
}
async performAction(method, auth, results, expectResponse = true) {
if (this.action) {
const hook = this.normalizeUrl(this.action, method, auth);
const tasks = await this.actionHook(hook, results, expectResponse);
if (tasks && Array.isArray(tasks)) {
if (expectResponse && tasks && Array.isArray(tasks)) {
this.logger.debug({tasks: tasks}, `${this.name} replacing application with ${tasks.length} tasks`);
this.callSession.replaceApplication(tasks);
}

View File

@@ -0,0 +1,31 @@
const config = require('config');
const ip = require('ip');
const localIp = ip.address();
const PORT = process.env.HTTP_PORT || config.get('defaultHttpPort');
function installSrfLocals(srf, logger) {
if (srf.locals.dbHelpers) return;
const {lookupAppByPhoneNumber} = require('jambonz-db-helpers')(config.get('mysql'), logger);
const {
updateCallStatus,
retrieveCall,
listCalls,
deleteCall
} = require('jambonz-realtimedb-helpers')(config.get('redis'), logger);
Object.assign(srf.locals, {
dbHelpers: {
lookupAppByPhoneNumber,
updateCallStatus,
retrieveCall,
listCalls,
deleteCall
},
parentLogger: logger,
ipv4: localIp,
serviceUrl: `http://${localIp}:${PORT}`
});
}
module.exports = installSrfLocals;

View File

@@ -25,7 +25,7 @@ function normalizeJambones(logger, obj) {
throw new Error('malformed jambonz payload: missing verb property');
}
}
logger.debug(`returning document with ${document.length} tasks`);
logger.debug({document}, `normalizeJambones: returning document with ${document.length} tasks`);
return document;
}

View File

@@ -61,6 +61,9 @@ class SingleDialer extends Emitter {
}
try {
this.updateCallStatus = srf.locals.dbHelpers.updateCallStatus;
this.serviceUrl = srf.locals.serviceUrl;
this.ep = await ms.createEndpoint();
this.logger.debug(`SingleDialer:exec - created endpoint ${this.ep.uuid}`);
let sdp;
@@ -237,6 +240,9 @@ class SingleDialer extends Emitter {
} catch (err) {
this.logger.info(err, `SingleDialer:_notifyCallStatusChange error sending ${callStatus} ${sipStatus}`);
}
// update calls db
this.updateCallStatus(this.callInfo, this.serviceUrl).catch((err) => this.logger.error(err, 'redis error'));
}
}