K8s changes (#55)

* K8S: dont send OPTIONS pings

* fix missing ref

* k8s pre-stop hook added

* k8s pre-stop hook changes

* chmod +x utility

* more k8s pre-stop changes

* pre stop

* fix healthcheck

* k8s pre-stop working

* add readiness probe

* fix bug in pre-stop

* logging

* revamp k8s pre-stop a bit

* initial support for cognigy bot

* more cognigy changes

* switch to use transcribe for cognigy

* #54 include callInfo in dialogflow event payload
This commit is contained in:
Dave Horton
2022-01-06 12:41:14 -05:00
committed by GitHub
parent 0e45e9b27c
commit 3bf1984854
15 changed files with 1095 additions and 310 deletions

View File

@@ -1,4 +1,4 @@
FROM node:16 FROM node:17.0.1-slim
WORKDIR /opt/app/ WORKDIR /opt/app/
COPY package.json ./ COPY package.json ./
RUN npm install RUN npm install

4
app.js
View File

@@ -92,6 +92,10 @@ sessionTracker.on('idle', () => {
} }
}); });
const getCount = () => sessionTracker.count;
const healthCheck = require('@jambonz/http-health-check');
healthCheck({app, logger, path: '/', fn: getCount});
setInterval(() => { setInterval(() => {
srf.locals.stats.gauge('fs.sip.calls.count', sessionTracker.count); srf.locals.stats.gauge('fs.sip.calls.count', sessionTracker.count);
}, 5000); }, 5000);

29
bin/k8s-pre-stop-hook.js Executable file
View File

@@ -0,0 +1,29 @@
#!/usr/bin/env node
const bent = require('bent');
const getJSON = bent('json');
const PORT = process.env.HTTP_PORT || 3000;
const sleep = (ms) => {
return new Promise((resolve) => setTimeout(resolve, ms));
};
(async function() {
try {
do {
const obj = await getJSON(`http://127.0.0.1:${PORT}/`);
const {calls} = obj;
if (calls === 0) {
console.log('no calls on the system, we can exit');
process.exit(0);
}
else {
console.log(`waiting for ${calls} to exit..`);
}
await sleep(10000);
} while (1);
} catch (err) {
console.error(err, 'Error querying health endpoint');
process.exit(-1);
}
})();

View File

@@ -9,8 +9,4 @@ api.use('/enqueue', require('./enqueue'));
api.use('/messaging', require('./messaging')); // inbound SMS api.use('/messaging', require('./messaging')); // inbound SMS
api.use('/createMessage', require('./create-message')); // outbound SMS (REST) api.use('/createMessage', require('./create-message')); // outbound SMS (REST)
// health checks
api.get('/', (req, res) => res.sendStatus(200));
api.get('/health', (req, res) => res.sendStatus(200));
module.exports = api; module.exports = api;

View File

@@ -1,16 +1,23 @@
const express = require('express'); const express = require('express');
const api = require('./api'); const api = require('./api');
const routes = express.Router(); const routes = express.Router();
const sessionTracker = require('../session/session-tracker');
const readiness = (req, res) => {
const logger = req.app.locals.logger;
const {count} = sessionTracker;
const {srf} = require('../..');
const {getFreeswitch} = srf.locals;
if (getFreeswitch()) {
return res.status(200).json({calls: count});
}
logger.info('responding to /health check with failure as freeswitch is not up');
res.sendStatus(480);
};
routes.use('/v1', api); routes.use('/v1', api);
// health checks // health check
routes.get('/', (req, res) => { routes.get('/health', readiness);
res.sendStatus(200);
});
routes.get('/health', (req, res) => {
res.sendStatus(200);
});
module.exports = routes; module.exports = routes;

242
lib/tasks/cognigy.js Normal file
View File

