diff --git a/mod_audio_fork/Makefile.am b/mod_audio_fork/Makefile.am index 2ba32ed..0da48c9 100644 --- a/mod_audio_fork/Makefile.am +++ b/mod_audio_fork/Makefile.am @@ -2,9 +2,17 @@ include $(top_srcdir)/build/modmake.rulesam MODNAME=mod_audio_fork mod_LTLIBRARIES = mod_audio_fork.la -mod_audio_fork_la_SOURCES = mod_audio_fork.c lws_glue.cpp parser.cpp audio_pipe.cpp +mod_audio_fork_la_SOURCES = mod_audio_fork.c lws_glue.cpp parser.cpp audio_pipe.cpp vector_math.cpp mod_audio_fork_la_CFLAGS = $(AM_CFLAGS) mod_audio_fork_la_CXXFLAGS = $(AM_CXXFLAGS) -std=c++11 +if USE_AVX2 +mod_audio_fork_la_CXXFLAGS += -mavx2 -DUSE_AVX2 +else +if USE_SSE2 +mod_audio_fork_la_CXXFLAGS += -msse2 -DUSE_SSE2 +endif +endif + mod_audio_fork_la_LIBADD = $(switch_builddir)/libfreeswitch.la -mod_audio_fork_la_LDFLAGS = -avoid-version -module -no-undefined -shared `pkg-config --libs libwebsockets` +mod_audio_fork_la_LDFLAGS = -avoid-version -module -no-undefined -shared `pkg-config --libs libwebsockets` -lstdc++ diff --git a/mod_audio_fork/audio_pipe.cpp b/mod_audio_fork/audio_pipe.cpp index 83ea426..521d17b 100644 --- a/mod_audio_fork/audio_pipe.cpp +++ b/mod_audio_fork/audio_pipe.cpp @@ -1,4 +1,5 @@ #include "audio_pipe.hpp" +#include #include #include @@ -83,7 +84,7 @@ int AudioPipe::lws_callback(struct lws *wsi, lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CONNECTION_ERROR: %s, response status %d\n", in ? 
(char *)in : "(null)", rc); if (ap) { ap->m_state = LWS_CLIENT_FAILED; - ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECT_FAIL, (char *) in); + ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECT_FAIL, (char *) in, NULL, len); } else { lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CONNECTION_ERROR unable to find wsi %p..\n", wsi); @@ -98,7 +99,7 @@ int AudioPipe::lws_callback(struct lws *wsi, *ppAp = ap; ap->m_vhd = vhd; ap->m_state = LWS_CLIENT_CONNECTED; - ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECT_SUCCESS, NULL); + ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECT_SUCCESS, NULL, NULL, len); } else { lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_ESTABLISHED %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi); @@ -114,12 +115,12 @@ int AudioPipe::lws_callback(struct lws *wsi, } if (ap->m_state == LWS_CLIENT_DISCONNECTING) { // closed by us - ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECTION_CLOSED_GRACEFULLY, NULL); + ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECTION_CLOSED_GRACEFULLY, NULL, NULL, len); } else if (ap->m_state == LWS_CLIENT_CONNECTED) { // closed by far end lwsl_notice("%s socket closed by far end\n", ap->m_uuid.c_str()); - ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECTION_DROPPED, NULL); + ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::CONNECTION_DROPPED, NULL, NULL, len); } ap->m_state = LWS_CLIENT_DISCONNECTED; @@ -133,14 +134,19 @@ int AudioPipe::lws_callback(struct lws *wsi, case LWS_CALLBACK_CLIENT_RECEIVE: { + AudioPipe* ap = *ppAp; if (!ap) { lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi); return 0; } - + if (lws_frame_is_binary(wsi)) { - lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE 
received binary frame, discarding.\n"); + if (ap->is_bidirectional_audio_stream()) { + ap->m_callback(ap->m_uuid.c_str(), ap->m_bugname.c_str(), AudioPipe::BINARY, NULL, (char *) in, len); + } else { + lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE received binary frame, discarding.\n"); + } return 0; } @@ -458,7 +464,8 @@ bool AudioPipe::deinitialize() { // instance members AudioPipe::AudioPipe(const char* uuid, const char* host, unsigned int port, const char* path, - int sslFlags, size_t bufLen, size_t minFreespace, const char* username, const char* password, char* bugname, notifyHandler_t callback) : + int sslFlags, size_t bufLen, size_t minFreespace, const char* username, const char* password, char* bugname, + int bidirectional_audio_stream, notifyHandler_t callback) : m_uuid(uuid), m_host(host), m_port(port), m_path(path), m_sslFlags(sslFlags), m_audio_buffer_min_freespace(minFreespace), m_audio_buffer_max_len(bufLen), m_gracefulShutdown(false), m_audio_buffer_write_offset(LWS_PRE), m_recv_buf(nullptr), m_recv_buf_ptr(nullptr), m_bugname(bugname), @@ -468,7 +475,7 @@ AudioPipe::AudioPipe(const char* uuid, const char* host, unsigned int port, cons m_username.assign(username); m_password.assign(password); } - + m_bidirectional_audio_stream = bidirectional_audio_stream; m_audio_buffer = new uint8_t[m_audio_buffer_max_len]; } AudioPipe::~AudioPipe() { diff --git a/mod_audio_fork/audio_pipe.hpp b/mod_audio_fork/audio_pipe.hpp index 25b457c..bfeab49 100644 --- a/mod_audio_fork/audio_pipe.hpp +++ b/mod_audio_fork/audio_pipe.hpp @@ -27,10 +27,11 @@ namespace drachtio { CONNECT_FAIL, CONNECTION_DROPPED, CONNECTION_CLOSED_GRACEFULLY, - MESSAGE + MESSAGE, + BINARY }; typedef void (*log_emit_function)(int level, const char *line); - typedef void (*notifyHandler_t)(const char *sessionId, const char* bugname, NotifyEvent_t event, const char* message); + typedef void (*notifyHandler_t)(const char *sessionId, const char* bugname, NotifyEvent_t event, 
const char* message, const char* binary, size_t binary_len ); struct lws_per_vhost_data { struct lws_context *context; @@ -44,7 +45,8 @@ namespace drachtio { // constructor AudioPipe(const char* uuid, const char* host, unsigned int port, const char* path, int sslFlags, - size_t bufLen, size_t minFreespace, const char* username, const char* password, char* bugname, notifyHandler_t callback); + size_t bufLen, size_t minFreespace, const char* username, const char* password, char* bugname, + int bidirectional_audio, notifyHandler_t callback); ~AudioPipe(); LwsState_t getLwsState(void) { return m_state; } @@ -83,6 +85,10 @@ namespace drachtio { return m_gracefulShutdown; } + bool is_bidirectional_audio_stream() { + return m_bidirectional_audio_stream; + } + void close() ; // no default constructor or copying @@ -142,6 +148,7 @@ namespace drachtio { std::string m_username; std::string m_password; bool m_gracefulShutdown; + bool m_bidirectional_audio_stream; }; } // namespace drachtio diff --git a/mod_audio_fork/lws_glue.cpp b/mod_audio_fork/lws_glue.cpp index 45a075b..2da2f70 100644 --- a/mod_audio_fork/lws_glue.cpp +++ b/mod_audio_fork/lws_glue.cpp @@ -17,9 +17,15 @@ #include "parser.hpp" #include "mod_audio_fork.h" #include "audio_pipe.hpp" +#include "vector_math.h" + +#include + +typedef boost::circular_buffer CircularBuffer_t; #define RTP_PACKETIZATION_PERIOD 20 #define FRAME_SIZE_8000 320 /*which means each 20ms frame as 320 bytes at 8 khz (1 channel only)*/ +#define BUFFER_GROW_SIZE (8192) namespace { static const char *requestedBufferSecs = std::getenv("MOD_AUDIO_FORK_BUFFER_SECS"); @@ -31,6 +37,48 @@ namespace { static unsigned int idxCallCount = 0; static uint32_t playCount = 0; + switch_status_t processIncomingBinary(private_t* tech_pvt, switch_core_session_t* session, const char* message, size_t dataLength) { + CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->circularBuffer; + uint8_t* data = reinterpret_cast(const_cast(message)); + uint16_t* 
data_uint16 = reinterpret_cast(data); + std::vector pcm_data(data_uint16, data_uint16 + dataLength / sizeof(uint16_t)); + + + if (tech_pvt->bidirectional_audio_resampler) { + std::vector in(pcm_data.begin(), pcm_data.end()); + + std::vector out(dataLength); + spx_uint32_t in_len = pcm_data.size(); + spx_uint32_t out_len = out.size(); + speex_resampler_process_interleaved_int(tech_pvt->bidirectional_audio_resampler, in.data(), &in_len, out.data(), &out_len); + + if (out_len > out.size()) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Resampler output exceeded maximum buffer size!\n"); + return SWITCH_STATUS_FALSE; + } + + // Resize the pcm_data to match the output length from resampler, and then copy the resampled data into it. + pcm_data.resize(out_len); + memcpy(pcm_data.data(), out.data(), out_len * sizeof(int16_t)); + } + switch_mutex_lock(tech_pvt->mutex); + + // Resize the buffer if necessary + size_t bytesResampled = pcm_data.size() * sizeof(uint16_t); + if (cBuffer->capacity() - cBuffer->size() < bytesResampled / sizeof(uint16_t)) { + // If buffer exceeds some max size, you could return SWITCH_STATUS_FALSE to abort the transfer + // if (cBuffer->size() + std::max(bytesResampled / sizeof(uint16_t), (size_t)BUFFER_GROW_SIZE) > MAX_BUFFER_SIZE) return SWITCH_STATUS_FALSE; + + cBuffer->set_capacity(cBuffer->size() + std::max(bytesResampled / sizeof(uint16_t), (size_t)BUFFER_GROW_SIZE)); + } + // Push the data into the buffer. 
+ cBuffer->insert(cBuffer->end(), pcm_data.begin(), pcm_data.end()); + + switch_mutex_unlock(tech_pvt->mutex); + + return SWITCH_STATUS_SUCCESS; +} + void processIncomingMessage(private_t* tech_pvt, switch_core_session_t* session, const char* message) { std::string msg = message; std::string type; @@ -38,7 +86,10 @@ namespace { if (json) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%u) processIncomingMessage - received %s message\n", tech_pvt->id, type.c_str()); cJSON* jsonData = cJSON_GetObjectItem(json, "data"); - if (0 == type.compare("playAudio")) { + if (0 == type.compare("playAudio") && + // playAudio is enabled and there is no bidirectional audio from stream is enabled. + tech_pvt->bidirectional_audio_enable && + !tech_pvt->bidirectional_audio_stream) { if (jsonData) { // dont send actual audio bytes in event message cJSON* jsonFile = NULL; @@ -156,7 +207,7 @@ namespace { } } - static void eventCallback(const char* sessionId, const char* bugname, drachtio::AudioPipe::NotifyEvent_t event, const char* message) { + static void eventCallback(const char* sessionId, const char* bugname, drachtio::AudioPipe::NotifyEvent_t event, const char* message, const char* binary, size_t len) { switch_core_session_t* session = switch_core_session_locate(sessionId); if (session) { switch_channel_t *channel = switch_core_session_get_channel(session); @@ -198,6 +249,9 @@ namespace { case drachtio::AudioPipe::MESSAGE: processIncomingMessage(tech_pvt, session, message); break; + case drachtio::AudioPipe::BINARY: + processIncomingBinary(tech_pvt, session, binary, len); + break; } } } @@ -206,11 +260,13 @@ namespace { } switch_status_t fork_data_init(private_t *tech_pvt, switch_core_session_t *session, char * host, unsigned int port, char* path, int sslFlags, int sampling, int desiredSampling, int channels, - char *bugname, char* metadata, responseHandler_t responseHandler) { + char *bugname, char* metadata, int bidirectional_audio_enable, + int 
bidirectional_audio_stream, int bidirectional_audio_sample_rate, responseHandler_t responseHandler) { const char* username = nullptr; const char* password = nullptr; int err; + int bidirectional_audio_stream_enable = bidirectional_audio_enable + bidirectional_audio_stream; switch_codec_implementation_t read_impl; switch_channel_t *channel = switch_core_session_get_channel(session); @@ -234,13 +290,17 @@ namespace { tech_pvt->buffer_overrun_notified = 0; tech_pvt->audio_paused = 0; tech_pvt->graceful_shutdown = 0; + tech_pvt->circularBuffer = (void *) new CircularBuffer_t(8192); + tech_pvt->bidirectional_audio_enable = bidirectional_audio_enable; + tech_pvt->bidirectional_audio_stream = bidirectional_audio_stream; + tech_pvt->bidirectional_audio_sample_rate = bidirectional_audio_sample_rate; strncpy(tech_pvt->bugname, bugname, MAX_BUG_LEN); if (metadata) strncpy(tech_pvt->initialMetadata, metadata, MAX_METADATA_LEN); size_t buflen = LWS_PRE + (FRAME_SIZE_8000 * desiredSampling / 8000 * channels * 1000 / RTP_PACKETIZATION_PERIOD * nAudioBufferSecs); drachtio::AudioPipe* ap = new drachtio::AudioPipe(tech_pvt->sessionId, host, port, path, sslFlags, - buflen, read_impl.decoded_bytes_per_packet, username, password, bugname, eventCallback); + buflen, read_impl.decoded_bytes_per_packet, username, password, bugname, bidirectional_audio_stream_enable, eventCallback); if (!ap) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error allocating AudioPipe\n"); return SWITCH_STATUS_FALSE; @@ -262,6 +322,15 @@ namespace { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%u) no resampling needed for this call\n", tech_pvt->id); } + if (bidirectional_audio_sample_rate && sampling != bidirectional_audio_sample_rate) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%u) bidirectional audio resampling from %u to %u\n", tech_pvt->id, bidirectional_audio_sample_rate, sampling); + 
tech_pvt->bidirectional_audio_resampler = speex_resampler_init(channels, bidirectional_audio_sample_rate, sampling, SWITCH_RESAMPLE_QUALITY, &err); + if (0 != err) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error initializing bidirectional audio resampler: %s.\n", speex_resampler_strerror(err)); + return SWITCH_STATUS_FALSE; + } + } + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%u) fork_data_init\n", tech_pvt->id); return SWITCH_STATUS_SUCCESS; @@ -273,10 +342,19 @@ namespace { speex_resampler_destroy(tech_pvt->resampler); tech_pvt->resampler = nullptr; } + if (tech_pvt->bidirectional_audio_resampler) { + speex_resampler_destroy(tech_pvt->bidirectional_audio_resampler); + tech_pvt->bidirectional_audio_resampler = nullptr; + } if (tech_pvt->mutex) { switch_mutex_destroy(tech_pvt->mutex); tech_pvt->mutex = nullptr; } + if (tech_pvt->circularBuffer) { + CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->circularBuffer; + delete cBuffer; + tech_pvt->circularBuffer = nullptr; + } } void lws_logger(int level, const char *line) { @@ -392,18 +470,22 @@ extern "C" { } switch_status_t fork_session_init(switch_core_session_t *session, - responseHandler_t responseHandler, - uint32_t samples_per_second, - char *host, - unsigned int port, - char *path, - int sampling, - int sslFlags, - int channels, - char *bugname, - char* metadata, - void **ppUserData) - { + responseHandler_t responseHandler, + uint32_t samples_per_second, + char *host, + unsigned int port, + char *path, + int sampling, + int sslFlags, + int channels, + char *bugname, + char* metadata, + int bidirectional_audio_enable, + int bidirectional_audio_stream, + int bidirectional_audio_sample_rate, + void **ppUserData + ) + { int err; // allocate per-session data structure @@ -412,8 +494,9 @@ extern "C" { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "error allocating memory!\n"); return SWITCH_STATUS_FALSE; } + if 
(SWITCH_STATUS_SUCCESS != fork_data_init(tech_pvt, session, host, port, path, sslFlags, samples_per_second, sampling, channels, - bugname, metadata, responseHandler)) { + bugname, metadata, bidirectional_audio_enable, bidirectional_audio_stream, bidirectional_audio_sample_rate, responseHandler)) { destroy_tech_pvt(tech_pvt); return SWITCH_STATUS_FALSE; } @@ -619,5 +702,53 @@ extern "C" { return SWITCH_TRUE; } + switch_bool_t dub_speech_frame(switch_media_bug_t *bug, private_t* tech_pvt) { + CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->circularBuffer; + if (switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS) { + switch_frame_t* rframe = switch_core_media_bug_get_write_replace_frame(bug); + int16_t *fp = reinterpret_cast(rframe->data); + + rframe->channels = 1; + rframe->datalen = rframe->samples * sizeof(int16_t); + + int16_t data[SWITCH_RECOMMENDED_BUFFER_SIZE]; + memset(data, 0, sizeof(data)); + + int samplesToCopy = std::min(static_cast(cBuffer->size()), static_cast(rframe->samples)); + + std::copy_n(cBuffer->begin(), samplesToCopy, data); + cBuffer->erase(cBuffer->begin(), cBuffer->begin() + samplesToCopy); + + if (samplesToCopy > 0) { + vector_add(fp, data, rframe->samples); + } + vector_normalize(fp, rframe->samples); + + switch_core_media_bug_set_write_replace_frame(bug, rframe); + switch_mutex_unlock(tech_pvt->mutex); + } + return SWITCH_TRUE; + } + + switch_status_t fork_session_stop_play(switch_core_session_t *session, char *bugname) { + switch_channel_t *channel = switch_core_session_get_channel(session); + switch_media_bug_t *bug = (switch_media_bug_t*) switch_channel_get_private(channel, bugname); + if (!bug) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "fork_session_stop_play failed because no bug\n"); + return SWITCH_STATUS_FALSE; + } + private_t* tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug); + + CircularBuffer_t *cBuffer = (CircularBuffer_t *) tech_pvt->circularBuffer; + + 
if (switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS) { + if (cBuffer != nullptr) { + cBuffer->clear(); + } + switch_mutex_unlock(tech_pvt->mutex); + } + return SWITCH_STATUS_SUCCESS; + } + } diff --git a/mod_audio_fork/lws_glue.h b/mod_audio_fork/lws_glue.h index b95897a..e4dfb26 100644 --- a/mod_audio_fork/lws_glue.h +++ b/mod_audio_fork/lws_glue.h @@ -8,13 +8,16 @@ int parse_ws_uri(switch_channel_t *channel, const char* szServerUri, char* host, switch_status_t fork_init(); switch_status_t fork_cleanup(); switch_status_t fork_session_init(switch_core_session_t *session, responseHandler_t responseHandler, - uint32_t samples_per_second, char *host, unsigned int port, char* path, int sampling, int sslFlags, int channels, - char *bugname, char* metadata, void **ppUserData); + uint32_t samples_per_second, char *host, unsigned int port, char* path, int sampling, int sslFlags, int channels, + char *bugname, char* metadata, int bidirectional_audio_enable, + int bidirectional_audio_stream, int bidirectional_audio_sample_rate, void **ppUserData); switch_status_t fork_session_cleanup(switch_core_session_t *session, char *bugname, char* text, int channelIsClosing); +switch_status_t fork_session_stop_play(switch_core_session_t *session, char *bugname); switch_status_t fork_session_pauseresume(switch_core_session_t *session, char *bugname, int pause); switch_status_t fork_session_graceful_shutdown(switch_core_session_t *session, char *bugname); switch_status_t fork_session_send_text(switch_core_session_t *session, char *bugname, char* text); switch_bool_t fork_frame(switch_core_session_t *session, switch_media_bug_t *bug); +switch_bool_t dub_speech_frame(switch_media_bug_t *bug, private_t * tech_pvt); switch_status_t fork_service_threads(); switch_status_t fork_session_connect(void **ppUserData); #endif diff --git a/mod_audio_fork/mod_audio_fork.c b/mod_audio_fork/mod_audio_fork.c index 326ec16..e0a5017 100644 --- a/mod_audio_fork/mod_audio_fork.c +++ 
b/mod_audio_fork/mod_audio_fork.c @@ -27,40 +27,48 @@ static void responseHandler(switch_core_session_t* session, const char * eventNa static switch_bool_t capture_callback(switch_media_bug_t *bug, void *user_data, switch_abc_type_t type) { + switch_bool_t ret = SWITCH_TRUE; switch_core_session_t *session = switch_core_media_bug_get_session(bug); + private_t* tech_pvt = (private_t *) switch_core_media_bug_get_user_data(bug); switch (type) { case SWITCH_ABC_TYPE_INIT: break; case SWITCH_ABC_TYPE_CLOSE: { - private_t* tech_pvt = (private_t *) switch_core_media_bug_get_user_data(bug); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "Got SWITCH_ABC_TYPE_CLOSE for bug %s\n", tech_pvt->bugname); fork_session_cleanup(session, tech_pvt->bugname, NULL, 1); } break; case SWITCH_ABC_TYPE_READ: - return fork_frame(session, bug); + ret = fork_frame(session, bug); break; + case SWITCH_ABC_TYPE_WRITE_REPLACE: + ret = dub_speech_frame(bug, tech_pvt); + break; + case SWITCH_ABC_TYPE_WRITE: default: break; } - return SWITCH_TRUE; + return ret; } static switch_status_t start_capture(switch_core_session_t *session, - switch_media_bug_flag_t flags, - char* host, - unsigned int port, - char* path, - int sampling, - int sslFlags, - char* bugname, - char* metadata) + switch_media_bug_flag_t flags, + char* host, + unsigned int port, + char* path, + int sampling, + int sslFlags, + int bidirectional_audio_enable, + int bidirectional_audio_stream, + int bidirectional_audio_sample_rate, + char* bugname, + char* metadata) { switch_channel_t *channel = switch_core_session_get_channel(session); switch_media_bug_t *bug; @@ -71,8 +79,8 @@ static switch_status_t start_capture(switch_core_session_t *session, int channels = (flags & SMBF_STEREO) ? 2 : 1; switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, - "mod_audio_fork (%s): streaming %d sampling to %s path %s port %d tls: %s.\n", - bugname, sampling, host, path, port, sslFlags ? 
"yes" : "no"); + "mod_audio_fork (%s): streaming %d sampling to %s path %s port %d tls: %s bidirectional_audio_sample_rate: %d.\n", + bugname, sampling, host, path, port, sslFlags ? "yes" : "no", bidirectional_audio_sample_rate); if (switch_channel_get_private(channel, bugname)) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "mod_audio_fork: bug %s already attached!\n", bugname); @@ -88,10 +96,12 @@ static switch_status_t start_capture(switch_core_session_t *session, switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "calling fork_session_init.\n"); if (SWITCH_STATUS_FALSE == fork_session_init(session, responseHandler, read_codec->implementation->actual_samples_per_second, - host, port, path, sampling, sslFlags, channels, bugname, metadata, &pUserData)) { + host, port, path, sampling, sslFlags, channels, bugname, metadata, bidirectional_audio_enable, bidirectional_audio_stream, + bidirectional_audio_sample_rate, &pUserData)) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error initializing mod_audio_fork session.\n"); return SWITCH_STATUS_FALSE; } + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "adding bug %s.\n", bugname); if ((status = switch_core_media_bug_add(session, bugname, NULL, capture_callback, pUserData, 0, flags, &bug)) != SWITCH_STATUS_SUCCESS) { return status; @@ -133,6 +143,16 @@ static switch_status_t do_pauseresume(switch_core_session_t *session, char* bugn return status; } +static switch_status_t stop_play(switch_core_session_t *session, char* bugname) +{ + switch_status_t status = SWITCH_STATUS_SUCCESS; + + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "mod_audio_fork stop_play\n"); + status = fork_session_stop_play(session, bugname); + + return status; +} + static switch_status_t do_graceful_shutdown(switch_core_session_t *session, char* bugname) { switch_status_t status = SWITCH_STATUS_SUCCESS; @@ -159,10 +179,10 
@@ static switch_status_t send_text(switch_core_session_t *session, char* bugname, return status; } -#define FORK_API_SYNTAX " [start | stop | send_text | pause | resume | graceful-shutdown ] [wss-url | path] [mono | mixed | stereo] [8000 | 16000 | 24000 | 32000 | 64000] [bugname] [metadata]" +#define FORK_API_SYNTAX " [start | stop | send_text | pause | resume | graceful-shutdown | stop_play ] [wss-url | path] [mono | mixed | stereo] [8000 | 16000 | 24000 | 32000 | 64000] [bugname] [metadata] [bidirectionalAudio_enabled] [bidirectionalAudio_stream_enabled] [bidirectionalAudio_stream_samplerate]" SWITCH_STANDARD_API(fork_function) { - char *mycmd = NULL, *argv[7] = { 0 }; + char *mycmd = NULL, *argv[10] = { 0 }; int argc = 0; switch_status_t status = SWITCH_STATUS_FALSE; char *bugname = MY_BUG_NAME; @@ -196,6 +216,9 @@ SWITCH_STANDARD_API(fork_function) } status = do_stop(lsession, bugname, text); } + else if (!strcasecmp(argv[1], "stop_play")) { + status = stop_play(lsession, bugname); + } else if (!strcasecmp(argv[1], "pause")) { if (argc > 2) bugname = argv[2]; status = do_pauseresume(lsession, bugname, 1); @@ -230,10 +253,31 @@ SWITCH_STANDARD_API(fork_function) char host[MAX_WS_URL_LEN], path[MAX_PATH_LEN]; unsigned int port; int sslFlags; + int sampling = 8000; - switch_media_bug_flag_t flags = SMBF_READ_STREAM ; + switch_media_bug_flag_t flags = SMBF_READ_STREAM; char *metadata = NULL; - if( argc > 6) { + int bidirectional_audio_enable = 1; + int bidirectional_audio_stream = 0; + int bidirectional_audio_sample_rate = 0; + // Expecting that bidirectional audio params is always received together with bugname and metadata even they are empty string + if (argc > 9) { + if (argv[5][0] != '\0') { + bugname = argv[5]; + } + if (argv[6][0] != '\0') { + metadata = argv[6]; + } + bidirectional_audio_enable = !strcmp(argv[7], "true") ? 1 : 0; + bidirectional_audio_stream = !strcmp(argv[8], "true") ? 
#define GRANULAR_VOLUME_MAX (50)
#define SMAX 32767
#define SMIN (-32768)

