Files
sbc-sip-sidecar/lib/sip-trunk-register.js
2024-04-04 15:38:06 +07:00

312 lines
11 KiB
JavaScript

const debug = require('debug')('jambonz:sbc-registrar');
const {
JAMBONES_CLUSTER_ID,
JAMBONES_REGBOT_CONTACT_USE_IP,
JAMBONES_REGBOT_DEFAULT_EXPIRES_INTERVAL,
JAMBONES_REGBOT_MIN_EXPIRES_INTERVAL
} = require('./config');
const assert = require('assert');
const short = require('short-uuid');
/* expires value (seconds) requested in each REGISTER, and assumed when a 200 OK does not supply one */
const DEFAULT_EXPIRES = (parseInt(JAMBONES_REGBOT_DEFAULT_EXPIRES_INTERVAL, 10) || 3600);
/* smallest expires value (seconds) accepted from a 200 OK; anything lower is raised to this */
const MIN_EXPIRES = (parseInt(JAMBONES_REGBOT_MIN_EXPIRES_INTERVAL, 10) || 30);
/* upper bound (seconds) on the random startup delay before trying to claim the regbot role */
const MAX_INITIAL_DELAY = 15;
/* how often (seconds) an instance re-checks who holds the regbot role */
const REGBOT_STATUS_CHECK_INTERVAL = 60;
/* key under which the token of the current regbot holder is stored */
const regbotKey = `${(JAMBONES_CLUSTER_ID || 'default')}:regbot-token`;
/* promise-based sleep */
const waitFor = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
/* guards the exported initializer so it only runs once */
let initialized = false;
/* Regbot instances currently running in this process */
const regbots = [];
/* snapshot of the carriers / gateways the running regbots were built from (used for change detection) */
const carriers = [];
const gateways = [];
/**
 * Reduce a voip carrier record to the registration-related fields.
 * The result is a fresh object whose keys always appear in the same order,
 * which lets callers compare snapshots via JSON.stringify.
 *
 * @param {object} c - carrier record
 * @returns {object} new object holding only the fields listed below
 */
function pickRelevantCarrierProperties(c) {
  const {
    voip_carrier_sid,
    requires_register,
    is_active,
    register_username,
    register_password,
    register_sip_realm,
    register_from_user,
    register_from_domain,
    register_public_ip_in_contact
  } = c;

  return {
    voip_carrier_sid,
    requires_register,
    is_active,
    register_username,
    register_password,
    register_sip_realm,
    register_from_user,
    register_from_domain,
    register_public_ip_in_contact
  };
}
/**
 * Keeps one SIP registration alive against a single gateway: sends a REGISTER,
 * then sends it again shortly before the granted expiry, or after a fixed
 * back-off when the attempt fails.
 */
class Regbot {
  /**
   * @param {object} logger - logger exposing info / debug / error
   * @param {object} opts
   * @param {string} opts.voip_carrier_sid - carrier whose register status gets updated
   * @param {string} opts.ipv4 - gateway address (dotted-quad IP or hostname)
   * @param {number} opts.port - gateway port; only used in the proxy uri when ipv4 is a dotted-quad IP
   * @param {string} opts.protocol - gateway protocol; anything after a '/' is ignored for the transport
   * @param {string} opts.username - auth username
   * @param {string} opts.password - auth password
   * @param {string} [opts.sip_realm] - registrar realm, defaults to opts.ipv4
   * @param {string} [opts.from_user] - user part of From / AOR, defaults to username
   * @param {string} [opts.from_domain] - host part of From, defaults to the realm
   * @param {boolean} [opts.use_public_ip_in_contact] - put the sbc public ip in the Contact
   * @param {boolean} [opts.use_tls] - force tls transport regardless of opts.protocol
   */
  constructor(logger, opts) {
    this.logger = logger;
    this.voip_carrier_sid = opts.voip_carrier_sid;
    this.ipv4 = opts.ipv4;
    this.port = opts.port;
    this.protocol = opts.protocol;
    this.username = opts.username;
    this.password = opts.password;
    this.sip_realm = opts.sip_realm || opts.ipv4;
    this.use_public_ip_in_contact = opts.use_public_ip_in_contact || JAMBONES_REGBOT_CONTACT_USE_IP;
    this.use_tls = opts.use_tls || false;
    this.fromUser = opts.from_user || this.username;
    const fromDomain = opts.from_domain || this.sip_realm;
    this.from = `sip:${this.fromUser}@${fromDomain}`;
    this.aor = `${this.fromUser}@${this.sip_realm}`;
    this.status = 'none';
    /* set by stop(); prevents an in-flight REGISTER from re-arming the timer afterwards */
    this.stopped = false;
  }