@@ -0,0 +1,242 @@
const Task = require('./task');
const {TaskName, TaskPreconditions} = require('../utils/constants');
const makeTask = require('./make_task');
const { SocketClient } = require('@cognigy/socket-client');
const parseGallery = (obj = {}) => {
const {_default} = obj;
if (_default) {
const {_gallery} = _default;
if (_gallery) return _gallery.fallbackText;
}
};
const parseQuickReplies = (obj) => {
const {_default} = obj;
if (_default) {
const {_quickReplies} = _default;
if (_quickReplies) return _quickReplies.text || _quickReplies.fallbackText;
}
};
const parseBotText = (evt) => {
const {text, data} = evt;
if (text) return text;
switch (data?.type) {
case 'quickReplies':
return parseQuickReplies(data?._cognigy);
case 'gallery':
return parseGallery(data?._cognigy);
default:
break;
}
};
class Cognigy extends Task {
constructor(logger, opts) {
super(logger, opts);
this.preconditions = TaskPreconditions.Endpoint;
this.url = this.data.url;
this.token = this.data.token;
this.prompt = this.data.prompt;
this.eventHook = this.data?.eventHook;
this.actionHook = this.data?.actionHook;
this.data = this.data.data || {};
this.prompts = [];
}
get name() { return TaskName.Cognigy; }
get hasReportedFinalAction() {
return this.reportedFinalAction || this.isReplacingApplication;
}
async exec(cs, ep) {
await super.exec(cs);
this.ep = ep;
try {
/* set event handlers and start transcribing */
this.on('transcription', this._onTranscription.bind(this, cs, ep));
this.on('error', this._onError.bind(this, cs, ep));
this.transcribeTask = this._makeTranscribeTask();
this.transcribeTask.exec(cs, ep, this)
.catch((err) => {
this.logger.info({err}, 'Cognigy transcribe task returned error');
this.notifyTaskDone();
});
if (this.prompt) {
this.sayTask = this._makeSayTask(this.prompt);
this.sayTask.exec(cs, ep, this)
.catch((err) => {
this.logger.info({err}, 'Cognigy say task returned error');
this.notifyTaskDone();
});
}
/* connect to the bot and send initial data */
this.client = new SocketClient(
this.url,
this.token,
{
sessionId: cs.callSid,
channel: 'jambonz'
}
);
this.client.on('output', this._onBotUtterance.bind(this, cs, ep));
this.client.on('typingStatus', this._onBotTypingStatus.bind(this, cs, ep));
this.client.on('error', this._onBotError.bind(this, cs, ep));
this.client.on('finalPing', this._onBotFinalPing.bind(this, cs, ep));
await this.client.connect();
this.client.sendMessage('', {...this.data, ...cs.callInfo});
await this.awaitTaskDone();
} catch (err) {
this.logger.error({err}, 'Cognigy error');
throw err;
}
}
async kill(cs) {
super.kill(cs);
this.logger.debug('Cognigy:kill');
this.removeAllListeners();
this.transcribeTask && this.transcribeTask.kill();
this.client.removeAllListeners();
if (this.client && this.client.connected) this.client.disconnect();
if (!this.hasReportedFinalAction) {
this.reportedFinalAction = true;
this.performAction({cognigyResult: 'caller hungup'})
.catch((err) => this.logger.info({err}, 'cognigy - error w/ action webook'));
}
if (this.ep.connected) {
await this.ep.api('uuid_break', this.ep.uuid).catch((err) => this.logger.info(err, 'Error killing audio'));
}
this.notifyTaskDone();
}
_makeTranscribeTask() {
const opts = {
recognizer: this.data.recognizer || {
vendor: 'default',
language: 'default'
}
};
this.logger.debug({opts}, 'constructing a nested transcribe object');
const transcribe = makeTask(this.logger, {transcribe: opts}, this);
return transcribe;
}
_makeSayTask(text) {
const opts = {
text,
synthesizer: this.data.synthesizer ||
{
vendor: 'default',
language: 'default',
voice: 'default'
}
};
this.logger.debug({opts}, 'constructing a nested say object');
const say = makeTask(this.logger, {say: opts}, this);
return say;
}
async _onBotError(cs, ep, evt) {
this.logger.info({evt}, 'Cognigy:_onBotError');
this.performAction({cognigyResult: 'botError', message: evt.message });
this.reportedFinalAction = true;
this.notifyTaskDone();
}
async _onBotTypingStatus(cs, ep, evt) {
this.logger.info({evt}, 'Cognigy:_onBotTypingStatus');
}
async _onBotFinalPing(cs, ep) {
this.logger.info('Cognigy:_onBotFinalPing');
if (this.prompts.length) {
const text = this.prompts.join('.');
this.prompts = [];
if (text && !this.killed) {
this.sayTask = this._makeSayTask(text);
this.sayTask.exec(cs, ep, this)
.catch((err) => {
this.logger.info({err}, 'Cognigy say task returned error');
this.notifyTaskDone();
});
}
}
}
async _onBotUtterance(cs, ep, evt) {
this.logger.debug({evt}, 'Cognigy:_onBotUtterance');
if (this.eventHook) {
this.performHook(cs, this.eventHook, {event: 'botMessage', message: evt})
.then((redirected) => {
if (redirected) {
this.logger.info('Cognigy_onTranscription: event handler for bot message redirected us to new webhook');
this.reportedFinalAction = true;
this.performAction({cognigyResult: 'redirect'}, false);
}
return;
})
.catch(({err}) => {
this.logger.info({err}, 'Cognigy_onTranscription: error sending event hook');
});
}
const text = parseBotText(evt);
this.prompts.push(text);
}
async _onTranscription(cs, ep, evt) {
this.logger.debug({evt}, `Cognigy: got transcription for callSid ${cs.callSid}`);
const utterance = evt.alternatives[0].transcript;
if (this.eventHook) {
this.performHook(cs, this.eventHook, {event: 'userMessage', message: utterance})
.then((redirected) => {
if (redirected) {
this.logger.info('Cognigy_onTranscription: event handler for user message redirected us to new webhook');
this.reportedFinalAction = true;
this.performAction({cognigyResult: 'redirect'}, false);
if (this.transcribeTask) this.transcribeTask.kill(cs);
}
return;
})
.catch(({err}) => {
this.logger.info({err}, 'Cognigy_onTranscription: error sending event hook');
});
}
/* send the user utterance to the bot */
try {
if (this.client && this.client.connected) {
this.client.sendMessage(utterance);
}
else {
this.logger.info('Cognigy_onTranscription - not sending user utterance as bot is disconnected');
}
} catch (err) {
this.logger.error({err}, 'Cognigy_onTranscription: Error sending user utterance to Cognigy - ending task');
this.performAction({cognigyResult: 'socketError'});
this.reportedFinalAction = true;
this.notifyTaskDone();
}
}
_onError(cs, ep, err) {
this.logger.debug({err}, 'Cognigy: got error');
if (!this.hasReportedFinalAction) this.performAction({cognigyResult: 'error', err});
this.reportedFinalAction = true;
this.notifyTaskDone();
}
}
module.exports = Cognigy;

