From f7c29484771bfa1c145ebdc39057401a88481894 Mon Sep 17 00:00:00 2001 From: Maxim Devaev Date: Tue, 5 Mar 2024 04:17:54 +0200 Subject: [PATCH] improved memsink checks performance --- src/libs/memsink.c | 86 ++++++++++++++--------- src/libs/memsink.h | 7 +- src/ustreamer/h264.c | 5 +- src/ustreamer/stream.c | 154 ++++++++++++++++++++++++++--------------- 4 files changed, 160 insertions(+), 92 deletions(-) diff --git a/src/libs/memsink.c b/src/libs/memsink.c index c4812ee..01df505 100644 --- a/src/libs/memsink.c +++ b/src/libs/memsink.c @@ -101,16 +101,35 @@ void us_memsink_destroy(us_memsink_s *sink) { } bool us_memsink_server_check(us_memsink_s *sink, const us_frame_s *frame) { - // Return true (the need to write to memsink) on any of these conditions: - // - EWOULDBLOCK - we have an active client; - // - Incorrect magic or version - need to first write; - // - We have some active clients by last_client_ts; - // - Frame meta differs (like size, format, but not timestamp). + // Если frame == NULL, то только проверяем наличие клиентов + // или необходимость инициализировать память. assert(sink->server); + if (sink->mem->magic != US_MEMSINK_MAGIC || sink->mem->version != US_MEMSINK_VERSION) { + // Если регион памяти не был инициализирован, то нужно что-то туда положить. + // Блокировка не нужна, потому что только сервер пишет в эти переменные. + return true; + } + + const ldf unsafe_ts = sink->mem->last_client_ts; + if (unsafe_ts != sink->unsafe_last_client_ts) { + // Клиент пишет в синке свою отметку last_client_ts при любом действии. + // Мы не берем блокировку здесь, а просто проверяем, является ли это число тем же самым, + // что было прочитано нами в предыдущих итерациях. Значению не нужно быть консистентным, + // и даже если мы прочитали мусор из-за гонки в памяти между чтением здеси и записью + // из клиента, мы все равно можем сделать вывод, есть ли у нас клиенты вообще. + // Если число число поменялось то у нас точно есть клиенты и дальнейшие проверки + // проводить не требуется. Если же число неизменно, то стоит поставить блокировку + // и проверить, нужно ли записать что-нибудь в память для инициализации фрейма. + sink->unsafe_last_client_ts = unsafe_ts; + atomic_store(&sink->has_clients, true); + return true; + } + if (flock(sink->fd, LOCK_EX | LOCK_NB) < 0) { if (errno == EWOULDBLOCK) { + // Есть живой клиент, который прямо сейчас взял блокировку и читает фрейм из синка atomic_store(&sink->has_clients, true); return true; } @@ -118,10 +137,7 @@ bool us_memsink_server_check(us_memsink_s *sink, const us_frame_s *frame) { return false; } - if (sink->mem->magic != US_MEMSINK_MAGIC || sink->mem->version != US_MEMSINK_VERSION) { - return true; - } - + // Проверяем, есть ли у нас живой клиент по таймауту const bool has_clients = (sink->mem->last_client_ts + sink->client_ttl > us_get_now_monotonic()); atomic_store(&sink->has_clients, has_clients); @@ -133,6 +149,7 @@ bool us_memsink_server_check(us_memsink_s *sink, const us_frame_s *frame) { return true; } if (frame != NULL && !US_FRAME_COMPARE_GEOMETRY(sink->mem, frame)) { + // Если есть изменения в геометрии/формате фрейма, то их тоже нобходимо сразу записать в синк return true; } return false; @@ -152,12 +169,13 @@ int us_memsink_server_put(us_memsink_s *sink, const us_frame_s *frame, bool *key if (us_flock_timedwait_monotonic(sink->fd, 1) == 0) { US_LOG_VERBOSE("%s-sink: >>>>> Exposing new frame ...", sink->name); - sink->last_id = us_get_now_id(); - sink->mem->id = sink->last_id; + sink->mem->id = us_get_now_id(); if (sink->mem->key_requested && frame->key) { sink->mem->key_requested = false; } - *key_requested = sink->mem->key_requested; + if (key_requested != NULL) { // We don't need it for non-H264 sinks + *key_requested = sink->mem->key_requested; + } memcpy(sink->mem->data, frame->data, frame->used); sink->mem->used = frame->used; @@ -196,27 +214,33 @@ int us_memsink_client_get(us_memsink_s *sink, us_frame_s *frame, bool *key_reque return -1; } - int retval = -2; // Not updated + int retval = 0; - if (sink->mem->magic == US_MEMSINK_MAGIC) { - if (sink->mem->version != US_MEMSINK_VERSION) { - US_LOG_ERROR("%s-sink: Protocol version mismatch: sink=%u, required=%u", - sink->name, sink->mem->version, US_MEMSINK_VERSION); - retval = -1; - goto done; - } - if (sink->mem->id != sink->last_id) { // When updated - sink->last_id = sink->mem->id; - us_frame_set_data(frame, sink->mem->data, sink->mem->used); - US_FRAME_COPY_META(sink->mem, frame); - *key_requested = sink->mem->key_requested; - retval = 0; - } - sink->mem->last_client_ts = us_get_now_monotonic(); - if (key_required) { - sink->mem->key_requested = true; - } + if (sink->mem->magic != US_MEMSINK_MAGIC) { + retval = -2; // Not updated + goto done; } + if (sink->mem->version != US_MEMSINK_VERSION) { + US_LOG_ERROR("%s-sink: Protocol version mismatch: sink=%u, required=%u", + sink->name, sink->mem->version, US_MEMSINK_VERSION); + retval = -1; + goto done; + } + if (sink->mem->id == sink->last_readed_id) { + retval = -2; // Not updated + goto done; + } + + sink->last_readed_id = sink->mem->id; + us_frame_set_data(frame, sink->mem->data, sink->mem->used); + US_FRAME_COPY_META(sink->mem, frame); + if (key_requested != NULL) { // We don't need it for non-H264 sinks + *key_requested = sink->mem->key_requested; + } + if (key_required) { + sink->mem->key_requested = true; + } + sink->mem->last_client_ts = us_get_now_monotonic(); done: if (flock(sink->fd, LOCK_UN) < 0) { diff --git a/src/libs/memsink.h b/src/libs/memsink.h index e52a730..a55814a 100644 --- a/src/libs/memsink.h +++ b/src/libs/memsink.h @@ -41,8 +41,11 @@ typedef struct { int fd; us_memsink_shared_s *mem; - u64 last_id; - atomic_bool has_clients; // Only for server + + u64 last_readed_id; // Only for client + + atomic_bool has_clients; // Only for server results + ldf unsafe_last_client_ts; // Only for server } us_memsink_s; diff --git a/src/ustreamer/h264.c b/src/ustreamer/h264.c index 11a41ec..ee07048 100644 --- a/src/ustreamer/h264.c +++ b/src/ustreamer/h264.c @@ -53,14 +53,11 @@ void us_h264_stream_destroy(us_h264_stream_s *h264) { } void us_h264_stream_process(us_h264_stream_s *h264, const us_frame_s *frame, bool force_key) { - /*if (!us_memsink_server_check(h264->sink, frame)) { - return; - }*/ - if (us_is_jpeg(frame->format)) { const ldf now_ts = us_get_now_monotonic(); US_LOG_DEBUG("H264: Input frame is JPEG; decoding ..."); if (us_unjpeg(frame, h264->tmp_src, true) < 0) { + atomic_store(&h264->online, false); return; } frame = h264->tmp_src; diff --git a/src/ustreamer/stream.c b/src/ustreamer/stream.c index d3558e7..1bee2e0 100644 --- a/src/ustreamer/stream.c +++ b/src/ustreamer/stream.c @@ -62,15 +62,7 @@ typedef struct { us_queue_s *queue; us_stream_s *stream; atomic_bool *stop; -} _jpeg_context_s; - -typedef struct { - pthread_t tid; - us_device_s *dev; - us_queue_s *queue; - us_h264_stream_s *h264; - atomic_bool *stop; -} _h264_context_s; +} _worker_context_s; static void _stream_set_capture_state(us_stream_s *stream, uint width, uint height, bool online, uint captured_fps); @@ -78,23 +70,18 @@ static void _stream_set_capture_state(us_stream_s *stream, uint width, uint heig static void *_releaser_thread(void *v_ctx); static void *_jpeg_thread(void *v_ctx); static void *_h264_thread(void *v_ctx); +static void *_raw_thread(void *v_ctx); static us_hw_buffer_s *_get_latest_hw(us_queue_s *queue); -static bool _stream_has_any_clients(us_stream_s *stream); +static bool _stream_has_jpeg_clients_cached(us_stream_s *stream); +static bool _stream_has_any_clients_cached(us_stream_s *stream); static int _stream_init_loop(us_stream_s *stream); -static void _stream_expose_http(us_stream_s *stream, const us_frame_s *frame); +static void _stream_expose_jpeg(us_stream_s *stream, const us_frame_s *frame); +static void _stream_expose_raw(us_stream_s *stream, const us_frame_s *frame); static void _stream_check_suicide(us_stream_s *stream); -#define _SINK_PUT(x_sink, x_frame) { \ - if (stream->x_sink && us_memsink_server_check(stream->x_sink, x_frame)) {\ - bool m_key_requested; /* Unused */ \ - us_memsink_server_put(stream->x_sink, x_frame, &m_key_requested); \ - } \ - } - - us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) { us_stream_runtime_s *run; US_CALLOC(run, 1); @@ -158,22 +145,29 @@ void us_stream_loop(us_stream_s *stream) { US_THREAD_CREATE(ctx->tid, _releaser_thread, ctx); } - _jpeg_context_s jpeg_ctx = { + _worker_context_s jpeg_ctx = { .queue = us_queue_init(dev->run->n_bufs), .stream = stream, .stop = &threads_stop, }; US_THREAD_CREATE(jpeg_ctx.tid, _jpeg_thread, &jpeg_ctx); - _h264_context_s h264_ctx; + _worker_context_s h264_ctx; if (run->h264 != NULL) { - h264_ctx.dev = dev; h264_ctx.queue = us_queue_init(dev->run->n_bufs); - h264_ctx.h264 = run->h264; + h264_ctx.stream = stream; h264_ctx.stop = &threads_stop; US_THREAD_CREATE(h264_ctx.tid, _h264_thread, &h264_ctx); } + _worker_context_s raw_ctx; + if (stream->raw_sink != NULL) { + raw_ctx.queue = us_queue_init(2); + raw_ctx.stream = stream; + raw_ctx.stop = &threads_stop; + US_THREAD_CREATE(raw_ctx.tid, _raw_thread, &raw_ctx); + } + uint captured_fps_accum = 0; sll captured_fps_ts = 0; uint captured_fps = 0; @@ -182,15 +176,6 @@ void us_stream_loop(us_stream_s *stream) { uint slowdown_count = 0; while (!atomic_load(&run->stop) && !atomic_load(&threads_stop)) { - _stream_check_suicide(stream); - if (stream->slowdown && !_stream_has_any_clients(stream)) { - usleep(100 * 1000); - slowdown_count = (slowdown_count + 1) % 10; - if (slowdown_count > 0) { - continue; - } - } - us_hw_buffer_s *hw; const int buf_index = us_device_grab_buffer(dev, &hw); switch (buf_index) { @@ -219,14 +204,31 @@ void us_stream_loop(us_stream_s *stream) { us_device_buffer_incref(hw); // H264 us_queue_put(h264_ctx.queue, hw, 0); } + if (stream->raw_sink != NULL) { + us_device_buffer_incref(hw); // RAW + us_queue_put(raw_ctx.queue, hw, 0); + } us_queue_put(releasers[buf_index].queue, hw, 0); // Plan to release - _SINK_PUT(raw_sink, &hw->raw); + // Мы не обновляем здесь состояние синков, потому что это происходит внутри обслуживающих их потоков + _stream_check_suicide(stream); + if (stream->slowdown && !_stream_has_any_clients_cached(stream)) { + usleep(100 * 1000); + slowdown_count = (slowdown_count + 1) % 10; + if (slowdown_count > 0) { + continue; + } + } } close: atomic_store(&threads_stop, true); + if (stream->raw_sink != NULL) { + US_THREAD_JOIN(raw_ctx.tid); + us_queue_destroy(raw_ctx.queue); + } + if (run->h264 != NULL) { US_THREAD_JOIN(h264_ctx.tid); us_queue_destroy(h264_ctx.queue); @@ -309,7 +311,7 @@ done: static void *_jpeg_thread(void *v_ctx) { US_THREAD_SETTLE("str_jpeg") - _jpeg_context_s *ctx = v_ctx; + _worker_context_s *ctx = v_ctx; us_stream_s *stream = ctx->stream; ldf grab_after = 0; @@ -325,11 +327,10 @@ static void *_jpeg_thread(void *v_ctx) { if (ready_wr->job_failed) { // pass } else if (ready_wr->job_timely) { - _stream_expose_http(stream, ready_job->dest); + _stream_expose_jpeg(stream, ready_job->dest); if (atomic_load(&stream->run->http_snapshot_requested) > 0) { // Process real snapshots atomic_fetch_sub(&stream->run->http_snapshot_requested, 1); } - _SINK_PUT(jpeg_sink, ready_job->dest); US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf", ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts); } else { @@ -342,11 +343,8 @@ static void *_jpeg_thread(void *v_ctx) { continue; } - if ( // Если никто не смотрит MJPEG - пропускаем кадр - !atomic_load(&stream->run->http_has_clients) - && (atomic_load(&stream->run->http_snapshot_requested) == 0) - && (stream->jpeg_sink == NULL || !us_memsink_server_check(stream->jpeg_sink, NULL)) - ) { + const bool update_required = (stream->jpeg_sink != NULL && us_memsink_server_check(stream->jpeg_sink, NULL)); + if (!update_required && !_stream_has_jpeg_clients_cached(stream)) { US_LOG_VERBOSE("Passed JPEG encoding because nobody is watching"); us_device_buffer_decref(hw); continue; @@ -375,7 +373,7 @@ static void *_jpeg_thread(void *v_ctx) { static void *_h264_thread(void *v_ctx) { US_THREAD_SETTLE("str_h264"); - _h264_context_s *ctx = v_ctx; + _worker_context_s *ctx = v_ctx; ldf last_encode_ts = us_get_now_monotonic(); while (!atomic_load(ctx->stop)) { @@ -384,7 +382,7 @@ static void *_h264_thread(void *v_ctx) { continue; } - if (!us_memsink_server_check(ctx->h264->sink, NULL)) { + if (!us_memsink_server_check(ctx->stream->run->h264->sink, NULL)) { us_device_buffer_decref(hw); US_LOG_VERBOSE("Passed H264 encoding because nobody is watching"); continue; @@ -395,7 +393,29 @@ static void *_h264_thread(void *v_ctx) { const bool force_key = (last_encode_ts + 0.5 < now_ts); last_encode_ts = now_ts; - us_h264_stream_process(ctx->h264, &hw->raw, force_key); + us_h264_stream_process(ctx->stream->run->h264, &hw->raw, force_key); + us_device_buffer_decref(hw); + } + return NULL; +} + +static void *_raw_thread(void *v_ctx) { + US_THREAD_SETTLE("str_raw"); + _worker_context_s *ctx = v_ctx; + + while (!atomic_load(ctx->stop)) { + us_hw_buffer_s *hw = _get_latest_hw(ctx->queue); + if (hw == NULL) { + continue; + } + + if (!us_memsink_server_check(ctx->stream->raw_sink, NULL)) { + us_device_buffer_decref(hw); + US_LOG_VERBOSE("Passed RAW publishing because nobody is watching"); + continue; + } + + us_memsink_server_put(ctx->stream->raw_sink, &hw->raw, false); us_device_buffer_decref(hw); } return NULL; @@ -413,14 +433,21 @@ static us_hw_buffer_s *_get_latest_hw(us_queue_s *queue) { return hw; } -static bool _stream_has_any_clients(us_stream_s *stream) { +static bool _stream_has_jpeg_clients_cached(us_stream_s *stream) { const us_stream_runtime_s *const run = stream->run; return ( atomic_load(&run->http_has_clients) || (atomic_load(&run->http_snapshot_requested) > 0) - // has_clients синков НЕ обновляются в реальном времени || (stream->jpeg_sink != NULL && atomic_load(&stream->jpeg_sink->has_clients)) - || (run->h264 != NULL && /*run->h264->sink == NULL ||*/ atomic_load(&run->h264->sink->has_clients)) + ); +} + +static bool _stream_has_any_clients_cached(us_stream_s *stream) { + const us_stream_runtime_s *const run = stream->run; + return ( + _stream_has_jpeg_clients_cached(stream) + || (run->h264 != NULL && atomic_load(&run->h264->sink->has_clients)) + || (stream->raw_sink != NULL && atomic_load(&stream->raw_sink->has_clients)) ); } @@ -429,6 +456,18 @@ static int _stream_init_loop(us_stream_s *stream) { bool waiting_reported = false; while (!atomic_load(&stream->run->stop)) { + // Флаги has_clients у синков не обновляются сами по себе, поэтому обновим их + // на каждой итерации старта стрима. После старта этим будут заниматься воркеры. + if (stream->jpeg_sink != NULL) { + us_memsink_server_check(stream->jpeg_sink, NULL); + } + if (stream->run->h264 != NULL) { + us_memsink_server_check(stream->run->h264->sink, NULL); + } + if (stream->raw_sink != NULL) { + us_memsink_server_check(stream->raw_sink, NULL); + } + _stream_check_suicide(stream); uint width = stream->dev->run->width; @@ -444,12 +483,11 @@ static int _stream_init_loop(us_stream_s *stream) { us_gpio_set_stream_online(false); # endif - _stream_expose_http(stream, run->blank->jpeg); - _SINK_PUT(jpeg_sink, run->blank->jpeg); + _stream_expose_jpeg(stream, run->blank->jpeg); if (run->h264 != NULL) { us_h264_stream_process(run->h264, run->blank->raw, true); } - _SINK_PUT(raw_sink, run->blank->raw); + _stream_expose_raw(stream, run->blank->raw); stream->dev->dma_export = ( stream->enc->type == US_ENCODER_TYPE_M2M_VIDEO @@ -482,7 +520,7 @@ static int _stream_init_loop(us_stream_s *stream) { return -1; } -static void _stream_expose_http(us_stream_s *stream, const us_frame_s *frame) { +static void _stream_expose_jpeg(us_stream_s *stream, const us_frame_s *frame) { us_stream_runtime_s *const run = stream->run; int ri; while ((ri = us_ring_producer_acquire(run->http_jpeg_ring, 0)) < 0) { @@ -493,19 +531,25 @@ static void _stream_expose_http(us_stream_s *stream, const us_frame_s *frame) { us_frame_s *const dest = run->http_jpeg_ring->items[ri]; us_frame_copy(frame, dest); us_ring_producer_release(run->http_jpeg_ring, ri); + if (stream->jpeg_sink != NULL) { + us_memsink_server_put(stream->jpeg_sink, dest, NULL); + } +} + +static void _stream_expose_raw(us_stream_s *stream, const us_frame_s *frame) { + if (stream->raw_sink != NULL) { + us_memsink_server_put(stream->raw_sink, frame, NULL); + } } static void _stream_check_suicide(us_stream_s *stream) { if (stream->exit_on_no_clients == 0) { return; } - us_stream_runtime_s *const run = stream->run; - const ldf now_ts = us_get_now_monotonic(); const ull http_last_request_ts = atomic_load(&run->http_last_request_ts); // Seconds - - if (_stream_has_any_clients(stream)) { + if (_stream_has_any_clients_cached(stream)) { atomic_store(&run->http_last_request_ts, now_ts); } else if (http_last_request_ts + stream->exit_on_no_clients < now_ts) { US_LOG_INFO("No requests or HTTP/sink clients found in last %u seconds, exiting ...",