fix timing issue of ephemeral gateways update/deletion (#132)

* fix timing issue of ephemeral gateways update/deletion

* Fix for potential regbot Zombie, and other concerns

* address performance concerns in regbot behavior
This commit is contained in:
Ed Robbins
2026-03-26 10:03:12 -04:00
committed by GitHub
parent 03dc4964b6
commit ce4ac89179
3 changed files with 255 additions and 45 deletions
+37
View File
@@ -57,6 +57,7 @@ class Regbot {
}
stop(srf) {
this.retired = true;
const { deleteEphemeralGateway } = srf.locals.realtimeDbHelpers;
clearTimeout(this.timer);
this.timer = null;
@@ -71,6 +72,38 @@ class Regbot {
}
stopTimer() {
this.retired = true;
clearTimeout(this.timer);
this.timer = null;
}
configKey() {
return [
this.voip_carrier_sid, this.ipv4, this.port,
this.username, this.password, this.sip_realm,
this.protocol, this.use_sips_scheme,
this.use_public_ip_in_contact, this.outbound_sip_proxy,
this.trunk_type, this.sip_gateway_sid,
this.account_sip_realm, this.fromUser, this.from
].join('|');
}
static configKeyFromOpts(opts) {
const sip_realm = opts.sip_realm || opts.ipv4;
const fromUser = opts.from_user || opts.username;
const fromDomain = opts.from_domain || sip_realm;
return [
opts.voip_carrier_sid, opts.ipv4, opts.port,
opts.username, opts.password, sip_realm,
opts.protocol, opts.use_sips_scheme || false,
opts.use_public_ip_in_contact || JAMBONES_REGBOT_CONTACT_USE_IP,
opts.outbound_sip_proxy,
opts.trunk_type, opts.sip_gateway_sid,
opts.account_sip_realm, fromUser, `sip:${fromUser}@${fromDomain}`
].join('|');
}
toJSON() {
return {
voip_carrier_sid: this.voip_carrier_sid,
@@ -146,6 +179,10 @@ class Regbot {
}
});
req.on('response', async(res) => {
if (this.retired) {
this.logger.info(`${this.aor}: ignoring response, regbot has been retired`);
return;
}
let expires;
if (res.status !== 200) {
this.status = 'fail';
+85 -39
View File
@@ -1,4 +1,5 @@
const debug = require('debug')('jambonz:sbc-registrar');
const crypto = require('crypto');
const {
JAMBONES_CLUSTER_ID,
JAMBONES_REGBOT_BATCH_SLEEP_MS,
@@ -15,8 +16,14 @@ const waitFor = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
// Module-level state shared by the functions in this file.
// NOTE(review): where `initialized` is read/written is not visible here.
let initialized = false;
// Regbot instances currently tracked (pushed to / replaced by updateCarrierRegbots).
const regbots = [];
// NOTE(review): carrier/gateway lists; presumably the last-seen snapshots -- confirm
// whether these are still needed now that hashes are compared instead.
const carriers = [];
const gateways = [];
// Hashes (see computeHash) of the carrier and gateway lists last processed;
// compared against freshly computed hashes to detect changes.
let carriersHash = '';
let gatewaysHash = '';
/**
 * Compute an MD5 fingerprint of an array, used for cheap change detection
 * (not for security).
 *
 * Each item is serialized with JSON.stringify and fed to the hash followed
 * by a newline. JSON output never contains a raw newline, so the delimiter
 * keeps item boundaries unambiguous (e.g. [1, 23] and [12, 3] no longer
 * hash to the same value). Items that JSON.stringify cannot represent
 * (undefined, functions, symbols) are hashed as 'null', matching how
 * JSON.stringify treats them inside an array, instead of making
 * hash.update() throw.
 *
 * @param {Array<*>} arr - items to fingerprint; order is significant
 * @returns {string} 32-character lowercase hex MD5 digest
 */
function computeHash(arr) {
  const h = crypto.createHash('md5');
  for (const item of arr) {
    const json = JSON.stringify(item);
    h.update(json === undefined ? 'null' : json);
    // item delimiter: prevents adjacent serializations from running together
    h.update('\n');
  }
  return h.digest('hex');
}
/**
 * Count the tracked regbots whose status is exactly 'registered'.
 * @returns {number} number of registered regbots
 */
const getCountSuccessfulRegbots = () => regbots.reduce(
  (count, rb) => (rb.status === 'registered' ? count + 1 : count), 0);
@@ -210,7 +217,8 @@ const updateCarrierRegbots = async(logger, srf) => {
const cs = (await lookupAllVoipCarriers())
.filter((c) => c.requires_register && c.is_active)
.map((c) => pickRelevantCarrierProperties(c));
if (JSON.stringify(cs) !== JSON.stringify(carriers)) hasChanged = true;
const newCarriersHash = computeHash(cs);
if (newCarriersHash !== carriersHash) hasChanged = true;
for (const c of cs) {
try {
const arr = (await lookupSipGatewaysByCarrier(c.voip_carrier_sid))
@@ -224,47 +232,44 @@ const updateCarrierRegbots = async(logger, srf) => {
logger.error({ err }, 'updateCarrierRegbots Error retrieving gateways');
}
}
if (JSON.stringify(gws) !== JSON.stringify(gateways)) hasChanged = true;
const newGatewaysHash = computeHash(gws);
if (newGatewaysHash !== gatewaysHash) hasChanged = true;
if (hasChanged) {
debug('updateCarrierRegbots: got new or changed carriers');
logger.info({count: gws.length}, 'updateCarrierRegbots: got new or changed carriers');
// Clear and repopulate arrays in chunks to avoid argument limit
carriers.length = 0;
for (let i = 0; i < cs.length; i += 1000) {
Array.prototype.push.apply(carriers, cs.slice(i, i + 1000));
carriersHash = newCarriersHash;
gatewaysHash = newGatewaysHash;
// Build maps of existing regbots for O(1) lookup
const existingByKey = new Map();
const existingByCarrierIpPort = new Map();
for (const rb of regbots) {
existingByKey.set(rb.configKey(), rb);
existingByCarrierIpPort.set(`${rb.voip_carrier_sid}:${rb.ipv4}:${rb.port}`, rb);
}
gateways.length = 0;
for (let i = 0; i < gws.length; i += 1000) {
Array.prototype.push.apply(gateways, gws.slice(i, i + 1000));
}
// preserve consecutive failure counts from existing regbots before stopping them
const failureCounts = new Map();
regbots.forEach((rb) => {
const key = `${rb.voip_carrier_sid}:${rb.ipv4}:${rb.port}`;
failureCounts.set(key, rb.consecutiveRemoveFailures || 0);
});
// stop / kill existing regbots
regbots.forEach((rb) => rb.stop(srf));
regbots.length = 0;
// start new regbots
const newRegbots = [];
const newCarrierSids = new Set();
const keepKeys = new Set();
const accountSipRealmCache = new Map();
let batch_count = 0;
for (const gw of getUniqueGateways(gateways, logger)) {
// find gateway account sip realm.
for (const gw of getUniqueGateways(gws, logger)) {
let accountSipRealm;
if (!gw.carrier.register_public_ip_in_contact && gw.carrier.account_sid) {
const account = await lookupAccountBySid(gw.carrier.account_sid);
if (account && account.sip_realm) {
accountSipRealm = account.sip_realm;
const acctSid = gw.carrier.account_sid;
if (accountSipRealmCache.has(acctSid)) {
accountSipRealm = accountSipRealmCache.get(acctSid);
} else {
const account = await lookupAccountBySid(acctSid);
accountSipRealm = (account && account.sip_realm) || null;
accountSipRealmCache.set(acctSid, accountSipRealm);
}
}
try {
const rb = new Regbot(logger, {
const opts = {
voip_carrier_sid: gw.carrier.voip_carrier_sid,
account_sip_realm: accountSipRealm,
ipv4: gw.ipv4,
@@ -280,29 +285,70 @@ const updateCarrierRegbots = async(logger, srf) => {
outbound_sip_proxy: gw.carrier.outbound_sip_proxy,
trunk_type: gw.carrier.trunk_type,
sip_gateway_sid: gw.sip_gateway_sid
});
// restore consecutive failure count from previous regbot instance
const key = `${gw.carrier.voip_carrier_sid}:${gw.ipv4}:${gw.port}`;
if (failureCounts.has(key)) {
rb.consecutiveRemoveFailures = failureCounts.get(key);
};
const key = Regbot.configKeyFromOpts(opts);
if (existingByKey.has(key)) {
// Unchanged regbot -- keep the existing one running
const existing = existingByKey.get(key);
newRegbots.push(existing);
newCarrierSids.add(existing.voip_carrier_sid);
keepKeys.add(key);
} else {
// New or changed regbot -- only now construct the instance
const rb = new Regbot(logger, opts);
const oldRb = existingByCarrierIpPort.get(
`${rb.voip_carrier_sid}:${rb.ipv4}:${rb.port}`);
if (oldRb) {
rb.consecutiveRemoveFailures = oldRb.consecutiveRemoveFailures || 0;
}
regbots.push(rb);
newRegbots.push(rb);
newCarrierSids.add(rb.voip_carrier_sid);
rb.start(srf);
batch_count++;
if (batch_count >= JAMBONES_REGBOT_BATCH_SIZE) {
batch_count = 0;
await sleepFor(JAMBONES_REGBOT_BATCH_SLEEP_MS);
}
}
} catch (err) {
const { updateVoipCarriersRegisterStatus } = srf.locals.dbHelpers;
updateVoipCarriersRegisterStatus(gw.carrier.voip_carrier_sid, JSON.stringify({
status: 'fail',
reason: err.message,
}));
logger.error({ err }, `Error starting regbot, ignore register for ${this.fr}`);
logger.error({ err },
`Error starting regbot for ${gw.carrier.register_username}@${gw.carrier.register_sip_realm}`);
}
}
logger.debug(`updateCarrierRegbots: we have started ${regbots.length} regbots`);
// Stop old regbots that are no longer needed
for (const [key, rb] of existingByKey) {
if (!keepKeys.has(key)) {
// Check if a replacement regbot exists for the same carrier
const hasReplacement = newCarrierSids.has(rb.voip_carrier_sid);
if (hasReplacement) {
// Config changed but carrier still active -- stop timer only,
// keep ephemeral gateways in Redis until new regbot overwrites them
rb.stopTimer();
logger.info(`config changed for regbot ${rb.aor}, preserving gateways until re-registered`);
} else {
// Carrier removed or deactivated -- full cleanup
rb.stop(srf);
logger.info(`removed regbot ${rb.aor}, deleted ephemeral gateways`);
}
}
}
// Replace the regbots array (chunked to avoid call stack argument limit)
regbots.length = 0;
for (let i = 0; i < newRegbots.length; i += 1000) {
Array.prototype.push.apply(regbots, newRegbots.slice(i, i + 1000));
}
logger.debug(`updateCarrierRegbots: ${regbots.length} regbots active, ` +
`${keepKeys.size} kept, ${regbots.length - keepKeys.size} new`);
}
} catch (err) {
logger.error({ err }, 'updateCarrierRegbots Error');
+127
View File
@@ -71,3 +71,130 @@ test('Can create regbot with valid sip_realm', (t) => {
t.fail('Regbot is not created with valid sip_realm');}
t.end();
});
// Two regbots built from the very same options must report the same key.
test('configKey returns identical strings for identical config', (t) => {
  const opts = {
    voip_carrier_sid: 'carrier-1',
    ipv4: '2.3.4.5',
    port: 5060,
    username: 'user',
    password: 'password',
    sip_realm: 'sip.server.com',
    protocol: 'udp',
    account_sip_realm: 'example.com',
    trunk_type: 'reg',
    sip_gateway_sid: 'gw-1'
  };
  const first = new Regbot(logger, opts);
  const second = new Regbot(logger, opts);

  const firstKey = first.configKey();
  const secondKey = second.configKey();
  t.equal(firstKey, secondKey, 'identical config produces identical keys');
  t.end();
});
// The static key builder must agree with the key of an instance constructed
// from the same options, with and without from_user/from_domain overrides.
test('static configKeyFromOpts matches instance configKey', (t) => {
  const baseOpts = {
    voip_carrier_sid: 'carrier-1',
    ipv4: '2.3.4.5',
    port: 5060,
    username: 'user',
    password: 'password',
    sip_realm: 'sip.server.com',
    protocol: 'udp',
    account_sip_realm: 'example.com',
    trunk_type: 'reg',
    sip_gateway_sid: 'gw-1'
  };
  const plainBot = new Regbot(logger, baseOpts);
  const plainStaticKey = Regbot.configKeyFromOpts(baseOpts);
  t.equal(plainStaticKey, plainBot.configKey(),
    'static method produces same key as instance method');

  // also with from_user and from_domain overrides
  const overrideOpts = Object.assign({}, baseOpts, {from_user: 'alice', from_domain: 'example.org'});
  const overrideBot = new Regbot(logger, overrideOpts);
  const overrideStaticKey = Regbot.configKeyFromOpts(overrideOpts);
  t.equal(overrideStaticKey, overrideBot.configKey(),
    'static method matches instance with from_user/from_domain');
  t.end();
});
// Changing any single option that feeds the key must change the key.
// Covers every option that configKeyFromOpts places in the key except
// use_public_ip_in_contact, whose default comes from an environment-derived
// constant and so cannot be asserted portably here.
test('configKey returns different strings when config differs', (t) => {
  const base = {
    voip_carrier_sid: 'carrier-1',
    ipv4: '2.3.4.5',
    port: 5060,
    username: 'user',
    password: 'password',
    sip_realm: 'sip.server.com',
    protocol: 'udp',
    trunk_type: 'reg',
    sip_gateway_sid: 'gw-1'
  };
  const baseKey = Regbot.configKeyFromOpts(base);
  // each of these should produce a different key
  const variants = [
    {password: 'newpass'},
    {username: 'other'},
    {ipv4: '9.9.9.9'},
    {port: 5080},
    {sip_realm: 'other.com'},
    {voip_carrier_sid: 'carrier-2'},
    {sip_gateway_sid: 'gw-2'},
    {from_user: 'override'},
    {from_domain: 'custom.com'},
    // fields that also participate in the key but were previously untested
    {protocol: 'tcp'},
    {trunk_type: 'other'},
    {account_sip_realm: 'example.com'},
    {outbound_sip_proxy: 'proxy.example.com'},
    {use_sips_scheme: true}
  ];
  for (const override of variants) {
    const key = Regbot.configKeyFromOpts({...base, ...override});
    t.notEqual(key, baseKey, `changing ${Object.keys(override)[0]} produces a different key`);
  }
  t.end();
});
// stopTimer() must cancel the timer and retire the bot while leaving the
// bot's recorded addresses untouched.
test('stopTimer clears timer without deleting gateways', (t) => {
  const opts = {
    voip_carrier_sid: 'carrier-1',
    ipv4: '2.3.4.5',
    port: 5060,
    username: 'user',
    password: 'password',
    sip_realm: 'sip.server.com',
    protocol: 'udp',
  };
  const bot = new Regbot(logger, opts);

  // simulate a running timer
  const noop = () => {};
  bot.timer = setTimeout(noop, 60000);
  bot.addresses = ['1.2.3.4'];

  bot.stopTimer();

  t.equal(bot.timer, null, 'timer is cleared');
  t.deepEqual(bot.addresses, ['1.2.3.4'], 'addresses are preserved');
  t.equal(bot.retired, true, 'retired flag is set');
  t.end();
});
// stop() must retire the bot and clear its timer; the realtime DB helper is
// stubbed so no external service is contacted.
test('stop sets retired flag', (t) => {
  const opts = {
    voip_carrier_sid: 'carrier-1',
    ipv4: '2.3.4.5',
    port: 5060,
    username: 'user',
    password: 'password',
    sip_realm: 'sip.server.com',
    protocol: 'udp',
  };
  const bot = new Regbot(logger, opts);

  // minimal srf stand-in exposing only the helper that stop() looks up
  const realtimeDbHelpers = {
    deleteEphemeralGateway: () => Promise.resolve()
  };
  const fakeSrf = {locals: {realtimeDbHelpers}};

  bot.timer = setTimeout(() => {}, 60000);
  bot.stop(fakeSrf);

  t.equal(bot.retired, true, 'retired flag is set');
  t.equal(bot.timer, null, 'timer is cleared');
  t.end();
});