mirror of
https://github.com/jambonz/jambonz-feature-server.git
synced 2025-12-19 04:17:44 +00:00
add support for running in AWS autoscale cluster with graceful scale-in and standby
This commit is contained in:
10
app.js
10
app.js
@@ -16,6 +16,7 @@ const opts = Object.assign({
|
||||
timestamp: () => {return `, "time": "${new Date().toISOString()}"`;}
|
||||
}, {level: process.env.JAMBONES_LOGLEVEL || 'info'});
|
||||
const logger = require('pino')(opts);
|
||||
const {LifeCycleEvents} = require('./lib/utils/constants');
|
||||
const installSrfLocals = require('./lib/utils/install-srf-locals');
|
||||
installSrfLocals(srf, logger);
|
||||
|
||||
@@ -73,7 +74,14 @@ app.listen(PORT);
|
||||
|
||||
logger.info(`listening for HTTP requests on port ${PORT}, serviceUrl is ${srf.locals.serviceUrl}`);
|
||||
|
||||
const sessionTracker = require('./lib/session/session-tracker');
|
||||
const sessionTracker = srf.locals.sessionTracker = require('./lib/session/session-tracker');
|
||||
sessionTracker.on('idle', () => {
|
||||
if (srf.locals.lifecycleEmitter.operationalState === LifeCycleEvents.ScaleIn) {
|
||||
logger.info('scale-in complete now that calls have dried up');
|
||||
srf.locals.lifecycleEmitter.scaleIn();
|
||||
}
|
||||
});
|
||||
|
||||
setInterval(() => {
|
||||
srf.locals.stats.gauge('fs.sip.calls.count', sessionTracker.count);
|
||||
}, 5000);
|
||||
|
||||
@@ -44,6 +44,7 @@ class SessionTracker extends Emitter {
|
||||
assert(callSid);
|
||||
this.sessions.delete(callSid);
|
||||
this.logger.info(`SessionTracker:remove callSid ${callSid}, we have ${this.sessions.size} being tracked`);
|
||||
if (0 === this.sessions.size) this.emit('idle');
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
186
lib/utils/aws-sns-lifecycle.js
Normal file
186
lib/utils/aws-sns-lifecycle.js
Normal file
@@ -0,0 +1,186 @@
|
||||
const Emitter = require('events');
|
||||
const bent = require('bent');
|
||||
const assert = require('assert');
|
||||
const PORT = process.env.AWS_SNS_PORT || 3001;
|
||||
const {LifeCycleEvents} = require('./constants');
|
||||
const express = require('express');
|
||||
const app = express();
|
||||
const getString = bent('string');
|
||||
const AWS = require('aws-sdk');
|
||||
const sns = new AWS.SNS({apiVersion: '2010-03-31'});
|
||||
const autoscaling = new AWS.AutoScaling({apiVersion: '2011-01-01'});
|
||||
const {Parser} = require('xml2js');
|
||||
const parser = new Parser();
|
||||
const {validatePayload} = require('verify-aws-sns-signature');
|
||||
|
||||
AWS.config.update({region: process.env.AWS_REGION});
|
||||
|
||||
class SnsNotifier extends Emitter {
|
||||
constructor(logger) {
|
||||
super();
|
||||
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
async _handlePost(req, res) {
|
||||
try {
|
||||
const parsedBody = JSON.parse(req.body);
|
||||
this.logger.debug({headers: req.headers, body: parsedBody}, 'Received HTTP POST from AWS');
|
||||
if (!validatePayload(parsedBody)) {
|
||||
this.logger.info('incoming AWS SNS HTTP POST failed signature validation');
|
||||
return res.sendStatus(403);
|
||||
}
|
||||
this.logger.debug('incoming HTTP POST passed validation');
|
||||
res.sendStatus(200);
|
||||
|
||||
switch (parsedBody.Type) {
|
||||
case 'SubscriptionConfirmation':
|
||||
const response = await getString(parsedBody.SubscribeURL);
|
||||
const result = await parser.parseStringPromise(response);
|
||||
this.subscriptionArn = result.ConfirmSubscriptionResponse.ConfirmSubscriptionResult[0].SubscriptionArn[0];
|
||||
this.subscriptionRequestId = result.ConfirmSubscriptionResponse.ResponseMetadata[0].RequestId[0];
|
||||
this.logger.info({
|
||||
subscriptionArn: this.subscriptionArn,
|
||||
subscriptionRequestId: this.subscriptionRequestId
|
||||
}, 'response from SNS SubscribeURL');
|
||||
const data = await this.describeInstance();
|
||||
this.lifecycleState = data.AutoScalingInstances[0].LifecycleState;
|
||||
break;
|
||||
|
||||
case 'Notification':
|
||||
if (parsedBody.Subject.startsWith('Auto Scaling: Lifecycle action \'TERMINATING\'')) {
|
||||
const msg = JSON.parse(parsedBody.Message);
|
||||
if (msg.EC2InstanceId === this.instanceId) {
|
||||
this.logger.info('SnsNotifier - begin scale-in operation');
|
||||
this.scaleInParams = {
|
||||
AutoScalingGroupName: msg.AutoScalingGroupName,
|
||||
LifecycleActionResult: 'CONTINUE',
|
||||
LifecycleActionToken: msg.LifecycleActionToken,
|
||||
LifecycleHookName: msg.LifecycleHookName
|
||||
};
|
||||
this.operationalState = LifeCycleEvents.ScaleIn;
|
||||
this.emit(LifeCycleEvents.ScaleIn);
|
||||
this.unsubscribe();
|
||||
}
|
||||
else {
|
||||
this.logger.debug(`SnsNotifier - instance ${msg.EC2InstanceId} is scaling in (not us)`);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
this.logger.info(`unhandled SNS Post Type: ${parsedBody.Type}`);
|
||||
}
|
||||
|
||||
} catch (err) {
|
||||
this.logger.error({err}, 'Error processing SNS POST request');
|
||||
if (!res.headersSent) res.sendStatus(500);
|
||||
}
|
||||
}
|
||||
|
||||
async init() {
|
||||
try {
|
||||
this.logger.debug('SnsNotifier: retrieving instance data');
|
||||
this.instanceId = await getString('http://169.254.169.254/latest/meta-data/instance-id');
|
||||
this.publicIp = await getString('http://169.254.169.254/latest/meta-data/public-ipv4');
|
||||
this.snsEndpoint = `http://${this.publicIp}:${PORT}`;
|
||||
this.logger.info({
|
||||
instanceId: this.instanceId,
|
||||
publicIp: this.publicIp,
|
||||
snsEndpoint: this.snsEndpoint
|
||||
}, 'retrieved AWS instance data');
|
||||
|
||||
// start listening
|
||||
app.use(express.urlencoded({ extended: true }));
|
||||
app.use(express.json());
|
||||
app.use(express.text());
|
||||
app.post('/', this._handlePost.bind(this));
|
||||
app.use((err, req, res, next) => {
|
||||
this.logger.error(err, 'burped error');
|
||||
res.status(err.status || 500).json({msg: err.message});
|
||||
});
|
||||
app.listen(PORT);
|
||||
|
||||
} catch (err) {
|
||||
this.logger.error({err}, 'Error retrieving AWS instance metadata');
|
||||
}
|
||||
}
|
||||
|
||||
async subscribe() {
|
||||
try {
|
||||
const response = await sns.subscribe({
|
||||
Protocol: 'http',
|
||||
TopicArn: process.env.AWS_SNS_TOPIC_ARM,
|
||||
Endpoint: this.snsEndpoint
|
||||
}).promise();
|
||||
this.logger.info({response}, `response to SNS subscribe to ${process.env.AWS_SNS_TOPIC_ARM}`);
|
||||
} catch (err) {
|
||||
this.logger.error({err}, `Error subscribing to SNS topic arn ${process.env.AWS_SNS_TOPIC_ARM}`);
|
||||
}
|
||||
}
|
||||
|
||||
async unsubscribe() {
|
||||
if (!this.subscriptionArn) throw new Error('SnsNotifier#unsubscribe called without an active subscription');
|
||||
try {
|
||||
const response = await sns.unsubscribe({
|
||||
SubscriptionArn: this.subscriptionArn
|
||||
}).promise();
|
||||
this.logger.info({response}, `response to SNS unsubscribe to ${process.env.AWS_SNS_TOPIC_ARM}`);
|
||||
} catch (err) {
|
||||
this.logger.error({err}, `Error unsubscribing to SNS topic arn ${process.env.AWS_SNS_TOPIC_ARM}`);
|
||||
}
|
||||
}
|
||||
|
||||
completeScaleIn() {
|
||||
assert(this.scaleInParams);
|
||||
autoscaling.completeLifecycleAction(this.scaleInParams, (err, response) => {
|
||||
if (err) return this.logger.error({err}, 'Error completing scale-in');
|
||||
this.logger.info({response}, 'Successfully completed scale-in action');
|
||||
});
|
||||
}
|
||||
|
||||
describeInstance() {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (!this.instanceId) return reject('instance-id unknown');
|
||||
autoscaling.describeAutoScalingInstances({
|
||||
InstanceIds: [this.instanceId]
|
||||
}, (err, data) => {
|
||||
if (err) {
|
||||
this.logger.error({err}, 'Error describing instances');
|
||||
reject(err);
|
||||
} else {
|
||||
this.logger.info({data}, 'SnsNotifier: describeInstance');
|
||||
resolve(data);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
module.exports = async function(logger) {
|
||||
const notifier = new SnsNotifier(logger);
|
||||
await notifier.init();
|
||||
await notifier.subscribe();
|
||||
|
||||
process.on('SIGHUP', async() => {
|
||||
try {
|
||||
const data = await notifier.describeInstance();
|
||||
const state = data.AutoScalingInstances[0].LifecycleState;
|
||||
if (state !== notifier.lifecycleState) {
|
||||
notifier.lifecycleState = state;
|
||||
switch (state) {
|
||||
case 'Standby':
|
||||
notifier.emit(LifeCycleEvents.StandbyEnter);
|
||||
break;
|
||||
case 'InService':
|
||||
notifier.emit(LifeCycleEvents.StandbyExit);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
}
|
||||
});
|
||||
return notifier;
|
||||
};
|
||||
@@ -60,5 +60,10 @@
|
||||
"BufferOverrun": "mod_audio_fork::buffer_overrun",
|
||||
"JsonMessage": "mod_audio_fork::json"
|
||||
},
|
||||
"LifeCycleEvents" : {
|
||||
"ScaleIn": "scale-in",
|
||||
"StandbyEnter": "standby-enter",
|
||||
"StandbyExit": "standby-exit"
|
||||
},
|
||||
"MAX_SIMRINGS": 10
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ function initMS(logger, wrapper, ms) {
|
||||
function installSrfLocals(srf, logger) {
|
||||
logger.debug('installing srf locals');
|
||||
assert(!srf.locals.dbHelpers);
|
||||
const {getSBC} = require('./sbc-pinger')(logger);
|
||||
const {getSBC, lifecycleEmitter} = require('./sbc-pinger')(logger);
|
||||
const StatsCollector = require('jambonz-stats-collector');
|
||||
const stats = srf.locals.stats = new StatsCollector(logger);
|
||||
|
||||
@@ -135,6 +135,7 @@ function installSrfLocals(srf, logger) {
|
||||
ipv4: localIp,
|
||||
serviceUrl: `http://${localIp}:${PORT}`,
|
||||
getSBC,
|
||||
lifecycleEmitter,
|
||||
getFreeswitch,
|
||||
stats: stats
|
||||
});
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
const assert = require('assert');
|
||||
const noopLogger = {info: () => {}, error: () => {}};
|
||||
const {LifeCycleEvents} = require('./constants');
|
||||
const Emitter = require('events');
|
||||
const debug = require('debug')('jambonz:feature-server');
|
||||
|
||||
module.exports = (logger) => {
|
||||
@@ -13,6 +15,58 @@ module.exports = (logger) => {
|
||||
assert.ok(sbcs.length, 'JAMBONES_SBCS env var is empty or misconfigured');
|
||||
logger.info({sbcs}, 'SBC inventory');
|
||||
|
||||
// listen for SNS lifecycle changes
|
||||
let lifecycleEmitter = new Emitter();
|
||||
let dryUpCalls = false;
|
||||
if (process.env.AWS_SNS_TOPIC_ARM &&
|
||||
process.env.AWS_ACCESS_KEY_ID && process.env.AWS_SECRET_ACCESS_KEY && process.env.AWS_REGION) {
|
||||
|
||||
(async function() {
|
||||
try {
|
||||
lifecycleEmitter = await require('./aws-sns-lifecycle')(logger);
|
||||
|
||||
lifecycleEmitter
|
||||
.on(LifeCycleEvents.ScaleIn, () => {
|
||||
logger.info('AWS scale-in notification: begin drying up calls');
|
||||
dryUpCalls = true;
|
||||
lifecycleEmitter.operationalState = LifeCycleEvents.ScaleIn;
|
||||
|
||||
const {srf} = require('../..');
|
||||
pingProxies(srf);
|
||||
|
||||
// if we have zero calls, we can complete the scale-in right
|
||||
setTimeout(() => {
|
||||
const calls = srf.locals.sessionTracker.count;
|
||||
if (calls === 0) {
|
||||
logger.info('scale-in can complete immediately as we have no calls in progress');
|
||||
lifecycleEmitter.completeScaleIn();
|
||||
}
|
||||
else {
|
||||
logger.info(`${calls} calls in progress; scale-in will complete when they are done`);
|
||||
}
|
||||
}, 5000);
|
||||
})
|
||||
.on(LifeCycleEvents.StandbyEnter, () => {
|
||||
dryUpCalls = true;
|
||||
const {srf} = require('../..');
|
||||
pingProxies(srf);
|
||||
|
||||
logger.info('AWS enter pending state notification: begin drying up calls');
|
||||
})
|
||||
.on(LifeCycleEvents.StandbyExit, () => {
|
||||
dryUpCalls = false;
|
||||
const {srf} = require('../..');
|
||||
pingProxies(srf);
|
||||
|
||||
logger.info('AWS enter pending state notification: re-enable calls');
|
||||
});
|
||||
} catch (err) {
|
||||
logger.error({err}, 'Failure creating SNS notifier, lifecycle events will be disabled');
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
||||
// send OPTIONS pings to SBCs
|
||||
async function pingProxies(srf) {
|
||||
for (const sbc of sbcs) {
|
||||
try {
|
||||
@@ -21,7 +75,7 @@ module.exports = (logger) => {
|
||||
uri: `sip:${sbc}`,
|
||||
method: 'OPTIONS',
|
||||
headers: {
|
||||
'X-FS-Status': ms ? 'open' : 'closed'
|
||||
'X-FS-Status': ms && !dryUpCalls ? 'open' : 'closed'
|
||||
}
|
||||
});
|
||||
req.on('response', (res) => {
|
||||
@@ -40,6 +94,7 @@ module.exports = (logger) => {
|
||||
}, 60000);
|
||||
|
||||
return {
|
||||
lifecycleEmitter,
|
||||
getSBC: () => sbcs[idxSbc++ % sbcs.length]
|
||||
};
|
||||
};
|
||||
|
||||
24
package-lock.json
generated
24
package-lock.json
generated
@@ -2184,8 +2184,7 @@
|
||||
"is-stream": {
|
||||
"version": "2.0.0",
|
||||
"resolved": "https://registry.npmjs.org/is-stream/-/is-stream-2.0.0.tgz",
|
||||
"integrity": "sha512-XCoy+WlUr7d1+Z8GgSuXmpuUFC9fOhRXglJMx+dwLKTkL44Cjd4W1Z5P+BQZpr+cR93aGP4S/s7Ftw6Nd/kiEw==",
|
||||
"dev": true
|
||||
"integrity": "sha512-XCoy+WlUr7d1+Z8GgSuXmpuUFC9fOhRXglJMx+dwLKTkL44Cjd4W1Z5P+BQZpr+cR93aGP4S/s7Ftw6Nd/kiEw=="
|
||||
},
|
||||
"is-stream-ended": {
|
||||
"version": "0.1.4",
|
||||
@@ -4043,6 +4042,27 @@
|
||||
"resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz",
|
||||
"integrity": "sha1-IpnwLG3tMNSllhsLn3RSShj2NPw="
|
||||
},
|
||||
"verify-aws-sns-signature": {
|
||||
"version": "0.0.5",
|
||||
"resolved": "https://registry.npmjs.org/verify-aws-sns-signature/-/verify-aws-sns-signature-0.0.5.tgz",
|
||||
"integrity": "sha512-3Z5bOYvUoFFLuGhOYOa6xjtdpNw8Hy7Grx4CuGEDCEGdXSQYDJrABRlnxl88blJqjoqNJYnY8C84pB5WD2tmSg==",
|
||||
"requires": {
|
||||
"bent": "^7.3.0",
|
||||
"parse-url": "^5.0.1"
|
||||
},
|
||||
"dependencies": {
|
||||
"bent": {
|
||||
"version": "7.3.0",
|
||||
"resolved": "https://registry.npmjs.org/bent/-/bent-7.3.0.tgz",
|
||||
"integrity": "sha512-ROfdGmcW1shnkiV/PZhj2Gw0+TiIfBYYs40QBBFYszdd2f2D07zLOTDQm5D411srCacR3Wt5mjLstv0OMmtmFQ==",
|
||||
"requires": {
|
||||
"bytesish": "^0.4.1",
|
||||
"caseless": "~0.12.0",
|
||||
"is-stream": "^2.0.0"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"verror": {
|
||||
"version": "1.10.0",
|
||||
"resolved": "https://registry.npmjs.org/verror/-/verror-1.10.0.tgz",
|
||||
|
||||
@@ -40,7 +40,9 @@
|
||||
"jambonz-stats-collector": "^0.0.3",
|
||||
"moment": "^2.24.0",
|
||||
"parse-url": "^5.0.1",
|
||||
"pino": "^5.16.0"
|
||||
"pino": "^5.16.0",
|
||||
"verify-aws-sns-signature": "0.0.5",
|
||||
"xml2js": "^0.4.23"
|
||||
},
|
||||
"devDependencies": {
|
||||
"blue-tape": "^1.0.0",
|
||||
|
||||
Reference in New Issue
Block a user