mirror of
https://github.com/pikvm/ustreamer.git
synced 2026-03-10 09:33:43 +00:00
janus: reducing memory allocating using ring buffer
This commit is contained in:
@@ -34,7 +34,7 @@
|
|||||||
#include "logging.h"
|
#include "logging.h"
|
||||||
|
|
||||||
|
|
||||||
int us_memsink_fd_wait_frame(int fd, us_memsink_shared_s* mem, u64 last_id) {
|
int us_memsink_fd_wait_frame(int fd, us_memsink_shared_s *mem, u64 last_id) {
|
||||||
const ldf deadline_ts = us_get_now_monotonic() + 1; // wait_timeout
|
const ldf deadline_ts = us_get_now_monotonic() + 1; // wait_timeout
|
||||||
ldf now_ts;
|
ldf now_ts;
|
||||||
do {
|
do {
|
||||||
@@ -57,8 +57,7 @@ int us_memsink_fd_wait_frame(int fd, us_memsink_shared_s* mem, u64 last_id) {
|
|||||||
return -2;
|
return -2;
|
||||||
}
|
}
|
||||||
|
|
||||||
us_frame_s *us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, u64 *frame_id, bool key_required) {
|
int us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, us_frame_s *frame, u64 *frame_id, bool key_required) {
|
||||||
us_frame_s *frame = us_frame_init();
|
|
||||||
us_frame_set_data(frame, mem->data, mem->used);
|
us_frame_set_data(frame, mem->data, mem->used);
|
||||||
US_FRAME_COPY_META(mem, frame);
|
US_FRAME_COPY_META(mem, frame);
|
||||||
*frame_id = mem->id;
|
*frame_id = mem->id;
|
||||||
@@ -67,17 +66,14 @@ us_frame_s *us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, u64 *frame
|
|||||||
mem->key_requested = true;
|
mem->key_requested = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ok = true;
|
bool retval = 0;
|
||||||
if (frame->format != V4L2_PIX_FMT_H264) {
|
if (frame->format != V4L2_PIX_FMT_H264) {
|
||||||
US_JLOG_ERROR("video", "Got non-H264 frame from memsink");
|
US_JLOG_ERROR("video", "Got non-H264 frame from memsink");
|
||||||
ok = false;
|
retval = -1;
|
||||||
}
|
}
|
||||||
if (flock(fd, LOCK_UN) < 0) {
|
if (flock(fd, LOCK_UN) < 0) {
|
||||||
US_JLOG_PERROR("video", "Can't unlock memsink");
|
US_JLOG_PERROR("video", "Can't unlock memsink");
|
||||||
ok = false;
|
retval = -1;
|
||||||
}
|
}
|
||||||
if (!ok) {
|
return retval;
|
||||||
US_DELETE(frame, us_frame_destroy);
|
|
||||||
}
|
|
||||||
return frame;
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,5 +27,5 @@
|
|||||||
#include "uslibs/memsinksh.h"
|
#include "uslibs/memsinksh.h"
|
||||||
|
|
||||||
|
|
||||||
int us_memsink_fd_wait_frame(int fd, us_memsink_shared_s* mem, u64 last_id);
|
int us_memsink_fd_wait_frame(int fd, us_memsink_shared_s *mem, u64 last_id);
|
||||||
us_frame_s *us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, u64 *frame_id, bool key_required);
|
int us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, us_frame_s *frame, u64 *frame_id, bool key_required);
|
||||||
|
|||||||
@@ -40,7 +40,7 @@
|
|||||||
#include "uslibs/tools.h"
|
#include "uslibs/tools.h"
|
||||||
#include "uslibs/threading.h"
|
#include "uslibs/threading.h"
|
||||||
#include "uslibs/list.h"
|
#include "uslibs/list.h"
|
||||||
#include "uslibs/queue.h"
|
#include "uslibs/ring.h"
|
||||||
#include "uslibs/memsinksh.h"
|
#include "uslibs/memsinksh.h"
|
||||||
|
|
||||||
#include "const.h"
|
#include "const.h"
|
||||||
@@ -60,7 +60,7 @@ static const useconds_t _g_watchers_polling = 100000;
|
|||||||
|
|
||||||
static us_janus_client_s *_g_clients = NULL;
|
static us_janus_client_s *_g_clients = NULL;
|
||||||
static janus_callbacks *_g_gw = NULL;
|
static janus_callbacks *_g_gw = NULL;
|
||||||
static us_queue_s *_g_video_queue = NULL;
|
static us_ring_s *_g_video_ring = NULL;
|
||||||
static us_rtpv_s *_g_rtpv = NULL;
|
static us_rtpv_s *_g_rtpv = NULL;
|
||||||
static us_rtpa_s *_g_rtpa = NULL;
|
static us_rtpa_s *_g_rtpa = NULL;
|
||||||
|
|
||||||
@@ -105,13 +105,14 @@ static void *_video_rtp_thread(void *arg) {
|
|||||||
atomic_store(&_g_video_rtp_tid_created, true);
|
atomic_store(&_g_video_rtp_tid_created, true);
|
||||||
|
|
||||||
while (!_STOP) {
|
while (!_STOP) {
|
||||||
us_frame_s *frame;
|
const int ri = us_ring_consumer_acquire(_g_video_ring, 0.1);
|
||||||
if (us_queue_get(_g_video_queue, (void **)&frame, 0.1) == 0) {
|
if (ri >= 0) {
|
||||||
|
us_frame_s *frame = _g_video_ring->items[ri];
|
||||||
_LOCK_VIDEO;
|
_LOCK_VIDEO;
|
||||||
const bool zero_playout_delay = (frame->gop == 0);
|
const bool zero_playout_delay = (frame->gop == 0);
|
||||||
us_rtpv_wrap(_g_rtpv, frame, zero_playout_delay);
|
us_rtpv_wrap(_g_rtpv, frame, zero_playout_delay);
|
||||||
_UNLOCK_VIDEO;
|
_UNLOCK_VIDEO;
|
||||||
us_frame_destroy(frame);
|
us_ring_consumer_release(_g_video_ring, ri);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
@@ -123,6 +124,7 @@ static void *_video_sink_thread(void *arg) {
|
|||||||
US_THREAD_RENAME("us_video_sink");
|
US_THREAD_RENAME("us_video_sink");
|
||||||
atomic_store(&_g_video_sink_tid_created, true);
|
atomic_store(&_g_video_sink_tid_created, true);
|
||||||
|
|
||||||
|
us_frame_s *drop = us_frame_init();
|
||||||
u64 frame_id = 0;
|
u64 frame_id = 0;
|
||||||
int once = 0;
|
int once = 0;
|
||||||
|
|
||||||
@@ -150,20 +152,29 @@ static void *_video_sink_thread(void *arg) {
|
|||||||
|
|
||||||
US_JLOG_INFO("video", "Memsink opened; reading frames ...");
|
US_JLOG_INFO("video", "Memsink opened; reading frames ...");
|
||||||
while (!_STOP && _HAS_WATCHERS) {
|
while (!_STOP && _HAS_WATCHERS) {
|
||||||
const int result = us_memsink_fd_wait_frame(fd, mem, frame_id);
|
const int waited = us_memsink_fd_wait_frame(fd, mem, frame_id);
|
||||||
if (result == 0) {
|
if (waited == 0) {
|
||||||
us_frame_s *const frame = us_memsink_fd_get_frame(fd, mem, &frame_id, atomic_load(&_g_key_required));
|
const int ri = us_ring_producer_acquire(_g_video_ring, 0);
|
||||||
if (frame == NULL) {
|
us_frame_s *frame;
|
||||||
|
if (ri >= 0) {
|
||||||
|
frame = _g_video_ring->items[ri];
|
||||||
|
} else {
|
||||||
|
US_ONCE({ US_JLOG_PERROR("video", "Video ring is full"); });
|
||||||
|
frame = drop;
|
||||||
|
}
|
||||||
|
|
||||||
|
const int got = us_memsink_fd_get_frame(fd, mem, frame, &frame_id, atomic_load(&_g_key_required));
|
||||||
|
if (ri >= 0) {
|
||||||
|
us_ring_producer_release(_g_video_ring, ri);
|
||||||
|
}
|
||||||
|
if (got < 0) {
|
||||||
goto close_memsink;
|
goto close_memsink;
|
||||||
}
|
}
|
||||||
if (frame->key) {
|
|
||||||
|
if (ri >= 0 && frame->key) {
|
||||||
atomic_store(&_g_key_required, false);
|
atomic_store(&_g_key_required, false);
|
||||||
}
|
}
|
||||||
if (us_queue_put(_g_video_queue, frame, 0) != 0) {
|
} else if (waited != -2) {
|
||||||
US_ONCE({ US_JLOG_PERROR("video", "Video queue is full"); });
|
|
||||||
us_frame_destroy(frame);
|
|
||||||
}
|
|
||||||
} else if (result == -1) {
|
|
||||||
goto close_memsink;
|
goto close_memsink;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -174,6 +185,7 @@ static void *_video_sink_thread(void *arg) {
|
|||||||
US_JLOG_INFO("video", "Memsink closed");
|
US_JLOG_INFO("video", "Memsink closed");
|
||||||
sleep(1); // error_delay
|
sleep(1); // error_delay
|
||||||
}
|
}
|
||||||
|
us_frame_destroy(drop);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -258,7 +270,7 @@ static int _plugin_init(janus_callbacks *gw, const char *config_dir_path) {
|
|||||||
}
|
}
|
||||||
_g_gw = gw;
|
_g_gw = gw;
|
||||||
|
|
||||||
_g_video_queue = us_queue_init(1024);
|
US_RING_INIT_WITH_ITEMS(_g_video_ring, 64, us_frame_init);
|
||||||
_g_rtpv = us_rtpv_init(_relay_rtp_clients);
|
_g_rtpv = us_rtpv_init(_relay_rtp_clients);
|
||||||
if (_g_config->audio_dev_name != NULL && us_audio_probe(_g_config->audio_dev_name)) {
|
if (_g_config->audio_dev_name != NULL && us_audio_probe(_g_config->audio_dev_name)) {
|
||||||
_g_rtpa = us_rtpa_init(_relay_rtp_clients);
|
_g_rtpa = us_rtpa_init(_relay_rtp_clients);
|
||||||
@@ -286,7 +298,7 @@ static void _plugin_destroy(void) {
|
|||||||
us_janus_client_destroy(client);
|
us_janus_client_destroy(client);
|
||||||
});
|
});
|
||||||
|
|
||||||
US_QUEUE_DELETE_WITH_ITEMS(_g_video_queue, us_frame_destroy);
|
US_RING_DELETE_WITH_ITEMS(_g_video_ring, us_frame_destroy);
|
||||||
|
|
||||||
US_DELETE(_g_rtpa, us_rtpa_destroy);
|
US_DELETE(_g_rtpa, us_rtpa_destroy);
|
||||||
US_DELETE(_g_rtpv, us_rtpv_destroy);
|
US_DELETE(_g_rtpv, us_rtpv_destroy);
|
||||||
|
|||||||
1
janus/src/uslibs/ring.c
Symbolic link
1
janus/src/uslibs/ring.c
Symbolic link
@@ -0,0 +1 @@
|
|||||||
|
../../../src/libs/ring.c
|
||||||
1
janus/src/uslibs/ring.h
Symbolic link
1
janus/src/uslibs/ring.h
Symbolic link
@@ -0,0 +1 @@
|
|||||||
|
../../../src/libs/ring.h
|
||||||
86
src/libs/ring.c
Normal file
86
src/libs/ring.c
Normal file
@@ -0,0 +1,86 @@
|
|||||||
|
/*****************************************************************************
|
||||||
|
# #
|
||||||
|
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
|
||||||
|
# #
|
||||||
|
# Copyright (C) 2018-2023 Maxim Devaev <mdevaev@gmail.com> #
|
||||||
|
# #
|
||||||
|
# This program is free software: you can redistribute it and/or modify #
|
||||||
|
# it under the terms of the GNU General Public License as published by #
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or #
|
||||||
|
# (at your option) any later version. #
|
||||||
|
# #
|
||||||
|
# This program is distributed in the hope that it will be useful, #
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||||
|
# GNU General Public License for more details. #
|
||||||
|
# #
|
||||||
|
# You should have received a copy of the GNU General Public License #
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||||
|
# #
|
||||||
|
*****************************************************************************/
|
||||||
|
|
||||||
|
|
||||||
|
#include <assert.h>
|
||||||
|
|
||||||
|
#include "ring.h"
|
||||||
|
|
||||||
|
#include "types.h"
|
||||||
|
#include "tools.h"
|
||||||
|
#include "queue.h"
|
||||||
|
|
||||||
|
|
||||||
|
int _acquire(us_ring_s *ring, us_queue_s *queue, ldf timeout);
|
||||||
|
void _release(us_ring_s *ring, us_queue_s *queue, uint index);
|
||||||
|
|
||||||
|
|
||||||
|
us_ring_s *us_ring_init(uint capacity) {
|
||||||
|
us_ring_s *ring;
|
||||||
|
US_CALLOC(ring, 1);
|
||||||
|
US_CALLOC(ring->items, capacity);
|
||||||
|
US_CALLOC(ring->places, capacity);
|
||||||
|
ring->capacity = capacity;
|
||||||
|
ring->producer = us_queue_init(capacity);
|
||||||
|
ring->consumer = us_queue_init(capacity);
|
||||||
|
for (uint index = 0; index < capacity; ++index) {
|
||||||
|
ring->places[index] = index; // XXX: Just to avoid casting between pointer and uint
|
||||||
|
assert(!us_queue_put(ring->producer, (void*)(ring->places + index), 0));
|
||||||
|
}
|
||||||
|
return ring;
|
||||||
|
}
|
||||||
|
|
||||||
|
void us_ring_destroy(us_ring_s *ring) {
|
||||||
|
us_queue_destroy(ring->consumer);
|
||||||
|
us_queue_destroy(ring->producer);
|
||||||
|
free(ring->places);
|
||||||
|
free(ring->items);
|
||||||
|
free(ring);
|
||||||
|
}
|
||||||
|
|
||||||
|
int us_ring_producer_acquire(us_ring_s *ring, ldf timeout) {
|
||||||
|
return _acquire(ring, ring->producer, timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
void us_ring_producer_release(us_ring_s *ring, uint index) {
|
||||||
|
_release(ring, ring->consumer, index);
|
||||||
|
}
|
||||||
|
|
||||||
|
int us_ring_consumer_acquire(us_ring_s *ring, ldf timeout) {
|
||||||
|
return _acquire(ring, ring->consumer, timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
void us_ring_consumer_release(us_ring_s *ring, uint index) {
|
||||||
|
_release(ring, ring->producer, index);
|
||||||
|
}
|
||||||
|
|
||||||
|
int _acquire(us_ring_s *ring, us_queue_s *queue, ldf timeout) {
|
||||||
|
(void)ring;
|
||||||
|
uint *place;
|
||||||
|
if (us_queue_get(queue, (void**)&place, timeout) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return *place;
|
||||||
|
}
|
||||||
|
|
||||||
|
void _release(us_ring_s *ring, us_queue_s *queue, uint index) {
|
||||||
|
assert(!us_queue_put(queue, (void*)(ring->places + index), 0));
|
||||||
|
}
|
||||||
63
src/libs/ring.h
Normal file
63
src/libs/ring.h
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
/*****************************************************************************
|
||||||
|
# #
|
||||||
|
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
|
||||||
|
# #
|
||||||
|
# Copyright (C) 2018-2023 Maxim Devaev <mdevaev@gmail.com> #
|
||||||
|
# #
|
||||||
|
# This program is free software: you can redistribute it and/or modify #
|
||||||
|
# it under the terms of the GNU General Public License as published by #
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or #
|
||||||
|
# (at your option) any later version. #
|
||||||
|
# #
|
||||||
|
# This program is distributed in the hope that it will be useful, #
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||||
|
# GNU General Public License for more details. #
|
||||||
|
# #
|
||||||
|
# You should have received a copy of the GNU General Public License #
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||||
|
# #
|
||||||
|
*****************************************************************************/
|
||||||
|
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
|
||||||
|
#include "types.h"
|
||||||
|
#include "queue.h"
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uz capacity;
|
||||||
|
void **items;
|
||||||
|
uint *places;
|
||||||
|
us_queue_s *producer;
|
||||||
|
us_queue_s *consumer;
|
||||||
|
} us_ring_s;
|
||||||
|
|
||||||
|
|
||||||
|
#define US_RING_INIT_WITH_ITEMS(x_ring, x_capacity, x_init_item) { \
|
||||||
|
(x_ring) = us_ring_init(x_capacity); \
|
||||||
|
for (uz m_index = 0; m_index < (x_ring)->capacity; ++m_index) { \
|
||||||
|
(x_ring)->items[m_index] = x_init_item(); \
|
||||||
|
} \
|
||||||
|
}
|
||||||
|
|
||||||
|
#define US_RING_DELETE_WITH_ITEMS(x_ring, x_destroy_item) { \
|
||||||
|
if (x_ring) { \
|
||||||
|
for (uz m_index = 0; m_index < (x_ring)->capacity; ++m_index) { \
|
||||||
|
x_destroy_item((x_ring)->items[m_index]); \
|
||||||
|
} \
|
||||||
|
us_ring_destroy(x_ring); \
|
||||||
|
} \
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
us_ring_s *us_ring_init(uint capacity);
|
||||||
|
void us_ring_destroy(us_ring_s *ring);
|
||||||
|
|
||||||
|
int us_ring_producer_acquire(us_ring_s *ring, ldf timeout);
|
||||||
|
void us_ring_producer_release(us_ring_s *ring, uint index);
|
||||||
|
|
||||||
|
int us_ring_consumer_acquire(us_ring_s *ring, ldf timeout);
|
||||||
|
void us_ring_consumer_release(us_ring_s *ring, uint index);
|
||||||
Reference in New Issue
Block a user