Feature/rtpengine locate by dns (#28)

* k8s: query dns for rtpengine endpoints

* use udp now in k8s

* catch error on rtpengine failure
This commit is contained in:
Dave Horton
2022-01-23 17:32:51 -05:00
committed by GitHub
parent 2ca44c4f5a
commit fc7cdae22a
3 changed files with 53 additions and 24 deletions

60
app.js
View File

@@ -11,6 +11,7 @@ assert.ok(process.env.JAMBONES_NETWORK_CIDR || process.env.K8S, 'missing JAMBONE
const Srf = require('drachtio-srf');
const srf = new Srf('sbc-outbound');
const CIDRMatcher = require('cidr-matcher');
const {pingMsTeamsGateways, equalsIgnoreOrder} = require('./lib/utils');
const opts = Object.assign({
timestamp: () => {return `, "time": "${new Date().toISOString()}"`;}
}, {level: process.env.JAMBONES_LOGLEVEL || 'info'});
@@ -90,7 +91,7 @@ const {initLocals, checkLimits, route} = require('./lib/middleware')(srf, logger
const {getRtpEngine, setRtpEngines} = require('@jambonz/rtpengine-utils')([], logger, {
emitter: stats,
dtmfListenPort: process.env.DTMF_LISTEN_PORT || 22225,
protocol: process.env.RTPENGINE_NG_PROTOCOL || (process.env.K8S ? 'ws' : 'udp')
protocol: 'udp'
});
srf.locals.getRtpEngine = getRtpEngine;
@@ -143,35 +144,52 @@ if (process.env.K8S) {
healthCheck({port: PORT, logger, path: '/', fn: getCount});
}
/* update call stats periodically */
setInterval(() => {
stats.gauge('sbc.sip.calls.count', activeCallIds.size, ['direction:outbound']);
}, 5000);
if ('test' !== process.env.NODE_ENV) {
/* update call stats periodically */
setInterval(() => {
stats.gauge('sbc.sip.calls.count', activeCallIds.size, ['direction:outbound']);
}, 5000);
}
const arrayCompare = (a, b) => {
if (a.length !== b.length) return false;
const uniqueValues = new Set([...a, ...b]);
for (const v of uniqueValues) {
const aCount = a.filter((e) => e === v).length;
const bCount = b.filter((e) => e === v).length;
if (aCount !== bCount) return false;
}
return true;
const lookupRtpServiceEndpoints = (lookup, serviceName) => {
logger.debug(`dns lookup for ${serviceName}..`);
lookup(serviceName, {family: 4, all: true}, (err, addresses) => {
if (err) {
logger.error({err}, `Error looking up ${serviceName}`);
return;
}
logger.debug({addresses, rtpServers}, `dns lookup for ${serviceName} returned`);
const addrs = addresses.map((a) => a.address);
if (!equalsIgnoreOrder(addrs, rtpServers)) {
rtpServers.length = 0;
Array.prototype.push.apply(rtpServers, addrs);
logger.info({rtpServers}, 'rtpserver endpoints have been updated');
setRtpEngines(rtpServers.map((a) => `${a}:${process.env.RTPENGINE_PORT || 22222}`));
}
});
};
const serviceName = process.env.JAMBONES_RTPENGINES || process.env.K8S_RTPENGINE_SERVICE_NAME;
if (serviceName) {
logger.info(`rtpengine(s) will be found at: ${serviceName}`);
setRtpEngines([serviceName]);
if (process.env.K8S_RTPENGINE_SERVICE_NAME) {
/* poll dns for endpoints every so often */
const arr = /^(.*):(\d+)$/.exec(process.env.K8S_RTPENGINE_SERVICE_NAME);
const svc = arr[1];
logger.info(`rtpengine(s) will be found at dns name: ${svc}`);
const {lookup} = require('dns');
lookupRtpServiceEndpoints(lookup, svc);
setInterval(lookupRtpServiceEndpoints.bind(null, lookup, svc), process.env.RTPENGINE_DNS_POLL_INTERVAL || 10000);
}
else if (process.env.JAMBONES_RTPENGINES) {
/* static list of rtpengines */
setRtpEngines([process.env.JAMBONES_RTPENGINES]);
}
else {
/* update rtpengines periodically */
/* poll redis periodically for rtpengines that have registered via OPTIONS ping */
const getActiveRtpServers = async() => {
try {
const set = await retrieveSet(setNameRtp);
const newArray = Array.from(set);
logger.debug({newArray, rtpServers}, 'getActiveRtpServers');
if (!arrayCompare(newArray, rtpServers)) {
if (!equalsIgnoreOrder(newArray, rtpServers)) {
logger.info({newArray}, 'resetting active rtpengines');
setRtpEngines(newArray.map((a) => `${a}:${process.env.RTPENGINE_PORT || 22222}`));
rtpServers.length = 0;
@@ -181,14 +199,12 @@ else {
logger.error({err}, 'Error setting new rtpengines');
}
};
setInterval(() => {
getActiveRtpServers();
}, 30000);
getActiveRtpServers();
}
const {pingMsTeamsGateways} = require('./lib/utils');
pingMsTeamsGateways(logger, srf);
module.exports = {srf};

View File

@@ -372,7 +372,8 @@ class CallSession extends Emitter {
debug(`got final outdial error: ${err}`);
if (!passFailure) this.res.send(status);
this.emit('failed');
this.rtpEngineResource.destroy();
this.rtpEngineResource.destroy()
.catch((err) => this.logger.info({err}, 'Error destroying rtpe after failure'));
const tags = ['accepted:no', `sipStatus:${status}`];
this.stats.increment('sbc.originations', tags);

View File

@@ -73,9 +73,21 @@ const pingMsTeamsGateways = (logger, srf) => {
const makeCallCountKey = (sid) => `${sid}:outcalls`;
const equalsIgnoreOrder = (a, b) => {
if (a.length !== b.length) return false;
const uniqueValues = new Set([...a, ...b]);
for (const v of uniqueValues) {
const aCount = a.filter((e) => e === v).length;
const bCount = b.filter((e) => e === v).length;
if (aCount !== bCount) return false;
}
return true;
};
module.exports = {
makeRtpEngineOpts,
selectHostPort,
pingMsTeamsGateways,
makeCallCountKey
makeCallCountKey,
equalsIgnoreOrder
};