  /**
   * Send the first REGISTER; later ones are scheduled from the outcome of each attempt.
   * @param {object} srf - drachtio srf instance
   */
  start(srf) {
    this.logger.info(`starting regbot ${this.fromUser}@${this.sip_realm}`);
    assert(!this.timer);
    this.stopped = false;
    this.register(srf);
  }

  /**
   * Cancel the pending re-registration and make sure none is scheduled later.
   */
  stop() {
    this.logger.info(`stopping regbot ${this.fromUser}@${this.sip_realm}`);
    this.stopped = true;
    clearTimeout(this.timer);
    this.timer = null;
  }

  /**
   * @returns {object} loggable summary (the password is deliberately left out)
   */
  toJSON() {
    return {
      voip_carrier_sid: this.voip_carrier_sid,
      username: this.username,
      fromUser: this.fromUser,
      sip_realm: this.sip_realm,
      ipv4: this.ipv4,
      port: this.port,
      aor: this.aor,
      status: this.status
    };
  }

  /**
   * Arm the timer for the next REGISTER unless this regbot has been stopped.
   * @param {object} srf - drachtio srf instance
   * @param {number} seconds - delay before the next attempt
   */
  _scheduleRegister(srf, seconds) {
    if (this.stopped) return;
    this.timer = setTimeout(this.register.bind(this, srf), seconds * 1000);
  }

