separate memsink and rtp threads

This commit is contained in:
Maxim Devaev
2022-07-13 06:41:04 +03:00
parent f9439c785f
commit 20f056668f
3 changed files with 56 additions and 33 deletions

View File

@@ -46,19 +46,25 @@ int memsink_fd_wait_frame(int fd, memsink_shared_s* mem, uint64_t last_id) {
return -2; return -2;
} }
int memsink_fd_get_frame(int fd, memsink_shared_s *mem, frame_s *frame, uint64_t *frame_id) { frame_s *memsink_fd_get_frame(int fd, memsink_shared_s *mem, uint64_t *frame_id) {
frame_s *frame = frame_init();
frame_set_data(frame, mem->data, mem->used); frame_set_data(frame, mem->data, mem->used);
FRAME_COPY_META(mem, frame); FRAME_COPY_META(mem, frame);
*frame_id = mem->id; *frame_id = mem->id;
mem->last_client_ts = get_now_monotonic(); mem->last_client_ts = get_now_monotonic();
int retval = 0;
bool ok = true;
if (frame->format != V4L2_PIX_FMT_H264) { if (frame->format != V4L2_PIX_FMT_H264) {
JLOG_ERROR("video", "Got non-H264 frame from memsink"); JLOG_ERROR("video", "Got non-H264 frame from memsink");
retval = -1; ok = false;
} }
if (flock(fd, LOCK_UN) < 0) { if (flock(fd, LOCK_UN) < 0) {
JLOG_PERROR("video", "Can't unlock memsink"); JLOG_PERROR("video", "Can't unlock memsink");
retval = -1; ok = false;
} }
return retval; if (!ok) {
frame_destroy(frame);
frame = NULL;
}
return frame;
} }

View File

@@ -35,4 +35,4 @@
int memsink_fd_wait_frame(int fd, memsink_shared_s* mem, uint64_t last_id); int memsink_fd_wait_frame(int fd, memsink_shared_s* mem, uint64_t last_id);
int memsink_fd_get_frame(int fd, memsink_shared_s *mem, frame_s *frame, uint64_t *frame_id); frame_s *memsink_fd_get_frame(int fd, memsink_shared_s *mem, uint64_t *frame_id);

View File

