Compare commits

..

4 Commits

Author SHA1 Message Date
surajshivakumar
a6dcc9170b removed:duplicate line 2024-07-28 17:18:27 -04:00
surajshivakumar
34693b7e77 early media call termination log --removed faulty log msg 2024-07-26 18:16:08 -04:00
surajshivakumar
2439e225a0 early media call termination log 2024-07-26 18:07:14 -04:00
surajshivakumar
76c2be1d07 early media logging patch 2024-07-25 15:36:04 -04:00
54 changed files with 11862 additions and 5950 deletions

View File

@@ -12,11 +12,6 @@ jobs:
node-version: 20
- run: npm ci
- run: npm run jslint
- name: Install Docker Compose
run: |
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
docker-compose --version
- run: docker pull drachtio/sipp
- run: npm test
env:

3
.gitignore vendored
View File

@@ -42,5 +42,4 @@ ecosystem.config.js
test/credentials/*.json
run-tests.sh
run-coverage.sh
.vscode
.env
.vscode

View File

@@ -1,4 +1,4 @@
FROM --platform=linux/amd64 node:20-alpine as base
FROM --platform=linux/amd64 node:18.15-alpine3.16 as base
RUN apk --update --no-cache add --virtual .builds-deps build-base python3

View File

@@ -21,7 +21,6 @@ Configuration is provided via environment variables:
|ENCRYPTION_SECRET| secret for credential encryption(JWT_SECRET is deprecated) |yes|
|GOOGLE_APPLICATION_CREDENTIALS| path to gcp service key file|yes|
|HTTP_PORT| tcp port to listen on for API requests from jambonz-api-server|yes|
|HTTP_IP| IP Address for API requests from jambonz-api-server |no|
|JAMBONES_GATHER_EARLY_HINTS_MATCH| if true and hints are provided, gather will opportunistically review interim transcripts if possible to reduce ASR latency |no|
|JAMBONES_FREESWITCH| IP:port:secret for Freeswitch server (e.g. '127.0.0.1:8021:JambonzR0ck$'|yes|
|JAMBONES_LOGLEVEL| log level for application, 'info' or 'debug'|no|

46
app.js
View File

@@ -25,22 +25,10 @@ const opts = {
};
const pino = require('pino');
const logger = pino(opts, pino.destination({sync: false}));
const {LifeCycleEvents, FS_UUID_SET_NAME, SystemState, FEATURE_SERVER} = require('./lib/utils/constants');
const {LifeCycleEvents, FS_UUID_SET_NAME} = require('./lib/utils/constants');
const installSrfLocals = require('./lib/utils/install-srf-locals');
installSrfLocals(srf, logger);
const writeSystemAlerts = srf.locals?.writeSystemAlerts;
if (writeSystemAlerts) {
writeSystemAlerts({
system_component: FEATURE_SERVER,
state : SystemState.Online,
fields : {
detail: `feature-server with process_id ${process.pid} started`,
host: srf.locals?.ipv4
}
});
}
const {
initLocals,
createRootSpan,
@@ -112,20 +100,8 @@ createHttpListener(logger, srf)
});
const monInterval = setInterval(async() => {
setInterval(() => {
srf.locals.stats.gauge('fs.sip.calls.count', sessionTracker.count);
try {
const systemInformation = await srf.locals.dbHelpers.lookupSystemInformation();
if (systemInformation && systemInformation.log_level) {
logger.level = systemInformation.log_level;
}
} catch (err) {
if (process.env.NODE_ENV === 'test') {
clearInterval(monInterval);
logger.error('all tests complete');
}
else logger.error({err}, 'Error checking system log level in database');
}
}, 20000);
const disconnect = () => {
@@ -136,25 +112,13 @@ const disconnect = () => {
srf.locals.mediaservers?.forEach((ms) => ms.disconnect());
});
};
process.on('SIGTERM', handle);
process.on('SIGINT', handle);
async function handle(signal) {
process.on('SIGTERM', handle);
function handle(signal) {
const {removeFromSet} = srf.locals.dbHelpers;
srf.locals.disabled = true;
logger.info(`got signal ${signal}`);
const writeSystemAlerts = srf.locals?.writeSystemAlerts;
if (writeSystemAlerts) {
// it has to be synchronous call, or else by the time system saves the app terminates
await writeSystemAlerts({
system_component: FEATURE_SERVER,
state : SystemState.Offline,
fields : {
detail: `feature-server with process_id ${process.pid} stopped, signal ${signal}`,
host: srf.locals?.ipv4
}
});
}
const setName = `${(JAMBONES_CLUSTER_ID || 'default')}:active-fs`;
const fsServiceUrlSetName = `${(JAMBONES_CLUSTER_ID || 'default')}:fs-service-url`;
if (setName && srf.locals.localSipAddress) {

View File

@@ -73,7 +73,6 @@ const JAMBONES_LOGLEVEL = process.env.JAMBONES_LOGLEVEL || 'info';
const JAMBONES_INJECT_CONTENT = process.env.JAMBONES_INJECT_CONTENT;
const PORT = parseInt(process.env.HTTP_PORT, 10) || 3000;
const HTTP_IP = process.env.HTTP_IP;
const HTTP_PORT_MAX = parseInt(process.env.HTTP_PORT_MAX, 10);
const K8S = process.env.K8S;
@@ -108,8 +107,6 @@ const DEEPGRAM_API_KEY = process.env.DEEPGRAM_API_KEY;
const ANCHOR_MEDIA_ALWAYS = process.env.ANCHOR_MEDIA_ALWAYS;
const VMD_HINTS_FILE = process.env.VMD_HINTS_FILE;
const JAMBONES_AWS_TRANSCRIBE_USE_GRPC = process.env.JAMBONES_AWS_TRANSCRIBE_USE_GRPC;
/* security, secrets */
const LEGACY_CRYPTO = !!process.env.LEGACY_CRYPTO;
const JWT_SECRET = process.env.JWT_SECRET;
@@ -136,9 +133,6 @@ const JAMBONES_DISABLE_DIRECT_P2P_CALL = process.env.JAMBONES_DISABLE_DIRECT_P2P
const JAMBONES_EAGERLY_PRE_CACHE_AUDIO = parseInt(process.env.JAMBONES_EAGERLY_PRE_CACHE_AUDIO, 10) || 0;
const JAMBONES_USE_FREESWITCH_TIMER_FD = process.env.JAMBONES_USE_FREESWITCH_TIMER_FD;
const JAMBONES_DIAL_SBC_FOR_REGISTERED_USER = process.env.JAMBONES_DIAL_SBC_FOR_REGISTERED_USER || false;
const JAMBONES_MEDIA_TIMEOUT_MS = process.env.JAMBONES_MEDIA_TIMEOUT_MS || 0;
const JAMBONES_MEDIA_HOLD_TIMEOUT_MS = process.env.JAMBONES_MEDIA_HOLD_TIMEOUT_MS || 0;
module.exports = {
JAMBONES_MYSQL_HOST,
@@ -176,7 +170,6 @@ module.exports = {
JAMBONES_CLUSTER_ID,
PORT,
HTTP_PORT_MAX,
HTTP_IP,
K8S,
K8S_SBC_SIP_SERVICE_NAME,
JAMBONES_SUBNET,
@@ -195,7 +188,6 @@ module.exports = {
ANCHOR_MEDIA_ALWAYS,
VMD_HINTS_FILE,
JAMBONES_FREESWITCH_MAX_CALL_DURATION_MINS,
JAMBONES_AWS_TRANSCRIBE_USE_GRPC,
LEGACY_CRYPTO,
JWT_SECRET,
@@ -224,8 +216,5 @@ module.exports = {
JAMBONZ_RECORD_WS_PASSWORD,
JAMBONZ_DISABLE_DIAL_PAI_HEADER,
JAMBONES_DISABLE_DIRECT_P2P_CALL,
JAMBONES_USE_FREESWITCH_TIMER_FD,
JAMBONES_DIAL_SBC_FOR_REGISTERED_USER,
JAMBONES_MEDIA_TIMEOUT_MS,
JAMBONES_MEDIA_HOLD_TIMEOUT_MS
JAMBONES_USE_FREESWITCH_TIMER_FD
};

View File

@@ -14,8 +14,6 @@ const RootSpan = require('../../utils/call-tracer');
const dbUtils = require('../../utils/db-utils');
const { mergeSdpMedia, extractSdpMedia } = require('../../utils/sdp-utils');
const { createCallSchema, customSanitizeFunction } = require('../schemas/create-call');
const { selectHostPort } = require('../../utils/network');
const { JAMBONES_DIAL_SBC_FOR_REGISTERED_USER } = require('../../config');
const removeNullProperties = (obj) => (Object.keys(obj).forEach((key) => obj[key] === null && delete obj[key]), obj);
const removeNulls = (req, res, next) => {
@@ -67,7 +65,7 @@ router.post('/',
lookupAppBySid
} = srf.locals.dbHelpers;
const {getSBC, getFreeswitch} = srf.locals;
let sbcAddress = getSBC();
const sbcAddress = getSBC();
if (!sbcAddress) throw new Error('no available SBCs for outbound call creation');
const target = restDial.to;
const opts = {
@@ -99,8 +97,7 @@ router.post('/',
'X-Trace-ID': rootSpan.traceId,
...(req.body?.application_sid && {'X-Application-Sid': req.body.application_sid}),
...(restDial.fromHost && {'X-Preferred-From-Host': restDial.fromHost}),
...(record_all_calls && {'X-Record-All-Calls': recordOutputFormat}),
...target.headers
...(record_all_calls && {'X-Record-All-Calls': recordOutputFormat})
};
switch (target.type) {
@@ -142,16 +139,6 @@ router.post('/',
}
}
// find handling sbc sip for called user
if (JAMBONES_DIAL_SBC_FOR_REGISTERED_USER && target.type === 'user') {
const { registrar } = srf.locals.dbHelpers;
const reg = await registrar.query(target.name);
if (reg) {
sbcAddress = selectHostPort(logger, reg.sbcAddress, 'tcp')[1];
}
//sbc outbound return 404 Notfound to handle case called user is not reigstered.
}
/**
* trunk isn't specified,
* check if from-number matches any existing numbers on Jambonz
@@ -208,13 +195,10 @@ router.post('/',
/**
* create our application object -
* we merge the inbound call application,
* with the provided app params from the request body
* not from the database as per an inbound call,
* but from the provided params in the request
*/
const app = {
...application,
...req.body
};
const app = req.body;
/**
* attach our requestor and notifier objects
@@ -234,7 +218,7 @@ router.post('/',
}
if (!app.notifier && app.call_status_hook) {
app.notifier = new HttpRequestor(logger, account.account_sid, app.call_status_hook, account.webhook_secret);
logger.debug({call_status_hook: app.call_status_hook}, 'creating http client for call status hook');
logger.debug({call_hook: app.call_hook}, 'creating http client for call status hook');
}
else if (!app.notifier) {
logger.debug('creating null call status hook');
@@ -273,8 +257,6 @@ router.post('/',
callId: inviteReq.get('Call-ID'),
accountSid,
traceId: rootSpan.traceId
}, {
...(account.enable_debug_log && {level: 'debug'})
});
app.requestor.logger = app.notifier.logger = sipLogger;
const callInfo = new CallInfo({
@@ -308,8 +290,6 @@ router.post('/',
},
cbProvisional: (prov) => {
const callStatus = prov.body ? CallStatus.EarlyMedia : CallStatus.Ringing;
// Update call-id for sbc outbound INVITE
cs.callInfo.sbcCallid = prov.get('X-CID');
if ([180, 183].includes(prov.status) && prov.body) connectStream(prov.body);
restDial.emit('callStatus', prov.status, !!prov.body);
cs.emit('callStatusChange', {callStatus, sipStatus: prov.status});

View File

@@ -1,5 +1,5 @@
const uuidv4 = require('uuid-random');
const {CallDirection, AllowedSipRecVerbs, WS_CLOSE_CODES} = require('./utils/constants');
const {CallDirection, AllowedSipRecVerbs} = require('./utils/constants');
const {parseSiprecPayload} = require('./utils/siprec-utils');
const CallInfo = require('./session/call-info');
const HttpRequestor = require('./utils/http-requestor');
@@ -187,20 +187,14 @@ module.exports = function(srf, logger) {
const {span} = rootSpan.startChildSpan('lookupAccountDetails');
try {
const accountDetail = await lookupAccountDetails(account_sid);
const account = accountDetail?.account;
req.locals.accountInfo = accountDetail;
req.locals.service_provider_sid = account?.service_provider_sid;
req.locals.accountInfo = await lookupAccountDetails(account_sid);
req.locals.service_provider_sid = req.locals.accountInfo?.account?.service_provider_sid;
span.end();
if (!account?.is_active) {
if (!req.locals.accountInfo.account.is_active) {
logger.info(`Account is inactive or suspended ${account_sid}`);
// TODO: alert
return res.send(503, {headers: {'X-Reason': 'Account exists but is inactive'}});
}
// Change the default log level to debug
if (account?.enable_debug_log) {
req.locals.logger.level = 'debug';
}
logger.debug({accountInfo: req.locals?.accountInfo?.account}, `retrieved account info for ${account_sid}`);
next();
} catch (err) {
@@ -336,9 +330,7 @@ module.exports = function(srf, logger) {
if (arr) {
const google_custom_voice_sid = arr[1];
const [custom_voice] = await lookupGoogleCustomVoice(google_custom_voice_sid);
//google voice cloning key has size 200kb, jambonz should not resolve the voice here that the app's calling
//webhook will receive big payload, tts-task should resolve the voice later.
if (!custom_voice.use_voice_cloning_key) {
if (custom_voice) {
app2.speech_synthesis_voice = {
reportedUsage: custom_voice.reported_usage,
model: custom_voice.model
@@ -362,13 +354,11 @@ module.exports = function(srf, logger) {
});
// if transferred call contains callInfo, let update original data to newly created callInfo in this instance.
if (app.transferredCall && app.callInfo) {
const {direction, callerName, from, to, originatingSipIp, originatingSipTrunkName} = app.callInfo;
req.locals.callInfo.direction = direction;
req.locals.callInfo.callerName = callerName;
req.locals.callInfo.from = from;
req.locals.callInfo.to = to;
req.locals.callInfo.originatingSipIp = originatingSipIp;
req.locals.callInfo.originatingSipTrunkName = originatingSipTrunkName;
req.locals.callInfo.callerName = app.callInfo.callerName;
req.locals.callInfo.from = app.callInfo.from;
req.locals.callInfo.to = app.callInfo.to;
req.locals.callInfo.originatingSipIp = app.callInfo.originatingSipIp;
req.locals.callInfo.originatingSipTrunkName = app.callInfo.originatingSipTrunkName;
delete app.callInfo;
}
next();
@@ -387,7 +377,7 @@ module.exports = function(srf, logger) {
const {rootSpan, siprec, application:app} = req.locals;
let span;
try {
if (app.tasks && app.tasks?.length > 0 && !JAMBONES_MYSQL_REFRESH_TTL) {
if (app.tasks && !JAMBONES_MYSQL_REFRESH_TTL) {
app.tasks = normalizeJambones(logger, app.tasks).map((tdata) => makeTask(logger, tdata));
if (0 === app.tasks.length) throw new Error('no application provided');
return next();
@@ -460,7 +450,7 @@ module.exports = function(srf, logger) {
}).catch((err) => this.logger.info({err}, 'Error generating alert for parsing application'));
logger.info({err}, `Error retrieving or parsing application: ${err?.message}`);
res.send(480, {headers: {'X-Reason': err?.message || 'unknown'}});
app.requestor.close(WS_CLOSE_CODES.GoingAway);
app.requestor.close();
}
}

View File

@@ -45,10 +45,8 @@ class AdultingCallSession extends CallSession {
return this.sd.ep;
}
// When adulting session kicked from conference, replaceEndpoint is a must
set ep(newEp) {
this.sd.ep = newEp;
}
/* see note above */
set ep(newEp) {}
get callSid() {
return this.callInfo.callSid;

View File

@@ -32,7 +32,6 @@ class CallInfo {
this.sipStatus = 100;
this.sipReason = 'Trying';
this.callStatus = CallStatus.Trying;
this.sbcCallid = req.get('X-CID');
this.originatingSipIp = req.get('X-Forwarded-For');
this.originatingSipTrunkName = req.get('X-Originating-Carrier');
const {siprec} = req.locals;
@@ -130,7 +129,6 @@ class CallInfo {
from: this.from,
to: this.to,
callId: this.callId,
sbcCallid: this.sbcCallid,
sipStatus: this.sipStatus,
sipReason: this.sipReason,
callStatus: this.callStatus,

View File

@@ -2,15 +2,13 @@ const Emitter = require('events');
const fs = require('fs');
const {
CallDirection,
MediaPath,
TaskPreconditions,
CallStatus,
TaskName,
KillReason,
RecordState,
AllowedSipRecVerbs,
AllowedConfirmSessionVerbs,
TtsStreamingEvents
AllowedConfirmSessionVerbs
} = require('../utils/constants');
const moment = require('moment');
const assert = require('assert');
@@ -22,22 +20,17 @@ const listTaskNames = require('../utils/summarize-tasks');
const HttpRequestor = require('../utils/http-requestor');
const WsRequestor = require('../utils/ws-requestor');
const ActionHookDelayProcessor = require('../utils/action-hook-delay');
const TtsStreamingBuffer = require('../utils/tts-streaming-buffer');
const {parseUri} = require('drachtio-srf');
const {
JAMBONES_INJECT_CONTENT,
JAMBONES_EAGERLY_PRE_CACHE_AUDIO,
AWS_REGION,
JAMBONES_USE_FREESWITCH_TIMER_FD,
JAMBONES_MEDIA_TIMEOUT_MS,
JAMBONES_MEDIA_HOLD_TIMEOUT_MS
JAMBONES_USE_FREESWITCH_TIMER_FD
} = require('../config');
const bent = require('bent');
const BackgroundTaskManager = require('../utils/background-task-manager');
const dbUtils = require('../utils/db-utils');
const BADPRECONDITIONS = 'preconditions not met';
const CALLER_CANCELLED_ERR_MSG = 'Response not sent due to unknown transaction';
const { NonFatalTaskError} = require('../utils/error');
const sqlRetrieveQueueEventHook = `SELECT * FROM webhooks
WHERE webhook_sid =
(
@@ -346,18 +339,6 @@ class CallSession extends Emitter {
this.application.fallback_speech_recognizer_language = language;
}
/**
* global referHook
*/
set referHook(hook) {
this._referHook = hook;
}
get referHook() {
return this._referHook;
}
/**
* Vad
*/
@@ -404,41 +385,33 @@ class CallSession extends Emitter {
return this.application.transferredCall === true;
}
/**
* returns true if this session is an inbound call session
*/
get isInboundCallSession() {
return this.constructor.name === 'InboundCallSession';
}
/**
* returns true if this session is a ConfirmCallSession
*/
get isAdultingCallSession() {
return this.constructor.name === 'AdultingCallSession';
}
/**
* returns true if this session is a ConfirmCallSession
*/
get isConfirmCallSession() {
return this.constructor.name === 'ConfirmCallSession';
}
/**
* returns true if this session is a SipRecCallSession
*/
get isSipRecCallSession() {
return this.constructor.name === 'SipRecCallSession';
}
/**
* returns true if this session is a SmsCallSession
*/
get isSmsCallSession() {
return this.constructor.name === 'SmsCallSession';
}
get isRestCallSession() {
return this.constructor.name === 'RestCallSession';
}
get InboundCallSession() {
return this.constructor.name === 'InboundCallSession';
}
get isNormalCallSession() {
return this.constructor.name === 'InboundCallSession' || this.constructor.name === 'RestCallSession';
}
get is3pccInvite() {
return this.isInboundCallSession && this.req?.body?.length === 0;
}
get webhook_secret() {
return this.accountInfo?.account?.webhook_secret;
@@ -452,10 +425,6 @@ class CallSession extends Emitter {
return this.backgroundTaskManager.isTaskRunning('bargeIn');
}
get isTtsStreamEnabled() {
return this.backgroundTaskManager.isTaskRunning('ttsStream');
}
get isListenEnabled() {
return this.backgroundTaskManager.isTaskRunning('listen');
}
@@ -518,10 +487,6 @@ class CallSession extends Emitter {
this._sipRequestWithinDialogHook = url;
}
get isTtsStreamOpen() {
return this.currentTask?.isStreamingTts ||
this.backgroundTaskManager.getTask('ttsStream')?.isStreamingTts;
}
// Bot Delay (actionHook delayed)
get actionHookDelayEnabled() {
return this._actionHookDelayEnabled;
@@ -555,6 +520,15 @@ class CallSession extends Emitter {
this._actionHookDelayRetries = e;
}
// Getter/setter for current tts vendor
get currentTtsVendor() {
return this._currentTtsVendor;
}
set currentTtsVendor(vendor) {
this._currentTtsVendor = vendor;
}
get actionHookDelayProcessor() {
return this._actionHookDelayProcessor;
}
@@ -572,60 +546,19 @@ class CallSession extends Emitter {
this._actionHookDelayProcessor = new ActionHookDelayProcessor(this.logger, opts, this, this.ep);
this._actionHookDelayProcessor.on('giveup', () => {
this.logger.info('CallSession: ActionHookDelayProcessor: giveup event - hanging up call');
const {writeAlerts} = this.srf.locals;
try {
writeAlerts({
alert_type: 'bot-action-delay-giveup',
account_sid: this.accountSid,
message: 'Call terminated due to bot action delay timeout',
target_sid: this.callSid
});
} catch (err) {
this.logger.error({err}, 'Error writing bot-action-delay-giveup alert');
}
this._jambonzHangup('bot-action-delay-giveup');
this._jambonzHangup();
if (this.wakeupResolver) {
this.logger.debug('CallSession: Giveup timer expired - waking up');
this.wakeupResolver({reason: 'noResponseGiveUp'});
this.wakeupResolver = null;
}
});
this._actionHookDelayProcessor.on('giveupWithTasks', (tasks) => {
this.logger.info('CallSession: ActionHookDelayProcessor: giveupWithTasks event');
const giveUpTasks = normalizeJambones(this.logger, tasks).map((tdata) => makeTask(this.logger, tdata));
this.logger.info({tasks: listTaskNames(giveUpTasks)}, 'CallSession:giveupWithTasks task list');
// we need to clear the ahd, as we do not want to execute actionHookDelay actions again
this.clearActionHookDelayProcessor();
// replace the application with giveUpTasks
this.replaceApplication(giveUpTasks);
});
} catch (err) {
this.logger.error({err}, 'CallSession: Error creating ActionHookDelayProcessor');
}
}
}
getTsStreamingVendor() {
let v;
if (this.currentTask?.isStreamingTts) {
const {vendor} = this.currentTask.getTtsVendorData(this);
v = vendor;
}
else if (this.backgroundTaskManager.getTask('ttsStream')?.isStreamingTts) {
const {vendor} = this.backgroundTaskManager.getTask('ttsStream').getTtsVendorData(this);
v = vendor;
}
return v;
}
get appIsUsingWebsockets() {
return this.requestor instanceof WsRequestor;
}
/* end of getters and setters */
async clearOrRestoreActionHookDelayProcessor() {
if (this._actionHookDelayProcessor) {
await this._actionHookDelayProcessor.stop();
@@ -633,7 +566,9 @@ class CallSession extends Emitter {
//this.logger.debug('CallSession:clearOrRestoreActionHookDelayProcessor - ahd settings');
//await this.clearActionHookDelayProcessor();
}
this.logger.debug('CallSession:clearOrRestoreActionHookDelayProcessor - say or play action completed');
else {
this.logger.debug('CallSession:clearOrRestoreActionHookDelayProcessor - restore ahd after gather override');
}
}
}
@@ -843,38 +778,6 @@ class CallSession extends Emitter {
}
}
async enableBackgroundTtsStream(say) {
try {
if (this.isTtsStreamEnabled) {
this.logger.debug('CallSession:enableBackgroundTtsStream - ttsStream currently enabled, ignoring request');
} else if (this.appIsUsingWebsockets && this.isNormalCallSession) {
await this.backgroundTaskManager.newTask('ttsStream', say);
this.logger.debug('CallSession:enableBackgroundTtsStream - ttsStream enabled');
} else {
this.logger.debug(
'CallSession:enableBackgroundTtsStream - ignoring request as call does not have required conditions');
}
} catch (err) {
this.logger.info({err, say}, 'CallSession:enableBackgroundTtsStream - Error creating background tts stream task');
}
}
disableTtsStream() {
if (this.isTtsStreamEnabled) {
this.backgroundTaskManager.stop('ttsStream');
this.logger.debug('CallSession:disableTtsStream - ttsStream disabled');
}
}
clearTtsStream() {
this.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'user_interruption'})
.catch((err) => this.logger.info({err}, 'CallSession:clearTtsStream - Error sending user_interruption'));
this.ttsStreamingBuffer?.clear();
}
startTtsStream() {
this.ttsStreamingBuffer?.start();
}
async enableBotMode(gather, autoEnable) {
try {
let task;
@@ -970,8 +873,7 @@ class CallSession extends Emitter {
writeAlerts({
alert_type: AlertType.TTS_FAILURE,
account_sid: this.accountSid,
vendor,
target_sid: this.callSid
vendor
}).catch((err) => this.logger.error({err}, 'Error writing tts alert'));
}
}
@@ -1017,7 +919,6 @@ class CallSession extends Emitter {
speech_credential_sid: credential.speech_credential_sid,
api_key: credential.api_key,
deepgram_stt_uri: credential.deepgram_stt_uri,
deepgram_tts_uri: credential.deepgram_tts_uri,
deepgram_stt_use_tls: credential.deepgram_stt_use_tls
};
}
@@ -1047,76 +948,47 @@ class CallSession extends Emitter {
speech_credential_sid: credential.speech_credential_sid,
cobalt_server_uri: credential.cobalt_server_uri
};
}
else if ('elevenlabs' === vendor) {
} else if ('elevenlabs' === vendor) {
return {
api_key: credential.api_key,
model_id: credential.model_id,
options: credential.options
};
}
else if ('playht' === vendor) {
} else if ('playht' === vendor) {
return {
api_key: credential.api_key,
user_id: credential.user_id,
voice_engine: credential.voice_engine,
options: credential.options
};
}
else if ('cartesia' === vendor) {
return {
api_key: credential.api_key,
model_id: credential.model_id,
embedding: credential.embedding,
options: credential.options
};
}
else if ('rimelabs' === vendor) {
} else if ('rimelabs' === vendor) {
return {
api_key: credential.api_key,
model_id: credential.model_id,
options: credential.options
};
}
else if ('assemblyai' === vendor) {
} else if ('assemblyai' === vendor) {
return {
speech_credential_sid: credential.speech_credential_sid,
api_key: credential.api_key
};
}
else if ('voxist' === vendor) {
return {
speech_credential_sid: credential.speech_credential_sid,
api_key: credential.api_key
};
}
else if ('whisper' === vendor) {
} else if ('whisper' === vendor) {
return {
api_key: credential.api_key,
model_id: credential.model_id
};
}
else if ('verbio' === vendor) {
} else if ('verbio' === vendor) {
return {
client_id: credential.client_id,
client_secret: credential.client_secret,
engine_version: credential.engine_version
};
}
else if ('speechmatics' === vendor) {
this.logger.info({credential}, 'CallSession:getSpeechCredentials - speechmatics credential');
return {
api_key: credential.api_key,
speechmatics_stt_uri: credential.speechmatics_stt_uri,
};
}
else if (vendor.startsWith('custom:')) {
} else if (vendor.startsWith('custom:')) {
return {
speech_credential_sid: credential.speech_credential_sid,
auth_token: credential.auth_token,
custom_stt_url: credential.custom_stt_url,
custom_tts_url: credential.custom_tts_url,
custom_tts_streaming_url: credential.custom_tts_streaming_url
custom_tts_url: credential.custom_tts_url
};
}
}
@@ -1124,8 +996,7 @@ class CallSession extends Emitter {
writeAlerts({
alert_type: AlertType.STT_NOT_PROVISIONED,
account_sid: this.accountSid,
vendor,
target_sid: this.callSid
vendor
}).catch((err) => this.logger.error({err}, 'Error writing tts alert'));
}
}
@@ -1139,27 +1010,6 @@ class CallSession extends Emitter {
async exec() {
this.logger.info({tasks: listTaskNames(this.tasks)}, `CallSession:exec starting ${this.tasks.length} tasks`);
// calculate if inbandDTMF tone is used
const voip_carrier_sid = this.req?.has('X-Voip-Carrier-Sid') ? this.req.get('X-Voip-Carrier-Sid') :
this.req?.has('X-Requested-Carrier-Sid') ? this.req.get('X-Requested-Carrier-Sid') : null;
if (voip_carrier_sid) {
const {lookupVoipCarrierBySid} = dbUtils(this.logger, this.srf);
const [voipCarrier] = await lookupVoipCarrierBySid(voip_carrier_sid);
this.inbandDtmfEnabled = voipCarrier?.dtmf_type === 'tones';
}
if (this.isNormalCallSession) {
this.ttsStreamingBuffer = new TtsStreamingBuffer(this);
this.ttsStreamingBuffer.on(TtsStreamingEvents.Empty, this._onTtsStreamingEmpty.bind(this));
this.ttsStreamingBuffer.on(TtsStreamingEvents.Pause, this._onTtsStreamingPause.bind(this));
this.ttsStreamingBuffer.on(TtsStreamingEvents.Resume, this._onTtsStreamingResume.bind(this));
this.ttsStreamingBuffer.on(TtsStreamingEvents.ConnectFailure, this._onTtsStreamingConnectFailure.bind(this));
}
else {
this.logger.info(`CallSession:exec - not a normal call session: ${this.constructor.name}`);
}
while (this.tasks.length && !this.callGone) {
const taskNum = ++this.taskIdx;
const stackNum = this.stackIdx;
@@ -1200,8 +1050,6 @@ class CallSession extends Emitter {
this.currentTask = null;
if (err.message?.includes(BADPRECONDITIONS)) {
this.logger.info(`CallSession:exec task #${stackNum}:${taskNum}: ${task.name}: ${err.message}`);
} else if (err instanceof NonFatalTaskError) {
this.logger.error(err, `Error executing task #${stackNum}:${taskNum}: ${task.name}`);
}
else {
this.logger.error(err, `Error executing task #${stackNum}:${taskNum}: ${task.name}`);
@@ -1218,7 +1066,7 @@ class CallSession extends Emitter {
try {
await this._awaitCommandsOrHangup();
//await this.clearOrRestoreActionHookDelayProcessor();
await this.clearOrRestoreActionHookDelayProcessor();
//TODO: remove filler noise code and simply create as action hook delay
if (this._isPlayingFillerNoise) {
@@ -1294,11 +1142,6 @@ class CallSession extends Emitter {
this.wakeupResolver({reason: 'session ended'});
this.wakeupResolver = null;
}
if (this._maxCallDurationTimer) {
clearTimeout(this._maxCallDurationTimer);
this._maxCallDurationTimer = null;
}
}
/**
@@ -1474,7 +1317,7 @@ class CallSession extends Emitter {
if (!listenTask) {
return this.logger.info('CallSession:_lccListenStatus - invalid listen_status: Dial does not have a listen');
}
listenTask.updateListen(opts.listen_status || opts.stream_status);
listenTask.updateListen(opts.listen_status);
}
/**
@@ -1616,7 +1459,7 @@ Duration=${duration} `
// this whole thing requires us to be in a Dial verb
const task = this.currentTask;
if (!task || ![TaskName.Dial, TaskName.Listen].includes(task.name)) {
return this.logger.info('CallSession:_lccWhisper - invalid command since we are not in a dial or stream/listen');
return this.logger.info('CallSession:_lccWhisper - invalid command since we are not in a dial or listen');
}
// allow user to provide a url object, a url string, an array of tasks, or a single task
@@ -1716,71 +1559,6 @@ Duration=${duration} `
this.logger.info({response}, '_lccBoostAudioSignal: response from freeswitch');
}
async _lccMediaPath(desiredPath) {
const task = this.currentTask;
if (!task || task.name !== TaskName.Dial) {
return this.logger.info('CallSession:_lccMediaPath - invalid command since we are not in a dial verb');
}
task.updateMediaPath(desiredPath)
.catch((err) => this.logger.error(err, 'CallSession:_lccMediaPath'));
}
_lccToolOutput(tool_call_id, opts, callSid) {
// only valid if we are in an LLM verb
const task = this.currentTask;
if (!task || !task.name.startsWith('Llm')) {
return this.logger.info('CallSession:_lccToolOutput - invalid command since we are not in an llm');
}
task.processToolOutput(tool_call_id, opts, callSid)
.catch((err) => this.logger.error(err, 'CallSession:_lccToolOutput'));
}
_lccLlmUpdate(opts, callSid) {
// only valid if we are in an LLM verb
const task = this.currentTask;
if (!task || !task.name.startsWith('Llm')) {
return this.logger.info('CallSession:_lccLlmUpdate - invalid command since we are not in an llm');
}
task.processLlmUpdate(opts, callSid)
.catch((err) => this.logger.error(err, 'CallSession:_lccLlmUpdate'));
}
async _lccTtsTokens(opts) {
const {id, tokens} = opts;
if (id === undefined) {
this.logger.info({opts}, 'CallSession:_lccTtsTokens - invalid command since id is missing');
return;
}
else if (tokens === undefined) {
this.logger.info({opts}, 'CallSession:_lccTtsTokens - invalid command since id is missing');
return this.requestor.request('tts:tokens-result', '/tokens-result', {
id,
status: 'failed',
reason: 'missing tokens'
}).catch((err) => this.logger.debug({err}, 'CallSession:_notifyTaskStatus - Error sending'));
}
let res;
try {
res = await this.ttsStreamingBuffer?.bufferTokens(tokens);
this.logger.info({id, res}, 'CallSession:_lccTtsTokens - tts:tokens-result');
} catch (err) {
this.logger.info(err, 'CallSession:_lccTtsTokens');
}
this.requestor.request('tts:tokens-result', '/tokens-result', {id, ...res})
.catch((err) => this.logger.debug({err}, 'CallSession:_notifyTaskStatus - Error sending'));
}
_lccTtsFlush(opts) {
this.ttsStreamingBuffer?.flush(opts);
}
_lccTtsClear(opts) {
this.ttsStreamingBuffer?.clear(opts);
}
/**
* perform call hangup by jambonz
@@ -1805,7 +1583,7 @@ Duration=${duration} `
if (opts.call_hook || opts.child_call_hook) {
return await this._lccCallHook(opts);
}
if (opts.listen_status || opts.stream_status) {
if (opts.listen_status) {
await this._lccListenStatus(opts);
}
if (opts.transcribe_status) {
@@ -1836,20 +1614,11 @@ Duration=${duration} `
return this._lccConferenceParticipantAction(opts.conferenceParticipantAction);
}
else if (opts.dub) {
return this._lccDub(opts.dub, callSid);
return this._lccDub(opts);
}
else if (opts.boostAudioSignal) {
return this._lccBoostAudioSignal(opts, callSid);
}
else if (opts.media_path) {
return this._lccMediaPath(opts.media_path, callSid);
}
else if (opts.llm_tool_output) {
return this._lccToolOutput(opts.tool_call_id, opts.llm_tool_output, callSid);
}
else if (opts.llm_update) {
return this._lccLlmUpdate(opts.llm_update, callSid);
}
// whisper may be the only thing we are asked to do, or it may that
// we are doing a whisper after having muted, paused recording etc..
@@ -2037,8 +1806,7 @@ Duration=${duration} `
await writeAlerts({
alert_type: AlertType.WEBHOOK_CONNECTION_FAILURE,
account_sid: this.accountSid,
detail: `Session:reconnect error ${err}`,
url: this.application.call_hook.url,
detail: `Session:reconnect error ${err}`
});
} catch (error) {
this.logger.error({error}, 'Error writing WEBHOOK_CONNECTION_FAILURE alert');
@@ -2046,7 +1814,7 @@ Duration=${duration} `
this._jambonzHangup();
}
async _onCommand({msgid, command, call_sid, queueCommand, tool_call_id, data}) {
async _onCommand({msgid, command, call_sid, queueCommand, data}) {
this.logger.info({msgid, command, queueCommand, data}, 'CallSession:_onCommand - received command');
let resolution;
switch (command) {
@@ -2115,7 +1883,6 @@ Duration=${duration} `
break;
case 'listen:status':
case 'stream:status':
this._lccListenStatus(data);
break;
@@ -2148,33 +1915,6 @@ Duration=${duration} `
});
break;
case 'media:path':
this._lccMediaPath(data, call_sid)
.catch((err) => {
this.logger.info({err, data}, 'CallSession:_onCommand - error setting media path');
});
break;
case 'llm:tool-output':
this._lccToolOutput(tool_call_id, data, call_sid);
break;
case 'llm:update':
this._lccLlmUpdate(data, call_sid);
break;
case 'tts:tokens':
this._lccTtsTokens(data);
break;
case 'tts:flush':
this._lccTtsFlush(data);
break;
case 'tts:clear':
this._lccTtsClear(data);
break;
default:
this.logger.info(`CallSession:_onCommand - invalid command ${command}`);
}
@@ -2232,8 +1972,6 @@ Duration=${duration} `
};
}
this._enableInbandDtmfIfRequired(this.ep);
// we are going from an early media connection to answer
if (this.direction === CallDirection.Inbound) {
// only do this for inbound call.
@@ -2257,6 +1995,7 @@ Duration=${duration} `
//ep.cs = this;
this.ep = ep;
this.logger.info(`allocated endpoint ${ep.uuid}`);
this._configMsEndpoint();
this.ep.on('destroy', () => {
@@ -2266,6 +2005,11 @@ Duration=${duration} `
if (this.direction === CallDirection.Inbound) {
if (task.earlyMedia && !this.req.finalResponseSent) {
this.res.send(183, {body: ep.local.sdp});
this._notifyCallStatusChange({
callStatus: CallStatus.EarlyMedia,
sipStatus: 183,
sipReason: 'Early Media'
});
return {ep};
}
this.logger.debug('propogating answer');
@@ -2328,12 +2072,6 @@ Duration=${duration} `
this.logger.error('CallSession:replaceEndpoint cannot be called without stable dlg');
return;
}
// When this call kicked out from conference, session need to replace endpoint
// but this.ms might be undefined/null at this case.
this.ms = this.ms || this.getMS();
// Destroy previous ep if it's still running.
if (this.ep?.connected) this.ep.destroy();
this.ep = await this.ms.createEndpoint({remoteSdp: this.dlg.remote.sdp});
this._configMsEndpoint();
@@ -2369,8 +2107,6 @@ Duration=${duration} `
// close all background tasks
this.backgroundTaskManager.stopAll();
this.clearOrRestoreActionHookDelayProcessor().catch((err) => {});
this.ttsStreamingBuffer?.stop();
}
/**
@@ -2420,43 +2156,22 @@ Duration=${duration} `
},
localSdp: this.ep.local.sdp
});
this.logger.debug('answered call');
this.dlg.on('destroy', this._callerHungup.bind(this));
this.wrapDialog(this.dlg);
this.dlg.callSid = this.callSid;
this.emit('callStatusChange', {sipStatus: 200, sipReason: 'OK', callStatus: CallStatus.InProgress});
const tidyUp = () => {
this.dlg.on('destroy', this._callerHungup.bind(this));
this.wrapDialog(this.dlg);
this.dlg.callSid = this.callSid;
this.emit('callStatusChange', {sipStatus: 200, sipReason: 'OK', callStatus: CallStatus.InProgress});
if (this.recordOptions && this.recordState === RecordState.RecordingOff) {
this.startRecording();
}
this.dlg.on('modify', this._onReinvite.bind(this));
this.dlg.on('refer', this._onRefer.bind(this));
if (this.sipRequestWithinDialogHook) {
this.dlg.on('info', this._onRequestWithinDialog.bind(this));
this.dlg.on('message', this._onRequestWithinDialog.bind(this));
}
this.logger.debug(`CallSession:propagateAnswer - answered callSid ${this.callSid}`);
};
if (this.is3pccInvite) {
return new Promise((resolve, reject) => {
this.dlg.once('ack', (ackRequest) => {
this.logger.debug({body: ackRequest.body}, 'received ACK for 3pcc invite');
this.ep.modify(ackRequest.body)
.then(() => {
tidyUp();
return resolve();
})
.catch((err) => {
this.logger.info({err}, 'Error modifying endpoint with ACK');
reject(err);
});
});
this.logger.debug('this is a 3pcc invite');
});
if (this.recordOptions && this.recordState === RecordState.RecordingOff) {
this.startRecording();
}
tidyUp();
this.dlg.on('modify', this._onReinvite.bind(this));
this.dlg.on('refer', this._onRefer.bind(this));
if (this.sipRequestWithinDialogHook) {
this.dlg.on('info', this._onRequestWithinDialog.bind(this));
this.dlg.on('message', this._onRequestWithinDialog.bind(this));
}
this.logger.debug(`CallSession:propagateAnswer - answered callSid ${this.callSid}`);
}
else {
this.logger.debug('CallSession:propagateAnswer - call already answered - re-anchor media with a reinvite');
@@ -2490,7 +2205,7 @@ Duration=${duration} `
res.send(200, {body: this.ep.local.sdp});
}
else {
if (this.currentTask && this.currentTask.name === TaskName.Dial && this.currentTask.isOnHoldEnabled) {
if (this.currentTask.name === TaskName.Dial && this.currentTask.isOnHoldEnabled) {
this.logger.info('onholdMusic reINVITE after media has been released');
await this.currentTask.handleReinviteAfterMediaReleased(req, res);
} else {
@@ -2514,66 +2229,17 @@ Duration=${duration} `
}
/**
* Handle incoming REFER
* Handle incoming REFER if we are in a dial task
* @param {*} req
* @param {*} res
*/
_onRefer(req, res) {
const task = this.currentTask;
const sd = task.sd;
if (task && TaskName.Dial === task.name && sd && task.referHook) {
if (task && TaskName.Dial === task.name && sd) {
task.handleRefer(this, req, res);
}
else {
this._handleRefer(req, res);
}
}
async _handleRefer(req, res) {
if (this._referHook) {
try {
const to = parseUri(req.getParsedHeader('Refer-To').uri);
const by = parseUri(req.getParsedHeader('Referred-By').uri);
const customHeaders = Object.keys(req.headers)
.filter((h) => h.toLowerCase().startsWith('x-'))
.reduce((acc, h) => {
acc[h] = req.get(h);
return acc;
}, {});
const b3 = this.b3;
const httpHeaders = b3 && {b3};
const json = await this.requestor.request('verb:hook', this._referHook, {
...(this.callInfo.toJSON()),
refer_details: {
sip_refer_to: req.get('Refer-To'),
sip_referred_by: req.get('Referred-By'),
sip_user_agent: req.get('User-Agent'),
refer_to_user: to.scheme === 'tel' ? to.number : to.user,
referred_by_user: by.scheme === 'tel' ? by.number : by.user,
referring_call_sid: this.callSid,
referred_call_sid: null,
...customHeaders
}
}, httpHeaders);
if (json && Array.isArray(json)) {
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
if (tasks && tasks.length > 0) {
this.logger.info('CallSession:handleRefer received REFER, get new tasks');
this.replaceApplication(tasks);
if (this.wakeupResolver) {
this.wakeupResolver({reason: 'CallSession: referHook new taks'});
this.wakeupResolver = null;
}
}
}
res.send(202);
this.logger.info('CallSession:handleRefer - sent 202 Accepted');
} catch (err) {
this.logger.error({err}, 'CallSession:handleRefer - error while asking referHook');
res.send(err.statusCode || 501);
}
} else {
res.send(501);
}
}
@@ -2728,35 +2394,27 @@ Duration=${duration} `
};
}
async releaseMediaToSBC(remoteSdp, releaseMediaEntirely) {
async releaseMediaToSBC(remoteSdp) {
assert(this.dlg && this.dlg.connected && this.ep && typeof remoteSdp === 'string');
await this.dlg.modify(remoteSdp, {
headers: {
'X-Reason': releaseMediaEntirely ? 'release-media-entirely' : 'release-media'
'X-Reason': 'release-media'
}
});
try {
await this.ep.destroy();
} catch (err) {
this.logger.error({err}, 'CallSession:releaseMediaToSBC: Error destroying endpoint');
}
this.ep = null;
this.ep.destroy()
.then(() => this.ep = null)
.catch((err) => this.logger.error({err}, 'CallSession:releaseMediaToSBC: Error destroying endpoint'));
}
async reAnchorMedia(currentMediaRoute = MediaPath.PartialMedia) {
async reAnchorMedia() {
assert(this.dlg && this.dlg.connected && !this.ep);
this.ep = await this.ms.createEndpoint({remoteSdp: this.dlg.remote.sdp});
this._configMsEndpoint();
await this.dlg.modify(this.ep.local.sdp, {
headers: {
'X-Reason': 'anchor-media'
}
});
if (currentMediaRoute === MediaPath.NoMedia) {
await this.ep.modify(this.dlg.remote.sdp);
}
this._configMsEndpoint();
}
async handleReinviteAfterMediaReleased(req, res) {
@@ -2823,40 +2481,15 @@ Duration=${duration} `
}
_configMsEndpoint() {
this._enableInbandDtmfIfRequired(this.ep);
this.ep.once('destroy', this._handleMediaTimeout.bind(this));
const opts = {
...(this.onHoldMusic && {holdMusic: `shout://${this.onHoldMusic.replace(/^https?:\/\//, '')}`}),
...(JAMBONES_USE_FREESWITCH_TIMER_FD && {timer_name: 'timerfd'}),
...(JAMBONES_MEDIA_TIMEOUT_MS && {media_timeout: JAMBONES_MEDIA_TIMEOUT_MS}),
...(JAMBONES_MEDIA_HOLD_TIMEOUT_MS && {media_hold_timeout: JAMBONES_MEDIA_HOLD_TIMEOUT_MS})
...(JAMBONES_USE_FREESWITCH_TIMER_FD && {timer_name: 'timerfd'})
};
if (Object.keys(opts).length > 0) {
this.ep.set(opts);
}
}
async _handleMediaTimeout(evt) {
if (evt.reason === 'MEDIA_TIMEOUT' && !this.callGone) {
this.logger.info('CallSession:_handleMediaTimeout: received MEDIA_TIMEOUT, hangup the call');
this._jambonzHangup('Media Timeout');
}
}
async _enableInbandDtmfIfRequired(ep) {
if (ep.inbandDtmfEnabled) return;
// only enable inband dtmf detection if voip carrier dtmf_type === tones
if (this.inbandDtmfEnabled) {
// https://developer.signalwire.com/freeswitch/FreeSWITCH-Explained/Modules/mod-dptools/6587132/#0-about
try {
ep.execute('start_dtmf');
ep.inbandDtmfEnabled = true;
} catch (err) {
this.logger.info(err, 'CallSession:_enableInbandDtmf - error enable inband DTMF');
}
}
}
/**
* notifyTaskError - only used when websocket connection is used instead of webhooks
*/
@@ -2936,58 +2569,6 @@ Duration=${duration} `
this.verbHookSpan = null;
}
}
_onTtsStreamingEmpty() {
const task = this.currentTask;
if (task && TaskName.Say === task.name) {
task.notifyTtsStreamIsEmpty();
}
}
_onTtsStreamingPause() {
this.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_paused'})
.catch((err) => this.logger.info({err}, 'CallSession:_onTtsStreamingPause - Error sending'));
}
_onTtsStreamingResume() {
this.requestor?.request('tts:streaming-event', 'streaming-event', {event_type: 'stream_resumed'})
.catch((err) => this.logger.info({err}, 'CallSession:_onTtsStreamingResume - Error sending'));
}
async _onTtsStreamingConnectFailure(vendor) {
const {writeAlerts, AlertType} = this.srf.locals;
try {
await writeAlerts({
alert_type: AlertType.TTS_STREAMING_CONNECTION_FAILURE,
account_sid: this.accountSid,
vendor
});
} catch (error) {
this.logger.error({error}, 'Error writing WEBHOOK_CONNECTION_FAILURE alert');
}
this.logger.info({vendor}, 'CallSession:_onTtsStreamingConnectFailure - tts streaming connect failure');
}
async startMaxCallDurationTimer(timeLimit) {
if (!this._maxCallDurationTimer && timeLimit > 0) {
this.timeLimit = timeLimit;
this._maxCallDurationTimer = setTimeout(this._onMaxCallDuration.bind(this), timeLimit * 1000);
this.logger.debug(`CallSession:startMaxCallDurationTimer - started max call duration timer for ${timeLimit}s`);
}
}
/**
* _onMaxCallDuration - called when the call has reached the maximum duration
*/
_onMaxCallDuration() {
this.logger.info(`callSession:_onMaxCallDuration tearing down call as it has reached ${this.timeLimit}s`);
if (!this.dlg) {
this.logger.debug('CallSession:_onMaxCallDuration - no dialog, call already gone');
return;
}
this._jambonzHangup('Max Call Duration');
this._maxCallDurationTimer = null;
}
}
module.exports = CallSession;

View File

@@ -8,7 +8,7 @@ const CallSession = require('./call-session');
*/
class ConfirmCallSession extends CallSession {
constructor({logger, application, dlg, ep, tasks, callInfo, accountInfo, memberId, confName, rootSpan, req}) {
constructor({logger, application, dlg, ep, tasks, callInfo, accountInfo, memberId, confName, rootSpan}) {
super({
logger,
application,
@@ -23,7 +23,6 @@ class ConfirmCallSession extends CallSession {
});
this.dlg = dlg;
this.ep = ep;
this.req = req;
}
/**

View File

@@ -35,11 +35,21 @@ class InboundCallSession extends CallSession {
_onCancel() {
this.rootSpan.setAttributes({'call.termination': 'caller abandoned'});
this.callInfo.callTerminationBy = 'caller';
const wasEarlyMedia = this.callInfo.callStatus === 'early-media';
this._notifyCallStatusChange({
callStatus: CallStatus.NoAnswer,
sipStatus: 487,
sipReason: 'Request Terminated'
});
if (wasEarlyMedia) {
const duration = 0; // Set duration to 0 for early media termination, required param
this._notifyCallStatusChange({
callStatus: CallStatus.Completed,
sipStatus: 487,
sipReason: 'Call Terminated During Early Media',
duration: duration
});
}
this._callReleased();
}
@@ -70,14 +80,8 @@ class InboundCallSession extends CallSession {
this._hangup('caller');
}
_jambonzHangup(reason) {
this.dlg?.destroy({
headers: {
...(reason && {'X-Reason': reason})
}
});
// kill current task or wakeup the call session.
this._callReleased();
_jambonzHangup() {
this.dlg?.destroy();
}
_hangup(terminatedBy = 'jambonz') {

View File

@@ -1,6 +1,10 @@
const CallSession = require('./call-session');
const {CallStatus} = require('../utils/constants');
const moment = require('moment');
const {parseUri} = require('drachtio-srf');
const { normalizeJambones } = require('@jambonz/verb-specifications');
const makeTask = require('../tasks/make_task');
/**
* @classdesc Subclass of CallSession. This represents a CallSession that is
* created for an outbound call that is initiated via the REST API.
@@ -42,9 +46,61 @@ class RestCallSession extends CallSession {
this.dlg = dlg;
dlg.on('destroy', this._callerHungup.bind(this));
dlg.on('refer', this._onRefer.bind(this));
dlg.on('modify', this._onReinvite.bind(this));
this.wrapDialog(dlg);
}
/**
* global referHook
*/
set referHook(hook) {
this._referHook = hook;
}
/**
* This is invoked when the called party sends REFER to Jambonz.
*/
async _onRefer(req, res) {
if (this._referHook) {
try {
const to = parseUri(req.getParsedHeader('Refer-To').uri);
const by = parseUri(req.getParsedHeader('Referred-By').uri);
const b3 = this.b3;
const httpHeaders = b3 && {b3};
const json = await this.requestor.request('verb:hook', this._referHook, {
...(this.callInfo.toJSON()),
refer_details: {
sip_refer_to: req.get('Refer-To'),
sip_referred_by: req.get('Referred-By'),
sip_user_agent: req.get('User-Agent'),
refer_to_user: to.scheme === 'tel' ? to.number : to.user,
referred_by_user: by.scheme === 'tel' ? by.number : by.user,
referring_call_sid: this.callSid,
referred_call_sid: null,
}
}, httpHeaders);
if (json && Array.isArray(json)) {
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
if (tasks && tasks.length > 0) {
this.logger.info('RestCallSession:handleRefer received REFER, get new tasks');
this.replaceApplication(tasks);
if (this.wakeupResolver) {
this.wakeupResolver({reason: 'RestCallSession: referHook new taks'});
this.wakeupResolver = null;
}
}
}
res.send(202);
this.logger.info('RestCallSession:handleRefer - sent 202 Accepted');
} catch (err) {
this.logger.error({err}, 'RestCallSession:handleRefer - error while asking referHook');
res.send(err.statusCode || 501);
}
} else {
res.send(501);
}
}
/**
* This is invoked when the called party hangs up, in order to calculate the call duration.
*/

View File

@@ -118,9 +118,7 @@ class Conference extends Task {
this.emitter.emit('kill');
await this._doFinalMemberCheck(cs);
if (this.ep && this.ep.connected) {
// drachtio-fsmrf override esl::event::CUSTOM to conference join listerner, After finish the conference
// the application need to reset the esl::event::CUSTOM for another use on the same endpoint
this.ep.resetEslCustomEvent();
this.ep.conn.removeAllListeners('esl::event::CUSTOM::*');
this.ep.api(`conference ${this.confName} kick ${this.memberId}`)
.catch((err) => this.logger.info({err}, 'Error kicking participant'));
}
@@ -137,10 +135,15 @@ class Conference extends Task {
* @param {SipDialog} dlg
*/
async _init(cs, dlg) {
const friendlyName = this.confName;
const {createHash, retrieveHash} = cs.srf.locals.dbHelpers;
this.friendlyName = this.confName;
this.confName = `conf:${cs.accountSid}:${this.confName}`;
this.statusParams = Object.assign({
conferenceSid: this.confName,
friendlyName
}, cs.callInfo);
// check if conference is in progress
const obj = await retrieveHash(this.confName);
if (obj) {
@@ -490,7 +493,7 @@ class Conference extends Task {
}
async doConferenceParticipantAction(cs, opts) {
const {action, tag, wait_hook } = opts;
const {action, tag} = opts;
switch (action) {
case 'tag':
@@ -506,10 +509,7 @@ class Conference extends Task {
await this.clearCoachMode();
break;
case 'hold':
this.doConferenceHold(cs, {
conf_hold_status: 'hold',
...(wait_hook && {wait_hook})
});
this.doConferenceHold(cs, {conf_hold_status: 'hold'});
break;
case 'unhold':
this.doConferenceHold(cs, {conf_hold_status: 'unhold'});
@@ -546,13 +546,6 @@ class Conference extends Task {
} while (!this.killed && this.conf_hold_status === 'hold');
}
/**
* mute or unmute side of the call
*/
mute(callSid, doMute) {
this.doConferenceMute(this.callSession, {conf_mute_status: doMute});
}
/**
* Add ourselves to the waitlist of sessions to be notified once
* the conference starts
@@ -611,7 +604,7 @@ class Conference extends Task {
* when we hang up as the last member, the current member count = 1
* when we are kicked out of the call when the moderator leaves, the member count = 0
*/
if (this.participantCount === 0 || this.endConferenceOnExit) {
if (this.participantCount === 0) {
const {deleteKey} = cs.srf.locals.dbHelpers;
try {
this._notifyConferenceEvent(cs, 'end');
@@ -619,8 +612,7 @@ class Conference extends Task {
this.logger.info(`conf ${this.confName} deprovisioned: ${removed ? 'success' : 'failure'}`);
}
catch (err) {
this.logger.error(err, `Error deprovisioning conference ${this.confName},
might be the conference already cleaned by another moderator`);
this.logger.error(err, `Error deprovisioning conference ${this.confName}`);
}
}
}
@@ -653,8 +645,7 @@ class Conference extends Task {
memberId: this.memberId,
confName: this.confName,
tasks,
rootSpan: cs.rootSpan,
req: cs.req
rootSpan: cs.rootSpan
});
await this._playSession.exec();
this._playSession = null;
@@ -698,24 +689,8 @@ class Conference extends Task {
if (!params.time) params.time = (new Date()).toISOString();
if (!params.members && typeof this.participantCount === 'number') params.members = this.participantCount;
cs.application.requestor
.request(
'verb:hook',
this.statusHook,
Object.assign(
params,
Object.assign(
{
conferenceSid: this.confName,
friendlyName: this.friendlyName,
},
cs.callInfo.toJSON()
),
httpHeaders
)
)
.catch((err) =>
this.logger.info(err, 'Conference:notifyConferenceEvent - error')
);
.request('verb:hook', this.statusHook, Object.assign(params, this.statusParams, httpHeaders))
.catch((err) => this.logger.info(err, 'Conference:notifyConferenceEvent - error'));
}
}

View File

@@ -16,8 +16,7 @@ class TaskConfig extends Task {
'fillerNoise',
'actionHookDelayAction',
'boostAudioSignal',
'vad',
'ttsStream'
'vad'
].forEach((k) => this[k] = this.data[k] || {});
if ('notifyEvents' in this.data) {
@@ -35,8 +34,7 @@ class TaskConfig extends Task {
'finishOnKey', 'input', 'numDigits', 'minDigits', 'maxDigits',
'interDigitTimeout', 'bargein', 'dtmfBargein', 'minBargeinWordCount', 'actionHook'
].forEach((k) => {
const val = this.bargeIn[k];
if (val !== undefined && val !== null) this.gatherOpts[k] = val;
if (this.bargeIn[k]) this.gatherOpts[k] = this.bargeIn[k];
});
}
if (this.transcribe?.enable) {
@@ -46,12 +44,6 @@ class TaskConfig extends Task {
};
delete this.transcribeOpts.enable;
}
if (this.ttsStream.enable) {
this.sayOpts = {
verb: 'say',
stream: true
};
}
if (this.data.reset) {
if (typeof this.data.reset === 'string') this.data.reset = [this.data.reset];
@@ -81,8 +73,6 @@ class TaskConfig extends Task {
get hasDub() { return Object.keys(this.dub).length; }
get hasVad() { return Object.keys(this.vad).length; }
get hasFillerNoise() { return Object.keys(this.fillerNoise).length; }
get hasReferHook() { return Object.keys(this.data).includes('referHook'); }
get hasTtsStream() { return Object.keys(this.ttsStream).length; }
get summary() {
const phrase = [];
@@ -92,13 +82,13 @@ class TaskConfig extends Task {
if (this.bargeIn.enable) phrase.push('enable barge-in');
if (this.hasSynthesizer) {
const {vendor:v, language:l, voice, label} = this.synthesizer;
const s = `{${v},${l},${voice},${label || 'None'}}`;
const {vendor:v, language:l, voice} = this.synthesizer;
const s = `{${v},${l},${voice}}`;
phrase.push(`set synthesizer${s}`);
}
if (this.hasRecognizer) {
const {vendor:v, language:l, label} = this.recognizer;
const s = `{${v},${l},${label || 'None'}}`;
const {vendor:v, language:l} = this.recognizer;
const s = `{${v},${l}}`;
phrase.push(`set recognizer${s}`);
}
if (this.hasRecording) phrase.push(this.record.action);
@@ -113,10 +103,6 @@ class TaskConfig extends Task {
if (this.notifyEvents) phrase.push(`event notification ${this.notifyEvents ? 'on' : 'off'}`);
if (this.onHoldMusic) phrase.push(`onHoldMusic: ${this.onHoldMusic}`);
if ('boostAudioSignal' in this.data) phrase.push(`setGain ${this.data.boostAudioSignal}`);
if (this.hasReferHook) phrase.push('set referHook');
if (this.hasTtsStream) {
phrase.push(`${this.ttsStream.enable ? 'enable' : 'disable'} ttsStream`);
}
return `${this.name}{${phrase.join(',')}}`;
}
@@ -309,29 +295,9 @@ class TaskConfig extends Task {
voiceMs: this.vad.voiceMs || 250,
silenceMs: this.vad.silenceMs || 150,
strategy: this.vad.strategy || 'one-shot',
mode: (this.vad.mode !== undefined && this.vad.mode !== null) ? this.vad.mode : 2
mode: this.vad.mod || 2
};
}
if (this.hasReferHook) {
cs.referHook = this.data.referHook;
}
if (this.ttsStream.enable && this.sayOpts) {
this.sayOpts.synthesizer = this.hasSynthesizer ? this.synthesizer : {
vendor: cs.speechSynthesisVendor,
language: cs.speechSynthesisLanguage,
voice: cs.speechSynthesisVoice,
...(cs.speechSynthesisLabel && {
label: cs.speechSynthesisLabel
})
};
this.logger.info({opts: this.gatherOpts}, 'Config: enabling ttsStream');
cs.enableBackgroundTtsStream(this.sayOpts);
} else if (!this.ttsStream.enable) {
this.logger.info('Config: disabling ttsStream');
cs.disableTtsStream();
}
}
async kill(cs) {

View File

@@ -6,7 +6,6 @@ const {
TaskName,
TaskPreconditions,
MAX_SIMRINGS,
MediaPath,
KillReason
} = require('../utils/constants');
const assert = require('assert');
@@ -18,12 +17,9 @@ const dbUtils = require('../utils/db-utils');
const parseDecibels = require('../utils/parse-decibels');
const debug = require('debug')('jambonz:feature-server');
const {parseUri} = require('drachtio-srf');
const {ANCHOR_MEDIA_ALWAYS,
JAMBONZ_DISABLE_DIAL_PAI_HEADER,
JAMBONES_DIAL_SBC_FOR_REGISTERED_USER} = require('../config');
const {ANCHOR_MEDIA_ALWAYS, JAMBONZ_DISABLE_DIAL_PAI_HEADER} = require('../config');
const { isOnhold, isOpusFirst } = require('../utils/sdp-utils');
const { normalizeJambones } = require('@jambonz/verb-specifications');
const { selectHostPort } = require('../utils/network');
function parseDtmfOptions(logger, dtmfCapture) {
let parentDtmfCollector, childDtmfCollector;
@@ -86,8 +82,6 @@ function filterAndLimit(logger, tasks) {
return unique;
}
const sleepFor = (ms) => new Promise((resolve) => setTimeout(() => resolve(), ms));
class TaskDial extends Task {
constructor(logger, opts) {
super(logger, opts);
@@ -109,8 +103,6 @@ class TaskDial extends Task {
this.proxy = this.data.proxy;
this.tag = this.data.tag;
this.boostAudioSignal = this.data.boostAudioSignal;
this._mediaPath = MediaPath.FullMedia;
this.translate = this.data.translate;
if (this.dtmfHook) {
const {parentDtmfCollector, childDtmfCollector} = parseDtmfOptions(logger, this.data.dtmfCapture || {});
@@ -122,9 +114,8 @@ class TaskDial extends Task {
}
}
const listenData = this.data.listen || this.data.stream;
if (listenData) {
this.listenTask = makeTask(logger, {'listen': listenData }, this);
if (this.data.listen) {
this.listenTask = makeTask(logger, {'listen': this.data.listen}, this);
}
if (this.data.transcribe) {
this.transcribeTask = makeTask(logger, {'transcribe' : this.data.transcribe}, this);
@@ -159,22 +150,17 @@ class TaskDial extends Task {
get canReleaseMedia() {
const keepAnchor = this.data.anchorMedia ||
this.data.translate ||
this.cs.isBackGroundListen ||
this.cs.onHoldMusic ||
ANCHOR_MEDIA_ALWAYS ||
this.listenTask ||
this.dubTasks ||
this.transcribeTask ||
this.startAmd;
this.cs.isBackGroundListen ||
this.cs.onHoldMusic ||
ANCHOR_MEDIA_ALWAYS ||
this.listenTask ||
this.dubTasks ||
this.transcribeTask ||
this.startAmd;
return !keepAnchor;
}
get shouldExitMediaPathEntirely() {
return this.data.exitMediaPath;
}
get summary() {
if (this.target.length === 1) {
const target = this.target[0];
@@ -195,16 +181,6 @@ class TaskDial extends Task {
async exec(cs) {
await super.exec(cs);
if (this.data.anchorMedia && this.data.exitMediaPath) {
this.logger.info('Dial:exec - incompatible anchorMedia and exitMediaPath are both set, will obey anchorMedia');
delete this.data.exitMediaPath;
}
if (!this.canReleaseMedia && this.data.exitMediaPath) {
this.logger.info(
'Dial:exec - exitMediaPath is set so features such as transcribe and record will not work on this call');
}
try {
if (this.listenTask) {
const {span, ctx} = this.startChildSpan(`nested:${this.listenTask.summary}`);
@@ -227,16 +203,7 @@ class TaskDial extends Task {
else {
this.epOther = cs.ep;
if (this.dialMusic && this.epOther && this.epOther.connected) {
(async() => {
do {
try {
await this.epOther.play(this.dialMusic);
} catch (err) {
this.logger.error(err, `TaskDial:exec error playing ${this.dialMusic}`);
await sleepFor(1000);
}
} while (!this.killed || !this.bridged);
})();
this.epOther.play(this.dialMusic).catch((err) => {});
}
}
if (!this.killed) await this._attemptCalls(cs);
@@ -276,9 +243,7 @@ class TaskDial extends Task {
this._removeDtmfDetection(this.dlg);
await this._killOutdials();
if (this.sd) {
const byeReasonHeader = this.killReason === KillReason.MediaTimeout ? 'Media Timeout' : undefined;
this.sd.kill(byeReasonHeader);
this.sd.ep?.removeListener('destroy', this._handleMediaTimeout.bind(this));
this.sd.kill();
this.sd.removeAllListeners();
this.sd = null;
}
@@ -293,17 +258,6 @@ class TaskDial extends Task {
this.transcribeTask.span.end();
this.transcribeTask = null;
}
if (this.llmTask) {
await this.llmTask.kill(cs);
this.llmTask.span.end();
this.llmTask = null;
}
if (this.sdLlmTask) {
await this.sdLlmTask.kill(cs);
this.sdLlmTask.span.end();
this.sdLlmTask = null;
}
this.notifyTaskDone();
}
@@ -335,7 +289,7 @@ class TaskDial extends Task {
if (!cs.callGone && this.epOther) {
/* if we can release the media back to the SBC, do so now */
if (this.canReleaseMedia) this._releaseMedia(cs, this.sd, this.shouldExitMediaPathEntirely);
if (this.canReleaseMedia) this._releaseMedia(cs, this.sd);
else this.epOther.bridge(this.ep);
}
} catch (err) {
@@ -375,26 +329,17 @@ class TaskDial extends Task {
const to = parseUri(req.getParsedHeader('Refer-To').uri);
const by = parseUri(req.getParsedHeader('Referred-By').uri);
const referredBy = req.get('Referred-By');
const userAgent = req.get('User-Agent');
const customHeaders = Object.keys(req.headers)
.filter((h) => h.toLowerCase().startsWith('x-'))
.reduce((acc, h) => {
acc[h] = req.get(h);
return acc;
}, {});
this.logger.info({to}, 'refer to parsed');
const json = await cs.requestor.request('verb:hook', this.referHook, {
...(callInfo.toJSON()),
refer_details: {
sip_refer_to: req.get('Refer-To'),
sip_referred_by: req.get('Referred-By'),
sip_user_agent: req.get('User-Agent'),
refer_to_user: to.scheme === 'tel' ? to.number : to.user,
...(referredBy && {sip_referred_by: referredBy}),
...(userAgent && {sip_user_agent: userAgent}),
...(by && {referred_by_user: by.scheme === 'tel' ? by.number : by.user}),
referred_by_user: by.scheme === 'tel' ? by.number : by.user,
referring_call_sid,
referred_call_sid,
...customHeaders
referred_call_sid
}
}, httpHeaders);
if (json && Array.isArray(json)) {
@@ -420,13 +365,9 @@ class TaskDial extends Task {
this.logger.info(err, 'Dial:handleRefer - error setting new application after receiving REFER');
}
}
//caller and callee legs are briged together, accept refer with 202 will release callee leg endpoint
//that makes freeswitch release endpoint for caller leg.
if (this.ep) this.ep.unbridge();
res.send(202);
this.logger.info('DialTask:handleRefer - sent 202 Accepted');
} catch (err) {
this.logger.info({err}, 'DialTask:handleRefer - error processing incoming REFER');
res.send(err.statusCode || 501);
}
}
@@ -532,11 +473,11 @@ class TaskDial extends Task {
}
async _attemptCalls(cs) {
const {req, callInfo, direction, srf} = cs;
const {req, srf} = cs;
const {getSBC} = srf.locals;
const {lookupTeamsByAccount, lookupAccountBySid} = srf.locals.dbHelpers;
const {lookupCarrier, lookupCarrierByPhoneNumber, lookupVoipCarrierBySid} = dbUtils(this.logger, cs.srf);
let sbcAddress = this.proxy || getSBC();
const {lookupCarrier, lookupCarrierByPhoneNumber} = dbUtils(this.logger, cs.srf);
const sbcAddress = this.proxy || getSBC();
const teamsInfo = {};
let fqdn;
@@ -544,25 +485,17 @@ class TaskDial extends Task {
this.headers = {
'X-Account-Sid': cs.accountSid,
...(req && req.has('X-CID') && {'X-CID': req.get('X-CID')}),
...(direction === 'outbound' && callInfo.sbcCallid && {'X-CID': callInfo.sbcCallid}),
...(!JAMBONZ_DISABLE_DIAL_PAI_HEADER && req && {
...(req.has('P-Asserted-Identity') && {'P-Asserted-Identity': req.get('P-Asserted-Identity')}),
...(req.has('Privacy') && {'Privacy': req.get('Privacy')}),
}),
...(req && req.has('P-Asserted-Identity') && !JAMBONZ_DISABLE_DIAL_PAI_HEADER &&
{'P-Asserted-Identity': req.get('P-Asserted-Identity')}),
...(req && req.has('X-Voip-Carrier-Sid') && {'X-Voip-Carrier-Sid': req.get('X-Voip-Carrier-Sid')}),
// Put headers at the end to make sure opt.headers override all default behavior.
...this.headers
};
// default to inband dtmf if not specified
this.inbandDtmfEnabled = cs.inbandDtmfEnabled;
// get calling user from From header
const parsedFrom = req.getParsedHeader('from');
const fromUri = parseUri(parsedFrom.uri);
const opts = {
headers: this.headers,
proxy: `sip:${sbcAddress}`,
callingNumber: this.callerId || fromUri.user,
callingNumber: this.callerId || req.callingNumber,
...(this.callerName && {callingName: this.callerName}),
opusFirst: isOpusFirst(this.cs.ep.remote.sdp)
};
@@ -608,15 +541,6 @@ class TaskDial extends Task {
this.logger.error({err}, 'Error looking up account by sid');
}
}
// find handling sbc sip for called user
if (JAMBONES_DIAL_SBC_FOR_REGISTERED_USER && t.type === 'user') {
const { registrar } = srf.locals.dbHelpers;
const reg = await registrar.query(t.name);
if (reg) {
sbcAddress = selectHostPort(this.logger, reg.sbcAddress, 'tcp')[1];
}
//sbc outbound return 404 Notfound to handle case called user is not reigstered.
}
if (t.type === 'phone' && t.trunk) {
const voip_carrier_sid = await lookupCarrier(cs.accountSid, t.trunk);
this.logger.info(`Dial:_attemptCalls: selected ${voip_carrier_sid} for requested carrier: ${t.trunk}`);
@@ -633,17 +557,10 @@ class TaskDial extends Task {
const str = this.callerId || req.callingNumber || '';
const callingNumber = str.startsWith('+') ? str.substring(1) : str;
const voip_carrier_sid = await lookupCarrierByPhoneNumber(cs.accountSid, callingNumber);
const req_voip_carrier_sid = req.has('X-Voip-Carrier-Sid') ? req.get('X-Voip-Carrier-Sid') : null;
if (voip_carrier_sid) {
this.logger.info(
`Dial:_attemptCalls: selected voip_carrier_sid ${voip_carrier_sid} for callingNumber: ${callingNumber}`);
opts.headers['X-Requested-Carrier-Sid'] = voip_carrier_sid;
// Checking if outbound carrier is different from inbound carrier and has dtmf type tones
if (voip_carrier_sid !== req_voip_carrier_sid) {
const [voipCarrier] = await lookupVoipCarrierBySid(voip_carrier_sid);
this.inbandDtmfEnabled = voipCarrier?.dtmf_type === 'tones';
}
}
}
@@ -684,7 +601,6 @@ class TaskDial extends Task {
dialCallStatus: obj.callStatus,
dialSipStatus: obj.sipStatus,
dialCallSid: sd.callSid,
dialSbcCallid: sd.callInfo.sbcCallid
});
}
switch (obj.callStatus) {
@@ -718,7 +634,6 @@ class TaskDial extends Task {
clearTimeout(this.timerRing);
try {
await this._connectSingleDial(cs, sd);
await this._startTranslate(cs, sd);
} catch (err) {
this.logger.info({err}, 'Dial:_attemptCalls - Error calling _connectSingleDial ');
sd.removeAllListeners();
@@ -772,30 +687,6 @@ class TaskDial extends Task {
this._killOutdials(); // NB: order is important
}
async _startTranslate(cs, sd) {
if (this.translate) {
const {callerLanguage, calleeLanguage, gain} = this.translate;
assert(this.translate.llm, 'Dial:_startTranslate - missing llm in translate');
this.logger.debug('Dial:_startTranslate start llm services');
// setup caller LLM task
this.llmTask = makeTask(this.logger, {'llm': this.translate.llm});
this.llmTask.configureDialTranslate({
language: callerLanguage,
gain,
});
// setup callee LLM task
this.sdLlmTask = makeTask(this.logger, {'llm': this.translate.llm});
this.sdLlmTask.configureDialTranslate({
language: calleeLanguage,
gain,
});
this.llmTask.exec(cs, {ep: this.ep});
this.sdLlmTask.exec(cs, {ep: sd.ep});
}
}
async _onReinvite(req, res) {
try {
let isHandled = false;
@@ -825,7 +716,7 @@ class TaskDial extends Task {
// Offhold, time to release media
const newSdp = await this.ep.modify(req.body);
await res.send(200, {body: newSdp});
await this._releaseMedia(this.cs, this.sd, this.shouldExitMediaPathEntirely);
await this._releaseMedia(this.cs, this.sd);
this.isOutgoingLegHold = false;
} else {
this.logger.debug('Dial: _onReinvite receive unhold Request, update media server');
@@ -912,7 +803,7 @@ class TaskDial extends Task {
if (cs.sipRequestWithinDialogHook) this._initSipIndialogRequestListener(cs, this.dlg);
if (this.transcribeTask) this.transcribeTask.exec(cs, {ep: this.epOther, ep2:this.ep});
if (this.listenTask) this.listenTask.exec(cs, {ep: this.listenTask.channel === 2 ? this.ep : this.epOther});
if (this.listenTask) this.listenTask.exec(cs, {ep: this.epOther});
if (this.startAmd) {
try {
this.startAmd(cs, this.ep, this, this.data.amd);
@@ -934,17 +825,7 @@ class TaskDial extends Task {
}
/* if we can release the media back to the SBC, do so now */
if (this.canReleaseMedia || this.shouldExitMediaPathEntirely) {
setTimeout(this._releaseMedia.bind(this, cs, sd, this.shouldExitMediaPathEntirely), 200);
}
this.sd.ep.once('destroy', this._handleMediaTimeout.bind(this));
}
_handleMediaTimeout(evt) {
if (evt.reason === 'MEDIA_TIMEOUT' && this.sd && this.bridged) {
this.kill(this.cs, KillReason.MediaTimeout);
}
if (this.canReleaseMedia) setTimeout(this._releaseMedia.bind(this, cs, sd), 200);
}
_bridgeEarlyMedia(sd) {
@@ -956,57 +837,22 @@ class TaskDial extends Task {
}
}
/* public api */
async updateMediaPath(desiredPath) {
this.logger.info(`Dial:updateMediaPath - ${this._mediaPath} => ${desiredPath}`);
switch (desiredPath) {
case MediaPath.NoMedia:
assert(this._mediaPath !== MediaPath.NoMedia, 'updateMediaPath: already no-media');
await this._releaseMedia(this.cs, this.sd, true);
break;
case MediaPath.PartialMedia:
assert(this._mediaPath !== MediaPath.PartialMedia, 'updateMediaPath: already partial-media');
if (this._mediaPath === MediaPath.FullMedia) {
await this._releaseMedia(this.cs, this.sd, false);
}
else {
// to go from no-media to partial-media we need to go through full-media first
await this.reAnchorMedia(this.cs, this.sd);
await this._releaseMedia(this.cs, this.sd, false);
}
assert(!this.epOther, 'updateMediaPath: epOther should be null');
assert(!this.ep, 'updateMediaPath: ep should be null');
break;
case MediaPath.FullMedia:
assert(this._mediaPath !== MediaPath.FullMedia, 'updateMediaPath: already full-media');
await this.reAnchorMedia(this.cs, this.sd);
break;
default:
assert(false, `updateMediaPath: invalid path request ${desiredPath}`);
}
}
/**
* Release the media from freeswitch
* @param {*} cs
* @param {*} sd
*/
async _releaseMedia(cs, sd, releaseEntirely = false) {
async _releaseMedia(cs, sd) {
assert(cs.ep && sd.ep);
try {
// Wait until we got new SDP from B leg to ofter to A Leg
const aLegSdp = cs.ep.remote.sdp;
await sd.releaseMediaToSBC(aLegSdp, cs.ep.local.sdp, releaseEntirely);
await sd.releaseMediaToSBC(aLegSdp, cs.ep.local.sdp);
const bLegSdp = sd.dlg.remote.sdp;
await cs.releaseMediaToSBC(bLegSdp, releaseEntirely);
await cs.releaseMediaToSBC(bLegSdp);
this.epOther = null;
this._mediaPath = releaseEntirely ? MediaPath.NoMedia : MediaPath.PartialMedia;
this.logger.info(
`Dial:_releaseMedia - successfully released media from freewitch, media path is now ${this._mediaPath}`);
this.logger.info('Dial:_releaseMedia - successfully released media from freewitch');
} catch (err) {
this.logger.info({err}, 'Dial:_releaseMedia error');
}
@@ -1016,14 +862,8 @@ class TaskDial extends Task {
if (cs.ep && sd.ep) return;
this.logger.info('Dial:reAnchorMedia - re-anchoring media to freewitch');
await Promise.all([sd.reAnchorMedia(this._mediaPath), cs.reAnchorMedia(this._mediaPath)]);
await Promise.all([sd.reAnchorMedia(), cs.reAnchorMedia()]);
this.epOther = cs.ep;
this.epOther.bridge(this.ep);
this._mediaPath = MediaPath.FullMedia;
this.logger.info(
`Dial:_releaseMedia - successfully re-anchored media to freewitch, media path is now ${this._mediaPath}`);
}
// Handle RE-INVITE hold from caller leg.
@@ -1042,12 +882,11 @@ class TaskDial extends Task {
}
this._onHoldHook(req);
} else if (!isOnhold(req.body)) {
if (this.epOther && this.ep && this.isIncomingLegHold &&
(this.canReleaseMedia || this.shouldExitMediaPathEntirely)) {
if (this.epOther && this.ep && this.isIncomingLegHold && this.canReleaseMedia) {
// Offhold, time to release media
const newSdp = await this.epOther.modify(req.body);
await res.send(200, {body: newSdp});
await this._releaseMedia(this.cs, this.sd, this.shouldExitMediaPathEntirely);
await this._releaseMedia(this.cs, this.sd);
isHandled = true;
}
this.isIncomingLegHold = false;
@@ -1106,8 +945,7 @@ class TaskDial extends Task {
callInfo: this.cs.callInfo,
accountInfo: this.cs.accountInfo,
tasks,
rootSpan: this.cs.rootSpan,
req: this.cs.req
rootSpan: this.cs.rootSpan
});
await this._onHoldSession.exec();
this._onHoldSession = null;

View File

@@ -369,8 +369,7 @@ class TaskEnqueue extends Task {
callInfo: cs.callInfo,
accountInfo: cs.accountInfo,
tasks: tasksToRun,
rootSpan: cs.rootSpan,
req: cs.req
rootSpan: cs.rootSpan
});
await this._playSession.exec();
this._playSession = null;

View File

@@ -11,10 +11,8 @@ const {
NvidiaTranscriptionEvents,
JambonzTranscriptionEvents,
AssemblyAiTranscriptionEvents,
VoxistTranscriptionEvents,
VadDetection,
VerbioTranscriptionEvents,
SpeechmaticsTranscriptionEvents
VerbioTranscriptionEvents
} = require('../utils/constants.json');
const {
JAMBONES_GATHER_EARLY_HINTS_MATCH,
@@ -24,8 +22,6 @@ const {
const makeTask = require('./make_task');
const assert = require('assert');
const SttTask = require('./stt-task');
const { SpeechCredentialError } = require('../utils/error');
const SPEECHMATICS_DEFAULT_ASR_TIMEOUT = 1200;
class TaskGather extends SttTask {
constructor(logger, opts, parentTask) {
@@ -126,22 +122,10 @@ class TaskGather extends SttTask {
return s;
}
async exec(cs, obj) {
try {
await this.handling(cs, obj);
} catch (error) {
if (error instanceof SpeechCredentialError) {
this.logger.info('Gather failed due to SpeechCredentialError, finished!');
this.notifyTaskDone();
return;
}
throw error;
}
}
async handling(cs, {ep}) {
async exec(cs, {ep}) {
this.logger.debug({options: this.data}, 'Gather:exec');
await super.exec(cs, {ep});
const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, cs.srf);
this.fillerNoise = {
...(cs.fillerNoise || {}),
@@ -157,23 +141,12 @@ class TaskGather extends SttTask {
const {hints, hintsBoost} = cs.globalSttHints;
const setOfHints = new Set((this.data.recognizer.hints || [])
.concat(hints)
// allow for hints to be an array of object
.filter((h) => (typeof h === 'string' && h.length > 0) || (typeof h === 'object')));
.filter((h) => typeof h === 'string' && h.length > 0));
this.data.recognizer.hints = [...setOfHints];
if (!this.data.recognizer.hintsBoost && hintsBoost) this.data.recognizer.hintsBoost = hintsBoost;
this.logger.debug({hints: this.data.recognizer.hints, hintsBoost: this.data.recognizer.hintsBoost},
'Gather:exec - applying global sttHints');
}
// specials case for speechmatics: they dont do endpointing so we need to enable continuous ASR
if (this.vendor === 'speechmatics' && !this.isContinuousAsr) {
const maxDelay = this.recognizer?.speechmaticsOptions?.transcription_config?.max_delay;
if (maxDelay) this.asrTimeout = Math.min(SPEECHMATICS_DEFAULT_ASR_TIMEOUT, maxDelay * 1000);
else this.asrTimeout = SPEECHMATICS_DEFAULT_ASR_TIMEOUT;
this.isContinuousAsr = true;
this.logger.debug(`Gather:exec - auto-enabling continuous ASR for speechmatics w/ timeout ${this.asrTimeout}`);
}
if (!this.isContinuousAsr && cs.isContinuousAsr) {
this.isContinuousAsr = true;
this.asrTimeout = cs.asrTimeout * 1000;
@@ -204,10 +177,8 @@ class TaskGather extends SttTask {
this._startVad();
const startDtmfListener = () => {
assert(!this._dtmfListenerStarted);
if (this.input.includes('digits') || this.dtmfBargein || this.asrDtmfTerminationDigit) {
ep.on('dtmf', this._onDtmf.bind(this, cs, ep));
this._dtmfListenerStarted = true;
}
};
@@ -222,6 +193,7 @@ class TaskGather extends SttTask {
return;
}
this._startTranscribing(ep);
return updateSpeechCredentialLastUsed(this.sttCredentials.speech_credential_sid);
} catch (e) {
await this._startFallback(cs, ep, {error: e});
}
@@ -301,14 +273,15 @@ class TaskGather extends SttTask {
await this._setSpeechHandlers(cs, ep);
if (!this.resolved && !this.killed) {
this._startTranscribing(ep);
updateSpeechCredentialLastUsed(this.sttCredentials.speech_credential_sid)
.catch(() => {/*already logged error */});
}
else {
this.logger.info('Gather:exec - task was killed or resolved quickly, not starting transcription');
}
}
// https://github.com/jambonz/jambonz-feature-server/issues/913
if (this.listenDuringPrompt || (!this.sayTask && !this.playTask)) {
if (this.listenDuringPrompt) {
startDtmfListener();
}
@@ -349,19 +322,9 @@ class TaskGather extends SttTask {
clearTimeout(this.interDigitTimer);
let resolved = false;
if (this.dtmfBargein) {
if (!this.playComplete) {
this.notifyStatus({event: 'dtmf-bargein-detected', ...evt});
}
this._killAudio(cs);
this.emit('dtmf', evt);
}
if (this.isContinuousAsr && evt.dtmf === this.asrDtmfTerminationDigit && this._bufferedTranscripts.length > 0) {
this.logger.info(`continuousAsr triggered with dtmf ${this.asrDtmfTerminationDigit}`);
this._clearAsrTimer();
this._clearTimer();
this._startFinalAsrTimer();
return;
}
if (evt.dtmf === this.finishOnKey && this.input.includes('digits')) {
resolved = true;
this._resolve('dtmf-terminator-key');
@@ -384,6 +347,13 @@ class TaskGather extends SttTask {
this._resolve('dtmf-num-digits');
}
}
else if (this.isContinuousAsr && evt.dtmf === this.asrDtmfTerminationDigit) {
this.logger.info(`continuousAsr triggered with dtmf ${this.asrDtmfTerminationDigit}`);
this._clearAsrTimer();
this._clearTimer();
this._startFinalAsrTimer();
return;
}
if (!resolved && this.interDigitTimeout > 0 && this.digitBuffer.length >= this.minDigits) {
/* start interDigitTimer */
const ms = this.interDigitTimeout * 1000;
@@ -524,35 +494,6 @@ class TaskGather extends SttTask {
this.addCustomEventListener(ep, AssemblyAiTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep));
break;
case 'voxist':
this.bugname = `${this.bugname_prefix}voxist_transcribe`;
this.addCustomEventListener(ep, VoxistTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep));
this.addCustomEventListener(
ep, VoxistTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
this.addCustomEventListener(ep, VoxistTranscriptionEvents.Error, this._onVendorError.bind(this, cs, ep));
this.addCustomEventListener(ep, VoxistTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep));
break;
case 'speechmatics':
this.bugname = `${this.bugname_prefix}speechmatics_transcribe`;
this.addCustomEventListener(
ep, SpeechmaticsTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
this.addCustomEventListener(ep, SpeechmaticsTranscriptionEvents.Info,
this._onSpeechmaticsInfo.bind(this, cs, ep));
this.addCustomEventListener(ep, SpeechmaticsTranscriptionEvents.RecognitionStarted,
this._onSpeechmaticsRecognitionStarted.bind(this, cs, ep));
this.addCustomEventListener(ep, SpeechmaticsTranscriptionEvents.Connect,
this._onVendorConnect.bind(this, cs, ep));
this.addCustomEventListener(ep, SpeechmaticsTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep));
this.addCustomEventListener(ep, SpeechmaticsTranscriptionEvents.Error,
this._onSpeechmaticsErrror.bind(this, cs, ep));
break;
default:
if (this.vendor.startsWith('custom:')) {
this.bugname = `${this.bugname_prefix}${this.vendor}_transcribe`;
@@ -602,8 +543,7 @@ class TaskGather extends SttTask {
account_sid: this.cs.accountSid,
alert_type: AlertType.STT_FAILURE,
vendor: this.vendor,
detail: err.message,
target_sid: this.cs.callSid
detail: err.message
});
}).catch((err) => this.logger.info({err}, 'Error generating alert for tts failure'));
}
@@ -746,7 +686,6 @@ class TaskGather extends SttTask {
this._fillerNoiseOn = false; // in a race, if we just started audio it may sneak through here
this.ep.api('uuid_break', this.ep.uuid)
.catch((err) => this.logger.info(err, 'Error killing audio'));
cs.clearTtsStream();
}
return;
}
@@ -760,7 +699,6 @@ class TaskGather extends SttTask {
this.playTask.kill(cs);
this.playTask = null;
}
this.playComplete = true;
}
_onTranscription(cs, ep, evt, fsEvent) {
@@ -792,7 +730,7 @@ class TaskGather extends SttTask {
evt = this.normalizeTranscription(evt, this.vendor, 1, this.language,
this.shortUtterance, this.data.recognizer.punctuation);
this.logger.debug({evt, bugname, finished, vendor: this.vendor}, 'Gather:_onTranscription normalized transcript');
//this.logger.debug({evt, bugname, finished, vendor: this.vendor}, 'Gather:_onTranscription normalized transcript');
if (evt.alternatives.length === 0) {
this.logger.info({evt}, 'TaskGather:_onTranscription - got empty transcript, continue listening');
@@ -855,7 +793,7 @@ class TaskGather extends SttTask {
const t = evt.alternatives[0].transcript;
if (t) {
/* remove trailing punctuation */
if (this.vendor !== 'speechmatics' && /[,;:\.!\?]$/.test(t)) {
if (/[,;:\.!\?]$/.test(t)) {
this.logger.debug('TaskGather:_onTranscription - removing trailing punctuation');
evt.alternatives[0].transcript = t.slice(0, -1);
}
@@ -871,10 +809,7 @@ class TaskGather extends SttTask {
this._startAsrTimer();
/* some STT engines will keep listening after a final response, so no need to restart */
if (!['soniox', 'aws', 'microsoft', 'deepgram', 'speechmatics'].includes(this.vendor) &&
!this.vendor.startsWith('custom')) {
this._startTranscribing(ep);
}
if (!['soniox', 'aws', 'microsoft', 'deepgram'].includes(this.vendor)) this._startTranscribing(ep);
}
else {
/* this was removed to fix https://github.com/jambonz/jambonz-feature-server/issues/783 */
@@ -927,7 +862,6 @@ class TaskGather extends SttTask {
if (!this.playComplete) {
this.logger.debug({transcript: evt.alternatives[0].transcript}, 'killing audio due to speech');
this.emit('vad');
this.notifyStatus({event: 'speech-bargein-detected', ...evt});
}
this._killAudio(cs);
}
@@ -942,18 +876,15 @@ class TaskGather extends SttTask {
this.cs.callInfo, httpHeaders));
}
if (this.vendor === 'soniox') {
this._clearTimer();
if (evt.vendor.finalWords.length) {
this.logger.debug({evt}, 'TaskGather:_onTranscription - buffering soniox transcript');
this._sonioxTranscripts.push(evt.vendor.finalWords);
}
}
// If transcription received, reset timeout timer.
if (this._timeoutTimer && !emptyTranscript) {
this._startTimer();
}
/* restart asr timer if we get a partial transcript (only if the asr timer is already running) */
/* note: https://github.com/jambonz/jambonz-feature-server/issues/866 */
if (this.isContinuousAsr && this._asrTimer) this._startAsrTimer();
/* restart asr timer if we get a partial transcript */
if (this.isContinuousAsr) this._startAsrTimer();
}
}
_onEndOfUtterance(cs, ep) {
@@ -993,6 +924,7 @@ class TaskGather extends SttTask {
bugname: this.bugname
})
.catch((err) => this.logger.error({err}, `Error stopping transcription for primary vendor ${this.vendor}`));
const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, cs.srf);
try {
this.logger.debug('gather:_startFallback');
this.notifyError({ msg: 'ASR error',
@@ -1001,6 +933,7 @@ class TaskGather extends SttTask {
this._speechHandlersSet = false;
await this._setSpeechHandlers(cs, ep);
this._startTranscribing(ep);
updateSpeechCredentialLastUsed(this.sttCredentials.speech_credential_sid);
return true;
} catch (error) {
this.logger.info({error}, `There is error while falling back to ${this.fallbackVendor}`);
@@ -1028,13 +961,11 @@ class TaskGather extends SttTask {
if (code === 413 && error === 'Too much speech') return this._resolve('timeout');
}
this.logger.info({evt}, 'TaskGather:_onJambonzError');
const errMessage = evt.error || evt.Message;
writeAlerts({
account_sid: cs.accountSid,
alert_type: AlertType.STT_FAILURE,
message: `Custom speech vendor ${this.vendor} error: ${errMessage}`,
message: `Custom speech vendor ${this.vendor} error: ${evt.error}`,
vendor: this.vendor,
target_sid: cs.callSid
}).catch((err) => this.logger.info({err}, 'Error generating alert for jambonz custom connection failure'));
if (!(await this._startFallback(cs, ep, evt))) {
this.notifyTaskDone();
@@ -1048,25 +979,12 @@ class TaskGather extends SttTask {
}
}
async _onSpeechmaticsErrror(cs, _ep, evt) {
// eslint-disable-next-line no-unused-vars
const {message, ...e} = evt;
this._onVendorError(cs, _ep, {error: JSON.stringify(e)});
}
async _onVendorError(cs, _ep, evt) {
super._onVendorError(cs, _ep, evt);
if (!(await this._startFallback(cs, _ep, evt))) {
this._resolve('stt-error', evt);
}
}
async _onSpeechmaticsRecognitionStarted(_cs, _ep, evt) {
this.logger.debug({evt}, 'TaskGather:_onSpeechmaticsRecognitionStarted');
}
async _onSpeechmaticsInfo(_cs, _ep, evt) {
this.logger.debug({evt}, 'TaskGather:_onSpeechmaticsInfo');
}
_onVadDetected(cs, ep) {
if (this.bargein && this.minBargeinWordCount === 0) {
@@ -1125,7 +1043,6 @@ class TaskGather extends SttTask {
this.span.setAttributes({
channel: 1,
'stt.label': this.label || 'None',
'stt.resolve': reason,
'stt.result': JSON.stringify(evt)
});
@@ -1194,6 +1111,7 @@ class TaskGather extends SttTask {
} catch (err) { /*already logged error*/ }
// Gather got response from hook, cancel actionHookDelay processing
this.logger.debug('TaskGather:_resolve - checking ahd');
if (this.cs.actionHookDelayProcessor) {
if (returnedVerbs) {
this.logger.debug('TaskGather:_resolve - got response from action hook, cancelling actionHookDelay');

View File

@@ -17,7 +17,7 @@ class TaskListen extends Task {
[
'action', 'auth', 'method', 'url', 'finishOnKey', 'maxLength', 'metadata', 'mixType', 'passDtmf', 'playBeep',
'sampleRate', 'timeout', 'transcribe', 'wsAuth', 'disableBidirectionalAudio', 'channel'
'sampleRate', 'timeout', 'transcribe', 'wsAuth', 'disableBidirectionalAudio'
].forEach((k) => this[k] = this.data[k]);
this.mixType = this.mixType || 'mono';

View File

@@ -1,117 +0,0 @@
const Task = require('../task');
const {TaskPreconditions} = require('../../utils/constants');
const TaskLlmOpenAI_S2S = require('./llms/openai_s2s');
const TaskLlmVoiceAgent_S2S = require('./llms/voice_agent_s2s');
const TaskLlmUltravox_S2S = require('./llms/ultravox_s2s');
const TaskLlmElevenlabs_S2S = require('./llms/elevenlabs_s2s');
class TaskLlm extends Task {
constructor(logger, opts) {
super(logger, opts);
this.preconditions = TaskPreconditions.Endpoint;
['vendor', 'model', 'auth', 'connectOptions'].forEach((prop) => {
this[prop] = this.data[prop];
});
this.eventHandlers = [];
// delegate to the specific llm model
this.llm = this.createSpecificLlm();
}
get name() { return this.llm.name ; }
get toolHook() { return this.llm?.toolHook; }
get eventHook() { return this.llm?.eventHook; }
get ep() { return this.cs.ep; }
configureDialTranslate(opts) {
this.logger.debug(opts, 'TaskLlm:configureDialTranslate');
this.llm.configureDialTranslate(opts);
}
async exec(cs, {ep}) {
await super.exec(cs, {ep});
await this.llm.exec(cs, {ep});
}
async kill(cs) {
super.kill(cs);
await this.llm.kill(cs);
}
createSpecificLlm() {
let llm;
switch (this.vendor) {
case 'openai':
case 'microsoft':
llm = new TaskLlmOpenAI_S2S(this.logger, this.data, this);
break;
case 'voiceagent':
case 'deepgram':
llm = new TaskLlmVoiceAgent_S2S(this.logger, this.data, this);
break;
case 'ultravox':
llm = new TaskLlmUltravox_S2S(this.logger, this.data, this);
break;
case 'elevenlabs':
llm = new TaskLlmElevenlabs_S2S(this.logger, this.data, this);
break;
default:
throw new Error(`Unsupported vendor ${this.vendor} for LLM`);
}
if (!llm) {
throw new Error(`Unsupported vendor:model ${this.vendor}:${this.model}`);
}
return llm;
}
addCustomEventListener(ep, event, handler) {
this.eventHandlers.push({ep, event, handler});
ep.addCustomEventListener(event, handler);
}
removeCustomEventListeners() {
this.eventHandlers.forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler));
}
async sendEventHook(data) {
await this.cs?.requestor.request('llm:event', this.eventHook, data);
}
async sendToolHook(tool_call_id, data) {
await this.cs?.requestor.request('llm:tool-call', this.toolHook, {tool_call_id, ...data});
}
async processToolOutput(tool_call_id, data) {
if (!this.ep.connected) {
this.logger.info('TaskLlm:processToolOutput - no connected endpoint');
return;
}
this.llm.processToolOutput(this.ep, tool_call_id, data);
}
async processLlmUpdate(data, callSid) {
if (this.ep.connected) {
if (typeof this.llm.processLlmUpdate === 'function') {
this.llm.processLlmUpdate(this.ep, data, callSid);
}
else {
const {vendor, model} = this.llm;
this.logger.info({data, callSid},
`TaskLlm:_processLlmUpdate: LLM ${vendor}:${model} does not support llm:update`);
}
}
}
}
module.exports = TaskLlm;

View File

@@ -1,326 +0,0 @@
const Task = require('../../task');
const TaskName = 'Llm_Elevenlabs_s2s';
const {LlmEvents_Elevenlabs} = require('../../../utils/constants');
const {request} = require('undici');
const { parseDecibels } = require('drachtio-fsmrf/lib/utils');
const ClientEvent = 'client.event';
const SessionDelete = 'session.delete';
const elevenlabs_server_events = [
'conversation_initiation_metadata',
'user_transcript',
'agent_response',
'client_tool_call'
];
const expandWildcards = (events) => {
const expandedEvents = [];
events.forEach((evt) => {
if (evt.endsWith('.*')) {
const prefix = evt.slice(0, -2); // Remove the wildcard ".*"
const matchingEvents = elevenlabs_server_events.filter((e) => e.startsWith(prefix));
expandedEvents.push(...matchingEvents);
} else {
expandedEvents.push(evt);
}
});
return expandedEvents;
};
class TaskLlmElevenlabs_S2S extends Task {
constructor(logger, opts, parentTask) {
super(logger, opts, parentTask);
this.parent = parentTask;
this.vendor = this.parent.vendor;
this.auth = this.parent.auth;
const {agent_id, api_key} = this.auth || {};
if (!agent_id) throw new Error('auth.agent_id is required for Elevenlabs S2S');
this.agent_id = agent_id;
this.api_key = api_key;
this.actionHook = this.data.actionHook;
this.eventHook = this.data.eventHook;
this.toolHook = this.data.toolHook;
const {
conversation_initiation_client_data,
input_sample_rate = 16000,
output_sample_rate = 16000
} = this.data.llmOptions;
this.conversation_initiation_client_data = conversation_initiation_client_data;
this.input_sample_rate = input_sample_rate;
this.output_sample_rate = output_sample_rate;
this.results = {
completionReason: 'normal conversation end'
};
/**
* only one of these will have items,
* if includeEvents, then these are the events to include
* if excludeEvents, then these are the events to exclude
*/
this.includeEvents = [];
this.excludeEvents = [];
/* default to all events if user did not specify */
this._populateEvents(this.data.events || elevenlabs_server_events);
this.addCustomEventListener = parentTask.addCustomEventListener.bind(parentTask);
this.removeCustomEventListeners = parentTask.removeCustomEventListeners.bind(parentTask);
}
get name() { return TaskName; }
configureDialTranslate(opts) {
const {language, gain} = opts;
const {naturalVoice, translatedVoice} = gain;
this.replace_read = true;
this.audio_in_gain = parseDecibels(naturalVoice);
this.audio_injection_gain = parseDecibels(translatedVoice);
// override the agent prompt to ask for a translation
this.conversation_initiation_client_data = {
...(this.conversation_initiation_client_data || {}),
conversation_config_override: {
...(this.conversation_initiation_client_data?.conversation_config_override || {}),
agent: {
prompt: {
prompt: `Please translate the text to ${language}.
Your response should only include the ${language} translation, without any additional words:\n\n`
}
}
}
};
}
async getSignedUrl() {
if (!this.api_key) {
return {
host: 'api.elevenlabs.io',
path: `/v1/convai/conversation?agent_id=${this.agent_id}`,
};
}
const {statusCode, body} = await request(
`https://api.elevenlabs.io/v1/convai/conversation/get_signed_url?agent_id=${this.agent_id}`, {
method: 'GET',
headers: {
'xi-api-key': this.api_key
},
}
);
const data = await body.json();
if (statusCode !== 200 || !data?.signed_url) {
this.logger.error({statusCode, data}, 'Elevenlabs Error registering call');
throw new Error(`Elevenlabs Error registering call: ${data.message}`);
}
const url = new URL(data.signed_url);
return {
host: url.hostname,
path: url.pathname + url.search,
};
}
async _api(ep, args) {
const res = await ep.api('uuid_elevenlabs_s2s', `^^|${args.join('|')}`);
if (!res.body?.startsWith('+OK')) {
throw new Error({args}, `Error calling uuid_elevenlabs_s2s: ${res.body}`);
}
}
async exec(cs, {ep}) {
await super.exec(cs);
await this._startListening(cs, ep);
await this.awaitTaskDone();
/* note: the parent llm verb started the span, which is why this is necessary */
await this.parent.performAction(this.results);
this._unregisterHandlers();
}
async kill(cs) {
super.kill(cs);
this._api(cs.ep, [cs.ep.uuid, SessionDelete])
.catch((err) => this.logger.info({err}, 'TaskLlmElevenlabs_S2S:kill - error deleting session'));
this.notifyTaskDone();
}
/**
* Send function call output to the Elevenlabs server in the form of conversation.item.create
* per https://elevenlabs.io/docs/conversational-ai/api-reference/conversational-ai/websocket
*/
async processToolOutput(ep, tool_call_id, rawData) {
try {
const {data} = rawData;
this.logger.debug({tool_call_id, data}, 'TaskLlmElevenlabs_S2S:processToolOutput');
if (!data.type || data.type !== 'client_tool_result') {
this.logger.info({data},
'TaskLlmElevenlabs_S2S:processToolOutput - invalid tool output, must be client_tool_result');
}
else {
await this._api(ep, [ep.uuid, ClientEvent, JSON.stringify(data)]);
}
} catch (err) {
this.logger.info({err}, 'TaskLlmElevenlabs_S2S:processToolOutput');
}
}
/**
* Send a session.update to the Elevenlabs server
* Note: creating and deleting conversation items also supported as well as interrupting the assistant
*/
async processLlmUpdate(ep, data, _callSid) {
this.logger.debug({data, _callSid}, 'TaskLlmElevenlabs_S2S:processLlmUpdate, ignored');
}
async _startListening(cs, ep) {
this._registerHandlers(ep);
try {
const {host, path} = await this.getSignedUrl();
const args = [ep.uuid, 'session.create', this.input_sample_rate, this.output_sample_rate, host, path,
...(this.replace_read ? ['replace_read', this.audio_in_gain, this.audio_injection_gain] : [])];
await this._api(ep, args);
} catch (err) {
this.logger.error({err}, 'TaskLlmElevenlabs_S2S:_startListening');
this.notifyTaskDone();
}
}
async _sendClientEvent(ep, obj) {
let ok = true;
this.logger.debug({obj}, 'TaskLlmElevenlabs_S2S:_sendClientEvent');
try {
const args = [ep.uuid, ClientEvent, JSON.stringify(obj)];
await this._api(ep, args);
} catch (err) {
ok = false;
this.logger.error({err}, 'TaskLlmElevenlabs_S2S:_sendClientEvent - Error');
}
return ok;
}
async _sendInitialMessage(ep) {
if (this.conversation_initiation_client_data) {
if (!await this._sendClientEvent(ep, {
type: 'conversation_initiation_client_data',
conversation_initiation_client_data: this.conversation_initiation_client_data
})) {
this.notifyTaskDone();
}
}
}
_registerHandlers(ep) {
this.addCustomEventListener(ep, LlmEvents_Elevenlabs.Connect, this._onConnect.bind(this, ep));
this.addCustomEventListener(ep, LlmEvents_Elevenlabs.ConnectFailure, this._onConnectFailure.bind(this, ep));
this.addCustomEventListener(ep, LlmEvents_Elevenlabs.Disconnect, this._onDisconnect.bind(this, ep));
this.addCustomEventListener(ep, LlmEvents_Elevenlabs.ServerEvent, this._onServerEvent.bind(this, ep));
}
_unregisterHandlers() {
this.removeCustomEventListeners();
}
_onError(ep, evt) {
this.logger.info({evt}, 'TaskLlmElevenlabs_S2S:_onError');
this.notifyTaskDone();
}
_onConnect(ep) {
this.logger.debug('TaskLlmElevenlabs_S2S:_onConnect');
this._sendInitialMessage(ep);
}
_onConnectFailure(_ep, evt) {
this.logger.info(evt, 'TaskLlmElevenlabs_S2S:_onConnectFailure');
this.results = {completionReason: 'connection failure'};
this.notifyTaskDone();
}
_onDisconnect(_ep, evt) {
this.logger.info(evt, 'TaskLlmElevenlabs_S2S:_onConnectFailure');
this.results = {completionReason: 'disconnect from remote end'};
this.notifyTaskDone();
}
async _onServerEvent(ep, evt) {
let endConversation = false;
const type = evt.type;
this.logger.info({evt}, 'TaskLlmElevenlabs_S2S:_onServerEvent');
if (type === 'error') {
endConversation = true;
this.results = {
completionReason: 'server error',
error: evt.error
};
}
/* tool calls */
else if (type === 'client_tool_call') {
this.logger.debug({evt}, 'TaskLlmElevenlabs_S2S:_onServerEvent - function_call');
if (!this.toolHook) {
this.logger.warn({evt}, 'TaskLlmElevenlabs_S2S:_onServerEvent - no toolHook defined!');
}
else {
const {client_tool_call} = evt;
const {tool_name: name, tool_call_id: call_id, parameters: args} = client_tool_call;
try {
await this.parent.sendToolHook(call_id, {name, args});
} catch (err) {
this.logger.info({err, evt}, 'TaskLlmElevenlabs_S2S - error calling function');
this.results = {
completionReason: 'client error calling function',
error: err
};
endConversation = true;
}
}
}
/* check whether we should notify on this event */
if (this.includeEvents.length > 0 ? this.includeEvents.includes(type) : !this.excludeEvents.includes(type)) {
this.parent.sendEventHook(evt)
.catch((err) => this.logger.info({err},
'TaskLlmElevenlabs_S2S:_onServerEvent - error sending event hook'));
}
if (endConversation) {
this.logger.info({results: this.results},
'TaskLlmElevenlabs_S2S:_onServerEvent - ending conversation due to error');
this.notifyTaskDone();
}
}
_populateEvents(events) {
if (events.includes('all')) {
/* work by excluding specific events */
const exclude = events
.filter((evt) => evt.startsWith('-'))
.map((evt) => evt.slice(1));
if (exclude.length === 0) this.includeEvents = elevenlabs_server_events;
else this.excludeEvents = expandWildcards(exclude);
}
else {
/* work by including specific events */
const include = events
.filter((evt) => !evt.startsWith('-'));
this.includeEvents = expandWildcards(include);
}
this.logger.debug({
includeEvents: this.includeEvents,
excludeEvents: this.excludeEvents
}, 'TaskLlmElevenlabs_S2S:_populateEvents');
}
}
module.exports = TaskLlmElevenlabs_S2S;

View File

@@ -1,357 +0,0 @@
const Task = require('../../task');
const TaskName = 'Llm_OpenAI_s2s';
const {LlmEvents_OpenAI} = require('../../../utils/constants');
const ClientEvent = 'client.event';
const SessionDelete = 'session.delete';
const openai_server_events = [
'error',
'session.created',
'session.updated',
'conversation.created',
'input_audio_buffer.committed',
'input_audio_buffer.cleared',
'input_audio_buffer.speech_started',
'input_audio_buffer.speech_stopped',
'conversation.item.created',
'conversation.item.input_audio_transcription.completed',
'conversation.item.input_audio_transcription.failed',
'conversation.item.truncated',
'conversation.item.deleted',
'response.created',
'response.done',
'response.output_item.added',
'response.output_item.done',
'response.content_part.added',
'response.content_part.done',
'response.text.delta',
'response.text.done',
'response.audio_transcript.delta',
'response.audio_transcript.done',
'response.audio.delta',
'response.audio.done',
'response.function_call_arguments.delta',
'response.function_call_arguments.done',
'rate_limits.updated',
'output_audio.playback_started',
'output_audio.playback_stopped',
];
const expandWildcards = (events) => {
const expandedEvents = [];
events.forEach((evt) => {
if (evt.endsWith('.*')) {
const prefix = evt.slice(0, -2); // Remove the wildcard ".*"
const matchingEvents = openai_server_events.filter((e) => e.startsWith(prefix));
expandedEvents.push(...matchingEvents);
} else {
expandedEvents.push(evt);
}
});
return expandedEvents;
};
class TaskLlmOpenAI_S2S extends Task {
constructor(logger, opts, parentTask) {
super(logger, opts, parentTask);
this.parent = parentTask;
this.vendor = this.parent.vendor;
this.model = this.parent.model || 'gpt-4o-realtime-preview-2024-12-17';
this.auth = this.parent.auth;
this.connectionOptions = this.parent.connectOptions;
const {apiKey} = this.auth || {};
if (!apiKey) throw new Error('auth.apiKey is required for OpenAI S2S');
if (['openai', 'microsoft'].indexOf(this.vendor) === -1) {
throw new Error(`Invalid vendor ${this.vendor} for OpenAI S2S`);
}
if ('microsoft' === this.vendor && !this.connectionOptions?.host) {
throw new Error('connectionOptions.host is required for Microsoft OpenAI S2S');
}
this.apiKey = apiKey;
this.authType = 'microsoft' === this.vendor ? 'query' : 'bearer';
this.actionHook = this.data.actionHook;
this.eventHook = this.data.eventHook;
this.toolHook = this.data.toolHook;
const {response_create, session_update} = this.data.llmOptions;
if (typeof response_create !== 'object') {
throw new Error('llmOptions with an initial response.create is required for OpenAI S2S');
}
this.response_create = response_create;
this.session_update = session_update;
this.results = {
completionReason: 'normal conversation end'
};
/**
* only one of these will have items,
* if includeEvents, then these are the events to include
* if excludeEvents, then these are the events to exclude
*/
this.includeEvents = [];
this.excludeEvents = [];
/* default to all events if user did not specify */
this._populateEvents(this.data.events || openai_server_events);
this.addCustomEventListener = parentTask.addCustomEventListener.bind(parentTask);
this.removeCustomEventListeners = parentTask.removeCustomEventListeners.bind(parentTask);
}
get name() { return TaskName; }
get host() {
const {host} = this.connectionOptions || {};
return host || (this.vendor === 'openai' ? 'api.openai.com' : void 0);
}
get path() {
const {path} = this.connectionOptions || {};
if (path) return path;
switch (this.vendor) {
case 'openai':
return `v1/realtime?model=${this.model}`;
case 'microsoft':
return `openai/realtime?api-version=2024-10-01-preview&deployment=${this.model}`;
}
}
async _api(ep, args) {
const res = await ep.api('uuid_openai_s2s', `^^|${args.join('|')}`);
if (!res.body?.startsWith('+OK')) {
throw new Error({args}, `Error calling uuid_openai_s2s: ${res.body}`);
}
}
async exec(cs, {ep}) {
await super.exec(cs);
await this._startListening(cs, ep);
await this.awaitTaskDone();
/* note: the parent llm verb started the span, which is why this is necessary */
await this.parent.performAction(this.results);
this._unregisterHandlers();
}
async kill(cs) {
super.kill(cs);
this._api(cs.ep, [cs.ep.uuid, SessionDelete])
.catch((err) => this.logger.info({err}, 'TaskLlmOpenAI_S2S:kill - error deleting session'));
this.notifyTaskDone();
}
/**
* Send function call output to the OpenAI server in the form of conversation.item.create
* per https://platform.openai.com/docs/guides/realtime/function-calls
*/
async processToolOutput(ep, tool_call_id, data) {
try {
this.logger.debug({tool_call_id, data}, 'TaskLlmOpenAI_S2S:processToolOutput');
if (!data.type || data.type !== 'conversation.item.create') {
this.logger.info({data},
'TaskLlmOpenAI_S2S:processToolOutput - invalid tool output, must be conversation.item.create');
}
else {
await this._api(ep, [ep.uuid, ClientEvent, JSON.stringify(data)]);
// spec also recommends to send immediate response.create
await this._api(ep, [ep.uuid, ClientEvent, JSON.stringify({type: 'response.create'})]);
}
} catch (err) {
this.logger.info({err}, 'TaskLlmOpenAI_S2S:processToolOutput');
}
}
/**
* Send a session.update to the OpenAI server
* Note: creating and deleting conversation items also supported as well as interrupting the assistant
*/
async processLlmUpdate(ep, data, _callSid) {
try {
this.logger.debug({data, _callSid}, 'TaskLlmOpenAI_S2S:processLlmUpdate');
if (!data.type || ![
'session.update',
'conversation.item.create',
'conversation.item.delete',
'response.cancel'
].includes(data.type)) {
this.logger.info({data}, 'TaskLlmOpenAI_S2S:processLlmUpdate - invalid mid-call request');
}
else {
await this._api(ep, [ep.uuid, ClientEvent, JSON.stringify(data)]);
}
} catch (err) {
this.logger.info({err}, 'TaskLlmOpenAI_S2S:processLlmUpdate');
}
}
async _startListening(cs, ep) {
this._registerHandlers(ep);
try {
const args = [ep.uuid, 'session.create', this.host, this.path, this.authType, this.apiKey];
await this._api(ep, args);
} catch (err) {
this.logger.error({err}, 'TaskLlmOpenAI_S2S:_startListening');
this.notifyTaskDone();
}
}
async _sendClientEvent(ep, obj) {
let ok = true;
this.logger.debug({obj}, 'TaskLlmOpenAI_S2S:_sendClientEvent');
try {
const args = [ep.uuid, ClientEvent, JSON.stringify(obj)];
await this._api(ep, args);
} catch (err) {
ok = false;
this.logger.error({err}, 'TaskLlmOpenAI_S2S:_sendClientEvent - Error');
}
return ok;
}
async _sendInitialMessage(ep) {
let obj = {type: 'response.create', response: this.response_create};
if (!await this._sendClientEvent(ep, obj)) {
this.notifyTaskDone();
}
/* send immediate session.update if present */
else if (this.session_update) {
obj = {type: 'session.update', session: this.session_update};
this.logger.debug({obj}, 'TaskLlmOpenAI_S2S:_sendInitialMessage - sending session.update');
if (!await this._sendClientEvent(ep, obj)) {
this.notifyTaskDone();
}
}
}
_registerHandlers(ep) {
this.addCustomEventListener(ep, LlmEvents_OpenAI.Connect, this._onConnect.bind(this, ep));
this.addCustomEventListener(ep, LlmEvents_OpenAI.ConnectFailure, this._onConnectFailure.bind(this, ep));
this.addCustomEventListener(ep, LlmEvents_OpenAI.Disconnect, this._onDisconnect.bind(this, ep));
this.addCustomEventListener(ep, LlmEvents_OpenAI.ServerEvent, this._onServerEvent.bind(this, ep));
}
_unregisterHandlers() {
this.removeCustomEventListeners();
}
_onError(ep, evt) {
this.logger.info({evt}, 'TaskLlmOpenAI_S2S:_onError');
this.notifyTaskDone();
}
_onConnect(ep) {
this.logger.debug('TaskLlmOpenAI_S2S:_onConnect');
this._sendInitialMessage(ep);
}
_onConnectFailure(_ep, evt) {
this.logger.info(evt, 'TaskLlmOpenAI_S2S:_onConnectFailure');
this.results = {completionReason: 'connection failure'};
this.notifyTaskDone();
}
_onDisconnect(_ep, evt) {
this.logger.info(evt, 'TaskLlmOpenAI_S2S:_onConnectFailure');
this.results = {completionReason: 'disconnect from remote end'};
this.notifyTaskDone();
}
async _onServerEvent(ep, evt) {
let endConversation = false;
const type = evt.type;
this.logger.info({evt}, 'TaskLlmOpenAI_S2S:_onServerEvent');
/* check for failures, such as rate limit exceeded, that should terminate the conversation */
if (type === 'response.done' && evt.response.status === 'failed') {
endConversation = true;
this.results = {
completionReason: 'server failure',
error: evt.response.status_details?.error
};
}
/* server errors of some sort */
else if (type === 'error') {
endConversation = true;
this.results = {
completionReason: 'server error',
error: evt.error
};
}
/* tool calls */
else if (type === 'response.output_item.done' && evt.item?.type === 'function_call') {
this.logger.debug({evt}, 'TaskLlmOpenAI_S2S:_onServerEvent - function_call');
if (!this.toolHook) {
this.logger.warn({evt}, 'TaskLlmOpenAI_S2S:_onServerEvent - no toolHook defined!');
}
else {
const {name, call_id} = evt.item;
const args = JSON.parse(evt.item.arguments);
try {
await this.parent.sendToolHook(call_id, {name, args});
} catch (err) {
this.logger.info({err, evt}, 'TaskLlmOpenAI - error calling function');
this.results = {
completionReason: 'client error calling function',
error: err
};
endConversation = true;
}
}
}
/* check whether we should notify on this event */
if (this.includeEvents.length > 0 ? this.includeEvents.includes(type) : !this.excludeEvents.includes(type)) {
this.parent.sendEventHook(evt)
.catch((err) => this.logger.info({err}, 'TaskLlmOpenAI_S2S:_onServerEvent - error sending event hook'));
}
if (endConversation) {
this.logger.info({results: this.results}, 'TaskLlmOpenAI_S2S:_onServerEvent - ending conversation due to error');
this.notifyTaskDone();
}
}
_populateEvents(events) {
if (events.includes('all')) {
/* work by excluding specific events */
const exclude = events
.filter((evt) => evt.startsWith('-'))
.map((evt) => evt.slice(1));
if (exclude.length === 0) this.includeEvents = openai_server_events;
else this.excludeEvents = expandWildcards(exclude);
}
else {
/* work by including specific events */
const include = events
.filter((evt) => !evt.startsWith('-'));
this.includeEvents = expandWildcards(include);
}
this.logger.debug({
includeEvents: this.includeEvents,
excludeEvents: this.excludeEvents
}, 'TaskLlmOpenAI_S2S:_populateEvents');
}
}
module.exports = TaskLlmOpenAI_S2S;

View File

@@ -1,245 +0,0 @@
const Task = require('../../task');
const TaskName = 'Llm_Ultravox_s2s';
const {request} = require('undici');
const {LlmEvents_Ultravox} = require('../../../utils/constants');
const ultravox_server_events = [
'pong',
'state',
'transcript',
'conversationText',
'clientToolInvocation',
'playbackClearBuffer',
];
const ClientEvent = 'client.event';
const expandWildcards = (events) => {
// no-op for deepgram
return events;
};
const SessionDelete = 'session.delete';
class TaskLlmUltravox_S2S extends Task {
constructor(logger, opts, parentTask) {
super(logger, opts, parentTask);
this.parent = parentTask;
this.vendor = this.parent.vendor;
this.model = this.parent.model || 'fixie-ai/ultravox';
this.auth = this.parent.auth;
this.connectionOptions = this.parent.connectOptions;
const {apiKey} = this.auth || {};
if (!apiKey) throw new Error('auth.apiKey is required for Vendor: Ultravox');
this.apiKey = apiKey;
this.actionHook = this.data.actionHook;
this.eventHook = this.data.eventHook;
this.toolHook = this.data.toolHook;
/**
* only one of these will have items,
* if includeEvents, then these are the events to include
* if excludeEvents, then these are the events to exclude
*/
this.includeEvents = [];
this.excludeEvents = [];
/* default to all events if user did not specify */
this._populateEvents(this.data.events || ultravox_server_events);
this.addCustomEventListener = parentTask.addCustomEventListener.bind(parentTask);
this.removeCustomEventListeners = parentTask.removeCustomEventListeners.bind(parentTask);
}
get name() { return TaskName; }
async _api(ep, args) {
const res = await ep.api('uuid_ultravox_s2s', `^^|${args.join('|')}`);
if (!res.body?.startsWith('+OK')) {
throw new Error(`Error calling uuid_ultravox_s2s: ${JSON.stringify(res.body)}`);
}
}
async createCall() {
const payload = {
...this.data.llmOptions,
model: this.model,
medium: {
...(this.data.llmOptions.medium || {}),
serverWebSocket: {
inputSampleRate: 8000,
outputSampleRate: 8000,
}
}
};
const {statusCode, body} = await request('https://api.ultravox.ai/api/calls', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-API-Key': this.apiKey
},
body: JSON.stringify(payload)
});
const data = await body.json();
if (statusCode !== 201 || !data?.joinUrl) {
this.logger.error({statusCode, data}, 'Ultravox Error registering call');
throw new Error(`Ultravox Error registering call: ${data.message}`);
}
this.logger.info({joinUrl: data.joinUrl}, 'Ultravox Call registered');
return data.joinUrl;
}
_unregisterHandlers() {
this.removeCustomEventListeners();
}
_registerHandlers(ep) {
this.addCustomEventListener(ep, LlmEvents_Ultravox.Connect, this._onConnect.bind(this, ep));
this.addCustomEventListener(ep, LlmEvents_Ultravox.ConnectFailure, this._onConnectFailure.bind(this, ep));
this.addCustomEventListener(ep, LlmEvents_Ultravox.Disconnect, this._onDisconnect.bind(this, ep));
this.addCustomEventListener(ep, LlmEvents_Ultravox.ServerEvent, this._onServerEvent.bind(this, ep));
}
async _startListening(cs, ep) {
this._registerHandlers(ep);
const joinUrl = await this.createCall();
// split the joinUrl into host and path
const {host, pathname, search} = new URL(joinUrl);
try {
const args = [ep.uuid, 'session.create', host, pathname + search];
await this._api(ep, args);
} catch (err) {
this.logger.error({err}, 'TaskLlmUltraVox_S2S:_startListening');
this.notifyTaskDone();
}
}
async exec(cs, {ep}) {
await super.exec(cs);
await this._startListening(cs, ep);
await this.awaitTaskDone();
/* note: the parent llm verb started the span, which is why this is necessary */
await this.parent.performAction(this.results);
this._unregisterHandlers();
}
async kill(cs) {
super.kill(cs);
this._api(cs.ep, [cs.ep.uuid, SessionDelete])
.catch((err) => this.logger.info({err}, 'TaskLlmUltravox_S2S:kill - error deleting session'));
this.notifyTaskDone();
}
_onConnect(ep) {
this.logger.debug('TaskLlmUltravox_S2S:_onConnect');
}
_onConnectFailure(_ep, evt) {
this.logger.info(evt, 'TaskLlmUltravox_S2S:_onConnectFailure');
this.results = {completionReason: 'connection failure'};
this.notifyTaskDone();
}
_onDisconnect(_ep, evt) {
this.logger.info(evt, 'TaskLlmUltravox_S2S:_onConnectFailure');
this.results = {completionReason: 'disconnect from remote end'};
this.notifyTaskDone();
}
async _onServerEvent(_ep, evt) {
let endConversation = false;
const type = evt.type;
this.logger.info({evt}, 'TaskLlmUltravox_S2S:_onServerEvent');
/* server errors of some sort */
if (type === 'error') {
endConversation = true;
this.results = {
completionReason: 'server error',
error: evt.error
};
}
/* tool calls */
else if (type === 'client_tool_invocation') {
this.logger.debug({evt}, 'TaskLlmUltravox_S2S:_onServerEvent - function_call');
if (!this.toolHook) {
this.logger.warn({evt}, 'TaskLlmUltravox_S2S:_onServerEvent - no toolHook defined!');
}
else {
const {toolName: name, invocationId: call_id, parameters: args} = evt;
try {
await this.parent.sendToolHook(call_id, {name, args});
} catch (err) {
this.logger.info({err, evt}, 'TaskLlmUltravox_S2S - error calling function');
this.results = {
completionReason: 'client error calling function',
error: err
};
endConversation = true;
}
}
}
/* check whether we should notify on this event */
if (this.includeEvents.length > 0 ? this.includeEvents.includes(type) : !this.excludeEvents.includes(type)) {
this.parent.sendEventHook(evt)
.catch((err) => this.logger.info({err}, 'TaskLlmUltravox_S2S:_onServerEvent - error sending event hook'));
}
if (endConversation) {
this.logger.info({results: this.results},
'TaskLlmUltravox_S2S:_onServerEvent - ending conversation due to error');
this.notifyTaskDone();
}
}
async processToolOutput(ep, tool_call_id, data) {
try {
this.logger.debug({tool_call_id, data}, 'TaskLlmUltravox_S2S:processToolOutput');
if (!data.type || data.type !== 'client_tool_result') {
this.logger.info({data},
'TaskLlmUltravox_S2S:processToolOutput - invalid tool output, must be client_tool_result');
}
else {
await this._api(ep, [ep.uuid, ClientEvent, JSON.stringify(data)]);
}
} catch (err) {
this.logger.info({err}, 'TaskLlmUltravox_S2S:processToolOutput');
}
}
_populateEvents(events) {
if (events.includes('all')) {
/* work by excluding specific events */
const exclude = events
.filter((evt) => evt.startsWith('-'))
.map((evt) => evt.slice(1));
if (exclude.length === 0) this.includeEvents = ultravox_server_events;
else this.excludeEvents = expandWildcards(exclude);
}
else {
/* work by including specific events */
const include = events
.filter((evt) => !evt.startsWith('-'));
this.includeEvents = expandWildcards(include);
}
this.logger.debug({
includeEvents: this.includeEvents,
excludeEvents: this.excludeEvents
}, 'TaskLlmUltravox_S2S:_populateEvents');
}
}
module.exports = TaskLlmUltravox_S2S;

View File

@@ -1,313 +0,0 @@
const Task = require('../../task');
const TaskName = 'Llm_VoiceAgent_s2s';
const {LlmEvents_VoiceAgent} = require('../../../utils/constants');
const ClientEvent = 'client.event';
const SessionDelete = 'session.delete';
const va_server_events = [
'Error',
'Welcome',
'SettingsApplied',
'ConversationText',
'UserStartedSpeaking',
'EndOfThought',
'AgentThinking',
'FunctionCallRequest',
'FunctionCalling',
'AgentStartedSpeaking',
'AgentAudioDone',
];
const expandWildcards = (events) => {
// no-op for deepgram
return events;
};
class TaskLlmVoiceAgent_S2S extends Task {
constructor(logger, opts, parentTask) {
super(logger, opts, parentTask);
this.parent = parentTask;
this.vendor = this.parent.vendor;
this.model = this.parent.model;
this.auth = this.parent.auth;
this.connectionOptions = this.parent.connectOptions;
const {apiKey} = this.auth || {};
if (!apiKey) throw new Error('auth.apiKey is required for VoiceAgent S2S');
this.apiKey = apiKey;
this.authType = 'bearer';
this.actionHook = this.data.actionHook;
this.eventHook = this.data.eventHook;
this.toolHook = this.data.toolHook;
const {settingsConfiguration} = this.data.llmOptions;
if (typeof settingsConfiguration !== 'object') {
throw new Error('llmOptions with an initial settingsConfiguration is required for VoiceAgent S2S');
}
// eslint-disable-next-line no-unused-vars
const {audio, ...rest} = settingsConfiguration;
const cfg = this.settingsConfiguration = rest;
if (!cfg.agent) throw new Error('llmOptions.settingsConfiguration.agent is required for VoiceAgent S2S');
if (!cfg.agent.think) {
throw new Error('llmOptions.settingsConfiguration.agent.think is required for VoiceAgent S2S');
}
if (!cfg.agent.think.model) {
throw new Error('llmOptions.settingsConfiguration.agent.think.model is required for VoiceAgent S2S');
}
if (!cfg.agent.think.provider?.type) {
throw new Error('llmOptions.settingsConfiguration.agent.think.provider.type is required for VoiceAgent S2S');
}
this.results = {
completionReason: 'normal conversation end'
};
/**
* only one of these will have items,
* if includeEvents, then these are the events to include
* if excludeEvents, then these are the events to exclude
*/
this.includeEvents = [];
this.excludeEvents = [];
/* default to all events if user did not specify */
this._populateEvents(this.data.events || va_server_events);
this.addCustomEventListener = parentTask.addCustomEventListener.bind(parentTask);
this.removeCustomEventListeners = parentTask.removeCustomEventListeners.bind(parentTask);
}
get name() { return TaskName; }
get host() {
const {host} = this.connectionOptions || {};
return host || 'agent.deepgram.com';
}
get path() {
const {path} = this.connectionOptions || {};
if (path) return path;
return '/agent';
}
async _api(ep, args) {
const res = await ep.api('uuid_voice_agent_s2s', `^^|${args.join('|')}`);
if (!res.body?.startsWith('+OK')) {
throw new Error(`Error calling uuid_voice_agent_s2s: ${JSON.stringify(res.body)}`);
}
}
async exec(cs, {ep}) {
await super.exec(cs);
await this._startListening(cs, ep);
await this.awaitTaskDone();
/* note: the parent llm verb started the span, which is why this is necessary */
await this.parent.performAction(this.results);
this._unregisterHandlers();
}
async kill(cs) {
super.kill(cs);
this._api(cs.ep, [cs.ep.uuid, SessionDelete])
.catch((err) => this.logger.info({err}, 'TaskLlmVoiceAgent_S2S:kill - error deleting session'));
this.notifyTaskDone();
}
/**
* Send function call response to the VoiceAgent server
*/
async processToolOutput(ep, tool_call_id, data) {
try {
const {data:response} = data;
this.logger.debug({tool_call_id, response}, 'TaskLlmVoiceAgent_S2S:processToolOutput');
if (!response.type || response.type !== 'FunctionCallResponse') {
this.logger.info({response},
'TaskLlmVoiceAgent_S2S:processToolOutput - invalid tool output, must be FunctionCallResponse');
}
else {
await this._api(ep, [ep.uuid, ClientEvent, JSON.stringify(response)]);
}
} catch (err) {
this.logger.info({err}, 'TaskLlmVoiceAgent_S2S:processToolOutput');
}
}
/**
* Send a session.update to the VoiceAgent server
* Note: creating and deleting conversation items also supported as well as interrupting the assistant
*/
async processLlmUpdate(ep, data, _callSid) {
try {
this.logger.debug({data, _callSid}, 'TaskLlmVoiceAgent_S2S:processLlmUpdate');
if (!data.type || ![
'UpdateInstructions',
'UpdateSpeak',
'InjectAgentMessage',
].includes(data.type)) {
this.logger.info({data}, 'TaskLlmVoiceAgent_S2S:processLlmUpdate - invalid mid-call request');
}
else {
await this._api(ep, [ep.uuid, ClientEvent, JSON.stringify(data)]);
}
} catch (err) {
this.logger.info({err}, 'TaskLlmVoiceAgent_S2S:processLlmUpdate');
}
}
async _startListening(cs, ep) {
this._registerHandlers(ep);
try {
const args = [ep.uuid, 'session.create', this.host, this.path, this.authType, this.apiKey];
await this._api(ep, args);
} catch (err) {
this.logger.error({err}, `TaskLlmVoiceAgent_S2S:_startListening: ${JSON.stringify(err)}`);
this.notifyTaskDone();
}
}
async _sendClientEvent(ep, obj) {
let ok = true;
this.logger.debug({obj}, 'TaskLlmVoiceAgent_S2S:_sendClientEvent');
try {
const args = [ep.uuid, ClientEvent, JSON.stringify(obj)];
await this._api(ep, args);
} catch (err) {
ok = false;
this.logger.error({err}, 'TaskLlmVoiceAgent_S2S:_sendClientEvent - Error');
}
return ok;
}
async _sendInitialMessage(ep) {
if (!await this._sendClientEvent(ep, this.settingsConfiguration)) {
this.notifyTaskDone();
}
}
_registerHandlers(ep) {
this.addCustomEventListener(ep, LlmEvents_VoiceAgent.Connect, this._onConnect.bind(this, ep));
this.addCustomEventListener(ep, LlmEvents_VoiceAgent.ConnectFailure, this._onConnectFailure.bind(this, ep));
this.addCustomEventListener(ep, LlmEvents_VoiceAgent.Disconnect, this._onDisconnect.bind(this, ep));
this.addCustomEventListener(ep, LlmEvents_VoiceAgent.ServerEvent, this._onServerEvent.bind(this, ep));
}
_unregisterHandlers() {
this.removeCustomEventListeners();
}
_onError(_ep, evt) {
this.logger.info({evt}, 'TaskLlmVoiceAgent_S2S:_onError');
this.notifyTaskDone();
}
_onConnect(ep) {
this.logger.debug('TaskLlmVoiceAgent_S2S:_onConnect');
this._sendInitialMessage(ep);
}
_onConnectFailure(_ep, evt) {
this.logger.info(evt, 'TaskLlmVoiceAgent_S2S:_onConnectFailure');
this.results = {completionReason: 'connection failure'};
this.notifyTaskDone();
}
_onDisconnect(_ep, evt) {
this.logger.info(evt, 'TaskLlmVoiceAgent_S2S:_onConnectFailure');
this.results = {completionReason: 'disconnect from remote end'};
this.notifyTaskDone();
}
async _onServerEvent(_ep, evt) {
let endConversation = false;
const type = evt.type;
this.logger.info({evt}, 'TaskLlmVoiceAgent_S2S:_onServerEvent');
/* check for failures, such as rate limit exceeded, that should terminate the conversation */
if (type === 'response.done' && evt.response.status === 'failed') {
endConversation = true;
this.results = {
completionReason: 'server failure',
error: evt.response.status_details?.error
};
}
/* server errors of some sort */
else if (type === 'error') {
endConversation = true;
this.results = {
completionReason: 'server error',
error: evt.error
};
}
/* tool calls */
else if (type === 'FunctionCallRequest') {
this.logger.debug({evt}, 'TaskLlmVoiceAgent_S2S:_onServerEvent - function_call');
if (!this.toolHook) {
this.logger.warn({evt}, 'TaskLlmVoiceAgent_S2S:_onServerEvent - no toolHook defined!');
}
else {
const {function_name:name, function_call_id:call_id} = evt;
const args = evt.input;
try {
await this.parent.sendToolHook(call_id, {name, args});
} catch (err) {
this.logger.info({err, evt}, 'TaskLlmVoiceAgent - error calling function');
this.results = {
completionReason: 'client error calling function',
error: err
};
endConversation = true;
}
}
}
/* check whether we should notify on this event */
if (this.includeEvents.length > 0 ? this.includeEvents.includes(type) : !this.excludeEvents.includes(type)) {
this.parent.sendEventHook(evt)
.catch((err) => this.logger.info({err}, 'TaskLlmVoiceAgent_S2S:_onServerEvent - error sending event hook'));
}
if (endConversation) {
this.logger.info({results: this.results},
'TaskLlmVoiceAgent_S2S:_onServerEvent - ending conversation due to error');
this.notifyTaskDone();
}
}
_populateEvents(events) {
if (events.includes('all')) {
/* work by excluding specific events */
const exclude = events
.filter((evt) => evt.startsWith('-'))
.map((evt) => evt.slice(1));
if (exclude.length === 0) this.includeEvents = va_server_events;
else this.excludeEvents = expandWildcards(exclude);
}
else {
/* work by including specific events */
const include = events
.filter((evt) => !evt.startsWith('-'));
this.includeEvents = expandWildcards(include);
}
this.logger.debug({
includeEvents: this.includeEvents,
excludeEvents: this.excludeEvents
}, 'TaskLlmVoiceAgent_S2S:_populateEvents');
}
}
module.exports = TaskLlmVoiceAgent_S2S;

View File

@@ -62,9 +62,6 @@ function makeTask(logger, obj, parent) {
case TaskName.Message:
const TaskMessage = require('./message');
return new TaskMessage(logger, data, parent);
case TaskName.Llm:
const TaskLlm = require('./llm');
return new TaskLlm(logger, data, parent);
case TaskName.Rasa:
const TaskRasa = require('./rasa');
return new TaskRasa(logger, data, parent);
@@ -84,7 +81,6 @@ function makeTask(logger, obj, parent) {
const TaskTranscribe = require('./transcribe');
return new TaskTranscribe(logger, data, parent);
case TaskName.Listen:
case TaskName.Stream:
const TaskListen = require('./listen');
return new TaskListen(logger, data, parent);
case TaskName.Redirect:

View File

@@ -1,6 +1,5 @@
const Task = require('./task');
const {TaskName, TaskPreconditions} = require('../utils/constants');
const { PlayFileNotFoundError } = require('../utils/error');
class TaskPlay extends Task {
constructor(logger, opts) {
@@ -67,20 +66,8 @@ class TaskPlay extends Task {
}
}
} catch (err) {
this.logger.info(`TaskPlay:exec - error playing ${this.url}: ${err.message}`);
this.playComplete = true;
if (err.message === 'File Not Found') {
const {writeAlerts, AlertType} = cs.srf.locals;
await this.performAction({status: 'fail', reason: 'playFailed'}, !(this.parentTask || cs.isConfirmCallSession));
this.emit('playDone');
writeAlerts({
account_sid: cs.accountSid,
alert_type: AlertType.PLAY_FILENOTFOUND,
url: this.url,
target_sid: cs.callSid
});
throw new PlayFileNotFoundError(this.url);
}
if (timeout) clearTimeout(timeout);
this.logger.info(err, `TaskPlay:exec - error playing ${this.url}`);
}
this.emit('playDone');
}

View File

@@ -1,6 +1,5 @@
const Task = require('./task');
const {TaskName} = require('../utils/constants');
const WsRequestor = require('../utils/ws-requestor');
/**
* Redirects to a new application
@@ -14,17 +13,6 @@ class TaskRedirect extends Task {
async exec(cs) {
await super.exec(cs);
if (cs.requestor instanceof WsRequestor && cs.application.requestor._isAbsoluteUrl(this.actionHook)) {
this.logger.info(`Task:performAction redirecting to ${this.actionHook}, requires new ws connection`);
try {
this.cs.requestor.close();
const requestor = new WsRequestor(this.logger, cs.accountSid, {url: this.actionHook}, this.webhook_secret) ;
this.cs.application.requestor = requestor;
} catch (err) {
this.logger.info(err, `Task:performAction error redirecting to ${this.actionHook}`);
}
}
await this.performAction();
}
}

View File

@@ -12,7 +12,6 @@ class TaskRestDial extends Task {
this.from = this.data.from;
this.callerName = this.data.callerName;
this.timeLimit = this.data.timeLimit;
this.fromHost = this.data.fromHost;
this.to = this.data.to;
this.call_hook = this.data.call_hook;
@@ -40,9 +39,9 @@ class TaskRestDial extends Task {
if (this.data.amd) {
this.startAmd = cs.startAmd;
this.stopAmd = cs.stopAmd;
this.on('amd', this._onAmdEvent.bind(this, cs));
}
this.stopAmd = cs.stopAmd;
this._setCallTimer();
await this.awaitTaskDone();
@@ -67,9 +66,6 @@ class TaskRestDial extends Task {
const cs = this.callSession;
cs.setDialog(dlg);
cs.referHook = this.referHook;
if (this.timeLimit) {
cs.startMaxCallDurationTimer(this.timeLimit);
}
this.logger.debug('TaskRestDial:_onConnect - call connected');
if (this.sipRequestWithinDialogHook) this._initSipRequestWithinDialogHandler(cs, dlg);
try {
@@ -81,13 +77,11 @@ class TaskRestDial extends Task {
synthesizer: {
vendor: cs.speechSynthesisVendor,
language: cs.speechSynthesisLanguage,
voice: cs.speechSynthesisVoice,
label: cs.speechSynthesisLabel,
voice: cs.speechSynthesisVoice
},
recognizer: {
vendor: cs.speechRecognizerVendor,
language: cs.speechRecognizerLanguage,
label: cs.speechRecognizerLabel,
language: cs.speechRecognizerLanguage
}
}
};

View File

@@ -1,8 +1,6 @@
const assert = require('assert');
const TtsTask = require('./tts-task');
const {TaskName, TaskPreconditions} = require('../utils/constants');
const pollySSMLSplit = require('polly-ssml-split');
const { SpeechCredentialError } = require('../utils/error');
const breakLengthyTextIfNeeded = (logger, text) => {
const chunkSize = 1000;
@@ -36,40 +34,24 @@ class TaskSay extends TtsTask {
super(logger, opts, parentTask);
this.preconditions = TaskPreconditions.Endpoint;
assert.ok((typeof this.data.text === 'string' || Array.isArray(this.data.text)) || this.data.stream === true,
'Say: either text or stream:true is required');
this.text = (Array.isArray(this.data.text) ? this.data.text : [this.data.text])
.map((t) => breakLengthyTextIfNeeded(this.logger, t))
.flat();
if (this.data.stream === true) {
this._isStreamingTts = true;
this.closeOnStreamEmpty = this.data.closeOnStreamEmpty !== false;
}
else {
this._isStreamingTts = false;
this.text = (Array.isArray(this.data.text) ? this.data.text : [this.data.text])
.map((t) => breakLengthyTextIfNeeded(this.logger, t))
.flat();
this.loop = this.data.loop || 1;
this.isHandledByPrimaryProvider = true;
}
this.loop = this.data.loop || 1;
this.isHandledByPrimaryProvider = true;
}
get name() { return TaskName.Say; }
get summary() {
if (this.isStreamingTts) return `${this.name} streaming`;
else {
for (let i = 0; i < this.text.length; i++) {
if (this.text[i].startsWith('silence_stream')) continue;
return `${this.name}{text=${this.text[i].slice(0, 15)}${this.text[i].length > 15 ? '...' : ''}}`;
}
return `${this.name}{${this.text[0]}}`;
for (let i = 0; i < this.text.length; i++) {
if (this.text[i].startsWith('silence_stream')) continue;
return `${this.name}{text=${this.text[i].slice(0, 15)}${this.text[i].length > 15 ? '...' : ''}}`;
}
return `${this.name}{${this.text[0]}}`;
}
get isStreamingTts() { return this._isStreamingTts; }
_validateURL(urlString) {
try {
new URL(urlString);
@@ -79,58 +61,147 @@ class TaskSay extends TtsTask {
}
}
async exec(cs, obj) {
if (this.isStreamingTts && !cs.appIsUsingWebsockets) {
throw new Error('Say: streaming say verb requires applications to use the websocket API');
}
async _synthesizeWithSpecificVendor(cs, ep, {vendor, language, voice, label, preCache = false}) {
const {srf, accountSid:account_sid} = cs;
const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, srf);
const {writeAlerts, AlertType, stats} = srf.locals;
const {synthAudio} = srf.locals.dbHelpers;
const engine = this.synthesizer.engine || cs.synthesizer?.engine || 'neural';
const salt = cs.callSid;
try {
if (this.isStreamingTts) await this.handlingStreaming(cs, obj);
else await this.handling(cs, obj);
this.emit('playDone');
} catch (error) {
if (error instanceof SpeechCredentialError) {
// if say failed due to speech credentials, alarm is writtern and error notification is sent
// finished this say to move to next task.
this.logger.info({error}, 'Say failed due to SpeechCredentialError, finished!');
this.emit('playDone');
return;
let credentials = cs.getSpeechCredentials(vendor, 'tts', label);
/* parse Nuance voices into name and model */
let model;
if (vendor === 'nuance' && voice) {
const arr = /([A-Za-z-]*)\s+-\s+(enhanced|standard)/.exec(voice);
if (arr) {
voice = arr[1];
model = arr[2];
}
throw error;
}
}
async handlingStreaming(cs, {ep}) {
const {vendor, language, voice, label} = this.getTtsVendorData(cs);
const credentials = cs.getSpeechCredentials(vendor, 'tts', label);
if (!credentials) {
throw new SpeechCredentialError(
`No text-to-speech service credentials for ${vendor} with labels: ${label} have been configured`);
} else if (vendor === 'deepgram') {
model = voice;
}
/* allow for microsoft custom region voice and api_key to be specified as an override */
if (vendor === 'microsoft' && this.options.deploymentId) {
credentials = credentials || {};
credentials.use_custom_tts = true;
credentials.custom_tts_endpoint = this.options.deploymentId;
credentials.api_key = this.options.apiKey || credentials.apiKey;
credentials.region = this.options.region || credentials.region;
voice = this.options.voice || voice;
} else if (vendor === 'elevenlabs') {
credentials = credentials || {};
credentials.model_id = this.options.model_id || credentials.model_id;
credentials.voice_settings = this.options.voice_settings || {};
credentials.optimize_streaming_latency = this.options.optimize_streaming_latency
|| credentials.optimize_streaming_latency;
voice = this.options.voice_id || voice;
}
ep.set({
tts_engine: vendor.startsWith('custom:') ? 'custom' : vendor,
tts_voice: voice,
cache_speech_handles: !cs.currentTtsVendor || cs.currentTtsVendor === vendor ? 1 : 0,
}).catch((err) => this.logger.info({err}, 'Error setting tts_engine on endpoint'));
// set the current vendor on the call session
// If vendor is changed from the previous one, then reset the cache_speech_handles flag
cs.currentTtsVendor = vendor;
if (!preCache && !this._disableTracing) this.logger.info({vendor, language, voice, model}, 'TaskSay:exec');
try {
if (!credentials) {
writeAlerts({
account_sid,
alert_type: AlertType.TTS_NOT_PROVISIONED,
vendor
}).catch((err) => this.logger.info({err}, 'Error generating alert for no tts'));
throw new Error('no provisioned speech credentials for TTS');
}
// synthesize all of the text elements
let lastUpdated = false;
await this.setTtsStreamingChannelVars(vendor, language, voice, credentials, ep);
/* produce an audio segment from the provided text */
const generateAudio = async(text) => {
if (this.killed) return;
if (text.startsWith('silence_stream://')) return text;
await cs.startTtsStream();
/* otel: trace time for tts */
if (!preCache && !this._disableTracing) {
const {span} = this.startChildSpan('tts-generation', {
'tts.vendor': vendor,
'tts.language': language,
'tts.voice': voice
});
this.otelSpan = span;
}
try {
const {filePath, servedFromCache, rtt} = await synthAudio(stats, {
account_sid,
text,
vendor,
language,
voice,
engine,
model,
salt,
credentials,
options: this.options,
disableTtsCache : this.disableTtsCache,
preCache
});
if (!filePath.startsWith('say:')) {
this.logger.debug(`Say: file ${filePath}, served from cache ${servedFromCache}`);
if (filePath) cs.trackTmpFile(filePath);
if (this.otelSpan) {
this.otelSpan.setAttributes({'tts.cached': servedFromCache});
this.otelSpan.end();
this.otelSpan = null;
}
if (!servedFromCache && !lastUpdated) {
lastUpdated = true;
updateSpeechCredentialLastUsed(credentials.speech_credential_sid).catch(() => {/* logged error */});
}
if (!servedFromCache && rtt && !preCache && !this._disableTracing) {
this.notifyStatus({
event: 'synthesized-audio',
vendor,
language,
characters: text.length,
elapsedTime: rtt
});
}
}
else {
this.logger.debug('Say: a streaming tts api will be used');
const modifiedPath = filePath.replace('say:{', `say:{session-uuid=${ep.uuid},`);
return modifiedPath;
}
return filePath;
} catch (err) {
this.logger.info({err}, 'Error synthesizing tts');
if (this.otelSpan) this.otelSpan.end();
writeAlerts({
account_sid: cs.accountSid,
alert_type: AlertType.TTS_FAILURE,
vendor,
detail: err.message
}).catch((err) => this.logger.info({err}, 'Error generating alert for tts failure'));
throw err;
}
};
cs.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_open'})
.catch((err) => this.logger.info({err}, 'TaskSay:handlingStreaming - Error sending'));
const arr = this.text.map((t) => (this._validateURL(t) ? t : generateAudio(t)));
return (await Promise.all(arr)).filter((fp) => fp && fp.length);
} catch (err) {
this.logger.info({err}, 'TaskSay:handlingStreaming - Error setting channel vars');
cs.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_closed'})
.catch((err) => this.logger.info({err}, 'TaskSay:handlingStreaming - Error sending'));
//TODO: send tts:streaming-event with error?
this.notifyTaskDone();
this.logger.info(err, 'TaskSay:exec error');
throw err;
}
await this.awaitTaskDone();
this.logger.info('TaskSay:handlingStreaming - done');
}
async handling(cs, {ep}) {
const {srf, accountSid:account_sid, callSid:target_sid} = cs;
async exec(cs, {ep}) {
const {srf, accountSid:account_sid} = cs;
const {writeAlerts, AlertType} = srf.locals;
const {addFileToCache} = srf.locals.dbHelpers;
const engine = this.synthesizer.engine || cs.synthesizer?.engine || 'neural';
@@ -147,7 +218,10 @@ class TaskSay extends TtsTask {
let voice = this.synthesizer.voice && this.synthesizer.voice !== 'default' ?
this.synthesizer.voice :
cs.speechSynthesisVoice;
let label = this.taskIncludeSynthesizer ? this.synthesizer.label : cs.speechSynthesisLabel;
// label can be null/empty in synthesizer config, just use application level label if it's default
let label = this.synthesizer.label === 'default' ?
cs.speechSynthesisLabel :
this.synthesizer.label;
const fallbackVendor = this.synthesizer.fallbackVendor && this.synthesizer.fallbackVendor !== 'default' ?
this.synthesizer.fallbackVendor :
@@ -158,8 +232,10 @@ class TaskSay extends TtsTask {
const fallbackVoice = this.synthesizer.fallbackVoice && this.synthesizer.fallbackVoice !== 'default' ?
this.synthesizer.fallbackVoice :
cs.fallbackSpeechSynthesisVoice;
const fallbackLabel = this.taskIncludeSynthesizer ?
this.synthesizer.fallbackLabel : cs.fallbackSpeechSynthesisLabel;
// label can be null/empty in synthesizer config, just use application level label if it's default
const fallbackLabel = this.synthesizer.fallbackLabel === 'default' ?
cs.fallbackSpeechSynthesisLabel :
this.synthesizer.fallbackLabel;
if (cs.hasFallbackTts) {
vendor = fallbackVendor;
@@ -185,7 +261,7 @@ class TaskSay extends TtsTask {
} else {
this.notifyError(
{ msg: 'TTS error', details:`TTS vendor ${vendor} error: ${error}`, failover: 'not available'});
throw new SpeechCredentialError(error.message);
throw error;
}
};
let filepath;
@@ -204,12 +280,12 @@ class TaskSay extends TtsTask {
await this.playToConfMember(ep, memberId, confName, confUuid, filepath[segment]);
}
else {
const isStreaming = filepath[segment].startsWith('say:{');
if (isStreaming) {
let tts_cache_filename;
if (filepath[segment].startsWith('say:{')) {
const arr = /^say:\{.*\}\s*(.*)$/.exec(filepath[segment]);
if (arr) this.logger.debug(`Say:exec sending streaming tts request: ${arr[1].substring(0, 64)}..`);
}
else this.logger.debug(`Say:exec sending ${filepath[segment].substring(0, 64)}`);
else this.logger.debug(`Say:exec sending ${filepath[segment].substring(0, 64)}`);
ep.once('playback-start', (evt) => {
this.logger.debug({evt}, 'Say got playback-start');
if (this.otelSpan) {
@@ -217,34 +293,40 @@ class TaskSay extends TtsTask {
this.otelSpan.end();
this.otelSpan = null;
if (evt.variable_tts_cache_filename) {
tts_cache_filename = evt.variable_tts_cache_filename;
cs.trackTmpFile(evt.variable_tts_cache_filename);
}
else {
this.logger.info('No tts_cache_filename in playback-start event');
}
}
});
ep.once('playback-stop', (evt) => {
this.logger.debug({evt}, 'Say got playback-stop');
if (evt.variable_tts_error) {
writeAlerts({
account_sid,
alert_type: AlertType.TTS_FAILURE,
vendor,
detail: evt.variable_tts_error,
target_sid
}).catch((err) => this.logger.info({err}, 'Error generating alert for no tts'));
if (!tts_cache_filename || evt.variable_tts_cache_filename !== tts_cache_filename) {
this.logger.info({evt}, 'Say: discarding playback-stop from other say verb');
}
if (evt.variable_tts_cache_filename && !this.killed) {
const text = parseTextFromSayString(this.text[segment]);
addFileToCache(evt.variable_tts_cache_filename, {
account_sid,
vendor,
language,
voice,
engine,
model: this.model || this.model_id,
text
}).catch((err) => this.logger.info({err}, 'Error adding file to cache'));
else {
this.logger.debug({evt}, 'Say got playback-stop');
if (evt.variable_tts_error) {
writeAlerts({
account_sid,
alert_type: AlertType.TTS_FAILURE,
vendor,
detail: evt.variable_tts_error
}).catch((err) => this.logger.info({err}, 'Error generating alert for no tts'));
}
if (evt.variable_tts_cache_filename && !this.killed) {
const text = parseTextFromSayString(this.text[segment]);
addFileToCache(evt.variable_tts_cache_filename, {
account_sid,
vendor,
language,
voice,
engine,
text
}).catch((err) => this.logger.info({err}, 'Error adding file to cache'));
}
}
if (this._playResolve) {
evt.variable_tts_error ? this._playReject(new Error(evt.variable_tts_error)) : this._playResolve();
}
@@ -265,7 +347,6 @@ class TaskSay extends TtsTask {
continue;
} catch (err) {
this.logger.info({err}, 'Error waiting for playback-stop event');
throw err;
}
} finally {
this._playPromise = null;
@@ -283,6 +364,7 @@ class TaskSay extends TtsTask {
segment++;
}
}
this.emit('playDone');
}
async kill(cs) {
@@ -305,7 +387,6 @@ class TaskSay extends TtsTask {
this._playResolve = null;
}
}
this.notifyTaskDone();
}
_addStreamingTtsAttributes(span, evt) {
@@ -316,7 +397,6 @@ class TaskSay extends TtsTask {
.replace('whisper_', 'whisper.')
.replace('deepgram_', 'deepgram.')
.replace('playht_', 'playht.')
.replace('cartesia_', 'cartesia.')
.replace('rimelabs_', 'rimelabs.')
.replace('verbio_', 'verbio.')
.replace('elevenlabs_', 'elevenlabs.');
@@ -327,13 +407,6 @@ class TaskSay extends TtsTask {
delete attrs['cache_filename']; //no value in adding this to the span
span.setAttributes(attrs);
}
notifyTtsStreamIsEmpty() {
if (this.isStreamingTts && this.closeOnStreamEmpty) {
this.logger.info('TaskSay:notifyTtsStreamIsEmpty - stream is empty, killing task');
this.notifyTaskDone();
}
}
}
const spanMapping = {
@@ -370,11 +443,6 @@ const spanMapping = {
'playht.name_lookup_time_ms': 'name_lookup_ms',
'playht.connect_time_ms': 'connect_ms',
'playht.final_response_time_ms': 'final_response_ms',
// Cartesia
'cartesia.request_id': 'cartesia.req_id',
'cartesia.name_lookup_time_ms': 'name_lookup_ms',
'cartesia.connect_time_ms': 'connect_ms',
'cartesia.final_response_time_ms': 'final_response_ms',
// Rimelabs
'rimelabs.name_lookup_time_ms': 'name_lookup_ms',
'rimelabs.connect_time_ms': 'connect_ms',

View File

@@ -12,7 +12,6 @@ class TaskSipRefer extends Task {
this.referTo = this.data.referTo;
this.referredBy = this.data.referredBy;
this.referredByDisplayName = this.data.referredByDisplayName;
this.headers = this.data.headers || {};
this.eventHook = this.data.eventHook;
}
@@ -95,10 +94,7 @@ class TaskSipRefer extends Task {
}
if (status >= 200) {
this.referSpan.setAttributes({'refer.finalNotify': status});
await this.performAction({refer_status: 202, final_referred_call_status: status})
.catch((err) => {
this.logger.error(err, 'TaskSipRefer:exec - error performing action finalNotify');
});
await this.performAction({refer_status: 202, final_referred_call_status: status});
this.notifyTaskDone();
}
}
@@ -106,7 +102,7 @@ class TaskSipRefer extends Task {
}
_normalizeReferHeaders(cs, dlg) {
let {referTo, referredBy, referredByDisplayName} = this;
let {referTo, referredBy} = this;
/* get IP address of the SBC to use as hostname if needed */
const {host} = parseUri(dlg.remote.uri);
@@ -121,12 +117,9 @@ class TaskSipRefer extends Task {
referredBy = cs.req?.callingNumber || dlg.local.uri;
this.logger.info({referredBy}, 'setting referredby');
}
if (!referredByDisplayName) {
referredByDisplayName = cs.req?.callingName;
}
if (!referredBy.startsWith('<') && !referredBy.startsWith('sip') && !referredBy.startsWith('"')) {
/* they may have only provided a phone number/user */
referredBy = `${referredByDisplayName ? `"${referredByDisplayName}"` : ''}<sip:${referredBy}@${host}>`;
referredBy = `sip:${referredBy}@${host}`;
}
return {referTo, referredBy};
}

View File

@@ -2,8 +2,6 @@ const Task = require('./task');
const assert = require('assert');
const crypto = require('crypto');
const { TaskPreconditions, CobaltTranscriptionEvents } = require('../utils/constants');
const { SpeechCredentialError } = require('../utils/error');
const {JAMBONES_AWS_TRANSCRIBE_USE_GRPC} = require('../config');
class SttTask extends Task {
@@ -18,22 +16,14 @@ class SttTask extends Task {
normalizeTranscription,
setSpeechCredentialsAtRuntime,
compileSonioxTranscripts,
consolidateTranscripts,
updateSpeechmaticsPayload
consolidateTranscripts
} = require('../utils/transcription-utils')(logger);
this.setChannelVarsForStt = setChannelVarsForStt;
this.normalizeTranscription = normalizeTranscription;
this.compileSonioxTranscripts = compileSonioxTranscripts;
this.consolidateTranscripts = consolidateTranscripts;
this.updateSpeechmaticsPayload = updateSpeechmaticsPayload;
this.eventHandlers = [];
this.isHandledByPrimaryProvider = true;
/**
* Task use taskIncludeRecognizer to identify
* if taskIncludeRecognizer === true, use label from verb.recognizer, even it's empty
* if taskIncludeRecognizer === false, use label from application.recognizer
*/
this.taskIncludeRecognizer = !!this.data.recognizer;
if (this.data.recognizer) {
const recognizer = this.data.recognizer;
this.vendor = recognizer.vendor;
@@ -43,6 +33,7 @@ class SttTask extends Task {
//fallback
this.fallbackVendor = recognizer.fallbackVendor || 'default';
this.fallbackLanguage = recognizer.fallbackLanguage || 'default';
// label can be empty and should not have default value.
this.fallbackLabel = recognizer.fallbackLabel;
/* let credentials be supplied in the recognizer object at runtime */
@@ -91,7 +82,8 @@ class SttTask extends Task {
this.language = cs.speechRecognizerLanguage;
if (this.data.recognizer) this.data.recognizer.language = this.language;
}
if (!this.taskIncludeRecognizer) {
// label can be empty, should not assign application level label
if ('default' === this.label) {
this.label = cs.speechRecognizerLabel;
if (this.data.recognizer) this.data.recognizer.label = this.label;
}
@@ -104,21 +96,17 @@ class SttTask extends Task {
this.fallbackLanguage = cs.fallbackSpeechRecognizerLanguage;
if (this.data.recognizer) this.data.recognizer.fallbackLanguage = this.fallbackLanguage;
}
if (!this.taskIncludeRecognizer) {
// label can be empty, should not assign application level label
if ('default' === this.fallbackLabel) {
this.fallbackLabel = cs.fallbackSpeechRecognizerLabel;
if (this.data.recognizer) this.data.recognizer.fallbackLabel = this.fallbackLabel;
}
// If call is already fallback to 2nd ASR vendor
// use that.
if (cs.hasFallbackAsr) {
if (this.taskIncludeRecognizer) {
// reset fallback ASR from previous run if this verb contains data.recognizer.
cs.hasFallbackAsr = false;
} else {
this.logger.debug('Call session has fallback to 2nd ASR, use 2nd recognizer configuration');
this.vendor = this.fallbackVendor;
this.language = this.fallbackLanguage;
this.label = this.fallbackLabel;
}
this.vendor = this.fallbackVendor;
this.language = this.fallbackLanguage;
this.label = this.fallbackLabel;
}
if (!this.data.recognizer.vendor) {
this.data.recognizer.vendor = this.vendor;
@@ -190,11 +178,10 @@ class SttTask extends Task {
writeAlerts({
account_sid: cs.accountSid,
alert_type: AlertType.STT_NOT_PROVISIONED,
vendor,
target_sid: cs.callSid
vendor
}).catch((err) => this.logger.info({err}, 'Error generating alert for no stt'));
// the ASR might have fallback configuration, should not done task here.
throw new SpeechCredentialError(`No speech-to-text service credentials for ${vendor} have been configured`);
this.notifyTaskDone();
throw new Error(`No speech-to-text service credentials for ${vendor} have been configured`);
}
if (vendor === 'nuance' && credentials.client_id) {
@@ -218,30 +205,14 @@ class SttTask extends Task {
region,
roleArn
});
this.logger.debug({roleArn}, `(roleArn) got aws access token ${servedFromCache ? 'from cache' : ''}`);
// from role ARN, we will get SessionToken, but feature server use it as securityToken.
credentials = {...credentials, accessKeyId, secretAccessKey, securityToken: sessionToken};
}
else if (vendor === 'verbio' && credentials.client_id && credentials.client_secret) {
this.logger.debug({roleArn}, `got aws access token ${servedFromCache ? 'from cache' : ''}`);
credentials = {...credentials, accessKeyId, secretAccessKey, sessionToken};
} else if (vendor === 'verbio' && credentials.client_id && credentials.client_secret) {
const {access_token, servedFromCache} = await getVerbioAccessToken(credentials);
this.logger.debug({client_id: credentials.client_id},
`got verbio access token ${servedFromCache ? 'from cache' : ''}`);
credentials.access_token = access_token;
}
else if (vendor == 'aws' && !JAMBONES_AWS_TRANSCRIBE_USE_GRPC) {
/* get AWS access token */
const {speech_credential_sid, accessKeyId, secretAccessKey, securityToken, region } = credentials;
if (!securityToken) {
const { servedFromCache, ...newCredentials} = await getAwsAuthToken({
speech_credential_sid,
accessKeyId,
secretAccessKey,
region});
this.logger.debug({newCredentials}, `got aws security token ${servedFromCache ? 'from cache' : ''}`);
credentials = {...newCredentials, region};
}
}
return credentials;
}
@@ -251,12 +222,12 @@ class SttTask extends Task {
async _initFallback() {
assert(this.fallbackVendor, 'fallback failed without fallbackVendor configuration');
this.logger.info(`Failed to use primary STT provider, fallback to ${this.fallbackVendor}`);
this.isHandledByPrimaryProvider = false;
this.cs.hasFallbackAsr = true;
this.vendor = this.cs.fallbackSpeechRecognizerVendor = this.fallbackVendor;
this.language = this.cs.fallbackSpeechRecognizerLanguage = this.fallbackLanguage;
this.label = this.cs.fallbackSpeechRecognizerLabel = this.fallbackLabel;
this.logger.info(`Failed to use primary STT provider, fallback to ${this.fallbackVendor}`);
this.vendor = this.fallbackVendor;
this.language = this.fallbackLanguage;
this.label = this.fallbackLabel;
this.data.recognizer.vendor = this.vendor;
this.data.recognizer.language = this.language;
this.data.recognizer.label = this.label;
@@ -338,7 +309,6 @@ class SttTask extends Task {
message: 'STT failure reported by vendor',
detail: evt.error,
vendor: this.vendor,
target_sid: cs.callSid
}).catch((err) => this.logger.info({err}, `Error generating alert for ${this.vendor} connection failure`));
}
@@ -351,7 +321,6 @@ class SttTask extends Task {
alert_type: AlertType.STT_FAILURE,
message: `Failed connecting to ${this.vendor} speech recognizer: ${reason}`,
vendor: this.vendor,
target_sid: cs.callSid
}).catch((err) => this.logger.info({err}, `Error generating alert for ${this.vendor} connection failure`));
}
}

View File

@@ -12,14 +12,10 @@ const {
NvidiaTranscriptionEvents,
JambonzTranscriptionEvents,
TranscribeStatus,
AssemblyAiTranscriptionEvents,
VoxistTranscriptionEvents,
VerbioTranscriptionEvents,
SpeechmaticsTranscriptionEvents
AssemblyAiTranscriptionEvents
} = require('../utils/constants.json');
const { normalizeJambones } = require('@jambonz/verb-specifications');
const SttTask = require('./stt-task');
const { SpeechCredentialError } = require('../utils/error');
const STT_LISTEN_SPAN_NAME = 'stt-listen';
@@ -28,7 +24,6 @@ class TaskTranscribe extends SttTask {
super(logger, opts, parentTask);
this.transcriptionHook = this.data.transcriptionHook;
this.translationHook = this.data.translationHook;
this.earlyMedia = this.data.earlyMedia === true || (parentTask && parentTask.earlyMedia);
if (this.data.recognizer) {
@@ -78,20 +73,7 @@ class TaskTranscribe extends SttTask {
return this.channel === 2 || this.separateRecognitionPerChannel && this.ep2;
}
async exec(cs, obj) {
try {
await this.handling(cs, obj);
} catch (error) {
if (error instanceof SpeechCredentialError) {
this.logger.info('Transcribe failed due to SpeechCredentialError, finished!');
this.notifyTaskDone();
return;
}
throw error;
}
}
async handling(cs, {ep, ep2}) {
async exec(cs, {ep, ep2}) {
await super.exec(cs, {ep, ep2});
if (this.data.recognizer.vendor === 'nuance') {
@@ -102,6 +84,7 @@ class TaskTranscribe extends SttTask {
...this.data.recognizer.nuanceOptions
};
}
const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, cs.srf);
if (cs.hasGlobalSttHints) {
const {hints, hintsBoost} = cs.globalSttHints;
@@ -118,6 +101,9 @@ class TaskTranscribe extends SttTask {
if (this.transcribing2) {
await this._startTranscribing(cs, ep2, 2);
}
updateSpeechCredentialLastUsed(this.sttCredentials.speech_credential_sid)
.catch(() => {/*already logged error */});
} catch (err) {
if (!(await this._startFallback(cs, ep, {error: err}))) {
this.logger.info(err, 'TaskTranscribe:exec - error');
@@ -237,13 +223,6 @@ class TaskTranscribe extends SttTask {
this.addCustomEventListener(ep, SonioxTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
break;
case 'verbio':
this.bugname = `${this.bugname_prefix}verbio_transcribe`;
this.addCustomEventListener(
ep, VerbioTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
break;
case 'cobalt':
this.bugname = `${this.bugname_prefix}cobalt_transcribe`;
this.addCustomEventListener(ep, CobaltTranscriptionEvents.Transcription,
@@ -301,35 +280,6 @@ class TaskTranscribe extends SttTask {
this._onVendorConnectFailure.bind(this, cs, ep, channel));
break;
case 'voxist':
this.bugname = `${this.bugname_prefix}voxist_transcribe`;
this.addCustomEventListener(ep, VoxistTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel));
this.addCustomEventListener(ep,
VoxistTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
this.addCustomEventListener(ep, VoxistTranscriptionEvents.Error, this._onVendorError.bind(this, cs, ep));
this.addCustomEventListener(ep, VoxistTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep, channel));
break;
case 'speechmatics':
this.bugname = `${this.bugname_prefix}speechmatics_transcribe`;
this.addCustomEventListener(
ep, SpeechmaticsTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep, channel));
this.addCustomEventListener(
ep, SpeechmaticsTranscriptionEvents.Translation, this._onTranslation.bind(this, cs, ep, channel));
this.addCustomEventListener(ep, SpeechmaticsTranscriptionEvents.Info,
this._onSpeechmaticsInfo.bind(this, cs, ep));
this.addCustomEventListener(ep, SpeechmaticsTranscriptionEvents.RecognitionStarted,
this._onSpeechmaticsRecognitionStarted.bind(this, cs, ep));
this.addCustomEventListener(ep, SpeechmaticsTranscriptionEvents.Connect,
this._onVendorConnect.bind(this, cs, ep));
this.addCustomEventListener(ep, SpeechmaticsTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep));
this.addCustomEventListener(ep, SpeechmaticsTranscriptionEvents.Error,
this._onSpeechmaticsError.bind(this, cs, ep));
break;
default:
if (this.vendor.startsWith('custom:')) {
this.bugname = `${this.bugname_prefix}${this.vendor}_transcribe`;
@@ -456,7 +406,7 @@ class TaskTranscribe extends SttTask {
this._startAsrTimer(channel);
/* some STT engines will keep listening after a final response, so no need to restart */
if (!['soniox', 'aws', 'microsoft', 'deepgram', 'google', 'speechmatics']
if (!['soniox', 'aws', 'microsoft', 'deepgram', 'google']
.includes(this.vendor)) this._startTranscribing(cs, ep, channel);
}
else {
@@ -481,7 +431,7 @@ class TaskTranscribe extends SttTask {
this._resolve(channel, evt);
/* some STT engines will keep listening after a final response, so no need to restart */
if (!['soniox', 'aws', 'microsoft', 'deepgram', 'google', 'speechmatics'].includes(this.vendor) &&
if (!['soniox', 'aws', 'microsoft', 'deepgram', 'google'].includes(this.vendor) &&
!this.vendor.startsWith('custom:')) {
this.logger.debug('TaskTranscribe:_onTranscription - restarting transcribe');
this._startTranscribing(cs, ep, channel);
@@ -507,54 +457,12 @@ class TaskTranscribe extends SttTask {
}
}
async _onTranslation(_cs, _ep, channel, evt, _fsEvent) {
this.logger.debug({evt}, 'TaskTranscribe:_onTranslation');
if (this.translationHook && evt.results?.length > 0) {
try {
const b3 = this.getTracingPropagation();
const httpHeaders = b3 && {b3};
const payload = {
...this.cs.callInfo,
...httpHeaders,
translation: {
channel,
language: evt.language,
translation: evt.results[0].content
}
};
this.logger.debug({payload}, 'sending translationHook');
const json = await this.cs.requestor.request('verb:hook', this.translationHook, payload);
this.logger.info({json}, 'completed translationHook');
if (json && Array.isArray(json) && !this.parentTask) {
const makeTask = require('./make_task');
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
if (tasks && tasks.length > 0) {
this.logger.info({tasks: tasks}, `${this.name} replacing application with ${tasks.length} tasks`);
this.cs.replaceApplication(tasks);
}
}
} catch (err) {
this.logger.info(err, 'TranscribeTask:_onTranslation error');
}
if (this.parentTask) {
this.parentTask.emit('translation', evt);
}
}
if (this.killed) {
this.logger.debug('TaskTranscribe:_onTranslation exiting after receiving final transcription');
this._clearTimer();
this.notifyTaskDone();
}
}
async _resolve(channel, evt) {
if (evt.is_final) {
/* we've got a final transcript, so end the otel child span for this channel */
if (this.childSpan[channel - 1] && this.childSpan[channel - 1].span) {
this.childSpan[channel - 1].span.setAttributes({
channel,
'stt.label': this.label || 'None',
'stt.resolve': 'transcript',
'stt.result': JSON.stringify(evt)
});
@@ -608,8 +516,7 @@ class TaskTranscribe extends SttTask {
if (this.childSpan[channel - 1] && this.childSpan[channel - 1].span) {
this.childSpan[channel - 1].span.setAttributes({
channel,
'stt.resolve': 'timeout',
'stt.label': this.label || 'None',
'stt.resolve': 'timeout'
});
this.childSpan[channel - 1].span.end();
}
@@ -626,8 +533,7 @@ class TaskTranscribe extends SttTask {
if (this.childSpan[channel - 1] && this.childSpan[channel - 1].span) {
this.childSpan[channel - 1].span.setAttributes({
channel,
'stt.resolve': 'max duration exceeded',
'stt.label': this.label || 'None',
'stt.resolve': 'max duration exceeded'
});
this.childSpan[channel - 1].span.end();
}
@@ -653,6 +559,7 @@ class TaskTranscribe extends SttTask {
bugname: this.bugname
})
.catch((err) => this.logger.error({err}, `Error stopping transcription for primary vendor ${this.vendor}`));
const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, cs.srf);
try {
this.notifyError({ msg: 'ASR error',
details:`STT Vendor ${this.vendor} error: ${evt.error || evt.reason}`, failover: 'in progress'});
@@ -663,6 +570,7 @@ class TaskTranscribe extends SttTask {
}
this[`_speechHandlersSet_${channel}`] = false;
this._startTranscribing(cs, _ep, channel);
updateSpeechCredentialLastUsed(this.sttCredentials.speech_credential_sid);
return true;
} catch (error) {
this.notifyError({ msg: 'ASR error',
@@ -697,7 +605,6 @@ class TaskTranscribe extends SttTask {
alert_type: AlertType.STT_FAILURE,
message: `Custom speech vendor ${this.vendor} error: ${evt.error}`,
vendor: this.vendor,
target_sid: cs.callSid
}).catch((err) => this.logger.info({err}, 'Error generating alert for jambonz custom connection failure'));
if (!(await this._startFallback(cs, _ep, evt))) {
this.notifyTaskDone();
@@ -709,8 +616,7 @@ class TaskTranscribe extends SttTask {
if (this.childSpan[channel - 1] && this.childSpan[channel - 1].span) {
this.childSpan[channel - 1].span.setAttributes({
channel,
'stt.resolve': 'connection failure',
'stt.label': this.label || 'None',
'stt.resolve': 'connection failure'
});
this.childSpan[channel - 1].span.end();
}
@@ -719,20 +625,6 @@ class TaskTranscribe extends SttTask {
}
}
async _onSpeechmaticsRecognitionStarted(_cs, _ep, evt) {
this.logger.debug({evt}, 'TaskGather:_onSpeechmaticsRecognitionStarted');
}
async _onSpeechmaticsInfo(_cs, _ep, evt) {
this.logger.debug({evt}, 'TaskGather:_onSpeechmaticsInfo');
}
async _onSpeechmaticsError(cs, _ep, evt) {
// eslint-disable-next-line no-unused-vars
const {message, ...e} = evt;
this._onVendorError(cs, _ep, {error: JSON.stringify(e)});
}
_startAsrTimer(channel) {
if (this.vendor === 'deepgram') return; // no need
assert(this.isContinuousAsr);

View File

@@ -1,7 +1,5 @@
const Task = require('./task');
const { TaskPreconditions } = require('../utils/constants');
const { SpeechCredentialError } = require('../utils/error');
const dbUtils = require('../utils/db-utils');
class TtsTask extends Task {
@@ -12,12 +10,6 @@ class TtsTask extends Task {
this.preconditions = TaskPreconditions.Endpoint;
this.earlyMedia = this.data.earlyMedia === true || (parentTask && parentTask.earlyMedia);
/**
* Task use taskIncludeSynthesizer to identify
* if taskIncludeSynthesizer === true, use label from verb.synthesizer, even it's empty
* if taskIncludeSynthesizer === false, use label from application.synthesizer
*/
this.taskIncludeSynthesizer = !!this.data.synthesizer;
this.synthesizer = this.data.synthesizer || {};
this.disableTtsCache = this.data.disableTtsCache;
this.options = this.synthesizer.options || {};
@@ -25,134 +17,35 @@ class TtsTask extends Task {
async exec(cs) {
super.exec(cs);
if (cs.synthesizer) {
this.options = {...cs.synthesizer.options, ...this.options};
this.data.synthesizer = this.data.synthesizer || {};
for (const k in cs.synthesizer) {
const newValue = this.data.synthesizer && this.data.synthesizer[k] !== undefined ?
this.data.synthesizer[k] :
cs.synthesizer[k];
if (Array.isArray(newValue)) {
this.data.synthesizer[k] = [...(this.data.synthesizer[k] || []), ...cs.synthesizer[k]];
} else if (typeof newValue === 'object' && newValue !== null) {
this.data.synthesizer[k] = { ...(this.data.synthesizer[k] || {}), ...cs.synthesizer[k] };
} else {
this.data.synthesizer[k] = newValue;
}
}
}
}
getTtsVendorData(cs) {
const vendor = this.synthesizer.vendor && this.synthesizer.vendor !== 'default' ?
this.synthesizer.vendor :
cs.speechSynthesisVendor;
const language = this.synthesizer.language && this.synthesizer.language !== 'default' ?
this.synthesizer.language :
cs.speechSynthesisLanguage ;
const voice = this.synthesizer.voice && this.synthesizer.voice !== 'default' ?
this.synthesizer.voice :
cs.speechSynthesisVoice;
const label = this.taskIncludeSynthesizer ? this.synthesizer.label : cs.speechSynthesisLabel;
return {vendor, language, voice, label};
}
async setTtsStreamingChannelVars(vendor, language, voice, credentials, ep) {
const {api_key, model_id, custom_tts_streaming_url, auth_token} = credentials;
const {stability, similarity_boost, use_speaker_boost, style} = this.options;
let obj;
this.logger.debug({credentials},
`setTtsStreamingChannelVars: vendor: ${vendor}, language: ${language}, voice: ${voice}`);
switch (vendor) {
case 'deepgram':
obj = {
DEEPGRAM_API_KEY: api_key,
DEEPGRAM_TTS_STREAMING_MODEL: voice
};
break;
case 'cartesia':
obj = {
CARTESIA_API_KEY: api_key,
CARTESIA_TTS_STREAMING_MODEL_ID: model_id,
CARTESIA_TTS_STREAMING_VOICE_ID: voice,
CARTESIA_TTS_STREAMING_LANGUAGE: language || 'en',
};
break;
case 'elevenlabs':
obj = {
ELEVENLABS_API_KEY: api_key,
ELEVENLABS_TTS_STREAMING_MODEL_ID: model_id,
ELEVENLABS_TTS_STREAMING_VOICE_ID: voice,
// 20/12/2024 - only eleven_turbo_v2_5 support multiple language
...(['eleven_turbo_v2_5'].includes(model_id) && {ELEVENLABS_TTS_STREAMING_LANGUAGE: language}),
...(stability && {ELEVENLABS_TTS_STREAMING_VOICE_SETTINGS_STABILITY: stability}),
...(similarity_boost && {ELEVENLABS_TTS_STREAMING_VOICE_SETTINGS_SIMILARITY_BOOST: similarity_boost}),
...(use_speaker_boost && {ELEVENLABS_TTS_STREAMING_VOICE_SETTINGS_USE_SPEAKER_BOOST: use_speaker_boost}),
...(style && {ELEVENLABS_TTS_STREAMING_VOICE_SETTINGS_STYLE: style})
};
break;
case 'rimelabs':
const {
pauseBetweenBrackets, phonemizeBetweenBrackets, inlineSpeedAlpha, speedAlpha, reduceLatency
} = this.options;
obj = {
RIMELABS_API_KEY: api_key,
RIMELABS_TTS_STREAMING_MODEL_ID: model_id,
RIMELABS_TTS_STREAMING_VOICE_ID: voice,
RIMELABS_TTS_STREAMING_LANGUAGE: language || 'en',
...(pauseBetweenBrackets && {RIMELABS_TTS_STREAMING_PAUSE_BETWEEN_BRACKETS: pauseBetweenBrackets}),
...(phonemizeBetweenBrackets &&
{RIMELABS_TTS_STREAMING_PHONEMIZE_BETWEEN_BRACKETS: phonemizeBetweenBrackets}),
...(inlineSpeedAlpha && {RIMELABS_TTS_STREAMING_INLINE_SPEED_ALPHA: inlineSpeedAlpha}),
...(speedAlpha && {RIMELABS_TTS_STREAMING_SPEED_ALPHA: speedAlpha}),
...(reduceLatency && {RIMELABS_TTS_STREAMING_REDUCE_LATENCY: reduceLatency})
};
break;
default:
if (vendor.startsWith('custom:')) {
const use_tls = custom_tts_streaming_url.startsWith('wss://');
obj = {
CUSTOM_TTS_STREAMING_HOST: custom_tts_streaming_url.replace(/^(ws|wss):\/\//, ''),
CUSTOM_TTS_STREAMING_API_KEY: auth_token,
CUSTOM_TTS_STREAMING_VOICE_ID: voice,
CUSTOM_TTS_STREAMING_LANGUAGE: language || 'en',
CUSTOM_TTS_STREAMING_USE_TLS: use_tls
};
} else {
throw new Error(`vendor ${vendor} is not supported for tts streaming yet`);
}
}
this.logger.info({vendor, credentials, obj}, 'setTtsStreamingChannelVars');
await ep.set(obj);
}
async _synthesizeWithSpecificVendor(cs, ep, {vendor, language, voice, label, preCache = false}) {
async _synthesizeWithSpecificVendor(cs, ep, {
vendor,
language,
voice,
label,
disableTtsStreaming,
preCache
}) {
const {srf, accountSid:account_sid} = cs;
const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, srf);
const {writeAlerts, AlertType, stats} = srf.locals;
const {synthAudio} = srf.locals.dbHelpers;
const engine = this.synthesizer.engine || cs.synthesizer?.engine || 'neural';
const salt = cs.callSid;
let credentials = cs.getSpeechCredentials(vendor, 'tts', label);
if (!credentials) {
throw new SpeechCredentialError(
`No text-to-speech service credentials for ${vendor} with labels: ${label} have been configured`);
}
/* parse Nuance voices into name and model */
let model;
if (vendor === 'nuance' && voice) {
const arr = /([A-Za-z-]*)\s+-\s+(enhanced|standard)/.exec(voice);
if (arr) {
voice = arr[1];
this.model = arr[2];
model = arr[2];
}
} else if (vendor === 'deepgram') {
this.model = voice;
model = voice;
}
this.model_id = credentials.model_id;
/* allow for microsoft custom region voice and api_key to be specified as an override */
if (vendor === 'microsoft' && this.options.deploymentId) {
@@ -169,64 +62,30 @@ class TtsTask extends Task {
credentials.optimize_streaming_latency = this.options.optimize_streaming_latency
|| credentials.optimize_streaming_latency;
voice = this.options.voice_id || voice;
} else if (vendor === 'rimelabs') {
credentials = credentials || {};
credentials.model_id = this.options.model_id || credentials.model_id;
} else if (vendor === 'whisper') {
credentials = credentials || {};
credentials.model_id = this.options.model_id || credentials.model_id;
} else if (vendor === 'verbio') {
credentials = credentials || {};
credentials.engine_version = this.options.engine_version || credentials.engine_version;
} else if (vendor === 'playht') {
credentials = credentials || {};
credentials.voice_engine = this.options.voice_engine || credentials.voice_engine;
} else if (vendor === 'google' && typeof voice === 'string' && voice.startsWith('custom_')) {
const {lookupGoogleCustomVoice} = dbUtils(this.logger, cs.srf);
const arr = /custom_(.*)/.exec(voice);
if (arr) {
const google_custom_voice_sid = arr[1];
const [custom_voice] = await lookupGoogleCustomVoice(google_custom_voice_sid);
if (custom_voice.use_voice_cloning_key) {
voice = {
voice_cloning_key: custom_voice.voice_cloning_key,
};
}
}
}
/**
* note on cache_speech_handles. This was found to be risky.
* It can cause a crash in the following sequence on a single call:
* 1. Stream tts on vendor A with cache_speech_handles=1, then
* 2. Stream tts on vendor B with cache_speech_handles=1
*
* we previously tried to track when vendors were switched and manage the flag accordingly,
* but it difficult to track all the scenarios and the benefit (slightly faster start to tts playout)
* is probably minimal. DH.
*/
ep.set({
tts_engine: vendor.startsWith('custom:') ? 'custom' : vendor,
tts_engine: vendor,
tts_voice: voice,
//cache_speech_handles: !cs.currentTtsVendor || cs.currentTtsVendor === vendor ? 1 : 0,
cache_speech_handles: 0,
}).catch((err) => this.logger.info({err}, 'Error setting tts_engine on endpoint'));
// set the current vendor on the call session
// If vendor is changed from the previous one, then reset the cache_speech_handles flag
//cs.currentTtsVendor = vendor;
cache_speech_handles: 1,
}).catch((err) => this.logger.info({err}, `${this.name}: Error setting tts_engine on endpoint`));
if (!preCache && !this._disableTracing)
this.logger.info({vendor, language, voice, model: this.model}, 'TaskSay:exec');
if (!preCache) this.logger.info({vendor, language, voice, model}, `${this.name}:exec`);
try {
if (!credentials) {
writeAlerts({
account_sid,
alert_type: AlertType.TTS_NOT_PROVISIONED,
vendor,
target_sid: cs.callSid
vendor
}).catch((err) => this.logger.info({err}, 'Error generating alert for no tts'));
throw new SpeechCredentialError('no provisioned speech credentials for TTS');
this.notifyError({
msg: 'TTS error',
details:`No speech credentials provisioned for selected vendor ${vendor}`
});
throw new Error('no provisioned speech credentials for TTS');
}
// synthesize all of the text elements
let lastUpdated = false;
/* produce an audio segment from the provided text */
const generateAudio = async(text) => {
@@ -234,12 +93,11 @@ class TtsTask extends Task {
if (text.startsWith('silence_stream://')) return text;
/* otel: trace time for tts */
if (!preCache && !this._disableTracing) {
if (!preCache && !this.parentTask) {
const {span} = this.startChildSpan('tts-generation', {
'tts.vendor': vendor,
'tts.language': language,
'tts.voice': voice,
'tts.label': label || 'None',
'tts.voice': voice
});
this.otelSpan = span;
}
@@ -251,22 +109,27 @@ class TtsTask extends Task {
language,
voice,
engine,
model: this.model,
model,
salt,
credentials,
options: this.options,
disableTtsCache : this.disableTtsCache,
renderForCaching: preCache
disableTtsStreaming,
preCache
});
if (!filePath.startsWith('say:')) {
this.logger.debug(`Say: file ${filePath}, served from cache ${servedFromCache}`);
this.logger.debug(`file ${filePath}, served from cache ${servedFromCache}`);
if (filePath) cs.trackTmpFile(filePath);
if (this.otelSpan) {
this.otelSpan.setAttributes({'tts.cached': servedFromCache});
this.otelSpan.end();
this.otelSpan = null;
}
if (!servedFromCache && rtt && !preCache && !this._disableTracing) {
if (!servedFromCache && !lastUpdated) {
lastUpdated = true;
updateSpeechCredentialLastUsed(credentials.speech_credential_sid).catch(() => {/* logged error */});
}
if (!servedFromCache && rtt && !preCache) {
this.notifyStatus({
event: 'synthesized-audio',
vendor,
@@ -277,7 +140,7 @@ class TtsTask extends Task {
}
}
else {
this.logger.debug('Say: a streaming tts api will be used');
this.logger.debug('a streaming tts api will be used');
const modifiedPath = filePath.replace('say:{', `say:{session-uuid=${ep.uuid},`);
return modifiedPath;
}
@@ -289,9 +152,9 @@ class TtsTask extends Task {
account_sid: cs.accountSid,
alert_type: AlertType.TTS_FAILURE,
vendor,
detail: err.message,
target_sid: cs.callSid
detail: err.message
}).catch((err) => this.logger.info({err}, 'Error generating alert for tts failure'));
this.notifyError({msg: 'TTS error', details: err.message || err});
throw err;
}
};
@@ -302,7 +165,6 @@ class TtsTask extends Task {
this.logger.info(err, 'TaskSay:exec error');
throw err;
}
}
_validateURL(urlString) {

View File

@@ -2,6 +2,7 @@ const makeTask = require('../tasks/make_task');
const Emitter = require('events');
const { normalizeJambones } = require('@jambonz/verb-specifications');
const {TaskName} = require('../utils/constants');
const assert = require('assert');
/**
* ActionHookDelayProcessor
@@ -24,12 +25,10 @@ class ActionHookDelayProcessor extends Emitter {
this._active = false;
const enabled = this.init(opts);
if (enabled && this.noResponseTimeout &&
(!this.actions || !Array.isArray(this.actions) || this.actions.length === 0)) {
if (enabled && (!this.actions || !Array.isArray(this.actions) || this.actions.length === 0)) {
throw new Error('ActionHookDelayProcessor: no actions specified');
}
else if (enabled && this.actions &&
this.actions.some((a) => !a.verb || ![TaskName.Say, TaskName.Play].includes(a.verb))) {
else if (enabled && this.actions.some((a) => !a.verb || ![TaskName.Say, TaskName.Play].includes(a.verb))) {
throw new Error(`ActionHookDelayProcessor: invalid actions specified: ${JSON.stringify(this.actions)}`);
}
}
@@ -52,9 +51,8 @@ class ActionHookDelayProcessor extends Emitter {
this.actions = opts.actions;
this.retries = opts.retries || 0;
this.noResponseTimeout = opts.noResponseTimeout;
this.noResponseTimeout = opts.noResponseTimeout || 0;
this.noResponseGiveUpTimeout = opts.noResponseGiveUpTimeout;
this.giveUpActions = opts.giveUpActions;
// return false if these options actually disable the ahdp
return ('enable' in opts && opts.enable === true) ||
@@ -68,16 +66,11 @@ class ActionHookDelayProcessor extends Emitter {
this.logger.debug('ActionHookDelayProcessor#start: already started due to prior gather which is continuing');
return;
}
assert(!this._noResponseTimer);
this._active = true;
this._retryCount = 0;
if (this.noResponseTimeout > 0) {
const timeoutMs = this.noResponseTimeout * 1000;
this._noResponseTimer = setTimeout(this._onNoResponseTimer.bind(this), timeoutMs);
} else {
this.logger.debug(
'ActionHookDelayProcessor#start: noResponseTimeout is 0 or undefined hence not calling _onNoResponseTimer'
);
}
const timeoutMs = this.noResponseTimeout === 0 ? 1 : this.noResponseTimeout * 1000;
this._noResponseTimer = setTimeout(this._onNoResponseTimer.bind(this), timeoutMs);
if (this.noResponseGiveUpTimeout > 0) {
const timeoutMs = this.noResponseGiveUpTimeout * 1000;
@@ -86,6 +79,7 @@ class ActionHookDelayProcessor extends Emitter {
}
async stop() {
this.logger.debug('ActionHookDelayProcessor#stop');
this._active = false;
if (this._noResponseTimer) {
@@ -97,19 +91,25 @@ class ActionHookDelayProcessor extends Emitter {
this._noResponseGiveUpTimer = null;
}
if (this._taskInProgress) {
this.logger.debug(`ActionHookDelayProcessor#stop: stopping ${this._taskInProgress.name}`);
this.logger.debug(`ActionHookDelayProcessor#stop: killing task in progress: ${this._taskInProgress.name}`);
this._sayResolver = () => {
this.logger.debug('ActionHookDelayProcessor#stop: play/say is done, continue on..');
//this._taskInProgress.kill(this.cs);
this._taskInProgress = null;
};
/* we let Say finish, but interrupt Play */
if (TaskName.Play === this._taskInProgress.name) {
await this._taskInProgress.kill(this.cs);
/** if we are doing a play, kill it immediately
* if we are doing a say, wait for it to finish
*/
if (TaskName.Say === this._taskInProgress.name) {
this._sayResolver = () => {
this.logger.debug('ActionHookDelayProcessor#stop: say is done, continue on..');
this._taskInProgress.kill(this.cs);
this._taskInProgress = null;
};
this.logger.debug('ActionHookDelayProcessor#stop returning promise');
return new Promise((resolve) => this._sayResolver = resolve);
}
else {
/* play */
this._taskInProgress.kill(this.cs);
this._taskInProgress = null;
}
return new Promise((resolve) => this._sayResolver = resolve);
}
this.logger.debug('ActionHookDelayProcessor#stop returning');
}
@@ -126,12 +126,7 @@ class ActionHookDelayProcessor extends Emitter {
try {
this._taskInProgress = makeTask(this.logger, t[0]);
this._taskInProgress.disableTracing = true;
this._taskInProgress.exec(this.cs, {ep: this.ep}).catch((err) => {
this.logger.info(`ActionHookDelayProcessor#_onNoResponseTimer: error playing file: ${err.message}`);
this._taskInProgress = null;
this.ep.removeAllListeners('playback-start');
this.ep.removeAllListeners('playback-stop');
});
this._taskInProgress.exec(this.cs, {ep: this.ep});
} catch (err) {
this.logger.info(err, 'ActionHookDelayProcessor#_onNoResponseTimer: error starting action');
this._taskInProgress = null;
@@ -142,9 +137,7 @@ class ActionHookDelayProcessor extends Emitter {
this.logger.debug({evt}, 'got playback-start');
if (!this._active) {
this.logger.info({evt}, 'ActionHookDelayProcessor#_onNoResponseTimer: killing audio immediately');
/* note: in race condition we may have just hung up and cs.ep cleared */
this.ep?.api('uuid_break', this.ep?.uuid)
this.ep.api('uuid_break', this.ep.uuid)
.catch((err) => this.logger.info(err,
'ActionHookDelayProcessor#_onNoResponseTimer Error killing audio'));
}
@@ -154,7 +147,7 @@ class ActionHookDelayProcessor extends Emitter {
this._taskInProgress = null;
if (this._sayResolver) {
/* we were waiting for the play to finish before continuing to next task */
this.logger.debug({evt}, 'ActionHookDelayProcessor#_onNoResponseTimer got playback-stop');
this.logger.debug({evt}, 'got playback-stop');
this._sayResolver();
this._sayResolver = null;
}
@@ -173,14 +166,9 @@ class ActionHookDelayProcessor extends Emitter {
_onNoResponseGiveUpTimer() {
this._active = false;
if (!this.giveUpActions) {
this.logger.info('ActionHookDelayProcessor#_onNoResponseGiveUpTimer');
this.stop().catch((err) => {});
this.emit('giveup');
} else {
this.logger.info('ActionHookDelayProcessor#_onNoResponseGiveUpTimer - giveUpActions');
this.emit('giveupWithTasks', this.giveUpActions);
}
this.logger.info('ActionHookDelayProcessor#_onNoResponseGiveUpTimer');
this.stop().catch((err) => {});
this.emit('giveup');
}
}

View File

@@ -153,7 +153,7 @@ class Amd extends Emitter {
const wordCount = t.alternatives[0].transcript.split(' ').length;
const final = t.is_final;
const foundHint = hints.find((h) => t.alternatives[0].transcript.toLowerCase().includes(h.toLowerCase()));
const foundHint = hints.find((h) => t.alternatives[0].transcript.includes(h));
if (foundHint) {
/* we detected a common voice mail greeting */
this.logger.debug(`Amd:evaluateTranscription: found hint ${foundHint}`);
@@ -210,8 +210,7 @@ module.exports = (logger) => {
account_sid: cs.accountSid,
alert_type: AlertType.STT_FAILURE,
vendor: vendor,
detail: err.message,
target_sid: cs.callSid
detail: err.message
});
}).catch((err) => logger.info({err}, 'Error generating alert for tts failure'));
@@ -246,10 +245,7 @@ module.exports = (logger) => {
const amd = ep.amd = new Amd(logger, cs, opts);
const {vendor, language} = amd;
let sttCredentials = amd.sttCredentials;
// hints from configuration might be too long for specific language and vendor that make transcribe freeswitch
// modules cannot connect to the vendor. hints is used in next step to validate if the transcription
// matchs voice mail hints.
const hints = [];
const hints = voicemailHints[language] || [];
if (vendor === 'nuance' && sttCredentials.client_id) {
/* get nuance access token */

View File

@@ -46,9 +46,6 @@ class BackgroundTaskManager extends Emitter {
case 'transcribe':
task = await this._initTranscribe(opts);
break;
case 'ttsStream':
task = await this._initTtsStream(opts);
break;
default:
break;
}
@@ -103,7 +100,6 @@ class BackgroundTaskManager extends Emitter {
async _initBargeIn(opts) {
let task;
try {
const copy = JSON.parse(JSON.stringify(opts));
const t = normalizeJambones(this.logger, [opts]);
task = makeTask(this.logger, t[0]);
task
@@ -122,7 +118,7 @@ class BackgroundTaskManager extends Emitter {
if (task.sticky && !this.cs.callGone && !this.cs._stopping) {
this.logger.info('BackgroundTaskManager:_initBargeIn: restarting background bargeIn');
this._bargeInHandled = false;
this.newTask('bargeIn', copy, true);
this.newTask('bargeIn', opts, true);
}
return;
})
@@ -177,25 +173,6 @@ class BackgroundTaskManager extends Emitter {
return task;
}
// Initiate Tts Stream
async _initTtsStream(opts) {
let task;
try {
const t = normalizeJambones(this.logger, [opts]);
task = makeTask(this.logger, t[0]);
const resources = await this.cs._evaluatePreconditions(task);
const {span, ctx} = this.rootSpan.startChildSpan(`background-ttsStream:${task.summary}`);
task.span = span;
task.ctx = ctx;
task.exec(this.cs, resources)
.then(this._taskCompleted.bind(this, 'ttsStream', task))
.catch(this._taskError.bind(this, 'ttsStream', task));
} catch (err) {
this.logger.info(err, 'BackgroundTaskManager:_initTtsStream - Error creating ttsStream task');
}
return task;
}
_taskCompleted(type, task) {
this.logger.debug({type, task}, `BackgroundTaskManager:_taskCompleted: task completed, sticky: ${task.sticky}`);
task.removeAllListeners();

View File

@@ -14,7 +14,6 @@
"Leave": "leave",
"Lex": "lex",
"Listen": "listen",
"Llm": "llm",
"Message": "message",
"Pause": "pause",
"Play": "play",
@@ -28,7 +27,6 @@
"SipRedirect": "sip:redirect",
"Say": "say",
"SayLegacy": "say:legacy",
"Stream": "stream",
"Tag": "tag",
"Transcribe": "transcribe"
},
@@ -128,15 +126,6 @@
"NoSpeechDetected": "azure_transcribe::no_speech_detected",
"VadDetected": "azure_transcribe::vad_detected"
},
"SpeechmaticsTranscriptionEvents": {
"Transcription": "speechmatics_transcribe::transcription",
"Translation": "speechmatics_transcribe::translation",
"Info": "speechmatics_transcribe::info",
"RecognitionStarted": "speechmatics_transcribe::recognition_started",
"ConnectFailure": "speechmatics_transcribe::connect_failed",
"Connect": "speechmatics_transcribe::connect",
"Error": "speechmatics_transcribe::error"
},
"JambonzTranscriptionEvents": {
"Transcription": "jambonz_transcribe::transcription",
"ConnectFailure": "jambonz_transcribe::connect_failed",
@@ -149,12 +138,6 @@
"ConnectFailure": "assemblyai_transcribe::connect_failed",
"Connect": "assemblyai_transcribe::connect"
},
"VoxistTranscriptionEvents": {
"Transcription": "voxist_transcribe::transcription",
"Error": "voxist_transcribe::error",
"ConnectFailure": "voxist_transcribe::connect_failed",
"Connect": "voxist_transcribe::connect"
},
"VadDetection": {
"Detection": "vad_detect:detection"
},
@@ -175,34 +158,6 @@
"StandbyEnter": "standby-enter",
"StandbyExit": "standby-exit"
},
"LlmEvents_OpenAI": {
"Error": "error",
"Connect": "openai_s2s::connect",
"ConnectFailure": "openai_s2s::connect_failed",
"Disconnect": "openai_s2s::disconnect",
"ServerEvent": "openai_s2s::server_event"
},
"LlmEvents_Elevenlabs": {
"Error": "error",
"Connect": "elevenlabs_s2s::connect",
"ConnectFailure": "elevenlabs_s2s::connect_failed",
"Disconnect": "elevenlabs_s2s::disconnect",
"ServerEvent": "elevenlabs_s2s::server_event"
},
"LlmEvents_VoiceAgent": {
"Error": "error",
"Connect": "voice_agent_s2s::connect",
"ConnectFailure": "voice_agent_s2s::connect_failed",
"Disconnect": "voice_agent_s2s::disconnect",
"ServerEvent": "voice_agent_s2s::server_event"
},
"LlmEvents_Ultravox": {
"Error": "error",
"Connect": "ultravox_s2s::connect",
"ConnectFailure": "ultravox_s2s::connect_failed",
"Disconnect": "ultravox_s2s::disconnect",
"ServerEvent": "ultravox_s2s::server_event"
},
"QueueResults": {
"Bridged": "bridged",
"Error": "error",
@@ -217,8 +172,7 @@
},
"KillReason": {
"Hangup": "hangup",
"Replaced": "replaced",
"MediaTimeout": "media_timeout"
"Replaced": "replaced"
},
"HookMsgTypes": [
"session:new",
@@ -230,10 +184,6 @@
"dial:confirm",
"verb:hook",
"verb:status",
"llm:event",
"llm:tool-call",
"tts:tokens-result",
"tts:streaming-event",
"jambonz:error"
],
"RecordState": {
@@ -252,59 +202,7 @@
"ToneTimeout": "amd_tone_timeout",
"Stopped": "amd_stopped"
},
"MediaPath": {
"NoMedia": "no-media",
"PartialMedia": "partial-media",
"FullMedia": "full-media"
},
"DeepgramTtsStreamingEvents": {
"Empty": "deepgram_tts_streaming::empty",
"ConnectFailure": "deepgram_tts_streaming::connect_failed",
"Connect": "deepgram_tts_streaming::connect"
},
"CartesiaTtsStreamingEvents": {
"Empty": "cartesia_tts_streaming::empty",
"ConnectFailure": "cartesia_tts_streaming::connect_failed",
"Connect": "cartesia_tts_streaming::connect"
},
"ElevenlabsTtsStreamingEvents": {
"Empty": "elevenlabs_tts_streaming::empty",
"ConnectFailure": "elevenlabs_tts_streaming::connect_failed",
"Connect": "elevenlabs_tts_streaming::connect"
},
"RimelabsTtsStreamingEvents": {
"Empty": "rimelabs_tts_streaming::empty",
"ConnectFailure": "rimelabs_tts_streaming::connect_failed",
"Connect": "rimelabs_tts_streaming::connect"
},
"CustomTtsStreamingEvents": {
"Empty": "custom_tts_streaming::empty",
"ConnectFailure": "custom_tts_streaming::connect_failed",
"Connect": "custom_tts_streaming::connect"
},
"TtsStreamingEvents": {
"Empty": "tts_streaming::empty",
"Pause": "tts_streaming::pause",
"Resume": "tts_streaming::resume",
"ConnectFailure": "tts_streaming::connect_failed"
},
"TtsStreamingConnectionStatus": {
"NotConnected": "not_connected",
"Connected": "connected",
"Connecting": "connecting",
"Failed": "failed"
},
"MAX_SIMRINGS": 10,
"BONG_TONE": "tone_stream://v=-7;%(100,0,941.0,1477.0);v=-7;>=2;+=.1;%(1400,0,350,440)",
"FS_UUID_SET_NAME": "fsUUIDs",
"SystemState" : {
"Online": "ONLINE",
"Offline": "OFFLINE",
"GracefulShutdownInProgress":"SHUTDOWN_IN_PROGRESS"
},
"FEATURE_SERVER" : "feature-server",
"WS_CLOSE_CODES": {
"NormalClosure": 1000,
"GoingAway": 1001
}
"FS_UUID_SET_NAME": "fsUUIDs"
}

View File

@@ -77,7 +77,6 @@ const speechMapper = (cred) => {
const o = JSON.parse(decrypt(credential));
obj.api_key = o.api_key;
obj.deepgram_stt_uri = o.deepgram_stt_uri;
obj.deepgram_tts_uri = o.deepgram_tts_uri;
obj.deepgram_stt_use_tls = o.deepgram_stt_use_tls;
}
else if ('soniox' === obj.vendor) {
@@ -91,63 +90,39 @@ const speechMapper = (cred) => {
else if ('cobalt' === obj.vendor) {
const o = JSON.parse(decrypt(credential));
obj.cobalt_server_uri = o.cobalt_server_uri;
}
else if ('elevenlabs' === obj.vendor) {
} else if ('elevenlabs' === obj.vendor) {
const o = JSON.parse(decrypt(credential));
obj.api_key = o.api_key;
obj.model_id = o.model_id;
obj.options = o.options;
}
else if ('playht' === obj.vendor) {
} else if ('playht' === obj.vendor) {
const o = JSON.parse(decrypt(credential));
obj.api_key = o.api_key;
obj.user_id = o.user_id;
obj.voice_engine = o.voice_engine;
obj.options = o.options;
}
else if ('cartesia' === obj.vendor) {
const o = JSON.parse(decrypt(credential));
obj.api_key = o.api_key;
obj.model_id = o.model_id;
obj.embedding = o.embedding;
obj.options = o.options;
}
else if ('rimelabs' === obj.vendor) {
} else if ('rimelabs' === obj.vendor) {
const o = JSON.parse(decrypt(credential));
obj.api_key = o.api_key;
obj.model_id = o.model_id;
obj.options = o.options;
}
else if ('assemblyai' === obj.vendor) {
} else if ('assemblyai' === obj.vendor) {
const o = JSON.parse(decrypt(credential));
obj.api_key = o.api_key;
}
else if ('voxist' === obj.vendor) {
const o = JSON.parse(decrypt(credential));
obj.api_key = o.api_key;
}
else if ('whisper' === obj.vendor) {
} else if ('whisper' === obj.vendor) {
const o = JSON.parse(decrypt(credential));
obj.api_key = o.api_key;
obj.model_id = o.model_id;
}
else if ('verbio' === obj.vendor) {
} else if ('verbio' === obj.vendor) {
const o = JSON.parse(decrypt(credential));
obj.client_id = o.client_id;
obj.client_secret = o.client_secret;
obj.engine_version = o.engine_version;
}
else if ('speechmatics' === obj.vendor) {
const o = JSON.parse(decrypt(credential));
obj.api_key = o.api_key;
obj.speechmatics_stt_uri = o.speechmatics_stt_uri;
}
else if (obj.vendor.startsWith('custom:')) {
} else if (obj.vendor.startsWith('custom:')) {
const o = JSON.parse(decrypt(credential));
obj.auth_token = o.auth_token;
obj.custom_stt_url = o.custom_stt_url;
obj.custom_tts_url = o.custom_tts_url;
obj.custom_tts_streaming_url = o.custom_tts_streaming_url;
}
} catch (err) {
console.log(err);
@@ -227,23 +202,11 @@ module.exports = (logger, srf) => {
}
};
const lookupVoipCarrierBySid = async(sid) => {
const pp = pool.promise();
try {
const [r] = await pp.query('SELECT * FROM voip_carriers WHERE voip_carrier_sid = ?', [sid]);
return r;
} catch (err) {
logger.error({err}, `lookupVoipCarrierBySid: Error ${sid}`);
}
};
return {
lookupAccountDetails,
updateSpeechCredentialLastUsed,
lookupCarrier,
lookupCarrierByPhoneNumber,
lookupGoogleCustomVoice,
lookupVoipCarrierBySid
lookupGoogleCustomVoice
};
};

View File

@@ -1,33 +0,0 @@
class NonFatalTaskError extends Error {
constructor(msg) {
super(msg);
}
}
class SpeechCredentialError extends NonFatalTaskError {
constructor(msg) {
super(msg);
}
}
class PlayFileNotFoundError extends NonFatalTaskError {
constructor(url) {
super('File not found');
this.url = url;
}
}
class HTTPResponseError extends Error {
constructor(statusCode) {
super('Unexpected HTTP Response');
delete this.stack;
this.statusCode = statusCode;
}
}
module.exports = {
SpeechCredentialError,
NonFatalTaskError,
PlayFileNotFoundError,
HTTPResponseError
};

View File

@@ -16,7 +16,6 @@ const {
NODE_ENV,
HTTP_USER_AGENT_HEADER,
} = require('../config');
const {HTTPResponseError} = require('./error');
const toBase64 = (str) => Buffer.from(str || '', 'utf8').toString('base64');
@@ -191,7 +190,8 @@ class HttpRequestor extends BaseRequestor {
followRedirects: false
});
if (![200, 202, 204].includes(statusCode)) {
const err = new HTTPResponseError(statusCode);
const err = new Error();
err.statusCode = statusCode;
throw err;
}
if (headers['content-type']?.includes('application/json')) {

View File

@@ -1,5 +1,5 @@
const Mrf = require('drachtio-fsmrf');
const os = require('os');
const ip = require('ip');
const {
JAMBONES_MYSQL_HOST,
JAMBONES_MYSQL_USER,
@@ -12,25 +12,11 @@ const {
JAMBONES_TIME_SERIES_HOST,
JAMBONES_ESL_LISTEN_ADDRESS,
PORT,
HTTP_IP,
NODE_ENV,
} = require('../config');
const Registrar = require('@jambonz/mw-registrar');
const assert = require('assert');
function getLocalIp() {
const interfaces = os.networkInterfaces();
for (const interfaceName in interfaces) {
const interface = interfaces[interfaceName];
for (const iface of interface) {
if (iface.family === 'IPv4' && !iface.internal) {
return iface.address;
}
}
}
return '127.0.0.1'; // Fallback to localhost if no suitable interface found
}
function initMS(logger, wrapper, ms) {
Object.assign(wrapper, {ms, active: true, connects: 1});
logger.info(`connected to freeswitch at ${ms.address}`);
@@ -152,8 +138,7 @@ function installSrfLocals(srf, logger) {
lookupAccountBySid,
lookupAccountCapacitiesBySid,
lookupSmppGateways,
lookupClientByAccountAndUsername,
lookupSystemInformation
lookupClientByAccountAndUsername
} = require('@jambonz/db-helpers')({
host: JAMBONES_MYSQL_HOST,
user: JAMBONES_MYSQL_USER,
@@ -199,8 +184,7 @@ function installSrfLocals(srf, logger) {
} = require('@jambonz/speech-utils')({}, logger);
const {
writeAlerts,
AlertType,
writeSystemAlerts
AlertType
} = require('@jambonz/time-series')(logger, {
host: JAMBONES_TIME_SERIES_HOST,
commitSize: 50,
@@ -209,8 +193,7 @@ function installSrfLocals(srf, logger) {
let localIp;
try {
// Either use the configured IP address or discover it
localIp = HTTP_IP || getLocalIp();
localIp = ip.address();
} catch (err) {
logger.error({err}, 'installSrfLocals - error detecting local ipv4 address');
}
@@ -230,7 +213,6 @@ function installSrfLocals(srf, logger) {
lookupAccountCapacitiesBySid,
lookupSmppGateways,
lookupClientByAccountAndUsername,
lookupSystemInformation,
updateCallStatus,
retrieveCall,
listCalls,
@@ -270,8 +252,7 @@ function installSrfLocals(srf, logger) {
getFreeswitch,
stats: stats,
writeAlerts,
AlertType,
writeSystemAlerts
AlertType
};
if (localIp) {

View File

@@ -1,32 +0,0 @@
/**
* Parses a list of hostport entries and selects the first one that matches the specified protocol,
* excluding any entries with the localhost IP address ('127.0.0.1').
*
* Each hostport entry should be in the format: 'protocol/ip:port'
*
* @param {Object} logger - A logging object with a 'debug' method for logging debug messages.
* @param {string} hostport - A comma-separated string containing hostport entries.
* @param {string} protocol - The protocol to match (e.g., 'udp', 'tcp').
* @returns {Array} An array containing:
* 0: protocol
* 1: ip address
* 2: port
*/
const selectHostPort = (logger, hostport, protocol) => {
logger.debug(`selectHostPort: ${hostport}, ${protocol}`);
const sel = hostport
.split(',')
.map((hp) => {
const arr = /(.*)\/(.*):(.*)/.exec(hp);
return [arr[1], arr[2], arr[3]];
})
.filter((hp) => {
return hp[0] === protocol && hp[1] !== '127.0.0.1';
});
return sel[0];
};
module.exports = {
selectHostPort
};

View File

@@ -1,5 +1,5 @@
const Emitter = require('events');
const {CallStatus, MediaPath} = require('./constants');
const {CallStatus} = require('./constants');
const SipError = require('drachtio-srf').SipError;
const {TaskPreconditions, CallDirection} = require('../utils/constants');
const CallInfo = require('../session/call-info');
@@ -17,9 +17,7 @@ const HttpRequestor = require('./http-requestor');
const WsRequestor = require('./ws-requestor');
const {makeOpusFirst} = require('./sdp-utils');
const {
JAMBONES_USE_FREESWITCH_TIMER_FD,
JAMBONES_MEDIA_TIMEOUT_MS,
JAMBONES_MEDIA_HOLD_TIMEOUT_MS
JAMBONES_USE_FREESWITCH_TIMER_FD
} = require('../config');
class SingleDialer extends Emitter {
@@ -215,8 +213,6 @@ class SingleDialer extends Emitter {
},
cbProvisional: (prov) => {
const status = {sipStatus: prov.status, sipReason: prov.reason};
// Update call-id for sbc outbound INVITE
this.callInfo.sbcCallid = prov.get('X-CID');
if ([180, 183].includes(prov.status) && prov.body) {
if (status.callStatus !== CallStatus.EarlyMedia) {
status.callStatus = CallStatus.EarlyMedia;
@@ -319,19 +315,14 @@ class SingleDialer extends Emitter {
/**
* kill the call in progress or the stable dialog, whichever we have
*/
async kill(Reason) {
async kill() {
this.killed = true;
if (this.inviteInProgress) await this.inviteInProgress.cancel();
else if (this.dlg && this.dlg.connected) {
const duration = moment().diff(this.dlg.connectTime, 'seconds');
this.logger.debug('SingleDialer:kill hanging up called party');
this.emit('callStatusChange', {callStatus: CallStatus.Completed, duration});
const headers = {
...(Reason && {'X-Reason': Reason})
};
this.dlg.destroy({
headers
});
this.dlg.destroy();
}
if (this.ep) {
this.logger.debug(`SingleDialer:kill - deleting endpoint ${this.ep.uuid}`);
@@ -342,22 +333,11 @@ class SingleDialer extends Emitter {
_configMsEndpoint() {
const opts = {
...(this.onHoldMusic && {holdMusic: `shout://${this.onHoldMusic.replace(/^https?:\/\//, '')}`}),
...(JAMBONES_USE_FREESWITCH_TIMER_FD && {timer_name: 'timerfd'}),
...(JAMBONES_MEDIA_TIMEOUT_MS && {media_timeout: JAMBONES_MEDIA_TIMEOUT_MS}),
...(JAMBONES_MEDIA_HOLD_TIMEOUT_MS && {media_hold_timeout: JAMBONES_MEDIA_HOLD_TIMEOUT_MS})
...(JAMBONES_USE_FREESWITCH_TIMER_FD && {timer_name: 'timerfd'})
};
if (Object.keys(opts).length > 0) {
this.ep.set(opts);
}
if (this.dialTask?.inbandDtmfEnabled && !this.ep.inbandDtmfEnabled) {
// https://developer.signalwire.com/freeswitch/FreeSWITCH-Explained/Modules/mod-dptools/6587132/#0-about
try {
this.ep.execute('start_dtmf');
this.ep.inbandDtmfEnabled = true;
} catch (err) {
this.logger.info(err, 'place-outdial:_configMsEndpoint - error enable inband DTMF');
}
}
}
/**
@@ -399,8 +379,7 @@ class SingleDialer extends Emitter {
callInfo: this.callInfo,
accountInfo: this.accountInfo,
tasks,
rootSpan: this.rootSpan,
req: this.req
rootSpan: this.rootSpan
});
await cs.exec();
@@ -462,38 +441,31 @@ class SingleDialer extends Emitter {
});
app.requestor.request('session:adulting', '/adulting', {
...cs.callInfo.toJSON(),
parentCallInfo: this.parentCallInfo.toJSON()
parentCallInfo: this.parentCallInfo
}).catch((err) => {
newLogger.error({err}, 'doAdulting: error sending adulting request');
});
cs.req = this.req;
// fixed hangup an adulting session does not send status callback Completed
cs.wrapDialog(this.dlg);
cs.exec().catch((err) => newLogger.error({err}, 'doAdulting: error executing session'));
return cs;
}
async releaseMediaToSBC(remoteSdp, localSdp, releaseMediaEntirely) {
async releaseMediaToSBC(remoteSdp, localSdp) {
assert(this.dlg && this.dlg.connected && this.ep && typeof remoteSdp === 'string');
const sdp = stripCodecs(this.logger, remoteSdp, localSdp) || remoteSdp;
await this.dlg.modify(sdp, {
headers: {
'X-Reason': releaseMediaEntirely ? 'release-media-entirely' : 'release-media'
'X-Reason': 'release-media'
}
});
try {
await this.ep.destroy();
} catch (err) {
this.logger.error({err}, 'SingleDialer:releaseMediaToSBC: Error destroying endpoint');
}
this.ep = null;
this.ep.destroy()
.then(() => this.ep = null)
.catch((err) => this.logger.error({err}, 'SingleDialer:releaseMediaToSBC: Error destroying endpoint'));
}
async reAnchorMedia(currentMediaRoute = MediaPath.PartialMedia) {
async reAnchorMedia() {
assert(this.dlg && this.dlg.connected && !this.ep);
this.logger.debug('SingleDialer:reAnchorMedia: re-anchoring media after partial media');
this.ep = await this.ms.createEndpoint({remoteSdp: this.dlg.remote.sdp});
this._configMsEndpoint();
await this.dlg.modify(this.ep.local.sdp, {
@@ -501,11 +473,6 @@ class SingleDialer extends Emitter {
'X-Reason': 'anchor-media'
}
});
if (currentMediaRoute === MediaPath.NoMedia) {
this.logger.debug('SingleDialer:reAnchorMedia: repoint endpoint after no media');
await this.ep.modify(this.dlg.remote.sdp);
}
}
_notifyCallStatusChange({callStatus, sipStatus, sipReason, duration}) {

View File

@@ -46,24 +46,12 @@ module.exports = (logger) => {
const {srf} = require('../..');
srf.locals.publicIp = publicIp;
})
.on(LifeCycleEvents.ScaleIn, async() => {
.on(LifeCycleEvents.ScaleIn, () => {
logger.info('AWS scale-in notification: begin drying up calls');
dryUpCalls = true;
lifecycleEmitter.operationalState = LifeCycleEvents.ScaleIn;
const {srf} = require('../..');
const {writeSystemAlerts} = srf.locals;
if (writeSystemAlerts) {
const {SystemState, FEATURE_SERVER} = require('./constants');
await writeSystemAlerts({
system_component: FEATURE_SERVER,
state : SystemState.GracefulShutdownInProgress,
fields : {
detail: `feature-server with process_id ${process.pid} shutdown in progress`,
host: srf.locals?.ipv4
}
});
}
pingProxies(srf);
// if we have zero calls, we can complete the scale-in right

View File

@@ -1,4 +1,7 @@
const {TaskName} = require('./constants.json');
const {
TaskName,
} = require('./constants.json');
const stickyVars = {
google: [
'GOOGLE_SPEECH_HINTS',
@@ -42,20 +45,12 @@ const stickyVars = {
'DEEPGRAM_SPEECH_ENDPOINTING',
'DEEPGRAM_SPEECH_UTTERANCE_END_MS',
'DEEPGRAM_SPEECH_VAD_TURNOFF',
'DEEPGRAM_SPEECH_TAG',
'DEEPGRAM_SPEECH_MODEL_VERSION',
'DEEPGRAM_SPEECH_FILLER_WORDS'
'DEEPGRAM_SPEECH_TAG'
],
aws: [
'AWS_VOCABULARY_NAME',
'AWS_VOCABULARY_FILTER_METHOD',
'AWS_VOCABULARY_FILTER_NAME',
'AWS_LANGUAGE_MODEL_NAME',
'AWS_ACCESS_KEY_ID',
'AWS_SECRET_ACCESS_KEY',
'AWS_REGION',
'AWS_SECURITY_TOKEN',
'AWS_PII_ENTITY_TYPES',
'AWS_VOCABULARY_FILTER_NAME'
],
nuance: [
'NUANCE_ACCESS_TOKEN',
@@ -104,17 +99,6 @@ const stickyVars = {
assemblyai: [
'ASSEMBLYAI_API_KEY',
'ASSEMBLYAI_WORD_BOOST'
],
voxist: [
'VOXIST_API_KEY',
],
speechmatics: [
'SPEECHMATICS_API_KEY',
'SPEECHMATICS_HOST',
'SPEECHMATICS_PATH',
'SPEECHMATICS_SPEECH_HINTS',
'SPEECHMATICS_TRANSLATION_LANGUAGES',
'SPEECHMATICS_TRANSLATION_PARTIALS'
]
};
@@ -157,6 +141,7 @@ const optimalDeepramModels = {
tr: ['nova-2', 'nova-2'],
uk: ['nova-2', 'nova-2']
};
const selectDefaultDeepgramModel = (task, language) => {
if (language in optimalDeepramModels) {
const [gather, transcribe] = optimalDeepramModels[language];
@@ -165,34 +150,8 @@ const selectDefaultDeepgramModel = (task, language) => {
return 'base';
};
const optimalGoogleModels = {
'v1' : {
'en-IN':['telephony', 'telephony'],
'es-DO':['default', 'default'],
'es-MX':['default', 'default'],
'en-AU':['telephony', 'telephony'],
'en-GB':['telephony', 'telephony'],
'en-NZ':['telephony', 'telephony']
},
'v2' : {
'en-IN':['telephony', 'long']
}
};
const selectDefaultGoogleModel = (task, language, version) => {
const useV2 = version === 'v2';
if (language in optimalGoogleModels[version]) {
const [gather, transcribe] = optimalGoogleModels[version][language];
return task.name === TaskName.Gather ? gather : transcribe;
}
return task.name === TaskName.Gather ?
(useV2 ? 'telephony_short' : 'command_and_search') :
(useV2 ? 'long' : 'latest_long');
};
const consolidateTranscripts = (bufferedTranscripts, channel, language, vendor) => {
if (bufferedTranscripts.length === 1) {
bufferedTranscripts[0].is_final = true;
return bufferedTranscripts[0];
}
if (bufferedTranscripts.length === 1) return bufferedTranscripts[0];
let totalConfidence = 0;
const finalTranscript = bufferedTranscripts.reduce((acc, evt) => {
totalConfidence += evt.alternatives[0].confidence;
@@ -212,7 +171,7 @@ const consolidateTranscripts = (bufferedTranscripts, channel, language, vendor)
const lastChar = acc.alternatives[0].transcript.slice(-1);
const firstChar = newTranscript.charAt(0);
if (vendor === 'speechmatics' || (lastChar.match(/\d/) && firstChar.match(/\d/))) {
if (lastChar.match(/\d/) && firstChar.match(/\d/)) {
acc.alternatives[0].transcript += newTranscript;
} else {
acc.alternatives[0].transcript += ` ${newTranscript}`;
@@ -357,10 +316,8 @@ const normalizeIbm = (evt, channel, language) => {
const normalizeGoogle = (evt, channel, language) => {
const copy = JSON.parse(JSON.stringify(evt));
const language_code = evt.language_code || language;
return {
language_code: language_code,
language_code: language,
channel_tag: channel,
is_final: evt.is_final,
alternatives: [evt.alternatives[0]],
@@ -464,41 +421,16 @@ const normalizeMicrosoft = (evt, channel, language, punctuation = true) => {
const normalizeAws = (evt, channel, language) => {
const copy = JSON.parse(JSON.stringify(evt));
const isGrpcPayload = Array.isArray(evt);
if (isGrpcPayload) {
/* legacy grpc api */
return {
language_code: language,
channel_tag: channel,
is_final: evt[0].is_final,
alternatives: evt[0].alternatives,
vendor: {
name: 'aws',
evt: copy
}
};
}
else {
/* websocket api */
const alternatives = evt.Transcript?.Results[0]?.Alternatives.map((alt) => {
const items = alt.Items.filter((item) => item.Type === 'pronunciation' && 'Confidence' in item);
const confidence = items.reduce((acc, item) => acc + item.Confidence, 0) / items.length;
return {
transcript: alt.Transcript,
confidence
};
});
return {
language_code: language,
channel_tag: channel,
is_final: evt.Transcript?.Results[0].IsPartial === false,
alternatives,
vendor: {
name: 'aws',
evt: copy
}
};
}
return {
language_code: language,
channel_tag: channel,
is_final: evt[0].is_final,
alternatives: evt[0].alternatives,
vendor: {
name: 'aws',
evt: copy
}
};
};
const normalizeAssemblyAi = (evt, channel, language) => {
@@ -514,56 +446,12 @@ const normalizeAssemblyAi = (evt, channel, language) => {
}
],
vendor: {
name: 'assemblyai',
name: 'ASSEMBLYAI',
evt: copy
}
};
};
const normalizeVoxist = (evt, channel, language) => {
const copy = JSON.parse(JSON.stringify(evt));
return {
language_code: language,
channel_tag: channel,
is_final: evt.type === 'final',
alternatives: [
{
confidence: 1.00,
transcript: evt.text,
}
],
vendor: {
name: 'voxist',
evt: copy
}
};
};
const normalizeSpeechmatics = (evt, channel, language) => {
const copy = JSON.parse(JSON.stringify(evt));
const is_final = evt.message === 'AddTranscript';
const words = evt.results?.filter((r) => r.type === 'word') || [];
const confidence = words.length > 0 ?
words.reduce((acc, word) => acc + word.alternatives[0].confidence, 0) / words.length :
0;
const alternative = {
confidence,
transcript: evt.metadata?.transcript
};
const obj = {
language_code: language,
channel_tag: channel,
is_final,
alternatives: [alternative],
vendor: {
name: 'speechmatics',
evt: copy
}
};
return obj;
};
module.exports = (logger) => {
const normalizeTranscription = (evt, vendor, channel, language, shortUtterance, punctuation) => {
@@ -589,12 +477,8 @@ module.exports = (logger) => {
return normalizeCobalt(evt, channel, language);
case 'assemblyai':
return normalizeAssemblyAi(evt, channel, language, shortUtterance);
case 'voxist':
return normalizeVoxist(evt, channel, language);
case 'verbio':
return normalizeVerbio(evt, channel, language);
case 'speechmatics':
return normalizeSpeechmatics(evt, channel, language);
default:
if (vendor.startsWith('custom:')) {
return normalizeCustom(evt, channel, language, vendor);
@@ -610,9 +494,9 @@ module.exports = (logger) => {
if ('google' === vendor) {
const useV2 = rOpts.googleOptions?.serviceVersion === 'v2';
const version = useV2 ? 'v2' : 'v1';
let {model} = rOpts;
model = model || selectDefaultGoogleModel(task, language, version);
const model = task.name === TaskName.Gather ?
(useV2 ? 'telephony_short' : 'command_and_search') :
(useV2 ? 'long' : 'latest_long');
opts = {
...opts,
...(sttCredentials && {GOOGLE_APPLICATION_CREDENTIALS: JSON.stringify(sttCredentials.credentials)}),
@@ -668,29 +552,17 @@ module.exports = (logger) => {
};
}
else if (['aws', 'polly'].includes(vendor)) {
const {awsOptions = {}} = rOpts;
const vocabularyName = awsOptions.vocabularyName || rOpts.vocabularyName;
const vocabularyFilterName = awsOptions.vocabularyFilterName || rOpts.vocabularyFilterName;
const filterMethod = awsOptions.vocabularyFilterMethod || rOpts.filterMethod;
opts = {
...opts,
...(vocabularyName && {AWS_VOCABULARY_NAME: vocabularyName}),
...(vocabularyFilterName && {AWS_VOCABULARY_FILTER_NAME: vocabularyFilterName}),
...(filterMethod && {AWS_VOCABULARY_FILTER_METHOD: filterMethod}),
...(rOpts.vocabularyName && {AWS_VOCABULARY_NAME: rOpts.vocabularyName}),
...(rOpts.vocabularyFilterName && {AWS_VOCABULARY_FILTER_NAME: rOpts.vocabularyFilterName}),
...(rOpts.filterMethod && {AWS_VOCABULARY_FILTER_METHOD: rOpts.filterMethod}),
...(sttCredentials && {
AWS_ACCESS_KEY_ID: sttCredentials.accessKeyId,
AWS_SECRET_ACCESS_KEY: sttCredentials.secretAccessKey,
...(sttCredentials.accessKeyId && {AWS_ACCESS_KEY_ID: sttCredentials.accessKeyId}),
...(sttCredentials.secretAccessKey && {AWS_SECRET_ACCESS_KEY: sttCredentials.secretAccessKey}),
AWS_REGION: sttCredentials.region,
AWS_SECURITY_TOKEN: sttCredentials.securityToken
...(sttCredentials.sessionToken && {AWS_SESSION_TOKEN: sttCredentials.sessionToken}),
}),
...(awsOptions.accessKey && {AWS_ACCESS_KEY_ID: awsOptions.accessKey}),
...(awsOptions.secretKey && {AWS_SECRET_ACCESS_KEY: awsOptions.secretKey}),
...(awsOptions.region && {AWS_REGION: awsOptions.region}),
...(awsOptions.securityToken && {AWS_SECURITY_TOKEN: awsOptions.securityToken}),
...(awsOptions.languageModelName && {AWS_LANGUAGE_MODEL_NAME: awsOptions.languageModelName}),
...(awsOptions.piiEntityTypes?.length && {AWS_PII_ENTITY_TYPES: awsOptions.piiEntityTypes.join(',')}),
...(awsOptions.piiIdentifyEntities && {AWS_PII_IDENTIFY_ENTITIES: true}),
...(awsOptions.languageModelName && {AWS_LANGUAGE_MODEL_NAME: awsOptions.languageModelName}),
};
}
else if ('microsoft' === vendor) {
@@ -729,8 +601,6 @@ module.exports = (logger) => {
//azureSttEndpointId overrides sttCredentials.custom_stt_endpoint
...(rOpts.azureSttEndpointId &&
{AZURE_SERVICE_ENDPOINT_ID: rOpts.azureSttEndpointId}),
...(azureOptions.speechRecognitionMode &&
{AZURE_RECOGNITION_MODE: azureOptions.speechRecognitionMode}),
};
}
else if ('nuance' === vendor) {
@@ -837,11 +707,7 @@ module.exports = (logger) => {
...(deepgramOptions.vadTurnoff) &&
{DEEPGRAM_SPEECH_VAD_TURNOFF: deepgramOptions.vadTurnoff},
...(deepgramOptions.tag) &&
{DEEPGRAM_SPEECH_TAG: deepgramOptions.tag},
...(deepgramOptions.version) &&
{DEEPGRAM_SPEECH_MODEL_VERSION: deepgramOptions.version},
...(deepgramOptions.fillerWords) &&
{DEEPGRAM_SPEECH_FILLER_WORDS: deepgramOptions.fillerWords}
{DEEPGRAM_SPEECH_TAG: deepgramOptions.tag}
};
}
else if ('soniox' === vendor) {
@@ -940,8 +806,7 @@ module.exports = (logger) => {
...(cobaltOptions.enableConfusionNetwork && {COBALT_ENABLE_CONFUSION_NETWORK: 1}),
...(cobaltOptions.compiledContextData && {COBALT_COMPILED_CONTEXT_DATA: cobaltOptions.compiledContextData}),
};
}
else if ('assemblyai' === vendor) {
} else if ('assemblyai' === vendor) {
opts = {
...opts,
...(sttCredentials.api_key) &&
@@ -949,15 +814,7 @@ module.exports = (logger) => {
...(rOpts.hints?.length > 0 &&
{ASSEMBLYAI_WORD_BOOST: JSON.stringify(rOpts.hints)})
};
}
else if ('voxist' === vendor) {
opts = {
...opts,
...(sttCredentials.api_key) &&
{VOXIST_API_KEY: sttCredentials.api_key},
};
}
else if ('verbio' === vendor) {
} else if ('verbio' === vendor) {
const {verbioOptions = {}} = rOpts;
opts = {
...opts,
@@ -976,55 +833,8 @@ module.exports = (logger) => {
...(verbioOptions.speech_incomplete_timeout &&
{VERBIO_SPEECH_INCOMPLETE_TIMEOUT: verbioOptions.speech_incomplete_timeout}),
};
}
else if ('speechmatics' === vendor) {
const {speechmaticsOptions = {}} = rOpts;
opts = {
...opts,
...(sttCredentials.api_key) && {SPEECHMATICS_API_KEY: sttCredentials.api_key},
...(sttCredentials.speechmatics_stt_uri) && {SPEECHMATICS_HOST: sttCredentials.speechmatics_stt_uri},
...(rOpts.hints?.length > 0 && {SPEECHMATICS_SPEECH_HINTS: rOpts.hints.join(',')}),
...(speechmaticsOptions.translation_config &&
{
SPEECHMATICS_TRANSLATION_LANGUAGES: speechmaticsOptions.translation_config.target_languages.join(','),
SPEECHMATICS_TRANSLATION_PARTIALS: speechmaticsOptions.translation_config.enable_partials ? 1 : 0
}
),
...(speechmaticsOptions.transcription_config?.domain &&
{SPEECHMATICS_DOMAIN: speechmaticsOptions.transcription_config.domain}),
...{SPEECHMATICS_MAX_DELAY: speechmaticsOptions.transcription_config?.max_delay || 0.7},
...{SPEECHMATICS_MAX_DELAY_MODE: speechmaticsOptions.transcription_config?.max_delay_mode || 'flexible'},
...(speechmaticsOptions.transcription_config?.diarization &&
{SPEECHMATICS_DIARIZATION: speechmaticsOptions.transcription_config.diarization}),
...(speechmaticsOptions.transcription_config?.speaker_diarization_config?.speaker_sensitivity &&
{SPEECHMATICS_DIARIZATION_SPEAKER_SENSITIVITY:
speechmaticsOptions.transcription_config.speaker_diarization_config.speaker_sensitivity}),
...(speechmaticsOptions.transcription_config?.speaker_diarization_config?.max_speakers &&
{SPEECHMATICS_DIARIZATION_MAX_SPEAKERS:
speechmaticsOptions.transcription_config.speaker_diarization_config.max_speakers}),
...(speechmaticsOptions.transcription_config?.output_locale &&
{SPEECHMATICS_OUTPUT_LOCALE: speechmaticsOptions.transcription_config.output_locale}),
...(speechmaticsOptions.transcription_config?.punctuation_overrides?.permitted_marks &&
{SPEECHMATICS_PUNCTUATION_ALLOWED:
speechmaticsOptions.transcription_config.punctuation_overrides.permitted_marks.join(',')}),
...(speechmaticsOptions.transcription_config?.punctuation_overrides?.sensitivity &&
{SPEECHMATICS_PUNCTUATION_SENSITIVITY:
speechmaticsOptions.transcription_config?.punctuation_overrides?.sensitivity}),
...(speechmaticsOptions.transcription_config?.operating_point &&
{SPEECHMATICS_OPERATING_POINT: speechmaticsOptions.transcription_config.operating_point}),
...(speechmaticsOptions.transcription_config?.enable_entities &&
{SPEECHMATICS_ENABLE_ENTTIES: speechmaticsOptions.transcription_config.enable_entities}),
...(speechmaticsOptions.transcription_config?.audio_filtering_config?.volume_threshold &&
{SPEECHMATICS_VOLUME_THRESHOLD:
speechmaticsOptions.transcription_config.audio_filtering_config.volume_threshold}),
...(speechmaticsOptions.transcription_config?.transcript_filtering_config?.remove_disfluencies &&
{SPEECHMATICS_REMOVE_DISFLUENCIES:
speechmaticsOptions.transcription_config.transcript_filtering_config.remove_disfluencies})
};
}
else if (vendor.startsWith('custom:')) {
let {options = {}} = rOpts.customOptions || {};
const {sampleRate} = rOpts.customOptions || {};
} else if (vendor.startsWith('custom:')) {
let {options = {}} = rOpts;
const {auth_token, custom_stt_url} = sttCredentials;
options = {
...options,
@@ -1032,15 +842,14 @@ module.exports = (logger) => {
{hints: rOpts.hints}),
...(rOpts.hints?.length > 0 && typeof rOpts.hints[0] === 'object' &&
{hints: JSON.stringify(rOpts.hints)}),
...(typeof rOpts.hintsBoost === 'number' && {hintsBoost: rOpts.hintsBoost}),
...(task.cs?.callSid && {callSid: task.cs.callSid})
...(typeof rOpts.hintsBoost === 'number' && {hintsBoost: rOpts.hintsBoost})
};
opts = {
...opts,
...(auth_token && {JAMBONZ_STT_API_KEY: auth_token}),
JAMBONZ_STT_URL: custom_stt_url,
...(Object.keys(options).length > 0 && {JAMBONZ_STT_OPTIONS: JSON.stringify(options)}),
...(sampleRate && {JAMBONZ_STT_SAMPLING: sampleRate})
};
}
@@ -1090,6 +899,6 @@ module.exports = (logger) => {
setChannelVarsForStt,
setSpeechCredentialsAtRuntime,
compileSonioxTranscripts,
consolidateTranscripts,
consolidateTranscripts
};
};

View File

@@ -1,327 +0,0 @@
const Emitter = require('events');
const assert = require('assert');
const {
TtsStreamingEvents,
TtsStreamingConnectionStatus
} = require('../utils/constants');
const MAX_CHUNK_SIZE = 1800;
const HIGH_WATER_BUFFER_SIZE = 1000;
const LOW_WATER_BUFFER_SIZE = 200;
const TIMEOUT_RETRY_MSECS = 3000;
class TtsStreamingBuffer extends Emitter {
constructor(cs) {
super();
this.cs = cs;
this.logger = cs.logger;
this.tokens = '';
this.eventHandlers = [];
this._isFull = false;
this._connectionStatus = TtsStreamingConnectionStatus.NotConnected;
this._flushPending = false;
this.timer = null;
}
get isEmpty() {
return this.tokens.length === 0;
}
get isFull() {
return this._isFull;
}
get size() {
return this.tokens.length;
}
get ep() {
return this.cs?.ep;
}
async start() {
assert.ok(
this._connectionStatus === TtsStreamingConnectionStatus.NotConnected,
'TtsStreamingBuffer:start already started, or has failed');
this.vendor = this.cs.getTsStreamingVendor();
if (!this.vendor) {
this.logger.info('TtsStreamingBuffer:start No TTS streaming vendor configured');
throw new Error('No TTS streaming vendor configured');
}
this.logger.info(`TtsStreamingBuffer:start Connecting to TTS streaming with vendor ${this.vendor}`);
this._connectionStatus = TtsStreamingConnectionStatus.Connecting;
try {
if (this.eventHandlers.length === 0) this._initHandlers(this.ep);
await this._api(this.ep, [this.ep.uuid, 'connect']);
} catch (err) {
this.logger.info({err}, 'TtsStreamingBuffer:start Error connecting to TTS streaming');
this._connectionStatus = TtsStreamingConnectionStatus.Failed;
}
}
stop() {
clearTimeout(this.timer);
this.removeCustomEventListeners();
if (this.ep) {
this._api(this.ep, [this.ep.uuid, 'close'])
.catch((err) => this.logger.info({err}, 'TtsStreamingBuffer:kill Error closing TTS streaming'));
}
this.timer = null;
this.tokens = '';
this._connectionStatus = TtsStreamingConnectionStatus.NotConnected;
}
/**
* Add tokens to the buffer and start feeding them to the endpoint if necessary.
*/
async bufferTokens(tokens) {
if (this._connectionStatus === TtsStreamingConnectionStatus.Failed) {
this.logger.info('TtsStreamingBuffer:bufferTokens TTS streaming connection failed, rejecting request');
return {status: 'failed', reason: `connection to ${this.vendor} failed`};
}
const displayedTokens = tokens.length <= 40 ? tokens : tokens.substring(0, 40);
const totalLength = tokens.length;
/* if we crossed the high water mark, reject the request */
if (this.tokens.length + totalLength > HIGH_WATER_BUFFER_SIZE) {
this.logger.info(
`TtsStreamingBuffer throttling: buffer is full, rejecting request to buffer ${totalLength} tokens`);
if (!this._isFull) {
this._isFull = true;
this.emit(TtsStreamingEvents.Pause);
}
return {status: 'failed', reason: 'full'};
}
this.logger.debug(
`TtsStreamingBuffer:bufferTokens "${displayedTokens}" (length: ${totalLength}), starting? ${this.isEmpty}`
);
this.tokens += (tokens || '');
await this._feedTokens();
return {status: 'ok'};
}
flush() {
this.logger.debug('TtsStreamingBuffer:flush');
if (this._connectionStatus === TtsStreamingConnectionStatus.Connecting) {
this.logger.debug('TtsStreamingBuffer:flush TTS stream is not quite ready - wait for connect');
this._flushPending = true;
return;
}
else if (this._connectionStatus === TtsStreamingConnectionStatus.Connected) {
if (this.size === 0) {
this._doFlush();
}
else {
/* we have tokens queued, so flush after they have been sent */
this._pendingFlush = true;
}
}
}
clear() {
this.logger.debug('TtsStreamingBuffer:clear');
if (this._connectionStatus !== TtsStreamingConnectionStatus.Connected) return;
clearTimeout(this.timer);
this._api(this.ep, [this.ep.uuid, 'clear'])
.catch((err) => this.logger.info({err}, 'TtsStreamingBuffer:clear Error clearing TTS streaming'));
this.tokens = '';
this.timer = null;
this._isFull = false;
}
/**
* Send tokens to the TTS engine in sentence chunks for best playout
*/
async _feedTokens(handlingTimeout = false) {
this.logger.debug({tokens: this.tokens}, '_feedTokens');
try {
/* are we in a state where we can feed tokens to the TTS? */
if (!this.cs.isTtsStreamOpen || !this.ep || !this.tokens) {
this.logger.debug('TTS stream is not open or no tokens to send');
return this.tokens?.length || 0;
}
if (this._connectionStatus === TtsStreamingConnectionStatus.NotConnected ||
this._connectionStatus === TtsStreamingConnectionStatus.Failed) {
this.logger.debug('TtsStreamingBuffer:_feedTokens TTS stream is not connected');
return;
}
if (this._connectionStatus === TtsStreamingConnectionStatus.Connecting) {
this.logger.debug('TtsStreamingBuffer:_feedTokens TTS stream is not ready, waiting for connect');
return;
}
/* must send at least one sentence */
const limit = Math.min(MAX_CHUNK_SIZE, this.tokens.length);
let chunkEnd = findSentenceBoundary(this.tokens, limit);
if (chunkEnd <= 0) {
if (handlingTimeout) {
/* on a timeout we've left some tokens sitting around, so be more aggressive now in sending them */
chunkEnd = findWordBoundary(this.tokens, limit);
if (chunkEnd <= 0) {
this.logger.debug('TtsStreamingBuffer:_feedTokens: no word boundary found');
this._setTimerIfNeeded();
return;
}
}
else {
/* if we just received tokens, we wont send unless we have at least a full sentence */
this.logger.debug('TtsStreamingBuffer:_feedTokens: no sentence boundary found');
this._setTimerIfNeeded();
return;
}
}
const chunk = this.tokens.slice(0, chunkEnd);
this.tokens = this.tokens.slice(chunkEnd);
/* freeswitch looks for sequence of 2 newlines to determine end of message, so insert a space */
const modifiedChunk = chunk.replace(/\n\n/g, '\n \n');
await this._api(this.ep, [this.ep.uuid, 'send', modifiedChunk]);
this.logger.debug(`TtsStreamingBuffer:_feedTokens: sent ${chunk.length}, remaining: ${this.tokens.length}`);
if (this._pendingFlush) {
this._doFlush();
this._pendingFlush = false;
}
if (this.isFull && this.tokens.length <= LOW_WATER_BUFFER_SIZE) {
this.logger.info('TtsStreamingBuffer throttling: TTS streaming buffer is no longer full - resuming');
this._isFull = false;
this.emit(TtsStreamingEvents.Resume);
}
} catch (err) {
this.logger.info({err}, 'TtsStreamingBuffer:_feedTokens Error sending TTS chunk');
this.tokens = '';
}
return;
}
async _api(ep, args) {
const apiCmd = `uuid_${this.vendor.startsWith('custom:') ? 'custom' : this.vendor}_tts_streaming`;
const res = await ep.api(apiCmd, `^^|${args.join('|')}`);
if (!res.body?.startsWith('+OK')) {
this.logger.info({args}, `Error calling ${apiCmd}: ${res.body}`);
throw new Error(`Error calling ${apiCmd}: ${res.body}`);
}
}
_onConnectFailure(vendor) {
this.logger.info(`streaming tts connection failed to ${vendor}`);
this._connectionStatus = TtsStreamingConnectionStatus.Failed;
this.tokens = '';
this.emit(TtsStreamingEvents.ConnectFailure, {vendor});
}
_doFlush() {
this._api(this.ep, [this.ep.uuid, 'flush'])
.catch((err) => this.logger.info({err},
`TtsStreamingBuffer:_doFlush Error flushing TTS streaming: ${JSON.stringify(err)}`));
}
async _onConnect(vendor) {
this.logger.info(`streaming tts connection made to ${vendor}`);
this._connectionStatus = TtsStreamingConnectionStatus.Connected;
if (this.tokens.length > 0) {
await this._feedTokens();
}
if (this._flushPending) {
this.flush();
this._flushPending = false;
}
}
_setTimerIfNeeded() {
if (this.tokens.length > 0 && !this.timer) {
this.timer = setTimeout(this._onTimeout.bind(this), TIMEOUT_RETRY_MSECS);
}
}
_onTimeout() {
this.logger.info('TtsStreamingBuffer:_onTimeout');
this.timer = null;
this._feedTokens(true);
}
_onTtsEmpty(vendor) {
this.emit(TtsStreamingEvents.Empty, {vendor});
}
addCustomEventListener(ep, event, handler) {
this.eventHandlers.push({ep, event, handler});
ep.addCustomEventListener(event, handler);
}
removeCustomEventListeners() {
this.eventHandlers.forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler));
}
_initHandlers(ep) {
[
// DH: add other vendors here as modules are added
'deepgram',
'cartesia',
'elevenlabs',
'rimelabs',
'custom'
].forEach((vendor) => {
const eventClassName = `${vendor.charAt(0).toUpperCase() + vendor.slice(1)}TtsStreamingEvents`;
const eventClass = require('../utils/constants')[eventClassName];
if (!eventClass) throw new Error(`Event class for vendor ${vendor} not found`);
this.addCustomEventListener(ep, eventClass.Connect, this._onConnect.bind(this, vendor));
this.addCustomEventListener(ep, eventClass.ConnectFailure, this._onConnectFailure.bind(this, vendor));
this.addCustomEventListener(ep, eventClass.Empty, this._onTtsEmpty.bind(this, vendor));
});
}
}
const findSentenceBoundary = (text, limit) => {
// Match traditional sentence boundaries or double newlines
const sentenceEndRegex = /[.!?](?=\s|$)|\n\n/g;
let lastSentenceBoundary = -1;
let match;
while ((match = sentenceEndRegex.exec(text)) && match.index < limit) {
const precedingText = text.slice(0, match.index).trim(); // Extract text before the match and trim whitespace
if (precedingText.length > 0) { // Check if there's actual content
if (
match[0] === '\n\n' || // It's a double newline
(match.index === 0 || !/\d$/.test(text[match.index - 1])) // Standard punctuation rules
) {
lastSentenceBoundary = match.index + (match[0] === '\n\n' ? 2 : 1); // Include the boundary
}
}
}
return lastSentenceBoundary;
};
const findWordBoundary = (text, limit) => {
const wordBoundaryRegex = /\s+/g;
let lastWordBoundary = -1;
let match;
while ((match = wordBoundaryRegex.exec(text)) && match.index < limit) {
lastWordBoundary = match.index;
}
return lastWordBoundary;
};
module.exports = TtsStreamingBuffer;

View File

@@ -1,7 +1,7 @@
const assert = require('assert');
const BaseRequestor = require('./base-requestor');
const short = require('short-uuid');
const {HookMsgTypes, WS_CLOSE_CODES} = require('./constants.json');
const {HookMsgTypes} = require('./constants.json');
const Websocket = require('ws');
const snakeCaseKeys = require('./snakecase-keys');
const {
@@ -12,20 +12,6 @@ const {
JAMBONES_WS_MAX_PAYLOAD,
HTTP_USER_AGENT_HEADER
} = require('../config');
const MTYPE_WANTS_ACK = [
'call:status',
'verb:status',
'jambonz:error',
'llm:event',
'llm:tool-call',
'tts:streaming-event',
'tts:tokens-result',
];
const MTYPE_NO_DATA = [
'llm:tool-output',
'tts:flush',
'tts:clear'
];
class WsRequestor extends BaseRequestor {
constructor(logger, account_sid, hook, secret) {
@@ -58,7 +44,7 @@ class WsRequestor extends BaseRequestor {
async request(type, hook, params, httpHeaders = {}) {
assert(HookMsgTypes.includes(type));
const url = hook.url || hook;
const wantsAck = !MTYPE_WANTS_ACK.includes(type);
const wantsAck = !['call:status', 'verb:status', 'jambonz:error'].includes(type);
if (this.maliciousClient) {
this.logger.info({url: this.url}, 'WsRequestor:request - discarding msg to malicious client');
@@ -146,7 +132,7 @@ class WsRequestor extends BaseRequestor {
type,
msgid,
call_sid: this.call_sid,
hook: ['verb:hook', 'session:redirect', 'llm:event', 'llm:tool-call'].includes(type) ? url : undefined,
hook: ['verb:hook', 'session:redirect'].includes(type) ? url : undefined,
data: {...payload},
...b3
};
@@ -261,13 +247,13 @@ class WsRequestor extends BaseRequestor {
}
}
close(code = WS_CLOSE_CODES.NormalClosure) {
close() {
this.closedGracefully = true;
this.logger.debug(`WsRequestor:close closing socket with code ${code}`);
this.logger.debug('WsRequestor:close closing socket');
this._stopPingTimer();
try {
if (this.ws) {
this.ws.close(code);
this.ws.close(1000);
this.ws.removeAllListeners();
this.ws = null;
}
@@ -406,9 +392,8 @@ class WsRequestor extends BaseRequestor {
/* messages must be JSON format */
try {
const obj = JSON.parse(content);
this.logger.debug({obj}, 'WsRequestor:_onMessage - received message');
//const {type, msgid, command, call_sid = this.call_sid, queueCommand = false, data} = obj;
const {type, msgid, command, queueCommand = false, tool_call_id, data} = obj;
const {type, msgid, command, queueCommand = false, data} = obj;
const call_sid = obj.callSid || this.call_sid;
//this.logger.debug({obj}, 'WsRequestor:request websocket: received');
@@ -422,8 +407,8 @@ class WsRequestor extends BaseRequestor {
case 'command':
assert.ok(command, 'command property not supplied');
assert.ok(data || MTYPE_NO_DATA.includes(command), 'data property not supplied');
this._recvCommand(msgid, command, call_sid, queueCommand, tool_call_id, data);
assert.ok(data, 'data property not supplied');
this._recvCommand(msgid, command, call_sid, queueCommand, data);
break;
default:
@@ -447,10 +432,10 @@ class WsRequestor extends BaseRequestor {
success && success(data);
}
_recvCommand(msgid, command, call_sid, queueCommand, tool_call_id, data) {
_recvCommand(msgid, command, call_sid, queueCommand, data) {
// TODO: validate command
this.logger.debug({msgid, command, call_sid, queueCommand, data}, 'received command');
this.emit('command', {msgid, command, call_sid, queueCommand, tool_call_id, data});
this.emit('command', {msgid, command, call_sid, queueCommand, data});
}
}

13181
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
{
"name": "jambonz-feature-server",
"version": "0.9.3",
"version": "0.9.0",
"main": "app.js",
"engines": {
"node": ">= 18.x"
@@ -31,10 +31,10 @@
"@jambonz/http-health-check": "^0.0.1",
"@jambonz/mw-registrar": "^0.2.7",
"@jambonz/realtimedb-helpers": "^0.8.8",
"@jambonz/speech-utils": "^0.2.2",
"@jambonz/speech-utils": "^0.1.11",
"@jambonz/stats-collector": "^0.1.10",
"@jambonz/verb-specifications": "^0.0.95",
"@jambonz/time-series": "^0.2.13",
"@jambonz/time-series": "^0.2.8",
"@jambonz/verb-specifications": "^0.0.74",
"@opentelemetry/api": "^1.8.0",
"@opentelemetry/exporter-jaeger": "^1.23.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.50.0",
@@ -47,23 +47,24 @@
"bent": "^7.3.12",
"debug": "^4.3.4",
"deepcopy": "^2.1.0",
"drachtio-fsmrf": "^4.0.1",
"drachtio-srf": "^5.0.1",
"drachtio-fsmrf": "^3.0.43",
"drachtio-srf": "^4.5.35",
"express": "^4.19.2",
"express-validator": "^7.0.1",
"ip": "^2.0.1",
"moment": "^2.30.1",
"parse-url": "^9.2.0",
"pino": "^8.20.0",
"polly-ssml-split": "^0.1.0",
"proxyquire": "^2.1.3",
"sdp-transform": "^2.15.0",
"sdp-transform": "^2.14.2",
"short-uuid": "^5.1.0",
"sinon": "^17.0.1",
"to-snake-case": "^1.0.0",
"undici": "^6.20.0",
"undici": "^6.19.2",
"uuid-random": "^1.3.2",
"verify-aws-sns-signature": "^0.1.0",
"ws": "^8.18.0",
"ws": "^8.17.1",
"xml2js": "^0.6.2"
},
"devDependencies": {

View File

@@ -222,62 +222,3 @@ test('test create-call app_json', async(t) => {
t.error(err);
}
});
test('test create-call timeLimit', async(t) => {
clearModule.all();
const {srf, disconnect} = require('../app');
try {
await connect(srf);
// GIVEN
let from = 'create-call-app-json';
let account_sid = 'bb845d4b-83a9-4cde-a6e9-50f3743bab3f';
// Give UAS app time to come up
const p = sippUac('uas.xml', '172.38.0.10', from);
await waitFor(1000);
const startTime = Date.now();
const app_json = `[
{
"verb": "pause",
"length": 7
}
]`;
const post = bent('http://127.0.0.1:3000/', 'POST', 'json', 201);
post('v1/createCall', {
'account_sid':account_sid,
"call_hook": {
"url": "http://127.0.0.1:3100/",
"method": "POST",
"username": "username",
"password": "password"
},
app_json,
"from": from,
"to": {
"type": "phone",
"number": "15583084809"
},
"timeLimit": 1,
"speech_recognizer_vendor": "google",
"speech_recognizer_language": "en"
});
//THEN
await p;
const endTime = Date.now();
t.ok(endTime - startTime < 2000, 'create-call: timeLimit is respected');
disconnect();
} catch (err) {
console.log(`error received: ${err}`);
disconnect();
t.error(err);
}
});

View File

@@ -1,5 +1,4 @@
/* SQLEditor (MySQL (2))*/
SET FOREIGN_KEY_CHECKS=0;
DROP TABLE IF EXISTS account_static_ips;
@@ -54,8 +53,6 @@ DROP TABLE IF EXISTS signup_history;
DROP TABLE IF EXISTS smpp_addresses;
DROP TABLE IF EXISTS google_custom_voices;
DROP TABLE IF EXISTS speech_credentials;
DROP TABLE IF EXISTS system_information;
@@ -139,9 +136,6 @@ account_sid CHAR(36) NOT NULL,
is_active BOOLEAN NOT NULL DEFAULT 1,
username VARCHAR(64),
password VARCHAR(1024),
allow_direct_app_calling BOOLEAN NOT NULL DEFAULT 1,
allow_direct_queue_calling BOOLEAN NOT NULL DEFAULT 1,
allow_direct_user_calling BOOLEAN NOT NULL DEFAULT 1,
PRIMARY KEY (client_sid)
);
@@ -344,25 +338,11 @@ label VARCHAR(64),
PRIMARY KEY (speech_credential_sid)
);
CREATE TABLE google_custom_voices
(
google_custom_voice_sid CHAR(36) NOT NULL UNIQUE ,
speech_credential_sid CHAR(36) NOT NULL,
model VARCHAR(512) NOT NULL,
reported_usage ENUM('REPORTED_USAGE_UNSPECIFIED','REALTIME','OFFLINE') DEFAULT 'REALTIME',
name VARCHAR(64) NOT NULL,
voice_cloning_key MEDIUMTEXT,
use_voice_cloning_key BOOLEAN DEFAULT false,
PRIMARY KEY (google_custom_voice_sid)
);
CREATE TABLE system_information
(
domain_name VARCHAR(255),
sip_domain_name VARCHAR(255),
monitoring_domain_name VARCHAR(255),
private_network_cidr VARCHAR(8192),
log_level ENUM('info', 'debug') NOT NULL DEFAULT 'info'
monitoring_domain_name VARCHAR(255)
);
CREATE TABLE users
@@ -457,14 +437,11 @@ CREATE TABLE sip_gateways
sip_gateway_sid CHAR(36),
ipv4 VARCHAR(128) NOT NULL COMMENT 'ip address or DNS name of the gateway. For gateways providing inbound calling service, ip address is required.',
netmask INTEGER NOT NULL DEFAULT 32,
port INTEGER COMMENT 'sip signaling port',
port INTEGER NOT NULL DEFAULT 5060 COMMENT 'sip signaling port',
inbound BOOLEAN NOT NULL COMMENT 'if true, whitelist this IP to allow inbound calls from the gateway',
outbound BOOLEAN NOT NULL COMMENT 'if true, include in least-cost routing when placing calls to the PSTN',
voip_carrier_sid CHAR(36) NOT NULL,
is_active BOOLEAN NOT NULL DEFAULT 1,
send_options_ping BOOLEAN NOT NULL DEFAULT 0,
use_sips_scheme BOOLEAN NOT NULL DEFAULT 0,
pad_crypto BOOLEAN NOT NULL DEFAULT 0,
protocol ENUM('udp','tcp','tls', 'tls/srtp') DEFAULT 'udp' COMMENT 'Outbound call protocol',
PRIMARY KEY (sip_gateway_sid)
) COMMENT='A whitelisted sip gateway used for origination/termination';
@@ -501,19 +478,11 @@ messaging_hook_sid CHAR(36) COMMENT 'webhook to call for inbound SMS/MMS ',
app_json TEXT,
speech_synthesis_vendor VARCHAR(64) NOT NULL DEFAULT 'google',
speech_synthesis_language VARCHAR(12) NOT NULL DEFAULT 'en-US',
speech_synthesis_voice VARCHAR(256),
speech_synthesis_voice VARCHAR(64),
speech_synthesis_label VARCHAR(64),
speech_recognizer_vendor VARCHAR(64) NOT NULL DEFAULT 'google',
speech_recognizer_language VARCHAR(64) NOT NULL DEFAULT 'en-US',
speech_recognizer_label VARCHAR(64),
use_for_fallback_speech BOOLEAN DEFAULT false,
fallback_speech_synthesis_vendor VARCHAR(64),
fallback_speech_synthesis_language VARCHAR(12),
fallback_speech_synthesis_voice VARCHAR(256),
fallback_speech_synthesis_label VARCHAR(64),
fallback_speech_recognizer_vendor VARCHAR(64),
fallback_speech_recognizer_language VARCHAR(64),
fallback_speech_recognizer_label VARCHAR(64),
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
record_all_calls BOOLEAN NOT NULL DEFAULT false,
PRIMARY KEY (application_sid)
@@ -556,7 +525,6 @@ siprec_hook_sid CHAR(36),
record_all_calls BOOLEAN NOT NULL DEFAULT false,
record_format VARCHAR(16) NOT NULL DEFAULT 'mp3',
bucket_credential VARCHAR(8192) COMMENT 'credential used to authenticate with storage service',
enable_debug_log BOOLEAN NOT NULL DEFAULT false,
PRIMARY KEY (account_sid)
) COMMENT='An enterprise that uses the platform for comm services';
@@ -651,10 +619,6 @@ ALTER TABLE speech_credentials ADD FOREIGN KEY service_provider_sid_idxfk_5 (ser
CREATE INDEX account_sid_idx ON speech_credentials (account_sid);
ALTER TABLE speech_credentials ADD FOREIGN KEY account_sid_idxfk_8 (account_sid) REFERENCES accounts (account_sid);
CREATE INDEX google_custom_voice_sid_idx ON google_custom_voices (google_custom_voice_sid);
CREATE INDEX speech_credential_sid_idx ON google_custom_voices (speech_credential_sid);
ALTER TABLE google_custom_voices ADD FOREIGN KEY speech_credential_sid_idxfk (speech_credential_sid) REFERENCES speech_credentials (speech_credential_sid) ON DELETE CASCADE;
CREATE INDEX user_sid_idx ON users (user_sid);
CREATE INDEX email_idx ON users (email);
CREATE INDEX phone_idx ON users (phone);
@@ -740,5 +704,4 @@ ALTER TABLE accounts ADD FOREIGN KEY queue_event_hook_sid_idxfk (queue_event_hoo
ALTER TABLE accounts ADD FOREIGN KEY device_calling_application_sid_idxfk (device_calling_application_sid) REFERENCES applications (application_sid);
ALTER TABLE accounts ADD FOREIGN KEY siprec_hook_sid_idxfk (siprec_hook_sid) REFERENCES applications (application_sid);
SET FOREIGN_KEY_CHECKS=1;
SET FOREIGN_KEY_CHECKS=1;

View File

@@ -42,7 +42,7 @@ services:
ipv4_address: 172.38.0.7
drachtio:
image: drachtio/drachtio-server:0.8.26
image: drachtio/drachtio-server:0.8.25-rc8
restart: always
command: drachtio --contact "sip:*;transport=udp" --mtu 4096 --address 0.0.0.0 --port 9022
ports:
@@ -57,7 +57,7 @@ services:
condition: service_healthy
freeswitch:
image: drachtio/drachtio-freeswitch-mrf:0.9.2-4
image: drachtio/drachtio-freeswitch-mrf:0.7.3
restart: always
command: freeswitch --rtp-range-start 20000 --rtp-range-end 20100
environment:

View File

@@ -25,38 +25,29 @@ module.exports = (serviceName) => {
}),
});
const exporters = [];
let exporter;
if (OTEL_EXPORTER_JAEGER_AGENT_HOST || OTEL_EXPORTER_JAEGER_ENDPOINT) {
exporters.push(new JaegerExporter());
exporter = new JaegerExporter();
}
if (OTEL_EXPORTER_ZIPKIN_URL) {
exporters.push(new ZipkinExporter({url:OTEL_EXPORTER_ZIPKIN_URL}));
else if (OTEL_EXPORTER_ZIPKIN_URL) {
exporter = new ZipkinExporter({url:OTEL_EXPORTER_ZIPKIN_URL});
}
if (OTEL_EXPORTER_ZIPKIN_URL) {
exporters.push(new ZipkinExporter({url:OTEL_EXPORTER_ZIPKIN_URL}));
}
if (OTEL_EXPORTER_COLLECTOR_URL) {
exporters.push(new OTLPTraceExporter({
else {
exporter = new OTLPTraceExporter({
url: OTEL_EXPORTER_COLLECTOR_URL
}));
});
}
exporters.forEach((element) => {
provider.addSpanProcessor(new BatchSpanProcessor(element, {
// The maximum queue size. After the size is reached spans are dropped.
maxQueueSize: 100,
// The maximum batch size of every export. It must be smaller or equal to maxQueueSize.
maxExportBatchSize: 10,
// The interval between two consecutive exports
scheduledDelayMillis: 500,
// How long the export can run before it is cancelled
exportTimeoutMillis: 30000,
}));
});
provider.addSpanProcessor(new BatchSpanProcessor(exporter, {
// The maximum queue size. After the size is reached spans are dropped.
maxQueueSize: 100,
// The maximum batch size of every export. It must be smaller or equal to maxQueueSize.
maxExportBatchSize: 10,
// The interval between two consecutive exports
scheduledDelayMillis: 500,
// How long the export can run before it is cancelled
exportTimeoutMillis: 30000,
}));
// Initialize the OpenTelemetry APIs to use the NodeTracerProvider bindings
provider.register();