feat: record all calls (#352)

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* fix jslint

* fix

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* fix: add file ext

* fix: record format

* fix outbound

* update to drachtio-fsmrf with support for multiple recording streams on a call

* enable DTMF during background listen recording

* fix merge commit

---------

Co-authored-by: Dave Horton <daveh@beachdognet.com>
This commit is contained in:
Hoan Luu Huu
2023-06-10 01:54:53 +07:00
committed by GitHub
parent ab0452879e
commit aad24744f3
11 changed files with 114 additions and 29 deletions

4
app.js
View File

@@ -20,7 +20,9 @@ const tracer = require('./tracer')(JAMBONES_OTEL_SERVICE_NAME);
const api = require('@opentelemetry/api');
srf.locals = {...srf.locals, otel: {tracer, api}};
const opts = {level: JAMBONES_LOGLEVEL};
const opts = {
level: JAMBONES_LOGLEVEL
};
const pino = require('pino');
const logger = pino(opts, pino.destination({sync: false}));
const {LifeCycleEvents, FS_UUID_SET_NAME} = require('./lib/utils/constants');

View File

@@ -144,6 +144,9 @@ const JAMBONES_REDIS_SENTINELS = process.env.JAMBONES_REDIS_SENTINELS ? {
username: process.env.JAMBONES_REDIS_SENTINEL_USERNAME
})
} : null;
const JAMBONZ_RECORD_WS_BASE_URL = process.env.JAMBONZ_RECORD_WS_BASE_URL;
const JAMBONZ_RECORD_WS_USERNAME = process.env.JAMBONZ_RECORD_WS_USERNAME;
const JAMBONZ_RECORD_WS_PASSWORD = process.env.JAMBONZ_RECORD_WS_PASSWORD;
module.exports = {
JAMBONES_MYSQL_HOST,
@@ -219,5 +222,8 @@ module.exports = {
MICROSOFT_REGION,
MICROSOFT_API_KEY,
SONIOX_API_KEY,
DEEPGRAM_API_KEY
DEEPGRAM_API_KEY,
JAMBONZ_RECORD_WS_BASE_URL,
JAMBONZ_RECORD_WS_USERNAME,
JAMBONZ_RECORD_WS_PASSWORD
};

View File

@@ -26,6 +26,9 @@ router.post('/', async(req, res) => {
const restDial = makeTask(logger, {'rest:dial': req.body});
restDial.appJson = app_json;
const {lookupAccountDetails, lookupCarrierByPhoneNumber, lookupCarrier} = dbUtils(logger, srf);
const {
lookupAppBySid
} = srf.locals.dbHelpers;
const {getSBC, getFreeswitch} = srf.locals;
const sbcAddress = getSBC();
if (!sbcAddress) throw new Error('no available SBCs for outbound call creation');
@@ -41,6 +44,9 @@ router.post('/', async(req, res) => {
const account = await lookupAccountBySid(req.body.account_sid);
const accountInfo = await lookupAccountDetails(req.body.account_sid);
const callSid = uuidv4();
const application = req.body.application_sid ? await lookupAppBySid(req.body.application_sid) : null;
const record_all_calls = account.record_all_calls || (application && application.record_all_calls);
const recordOutputFormat = account.record_format || 'mp3';
opts.headers = {
...opts.headers,
@@ -49,7 +55,8 @@ router.post('/', async(req, res) => {
'X-Call-Sid': callSid,
'X-Account-Sid': accountSid,
...(req.body?.application_sid && {'X-Application-Sid': req.body.application_sid}),
...(restDial.fromHost && {'X-Preferred-From-Host': restDial.fromHost})
...(restDial.fromHost && {'X-Preferred-From-Host': restDial.fromHost}),
...(record_all_calls && {'X-Record-All-Calls': recordOutputFormat})
};
switch (target.type) {

View File

@@ -11,7 +11,7 @@ const dbUtils = require('./utils/db-utils');
const RootSpan = require('./utils/call-tracer');
const listTaskNames = require('./utils/summarize-tasks');
const {
JAMBONES_MYSQL_REFRESH_TTL,
JAMBONES_MYSQL_REFRESH_TTL
} = require('./config');
module.exports = function(srf, logger) {
@@ -322,6 +322,7 @@ module.exports = function(srf, logger) {
const httpHeaders = b3 && { b3 };
json = await app.requestor.request('session:new', app.call_hook, params, httpHeaders);
}
app.tasks = normalizeJambones(logger, json).map((tdata) => makeTask(logger, tdata));
span?.setAttributes({
'http.statusCode': 200,

View File

@@ -19,7 +19,10 @@ const HttpRequestor = require('../utils/http-requestor');
const WsRequestor = require('../utils/ws-requestor');
const {
JAMBONES_INJECT_CONTENT,
AWS_REGION
AWS_REGION,
JAMBONZ_RECORD_WS_BASE_URL,
JAMBONZ_RECORD_WS_USERNAME,
JAMBONZ_RECORD_WS_PASSWORD,
} = require('../config');
const BADPRECONDITIONS = 'preconditions not met';
const CALLER_CANCELLED_ERR_MSG = 'Response not sent due to unknown transaction';
@@ -132,6 +135,11 @@ class CallSession extends Emitter {
return this.callInfo.callStatus;
}
get isBackGroundListen() {
return !(this.backgroundListenTask === null ||
this.backgroundListenTask === undefined);
}
/**
* SIP call-id for the call
*/
@@ -495,7 +503,7 @@ class CallSession extends Emitter {
}
}
async startBackgroundListen(opts) {
async startBackgroundListen(opts, bugname) {
if (this.isListenEnabled) {
this.logger.info('CallSession:startBackgroundListen - listen is already enabled, ignoring request');
return;
@@ -504,8 +512,9 @@ class CallSession extends Emitter {
this.logger.debug({opts}, 'CallSession:startBackgroundListen');
const t = normalizeJambones(this.logger, [opts]);
this.backgroundListenTask = makeTask(this.logger, t[0]);
this.backgroundListenTask.bugname = bugname;
const resources = await this._evaluatePreconditions(this.backgroundListenTask);
const {span, ctx} = this.rootSpan.startChildSpan(`background-gather:${this.backgroundListenTask.summary}`);
const {span, ctx} = this.rootSpan.startChildSpan(`background-listen:${this.backgroundListenTask.summary}`);
this.backgroundListenTask.span = span;
this.backgroundListenTask.ctx = ctx;
this.backgroundListenTask.exec(this, resources)
@@ -528,6 +537,7 @@ class CallSession extends Emitter {
}
async stopBackgroundListen() {
this.logger.debug('CallSession:stopBackgroundListen');
try {
if (this.backgroundListenTask) {
this.backgroundListenTask.removeAllListeners();
@@ -536,7 +546,6 @@ class CallSession extends Emitter {
} catch (err) {
this.logger.info({err}, 'CallSession:stopBackgroundListen - Error stopping listen task');
}
this.backgroundListenTask = null;
}
async enableBotMode(gather, autoEnable) {
@@ -1341,7 +1350,10 @@ class CallSession extends Emitter {
}
// we are going from an early media connection to answer
await this.propagateAnswer();
if (this.direction === CallDirection.Inbound) {
// only do this for inbound call.
await this.propagateAnswer();
}
return {
...resources,
...(this.isSipRecCallSession && {ep2: this.ep2})
@@ -1512,7 +1524,6 @@ class CallSession extends Emitter {
}
this.dlg.on('modify', this._onReinvite.bind(this));
this.dlg.on('refer', this._onRefer.bind(this));
this.logger.debug(`CallSession:propagateAnswer - answered callSid ${this.callSid}`);
}
}
@@ -1749,6 +1760,13 @@ class CallSession extends Emitter {
async _notifyCallStatusChange({callStatus, sipStatus, sipReason, duration}) {
if (this.callMoved) return;
if (callStatus === CallStatus.InProgress) {
// nice, call is in progress, good time to enable record
await this.enableRecordAllCall();
} else if (callStatus == CallStatus.Completed && this.isBackGroundListen) {
await this.stopBackgroundListen();
}
/* race condition: we hang up at the same time as the caller */
if (callStatus === CallStatus.Completed) {
if (this.notifiedComplete) return;
@@ -1779,6 +1797,23 @@ class CallSession extends Emitter {
.catch((err) => this.logger.error(err, 'redis error'));
}
async enableRecordAllCall() {
if (this.accountInfo.account.record_all_calls || this.application.record_all_calls) {
const listenOpts = {
url: `${JAMBONZ_RECORD_WS_BASE_URL}/record/${this.accountInfo.account.bucket_credential.vendor}`,
wsAuth: {
username: JAMBONZ_RECORD_WS_USERNAME,
password: JAMBONZ_RECORD_WS_PASSWORD
},
mixType : 'stereo',
passDtmf: true
};
this.logger.debug({listenOpts}, 'Record all calls: enabling listen');
await this.startBackgroundListen({verb: 'listen', ...listenOpts}, 'jambonz-session-record');
}
}
/**
* notifyTaskError - only used when websocket connection is used instead of webhooks
*/

View File

@@ -137,6 +137,7 @@ class TaskDial extends Task {
get canReleaseMedia() {
const keepAnchor = this.data.anchorMedia ||
this.cs.isBackGroundListen ||
ANCHOR_MEDIA_ALWAYS ||
this.listenTask ||
this.transcribeTask ||

View File

@@ -3,6 +3,7 @@ const {TaskName, TaskPreconditions, ListenEvents, ListenStatus} = require('../ut
const makeTask = require('./make_task');
const moment = require('moment');
const MAX_PLAY_AUDIO_QUEUE_SIZE = 10;
const DTMF_SPAN_NAME = 'dtmf';
class TaskListen extends Task {
constructor(logger, opts, parentTask) {
@@ -29,6 +30,8 @@ class TaskListen extends Task {
get name() { return TaskName.Listen; }
set bugname(name) { this._bugname = name; }
async exec(cs, {ep}) {
await super.exec(cs);
this.ep = ep;
@@ -65,7 +68,8 @@ class TaskListen extends Task {
if (this.ep && this.ep.connected) {
this.logger.debug('TaskListen:kill closing websocket');
try {
await this.ep.forkAudioStop();
const args = this._bugname ? [this._bugname] : [];
await this.ep.forkAudioStop(...args);
this.logger.debug('TaskListen:kill successfully closed websocket');
} catch (err) {
this.logger.info(err, 'TaskListen:kill');
@@ -85,13 +89,16 @@ class TaskListen extends Task {
async updateListen(status) {
if (!this.killed && this.ep && this.ep.connected) {
const args = this._bugname ? [this._bugname] : [];
this.logger.info(`TaskListen:updateListen status ${status}`);
switch (status) {
case ListenStatus.Pause:
await this.ep.forkAudioPause().catch((err) => this.logger.info(err, 'TaskListen: error pausing audio'));
await this.ep.forkAudioPause(...args)
.catch((err) => this.logger.info(err, 'TaskListen: error pausing audio'));
break;
case ListenStatus.Resume:
await this.ep.forkAudioResume().catch((err) => this.logger.info(err, 'TaskListen: error resuming audio'));
await this.ep.forkAudioResume(...args)
.catch((err) => this.logger.info(err, 'TaskListen: error resuming audio'));
break;
}
}
@@ -120,6 +127,7 @@ class TaskListen extends Task {
wsUrl: this.hook.url,
mixType: this.mixType,
sampling: this.sampleRate,
...(this._bugname && {bugname: this._bugname}),
metadata
});
this.recordStartTime = moment();
@@ -161,12 +169,21 @@ class TaskListen extends Task {
}
_onDtmf(ep, evt) {
this.logger.debug({evt}, `TaskListen:_onDtmf received dtmf ${evt.dtmf}`);
const {dtmf, duration} = evt;
this.logger.debug({evt}, `TaskListen:_onDtmf received dtmf ${dtmf}`);
if (this.passDtmf && this.ep?.connected) {
const obj = {event: 'dtmf', dtmf: evt.dtmf, duration: evt.duration};
this.ep.forkAudioSendText(obj)
const obj = {event: 'dtmf', dtmf, duration};
const args = this._bugname ? [this._bugname, obj] : [obj];
this.ep.forkAudioSendText(...args)
.catch((err) => this.logger.info({err}, 'TaskListen:_onDtmf error sending dtmf'));
}
/* add a child span for the dtmf event */
const msDuration = Math.floor((duration / 8000) * 1000);
const {span} = this.startChildSpan(`${DTMF_SPAN_NAME}:${dtmf}`);
span.setAttributes({dtmf, duration: `${msDuration}ms`});
span.end();
if (evt.dtmf === this.finishOnKey) {
this.logger.info(`TaskListen:_onDtmf terminating task due to dtmf ${evt.dtmf}`);
this.results.digits = evt.dtmf;
@@ -192,7 +209,15 @@ class TaskListen extends Task {
try {
const results = await ep.play(evt.file);
logger.debug(`Finished playing file, result: ${JSON.stringify(results)}`);
ep.forkAudioSendText({type: 'playDone', data: Object.assign({id: evt.id}, results)});
const obj = {
type: 'playDone',
data: {
id: evt.id,
...results
}
};
const args = this._bugname ? [this._bugname, obj] : [obj];
ep.forkAudioSendText(...args);
} catch (err) {
logger.error({err}, 'Error playing file');
}

View File

@@ -94,6 +94,12 @@ const speechMapper = (cred) => {
return obj;
};
const bucketCredentialDecrypt = (account) => {
const { bucket_credential } = account.account;
if (!bucket_credential || bucket_credential.vendor) return;
account.account.bucket_credential = JSON.parse(decrypt(bucket_credential));
};
module.exports = (logger, srf) => {
const {pool} = srf.locals.dbHelpers;
const pp = pool.promise();
@@ -112,9 +118,11 @@ module.exports = (logger, srf) => {
speech.push(speechMapper(s));
}
});
const account = r[0];
bucketCredentialDecrypt(account);
return {
...r[0],
...account,
speech
};
};

16
package-lock.json generated
View File

@@ -29,8 +29,8 @@
"bent": "^7.3.12",
"debug": "^4.3.4",
"deepcopy": "^2.1.0",
"drachtio-fsmrf": "^3.0.21",
"drachtio-srf": "^4.5.23",
"drachtio-fsmrf": "^3.0.23",
"drachtio-srf": "^4.5.26",
"express": "^4.18.2",
"ip": "^1.1.8",
"moment": "^2.29.4",
@@ -3366,9 +3366,9 @@
}
},
"node_modules/drachtio-fsmrf": {
"version": "3.0.22",
"resolved": "https://registry.npmjs.org/drachtio-fsmrf/-/drachtio-fsmrf-3.0.22.tgz",
"integrity": "sha512-YVzGa72dcND1BahKf0/HuKqpEa3Ip2/O6k0tnS7U6x+VdKJ0CM3n9e6iyV8BvBfUMDbtKAv0LRPC/6O8NS7uPg==",
"version": "3.0.23",
"resolved": "https://registry.npmjs.org/drachtio-fsmrf/-/drachtio-fsmrf-3.0.23.tgz",
"integrity": "sha512-ElruNKuPzFiMOUH06PUd3dR9tElEGRhbP/gXxai58qhrqRQNLJxzCRJkbgbjrZdYWFQPHOAzy4ZQb7+qq0AUPw==",
"dependencies": {
"camel-case": "^4.1.2",
"debug": "^2.6.9",
@@ -11421,9 +11421,9 @@
}
},
"drachtio-fsmrf": {
"version": "3.0.22",
"resolved": "https://registry.npmjs.org/drachtio-fsmrf/-/drachtio-fsmrf-3.0.22.tgz",
"integrity": "sha512-YVzGa72dcND1BahKf0/HuKqpEa3Ip2/O6k0tnS7U6x+VdKJ0CM3n9e6iyV8BvBfUMDbtKAv0LRPC/6O8NS7uPg==",
"version": "3.0.23",
"resolved": "https://registry.npmjs.org/drachtio-fsmrf/-/drachtio-fsmrf-3.0.23.tgz",
"integrity": "sha512-ElruNKuPzFiMOUH06PUd3dR9tElEGRhbP/gXxai58qhrqRQNLJxzCRJkbgbjrZdYWFQPHOAzy4ZQb7+qq0AUPw==",
"requires": {
"camel-case": "^4.1.2",
"debug": "^2.6.9",

View File

@@ -45,8 +45,8 @@
"bent": "^7.3.12",
"debug": "^4.3.4",
"deepcopy": "^2.1.0",
"drachtio-fsmrf": "^3.0.21",
"drachtio-srf": "^4.5.23",
"drachtio-fsmrf": "^3.0.23",
"drachtio-srf": "^4.5.26",
"express": "^4.18.2",
"ip": "^1.1.8",
"moment": "^2.29.4",

View File

@@ -36,7 +36,7 @@ obj.sippUac = (file, bindAddress, from='sipp', to='16174000000', loop=1) => {
'-cid_str', `%u-%p@%s-${idx++}`,
'172.38.0.50',
'-key','from', from,
'-key','to', to, '-trace_msg'
'-key','to', to, '-trace_msg', '-trace_err'
];
if (bindAddress) args.splice(5, 0, '--ip', bindAddress);