improved memsink checks performance

This commit is contained in:
Maxim Devaev 2024-03-05 04:17:54 +02:00
parent c55b6c4d7d
commit f7c2948477
4 changed files with 160 additions and 92 deletions

View File

@ -101,16 +101,35 @@ void us_memsink_destroy(us_memsink_s *sink) {
}
bool us_memsink_server_check(us_memsink_s *sink, const us_frame_s *frame) {
// Return true (the need to write to memsink) on any of these conditions:
// - EWOULDBLOCK - we have an active client;
// - Incorrect magic or version - need to first write;
// - We have some active clients by last_client_ts;
// - Frame meta differs (like size, format, but not timestamp).
// Если frame == NULL, то только проверяем наличие клиентов
// или необходимость инициализировать память.
assert(sink->server);
if (sink->mem->magic != US_MEMSINK_MAGIC || sink->mem->version != US_MEMSINK_VERSION) {
// Если регион памяти не был инициализирован, то нужно что-то туда положить.
// Блокировка не нужна, потому что только сервер пишет в эти переменные.
return true;
}
const ldf unsafe_ts = sink->mem->last_client_ts;
if (unsafe_ts != sink->unsafe_last_client_ts) {
// Клиент пишет в синке свою отметку last_client_ts при любом действии.
// Мы не берем блокировку здесь, а просто проверяем, является ли это число тем же самым,
// что было прочитано нами в предыдущих итерациях. Значению не нужно быть консистентным,
// и даже если мы прочитали мусор из-за гонки в памяти между чтением здеси и записью
// из клиента, мы все равно можем сделать вывод, есть ли у нас клиенты вообще.
// Если число число поменялось то у нас точно есть клиенты и дальнейшие проверки
// проводить не требуется. Если же число неизменно, то стоит поставить блокировку
// и проверить, нужно ли записать что-нибудь в память для инициализации фрейма.
sink->unsafe_last_client_ts = unsafe_ts;
atomic_store(&sink->has_clients, true);
return true;
}
if (flock(sink->fd, LOCK_EX | LOCK_NB) < 0) {
if (errno == EWOULDBLOCK) {
// Есть живой клиент, который прямо сейчас взял блокировку и читает фрейм из синка
atomic_store(&sink->has_clients, true);
return true;
}
@ -118,10 +137,7 @@ bool us_memsink_server_check(us_memsink_s *sink, const us_frame_s *frame) {
return false;
}
if (sink->mem->magic != US_MEMSINK_MAGIC || sink->mem->version != US_MEMSINK_VERSION) {
return true;
}
// Проверяем, есть ли у нас живой клиент по таймауту
const bool has_clients = (sink->mem->last_client_ts + sink->client_ttl > us_get_now_monotonic());
atomic_store(&sink->has_clients, has_clients);
@ -133,6 +149,7 @@ bool us_memsink_server_check(us_memsink_s *sink, const us_frame_s *frame) {
return true;
}
if (frame != NULL && !US_FRAME_COMPARE_GEOMETRY(sink->mem, frame)) {
// Если есть изменения в геометрии/формате фрейма, то их тоже нобходимо сразу записать в синк
return true;
}
return false;
@ -152,12 +169,13 @@ int us_memsink_server_put(us_memsink_s *sink, const us_frame_s *frame, bool *key
if (us_flock_timedwait_monotonic(sink->fd, 1) == 0) {
US_LOG_VERBOSE("%s-sink: >>>>> Exposing new frame ...", sink->name);
sink->last_id = us_get_now_id();
sink->mem->id = sink->last_id;
sink->mem->id = us_get_now_id();
if (sink->mem->key_requested && frame->key) {
sink->mem->key_requested = false;
}
*key_requested = sink->mem->key_requested;
if (key_requested != NULL) { // We don't need it for non-H264 sinks
*key_requested = sink->mem->key_requested;
}
memcpy(sink->mem->data, frame->data, frame->used);
sink->mem->used = frame->used;
@ -196,27 +214,33 @@ int us_memsink_client_get(us_memsink_s *sink, us_frame_s *frame, bool *key_reque
return -1;
}
int retval = -2; // Not updated
int retval = 0;
if (sink->mem->magic == US_MEMSINK_MAGIC) {
if (sink->mem->version != US_MEMSINK_VERSION) {
US_LOG_ERROR("%s-sink: Protocol version mismatch: sink=%u, required=%u",
sink->name, sink->mem->version, US_MEMSINK_VERSION);
retval = -1;
goto done;
}
if (sink->mem->id != sink->last_id) { // When updated
sink->last_id = sink->mem->id;
us_frame_set_data(frame, sink->mem->data, sink->mem->used);
US_FRAME_COPY_META(sink->mem, frame);
*key_requested = sink->mem->key_requested;
retval = 0;
}
sink->mem->last_client_ts = us_get_now_monotonic();
if (key_required) {
sink->mem->key_requested = true;
}
if (sink->mem->magic != US_MEMSINK_MAGIC) {
retval = -2; // Not updated
goto done;
}
if (sink->mem->version != US_MEMSINK_VERSION) {
US_LOG_ERROR("%s-sink: Protocol version mismatch: sink=%u, required=%u",
sink->name, sink->mem->version, US_MEMSINK_VERSION);
retval = -1;
goto done;
}
if (sink->mem->id == sink->last_readed_id) {
retval = -2; // Not updated
goto done;
}
sink->last_readed_id = sink->mem->id;
us_frame_set_data(frame, sink->mem->data, sink->mem->used);
US_FRAME_COPY_META(sink->mem, frame);
if (key_requested != NULL) { // We don't need it for non-H264 sinks
*key_requested = sink->mem->key_requested;
}
if (key_required) {
sink->mem->key_requested = true;
}
sink->mem->last_client_ts = us_get_now_monotonic();
done:
if (flock(sink->fd, LOCK_UN) < 0) {

View File

@ -41,8 +41,11 @@ typedef struct {
int fd;
us_memsink_shared_s *mem;
u64 last_id;
atomic_bool has_clients; // Only for server
u64 last_readed_id; // Only for client
atomic_bool has_clients; // Only for server results
ldf unsafe_last_client_ts; // Only for server
} us_memsink_s;

View File

@ -53,14 +53,11 @@ void us_h264_stream_destroy(us_h264_stream_s *h264) {
}
void us_h264_stream_process(us_h264_stream_s *h264, const us_frame_s *frame, bool force_key) {
/*if (!us_memsink_server_check(h264->sink, frame)) {
return;
}*/
if (us_is_jpeg(frame->format)) {
const ldf now_ts = us_get_now_monotonic();
US_LOG_DEBUG("H264: Input frame is JPEG; decoding ...");
if (us_unjpeg(frame, h264->tmp_src, true) < 0) {
atomic_store(&h264->online, false);
return;
}
frame = h264->tmp_src;

View File

@ -62,15 +62,7 @@ typedef struct {
us_queue_s *queue;
us_stream_s *stream;
atomic_bool *stop;
} _jpeg_context_s;
typedef struct {
pthread_t tid;
us_device_s *dev;
us_queue_s *queue;
us_h264_stream_s *h264;
atomic_bool *stop;
} _h264_context_s;
} _worker_context_s;
static void _stream_set_capture_state(us_stream_s *stream, uint width, uint height, bool online, uint captured_fps);
@ -78,23 +70,18 @@ static void _stream_set_capture_state(us_stream_s *stream, uint width, uint heig
static void *_releaser_thread(void *v_ctx);
static void *_jpeg_thread(void *v_ctx);
static void *_h264_thread(void *v_ctx);
static void *_raw_thread(void *v_ctx);
static us_hw_buffer_s *_get_latest_hw(us_queue_s *queue);
static bool _stream_has_any_clients(us_stream_s *stream);
static bool _stream_has_jpeg_clients_cached(us_stream_s *stream);
static bool _stream_has_any_clients_cached(us_stream_s *stream);
static int _stream_init_loop(us_stream_s *stream);
static void _stream_expose_http(us_stream_s *stream, const us_frame_s *frame);
static void _stream_expose_jpeg(us_stream_s *stream, const us_frame_s *frame);
static void _stream_expose_raw(us_stream_s *stream, const us_frame_s *frame);
static void _stream_check_suicide(us_stream_s *stream);
#define _SINK_PUT(x_sink, x_frame) { \
if (stream->x_sink && us_memsink_server_check(stream->x_sink, x_frame)) {\
bool m_key_requested; /* Unused */ \
us_memsink_server_put(stream->x_sink, x_frame, &m_key_requested); \
} \
}
us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) {
us_stream_runtime_s *run;
US_CALLOC(run, 1);
@ -158,22 +145,29 @@ void us_stream_loop(us_stream_s *stream) {
US_THREAD_CREATE(ctx->tid, _releaser_thread, ctx);
}
_jpeg_context_s jpeg_ctx = {
_worker_context_s jpeg_ctx = {
.queue = us_queue_init(dev->run->n_bufs),
.stream = stream,
.stop = &threads_stop,
};
US_THREAD_CREATE(jpeg_ctx.tid, _jpeg_thread, &jpeg_ctx);
_h264_context_s h264_ctx;
_worker_context_s h264_ctx;
if (run->h264 != NULL) {
h264_ctx.dev = dev;
h264_ctx.queue = us_queue_init(dev->run->n_bufs);
h264_ctx.h264 = run->h264;
h264_ctx.stream = stream;
h264_ctx.stop = &threads_stop;
US_THREAD_CREATE(h264_ctx.tid, _h264_thread, &h264_ctx);
}
_worker_context_s raw_ctx;
if (stream->raw_sink != NULL) {
raw_ctx.queue = us_queue_init(2);
raw_ctx.stream = stream;
raw_ctx.stop = &threads_stop;
US_THREAD_CREATE(raw_ctx.tid, _raw_thread, &raw_ctx);
}
uint captured_fps_accum = 0;
sll captured_fps_ts = 0;
uint captured_fps = 0;
@ -182,15 +176,6 @@ void us_stream_loop(us_stream_s *stream) {
uint slowdown_count = 0;
while (!atomic_load(&run->stop) && !atomic_load(&threads_stop)) {
_stream_check_suicide(stream);
if (stream->slowdown && !_stream_has_any_clients(stream)) {
usleep(100 * 1000);
slowdown_count = (slowdown_count + 1) % 10;
if (slowdown_count > 0) {
continue;
}
}
us_hw_buffer_s *hw;
const int buf_index = us_device_grab_buffer(dev, &hw);
switch (buf_index) {
@ -219,14 +204,31 @@ void us_stream_loop(us_stream_s *stream) {
us_device_buffer_incref(hw); // H264
us_queue_put(h264_ctx.queue, hw, 0);
}
if (stream->raw_sink != NULL) {
us_device_buffer_incref(hw); // RAW
us_queue_put(raw_ctx.queue, hw, 0);
}
us_queue_put(releasers[buf_index].queue, hw, 0); // Plan to release
_SINK_PUT(raw_sink, &hw->raw);
// Мы не обновляем здесь состояние синков, потому что это происходит внутри обслуживающих их потоков
_stream_check_suicide(stream);
if (stream->slowdown && !_stream_has_any_clients_cached(stream)) {
usleep(100 * 1000);
slowdown_count = (slowdown_count + 1) % 10;
if (slowdown_count > 0) {
continue;
}
}
}
close:
atomic_store(&threads_stop, true);
if (stream->raw_sink != NULL) {
US_THREAD_JOIN(raw_ctx.tid);
us_queue_destroy(raw_ctx.queue);
}
if (run->h264 != NULL) {
US_THREAD_JOIN(h264_ctx.tid);
us_queue_destroy(h264_ctx.queue);
@ -309,7 +311,7 @@ done:
static void *_jpeg_thread(void *v_ctx) {
US_THREAD_SETTLE("str_jpeg")
_jpeg_context_s *ctx = v_ctx;
_worker_context_s *ctx = v_ctx;
us_stream_s *stream = ctx->stream;
ldf grab_after = 0;
@ -325,11 +327,10 @@ static void *_jpeg_thread(void *v_ctx) {
if (ready_wr->job_failed) {
// pass
} else if (ready_wr->job_timely) {
_stream_expose_http(stream, ready_job->dest);
_stream_expose_jpeg(stream, ready_job->dest);
if (atomic_load(&stream->run->http_snapshot_requested) > 0) { // Process real snapshots
atomic_fetch_sub(&stream->run->http_snapshot_requested, 1);
}
_SINK_PUT(jpeg_sink, ready_job->dest);
US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf",
ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts);
} else {
@ -342,11 +343,8 @@ static void *_jpeg_thread(void *v_ctx) {
continue;
}
if ( // Если никто не смотрит MJPEG - пропускаем кадр
!atomic_load(&stream->run->http_has_clients)
&& (atomic_load(&stream->run->http_snapshot_requested) == 0)
&& (stream->jpeg_sink == NULL || !us_memsink_server_check(stream->jpeg_sink, NULL))
) {
const bool update_required = (stream->jpeg_sink != NULL && us_memsink_server_check(stream->jpeg_sink, NULL));
if (!update_required && !_stream_has_jpeg_clients_cached(stream)) {
US_LOG_VERBOSE("Passed JPEG encoding because nobody is watching");
us_device_buffer_decref(hw);
continue;
@ -375,7 +373,7 @@ static void *_jpeg_thread(void *v_ctx) {
static void *_h264_thread(void *v_ctx) {
US_THREAD_SETTLE("str_h264");
_h264_context_s *ctx = v_ctx;
_worker_context_s *ctx = v_ctx;
ldf last_encode_ts = us_get_now_monotonic();
while (!atomic_load(ctx->stop)) {
@ -384,7 +382,7 @@ static void *_h264_thread(void *v_ctx) {
continue;
}
if (!us_memsink_server_check(ctx->h264->sink, NULL)) {
if (!us_memsink_server_check(ctx->stream->run->h264->sink, NULL)) {
us_device_buffer_decref(hw);
US_LOG_VERBOSE("Passed H264 encoding because nobody is watching");
continue;
@ -395,7 +393,29 @@ static void *_h264_thread(void *v_ctx) {
const bool force_key = (last_encode_ts + 0.5 < now_ts);
last_encode_ts = now_ts;
us_h264_stream_process(ctx->h264, &hw->raw, force_key);
us_h264_stream_process(ctx->stream->run->h264, &hw->raw, force_key);
us_device_buffer_decref(hw);
}
return NULL;
}
static void *_raw_thread(void *v_ctx) {
US_THREAD_SETTLE("str_raw");
_worker_context_s *ctx = v_ctx;
while (!atomic_load(ctx->stop)) {
us_hw_buffer_s *hw = _get_latest_hw(ctx->queue);
if (hw == NULL) {
continue;
}
if (!us_memsink_server_check(ctx->stream->raw_sink, NULL)) {
us_device_buffer_decref(hw);
US_LOG_VERBOSE("Passed RAW publishing because nobody is watching");
continue;
}
us_memsink_server_put(ctx->stream->raw_sink, &hw->raw, false);
us_device_buffer_decref(hw);
}
return NULL;
@ -413,14 +433,21 @@ static us_hw_buffer_s *_get_latest_hw(us_queue_s *queue) {
return hw;
}
static bool _stream_has_any_clients(us_stream_s *stream) {
static bool _stream_has_jpeg_clients_cached(us_stream_s *stream) {
const us_stream_runtime_s *const run = stream->run;
return (
atomic_load(&run->http_has_clients)
|| (atomic_load(&run->http_snapshot_requested) > 0)
// has_clients синков НЕ обновляются в реальном времени
|| (stream->jpeg_sink != NULL && atomic_load(&stream->jpeg_sink->has_clients))
|| (run->h264 != NULL && /*run->h264->sink == NULL ||*/ atomic_load(&run->h264->sink->has_clients))
);
}
static bool _stream_has_any_clients_cached(us_stream_s *stream) {
const us_stream_runtime_s *const run = stream->run;
return (
_stream_has_jpeg_clients_cached(stream)
|| (run->h264 != NULL && atomic_load(&run->h264->sink->has_clients))
|| (stream->raw_sink != NULL && atomic_load(&stream->raw_sink->has_clients))
);
}
@ -429,6 +456,18 @@ static int _stream_init_loop(us_stream_s *stream) {
bool waiting_reported = false;
while (!atomic_load(&stream->run->stop)) {
// Флаги has_clients у синков не обновляются сами по себе, поэтому обновим их
// на каждой итерации старта стрима. После старта этим будут заниматься воркеры.
if (stream->jpeg_sink != NULL) {
us_memsink_server_check(stream->jpeg_sink, NULL);
}
if (stream->run->h264 != NULL) {
us_memsink_server_check(stream->run->h264->sink, NULL);
}
if (stream->raw_sink != NULL) {
us_memsink_server_check(stream->raw_sink, NULL);
}
_stream_check_suicide(stream);
uint width = stream->dev->run->width;
@ -444,12 +483,11 @@ static int _stream_init_loop(us_stream_s *stream) {
us_gpio_set_stream_online(false);
# endif
_stream_expose_http(stream, run->blank->jpeg);
_SINK_PUT(jpeg_sink, run->blank->jpeg);
_stream_expose_jpeg(stream, run->blank->jpeg);
if (run->h264 != NULL) {
us_h264_stream_process(run->h264, run->blank->raw, true);
}
_SINK_PUT(raw_sink, run->blank->raw);
_stream_expose_raw(stream, run->blank->raw);
stream->dev->dma_export = (
stream->enc->type == US_ENCODER_TYPE_M2M_VIDEO
@ -482,7 +520,7 @@ static int _stream_init_loop(us_stream_s *stream) {
return -1;
}
static void _stream_expose_http(us_stream_s *stream, const us_frame_s *frame) {
static void _stream_expose_jpeg(us_stream_s *stream, const us_frame_s *frame) {
us_stream_runtime_s *const run = stream->run;
int ri;
while ((ri = us_ring_producer_acquire(run->http_jpeg_ring, 0)) < 0) {
@ -493,19 +531,25 @@ static void _stream_expose_http(us_stream_s *stream, const us_frame_s *frame) {
us_frame_s *const dest = run->http_jpeg_ring->items[ri];
us_frame_copy(frame, dest);
us_ring_producer_release(run->http_jpeg_ring, ri);
if (stream->jpeg_sink != NULL) {
us_memsink_server_put(stream->jpeg_sink, dest, NULL);
}
}
static void _stream_expose_raw(us_stream_s *stream, const us_frame_s *frame) {
if (stream->raw_sink != NULL) {
us_memsink_server_put(stream->raw_sink, frame, NULL);
}
}
static void _stream_check_suicide(us_stream_s *stream) {
if (stream->exit_on_no_clients == 0) {
return;
}
us_stream_runtime_s *const run = stream->run;
const ldf now_ts = us_get_now_monotonic();
const ull http_last_request_ts = atomic_load(&run->http_last_request_ts); // Seconds
if (_stream_has_any_clients(stream)) {
if (_stream_has_any_clients_cached(stream)) {
atomic_store(&run->http_last_request_ts, now_ts);
} else if (http_last_request_ts + stream->exit_on_no_clients < now_ts) {
US_LOG_INFO("No requests or HTTP/sink clients found in last %u seconds, exiting ...",