From fd2bf5ea25bf429bed597469db8b8046fe49ecaf Mon Sep 17 00:00:00 2001 From: Maxim Devaev Date: Sun, 10 Jul 2022 05:32:53 +0300 Subject: [PATCH] separate thread for each client --- janus/src/client.c | 114 +++++++++++++++++++++++++++++++++++++++++++++ janus/src/client.h | 60 ++++++++++++++++++++++++ janus/src/plugin.c | 42 +++++------------ janus/src/queue.c | 9 +++- janus/src/queue.h | 4 +- 5 files changed, 197 insertions(+), 32 deletions(-) create mode 100644 janus/src/client.c create mode 100644 janus/src/client.h diff --git a/janus/src/client.c b/janus/src/client.c new file mode 100644 index 0000000..afd7177 --- /dev/null +++ b/janus/src/client.c @@ -0,0 +1,114 @@ +/***************************************************************************** +# # +# uStreamer - Lightweight and fast MJPEG-HTTP streamer. # +# # +# Copyright (C) 2018-2022 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#include "client.h" + + +static void *_video_thread(void *v_client); +static void *_audio_thread(void *v_client); +static void *_common_thread(void *v_client, bool video); + + +client_s *client_init(janus_callbacks *gw, janus_plugin_session *session, bool has_audio) { + client_s *client; + A_CALLOC(client, 1); + client->gw = gw; + client->session = session; + atomic_init(&client->transmit, true); + + atomic_init(&client->stop, false); + + client->video_queue = queue_init(64); + A_THREAD_CREATE(&client->video_tid, _video_thread, client); + + if (has_audio) { + client->audio_queue = queue_init(64); + A_THREAD_CREATE(&client->audio_tid, _audio_thread, client); + } + return client; +} + +void client_destroy(client_s *client) { + atomic_store(&client->stop, true); + queue_put(client->video_queue, NULL, 0); + if (client->audio_queue != NULL) { + queue_put(client->audio_queue, NULL, 0); + } + + A_THREAD_JOIN(client->video_tid); + QUEUE_FREE_ITEMS_AND_DESTROY(client->video_queue, free); + if (client->audio_queue != NULL) { + A_THREAD_JOIN(client->audio_tid); + QUEUE_FREE_ITEMS_AND_DESTROY(client->audio_queue, free); + } + free(client); +} + +void client_send(client_s *client, const rtp_s *rtp) { + if ( + !atomic_load(&client->transmit) + || (!rtp->video && client->audio_queue == NULL) + ) { + return; + } + rtp_s *new; + A_CALLOC(new, 1); + memcpy(new, rtp, sizeof(rtp_s)); + if (queue_put((new->video ? client->video_queue : client->audio_queue), new, 0) != 0) { + JLOG_ERROR("client", "Session %p %s queue is full", + client->session, (new->video ? "video" : "audio")); + free(new); + } +} + +static void *_video_thread(void *v_client) { + return _common_thread(v_client, true); +} + +static void *_audio_thread(void *v_client) { + return _common_thread(v_client, false); +} + +static void *_common_thread(void *v_client, bool video) { + client_s *client = (client_s *)v_client; + queue_s *queue = (video ? client->video_queue : client->audio_queue); + assert(queue != NULL); // Audio may be NULL + + while (!atomic_load(&client->stop)) { + rtp_s *rtp; + if (!queue_get(queue, (void **)&rtp, 0.1)) { + if (rtp == NULL) { + break; + } + if (atomic_load(&client->transmit)) { + janus_plugin_rtp packet = {0}; + packet.video = rtp->video; + packet.buffer = (char *)rtp->datagram; + packet.length = rtp->used; + janus_plugin_rtp_extensions_reset(&packet.extensions); + client->gw->relay_rtp(client->session, &packet); + } + free(rtp); + } + } + return NULL; +} diff --git a/janus/src/client.h b/janus/src/client.h new file mode 100644 index 0000000..de0610c --- /dev/null +++ b/janus/src/client.h @@ -0,0 +1,60 @@ +/***************************************************************************** +# # +# uStreamer - Lightweight and fast MJPEG-HTTP streamer. # +# # +# Copyright (C) 2018-2022 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#pragma once + +#include +#include +#include +#include + +#include +#include + +#include "uslibs/tools.h" +#include "uslibs/threading.h" +#include "uslibs/list.h" + +#include "queue.h" +#include "rtp.h" + + +typedef struct client_sx { + janus_callbacks *gw; + janus_plugin_session *session; + atomic_bool transmit; + + pthread_t video_tid; + pthread_t audio_tid; + atomic_bool stop; + + queue_s *video_queue; + queue_s *audio_queue; + + LIST_STRUCT(struct client_sx); +} client_s; + + +client_s *client_init(janus_callbacks *gw, janus_plugin_session *session, bool has_audio); +void client_destroy(client_s *client); + +void client_send(client_s *client, const rtp_s *rtp); diff --git a/janus/src/plugin.c b/janus/src/plugin.c index 1abbda2..792ec74 100644 --- a/janus/src/plugin.c +++ b/janus/src/plugin.c @@ -44,22 +44,16 @@ #include "const.h" #include "logging.h" +#include "client.h" #include "audio.h" #include "tc358743.h" +#include "rtp.h" #include "rtpv.h" #include "rtpa.h" #include "memsinkfd.h" #include "config.h" -typedef struct _client_sx { - janus_plugin_session *session; - bool transmit; - - LIST_STRUCT(struct _client_sx); -} _client_s; - - static janus_plugin *_g_plugin = NULL; static char *_g_video_sink_name = NULL; @@ -68,7 +62,7 @@ static char *_g_tc358743_dev_path = NULL; const useconds_t _g_watchers_polling = 100000; -static _client_s *_g_clients = NULL; +static client_s *_g_clients = NULL; static janus_callbacks *_g_gw = NULL; static rtpv_s *_g_rtpv = NULL; static rtpa_s *_g_rtpa = NULL; @@ -135,9 +129,7 @@ static void *_clients_video_thread(UNUSED void *arg) { if (memsink_fd_get_frame(fd, mem, frame, &frame_id) != 0) { goto close_memsink; } - LOCK; rtpv_wrap(_g_rtpv, frame); - UNLOCK; } else if (result == -1) { goto close_memsink; } @@ -205,9 +197,7 @@ static void *_clients_audio_thread(UNUSED void *arg) { uint64_t pts; int result = audio_get_encoded(audio, data, &size, &pts); if (result == 0) { - LOCK; rtpa_wrap(_g_rtpa, data, size, pts); - UNLOCK; } else if (result == -1) { goto close_audio; } @@ -225,16 +215,11 @@ static void *_clients_audio_thread(UNUSED void *arg) { #undef IF_NOT_REPORTED static void _relay_rtp_clients(const rtp_s *rtp) { - janus_plugin_rtp packet = {0}; - packet.video = rtp->video; - packet.buffer = (char *)rtp->datagram; - packet.length = rtp->used; - janus_plugin_rtp_extensions_reset(&packet.extensions); + LOCK; LIST_ITERATE(_g_clients, client, { - if (client->transmit) { - _g_gw->relay_rtp(client->session, &packet); - } + client_send(client, rtp); }); + UNLOCK; } static int _plugin_init(janus_callbacks *gw, const char *config_dir_path) { @@ -278,7 +263,7 @@ static void _plugin_destroy(void) { LIST_ITERATE(_g_clients, client, { LIST_REMOVE(_g_clients, client); - free(client); + client_destroy(client); }); _g_clients = NULL; @@ -298,10 +283,7 @@ static void _plugin_create_session(janus_plugin_session *session, int *err) { IF_DISABLED({ *err = -1; return; }); LOCK; JLOG_INFO("main", "Creating session %p ...", session); - _client_s *client; - A_CALLOC(client, 1); - client->session = session; - client->transmit = true; + client_s *client = client_init(_g_gw, session, (_g_audio_dev_name != NULL)); LIST_APPEND(_g_clients, client); atomic_store(&_g_has_watchers, true); UNLOCK; @@ -316,10 +298,10 @@ static void _plugin_destroy_session(janus_plugin_session* session, int *err) { if (client->session == session) { JLOG_INFO("main", "Removing session %p ...", session); LIST_REMOVE(_g_clients, client); - free(client); + client_destroy(client); found = true; } else { - has_watchers = (has_watchers || client->transmit); + has_watchers = (has_watchers || atomic_load(&client->transmit)); } }); if (!found) { @@ -351,11 +333,11 @@ static void _set_transmit(janus_plugin_session *session, UNUSED const char *msg, bool has_watchers = false; LIST_ITERATE(_g_clients, client, { if (client->session == session) { - client->transmit = transmit; + atomic_store(&client->transmit, transmit); // JLOG_INFO("main", "%s session %p", msg, session); found = true; } - has_watchers = (has_watchers || client->transmit); + has_watchers = (has_watchers || atomic_load(&client->transmit)); }); if (!found) { JLOG_WARN("main", "No session %p", session); diff --git a/janus/src/queue.c b/janus/src/queue.c index 8c3d2f5..cf32e4e 100644 --- a/janus/src/queue.c +++ b/janus/src/queue.c @@ -60,7 +60,14 @@ void queue_destroy(queue_s *queue) { int queue_put(queue_s *queue, void *item, long double timeout) { A_MUTEX_LOCK(&queue->mutex); - WAIT_OR_UNLOCK(queue->size == queue->capacity, &queue->full_cond); + if (timeout == 0) { + if (queue->size == queue->capacity) { + A_MUTEX_UNLOCK(&queue->mutex); + return -1; + } + } else { + WAIT_OR_UNLOCK(queue->size == queue->capacity, &queue->full_cond); + } queue->items[queue->in] = item; ++queue->size; ++queue->in; diff --git a/janus/src/queue.h b/janus/src/queue.h index 6981574..e60f2fb 100644 --- a/janus/src/queue.h +++ b/janus/src/queue.h @@ -51,7 +51,9 @@ typedef struct { while (!queue_get_free(_queue)) { \ void *_ptr; \ assert(!queue_get(_queue, &_ptr, 0.1)); \ - _free_item(_ptr); \ + if (_ptr != NULL) { \ + _free_item(_ptr); \ + } \ } \ queue_destroy(_queue); \ }