From 89467a0ef9e7b63a8dd61dd95fae08423acc185a Mon Sep 17 00:00:00 2001 From: Devaev Maxim Date: Fri, 11 Dec 2020 22:03:25 +0300 Subject: [PATCH] using flock for rawsink --- src/rawsink/rawsink.c | 148 +++++++++++++++++++++------------------- src/rawsink/rawsink.h | 26 ++++--- src/ustreamer/options.c | 18 +++-- src/ustreamer/stream.c | 9 ++- src/ustreamer/stream.h | 1 + ustreamer.1 | 3 + 6 files changed, 111 insertions(+), 94 deletions(-) diff --git a/src/rawsink/rawsink.c b/src/rawsink/rawsink.c index f706d22..d54c3e0 100644 --- a/src/rawsink/rawsink.c +++ b/src/rawsink/rawsink.c @@ -23,41 +23,39 @@ #include "rawsink.h" -static int _sem_wait_monotonic(sem_t *sem, long double timeout); +static int _sem_timedwait_monotonic(sem_t *sem, long double timeout); +static int _flock_timedwait_monotonic(int fd, long double timeout); -rawsink_s *rawsink_init(const char *name, mode_t mode, bool rm, bool server) { +rawsink_s *rawsink_init(const char *name, bool server, mode_t mode, bool rm, unsigned timeout) { rawsink_s *rawsink; int flags = (server ? O_RDWR | O_CREAT : O_RDWR); A_CALLOC(rawsink, 1); - rawsink->fd = -1; - rawsink->shared = MAP_FAILED; - rawsink->signal_sem = SEM_FAILED; - rawsink->lock_sem = SEM_FAILED; - rawsink->rm = rm; rawsink->server = server; + rawsink->rm = rm; + rawsink->timeout = timeout; + rawsink->fd = -1; + rawsink->mem = MAP_FAILED; + rawsink->sig_sem = SEM_FAILED; A_CALLOC(rawsink->mem_name, strlen(name) + 8); - A_CALLOC(rawsink->signal_name, strlen(name) + 8); - A_CALLOC(rawsink->lock_name, strlen(name) + 8); + A_CALLOC(rawsink->sig_name, strlen(name) + 8); sprintf(rawsink->mem_name, "%s.mem", name); - sprintf(rawsink->signal_name, "%s.sig", name); - sprintf(rawsink->lock_name, "%s.lock", name); + sprintf(rawsink->sig_name, "%s.sig", name); - LOG_INFO("Using RAW sink: %s.{mem,sig,lock}", name); + LOG_INFO("Using RAW sink: %s.{mem,sig}", name); -# define OPEN_SEM(_role, _default) { \ - if ((rawsink->_role##_sem = sem_open(rawsink->_role##_name, flags, mode, _default)) == SEM_FAILED) { \ - LOG_PERROR("Can't open RAW sink " #_role " semaphore"); \ +# define OPEN_SIGNAL { \ + if ((rawsink->sig_sem = sem_open(rawsink->sig_name, flags, mode, 0)) == SEM_FAILED) { \ + LOG_PERROR("Can't open RAW sink signal semaphore"); \ goto error; \ } \ } if (!server) { - OPEN_SEM(lock, 1); - OPEN_SEM(signal, 0); + OPEN_SIGNAL; } { // Shared memory @@ -66,12 +64,12 @@ rawsink_s *rawsink_init(const char *name, mode_t mode, bool rm, bool server) { goto error; } - if (ftruncate(rawsink->fd, sizeof(rawsink_shared_s)) < 0) { + if (rawsink->server && ftruncate(rawsink->fd, sizeof(rawsink_shared_s)) < 0) { LOG_PERROR("Can't truncate RAW sink memory"); goto error; } - if ((rawsink->shared = mmap( + if ((rawsink->mem = mmap( NULL, sizeof(rawsink_shared_s), PROT_READ | PROT_WRITE, @@ -85,11 +83,10 @@ rawsink_s *rawsink_init(const char *name, mode_t mode, bool rm, bool server) { } if (server) { - OPEN_SEM(signal, 0); - OPEN_SEM(lock, 1); + OPEN_SIGNAL; } -# undef OPEN_SEM +# undef OPEN_SIGNAL return rawsink; @@ -99,26 +96,19 @@ rawsink_s *rawsink_init(const char *name, mode_t mode, bool rm, bool server) { } void rawsink_destroy(rawsink_s *rawsink) { -# define CLOSE_SEM(_role) { \ - if (rawsink->_role##_sem != SEM_FAILED) { \ - if (sem_close(rawsink->_role##_sem) < 0) { \ - LOG_PERROR("Can't close RAW sink " #_role " semaphore"); \ - } \ - if (rawsink->rm && sem_unlink(rawsink->_role##_name) < 0) { \ - if (errno != ENOENT) { \ - LOG_PERROR("Can't remove RAW sink " #_role " semaphore"); \ - } \ - } \ - } \ + if (rawsink->sig_sem != SEM_FAILED) { + if (sem_close(rawsink->sig_sem) < 0) { + LOG_PERROR("Can't close RAW sink signal semaphore"); } + if (rawsink->rm && sem_unlink(rawsink->sig_name) < 0) { + if (errno != ENOENT) { + LOG_PERROR("Can't remove RAW sink signal semaphore"); + } + } + } - CLOSE_SEM(lock); - CLOSE_SEM(signal); - -# undef CLOSE_SEM - - if (rawsink->shared != MAP_FAILED) { - if (munmap(rawsink->shared, sizeof(rawsink_shared_s)) < 0) { + if (rawsink->mem != MAP_FAILED) { + if (munmap(rawsink->mem, sizeof(rawsink_shared_s)) < 0) { LOG_PERROR("Can't unmap RAW sink memory"); } } @@ -134,8 +124,7 @@ void rawsink_destroy(rawsink_s *rawsink) { } } - free(rawsink->lock_name); - free(rawsink->signal_name); + free(rawsink->sig_name); free(rawsink->mem_name); free(rawsink); } @@ -159,38 +148,38 @@ void rawsink_server_put( return; } - if (sem_trywait(rawsink->lock_sem) == 0) { + if (_flock_timedwait_monotonic(rawsink->fd, 1) == 0) { LOG_PERF("RAWSINK: >>>>> Exposing new frame ..."); - if (sem_trywait(rawsink->signal_sem) < 0 && errno != EAGAIN) { - LOG_PERROR("RAWSINK: Can't wait %s", rawsink->signal_name); + if (sem_trywait(rawsink->sig_sem) < 0 && errno != EAGAIN) { + LOG_PERROR("RAWSINK: Can't wait signal semaphore"); goto error; } -# define COPY(_field) rawsink->shared->_field = _field +# define COPY(_field) rawsink->mem->_field = _field COPY(format); COPY(width); COPY(height); COPY(grab_ts); COPY(size); - memcpy(rawsink->shared->data, data, size); + memcpy(rawsink->mem->data, data, size); # undef COPY - if (sem_post(rawsink->signal_sem) < 0) { - LOG_PERROR("RAWSINK: Can't post %s", rawsink->signal_name); + if (sem_post(rawsink->sig_sem) < 0) { + LOG_PERROR("RAWSINK: Can't post signal semaphore"); goto error; } - if (sem_post(rawsink->lock_sem) < 0) { - LOG_PERROR("RAWSINK: Can't post %s", rawsink->lock_name); + if (flock(rawsink->fd, LOCK_UN) < 0) { + LOG_PERROR("RAWSINK: Can't unlock memory"); goto error; } LOG_VERBOSE("RAWSINK: Exposed new frame; full exposition time = %Lf", get_now_monotonic() - now); - } else if (errno == EAGAIN) { + } else if (errno == EWOULDBLOCK) { LOG_PERF("RAWSINK: ===== Shared memory is busy now; frame skipped"); } else { - LOG_PERROR("RAWSINK: Can't wait %s", rawsink->lock_name); + LOG_PERROR("RAWSINK: Can't lock memory"); goto error; } @@ -205,47 +194,62 @@ int rawsink_client_get( rawsink_s *rawsink, char *data, size_t *size, unsigned *format, unsigned *width, unsigned *height, - long double *grab_ts, - long double timeout) { + long double *grab_ts) { assert(!rawsink->server); // Client only -# define WAIT_SEM(_role) { \ - if (_sem_wait_monotonic(rawsink->_role##_sem, timeout) < 0) { \ - if (errno == EAGAIN) { \ - return -2; \ - } \ - LOG_PERROR("RAWSRC: Can't wait %s", rawsink->_role##_name); \ - return -1; \ - } \ + if (_sem_timedwait_monotonic(rawsink->sig_sem, rawsink->timeout) < 0) { + if (errno == EAGAIN) { + return -2; } + LOG_PERROR("RAWSRC: Can't wait signal semaphore"); + return -1; + } + if (_flock_timedwait_monotonic(rawsink->fd, rawsink->timeout) < 0) { + if (errno == EWOULDBLOCK) { + return -2; + } + LOG_PERROR("RAWSRC: Can't lock memory"); + return -1; + } - WAIT_SEM(signal); - WAIT_SEM(lock); - -# define COPY(_field) *_field = rawsink->shared->_field +# define COPY(_field) *_field = rawsink->mem->_field COPY(format); COPY(width); COPY(height); COPY(grab_ts); COPY(size); - memcpy(data, rawsink->shared->data, *size); + memcpy(data, rawsink->mem->data, *size); # undef COPY - if (sem_post(rawsink->lock_sem) < 0) { - LOG_PERROR("RAWSINK: Can't post %s", rawsink->lock_name); + if (flock(rawsink->fd, LOCK_UN) < 0) { + LOG_PERROR("RAWSRC: Can't unlock memory"); return -1; } return 0; } -static int _sem_wait_monotonic(sem_t *sem, long double timeout) { +static int _sem_timedwait_monotonic(sem_t *sem, long double timeout) { long double deadline_ts = get_now_monotonic() + timeout; int retval = -1; while (true) { retval = sem_trywait(sem); - if (retval == 0 || retval != EAGAIN || get_now_monotonic() > deadline_ts) { + if (retval == 0 || errno != EAGAIN || get_now_monotonic() > deadline_ts) { + break; + } + usleep(1000); + } + return retval; +} + +static int _flock_timedwait_monotonic(int fd, long double timeout) { + long double deadline_ts = get_now_monotonic() + timeout; + int retval = -1; + + while (true) { + retval = flock(fd, LOCK_EX | LOCK_NB); + if (retval == 0 || errno != EWOULDBLOCK || get_now_monotonic() > deadline_ts) { break; } usleep(1000); diff --git a/src/rawsink/rawsink.h b/src/rawsink/rawsink.h index 9c461b8..8b7e4b7 100644 --- a/src/rawsink/rawsink.h +++ b/src/rawsink/rawsink.h @@ -31,6 +31,7 @@ #include #include +#include #include #include @@ -54,24 +55,22 @@ typedef struct { } rawsink_shared_s; typedef struct { - char *mem_name; - char *signal_name; - char *lock_name; + bool server; + bool rm; + unsigned timeout; - int fd; - rawsink_shared_s *shared; + char *mem_name; + char *sig_name; - sem_t *signal_sem; - sem_t *lock_sem; + int fd; + rawsink_shared_s *mem; + sem_t *sig_sem; - bool rm; - bool server; - - bool server_failed; + bool server_failed; } rawsink_s; -rawsink_s *rawsink_init(const char *name, mode_t mode, bool rm, bool server); +rawsink_s *rawsink_init(const char *name, bool server, mode_t mode, bool rm, unsigned timeout); void rawsink_destroy(rawsink_s *rawsink); void rawsink_server_put( @@ -84,5 +83,4 @@ int rawsink_client_get( rawsink_s *rawsink, char *data, size_t *size, unsigned *format, unsigned *width, unsigned *height, - long double *grab_ts, - long double timeout); + long double *grab_ts); diff --git a/src/ustreamer/options.c b/src/ustreamer/options.c index 7f1a3b6..53443dd 100644 --- a/src/ustreamer/options.c +++ b/src/ustreamer/options.c @@ -86,6 +86,7 @@ enum _OPT_VALUES { _O_RAWSINK, _O_RAWSINK_MODE, _O_RAWSINK_RM, + _O_RAWSINK_TIMEOUT, #endif #ifdef WITH_GPIO @@ -170,6 +171,7 @@ static const struct option _LONG_OPTS[] = { {"raw-sink", required_argument, NULL, _O_RAWSINK}, {"raw-sink-mode", required_argument, NULL, _O_RAWSINK_MODE}, {"raw-sink-rm", no_argument, NULL, _O_RAWSINK_RM}, + {"raw-sink-timeout", required_argument, NULL, _O_RAWSINK_TIMEOUT}, #endif #ifdef WITH_GPIO @@ -400,9 +402,10 @@ int options_parse(options_s *options, device_s *dev, encoder_s *encoder, stream_ case _O_SERVER_TIMEOUT: OPT_NUMBER("--server-timeout", server->timeout, 1, 60, 0); # ifdef WITH_RAWSINK - case _O_RAWSINK: OPT_SET(stream->rawsink_name, optarg); - case _O_RAWSINK_MODE: OPT_NUMBER("--raw-sink-mode", stream->rawsink_mode, INT_MIN, INT_MAX, 8); - case _O_RAWSINK_RM: OPT_SET(stream->rawsink_rm, true); + case _O_RAWSINK: OPT_SET(stream->rawsink_name, optarg); + case _O_RAWSINK_MODE: OPT_NUMBER("--raw-sink-mode", stream->rawsink_mode, INT_MIN, INT_MAX, 8); + case _O_RAWSINK_RM: OPT_SET(stream->rawsink_rm, true); + case _O_RAWSINK_TIMEOUT: OPT_NUMBER("--raw-sink-timeout", server->timeout, 1, 60, 0); # endif # ifdef WITH_GPIO @@ -666,10 +669,11 @@ static void _help(device_s *dev, encoder_s *encoder, stream_s *stream, server_s #ifdef WITH_RAWSINK printf("RAW sink options:\n"); printf("═════════════════\n"); - printf(" --raw-sink ────── Use the shared memory to sink RAW frames before encoding.\n"); - printf(" Most likely you will never need it. Default: disabled.\n\n"); - printf(" --raw-sink-mode ─ Set RAW sink permissions (like 777). Default: %o.\n\n", stream->rawsink_mode); - printf(" --raw-sink-rm ────────── Remove shared memory on stop. Default: disabled.\n\n"); + printf(" --raw-sink ──────── Use the shared memory to sink RAW frames before encoding.\n"); + printf(" Most likely you will never need it. Default: disabled.\n\n"); + printf(" --raw-sink-mode ─── Set RAW sink permissions (like 777). Default: %o.\n\n", stream->rawsink_mode); + printf(" --raw-sink-rm ──────────── Remove shared memory on stop. Default: disabled.\n\n"); + printf(" --raw-sink-timeout ─ Timeout for lock. Default: %u.\n\n", stream->rawsink_timeout); #endif #ifdef WITH_GPIO printf("GPIO options:\n"); diff --git a/src/ustreamer/stream.c b/src/ustreamer/stream.c index 97220c0..681aa19 100644 --- a/src/ustreamer/stream.c +++ b/src/ustreamer/stream.c @@ -103,6 +103,7 @@ stream_s *stream_init(device_s *dev, encoder_s *encoder) { # ifdef WITH_RAWSINK stream->rawsink_name = ""; stream->rawsink_mode = 0660; + stream->rawsink_timeout = 1; # endif stream->proc = proc; stream->video = video; @@ -129,7 +130,13 @@ void stream_loop(stream_s *stream) { # ifdef WITH_RAWSINK rawsink_s *rawsink = NULL; if (stream->rawsink_name[0] != '\0') { - rawsink = rawsink_init(stream->rawsink_name, stream->rawsink_mode, stream->rawsink_rm, true); + rawsink = rawsink_init( + stream->rawsink_name, + true, + stream->rawsink_mode, + stream->rawsink_rm, + stream->rawsink_timeout + ); } # endif diff --git a/src/ustreamer/stream.h b/src/ustreamer/stream.h index 138e218..c642387 100644 --- a/src/ustreamer/stream.h +++ b/src/ustreamer/stream.h @@ -69,6 +69,7 @@ typedef struct { char *rawsink_name; mode_t rawsink_mode; bool rawsink_rm; + unsigned rawsink_timeout; # endif device_s *dev; diff --git a/ustreamer.1 b/ustreamer.1 index 658ae09..4a53db1 100644 --- a/ustreamer.1 +++ b/ustreamer.1 @@ -208,6 +208,9 @@ Set RAW sink permissions (like 777). Default: 660. .TP .BR \-\-raw\-sink\-rm Remove shared memory on stop. Default: disabled. +.TP +.BR \-\-raw\-sink\-timeout\ \fIsec +Timeout for lock. Default: 1. .SS "Process options" .BR \-\-exit\-on\-parent\-death