From 20f056668f99f139e2c7fe49059e74b33793824d Mon Sep 17 00:00:00 2001 From: Maxim Devaev Date: Wed, 13 Jul 2022 06:41:04 +0300 Subject: [PATCH] separate memsink and rtp threads --- janus/src/memsinkfd.c | 16 +++++++--- janus/src/memsinkfd.h | 2 +- janus/src/plugin.c | 71 +++++++++++++++++++++++++++---------------- 3 files changed, 56 insertions(+), 33 deletions(-) diff --git a/janus/src/memsinkfd.c b/janus/src/memsinkfd.c index 4932ca1..7041c13 100644 --- a/janus/src/memsinkfd.c +++ b/janus/src/memsinkfd.c @@ -46,19 +46,25 @@ int memsink_fd_wait_frame(int fd, memsink_shared_s* mem, uint64_t last_id) { return -2; } -int memsink_fd_get_frame(int fd, memsink_shared_s *mem, frame_s *frame, uint64_t *frame_id) { +frame_s *memsink_fd_get_frame(int fd, memsink_shared_s *mem, uint64_t *frame_id) { + frame_s *frame = frame_init(); frame_set_data(frame, mem->data, mem->used); FRAME_COPY_META(mem, frame); *frame_id = mem->id; mem->last_client_ts = get_now_monotonic(); - int retval = 0; + + bool ok = true; if (frame->format != V4L2_PIX_FMT_H264) { JLOG_ERROR("video", "Got non-H264 frame from memsink"); - retval = -1; + ok = false; } if (flock(fd, LOCK_UN) < 0) { JLOG_PERROR("video", "Can't unlock memsink"); - retval = -1; + ok = false; } - return retval; + if (!ok) { + frame_destroy(frame); + frame = NULL; + } + return frame; } diff --git a/janus/src/memsinkfd.h b/janus/src/memsinkfd.h index 552c0d0..ee2804c 100644 --- a/janus/src/memsinkfd.h +++ b/janus/src/memsinkfd.h @@ -35,4 +35,4 @@ int memsink_fd_wait_frame(int fd, memsink_shared_s* mem, uint64_t last_id); -int memsink_fd_get_frame(int fd, memsink_shared_s *mem, frame_s *frame, uint64_t *frame_id); +frame_s *memsink_fd_get_frame(int fd, memsink_shared_s *mem, uint64_t *frame_id); diff --git a/janus/src/plugin.c b/janus/src/plugin.c index e26ba78..e6f5283 100644 --- a/janus/src/plugin.c +++ b/janus/src/plugin.c @@ -44,6 +44,7 @@ #include "const.h" #include "logging.h" +#include "queue.h" #include "client.h" #include "audio.h" #include "tc358743.h" @@ -64,11 +65,14 @@ const useconds_t _g_watchers_polling = 100000; static client_s *_g_clients = NULL; static janus_callbacks *_g_gw = NULL; +static queue_s *_g_video_queue = NULL; static rtpv_s *_g_rtpv = NULL; static rtpa_s *_g_rtpa = NULL; -static pthread_t _g_video_tid; -static atomic_bool _g_video_tid_created = false; +static pthread_t _g_video_rtp_tid; +static atomic_bool _g_video_rtp_tid_created = false; +static pthread_t _g_video_sink_tid; +static atomic_bool _g_video_sink_tid_created = false; static pthread_t _g_audio_tid; static atomic_bool _g_audio_tid_created = false; @@ -98,14 +102,28 @@ static atomic_bool _g_has_watchers = false; if (error_reported != _error_code) { __VA_ARGS__; error_reported = _error_code; } \ } -static void *_video_thread(UNUSED void *arg) { - A_THREAD_RENAME("us_video"); - atomic_store(&_g_video_tid_created, true); +static void *_video_rtp_thread(UNUSED void *arg) { + A_THREAD_RENAME("us_video_rtp"); + atomic_store(&_g_video_rtp_tid_created, true); + + while (!STOP) { + frame_s *frame; + if (queue_get(_g_video_queue, (void **)&frame, 0.1) == 0) { + LOCK_VIDEO; + rtpv_wrap(_g_rtpv, frame); + UNLOCK_VIDEO; + frame_destroy(frame); + } + } + return NULL; +} + +static void *_video_sink_thread(UNUSED void *arg) { + A_THREAD_RENAME("us_video_sink"); + atomic_store(&_g_video_sink_tid_created, true); atomic_store(&_g_ready, true); - frame_s *frame = frame_init(); uint64_t frame_id = 0; - unsigned error_reported = 0; while (!STOP) { @@ -134,12 +152,14 @@ static void *_video_thread(UNUSED void *arg) { while (!STOP && HAS_WATCHERS) { int result = memsink_fd_wait_frame(fd, mem, frame_id); if (result == 0) { - if (memsink_fd_get_frame(fd, mem, frame, &frame_id) != 0) { + frame_s *frame = memsink_fd_get_frame(fd, mem, &frame_id); + if (frame == NULL) { goto close_memsink; } - LOCK_VIDEO; - rtpv_wrap(_g_rtpv, frame); - UNLOCK_VIDEO; + if (queue_put(_g_video_queue, frame, 0) != 0) { + IF_NOT_REPORTED({ JLOG_PERROR("video", "Video queue is full"); }); + frame_destroy(frame); + } } else if (result == -1) { goto close_memsink; } @@ -157,8 +177,6 @@ static void *_video_thread(UNUSED void *arg) { } sleep(1); // error_delay } - - frame_destroy(frame); return NULL; } @@ -240,10 +258,6 @@ static int _plugin_init(janus_callbacks *gw, const char *config_dir_path) { // sysctl -w net.core.wmem_max=1000000 JLOG_INFO("main", "Initializing plugin ..."); - assert(!atomic_load(&_g_video_tid_created)); - assert(!atomic_load(&_g_audio_tid_created)); - assert(!READY); - assert(!STOP); if (gw == NULL || config_dir_path == NULL || read_config(config_dir_path, &_g_video_sink_name, &_g_audio_dev_name, @@ -252,35 +266,38 @@ static int _plugin_init(janus_callbacks *gw, const char *config_dir_path) { return -1; } _g_gw = gw; + + _g_video_queue = queue_init(1024); _g_rtpv = rtpv_init(_relay_rtp_clients); if (_g_audio_dev_name) { _g_rtpa = rtpa_init(_relay_rtp_clients); A_THREAD_CREATE(&_g_audio_tid, _audio_thread, NULL); } - A_THREAD_CREATE(&_g_video_tid, _video_thread, NULL); + A_THREAD_CREATE(&_g_video_rtp_tid, _video_rtp_thread, NULL); + A_THREAD_CREATE(&_g_video_sink_tid, _video_sink_thread, NULL); return 0; } static void _plugin_destroy(void) { JLOG_INFO("main", "Destroying plugin ..."); + atomic_store(&_g_stop, true); - if (atomic_load(&_g_video_tid_created)) { - A_THREAD_JOIN(_g_video_tid); - } - if (atomic_load(&_g_audio_tid_created)) { - A_THREAD_JOIN(_g_audio_tid); - } +# define JOIN(_tid) { if (atomic_load(&_tid##_created)) { A_THREAD_JOIN(_tid); } } + JOIN(_g_video_sink_tid); + JOIN(_g_video_rtp_tid); + JOIN(_g_audio_tid); +# undef JOIN LIST_ITERATE(_g_clients, client, { LIST_REMOVE(_g_clients, client); client_destroy(client); }); - _g_clients = NULL; -# define DEL(_func, _var) { if (_var) { _func(_var); _var = NULL; } } + QUEUE_FREE_ITEMS_AND_DESTROY(_g_video_queue, frame_destroy); + +# define DEL(_func, _var) { if (_var) { _func(_var); } } DEL(rtpa_destroy, _g_rtpa); DEL(rtpv_destroy, _g_rtpv); - _g_gw = NULL; DEL(free, _g_tc358743_dev_path); DEL(free, _g_audio_dev_name); DEL(free, _g_video_sink_name);