fix race condition if transcribe_bugname is saved in ep

This commit is contained in:
Quan HL
2024-01-09 19:49:55 +07:00
parent a2ba80a9a3
commit d4939131a1

View File

@@ -41,6 +41,7 @@ class TaskTranscribe extends SttTask {
/* buffer speech for continuous asr */ /* buffer speech for continuous asr */
this._bufferedTranscripts = []; this._bufferedTranscripts = [];
this.bugname_prefix = 'transcribe_'; this.bugname_prefix = 'transcribe_';
this.bugnames = new Map();
} }
get name() { return TaskName.Transcribe; } get name() { return TaskName.Transcribe; }
@@ -89,13 +90,13 @@ class TaskTranscribe extends SttTask {
stopTranscription = true; stopTranscription = true;
this.ep.stopTranscription({ this.ep.stopTranscription({
vendor: this.vendor, vendor: this.vendor,
bugname: this.ep.transcribe_bugname bugname: this.bugnames.get(1)
}) })
.catch((err) => this.logger.info(err, 'Error TaskTranscribe:kill')); .catch((err) => this.logger.info(err, 'Error TaskTranscribe:kill'));
} }
if (this.separateRecognitionPerChannel && this.ep2 && this.ep2.connected) { if (this.separateRecognitionPerChannel && this.ep2 && this.ep2.connected) {
stopTranscription = true; stopTranscription = true;
this.ep2.stopTranscription({vendor: this.vendor, bugname: this.ep2.transcribe_bugname}) this.ep2.stopTranscription({vendor: this.vendor, bugname: this.bugnames.get(2)})
.catch((err) => this.logger.info(err, 'Error TaskTranscribe:kill')); .catch((err) => this.logger.info(err, 'Error TaskTranscribe:kill'));
} }
@@ -265,31 +266,32 @@ class TaskTranscribe extends SttTask {
} }
} }
// save dedicated bugname for each endpoint // save dedicated bugname for each endpoint
ep.transcribe_bugname = `${bugname}_${Date.now()}`; this.bugnames.set(channel, `${bugname}_${Date.now()}`);
/* common handler for all stt engine errors */ /* common handler for all stt engine errors */
this.addCustomEventListener(ep, JambonzTranscriptionEvents.Error, this._onJambonzError.bind(this, cs, ep)); this.addCustomEventListener(ep, JambonzTranscriptionEvents.Error, this._onJambonzError.bind(this, cs, ep, channel));
await ep.set(opts) await ep.set(opts)
.catch((err) => this.logger.info(err, 'Error setting channel variables')); .catch((err) => this.logger.info(err, 'Error setting channel variables'));
} }
async _startTranscribing(cs, ep, channel) { async _startTranscribing(cs, ep, channel) {
await this._setSpeechHandlers(cs, ep, channel); await this._setSpeechHandlers(cs, ep, channel);
await this._transcribe(ep); await this._transcribe(ep, channel);
/* start child span for this channel */ /* start child span for this channel */
const {span, ctx} = this.startChildSpan(`${STT_LISTEN_SPAN_NAME}:${channel}`); const {span, ctx} = this.startChildSpan(`${STT_LISTEN_SPAN_NAME}:${channel}`);
this.childSpan[channel - 1] = {span, ctx}; this.childSpan[channel - 1] = {span, ctx};
} }
async _transcribe(ep) { async _transcribe(ep, channel) {
this.logger.debug( this.logger.debug(
`TaskTranscribe:_transcribe - starting transcription vendor ${this.vendor} bugname ${ep.transcribe_bugname}`); `TaskTranscribe:_transcribe - starting transcription vendor
${this.vendor} bugname ${this.bugnames.get(channel)}`);
await ep.startTranscription({ await ep.startTranscription({
vendor: this.vendor, vendor: this.vendor,
interim: this.interim ? true : false, interim: this.interim ? true : false,
locale: this.language, locale: this.language,
channels: /*this.separateRecognitionPerChannel ? 2 : */ 1, channels: /*this.separateRecognitionPerChannel ? 2 : */ 1,
bugname: ep.transcribe_bugname, bugname: this.bugnames.get(channel),
hostport: this.hostport hostport: this.hostport
}); });
} }
@@ -297,7 +299,7 @@ class TaskTranscribe extends SttTask {
async _onTranscription(cs, ep, channel, evt, fsEvent) { async _onTranscription(cs, ep, channel, evt, fsEvent) {
// make sure this is not a transcript from answering machine detection // make sure this is not a transcript from answering machine detection
const bugname = fsEvent.getHeader('media-bugname'); const bugname = fsEvent.getHeader('media-bugname');
if (bugname && ep.transcribe_bugname !== bugname) return; if (bugname && this.bugnames.get(channel) !== bugname) return;
if (this.vendor === 'ibm' && evt?.state === 'listening') return; if (this.vendor === 'ibm' && evt?.state === 'listening') return;
@@ -330,7 +332,7 @@ class TaskTranscribe extends SttTask {
} }
else { else {
this.logger.info({evt}, 'TaskTranscribe:_onTranscription - got empty transcript, listen again'); this.logger.info({evt}, 'TaskTranscribe:_onTranscription - got empty transcript, listen again');
this._transcribe(ep); this._transcribe(ep, channel);
} }
return; return;
} }
@@ -409,7 +411,7 @@ class TaskTranscribe extends SttTask {
}); });
this.childSpan[channel - 1].span.end(); this.childSpan[channel - 1].span.end();
} }
this._transcribe(ep); this._transcribe(ep, channel);
/* start new child span for this channel */ /* start new child span for this channel */
const {span, ctx} = this.startChildSpan(`${STT_LISTEN_SPAN_NAME}:${channel}`); const {span, ctx} = this.startChildSpan(`${STT_LISTEN_SPAN_NAME}:${channel}`);
@@ -426,7 +428,7 @@ class TaskTranscribe extends SttTask {
this.childSpan[channel - 1].span.end(); this.childSpan[channel - 1].span.end();
} }
this._transcribe(ep); this._transcribe(ep, channel);
/* start new child span for this channel */ /* start new child span for this channel */
const {span, ctx} = this.startChildSpan(`${STT_LISTEN_SPAN_NAME}:${channel}`); const {span, ctx} = this.startChildSpan(`${STT_LISTEN_SPAN_NAME}:${channel}`);
@@ -440,12 +442,12 @@ class TaskTranscribe extends SttTask {
} }
} }
async _onJambonzError(cs, _ep, evt) { async _onJambonzError(cs, _ep, channel, evt) {
this.logger.info({evt}, 'TaskTranscribe:_onJambonzError'); this.logger.info({evt}, 'TaskTranscribe:_onJambonzError');
if (this.isHandledByPrimaryProvider && this.fallbackVendor) { if (this.isHandledByPrimaryProvider && this.fallbackVendor) {
_ep.stopTranscription({ _ep.stopTranscription({
vendor: this.vendor, vendor: this.vendor,
bugname: _ep.transcribe_bugname bugname: this.bugnames.get(channel)
}) })
.catch((err) => this.logger.error({err}, `Error stopping transcription for primary vendor ${this.vendor}`)); .catch((err) => this.logger.error({err}, `Error stopping transcription for primary vendor ${this.vendor}`));
const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, cs.srf); const {updateSpeechCredentialLastUsed} = require('../utils/db-utils')(this.logger, cs.srf);