add support for AWS autoscaling (#10)

This commit is contained in:
Dave Horton
2021-10-02 12:41:52 -04:00
committed by GitHub
parent 5267b6f9a4
commit 04c4ad5975
6 changed files with 1486 additions and 28 deletions

19
app.js
View File

@@ -27,6 +27,7 @@ const {
});
const StatsCollector = require('@jambonz/stats-collector');
const stats = new StatsCollector(logger);
const {LifeCycleEvents} = require('./lib/constants');
const setNameRtp = `${(process.env.JAMBONES_CLUSTER_ID || 'default')}:active-rtp`;
const rtpServers = [];
const setName = `${(process.env.JAMBONES_CLUSTER_ID || 'default')}:active-sip`;
@@ -48,7 +49,7 @@ const {
database: process.env.JAMBONES_MYSQL_DATABASE,
connectionLimit: process.env.JAMBONES_MYSQL_CONNECTION_LIMIT || 10
}, logger);
const {createSet, retrieveSet, addToSet, incrKey, decrKey} = require('@jambonz/realtimedb-helpers')({
const {createSet, retrieveSet, addToSet, removeFromSet, incrKey, decrKey} = require('@jambonz/realtimedb-helpers')({
host: process.env.JAMBONES_REDIS_HOST || 'localhost',
port: process.env.JAMBONES_REDIS_PORT || 6379
}, logger);
@@ -108,7 +109,9 @@ if (process.env.DRACHTIO_HOST) {
else if (arr && 'tcp' === arr[1] && matcher.contains(arr[2])) {
const hostport = `${arr[2]}:${arr[3]}`;
logger.info(`adding sbc private address to redis: ${hostport}`);
addToSet(setName, hostport);
srf.locals.addToRedis = () => addToSet(setName, hostport);
srf.locals.removeFromRedis = () => removeFromSet(setName, hostport);
srf.locals.addToRedis();
}
}
});
@@ -189,4 +192,16 @@ else {
getActiveRtpServers();
}
const {lifecycleEmitter} = require('./lib/autoscale-manager')(logger);
/* if we are scaling in, check every so often if call count has gone to zero */
setInterval(async() => {
if (lifecycleEmitter.operationalState === LifeCycleEvents.ScaleIn) {
if (0 === activeCallIds.size) {
logger.info('scale-in complete now that calls have dried up');
lifecycleEmitter.scaleIn();
}
}
}, 20000);
module.exports = {srf, logger};

63
lib/autoscale-manager.js Normal file
View File

@@ -0,0 +1,63 @@
const noopLogger = {info: () => {}, error: () => {}};
const {LifeCycleEvents} = require('./constants');
const Emitter = require('events');
module.exports = (logger) => {
logger = logger || noopLogger;
// listen for SNS lifecycle changes
let lifecycleEmitter = new Emitter();
lifecycleEmitter.dryUpCalls = false;
if (process.env.AWS_SNS_TOPIC_ARM && process.env.AWS_REGION) {
(async function() {
try {
lifecycleEmitter = await require('./aws-sns-lifecycle')(logger);
lifecycleEmitter
.on(LifeCycleEvents.ScaleIn, async() => {
logger.info('AWS scale-in notification: begin drying up calls');
lifecycleEmitter.dryUpCalls = true;
lifecycleEmitter.operationalState = LifeCycleEvents.ScaleIn;
const {srf} = require('..');
const {activeCallIds, removeFromRedis} = srf.locals;
/* remove our private IP from the set of active SBCs so rtp and fs know we are gone */
removeFromRedis();
/* if we have zero calls, we can complete the scale-in right now */
const calls = activeCallIds.size;
if (0 === calls) {
logger.info('scale-in can complete immediately as we have no calls in progress');
lifecycleEmitter.completeScaleIn();
}
else {
logger.info(`${calls} calls in progress; scale-in will complete when they are done`);
}
})
.on(LifeCycleEvents.StandbyEnter, () => {
lifecycleEmitter.dryUpCalls = true;
const {srf} = require('..');
const {removeFromRedis} = srf.locals;
removeFromRedis();
logger.info('AWS enter pending state notification: begin drying up calls');
})
.on(LifeCycleEvents.StandbyExit, () => {
lifecycleEmitter.dryUpCalls = false;
const {srf} = require('..');
const {addToRedis} = srf.locals;
addToRedis();
logger.info('AWS exit pending state notification: re-enable calls');
});
} catch (err) {
logger.error({err}, 'Failure creating SNS notifier, lifecycle events will be disabled');
}
})();
}
return {lifecycleEmitter};
};

