Fix/dual legs transcribe race condition (#593)

* fs only stop one of bugname when transcribe is used for dual legs

* wip

* fix review comment

* wip

* wip
This commit is contained in:
Hoan Luu Huu
2024-01-07 07:12:51 +07:00
committed by GitHub
parent 8173a306f7
commit 9d70ed96a1

View File

@@ -89,13 +89,13 @@ class TaskTranscribe extends SttTask {
stopTranscription = true; stopTranscription = true;
this.ep.stopTranscription({ this.ep.stopTranscription({
vendor: this.vendor, vendor: this.vendor,
bugname: this.bugname bugname: this.ep.transcribe_bugname
}) })
.catch((err) => this.logger.info(err, 'Error TaskTranscribe:kill')); .catch((err) => this.logger.info(err, 'Error TaskTranscribe:kill'));
} }
if (this.separateRecognitionPerChannel && this.ep2 && this.ep2.connected) { if (this.separateRecognitionPerChannel && this.ep2 && this.ep2.connected) {
stopTranscription = true; stopTranscription = true;
this.ep2.stopTranscription({vendor: this.vendor}) this.ep2.stopTranscription({vendor: this.vendor, bugname: this.ep2.transcribe_bugname})
.catch((err) => this.logger.info(err, 'Error TaskTranscribe:kill')); .catch((err) => this.logger.info(err, 'Error TaskTranscribe:kill'));
} }
@@ -132,6 +132,7 @@ class TaskTranscribe extends SttTask {
async _setSpeechHandlers(cs, ep, channel) { async _setSpeechHandlers(cs, ep, channel) {
if (this[`_speechHandlersSet_${channel}`]) return; if (this[`_speechHandlersSet_${channel}`]) return;
this[`_speechHandlersSet_${channel}`] = true; this[`_speechHandlersSet_${channel}`] = true;
let bugname;
/* some special deepgram logic */ /* some special deepgram logic */
if (this.vendor === 'deepgram') { if (this.vendor === 'deepgram') {
@@ -141,7 +142,7 @@ class TaskTranscribe extends SttTask {
const opts = this.setChannelVarsForStt(this, this.sttCredentials, this.data.recognizer); const opts = this.setChannelVarsForStt(this, this.sttCredentials, this.data.recognizer);
switch (this.vendor) { switch (this.vendor) {
case 'google': case 'google':
this.bugname = `${this.bugname_prefix}google_transcribe`; bugname = `${this.bugname_prefix}google_transcribe`;
this.addCustomEventListener(ep, GoogleTranscriptionEvents.Transcription, this.addCustomEventListener(ep, GoogleTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel)); this._onTranscription.bind(this, cs, ep, channel));
this.addCustomEventListener(ep, GoogleTranscriptionEvents.NoAudioDetected, this.addCustomEventListener(ep, GoogleTranscriptionEvents.NoAudioDetected,
@@ -152,7 +153,7 @@ class TaskTranscribe extends SttTask {
case 'aws': case 'aws':
case 'polly': case 'polly':
this.bugname = `${this.bugname_prefix}aws_transcribe`; bugname = `${this.bugname_prefix}aws_transcribe`;
this.addCustomEventListener(ep, AwsTranscriptionEvents.Transcription, this.addCustomEventListener(ep, AwsTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel)); this._onTranscription.bind(this, cs, ep, channel));
this.addCustomEventListener(ep, AwsTranscriptionEvents.NoAudioDetected, this.addCustomEventListener(ep, AwsTranscriptionEvents.NoAudioDetected,
@@ -161,19 +162,19 @@ class TaskTranscribe extends SttTask {
this._onMaxDurationExceeded.bind(this, cs, ep, channel)); this._onMaxDurationExceeded.bind(this, cs, ep, channel));
break; break;
case 'microsoft': case 'microsoft':
this.bugname = `${this.bugname_prefix}azure_transcribe`; bugname = `${this.bugname_prefix}azure_transcribe`;
this.addCustomEventListener(ep, AzureTranscriptionEvents.Transcription, this.addCustomEventListener(ep, AzureTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel)); this._onTranscription.bind(this, cs, ep, channel));
this.addCustomEventListener(ep, AzureTranscriptionEvents.NoSpeechDetected, this.addCustomEventListener(ep, AzureTranscriptionEvents.NoSpeechDetected,
this._onNoAudio.bind(this, cs, ep, channel)); this._onNoAudio.bind(this, cs, ep, channel));
break; break;
case 'nuance': case 'nuance':
this.bugname = `${this.bugname_prefix}nuance_transcribe`; bugname = `${this.bugname_prefix}nuance_transcribe`;
this.addCustomEventListener(ep, NuanceTranscriptionEvents.Transcription, this.addCustomEventListener(ep, NuanceTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel)); this._onTranscription.bind(this, cs, ep, channel));
break; break;
case 'deepgram': case 'deepgram':
this.bugname = `${this.bugname_prefix}deepgram_transcribe`; bugname = `${this.bugname_prefix}deepgram_transcribe`;
this.addCustomEventListener(ep, DeepgramTranscriptionEvents.Transcription, this.addCustomEventListener(ep, DeepgramTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel)); this._onTranscription.bind(this, cs, ep, channel));
this.addCustomEventListener(ep, DeepgramTranscriptionEvents.Connect, this.addCustomEventListener(ep, DeepgramTranscriptionEvents.Connect,
@@ -186,12 +187,12 @@ class TaskTranscribe extends SttTask {
break; break;
case 'soniox': case 'soniox':
this.bugname = `${this.bugname_prefix}soniox_transcribe`; bugname = `${this.bugname_prefix}soniox_transcribe`;
this.addCustomEventListener(ep, SonioxTranscriptionEvents.Transcription, this.addCustomEventListener(ep, SonioxTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel)); this._onTranscription.bind(this, cs, ep, channel));
break; break;
case 'cobalt': case 'cobalt':
this.bugname = `${this.bugname_prefix}cobalt_transcribe`; bugname = `${this.bugname_prefix}cobalt_transcribe`;
this.addCustomEventListener(ep, CobaltTranscriptionEvents.Transcription, this.addCustomEventListener(ep, CobaltTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel)); this._onTranscription.bind(this, cs, ep, channel));
@@ -221,7 +222,7 @@ class TaskTranscribe extends SttTask {
break; break;
case 'ibm': case 'ibm':
this.bugname = `${this.bugname_prefix}ibm_transcribe`; bugname = `${this.bugname_prefix}ibm_transcribe`;
this.addCustomEventListener(ep, IbmTranscriptionEvents.Transcription, this.addCustomEventListener(ep, IbmTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel)); this._onTranscription.bind(this, cs, ep, channel));
this.addCustomEventListener(ep, IbmTranscriptionEvents.Connect, this.addCustomEventListener(ep, IbmTranscriptionEvents.Connect,
@@ -231,13 +232,13 @@ class TaskTranscribe extends SttTask {
break; break;
case 'nvidia': case 'nvidia':
this.bugname = `${this.bugname_prefix}nvidia_transcribe`; bugname = `${this.bugname_prefix}nvidia_transcribe`;
this.addCustomEventListener(ep, NvidiaTranscriptionEvents.Transcription, this.addCustomEventListener(ep, NvidiaTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel)); this._onTranscription.bind(this, cs, ep, channel));
break; break;
case 'assemblyai': case 'assemblyai':
this.bugname = `${this.bugname_prefix}assemblyai_transcribe`; bugname = `${this.bugname_prefix}assemblyai_transcribe`;
this.addCustomEventListener(ep, AssemblyAiTranscriptionEvents.Transcription, this.addCustomEventListener(ep, AssemblyAiTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel)); this._onTranscription.bind(this, cs, ep, channel));
this.addCustomEventListener(ep, this.addCustomEventListener(ep,
@@ -249,7 +250,7 @@ class TaskTranscribe extends SttTask {
default: default:
if (this.vendor.startsWith('custom:')) { if (this.vendor.startsWith('custom:')) {
this.bugname = `${this.bugname_prefix}${this.vendor}_transcribe`; bugname = `${this.bugname_prefix}${this.vendor}_transcribe`;
this.addCustomEventListener(ep, JambonzTranscriptionEvents.Transcription, this.addCustomEventListener(ep, JambonzTranscriptionEvents.Transcription,
this._onTranscription.bind(this, cs, ep, channel)); this._onTranscription.bind(this, cs, ep, channel));
this.addCustomEventListener(ep, JambonzTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep)); this.addCustomEventListener(ep, JambonzTranscriptionEvents.Connect, this._onVendorConnect.bind(this, cs, ep));
@@ -263,7 +264,8 @@ class TaskTranscribe extends SttTask {
throw new Error(`Invalid vendor ${this.vendor}`); throw new Error(`Invalid vendor ${this.vendor}`);
} }
} }
// save dedicated bugname for each endpoint
ep.transcribe_bugname = `${bugname}_${Date.now()}`;
/* common handler for all stt engine errors */ /* common handler for all stt engine errors */
this.addCustomEventListener(ep, JambonzTranscriptionEvents.Error, this._onJambonzError.bind(this, cs, ep)); this.addCustomEventListener(ep, JambonzTranscriptionEvents.Error, this._onJambonzError.bind(this, cs, ep));
await ep.set(opts) await ep.set(opts)
@@ -281,13 +283,13 @@ class TaskTranscribe extends SttTask {
async _transcribe(ep) { async _transcribe(ep) {
this.logger.debug( this.logger.debug(
`TaskTranscribe:_transcribe - starting transcription vendor ${this.vendor} bugname ${this.bugname}`); `TaskTranscribe:_transcribe - starting transcription vendor ${this.vendor} bugname ${ep.transcribe_bugname}`);
await ep.startTranscription({ await ep.startTranscription({
vendor: this.vendor, vendor: this.vendor,
interim: this.interim ? true : false, interim: this.interim ? true : false,
locale: this.language, locale: this.language,
channels: /*this.separateRecognitionPerChannel ? 2 : */ 1, channels: /*this.separateRecognitionPerChannel ? 2 : */ 1,
bugname: this.bugname, bugname: ep.transcribe_bugname,
hostport: this.hostport hostport: this.hostport
}); });
} }
@@ -295,7 +297,7 @@ class TaskTranscribe extends SttTask {
async _onTranscription(cs, ep, channel, evt, fsEvent) { async _onTranscription(cs, ep, channel, evt, fsEvent) {
// make sure this is not a transcript from answering machine detection // make sure this is not a transcript from answering machine detection
const bugname = fsEvent.getHeader('media-bugname'); const bugname = fsEvent.getHeader('media-bugname');
if (bugname && this.bugname !== bugname) return; if (bugname && ep.transcribe_bugname !== bugname) return;
if (this.vendor === 'ibm' && evt?.state === 'listening') return; if (this.vendor === 'ibm' && evt?.state === 'listening') return;
@@ -443,7 +445,7 @@ class TaskTranscribe extends SttTask {
if (this.isHandledByPrimaryProvider && this.fallbackVendor) { if (this.isHandledByPrimaryProvider && this.fallbackVendor) {
_ep.stopTranscription({ _ep.stopTranscription({
vendor: this.vendor, vendor: this.vendor,
bugname: this.bugname bugname: _ep.transcribe_bugname
}) })
.catch((err) => this.logger.error({err}, `Error stopping transcription for primary vendor ${this.vendor}`)); .catch((err) => this.logger.error({err}, `Error stopping transcription for primary vendor ${this.vendor}`));
const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, cs.srf); const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, cs.srf);