From ce4ac891793fb1d52a21864fa8177a4042e68b2b Mon Sep 17 00:00:00 2001 From: Ed Robbins Date: Thu, 26 Mar 2026 10:03:12 -0400 Subject: [PATCH] fix timing issue of ephemeral gateways update/deletion (#132) * fix timing issue of ephemeral gateways update/deletion * Fix for potential regbot Zombie, and other concerns * address performance concerns in regbot behavior --- lib/regbot.js | 37 +++++++++++ lib/sip-trunk-register.js | 136 +++++++++++++++++++++++++------------- test/regbot-unit-test.js | 127 +++++++++++++++++++++++++++++++++++ 3 files changed, 255 insertions(+), 45 deletions(-) diff --git a/lib/regbot.js b/lib/regbot.js index d4b022c..0fb740e 100644 --- a/lib/regbot.js +++ b/lib/regbot.js @@ -57,6 +57,7 @@ class Regbot { } stop(srf) { + this.retired = true; const { deleteEphemeralGateway } = srf.locals.realtimeDbHelpers; clearTimeout(this.timer); this.timer = null; @@ -71,6 +72,38 @@ class Regbot { } + stopTimer() { + this.retired = true; + clearTimeout(this.timer); + this.timer = null; + } + + configKey() { + return [ + this.voip_carrier_sid, this.ipv4, this.port, + this.username, this.password, this.sip_realm, + this.protocol, this.use_sips_scheme, + this.use_public_ip_in_contact, this.outbound_sip_proxy, + this.trunk_type, this.sip_gateway_sid, + this.account_sip_realm, this.fromUser, this.from + ].join('|'); + } + + static configKeyFromOpts(opts) { + const sip_realm = opts.sip_realm || opts.ipv4; + const fromUser = opts.from_user || opts.username; + const fromDomain = opts.from_domain || sip_realm; + return [ + opts.voip_carrier_sid, opts.ipv4, opts.port, + opts.username, opts.password, sip_realm, + opts.protocol, opts.use_sips_scheme || false, + opts.use_public_ip_in_contact || JAMBONES_REGBOT_CONTACT_USE_IP, + opts.outbound_sip_proxy, + opts.trunk_type, opts.sip_gateway_sid, + opts.account_sip_realm, fromUser, `sip:${fromUser}@${fromDomain}` + ].join('|'); + } + toJSON() { return { voip_carrier_sid: this.voip_carrier_sid, @@ -146,6 +179,10 @@ class Regbot { } }); req.on('response', async(res) => { + if (this.retired) { + this.logger.info(`${this.aor}: ignoring response, regbot has been retired`); + return; + } let expires; if (res.status !== 200) { this.status = 'fail'; diff --git a/lib/sip-trunk-register.js b/lib/sip-trunk-register.js index 2d6239f..1891d71 100644 --- a/lib/sip-trunk-register.js +++ b/lib/sip-trunk-register.js @@ -1,4 +1,5 @@ const debug = require('debug')('jambonz:sbc-registrar'); +const crypto = require('crypto'); const { JAMBONES_CLUSTER_ID, JAMBONES_REGBOT_BATCH_SLEEP_MS, @@ -15,8 +16,14 @@ const waitFor = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); let initialized = false; const regbots = []; -const carriers = []; -const gateways = []; +let carriersHash = ''; +let gatewaysHash = ''; + +function computeHash(arr) { + const h = crypto.createHash('md5'); + for (const item of arr) h.update(JSON.stringify(item)); + return h.digest('hex'); +} const getCountSuccessfulRegbots = () => regbots.filter((rb) => rb.status === 'registered').length; @@ -210,7 +217,8 @@ const updateCarrierRegbots = async(logger, srf) => { const cs = (await lookupAllVoipCarriers()) .filter((c) => c.requires_register && c.is_active) .map((c) => pickRelevantCarrierProperties(c)); - if (JSON.stringify(cs) !== JSON.stringify(carriers)) hasChanged = true; + const newCarriersHash = computeHash(cs); + if (newCarriersHash !== carriersHash) hasChanged = true; for (const c of cs) { try { const arr = (await lookupSipGatewaysByCarrier(c.voip_carrier_sid)) @@ -224,47 +232,44 @@ const updateCarrierRegbots = async(logger, srf) => { logger.error({ err }, 'updateCarrierRegbots Error retrieving gateways'); } } - if (JSON.stringify(gws) !== JSON.stringify(gateways)) hasChanged = true; + const newGatewaysHash = computeHash(gws); + if (newGatewaysHash !== gatewaysHash) hasChanged = true; if (hasChanged) { debug('updateCarrierRegbots: got new or changed carriers'); logger.info({count: gws.length}, 'updateCarrierRegbots: got new or changed carriers'); - // Clear and repopulate arrays in chunks to avoid argument limit - carriers.length = 0; - for (let i = 0; i < cs.length; i += 1000) { - Array.prototype.push.apply(carriers, cs.slice(i, i + 1000)); + carriersHash = newCarriersHash; + gatewaysHash = newGatewaysHash; + + // Build maps of existing regbots for O(1) lookup + const existingByKey = new Map(); + const existingByCarrierIpPort = new Map(); + for (const rb of regbots) { + existingByKey.set(rb.configKey(), rb); + existingByCarrierIpPort.set(`${rb.voip_carrier_sid}:${rb.ipv4}:${rb.port}`, rb); } - gateways.length = 0; - for (let i = 0; i < gws.length; i += 1000) { - Array.prototype.push.apply(gateways, gws.slice(i, i + 1000)); - } - - // preserve consecutive failure counts from existing regbots before stopping them - const failureCounts = new Map(); - regbots.forEach((rb) => { - const key = `${rb.voip_carrier_sid}:${rb.ipv4}:${rb.port}`; - failureCounts.set(key, rb.consecutiveRemoveFailures || 0); - }); - - // stop / kill existing regbots - regbots.forEach((rb) => rb.stop(srf)); - regbots.length = 0; - - // start new regbots + const newRegbots = []; + const newCarrierSids = new Set(); + const keepKeys = new Set(); + const accountSipRealmCache = new Map(); let batch_count = 0; - for (const gw of getUniqueGateways(gateways, logger)) { - // find gateway account sip realm. + + for (const gw of getUniqueGateways(gws, logger)) { let accountSipRealm; if (!gw.carrier.register_public_ip_in_contact && gw.carrier.account_sid) { - const account = await lookupAccountBySid(gw.carrier.account_sid); - if (account && account.sip_realm) { - accountSipRealm = account.sip_realm; + const acctSid = gw.carrier.account_sid; + if (accountSipRealmCache.has(acctSid)) { + accountSipRealm = accountSipRealmCache.get(acctSid); + } else { + const account = await lookupAccountBySid(acctSid); + accountSipRealm = (account && account.sip_realm) || null; + accountSipRealmCache.set(acctSid, accountSipRealm); } } try { - const rb = new Regbot(logger, { + const opts = { voip_carrier_sid: gw.carrier.voip_carrier_sid, account_sip_realm: accountSipRealm, ipv4: gw.ipv4, @@ -280,18 +285,32 @@ const updateCarrierRegbots = async(logger, srf) => { outbound_sip_proxy: gw.carrier.outbound_sip_proxy, trunk_type: gw.carrier.trunk_type, sip_gateway_sid: gw.sip_gateway_sid - }); - // restore consecutive failure count from previous regbot instance - const key = `${gw.carrier.voip_carrier_sid}:${gw.ipv4}:${gw.port}`; - if (failureCounts.has(key)) { - rb.consecutiveRemoveFailures = failureCounts.get(key); - } - regbots.push(rb); - rb.start(srf); - batch_count++; - if (batch_count >= JAMBONES_REGBOT_BATCH_SIZE) { - batch_count = 0; - await sleepFor(JAMBONES_REGBOT_BATCH_SLEEP_MS); + }; + + const key = Regbot.configKeyFromOpts(opts); + + if (existingByKey.has(key)) { + // Unchanged regbot -- keep the existing one running + const existing = existingByKey.get(key); + newRegbots.push(existing); + newCarrierSids.add(existing.voip_carrier_sid); + keepKeys.add(key); + } else { + // New or changed regbot -- only now construct the instance + const rb = new Regbot(logger, opts); + const oldRb = existingByCarrierIpPort.get( + `${rb.voip_carrier_sid}:${rb.ipv4}:${rb.port}`); + if (oldRb) { + rb.consecutiveRemoveFailures = oldRb.consecutiveRemoveFailures || 0; + } + newRegbots.push(rb); + newCarrierSids.add(rb.voip_carrier_sid); + rb.start(srf); + batch_count++; + if (batch_count >= JAMBONES_REGBOT_BATCH_SIZE) { + batch_count = 0; + await sleepFor(JAMBONES_REGBOT_BATCH_SLEEP_MS); + } } } catch (err) { const { updateVoipCarriersRegisterStatus } = srf.locals.dbHelpers; @@ -299,10 +318,37 @@ const updateCarrierRegbots = async(logger, srf) => { status: 'fail', reason: err.message, })); - logger.error({ err }, `Error starting regbot, ignore register for ${this.fr}`); + logger.error({ err }, + `Error starting regbot for ${gw.carrier.register_username}@${gw.carrier.register_sip_realm}`); } } - logger.debug(`updateCarrierRegbots: we have started ${regbots.length} regbots`); + + // Stop old regbots that are no longer needed + for (const [key, rb] of existingByKey) { + if (!keepKeys.has(key)) { + // Check if a replacement regbot exists for the same carrier + const hasReplacement = newCarrierSids.has(rb.voip_carrier_sid); + if (hasReplacement) { + // Config changed but carrier still active -- stop timer only, + // keep ephemeral gateways in Redis until new regbot overwrites them + rb.stopTimer(); + logger.info(`config changed for regbot ${rb.aor}, preserving gateways until re-registered`); + } else { + // Carrier removed or deactivated -- full cleanup + rb.stop(srf); + logger.info(`removed regbot ${rb.aor}, deleted ephemeral gateways`); + } + } + } + + // Replace the regbots array (chunked to avoid call stack argument limit) + regbots.length = 0; + for (let i = 0; i < newRegbots.length; i += 1000) { + Array.prototype.push.apply(regbots, newRegbots.slice(i, i + 1000)); + } + + logger.debug(`updateCarrierRegbots: ${regbots.length} regbots active, ` + + `${keepKeys.size} kept, ${regbots.length - keepKeys.size} new`); } } catch (err) { logger.error({ err }, 'updateCarrierRegbots Error'); diff --git a/test/regbot-unit-test.js b/test/regbot-unit-test.js index 7c62566..e5e1ac7 100644 --- a/test/regbot-unit-test.js +++ b/test/regbot-unit-test.js @@ -71,3 +71,130 @@ test('Can create regbot with valid sip_realm', (t) => { t.fail('Regbot is not created with valid sip_realm');} t.end(); }); + +test('configKey returns identical strings for identical config', (t) => { + const config = { + voip_carrier_sid: 'carrier-1', + ipv4: '2.3.4.5', + port: 5060, + username: 'user', + password: 'password', + sip_realm: 'sip.server.com', + protocol: 'udp', + account_sip_realm: 'example.com', + trunk_type: 'reg', + sip_gateway_sid: 'gw-1' + }; + const rb1 = new Regbot(logger, config); + const rb2 = new Regbot(logger, config); + t.equal(rb1.configKey(), rb2.configKey(), 'identical config produces identical keys'); + t.end(); +}); + +test('static configKeyFromOpts matches instance configKey', (t) => { + const config = { + voip_carrier_sid: 'carrier-1', + ipv4: '2.3.4.5', + port: 5060, + username: 'user', + password: 'password', + sip_realm: 'sip.server.com', + protocol: 'udp', + account_sip_realm: 'example.com', + trunk_type: 'reg', + sip_gateway_sid: 'gw-1' + }; + const rb = new Regbot(logger, config); + t.equal(Regbot.configKeyFromOpts(config), rb.configKey(), + 'static method produces same key as instance method'); + + // also with from_user and from_domain overrides + const config2 = {...config, from_user: 'alice', from_domain: 'example.org'}; + const rb2 = new Regbot(logger, config2); + t.equal(Regbot.configKeyFromOpts(config2), rb2.configKey(), + 'static method matches instance with from_user/from_domain'); + t.end(); +}); + +test('configKey returns different strings when config differs', (t) => { + const base = { + voip_carrier_sid: 'carrier-1', + ipv4: '2.3.4.5', + port: 5060, + username: 'user', + password: 'password', + sip_realm: 'sip.server.com', + protocol: 'udp', + trunk_type: 'reg', + sip_gateway_sid: 'gw-1' + }; + const baseKey = Regbot.configKeyFromOpts(base); + + // each of these should produce a different key + const variants = [ + {password: 'newpass'}, + {username: 'other'}, + {ipv4: '9.9.9.9'}, + {port: 5080}, + {sip_realm: 'other.com'}, + {voip_carrier_sid: 'carrier-2'}, + {sip_gateway_sid: 'gw-2'}, + {from_user: 'override'}, + {from_domain: 'custom.com'} + ]; + for (const override of variants) { + const key = Regbot.configKeyFromOpts({...base, ...override}); + t.notEqual(key, baseKey, `changing ${Object.keys(override)[0]} produces a different key`); + } + t.end(); +}); + +test('stopTimer clears timer without deleting gateways', (t) => { + const rb = new Regbot(logger, { + voip_carrier_sid: 'carrier-1', + ipv4: '2.3.4.5', + port: 5060, + username: 'user', + password: 'password', + sip_realm: 'sip.server.com', + protocol: 'udp', + }); + + // simulate a running timer + rb.timer = setTimeout(() => {}, 60000); + rb.addresses = ['1.2.3.4']; + + rb.stopTimer(); + + t.equal(rb.timer, null, 'timer is cleared'); + t.deepEqual(rb.addresses, ['1.2.3.4'], 'addresses are preserved'); + t.equal(rb.retired, true, 'retired flag is set'); + t.end(); +}); + +test('stop sets retired flag', (t) => { + const rb = new Regbot(logger, { + voip_carrier_sid: 'carrier-1', + ipv4: '2.3.4.5', + port: 5060, + username: 'user', + password: 'password', + sip_realm: 'sip.server.com', + protocol: 'udp', + }); + + const srf = { + locals: { + realtimeDbHelpers: { + deleteEphemeralGateway: () => Promise.resolve() + } + } + }; + + rb.timer = setTimeout(() => {}, 60000); + rb.stop(srf); + + t.equal(rb.retired, true, 'retired flag is set'); + t.equal(rb.timer, null, 'timer is cleared'); + t.end(); +});