mirror of
https://github.com/jambonz/jambonz-feature-server.git
synced 2025-12-20 16:50:39 +00:00
188 lines
6.5 KiB
JavaScript
188 lines
6.5 KiB
JavaScript
const Emitter = require('events');
|
|
const bent = require('bent');
|
|
const assert = require('assert');
|
|
const PORT = process.env.AWS_SNS_PORT || 3001;
|
|
const {LifeCycleEvents} = require('./constants');
|
|
const express = require('express');
|
|
const app = express();
|
|
const getString = bent('string');
|
|
const AWS = require('aws-sdk');
|
|
const sns = new AWS.SNS({apiVersion: '2010-03-31'});
|
|
const autoscaling = new AWS.AutoScaling({apiVersion: '2011-01-01'});
|
|
const {Parser} = require('xml2js');
|
|
const parser = new Parser();
|
|
const {validatePayload} = require('verify-aws-sns-signature');
|
|
|
|
AWS.config.update({region: process.env.AWS_REGION});
|
|
|
|
class SnsNotifier extends Emitter {
|
|
constructor(logger) {
|
|
super();
|
|
|
|
this.logger = logger;
|
|
}
|
|
|
|
async _handlePost(req, res) {
|
|
try {
|
|
const parsedBody = JSON.parse(req.body);
|
|
this.logger.debug({headers: req.headers, body: parsedBody}, 'Received HTTP POST from AWS');
|
|
if (!validatePayload(parsedBody)) {
|
|
this.logger.info('incoming AWS SNS HTTP POST failed signature validation');
|
|
return res.sendStatus(403);
|
|
}
|
|
this.logger.debug('incoming HTTP POST passed validation');
|
|
res.sendStatus(200);
|
|
|
|
switch (parsedBody.Type) {
|
|
case 'SubscriptionConfirmation':
|
|
const response = await getString(parsedBody.SubscribeURL);
|
|
const result = await parser.parseStringPromise(response);
|
|
this.subscriptionArn = result.ConfirmSubscriptionResponse.ConfirmSubscriptionResult[0].SubscriptionArn[0];
|
|
this.subscriptionRequestId = result.ConfirmSubscriptionResponse.ResponseMetadata[0].RequestId[0];
|
|
this.logger.info({
|
|
subscriptionArn: this.subscriptionArn,
|
|
subscriptionRequestId: this.subscriptionRequestId
|
|
}, 'response from SNS SubscribeURL');
|
|
const data = await this.describeInstance();
|
|
this.lifecycleState = data.AutoScalingInstances[0].LifecycleState;
|
|
this.emit('SubscriptionConfirmation', {publicIp: this.publicIp});
|
|
break;
|
|
|
|
case 'Notification':
|
|
if (parsedBody.Subject.startsWith('Auto Scaling: Lifecycle action \'TERMINATING\'')) {
|
|
const msg = JSON.parse(parsedBody.Message);
|
|
if (msg.EC2InstanceId === this.instanceId) {
|
|
this.logger.info('SnsNotifier - begin scale-in operation');
|
|
this.scaleInParams = {
|
|
AutoScalingGroupName: msg.AutoScalingGroupName,
|
|
LifecycleActionResult: 'CONTINUE',
|
|
LifecycleActionToken: msg.LifecycleActionToken,
|
|
LifecycleHookName: msg.LifecycleHookName
|
|
};
|
|
this.operationalState = LifeCycleEvents.ScaleIn;
|
|
this.emit(LifeCycleEvents.ScaleIn);
|
|
this.unsubscribe();
|
|
}
|
|
else {
|
|
this.logger.debug(`SnsNotifier - instance ${msg.EC2InstanceId} is scaling in (not us)`);
|
|
}
|
|
}
|
|
break;
|
|
|
|
default:
|
|
this.logger.info(`unhandled SNS Post Type: ${parsedBody.Type}`);
|
|
}
|
|
|
|
} catch (err) {
|
|
this.logger.error({err}, 'Error processing SNS POST request');
|
|
if (!res.headersSent) res.sendStatus(500);
|
|
}
|
|
}
|
|
|
|
async init() {
|
|
try {
|
|
this.logger.debug('SnsNotifier: retrieving instance data');
|
|
this.instanceId = await getString('http://169.254.169.254/latest/meta-data/instance-id');
|
|
this.publicIp = await getString('http://169.254.169.254/latest/meta-data/public-ipv4');
|
|
this.snsEndpoint = `http://${this.publicIp}:${PORT}`;
|
|
this.logger.info({
|
|
instanceId: this.instanceId,
|
|
publicIp: this.publicIp,
|
|
snsEndpoint: this.snsEndpoint
|
|
}, 'retrieved AWS instance data');
|
|
|
|
// start listening
|
|
app.use(express.urlencoded({ extended: true }));
|
|
app.use(express.json());
|
|
app.use(express.text());
|
|
app.post('/', this._handlePost.bind(this));
|
|
app.use((err, req, res, next) => {
|
|
this.logger.error(err, 'burped error');
|
|
res.status(err.status || 500).json({msg: err.message});
|
|
});
|
|
app.listen(PORT);
|
|
|
|
} catch (err) {
|
|
this.logger.error({err}, 'Error retrieving AWS instance metadata');
|
|
}
|
|
}
|
|
|
|
async subscribe() {
|
|
try {
|
|
const response = await sns.subscribe({
|
|
Protocol: 'http',
|
|
TopicArn: process.env.AWS_SNS_TOPIC_ARM,
|
|
Endpoint: this.snsEndpoint
|
|
}).promise();
|
|
this.logger.info({response}, `response to SNS subscribe to ${process.env.AWS_SNS_TOPIC_ARM}`);
|
|
} catch (err) {
|
|
this.logger.error({err}, `Error subscribing to SNS topic arn ${process.env.AWS_SNS_TOPIC_ARM}`);
|
|
}
|
|
}
|
|
|
|
async unsubscribe() {
|
|
if (!this.subscriptionArn) throw new Error('SnsNotifier#unsubscribe called without an active subscription');
|
|
try {
|
|
const response = await sns.unsubscribe({
|
|
SubscriptionArn: this.subscriptionArn
|
|
}).promise();
|
|
this.logger.info({response}, `response to SNS unsubscribe to ${process.env.AWS_SNS_TOPIC_ARM}`);
|
|
} catch (err) {
|
|
this.logger.error({err}, `Error unsubscribing to SNS topic arn ${process.env.AWS_SNS_TOPIC_ARM}`);
|
|
}
|
|
}
|
|
|
|
completeScaleIn() {
|
|
assert(this.scaleInParams);
|
|
autoscaling.completeLifecycleAction(this.scaleInParams, (err, response) => {
|
|
if (err) return this.logger.error({err}, 'Error completing scale-in');
|
|
this.logger.info({response}, 'Successfully completed scale-in action');
|
|
});
|
|
}
|
|
|
|
describeInstance() {
|
|
return new Promise((resolve, reject) => {
|
|
if (!this.instanceId) return reject('instance-id unknown');
|
|
autoscaling.describeAutoScalingInstances({
|
|
InstanceIds: [this.instanceId]
|
|
}, (err, data) => {
|
|
if (err) {
|
|
this.logger.error({err}, 'Error describing instances');
|
|
reject(err);
|
|
} else {
|
|
this.logger.info({data}, 'SnsNotifier: describeInstance');
|
|
resolve(data);
|
|
}
|
|
});
|
|
});
|
|
}
|
|
|
|
}
|
|
|
|
module.exports = async function(logger) {
|
|
const notifier = new SnsNotifier(logger);
|
|
await notifier.init();
|
|
await notifier.subscribe();
|
|
|
|
process.on('SIGHUP', async() => {
|
|
try {
|
|
const data = await notifier.describeInstance();
|
|
const state = data.AutoScalingInstances[0].LifecycleState;
|
|
if (state !== notifier.lifecycleState) {
|
|
notifier.lifecycleState = state;
|
|
switch (state) {
|
|
case 'Standby':
|
|
notifier.emit(LifeCycleEvents.StandbyEnter);
|
|
break;
|
|
case 'InService':
|
|
notifier.emit(LifeCycleEvents.StandbyExit);
|
|
break;
|
|
}
|
|
}
|
|
} catch (err) {
|
|
console.error(err);
|
|
}
|
|
});
|
|
return notifier;
|
|
};
|