mirror of
https://github.com/jambonz/jambonz-feature-server.git
synced 2025-12-19 04:17:44 +00:00
support OPTIONS ping to SBCs
This commit is contained in:
2
app.js
2
app.js
@@ -5,9 +5,7 @@ assert.ok(process.env.JAMBONES_MYSQL_HOST &&
|
||||
process.env.JAMBONES_MYSQL_DATABASE, 'missing JAMBONES_MYSQL_XXX env vars');
|
||||
assert.ok(process.env.DRACHTIO_PORT || process.env.DRACHTIO_HOST, 'missing DRACHTIO_PORT env var');
|
||||
assert.ok(process.env.DRACHTIO_SECRET, 'missing DRACHTIO_SECRET env var');
|
||||
assert.ok(process.env.JAMBONES_SBCS, 'missing JAMBONES_SBCS env var');
|
||||
assert.ok(process.env.JAMBONES_FREESWITCH, 'missing JAMBONES_FREESWITCH env var');
|
||||
assert.ok(process.env.JAMBONES_FEATURE_SERVERS, 'missing JAMBONES_FEATURE_SERVERS env var');
|
||||
|
||||
const Srf = require('drachtio-srf');
|
||||
const srf = new Srf();
|
||||
|
||||
@@ -4,56 +4,18 @@ const RestCallSession = require('../../session/rest-call-session');
|
||||
const CallInfo = require('../../session/call-info');
|
||||
const {CallDirection, CallStatus} = require('../../utils/constants');
|
||||
const SipError = require('drachtio-srf').SipError;
|
||||
const Srf = require('drachtio-srf');
|
||||
const sysError = require('./error');
|
||||
const Mrf = require('drachtio-fsmrf');
|
||||
const installSrfLocals = require('../../utils/install-srf-locals');
|
||||
const Requestor = require('../../utils/requestor');
|
||||
let idxDrachtio = 0;
|
||||
let idxSbc = 0;
|
||||
let srfs = [];
|
||||
let initializedSrfs = false;
|
||||
|
||||
/**
|
||||
* Connect to a single drachtio server, returning a Promise when connected.
|
||||
* Upon connect, add ourselves to the list of active servers, removing if we lose the connection
|
||||
*/
|
||||
function connectSrf(logger, d) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const srf = new Srf();
|
||||
srf.connect(d);
|
||||
srf
|
||||
.on('connect', (err, hp) => {
|
||||
if (!err) logger.info(`connectSrf: Connected to drachtio at ${hp} for REST outdials`);
|
||||
else logger.error(`connectSrf: Error connecting to drachtio for outdials: ${err}`);
|
||||
srf.locals.mrf = new Mrf(srf);
|
||||
installSrfLocals(srf, logger);
|
||||
srfs.push(srf);
|
||||
resolve(srf);
|
||||
})
|
||||
.on('error', (err) => {
|
||||
logger.error(err, 'connectSrf error');
|
||||
srfs = srfs.filter((s) => s !== srf);
|
||||
reject(err);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve a connection to a drachtio server, lazily creating when first called
|
||||
*/
|
||||
function getSrfForOutdial(logger) {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (srfs.length === 0 && initializedSrfs) return reject('no available drachtio servers for outdial');
|
||||
else if (srfs.length > 0) return resolve(srfs[idxDrachtio++ % srfs.length]);
|
||||
else {
|
||||
const {srf} = require('../../../');
|
||||
const drachtio = srf.locals.drachtio;
|
||||
logger.debug(drachtio, 'getSrfForOutdial - attempting to connect');
|
||||
initializedSrfs = true;
|
||||
resolve(Promise.race(drachtio.map((d) => connectSrf(logger, d))));
|
||||
}
|
||||
});
|
||||
const {srf} = require('../../../');
|
||||
const {getSrf} = srf.locals;
|
||||
const srfForOutdial = getSrf();
|
||||
if (!srfForOutdial) throw new Error('no available feature servers for outbound call creation');
|
||||
return srfForOutdial;
|
||||
}
|
||||
|
||||
router.post('/', async(req, res) => {
|
||||
@@ -62,8 +24,10 @@ router.post('/', async(req, res) => {
|
||||
try {
|
||||
let uri, cs, to;
|
||||
const restDial = makeTask(logger, {'rest:dial': req.body});
|
||||
const srf = await getSrfForOutdial(logger);
|
||||
const sbcAddress = srf.locals.sbcs[idxSbc++ % srf.locals.sbcs.length];
|
||||
const srf = getSrfForOutdial(logger);
|
||||
const {getSBC, getFreeswitch} = srf.locals;
|
||||
const sbcAddress = getSBC();
|
||||
if (!sbcAddress) throw new Error('no available SBCs for outbound call creation');
|
||||
const target = restDial.to;
|
||||
const opts = { callingNumber: restDial.from };
|
||||
|
||||
@@ -84,8 +48,9 @@ router.post('/', async(req, res) => {
|
||||
|
||||
/* create endpoint for outdial */
|
||||
const mrf = srf.locals.mrf;
|
||||
|
||||
const ms = await mrf.connect(srf.locals.freeswitch);
|
||||
const fsOpts = getFreeswitch();
|
||||
if (!fsOpts) throw new Error('no available Freeswitch for outbound call creation');
|
||||
const ms = await mrf.connect(fsOpts);
|
||||
logger.debug('createCall: successfully connected to media server');
|
||||
const ep = await ms.createEndpoint();
|
||||
logger.debug(`createCall: successfully allocated endpoint, sending INVITE to ${sbcAddress}`);
|
||||
|
||||
@@ -5,7 +5,7 @@ const assert = require('assert');
|
||||
const sessionTracker = require('./session-tracker');
|
||||
const makeTask = require('../tasks/make_task');
|
||||
const normalizeJamones = require('../utils/normalize-jamones');
|
||||
const list = require('../utils/summarize-tasks');
|
||||
const listTaskNames = require('../utils/summarize-tasks');
|
||||
const BADPRECONDITIONS = 'preconditions not met';
|
||||
|
||||
/**
|
||||
@@ -140,7 +140,7 @@ class CallSession extends Emitter {
|
||||
* @async
|
||||
*/
|
||||
async exec() {
|
||||
this.logger.info({tasks: list(this.tasks)}, `CallSession:exec starting ${this.tasks.length} tasks`);
|
||||
this.logger.info({tasks: listTaskNames(this.tasks)}, `CallSession:exec starting ${this.tasks.length} tasks`);
|
||||
while (this.tasks.length && !this.callGone) {
|
||||
const taskNum = ++this.taskIdx;
|
||||
const stackNum = this.stackIdx;
|
||||
@@ -228,7 +228,7 @@ class CallSession extends Emitter {
|
||||
async _lccCallHook(opts) {
|
||||
const tasks = await this.requestor.request(opts.call_hook, this.callInfo);
|
||||
if (tasks && tasks.length > 0) {
|
||||
this.logger.info({tasks}, 'CallSession:updateCall new task list');
|
||||
this.logger.info({tasks: listTaskNames(tasks)}, 'CallSession:updateCall new task list');
|
||||
this.replaceApplication(normalizeJamones(this.logger, tasks).map((tdata) => makeTask(this.logger, tdata)));
|
||||
}
|
||||
}
|
||||
@@ -363,7 +363,7 @@ class CallSession extends Emitter {
|
||||
this.tasks = tasks;
|
||||
this.taskIdx = 0;
|
||||
this.stackIdx++;
|
||||
this.logger.info({tasks},
|
||||
this.logger.info({tasks: listTaskNames(tasks)},
|
||||
`CallSession:replaceApplication reset with ${tasks.length} new tasks, stack depth is ${this.stackIdx}`);
|
||||
if (this.currentTask) {
|
||||
this.currentTask.kill();
|
||||
@@ -484,8 +484,10 @@ class CallSession extends Emitter {
|
||||
*/
|
||||
async getMS() {
|
||||
if (!this.ms) {
|
||||
const fsOpts = this.srf.locals.getFreeswitch();
|
||||
if (!fsOpts) throw new Error('no available freeswitch');
|
||||
const mrf = this.srf.locals.mrf;
|
||||
this.ms = await mrf.connect(this.srf.locals.freeswitch);
|
||||
this.ms = await mrf.connect(fsOpts);
|
||||
}
|
||||
return this.ms;
|
||||
}
|
||||
@@ -500,7 +502,9 @@ class CallSession extends Emitter {
|
||||
|
||||
// get a media server
|
||||
if (!this.ms) {
|
||||
this.ms = await mrf.connect(this.srf.locals.freeswitch);
|
||||
const fsOpts = this.srf.locals.getFreeswitch();
|
||||
if (!fsOpts) throw new Error('no available freeswitch');
|
||||
this.ms = await mrf.connect(fsOpts);
|
||||
}
|
||||
if (!this.ep) {
|
||||
this.ep = await this.ms.createEndpoint({remoteSdp: this.req.body});
|
||||
|
||||
@@ -21,6 +21,10 @@ class SessionTracker extends Emitter {
|
||||
return this._logger;
|
||||
}
|
||||
|
||||
get count() {
|
||||
return this.sessions.size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a new CallSession to the Map
|
||||
* @param {string} callSid
|
||||
|
||||
@@ -238,10 +238,11 @@ class TaskDial extends Task {
|
||||
|
||||
async _attemptCalls(cs) {
|
||||
const {req, srf} = cs;
|
||||
|
||||
const {getSBC} = srf;
|
||||
const sbcAddress = cs.direction === CallDirection.Inbound ?
|
||||
`${req.source_address}:${req.source_port}` :
|
||||
srf.locals.sbcs[0];
|
||||
getSBC();
|
||||
if (!sbcAddress) throw new Error('no SBC found for outbound call');
|
||||
const opts = {
|
||||
headers: req && req.has('X-CID') ? Object.assign(this.headers, {'X-CID': req.get('X-CID')}) : this.headers,
|
||||
proxy: `sip:${sbcAddress}`,
|
||||
|
||||
@@ -5,6 +5,8 @@ const PORT = process.env.HTTP_PORT || 3000;
|
||||
function installSrfLocals(srf, logger) {
|
||||
if (srf.locals.dbHelpers) return;
|
||||
|
||||
const {getSBC, getSrf} = require('./sbc-pinger')(logger);
|
||||
|
||||
const freeswitch = process.env.JAMBONES_FREESWITCH
|
||||
.split(',')
|
||||
.map((fs) => {
|
||||
@@ -13,19 +15,6 @@ function installSrfLocals(srf, logger) {
|
||||
});
|
||||
logger.info({freeswitch}, 'freeswitch inventory');
|
||||
|
||||
const sbcs = process.env.JAMBONES_SBCS
|
||||
.split(',')
|
||||
.map((sbc) => sbc.trim());
|
||||
logger.info({sbcs}, 'SBC inventory');
|
||||
|
||||
const drachtio = process.env.JAMBONES_FEATURE_SERVERS
|
||||
.split(',')
|
||||
.map((fs) => {
|
||||
const arr = /^(.*):(.*):(.*)/.exec(fs);
|
||||
if (arr) return {host: arr[1], port: arr[2], secret: arr[3]};
|
||||
});
|
||||
logger.info({drachtio}, 'drachtio feature server inventory');
|
||||
|
||||
const {
|
||||
lookupAppByPhoneNumber,
|
||||
lookupAppBySid,
|
||||
@@ -60,9 +49,9 @@ function installSrfLocals(srf, logger) {
|
||||
parentLogger: logger,
|
||||
ipv4: localIp,
|
||||
serviceUrl: `http://${localIp}:${PORT}`,
|
||||
freeswitch: freeswitch[0],
|
||||
sbcs,
|
||||
drachtio
|
||||
getSBC,
|
||||
getSrf,
|
||||
getFreeswitch: () => freeswitch[0]
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -33,8 +33,8 @@ class Requestor {
|
||||
const myPort = u.port ? `:${u.port}` : '';
|
||||
const baseUrl = `${u.protocol}://${u.resource}${myPort}`;
|
||||
|
||||
this.get = bent(baseUrl, 'GET', 'buffer', 200);
|
||||
this.post = bent(baseUrl, 'POST', 'buffer', 200);
|
||||
this.get = bent(baseUrl, 'GET', 'buffer', 200, 201);
|
||||
this.post = bent(baseUrl, 'POST', 'buffer', 200, 201);
|
||||
|
||||
assert(isAbsoluteUrl(this.url));
|
||||
assert(['GET', 'POST'].includes(this.method));
|
||||
@@ -64,7 +64,7 @@ class Requestor {
|
||||
this.logger.debug({hook}, `Requestor:request ${method} ${url}`);
|
||||
const buf = isRelativeUrl(url) ?
|
||||
await this.post(url, params, this.authHeader) :
|
||||
await bent(method, 'buffer', 200)(url, params, basicAuth(username, password));
|
||||
await bent(method, 'buffer', 200, 201)(url, params, basicAuth(username, password));
|
||||
//this.logger.debug({body: }, `Requestor:request ${method} ${url} succeeded`);
|
||||
|
||||
if (buf && buf.toString().length > 0) {
|
||||
|
||||
71
lib/utils/sbc-pinger.js
Normal file
71
lib/utils/sbc-pinger.js
Normal file
@@ -0,0 +1,71 @@
|
||||
const assert = require('assert');
|
||||
const noopLogger = {info: () => {}, error: () => {}};
|
||||
const Srf = require('drachtio-srf');
|
||||
const debug = require('debug')('jambonz:sbc-inbound');
|
||||
const srfs = [];
|
||||
|
||||
module.exports = (logger) => {
|
||||
logger = logger || noopLogger;
|
||||
let idxSbc = 0, idxSrfs = 0;
|
||||
|
||||
assert.ok(process.env.JAMBONES_SBCS, 'missing JAMBONES_SBCS env var');
|
||||
const sbcs = process.env.JAMBONES_SBCS
|
||||
.split(',')
|
||||
.map((sbc) => `sip:${sbc.trim()}`);
|
||||
assert.ok(sbcs.length, 'JAMBONES_SBCS env var is empty or misconfigured');
|
||||
logger.info({sbcs}, 'SBC inventory');
|
||||
|
||||
assert.ok(process.env.JAMBONES_FEATURE_SERVERS, 'missing JAMBONES_FEATURE_SERVERS env var');
|
||||
const drachtio = process.env.JAMBONES_FEATURE_SERVERS
|
||||
.split(',')
|
||||
.map((fs) => {
|
||||
const arr = /^(.*):(.*):(.*)/.exec(fs);
|
||||
if (!arr) throw new Error('JAMBONES_FEATURE_SERVERS env var is misconfigured');
|
||||
const srf = new Srf();
|
||||
srf.connect({host: arr[1], port: arr[2], secret: arr[3]})
|
||||
.on('connect', (err, hp) => {
|
||||
if (err) return logger.info(err, `Error connecting to drachtio server at ${arr[1]}:${arr[2]}`);
|
||||
srfs.push(srf);
|
||||
logger.info(err, `Success connecting to FS at ${arr[1]}:${arr[2]}, ${srfs.length} online`);
|
||||
pingProxies(srf);
|
||||
})
|
||||
.on('error', (err) => {
|
||||
const place = srfs.indexOf(srf);
|
||||
if (-1 !== place) srfs.splice(place, 1);
|
||||
logger.info(err, `Error connecting to FS at ${arr[1]}:${arr[2]}, ${srfs.length} remain online`);
|
||||
});
|
||||
return {host: arr[1], port: arr[2], secret: arr[3]};
|
||||
});
|
||||
assert.ok(drachtio.length, 'JAMBONES_FEATURE_SERVERS env var is empty');
|
||||
logger.info({drachtio}, 'drachtio feature server inventory');
|
||||
|
||||
async function pingProxies(srf) {
|
||||
for (const sbc of sbcs) {
|
||||
try {
|
||||
const req = await srf.request({
|
||||
uri: sbc,
|
||||
method: 'OPTIONS',
|
||||
headers: {
|
||||
'X-FS-Status': 'open'
|
||||
}
|
||||
});
|
||||
req.on('response', (res) => {
|
||||
debug(`received ${res.status} from SBC`);
|
||||
});
|
||||
} catch (err) {
|
||||
logger.error(err, `Error sending OPTIONS to ${sbc}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// OPTIONS ping the SBCs from each feature server every 60 seconds
|
||||
setInterval(() => {
|
||||
srfs.forEach((srf) => pingProxies(srf));
|
||||
}, 60000);
|
||||
|
||||
return {
|
||||
getSBC: () => sbcs[idxSbc++ % sbcs.length],
|
||||
getSrf: () => srfs[idxSrfs++ % srfs.length]
|
||||
};
|
||||
};
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "jambonz-feature-server",
|
||||
"version": "0.1.0",
|
||||
"version": "0.2.1",
|
||||
"main": "app.js",
|
||||
"engines": {
|
||||
"node": ">= 10.16.0"
|
||||
|
||||
Reference in New Issue
Block a user