|
|
|
|
@@ -1,5 +1,6 @@
|
|
|
|
|
const debug = require('debug')('jambonz:sbc-options-handler');
|
|
|
|
|
const fsServers = new Map();
|
|
|
|
|
const fsServiceUrls = new Map();
|
|
|
|
|
const rtpServers = new Map();
|
|
|
|
|
|
|
|
|
|
module.exports = ({srf, logger}) => {
|
|
|
|
|
@@ -7,6 +8,7 @@ module.exports = ({srf, logger}) => {
|
|
|
|
|
|
|
|
|
|
const setNameFs = `${(process.env.JAMBONES_CLUSTER_ID || 'default')}:active-fs`;
|
|
|
|
|
const setNameRtp = `${(process.env.JAMBONES_CLUSTER_ID || 'default')}:active-rtp`;
|
|
|
|
|
const setNameFsSeriveUrl = `${(process.env.JAMBONES_CLUSTER_ID || 'default')}:fs-service-url`;
|
|
|
|
|
|
|
|
|
|
/* check for expired servers every so often */
|
|
|
|
|
setInterval(async() => {
|
|
|
|
|
@@ -22,6 +24,16 @@ module.exports = ({srf, logger}) => {
|
|
|
|
|
logger.info({members}, `expired member ${key} from ${setNameFs} we now have ${countOfMembers}`);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for (const [key, value] of fsServiceUrls) {
|
|
|
|
|
const duration = now - value;
|
|
|
|
|
if (duration > expires) {
|
|
|
|
|
fsServiceUrls.delete(key);
|
|
|
|
|
await removeFromSet(setNameFsSeriveUrl, key);
|
|
|
|
|
const members = await retrieveSet(setNameFsSeriveUrl);
|
|
|
|
|
const countOfMembers = members.length;
|
|
|
|
|
logger.info({members}, `expired member ${key} from ${setNameFsSeriveUrl} we now have ${countOfMembers}`);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for (const [key, value] of rtpServers) {
|
|
|
|
|
const duration = now - value;
|
|
|
|
|
if (duration > expires) {
|
|
|
|
|
@@ -40,12 +52,18 @@ module.exports = ({srf, logger}) => {
|
|
|
|
|
const now = Date.now();
|
|
|
|
|
const runningFs = await retrieveSet(setNameFs);
|
|
|
|
|
const runningRtp = await retrieveSet(setNameRtp);
|
|
|
|
|
const runningFsServiceUrls = await retrieveSet(setNameFsSeriveUrl);
|
|
|
|
|
|
|
|
|
|
if (runningFs.length) {
|
|
|
|
|
logger.info({runningFs}, 'start watching these FS servers');
|
|
|
|
|
for (const ip of runningFs) fsServers.set(ip, now);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (runningFsServiceUrls.length) {
|
|
|
|
|
logger.info({runningFsServiceUrls}, 'start watching these FS Service Urls');
|
|
|
|
|
for (const url of runningFsServiceUrls) fsServiceUrls.set(url, now);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (runningRtp.length) {
|
|
|
|
|
logger.info({runningRtp}, 'start watching these RTP servers');
|
|
|
|
|
for (const ip of runningRtp) rtpServers.set(ip, now);
|
|
|
|
|
@@ -56,29 +74,8 @@ module.exports = ({srf, logger}) => {
|
|
|
|
|
};
|
|
|
|
|
_init();
|
|
|
|
|
|
|
|
|
|
return async(req, res) => {
|
|
|
|
|
|
|
|
|
|
/* OPTIONS ping from internal FS or RTP server? */
|
|
|
|
|
const internal = req.has('X-FS-Status') || req.has('X-RTP-Status');
|
|
|
|
|
if (!internal) {
|
|
|
|
|
debug('got external OPTIONS ping');
|
|
|
|
|
res.send(200);
|
|
|
|
|
return req.srf.endSession(req);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
let map, status, countOfMembers;
|
|
|
|
|
const h = ['X-FS-Status', 'X-RTP-Status'].find((h) => req.has(h));
|
|
|
|
|
if (h) {
|
|
|
|
|
const isRtpServer = req.has('X-RTP-Status');
|
|
|
|
|
const key = isRtpServer ? req.source_address : `${req.source_address}:${req.source_port}`;
|
|
|
|
|
const prefix = isRtpServer ? 'X-RTP' : 'X-FS';
|
|
|
|
|
map = isRtpServer ? rtpServers : fsServers;
|
|
|
|
|
const setName = isRtpServer ? setNameRtp : setNameFs;
|
|
|
|
|
const gaugeName = isRtpServer ? 'rtpservers' : 'featureservers';
|
|
|
|
|
|
|
|
|
|
status = req.get(`${prefix}-Status`);
|
|
|
|
|
|
|
|
|
|
const _addToCache = async (map, status, setName, key) => {
|
|
|
|
|
let countOfMembers;
|
|
|
|
|
if (status === 'open') {
|
|
|
|
|
map.set(key, Date.now());
|
|
|
|
|
const exists = await isMemberOfSet(setName, key);
|
|
|
|
|
@@ -103,6 +100,37 @@ module.exports = ({srf, logger}) => {
|
|
|
|
|
logger.info({members}, `removed member ${key} from ${setName} we now have ${countOfMembers}`);
|
|
|
|
|
debug({members}, `removed member ${key} from ${setName}`);
|
|
|
|
|
}
|
|
|
|
|
return countOfMembers;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return async(req, res) => {
|
|
|
|
|
|
|
|
|
|
/* OPTIONS ping from internal FS or RTP server? */
|
|
|
|
|
const internal = req.has('X-FS-Status') || req.has('X-RTP-Status');
|
|
|
|
|
if (!internal) {
|
|
|
|
|
debug('got external OPTIONS ping');
|
|
|
|
|
res.send(200);
|
|
|
|
|
return req.srf.endSession(req);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
let map, status, countOfMembers;
|
|
|
|
|
const h = ['X-FS-Status', 'X-RTP-Status'].find((h) => req.has(h));
|
|
|
|
|
if (h) {
|
|
|
|
|
const isRtpServer = req.has('X-RTP-Status');
|
|
|
|
|
const key = isRtpServer ? req.source_address : `${req.source_address}:${req.source_port}`;
|
|
|
|
|
const prefix = isRtpServer ? 'X-RTP' : 'X-FS';
|
|
|
|
|
map = isRtpServer ? rtpServers : fsServers;
|
|
|
|
|
const setName = isRtpServer ? setNameRtp : setNameFs;
|
|
|
|
|
const gaugeName = isRtpServer ? 'rtpservers' : 'featureservers';
|
|
|
|
|
const fsServiceUrlKey = req.has('X-FS-ServiceUrl') ? req.get('X-FS-ServiceUrl') : null;
|
|
|
|
|
|
|
|
|
|
status = req.get(`${prefix}-Status`);
|
|
|
|
|
|
|
|
|
|
countOfMembers = await _addToCache(map, status, setName, key);
|
|
|
|
|
if (fsServiceUrlKey) {
|
|
|
|
|
await _addToCache(fsServiceUrls, stats, setNameFsSeriveUrl, fsServiceUrlKey);
|
|
|
|
|
}
|
|
|
|
|
stats.gauge(gaugeName, map.size);
|
|
|
|
|
}
|
|
|
|
|
res.send(200, {headers: {
|
|
|
|
|
|