  /**
   * Send one REGISTER and schedule the next attempt:
   *  - 200 response: (granted expires - 5) seconds, at least 1 second
   *  - any other response: 30 seconds
   *  - failure to send: 60 seconds
   * The outcome is also reported through updateVoipCarriersRegisterStatus.
   * @param {object} srf - drachtio srf instance
   */
  async register(srf) {
    const { updateVoipCarriersRegisterStatus } = srf.locals.dbHelpers;
    try {
      const contactAddress = this.use_public_ip_in_contact ?
        `${this.fromUser}@${srf.locals.sbcPublicIpAddress}` : this.aor;
      this.logger.debug(`sending REGISTER for ${this.aor}`);

      /* anchored, with literal dots, so hostnames that merely contain digits are not treated as IPs */
      const isIPv4 = /^\d{1,3}(\.\d{1,3}){3}$/.test(this.ipv4);

      let transport;
      if (this.use_tls) transport = 'tls';
      else if (this.protocol.includes('/')) transport = this.protocol.substring(0, this.protocol.indexOf('/'));
      else transport = this.protocol;

      /* the port is only pinned for a literal IP; for hostnames it is left out of the proxy uri */
      const proxy = `sip:${this.ipv4}${isIPv4 ? `:${this.port}` : ''};transport=${transport}`;
      const req = await srf.request(`sip${transport === 'tls' ? 's' : ''}:${this.aor}`, {
        method: 'REGISTER',
        proxy,
        headers: {
          'From': this.from,
          'Contact': `<sip:${contactAddress}>;expires=${DEFAULT_EXPIRES}`,
          'Expires': DEFAULT_EXPIRES
        },
        auth: {
          username: this.username,
          password: this.password
        }
      });
      req.on('response', (res) => {
        if (res.status !== 200) {
          this.status = 'fail';
          this.logger.info(`${this.aor}: got ${res.status} registering to ${this.ipv4}:${this.port}`);
          this._scheduleRegister(srf, 30);
        }
        else {
          this.status = 'registered';
          let expires = DEFAULT_EXPIRES;
          /* a 200 OK without a Contact header must not blow up this handler */
          const contact = res.has('Contact') ? res.getParsedHeader('Contact') : [];
          if (contact && contact.length > 0 && contact[0].params && contact[0].params.expires) {
            expires = parseInt(contact[0].params.expires, 10);
          }
          else if (res.has('Expires')) {
            expires = parseInt(res.get('Expires'), 10);
          }
          if (Number.isNaN(expires) || expires < MIN_EXPIRES) {
            this.logger.info({aor: this.aor, ipv4: this.ipv4, port: this.port},
              `got expires of ${expires} in 200 OK, too small so setting to ${MIN_EXPIRES}`);
            expires = MIN_EXPIRES;
          }
          debug(`setting timer for next register to ${expires} seconds`);
          /* refresh a little early, but never with a zero or negative delay */
          this._scheduleRegister(srf, Math.max(expires - 5, 1));
        }
        updateVoipCarriersRegisterStatus(this.voip_carrier_sid, JSON.stringify({
          status: res.status === 200 ? 'ok' : 'fail',
          reason: `${res.status} ${res.reason}`,
          cseq: req.get('Cseq'),
          callId: req.get('Call-Id')
        }));
      });
    } catch (err) {
      this.logger.error({ err }, `${this.aor}: Error registering to ${this.ipv4}:${this.port}`);
      this._scheduleRegister(srf, 60);
      updateVoipCarriersRegisterStatus(this.voip_carrier_sid, JSON.stringify({
        status: 'fail',
        /* an Error object serialises to {} through JSON.stringify, so report its message */
        reason: err && err.message ? err.message : String(err)
      }));
    }
  }
}
module.exports = async(logger, srf) => {
if (initialized) return;
initialized = true;
const { addKeyNx } = srf.locals.realtimeDbHelpers;
const myToken = short.generate();
srf.locals.regbot = {
myToken,
active: false
};
/* sleep a random duration between 0 and MAX_INITIAL_DELAY seconds */
const ms = Math.floor(Math.random() * MAX_INITIAL_DELAY) * 1000;
logger.info(`waiting ${ms}ms before attempting to claim regbot responsibility with token ${myToken}`);
await waitFor(ms);
/* try to claim responsibility */
const result = await addKeyNx(regbotKey, myToken, REGBOT_STATUS_CHECK_INTERVAL + 10);
if (result === 'OK') {
srf.locals.regbot.active = true;
logger.info(`successfully claimed regbot responsibility with token ${myToken}`);
}
else {
logger.info(`failed to claim regbot responsibility with my token ${myToken}`);
}
/* check every so often if I need to go from inactive->active (or vice versa) */
setInterval(checkStatus.bind(null, logger, srf), REGBOT_STATUS_CHECK_INTERVAL * 1000);
/* if I am the regbot holder, then kick it off */
if (srf.locals.regbot.active) {
updateCarrierRegbots(logger, srf)
.catch((err) => {
logger.error({ err }, 'updateCarrierRegbots failure');
});
}
return srf.locals.regbot.active;
};
/**
 * Periodic check of who holds the regbot role.
 *
 *  - active and still the holder: refresh the key and re-sync the regbots
 *  - active but another token is stored: stop all regbots and become inactive
 *  - active but the key is gone: stop all regbots and try to re-claim the role
 *  - inactive and the key is gone: try to claim the role
 *
 * Whenever regbots are stopped the cached carrier / gateway snapshots are
 * cleared as well; otherwise a later updateCarrierRegbots would see "no change"
 * and never start the regbots again.
 *
 * @param {object} logger - logger exposing info / error
 * @param {object} srf - drachtio srf instance carrying locals.regbot and locals.realtimeDbHelpers
 * @returns {Promise<void>} errors are logged, never thrown
 */
