Feature/minimal media anchoring (#36)

* initial WIP to remove freeswitch from media path when not recording or transcribing dial calls

* implement release-media and anchor-media operations

* mute/unmute now handled by rtpengine

* Dial: dtmf detection now based on SIP INFO events from sbcs and rtpengine

* add reason to gather action, bugfixes for transcribe and say
This commit is contained in:
Dave Horton
2021-10-21 11:59:45 -04:00
committed by GitHub
parent bedf25c6a2
commit 72345f83c1
10 changed files with 228 additions and 4838 deletions

View File

@@ -167,7 +167,7 @@ module.exports = function(srf, logger) {
if (0 === app.tasks.length) throw new Error('no application provided');
next();
} catch (err) {
logger.info(`Error retrieving or parsing application: ${err.message}`);
logger.info({err}, `Error retrieving or parsing application: ${err.message}`);
res.send(480, {headers: {'X-Reason': err.message}});
}
}

View File

@@ -8,13 +8,14 @@ const CallSession = require('./call-session');
*/
class AdultingCallSession extends CallSession {
constructor({logger, application, singleDialer, tasks, callInfo}) {
constructor({logger, application, singleDialer, tasks, callInfo, accountInfo}) {
super({
logger,
application,
srf: singleDialer.dlg.srf,
tasks,
callInfo
callInfo,
accountInfo
});
this.sd = singleDialer;

View File

@@ -926,6 +926,28 @@ class CallSession extends Emitter {
};
}
async releaseMediaToSBC(remoteSdp) {
assert(this.dlg && this.dlg.connected && this.ep && typeof remoteSdp === 'string');
await this.dlg.modify(remoteSdp, {
headers: {
'X-Reason': 'release-media'
}
});
this.ep.destroy()
.then(() => this.ep = null)
.catch((err) => this.logger.error({err}, 'CallSession:releaseMediaToSBC: Error destroying endpoint'));
}
async reAnchorMedia() {
assert(this.dlg && this.dlg.connected && !this.ep);
this.ep = await this.ms.createEndpoint({remoteSdp: this.dlg.remote.sdp});
await this.dlg.modify(this.ep.local.sdp, {
headers: {
'X-Reason': 'anchor-media'
}
});
}
/**
* Called any time call status changes. This method both invokes the
* call_status_hook callback as well as updates the realtime database

View File

@@ -130,6 +130,10 @@ class TaskDial extends Task {
get name() { return TaskName.Dial; }
get canReleaseMedia() {
return !process.env.ANCHOR_MEDIA_ALWAYS && !this.listenTask && !this.transcribeTask;
}
async exec(cs) {
await super.exec(cs);
try {
@@ -142,13 +146,12 @@ class TaskDial extends Task {
this.epOther.play(this.dialMusic).catch((err) => {});
}
}
if (this.epOther) this._installDtmfDetection(cs, this.epOther, this.parentDtmfCollector);
await this._attemptCalls(cs);
await this.awaitTaskDone();
this.logger.debug({callSid: this.cs.callSid}, 'Dial:exec task is done, sending actionHook if any');
await this.performAction(this.results, this.killReason !== KillReason.Replaced);
this._removeDtmfDetection(cs, this.epOther);
this._removeDtmfDetection(cs, this.ep);
this._removeDtmfDetection(cs.dlg);
this._removeDtmfDetection(this.dlg);
} catch (err) {
this.logger.error({err}, 'TaskDial:exec terminating with error');
this.kill(cs);
@@ -158,8 +161,8 @@ class TaskDial extends Task {
async kill(cs, reason) {
super.kill(cs);
this.killReason = reason || KillReason.Hangup;
this._removeDtmfDetection(this.cs, this.epOther);
this._removeDtmfDetection(this.cs, this.ep);
this._removeDtmfDetection(cs.dlg);
this._removeDtmfDetection(this.dlg);
this._killOutdials();
if (this.sd) {
this.sd.kill();
@@ -177,9 +180,14 @@ class TaskDial extends Task {
* @param {*} tasks - array of play/say tasks to execute
*/
async whisper(tasks, callSid) {
if (!this.epOther || !this.ep) return this.logger.info('Dial:whisper: no paired endpoint found');
try {
const cs = this.callSession;
if (!this.ep && !this.epOther) {
await this.reAnchorMedia(this.callSession, this.sd);
}
if (!this.epOther || !this.ep) return this.logger.info('Dial:whisper: no paired endpoint found');
this.logger.debug('Dial:whisper unbridging endpoints');
await this.epOther.unbridge();
this.logger.debug('Dial:whisper executing tasks');
@@ -188,7 +196,12 @@ class TaskDial extends Task {
await task.exec(cs, callSid === this.callSid ? this.ep : this.epOther);
}
this.logger.debug('Dial:whisper tasks complete');
if (!cs.callGone && this.epOther) this.epOther.bridge(this.ep);
if (!cs.callGone && this.epOther) {
/* if we can release the media back to the SBC, do so now */
if (this.canReleaseMedia) this._releaseMedia(cs, this.sd);
else this.epOther.bridge(this.ep);
}
} catch (err) {
this.logger.error(err, 'Dial:whisper error');
}
@@ -198,14 +211,19 @@ class TaskDial extends Task {
* mute or unmute one side of the call
*/
async mute(callSid, doMute) {
if (!this.epOther || !this.ep) return this.logger.info('Dial:mute: no paired endpoint found');
try {
const parentCall = callSid !== this.callSid;
const ep = parentCall ? this.epOther : this.ep;
await ep[doMute ? 'mute' : 'unmute']();
this.logger.debug(`Dial:mute ${doMute ? 'muted' : 'unmuted'} ${parentCall ? 'parentCall' : 'childCall'}`);
const dlg = parentCall ? this.callSession.dlg : this.dlg;
const hdr = `${doMute ? 'mute' : 'unmute'} call leg`;
try {
/* let rtpengine do the mute / unmute */
await dlg.request({
method: 'INFO',
headers: {
'X-Reason': hdr
}
});
} catch (err) {
this.logger.error(err, 'Dial:mute error');
this.logger.info({err}, `Dial:mute - ${hdr} error`);
}
}
@@ -217,37 +235,41 @@ class TaskDial extends Task {
this.dials.clear();
}
_installDtmfDetection(cs, ep, dtmfDetector) {
if (ep && this.dtmfHook && !ep.dtmfDetector) {
ep.dtmfDetector = dtmfDetector;
ep.on('dtmf', this._onDtmf.bind(this, cs, ep));
}
}
_removeDtmfDetection(cs, ep) {
if (ep) {
delete ep.dtmfDetector;
ep.removeAllListeners('dtmf');
_installDtmfDetection(cs, dlg) {
dlg.on('info', this._onInfo.bind(this, cs, dlg));
}
_removeDtmfDetection(dlg) {
dlg && dlg.removeAllListeners('info');
}
_onDtmf(cs, ep, evt) {
if (ep.dtmfDetector) {
const match = ep.dtmfDetector.keyPress(evt.dtmf);
if (match) {
this.logger.debug({callSid: this.cs.callSid}, `Dial:_onDtmf triggered dtmf match: ${match}`);
const requestor = ep.dtmfDetector === this.parentDtmfCollector ?
cs.requestor :
(this.sd ? this.sd.requestor : null);
if (!requestor) {
this.logger.info(`Dial:_onDtmf got digits on B leg after adulting: ${evt.dtmf}`);
_onInfo(cs, dlg, req, res) {
res.send(200);
if (req.get('Content-Type') !== 'application/dtmf-relay') return;
const dtmfDetector = dlg === cs.dlg ? this.parentDtmfCollector : this.childDtmfCollector;
if (!dtmfDetector) return;
let requestor, callSid, callInfo;
if (dtmfDetector === this.parentDtmfCollector) {
requestor = cs.requestor;
callSid = cs.callSid;
callInfo = cs.callInfo;
}
else {
requestor.request(this.dtmfHook, {dtmf: match, ...cs.callInfo.toJSON()})
requestor = this.sd?.requestor;
callSid = this.sd?.callSid;
callInfo = this.sd?.callInfo;
}
if (!requestor) return;
const arr = /Signal=([0-9#*])/.exec(req.body);
if (!arr) return;
const key = arr[1];
const match = dtmfDetector.keyPress(key);
if (match) {
this.logger.info({callSid}, `Dial:_onInfo triggered dtmf match: ${match}`);
requestor.request(this.dtmfHook, {dtmf: match, ...callInfo.toJSON()})
.catch((err) => this.logger.info(err, 'Dial:_onDtmf - error'));
}
}
}
}
async _initializeInbound(cs) {
const ep = await cs._evalEndpointPrecondition(this);
@@ -255,7 +277,7 @@ class TaskDial extends Task {
debug(`Dial:__initializeInbound allocated ep for incoming call: ${ep.uuid}`);
/* send outbound legs back to the same SBC (to support static IP feature) */
if (!this.proxy) this.proxy = `${cs.req.source_address}:${cs.req.source_port};transport=tcp`;
if (!this.proxy) this.proxy = `${cs.req.source_address}:${cs.req.source_port}`;
if (this.dialMusic) {
// play dial music to caller while we outdial
@@ -322,7 +344,8 @@ class TaskDial extends Task {
sbcAddress,
target: t,
opts,
callInfo: cs.callInfo
callInfo: cs.callInfo,
accountInfo: cs.accountInfo
});
this.dials.set(sd.callSid, sd);
@@ -366,9 +389,13 @@ class TaskDial extends Task {
break;
}
})
.on('accept', () => {
.on('accept', async() => {
this.logger.debug(`Dial:_attemptCalls - we have a winner: ${sd.callSid}`);
this._connectSingleDial(cs, sd);
try {
await this._connectSingleDial(cs, sd);
} catch (err) {
this.logger.info({err}, 'Dial:_attemptCalls - Error calling _connectSingleDial ');
}
})
.on('decline', () => {
this.logger.debug(`Dial:_attemptCalls - declined: ${sd.callSid}`);
@@ -394,8 +421,8 @@ class TaskDial extends Task {
});
}
_connectSingleDial(cs, sd) {
if (!this.bridged) {
async _connectSingleDial(cs, sd) {
if (!this.bridged && !this.canReleaseMedia) {
this.logger.debug('Dial:_connectSingleDial bridging endpoints');
if (this.epOther) {
this.epOther.api('uuid_break', this.epOther.uuid);
@@ -405,7 +432,7 @@ class TaskDial extends Task {
}
// ding! ding! ding! we have a winner
this._selectSingleDial(cs, sd);
await this._selectSingleDial(cs, sd);
this._killOutdials(); // NB: order is important
}
@@ -418,7 +445,7 @@ class TaskDial extends Task {
* - launch any nested tasks
* - and establish a handler to clean up if the called party hangs up
*/
_selectSingleDial(cs, sd) {
async _selectSingleDial(cs, sd) {
debug(`Dial:_selectSingleDial ep for outbound call: ${sd.ep.uuid}`);
this.dials.delete(sd.callSid);
@@ -426,7 +453,7 @@ class TaskDial extends Task {
this.callSid = sd.callSid;
if (this.earlyMedia) {
debug('Dial:_selectSingleDial propagating answer supervision on A leg now that B is connected');
cs.propagateAnswer();
await cs.propagateAnswer();
}
if (this.timeLimit) {
this.timerMaxCallDuration = setTimeout(() => {
@@ -442,7 +469,7 @@ class TaskDial extends Task {
this.logger.debug('Dial:_selectSingleDial called party hungup, ending dial operation');
sessionTracker.remove(this.callSid);
if (this.timerMaxCallDuration) clearTimeout(this.timerMaxCallDuration);
this.ep.unbridge();
this.ep && this.ep.unbridge();
this.kill(cs);
}
});
@@ -453,10 +480,14 @@ class TaskDial extends Task {
dialCallSid: sd.callSid,
});
if (this.childDtmfCollector) this._installDtmfDetection(cs, this.ep, this.childDtmfCollector);
if (this.parentDtmfCollector) this._installDtmfDetection(cs, cs.dlg);
if (this.childDtmfCollector) this._installDtmfDetection(cs, this.dlg);
if (this.transcribeTask) this.transcribeTask.exec(cs, this.ep);
if (this.listenTask) this.listenTask.exec(cs, this.ep);
/* if we can release the media back to the SBC, do so now */
if (this.canReleaseMedia) this._releaseMedia(cs, sd);
}
_bridgeEarlyMedia(sd) {
@@ -468,6 +499,33 @@ class TaskDial extends Task {
}
}
/**
* Release the media from freeswitch
* @param {*} cs
* @param {*} sd
*/
async _releaseMedia(cs, sd) {
assert(cs.ep && sd.ep);
try {
this.logger.info('Dial:_releaseMedia - releasing media from freewitch');
const aLegSdp = cs.ep.remote.sdp;
const bLegSdp = sd.ep.remote.sdp;
await Promise.all[sd.releaseMediaToSBC(aLegSdp), cs.releaseMediaToSBC(bLegSdp)];
this.epOther = null;
this.logger.info('Dial:_releaseMedia - successfully released media from freewitch');
} catch (err) {
this.logger.info({err}, 'Dial:_releaseMedia error');
}
}
async reAnchorMedia(cs, sd) {
if (cs.ep && sd.ep) return;
this.logger.info('Dial:reAnchorMedia - re-anchoring media to freewitch');
await Promise.all([sd.reAnchorMedia(), cs.reAnchorMedia()]);
this.epOther = cs.ep;
}
}
module.exports = TaskDial;

View File

@@ -235,14 +235,15 @@ class TaskGather extends Task {
this._clearTimer();
if (reason.startsWith('dtmf')) {
await this.performAction({digits: this.digitBuffer});
await this.performAction({digits: this.digitBuffer, reason: 'dtmfDetected'});
}
else if (reason.startsWith('speech')) {
if (this.parentTask) this.parentTask.emit('transcription', evt);
else await this.performAction({speech: evt});
else await this.performAction({speech: evt, reason: 'speechDetected'});
}
else if (reason.startsWith('timeout') && this.parentTask) {
this.parentTask.emit('timeout', evt);
else if (reason.startsWith('timeout')) {
if (this.parentTask) this.parentTask.emit('timeout', evt);
else await this.performAction({reason: 'timeout'});
}
this.notifyTaskDone();
}

View File

@@ -21,9 +21,15 @@ class TaskSay extends Task {
const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, srf);
const {writeAlerts, AlertType, stats} = srf.locals;
const {synthAudio} = srf.locals.dbHelpers;
const vendor = this.synthesizer.vendor || cs.speechSynthesisVendor;
const language = this.synthesizer.language || cs.speechSynthesisLanguage;
const voice = this.synthesizer.voice || cs.speechSynthesisVoice;
const vendor = ('default' === this.synthesizer.vendor || !this.synthesizer.vendor) ?
cs.speechSynthesisVendor :
this.synthesizer.vendor;
const language = ('default' === this.synthesizer.language || !this.synthesizer.language) ?
cs.speechSynthesisLanguage :
this.synthesizer.language;
const voice = ('default' === this.synthesizer.voice || !this.synthesizer.voice) ?
cs.speechSynthesisVoice :
this.synthesizer.voice;
const salt = cs.callSid;
const credentials = cs.getSpeechCredentials(vendor, 'tts');

View File

@@ -91,7 +91,7 @@ class TaskTranscribe extends Task {
ep.addCustomEventListener(GoogleTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(GoogleTranscriptionEvents.NoAudioDetected, this._onNoAudio.bind(this, cs, ep));
ep.addCustomEventListener(GoogleTranscriptionEvents.MaxDurationExceeded,
this._onMaxDurationExceeded.bind(this, ep));
this._onMaxDurationExceeded.bind(this, cs, ep));
ep.addCustomEventListener(AwsTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(AwsTranscriptionEvents.NoAudioDetected, this._onNoAudio.bind(this, cs, ep));
ep.addCustomEventListener(AwsTranscriptionEvents.MaxDurationExceeded,

View File

@@ -17,7 +17,7 @@ const moment = require('moment');
const { v4: uuidv4 } = require('uuid');
class SingleDialer extends Emitter {
constructor({logger, sbcAddress, target, opts, application, callInfo}) {
constructor({logger, sbcAddress, target, opts, application, callInfo, accountInfo}) {
super();
assert(target.type);
@@ -31,6 +31,8 @@ class SingleDialer extends Emitter {
this.bindings = logger.bindings();
this.parentCallInfo = callInfo;
this.accountInfo = accountInfo;
this.callGone = false;
this.callSid = uuidv4();
@@ -62,6 +64,7 @@ class SingleDialer extends Emitter {
opts = opts || {};
opts.headers = opts.headers || {};
opts.headers = {...opts.headers, 'X-Call-Sid': this.callSid};
this.ms = ms;
let uri, to;
try {
switch (this.target.type) {
@@ -201,7 +204,7 @@ class SingleDialer extends Emitter {
const duration = moment().diff(connectTime, 'seconds');
this.logger.debug('SingleDialer:exec called party hung up');
this.emit('callStatusChange', {callStatus: CallStatus.Completed, duration});
this.ep.destroy();
this.ep && this.ep.destroy();
})
.on('refresh', () => this.logger.info('SingleDialer:exec - dialog refreshed by uas'))
.on('modify', async(req, res) => {
@@ -299,20 +302,48 @@ class SingleDialer extends Emitter {
this.logger = logger;
this.adulting = true;
this.emit('adulting');
if (this.ep) {
await this.ep.unbridge()
.catch((err) => this.logger.info({err}, 'SingleDialer:doAdulting - failed to unbridge ep'));
this.ep.play('silence_stream://1000');
}
else {
await this.reAnchorMedia();
}
const cs = new AdultingCallSession({
logger: this.logger,
singleDialer: this,
application,
callInfo: this.callInfo,
accountInfo: this.accountInfo,
tasks
});
cs.exec();
return cs;
}
async releaseMediaToSBC(remoteSdp) {
assert(this.dlg && this.dlg.connected && this.ep && typeof remoteSdp === 'string');
await this.dlg.modify(remoteSdp, {
headers: {
'X-Reason': 'release-media'
}
});
this.ep.destroy()
.then(() => this.ep = null)
.catch((err) => this.logger.error({err}, 'SingleDialer:releaseMediaToSBC: Error destroying endpoint'));
}
async reAnchorMedia() {
assert(this.dlg && this.dlg.connected && !this.ep);
this.ep = await this.ms.createEndpoint({remoteSdp: this.dlg.remote.sdp});
await this.dlg.modify(this.ep.local.sdp, {
headers: {
'X-Reason': 'anchor-media'
}
});
}
_notifyCallStatusChange({callStatus, sipStatus, duration}) {
assert((typeof duration === 'number' && callStatus === CallStatus.Completed) ||
(!duration && callStatus !== CallStatus.Completed),
@@ -335,9 +366,9 @@ class SingleDialer extends Emitter {
}
}
function placeOutdial({logger, srf, ms, sbcAddress, target, opts, application, callInfo}) {
function placeOutdial({logger, srf, ms, sbcAddress, target, opts, application, callInfo, accountInfo}) {
const myOpts = deepcopy(opts);
const sd = new SingleDialer({logger, sbcAddress, target, myOpts, application, callInfo});
const sd = new SingleDialer({logger, sbcAddress, target, myOpts, application, callInfo, accountInfo});
sd.exec(srf, ms, myOpts);
return sd;
}

4809
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -37,7 +37,7 @@
"debug": "^4.3.1",
"deepcopy": "^2.1.0",
"drachtio-fsmrf": "^2.0.7",
"drachtio-srf": "^4.4.50",
"drachtio-srf": "^4.4.55",
"express": "^4.17.1",
"ip": "^1.1.5",
"moment": "^2.29.1",