Compare commits

..

5 Commits

Author SHA1 Message Date
Ed Robbins
1a1f53aede Compare sdp to determine if transcoding is being used. (#1444)
* compare sdp for transcoding

* refactor sdp check for leading codec

* fix reference to epOther

* minor changes

* minor

* fix #1447

* fix security issue

* use convenience getter appIsUsingWebsockets in CallSession

---------

Co-authored-by: Dave Horton <daveh@beachdognet.com>
2025-11-24 10:50:41 -06:00
Hoan Luu Huu
1984b6d3ea allow say verb failed as NonFatalTaskError for File Not Found (#1443)
* allow say verb failed as NonFatalTaskError for File Not Found

* wip
2025-11-20 07:22:28 -05:00
Hoan Luu Huu
769b66f57e fixed playbackIds is not in correct order compare with say.text array (#1439)
* fixed playbackIds is not in correct order compare with say.text array

* wip

* wip
2025-11-19 19:00:44 -05:00
Hoan Luu Huu
98b845f489 fix say verb does not close streaming when finish say (#1412)
* fix say verb does not close streaming when finish say

* wip

* wip

* ttsStreamingBuffer reset eventHandlerCount after remove listeners

* only send tokens to module if connected

* wip

* sent stream_open when successfully connected to vendor
2025-11-17 08:56:09 -05:00
Ed Robbins
f92b1dbc97 Add ability to override certain tts streaming options via the config … (#1429)
* Add ability to override certain tts streaming options via the config verb.

* Update to null operator(??), support parameter override via config
2025-11-12 13:54:01 -05:00
10 changed files with 826 additions and 1199 deletions

View File

@@ -950,6 +950,14 @@ class CallSession extends Emitter {
this.ttsStreamingBuffer?.start();
}
stopTtsStream() {
if (this.appIsUsingWebsockets) {
this.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_closed'})
.catch((err) => this.logger.info({err}, 'CallSession:clearTtsStream - Error sending user_interruption'));
this.ttsStreamingBuffer?.stop();
}
}
async enableBotMode(gather, autoEnable) {
try {
let task;
@@ -973,7 +981,7 @@ class CallSession extends Emitter {
task.sticky = autoEnable;
// listen to the bargein-done from background manager
this.backgroundTaskManager.on('bargeIn-done', () => {
if (this.requestor instanceof WsRequestor) {
if (this.appIsUsingWebsockets) {
try {
this.kill(true);
} catch (err) {}
@@ -1272,6 +1280,7 @@ class CallSession extends Emitter {
this.ttsStreamingBuffer.on(TtsStreamingEvents.Pause, this._onTtsStreamingPause.bind(this));
this.ttsStreamingBuffer.on(TtsStreamingEvents.Resume, this._onTtsStreamingResume.bind(this));
this.ttsStreamingBuffer.on(TtsStreamingEvents.ConnectFailure, this._onTtsStreamingConnectFailure.bind(this));
this.ttsStreamingBuffer.on(TtsStreamingEvents.Connected, this._onTtsStreamingConnected.bind(this));
}
else {
this.logger.info(`CallSession:exec - not a normal call session: ${this.constructor.name}`);
@@ -1330,7 +1339,7 @@ class CallSession extends Emitter {
}
if (0 === this.tasks.length &&
this.requestor instanceof WsRequestor &&
this.appIsUsingWebsockets &&
!this.requestor.closedGracefully &&
!this.callGone &&
!this.isConfirmCallSession
@@ -2556,7 +2565,7 @@ Duration=${duration} `
this.backgroundTaskManager.stopAll();
this.clearOrRestoreActionHookDelayProcessor().catch((err) => {});
this.ttsStreamingBuffer?.stop();
this.stopTtsStream();
this.sttLatencyCalculator?.stop();
}
@@ -3016,14 +3025,14 @@ Duration=${duration} `
*/
_notifyTaskError(obj) {
if (this.requestor instanceof WsRequestor) {
if (this.appIsUsingWebsockets) {
this.requestor.request('jambonz:error', '/error', obj)
.catch((err) => this.logger.debug({err}, 'CallSession:_notifyTaskError - Error sending'));
}
}
_notifyTaskStatus(task, evt) {
if (this.notifyEvents && this.requestor instanceof WsRequestor) {
if (this.notifyEvents && this.appIsUsingWebsockets) {
const obj = {...evt, id: task.id, name: task.name};
this.requestor.request('verb:status', '/status', obj)
.catch((err) => this.logger.debug({err}, 'CallSession:_notifyTaskStatus - Error sending'));
@@ -3075,7 +3084,7 @@ Duration=${duration} `
}
_clearTasks(backgroundGather, evt) {
if (this.requestor instanceof WsRequestor && !backgroundGather.cleared) {
if (this.appIsUsingWebsockets && !backgroundGather.cleared) {
this.logger.debug({evt}, 'CallSession:_clearTasks on event from background gather');
try {
backgroundGather.cleared = true;
@@ -3103,6 +3112,11 @@ Duration=${duration} `
}
}
_onTtsStreamingConnected() {
this.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_open'})
.catch((err) => this.logger.info({err}, 'CallSession:_onTtsStreamingConnected - Error sending'));
}
_onTtsStreamingEmpty() {
const task = this.currentTask;
if (task && TaskName.Say === task.name) {

View File

@@ -21,7 +21,7 @@ const {parseUri} = require('drachtio-srf');
const {ANCHOR_MEDIA_ALWAYS,
JAMBONZ_DIAL_PAI_HEADER,
JAMBONES_DIAL_SBC_FOR_REGISTERED_USER} = require('../config');
const { isOnhold, isOpusFirst } = require('../utils/sdp-utils');
const { isOnhold, isOpusFirst, getLeadingCodec } = require('../utils/sdp-utils');
const { normalizeJambones } = require('@jambonz/verb-specifications');
const { selectHostPort } = require('../utils/network');
const { sleepFor } = require('../utils/helpers');
@@ -158,6 +158,7 @@ class TaskDial extends Task {
get canReleaseMedia() {
const keepAnchor = this.data.anchorMedia ||
this.weAreTranscoding ||
this.cs.isBackGroundListen ||
this.cs.onHoldMusic ||
ANCHOR_MEDIA_ALWAYS ||
@@ -929,7 +930,13 @@ class TaskDial extends Task {
this.logger.info({err}, 'Dial:_selectSingleDial - Error boosting audio signal');
}
}
/* basic determination to see if call is being transcoded */
const codecA = getLeadingCodec(this.epOther.local.sdp);
const codecB = getLeadingCodec(this.ep.remote.sdp);
this.weAreTranscoding = (codecA !== codecB);
if (this.weAreTranscoding) {
this.logger.info(`Dial:_selectSingleDial - transcoding from ${codecA} (A leg) to ${codecB} (B leg)`);
}
/* if we can release the media back to the SBC, do so now */
if (this.canReleaseMedia || this.shouldExitMediaPathEntirely) {
setTimeout(this._releaseMedia.bind(this, cs, sd, this.shouldExitMediaPathEntirely), 200);

View File

@@ -2,8 +2,9 @@ const assert = require('assert');
const TtsTask = require('./tts-task');
const {TaskName, TaskPreconditions} = require('../utils/constants');
const pollySSMLSplit = require('polly-ssml-split');
const { SpeechCredentialError } = require('../utils/error');
const { SpeechCredentialError, NonFatalTaskError } = require('../utils/error');
const { sleepFor } = require('../utils/helpers');
const { NON_FANTAL_ERRORS } = require('../utils/constants.json');
/**
* Discard unmatching responses:
@@ -145,9 +146,6 @@ class TaskSay extends TtsTask {
await cs.startTtsStream();
cs.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_open'})
.catch((err) => this.logger.info({err}, 'TaskSay:handlingStreaming - Error sending'));
if (this.text.length !== 0) {
this.logger.info('TaskSay:handlingStreaming - sending text to TTS stream');
for (const t of this.text) {
@@ -405,11 +403,19 @@ class TaskSay extends TtsTask {
this._playResolve = resolve;
this._playReject = reject;
});
const r = await ep.play(filename);
this.logger.debug({r}, 'Say:exec play result');
if (r.playbackSeconds == null && r.playbackMilliseconds == null && r.playbackLastOffsetPos == null) {
this._playReject(new Error('Playback failed to start'));
try {
const r = await ep.play(filename);
this.logger.debug({r}, 'Say:exec play result');
if (r.playbackSeconds == null && r.playbackMilliseconds == null && r.playbackLastOffsetPos == null) {
this._playReject(new Error('Playback failed to start'));
}
} catch (err) {
if (NON_FANTAL_ERRORS.includes(err.message)) {
throw new NonFatalTaskError(err.message);
}
throw err;
}
try {
// wait for playback-stop event received to confirm if the playback is successful
await this._playPromise;
@@ -447,8 +453,8 @@ class TaskSay extends TtsTask {
const {memberId, confName} = cs;
this.killPlayToConfMember(this.ep, memberId, confName);
} else if (this.isStreamingTts) {
this.logger.debug('TaskSay:kill - clearing TTS stream for streaming audio');
cs.clearTtsStream();
this.logger.debug('TaskSay:kill - stopping TTS stream for streaming audio');
cs.stopTtsStream();
} else {
if (!this.notifiedPlayBackStop) {
this.notifyStatus({event: 'stop-playback'});

View File

@@ -85,55 +85,66 @@ class TtsTask extends Task {
}
async setTtsStreamingChannelVars(vendor, language, voice, credentials, ep) {
const {api_key, model_id, api_uri, custom_tts_streaming_url, auth_token} = credentials;
let obj;
const {api_key, model_id, api_uri, custom_tts_streaming_url, auth_token, options} = credentials;
// api_key, model_id, api_uri, custom_tts_streaming_url, and auth_token are encoded in the credentials
// allow them to be overridden via config, using options
// give preference to options passed in via config
const local_options = {...JSON.parse(options), ...this.options};
const local_voice_settings = {...JSON.parse(options).voice_settings, ...this.options.voice_settings};
const local_api_key = local_options.api_key ?? api_key;
const local_model_id = local_options.model_id ?? model_id;
const local_api_uri = local_options.api_uri ?? api_uri;
const local_custom_tts_streaming_url = local_options.custom_tts_streaming_url ?? custom_tts_streaming_url;
const local_auth_token = local_options.auth_token ?? auth_token;
this.logger.debug(`setTtsStreamingChannelVars: vendor: ${vendor}, language: ${language}, voice: ${voice}`);
let obj;
switch (vendor) {
case 'deepgram':
obj = {
DEEPGRAM_API_KEY: api_key,
DEEPGRAM_API_KEY: local_api_key,
DEEPGRAM_TTS_STREAMING_MODEL: voice
};
break;
case 'cartesia':
obj = {
CARTESIA_API_KEY: api_key,
CARTESIA_TTS_STREAMING_MODEL_ID: model_id,
CARTESIA_API_KEY: local_api_key,
CARTESIA_TTS_STREAMING_MODEL_ID: local_model_id,
CARTESIA_TTS_STREAMING_VOICE_ID: voice,
CARTESIA_TTS_STREAMING_LANGUAGE: language || 'en',
};
break;
case 'elevenlabs':
const {stability, similarity_boost, use_speaker_boost, style, speed} = this.options.voice_settings || {};
// eslint-disable-next-line max-len
const {stability, similarity_boost, use_speaker_boost, style, speed} = local_voice_settings || {};
obj = {
ELEVENLABS_API_KEY: api_key,
...(api_uri && {ELEVENLABS_API_URI: api_uri}),
ELEVENLABS_TTS_STREAMING_MODEL_ID: model_id,
ELEVENLABS_API_KEY: local_api_key,
...(api_uri && {ELEVENLABS_API_URI: local_api_uri}),
ELEVENLABS_TTS_STREAMING_MODEL_ID: local_model_id,
ELEVENLABS_TTS_STREAMING_VOICE_ID: voice,
// 20/12/2024 - only eleven_turbo_v2_5 support multiple language
...(['eleven_turbo_v2_5'].includes(model_id) && {ELEVENLABS_TTS_STREAMING_LANGUAGE: language}),
...(['eleven_turbo_v2_5'].includes(local_model_id) && {ELEVENLABS_TTS_STREAMING_LANGUAGE: language}),
...(stability && {ELEVENLABS_TTS_STREAMING_VOICE_SETTINGS_STABILITY: stability}),
...(similarity_boost && {ELEVENLABS_TTS_STREAMING_VOICE_SETTINGS_SIMILARITY_BOOST: similarity_boost}),
...(use_speaker_boost && {ELEVENLABS_TTS_STREAMING_VOICE_SETTINGS_USE_SPEAKER_BOOST: use_speaker_boost}),
...(style && {ELEVENLABS_TTS_STREAMING_VOICE_SETTINGS_STYLE: style}),
// speed has value 0.7 to 1.2, 1.0 is default, make sure we send the value even if it's 0
...(speed !== null && speed !== undefined && {ELEVENLABS_TTS_STREAMING_VOICE_SETTINGS_SPEED: `${speed}`}),
...(this.options.pronunciation_dictionary_locators &&
Array.isArray(this.options.pronunciation_dictionary_locators) && {
...(local_options.pronunciation_dictionary_locators &&
Array.isArray(local_options.pronunciation_dictionary_locators) && {
ELEVENLABS_TTS_STREAMING_PRONUNCIATION_DICTIONARY_LOCATORS:
JSON.stringify(this.options.pronunciation_dictionary_locators)
JSON.stringify(local_options.pronunciation_dictionary_locators)
}),
};
break;
case 'rimelabs':
const {
pauseBetweenBrackets, phonemizeBetweenBrackets, inlineSpeedAlpha, speedAlpha, reduceLatency
} = this.options;
} = local_options;
obj = {
RIMELABS_API_KEY: api_key,
RIMELABS_TTS_STREAMING_MODEL_ID: model_id,
RIMELABS_API_KEY: local_api_key,
RIMELABS_TTS_STREAMING_MODEL_ID: local_model_id,
RIMELABS_TTS_STREAMING_VOICE_ID: voice,
RIMELABS_TTS_STREAMING_LANGUAGE: language || 'en',
...(pauseBetweenBrackets && {RIMELABS_TTS_STREAMING_PAUSE_BETWEEN_BRACKETS: pauseBetweenBrackets}),
@@ -148,8 +159,8 @@ class TtsTask extends Task {
if (vendor.startsWith('custom:')) {
const use_tls = custom_tts_streaming_url.startsWith('wss://');
obj = {
CUSTOM_TTS_STREAMING_HOST: custom_tts_streaming_url.replace(/^(ws|wss):\/\//, ''),
CUSTOM_TTS_STREAMING_API_KEY: auth_token,
CUSTOM_TTS_STREAMING_HOST: local_custom_tts_streaming_url.replace(/^(ws|wss):\/\//, ''),
CUSTOM_TTS_STREAMING_API_KEY: local_auth_token,
CUSTOM_TTS_STREAMING_VOICE_ID: voice,
CUSTOM_TTS_STREAMING_LANGUAGE: language || 'en',
CUSTOM_TTS_STREAMING_USE_TLS: use_tls
@@ -268,9 +279,9 @@ class TtsTask extends Task {
}
/* produce an audio segment from the provided text */
const generateAudio = async(text) => {
if (this.killed) return;
if (text.startsWith('silence_stream://')) return text;
const generateAudio = async(text, index) => {
if (this.killed) return {index, filePath: null};
if (text.startsWith('silence_stream://')) return {index, filePath: text};
/* otel: trace time for tts */
if (!preCache && !this._disableTracing) {
@@ -299,7 +310,6 @@ class TtsTask extends Task {
renderForCaching: preCache
});
if (!filePath.startsWith('say:')) {
this.playbackIds.push(null);
this.logger.debug(`Say: file ${filePath}, served from cache ${servedFromCache}`);
if (filePath) cs.trackTmpFile(filePath);
if (this.otelSpan) {
@@ -327,10 +337,11 @@ class TtsTask extends Task {
'id': this.id
});
}
return {index, filePath, playbackId: null};
}
else {
this.playbackIds.push(extractPlaybackId(filePath));
this.logger.debug({playbackIds: this.playbackIds}, 'Say: a streaming tts api will be used');
const playbackId = extractPlaybackId(filePath);
this.logger.debug('Say: a streaming tts api will be used');
const modifiedPath = filePath.replace('say:{', `say:{session-uuid=${ep.uuid},`);
this.notifyStatus({
event: 'synthesized-audio',
@@ -339,9 +350,8 @@ class TtsTask extends Task {
servedFromCache,
'id': this.id
});
return modifiedPath;
return {index, filePath: modifiedPath, playbackId};
}
return filePath;
} catch (err) {
this.logger.info({err}, 'Error synthesizing tts');
if (this.otelSpan) this.otelSpan.end();
@@ -356,8 +366,20 @@ class TtsTask extends Task {
}
};
const arr = this.text.map((t) => (this._validateURL(t) ? t : generateAudio(t)));
return (await Promise.all(arr)).filter((fp) => fp && fp.length);
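// text segments are synthesized in parallel and may finish in any order,
// so each result carries its index; results are sorted by that index and
// playbackIds is filled from the sorted results to stay aligned with the text array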
const arr = this.text.map((t, index) => (this._validateURL(t) ?
Promise.resolve({index, filePath: t, playbackId: null}) : generateAudio(t, index)));
const results = await Promise.all(arr);
const sorted = results.sort((a, b) => a.index - b.index);
return sorted
.filter((fp) => fp.filePath && fp.filePath.length)
.map((r) => {
this.playbackIds.push(r.playbackId);
return r.filePath;
});
} catch (err) {
this.logger.info(err, 'TaskSay:exec error');
throw err;

View File

@@ -335,7 +335,8 @@
"Empty": "tts_streaming::empty",
"Pause": "tts_streaming::pause",
"Resume": "tts_streaming::resume",
"ConnectFailure": "tts_streaming::connect_failed"
"ConnectFailure": "tts_streaming::connect_failed",
"Connected": "tts_streaming::connected"
},
"TtsStreamingConnectionStatus": {
"NotConnected": "not_connected",
@@ -355,5 +356,8 @@
"WS_CLOSE_CODES": {
"NormalClosure": 1000,
"GoingAway": 1001
}
},
"NON_FANTAL_ERRORS": [
"File Not Found"
]
}

View File

@@ -55,11 +55,28 @@ const extractSdpMedia = (sdp) => {
}
};
/**
 * Return the name of the preferred (leading) audio codec in an SDP.
 *
 * The preferred codec is the first payload type listed on the audio m= line,
 * which is not necessarily the first a=rtpmap attribute. Static payload types
 * (e.g. 0 = PCMU, 8 = PCMA) may also be offered with no a=rtpmap line at all,
 * so those are resolved from a small table of well-known static assignments.
 *
 * @param {string} sdp - raw SDP text
 * @returns {string|null} codec name (e.g. 'PCMU', 'opus'), or null when the
 * sdp is empty, has no audio media, or no codec can be determined
 */
const getLeadingCodec = (sdp) => {
  if (!sdp) {
    return null;
  }
  const parsed = sdpTransform.parse(sdp);
  const audio = parsed.media?.find((m) => m.type === 'audio');
  if (!audio) {
    return null;
  }

  /* well-known static audio payload types that may appear without an rtpmap */
  const staticCodecs = {0: 'PCMU', 3: 'GSM', 4: 'G723', 8: 'PCMA', 9: 'G722', 18: 'G729'};
  const rtpmaps = Array.isArray(audio.rtp) ? audio.rtp : [];

  /* payloads is normally a space-separated string, but the parser may hand
     back a bare number when only one payload type is listed, so normalize */
  const [leading] = String(audio.payloads ?? '').trim().split(/\s+/);
  if (leading) {
    const pt = Number(leading);
    const mapped = rtpmaps.find((r) => Number(r.payload) === pt);
    if (mapped?.codec) return mapped.codec;
    if (staticCodecs[pt]) return staticCodecs[pt];
  }

  /* no usable payload list: fall back to the first rtpmap entry */
  return rtpmaps[0]?.codec || null;
};
module.exports = {
isOnhold,
mergeSdpMedia,
extractSdpMedia,
isOpusFirst,
makeOpusFirst,
removeVideoSdp
removeVideoSdp,
getLeadingCodec
};

View File

@@ -80,7 +80,7 @@ class TtsStreamingBuffer extends Emitter {
clearTimeout(this.timer);
this.removeCustomEventListeners();
if (this.ep) {
this._api(this.ep, [this.ep.uuid, 'close'])
this._api(this.ep, [this.ep.uuid, 'stop'])
.catch((err) =>
this.logger.info({ err }, 'TtsStreamingBuffer:stop Error closing TTS streaming')
);
@@ -193,10 +193,7 @@ class TtsStreamingBuffer extends Emitter {
this.logger.debug('TtsStreamingBuffer:_feedQueue TTS stream is not open or no endpoint available');
return;
}
if (
this._connectionStatus === TtsStreamingConnectionStatus.NotConnected ||
this._connectionStatus === TtsStreamingConnectionStatus.Failed
) {
if (this._connectionStatus !== TtsStreamingConnectionStatus.Connected) {
this.logger.debug('TtsStreamingBuffer:_feedQueue TTS stream is not connected');
return;
}
@@ -365,6 +362,7 @@ class TtsStreamingBuffer extends Emitter {
if (this.queue.length > 0) {
await this._feedQueue();
}
this.emit(TtsStreamingEvents.Connected, { vendor });
}
_onConnectFailure(vendor) {
@@ -415,6 +413,7 @@ class TtsStreamingBuffer extends Emitter {
removeCustomEventListeners() {
this.eventHandlers.forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler));
this.eventHandlers.length = 0;
}
_initHandlers(ep) {

1841
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -49,7 +49,7 @@
"debug": "^4.3.4",
"deepcopy": "^2.1.0",
"drachtio-fsmrf": "^4.1.2",
"drachtio-srf": "^5.0.19",
"drachtio-srf": "^5.0.14",
"express": "^4.19.2",
"express-validator": "^7.0.1",
"moment": "^2.30.1",

View File

@@ -83,7 +83,8 @@ test('invalid jambonz json create alert tests', async(t) => {
{account_sid: 'bb845d4b-83a9-4cde-a6e9-50f3743bab3f', page: 1, page_size: 25, days: 7});
let checked = false;
for (let i = 0; i < data.total; i++) {
checked = data.data[i].message === 'malformed jambonz payload: must be array'
checked = data.data[i].message === 'malformed jambonz payload: must be array';
if (checked) break;
}
t.ok(checked, 'alert is raised as expected');
disconnect();