eliminate support for multiple lws threads as part of fixing valgrind errors

Signed-off-by: Dave Horton <daveh@beachdognet.com>
This commit is contained in:
Dave Horton
2023-12-26 10:57:15 -05:00
parent a2324972eb
commit 420e51eac7
140 changed files with 19851 additions and 0 deletions

View File

@@ -0,0 +1,8 @@
Copyright 2023, Drachtio Communications Services, LLC
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@@ -0,0 +1,9 @@
include $(top_srcdir)/build/modmake.rulesam
MODNAME=mod_ibm_transcribe
mod_LTLIBRARIES = mod_ibm_transcribe.la
mod_ibm_transcribe_la_SOURCES = mod_ibm_transcribe.c ibm_transcribe_glue.cpp audio_pipe.cpp parser.cpp
mod_ibm_transcribe_la_CFLAGS = $(AM_CFLAGS)
mod_ibm_transcribe_la_CXXFLAGS = $(AM_CXXFLAGS) -std=c++11
mod_ibm_transcribe_la_LIBADD = $(switch_builddir)/libfreeswitch.la
mod_ibm_transcribe_la_LDFLAGS = -avoid-version -module -no-undefined -shared `pkg-config --libs libwebsockets`

View File

@@ -0,0 +1,57 @@
# mod_ibm_transcribe
A Freeswitch module that generates real-time transcriptions on a Freeswitch channel by using IBM Watson
## API
### Commands
The freeswitch module exposes the following API commands:
```
uuid_ibm_transcribe <uuid> start <lang-code> [interim]
```
Attaches a media bug to the channel and performs a streaming recognize request.
- `uuid` - unique identifier of Freeswitch channel
- `lang-code` - a valid IBM [language code](https://cloud.ibm.com/docs/speech-to-text?topic=speech-to-text-models-ng#models-ng-supported) that is supported for streaming transcription
- `interim` - If the 'interim' keyword is present then both interim and final transcription results will be returned; otherwise only final transcriptions will be returned
```
uuid_ibm_transcribe <uuid> stop
```
Stop transcription on the channel.
### Channel Variables
| variable | Description |
| --- | ----------- |
| IBM_ACCESS_TOKEN | IBM access token used to authenticate |
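| IBM_SPEECH_REGION | IBM region of the speech-to-text instance (required; used to build the service hostname `api.<region>.speech-to-text.watson.cloud.ibm.com`) |
| IBM_SPEECH_INSTANCE_ID | IBM instance id (required) |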
| IBM_SPEECH_MODEL | IBM speech model (https://cloud.ibm.com/docs/speech-to-text?topic=speech-to-text-websockets) |
| IBM_SPEECH_LANGUAGE_CUSTOMIZATION_ID |IBM speech language customization id |
| IBM_SPEECH_ACOUSTIC_CUSTOMIZATION_ID | IBM acoustic customization id |
| IBM_SPEECH_BASE_MODEL_VERSION | IBM base model version |
| IBM_SPEECH_WATSON_METADATA | customer metadata to pass to IBM watson |
| IBM_SPEECH_WATSON_LEARNING_OPT_OUT | 1 means opt out |
### Events
`ibm_transcribe::transcription` - returns an interim or final transcription. The event contains a JSON body describing the transcription result:
```json
{
"result_index": 0,
"results": [{
"final": true,
"alternatives": [{
"transcript": "what kind of dog is that",
"confidence": 0.83
}]
}]
}
```
## Usage
When using [drachtio-fsmrf](https://www.npmjs.com/package/drachtio-fsmrf), you can access this API command via the api method on the 'endpoint' object.
```js
ep.api('uuid_ibm_transcribe', `${ep.uuid} start en-US interim`);
```

View File

@@ -0,0 +1,513 @@
#include "audio_pipe.hpp"
#include <cassert>
#include <sstream>
#include <iostream>
/* discard incoming text messages over the socket that are longer than this */
// Upper bound, in bytes, on the reassembly buffer used for received text messages.
#define MAX_RECV_BUF_SIZE (65 * 1024 * 10)
// Increment, in bytes, by which the reassembly buffer is grown when a message
// turns out to be larger than first announced.
#define RECV_BUF_REALLOC_SIZE (8 * 1024)
using namespace ibm;
namespace {
// Optional override for the TCP keep-alive time handed to libwebsockets.
// NOTE(review): the environment variable keeps a MOD_AUDIO_FORK_ prefix even
// though this is the IBM transcribe module - presumably inherited; confirm
// before renaming since deployments may already set it.
static const char *requestedTcpKeepaliveSecs = std::getenv("MOD_AUDIO_FORK_TCP_KEEPALIVE_SECS");
// Keep-alive time in seconds; 55 when the environment variable is not set.
static int nTcpKeepaliveSecs = requestedTcpKeepaliveSecs ? ::atoi(requestedTcpKeepaliveSecs) : 55;
}
int AudioPipe::lws_callback(struct lws *wsi,
enum lws_callback_reasons reason,
void *user, void *in, size_t len) {
struct AudioPipe::lws_per_vhost_data *vhd =
(struct AudioPipe::lws_per_vhost_data *) lws_protocol_vh_priv_get(lws_get_vhost(wsi), lws_get_protocol(wsi));
struct lws_vhost* vhost = lws_get_vhost(wsi);
AudioPipe ** ppAp = (AudioPipe **) user;
switch (reason) {
case LWS_CALLBACK_PROTOCOL_INIT:
vhd = (struct AudioPipe::lws_per_vhost_data *) lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), lws_get_protocol(wsi), sizeof(struct AudioPipe::lws_per_vhost_data));
vhd->context = lws_get_context(wsi);
vhd->protocol = lws_get_protocol(wsi);
vhd->vhost = lws_get_vhost(wsi);
break;
case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
break;
case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
processPendingConnects(vhd);
processPendingDisconnects(vhd);
processPendingWrites();
break;
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
{
AudioPipe* ap = findAndRemovePendingConnect(wsi);
int rc = lws_http_client_http_response(wsi);
lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CONNECTION_ERROR: %s, response status %d\n", in ? (char *)in : "(null)", rc);
if (ap) {
ap->m_state = LWS_CLIENT_FAILED;
ap->m_callback(ap->m_uuid.c_str(), AudioPipe::CONNECT_FAIL, (char *) in, ap->isFinished(), ap->isInterimTranscriptsEnabled(), ap->getBugname().c_str());
}
else {
lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CONNECTION_ERROR unable to find wsi %p..\n", wsi);
}
}
break;
case LWS_CALLBACK_CLIENT_ESTABLISHED:
{
AudioPipe* ap = findAndRemovePendingConnect(wsi);
if (ap) {
std::ostringstream oss;
*ppAp = ap;
ap->m_vhd = vhd;
ap->m_state = LWS_CLIENT_CONNECTED;
if (ap->isFinished()) {
//std::cerr << "Got quick hangup from client while connecting, immediate close" << std::endl;
ap->close();
}
else {
oss << "{\"action\": \"start\",";
oss << "\"content-type\": \"audio/l16;rate=16000\"";
oss << ",\"interim_results\": true";
oss << ",\"low_latency\": false";
oss << "}";
ap->bufferForSending(oss.str().c_str());
ap->m_callback(ap->m_uuid.c_str(), AudioPipe::CONNECT_SUCCESS, NULL, ap->isFinished(), ap->isInterimTranscriptsEnabled(), ap->getBugname().c_str());
}
}
else {
lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_ESTABLISHED %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi);
}
}
break;
case LWS_CALLBACK_CLIENT_CLOSED:
{
AudioPipe* ap = *ppAp;
if (!ap) {
lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_CLOSED %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi);
return 0;
}
if (ap->m_state == LWS_CLIENT_DISCONNECTING) {
// closed by us
lwsl_debug("%s socket closed by us\n", ap->m_uuid.c_str());
ap->m_callback(ap->m_uuid.c_str(), AudioPipe::CONNECTION_CLOSED_GRACEFULLY, NULL, ap->isFinished(), ap->isInterimTranscriptsEnabled(), ap->getBugname().c_str());
}
else if (ap->m_state == LWS_CLIENT_CONNECTED) {
// closed by far end
lwsl_info("%s socket closed by far end\n", ap->m_uuid.c_str());
ap->m_callback(ap->m_uuid.c_str(), AudioPipe::CONNECTION_DROPPED, NULL, ap->isFinished(), ap->isInterimTranscriptsEnabled(), ap->getBugname().c_str());
}
ap->m_state = LWS_CLIENT_DISCONNECTED;
ap->setClosed();
//NB: after receiving any of the events above, any holder of a
//pointer or reference to this object must treat is as no longer valid
*ppAp = NULL;
delete ap;
}
break;
case LWS_CALLBACK_CLIENT_RECEIVE:
{
AudioPipe* ap = *ppAp;
if (!ap) {
lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi);
return 0;
}
if (lws_frame_is_binary(wsi)) {
lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE received binary frame, discarding.\n");
return 0;
}
if (lws_is_first_fragment(wsi)) {
// allocate a buffer for the entire chunk of memory needed
assert(nullptr == ap->m_recv_buf);
ap->m_recv_buf_len = len + lws_remaining_packet_payload(wsi);
ap->m_recv_buf = (uint8_t*) malloc(ap->m_recv_buf_len);
ap->m_recv_buf_ptr = ap->m_recv_buf;
}
size_t write_offset = ap->m_recv_buf_ptr - ap->m_recv_buf;
size_t remaining_space = ap->m_recv_buf_len - write_offset;
if (remaining_space < len) {
lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE buffer realloc needed.\n");
size_t newlen = ap->m_recv_buf_len + RECV_BUF_REALLOC_SIZE;
if (newlen > MAX_RECV_BUF_SIZE) {
free(ap->m_recv_buf);
ap->m_recv_buf = ap->m_recv_buf_ptr = nullptr;
ap->m_recv_buf_len = 0;
lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_RECEIVE max buffer exceeded, truncating message.\n");
}
else {
ap->m_recv_buf = (uint8_t*) realloc(ap->m_recv_buf, newlen);
if (nullptr != ap->m_recv_buf) {
ap->m_recv_buf_len = newlen;
ap->m_recv_buf_ptr = ap->m_recv_buf + write_offset;
}
}
}
if (nullptr != ap->m_recv_buf) {
if (len > 0) {
memcpy(ap->m_recv_buf_ptr, in, len);
ap->m_recv_buf_ptr += len;
}
if (lws_is_final_fragment(wsi)) {
if (nullptr != ap->m_recv_buf) {
std::string msg((char *)ap->m_recv_buf, ap->m_recv_buf_ptr - ap->m_recv_buf);
//std::cerr << "Recv: " << msg << std::endl;
ap->m_callback(ap->m_uuid.c_str(), AudioPipe::MESSAGE, msg.c_str(), ap->isFinished(), ap->isInterimTranscriptsEnabled(), ap->getBugname().c_str());
if (nullptr != ap->m_recv_buf) free(ap->m_recv_buf);
}
ap->m_recv_buf = ap->m_recv_buf_ptr = nullptr;
ap->m_recv_buf_len = 0;
if (ap->isFinished()) {
// got final transcript, close the connection
ap->close();
}
}
}
}
break;
case LWS_CALLBACK_CLIENT_WRITEABLE:
{
AudioPipe* ap = *ppAp;
if (!ap) {
lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_WRITEABLE %s unable to find wsi %p..\n", ap->m_uuid.c_str(), wsi);
return 0;
}
// check for text frames to send
{
std::lock_guard<std::mutex> lk(ap->m_text_mutex);
if (ap->m_metadata.length() > 0) {
//std::cerr << "Sending: " << ap->m_metadata << std::endl;
uint8_t buf[ap->m_metadata.length() + LWS_PRE];
memcpy(buf + LWS_PRE, ap->m_metadata.c_str(), ap->m_metadata.length());
int n = ap->m_metadata.length();
int m = lws_write(wsi, buf + LWS_PRE, n, LWS_WRITE_TEXT);
ap->m_metadata.clear();
if (m < n) {
return -1;
}
// there may be audio data, but only one write per writeable event
// get it next time
lws_callback_on_writable(wsi);
return 0;
}
}
if (ap->m_state == LWS_CLIENT_DISCONNECTING) {
lws_close_reason(wsi, LWS_CLOSE_STATUS_NORMAL, NULL, 0);
return -1;
}
// check for audio packets
{
std::lock_guard<std::mutex> lk(ap->m_audio_mutex);
if (ap->m_audio_buffer_write_offset > LWS_PRE) {
size_t datalen = ap->m_audio_buffer_write_offset - LWS_PRE;
int sent = lws_write(wsi, (unsigned char *) ap->m_audio_buffer + LWS_PRE, datalen, LWS_WRITE_BINARY);
if (sent < datalen) {
lwsl_err("AudioPipe::lws_service_thread LWS_CALLBACK_CLIENT_WRITEABLE %s attemped to send %lu only sent %d wsi %p..\n",
ap->m_uuid.c_str(), datalen, sent, wsi);
}
ap->m_audio_buffer_write_offset = LWS_PRE;
}
}
return 0;
}
break;
default:
break;
}
return lws_callback_http_dummy(wsi, reason, user, in, len);
}
// static members
// Retry/idle policy installed on the lws context (see info.retry_and_idle_policy
// in lws_service_thread). The per-field meaning is given by the trailing comments.
static const lws_retry_bo_t retry = {
nullptr, // retry_ms_table
0, // retry_ms_table_count
0, // conceal_count
UINT16_MAX, // secs_since_valid_ping
UINT16_MAX, // secs_since_valid_hangup
0 // jitter_percent
};
// The lws context created and serviced by lws_service_thread().
struct lws_context *AudioPipe::context = nullptr;
// Each mutex guards the pending list of the same name below.
std::mutex AudioPipe::mutex_connects;
std::mutex AudioPipe::mutex_disconnects;
std::mutex AudioPipe::mutex_writes;
// Work queued by other threads and drained by the lws callback on
// LWS_CALLBACK_EVENT_WAIT_CANCELLED.
std::list<AudioPipe*> AudioPipe::pendingConnects;
std::list<AudioPipe*> AudioPipe::pendingDisconnects;
std::list<AudioPipe*> AudioPipe::pendingWrites;
AudioPipe::log_emit_function AudioPipe::logger;
// Held by initialize() and deinitialize() while they start/stop the service thread.
std::mutex AudioPipe::mapMutex;
// Polled by the service loop after each lws_service() call; true requests exit.
bool AudioPipe::stopFlag;
// Starts a client connection for every queued AudioPipe that is still idle.
// The idle entries are marked CONNECTING while the list lock is held; the
// actual connect calls are made after the lock has been released.
void AudioPipe::processPendingConnects(lws_per_vhost_data *vhd) {
  std::list<AudioPipe*> ready;
  {
    std::lock_guard<std::mutex> lock(mutex_connects);
    for (AudioPipe* candidate : pendingConnects) {
      if (candidate->m_state != LWS_CLIENT_IDLE) continue;
      ready.push_back(candidate);
      candidate->m_state = LWS_CLIENT_CONNECTING;
    }
  }

  for (AudioPipe* pipe : ready) {
    pipe->connect_client(vhd);
  }
}
// Empties the pending-disconnect queue and requests a writeable callback for
// each entry that is in the DISCONNECTING state; the close itself is carried
// out by the writeable handler.
void AudioPipe::processPendingDisconnects(lws_per_vhost_data *vhd) {
  std::list<AudioPipe*> closing;
  {
    std::lock_guard<std::mutex> lock(mutex_disconnects);
    for (AudioPipe* candidate : pendingDisconnects) {
      if (LWS_CLIENT_DISCONNECTING == candidate->m_state) {
        closing.push_back(candidate);
      }
    }
    pendingDisconnects.clear();
  }

  for (AudioPipe* pipe : closing) {
    lws_callback_on_writable(pipe->m_wsi);
  }
}
void AudioPipe::processPendingWrites() {
std::list<AudioPipe*> writes;
{
std::lock_guard<std::mutex> guard(mutex_writes);
for (auto it = pendingWrites.begin(); it != pendingWrites.end(); ++it) {
if ((*it)->m_state == LWS_CLIENT_CONNECTED) writes.push_back(*it);
}
pendingWrites.clear();
}
for (auto it = writes.begin(); it != writes.end(); ++it) {
AudioPipe* ap = *it;
lws_callback_on_writable(ap->m_wsi);
}
}
// Looks up the connecting AudioPipe that owns `wsi`, removes it from the
// pending-connect list and returns it (NULL when there is no match).
//
// While scanning, entries that have no wsi are purged as stale - but only when
// they are no longer IDLE. An IDLE entry has merely been queued by connect()
// and not yet been handed to lws, so its wsi is legitimately NULL; purging it
// (as was done previously) silently discarded that connection request.
AudioPipe* AudioPipe::findAndRemovePendingConnect(struct lws *wsi) {
  AudioPipe* ap = NULL;
  std::lock_guard<std::mutex> guard(mutex_connects);
  std::list<AudioPipe* > toRemove;

  // the scan stops at the first match, exactly as before
  for (auto it = pendingConnects.begin(); it != pendingConnects.end() && !ap; ++it) {
    AudioPipe* candidate = *it;
    const int state = candidate->m_state;

    if (candidate->m_wsi == nullptr && state != LWS_CLIENT_IDLE) {
      toRemove.push_back(candidate);
    }

    if (state == LWS_CLIENT_CONNECTING && candidate->m_wsi == wsi) {
      ap = candidate;
    }
  }

  for (AudioPipe* stale : toRemove) {
    pendingConnects.remove(stale);
  }

  if (ap) {
    pendingConnects.remove(ap);
  }

  return ap;
}
// Returns the first queued AudioPipe that is CONNECTING on `wsi`, or NULL.
// The pending-connect list is left untouched.
AudioPipe* AudioPipe::findPendingConnect(struct lws *wsi) {
  std::lock_guard<std::mutex> lock(mutex_connects);

  for (AudioPipe* candidate : pendingConnects) {
    const int state = candidate->m_state;
    if (state == LWS_CLIENT_CONNECTING && candidate->m_wsi == wsi) {
      return candidate;
    }
  }
  return NULL;
}
// Queues `ap` for connection and interrupts the lws service loop so that the
// queue is processed on the service thread.
void AudioPipe::addPendingConnect(AudioPipe* ap) {
  std::unique_lock<std::mutex> lock(mutex_connects);
  pendingConnects.push_back(ap);
  lwsl_debug("%s after adding connect there are %lu pending connects\n",
    ap->m_uuid.c_str(), pendingConnects.size());
  lock.unlock();

  // wake the service thread only after the list lock has been dropped
  lws_cancel_service(context);
}
// Marks `ap` as DISCONNECTING, queues it for disconnection and interrupts the
// lws service loop so that the queue is processed on the service thread.
void AudioPipe::addPendingDisconnect(AudioPipe* ap) {
  ap->m_state = LWS_CLIENT_DISCONNECTING;

  std::unique_lock<std::mutex> lock(mutex_disconnects);
  pendingDisconnects.push_back(ap);
  lwsl_debug("%s after adding disconnect there are %lu pending disconnects\n",
    ap->m_uuid.c_str(), pendingDisconnects.size());
  lock.unlock();

  lws_cancel_service(ap->m_vhd->context);
}
// Queues `ap` for a write and interrupts the lws service loop so that the
// queue is processed on the service thread.
void AudioPipe::addPendingWrite(AudioPipe* ap) {
  std::unique_lock<std::mutex> lock(mutex_writes);
  pendingWrites.push_back(ap);
  lock.unlock();

  lws_cancel_service(ap->m_vhd->context);
}
// Body of the lws service thread: creates the (client-only) lws context,
// services it until lws_service() reports an error or stopFlag is set, then
// destroys the context.
// Returns false when the context could not be created, true otherwise.
bool AudioPipe::lws_service_thread() {
  // one unnamed protocol; each connection gets pointer-sized user storage
  const struct lws_protocols protocols[] = {
    {
      "",
      AudioPipe::lws_callback,
      sizeof(void *),
      1024,
    },
    { NULL, NULL, 0, 0 }
  };

  struct lws_context_creation_info info;
  memset(&info, 0, sizeof info);

  info.port = CONTEXT_PORT_NO_LISTEN;
  info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
  info.protocols = protocols;

  // tcp keep-alive settings
  info.ka_time = nTcpKeepaliveSecs;  // tcp keep-alive timer
  info.ka_probes = 4;                // number of times to try ka before closing connection
  info.ka_interval = 5;              // time between ka's

  // timeouts
  info.timeout_secs = 10;            // doc says timeout for "various processes involving network roundtrips"
  info.keepalive_timeout = 5;        // seconds to allow remote client to hold on to an idle HTTP/1.1 connection
  info.timeout_secs_ah_idle = 10;    // secs to allow a client to hold an ah without using it
  info.retry_and_idle_policy = &retry;

  lwsl_notice("AudioPipe::lws_service_thread creating context\n");
  context = lws_create_context(&info);
  if (nullptr == context) {
    lwsl_err("AudioPipe::lws_service_thread failed creating context\n");
    return false;
  }

  // keep servicing until lws reports an error or a stop has been requested
  while (lws_service(context, 0) >= 0 && !stopFlag) {
  }

  lwsl_notice("AudioPipe::lws_service_thread ending\n");
  lws_context_destroy(context);
  return true;
}
// Configures lws logging and launches the detached lws service thread.
//
// @param loglevel bitmask of LLL_* categories passed to lws_set_log_level
// @param logger   function lws uses to emit log lines
void AudioPipe::initialize(int loglevel, log_emit_function logger) {
  lws_set_log_level(loglevel, logger);

  lwsl_notice("AudioPipe::initialize starting\n");
  std::lock_guard<std::mutex> lock(mapMutex);

  // remember the emitter in the class-level static (the parameter shadows it)
  AudioPipe::logger = logger;

  // The flag must be cleared BEFORE the thread exists: the service loop polls
  // it, and a value of true left over from an earlier deinitialize() would
  // otherwise let the new thread exit straight away.
  stopFlag = false;

  std::thread t(&AudioPipe::lws_service_thread);
  t.detach();
}
// Asks the lws service thread to stop by raising stopFlag. Always returns true.
bool AudioPipe::deinitialize() {
  lwsl_notice("AudioPipe::deinitialize\n");
  {
    std::lock_guard<std::mutex> guard(mapMutex);
    stopFlag = true;
  }
  return true;
}
// instance members
// Creates an idle AudioPipe; no network activity happens until connect().
//
// @param uuid         session identifier reported back through `callback`
// @param host         remote host name
// @param port         remote port
// @param path         request path (including any query string)
// @param bufLen       size in bytes of the outbound audio buffer (includes LWS_PRE)
// @param minFreespace value reported by binaryMinSpace()
// @param callback     receives connection events and received messages
//
// Every data member is initialised here, in declaration order; previously
// m_recv_buf_len, m_sslFlags and m_logger were left indeterminate.
AudioPipe::AudioPipe(const char* uuid, const char* host, unsigned int port, const char* path,
  size_t bufLen, size_t minFreespace, notifyHandler_t callback) :
  m_state(LWS_CLIENT_IDLE),
  m_uuid(uuid), m_host(host), m_port(port), m_path(path),
  m_sslFlags(0),
  m_wsi(nullptr),
  m_audio_buffer(nullptr),
  m_audio_buffer_max_len(bufLen),
  m_audio_buffer_write_offset(LWS_PRE),   // payload starts after the lws headroom
  m_audio_buffer_min_freespace(minFreespace),
  m_recv_buf(nullptr), m_recv_buf_ptr(nullptr), m_recv_buf_len(0),
  m_vhd(nullptr),
  m_callback(callback),
  m_logger(nullptr),
  m_gracefulShutdown(false), m_finished(false), m_interim(false) {

  m_audio_buffer = new uint8_t[m_audio_buffer_max_len];
}
// Releases the audio buffer and any partially received message.
AudioPipe::~AudioPipe() {
  // allocated with new[] in the constructor
  delete [] m_audio_buffer;
  m_audio_buffer = nullptr;

  // the receive buffer comes from malloc()/realloc() in the lws callback, so it
  // has to be returned with free(); delete[] here was an allocator mismatch
  free(m_recv_buf);
  m_recv_buf = m_recv_buf_ptr = nullptr;
}
// Requests a connection; the connect itself is performed on the lws service thread.
void AudioPipe::connect(void) {
  AudioPipe::addPendingConnect(this);
}
bool AudioPipe::connect_client(struct lws_per_vhost_data *vhd) {
assert(m_audio_buffer != nullptr);
assert(m_vhd == nullptr);
struct lws_client_connect_info i;
memset(&i, 0, sizeof(i));
i.context = vhd->context;
i.port = m_port;
i.address = m_host.c_str();
i.path = m_path.c_str();
i.host = i.address;
i.origin = i.address;
i.ssl_connection = LCCSCF_USE_SSL;
i.pwsi = &(m_wsi);
m_state = LWS_CLIENT_CONNECTING;
m_vhd = vhd;
m_wsi = lws_client_connect_via_info(&i);
lwsl_debug("%s attempting connection, wsi is %p\n", m_uuid.c_str(), m_wsi);
return nullptr != m_wsi;
}
// Appends `text` to the outbound text buffer and schedules a write.
// Does nothing unless the pipe is currently connected.
void AudioPipe::bufferForSending(const char* text) {
  const bool connected = (LWS_CLIENT_CONNECTED == m_state);
  if (connected) {
    {
      std::unique_lock<std::mutex> textLock(m_text_mutex);
      m_metadata += text;
    }
    addPendingWrite(this);
  }
}
// Schedules a write when audio has been buffered beyond the lws headroom,
// then releases the audio mutex taken by lockAudioBuffer().
void AudioPipe::unlockAudioBuffer() {
  const bool hasQueuedAudio = m_audio_buffer_write_offset > LWS_PRE;
  if (hasQueuedAudio) {
    addPendingWrite(this);
  }
  m_audio_mutex.unlock();
}
// Requests a disconnect; only acts while the pipe is connected.
void AudioPipe::close() {
  if (LWS_CLIENT_CONNECTED == m_state) {
    addPendingDisconnect(this);
  }
}
// Marks the pipe as finished. The first time this happens on a connected
// pipe, the JSON "stop" action is also queued for sending.
void AudioPipe::finish() {
  const bool sendStop = !m_finished && (LWS_CLIENT_CONNECTED == m_state);
  m_finished = true;
  if (sendStop) {
    bufferForSending("{\"action\": \"stop\"}");
  }
}
// Blocks the calling thread until setClosed() has fulfilled the close promise.
void AudioPipe::waitForClose() {
  std::shared_future<void> closed = m_promise.get_future().share();
  closed.wait();
}

View File

@@ -0,0 +1,161 @@
#ifndef __IBM_AUDIO_PIPE_HPP__
#define __IBM_AUDIO_PIPE_HPP__
#include <string>
#include <list>
#include <mutex>
#include <future>
#include <queue>
#include <unordered_map>
#include <thread>
#include <libwebsockets.h>
namespace ibm {
// One websocket client connection used to stream call audio out and receive
// text messages back. Connection work is queued from caller threads and
// carried out on a single lws service thread (see initialize()).
class AudioPipe {
public:
// Connection life-cycle. Objects start out IDLE; the remaining states are
// assigned by the connect/disconnect helpers and the lws callback.
enum LwsState_t {
LWS_CLIENT_IDLE,
LWS_CLIENT_CONNECTING,
LWS_CLIENT_CONNECTED,
LWS_CLIENT_FAILED,
LWS_CLIENT_DISCONNECTING,
LWS_CLIENT_DISCONNECTED
};
// Event kinds delivered to the notifyHandler_t callback.
enum NotifyEvent_t {
CONNECT_SUCCESS,
CONNECT_FAIL,
CONNECTION_DROPPED,
CONNECTION_CLOSED_GRACEFULLY,
MESSAGE
};
// Signature of the log emitter handed to lws_set_log_level().
typedef void (*log_emit_function)(int level, const char *line);
// Signature of the application callback. `message` is NULL for events that
// carry no text (e.g. CONNECT_SUCCESS).
typedef void (*notifyHandler_t)(const char *sessionId, NotifyEvent_t event, const char* message, bool finished, bool wantsInterim, const char* bugname);
// Per-vhost data allocated and filled in on LWS_CALLBACK_PROTOCOL_INIT.
struct lws_per_vhost_data {
struct lws_context *context;
struct lws_vhost *vhost;
const struct lws_protocols *protocol;
};
// Sets the lws log level/emitter and starts the detached service thread.
static void initialize(int loglevel, log_emit_function logger);
// Raises the stop flag polled by the service thread; always returns true.
static bool deinitialize();
// Thread body: creates the lws context and services it until stopped.
static bool lws_service_thread();
// constructor
AudioPipe(const char* uuid, const char* host, unsigned int port, const char* path,
size_t bufLen, size_t minFreespace, notifyHandler_t callback);
~AudioPipe();
LwsState_t getLwsState(void) { return m_state; }
// Queues this pipe for connection on the service thread.
void connect(void);
// Queues a text frame for sending; ignored unless connected.
void bufferForSending(const char* text);
// Bytes still free in the outbound audio buffer.
size_t binarySpaceAvailable(void) {
return m_audio_buffer_max_len - m_audio_buffer_write_offset;
}
// The minFreespace value supplied to the constructor.
size_t binaryMinSpace(void) {
return m_audio_buffer_min_freespace;
}
// Address at which the next audio bytes should be written.
char * binaryWritePtr(void) {
return (char *) m_audio_buffer + m_audio_buffer_write_offset;
}
// Advances the write position after `len` bytes have been written.
void binaryWritePtrAdd(size_t len) {
m_audio_buffer_write_offset += len;
}
// NOTE(review): this resets the offset to 0, whereas the constructor and the
// writeable handler use LWS_PRE as the "empty" position - confirm that
// callers really want the LWS_PRE headroom to be overwritten.
void binaryWritePtrResetToZero(void) {
m_audio_buffer_write_offset = 0;
}
// Takes the audio mutex; must be paired with unlockAudioBuffer().
void lockAudioBuffer(void) {
m_audio_mutex.lock();
}
// Schedules a write if audio is buffered, then releases the audio mutex.
void unlockAudioBuffer(void) ;
void enableInterimTranscripts(bool interim) {
m_interim = interim;
}
bool isInterimTranscriptsEnabled(void) {
return m_interim;
}
void setAccessToken(const char* accessToken) {
m_access_token = accessToken;
}
std::string& getAccessToken(void) {
return m_access_token;
}
void setBugname(const char* bugname) {
m_bugname = bugname;
}
std::string& getBugname(void) {
return m_bugname;
}
// Requests a disconnect; only acts while connected.
void close() ;
// Marks the pipe finished and, if connected, queues the JSON "stop" action.
void finish();
// Blocks until setClosed() has been called.
void waitForClose();
// Fulfils the promise that waitForClose() waits on.
void setClosed() { m_promise.set_value(); }
bool isFinished() { return m_finished;}
// no default constructor or copying
AudioPipe() = delete;
AudioPipe(const AudioPipe&) = delete;
void operator=(const AudioPipe&) = delete;
private:
// lws protocol callback; dispatches all connection events.
static int lws_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
static struct lws_context *context;
// Each mutex guards the pending list of the same name.
static std::mutex mutex_connects;
static std::mutex mutex_disconnects;
static std::mutex mutex_writes;
static std::list<AudioPipe*> pendingConnects;
static std::list<AudioPipe*> pendingDisconnects;
static std::list<AudioPipe*> pendingWrites;
static log_emit_function logger;
static std::mutex mapMutex;
static bool stopFlag;
static AudioPipe* findAndRemovePendingConnect(struct lws *wsi);
static AudioPipe* findPendingConnect(struct lws *wsi);
static void addPendingConnect(AudioPipe* ap);
static void addPendingDisconnect(AudioPipe* ap);
static void addPendingWrite(AudioPipe* ap);
static void processPendingConnects(lws_per_vhost_data *vhd);
static void processPendingDisconnects(lws_per_vhost_data *vhd);
static void processPendingWrites(void);
// Issues the lws client connect for this pipe.
bool connect_client(struct lws_per_vhost_data *vhd);
LwsState_t m_state;
std::string m_uuid;
std::string m_host;
unsigned int m_port;
std::string m_path;
// Outbound text waiting to be sent; guarded by m_text_mutex.
std::string m_metadata;
std::mutex m_text_mutex;
std::mutex m_audio_mutex;
int m_sslFlags;
struct lws *m_wsi;
// Outbound audio buffer; payload begins LWS_PRE bytes in.
uint8_t *m_audio_buffer;
size_t m_audio_buffer_max_len;
size_t m_audio_buffer_write_offset;
size_t m_audio_buffer_min_freespace;
// Reassembly buffer for a fragmented inbound text message, its write
// position, and its allocated size.
uint8_t* m_recv_buf;
uint8_t* m_recv_buf_ptr;
size_t m_recv_buf_len;
struct lws_per_vhost_data* m_vhd;
notifyHandler_t m_callback;
log_emit_function m_logger;
bool m_gracefulShutdown;
bool m_finished;
bool m_interim;
std::string m_access_token;
std::string m_bugname;
// Fulfilled by setClosed(); waited on by waitForClose().
std::promise<void> m_promise;
};
} // namespace ibm
#endif

View File

@@ -0,0 +1,488 @@
#include <switch.h>
#include <switch_json.h>
#include <string.h>
#include <string>
#include <mutex>
#include <thread>
#include <list>
#include <algorithm>
#include <functional>
#include <cassert>
#include <cstdlib>
#include <fstream>
#include <sstream>
#include <regex>
#include <map>
#include <iostream>
#include "mod_ibm_transcribe.h"
#include "simple_buffer.h"
#include "parser.hpp"
#include "audio_pipe.hpp"
#define RTP_PACKETIZATION_PERIOD 20
#define FRAME_SIZE_8000 320 /*which means each 20ms frame as 320 bytes at 8 khz (1 channel only)*/
namespace {
// NOTE(review): hasDefaultCredentials, defaultApiKey, nServiceThreads and
// playCount are not referenced in the portion of this file that was reviewed.
static bool hasDefaultCredentials = false;
static const char* defaultApiKey = nullptr;
// Optional override for the outbound audio buffer length, in seconds.
// NOTE(review): the environment variables below keep a MOD_AUDIO_FORK_ prefix
// in this IBM module - presumably inherited; confirm before renaming.
static const char *requestedBufferSecs = std::getenv("MOD_AUDIO_FORK_BUFFER_SECS");
// Audio buffer length in seconds: default 2, clamped to the range 1..7.
static int nAudioBufferSecs = std::max(1, std::min(requestedBufferSecs ? ::atoi(requestedBufferSecs) : 2, 7));
static const char *requestedNumServiceThreads = std::getenv("MOD_AUDIO_FORK_SERVICE_THREADS");
// Requested number of service threads: default 1, clamped to the range 1..5.
static unsigned int nServiceThreads = std::max(1, std::min(requestedNumServiceThreads ? ::atoi(requestedNumServiceThreads) : 1, 5));
// Incremented for every call set up by fork_data_init() to produce tech_pvt->id.
static unsigned int idxCallCount = 0;
static uint32_t playCount = 0;
// Printable names for AudioPipe events, used when logging.
static const std::map<ibm::AudioPipe::NotifyEvent_t, std::string> Event2Str = {
{ibm::AudioPipe::CONNECT_SUCCESS, "CONNECT_SUCCESS"},
{ibm::AudioPipe::CONNECT_FAIL, "CONNECT_FAIL"},
{ibm::AudioPipe::CONNECTION_DROPPED, "CONNECTION_DROPPED"},
{ibm::AudioPipe::CONNECTION_CLOSED_GRACEFULLY, "CONNECTION_CLOSED_GRACEFULLY"},
{ibm::AudioPipe::MESSAGE, "MESSAGE"}
};
// Returns the printable name of `event`, or "UNKNOWN" when it has no entry
// in Event2Str.
static std::string EventStr(ibm::AudioPipe::NotifyEvent_t event) {
  const auto match = Event2Str.find(event);
  return (match == Event2Str.end()) ? std::string("UNKNOWN") : match->second;
}
/*
static void reaper(private_t *tech_pvt) {
std::shared_ptr<ibm::AudioPipe> pAp;
pAp.reset((ibm::AudioPipe *)tech_pvt->pAudioPipe);
tech_pvt->pAudioPipe = nullptr;
std::thread t([pAp]{
pAp->finish();
pAp->waitForClose();
});
t.detach();
}
*/
// Releases the resources owned by `tech_pvt` (the AudioPipe and the speex
// resampler) and clears the corresponding pointers. A NULL argument is a no-op.
static void destroy_tech_pvt(private_t *tech_pvt) {
  // check before the log statement: it reads fields of tech_pvt
  if (!tech_pvt) return;

  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s (%u) destroy_tech_pvt\n", tech_pvt->sessionId, tech_pvt->id);

  if (tech_pvt->pAudioPipe) {
    ibm::AudioPipe* p = (ibm::AudioPipe *) tech_pvt->pAudioPipe;
    delete p;
    tech_pvt->pAudioPipe = nullptr;
  }
  if (tech_pvt->resampler) {
    speex_resampler_destroy(tech_pvt->resampler);
    tech_pvt->resampler = NULL;
  }
  /*
  if (tech_pvt->vad) {
    switch_vad_destroy(&tech_pvt->vad);
    tech_pvt->vad = nullptr;
  }
  */
}
// Fires a custom FreeSWITCH event carrying a transcription-related payload.
//
// @param session   session whose channel data is attached to the event
// @param eventName event subclass name
// @param json      optional event body (may be NULL)
// @param bugname   optional media bug name, added as the "media-bugname" header
// @param finished  non-zero when the recognition session has finished
static void responseHandler(switch_core_session_t* session,
  const char* eventName, const char * json, const char* bugname, int finished) {
  switch_event_t *event = NULL;
  switch_channel_t *channel = switch_core_session_get_channel(session);

  // do not touch `event` unless it was actually created
  if (switch_event_create_subclass(&event, SWITCH_EVENT_CUSTOM, eventName) != SWITCH_STATUS_SUCCESS || !event) {
    switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "responseHandler unable to create event %s\n", eventName);
    return;
  }

  switch_channel_event_set_data(channel, event);
  switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "transcription-vendor", "ibm");
  switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "transcription-session-finished", finished ? "true" : "false");
  if (finished) {
    switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "responseHandler returning event %s, from finished recognition session\n", eventName);
  }
  if (json) switch_event_add_body(event, "%s", json);
  if (bugname) switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "media-bugname", bugname);
  switch_event_fire(&event);
}
// Percent-encodes `decoded` for use as a URI component.
//
// ASCII letters, digits and the characters ! ' ( ) * - . _ ~ : are copied
// unchanged; every other byte is written as %XX with two upper-case hex digits.
//
// Differences from the previous regex-based version:
//  - bytes below 0x10 are now zero-padded ("\n" -> "%0A", not "%A");
//  - '+' and ',' are now encoded; they were only let through because the
//    character class contained the accidental range "*-.";
//  - no std::regex is constructed per call or matched per character.
std::string encodeURIComponent(std::string decoded)
{
  static const char kHexDigits[] = "0123456789ABCDEF";
  static const std::string kSafePunctuation = "!'()*-._~:";

  std::string encoded;
  encoded.reserve(decoded.size() * 3);

  for (const char ch : decoded)
  {
    const unsigned char byte = static_cast<unsigned char>(ch);
    const bool isAlphaNumeric =
      (byte >= '0' && byte <= '9') ||
      (byte >= 'A' && byte <= 'Z') ||
      (byte >= 'a' && byte <= 'z');

    if (isAlphaNumeric || kSafePunctuation.find(ch) != std::string::npos)
    {
      encoded += ch;
    }
    else
    {
      encoded += '%';
      encoded += kHexDigits[byte >> 4];
      encoded += kHexDigits[byte & 0x0f];
    }
  }
  return encoded;
}
// Builds the websocket request path (including the query string) for the IBM
// recognize endpoint from channel variables, stores it in `path` and returns
// a reference to it.
//
// @param session    session whose channel variables are consulted
// @param path       receives the result
// @param sampleRate unused
// @param channels   unused
// @param language   used as the model when IBM_SPEECH_MODEL is not set
// @param interim    unused
//
// Values are appended verbatim (no URL-encoding is applied here), so channel
// variables must already be safe for use in a query string.
std::string& constructPath(switch_core_session_t* session, std::string& path,
  int sampleRate, int channels, const char* language, int interim) {
  switch_channel_t *channel = switch_core_session_get_channel(session);
  std::ostringstream oss;

  // The first parameter is introduced by '?', every later one by '&'. Tracking
  // this keeps the query well-formed even when the access token is absent.
  bool haveQuery = false;
  auto addParam = [&oss, &haveQuery](const char* name, const char* value) {
    oss << (haveQuery ? '&' : '?') << name << '=' << value;
    haveQuery = true;
  };

  // never stream a NULL char pointer: that puts the stream into a failed state
  const char* instanceId = switch_channel_get_variable(channel, "IBM_SPEECH_INSTANCE_ID");
  oss << "/instances/" << (instanceId ? instanceId : "") << "/v1/recognize";

  const char* var = nullptr;

  // access token
  if ((var = switch_channel_get_variable(channel, "IBM_ACCESS_TOKEN")) != nullptr) {
    addParam("access_token", var);
  }

  // model: explicit channel variable wins, otherwise fall back to the language
  if ((var = switch_channel_get_variable(channel, "IBM_SPEECH_MODEL")) != nullptr) {
    addParam("model", var);
  }
  else if (language) {
    addParam("model", language);
  }

  if ((var = switch_channel_get_variable(channel, "IBM_SPEECH_LANGUAGE_CUSTOMIZATION_ID")) != nullptr) {
    addParam("language_customization_id", var);
  }
  if ((var = switch_channel_get_variable(channel, "IBM_SPEECH_ACOUSTIC_CUSTOMIZATION_ID")) != nullptr) {
    addParam("acoustic_customization_id", var);
  }
  if ((var = switch_channel_get_variable(channel, "IBM_SPEECH_BASE_MODEL_VERSION")) != nullptr) {
    addParam("base_model_version", var);
  }
  if ((var = switch_channel_get_variable(channel, "IBM_SPEECH_WATSON_METADATA")) != nullptr) {
    addParam("x-watson-metadata", var);
  }
  if (switch_true(switch_channel_get_variable(channel, "IBM_SPEECH_WATSON_LEARNING_OPT_OUT"))) {
    addParam("x-watson-learning-opt-out", "true");
  }

  path = oss.str();
  return path;
}
// AudioPipe notification handler: translates connection events and received
// messages into FreeSWITCH events for the session identified by `sessionId`.
//
// @param sessionId    uuid of the session the AudioPipe belongs to
// @param event        what happened
// @param message      event text; NULL for events that carry none
// @param finished     true once the recognition session has been finished
// @param wantsInterim true when interim transcripts were requested
// @param bugname      name under which the media bug is stored on the channel
//
// Nothing happens when the session can no longer be located.
static void eventCallback(const char* sessionId, ibm::AudioPipe::NotifyEvent_t event, const char* message, bool finished, bool wantsInterim, const char* bugname) {
  switch_core_session_t* session = switch_core_session_locate(sessionId);
  if (!session) return;

  // several events are delivered with a NULL message; never hand NULL to %s,
  // strstr() or a stream
  const char* text = message ? message : "";

  bool releaseAudioPipe = false;
  switch_channel_t *channel = switch_core_session_get_channel(session);
  switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "received %s: %s\n", EventStr(event).c_str(), message ? message : "(null)");

  switch (event) {
    case ibm::AudioPipe::CONNECT_SUCCESS:
      switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "connection successful\n");
      responseHandler(session, TRANSCRIBE_EVENT_CONNECT_SUCCESS, NULL, bugname, finished);
      break;

    case ibm::AudioPipe::CONNECT_FAIL:
      {
        // first thing: we can no longer access the AudioPipe
        releaseAudioPipe = true;

        // escape quotes and backslashes so the reason cannot break the JSON body
        std::string reason;
        for (const char* p = text; *p; ++p) {
          if (*p == '"' || *p == '\\') reason += '\\';
          reason += *p;
        }
        const std::string body = "{\"reason\":\"" + reason + "\"}";

        responseHandler(session, TRANSCRIBE_EVENT_CONNECT_FAIL, body.c_str(), bugname, finished);
        switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE, "connection failed: %s\n", text);
      }
      break;

    case ibm::AudioPipe::CONNECTION_DROPPED:
      // first thing: we can no longer access the AudioPipe
      releaseAudioPipe = true;
      responseHandler(session, TRANSCRIBE_EVENT_DISCONNECT, NULL, bugname, finished);
      switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "connection dropped from far end\n");
      break;

    case ibm::AudioPipe::CONNECTION_CLOSED_GRACEFULLY:
      // first thing: we can no longer access the AudioPipe
      releaseAudioPipe = true;
      switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "connection closed gracefully\n");
      break;

    case ibm::AudioPipe::MESSAGE:
      // NOTE(review): the !wantsInterim test on the "listening" branch is kept
      // as it was; confirm whether "listening" notifications should really be
      // forwarded as results when interim transcripts are enabled.
      if (!wantsInterim && NULL != strstr(text, "\"state\": \"listening\"")) {
        switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "ibm service is listening\n");
      }
      else if (!wantsInterim && NULL != strstr(text, "\"final\": false")) {
        // Interim transcripts are suppressed only when the caller did not ask
        // for them. Previously they were dropped unconditionally, so the
        // 'interim' option never produced any interim events.
        switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "got interim transcript: %s\n", text);
      }
      else if (NULL != strstr(text, "\"error\":")) {
        responseHandler(session, TRANSCRIBE_EVENT_ERROR, text, bugname, finished);
      }
      else responseHandler(session, TRANSCRIBE_EVENT_RESULTS, text, bugname, finished);
      break;

    default:
      switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_NOTICE, "got unexpected msg from ibm %d:%s\n", event, text);
      break;
  }

  if (releaseAudioPipe) {
    // the AudioPipe is going away: drop the reference held by the media bug
    switch_media_bug_t *bug = (switch_media_bug_t*) switch_channel_get_private(channel, bugname);
    if (bug) {
      private_t* tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug);
      if (tech_pvt) tech_pvt->pAudioPipe = nullptr;
    }
  }
  switch_core_session_rwunlock(session);
}
// Initialises `tech_pvt` for a new transcription: derives the IBM host and
// path from channel variables, creates the AudioPipe and, when the call's
// sampling rate differs from the desired one, a speex resampler.
//
// @param tech_pvt        per-bug state; zeroed and filled in here
// @param session         session being transcribed
// @param sampling        sampling rate of the call audio
// @param desiredSampling sampling rate to send to the service
// @param channels        number of audio channels
// @param lang            language, used as the model when none is configured
// @param interim         non-zero to enable interim transcripts
// @param bugname         media bug name
// @return SWITCH_STATUS_SUCCESS, or SWITCH_STATUS_FALSE when a required
//         channel variable is missing, the host/path does not fit, or the
//         resampler cannot be created.
switch_status_t fork_data_init(private_t *tech_pvt, switch_core_session_t *session,
  int sampling, int desiredSampling, int channels, char *lang, int interim, char* bugname) {

  int err = 0;
  switch_codec_implementation_t read_impl;
  switch_channel_t *channel = switch_core_session_get_channel(session);

  const char* region = switch_channel_get_variable(channel, "IBM_SPEECH_REGION");
  const char* instanceId = switch_channel_get_variable(channel, "IBM_SPEECH_INSTANCE_ID");
  if (!region || !instanceId || !switch_channel_get_variable(channel, "IBM_ACCESS_TOKEN")) {
    switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR,
      "missing IBM_SPEECH_REGION or IBM_SPEECH_INSTANCE_ID or IBM_ACCESS_TOKEN\n");
    return SWITCH_STATUS_FALSE;
  }

  switch_core_session_get_read_impl(session, &read_impl);

  memset(tech_pvt, 0, sizeof(private_t));

  std::ostringstream oss;
  oss << "api." << region << ".speech-to-text.watson.cloud.ibm.com";
  std::string host = oss.str();

  std::string path;
  constructPath(session, path, desiredSampling, channels, lang, interim);
  switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "host: %s, path: %s\n", host.c_str(), path.c_str());

  // The path carries the access token, so a truncated copy could never
  // authenticate; refuse it rather than connect with a mangled request.
  if (host.length() >= (size_t) MAX_WS_URL_LEN || path.length() >= (size_t) MAX_PATH_LEN) {
    switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "host or path too long for IBM transcribe request\n");
    return SWITCH_STATUS_FALSE;
  }

  // Copy at most size-1 bytes: the structure was zeroed above, so every field
  // is guaranteed to stay NUL-terminated.
  strncpy(tech_pvt->sessionId, switch_core_session_get_uuid(session), MAX_SESSION_ID - 1);
  strncpy(tech_pvt->host, host.c_str(), MAX_WS_URL_LEN - 1);
  tech_pvt->port = 443;
  strncpy(tech_pvt->path, path.c_str(), MAX_PATH_LEN - 1);
  tech_pvt->sampling = desiredSampling;
  tech_pvt->channels = channels;
  tech_pvt->id = ++idxCallCount;
  tech_pvt->buffer_overrun_notified = 0;

  // LWS_PRE headroom plus nAudioBufferSecs seconds of audio at the desired rate
  size_t buflen = LWS_PRE + (FRAME_SIZE_8000 * desiredSampling / 8000 * channels * 1000 / RTP_PACKETIZATION_PERIOD * nAudioBufferSecs);

  ibm::AudioPipe* ap = new ibm::AudioPipe(tech_pvt->sessionId, tech_pvt->host, tech_pvt->port, tech_pvt->path,
    buflen, read_impl.decoded_bytes_per_packet, eventCallback);
  if (!ap) {
    switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error allocating AudioPipe\n");
    return SWITCH_STATUS_FALSE;
  }

  const char* access_token = switch_channel_get_variable(channel, "IBM_ACCESS_TOKEN");
  ap->setAccessToken(access_token);
  ap->setBugname(bugname);
  if (interim) ap->enableInterimTranscripts(true);

  tech_pvt->pAudioPipe = static_cast<void *>(ap);

  switch_mutex_init(&tech_pvt->mutex, SWITCH_MUTEX_NESTED, switch_core_session_get_pool(session));

  if (desiredSampling != sampling) {
    switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%u) resampling from %u to %u\n", tech_pvt->id, sampling, desiredSampling);
    tech_pvt->resampler = speex_resampler_init(channels, sampling, desiredSampling, SWITCH_RESAMPLE_QUALITY, &err);
    if (0 != err) {
      switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error initializing resampler: %s.\n", speex_resampler_strerror(err));
      // the pipe was never connected; release it here so a failed init does not leak it
      delete ap;
      tech_pvt->pAudioPipe = nullptr;
      return SWITCH_STATUS_FALSE;
    }
  }
  else {
    switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%u) no resampling needed for this call\n", tech_pvt->id);
  }

  switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%u) fork_data_init\n", tech_pvt->id);

  return SWITCH_STATUS_SUCCESS;
}
/**
 * Log sink handed to libwebsockets: forwards one log line to the FreeSWITCH
 * logger at the severity that corresponds to the lws level.
 *
 * @param level  lws log level bit (LLL_ERR, LLL_WARN, LLL_NOTICE, LLL_INFO, ...)
 * @param line   text of the log line
 */
void lws_logger(int level, const char *line) {
  // Any level without an explicit mapping is logged at DEBUG.
  switch_log_level_t llevel = SWITCH_LOG_DEBUG;

  switch (level) {
    case LLL_ERR: llevel = SWITCH_LOG_ERROR; break;
    case LLL_WARN: llevel = SWITCH_LOG_WARNING; break;
    case LLL_NOTICE: llevel = SWITCH_LOG_NOTICE; break;
    case LLL_INFO: llevel = SWITCH_LOG_INFO; break;
    default: break;
  }
  // Use the mapped level; previously every line was emitted at NOTICE.
  switch_log_printf(SWITCH_CHANNEL_LOG, llevel, "%s\n", line);
}
}
extern "C" {
/**
 * Module-wide initialisation: logs the configured audio buffer length and
 * initialises the AudioPipe layer with the libwebsockets log mask and sink.
 *
 * @return always SWITCH_STATUS_SUCCESS
 */
switch_status_t ibm_transcribe_init() {
  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "mod_ibm_transcribe: audio buffer (in secs): %d secs\n", nAudioBufferSecs);

  // Every term must be combined with bitwise OR. A logical OR anywhere in this
  // expression collapses the whole mask to 1. Remove categories from this list
  // to reduce libwebsockets verbosity.
  int logs = LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO | LLL_PARSER | LLL_HEADER | LLL_EXT | LLL_CLIENT | LLL_LATENCY | LLL_DEBUG;

  ibm::AudioPipe::initialize(logs, lws_logger);
  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "AudioPipe::initialize completed\n");

  return SWITCH_STATUS_SUCCESS;
}
/**
 * Module-wide teardown of the AudioPipe layer.
 *
 * @return SWITCH_STATUS_SUCCESS when AudioPipe::deinitialize() reports true,
 *         SWITCH_STATUS_FALSE otherwise
 */
switch_status_t ibm_transcribe_cleanup() {
  const bool tornDown = ibm::AudioPipe::deinitialize();
  return tornDown ? SWITCH_STATUS_SUCCESS : SWITCH_STATUS_FALSE;
}
/**
 * Create the per-session transcription state and start the websocket connect.
 *
 * The state is allocated from the session, initialised by fork_data_init()
 * with a target rate of 16000, handed back through ppUserData, and the
 * AudioPipe connect is kicked off before returning.
 *
 * @return SWITCH_STATUS_SUCCESS once the connect has been started,
 *         SWITCH_STATUS_FALSE if allocation or initialisation fails
 */
switch_status_t ibm_transcribe_session_init(switch_core_session_t *session,
  uint32_t samples_per_second, uint32_t channels, char* lang, int interim, char* bugname, void **ppUserData)
{
  // per-session data lives in the session's memory
  private_t* pvt = static_cast<private_t *>(switch_core_session_alloc(session, sizeof(private_t)));
  if (nullptr == pvt) {
    switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "error allocating memory!\n");
    return SWITCH_STATUS_FALSE;
  }

  const switch_status_t initStatus =
    fork_data_init(pvt, session, samples_per_second, 16000, channels, lang, interim, bugname);
  if (initStatus != SWITCH_STATUS_SUCCESS) {
    destroy_tech_pvt(pvt);
    return SWITCH_STATUS_FALSE;
  }

  *ppUserData = pvt;

  ibm::AudioPipe *pipe = static_cast<ibm::AudioPipe *>(pvt->pAudioPipe);
  switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "connecting now\n");
  pipe->connect();
  switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "connection in progress\n");

  return SWITCH_STATUS_SUCCESS;
}
/**
 * Stop transcription on a session and release its per-session state.
 *
 * @param session           session being transcribed
 * @param channelIsClosing  non-zero when called from the media bug CLOSE
 *                          callback; the bug is then not removed here
 * @param bugname           caller-supplied bug name; its channel-private slot
 *                          is cleared as well
 * @return SWITCH_STATUS_SUCCESS when the session was stopped,
 *         SWITCH_STATUS_FALSE when no bug or no user data is attached
 */
switch_status_t ibm_transcribe_session_stop(switch_core_session_t *session,int channelIsClosing, char* bugname) {
  switch_channel_t *channel = switch_core_session_get_channel(session);
  switch_media_bug_t *bug = (switch_media_bug_t*) switch_channel_get_private(channel, MY_BUG_NAME);

  if (!bug) {
    switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "ibm_transcribe_session_stop: no bug - websocket conection already closed\n");
    return SWITCH_STATUS_FALSE;
  }

  private_t* tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug);
  // Check before the first dereference (tech_pvt->id below).
  if (!tech_pvt) return SWITCH_STATUS_FALSE;

  uint32_t id = tech_pvt->id;
  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%u) ibm_transcribe_session_stop\n", id);

  // close connection and get final responses
  switch_mutex_lock(tech_pvt->mutex);

  // Clear the slot the bug was found under, before the bug is removed. Otherwise
  // the CLOSE callback triggered by the removal finds the same bug again and
  // tears the state down twice.
  switch_channel_set_private(channel, MY_BUG_NAME, NULL);
  if (bugname && *bugname) switch_channel_set_private(channel, bugname, NULL);
  if (!channelIsClosing) switch_core_media_bug_remove(session, &bug);

  ibm::AudioPipe *pAudioPipe = static_cast<ibm::AudioPipe *>(tech_pvt->pAudioPipe);
  if (pAudioPipe) {
    switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%u) ibm_transcribe_session_stop, send stop request to get final transcript\n", id);
    pAudioPipe->finish();
    tech_pvt->pAudioPipe = nullptr;
  }
  else {
    switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%u) ibm_transcribe_session_stop, null audiopipe\n", id);
  }

  // NOTE(review): the mutex is unlocked and destroyed after destroy_tech_pvt(),
  // which is only safe if destroy_tech_pvt() leaves the mutex alone - confirm.
  destroy_tech_pvt(tech_pvt);
  switch_mutex_unlock(tech_pvt->mutex);
  switch_mutex_destroy(tech_pvt->mutex);
  tech_pvt->mutex = nullptr;

  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%u) ibm_transcribe_session_stop exiting\n", id);
  return SWITCH_STATUS_SUCCESS;
}
/**
 * Media bug READ handler: drains the audio queued on the bug into the
 * AudioPipe's binary buffer, resampling first when a resampler is configured.
 *
 * Nothing is read when the session mutex cannot be taken immediately, when the
 * pipe has been detached, or when the websocket is not yet connected.
 *
 * @return always SWITCH_TRUE, so the bug stays attached
 */
switch_bool_t ibm_transcribe_frame(switch_core_session_t *session, switch_media_bug_t *bug) {
  private_t* tech_pvt = (private_t*) switch_core_media_bug_get_user_data(bug);

  if (!tech_pvt) return SWITCH_TRUE;

  // trylock: never stall the media thread if a stop is in progress
  if (switch_mutex_trylock(tech_pvt->mutex) == SWITCH_STATUS_SUCCESS) {
    if (!tech_pvt->pAudioPipe) {
      switch_mutex_unlock(tech_pvt->mutex);
      return SWITCH_TRUE;
    }
    ibm::AudioPipe *pAudioPipe = static_cast<ibm::AudioPipe *>(tech_pvt->pAudioPipe);
    if (pAudioPipe->getLwsState() != ibm::AudioPipe::LWS_CLIENT_CONNECTED) {
      switch_mutex_unlock(tech_pvt->mutex);
      return SWITCH_TRUE;
    }

    pAudioPipe->lockAudioBuffer();
    size_t available = pAudioPipe->binarySpaceAvailable();
    if (NULL == tech_pvt->resampler) {
      // No resampling: let the bug write straight into the pipe's buffer.
      switch_frame_t frame = { 0 };
      frame.data = pAudioPipe->binaryWritePtr();
      frame.buflen = available;
      while (true) {

        // check if buffer would be overwritten; dump packets if so
        if (available < pAudioPipe->binaryMinSpace()) {
          if (!tech_pvt->buffer_overrun_notified) {
            tech_pvt->buffer_overrun_notified = 1;
            responseHandler(session, TRANSCRIBE_EVENT_BUFFER_OVERRUN, NULL, tech_pvt->bugname, 0);
          }
          switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "(%u) dropping packets!\n",
            tech_pvt->id);
          pAudioPipe->binaryWritePtrResetToZero();

          frame.data = pAudioPipe->binaryWritePtr();
          frame.buflen = available = pAudioPipe->binarySpaceAvailable();
        }

        switch_status_t rv = switch_core_media_bug_read(bug, &frame, SWITCH_TRUE);
        if (rv != SWITCH_STATUS_SUCCESS) break;
        if (frame.datalen) {
          pAudioPipe->binaryWritePtrAdd(frame.datalen);
          frame.buflen = available = pAudioPipe->binarySpaceAvailable();
          frame.data = pAudioPipe->binaryWritePtr();
        }
      }
    }
    else {
      // Resampling: read into a scratch buffer, then resample into the pipe.
      uint8_t data[SWITCH_RECOMMENDED_BUFFER_SIZE];
      switch_frame_t frame = { 0 };
      frame.data = data;
      frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE;
      // speex counts interleaved samples per channel, so the capacity passed in
      // must be divided by the channel count as well as the sample size.
      const size_t nchannels = tech_pvt->channels > 0 ? static_cast<size_t>(tech_pvt->channels) : 1;
      while (switch_core_media_bug_read(bug, &frame, SWITCH_TRUE) == SWITCH_STATUS_SUCCESS) {
        if (frame.datalen) {
          // room for 2-byte samples, per channel
          spx_uint32_t out_len = static_cast<spx_uint32_t>((available >> 1) / nchannels);
          spx_uint32_t in_len = frame.samples;

          speex_resampler_process_interleaved_int(tech_pvt->resampler,
            (const spx_int16_t *) frame.data,
            (spx_uint32_t *) &in_len,
            (spx_int16_t *) ((char *) pAudioPipe->binaryWritePtr()),
            &out_len);

          if (out_len > 0) {
            // bytes written = (num samples) * (2 bytes per sample) * (num channels)
            size_t bytes_written = out_len * 2 * tech_pvt->channels;
            pAudioPipe->binaryWritePtrAdd(bytes_written);
            available = pAudioPipe->binarySpaceAvailable();
          }
          if (available < pAudioPipe->binaryMinSpace()) {
            if (!tech_pvt->buffer_overrun_notified) {
              tech_pvt->buffer_overrun_notified = 1;
              switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "(%u) dropping packets!\n",
                tech_pvt->id);
              responseHandler(session, TRANSCRIBE_EVENT_BUFFER_OVERRUN, NULL, tech_pvt->bugname, 0);
            }
            break;
          }
        }
      }
    }

    pAudioPipe->unlockAudioBuffer();
    switch_mutex_unlock(tech_pvt->mutex);
  }
  return SWITCH_TRUE;
}
}

View File

@@ -0,0 +1,11 @@
/* C-callable entry points implemented by the C++ glue layer. */
#ifndef __IBM_GLUE_H__
#define __IBM_GLUE_H__
/* Module-wide setup: initialises the AudioPipe layer. Returns SWITCH_STATUS_SUCCESS. */
switch_status_t ibm_transcribe_init();
/* Module-wide teardown: SWITCH_STATUS_SUCCESS if the AudioPipe layer deinitialised, else SWITCH_STATUS_FALSE. */
switch_status_t ibm_transcribe_cleanup();
/*
 * Allocate and initialise per-session state, return it through ppUserData and
 * start the websocket connect. Returns SWITCH_STATUS_FALSE if allocation or
 * initialisation fails.
 */
switch_status_t ibm_transcribe_session_init(switch_core_session_t *session,
uint32_t samples_per_second, uint32_t channels, char* lang, int interim, char* bugname, void **ppUserData);
/*
 * Stop transcription on the session. A non-zero channelIsClosing means the
 * media bug is already being closed, so it is not removed here. Returns
 * SWITCH_STATUS_FALSE if no media bug is attached to the channel.
 */
switch_status_t ibm_transcribe_session_stop(switch_core_session_t *session, int channelIsClosing, char* bugname);
/* Media bug READ handler: forwards queued audio to the AudioPipe. Always returns SWITCH_TRUE. */
switch_bool_t ibm_transcribe_frame(switch_core_session_t *session, switch_media_bug_t *bug);
#endif

View File

@@ -0,0 +1,223 @@
/*
*
* mod_ibm_transcribe.c -- Freeswitch module for using ibm streaming transcribe api
*
*/
#include "mod_ibm_transcribe.h"
#include "ibm_transcribe_glue.h"
/* Prototypes */
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_ibm_transcribe_shutdown);
SWITCH_MODULE_LOAD_FUNCTION(mod_ibm_transcribe_load);
SWITCH_MODULE_DEFINITION(mod_ibm_transcribe, mod_ibm_transcribe_load, mod_ibm_transcribe_shutdown, NULL);
static switch_status_t do_stop(switch_core_session_t *session, char* bugname);
/*
 * Media bug callback. READ events are handed to ibm_transcribe_frame(), CLOSE
 * stops the transcription session, INIT is only logged, and every other event
 * type is ignored. Everything except READ returns SWITCH_TRUE.
 */
static switch_bool_t capture_callback(switch_media_bug_t *bug, void *user_data, switch_abc_type_t type)
{
	switch_core_session_t *session = switch_core_media_bug_get_session(bug);

	if (type == SWITCH_ABC_TYPE_READ) {
		return ibm_transcribe_frame(session, bug);
	}

	if (type == SWITCH_ABC_TYPE_INIT) {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got SWITCH_ABC_TYPE_INIT.\n");
	} else if (type == SWITCH_ABC_TYPE_CLOSE) {
		private_t *pvt = (private_t*) switch_core_media_bug_get_user_data(bug);

		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got SWITCH_ABC_TYPE_CLOSE.\n");
		/* 1 = the channel is closing, so the stop routine must not remove the bug itself */
		ibm_transcribe_session_stop(session, 1, pvt->bugname);
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Finished SWITCH_ABC_TYPE_CLOSE.\n");
	}

	return SWITCH_TRUE;
}
/*
 * Begin transcribing a session: stop any transcription already attached,
 * pre-answer the channel, initialise the IBM speech session and attach a media
 * bug whose handle is remembered on the channel under MY_BUG_NAME.
 *
 * Returns SWITCH_STATUS_SUCCESS when the bug was added, SWITCH_STATUS_FALSE
 * when pre-answer or session initialisation fails, or the status reported by
 * switch_core_media_bug_add() when attaching the bug fails.
 */
static switch_status_t start_capture(switch_core_session_t *session, switch_media_bug_flag_t flags,
  char* lang, int interim, char* bugname)
{
	switch_channel_t *channel = switch_core_session_get_channel(session);
	switch_codec_implementation_t read_impl = { 0 };
	switch_media_bug_t *bug;
	switch_status_t rc;
	void *pUserData;
	uint32_t rate;
	uint32_t nchannels;

	if (switch_channel_get_private(channel, MY_BUG_NAME) != NULL) {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "removing bug from previous transcribe\n");
		do_stop(session, bugname);
	}

	switch_core_session_get_read_impl(session, &read_impl);

	if (switch_channel_pre_answer(channel) != SWITCH_STATUS_SUCCESS) {
		return SWITCH_STATUS_FALSE;
	}

	/* for g722 use the codec's actual sampling rate */
	if (strcasecmp(read_impl.iananame, "g722") == 0) {
		rate = read_impl.actual_samples_per_second;
	} else {
		rate = read_impl.samples_per_second;
	}
	nchannels = (flags & SMBF_STEREO) ? 2 : 1;

	if (ibm_transcribe_session_init(session, rate, nchannels, lang, interim, bugname, &pUserData) == SWITCH_STATUS_FALSE) {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error initializing ibm speech session.\n");
		return SWITCH_STATUS_FALSE;
	}

	rc = switch_core_media_bug_add(session, "ibm_transcribe", NULL, capture_callback, pUserData, 0, flags, &bug);
	if (rc != SWITCH_STATUS_SUCCESS) {
		return rc;
	}

	switch_channel_set_private(channel, MY_BUG_NAME, bug);
	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "added media bug for ibm transcribe\n");

	return SWITCH_STATUS_SUCCESS;
}
/*
 * Stop transcription if a media bug is registered on the channel under
 * MY_BUG_NAME. Returns SWITCH_STATUS_SUCCESS when there is nothing to stop,
 * otherwise the status of ibm_transcribe_session_stop().
 */
static switch_status_t do_stop(switch_core_session_t *session, char* bugname)
{
	switch_channel_t *channel = switch_core_session_get_channel(session);
	switch_media_bug_t *bug = switch_channel_get_private(channel, MY_BUG_NAME);
	switch_status_t rc;

	if (!bug) {
		return SWITCH_STATUS_SUCCESS;
	}

	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Received user command command to stop transcribe.\n");
	rc = ibm_transcribe_session_stop(session, 0, bugname);
	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "stopped transcribe.\n");

	return rc;
}
#define TRANSCRIBE_API_SYNTAX "<uuid> [start|stop] lang-code [interim] [stereo|mono]"
SWITCH_STANDARD_API(ibm_transcribe_function)
{
char *mycmd = NULL, *argv[6] = { 0 };
int argc = 0;
switch_status_t status = SWITCH_STATUS_FALSE;
switch_media_bug_flag_t flags = SMBF_READ_STREAM /* | SMBF_WRITE_STREAM | SMBF_READ_PING */;
if (!zstr(cmd) && (mycmd = strdup(cmd))) {
argc = switch_separate_string(mycmd, ' ', argv, (sizeof(argv) / sizeof(argv[0])));
}
if (zstr(cmd) ||
(!strcasecmp(argv[1], "stop") && argc < 2) ||
(!strcasecmp(argv[1], "start") && argc < 3) ||
zstr(argv[0])) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error with command %s %s %s.\n", cmd, argv[0], argv[1]);
stream->write_function(stream, "-USAGE: %s\n", TRANSCRIBE_API_SYNTAX);
goto done;
} else {
switch_core_session_t *lsession = NULL;
if ((lsession = switch_core_session_locate(argv[0]))) {
if (!strcasecmp(argv[1], "stop")) {
char *bugname = argc > 2 ? argv[2] : MY_BUG_NAME;
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "stop transcribing\n");
status = do_stop(lsession, bugname);
} else if (!strcasecmp(argv[1], "start")) {
char* lang = argv[2];
int interim = argc > 3 && !strcmp(argv[3], "interim");
char *bugname = argc > 5 ? argv[5] : MY_BUG_NAME;
if (argc > 4 && !strcmp(argv[4], "stereo")) {
flags |= SMBF_WRITE_STREAM ;
flags |= SMBF_STEREO;
}
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_INFO, "start transcribing %s %s\n", lang, interim ? "interim": "complete");
status = start_capture(lsession, flags, lang, interim, bugname);
}
switch_core_session_rwunlock(lsession);
}
}
if (status == SWITCH_STATUS_SUCCESS) {
stream->write_function(stream, "+OK Success\n");
} else {
stream->write_function(stream, "-ERR Operation Failed\n");
}
done:
switch_safe_free(mycmd);
return SWITCH_STATUS_SUCCESS;
}
/*
 * Module load hook: reserves the module's custom event subclasses, creates the
 * module interface, initialises the IBM speech glue and registers the
 * "uuid_ibm_transcribe" API with its console completions.
 *
 * Returns SWITCH_STATUS_TERM if any subclass cannot be reserved, otherwise
 * SWITCH_STATUS_SUCCESS.
 */
SWITCH_MODULE_LOAD_FUNCTION(mod_ibm_transcribe_load)
{
	/* custom event subclasses, reserved in this order */
	static const char *const subclasses[] = {
		TRANSCRIBE_EVENT_RESULTS,
		TRANSCRIBE_EVENT_NO_AUDIO_DETECTED,
		TRANSCRIBE_EVENT_VAD_DETECTED,
		TRANSCRIBE_EVENT_CONNECT_SUCCESS,
		TRANSCRIBE_EVENT_CONNECT_FAIL,
		TRANSCRIBE_EVENT_BUFFER_OVERRUN,
		TRANSCRIBE_EVENT_DISCONNECT
	};
	switch_api_interface_t *api_interface;
	size_t idx;

	/* create/register custom event message types */
	for (idx = 0; idx < sizeof(subclasses) / sizeof(subclasses[0]); idx++) {
		if (switch_event_reserve_subclass(subclasses[idx]) != SWITCH_STATUS_SUCCESS) {
			switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Couldn't register subclass %s!\n", subclasses[idx]);
			return SWITCH_STATUS_TERM;
		}
	}

	/* connect my internal structure to the blank pointer passed to me */
	*module_interface = switch_loadable_module_create_module_interface(pool, modname);

	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "IBM Speech Transcription API loading..\n");

	/* a failed init is reported but does not abort the load */
	if (ibm_transcribe_init() == SWITCH_STATUS_FALSE) {
		switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failed initializing ibm speech interface\n");
	}

	switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "IBM Speech Transcription API successfully loaded\n");

	SWITCH_ADD_API(api_interface, "uuid_ibm_transcribe", "IBM Speech Transcription API", ibm_transcribe_function, TRANSCRIBE_API_SYNTAX);
	switch_console_set_complete("add uuid_ibm_transcribe start lang-code [interim|final] [stereo|mono]");
	switch_console_set_complete("add uuid_ibm_transcribe stop ");

	/* indicate that the module should continue to be loaded */
	return SWITCH_STATUS_SUCCESS;
}
/*
 * Module shutdown hook, called when the system shuts down.
 * Macro expands to: switch_status_t mod_ibm_transcribe_shutdown()
 *
 * Tears down the IBM speech glue and frees the custom event subclasses.
 */
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_ibm_transcribe_shutdown)
{
	/* custom event subclasses, freed in this order */
	static const char *const subclasses[] = {
		TRANSCRIBE_EVENT_RESULTS,
		TRANSCRIBE_EVENT_NO_AUDIO_DETECTED,
		TRANSCRIBE_EVENT_VAD_DETECTED,
		TRANSCRIBE_EVENT_CONNECT_SUCCESS,
		TRANSCRIBE_EVENT_CONNECT_FAIL,
		TRANSCRIBE_EVENT_BUFFER_OVERRUN,
		TRANSCRIBE_EVENT_DISCONNECT
	};
	size_t idx;

	ibm_transcribe_cleanup();

	for (idx = 0; idx < sizeof(subclasses) / sizeof(subclasses[0]); idx++) {
		switch_event_free_subclass(subclasses[idx]);
	}

	return SWITCH_STATUS_SUCCESS;
}

View File

@@ -0,0 +1,44 @@
#ifndef __MOD_IBM_TRANSCRIBE_H__
#define __MOD_IBM_TRANSCRIBE_H__
#include <switch.h>
#include <speex/speex_resampler.h>
#include <unistd.h>
#define MY_BUG_NAME "ibm_transcribe"
#define TRANSCRIBE_EVENT_RESULTS "ibm_transcribe::transcription"
#define TRANSCRIBE_EVENT_NO_AUDIO_DETECTED "ibm_transcribe::no_audio_detected"
#define TRANSCRIBE_EVENT_VAD_DETECTED "ibm_transcribe::vad_detected"
#define TRANSCRIBE_EVENT_CONNECT_SUCCESS "ibm_transcribe::connect"
#define TRANSCRIBE_EVENT_CONNECT_FAIL "ibm_transcribe::connect_failed"
#define TRANSCRIBE_EVENT_BUFFER_OVERRUN "ibm_transcribe::buffer_overrun"
#define TRANSCRIBE_EVENT_DISCONNECT "ibm_transcribe::disconnect"
#define TRANSCRIBE_EVENT_ERROR "jambonz_transcribe::error"
#define MAX_LANG (12)
#define MAX_SESSION_ID (256)
#define MAX_WS_URL_LEN (1024)
#define MAX_PATH_LEN (4096)
#define MAX_BUG_LEN (64)
/*
 * Per-session transcription state. It is allocated from the session and
 * attached to the media bug as its user data.
 */
struct private_data {
	switch_mutex_t *mutex;              /* serialises the frame handler against session stop */
	char sessionId[MAX_SESSION_ID];     /* copy of the session uuid */
	SpeexResamplerState *resampler;     /* NULL when the channel already runs at the target rate */
	void *pAudioPipe;                   /* opaque ibm::AudioPipe pointer; NULL once the session is stopped */
	int ws_state;
	char host[MAX_WS_URL_LEN];          /* websocket host name */
	unsigned int port;                  /* websocket port */
	char path[MAX_PATH_LEN];            /* websocket request path */
	char bugname[MAX_BUG_LEN+1];        /* media bug name, passed along with events */
	int sampling;                       /* sample rate of the audio sent upstream */
	int channels;                       /* number of audio channels */
	unsigned int id;                    /* per-call sequence number, used in log lines */
	/*
	 * Flags are unsigned: whether a plain "int" bit-field is signed is
	 * implementation-defined, and a signed 1-bit field cannot hold the value 1.
	 */
	unsigned int buffer_overrun_notified:1; /* set once the overrun event has been raised */
	unsigned int is_finished:1;
};

typedef struct private_data private_t;
#endif

View File

@@ -0,0 +1,21 @@
#include "parser.hpp"
#include <switch.h>
/**
 * Parse an incoming message as JSON and report its "type" member.
 *
 * @param session  session used for error logging
 * @param data     raw message text
 * @param type     set to the string value of the top-level "type" member, or
 *                 to "json" when that member is absent; left untouched when
 *                 parsing fails
 * @return the parsed document, or NULL if the text is not valid JSON
 */
cJSON* parse_json(switch_core_session_t* session, const std::string& data, std::string& type) {
  cJSON* root = cJSON_Parse(data.c_str());
  if (nullptr == root) {
    switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "parse - failed parsing incoming msg as JSON: %s\n", data.c_str());
    return NULL;
  }

  const char* typeName = cJSON_GetObjectCstr(root, "type");
  type.assign(typeName ? typeName : "json");

  return root;
}

View File

@@ -0,0 +1,9 @@
#ifndef __PARSER_H__
#define __PARSER_H__
#include <string>
#include <switch_json.h>
// Parses data as JSON; sets type to the top-level "type" member, or to "json" when
// that member is absent. Returns NULL (after logging against session) on a parse failure.
cJSON* parse_json(switch_core_session_t* session, const std::string& data, std::string& type) ;
#endif

View File

@@ -0,0 +1,51 @@
/**
 * (very) simple and limited circular buffer of fixed-size chunks.
 *
 * Intended use is to do all of the adds and then subsequently the retrieves.
 * Once the buffer is full, each new chunk overwrites the oldest one. Chunks
 * are handed back oldest-first.
 */
class SimpleBuffer {
  public:
    /**
     * @param chunkSize  size of one chunk in bytes
     * @param numChunks  number of chunks the buffer can hold
     */
    SimpleBuffer(uint32_t chunkSize, uint32_t numChunks) :
      m_pData(nullptr), numItems(0), m_chunkSize(chunkSize), m_numChunks(numChunks), m_pNextWrite(nullptr) {
      // multiply in size_t so a large chunkSize * numChunks cannot wrap in 32 bits
      m_pData = new char[static_cast<size_t>(chunkSize) * numChunks];
      m_pNextWrite = m_pData;
    }

    ~SimpleBuffer() {
      delete [] m_pData;
    }

    // The buffer owns raw memory; copying it would free that memory twice.
    SimpleBuffer(const SimpleBuffer&) = delete;
    SimpleBuffer& operator=(const SimpleBuffer&) = delete;

    /**
     * Append data as whole chunks. The call is ignored when data is null, when
     * datalen is not a multiple of the chunk size, or when the buffer was
     * created with a zero chunk size or zero chunks.
     */
    void add(void *data, uint32_t datalen) {
      if (!data || m_chunkSize == 0 || m_numChunks == 0) return;
      if (datalen % m_chunkSize != 0) return;

      const uint32_t numChunks = datalen / m_chunkSize;
      const char* src = static_cast<const char*>(data);
      for (uint32_t i = 0; i < numChunks; i++) {
        memcpy(m_pNextWrite, src, m_chunkSize);
        src += m_chunkSize;
        if (numItems < m_numChunks) numItems++;

        // advance the write cursor, wrapping after the last slot
        const uint32_t offset = static_cast<uint32_t>((m_pNextWrite - m_pData) / m_chunkSize);
        if (offset >= m_numChunks - 1) m_pNextWrite = m_pData;
        else m_pNextWrite += m_chunkSize;
      }
    }

    /**
     * @return pointer to the oldest chunk not yet retrieved, or nullptr when
     *         the buffer is empty
     */
    char* getNextChunk() {
      // test before decrementing so the unsigned count can never wrap below zero
      if (numItems == 0) return nullptr;

      // the oldest chunk sits numItems slots behind the write cursor
      const uint32_t writeIdx = static_cast<uint32_t>((m_pNextWrite - m_pData) / m_chunkSize);
      const uint32_t readIdx = (writeIdx + m_numChunks - numItems) % m_numChunks;
      --numItems;
      return m_pData + static_cast<size_t>(readIdx) * m_chunkSize;
    }

    /** @return number of chunks currently stored and not yet retrieved */
    uint32_t getNumItems() { return numItems;}

  private:
    char *m_pData;          // backing storage of m_chunkSize * m_numChunks bytes
    uint32_t numItems;      // chunks stored and not yet retrieved
    uint32_t m_chunkSize;   // bytes per chunk
    uint32_t m_numChunks;   // capacity in chunks
    char* m_pNextWrite;     // slot the next add() writes into
};