Compare commits

..

9 Commits

Author SHA1 Message Date
Dave Horton
de9f2ce5ca bugfix: handle error if we cannot get our own ipv4 2022-04-21 19:09:23 -04:00
Dave Horton
36c97e9562 simplify error message 2022-04-21 14:43:09 -04:00
Dave Horton
13ea559cb1 send error notification over websocket if tts fails 2022-04-21 14:33:49 -04:00
Dave Horton
698d12a95f clean up error handling in say verb 2022-04-21 10:27:33 -04:00
Dave Horton
359cb82d80 per recommendation from microsoft, do NOT sort transcripts by confidence: first transcript in the returned list is 'best' 2022-04-17 17:53:16 -04:00
Dave Horton
29dec24095 bugfix: azure stt - if we get no speech detected, listen again 2022-04-13 12:07:30 -04:00
Dave Horton
6330b0d443 Dockerfile update 2022-04-12 16:12:29 -04:00
Dave Horton
24a0bc547f gather: dont restart transcribing if task has been killed 2022-04-11 21:13:49 -04:00
Dave Horton
db5486de27 gather bugfix: dont start transcribing after call is gone 2022-04-10 15:48:35 -04:00
9 changed files with 86 additions and 45 deletions

View File

@@ -1,10 +1,10 @@
FROM node:17.7.1-slim
FROM node:slim
WORKDIR /opt/app/
COPY package.json ./
RUN npm install
COPY package.json package-lock.json ./
RUN npm ci
RUN npm prune
COPY . /opt/app
ARG NODE_ENV
ENV NODE_ENV $NODE_ENV
CMD [ "npm", "start" ]
CMD [ "npm", "start" ]

View File

@@ -331,7 +331,7 @@ class CallSession extends Emitter {
speech_credential_sid: credential.speech_credential_sid,
accessKeyId: credential.access_key_id,
secretAccessKey: credential.secret_access_key,
region: process.env.AWS_REGION || credential.aws_region
region: credential.aws_region || process.env.AWS_REGION
};
}
else if ('microsoft' === vendor) {

View File

@@ -362,7 +362,9 @@ class TaskGather extends Task {
if ('microsoft' === this.vendor) {
const final = evt.RecognitionStatus === 'Success';
if (final) {
const nbest = evt.NBest.sort((a, b) => b.Confidence - a.Confidence);
// don't sort based on confidence: https://github.com/Azure-Samples/cognitive-services-speech-sdk/issues/1463
//const nbest = evt.NBest.sort((a, b) => b.Confidence - a.Confidence);
const nbest = evt.NBest;
evt = {
is_final: true,
alternatives: [
@@ -385,7 +387,7 @@ class TaskGather extends Task {
}
}
if (evt.is_final) {
if (evt.alternatives[0].transcript === '') {
if (evt.alternatives[0].transcript === '' && !this.callSession.callGone && !this.killed) {
this.logger.info({evt}, 'TaskGather:_onTranscription - got empty transcript, listen again');
return this._startTranscribing(ep);
}
@@ -433,7 +435,10 @@ class TaskGather extends Task {
}
_onNoSpeechDetected(cs, ep) {
this._resolve('timeout');
if (!this.callSession.callGone && !this.killed) {
this.logger.debug('TaskGather:_onNoSpeechDetected - listen again');
return this._startTranscribing(ep);
}
}
async _resolve(reason, evt) {

View File

@@ -51,55 +51,64 @@ class TaskSay extends Task {
alert_type: AlertType.TTS_NOT_PROVISIONED,
vendor
}).catch((err) => this.logger.info({err}, 'Error generating alert for no tts'));
this.notifyError(`No speech credentials have been provisioned for ${vendor}`);
throw new Error('no provisioned speech credentials for TTS');
}
// synthesize all of the text elements
let lastUpdated = false;
/* otel: trace time for tts */
const {span} = this.startChildSpan('tts-generation', {
'tts.vendor': vendor,
'tts.language': language,
'tts.voice': voice
});
this.ttsSpan = span;
const filepath = (await Promise.all(this.text.map(async(text) => {
/* produce an audio segment from the provided text */
const generateAudio = async(text) => {
if (this.killed) return;
if (text.startsWith('silence_stream://')) return text;
const {filePath, servedFromCache} = await synthAudio(stats, {
text,
vendor,
language,
voice,
engine,
salt,
credentials
}).catch((err) => {
this.logger.info(err, 'Error synthesizing tts');
/* otel: trace time for tts */
const {span} = this.startChildSpan('tts-generation', {
'tts.vendor': vendor,
'tts.language': language,
'tts.voice': voice
});
try {
const {filePath, servedFromCache} = await synthAudio(stats, {
text,
vendor,
language,
voice,
engine,
salt,
credentials
});
this.logger.debug(`file ${filePath}, served from cache ${servedFromCache}`);
if (filePath) cs.trackTmpFile(filePath);
if (!servedFromCache && !lastUpdated) {
lastUpdated = true;
updateSpeechCredentialLastUsed(credentials.speech_credential_sid)
.catch(() => {/*already logged error */});
}
span.setAttributes({'tts.cached': servedFromCache});
span.end();
return filePath;
} catch (err) {
this.logger.info({err}, 'Error synthesizing tts');
span.end();
writeAlerts({
account_sid: cs.accountSid,
alert_type: AlertType.TTS_NOT_PROVISIONED,
vendor,
detail: err.message
});
}).catch((err) => this.logger.info({err}, 'Error generating alert for tts failure'));
this.logger.debug(`file ${filePath}, served from cache ${servedFromCache}`);
if (filePath) cs.trackTmpFile(filePath);
if (!servedFromCache && !lastUpdated) {
lastUpdated = true;
updateSpeechCredentialLastUsed(credentials.speech_credential_sid)
.catch(() => {/*already logged error */});
}).catch((err) => this.logger.info({err}, 'Error generating alert for tts failure'));
this.notifyError(err.message || err);
return;
}
this.ttsSpan.setAttributes({'tts.cached': servedFromCache});
return filePath;
}))).filter((fp) => fp && fp.length);
this.ttsSpan?.end();
};
const arr = this.text.map((t) => generateAudio(t));
const filepath = (await Promise.all(arr)).filter((fp) => fp && fp.length);
this.logger.debug({filepath}, 'synthesized files for tts');
while (!this.killed && (this.loop === 'forever' || this.loop--) && this.ep?.connected) {
let segment = 0;
do {
while (!this.killed && segment < filepath.length) {
if (cs.isInConference) {
const {memberId, confName, confUuid} = cs;
await this.playToConfMember(this.ep, memberId, confName, confUuid, filepath[segment]);
@@ -109,10 +118,10 @@ class TaskSay extends Task {
await ep.play(filepath[segment]);
this.logger.debug(`Say:exec completed play file ${filepath[segment]}`);
}
} while (!this.killed && ++segment < filepath.length);
segment++;
}
}
} catch (err) {
this.ttsSpan?.end();
this.logger.info(err, 'TaskSay:exec error');
}
this.emit('playDone');

View File

@@ -137,6 +137,12 @@ class Task extends Emitter {
return this.callSession.normalizeUrl(url, method, auth);
}
notifyError(errMsg) {
const params = {error: errMsg, verb: this.name};
this.cs.requestor.request('jambonz:error', '/error', params)
.catch((err) => this.logger.info({err}, 'Task:notifyError error sending error'));
}
async performAction(results, expectResponse = true) {
if (this.actionHook) {
const params = results ? Object.assign(results, this.cs.callInfo.toJSON()) : this.cs.callInfo.toJSON();

View File

@@ -253,6 +253,11 @@ class TaskTranscribe extends Task {
evt = newEvent;
}
if (evt.alternatives[0].transcript === '' && !cs.callGone && !this.killed) {
this.logger.info({evt}, 'TaskGather:_onTranscription - got empty transcript, listen again');
return this._transcribe(ep);
}
if (this.transcriptionHook) {
const b3 = this.getTracingPropagation();
const httpHeaders = b3 && {b3};

View File

@@ -50,7 +50,11 @@ class HttpRequestor extends BaseRequestor {
* @param {object} [params] - request parameters
*/
async request(type, hook, params, httpHeaders = {}) {
/* jambonz:error only sent over ws */
if (type === 'jambonz:error') return;
assert(HookMsgTypes.includes(type));
const payload = params ? snakeCaseKeys(params, ['customerData', 'sip']) : null;
const url = hook.url || hook;
const method = hook.method || 'POST';

View File

@@ -1,6 +1,5 @@
const Mrf = require('drachtio-fsmrf');
const ip = require('ip');
const localIp = ip.address();
const PORT = process.env.HTTP_PORT || 3000;
const assert = require('assert');
@@ -167,6 +166,13 @@ function installSrfLocals(srf, logger) {
commitInterval: 'test' === process.env.NODE_ENV ? 7 : 20
});
let localIp;
try {
localIp = ip.address();
} catch (err) {
logger.error({err}, 'installSrfLocals - error detecting local ipv4 address');
}
srf.locals = {...srf.locals,
dbHelpers: {
client,
@@ -201,8 +207,6 @@ function installSrfLocals(srf, logger) {
getListPosition
},
parentLogger: logger,
ipv4: localIp,
serviceUrl: `http://${localIp}:${PORT}`,
getSBC,
getSmpp: () => {
return process.env.SMPP_URL;
@@ -213,6 +217,11 @@ function installSrfLocals(srf, logger) {
writeAlerts,
AlertType
};
if (localIp) {
srf.locals.ipv4 = localIp;
srf.locals.serviceUrl = `http://${localIp}:${PORT}`;
}
}
module.exports = installSrfLocals;

View File

@@ -105,6 +105,7 @@ class WsRequestor extends BaseRequestor {
/* save the message info for reply */
const startAt = process.hrtime();
this.messagesInFlight.set(msgid, {
timer,
success: (response) => {
clearTimeout(timer);
const rtt = this._roundTrip(startAt);
@@ -135,6 +136,8 @@ class WsRequestor extends BaseRequestor {
}
for (const [msgid, obj] of this.messagesInFlight) {
const {timer} = obj;
clearTimeout(timer);
obj.failure(`abandoning msgid ${msgid} since we have closed the socket`);
}
this.messagesInFlight.clear();