Feat/mark bidirectional streaming (#102)

* initial support for mark feature in bidirectional streaming

Signed-off-by: Dave Horton <daveh@beachdognet.com>

* wip

* allow max of 30 marks on any connection

* fix send multiple json in same ws text frame (#108)

* fix send multiple json in same ws text frame

Signed-off-by: Hoan HL <quan.luuhoang8@gmail.com>

* fix mark is not sent without more bidirectional audio

Signed-off-by: Hoan HL <quan.luuhoang8@gmail.com>

---------

Signed-off-by: Hoan HL <quan.luuhoang8@gmail.com>

---------

Signed-off-by: Dave Horton <daveh@beachdognet.com>
Signed-off-by: Hoan HL <quan.luuhoang8@gmail.com>
Co-authored-by: Hoan Luu Huu <110280845+xquanluu@users.noreply.github.com>
This commit is contained in:
Dave Horton
2024-09-04 16:10:41 +01:00
committed by GitHub
parent d01991ed0f
commit 4ee08a310a
4 changed files with 218 additions and 17 deletions

View File

@@ -227,20 +227,21 @@ int AudioPipe::lws_callback(struct lws *wsi,
// check for text frames to send
{
std::lock_guard<std::mutex> lk(ap->m_text_mutex);
if (ap->m_metadata.length() > 0) {
uint8_t buf[ap->m_metadata.length() + LWS_PRE];
memcpy(buf + LWS_PRE, ap->m_metadata.c_str(), ap->m_metadata.length());
int n = ap->m_metadata.length();
if (!ap->m_metadata_list.empty()) {
const std::string& message = ap->m_metadata_list.front();
uint8_t buf[message.length() + LWS_PRE];
memcpy(buf + LWS_PRE, message.c_str(), message.length());
int n = message.length();
int m = lws_write(wsi, buf + LWS_PRE, n, LWS_WRITE_TEXT);
ap->m_metadata.clear();
if (m < n) {
return -1;
return -1; // Failed to send the full message
}
// there may be audio data, but only one write per writeable event
// get it next time
// Remove the message that was successfully sent
ap->m_metadata_list.pop_front();
// Request another writable event if there are more messages
lws_callback_on_writable(wsi);
return 0;
}
}
@@ -529,7 +530,7 @@ void AudioPipe::bufferForSending(const char* text) {
if (m_state != LWS_CLIENT_CONNECTED) return;
{
std::lock_guard<std::mutex> lk(m_text_mutex);
m_metadata.append(text);
m_metadata_list.emplace_back(text);
}
addPendingWrite(this);
}

View File

@@ -130,7 +130,7 @@ namespace drachtio {
std::string m_bugname;
unsigned int m_port;
std::string m_path;
std::string m_metadata;
std::list<std::string> m_metadata_list;
std::mutex m_text_mutex;
std::mutex m_audio_mutex;
int m_sslFlags;

View File

@@ -27,6 +27,8 @@ typedef boost::circular_buffer<uint16_t> CircularBuffer_t;
#define RTP_PACKETIZATION_PERIOD 20
#define FRAME_SIZE_8000 320 /*which means each 20ms frame as 320 bytes at 8 khz (1 channel only)*/
#define BUFFER_GROW_SIZE (16384)
#define AUDIO_MARKER 0xFFFF
#define MAX_MARKS (30)
namespace {
static const char *requestedBufferSecs = std::getenv("MOD_AUDIO_FORK_BUFFER_SECS");
@@ -38,6 +40,15 @@ namespace {
static unsigned int idxCallCount = 0;
static uint32_t playCount = 0;
static bool markCountExceeded(private_t* tech_pvt) {
if (nullptr != tech_pvt->pVecMarksInUse) {
std::deque<std::string>* pVecMarksInUse = static_cast<std::deque<std::string>*>(tech_pvt->pVecMarksInUse);
std::deque<std::string>* pVecMarksInInventory = static_cast<std::deque<std::string>*>(tech_pvt->pVecMarksInInventory);
return pVecMarksInUse->size()+ pVecMarksInInventory->size() >= MAX_MARKS;
}
return false;
}
switch_status_t processIncomingBinary(private_t* tech_pvt, switch_core_session_t* session, const char* message, size_t dataLength) {
std::vector<uint8_t> data;
@@ -65,12 +76,30 @@ namespace {
// Access the prebuffer
CircularBuffer_t* cBuffer = static_cast<CircularBuffer_t*>(tech_pvt->streamingPreBuffer);
int numMarkers = 0;
std::deque<std::string>* pVecMarksInInventory = nullptr;
std::deque<std::string>* pVecMarksInUse = nullptr;
if (nullptr != tech_pvt->pVecMarksInInventory) {
pVecMarksInInventory = static_cast<std::deque<std::string>*>(tech_pvt->pVecMarksInInventory);
pVecMarksInUse = static_cast<std::deque<std::string>*>(tech_pvt->pVecMarksInUse);
numMarkers = pVecMarksInInventory->size();
// move inventory to in-use
pVecMarksInUse->insert(pVecMarksInUse->end(), pVecMarksInInventory->begin(), pVecMarksInInventory->end());
pVecMarksInInventory->clear();
}
// Ensure the prebuffer has enough capacity
if (cBuffer->capacity() - cBuffer->size() < numSamples) {
size_t newCapacity = cBuffer->size() + std::max(numSamples, (size_t)BUFFER_GROW_SIZE);
if (cBuffer->capacity() - cBuffer->size() < numSamples + numMarkers) {
size_t newCapacity = cBuffer->size() + std::max(numSamples + numMarkers, (size_t)BUFFER_GROW_SIZE);
cBuffer->set_capacity(newCapacity);
}
// prepend any markers
while (numMarkers-- > 0) {
cBuffer->push_back(AUDIO_MARKER);
}
// Append the data to the prebuffer
cBuffer->insert(cBuffer->end(), data_uint16, data_uint16 + numSamples);
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Appended %zu 16-bit samples to the prebuffer.\n", numSamples);
@@ -269,6 +298,63 @@ namespace {
// this will dump buffered incoming audio
tech_pvt->clear_bidirectional_audio_buffer = true;
}
else if (0 == type.compare("mark")) {
cJSON* data = cJSON_GetObjectItem(json, "data");
if (data) {
cJSON* name = cJSON_GetObjectItem(data, "name");
if (cJSON_IsString(name) && name->valuestring) {
if (markCountExceeded(tech_pvt)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "(%u) processIncomingMessage - mark count exceeded, discarding mark %s\n", tech_pvt->id, cJSON_GetStringValue(name));
}
else {
if (nullptr == tech_pvt->pVecMarksInInventory) {
tech_pvt->pVecMarksInInventory = static_cast<void *>(new std::deque<std::string>());
tech_pvt->pVecMarksInUse = static_cast<void *>(new std::deque<std::string>());
tech_pvt->pVecMarksCleared = static_cast<void *>(new std::deque<std::string>());
}
std::deque<std::string>* pVec = static_cast<std::deque<std::string>*>(tech_pvt->pVecMarksInInventory);
pVec->push_back(name->valuestring);
}
}
}
}
else if (0 == type.compare("clearMarks")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%u) processIncomingMessage - received clearMarks\n", tech_pvt->id);
if (nullptr != tech_pvt->pVecMarksInInventory) {
std::deque<std::string>* pVecMarksInInventory = static_cast<std::deque<std::string>*>(tech_pvt->pVecMarksInInventory);
std::deque<std::string>* pVecMarksInUse = static_cast<std::deque<std::string>*>(tech_pvt->pVecMarksInUse);
std::deque<std::string>* pVecMarksCleared = static_cast<std::deque<std::string>*>(tech_pvt->pVecMarksCleared);
pVecMarksCleared->insert(pVecMarksCleared->end(), pVecMarksInUse->begin(), pVecMarksInUse->end());
pVecMarksCleared->insert(pVecMarksCleared->end(), pVecMarksInInventory->begin(), pVecMarksInInventory->end());
pVecMarksInInventory->clear();
pVecMarksInUse->clear();
}
}
else if (0 == type.compare("transcription")) {
char* jsonString = cJSON_PrintUnformatted(jsonData);
tech_pvt->responseHandler(session, EVENT_TRANSCRIPTION, jsonString);
free(jsonString);
}
else if (0 == type.compare("transfer")) {
char* jsonString = cJSON_PrintUnformatted(jsonData);
tech_pvt->responseHandler(session, EVENT_TRANSFER, jsonString);
free(jsonString);
}
else if (0 == type.compare("disconnect")) {
char* jsonString = cJSON_PrintUnformatted(jsonData);
tech_pvt->responseHandler(session, EVENT_DISCONNECT, jsonString);
free(jsonString);
}
else if (0 == type.compare("error")) {
char* jsonString = cJSON_PrintUnformatted(jsonData);
tech_pvt->responseHandler(session, EVENT_ERROR, jsonString);
free(jsonString);
}
else if (0 == type.compare("json")) {
char* jsonString = cJSON_PrintUnformatted(json);
tech_pvt->responseHandler(session, EVENT_JSON, jsonString);
free(jsonString);
}
else if (0 == type.compare("transcription")) {
char* jsonString = cJSON_PrintUnformatted(jsonData);
tech_pvt->responseHandler(session, EVENT_TRANSCRIPTION, jsonString);
@@ -400,6 +486,9 @@ namespace {
}
tech_pvt->streamingPreBufSize = 320 * tech_pvt->downscale_factor * 4; // min 80ms prebuffer
tech_pvt->streamingPreBuffer = (void *) new CircularBuffer_t(8192);
tech_pvt->pVecMarksInInventory = nullptr;
tech_pvt->pVecMarksInUse = nullptr;
tech_pvt->pVecMarksCleared = nullptr;
strncpy(tech_pvt->bugname, bugname, MAX_BUG_LEN);
if (metadata) strncpy(tech_pvt->initialMetadata, metadata, MAX_METADATA_LEN);
@@ -467,6 +556,29 @@ namespace {
delete cBuffer;
tech_pvt->streamingPreBuffer = nullptr;
}
if (nullptr == tech_pvt->pVecMarksInInventory) {
delete static_cast<std::deque<std::string>*>(tech_pvt->pVecMarksInInventory);
tech_pvt->pVecMarksInInventory = nullptr;
delete static_cast<std::deque<std::string>*>(tech_pvt->pVecMarksInUse);
tech_pvt->pVecMarksInUse = nullptr;
delete static_cast<std::deque<std::string>*>(tech_pvt->pVecMarksCleared);
tech_pvt->pVecMarksCleared = nullptr;
}
}
static void send_mark_event(private_t* tech_pvt, const char* name, int cleared = false) {
drachtio::AudioPipe *pAudioPipe = static_cast<drachtio::AudioPipe *>(tech_pvt->pAudioPipe);
std::ostringstream json;
json << "{\"type\": \"mark\", \"data\": {\"name\":\"" << name << "\", ";
if (cleared) json << "\"event\": \"cleared\"}}";
else json << "\"event\": \"playout\"}}";
if (pAudioPipe) {
std::string str = json.str();
pAudioPipe->bufferForSending(str.c_str());
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%u) send_mark_event: %s\n", tech_pvt->id, str.c_str());
}
}
void lws_logger(int level, const char *line) {
@@ -826,6 +938,28 @@ extern "C" {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%u) dub_speech_frame - clearing buffer\n", tech_pvt->id);
cBuffer->clear();
tech_pvt->clear_bidirectional_audio_buffer = false;
// send "mark" event for any queued markers
if (nullptr != tech_pvt->pVecMarksInInventory) {
std::deque<std::string>* pVecMarksInInventory = static_cast<std::deque<std::string>*>(tech_pvt->pVecMarksInInventory);
std::deque<std::string>* pVecMarksInUse = static_cast<std::deque<std::string>*>(tech_pvt->pVecMarksInUse);
if (pVecMarksInInventory->size() + pVecMarksInUse->size() > 0) {
std::deque<std::string> vec = *pVecMarksInUse;
vec.insert(vec.end(), pVecMarksInInventory->begin(), pVecMarksInInventory->end());
for (auto it = vec.begin(); it != vec.end(); ++it) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "(%u) dub_speech_frame - Marker %s cleared\n",
tech_pvt->id, it->c_str());
send_mark_event(tech_pvt, it->c_str(), true);
}
// put the "in-use" ones into the "cleared" queue so we dont notify again when they eventually come through
std::deque<std::string>* pVecMarksCleared = static_cast<std::deque<std::string>*>(tech_pvt->pVecMarksCleared);
pVecMarksCleared->insert(pVecMarksCleared->end(), pVecMarksInUse->begin(), pVecMarksInUse->end());
pVecMarksInUse->clear();
pVecMarksInInventory->clear();
}
}
}
else {
switch_frame_t* rframe = switch_core_media_bug_get_write_replace_frame(bug);
@@ -841,11 +975,72 @@ extern "C" {
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%u) dub_speech_frame - samples to copy %u\n", tech_pvt->id, samplesToCopy);
bool hasMarkers = false;
std::deque<std::string>* pVecInventory = nullptr;
std::deque<std::string>* pVecInUse = nullptr;
std::deque<std::string>* pVecCleared = nullptr;
if (nullptr != tech_pvt->pVecMarksInUse) {
pVecInventory = static_cast<std::deque<std::string>*>(tech_pvt->pVecMarksInInventory);
pVecInUse = static_cast<std::deque<std::string>*>(tech_pvt->pVecMarksInUse);
pVecCleared = static_cast<std::deque<std::string>*>(tech_pvt->pVecMarksCleared);
hasMarkers = pVecInUse->size() + pVecCleared->size() > 0;
}
if (hasMarkers) {
/* discard markers and send notifications */
auto bufferIter = cBuffer->begin();
auto dataIter = data;
for (int i = 0; i < samplesToCopy; ++i) {
if (*bufferIter == AUDIO_MARKER) {
// Marker detected, discard it and send a notice unless it was previously cleared
auto * pVec = pVecCleared->size() > 0 ? pVecCleared : pVecInUse;
if (!pVec->empty()) {
auto name = pVec->front();
pVec->pop_front();
if (pVec == pVecInUse) {
send_mark_event(tech_pvt, name.c_str());
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%u) dub_speech_frame - Marker %s detected in playout\n",
tech_pvt->id, name.c_str());
}
else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%u) dub_speech_frame - Marker %s detected in playout but previously cleared\n",
tech_pvt->id, name.c_str());
}
}
} else {
// Copy valid audio samplewhat
*dataIter = *bufferIter;
++dataIter;
}
++bufferIter;
}
// Remove the processed samples (including discarded markers) from the buffer
cBuffer->erase(cBuffer->begin(), cBuffer->begin() + samplesToCopy);
// Adjust the number of samples copied to the output frame
int validSamplesCopied = std::distance(data, dataIter);
if (validSamplesCopied > 0) {
vector_add(fp, data, validSamplesCopied);
}
}
else {
std::copy_n(cBuffer->begin(), samplesToCopy, data);
cBuffer->erase(cBuffer->begin(), cBuffer->begin() + samplesToCopy);
if (samplesToCopy > 0) {
vector_add(fp, data, rframe->samples);
} else if (pVecInventory != nullptr && pVecInventory->size()) {
// no bidirectional audio to dub but still have some mark in inventory, send them now
auto name = pVecInventory->front();
pVecInventory->pop_front();
send_mark_event(tech_pvt, name.c_str());
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "(%u) dub_speech_frame - Marker %s detected in inventory\n",
tech_pvt->id, name.c_str());
}
}
vector_normalize(fp, rframe->samples);

View File

@@ -53,6 +53,11 @@ struct private_data {
int graceful_shutdown:1;
char initialMetadata[8192];
// for "mark" feature of bidirectional audio
void *pVecMarksInInventory;
void *pVecMarksInUse;
void *pVecMarksCleared;
// bidirectional audio
void *streamingPlayoutBuffer;
void *streamingPreBuffer;