[mod_verto] HEPv3 support.

This commit is contained in:
Dmitry Verenitsin
2025-06-09 15:09:41 +05:00
parent 72827d0944
commit 9ab431831f

View File

@@ -68,12 +68,36 @@ SWITCH_MODULE_DEFINITION(mod_verto, mod_verto_load, mod_verto_shutdown, mod_vert
static struct globals_s verto_globals;
static const uint8_t KS_HEPV3_PROTO_VERTO = 0x3d;
static struct {
switch_mutex_t *store_mutex;
switch_hash_t *store_hash;
} json_GLOBALS;
static struct {
ks_socket_t socket;
switch_bool_t start_on_load;
char server[256];
ks_port_t port;
uint32_t node_id;
switch_bool_t use_tls;
switch_memory_pool_t *pool;
switch_mutex_t *mutex;
int enabled;
int running; /* Thread is active, but connection may not be established. */
int accept_packets; /* Connection is active so we can fill send buffer. */
int reconnect; /* Signal capture thread to reinit the connection. */
int terminating;
switch_queue_t *queue;
switch_thread_t *thread;
} capture_globals;
typedef struct {
char *buffer;
ks_size_t buffer_size;
} capture_queue_item_t;
const char json_sql[] =
"create table json_store (\n"
@@ -580,6 +604,220 @@ static void jrpc_add_result(cJSON *obj, cJSON *result)
}
}
static void verto_hepv3_capture_send(jsock_t *jsock, char *verto_payload, ks_size_t verto_payload_size, ks_hepv3_direction_t direction)
{
char *hep_data = NULL;
ks_hepv3_capture_params_t hep_params = {0};
size_t hep_size = 0;
hep_params.direction = direction;
hep_params.ip_family = jsock->family;
hep_params.remote_ip = jsock->remote_host;
hep_params.remote_port = jsock->remote_port;
hep_params.local_ip = jsock->profile->ip->local_ip;
hep_params.local_port = jsock->profile->ip->local_port;
hep_params.capture_id = capture_globals.node_id;
hep_params.protocol_type_id = KS_HEPV3_PROTO_VERTO;
hep_params.payload_size = verto_payload_size;
hep_params.payload = verto_payload;
hep_size = ks_hepv3_capture_create(&hep_params, &hep_data);
if (hep_size > 0) {
capture_queue_item_t *capture_item = NULL;
capture_item = malloc(sizeof(capture_queue_item_t));
capture_item->buffer = hep_data;
capture_item->buffer_size = hep_size;
switch_queue_trypush(capture_globals.queue, capture_item);
}
}
/* Manages the connection to capture server. Sends queued packets.*/
static void *SWITCH_THREAD_FUNC verto_hepv3_capture_runtime(switch_thread_t *thread, void *obj)
{
static const switch_interval_time_t queue_pop_timeout_us = 100 * 1000;
static const switch_interval_time_t reconnect_loop_interval_us = 100 * 1000;
static const uint32_t reconnect_loop_max = 50;
/* Actual reconnect interval = (reconnect_loop_interval_us * reconnect_loop_max) */
static const uint32_t verto_hepv3_reconnect_s = (uint32_t)((reconnect_loop_interval_us * reconnect_loop_max) / (1000 * 1000));
uint32_t reconnect_loop = reconnect_loop_max;
ks_hepv3_socket_t *capture_socket = NULL;
ks_pool_t *pool = NULL;
capture_globals.running = 1;
ks_pool_open(&pool);
while (capture_globals.running) {
capture_queue_item_t *capture_item = NULL;
/* Disconnect if signalled */
if (capture_globals.reconnect) {
capture_globals.reconnect = 0;
if (capture_socket) {
ks_hepv3_socket_destroy(&capture_socket);
}
}
/* Reconnect on init and errors. */
if (capture_socket == NULL) {
ks_hepv3_socket_params_t hep_socket_params = {0};
ks_status_t init_status;
/* Interrupt delay between reconnections to check "running" state more frequently. */
if (++reconnect_loop < 50) {
switch_yield(reconnect_loop_interval_us);
continue;
}
reconnect_loop = 0;
switch_mutex_lock(capture_globals.mutex);
hep_socket_params.server = capture_globals.server;
hep_socket_params.port = capture_globals.port;
hep_socket_params.use_tls = (ks_bool_t)capture_globals.use_tls;
hep_socket_params.pool = pool;
init_status = ks_hepv3_socket_init(&hep_socket_params, &capture_socket);
switch_mutex_unlock(capture_globals.mutex);
if (init_status != KS_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Can't establish the capture connection. Retry in [%d] sec.\n", verto_hepv3_reconnect_s);
continue;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Capture connection established.\n");
}
}
capture_globals.accept_packets = 1;
/* Send queued packages. */
if (switch_queue_pop_timeout(capture_globals.queue, (void **)&capture_item, queue_pop_timeout_us) == SWITCH_STATUS_SUCCESS) {
if (capture_item && capture_item->buffer) {
ks_size_t size_to_send = capture_item->buffer_size;
if ((capture_item->buffer_size) && ks_hepv3_socket_write(capture_socket, capture_item->buffer, &capture_item->buffer_size) != KS_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Closing capture socket due to connection error...\n");
ks_hepv3_socket_destroy(&capture_socket);
} else if (capture_item->buffer_size != size_to_send) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Capture attempted/sent mismatch [%" SWITCH_SIZE_T_FMT "]/[%" SWITCH_SIZE_T_FMT "]\n", size_to_send, capture_item->buffer_size);
}
free(capture_item->buffer);
}
switch_safe_free(capture_item);
}
switch_yield(10);
}
capture_globals.accept_packets = 0;
if (capture_socket != NULL) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Closing capture socket...\n");
ks_hepv3_socket_destroy(&capture_socket);
}
ks_pool_close(&pool);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Capture thread stopped.\n");
return NULL;
}
static void verto_hepv3_capture_thread_run(void)
{
switch_threadattr_t *thd_attr = NULL;
if (capture_globals.thread) {
return;
}
switch_threadattr_create(&thd_attr, capture_globals.pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_thread_create(&capture_globals.thread, thd_attr, verto_hepv3_capture_runtime, NULL, capture_globals.pool);
}
static void verto_hepv3_capture_start(void)
{
if (capture_globals.enabled || capture_globals.terminating) {
return;
}
if (zstr(capture_globals.server)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Can't start capture. Server is not configured.\n");
return;
}
capture_globals.enabled = SWITCH_TRUE;
verto_hepv3_capture_thread_run();
}
static void verto_hepv3_capture_stop(void)
{
capture_queue_item_t *capture_item = NULL;
uint32_t count = 0;
switch_status_t thread_status;
if (!capture_globals.enabled) {
return;
}
capture_globals.enabled = 0;
/* Signal runtime thread to stop. */
capture_globals.running = 0;
if (capture_globals.thread) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for capture thread to stop...\n");
switch_thread_join(&thread_status, capture_globals.thread);
capture_globals.thread = NULL;
}
/* Cleanup queue */
while (switch_queue_trypop(capture_globals.queue, (void **)&capture_item) == SWITCH_STATUS_SUCCESS) {
count++;
if (capture_item && capture_item->buffer) {
switch_safe_free(capture_item->buffer);
}
switch_safe_free(capture_item);
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Cleaned up [%d] queued capture packets.\n", count);
}
static void verto_hepv3_capture_terminate(void)
{
capture_globals.terminating = 1;
if (capture_globals.enabled) {
verto_hepv3_capture_stop();
}
switch_queue_term(capture_globals.queue);
switch_mutex_destroy(capture_globals.mutex);
switch_core_destroy_memory_pool(&capture_globals.pool);
}
/* The separate init for data that we need even if the capture thread isn't active */
static void verto_hepv3_capture_init(void)
{
memset(&capture_globals, 0, sizeof(capture_globals));
capture_globals.start_on_load = SWITCH_FALSE;
capture_globals.node_id = KS_HEPV3_DEFAULT_NODE_ID;
capture_globals.port = KS_HEPV3_DEFAULT_PORT;
capture_globals.use_tls = SWITCH_FALSE;
switch_core_new_memory_pool(&capture_globals.pool);
switch_mutex_init(&capture_globals.mutex, SWITCH_MUTEX_DEFAULT, capture_globals.pool);
switch_queue_create(&capture_globals.queue, 1000, capture_globals.pool);
}
static switch_ssize_t ws_write_json(jsock_t *jsock, cJSON **json, switch_bool_t destroy)
{
char *json_text;
@@ -601,14 +839,21 @@ static switch_ssize_t ws_write_json(jsock_t *jsock, cJSON **json, switch_bool_t
}
if ((json_text = cJSON_PrintUnformatted(*json))) {
ks_size_t json_size = strlen(json_text);
if (jsock->profile->debug || verto_globals.debug) {
//char *log_text = cJSON_Prin(*json);
switch_log_printf(SWITCH_CHANNEL_LOG, verto_globals.debug_level, "WRITE %s [%s]\n", jsock->name, json_text);
//free(log_text);
}
switch_mutex_lock(jsock->write_mutex);
r = kws_write_frame(jsock->ws, WSOC_TEXT, json_text, strlen(json_text));
r = kws_write_frame(jsock->ws, WSOC_TEXT, json_text, json_size);
switch_mutex_unlock(jsock->write_mutex);
if (capture_globals.accept_packets) {
verto_hepv3_capture_send(jsock, json_text, json_size, KS_HEPV3_DIR_SEND);
}
switch_safe_free(json_text);
}
@@ -1533,9 +1778,17 @@ static switch_status_t process_input(jsock_t *jsock, uint8_t *data, switch_ssize
if (json) {
if (jsock->profile->debug || verto_globals.debug) {
if (jsock->profile->debug || verto_globals.debug || capture_globals.accept_packets) {
char *log_text = cJSON_PrintUnformatted(json);
switch_log_printf(SWITCH_CHANNEL_LOG, verto_globals.debug_level, "READ %s [%s]\n", jsock->name, log_text);
if (jsock->profile->debug || verto_globals.debug) {
switch_log_printf(SWITCH_CHANNEL_LOG, verto_globals.debug_level, "READ %s [%s]\n", jsock->name, log_text);
}
if (capture_globals.accept_packets) {
verto_hepv3_capture_send(jsock, log_text, strlen(log_text), KS_HEPV3_DIR_RECV);
}
free(log_text);
}
@@ -5035,6 +5288,8 @@ static void do_shutdown(void)
unsub_all_jsock();
verto_hepv3_capture_terminate();
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Done\n");
}
@@ -5461,6 +5716,22 @@ static switch_status_t parse_config(const char *cf)
if (val) {
verto_globals.kslog_on = switch_true(val);
}
} else if (!strcasecmp(var, "capture-server") && !zstr(val)) {
switch_snprintf(capture_globals.server, sizeof(capture_globals.server), "%s", val);
} else if (!strcasecmp(var, "capture-port") && val) {
int tmp = atoi(val);
if (tmp > 0) {
capture_globals.port = tmp;
}
} else if (!strcasecmp(var, "capture-id") && val) {
int tmp = atoi(val);
if (tmp > 0) {
capture_globals.node_id = tmp;
}
} else if (!strcasecmp(var, "capture-use-tls") && val) {
capture_globals.use_tls = switch_true(val);
} else if (!strcasecmp(var, "capture-enable") && val) {
capture_globals.start_on_load = switch_true(val);
}
}
}
@@ -5694,6 +5965,84 @@ static switch_status_t cmd_json_status(char **argv, int argc, switch_stream_hand
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t verto_hepv3_capture_set_server(const char *server, const char *str_port)
{
if (zstr(server)) {
return SWITCH_STATUS_FALSE;
}
switch_mutex_lock(capture_globals.mutex);
switch_snprintf(capture_globals.server, sizeof(capture_globals.server), "%s", server);
if (str_port) {
int tmp = atoi(str_port);
if (tmp > 0) {
capture_globals.port = (ks_port_t)tmp;
}
}
capture_globals.reconnect = 1;
switch_mutex_unlock(capture_globals.mutex);
return SWITCH_STATUS_SUCCESS;
}
static switch_status_t verto_hepv3_capture_set_id(const char *str_id)
{
if (str_id) {
int tmp = atoi(str_id);
if (tmp > 0) {
capture_globals.node_id = tmp;
return SWITCH_STATUS_SUCCESS;
}
}
return SWITCH_STATUS_FALSE;
}
static switch_status_t cmd_capture(char **argv, int argc, switch_stream_handle_t *stream)
{
switch_status_t status = SWITCH_STATUS_SUCCESS;
if (zstr(argv[0])) {
switch_mutex_lock(capture_globals.mutex);
stream->write_function(stream, "Server: %s\n", capture_globals.server);
stream->write_function(stream, "Port: %d\n", capture_globals.port);
stream->write_function(stream, "Node Id: %d\n", capture_globals.node_id);
stream->write_function(stream, "Use TLS: %s\n", capture_globals.use_tls ? "yes" : "no" );
stream->write_function(stream, "Enabled: %s\n", capture_globals.enabled ? "yes" : "no" );
stream->write_function(stream, "Queue: %d\n", switch_queue_size(capture_globals.queue));
switch_mutex_unlock(capture_globals.mutex);
return SWITCH_STATUS_SUCCESS;
}
if (!strcasecmp(argv[0], "id")) {
status = verto_hepv3_capture_set_id(argv[1]);
} else if (!strcasecmp(argv[0], "server")) {
status = verto_hepv3_capture_set_server(argv[1], argv[2]);
} else if (!strcasecmp(argv[0], "on")) {
verto_hepv3_capture_start();
} else if (!strcasecmp(argv[0], "off")) {
verto_hepv3_capture_stop();
} else {
status = SWITCH_STATUS_FALSE;
}
if (status == SWITCH_STATUS_SUCCESS) {
stream->write_function(stream, "+OK");
} else {
stream->write_function(stream, "-ERR");
}
return status;
}
SWITCH_STANDARD_API(verto_function)
{
char *argv[1024] = { 0 };
@@ -5709,6 +6058,9 @@ SWITCH_STANDARD_API(verto_function)
"verto debug [0-10]\n"
"verto perm <sessid> <type> <value>\n"
"verto noperm <sessid> <type> <value>\n"
"verto capture [on|off]\n"
"verto capture server <server> [port]\n"
"verto capture id <id>\n"
"--------------------------------------------------------------------------------\n";
if (zstr(cmd)) {
@@ -5771,6 +6123,8 @@ SWITCH_STANDARD_API(verto_function)
}
stream->write_function(stream, "Debug Level: %s\n", switch_log_level2str(verto_globals.debug_level));
goto done;
} else if (!strcasecmp(argv[0], "capture")) {
func = cmd_capture;
}
if (func) {
@@ -6814,7 +7168,9 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load)
verto_globals.enable_presence = SWITCH_TRUE;
verto_globals.enable_fs_events = SWITCH_FALSE;
verto_globals.debug_level = SWITCH_LOG_INFO;
verto_hepv3_capture_init();
switch_mutex_init(&verto_globals.mutex, SWITCH_MUTEX_NESTED, verto_globals.pool);
switch_mutex_init(&verto_globals.method_mutex, SWITCH_MUTEX_NESTED, verto_globals.pool);
@@ -6880,6 +7236,11 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load)
switch_console_set_complete("add verto debug-level");
switch_console_set_complete("add verto status");
switch_console_set_complete("add verto xmlstatus");
switch_console_set_complete("add verto capture");
switch_console_set_complete("add verto capture on");
switch_console_set_complete("add verto capture off");
switch_console_set_complete("add verto capture server");
switch_console_set_complete("add verto capture id");
SWITCH_ADD_JSON_API(json_api_interface, "store", "JSON store", json_store_function, "");
@@ -6910,6 +7271,11 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_verto_load)
run_profiles();
if (capture_globals.start_on_load) {
verto_hepv3_capture_start();
}
/* indicate that the module should continue to be loaded */
return SWITCH_STATUS_SUCCESS;
}