186
lib/aws-sns-lifecycle.js Normal file
View File

@@ -0,0 +1,186 @@
const Emitter = require('events');
const bent = require('bent');
const assert = require('assert');
const PORT = process.env.AWS_SNS_PORT || 3001;
const {LifeCycleEvents} = require('./constants');
const express = require('express');
const app = express();
const getString = bent('string');
const AWS = require('aws-sdk');
const sns = new AWS.SNS({apiVersion: '2010-03-31'});
const autoscaling = new AWS.AutoScaling({apiVersion: '2011-01-01'});
const {Parser} = require('xml2js');
const parser = new Parser();
const {validatePayload} = require('verify-aws-sns-signature');
AWS.config.update({region: process.env.AWS_REGION});
class SnsNotifier extends Emitter {
constructor(logger) {
super();
this.logger = logger;
}
async _handlePost(req, res) {
try {
const parsedBody = JSON.parse(req.body);
this.logger.debug({headers: req.headers, body: parsedBody}, 'Received HTTP POST from AWS');
if (!validatePayload(parsedBody)) {
this.logger.info('incoming AWS SNS HTTP POST failed signature validation');
return res.sendStatus(403);
}
this.logger.debug('incoming HTTP POST passed validation');
res.sendStatus(200);
switch (parsedBody.Type) {
case 'SubscriptionConfirmation':
const response = await getString(parsedBody.SubscribeURL);
const result = await parser.parseStringPromise(response);
this.subscriptionArn = result.ConfirmSubscriptionResponse.ConfirmSubscriptionResult[0].SubscriptionArn[0];
this.subscriptionRequestId = result.ConfirmSubscriptionResponse.ResponseMetadata[0].RequestId[0];
this.logger.info({
subscriptionArn: this.subscriptionArn,
subscriptionRequestId: this.subscriptionRequestId
}, 'response from SNS SubscribeURL');
const data = await this.describeInstance();
this.lifecycleState = data.AutoScalingInstances[0].LifecycleState;
break;
case 'Notification':
if (parsedBody.Subject.startsWith('Auto Scaling: Lifecycle action \'TERMINATING\'')) {
const msg = JSON.parse(parsedBody.Message);
if (msg.EC2InstanceId === this.instanceId) {
this.logger.info('SnsNotifier - begin scale-in operation');
this.scaleInParams = {
AutoScalingGroupName: msg.AutoScalingGroupName,
LifecycleActionResult: 'CONTINUE',
LifecycleActionToken: msg.LifecycleActionToken,
LifecycleHookName: msg.LifecycleHookName
};
this.operationalState = LifeCycleEvents.ScaleIn;
this.emit(LifeCycleEvents.ScaleIn);
this.unsubscribe();
}
else {
this.logger.debug(`SnsNotifier - instance ${msg.EC2InstanceId} is scaling in (not us)`);
}
}
break;
default:
this.logger.info(`unhandled SNS Post Type: ${parsedBody.Type}`);
}
} catch (err) {
this.logger.error({err}, 'Error processing SNS POST request');
if (!res.headersSent) res.sendStatus(500);
}
}
async init() {
try {
this.logger.debug('SnsNotifier: retrieving instance data');
this.instanceId = await getString('http://169.254.169.254/latest/meta-data/instance-id');
this.publicIp = await getString('http://169.254.169.254/latest/meta-data/public-ipv4');
this.snsEndpoint = `http://${this.publicIp}:${PORT}`;
this.logger.info({
instanceId: this.instanceId,
publicIp: this.publicIp,
snsEndpoint: this.snsEndpoint
}, 'retrieved AWS instance data');
// start listening
app.use(express.urlencoded({ extended: true }));
app.use(express.json());
app.use(express.text());
app.post('/', this._handlePost.bind(this));
app.use((err, req, res, next) => {
this.logger.error(err, 'burped error');
res.status(err.status || 500).json({msg: err.message});
});
app.listen(PORT);
} catch (err) {
this.logger.error({err}, 'Error retrieving AWS instance metadata');
}
}
async subscribe() {
try {
const response = await sns.subscribe({
Protocol: 'http',
TopicArn: process.env.AWS_SNS_TOPIC_ARM,
Endpoint: this.snsEndpoint
}).promise();
this.logger.info({response}, `response to SNS subscribe to ${process.env.AWS_SNS_TOPIC_ARM}`);
} catch (err) {
this.logger.error({err}, `Error subscribing to SNS topic arn ${process.env.AWS_SNS_TOPIC_ARM}`);
}
}
async unsubscribe() {
if (!this.subscriptionArn) throw new Error('SnsNotifier#unsubscribe called without an active subscription');
try {
const response = await sns.unsubscribe({
SubscriptionArn: this.subscriptionArn
}).promise();
this.logger.info({response}, `response to SNS unsubscribe to ${process.env.AWS_SNS_TOPIC_ARM}`);
} catch (err) {
this.logger.error({err}, `Error unsubscribing to SNS topic arn ${process.env.AWS_SNS_TOPIC_ARM}`);
}
}
completeScaleIn() {
assert(this.scaleInParams);
autoscaling.completeLifecycleAction(this.scaleInParams, (err, response) => {
if (err) return this.logger.error({err}, 'Error completing scale-in');
this.logger.info({response}, 'Successfully completed scale-in action');
});
}
describeInstance() {
return new Promise((resolve, reject) => {
if (!this.instanceId) return reject('instance-id unknown');
autoscaling.describeAutoScalingInstances({
InstanceIds: [this.instanceId]
}, (err, data) => {
if (err) {
this.logger.error({err}, 'Error describing instances');
reject(err);
} else {
this.logger.info({data}, 'SnsNotifier: describeInstance');
resolve(data);
}
});
});
}
}
module.exports = async function(logger) {
const notifier = new SnsNotifier(logger);
await notifier.init();
await notifier.subscribe();
process.on('SIGHUP', async() => {
try {
const data = await notifier.describeInstance();
const state = data.AutoScalingInstances[0].LifecycleState;
if (state !== notifier.lifecycleState) {
notifier.lifecycleState = state;
switch (state) {
case 'Standby':
notifier.emit(LifeCycleEvents.StandbyEnter);
break;
case 'InService':
notifier.emit(LifeCycleEvents.StandbyExit);
break;
}
}
} catch (err) {
console.error(err);
}
});
return notifier;
};

7
lib/constants.json Normal file
View File

@@ -0,0 +1,7 @@
{
"LifeCycleEvents" : {
"ScaleIn": "scale-in",
"StandbyEnter": "standby-enter",
"StandbyExit": "standby-exit"
}
}

1232
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -27,10 +27,15 @@
"dependencies": {
"@jambonz/db-helpers": "^0.6.12",
"@jambonz/http-authenticator": "^0.2.0",
"@jambonz/realtimedb-helpers": "^0.4.3",
"@jambonz/realtimedb-helpers": "^0.4.8",
"@jambonz/rtpengine-utils": "^0.1.12",
"@jambonz/stats-collector": "^0.1.5",
"@jambonz/time-series": "^0.1.5",
"aws-sdk": "^2.848.0",
"bent": "^7.3.12",
"express": "^4.17.1",
"verify-aws-sns-signature": "^0.0.6",
"xml2js": "^0.4.23",
"cidr-matcher": "^2.1.1",
"debug": "^4.3.1",
"drachtio-fn-b2b-sugar": "0.0.12",