Compare commits

..

3 Commits

Author SHA1 Message Date
Hoan Luu Huu
c177373817 Merge branch 'main' into fix/fd_1828 2026-01-02 14:27:54 +07:00
xquanluu
037378c732 clean testcases 2025-12-30 06:05:16 +07:00
xquanluu
8cbb12bd9a fixed send whitespace to tts stream modules 2025-12-28 16:41:07 +07:00
16 changed files with 300 additions and 214 deletions

View File

@@ -12,7 +12,6 @@ class CallInfo {
let srf;
this.direction = opts.direction;
this.traceId = opts.traceId;
this.hasRecording = false;
this.callTerminationBy = undefined;
if (opts.req) {
const u = opts.req.getParsedHeader('from');

View File

@@ -756,101 +756,69 @@ class CallSession extends Emitter {
return this._fillerNoise;
}
async pauseOrResumeBackgroundListenIfRequired(action, silence = false) {
if ((action == 'pauseCallRecording' || action == 'resumeCallRecording') &&
this.backgroundTaskManager.isTaskRunning('record')) {
this.logger.debug({action, silence}, 'CallSession:pauseOrResumeBackgroundListenIfRequired');
const backgroundListenTask = this.backgroundTaskManager.getTask('record');
const status = action === 'pauseCallRecording' ? ListenStatus.Pause : ListenStatus.Resume;
backgroundListenTask.updateListen(
status,
silence
);
}
}
async notifyRecordOptions(opts) {
const {action, silence = false, type = 'siprec'} = opts;
const {action, silence} = opts;
this.logger.debug({opts}, 'CallSession:notifyRecordOptions');
if (type == 'cloud') {
switch (action) {
case 'pauseCallRecording':
if (this.backgroundTaskManager.isTaskRunning('record')) {
this.logger.debug({action, silence, type}, 'CallSession:cloudRecording');
const backgroundListenTask = this.backgroundTaskManager.getTask('record');
backgroundListenTask.updateListen(
ListenStatus.Pause,
silence
);
return true;
} else { return false; }
case 'resumeCallRecording':
if (this.backgroundTaskManager.isTaskRunning('record')) {
this.logger.debug({action, silence, type}, 'CallSession:cloudRecording');
const backgroundListenTask = this.backgroundTaskManager.getTask('record');
backgroundListenTask.updateListen(
ListenStatus.Resume,
silence
);
return true;
} else { return false; }
case 'startCallRecording':
if (!this.backgroundTaskManager.isTaskRunning('record')) {
this.logger.debug({action, silence, type}, 'CallSession:cloudRecording');
this.callInfo.hasRecording = true;
this.updateCallStatus(Object.assign({}, this.callInfo.toJSON()), this.serviceUrl)
.catch((err) => this.logger.error(err, 'redis error'));
if (!this.dlg) {
// Call not yet answered so set flag to record on status change
this.application.record_all_calls = true;
} else {
this.backgroundTaskManager.newTask('record');
}
return true;
} else { return false; }
case 'stopCallRecording':
if (this.backgroundTaskManager.isTaskRunning('record')) {
this.logger.debug({action, silence, type}, 'CallSession:cloudRecording');
this.backgroundTaskManager.stop('record');
return true;
} else { return false; }
}
} else {
// SIPREC
/* if we have not answered yet, just save the details for later */
if (!this.dlg) {
if (action === 'startCallRecording') {
this.recordOptions = opts;
return true;
}
return false;
}
this.pauseOrResumeBackgroundListenIfRequired(action, silence);
/* check validity of request */
if (action == 'startCallRecording' && this.recordState !== RecordState.RecordingOff) {
this.logger.info({recordState: this.recordState},
'CallSession:notifyRecordOptions: recording is already started, ignoring request');
return false;
}
if (action == 'stopCallRecording' && this.recordState === RecordState.RecordingOff) {
this.logger.info({recordState: this.recordState},
'CallSession:notifyRecordOptions: recording is already stopped, ignoring request');
return false;
}
if (action == 'pauseCallRecording' && this.recordState !== RecordState.RecordingOn) {
this.logger.info({recordState: this.recordState},
'CallSession:notifyRecordOptions: cannot pause recording, ignoring request ');
return false;
}
if (action == 'resumeCallRecording' && this.recordState !== RecordState.RecordingPaused) {
this.logger.info({recordState: this.recordState},
'CallSession:notifyRecordOptions: cannot resume recording, ignoring request ');
return false;
/* if we have not answered yet, just save the details for later */
if (!this.dlg) {
if (action === 'startCallRecording') {
this.recordOptions = opts;
return true;
}
return false;
}
this.recordOptions = opts;
/* check validity of request */
if (action == 'startCallRecording' && this.recordState !== RecordState.RecordingOff) {
this.logger.info({recordState: this.recordState},
'CallSession:notifyRecordOptions: recording is already started, ignoring request');
return false;
}
if (action == 'stopCallRecording' && this.recordState === RecordState.RecordingOff) {
this.logger.info({recordState: this.recordState},
'CallSession:notifyRecordOptions: recording is already stopped, ignoring request');
return false;
}
if (action == 'pauseCallRecording' && this.recordState !== RecordState.RecordingOn) {
this.logger.info({recordState: this.recordState},
'CallSession:notifyRecordOptions: cannot pause recording, ignoring request ');
return false;
}
if (action == 'resumeCallRecording' && this.recordState !== RecordState.RecordingPaused) {
this.logger.info({recordState: this.recordState},
'CallSession:notifyRecordOptions: cannot resume recording, ignoring request ');
return false;
}
switch (action) {
case 'startCallRecording':
return await this.startRecording();
case 'stopCallRecording':
return await this.stopRecording();
case 'pauseCallRecording':
return await this.pauseRecording();
case 'resumeCallRecording':
return await this.resumeRecording();
default:
throw new Error(`invalid record action ${action}`);
}
this.recordOptions = opts;
switch (action) {
case 'startCallRecording':
return await this.startRecording();
case 'stopCallRecording':
return await this.stopRecording();
case 'pauseCallRecording':
return await this.pauseRecording();
case 'resumeCallRecording':
return await this.resumeRecording();
default:
throw new Error(`invalid record action ${action}`);
}
}
@@ -2500,36 +2468,6 @@ Duration=${duration} `
}
else {
this.logger.error(err, `Error attempting to allocate endpoint for for task ${task.name}`);
// Check for SipError type (e.g., 488 codec incompatibility)
const isSipError = err.name === 'SipError';
if (isSipError && err.status) {
// Extract Reason header from SIP response if available (e.g., Q.850;cause=88;text="INCOMPATIBLE_DESTINATION")
const sipReasonHeader = err.res?.msg?.headers?.reason;
this._endpointAllocationError = {
status: err.status,
reason: err.reason || 'Endpoint Allocation Failed',
sipReasonHeader
};
this.logger.info({endpointAllocationError: this._endpointAllocationError},
'Captured SipError for propagation to SBC');
// Send SIP error response immediately for inbound calls
if (this.res && !this.res.finalResponseSent) {
this.logger.info(`Sending ${err.status} response to SBC due to SipError`);
this.res.send(err.status, {
headers: {
'X-Reason': `endpoint allocation failure: ${err.reason || 'Endpoint Allocation Failed'}`,
...(sipReasonHeader && {'Reason': sipReasonHeader})
}
});
this._notifyCallStatusChange({
callStatus: CallStatus.Failed,
sipStatus: err.status,
sipReason: err.reason || 'Endpoint Allocation Failed'
});
this._callReleased();
}
}
throw new Error(`${BADPRECONDITIONS}: unable to allocate endpoint`);
}
}
@@ -3042,7 +2980,8 @@ Duration=${duration} `
// manage record all call.
if (callStatus === CallStatus.InProgress) {
if (this.accountInfo.account.record_all_calls || this.application.record_all_calls) {
if (this.accountInfo.account.record_all_calls ||
this.application.record_all_calls) {
this.backgroundTaskManager.newTask('record');
}
} else if (callStatus == CallStatus.Completed) {

View File

@@ -60,19 +60,6 @@ class InboundCallSession extends CallSession {
}
});
}
else if (this._endpointAllocationError) {
// Propagate SIP error from endpoint allocation failure back to the client
const {status, reason, sipReasonHeader} = this._endpointAllocationError;
this.rootSpan.setAttributes({'call.termination': `endpoint allocation SIP error ${status}`});
this.logger.info({endpointAllocationError: this._endpointAllocationError},
`InboundCallSession:_onTasksDone generating ${status} due to endpoint allocation failure`);
this.res.send(status, {
headers: {
'X-Reason': `endpoint allocation failure: ${reason}`,
...(sipReasonHeader && {'Reason': sipReasonHeader})
}
});
}
else {
this.rootSpan.setAttributes({'call.termination': 'tasks completed without answering call'});
this.logger.info('InboundCallSession:_onTasksDone auto-generating non-success response to invite');

View File

@@ -500,10 +500,6 @@ class TaskGather extends SttTask {
this.addCustomEventListener(ep, GladiaTranscriptionEvents.ConnectFailure,
this._onVendorConnectFailure.bind(this, cs, ep));
this.addCustomEventListener(ep, GladiaTranscriptionEvents.Error, this._onVendorError.bind(this, cs, ep));
// gladia require unique url for each session
const {host, path} = await this.createGladiaLiveSession();
opts.GLADIA_SPEECH_HOST = host;
opts.GLADIA_SPEECH_PATH = path;
break;
case 'soniox':

View File

@@ -1,6 +1,7 @@
const Task = require('./task');
const {TaskName} = require('../utils/constants');
const WsRequestor = require('../utils/ws-requestor');
const URL = require('url');
const HttpRequestor = require('../utils/http-requestor');
/**
@@ -9,7 +10,6 @@ const HttpRequestor = require('../utils/http-requestor');
class TaskRedirect extends Task {
constructor(logger, opts) {
super(logger, opts);
this.statusHook = opts.statusHook || false;
}
get name() { return TaskName.Redirect; }
@@ -33,7 +33,7 @@ class TaskRedirect extends Task {
}
else {
const baseUrl = this.cs.application.requestor.baseUrl;
const newUrl = new URL(this.actionHook);
const newUrl = URL.parse(this.actionHook);
const newBaseUrl = newUrl.protocol + '//' + newUrl.host;
if (baseUrl != newBaseUrl) {
try {
@@ -47,30 +47,6 @@ class TaskRedirect extends Task {
}
}
}
/* update the notifier if a new statusHook was provided */
if (this.statusHook) {
this.logger.info(`TaskRedirect updating statusHook to ${this.statusHook}`);
try {
const oldNotifier = cs.application.notifier;
const isStatusHookAbsolute = cs.notifier?._isAbsoluteUrl(this.statusHook);
if (isStatusHookAbsolute) {
if (cs.notifier instanceof WsRequestor) {
cs.application.notifier = new WsRequestor(this.logger, cs.accountSid, {url: this.statusHook},
cs.accountInfo.account.webhook_secret);
} else {
cs.application.notifier = new HttpRequestor(this.logger, cs.accountSid, {url: this.statusHook},
cs.accountInfo.account.webhook_secret);
}
if (oldNotifier?.close) oldNotifier.close();
}
/* update the call_status_hook URL that gets passed to the notifier */
cs.application.call_status_hook = this.statusHook;
} catch (err) {
this.logger.info(err, `TaskRedirect error updating statusHook to ${this.statusHook}`);
}
}
await this.performAction();
}
}

View File

@@ -203,14 +203,26 @@ class SttTask extends Task {
if (cs.hasGlobalSttPunctuation && !this.data.recognizer.punctuation) {
this.data.recognizer.punctuation = cs.globalSttPunctuation;
}
if (this.vendor === 'gladia') {
const { api_key, region } = this.sttCredentials;
const {url} = await this.createGladiaLiveSession({
api_key, region,
model: this.data.recognizer.model || 'solaria-1',
options: this.data.recognizer.gladiaOptions || {}
});
const {host, pathname, search} = new URL(url);
this.sttCredentials.host = host;
this.sttCredentials.path = `${pathname}${search}`;
}
}
async createGladiaLiveSession() {
const { api_key, region = 'us-west' } = this.sttCredentials;
const model = this.data.recognizer.model || 'solaria-1';
const options = this.data.recognizer.gladiaOptions || {};
async createGladiaLiveSession({
api_key,
region = 'us-west',
model = 'solaria-1',
options = {},
}) {
const url = `https://api.gladia.io/v2/live?region=${region}`;
const response = await fetch(url, {
method: 'POST',
@@ -240,9 +252,7 @@ class SttTask extends Task {
const data = await response.json();
this.logger.debug({url: data.url}, 'Gladia Call registered');
const {host, pathname, search} = new URL(data.url);
return {host, path: `${pathname}${search}`};
return data;
}
addCustomEventListener(ep, event, handler) {

View File

@@ -459,14 +459,6 @@ class TaskTranscribe extends SttTask {
else if (this.data.recognizer?.hints?.length > 0) {
prompt = this.data.recognizer?.hints.join(', ');
}
} else if (this.vendor === 'gladia') {
// gladia require unique url for each session
const {host, path} = await this.createGladiaLiveSession();
await ep.set({
GLADIA_SPEECH_HOST: host,
GLADIA_SPEECH_PATH: path,
})
.catch((err) => this.logger.info(err, 'Error setting channel variables'));
}
await ep.startTranscription({

View File

@@ -118,13 +118,6 @@ class ActionHookDelayProcessor extends Emitter {
this.logger.debug('ActionHookDelayProcessor#_onNoResponseTimer');
this._noResponseTimer = null;
/* check if endpoint is still available (call may have ended) */
if (!this.ep) {
this.logger.debug('ActionHookDelayProcessor#_onNoResponseTimer: endpoint is null, call may have ended');
this._active = false;
return;
}
/* get the next play or say action */
const verb = this.actions[this._retryCount % this.actions.length];
@@ -136,8 +129,8 @@ class ActionHookDelayProcessor extends Emitter {
this._taskInProgress.exec(this.cs, {ep: this.ep}).catch((err) => {
this.logger.info(`ActionHookDelayProcessor#_onNoResponseTimer: error playing file: ${err.message}`);
this._taskInProgress = null;
this.ep?.removeAllListeners('playback-start');
this.ep?.removeAllListeners('playback-stop');
this.ep.removeAllListeners('playback-start');
this.ep.removeAllListeners('playback-stop');
});
} catch (err) {
this.logger.info(err, 'ActionHookDelayProcessor#_onNoResponseTimer: error starting action');

View File

@@ -135,24 +135,26 @@ class BackgroundTaskManager extends Emitter {
// Initiate Record
async _initRecord() {
if (!JAMBONZ_RECORD_WS_BASE_URL || !this.cs.accountInfo.account.bucket_credential) {
this.logger.error('_initRecord: invalid cfg - missing JAMBONZ_RECORD_WS_BASE_URL or bucket config');
return undefined;
}
const listenOpts = {
url: `${JAMBONZ_RECORD_WS_BASE_URL}/record/${this.cs.accountInfo.account.bucket_credential.vendor}`,
disableBidirectionalAudio: true,
mixType : 'stereo',
passDtmf: true
};
if (JAMBONZ_RECORD_WS_USERNAME && JAMBONZ_RECORD_WS_PASSWORD) {
listenOpts.wsAuth = {
username: JAMBONZ_RECORD_WS_USERNAME,
password: JAMBONZ_RECORD_WS_PASSWORD
if (this.cs.accountInfo.account.record_all_calls || this.cs.application.record_all_calls) {
if (!JAMBONZ_RECORD_WS_BASE_URL || !this.cs.accountInfo.account.bucket_credential) {
this.logger.error('_initRecord: invalid cfg - missing JAMBONZ_RECORD_WS_BASE_URL or bucket config');
return undefined;
}
const listenOpts = {
url: `${JAMBONZ_RECORD_WS_BASE_URL}/record/${this.cs.accountInfo.account.bucket_credential.vendor}`,
disableBidirectionalAudio: true,
mixType : 'stereo',
passDtmf: true
};
if (JAMBONZ_RECORD_WS_USERNAME && JAMBONZ_RECORD_WS_PASSWORD) {
listenOpts.wsAuth = {
username: JAMBONZ_RECORD_WS_USERNAME,
password: JAMBONZ_RECORD_WS_PASSWORD
};
}
this.logger.debug({listenOpts}, '_initRecord: enabling listen');
return await this._initListen({verb: 'listen', ...listenOpts}, 'jambonz-session-record', true, 'record');
}
this.logger.debug({listenOpts}, '_initRecord: enabling listen');
return await this._initListen({verb: 'listen', ...listenOpts}, 'jambonz-session-record', true, 'record');
}
// Initiate Transcribe

View File

@@ -120,8 +120,17 @@ module.exports = (logger) => {
}
pingProxies(srf);
// Note: in response to SIGUSR1 we start drying up but do not exit when calls reach zero.
// This is to allow external scripts that sent the signal to manage the lifecycle.
// if we have zero calls, we can complete the scale-in right
setTimeout(() => {
const calls = srf.locals.sessionTracker.count;
if (calls === 0) {
logger.info('scale-in can complete immediately as we have no calls in progress');
process.exit(0);
}
else {
logger.info(`${calls} calls in progress; scale-in will complete when they are done`);
}
}, 5000);
});
}

View File

@@ -1085,6 +1085,13 @@ module.exports = (logger) => {
...(keyterms && keyterms.length > 0 && {DEEPGRAMFLUX_SPEECH_KEYTERMS: keyterms.join(',')}),
};
}
else if ('gladia' === vendor) {
const {host, path} = sttCredentials;
opts = {
GLADIA_SPEECH_HOST: host,
GLADIA_SPEECH_PATH: path,
};
}
else if ('soniox' === vendor) {
const {sonioxOptions = {}} = rOpts;
const {storage = {}} = sonioxOptions;
@@ -1310,9 +1317,6 @@ module.exports = (logger) => {
...(openaiOptions.turn_detection.silence_duration_ms && {
OPENAI_TURN_DETECTION_SILENCE_DURATION_MS: openaiOptions.turn_detection.silence_duration_ms
}),
...(openaiOptions.turn_detection.eagerness && {
OPENAI_TURN_DETECTION_EAGERNESS: openaiOptions.turn_detection.eagerness
})
};
}
}

View File

@@ -220,7 +220,8 @@ class TtsStreamingBuffer extends Emitter {
this.queue.shift();
}
// Immediately send all accumulated text (ignoring sentence boundaries).
if (flushText.length > 0) {
// Skip sending if flushText is only whitespace.
if (flushText.length > 0 && !isWhitespace(flushText)) {
const modifiedFlushText = flushText.replace(/\n\n/g, '\n \n');
try {
await this._api(this.ep, [this.ep.uuid, 'send', modifiedFlushText]);

8
package-lock.json generated
View File

@@ -18,7 +18,7 @@
"@jambonz/speech-utils": "^0.2.26",
"@jambonz/stats-collector": "^0.1.10",
"@jambonz/time-series": "^0.2.15",
"@jambonz/verb-specifications": "^0.0.125",
"@jambonz/verb-specifications": "^0.0.122",
"@modelcontextprotocol/sdk": "^1.9.0",
"@opentelemetry/api": "^1.8.0",
"@opentelemetry/exporter-jaeger": "^1.23.0",
@@ -1529,9 +1529,9 @@
}
},
"node_modules/@jambonz/verb-specifications": {
"version": "0.0.125",
"resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.125.tgz",
"integrity": "sha512-lU1fyyYyjXOdIfQ2gmOFmssZASYNu6LD066iXjqFrBJpiI7shkprcZ1qeWGibuEk9nR2k+em3/YL31Wc8L4wvA==",
"version": "0.0.122",
"resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.122.tgz",
"integrity": "sha512-7xqaULhKFywJ2ZuyiYt77iiJwJ+8b98Zt1X4+OqZ7Cdjhfo7S6KnR66XRVJHnekXbmfVv58kB0KWUux5TG//Sw==",
"license": "MIT",
"dependencies": {
"debug": "^4.3.4",

View File

@@ -34,7 +34,7 @@
"@jambonz/speech-utils": "^0.2.26",
"@jambonz/stats-collector": "^0.1.10",
"@jambonz/time-series": "^0.2.15",
"@jambonz/verb-specifications": "^0.0.125",
"@jambonz/verb-specifications": "^0.0.122",
"@modelcontextprotocol/sdk": "^1.9.0",
"@opentelemetry/api": "^1.8.0",
"@opentelemetry/exporter-jaeger": "^1.23.0",

View File

@@ -4,6 +4,7 @@ require('./ws-requestor-unit-test');
require('./http-requestor-retry-test');
require('./http-requestor-unit-test');
require('./unit-tests');
require('./tts-streaming-buffer-test');
require('./docker_start');
require('./create-test-db');
require('./account-validation-tests');

View File

@@ -0,0 +1,177 @@
const test = require('tape');
const sinon = require('sinon');
const noop = () => {};
const logger = {
error: noop,
info: noop,
debug: noop
};
const {
TtsStreamingConnectionStatus
} = require('../lib/utils/constants.json');
const TtsStreamingBuffer = require('../lib/utils/tts-streaming-buffer');
// Helper to create a mock CallSession
function createMockCs(options = {}) {
const mockEp = {
uuid: 'test-uuid-1234',
api: sinon.stub().resolves({ body: '+OK' }),
addCustomEventListener: sinon.stub(),
removeCustomEventListener: sinon.stub()
};
return {
logger,
ep: mockEp,
isTtsStreamOpen: options.isTtsStreamOpen !== undefined ? options.isTtsStreamOpen : true,
getTsStreamingVendor: () => options.vendor || 'deepgram'
};
}
/**
* BUG REPRODUCTION TEST
*
* This test reproduces the exact issue from production logs:
* {
* "args": ["uuid", "send", " "],
* "msg": "Error calling uuid_deepgram_tts_streaming: -USAGE: <uuid> connect|send|clear|close [tokens]"
* }
*
* Root cause: When multiple flushes are queued while connecting, and a space token
* gets buffered between flushes, Phase 1 of _feedQueue sends that space to the TTS vendor.
*
* Sequence:
* 1. bufferTokens('Hello.') while connecting
* 2. flush() while connecting
* 3. bufferTokens(' ') while connecting (passes because bufferedLength > 0)
* 4. flush() while connecting
* 5. Connection completes, _feedQueue processes: [text:Hello., flush, text:" ", flush]
* 6. First flush sends "Hello." - OK
* 7. Second flush sends " " - BUG!
*/
test('TtsStreamingBuffer: multiple flushes while connecting - space token sent to TTS vendor', async(t) => {
const cs = createMockCs();
const buffer = new TtsStreamingBuffer(cs);
buffer._connectionStatus = TtsStreamingConnectionStatus.Connecting;
buffer.vendor = 'deepgram';
const apiCalls = [];
const originalApi = buffer._api.bind(buffer);
buffer._api = async function(ep, args) {
apiCalls.push({ args: [...args] });
return originalApi(ep, args);
};
// First batch while connecting
await buffer.bufferTokens('Hello.');
buffer.flush();
// Second batch - just a space (passes because bufferedLength > 0)
await buffer.bufferTokens(' ');
buffer.flush();
// Verify queue state before connect
t.equal(buffer.queue.length, 4, 'queue should have 4 items: [text, flush, text, flush]');
t.equal(buffer.queue[0].type, 'text', 'first item should be text');
t.equal(buffer.queue[0].value, 'Hello.', 'first text should be "Hello."');
t.equal(buffer.queue[1].type, 'flush', 'second item should be flush');
t.equal(buffer.queue[2].type, 'text', 'third item should be text');
t.equal(buffer.queue[2].value, ' ', 'third item should be space');
t.equal(buffer.queue[3].type, 'flush', 'fourth item should be flush');
// Connect - triggers _feedQueue
buffer._connectionStatus = TtsStreamingConnectionStatus.Connected;
await buffer._feedQueue();
// Check API calls
const sendCalls = apiCalls.filter(call => call.args[1] === 'send');
// This assertion will FAIL until the bug is fixed
const whitespaceOnlySends = sendCalls.filter(call => /^\s*$/.test(call.args[2]));
t.equal(whitespaceOnlySends.length, 0,
`should not send whitespace-only tokens, but sent: ${whitespaceOnlySends.map(c => JSON.stringify(c.args[2])).join(', ')}`);
t.end();
});
/**
* Additional test: Verify text with trailing space in same flush is OK
*/
test('TtsStreamingBuffer: text with trailing space in same flush should work', async(t) => {
const cs = createMockCs();
const buffer = new TtsStreamingBuffer(cs);
buffer._connectionStatus = TtsStreamingConnectionStatus.Connecting;
buffer.vendor = 'deepgram';
const apiCalls = [];
const originalApi = buffer._api.bind(buffer);
buffer._api = async function(ep, args) {
apiCalls.push({ args: [...args] });
return originalApi(ep, args);
};
// Buffer text with trailing space, then flush
await buffer.bufferTokens('Hello.');
await buffer.bufferTokens(' ');
buffer.flush();
// Connect
buffer._connectionStatus = TtsStreamingConnectionStatus.Connected;
await buffer._feedQueue();
const sendCalls = apiCalls.filter(call => call.args[1] === 'send');
t.equal(sendCalls.length, 1, 'should have one send call');
t.equal(sendCalls[0].args[2], 'Hello. ', 'should send "Hello. " (text with trailing space)');
t.end();
});
/**
* Test: Leading whitespace should be discarded when buffer is empty
*/
test('TtsStreamingBuffer: leading whitespace discarded when buffer empty', async(t) => {
const cs = createMockCs();
const buffer = new TtsStreamingBuffer(cs);
buffer._connectionStatus = TtsStreamingConnectionStatus.Connected;
buffer.vendor = 'deepgram';
// Try to buffer whitespace when buffer is empty
const result = await buffer.bufferTokens(' ');
t.equal(result.status, 'ok', 'should return ok status');
t.equal(buffer.bufferedLength, 0, 'buffer should remain empty');
t.equal(buffer.queue.length, 0, 'queue should remain empty');
t.end();
});
/**
* Test: Whitespace can be buffered when buffer has content
*/
test('TtsStreamingBuffer: whitespace accepted when buffer has content', async(t) => {
const cs = createMockCs();
const buffer = new TtsStreamingBuffer(cs);
buffer._connectionStatus = TtsStreamingConnectionStatus.Connecting;
buffer.vendor = 'deepgram';
// Buffer real text first
await buffer.bufferTokens('Hello');
// Now buffer whitespace (should pass because bufferedLength > 0)
const result = await buffer.bufferTokens(' ');
t.equal(result.status, 'ok', 'should return ok status');
t.equal(buffer.bufferedLength, 6, 'buffer should have 6 chars');
t.equal(buffer.queue.length, 2, 'queue should have 2 items');
t.end();
});