Compare commits

..

2 Commits

Author SHA1 Message Date
rammohan-y
28ff85225f Fixed issue for punctuation (#1344)
https://github.com/jambonz/jambonz-feature-server/issues/1343
2025-09-03 13:33:38 -04:00
Dave Horton
f2fe7c4d24 Fix/playback race by fs generates playback (#1331)
* update to speech-utils that generates playback id

* modify tts and say task to track current playback id and match against start and stop events

* bump speech utils

* wip

* wip

* fix race condition where say with playbackId gets stop event from previous play from cache file

* logging

* wip

* fix comparison when playing cached files

* logging
2025-08-26 09:39:25 -04:00
5 changed files with 77 additions and 37 deletions

View File

@@ -710,7 +710,7 @@ class CallSession extends Emitter {
} }
hasGlobalSttPunctuation() { get hasGlobalSttPunctuation() {
return this._globalSttPunctuation !== undefined; return this._globalSttPunctuation !== undefined;
} }
@@ -1017,8 +1017,6 @@ class CallSession extends Emitter {
(type === 'tts' && credential.use_for_tts) || (type === 'tts' && credential.use_for_tts) ||
(type === 'stt' && credential.use_for_stt) (type === 'stt' && credential.use_for_stt)
)) { )) {
this.logger.debug(
`${type}: ${credential.vendor} ${credential.label ? `, label: ${credential.label}` : ''} `);
if ('google' === vendor) { if ('google' === vendor) {
if (type === 'tts' && !credential.tts_tested_ok || if (type === 'tts' && !credential.tts_tested_ok ||
type === 'stt' && !credential.stt_tested_ok) { type === 'stt' && !credential.stt_tested_ok) {

View File

@@ -5,6 +5,28 @@ const pollySSMLSplit = require('polly-ssml-split');
const { SpeechCredentialError } = require('../utils/error'); const { SpeechCredentialError } = require('../utils/error');
const { sleepFor } = require('../utils/helpers'); const { sleepFor } = require('../utils/helpers');
/**
* Discard unmatching responses:
* (1) I sent a playback id but get a response with a different playback id
* (2) I sent a playback id but get a response with no playback id
* (3) I did not send a playback id but get a response with a playback id
* (4) I sent a cache file but get a response with a different cache file
*/
const isMatchingEvent = (logger, filename, playbackId, evt) => {
if (!!playbackId && !!evt.variable_tts_playback_id && evt.variable_tts_playback_id === playbackId) {
//logger.debug({filename, playbackId, evt}, 'Say:isMatchingEvent - playbackId matched');
return true;
}
if (!!filename && !!evt.file && evt.file === filename) {
//logger.debug({filename, playbackId, evt}, 'Say:isMatchingEvent - filename matched');
return true;
}
logger.info({filename, playbackId, evt}, 'Say:isMatchingEvent - no match');
return false;
};
const breakLengthyTextIfNeeded = (logger, text) => { const breakLengthyTextIfNeeded = (logger, text) => {
// As The text can be used for tts streaming, we need to break lengthy text into smaller chunks // As The text can be used for tts streaming, we need to break lengthy text into smaller chunks
// HIGH_WATER_BUFFER_SIZE defined in tts-streaming-buffer.js // HIGH_WATER_BUFFER_SIZE defined in tts-streaming-buffer.js
@@ -259,40 +281,32 @@ class TaskSay extends TtsTask {
while (!this.killed && (this.loop === 'forever' || this.loop--) && ep?.connected) { while (!this.killed && (this.loop === 'forever' || this.loop--) && ep?.connected) {
let segment = 0; let segment = 0;
while (!this.killed && segment < filepath.length) { while (!this.killed && segment < filepath.length) {
const filename = filepath[segment];
if (cs.isInConference) { if (cs.isInConference) {
const {memberId, confName, confUuid} = cs; const {memberId, confName, confUuid} = cs;
await this.playToConfMember(ep, memberId, confName, confUuid, filepath[segment]); await this.playToConfMember(ep, memberId, confName, confUuid, filename);
} }
else { else {
let playbackId; const isStreaming = filename.startsWith('say:{');
const isStreaming = filepath[segment].startsWith('say:{');
if (isStreaming) { if (isStreaming) {
const arr = /^say:\{.*\}\s*(.*)$/.exec(filepath[segment]); const arr = /^say:\{.*\}\s*(.*)$/.exec(filename);
if (arr) this.logger.debug(`Say:exec sending streaming tts request: ${arr[1].substring(0, 64)}..`); if (arr) this.logger.debug(`Say:exec sending streaming tts request ${arr[1].substring(0, 64)}..`);
} else this.logger.debug(`Say:exec sending ${filename.substring(0, 64)}`);
else {
this.logger.debug(`Say:exec sending ${filepath[segment].substring(0, 64)}`);
} }
const onPlaybackStop = (evt) => { const onPlaybackStop = (evt) => {
try { try {
this.logger.debug({evt}, const playbackId = this.getPlaybackId(segment);
`Say got playback-stop ${evt.variable_tts_playback_id ? evt.variable_tts_playback_id : ''}`); const isMatch = isMatchingEvent(this.logger, filename, playbackId, evt);
if (!isMatch) {
/** this.logger.info({currentPlaybackId: playbackId, stopPlaybackId: evt.variable_tts_playback_id},
* If we got a playback id on both the start and stop events, and they don't match,
* then we must have received a playback-stop event for an earlier play request.
*/
const unmatchedResponse = (!!playbackId && !!evt.variable_tts_playback_id) &&
evt.variable_tts_playback_id !== playbackId;
if (unmatchedResponse) {
this.logger.info({currentPlaybackId: playbackId, stopPPlaybackId: evt.variable_tts_playback_id},
'Say:exec discarding playback-stop for earlier play'); 'Say:exec discarding playback-stop for earlier play');
ep.once('playback-stop', this._boundOnPlaybackStop); ep.once('playback-stop', this._boundOnPlaybackStop);
return; return;
} }
this.logger.debug({evt},
`Say got playback-stop ${evt.variable_tts_playback_id ? evt.variable_tts_playback_id : ''}`);
this.notifyStatus({event: 'stop-playback'}); this.notifyStatus({event: 'stop-playback'});
this.notifiedPlayBackStop = true; this.notifiedPlayBackStop = true;
const tts_error = evt.variable_tts_error; const tts_error = evt.variable_tts_error;
@@ -331,6 +345,7 @@ class TaskSay extends TtsTask {
!this.disableTtsCache !this.disableTtsCache
) { ) {
const text = parseTextFromSayString(this.text[segment]); const text = parseTextFromSayString(this.text[segment]);
this.logger.debug({text, cacheFile: evt.variable_tts_cache_filename}, 'Say:exec cache tts');
addFileToCache(evt.variable_tts_cache_filename, { addFileToCache(evt.variable_tts_cache_filename, {
account_sid, account_sid,
vendor, vendor,
@@ -358,9 +373,17 @@ class TaskSay extends TtsTask {
}; };
this._boundOnPlaybackStop = onPlaybackStop.bind(this); this._boundOnPlaybackStop = onPlaybackStop.bind(this);
ep.once('playback-start', (evt) => { const onPlaybackStart = (evt) => {
try { try {
playbackId = evt.variable_tts_playback_id; const playbackId = this.getPlaybackId(segment);
const isMatch = isMatchingEvent(this.logger, filename, playbackId, evt);
if (!isMatch) {
this.logger.info({currentPlaybackId: playbackId, startPlaybackId: evt.variable_tts_playback_id},
'Say:exec playback-start - unmatched playback_id');
ep.once('playback-start', this._boundOnPlaybackStart);
return;
}
ep.once('playback-stop', this._boundOnPlaybackStop);
this.logger.debug({evt}, this.logger.debug({evt},
`Say got playback-start ${evt.variable_tts_playback_id ? evt.variable_tts_playback_id : ''}`); `Say got playback-start ${evt.variable_tts_playback_id ? evt.variable_tts_playback_id : ''}`);
if (this.otelSpan) { if (this.otelSpan) {
@@ -374,15 +397,17 @@ class TaskSay extends TtsTask {
} catch (err) { } catch (err) {
this.logger.info({err}, 'Error handling playback-start event'); this.logger.info({err}, 'Error handling playback-start event');
} }
}); };
ep.once('playback-stop', this._boundOnPlaybackStop); this._boundOnPlaybackStart = onPlaybackStart.bind(this);
ep.once('playback-start', this._boundOnPlaybackStart);
// wait for playback-stop event received to confirm if the playback is successful // wait for playback-stop event received to confirm if the playback is successful
this._playPromise = new Promise((resolve, reject) => { this._playPromise = new Promise((resolve, reject) => {
this._playResolve = resolve; this._playResolve = resolve;
this._playReject = reject; this._playReject = reject;
}); });
const r = await ep.play(filepath[segment]); const r = await ep.play(filename);
this.logger.debug({r}, 'Say:exec play result'); this.logger.debug({r}, 'Say:exec play result');
try { try {
// wait for playback-stop event received to confirm if the playback is successful // wait for playback-stop event received to confirm if the playback is successful
@@ -400,12 +425,12 @@ class TaskSay extends TtsTask {
this._playResolve = null; this._playResolve = null;
this._playReject = null; this._playReject = null;
} }
if (filepath[segment].startsWith('say:{')) { if (filename.startsWith('say:{')) {
const arr = /^say:\{.*\}\s*(.*)$/.exec(filepath[segment]); const arr = /^say:\{.*\}\s*(.*)$/.exec(filename);
if (arr) this.logger.debug(`Say:exec complete playing streaming tts request: ${arr[1].substring(0, 64)}..`); if (arr) this.logger.debug(`Say:exec complete playing streaming tts request: ${arr[1].substring(0, 64)}..`);
} else { } else {
// This log will print spech credentials in say command for tts stream mode // This log will print spech credentials in say command for tts stream mode
this.logger.debug(`Say:exec completed play file ${filepath[segment]}`); this.logger.debug(`Say:exec completed play file ${filename}`);
} }
} }
segment++; segment++;

View File

@@ -3,6 +3,16 @@ const { TaskPreconditions } = require('../utils/constants');
const { SpeechCredentialError } = require('../utils/error'); const { SpeechCredentialError } = require('../utils/error');
const dbUtils = require('../utils/db-utils'); const dbUtils = require('../utils/db-utils');
const extractPlaybackId = (str) => {
// Match say:{...} and capture the content inside braces
const match = str.match(/say:\{([^}]*)\}/);
if (!match) return null;
// Look for playback_id=value within the captured content
const playbackMatch = match[1].match(/playback_id=([^,]*)/);
return playbackMatch ? playbackMatch[1] : null;
};
class TtsTask extends Task { class TtsTask extends Task {
constructor(logger, data, parentTask) { constructor(logger, data, parentTask) {
@@ -22,6 +32,11 @@ class TtsTask extends Task {
this.disableTtsCache = this.data.disableTtsCache; this.disableTtsCache = this.data.disableTtsCache;
this.options = this.synthesizer.options || {}; this.options = this.synthesizer.options || {};
this.instructions = this.data.instructions; this.instructions = this.data.instructions;
this.playbackIds = [];
}
getPlaybackId(offset) {
return this.playbackIds[offset];
} }
async exec(cs) { async exec(cs) {
@@ -280,6 +295,7 @@ class TtsTask extends Task {
renderForCaching: preCache renderForCaching: preCache
}); });
if (!filePath.startsWith('say:')) { if (!filePath.startsWith('say:')) {
this.playbackIds.push(null);
this.logger.debug(`Say: file ${filePath}, served from cache ${servedFromCache}`); this.logger.debug(`Say: file ${filePath}, served from cache ${servedFromCache}`);
if (filePath) cs.trackTmpFile(filePath); if (filePath) cs.trackTmpFile(filePath);
if (this.otelSpan) { if (this.otelSpan) {
@@ -309,7 +325,8 @@ class TtsTask extends Task {
} }
} }
else { else {
this.logger.debug('Say: a streaming tts api will be used'); this.playbackIds.push(extractPlaybackId(filePath));
this.logger.debug({playbackIds: this.playbackIds}, 'Say: a streaming tts api will be used');
const modifiedPath = filePath.replace('say:{', `say:{session-uuid=${ep.uuid},`); const modifiedPath = filePath.replace('say:{', `say:{session-uuid=${ep.uuid},`);
return modifiedPath; return modifiedPath;
} }

8
package-lock.json generated
View File

@@ -15,7 +15,7 @@
"@jambonz/http-health-check": "^0.0.1", "@jambonz/http-health-check": "^0.0.1",
"@jambonz/mw-registrar": "^0.2.7", "@jambonz/mw-registrar": "^0.2.7",
"@jambonz/realtimedb-helpers": "^0.8.15", "@jambonz/realtimedb-helpers": "^0.8.15",
"@jambonz/speech-utils": "^0.2.19", "@jambonz/speech-utils": "^0.2.22",
"@jambonz/stats-collector": "^0.1.10", "@jambonz/stats-collector": "^0.1.10",
"@jambonz/time-series": "^0.2.14", "@jambonz/time-series": "^0.2.14",
"@jambonz/verb-specifications": "^0.0.113", "@jambonz/verb-specifications": "^0.0.113",
@@ -1376,9 +1376,9 @@
} }
}, },
"node_modules/@jambonz/speech-utils": { "node_modules/@jambonz/speech-utils": {
"version": "0.2.19", "version": "0.2.22",
"resolved": "https://registry.npmjs.org/@jambonz/speech-utils/-/speech-utils-0.2.19.tgz", "resolved": "https://registry.npmjs.org/@jambonz/speech-utils/-/speech-utils-0.2.22.tgz",
"integrity": "sha512-7Sw2pgmsMg/3y3PRhRts/oQrtMlowNS1dn6DgduiHviKSclJNx8oY8S7X8wsBQCe3xdFZYEDxfn9vpcGm4lqZw==", "integrity": "sha512-heSKhoIEAbIjmzwo4CKLkpClGBYrLEo7tud5V0kj2Su3MmgBjCNkPh2WVrP0Qj4Ix8ROKXLASzApkrL60zwNYg==",
"dependencies": { "dependencies": {
"23": "^0.0.0", "23": "^0.0.0",
"@aws-sdk/client-polly": "^3.496.0", "@aws-sdk/client-polly": "^3.496.0",

View File

@@ -31,7 +31,7 @@
"@jambonz/http-health-check": "^0.0.1", "@jambonz/http-health-check": "^0.0.1",
"@jambonz/mw-registrar": "^0.2.7", "@jambonz/mw-registrar": "^0.2.7",
"@jambonz/realtimedb-helpers": "^0.8.15", "@jambonz/realtimedb-helpers": "^0.8.15",
"@jambonz/speech-utils": "^0.2.19", "@jambonz/speech-utils": "^0.2.22",
"@jambonz/stats-collector": "^0.1.10", "@jambonz/stats-collector": "^0.1.10",
"@jambonz/time-series": "^0.2.14", "@jambonz/time-series": "^0.2.14",
"@jambonz/verb-specifications": "^0.0.113", "@jambonz/verb-specifications": "^0.0.113",