const checkStatus = async(logger, srf) => {
  const { addKeyNx, addKey, retrieveKey } = srf.locals.realtimeDbHelpers;
  const { myToken, active } = srf.locals.regbot;
  logger.info({ active, myToken }, 'checking in on regbot status');

  /* stop every running regbot and forget what they were built from */
  const standDown = () => {
    regbots.forEach((rb) => rb.stop());
    regbots.length = 0;
    carriers.length = 0;
    gateways.length = 0;
  };

  try {
    const token = await retrieveKey(regbotKey);
    let grabForTheWheel = false;
    if (active) {
      if (token === myToken) {
        logger.info('I am active, and shall continue in my role as regbot');
        addKey(regbotKey, myToken, REGBOT_STATUS_CHECK_INTERVAL + 10)
          .then(updateCarrierRegbots.bind(null, logger, srf))
          .catch((err) => {
            logger.error({ err }, 'updateCarrierRegbots failure');
          });
      }
      else if (token) {
        logger.info('Someone else grabbed the role! I need to stand down');
        standDown();
        /* record the stand-down so the next check runs the inactive branch */
        srf.locals.regbot.active = false;
      }
      else {
        /* the key vanished: stop, then compete for the role again below */
        grabForTheWheel = true;
        standDown();
      }
    }
    else if (token) {
      logger.info('I am inactive and someone else is performing the role');
    }
    else {
      grabForTheWheel = true;
    }

    if (grabForTheWheel) {
      logger.info('regbot status is vacated, try to grab it!');
      const result = await addKeyNx(regbotKey, myToken, REGBOT_STATUS_CHECK_INTERVAL + 10);
      if (result === 'OK') {
        srf.locals.regbot.active = true;
        logger.info(`successfully claimed regbot responsibility with token ${myToken}`);
        updateCarrierRegbots(logger, srf)
          .catch((err) => {
            logger.error({ err }, 'updateCarrierRegbots failure');
          });
      }
      else {
        srf.locals.regbot.active = false;
        logger.info('failed to claim regbot responsibility');
      }
    }
  } catch (err) {
    logger.error({ err }, 'checkStatus: ERROR');
  }
};
/**
 * Bring the running regbots in line with the database.
 *
 * Loads the active carriers that require registration and, for each, its active
 * outbound gateways. If either list differs from the snapshot taken on the
 * previous run (compared via JSON.stringify), the snapshots are replaced, all
 * existing regbots are stopped, and one new regbot is started per gateway.
 *
 * @param {object} logger - logger exposing info / debug / error
 * @param {object} srf - drachtio srf instance; srf.locals.dbHelpers supplies the lookups
 * @returns {Promise<void>} errors are logged, never thrown
 */
const updateCarrierRegbots = async(logger, srf) => {
  const { lookupAllVoipCarriers, lookupSipGatewaysByCarrier } = srf.locals.dbHelpers;
  try {
    /* carriers that currently need a registration */
    const allCarriers = await lookupAllVoipCarriers();
    const registeringCarriers = allCarriers
      .filter((c) => c.requires_register && c.is_active)
      .map((c) => pickRelevantCarrierProperties(c));
    const carriersChanged = JSON.stringify(registeringCarriers) !== JSON.stringify(carriers);

    /* their usable gateways, each tagged with the carrier it belongs to */
    const outboundGateways = [];
    for (const carrier of registeringCarriers) {
      try {
        const rows = await lookupSipGatewaysByCarrier(carrier.voip_carrier_sid);
        const usable = rows.filter((gw) => gw.outbound && gw.is_active);
        usable.forEach((gw) => {
          gw.carrier = pickRelevantCarrierProperties(carrier);
        });
        outboundGateways.push(...usable);
      } catch (err) {
        /* a failed lookup only skips this carrier's gateways */
        logger.error({ err }, 'updateCarrierRegbots Error retrieving gateways');
      }
    }
    const gatewaysChanged = JSON.stringify(outboundGateways) !== JSON.stringify(gateways);

    /* nothing new: leave the running regbots untouched */
    if (!carriersChanged && !gatewaysChanged) return;

    debug('updateCarrierRegbots: got new or changed carriers');
    logger.info('updateCarrierRegbots: got new or changed carriers');

    /* replace both snapshots in place (the arrays are shared module state) */
    carriers.splice(0, carriers.length, ...registeringCarriers);
    gateways.splice(0, gateways.length, ...outboundGateways);

    /* tear down every existing regbot ... */
    for (const old of regbots) old.stop();
    regbots.length = 0;

    /* ... and start a fresh one per gateway */
    for (const gw of gateways) {
      const { carrier } = gw;
      const rb = new Regbot(logger, {
        voip_carrier_sid: carrier.voip_carrier_sid,
        ipv4: gw.ipv4,
        port: gw.port,
        protocol: gw.protocol,
        username: carrier.register_username,
        password: carrier.register_password,
        sip_realm: carrier.register_sip_realm,
        from_user: carrier.register_from_user,
        from_domain: carrier.register_from_domain,
        use_public_ip_in_contact: carrier.register_public_ip_in_contact,
        /* NOTE(review): gw.carrier comes from pickRelevantCarrierProperties, which as written does
           not copy register_use_tls, so this is undefined here -- confirm whether it should be picked */
        use_tls: carrier.register_use_tls
      });
      regbots.push(rb);
      rb.start(srf);
    }
    logger.debug(`updateCarrierRegbots: we have started ${regbots.length} regbots`);
  } catch (err) {
    logger.error({ err }, 'updateCarrierRegbots Error');
  }
};