View File

@@ -452,8 +452,8 @@ class Dialogflow extends Task {
this.noinputTimer = setTimeout(this._onNoInput.bind(this, ep, cs), this.noInputTimeout); this.noinputTimer = setTimeout(this._onNoInput.bind(this, ep, cs), this.noInputTimeout);
} }
async _performHook(cs, hook, results) { async _performHook(cs, hook, results = {}) {
const json = await this.cs.requestor.request(hook, results); const json = await this.cs.requestor.request(hook, {...results, ...cs.callInfo.toJSON()});
if (json && Array.isArray(json)) { if (json && Array.isArray(json)) {
const makeTask = require('../make_task'); const makeTask = require('../make_task');
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata)); const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));

View File

@@ -20,6 +20,9 @@ function makeTask(logger, obj, parent) {
case TaskName.SipRefer: case TaskName.SipRefer:
const TaskSipRefer = require('./sip_refer'); const TaskSipRefer = require('./sip_refer');
return new TaskSipRefer(logger, data, parent); return new TaskSipRefer(logger, data, parent);
case TaskName.Cognigy:
const TaskCognigy = require('./cognigy');
return new TaskCognigy(logger, data, parent);
case TaskName.Conference: case TaskName.Conference:
const TaskConference = require('./conference'); const TaskConference = require('./conference');
return new TaskConference(logger, data, parent); return new TaskConference(logger, data, parent);

View File

@@ -21,6 +21,22 @@
"referTo" "referTo"
] ]
}, },
"cognigy": {
"properties": {
"url": "string",
"token": "string",
"recognizer": "#recognizer",
"tts": "#synthesizer",
"prompt": "string",
"actionHook": "object|string",
"eventHook": "object|string",
"data": "object"
},
"required": [
"url",
"token"
]
},
"dequeue": { "dequeue": {
"properties": { "properties": {
"name": "string", "name": "string",
@@ -308,7 +324,6 @@
"earlyMedia": "boolean" "earlyMedia": "boolean"
}, },
"required": [ "required": [
"transcriptionHook",
"recognizer" "recognizer"
] ]
}, },

