mirror of
https://github.com/jambonz/jambonz-feature-server.git
synced 2025-12-19 04:17:44 +00:00
* fix say verb does not close streaming when finish say * wip * wip * ttsStreamingBuffer reset eventHandlerCount after remove listeners * only send tokens to module if connected * wip * sent stream_open when successfully connected to vendor
468 lines
16 KiB
JavaScript
468 lines
16 KiB
JavaScript
const Emitter = require('events');
|
||
const assert = require('assert');
|
||
const {
|
||
TtsStreamingEvents,
|
||
TtsStreamingConnectionStatus
|
||
} = require('../utils/constants');
|
||
|
||
const MAX_CHUNK_SIZE = 1800;
|
||
const HIGH_WATER_BUFFER_SIZE = 1000;
|
||
const LOW_WATER_BUFFER_SIZE = 200;
|
||
const TIMEOUT_RETRY_MSECS = 1000; // 1 second
|
||
|
||
|
||
const isWhitespace = (str) => /^\s*$/.test(str);
|
||
|
||
/**
|
||
* Each queue item is an object:
|
||
* - { type: 'text', value: '…' } for text tokens.
|
||
* - { type: 'flush' } for a flush command.
|
||
*/
|
||
class TtsStreamingBuffer extends Emitter {
|
||
constructor(cs) {
|
||
super();
|
||
this.cs = cs;
|
||
this.logger = cs.logger;
|
||
|
||
// Use an array to hold our structured items.
|
||
this.queue = [];
|
||
// Track total number of characters in text items.
|
||
this.bufferedLength = 0;
|
||
this.eventHandlers = [];
|
||
this._isFull = false;
|
||
this._connectionStatus = TtsStreamingConnectionStatus.NotConnected;
|
||
this.timer = null;
|
||
// Record the last time the text buffer was updated.
|
||
this.lastUpdateTime = 0;
|
||
}
|
||
|
||
get isEmpty() {
|
||
return this.queue.length === 0;
|
||
}
|
||
|
||
get size() {
|
||
return this.bufferedLength;
|
||
}
|
||
|
||
get isFull() {
|
||
return this._isFull;
|
||
}
|
||
|
||
get ep() {
|
||
return this.cs?.ep;
|
||
}
|
||
|
||
async start() {
|
||
assert.ok(
|
||
this._connectionStatus === TtsStreamingConnectionStatus.NotConnected,
|
||
'TtsStreamingBuffer:start already started, or has failed'
|
||
);
|
||
|
||
this.vendor = this.cs.getTsStreamingVendor();
|
||
if (!this.vendor) {
|
||
this.logger.info('TtsStreamingBuffer:start No TTS streaming vendor configured');
|
||
throw new Error('No TTS streaming vendor configured');
|
||
}
|
||
|
||
this.logger.info(`TtsStreamingBuffer:start Connecting to TTS streaming with vendor ${this.vendor}`);
|
||
|
||
this._connectionStatus = TtsStreamingConnectionStatus.Connecting;
|
||
try {
|
||
if (this.eventHandlers.length === 0) this._initHandlers(this.ep);
|
||
await this._api(this.ep, [this.ep.uuid, 'connect']);
|
||
} catch (err) {
|
||
this.logger.info({ err }, 'TtsStreamingBuffer:start Error connecting to TTS streaming');
|
||
this._connectionStatus = TtsStreamingConnectionStatus.Failed;
|
||
}
|
||
}
|
||
|
||
stop() {
|
||
clearTimeout(this.timer);
|
||
this.removeCustomEventListeners();
|
||
if (this.ep) {
|
||
this._api(this.ep, [this.ep.uuid, 'stop'])
|
||
.catch((err) =>
|
||
this.logger.info({ err }, 'TtsStreamingBuffer:stop Error closing TTS streaming')
|
||
);
|
||
}
|
||
this.timer = null;
|
||
this.queue = [];
|
||
this.bufferedLength = 0;
|
||
this._connectionStatus = TtsStreamingConnectionStatus.NotConnected;
|
||
}
|
||
|
||
/**
|
||
* Buffer new text tokens.
|
||
*/
|
||
async bufferTokens(tokens) {
|
||
if (this._connectionStatus === TtsStreamingConnectionStatus.Failed) {
|
||
this.logger.info('TtsStreamingBuffer:bufferTokens TTS streaming connection failed, rejecting request');
|
||
return { status: 'failed', reason: `connection to ${this.vendor} failed` };
|
||
}
|
||
|
||
if (0 === this.bufferedLength && isWhitespace(tokens)) {
|
||
this.logger.debug({tokens}, 'TtsStreamingBuffer:bufferTokens discarded whitespace tokens');
|
||
return { status: 'ok' };
|
||
}
|
||
|
||
const displayedTokens = tokens.length <= 40 ? tokens : tokens.substring(0, 40);
|
||
const totalLength = tokens.length;
|
||
|
||
if (this.bufferedLength + totalLength > HIGH_WATER_BUFFER_SIZE) {
|
||
this.logger.info(
|
||
`TtsStreamingBuffer throttling: buffer is full, rejecting request to buffer ${totalLength} tokens`
|
||
);
|
||
if (!this._isFull) {
|
||
this._isFull = true;
|
||
this.emit(TtsStreamingEvents.Pause);
|
||
}
|
||
return { status: 'failed', reason: 'full' };
|
||
}
|
||
|
||
this.logger.debug(
|
||
`TtsStreamingBuffer:bufferTokens "${displayedTokens}" (length: ${totalLength})`
|
||
);
|
||
this.queue.push({ type: 'text', value: tokens });
|
||
this.bufferedLength += totalLength;
|
||
// Update the last update time each time new text is buffered.
|
||
this.lastUpdateTime = Date.now();
|
||
|
||
await this._feedQueue();
|
||
return { status: 'ok' };
|
||
}
|
||
|
||
/**
|
||
* Insert a flush command. If no text is queued, flush immediately.
|
||
* Otherwise, append a flush marker so that all text preceding it will be sent
|
||
* (regardless of sentence boundaries) before the flush is issued.
|
||
*/
|
||
flush() {
|
||
if (this._connectionStatus === TtsStreamingConnectionStatus.Connecting) {
|
||
this.logger.debug('TtsStreamingBuffer:flush TTS stream is not quite ready - wait for connect');
|
||
if (this.queue.length === 0 || this.queue[this.queue.length - 1].type !== 'flush') {
|
||
this.queue.push({ type: 'flush' });
|
||
}
|
||
return;
|
||
}
|
||
else if (this._connectionStatus === TtsStreamingConnectionStatus.Connected) {
|
||
if (this.isEmpty) {
|
||
this._doFlush();
|
||
}
|
||
else {
|
||
if (this.queue[this.queue.length - 1].type !== 'flush') {
|
||
this.queue.push({ type: 'flush' });
|
||
this.logger.debug('TtsStreamingBuffer:flush added flush marker to queue');
|
||
}
|
||
}
|
||
}
|
||
else {
|
||
this.logger.debug(
|
||
`TtsStreamingBuffer:flush TTS stream is not connected, status: ${this._connectionStatus}`
|
||
);
|
||
}
|
||
}
|
||
|
||
clear() {
|
||
this.logger.debug('TtsStreamingBuffer:clear');
|
||
if (this._connectionStatus !== TtsStreamingConnectionStatus.Connected) return;
|
||
clearTimeout(this.timer);
|
||
this._api(this.ep, [this.ep.uuid, 'clear']).catch((err) =>
|
||
this.logger.info({ err }, 'TtsStreamingBuffer:clear Error clearing TTS streaming')
|
||
);
|
||
this.queue = [];
|
||
this.bufferedLength = 0;
|
||
this.timer = null;
|
||
this._isFull = false;
|
||
}
|
||
|
||
/**
|
||
* Process the queue in two phases.
|
||
*
|
||
* Phase 1: Look for flush markers. When a flush marker is found (even if not at the very front),
|
||
* send all text tokens that came before it immediately (ignoring sentence boundaries)
|
||
* and then send the flush command. Repeat until there are no flush markers left.
|
||
*
|
||
* Phase 2: With the remaining queue (now containing only text items), accumulate text
|
||
* up to MAX_CHUNK_SIZE and use sentence-boundary logic to determine a chunk.
|
||
* Then, remove the exact tokens (or portions thereof) that were consumed.
|
||
*/
|
||
async _feedQueue(handlingTimeout = false) {
|
||
this.logger.debug({ queue: this.queue }, 'TtsStreamingBuffer:_feedQueue');
|
||
try {
|
||
if (!this.cs.isTtsStreamOpen || !this.ep) {
|
||
this.logger.debug('TtsStreamingBuffer:_feedQueue TTS stream is not open or no endpoint available');
|
||
return;
|
||
}
|
||
if (this._connectionStatus !== TtsStreamingConnectionStatus.Connected) {
|
||
this.logger.debug('TtsStreamingBuffer:_feedQueue TTS stream is not connected');
|
||
return;
|
||
}
|
||
|
||
// --- Phase 1: Process flush markers ---
|
||
// Process any flush marker that isn’t in the very first position.
|
||
let flushIndex = this.queue.findIndex((item, idx) => item.type === 'flush' && idx > 0);
|
||
while (flushIndex !== -1) {
|
||
let flushText = '';
|
||
// Accumulate all text tokens preceding the flush marker.
|
||
for (let i = 0; i < flushIndex; i++) {
|
||
if (this.queue[i].type === 'text') {
|
||
flushText += this.queue[i].value;
|
||
}
|
||
}
|
||
// Remove those text items.
|
||
for (let i = 0; i < flushIndex; i++) {
|
||
const item = this.queue.shift();
|
||
if (item.type === 'text') {
|
||
this.bufferedLength -= item.value.length;
|
||
}
|
||
}
|
||
// Remove the flush marker (now at the front).
|
||
if (this.queue.length > 0 && this.queue[0].type === 'flush') {
|
||
this.queue.shift();
|
||
}
|
||
// Immediately send all accumulated text (ignoring sentence boundaries).
|
||
if (flushText.length > 0) {
|
||
const modifiedFlushText = flushText.replace(/\n\n/g, '\n \n');
|
||
try {
|
||
await this._api(this.ep, [this.ep.uuid, 'send', modifiedFlushText]);
|
||
} catch (err) {
|
||
this.logger.info({ err, flushText }, 'TtsStreamingBuffer:_feedQueue Error sending TTS chunk');
|
||
}
|
||
}
|
||
// Send the flush command.
|
||
await this._doFlush();
|
||
|
||
flushIndex = this.queue.findIndex((item, idx) => item.type === 'flush' && idx > 0);
|
||
}
|
||
|
||
// If a flush marker is at the very front, process it.
|
||
while (this.queue.length > 0 && this.queue[0].type === 'flush') {
|
||
this.queue.shift();
|
||
await this._doFlush();
|
||
}
|
||
|
||
// --- Phase 2: Process remaining text tokens ---
|
||
if (this.queue.length === 0) {
|
||
this._removeTimer();
|
||
return;
|
||
}
|
||
|
||
// Accumulate contiguous text tokens (from the front) up to MAX_CHUNK_SIZE.
|
||
let combinedText = '';
|
||
for (const item of this.queue) {
|
||
if (item.type !== 'text') break;
|
||
combinedText += item.value;
|
||
if (combinedText.length >= MAX_CHUNK_SIZE) break;
|
||
}
|
||
if (combinedText.length === 0) {
|
||
this._removeTimer();
|
||
return;
|
||
}
|
||
|
||
const limit = Math.min(MAX_CHUNK_SIZE, combinedText.length);
|
||
let chunkEnd = findSentenceBoundary(combinedText, limit);
|
||
if (chunkEnd <= 0) {
|
||
if (handlingTimeout) {
|
||
chunkEnd = findWordBoundary(combinedText, limit);
|
||
if (chunkEnd <= 0) {
|
||
this._setTimerIfNeeded();
|
||
return;
|
||
}
|
||
} else {
|
||
this._setTimerIfNeeded();
|
||
return;
|
||
}
|
||
}
|
||
const chunk = combinedText.slice(0, chunkEnd);
|
||
|
||
// Check if the chunk is only whitespace before processing the queue
|
||
// If so, wait for more meaningful text
|
||
if (isWhitespace(chunk)) {
|
||
this.logger.debug('TtsStreamingBuffer:_feedQueue chunk is only whitespace, waiting for more text');
|
||
this._setTimerIfNeeded();
|
||
return;
|
||
}
|
||
|
||
// Now we iterate over the queue items
|
||
// and deduct their lengths until we've accounted for chunkEnd characters.
|
||
let remaining = chunkEnd;
|
||
let tokensProcessed = 0;
|
||
for (let i = 0; i < this.queue.length; i++) {
|
||
const token = this.queue[i];
|
||
if (token.type !== 'text') break;
|
||
if (remaining >= token.value.length) {
|
||
remaining -= token.value.length;
|
||
tokensProcessed = i + 1;
|
||
} else {
|
||
// Partially consumed token: update its value to remove the consumed part.
|
||
token.value = token.value.slice(remaining);
|
||
tokensProcessed = i;
|
||
remaining = 0;
|
||
break;
|
||
}
|
||
}
|
||
// Remove the fully consumed tokens from the front of the queue.
|
||
this.queue.splice(0, tokensProcessed);
|
||
this.bufferedLength -= chunkEnd;
|
||
|
||
const modifiedChunk = chunk.replace(/\n\n/g, '\n \n');
|
||
|
||
if (isWhitespace(modifiedChunk)) {
|
||
this.logger.debug('TtsStreamingBuffer:_feedQueue modified chunk is only whitespace, restoring queue');
|
||
this.queue.unshift({ type: 'text', value: chunk });
|
||
this.bufferedLength += chunkEnd;
|
||
this._setTimerIfNeeded();
|
||
return;
|
||
}
|
||
this.logger.debug(`TtsStreamingBuffer:_feedQueue sending chunk to tts: ${modifiedChunk}`);
|
||
|
||
try {
|
||
await this._api(this.ep, [this.ep.uuid, 'send', modifiedChunk]);
|
||
} catch (err) {
|
||
this.logger.info({ err, chunk }, 'TtsStreamingBuffer:_feedQueue Error sending TTS chunk');
|
||
}
|
||
|
||
if (this._isFull && this.bufferedLength <= LOW_WATER_BUFFER_SIZE) {
|
||
this.logger.info('TtsStreamingBuffer throttling: buffer is no longer full - resuming');
|
||
this._isFull = false;
|
||
this.emit(TtsStreamingEvents.Resume);
|
||
}
|
||
|
||
return this._feedQueue();
|
||
} catch (err) {
|
||
this.logger.info({ err }, 'TtsStreamingBuffer:_feedQueue Error sending TTS chunk');
|
||
this.queue = [];
|
||
this.bufferedLength = 0;
|
||
}
|
||
}
|
||
|
||
async _api(ep, args) {
|
||
const apiCmd = `uuid_${this.vendor.startsWith('custom:') ? 'custom' : this.vendor}_tts_streaming`;
|
||
const res = await ep.api(apiCmd, `^^|${args.join('|')}`);
|
||
if (!res.body?.startsWith('+OK')) {
|
||
this.logger.info({ args }, `Error calling ${apiCmd}: ${res.body}`);
|
||
throw new Error(`Error calling ${apiCmd}: ${res.body}`);
|
||
}
|
||
}
|
||
|
||
_doFlush() {
|
||
return this._api(this.ep, [this.ep.uuid, 'flush'])
|
||
.then(() => this.logger.debug('TtsStreamingBuffer:_doFlush sent flush command'))
|
||
.catch((err) =>
|
||
this.logger.info(
|
||
{ err },
|
||
`TtsStreamingBuffer:_doFlush Error flushing TTS streaming: ${JSON.stringify(err)}`
|
||
)
|
||
);
|
||
}
|
||
|
||
async _onConnect(vendor) {
|
||
this.logger.info(`TtsStreamingBuffer:_onConnect streaming tts connection made to ${vendor} successful`);
|
||
this._connectionStatus = TtsStreamingConnectionStatus.Connected;
|
||
if (this.queue.length > 0) {
|
||
await this._feedQueue();
|
||
}
|
||
this.emit(TtsStreamingEvents.Connected, { vendor });
|
||
}
|
||
|
||
_onConnectFailure(vendor) {
|
||
this.logger.info(`TtsStreamingBuffer:_onConnectFailure streaming tts connection failed to ${vendor}`);
|
||
this._connectionStatus = TtsStreamingConnectionStatus.Failed;
|
||
this.queue = [];
|
||
this.bufferedLength = 0;
|
||
this.emit(TtsStreamingEvents.ConnectFailure, { vendor });
|
||
}
|
||
|
||
_setTimerIfNeeded() {
|
||
if (this.bufferedLength > 0 && !this.timer) {
|
||
this.logger.debug({queue: this.queue},
|
||
`TtsStreamingBuffer:_setTimerIfNeeded setting timer because ${this.bufferedLength} buffered`);
|
||
this.timer = setTimeout(this._onTimeout.bind(this), TIMEOUT_RETRY_MSECS);
|
||
}
|
||
}
|
||
|
||
_removeTimer() {
|
||
if (this.timer) {
|
||
this.logger.debug('TtsStreamingBuffer:_removeTimer clearing timer');
|
||
clearTimeout(this.timer);
|
||
this.timer = null;
|
||
}
|
||
}
|
||
|
||
_onTimeout() {
|
||
this.logger.debug('TtsStreamingBuffer:_onTimeout Timeout waiting for sentence boundary');
|
||
this.timer = null;
|
||
// Check if new text has been added since the timer was set.
|
||
const now = Date.now();
|
||
if (now - this.lastUpdateTime < TIMEOUT_RETRY_MSECS) {
|
||
this.logger.debug('TtsStreamingBuffer:_onTimeout New text received recently; postponing flush.');
|
||
this._setTimerIfNeeded();
|
||
return;
|
||
}
|
||
this._feedQueue(true);
|
||
}
|
||
|
||
_onTtsEmpty(vendor) {
|
||
this.emit(TtsStreamingEvents.Empty, { vendor });
|
||
}
|
||
|
||
addCustomEventListener(ep, event, handler) {
|
||
this.eventHandlers.push({ ep, event, handler });
|
||
ep.addCustomEventListener(event, handler);
|
||
}
|
||
|
||
removeCustomEventListeners() {
|
||
this.eventHandlers.forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler));
|
||
this.eventHandlers.length = 0;
|
||
}
|
||
|
||
_initHandlers(ep) {
|
||
[
|
||
'deepgram',
|
||
'cartesia',
|
||
'elevenlabs',
|
||
'rimelabs',
|
||
'custom'
|
||
].forEach((vendor) => {
|
||
const eventClassName = `${vendor.charAt(0).toUpperCase() + vendor.slice(1)}TtsStreamingEvents`;
|
||
const eventClass = require('../utils/constants')[eventClassName];
|
||
if (!eventClass) throw new Error(`Event class for vendor ${vendor} not found`);
|
||
|
||
this.addCustomEventListener(ep, eventClass.Connect, this._onConnect.bind(this, vendor));
|
||
this.addCustomEventListener(ep, eventClass.ConnectFailure, this._onConnectFailure.bind(this, vendor));
|
||
this.addCustomEventListener(ep, eventClass.Empty, this._onTtsEmpty.bind(this, vendor));
|
||
});
|
||
}
|
||
}
|
||
|
||
const findSentenceBoundary = (text, limit) => {
|
||
// Look for punctuation or double newline that signals sentence end.
|
||
const sentenceEndRegex = /[.!?](?=\s|$)|\n\n/g;
|
||
let lastSentenceBoundary = -1;
|
||
let match;
|
||
while ((match = sentenceEndRegex.exec(text)) && match.index < limit) {
|
||
const precedingText = text.slice(0, match.index).trim();
|
||
if (precedingText.length > 0) {
|
||
if (
|
||
match[0] === '\n\n' ||
|
||
(match.index === 0 || !/\d$/.test(text[match.index - 1]))
|
||
) {
|
||
lastSentenceBoundary = match.index + (match[0] === '\n\n' ? 2 : 1);
|
||
}
|
||
}
|
||
}
|
||
return lastSentenceBoundary;
|
||
};
|
||
|
||
const findWordBoundary = (text, limit) => {
|
||
const wordBoundaryRegex = /\s+/g;
|
||
let lastWordBoundary = -1;
|
||
let match;
|
||
while ((match = wordBoundaryRegex.exec(text)) && match.index < limit) {
|
||
lastWordBoundary = match.index;
|
||
}
|
||
return lastWordBoundary;
|
||
};
|
||
|
||
module.exports = TtsStreamingBuffer;
|