using flock for rawsink

This commit is contained in:
Devaev Maxim
2020-12-11 22:03:25 +03:00
parent 0f92e73f56
commit 89467a0ef9
6 changed files with 111 additions and 94 deletions

View File

@@ -23,41 +23,39 @@
#include "rawsink.h"
static int _sem_wait_monotonic(sem_t *sem, long double timeout);
static int _sem_timedwait_monotonic(sem_t *sem, long double timeout);
static int _flock_timedwait_monotonic(int fd, long double timeout);
rawsink_s *rawsink_init(const char *name, mode_t mode, bool rm, bool server) {
rawsink_s *rawsink_init(const char *name, bool server, mode_t mode, bool rm, unsigned timeout) {
rawsink_s *rawsink;
int flags = (server ? O_RDWR | O_CREAT : O_RDWR);
A_CALLOC(rawsink, 1);
rawsink->fd = -1;
rawsink->shared = MAP_FAILED;
rawsink->signal_sem = SEM_FAILED;
rawsink->lock_sem = SEM_FAILED;
rawsink->rm = rm;
rawsink->server = server;
rawsink->rm = rm;
rawsink->timeout = timeout;
rawsink->fd = -1;
rawsink->mem = MAP_FAILED;
rawsink->sig_sem = SEM_FAILED;
A_CALLOC(rawsink->mem_name, strlen(name) + 8);
A_CALLOC(rawsink->signal_name, strlen(name) + 8);
A_CALLOC(rawsink->lock_name, strlen(name) + 8);
A_CALLOC(rawsink->sig_name, strlen(name) + 8);
sprintf(rawsink->mem_name, "%s.mem", name);
sprintf(rawsink->signal_name, "%s.sig", name);
sprintf(rawsink->lock_name, "%s.lock", name);
sprintf(rawsink->sig_name, "%s.sig", name);
LOG_INFO("Using RAW sink: %s.{mem,sig,lock}", name);
LOG_INFO("Using RAW sink: %s.{mem,sig}", name);
# define OPEN_SEM(_role, _default) { \
if ((rawsink->_role##_sem = sem_open(rawsink->_role##_name, flags, mode, _default)) == SEM_FAILED) { \
LOG_PERROR("Can't open RAW sink " #_role " semaphore"); \
# define OPEN_SIGNAL { \
if ((rawsink->sig_sem = sem_open(rawsink->sig_name, flags, mode, 0)) == SEM_FAILED) { \
LOG_PERROR("Can't open RAW sink signal semaphore"); \
goto error; \
} \
}
if (!server) {
OPEN_SEM(lock, 1);
OPEN_SEM(signal, 0);
OPEN_SIGNAL;
}
{ // Shared memory
@@ -66,12 +64,12 @@ rawsink_s *rawsink_init(const char *name, mode_t mode, bool rm, bool server) {
goto error;
}
if (ftruncate(rawsink->fd, sizeof(rawsink_shared_s)) < 0) {
if (rawsink->server && ftruncate(rawsink->fd, sizeof(rawsink_shared_s)) < 0) {
LOG_PERROR("Can't truncate RAW sink memory");
goto error;
}
if ((rawsink->shared = mmap(
if ((rawsink->mem = mmap(
NULL,
sizeof(rawsink_shared_s),
PROT_READ | PROT_WRITE,
@@ -85,11 +83,10 @@ rawsink_s *rawsink_init(const char *name, mode_t mode, bool rm, bool server) {
}
if (server) {
OPEN_SEM(signal, 0);
OPEN_SEM(lock, 1);
OPEN_SIGNAL;
}
# undef OPEN_SEM
# undef OPEN_SIGNAL
return rawsink;
@@ -99,26 +96,19 @@ rawsink_s *rawsink_init(const char *name, mode_t mode, bool rm, bool server) {
}
void rawsink_destroy(rawsink_s *rawsink) {
# define CLOSE_SEM(_role) { \
if (rawsink->_role##_sem != SEM_FAILED) { \
if (sem_close(rawsink->_role##_sem) < 0) { \
LOG_PERROR("Can't close RAW sink " #_role " semaphore"); \
} \
if (rawsink->rm && sem_unlink(rawsink->_role##_name) < 0) { \
if (errno != ENOENT) { \
LOG_PERROR("Can't remove RAW sink " #_role " semaphore"); \
} \
} \
} \
if (rawsink->sig_sem != SEM_FAILED) {
if (sem_close(rawsink->sig_sem) < 0) {
LOG_PERROR("Can't close RAW sink signal semaphore");
}
if (rawsink->rm && sem_unlink(rawsink->sig_name) < 0) {
if (errno != ENOENT) {
LOG_PERROR("Can't remove RAW sink signal semaphore");
}
}
}
CLOSE_SEM(lock);
CLOSE_SEM(signal);
# undef CLOSE_SEM
if (rawsink->shared != MAP_FAILED) {
if (munmap(rawsink->shared, sizeof(rawsink_shared_s)) < 0) {
if (rawsink->mem != MAP_FAILED) {
if (munmap(rawsink->mem, sizeof(rawsink_shared_s)) < 0) {
LOG_PERROR("Can't unmap RAW sink memory");
}
}
@@ -134,8 +124,7 @@ void rawsink_destroy(rawsink_s *rawsink) {
}
}
free(rawsink->lock_name);
free(rawsink->signal_name);
free(rawsink->sig_name);
free(rawsink->mem_name);
free(rawsink);
}
@@ -159,38 +148,38 @@ void rawsink_server_put(
return;
}
if (sem_trywait(rawsink->lock_sem) == 0) {
if (_flock_timedwait_monotonic(rawsink->fd, 1) == 0) {
LOG_PERF("RAWSINK: >>>>> Exposing new frame ...");
if (sem_trywait(rawsink->signal_sem) < 0 && errno != EAGAIN) {
LOG_PERROR("RAWSINK: Can't wait %s", rawsink->signal_name);
if (sem_trywait(rawsink->sig_sem) < 0 && errno != EAGAIN) {
LOG_PERROR("RAWSINK: Can't wait signal semaphore");
goto error;
}
# define COPY(_field) rawsink->shared->_field = _field
# define COPY(_field) rawsink->mem->_field = _field
COPY(format);
COPY(width);
COPY(height);
COPY(grab_ts);
COPY(size);
memcpy(rawsink->shared->data, data, size);
memcpy(rawsink->mem->data, data, size);
# undef COPY
if (sem_post(rawsink->signal_sem) < 0) {
LOG_PERROR("RAWSINK: Can't post %s", rawsink->signal_name);
if (sem_post(rawsink->sig_sem) < 0) {
LOG_PERROR("RAWSINK: Can't post signal semaphore");
goto error;
}
if (sem_post(rawsink->lock_sem) < 0) {
LOG_PERROR("RAWSINK: Can't post %s", rawsink->lock_name);
if (flock(rawsink->fd, LOCK_UN) < 0) {
LOG_PERROR("RAWSINK: Can't unlock memory");
goto error;
}
LOG_VERBOSE("RAWSINK: Exposed new frame; full exposition time = %Lf", get_now_monotonic() - now);
} else if (errno == EAGAIN) {
} else if (errno == EWOULDBLOCK) {
LOG_PERF("RAWSINK: ===== Shared memory is busy now; frame skipped");
} else {
LOG_PERROR("RAWSINK: Can't wait %s", rawsink->lock_name);
LOG_PERROR("RAWSINK: Can't lock memory");
goto error;
}
@@ -205,47 +194,62 @@ int rawsink_client_get(
rawsink_s *rawsink,
char *data, size_t *size,
unsigned *format, unsigned *width, unsigned *height,
long double *grab_ts,
long double timeout) {
long double *grab_ts) {
assert(!rawsink->server); // Client only
# define WAIT_SEM(_role) { \
if (_sem_wait_monotonic(rawsink->_role##_sem, timeout) < 0) { \
if (errno == EAGAIN) { \
return -2; \
} \
LOG_PERROR("RAWSRC: Can't wait %s", rawsink->_role##_name); \
return -1; \
} \
if (_sem_timedwait_monotonic(rawsink->sig_sem, rawsink->timeout) < 0) {
if (errno == EAGAIN) {
return -2;
}
LOG_PERROR("RAWSRC: Can't wait signal semaphore");
return -1;
}
if (_flock_timedwait_monotonic(rawsink->fd, rawsink->timeout) < 0) {
if (errno == EWOULDBLOCK) {
return -2;
}
LOG_PERROR("RAWSRC: Can't lock memory");
return -1;
}
WAIT_SEM(signal);
WAIT_SEM(lock);
# define COPY(_field) *_field = rawsink->shared->_field
# define COPY(_field) *_field = rawsink->mem->_field
COPY(format);
COPY(width);
COPY(height);
COPY(grab_ts);
COPY(size);
memcpy(data, rawsink->shared->data, *size);
memcpy(data, rawsink->mem->data, *size);
# undef COPY
if (sem_post(rawsink->lock_sem) < 0) {
LOG_PERROR("RAWSINK: Can't post %s", rawsink->lock_name);
if (flock(rawsink->fd, LOCK_UN) < 0) {
LOG_PERROR("RAWSRC: Can't unlock memory");
return -1;
}
return 0;
}
static int _sem_wait_monotonic(sem_t *sem, long double timeout) {
static int _sem_timedwait_monotonic(sem_t *sem, long double timeout) {
long double deadline_ts = get_now_monotonic() + timeout;
int retval = -1;
while (true) {
retval = sem_trywait(sem);
if (retval == 0 || retval != EAGAIN || get_now_monotonic() > deadline_ts) {
if (retval == 0 || errno != EAGAIN || get_now_monotonic() > deadline_ts) {
break;
}
usleep(1000);
}
return retval;
}
static int _flock_timedwait_monotonic(int fd, long double timeout) {
long double deadline_ts = get_now_monotonic() + timeout;
int retval = -1;
while (true) {
retval = flock(fd, LOCK_EX | LOCK_NB);
if (retval == 0 || errno != EWOULDBLOCK || get_now_monotonic() > deadline_ts) {
break;
}
usleep(1000);

View File

@@ -31,6 +31,7 @@
#include <assert.h>
#include <sys/types.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <sys/mman.h>
@@ -54,24 +55,22 @@ typedef struct {
} rawsink_shared_s;
typedef struct {
char *mem_name;
char *signal_name;
char *lock_name;
bool server;
bool rm;
unsigned timeout;
int fd;
rawsink_shared_s *shared;
char *mem_name;
char *sig_name;
sem_t *signal_sem;
sem_t *lock_sem;
int fd;
rawsink_shared_s *mem;
sem_t *sig_sem;
bool rm;
bool server;
bool server_failed;
bool server_failed;
} rawsink_s;
rawsink_s *rawsink_init(const char *name, mode_t mode, bool rm, bool server);
rawsink_s *rawsink_init(const char *name, bool server, mode_t mode, bool rm, unsigned timeout);
void rawsink_destroy(rawsink_s *rawsink);
void rawsink_server_put(
@@ -84,5 +83,4 @@ int rawsink_client_get(
rawsink_s *rawsink,
char *data, size_t *size,
unsigned *format, unsigned *width, unsigned *height,
long double *grab_ts,
long double timeout);
long double *grab_ts);

View File

@@ -86,6 +86,7 @@ enum _OPT_VALUES {
_O_RAWSINK,
_O_RAWSINK_MODE,
_O_RAWSINK_RM,
_O_RAWSINK_TIMEOUT,
#endif
#ifdef WITH_GPIO
@@ -170,6 +171,7 @@ static const struct option _LONG_OPTS[] = {
{"raw-sink", required_argument, NULL, _O_RAWSINK},
{"raw-sink-mode", required_argument, NULL, _O_RAWSINK_MODE},
{"raw-sink-rm", no_argument, NULL, _O_RAWSINK_RM},
{"raw-sink-timeout", required_argument, NULL, _O_RAWSINK_TIMEOUT},
#endif
#ifdef WITH_GPIO
@@ -400,9 +402,10 @@ int options_parse(options_s *options, device_s *dev, encoder_s *encoder, stream_
case _O_SERVER_TIMEOUT: OPT_NUMBER("--server-timeout", server->timeout, 1, 60, 0);
# ifdef WITH_RAWSINK
case _O_RAWSINK: OPT_SET(stream->rawsink_name, optarg);
case _O_RAWSINK_MODE: OPT_NUMBER("--raw-sink-mode", stream->rawsink_mode, INT_MIN, INT_MAX, 8);
case _O_RAWSINK_RM: OPT_SET(stream->rawsink_rm, true);
case _O_RAWSINK: OPT_SET(stream->rawsink_name, optarg);
case _O_RAWSINK_MODE: OPT_NUMBER("--raw-sink-mode", stream->rawsink_mode, INT_MIN, INT_MAX, 8);
case _O_RAWSINK_RM: OPT_SET(stream->rawsink_rm, true);
case _O_RAWSINK_TIMEOUT: OPT_NUMBER("--raw-sink-timeout", server->timeout, 1, 60, 0);
# endif
# ifdef WITH_GPIO
@@ -666,10 +669,11 @@ static void _help(device_s *dev, encoder_s *encoder, stream_s *stream, server_s
#ifdef WITH_RAWSINK
printf("RAW sink options:\n");
printf("═════════════════\n");
printf(" --raw-sink <name> ────── Use the shared memory to sink RAW frames before encoding.\n");
printf(" Most likely you will never need it. Default: disabled.\n\n");
printf(" --raw-sink-mode <mode> ─ Set RAW sink permissions (like 777). Default: %o.\n\n", stream->rawsink_mode);
printf(" --raw-sink-rm ────────── Remove shared memory on stop. Default: disabled.\n\n");
printf(" --raw-sink <name> ──────── Use the shared memory to sink RAW frames before encoding.\n");
printf(" Most likely you will never need it. Default: disabled.\n\n");
printf(" --raw-sink-mode <mode> ─── Set RAW sink permissions (like 777). Default: %o.\n\n", stream->rawsink_mode);
printf(" --raw-sink-rm ──────────── Remove shared memory on stop. Default: disabled.\n\n");
printf(" --raw-sink-timeout <sec> ─ Timeout for lock. Default: %u.\n\n", stream->rawsink_timeout);
#endif
#ifdef WITH_GPIO
printf("GPIO options:\n");

View File

@@ -103,6 +103,7 @@ stream_s *stream_init(device_s *dev, encoder_s *encoder) {
# ifdef WITH_RAWSINK
stream->rawsink_name = "";
stream->rawsink_mode = 0660;
stream->rawsink_timeout = 1;
# endif
stream->proc = proc;
stream->video = video;
@@ -129,7 +130,13 @@ void stream_loop(stream_s *stream) {
# ifdef WITH_RAWSINK
rawsink_s *rawsink = NULL;
if (stream->rawsink_name[0] != '\0') {
rawsink = rawsink_init(stream->rawsink_name, stream->rawsink_mode, stream->rawsink_rm, true);
rawsink = rawsink_init(
stream->rawsink_name,
true,
stream->rawsink_mode,
stream->rawsink_rm,
stream->rawsink_timeout
);
}
# endif

View File

@@ -69,6 +69,7 @@ typedef struct {
char *rawsink_name;
mode_t rawsink_mode;
bool rawsink_rm;
unsigned rawsink_timeout;
# endif
device_s *dev;