View File

@@ -11,6 +11,7 @@ class TaskTranscribe extends Task {
constructor(logger, opts, parentTask) { constructor(logger, opts, parentTask) {
super(logger, opts); super(logger, opts);
this.preconditions = TaskPreconditions.Endpoint; this.preconditions = TaskPreconditions.Endpoint;
this.parentTask = parentTask;
this.transcriptionHook = this.data.transcriptionHook; this.transcriptionHook = this.data.transcriptionHook;
this.earlyMedia = this.data.earlyMedia === true || (parentTask && parentTask.earlyMedia); this.earlyMedia = this.data.earlyMedia === true || (parentTask && parentTask.earlyMedia);
@@ -76,6 +77,7 @@ class TaskTranscribe extends Task {
await this.awaitTaskDone(); await this.awaitTaskDone();
} catch (err) { } catch (err) {
this.logger.info(err, 'TaskTranscribe:exec - error'); this.logger.info(err, 'TaskTranscribe:exec - error');
this.parentTask && this.parentTask.emit('error', err);
} }
ep.removeCustomEventListener(GoogleTranscriptionEvents.Transcription); ep.removeCustomEventListener(GoogleTranscriptionEvents.Transcription);
ep.removeCustomEventListener(GoogleTranscriptionEvents.NoAudioDetected); ep.removeCustomEventListener(GoogleTranscriptionEvents.NoAudioDetected);
@@ -227,8 +229,13 @@ class TaskTranscribe extends Task {
} }
this.logger.debug(evt, 'TaskTranscribe:_onTranscription'); this.logger.debug(evt, 'TaskTranscribe:_onTranscription');
this.cs.requestor.request(this.transcriptionHook, Object.assign({speech: evt}, this.cs.callInfo)) if (this.transcriptionHook) {
.catch((err) => this.logger.info(err, 'TranscribeTask:_onTranscription error')); this.cs.requestor.request(this.transcriptionHook, Object.assign({speech: evt}, this.cs.callInfo))
.catch((err) => this.logger.info(err, 'TranscribeTask:_onTranscription error'));
}
if (this.parentTask) {
this.parentTask.emit('transcription', evt);
}
if (this.killed) { if (this.killed) {
this.logger.debug('TaskTranscribe:_onTranscription exiting after receiving final transcription'); this.logger.debug('TaskTranscribe:_onTranscription exiting after receiving final transcription');
this._clearTimer(); this._clearTimer();

View File

@@ -1,5 +1,6 @@
{ {
"TaskName": { "TaskName": {
"Cognigy": "cognigy",
"Conference": "conference", "Conference": "conference",
"Dequeue": "dequeue", "Dequeue": "dequeue",
"Dial": "dial", "Dial": "dial",

View File

@@ -67,7 +67,6 @@ module.exports = (logger) => {
})(); })();
} }
// send OPTIONS pings to SBCs
async function pingProxies(srf) { async function pingProxies(srf) {
if (process.env.NODE_ENV === 'test') return; if (process.env.NODE_ENV === 'test') return;
@@ -91,28 +90,33 @@ module.exports = (logger) => {
} }
} }
// OPTIONS ping the SBCs from each feature server every 60 seconds if (process.env.K8S) {
setInterval(() => { logger.info('disabling OPTIONS pings since we are running as a kubernetes service');
const {srf} = require('../..'); }
pingProxies(srf); else {
}, process.env.OPTIONS_PING_INTERVAL || 30000); // OPTIONS ping the SBCs from each feature server every 60 seconds
setInterval(() => {
const {srf} = require('../..');
pingProxies(srf);
}, process.env.OPTIONS_PING_INTERVAL || 30000);
// initial ping once we are up // initial ping once we are up
setTimeout(async() => { setTimeout(async() => {
const {srf} = require('../..'); const {srf} = require('../..');
// if SBCs are auto-scaling, monitor them as they come and go // if SBCs are auto-scaling, monitor them as they come and go
if (!process.env.JAMBONES_SBCS) { if (!process.env.JAMBONES_SBCS) {
const {monitorSet} = srf.locals.dbHelpers; const {monitorSet} = srf.locals.dbHelpers;
const setName = `${(process.env.JAMBONES_CLUSTER_ID || 'default')}:active-sip`; const setName = `${(process.env.JAMBONES_CLUSTER_ID || 'default')}:active-sip`;
await monitorSet(setName, 10, (members) => { await monitorSet(setName, 10, (members) => {
sbcs = members; sbcs = members;
logger.info(`sbc-pinger: SBC roster has changed, list of active SBCs is now ${sbcs}`); logger.info(`sbc-pinger: SBC roster has changed, list of active SBCs is now ${sbcs}`);
}); });
} }
pingProxies(srf); pingProxies(srf);
}, 1000); }, 1000);
}
return { return {
lifecycleEmitter, lifecycleEmitter,

View File

@@ -1,5 +1,8 @@
const CIDRMatcher = require('cidr-matcher'); const CIDRMatcher = require('cidr-matcher');
const matcher = new CIDRMatcher([process.env.JAMBONES_NETWORK_CIDR]); const cidrs = process.env.JAMBONES_NETWORK_CIDR
.split(',')
.map((s) => s.trim());
const matcher = new CIDRMatcher(cidrs);
module.exports = (sbcList) => { module.exports = (sbcList) => {
const obj = sbcList const obj = sbcList

1004
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -26,23 +26,25 @@
"jslint": "eslint app.js lib" "jslint": "eslint app.js lib"
}, },
"dependencies": { "dependencies": {
"@cognigy/socket-client": "^4.5.5",
"@jambonz/db-helpers": "^0.6.16", "@jambonz/db-helpers": "^0.6.16",
"@jambonz/http-health-check": "^0.0.1",
"@jambonz/mw-registrar": "^0.2.1", "@jambonz/mw-registrar": "^0.2.1",
"@jambonz/realtimedb-helpers": "^0.4.14", "@jambonz/realtimedb-helpers": "^0.4.14",
"@jambonz/stats-collector": "^0.1.5", "@jambonz/stats-collector": "^0.1.6",
"@jambonz/time-series": "^0.1.5", "@jambonz/time-series": "^0.1.5",
"aws-sdk": "^2.846.0", "aws-sdk": "^2.1036.0",
"bent": "^7.3.12", "bent": "^7.3.12",
"cidr-matcher": "^2.1.1", "cidr-matcher": "^2.1.1",
"debug": "^4.3.1", "debug": "^4.3.2",
"deepcopy": "^2.1.0", "deepcopy": "^2.1.0",
"drachtio-fsmrf": "^2.0.13", "drachtio-fsmrf": "^2.0.13",
"drachtio-srf": "^4.4.55", "drachtio-srf": "^4.4.55",
"express": "^4.17.1", "express": "^4.17.1",
"ip": "^1.1.5", "ip": "^1.1.5",
"moment": "^2.29.1", "moment": "^2.29.1",
"parse-url": "^5.0.2", "parse-url": "^5.0.7",
"pino": "^6.11.2", "pino": "^6.13.2",
"to-snake-case": "^1.0.0", "to-snake-case": "^1.0.0",
"uuid": "^8.3.2", "uuid": "^8.3.2",
"verify-aws-sns-signature": "^0.0.6", "verify-aws-sns-signature": "^0.0.6",