Feature/multi forks on ec2 (#182)

* changes to allow multiple instances to run in an EC2 autoscale deployment

* fix health check

* fixup aws sns notification so it subscribes using bound port

* AWS SNS port range 3010-3019
This commit is contained in:
Dave Horton
2022-10-30 13:07:49 -04:00
committed by GitHub
parent f044cdd150
commit 70caf00dd1
5 changed files with 109 additions and 56 deletions

View File

@@ -10,7 +10,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-node@v1
with:
node-version: 14
node-version: 16
- run: npm ci
- run: npm run jslint
- run: docker pull drachtio/sipp

47
app.js
View File

@@ -15,7 +15,6 @@ const tracer = require('./tracer')(process.env.JAMBONES_OTEL_SERVICE_NAME || 'ja
const api = require('@opentelemetry/api');
srf.locals = {...srf.locals, otel: {tracer, api}};
const PORT = process.env.HTTP_PORT || 3000;
const opts = {level: process.env.JAMBONES_LOGLEVEL || 'info'};
const pino = require('pino');
const logger = pino(opts, pino.destination({sync: false}));
@@ -33,17 +32,6 @@ const {
invokeWebCallback
} = require('./lib/middleware')(srf, logger);
// HTTP
const express = require('express');
const helmet = require('helmet');
const app = express();
Object.assign(app.locals, {
logger,
srf
});
const httpRoutes = require('./lib/http-routes');
const InboundCallSession = require('./lib/session/inbound-call-session');
const SipRecCallSession = require('./lib/session/siprec-call-session');
@@ -82,20 +70,6 @@ srf.invite(async(req, res) => {
session.exec();
});
// HTTP
app.use(helmet());
app.use(helmet.hidePoweredBy());
app.use(express.urlencoded({ extended: true }));
app.use(express.json());
app.use('/', httpRoutes);
app.use((err, req, res, next) => {
logger.error(err, 'burped error');
res.status(err.status || 500).json({msg: err.message});
});
const httpServer = app.listen(PORT);
logger.info(`listening for HTTP requests on port ${PORT}, serviceUrl is ${srf.locals.serviceUrl}`);
const sessionTracker = srf.locals.sessionTracker = require('./lib/session/session-tracker');
sessionTracker.on('idle', () => {
if (srf.locals.lifecycleEmitter.operationalState === LifeCycleEvents.ScaleIn) {
@@ -103,19 +77,30 @@ sessionTracker.on('idle', () => {
srf.locals.lifecycleEmitter.scaleIn();
}
});
const getCount = () => sessionTracker.count;
const healthCheck = require('@jambonz/http-health-check');
healthCheck({app, logger, path: '/', fn: getCount});
let httpServer;
const createHttpListener = require('./lib/utils/http-listener');
createHttpListener(logger, srf)
.then(({server, app}) => {
httpServer = server;
healthCheck({app, logger, path: '/', fn: getCount});
return {server, app};
})
.catch((err) => {
logger.error(err, 'Error creating http listener');
});
setInterval(() => {
srf.locals.stats.gauge('fs.sip.calls.count', sessionTracker.count);
}, 5000);
}, 20000);
const disconnect = () => {
return new Promise ((resolve) => {
httpServer.on('close', resolve);
httpServer.close();
httpServer?.on('close', resolve);
httpServer?.close();
srf.disconnect();
srf.locals.mediaservers.forEach((ms) => ms.disconnect());
});

View File

@@ -1,7 +1,7 @@
const Emitter = require('events');
const bent = require('bent');
const assert = require('assert');
const PORT = process.env.AWS_SNS_PORT || 3001;
const PORT = process.env.AWS_SNS_PORT || 3010;
const {LifeCycleEvents} = require('./constants');
const express = require('express');
const app = express();
@@ -21,6 +21,26 @@ class SnsNotifier extends Emitter {
this.logger = logger;
}
_doListen(logger, app, port, resolve) {
return app.listen(port, () => {
this.snsEndpoint = `http://${this.publicIp}:${port}`;
logger.info(`SNS lifecycle server listening on http://localhost:${port}`);
resolve(app);
});
}
_handleErrors(logger, app, resolve, reject, e) {
if (e.code === 'EADDRINUSE' &&
process.env.AWS_SNS_PORT_MAX &&
e.port < process.env.AWS_SNS_PORT_MAX) {
logger.info(`SNS lifecycle server failed to bind port on ${e.port}, will try next port`);
const server = this._doListen(logger, app, ++e.port, resolve);
server.on('error', this._handleErrors.bind(null, logger, app, resolve, reject));
return;
}
reject(e);
}
async _handlePost(req, res) {
try {
@@ -84,11 +104,9 @@ class SnsNotifier extends Emitter {
this.logger.debug('SnsNotifier: retrieving instance data');
this.instanceId = await getString('http://169.254.169.254/latest/meta-data/instance-id');
this.publicIp = await getString('http://169.254.169.254/latest/meta-data/public-ipv4');
this.snsEndpoint = `http://${this.publicIp}:${PORT}`;
this.logger.info({
instanceId: this.instanceId,
publicIp: this.publicIp,
snsEndpoint: this.snsEndpoint
publicIp: this.publicIp
}, 'retrieved AWS instance data');
// start listening
@@ -100,7 +118,10 @@ class SnsNotifier extends Emitter {
this.logger.error(err, 'burped error');
res.status(err.status || 500).json({msg: err.message});
});
app.listen(PORT);
return new Promise((resolve, reject) => {
const server = this._doListen(this.logger, app, PORT, resolve);
server.on('error', this._handleErrors.bind(null, this.logger, app, resolve, reject));
});
} catch (err) {
this.logger.error({err}, 'Error retrieving AWS instance metadata');

View File

@@ -23,6 +23,7 @@ AND vc.name = ?`;
const speechMapper = (cred) => {
const {credential, ...obj} = cred;
try {
if ('google' === obj.vendor) {
obj.service_key = decrypt(credential);
}
@@ -43,6 +44,8 @@ const speechMapper = (cred) => {
const o = JSON.parse(decrypt(credential));
obj.api_key = o.api_key;
}
} catch (err) {
}
return obj;
};

View File

@@ -0,0 +1,44 @@
const express = require('express');
const httpRoutes = require('../http-routes');
const PORT = process.env.HTTP_PORT || 3000;
const doListen = (logger, app, port, resolve) => {
const server = app.listen(port, () => {
const {srf} = app.locals;
logger.info(`listening for HTTP requests on port ${PORT}, serviceUrl is ${srf.locals.serviceUrl}`);
resolve({server, app});
});
return server;
};
const handleErrors = (logger, app, resolve, reject, e) => {
if (e.code === 'EADDRINUSE' &&
process.env.HTTP_PORT_MAX &&
e.port < process.env.HTTP_PORT_MAX) {
logger.info(`HTTP server failed to bind port on ${e.port}, will try next port`);
const server = doListen(logger, app, ++e.port, resolve);
server.on('error', handleErrors.bind(null, logger, app, resolve, reject));
return;
}
reject(e);
};
const createHttpListener = (logger, srf) => {
const app = express();
app.locals = {...app.locals, logger, srf};
app.use(express.urlencoded({ extended: true }));
app.use(express.json());
app.use('/', httpRoutes);
app.use((err, req, res, next) => {
logger.error(err, 'burped error');
res.status(err.status || 500).json({msg: err.message});
});
return new Promise((resolve, reject) => {
const server = doListen(logger, app, PORT, resolve);
server.on('error', handleErrors.bind(null, logger, app, resolve, reject));
});
};
module.exports = createHttpListener;