Compare commits

..

27 Commits

Author SHA1 Message Date
Dave Horton
f670626cf7 google gemini set default model to models/gemini-2.0-flash-live-001 2025-05-22 08:12:58 -04:00
Hoan Luu Huu
b92a9c700e fixed google s2s mcp initiate wrong functionDeclarations (#1203)
* fixed google s2s mcp initiate wrong functionDeclarations

* populate model from llm verb to google setup message
2025-05-18 20:15:26 -04:00
Anton Voylenko
761b7f26e7 fix: member mute on conference (#1048) 2025-05-18 14:11:56 -04:00
Dave Horton
76df58bfc2 fix logging in start task msg (#1202)
* fix logging in start task msg

* generate uuids using native crypto lib
2025-05-16 16:54:25 -04:00
Dave Horton
c1cb57c3f6 update version 2025-05-14 15:38:15 -04:00
Dave Horton
610c9af274 update db-helpers 2025-05-13 10:32:30 -04:00
Dave Horton
c0a35af591 update to 0.2.10 speech-utils (#1199) 2025-05-13 10:11:26 -04:00
Hoan Luu Huu
9585018147 support whisper instructions (#1198)
* support whisper instructions

* wip

* update speech utils and verb specification
2025-05-13 09:44:00 -04:00
Hoan Luu Huu
d7884a837a update deepgram voice agent (#1191)
* update deepgram voice agent

* fix lint

* wip

* wip
2025-05-13 07:43:48 -04:00
Dave Horton
ca0bf36815 dont apply snake casing to either env vars or tool call args (#1194) (#1197) 2025-05-12 12:56:58 -04:00
Sam Machin
6b68d32e2c end if last_word_end is -1 (#1196)
* end if last_word_end is -1

* lint
2025-05-12 12:11:32 -04:00
rammohan-y
8217a76697 Removed this.name from Task constructor, as LLM's names are populated post calling the base construction (#1192)
Also fixed a jslint error
2025-05-12 09:14:33 -04:00
rammohan-y
5c8237b382 Feat 1179 race issue with play verb (#1183)
* Fixed race issue between queueCommand false and queueCommand true when play task is involved

https://github.com/jambonz/jambonz-feature-server/issues/1179

* removed unnecessary emitter

* added destroy mechanism for stickyEventEmitter

* clearing stickyEventEmitter

* memory leak fix
2025-05-11 20:25:48 -04:00
Vasudev Anubrolu
4ff5c845de feat/864 update speech utils for playht on prem (#1187)
* feat/864 update speech utils for playht on prem

* feat/864 update speech utils version package lock
2025-05-09 12:34:14 -04:00
Anton Voylenko
78ebd08490 feat: prioritize JAMBONES_LOGLEVEL over db setting (#1188) 2025-05-09 09:41:23 -04:00
Hoan Luu Huu
8b18532f31 fixed tts streaming buffer cannot reset timeout when lastUpdateTime is short (#1184)
* fixed tts streaming buffer cannot reset timeout when lastUpdateTime is short

* wip
2025-05-07 10:26:11 -04:00
rammohan-y
e4bb00b382 Send stop-playback event (#1186)
* Send stop-playback event

https://github.com/jambonz/jambonz-feature-server/issues/1185

* check if not notified in playback-stop, ensure that the stop-playback is sent when kill-playback is sent
2025-05-07 08:59:59 -04:00
Hoan Luu Huu
14295dcebc support google s2s (#1169)
* support google s2s

* wip
2025-05-07 07:20:33 -04:00
Hoan Luu Huu
4d68c179ea sip_decline release callSession if ws requestor is used (#1182) 2025-05-06 10:01:36 -04:00
Hoan Luu Huu
6205959f53 fix microsoft stt max client buffer size error for transcribe verb (#1173) 2025-04-29 09:41:24 -04:00
Hoan Luu Huu
ed92cb2632 update speech utils 0.2.7 (#1177)
* update speech utils 0.2.7

* wip
2025-04-29 08:26:09 -04:00
Sam Machin
3098e04ed6 send env_vars in callHook (#1175)
* send env_vars in callHook

* lint

* add try/catch
2025-04-28 09:51:37 -04:00
Hoan Luu Huu
7e2fe72b6c fix say verb cannot failover if tts_response-code != 2xx (#1174) 2025-04-28 08:46:08 -04:00
Hoan Luu Huu
c2666b7a09 fixed deepgram gather cannot be timeout on empty transcription with continueAsr (#1171) 2025-04-28 08:36:31 -04:00
Hoan Luu Huu
9d54ca8116 Jambonz support Model context protocol (MCP) (#1150)
* Jambonz support Model context protocol (MCP)

* merged mcp tools with existing llmOptions.tools

* support list of mcp servers

* wip

* wip

* wip

* fix voice agent

* fix open-ai

* fix review comment

* fix deepgram voice agent

* update verb specification version
2025-04-24 06:50:53 -04:00
Sam Machin
472f4f4532 clientTools over webhooks (#1167)
* clientTools over webhooks

* lint

* simpler toolHook response
2025-04-23 09:15:16 -04:00
Hoan Luu Huu
63899d0091 update speech utils version 0.2.6 (#1172) 2025-04-23 08:22:47 -04:00
30 changed files with 1523 additions and 196 deletions

14
app.js
View File

@@ -30,6 +30,13 @@ const installSrfLocals = require('./lib/utils/install-srf-locals');
const createHttpListener = require('./lib/utils/http-listener');
const healthCheck = require('@jambonz/http-health-check');
logger.on('level-change', (lvl, _val, prevLvl, _prevVal, instance) => {
if (logger !== instance) {
return;
}
logger.info('system log level %s was changed to %s', prevLvl, lvl);
});
// Install the srf locals
installSrfLocals(srf, logger, {
onFreeswitchConnect: (wraper) => {
@@ -133,7 +140,12 @@ const monInterval = setInterval(async() => {
try {
const systemInformation = await srf.locals.dbHelpers.lookupSystemInformation();
if (systemInformation && systemInformation.log_level) {
logger.level = systemInformation.log_level;
const envLogLevel = logger.levels.values[JAMBONES_LOGLEVEL.toLowerCase()];
const dbLogLevel = logger.levels.values[systemInformation.log_level];
const appliedLogLevel = Math.min(envLogLevel, dbLogLevel);
if (logger.levelVal !== appliedLogLevel) {
logger.level = logger.levels.labels[Math.min(envLogLevel, dbLogLevel)];
}
}
} catch (err) {
if (process.env.NODE_ENV === 'test') {

View File

@@ -3,7 +3,7 @@ const makeTask = require('../../tasks/make_task');
const RestCallSession = require('../../session/rest-call-session');
const CallInfo = require('../../session/call-info');
const {CallDirection, CallStatus} = require('../../utils/constants');
const uuidv4 = require('uuid-random');
const crypto = require('crypto');
const SipError = require('drachtio-srf').SipError;
const { validationResult, body } = require('express-validator');
const { validate } = require('@jambonz/verb-specifications');
@@ -80,7 +80,7 @@ router.post('/',
const {lookupTeamsByAccount, lookupAccountBySid} = srf.locals.dbHelpers;
const account = await lookupAccountBySid(req.body.account_sid);
const accountInfo = await lookupAccountDetails(req.body.account_sid);
const callSid = uuidv4();
const callSid = crypto.randomUUID();
const application = req.body.application_sid ? await lookupAppBySid(req.body.application_sid) : null;
const record_all_calls = account.record_all_calls || (application && application.record_all_calls);
const recordOutputFormat = account.record_format || 'mp3';

View File

@@ -1,4 +1,4 @@
const uuidv4 = require('uuid-random');
const crypto = require('crypto');
const {CallDirection, AllowedSipRecVerbs, WS_CLOSE_CODES} = require('./utils/constants');
const {parseSiprecPayload} = require('./utils/siprec-utils');
const CallInfo = require('./session/call-info');
@@ -15,6 +15,7 @@ const {
JAMBONES_DISABLE_DIRECT_P2P_CALL
} = require('./config');
const { createJambonzApp } = require('./dynamic-apps');
const { decrypt } = require('./utils/encrypt-decrypt');
module.exports = function(srf, logger) {
const {
@@ -45,7 +46,7 @@ module.exports = function(srf, logger) {
logger.info('getAccountDetails - rejecting call due to missing X-Account-Sid header');
return res.send(500);
}
const callSid = req.has('X-Retain-Call-Sid') ? req.get('X-Retain-Call-Sid') : uuidv4();
const callSid = req.has('X-Retain-Call-Sid') ? req.get('X-Retain-Call-Sid') : crypto.randomUUID();
const account_sid = req.get('X-Account-Sid');
req.locals = {callSid, account_sid, callId};
@@ -348,11 +349,10 @@ module.exports = function(srf, logger) {
}
req.locals.application = app2;
// eslint-disable-next-line no-unused-vars
const {call_hook, call_status_hook, ...appInfo} = app; // mask sensitive data like user/pass on webhook
// eslint-disable-next-line no-unused-vars
const {requestor, notifier, ...loggable} = appInfo;
const {requestor, notifier, env_vars, ...loggable} = appInfo;
logger.info({app: loggable}, `retrieved application for incoming call to ${req.locals.calledNumber}`);
req.locals.callInfo = new CallInfo({
req,
@@ -417,10 +417,22 @@ module.exports = function(srf, logger) {
...(app.fallback_speech_recognizer_language && {fallback_language: app.fallback_speech_recognizer_language})
}
};
let env_vars;
try {
if (app.env_vars) {
const d_env_vars = JSON.parse(decrypt(app.env_vars));
logger.info(`Setting env_vars: ${Object.keys(d_env_vars)}`); // Only log the keys not the values
env_vars = d_env_vars;
}
} catch (error) {
logger.info('Unable to set env_vars', error);
}
const params = Object.assign(['POST', 'WS'].includes(app.call_hook.method) ? { sip: req.msg } : {},
req.locals.callInfo,
{ service_provider_sid: req.locals.service_provider_sid },
{ defaults });
{ defaults },
{ env_vars }
);
logger.debug({ params }, 'sending initial webhook');
const obj = rootSpan.startChildSpan('performAppWebhook');
span = obj.span;

View File

@@ -1,6 +1,6 @@
const {CallDirection, CallStatus} = require('../utils/constants');
const parseUri = require('drachtio-srf').parseUri;
const uuidv4 = require('uuid-random');
const crypto = require('crypto');
const {JAMBONES_API_BASE_URL} = require('../config');
/**
* @classdesc Represents the common information for all calls
@@ -57,7 +57,7 @@ class CallInfo {
// outbound call that is a child of an existing call
const {req, parentCallInfo, to, callSid} = opts;
srf = req.srf;
this.callSid = callSid || uuidv4();
this.callSid = callSid || crypto.randomUUID();
this.parentCallSid = parentCallInfo.callSid;
this.accountSid = parentCallInfo.accountSid;
this.applicationSid = parentCallInfo.applicationSid;

View File

@@ -23,6 +23,7 @@ const HttpRequestor = require('../utils/http-requestor');
const WsRequestor = require('../utils/ws-requestor');
const ActionHookDelayProcessor = require('../utils/action-hook-delay');
const TtsStreamingBuffer = require('../utils/tts-streaming-buffer');
const StickyEventEmitter = require('../utils/sticky-event-emitter');
const {parseUri} = require('drachtio-srf');
const {
JAMBONES_INJECT_CONTENT,
@@ -79,6 +80,10 @@ class CallSession extends Emitter {
this.callGone = false;
this.notifiedComplete = false;
this.rootSpan = rootSpan;
this.stickyEventEmitter = new StickyEventEmitter();
this.stickyEventEmitter.onSuccess = () => {
this.taskInProgress = null;
};
this.backgroundTaskManager = new BackgroundTaskManager({
cs: this,
logger,
@@ -1180,7 +1185,10 @@ class CallSession extends Emitter {
const taskNum = ++this.taskIdx;
const stackNum = this.stackIdx;
const task = this.tasks.shift();
this.logger.info(`CallSession:exec starting task #${stackNum}:${taskNum}: ${task.name}`);
this.isCurTaskPlay = TaskName.Play === task.name;
this.taskInProgress = task;
this.logger.info(
`CallSession:exec starting task #${stackNum}:${taskNum}: ${task.name} (task id: ${task.taskId})`);
this._notifyTaskStatus(task, {event: 'starting'});
// Register verbhook span wait for end
task.on('VerbHookSpanWaitForEnd', ({span}) => {
@@ -1919,6 +1927,8 @@ Duration=${duration} `
this.logger.debug({tasks: listTaskNames(tasks)},
`CallSession:replaceApplication reset with ${tasks.length} new tasks, stack depth is ${this.stackIdx}`);
if (this.currentTask) {
this.logger.debug('CallSession:replaceApplication - killing current task ' +
this.currentTask?.name + ', taskId: ' + this.currentTask.taskId);
this.currentTask.kill(this, KillReason.Replaced);
this.currentTask = null;
}
@@ -1927,6 +1937,10 @@ Duration=${duration} `
this.wakeupResolver({reason: 'new tasks'});
this.wakeupResolver = null;
}
if ((!this.currentTask || this.currentTask === undefined) && this.isCurTaskPlay) {
this.logger.debug(`CallSession:replaceApplication - emitting uuid_break, taskId: ${this.taskInProgress?.taskId}`);
this.stickyEventEmitter.emit('uuid_break', this.taskInProgress);
}
}
kill(onBackgroundGatherBargein = false) {
@@ -2387,6 +2401,9 @@ Duration=${duration} `
* Hang up the call and free the media endpoint
*/
async _clearResources() {
this.stickyEventEmitter.destroy();
this.stickyEventEmitter = null;
this.taskInProgress = null;
for (const resource of [this.dlg, this.ep, this.ep2]) {
try {
if (resource && resource.connected) await resource.destroy();

View File

@@ -465,7 +465,7 @@ class Conference extends Task {
doConferenceMute(cs, opts) {
assert (cs.isInConference);
this.logger.info(`Conference:doConferenceMute ${mute ? 'muting' : 'unmuting'} member`);
const mute = opts.conf_mute_status === 'mute';
this.ep.api(`conference ${this.confName} ${mute ? 'mute' : 'unmute'} ${this.memberId}`)
.catch((err) => this.logger.info({err}, 'Error muting or unmuting participant'));
@@ -568,8 +568,8 @@ class Conference extends Task {
/**
* mute or unmute side of the call
*/
mute(callSid, doMute) {
this.doConferenceMute(this.callSession, {conf_mute_status: doMute});
async mute(callSid, doMute) {
this.doConferenceMute(this.callSession, {conf_mute_status: doMute ? 'mute' : 'unmute'});
}
/**

View File

@@ -685,7 +685,9 @@ class TaskGather extends SttTask {
}
_startAsrTimer() {
if (this.vendor === 'deepgram') return; // no need
// Deepgram has a case that UtteranceEnd is not sent to cover the last word end time.
// So we need to wait for the asrTimeout to be sure that the last word is sent.
// if (this.vendor === 'deepgram') return; // no need
assert(this.isContinuousAsr);
this._clearAsrTimer();
this._asrTimer = setTimeout(() => {
@@ -845,7 +847,8 @@ class TaskGather extends SttTask {
}
else {
const utteranceTime = evt.last_word_end;
if (utteranceTime && this._dgTimeOfLastUnprocessedWord && utteranceTime < this._dgTimeOfLastUnprocessedWord) {
// eslint-disable-next-line max-len
if (utteranceTime && this._dgTimeOfLastUnprocessedWord && utteranceTime < this._dgTimeOfLastUnprocessedWord && utteranceTime != -1) {
this.logger.debug('Gather:_onTranscription - got UtteranceEnd with unprocessed words, continue listening');
}
else {

View File

@@ -4,6 +4,8 @@ const TaskLlmOpenAI_S2S = require('./llms/openai_s2s');
const TaskLlmVoiceAgent_S2S = require('./llms/voice_agent_s2s');
const TaskLlmUltravox_S2S = require('./llms/ultravox_s2s');
const TaskLlmElevenlabs_S2S = require('./llms/elevenlabs_s2s');
const TaskLlmGoogle_S2S = require('./llms/google_s2s');
const LlmMcpService = require('../../utils/llm-mcp');
class TaskLlm extends Task {
constructor(logger, opts) {
@@ -18,6 +20,8 @@ class TaskLlm extends Task {
// delegate to the specific llm model
this.llm = this.createSpecificLlm();
// MCP
this.mcpServers = this.data.mcpServers || [];
}
get name() { return this.llm.name ; }
@@ -28,14 +32,32 @@ class TaskLlm extends Task {
get ep() { return this.cs.ep; }
get mcpService() {
return this.llmMcpService;
}
get isMcpEnabled() {
return this.mcpServers.length > 0;
}
async exec(cs, {ep}) {
await super.exec(cs, {ep});
// create the MCP service if we have MCP servers
if (this.isMcpEnabled) {
this.llmMcpService = new LlmMcpService(this.logger, this.mcpServers);
await this.llmMcpService.init();
}
await this.llm.exec(cs, {ep});
}
async kill(cs) {
super.kill(cs);
await this.llm.kill(cs);
// clean up MCP clients
if (this.isMcpEnabled) {
await this.mcpService.close();
}
}
createSpecificLlm() {
@@ -59,6 +81,10 @@ class TaskLlm extends Task {
llm = new TaskLlmElevenlabs_S2S(this.logger, this.data, this);
break;
case 'google':
llm = new TaskLlmGoogle_S2S(this.logger, this.data, this);
break;
default:
throw new Error(`Unsupported vendor ${this.vendor} for LLM`);
}
@@ -82,8 +108,15 @@ class TaskLlm extends Task {
await this.cs?.requestor.request('llm:event', this.eventHook, data);
}
async sendToolHook(tool_call_id, data) {
await this.cs?.requestor.request('llm:tool-call', this.toolHook, {tool_call_id, ...data});
const tool_response = await this.cs?.requestor.request('llm:tool-call', this.toolHook, {tool_call_id, ...data});
// if the toolHook was a websocket it will return undefined, otherwise it should return an object
if (typeof tool_response != 'undefined') {
tool_response.type = 'client_tool_result';
tool_response.invocation_id = tool_call_id;
this.processToolOutput(tool_call_id, tool_response);
}
}
async processToolOutput(tool_call_id, data) {

View File

@@ -244,13 +244,36 @@ class TaskLlmElevenlabs_S2S extends Task {
/* tool calls */
else if (type === 'client_tool_call') {
this.logger.debug({evt}, 'TaskLlmElevenlabs_S2S:_onServerEvent - function_call');
if (!this.toolHook) {
const {tool_name: name, tool_call_id: call_id, parameters: args} = evt.client_tool_call;
const mcpTools = this.parent.isMcpEnabled ? await this.parent.mcpService.getAvailableMcpTools() : [];
if (mcpTools.some((tool) => tool.name === name)) {
this.logger.debug({name, args}, 'TaskLlmElevenlabs_S2S:_onServerEvent - calling mcp tool');
try {
const res = await this.parent.mcpService.callMcpTool(name, args);
this.logger.debug({res}, 'TaskLlmElevenlabs_S2S:_onServerEvent - function_call - mcp result');
this.processToolOutput(ep, call_id, {
data: {
type: 'client_tool_result',
tool_call_id: call_id,
result: res.content?.length ? res.content[0] : res.content,
is_error: false
}
});
return;
}
catch (err) {
this.logger.info({err, evt}, 'TaskLlmElevenlabs_S2S - error calling mcp tool');
this.results = {
completionReason: 'client error calling mcp function',
error: err
};
endConversation = true;
}
} else if (!this.toolHook) {
this.logger.warn({evt}, 'TaskLlmElevenlabs_S2S:_onServerEvent - no toolHook defined!');
}
else {
const {client_tool_call} = evt;
const {tool_name: name, tool_call_id: call_id, parameters: args} = client_tool_call;
try {
await this.parent.sendToolHook(call_id, {name, args});
} catch (err) {

View File

@@ -0,0 +1,319 @@
const Task = require('../../task');
const TaskName = 'Llm_Google_s2s';
const {LlmEvents_Google} = require('../../../utils/constants');
/* command name passed to the uuid_google_s2s api when forwarding a JSON payload (setup, updates, tool output) */
const ClientEvent = 'client.event';
/* command name passed to the uuid_google_s2s api when the task is killed */
const SessionDelete = 'session.delete';
/**
 * Event names known to this module: used as the default event selection when the
 * application does not specify `events`, and as the candidate list when expanding
 * wildcard entries such as "session.*".
 */
const google_server_events = [
'error',
'session.created',
'session.updated',
];
/**
 * Expand wildcard event names against the known Google server events.
 *
 * An entry ending in ".*" is replaced by every name in `google_server_events`
 * that starts with the text before the ".*" (possibly none); any other entry is
 * passed through untouched. Input order is preserved.
 *
 * @param {string[]} events - event names, optionally ending in ".*"
 * @returns {string[]} the expanded list of event names
 */
const expandWildcards = (events) => {
  const WILDCARD = '.*';
  return events.flatMap((name) => {
    if (!name.endsWith(WILDCARD)) return [name];
    const stem = name.slice(0, -WILDCARD.length);
    return google_server_events.filter((known) => known.startsWith(stem));
  });
};
/**
 * Speech-to-speech LLM task for Google, delegated to by the parent `llm` verb.
 *
 * The task drives the `uuid_google_s2s` endpoint api: it creates a session, sends the
 * initial `setup` message once connected (merging in any MCP tools exposed by the parent),
 * relays server events to the application's event hook, and services tool calls either
 * through the parent's MCP service or through the application's tool hook.
 */
class TaskLlmGoogle_S2S extends Task {
  /**
   * @param {object} logger - logger instance
   * @param {object} opts - verb data; `llmOptions.setup` is required
   * @param {object} parentTask - the parent llm task (supplies vendor, model, auth, hooks, MCP service)
   * @throws {Error} if `auth.apiKey` or `llmOptions.setup` is missing
   */
  constructor(logger, opts, parentTask) {
    super(logger, opts, parentTask);
    this.parent = parentTask;

    this.vendor = this.parent.vendor;
    this.model = this.parent.model || 'models/gemini-2.0-flash-live-001';
    this.auth = this.parent.auth;
    this.connectionOptions = this.parent.connectOptions;

    const {apiKey} = this.auth || {};
    if (!apiKey) throw new Error('auth.apiKey is required for Google S2S');
    this.apiKey = apiKey;

    this.actionHook = this.data.actionHook;
    this.eventHook = this.data.eventHook;
    this.toolHook = this.data.toolHook;

    /* tolerate a missing llmOptions, and reject null (typeof null === 'object') */
    const {setup} = this.data.llmOptions || {};
    if (!setup || typeof setup !== 'object') {
      throw new Error('llmOptions with an initial setup is required for Google S2S');
    }
    this.setup = {
      ...setup,
      model: this.model,
      // make sure output is always audio
      // NOTE(review): confirm the remote end accepts a bare string here rather than a list of modalities
      generationConfig: {
        ...(setup.generationConfig || {}),
        responseModalities: 'audio'
      }
    };

    this.results = {
      completionReason: 'normal conversation end'
    };

    /**
     * only one of these will have items,
     * if includeEvents, then these are the events to include
     * if excludeEvents, then these are the events to exclude
     */
    this.includeEvents = [];
    this.excludeEvents = [];

    /* default to all events if user did not specify */
    this._populateEvents(this.data.events || google_server_events);

    this.addCustomEventListener = parentTask.addCustomEventListener.bind(parentTask);
    this.removeCustomEventListeners = parentTask.removeCustomEventListeners.bind(parentTask);
  }

  get name() { return TaskName; }

  /**
   * Invoke the `uuid_google_s2s` api on the endpoint.
   * @param {object} ep - media endpoint
   * @param {string[]} args - api arguments; joined with '|' behind a `^^|` prefix
   * @throws {Error} if the api response body does not start with '+OK'
   */
  async _api(ep, args) {
    const res = await ep.api('uuid_google_s2s', `^^|${args.join('|')}`);
    if (!res.body?.startsWith('+OK')) {
      /* args are deliberately left out of the message: they can carry the api key */
      throw new Error(`Error calling uuid_google_s2s: ${res.body}`);
    }
  }

  /**
   * Run the task: start the session, wait until the task is done, then report
   * the results through the parent's action hook and remove the event handlers.
   */
  async exec(cs, {ep}) {
    await super.exec(cs);

    await this._startListening(cs, ep);

    await this.awaitTaskDone();

    /* note: the parent llm verb started the span, which is why this is necessary */
    await this.parent.performAction(this.results);

    this._unregisterHandlers();
  }

  /**
   * Kill the task: request deletion of the remote session (best effort, errors are
   * only logged) and release anyone waiting on task completion.
   */
  async kill(cs) {
    super.kill(cs);

    this._api(cs.ep, [cs.ep.uuid, SessionDelete])
      .catch((err) => this.logger.info({err}, 'TaskLlmGoogle_S2S:kill - error deleting session'));

    this.notifyTaskDone();
  }

  /**
   * Work out which events should be forwarded to the event hook.
   * If the list contains 'all', entries prefixed with '-' are exclusions;
   * otherwise every entry not prefixed with '-' is an inclusion. Wildcards are expanded.
   * @param {string[]} events - event selection from the application (or the default list)
   */
  _populateEvents(events) {
    if (events.includes('all')) {
      /* work by excluding specific events */
      const exclude = events
        .filter((evt) => evt.startsWith('-'))
        .map((evt) => evt.slice(1));
      if (exclude.length === 0) this.includeEvents = google_server_events;
      else this.excludeEvents = expandWildcards(exclude);
    }
    else {
      /* work by including specific events */
      const include = events
        .filter((evt) => !evt.startsWith('-'));
      this.includeEvents = expandWildcards(include);
    }

    this.logger.debug({
      includeEvents: this.includeEvents,
      excludeEvents: this.excludeEvents
    }, 'TaskLlmGoogle_S2S:_populateEvents');
  }

  /**
   * Register the event handlers and create the session; if session creation fails
   * the task is ended.
   */
  async _startListening(cs, ep) {
    this._registerHandlers(ep);

    try {
      const args = [ep.uuid, 'session.create', this.apiKey];
      await this._api(ep, args);
    } catch (err) {
      this.logger.error({err}, 'TaskLlmGoogle_S2S:_startListening');
      this.notifyTaskDone();
    }
  }

  /**
   * Send a JSON payload to the remote end as a client event.
   * @param {object} ep - media endpoint
   * @param {object} obj - payload, serialized with JSON.stringify
   * @returns {Promise<boolean>} true if the api call succeeded, false otherwise (error is logged)
   */
  async _sendClientEvent(ep, obj) {
    let ok = true;
    this.logger.debug({obj}, 'TaskLlmGoogle_S2S:_sendClientEvent');
    try {
      const args = [ep.uuid, ClientEvent, JSON.stringify(obj)];
      await this._api(ep, args);
    } catch (err) {
      ok = false;
      this.logger.error({err}, 'TaskLlmGoogle_S2S:_sendClientEvent - Error');
    }
    return ok;
  }

  /**
   * Send the initial `setup` message, with any MCP tools converted to
   * functionDeclarations and placed ahead of the application-supplied tools.
   * Ends the task if the message cannot be sent.
   */
  async _sendInitialMessage(ep) {
    /* work on a copy so that repeated calls do not keep prepending MCP tools to this.setup */
    const setup = {...this.setup};
    const mcpTools = this.parent.isMcpEnabled ? await this.parent.mcpService.getAvailableMcpTools() : [];
    if (mcpTools && mcpTools.length > 0) {
      const functionDeclarations = mcpTools.map((tool) => {
        let parameters = tool.inputSchema;
        if (parameters) {
          /* strip these keys from a copy, leaving the MCP service's own schema object intact */
          parameters = {...parameters};
          delete parameters.additionalProperties;
          delete parameters['$schema'];
        }
        return {
          name: tool.name,
          description: tool.description,
          parameters,
        };
      });
      // merge with any existing tools
      setup.tools = [{functionDeclarations}, ...(this.setup.tools || [])];
    }
    if (!await this._sendClientEvent(ep, {setup})) {
      this.logger.info({setup}, 'TaskLlmGoogle_S2S:_sendInitialMessage - failed sending setup message');
      this.notifyTaskDone();
    }
  }

  _registerHandlers(ep) {
    this.addCustomEventListener(ep, LlmEvents_Google.Connect, this._onConnect.bind(this, ep));
    this.addCustomEventListener(ep, LlmEvents_Google.ConnectFailure, this._onConnectFailure.bind(this, ep));
    this.addCustomEventListener(ep, LlmEvents_Google.Disconnect, this._onDisconnect.bind(this, ep));
    this.addCustomEventListener(ep, LlmEvents_Google.ServerEvent, this._onServerEvent.bind(this, ep));
  }

  _unregisterHandlers() {
    this.removeCustomEventListeners();
  }

  _onError(ep, evt) {
    this.logger.info({evt}, 'TaskLlmGoogle_S2S:_onError');
    this.notifyTaskDone();
  }

  _onConnect(ep) {
    this.logger.debug('TaskLlmGoogle_S2S:_onConnect');
    /* event handlers are not awaited by the emitter, so handle a rejection here */
    this._sendInitialMessage(ep)
      .catch((err) => {
        this.logger.info({err}, 'TaskLlmGoogle_S2S:_onConnect - error sending initial message');
        this.notifyTaskDone();
      });
  }

  _onConnectFailure(_ep, evt) {
    this.logger.info(evt, 'TaskLlmGoogle_S2S:_onConnectFailure');
    this.results = {completionReason: 'connection failure'};
    this.notifyTaskDone();
  }

  _onDisconnect(_ep, evt) {
    this.logger.info(evt, 'TaskLlmGoogle_S2S:_onDisconnect');
    this.results = {completionReason: 'disconnect from remote end'};
    this.notifyTaskDone();
  }

  /**
   * Handle an event from the remote end. Tool calls are answered by the MCP service
   * when it owns at least one of the requested functions, otherwise they are handed to
   * the application's tool hook. Every event is then offered to the event hook.
   * A failure while servicing a tool call ends the conversation.
   */
  async _onServerEvent(ep, evt) {
    let endConversation = false;
    this.logger.debug({evt}, 'TaskLlmGoogle_S2S:_onServerEvent');
    const {toolCall /**toolCallCancellation*/} = evt;

    if (toolCall) {
      this.logger.debug({toolCall}, 'TaskLlmGoogle_S2S:_onServerEvent - toolCall');
      const {functionCalls} = toolCall;
      const functionResponses = [];

      /* MCP tools are served first and do not require a toolHook */
      try {
        const mcpTools = this.parent.isMcpEnabled ? await this.parent.mcpService.getAvailableMcpTools() : [];
        if (mcpTools && mcpTools.length > 0) {
          for (const {name, args, id} of functionCalls || []) {
            if (mcpTools.some((tool) => tool.name === name)) {
              const response = await this.parent.mcpService.callMcpTool(name, args);
              functionResponses.push({
                response: {
                  output: response,
                },
                id
              });
            }
          }
        }
      } catch (err) {
        this.logger.info({err, evt}, 'TaskLlmGoogle_S2S - error calling mcp tool');
        this.results = {
          completionReason: 'client error calling mcp function',
          error: err
        };
        endConversation = true;
      }

      if (!endConversation) {
        if (functionResponses.length > 0) {
          this.logger.debug({functionResponses}, 'TaskLlmGoogle_S2S:_onServerEvent - function_call - mcp result');
          this.processToolOutput(ep, 'tool_call_id', {
            toolResponse: {
              functionResponses
            }
          });
        }
        else if (!this.toolHook) {
          this.logger.info({evt}, 'TaskLlmGoogle_S2S:_onServerEvent - no toolHook defined!');
        }
        else {
          try {
            await this.parent.sendToolHook('function_call_id', {type: 'toolCall', functionCalls});
          } catch (err) {
            this.logger.info({err, evt}, 'TaskLlmGoogle_S2S - error calling function');
            this.results = {
              completionReason: 'client error calling function',
              error: err
            };
            endConversation = true;
          }
        }
      }
    }

    this._sendLlmEvent('llm_event', evt);

    if (endConversation) {
      this.logger.info({results: this.results},
        'TaskLlmGoogle_S2S:_onServerEvent - ending conversation due to error');
      this.notifyTaskDone();
    }
  }

  /**
   * Forward an event to the application's event hook if the event type is selected.
   * NOTE(review): callers pass the fixed type 'llm_event', which is not in the default
   * include list - confirm whether events are meant to be forwarded by default.
   */
  _sendLlmEvent(type, evt) {
    /* check whether we should notify on this event */
    if (this.includeEvents.length > 0 ? this.includeEvents.includes(type) : !this.excludeEvents.includes(type)) {
      this.parent.sendEventHook(evt)
        .catch((err) => this.logger.info({err}, 'TaskLlmGoogle_S2S:_onServerEvent - error sending event hook'));
    }
  }

  /**
   * Forward an application-supplied update to the remote end as a client event.
   * Errors are logged, not thrown.
   */
  async processLlmUpdate(ep, data, _callSid) {
    try {
      this.logger.debug({data, _callSid}, 'TaskLlmGoogle_S2S:processLlmUpdate');

      await this._api(ep, [ep.uuid, ClientEvent, JSON.stringify(data)]);
    } catch (err) {
      this.logger.info({err, data}, 'TaskLlmGoogle_S2S:processLlmUpdate - Error processing LLM update');
    }
  }

  /**
   * Send the result of a tool call to the remote end. The payload must carry a
   * `toolResponse` property, otherwise it is logged and dropped. Errors are logged, not thrown.
   */
  async processToolOutput(ep, tool_call_id, data) {
    try {
      this.logger.debug({tool_call_id, data}, 'TaskLlmGoogle_S2S:processToolOutput');
      const {toolResponse} = data;

      if (!toolResponse) {
        this.logger.info({data},
          'TaskLlmGoogle_S2S:processToolOutput - invalid tool output, must be functionResponses');
      }
      else {
        await this._api(ep, [ep.uuid, ClientEvent, JSON.stringify(data)]);
      }
    } catch (err) {
      this.logger.info({err, data}, 'TaskLlmGoogle_S2S:processToolOutput - Error processing tool output');
    }
  }
}
module.exports = TaskLlmGoogle_S2S;

View File

@@ -235,6 +235,23 @@ class TaskLlmOpenAI_S2S extends Task {
/* send immediate session.update if present */
else if (this.session_update) {
if (this.parent.isMcpEnabled) {
this.logger.debug('TaskLlmOpenAI_S2S:_sendInitialMessage - mcp enabled');
const tools = await this.parent.mcpService.getAvailableMcpTools();
if (tools && tools.length > 0 && this.session_update) {
const convertedTools = tools.map((tool) => ({
name: tool.name,
type: 'function',
description: tool.description,
parameters: tool.inputSchema
}));
this.session_update.tools = [
...convertedTools,
...(this.session_update.tools || [])
];
}
}
obj = {type: 'session.update', session: this.session_update};
this.logger.debug({obj}, 'TaskLlmOpenAI_S2S:_sendInitialMessage - sending session.update');
if (!await this._sendClientEvent(ep, obj)) {
@@ -299,13 +316,37 @@ class TaskLlmOpenAI_S2S extends Task {
/* tool calls */
else if (type === 'response.output_item.done' && evt.item?.type === 'function_call') {
this.logger.debug({evt}, 'TaskLlmOpenAI_S2S:_onServerEvent - function_call');
if (!this.toolHook) {
const {name, call_id} = evt.item;
const args = JSON.parse(evt.item.arguments);
const mcpTools = this.parent.isMcpEnabled ? await this.parent.mcpService.getAvailableMcpTools() : [];
if (mcpTools.some((tool) => tool.name === name)) {
this.logger.debug({call_id, name, args}, 'TaskLlmOpenAI_S2S:_onServerEvent - calling mcp tool');
try {
const res = await this.parent.mcpService.callMcpTool(name, args);
this.logger.debug({res}, 'TaskLlmOpenAI_S2S:_onServerEvent - function_call - mcp result');
this.processToolOutput(ep, call_id, {
type: 'conversation.item.create',
item: {
type: 'function_call_output',
call_id,
output: res.content[0]?.text || 'There is no output from the function call',
}
});
return;
} catch (err) {
this.logger.info({err, evt}, 'TaskLlmOpenAI_S2S - error calling function');
this.results = {
completionReason: 'client error calling mcp function',
error: err
};
endConversation = true;
}
}
else if (!this.toolHook) {
this.logger.warn({evt}, 'TaskLlmOpenAI_S2S:_onServerEvent - no toolHook defined!');
}
else {
const {name, call_id} = evt.item;
const args = JSON.parse(evt.item.arguments);
try {
await this.parent.sendToolHook(call_id, {name, args});
} catch (err) {

View File

@@ -67,7 +67,50 @@ class TaskLlmUltravox_S2S extends Task {
}
}
/**
* Converts a JSON Schema to the dynamic parameters format used in the Ultravox API
* @param {Object} jsonSchema - A JSON Schema object defining parameters
* @param {string} locationDefault - Default location value for parameters (default: 'PARAMETER_LOCATION_BODY')
* @returns {Array} Array of dynamic parameters objects
*/
transformSchemaToParameters(jsonSchema, locationDefault = 'PARAMETER_LOCATION_BODY') {
if (jsonSchema.properties) {
const required = jsonSchema.required || [];
return Object.entries(jsonSchema.properties).map(([name]) => {
return {
name,
location: locationDefault,
required: required.includes(name)
};
});
}
return [];
}
async createCall() {
const mcpTools = this.parent.isMcpEnabled ? await this.parent.mcpService.getAvailableMcpTools() : [];
if (mcpTools && mcpTools.length > 0) {
const convertedTools = mcpTools.map((tool) => {
return {
temporaryTool: {
modelToolName: tool.name,
description: tool.description,
dynamicParameters: this.transformSchemaToParameters(tool.inputSchema),
// use client tool that ultravox call tool via freeswitch module.
client: {}
}
};
}
);
// merge with any existing tools
this.data.llmOptions.selectedTools = [
...convertedTools,
...(this.data.llmOptions.selectedTools || [])
];
}
const payload = {
...this.data.llmOptions,
model: this.model,
@@ -182,12 +225,35 @@ class TaskLlmUltravox_S2S extends Task {
/* tool calls */
else if (type === 'client_tool_invocation') {
this.logger.debug({evt}, 'TaskLlmUltravox_S2S:_onServerEvent - function_call');
if (!this.toolHook) {
const {toolName: name, invocationId: call_id, parameters: args} = evt;
const mcpTools = this.parent.isMcpEnabled ? await this.parent.mcpService.getAvailableMcpTools() : [];
if (mcpTools.some((tool) => tool.name === name)) {
this.logger.debug({
name,
input: args
}, 'TaskLlmUltravox_S2S:_onServerEvent - function_call - mcp tool');
try {
const res = await this.parent.mcpService.callMcpTool(name, args);
this.logger.debug({res}, 'TaskLlmUltravox_S2S:_onServerEvent - function_call - mcp result');
this.processToolOutput(_ep, call_id, {
type: 'client_tool_result',
invocation_id: call_id,
result: res.content
});
return;
} catch (err) {
this.logger.info({err, evt}, 'TaskLlmUltravox_S2S - error calling mcp tool');
this.results = {
completionReason: 'client error calling mcp function',
error: err
};
endConversation = true;
}
} else if (!this.toolHook) {
this.logger.info({evt}, 'TaskLlmUltravox_S2S:_onServerEvent - no toolHook defined!');
}
else {
const {toolName: name, invocationId: call_id, parameters: args} = evt;
try {
await this.parent.sendToolHook(call_id, {name, args});
} catch (err) {

View File

@@ -41,25 +41,25 @@ class TaskLlmVoiceAgent_S2S extends Task {
this.actionHook = this.data.actionHook;
this.eventHook = this.data.eventHook;
this.toolHook = this.data.toolHook;
const {settingsConfiguration} = this.data.llmOptions;
const {Settings} = this.data.llmOptions;
if (typeof settingsConfiguration !== 'object') {
throw new Error('llmOptions with an initial settingsConfiguration is required for VoiceAgent S2S');
if (typeof Settings !== 'object') {
throw new Error('llmOptions with an initial Settings is required for VoiceAgent S2S');
}
// eslint-disable-next-line no-unused-vars
const {audio, ...rest} = settingsConfiguration;
const cfg = this.settingsConfiguration = rest;
const {audio, ...rest} = Settings;
const cfg = this.Settings = rest;
if (!cfg.agent) throw new Error('llmOptions.settingsConfiguration.agent is required for VoiceAgent S2S');
if (!cfg.agent) throw new Error('llmOptions.Settings.agent is required for VoiceAgent S2S');
if (!cfg.agent.think) {
throw new Error('llmOptions.settingsConfiguration.agent.think is required for VoiceAgent S2S');
throw new Error('llmOptions.Settings.agent.think is required for VoiceAgent S2S');
}
if (!cfg.agent.think.model) {
throw new Error('llmOptions.settingsConfiguration.agent.think.model is required for VoiceAgent S2S');
if (!cfg.agent.think.provider?.model) {
throw new Error('llmOptions.Settings.agent.think.provider.model is required for VoiceAgent S2S');
}
if (!cfg.agent.think.provider?.type) {
throw new Error('llmOptions.settingsConfiguration.agent.think.provider.type is required for VoiceAgent S2S');
throw new Error('llmOptions.Settings.agent.think.provider.type is required for VoiceAgent S2S');
}
this.results = {
@@ -92,7 +92,7 @@ class TaskLlmVoiceAgent_S2S extends Task {
const {path} = this.connectionOptions || {};
if (path) return path;
return '/agent';
return '/v1/agent/converse';
}
async _api(ep, args) {
@@ -193,7 +193,20 @@ class TaskLlmVoiceAgent_S2S extends Task {
}
async _sendInitialMessage(ep) {
if (!await this._sendClientEvent(ep, this.settingsConfiguration)) {
const mcpTools = this.parent.isMcpEnabled ? await this.parent.mcpService.getAvailableMcpTools() : [];
if (mcpTools && mcpTools.length > 0 && this.Settings.agent?.think) {
const convertedTools = mcpTools.map((tool) => ({
name: tool.name,
description: tool.description,
parameters: tool.inputSchema
}));
this.Settings.agent.think.functions = [
...convertedTools,
...(this.Settings.agent.think?.functions || [])
];
}
if (!await this._sendClientEvent(ep, this.Settings)) {
this.notifyTaskDone();
}
}
@@ -254,17 +267,43 @@ class TaskLlmVoiceAgent_S2S extends Task {
/* tool calls */
else if (type === 'FunctionCallRequest') {
this.logger.debug({evt}, 'TaskLlmVoiceAgent_S2S:_onServerEvent - function_call');
if (!this.toolHook) {
const mcpTools = this.parent.isMcpEnabled ? await this.parent.mcpService.getAvailableMcpTools() : [];
if (!this.toolHook && mcpTools.length === 0) {
this.logger.warn({evt}, 'TaskLlmVoiceAgent_S2S:_onServerEvent - no toolHook defined!');
}
else {
const {function_name:name, function_call_id:call_id} = evt;
const args = evt.input;
} else {
const {functions} = evt;
const handledFunctions = [];
try {
await this.parent.sendToolHook(call_id, {name, args});
if (mcpTools && mcpTools.length > 0) {
for (const func of functions) {
const {name, arguments: args, id} = func;
const tool = mcpTools.find((tool) => tool.name === name);
if (tool) {
handledFunctions.push(name);
const response = await this.parent.mcpService.callMcpTool(name, JSON.parse(args));
this.logger.debug({response}, 'TaskLlmVoiceAgent_S2S:_onServerEvent - function_call - mcp result');
this.processToolOutput(_ep, id, {
data: {
type: 'FunctionCallResponse',
id,
name,
content: response.length > 0 ? response[0].text : 'There is no output from the function call'
}
});
}
}
}
for (const func of functions) {
const {name, arguments: args, id} = func;
if (!handledFunctions.includes(name)) {
await this.parent.sendToolHook(id, {name, args: JSON.parse(args)});
}
}
} catch (err) {
this.logger.info({err, evt}, 'TaskLlmVoiceAgent - error calling function');
this.logger.info({err, evt}, 'TaskLlmVoiceAgent_S2S:_onServerEvent - error calling function');
this.results = {
completionReason: 'client error calling function',
error: err

View File

@@ -1,7 +1,7 @@
const Task = require('./task');
const {TaskName, TaskPreconditions} = require('../utils/constants');
const bent = require('bent');
const uuidv4 = require('uuid-random');
const crypto = require('crypto');
const {K8S} = require('../config');
class TaskMessage extends Task {
constructor(logger, opts) {
@@ -9,7 +9,7 @@ class TaskMessage extends Task {
this.preconditions = TaskPreconditions.None;
this.payload = {
message_sid: this.data.message_sid || uuidv4(),
message_sid: this.data.message_sid || crypto.randomUUID(),
carrier: this.data.carrier,
to: this.data.to,
from: this.data.from,

View File

@@ -1,7 +1,6 @@
const Task = require('./task');
const {TaskName, TaskPreconditions} = require('../utils/constants');
const { PlayFileNotFoundError } = require('../utils/error');
class TaskPlay extends Task {
constructor(logger, opts) {
super(logger, opts);
@@ -27,6 +26,7 @@ class TaskPlay extends Task {
let playbackSeconds = 0;
let playbackMilliseconds = 0;
let completed = !(this.timeoutSecs > 0 || this.loop);
cs.playingAudio = true;
if (this.timeoutSecs > 0) {
timeout = setTimeout(async() => {
completed = true;
@@ -40,6 +40,22 @@ class TaskPlay extends Task {
try {
this.notifyStatus({event: 'start-playback'});
while (!this.killed && (this.loop === 'forever' || this.loop--) && this.ep.connected) {
/* Listen for playback-start event and set up a one-time listener for uuid_break
* that will kill the audio playback if the taskIds match. This ensures that
* we only kill the currently playing audio and not audio from other tasks.
* As we are using stickyEventEmitter, even if the event is emitted before the listener is registered,
* the listener will receive the most recent event.
*/
ep.once('playback-start', (evt) => {
this.logger.debug({evt}, 'Play got playback-start');
this.cs.stickyEventEmitter.once('uuid_break', (t) => {
if (t?.taskId === this.taskId) {
this.logger.debug(`Play got kill-playback, executing uuid_break, taskId: ${t?.taskId}`);
this.ep.api('uuid_break', this.ep.uuid).catch((err) => this.logger.info(err, 'Error killing audio'));
this.notifyStatus({event: 'kill-playback'});
}
});
});
if (cs.isInConference) {
const {memberId, confName, confUuid} = cs;
if (Array.isArray(this.url)) {
@@ -87,15 +103,15 @@ class TaskPlay extends Task {
async kill(cs) {
super.kill(cs);
if (this.ep.connected && !this.playComplete) {
if (this.ep?.connected && !this.playComplete) {
this.logger.debug('TaskPlay:kill - killing audio');
if (cs.isInConference) {
const {memberId, confName} = cs;
this.killPlayToConfMember(this.ep, memberId, confName);
}
else {
this.notifyStatus({event: 'kill-playback'});
this.ep.api('uuid_break', this.ep.uuid).catch((err) => this.logger.info(err, 'Error killing audio'));
//this.ep.api('uuid_break', this.ep.uuid).catch((err) => this.logger.info(err, 'Error killing audio'));
cs.stickyEventEmitter.emit('uuid_break', this);
}
}
}

View File

@@ -223,7 +223,19 @@ class TaskSay extends TtsTask {
});
ep.once('playback-stop', (evt) => {
this.logger.debug({evt}, 'Say got playback-stop');
if (evt.variable_tts_error) {
this.notifyStatus({event: 'stop-playback'});
this.notifiedPlayBackStop = true;
const tts_error = evt.variable_tts_error;
let response_code = 200;
// Check if any property ends with _response_code
for (const [key, value] of Object.entries(evt)) {
if (key.endsWith('_response_code')) {
response_code = parseInt(value, 10) || 200;
break;
}
}
if (tts_error) {
writeAlerts({
account_sid,
alert_type: AlertType.TTS_FAILURE,
@@ -232,7 +244,7 @@ class TaskSay extends TtsTask {
target_sid
}).catch((err) => this.logger.info({err}, 'Error generating alert for no tts'));
}
if (evt.variable_tts_cache_filename && !this.killed) {
if (!tts_error && response_code < 300 && evt.variable_tts_cache_filename && !this.killed) {
const text = parseTextFromSayString(this.text[segment]);
addFileToCache(evt.variable_tts_cache_filename, {
account_sid,
@@ -241,12 +253,14 @@ class TaskSay extends TtsTask {
voice,
engine,
model: this.model || this.model_id,
text
text,
instructions: this.instructions
}).catch((err) => this.logger.info({err}, 'Error adding file to cache'));
}
if (this._playResolve) {
evt.variable_tts_error ? this._playReject(new Error(evt.variable_tts_error)) : this._playResolve();
(tts_error || response_code >= 300) ? this._playReject(new Error(evt.variable_tts_error)) :
this._playResolve();
}
});
// wait for playback-stop event received to confirm if the playback is successful
@@ -296,6 +310,9 @@ class TaskSay extends TtsTask {
this.logger.debug('TaskSay:kill - clearing TTS stream for streaming audio');
cs.clearTtsStream();
} else {
if (!this.notifiedPlayBackStop) {
this.notifyStatus({event: 'stop-playback'});
}
this.notifyStatus({event: 'kill-playback'});
this.ep.api('uuid_break', this.ep.uuid);
}
@@ -316,6 +333,7 @@ class TaskSay extends TtsTask {
if (key.startsWith('variable_tts_')) {
let newKey = key.substring('variable_tts_'.length)
.replace('whisper_', 'whisper.')
.replace('nvidia_', 'nvidia.')
.replace('deepgram_', 'deepgram.')
.replace('playht_', 'playht.')
.replace('cartesia_', 'cartesia.')

View File

@@ -1,5 +1,5 @@
const Emitter = require('events');
const uuidv4 = require('uuid-random');
const crypto = require('crypto');
const {TaskPreconditions} = require('../utils/constants');
const { normalizeJambones } = require('@jambonz/verb-specifications');
const WsRequestor = require('../utils/ws-requestor');
@@ -19,6 +19,7 @@ class Task extends Emitter {
this.data = data;
this.actionHook = this.data.actionHook;
this.id = data.id;
this.taskId = crypto.randomUUID();
this._killInProgress = false;
this._completionPromise = new Promise((resolve) => this._completionResolver = resolve);
@@ -272,7 +273,7 @@ class Task extends Emitter {
}
async transferCallToFeatureServer(cs, sipAddress, opts) {
const uuid = uuidv4();
const uuid = crypto.randomUUID();
const {addKey} = cs.srf.locals.dbHelpers;
const obj = Object.assign({}, cs.application);
delete obj.requestor;

View File

@@ -653,12 +653,21 @@ class TaskTranscribe extends SttTask {
}
_onMaxDurationExceeded(cs, ep, channel) {
this.logger.debug(`TaskTranscribe:_onMaxDurationExceeded on channel ${channel}`);
this.restartDueToError(ep, channel, 'Max duration exceeded');
}
/**
 * Handles a "max buffer exceeded" condition for one channel by delegating to
 * restartDueToError with the reason string 'Max buffer exceeded'.
 *
 * @param {object} cs - call session; not used by this method
 * @param {object} ep - passed through unchanged to restartDueToError
 * @param {number} channel - channel number, passed through to restartDueToError
 */
_onMaxBufferExceeded(cs, ep, channel) {
this.restartDueToError(ep, channel, 'Max buffer exceeded');
}
restartDueToError(ep, channel, reason) {
this.logger.debug(`TaskTranscribe:${reason} on channel ${channel}`);
if (this.paused) return;
if (this.childSpan[channel - 1] && this.childSpan[channel - 1].span) {
this.childSpan[channel - 1].span.setAttributes({
channel,
'stt.resolve': 'max duration exceeded',
'stt.resolve': reason,
'stt.label': this.label || 'None',
});
this.childSpan[channel - 1].span.end();
@@ -715,6 +724,14 @@ class TaskTranscribe extends SttTask {
return;
}
this.logger.info({evt}, 'TaskTranscribe:_onJambonzError');
if (this.vendor === 'microsoft' &&
evt.error?.includes('Due to service inactivity, the client buffer exceeded maximum size. Resetting the buffer')) {
let channel = 1;
if (this.ep !== _ep) {
channel = 2;
}
return this._onMaxBufferExceeded(cs, _ep, channel);
}
if (this.paused) return;
const {writeAlerts, AlertType} = cs.srf.locals;

View File

@@ -21,6 +21,7 @@ class TtsTask extends Task {
this.synthesizer = this.data.synthesizer || {};
this.disableTtsCache = this.data.disableTtsCache;
this.options = this.synthesizer.options || {};
this.instructions = this.data.instructions;
}
async exec(cs) {
@@ -262,6 +263,7 @@ class TtsTask extends Task {
const {filePath, servedFromCache, rtt} = await synthAudio(stats, {
account_sid,
text,
instructions: this.instructions,
vendor,
language,
voice,

View File

@@ -194,6 +194,13 @@
"Disconnect": "openai_s2s::disconnect",
"ServerEvent": "openai_s2s::server_event"
},
"LlmEvents_Google": {
"Error": "error",
"Connect": "google_s2s::connect",
"ConnectFailure": "google_s2s::connect_failed",
"Disconnect": "google_s2s::disconnect",
"ServerEvent": "google_s2s::server_event"
},
"LlmEvents_Elevenlabs": {
"Error": "error",
"Connect": "elevenlabs_s2s::connect",

View File

@@ -108,7 +108,7 @@ class HttpRequestor extends BaseRequestor {
assert(HookMsgTypes.includes(type));
const payload = params ? snakeCaseKeys(params, ['customerData', 'sip']) : null;
const payload = params ? snakeCaseKeys(params, ['customerData', 'sip', 'env_vars', 'args']) : null;
const url = hook.url || hook;
const method = hook.method || 'POST';
let buf = '';
@@ -219,7 +219,7 @@ class HttpRequestor extends BaseRequestor {
const rtt = this._roundTrip(startAt);
if (buf) this.stats.histogram('app.hook.response_time', rtt, ['hook_type:app']);
if (buf && Array.isArray(buf)) {
if (buf && (Array.isArray(buf) || type == 'llm:tool-call')) {
this.logger.info({response: buf}, `HttpRequestor:request ${method} ${url} succeeded in ${rtt}ms`);
return buf;
}

103
lib/utils/llm-mcp.js Normal file
View File

@@ -0,0 +1,103 @@
const { Client } = require('@modelcontextprotocol/sdk/client/index.js');
/**
 * Connects to a list of MCP servers over SSE, records the tools each server
 * advertises, and routes tool calls to the server that owns the named tool.
 */
class LlmMcpService {
  /**
   * @param {object} logger - logger exposing debug / warn / error methods
   * @param {Array<{url: string}>} [mcpServers] - servers to connect to;
   *   entries without a url are skipped by init()
   */
  constructor(logger, mcpServers) {
    this.logger = logger;
    this.mcpServers = mcpServers || [];
    this.mcpClients = [];
  }

  /**
   * Connects to every configured MCP server and collects its tool list.
   * Call this before using any other method: the lookups below only see
   * servers that were connected successfully here.
   * A server that fails to connect (or to list its tools) is logged and
   * skipped; it does not prevent the remaining servers from being set up.
   * Does nothing if clients are already connected.
   */
  async init() {
    if (this.mcpClients.length > 0) {
      return;
    }
    // nothing to connect to, so do not load the SSE transport module at all
    if (this.mcpServers.length === 0) {
      return;
    }
    const { SSEClientTransport } = await import('@modelcontextprotocol/sdk/client/sse.js');
    for (const server of this.mcpServers) {
      const { url } = server;
      if (!url) {
        continue;
      }
      let client;
      try {
        const transport = new SSEClientTransport(new URL(url), {});
        client = new Client({ name: 'Jambonz MCP Client', version: '1.0.0' });
        await client.connect(transport);
        // collect available tools
        const { tools } = await client.listTools();
        this.mcpClients.push({
          url,
          client,
          tools
        });
      } catch (err) {
        this.logger.error(`LlmMcpService: Failed to connect to MCP server at ${url}: ${err.message}`);
        // The client is not tracked in mcpClients, so close() would never reach it.
        // Release it here (best effort) in case connect succeeded but listTools failed.
        if (client) {
          await client.close().catch((closeErr) => {
            this.logger.debug({err: closeErr, url}, 'LlmMcpService - error closing failed mcp client');
          });
        }
      }
    }
  }

  /**
   * @returns {Promise<Array<object>>} the tools of all connected servers,
   *   flattened into one list in connection order
   */
  async getAvailableMcpTools() {
    const tools = [];
    for (const mcpClient of this.mcpClients) {
      const {tools: availableTools} = mcpClient;
      if (availableTools) {
        tools.push(...availableTools);
      }
    }
    return tools;
  }

  /**
   * @param {string} name - tool name
   * @returns {Promise<object|null>} the client of the first server that
   *   advertises a tool with this name, or null if none does
   */
  async getMcpClientByToolName(name) {
    for (const mcpClient of this.mcpClients) {
      const { tools } = mcpClient;
      if (tools && tools.some((tool) => tool.name === name)) {
        return mcpClient.client;
      }
    }
    return null;
  }

  /**
   * @param {string} id - tool id
   * @returns {Promise<object|null>} the client of the first server that
   *   advertises a tool with this id, or null if none does
   */
  async getMcpClientByToolId(id) {
    for (const mcpClient of this.mcpClients) {
      const { tools } = mcpClient;
      if (tools && tools.some((tool) => tool.id === id)) {
        return mcpClient.client;
      }
    }
    return null;
  }

  /**
   * Invokes a tool on the server that advertises it.
   *
   * @param {string} name - tool name
   * @param {object} input - arguments forwarded to the tool
   * @returns {Promise<object|undefined>} the tool result, or undefined when
   *   no connected server provides a tool with this name
   * @throws rethrows any error raised by the underlying client call
   */
  async callMcpTool(name, input) {
    const client = await this.getMcpClientByToolName(name);
    if (!client) {
      // keep the original "resolve to undefined" contract, but leave a trace
      this.logger.warn({name}, 'LlmMcpService - no mcp client provides this tool');
      return undefined;
    }
    try {
      const result = await client.callTool({
        name,
        arguments: input,
      });
      this.logger.debug({result}, 'LlmMcpService - result');
      return result;
    } catch (err) {
      this.logger.error({err}, 'LlmMcpService - error calling tool');
      throw err;
    }
  }

  /**
   * Closes every connected client and forgets them.
   * A failure to close one client is logged and does not stop the others
   * from being closed, so no connection is left open because of a sibling.
   */
  async close() {
    for (const mcpClient of this.mcpClients) {
      const { client, url } = mcpClient;
      if (!client) {
        continue;
      }
      try {
        await client.close();
        this.logger.debug({url}, 'LlmMcpService - mcp client closed');
      } catch (err) {
        this.logger.error({err, url}, 'LlmMcpService - error closing mcp client');
      }
    }
    this.mcpClients = [];
  }
}
module.exports = LlmMcpService;

View File

@@ -12,7 +12,7 @@ const deepcopy = require('deepcopy');
const moment = require('moment');
const stripCodecs = require('./strip-ancillary-codecs');
const RootSpan = require('./call-tracer');
const uuidv4 = require('uuid-random');
const crypto = require('crypto');
const HttpRequestor = require('./http-requestor');
const WsRequestor = require('./ws-requestor');
const {makeOpusFirst, removeVideoSdp} = require('./sdp-utils');
@@ -47,7 +47,7 @@ class SingleDialer extends Emitter {
this.callGone = false;
this.callSid = uuidv4();
this.callSid = crypto.randomUUID();
this.dialTask = dialTask;
this.onHoldMusic = onHoldMusic;

View File

@@ -1,5 +1,5 @@
const assert = require('assert');
const uuidv4 = require('uuid-random');
const crypto = require('crypto');
const {LifeCycleEvents, FS_UUID_SET_NAME} = require('./constants');
const Emitter = require('events');
const debug = require('debug')('jambonz:feature-server');
@@ -130,7 +130,7 @@ module.exports = (logger) => {
logger.info('disabling OPTIONS pings since we are running as a kubernetes service');
const {srf} = require('../..');
const {addToSet} = srf.locals.dbHelpers;
const uuid = srf.locals.fsUUID = uuidv4();
const uuid = srf.locals.fsUUID = crypto.randomUUID();
/* in case redis is restarted, re-insert our key every so often */
setInterval(() => {

View File

@@ -1,5 +1,5 @@
const xmlParser = require('xml2js').parseString;
const uuidv4 = require('uuid-random');
const crypto = require('crypto');
const parseUri = require('drachtio-srf').parseUri;
const transform = require('sdp-transform');
const debug = require('debug')('jambonz:feature-server');
@@ -52,7 +52,7 @@ const parseSiprecPayload = (req, logger) => {
const arr = /^([^]+)(m=[^]+?)(m=[^]+?)$/.exec(sdp);
opts.sdp1 = `${arr[1]}${arr[2]}`;
opts.sdp2 = `${arr[1]}${arr[3]}\r\n`;
opts.sessionId = uuidv4();
opts.sessionId = crypto.randomUUID();
logger.info({ payload: req.payload }, 'SIPREC payload with no metadata (e.g. Cisco NBR)');
resolve(opts);
} else if (!sdp || !meta) {
@@ -64,7 +64,7 @@ const parseSiprecPayload = (req, logger) => {
if (err) { throw err; }
opts.recordingData = result ;
opts.sessionId = uuidv4() ;
opts.sessionId = crypto.randomUUID();
const arr = /^([^]+)(m=[^]+?)(m=[^]+?)$/.exec(sdp) ;
opts.sdp1 = `${arr[1]}${arr[2]}` ;

View File

@@ -0,0 +1,70 @@
const EventEmitter = require('events');
/**
 * A specialized EventEmitter that caches the most recent event emissions.
 * When new listeners are added, they immediately receive the most recent
 * event if it was previously emitted. This is useful for handling state
 * changes where late subscribers need to know the current state.
 *
 * Features:
 * - Caches the most recent emission (argument list) for each event type
 * - New listeners immediately receive the cached event if available
 * - Supports both regular (on) and one-time (once) listeners
 * - Maintains compatibility with Node's EventEmitter interface
 *
 * An optional `onSuccess` function property, if set on the instance, is
 * called with no arguments each time a one-time listener has been served,
 * either from the cache or by a later emit.
 */
class StickyEventEmitter extends EventEmitter {
  constructor() {
    super();
    // event name -> argument array of the most recent emit
    this._eventCache = new Map();
    // event name -> once-listeners registered before the event was first emitted;
    // bookkeeping only (delivery is done by the native emitter)
    this._onceListeners = new Map();
  }

  /**
   * Drops all cached events and removes every listener.
   */
  destroy() {
    this._eventCache.clear();
    this._onceListeners.clear();
    this.removeAllListeners();
  }

  /**
   * Caches the arguments as the most recent emission of `event`, then
   * delivers the event to the registered listeners.
   *
   * @param {string|symbol} event - event name
   * @param {...*} args - arguments passed to listeners
   * @returns {boolean} true if the event had listeners, as for EventEmitter#emit
   */
  emit(event, ...args) {
    // Store the event and its args
    this._eventCache.set(event, args);

    // Work out whether a waiting one-time listener is about to be served.
    // Listeners removed in the meantime (off/removeListener) no longer count.
    const waiting = this._onceListeners.get(event) || [];
    this._onceListeners.delete(event);
    const registered = this.listeners(event);
    const servesOnceListener = waiting.some((listener) => registered.includes(listener));

    // Waiting once-listeners are attached to the native emitter, so this single
    // call delivers to them exactly once. Calling them by hand as well would
    // invoke each one twice.
    const hadListeners = super.emit(event, ...args);

    if (servesOnceListener && this.onSuccess) {
      this.onSuccess();
    }
    return hadListeners;
  }

  /**
   * Registers a persistent listener. If the event has already been emitted,
   * the listener is first called synchronously with the cached arguments.
   *
   * @param {string|symbol} event - event name
   * @param {Function} listener - callback
   * @returns {this}
   */
  on(event, listener) {
    if (this._eventCache.has(event)) {
      listener(...this._eventCache.get(event));
    }
    return super.on(event, listener);
  }

  /**
   * Registers a one-time listener. If the event has already been emitted, the
   * listener is called synchronously with the cached arguments and is NOT
   * registered for future emissions; otherwise it waits for the next emit.
   *
   * @param {string|symbol} event - event name
   * @param {Function} listener - callback
   * @returns {this}
   */
  once(event, listener) {
    if (this._eventCache.has(event)) {
      listener(...this._eventCache.get(event));
      if (this.onSuccess) {
        this.onSuccess();
      }
      return this;
    }

    // Remember that a one-time listener is waiting (used by emit for onSuccess)
    if (!this._onceListeners.has(event)) {
      this._onceListeners.set(event, []);
    }
    this._onceListeners.get(event).push(listener);
    // Delivery and removal are handled by the native once()
    super.once(event, listener);
    return this;
  }
}
module.exports = StickyEventEmitter;

View File

@@ -8,7 +8,7 @@ const {
const MAX_CHUNK_SIZE = 1800;
const HIGH_WATER_BUFFER_SIZE = 1000;
const LOW_WATER_BUFFER_SIZE = 200;
const TIMEOUT_RETRY_MSECS = 3000;
const TIMEOUT_RETRY_MSECS = 1000; // 1 second
const isWhitespace = (str) => /^\s*$/.test(str);
@@ -377,6 +377,7 @@ class TtsStreamingBuffer extends Emitter {
_onTimeout() {
this.logger.debug('TtsStreamingBuffer:_onTimeout Timeout waiting for sentence boundary');
this.timer = null;
// Check if new text has been added since the timer was set.
const now = Date.now();
if (now - this.lastUpdateTime < TIMEOUT_RETRY_MSECS) {
@@ -384,7 +385,6 @@ class TtsStreamingBuffer extends Emitter {
this._setTimerIfNeeded();
return;
}
this.timer = null;
this._feedQueue(true);
}

View File

@@ -132,7 +132,7 @@ class WsRequestor extends BaseRequestor {
assert(this.ws);
/* prepare and send message */
let payload = params ? snakeCaseKeys(params, ['customerData', 'sip']) : null;
let payload = params ? snakeCaseKeys(params, ['customerData', 'sip', 'env_vars', 'args']) : null;
if (type === 'session:new' || type === 'session:adulting') this._sessionData = payload;
if (type === 'session:reconnect') payload = this._sessionData;
assert.ok(url, 'WsRequestor:request url was not provided');
@@ -290,7 +290,7 @@ class WsRequestor extends BaseRequestor {
followRedirects: true,
maxRedirects: 2,
handshakeTimeout,
maxPayload: JAMBONES_WS_MAX_PAYLOAD ? parseInt(JAMBONES_WS_MAX_PAYLOAD) : 48 * 1024,
maxPayload: JAMBONES_WS_MAX_PAYLOAD ? parseInt(JAMBONES_WS_MAX_PAYLOAD) : 24 * 1024,
headers: {
...(HTTP_USER_AGENT_HEADER && {'user-agent' : HTTP_USER_AGENT_HEADER})
}

762
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
{
"name": "jambonz-feature-server",
"version": "0.9.3",
"version": "0.9.4",
"main": "app.js",
"engines": {
"node": ">= 18.x"
@@ -27,14 +27,15 @@
"dependencies": {
"@aws-sdk/client-auto-scaling": "^3.549.0",
"@aws-sdk/client-sns": "^3.549.0",
"@jambonz/db-helpers": "^0.9.11",
"@jambonz/db-helpers": "^0.9.12",
"@jambonz/http-health-check": "^0.0.1",
"@jambonz/mw-registrar": "^0.2.7",
"@jambonz/realtimedb-helpers": "^0.8.13",
"@jambonz/speech-utils": "^0.2.3",
"@jambonz/speech-utils": "^0.2.10",
"@jambonz/stats-collector": "^0.1.10",
"@jambonz/time-series": "^0.2.13",
"@jambonz/verb-specifications": "^0.0.102",
"@jambonz/verb-specifications": "^0.0.104",
"@modelcontextprotocol/sdk": "^1.9.0",
"@opentelemetry/api": "^1.8.0",
"@opentelemetry/exporter-jaeger": "^1.23.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.50.0",
@@ -61,7 +62,6 @@
"sinon": "^17.0.1",
"to-snake-case": "^1.0.0",
"undici": "^7.5.0",
"uuid-random": "^1.3.2",
"verify-aws-sns-signature": "^0.1.0",
"ws": "^8.18.0",
"xml2js": "^0.6.2"