From 5e18ce3806563a1b8adfcc8c9d8ea5bdcd2e4949 Mon Sep 17 00:00:00 2001 From: Devaev Maxim Date: Mon, 7 Dec 2020 20:31:04 +0300 Subject: [PATCH] rawsink slave --- src/rawsink/rawsink.c | 95 +++++++++++++++++++++++++++++++++++------- src/rawsink/rawsink.h | 15 +++++-- src/ustreamer/stream.c | 2 +- 3 files changed, 94 insertions(+), 18 deletions(-) diff --git a/src/rawsink/rawsink.c b/src/rawsink/rawsink.c index 246f288..ee254f5 100644 --- a/src/rawsink/rawsink.c +++ b/src/rawsink/rawsink.c @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -36,8 +37,12 @@ #include "../common/logging.h" -struct rawsink_t *rawsink_init(const char *name, mode_t mode, bool rm) { +static int _sem_wait_monotonic(sem_t *sem, long double timeout); + + +struct rawsink_t *rawsink_init(const char *name, mode_t mode, bool rm, bool master) { struct rawsink_t *rawsink; + int flags = (master ? O_RDWR | O_CREAT : O_RDWR); A_CALLOC(rawsink, 1); rawsink->fd = -1; @@ -45,6 +50,7 @@ struct rawsink_t *rawsink_init(const char *name, mode_t mode, bool rm) { rawsink->signal_sem = SEM_FAILED; rawsink->lock_sem = SEM_FAILED; rawsink->rm = rm; + rawsink->master = master; A_CALLOC(rawsink->mem_name, strlen(name) + 8); A_CALLOC(rawsink->signal_name, strlen(name) + 8); @@ -56,8 +62,20 @@ struct rawsink_t *rawsink_init(const char *name, mode_t mode, bool rm) { LOG_INFO("Using RAW sink: %s.{mem,sig,lock}", name); +# define OPEN_SEM(_role, _default) { \ + if ((rawsink->_role##_sem = sem_open(rawsink->_role##_name, flags, mode, _default)) == SEM_FAILED) { \ + LOG_PERROR("Can't open RAW sink " #_role " semaphore"); \ + goto error; \ + } \ + } + + if (!master) { + OPEN_SEM(lock, 1); + OPEN_SEM(signal, 0); + } + { // Shared memory - if ((rawsink->fd = shm_open(rawsink->mem_name, O_RDWR | O_CREAT, mode)) == -1) { + if ((rawsink->fd = shm_open(rawsink->mem_name, flags, mode)) == -1) { LOG_PERROR("Can't open RAW sink memory"); goto error; } @@ -80,15 +98,10 @@ struct rawsink_t *rawsink_init(const char *name, mode_t mode, bool rm) { } } -# define OPEN_SEM(_role, _default) { \ - if ((rawsink->_role##_sem = sem_open(rawsink->_role##_name, O_RDWR | O_CREAT, mode, _default)) == SEM_FAILED) { \ - LOG_PERROR("Can't open RAW sink " #_role " semaphore"); \ - goto error; \ - } \ - } - - OPEN_SEM(signal, 0); - OPEN_SEM(lock, 1); + if (master) { + OPEN_SEM(signal, 0); + OPEN_SEM(lock, 1); + } # undef OPEN_SEM @@ -149,7 +162,9 @@ void rawsink_put( long double now = get_now_monotonic(); - if (rawsink->failed) { + assert(rawsink->master); // Master only + + if (rawsink->master_failed) { return; } @@ -171,7 +186,7 @@ void rawsink_put( PIC(width) = width; PIC(height) = height; PIC(grab_ts) = grab_ts; - PIC(used) = size; + PIC(size) = size; memcpy(PIC(data), data, size); # undef PIC @@ -197,5 +212,57 @@ void rawsink_put( error: LOG_ERROR("RAW sink completely disabled due error"); - rawsink->failed = true; + rawsink->master_failed = true; +} + +int rawsink_get( + struct rawsink_t *rawsink, + char *data, size_t *size, + unsigned *format, unsigned *width, unsigned *height, + long double *grab_ts, + long double timeout) { + + assert(!rawsink->master); // Slave only + +# define WAIT_SEM(_role) { \ + if (_sem_wait_monotonic(rawsink->_role##_sem, timeout) < 0) { \ + if (errno == EAGAIN) { \ + return -2; \ + } \ + LOG_PERROR("RAWSRC: Can't wait %s", rawsink->_role##_name); \ + return -1; \ + } \ + } + + WAIT_SEM(signal); + WAIT_SEM(lock); + +# define PIC(_next) rawsink->picture->_next + *format = PIC(format); + *width = PIC(width); + *height = PIC(height); + *grab_ts = PIC(grab_ts); + *size = PIC(size); + memcpy(data, PIC(data), *size); +# undef PIC + + if (sem_post(rawsink->lock_sem) < 0) { + LOG_PERROR("RAWSINK: Can't post %s", rawsink->lock_name); + return -1; + } + return 0; +} + +static int _sem_wait_monotonic(sem_t *sem, long double timeout) { + long double deadline_ts = get_now_monotonic() + timeout; + int retval = -1; + + while (true) { + retval = sem_trywait(sem); + if (retval == 0 || retval != EAGAIN || get_now_monotonic() > deadline_ts) { + break; + } + usleep(1000); + } + return retval; } diff --git a/src/rawsink/rawsink.h b/src/rawsink/rawsink.h index faa685e..1476e2d 100644 --- a/src/rawsink/rawsink.h +++ b/src/rawsink/rawsink.h @@ -40,7 +40,7 @@ struct rawsink_picture_t { unsigned width; unsigned height; long double grab_ts; - size_t used; + size_t size; unsigned char data[RAWSINK_MAX_DATA]; }; @@ -56,11 +56,13 @@ struct rawsink_t { sem_t *lock_sem; bool rm; - bool failed; + bool master; + + bool master_failed; }; -struct rawsink_t *rawsink_init(const char *name, mode_t mode, bool rm); +struct rawsink_t *rawsink_init(const char *name, mode_t mode, bool rm, bool master); void rawsink_destroy(struct rawsink_t *rawsink); void rawsink_put( @@ -68,3 +70,10 @@ void rawsink_put( const unsigned char *data, size_t size, unsigned format, unsigned witdh, unsigned height, long double grab_ts); + +int rawsink_get( + struct rawsink_t *rawsink, + char *data, size_t *size, + unsigned *format, unsigned *width, unsigned *height, + long double *grab_ts, + long double timeout); diff --git a/src/ustreamer/stream.c b/src/ustreamer/stream.c index b2bc4f5..fdba25b 100644 --- a/src/ustreamer/stream.c +++ b/src/ustreamer/stream.c @@ -140,7 +140,7 @@ void stream_loop(struct stream_t *stream) { # ifdef WITH_RAWSINK struct rawsink_t *rawsink = NULL; if (DEV(rawsink_name[0]) != '\0') { - rawsink = rawsink_init(DEV(rawsink_name), DEV(rawsink_mode), DEV(rawsink_rm)); + rawsink = rawsink_init(DEV(rawsink_name), DEV(rawsink_mode), DEV(rawsink_rm), true); } # endif