fixes from initial load testing

This commit is contained in:
Dave Horton
2020-02-22 15:06:01 -05:00
parent 4bd9e015b5
commit fa05b76451
8 changed files with 108 additions and 67 deletions

View File

@@ -47,14 +47,10 @@ router.post('/', async(req, res) => {
}
/* create endpoint for outdial */
const mrf = srf.locals.mrf;
const fsOpts = getFreeswitch();
if (!fsOpts) throw new Error('no available Freeswitch for outbound call creation');
const ms = await mrf.connect(fsOpts);
logger.debug('createCall: successfully connected to media server');
const ms = getFreeswitch();
if (!ms) throw new Error('no available Freeswitch for outbound call creation');
const ep = await ms.createEndpoint();
logger.debug(`createCall: successfully allocated endpoint, sending INVITE to ${sbcAddress}`);
ms.destroy();
/* launch outdial */
let sdp, sipLogger;

View File

@@ -435,7 +435,7 @@ class CallSession extends Emitter {
// need to allocate an endpoint
try {
if (!this.ms) this.ms = await this.getMS();
if (!this.ms) this.ms = this.getMS();
const ep = await this.ms.createEndpoint({remoteSdp: this.req.body});
ep.cs = this;
this.ep = ep;
@@ -485,7 +485,7 @@ class CallSession extends Emitter {
* Hang up the call and free the media endpoint
*/
async _clearResources() {
for (const resource of [this.dlg, this.ep, this.ms]) {
for (const resource of [this.dlg, this.ep]) {
try {
if (resource && resource.connected) await resource.destroy();
} catch (err) {
@@ -505,12 +505,10 @@ class CallSession extends Emitter {
/**
* get a media server to use for this call
*/
async getMS() {
getMS() {
if (!this.ms) {
const fsOpts = this.srf.locals.getFreeswitch();
if (!fsOpts) throw new Error('no available freeswitch');
const mrf = this.srf.locals.mrf;
this.ms = await mrf.connect(fsOpts);
this.ms = this.srf.locals.getFreeswitch();
if (!this.ms) throw new Error('no available freeswitch');
}
return this.ms;
}
@@ -520,14 +518,13 @@ class CallSession extends Emitter {
* the current media server and endpoint that are associated with this call
*/
async createOrRetrieveEpAndMs() {
const mrf = this.srf.locals.mrf;
if (this.ms && this.ep) return {ms: this.ms, ep: this.ep};
// get a media server
if (!this.ms) {
const fsOpts = this.srf.locals.getFreeswitch();
if (!fsOpts) throw new Error('no available freeswitch');
this.ms = await mrf.connect(fsOpts);
const ms = this.srf.locals.getFreeswitch();
if (!ms) throw new Error('no available freeswitch');
this.ms = ms;
}
if (!this.ep) {
this.ep = await this.ms.createEndpoint({remoteSdp: this.req.body});

View File

@@ -2,7 +2,6 @@ const Task = require('./task');
const makeTask = require('./make_task');
const {CallStatus, CallDirection, TaskName, TaskPreconditions, MAX_SIMRINGS} = require('../utils/constants');
const assert = require('assert');
const parseUri = require('drachtio-srf').parseUri;
const placeCall = require('../utils/place-outdial');
const sessionTracker = require('../session/session-tracker');
const DtmfCollector = require('../utils/dtmf-collector');

View File

@@ -1,3 +1,4 @@
const Mrf = require('drachtio-fsmrf');
const ip = require('ip');
const localIp = ip.address();
const PORT = process.env.HTTP_PORT || 3000;
@@ -5,19 +6,90 @@ const assert = require('assert');
function installSrfLocals(srf, logger) {
assert(!srf.locals.dbHelpers);
const {getSBC, getSrf} = require('./sbc-pinger')(logger);
const freeswitch = process.env.JAMBONES_FREESWITCH
.split(',')
.map((fs) => {
const arr = /^(.*):(.*):(.*)/.exec(fs);
if (arr) return {address: arr[1], port: arr[2], secret: arr[3]};
});
logger.info({freeswitch}, 'freeswitch inventory');
const StatsCollector = require('jambonz-stats-collector');
const stats = srf.locals.stats = new StatsCollector(logger);
// freeswitch connections (typically we connect to only one)
const mrf = new Mrf(srf);
const mediaservers = [];
let idxStart = 0;
(async function() {
const fsInventory = process.env.JAMBONES_FREESWITCH
.split(',')
.map((fs) => {
const arr = /^(.*):(.*):(.*)/.exec(fs);
assert.ok(arr, `Invalid syntax JAMBONES_FREESWITCH: ${process.env.JAMBONES_FREESWITCH}`);
return {address: arr[1], port: arr[2], secret: arr[3]};
});
logger.info({fsInventory}, 'freeswitch inventory');
for (const fs of fsInventory) {
const val = {opts: fs, active: false, connects: 0};
mediaservers.push(val);
try {
const ms = await mrf.connect(fs);
Object.assign(val, {ms, active: true, connects: 1});
logger.info(`connected to freeswitch at ${fs.address}`);
ms.conn
.on('esl::end', () => {
val.active = false;
logger.info(`lost connection to freeswitch at ${fs.address}`);
})
.on('esl::ready', () => {
if (val.connects > 0) {
logger.info(`connected to freeswitch at ${fs.address}`);
}
val.connects = 1;
val.active = true;
});
}
catch (err) {
logger.info(`failed connecting to freeswitch at ${fs.address}, will retry shortly`);
}
}
// retry to connect to any that were initially offline
setInterval(async() => {
for (const val of mediaservers) {
if (val.connect === 0) {
try {
const ms = await mrf.connect(val.opts);
val.ms = ms;
} catch (err) {
logger.info(`failed connecting to freeswitch at ${val.opts.address}, will retry shortly`);
}
}
}
}, 3000);
// if we have a single freeswitch (as is typical) report stats periodically
if (mediaservers.length === 1) {
const ms = mediaservers[0].ms;
setInterval(() => {
try {
stats.gauge('fs.media.channels.in_use', ms.currentSessions);
stats.gauge('fs.media.channels.free', ms.maxSessions - ms.currentSessions);
stats.gauge('fs.media.calls_per_second', ms.cps);
stats.gauge('fs.media.cpu_idle', ms.cpuIdle);
}
catch (err) {
logger.info(err, 'Error sending media server metrics');
}
}, 30000);
}
})();
/**
* return an active media server
*/
function getFreeswitch() {
const active = mediaservers.filter((mediaserver) => mediaserver.active);
if (active.length === 0) return null;
return active[idxStart++ % active.length].ms;
}
const {
lookupAppByPhoneNumber,
lookupAppBySid,
@@ -54,11 +126,9 @@ function installSrfLocals(srf, logger) {
serviceUrl: `http://${localIp}:${PORT}`,
getSBC,
getSrf,
getFreeswitch: () => freeswitch[0],
getFreeswitch,
stats: stats
});
logger.debug({locals: srf.locals}, 'srf.locals installed');
}
module.exports = installSrfLocals;

View File

@@ -248,16 +248,20 @@ class SingleDialer extends Emitter {
(!duration && callStatus !== CallStatus.Completed),
'duration MUST be supplied when call completed AND ONLY when call completed');
this.callInfo.updateCallStatus(callStatus, sipStatus);
if (typeof duration === 'number') this.callInfo.duration = duration;
try {
this.requestor.request(this.application.call_status_hook, this.callInfo.toJSON());
} catch (err) {
this.logger.info(err, `SingleDialer:_notifyCallStatusChange error sending ${callStatus} ${sipStatus}`);
if (this.callInfo) {
this.callInfo.updateCallStatus(callStatus, sipStatus);
if (typeof duration === 'number') this.callInfo.duration = duration;
try {
this.requestor.request(this.application.call_status_hook, this.callInfo.toJSON());
} catch (err) {
this.logger.info(err, `SingleDialer:_notifyCallStatusChange error sending ${callStatus} ${sipStatus}`);
}
// update calls db
this.updateCallStatus(this.callInfo, this.serviceUrl).catch((err) => this.logger.error(err, 'redis error'));
}
else {
this.logger.info('SingleDialer:_notifyCallStatusChange: call status change before sending the outbound INVITE!!');
}
// update calls db
this.updateCallStatus(this.callInfo, this.serviceUrl).catch((err) => this.logger.error(err, 'redis error'));
}
}

View File

@@ -68,7 +68,7 @@ class Requestor {
const {username, password} = typeof hook === 'object' ? hook : {};
assert.ok(url, 'Requestor:request url was not provided');
assert.ok, (['GET', 'POST'].includes(method), `Requestor:request method must be 'GET' or 'POST' not ${method}`);
assert.ok, (['GET', 'POST'].includes(method), `Requestor:request method must be 'GET' or 'POST' not ${method}`);
this.logger.debug({hook, params}, `Requestor:request ${method} ${url}`);
const startAt = process.hrtime();
@@ -79,7 +79,7 @@ class Requestor {
await this.post(url, params, this.authHeader) :
await bent(method, 'buffer', 200, 201)(url, params, basicAuth(username, password));
} catch (err) {
this.logger.info({baseUrl: this.baseUrl, url: err.statusCode},
this.logger.info({baseUrl: this.baseUrl, url: err.statusCode},
`web callback returned unexpected error code ${err.statusCode}`);
throw err;
}

View File

@@ -26,7 +26,7 @@ module.exports = (logger) => {
.on('connect', (err, hp) => {
if (err) return logger.info(err, `Error connecting to drachtio server at ${arr[1]}:${arr[2]}`);
srfs.push(srf);
logger.info(err, `Success connecting to FS at ${arr[1]}:${arr[2]}, ${srfs.length} online`);
logger.info(err, `Success connecting to drachtio at ${arr[1]}:${arr[2]}, ${srfs.length} online`);
pingProxies(srf);
})
.on('error', (err) => {