wip: implemented listen, transcribe, play

This commit is contained in:
Dave Horton
2020-01-17 09:15:23 -05:00
parent 1a656f3f0e
commit 0d4c1d9d8c
24 changed files with 688 additions and 108 deletions

View File

@@ -28,10 +28,10 @@ class TaskDial extends Task {
this.timeLimit = opts.timeLimit;
if (opts.listen) {
this.listenTask = makeTask(logger, {'listen': opts.transcribe});
this.listenTask = makeTask(logger, {'listen': opts.listen});
}
if (opts.transcribe) {
this.transcribeTask = makeTask(logger, {'transcribe': opts.transcribe});
this.transcribeTask = makeTask(logger, {'transcribe' : opts.transcribe});
}
this.canceled = false;

View File

@@ -1,5 +1,5 @@
const Task = require('./task');
const {TaskName, TaskPreconditions} = require('../utils/constants');
const {TaskName, TaskPreconditions, TranscriptionEvents} = require('../utils/constants');
const makeTask = require('./make_task');
const assert = require('assert');
@@ -19,6 +19,7 @@ class TaskGather extends Task {
this.timeout = (this.timeout || 5) * 1000;
this.language = this.language || 'en-US';
this.digitBuffer = '';
//this._earlyMedia = this.data.earlyMedia === true;
if (this.say) {
this.sayTask = makeTask(this.logger, {say: this.say});
@@ -27,6 +28,11 @@ class TaskGather extends Task {
get name() { return TaskName.Gather; }
get earlyMedia() {
return (this.sayTask && this.sayTask.earlyMedia) ||
(this.playTask && this.playTask.earlyMedia);
}
async exec(cs, ep) {
this.ep = ep;
this.actionHook = cs.actionHook;
@@ -35,33 +41,15 @@ class TaskGather extends Task {
try {
if (this.sayTask) {
this.sayTask.exec(cs, ep); // kicked off, _not_ waiting for it to complete
this.sayTask.on('playDone', this._onPlayDone.bind(this, ep));
this.sayTask.on('playDone', (err) => {
if (this.taskInProgress) this._startTimer();
});
}
else this._startTimer();
if (this.input.includes('speech')) {
const opts = {
GOOGLE_SPEECH_USE_ENHANCED: true,
GOOGLE_SPEECH_SINGLE_UTTERANCE: true,
GOOGLE_SPEECH_MODEL: 'phone_call'
};
if (this.hints) {
Object.assign(opts, {'GOOGLE_SPEECH_HINTS': this.hints.join(',')});
}
if (this.profanityFilter === true) {
Object.assign(opts, {'GOOGLE_SPEECH_PROFANITY_FILTER': true});
}
this.logger.debug(`setting freeswitch vars ${JSON.stringify(opts)}`);
await ep.set(opts)
.catch((err) => this.logger.info(err, 'Error set'));
ep.addCustomEventListener('google_transcribe::transcription', this._onTranscription.bind(this, ep));
ep.addCustomEventListener('google_transcribe::no_audio_detected', this._onNoAudioDetected.bind(this, ep));
ep.addCustomEventListener('google_transcribe::max_duration_exceeded', this._onMaxDuration.bind(this, ep));
this.logger.debug('starting transcription');
ep.startTranscription({
interim: this.partialResultCallback ? true : false,
language: this.language
}).catch((err) => this.logger.error(err, 'TaskGather:exec error starting transcription'));
await this._initSpeech(ep);
this._startTranscribing(ep);
}
if (this.input.includes('dtmf')) {
@@ -73,10 +61,12 @@ class TaskGather extends Task {
this.logger.error(err, 'TaskGather:exec error');
}
this.taskInProgress = false;
ep.removeAllListeners();
ep.removeCustomEventListener(TranscriptionEvents.Transcription);
ep.removeCustomEventListener(TranscriptionEvents.EndOfUtterance);
}
kill() {
super.kill();
this._killAudio();
this._resolve('killed');
}
@@ -85,12 +75,6 @@ class TaskGather extends Task {
return new Promise((resolve) => this.resolver = resolve);
}
_onPlayDone(ep, err, evt) {
if (err || !this.taskInProgress) return;
this.logger.debug(evt, 'TaskGather:_onPlayDone, starting input timer');
this._startTimer();
}
_onDtmf(ep, evt) {
this.logger.debug(evt, 'TaskGather:_onDtmf');
if (evt.dtmf === this.finishOnKey) this._resolve('dtmf-terminator-key');
@@ -101,6 +85,32 @@ class TaskGather extends Task {
this._killAudio();
}
async _initSpeech(ep) {
const opts = {
GOOGLE_SPEECH_USE_ENHANCED: true,
GOOGLE_SPEECH_SINGLE_UTTERANCE: true,
GOOGLE_SPEECH_MODEL: 'phone_call'
};
if (this.hints) {
Object.assign(opts, {'GOOGLE_SPEECH_HINTS': this.hints.join(',')});
}
if (this.profanityFilter === true) {
Object.assign(opts, {'GOOGLE_SPEECH_PROFANITY_FILTER': true});
}
this.logger.debug(`setting freeswitch vars ${JSON.stringify(opts)}`);
await ep.set(opts)
.catch((err) => this.logger.info(err, 'Error set'));
ep.addCustomEventListener(TranscriptionEvents.Transcription, this._onTranscription.bind(this, ep));
ep.addCustomEventListener(TranscriptionEvents.EndOfUtterance, this._onEndOfUtterance.bind(this, ep));
}
_startTranscribing(ep) {
ep.startTranscription({
interim: this.partialResultCallback ? true : false,
language: this.language
}).catch((err) => this.logger.error(err, 'TaskGather:_startTranscribing error'));
}
_startTimer() {
assert(!this._timeoutTimer);
this._timeoutTimer = setTimeout(() => this._resolve('timeout'), this.timeout);
@@ -123,6 +133,8 @@ class TaskGather extends Task {
_onTranscription(ep, evt) {
this.logger.debug(evt, 'TaskGather:_onTranscription');
if (evt.is_final) {
ep.removeCustomEventListener(TranscriptionEvents.Transcription);
ep.removeCustomEventListener(TranscriptionEvents.EndOfUtterance);
this._resolve('speech', evt);
}
else if (this.partialResultCallback) {
@@ -131,11 +143,9 @@ class TaskGather extends Task {
});
}
}
_onNoAudioDetected(ep, evt) {
this.logger.info(evt, 'TaskGather:_onNoAudioDetected');
}
_onMaxDuration(ep, evt) {
this.logger.info(evt, 'TaskGather:_onMaxDuration');
_onEndOfUtterance(ep, evt) {
this.logger.info(evt, 'TaskGather:_onEndOfUtterance');
this._startTranscribing(ep);
}
_resolve(reason, evt) {

View File

@@ -1,10 +1,12 @@
const Task = require('./task');
const {TaskName} = require('../utils/constants');
const {TaskName, TaskPreconditions} = require('../utils/constants');
class TaskHangup extends Task {
constructor(logger, opts) {
super(logger, opts);
this.headers = this.data.headers || {};
this.preconditions = TaskPreconditions.StableCall;
}
get name() { return TaskName.Hangup; }
@@ -16,7 +18,7 @@ class TaskHangup extends Task {
try {
await dlg.destroy({headers: this.headers});
} catch (err) {
this.logger.error(err, `TaskHangup:exec - Error hanging up call with sip call id ${dlg.sip.callId}`);
this.logger.error(err, 'TaskHangup:exec - Error hanging up call');
}
}
}

131
lib/tasks/listen.js Normal file
View File

@@ -0,0 +1,131 @@
const Task = require('./task');
const {TaskName, TaskPreconditions, ListenEvents} = require('../utils/constants');
const makeTask = require('./make_task');
const assert = require('assert');
class TaskListen extends Task {
constructor(logger, opts) {
super(logger, opts);
this.preconditions = TaskPreconditions.Endpoint;
[
'url', 'finishOnKey', 'maxLength', 'metadata', 'mixType', 'passDtmf', 'playBeep',
'sampleRate', 'timeout', 'transcribe'
].forEach((k) => this[k] = this.data[k]);
this.mixType = this.mixType || 'mono';
this.sampleRate = this.sampleRate || 8000;
this.earlyMedia = this.data.earlyMedia === true;
if (this.transcribe) {
this.transcribeTask = makeTask(logger, {'transcribe': opts.transcribe});
}
this._dtmfHandler = this._onDtmf.bind(this);
this._completionPromise = new Promise((resolve) => this._completionResolver = resolve);
}
get name() { return TaskName.Listen; }
async exec(cs, ep) {
this.ep = ep;
try {
if (this.playBeep) await this._playBeep(ep);
if (this.transcribeTask) {
this.logger.debug('TaskListen:exec - starting nested transcribe task');
this.transcribeTask.exec(cs, ep, this); // kicked off, _not_ waiting for it to complete
}
await this._startListening(ep);
await this._completionPromise;
} catch (err) {
this.logger.info(err, `TaskListen:exec - error ${this.url}`);
}
if (this.transcribeTask) this.transcribeTask.kill();
this._removeListeners(ep);
this.listenComplete = true;
this.emit('listenDone');
}
async kill() {
super.kill();
this._clearTimer();
if (this.ep.connected && !this.listenComplete) {
this.listenComplete = true;
if (this.transcribeTask) {
await this.transcribeTask.kill();
this.transcribeTask = null;
}
await this.ep.forkAudioStop()
.catch((err) => this.logger.info(err, 'TaskListen:kill'));
}
this._completionResolver();
}
async _playBeep(ep) {
await ep.play('tone_stream://L=1;%(500, 0, 1500)')
.catch((err) => this.logger.info(err, 'TaskListen:_playBeep Error playing beep'));
}
async _startListening(ep) {
this._initListeners(ep);
await ep.forkAudioStart({
wsUrl: this.url,
mixType: this.mixType,
sampling: this.sampleRate,
metadata: this.metadata
});
if (this.timeout) {
this._timer = setTimeout(() => {
this.logger.debug(`TaskListen:_startListening terminating task due to timeout of ${this.timeout} reached`);
this.kill();
}, this.timeout * 1000);
}
}
_initListeners(ep) {
ep.addCustomEventListener(ListenEvents.Connect, this._onConnect.bind(this, ep));
ep.addCustomEventListener(ListenEvents.ConnectFailure, this._onConnectFailure.bind(this, ep));
ep.addCustomEventListener(ListenEvents.Error, this._onError.bind(this, ep));
if (this.finishOnKey || this.passDtmf) {
ep.on('dtmf', this._dtmfHandler);
}
}
_removeListeners(ep) {
ep.removeCustomEventListener(ListenEvents.Connect);
ep.removeCustomEventListener(ListenEvents.ConnectFailure);
ep.removeCustomEventListener(ListenEvents.Error);
if (this.finishOnKey || this.passDtmf) {
ep.removeListener('dtmf', this._dtmfHandler);
}
}
_onDtmf(evt) {
if (evt.dtmf === this.finishOnKey) {
this.logger.info(`TaskListen:_onDtmf terminating task due to dtmf ${evt.dtmf}`);
this.kill();
}
}
_clearTimer() {
if (this._timer) {
clearTimeout(this._timer);
this._timer = null;
}
}
_onConnect(ep) {
this.logger.debug('TaskListen:_onConnect');
}
_onConnectFailure(ep, evt) {
this.logger.info(evt, 'TaskListen:_onConnectFailure');
this._completionResolver();
}
_onError(ep, evt) {
this.logger.info(evt, 'TaskListen:_onError');
this._completionResolver();
}
}
module.exports = TaskListen;

View File

@@ -2,13 +2,15 @@ const Task = require('./task');
const {TaskName} = require('../utils/constants');
const errBadInstruction = new Error('invalid instruction payload');
function makeTask(logger, opts) {
logger.debug(opts, 'makeTask');
if (typeof opts !== 'object' || Array.isArray(opts)) throw errBadInstruction;
const keys = Object.keys(opts);
if (keys.length !== 1) throw errBadInstruction;
function makeTask(logger, obj) {
const keys = Object.keys(obj);
if (!keys || keys.length !== 1) {
throw errBadInstruction;
}
const name = keys[0];
const data = opts[name];
const data = obj[name];
logger.debug(data, `makeTask: ${name}`);
if (typeof data !== 'object') throw errBadInstruction;
Task.validate(name, data);
switch (name) {
case TaskName.SipDecline:
@@ -26,6 +28,12 @@ function makeTask(logger, opts) {
case TaskName.Gather:
const TaskGather = require('./gather');
return new TaskGather(logger, data);
case TaskName.Transcribe:
const TaskTranscribe = require('./transcribe');
return new TaskTranscribe(logger, data);
case TaskName.Listen:
const TaskListen = require('./listen');
return new TaskListen(logger, data);
}
// should never reach

40
lib/tasks/play.js Normal file
View File

@@ -0,0 +1,40 @@
const Task = require('./task');
const {TaskName, TaskPreconditions} = require('../utils/constants');
class TaskPlay extends Task {
constructor(logger, opts) {
super(logger, opts);
this.preconditions = TaskPreconditions.Endpoint;
this.url = this.data.url;
this.loop = this.data.loop || 1;
this.earlyMedia = this.data.earlyMedia === true;
this.playComplete = false;
}
get name() { return TaskName.Play; }
async exec(cs, ep) {
this.ep = ep;
try {
while (!this.playComplete && this.loop--) {
await ep.play(this.url);
}
} catch (err) {
this.logger.info(err, `TaskPlay:exec - error playing ${this.url}`);
}
this.playComplete = true;
this.emit('playDone');
}
kill() {
super.kill();
if (this.ep.connected && !this.playComplete) {
this.logger.debug('TaskPlay:kill - killing audio');
this.playComplete = true;
this.ep.api('uuid_break', this.ep.uuid).catch((err) => this.logger.info(err, 'Error killing audio'));
}
}
}
module.exports = TaskPlay;

View File

@@ -31,7 +31,7 @@ class TaskSay extends Task {
text: this.text
});
} catch (err) {
if (err.message !== 'hangup') this.logger.info(err, 'TaskSay:exec error');
this.logger.info(err, 'TaskSay:exec error');
}
this.emit('playDone');
this.sayComplete = true;

View File

@@ -16,11 +16,22 @@
"required": [
]
},
"play": {
"properties": {
"url": "string",
"loop": "number",
"earlyMedia": "boolean"
},
"required": [
"url"
]
},
"say": {
"properties": {
"text": "string",
"loop": "number",
"synthesizer": "#synthesizer"
"synthesizer": "#synthesizer",
"earlyMedia": "boolean"
},
"required": [
"text",
@@ -31,14 +42,12 @@
"properties": {
"action": "string",
"finishOnKey": "string",
"hints": "array",
"input": "array",
"language": "string",
"numDigits": "number",
"partialResultCallback": "string",
"profanityFilter": "boolean",
"speechTimeout": "number",
"timeout": "number",
"recognizer": "#recognizer",
"say": "#say"
},
"required": [
@@ -73,37 +82,34 @@
},
"listen": {
"properties": {
"finishOnKey": "string",
"maxLength": "number",
"metadata": "object",
"mixType": {
"type": "string",
"enum": ["mono", "stereo", "mixed"]
},
"passDtmf": "boolean",
"playBeep": "boolean",
"sampleRate": "number",
"source": {
"type": "string",
"enum": ["parent", "child", "both"]
},
"wsUrl": "string"
"timeout": "number",
"transcribe": "#transcribe",
"url": "string",
"earlyMedia": "boolean"
},
"required": [
"wsUrl",
"sampleRate"
"url"
]
},
"transcribe": {
"properties": {
"action": "string",
"interim": "boolean",
"jsonKey": "string",
"language": "string",
"source": "string",
"vendor": "string"
"recognizer": "#recognizer",
"earlyMedia": "boolean"
},
"required": [
"action",
"jsonKey",
"language"
"recognizer"
]
},
"target": {
@@ -143,6 +149,28 @@
"enum": ["google"]
},
"voice": "string"
}
},
"required": [
"vendor"
]
},
"recognizer": {
"properties": {
"vendor": {
"type": "string",
"enum": ["google"]
},
"language": "string",
"hints": "array",
"profanityFilter": "boolean",
"interim": "boolean",
"mixType": {
"type": "string",
"enum": ["mono", "stereo", "mixed"]
}
},
"required": [
"vendor"
]
}
}

View File

@@ -13,6 +13,12 @@ class Task extends Emitter {
this.preconditions = TaskPreconditions.None;
this.logger = logger;
this.data = data;
this._killInProgress = false;
}
get killed() {
return this._killInProgress;
}
/**
@@ -20,6 +26,8 @@ class Task extends Emitter {
* what to do is up to each type of task
*/
kill() {
this.logger.debug(`${this.name} is being killed`);
this._killInProgress = true;
// no-op
}

106
lib/tasks/transcribe.js Normal file
View File

@@ -0,0 +1,106 @@
const Task = require('./task');
const {TaskName, TaskPreconditions, TranscriptionEvents} = require('../utils/constants');
const assert = require('assert');
class TaskTranscribe extends Task {
constructor(logger, opts) {
super(logger, opts);
this.preconditions = TaskPreconditions.Endpoint;
this.action = this.data.action;
this.language = this.data.language || 'en-US';
this.vendor = this.data.vendor;
this.interim = this.data.interim === true;
this.mixType = this.data.mixType;
this.earlyMedia = this.data.earlyMedia === true;
this._completionPromise = new Promise((resolve) => this._completionResolver = resolve);
}
get name() { return TaskName.Transcribe; }
async exec(cs, ep, parentTask) {
this.ep = ep;
this.actionHook = ep.cs.actionHook;
this.transcribeInProgress = true;
try {
await this._initSpeech(ep);
await this._startTranscribing(ep);
await this._completionPromise;
} catch (err) {
this.logger.info(err, 'TaskTranscribe:exec - error');
}
this.transcribeInProgress = true;
ep.removeCustomEventListener(TranscriptionEvents.Transcription);
}
async kill() {
super.kill();
if (this.ep.connected && this.transcribeInProgress) {
this.ep.stopTranscription().catch((err) => this.logger.info(err, 'Error TaskTranscribe:kill'));
// hangup after 1 sec if we don't get a final transcription
this._timer = setTimeout(() => this._completionResolver(), 1000);
}
else {
this._completionResolver();
}
await this._completionPromise;
}
async _initSpeech(ep) {
const opts = {
GOOGLE_SPEECH_USE_ENHANCED: true,
GOOGLE_SPEECH_MODEL: 'phone_call'
};
if (this.hints) {
Object.assign(opts, {'GOOGLE_SPEECH_HINTS': this.hints.join(',')});
}
if (this.profanityFilter === true) {
Object.assign(opts, {'GOOGLE_SPEECH_PROFANITY_FILTER': true});
}
await ep.set(opts)
.catch((err) => this.logger.info(err, 'TaskTranscribe:_initSpeech error setting fs vars'));
ep.addCustomEventListener(TranscriptionEvents.Transcription, this._onTranscription.bind(this, ep));
ep.addCustomEventListener(TranscriptionEvents.NoAudioDetected, this._onNoAudio.bind(this, ep));
ep.addCustomEventListener(TranscriptionEvents.MaxDurationExceeded, this._onMaxDurationExceeded.bind(this, ep));
}
async _startTranscribing(ep) {
await ep.startTranscription({
interim: this.interim ? true : false,
language: this.language
});
}
_onTranscription(ep, evt) {
this.logger.debug(evt, 'TaskTranscribe:_onTranscription');
this.actionHook(this.action, 'POST', {
Speech: evt
});
if (this.killed) {
this.logger.debug('TaskTranscribe:_onTranscription exiting after receiving final transcription');
this._clearTimer();
this._completionResolver();
}
}
_onNoAudio(ep) {
this.logger.debug('TaskTranscribe:_onNoAudio restarting transcription');
this._startTranscribing(ep);
}
_onMaxDurationExceeded(ep) {
this.logger.debug('TaskTranscribe:_onMaxDurationExceeded restarting transcription');
this._startTranscribing(ep);
}
_clearTimer() {
if (this._timer) {
clearTimeout(this._timer);
this._timer = null;
}
}
}
module.exports = TaskTranscribe;