Compare commits

..

12 Commits

Author SHA1 Message Date
Hoan Luu Huu
14f89364a0 Merge branch 'main' into fix/gh_1248 2025-11-29 05:30:14 +07:00
Dave Horton
cc1751f500 fix race condition where gather resolves with speech transcript but t… (#1449)
* fix race condition where gather resolves with speech transcript but timeout timer gets set after the resolve and is left running after gather completes

* remove unneeded line of code
2025-11-27 11:44:49 -06:00
Hoan HL
f4b989fa34 wip 2025-11-27 16:53:41 +07:00
Hoan HL
fc2ad38750 wip 2025-11-27 16:53:04 +07:00
Hoan HL
79e140cb20 wip 2025-11-27 16:46:52 +07:00
Hoan HL
c6ce0d968e wip 2025-11-27 16:41:14 +07:00
Hoan HL
baff899e87 wip 2025-11-27 16:25:31 +07:00
Hoan HL
c226e55fcc wip 2025-11-27 15:59:47 +07:00
Hoan HL
8853c21927 wip 2025-11-27 15:28:26 +07:00
Hoan HL
19008ca485 wip 2025-11-27 15:27:29 +07:00
Hoan HL
4065db1d88 fixed transcribe cannot fallback for specific endpoint 2025-11-27 15:07:57 +07:00
Ed Robbins
1a1f53aede Compare sdp to determine if transcoding is being used. (#1444)
* compare sdp for transcoding

* refactor sdp check for leading codec

* fix reference to epOther

* minor changes

* minor

* fix #1447

* fix security issue

* use convenience getter appIsUsingWebsockets in CallSession

---------

Co-authored-by: Dave Horton <daveh@beachdognet.com>
2025-11-24 10:50:41 -06:00
8 changed files with 104 additions and 27 deletions

View File

@@ -951,9 +951,11 @@ class CallSession extends Emitter {
}
stopTtsStream() {
this.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_closed'})
.catch((err) => this.logger.info({err}, 'CallSession:clearTtsStream - Error sending user_interruption'));
this.ttsStreamingBuffer?.stop();
if (this.appIsUsingWebsockets) {
this.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_closed'})
.catch((err) => this.logger.info({err}, 'CallSession:clearTtsStream - Error sending user_interruption'));
this.ttsStreamingBuffer?.stop();
}
}
async enableBotMode(gather, autoEnable) {
@@ -979,7 +981,7 @@ class CallSession extends Emitter {
task.sticky = autoEnable;
// listen to the bargein-done from background manager
this.backgroundTaskManager.on('bargeIn-done', () => {
if (this.requestor instanceof WsRequestor) {
if (this.appIsUsingWebsockets) {
try {
this.kill(true);
} catch (err) {}
@@ -1337,7 +1339,7 @@ class CallSession extends Emitter {
}
if (0 === this.tasks.length &&
this.requestor instanceof WsRequestor &&
this.appIsUsingWebsockets &&
!this.requestor.closedGracefully &&
!this.callGone &&
!this.isConfirmCallSession
@@ -3023,14 +3025,14 @@ Duration=${duration} `
*/
_notifyTaskError(obj) {
if (this.requestor instanceof WsRequestor) {
if (this.appIsUsingWebsockets) {
this.requestor.request('jambonz:error', '/error', obj)
.catch((err) => this.logger.debug({err}, 'CallSession:_notifyTaskError - Error sending'));
}
}
_notifyTaskStatus(task, evt) {
if (this.notifyEvents && this.requestor instanceof WsRequestor) {
if (this.notifyEvents && this.appIsUsingWebsockets) {
const obj = {...evt, id: task.id, name: task.name};
this.requestor.request('verb:status', '/status', obj)
.catch((err) => this.logger.debug({err}, 'CallSession:_notifyTaskStatus - Error sending'));
@@ -3082,7 +3084,7 @@ Duration=${duration} `
}
_clearTasks(backgroundGather, evt) {
if (this.requestor instanceof WsRequestor && !backgroundGather.cleared) {
if (this.appIsUsingWebsockets && !backgroundGather.cleared) {
this.logger.debug({evt}, 'CallSession:_clearTasks on event from background gather');
try {
backgroundGather.cleared = true;

View File

@@ -21,7 +21,7 @@ const {parseUri} = require('drachtio-srf');
const {ANCHOR_MEDIA_ALWAYS,
JAMBONZ_DIAL_PAI_HEADER,
JAMBONES_DIAL_SBC_FOR_REGISTERED_USER} = require('../config');
const { isOnhold, isOpusFirst } = require('../utils/sdp-utils');
const { isOnhold, isOpusFirst, getLeadingCodec } = require('../utils/sdp-utils');
const { normalizeJambones } = require('@jambonz/verb-specifications');
const { selectHostPort } = require('../utils/network');
const { sleepFor } = require('../utils/helpers');
@@ -158,6 +158,7 @@ class TaskDial extends Task {
get canReleaseMedia() {
const keepAnchor = this.data.anchorMedia ||
this.weAreTranscoding ||
this.cs.isBackGroundListen ||
this.cs.onHoldMusic ||
ANCHOR_MEDIA_ALWAYS ||
@@ -929,7 +930,13 @@ class TaskDial extends Task {
this.logger.info({err}, 'Dial:_selectSingleDial - Error boosting audio signal');
}
}
/* basic determination to see if call is being transcoded */
const codecA = getLeadingCodec(this.epOther.local.sdp);
const codecB = getLeadingCodec(this.ep.remote.sdp);
this.weAreTranscoding = (codecA !== codecB);
if (this.weAreTranscoding) {
this.logger.info(`Dial:_selectSingleDial - transcoding from ${codecA} (A leg) to ${codecB} (B leg)`);
}
/* if we can release the media back to the SBC, do so now */
if (this.canReleaseMedia || this.shouldExitMediaPathEntirely) {
setTimeout(this._releaseMedia.bind(this, cs, sd, this.shouldExitMediaPathEntirely), 200);

View File

@@ -258,7 +258,7 @@ class TaskGather extends SttTask {
startDtmfListener();
}
this._stopVad();
if (!this.killed) {
if (!this.killed && !this.resolved) {
startListening(cs, ep);
if (this.input.includes('speech') && this.vendor === 'nuance' && this.listenDuringPrompt) {
this.logger.debug('Gather:exec - starting transcription timers after say completes');
@@ -296,7 +296,7 @@ class TaskGather extends SttTask {
startDtmfListener();
}
this._stopVad();
if (!this.killed) {
if (!this.killed && !this.resolved) {
startListening(cs, ep);
if (this.input.includes('speech') && this.vendor === 'nuance' && this.listenDuringPrompt) {
this.logger.debug('Gather:exec - starting transcription timers after play completes');
@@ -1161,7 +1161,7 @@ class TaskGather extends SttTask {
}
async _startFallback(cs, ep, evt) {
if (this.canFallback) {
if (this.canFallback()) {
this._stopTranscribing(ep);
try {
this.logger.debug('gather:_startFallback');

View File

@@ -171,7 +171,7 @@ class SttTask extends Task {
try {
this.sttCredentials = await this._initSpeechCredentials(this.cs, this.vendor, this.label);
} catch (error) {
if (this.canFallback) {
if (this.canFallback()) {
this.notifyError(
{
msg: 'ASR error', details:`Invalid vendor ${this.vendor}, Error: ${error}`,
@@ -260,8 +260,19 @@ class SttTask extends Task {
ep.addCustomEventListener(event, handler);
}
removeCustomEventListeners() {
this.eventHandlers.forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler));
removeCustomEventListeners(ep) {
if (ep) {
// for specific endpoint
this.eventHandlers.filter((h) => h.ep === ep).forEach((h) => {
h.ep.removeCustomEventListener(h.event, h.handler);
});
this.eventHandlers = this.eventHandlers.filter((h) => h.ep !== ep);
return;
} else {
// for all endpoints
this.eventHandlers.forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler));
this.eventHandlers = [];
}
}
async _initSpeechCredentials(cs, vendor, label) {
@@ -329,11 +340,13 @@ class SttTask extends Task {
return credentials;
}
get canFallback() {
canFallback() {
return this.fallbackVendor && this.isHandledByPrimaryProvider && !this.cs.hasFallbackAsr;
}
async _initFallback() {
// ep is optional for gather or any verb that have single ep,
// but transcribe does need as it might has 2 eps
async _initFallback(ep) {
assert(this.fallbackVendor, 'fallback failed without fallbackVendor configuration');
this.logger.info(`Failed to use primary STT provider, fallback to ${this.fallbackVendor}`);
this.isHandledByPrimaryProvider = false;
@@ -346,7 +359,7 @@ class SttTask extends Task {
this.data.recognizer.label = this.label;
this.sttCredentials = await this._initSpeechCredentials(this.cs, this.vendor, this.label);
// cleanup previous listener from previous vendor
this.removeCustomEventListeners();
this.removeCustomEventListeners(ep);
}
async compileHintsForCobalt(ep, hostport, model, token, hints) {

View File

@@ -70,6 +70,9 @@ class TaskTranscribe extends SttTask {
this._bufferedTranscripts = [ [], [] ]; // for channel 1 and 2
this.bugname_prefix = 'transcribe_';
this.paused = false;
// fallback flags
this.isHandledByPrimaryProviderForEp1 = true;
this.isHandledByPrimaryProviderForEp2 = true;
}
get name() { return TaskName.Transcribe; }
@@ -776,7 +779,7 @@ class TaskTranscribe extends SttTask {
}
async _startFallback(cs, _ep, evt) {
if (this.canFallback) {
if (this.canFallback(_ep)) {
_ep.stopTranscription({
vendor: this.vendor,
bugname: this.bugname,
@@ -786,7 +789,7 @@ class TaskTranscribe extends SttTask {
try {
this.notifyError({ msg: 'ASR error',
details:`STT Vendor ${this.vendor} error: ${evt.error || evt.reason}`, failover: 'in progress'});
await this._initFallback();
await this._initFallback(_ep);
let channel = 1;
if (this.ep !== _ep) {
channel = 2;
@@ -895,6 +898,41 @@ class TaskTranscribe extends SttTask {
if (this._asrTimer) clearTimeout(this._asrTimer);
this._asrTimer = null;
}
// We need to keep track the fallback is happened for each endpoint
// override the canFallback and _initFallback methods to make sure that
// we only fallback once per endpoint
// we want to keep track this on task level instead of endpoint level
// because the endpoint instance is used across multiple tasks.
canFallback(ep) {
let isHandledByPrimaryProvider = this.isHandledByPrimaryProvider;
if (ep === this.ep) {
isHandledByPrimaryProvider = this.isHandledByPrimaryProviderForEp1;
} else if (ep === this.ep2) {
isHandledByPrimaryProvider = this.isHandledByPrimaryProviderForEp2;
}
const isOneOfEndpointAlreadyFallenBack = !!this.ep && !!this.ep2 &&
this.isHandledByPrimaryProviderForEp1 !== this.isHandledByPrimaryProviderForEp2;
// fallback is configured
return this.fallbackVendor &&
// has this endpoint already fallen back
isHandledByPrimaryProvider &&
// in global level, is there any fallback is already happened
// one fallen endpoint will mark cs.hasFallbackAsr to true,
// so if one endpoint was fallen, the other endpoint would be able to fallback.
(isOneOfEndpointAlreadyFallenBack || !this.cs.hasFallbackAsr);
}
_initFallback(ep) {
if (ep === this.ep) {
this.isHandledByPrimaryProviderForEp1 = false;
} else if (ep === this.ep2) {
this.isHandledByPrimaryProviderForEp2 = false;
}
return super._initFallback(ep);
}
}
module.exports = TaskTranscribe;

View File

@@ -89,9 +89,8 @@ class TtsTask extends Task {
// api_key, model_id, api_uri, custom_tts_streaming_url, and auth_token are encoded in the credentials
// allow them to be overriden via config, using options
// give preference to options passed in via config
const parsed_options = JSON.parse(options ?? '{}');
const local_options = {...parsed_options, ...this.options ?? {}};
const local_voice_settings = {...parsed_options.voice_settings ?? {}, ...this.options?.voice_settings ?? {}};
const local_options = {...JSON.parse(options), ...this.options};
const local_voice_settings = {...JSON.parse(options).voice_settings, ...this.options.voice_settings};
const local_api_key = local_options.api_key ?? api_key;
const local_model_id = local_options.model_id ?? model_id;
const local_api_uri = local_options.api_uri ?? api_uri;

View File

@@ -55,11 +55,28 @@ const extractSdpMedia = (sdp) => {
}
};
const getLeadingCodec = (sdp) => {
if (!sdp) {
return null;
}
const parsed = sdpTransform.parse(sdp);
const audio = parsed.media?.find((m) => m.type === 'audio');
if (!audio) {
return null;
}
return audio.rtp?.[0]?.codec || null;
};
module.exports = {
isOnhold,
mergeSdpMedia,
extractSdpMedia,
isOpusFirst,
makeOpusFirst,
removeVideoSdp
removeVideoSdp,
getLeadingCodec
};

5
package-lock.json generated
View File

@@ -6183,9 +6183,10 @@
"license": "MIT"
},
"node_modules/js-yaml": {
"version": "3.14.1",
"version": "3.14.2",
"resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.14.2.tgz",
"integrity": "sha512-PMSmkqxr106Xa156c2M265Z+FTrPl+oxd/rgOQy2tijQeK5TxQ43psO1ZCwhVOSdnn+RzkzlRz/eY4BgJBYVpg==",
"dev": true,
"license": "MIT",
"dependencies": {
"argparse": "^1.0.7",
"esprima": "^4.0.0"