support single azure connection for same channel (#73)

* support single azure connection for same channel

* wip

* wip

Signed-off-by: Hoan HL <quan.luuhoang8@gmail.com>

---------

Signed-off-by: Hoan HL <quan.luuhoang8@gmail.com>
This commit is contained in:
Hoan Luu Huu
2024-06-01 17:59:24 +07:00
committed by GitHub
parent 6ad663f9d9
commit 2a94213668
3 changed files with 101 additions and 29 deletions

View File

@@ -36,7 +36,7 @@ static const char* proxyPassword = std::getenv("JAMBONES_HTTP_PROXY_PASSWORD");
class GStreamer { class GStreamer {
public: public:
GStreamer( GStreamer(
const char *sessionId, const char *sessionId,
const char *bugname, const char *bugname,
u_int16_t channels, u_int16_t channels,
char *lang, char *lang,
@@ -51,6 +51,15 @@ public:
switch_core_session_t* psession = switch_core_session_locate(sessionId); switch_core_session_t* psession = switch_core_session_locate(sessionId);
if (!psession) throw std::invalid_argument( "session id no longer active" ); if (!psession) throw std::invalid_argument( "session id no longer active" );
//Due to use_single_connection, each GStreamer need to identify itself by configuration, if there is changes in configuration,
// the GStreamer will be closed and replaced by new object with new configuration.
m_configuration_stream <<
channels << ";" <<
lang << ";" <<
interim << ";" <<
samples_per_second << ";" <<
region << ";" <<
subscriptionKey << ";";
switch_channel_t *channel = switch_core_session_get_channel(psession); switch_channel_t *channel = switch_core_session_get_channel(psession);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "GStreamer::GStreamer(%p) region %s, language %s\n", switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "GStreamer::GStreamer(%p) region %s, language %s\n",
@@ -59,6 +68,9 @@ public:
const char* endpoint = switch_channel_get_variable(channel, "AZURE_SERVICE_ENDPOINT"); const char* endpoint = switch_channel_get_variable(channel, "AZURE_SERVICE_ENDPOINT");
const char* endpointId = switch_channel_get_variable(channel, "AZURE_SERVICE_ENDPOINT_ID"); const char* endpointId = switch_channel_get_variable(channel, "AZURE_SERVICE_ENDPOINT_ID");
m_configuration_stream <<
endpoint << ";" <<
endpointId << ";";
auto sourceLanguageConfig = SourceLanguageConfig::FromLanguage(lang); auto sourceLanguageConfig = SourceLanguageConfig::FromLanguage(lang);
auto format = AudioStreamFormat::GetWaveFormatPCM(8000, 16, channels); auto format = AudioStreamFormat::GetWaveFormatPCM(8000, 16, channels);
@@ -69,6 +81,7 @@ public:
SpeechConfig::FromEndpoint(endpoint)) : SpeechConfig::FromEndpoint(endpoint)) :
SpeechConfig::FromSubscription(subscriptionKey, region); SpeechConfig::FromSubscription(subscriptionKey, region);
if (switch_true(switch_channel_get_variable(channel, "AZURE_USE_OUTPUT_FORMAT_DETAILED"))) { if (switch_true(switch_channel_get_variable(channel, "AZURE_USE_OUTPUT_FORMAT_DETAILED"))) {
m_configuration_stream << "output_format_detailed;";
speechConfig->SetOutputFormat(OutputFormat::Detailed); speechConfig->SetOutputFormat(OutputFormat::Detailed);
} }
if (nullptr != endpointId) { if (nullptr != endpointId) {
@@ -80,12 +93,18 @@ public:
speechConfig->SetProperty(PropertyId::Speech_LogFilename, sdkLog); speechConfig->SetProperty(PropertyId::Speech_LogFilename, sdkLog);
} }
if (switch_true(switch_channel_get_variable(channel, "AZURE_AUDIO_LOGGING"))) { if (switch_true(switch_channel_get_variable(channel, "AZURE_AUDIO_LOGGING"))) {
m_configuration_stream << "audio_logging;";
speechConfig->EnableAudioLogging(); speechConfig->EnableAudioLogging();
} }
if (nullptr != proxyIP && nullptr != proxyPort) { if (nullptr != proxyIP && nullptr != proxyPort) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_DEBUG, "setting proxy: %s:%s\n", proxyIP, proxyPort); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_DEBUG, "setting proxy: %s:%s\n", proxyIP, proxyPort);
speechConfig->SetProxy(proxyIP, atoi(proxyPort), proxyUsername, proxyPassword); speechConfig->SetProxy(proxyIP, atoi(proxyPort), proxyUsername, proxyPassword);
m_configuration_stream <<
proxyIP << ";" <<
proxyPort << ";" <<
proxyUsername << ";" <<
proxyPassword << ";";
} }
m_pushStream = AudioInputStream::CreatePushStream(format); m_pushStream = AudioInputStream::CreatePushStream(format);
@@ -94,6 +113,7 @@ public:
// alternative language // alternative language
const char* var; const char* var;
if (var = switch_channel_get_variable(channel, "AZURE_SPEECH_ALTERNATIVE_LANGUAGE_CODES")) { if (var = switch_channel_get_variable(channel, "AZURE_SPEECH_ALTERNATIVE_LANGUAGE_CODES")) {
m_configuration_stream << var << ";";
std::vector<std::string> languages; std::vector<std::string> languages;
char *alt_langs[3] = { 0 }; char *alt_langs[3] = { 0 };
int argc = switch_separate_string((char *) var, ',', alt_langs, 3); int argc = switch_separate_string((char *) var, ',', alt_langs, 3);
@@ -118,18 +138,22 @@ public:
// profanity options: Allowed values are "masked", "removed", and "raw". // profanity options: Allowed values are "masked", "removed", and "raw".
const char* profanity = switch_channel_get_variable(channel, "AZURE_PROFANITY_OPTION"); const char* profanity = switch_channel_get_variable(channel, "AZURE_PROFANITY_OPTION");
if (profanity) { if (profanity) {
m_configuration_stream << profanity << ";";
properties.SetProperty(PropertyId::SpeechServiceResponse_ProfanityOption, profanity); properties.SetProperty(PropertyId::SpeechServiceResponse_ProfanityOption, profanity);
} }
// report signal-to-noise ratio // report signal-to-noise ratio
if (switch_true(switch_channel_get_variable(channel, "AZURE_REQUEST_SNR"))) { if (switch_true(switch_channel_get_variable(channel, "AZURE_REQUEST_SNR"))) {
m_configuration_stream << "request_snr;";
properties.SetProperty(PropertyId::SpeechServiceResponse_RequestSnr, TrueString); properties.SetProperty(PropertyId::SpeechServiceResponse_RequestSnr, TrueString);
} }
// initial speech timeout in milliseconds // initial speech timeout in milliseconds
const char* timeout = switch_channel_get_variable(channel, "AZURE_INITIAL_SPEECH_TIMEOUT_MS"); const char* timeout = switch_channel_get_variable(channel, "AZURE_INITIAL_SPEECH_TIMEOUT_MS");
m_configuration_stream << timeout << ";";
if (timeout) properties.SetProperty(PropertyId::SpeechServiceConnection_InitialSilenceTimeoutMs, timeout); if (timeout) properties.SetProperty(PropertyId::SpeechServiceConnection_InitialSilenceTimeoutMs, timeout);
else properties.SetProperty(PropertyId::SpeechServiceConnection_InitialSilenceTimeoutMs, DEFAULT_SPEECH_TIMEOUT); else properties.SetProperty(PropertyId::SpeechServiceConnection_InitialSilenceTimeoutMs, DEFAULT_SPEECH_TIMEOUT);
const char* segmentationInterval = switch_channel_get_variable(channel, "AZURE_SPEECH_SEGMENTATION_SILENCE_TIMEOUT_MS"); const char* segmentationInterval = switch_channel_get_variable(channel, "AZURE_SPEECH_SEGMENTATION_SILENCE_TIMEOUT_MS");
m_configuration_stream << segmentationInterval << ";";
if (segmentationInterval) { if (segmentationInterval) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_DEBUG, "setting segmentation interval to %s ms\n", segmentationInterval); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_DEBUG, "setting segmentation interval to %s ms\n", segmentationInterval);
properties.SetProperty(PropertyId::Speech_SegmentationSilenceTimeoutMs, segmentationInterval); properties.SetProperty(PropertyId::Speech_SegmentationSilenceTimeoutMs, segmentationInterval);
@@ -137,6 +161,7 @@ public:
//https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-identification?tabs=once&pivots=programming-language-cpp#at-start-and-continuous-language-identification //https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-identification?tabs=once&pivots=programming-language-cpp#at-start-and-continuous-language-identification
const char* languageIdMode = switch_channel_get_variable(channel, "AZURE_LANGUAGE_ID_MODE"); const char* languageIdMode = switch_channel_get_variable(channel, "AZURE_LANGUAGE_ID_MODE");
m_configuration_stream << languageIdMode << ";";
if (languageIdMode) { if (languageIdMode) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_DEBUG, "setting SpeechServiceConnection_LanguageIdMode to %s \n", languageIdMode); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_DEBUG, "setting SpeechServiceConnection_LanguageIdMode to %s \n", languageIdMode);
properties.SetProperty(PropertyId::SpeechServiceConnection_LanguageIdMode, languageIdMode); properties.SetProperty(PropertyId::SpeechServiceConnection_LanguageIdMode, languageIdMode);
@@ -153,6 +178,7 @@ public:
// hints // hints
const char* hints = switch_channel_get_variable(channel, "AZURE_SPEECH_HINTS"); const char* hints = switch_channel_get_variable(channel, "AZURE_SPEECH_HINTS");
m_configuration_stream << hints << ";";
if (hints) { if (hints) {
auto grammar = PhraseListGrammar::FromRecognizer(m_recognizer); auto grammar = PhraseListGrammar::FromRecognizer(m_recognizer);
char *phrases[500] = { 0 }; char *phrases[500] = { 0 };
@@ -253,6 +279,10 @@ public:
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "GStreamer::~GStreamer %p\n", this); //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "GStreamer::~GStreamer %p\n", this);
} }
const char* configuration() {
return m_configuration_stream.str().c_str();
}
void connect() { void connect() {
if (m_connecting) return; if (m_connecting) return;
m_connecting = true; m_connecting = true;
@@ -304,7 +334,7 @@ public:
void finish() { void finish() {
if (m_finished) return; if (m_finished) return;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "GStreamer::finish - calling StopContinuousRecognitionAsync (%p)\n", this); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "GStreamer::finish - calling StopContinuousRecognitionAsync (%p)\n", this);
m_finished = true; m_finished = true;
m_recognizer->StopContinuousRecognitionAsync().get(); m_recognizer->StopContinuousRecognitionAsync().get();
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "GStreamer::finish - recognition has completed (%p)\n", this); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "GStreamer::finish - recognition has completed (%p)\n", this);
@@ -324,6 +354,7 @@ private:
std::string m_region; std::string m_region;
std::shared_ptr<SpeechRecognizer> m_recognizer; std::shared_ptr<SpeechRecognizer> m_recognizer;
std::shared_ptr<PushAudioInputStream> m_pushStream; std::shared_ptr<PushAudioInputStream> m_pushStream;
std::ostringstream m_configuration_stream;
responseHandler_t m_responseHandler; responseHandler_t m_responseHandler;
bool m_interim; bool m_interim;
@@ -388,6 +419,7 @@ extern "C" {
GStreamer *streamer = NULL; GStreamer *streamer = NULL;
switch_status_t status = SWITCH_STATUS_SUCCESS; switch_status_t status = SWITCH_STATUS_SUCCESS;
switch_channel_t *channel = switch_core_session_get_channel(session); switch_channel_t *channel = switch_core_session_get_channel(session);
switch_media_bug_t *bug = (switch_media_bug_t*) switch_channel_get_private(channel, bugname);
int err; int err;
switch_threadattr_t *thd_attr = NULL; switch_threadattr_t *thd_attr = NULL;
switch_memory_pool_t *pool = switch_core_session_get_pool(session); switch_memory_pool_t *pool = switch_core_session_get_pool(session);
@@ -418,15 +450,44 @@ extern "C" {
cb->responseHandler = responseHandler; cb->responseHandler = responseHandler;
cb->interim = interim;
strncpy(cb->lang, lang, MAX_LANG);
try {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "%s: initializing gstreamer with %s\n",
switch_channel_get_name(channel), bugname);
streamer = new GStreamer(sessionId, bugname, channels, lang, interim, sampleRate, cb->region, subscriptionKey, responseHandler);
cb->streamer = streamer;
if (bug) {
struct cap_cb* existing_cb = (struct cap_cb*) switch_core_media_bug_get_user_data(bug);
GStreamer* existing_streamer = (GStreamer*) existing_cb->streamer;
if (0 != strcmp(existing_streamer->configuration(), streamer->configuration())) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "azure_transcribe_session_init: stop existing azure connection, old configuration %s, new configuration %s\n",
existing_streamer->configuration(), streamer->configuration());
if (existing_streamer) reaper(existing_cb);
killcb(existing_cb);
switch_mutex_destroy(existing_cb->mutex);
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "azure_transcribe_session_init: enable existing azure connection\n");
killcb(cb);
cb = existing_cb;
status = SWITCH_STATUS_SUCCESS;
goto done;
}
}
} catch (std::exception& e) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "%s: Error initializing gstreamer: %s.\n",
switch_channel_get_name(channel), e.what());
return SWITCH_STATUS_FALSE;
}
if (switch_mutex_init(&cb->mutex, SWITCH_MUTEX_NESTED, pool) != SWITCH_STATUS_SUCCESS) { if (switch_mutex_init(&cb->mutex, SWITCH_MUTEX_NESTED, pool) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error initializing mutex\n"); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error initializing mutex\n");
status = SWITCH_STATUS_FALSE; status = SWITCH_STATUS_FALSE;
goto done; goto done;
} }
cb->interim = interim;
strncpy(cb->lang, lang, MAX_LANG);
/* determine if we need to resample the audio to 16-bit 8khz */ /* determine if we need to resample the audio to 16-bit 8khz */
if (sampleRate != 8000) { if (sampleRate != 8000) {
cb->resampler = speex_resampler_init(1, sampleRate, 8000, SWITCH_RESAMPLE_QUALITY, &err); cb->resampler = speex_resampler_init(1, sampleRate, 8000, SWITCH_RESAMPLE_QUALITY, &err);
@@ -468,34 +529,28 @@ extern "C" {
switch_channel_get_name(channel), voice_ms, mode); switch_channel_get_name(channel), voice_ms, mode);
} }
} }
if (!cb->vad) streamer->connect();
try {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "%s: initializing gstreamer with %s\n",
switch_channel_get_name(channel), bugname);
streamer = new GStreamer(sessionId, bugname, channels, lang, interim, sampleRate, cb->region, subscriptionKey, responseHandler);
cb->streamer = streamer;
if (!cb->vad) streamer->connect();
} catch (std::exception& e) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "%s: Error initializing gstreamer: %s.\n",
switch_channel_get_name(channel), e.what());
return SWITCH_STATUS_FALSE;
}
*ppUserData = cb;
done: done:
*ppUserData = cb;
cb->is_keep_alive = 0;
return status; return status;
} }
switch_status_t azure_transcribe_session_stop(switch_core_session_t *session, int channelIsClosing, char* bugname) { switch_status_t azure_transcribe_session_stop(switch_core_session_t *session, int channelIsClosing, char* bugname) {
switch_channel_t *channel = switch_core_session_get_channel(session); switch_channel_t *channel = switch_core_session_get_channel(session);
switch_media_bug_t *bug = (switch_media_bug_t*) switch_channel_get_private(channel, bugname); switch_media_bug_t *bug = (switch_media_bug_t*) switch_channel_get_private(channel, bugname);
const bool use_single_connection = switch_true(std::getenv("AZURE_SPEECH_USE_SINGLE_CONNECTION"));
if (bug) { if (bug) {
struct cap_cb *cb = (struct cap_cb *) switch_core_media_bug_get_user_data(bug); struct cap_cb *cb = (struct cap_cb *) switch_core_media_bug_get_user_data(bug);
switch_status_t st; switch_status_t st;
if (use_single_connection && !channelIsClosing) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "azure_transcribe_session_stop: call is running, use_single_connection is true, keep alive is activated\n");
cb->is_keep_alive = 1;
return SWITCH_STATUS_SUCCESS;
}
// close connection and get final responses // close connection and get final responses
switch_mutex_lock(cb->mutex); switch_mutex_lock(cb->mutex);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "azure_transcribe_session_stop: locked session\n"); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "azure_transcribe_session_stop: locked session\n");
@@ -507,6 +562,7 @@ extern "C" {
if (streamer) reaper(cb); if (streamer) reaper(cb);
killcb(cb); killcb(cb);
switch_mutex_unlock(cb->mutex); switch_mutex_unlock(cb->mutex);
switch_mutex_destroy(cb->mutex);
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "azure_transcribe_session_stop: unlocked session\n"); switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "azure_transcribe_session_stop: unlocked session\n");
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
@@ -524,6 +580,19 @@ extern "C" {
frame.data = data; frame.data = data;
frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE; frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE;
if (cb->is_keep_alive) {
// remove media bug buffered data
while (true) {
unsigned char data[SWITCH_RECOMMENDED_BUFFER_SIZE] = {0};
switch_frame_t frame = { 0 };
frame.data = data;
frame.buflen = SWITCH_RECOMMENDED_BUFFER_SIZE;
switch_status_t rv = switch_core_media_bug_read(bug, &frame, SWITCH_TRUE);
if (rv != SWITCH_STATUS_SUCCESS) break;
}
return SWITCH_TRUE;
}
if (switch_mutex_trylock(cb->mutex) == SWITCH_STATUS_SUCCESS) { if (switch_mutex_trylock(cb->mutex) == SWITCH_STATUS_SUCCESS) {
GStreamer* streamer = (GStreamer *) cb->streamer; GStreamer* streamer = (GStreamer *) cb->streamer;

View File

@@ -71,9 +71,9 @@ static switch_status_t start_capture(switch_core_session_t *session, switch_medi
switch_codec_implementation_t read_impl = { 0 }; switch_codec_implementation_t read_impl = { 0 };
void *pUserData; void *pUserData;
uint32_t samples_per_second; uint32_t samples_per_second;
int use_single_connection = switch_true(getenv("AZURE_SPEECH_USE_SINGLE_CONNECTION"));
bug = switch_channel_get_private(channel, bugname);
if (switch_channel_get_private(channel, bugname)) { if (bug && !use_single_connection) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "removing bug from previous transcribe\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "removing bug from previous transcribe\n");
do_stop(session, bugname); do_stop(session, bugname);
} }
@@ -91,12 +91,13 @@ static switch_status_t start_capture(switch_core_session_t *session, switch_medi
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error initializing azure speech session.\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error initializing azure speech session.\n");
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
if ((status = switch_core_media_bug_add(session, bugname, NULL, capture_callback, pUserData, 0, flags, &bug)) != SWITCH_STATUS_SUCCESS) { if (!bug || !use_single_connection) {
return status; if ((status = switch_core_media_bug_add(session, bugname, NULL, capture_callback, pUserData, 0, flags, &bug)) != SWITCH_STATUS_SUCCESS) {
return status;
}
switch_channel_set_private(channel, bugname, bug);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "added media bug for azure transcribe\n");
} }
switch_channel_set_private(channel, bugname, bug);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "added media bug for azure transcribe\n");
return SWITCH_STATUS_SUCCESS; return SWITCH_STATUS_SUCCESS;
} }

View File

@@ -37,6 +37,8 @@ struct cap_cb {
char lang[MAX_LANG]; char lang[MAX_LANG];
char region[MAX_REGION]; char region[MAX_REGION];
int is_keep_alive;
switch_vad_t * vad; switch_vad_t * vad;
}; };