Compare commits

..

74 Commits

Author SHA1 Message Date
akirilyuk
b31a8db9de do not resolve gather on barge in 2022-02-27 00:01:38 +01:00
akirilyuk
1d1388583b fix linting remove logs 2022-02-26 23:37:06 +01:00
akirilyuk
942c73e64b fix start listening 2022-02-26 23:32:00 +01:00
akirilyuk
de61cc1d3d enhance error log 2022-02-26 23:30:36 +01:00
akirilyuk
32ebc7017d add try catch in init speech 2022-02-26 23:30:12 +01:00
akirilyuk
87ef510dea more logs 2022-02-26 23:27:37 +01:00
akirilyuk
22440843d9 add more logs 2022-02-26 23:27:00 +01:00
akirilyuk
48fa3133ef add more logs 2022-02-26 23:22:13 +01:00
akirilyuk
a3352cc932 resolve with speech on barge in 2022-02-26 23:12:58 +01:00
akirilyuk
2de54ddedf start listening if not say nor play provided 2022-02-26 22:21:24 +01:00
akirilyuk
812b40fcb0 add possibility to not listen after gather say/play finished 2022-02-26 22:05:21 +01:00
akirilyuk
9a0e33e4f7 add listenAfterSpeech to gather spec 2022-02-26 21:57:09 +01:00
Dave Horton
c7e141abf1 gather: support for min/max digits and interdigit timeout 2022-02-26 15:30:35 -05:00
Dave Horton
e30701c4b4 bugfix: gather handles interim results from azure 2022-02-26 14:49:34 -05:00
Dave Horton
c5d392af6a add bargein support to gather 2022-02-26 14:41:21 -05:00
Dave Horton
3c5d392407 Feature/ws api (#72)
initial changes to support websockets as an alternative to webhooks
2022-02-26 14:06:52 -05:00
Dave Horton
5bfc451c85 when running on kubernetes, use sbc-sip service rather than pinging sbcs 2022-02-23 12:27:34 -05:00
Dave Horton
47478fd409 fix possible exception 2022-02-19 09:57:51 -05:00
Dave Horton
c16a2662f2 bugfix: rest outdial issue caused by req.srf not properly set 2022-02-14 09:14:13 -05:00
Dave Horton
c1130adf03 merge 2022-02-12 10:12:45 -05:00
Dave Horton
f982f6c7d8 update to latest realtimedb-helpers 2022-02-12 10:10:03 -05:00
Snyk bot
f20190b0fc fix: upgrade aws-sdk from 2.1061.0 to 2.1062.0 (#69)
Snyk has created this PR to upgrade aws-sdk from 2.1061.0 to 2.1062.0.

See this package in npm:
https://www.npmjs.com/package/aws-sdk

See this project in Snyk:
https://app.snyk.io/org/davehorton/project/cec90d0e-0ded-433e-a42e-fe78b28ae489?utm_source=github&utm_medium=referral&page=upgrade-pr
2022-02-12 09:23:13 -05:00
Snyk bot
74e85e1b16 fix: upgrade aws-sdk from 2.1060.0 to 2.1061.0 (#68)
Snyk has created this PR to upgrade aws-sdk from 2.1060.0 to 2.1061.0.

See this package in npm:
https://www.npmjs.com/package/aws-sdk

See this project in Snyk:
https://app.snyk.io/org/davehorton/project/cec90d0e-0ded-433e-a42e-fe78b28ae489?utm_source=github&utm_medium=referral&page=upgrade-pr
2022-02-11 07:48:02 -05:00
Dave Horton
63e9cb985e allow target-level headers on outdials (#29) 2022-02-10 14:34:21 -05:00
Dave Horton
2e88ab1f55 bugfix: race condition on hangup sometimes resulted in outbound call attempt even though caller had hung up 2022-02-10 12:15:25 -05:00
Dave Horton
7f75a35515 bugfix: race condition on hangup could cause us to send dup webhook 2022-02-10 11:16:57 -05:00
Dave Horton
941727e93f add fs_public_ip to webhook payload (only when running in ec2 autoscale group) 2022-02-10 09:51:48 -05:00
Dave Horton
d8bfa33a00 include fs_sip_address and api_base_url in webhook paylods 2022-02-10 09:19:33 -05:00
Dave Horton
30ed5b6a02 add support for vad to gather and transcribe (#67) 2022-02-10 08:45:16 -05:00
Dave Horton
bac1b7f2c6 bump version 2022-02-09 15:42:27 -05:00
Dave Horton
48deb3ae89 update to latest @jambonz/realtimedb-helpers with support for redis username / password auth 2022-02-09 15:21:55 -05:00
Dave Horton
de83f735ea memory leak fixes 2022-02-08 20:33:16 -05:00
Dave Horton
cfe9397502 lint 2022-02-03 07:36:01 -05:00
Dave Horton
dda3335060 update deps, add helmet middleware 2022-02-03 07:31:30 -05:00
Dave Horton
2329f0cda0 child tasks must remove reference to parent on kill or else entangled parent-child tasks will not be gc'ed 2022-02-01 11:00:12 -05:00
Dave Horton
36683dc151 bugfix: include custom jambonz headers on rest outdial 2022-01-28 13:36:06 -05:00
Dave Horton
ce738a7852 0.7.2 version 2022-01-28 09:16:05 -05:00
Dave Horton
77a696a0dc update to latest synthAudio with minor fixes 2022-01-27 13:52:35 -05:00
Dave Horton
62ff44540d more changes for wellsaid 2022-01-27 10:55:32 -05:00
Dave Horton
e5821cddf8 further fix for wellsaid tts 2022-01-27 10:46:16 -05:00
Dave Horton
25567a7842 add support for retrieving wellsaid speech credential 2022-01-27 10:34:30 -05:00
Dave Horton
40bd3c9c88 update to realtimedb-helpers with support for wellsaid tts 2022-01-27 10:13:18 -05:00
Dave Horton
27d6d32359 bugfix: rtpengine needs to transcode when different codecs are used on A and B legs 2022-01-26 07:37:09 -05:00
Dave Horton
142f5d409f use smpp service name when running in kubernetes 2022-01-25 13:29:16 -05:00
Dave Horton
da4a7184a4 update to realtimedb-helpers with engine caching fix for tts 2022-01-22 15:35:01 -05:00
Dave Horton
2c72bf50cd sync package-lock.json 2022-01-21 22:04:07 -05:00
Dave Horton
b27f349fc6 linting 2022-01-21 10:15:33 -05:00
Dave Horton
138aa5836a lock version 2022-01-21 10:13:42 -05:00
Dave Horton
e1a023c21e bugfix: aws property is engine not platform 2022-01-21 09:57:58 -05:00
Dave Horton
8acb4d1a24 #58 - add support for platform (standard, or neural) when using aws tts 2022-01-19 19:46:24 -05:00
Dave Horton
26d4bfb63b Cognigy: settings tweaks 2022-01-18 19:49:46 -05:00
Dave Horton
45dcab8517 fix linting error 2022-01-17 20:37:32 -05:00
Dave Horton
27e3cba00b fix vulnerabilities 2022-01-17 18:41:12 -05:00
Dave Horton
097f36cb00 bugix: re-invites after releasing media fail 2022-01-17 13:11:19 -05:00
Dave Horton
752eed428f cognigy: when use azuyre tts, request detailed output format 2022-01-14 08:48:55 -05:00
Dave Horton
afb874aabc minor logging change 2022-01-14 07:56:11 -05:00
Dave Horton
59227febf9 K8s (#57)
* JAMBONES_NETWORK_CIDR not needed for K8S

* fix bug setting fsUUID in K8S scenario

* bugfix: dial music was not stopped when a dial verb times out (#56)
2022-01-09 14:57:46 -05:00
Dave Horton
8593f12b51 add custom headers to outdial, save unique uuid for running FS to redis 2022-01-08 11:50:18 -05:00
Dave Horton
3bf1984854 K8s changes (#55)
* K8S: dont send OPTIONS pings

* fix missing ref

* k8s pre-stop hook added

* k8s pre-stop hook changes

* chmod +x utility

* more k8s pre-stop changes

* pre stop

* fix healthcheck

* k8s pre-stop working

* add readiness probe

* fix bug in pre-stop

* logging

* revamp k8s pre-stop a bit

* initial support for cognigy bot

* more cognigy changes

* switch to use transcribe for cognigy

* #54 include callInfo in dialogflow event payload
2022-01-06 12:41:14 -05:00
Dave Horton
0e45e9b27c add target.overrideTo to specs 2021-12-22 08:32:56 -05:00
Dave Horton
b0a8a6828d bugfix: use of tag resulted in redis insert failures 2021-12-21 20:42:53 -05:00
Dave Horton
27d4ad5674 bump version 2021-12-21 09:39:44 -05:00
Dave Horton
d38e77c06c bugfix: support looking up application by regex in addition to exact phone number match 2021-12-20 15:37:21 -05:00
Dave Horton
c9e2a162c2 lookupAppByPhoneNumber: pass voip_carrier_sid if available 2021-12-20 10:04:54 -05:00
Dave Horton
2b9cb5105f clean up handlers 2021-12-15 19:33:31 -05:00
Dave Horton
afbbed3f5c default options ping interval to 30s, with env override if desired 2021-12-14 12:28:02 -05:00
Dave Horton
f642967f02 add SIGTERM handler 2021-12-13 18:08:53 -05:00
Dave Horton
fbe2aa2c06 add SIGUSR2 handler to remove fs from redis set 2021-12-13 17:59:23 -05:00
Dave Horton
5321b5c651 minor change to dial _releaseMedia 2021-12-13 13:22:09 -05:00
Dave Horton
83c114803f minor logging 2021-12-13 11:48:43 -05:00
Dave Horton
0663174f46 additional logging 2021-12-12 09:30:36 -05:00
Dave Horton
3d4359fbe4 fix bug from prev checkin, destroy does not return a promise 2021-12-09 11:24:52 -05:00
Dave Horton
10382573fa clean up some retainers 2021-12-09 10:44:50 -05:00
Dave Horton
c190279927 bugfix: enqueue task was only invoking waitUrl a single time 2021-12-06 21:18:51 -05:00
42 changed files with 2627 additions and 764 deletions

51
.github/workflows/docker-publish.yml vendored Normal file
View File

@@ -0,0 +1,51 @@
name: Docker
on:
push:
# Publish `main` as Docker `latest` image.
branches:
- main
# Publish `v1.2.3` tags as releases.
tags:
- v*
env:
IMAGE_NAME: feature-server
jobs:
push:
runs-on: ubuntu-latest
if: github.event_name == 'push'
steps:
- uses: actions/checkout@v2
- name: Build image
run: docker build . --file Dockerfile --tag $IMAGE_NAME
- name: Log into registry
run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u ${{ github.actor }} --password-stdin
- name: Push image
run: |
IMAGE_ID=ghcr.io/${{ github.repository_owner }}/$IMAGE_NAME
# Change all uppercase to lowercase
IMAGE_ID=$(echo $IMAGE_ID | tr '[A-Z]' '[a-z]')
# Strip git ref prefix from version
VERSION=$(echo "${{ github.ref }}" | sed -e 's,.*/\(.*\),\1,')
# Strip "v" prefix from tag name
[[ "${{ github.ref }}" == "refs/tags/"* ]] && VERSION=$(echo $VERSION | sed -e 's/^v//')
# Use Docker `latest` tag convention
[ "$VERSION" == "main" ] && VERSION=latest
echo IMAGE_ID=$IMAGE_ID
echo VERSION=$VERSION
docker tag $IMAGE_NAME $IMAGE_ID:$VERSION
docker push $IMAGE_ID:$VERSION

View File

@@ -1,4 +1,4 @@
FROM node:16
FROM node:17.4-slim
WORKDIR /opt/app/
COPY package.json ./
RUN npm install

23
app.js
View File

@@ -7,7 +7,7 @@ assert.ok(process.env.DRACHTIO_PORT || process.env.DRACHTIO_HOST, 'missing DRACH
assert.ok(process.env.DRACHTIO_SECRET, 'missing DRACHTIO_SECRET env var');
assert.ok(process.env.JAMBONES_FREESWITCH, 'missing JAMBONES_FREESWITCH env var');
assert.ok(process.env.JAMBONES_REDIS_HOST, 'missing JAMBONES_REDIS_HOST env var');
assert.ok(process.env.JAMBONES_NETWORK_CIDR, 'missing JAMBONES_SUBNET env var');
assert.ok(process.env.JAMBONES_NETWORK_CIDR || process.env.K8S, 'missing JAMBONES_SUBNET env var');
const Srf = require('drachtio-srf');
const srf = new Srf();
@@ -17,7 +17,7 @@ const opts = {
level: process.env.JAMBONES_LOGLEVEL || 'info'
};
const logger = require('pino')(opts);
const {LifeCycleEvents} = require('./lib/utils/constants');
const {LifeCycleEvents, FS_UUID_SET_NAME} = require('./lib/utils/constants');
const installSrfLocals = require('./lib/utils/install-srf-locals');
installSrfLocals(srf, logger);
@@ -31,6 +31,7 @@ const {
// HTTP
const express = require('express');
const helmet = require('helmet');
const app = express();
Object.assign(app.locals, {
logger,
@@ -73,6 +74,8 @@ srf.invite((req, res) => {
});
// HTTP
app.use(helmet());
app.use(helmet.hidePoweredBy());
app.use(express.urlencoded({ extended: true }));
app.use(express.json());
app.use('/', httpRoutes);
@@ -92,6 +95,10 @@ sessionTracker.on('idle', () => {
}
});
const getCount = () => sessionTracker.count;
const healthCheck = require('@jambonz/http-health-check');
healthCheck({app, logger, path: '/', fn: getCount});
setInterval(() => {
srf.locals.stats.gauge('fs.sip.calls.count', sessionTracker.count);
}, 5000);
@@ -105,4 +112,16 @@ const disconnect = () => {
});
};
process.on('SIGUSR2', handle);
process.on('SIGTERM', handle);
function handle(signal) {
const {removeFromSet} = srf.locals.dbHelpers;
const setName = `${(process.env.JAMBONES_CLUSTER_ID || 'default')}:active-fs`;
logger.info(`got signal ${signal}, removing ${srf.locals.localSipAddress} from set ${setName}`);
removeFromSet(setName, srf.locals.localSipAddress);
removeFromSet(FS_UUID_SET_NAME, srf.locals.fsUUID);
srf.locals.disabled = true;
}
module.exports = {srf, logger, disconnect};

29
bin/k8s-pre-stop-hook.js Executable file
View File

@@ -0,0 +1,29 @@
#!/usr/bin/env node
const bent = require('bent');
const getJSON = bent('json');
const PORT = process.env.HTTP_PORT || 3000;
const sleep = (ms) => {
return new Promise((resolve) => setTimeout(resolve, ms));
};
(async function() {
try {
do {
const obj = await getJSON(`http://127.0.0.1:${PORT}/`);
const {calls} = obj;
if (calls === 0) {
console.log('no calls on the system, we can exit');
process.exit(0);
}
else {
console.log(`waiting for ${calls} to exit..`);
}
await sleep(10000);
} while (1);
} catch (err) {
console.error(err, 'Error querying health endpoint');
process.exit(-1);
}
})();

View File

@@ -6,7 +6,8 @@ const {CallDirection, CallStatus} = require('../../utils/constants');
const { v4: uuidv4 } = require('uuid');
const SipError = require('drachtio-srf').SipError;
const sysError = require('./error');
const Requestor = require('../../utils/requestor');
const HttpRequestor = require('../../utils/http-requestor');
const WsRequestor = require('../../utils/ws-requestor');
const dbUtils = require('../../utils/db-utils');
router.post('/', async(req, res) => {
@@ -35,6 +36,8 @@ router.post('/', async(req, res) => {
opts.headers = {
...opts.headers,
'X-Jambonz-Routing': target.type,
'X-Jambonz-FS-UUID': srf.locals.fsUUID,
'X-Call-Sid': callSid,
'X-Account-Sid': req.body.account_sid
};
@@ -102,11 +105,16 @@ router.post('/', async(req, res) => {
* attach our requestor and notifier objects
* these will be used for all http requests we make during this call
*/
app.requestor = new Requestor(logger, account.account_sid, app.call_hook, account.webhook_secret);
if (app.call_status_hook) {
app.notifier = new Requestor(logger, account.account_sid, app.call_status_hook, account.webhook_secret);
if ('WS' === app.call_hook?.method) {
app.requestor = new WsRequestor(logger, account.account_sid, app.call_hook, account.webhook_secret) ;
app.notifier = app.requestor;
}
else {
app.requestor = new HttpRequestor(logger, account.account_sid, app.call_hook, account.webhook_secret);
if (app.call_status_hook) app.notifier = new HttpRequestor(logger, account.account_sid, app.call_status_hook,
account.webhook_secret);
else app.notifier = {request: () => {}};
}
else app.notifier = {request: () => {}};
/* now launch the outdial */
try {
@@ -120,6 +128,7 @@ router.post('/', async(req, res) => {
res.status(500).send('Call Failure');
return;
}
inviteReq.srf = srf;
/* ok our outbound INVITE is in flight */
const tasks = [restDial];

View File

@@ -9,8 +9,4 @@ api.use('/enqueue', require('./enqueue'));
api.use('/messaging', require('./messaging')); // inbound SMS
api.use('/createMessage', require('./create-message')); // outbound SMS (REST)
// health checks
api.get('/', (req, res) => res.sendStatus(200));
api.get('/health', (req, res) => res.sendStatus(200));
module.exports = api;

View File

@@ -1,5 +1,6 @@
const router = require('express').Router();
const Requestor = require('../../utils/requestor');
const HttpRequestor = require('../../utils/http-requestor');
const WsRequestor = require('../../utils/ws-requestor');
const CallInfo = require('../../session/call-info');
const {CallDirection} = require('../../utils/constants');
const SmsSession = require('../../session/sms-call-session');
@@ -18,7 +19,17 @@ router.post('/:partner', async(req, res) => {
const app = req.body.app;
const account = await lookupAccountBySid(app.accountSid);
const hook = app.messaging_hook;
const requestor = new Requestor(logger, account.account_sid, hook, account.webhook_secret);
let requestor;
if ('WS' === hook?.method) {
app.requestor = new WsRequestor(logger, account.account_sid, hook, account.webhook_secret) ;
app.notifier = app.requestor;
}
else {
app.requestor = new HttpRequestor(logger, account.account_sid, hook, account.webhook_secret);
app.notifier = {request: () => {}};
}
const payload = {
carrier: req.params.partner,
messageSid: app.messageSid,
@@ -33,7 +44,7 @@ router.post('/:partner', async(req, res) => {
res.status(200).json({sid: req.body.messageSid});
try {
tasks = await requestor.request(hook, payload);
tasks = await requestor.request('session:new', hook, payload);
logger.info({tasks}, 'response from incoming SMS webhook');
} catch (err) {
logger.error({err, hook}, 'Error sending incoming SMS message');

View File

@@ -1,16 +1,23 @@
const express = require('express');
const api = require('./api');
const routes = express.Router();
const sessionTracker = require('../session/session-tracker');
const readiness = (req, res) => {
const logger = req.app.locals.logger;
const {count} = sessionTracker;
const {srf} = require('../..');
const {getFreeswitch} = srf.locals;
if (getFreeswitch()) {
return res.status(200).json({calls: count});
}
logger.info('responding to /health check with failure as freeswitch is not up');
res.sendStatus(480);
};
routes.use('/v1', api);
// health checks
routes.get('/', (req, res) => {
res.sendStatus(200);
});
routes.get('/health', (req, res) => {
res.sendStatus(200);
});
// health check
routes.get('/health', readiness);
module.exports = routes;

View File

@@ -1,14 +1,21 @@
const { v4: uuidv4 } = require('uuid');
const {CallDirection} = require('./utils/constants');
const CallInfo = require('./session/call-info');
const Requestor = require('./utils/requestor');
const HttpRequestor = require('./utils/http-requestor');
const WsRequestor = require('./utils/ws-requestor');
const makeTask = require('./tasks/make_task');
const parseUri = require('drachtio-srf').parseUri;
const normalizeJambones = require('./utils/normalize-jambones');
const dbUtils = require('./utils/db-utils');
module.exports = function(srf, logger) {
const {lookupAppByPhoneNumber, lookupAppBySid, lookupAppByRealm, lookupAppByTeamsTenant} = srf.locals.dbHelpers;
const {
lookupAppByPhoneNumber,
lookupAppByRegex,
lookupAppBySid,
lookupAppByRealm,
lookupAppByTeamsTenant
} = srf.locals.dbHelpers;
const {lookupAccountDetails} = dbUtils(logger, srf);
function initLocals(req, res, next) {
const callSid = req.has('X-Retain-Call-Sid') ? req.get('X-Retain-Call-Sid') : uuidv4();
@@ -99,7 +106,7 @@ module.exports = function(srf, logger) {
}
else {
const uri = parseUri(req.uri);
const arr = /context-(.*)/.exec(uri.user);
const arr = /context-(.*)/.exec(uri?.user);
if (arr) {
// this is a transfer from another feature server
const {retrieveKey, deleteKey} = srf.locals.dbHelpers;
@@ -112,7 +119,15 @@ module.exports = function(srf, logger) {
logger.error(err, `Error retrieving transferred call app for ${arr[1]}`);
}
}
else app = await lookupAppByPhoneNumber(req.locals.calledNumber);
else {
const voip_carrier_sid = req.get('X-Voip-Carrier-Sid');
app = await lookupAppByPhoneNumber(req.locals.calledNumber, voip_carrier_sid);
if (!app) {
/* lookup by call_routes.regex */
app = await lookupAppByRegex(req.locals.calledNumber, account_sid);
}
}
}
if (!app || !app.call_hook || !app.call_hook.url) {
@@ -128,10 +143,18 @@ module.exports = function(srf, logger) {
* create a requestor that we will use for all http requests we make during the call.
* also create a notifier for call status events (if not needed, its a no-op).
*/
app.requestor = new Requestor(logger, account_sid, app.call_hook, accountInfo.account.webhook_secret);
if (app.call_status_hook) app.notifier = new Requestor(logger, account_sid, app.call_status_hook,
accountInfo.account.webhook_secret);
else app.notifier = {request: () => {}};
if ('WS' === app.call_hook?.method ||
app.call_hook?.url.startsWith('ws://') || app.call_hook?.url.startsWith('wss://')) {
app.requestor = new WsRequestor(logger, account_sid, app.call_hook, accountInfo.account.webhook_secret) ;
app.notifier = app.requestor;
app.call_hook.method = 'WS';
}
else {
app.requestor = new HttpRequestor(logger, account_sid, app.call_hook, accountInfo.account.webhook_secret);
if (app.call_status_hook) app.notifier = new HttpRequestor(logger, account_sid, app.call_status_hook,
accountInfo.account.webhook_secret);
else app.notifier = {request: () => {}};
}
req.locals.application = app;
const obj = Object.assign({}, app);
@@ -162,15 +185,15 @@ module.exports = function(srf, logger) {
return next();
}
/* retrieve the application to execute for this inbound call */
const params = Object.assign(app.call_hook.method === 'POST' ? {sip: req.msg} : {},
const params = Object.assign(['POST', 'WS'].includes(app.call_hook.method) ? {sip: req.msg} : {},
req.locals.callInfo);
const json = await app.requestor.request(app.call_hook, params);
const json = await app.requestor.request('session:new', app.call_hook, params);
app.tasks = normalizeJambones(logger, json).map((tdata) => makeTask(logger, tdata));
if (0 === app.tasks.length) throw new Error('no application provided');
next();
} catch (err) {
logger.info({err}, `Error retrieving or parsing application: ${err.message}`);
res.send(480, {headers: {'X-Reason': err.message}});
logger.info({err}, `Error retrieving or parsing application: ${err?.message}`);
res.send(480, {headers: {'X-Reason': err?.message || 'unknown'}});
}
}

View File

@@ -1,7 +1,6 @@
const {CallDirection, CallStatus} = require('../utils/constants');
const parseUri = require('drachtio-srf').parseUri;
const { v4: uuidv4 } = require('uuid');
/**
* @classdesc Represents the common information for all calls
* that is provided in call status webhooks
@@ -9,6 +8,7 @@ const { v4: uuidv4 } = require('uuid');
class CallInfo {
constructor(opts) {
let from ;
let srf;
this.direction = opts.direction;
if (opts.req) {
const u = opts.req.getParsedHeader('from');
@@ -19,6 +19,7 @@ class CallInfo {
if (this.direction === CallDirection.Inbound) {
// inbound call
const {app, req} = opts;
srf = req.srf;
this.callSid = req.locals.callSid,
this.accountSid = app.account_sid,
this.applicationSid = app.application_sid;
@@ -33,6 +34,7 @@ class CallInfo {
else if (opts.parentCallInfo) {
// outbound call that is a child of an existing call
const {req, parentCallInfo, to, callSid} = opts;
srf = req.srf;
this.callSid = callSid || uuidv4();
this.parentCallSid = parentCallInfo.callSid;
this.accountSid = parentCallInfo.accountSid;
@@ -47,6 +49,7 @@ class CallInfo {
else if (this.direction === CallDirection.None) {
// outbound SMS
const {messageSid, accountSid, applicationSid, res} = opts;
srf = res.srf;
this.messageSid = messageSid;
this.accountSid = accountSid;
this.applicationSid = applicationSid;
@@ -55,6 +58,7 @@ class CallInfo {
else {
// outbound call triggered by REST
const {req, callSid, accountSid, applicationSid, to, tag} = opts;
srf = req.srf;
this.callSid = callSid;
this.accountSid = accountSid;
this.applicationSid = applicationSid;
@@ -65,6 +69,11 @@ class CallInfo {
this.to = to;
if (tag) this._customerData = tag;
}
this.localSipAddress = srf.locals.localSipAddress;
if (srf.locals.publicIp) {
this.publicIp = srf.locals.publicIp;
}
}
/**
@@ -100,7 +109,8 @@ class CallInfo {
callStatus: this.callStatus,
callerId: this.callerId,
accountSid: this.accountSid,
applicationSid: this.applicationSid
applicationSid: this.applicationSid,
fsSipAddress: this.localSipAddress
};
['parentCallSid', 'originatingSipIp', 'originatingSipTrunkName'].forEach((prop) => {
if (this[prop]) obj[prop] = this[prop];
@@ -110,6 +120,13 @@ class CallInfo {
if (this._customerData) {
Object.assign(obj, {customerData: this._customerData});
}
if (process.env.JAMBONES_API_BASE_URL) {
Object.assign(obj, {apiBaseUrl: process.env.JAMBONES_API_BASE_URL});
}
if (this.publicIp) {
Object.assign(obj, {fsPublicIp: this.publicIp});
}
return obj;
}

View File

@@ -7,7 +7,8 @@ const sessionTracker = require('./session-tracker');
const makeTask = require('../tasks/make_task');
const normalizeJambones = require('../utils/normalize-jambones');
const listTaskNames = require('../utils/summarize-tasks');
const Requestor = require('../utils/requestor');
const HttpRequestor = require('../utils/http-requestor');
const WsRequestor = require('../utils/ws-requestor');
const BADPRECONDITIONS = 'preconditions not met';
const CALLER_CANCELLED_ERR_MSG = 'Response not sent due to unknown transaction';
@@ -48,6 +49,7 @@ class CallSession extends Emitter {
this.taskIdx = 0;
this.stackIdx = 0;
this.callGone = false;
this.notifiedComplete = false;
this.tmpFiles = new Set();
@@ -61,6 +63,8 @@ class CallSession extends Emitter {
}
this._pool = srf.locals.dbHelpers.pool;
this.requestor.on('command', this._onCommand.bind(this));
}
/**
@@ -264,6 +268,12 @@ class CallSession extends Emitter {
region: credential.region
};
}
else if ('wellsaid' === vendor) {
return {
speech_credential_sid: credential.speech_credential_sid,
api_key: credential.api_key
};
}
}
else {
writeAlerts({
@@ -282,6 +292,7 @@ class CallSession extends Emitter {
*/
async exec() {
this.logger.info({tasks: listTaskNames(this.tasks)}, `CallSession:exec starting ${this.tasks.length} tasks`);
while (this.tasks.length && !this.callGone) {
const taskNum = ++this.taskIdx;
const stackNum = this.stackIdx;
@@ -295,7 +306,7 @@ class CallSession extends Emitter {
this.logger.info(`CallSession:exec completed task #${stackNum}:${taskNum}: ${task.name}`);
} catch (err) {
this.currentTask = null;
if (err.message.includes(BADPRECONDITIONS)) {
if (err.message?.includes(BADPRECONDITIONS)) {
this.logger.info(`CallSession:exec task #${stackNum}:${taskNum}: ${task.name}: ${err.message}`);
}
else {
@@ -303,6 +314,16 @@ class CallSession extends Emitter {
break;
}
}
if (0 === this.tasks.length && this.hasStableDialog && this.requestor instanceof WsRequestor) {
try {
await this._awaitCommandsOrHangup();
if (!this.hasStableDialog || this.callGone) break;
} catch (err) {
this.logger.info(err, 'CallSession:exec - error waiting for new commands');
break;
}
}
}
// all done - cleanup
@@ -361,6 +382,10 @@ class CallSession extends Emitter {
this.currentTask.kill(this);
this.currentTask = null;
}
if (this.wakeupResolver) {
this.wakeupResolver();
this.wakeupResolver = null;
}
}
/**
@@ -397,29 +422,42 @@ class CallSession extends Emitter {
*/
async _lccCallHook(opts) {
const webhooks = [];
let sd;
if (opts.call_hook) webhooks.push(this.requestor.request(opts.call_hook, this.callInfo.toJSON()));
if (opts.child_call_hook) {
/* child call hook only allowed from a connected Dial state */
const task = this.currentTask;
sd = task.sd;
if (task && TaskName.Dial === task.name && sd) {
webhooks.push(this.requestor.request(opts.child_call_hook, sd.callInfo.toJSON()));
let sd, tasks, childTasks;
if (opts.call_hook || opts.child_call_hook) {
if (opts.call_hook) {
webhooks.push(this.requestor.request('session:redirect', opts.call_hook, this.callInfo.toJSON()));
}
if (opts.child_call_hook) {
/* child call hook only allowed from a connected Dial state */
const task = this.currentTask;
sd = task.sd;
if (task && TaskName.Dial === task.name && sd) {
webhooks.push(this.requestor.request('session:redirect', opts.child_call_hook, sd.callInfo.toJSON()));
}
}
const [tasks1, tasks2] = await Promise.all(webhooks);
if (opts.call_hook) {
tasks = tasks1;
if (opts.child_call_hook) childTasks = tasks2;
}
else childTasks = tasks1;
}
const [tasks1, tasks2] = await Promise.all(webhooks);
let tasks, childTasks;
if (opts.call_hook) {
tasks = tasks1;
if (opts.child_call_hook) childTasks = tasks2;
else if (opts.parent_call || opts.child_call) {
const {parent_call, child_call} = opts;
assert.ok(!parent_call || Array.isArray(parent_call), 'CallSession:_lccCallHook - parent_call must be an array');
assert.ok(!child_call || Array.isArray(child_call), 'CallSession:_lccCallHook - child_call must be an array');
tasks = parent_call;
childTasks = child_call;
}
else childTasks = tasks1;
if (childTasks) {
const {parentLogger} = this.srf.locals;
const childLogger = parentLogger.child({callId: this.callId, callSid: sd.callSid});
const t = normalizeJambones(childLogger, childTasks).map((tdata) => makeTask(childLogger, tdata));
childLogger.info({tasks: listTaskNames(t)}, 'CallSession:_lccCallHook new task list for child call');
// TODO: if using websockets api, we need a new websocket for the adulting session..
const cs = await sd.doAdulting({
logger: childLogger,
application: this.application,
@@ -597,6 +635,59 @@ class CallSession extends Emitter {
this.taskIdx = 0;
}
_onCommand({msgid, command, queueCommand, data}) {
this.logger.info({msgid, command, queueCommand, data}, 'CallSession:_onCommand - received command');
switch (command) {
case 'redirect':
if (Array.isArray(data)) {
const t = normalizeJambones(this.logger, data).map((tdata) => makeTask(this.logger, tdata));
if (!queueCommand) {
this.logger.info({tasks: listTaskNames(t)}, 'CallSession:_onCommand new task list');
this.replaceApplication(t);
}
else {
this.logger.info({t, tasks: this.tasks}, 'CallSession:_onCommand - about to queue tasks');
this.tasks.push(...t);
this.logger.debug({tasks: this.tasks}, 'CallSession:_onCommand - tasks have been queued');
}
}
else this._lccCallHook(data);
break;
case 'call:status':
this._lccCallStatus(data);
break;
case 'mute:status':
this._lccMuteStatus(data);
break;
case 'conf:mute-status':
this._lccConfMuteStatus(data);
break;
case 'conf:hold-status':
this._lccConfHoldStatus(data);
break;
case 'listen:status':
this._lccListenStatus(data);
break;
case 'whisper':
this._lccWhisper(data);
break;
default:
this.logger.info(`CallSession:_onCommand - invalid command ${command}`);
}
if (this.wakeupResolver) {
this.logger.info('CallSession:_onCommand - got commands, waking up..');
this.wakeupResolver();
this.wakeupResolver = null;
}
}
_evaluatePreconditions(task) {
switch (task.preconditions) {
case TaskPreconditions.None:
@@ -632,7 +723,7 @@ class CallSession extends Emitter {
try {
if (!this.ms) this.ms = this.getMS();
const ep = await this.ms.createEndpoint({remoteSdp: this.req.body});
ep.cs = this;
//ep.cs = this;
this.ep = ep;
ep.set({
hangup_after_bridge: false,
@@ -716,14 +807,12 @@ class CallSession extends Emitter {
/**
* Hang up the call and free the media endpoint
*/
async _clearResources() {
_clearResources() {
for (const resource of [this.dlg, this.ep]) {
try {
if (resource && resource.connected) await resource.destroy();
} catch (err) {
this.logger.error(err, 'CallSession:_clearResources error');
}
if (resource && resource.connected) resource.destroy();
}
this.dlg = null;
this.ep = null;
// remove any temporary tts files that were created (audio is still cached in redis)
for (const path of this.tmpFiles) {
@@ -735,6 +824,7 @@ class CallSession extends Emitter {
});
}
this.tmpFiles.clear();
this.requestor && this.requestor.close();
}
/**
@@ -782,9 +872,19 @@ class CallSession extends Emitter {
async _onReinvite(req, res) {
try {
const newSdp = await this.ep.modify(req.body);
res.send(200, {body: newSdp});
this.logger.info({offer: req.body, answer: newSdp}, 'handling reINVITE');
if (this.ep) {
const newSdp = await this.ep.modify(req.body);
res.send(200, {body: newSdp});
this.logger.info({offer: req.body, answer: newSdp}, 'handling reINVITE');
}
else if (this.currentTask && this.currentTask.name === TaskName.Dial) {
this.logger.info('handling reINVITE after media has been released');
await this.currentTask.handleReinviteAfterMediaReleased(req, res);
}
else {
this.logger.info('got reINVITE but no endpoint and media has not been released');
res.send(488);
}
} catch (err) {
this.logger.error(err, 'Error handling reinvite');
}
@@ -826,7 +926,7 @@ class CallSession extends Emitter {
}
else {
this.logger.info({accountSid: this.accountSid, webhook: r[0]}, 'performQueueWebhook: webhook found');
this.queueEventHookRequestor = new Requestor(this.logger, this.accountSid,
this.queueEventHookRequestor = new HttpRequestor(this.logger, this.accountSid,
r[0], this.webhook_secret);
this.queueEventHook = r[0];
}
@@ -840,7 +940,7 @@ class CallSession extends Emitter {
/* send webhook */
const params = {...obj, ...this.callInfo.toJSON()};
this.logger.info({accountSid: this.accountSid, params}, 'performQueueWebhook: sending webhook');
this.queueEventHookRequestor.request(this.queueEventHook, params)
this.queueEventHookRequestor.request('queue:status', this.queueEventHook, params)
.catch((err) => {
this.logger.info({err, accountSid: this.accountSid, obj}, 'Error sending queue notification event');
});
@@ -929,6 +1029,10 @@ class CallSession extends Emitter {
this.emit('callStatusChange', {callStatus: CallStatus.Completed, duration});
this.logger.debug('CallSession: call terminated by jambones');
origDestroy();
if (this.wakeupResolver) {
this.wakeupResolver();
this.wakeupResolver = null;
}
}
};
}
@@ -955,6 +1059,13 @@ class CallSession extends Emitter {
});
}
async handleReinviteAfterMediaReleased(req, res) {
assert(this.dlg && this.dlg.connected && !this.ep);
const sdp = await this.dlg.modify(req.body);
this.logger.info({sdp}, 'CallSession:handleReinviteAfterMediaReleased - reinvite to A leg returned sdp');
res.send(200, {body: sdp});
}
/**
* Called any time call status changes. This method both invokes the
* call_status_hook callback as well as updates the realtime database
@@ -967,6 +1078,12 @@ class CallSession extends Emitter {
_notifyCallStatusChange({callStatus, sipStatus, duration}) {
if (this.callMoved) return;
/* race condition: we hang up at the same time as the caller */
if (callStatus === CallStatus.Completed) {
if (this.notifiedComplete) return;
this.notifiedComplete = true;
}
assert((typeof duration === 'number' && callStatus === CallStatus.Completed) ||
(!duration && callStatus !== CallStatus.Completed),
'duration MUST be supplied when call completed AND ONLY when call completed');
@@ -974,7 +1091,7 @@ class CallSession extends Emitter {
this.callInfo.updateCallStatus(callStatus, sipStatus);
if (typeof duration === 'number') this.callInfo.duration = duration;
try {
this.notifier.request(this.call_status_hook, this.callInfo.toJSON());
this.notifier.request('call:status', this.call_status_hook, this.callInfo.toJSON());
} catch (err) {
this.logger.info(err, `CallSession:_notifyCallStatusChange error sending ${callStatus} ${sipStatus}`);
}
@@ -984,6 +1101,14 @@ class CallSession extends Emitter {
this.updateCallStatus(Object.assign({}, this.callInfo.toJSON()), this.serviceUrl)
.catch((err) => this.logger.error(err, 'redis error'));
}
_awaitCommandsOrHangup() {
assert(!this.wakeupResolver);
return new Promise((resolve, reject) => {
this.logger.info('_awaitCommandsOrHangup - waiting...');
this.wakeupResolver = resolve;
});
}
}
module.exports = CallSession;

View File

@@ -21,15 +21,17 @@ class InboundCallSession extends CallSession {
this.req = req;
this.res = res;
req.on('cancel', () => {
this._notifyCallStatusChange({callStatus: CallStatus.NoAnswer, sipStatus: 487});
this._callReleased();
});
req.once('cancel', this._onCancel.bind(this));
this.on('callStatusChange', this._notifyCallStatusChange.bind(this));
this._notifyCallStatusChange({callStatus: CallStatus.Trying, sipStatus: 100});
}
_onCancel() {
this._notifyCallStatusChange({callStatus: CallStatus.NoAnswer, sipStatus: 487});
this._callReleased();
}
_onTasksDone() {
if (!this.res.finalResponseSent) {
if (this._mediaServerFailure) {
@@ -45,6 +47,7 @@ class InboundCallSession extends CallSession {
this.res.send(603);
}
}
this.req.removeAllListeners('cancel');
}
/**
@@ -56,6 +59,7 @@ class InboundCallSession extends CallSession {
this.emit('callStatusChange', {callStatus: CallStatus.Completed, duration});
this.logger.debug('InboundCallSession: caller hung up');
this._callReleased();
this.req.removeAllListeners('cancel');
}
}

248
lib/tasks/cognigy.js Normal file
View File

@@ -0,0 +1,248 @@
const Task = require('./task');
const {TaskName, TaskPreconditions} = require('../utils/constants');
const makeTask = require('./make_task');
const { SocketClient } = require('@cognigy/socket-client');
const parseGallery = (obj = {}) => {
const {_default} = obj;
if (_default) {
const {_gallery} = _default;
if (_gallery) return _gallery.fallbackText;
}
};
const parseQuickReplies = (obj) => {
const {_default} = obj;
if (_default) {
const {_quickReplies} = _default;
if (_quickReplies) return _quickReplies.text || _quickReplies.fallbackText;
}
};
const parseBotText = (evt) => {
const {text, data} = evt;
if (text) return text;
switch (data?.type) {
case 'quickReplies':
return parseQuickReplies(data?._cognigy);
case 'gallery':
return parseGallery(data?._cognigy);
default:
break;
}
};
class Cognigy extends Task {
constructor(logger, opts) {
super(logger, opts);
this.preconditions = TaskPreconditions.Endpoint;
this.url = this.data.url;
this.token = this.data.token;
this.prompt = this.data.prompt;
this.eventHook = this.data?.eventHook;
this.actionHook = this.data?.actionHook;
this.data = this.data.data || {};
this.prompts = [];
}
get name() { return TaskName.Cognigy; }
get hasReportedFinalAction() {
return this.reportedFinalAction || this.isReplacingApplication;
}
async exec(cs, ep) {
await super.exec(cs);
this.ep = ep;
try {
/* set event handlers and start transcribing */
this.on('transcription', this._onTranscription.bind(this, cs, ep));
this.on('error', this._onError.bind(this, cs, ep));
this.transcribeTask = this._makeTranscribeTask();
this.transcribeTask.exec(cs, ep, this)
.catch((err) => {
this.logger.info({err}, 'Cognigy transcribe task returned error');
this.notifyTaskDone();
});
if (this.prompt) {
this.sayTask = this._makeSayTask(this.prompt);
this.sayTask.exec(cs, ep, this)
.catch((err) => {
this.logger.info({err}, 'Cognigy say task returned error');
this.notifyTaskDone();
});
}
/* connect to the bot and send initial data */
this.client = new SocketClient(
this.url,
this.token,
{
sessionId: cs.callSid,
channel: 'jambonz',
forceWebsockets: true,
reconnection: true,
settings: {
enableTypingIndicator: false
}
}
);
this.client.on('output', this._onBotUtterance.bind(this, cs, ep));
this.client.on('typingStatus', this._onBotTypingStatus.bind(this, cs, ep));
this.client.on('error', this._onBotError.bind(this, cs, ep));
this.client.on('finalPing', this._onBotFinalPing.bind(this, cs, ep));
await this.client.connect();
this.client.sendMessage('', {...this.data, ...cs.callInfo});
await this.awaitTaskDone();
} catch (err) {
this.logger.error({err}, 'Cognigy error');
throw err;
}
}
async kill(cs) {
super.kill(cs);
this.logger.debug('Cognigy:kill');
this.removeAllListeners();
this.transcribeTask && this.transcribeTask.kill();
this.client.removeAllListeners();
if (this.client && this.client.connected) this.client.disconnect();
if (!this.hasReportedFinalAction) {
this.reportedFinalAction = true;
this.performAction({cognigyResult: 'caller hungup'})
.catch((err) => this.logger.info({err}, 'cognigy - error w/ action webook'));
}
if (this.ep.connected) {
await this.ep.api('uuid_break', this.ep.uuid).catch((err) => this.logger.info(err, 'Error killing audio'));
}
this.notifyTaskDone();
}
_makeTranscribeTask() {
const opts = {
recognizer: this.data.recognizer || {
vendor: 'default',
language: 'default',
outputFormat: 'detailed'
}
};
this.logger.debug({opts}, 'constructing a nested transcribe object');
const transcribe = makeTask(this.logger, {transcribe: opts}, this);
return transcribe;
}
_makeSayTask(text) {
const opts = {
text,
synthesizer: this.data.synthesizer ||
{
vendor: 'default',
language: 'default',
voice: 'default'
}
};
this.logger.debug({opts}, 'constructing a nested say object');
const say = makeTask(this.logger, {say: opts}, this);
return say;
}
async _onBotError(cs, ep, evt) {
this.logger.info({evt}, 'Cognigy:_onBotError');
this.performAction({cognigyResult: 'botError', message: evt.message });
this.reportedFinalAction = true;
this.notifyTaskDone();
}
async _onBotTypingStatus(cs, ep, evt) {
this.logger.info({evt}, 'Cognigy:_onBotTypingStatus');
}
async _onBotFinalPing(cs, ep) {
this.logger.info('Cognigy:_onBotFinalPing');
if (this.prompts.length) {
const text = this.prompts.join('.');
this.prompts = [];
if (text && !this.killed) {
this.sayTask = this._makeSayTask(text);
this.sayTask.exec(cs, ep, this)
.catch((err) => {
this.logger.info({err}, 'Cognigy say task returned error');
this.notifyTaskDone();
});
}
}
}
async _onBotUtterance(cs, ep, evt) {
this.logger.debug({evt}, 'Cognigy:_onBotUtterance');
if (this.eventHook) {
this.performHook(cs, this.eventHook, {event: 'botMessage', message: evt})
.then((redirected) => {
if (redirected) {
this.logger.info('Cognigy_onTranscription: event handler for bot message redirected us to new webhook');
this.reportedFinalAction = true;
this.performAction({cognigyResult: 'redirect'}, false);
}
return;
})
.catch(({err}) => {
this.logger.info({err}, 'Cognigy_onTranscription: error sending event hook');
});
}
const text = parseBotText(evt);
this.prompts.push(text);
}
async _onTranscription(cs, ep, evt) {
this.logger.debug({evt}, `Cognigy: got transcription for callSid ${cs.callSid}`);
const utterance = evt.alternatives[0].transcript;
if (this.eventHook) {
this.performHook(cs, this.eventHook, {event: 'userMessage', message: utterance})
.then((redirected) => {
if (redirected) {
this.logger.info('Cognigy_onTranscription: event handler for user message redirected us to new webhook');
this.reportedFinalAction = true;
this.performAction({cognigyResult: 'redirect'}, false);
if (this.transcribeTask) this.transcribeTask.kill(cs);
}
return;
})
.catch(({err}) => {
this.logger.info({err}, 'Cognigy_onTranscription: error sending event hook');
});
}
/* send the user utterance to the bot */
try {
if (this.client && this.client.connected) {
this.client.sendMessage(utterance);
}
else {
this.logger.info('Cognigy_onTranscription - not sending user utterance as bot is disconnected');
}
} catch (err) {
this.logger.error({err}, 'Cognigy_onTranscription: Error sending user utterance to Cognigy - ending task');
this.performAction({cognigyResult: 'socketError'});
this.reportedFinalAction = true;
this.notifyTaskDone();
}
}
_onError(cs, ep, err) {
this.logger.debug({err}, 'Cognigy: got error');
if (!this.hasReportedFinalAction) this.performAction({cognigyResult: 'error', err});
this.reportedFinalAction = true;
this.notifyTaskDone();
}
}
module.exports = Cognigy;

View File

@@ -453,7 +453,7 @@ class Conference extends Task {
this._playSession = null;
break;
}
} while (!this.killed && !this.conf_hold_status === 'hold');
} while (!this.killed && this.conf_hold_status !== 'hold');
}
/**
@@ -529,7 +529,7 @@ class Conference extends Task {
async _playHook(cs, dlg, hook, allowed = [TaskName.Play, TaskName.Say, TaskName.Pause]) {
assert(!this._playSession);
const json = await cs.application.requestor.request(hook, cs.callInfo);
const json = await cs.application.requestor.request('verb:hook', hook, cs.callInfo);
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
const allowedTasks = tasks.filter((t) => allowed.includes(t.name));
@@ -586,7 +586,7 @@ class Conference extends Task {
params.duration = (Date.now() - this.conferenceStartTime.getTime()) / 1000;
if (!params.time) params.time = (new Date()).toISOString();
if (!params.members && typeof this.participantCount === 'number') params.members = this.participantCount;
cs.application.requestor.request(this.statusHook, Object.assign(params, this.statusParams))
cs.application.requestor.request('verb:hook', this.statusHook, Object.assign(params, this.statusParams))
.catch((err) => this.logger.info(err, 'Conference:notifyConferenceEvent - error'));
}
}

View File

@@ -147,7 +147,7 @@ class TaskDial extends Task {
this.epOther.play(this.dialMusic).catch((err) => {});
}
}
await this._attemptCalls(cs);
if (!this.killed) await this._attemptCalls(cs);
await this.awaitTaskDone();
this.logger.debug({callSid: this.cs.callSid}, 'Dial:exec task is done, sending actionHook if any');
await this.performAction(this.results, this.killReason !== KillReason.Replaced);
@@ -161,16 +161,25 @@ class TaskDial extends Task {
async kill(cs, reason) {
super.kill(cs);
if (this.dialMusic && this.epOther) {
this.epOther.api('uuid_break', this.epOther.uuid)
.catch((err) => this.logger.info(err, 'Error killing dialMusic'));
}
this.killReason = reason || KillReason.Hangup;
if (this.timerMaxCallDuration) {
clearTimeout(this.timerMaxCallDuration);
this.timerMaxCallDuration = null;
}
if (this.timerRing) {
clearTimeout(this.timerRing);
this.timerRing = null;
}
this._removeDtmfDetection(cs.dlg);
this._removeDtmfDetection(this.dlg);
this._killOutdials();
if (this.sd) {
this.sd.kill();
this.sd.removeAllListeners();
this.sd = null;
}
if (this.callSid) sessionTracker.remove(this.callSid);
@@ -231,10 +240,19 @@ class TaskDial extends Task {
}
}
_removeHandlers(sd) {
sd.removeAllListeners('accept');
sd.removeAllListeners('decline');
sd.removeAllListeners('adulting');
sd.removeAllListeners('callStatusChange');
sd.removeAllListeners('callCreateFail');
}
_killOutdials() {
for (const [callSid, sd] of Array.from(this.dials)) {
this.logger.debug(`Dial:_killOutdials killing callSid ${callSid}`);
sd.kill().catch((err) => this.logger.info(err, `Dial:_killOutdials Error killing ${callSid}`));
this._removeHandlers(sd);
}
this.dials.clear();
}
@@ -270,7 +288,7 @@ class TaskDial extends Task {
const match = dtmfDetector.keyPress(key);
if (match) {
this.logger.info({callSid}, `Dial:_onInfo triggered dtmf match: ${match}`);
requestor.request(this.dtmfHook, {dtmf: match, ...callInfo.toJSON()})
requestor.request('verb:hook', this.dtmfHook, {dtmf: match, ...callInfo.toJSON()})
.catch((err) => this.logger.info(err, 'Dial:_onDtmf - error'));
}
}
@@ -319,8 +337,9 @@ class TaskDial extends Task {
}
const ms = await cs.getMS();
const timerRing = setTimeout(() => {
this.timerRing = setTimeout(() => {
this.logger.info(`Dial:_attemptCall: ring no answer timer ${this.timeout}s exceeded`);
this.timerRing = null;
this._killOutdials();
}, this.timeout * 1000);
@@ -348,6 +367,9 @@ class TaskDial extends Task {
opts.headers['X-Requested-Carrier-Sid'] = voip_carrier_sid;
}
}
if (this.killed) return;
const sd = placeCall({
logger: this.logger,
application: cs.application,
@@ -363,7 +385,9 @@ class TaskDial extends Task {
sd
.on('callCreateFail', () => {
clearTimeout(this.timerRing);
this.dials.delete(sd.callSid);
sd.removeAllListeners();
if (this.dials.size === 0 && !this.sd) {
this.logger.debug('Dial:_attemptCalls - all calls failed after call create err, ending task');
this.kill(cs);
@@ -387,7 +411,8 @@ class TaskDial extends Task {
break;
case CallStatus.InProgress:
this.logger.debug('Dial:_attemptCall -- call was answered');
clearTimeout(timerRing);
clearTimeout(this.timerRing);
this.timerRing = null;
break;
case CallStatus.Failed:
case CallStatus.Busy:
@@ -395,7 +420,8 @@ class TaskDial extends Task {
this.dials.delete(sd.callSid);
if (this.dials.size === 0 && !this.sd) {
this.logger.debug('Dial:_attemptCalls - all calls failed after call failure, ending task');
clearTimeout(timerRing);
clearTimeout(this.timerRing);
this.timerRing = null;
this.kill(cs);
}
break;
@@ -403,6 +429,7 @@ class TaskDial extends Task {
})
.on('accept', async() => {
this.logger.debug(`Dial:_attemptCalls - we have a winner: ${sd.callSid}`);
clearTimeout(this.timerRing);
try {
await this._connectSingleDial(cs, sd);
} catch (err) {
@@ -411,12 +438,21 @@ class TaskDial extends Task {
})
.on('decline', () => {
this.logger.debug(`Dial:_attemptCalls - declined: ${sd.callSid}`);
clearTimeout(this.timerRing);
this.dials.delete(sd.callSid);
sd.removeAllListeners();
if (this.dials.size === 0 && !this.sd) {
this.logger.debug('Dial:_attemptCalls - all calls failed after decline, ending task');
this.kill(cs);
}
})
.on('reinvite', (req, res) => {
try {
cs.handleReinviteAfterMediaReleased(req, res);
} catch (err) {
this.logger.error(err, 'Error in dial einvite from B leg');
}
})
.once('adulting', () => {
/* child call just adulted and got its own session */
this.logger.info('Dial:on_adulting: detaching child call leg');
@@ -448,6 +484,12 @@ class TaskDial extends Task {
this._killOutdials(); // NB: order is important
}
_onMaxCallDuration(cs) {
this.logger.info(`Dial:_onMaxCallDuration tearing down call as it has reached ${this.timeLimit}s`);
this.ep && this.ep.unbridge();
this.kill(cs);
}
/**
* We now have a call leg produced by the Dial action, so
* - hangup any simrings in progress
@@ -468,11 +510,7 @@ class TaskDial extends Task {
await cs.propagateAnswer();
}
if (this.timeLimit) {
this.timerMaxCallDuration = setTimeout(() => {
this.logger.info(`Dial:_selectSingleDial tearing down call as it has reached ${this.timeLimit}s`);
this.ep && this.ep.unbridge();
this.kill(cs);
}, this.timeLimit * 1000);
this.timerMaxCallDuration = setTimeout(this._onMaxCallDuration.bind(this, cs), this.timeLimit * 1000);
}
sessionTracker.add(this.callSid, cs);
this.dlg.on('destroy', () => {
@@ -523,10 +561,9 @@ class TaskDial extends Task {
assert(cs.ep && sd.ep);
try {
this.logger.info('Dial:_releaseMedia - releasing media from freewitch');
const aLegSdp = cs.ep.remote.sdp;
const bLegSdp = sd.ep.remote.sdp;
await Promise.all[sd.releaseMediaToSBC(aLegSdp), cs.releaseMediaToSBC(bLegSdp)];
const bLegSdp = sd.dlg.remote.sdp;
await Promise.all[sd.releaseMediaToSBC(aLegSdp, cs.ep.local.sdp), cs.releaseMediaToSBC(bLegSdp)];
this.epOther = null;
this.logger.info('Dial:_releaseMedia - successfully released media from freewitch');
} catch (err) {
@@ -541,6 +578,12 @@ class TaskDial extends Task {
await Promise.all([sd.reAnchorMedia(), cs.reAnchorMedia()]);
this.epOther = cs.ep;
}
async handleReinviteAfterMediaReleased(req, res) {
const sdp = await this.dlg.modify(req.body);
this.logger.info({sdp}, 'Dial:handleReinviteAfterMediaReleased - sent reinvite to B leg');
res.send(200, {body: sdp});
}
}
module.exports = TaskDial;

View File

@@ -452,8 +452,8 @@ class Dialogflow extends Task {
this.noinputTimer = setTimeout(this._onNoInput.bind(this, ep, cs), this.noInputTimeout);
}
async _performHook(cs, hook, results) {
const json = await this.cs.requestor.request(hook, results);
async _performHook(cs, hook, results = {}) {
const json = await this.cs.requestor.request('verb:hook', hook, {...results, ...cs.callInfo.toJSON()});
if (json && Array.isArray(json)) {
const makeTask = require('../make_task');
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));

View File

@@ -317,7 +317,7 @@ class TaskEnqueue extends Task {
} catch (err) {
this.logger.error({err}, `TaskEnqueue:_playHook error retrieving list info for queue ${this.queueName}`);
}
const json = await cs.application.requestor.request(hook, params);
const json = await cs.application.requestor.request('verb:hook', hook, params);
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
const allowedTasks = tasks.filter((t) => allowed.includes(t.name));
@@ -337,7 +337,7 @@ class TaskEnqueue extends Task {
}
tasksToRun.push(o);
}
const cloneTasks = [...tasksToRun];
if (this.killed) return [];
else if (tasksToRun.length > 0) {
this._playSession = new ConfirmCallSession({
@@ -356,7 +356,7 @@ class TaskEnqueue extends Task {
this.state = QueueResults.Leave;
this.kill(cs);
}
return tasksToRun;
return cloneTasks;
}
}

View File

@@ -9,6 +9,7 @@ const {
const makeTask = require('./make_task');
const assert = require('assert');
const GATHER_STABILITY_THRESHOLD = Number(process.env.JAMBONZ_GATHER_STABILITY_THRESHOLD || 0.7);
class TaskGather extends Task {
constructor(logger, opts, parentTask) {
@@ -16,12 +17,15 @@ class TaskGather extends Task {
this.preconditions = TaskPreconditions.Endpoint;
[
'finishOnKey', 'hints', 'input', 'numDigits',
'partialResultHook',
'finishOnKey', 'hints', 'input', 'numDigits', 'minDigits', 'maxDigits',
'interDigitTimeout', 'partialResultHook', 'bargein', 'dtmfBargein',
'speechTimeout', 'timeout', 'say', 'play'
].forEach((k) => this[k] = this.data[k]);
this.timeout = (this.timeout || 5) * 1000;
/* when collecting dtmf, bargein on dtmf is true unless explicitly set to false */
if (this.dtmfBargein !== false && this.input.includes('digits')) this.dtmfBargein = true;
this.timeout = (this.timeout || 15) * 1000;
this.interim = this.partialResultCallback;
if (this.data.recognizer) {
const recognizer = this.data.recognizer;
@@ -30,6 +34,10 @@ class TaskGather extends Task {
this.hints = recognizer.hints || [];
this.altLanguages = recognizer.altLanguages || [];
/* vad: if provided, we dont connect to recognizer until voice activity is detected */
const {enable, voiceMs = 0, mode = -1} = recognizer.vad || {};
this.vad = {enable, voiceMs, mode};
/* aws options */
this.vocabularyName = recognizer.vocabularyName;
this.vocabularyFilterName = recognizer.vocabularyFilterName;
@@ -40,6 +48,10 @@ class TaskGather extends Task {
this.profanityOption = recognizer.profanityOption || 'raw';
this.requestSnr = recognizer.requestSnr || false;
this.initialSpeechTimeoutMs = recognizer.initialSpeechTimeoutMs || 0;
/* barge in configuration */
this.listenDuringPrompt = this.data.listenDuringPrompt === false ? false : true;
this.minBargeinWordCount = this.data.minBargeinWordCount || 1;
}
this.digitBuffer = '';
@@ -48,6 +60,12 @@ class TaskGather extends Task {
if (this.say) this.sayTask = makeTask(this.logger, {say: this.say}, this);
if (this.play) this.playTask = makeTask(this.logger, {play: this.play}, this);
if (this.sayTask || this.playTask) {
// this is specially for barge in where we want to make a bargebale promt
// to a user without listening after the say task has finished
this.listenAfterSpeech = typeof this.data.listenAfterSpeech === 'boolean' ? this.data.listenAfterSpeech : true;
}
this.parentTask = parentTask;
}
@@ -80,29 +98,56 @@ class TaskGather extends Task {
throw new Error(`no speech-to-text service credentials for ${this.vendor} have been configured`);
}
const startListening = (cs, ep) => {
this._startTimer();
if (this.input.includes('speech') && !this.listenDuringPrompt) {
this._initSpeech(cs, ep)
.then(() => {
this._startTranscribing(ep);
return updateSpeechCredentialLastUsed(this.sttCredentials.speech_credential_sid);
})
.catch(() => {});
}
};
try {
if (this.sayTask) {
this.sayTask.exec(cs, ep); // kicked off, _not_ waiting for it to complete
this.sayTask.on('playDone', (err) => {
if (!this.killed) this._startTimer();
if (err) return this.logger.error({err}, 'Gather:exec Error playing tts');
this.logger.info('Gather: say task completed');
if (!this.killed) {
if (this.listenAfterSpeech === true) {
startListening(cs, ep);
} else {
this.notifyTaskDone();
}
}
});
}
else if (this.playTask) {
this.playTask.exec(cs, ep); // kicked off, _not_ waiting for it to complete
this.playTask.on('playDone', (err) => {
if (!this.killed) this._startTimer();
if (err) return this.logger.error({err}, 'Gather:exec Error playing url');
if (!this.killed) {
if (this.listenAfterSpeech === true) {
startListening(cs, ep);
} else {
this.notifyTaskDone();
}
}
});
}
else this._startTimer();
else startListening(cs, ep);
if (this.input.includes('speech')) {
if (this.input.includes('speech') && this.listenDuringPrompt) {
await this._initSpeech(cs, ep);
this._startTranscribing(ep);
updateSpeechCredentialLastUsed(this.sttCredentials.speech_credential_sid)
.catch(() => {/*already logged error */});
}
if (this.input.includes('digits')) {
if (this.input.includes('digits') || this.dtmfBargein) {
ep.on('dtmf', this._onDtmf.bind(this, cs, ep));
}
@@ -121,83 +166,111 @@ class TaskGather extends Task {
super.kill(cs);
this._killAudio(cs);
this.ep.removeAllListeners('dtmf');
clearTimeout(this.interDigitTimer);
this._resolve('killed');
}
_onDtmf(cs, ep, evt) {
this.logger.debug(evt, 'TaskGather:_onDtmf');
if (evt.dtmf === this.finishOnKey) this._resolve('dtmf-terminator-key');
clearTimeout(this.interDigitTimer);
let resolved = false;
if (this.dtmfBargein) this._killAudio(cs);
if (evt.dtmf === this.finishOnKey && this.input.includes('digits')) {
resolved = true;
this._resolve('dtmf-terminator-key');
}
else {
this.digitBuffer += evt.dtmf;
if (this.digitBuffer.length === this.numDigits) this._resolve('dtmf-num-digits');
const len = this.digitBuffer.length;
if (len === this.numDigits || len === this.maxDigits) {
resolved = true;
this._resolve('dtmf-num-digits');
}
}
if (!resolved && this.interDigitTimeout > 0 && this.digitBuffer.length >= this.minDigits) {
/* start interDigitTimer */
const ms = this.interDigitTimeout * 1000;
this.logger.debug(`starting interdigit timer of ${ms}`);
this.interDigitTimer = setTimeout(() => this._resolve('dtmf-interdigit-timeout'), ms);
}
this._killAudio(cs);
}
async _initSpeech(cs, ep) {
const opts = {};
try {
const opts = {};
if ('google' === this.vendor) {
if (this.sttCredentials) opts.GOOGLE_APPLICATION_CREDENTIALS = JSON.stringify(this.sttCredentials.credentials);
Object.assign(opts, {
GOOGLE_SPEECH_USE_ENHANCED: true,
GOOGLE_SPEECH_SINGLE_UTTERANCE: true,
GOOGLE_SPEECH_MODEL: 'command_and_search'
});
if (this.hints && this.hints.length > 1) {
opts.GOOGLE_SPEECH_HINTS = this.hints.map((h) => h.trim()).join(',');
if (this.vad.enable) {
opts.START_RECOGNIZING_ON_VAD = 1;
if (this.vad.voiceMs) opts.RECOGNIZER_VAD_VOICE_MS = this.vad.voiceMs;
if (this.vad.mode >= 0 && this.vad.mode <= 3) opts.RECOGNIZER_VAD_MODE = this.vad.mode;
}
if (this.altLanguages && this.altLanguages.length > 1) {
opts.GOOGLE_SPEECH_ALTERNATIVE_LANGUAGE_CODES = this.altLanguages.join(',');
}
if (this.profanityFilter === true) {
Object.assign(opts, {'GOOGLE_SPEECH_PROFANITY_FILTER': true});
}
ep.addCustomEventListener(GoogleTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(GoogleTranscriptionEvents.EndOfUtterance, this._onEndOfUtterance.bind(this, cs, ep));
}
else if (['aws', 'polly'].includes(this.vendor)) {
if (this.vocabularyName) opts.AWS_VOCABULARY_NAME = this.vocabularyName;
if (this.vocabularyFilterName) {
opts.AWS_VOCABULARY_NAME = this.vocabularyFilterName;
opts.AWS_VOCABULARY_FILTER_METHOD = this.filterMethod || 'mask';
}
if (this.sttCredentials) {
Object.assign(opts, {
AWS_ACCESS_KEY_ID: this.sttCredentials.accessKeyId,
AWS_SECRET_ACCESS_KEY: this.sttCredentials.secretAccessKey,
AWS_REGION: this.sttCredentials.region
});
}
ep.addCustomEventListener(AwsTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
}
else if ('microsoft' === this.vendor) {
if (this.sttCredentials) {
Object.assign(opts, {
'AZURE_SUBSCRIPTION_KEY': this.sttCredentials.api_key,
'AZURE_REGION': this.sttCredentials.region
});
}
if (this.hints && this.hints.length > 1) {
opts.AZURE_SPEECH_HINTS = this.hints.map((h) => h.trim()).join(',');
}
//if (this.requestSnr) opts.AZURE_REQUEST_SNR = 1;
//if (this.profanityOption !== 'raw') opts.AZURE_PROFANITY_OPTION = this.profanityOption;
if (this.initialSpeechTimeoutMs > 0) opts.AZURE_INITIAL_SPEECH_TIMEOUT_MS = this.initialSpeechTimeoutMs;
opts.AZURE_USE_OUTPUT_FORMAT_DETAILED = 1;
ep.addCustomEventListener(AzureTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(AzureTranscriptionEvents.NoSpeechDetected, this._onNoSpeechDetected.bind(this, cs, ep));
if ('google' === this.vendor) {
if (this.sttCredentials) opts.GOOGLE_APPLICATION_CREDENTIALS = JSON.stringify(this.sttCredentials.credentials);
Object.assign(opts, {
GOOGLE_SPEECH_USE_ENHANCED: true,
GOOGLE_SPEECH_SINGLE_UTTERANCE: true,
GOOGLE_SPEECH_MODEL: 'command_and_search'
});
if (this.hints && this.hints.length > 1) {
opts.GOOGLE_SPEECH_HINTS = this.hints.map((h) => h.trim()).join(',');
}
if (this.altLanguages && this.altLanguages.length > 1) {
opts.GOOGLE_SPEECH_ALTERNATIVE_LANGUAGE_CODES = this.altLanguages.join(',');
}
if (this.profanityFilter === true) {
Object.assign(opts, {'GOOGLE_SPEECH_PROFANITY_FILTER': true});
}
ep.addCustomEventListener(GoogleTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(GoogleTranscriptionEvents.EndOfUtterance, this._onEndOfUtterance.bind(this, cs, ep));
}
else if (['aws', 'polly'].includes(this.vendor)) {
if (this.vocabularyName) opts.AWS_VOCABULARY_NAME = this.vocabularyName;
if (this.vocabularyFilterName) {
opts.AWS_VOCABULARY_NAME = this.vocabularyFilterName;
opts.AWS_VOCABULARY_FILTER_METHOD = this.filterMethod || 'mask';
}
if (this.sttCredentials) {
Object.assign(opts, {
AWS_ACCESS_KEY_ID: this.sttCredentials.accessKeyId,
AWS_SECRET_ACCESS_KEY: this.sttCredentials.secretAccessKey,
AWS_REGION: this.sttCredentials.region
});
}
ep.addCustomEventListener(AwsTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
}
else if ('microsoft' === this.vendor) {
if (this.sttCredentials) {
Object.assign(opts, {
'AZURE_SUBSCRIPTION_KEY': this.sttCredentials.api_key,
'AZURE_REGION': this.sttCredentials.region
});
}
if (this.hints && this.hints.length > 1) {
opts.AZURE_SPEECH_HINTS = this.hints.map((h) => h.trim()).join(',');
}
//if (this.requestSnr) opts.AZURE_REQUEST_SNR = 1;
//if (this.profanityOption !== 'raw') opts.AZURE_PROFANITY_OPTION = this.profanityOption;
if (this.initialSpeechTimeoutMs > 0) opts.AZURE_INITIAL_SPEECH_TIMEOUT_MS = this.initialSpeechTimeoutMs;
opts.AZURE_USE_OUTPUT_FORMAT_DETAILED = 1;
ep.addCustomEventListener(AzureTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(AzureTranscriptionEvents.NoSpeechDetected,
this._onNoSpeechDetected.bind(this, cs, ep));
}
await ep.set(opts)
.catch((err) => this.logger.error(err, 'Error setting channel variables'));
} catch (err) {
this.logger.error(err, 'could not init speech for listening');
}
await ep.set(opts)
.catch((err) => this.logger.info(err, 'Error setting channel variables'));
}
_startTranscribing(ep) {
ep.startTranscription({
vendor: this.vendor,
locale: this.language,
interim: this.partialResultCallback ? true : false,
interim: this.partialResultCallback || this.bargein,
}).catch((err) => {
const {writeAlerts, AlertType} = this.cs.srf.locals;
this.logger.error(err, 'TaskGather:_startTranscribing error');
@@ -239,25 +312,50 @@ class TaskGather extends Task {
_onTranscription(cs, ep, evt) {
if ('aws' === this.vendor && Array.isArray(evt) && evt.length > 0) evt = evt[0];
if ('microsoft' === this.vendor) {
const nbest = evt.NBest;
const newEvent = {
is_final: evt.RecognitionStatus === 'Success',
alternatives: [
{
confidence: nbest[0].Confidence,
transcript: nbest[0].Display
}
]
};
evt = newEvent;
const final = evt.RecognitionStatus === 'Success';
if (final) {
const nbest = evt.NBest;
evt = {
is_final: true,
alternatives: [
{
confidence: nbest[0].Confidence,
transcript: nbest[0].Display
}
]
};
}
else {
evt = {
is_final: false,
alternatives: [
{
transcript: evt.Text
}
]
};
}
}
this.logger.debug(evt, 'TaskGather:_onTranscription');
if (evt.is_final) this._resolve('speech', evt);
else if (this.partialResultHook) {
this.cs.requestor.request(this.partialResultHook, Object.assign({speech: evt}, this.cs.callInfo))
.catch((err) => this.logger.info(err, 'GatherTask:_onTranscription error'));
else {
/* google has a measure of stability:
https://cloud.google.com/speech-to-text/docs/basics#streaming_responses
others do not.
*/
const isStableEnough = typeof evt.stability === 'undefined' || evt.stability > GATHER_STABILITY_THRESHOLD;
if (this.bargein && isStableEnough &&
evt.alternatives[0].transcript.split(' ').length >= this.minBargeinWordCount) {
this.logger.debug('Gather:_onTranscription - killing audio due to speech bargein');
this._killAudio(cs);
}
if (this.partialResultHook) {
this.cs.requestor.request(this.partialResultHook, Object.assign({speech: evt}, this.cs.callInfo))
.catch((err) => this.logger.info(err, 'GatherTask:_onTranscription error'));
}
}
}
_onEndOfUtterance(cs, ep) {
this.logger.info('TaskGather:_onEndOfUtterance');
if (!this.resolved && !this.killed) {
@@ -273,6 +371,7 @@ class TaskGather extends Task {
if (this.resolved) return;
this.resolved = true;
this.logger.debug(`TaskGather:resolve with reason ${reason}`);
clearTimeout(this.interDigitTimer);
if (this.ep && this.ep.connected) {
this.ep.stopTranscription({vendor: this.vendor})

View File

@@ -289,7 +289,7 @@ class Lex extends Task {
}
async _performHook(cs, hook, results) {
const json = await this.cs.requestor.request(hook, results);
const json = await this.cs.requestor.request('verb:hook', hook, results);
if (json && Array.isArray(json)) {
const makeTask = require('./make_task');
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));

View File

@@ -64,6 +64,7 @@ class TaskListen extends Task {
this.results.dialCallDuration = duration;
}
if (this.transcribeTask) await this.transcribeTask.kill(cs);
this.ep && this._removeListeners(this.ep);
this.notifyTaskDone();
}
@@ -136,6 +137,10 @@ class TaskListen extends Task {
if (this.finishOnKey || this.passDtmf) {
ep.removeListener('dtmf', this._dtmfHandler);
}
ep.removeCustomEventListener(ListenEvents.PlayAudio);
ep.removeCustomEventListener(ListenEvents.KillAudio);
ep.removeCustomEventListener(ListenEvents.Disconnect);
}
_onDtmf(evt) {

View File

@@ -20,6 +20,9 @@ function makeTask(logger, obj, parent) {
case TaskName.SipRefer:
const TaskSipRefer = require('./sip_refer');
return new TaskSipRefer(logger, data, parent);
case TaskName.Cognigy:
const TaskCognigy = require('./cognigy');
return new TaskCognigy(logger, data, parent);
case TaskName.Conference:
const TaskConference = require('./conference');
return new TaskConference(logger, data, parent);

View File

@@ -42,7 +42,7 @@ class TaskMessage extends Task {
}
if (gw) {
this.logger.info({gw, accountSid}, 'Message:exec - using smpp to send message');
url = getSmpp();
url = process.env.K8S ? 'http://smpp' : getSmpp();
relativeUrl = '/sms';
payload = {
...payload,

View File

@@ -48,7 +48,7 @@ class TaskRestDial extends Task {
cs.setDialog(dlg);
try {
const tasks = await cs.requestor.request(this.call_hook, cs.callInfo);
const tasks = await cs.requestor.request('verb:hook', this.call_hook, cs.callInfo);
if (tasks && Array.isArray(tasks)) {
this.logger.debug({tasks: tasks}, `TaskRestDial: replacing application with ${tasks.length} tasks`);
cs.replaceApplication(normalizeJambones(this.logger, tasks).map((tdata) => makeTask(this.logger, tdata)));

View File

@@ -21,15 +21,20 @@ class TaskSay extends Task {
const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, srf);
const {writeAlerts, AlertType, stats} = srf.locals;
const {synthAudio} = srf.locals.dbHelpers;
const hasVerbLevelTts = this.synthesizer.vendor && this.synthesizer.vendor !== 'default';
const vendor = hasVerbLevelTts ? this.synthesizer.vendor : cs.speechSynthesisVendor ;
const language = hasVerbLevelTts ? this.synthesizer.language : cs.speechSynthesisLanguage ;
const voice = hasVerbLevelTts ? this.synthesizer.voice : cs.speechSynthesisVoice ;
const vendor = this.synthesizer.vendor && this.synthesizer.vendor !== 'default' ?
this.synthesizer.vendor :
cs.speechSynthesisVendor;
const language = this.synthesizer.language && this.synthesizer.language !== 'default' ?
this.synthesizer.language :
cs.speechSynthesisLanguage ;
const voice = this.synthesizer.voice && this.synthesizer.voice !== 'default' ?
this.synthesizer.voice :
cs.speechSynthesisVoice;
const engine = this.synthesizer.engine || 'standard';
const salt = cs.callSid;
const credentials = cs.getSpeechCredentials(vendor, 'tts');
this.logger.info({language, voice}, `Task:say - using vendor: ${vendor}`);
this.logger.info({vendor, language, voice}, 'TaskSay:exec');
this.ep = ep;
try {
if (!credentials) {
@@ -43,11 +48,13 @@ class TaskSay extends Task {
// synthesize all of the text elements
let lastUpdated = false;
const filepath = (await Promise.all(this.text.map(async(text) => {
if (text.startsWith('silence_stream://')) return text;
const {filePath, servedFromCache} = await synthAudio(stats, {
text,
vendor,
language,
voice,
engine,
salt,
credentials
}).catch((err) => {
@@ -78,7 +85,11 @@ class TaskSay extends Task {
const {memberId, confName, confUuid} = cs;
await this.playToConfMember(this.ep, memberId, confName, confUuid, filepath[segment]);
}
else await ep.play(filepath[segment]);
else {
this.logger.debug(`Say:exec sending command to play file ${filepath[segment]}`);
await ep.play(filepath[segment]);
this.logger.debug(`Say:exec completed play file ${filepath[segment]}`);
}
} while (!this.killed && ++segment < filepath.length);
}
} catch (err) {

View File

@@ -65,7 +65,7 @@ class TaskSipRefer extends Task {
const status = arr[1];
this.logger.debug(`TaskSipRefer:_handleNotify: call got status ${status}`);
if (this.eventHook) {
await cs.requestor.request(this.eventHook, {event: 'transfer-status', call_status: status});
await cs.requestor.request('verb:hook', this.eventHook, {event: 'transfer-status', call_status: status});
}
if (status >= 200) {
await this.performAction({refer_status: 202, final_referred_call_status: status});

View File

@@ -21,6 +21,22 @@
"referTo"
]
},
"cognigy": {
"properties": {
"url": "string",
"token": "string",
"recognizer": "#recognizer",
"tts": "#synthesizer",
"prompt": "string",
"actionHook": "object|string",
"eventHook": "object|string",
"data": "object"
},
"required": [
"url",
"token"
]
},
"dequeue": {
"properties": {
"name": "string",
@@ -82,8 +98,16 @@
"finishOnKey": "string",
"input": "array",
"numDigits": "number",
"minDigits": "number",
"maxDigits": "number",
"interDigitTimeout": "number",
"partialResultHook": "object|string",
"speechTimeout": "number",
"listenDuringPrompt": "boolean",
"listenAfterSpeech": "boolean",
"dtmfBargein": "boolean",
"bargein": "boolean",
"minBargeinWordCount": "number",
"timeout": "number",
"recognizer": "#recognizer",
"play": "#play",
@@ -308,7 +332,6 @@
"earlyMedia": "boolean"
},
"required": [
"transcriptionHook",
"recognizer"
]
},
@@ -323,13 +346,15 @@
"type": "string",
"enum": ["GET", "POST"]
},
"headers": "object",
"name": "string",
"number": "string",
"sipUri": "string",
"auth": "#auth",
"vmail": "boolean",
"tenant": "string",
"trunk": "string"
"trunk": "string",
"overrideTo": "string"
},
"required": [
"type"
@@ -353,6 +378,10 @@
},
"language": "string",
"voice": "string",
"engine": {
"type": "string",
"enum": ["standard", "neural"]
},
"gender": {
"type": "string",
"enum": ["MALE", "FEMALE", "NEUTRAL"]
@@ -369,6 +398,7 @@
"enum": ["google", "aws", "microsoft", "default"]
},
"language": "string",
"vad": "#vad",
"hints": "array",
"altLanguages": "array",
"profanityFilter": "boolean",
@@ -437,5 +467,15 @@
"required": [
"name"
]
},
"vad": {
"properties": {
"enable": "boolean",
"voiceMs": "number",
"mode": "number"
},
"required": [
"enable"
]
}
}

View File

@@ -62,7 +62,9 @@ class Task extends Emitter {
kill(cs) {
if (this.cs && !this.cs.isConfirmCallSession) this.logger.debug(`${this.name} is being killed`);
this._killInProgress = true;
// no-op
/* remove reference to parent task or else entangled parent-child tasks will not be gc'ed */
setImmediate(() => this.parentTask = null);
}
/**
@@ -105,7 +107,7 @@ class Task extends Emitter {
async performAction(results, expectResponse = true) {
if (this.actionHook) {
const params = results ? Object.assign(results, this.cs.callInfo.toJSON()) : this.cs.callInfo.toJSON();
const json = await this.cs.requestor.request(this.actionHook, params);
const json = await this.cs.requestor.request('verb:hook', this.actionHook, params);
if (expectResponse && json && Array.isArray(json)) {
const makeTask = require('./make_task');
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
@@ -118,7 +120,7 @@ class Task extends Emitter {
}
async performHook(cs, hook, results) {
const json = await cs.requestor.request(hook, results);
const json = await cs.requestor.request('verb:hook', hook, results);
if (json && Array.isArray(json)) {
const makeTask = require('./make_task');
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));

View File

@@ -11,6 +11,7 @@ class TaskTranscribe extends Task {
constructor(logger, opts, parentTask) {
super(logger, opts);
this.preconditions = TaskPreconditions.Endpoint;
this.parentTask = parentTask;
this.transcriptionHook = this.data.transcriptionHook;
this.earlyMedia = this.data.earlyMedia === true || (parentTask && parentTask.earlyMedia);
@@ -21,6 +22,10 @@ class TaskTranscribe extends Task {
this.interim = !!recognizer.interim;
this.separateRecognitionPerChannel = recognizer.separateRecognitionPerChannel;
/* vad: if provided, we dont connect to recognizer until voice activity is detected */
const {enable, voiceMs = 0, mode = -1} = recognizer.vad || {};
this.vad = {enable, voiceMs, mode};
/* google-specific options */
this.hints = recognizer.hints || [];
this.profanityFilter = recognizer.profanityFilter;
@@ -76,6 +81,7 @@ class TaskTranscribe extends Task {
await this.awaitTaskDone();
} catch (err) {
this.logger.info(err, 'TaskTranscribe:exec - error');
this.parentTask && this.parentTask.emit('error', err);
}
ep.removeCustomEventListener(GoogleTranscriptionEvents.Transcription);
ep.removeCustomEventListener(GoogleTranscriptionEvents.NoAudioDetected);
@@ -103,6 +109,12 @@ class TaskTranscribe extends Task {
async _startTranscribing(cs, ep) {
const opts = {};
if (this.vad.enable) {
opts.START_RECOGNIZING_ON_VAD = 1;
if (this.vad.voiceMs) opts.RECOGNIZER_VAD_VOICE_MS = this.vad.voiceMs;
if (this.vad.mode >= 0 && this.vad.mode <= 3) opts.RECOGNIZER_VAD_MODE = this.vad.mode;
}
ep.addCustomEventListener(GoogleTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(GoogleTranscriptionEvents.NoAudioDetected, this._onNoAudio.bind(this, cs, ep));
ep.addCustomEventListener(GoogleTranscriptionEvents.MaxDurationExceeded,
@@ -210,25 +222,36 @@ class TaskTranscribe extends Task {
}
_onTranscription(cs, ep, evt) {
this.logger.debug(evt, 'TaskTranscribe:_onTranscription');
if ('aws' === this.vendor && Array.isArray(evt) && evt.length > 0) evt = evt[0];
if ('microsoft' === this.vendor) {
const nbest = evt.NBest;
const alternatives = nbest.map((n) => {
const alternatives = nbest ? nbest.map((n) => {
return {
confidence: n.Confidence,
transcript: n.Display
};
});
}) :
[
{
transcript: evt.DisplayText
}
];
const newEvent = {
is_final: evt.RecognitionStatus === 'Success',
alternatives
};
evt = newEvent;
}
this.logger.debug(evt, 'TaskTranscribe:_onTranscription');
this.cs.requestor.request(this.transcriptionHook, Object.assign({speech: evt}, this.cs.callInfo))
.catch((err) => this.logger.info(err, 'TranscribeTask:_onTranscription error'));
if (this.transcriptionHook) {
this.cs.requestor.request('verb:hook', this.transcriptionHook, Object.assign({speech: evt}, this.cs.callInfo))
.catch((err) => this.logger.info(err, 'TranscribeTask:_onTranscription error'));
}
if (this.parentTask) {
this.parentTask.emit('transcription', evt);
}
if (this.killed) {
this.logger.debug('TaskTranscribe:_onTranscription exiting after receiving final transcription');
this._clearTimer();

View File

@@ -45,6 +45,7 @@ class SnsNotifier extends Emitter {
}, 'response from SNS SubscribeURL');
const data = await this.describeInstance();
this.lifecycleState = data.AutoScalingInstances[0].LifecycleState;
this.emit('SubscriptionConfirmation', {publicIp: this.publicIp});
break;
case 'Notification':

View File

@@ -0,0 +1,75 @@
const assert = require('assert');
const Emitter = require('events');
const crypto = require('crypto');
const timeSeries = require('@jambonz/time-series');
let alerter ;
class BaseRequestor extends Emitter {
constructor(logger, account_sid, hook, secret) {
super();
assert(typeof hook === 'object');
this.logger = logger;
this.url = hook.url;
this.username = hook.username;
this.password = hook.password;
this.secret = secret;
this.account_sid = account_sid;
const {stats} = require('../../').srf.locals;
this.stats = stats;
if (!alerter) {
alerter = timeSeries(logger, {
host: process.env.JAMBONES_TIME_SERIES_HOST,
commitSize: 50,
commitInterval: 'test' === process.env.NODE_ENV ? 7 : 20
});
}
}
get Alerter() {
return alerter;
}
close() {
/* subclass responsibility */
}
_computeSignature(payload, timestamp, secret) {
assert(secret);
const data = `${timestamp}.${JSON.stringify(payload)}`;
return crypto
.createHmac('sha256', secret)
.update(data, 'utf8')
.digest('hex');
}
_generateSigHeader(payload, secret) {
const timestamp = Math.floor(Date.now() / 1000);
const signature = this._computeSignature(payload, timestamp, secret);
const scheme = 'v1';
return {
'Jambonz-Signature': `t=${timestamp},${scheme}=${signature}`
};
}
_isAbsoluteUrl(u) {
return typeof u === 'string' &&
u.startsWith('https://') || u.startsWith('http://') ||
u.startsWith('ws://') || u.startsWith('wss://');
}
_isRelativeUrl(u) {
return typeof u === 'string' && u.startsWith('/');
}
_roundTrip(startAt) {
const diff = process.hrtime(startAt);
const time = diff[0] * 1e3 + diff[1] * 1e-6;
return time.toFixed(0);
}
}
module.exports = BaseRequestor;

View File

@@ -1,5 +1,6 @@
{
"TaskName": {
"Cognigy": "cognigy",
"Conference": "conference",
"Dequeue": "dequeue",
"Dial": "dial",
@@ -104,6 +105,16 @@
"Hangup": "hangup",
"Replaced": "replaced"
},
"HookMsgTypes": [
"session:new",
"session:reconnect",
"session:redirect",
"call:status",
"queue:status",
"verb:hook",
"jambonz:error"
],
"MAX_SIMRINGS": 10,
"BONG_TONE": "tone_stream://v=-7;%(100,0,941.0,1477.0);v=-7;>=2;+=.1;%(1400,0,350,440)"
"BONG_TONE": "tone_stream://v=-7;%(100,0,941.0,1477.0);v=-7;>=2;+=.1;%(1400,0,350,440)",
"FS_UUID_SET_NAME": "fsUUIDs"
}

View File

@@ -36,6 +36,10 @@ const speechMapper = (cred) => {
obj.api_key = o.api_key;
obj.region = o.region;
}
else if ('wellsaid' === obj.vendor) {
const o = JSON.parse(decrypt(credential));
obj.api_key = o.api_key;
}
return obj;
};
@@ -53,6 +57,7 @@ module.exports = (logger, srf) => {
const haveGoogle = speech.find((s) => s.vendor === 'google');
const haveAws = speech.find((s) => s.vendor === 'aws');
const haveMicrosoft = speech.find((s) => s.vendor === 'microsoft');
const haveWellsaid = speech.find((s) => s.vendor === 'wellsaid');
if (!haveGoogle || !haveAws || !haveMicrosoft) {
const [r3] = await pp.query(sqlSpeechCredentialsForSP, account_sid);
if (r3.length) {
@@ -68,6 +73,10 @@ module.exports = (logger, srf) => {
const ms = r3.find((s) => s.vendor === 'microsoft');
if (ms) speech.push(speechMapper(ms));
}
if (!haveWellsaid) {
const wellsaid = r3.find((s) => s.vendor === 'wellsaid');
if (wellsaid) speech.push(speechMapper(wellsaid));
}
}
}

105
lib/utils/http-requestor.js Normal file
View File

@@ -0,0 +1,105 @@
const bent = require('bent');
const parseUrl = require('parse-url');
const assert = require('assert');
const BaseRequestor = require('./base-requestor');
const {HookMsgTypes} = require('./constants.json');
const snakeCaseKeys = require('./snakecase-keys');
const toBase64 = (str) => Buffer.from(str || '', 'utf8').toString('base64');
function basicAuth(username, password) {
if (!username || !password) return {};
const creds = `${username}:${password || ''}`;
const header = `Basic ${toBase64(creds)}`;
return {Authorization: header};
}
class HttpRequestor extends BaseRequestor {
constructor(logger, account_sid, hook, secret) {
super(logger, account_sid, hook, secret);
this.method = hook.method || 'POST';
this.authHeader = basicAuth(hook.username, hook.password);
const u = parseUrl(this.url);
const myPort = u.port ? `:${u.port}` : '';
const baseUrl = this._baseUrl = `${u.protocol}://${u.resource}${myPort}`;
this.get = bent(baseUrl, 'GET', 'buffer', 200, 201);
this.post = bent(baseUrl, 'POST', 'buffer', 200, 201);
assert(this._isAbsoluteUrl(this.url));
assert(['GET', 'POST'].includes(this.method));
}
get baseUrl() {
return this._baseUrl;
}
/**
* Make an HTTP request.
* All requests use json bodies.
* All requests expect a 200 statusCode on success
* @param {object|string} hook - may be a absolute or relative url, or an object
* @param {string} [hook.url] - an absolute or relative url
* @param {string} [hook.method] - 'GET' or 'POST'
* @param {string} [hook.username] - if basic auth is protecting the endpoint
* @param {string} [hook.password] - if basic auth is protecting the endpoint
* @param {object} [params] - request parameters
*/
async request(type, hook, params) {
assert(HookMsgTypes.includes(type));
const payload = params ? snakeCaseKeys(params, ['customerData', 'sip']) : null;
const url = hook.url || hook;
const method = hook.method || 'POST';
assert.ok(url, 'HttpRequestor:request url was not provided');
assert.ok, (['GET', 'POST'].includes(method), `HttpRequestor:request method must be 'GET' or 'POST' not ${method}`);
const {url: urlInfo = hook, method: methodInfo = 'POST'} = hook; // mask user/pass
this.logger.debug({url: urlInfo, method: methodInfo, payload}, `HttpRequestor:request ${method} ${url}`);
const startAt = process.hrtime();
let buf;
try {
const sigHeader = this._generateSigHeader(payload, this.secret);
const headers = {...sigHeader, ...this.authHeader};
//this.logger.info({url, headers}, 'send webhook');
buf = this._isRelativeUrl(url) ?
await this.post(url, payload, headers) :
await bent(method, 'buffer', 200, 201, 202)(url, payload, headers);
} catch (err) {
this.logger.error({err, secret: this.secret, baseUrl: this.baseUrl, url, statusCode: err.statusCode},
`web callback returned unexpected error code ${err.statusCode}`);
let opts = {account_sid: this.account_sid};
if (err.code === 'ECONNREFUSED') {
opts = {...opts, alert_type: this.Alerter.AlertType.WEBHOOK_CONNECTION_FAILURE, url};
}
else if (err.name === 'StatusError') {
opts = {...opts, alert_type: this.Alerter.AlertType.WEBHOOK_STATUS_FAILURE, url, status: err.statusCode};
}
else {
opts = {...opts, alert_type: this.Alerter.AlertType.WEBHOOK_CONNECTION_FAILURE, url, detail: err.message};
}
this.Alerter.writeAlerts(opts).catch((err) => this.logger.info({err, opts}, 'Error writing alert'));
throw err;
}
const rtt = this._roundTrip(startAt);
if (buf) this.stats.histogram('app.hook.response_time', rtt, ['hook_type:app']);
if (buf && buf.toString().length > 0) {
try {
const json = JSON.parse(buf.toString());
this.logger.info({response: json}, `HttpRequestor:request ${method} ${url} succeeded in ${rtt}ms`);
return json;
}
catch (err) {
//this.logger.debug({err, url, method}, `HttpRequestor:request returned non-JSON content: '${buf.toString()}'`);
}
}
}
}
module.exports = HttpRequestor;

View File

@@ -20,6 +20,13 @@ function initMS(logger, wrapper, ms) {
wrapper.connects = 1;
wrapper.active = true;
});
ms.on('channel::open', (evt) => {
logger.debug({evt}, `mediaserver ${ms.address} added endpoint`);
});
ms.on('channel::close', (evt) => {
logger.debug({evt}, `mediaserver ${ms.address} removed endpoint`);
});
}
function installSrfLocals(srf, logger) {
@@ -105,6 +112,7 @@ function installSrfLocals(srf, logger) {
const {
pool,
lookupAppByPhoneNumber,
lookupAppByRegex,
lookupAppBySid,
lookupAppByRealm,
lookupAppByTeamsTenant,
@@ -159,6 +167,7 @@ function installSrfLocals(srf, logger) {
client,
pool,
lookupAppByPhoneNumber,
lookupAppByRegex,
lookupAppBySid,
lookupAppByRealm,
lookupAppByTeamsTenant,

View File

@@ -5,15 +5,10 @@ const {TaskPreconditions, CallDirection} = require('../utils/constants');
const CallInfo = require('../session/call-info');
const assert = require('assert');
const ConfirmCallSession = require('../session/confirm-call-session');
const selectSbc = require('./select-sbc');
const Registrar = require('@jambonz/mw-registrar');
const AdultingCallSession = require('../session/adulting-call-session');
const registrar = new Registrar({
host: process.env.JAMBONES_REDIS_HOST,
port: process.env.JAMBONES_REDIS_PORT || 6379
});
const deepcopy = require('deepcopy');
const moment = require('moment');
const stripCodecs = require('./strip-ancillary-codecs');
const { v4: uuidv4 } = require('uuid');
class SingleDialer extends Emitter {
@@ -63,7 +58,18 @@ class SingleDialer extends Emitter {
async exec(srf, ms, opts) {
opts = opts || {};
opts.headers = opts.headers || {};
opts.headers = {...opts.headers, 'X-Call-Sid': this.callSid};
opts.headers = {
...opts.headers,
...(this.target.headers || {}),
'X-Jambonz-Routing': this.target.type,
'X-Call-Sid': this.callSid
};
if (srf.locals.fsUUID) {
opts.headers = {
...opts.headers,
'X-Jambonz-FS-UUID': srf.locals.fsUUID,
};
}
this.ms = ms;
let uri, to;
try {
@@ -84,7 +90,6 @@ class SingleDialer extends Emitter {
break;
case 'user':
assert(this.target.name);
const aor = this.target.name;
uri = `sip:${this.target.name}`;
to = this.target.name;
@@ -93,16 +98,6 @@ class SingleDialer extends Emitter {
'X-Override-To': this.target.overrideTo
});
}
// need to send to the SBC registered on
const reg = await registrar.query(aor);
if (reg) {
const sbc = selectSbc(reg.sbcAddress);
if (sbc) {
this.logger.debug(`SingleDialer:exec retrieved registration details for ${aor}, using sbc at ${sbc}`);
this.sbcAddress = sbc;
}
}
break;
case 'sip':
assert(this.target.sipUri);
@@ -155,6 +150,7 @@ class SingleDialer extends Emitter {
* (a) create a CallInfo for this call
* (a) create a logger for this call
*/
req.srf = srf;
this.callInfo = new CallInfo({
direction: CallDirection.Outbound,
parentCallInfo: this.parentCallInfo,
@@ -209,9 +205,15 @@ class SingleDialer extends Emitter {
.on('refresh', () => this.logger.info('SingleDialer:exec - dialog refreshed by uas'))
.on('modify', async(req, res) => {
try {
const newSdp = await this.ep.modify(req.body);
res.send(200, {body: newSdp});
this.logger.info({offer: req.body, answer: newSdp}, 'SingleDialer:exec: handling reINVITE');
if (this.ep) {
const newSdp = await this.ep.modify(req.body);
res.send(200, {body: newSdp});
this.logger.info({offer: req.body, answer: newSdp}, 'SingleDialer:exec: handling reINVITE');
}
else {
this.logger.info('SingleDialer:exec: handling reINVITE with released media, emit event');
this.emit('reinvite', req, res);
}
} catch (err) {
this.logger.error(err, 'Error handling reinvite');
}
@@ -220,6 +222,7 @@ class SingleDialer extends Emitter {
if (this.confirmHook) this._executeApp(this.confirmHook);
else this.emit('accept');
} catch (err) {
this.inviteInProgress = null;
const status = {callStatus: CallStatus.Failed};
if (err instanceof SipError) {
status.sipStatus = err.status;
@@ -322,9 +325,10 @@ class SingleDialer extends Emitter {
return cs;
}
async releaseMediaToSBC(remoteSdp) {
async releaseMediaToSBC(remoteSdp, localSdp) {
assert(this.dlg && this.dlg.connected && this.ep && typeof remoteSdp === 'string');
await this.dlg.modify(remoteSdp, {
const sdp = stripCodecs(this.logger, remoteSdp, localSdp) || remoteSdp;
await this.dlg.modify(sdp, {
headers: {
'X-Reason': 'release-media'
}
@@ -353,7 +357,7 @@ class SingleDialer extends Emitter {
this.callInfo.updateCallStatus(callStatus, sipStatus);
if (typeof duration === 'number') this.callInfo.duration = duration;
try {
this.requestor.request(this.application.call_status_hook, this.callInfo.toJSON());
this.requestor.request('call:status', this.application.call_status_hook, this.callInfo.toJSON());
} catch (err) {
this.logger.info(err, `SingleDialer:_notifyCallStatusChange error sending ${callStatus} ${sipStatus}`);
}

View File

@@ -1,8 +1,9 @@
const assert = require('assert');
const noopLogger = {info: () => {}, error: () => {}};
const {LifeCycleEvents} = require('./constants');
const { v4: uuidv4 } = require('uuid');
const {LifeCycleEvents, FS_UUID_SET_NAME} = require('./constants');
const Emitter = require('events');
const debug = require('debug')('jambonz:feature-server');
const noopLogger = {info: () => {}, error: () => {}};
module.exports = (logger) => {
logger = logger || noopLogger;
@@ -16,6 +17,10 @@ module.exports = (logger) => {
assert.ok(sbcs.length, 'JAMBONES_SBCS env var is empty or misconfigured');
logger.info({sbcs}, 'SBC inventory');
}
else if (process.env.K8S && process.env.K8S_SBC_SIP_SERVICE_NAME) {
sbcs = [`${process.env.K8S_SBC_SIP_SERVICE_NAME}:5060`];
logger.info({sbcs}, 'SBC inventory');
}
// listen for SNS lifecycle changes
let lifecycleEmitter = new Emitter();
@@ -27,6 +32,10 @@ module.exports = (logger) => {
lifecycleEmitter = await require('./aws-sns-lifecycle')(logger);
lifecycleEmitter
.on('SubscriptionConfirmation', ({publicIp}) => {
const {srf} = require('../..');
srf.locals.publicIp = publicIp;
})
.on(LifeCycleEvents.ScaleIn, () => {
logger.info('AWS scale-in notification: begin drying up calls');
dryUpCalls = true;
@@ -67,7 +76,6 @@ module.exports = (logger) => {
})();
}
// send OPTIONS pings to SBCs
async function pingProxies(srf) {
if (process.env.NODE_ENV === 'test') return;
@@ -90,29 +98,40 @@ module.exports = (logger) => {
}
}
}
if (process.env.K8S) {
setImmediate(() => {
logger.info('disabling OPTIONS pings since we are running as a kubernetes service');
const {srf} = require('../..');
const {addToSet} = srf.locals.dbHelpers;
const uuid = srf.locals.fsUUID = uuidv4();
addToSet(FS_UUID_SET_NAME, uuid)
.catch((err) => logger.info({err}, `Error adding ${uuid} to set ${FS_UUID_SET_NAME}`));
});
}
else {
// OPTIONS ping the SBCs from each feature server every 60 seconds
setInterval(() => {
const {srf} = require('../..');
pingProxies(srf);
}, process.env.OPTIONS_PING_INTERVAL || 30000);
// OPTIONS ping the SBCs from each feature server every 60 seconds
setInterval(() => {
const {srf} = require('../..');
pingProxies(srf);
}, 20000);
// initial ping once we are up
setTimeout(async() => {
// initial ping once we are up
setTimeout(async() => {
const {srf} = require('../..');
// if SBCs are auto-scaling, monitor them as they come and go
const {srf} = require('../..');
if (!process.env.JAMBONES_SBCS) {
const {monitorSet} = srf.locals.dbHelpers;
const setName = `${(process.env.JAMBONES_CLUSTER_ID || 'default')}:active-sip`;
await monitorSet(setName, 10, (members) => {
sbcs = members;
logger.info(`sbc-pinger: SBC roster has changed, list of active SBCs is now ${sbcs}`);
});
}
// if SBCs are auto-scaling, monitor them as they come and go
if (!process.env.JAMBONES_SBCS) {
const {monitorSet} = srf.locals.dbHelpers;
const setName = `${(process.env.JAMBONES_CLUSTER_ID || 'default')}:active-sip`;
await monitorSet(setName, 10, (members) => {
sbcs = members;
logger.info(`sbc-pinger: SBC roster has changed, list of active SBCs is now ${sbcs}`);
});
}
pingProxies(srf);
}, 1000);
pingProxies(srf);
}, 1000);
}
return {
lifecycleEmitter,

View File

@@ -1,13 +0,0 @@
const CIDRMatcher = require('cidr-matcher');
const matcher = new CIDRMatcher([process.env.JAMBONES_NETWORK_CIDR]);
module.exports = (sbcList) => {
const obj = sbcList
.split(',')
.map((str) => {
const arr = /^(.*)\/(.*):(\d+)$/.exec(str);
return {protocol: arr[1], host: arr[2], port: arr[3]};
})
.find((obj) => 'udp' == obj.protocol && matcher.contains(obj.host));
if (obj) return `${obj.host}:${obj.port}`;
};

View File

@@ -0,0 +1,30 @@
const sdpTransform = require('sdp-transform');
const stripCodecs = (logger, remoteSdp, localSdp) => {
try {
const sdp = sdpTransform.parse(remoteSdp);
const local = sdpTransform.parse(localSdp);
const m = local.media
.find((m) => 'audio' === m.type);
const pt = m.rtp[0].payload;
/* manipulate on the audio section */
const audio = sdp.media.find((m) => 'audio' === m.type);
/* discard all of the codecs except the first in our 200 OK, and telephony-events */
const ptSaves = audio.rtp
.filter((r) => r.codec === 'telephone-event' || r.payload === pt)
.map((r) => r.payload);
const rtp = audio.rtp.filter((r) => ptSaves.includes(r.payload));
/* reattach the new rtp sections and stripped payload list */
audio.rtp = rtp;
audio.payloads = rtp.map((r) => r.payload).join(' ');
return sdpTransform.write(sdp);
} catch (err) {
logger.error({err, remoteSdp, localSdp}, 'strip-ancillary-codecs error');
}
};
module.exports = stripCodecs;

249
lib/utils/ws-requestor.js Normal file
View File

@@ -0,0 +1,249 @@
const assert = require('assert');
const BaseRequestor = require('./base-requestor');
const short = require('short-uuid');
const {HookMsgTypes} = require('./constants.json');
const Websocket = require('ws');
const snakeCaseKeys = require('./snakecase-keys');
const HttpRequestor = require('./http-requestor');
const MAX_RECONNECTS = 5;
const RESPONSE_TIMEOUT_MS = process.env.JAMBONES_WS_API_MSG_RESPONSE_TIMEOUT || 5000;
class WsRequestor extends BaseRequestor {
constructor(logger, account_sid, hook, secret) {
super(logger, account_sid, hook, secret);
this.connections = 0;
this.messagesInFlight = new Map();
this.maliciousClient = false;
assert(this._isAbsoluteUrl(this.url));
this.on('socket-closed', this._onSocketClosed.bind(this));
}
/**
* Send a JSON payload over the websocket. If this is the first request,
* open the websocket.
* All requests expect an ack message in response
* @param {object|string} hook - may be a absolute or relative url, or an object
* @param {string} [hook.url] - an absolute or relative url
* @param {string} [hook.method] - 'GET' or 'POST'
* @param {string} [hook.username] - if basic auth is protecting the endpoint
* @param {string} [hook.password] - if basic auth is protecting the endpoint
* @param {object} [params] - request parameters
*/
async request(type, hook, params) {
assert(HookMsgTypes.includes(type));
const url = hook.url || hook;
if (this.maliciousClient) {
this.logger.info({url: this.url}, 'WsRequestor:request - discarding msg to malicious client');
return;
}
/* if we have an absolute url, and it is http then do a standard webhook */
if (this._isAbsoluteUrl(url) && url.startsWith('http')) {
this.logger.debug({hook}, 'WsRequestor: sending a webhook');
const requestor = new HttpRequestor(this.logger, this.account_sid, hook, this.secret);
return requestor.request(type, hook, params);
}
/* connect if necessary */
if (!this.ws) {
if (this.connections >= MAX_RECONNECTS) {
throw new Error(`max attempts connecting to ${this.url}`);
}
try {
const startAt = process.hrtime();
await this._connect();
const rtt = this._roundTrip(startAt);
this.stats.histogram('app.hook.connect_time', rtt, ['hook_type:app']);
} catch (err) {
this.logger.error({err}, 'WsRequestor:request - failed connecting');
throw err;
}
}
assert(this.ws);
/* prepare and send message */
const payload = params ? snakeCaseKeys(params, ['customerData', 'sip']) : null;
assert.ok(url, 'WsRequestor:request url was not provided');
const msgid = short.generate();
const obj = {
type,
msgid,
hook: type === 'verb:hook' ? url : undefined,
data: {...payload}
};
this.logger.debug({obj}, `WsRequestor:request ${url}`);
/* simple notifications */
if (['call:status', 'jambonz:error'].includes(type)) {
this.ws.send(JSON.stringify(obj));
return;
}
/* messages that require an ack */
return new Promise((resolve, reject) => {
/* give the far end a reasonable amount of time to ack our message */
const timer = setTimeout(() => {
const {failure} = this.messagesInFlight.get(msgid);
failure && failure(`timeout from far end for msgid ${msgid}`);
this.messagesInFlight.delete(msgid);
}, RESPONSE_TIMEOUT_MS);
/* save the message info for reply */
const startAt = process.hrtime();
this.messagesInFlight.set(msgid, {
success: (response) => {
clearTimeout(timer);
const rtt = this._roundTrip(startAt);
this.logger.info({response}, `WsRequestor:request ${url} succeeded in ${rtt}ms`);
this.stats.histogram('app.hook.ws_response_time', rtt, ['hook_type:app']);
resolve(response);
},
failure: (err) => {
clearTimeout(timer);
reject(err);
}
});
/* send the message */
this.ws.send(JSON.stringify(obj));
});
}
close() {
this.logger.info('WsRequestor: closing socket');
if (this.ws) {
this.ws.close();
this.ws.removeAllListeners();
}
}
_connect() {
assert(!this.ws);
return new Promise((resolve, reject) => {
let opts = {
followRedirects: true,
maxRedirects: 2,
handshakeTimeout: 1000,
maxPayload: 8096,
};
if (this.username && this.password) opts = {...opts, auth: `${this.username}:${this.password}`};
this
.once('ready', (ws) => {
this.ws = ws;
this.removeAllListeners('not-ready');
resolve();
})
.once('not-ready', () => {
this.removeAllListeners('ready');
reject();
});
const ws = new Websocket(this.url, ['ws.jambonz.org'], opts);
this._setHandlers(ws);
});
}
_setHandlers(ws) {
ws
.on('open', this._onOpen.bind(this, ws))
.on('close', this._onClose.bind(this))
.on('message', this._onMessage.bind(this))
.on('unexpected-response', this._onUnexpectedResponse.bind(this, ws))
.on('error', this._onError.bind(this));
}
_onError(err) {
this.logger.info({url: this.url, err}, 'WsRequestor:_onError');
if (this.connections > 0) this.emit('socket-closed');
this.emit('not-ready');
}
_onOpen(ws) {
assert(!this.ws);
this.emit('ready', ws);
this.logger.info({url: this.url}, 'WsRequestor - successfully connected');
}
_onClose() {
this.logger.info({url: this.url}, 'WsRequestor - socket closed unexpectedly from remote side');
this.emit('socket-closed');
}
_onUnexpectedResponse(ws, req, res) {
assert(!this.ws);
this.logger.info({
headers: res.headers,
statusCode: res.statusCode,
statusMessage: res.statusMessage
}, 'WsRequestor - unexpected response');
this.emit('connection-failure');
this.emit('not-ready');
}
_onSocketClosed() {
this.ws = null;
if (this.connections > 0) {
if (++this.connections < MAX_RECONNECTS) {
setImmediate(this.connect.bind(this));
}
}
}
_onMessage(content, isBinary) {
if (this.isBinary) {
this.logger.info({url: this.url}, 'WsRequestor:_onMessage - discarding binary message');
this.maliciousClient = true;
this.ws.close();
return;
}
/* messages must be JSON format */
try {
const {type, msgid, command, queueCommand = false, data} = JSON.parse(content);
assert.ok(type, 'type property not supplied');
switch (type) {
case 'ack':
assert.ok(msgid, 'msgid not supplied');
this._recvAck(msgid, data);
break;
case 'command':
assert.ok(command, 'command property not supplied');
assert.ok(data, 'data property not supplied');
this._recvCommand(msgid, command, queueCommand, data);
break;
default:
assert.ok(false, `invalid type property: ${type}`);
}
} catch (err) {
this.logger.info({err}, 'WsRequestor:_onMessage - invalid incoming message');
}
}
_recvAck(msgid, data) {
const obj = this.messagesInFlight.get(msgid);
if (!obj) {
this.logger.info({url: this.url}, `WsRequestor:_recvAck - ack to unknown msgid ${msgid}, discarding`);
return;
}
this.logger.debug({url: this.url}, `WsRequestor:_recvAck - received response to ${msgid}`);
this.messagesInFlight.delete(msgid);
const {success} = obj;
success && success(data);
}
_recvCommand(msgid, command, queueCommand, data) {
// TODO: validate command
this.logger.info({msgid, command, queueCommand, data}, 'received command');
this.emit('command', {msgid, command, queueCommand, data});
}
}
module.exports = WsRequestor;

1589
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
{
"name": "jambonz-feature-server",
"version": "v0.6.7-rc8",
"version": "v0.7.3",
"main": "app.js",
"engines": {
"node": ">= 10.16.0"
@@ -26,26 +26,31 @@
"jslint": "eslint app.js lib"
},
"dependencies": {
"@jambonz/db-helpers": "^0.6.14",
"@cognigy/socket-client": "^4.5.5",
"@jambonz/db-helpers": "^0.6.16",
"@jambonz/http-health-check": "^0.0.1",
"@jambonz/mw-registrar": "^0.2.1",
"@jambonz/realtimedb-helpers": "^0.4.10",
"@jambonz/stats-collector": "^0.1.5",
"@jambonz/time-series": "^0.1.5",
"aws-sdk": "^2.846.0",
"@jambonz/realtimedb-helpers": "^0.4.26",
"@jambonz/stats-collector": "^0.1.6",
"@jambonz/time-series": "^0.1.6",
"aws-sdk": "^2.1073.0",
"bent": "^7.3.12",
"cidr-matcher": "^2.1.1",
"debug": "^4.3.1",
"debug": "^4.3.2",
"deepcopy": "^2.1.0",
"drachtio-fsmrf": "^2.0.11",
"drachtio-srf": "^4.4.55",
"drachtio-fsmrf": "^2.0.13",
"drachtio-srf": "^4.4.61",
"express": "^4.17.1",
"helmet": "^5.0.2",
"ip": "^1.1.5",
"moment": "^2.29.1",
"parse-url": "^5.0.2",
"pino": "^6.11.2",
"parse-url": "^5.0.7",
"pino": "^6.13.4",
"short-uuid": "^4.2.0",
"to-snake-case": "^1.0.0",
"uuid": "^8.3.2",
"verify-aws-sns-signature": "^0.0.6",
"ws": "^8.5.0",
"xml2js": "^0.4.23"
},
"devDependencies": {
@@ -55,5 +60,9 @@
"eslint-plugin-promise": "^4.3.1",
"nyc": "^15.1.0",
"tape": "^5.2.2"
},
"optionalDependencies": {
"bufferutil": "^4.0.6",
"utf-8-validate": "^5.0.8"
}
}

View File

@@ -9,6 +9,7 @@ networks:
services:
mysql:
image: mysql:5.7
platform: linux/x86_64
ports:
- "3360:3306"
environment:
@@ -123,7 +124,7 @@ services:
ipv4_address: 172.38.0.63
influxdb:
image: influxdb:1.8-alpine
image: influxdb:1.8
ports:
- "8086:8086"
networks: