diff --git a/lib/session/call-session.js b/lib/session/call-session.js
index a14a10df..ee9a83a8 100644
--- a/lib/session/call-session.js
+++ b/lib/session/call-session.js
@@ -951,9 +951,11 @@ class CallSession extends Emitter {
   }

   stopTtsStream() {
-    this.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_closed'})
-      .catch((err) => this.logger.info({err}, 'CallSession:clearTtsStream - Error sending user_interruption'));
-    this.ttsStreamingBuffer?.stop();
+    if (this.appIsUsingWebsockets) {
+      this.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_closed'})
+        .catch((err) => this.logger.info({err}, 'CallSession:clearTtsStream - Error sending user_interruption'));
+      this.ttsStreamingBuffer?.stop();
+    }
   }

   async enableBotMode(gather, autoEnable) {
@@ -979,7 +981,7 @@ class CallSession extends Emitter {
       task.sticky = autoEnable;
       // listen to the bargein-done from background manager
       this.backgroundTaskManager.on('bargeIn-done', () => {
-        if (this.requestor instanceof WsRequestor) {
+        if (this.appIsUsingWebsockets) {
          try {
            this.kill(true);
          } catch (err) {}
@@ -1194,7 +1196,8 @@ class CallSession extends Emitter {
        speech_credential_sid: credential.speech_credential_sid,
        client_id: credential.client_id,
        client_key: credential.client_key,
-        user_id: credential.user_id
+        user_id: credential.user_id,
+        houndify_server_uri: credential.houndify_server_uri
      };
    }
    else if ('deepgramflux' === vendor) {
@@ -1338,7 +1341,7 @@ class CallSession extends Emitter {
    }

    if (0 === this.tasks.length &&
-      this.requestor instanceof WsRequestor &&
+      this.appIsUsingWebsockets &&
      !this.requestor.closedGracefully &&
      !this.callGone &&
      !this.isConfirmCallSession
@@ -3024,14 +3027,14 @@ Duration=${duration} `

  */
  _notifyTaskError(obj) {
-    if (this.requestor instanceof WsRequestor) {
+    if (this.appIsUsingWebsockets) {
      this.requestor.request('jambonz:error', '/error', obj)
        .catch((err) => this.logger.debug({err}, 'CallSession:_notifyTaskError - Error sending'));
    }
  }

  _notifyTaskStatus(task, evt) {
-    if (this.notifyEvents && this.requestor instanceof WsRequestor) {
+    if (this.notifyEvents && this.appIsUsingWebsockets) {
      const obj = {...evt, id: task.id, name: task.name};
      this.requestor.request('verb:status', '/status', obj)
        .catch((err) => this.logger.debug({err}, 'CallSession:_notifyTaskStatus - Error sending'));
@@ -3083,7 +3086,7 @@ Duration=${duration} `
  }

  _clearTasks(backgroundGather, evt) {
-    if (this.requestor instanceof WsRequestor && !backgroundGather.cleared) {
+    if (this.appIsUsingWebsockets && !backgroundGather.cleared) {
      this.logger.debug({evt}, 'CallSession:_clearTasks on event from background gather');
      try {
        backgroundGather.cleared = true;
diff --git a/lib/tasks/dial.js b/lib/tasks/dial.js
index 1537cae7..f14b6259 100644
--- a/lib/tasks/dial.js
+++ b/lib/tasks/dial.js
@@ -21,7 +21,7 @@ const {parseUri} = require('drachtio-srf');
 const {ANCHOR_MEDIA_ALWAYS, JAMBONZ_DIAL_PAI_HEADER, JAMBONES_DIAL_SBC_FOR_REGISTERED_USER} = require('../config');
-const { isOnhold, isOpusFirst } = require('../utils/sdp-utils');
+const { isOnhold, isOpusFirst, getLeadingCodec } = require('../utils/sdp-utils');
 const { normalizeJambones } = require('@jambonz/verb-specifications');
 const { selectHostPort } = require('../utils/network');
 const { sleepFor } = require('../utils/helpers');
@@ -158,6 +158,7 @@ class TaskDial extends Task {

  get canReleaseMedia() {
    const keepAnchor = this.data.anchorMedia ||
+      this.weAreTranscoding ||
      this.cs.isBackGroundListen ||
      this.cs.onHoldMusic
      || ANCHOR_MEDIA_ALWAYS ||
@@ -929,7 +930,13 @@ class TaskDial extends Task {
        this.logger.info({err}, 'Dial:_selectSingleDial - Error boosting audio signal');
      }
    }
-
+    /* basic determination of whether the call is being transcoded */
+    const codecA = getLeadingCodec(this.epOther.local.sdp);
+    const codecB = getLeadingCodec(this.ep.remote.sdp);
+    this.weAreTranscoding = (codecA !== codecB);
+    if (this.weAreTranscoding) {
+      this.logger.info(`Dial:_selectSingleDial - transcoding from ${codecA} (A leg) to ${codecB} (B leg)`);
+    }
    /* if we can release the media back to the SBC, do so now */
    if (this.canReleaseMedia || this.shouldExitMediaPathEntirely) {
      setTimeout(this._releaseMedia.bind(this, cs, sd, this.shouldExitMediaPathEntirely), 200);
diff --git a/lib/tasks/gather.js b/lib/tasks/gather.js
index af453c12..441e78e0 100644
--- a/lib/tasks/gather.js
+++ b/lib/tasks/gather.js
@@ -259,7 +259,7 @@ class TaskGather extends SttTask {
            startDtmfListener();
          }
          this._stopVad();
-          if (!this.killed) {
+          if (!this.killed && !this.resolved) {
            startListening(cs, ep);
            if (this.input.includes('speech') && this.vendor === 'nuance' && this.listenDuringPrompt) {
              this.logger.debug('Gather:exec - starting transcription timers after say completes');
@@ -297,7 +297,7 @@ class TaskGather extends SttTask {
            startDtmfListener();
          }
          this._stopVad();
-          if (!this.killed) {
+          if (!this.killed && !this.resolved) {
            startListening(cs, ep);
            if (this.input.includes('speech') && this.vendor === 'nuance' && this.listenDuringPrompt) {
              this.logger.debug('Gather:exec - starting transcription timers after play completes');
@@ -1173,7 +1173,7 @@ class TaskGather extends SttTask {
  }

  async _startFallback(cs, ep, evt) {
-    if (this.canFallback) {
+    if (this.canFallback()) {
      this._stopTranscribing(ep);
      try {
        this.logger.debug('gather:_startFallback');
diff --git a/lib/tasks/say.js b/lib/tasks/say.js
index 44709172..e29f7a5d 100644
--- a/lib/tasks/say.js
+++ b/lib/tasks/say.js
@@ -2,8 +2,9 @@ const assert = require('assert');
 const TtsTask = require('./tts-task');
 const {TaskName, TaskPreconditions} = require('../utils/constants');
 const pollySSMLSplit = require('polly-ssml-split');
-const { SpeechCredentialError } = require('../utils/error');
+const { SpeechCredentialError, NonFatalTaskError } = require('../utils/error');
 const { sleepFor } = require('../utils/helpers');
+const { NON_FATAL_ERRORS } = require('../utils/constants.json');

 /**
  * Discard unmatching responses:
@@ -402,11 +403,19 @@ class TaskSay extends TtsTask {
        this._playResolve = resolve;
        this._playReject = reject;
      });
-      const r = await ep.play(filename);
-      this.logger.debug({r}, 'Say:exec play result');
-      if (r.playbackSeconds == null && r.playbackMilliseconds == null && r.playbackLastOffsetPos == null) {
-        this._playReject(new Error('Playback failed to start'));
+      try {
+        const r = await ep.play(filename);
+        this.logger.debug({r}, 'Say:exec play result');
+        if (r.playbackSeconds == null && r.playbackMilliseconds == null && r.playbackLastOffsetPos == null) {
+          this._playReject(new Error('Playback failed to start'));
+        }
+      } catch (err) {
+        if (NON_FATAL_ERRORS.includes(err.message)) {
+          throw new NonFatalTaskError(err.message);
+        }
+        throw err;
      }
+
      try {
        // wait for playback-stop event received to confirm if the playback is successful
        await this._playPromise;
diff --git a/lib/tasks/stt-task.js b/lib/tasks/stt-task.js
index 79201185..06cd4992 100644
--- a/lib/tasks/stt-task.js
+++ b/lib/tasks/stt-task.js
@@ -171,7 +171,7 @@ class SttTask extends Task {
    try {
      this.sttCredentials =
        await this._initSpeechCredentials(this.cs, this.vendor, this.label);
    } catch (error) {
-      if (this.canFallback) {
+      if (this.canFallback()) {
        this.notifyError(
          { msg: 'ASR error', details:`Invalid vendor ${this.vendor}, Error: ${error}`,
@@ -260,8 +260,19 @@ class SttTask extends Task {
    ep.addCustomEventListener(event, handler);
  }

-  removeCustomEventListeners() {
-    this.eventHandlers.forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler));
+  removeCustomEventListeners(ep) {
+    if (ep) {
+      // for a specific endpoint
+      this.eventHandlers.filter((h) => h.ep === ep).forEach((h) => {
+        h.ep.removeCustomEventListener(h.event, h.handler);
+      });
+      this.eventHandlers = this.eventHandlers.filter((h) => h.ep !== ep);
+      return;
+    } else {
+      // for all endpoints
+      this.eventHandlers.forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler));
+      this.eventHandlers = [];
+    }
  }

  async _initSpeechCredentials(cs, vendor, label) {
@@ -329,11 +340,13 @@ class SttTask extends Task {
    return credentials;
  }

-  get canFallback() {
+  canFallback() {
    return this.fallbackVendor && this.isHandledByPrimaryProvider && !this.cs.hasFallbackAsr;
  }

-  async _initFallback() {
+  // ep is optional for gather or any verb that has a single ep,
+  // but transcribe needs it since it may have 2 eps
+  async _initFallback(ep) {
    assert(this.fallbackVendor, 'fallback failed without fallbackVendor configuration');
    this.logger.info(`Failed to use primary STT provider, fallback to ${this.fallbackVendor}`);
    this.isHandledByPrimaryProvider = false;
@@ -346,7 +359,7 @@ class SttTask extends Task {
    this.data.recognizer.label = this.label;
    this.sttCredentials = await this._initSpeechCredentials(this.cs, this.vendor, this.label);
    // cleanup previous listener from previous vendor
-    this.removeCustomEventListeners();
+    this.removeCustomEventListeners(ep);
  }

  async compileHintsForCobalt(ep, hostport, model, token, hints) {
diff --git a/lib/tasks/transcribe.js b/lib/tasks/transcribe.js
index ec51685c..34ccf59b 100644
--- a/lib/tasks/transcribe.js
+++ b/lib/tasks/transcribe.js
@@ -70,6 +70,9 @@ class TaskTranscribe extends SttTask {
    this._bufferedTranscripts = [ [], [] ]; // for channel 1 and 2
    this.bugname_prefix = 'transcribe_';
    this.paused = false;
+    // fallback flags
+    this.isHandledByPrimaryProviderForEp1 = true;
+    this.isHandledByPrimaryProviderForEp2 = true;
  }

  get name() { return TaskName.Transcribe; }
@@ -776,7 +779,7 @@ class TaskTranscribe extends SttTask {
  }

  async _startFallback(cs, _ep, evt) {
-    if (this.canFallback) {
+    if (this.canFallback(_ep)) {
      _ep.stopTranscription({
        vendor: this.vendor,
        bugname: this.bugname,
@@ -786,7 +789,7 @@ class TaskTranscribe extends SttTask {
      try {
        this.notifyError({ msg: 'ASR error', details:`STT Vendor ${this.vendor} error: ${evt.error || evt.reason}`,
          failover: 'in progress'});
-        await this._initFallback();
+        await this._initFallback(_ep);
        let channel = 1;
        if (this.ep !== _ep) {
          channel = 2;
@@ -895,6 +898,41 @@
    if (this._asrTimer) clearTimeout(this._asrTimer);
    this._asrTimer = null;
  }
+
+  // We need to keep track of whether fallback has happened for each endpoint.
+  // Override the canFallback and _initFallback methods to make sure that
+  // we only fall back once per endpoint.
+  // We track this at the task level instead of the endpoint level
+  // because the endpoint instance is used across multiple tasks.
+  canFallback(ep) {
+    let isHandledByPrimaryProvider = this.isHandledByPrimaryProvider;
+    if (ep === this.ep) {
+      isHandledByPrimaryProvider = this.isHandledByPrimaryProviderForEp1;
+    } else if (ep === this.ep2) {
+      isHandledByPrimaryProvider = this.isHandledByPrimaryProviderForEp2;
+    }
+
+    const isOneOfEndpointAlreadyFallenBack = !!this.ep && !!this.ep2 &&
+      this.isHandledByPrimaryProviderForEp1 !== this.isHandledByPrimaryProviderForEp2;
+
+    // fallback is configured
+    return this.fallbackVendor &&
+      // this endpoint has not already fallen back
+      isHandledByPrimaryProvider &&
+      // at the global level, has any fallback already happened?
+      // a fallen-back endpoint sets cs.hasFallbackAsr to true,
+      // so if one endpoint has already fallen back, the other endpoint can still fall back.
+      (isOneOfEndpointAlreadyFallenBack || !this.cs.hasFallbackAsr);
+  }
+
+  _initFallback(ep) {
+    if (ep === this.ep) {
+      this.isHandledByPrimaryProviderForEp1 = false;
+    } else if (ep === this.ep2) {
+      this.isHandledByPrimaryProviderForEp2 = false;
+    }
+    return super._initFallback(ep);
+  }
 }

 module.exports = TaskTranscribe;
diff --git a/lib/tasks/tts-task.js b/lib/tasks/tts-task.js
index 3dc6e716..2663005c 100644
--- a/lib/tasks/tts-task.js
+++ b/lib/tasks/tts-task.js
@@ -279,9 +279,9 @@ class TtsTask extends Task {
      }

      /* produce an audio segment from the provided text */
-      const generateAudio = async(text) => {
-        if (this.killed) return;
-        if (text.startsWith('silence_stream://')) return text;
+      const generateAudio = async(text, index) => {
+        if (this.killed) return {index, filePath: null};
+        if (text.startsWith('silence_stream://')) return {index, filePath: text};

        /* otel: trace time for tts */
        if (!preCache && !this._disableTracing) {
@@ -310,7 +310,6 @@ class TtsTask extends Task {
            renderForCaching: preCache
          });
          if (!filePath.startsWith('say:')) {
-            this.playbackIds.push(null);
            this.logger.debug(`Say: file ${filePath}, served from cache ${servedFromCache}`);
            if (filePath) cs.trackTmpFile(filePath);
            if (this.otelSpan) {
@@ -338,10 +337,11 @@ class TtsTask extends Task {
              'id': this.id
            });
          }
+          return {index, filePath, playbackId: null};
        }
        else {
-          this.playbackIds.push(extractPlaybackId(filePath));
-          this.logger.debug({playbackIds: this.playbackIds}, 'Say: a streaming tts api will be used');
+          const playbackId = extractPlaybackId(filePath);
+          this.logger.debug('Say: a streaming tts api will be used');
          const modifiedPath = filePath.replace('say:{', `say:{session-uuid=${ep.uuid},`);
          this.notifyStatus({
            event: 'synthesized-audio',
@@ -350,9 +350,8 @@ class TtsTask extends Task {
            servedFromCache,
            'id': this.id
          });
-          return modifiedPath;
+          return {index, filePath: modifiedPath, playbackId};
        }
-        return filePath;
      } catch (err) {
        this.logger.info({err}, 'Error synthesizing tts');
        if (this.otelSpan) this.otelSpan.end();
@@ -367,8 +366,20 @@ class TtsTask extends Task {
        }
      };

-      const arr = this.text.map((t) => (this._validateURL(t) ? t : generateAudio(t)));
-      return (await Promise.all(arr)).filter((fp) => fp && fp.length);
+      // processing all text segments in parallel can cause ordering issues,
+      // so we attach an index to each promise result and sort them later
+
+      const arr = this.text.map((t, index) => (this._validateURL(t) ?
+        Promise.resolve({index, filePath: t, playbackId: null}) : generateAudio(t, index)));
+      const results = await Promise.all(arr);
+      const sorted = results.sort((a, b) => a.index - b.index);
+
+      return sorted
+        .filter((fp) => fp.filePath && fp.filePath.length)
+        .map((r) => {
+          this.playbackIds.push(r.playbackId);
+          return r.filePath;
+        });
    } catch (err) {
      this.logger.info(err, 'TaskSay:exec error');
      throw err;
diff --git a/lib/utils/constants.json b/lib/utils/constants.json
index e429d1e9..13b383d8 100644
--- a/lib/utils/constants.json
+++ b/lib/utils/constants.json
@@ -362,5 +362,8 @@
  "WS_CLOSE_CODES": {
    "NormalClosure": 1000,
    "GoingAway": 1001
-  }
+  },
+  "NON_FATAL_ERRORS": [
+    "File Not Found"
+  ]
 }
diff --git a/lib/utils/db-utils.js b/lib/utils/db-utils.js
index a602ff58..268a8f9e 100644
--- a/lib/utils/db-utils.js
+++ b/lib/utils/db-utils.js
@@ -153,6 +153,7 @@ const speechMapper = (cred) => {
      obj.client_id = o.client_id;
      obj.client_key = o.client_key;
      obj.user_id = o.user_id;
+      obj.houndify_server_uri = o.houndify_server_uri;
    }
    else if ('voxist' === obj.vendor) {
      const o = JSON.parse(decrypt(credential));
diff --git a/lib/utils/sdp-utils.js b/lib/utils/sdp-utils.js
index 9a3d27f7..785389a9 100644
--- a/lib/utils/sdp-utils.js
+++ b/lib/utils/sdp-utils.js
@@ -55,11 +55,28 @@ const extractSdpMedia = (sdp) => {
  }
};

+const getLeadingCodec = (sdp) => {
+  if (!sdp) {
+    return null;
+  }
+
+  const parsed = sdpTransform.parse(sdp);
+  const audio = parsed.media?.find((m) => m.type === 'audio');
+
+  if (!audio) {
+    return null;
+  }
+
+  return audio.rtp?.[0]?.codec || null;
+};
+
+
 module.exports = {
  isOnhold,
  mergeSdpMedia,
  extractSdpMedia,
  isOpusFirst,
  makeOpusFirst,
-  removeVideoSdp
+  removeVideoSdp,
+  getLeadingCodec
};
diff --git a/lib/utils/transcription-utils.js b/lib/utils/transcription-utils.js
index 565160a7..c89acb3d 100644
--- a/lib/utils/transcription-utils.js
+++ b/lib/utils/transcription-utils.js
@@ -939,7 +939,7 @@
        ...(rOpts.initialSpeechTimeoutMs > 0 && {AZURE_INITIAL_SPEECH_TIMEOUT_MS: rOpts.initialSpeechTimeoutMs}),
        ...(rOpts.requestSnr && {AZURE_REQUEST_SNR: 1}),
-        ...(rOpts.audioLogging && {AZURE_AUDIO_LOGGING: 1}),
+        ...(azureOptions.audioLogging && {AZURE_AUDIO_LOGGING: 1}),
        ...{AZURE_USE_OUTPUT_FORMAT_DETAILED: 1},
        ...(azureOptions.speechSegmentationSilenceTimeoutMs &&
          {AZURE_SPEECH_SEGMENTATION_SILENCE_TIMEOUT_MS: azureOptions.speechSegmentationSilenceTimeoutMs}),
@@ -1263,8 +1263,10 @@
        audioFormat, enableNoiseReduction, enableProfanityFilter, enablePunctuation, enableCapitalization,
        confidenceThreshold, enableDisfluencyFilter, maxResults, enableWordTimestamps, maxAlternatives,
        partialTranscriptInterval,
-        sessionTimeout, connectionTimeout, customVocabulary, languageModel
+        sessionTimeout, connectionTimeout, customVocabulary, languageModel,
+        requestInfo, sampleRate
      } = rOpts.houndifyOptions || {};
+      const audioEndpointUri = audioEndpoint || sttCredentials.houndify_server_uri;

      opts = {
        ...opts,
@@ -1300,10 +1302,12 @@
        ...(country && {HOUNDIFY_COUNTRY: country}),
        ...(timeZone && {HOUNDIFY_TIMEZONE: timeZone}),
        ...(domain && {HOUNDIFY_DOMAIN: domain}),
-        ...(audioEndpoint && {HOUNDIFY_AUDIO_ENDPOINT: audioEndpoint}),
+        ...(audioEndpointUri && {HOUNDIFY_AUDIO_ENDPOINT: audioEndpointUri}),
        ...(customVocabulary && {HOUNDIFY_CUSTOM_VOCABULARY: Array.isArray(customVocabulary) ?
          customVocabulary.join(',') : customVocabulary}),
        ...(languageModel && {HOUNDIFY_LANGUAGE_MODEL: languageModel}),
+        ...(requestInfo && {HOUNDIFY_REQUEST_INFO: JSON.stringify(requestInfo)}),
+        ...(sampleRate && {HOUNDIFY_SAMPLING_RATE: sampleRate}),
      };
    }
    else if ('voxist' === vendor) {
diff --git a/package-lock.json b/package-lock.json
index 9f219df2..ad145316 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -18,7 +18,7 @@
        "@jambonz/speech-utils": "^0.2.26",
        "@jambonz/stats-collector": "^0.1.10",
        "@jambonz/time-series": "^0.2.14",
-        "@jambonz/verb-specifications": "^0.0.121",
+        "@jambonz/verb-specifications": "^0.0.122",
        "@modelcontextprotocol/sdk": "^1.9.0",
        "@opentelemetry/api": "^1.8.0",
        "@opentelemetry/exporter-jaeger": "^1.23.0",
@@ -1533,9 +1533,9 @@
      }
    },
    "node_modules/@jambonz/verb-specifications": {
-      "version": "0.0.121",
-      "resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.121.tgz",
-      "integrity": "sha512-urM8rCJihQn+/dB+0+rlIDf36MlIfCOhCpqqBc1euET2A5fxYP6XXXpNmxuk2CqiU5xNqhsSv362fqKIHtF3dw==",
+      "version": "0.0.122",
+      "resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.122.tgz",
+      "integrity": "sha512-7xqaULhKFywJ2ZuyiYt77iiJwJ+8b98Zt1X4+OqZ7Cdjhfo7S6KnR66XRVJHnekXbmfVv58kB0KWUux5TG//Sw==",
      "license": "MIT",
      "dependencies": {
        "debug": "^4.3.4",
@@ -6183,9 +6183,10 @@
      "license": "MIT"
    },
    "node_modules/js-yaml": {
-      "version": "3.14.1",
+      "version": "3.14.2",
+      "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.14.2.tgz",
+      "integrity": "sha512-PMSmkqxr106Xa156c2M265Z+FTrPl+oxd/rgOQy2tijQeK5TxQ43psO1ZCwhVOSdnn+RzkzlRz/eY4BgJBYVpg==",
      "dev": true,
-      "license": "MIT",
      "dependencies": {
        "argparse": "^1.0.7",
        "esprima": "^4.0.0"
      }
    },
@@ -6542,6 +6543,8 @@
    "node_modules/microsoft-cognitiveservices-speech-sdk/node_modules/utf-8-validate": {
      "version": "5.0.10",
+      "resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.10.tgz",
+      "integrity": "sha512-Z6czzLq4u8fPOyx7TU6X3dvUZVvoJmxSQ+IcrlmagKhilxlhZgxPK6C5Jqbkw1IDUmFTM+cz9QDnnLTwDz/2gQ==",
      "hasInstallScript": true,
      "license": "MIT",
      "optional": true,
diff --git a/package.json b/package.json
index 786fd6a5..ea8fd9b8 100644
--- a/package.json
+++ b/package.json
@@ -34,7 +34,7 @@
    "@jambonz/speech-utils": "^0.2.26",
    "@jambonz/stats-collector": "^0.1.10",
    "@jambonz/time-series": "^0.2.14",
-    "@jambonz/verb-specifications": "^0.0.121",
+    "@jambonz/verb-specifications": "^0.0.122",
    "@modelcontextprotocol/sdk": "^1.9.0",
    "@opentelemetry/api": "^1.8.0",
    "@opentelemetry/exporter-jaeger": "^1.23.0",