From 14131f0b54907c6afd5c91ffd0f1c6f48f4c46b0 Mon Sep 17 00:00:00 2001 From: Devaev Maxim Date: Sun, 17 Jan 2021 00:06:44 +0300 Subject: [PATCH] check memsink clients --- src/libs/memsink.c | 25 +++++++++++++++++++++++-- src/libs/memsink.h | 3 ++- src/ustreamer/h264/stream.c | 4 ++++ src/ustreamer/stream.c | 4 +++- 4 files changed, 32 insertions(+), 4 deletions(-) diff --git a/src/libs/memsink.c b/src/libs/memsink.c index 6a8add1..282ae65 100644 --- a/src/libs/memsink.c +++ b/src/libs/memsink.c @@ -87,6 +87,27 @@ void memsink_destroy(memsink_s *sink) { free(sink); } +int memsink_server_check_clients(memsink_s *sink) { + assert(sink->server); + + if (flock(sink->fd, LOCK_EX | LOCK_NB) < 0) { + if (errno == EWOULDBLOCK) { + sink->has_clients = true; + return 0; + } + LOG_PERROR("%s-sink: Can't lock memory", sink->name); + return -1; + } + + sink->has_clients = (sink->mem->last_client_ts + 10 > get_now_monotonic()); + + if (flock(sink->fd, LOCK_UN) < 0) { + LOG_PERROR("%s-sink: Can't unlock memory", sink->name); + return -1; + } + return 0; +} + int memsink_server_put(memsink_s *sink, const frame_s *frame) { assert(sink->server); @@ -113,7 +134,7 @@ int memsink_server_put(memsink_s *sink, const frame_s *frame) { COPY(grab_ts); COPY(encode_begin_ts); COPY(encode_end_ts); - sink->has_clients = (sink->mem->last_consumed_ts + 10 > get_now_monotonic()); + sink->has_clients = (sink->mem->last_client_ts + 10 > get_now_monotonic()); memcpy(sink->mem->data, frame->data, frame->used); # undef COPY @@ -147,6 +168,7 @@ int memsink_client_get(memsink_s *sink, frame_s *frame) { // cppcheck-suppress u bool same = false; + sink->mem->last_client_ts = get_now_monotonic(); if (sink->mem->id == sink->last_id) { same = true; } else { @@ -160,7 +182,6 @@ int memsink_client_get(memsink_s *sink, frame_s *frame) { // cppcheck-suppress u COPY(grab_ts); COPY(encode_begin_ts); COPY(encode_end_ts); - sink->mem->last_consumed_ts = get_now_monotonic(); frame_set_data(frame, sink->mem->data, sink->mem->used); # undef COPY } diff --git a/src/libs/memsink.h b/src/libs/memsink.h index bbf606d..59cb1a9 100644 --- a/src/libs/memsink.h +++ b/src/libs/memsink.h @@ -56,7 +56,7 @@ typedef struct { long double grab_ts; long double encode_begin_ts; long double encode_end_ts; - long double last_consumed_ts; + long double last_client_ts; uint8_t data[MEMSINK_MAX_DATA]; } memsink_shared_s; @@ -77,5 +77,6 @@ typedef struct { memsink_s *memsink_init(const char *name, const char *obj, bool server, mode_t mode, bool rm, unsigned timeout); void memsink_destroy(memsink_s *sink); +int memsink_server_check_clients(memsink_s *sink); int memsink_server_put(memsink_s *sink, const frame_s *frame); int memsink_client_get(memsink_s *sink, frame_s *frame); diff --git a/src/ustreamer/h264/stream.c b/src/ustreamer/h264/stream.c index 11fd08a..6c0684f 100644 --- a/src/ustreamer/h264/stream.c +++ b/src/ustreamer/h264/stream.c @@ -53,6 +53,10 @@ void h264_stream_destroy(h264_stream_s *h264) { } void h264_stream_process(h264_stream_s *h264, const frame_s *frame, int vcsm_handle, bool force_key) { + if (memsink_server_check_clients(h264->sink) < 0 || !h264->sink->has_clients) { + return; + } + long double now = get_now_monotonic(); bool zero_copy = false; diff --git a/src/ustreamer/stream.c b/src/ustreamer/stream.c index 70ebdff..0ab7104 100644 --- a/src/ustreamer/stream.c +++ b/src/ustreamer/stream.c @@ -351,7 +351,9 @@ static bool _stream_expose_frame(stream_s *stream, frame_s *frame, unsigned capt A_MUTEX_UNLOCK(&VID(mutex)); if (changed && stream->sink) { - memsink_server_put(stream->sink, VID(frame)); + if (memsink_server_check_clients(stream->sink) == 0 && stream->sink->has_clients) { + memsink_server_put(stream->sink, VID(frame)); + } } return changed;