diff --git a/janus/src/memsinkfd.c b/janus/src/memsinkfd.c index 7aad127..95d7b3e 100644 --- a/janus/src/memsinkfd.c +++ b/janus/src/memsinkfd.c @@ -34,7 +34,7 @@ #include "logging.h" -int us_memsink_fd_wait_frame(int fd, us_memsink_shared_s* mem, u64 last_id) { +int us_memsink_fd_wait_frame(int fd, us_memsink_shared_s *mem, u64 last_id) { const ldf deadline_ts = us_get_now_monotonic() + 1; // wait_timeout ldf now_ts; do { @@ -57,8 +57,7 @@ int us_memsink_fd_wait_frame(int fd, us_memsink_shared_s* mem, u64 last_id) { return -2; } -us_frame_s *us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, u64 *frame_id, bool key_required) { - us_frame_s *frame = us_frame_init(); +int us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, us_frame_s *frame, u64 *frame_id, bool key_required) { us_frame_set_data(frame, mem->data, mem->used); US_FRAME_COPY_META(mem, frame); *frame_id = mem->id; @@ -67,17 +66,14 @@ us_frame_s *us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, u64 *frame mem->key_requested = true; } - bool ok = true; + bool retval = 0; if (frame->format != V4L2_PIX_FMT_H264) { US_JLOG_ERROR("video", "Got non-H264 frame from memsink"); - ok = false; + retval = -1; } if (flock(fd, LOCK_UN) < 0) { US_JLOG_PERROR("video", "Can't unlock memsink"); - ok = false; + retval = -1; } - if (!ok) { - US_DELETE(frame, us_frame_destroy); - } - return frame; + return retval; } diff --git a/janus/src/memsinkfd.h b/janus/src/memsinkfd.h index c61e663..4821c32 100644 --- a/janus/src/memsinkfd.h +++ b/janus/src/memsinkfd.h @@ -27,5 +27,5 @@ #include "uslibs/memsinksh.h" -int us_memsink_fd_wait_frame(int fd, us_memsink_shared_s* mem, u64 last_id); -us_frame_s *us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, u64 *frame_id, bool key_required); +int us_memsink_fd_wait_frame(int fd, us_memsink_shared_s *mem, u64 last_id); +int us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, us_frame_s *frame, u64 *frame_id, bool key_required); diff --git a/janus/src/plugin.c b/janus/src/plugin.c index dfb697b..1e91fa7 100644 --- a/janus/src/plugin.c +++ b/janus/src/plugin.c @@ -40,7 +40,7 @@ #include "uslibs/tools.h" #include "uslibs/threading.h" #include "uslibs/list.h" -#include "uslibs/queue.h" +#include "uslibs/ring.h" #include "uslibs/memsinksh.h" #include "const.h" @@ -60,7 +60,7 @@ static const useconds_t _g_watchers_polling = 100000; static us_janus_client_s *_g_clients = NULL; static janus_callbacks *_g_gw = NULL; -static us_queue_s *_g_video_queue = NULL; +static us_ring_s *_g_video_ring = NULL; static us_rtpv_s *_g_rtpv = NULL; static us_rtpa_s *_g_rtpa = NULL; @@ -105,13 +105,14 @@ static void *_video_rtp_thread(void *arg) { atomic_store(&_g_video_rtp_tid_created, true); while (!_STOP) { - us_frame_s *frame; - if (us_queue_get(_g_video_queue, (void **)&frame, 0.1) == 0) { + const int ri = us_ring_consumer_acquire(_g_video_ring, 0.1); + if (ri >= 0) { + us_frame_s *frame = _g_video_ring->items[ri]; _LOCK_VIDEO; const bool zero_playout_delay = (frame->gop == 0); us_rtpv_wrap(_g_rtpv, frame, zero_playout_delay); _UNLOCK_VIDEO; - us_frame_destroy(frame); + us_ring_consumer_release(_g_video_ring, ri); } } return NULL; @@ -123,6 +124,7 @@ static void *_video_sink_thread(void *arg) { US_THREAD_RENAME("us_video_sink"); atomic_store(&_g_video_sink_tid_created, true); + us_frame_s *drop = us_frame_init(); u64 frame_id = 0; int once = 0; @@ -150,20 +152,29 @@ static void *_video_sink_thread(void *arg) { US_JLOG_INFO("video", "Memsink opened; reading frames ..."); while (!_STOP && _HAS_WATCHERS) { - const int result = us_memsink_fd_wait_frame(fd, mem, frame_id); - if (result == 0) { - us_frame_s *const frame = us_memsink_fd_get_frame(fd, mem, &frame_id, atomic_load(&_g_key_required)); - if (frame == NULL) { + const int waited = us_memsink_fd_wait_frame(fd, mem, frame_id); + if (waited == 0) { + const int ri = us_ring_producer_acquire(_g_video_ring, 0); + us_frame_s *frame; + if (ri >= 0) { + frame = _g_video_ring->items[ri]; + } else { + US_ONCE({ US_JLOG_PERROR("video", "Video ring is full"); }); + frame = drop; + } + + const int got = us_memsink_fd_get_frame(fd, mem, frame, &frame_id, atomic_load(&_g_key_required)); + if (ri >= 0) { + us_ring_producer_release(_g_video_ring, ri); + } + if (got < 0) { goto close_memsink; } - if (frame->key) { + + if (ri >= 0 && frame->key) { atomic_store(&_g_key_required, false); } - if (us_queue_put(_g_video_queue, frame, 0) != 0) { - US_ONCE({ US_JLOG_PERROR("video", "Video queue is full"); }); - us_frame_destroy(frame); - } - } else if (result == -1) { + } else if (waited != -2) { goto close_memsink; } } @@ -174,6 +185,7 @@ static void *_video_sink_thread(void *arg) { US_JLOG_INFO("video", "Memsink closed"); sleep(1); // error_delay } + us_frame_destroy(drop); return NULL; } @@ -258,7 +270,7 @@ static int _plugin_init(janus_callbacks *gw, const char *config_dir_path) { } _g_gw = gw; - _g_video_queue = us_queue_init(1024); + US_RING_INIT_WITH_ITEMS(_g_video_ring, 64, us_frame_init); _g_rtpv = us_rtpv_init(_relay_rtp_clients); if (_g_config->audio_dev_name != NULL && us_audio_probe(_g_config->audio_dev_name)) { _g_rtpa = us_rtpa_init(_relay_rtp_clients); @@ -286,7 +298,7 @@ static void _plugin_destroy(void) { us_janus_client_destroy(client); }); - US_QUEUE_DELETE_WITH_ITEMS(_g_video_queue, us_frame_destroy); + US_RING_DELETE_WITH_ITEMS(_g_video_ring, us_frame_destroy); US_DELETE(_g_rtpa, us_rtpa_destroy); US_DELETE(_g_rtpv, us_rtpv_destroy); diff --git a/janus/src/uslibs/ring.c b/janus/src/uslibs/ring.c new file mode 120000 index 0000000..820246e --- /dev/null +++ b/janus/src/uslibs/ring.c @@ -0,0 +1 @@ +../../../src/libs/ring.c \ No newline at end of file diff --git a/janus/src/uslibs/ring.h b/janus/src/uslibs/ring.h new file mode 120000 index 0000000..5e531ce --- /dev/null +++ b/janus/src/uslibs/ring.h @@ -0,0 +1 @@ +../../../src/libs/ring.h \ No newline at end of file diff --git a/src/libs/ring.c b/src/libs/ring.c new file mode 100644 index 0000000..6c8a1d9 --- /dev/null +++ b/src/libs/ring.c @@ -0,0 +1,86 @@ +/***************************************************************************** +# # +# uStreamer - Lightweight and fast MJPEG-HTTP streamer. # +# # +# Copyright (C) 2018-2023 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#include + +#include "ring.h" + +#include "types.h" +#include "tools.h" +#include "queue.h" + + +int _acquire(us_ring_s *ring, us_queue_s *queue, ldf timeout); +void _release(us_ring_s *ring, us_queue_s *queue, uint index); + + +us_ring_s *us_ring_init(uint capacity) { + us_ring_s *ring; + US_CALLOC(ring, 1); + US_CALLOC(ring->items, capacity); + US_CALLOC(ring->places, capacity); + ring->capacity = capacity; + ring->producer = us_queue_init(capacity); + ring->consumer = us_queue_init(capacity); + for (uint index = 0; index < capacity; ++index) { + ring->places[index] = index; // XXX: Just to avoid casting between pointer and uint + assert(!us_queue_put(ring->producer, (void*)(ring->places + index), 0)); + } + return ring; +} + +void us_ring_destroy(us_ring_s *ring) { + us_queue_destroy(ring->consumer); + us_queue_destroy(ring->producer); + free(ring->places); + free(ring->items); + free(ring); +} + +int us_ring_producer_acquire(us_ring_s *ring, ldf timeout) { + return _acquire(ring, ring->producer, timeout); +} + +void us_ring_producer_release(us_ring_s *ring, uint index) { + _release(ring, ring->consumer, index); +} + +int us_ring_consumer_acquire(us_ring_s *ring, ldf timeout) { + return _acquire(ring, ring->consumer, timeout); +} + +void us_ring_consumer_release(us_ring_s *ring, uint index) { + _release(ring, ring->producer, index); +} + +int _acquire(us_ring_s *ring, us_queue_s *queue, ldf timeout) { + (void)ring; + uint *place; + if (us_queue_get(queue, (void**)&place, timeout) < 0) { + return -1; + } + return *place; +} + +void _release(us_ring_s *ring, us_queue_s *queue, uint index) { + assert(!us_queue_put(queue, (void*)(ring->places + index), 0)); +} diff --git a/src/libs/ring.h b/src/libs/ring.h new file mode 100644 index 0000000..02799ec --- /dev/null +++ b/src/libs/ring.h @@ -0,0 +1,63 @@ +/***************************************************************************** +# # +# uStreamer - Lightweight and fast MJPEG-HTTP streamer. # +# # +# Copyright (C) 2018-2023 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#pragma once + + +#include "types.h" +#include "queue.h" + + +typedef struct { + uz capacity; + void **items; + uint *places; + us_queue_s *producer; + us_queue_s *consumer; +} us_ring_s; + + +#define US_RING_INIT_WITH_ITEMS(x_ring, x_capacity, x_init_item) { \ + (x_ring) = us_ring_init(x_capacity); \ + for (uz m_index = 0; m_index < (x_ring)->capacity; ++m_index) { \ + (x_ring)->items[m_index] = x_init_item(); \ + } \ + } + +#define US_RING_DELETE_WITH_ITEMS(x_ring, x_destroy_item) { \ + if (x_ring) { \ + for (uz m_index = 0; m_index < (x_ring)->capacity; ++m_index) { \ + x_destroy_item((x_ring)->items[m_index]); \ + } \ + us_ring_destroy(x_ring); \ + } \ + } + + +us_ring_s *us_ring_init(uint capacity); +void us_ring_destroy(us_ring_s *ring); + +int us_ring_producer_acquire(us_ring_s *ring, ldf timeout); +void us_ring_producer_release(us_ring_s *ring, uint index); + +int us_ring_consumer_acquire(us_ring_s *ring, ldf timeout); +void us_ring_consumer_release(us_ring_s *ring, uint index);