diff --git a/src/libs/memsink.c b/src/libs/memsink.c index f8c8dee..39bf7db 100644 --- a/src/libs/memsink.c +++ b/src/libs/memsink.c @@ -23,71 +23,44 @@ #include "memsink.h" -static int _sem_timedwait_monotonic(sem_t *sem, long double timeout); static int _flock_timedwait_monotonic(int fd, long double timeout); -memsink_s *memsink_init(const char *name, const char *prefix, bool server, mode_t mode, bool rm, unsigned timeout) { +memsink_s *memsink_init(const char *name, const char *obj, bool server, mode_t mode, bool rm, unsigned timeout) { memsink_s *sink; A_CALLOC(sink, 1); sink->name = name; + sink->obj = obj; sink->server = server; sink->rm = rm; sink->timeout = timeout; sink->fd = -1; sink->mem = MAP_FAILED; - sink->sig_sem = SEM_FAILED; - A_CALLOC(sink->mem_name, strlen(prefix) + 8); - A_CALLOC(sink->sig_name, strlen(prefix) + 8); + LOG_INFO("Using %s sink: %s", name, name); - sprintf(sink->mem_name, "%s.mem", prefix); - sprintf(sink->sig_name, "%s.sig", prefix); - - LOG_INFO("Using '%s' sink: %s.{mem,sig}", name, prefix); - - const int flags = (server ? O_RDWR | O_CREAT : O_RDWR); -# define OPEN_SIGNAL { \ - if ((sink->sig_sem = sem_open(sink->sig_name, flags, mode, 0)) == SEM_FAILED) { \ - LOG_PERROR("Can't open %s sink signal semaphore", name); \ - goto error; \ - } \ - } - - if (!server) { - OPEN_SIGNAL; + if ((sink->fd = shm_open(sink->obj, (server ? O_RDWR | O_CREAT : O_RDWR), mode)) == -1) { + LOG_PERROR("%s sink: Can't open shared memory", name); + goto error; } - { // Shared memory - if ((sink->fd = shm_open(sink->mem_name, flags, mode)) == -1) { - LOG_PERROR("Can't open %s sink memory", name); - goto error; - } - - if (sink->server && ftruncate(sink->fd, sizeof(memsink_shared_s)) < 0) { - LOG_PERROR("Can't truncate %s sink memory", name); - goto error; - } - - if ((sink->mem = mmap( - NULL, - sizeof(memsink_shared_s), - PROT_READ | PROT_WRITE, - MAP_SHARED, - sink->fd, - 0 - )) == MAP_FAILED) { - LOG_PERROR("Can't mmap %s sink memory", name); - goto error; - } + if (sink->server && ftruncate(sink->fd, sizeof(memsink_shared_s)) < 0) { + LOG_PERROR("%s sink: Can't truncate shared memory", name); + goto error; } - if (server) { - OPEN_SIGNAL; + if ((sink->mem = mmap( + NULL, + sizeof(memsink_shared_s), + PROT_READ | PROT_WRITE, + MAP_SHARED, + sink->fd, + 0 + )) == MAP_FAILED) { + LOG_PERROR("%s sink: Can't mmap shared memory", name); + goto error; } -# undef OPEN_SIGNAL - return sink; error: @@ -96,36 +69,21 @@ memsink_s *memsink_init(const char *name, const char *prefix, bool server, mode_ } void memsink_destroy(memsink_s *sink) { - if (sink->sig_sem != SEM_FAILED) { - if (sem_close(sink->sig_sem) < 0) { - LOG_PERROR("Can't close %s sink signal semaphore", sink->name); - } - if (sink->rm && sem_unlink(sink->sig_name) < 0) { - if (errno != ENOENT) { - LOG_PERROR("Can't remove %s sink signal semaphore", sink->name); - } - } - } - if (sink->mem != MAP_FAILED) { if (munmap(sink->mem, sizeof(memsink_shared_s)) < 0) { - LOG_PERROR("Can't unmap %s sink memory", sink->name); + LOG_PERROR("%s sink: Can't unmap shared memory", sink->name); } } - if (sink->fd >= 0) { if (close(sink->fd) < 0) { - LOG_PERROR("Can't close %s sink fd", sink->name); + LOG_PERROR("%s sink: Can't close shared memory fd", sink->name); } - if (sink->rm && shm_unlink(sink->mem_name) < 0) { + if (sink->rm && shm_unlink(sink->obj) < 0) { if (errno != ENOENT) { - LOG_PERROR("Can't remove %s sink memory", sink->name); + LOG_PERROR("%s sink: Can't remove shared memory", sink->name); } } } - - free(sink->sig_name); - free(sink->mem_name); free(sink); } @@ -143,12 +101,8 @@ int memsink_server_put(memsink_s *sink, const frame_s *frame) { if (_flock_timedwait_monotonic(sink->fd, 1) == 0) { LOG_VERBOSE("%s sink: >>>>> Exposing new frame ...", sink->name); - if (sem_trywait(sink->sig_sem) < 0 && errno != EAGAIN) { - LOG_PERROR("%s sink: Can't wait signal semaphore", sink->name); - return -1; - } - # define COPY(_field) sink->mem->_field = frame->_field + sink->mem->seq += 1; COPY(used); COPY(width); COPY(height); @@ -161,10 +115,6 @@ int memsink_server_put(memsink_s *sink, const frame_s *frame) { memcpy(sink->mem->data, frame->data, frame->used); # undef COPY - if (sem_post(sink->sig_sem) < 0) { - LOG_PERROR("%s sink: Can't post signal semaphore", sink->name); - return -1; - } if (flock(sink->fd, LOCK_UN) < 0) { LOG_PERROR("%s sink: Can't unlock memory", sink->name); return -1; @@ -185,13 +135,6 @@ int memsink_server_put(memsink_s *sink, const frame_s *frame) { int memsink_client_get(memsink_s *sink, frame_s *frame) { // cppcheck-suppress unusedFunction assert(!sink->server); // Client only - if (_sem_timedwait_monotonic(sink->sig_sem, sink->timeout) < 0) { - if (errno == EAGAIN) { - return -2; - } - LOG_PERROR("%s sink: Can't wait signal semaphore", sink->name); - return -1; - } if (_flock_timedwait_monotonic(sink->fd, sink->timeout) < 0) { if (errno == EWOULDBLOCK) { return -2; @@ -200,37 +143,35 @@ int memsink_client_get(memsink_s *sink, frame_s *frame) { // cppcheck-suppress u return -1; } -# define COPY(_field) frame->_field = sink->mem->_field - COPY(width); - COPY(height); - COPY(format); - COPY(stride); - COPY(online); - COPY(grab_ts); - COPY(encode_begin_ts); - COPY(encode_end_ts); - frame_set_data(frame, sink->mem->data, sink->mem->used); -# undef COPY + bool same = false; + + if (sink->client_seq == sink->mem->seq) { + same = true; + } else { +# define COPY(_field) frame->_field = sink->mem->_field + sink->client_seq = sink->mem->seq; + COPY(width); + COPY(height); + COPY(format); + COPY(stride); + COPY(online); + COPY(grab_ts); + COPY(encode_begin_ts); + COPY(encode_end_ts); + frame_set_data(frame, sink->mem->data, sink->mem->used); +# undef COPY + } if (flock(sink->fd, LOCK_UN) < 0) { LOG_PERROR("%s sink: Can't unlock memory", sink->name); return -1; } - return 0; -} -static int _sem_timedwait_monotonic(sem_t *sem, long double timeout) { - long double deadline_ts = get_now_monotonic() + timeout; - int retval = -1; - - while (true) { - retval = sem_trywait(sem); - if (retval == 0 || errno != EAGAIN || get_now_monotonic() > deadline_ts) { - break; - } + if (same) { usleep(1000); + return -2; } - return retval; + return 0; } static int _flock_timedwait_monotonic(int fd, long double timeout) { diff --git a/src/libs/memsink.h b/src/libs/memsink.h index 36d5c6f..2576c75 100644 --- a/src/libs/memsink.h +++ b/src/libs/memsink.h @@ -26,7 +26,6 @@ #include #include #include -#include #include #include @@ -47,6 +46,7 @@ typedef struct { + uint64_t seq; size_t used; unsigned width; unsigned height; @@ -61,20 +61,18 @@ typedef struct { typedef struct { const char *name; + const char *obj; bool server; bool rm; unsigned timeout; - char *mem_name; - char *sig_name; - int fd; memsink_shared_s *mem; - sem_t *sig_sem; + uint64_t client_seq; } memsink_s; -memsink_s *memsink_init(const char *name, const char *prefix, bool server, mode_t mode, bool rm, unsigned timeout); +memsink_s *memsink_init(const char *name, const char *obj, bool server, mode_t mode, bool rm, unsigned timeout); void memsink_destroy(memsink_s *sink); int memsink_server_put(memsink_s *sink, const frame_s *frame); diff --git a/src/ustreamer/options.c b/src/ustreamer/options.c index bade381..e739d21 100644 --- a/src/ustreamer/options.c +++ b/src/ustreamer/options.c @@ -482,7 +482,7 @@ int options_parse(options_s *options, device_s *dev, encoder_s *enc, stream_s *s # define ADD_SINK(_lower, _upper) { \ if (_lower##_sink_name && _lower##_sink_name[0] != '\0') { \ options->_lower##_sink = memsink_init( \ - #_lower, \ + #_upper, \ _lower##_sink_name, \ true, \ _lower##_sink_mode, \