Fixes for resampling and for handling odd-length byte streams (#88)

* Fixes for resampling and for handling incoming byte streams whose length is an odd number of bytes
This commit is contained in:
Dave Horton
2024-07-19 16:45:45 -04:00
committed by GitHub
parent 56df923cdb
commit 3ce819b7c9
4 changed files with 207 additions and 103 deletions

View File

@@ -2,7 +2,7 @@ Under specific conditions that are described here: https://github.com/jambonz/fr
The MIT License (MIT) The MIT License (MIT)
Copyright (c) 2023 Drachtio Communications Services, LLC Copyright (c) 2024 FirstFive8, Inc
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal

View File

@@ -65,13 +65,17 @@ int AudioPipe::lws_callback(struct lws *wsi,
std::string username, password; std::string username, password;
ap->getBasicAuth(username, password); ap->getBasicAuth(username, password);
lwsl_notice("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER username: %s, password: xxxxxx\n", username.c_str()); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER username: %s, password: xxxxxx\n", username.c_str());
if (dch_lws_http_basic_auth_gen(username.c_str(), password.c_str(), b, sizeof(b))) break; if (dch_lws_http_basic_auth_gen(username.c_str(), password.c_str(), b, sizeof(b))) break;
if (lws_add_http_header_by_token(wsi, WSI_TOKEN_HTTP_AUTHORIZATION, (unsigned char *)b, strlen(b), p, end)) return -1; if (lws_add_http_header_by_token(wsi, WSI_TOKEN_HTTP_AUTHORIZATION, (unsigned char *)b, strlen(b), p, end)) return -1;
} }
} }
break; break;
case LWS_CALLBACK_WS_CLIENT_DROP_PROTOCOL:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "AudioPipe::lws_service_thread LWS_CALLBACK_WS_CLIENT_DROP_PROTOCOL\n");
break;
case LWS_CALLBACK_EVENT_WAIT_CANCELLED: case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
processPendingConnects(vhd); processPendingConnects(vhd);
processPendingDisconnects(vhd); processPendingDisconnects(vhd);
@@ -81,13 +85,13 @@ int AudioPipe::lws_callback(struct lws *wsi,
{ {
AudioPipe* ap = findAndRemovePendingConnect(wsi); AudioPipe* ap = findAndRemovePendingConnect(wsi);
int rc = lws_http_client_http_response(wsi); int rc = lws_http_client_http_response(wsi);
lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CONNECTION_ERROR: %s, response status %d\n", in ? (char *)in : "(null)", rc); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CONNECTION_ERROR: %s, response status %d\n", in ? (char *)in : "(null)", rc);
if (ap) { if (ap) {
ap->m_state = LWS_CLIENT_FAILED; ap->m_state = LWS_CLIENT_FAILED;
ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECT_FAIL, (char *) in, NULL, len); ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECT_FAIL, (char *) in, NULL, len);
} }
else { else {
lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CONNECTION_ERROR unable to find wsi %p..\n", wsi); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CONNECTION_ERROR unable to find wsi %p..\n", wsi);
} }
} }
break; break;
@@ -102,7 +106,7 @@ int AudioPipe::lws_callback(struct lws *wsi,
ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECT_SUCCESS, NULL, NULL, len); ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECT_SUCCESS, NULL, NULL, len);
} }
else { else {
lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_ESTABLISHED %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_ESTABLISHED %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi);
} }
} }
break; break;
@@ -110,7 +114,7 @@ int AudioPipe::lws_callback(struct lws *wsi,
{ {
AudioPipe* ap = *ppAp; AudioPipe* ap = *ppAp;
if (!ap) { if (!ap) {
lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CLOSED %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CLOSED %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi);
return 0; return 0;
} }
if (ap->m_state == LWS_CLIENT_DISCONNECTING) { if (ap->m_state == LWS_CLIENT_DISCONNECTING) {
@@ -119,7 +123,7 @@ int AudioPipe::lws_callback(struct lws *wsi,
} }
else if (ap->m_state == LWS_CLIENT_CONNECTED) { else if (ap->m_state == LWS_CLIENT_CONNECTED) {
// closed by far end // closed by far end
lwsl_notice("%s socket closed by far end\n", ap->m_uuid.c_str()); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO,"%s socket closed by far end\n", ap->m_uuid.c_str());
ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECTION_DROPPED, NULL, NULL, len); ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECTION_DROPPED, NULL, NULL, len);
} }
ap->m_state = LWS_CLIENT_DISCONNECTED; ap->m_state = LWS_CLIENT_DISCONNECTED;
@@ -137,24 +141,26 @@ int AudioPipe::lws_callback(struct lws *wsi,
AudioPipe* ap = *ppAp; AudioPipe* ap = *ppAp;
if (!ap) { if (!ap) {
lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi);
return 0; return 0;
} }
if (ap->m_state == LWS_CLIENT_DISCONNECTING) { if (ap->m_state == LWS_CLIENT_DISCONNECTING) {
lwsl_notice("AudioPipe::lws_service_thread race condition: got incoming message while closing the connection.\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO,"AudioPipe::lws_service_thread race condition: got incoming message while closing the connection.\n");
return 0; return 0;
} }
if (lws_frame_is_binary(wsi)) { if (lws_frame_is_binary(wsi)) {
if (ap->is_bidirectional_audio_stream()) { if (len > 0 && ap->is_bidirectional_audio_stream()) {
ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::BINARY, NULL, (char *) in, len); ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::BINARY, NULL, (char *) in, len);
} else { } else if (len > 0) {
lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE received binary frame, discarding.\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE received unexpected binary frame, discarding.\n");
} }
return 0; else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE received zero length binary frame, discarding.\n");
} }
}
else {
if (lws_is_first_fragment(wsi)) { if (lws_is_first_fragment(wsi)) {
// allocate a buffer for the entire chunk of memory needed // allocate a buffer for the entire chunk of memory needed
assert(nullptr == ap->m_recv_buf); assert(nullptr == ap->m_recv_buf);
@@ -166,13 +172,13 @@ int AudioPipe::lws_callback(struct lws *wsi,
size_t write_offset = ap->m_recv_buf_ptr - ap->m_recv_buf; size_t write_offset = ap->m_recv_buf_ptr - ap->m_recv_buf;
size_t remaining_space = ap->m_recv_buf_len - write_offset; size_t remaining_space = ap->m_recv_buf_len - write_offset;
if (remaining_space < len) { if (remaining_space < len) {
lwsl_notice("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE buffer realloc needed.\n"); //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE buffer realloc needed.\n");
size_t newlen = ap->m_recv_buf_len + RECV_BUF_REALLOC_SIZE; size_t newlen = ap->m_recv_buf_len + RECV_BUF_REALLOC_SIZE;
if (newlen > MAX_RECV_BUF_SIZE) { if (newlen > MAX_RECV_BUF_SIZE) {
free(ap->m_recv_buf); free(ap->m_recv_buf);
ap->m_recv_buf = ap->m_recv_buf_ptr = nullptr; ap->m_recv_buf = ap->m_recv_buf_ptr = nullptr;
ap->m_recv_buf_len = 0; ap->m_recv_buf_len = 0;
lwsl_notice("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE max buffer exceeded, truncating message.\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE max buffer exceeded, truncating message.\n");
} }
else { else {
ap->m_recv_buf = (uint8_t*) realloc(ap->m_recv_buf, newlen); ap->m_recv_buf = (uint8_t*) realloc(ap->m_recv_buf, newlen);
@@ -199,19 +205,20 @@ int AudioPipe::lws_callback(struct lws *wsi,
} }
} }
} }
}
break; break;
case LWS_CALLBACK_CLIENT_WRITEABLE: case LWS_CALLBACK_CLIENT_WRITEABLE:
{ {
AudioPipe* ap = *ppAp; AudioPipe* ap = *ppAp;
if (!ap) { if (!ap) {
lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_WRITEABLE %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_WRITEABLE %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi);
return 0; return 0;
} }
// check for graceful close - send a zero length binary frame // check for graceful close - send a zero length binary frame
if (ap->isGracefulShutdown()) { if (ap->isGracefulShutdown()) {
lwsl_notice("%s graceful shutdown - sending zero length binary frame to flush any final responses\n", ap->m_uuid.c_str()); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"%s graceful shutdown - sending zero length binary frame to flush any final responses\n", ap->m_uuid.c_str());
std::lock_guard<std::mutex> lk(ap->m_audio_mutex); std::lock_guard<std::mutex> lk(ap->m_audio_mutex);
int sent = lws_write(wsi, (unsigned char *) ap->m_audio_buffer + LWS_PRE, 0, LWS_WRITE_BINARY); int sent = lws_write(wsi, (unsigned char *) ap->m_audio_buffer + LWS_PRE, 0, LWS_WRITE_BINARY);
return 0; return 0;
@@ -250,7 +257,7 @@ int AudioPipe::lws_callback(struct lws *wsi,
size_t datalen = ap->m_audio_buffer_write_offset - LWS_PRE; size_t datalen = ap->m_audio_buffer_write_offset - LWS_PRE;
int sent = lws_write(wsi, (unsigned char *) ap->m_audio_buffer + LWS_PRE, datalen, LWS_WRITE_BINARY); int sent = lws_write(wsi, (unsigned char *) ap->m_audio_buffer + LWS_PRE, datalen, LWS_WRITE_BINARY);
if (sent < datalen) { if (sent < datalen) {
lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_WRITEABLE %s attemped to send %lu only sent %d wsi %p..\n", switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO,"AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_WRITEABLE %s attemped to send %lu only sent %d wsi %p..\n",
ap->m_uuid.c_str(), datalen, sent, wsi); ap->m_uuid.c_str(), datalen, sent, wsi);
} }
ap->m_audio_buffer_write_offset = LWS_PRE; ap->m_audio_buffer_write_offset = LWS_PRE;
@@ -379,7 +386,7 @@ void AudioPipe::addPendingConnect(AudioPipe* ap) {
{ {
std::lock_guard<std::mutex> guard(mutex_connects); std::lock_guard<std::mutex> guard(mutex_connects);
pendingConnects.push_back(ap); pendingConnects.push_back(ap);
lwsl_notice("%s after adding connect there are %lu pending connects\n", switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"%s after adding connect there are %lu pending connects\n",
ap->m_uuid.c_str(), pendingConnects.size()); ap->m_uuid.c_str(), pendingConnects.size());
} }
lws_cancel_service(context); lws_cancel_service(context);
@@ -389,7 +396,7 @@ void AudioPipe::addPendingDisconnect(AudioPipe* ap) {
{ {
std::lock_guard<std::mutex> guard(mutex_disconnects); std::lock_guard<std::mutex> guard(mutex_disconnects);
pendingDisconnects.push_back(ap); pendingDisconnects.push_back(ap);
lwsl_notice("%s after adding disconnect there are %lu pending disconnects\n", switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"%s after adding disconnect there are %lu pending disconnects\n",
ap->m_uuid.c_str(), pendingDisconnects.size()); ap->m_uuid.c_str(), pendingDisconnects.size());
} }
lws_cancel_service(ap->m_vhd->context); lws_cancel_service(ap->m_vhd->context);
@@ -428,11 +435,11 @@ bool AudioPipe::lws_service_thread() {
info.timeout_secs_ah_idle = 10; // secs to allow a client to hold an ah without using it info.timeout_secs_ah_idle = 10; // secs to allow a client to hold an ah without using it
info.retry_and_idle_policy = &retry; info.retry_and_idle_policy = &retry;
lwsl_notice("AudioPipe::lws_service_thread creating context\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO,"AudioPipe::lws_service_thread creating context\n");
context = lws_create_context(&info); context = lws_create_context(&info);
if (!context) { if (!context) {
lwsl_err("AudioPipe::lws_service_thread failed creating context\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,"AudioPipe::lws_service_thread failed creating context\n");
return false; return false;
} }
@@ -449,16 +456,16 @@ bool AudioPipe::lws_service_thread() {
void AudioPipe::initialize(const char* protocol, int loglevel, log_emit_function logger) { void AudioPipe::initialize(const char* protocol, int loglevel, log_emit_function logger) {
protocolName = protocol; protocolName = protocol;
//lws_set_log_level(loglevel, logger); lws_set_log_level(loglevel, logger);
lwsl_notice("AudioPipe::initialize starting\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE,"AudioPipe::initialize starting\n");
std::lock_guard<std::mutex> lock(mapMutex); std::lock_guard<std::mutex> lock(mapMutex);
stopFlag = false; stopFlag = false;
serviceThread = std::thread(&AudioPipe::lws_service_thread); serviceThread = std::thread(&AudioPipe::lws_service_thread);
} }
bool AudioPipe::deinitialize() { bool AudioPipe::deinitialize() {
lwsl_notice("AudioPipe::deinitialize\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE,"AudioPipe::deinitialize\n");
std::lock_guard<std::mutex> lock(mapMutex); std::lock_guard<std::mutex> lock(mapMutex);
stopFlag = true; stopFlag = true;
if (serviceThread.joinable()) { if (serviceThread.joinable()) {
@@ -513,7 +520,7 @@ bool AudioPipe::connect_client(struct lws_per_vhost_data *vhd) {
m_vhd = vhd; m_vhd = vhd;
m_wsi = lws_client_connect_via_info(&i); m_wsi = lws_client_connect_via_info(&i);
lwsl_notice("%s attempting connection, wsi is %p\n", m_uuid.c_str(), m_wsi); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"%s attempting connection, wsi is %p\n", m_uuid.c_str(), m_wsi);
return nullptr != m_wsi; return nullptr != m_wsi;
} }

View File

@@ -21,11 +21,12 @@
#include <boost/circular_buffer.hpp> #include <boost/circular_buffer.hpp>
typedef boost::circular_buffer<uint16_t> CircularBuffer_t; typedef boost::circular_buffer<uint16_t> CircularBuffer_t;
#define RTP_PACKETIZATION_PERIOD 20 #define RTP_PACKETIZATION_PERIOD 20
#define FRAME_SIZE_8000 320 /*which means each 20ms frame as 320 bytes at 8 khz (1 channel only)*/ #define FRAME_SIZE_8000 320 /*which means each 20ms frame as 320 bytes at 8 khz (1 channel only)*/
#define BUFFER_GROW_SIZE (8192) #define BUFFER_GROW_SIZE (16384)
namespace { namespace {
static const char *requestedBufferSecs = std::getenv("MOD_AUDIO_FORK_BUFFER_SECS"); static const char *requestedBufferSecs = std::getenv("MOD_AUDIO_FORK_BUFFER_SECS");
@@ -38,28 +39,92 @@ namespace {
static uint32_t playCount = 0; static uint32_t playCount = 0;
switch_status_t processIncomingBinary(private_t* tech_pvt, switch_core_session_t* session, const char* message, size_t dataLength) { switch_status_t processIncomingBinary(private_t* tech_pvt, switch_core_session_t* session, const char* message, size_t dataLength) {
uint8_t* data = reinterpret_cast<uint8_t*>(const_cast<char*>(message)); std::vector<uint8_t> data;
uint16_t* data_uint16 = reinterpret_cast<uint16_t*>(data);
std::vector<uint16_t> pcm_data(data_uint16, data_uint16 + dataLength / sizeof(uint16_t));
// resample if necessary // Prepend the set-aside byte if there is one
try { if (tech_pvt->has_set_aside_byte) {
if (tech_pvt->bidirectional_audio_resampler) { data.push_back(tech_pvt->set_aside_byte);
std::vector<int16_t> in(pcm_data.begin(), pcm_data.end()); tech_pvt->has_set_aside_byte = false;
std::vector<int16_t> out(dataLength);
spx_uint32_t in_len = pcm_data.size();
spx_uint32_t out_len = out.size();
speex_resampler_process_interleaved_int(tech_pvt->bidirectional_audio_resampler, in.data(), &in_len, out.data(), &out_len);
if (out_len > out.size()) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Resampler output exceeded maximum buffer size!\n");
return SWITCH_STATUS_FALSE;
} }
// Resize the pcm_data to match the output length from resampler, and then copy the resampled data into it. // Append the new incoming message
pcm_data.resize(out_len); data.insert(data.end(), message, message + dataLength);
memcpy(pcm_data.data(), out.data(), out_len * sizeof(int16_t));
// Check if the total data length is now odd
if (data.size() % 2 != 0) {
// Set aside the last byte
tech_pvt->set_aside_byte = data.back();
tech_pvt->has_set_aside_byte = true;
data.pop_back(); // Remove the last byte from the data vector
}
// Convert the data to 16-bit elements
const uint16_t* data_uint16 = reinterpret_cast<const uint16_t*>(data.data());
size_t numSamples = data.size() / sizeof(uint16_t);
// Access the prebuffer
CircularBuffer_t* cBuffer = static_cast<CircularBuffer_t*>(tech_pvt->streamingPreBuffer);
// Ensure the prebuffer has enough capacity
if (cBuffer->capacity() - cBuffer->size() < numSamples) {
size_t newCapacity = cBuffer->size() + std::max(numSamples, (size_t)BUFFER_GROW_SIZE);
cBuffer->set_capacity(newCapacity);
}
// Append the data to the prebuffer
cBuffer->insert(cBuffer->end(), data_uint16, data_uint16 + numSamples);
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Appended %zu 16-bit samples to the prebuffer.\n", numSamples);
// if we haven't reached threshold amount of prebuffered data, return
if (cBuffer->size() < tech_pvt->streamingPreBufSize) {
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Prebuffered data is below threshold %u, returning.\n", tech_pvt->streamingPreBufSize);
return SWITCH_STATUS_SUCCESS;
}
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Prebuffered data samples %u is above threshold %u, prepare to playout.\n", cBuffer->size(), tech_pvt->streamingPreBufSize);
// after initial pre-buffering, ratchet down the threshold to 40ms
tech_pvt->streamingPreBufSize = 320 * tech_pvt->downscale_factor * 2;
// Check for downsampling factor
size_t downsample_factor = tech_pvt->downscale_factor;
// Calculate the number of samples that can be evenly divided by the downsample factor
size_t numCompleteSamples = (cBuffer->size() / downsample_factor) * downsample_factor;
// Handle leftover samples
std::vector<uint16_t> leftoverSamples;
size_t numLeftoverSamples = cBuffer->size() - numCompleteSamples;
if (numLeftoverSamples > 0) {
leftoverSamples.assign(cBuffer->end() - numLeftoverSamples, cBuffer->end());
cBuffer->resize(numCompleteSamples);
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Temporarily removing %u leftover samples due to downsampling.\n", numLeftoverSamples);
}
// resample if necessary
std::vector<int16_t> out;
try {
if (tech_pvt->bidirectional_audio_resampler) {
// Improvement: Use assign to convert circular buffer to vector for resampling
std::vector<int16_t> in;
in.assign(cBuffer->begin(), cBuffer->end());
out.resize(in.size() * 6); // max upsampling would be from 8k -> 48k
spx_uint32_t in_len = in.size();
spx_uint32_t out_len = out.size();
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Resampling %u samples into a buffer that can hold %u samples\n", in.size(), out_len);
speex_resampler_process_interleaved_int(tech_pvt->bidirectional_audio_resampler, in.data(), &in_len, out.data(), &out_len);
// Resize the output buffer to match the output length from resampler
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Resizing output buffer from %u to %u samples\n", in.size(), out_len);
out.resize(out_len);
}
else {
// If no resampling is needed, copy the data from the prebuffer to the output buffer
out.assign(cBuffer->begin(), cBuffer->end());
} }
} catch (const std::exception& e) { } catch (const std::exception& e) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error resampling incoming binary message: %s\n", e.what()); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error resampling incoming binary message: %s\n", e.what());
@@ -70,33 +135,42 @@ namespace {
} }
if (nullptr != tech_pvt->mutex && switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS) { if (nullptr != tech_pvt->mutex && switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS) {
//switch_mutex_lock(tech_pvt->mutex); CircularBuffer_t *playoutBuffer = (CircularBuffer_t *) tech_pvt->streamingPlayoutBuffer;
CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->circularBuffer;
try { try {
// Resize the buffer if necessary // Resize the buffer if necessary
size_t bytesResampled = pcm_data.size() * sizeof(uint16_t); if (playoutBuffer->capacity() - playoutBuffer->size() < out.size()) {
if (cBuffer->capacity() - cBuffer->size() < bytesResampled / sizeof(uint16_t)) { size_t newCapacity = playoutBuffer->size() + std::max(out.size(), (size_t)BUFFER_GROW_SIZE);
// If buffer exceeds some max size, you could return SWITCH_STATUS_FALSE to abort the transfer playoutBuffer->set_capacity(newCapacity);
// if (cBuffer->size() + std::max(bytesResampled / sizeof(uint16_t), (size_t)BUFFER_GROW_SIZE) > MAX_BUFFER_SIZE) return SWITCH_STATUS_FALSE; //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Resized playout buffer to new capacity: %zu\n", newCapacity);
cBuffer->set_capacity(cBuffer->size() + std::max(bytesResampled / sizeof(uint16_t), (size_t)BUFFER_GROW_SIZE));
} }
// Push the data into the buffer. // Push the data into the buffer.
cBuffer->insert(cBuffer->end(), pcm_data.begin(), pcm_data.end()); playoutBuffer->insert(playoutBuffer->end(), out.begin(), out.end());
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Appended %zu 16-bit samples to the playout buffer.\n", out.size());
} catch (const std::exception& e) { } catch (const std::exception& e) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error processing incoming binary message: %s\n", e.what());
switch_mutex_unlock(tech_pvt->mutex); switch_mutex_unlock(tech_pvt->mutex);
cBuffer->clear();
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error processing incoming binary message: %s\n", e.what());
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} catch (...) { } catch (...) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error processing incoming binary message\n");
switch_mutex_unlock(tech_pvt->mutex); switch_mutex_unlock(tech_pvt->mutex);
cBuffer->clear();
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error processing incoming binary message\n");
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
switch_mutex_unlock(tech_pvt->mutex); switch_mutex_unlock(tech_pvt->mutex);
cBuffer->clear();
// Put the leftover samples back in the prebuffer for the next time
if (!leftoverSamples.empty()) {
cBuffer->insert(cBuffer->end(), leftoverSamples.begin(), leftoverSamples.end());
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Put back %u leftover samples into the prebuffer.\n", leftoverSamples.size());
}
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }
return SWITCH_STATUS_FALSE; switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Failed to get mutext (temp)\n");
return SWITCH_STATUS_SUCCESS;
} }
void processIncomingMessage(private_t* tech_pvt, switch_core_session_t* session, const char* message) { void processIncomingMessage(private_t* tech_pvt, switch_core_session_t* session, const char* message) {
@@ -193,7 +267,7 @@ namespace {
switch_channel_set_flag_value(channel, CF_BREAK, 2); switch_channel_set_flag_value(channel, CF_BREAK, 2);
// this will dump buffered incoming audio // this will dump buffered incoming audio
tech_pvt->clear_bidirectional_audio_buffer = 1; tech_pvt->clear_bidirectional_audio_buffer = true;
} }
else if (0 == type.compare("transcription")) { else if (0 == type.compare("transcription")) {
char* jsonString = cJSON_PrintUnformatted(jsonData); char* jsonString = cJSON_PrintUnformatted(jsonData);
@@ -313,11 +387,20 @@ namespace {
tech_pvt->buffer_overrun_notified = 0; tech_pvt->buffer_overrun_notified = 0;
tech_pvt->audio_paused = 0; tech_pvt->audio_paused = 0;
tech_pvt->graceful_shutdown = 0; tech_pvt->graceful_shutdown = 0;
tech_pvt->circularBuffer = (void *) new CircularBuffer_t(8192); tech_pvt->streamingPlayoutBuffer = (void *) new CircularBuffer_t(8192);
tech_pvt->bidirectional_audio_enable = bidirectional_audio_enable; tech_pvt->bidirectional_audio_enable = bidirectional_audio_enable;
tech_pvt->bidirectional_audio_stream = bidirectional_audio_stream; tech_pvt->bidirectional_audio_stream = bidirectional_audio_stream;
tech_pvt->bidirectional_audio_sample_rate = bidirectional_audio_sample_rate; tech_pvt->bidirectional_audio_sample_rate = bidirectional_audio_sample_rate;
tech_pvt->clear_bidirectional_audio_buffer = 0; tech_pvt->clear_bidirectional_audio_buffer = false;
tech_pvt->has_set_aside_byte = 0;
tech_pvt->downscale_factor = 1;
if (bidirectional_audio_sample_rate > sampling) {
tech_pvt->downscale_factor = bidirectional_audio_sample_rate / sampling;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "downscale_factor is %d\n", tech_pvt->downscale_factor);
}
tech_pvt->streamingPreBufSize = 320 * tech_pvt->downscale_factor * 4; // min 80ms prebuffer
tech_pvt->streamingPreBuffer = (void *) new CircularBuffer_t(8192);
strncpy(tech_pvt->bugname, bugname, MAX_BUG_LEN); strncpy(tech_pvt->bugname, bugname, MAX_BUG_LEN);
if (metadata) strncpy(tech_pvt->initialMetadata, metadata, MAX_METADATA_LEN); if (metadata) strncpy(tech_pvt->initialMetadata, metadata, MAX_METADATA_LEN);
@@ -347,8 +430,8 @@ namespace {
} }
if (bidirectional_audio_sample_rate && sampling != bidirectional_audio_sample_rate) { if (bidirectional_audio_sample_rate && sampling != bidirectional_audio_sample_rate) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%u) bidirectional audio resampling from %u to %u\n", tech_pvt->id, bidirectional_audio_sample_rate, sampling); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%u) bidirectional audio resampling from %u to %u, channels %d\n", tech_pvt->id, bidirectional_audio_sample_rate, sampling, channels);
tech_pvt->bidirectional_audio_resampler = speex_resampler_init(channels, bidirectional_audio_sample_rate, sampling, SWITCH_RESAMPLE_QUALITY, &err); tech_pvt->bidirectional_audio_resampler = speex_resampler_init(1, bidirectional_audio_sample_rate, sampling, SWITCH_RESAMPLE_QUALITY, &err);
if (0 != err) { if (0 != err) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error initializing bidirectional audio resampler: %s.\n", speex_resampler_strerror(err)); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error initializing bidirectional audio resampler: %s.\n", speex_resampler_strerror(err));
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
@@ -374,10 +457,15 @@ namespace {
switch_mutex_destroy(tech_pvt->mutex); switch_mutex_destroy(tech_pvt->mutex);
tech_pvt->mutex = nullptr; tech_pvt->mutex = nullptr;
} }
if (tech_pvt->circularBuffer) { if (tech_pvt->streamingPlayoutBuffer) {
CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->circularBuffer; CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->streamingPlayoutBuffer;
delete cBuffer; delete cBuffer;
tech_pvt->circularBuffer = nullptr; tech_pvt->streamingPlayoutBuffer = nullptr;
}
if (tech_pvt->streamingPreBuffer) {
CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->streamingPreBuffer;
delete cBuffer;
tech_pvt->streamingPreBuffer = nullptr;
} }
} }
@@ -477,8 +565,8 @@ extern "C" {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_audio_fork: audio buffer (in secs): %d secs\n", nAudioBufferSecs); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_audio_fork: audio buffer (in secs): %d secs\n", nAudioBufferSecs);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_audio_fork: sub-protocol: %s\n", mySubProtocolName); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_audio_fork: sub-protocol: %s\n", mySubProtocolName);
//int logs = LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO | LLL_PARSER | LLL_HEADER | LLL_EXT | LLL_CLIENT | LLL_LATENCY | LLL_DEBUG ;
int logs = LLL_ERR | LLL_WARN | LLL_NOTICE; int logs = LLL_ERR | LLL_WARN | LLL_NOTICE;
//LLL_INFO | LLL_PARSER | LLL_HEADER | LLL_EXT | LLL_CLIENT | LLL_LATENCY | LLL_DEBUG ;
drachtio::AudioPipe::initialize(mySubProtocolName, logs, lws_logger); drachtio::AudioPipe::initialize(mySubProtocolName, logs, lws_logger);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_audio_fork successfully initialized\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_audio_fork successfully initialized\n");
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
@@ -730,14 +818,14 @@ extern "C" {
} }
switch_bool_t dub_speech_frame(switch_media_bug_t *bug, private_t* tech_pvt) { switch_bool_t dub_speech_frame(switch_media_bug_t *bug, private_t* tech_pvt) {
CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->circularBuffer; CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->streamingPlayoutBuffer;
if (switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS) { if (switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS) {
// if flag was set to clear the buffer, do so and clear the flag // if flag was set to clear the buffer, do so and clear the flag
if (tech_pvt->clear_bidirectional_audio_buffer) { if (tech_pvt->clear_bidirectional_audio_buffer) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%u) dub_speech_frame - clearing buffer\n", tech_pvt->id); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%u) dub_speech_frame - clearing buffer\n", tech_pvt->id);
cBuffer->clear(); cBuffer->clear();
tech_pvt->clear_bidirectional_audio_buffer = 0; tech_pvt->clear_bidirectional_audio_buffer = false;
} }
else { else {
switch_frame_t* rframe = switch_core_media_bug_get_write_replace_frame(bug); switch_frame_t* rframe = switch_core_media_bug_get_write_replace_frame(bug);
@@ -751,6 +839,8 @@ extern "C" {
int samplesToCopy = std::min(static_cast<int>(cBuffer->size()), static_cast<int>(rframe->samples)); int samplesToCopy = std::min(static_cast<int>(cBuffer->size()), static_cast<int>(rframe->samples));
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%u) dub_speech_frame - samples to copy %u\n", tech_pvt->id, samplesToCopy);
std::copy_n(cBuffer->begin(), samplesToCopy, data); std::copy_n(cBuffer->begin(), samplesToCopy, data);
cBuffer->erase(cBuffer->begin(), cBuffer->begin() + samplesToCopy); cBuffer->erase(cBuffer->begin(), cBuffer->begin() + samplesToCopy);
@@ -775,7 +865,7 @@ extern "C" {
} }
private_t* tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); private_t* tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug);
CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->circularBuffer; CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->streamingPlayoutBuffer;
if (switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS) { if (switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS) {
if (cBuffer != nullptr) { if (cBuffer != nullptr) {

View File

@@ -52,7 +52,14 @@ struct private_data {
int audio_paused:1; int audio_paused:1;
int graceful_shutdown:1; int graceful_shutdown:1;
char initialMetadata[8192]; char initialMetadata[8192];
void *circularBuffer;
// bidirectional audio
void *streamingPlayoutBuffer;
void *streamingPreBuffer;
int streamingPreBufSize;
uint8_t set_aside_byte;
int has_set_aside_byte;
int downscale_factor;
SpeexResamplerState *bidirectional_audio_resampler; SpeexResamplerState *bidirectional_audio_resampler;
int bidirectional_audio_enable; int bidirectional_audio_enable;
int bidirectional_audio_stream; int bidirectional_audio_stream;