From 33750b5420eea4d42dfddab98dc96d2a0f55ab79 Mon Sep 17 00:00:00 2001 From: Hoan Luu Huu <110280845+xquanluu@users.noreply.github.com> Date: Tue, 30 Apr 2024 19:00:59 +0700 Subject: [PATCH] mod_deepgram_transcribe keep alive connection (#56) * mod_deepgram_transcribe keep alive connection Signed-off-by: Hoan HL * wip Signed-off-by: Hoan HL * wip Signed-off-by: Hoan HL * wip Signed-off-by: Hoan HL * wip Signed-off-by: Hoan HL * read use single connection configuration from env var * remove deepgram media bug buffered data Signed-off-by: Hoan HL * fix review comments Signed-off-by: Hoan HL --------- Signed-off-by: Hoan HL --- mod_deepgram_transcribe/audio_pipe.cpp | 11 +- mod_deepgram_transcribe/audio_pipe.hpp | 2 + .../dg_transcribe_glue.cpp | 171 ++++++++++++------ .../mod_deepgram_transcribe.c | 15 +- .../mod_deepgram_transcribe.h | 3 + 5 files changed, 144 insertions(+), 58 deletions(-) diff --git a/mod_deepgram_transcribe/audio_pipe.cpp b/mod_deepgram_transcribe/audio_pipe.cpp index 2e9fc15..fa3300e 100644 --- a/mod_deepgram_transcribe/audio_pipe.cpp +++ b/mod_deepgram_transcribe/audio_pipe.cpp @@ -176,7 +176,9 @@ int AudioPipe::lws_callback(struct lws *wsi, if (lws_is_final_fragment(wsi)) { if (nullptr != ap->m_recv_buf) { std::string msg((char *)ap->m_recv_buf, ap->m_recv_buf_ptr - ap->m_recv_buf); - ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), deepgram::AudioPipe::MESSAGE, msg.c_str(), ap->isFinished()); + if (!ap->m_silence_disconnect) { + ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), deepgram::AudioPipe::MESSAGE, msg.c_str(), ap->isFinished()); + } if (nullptr != ap->m_recv_buf) free(ap->m_recv_buf); } ap->m_recv_buf = ap->m_recv_buf_ptr = nullptr; @@ -451,7 +453,7 @@ AudioPipe::AudioPipe(const char* uuid, const char* bugname, const char* host, un m_uuid(uuid), m_host(host), m_port(port), m_path(path), m_finished(false), m_bugname(bugname), m_audio_buffer_min_freespace(minFreespace), m_audio_buffer_max_len(bufLen), m_gracefulShutdown(false), m_audio_buffer_write_offset(LWS_PRE), m_recv_buf(nullptr), m_recv_buf_ptr(nullptr), m_useTls(useTls), - m_state(LWS_CLIENT_IDLE), m_wsi(nullptr), m_vhd(nullptr), m_callback(callback) { + m_state(LWS_CLIENT_IDLE), m_wsi(nullptr), m_vhd(nullptr), m_callback(callback), m_silence_disconnect(false) { if (apiKey) m_apiKey = apiKey; else m_apiKey = ""; @@ -516,6 +518,11 @@ void AudioPipe::finish() { bufferForSending("{\"type\": \"CloseStream\"}"); } +void AudioPipe::finish_in_silence() { + m_silence_disconnect = true; + finish(); +} + void AudioPipe::waitForClose() { std::shared_future sf(m_promise.get_future()); sf.wait(); diff --git a/mod_deepgram_transcribe/audio_pipe.hpp b/mod_deepgram_transcribe/audio_pipe.hpp index 64f5c54..0ecaab3 100644 --- a/mod_deepgram_transcribe/audio_pipe.hpp +++ b/mod_deepgram_transcribe/audio_pipe.hpp @@ -76,6 +76,7 @@ namespace deepgram { void close() ; void finish(); + void finish_in_silence(); void waitForClose(); void setClosed() { m_promise.set_value(); } bool isFinished() { return m_finished;} @@ -138,6 +139,7 @@ namespace deepgram { std::string m_bugname; std::promise m_promise; bool m_useTls; + bool m_silence_disconnect; }; } // namespace deepgram diff --git a/mod_deepgram_transcribe/dg_transcribe_glue.cpp b/mod_deepgram_transcribe/dg_transcribe_glue.cpp index 63e719d..48512a6 100644 --- a/mod_deepgram_transcribe/dg_transcribe_glue.cpp +++ b/mod_deepgram_transcribe/dg_transcribe_glue.cpp @@ -22,6 +22,7 @@ #define RTP_PACKETIZATION_PERIOD 20 #define FRAME_SIZE_8000 320 /*which means each 20ms frame as 320 bytes at 8 khz (1 channel only)*/ +#define DEEPGRAM_KEEP_ALIVE_INTERVAL_SECOND 8 namespace { static bool hasDefaultCredentials = false; @@ -86,13 +87,17 @@ namespace { static const char* emptyTranscript = "\"is_final\":false,\"speech_final\":false,\"channel\":{\"alternatives\":[{\"transcript\":\"\",\"confidence\":0.0,\"words\":[]}]}"; - static void reaper(private_t *tech_pvt) { + static void reaper(private_t *tech_pvt, bool silence_disconnect) { std::shared_ptr pAp; pAp.reset((deepgram::AudioPipe *)tech_pvt->pAudioPipe); tech_pvt->pAudioPipe = nullptr; - std::thread t([pAp, tech_pvt]{ - pAp->finish(); + std::thread t([pAp, tech_pvt, silence_disconnect]{ + if (silence_disconnect) { + pAp->finish_in_silence(); + } else { + pAp->finish(); + } pAp->waitForClose(); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "%s (%u) got remote close\n", tech_pvt->sessionId, tech_pvt->id); }); @@ -342,60 +347,82 @@ namespace { int err; int useTls = true; + std::string host; + int port = 443; + size_t buflen = LWS_PRE + (FRAME_SIZE_8000 * desiredSampling / 8000 * channels * 1000 / RTP_PACKETIZATION_PERIOD * nAudioBufferSecs); + + std::ostringstream configuration_stream; switch_codec_implementation_t read_impl; switch_channel_t *channel = switch_core_session_get_channel(session); switch_core_session_get_read_impl(session, &read_impl); - memset(tech_pvt, 0, sizeof(private_t)); - std::string path; constructPath(session, path, desiredSampling, channels, lang, interim); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "path: %s\n", path.c_str()); - strncpy(tech_pvt->sessionId, switch_core_session_get_uuid(session), MAX_SESSION_ID); - const char* endpoint = switch_channel_get_variable(channel, "DEEPGRAM_URI"); if (endpoint != nullptr) { std::string ep(endpoint); - useTls = switch_true(switch_channel_get_variable(channel, "DEEPGRAM_USE_TLS")); size_t pos = ep.find(':'); + host = ep; if (pos != std::string::npos) { - std::string host = ep.substr(0, pos); - std::string port = ep.substr(pos + 1); - strncpy(tech_pvt->host, host.c_str(), MAX_WS_URL_LEN); - tech_pvt->port = ::atoi(port.c_str()); - } - else { - strncpy(tech_pvt->host, ep.c_str(), MAX_WS_URL_LEN); - tech_pvt->port = 443; + host = ep.substr(0, pos); + std::string strPort = ep.substr(pos + 1); + port = ::atoi(strPort.c_str()); } switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, - "connecting to deepgram on-prem %s port %d, using tls? (%s)\n", tech_pvt->host, tech_pvt->port, useTls ? "yes" : "no"); - } - else { - strncpy(tech_pvt->host, "api.deepgram.com", MAX_WS_URL_LEN); - tech_pvt->port = 443; + "connecting to deepgram on-prem %s port %d, using tls? (%s)\n", host.c_str(), port, useTls ? "yes" : "no"); + } else { + host = "api.deepgram.com"; } - strncpy(tech_pvt->path, path.c_str(), MAX_PATH_LEN); + const char* apiKey = switch_channel_get_variable(channel, "DEEPGRAM_API_KEY"); + if (!apiKey && defaultApiKey) { + apiKey = defaultApiKey; + } else if (!apiKey && endpoint == nullptr) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "no deepgram api key provided\n"); + return SWITCH_STATUS_FALSE; + } + + configuration_stream << + host << ":" << + port << ";" << + path << ";" << + buflen << ";" << + read_impl.decoded_bytes_per_packet << ";" << + apiKey << ";" << + useTls; + + if (tech_pvt->pAudioPipe) { + // stop sending keep alive + tech_pvt->is_keep_alive = 0; + if (0 != strcmp(tech_pvt->configuration, configuration_stream.str().c_str())) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "fork_data_init: stop existing deepgram connection, old configuration %s, new configuration %s\n", + tech_pvt->configuration, configuration_stream.str().c_str()); + reaper(tech_pvt, true); + } else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "fork_data_init: enable existing deepgram connection\n"); + return SWITCH_STATUS_SUCCESS; + } + } else { + memset(tech_pvt, 0, sizeof(private_t)); + } + + strncpy(tech_pvt->sessionId, switch_core_session_get_uuid(session), MAX_SESSION_ID); + strncpy(tech_pvt->host, host.c_str(), MAX_WS_URL_LEN); + tech_pvt->port = port; + strncpy(tech_pvt->path, path.c_str(), MAX_PATH_LEN); + strncpy(tech_pvt->configuration, configuration_stream.str().c_str(), MAX_PATH_LEN) ; tech_pvt->sampling = desiredSampling; tech_pvt->responseHandler = responseHandler; tech_pvt->channels = channels; tech_pvt->id = ++idxCallCount; tech_pvt->buffer_overrun_notified = 0; + tech_pvt->is_keep_alive = 0; strncpy(tech_pvt->bugname, bugname, MAX_BUG_LEN); - - size_t buflen = LWS_PRE + (FRAME_SIZE_8000 * desiredSampling / 8000 * channels * 1000 / RTP_PACKETIZATION_PERIOD * nAudioBufferSecs); - - const char* apiKey = switch_channel_get_variable(channel, "DEEPGRAM_API_KEY"); - if (!apiKey && defaultApiKey) apiKey = defaultApiKey; - else if (!apiKey && endpoint == nullptr) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "no deepgram api key provided\n"); - return SWITCH_STATUS_FALSE; - } deepgram::AudioPipe* ap = new deepgram::AudioPipe(tech_pvt->sessionId, bugname, tech_pvt->host, tech_pvt->port, tech_pvt->path, buflen, read_impl.decoded_bytes_per_packet, apiKey, useTls, eventCallback); @@ -406,18 +433,26 @@ namespace { tech_pvt->pAudioPipe = static_cast(ap); - switch_mutex_init(&tech_pvt->mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "connecting now\n"); + ap->connect(); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "connection in progress\n"); - if (desiredSampling != sampling) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%u) resampling from %u to %u\n", tech_pvt->id, sampling, desiredSampling); - tech_pvt->resampler = speex_resampler_init(channels, sampling, desiredSampling, SWITCH_RESAMPLE_QUALITY, &err); - if (0 != err) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error initializing resampler: %s.\n", speex_resampler_strerror(err)); - return SWITCH_STATUS_FALSE; - } + if (!tech_pvt->mutex) { + switch_mutex_init(&tech_pvt->mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); } - else { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%u) no resampling needed for this call\n", tech_pvt->id); + + if (!tech_pvt->resampler) { + if (desiredSampling != sampling) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%u) resampling from %u to %u\n", tech_pvt->id, sampling, desiredSampling); + tech_pvt->resampler = speex_resampler_init(channels, sampling, desiredSampling, SWITCH_RESAMPLE_QUALITY, &err); + if (0 != err) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error initializing resampler: %s.\n", speex_resampler_strerror(err)); + return SWITCH_STATUS_FALSE; + } + } + else { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%u) no resampling needed for this call\n", tech_pvt->id); + } } switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%u) fork_data_init\n", tech_pvt->id); @@ -474,11 +509,22 @@ extern "C" { switch_status_t dg_transcribe_session_init(switch_core_session_t *session, responseHandler_t responseHandler, uint32_t samples_per_second, uint32_t channels, char* lang, int interim, char* bugname, void **ppUserData) - { + { int err; + switch_channel_t *channel = switch_core_session_get_channel(session); + switch_media_bug_t *bug = (switch_media_bug_t*) switch_channel_get_private(channel, bugname); + private_t* tech_pvt; + if (bug) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "reuse existing kep alive deepgram connection\n"); + tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); + } else { + tech_pvt = (private_t *) switch_core_session_alloc(session, sizeof(private_t)); + tech_pvt->pAudioPipe = NULL; + tech_pvt->is_keep_alive = 0; + tech_pvt->mutex = NULL; + tech_pvt->resampler = NULL; + } - // allocate per-session data structure - private_t* tech_pvt = (private_t *) switch_core_session_alloc(session, sizeof(private_t)); if (!tech_pvt) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "error allocating memory!\n"); return SWITCH_STATUS_FALSE; @@ -491,16 +537,13 @@ extern "C" { *ppUserData = tech_pvt; - deepgram::AudioPipe *pAudioPipe = static_cast(tech_pvt->pAudioPipe); - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "connecting now\n"); - pAudioPipe->connect(); - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "connection in progress\n"); return SWITCH_STATUS_SUCCESS; } switch_status_t dg_transcribe_session_stop(switch_core_session_t *session,int channelIsClosing, char* bugname) { switch_channel_t *channel = switch_core_session_get_channel(session); switch_media_bug_t *bug = (switch_media_bug_t*) switch_channel_get_private(channel, bugname); + const bool use_single_connection = switch_true(std::getenv("DEEPGRAM_SPEECH_USE_SINGLE_CONNECTION")); if (!bug) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "dg_transcribe_session_stop: no bug - websocket conection already closed\n"); return SWITCH_STATUS_FALSE; @@ -511,6 +554,12 @@ extern "C" { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%u) dg_transcribe_session_stop\n", id); if (!tech_pvt) return SWITCH_STATUS_FALSE; + if (use_single_connection && !channelIsClosing) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "dg_transcribe_session_stop: call is running, use_single_connection is true, keep alive is activated\n", id); + tech_pvt->is_keep_alive = 1; + tech_pvt->frame_count = 0; + return SWITCH_STATUS_SUCCESS; + } // close connection and get final responses switch_mutex_lock(tech_pvt->mutex); @@ -518,7 +567,7 @@ extern "C" { if (!channelIsClosing) switch_core_media_bug_remove(session, &bug); deepgram::AudioPipe *pAudioPipe = static_cast(tech_pvt->pAudioPipe); - if (pAudioPipe) reaper(tech_pvt); + if (pAudioPipe) reaper(tech_pvt, false); destroy_tech_pvt(tech_pvt); switch_mutex_unlock(tech_pvt->mutex); switch_mutex_destroy(tech_pvt->mutex); @@ -532,9 +581,31 @@ extern "C" { size_t inuse = 0; bool dirty = false; char *p = (char *) "{\"msg\": \"buffer overrun\"}"; + char *keep_alive = (char *) "{\"type\": \"KeepAlive\"}"; if (!tech_pvt) return SWITCH_TRUE; + + // Keep sending keep alive if there is no transcribe activity + if (tech_pvt->is_keep_alive && tech_pvt->pAudioPipe) { + deepgram::AudioPipe *pAudioPipe = static_cast(tech_pvt->pAudioPipe); + if (++tech_pvt->frame_count * 20 /*ms*/ / 1000 >= DEEPGRAM_KEEP_ALIVE_INTERVAL_SECOND) { + tech_pvt->frame_count = 0; + pAudioPipe->bufferForSending(keep_alive); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "dg_transcribe_frame: sending %s to deepgram\n", keep_alive); + } + // remove media bug buffered data + while (true) { + unsigned char data[SWITCH_RECOMMENDED_BUFFER_SIZE] = {0}; + switch_frame_t frame = { 0 }; + frame.data = data; + frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE; + switch_status_t rv = switch_core_media_bug_read(bug, &frame, SWITCH_TRUE); + if (rv != SWITCH_STATUS_SUCCESS) break; + } + return SWITCH_TRUE; + } + if (switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS) { if (!tech_pvt->pAudioPipe) { switch_mutex_unlock(tech_pvt->mutex); @@ -545,7 +616,6 @@ extern "C" { switch_mutex_unlock(tech_pvt->mutex); return SWITCH_TRUE; } - pAudioPipe->lockAudioBuffer(); size_t available = pAudioPipe->binarySpaceAvailable(); if (NULL == tech_pvt->resampler) { @@ -587,7 +657,6 @@ extern "C" { if (frame.datalen) { spx_uint32_t out_len = available >> 1; // space for samples which are 2 bytes spx_uint32_t in_len = frame.samples; - speex_resampler_process_interleaved_int(tech_pvt->resampler, (const spx_int16_t *) frame.data, (spx_uint32_t *) &in_len, diff --git a/mod_deepgram_transcribe/mod_deepgram_transcribe.c b/mod_deepgram_transcribe/mod_deepgram_transcribe.c index 8f571f3..ddf6c50 100644 --- a/mod_deepgram_transcribe/mod_deepgram_transcribe.c +++ b/mod_deepgram_transcribe/mod_deepgram_transcribe.c @@ -5,6 +5,7 @@ */ #include "mod_deepgram_transcribe.h" #include "dg_transcribe_glue.h" +#include /* Prototypes */ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_deepgram_transcribe_shutdown); @@ -72,8 +73,10 @@ static switch_status_t start_capture(switch_core_session_t *session, switch_medi switch_codec_implementation_t read_impl = { 0 }; void *pUserData; uint32_t samples_per_second; + int use_single_connection = switch_true(getenv("DEEPGRAM_SPEECH_USE_SINGLE_CONNECTION")); + bug = switch_channel_get_private(channel, bugname); - if (switch_channel_get_private(channel, bugname)) { + if (bug && !use_single_connection) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "removing bug from previous transcribe\n"); do_stop(session, bugname); } @@ -90,11 +93,13 @@ static switch_status_t start_capture(switch_core_session_t *session, switch_medi switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error initializing dg speech session.\n"); return SWITCH_STATUS_FALSE; } - if ((status = switch_core_media_bug_add(session, "dg_transcribe", NULL, capture_callback, pUserData, 0, flags, &bug)) != SWITCH_STATUS_SUCCESS) { - return status; + if (!bug || !use_single_connection) { + if ((status = switch_core_media_bug_add(session, "dg_transcribe", NULL, capture_callback, pUserData, 0, flags, &bug)) != SWITCH_STATUS_SUCCESS) { + return status; + } + switch_channel_set_private(channel, bugname, bug); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "added media bug for dg transcribe\n"); } - switch_channel_set_private(channel, bugname, bug); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "added media bug for dg transcribe\n"); return SWITCH_STATUS_SUCCESS; } diff --git a/mod_deepgram_transcribe/mod_deepgram_transcribe.h b/mod_deepgram_transcribe/mod_deepgram_transcribe.h index 0fcfcd6..ba540b7 100644 --- a/mod_deepgram_transcribe/mod_deepgram_transcribe.h +++ b/mod_deepgram_transcribe/mod_deepgram_transcribe.h @@ -39,6 +39,9 @@ struct private_data { unsigned int id; int buffer_overrun_notified:1; int is_finished:1; + int is_keep_alive; + int frame_count; + char configuration[MAX_PATH_LEN]; }; typedef struct private_data private_t;