major revamp of http client functionalit

This commit is contained in:
Dave Horton
2020-02-14 12:45:28 -05:00
parent ff531e6964
commit 446000ee97
35 changed files with 906 additions and 433 deletions

View File

@@ -3,9 +3,37 @@ const makeTask = require('./make_task');
const {CallStatus, CallDirection, TaskName, TaskPreconditions, MAX_SIMRINGS} = require('../utils/constants');
const assert = require('assert');
const placeCall = require('../utils/place-outdial');
const sessionTracker = require('../session/session-tracker');
const DtmfCollector = require('../utils/dtmf-collector');
const config = require('config');
const debug = require('debug')('jambonz:feature-server');
function parseDtmfOptions(logger, dtmfCapture) {
let parentDtmfCollector, childDtmfCollector;
const parentKeys = [], childKeys = [];
if (Array.isArray(dtmfCapture)) {
Array.prototype.push.apply(parentKeys, dtmfCapture);
Array.prototype.push.apply(childKeys, dtmfCapture);
}
else if (dtmfCapture.childCall || dtmfCapture.parentCall) {
if (dtmfCapture.childCall && Array.isArray(dtmfCapture.childCall)) {
Array.prototype.push.apply(childKeys, dtmfCapture.childCall);
}
if (dtmfCapture.parentCall && Array.isArray(dtmfCapture.parentCall)) {
Array.prototype.push.apply(childKeys, dtmfCapture.parentCall);
}
}
if (childKeys.length) {
childDtmfCollector = new DtmfCollector({logger, patterns: childKeys});
}
if (parentKeys.length) {
parentDtmfCollector = new DtmfCollector({logger, patterns: parentKeys});
}
return {childDtmfCollector, parentDtmfCollector};
}
function compareTasks(t1, t2) {
if (t1.type !== t2.type) return false;
switch (t1.type) {
@@ -44,6 +72,7 @@ class TaskDial extends Task {
super(logger, opts);
this.preconditions = TaskPreconditions.None;
this.actionHook = this.data.actionHook;
this.earlyMedia = this.data.answerOnBridge === true;
this.callerId = this.data.callerId;
this.dialMusic = this.data.dialMusic;
@@ -52,8 +81,19 @@ class TaskDial extends Task {
this.target = filterAndLimit(this.logger, this.data.target);
this.timeout = this.data.timeout || 60;
this.timeLimit = this.data.timeLimit;
this.confirmUrl = this.data.confirmUrl;
this.confirmHook = this.data.confirmHook;
this.confirmMethod = this.data.confirmMethod;
this.dtmfHook = this.data.dtmfHook;
if (this.dtmfHook) {
const {parentDtmfCollector, childDtmfCollector} = parseDtmfOptions(logger, this.data.dtmfCapture || {});
if (parentDtmfCollector) {
this.parentDtmfCollector = parentDtmfCollector;
}
if (childDtmfCollector) {
this.childDtmfCollector = childDtmfCollector;
}
}
if (this.data.listen) {
this.listenTask = makeTask(logger, {'listen': this.data.listen}, this);
@@ -83,9 +123,15 @@ class TaskDial extends Task {
if (cs.direction === CallDirection.Inbound) {
await this._initializeInbound(cs);
}
else {
this.epOther = cs.ep;
}
this._installDtmfDetection(cs, this.epOther, this.parentDtmfCollector);
await this._attemptCalls(cs);
await this.awaitTaskDone();
await this.performAction(this.method, null, this.results);
await cs.requestor.request(this.actionHook, Object.assign({}, cs.callInfo, this.results));
this._removeDtmfDetection(cs, this.epOther);
this._removeDtmfDetection(cs, this.ep);
} catch (err) {
this.logger.error(`TaskDial:exec terminating with error ${err.message}`);
this.kill();
@@ -94,17 +140,57 @@ class TaskDial extends Task {
async kill() {
super.kill();
this._removeDtmfDetection(this.cs, this.epOther);
this._removeDtmfDetection(this.cs, this.ep);
this._killOutdials();
if (this.sd) {
this.sd.kill();
this.sd = null;
}
sessionTracker.remove(this.callSid);
if (this.listenTask) await this.listenTask.kill();
if (this.transcribeTask) await this.transcribeTask.kill();
if (this.timerMaxCallDuration) clearTimeout(this.timerMaxCallDuration);
this.notifyTaskDone();
}
/**
* whisper a prompt to one side of the call
* @param {*} tasks - array of play/say tasks to execute
*/
async whisper(tasks, callSid) {
if (!this.epOther || !this.ep) return this.logger.info('Dial:whisper: no paired endpoint found');
try {
const cs = this.callSession;
this.logger.debug('Dial:whisper unbridging endpoints');
await this.epOther.unbridge();
this.logger.debug('Dial:whisper executing tasks');
while (tasks.length && !cs.callGone) {
const task = tasks.shift();
await task.exec(cs, callSid === this.callSid ? this.ep : this.epOther);
}
this.logger.debug('Dial:whisper tasks complete');
if (!cs.callGone) this.epOther.bridge(this.ep);
} catch (err) {
this.logger.error(err, 'Dial:whisper error');
}
}
/**
* mute or unmute one side of the call
*/
async mute(callSid, doMute) {
if (!this.epOther || !this.ep) return this.logger.info('Dial:mute: no paired endpoint found');
try {
const parentCall = callSid !== this.callSid;
const ep = parentCall ? this.epOther : this.ep;
await ep[doMute ? 'mute' : 'unmute']();
this.logger.debug(`Dial:mute ${doMute ? 'muted' : 'unmuted'} ${parentCall ? 'parentCall' : 'childCall'}`);
} catch (err) {
this.logger.error(err, 'Dial:mute error');
}
}
_killOutdials() {
for (const [callSid, sd] of Array.from(this.dials)) {
this.logger.debug(`Dial:_killOutdials killing callSid ${callSid}`);
@@ -113,8 +199,33 @@ class TaskDial extends Task {
this.dials.clear();
}
_installDtmfDetection(cs, ep, dtmfDetector) {
if (ep && this.dtmfHook && !ep.dtmfDetector) {
ep.dtmfDetector = dtmfDetector;
ep.on('dtmf', this._onDtmf.bind(this, cs, ep));
}
}
_removeDtmfDetection(cs, ep) {
if (ep) {
delete ep.dtmfDetector;
ep.removeListener('dtmf', this._onDtmf.bind(this, cs, ep));
}
}
_onDtmf(cs, ep, evt) {
const match = ep.dtmfDetector.keyPress(evt.dtmf);
const requestor = ep.dtmfDetector === this.parentDtmfCollector ?
cs.requestor :
this.sd.requestor;
if (match) {
this.logger.debug(`parentCall triggered dtmf match: ${match}`);
requestor.request(this.dtmfHook, Object.assign({dtmf: match}, cs.callInfo))
.catch((err) => this.logger.info(err, 'Dial:_onDtmf - error'));
}
}
async _initializeInbound(cs) {
const {ep} = await cs.connectInboundCallToIvr(this.earlyMedia);
const ep = await cs._evalEndpointPrecondition(this);
this.epOther = ep;
debug(`Dial:__initializeInbound allocated ep for incoming call: ${ep.uuid}`);
@@ -256,8 +367,10 @@ class TaskDial extends Task {
this.kill();
}, this.timeLimit * 1000);
}
sessionTracker.add(this.callSid, cs);
this.dlg.on('destroy', () => {
this.logger.debug('Dial:_selectSingleDial called party hungup, ending dial operation');
sessionTracker.remove(this.callSid);
if (this.timerMaxCallDuration) clearTimeout(this.timerMaxCallDuration);
this.ep.unbridge();
this.kill();
@@ -268,6 +381,8 @@ class TaskDial extends Task {
dialCallSid: sd.callSid,
});
if (this.childDtmfCollector) this._installDtmfDetection(cs, this.ep, this.childDtmfCollector);
if (this.transcribeTask) this.transcribeTask.exec(cs, this.ep);
if (this.listenTask) this.listenTask.exec(cs, this.ep);
}

View File

@@ -9,13 +9,11 @@ class TaskGather extends Task {
this.preconditions = TaskPreconditions.Endpoint;
[
'action', 'finishOnKey', 'hints', 'input', 'method', 'numDigits',
'partialResultCallback', 'partialResultCallbackMethod', 'profanityFilter',
'finishOnKey', 'hints', 'input', 'numDigits',
'partialResultHook', 'profanityFilter',
'speechTimeout', 'timeout', 'say', 'play'
].forEach((k) => this[k] = this.data[k]);
this.partialResultCallbackMethod = this.partialResultCallbackMethod || 'POST';
this.method = this.method || 'POST';
this.timeout = (this.timeout || 5) * 1000;
this.interim = this.partialResultCallback;
if (this.data.recognizer) {
@@ -23,7 +21,6 @@ class TaskGather extends Task {
this.vendor = this.data.recognizer.vendor;
}
this.digitBuffer = '';
this._earlyMedia = this.data.earlyMedia === true;
@@ -142,7 +139,10 @@ class TaskGather extends Task {
_onTranscription(ep, evt) {
this.logger.debug(evt, 'TaskGather:_onTranscription');
if (evt.is_final) this._resolve('speech', evt);
else if (this.partialResultCallback) this.notifyHook(this.partialResultCallback, 'POST', null, {speech: evt});
else if (this.partialResultHook) {
this.cs.requestor.request(this.partialResultHook, Object.assign({speech: evt}, this.cs.callInfo))
.catch((err) => this.logger.info(err, 'GatherTask:_onTranscription error'));
}
}
_onEndOfUtterance(ep, evt) {
this.logger.info(evt, 'TaskGather:_onEndOfUtterance');
@@ -154,10 +154,10 @@ class TaskGather extends Task {
this._clearTimer();
if (reason.startsWith('dtmf')) {
await this.performAction(this.method, null, {digits: this.digitBuffer});
await this.performAction({digits: this.digitBuffer});
}
else if (reason.startsWith('speech')) {
await this.performAction(this.method, null, {speech: evt});
await this.performAction({speech: evt});
}
this.notifyTaskDone();
}

View File

@@ -20,7 +20,6 @@ class TaskListen extends Task {
this.nested = parentTask instanceof Task;
this.results = {};
this.ranToCompletion = false;
if (this.transcribe) this.transcribeTask = makeTask(logger, {'transcribe': opts.transcribe}, this);
@@ -42,8 +41,7 @@ class TaskListen extends Task {
}
await this._startListening(cs, ep);
await this.awaitTaskDone();
const acceptNewApp = !this.nested && this.ranToCompletion;
if (this.action) await this.performAction(this.method, this.auth, this.results, acceptNewApp);
await this.performAction(this.results, !this.nested);
} catch (err) {
this.logger.info(err, `TaskListen:exec - error ${this.url}`);
}
@@ -68,18 +66,15 @@ class TaskListen extends Task {
this.notifyTaskDone();
}
updateListen(status) {
async updateListen(status) {
if (!this.killed && this.ep && this.ep.connected) {
this.logger.info(`TaskListen:updateListen status ${status}`);
switch (status) {
case ListenStatus.Pause:
this.ep.forkAudioPause().catch((err) => this.logger.info(err, 'TaskListen: error pausing audio'));
break;
case ListenStatus.Silence:
this.ep.forkAudioPause().catch((err) => this.logger.info(err, 'TaskListen: error pausing audio'));
await this.ep.forkAudioPause().catch((err) => this.logger.info(err, 'TaskListen: error pausing audio'));
break;
case ListenStatus.Resume:
this.ep.forkAudioResume().catch((err) => this.logger.info(err, 'TaskListen: error resuming audio'));
await this.ep.forkAudioResume().catch((err) => this.logger.info(err, 'TaskListen: error resuming audio'));
break;
}
}
@@ -114,7 +109,6 @@ class TaskListen extends Task {
if (this.maxLength) {
this._timer = setTimeout(() => {
this.logger.debug(`TaskListen terminating task due to timeout of ${this.timeout}s reached`);
this.ranToCompletion = true;
this.kill();
}, this.maxLength * 1000);
}
@@ -142,7 +136,6 @@ class TaskListen extends Task {
if (evt.dtmf === this.finishOnKey) {
this.logger.info(`TaskListen:_onDtmf terminating task due to dtmf ${evt.dtmf}`);
this.results.digits = evt.dtmf;
this.ranToCompletion = true;
this.kill();
}
}

View File

@@ -7,17 +7,13 @@ const {TaskName} = require('../utils/constants');
class TaskRedirect extends Task {
constructor(logger, opts) {
super(logger, opts);
this.action = this.data.action;
this.method = (this.data.method || 'POST').toUpperCase();
this.auth = this.data.auth;
}
get name() { return TaskName.Redirect; }
async exec(cs) {
super.exec(cs);
await this.performAction(this.method, this.auth);
await this.performAction();
}
}

View File

@@ -44,9 +44,7 @@ class TaskRestDial extends Task {
this.req = null;
const cs = this.callSession;
cs.setDialog(dlg);
const obj = Object.assign({}, cs.callInfo);
const tasks = await this.actionHook(this.call_hook, obj);
const tasks = await cs.requestor.request(this.call_hook, cs.callInfo);
if (tasks && Array.isArray(tasks)) {
this.logger.debug({tasks: tasks}, `TaskRestDial: replacing application with ${tasks.length} tasks`);
cs.replaceApplication(tasks);

View File

@@ -39,11 +39,11 @@
},
"gather": {
"properties": {
"action": "string",
"actionHook": "object|string",
"finishOnKey": "string",
"input": "array",
"numDigits": "number",
"partialResultCallback": "string",
"partialResultHook": "object|string",
"speechTimeout": "number",
"timeout": "number",
"recognizer": "#recognizer",
@@ -51,26 +51,20 @@
"say": "#say"
},
"required": [
"action"
"actionHook"
]
},
"dial": {
"properties": {
"action": "string",
"actionHook": "object|string",
"answerOnBridge": "boolean",
"callerId": "string",
"confirmUrl": "string",
"confirmMethod": {
"type": "string",
"enum": ["GET", "POST"]
},
"confirmHook": "object|string",
"dialMusic": "string",
"dtmfCapture": "object",
"dtmfHook": "object|string",
"headers": "object",
"listen": "#listen",
"method": {
"type": "string",
"enum": ["GET", "POST"]
},
"target": ["#target"],
"timeLimit": "number",
"timeout": "number",
@@ -82,15 +76,11 @@
},
"listen": {
"properties": {
"action": "string",
"actionHook": "object|string",
"auth": "#auth",
"finishOnKey": "string",
"maxLength": "number",
"metadata": "object",
"method": {
"type": "string",
"enum": ["GET", "POST"]
},
"mixType": {
"type": "string",
"enum": ["mono", "stereo", "mixed"]
@@ -118,21 +108,17 @@
},
"redirect": {
"properties": {
"action": "string",
"method": {
"type": "string",
"enum": ["GET", "POST"]
},
"auth": "#auth"
"actionHook": "object|string"
},
"required": [
"action"
"actionHook"
]
},
"rest:dial": {
"properties": {
"call_hook": "object",
"call_hook": "object|string",
"from": "string",
"tag": "object",
"to": "#target",
"timeout": "number"
},
@@ -152,12 +138,12 @@
},
"transcribe": {
"properties": {
"transcriptionCallback": "string",
"transcriptionHook": "string",
"recognizer": "#recognizer",
"earlyMedia": "boolean"
},
"required": [
"transcriptionCallback"
"transcriptionHook"
]
},
"target": {

View File

@@ -2,25 +2,39 @@ const Emitter = require('events');
const debug = require('debug')('jambonz:feature-server');
const assert = require('assert');
const {TaskPreconditions} = require('../utils/constants');
const normalizeJamones = require('../utils/normalize-jamones');
const makeTask = require('./make_task');
const specs = new Map();
const _specData = require('./specs');
for (const key in _specData) {specs.set(key, _specData[key]);}
/**
* @classdesc Represents a jambonz verb. This is a superclass that is extended
* by a subclass for each verb.
* @extends Emitter
*/
class Task extends Emitter {
constructor(logger, data) {
super();
this.preconditions = TaskPreconditions.None;
this.logger = logger;
this.data = data;
this.actionHook = this.data.actionHook;
this._killInProgress = false;
this._completionPromise = new Promise((resolve) => this._completionResolver = resolve);
}
/**
* @property {boolean} killed - true if the task has been killed
*/
get killed() {
return this._killInProgress;
}
/**
* @property {CallSession} callSession - the CallSession this task is executing within
*/
get callSession() {
return this.cs;
}
@@ -29,13 +43,13 @@ class Task extends Emitter {
return this.data;
}
/**
* Execute the task. Subclasses must implement this method, but should always call
* the superclass implementation first.
* @param {CallSession} cs - the CallSession that the Task will be executing within.
*/
async exec(cs) {
this.cs = cs;
// N.B. need to require it down here rather than at top to avoid recursion in require of this module
const {actionHook, notifyHook} = require('../utils/notifiers')(this.logger, cs.callInfo);
this.actionHook = actionHook;
this.notifyHook = notifyHook;
}
/**
@@ -48,29 +62,47 @@ class Task extends Emitter {
// no-op
}
/**
* when a subclass Task has completed its work, it should call this method
*/
notifyTaskDone() {
this._completionResolver();
}
/**
* when a subclass task has launched various async activities and is now simply waiting
* for them to complete it should call this method to block until that happens
*/
awaitTaskDone() {
return this._completionPromise;
}
/**
* provided as a convenience for tasks, this simply calls CallSession#normalizeUrl
*/
normalizeUrl(url, method, auth) {
return this.callSession.normalizeUrl(url, method, auth);
}
async performAction(method, auth, results, expectResponse = true) {
if (this.action) {
const hook = this.normalizeUrl(this.action, method, auth);
const tasks = await this.actionHook(hook, results, expectResponse);
if (expectResponse && tasks && Array.isArray(tasks)) {
this.logger.debug({tasks: tasks}, `${this.name} replacing application with ${tasks.length} tasks`);
this.callSession.replaceApplication(tasks);
async performAction(results, expectResponse = true) {
if (this.actionHook) {
const params = results ? Object.assign(results, this.cs.callInfo) : this.cs.callInfo;
const json = await this.cs.requestor.request(this.actionHook, params);
if (expectResponse && json && Array.isArray(json)) {
const tasks = normalizeJamones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
if (tasks && tasks.length > 0) {
this.logger.info({tasks: tasks}, `${this.name} replacing application with ${tasks.length} tasks`);
this.callSession.replaceApplication(tasks);
}
}
}
}
/**
* validate that the JSON task description is valid
* @param {string} name - verb name
* @param {object} data - verb properties
*/
static validate(name, data) {
debug(`validating ${name} with data ${JSON.stringify(data)}`);
// validate the instruction is supported
@@ -94,6 +126,12 @@ class Task extends Emitter {
else if (typeof dSpec === 'string' && dSpec === 'array') {
if (!Array.isArray(dVal)) throw new Error(`${name}: property ${dKey} is not an array`);
}
else if (typeof dSpec === 'string' && dSpec.includes('|')) {
const types = dSpec.split('|').map((t) => t.trim());
if (!types.includes(typeof dVal)) {
throw new Error(`${name}: property ${dKey} has invalid data type, must be one of ${types}`);
}
}
else if (Array.isArray(dSpec) && dSpec[0].startsWith('#')) {
const name = dSpec[0].slice(1);
for (const item of dVal) {

View File

@@ -6,7 +6,7 @@ class TaskTranscribe extends Task {
super(logger, opts);
this.preconditions = TaskPreconditions.Endpoint;
this.transcriptionCallback = this.data.transcriptionCallback;
this.transcriptionHook = this.data.transcriptionHook;
this.earlyMedia = this.data.earlyMedia === true || (parentTask && parentTask.earlyMedia);
if (this.data.recognizer) {
this.language = this.data.recognizer.language || 'en-US';
@@ -78,7 +78,8 @@ class TaskTranscribe extends Task {
_onTranscription(ep, evt) {
this.logger.debug(evt, 'TaskTranscribe:_onTranscription');
this.notifyHook(this.transcriptionCallback, 'POST', null, {speech: evt});
this.cs.requestor.request(this.transcriptionHook, Object.assign({speech: evt}, this.cs.callInfo))
.catch((err) => this.logger.info(err, 'TranscribeTask:_onTranscription error'));
if (this.killed) {
this.logger.debug('TaskTranscribe:_onTranscription exiting after receiving final transcription');
this._clearTimer();