#if defined(USE_AVX2)
#include <immintrin.h>
#pragma message("Using AVX2 SIMD.")
#elif defined(USE_SSE2)
#include <emmintrin.h>
#pragma message("Using SSE2 SIMD.")
#endif

/* Linear gain for +1 dB .. +50 dB (10^(dB/20)); index is dB - 1. */
static const float kGainUp[GRANULAR_VOLUME_MAX] = {
  1.122018, 1.258925, 1.412538, 1.584893, 1.778279, 1.995262, 2.238721, 2.511887, 2.818383, 3.162278,
  3.548134, 3.981072, 4.466835, 5.011872, 5.623413, 6.309574, 7.079458, 7.943282, 8.912509, 10.000000,
  11.220183, 12.589254, 14.125375, 15.848933, 17.782795, 19.952621, 22.387213, 25.118862, 28.183832, 31.622776,
  35.481335, 39.810719, 44.668358, 50.118729, 56.234131, 63.095726, 70.794586, 79.432816, 89.125107, 100.000000,
  112.201836, 125.892517, 141.253784, 158.489334, 177.827942, 199.526215, 223.872070, 251.188705, 281.838318, 316.227753
};

/* Linear gain for -1 dB .. -50 dB; index is |dB| - 1. */
static const float kGainDown[GRANULAR_VOLUME_MAX] = {
  0.891251, 0.794328, 0.707946, 0.630957, 0.562341, 0.501187, 0.446684, 0.398107, 0.354813, 0.316228,
  0.281838, 0.251189, 0.223872, 0.199526, 0.177828, 0.158489, 0.141254, 0.125893, 0.112202, 0.100000,
  0.089125, 0.079433, 0.070795, 0.063096, 0.056234, 0.050119, 0.044668, 0.039811, 0.035481, 0.031623,
  0.028184, 0.025119, 0.022387, 0.019953, 0.017783, 0.015849, 0.014125, 0.012589, 0.011220, 0.010000,
  0.008913, 0.007943, 0.007079, 0.006310, 0.005623, 0.005012, 0.004467, 0.003981, 0.003548, 0.000000 // NOTE mapped -50 dB ratio to total silence instead of 0.003162
};

