diff --git a/src/ustreamer/stream.c b/src/ustreamer/stream.c index 105320b..125e08c 100644 --- a/src/ustreamer/stream.c +++ b/src/ustreamer/stream.c @@ -49,6 +49,14 @@ #endif +typedef struct { + pthread_t tid; + us_device_s *dev; + us_queue_s *queue; + pthread_mutex_t *mutex; + atomic_bool *stop; +} _releaser_context_s; + typedef struct { pthread_t tid; us_queue_s *queue; @@ -64,20 +72,15 @@ typedef struct { atomic_bool *stop; } _h264_context_s; -typedef struct { - pthread_t tid; - us_device_s *dev; - us_queue_s *queue; - pthread_mutex_t *mutex; - atomic_bool *stop; -} _releaser_context_s; - - -static void *_jpeg_thread(void *v_ctx); -static void *_h264_thread(void *v_ctx); -static void *_releaser_thread(void *v_ctx); static void _stream_set_capture_state(us_stream_s *stream, uint width, uint height, bool online, uint captured_fps); + +static void *_releaser_thread(void *v_ctx); +static void *_jpeg_thread(void *v_ctx); +static void *_h264_thread(void *v_ctx); + +static us_hw_buffer_s *_get_latest_hw(us_queue_s *queue); + static bool _stream_has_any_clients(us_stream_s *stream); static int _stream_init_loop(us_stream_s *stream); static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame); @@ -272,6 +275,35 @@ void _stream_set_capture_state(us_stream_s *stream, uint width, uint height, boo atomic_store(&stream->run->http_capture_state, state); } +static void *_releaser_thread(void *v_ctx) { + _releaser_context_s *ctx = v_ctx; + + while (!atomic_load(ctx->stop)) { + us_hw_buffer_s *hw; + if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) { + continue; + } + + while (atomic_load(&hw->refs) > 0) { + if (atomic_load(ctx->stop)) { + goto done; + } + usleep(5 * 1000); + } + + US_MUTEX_LOCK(*ctx->mutex); + const int released = us_device_release_buffer(ctx->dev, hw); + US_MUTEX_UNLOCK(*ctx->mutex); + if (released < 0) { + goto done; + } + } + +done: + atomic_store(ctx->stop, true); // Stop all other guys on error + return NULL; +} + static void *_jpeg_thread(void *v_ctx) { _jpeg_context_s *ctx = v_ctx; us_stream_s *stream = ctx->stream; @@ -300,14 +332,10 @@ static void *_jpeg_thread(void *v_ctx) { } } - us_hw_buffer_s *hw; - if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) { + us_hw_buffer_s *hw = _get_latest_hw(ctx->queue); + if (hw == NULL) { continue; } - while (!us_queue_is_empty(ctx->queue)) { // Берем только самый свежий кадр - us_device_buffer_decref(hw); - assert(!us_queue_get(ctx->queue, (void**)&hw, 0)); - } if ( // Если никто не смотрит MJPEG - пропускаем кадр !atomic_load(&stream->run->http_has_clients) @@ -345,14 +373,10 @@ static void *_h264_thread(void *v_ctx) { ldf last_encode_ts = us_get_now_monotonic(); while (!atomic_load(ctx->stop)) { - us_hw_buffer_s *hw; - if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) { + us_hw_buffer_s *hw = _get_latest_hw(ctx->queue); + if (hw == NULL) { continue; } - while (!us_queue_is_empty(ctx->queue)) { // Берем только самый свежий кадр - us_device_buffer_decref(hw); - assert(!us_queue_get(ctx->queue, (void**)&hw, 0)); - } if (!us_memsink_server_check(ctx->h264->sink, NULL)) { us_device_buffer_decref(hw); @@ -371,33 +395,16 @@ static void *_h264_thread(void *v_ctx) { return NULL; } -static void *_releaser_thread(void *v_ctx) { - _releaser_context_s *ctx = v_ctx; - - while (!atomic_load(ctx->stop)) { - us_hw_buffer_s *hw; - if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) { - continue; - } - - while (atomic_load(&hw->refs) > 0) { - if (atomic_load(ctx->stop)) { - goto done; - } - usleep(5 * 1000); - } - - US_MUTEX_LOCK(*ctx->mutex); - const int released = us_device_release_buffer(ctx->dev, hw); - US_MUTEX_UNLOCK(*ctx->mutex); - if (released < 0) { - goto done; - } +static us_hw_buffer_s *_get_latest_hw(us_queue_s *queue) { + us_hw_buffer_s *hw; + if (us_queue_get(queue, (void**)&hw, 0.1) < 0) { + return NULL; } - -done: - atomic_store(ctx->stop, true); // Stop all other guys on error - return NULL; + while (!us_queue_is_empty(queue)) { // Берем только самый свежий кадр + us_device_buffer_decref(hw); + assert(!us_queue_get(queue, (void**)&hw, 0)); + } + return hw; } static bool _stream_has_any_clients(us_stream_s *stream) {