diff --git a/LICENSE_MIT b/LICENSE_MIT index e909055..b7d725f 100644 --- a/LICENSE_MIT +++ b/LICENSE_MIT @@ -2,7 +2,7 @@ Under specific conditions that are described here: https://github.com/jambonz/fr The MIT License (MIT) -Copyright (c) 2023 Drachtio Communications Services, LLC +Copyright (c) 2024 FirstFive8, Inc Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/mod_audio_fork/audio_pipe.cpp b/mod_audio_fork/audio_pipe.cpp index de3645e..b014bb4 100644 --- a/mod_audio_fork/audio_pipe.cpp +++ b/mod_audio_fork/audio_pipe.cpp @@ -65,13 +65,17 @@ int AudioPipe::lws_callback(struct lws *wsi, std::string username, password; ap->getBasicAuth(username, password); - lwsl_notice("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER username: %s, password: xxxxxx\n", username.c_str()); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER username: %s, password: xxxxxx\n", username.c_str()); if (dch_lws_http_basic_auth_gen(username.c_str(), password.c_str(), b, sizeof(b))) break; if (lws_add_http_header_by_token(wsi, WSI_TOKEN_HTTP_AUTHORIZATION, (unsigned char *)b, strlen(b), p, end)) return -1; } } break; + case LWS_CALLBACK_WS_CLIENT_DROP_PROTOCOL: + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "AudioPipe::lws_service_thread LWS_CALLBACK_WS_CLIENT_DROP_PROTOCOL\n"); + break; + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: processPendingConnects(vhd); processPendingDisconnects(vhd); @@ -81,13 +85,13 @@ int AudioPipe::lws_callback(struct lws *wsi, { AudioPipe* ap = findAndRemovePendingConnect(wsi); int rc = lws_http_client_http_response(wsi); - lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CONNECTION_ERROR: %s, response status %d\n", in ? (char *)in : "(null)", rc); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CONNECTION_ERROR: %s, response status %d\n", in ? (char *)in : "(null)", rc); if (ap) { ap->m_state = LWS_CLIENT_FAILED; ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECT_FAIL, (char *) in, NULL, len); } else { - lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CONNECTION_ERROR unable to find wsi %p..\n", wsi); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CONNECTION_ERROR unable to find wsi %p..\n", wsi); } } break; @@ -102,7 +106,7 @@ int AudioPipe::lws_callback(struct lws *wsi, ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECT_SUCCESS, NULL, NULL, len); } else { - lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_ESTABLISHED %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_ESTABLISHED %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi); } } break; @@ -110,7 +114,7 @@ int AudioPipe::lws_callback(struct lws *wsi, { AudioPipe* ap = *ppAp; if (!ap) { - lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CLOSED %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CLOSED %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi); return 0; } if (ap->m_state == LWS_CLIENT_DISCONNECTING) { @@ -119,7 +123,7 @@ int AudioPipe::lws_callback(struct lws *wsi, } else if (ap->m_state == LWS_CLIENT_CONNECTED) { // closed by far end - lwsl_notice("%s socket closed by far end\n", ap->m_uuid.c_str()); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO,"%s socket closed by far end\n", ap->m_uuid.c_str()); ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECTION_DROPPED, NULL, NULL, len); } ap->m_state = LWS_CLIENT_DISCONNECTED; @@ -137,65 +141,68 @@ int AudioPipe::lws_callback(struct lws *wsi, AudioPipe* ap = *ppAp; if (!ap) { - lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi); return 0; } if (ap->m_state == LWS_CLIENT_DISCONNECTING) { - lwsl_notice("AudioPipe::lws_service_thread race condition: got incoming message while closing the connection.\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO,"AudioPipe::lws_service_thread race condition: got incoming message while closing the connection.\n"); return 0; } if (lws_frame_is_binary(wsi)) { - if (ap->is_bidirectional_audio_stream()) { + if (len > 0 && ap->is_bidirectional_audio_stream()) { ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::BINARY, NULL, (char *) in, len); - } else { - lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE received binary frame, discarding.\n"); - } - return 0; - } - - if (lws_is_first_fragment(wsi)) { - // allocate a buffer for the entire chunk of memory needed - assert(nullptr == ap->m_recv_buf); - ap->m_recv_buf_len = len + lws_remaining_packet_payload(wsi); - ap->m_recv_buf = (uint8_t*) malloc(ap->m_recv_buf_len); - ap->m_recv_buf_ptr = ap->m_recv_buf; - } - - size_t write_offset = ap->m_recv_buf_ptr - ap->m_recv_buf; - size_t remaining_space = ap->m_recv_buf_len - write_offset; - if (remaining_space < len) { - lwsl_notice("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE buffer realloc needed.\n"); - size_t newlen = ap->m_recv_buf_len + RECV_BUF_REALLOC_SIZE; - if (newlen > MAX_RECV_BUF_SIZE) { - free(ap->m_recv_buf); - ap->m_recv_buf = ap->m_recv_buf_ptr = nullptr; - ap->m_recv_buf_len = 0; - lwsl_notice("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE max buffer exceeded, truncating message.\n"); + } else if (len > 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE received unexpected binary frame, discarding.\n"); } else { - ap->m_recv_buf = (uint8_t*) realloc(ap->m_recv_buf, newlen); - if (nullptr != ap->m_recv_buf) { - ap->m_recv_buf_len = newlen; - ap->m_recv_buf_ptr = ap->m_recv_buf + write_offset; - } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE received zero length binary frame, discarding.\n"); } } - - if (nullptr != ap->m_recv_buf) { - if (len > 0) { - memcpy(ap->m_recv_buf_ptr, in, len); - ap->m_recv_buf_ptr += len; + else { + if (lws_is_first_fragment(wsi)) { + // allocate a buffer for the entire chunk of memory needed + assert(nullptr == ap->m_recv_buf); + ap->m_recv_buf_len = len + lws_remaining_packet_payload(wsi); + ap->m_recv_buf = (uint8_t*) malloc(ap->m_recv_buf_len); + ap->m_recv_buf_ptr = ap->m_recv_buf; } - if (lws_is_final_fragment(wsi)) { - if (nullptr != ap->m_recv_buf) { - std::string msg((char *)ap->m_recv_buf, ap->m_recv_buf_ptr - ap->m_recv_buf); - ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::MESSAGE, msg.c_str(), NULL, len); - if (nullptr != ap->m_recv_buf) free(ap->m_recv_buf); + + size_t write_offset = ap->m_recv_buf_ptr - ap->m_recv_buf; + size_t remaining_space = ap->m_recv_buf_len - write_offset; + if (remaining_space < len) { + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE buffer realloc needed.\n"); + size_t newlen = ap->m_recv_buf_len + RECV_BUF_REALLOC_SIZE; + if (newlen > MAX_RECV_BUF_SIZE) { + free(ap->m_recv_buf); + ap->m_recv_buf = ap->m_recv_buf_ptr = nullptr; + ap->m_recv_buf_len = 0; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE max buffer exceeded, truncating message.\n"); + } + else { + ap->m_recv_buf = (uint8_t*) realloc(ap->m_recv_buf, newlen); + if (nullptr != ap->m_recv_buf) { + ap->m_recv_buf_len = newlen; + ap->m_recv_buf_ptr = ap->m_recv_buf + write_offset; + } + } + } + + if (nullptr != ap->m_recv_buf) { + if (len > 0) { + memcpy(ap->m_recv_buf_ptr, in, len); + ap->m_recv_buf_ptr += len; + } + if (lws_is_final_fragment(wsi)) { + if (nullptr != ap->m_recv_buf) { + std::string msg((char *)ap->m_recv_buf, ap->m_recv_buf_ptr - ap->m_recv_buf); + ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::MESSAGE, msg.c_str(), NULL, len); + if (nullptr != ap->m_recv_buf) free(ap->m_recv_buf); + } + ap->m_recv_buf = ap->m_recv_buf_ptr = nullptr; + ap->m_recv_buf_len = 0; } - ap->m_recv_buf = ap->m_recv_buf_ptr = nullptr; - ap->m_recv_buf_len = 0; } } } @@ -205,13 +212,13 @@ int AudioPipe::lws_callback(struct lws *wsi, { AudioPipe* ap = *ppAp; if (!ap) { - lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_WRITEABLE %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_WRITEABLE %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi); return 0; } // check for graceful close - send a zero length binary frame if (ap->isGracefulShutdown()) { - lwsl_notice("%s graceful shutdown - sending zero length binary frame to flush any final responses\n", ap->m_uuid.c_str()); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"%s graceful shutdown - sending zero length binary frame to flush any final responses\n", ap->m_uuid.c_str()); std::lock_guard lk(ap->m_audio_mutex); int sent = lws_write(wsi, (unsigned char *) ap->m_audio_buffer + LWS_PRE, 0, LWS_WRITE_BINARY); return 0; @@ -250,7 +257,7 @@ int AudioPipe::lws_callback(struct lws *wsi, size_t datalen = ap->m_audio_buffer_write_offset - LWS_PRE; int sent = lws_write(wsi, (unsigned char *) ap->m_audio_buffer + LWS_PRE, datalen, LWS_WRITE_BINARY); if (sent < datalen) { - lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_WRITEABLE %s attemped to send %lu only sent %d wsi %p..\n", + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_WRITEABLE %s attemped to send %lu only sent %d wsi %p..\n", ap->m_uuid.c_str(), datalen, sent, wsi); } ap->m_audio_buffer_write_offset = LWS_PRE; @@ -379,7 +386,7 @@ void AudioPipe::addPendingConnect(AudioPipe* ap) { { std::lock_guard guard(mutex_connects); pendingConnects.push_back(ap); - lwsl_notice("%s after adding connect there are %lu pending connects\n", + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"%s after adding connect there are %lu pending connects\n", ap->m_uuid.c_str(), pendingConnects.size()); } lws_cancel_service(context); @@ -389,7 +396,7 @@ void AudioPipe::addPendingDisconnect(AudioPipe* ap) { { std::lock_guard guard(mutex_disconnects); pendingDisconnects.push_back(ap); - lwsl_notice("%s after adding disconnect there are %lu pending disconnects\n", + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"%s after adding disconnect there are %lu pending disconnects\n", ap->m_uuid.c_str(), pendingDisconnects.size()); } lws_cancel_service(ap->m_vhd->context); @@ -428,11 +435,11 @@ bool AudioPipe::lws_service_thread() { info.timeout_secs_ah_idle = 10; // secs to allow a client to hold an ah without using it info.retry_and_idle_policy = &retry; - lwsl_notice("AudioPipe::lws_service_thread creating context\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO,"AudioPipe::lws_service_thread creating context\n"); context = lws_create_context(&info); if (!context) { - lwsl_err("AudioPipe::lws_service_thread failed creating context\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"AudioPipe::lws_service_thread failed creating context\n"); return false; } @@ -449,16 +456,16 @@ bool AudioPipe::lws_service_thread() { void AudioPipe::initialize(const char* protocol, int loglevel, log_emit_function logger) { protocolName = protocol; - //lws_set_log_level(loglevel, logger); + lws_set_log_level(loglevel, logger); - lwsl_notice("AudioPipe::initialize starting\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE,"AudioPipe::initialize starting\n"); std::lock_guard lock(mapMutex); stopFlag = false; serviceThread = std::thread(&AudioPipe::lws_service_thread); } bool AudioPipe::deinitialize() { - lwsl_notice("AudioPipe::deinitialize\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE,"AudioPipe::deinitialize\n"); std::lock_guard lock(mapMutex); stopFlag = true; if (serviceThread.joinable()) { @@ -513,7 +520,7 @@ bool AudioPipe::connect_client(struct lws_per_vhost_data *vhd) { m_vhd = vhd; m_wsi = lws_client_connect_via_info(&i); - lwsl_notice("%s attempting connection, wsi is %p\n", m_uuid.c_str(), m_wsi); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"%s attempting connection, wsi is %p\n", m_uuid.c_str(), m_wsi); return nullptr != m_wsi; } diff --git a/mod_audio_fork/lws_glue.cpp b/mod_audio_fork/lws_glue.cpp index a375694..4dbac06 100644 --- a/mod_audio_fork/lws_glue.cpp +++ b/mod_audio_fork/lws_glue.cpp @@ -21,11 +21,12 @@ #include + typedef boost::circular_buffer CircularBuffer_t; #define RTP_PACKETIZATION_PERIOD 20 #define FRAME_SIZE_8000 320 /*which means each 20ms frame as 320 bytes at 8 khz (1 channel only)*/ -#define BUFFER_GROW_SIZE (8192) +#define BUFFER_GROW_SIZE (16384) namespace { static const char *requestedBufferSecs = std::getenv("MOD_AUDIO_FORK_BUFFER_SECS"); @@ -38,28 +39,92 @@ namespace { static uint32_t playCount = 0; switch_status_t processIncomingBinary(private_t* tech_pvt, switch_core_session_t* session, const char* message, size_t dataLength) { - uint8_t* data = reinterpret_cast(const_cast(message)); - uint16_t* data_uint16 = reinterpret_cast(data); - std::vector pcm_data(data_uint16, data_uint16 + dataLength / sizeof(uint16_t)); + std::vector data; + + // Prepend the set-aside byte if there is one + if (tech_pvt->has_set_aside_byte) { + data.push_back(tech_pvt->set_aside_byte); + tech_pvt->has_set_aside_byte = false; + } + + // Append the new incoming message + data.insert(data.end(), message, message + dataLength); + + // Check if the total data length is now odd + if (data.size() % 2 != 0) { + // Set aside the last byte + tech_pvt->set_aside_byte = data.back(); + tech_pvt->has_set_aside_byte = true; + data.pop_back(); // Remove the last byte from the data vector + } + + // Convert the data to 16-bit elements + const uint16_t* data_uint16 = reinterpret_cast(data.data()); + size_t numSamples = data.size() / sizeof(uint16_t); + + // Access the prebuffer + CircularBuffer_t* cBuffer = static_cast(tech_pvt->streamingPreBuffer); + + // Ensure the prebuffer has enough capacity + if (cBuffer->capacity() - cBuffer->size() < numSamples) { + size_t newCapacity = cBuffer->size() + std::max(numSamples, (size_t)BUFFER_GROW_SIZE); + cBuffer->set_capacity(newCapacity); + } + + // Append the data to the prebuffer + cBuffer->insert(cBuffer->end(), data_uint16, data_uint16 + numSamples); + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Appended %zu 16-bit samples to the prebuffer.\n", numSamples); + + // if we haven't reached threshold amount of prebuffered data, return + if (cBuffer->size() < tech_pvt->streamingPreBufSize) { + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Prebuffered data is below threshold %u, returning.\n", tech_pvt->streamingPreBufSize); + return SWITCH_STATUS_SUCCESS; + } + + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Prebuffered data samples %u is above threshold %u, prepare to playout.\n", cBuffer->size(), tech_pvt->streamingPreBufSize); + + // after initial pre-buffering, rachet down the threshold to 40ms + tech_pvt->streamingPreBufSize = 320 * tech_pvt->downscale_factor * 2; + + // Check for downsampling factor + size_t downsample_factor = tech_pvt->downscale_factor; + + // Calculate the number of samples that can be evenly divided by the downsample factor + size_t numCompleteSamples = (cBuffer->size() / downsample_factor) * downsample_factor; + + // Handle leftover samples + std::vector leftoverSamples; + size_t numLeftoverSamples = cBuffer->size() - numCompleteSamples; + if (numLeftoverSamples > 0) { + leftoverSamples.assign(cBuffer->end() - numLeftoverSamples, cBuffer->end()); + cBuffer->resize(numCompleteSamples); + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Temporarily removing %u leftover samples due to downsampling.\n", numLeftoverSamples); + } // resample if necessary + std::vector out; try { if (tech_pvt->bidirectional_audio_resampler) { - std::vector in(pcm_data.begin(), pcm_data.end()); + // Improvement: Use assign to convert circular buffer to vector for resampling + std::vector in; + in.assign(cBuffer->begin(), cBuffer->end()); + out.resize(in.size() * 6); // max upsampling would be from 8k -> 48k - std::vector out(dataLength); - spx_uint32_t in_len = pcm_data.size(); + spx_uint32_t in_len = in.size(); spx_uint32_t out_len = out.size(); + + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Resampling %u samples into a buffer that can hold %u samples\n", in.size(), out_len); + speex_resampler_process_interleaved_int(tech_pvt->bidirectional_audio_resampler, in.data(), &in_len, out.data(), &out_len); - if (out_len > out.size()) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Resampler output exceeded maximum buffer size!\n"); - return SWITCH_STATUS_FALSE; - } + // Resize the output buffer to match the output length from resampler + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Resizing output buffer from %u to %u samples\n", in.size(), out_len); - // Resize the pcm_data to match the output length from resampler, and then copy the resampled data into it. - pcm_data.resize(out_len); - memcpy(pcm_data.data(), out.data(), out_len * sizeof(int16_t)); + out.resize(out_len); + } + else { + // If no resampling is needed, copy the data from the prebuffer to the output buffer + out.assign(cBuffer->begin(), cBuffer->end()); } } catch (const std::exception& e) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error resampling incoming binary message: %s\n", e.what()); @@ -70,34 +135,43 @@ namespace { } if (nullptr != tech_pvt->mutex && switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS) { - //switch_mutex_lock(tech_pvt->mutex); - CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->circularBuffer; + CircularBuffer_t *playoutBuffer = (CircularBuffer_t *) tech_pvt->streamingPlayoutBuffer; try { // Resize the buffer if necessary - size_t bytesResampled = pcm_data.size() * sizeof(uint16_t); - if (cBuffer->capacity() - cBuffer->size() < bytesResampled / sizeof(uint16_t)) { - // If buffer exceeds some max size, you could return SWITCH_STATUS_FALSE to abort the transfer - // if (cBuffer->size() + std::max(bytesResampled / sizeof(uint16_t), (size_t)BUFFER_GROW_SIZE) > MAX_BUFFER_SIZE) return SWITCH_STATUS_FALSE; - - cBuffer->set_capacity(cBuffer->size() + std::max(bytesResampled / sizeof(uint16_t), (size_t)BUFFER_GROW_SIZE)); + if (playoutBuffer->capacity() - playoutBuffer->size() < out.size()) { + size_t newCapacity = playoutBuffer->size() + std::max(out.size(), (size_t)BUFFER_GROW_SIZE); + playoutBuffer->set_capacity(newCapacity); + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Resized playout buffer to new capacity: %zu\n", newCapacity); } // Push the data into the buffer. - cBuffer->insert(cBuffer->end(), pcm_data.begin(), pcm_data.end()); + playoutBuffer->insert(playoutBuffer->end(), out.begin(), out.end()); + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Appended %zu 16-bit samples to the playout buffer.\n", out.size()); } catch (const std::exception& e) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error processing incoming binary message: %s\n", e.what()); switch_mutex_unlock(tech_pvt->mutex); + cBuffer->clear(); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error processing incoming binary message: %s\n", e.what()); return SWITCH_STATUS_FALSE; } catch (...) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error processing incoming binary message\n"); switch_mutex_unlock(tech_pvt->mutex); + cBuffer->clear(); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error processing incoming binary message\n"); return SWITCH_STATUS_FALSE; } switch_mutex_unlock(tech_pvt->mutex); + cBuffer->clear(); + + // Put the leftover samples back in the prebuffer for the next time + if (!leftoverSamples.empty()) { + cBuffer->insert(cBuffer->end(), leftoverSamples.begin(), leftoverSamples.end()); + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Put back %u leftover samples into the prebuffer.\n", leftoverSamples.size()); + } return SWITCH_STATUS_SUCCESS; } - return SWITCH_STATUS_FALSE; -} + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Failed to get mutext (temp)\n"); + + return SWITCH_STATUS_SUCCESS; + } void processIncomingMessage(private_t* tech_pvt, switch_core_session_t* session, const char* message) { std::string msg = message; @@ -193,7 +267,7 @@ namespace { switch_channel_set_flag_value(channel, CF_BREAK, 2); // this will dump buffered incoming audio - tech_pvt->clear_bidirectional_audio_buffer = 1; + tech_pvt->clear_bidirectional_audio_buffer = true; } else if (0 == type.compare("transcription")) { char* jsonString = cJSON_PrintUnformatted(jsonData); @@ -313,11 +387,20 @@ namespace { tech_pvt->buffer_overrun_notified = 0; tech_pvt->audio_paused = 0; tech_pvt->graceful_shutdown = 0; - tech_pvt->circularBuffer = (void *) new CircularBuffer_t(8192); + tech_pvt->streamingPlayoutBuffer = (void *) new CircularBuffer_t(8192); tech_pvt->bidirectional_audio_enable = bidirectional_audio_enable; tech_pvt->bidirectional_audio_stream = bidirectional_audio_stream; tech_pvt->bidirectional_audio_sample_rate = bidirectional_audio_sample_rate; - tech_pvt->clear_bidirectional_audio_buffer = 0; + tech_pvt->clear_bidirectional_audio_buffer = false; + tech_pvt->has_set_aside_byte = 0; + tech_pvt->downscale_factor = 1; + if (bidirectional_audio_sample_rate > sampling) { + tech_pvt->downscale_factor = bidirectional_audio_sample_rate / sampling; + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "downscale_factor is %d\n", tech_pvt->downscale_factor); + } + tech_pvt->streamingPreBufSize = 320 * tech_pvt->downscale_factor * 4; // min 80ms prebuffer + tech_pvt->streamingPreBuffer = (void *) new CircularBuffer_t(8192); + strncpy(tech_pvt->bugname, bugname, MAX_BUG_LEN); if (metadata) strncpy(tech_pvt->initialMetadata, metadata, MAX_METADATA_LEN); @@ -347,8 +430,8 @@ namespace { } if (bidirectional_audio_sample_rate && sampling != bidirectional_audio_sample_rate) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%u) bidirectional audio resampling from %u to %u\n", tech_pvt->id, bidirectional_audio_sample_rate, sampling); - tech_pvt->bidirectional_audio_resampler = speex_resampler_init(channels, bidirectional_audio_sample_rate, sampling, SWITCH_RESAMPLE_QUALITY, &err); + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%u) bidirectional audio resampling from %u to %u, channels %d\n", tech_pvt->id, bidirectional_audio_sample_rate, sampling, channels); + tech_pvt->bidirectional_audio_resampler = speex_resampler_init(1, bidirectional_audio_sample_rate, sampling, SWITCH_RESAMPLE_QUALITY, &err); if (0 != err) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error initializing bidirectional audio resampler: %s.\n", speex_resampler_strerror(err)); return SWITCH_STATUS_FALSE; @@ -374,10 +457,15 @@ namespace { switch_mutex_destroy(tech_pvt->mutex); tech_pvt->mutex = nullptr; } - if (tech_pvt->circularBuffer) { - CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->circularBuffer; + if (tech_pvt->streamingPlayoutBuffer) { + CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->streamingPlayoutBuffer; delete cBuffer; - tech_pvt->circularBuffer = nullptr; + tech_pvt->streamingPlayoutBuffer = nullptr; + } + if (tech_pvt->streamingPreBuffer) { + CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->streamingPreBuffer; + delete cBuffer; + tech_pvt->streamingPreBuffer = nullptr; } } @@ -477,8 +565,8 @@ extern "C" { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_audio_fork: audio buffer (in secs): %d secs\n", nAudioBufferSecs); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_audio_fork: sub-protocol: %s\n", mySubProtocolName); - int logs = LLL_ERR | LLL_WARN | LLL_NOTICE ; - //LLL_INFO | LLL_PARSER | LLL_HEADER | LLL_EXT | LLL_CLIENT | LLL_LATENCY | LLL_DEBUG ; + //int logs = LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO | LLL_PARSER | LLL_HEADER | LLL_EXT | LLL_CLIENT | LLL_LATENCY | LLL_DEBUG ; + int logs = LLL_ERR | LLL_WARN | LLL_NOTICE; drachtio::AudioPipe::initialize(mySubProtocolName, logs, lws_logger); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_audio_fork successfully initialized\n"); return SWITCH_STATUS_SUCCESS; @@ -730,14 +818,14 @@ extern "C" { } switch_bool_t dub_speech_frame(switch_media_bug_t *bug, private_t* tech_pvt) { - CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->circularBuffer; + CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->streamingPlayoutBuffer; if (switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS) { // if flag was set to clear the buffer, do so and clear the flag if (tech_pvt->clear_bidirectional_audio_buffer) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%u) dub_speech_frame - clearing buffer\n", tech_pvt->id); cBuffer->clear(); - tech_pvt->clear_bidirectional_audio_buffer = 0; + tech_pvt->clear_bidirectional_audio_buffer = false; } else { switch_frame_t* rframe = switch_core_media_bug_get_write_replace_frame(bug); @@ -751,6 +839,8 @@ extern "C" { int samplesToCopy = std::min(static_cast(cBuffer->size()), static_cast(rframe->samples)); + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%u) dub_speech_frame - samples to copy %u\n", tech_pvt->id, samplesToCopy); + std::copy_n(cBuffer->begin(), samplesToCopy, data); cBuffer->erase(cBuffer->begin(), cBuffer->begin() + samplesToCopy); @@ -775,7 +865,7 @@ extern "C" { } private_t* tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); - CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->circularBuffer; + CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->streamingPlayoutBuffer; if (switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS) { if (cBuffer != nullptr) { diff --git a/mod_audio_fork/mod_audio_fork.h b/mod_audio_fork/mod_audio_fork.h index a5ddc98..ec4d120 100644 --- a/mod_audio_fork/mod_audio_fork.h +++ b/mod_audio_fork/mod_audio_fork.h @@ -52,7 +52,14 @@ struct private_data { int audio_paused:1; int graceful_shutdown:1; char initialMetadata[8192]; - void *circularBuffer; + + // bidirectional audio + void *streamingPlayoutBuffer; + void *streamingPreBuffer; + int streamingPreBufSize; + uint8_t set_aside_byte; + int has_set_aside_byte; + int downscale_factor; SpeexResamplerState *bidirectional_audio_resampler; int bidirectional_audio_enable; int bidirectional_audio_stream;