Feature/siprec server (#143)

* fixes from testing

* modify Task#exec to take resources as an object rather than argument list

* pass 2 endpoints to Transcribe when invoked in a SipRec call session

* logging

* change siprec invite to sendrecv just so freeswitch does not try to reinvite (TODO: block outgoing media at rtpengine)

* Config: when enabling recording, block until siprec dialog is established

* missed play verb in commit 031c79d

* linting

* bugfix: get final transcript in siprec call
This commit is contained in:
Dave Horton
2022-08-09 15:23:55 +02:00
committed by GitHub
parent f068aa5390
commit 3298918322
24 changed files with 87 additions and 65 deletions

3
app.js
View File

@@ -77,9 +77,10 @@ srf.use('invite', [
invokeWebCallback
]);
srf.invite((req, res) => {
srf.invite(async(req, res) => {
const isSipRec = !!req.locals.siprec;
const session = isSipRec ? new SipRecCallSession(req, res) : new InboundCallSession(req, res);
if (isSipRec) await session.answerSipRecCall();
session.exec();
});

View File

@@ -1,5 +1,5 @@
const { v4: uuidv4 } = require('uuid');
const {CallDirection, TaskName} = require('./utils/constants');
const {CallDirection, AllowedSipRecVerbs} = require('./utils/constants');
const {parseSiprecPayload} = require('./utils/siprec-utils');
const CallInfo = require('./session/call-info');
const HttpRequestor = require('./utils/http-requestor');
@@ -296,10 +296,9 @@ module.exports = function(srf, logger) {
if (0 === app.tasks.length) throw new Error('no application provided');
if (siprec) {
/* only transcribe and/or listen allowed on an incoming siprec call */
const tasks = app.tasks.filter((t) => [TaskName.Config, TaskName.Listen, TaskName.Transcribe].includes(t.name));
const tasks = app.tasks.filter((t) => AllowedSipRecVerbs.includes(t.name));
if (0 === tasks.length) {
logger.info({tasks: app.tasks}, 'only config, transcribe and/or listen allowed on an incoming siprec call');
logger.info({tasks: app.tasks}, 'no valid verbs in app found for an incoming siprec call');
throw new Error('invalid verbs for incoming siprec call');
}
if (tasks.length < app.tasks.length) {

View File

@@ -6,7 +6,8 @@ const {
CallStatus,
TaskName,
KillReason,
RecordState
RecordState,
AllowedSipRecVerbs
} = require('../utils/constants');
const moment = require('moment');
const assert = require('assert');
@@ -917,9 +918,7 @@ class CallSession extends Emitter {
}
if (this.isSipRecCallSession) {
const pruned = tasks.filter((t) =>
[TaskName.Config, TaskName.Listen, TaskName.Transcribe].includes(t.name)
);
const pruned = tasks.filter((t) => AllowedSipRecVerbs.includes(t.name));
if (0 === pruned.length) {
this.logger.info({tasks},
'CallSession:replaceApplication - only config, transcribe and/or listen allowed on an incoming siprec call');
@@ -1102,11 +1101,20 @@ class CallSession extends Emitter {
if (this.callGone) new Error(`${BADPRECONDITIONS}: call gone`);
if (this.ep) {
if (task.earlyMedia === true || this.dlg) return this.ep;
const resources = {ep: this.ep};
if (task.earlyMedia === true || this.dlg) {
return {
...resources,
...(this.isSipRecCallSession && {ep2: this.ep2})
};
}
// we are going from an early media connection to answer
await this.propagateAnswer();
return this.ep;
return {
...resources,
...(this.isSipRecCallSession && {ep2: this.ep2})
};
}
// need to allocate an endpoint
@@ -1129,7 +1137,7 @@ class CallSession extends Emitter {
if (this.direction === CallDirection.Inbound) {
if (task.earlyMedia && !this.req.finalResponseSent) {
this.res.send(183, {body: ep.local.sdp});
return ep;
return {ep};
}
this.logger.debug('propogating answer');
await this.propagateAnswer();
@@ -1138,7 +1146,7 @@ class CallSession extends Emitter {
// outbound call TODO
}
return ep;
return {ep};
} catch (err) {
if (err === CALLER_CANCELLED_ERR_MSG) {
this.logger.error(err, 'caller canceled quickly before we could respond, ending call');
@@ -1163,7 +1171,7 @@ class CallSession extends Emitter {
_evalStableCallPrecondition(task) {
if (this.callGone) throw new Error(`${BADPRECONDITIONS}: call gone`);
if (!this.dlg) throw new Error(`${BADPRECONDITIONS}: call was not answered`);
return this.dlg;
return {dlg: this.dlg};
}
/**
@@ -1202,7 +1210,7 @@ class CallSession extends Emitter {
* Hang up the call and free the media endpoint
*/
_clearResources() {
for (const resource of [this.dlg, this.ep]) {
for (const resource of [this.dlg, this.ep, this.ep2]) {
if (resource && resource.connected) resource.destroy();
}
this.dlg = null;

View File

@@ -1,5 +1,6 @@
const InboundCallSession = require('./inbound-call-session');
const {createSipRecPayload} = require('../utils/siprec-utils');
const {CallStatus} = require('../utils/constants');
/**
* @classdesc Subclass of InboundCallSession. This represents a CallSession that is
* established for an inbound SIPREC call.
@@ -13,44 +14,45 @@ class SipRecCallSession extends InboundCallSession {
this.sdp1 = sdp1;
this.sdp2 = sdp2;
this.metadata = metadata;
setImmediate(this._answerSipRecCall.bind(this));
}
async _answerSipRecCall() {
async answerSipRecCall() {
try {
this.ms = this.getMS();
this.ep = await this.ms.createEndpoint({remoteSdp: this.sdp1});
this.ep2 = await this.ms.createEndpoint({remoteSdp: this.sdp2});
let remoteSdp = this.sdp1.replace(/sendonly/, 'sendrecv');
this.ep = await this.ms.createEndpoint({remoteSdp});
//this.logger.debug({remoteSdp, localSdp: this.ep.local.sdp}, 'SipRecCallSession - allocated first endpoint');
remoteSdp = this.sdp2.replace(/sendonly/, 'sendrecv');
this.ep2 = await this.ms.createEndpoint({remoteSdp});
//this.logger.debug({remoteSdp, localSdp: this.ep2.local.sdp}, 'SipRecCallSession - allocated second endpoint');
await this.ep.bridge(this.ep2);
const combinedSdp = await createSipRecPayload(this.ep.local.sdp, this.ep2.local.sdp, this.logger);
/*
this.logger.debug({
sdp1: this.sdp1,
sdp2: this.sdp2,
combinedSdp
}, 'SipRecCallSession:_answerSipRecCall - created SIPREC payload');
*/
this.dlg = await this.srf.createUAS(this.req, this.res, {
headers: {
'Content-Type': 'application/sdp',
'X-Trace-ID': this.req.locals.traceId,
'X-Call-Sid': this.req.locals.callSid
},
localSdp: combinedSdp
});
this.dlg.on('destroy', this._callerHungup.bind(this));
this.wrapDialog(this.dlg);
this.dlg.callSid = this.callSid;
this.emit('callStatusChange', {sipStatus: 200, sipReason: 'OK', callStatus: CallStatus.InProgress});
this.dlg.on('modify', this._onReinvite.bind(this));
this.dlg.on('refer', this._onRefer.bind(this));
} catch (err) {
this.logger.error({err}, 'SipRecCallSession:_answerSipRecCall error:');
if (this.res && !this.res.finalResponseSent) this.res.send(500);
this._callReleased();
}
}
_callReleased() {
/* release that second endpoint we created, then call superclass implementation */
if (this.ep2?.connected) {
this.ep2.destroy();
this.ep2 = null;
}
super._callReleased();
}
}
module.exports = SipRecCallSession;

View File

@@ -72,7 +72,7 @@ class Conference extends Task {
get shouldRecord() { return this.record.path; }
get isRecording() { return this.recordingInProgress; }
async exec(cs, ep) {
async exec(cs, {ep}) {
await super.exec(cs);
this.ep = ep;
const dlg = cs.dlg;

View File

@@ -4,8 +4,6 @@ const {TaskName, TaskPreconditions} = require('../utils/constants');
class TaskConfig extends Task {
constructor(logger, opts) {
super(logger, opts);
this.preconditions = TaskPreconditions.Endpoint;
[
'synthesizer',
'recognizer',
@@ -28,7 +26,9 @@ class TaskConfig extends Task {
});
}
if (this.bargeIn.sticky) this.autoEnable = true;
this.preconditions = this.bargeIn.enable ? TaskPreconditions.Endpoint : TaskPreconditions.None;
this.preconditions = (this.bargeIn.enable || this.record?.action) ?
TaskPreconditions.Endpoint :
TaskPreconditions.None;
}
get name() { return TaskName.Config; }
@@ -108,7 +108,13 @@ class TaskConfig extends Task {
cs.disableBotMode();
}
}
if (this.record.action) cs.notifyRecordOptions(this.record);
if (this.record.action) {
try {
await cs.notifyRecordOptions(this.record);
} catch (err) {
this.logger.info({err}, 'Config: error starting recording');
}
}
}
async kill(cs) {

View File

@@ -23,7 +23,7 @@ class TaskDequeue extends Task {
get name() { return TaskName.Dequeue; }
async exec(cs, ep) {
async exec(cs, {ep}) {
await super.exec(cs);
this.ep = ep;
this.queueName = `queue:${cs.accountSid}:${this.queueName}`;

View File

@@ -248,7 +248,7 @@ class TaskDial extends Task {
const {span, ctx} = this.startChildSpan(`whisper:${task.summary}`);
task.span = span;
task.ctx = ctx;
await task.exec(cs, callSid === this.callSid ? this.ep : this.epOther);
await task.exec(cs, callSid === this.callSid ? {ep: this.ep} : {ep: this.epOther});
span.end();
}
this.logger.debug('Dial:whisper tasks complete');
@@ -631,8 +631,8 @@ class TaskDial extends Task {
if (this.parentDtmfCollector) this._installDtmfDetection(cs, cs.dlg);
if (this.childDtmfCollector) this._installDtmfDetection(cs, this.dlg);
if (this.transcribeTask) this.transcribeTask.exec(cs, this.epOther, this.ep);
if (this.listenTask) this.listenTask.exec(cs, this.epOther);
if (this.transcribeTask) this.transcribeTask.exec(cs, {ep2: this.epOther, ep:this.ep});
if (this.listenTask) this.listenTask.exec(cs, {ep: this.epOther});
if (this.startAmd) {
try {
this.startAmd(cs, this.ep, this, this.data.amd);

View File

@@ -64,7 +64,7 @@ class Dialogflow extends Task {
get name() { return TaskName.Dialogflow; }
async exec(cs, ep) {
async exec(cs, {ep}) {
await super.exec(cs);
try {

View File

@@ -12,7 +12,7 @@ class TaskDtmf extends Task {
get name() { return TaskName.Dtmf; }
async exec(cs, ep) {
async exec(cs, {ep}) {
await super.exec(cs);
this.ep = ep;
try {

View File

@@ -37,7 +37,7 @@ class TaskEnqueue extends Task {
get name() { return TaskName.Enqueue; }
async exec(cs, ep) {
async exec(cs, {ep}) {
await super.exec(cs);
const dlg = cs.dlg;
this.queueName = `queue:${cs.accountSid}:${this.queueName}`;

View File

@@ -116,7 +116,7 @@ class TaskGather extends Task {
return s;
}
async exec(cs, ep) {
async exec(cs, {ep}) {
this.logger.debug('Gather:exec');
await super.exec(cs);
const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, cs.srf);
@@ -164,7 +164,7 @@ class TaskGather extends Task {
const {span, ctx} = this.startChildSpan(`nested:${this.sayTask.summary}`);
this.sayTask.span = span;
this.sayTask.ctx = ctx;
this.sayTask.exec(cs, ep); // kicked off, _not_ waiting for it to complete
this.sayTask.exec(cs, {ep}); // kicked off, _not_ waiting for it to complete
this.sayTask.on('playDone', (err) => {
span.end();
if (err) this.logger.error({err}, 'Gather:exec Error playing tts');
@@ -176,7 +176,7 @@ class TaskGather extends Task {
const {span, ctx} = this.startChildSpan(`nested:${this.playTask.summary}`);
this.playTask.span = span;
this.playTask.ctx = ctx;
this.playTask.exec(cs, ep); // kicked off, _not_ waiting for it to complete
this.playTask.exec(cs, {ep}); // kicked off, _not_ waiting for it to complete
this.playTask.on('playDone', (err) => {
span.end();
if (err) this.logger.error({err}, 'Gather:exec Error playing url');

View File

@@ -14,7 +14,7 @@ class TaskHangup extends Task {
/**
* Hangup the call
*/
async exec(cs, dlg) {
async exec(cs, {dlg}) {
await super.exec(cs);
try {
await dlg.destroy({headers: this.headers});

View File

@@ -8,7 +8,7 @@ class TaskLeave extends Task {
get name() { return TaskName.Leave; }
async exec(cs, ep) {
async exec(cs, {ep}) {
await super.exec(cs);
await this.awaitTaskDone();
}

View File

@@ -44,7 +44,7 @@ class Lex extends Task {
get name() { return TaskName.Lex; }
async exec(cs, ep) {
async exec(cs, {ep}) {
await super.exec(cs);
try {

View File

@@ -26,7 +26,7 @@ class TaskListen extends Task {
get name() { return TaskName.Listen; }
async exec(cs, ep) {
async exec(cs, {ep}) {
await super.exec(cs);
this.ep = ep;
this._dtmfHandler = this._onDtmf.bind(this, ep);
@@ -40,7 +40,7 @@ class TaskListen extends Task {
const {span, ctx} = this.startChildSpan(`nested:${this.transcribeTask.summary}`);
this.transcribeTask.span = span;
this.transcribeTask.ctx = ctx;
this.transcribeTask.exec(cs, ep)
this.transcribeTask.exec(cs, {ep})
.then((result) => span.end())
.catch((err) => span.end());
}
@@ -219,7 +219,7 @@ class TaskListen extends Task {
this.logger.debug('Listen:whisper tasks starting');
while (tasks.length && !cs.callGone) {
const task = tasks.shift();
await task.exec(cs, this.ep);
await task.exec(cs, {ep: this.ep});
}
this.logger.debug('Listen:whisper tasks complete');
} catch (err) {

View File

@@ -10,7 +10,7 @@ class TaskPause extends Task {
get name() { return TaskName.Pause; }
async exec(cs, ep) {
async exec(cs) {
await super.exec(cs);
this.timer = setTimeout(this.notifyTaskDone.bind(this), this.length * 1000);
await this.awaitTaskDone();

View File

@@ -17,7 +17,7 @@ class TaskPlay extends Task {
return `${this.name}:{url=${this.url}}`;
}
async exec(cs, ep) {
async exec(cs, {ep}) {
await super.exec(cs);
this.ep = ep;
try {

View File

@@ -20,7 +20,7 @@ class Rasa extends Task {
return this.reportedFinalAction || this.isReplacingApplication;
}
async exec(cs, ep) {
async exec(cs, {ep}) {
await super.exec(cs);
this.ep = ep;
@@ -34,7 +34,7 @@ class Rasa extends Task {
const {span, ctx} = this.startChildSpan(`nested:${this.gatherTask.summary}`);
this.gatherTask.span = span;
this.gatherTask.ctx = ctx;
this.gatherTask.exec(cs, ep, this)
this.gatherTask.exec(cs, {ep})
.then(() => span.end())
.catch((err) => {
span.end();
@@ -128,7 +128,7 @@ class Rasa extends Task {
const {span, ctx} = this.startChildSpan(`nested:${this.gatherTask.summary}`);
this.gatherTask.span = span;
this.gatherTask.ctx = ctx;
this.gatherTask.exec(cs, ep, this)
this.gatherTask.exec(cs, {ep})
.then(() => span.end())
.catch((err) => {
span.end();

View File

@@ -23,7 +23,7 @@ class TaskSayLegacy extends Task {
get name() { return TaskName.SayLegacy; }
async exec(cs, ep) {
async exec(cs, {ep}) {
super.exec(cs);
this.ep = ep;
try {

View File

@@ -22,7 +22,7 @@ class TaskSay extends Task {
return `${this.name}{${this.text[0]}}`;
}
async exec(cs, ep) {
async exec(cs, {ep}) {
await super.exec(cs);
const {srf} = cs;

View File

@@ -17,7 +17,7 @@ class TaskSipRequest extends Task {
get name() { return TaskName.SipRequest; }
async exec(cs, dlg) {
async exec(cs, {dlg}) {
super.exec(cs);
try {
this.logger.info({dlg}, `TaskSipRequest: sending a SIP ${this.method}`);

View File

@@ -58,7 +58,7 @@ class TaskTranscribe extends Task {
get name() { return TaskName.Transcribe; }
async exec(cs, ep, ep2) {
async exec(cs, {ep, ep2}) {
super.exec(cs);
const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, cs.srf);
@@ -80,7 +80,9 @@ class TaskTranscribe extends Task {
throw new Error('no provisioned speech credentials for TTS');
}
await this._startTranscribing(cs, ep, 1);
if (this.separateRecognitionPerChannel && ep2) await this._startTranscribing(cs, ep2, 2);
if (this.separateRecognitionPerChannel && ep2) {
await this._startTranscribing(cs, ep2, 2);
}
updateSpeechCredentialLastUsed(this.sttCredentials.speech_credential_sid)
.catch(() => {/*already logged error */});
@@ -102,18 +104,21 @@ class TaskTranscribe extends Task {
async kill(cs) {
super.kill(cs);
let stopTranscription = false;
if (this.ep?.connected) {
stopTranscription = true;
this.ep.stopTranscription({vendor: this.vendor})
.catch((err) => this.logger.info(err, 'Error TaskTranscribe:kill'));
// hangup after 1 sec if we don't get a final transcription
this._timer = setTimeout(() => this.notifyTaskDone(), 1000);
}
if (this.separateRecognitionPerChannel && this.ep2 && this.ep2.connected) {
stopTranscription = true;
this.ep2.stopTranscription({vendor: this.vendor})
.catch((err) => this.logger.info(err, 'Error TaskTranscribe:kill'));
}
// hangup after 1 sec if we don't get a final transcription
if (stopTranscription) this._timer = setTimeout(() => this.notifyTaskDone(), 1500);
else this.notifyTaskDone();
await this.awaitTaskDone();
}

View File

@@ -29,6 +29,7 @@
"Tag": "tag",
"Transcribe": "transcribe"
},
"AllowedSipRecVerbs": ["config", "gather", "transcribe", "listen"],
"CallStatus": {
"Trying": "trying",
"Ringing": "ringing",