diff --git a/mod_dub/Makefile.am b/mod_dub/Makefile.am index 3746c29..21dd41d 100644 --- a/mod_dub/Makefile.am +++ b/mod_dub/Makefile.am @@ -2,9 +2,17 @@ include $(top_srcdir)/build/modmake.rulesam MODNAME=mod_dub mod_LTLIBRARIES = mod_dub.la -mod_dub_la_SOURCES = file_loader.cpp audio_downloader.cpp mod_dub.c dub_glue.cpp +mod_dub_la_SOURCES = ap_file.cpp ap_http.cpp track.cpp dub_glue.cpp tts_vendor_parser.cpp mod_dub.c vector_math.cpp mpg_decode.cpp mod_dub_la_CFLAGS = $(AM_CFLAGS) mod_dub_la_CXXFLAGS = $(AM_CXXFLAGS) -std=c++17 +if USE_AVX2 +mod_dub_la_CXXFLAGS += -mavx2 -DUSE_AVX2 +else +if USE_SSE2 +mod_dub_la_CXXFLAGS += -msse2 -DUSE_SSE2 +endif +endif + mod_dub_la_LIBADD = $(switch_builddir)/libfreeswitch.la mod_dub_la_LDFLAGS = -avoid-version -module -no-undefined -shared `pkg-config --libs boost` -lstdc++ -lmpg123 diff --git a/mod_dub/ap.h b/mod_dub/ap.h new file mode 100644 index 0000000..48182bb --- /dev/null +++ b/mod_dub/ap.h @@ -0,0 +1,41 @@ +#ifndef __AP_H__ +#define __AP_H__ + +#include + +#include "common.h" + +class AudioProducer { +public: + AudioProducer( + std::mutex& mutex, + CircularBuffer_t& circularBuffer, + int sampleRate + ) : _mutex(mutex), _buffer(circularBuffer), _sampleRate(sampleRate), _notified(false), _loop(false), _gain(0) {} + virtual ~AudioProducer() {} + + virtual void notifyDone(bool error, const std::string& errorMsg) { + if (!_notified) { + _notified = true; + if (_callback) _callback(error, errorMsg); + } + } + virtual void start(std::function callback) = 0; + virtual void stop() = 0; + + bool isLoopedAudio() const { return _loop; } + +protected: + std::mutex& _mutex; + CircularBuffer_t& _buffer; + int _sampleRate; + int _gain; + bool _loop; + std::function _callback; + bool _notified; +}; + + + + +#endif diff --git a/mod_dub/ap_file.cpp b/mod_dub/ap_file.cpp new file mode 100644 index 0000000..f508db3 --- /dev/null +++ b/mod_dub/ap_file.cpp @@ -0,0 +1,196 @@ +#include "switch.h" +#include "ap_file.h" +#include "mpg_decode.h" + +#define INIT_BUFFER_SIZE (80000) +#define BUFFER_GROW_SIZE (80000) +#define BUFFER_THROTTLE_LOW (40000) +#define BUFFER_THROTTLE_HIGH (160000) + +bool AudioProducerFile::initialized = false; +boost::asio::io_service AudioProducerFile::io_service; +std::thread AudioProducerFile::worker_thread; + +void AudioProducerFile::threadFunc() { + io_service.reset() ; + boost::asio::io_service::work work(io_service); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "file_loader threadFunc - starting\n"); + + for(;;) { + + try { + io_service.run() ; + break ; + } + catch( std::exception& e) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "file_loader threadFunc - Error: %s\n", e.what()); + } + } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "file_loader threadFunc - ending\n"); +} + +void AudioProducerFile::_init() { + if (!initialized) { + initialized = true; + if (mpg123_init() != MPG123_OK) { + throw std::runtime_error("AudioProducerFile::AudioProducerFile: failed to initiate MPG123"); + return ; + } + + /* start worker thread */ + std::thread t(threadFunc) ; + worker_thread.swap( t ) ; + } +} + +void AudioProducerFile::_deinit() { + if (initialized) { + initialized = false; + io_service.stop(); + if (worker_thread.joinable()) { + worker_thread.join(); + } + mpg123_exit(); + } +} +AudioProducerFile::AudioProducerFile( + std::mutex& mutex, + CircularBuffer_t& circularBuffer, + int sampleRate +) : AudioProducer(mutex, circularBuffer, sampleRate), _timer(io_service), _mh(nullptr) { + + AudioProducerFile::_init(); +} + +AudioProducerFile::~AudioProducerFile() { + reset(); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "AudioProducerFile::~AudioProducerFile %p\n", (void *)this); +} + +void AudioProducerFile::queueFileAudio(const std::string& path, int gain, bool loop) { + _path = path; + _gain = gain; + _loop = loop; + + /* we only handle mp3 or r8 files atm */ + size_t pos = path.find_last_of('.'); + if (pos == std::string::npos) { + throw std::runtime_error("file " + path + " has no extension"); + } + auto filetype = path.substr(pos + 1); + if (0 == filetype.compare("mp3")) _type = FILE_TYPE_MP3; + else if (0 == filetype.compare("r8")) _type = FILE_TYPE_R8; + else throw std::runtime_error("file " + path + " has unsupported extension " + filetype); +} + +void AudioProducerFile::start(std::function callback) { + int mhError = 0; + + _callback = callback; + + /* allocate handle for mpeg decoding */ + _mh = mpg123_new("auto", &mhError); + if (!_mh) { + const char *mhErr = mpg123_plain_strerror(mhError); + throw std::runtime_error("Error allocating mpg123 handle! " + std::string(mhErr)); + } + + if (mpg123_open_feed(_mh) != MPG123_OK) throw std::runtime_error("Error mpg123_open_feed!"); + if (mpg123_format_all(_mh) != MPG123_OK) throw std::runtime_error("Error mpg123_format_all!"); + if (mpg123_param(_mh, MPG123_FORCE_RATE, _sampleRate, 0) != MPG123_OK) throw std::runtime_error("Error forcing resample to 8k!"); + if (mpg123_param(_mh, MPG123_FLAGS, MPG123_MONO_MIX, 0) != MPG123_OK) throw std::runtime_error("Error forcing single channel!"); + + _fp = fopen(_path.c_str(), "rb"); + if (!_fp) throw std::runtime_error("Error opening file " + _path); + + _status = Status_t::STATUS_AWAITING_RESTART; + + /* do the initial read in the worker thread so we don't block here */ + _timer.expires_from_now(boost::posix_time::millisec(1)); + _timer.async_wait(boost::bind(&AudioProducerFile::read_cb, this, boost::placeholders::_1)); +} + +void AudioProducerFile::read_cb(const boost::system::error_code& error) { + if (_status == Status_t::STATUS_STOPPED) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "read_cb: session gone\n"); + return; + } + if (_status == Status_t::STATUS_AWAITING_RESTART) { + _status = Status_t::STATUS_IN_PROGRESS; + } + if (!error) { + size_t size = _buffer.size(); + if (size < BUFFER_THROTTLE_LOW) { + std::vector pcm_data; + int8_t buf[INIT_BUFFER_SIZE]; + + size_t bytesRead = ::fread(buf, sizeof(int8_t), INIT_BUFFER_SIZE, _fp); + if (bytesRead <= 0) { + if (::feof(_fp)) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "read_cb: %p eof\n", (void *) this); + else if (::ferror(_fp)) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "read_cb: %p error reading file\n", (void *) this); + else switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "read_cb: %p unknown error reading file\n", (void *) this); + _status = Status_t::STATUS_COMPLETE; + return; + } + + if (_type == FileType_t::FILE_TYPE_MP3) pcm_data = convert_mp3_to_linear(_mh, _gain, buf, bytesRead); + else pcm_data = std::vector(reinterpret_cast(buf), reinterpret_cast(buf) + bytesRead / 2); + + { + std::lock_guard lock(_mutex); + + // Resize the buffer if necessary + size_t capacity = _buffer.capacity(); + if (capacity - size < pcm_data.size()) { + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "write_cb %p growing buffer, size now %ld\n", (void *) this, size); + _buffer.set_capacity(size + std::max(pcm_data.size(), (size_t)BUFFER_GROW_SIZE)); + } + + /* Push the data into the buffer */ + _buffer.insert(_buffer.end(), pcm_data.data(), pcm_data.data() + pcm_data.size()); + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "read_cb: %p wrote data, buffer size is now %ld\n", (void *) this, _buffer.size()); + } + + if (bytesRead < INIT_BUFFER_SIZE) { + _status = Status_t::STATUS_COMPLETE; + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "read_cb: %u reached end of file, status is %s\n", (void *) this, status2String(_status)); + } + } + + if (_status == Status_t::STATUS_COMPLETE) { + cleanup(Status_t::STATUS_COMPLETE); + } + else { + // read more in 2 seconds + _timer.expires_from_now(boost::posix_time::millisec(2000)); + _timer.async_wait(boost::bind(&AudioProducerFile::read_cb, this, boost::placeholders::_1)); + } + } else { + cleanup(Status_t::STATUS_FAILED, error.message()); + } +} + +void AudioProducerFile::stop() { + cleanup(Status_t::STATUS_STOPPED, ""); +} + +void AudioProducerFile::reset() { + if (_fp) { + fclose(_fp); + _fp = nullptr; + } + if (_mh) { + mpg123_close(_mh); + mpg123_delete(_mh); + _mh = nullptr; + } + _timer.cancel(); + _status = Status_t::STATUS_NONE; +} + +void AudioProducerFile::cleanup(Status_t status, std::string errMsg) { + reset(); + _status = status; + notifyDone(status != Status_t::STATUS_COMPLETE, errMsg); +} \ No newline at end of file diff --git a/mod_dub/ap_file.h b/mod_dub/ap_file.h new file mode 100644 index 0000000..872eafd --- /dev/null +++ b/mod_dub/ap_file.h @@ -0,0 +1,89 @@ +#ifndef __AP_FILE_H__ +#define __AP_FILE_H__ + +#include +#include +#include + +#include + +#include "ap.h" + +class AudioProducerFile : public AudioProducer { +public: + + typedef enum + { + STATUS_NONE = 0, + STATUS_FAILED, + STATUS_IN_PROGRESS, + STATUS_PAUSED, + STATUS_COMPLETE, + STATUS_AWAITING_RESTART, + STATUS_STOPPED + } Status_t; + + typedef enum { + FILE_TYPE_MP3 = 0, + FILE_TYPE_R8 + } FileType_t; + + const char* status2String(Status_t status) + { + static const char* statusStrings[] = { + "STATUS_NONE", + "STATUS_FAILED", + "STATUS_IN_PROGRESS", + "STATUS_PAUSED", + "STATUS_COMPLETE", + "STATUS_AWAITING_RESTART", + "STATUS_STOPPED" + }; + + if (status >= 0 && status < sizeof(statusStrings) / sizeof(statusStrings[0])) + { + return statusStrings[status]; + } + else + { + return "UNKNOWN_STATUS"; + } + } + + AudioProducerFile( + std::mutex& mutex, + CircularBuffer_t& circularBuffer, + int sampleRate + ); + virtual ~AudioProducerFile(); + virtual void start(std::function callback); + virtual void stop(); + void cleanup(Status_t status, std::string errMsg = ""); + void reset(); + + void queueFileAudio(const std::string& path, int gain = 0, bool loop = false); + + void read_cb(const boost::system::error_code& error); + + static bool initialized; + static std::thread worker_thread; + static boost::asio::io_service io_service; + static void threadFunc(); + +private: + + static void _init(); + static void _deinit(); + + void stop_file_load(); + + std::string _path; + Status_t _status; + FileType_t _type; + mpg123_handle *_mh; + boost::asio::deadline_timer _timer; + FILE* _fp; + +}; + +#endif \ No newline at end of file diff --git a/mod_dub/ap_http.cpp b/mod_dub/ap_http.cpp new file mode 100644 index 0000000..357aadf --- /dev/null +++ b/mod_dub/ap_http.cpp @@ -0,0 +1,660 @@ +#include +#include +#include "switch.h" +#include "ap_http.h" +#include "mpg_decode.h" + +#include +#include +#include +#include +#include +#include + +#define BUFFER_GROW_SIZE (80000) +#define BUFFER_THROTTLE_LOW (40000) +#define BUFFER_THROTTLE_HIGH (160000) + +bool AudioProducerHttp::initialized = false; +boost::asio::io_service AudioProducerHttp::io_service; +std::thread AudioProducerHttp::worker_thread; +GlobalInfo_t AudioProducerHttp::global; +std::map AudioProducerHttp::socket_map; +boost::asio::deadline_timer AudioProducerHttp::multi_timer(AudioProducerHttp::io_service); + +void AudioProducerHttp::threadFunc() { + /* to make sure the event loop doesn't terminate when there is no work to do */ + io_service.reset() ; + boost::asio::io_service::work work(io_service); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "ap_http threadFunc - starting\n"); + + for(;;) { + + try { + io_service.run() ; + break ; + } + catch( std::exception& e) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "ap_http threadFunc - Error: %s\n", e.what()); + } + } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "ap_http threadFunc - ending\n"); +} + +void AudioProducerHttp::_init() { + if (!initialized) { + initialized = true; + if (mpg123_init() != MPG123_OK) { + throw std::runtime_error("AudioProducerFile::AudioProducerFile: failed to initiate MPG123"); + return ; + } + + memset(&global, 0, sizeof(GlobalInfo_t)); + global.multi = curl_multi_init(); + if (!global.multi) { + throw std::runtime_error("AudioProducerHttp::_init: failed to initiate CURL multi"); + return ; + } + + curl_multi_setopt(global.multi, CURLMOPT_SOCKETFUNCTION, &AudioProducerHttp::sock_cb); + curl_multi_setopt(global.multi, CURLMOPT_SOCKETDATA, &global); + curl_multi_setopt(global.multi, CURLMOPT_TIMERFUNCTION, &AudioProducerHttp::multi_timer_cb); + curl_multi_setopt(global.multi, CURLMOPT_TIMERDATA, &global); + curl_multi_setopt(global.multi, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX); + + /* start worker thread */ + std::thread t(threadFunc) ; + worker_thread.swap( t ) ; + } +} + +void AudioProducerHttp::_deinit() { + if (initialized) { + initialized = false; + io_service.stop(); + if (worker_thread.joinable()) { + worker_thread.join(); + } + /* cleanup curl multi handle*/ + curl_multi_cleanup(global.multi); + + mpg123_exit(); + } +} + +AudioProducerHttp::AudioProducerHttp( + std::mutex& mutex, + CircularBuffer_t& circularBuffer, + int sampleRate +) : AudioProducer(mutex, circularBuffer, sampleRate), _status(Status_t::STATUS_NONE), _mh(nullptr), _easy(nullptr), + _error{0}, _response_code(0), _timer(io_service) { + + AudioProducerHttp::_init(); +} + +AudioProducerHttp::~AudioProducerHttp() { + reset(); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "AudioProducerFile::~AudioProducerFile %p\n", (void *)this); +} + +void AudioProducerHttp::start(std::function callback) { + int mhError = 0; + + _callback = callback; + memset(_error, 0, sizeof(_error)); + + /* allocate handle for mpeg decoding */ + _mh = mpg123_new("auto", &mhError); + if (!_mh) { + const char *mhErr = mpg123_plain_strerror(mhError); + throw std::runtime_error("Error allocating mpg123 handle! " + std::string(mhErr)); + } + + if (mpg123_open_feed(_mh) != MPG123_OK) throw std::runtime_error("Error mpg123_open_feed!\n"); + if (mpg123_format_all(_mh) != MPG123_OK) throw std::runtime_error("Error mpg123_format_all!\n"); + if (mpg123_param(_mh, MPG123_FORCE_RATE, _sampleRate, 0) != MPG123_OK) throw std::runtime_error("Error forcing resample to 8k!\n"); + if (mpg123_param(_mh, MPG123_FLAGS, MPG123_MONO_MIX, 0) != MPG123_OK) throw std::runtime_error("Error forcing single channel!\n"); + + _easy = createEasyHandle(); + if (!_easy) throw std::runtime_error("Error creating easy handle!\n"); + + curl_easy_setopt(_easy, CURLOPT_WRITEFUNCTION, &AudioProducerHttp::static_write_cb); + curl_easy_setopt(_easy, CURLOPT_URL, _url.c_str()); + curl_easy_setopt(_easy, CURLOPT_HTTPGET, _method == HttpMethod_t::HTTP_METHOD_GET ? 1L : 0L); + curl_easy_setopt(_easy, CURLOPT_WRITEDATA, this); + curl_easy_setopt(_easy, CURLOPT_ERRORBUFFER, _error); + curl_easy_setopt(_easy, CURLOPT_PRIVATE, this); + curl_easy_setopt(_easy, CURLOPT_VERBOSE, 0L); + curl_easy_setopt(_easy, CURLOPT_NOPROGRESS, 1L); + curl_easy_setopt(_easy, CURLOPT_HEADERFUNCTION, &AudioProducerHttp::static_header_callback); + curl_easy_setopt(_easy, CURLOPT_HEADERDATA, this); + + /* call this function to get a socket */ + curl_easy_setopt(_easy, CURLOPT_OPENSOCKETFUNCTION, &AudioProducerHttp::open_socket); + + /* call this function to close a socket */ + curl_easy_setopt(_easy, CURLOPT_CLOSESOCKETFUNCTION, &AudioProducerHttp::close_socket); + + curl_easy_setopt(_easy, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2_0); + + /* keep the speed down so we don't have to buffer large amounts*/ + curl_easy_setopt(_easy, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t)31415); + /*Add request body*/ + if (!_body.empty()) curl_easy_setopt(_easy, CURLOPT_POSTFIELDS, _body.c_str()); + + /*Add request headers*/ + struct curl_slist *hdr_list = nullptr; + for(const auto& header : _headers) { + hdr_list = curl_slist_append(hdr_list, header.c_str()); + } + if (hdr_list) curl_easy_setopt(_easy, CURLOPT_HTTPHEADER, hdr_list); + + _status = Status_t::STATUS_AWAITING_RESTART; + + _timer.expires_from_now(boost::posix_time::millisec(1)); + _timer.async_wait(boost::bind(&AudioProducerHttp::addCurlHandle, this, boost::placeholders::_1)); +} +void AudioProducerHttp::queueHttpGetAudio(const std::string& url, int gain, bool loop) { + _method = HttpMethod_t::HTTP_METHOD_GET; + _url = url; + _gain = gain; + _loop = loop; +} +void AudioProducerHttp::queueHttpPostAudio(const std::string& url, int gain, bool loop) { + _method = HttpMethod_t::HTTP_METHOD_POST; + _url = url; + _gain = gain; + _loop = loop; +} +void AudioProducerHttp::queueHttpPostAudio(const std::string& url, const std::string& body, std::vector& headers, int gain, bool loop) { + _method = HttpMethod_t::HTTP_METHOD_POST; + _url = url; + _body = body; + _headers = headers; + _gain = gain; + _loop = loop; +} + +void AudioProducerHttp::addCurlHandle(const boost::system::error_code& error) { + if (_status == Status_t::STATUS_AWAITING_RESTART) { + auto rc = curl_multi_add_handle(global.multi, _easy); + if (mcode_test("new_conn: curl_multi_add_handle", rc) < 0) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "AudioProducerHttp::addCurlHandle: Error adding easy handle to multi handle\n"); + } + else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "AudioProducerHttp::addCurlHandle retrieving from %s\n", _url.c_str()); + } + } +} + +CURL* AudioProducerHttp::createEasyHandle(void) { + CURL* easy = curl_easy_init(); + if(!easy) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "curl_easy_init() failed!\n"); + return nullptr ; + } + + curl_easy_setopt(easy, CURLOPT_FOLLOWLOCATION, 1L); + curl_easy_setopt(easy, CURLOPT_USERAGENT, "jambonz/0.8.5"); + + // set connect timeout to 3 seconds and no total timeout as files could be large + curl_easy_setopt(easy, CURLOPT_CONNECTTIMEOUT_MS, 3000L); + curl_easy_setopt(easy, CURLOPT_TIMEOUT, 0L); // no timeout + + return easy ; +} + +size_t AudioProducerHttp::static_write_cb(char* ptr, size_t size, size_t nmemb, void* userdata) { + return static_cast(userdata)->write_cb(ptr, size, nmemb); +} + +size_t AudioProducerHttp::write_cb(void *ptr, size_t size, size_t nmemb) { + int8_t *data = (int8_t *) ptr; + size_t bytes_received = size * nmemb; + std::vector pcm_data; + if (_status == Status_t::STATUS_STOPPING || _status == Status_t::STATUS_STOPPED) { + _timer.cancel(); + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, + // "AudioProducerHttp::write_cb: aborting transfer, status %s, mutex %p, buffer %p\n", status2String(_status)); + /* this will abort the transfer */ + return 0; + } + if (_response_code > 0 && _response_code != 200) { + std::string body((char *) ptr, bytes_received); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "AudioProducerHttp::write_cb: received body %s\n", body.c_str()); + _err_msg = body; + _status = Status_t::STATUS_FAILED; + return 0; + } + + /* throttle after reaching high water mark */ + size_t bufSize = _buffer.size(); + if (bufSize > BUFFER_THROTTLE_HIGH) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "AudioProducerHttp::write_cb: throttling download, buffer size is %ld\n", _buffer.size()); + + // check back in 2 seconds + _timer.expires_from_now(boost::posix_time::millisec(2000)); + _timer.async_wait(boost::bind(&AudioProducerHttp::throttling_cb, this, boost::placeholders::_1)); + + _status = Status_t::STATUS_DOWNLOAD_PAUSED; + return CURL_WRITEFUNC_PAUSE; + } + + pcm_data = convert_mp3_to_linear(_mh, _gain, data, bytes_received); + size_t samples = pcm_data.size(); + std::lock_guard lock(_mutex); + + // Resize the buffer if necessary + if (_buffer.capacity() - bufSize < samples) { + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "AudioProducerHttp::write_cb growing buffer, size now %ld\n", _buffer.size()); + + //TODO: if buffer exceeds some max size, return CURL_WRITEFUNC_ERROR to abort the transfer + _buffer.set_capacity(_buffer.size() + std::max(samples, (size_t)BUFFER_GROW_SIZE)); + } + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "AudioProducerHttp::write_cb: writing %ld samples to buffer\n", pcm_data.size()); + + /* Push the data into the buffer */ + _buffer.insert(_buffer.end(), pcm_data.data(), pcm_data.data() + pcm_data.size()); + return bytes_received; + +} + +size_t AudioProducerHttp::static_header_callback(char *buffer, size_t size, size_t nitems, void* userdata) { + return static_cast(userdata)->header_callback(buffer, size, nitems); +} + +size_t AudioProducerHttp::header_callback(char *buffer, size_t size, size_t nitems) { + size_t bytes_received = size * nitems; + const std::string prefix = "HTTP/"; + std::string header, value; + std::string input(buffer, bytes_received); + if (parseHeader(input, header, value)) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "header_callback: %s with value %s\n", header.c_str(), value.c_str()); + } + else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "header_callback: %s\n", input.c_str()); + if (input.rfind(prefix, 0) == 0) { + try { + _response_code = extract_response_code(input); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "header_callback: parsed response code: %ld\n", _response_code); + } catch (const std::invalid_argument& e) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "header_callback: invalid response code %s\n", input.substr(prefix.length()).c_str()); + } + } + } + return bytes_received; +} + +void AudioProducerHttp::throttling_cb(const boost::system::error_code& error) { + if (_status == Status_t::STATUS_STOPPING) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "throttling_cb: session gone, resume download so we can complete\n"); + curl_easy_pause(_easy, CURLPAUSE_CONT); + return; + } + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "throttling_cb: status is %s\n", status2String(_status)); + + if (!error) { + auto size = _buffer.size(); + + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "throttling_cb: size is now %ld\n", size); + if (size < BUFFER_THROTTLE_LOW) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "throttling_cb: resuming download\n"); + curl_easy_pause(_easy, CURLPAUSE_CONT); + return; + } + + // check back in 2 seconds + _timer.expires_from_now(boost::posix_time::millisec(2000)); + _timer.async_wait(boost::bind(&AudioProducerHttp::throttling_cb, this, boost::placeholders::_1)); + + } else if (125 == error.value()) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "throttling_cb: timer canceled\n"); + } + else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "throttling_cb: error (%d): %s\n", error.value(), error.message().c_str()); + } +} + +int AudioProducerHttp::sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp) { + GlobalInfo_t *g = &global; + + int *actionp = (int *) sockp; + static const char *whatstr[] = { "none", "IN", "OUT", "INOUT", "REMOVE"}; + + if(what == CURL_POLL_REMOVE) { + *actionp = what; + } + else { + if(!actionp) { + addsock(s, e, what, g); + } + else { + setsock(actionp, s, e, what, *actionp, g); + } + } + return 0; +} + +void AudioProducerHttp::timer_cb(const boost::system::error_code & error, GlobalInfo_t *g) +{ + //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "timer_cb\n"); + + if(!error) { + CURLMcode rc = curl_multi_socket_action(g->multi, CURL_SOCKET_TIMEOUT, 0, &g->still_running); + mcode_test("timer_cb: curl_multi_socket_action", rc); + check_multi_info(g); + } +} + +int AudioProducerHttp::multi_timer_cb(CURLM *multi, long timeout_ms, GlobalInfo_t *g) { + + /* cancel running timer */ + multi_timer.cancel(); + + if(timeout_ms >= 0) { + // from libcurl 7.88.1-10+deb12u4 does not allow call curl_multi_socket_action or curl_multi_perform in curl_multi callback directly + multi_timer.expires_from_now(boost::posix_time::millisec(timeout_ms ? timeout_ms : 1)); + multi_timer.async_wait(boost::bind(&timer_cb, boost::placeholders::_1, g)); + } + + return 0; +} + +curl_socket_t AudioProducerHttp::open_socket(void *clientp, curlsocktype purpose, struct curl_sockaddr *address) { + curl_socket_t sockfd = CURL_SOCKET_BAD; + + /* restrict to IPv4 */ + if(purpose == CURLSOCKTYPE_IPCXN && address->family == AF_INET) { + /* create a tcp socket object */ + boost::asio::ip::tcp::socket *tcp_socket = new boost::asio::ip::tcp::socket(io_service); + + /* open it and get the native handle*/ + boost::system::error_code ec; + tcp_socket->open(boost::asio::ip::tcp::v4(), ec); + + if(ec) { + /* An error occurred */ + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't open socket [%ld][%s]\n", ec, ec.message().c_str()); + } + else { + sockfd = tcp_socket->native_handle(); + + /* save it for monitoring */ + socket_map.insert(std::pair(sockfd, tcp_socket)); + } + } + return sockfd; +} + +int AudioProducerHttp::close_socket(void *clientp, curl_socket_t item) { + std::map::iterator it = socket_map.find(item); + if(it != socket_map.end()) { + delete it->second; + socket_map.erase(it); + } + return 0; +} + +int AudioProducerHttp::mcode_test(const char *where, CURLMcode code) { + if(CURLM_OK != code) { + const char *s; + switch(code) { + case CURLM_CALL_MULTI_PERFORM: + s = "CURLM_CALL_MULTI_PERFORM"; + break; + case CURLM_BAD_HANDLE: + s = "CURLM_BAD_HANDLE"; + break; + case CURLM_BAD_EASY_HANDLE: + s = "CURLM_BAD_EASY_HANDLE"; + break; + case CURLM_OUT_OF_MEMORY: + s = "CURLM_OUT_OF_MEMORY"; + break; + case CURLM_INTERNAL_ERROR: + s = "CURLM_INTERNAL_ERROR"; + break; + case CURLM_UNKNOWN_OPTION: + s = "CURLM_UNKNOWN_OPTION"; + break; + case CURLM_LAST: + s = "CURLM_LAST"; + break; + default: + s = "CURLM_unknown"; + break; + case CURLM_BAD_SOCKET: + s = "CURLM_BAD_SOCKET"; + break; + } + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "mcode_test ERROR: %s returns %s:%d\n", where, s, code); + + return -1; + } + return 0 ; +} + +bool AudioProducerHttp::parseHeader(const std::string& str, std::string& header, std::string& value) { + std::vector parts; + boost::split(parts, str, boost::is_any_of(":"), boost::token_compress_on); + + if (parts.size() != 2) + return false; + + header = boost::trim_copy(parts[0]); + value = boost::trim_copy(parts[1]); + return true; +} + +int AudioProducerHttp::extract_response_code(const std::string& input) { + std::size_t space_pos = input.find(' '); + if (space_pos == std::string::npos) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Invalid HTTP response format %s\n", input.c_str()); + return 0; + } + + std::size_t code_start_pos = space_pos + 1; + std::size_t code_end_pos = input.find(' ', code_start_pos); + if (code_end_pos == std::string::npos) { + code_end_pos = input.length(); + } + + std::string code_str = input.substr(code_start_pos, code_end_pos - code_start_pos); + int response_code = std::stoi(code_str); + return response_code; +} + +void AudioProducerHttp::setsock(int *fdp, curl_socket_t s, CURL *e, int act, int oldact, GlobalInfo_t *g) { + std::map::iterator it = socket_map.find(s); + + if(it == socket_map.end()) { + return; + } + + boost::asio::ip::tcp::socket * tcp_socket = it->second; + + *fdp = act; + + if(act == CURL_POLL_IN) { + if(oldact != CURL_POLL_IN && oldact != CURL_POLL_INOUT) { + tcp_socket->async_read_some(boost::asio::null_buffers(), + boost::bind(&event_cb, g, s, + CURL_POLL_IN, boost::placeholders::_1, fdp)); + } + } + else if(act == CURL_POLL_OUT) { + if(oldact != CURL_POLL_OUT && oldact != CURL_POLL_INOUT) { + tcp_socket->async_write_some(boost::asio::null_buffers(), + boost::bind(&event_cb, g, s, + CURL_POLL_OUT, boost::placeholders::_1, fdp)); + } + } + else if(act == CURL_POLL_INOUT) { + if(oldact != CURL_POLL_IN && oldact != CURL_POLL_INOUT) { + tcp_socket->async_read_some(boost::asio::null_buffers(), + boost::bind(&event_cb, g, s, + CURL_POLL_IN, boost::placeholders::_1, fdp)); + } + if(oldact != CURL_POLL_OUT && oldact != CURL_POLL_INOUT) { + tcp_socket->async_write_some(boost::asio::null_buffers(), + boost::bind(&event_cb, g, s, + CURL_POLL_OUT, boost::placeholders::_1, fdp)); + } + } +} + +/* Called by asio when there is an action on a socket */ +void AudioProducerHttp::event_cb(GlobalInfo_t *g, curl_socket_t s, int action, const boost::system::error_code & error, int *fdp) { + int f = *fdp; + + + // Socket already POOL REMOVED. + if (f == CURL_POLL_REMOVE) { + remsock(fdp, g); + return; + } + + if(socket_map.find(s) == socket_map.end()) { + return; + } + + /* make sure the event matches what are wanted */ + if(f == action || f == CURL_POLL_INOUT) { + if(error) { + action = CURL_CSELECT_ERR; + } + CURLMcode rc = curl_multi_socket_action(g->multi, s, action, &g->still_running); + + mcode_test("event_cb: curl_multi_socket_action", rc); + check_multi_info(g); + + if(g->still_running <= 0) { + multi_timer.cancel(); + } + + /* keep on watching. + * the socket may have been closed and/or fdp may have been changed + * in curl_multi_socket_action(), so check them both */ + if(!error && socket_map.find(s) != socket_map.end() && + (f == action || f == CURL_POLL_INOUT)) { + boost::asio::ip::tcp::socket *tcp_socket = socket_map.find(s)->second; + + if(action == CURL_POLL_IN) { + tcp_socket->async_read_some(boost::asio::null_buffers(), + boost::bind(&event_cb, g, s, + action, boost::placeholders::_1, fdp)); + } + if(action == CURL_POLL_OUT) { + tcp_socket->async_write_some(boost::asio::null_buffers(), + boost::bind(&event_cb, g, s, + action, boost::placeholders::_1, fdp)); + } + } + } +} + +void AudioProducerHttp::remsock(int *f, GlobalInfo_t *g) { + if(f) { + free(f); + f = NULL; + } +} + +void AudioProducerHttp::addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo_t *g) { + /* fdp is used to store current action */ + int *fdp = (int *) calloc(sizeof(int), 1); + + setsock(fdp, s, easy, action, 0, g); + curl_multi_assign(g->multi, s, fdp); +} + +/* Check for completed transfers, and remove their easy handles */ +void AudioProducerHttp::check_multi_info(GlobalInfo_t *g) { + CURLMsg *msg; + int msgs_left; + AudioProducerHttp *ap; + CURL *easy; + CURLcode res; + + while((msg = curl_multi_info_read(g->multi, &msgs_left))) { + if(msg->msg == CURLMSG_DONE) { + long response_code; + double namelookup=0, connect=0, total=0 ; + char *ct = NULL ; + + easy = msg->easy_handle; + res = msg->data.result; + curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ap); + curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &response_code); + curl_easy_getinfo(easy, CURLINFO_CONTENT_TYPE, &ct); + + curl_easy_getinfo(easy, CURLINFO_NAMELOOKUP_TIME, &namelookup); + curl_easy_getinfo(easy, CURLINFO_CONNECT_TIME, &connect); + curl_easy_getinfo(easy, CURLINFO_TOTAL_TIME, &total); + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "curl done, response code %d, status %s\n", response_code, status2String(ap->getStatus())); + + bool restart = ap->isLoopedAudio() && ap->getStatus() != Status_t::STATUS_STOPPING && response_code == 200; + if (restart) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "restarting looped audio\n"); + ap->getTimer().expires_from_now(boost::posix_time::millisec(1000)); + ap->getTimer().async_wait(boost::bind(&AudioProducerHttp::static_restart_cb, boost::placeholders::_1, ap)); + } + else { + ap->cleanup(Status_t::STATUS_DOWNLOAD_COMPLETE, (int) response_code); + } + } + } +} + +void AudioProducerHttp::static_restart_cb(const boost::system::error_code& error, void* userdata) { + static_cast(userdata)->restart_cb(error); +} + +void AudioProducerHttp::restart_cb(const boost::system::error_code& error) { + if (_status == Status_t::STATUS_STOPPING) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "restart_cb: session gone\n"); + return; + } + if (!error) { + reset(); + start(_callback); + } + else if (error.value() == boost::asio::error::operation_aborted) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "restart_cb: %s, cancelling retrieve\n", error.message().c_str()); + } + else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "restart_cb: error (%d): %s\n", error.value(), error.message().c_str()); + } +} + + +void AudioProducerHttp::stop() { + cleanup(Status_t::STATUS_STOPPED, 0); + return; +} + +void AudioProducerHttp::reset() { + if (_easy) { + curl_multi_remove_handle(global.multi, _easy); + curl_easy_cleanup(_easy); + _easy = nullptr; + } + if (_mh) { + mpg123_close(_mh); + mpg123_delete(_mh); + _mh = nullptr; + } + _err_msg.clear(); + _response_code = 0; + _timer.cancel(); + _status = Status_t::STATUS_NONE; +} + +void AudioProducerHttp::cleanup(Status_t status, int response_code) { + std::string errMsg = response_code > 200 ? "http response: " + std::to_string(response_code) : ""; + reset(); + _status = status; + notifyDone(!errMsg.empty(), errMsg); +} \ No newline at end of file diff --git a/mod_dub/ap_http.h b/mod_dub/ap_http.h new file mode 100644 index 0000000..45cde7d --- /dev/null +++ b/mod_dub/ap_http.h @@ -0,0 +1,134 @@ +#ifndef __AP_HTTP_H__ +#define __AP_HTTP_H__ + +#include "ap.h" +#include +#include +#include + + +typedef struct +{ + CURLM *multi; + int still_running; +} GlobalInfo_t; + +class AudioProducerHttp : public AudioProducer { +public: + + typedef enum + { + STATUS_NONE = 0, + STATUS_FAILED, + STATUS_DOWNLOAD_IN_PROGRESS, + STATUS_DOWNLOAD_PAUSED, + STATUS_DOWNLOAD_COMPLETE, + STATUS_AWAITING_RESTART, + STATUS_STOPPING, + STATUS_STOPPED + } Status_t; + + typedef enum { + HTTP_METHOD_GET = 0, + HTTP_METHOD_POST + } HttpMethod_t; + + static const char* status2String(Status_t status) { + static const char* statusStrings[] = { + "STATUS_NONE", + "STATUS_FAILED", + "STATUS_DOWNLOAD_IN_PROGRESS", + "STATUS_DOWNLOAD_PAUSED", + "STATUS_DOWNLOAD_COMPLETE", + "STATUS_AWAITING_RESTART", + "STATUS_STOPPED" + }; + + if (status >= 0 && status < sizeof(statusStrings) / sizeof(statusStrings[0])) + { + return statusStrings[status]; + } + else + { + return "UNKNOWN_STATUS"; + } + } + AudioProducerHttp( + std::mutex& mutex, + CircularBuffer_t& circularBuffer, + int sampleRate + ); + virtual ~AudioProducerHttp(); + + virtual void start(std::function callback); + void addCurlHandle(const boost::system::error_code& error); + virtual void stop(); + void cleanup(Status_t status, int response_code); + void reset(); + + void queueHttpGetAudio(const std::string& url, int gain = 0, bool loop = false); + void queueHttpPostAudio(const std::string& url, int gain = 0, bool loop = false); + void queueHttpPostAudio(const std::string& url, const std::string& body, std::vector& headers, int gain = 0, bool loop = false); + + Status_t getStatus() const { return _status; } + void setStatus(Status_t status) { _status = status; } + + boost::asio::deadline_timer& getTimer() { return _timer; } + + static bool initialized; + static std::thread worker_thread; + static boost::asio::io_service io_service; + static void threadFunc(); + static std::map socket_map; + static boost::asio::deadline_timer multi_timer; + + + static GlobalInfo_t global; + + static int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp); + static int multi_timer_cb(CURLM *multi, long timeout_ms, GlobalInfo_t *g); + static void timer_cb(const boost::system::error_code & error, GlobalInfo_t *g); + static void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo_t *g); + static void setsock(int *fdp, curl_socket_t s, CURL *e, int act, int oldact, GlobalInfo_t *g); + static int mcode_test(const char *where, CURLMcode code); + static void check_multi_info(GlobalInfo_t *g); + static void event_cb(GlobalInfo_t *g, curl_socket_t s, int action, const boost::system::error_code & error, int *fdp); + static void remsock(int *f, GlobalInfo_t *g); + + static size_t static_write_cb(char* ptr, size_t size, size_t nmemb, void* userdata); + size_t write_cb(void *ptr, size_t size, size_t nmemb); + + static size_t static_header_callback(char *buffer, size_t size, size_t nitems, void* userdata); + size_t header_callback(char *buffer, size_t size, size_t nitems); + + void throttling_cb(const boost::system::error_code& error); + + static void static_restart_cb(const boost::system::error_code& error, void* userdata); + void restart_cb(const boost::system::error_code& error); + + static int close_socket(void *clientp, curl_socket_t item); + static curl_socket_t open_socket(void *clientp, curlsocktype purpose, struct curl_sockaddr *address); + +private: + + static void _init(); + static void _deinit(); + + CURL* createEasyHandle(); + bool parseHeader(const std::string& str, std::string& header, std::string& value); + int extract_response_code(const std::string& input); + + HttpMethod_t _method; + std::string _url; + std::string _body; + std::vector _headers; + Status_t _status; + mpg123_handle *_mh; + CURL *_easy; + char _error[CURL_ERROR_SIZE]; // curl error buffer + std::string _err_msg; + int _response_code; + boost::asio::deadline_timer _timer; +}; + +#endif \ No newline at end of file diff --git a/mod_dub/audio_downloader.cpp b/mod_dub/audio_downloader.cpp deleted file mode 100644 index df04e20..0000000 --- a/mod_dub/audio_downloader.cpp +++ /dev/null @@ -1,885 +0,0 @@ -#include "audio_downloader.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include - -#include - -#define BUFFER_GROW_SIZE (80000) -#define BUFFER_THROTTLE_LOW (40000) -#define BUFFER_THROTTLE_HIGH (160000) - -static uint16_t currDownloadId = 0; - -typedef struct -{ - CURLM *multi; - int still_running; -} GlobalInfo_t; -static GlobalInfo_t global; - -typedef enum -{ - STATUS_NONE = 0, - STATUS_FAILED, - STATUS_DOWNLOAD_IN_PROGRESS, - STATUS_DOWNLOAD_PAUSED, - STATUS_DOWNLOAD_COMPLETE, - STATUS_AWAITING_RESTART, - STATUS_STOPPING, - STATUS_STOPPED -} Status_t; - -static const char* status2String(Status_t status) -{ - static const char* statusStrings[] = { - "STATUS_NONE", - "STATUS_FAILED", - "STATUS_DOWNLOAD_IN_PROGRESS", - "STATUS_DOWNLOAD_PAUSED", - "STATUS_DOWNLOAD_COMPLETE", - "STATUS_AWAITING_RESTART", - "STATUS_STOPPING", - "STATUS_STOPPED" - }; - - if (status >= 0 && status < sizeof(statusStrings) / sizeof(statusStrings[0])) - { - return statusStrings[status]; - } - else - { - return "UNKNOWN_STATUS"; - } -} - -typedef struct -{ - GlobalInfo_t *global; - CURL *easy; - switch_mutex_t* mutex; - CircularBuffer_t* buffer; - mpg123_handle *mh; - char error[CURL_ERROR_SIZE]; // curl error buffer - char *err_msg; // http server error message - char* url; - bool loop; - int rate; - boost::asio::deadline_timer *timer; - Status_t status; - downloadId_t id; - int response_code; - int gain; -} ConnInfo_t; - -typedef std::map Id2ConnMap_t; -static Id2ConnMap_t id2ConnMap; - -static boost::object_pool pool ; -static std::map socket_map; -static boost::asio::io_service io_service; -static boost::asio::deadline_timer timer(io_service); -static std::string fullDirPath; -static std::thread worker_thread; - -/* forward declarations */ -static ConnInfo_t* createDownloader(const char *url, int rate, int loop, int gain, mpg123_handle *mhm, switch_mutex_t *mutex, CircularBuffer_t *buffer); -static CURL* createEasyHandle(void); -static void destroyConnection(ConnInfo_t *conn); -static void check_multi_info(GlobalInfo_t *g) ; -static int mcode_test(const char *where, CURLMcode code); -static void event_cb(GlobalInfo_t *g, curl_socket_t s, int action, const boost::system::error_code & error, int *fdp); -static void setsock(int *fdp, curl_socket_t s, CURL *e, int act, int oldact, GlobalInfo_t *g); -static void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo_t *g); -static int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp); -static void threadFunc(); -static void timer_cb(const boost::system::error_code & error, GlobalInfo_t *g); -static int multi_timer_cb(CURLM *multi, long timeout_ms, GlobalInfo_t *g); -static std::vector convert_mp3_to_linear(ConnInfo_t *conn, int8_t *data, size_t len); -static void throttling_cb(const boost::system::error_code& error, ConnInfo_t* conn) ; -static void restart_cb(const boost::system::error_code& error, ConnInfo_t* conn); -static size_t write_cb(void *ptr, size_t size, size_t nmemb, ConnInfo_t *conn); -static bool parseHeader(const std::string& str, std::string& header, std::string& value) ; -static int extract_response_code(const std::string& input) ; -static size_t header_callback(char *buffer, size_t size, size_t nitems, ConnInfo_t *conn); -static curl_socket_t opensocket(void *clientp, curlsocktype purpose, struct curl_sockaddr *address); -static int close_socket(void *clientp, curl_socket_t item); - -/* apis */ -extern "C" { - - switch_status_t init_audio_downloader() { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "init_audio_downloader loading..\n"); - memset(&global, 0, sizeof(GlobalInfo_t)); - global.multi = curl_multi_init(); - - if (!global.multi) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "init_audio_downloader curl_multi_init() failed, exiting!\n"); - return SWITCH_STATUS_FALSE; - } - - curl_multi_setopt(global.multi, CURLMOPT_SOCKETFUNCTION, sock_cb); - curl_multi_setopt(global.multi, CURLMOPT_SOCKETDATA, &global); - curl_multi_setopt(global.multi, CURLMOPT_TIMERFUNCTION, multi_timer_cb); - curl_multi_setopt(global.multi, CURLMOPT_TIMERDATA, &global); - curl_multi_setopt(global.multi, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX); - - if (mpg123_init() != MPG123_OK) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "init_audio_downloader: failed to initiate MPG123"); - return SWITCH_STATUS_FALSE; - } - - /* start worker thread */ - std::thread t(threadFunc) ; - worker_thread.swap( t ) ; - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "init_audio_downloader: loaded\n"); - - return SWITCH_STATUS_SUCCESS; - - } - - switch_status_t deinit_audio_downloader() { - /* stop the ASIO IO service */ - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "deinit_audio_downloader: stopping io service\n"); - io_service.stop(); - - /* Join the worker thread */ - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "deinit_audio_downloader: wait for worker thread to complete\n"); - if (worker_thread.joinable()) { - worker_thread.join(); - } - - /* cleanup curl multi handle*/ - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "deinit_audio_downloader: release curl multi\n"); - curl_multi_cleanup(global.multi); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "deinit_audio_downloader: completed\n"); - - mpg123_exit(); - - return SWITCH_STATUS_SUCCESS; - } - - downloadId_t start_audio_download(const char* url, int rate, int loop, int gain, switch_mutex_t* mutex, CircularBuffer_t* buffer) { - int mhError = 0; - - /* allocate handle for mpeg decoding */ - mpg123_handle *mh = mpg123_new("auto", &mhError); - if (!mh) { - const char *mhErr = mpg123_plain_strerror(mhError); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error allocating mpg123 handle! %s\n", switch_str_nil(mhErr)); - return INVALID_DOWNLOAD_ID; - } - - if (mpg123_open_feed(mh) != MPG123_OK) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error mpg123_open_feed!\n"); - return INVALID_DOWNLOAD_ID; - } - - if (mpg123_format_all(mh) != MPG123_OK) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error mpg123_format_all!\n"); - return INVALID_DOWNLOAD_ID; - } - - if (mpg123_param(mh, MPG123_FORCE_RATE, rate, 0) != MPG123_OK) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error forcing resample to 8k!\n"); - return INVALID_DOWNLOAD_ID; - } - - if (mpg123_param(mh, MPG123_FLAGS, MPG123_MONO_MIX, 0) != MPG123_OK) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error forcing single channel!\n"); - return INVALID_DOWNLOAD_ID; - } - - ConnInfo_t* conn = createDownloader(url, rate, loop, gain, mh, mutex, buffer); - if (!conn) { - return INVALID_DOWNLOAD_ID; - } - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, - "start_audio_download: starting download %d\n", conn->id); - - - return conn->id; - } - - switch_status_t stop_audio_download(int id) { - auto it = id2ConnMap.find(id); - if (it == id2ConnMap.end()) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "stop_audio_download: id %d has already completed\n", id); - return SWITCH_STATUS_FALSE; - } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, - "stop_audio_download: stopping download %d, status %s\n", id, status2String(it->second->status)); - - ConnInfo_t *conn = it->second; - auto status = conn->status; - - /* past this point I shall not access either the mutex or the buffer provided */ - conn->mutex = nullptr; - conn->buffer = nullptr; - - /* if download is in progress set status to cancel it during next call back */ - if (status == Status_t::STATUS_DOWNLOAD_PAUSED) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "stop_audio_download: resuming download %d so we can cancel it\n", id); - conn->status = Status_t::STATUS_STOPPING; - curl_easy_pause(conn->easy, CURLPAUSE_CONT); - } - if (status != Status_t::STATUS_DOWNLOAD_IN_PROGRESS) { - destroyConnection(conn); - } - conn->status = Status_t::STATUS_STOPPING; - return SWITCH_STATUS_SUCCESS; - } -} - -/* internal */ -ConnInfo_t* createDownloader(const char *url, int rate, int loop, int gain, mpg123_handle *mh, switch_mutex_t *mutex, CircularBuffer_t *buffer) { - ConnInfo_t *conn = pool.malloc() ; - CURL* easy = createEasyHandle(); - - if (!easy || !conn) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "createDownloader: failed to allocate memory\n"); - return nullptr; - } - - memset(conn, 0, sizeof(ConnInfo_t)); - conn->easy = easy; - conn->mutex = mutex; - conn->buffer = buffer; - conn->mh = mh; - conn->loop = loop; - conn->gain = gain; - conn->rate = rate; - conn->url = strdup(url); - conn->global = &global; - conn->status = Status_t::STATUS_NONE; - conn->timer = new boost::asio::deadline_timer(io_service); - - downloadId_t id = ++currDownloadId; - if (id == 0) id++; - - id2ConnMap[id] = conn; - conn->id = id; - - curl_easy_setopt(easy, CURLOPT_URL, url); - curl_easy_setopt(easy, CURLOPT_HTTPGET, 1L); - curl_easy_setopt(easy, CURLOPT_WRITEFUNCTION, write_cb); - curl_easy_setopt(easy, CURLOPT_WRITEDATA, conn); - curl_easy_setopt(easy, CURLOPT_ERRORBUFFER, conn->error); - curl_easy_setopt(easy, CURLOPT_PRIVATE, conn); - curl_easy_setopt(easy, CURLOPT_VERBOSE, 0L); - curl_easy_setopt(easy, CURLOPT_NOPROGRESS, 1L); - curl_easy_setopt(easy, CURLOPT_HEADERFUNCTION, header_callback); - curl_easy_setopt(easy, CURLOPT_HEADERDATA, conn); - - /* call this function to get a socket */ - curl_easy_setopt(easy, CURLOPT_OPENSOCKETFUNCTION, opensocket); - - /* call this function to close a socket */ - curl_easy_setopt(easy, CURLOPT_CLOSESOCKETFUNCTION, close_socket); - - curl_easy_setopt(easy, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2_0); - - /* keep the speed down so we don't have to buffer large amounts*/ - curl_easy_setopt(easy, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t)31415); - - auto rc = curl_multi_add_handle(global.multi, conn->easy); - if (mcode_test("new_conn: curl_multi_add_handle", rc) < 0) { - return nullptr; - } - conn->status = Status_t::STATUS_DOWNLOAD_IN_PROGRESS; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "createDownloader: launched request, gain %d\n", conn->gain); - return conn; -} - -void destroyConnection(ConnInfo_t *conn) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "destroyConnection\n"); - - /* clean up the curl handle*/ - curl_multi_remove_handle(conn->global, conn->easy); - curl_easy_cleanup(conn->easy); - - /* clear asio resources and free resources */ - if (conn->timer) { - conn->timer->cancel(); - delete conn->timer; - } - if (conn->err_msg) { - free(conn->err_msg); - } - - /* free mp3 decoder */ - if (conn->mh) { - mpg123_close(conn->mh); - mpg123_delete(conn->mh); - } - - if (conn->url) { - free(conn->url); - } - - if (conn->mutex) switch_mutex_lock(conn->mutex); - id2ConnMap.erase(conn->id); - if (conn->mutex) switch_mutex_unlock(conn->mutex); - - memset(conn, 0, sizeof(ConnInfo_t)); - pool.destroy(conn) ; -} - -CURL* createEasyHandle(void) { - CURL* easy = curl_easy_init(); - if(!easy) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "curl_easy_init() failed!\n"); - return nullptr ; - } - - curl_easy_setopt(easy, CURLOPT_FOLLOWLOCATION, 1L); - curl_easy_setopt(easy, CURLOPT_USERAGENT, "jambonz/0.8.5"); - - // set connect timeout to 3 seconds and no total timeout as files could be large - curl_easy_setopt(easy, CURLOPT_CONNECTTIMEOUT_MS, 3000L); - curl_easy_setopt(easy, CURLOPT_TIMEOUT, 0L); // no timeout - - return easy ; -} - -/* Check for completed transfers, and remove their easy handles */ -void check_multi_info(GlobalInfo_t *g) { - CURLMsg *msg; - int msgs_left; - ConnInfo_t *conn; - CURL *easy; - CURLcode res; - - while((msg = curl_multi_info_read(g->multi, &msgs_left))) { - if(msg->msg == CURLMSG_DONE) { - long response_code; - double namelookup=0, connect=0, total=0 ; - char *ct = NULL ; - - easy = msg->easy_handle; - res = msg->data.result; - curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn); - curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &response_code); - curl_easy_getinfo(easy, CURLINFO_CONTENT_TYPE, &ct); - - curl_easy_getinfo(easy, CURLINFO_NAMELOOKUP_TIME, &namelookup); - curl_easy_getinfo(easy, CURLINFO_CONNECT_TIME, &connect); - curl_easy_getinfo(easy, CURLINFO_TOTAL_TIME, &total); - - downloadId_t id = conn->id; - auto mutex = conn->mutex; - auto buffer = conn->buffer; - auto rate = conn->rate; - auto loop = conn->loop; - auto gain = conn->gain; - auto oldId = conn->id; - bool restart = conn->loop && conn->status != Status_t::STATUS_STOPPING && response_code == 200; - - conn->response_code = response_code; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "curl done, response code %d, status %s\n", response_code, status2String(conn->status)); - conn->status = Status_t::STATUS_DOWNLOAD_COMPLETE; - - curl_multi_remove_handle(g->multi, easy); - - if (restart) { - conn->status = Status_t::STATUS_AWAITING_RESTART; - conn->timer->expires_from_now(boost::posix_time::millisec(1000)); - conn->timer->async_wait(boost::bind(&restart_cb, boost::placeholders::_1, conn)); - - //TODO: this seems to not be working from this callback; maybe start it from a timer callback? - } - else { - destroyConnection(conn); - } - } - } -} - -int mcode_test(const char *where, CURLMcode code) { - if(CURLM_OK != code) { - const char *s; - switch(code) { - case CURLM_CALL_MULTI_PERFORM: - s = "CURLM_CALL_MULTI_PERFORM"; - break; - case CURLM_BAD_HANDLE: - s = "CURLM_BAD_HANDLE"; - break; - case CURLM_BAD_EASY_HANDLE: - s = "CURLM_BAD_EASY_HANDLE"; - break; - case CURLM_OUT_OF_MEMORY: - s = "CURLM_OUT_OF_MEMORY"; - break; - case CURLM_INTERNAL_ERROR: - s = "CURLM_INTERNAL_ERROR"; - break; - case CURLM_UNKNOWN_OPTION: - s = "CURLM_UNKNOWN_OPTION"; - break; - case CURLM_LAST: - s = "CURLM_LAST"; - break; - default: - s = "CURLM_unknown"; - break; - case CURLM_BAD_SOCKET: - s = "CURLM_BAD_SOCKET"; - break; - } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "mcode_test ERROR: %s returns %s:%d\n", where, s, code); - - return -1; - } - return 0 ; -} - -void remsock(int *f, GlobalInfo_t *g) { - if(f) { - free(f); - f = NULL; - } -} - -/* Called by asio when there is an action on a socket */ -void event_cb(GlobalInfo_t *g, curl_socket_t s, int action, const boost::system::error_code & error, int *fdp) { - int f = *fdp; - - //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "event_cb socket %#X has action %d\n", s, action) ; - - // Socket already POOL REMOVED. - if (f == CURL_POLL_REMOVE) { - //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "event_cb socket %#X removed\n", s); - remsock(fdp, g); - return; - } - - if(socket_map.find(s) == socket_map.end()) { - //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "event_cb: socket %#X already closed\n, s"); - return; - } - - /* make sure the event matches what are wanted */ - if(f == action || f == CURL_POLL_INOUT) { - if(error) { - action = CURL_CSELECT_ERR; - } - CURLMcode rc = curl_multi_socket_action(g->multi, s, action, &g->still_running); - - mcode_test("event_cb: curl_multi_socket_action", rc); - check_multi_info(g); - - if(g->still_running <= 0) { - timer.cancel(); - } - - /* keep on watching. - * the socket may have been closed and/or fdp may have been changed - * in curl_multi_socket_action(), so check them both */ - if(!error && socket_map.find(s) != socket_map.end() && - (f == action || f == CURL_POLL_INOUT)) { - boost::asio::ip::tcp::socket *tcp_socket = socket_map.find(s)->second; - - if(action == CURL_POLL_IN) { - tcp_socket->async_read_some(boost::asio::null_buffers(), - boost::bind(&event_cb, g, s, - action, boost::placeholders::_1, fdp)); - } - if(action == CURL_POLL_OUT) { - tcp_socket->async_write_some(boost::asio::null_buffers(), - boost::bind(&event_cb, g, s, - action, boost::placeholders::_1, fdp)); - } - } - } -} - -/* socket functions */ -void setsock(int *fdp, curl_socket_t s, CURL *e, int act, int oldact, GlobalInfo_t *g) { - std::map::iterator it = socket_map.find(s); - - if(it == socket_map.end()) { - //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "setsock: socket %#X not found\n, s"); - return; - } - - boost::asio::ip::tcp::socket * tcp_socket = it->second; - - *fdp = act; - - if(act == CURL_POLL_IN) { - if(oldact != CURL_POLL_IN && oldact != CURL_POLL_INOUT) { - tcp_socket->async_read_some(boost::asio::null_buffers(), - boost::bind(&event_cb, g, s, - CURL_POLL_IN, boost::placeholders::_1, fdp)); - } - } - else if(act == CURL_POLL_OUT) { - if(oldact != CURL_POLL_OUT && oldact != CURL_POLL_INOUT) { - tcp_socket->async_write_some(boost::asio::null_buffers(), - boost::bind(&event_cb, g, s, - CURL_POLL_OUT, boost::placeholders::_1, fdp)); - } - } - else if(act == CURL_POLL_INOUT) { - if(oldact != CURL_POLL_IN && oldact != CURL_POLL_INOUT) { - tcp_socket->async_read_some(boost::asio::null_buffers(), - boost::bind(&event_cb, g, s, - CURL_POLL_IN, boost::placeholders::_1, fdp)); - } - if(oldact != CURL_POLL_OUT && oldact != CURL_POLL_INOUT) { - tcp_socket->async_write_some(boost::asio::null_buffers(), - boost::bind(&event_cb, g, s, - CURL_POLL_OUT, boost::placeholders::_1, fdp)); - } - } -} - -void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo_t *g) { - /* fdp is used to store current action */ - int *fdp = (int *) calloc(sizeof(int), 1); - - setsock(fdp, s, easy, action, 0, g); - curl_multi_assign(g->multi, s, fdp); -} - -int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp) { - GlobalInfo_t *g = &global; - - int *actionp = (int *) sockp; - static const char *whatstr[] = { "none", "IN", "OUT", "INOUT", "REMOVE"}; - - if(what == CURL_POLL_REMOVE) { - *actionp = what; - } - else { - if(!actionp) { - addsock(s, e, what, g); - } - else { - setsock(actionp, s, e, what, *actionp, g); - } - } - return 0; -} - -void threadFunc() { - /* to make sure the event loop doesn't terminate when there is no work to do */ - io_service.reset() ; - boost::asio::io_service::work work(io_service); - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_dub threadFunc - starting\n"); - - for(;;) { - - try { - io_service.run() ; - break ; - } - catch( std::exception& e) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "mod_dub threadFunc - Error: %s\n", e.what()); - } - } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_dub threadFunc - ending\n"); -} - - -/* Called by asio when our timeout expires */ -void timer_cb(const boost::system::error_code & error, GlobalInfo_t *g) -{ - //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "timer_cb\n"); - - if(!error) { - CURLMcode rc = curl_multi_socket_action(g->multi, CURL_SOCKET_TIMEOUT, 0, &g->still_running); - mcode_test("timer_cb: curl_multi_socket_action", rc); - check_multi_info(g); - } -} - -int multi_timer_cb(CURLM *multi, long timeout_ms, GlobalInfo_t *g) { - - /* cancel running timer */ - timer.cancel(); - - if(timeout_ms >= 0) { - // from libcurl 7.88.1-10+deb12u4 does not allow call curl_multi_socket_action or curl_multi_perform in curl_multi callback directly - timer.expires_from_now(boost::posix_time::millisec(timeout_ms ? timeout_ms : 1)); - timer.async_wait(boost::bind(&timer_cb, boost::placeholders::_1, g)); - } - - return 0; -} - -std::vector convert_mp3_to_linear(ConnInfo_t *conn, int8_t *data, size_t len) { - std::vector linear_data; - int eof = 0; - int mp3err = 0; - - if(mpg123_feed(conn->mh, (const unsigned char*) data, len) == MPG123_OK) { - while(!eof) { - size_t usedlen = 0; - off_t frame_offset; - unsigned char* audio; - - int decode_status = mpg123_decode_frame(conn->mh, &frame_offset, &audio, &usedlen); - - switch(decode_status) { - case MPG123_NEW_FORMAT: - continue; - - case MPG123_OK: - { - size_t samples = usedlen / sizeof(int16_t); - linear_data.insert(linear_data.end(), reinterpret_cast(audio), reinterpret_cast(audio) + samples); - } - break; - - case MPG123_DONE: - case MPG123_NEED_MORE: - eof = 1; - break; - - case MPG123_ERR: - default: - if(++mp3err >= 5) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Decoder Error!\n"); - eof = 1; - } - } - - if (eof) - break; - - mp3err = 0; - } - - if (conn->gain != 0) { - switch_change_sln_volume_granular(linear_data.data(), linear_data.size(), conn->gain); - } - } - - return linear_data; -} - -void restart_cb(const boost::system::error_code& error, ConnInfo_t* conn) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "restart_cb status is %s\n", status2String(conn->status)); - if (conn->status == Status_t::STATUS_AWAITING_RESTART) { - auto url = strdup(conn->url); - auto rate = conn->rate; - auto loop = conn->loop; - auto gain = conn->gain; - auto mutex = conn->mutex; - auto buffer = conn->buffer; - auto oldId = conn->id; - - destroyConnection(conn); - - downloadId_t id = start_audio_download(url, rate, loop, gain, mutex, buffer); - - /* re-use id since caller is tracking that id */ - auto * newConnection = id2ConnMap[id]; - id2ConnMap[oldId] = newConnection; - id2ConnMap.erase(id); - - free(url); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "restarted looped download\n"); - } -} - -void throttling_cb(const boost::system::error_code& error, ConnInfo_t* conn) { - if (conn->status == Status_t::STATUS_STOPPING || !conn->mutex || !conn->buffer) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "throttling_cb: session gone, resume download so we can complete\n"); - curl_easy_pause(conn->easy, CURLPAUSE_CONT); - return; - } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "throttling_cb: status is %s\n", status2String(conn->status)); - - switch_mutex_lock(conn->mutex); - if (!error) { - auto size = conn->buffer->size(); - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "throttling_cb: size is now %ld\n", size); - if (size < BUFFER_THROTTLE_LOW) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "throttling_cb: resuming download\n"); - curl_easy_pause(conn->easy, CURLPAUSE_CONT); - switch_mutex_unlock(conn->mutex); - return; - } - - // check back in 2 seconds - conn->timer->expires_from_now(boost::posix_time::millisec(2000)); - conn->timer->async_wait(boost::bind(&throttling_cb, boost::placeholders::_1, conn)); - - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "throttling_cb: error (%d): %s\n", error.value(), error.message().c_str()); - - // Handle any errors - } - switch_mutex_unlock(conn->mutex); -} - - -/* CURLOPT_WRITEFUNCTION - here is where we receive the data */ -size_t write_cb(void *ptr, size_t size, size_t nmemb, ConnInfo_t *conn) { - int8_t *data = (int8_t *) ptr; - size_t bytes_received = size * nmemb; - std::vector pcm_data; - - if (conn->status == Status_t::STATUS_STOPPING || conn->status == Status_t::STATUS_STOPPED || !conn->mutex || !conn->buffer) { - if (conn->timer) conn->timer->cancel(); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, - "write_cb: aborting transfer, status %s, mutex %p, buffer %p\n", status2String(conn->status), conn->mutex, conn->buffer); - /* this will abort the transfer */ - return 0; - } - { - switch_mutex_lock(conn->mutex); - - if (conn->response_code > 0 && conn->response_code != 200) { - std::string body((char *) ptr, bytes_received); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "write_cb: received body %s\n", body.c_str()); - conn->err_msg = strdup(body.c_str()); - conn->status = Status_t::STATUS_FAILED; - switch_mutex_unlock(conn->mutex); - return 0; - } - - /* throttle after reaching high water mark */ - if (conn->buffer->size() > BUFFER_THROTTLE_HIGH) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "write_cb: throttling download, buffer size is %ld\n", conn->buffer->size()); - - // check back in 2 seconds - conn->timer->expires_from_now(boost::posix_time::millisec(2000)); - conn->timer->async_wait(boost::bind(&throttling_cb, boost::placeholders::_1, conn)); - - conn->status = Status_t::STATUS_DOWNLOAD_PAUSED; - switch_mutex_unlock(conn->mutex); - return CURL_WRITEFUNC_PAUSE; - } - - pcm_data = convert_mp3_to_linear(conn, data, bytes_received); - size_t bytesResampled = pcm_data.size() * sizeof(int16_t); - - // Resize the buffer if necessary - if (conn->buffer->capacity() - conn->buffer->size() < (bytesResampled / sizeof(int16_t))) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "write_cb growing buffer, size now %ld\n", conn->buffer->size()); - - //TODO: if buffer exceeds some max size, return CURL_WRITEFUNC_ERROR to abort the transfer - conn->buffer->set_capacity(conn->buffer->size() + std::max((bytesResampled / sizeof(int16_t)), (size_t)BUFFER_GROW_SIZE)); - } - - /* Push the data into the buffer */ - conn->buffer->insert(conn->buffer->end(), pcm_data.data(), pcm_data.data() + pcm_data.size()); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "write_cb: wrote data, buffer size is now %ld\n", conn->buffer->size()); - - switch_mutex_unlock(conn->mutex); - } - return bytes_received; -} - -bool parseHeader(const std::string& str, std::string& header, std::string& value) { - std::vector parts; - boost::split(parts, str, boost::is_any_of(":"), boost::token_compress_on); - - if (parts.size() != 2) - return false; - - header = boost::trim_copy(parts[0]); - value = boost::trim_copy(parts[1]); - return true; -} - -int extract_response_code(const std::string& input) { - std::size_t space_pos = input.find(' '); - if (space_pos == std::string::npos) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Invalid HTTP response format %s\n", input.c_str()); - return 0; - } - - std::size_t code_start_pos = space_pos + 1; - std::size_t code_end_pos = input.find(' ', code_start_pos); - if (code_end_pos == std::string::npos) { - code_end_pos = input.length(); - } - - std::string code_str = input.substr(code_start_pos, code_end_pos - code_start_pos); - int response_code = std::stoi(code_str); - return response_code; -} - -size_t header_callback(char *buffer, size_t size, size_t nitems, ConnInfo_t *conn) { - size_t bytes_received = size * nitems; - const std::string prefix = "HTTP/"; - std::string header, value; - std::string input(buffer, bytes_received); - if (parseHeader(input, header, value)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "recv header: %s with value %s\n", header.c_str(), value.c_str()); - } - else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "recv header: %s\n", input.c_str()); - if (input.rfind(prefix, 0) == 0) { - try { - conn->response_code = extract_response_code(input); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "parsed response code: %ld\n", conn->response_code); - } catch (const std::invalid_argument& e) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "header_callback: invalid response code %s\n", input.substr(prefix.length()).c_str()); - } - } - } - return bytes_received; -} - -/* CURLOPT_OPENSOCKETFUNCTION */ -curl_socket_t opensocket(void *clientp, curlsocktype purpose, struct curl_sockaddr *address) { - curl_socket_t sockfd = CURL_SOCKET_BAD; - - //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "opensocket: %d\n", purpose); - /* restrict to IPv4 */ - if(purpose == CURLSOCKTYPE_IPCXN && address->family == AF_INET) { - /* create a tcp socket object */ - boost::asio::ip::tcp::socket *tcp_socket = new boost::asio::ip::tcp::socket(io_service); - - /* open it and get the native handle*/ - boost::system::error_code ec; - tcp_socket->open(boost::asio::ip::tcp::v4(), ec); - - if(ec) { - /* An error occurred */ - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't open socket [%ld][%s]\n", ec, ec.message().c_str()); - } - else { - sockfd = tcp_socket->native_handle(); - - /* save it for monitoring */ - socket_map.insert(std::pair(sockfd, tcp_socket)); - } - } - return sockfd; -} - -/* CURLOPT_CLOSESOCKETFUNCTION */ -int close_socket(void *clientp, curl_socket_t item) { - //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "close_socket : %#X\n", item); - - std::map::iterator it = socket_map.find(item); - if(it != socket_map.end()) { - delete it->second; - socket_map.erase(it); - } - return 0; -} - diff --git a/mod_dub/audio_downloader.h b/mod_dub/audio_downloader.h deleted file mode 100644 index 384bb50..0000000 --- a/mod_dub/audio_downloader.h +++ /dev/null @@ -1,18 +0,0 @@ -#ifndef __AUDIO_DOWNLOADER_H__ -#define __AUDIO_DOWNLOADER_H__ - -#include -#include "common.h" - -extern "C" { - -switch_status_t init_audio_downloader(); -switch_status_t deinit_audio_downloader(); - -int start_audio_download(const char* url, int rate, int loop, int gain, switch_mutex_t* mutex, CircularBuffer_t* buffer); -switch_status_t stop_audio_download(int id); - -} - -#endif - diff --git a/mod_dub/common.h b/mod_dub/common.h index 4c843c1..694bd1f 100644 --- a/mod_dub/common.h +++ b/mod_dub/common.h @@ -3,10 +3,7 @@ #include - typedef boost::circular_buffer CircularBuffer_t; -typedef int32_t downloadId_t; -#define INVALID_DOWNLOAD_ID (-1) #endif \ No newline at end of file diff --git a/mod_dub/dub_glue.cpp b/mod_dub/dub_glue.cpp index 28194e6..683ea8e 100644 --- a/mod_dub/dub_glue.cpp +++ b/mod_dub/dub_glue.cpp @@ -1,8 +1,9 @@ #include "mod_dub.h" -#include "audio_downloader.h" -#include "file_loader.h" - +#include "tts_vendor_parser.h" +#include "track.h" +#include "vector_math.h" #include +#include #include @@ -15,124 +16,111 @@ typedef boost::circular_buffer CircularBuffer_t; extern "C" { - void init_dub_track(dub_track_t *track, char* trackName, int sampleRate) { - track->state = DUB_TRACK_STATE_READY; - track->trackName = strdup(trackName); - track->sampleRate = sampleRate; - track->circularBuffer = new CircularBuffer_t(INIT_BUFFER_SIZE); + Track* find_track_by_name(void** tracks, const std::string& trackName) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "find_track_by_name: searching for %s\n", trackName.c_str()); + + for (int i = 0; i < MAX_DUB_TRACKS; i++) { + Track* track = static_cast(tracks[i]); + std::string name = track ? track->getTrackName() : "null"; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "find_track_by_name: offset %d: %s\n", i, name.c_str()); + if (track && 0 == track->getTrackName().compare(trackName)) { + return track; + } + } + return nullptr; } - switch_status_t silence_dub_track(dub_track_t *track) { - assert(track); - switch (track->generator) { - case DUB_GENERATOR_TYPE_HTTP: - stop_audio_download(track->generatorId); - break; - case DUB_GENERATOR_TYPE_FILE: - stop_file_load(track->generatorId); - break; - case DUB_GENERATOR_TYPE_TTS: - //TODO - break; - } - CircularBuffer_t* buffer = reinterpret_cast(track->circularBuffer); - buffer->clear(); - track->state = DUB_TRACK_STATE_READY; - track->generator = DUB_GENERATOR_TYPE_UNKNOWN; - track->generatorId = 0; - - return SWITCH_STATUS_SUCCESS; - } - - switch_status_t remove_dub_track(dub_track_t *track) { - assert(track); - switch (track->generator) { - case DUB_GENERATOR_TYPE_HTTP: - stop_audio_download(track->generatorId); - break; - case DUB_GENERATOR_TYPE_FILE: - stop_file_load(track->generatorId); - break; - case DUB_GENERATOR_TYPE_TTS: - //TODO - break; - } - CircularBuffer_t* buffer = reinterpret_cast(track->circularBuffer); - if (buffer) { - delete buffer; - } - if (track->trackName) { - free(track->trackName); - } - memset(track, 0, sizeof(dub_track_t)); - - return SWITCH_STATUS_SUCCESS; - } - - switch_status_t play_dub_track(dub_track_t *track, switch_mutex_t *mutex, char* url, int loop, int gain) { - bool isHttp = strncmp(url, "http", 4) == 0; - if (track->state != DUB_TRACK_STATE_READY) { - silence_dub_track(track); - } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "play_dub_track: starting %s download: %s\n", (isHttp ? "HTTP" : "file"), url); - int id = isHttp ? - start_audio_download(url, track->sampleRate, loop, gain, mutex, (CircularBuffer_t*) track->circularBuffer) : - start_file_load(url, track->sampleRate, loop, gain, mutex, (CircularBuffer_t*) track->circularBuffer); - - if (id == INVALID_DOWNLOAD_ID) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "play_dub_track: failed to start audio download\n"); + switch_status_t add_track(struct cap_cb* cb, char* trackName, int sampleRate) { + Track* existingTrack = find_track_by_name(cb->tracks, trackName); + if (existingTrack) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "add_track: track %s already exists\n", trackName); return SWITCH_STATUS_FALSE; } - track->state = DUB_TRACK_STATE_ACTIVE; - track->generatorId = id; - track->generator = isHttp ? DUB_GENERATOR_TYPE_HTTP : DUB_GENERATOR_TYPE_FILE; - track->gain = gain; + + for (int i = 0; i < MAX_DUB_TRACKS; i++) { + if (!cb->tracks[i]) { + cb->tracks[i] = new Track(trackName, sampleRate); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "add_track: added track %s at offset %d\n", trackName, i); + return SWITCH_STATUS_SUCCESS; + } + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "add_track: no room for track %s\n", trackName); + return SWITCH_STATUS_FALSE; + } + + switch_status_t silence_dub_track(struct cap_cb* cb, char* trackName) { + Track* track = find_track_by_name(cb->tracks, trackName); + if (track) { + track->removeAllAudio(); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "silence_dub_track: silenced track %s\n", trackName); + return SWITCH_STATUS_SUCCESS; + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "silence_dub_track: track %s not found\n", trackName); + return SWITCH_STATUS_FALSE; + } + + switch_status_t remove_dub_track(struct cap_cb* cb, char* trackName) { + for (int i = 0; i < MAX_DUB_TRACKS; i++) { + Track* track = static_cast(cb->tracks[i]); + if (track && track->getTrackName() == trackName) { + track->removeAllAudio(); + delete track; + cb->tracks[i] = nullptr; + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "remove_dub_track: removed track %s\n", trackName); + return SWITCH_STATUS_SUCCESS; + } + } + + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "remove_dub_track: track %s not found\n", trackName); + return SWITCH_STATUS_FALSE; + } + + switch_status_t play_dub_track(struct cap_cb* cb, char* trackName, char* url, int loop, int gain) { + bool isHttp = strncmp(url, "http", 4) == 0; + Track* track = find_track_by_name(cb->tracks, trackName); + + if (!track) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "play_dub_track: track %s not found\n", trackName); + return SWITCH_STATUS_FALSE; + } + + if (isHttp) { + track->queueHttpGetAudio(url, gain, loop); + } + else { + track->queueFileAudio(url, gain, loop); + } return SWITCH_STATUS_SUCCESS; } - switch_status_t say_dub_track(dub_track_t *track, switch_mutex_t *mutex, char* text, int gain) { - if (track->state != DUB_TRACK_STATE_READY) { - silence_dub_track(track); // wait...shouldnt we queue says? - } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "say_dub_track: starting TTS\n"); - - /** - * TODO: - * This is not implemented yet. We can play TTS using using the playOnSay function - * because jambonz can generate local audio files using TTS vendors. - * However, we should probably at least implement support for elevenlabs streaming api - * here because it is so much faster. - * - */ + switch_status_t say_dub_track(struct cap_cb* cb, char* trackName, char* text, int gain) { + std::vector headers; + std::string url, body; + Track* track = find_track_by_name(cb->tracks, trackName); - /* - track->state = DUB_TRACK_STATE_ACTIVE; - track->generatorId = id; - track->generator = DUB_GENERATOR_TYPE_TTS; - track->gain = gain; - */ + if (!track) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "play_dub_track: track %s not found\n", trackName); + return SWITCH_STATUS_FALSE; + } + if (tts_vendor_parse_text(text, url, body, headers) != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "say_dub_track: failed to parse text\n"); + return SWITCH_STATUS_FALSE; + } + track->queueHttpPostAudio(url, body, headers, gain); return SWITCH_STATUS_SUCCESS; } /* module load and unload */ switch_status_t dub_init() { - switch_status_t status; - status = init_audio_downloader(); - if (status == SWITCH_STATUS_SUCCESS) { - status = init_file_loader(); - } - return status; + return SWITCH_STATUS_SUCCESS; } switch_status_t dub_cleanup() { - switch_status_t status; - status = deinit_audio_downloader(); - if (status == SWITCH_STATUS_SUCCESS) { - status = deinit_file_loader(); - } - return status; + return SWITCH_STATUS_SUCCESS; } switch_status_t dub_session_cleanup(switch_core_session_t *session, int channelIsClosing, switch_media_bug_t *bug) { @@ -144,17 +132,18 @@ extern "C" { if (!switch_channel_get_private(channel, MY_BUG_NAME)) { // race condition - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "%s Bug is not attached (race).\n", switch_channel_get_name(channel)); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Bug is not attached (race).\n", switch_channel_get_name(channel)); switch_mutex_unlock(cb->mutex); return SWITCH_STATUS_FALSE; } switch_channel_set_private(channel, MY_BUG_NAME, NULL); for (int i = 0; i < MAX_DUB_TRACKS; i++) { - dub_track_t* track = &cb->tracks[i]; - if (track->state != DUB_TRACK_STATE_INACTIVE) { - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "dub_session_cleanup: cleared track %d:%s\n", i, track->trackName); - remove_dub_track(track); + Track* track = static_cast(cb->tracks[i]); + if (track) { + track->removeAllAudio(); + delete track; + cb->tracks[i] = nullptr; } } @@ -162,7 +151,7 @@ extern "C" { switch_core_media_bug_remove(session, &bug); } - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "dub_session_cleanup: removed bug and cleared tracks\n"); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "dub_session_cleanup: removed bug and cleared tracks\n"); switch_mutex_unlock(cb->mutex); } switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "%s dub_session_cleanup: Bug is not attached.\n", switch_channel_get_name(channel)); @@ -175,12 +164,17 @@ extern "C" { if (switch_mutex_trylock(cb->mutex) == SWITCH_STATUS_SUCCESS) { - /* check if any tracks are actively pushing audio */ - int trackCount = 0; + /* check if any tracks have audio to contribute */ + std::vector activeTracks; + activeTracks.reserve(MAX_DUB_TRACKS); for (int i = 0; i < MAX_DUB_TRACKS; i++) { - if (cb->tracks[i].state == DUB_TRACK_STATE_ACTIVE) trackCount++; + if (cb->tracks[i]) { + auto track = static_cast(cb->tracks[i]); + if (track->hasAudio_NoLock()) activeTracks.push_back(static_cast(cb->tracks[i])); + } } - if (trackCount == 0 && cb->gain == 0) { + + if (activeTracks.size() == 0 && cb->gain == 0) { switch_mutex_unlock(cb->mutex); return SWITCH_TRUE; } @@ -189,31 +183,24 @@ extern "C" { int16_t *fp = reinterpret_cast(rframe->data); rframe->channels = 1; - rframe->datalen = rframe->samples * rframe->channels * sizeof(int16_t); + rframe->datalen = rframe->samples * sizeof(int16_t); /* apply gain to audio in main channel if requested*/ if (cb->gain != 0) { - switch_change_sln_volume_granular(fp, rframe->samples, cb->gain); + vector_change_sln_volume_granular(fp, rframe->samples, cb->gain); } /* now mux in the data from tracks */ - for (int i = 0; i < rframe->samples; i++) { - int16_t input = fp[i]; - int16_t value = input; - for (int j = 0; j < MAX_DUB_TRACKS; j++) { - dub_track_t* track = &cb->tracks[j]; - if (track->state == DUB_TRACK_STATE_ACTIVE) { - CircularBuffer_t* buffer = reinterpret_cast(track->circularBuffer); - if (buffer && !buffer->empty()) { - int16_t sample = buffer->front(); - buffer->pop_front(); - value += sample; - } - } + for (auto track : activeTracks) { + int16_t data[SWITCH_RECOMMENDED_BUFFER_SIZE]; + memset(data, 0, sizeof(data)); + auto samples = track->retrieveAndClearAudio(data, rframe->samples); + if (samples > 0) { + vector_add(fp, data, rframe->samples); } - switch_normalize_to_16bit(value); - fp[i] = (int16_t) value; } + vector_normalize(fp, rframe->samples); + switch_core_media_bug_set_write_replace_frame(bug, rframe); switch_mutex_unlock(cb->mutex); } diff --git a/mod_dub/dub_glue.h b/mod_dub/dub_glue.h index af6c20f..3e2ce25 100644 --- a/mod_dub/dub_glue.h +++ b/mod_dub/dub_glue.h @@ -4,11 +4,11 @@ switch_status_t dub_init(); switch_status_t dub_cleanup(); -void init_dub_track(dub_track_t *track, char* trackName, int sampleRate); -switch_status_t silence_dub_track(dub_track_t *track); -switch_status_t remove_dub_track(dub_track_t *track); -switch_status_t play_dub_track(dub_track_t *track, switch_mutex_t *mutex, char* url, int loop, int gain); -switch_status_t say_dub_track(dub_track_t *track, switch_mutex_t *mutex, char* text, int gain); +switch_status_t add_track(struct cap_cb* cb, char* trackName, int sampleRate); +switch_status_t silence_dub_track(struct cap_cb* cb, char* trackName); +switch_status_t remove_dub_track(struct cap_cb* cb, char* trackName); +switch_status_t play_dub_track(struct cap_cb* cb, char* trackName, char* url, int loop, int gain); +switch_status_t say_dub_track(struct cap_cb* cb, char* trackName, char* text, int gain); switch_status_t dub_session_cleanup(switch_core_session_t *session, int channelIsClosing, switch_media_bug_t *bug); switch_bool_t dub_speech_frame(switch_media_bug_t *bug, void* user_data); diff --git a/mod_dub/file_loader.cpp b/mod_dub/file_loader.cpp deleted file mode 100644 index 3f0b8f2..0000000 --- a/mod_dub/file_loader.cpp +++ /dev/null @@ -1,437 +0,0 @@ -#include "file_loader.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include - -#define INIT_BUFFER_SIZE (80000) -#define BUFFER_GROW_SIZE (80000) -#define BUFFER_THROTTLE_LOW (40000) -#define BUFFER_THROTTLE_HIGH (160000) - -static uint16_t currDownloadId = 0; - -typedef enum -{ - STATUS_NONE = 0, - STATUS_FAILED, - STATUS_FILE_IN_PROGRESS, - STATUS_FILE_PAUSED, - STATUS_FILE_COMPLETE, - STATUS_AWAITING_RESTART, - STATUS_STOPPING, - STATUS_STOPPED -} Status_t; - -typedef enum { - FILE_TYPE_MP3 = 0, - FILE_TYPE_R8 -} FileType_t; - -static const char* status2String(Status_t status) -{ - static const char* statusStrings[] = { - "STATUS_NONE", - "STATUS_FAILED", - "STATUS_FILE_IN_PROGRESS", - "STATUS_FILE_PAUSED", - "STATUS_FILE_COMPLETE", - "STATUS_AWAITING_RESTART", - "STATUS_STOPPING", - "STATUS_STOPPED" - }; - - if (status >= 0 && status < sizeof(statusStrings) / sizeof(statusStrings[0])) - { - return statusStrings[status]; - } - else - { - return "UNKNOWN_STATUS"; - } -} - -typedef struct -{ - switch_mutex_t* mutex; - CircularBuffer_t* buffer; - mpg123_handle *mh; - FILE* fp; - char* path; - bool loop; - int rate; - boost::asio::deadline_timer *timer; - Status_t status; - FileType_t type; - downloadId_t id; - int gain; -} FileInfo_t; - -typedef std::map Id2FileMap_t; -static Id2FileMap_t id2FileMap; - -static boost::object_pool pool ; -static boost::asio::io_service io_service; -static std::thread worker_thread; - - -/* forward declarations */ -static FileInfo_t* createFileLoader(const char *path, int rate, int loop, int gain, mpg123_handle *mhm, switch_mutex_t *mutex, CircularBuffer_t *buffer); -static void destroyFileInfo(FileInfo_t *finfo); -static void threadFunc(); -static std::vector convert_mp3_to_linear(FileInfo_t *file, int8_t *data, size_t len); -static void read_cb(const boost::system::error_code& error, FileInfo_t* finfo) ; - -/* apis */ -extern "C" { - - switch_status_t init_file_loader() { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "init_file_loader loading..\n"); - - if (mpg123_init() != MPG123_OK) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "init_file_loader: failed to initiate MPG123"); - return SWITCH_STATUS_FALSE; - } - - /* start worker thread */ - std::thread t(threadFunc) ; - worker_thread.swap( t ) ; - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "init_file_loader: loaded\n"); - - return SWITCH_STATUS_SUCCESS; - - } - - switch_status_t deinit_file_loader() { - /* stop the ASIO IO service */ - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "deinit_file_loader: stopping io service\n"); - io_service.stop(); - - /* Join the worker thread */ - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "deinit_file_loader: wait for worker thread to complete\n"); - if (worker_thread.joinable()) { - worker_thread.join(); - } - - mpg123_exit(); - - return SWITCH_STATUS_SUCCESS; - } - - downloadId_t start_file_load(const char* path, int rate, int loop, int gain, switch_mutex_t* mutex, CircularBuffer_t* buffer) { - int mhError = 0; - - /* we only handle mp3 or r8 files atm */ - const char *ext = strrchr(path, '.'); - if (!ext) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "start_file_load: file %s has no extension\n", path); - return INVALID_DOWNLOAD_ID; - } - ext++; - if (0 != strcmp(ext, "mp3") && 0 != strcmp(ext, "r8")) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "start_file_load: file %s has unsupported extension %s\n", path, ext); - return INVALID_DOWNLOAD_ID; - } - - /* allocate handle for mpeg decoding */ - mpg123_handle *mh = mpg123_new("auto", &mhError); - if (!mh) { - const char *mhErr = mpg123_plain_strerror(mhError); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error allocating mpg123 handle! %s\n", switch_str_nil(mhErr)); - return INVALID_DOWNLOAD_ID; - } - - if (mpg123_open_feed(mh) != MPG123_OK) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error mpg123_open_feed!\n"); - return INVALID_DOWNLOAD_ID; - } - - if (mpg123_format_all(mh) != MPG123_OK) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error mpg123_format_all!\n"); - return INVALID_DOWNLOAD_ID; - } - - if (mpg123_param(mh, MPG123_FORCE_RATE, rate, 0) != MPG123_OK) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error forcing resample to 8k!\n"); - return INVALID_DOWNLOAD_ID; - } - - if (mpg123_param(mh, MPG123_FLAGS, MPG123_MONO_MIX, 0) != MPG123_OK) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error forcing single channel!\n"); - return INVALID_DOWNLOAD_ID; - } - - FileInfo_t* finfo = createFileLoader(path, rate, loop, gain, mh, mutex, buffer); - if (!finfo) { - return INVALID_DOWNLOAD_ID; - } - - /* do the initial read in the worker thread so we don't block here */ - finfo->timer->expires_from_now(boost::posix_time::millisec(1)); - finfo->timer->async_wait(boost::bind(&read_cb, boost::placeholders::_1, finfo)); - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, - "start_file_load: starting load %d\n", finfo->id); - - return finfo->id; - } - - switch_status_t stop_file_load(int id) { - auto it = id2FileMap.find(id); - if (it == id2FileMap.end()) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "stop_file_load: id %d has already completed\n", id); - return SWITCH_STATUS_FALSE; - } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, - "stop_audio_download: stopping download %d, status %s\n", id, status2String(it->second->status)); - - FileInfo_t *finfo = it->second; - auto status = finfo->status; - - /* past this point I shall not access either the mutex or the buffer provided */ - finfo->mutex = nullptr; - finfo->buffer = nullptr; - - destroyFileInfo(finfo); - - finfo->status = Status_t::STATUS_STOPPED; - return SWITCH_STATUS_SUCCESS; - } -} - -/* internal */ -FileInfo_t* createFileLoader(const char *path, int rate, int loop, int gain, mpg123_handle *mh, switch_mutex_t *mutex, CircularBuffer_t *buffer) { - FileInfo_t *finfo = pool.malloc() ; - const char *ext = strrchr(path, '.'); - - memset(finfo, 0, sizeof(FileInfo_t)); - finfo->mutex = mutex; - finfo->buffer = buffer; - finfo->mh = mh; - finfo->loop = loop; - finfo->gain = gain; - finfo->rate = rate; - finfo->path = strdup(path); - finfo->status = Status_t::STATUS_NONE; - finfo->timer = new boost::asio::deadline_timer(io_service); - - if (0 == strcmp(ext, "mp3")) finfo->type = FileType_t::FILE_TYPE_MP3; - else if (0 == strcmp(ext, "r8")) finfo->type = FileType_t::FILE_TYPE_R8; - - downloadId_t id = ++currDownloadId; - if (id == 0) id++; - - id2FileMap[id] = finfo; - finfo->id = id; - - finfo->status = Status_t::STATUS_AWAITING_RESTART; - - finfo->fp = fopen(finfo->path, "rb"); - if (finfo->fp == NULL) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "createFileLoader: failed to open file %s\n", finfo->path); - destroyFileInfo(finfo); - return nullptr; - } - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, - "createFileLoader: launched request, loop %s, gain %d\n", (finfo->loop ? "yes": "no"), finfo->gain); - return finfo; -} - -void destroyFileInfo(FileInfo_t *finfo) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "destroyFileInfo\n"); - - /* clear asio resources and free resources */ - if (finfo->timer) { - finfo->timer->cancel(); - delete finfo->timer; - } - - /* free mp3 decoder */ - if (finfo->mh) { - mpg123_close(finfo->mh); - mpg123_delete(finfo->mh); - } - - if (finfo->path) { - free(finfo->path); - } - - if (finfo->mutex) switch_mutex_lock(finfo->mutex); - id2FileMap.erase(finfo->id); - if (finfo->mutex) switch_mutex_unlock(finfo->mutex); - - memset(finfo, 0, sizeof(FileInfo_t)); - pool.destroy(finfo) ; -} - -void threadFunc() { - /* to make sure the event loop doesn't terminate when there is no work to do */ - io_service.reset() ; - boost::asio::io_service::work work(io_service); - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "file_loader threadFunc - starting\n"); - - for(;;) { - - try { - io_service.run() ; - break ; - } - catch( std::exception& e) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "file_loader threadFunc - Error: %s\n", e.what()); - } - } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "file_loader threadFunc - ending\n"); -} - -std::vector convert_mp3_to_linear(FileInfo_t *finfo, int8_t *data, size_t len) { - std::vector linear_data; - int eof = 0; - int mp3err = 0; - - if(mpg123_feed(finfo->mh, (const unsigned char*) data, len) == MPG123_OK) { - while(!eof) { - size_t usedlen = 0; - off_t frame_offset; - unsigned char* audio; - - int decode_status = mpg123_decode_frame(finfo->mh, &frame_offset, &audio, &usedlen); - - switch(decode_status) { - case MPG123_NEW_FORMAT: - continue; - - case MPG123_OK: - { - size_t samples = usedlen / sizeof(int16_t); - linear_data.insert(linear_data.end(), reinterpret_cast(audio), reinterpret_cast(audio) + samples); - } - break; - - case MPG123_DONE: - case MPG123_NEED_MORE: - eof = 1; - break; - - case MPG123_ERR: - default: - if(++mp3err >= 5) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Decoder Error!\n"); - eof = 1; - } - } - - if (eof) - break; - - mp3err = 0; - } - - if (finfo->gain != 0) { - switch_change_sln_volume_granular(linear_data.data(), linear_data.size(), finfo->gain); - } - } - - return linear_data; -} - -void read_cb(const boost::system::error_code& error, FileInfo_t* finfo) { - if (finfo->status == Status_t::STATUS_STOPPING || !finfo->mutex || !finfo->buffer) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "read_cb: %u session gone\n", finfo->id); - return; - } - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "read_cb: %u status is %s\n", finfo->id, status2String(finfo->status)); - if (finfo->status == Status_t::STATUS_AWAITING_RESTART) { - finfo->status = Status_t::STATUS_FILE_IN_PROGRESS; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "read_cb: %u starting initial read of file\n", finfo->id); - } - - if (!error) { - size_t size = 0; - - switch_mutex_lock(finfo->mutex); - size = finfo->buffer->size(); - switch_mutex_unlock(finfo->mutex); - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "read_cb: %u size is now %ld\n", finfo->id, size); - if (size < BUFFER_THROTTLE_LOW) { - std::vector pcm_data; - int8_t buf[INIT_BUFFER_SIZE]; - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "read_cb: %u reading data\n", finfo->id); - - size_t bytesRead = ::fread(buf, sizeof(int8_t), INIT_BUFFER_SIZE, finfo->fp); - if (bytesRead <= 0) { - if (::feof(finfo->fp)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "read_cb: %u eof\n", finfo->id); - } - else if (::ferror(finfo->fp)) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "read_cb: %u error reading file\n", finfo->id); - } - else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "read_cb: %u unknown error reading file\n", finfo->id); - } - finfo->status = Status_t::STATUS_FILE_COMPLETE; - return; - } - - if (finfo->type == FileType_t::FILE_TYPE_MP3) { - pcm_data = convert_mp3_to_linear(finfo, buf, bytesRead); - } else { - pcm_data = std::vector(reinterpret_cast(buf), reinterpret_cast(buf) + bytesRead / 2); - } - - switch_mutex_lock(finfo->mutex); - - // Resize the buffer if necessary - if (finfo->buffer->capacity() - finfo->buffer->size() < pcm_data.size()) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "write_cb %u growing buffer, size now %ld\n", finfo->id, finfo->buffer->size()); - finfo->buffer->set_capacity(finfo->buffer->size() + std::max(pcm_data.size(), (size_t)BUFFER_GROW_SIZE)); - } - - /* Push the data into the buffer */ - finfo->buffer->insert(finfo->buffer->end(), pcm_data.data(), pcm_data.data() + pcm_data.size()); - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "read_cb: %u wrote data, buffer size is now %ld\n", finfo->id, finfo->buffer->size()); - - switch_mutex_unlock(finfo->mutex); - - if (bytesRead < INIT_BUFFER_SIZE) { - finfo->status = Status_t::STATUS_FILE_COMPLETE; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "read_cb: %u reached end of file, status is %s\n", finfo->id, status2String(finfo->status)); - } - } - - if (finfo->status == Status_t::STATUS_FILE_COMPLETE && finfo->loop) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "read_cb: %u looping\n", finfo->id); - ::fseek(finfo->fp, 0, SEEK_SET); - finfo->status = Status_t::STATUS_AWAITING_RESTART; - } - - if (finfo->status != Status_t::STATUS_FILE_COMPLETE) { - // read more in 2 seconds - finfo->timer->expires_from_now(boost::posix_time::millisec(2000)); - finfo->timer->async_wait(boost::bind(&read_cb, boost::placeholders::_1, finfo)); - } - else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "read_cb: %u file complete, status %s loop %s\n", - finfo->id, status2String(finfo->status), (finfo->loop ? "yes" : "no")); - } - } else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "read_cb: %u error (%d): %s\n", finfo->id, error.value(), error.message().c_str()); - - // Handle any errors - } -} diff --git a/mod_dub/file_loader.h b/mod_dub/file_loader.h deleted file mode 100644 index 9a31f2e..0000000 --- a/mod_dub/file_loader.h +++ /dev/null @@ -1,18 +0,0 @@ -#ifndef __FILE_LOADER_H__ -#define __FILE_LOADER_H__ - -#include -#include "common.h" - -extern "C" { - -switch_status_t init_file_loader(); -switch_status_t deinit_file_loader(); - -int start_file_load(const char* path, int rate, int loop, int gain, switch_mutex_t* mutex, CircularBuffer_t* buffer); -switch_status_t stop_file_load(int id); - -} - -#endif - diff --git a/mod_dub/mod_dub.c b/mod_dub/mod_dub.c index 502b4b9..8123afb 100644 --- a/mod_dub/mod_dub.c +++ b/mod_dub/mod_dub.c @@ -8,6 +8,7 @@ #include #include #include "dub_glue.h" +#include /* Prototypes */ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_dub_shutdown); @@ -88,7 +89,6 @@ static switch_status_t dub_add_track(switch_core_session_t *session, char* track switch_channel_t *channel = switch_core_session_get_channel(session); switch_media_bug_t *bug = NULL; struct cap_cb *cb = NULL; - int offset = 0; int samples_per_second; switch_codec_implementation_t write_impl = { 0 }; @@ -102,36 +102,32 @@ static switch_status_t dub_add_track(switch_core_session_t *session, char* track cb =(struct cap_cb *) switch_core_session_alloc(session, sizeof(struct cap_cb)); memset(cb, 0, sizeof(struct cap_cb)); switch_mutex_init(&cb->mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session)); - offset = 0; } else { /* retrieve the bug and search for an empty track */ cb = (struct cap_cb *) switch_core_media_bug_get_user_data(bug); - while (offset < MAX_DUB_TRACKS && cb->tracks[offset].state != DUB_TRACK_STATE_INACTIVE) { - offset++; - } - if (offset == MAX_DUB_TRACKS) { - /* all tracks are in use */ - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "dub_add_track: no available tracks\n"); - return SWITCH_STATUS_FALSE; - } } switch_core_session_get_write_impl(session, &write_impl); samples_per_second = !strcasecmp(write_impl.iananame, "g722") ? write_impl.actual_samples_per_second : write_impl.samples_per_second; - init_dub_track(&cb->tracks[offset], trackName, samples_per_second); + switch_mutex_lock(cb->mutex); + if (add_track(cb, trackName, samples_per_second) != SWITCH_STATUS_SUCCESS) { + switch_mutex_unlock(cb->mutex); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "dub_add_track: error adding track %s\n", trackName); + return SWITCH_STATUS_FALSE; + } if (!bug) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "dub_add_track: adding bug for track %s\n", trackName); if (switch_core_media_bug_add(session, MY_BUG_NAME, NULL, capture_callback, (void *) cb, 0, SMBF_WRITE_REPLACE, &bug) != SWITCH_STATUS_SUCCESS) { + switch_mutex_unlock(cb->mutex); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "dub_add_track: error adding bug!\n"); return SWITCH_STATUS_FALSE; } switch_channel_set_private(channel, MY_BUG_NAME, bug); } - - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "dub_add_track: added track %s at offset %d\n", trackName, offset); + switch_mutex_unlock(cb->mutex); return SWITCH_STATUS_SUCCESS; } @@ -140,39 +136,17 @@ static switch_status_t dub_remove_track(switch_core_session_t *session, char* tr switch_channel_t *channel = switch_core_session_get_channel(session); switch_media_bug_t *bug = (switch_media_bug_t*) switch_channel_get_private(channel, MY_BUG_NAME); switch_status_t status = SWITCH_STATUS_FALSE; - dub_track_t *track = NULL; if (bug) { struct cap_cb *cb =(struct cap_cb *) switch_core_media_bug_get_user_data(bug); + switch_mutex_lock(cb->mutex); - for (int i = 0; i < MAX_DUB_TRACKS && track == NULL; i++) { - if (cb->tracks[i].state != DUB_TRACK_STATE_INACTIVE && strcmp(cb->tracks[i].trackName, trackName) == 0) { - track = &cb->tracks[i]; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "dub_remove_track: removing track %s at offset %d\n", trackName, i); - break; - } - } - - if (track) { - int count = 0; - - remove_dub_track(track); - - /* check if this is the last bug */ - for (int i = 0; i < MAX_DUB_TRACKS; i++) { - if (cb->tracks[i].state != DUB_TRACK_STATE_INACTIVE) count++; - } - - if (count == 0) { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "dub_remove_track: removing bug after removing last track\n"); - dub_session_cleanup(session, 0, bug); - } - status = SWITCH_STATUS_SUCCESS; - } - else { - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "dub_remove_track: track %s not found\n", trackName); - } + status = remove_dub_track(cb, trackName); switch_mutex_unlock(cb->mutex); + if (status != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "dub_remove_track: error removing track %s\n", trackName); + return SWITCH_STATUS_FALSE; + } } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "dub_remove_track: bug not found\n"); @@ -185,24 +159,14 @@ static switch_status_t dub_silence_track(switch_core_session_t *session, char* t switch_channel_t *channel = switch_core_session_get_channel(session); switch_media_bug_t *bug = (switch_media_bug_t*) switch_channel_get_private(channel, MY_BUG_NAME); switch_status_t status = SWITCH_STATUS_FALSE; - dub_track_t *track = NULL; if (bug) { struct cap_cb *cb =(struct cap_cb *) switch_core_media_bug_get_user_data(bug); - switch_mutex_lock(cb->mutex); - for (int i = 0; i < MAX_DUB_TRACKS; i++) { - if (cb->tracks[i].state != DUB_TRACK_STATE_INACTIVE && strcmp(cb->tracks[i].trackName, trackName) == 0) { - track = &cb->tracks[i]; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "dub_silence_track: silencing track %s at offset %d\n", trackName, i); - break; - } + status = silence_dub_track(cb, trackName); + if (status != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "dub_silence_track: error silencing track %s\n", trackName); + return SWITCH_STATUS_FALSE; } - - if (track) { - silence_dub_track(track); - status = SWITCH_STATUS_SUCCESS; - } - switch_mutex_unlock(cb->mutex); } else { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "dub_silence_track: bug not found\n"); @@ -215,24 +179,14 @@ static switch_status_t dub_play_on_track(switch_core_session_t *session, char* t switch_channel_t *channel = switch_core_session_get_channel(session); switch_media_bug_t *bug = (switch_media_bug_t*) switch_channel_get_private(channel, MY_BUG_NAME); switch_status_t status = SWITCH_STATUS_FALSE; - dub_track_t *track = NULL; if (bug) { struct cap_cb *cb =(struct cap_cb *) switch_core_media_bug_get_user_data(bug); - switch_mutex_lock(cb->mutex); - for (int i = 0; i < MAX_DUB_TRACKS; i++) { - if (cb->tracks[i].state != DUB_TRACK_STATE_INACTIVE && strcmp(cb->tracks[i].trackName, trackName) == 0) { - track = &cb->tracks[i]; - switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, - "dub_play_on_track: playing %s on track %s at offset %d with gain %d\n", url, trackName, i, gain); - break; - } + status = play_dub_track(cb, trackName, url, loop, gain); + if (status != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "dub_play_on_track: error playing %s on track %s\n", url, trackName); + return SWITCH_STATUS_FALSE; } - - if (track) { - status = play_dub_track(track, cb->mutex, url, loop, gain); - } - switch_mutex_unlock(cb->mutex); } return status; } @@ -241,37 +195,30 @@ static switch_status_t dub_say_on_track(switch_core_session_t *session, char* tr switch_channel_t *channel = switch_core_session_get_channel(session); switch_media_bug_t *bug = (switch_media_bug_t*) switch_channel_get_private(channel, MY_BUG_NAME); switch_status_t status = SWITCH_STATUS_FALSE; - dub_track_t *track = NULL; if (bug) { struct cap_cb *cb =(struct cap_cb *) switch_core_media_bug_get_user_data(bug); - switch_mutex_lock(cb->mutex); - for (int i = 0; i < MAX_DUB_TRACKS; i++) { - if (cb->tracks[i].state != DUB_TRACK_STATE_INACTIVE && strcmp(cb->tracks[i].trackName, trackName) == 0) { - track = &cb->tracks[i]; - break; - } + status = say_dub_track(cb, trackName, text, gain); + if (status != SWITCH_STATUS_SUCCESS) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "dub_say_on_track: error saying %s on track %s\n", text, trackName); + return SWITCH_STATUS_FALSE; } - - if (track) { - status = say_dub_track(track, cb->mutex, text, gain); - } - switch_mutex_unlock(cb->mutex); } return status; } #define DUB_API_SYNTAX " [addTrack|removeTrack|silenceTrack|playOnTrack|sayOnTrack|setGain] track [url|text|gain] [gain] [loop]" +#define MAX_PARAMS 6 SWITCH_STANDARD_API(dub_function) { - char *mycmd = NULL, *argv[6] = { 0 }; + char *mycmd = NULL, *argv[MAX_PARAMS] = { 0 }; int argc = 0; int error_written = 0; switch_status_t status = SWITCH_STATUS_FALSE; if (!zstr(cmd) && (mycmd = strdup(cmd))) { switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "dub_function: %s\n", mycmd); - argc = switch_separate_string(mycmd, ' ', argv, (sizeof(argv) / sizeof(argv[0]))); + argc = switch_separate_string(mycmd, ' ', argv, (sizeof(argv) / sizeof(argv[0]))); } if (zstr(cmd) || argc < 3 || zstr(argv[0]) || zstr(argv[1]) || zstr(argv[2])) { @@ -302,6 +249,7 @@ SWITCH_STANDARD_API(dub_function) } else if (0 == strcmp(action, "playOnTrack")) { if (argc < 4) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "playOnTrack requires a url\n"); stream->write_function(stream, "-USAGE: %s\n", DUB_API_SYNTAX); error_written = 1; } diff --git a/mod_dub/mod_dub.h b/mod_dub/mod_dub.h index d6b1651..1accf2a 100644 --- a/mod_dub/mod_dub.h +++ b/mod_dub/mod_dub.h @@ -17,42 +17,11 @@ /* per-channel data */ typedef void (*responseHandler_t)(switch_core_session_t* session, const char* json, const char* bugname, const char* details); -typedef enum { - DUB_TRACK_STATE_INACTIVE = 0, - DUB_TRACK_STATE_READY, - DUB_TRACK_STATE_ACTIVE, - DUB_TRACK_STATE_PAUSED -} dub_state_t; - -typedef enum { - DUB_GENERATOR_TYPE_UNKNOWN = 0, - DUB_GENERATOR_TYPE_HTTP, - DUB_GENERATOR_TYPE_FILE, - DUB_GENERATOR_TYPE_TTS -} dub_generator_t; - -typedef enum { - DUB_TRACK_EVENT_PLAY = 0, - DUB_TRACK_EVENT_STOP, - DUB_TRACK_EVENT_PAUSE, - DUB_TRACK_EVENT_RESUME -} dub_event_t; - - -typedef struct dub_track { - dub_state_t state; - dub_generator_t generator; - char* trackName; - int sampleRate; - int gain; - void* circularBuffer; - int generatorId; -} dub_track_t; struct cap_cb { switch_mutex_t *mutex; int gain; - dub_track_t tracks[MAX_DUB_TRACKS]; + void *tracks[MAX_DUB_TRACKS]; }; diff --git a/mod_dub/mpg_decode.cpp b/mod_dub/mpg_decode.cpp new file mode 100644 index 0000000..da6d52e --- /dev/null +++ b/mod_dub/mpg_decode.cpp @@ -0,0 +1,58 @@ + +#include "mpg_decode.h" +#include "vector_math.h" + +std::vector convert_mp3_to_linear(mpg123_handle *mh, int gain, int8_t *data, size_t len) { + std::vector linear_data; + int eof = 0; + int mp3err = 0; + + if(mpg123_feed(mh, (const unsigned char*) data, len) == MPG123_OK) { + while(!eof) { + size_t usedlen = 0; + off_t frame_offset; + unsigned char* audio; + + int decode_status = mpg123_decode_frame(mh, &frame_offset, &audio, &usedlen); + + switch(decode_status) { + case MPG123_NEW_FORMAT: + continue; + + case MPG123_OK: + { + size_t samples = usedlen / sizeof(int16_t); + linear_data.insert(linear_data.end(), reinterpret_cast(audio), reinterpret_cast(audio) + samples); + } + break; + + case MPG123_DONE: + case MPG123_NEED_MORE: + eof = 1; + break; + + case MPG123_ERR: + default: + if(++mp3err >= 5) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Decoder Error!\n"); + eof = 1; + } + } + + if (eof) + break; + + mp3err = 0; + } + + if (gain != 0) { + vector_change_sln_volume_granular(linear_data.data(), linear_data.size(), gain); + } + } + else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error feeding data to mpg123\n"); + } + + + return linear_data; +} diff --git a/mod_dub/mpg_decode.h b/mod_dub/mpg_decode.h new file mode 100644 index 0000000..2c03537 --- /dev/null +++ b/mod_dub/mpg_decode.h @@ -0,0 +1,11 @@ + +#ifndef MPG_DECODE_H +#define MPG_DECODE_H + +#include +#include +#include "switch.h" + +std::vector convert_mp3_to_linear(mpg123_handle *mh, int gain, int8_t *data, size_t len); + +#endif diff --git a/mod_dub/track.cpp b/mod_dub/track.cpp new file mode 100644 index 0000000..f9ec544 --- /dev/null +++ b/mod_dub/track.cpp @@ -0,0 +1,143 @@ +#include "track.h" +#include "ap_file.h" +#include "ap_http.h" +#include "switch.h" + +#define INIT_BUFFER_SIZE (80000) + +Track::Track(const std::string& trackName, int sampleRate) : _trackName(trackName), _sampleRate(sampleRate), + _buffer(INIT_BUFFER_SIZE), _stopping(false) +{ +} + +Track::~Track() { + removeAllAudio(); + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Track::~Track: track %s\n", _trackName.c_str()); +} + +/** + * @brief called when an audio producer has finished retrieving the audio. + * If we have another producer queued, then start it. + * + * @param hasError + * @param errMsg + */ +void Track::onPlayDone(bool hasError, const std::string& errMsg) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Track::onPlayDone %s\n", _trackName.c_str()); + + if (!_stopping) { + if (hasError) switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "onPlayDone: error: %s\n", errMsg.c_str()); + std::lock_guard lock(_mutex); + _apQueue.pop(); + if (!_apQueue.empty()) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "onPlayDone: starting queued audio on track %s\n", _trackName.c_str()); + _apQueue.front()->start(std::bind(&Track::onPlayDone, this, std::placeholders::_1, std::placeholders::_2)); + } + } + else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "onPlayDone: track %s stopping\n", _trackName.c_str()); + } +} + +void Track::queueFileAudio(const std::string& path, int gain, bool loop) { + bool startIt = false; + if (_stopping) return; + + auto ap = std::make_shared(_mutex, _buffer, _sampleRate); + ap->queueFileAudio(path, gain, loop); + { + std::lock_guard lock(_mutex); + _apQueue.push(ap); + startIt = _apQueue.size() == 1; + } + + if (startIt) { + try { + ap->start(std::bind(&Track::onPlayDone, this, std::placeholders::_1, std::placeholders::_2)); + } catch (std::exception& e) { + onPlayDone(true, e.what()); + } + } +} + +void Track::queueHttpGetAudio(const std::string& url, int gain, bool loop) { + bool startIt = false; + if (_stopping) return; + auto ap = std::make_shared(_mutex, _buffer, _sampleRate); + ap->queueHttpGetAudio(url, gain, loop); + { + std::lock_guard lock(_mutex); + _apQueue.push(ap); + startIt = _apQueue.size() == 1; + } + + if (startIt) { + try { + ap->start(std::bind(&Track::onPlayDone, this, std::placeholders::_1, std::placeholders::_2)); + } catch (std::exception& e) { + onPlayDone(true, e.what()); + } + } +} + +void Track::queueHttpPostAudio(const std::string& url, int gain, bool loop) { + bool startIt = false; + if (_stopping) return; + auto ap = std::make_shared(_mutex, _buffer, _sampleRate); + ap->queueHttpPostAudio(url, gain, loop); + { + std::lock_guard lock(_mutex); + _apQueue.push(ap); + startIt = _apQueue.size() == 1; + } + + if (startIt) { + try { + ap->start(std::bind(&Track::onPlayDone, this, std::placeholders::_1, std::placeholders::_2)); + } catch (std::exception& e) { + onPlayDone(true, e.what()); + } + } +} + +void Track::queueHttpPostAudio(const std::string& url, const std::string& body, std::vector& headers, int gain, bool loop) { + bool startIt = false; + if (_stopping) return; + auto ap = std::make_shared(_mutex, _buffer, _sampleRate); + ap->queueHttpPostAudio(url, body, headers, gain, loop); + { + std::lock_guard lock(_mutex); + _apQueue.push(ap); + startIt = _apQueue.size() == 1; + } + + if (startIt) { + try { + ap->start(std::bind(&Track::onPlayDone, this, std::placeholders::_1, std::placeholders::_2)); + } catch (std::exception& e) { + onPlayDone(true, e.what()); + } + } +} + + bool Track::hasAudio() { + if (_stopping) return false; + std::lock_guard lock(_mutex); + return hasAudio_NoLock(); +} + +void Track::removeAllAudio() { + _stopping = true; + std::queue> apQueueCopy; + { + std::lock_guard lock(_mutex); + apQueueCopy = _apQueue; + _apQueue = std::queue>(); + } + + while (!apQueueCopy.empty()) { + auto ap = apQueueCopy.front(); + apQueueCopy.pop(); + ap->stop(); + } +} \ No newline at end of file diff --git a/mod_dub/track.h b/mod_dub/track.h new file mode 100644 index 0000000..870b26a --- /dev/null +++ b/mod_dub/track.h @@ -0,0 +1,52 @@ +#ifndef __TRACK_H__ +#define __TRACK_H__ + +#include +#include +#include "common.h" +#include "ap.h" + +class Track { +public: + Track(const std::string& trackName, int sampleRate); + ~Track(); + + /* audio production methods */ + void queueHttpGetAudio(const std::string& url, int gain = 0, bool loop = false); + void queueHttpPostAudio(const std::string& url, int gain = 0, bool loop = false); + void queueHttpPostAudio(const std::string& url, const std::string& body, std::vector& headers, int gain = 0, bool loop = false); + void queueFileAudio(const std::string& path, int gain = 0, bool loop = false); + void removeAllAudio(); + + void onPlayDone(bool hasError, const std::string& errMsg); + + std::string& getTrackName() { return _trackName; } + + /* audio playout methods */ + bool hasAudio(); + inline bool hasAudio_NoLock() const { + return !_stopping && !_buffer.empty(); + } + + + int retrieveAndClearAudio(int16_t* buf, int desiredSamples) { + std::lock_guard lock(_mutex); + int samplesToCopy = std::min(static_cast(_buffer.size()), desiredSamples); + std::copy_n(_buffer.begin(), samplesToCopy, buf); + _buffer.erase(_buffer.begin(), _buffer.begin() + samplesToCopy); + return samplesToCopy; + } + +private: + std::string _trackName; + int _sampleRate; + std::mutex _mutex; + CircularBuffer_t _buffer; + std::queue> _apQueue; + bool _stopping; +}; + + + + +#endif diff --git a/mod_dub/tts_vendor_parser.cpp b/mod_dub/tts_vendor_parser.cpp new file mode 100644 index 0000000..2c315f0 --- /dev/null +++ b/mod_dub/tts_vendor_parser.cpp @@ -0,0 +1,118 @@ +#include "tts_vendor_parser.h" + +#include +#include +#include + +switch_status_t elevenlabs_parse_text(const std::map& params, const std::string& text, + std::string& url, std::string& body, std::vector& headers) { + + std::string api_key; + std::string voice_name; + std::string model_id; + std::string similarity_boost; + std::string stability; + std::string style; + std::string use_speaker_boost; + std::string optimize_streaming_latency; + + for (const auto& pair : params) { + if (pair.first == "api_key") { + api_key = pair.second; + } else if (pair.first == "voice") { + voice_name = pair.second; + } else if (pair.first == "model_id") { + model_id = pair.second; + } else if (pair.first == "similarity_boost") { + similarity_boost = pair.second; + } else if (pair.first == "stability") { + stability = pair.second; + } else if (pair.first == "style") { + style = pair.second; + } else if (pair.first == "use_speaker_boost") { + use_speaker_boost = pair.second; + } else if (pair.first == "modeloptimize_streaming_latency_id") { + optimize_streaming_latency = pair.second; + } + } + + if (api_key.empty()) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "elevenlabs_parse_text: no api_key provided\n"); + return SWITCH_STATUS_FALSE; + } + if (model_id.empty()) { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "elevenlabs_parse_text: no model_id provided\n"); + return SWITCH_STATUS_FALSE; + } + if (optimize_streaming_latency.empty()) { + optimize_streaming_latency = "2"; + } + + // URL + std::ostringstream url_stream; + url_stream << "https://api.elevenlabs.io/v1/text-to-speech/" << voice_name << "/stream?"; + url_stream << "optimize_streaming_latency=" << optimize_streaming_latency << "&output_format=mp3_44100_128"; + url = url_stream.str(); + + /* create the JSON body */ + cJSON * jResult = cJSON_CreateObject(); + cJSON_AddStringToObject(jResult, "model_id", model_id.c_str()); + cJSON_AddStringToObject(jResult, "text", text.c_str()); + if (!similarity_boost.empty() || !style.empty() || !use_speaker_boost.empty() || !stability.empty()) { + cJSON * jVoiceSettings = cJSON_CreateObject(); + cJSON_AddItemToObject(jResult, "voice_settings", jVoiceSettings); + if (!similarity_boost.empty()) { + cJSON_AddStringToObject(jVoiceSettings, "similarity_boost", similarity_boost.c_str()); + } + if (!style.empty()) { + cJSON_AddStringToObject(jVoiceSettings, "style", style.c_str()); + } + if (!use_speaker_boost.empty()) { + cJSON_AddStringToObject(jVoiceSettings, "use_speaker_boost", use_speaker_boost.c_str()); + } + if (!stability.empty()) { + cJSON_AddStringToObject(jVoiceSettings, "stability", stability.c_str()); + } + } + char* _body = cJSON_PrintUnformatted(jResult); + body = _body; + + cJSON_Delete(jResult); + free(_body); + + // Create headers + headers.push_back("xi-api-key: " + api_key); + headers.push_back("Content-Type: application/json"); + + return SWITCH_STATUS_SUCCESS; +} + +switch_status_t tts_vendor_parse_text(const std::string& say, std::string& url, std::string& body, std::vector& headers) { + size_t start = say.find("{") + 1; + size_t end = say.find("}"); + + std::string text = say.substr(end + 1); + + std::string params_string = say.substr(start, end - start); + std::istringstream ss(params_string); + std::map params; + + while (ss.good()) { + std::string substr; + getline(ss, substr, ','); + substr.erase(0, substr.find_first_not_of(' ')); + + size_t equal_pos = substr.find("="); + std::string key = substr.substr(0, equal_pos); + std::string value = substr.substr(equal_pos + 1, substr.size()); + + params[key] = value; + } + + if (params["vendor"] == "elevenlabs") { + return elevenlabs_parse_text(params, text, url, body, headers); + } else { + switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "tts_vendor_parse_text: There is no available parser for text\n"); + return SWITCH_STATUS_FALSE; + } +} \ No newline at end of file diff --git a/mod_dub/tts_vendor_parser.h b/mod_dub/tts_vendor_parser.h new file mode 100644 index 0000000..7a2e128 --- /dev/null +++ b/mod_dub/tts_vendor_parser.h @@ -0,0 +1,11 @@ +#ifndef __MOD_DUB_TTS_VENDOR_PARSER_H__ +#define __MOD_DUB_TTS_VENDOR_PARSER_H__ + +#include +#include +#include "common.h" + + +switch_status_t tts_vendor_parse_text(const std::string& say, std::string& url, std::string& body, std::vector& headers); + +#endif \ No newline at end of file diff --git a/mod_dub/vector_math.cpp b/mod_dub/vector_math.cpp new file mode 100644 index 0000000..9086710 --- /dev/null +++ b/mod_dub/vector_math.cpp @@ -0,0 +1,211 @@ +#include "vector_math.h" +#include +#include + +#define GRANULAR_VOLUME_MAX (50) +#define SMAX 32767 +#define SMIN (-32768) +#define normalize_to_16bit_basic(n) if (n > SMAX) n = SMAX; else if (n < SMIN) n = SMIN; +#define normalize_volume_granular(x) if (x > GRANULAR_VOLUME_MAX) x = GRANULAR_VOLUME_MAX; if (x < -GRANULAR_VOLUME_MAX) x = -GRANULAR_VOLUME_MAX; + +#ifdef __cplusplus +extern "C" { +#endif + +#if defined(USE_AVX2) +#include +#pragma message("Using AVX2 SIMD.") +void vector_add(int16_t* a, int16_t* b, size_t len) { + size_t i = 0; + for (; i + 15 < len; i += 16) { + __m256i va = _mm256_loadu_si256((const __m256i*)(a + i)); + __m256i vb = _mm256_loadu_si256((const __m256i*)(b + i)); + __m256i vc = _mm256_add_epi16(va, vb); + _mm256_storeu_si256((__m256i*)(a + i), vc); + } + for (; i < len; ++i) { + a[i] += b[i]; + } +} +void vector_normalize(int16_t* a, size_t len) { + __m256i max_val = _mm256_set1_epi16(SMAX); + __m256i min_val = _mm256_set1_epi16(SMIN); + + size_t i = 0; + for (; i + 15 < len; i += 16) { + __m256i values = _mm256_loadu_si256((__m256i*)(a + i)); + __m256i gt_max = _mm256_cmpgt_epi16(values, max_val); + __m256i lt_min = _mm256_cmpgt_epi16(min_val, values); + values = _mm256_blendv_epi8(values, max_val, gt_max); + values = _mm256_blendv_epi8(values, min_val, lt_min); + _mm256_storeu_si256((__m256i*)(a + i), values); + } + + // Process remaining elements + for (; i < len; ++i) { + if (a[i] > SMAX) a[i] = SMAX; + else if (a[i] < SMIN) a[i] = SMIN; + } +} +typedef union { + int16_t* data; + __m256i* fp_avx2; +} vector_data_t; + +void vector_change_sln_volume_granular(int16_t* data, uint32_t samples, int32_t vol) { + float newrate = 0; + static const float pos[GRANULAR_VOLUME_MAX] = { + 1.122018, 1.258925, 1.412538, 1.584893, 1.778279, 1.995262, 2.238721, 2.511887, 2.818383, 3.162278, + 3.548134, 3.981072, 4.466835, 5.011872, 5.623413, 6.309574, 7.079458, 7.943282, 8.912509, 10.000000, + 11.220183, 12.589254, 14.125375, 15.848933, 17.782795, 19.952621, 22.387213, 25.118862, 28.183832, 31.622776, + 35.481335, 39.810719, 44.668358, 50.118729, 56.234131, 63.095726, 70.794586, 79.432816, 89.125107, 100.000000, + 112.201836, 125.892517, 141.253784, 158.489334, 177.827942, 199.526215, 223.872070, 251.188705, 281.838318, 316.227753 + }; + static const float neg[GRANULAR_VOLUME_MAX] = { + 0.891251, 0.794328, 0.707946, 0.630957, 0.562341, 0.501187, 0.446684, 0.398107, 0.354813, 0.316228, + 0.281838, 0.251189, 0.223872, 0.199526, 0.177828, 0.158489, 0.141254, 0.125893, 0.112202, 0.100000, + 0.089125, 0.079433, 0.070795, 0.063096, 0.056234, 0.050119, 0.044668, 0.039811, 0.035481, 0.031623, + 0.028184, 0.025119, 0.022387, 0.019953, 0.017783, 0.015849, 0.014125, 0.012589, 0.011220, 0.010000, + 0.008913, 0.007943, 0.007079, 0.006310, 0.005623, 0.005012, 0.004467, 0.003981, 0.003548, 0.000000 // NOTE mapped -50 dB ratio to total silence instead of 0.003162 + }; + const float* chart; + uint32_t i = abs(vol) - 1; + + if (vol == 0) return; + normalize_volume_granular(vol); + + chart = vol > 0 ? pos : neg; + + newrate = chart[i]; + + if (newrate) { + __m256 scale_factor_reg = _mm256_set1_ps(newrate); + uint32_t processed_samples = samples - (samples % 8); // Ensure we process only multiples of 8 + for (uint32_t i = 0; i < processed_samples; i += 8) { + __m128i data_ = _mm_loadu_si128((__m128i*)(data + i)); + __m256i data_32 = _mm256_cvtepi16_epi32(data_); + + __m256 data_float = _mm256_cvtepi32_ps(data_32); + __m256 result = _mm256_mul_ps(data_float, scale_factor_reg); + + __m256i result_32 = _mm256_cvtps_epi32(result); + + // Handle saturation + __m256i min_val = _mm256_set1_epi32(SMIN); + __m256i max_val = _mm256_set1_epi32(SMAX); + result_32 = _mm256_min_epi32(result_32, max_val); + result_32 = _mm256_max_epi32(result_32, min_val); + + __m128i result_16 = _mm_packs_epi32(_mm256_castsi256_si128(result_32), _mm256_extractf128_si256(result_32, 1)); + + _mm_storeu_si128((__m128i*)(data + i), result_16); + } + + // Process any remaining samples + for (uint32_t i = processed_samples; i < samples; i++) { + int32_t tmp = (int32_t)(data[i] * newrate); + tmp = tmp > SMAX ? SMAX : (tmp < SMIN ? SMIN : tmp); + data[i] = (int16_t)tmp; + } + } +} +#elif defined(USE_SSE2) +#include +#pragma message("Using SSE2 SIMD.") +void vector_add(int16_t* a, int16_t* b, size_t len) { + size_t i = 0; + for (; i + 7 < len; i += 8) { + __m128i va = _mm_loadu_si128((const __m128i*)(a + i)); + __m128i vb = _mm_loadu_si128((const __m128i*)(b + i)); + __m128i vc = _mm_add_epi16(va, vb); + _mm_storeu_si128((__m128i*)(a + i), vc); + } + for (; i < len; ++i) { + a[i] += b[i]; + } +} +void vector_normalize(int16_t* a, size_t len) { + __m128i max_val = _mm_set1_epi16(SMAX); + __m128i min_val = _mm_set1_epi16(SMIN); + + size_t i = 0; + for (; i + 7 < len; i += 8) { + __m128i values = _mm_loadu_si128((__m128i*)(a + i)); + __m128i gt_max = _mm_cmpgt_epi16(values, max_val); + __m128i lt_min = _mm_cmpgt_epi16(min_val, values); + __m128i max_masked = _mm_and_si128(gt_max, max_val); + __m128i min_masked = _mm_and_si128(lt_min, min_val); + __m128i other_masked = _mm_andnot_si128(_mm_or_si128(gt_max, lt_min), values); + values = _mm_or_si128(_mm_or_si128(max_masked, min_masked), other_masked); + _mm_storeu_si128((__m128i*)(a + i), values); + } + + // Process remaining elements + for (; i < len; ++i) { + if (a[i] > SMAX) a[i] = SMAX; + else if (a[i] < SMIN) a[i] = SMIN; + } +} + +typedef union { + int16_t* data; + __m128i* fp_sse2; +} vector_data_t; + +#else +void vector_add(int16_t* a, int16_t* b, size_t len) { + for (size_t i = 0; i < len; i++) { + a[i] += b[i]; + } +} +void vector_normalize(int16_t* a, size_t len) { + for (size_t i = 0; i < len; i++) { + normalize_to_16bit_basic(a[i]); + } +} +void vector_change_sln_volume_granular(int16_t* data, uint32_t samples, int32_t vol) { + float newrate = 0; + static const float pos[GRANULAR_VOLUME_MAX] = { + 1.122018, 1.258925, 1.412538, 1.584893, 1.778279, 1.995262, 2.238721, 2.511887, 2.818383, 3.162278, + 3.548134, 3.981072, 4.466835, 5.011872, 5.623413, 6.309574, 7.079458, 7.943282, 8.912509, 10.000000, + 11.220183, 12.589254, 14.125375, 15.848933, 17.782795, 19.952621, 22.387213, 25.118862, 28.183832, 31.622776, + 35.481335, 39.810719, 44.668358, 50.118729, 56.234131, 63.095726, 70.794586, 79.432816, 89.125107, 100.000000, + 112.201836, 125.892517, 141.253784, 158.489334, 177.827942, 199.526215, 223.872070, 251.188705, 281.838318, 316.227753 + }; + static const float neg[GRANULAR_VOLUME_MAX] = { + 0.891251, 0.794328, 0.707946, 0.630957, 0.562341, 0.501187, 0.446684, 0.398107, 0.354813, 0.316228, + 0.281838, 0.251189, 0.223872, 0.199526, 0.177828, 0.158489, 0.141254, 0.125893, 0.112202, 0.100000, + 0.089125, 0.079433, 0.070795, 0.063096, 0.056234, 0.050119, 0.044668, 0.039811, 0.035481, 0.031623, + 0.028184, 0.025119, 0.022387, 0.019953, 0.017783, 0.015849, 0.014125, 0.012589, 0.011220, 0.010000, + 0.008913, 0.007943, 0.007079, 0.006310, 0.005623, 0.005012, 0.004467, 0.003981, 0.003548, 0.000000 // NOTE mapped -50 dB ratio to total silence instead of 0.003162 + }; + const float* chart; + uint32_t i; + + if (vol == 0) return; + + normalize_volume_granular(vol); + + chart = vol > 0 ? pos : neg; + + i = abs(vol) - 1; + assert(i < GRANULAR_VOLUME_MAX); + newrate = chart[i]; + if (newrate) { + int32_t tmp; + uint32_t x; + int16_t *fp = data; + + for (x = 0; x < samples; x++) { + tmp = (int32_t) (fp[x] * newrate); + normalize_to_16bit_basic(tmp); + fp[x] = (int16_t) tmp; + } + } +} + +#endif + +#ifdef __cplusplus +} +#endif diff --git a/mod_dub/vector_math.h b/mod_dub/vector_math.h new file mode 100644 index 0000000..18bc953 --- /dev/null +++ b/mod_dub/vector_math.h @@ -0,0 +1,20 @@ +#ifndef VECTOR_MATH_H +#define VECTOR_MATH_H + +#include +#include + + +#ifdef __cplusplus +extern "C" { +#endif + +void vector_add(int16_t* a, int16_t* b, size_t len); +void vector_normalize(int16_t* a, size_t len); +void vector_change_sln_volume_granular(int16_t* data, uint32_t samples, int32_t vol); + +#ifdef __cplusplus +} +#endif + +#endif