@@ -44,6 +44,7 @@
#include "const.h" #include "const.h"
#include "logging.h" #include "logging.h"
#include "queue.h"
#include "client.h" #include "client.h"
#include "audio.h" #include "audio.h"
#include "tc358743.h" #include "tc358743.h"
@@ -64,11 +65,14 @@ const useconds_t _g_watchers_polling = 100000;
static client_s *_g_clients = NULL; static client_s *_g_clients = NULL;
static janus_callbacks *_g_gw = NULL; static janus_callbacks *_g_gw = NULL;
static queue_s *_g_video_queue = NULL;
static rtpv_s *_g_rtpv = NULL; static rtpv_s *_g_rtpv = NULL;
static rtpa_s *_g_rtpa = NULL; static rtpa_s *_g_rtpa = NULL;
static pthread_t _g_video_tid; static pthread_t _g_video_rtp_tid;
static atomic_bool _g_video_tid_created = false; static atomic_bool _g_video_rtp_tid_created = false;
static pthread_t _g_video_sink_tid;
static atomic_bool _g_video_sink_tid_created = false;
static pthread_t _g_audio_tid; static pthread_t _g_audio_tid;
static atomic_bool _g_audio_tid_created = false; static atomic_bool _g_audio_tid_created = false;
@@ -98,14 +102,28 @@ static atomic_bool _g_has_watchers = false;
if (error_reported != _error_code) { __VA_ARGS__; error_reported = _error_code; } \ if (error_reported != _error_code) { __VA_ARGS__; error_reported = _error_code; } \
} }
static void *_video_thread(UNUSED void *arg) { static void *_video_rtp_thread(UNUSED void *arg) {
A_THREAD_RENAME("us_video"); A_THREAD_RENAME("us_video_rtp");
atomic_store(&_g_video_tid_created, true); atomic_store(&_g_video_rtp_tid_created, true);
while (!STOP) {
frame_s *frame;
if (queue_get(_g_video_queue, (void **)&frame, 0.1) == 0) {
LOCK_VIDEO;
rtpv_wrap(_g_rtpv, frame);
UNLOCK_VIDEO;
frame_destroy(frame);
}
}
return NULL;
}
static void *_video_sink_thread(UNUSED void *arg) {
A_THREAD_RENAME("us_video_sink");
atomic_store(&_g_video_sink_tid_created, true);
atomic_store(&_g_ready, true); atomic_store(&_g_ready, true);
frame_s *frame = frame_init();
uint64_t frame_id = 0; uint64_t frame_id = 0;
unsigned error_reported = 0; unsigned error_reported = 0;
while (!STOP) { while (!STOP) {
@@ -134,12 +152,14 @@ static void *_video_thread(UNUSED void *arg) {
while (!STOP && HAS_WATCHERS) { while (!STOP && HAS_WATCHERS) {
int result = memsink_fd_wait_frame(fd, mem, frame_id); int result = memsink_fd_wait_frame(fd, mem, frame_id);
if (result == 0) { if (result == 0) {
if (memsink_fd_get_frame(fd, mem, frame, &frame_id) != 0) { frame_s *frame = memsink_fd_get_frame(fd, mem, &frame_id);
if (frame == NULL) {
goto close_memsink; goto close_memsink;
} }
LOCK_VIDEO; if (queue_put(_g_video_queue, frame, 0) != 0) {
rtpv_wrap(_g_rtpv, frame); IF_NOT_REPORTED({ JLOG_PERROR("video", "Video queue is full"); });
UNLOCK_VIDEO; frame_destroy(frame);
}
} else if (result == -1) { } else if (result == -1) {
goto close_memsink; goto close_memsink;
} }
@@ -157,8 +177,6 @@ static void *_video_thread(UNUSED void *arg) {
} }
sleep(1); // error_delay sleep(1); // error_delay
} }
frame_destroy(frame);
return NULL; return NULL;
} }
@@ -240,10 +258,6 @@ static int _plugin_init(janus_callbacks *gw, const char *config_dir_path) {
// sysctl -w net.core.wmem_max=1000000 // sysctl -w net.core.wmem_max=1000000
JLOG_INFO("main", "Initializing plugin ..."); JLOG_INFO("main", "Initializing plugin ...");
assert(!atomic_load(&_g_video_tid_created));
assert(!atomic_load(&_g_audio_tid_created));
assert(!READY);
assert(!STOP);
if (gw == NULL || config_dir_path == NULL || read_config(config_dir_path, if (gw == NULL || config_dir_path == NULL || read_config(config_dir_path,
&_g_video_sink_name, &_g_video_sink_name,
&_g_audio_dev_name, &_g_audio_dev_name,
@@ -252,35 +266,38 @@ static int _plugin_init(janus_callbacks *gw, const char *config_dir_path) {
return -1; return -1;
} }
_g_gw = gw; _g_gw = gw;
_g_video_queue = queue_init(1024);
_g_rtpv = rtpv_init(_relay_rtp_clients); _g_rtpv = rtpv_init(_relay_rtp_clients);
if (_g_audio_dev_name) { if (_g_audio_dev_name) {
_g_rtpa = rtpa_init(_relay_rtp_clients); _g_rtpa = rtpa_init(_relay_rtp_clients);
A_THREAD_CREATE(&_g_audio_tid, _audio_thread, NULL); A_THREAD_CREATE(&_g_audio_tid, _audio_thread, NULL);
} }
A_THREAD_CREATE(&_g_video_tid, _video_thread, NULL); A_THREAD_CREATE(&_g_video_rtp_tid, _video_rtp_thread, NULL);
A_THREAD_CREATE(&_g_video_sink_tid, _video_sink_thread, NULL);
return 0; return 0;
} }
static void _plugin_destroy(void) { static void _plugin_destroy(void) {
JLOG_INFO("main", "Destroying plugin ..."); JLOG_INFO("main", "Destroying plugin ...");
atomic_store(&_g_stop, true); atomic_store(&_g_stop, true);
if (atomic_load(&_g_video_tid_created)) { # define JOIN(_tid) { if (atomic_load(&_tid##_created)) { A_THREAD_JOIN(_tid); } }
A_THREAD_JOIN(_g_video_tid); JOIN(_g_video_sink_tid);
} JOIN(_g_video_rtp_tid);
if (atomic_load(&_g_audio_tid_created)) { JOIN(_g_audio_tid);
A_THREAD_JOIN(_g_audio_tid); # undef JOIN
}
LIST_ITERATE(_g_clients, client, { LIST_ITERATE(_g_clients, client, {
LIST_REMOVE(_g_clients, client); LIST_REMOVE(_g_clients, client);
client_destroy(client); client_destroy(client);
}); });
_g_clients = NULL;
# define DEL(_func, _var) { if (_var) { _func(_var); _var = NULL; } } QUEUE_FREE_ITEMS_AND_DESTROY(_g_video_queue, frame_destroy);
# define DEL(_func, _var) { if (_var) { _func(_var); } }
DEL(rtpa_destroy, _g_rtpa); DEL(rtpa_destroy, _g_rtpa);
DEL(rtpv_destroy, _g_rtpv); DEL(rtpv_destroy, _g_rtpv);
_g_gw = NULL;
DEL(free, _g_tc358743_dev_path); DEL(free, _g_tc358743_dev_path);
DEL(free, _g_audio_dev_name); DEL(free, _g_audio_dev_name);
DEL(free, _g_video_sink_name); DEL(free, _g_video_sink_name);