only connect to drachtio server if connected to freeswitch (#1123)

* only connect to drachtio server if connected to freeswitch

* wip

* wip
This commit is contained in:
Hoan Luu Huu
2025-04-22 20:55:39 +07:00
committed by GitHub
parent 0bf2013934
commit 15b583ef2c
2 changed files with 74 additions and 37 deletions

81
app.js
View File

@@ -27,8 +27,54 @@ const pino = require('pino');
const logger = pino(opts, pino.destination({sync: false})); const logger = pino(opts, pino.destination({sync: false}));
const {LifeCycleEvents, FS_UUID_SET_NAME, SystemState, FEATURE_SERVER} = require('./lib/utils/constants'); const {LifeCycleEvents, FS_UUID_SET_NAME, SystemState, FEATURE_SERVER} = require('./lib/utils/constants');
const installSrfLocals = require('./lib/utils/install-srf-locals'); const installSrfLocals = require('./lib/utils/install-srf-locals');
installSrfLocals(srf, logger); const createHttpListener = require('./lib/utils/http-listener');
const healthCheck = require('@jambonz/http-health-check');
// Install the srf locals
installSrfLocals(srf, logger, {
onFreeswitchConnect: (wraper) => {
// Only connect to drachtio if freeswitch is connected
logger.info(`connected to freeswitch at ${wraper.ms.address}, start drachtio server`);
if (DRACHTIO_HOST) {
srf.connect({host: DRACHTIO_HOST, port: DRACHTIO_PORT, secret: DRACHTIO_SECRET });
srf.on('connect', (err, hp) => {
const arr = /^(.*)\/(.*)$/.exec(hp.split(',').pop());
srf.locals.localSipAddress = `${arr[2]}`;
logger.info(`connected to drachtio listening on ${hp}, local sip address is ${srf.locals.localSipAddress}`);
});
}
else {
logger.info(`listening for drachtio requests on port ${DRACHTIO_PORT}`);
srf.listen({port: DRACHTIO_PORT, secret: DRACHTIO_SECRET});
}
// Start Http server
createHttpListener(logger, srf)
.then(({server, app}) => {
httpServer = server;
healthCheck({app, logger, path: '/', fn: getCount});
return {server, app};
})
.catch((err) => {
logger.error(err, 'Error creating http listener');
});
},
onFreeswitchDisconnect: (wraper) => {
// check if all freeswitch connections are lost, disconnect drachtio server
logger.info(`lost connection to freeswitch at ${wraper.ms.address}`);
const ms = srf.locals.getFreeswitch();
if (!ms) {
logger.info('no freeswitch connections, stopping drachtio server');
disconnect();
}
}
});
if (NODE_ENV === 'test') {
srf.on('error', (err) => {
logger.info(err, 'Error connecting to drachtio');
});
}
// Init services
const writeSystemAlerts = srf.locals?.writeSystemAlerts; const writeSystemAlerts = srf.locals?.writeSystemAlerts;
if (writeSystemAlerts) { if (writeSystemAlerts) {
writeSystemAlerts({ writeSystemAlerts({
@@ -54,24 +100,6 @@ const {
const InboundCallSession = require('./lib/session/inbound-call-session'); const InboundCallSession = require('./lib/session/inbound-call-session');
const SipRecCallSession = require('./lib/session/siprec-call-session'); const SipRecCallSession = require('./lib/session/siprec-call-session');
if (DRACHTIO_HOST) {
srf.connect({host: DRACHTIO_HOST, port: DRACHTIO_PORT, secret: DRACHTIO_SECRET });
srf.on('connect', (err, hp) => {
const arr = /^(.*)\/(.*)$/.exec(hp.split(',').pop());
srf.locals.localSipAddress = `${arr[2]}`;
logger.info(`connected to drachtio listening on ${hp}, local sip address is ${srf.locals.localSipAddress}`);
});
}
else {
logger.info(`listening for drachtio requests on port ${DRACHTIO_PORT}`);
srf.listen({port: DRACHTIO_PORT, secret: DRACHTIO_SECRET});
}
if (NODE_ENV === 'test') {
srf.on('error', (err) => {
logger.info(err, 'Error connecting to drachtio');
});
}
srf.use('invite', [ srf.use('invite', [
initLocals, initLocals,
createRootSpan, createRootSpan,
@@ -97,21 +125,9 @@ sessionTracker.on('idle', () => {
} }
}); });
const getCount = () => sessionTracker.count; const getCount = () => sessionTracker.count;
const healthCheck = require('@jambonz/http-health-check');
let httpServer; let httpServer;
const createHttpListener = require('./lib/utils/http-listener');
createHttpListener(logger, srf)
.then(({server, app}) => {
httpServer = server;
healthCheck({app, logger, path: '/', fn: getCount});
return {server, app};
})
.catch((err) => {
logger.error(err, 'Error creating http listener');
});
const monInterval = setInterval(async() => { const monInterval = setInterval(async() => {
srf.locals.stats.gauge('fs.sip.calls.count', sessionTracker.count); srf.locals.stats.gauge('fs.sip.calls.count', sessionTracker.count);
try { try {
@@ -133,6 +149,7 @@ const disconnect = () => {
httpServer?.on('close', resolve); httpServer?.on('close', resolve);
httpServer?.close(); httpServer?.close();
srf.disconnect(); srf.disconnect();
srf.removeAllListeners();
srf.locals.mediaservers?.forEach((ms) => ms.disconnect()); srf.locals.mediaservers?.forEach((ms) => ms.disconnect());
}); });
}; };

View File

@@ -31,18 +31,26 @@ function getLocalIp() {
return '127.0.0.1'; // Fallback to localhost if no suitable interface found return '127.0.0.1'; // Fallback to localhost if no suitable interface found
} }
function initMS(logger, wrapper, ms) { function initMS(logger, wrapper, ms, {
onFreeswitchConnect,
onFreeswitchDisconnect
}) {
Object.assign(wrapper, {ms, active: true, connects: 1}); Object.assign(wrapper, {ms, active: true, connects: 1});
logger.info(`connected to freeswitch at ${ms.address}`); logger.info(`connected to freeswitch at ${ms.address}`);
onFreeswitchConnect(wrapper);
ms.conn ms.conn
.on('esl::end', () => { .on('esl::end', () => {
wrapper.active = false; wrapper.active = false;
wrapper.connects = 0;
logger.info(`lost connection to freeswitch at ${ms.address}`); logger.info(`lost connection to freeswitch at ${ms.address}`);
onFreeswitchDisconnect(wrapper);
ms.removeAllListeners();
}) })
.on('esl::ready', () => { .on('esl::ready', () => {
if (wrapper.connects > 0) { if (wrapper.connects > 0) {
logger.info(`connected to freeswitch at ${ms.address}`); logger.info(`esl::ready connected to freeswitch at ${ms.address}`);
} }
wrapper.connects = 1; wrapper.connects = 1;
wrapper.active = true; wrapper.active = true;
@@ -56,7 +64,10 @@ function initMS(logger, wrapper, ms) {
}); });
} }
function installSrfLocals(srf, logger) { function installSrfLocals(srf, logger, {
onFreeswitchConnect = () => {},
onFreeswitchDisconnect = () => {}
}) {
logger.debug('installing srf locals'); logger.debug('installing srf locals');
assert(!srf.locals.dbHelpers); assert(!srf.locals.dbHelpers);
const {tracer} = srf.locals.otel; const {tracer} = srf.locals.otel;
@@ -91,7 +102,10 @@ function installSrfLocals(srf, logger) {
mediaservers.push(val); mediaservers.push(val);
try { try {
const ms = await mrf.connect(fs); const ms = await mrf.connect(fs);
initMS(logger, val, ms); initMS(logger, val, ms, {
onFreeswitchConnect,
onFreeswitchDisconnect
});
} }
catch (err) { catch (err) {
logger.info({err}, `failed connecting to freeswitch at ${fs.address}, will retry shortly: ${err.message}`); logger.info({err}, `failed connecting to freeswitch at ${fs.address}, will retry shortly: ${err.message}`);
@@ -102,9 +116,15 @@ function installSrfLocals(srf, logger) {
for (const val of mediaservers) { for (const val of mediaservers) {
if (val.connects === 0) { if (val.connects === 0) {
try { try {
// make sure all listeners are removed before reconnecting
val.ms?.disconnect();
val.ms = null;
logger.info({mediaserver: val.opts}, 'Retrying initial connection to media server'); logger.info({mediaserver: val.opts}, 'Retrying initial connection to media server');
const ms = await mrf.connect(val.opts); const ms = await mrf.connect(val.opts);
initMS(logger, val, ms); initMS(logger, val, ms, {
onFreeswitchConnect,
onFreeswitchDisconnect
});
} catch (err) { } catch (err) {
logger.info({err}, `failed connecting to freeswitch at ${val.opts.address}, will retry shortly`); logger.info({err}, `failed connecting to freeswitch at ${val.opts.address}, will retry shortly`);
} }