refactoring

This commit is contained in:
Maxim Devaev
2024-03-03 08:10:01 +02:00
parent 8fe411aa8b
commit ffa68a86a6

View File

@@ -49,6 +49,14 @@
#endif
typedef struct {
pthread_t tid;
us_device_s *dev;
us_queue_s *queue;
pthread_mutex_t *mutex;
atomic_bool *stop;
} _releaser_context_s;
typedef struct {
pthread_t tid;
us_queue_s *queue;
@@ -64,20 +72,15 @@ typedef struct {
atomic_bool *stop;
} _h264_context_s;
typedef struct {
pthread_t tid;
us_device_s *dev;
us_queue_s *queue;
pthread_mutex_t *mutex;
atomic_bool *stop;
} _releaser_context_s;
static void *_jpeg_thread(void *v_ctx);
static void *_h264_thread(void *v_ctx);
static void *_releaser_thread(void *v_ctx);
static void _stream_set_capture_state(us_stream_s *stream, uint width, uint height, bool online, uint captured_fps);
static void *_releaser_thread(void *v_ctx);
static void *_jpeg_thread(void *v_ctx);
static void *_h264_thread(void *v_ctx);
static us_hw_buffer_s *_get_latest_hw(us_queue_s *queue);
static bool _stream_has_any_clients(us_stream_s *stream);
static int _stream_init_loop(us_stream_s *stream);
static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame);
@@ -272,6 +275,35 @@ void _stream_set_capture_state(us_stream_s *stream, uint width, uint height, boo
atomic_store(&stream->run->http_capture_state, state);
}
static void *_releaser_thread(void *v_ctx) {
_releaser_context_s *ctx = v_ctx;
while (!atomic_load(ctx->stop)) {
us_hw_buffer_s *hw;
if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) {
continue;
}
while (atomic_load(&hw->refs) > 0) {
if (atomic_load(ctx->stop)) {
goto done;
}
usleep(5 * 1000);
}
US_MUTEX_LOCK(*ctx->mutex);
const int released = us_device_release_buffer(ctx->dev, hw);
US_MUTEX_UNLOCK(*ctx->mutex);
if (released < 0) {
goto done;
}
}
done:
atomic_store(ctx->stop, true); // Stop all other guys on error
return NULL;
}
static void *_jpeg_thread(void *v_ctx) {
_jpeg_context_s *ctx = v_ctx;
us_stream_s *stream = ctx->stream;
@@ -300,14 +332,10 @@ static void *_jpeg_thread(void *v_ctx) {
}
}
us_hw_buffer_s *hw;
if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) {
us_hw_buffer_s *hw = _get_latest_hw(ctx->queue);
if (hw == NULL) {
continue;
}
while (!us_queue_is_empty(ctx->queue)) { // Берем только самый свежий кадр
us_device_buffer_decref(hw);
assert(!us_queue_get(ctx->queue, (void**)&hw, 0));
}
if ( // Если никто не смотрит MJPEG - пропускаем кадр
!atomic_load(&stream->run->http_has_clients)
@@ -345,14 +373,10 @@ static void *_h264_thread(void *v_ctx) {
ldf last_encode_ts = us_get_now_monotonic();
while (!atomic_load(ctx->stop)) {
us_hw_buffer_s *hw;
if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) {
us_hw_buffer_s *hw = _get_latest_hw(ctx->queue);
if (hw == NULL) {
continue;
}
while (!us_queue_is_empty(ctx->queue)) { // Берем только самый свежий кадр
us_device_buffer_decref(hw);
assert(!us_queue_get(ctx->queue, (void**)&hw, 0));
}
if (!us_memsink_server_check(ctx->h264->sink, NULL)) {
us_device_buffer_decref(hw);
@@ -371,33 +395,16 @@ static void *_h264_thread(void *v_ctx) {
return NULL;
}
static void *_releaser_thread(void *v_ctx) {
_releaser_context_s *ctx = v_ctx;
while (!atomic_load(ctx->stop)) {
us_hw_buffer_s *hw;
if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) {
continue;
}
while (atomic_load(&hw->refs) > 0) {
if (atomic_load(ctx->stop)) {
goto done;
}
usleep(5 * 1000);
}
US_MUTEX_LOCK(*ctx->mutex);
const int released = us_device_release_buffer(ctx->dev, hw);
US_MUTEX_UNLOCK(*ctx->mutex);
if (released < 0) {
goto done;
}
static us_hw_buffer_s *_get_latest_hw(us_queue_s *queue) {
us_hw_buffer_s *hw;
if (us_queue_get(queue, (void**)&hw, 0.1) < 0) {
return NULL;
}
done:
atomic_store(ctx->stop, true); // Stop all other guys on error
return NULL;
while (!us_queue_is_empty(queue)) { // Берем только самый свежий кадр
us_device_buffer_decref(hw);
assert(!us_queue_get(queue, (void**)&hw, 0));
}
return hw;
}
static bool _stream_has_any_clients(us_stream_s *stream) {