From a96fb2b4b2ebd990bac88bd80be0e8bd0c138102 Mon Sep 17 00:00:00 2001 From: Dave Horton Date: Sat, 6 Apr 2024 12:26:56 -0400 Subject: [PATCH] Fixes/mod azure tts (#36) * fixes for unlocking mutex and minimizing time under lock * call .get() on future returned from speechSynthesizer->SpeakTextAsync * mod_azure_tts: various fixes, including dangling session lock and reducing latency * mod_deepgram_transcribe: add support for transcribing filler words --- mod_azure_tts/azure_glue.cpp | 135 ++++++++++-------- mod_deepgram_transcribe/README.md | 4 +- .../dg_transcribe_glue.cpp | 3 + 3 files changed, 81 insertions(+), 61 deletions(-) diff --git a/mod_azure_tts/azure_glue.cpp b/mod_azure_tts/azure_glue.cpp index 2b885a3..8ead3c0 100644 --- a/mod_azure_tts/azure_glue.cpp +++ b/mod_azure_tts/azure_glue.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #define BUFFER_SIZE 8129 @@ -16,6 +17,30 @@ using namespace Microsoft::CognitiveServices::Speech; static std::string fullDirPath; +static void start_synthesis(std::shared_ptr speechSynthesizer, const char* text) { + try { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "start_synthesis calling \n"); + auto result = std::strncmp(text, "SpeakSsmlAsync(text).get() : + speechSynthesizer->SpeakTextAsync(text).get(); + + if (result->Reason == ResultReason::SynthesizingAudioCompleted) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "start_synthesis completed id %s, audio data - bytes: %ld, milliseconds: %ld milliseconds\n", + result->ResultId.c_str(), result->GetAudioLength(), result->AudioDuration.count()); + } else if (result->Reason == ResultReason::Canceled) { + auto cancellation = SpeechSynthesisCancellationDetails::FromResult(result); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, + "Error synthesizing text %s: (%d) %s.\n", text, static_cast(cancellation->ErrorCode), cancellation->ErrorDetails.c_str()); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error synthsize text %s (%d).\n", text, static_cast(result->Reason)); + } + + + } catch (const std::exception& e) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "mod_azure_tts: Exception in start_synthesis %s\n", e.what()); + } +} + extern "C" { switch_status_t azure_speech_load() { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "azure_speech_loading..\n"); @@ -109,8 +134,12 @@ extern "C" { if (a->session_id) { int err; switch_codec_implementation_t read_impl; + + /* lock and unlock session */ switch_core_session_t *psession = switch_core_session_locate(a->session_id); switch_core_session_get_read_impl(psession, &read_impl); + switch_core_session_rwunlock(psession); + uint32_t samples_per_second = !strcasecmp(read_impl.iananame, "g722") ? read_impl.actual_samples_per_second : read_impl.samples_per_second; a->samples_rate = samples_per_second; if (samples_per_second != 8000 /*Hz*/) { @@ -154,39 +183,29 @@ extern "C" { }; speechSynthesizer->Synthesizing += [a](const SpeechSynthesisEventArgs& e) { + if (a->flushed) return; bool fireEvent = false; CircularBuffer_t *cBuffer = (CircularBuffer_t *) a->circularBuffer; - std::vector pcm_data; - //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Synthesizing: received data\n"); - - if (a->flushed) { - return; + auto audioData = e.Result->GetAudioData(); + if (a->file) { + fwrite(audioData->data(), 1, audioData->size(), a->file); } - { - switch_mutex_lock(a->mutex); - auto audioData = e.Result->GetAudioData(); - for (size_t i = 0; i < audioData->size(); i += sizeof(int16_t)) { - int16_t value = static_cast((*audioData)[i]) | (static_cast((*audioData)[i + 1]) << 8); - pcm_data.push_back(value); - } - /* and write to the file */ - size_t bytesResampled = pcm_data.size() * sizeof(uint16_t); - if (a->file) fwrite(pcm_data.data(), sizeof(uint16_t), pcm_data.size(), a->file); + /** + * this sort of reinterpretation can be dangerous as a general rule, but in this case we know that the data + * is 16-bit PCM, so it's safe to do this and its much faster than copying the data byte by byte + */ + const uint16_t* begin = reinterpret_cast(audioData->data()); + const uint16_t* end = reinterpret_cast(audioData->data() + audioData->size()); - // Resize the buffer if necessary - if (cBuffer->capacity() - cBuffer->size() < (bytesResampled / sizeof(uint16_t))) { - - //TODO: if buffer exceeds some max size, return CURL_WRITEFUNC_ERROR to abort the transfer - cBuffer->set_capacity(cBuffer->size() + std::max((bytesResampled / sizeof(uint16_t)), (size_t)BUFFER_SIZE)); - } - - /* Push the data into the buffer */ - cBuffer->insert(cBuffer->end(), pcm_data.data(), pcm_data.data() + pcm_data.size()); - - switch_mutex_unlock(a->mutex); + /* lock as briefly as possible */ + switch_mutex_lock(a->mutex); + if (cBuffer->capacity() - cBuffer->size() < audioData->size()) { + cBuffer->set_capacity(cBuffer->size() + std::max( audioData->size(), (size_t)BUFFER_SIZE)); } + cBuffer->insert(cBuffer->end(), begin, end); + switch_mutex_unlock(a->mutex); if (0 == a->reads++) { fireEvent = true; @@ -200,6 +219,7 @@ extern "C" { switch_core_session_t* session = switch_core_session_locate(a->session_id); if (session) { switch_channel_t *channel = switch_core_session_get_channel(session); + switch_core_session_rwunlock(session); if (channel) { switch_event_t *event; if (switch_event_create(&event, SWITCH_EVENT_PLAYBACK_START) == SWITCH_STATUS_SUCCESS) { @@ -216,7 +236,6 @@ extern "C" { }else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "speechSynthesizer->Synthesizing: channel not found\n"); } - switch_core_session_rwunlock(session); } } }; @@ -231,16 +250,12 @@ extern "C" { auto cancellation = SpeechSynthesisCancellationDetails::FromResult(e.Result); a->response_code = static_cast(cancellation->ErrorCode); a->err_msg = strdup(cancellation->ErrorDetails.c_str()); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error synthsize tex %d with error string: %s.\n", static_cast(cancellation->ErrorCode), cancellation->ErrorDetails.c_str()); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error synthesizing text %d with error string: %s.\n", static_cast(cancellation->ErrorCode), cancellation->ErrorDetails.c_str()); } + a->draining = 1; }; - // switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "azure_speech_feed_tts before sending synthesize request\n"); - if (std::strncmp(text, "SpeakSsmlAsync(text); - } else { - speechSynthesizer->SpeakTextAsync(text); - } - // switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "azure_speech_feed_tts sent synthesize request\n"); + std::thread(start_synthesis, speechSynthesizer, text).detach(); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "azure_speech_feed_tts sent synthesize request\n"); return SWITCH_STATUS_SUCCESS; } @@ -248,33 +263,31 @@ extern "C" { CircularBuffer_t *cBuffer = (CircularBuffer_t *) a->circularBuffer; std::vector pcm_data; - { - switch_mutex_lock(a->mutex); - if (a->response_code > 0 && a->response_code != 200) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "azure_speech_read_tts, returning failure\n") ; - return SWITCH_STATUS_FALSE; - } - if (a->flushed) { + if (a->response_code > 0 && a->response_code != 200) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "azure_speech_read_tts, returning failure\n") ; + return SWITCH_STATUS_FALSE; + } + if (a->flushed) { + return SWITCH_STATUS_BREAK; + } + switch_mutex_lock(a->mutex); + size_t bufSize = cBuffer->size(); + if (cBuffer->empty()) { + switch_mutex_unlock(a->mutex); + if (a->draining) { return SWITCH_STATUS_BREAK; } - if (cBuffer->empty()) { - if (a->draining) { - switch_mutex_unlock(a->mutex); - return SWITCH_STATUS_BREAK; - } - /* no audio available yet so send silence */ - memset(data, 255, *datalen); - switch_mutex_unlock(a->mutex); - return SWITCH_STATUS_SUCCESS; - } - // azure returned 8000hz 16 bit data, we have to take enough data based on call sample rate. - size_t size = a->samples_rate ? - std::min((*datalen/(2 * a->samples_rate / 8000)), cBuffer->size()) : - std::min((*datalen/2), cBuffer->size()); - pcm_data.insert(pcm_data.end(), cBuffer->begin(), cBuffer->begin() + size); - cBuffer->erase(cBuffer->begin(), cBuffer->begin() + size); - switch_mutex_unlock(a->mutex); + /* no audio available yet so send silence */ + memset(data, 255, *datalen); + return SWITCH_STATUS_SUCCESS; } + // azure returned 8000hz 16 bit data, we have to take enough data based on call sample rate. + size_t size = a->samples_rate ? + std::min((*datalen/(2 * a->samples_rate / 8000)), bufSize) : + std::min((*datalen/2), bufSize); + pcm_data.insert(pcm_data.end(), cBuffer->begin(), cBuffer->begin() + size); + cBuffer->erase(cBuffer->begin(), cBuffer->begin() + size); + switch_mutex_unlock(a->mutex); size_t data_size = pcm_data.size(); @@ -336,6 +349,9 @@ extern "C" { switch_core_session_t* session = switch_core_session_locate(a->session_id); if (session) { switch_channel_t *channel = switch_core_session_get_channel(session); + + /* unlock as quickly as possible */ + switch_core_session_rwunlock(session); if (channel) { switch_event_t *event; if (switch_event_create(&event, SWITCH_EVENT_PLAYBACK_STOP) == SWITCH_STATUS_SUCCESS) { @@ -357,7 +373,6 @@ extern "C" { else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "write_cb: channel not found\n"); } - switch_core_session_rwunlock(session); } } diff --git a/mod_deepgram_transcribe/README.md b/mod_deepgram_transcribe/README.md index 00b0909..e1bb1ba 100644 --- a/mod_deepgram_transcribe/README.md +++ b/mod_deepgram_transcribe/README.md @@ -25,10 +25,11 @@ Stop transcription on the channel. | variable | Description | | --- | ----------- | | DEEPGRAM_API_KEY | Deepgram API key used to authenticate | -| DEEPGRAM_SPEECH_TIER | https://developers.deepgram.com/documentation/features/tier/ | | DEEPGRAM_SPEECH_CUSTOM_MODEL | custom model id | | DEEPGRAM_SPEECH_MODEL | https://developers.deepgram.com/documentation/features/model/ | | DEEPGRAM_SPEECH_MODEL_VERSION | https://developers.deepgram.com/documentation/features/version/ | +| DEEPGRAM_SPEECH_ENABLE_SMART_FORMAT | https://developers.deepgram.com/docs/smart-format | +| DEEPGRAM_SPEECH_ENABLE_FILLER_WORDS | https://developers.deepgram.com/docs/filler-words | | DEEPGRAM_SPEECH_ENABLE_AUTOMATIC_PUNCTUATION | https://developers.deepgram.com/documentation/features/punctuate/ | | DEEPGRAM_SPEECH_PROFANITY_FILTER | https://developers.deepgram.com/documentation/features/profanity-filter/ | | DEEPGRAM_SPEECH_REDACT | https://developers.deepgram.com/documentation/features/redact/ | @@ -42,6 +43,7 @@ Stop transcription on the channel. | DEEPGRAM_SPEECH_REPLACE | https://developers.deepgram.com/documentation/features/replace/ | | DEEPGRAM_SPEECH_TAG | https://developers.deepgram.com/documentation/features/tag/ | | DEEPGRAM_SPEECH_ENDPOINTING | https://developers.deepgram.com/documentation/features/endpointing/ | +| DEEPGRAM_SPEECH_UTTERANCE_END_MS | https://developers.deepgram.com/docs/utterance-end | | DEEPGRAM_SPEECH_VAD_TURNOFF | https://developers.deepgram.com/documentation/features/voice-activity-detection/ | diff --git a/mod_deepgram_transcribe/dg_transcribe_glue.cpp b/mod_deepgram_transcribe/dg_transcribe_glue.cpp index 5111a44..0762e1a 100644 --- a/mod_deepgram_transcribe/dg_transcribe_glue.cpp +++ b/mod_deepgram_transcribe/dg_transcribe_glue.cpp @@ -184,6 +184,9 @@ namespace { * */ } + if (var = switch_channel_get_variable(channel, "DEEPGRAM_SPEECH_ENABLE_FILLER_WORDS")) { + oss << "&filler_words=true"; + } if (var = switch_channel_get_variable(channel, "DEEPGRAM_SPEECH_ENABLE_AUTOMATIC_PUNCTUATION")) { oss << "&punctuate=true"; }