support say stream with text (#1227)

* support say stream with text

* wip

* wip

* wip

* wip

* update verb  specification
This commit is contained in:
Hoan Luu Huu
2025-06-10 21:56:44 +07:00
committed by GitHub
parent 29708a1f7c
commit b0b74871e7
5 changed files with 103 additions and 19 deletions

View File

@@ -220,6 +220,18 @@ class CallSession extends Emitter {
this._synthesizer = synth;
}
/**
* Say stream enabled
*/
get autoStreamTts() {
return this._autoStreamTts || false;
}
set autoStreamTts(i) {
this._autoStreamTts = i;
}
/**
* ASR TTS fallback
*/
@@ -1799,6 +1811,10 @@ Duration=${duration} `
.catch((err) => this.logger.debug({err}, 'CallSession:_notifyTaskStatus - Error sending'));
}
async _internalTtsStreamingBufferTokens(tokens) {
return await this.ttsStreamingBuffer?.bufferTokens(tokens) || {status: 'failed', reason: 'no tts streaming buffer'};
}
_lccTtsFlush(opts) {
this.ttsStreamingBuffer?.flush(opts);
}

View File

@@ -17,7 +17,8 @@ class TaskConfig extends Task {
'actionHookDelayAction',
'boostAudioSignal',
'vad',
'ttsStream'
'ttsStream',
'autoStreamTts'
].forEach((k) => this[k] = this.data[k] || {});
if ('notifyEvents' in this.data) {
@@ -117,6 +118,7 @@ class TaskConfig extends Task {
if (this.hasTtsStream) {
phrase.push(`${this.ttsStream.enable ? 'enable' : 'disable'} ttsStream`);
}
if ('autoStreamTts' in this.data) phrase.push(`enable Say.stream value ${this.data.autoStreamTts ? 'on' : 'off'}`);
return `${this.name}{${phrase.join(',')}}`;
}
@@ -296,6 +298,11 @@ class TaskConfig extends Task {
});
}
if ('autoStreamTts' in this.data) {
this.logger.info(`Config: autoStreamTts set to ${this.data.autoStreamTts}`);
cs.autoStreamTts = this.data.autoStreamTts;
}
if (this.hasFillerNoise) {
const {enable, ...opts} = this.fillerNoise;
this.logger.info({fillerNoise: this.fillerNoise}, 'Config: fillerNoise');
@@ -330,7 +337,9 @@ class TaskConfig extends Task {
};
this.logger.info({opts: this.gatherOpts}, 'Config: enabling ttsStream');
cs.enableBackgroundTtsStream(this.sayOpts);
} else if (!this.ttsStream.enable) {
}
// only disable ttsStream if it specifically set to false
else if (this.ttsStream.enable === false) {
this.logger.info('Config: disabling ttsStream');
cs.disableTtsStream();
}

View File

@@ -3,24 +3,32 @@ const TtsTask = require('./tts-task');
const {TaskName, TaskPreconditions} = require('../utils/constants');
const pollySSMLSplit = require('polly-ssml-split');
const { SpeechCredentialError } = require('../utils/error');
const { sleepFor } = require('../utils/helpers');
const breakLengthyTextIfNeeded = (logger, text) => {
const chunkSize = 1000;
const breakLengthyTextIfNeeded = (logger, text) => {
// As The text can be used for tts streaming, we need to break lengthy text into smaller chunks
// HIGH_WATER_BUFFER_SIZE defined in tts-streaming-buffer.js
const chunkSize = 900;
const isSSML = text.startsWith('<speak>');
if (text.length <= chunkSize || !isSSML) return [text];
const options = {
// MIN length
softLimit: 100,
// MAX length, exclude 15 characters <speak></speak>
hardLimit: chunkSize - 15,
// Set of extra split characters (Optional property)
extraSplitChars: ',;!?',
};
pollySSMLSplit.configure(options);
try {
return pollySSMLSplit.split(text);
if (text.length <= chunkSize) return [text];
if (isSSML) {
return pollySSMLSplit.split(text);
} else {
// Wrap with <speak> and split
const wrapped = `<speak>${text}</speak>`;
const splitArr = pollySSMLSplit.split(wrapped);
// Remove <speak> and </speak> from each chunk
return splitArr.map((str) => str.replace(/^<speak>/, '').replace(/<\/speak>$/, ''));
}
} catch (err) {
logger.info({err}, 'Error spliting SSML long text');
logger.info({err}, 'Error splitting SSML long text');
return [text];
}
};
@@ -39,6 +47,9 @@ class TaskSay extends TtsTask {
assert.ok((typeof this.data.text === 'string' || Array.isArray(this.data.text)) || this.data.stream === true,
'Say: either text or stream:true is required');
this.text = this.data.text ? (Array.isArray(this.data.text) ? this.data.text : [this.data.text])
.map((t) => breakLengthyTextIfNeeded(this.logger, t))
.flat() : [];
if (this.data.stream === true) {
this._isStreamingTts = true;
@@ -46,10 +57,6 @@ class TaskSay extends TtsTask {
}
else {
this._isStreamingTts = false;
this.text = (Array.isArray(this.data.text) ? this.data.text : [this.data.text])
.map((t) => breakLengthyTextIfNeeded(this.logger, t))
.flat();
this.loop = this.data.loop || 1;
this.isHandledByPrimaryProvider = true;
}
@@ -85,6 +92,10 @@ class TaskSay extends TtsTask {
}
try {
this._isStreamingTts = this._isStreamingTts || cs.autoStreamTts;
if (this.isStreamingTts) {
this.closeOnStreamEmpty = this.closeOnStreamEmpty || this.text.length !== 0;
}
if (this.isStreamingTts) await this.handlingStreaming(cs, obj);
else await this.handling(cs, obj);
this.emit('playDone');
@@ -116,6 +127,54 @@ class TaskSay extends TtsTask {
cs.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_open'})
.catch((err) => this.logger.info({err}, 'TaskSay:handlingStreaming - Error sending'));
if (this.text.length !== 0) {
this.logger.info('TaskSay:handlingStreaming - sending text to TTS stream');
for (const t of this.text) {
const result = await cs._internalTtsStreamingBufferTokens(t);
if (result?.status === 'failed') {
if (result.reason === 'full') {
// Retry logic for full buffer
const maxRetries = 5;
let backoffMs = 1000;
for (let retryCount = 0; retryCount < maxRetries && !this.killed; retryCount++) {
this.logger.info(
`TaskSay:handlingStreaming - retry ${retryCount + 1}/${maxRetries} after ${backoffMs}ms`);
await sleepFor(backoffMs);
const retryResult = await cs._internalTtsStreamingBufferTokens(t);
// Exit retry loop on success
if (retryResult?.status !== 'failed') {
break;
}
// Handle failure for reason other than full buffer
if (retryResult.reason !== 'full') {
this.logger.info(
{result: retryResult}, 'TaskSay:handlingStreaming - TTS stream failed to buffer tokens');
throw new Error(`TTS stream failed to buffer tokens: ${retryResult.reason}`);
}
// Last retry attempt failed
if (retryCount === maxRetries - 1) {
this.logger.info('TaskSay:handlingStreaming - Maximum retries exceeded for full buffer');
throw new Error('TTS stream buffer full - maximum retries exceeded');
}
// Increase backoff for next retry
backoffMs = Math.min(backoffMs * 1.5, 10000);
}
} else {
// Immediate failure for non-full buffer issues
this.logger.info({result}, 'TaskSay:handlingStreaming - TTS stream failed to buffer tokens');
throw new Error(`TTS stream failed to buffer tokens: ${result.reason}`);
}
} else {
await cs._lccTtsFlush();
}
}
}
} catch (err) {
this.logger.info({err}, 'TaskSay:handlingStreaming - Error setting channel vars');
cs.requestor?.request('tts:streaming-event', '/streaming-event', {event_type: 'stream_closed'})

8
package-lock.json generated
View File

@@ -18,7 +18,7 @@
"@jambonz/speech-utils": "^0.2.11",
"@jambonz/stats-collector": "^0.1.10",
"@jambonz/time-series": "^0.2.13",
"@jambonz/verb-specifications": "^0.0.104",
"@jambonz/verb-specifications": "^0.0.105",
"@modelcontextprotocol/sdk": "^1.9.0",
"@opentelemetry/api": "^1.8.0",
"@opentelemetry/exporter-jaeger": "^1.23.0",
@@ -1504,9 +1504,9 @@
}
},
"node_modules/@jambonz/verb-specifications": {
"version": "0.0.104",
"resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.104.tgz",
"integrity": "sha512-G1LjK6ISujdg0zALudtUvdaPXmvA4FU6x3s8S9MwUbWbFo2WERMUcNOgQAutDZwOMrLH9DnbPL8ZIdnTCKnlkA==",
"version": "0.0.105",
"resolved": "https://registry.npmjs.org/@jambonz/verb-specifications/-/verb-specifications-0.0.105.tgz",
"integrity": "sha512-MD6RMJyXMoHpR7Wl3xmYmU54P0eF/9LNywRNNsdkAmSf0EogFqSJft4xD/yGeRWlO5O6eAYZEJdaMQeLSxitcg==",
"license": "MIT",
"dependencies": {
"debug": "^4.3.4",

View File

@@ -34,7 +34,7 @@
"@jambonz/speech-utils": "^0.2.11",
"@jambonz/stats-collector": "^0.1.10",
"@jambonz/time-series": "^0.2.13",
"@jambonz/verb-specifications": "^0.0.104",
"@jambonz/verb-specifications": "^0.0.105",
"@modelcontextprotocol/sdk": "^1.9.0",
"@opentelemetry/api": "^1.8.0",
"@opentelemetry/exporter-jaeger": "^1.23.0",