mirror of
https://github.com/pikvm/ustreamer.git
synced 2026-03-12 02:23:43 +00:00
simultaneous access to memsink
This commit is contained in:
@@ -23,71 +23,44 @@
|
|||||||
#include "memsink.h"
|
#include "memsink.h"
|
||||||
|
|
||||||
|
|
||||||
static int _sem_timedwait_monotonic(sem_t *sem, long double timeout);
|
|
||||||
static int _flock_timedwait_monotonic(int fd, long double timeout);
|
static int _flock_timedwait_monotonic(int fd, long double timeout);
|
||||||
|
|
||||||
|
|
||||||
memsink_s *memsink_init(const char *name, const char *prefix, bool server, mode_t mode, bool rm, unsigned timeout) {
|
memsink_s *memsink_init(const char *name, const char *obj, bool server, mode_t mode, bool rm, unsigned timeout) {
|
||||||
memsink_s *sink;
|
memsink_s *sink;
|
||||||
A_CALLOC(sink, 1);
|
A_CALLOC(sink, 1);
|
||||||
sink->name = name;
|
sink->name = name;
|
||||||
|
sink->obj = obj;
|
||||||
sink->server = server;
|
sink->server = server;
|
||||||
sink->rm = rm;
|
sink->rm = rm;
|
||||||
sink->timeout = timeout;
|
sink->timeout = timeout;
|
||||||
sink->fd = -1;
|
sink->fd = -1;
|
||||||
sink->mem = MAP_FAILED;
|
sink->mem = MAP_FAILED;
|
||||||
sink->sig_sem = SEM_FAILED;
|
|
||||||
|
|
||||||
A_CALLOC(sink->mem_name, strlen(prefix) + 8);
|
LOG_INFO("Using %s sink: %s", name, name);
|
||||||
A_CALLOC(sink->sig_name, strlen(prefix) + 8);
|
|
||||||
|
|
||||||
sprintf(sink->mem_name, "%s.mem", prefix);
|
if ((sink->fd = shm_open(sink->obj, (server ? O_RDWR | O_CREAT : O_RDWR), mode)) == -1) {
|
||||||
sprintf(sink->sig_name, "%s.sig", prefix);
|
LOG_PERROR("%s sink: Can't open shared memory", name);
|
||||||
|
goto error;
|
||||||
LOG_INFO("Using '%s' sink: %s.{mem,sig}", name, prefix);
|
|
||||||
|
|
||||||
const int flags = (server ? O_RDWR | O_CREAT : O_RDWR);
|
|
||||||
# define OPEN_SIGNAL { \
|
|
||||||
if ((sink->sig_sem = sem_open(sink->sig_name, flags, mode, 0)) == SEM_FAILED) { \
|
|
||||||
LOG_PERROR("Can't open %s sink signal semaphore", name); \
|
|
||||||
goto error; \
|
|
||||||
} \
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!server) {
|
|
||||||
OPEN_SIGNAL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
{ // Shared memory
|
if (sink->server && ftruncate(sink->fd, sizeof(memsink_shared_s)) < 0) {
|
||||||
if ((sink->fd = shm_open(sink->mem_name, flags, mode)) == -1) {
|
LOG_PERROR("%s sink: Can't truncate shared memory", name);
|
||||||
LOG_PERROR("Can't open %s sink memory", name);
|
goto error;
|
||||||
goto error;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sink->server && ftruncate(sink->fd, sizeof(memsink_shared_s)) < 0) {
|
|
||||||
LOG_PERROR("Can't truncate %s sink memory", name);
|
|
||||||
goto error;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((sink->mem = mmap(
|
|
||||||
NULL,
|
|
||||||
sizeof(memsink_shared_s),
|
|
||||||
PROT_READ | PROT_WRITE,
|
|
||||||
MAP_SHARED,
|
|
||||||
sink->fd,
|
|
||||||
0
|
|
||||||
)) == MAP_FAILED) {
|
|
||||||
LOG_PERROR("Can't mmap %s sink memory", name);
|
|
||||||
goto error;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (server) {
|
if ((sink->mem = mmap(
|
||||||
OPEN_SIGNAL;
|
NULL,
|
||||||
|
sizeof(memsink_shared_s),
|
||||||
|
PROT_READ | PROT_WRITE,
|
||||||
|
MAP_SHARED,
|
||||||
|
sink->fd,
|
||||||
|
0
|
||||||
|
)) == MAP_FAILED) {
|
||||||
|
LOG_PERROR("%s sink: Can't mmap shared memory", name);
|
||||||
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
# undef OPEN_SIGNAL
|
|
||||||
|
|
||||||
return sink;
|
return sink;
|
||||||
|
|
||||||
error:
|
error:
|
||||||
@@ -96,36 +69,21 @@ memsink_s *memsink_init(const char *name, const char *prefix, bool server, mode_
|
|||||||
}
|
}
|
||||||
|
|
||||||
void memsink_destroy(memsink_s *sink) {
|
void memsink_destroy(memsink_s *sink) {
|
||||||
if (sink->sig_sem != SEM_FAILED) {
|
|
||||||
if (sem_close(sink->sig_sem) < 0) {
|
|
||||||
LOG_PERROR("Can't close %s sink signal semaphore", sink->name);
|
|
||||||
}
|
|
||||||
if (sink->rm && sem_unlink(sink->sig_name) < 0) {
|
|
||||||
if (errno != ENOENT) {
|
|
||||||
LOG_PERROR("Can't remove %s sink signal semaphore", sink->name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sink->mem != MAP_FAILED) {
|
if (sink->mem != MAP_FAILED) {
|
||||||
if (munmap(sink->mem, sizeof(memsink_shared_s)) < 0) {
|
if (munmap(sink->mem, sizeof(memsink_shared_s)) < 0) {
|
||||||
LOG_PERROR("Can't unmap %s sink memory", sink->name);
|
LOG_PERROR("%s sink: Can't unmap shared memory", sink->name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sink->fd >= 0) {
|
if (sink->fd >= 0) {
|
||||||
if (close(sink->fd) < 0) {
|
if (close(sink->fd) < 0) {
|
||||||
LOG_PERROR("Can't close %s sink fd", sink->name);
|
LOG_PERROR("%s sink: Can't close shared memory fd", sink->name);
|
||||||
}
|
}
|
||||||
if (sink->rm && shm_unlink(sink->mem_name) < 0) {
|
if (sink->rm && shm_unlink(sink->obj) < 0) {
|
||||||
if (errno != ENOENT) {
|
if (errno != ENOENT) {
|
||||||
LOG_PERROR("Can't remove %s sink memory", sink->name);
|
LOG_PERROR("%s sink: Can't remove shared memory", sink->name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
free(sink->sig_name);
|
|
||||||
free(sink->mem_name);
|
|
||||||
free(sink);
|
free(sink);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -143,12 +101,8 @@ int memsink_server_put(memsink_s *sink, const frame_s *frame) {
|
|||||||
if (_flock_timedwait_monotonic(sink->fd, 1) == 0) {
|
if (_flock_timedwait_monotonic(sink->fd, 1) == 0) {
|
||||||
LOG_VERBOSE("%s sink: >>>>> Exposing new frame ...", sink->name);
|
LOG_VERBOSE("%s sink: >>>>> Exposing new frame ...", sink->name);
|
||||||
|
|
||||||
if (sem_trywait(sink->sig_sem) < 0 && errno != EAGAIN) {
|
|
||||||
LOG_PERROR("%s sink: Can't wait signal semaphore", sink->name);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
# define COPY(_field) sink->mem->_field = frame->_field
|
# define COPY(_field) sink->mem->_field = frame->_field
|
||||||
|
sink->mem->seq += 1;
|
||||||
COPY(used);
|
COPY(used);
|
||||||
COPY(width);
|
COPY(width);
|
||||||
COPY(height);
|
COPY(height);
|
||||||
@@ -161,10 +115,6 @@ int memsink_server_put(memsink_s *sink, const frame_s *frame) {
|
|||||||
memcpy(sink->mem->data, frame->data, frame->used);
|
memcpy(sink->mem->data, frame->data, frame->used);
|
||||||
# undef COPY
|
# undef COPY
|
||||||
|
|
||||||
if (sem_post(sink->sig_sem) < 0) {
|
|
||||||
LOG_PERROR("%s sink: Can't post signal semaphore", sink->name);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (flock(sink->fd, LOCK_UN) < 0) {
|
if (flock(sink->fd, LOCK_UN) < 0) {
|
||||||
LOG_PERROR("%s sink: Can't unlock memory", sink->name);
|
LOG_PERROR("%s sink: Can't unlock memory", sink->name);
|
||||||
return -1;
|
return -1;
|
||||||
@@ -185,13 +135,6 @@ int memsink_server_put(memsink_s *sink, const frame_s *frame) {
|
|||||||
int memsink_client_get(memsink_s *sink, frame_s *frame) { // cppcheck-suppress unusedFunction
|
int memsink_client_get(memsink_s *sink, frame_s *frame) { // cppcheck-suppress unusedFunction
|
||||||
assert(!sink->server); // Client only
|
assert(!sink->server); // Client only
|
||||||
|
|
||||||
if (_sem_timedwait_monotonic(sink->sig_sem, sink->timeout) < 0) {
|
|
||||||
if (errno == EAGAIN) {
|
|
||||||
return -2;
|
|
||||||
}
|
|
||||||
LOG_PERROR("%s sink: Can't wait signal semaphore", sink->name);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (_flock_timedwait_monotonic(sink->fd, sink->timeout) < 0) {
|
if (_flock_timedwait_monotonic(sink->fd, sink->timeout) < 0) {
|
||||||
if (errno == EWOULDBLOCK) {
|
if (errno == EWOULDBLOCK) {
|
||||||
return -2;
|
return -2;
|
||||||
@@ -200,37 +143,35 @@ int memsink_client_get(memsink_s *sink, frame_s *frame) { // cppcheck-suppress u
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
# define COPY(_field) frame->_field = sink->mem->_field
|
bool same = false;
|
||||||
COPY(width);
|
|
||||||
COPY(height);
|
if (sink->client_seq == sink->mem->seq) {
|
||||||
COPY(format);
|
same = true;
|
||||||
COPY(stride);
|
} else {
|
||||||
COPY(online);
|
# define COPY(_field) frame->_field = sink->mem->_field
|
||||||
COPY(grab_ts);
|
sink->client_seq = sink->mem->seq;
|
||||||
COPY(encode_begin_ts);
|
COPY(width);
|
||||||
COPY(encode_end_ts);
|
COPY(height);
|
||||||
frame_set_data(frame, sink->mem->data, sink->mem->used);
|
COPY(format);
|
||||||
# undef COPY
|
COPY(stride);
|
||||||
|
COPY(online);
|
||||||
|
COPY(grab_ts);
|
||||||
|
COPY(encode_begin_ts);
|
||||||
|
COPY(encode_end_ts);
|
||||||
|
frame_set_data(frame, sink->mem->data, sink->mem->used);
|
||||||
|
# undef COPY
|
||||||
|
}
|
||||||
|
|
||||||
if (flock(sink->fd, LOCK_UN) < 0) {
|
if (flock(sink->fd, LOCK_UN) < 0) {
|
||||||
LOG_PERROR("%s sink: Can't unlock memory", sink->name);
|
LOG_PERROR("%s sink: Can't unlock memory", sink->name);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int _sem_timedwait_monotonic(sem_t *sem, long double timeout) {
|
if (same) {
|
||||||
long double deadline_ts = get_now_monotonic() + timeout;
|
|
||||||
int retval = -1;
|
|
||||||
|
|
||||||
while (true) {
|
|
||||||
retval = sem_trywait(sem);
|
|
||||||
if (retval == 0 || errno != EAGAIN || get_now_monotonic() > deadline_ts) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
usleep(1000);
|
usleep(1000);
|
||||||
|
return -2;
|
||||||
}
|
}
|
||||||
return retval;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int _flock_timedwait_monotonic(int fd, long double timeout) {
|
static int _flock_timedwait_monotonic(int fd, long double timeout) {
|
||||||
|
|||||||
@@ -26,7 +26,6 @@
|
|||||||
#include <stdbool.h>
|
#include <stdbool.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
#include <semaphore.h>
|
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
|
|
||||||
@@ -47,6 +46,7 @@
|
|||||||
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
uint64_t seq;
|
||||||
size_t used;
|
size_t used;
|
||||||
unsigned width;
|
unsigned width;
|
||||||
unsigned height;
|
unsigned height;
|
||||||
@@ -61,20 +61,18 @@ typedef struct {
|
|||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
const char *name;
|
const char *name;
|
||||||
|
const char *obj;
|
||||||
bool server;
|
bool server;
|
||||||
bool rm;
|
bool rm;
|
||||||
unsigned timeout;
|
unsigned timeout;
|
||||||
|
|
||||||
char *mem_name;
|
|
||||||
char *sig_name;
|
|
||||||
|
|
||||||
int fd;
|
int fd;
|
||||||
memsink_shared_s *mem;
|
memsink_shared_s *mem;
|
||||||
sem_t *sig_sem;
|
uint64_t client_seq;
|
||||||
} memsink_s;
|
} memsink_s;
|
||||||
|
|
||||||
|
|
||||||
memsink_s *memsink_init(const char *name, const char *prefix, bool server, mode_t mode, bool rm, unsigned timeout);
|
memsink_s *memsink_init(const char *name, const char *obj, bool server, mode_t mode, bool rm, unsigned timeout);
|
||||||
void memsink_destroy(memsink_s *sink);
|
void memsink_destroy(memsink_s *sink);
|
||||||
|
|
||||||
int memsink_server_put(memsink_s *sink, const frame_s *frame);
|
int memsink_server_put(memsink_s *sink, const frame_s *frame);
|
||||||
|
|||||||
@@ -482,7 +482,7 @@ int options_parse(options_s *options, device_s *dev, encoder_s *enc, stream_s *s
|
|||||||
# define ADD_SINK(_lower, _upper) { \
|
# define ADD_SINK(_lower, _upper) { \
|
||||||
if (_lower##_sink_name && _lower##_sink_name[0] != '\0') { \
|
if (_lower##_sink_name && _lower##_sink_name[0] != '\0') { \
|
||||||
options->_lower##_sink = memsink_init( \
|
options->_lower##_sink = memsink_init( \
|
||||||
#_lower, \
|
#_upper, \
|
||||||
_lower##_sink_name, \
|
_lower##_sink_name, \
|
||||||
true, \
|
true, \
|
||||||
_lower##_sink_mode, \
|
_lower##_sink_mode, \
|
||||||
|
|||||||
Reference in New Issue
Block a user