/* Clamp a widened sample to the signed 16-bit range. */
static inline int16_t clamp_to_int16(int32_t v) {
  if (v > SMAX) return (int16_t) SMAX;
  if (v < SMIN) return (int16_t) SMIN;
  return (int16_t) v;
}

#ifdef __cplusplus
extern "C" {
#endif

/**
 * Mix b into a, sample by sample, saturating at the 16-bit limits.
 *
 * Saturation has to happen during the add: once a sum has been stored back
 * into an int16_t the overflow has already wrapped and cannot be detected.
 *
 * @param a    destination/accumulator, len samples
 * @param b    samples to add, len samples
 * @param len  number of samples
 */
void vector_add(int16_t* a, int16_t* b, size_t len) {
  size_t i = 0;
#if defined(USE_AVX2)
  for (; i + 15 < len; i += 16) {
    __m256i va = _mm256_loadu_si256((const __m256i*)(a + i));
    __m256i vb = _mm256_loadu_si256((const __m256i*)(b + i));
    _mm256_storeu_si256((__m256i*)(a + i), _mm256_adds_epi16(va, vb));
  }
#elif defined(USE_SSE2)
  for (; i + 7 < len; i += 8) {
    __m128i va = _mm_loadu_si128((const __m128i*)(a + i));
    __m128i vb = _mm_loadu_si128((const __m128i*)(b + i));
    _mm_storeu_si128((__m128i*)(a + i), _mm_adds_epi16(va, vb));
  }
#endif
  /* Scalar path: whole buffer in the generic build, tail in the SIMD builds. */
  for (; i < len; ++i) {
    a[i] = clamp_to_int16((int32_t) a[i] + (int32_t) b[i]);
  }
}

/**
 * Kept for API compatibility; intentionally does nothing.
 *
 * Every int16_t already lies within [SMIN, SMAX], so clamping samples of
 * that type in place can never change a value. Overflow is handled where it
 * occurs, in vector_add().
 */
void vector_normalize(int16_t* a, size_t len) {
  (void) a;
  (void) len;
}

/**
 * Apply a gain of vol dB to 16-bit samples in place.
 *
 * @param data     samples to scale
 * @param samples  number of samples
 * @param vol      gain in dB; 0 is a no-op, values outside
 *                 [-GRANULAR_VOLUME_MAX, GRANULAR_VOLUME_MAX] are clamped,
 *                 and -GRANULAR_VOLUME_MAX produces silence.
 *
 * Results are truncated toward zero and saturated to the 16-bit range in
 * every build flavour.
 */
void vector_change_sln_volume_granular(int16_t* data, uint32_t samples, int32_t vol) {
  float gain;
  uint32_t idx;
  uint32_t x = 0;

  if (vol == 0) return;

  /* Clamp before deriving the table index so it can never run off the table. */
  if (vol > GRANULAR_VOLUME_MAX) vol = GRANULAR_VOLUME_MAX;
  if (vol < -GRANULAR_VOLUME_MAX) vol = -GRANULAR_VOLUME_MAX;

  idx = (uint32_t) abs(vol) - 1;
  assert(idx < (uint32_t) GRANULAR_VOLUME_MAX);
  gain = (vol > 0 ? kGainUp : kGainDown)[idx];

  if (gain == 0.0f) {
    /* The -50 dB entry is stored as 0 to mean total silence. */
    for (; x < samples; x++) {
      data[x] = 0;
    }
    return;
  }

#if defined(USE_AVX2)
  {
    const __m256 gain_v = _mm256_set1_ps(gain);
    const __m256i min_val = _mm256_set1_epi32(SMIN);
    const __m256i max_val = _mm256_set1_epi32(SMAX);
    const uint32_t vec_end = samples - (samples % 8);

    for (; x < vec_end; x += 8) {
      __m128i in16 = _mm_loadu_si128((const __m128i*)(data + x));
      __m256 scaled = _mm256_mul_ps(_mm256_cvtepi32_ps(_mm256_cvtepi16_epi32(in16)), gain_v);
      /* Truncating conversion, so vector lanes match the scalar tail. */
      __m256i out32 = _mm256_cvttps_epi32(scaled);

      out32 = _mm256_min_epi32(out32, max_val);
      out32 = _mm256_max_epi32(out32, min_val);

      _mm_storeu_si128((__m128i*)(data + x),
        _mm_packs_epi32(_mm256_castsi256_si128(out32), _mm256_extractf128_si256(out32, 1)));
    }
  }
#endif

  /* Scalar path: whole buffer in the generic and SSE2 builds, tail in AVX2. */
  for (; x < samples; x++) {
    data[x] = clamp_to_int16((int32_t) (data[x] * gain));
  }
}

#ifdef __cplusplus
}
#endif