diff --git a/mod_assemblyai_transcribe/aai_transcribe_glue.cpp b/mod_assemblyai_transcribe/aai_transcribe_glue.cpp index fb73685..fc48852 100644 --- a/mod_assemblyai_transcribe/aai_transcribe_glue.cpp +++ b/mod_assemblyai_transcribe/aai_transcribe_glue.cpp @@ -102,11 +102,12 @@ namespace { return path; } - static void eventCallback(const char* sessionId, assemblyai::AudioPipe::NotifyEvent_t event, const char* message, bool finished) { + static void eventCallback(const char* sessionId, const char* bugname, + assemblyai::AudioPipe::NotifyEvent_t event, const char* message, bool finished) { switch_core_session_t* session = switch_core_session_locate(sessionId); if (session) { switch_channel_t *channel = switch_core_session_get_channel(session); - switch_media_bug_t *bug = (switch_media_bug_t*) switch_channel_get_private(channel, MY_BUG_NAME); + switch_media_bug_t *bug = (switch_media_bug_t*) switch_channel_get_private(channel, bugname); if (bug) { private_t* tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); if (tech_pvt) { @@ -194,7 +195,8 @@ namespace { tech_pvt->channels = channels; tech_pvt->id = ++idxCallCount; tech_pvt->buffer_overrun_notified = 0; - + strncpy(tech_pvt->bugname, bugname, MAX_BUG_LEN); + size_t buflen = LWS_PRE + (FRAME_SIZE_8000 * desiredSampling / 8000 * channels * 1000 / RTP_PACKETIZATION_PERIOD * nAudioBufferSecs); const char* apiKey = switch_channel_get_variable(channel, "ASSEMBLYAI_API_KEY"); @@ -204,7 +206,7 @@ namespace { return SWITCH_STATUS_FALSE; } - assemblyai::AudioPipe* ap = new assemblyai::AudioPipe(tech_pvt->sessionId, tech_pvt->host, tech_pvt->port, tech_pvt->path, + assemblyai::AudioPipe* ap = new assemblyai::AudioPipe(tech_pvt->sessionId, bugname, tech_pvt->host, tech_pvt->port, tech_pvt->path, buflen, read_impl.decoded_bytes_per_packet, apiKey, eventCallback); if (!ap) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error allocating AudioPipe\n"); @@ -251,7 +253,8 @@ extern "C" { switch_status_t aai_transcribe_init() { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_assemblyai_transcribe: audio buffer (in secs): %d secs\n", nAudioBufferSecs); - int logs = LLL_ERR | LLL_WARN | LLL_NOTICE || LLL_INFO | LLL_PARSER | LLL_HEADER | LLL_EXT | LLL_CLIENT | LLL_LATENCY | LLL_DEBUG ; + int logs = LLL_ERR | LLL_WARN | LLL_NOTICE ; + //| LLL_INFO | LLL_PARSER | LLL_HEADER | LLL_EXT | LLL_CLIENT | LLL_LATENCY | LLL_DEBUG ; assemblyai::AudioPipe::initialize(logs, lws_logger); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "AudioPipe::initialize completed\n"); @@ -306,7 +309,7 @@ extern "C" { switch_status_t aai_transcribe_session_stop(switch_core_session_t *session,int channelIsClosing, char* bugname) { switch_channel_t *channel = switch_core_session_get_channel(session); - switch_media_bug_t *bug = (switch_media_bug_t*) switch_channel_get_private(channel, MY_BUG_NAME); + switch_media_bug_t *bug = (switch_media_bug_t*) switch_channel_get_private(channel, bugname); if (!bug) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "aai_transcribe_session_stop: no bug - websocket conection already closed\n"); return SWITCH_STATUS_FALSE; diff --git a/mod_assemblyai_transcribe/audio_pipe.cpp b/mod_assemblyai_transcribe/audio_pipe.cpp index 544a4ee..6e5e69c 100644 --- a/mod_assemblyai_transcribe/audio_pipe.cpp +++ b/mod_assemblyai_transcribe/audio_pipe.cpp @@ -72,7 +72,7 @@ int AudioPipe::lws_callback(struct lws *wsi, lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CONNECTION_ERROR: %s, response status %d\n", in ? (char *)in : "(null)", rc); if (ap) { ap->m_state = LWS_CLIENT_FAILED; - ap->m_callback(ap->m_uuid.c_str(), AudioPipe::CONNECT_FAIL, (char *) in, ap->isFinished()); + ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECT_FAIL, (char *) in, ap->isFinished()); } else { lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CONNECTION_ERROR unable to find wsi %p..\n", wsi); @@ -87,7 +87,7 @@ int AudioPipe::lws_callback(struct lws *wsi, *ppAp = ap; ap->m_vhd = vhd; ap->m_state = LWS_CLIENT_CONNECTED; - ap->m_callback(ap->m_uuid.c_str(), AudioPipe::CONNECT_SUCCESS, NULL, ap->isFinished()); + ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECT_SUCCESS, NULL, ap->isFinished()); } else { lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_ESTABLISHED %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi); @@ -105,12 +105,12 @@ int AudioPipe::lws_callback(struct lws *wsi, // closed by us lwsl_debug("%s socket closed by us\n", ap->m_uuid.c_str()); - ap->m_callback(ap->m_uuid.c_str(), AudioPipe::CONNECTION_CLOSED_GRACEFULLY, NULL, ap->isFinished()); + ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECTION_CLOSED_GRACEFULLY, NULL, ap->isFinished()); } else if (ap->m_state == LWS_CLIENT_CONNECTED) { // closed by far end lwsl_info("%s socket closed by far end\n", ap->m_uuid.c_str()); - ap->m_callback(ap->m_uuid.c_str(), AudioPipe::CONNECTION_DROPPED, NULL, ap->isFinished()); + ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECTION_DROPPED, NULL, ap->isFinished()); } ap->m_state = LWS_CLIENT_DISCONNECTED; ap->setClosed(); @@ -172,7 +172,7 @@ int AudioPipe::lws_callback(struct lws *wsi, if (lws_is_final_fragment(wsi)) { if (nullptr != ap->m_recv_buf) { std::string msg((char *)ap->m_recv_buf, ap->m_recv_buf_ptr - ap->m_recv_buf); - ap->m_callback(ap->m_uuid.c_str(), AudioPipe::MESSAGE, msg.c_str(), ap->isFinished()); + ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::MESSAGE, msg.c_str(), ap->isFinished()); if (nullptr != ap->m_recv_buf) free(ap->m_recv_buf); } ap->m_recv_buf = ap->m_recv_buf_ptr = nullptr; @@ -261,6 +261,7 @@ static const lws_retry_bo_t retry = { }; struct lws_context *AudioPipe::context = nullptr; +std::thread AudioPipe::serviceThread; std::string AudioPipe::protocolName; std::mutex AudioPipe::mutex_connects; std::mutex AudioPipe::mutex_disconnects; @@ -429,26 +430,28 @@ bool AudioPipe::lws_service_thread() { void AudioPipe::initialize(int loglevel, log_emit_function logger) { - lws_set_log_level(loglevel, logger); + //lws_set_log_level(loglevel, logger); lwsl_notice("AudioPipe::initialize starting\n"); std::lock_guard lock(mapMutex); - std::thread t(&AudioPipe::lws_service_thread); stopFlag = false; - t.detach(); + serviceThread = std::thread(&AudioPipe::lws_service_thread); } bool AudioPipe::deinitialize() { lwsl_notice("AudioPipe::deinitialize\n"); std::lock_guard lock(mapMutex); stopFlag = true; + if (serviceThread.joinable()) { + serviceThread.join(); + } return true; } // instance members -AudioPipe::AudioPipe(const char* uuid, const char* host, unsigned int port, const char* path, +AudioPipe::AudioPipe(const char* uuid, const char* bugname, const char* host, unsigned int port, const char* path, size_t bufLen, size_t minFreespace, const char* apiKey, notifyHandler_t callback) : - m_uuid(uuid), m_host(host), m_port(port), m_path(path), m_finished(false), + m_uuid(uuid), m_host(host), m_port(port), m_path(path), m_finished(false), m_bugname(bugname), m_audio_buffer_min_freespace(minFreespace), m_audio_buffer_max_len(bufLen), m_gracefulShutdown(false), m_audio_buffer_write_offset(LWS_PRE), m_recv_buf(nullptr), m_recv_buf_ptr(nullptr), m_state(LWS_CLIENT_IDLE), m_wsi(nullptr), m_vhd(nullptr), m_apiKey(apiKey), m_callback(callback) { diff --git a/mod_assemblyai_transcribe/audio_pipe.hpp b/mod_assemblyai_transcribe/audio_pipe.hpp index 4171a47..d6caa44 100644 --- a/mod_assemblyai_transcribe/audio_pipe.hpp +++ b/mod_assemblyai_transcribe/audio_pipe.hpp @@ -31,7 +31,7 @@ public: MESSAGE }; typedef void (*log_emit_function)(int level, const char *line); - typedef void (*notifyHandler_t)(const char *sessionId, NotifyEvent_t event, const char* message, bool finished); + typedef void (*notifyHandler_t)(const char *sessionId,const char* bugname, NotifyEvent_t event, const char* message, bool finished); struct lws_per_vhost_data { struct lws_context *context; @@ -44,7 +44,7 @@ public: static bool lws_service_thread(); // constructor - AudioPipe(const char* uuid, const char* host, unsigned int port, const char* path, + AudioPipe(const char* uuid, const char* bugname, const char* host, unsigned int port, const char* path, size_t bufLen, size_t minFreespace, const char* apiKey, notifyHandler_t callback); ~AudioPipe(); @@ -86,6 +86,7 @@ public: void operator=(const AudioPipe&) = delete; private: + static std::thread serviceThread; static int lws_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); static struct lws_context *context; diff --git a/mod_assemblyai_transcribe/mod_assemblyai_transcribe.c b/mod_assemblyai_transcribe/mod_assemblyai_transcribe.c index 0cc3262..ec5e9da 100644 --- a/mod_assemblyai_transcribe/mod_assemblyai_transcribe.c +++ b/mod_assemblyai_transcribe/mod_assemblyai_transcribe.c @@ -74,7 +74,7 @@ static switch_status_t start_capture(switch_core_session_t *session, switch_medi void *pUserData; uint32_t samples_per_second; - if (switch_channel_get_private(channel, MY_BUG_NAME)) { + if (switch_channel_get_private(channel, bugname)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "removing bug from previous transcribe\n"); do_stop(session, bugname); } @@ -94,7 +94,7 @@ static switch_status_t start_capture(switch_core_session_t *session, switch_medi if ((status = switch_core_media_bug_add(session, "aai_transcribe", NULL, capture_callback, pUserData, 0, flags, &bug)) != SWITCH_STATUS_SUCCESS) { return status; } - switch_channel_set_private(channel, MY_BUG_NAME, bug); + switch_channel_set_private(channel, bugname, bug); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "added media bug for assemblyai transcribe\n"); return SWITCH_STATUS_SUCCESS; @@ -105,7 +105,7 @@ static switch_status_t do_stop(switch_core_session_t *session, char* bugname) switch_status_t status = SWITCH_STATUS_SUCCESS; switch_channel_t *channel = switch_core_session_get_channel(session); - switch_media_bug_t *bug = switch_channel_get_private(channel, MY_BUG_NAME); + switch_media_bug_t *bug = switch_channel_get_private(channel, bugname); if (bug) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Received user command command to stop transcribe.\n"); diff --git a/mod_audio_fork/audio_pipe.cpp b/mod_audio_fork/audio_pipe.cpp index d422efe..83ea426 100644 --- a/mod_audio_fork/audio_pipe.cpp +++ b/mod_audio_fork/audio_pipe.cpp @@ -7,6 +7,7 @@ #define MAX_RECV_BUF_SIZE (65 * 1024 * 10) #define RECV_BUF_REALLOC_SIZE (8 * 1024) +using namespace drachtio; namespace { static const char* basicAuthUser = std::getenv("MOD_AUDIO_FORK_HTTP_AUTH_USER"); @@ -267,6 +268,7 @@ static const lws_retry_bo_t retry = { }; struct lws_context *AudioPipe::context = nullptr; +std::thread AudioPipe::serviceThread; std::string AudioPipe::protocolName; std::mutex AudioPipe::mutex_connects; std::mutex AudioPipe::mutex_disconnects; @@ -436,19 +438,21 @@ bool AudioPipe::lws_service_thread() { void AudioPipe::initialize(const char* protocol, int loglevel, log_emit_function logger) { protocolName = protocol; - lws_set_log_level(loglevel, logger); + //lws_set_log_level(loglevel, logger); lwsl_notice("AudioPipe::initialize starting\n"); std::lock_guard lock(mapMutex); - std::thread t(&AudioPipe::lws_service_thread); stopFlag = false; - t.detach(); + serviceThread = std::thread(&AudioPipe::lws_service_thread); } bool AudioPipe::deinitialize() { lwsl_notice("AudioPipe::deinitialize\n"); std::lock_guard lock(mapMutex); stopFlag = true; + if (serviceThread.joinable()) { + serviceThread.join(); + } return true; } diff --git a/mod_audio_fork/audio_pipe.hpp b/mod_audio_fork/audio_pipe.hpp index 1412918..25b457c 100644 --- a/mod_audio_fork/audio_pipe.hpp +++ b/mod_audio_fork/audio_pipe.hpp @@ -10,135 +10,140 @@ #include -class AudioPipe { -public: - enum LwsState_t { - LWS_CLIENT_IDLE, - LWS_CLIENT_CONNECTING, - LWS_CLIENT_CONNECTED, - LWS_CLIENT_FAILED, - LWS_CLIENT_DISCONNECTING, - LWS_CLIENT_DISCONNECTED - }; - enum NotifyEvent_t { - CONNECT_SUCCESS, - CONNECT_FAIL, - CONNECTION_DROPPED, - CONNECTION_CLOSED_GRACEFULLY, - MESSAGE - }; - typedef void (*log_emit_function)(int level, const char *line); - typedef void (*notifyHandler_t)(const char *sessionId, const char* bugname, NotifyEvent_t event, const char* message); +namespace drachtio { - struct lws_per_vhost_data { - struct lws_context *context; - struct lws_vhost *vhost; - const struct lws_protocols *protocol; + class AudioPipe { + public: + enum LwsState_t { + LWS_CLIENT_IDLE, + LWS_CLIENT_CONNECTING, + LWS_CLIENT_CONNECTED, + LWS_CLIENT_FAILED, + LWS_CLIENT_DISCONNECTING, + LWS_CLIENT_DISCONNECTED + }; + enum NotifyEvent_t { + CONNECT_SUCCESS, + CONNECT_FAIL, + CONNECTION_DROPPED, + CONNECTION_CLOSED_GRACEFULLY, + MESSAGE + }; + typedef void (*log_emit_function)(int level, const char *line); + typedef void (*notifyHandler_t)(const char *sessionId, const char* bugname, NotifyEvent_t event, const char* message); + + struct lws_per_vhost_data { + struct lws_context *context; + struct lws_vhost *vhost; + const struct lws_protocols *protocol; + }; + + static void initialize(const char* protocolName, int loglevel, log_emit_function logger); + static bool deinitialize(); + static bool lws_service_thread(); + + // constructor + AudioPipe(const char* uuid, const char* host, unsigned int port, const char* path, int sslFlags, + size_t bufLen, size_t minFreespace, const char* username, const char* password, char* bugname, notifyHandler_t callback); + ~AudioPipe(); + + LwsState_t getLwsState(void) { return m_state; } + void connect(void); + void bufferForSending(const char* text); + size_t binarySpaceAvailable(void) { + return m_audio_buffer_max_len - m_audio_buffer_write_offset; + } + size_t binaryMinSpace(void) { + return m_audio_buffer_min_freespace; + } + char * binaryWritePtr(void) { + return (char *) m_audio_buffer + m_audio_buffer_write_offset; + } + void binaryWritePtrAdd(size_t len) { + m_audio_buffer_write_offset += len; + } + void binaryWritePtrResetToZero(void) { + m_audio_buffer_write_offset = 0; + } + void lockAudioBuffer(void) { + m_audio_mutex.lock(); + } + void unlockAudioBuffer(void) ; + bool hasBasicAuth(void) { + return !m_username.empty() && !m_password.empty(); + } + + void getBasicAuth(std::string& username, std::string& password) { + username = m_username; + password = m_password; + } + + void do_graceful_shutdown(); + bool isGracefulShutdown(void) { + return m_gracefulShutdown; + } + + void close() ; + + // no default constructor or copying + AudioPipe() = delete; + AudioPipe(const AudioPipe&) = delete; + void operator=(const AudioPipe&) = delete; + + private: + static std::thread serviceThread; + + static int lws_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); + static struct lws_context *context; + static std::string protocolName; + static std::mutex mutex_connects; + static std::mutex mutex_disconnects; + static std::mutex mutex_writes; + static std::list pendingConnects; + static std::list pendingDisconnects; + static std::list pendingWrites; + static log_emit_function logger; + + static std::mutex mapMutex; + static bool stopFlag; + + static AudioPipe* findAndRemovePendingConnect(struct lws *wsi); + static AudioPipe* findPendingConnect(struct lws *wsi); + static void addPendingConnect(AudioPipe* ap); + static void addPendingDisconnect(AudioPipe* ap); + static void addPendingWrite(AudioPipe* ap); + static void processPendingConnects(lws_per_vhost_data *vhd); + static void processPendingDisconnects(lws_per_vhost_data *vhd); + static void processPendingWrites(void); + + bool connect_client(struct lws_per_vhost_data *vhd); + + LwsState_t m_state; + std::string m_uuid; + std::string m_host; + std::string m_bugname; + unsigned int m_port; + std::string m_path; + std::string m_metadata; + std::mutex m_text_mutex; + std::mutex m_audio_mutex; + int m_sslFlags; + struct lws *m_wsi; + uint8_t *m_audio_buffer; + size_t m_audio_buffer_max_len; + size_t m_audio_buffer_write_offset; + size_t m_audio_buffer_min_freespace; + uint8_t* m_recv_buf; + uint8_t* m_recv_buf_ptr; + size_t m_recv_buf_len; + struct lws_per_vhost_data* m_vhd; + notifyHandler_t m_callback; + log_emit_function m_logger; + std::string m_username; + std::string m_password; + bool m_gracefulShutdown; }; - static void initialize(const char* protocolName, int loglevel, log_emit_function logger); - static bool deinitialize(); - static bool lws_service_thread(); - - // constructor - AudioPipe(const char* uuid, const char* host, unsigned int port, const char* path, int sslFlags, - size_t bufLen, size_t minFreespace, const char* username, const char* password, char* bugname, notifyHandler_t callback); - ~AudioPipe(); - - LwsState_t getLwsState(void) { return m_state; } - void connect(void); - void bufferForSending(const char* text); - size_t binarySpaceAvailable(void) { - return m_audio_buffer_max_len - m_audio_buffer_write_offset; - } - size_t binaryMinSpace(void) { - return m_audio_buffer_min_freespace; - } - char * binaryWritePtr(void) { - return (char *) m_audio_buffer + m_audio_buffer_write_offset; - } - void binaryWritePtrAdd(size_t len) { - m_audio_buffer_write_offset += len; - } - void binaryWritePtrResetToZero(void) { - m_audio_buffer_write_offset = 0; - } - void lockAudioBuffer(void) { - m_audio_mutex.lock(); - } - void unlockAudioBuffer(void) ; - bool hasBasicAuth(void) { - return !m_username.empty() && !m_password.empty(); - } - - void getBasicAuth(std::string& username, std::string& password) { - username = m_username; - password = m_password; - } - - void do_graceful_shutdown(); - bool isGracefulShutdown(void) { - return m_gracefulShutdown; - } - - void close() ; - - // no default constructor or copying - AudioPipe() = delete; - AudioPipe(const AudioPipe&) = delete; - void operator=(const AudioPipe&) = delete; - -private: - - static int lws_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); - static struct lws_context *context; - static std::string protocolName; - static std::mutex mutex_connects; - static std::mutex mutex_disconnects; - static std::mutex mutex_writes; - static std::list pendingConnects; - static std::list pendingDisconnects; - static std::list pendingWrites; - static log_emit_function logger; - - static std::mutex mapMutex; - static bool stopFlag; - - static AudioPipe* findAndRemovePendingConnect(struct lws *wsi); - static AudioPipe* findPendingConnect(struct lws *wsi); - static void addPendingConnect(AudioPipe* ap); - static void addPendingDisconnect(AudioPipe* ap); - static void addPendingWrite(AudioPipe* ap); - static void processPendingConnects(lws_per_vhost_data *vhd); - static void processPendingDisconnects(lws_per_vhost_data *vhd); - static void processPendingWrites(void); - - bool connect_client(struct lws_per_vhost_data *vhd); - - LwsState_t m_state; - std::string m_uuid; - std::string m_host; - std::string m_bugname; - unsigned int m_port; - std::string m_path; - std::string m_metadata; - std::mutex m_text_mutex; - std::mutex m_audio_mutex; - int m_sslFlags; - struct lws *m_wsi; - uint8_t *m_audio_buffer; - size_t m_audio_buffer_max_len; - size_t m_audio_buffer_write_offset; - size_t m_audio_buffer_min_freespace; - uint8_t* m_recv_buf; - uint8_t* m_recv_buf_ptr; - size_t m_recv_buf_len; - struct lws_per_vhost_data* m_vhd; - notifyHandler_t m_callback; - log_emit_function m_logger; - std::string m_username; - std::string m_password; - bool m_gracefulShutdown; -}; +} // namespace drachtio #endif diff --git a/mod_audio_fork/lws_glue.cpp b/mod_audio_fork/lws_glue.cpp index 32c2f5a..45a075b 100644 --- a/mod_audio_fork/lws_glue.cpp +++ b/mod_audio_fork/lws_glue.cpp @@ -156,7 +156,7 @@ namespace { } } - static void eventCallback(const char* sessionId, const char* bugname, AudioPipe::NotifyEvent_t event, const char* message) { + static void eventCallback(const char* sessionId, const char* bugname, drachtio::AudioPipe::NotifyEvent_t event, const char* message) { switch_core_session_t* session = switch_core_session_locate(sessionId); if (session) { switch_channel_t *channel = switch_core_session_get_channel(session); @@ -165,16 +165,16 @@ namespace { private_t* tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); if (tech_pvt) { switch (event) { - case AudioPipe::CONNECT_SUCCESS: + case drachtio::AudioPipe::CONNECT_SUCCESS: switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "connection successful\n"); tech_pvt->responseHandler(session, EVENT_CONNECT_SUCCESS, NULL); if (strlen(tech_pvt->initialMetadata) > 0) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "sending initial metadata %s\n", tech_pvt->initialMetadata); - AudioPipe *pAudioPipe = static_cast(tech_pvt->pAudioPipe); + drachtio::AudioPipe *pAudioPipe = static_cast(tech_pvt->pAudioPipe); pAudioPipe->bufferForSending(tech_pvt->initialMetadata); } break; - case AudioPipe::CONNECT_FAIL: + case drachtio::AudioPipe::CONNECT_FAIL: { // first thing: we can no longer access the AudioPipe std::stringstream json; @@ -184,18 +184,18 @@ namespace { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE, "connection failed: %s\n", message); } break; - case AudioPipe::CONNECTION_DROPPED: + case drachtio::AudioPipe::CONNECTION_DROPPED: // first thing: we can no longer access the AudioPipe tech_pvt->pAudioPipe = nullptr; tech_pvt->responseHandler(session, EVENT_DISCONNECT, NULL); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE, "connection dropped from far end\n"); break; - case AudioPipe::CONNECTION_CLOSED_GRACEFULLY: + case drachtio::AudioPipe::CONNECTION_CLOSED_GRACEFULLY: // first thing: we can no longer access the AudioPipe tech_pvt->pAudioPipe = nullptr; switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "connection closed gracefully\n"); break; - case AudioPipe::MESSAGE: + case drachtio::AudioPipe::MESSAGE: processIncomingMessage(tech_pvt, session, message); break; } @@ -239,7 +239,7 @@ namespace { size_t buflen = LWS_PRE + (FRAME_SIZE_8000 * desiredSampling / 8000 * channels * 1000 / RTP_PACKETIZATION_PERIOD * nAudioBufferSecs); - AudioPipe* ap = new AudioPipe(tech_pvt->sessionId, host, port, path, sslFlags, + drachtio::AudioPipe* ap = new drachtio::AudioPipe(tech_pvt->sessionId, host, port, path, sslFlags, buflen, read_impl.decoded_bytes_per_packet, username, password, bugname, eventCallback); if (!ap) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error allocating AudioPipe\n"); @@ -374,13 +374,17 @@ extern "C" { int logs = LLL_ERR | LLL_WARN | LLL_NOTICE ; //LLL_INFO | LLL_PARSER | LLL_HEADER | LLL_EXT | LLL_CLIENT | LLL_LATENCY | LLL_DEBUG ; - AudioPipe::initialize(mySubProtocolName, logs, lws_logger); - return SWITCH_STATUS_SUCCESS; + drachtio::AudioPipe::initialize(mySubProtocolName, logs, lws_logger); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_audio_fork successfully initialized\n"); + return SWITCH_STATUS_SUCCESS; } switch_status_t fork_cleanup() { bool cleanup = false; - cleanup = AudioPipe::deinitialize(); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_audio_fork unloading..\n"); + + cleanup = drachtio::AudioPipe::deinitialize(); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_audio_fork unloaded status %d\n", cleanup); if (cleanup == true) { return SWITCH_STATUS_SUCCESS; } @@ -420,7 +424,7 @@ extern "C" { switch_status_t fork_session_connect(void **ppUserData) { private_t *tech_pvt = static_cast(*ppUserData); - AudioPipe *pAudioPipe = static_cast(tech_pvt->pAudioPipe); + drachtio::AudioPipe *pAudioPipe = static_cast(tech_pvt->pAudioPipe); pAudioPipe->connect(); return SWITCH_STATUS_SUCCESS; } @@ -438,7 +442,7 @@ extern "C" { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%u) fork_session_cleanup\n", id); if (!tech_pvt) return SWITCH_STATUS_FALSE; - AudioPipe *pAudioPipe = static_cast(tech_pvt->pAudioPipe); + drachtio::AudioPipe *pAudioPipe = static_cast(tech_pvt->pAudioPipe); switch_mutex_lock(tech_pvt->mutex); @@ -481,7 +485,7 @@ extern "C" { private_t* tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); if (!tech_pvt) return SWITCH_STATUS_FALSE; - AudioPipe *pAudioPipe = static_cast(tech_pvt->pAudioPipe); + drachtio::AudioPipe *pAudioPipe = static_cast(tech_pvt->pAudioPipe); if (pAudioPipe && text) pAudioPipe->bufferForSending(text); return SWITCH_STATUS_SUCCESS; @@ -516,7 +520,7 @@ extern "C" { tech_pvt->graceful_shutdown = 1; - AudioPipe *pAudioPipe = static_cast(tech_pvt->pAudioPipe); + drachtio::AudioPipe *pAudioPipe = static_cast(tech_pvt->pAudioPipe); if (pAudioPipe) pAudioPipe->do_graceful_shutdown(); return SWITCH_STATUS_SUCCESS; @@ -535,8 +539,8 @@ extern "C" { switch_mutex_unlock(tech_pvt->mutex); return SWITCH_TRUE; } - AudioPipe *pAudioPipe = static_cast(tech_pvt->pAudioPipe); - if (pAudioPipe->getLwsState() != AudioPipe::LWS_CLIENT_CONNECTED) { + drachtio::AudioPipe *pAudioPipe = static_cast(tech_pvt->pAudioPipe); + if (pAudioPipe->getLwsState() != drachtio::AudioPipe::LWS_CLIENT_CONNECTED) { switch_mutex_unlock(tech_pvt->mutex); return SWITCH_TRUE; } diff --git a/mod_deepgram_transcribe/audio_pipe.cpp b/mod_deepgram_transcribe/audio_pipe.cpp index 7d75656..2684d50 100644 --- a/mod_deepgram_transcribe/audio_pipe.cpp +++ b/mod_deepgram_transcribe/audio_pipe.cpp @@ -255,6 +255,7 @@ static const lws_retry_bo_t retry = { }; struct lws_context *AudioPipe::context = nullptr; +std::thread AudioPipe::serviceThread; std::mutex AudioPipe::mutex_connects; std::mutex AudioPipe::mutex_disconnects; std::mutex AudioPipe::mutex_writes; @@ -423,19 +424,22 @@ bool AudioPipe::lws_service_thread() { void AudioPipe::initialize(int loglevel, log_emit_function logger) { - lws_set_log_level(loglevel, logger); + //lws_set_log_level(loglevel, logger); lwsl_notice("AudioPipe::initialize starting\n"); std::lock_guard lock(mapMutex); - std::thread t(&deepgram::AudioPipe::lws_service_thread); stopFlag = false; - t.detach(); + serviceThread = std::thread(&AudioPipe::lws_service_thread); } bool AudioPipe::deinitialize() { lwsl_notice("AudioPipe::deinitialize\n"); std::lock_guard lock(mapMutex); stopFlag = true; + if (serviceThread.joinable()) { + serviceThread.join(); + } + return true; } diff --git a/mod_deepgram_transcribe/audio_pipe.hpp b/mod_deepgram_transcribe/audio_pipe.hpp index d09b611..ee8562c 100644 --- a/mod_deepgram_transcribe/audio_pipe.hpp +++ b/mod_deepgram_transcribe/audio_pipe.hpp @@ -86,6 +86,7 @@ namespace deepgram { void operator=(const AudioPipe&) = delete; private: + static std::thread serviceThread; static int lws_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); static struct lws_context *context; diff --git a/mod_deepgram_transcribe/dg_transcribe_glue.cpp b/mod_deepgram_transcribe/dg_transcribe_glue.cpp index e268fb3..ae9ac2e 100644 --- a/mod_deepgram_transcribe/dg_transcribe_glue.cpp +++ b/mod_deepgram_transcribe/dg_transcribe_glue.cpp @@ -276,7 +276,7 @@ namespace { if (tech_pvt) { switch (event) { case deepgram::AudioPipe::CONNECT_SUCCESS: - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "connection successful\n"); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "connection (%s) successful\n", tech_pvt->bugname); tech_pvt->responseHandler(session, TRANSCRIBE_EVENT_CONNECT_SUCCESS, NULL, tech_pvt->bugname, finished); break; case deepgram::AudioPipe::CONNECT_FAIL: @@ -286,19 +286,19 @@ namespace { json << "{\"reason\":\"" << message << "\"}"; tech_pvt->pAudioPipe = nullptr; tech_pvt->responseHandler(session, TRANSCRIBE_EVENT_CONNECT_FAIL, (char *) json.str().c_str(), tech_pvt->bugname, finished); - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE, "connection failed: %s\n", message); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE, "connection (%s) failed: %s\n", message, tech_pvt->bugname); } break; case deepgram::AudioPipe::CONNECTION_DROPPED: // first thing: we can no longer access the AudioPipe tech_pvt->pAudioPipe = nullptr; tech_pvt->responseHandler(session, TRANSCRIBE_EVENT_DISCONNECT, NULL, tech_pvt->bugname, finished); - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "connection dropped from far end\n"); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "connection (%s) dropped from far end\n", tech_pvt->bugname); break; case deepgram::AudioPipe::CONNECTION_CLOSED_GRACEFULLY: // first thing: we can no longer access the AudioPipe tech_pvt->pAudioPipe = nullptr; - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "connection closed gracefully\n"); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "connection (%s) closed gracefully\n", tech_pvt->bugname); break; case deepgram::AudioPipe::MESSAGE: if( strstr(message, emptyTranscript)) { @@ -306,7 +306,7 @@ namespace { } else { tech_pvt->responseHandler(session, TRANSCRIBE_EVENT_RESULTS, message, tech_pvt->bugname, finished); - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "deepgram message: %s\n", message); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "deepgram message (%s): %s\n", tech_pvt->bugname, message); } break; diff --git a/mod_ibm_transcribe/audio_pipe.cpp b/mod_ibm_transcribe/audio_pipe.cpp index 2bb1c00..23ffcfd 100644 --- a/mod_ibm_transcribe/audio_pipe.cpp +++ b/mod_ibm_transcribe/audio_pipe.cpp @@ -48,7 +48,7 @@ int AudioPipe::lws_callback(struct lws *wsi, lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CONNECTION_ERROR: %s, response status %d\n", in ? (char *)in : "(null)", rc); if (ap) { ap->m_state = LWS_CLIENT_FAILED; - ap->m_callback(ap->m_uuid.c_str(), AudioPipe::CONNECT_FAIL, (char *) in, ap->isFinished(), ap->isInterimTranscriptsEnabled(), ap->getBugname().c_str()); + ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECT_FAIL, (char *) in, ap->isFinished(), ap->isInterimTranscriptsEnabled()); } else { lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CONNECTION_ERROR unable to find wsi %p..\n", wsi); @@ -77,7 +77,7 @@ int AudioPipe::lws_callback(struct lws *wsi, oss << "}"; ap->bufferForSending(oss.str().c_str()); - ap->m_callback(ap->m_uuid.c_str(), AudioPipe::CONNECT_SUCCESS, NULL, ap->isFinished(), ap->isInterimTranscriptsEnabled(), ap->getBugname().c_str()); + ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECT_SUCCESS, NULL, ap->isFinished(), ap->isInterimTranscriptsEnabled()); } } @@ -97,12 +97,12 @@ int AudioPipe::lws_callback(struct lws *wsi, // closed by us lwsl_debug("%s socket closed by us\n", ap->m_uuid.c_str()); - ap->m_callback(ap->m_uuid.c_str(), AudioPipe::CONNECTION_CLOSED_GRACEFULLY, NULL, ap->isFinished(), ap->isInterimTranscriptsEnabled(), ap->getBugname().c_str()); + ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECTION_CLOSED_GRACEFULLY, NULL, ap->isFinished(), ap->isInterimTranscriptsEnabled()); } else if (ap->m_state == LWS_CLIENT_CONNECTED) { // closed by far end lwsl_info("%s socket closed by far end\n", ap->m_uuid.c_str()); - ap->m_callback(ap->m_uuid.c_str(), AudioPipe::CONNECTION_DROPPED, NULL, ap->isFinished(), ap->isInterimTranscriptsEnabled(), ap->getBugname().c_str()); + ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECTION_DROPPED, NULL, ap->isFinished(), ap->isInterimTranscriptsEnabled()); } ap->m_state = LWS_CLIENT_DISCONNECTED; ap->setClosed(); @@ -166,7 +166,7 @@ int AudioPipe::lws_callback(struct lws *wsi, std::string msg((char *)ap->m_recv_buf, ap->m_recv_buf_ptr - ap->m_recv_buf); //std::cerr << "Recv: " << msg << std::endl; - ap->m_callback(ap->m_uuid.c_str(), AudioPipe::MESSAGE, msg.c_str(), ap->isFinished(), ap->isInterimTranscriptsEnabled(), ap->getBugname().c_str()); + ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::MESSAGE, msg.c_str(), ap->isFinished(), ap->isInterimTranscriptsEnabled()); if (nullptr != ap->m_recv_buf) free(ap->m_recv_buf); } ap->m_recv_buf = ap->m_recv_buf_ptr = nullptr; @@ -251,6 +251,7 @@ static const lws_retry_bo_t retry = { }; struct lws_context *AudioPipe::context = nullptr; +std::thread AudioPipe::serviceThread; std::mutex AudioPipe::mutex_connects; std::mutex AudioPipe::mutex_disconnects; std::mutex AudioPipe::mutex_writes; @@ -418,26 +419,29 @@ bool AudioPipe::lws_service_thread() { void AudioPipe::initialize(int loglevel, log_emit_function logger) { - lws_set_log_level(loglevel, logger); + //lws_set_log_level(loglevel, logger); lwsl_notice("AudioPipe::initialize starting\n"); std::lock_guard lock(mapMutex); - std::thread t(&AudioPipe::lws_service_thread); stopFlag = false; - t.detach(); + serviceThread = std::thread(&AudioPipe::lws_service_thread); } bool AudioPipe::deinitialize() { lwsl_notice("AudioPipe::deinitialize\n"); std::lock_guard lock(mapMutex); stopFlag = true; + if (serviceThread.joinable()) { + serviceThread.join(); + } + return true; } // instance members -AudioPipe::AudioPipe(const char* uuid, const char* host, unsigned int port, const char* path, +AudioPipe::AudioPipe(const char* uuid, const char* bugname, const char* host, unsigned int port, const char* path, size_t bufLen, size_t minFreespace, notifyHandler_t callback) : - m_uuid(uuid), m_host(host), m_port(port), m_path(path), m_finished(false), + m_uuid(uuid), m_host(host), m_port(port), m_path(path), m_finished(false), m_bugname(bugname), m_audio_buffer_min_freespace(minFreespace), m_audio_buffer_max_len(bufLen), m_gracefulShutdown(false), m_audio_buffer_write_offset(LWS_PRE), m_recv_buf(nullptr), m_recv_buf_ptr(nullptr), m_interim(false), m_state(LWS_CLIENT_IDLE), m_wsi(nullptr), m_vhd(nullptr), m_callback(callback) { diff --git a/mod_ibm_transcribe/audio_pipe.hpp b/mod_ibm_transcribe/audio_pipe.hpp index 9b4f69e..34b5fc9 100644 --- a/mod_ibm_transcribe/audio_pipe.hpp +++ b/mod_ibm_transcribe/audio_pipe.hpp @@ -31,7 +31,7 @@ public: MESSAGE }; typedef void (*log_emit_function)(int level, const char *line); - typedef void (*notifyHandler_t)(const char *sessionId, NotifyEvent_t event, const char* message, bool finished, bool wantsInterim, const char* bugname); + typedef void (*notifyHandler_t)(const char *sessionId, const char* bugname, NotifyEvent_t event, const char* message, bool finished, bool wantsInterim); struct lws_per_vhost_data { struct lws_context *context; @@ -44,7 +44,7 @@ public: static bool lws_service_thread(); // constructor - AudioPipe(const char* uuid, const char* host, unsigned int port, const char* path, + AudioPipe(const char* uuid, const char* bugname, const char* host, unsigned int port, const char* path, size_t bufLen, size_t minFreespace, notifyHandler_t callback); ~AudioPipe(); @@ -104,6 +104,7 @@ public: void operator=(const AudioPipe&) = delete; private: + static std::thread serviceThread; static int lws_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); static struct lws_context *context; diff --git a/mod_ibm_transcribe/ibm_transcribe_glue.cpp b/mod_ibm_transcribe/ibm_transcribe_glue.cpp index b3846f6..db842be 100644 --- a/mod_ibm_transcribe/ibm_transcribe_glue.cpp +++ b/mod_ibm_transcribe/ibm_transcribe_glue.cpp @@ -82,8 +82,8 @@ namespace { } } - static void responseHandler(switch_core_session_t* session, - const char* eventName, const char * json, const char* bugname, int finished) { + static void responseHandler(switch_core_session_t* session, const char* bugname, + const char* eventName, const char * json, int finished) { switch_event_t *event; switch_channel_t *channel = switch_core_session_get_channel(session); @@ -162,7 +162,7 @@ namespace { return path; } - static void eventCallback(const char* sessionId, ibm::AudioPipe::NotifyEvent_t event, const char* message, bool finished, bool wantsInterim, const char* bugname) { + static void eventCallback(const char* sessionId, const char* bugname, ibm::AudioPipe::NotifyEvent_t event, const char* message, bool finished, bool wantsInterim) { switch_core_session_t* session = switch_core_session_locate(sessionId); if (session) { bool releaseAudioPipe = false; @@ -256,10 +256,11 @@ namespace { tech_pvt->channels = channels; tech_pvt->id = ++idxCallCount; tech_pvt->buffer_overrun_notified = 0; - + strncpy(tech_pvt->bugname, bugname, MAX_BUG_LEN); + size_t buflen = LWS_PRE + (FRAME_SIZE_8000 * desiredSampling / 8000 * channels * 1000 / RTP_PACKETIZATION_PERIOD * nAudioBufferSecs); - ibm::AudioPipe* ap = new ibm::AudioPipe(tech_pvt->sessionId, tech_pvt->host, tech_pvt->port, tech_pvt->path, + ibm::AudioPipe* ap = new ibm::AudioPipe(tech_pvt->sessionId, bugname, tech_pvt->host, tech_pvt->port, tech_pvt->path, buflen, read_impl.decoded_bytes_per_packet, eventCallback); if (!ap) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error allocating AudioPipe\n"); @@ -311,7 +312,8 @@ extern "C" { switch_status_t ibm_transcribe_init() { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_ibm_transcribe: audio buffer (in secs): %d secs\n", nAudioBufferSecs); - int logs = LLL_ERR | LLL_WARN | LLL_NOTICE || LLL_INFO | LLL_PARSER | LLL_HEADER | LLL_EXT | LLL_CLIENT | LLL_LATENCY | LLL_DEBUG ; + int logs = LLL_ERR | LLL_WARN | LLL_NOTICE ; + // | LLL_INFO | LLL_PARSER | LLL_HEADER | LLL_EXT | LLL_CLIENT | LLL_LATENCY | LLL_DEBUG ; ibm::AudioPipe::initialize(logs, lws_logger); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "AudioPipe::initialize completed\n"); diff --git a/mod_ibm_transcribe/mod_ibm_transcribe.c b/mod_ibm_transcribe/mod_ibm_transcribe.c index 9faa449..c58268b 100644 --- a/mod_ibm_transcribe/mod_ibm_transcribe.c +++ b/mod_ibm_transcribe/mod_ibm_transcribe.c @@ -56,7 +56,7 @@ static switch_status_t start_capture(switch_core_session_t *session, switch_medi void *pUserData; uint32_t samples_per_second; - if (switch_channel_get_private(channel, MY_BUG_NAME)) { + if (switch_channel_get_private(channel, bugname)) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "removing bug from previous transcribe\n"); do_stop(session, bugname); } @@ -76,7 +76,7 @@ static switch_status_t start_capture(switch_core_session_t *session, switch_medi if ((status = switch_core_media_bug_add(session, "ibm_transcribe", NULL, capture_callback, pUserData, 0, flags, &bug)) != SWITCH_STATUS_SUCCESS) { return status; } - switch_channel_set_private(channel, MY_BUG_NAME, bug); + switch_channel_set_private(channel, bugname, bug); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "added media bug for ibm transcribe\n"); return SWITCH_STATUS_SUCCESS; @@ -87,7 +87,7 @@ static switch_status_t do_stop(switch_core_session_t *session, char* bugname) switch_status_t status = SWITCH_STATUS_SUCCESS; switch_channel_t *channel = switch_core_session_get_channel(session); - switch_media_bug_t *bug = switch_channel_get_private(channel, MY_BUG_NAME); + switch_media_bug_t *bug = switch_channel_get_private(channel, bugname); if (bug) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Received user command command to stop transcribe.\n"); diff --git a/mod_jambonz_transcribe/audio_pipe.cpp b/mod_jambonz_transcribe/audio_pipe.cpp index c693763..9025406 100644 --- a/mod_jambonz_transcribe/audio_pipe.cpp +++ b/mod_jambonz_transcribe/audio_pipe.cpp @@ -240,6 +240,7 @@ static const lws_retry_bo_t retry = { }; struct lws_context *AudioPipe::context = nullptr; +std::thread AudioPipe::serviceThread; std::mutex AudioPipe::mutex_connects; std::mutex AudioPipe::mutex_disconnects; std::mutex AudioPipe::mutex_writes; @@ -408,19 +409,21 @@ bool AudioPipe::lws_service_thread() { void AudioPipe::initialize(int loglevel, log_emit_function logger) { - lws_set_log_level(loglevel, logger); + //lws_set_log_level(loglevel, logger); lwsl_notice("AudioPipe::initialize starting\n"); std::lock_guard lock(mapMutex); - std::thread t(&AudioPipe::lws_service_thread); stopFlag = false; - t.detach(); + serviceThread = std::thread(&AudioPipe::lws_service_thread); } bool AudioPipe::deinitialize() { lwsl_notice("AudioPipe::deinitialize\n"); std::lock_guard lock(mapMutex); stopFlag = true; + if (serviceThread.joinable()) { + serviceThread.join(); + } return true; } diff --git a/mod_jambonz_transcribe/audio_pipe.hpp b/mod_jambonz_transcribe/audio_pipe.hpp index dcc9e84..e953065 100644 --- a/mod_jambonz_transcribe/audio_pipe.hpp +++ b/mod_jambonz_transcribe/audio_pipe.hpp @@ -86,6 +86,7 @@ public: void operator=(const AudioPipe&) = delete; private: + static std::thread serviceThread; static int lws_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); static struct lws_context *context; diff --git a/mod_jambonz_transcribe/jb_transcribe_glue.cpp b/mod_jambonz_transcribe/jb_transcribe_glue.cpp index d703275..0074aa6 100644 --- a/mod_jambonz_transcribe/jb_transcribe_glue.cpp +++ b/mod_jambonz_transcribe/jb_transcribe_glue.cpp @@ -256,7 +256,6 @@ namespace { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "host: %s, port: %d, path: %s\n", host, port, path); strncpy(tech_pvt->sessionId, switch_core_session_get_uuid(session), MAX_SESSION_ID); - strncpy(tech_pvt->bugname, bugname, MAX_BUG_LEN); strncpy(tech_pvt->host, host, MAX_WS_URL_LEN); strncpy(tech_pvt->path, path, MAX_PATH_LEN); tech_pvt->port = port; @@ -268,6 +267,7 @@ namespace { tech_pvt->channels = channels; tech_pvt->id = ++idxCallCount; tech_pvt->buffer_overrun_notified = 0; + strncpy(tech_pvt->bugname, bugname, MAX_BUG_LEN); size_t buflen = LWS_PRE + (FRAME_SIZE_8000 * desiredSampling / 8000 * channels * 1000 / RTP_PACKETIZATION_PERIOD * nAudioBufferSecs); @@ -325,7 +325,8 @@ extern "C" { switch_status_t jb_transcribe_init() { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_jambonz_transcribe: audio buffer (in secs): %d secs\n", nAudioBufferSecs); - int logs = LLL_ERR | LLL_WARN | LLL_NOTICE || LLL_INFO | LLL_PARSER | LLL_HEADER | LLL_EXT | LLL_CLIENT | LLL_LATENCY | LLL_DEBUG ; + int logs = LLL_ERR | LLL_WARN | LLL_NOTICE ; + // | LLL_INFO | LLL_PARSER | LLL_HEADER | LLL_EXT | LLL_CLIENT | LLL_LATENCY | LLL_DEBUG ; jambonz::AudioPipe::initialize(logs, lws_logger); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "AudioPipe::initialize completed\n");