add support for multiple rtpengines

This commit is contained in:
Dave Horton
2020-02-17 09:23:52 -05:00
parent 75946e89a4
commit fa60d8a3be
2 changed files with 84 additions and 9 deletions

View File

@@ -1,6 +1,4 @@
const Emitter = require('events');
const Client = require('rtpengine-client').Client ;
const rtpengine = new Client();
const {getAppserver, isWSS, makeRtpEngineOpts} = require('./utils');
const {forwardInDialogRequests} = require('drachtio-fn-b2b-sugar');
const {parseUri, SipError} = require('drachtio-srf');
@@ -13,17 +11,22 @@ class CallSession extends Emitter {
this.res = res;
this.srf = req.srf;
this.logger = logger.child({callId: req.get('Call-ID')});
// TODO: we always using the first rtpengine here
// we should be load balancing, and doing some form of health checking
this.offer = rtpengine.offer.bind(rtpengine, this.srf.locals.rtpEngines[0]);
this.answer = rtpengine.answer.bind(rtpengine, this.srf.locals.rtpEngines[0]);
this.del = rtpengine.delete.bind(rtpengine, this.srf.locals.rtpEngines[0]);
}
async connect() {
const getRtpEngine = require('./rtpengine')(this.logger);
const engine = getRtpEngine();
if (!engine) {
this.logger.info('No available rtpengines, rejecting call!');
return this.res.send(480);
}
const {offer, answer, del} = engine;
this.offer = offer;
this.answer = answer;
this.del = del;
this.rtpEngineOpts = makeRtpEngineOpts(this.req, isWSS(this.req), false);
this.rtpEngineResource = {destroy: this.del.bind(rtpengine, this.rtpEngineOpts.common)};
this.rtpEngineResource = {destroy: this.del.bind(null, this.rtpEngineOpts.common)};
const obj = parseUri(this.req.uri);
const appServer = getAppserver(this.srf);
let proxy, host, uri;

72
lib/rtpengine.js Normal file
View File

@@ -0,0 +1,72 @@
const assert = require('assert');
const Client = require('rtpengine-client').Client ;
const client = new Client({timeout: 1500});
const debug = require('debug')('jambonz:sbc-inbound');
let timer;
const engines = process.env.JAMBONES_RTPENGINES
.split(',')
.map((hp) => {
const arr = /^(.*):(.*)$/.exec(hp.trim());
if (!arr) throw new Error('JAMBONES_RTPENGINES must be an array host:port addresses');
const engine = {
active: true,
calls: 0,
host: arr[1],
port: parseInt(arr[2])
};
[
'offer',
'answer',
'delete',
'list',
'startRecording',
'stopRecording'
].forEach((method) => engine[method] = client[method].bind(client, engine.port, engine.host));
return engine;
});
assert.ok(engines.length > 0, 'JAMBONES_RTPENGINES must be an array host:port addresses');
debug(`engines: ${JSON.stringify(engines)}`);
assert(typeof engines[0].offer === 'function');
assert(typeof engines[0].answer === 'function');
assert(typeof engines[0].delete === 'function');
function testEngines(logger) {
return setInterval(() => {
engines.forEach(async(engine) => {
try {
const res = await engine.list();
if ('ok' === res.result) {
engine.calls = res.calls.length;
engine.active = true;
logger.info({res}, `rtpengine:list ${engine.host}:${engine.port} has ${engine.calls} calls`);
return;
}
logger.info({rtpengine: engine.host, response: res}, 'Failure response from rtpengine');
engine.active = false;
} catch (err) {
logger.info({rtpengine: engine.host, err}, 'Failure response from rtpengine');
}
engine.active = false;
});
}, 5000);
}
const selectClient = () => engines.filter((c) => c.active).sort((a, b) => (a.calls - b.calls)).shift();
function getRtpEngine(logger) {
if (!timer) timer = testEngines(logger);
return () => {
const engine = selectClient();
if (engine) {
debug({engine}, 'selected engine');
return {
offer: engine.offer,
answer: engine.answer,
del: engine.delete
};
}
};
}
module.exports = getRtpEngine;