This commit is contained in:
Hoan HL
2025-12-01 15:44:44 +07:00
13 changed files with 158 additions and 49 deletions

View File

@@ -951,9 +951,11 @@ class CallSession extends Emitter {
}
stopTtsStream() {
this.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_closed'})
.catch((err) => this.logger.info({err}, 'CallSession:clearTtsStream - Error sending user_interruption'));
this.ttsStreamingBuffer?.stop();
if (this.appIsUsingWebsockets) {
this.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_closed'})
.catch((err) => this.logger.info({err}, 'CallSession:clearTtsStream - Error sending user_interruption'));
this.ttsStreamingBuffer?.stop();
}
}
async enableBotMode(gather, autoEnable) {
@@ -979,7 +981,7 @@ class CallSession extends Emitter {
task.sticky = autoEnable;
// listen to the bargein-done from background manager
this.backgroundTaskManager.on('bargeIn-done', () => {
if (this.requestor instanceof WsRequestor) {
if (this.appIsUsingWebsockets) {
try {
this.kill(true);
} catch (err) {}
@@ -1194,7 +1196,8 @@ class CallSession extends Emitter {
speech_credential_sid: credential.speech_credential_sid,
client_id: credential.client_id,
client_key: credential.client_key,
user_id: credential.user_id
user_id: credential.user_id,
houndify_server_uri: credential.houndify_server_uri
};
}
else if ('deepgramflux' === vendor) {
@@ -1338,7 +1341,7 @@ class CallSession extends Emitter {
}
if (0 === this.tasks.length &&
this.requestor instanceof WsRequestor &&
this.appIsUsingWebsockets &&
!this.requestor.closedGracefully &&
!this.callGone &&
!this.isConfirmCallSession
@@ -3024,14 +3027,14 @@ Duration=${duration} `
*/
_notifyTaskError(obj) {
if (this.requestor instanceof WsRequestor) {
if (this.appIsUsingWebsockets) {
this.requestor.request('jambonz:error', '/error', obj)
.catch((err) => this.logger.debug({err}, 'CallSession:_notifyTaskError - Error sending'));
}
}
_notifyTaskStatus(task, evt) {
if (this.notifyEvents && this.requestor instanceof WsRequestor) {
if (this.notifyEvents && this.appIsUsingWebsockets) {
const obj = {...evt, id: task.id, name: task.name};
this.requestor.request('verb:status', '/status', obj)
.catch((err) => this.logger.debug({err}, 'CallSession:_notifyTaskStatus - Error sending'));
@@ -3083,7 +3086,7 @@ Duration=${duration} `
}
_clearTasks(backgroundGather, evt) {
if (this.requestor instanceof WsRequestor && !backgroundGather.cleared) {
if (this.appIsUsingWebsockets && !backgroundGather.cleared) {
this.logger.debug({evt}, 'CallSession:_clearTasks on event from background gather');
try {
backgroundGather.cleared = true;

View File

@@ -21,7 +21,7 @@ const {parseUri} = require('drachtio-srf');
const {ANCHOR_MEDIA_ALWAYS,
JAMBONZ_DIAL_PAI_HEADER,
JAMBONES_DIAL_SBC_FOR_REGISTERED_USER} = require('../config');
const { isOnhold, isOpusFirst } = require('../utils/sdp-utils');
const { isOnhold, isOpusFirst, getLeadingCodec } = require('../utils/sdp-utils');
const { normalizeJambones } = require('@jambonz/verb-specifications');
const { selectHostPort } = require('../utils/network');
const { sleepFor } = require('../utils/helpers');
@@ -158,6 +158,7 @@ class TaskDial extends Task {
get canReleaseMedia() {
const keepAnchor = this.data.anchorMedia ||
this.weAreTranscoding ||
this.cs.isBackGroundListen ||
this.cs.onHoldMusic ||
ANCHOR_MEDIA_ALWAYS ||
@@ -929,7 +930,13 @@ class TaskDial extends Task {
this.logger.info({err}, 'Dial:_selectSingleDial - Error boosting audio signal');
}
}
/* basic determination to see if call is being transcoded */
const codecA = getLeadingCodec(this.epOther.local.sdp);
const codecB = getLeadingCodec(this.ep.remote.sdp);
this.weAreTranscoding = (codecA !== codecB);
if (this.weAreTranscoding) {
this.logger.info(`Dial:_selectSingleDial - transcoding from ${codecA} (A leg) to ${codecB} (B leg)`);
}
/* if we can release the media back to the SBC, do so now */
if (this.canReleaseMedia || this.shouldExitMediaPathEntirely) {
setTimeout(this._releaseMedia.bind(this, cs, sd, this.shouldExitMediaPathEntirely), 200);

View File

@@ -259,7 +259,7 @@ class TaskGather extends SttTask {
startDtmfListener();
}
this._stopVad();
if (!this.killed) {
if (!this.killed && !this.resolved) {
startListening(cs, ep);
if (this.input.includes('speech') && this.vendor === 'nuance' && this.listenDuringPrompt) {
this.logger.debug('Gather:exec - starting transcription timers after say completes');
@@ -297,7 +297,7 @@ class TaskGather extends SttTask {
startDtmfListener();
}
this._stopVad();
if (!this.killed) {
if (!this.killed && !this.resolved) {
startListening(cs, ep);
if (this.input.includes('speech') && this.vendor === 'nuance' && this.listenDuringPrompt) {
this.logger.debug('Gather:exec - starting transcription timers after play completes');
@@ -1173,7 +1173,7 @@ class TaskGather extends SttTask {
}
async _startFallback(cs, ep, evt) {
if (this.canFallback) {
if (this.canFallback()) {
this._stopTranscribing(ep);
try {
this.logger.debug('gather:_startFallback');

View File

@@ -2,8 +2,9 @@ const assert = require('assert');
const TtsTask = require('./tts-task');
const {TaskName, TaskPreconditions} = require('../utils/constants');
const pollySSMLSplit = require('polly-ssml-split');
const { SpeechCredentialError } = require('../utils/error');
const { SpeechCredentialError, NonFatalTaskError } = require('../utils/error');
const { sleepFor } = require('../utils/helpers');
const { NON_FANTAL_ERRORS } = require('../utils/constants.json');
/**
* Discard unmatching responses:
@@ -402,11 +403,19 @@ class TaskSay extends TtsTask {
this._playResolve = resolve;
this._playReject = reject;
});
const r = await ep.play(filename);
this.logger.debug({r}, 'Say:exec play result');
if (r.playbackSeconds == null && r.playbackMilliseconds == null && r.playbackLastOffsetPos == null) {
this._playReject(new Error('Playback failed to start'));
try {
const r = await ep.play(filename);
this.logger.debug({r}, 'Say:exec play result');
if (r.playbackSeconds == null && r.playbackMilliseconds == null && r.playbackLastOffsetPos == null) {
this._playReject(new Error('Playback failed to start'));
}
} catch (err) {
if (NON_FANTAL_ERRORS.includes(err.message)) {
throw new NonFatalTaskError(err.message);
}
throw err;
}
try {
// wait for playback-stop event received to confirm if the playback is successful
await this._playPromise;

View File

@@ -171,7 +171,7 @@ class SttTask extends Task {
try {
this.sttCredentials = await this._initSpeechCredentials(this.cs, this.vendor, this.label);
} catch (error) {
if (this.canFallback) {
if (this.canFallback()) {
this.notifyError(
{
msg: 'ASR error', details:`Invalid vendor ${this.vendor}, Error: ${error}`,
@@ -260,8 +260,19 @@ class SttTask extends Task {
ep.addCustomEventListener(event, handler);
}
removeCustomEventListeners() {
this.eventHandlers.forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler));
removeCustomEventListeners(ep) {
if (ep) {
// for specific endpoint
this.eventHandlers.filter((h) => h.ep === ep).forEach((h) => {
h.ep.removeCustomEventListener(h.event, h.handler);
});
this.eventHandlers = this.eventHandlers.filter((h) => h.ep !== ep);
return;
} else {
// for all endpoints
this.eventHandlers.forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler));
this.eventHandlers = [];
}
}
async _initSpeechCredentials(cs, vendor, label) {
@@ -329,11 +340,13 @@ class SttTask extends Task {
return credentials;
}
get canFallback() {
canFallback() {
return this.fallbackVendor && this.isHandledByPrimaryProvider && !this.cs.hasFallbackAsr;
}
async _initFallback() {
// ep is optional for gather or any verb that have single ep,
// but transcribe does need as it might has 2 eps
async _initFallback(ep) {
assert(this.fallbackVendor, 'fallback failed without fallbackVendor configuration');
this.logger.info(`Failed to use primary STT provider, fallback to ${this.fallbackVendor}`);
this.isHandledByPrimaryProvider = false;
@@ -346,7 +359,7 @@ class SttTask extends Task {
this.data.recognizer.label = this.label;
this.sttCredentials = await this._initSpeechCredentials(this.cs, this.vendor, this.label);
// cleanup previous listener from previous vendor
this.removeCustomEventListeners();
this.removeCustomEventListeners(ep);
}
async compileHintsForCobalt(ep, hostport, model, token, hints) {

View File

@@ -70,6 +70,9 @@ class TaskTranscribe extends SttTask {
this._bufferedTranscripts = [ [], [] ]; // for channel 1 and 2
this.bugname_prefix = 'transcribe_';
this.paused = false;
// fallback flags
this.isHandledByPrimaryProviderForEp1 = true;
this.isHandledByPrimaryProviderForEp2 = true;
}
get name() { return TaskName.Transcribe; }
@@ -776,7 +779,7 @@ class TaskTranscribe extends SttTask {
}
async _startFallback(cs, _ep, evt) {
if (this.canFallback) {
if (this.canFallback(_ep)) {
_ep.stopTranscription({
vendor: this.vendor,
bugname: this.bugname,
@@ -786,7 +789,7 @@ class TaskTranscribe extends SttTask {
try {
this.notifyError({ msg: 'ASR error',
details:`STT Vendor ${this.vendor} error: ${evt.error || evt.reason}`, failover: 'in progress'});
await this._initFallback();
await this._initFallback(_ep);
let channel = 1;
if (this.ep !== _ep) {
channel = 2;
@@ -895,6 +898,41 @@ class TaskTranscribe extends SttTask {
if (this._asrTimer) clearTimeout(this._asrTimer);
this._asrTimer = null;
}
// We need to keep track the fallback is happened for each endpoint
// override the canFallback and _initFallback methods to make sure that
// we only fallback once per endpoint
// we want to keep track this on task level instead of endpoint level
// because the endpoint instance is used across multiple tasks.
canFallback(ep) {
let isHandledByPrimaryProvider = this.isHandledByPrimaryProvider;
if (ep === this.ep) {
isHandledByPrimaryProvider = this.isHandledByPrimaryProviderForEp1;
} else if (ep === this.ep2) {
isHandledByPrimaryProvider = this.isHandledByPrimaryProviderForEp2;
}
const isOneOfEndpointAlreadyFallenBack = !!this.ep && !!this.ep2 &&
this.isHandledByPrimaryProviderForEp1 !== this.isHandledByPrimaryProviderForEp2;
// fallback is configured
return this.fallbackVendor &&
// has this endpoint already fallen back
isHandledByPrimaryProvider &&
// in global level, is there any fallback is already happened
// one fallen endpoint will mark cs.hasFallbackAsr to true,
// so if one endpoint was fallen, the other endpoint would be able to fallback.
(isOneOfEndpointAlreadyFallenBack || !this.cs.hasFallbackAsr);
}
_initFallback(ep) {
if (ep === this.ep) {
this.isHandledByPrimaryProviderForEp1 = false;
} else if (ep === this.ep2) {
this.isHandledByPrimaryProviderForEp2 = false;
}
return super._initFallback(ep);
}
}
module.exports = TaskTranscribe;

View File

@@ -279,9 +279,9 @@ class TtsTask extends Task {
}
/* produce an audio segment from the provided text */
const generateAudio = async(text) => {
if (this.killed) return;
if (text.startsWith('silence_stream://')) return text;
const generateAudio = async(text, index) => {
if (this.killed) return {index, filePath: null};
if (text.startsWith('silence_stream://')) return {index, filePath: text};
/* otel: trace time for tts */
if (!preCache && !this._disableTracing) {
@@ -310,7 +310,6 @@ class TtsTask extends Task {
renderForCaching: preCache
});
if (!filePath.startsWith('say:')) {
this.playbackIds.push(null);
this.logger.debug(`Say: file ${filePath}, served from cache ${servedFromCache}`);
if (filePath) cs.trackTmpFile(filePath);
if (this.otelSpan) {
@@ -338,10 +337,11 @@ class TtsTask extends Task {
'id': this.id
});
}
return {index, filePath, playbackId: null};
}
else {
this.playbackIds.push(extractPlaybackId(filePath));
this.logger.debug({playbackIds: this.playbackIds}, 'Say: a streaming tts api will be used');
const playbackId = extractPlaybackId(filePath);
this.logger.debug('Say: a streaming tts api will be used');
const modifiedPath = filePath.replace('say:{', `say:{session-uuid=${ep.uuid},`);
this.notifyStatus({
event: 'synthesized-audio',
@@ -350,9 +350,8 @@ class TtsTask extends Task {
servedFromCache,
'id': this.id
});
return modifiedPath;
return {index, filePath: modifiedPath, playbackId};
}
return filePath;
} catch (err) {
this.logger.info({err}, 'Error synthesizing tts');
if (this.otelSpan) this.otelSpan.end();
@@ -367,8 +366,20 @@ class TtsTask extends Task {
}
};
const arr = this.text.map((t) => (this._validateURL(t) ? t : generateAudio(t)));
return (await Promise.all(arr)).filter((fp) => fp && fp.length);
// process all text segments in parallel will cause ordering issue
// so we attach index to each promise result and sort them later
const arr = this.text.map((t, index) => (this._validateURL(t) ?
Promise.resolve({index, filePath: t, playbackId: null}) : generateAudio(t, index)));
const results = await Promise.all(arr);
const sorted = results.sort((a, b) => a.index - b.index);
return sorted
.filter((fp) => fp.filePath && fp.filePath.length)
.map((r) => {
this.playbackIds.push(r.playbackId);
return r.filePath;
});
} catch (err) {
this.logger.info(err, 'TaskSay:exec error');
throw err;

View File

@@ -362,5 +362,8 @@
"WS_CLOSE_CODES": {
"NormalClosure": 1000,
"GoingAway": 1001
}
},
"NON_FANTAL_ERRORS": [
"File Not Found"
]
}

View File

@@ -153,6 +153,7 @@ const speechMapper = (cred) => {
obj.client_id = o.client_id;
obj.client_key = o.client_key;
obj.user_id = o.user_id;
obj.houndify_server_uri = o.houndify_server_uri;
}
else if ('voxist' === obj.vendor) {
const o = JSON.parse(decrypt(credential));

View File

@@ -55,11 +55,28 @@ const extractSdpMedia = (sdp) => {
}
};
const getLeadingCodec = (sdp) => {
if (!sdp) {
return null;
}
const parsed = sdpTransform.parse(sdp);
const audio = parsed.media?.find((m) => m.type === 'audio');
if (!audio) {
return null;
}
return audio.rtp?.[0]?.codec || null;
};
module.exports = {
isOnhold,
mergeSdpMedia,
extractSdpMedia,
isOpusFirst,
makeOpusFirst,
removeVideoSdp
removeVideoSdp,
getLeadingCodec
};

View File

@@ -939,7 +939,7 @@ module.exports = (logger) => {
...(rOpts.initialSpeechTimeoutMs > 0 &&
{AZURE_INITIAL_SPEECH_TIMEOUT_MS: rOpts.initialSpeechTimeoutMs}),
...(rOpts.requestSnr && {AZURE_REQUEST_SNR: 1}),
...(rOpts.audioLogging && {AZURE_AUDIO_LOGGING: 1}),
...(azureOptions.audioLogging && {AZURE_AUDIO_LOGGING: 1}),
...{AZURE_USE_OUTPUT_FORMAT_DETAILED: 1},
...(azureOptions.speechSegmentationSilenceTimeoutMs &&
{AZURE_SPEECH_SEGMENTATION_SILENCE_TIMEOUT_MS: azureOptions.speechSegmentationSilenceTimeoutMs}),
@@ -1263,8 +1263,10 @@ module.exports = (logger) => {
audioFormat, enableNoiseReduction, enableProfanityFilter, enablePunctuation,
enableCapitalization, confidenceThreshold, enableDisfluencyFilter,
maxResults, enableWordTimestamps, maxAlternatives, partialTranscriptInterval,
sessionTimeout, connectionTimeout, customVocabulary, languageModel
sessionTimeout, connectionTimeout, customVocabulary, languageModel,
requestInfo, sampleRate
} = rOpts.houndifyOptions || {};
const audioEndpointUri = audioEndpoint || sttCredentials.houndify_server_uri;
opts = {
...opts,
@@ -1300,10 +1302,12 @@ module.exports = (logger) => {
...(country && {HOUNDIFY_COUNTRY: country}),
...(timeZone && {HOUNDIFY_TIMEZONE: timeZone}),
...(domain && {HOUNDIFY_DOMAIN: domain}),
...(audioEndpoint && {HOUNDIFY_AUDIO_ENDPOINT: audioEndpoint}),
...(audioEndpointUri && {HOUNDIFY_AUDIO_ENDPOINT: audioEndpointUri}),
...(customVocabulary && {HOUNDIFY_CUSTOM_VOCABULARY:
Array.isArray(customVocabulary) ? customVocabulary.join(',') : customVocabulary}),
...(languageModel && {HOUNDIFY_LANGUAGE_MODEL: languageModel}),
...(requestInfo && {HOUNDIFY_REQUEST_INFO: JSON.stringify(requestInfo)}),
...(sampleRate && {HOUNDIFY_SAMPLING_RATE: sampleRate}),
};
}
else if ('voxist' === vendor) {

15
package-lock.json generated
View File

@@ -18,7 +18,7 @@
"@jambonz/speech-utils": "^0.2.26",
"@jambonz/stats-collector": "^0.1.10",
"@jambonz/time-series": "^0.2.14",
"@jambonz/verb-specifications": "^0.0.121",
"@jambonz/verb-specifications": "^0.0.122",
"@modelcontextprotocol/sdk": "^1.9.0",
"@opentelemetry/api": "^1.8.0",
"@opentelemetry/exporter-jaeger": "^1.23.0",
@@ -1533,9 +1533,9 @@
}
},
"node_modules/@jambonz/verb-specifications": {
"version": "0.0.121",
"resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.121.tgz",
"integrity": "sha512-urM8rCJihQn+/dB+0+rlIDf36MlIfCOhCpqqBc1euET2A5fxYP6XXXpNmxuk2CqiU5xNqhsSv362fqKIHtF3dw==",
"version": "0.0.122",
"resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.122.tgz",
"integrity": "sha512-7xqaULhKFywJ2ZuyiYt77iiJwJ+8b98Zt1X4+OqZ7Cdjhfo7S6KnR66XRVJHnekXbmfVv58kB0KWUux5TG//Sw==",
"license": "MIT",
"dependencies": {
"debug": "^4.3.4",
@@ -6183,9 +6183,10 @@
"license": "MIT"
},
"node_modules/js-yaml": {
"version": "3.14.1",
"version": "3.14.2",
"resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.14.2.tgz",
"integrity": "sha512-PMSmkqxr106Xa156c2M265Z+FTrPl+oxd/rgOQy2tijQeK5TxQ43psO1ZCwhVOSdnn+RzkzlRz/eY4BgJBYVpg==",
"dev": true,
"license": "MIT",
"dependencies": {
"argparse": "^1.0.7",
"esprima": "^4.0.0"
@@ -6542,6 +6543,8 @@
},
"node_modules/microsoft-cognitiveservices-speech-sdk/node_modules/utf-8-validate": {
"version": "5.0.10",
"resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.10.tgz",
"integrity": "sha512-Z6czzLq4u8fPOyx7TU6X3dvUZVvoJmxSQ+IcrlmagKhilxlhZgxPK6C5Jqbkw1IDUmFTM+cz9QDnnLTwDz/2gQ==",
"hasInstallScript": true,
"license": "MIT",
"optional": true,

View File

@@ -34,7 +34,7 @@
"@jambonz/speech-utils": "^0.2.26",
"@jambonz/stats-collector": "^0.1.10",
"@jambonz/time-series": "^0.2.14",
"@jambonz/verb-specifications": "^0.0.121",
"@jambonz/verb-specifications": "^0.0.122",
"@modelcontextprotocol/sdk": "^1.9.0",
"@opentelemetry/api": "^1.8.0",
"@opentelemetry/exporter-jaeger": "^1.23.0",