From 499c8002130b298773de9b689fe183adc1ebf277 Mon Sep 17 00:00:00 2001 From: Hoan Luu Huu <110280845+xquanluu@users.noreply.github.com> Date: Tue, 14 Jan 2025 19:11:55 +0700 Subject: [PATCH] Feat/ultravox s2s (#1032) * support ultravox_s2s * support ultravox_s2s * support ultravox_s2s * wip * wip * wip * wip * fix ultravox toolcall * wip --- lib/tasks/llm/index.js | 5 + lib/tasks/llm/llms/ultravox_s2s.js | 245 +++++++++++++++++++++++++++++ lib/utils/constants.json | 7 + package-lock.json | 13 +- 4 files changed, 264 insertions(+), 6 deletions(-) create mode 100644 lib/tasks/llm/llms/ultravox_s2s.js diff --git a/lib/tasks/llm/index.js b/lib/tasks/llm/index.js index c0d6f316..00c6ed72 100644 --- a/lib/tasks/llm/index.js +++ b/lib/tasks/llm/index.js @@ -2,6 +2,7 @@ const Task = require('../task'); const {TaskPreconditions} = require('../../utils/constants'); const TaskLlmOpenAI_S2S = require('./llms/openai_s2s'); const TaskLlmVoiceAgent_S2S = require('./llms/voice_agent_s2s'); +const TaskLlmUltravox_S2S = require('./llms/ultravox_s2s'); class TaskLlm extends Task { constructor(logger, opts) { @@ -49,6 +50,10 @@ class TaskLlm extends Task { llm = new TaskLlmVoiceAgent_S2S(this.logger, this.data, this); break; + case 'ultravox': + llm = new TaskLlmUltravox_S2S(this.logger, this.data, this); + break; + default: throw new Error(`Unsupported vendor ${this.vendor} for LLM`); } diff --git a/lib/tasks/llm/llms/ultravox_s2s.js b/lib/tasks/llm/llms/ultravox_s2s.js new file mode 100644 index 00000000..1850229a --- /dev/null +++ b/lib/tasks/llm/llms/ultravox_s2s.js @@ -0,0 +1,245 @@ +const Task = require('../../task'); +const TaskName = 'Llm_Ultravox_s2s'; +const {request} = require('undici'); +const {LlmEvents_Ultravox} = require('../../../utils/constants'); + +const ultravox_server_events = [ + 'pong', + 'state', + 'transcript', + 'conversationText', + 'clientToolInvocation', + 'playbackClearBuffer', +]; + +const ClientEvent = 'client.event'; + +const expandWildcards = (events) => { + // no-op for deepgram + return events; +}; + +const SessionDelete = 'session.delete'; + +class TaskLlmUltravox_S2S extends Task { + constructor(logger, opts, parentTask) { + super(logger, opts, parentTask); + this.parent = parentTask; + + this.vendor = this.parent.vendor; + this.model = this.parent.model || 'fixie-ai/ultravox'; + this.auth = this.parent.auth; + this.connectionOptions = this.parent.connectOptions; + + const {apiKey} = this.auth || {}; + if (!apiKey) throw new Error('auth.apiKey is required for Vendor: Ultravox'); + this.apiKey = apiKey; + this.actionHook = this.data.actionHook; + this.eventHook = this.data.eventHook; + this.toolHook = this.data.toolHook; + + /** + * only one of these will have items, + * if includeEvents, then these are the events to include + * if excludeEvents, then these are the events to exclude + */ + this.includeEvents = []; + this.excludeEvents = []; + + /* default to all events if user did not specify */ + this._populateEvents(this.data.events || ultravox_server_events); + + this.addCustomEventListener = parentTask.addCustomEventListener.bind(parentTask); + this.removeCustomEventListeners = parentTask.removeCustomEventListeners.bind(parentTask); + } + + get name() { return TaskName; } + + async _api(ep, args) { + const res = await ep.api('uuid_ultravox_s2s', `^^|${args.join('|')}`); + if (!res.body?.startsWith('+OK')) { + throw new Error(`Error calling uuid_ultravox_s2s: ${JSON.stringify(res.body)}`); + } + } + + async createCall() { + const payload = { + ...this.data.llmOptions, + model: this.model, + medium: { + ...(this.data.llmOptions.medium || {}), + serverWebSocket: { + inputSampleRate: 8000, + outputSampleRate: 8000, + } + } + }; + const {statusCode, body} = await request('https://api.ultravox.ai/api/calls', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-API-Key': this.apiKey + }, + body: JSON.stringify(payload) + }); + const data = await body.json(); + if (statusCode !== 201 || !data?.joinUrl) { + this.logger.error({statusCode, data}, 'Ultravox Error registering call'); + throw new Error(`Ultravox Error registering call: ${data.message}`); + } + this.logger.info({joinUrl: data.joinUrl}, 'Ultravox Call registered'); + return data.joinUrl; + } + + _unregisterHandlers() { + this.removeCustomEventListeners(); + } + + _registerHandlers(ep) { + this.addCustomEventListener(ep, LlmEvents_Ultravox.Connect, this._onConnect.bind(this, ep)); + this.addCustomEventListener(ep, LlmEvents_Ultravox.ConnectFailure, this._onConnectFailure.bind(this, ep)); + this.addCustomEventListener(ep, LlmEvents_Ultravox.Disconnect, this._onDisconnect.bind(this, ep)); + this.addCustomEventListener(ep, LlmEvents_Ultravox.ServerEvent, this._onServerEvent.bind(this, ep)); + } + + async _startListening(cs, ep) { + this._registerHandlers(ep); + + const joinUrl = await this.createCall(); + // split the joinUrl into host and path + const {host, pathname, search} = new URL(joinUrl); + + try { + const args = [ep.uuid, 'session.create', host, pathname + search]; + await this._api(ep, args); + } catch (err) { + this.logger.error({err}, 'TaskLlmUltraVox_S2S:_startListening'); + this.notifyTaskDone(); + } + } + + async exec(cs, {ep}) { + await super.exec(cs); + + await this._startListening(cs, ep); + + await this.awaitTaskDone(); + + /* note: the parent llm verb started the span, which is why this is necessary */ + await this.parent.performAction(this.results); + + this._unregisterHandlers(); + } + + async kill(cs) { + super.kill(cs); + + this._api(cs.ep, [cs.ep.uuid, SessionDelete]) + .catch((err) => this.logger.info({err}, 'TaskLlmUltravox_S2S:kill - error deleting session')); + + this.notifyTaskDone(); + } + + _onConnect(ep) { + this.logger.debug('TaskLlmUltravox_S2S:_onConnect'); + } + _onConnectFailure(_ep, evt) { + this.logger.info(evt, 'TaskLlmUltravox_S2S:_onConnectFailure'); + this.results = {completionReason: 'connection failure'}; + this.notifyTaskDone(); + } + _onDisconnect(_ep, evt) { + this.logger.info(evt, 'TaskLlmUltravox_S2S:_onConnectFailure'); + this.results = {completionReason: 'disconnect from remote end'}; + this.notifyTaskDone(); + } + + async _onServerEvent(_ep, evt) { + let endConversation = false; + const type = evt.type; + this.logger.info({evt}, 'TaskLlmUltravox_S2S:_onServerEvent'); + + /* server errors of some sort */ + if (type === 'error') { + endConversation = true; + this.results = { + completionReason: 'server error', + error: evt.error + }; + } + + /* tool calls */ + else if (type === 'client_tool_invocation') { + this.logger.debug({evt}, 'TaskLlmUltravox_S2S:_onServerEvent - function_call'); + if (!this.toolHook) { + this.logger.warn({evt}, 'TaskLlmUltravox_S2S:_onServerEvent - no toolHook defined!'); + } + else { + const {toolName: name, invocationId: call_id, parameters: args} = evt; + + try { + await this.parent.sendToolHook(call_id, {name, args}); + } catch (err) { + this.logger.info({err, evt}, 'TaskLlmUltravox_S2S - error calling function'); + this.results = { + completionReason: 'client error calling function', + error: err + }; + endConversation = true; + } + } + } + + /* check whether we should notify on this event */ + if (this.includeEvents.length > 0 ? this.includeEvents.includes(type) : !this.excludeEvents.includes(type)) { + this.parent.sendEventHook(evt) + .catch((err) => this.logger.info({err}, 'TaskLlmUltravox_S2S:_onServerEvent - error sending event hook')); + } + + if (endConversation) { + this.logger.info({results: this.results}, + 'TaskLlmUltravox_S2S:_onServerEvent - ending conversation due to error'); + this.notifyTaskDone(); + } + } + + async processToolOutput(ep, tool_call_id, data) { + try { + this.logger.debug({tool_call_id, data}, 'TaskLlmUltravox_S2S:processToolOutput'); + + if (!data.type || data.type !== 'client_tool_result') { + this.logger.info({data}, + 'TaskLlmUltravox_S2S:processToolOutput - invalid tool output, must be client_tool_result'); + } + else { + await this._api(ep, [ep.uuid, ClientEvent, JSON.stringify(data)]); + } + } catch (err) { + this.logger.info({err}, 'TaskLlmUltravox_S2S:processToolOutput'); + } + } + + _populateEvents(events) { + if (events.includes('all')) { + /* work by excluding specific events */ + const exclude = events + .filter((evt) => evt.startsWith('-')) + .map((evt) => evt.slice(1)); + if (exclude.length === 0) this.includeEvents = ultravox_server_events; + else this.excludeEvents = expandWildcards(exclude); + } + else { + /* work by including specific events */ + const include = events + .filter((evt) => !evt.startsWith('-')); + this.includeEvents = expandWildcards(include); + } + + this.logger.debug({ + includeEvents: this.includeEvents, + excludeEvents: this.excludeEvents + }, 'TaskLlmUltravox_S2S:_populateEvents'); + } +} + +module.exports = TaskLlmUltravox_S2S; diff --git a/lib/utils/constants.json b/lib/utils/constants.json index bee95d30..13067bab 100644 --- a/lib/utils/constants.json +++ b/lib/utils/constants.json @@ -182,6 +182,13 @@ "Disconnect": "voice_agent_s2s::disconnect", "ServerEvent": "voice_agent_s2s::server_event" }, + "LlmEvents_Ultravox": { + "Error": "error", + "Connect": "ultravox_s2s::connect", + "ConnectFailure": "ultravox_s2s::connect_failed", + "Disconnect": "ultravox_s2s::disconnect", + "ServerEvent": "ultravox_s2s::server_event" + }, "QueueResults": { "Bridged": "bridged", "Error": "error", diff --git a/package-lock.json b/package-lock.json index fabcd0ed..1e6c6d6f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9004,9 +9004,10 @@ "integrity": "sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==" }, "node_modules/tslib": { - "version": "2.6.2", - "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", - "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==" + "version": "2.8.1", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz", + "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==", + "license": "0BSD" }, "node_modules/type": { "version": "1.2.0", @@ -16409,9 +16410,9 @@ "integrity": "sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==" }, "tslib": { - "version": "2.6.2", - "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", - "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==" + "version": "2.8.1", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz", + "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==" }, "type": { "version": "1.2.0",