diff --git a/lib/tasks/gather.js b/lib/tasks/gather.js index 9e3ff309..5ca7738c 100644 --- a/lib/tasks/gather.js +++ b/lib/tasks/gather.js @@ -1161,7 +1161,7 @@ class TaskGather extends SttTask { } async _startFallback(cs, ep, evt) { - if (this.canFallback) { + if (this.canFallback()) { this._stopTranscribing(ep); try { this.logger.debug('gather:_startFallback'); diff --git a/lib/tasks/stt-task.js b/lib/tasks/stt-task.js index 79201185..06cd4992 100644 --- a/lib/tasks/stt-task.js +++ b/lib/tasks/stt-task.js @@ -171,7 +171,7 @@ class SttTask extends Task { try { this.sttCredentials = await this._initSpeechCredentials(this.cs, this.vendor, this.label); } catch (error) { - if (this.canFallback) { + if (this.canFallback()) { this.notifyError( { msg: 'ASR error', details:`Invalid vendor ${this.vendor}, Error: ${error}`, @@ -260,8 +260,19 @@ class SttTask extends Task { ep.addCustomEventListener(event, handler); } - removeCustomEventListeners() { - this.eventHandlers.forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler)); + removeCustomEventListeners(ep) { + if (ep) { + // for specific endpoint + this.eventHandlers.filter((h) => h.ep === ep).forEach((h) => { + h.ep.removeCustomEventListener(h.event, h.handler); + }); + this.eventHandlers = this.eventHandlers.filter((h) => h.ep !== ep); + return; + } else { + // for all endpoints + this.eventHandlers.forEach((h) => h.ep.removeCustomEventListener(h.event, h.handler)); + this.eventHandlers = []; + } } async _initSpeechCredentials(cs, vendor, label) { @@ -329,11 +340,13 @@ class SttTask extends Task { return credentials; } - get canFallback() { + canFallback() { return this.fallbackVendor && this.isHandledByPrimaryProvider && !this.cs.hasFallbackAsr; } - async _initFallback() { + // ep is optional for gather or any verb that have single ep, + // but transcribe does need as it might has 2 eps + async _initFallback(ep) { assert(this.fallbackVendor, 'fallback failed without fallbackVendor configuration'); this.logger.info(`Failed to use primary STT provider, fallback to ${this.fallbackVendor}`); this.isHandledByPrimaryProvider = false; @@ -346,7 +359,7 @@ class SttTask extends Task { this.data.recognizer.label = this.label; this.sttCredentials = await this._initSpeechCredentials(this.cs, this.vendor, this.label); // cleanup previous listener from previous vendor - this.removeCustomEventListeners(); + this.removeCustomEventListeners(ep); } async compileHintsForCobalt(ep, hostport, model, token, hints) { diff --git a/lib/tasks/transcribe.js b/lib/tasks/transcribe.js index ec51685c..34ccf59b 100644 --- a/lib/tasks/transcribe.js +++ b/lib/tasks/transcribe.js @@ -70,6 +70,9 @@ class TaskTranscribe extends SttTask { this._bufferedTranscripts = [ [], [] ]; // for channel 1 and 2 this.bugname_prefix = 'transcribe_'; this.paused = false; + // fallback flags + this.isHandledByPrimaryProviderForEp1 = true; + this.isHandledByPrimaryProviderForEp2 = true; } get name() { return TaskName.Transcribe; } @@ -776,7 +779,7 @@ class TaskTranscribe extends SttTask { } async _startFallback(cs, _ep, evt) { - if (this.canFallback) { + if (this.canFallback(_ep)) { _ep.stopTranscription({ vendor: this.vendor, bugname: this.bugname, @@ -786,7 +789,7 @@ class TaskTranscribe extends SttTask { try { this.notifyError({ msg: 'ASR error', details:`STT Vendor ${this.vendor} error: ${evt.error || evt.reason}`, failover: 'in progress'}); - await this._initFallback(); + await this._initFallback(_ep); let channel = 1; if (this.ep !== _ep) { channel = 2; @@ -895,6 +898,41 @@ class TaskTranscribe extends SttTask { if (this._asrTimer) clearTimeout(this._asrTimer); this._asrTimer = null; } + + // We need to keep track the fallback is happened for each endpoint + // override the canFallback and _initFallback methods to make sure that + // we only fallback once per endpoint + // we want to keep track this on task level instead of endpoint level + // because the endpoint instance is used across multiple tasks. + canFallback(ep) { + let isHandledByPrimaryProvider = this.isHandledByPrimaryProvider; + if (ep === this.ep) { + isHandledByPrimaryProvider = this.isHandledByPrimaryProviderForEp1; + } else if (ep === this.ep2) { + isHandledByPrimaryProvider = this.isHandledByPrimaryProviderForEp2; + } + + const isOneOfEndpointAlreadyFallenBack = !!this.ep && !!this.ep2 && + this.isHandledByPrimaryProviderForEp1 !== this.isHandledByPrimaryProviderForEp2; + + // fallback is configured + return this.fallbackVendor && + // has this endpoint already fallen back + isHandledByPrimaryProvider && + // in global level, is there any fallback is already happened + // one fallen endpoint will mark cs.hasFallbackAsr to true, + // so if one endpoint was fallen, the other endpoint would be able to fallback. + (isOneOfEndpointAlreadyFallenBack || !this.cs.hasFallbackAsr); + } + + _initFallback(ep) { + if (ep === this.ep) { + this.isHandledByPrimaryProviderForEp1 = false; + } else if (ep === this.ep2) { + this.isHandledByPrimaryProviderForEp2 = false; + } + return super._initFallback(ep); + } } module.exports = TaskTranscribe;