refactoring

This commit is contained in:
Devaev Maxim
2020-12-07 19:16:11 +03:00
parent 338389c219
commit e7ad86ded9
5 changed files with 405 additions and 405 deletions

View File

@@ -166,14 +166,14 @@ void rawsink_put(
goto error;
}
# define PICTURE(_next) rawsink->picture->_next
PICTURE(format) = format;
PICTURE(width) = width;
PICTURE(height) = height;
PICTURE(grab_ts) = grab_ts;
PICTURE(used) = size;
memcpy(PICTURE(data), data, size);
# undef PICTURE
# define PIC(_next) rawsink->picture->_next
PIC(format) = format;
PIC(width) = width;
PIC(height) = height;
PIC(grab_ts) = grab_ts;
PIC(used) = size;
memcpy(PIC(data), data, size);
# undef PIC
if (sem_post(rawsink->signal_sem) < 0) {
LOG_PERROR("RAWSINK: Can't post %s", rawsink->signal_name);

View File

@@ -91,8 +91,12 @@ static void _device_open_alloc_picbufs(struct device_t *dev);
static int _device_apply_resolution(struct device_t *dev, unsigned width, unsigned height);
static void _device_apply_controls(struct device_t *dev);
static int _device_query_control(struct device_t *dev, struct v4l2_queryctrl *query, const char *name, unsigned cid, bool quiet);
static void _device_set_control(struct device_t *dev, struct v4l2_queryctrl *query, const char *name, unsigned cid, int value, bool quiet);
static int _device_query_control(
struct device_t *dev, struct v4l2_queryctrl *query,
const char *name, unsigned cid, bool quiet);
static void _device_set_control(
struct device_t *dev, struct v4l2_queryctrl *query,
const char *name, unsigned cid, int value, bool quiet);
static const char *_format_to_string_fourcc(char *buf, size_t size, unsigned format);
static const char *_format_to_string_nullable(unsigned format);
@@ -101,6 +105,9 @@ static const char *_standard_to_string(v4l2_std_id standard);
static const char *_io_method_to_string_supported(enum v4l2_memory io_method);
# define RUN(_next) dev->run->_next
struct device_t *device_init(void) {
struct device_runtime_t *run;
struct device_t *dev;
@@ -167,11 +174,11 @@ int device_parse_io_method(const char *str) {
}
int device_open(struct device_t *dev) {
if ((dev->run->fd = open(dev->path, O_RDWR|O_NONBLOCK)) < 0) {
if ((RUN(fd) = open(dev->path, O_RDWR|O_NONBLOCK)) < 0) {
LOG_PERROR("Can't open device");
goto error;
}
LOG_INFO("Device fd=%d opened", dev->run->fd);
LOG_INFO("Device fd=%d opened", RUN(fd));
if (_device_open_check_cap(dev) < 0) {
goto error;
@@ -192,9 +199,9 @@ int device_open(struct device_t *dev) {
_device_open_alloc_picbufs(dev);
_device_apply_controls(dev);
dev->run->n_workers = min_u(dev->run->n_buffers, dev->n_workers);
RUN(n_workers) = min_u(RUN(n_buffers), dev->n_workers);
LOG_DEBUG("Device fd=%d initialized", dev->run->fd);
LOG_DEBUG("Device fd=%d initialized", RUN(fd));
return 0;
error:
@@ -203,67 +210,67 @@ int device_open(struct device_t *dev) {
}
void device_close(struct device_t *dev) {
dev->run->persistent_timeout_reported = false;
dev->run->n_workers = 0;
RUN(persistent_timeout_reported) = false;
RUN(n_workers) = 0;
if (dev->run->pictures) {
if (RUN(pictures)) {
LOG_DEBUG("Releasing picture buffers ...");
for (unsigned index = 0; index < dev->run->n_buffers; ++index) {
picture_destroy(dev->run->pictures[index]);
for (unsigned index = 0; index < RUN(n_buffers); ++index) {
picture_destroy(RUN(pictures[index]));
}
free(dev->run->pictures);
dev->run->pictures = NULL;
free(RUN(pictures));
RUN(pictures) = NULL;
}
if (dev->run->hw_buffers) {
if (RUN(hw_buffers)) {
LOG_DEBUG("Releasing device buffers ...");
for (unsigned index = 0; index < dev->run->n_buffers; ++index) {
# define HW_BUFFER(_next) dev->run->hw_buffers[index]._next
for (unsigned index = 0; index < RUN(n_buffers); ++index) {
# define HW(_next) RUN(hw_buffers)[index]._next
if (dev->io_method == V4L2_MEMORY_MMAP) {
if (HW_BUFFER(allocated) > 0 && HW_BUFFER(data) != MAP_FAILED) {
if (munmap(HW_BUFFER(data), HW_BUFFER(allocated)) < 0) {
if (HW(allocated) > 0 && HW(data) != MAP_FAILED) {
if (munmap(HW(data), HW(allocated)) < 0) {
LOG_PERROR("Can't unmap device buffer %u", index);
}
}
} else { // V4L2_MEMORY_USERPTR
if (HW_BUFFER(data)) {
free(HW_BUFFER(data));
if (HW(data)) {
free(HW(data));
}
}
A_MUTEX_DESTROY(&HW_BUFFER(grabbed_mutex));
A_MUTEX_DESTROY(&HW(grabbed_mutex));
# undef HW_BUFFER
# undef HW
}
dev->run->n_buffers = 0;
free(dev->run->hw_buffers);
dev->run->hw_buffers = NULL;
RUN(n_buffers) = 0;
free(RUN(hw_buffers));
RUN(hw_buffers) = NULL;
}
if (dev->run->fd >= 0) {
if (RUN(fd) >= 0) {
LOG_DEBUG("Closing device ...");
if (close(dev->run->fd) < 0) {
LOG_PERROR("Can't close device fd=%d", dev->run->fd);
if (close(RUN(fd)) < 0) {
LOG_PERROR("Can't close device fd=%d", RUN(fd));
} else {
LOG_INFO("Device fd=%d closed", dev->run->fd);
LOG_INFO("Device fd=%d closed", RUN(fd));
}
dev->run->fd = -1;
RUN(fd) = -1;
}
}
int device_switch_capturing(struct device_t *dev, bool enable) {
if (enable != dev->run->capturing) {
if (enable != RUN(capturing)) {
enum v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
LOG_DEBUG("Calling ioctl(%s) ...", (enable ? "VIDIOC_STREAMON" : "VIDIOC_STREAMOFF"));
if (xioctl(dev->run->fd, (enable ? VIDIOC_STREAMON : VIDIOC_STREAMOFF), &type) < 0) {
if (xioctl(RUN(fd), (enable ? VIDIOC_STREAMON : VIDIOC_STREAMOFF), &type) < 0) {
LOG_PERROR("Unable to %s capturing", (enable ? "start" : "stop"));
if (enable) {
return -1;
}
}
dev->run->capturing = enable;
RUN(capturing) = enable;
LOG_INFO("Capturing %s", (enable ? "started" : "stopped"));
}
return 0;
@@ -274,7 +281,7 @@ int device_select(struct device_t *dev, bool *has_read, bool *has_write, bool *h
int retval;
# define INIT_FD_SET(_set) \
fd_set _set; FD_ZERO(&_set); FD_SET(dev->run->fd, &_set);
fd_set _set; FD_ZERO(&_set); FD_SET(RUN(fd), &_set);
INIT_FD_SET(read_fds);
INIT_FD_SET(write_fds);
@@ -287,11 +294,11 @@ int device_select(struct device_t *dev, bool *has_read, bool *has_write, bool *h
LOG_DEBUG("Calling select() on video device ...");
retval = select(dev->run->fd + 1, &read_fds, &write_fds, &error_fds, &timeout);
retval = select(RUN(fd) + 1, &read_fds, &write_fds, &error_fds, &timeout);
if (retval > 0) {
*has_read = FD_ISSET(dev->run->fd, &read_fds);
*has_write = FD_ISSET(dev->run->fd, &write_fds);
*has_error = FD_ISSET(dev->run->fd, &error_fds);
*has_read = FD_ISSET(RUN(fd), &read_fds);
*has_write = FD_ISSET(RUN(fd), &write_fds);
*has_error = FD_ISSET(RUN(fd), &error_fds);
} else {
*has_read = false;
*has_write = false;
@@ -300,12 +307,12 @@ int device_select(struct device_t *dev, bool *has_read, bool *has_write, bool *h
LOG_DEBUG("Device select() --> %d", retval);
if (retval > 0) {
dev->run->persistent_timeout_reported = false;
RUN(persistent_timeout_reported) = false;
} else if (retval == 0) {
if (dev->persistent) {
if (!dev->run->persistent_timeout_reported) {
if (!RUN(persistent_timeout_reported)) {
LOG_ERROR("Persistent device timeout (unplugged)");
dev->run->persistent_timeout_reported = true;
RUN(persistent_timeout_reported) = true;
}
} else {
// Если устройство не персистентное, то таймаут является ошибкой
@@ -323,7 +330,7 @@ int device_grab_buffer(struct device_t *dev) {
buf_info.memory = dev->io_method;
LOG_DEBUG("Grabbing device buffer ...");
if (xioctl(dev->run->fd, VIDIOC_DQBUF, &buf_info) < 0) {
if (xioctl(RUN(fd), VIDIOC_DQBUF, &buf_info) < 0) {
LOG_PERROR("Unable to grab device buffer");
return -1;
}
@@ -331,9 +338,9 @@ int device_grab_buffer(struct device_t *dev) {
LOG_DEBUG("Grabbed new frame in device buffer: index=%u, bytesused=%u",
buf_info.index, buf_info.bytesused);
if (buf_info.index >= dev->run->n_buffers) {
if (buf_info.index >= RUN(n_buffers)) {
LOG_ERROR("V4L2 error: grabbed invalid device buffer: index=%u, nbuffers=%u",
buf_info.index, dev->run->n_buffers);
buf_info.index, RUN(n_buffers));
return -1;
}
@@ -345,49 +352,49 @@ int device_grab_buffer(struct device_t *dev) {
if (buf_info.bytesused < dev->min_frame_size) {
LOG_DEBUG("Dropped too small frame sized %d bytes, assuming it was broken", buf_info.bytesused);
LOG_DEBUG("Releasing device buffer index=%u (broken frame) ...", buf_info.index);
if (xioctl(dev->run->fd, VIDIOC_QBUF, &buf_info) < 0) {
if (xioctl(RUN(fd), VIDIOC_QBUF, &buf_info) < 0) {
LOG_PERROR("Unable to release device buffer index=%u (broken frame)", buf_info.index);
return -1;
}
return -2;
}
# define HW_BUFFER(_next) dev->run->hw_buffers[buf_info.index]._next
# define HW(_next) RUN(hw_buffers)[buf_info.index]._next
A_MUTEX_LOCK(&HW_BUFFER(grabbed_mutex));
if (HW_BUFFER(grabbed)) {
A_MUTEX_LOCK(&HW(grabbed_mutex));
if (HW(grabbed)) {
LOG_ERROR("V4L2 error: grabbed device buffer is already used: index=%u, bytesused=%u",
buf_info.index, buf_info.bytesused);
A_MUTEX_UNLOCK(&HW_BUFFER(grabbed_mutex));
A_MUTEX_UNLOCK(&HW(grabbed_mutex));
return -1;
}
HW_BUFFER(grabbed) = true;
A_MUTEX_UNLOCK(&HW_BUFFER(grabbed_mutex));
HW(grabbed) = true;
A_MUTEX_UNLOCK(&HW(grabbed_mutex));
HW_BUFFER(used) = buf_info.bytesused;
memcpy(&HW_BUFFER(buf_info), &buf_info, sizeof(struct v4l2_buffer));
dev->run->pictures[buf_info.index]->grab_ts = get_now_monotonic();
HW(used) = buf_info.bytesused;
memcpy(&HW(buf_info), &buf_info, sizeof(struct v4l2_buffer));
RUN(pictures)[buf_info.index]->grab_ts = get_now_monotonic();
# undef HW_BUFFER
# undef HW
return buf_info.index;
}
int device_release_buffer(struct device_t *dev, unsigned index) {
# define HW_BUFFER(_next) dev->run->hw_buffers[index]._next
# define HW(_next) RUN(hw_buffers)[index]._next
LOG_DEBUG("Releasing device buffer index=%u ...", index);
A_MUTEX_LOCK(&HW_BUFFER(grabbed_mutex));
if (xioctl(dev->run->fd, VIDIOC_QBUF, &HW_BUFFER(buf_info)) < 0) {
A_MUTEX_LOCK(&HW(grabbed_mutex));
if (xioctl(RUN(fd), VIDIOC_QBUF, &HW(buf_info)) < 0) {
LOG_PERROR("Unable to release device buffer index=%u", index);
A_MUTEX_UNLOCK(&HW_BUFFER(grabbed_mutex));
A_MUTEX_UNLOCK(&HW(grabbed_mutex));
return -1;
}
HW_BUFFER(grabbed) = false;
A_MUTEX_UNLOCK(&HW_BUFFER(grabbed_mutex));
HW_BUFFER(used) = 0;
HW(grabbed) = false;
A_MUTEX_UNLOCK(&HW(grabbed_mutex));
HW(used) = 0;
# undef HW_BUFFER
# undef HW
return 0;
}
@@ -395,7 +402,7 @@ int device_consume_event(struct device_t *dev) {
struct v4l2_event event;
LOG_DEBUG("Calling ioctl(VIDIOC_DQEVENT) ...");
if (xioctl(dev->run->fd, VIDIOC_DQEVENT, &event) == 0) {
if (xioctl(RUN(fd), VIDIOC_DQEVENT, &event) == 0) {
switch (event.type) {
case V4L2_EVENT_SOURCE_CHANGE:
LOG_INFO("Got V4L2_EVENT_SOURCE_CHANGE: source changed");
@@ -417,7 +424,7 @@ static int _device_open_check_cap(struct device_t *dev) {
MEMSET_ZERO(cap);
LOG_DEBUG("Calling ioctl(VIDIOC_QUERYCAP) ...");
if (xioctl(dev->run->fd, VIDIOC_QUERYCAP, &cap) < 0) {
if (xioctl(RUN(fd), VIDIOC_QUERYCAP, &cap) < 0) {
LOG_PERROR("Can't query device (VIDIOC_QUERYCAP)");
return -1;
}
@@ -433,14 +440,14 @@ static int _device_open_check_cap(struct device_t *dev) {
}
LOG_INFO("Using input channel: %d", input);
if (xioctl(dev->run->fd, VIDIOC_S_INPUT, &input) < 0) {
if (xioctl(RUN(fd), VIDIOC_S_INPUT, &input) < 0) {
LOG_ERROR("Can't set input channel");
return -1;
}
if (dev->standard != V4L2_STD_UNKNOWN) {
LOG_INFO("Using TV standard: %s", _standard_to_string(dev->standard));
if (xioctl(dev->run->fd, VIDIOC_S_STD, &dev->standard) < 0) {
if (xioctl(RUN(fd), VIDIOC_S_STD, &dev->standard) < 0) {
LOG_ERROR("Can't set video standard");
return -1;
}
@@ -465,7 +472,7 @@ static int _device_open_dv_timings(struct device_t *dev) {
sub.type = V4L2_EVENT_SOURCE_CHANGE;
LOG_DEBUG("Calling ioctl(VIDIOC_SUBSCRIBE_EVENT) ...");
if (xioctl(dev->run->fd, VIDIOC_SUBSCRIBE_EVENT, &sub) < 0) {
if (xioctl(RUN(fd), VIDIOC_SUBSCRIBE_EVENT, &sub) < 0) {
LOG_PERROR("Can't subscribe to V4L2_EVENT_SOURCE_CHANGE");
return -1;
}
@@ -479,12 +486,12 @@ static int _device_apply_dv_timings(struct device_t *dev) {
MEMSET_ZERO(dv);
LOG_DEBUG("Calling ioctl(VIDIOC_QUERY_DV_TIMINGS) ...");
if (xioctl(dev->run->fd, VIDIOC_QUERY_DV_TIMINGS, &dv) == 0) {
if (xioctl(RUN(fd), VIDIOC_QUERY_DV_TIMINGS, &dv) == 0) {
LOG_INFO("Got new DV timings: resolution=%ux%u, pixclk=%llu",
dv.bt.width, dv.bt.height, (unsigned long long)dv.bt.pixelclock); // Issue #11
LOG_DEBUG("Calling ioctl(VIDIOC_S_DV_TIMINGS) ...");
if (xioctl(dev->run->fd, VIDIOC_S_DV_TIMINGS, &dv) < 0) {
if (xioctl(RUN(fd), VIDIOC_S_DV_TIMINGS, &dv) < 0) {
LOG_PERROR("Failed to set DV timings");
return -1;
}
@@ -495,9 +502,9 @@ static int _device_apply_dv_timings(struct device_t *dev) {
} else {
LOG_DEBUG("Calling ioctl(VIDIOC_QUERYSTD) ...");
if (xioctl(dev->run->fd, VIDIOC_QUERYSTD, &dev->standard) == 0) {
if (xioctl(RUN(fd), VIDIOC_QUERYSTD, &dev->standard) == 0) {
LOG_INFO("Applying the new VIDIOC_S_STD: %s ...", _standard_to_string(dev->standard));
if (xioctl(dev->run->fd, VIDIOC_S_STD, &dev->standard) < 0) {
if (xioctl(RUN(fd), VIDIOC_S_STD, &dev->standard) < 0) {
LOG_PERROR("Can't set video standard");
return -1;
}
@@ -511,29 +518,27 @@ static int _device_open_format(struct device_t *dev) {
MEMSET_ZERO(fmt);
fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
fmt.fmt.pix.width = dev->run->width;
fmt.fmt.pix.height = dev->run->height;
fmt.fmt.pix.width = RUN(width);
fmt.fmt.pix.height = RUN(height);
fmt.fmt.pix.pixelformat = dev->format;
fmt.fmt.pix.field = V4L2_FIELD_ANY;
// Set format
LOG_DEBUG("Calling ioctl(VIDIOC_S_FMT) ...");
if (xioctl(dev->run->fd, VIDIOC_S_FMT, &fmt) < 0) {
if (xioctl(RUN(fd), VIDIOC_S_FMT, &fmt) < 0) {
LOG_PERROR("Unable to set pixelformat=%s, resolution=%ux%u",
_format_to_string_supported(dev->format),
dev->run->width,
dev->run->height);
_format_to_string_supported(dev->format), RUN(width), RUN(height));
return -1;
}
// Check resolution
if (fmt.fmt.pix.width != dev->run->width || fmt.fmt.pix.height != dev->run->height) {
LOG_ERROR("Requested resolution=%ux%u is unavailable", dev->run->width, dev->run->height);
if (fmt.fmt.pix.width != RUN(width) || fmt.fmt.pix.height != RUN(height)) {
LOG_ERROR("Requested resolution=%ux%u is unavailable", RUN(width), RUN(height));
}
if (_device_apply_resolution(dev, fmt.fmt.pix.width, fmt.fmt.pix.height) < 0) {
return -1;
}
LOG_INFO("Using resolution: %ux%u", dev->run->width, dev->run->height);
LOG_INFO("Using resolution: %ux%u", RUN(width), RUN(height));
// Check format
if (fmt.fmt.pix.pixelformat != dev->format) {
@@ -553,23 +558,23 @@ static int _device_open_format(struct device_t *dev) {
}
}
dev->run->format = fmt.fmt.pix.pixelformat;
LOG_INFO("Using pixelformat: %s", _format_to_string_supported(dev->run->format));
RUN(format) = fmt.fmt.pix.pixelformat;
LOG_INFO("Using pixelformat: %s", _format_to_string_supported(RUN(format)));
dev->run->raw_size = fmt.fmt.pix.sizeimage; // Only for userptr
RUN(raw_size) = fmt.fmt.pix.sizeimage; // Only for userptr
return 0;
}
static void _device_open_hw_fps(struct device_t *dev) {
struct v4l2_streamparm setfps;
dev->run->hw_fps = 0;
RUN(hw_fps) = 0;
MEMSET_ZERO(setfps);
setfps.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
LOG_DEBUG("Calling ioctl(VIDIOC_G_PARM) ...");
if (xioctl(dev->run->fd, VIDIOC_G_PARM, &setfps) < 0) {
if (xioctl(RUN(fd), VIDIOC_G_PARM, &setfps) < 0) {
if (errno == ENOTTY) { // Quiet message for Auvidea B101
LOG_INFO("Querying HW FPS changing is not supported");
} else {
@@ -590,7 +595,7 @@ static void _device_open_hw_fps(struct device_t *dev) {
SETFPS_TPF(numerator) = 1;
SETFPS_TPF(denominator) = (dev->desired_fps == 0 ? 255 : dev->desired_fps);
if (xioctl(dev->run->fd, VIDIOC_S_PARM, &setfps) < 0) {
if (xioctl(RUN(fd), VIDIOC_S_PARM, &setfps) < 0) {
LOG_PERROR("Unable to set HW FPS");
return;
}
@@ -605,11 +610,11 @@ static void _device_open_hw_fps(struct device_t *dev) {
return;
}
dev->run->hw_fps = SETFPS_TPF(denominator);
if (dev->desired_fps != dev->run->hw_fps) {
LOG_INFO("Using HW FPS: %u -> %u (coerced)", dev->desired_fps, dev->run->hw_fps);
RUN(hw_fps) = SETFPS_TPF(denominator);
if (dev->desired_fps != RUN(hw_fps)) {
LOG_INFO("Using HW FPS: %u -> %u (coerced)", dev->desired_fps, RUN(hw_fps));
} else {
LOG_INFO("Using HW FPS: %u", dev->run->hw_fps);
LOG_INFO("Using HW FPS: %u", RUN(hw_fps));
}
# undef SETFPS_TPF
@@ -634,7 +639,7 @@ static int _device_open_io_method_mmap(struct device_t *dev) {
req.memory = V4L2_MEMORY_MMAP;
LOG_DEBUG("Calling ioctl(VIDIOC_REQBUFS) for V4L2_MEMORY_MMAP ...");
if (xioctl(dev->run->fd, VIDIOC_REQBUFS, &req) < 0) {
if (xioctl(RUN(fd), VIDIOC_REQBUFS, &req) < 0) {
LOG_PERROR("Device '%s' doesn't support V4L2_MEMORY_MMAP", dev->path);
return -1;
}
@@ -648,34 +653,40 @@ static int _device_open_io_method_mmap(struct device_t *dev) {
LOG_DEBUG("Allocating device buffers ...");
A_CALLOC(dev->run->hw_buffers, req.count);
for (dev->run->n_buffers = 0; dev->run->n_buffers < req.count; ++dev->run->n_buffers) {
A_CALLOC(RUN(hw_buffers), req.count);
for (RUN(n_buffers) = 0; RUN(n_buffers) < req.count; ++RUN(n_buffers)) {
struct v4l2_buffer buf_info;
MEMSET_ZERO(buf_info);
buf_info.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf_info.memory = V4L2_MEMORY_MMAP;
buf_info.index = dev->run->n_buffers;
buf_info.index = RUN(n_buffers);
LOG_DEBUG("Calling ioctl(VIDIOC_QUERYBUF) for device buffer %u ...", dev->run->n_buffers);
if (xioctl(dev->run->fd, VIDIOC_QUERYBUF, &buf_info) < 0) {
LOG_DEBUG("Calling ioctl(VIDIOC_QUERYBUF) for device buffer %u ...", RUN(n_buffers));
if (xioctl(RUN(fd), VIDIOC_QUERYBUF, &buf_info) < 0) {
LOG_PERROR("Can't VIDIOC_QUERYBUF");
return -1;
}
# define HW_BUFFER(_next) dev->run->hw_buffers[dev->run->n_buffers]._next
# define HW(_next) RUN(hw_buffers)[RUN(n_buffers)]._next
A_MUTEX_INIT(&HW_BUFFER(grabbed_mutex));
A_MUTEX_INIT(&HW(grabbed_mutex));
LOG_DEBUG("Mapping device buffer %u ...", dev->run->n_buffers);
HW_BUFFER(data) = mmap(NULL, buf_info.length, PROT_READ|PROT_WRITE, MAP_SHARED, dev->run->fd, buf_info.m.offset);
if (HW_BUFFER(data) == MAP_FAILED) {
LOG_PERROR("Can't map device buffer %u", dev->run->n_buffers);
LOG_DEBUG("Mapping device buffer %u ...", RUN(n_buffers));
if ((HW(data) = mmap(
NULL,
buf_info.length,
PROT_READ | PROT_WRITE,
MAP_SHARED,
RUN(fd),
buf_info.m.offset
)) == MAP_FAILED) {
LOG_PERROR("Can't map device buffer %u", RUN(n_buffers));
return -1;
}
HW_BUFFER(allocated) = buf_info.length;
HW(allocated) = buf_info.length;
# undef HW_BUFFER
# undef HW
}
return 0;
}
@@ -683,7 +694,7 @@ static int _device_open_io_method_mmap(struct device_t *dev) {
static int _device_open_io_method_userptr(struct device_t *dev) {
struct v4l2_requestbuffers req;
unsigned page_size = getpagesize();
unsigned buf_size = align_size(dev->run->raw_size, page_size);
unsigned buf_size = align_size(RUN(raw_size), page_size);
MEMSET_ZERO(req);
req.count = dev->n_buffers;
@@ -691,7 +702,7 @@ static int _device_open_io_method_userptr(struct device_t *dev) {
req.memory = V4L2_MEMORY_USERPTR;
LOG_DEBUG("Calling ioctl(VIDIOC_REQBUFS) for V4L2_MEMORY_USERPTR ...");
if (xioctl(dev->run->fd, VIDIOC_REQBUFS, &req) < 0) {
if (xioctl(RUN(fd), VIDIOC_REQBUFS, &req) < 0) {
LOG_PERROR("Device '%s' doesn't support V4L2_MEMORY_USERPTR", dev->path);
return -1;
}
@@ -705,21 +716,21 @@ static int _device_open_io_method_userptr(struct device_t *dev) {
LOG_DEBUG("Allocating device buffers ...");
A_CALLOC(dev->run->hw_buffers, req.count);
for (dev->run->n_buffers = 0; dev->run->n_buffers < req.count; ++dev->run->n_buffers) {
# define HW_BUFFER(_next) dev->run->hw_buffers[dev->run->n_buffers]._next
A_CALLOC(RUN(hw_buffers), req.count);
for (RUN(n_buffers) = 0; RUN(n_buffers) < req.count; ++RUN(n_buffers)) {
# define HW(_next) RUN(hw_buffers)[RUN(n_buffers)]._next
assert(HW_BUFFER(data) = aligned_alloc(page_size, buf_size));
memset(HW_BUFFER(data), 0, buf_size);
HW_BUFFER(allocated) = buf_size;
assert(HW(data) = aligned_alloc(page_size, buf_size));
memset(HW(data), 0, buf_size);
HW(allocated) = buf_size;
# undef HW_BUFFER
# undef HW
}
return 0;
}
static int _device_open_queue_buffers(struct device_t *dev) {
for (unsigned index = 0; index < dev->run->n_buffers; ++index) {
for (unsigned index = 0; index < RUN(n_buffers); ++index) {
struct v4l2_buffer buf_info;
MEMSET_ZERO(buf_info);
@@ -727,12 +738,12 @@ static int _device_open_queue_buffers(struct device_t *dev) {
buf_info.memory = dev->io_method;
buf_info.index = index;
if (dev->io_method == V4L2_MEMORY_USERPTR) {
buf_info.m.userptr = (unsigned long)dev->run->hw_buffers[index].data;
buf_info.length = dev->run->hw_buffers[index].allocated;
buf_info.m.userptr = (unsigned long)RUN(hw_buffers)[index].data;
buf_info.length = RUN(hw_buffers)[index].allocated;
}
LOG_DEBUG("Calling ioctl(VIDIOC_QBUF) for buffer %u ...", index);
if (xioctl(dev->run->fd, VIDIOC_QBUF, &buf_info) < 0) {
if (xioctl(RUN(fd), VIDIOC_QBUF, &buf_info) < 0) {
LOG_PERROR("Can't VIDIOC_QBUF");
return -1;
}
@@ -741,15 +752,15 @@ static int _device_open_queue_buffers(struct device_t *dev) {
}
static void _device_open_alloc_picbufs(struct device_t *dev) {
size_t picture_size = picture_get_generous_size(dev->run->width, dev->run->height);
size_t picture_size = picture_get_generous_size(RUN(width), RUN(height));
LOG_DEBUG("Allocating picture buffers ...");
A_CALLOC(dev->run->pictures, dev->run->n_buffers);
A_CALLOC(RUN(pictures), RUN(n_buffers));
for (unsigned index = 0; index < dev->run->n_buffers; ++index) {
dev->run->pictures[index] = picture_init();
for (unsigned index = 0; index < RUN(n_buffers); ++index) {
RUN(pictures)[index] = picture_init();
LOG_DEBUG("Pre-allocating picture buffer %u sized %zu bytes... ", index, picture_size);
picture_realloc_data(dev->run->pictures[index], picture_size);
picture_realloc_data(RUN(pictures)[index], picture_size);
}
}
@@ -764,8 +775,8 @@ static int _device_apply_resolution(struct device_t *dev, unsigned width, unsign
width, height, VIDEO_MAX_WIDTH, VIDEO_MAX_HEIGHT);
return -1;
}
dev->run->width = width;
dev->run->height = height;
RUN(width) = width;
RUN(height) = height;
return 0;
}
@@ -824,12 +835,15 @@ static void _device_apply_controls(struct device_t *dev) {
# undef SET_CID_VALUE
}
static int _device_query_control(struct device_t *dev, struct v4l2_queryctrl *query, const char *name, unsigned cid, bool quiet) {
static int _device_query_control(
struct device_t *dev, struct v4l2_queryctrl *query,
const char *name, unsigned cid, bool quiet) {
// cppcheck-suppress redundantPointerOp
MEMSET_ZERO(*query);
query->id = cid;
if (xioctl(dev->run->fd, VIDIOC_QUERYCTRL, query) < 0 || query->flags & V4L2_CTRL_FLAG_DISABLED) {
if (xioctl(RUN(fd), VIDIOC_QUERYCTRL, query) < 0 || query->flags & V4L2_CTRL_FLAG_DISABLED) {
if (!quiet) {
LOG_ERROR("Changing control %s is unsupported", name);
}
@@ -838,7 +852,10 @@ static int _device_query_control(struct device_t *dev, struct v4l2_queryctrl *qu
return 0;
}
static void _device_set_control(struct device_t *dev, struct v4l2_queryctrl *query, const char *name, unsigned cid, int value, bool quiet) {
static void _device_set_control(
struct device_t *dev, struct v4l2_queryctrl *query,
const char *name, unsigned cid, int value, bool quiet) {
struct v4l2_control ctl;
if (value < query->minimum || value > query->maximum || value % query->step != 0) {
@@ -853,7 +870,7 @@ static void _device_set_control(struct device_t *dev, struct v4l2_queryctrl *que
ctl.id = cid;
ctl.value = value;
if (xioctl(dev->run->fd, VIDIOC_S_CTRL, &ctl) < 0) {
if (xioctl(RUN(fd), VIDIOC_S_CTRL, &ctl) < 0) {
if (!quiet) {
LOG_PERROR("Can't set control %s", name);
}
@@ -910,3 +927,5 @@ static const char *_io_method_to_string_supported(enum v4l2_memory io_method) {
}
return "unsupported";
}
# undef RUN

View File

@@ -177,12 +177,12 @@ int omx_encoder_prepare(struct omx_encoder_t *omx, struct device_t *dev, unsigne
}
int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, unsigned index) {
# define HW_BUFFER(_next) dev->run->hw_buffers[index]._next
# define IN(_next) omx->input_buffer->_next
# define OUT(_next) omx->output_buffer->_next
# define HW(_next) dev->run->hw_buffers[index]._next
# define IN(_next) omx->input_buffer->_next
# define OUT(_next) omx->output_buffer->_next
OMX_ERRORTYPE error;
size_t slice_size = (IN(nAllocLen) < HW_BUFFER(used) ? IN(nAllocLen) : HW_BUFFER(used));
size_t slice_size = (IN(nAllocLen) < HW(used) ? IN(nAllocLen) : HW(used));
size_t pos = 0;
if ((error = OMX_FillThisBuffer(omx->encoder, omx->output_buffer)) != OMX_ErrorNone) {
@@ -218,18 +218,18 @@ int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev,
if (omx->input_required) {
omx->input_required = false;
if (pos == HW_BUFFER(used)) {
if (pos == HW(used)) {
continue;
}
memcpy(IN(pBuffer), HW_BUFFER(data) + pos, slice_size);
memcpy(IN(pBuffer), HW(data) + pos, slice_size);
IN(nOffset) = 0;
IN(nFilledLen) = slice_size;
pos += slice_size;
if (pos + slice_size > HW_BUFFER(used)) {
slice_size = HW_BUFFER(used) - pos;
if (pos + slice_size > HW(used)) {
slice_size = HW(used) - pos;
}
if ((error = OMX_EmptyThisBuffer(omx->encoder, omx->input_buffer)) != OMX_ErrorNone) {
@@ -245,7 +245,7 @@ int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev,
# undef OUT
# undef IN
# undef HW_BUFFER
# undef HW
return 0;
}

View File

@@ -92,6 +92,11 @@ static bool _expose_blank_picture(struct http_server_t *server);
static void _format_bufferevent_reason(short what, char *reason);
#define RUN(_next) server->run->_next
#define STREAM(_next) RUN(stream->_next)
#define EX(_next) RUN(exposed->_next)
struct http_server_t *http_server_init(struct stream_t *stream) {
struct http_server_runtime_t *run;
struct http_server_t *server;
@@ -125,22 +130,22 @@ struct http_server_t *http_server_init(struct stream_t *stream) {
}
void http_server_destroy(struct http_server_t *server) {
if (server->run->refresh) {
event_del(server->run->refresh);
event_free(server->run->refresh);
if (RUN(refresh)) {
event_del(RUN(refresh));
event_free(RUN(refresh));
}
evhttp_free(server->run->http);
if (server->run->unix_fd) {
close(server->run->unix_fd);
evhttp_free(RUN(http));
if (RUN(unix_fd)) {
close(RUN(unix_fd));
}
event_base_free(server->run->base);
event_base_free(RUN(base));
# if LIBEVENT_VERSION_NUMBER >= 0x02010100
libevent_global_shutdown();
# endif
for (struct stream_client_t *client = server->run->stream_clients; client != NULL;) {
for (struct stream_client_t *client = RUN(stream_clients); client != NULL;) {
struct stream_client_t *next = client->next;
free(client->key);
@@ -148,16 +153,16 @@ void http_server_destroy(struct http_server_t *server) {
client = next;
}
if (server->run->auth_token) {
free(server->run->auth_token);
if (RUN(auth_token)) {
free(RUN(auth_token));
}
if (server->run->blank) {
picture_destroy(server->run->blank);
if (RUN(blank)) {
picture_destroy(RUN(blank));
}
picture_destroy(server->run->exposed->picture);
free(server->run->exposed);
picture_destroy(EX(picture));
free(RUN(exposed));
free(server->run);
free(server);
}
@@ -166,48 +171,46 @@ int http_server_listen(struct http_server_t *server) {
{
if (server->static_path[0] != '\0') {
LOG_INFO("Enabling HTTP file server: %s", server->static_path);
evhttp_set_gencb(server->run->http, _http_callback_static, (void *)server);
evhttp_set_gencb(RUN(http), _http_callback_static, (void *)server);
} else {
assert(!evhttp_set_cb(server->run->http, "/", _http_callback_root, (void *)server));
assert(!evhttp_set_cb(RUN(http), "/", _http_callback_root, (void *)server));
}
assert(!evhttp_set_cb(server->run->http, "/state", _http_callback_state, (void *)server));
assert(!evhttp_set_cb(server->run->http, "/snapshot", _http_callback_snapshot, (void *)server));
assert(!evhttp_set_cb(server->run->http, "/stream", _http_callback_stream, (void *)server));
assert(!evhttp_set_cb(RUN(http), "/state", _http_callback_state, (void *)server));
assert(!evhttp_set_cb(RUN(http), "/snapshot", _http_callback_snapshot, (void *)server));
assert(!evhttp_set_cb(RUN(http), "/stream", _http_callback_stream, (void *)server));
}
server->run->drop_same_frames_blank = max_u(server->drop_same_frames, server->run->drop_same_frames_blank);
server->run->blank = blank_picture_init(server->blank_path);
RUN(drop_same_frames_blank) = max_u(server->drop_same_frames, RUN(drop_same_frames_blank));
RUN(blank) = blank_picture_init(server->blank_path);
# define EXPOSED(_next) server->run->exposed->_next
// See _expose_blank_picture()
picture_copy(server->run->blank, EXPOSED(picture));
EXPOSED(expose_begin_ts) = get_now_monotonic();
EXPOSED(expose_cmp_ts) = EXPOSED(expose_begin_ts);
EXPOSED(expose_end_ts) = EXPOSED(expose_begin_ts);
picture_copy(RUN(blank), EX(picture));
EX(expose_begin_ts) = get_now_monotonic();
EX(expose_cmp_ts) = EX(expose_begin_ts);
EX(expose_end_ts) = EX(expose_begin_ts);
// See _http_exposed_refresh()
EXPOSED(notify_last_width) = EXPOSED(picture->width);
EXPOSED(notify_last_height) = EXPOSED(picture->height);
# undef EXPOSED
EX(notify_last_width) = EX(picture->width);
EX(notify_last_height) = EX(picture->height);
{
struct timeval refresh_interval;
refresh_interval.tv_sec = 0;
if (server->run->stream->dev->desired_fps > 0) {
refresh_interval.tv_usec = 1000000 / (server->run->stream->dev->desired_fps * 2);
if (STREAM(dev->desired_fps) > 0) {
refresh_interval.tv_usec = 1000000 / (STREAM(dev->desired_fps) * 2);
} else {
refresh_interval.tv_usec = 16000; // ~60fps
}
assert((server->run->refresh = event_new(server->run->base, -1, EV_PERSIST, _http_exposed_refresh, server)));
assert(!event_add(server->run->refresh, &refresh_interval));
assert((RUN(refresh) = event_new(RUN(base), -1, EV_PERSIST, _http_exposed_refresh, server)));
assert(!event_add(RUN(refresh), &refresh_interval));
}
if (server->slowdown) {
stream_switch_slowdown(server->run->stream, true);
stream_switch_slowdown(RUN(stream), true);
}
evhttp_set_timeout(server->run->http, server->timeout);
evhttp_set_timeout(RUN(http), server->timeout);
if (server->user[0] != '\0') {
char *raw_token;
@@ -218,8 +221,8 @@ int http_server_listen(struct http_server_t *server) {
encoded_token = base64_encode((unsigned char *)raw_token);
free(raw_token);
A_CALLOC(server->run->auth_token, strlen(encoded_token) + 16);
sprintf(server->run->auth_token, "Basic %s", encoded_token);
A_CALLOC(RUN(auth_token), strlen(encoded_token) + 16);
sprintf(RUN(auth_token), "Basic %s", encoded_token);
free(encoded_token);
LOG_INFO("Using HTTP basic auth");
@@ -227,8 +230,8 @@ int http_server_listen(struct http_server_t *server) {
if (server->unix_path[0] != '\0') {
LOG_DEBUG("Binding HTTP to UNIX socket '%s' ...", server->unix_path);
if ((server->run->unix_fd = evhttp_my_bind_unix(
server->run->http,
if ((RUN(unix_fd) = evhttp_my_bind_unix(
RUN(http),
server->unix_path,
server->unix_rm,
server->unix_mode)) < 0
@@ -241,7 +244,7 @@ int http_server_listen(struct http_server_t *server) {
}
} else {
LOG_DEBUG("Binding HTTP to [%s]:%u ...", server->host, server->port);
if (evhttp_bind_socket(server->run->http, server->host, server->port) < 0) {
if (evhttp_bind_socket(RUN(http), server->host, server->port) < 0) {
LOG_PERROR("Can't bind HTTP on [%s]:%u", server->host, server->port)
return -1;
}
@@ -253,22 +256,22 @@ int http_server_listen(struct http_server_t *server) {
void http_server_loop(struct http_server_t *server) {
LOG_INFO("Starting HTTP eventloop ...");
event_base_dispatch(server->run->base);
event_base_dispatch(RUN(base));
LOG_INFO("HTTP eventloop stopped");
}
void http_server_loop_break(struct http_server_t *server) {
event_base_loopbreak(server->run->base);
event_base_loopbreak(RUN(base));
}
#define ADD_HEADER(_key, _value) \
assert(!evhttp_add_header(evhttp_request_get_output_headers(request), _key, _value))
static int _http_preprocess_request(struct evhttp_request *request, struct http_server_t *server) {
if (server->run->auth_token) {
if (RUN(auth_token)) {
const char *token = evhttp_find_header(evhttp_request_get_input_headers(request), "Authorization");
if (token == NULL || strcmp(token, server->run->auth_token) != 0) {
if (token == NULL || strcmp(token, RUN(auth_token)) != 0) {
ADD_HEADER("WWW-Authenticate", "Basic realm=\"Restricted area\"");
evhttp_send_reply(request, 401, "Unauthorized", NULL);
return -1;
@@ -396,13 +399,12 @@ static void _http_callback_state(struct evhttp_request *request, void *v_server)
PREPROCESS_REQUEST;
# define ENCODER(_next) server->run->stream->encoder->_next
# define EXPOSED(_next) server->run->exposed->_next
A_MUTEX_LOCK(&ENCODER(run->mutex));
encoder_run_type = ENCODER(run->type);
encoder_run_quality = ENCODER(run->quality);
A_MUTEX_UNLOCK(&ENCODER(run->mutex));
# define ER(_next) STREAM(encoder->run->_next)
A_MUTEX_LOCK(&ER(mutex));
encoder_run_type = ER(type);
encoder_run_quality = ER(quality);
A_MUTEX_UNLOCK(&ER(mutex));
# undef ER
assert((buf = evbuffer_new()));
@@ -414,19 +416,16 @@ static void _http_callback_state(struct evhttp_request *request, void *v_server)
" \"stream\": {\"queued_fps\": %u, \"clients\": %u, \"clients_stat\": {",
encoder_type_to_string(encoder_run_type),
encoder_run_quality,
(server->fake_width ? server->fake_width : EXPOSED(picture->width)),
(server->fake_height ? server->fake_height : EXPOSED(picture->height)),
bool_to_string(EXPOSED(online)),
server->run->stream->dev->desired_fps,
EXPOSED(captured_fps),
EXPOSED(queued_fps),
server->run->stream_clients_count
(server->fake_width ? server->fake_width : EX(picture->width)),
(server->fake_height ? server->fake_height : EX(picture->height)),
bool_to_string(EX(online)),
STREAM(dev->desired_fps),
EX(captured_fps),
EX(queued_fps),
RUN(stream_clients_count)
));
# undef EXPOSED
# undef ENCODER
for (struct stream_client_t * client = server->run->stream_clients; client != NULL; client = client->next) {
for (struct stream_client_t * client = RUN(stream_clients); client != NULL; client = client->next) {
assert(evbuffer_add_printf(buf,
"\"%s\": {\"fps\": %u, \"extra_headers\": %s, \"advance_headers\": %s, \"dual_final_frames\": %s}%s",
client->id,
@@ -452,10 +451,8 @@ static void _http_callback_snapshot(struct evhttp_request *request, void *v_serv
PREPROCESS_REQUEST;
# define EXPOSED(_next) server->run->exposed->_next
assert((buf = evbuffer_new()));
assert(!evbuffer_add(buf, (const void *)EXPOSED(picture->data), EXPOSED(picture->used)));
assert(!evbuffer_add(buf, (const void *)EX(picture->data), EX(picture->used)));
if (server->allow_origin[0] != '\0') {
ADD_HEADER("Access-Control-Allow-Origin", server->allow_origin);
@@ -476,21 +473,20 @@ static void _http_callback_snapshot(struct evhttp_request *request, void *v_serv
ADD_TIME_HEADER("X-Timestamp", get_now_real());
ADD_HEADER("X-UStreamer-Online", bool_to_string(EXPOSED(online)));
ADD_UNSIGNED_HEADER("X-UStreamer-Dropped", EXPOSED(dropped));
ADD_UNSIGNED_HEADER("X-UStreamer-Width", EXPOSED(picture->width));
ADD_UNSIGNED_HEADER("X-UStreamer-Height", EXPOSED(picture->height));
ADD_TIME_HEADER("X-UStreamer-Grab-Timestamp", EXPOSED(picture->grab_ts));
ADD_TIME_HEADER("X-UStreamer-Encode-Begin-Timestamp", EXPOSED(picture->encode_begin_ts));
ADD_TIME_HEADER("X-UStreamer-Encode-End-Timestamp", EXPOSED(picture->encode_end_ts));
ADD_TIME_HEADER("X-UStreamer-Expose-Begin-Timestamp", EXPOSED(expose_begin_ts));
ADD_TIME_HEADER("X-UStreamer-Expose-Cmp-Timestamp", EXPOSED(expose_cmp_ts));
ADD_TIME_HEADER("X-UStreamer-Expose-End-Timestamp", EXPOSED(expose_end_ts));
ADD_HEADER("X-UStreamer-Online", bool_to_string(EX(online)));
ADD_UNSIGNED_HEADER("X-UStreamer-Dropped", EX(dropped));
ADD_UNSIGNED_HEADER("X-UStreamer-Width", EX(picture->width));
ADD_UNSIGNED_HEADER("X-UStreamer-Height", EX(picture->height));
ADD_TIME_HEADER("X-UStreamer-Grab-Timestamp", EX(picture->grab_ts));
ADD_TIME_HEADER("X-UStreamer-Encode-Begin-Timestamp", EX(picture->encode_begin_ts));
ADD_TIME_HEADER("X-UStreamer-Encode-End-Timestamp", EX(picture->encode_end_ts));
ADD_TIME_HEADER("X-UStreamer-Expose-Begin-Timestamp", EX(expose_begin_ts));
ADD_TIME_HEADER("X-UStreamer-Expose-Cmp-Timestamp", EX(expose_cmp_ts));
ADD_TIME_HEADER("X-UStreamer-Expose-End-Timestamp", EX(expose_end_ts));
ADD_TIME_HEADER("X-UStreamer-Send-Timestamp", get_now_monotonic());
# undef ADD_UNSUGNED_HEADER
# undef ADD_TIME_HEADER
# undef EXPOSED
ADD_HEADER("Content-Type", "image/jpeg");
@@ -536,20 +532,20 @@ static void _http_callback_stream(struct evhttp_request *request, void *v_server
uuid_generate(uuid);
uuid_unparse_lower(uuid, client->id);
if (server->run->stream_clients == NULL) {
server->run->stream_clients = client;
if (RUN(stream_clients) == NULL) {
RUN(stream_clients) = client;
} else {
struct stream_client_t *last = server->run->stream_clients;
struct stream_client_t *last = RUN(stream_clients);
for (; last->next != NULL; last = last->next);
client->prev = last;
last->next = client;
}
server->run->stream_clients_count += 1;
RUN(stream_clients_count) += 1;
if (server->run->stream_clients_count == 1) {
if (RUN(stream_clients_count) == 1) {
if (server->slowdown) {
stream_switch_slowdown(server->run->stream, false);
stream_switch_slowdown(RUN(stream), false);
}
# ifdef WITH_GPIO
@@ -559,10 +555,10 @@ static void _http_callback_stream(struct evhttp_request *request, void *v_server
evhttp_connection_get_peer(conn, &client_addr, &client_port);
LOG_INFO("HTTP: Registered client: [%s]:%u, id=%s; clients now: %u",
client_addr, client_port, client->id, server->run->stream_clients_count);
client_addr, client_port, client->id, RUN(stream_clients_count));
buf_event = evhttp_connection_get_bufferevent(conn);
if (server->tcp_nodelay && !server->run->unix_fd) {
if (server->tcp_nodelay && !RUN(unix_fd)) {
evutil_socket_t fd;
int on = 1;
@@ -586,6 +582,7 @@ static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_c
# define RN "\r\n"
struct stream_client_t *client = (struct stream_client_t *)v_client;
struct http_server_t *server = client->server;
struct evbuffer *buf;
long double now = get_now_monotonic();
long long now_second = floor_ms(now);
@@ -647,15 +644,13 @@ static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_c
client->need_initial = false;
}
# define EXPOSED(_next) client->server->run->exposed->_next
if (!client->advance_headers) {
assert(evbuffer_add_printf(buf,
"Content-Type: image/jpeg" RN
"Content-Length: %zu" RN
"X-Timestamp: %.06Lf" RN
"%s",
EXPOSED(picture->used),
EX(picture->used),
get_now_real(),
(client->extra_headers ? "" : RN)
));
@@ -674,23 +669,23 @@ static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_c
"X-UStreamer-Expose-End-Time: %.06Lf" RN
"X-UStreamer-Send-Time: %.06Lf" RN
RN,
bool_to_string(EXPOSED(online)),
EXPOSED(dropped),
EXPOSED(picture->width),
EXPOSED(picture->height),
bool_to_string(EX(online)),
EX(dropped),
EX(picture->width),
EX(picture->height),
client->fps,
EXPOSED(picture->grab_ts),
EXPOSED(picture->encode_begin_ts),
EXPOSED(picture->encode_end_ts),
EXPOSED(expose_begin_ts),
EXPOSED(expose_cmp_ts),
EXPOSED(expose_end_ts),
EX(picture->grab_ts),
EX(picture->encode_begin_ts),
EX(picture->encode_end_ts),
EX(expose_begin_ts),
EX(expose_cmp_ts),
EX(expose_end_ts),
now
));
}
}
assert(!evbuffer_add(buf, (void *)EXPOSED(picture->data), EXPOSED(picture->used)));
assert(!evbuffer_add(buf, (void *)EX(picture->data), EX(picture->used)));
assert(evbuffer_add_printf(buf, RN "--" BOUNDARY RN));
if (client->advance_headers) {
@@ -703,7 +698,6 @@ static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_c
bufferevent_setcb(buf_event, NULL, NULL, _http_callback_stream_error, (void *)client);
bufferevent_enable(buf_event, EV_READ);
# undef EXPOSED
# undef ADD_ADVANCE_HEADERS
# undef RN
# undef BOUNDARY
@@ -711,6 +705,7 @@ static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_c
static void _http_callback_stream_error(UNUSED struct bufferevent *buf_event, UNUSED short what, void *v_client) {
struct stream_client_t *client = (struct stream_client_t *)v_client;
struct http_server_t *server = client->server;
struct evhttp_connection *conn;
char *client_addr = "???";
unsigned short client_port = 0;
@@ -718,8 +713,6 @@ static void _http_callback_stream_error(UNUSED struct bufferevent *buf_event, UN
_format_bufferevent_reason(what, reason);
# define RUN(_next) client->server->run->_next
assert(RUN(stream_clients_count) > 0);
RUN(stream_clients_count) -= 1;
@@ -754,8 +747,6 @@ static void _http_callback_stream_error(UNUSED struct bufferevent *buf_event, UN
}
free(client->key);
free(client);
# undef RUN
}
static void _http_queue_send_stream(struct http_server_t *server, bool stream_updated, bool picture_updated) {
@@ -765,7 +756,7 @@ static void _http_queue_send_stream(struct http_server_t *server, bool stream_up
bool has_clients = false;
bool queued = false;
for (struct stream_client_t *client = server->run->stream_clients; client != NULL; client = client->next) {
for (struct stream_client_t *client = RUN(stream_clients); client != NULL; client = client->next) {
conn = evhttp_request_get_connection(client->request);
if (conn) {
// Фикс для бага WebKit. При включенной опции дропа одинаковых фреймов,
@@ -803,13 +794,13 @@ static void _http_queue_send_stream(struct http_server_t *server, bool stream_up
static long long queued_fps_second = 0;
if ((now = floor_ms(get_now_monotonic())) != queued_fps_second) {
server->run->exposed->queued_fps = queued_fps_accum;
EX(queued_fps) = queued_fps_accum;
queued_fps_accum = 0;
queued_fps_second = now;
}
queued_fps_accum += 1;
} else if (!has_clients) {
server->run->exposed->queued_fps = 0;
EX(queued_fps) = 0;
}
}
@@ -819,14 +810,14 @@ static void _http_exposed_refresh(UNUSED int fd, UNUSED short what, void *v_serv
bool picture_updated = false;
# define UNLOCK_STREAM { \
atomic_store(&server->run->stream->updated, false); \
A_MUTEX_UNLOCK(&server->run->stream->mutex); \
atomic_store(&STREAM(updated), false); \
A_MUTEX_UNLOCK(&STREAM(mutex)); \
}
if (atomic_load(&server->run->stream->updated)) {
if (atomic_load(&STREAM(updated))) {
LOG_DEBUG("Refreshing HTTP exposed ...");
A_MUTEX_LOCK(&server->run->stream->mutex);
if (server->run->stream->online) {
A_MUTEX_LOCK(&STREAM(mutex));
if (STREAM(online)) {
picture_updated = _expose_new_picture_unsafe(server);
UNLOCK_STREAM;
} else {
@@ -834,7 +825,7 @@ static void _http_exposed_refresh(UNUSED int fd, UNUSED short what, void *v_serv
picture_updated = _expose_blank_picture(server);
}
stream_updated = true;
} else if (!server->run->exposed->online) {
} else if (!EX(online)) {
LOG_DEBUG("Refreshing HTTP exposed (BLANK) ...");
picture_updated = _expose_blank_picture(server);
stream_updated = true;
@@ -844,83 +835,71 @@ static void _http_exposed_refresh(UNUSED int fd, UNUSED short what, void *v_serv
_http_queue_send_stream(server, stream_updated, picture_updated);
# define EXPOSED(_next) server->run->exposed->_next
if (
picture_updated
&& server->notify_parent
&& (
EXPOSED(notify_last_online) != EXPOSED(online)
|| EXPOSED(notify_last_width) != EXPOSED(picture->width)
|| EXPOSED(notify_last_height) != EXPOSED(picture->height)
EX(notify_last_online) != EX(online)
|| EX(notify_last_width) != EX(picture->width)
|| EX(notify_last_height) != EX(picture->height)
)
) {
EXPOSED(notify_last_online) = EXPOSED(online);
EXPOSED(notify_last_width) = EXPOSED(picture->width);
EXPOSED(notify_last_height) = EXPOSED(picture->height);
EX(notify_last_online) = EX(online);
EX(notify_last_width) = EX(picture->width);
EX(notify_last_height) = EX(picture->height);
process_notify_parent();
}
# undef EXPOSED
}
static bool _expose_new_picture_unsafe(struct http_server_t *server) {
# define EXPOSED(_next) server->run->exposed->_next
# define STREAM(_next) server->run->stream->_next
EXPOSED(captured_fps) = STREAM(captured_fps);
EXPOSED(expose_begin_ts) = get_now_monotonic();
EX(captured_fps) = STREAM(captured_fps);
EX(expose_begin_ts) = get_now_monotonic();
if (server->drop_same_frames) {
if (
EXPOSED(online)
&& EXPOSED(dropped) < server->drop_same_frames
&& picture_compare(EXPOSED(picture), STREAM(picture))
EX(online)
&& EX(dropped) < server->drop_same_frames
&& picture_compare(EX(picture), STREAM(picture))
) {
EXPOSED(expose_cmp_ts) = get_now_monotonic();
EXPOSED(expose_end_ts) = EXPOSED(expose_cmp_ts);
EX(expose_cmp_ts) = get_now_monotonic();
EX(expose_end_ts) = EX(expose_cmp_ts);
LOG_VERBOSE("HTTP: Dropped same frame number %u; cmp_time=%.06Lf",
EXPOSED(dropped), EXPOSED(expose_cmp_ts) - EXPOSED(expose_begin_ts));
EXPOSED(dropped) += 1;
EX(dropped), EX(expose_cmp_ts) - EX(expose_begin_ts));
EX(dropped) += 1;
return false; // Not updated
} else {
EXPOSED(expose_cmp_ts) = get_now_monotonic();
EX(expose_cmp_ts) = get_now_monotonic();
LOG_VERBOSE("HTTP: Passed same frame check (frames are differ); cmp_time=%.06Lf",
EXPOSED(expose_cmp_ts) - EXPOSED(expose_begin_ts));
EX(expose_cmp_ts) - EX(expose_begin_ts));
}
}
picture_copy(STREAM(picture), EXPOSED(picture));
picture_copy(STREAM(picture), EX(picture));
# undef STREAM
EXPOSED(online) = true;
EXPOSED(dropped) = 0;
EXPOSED(expose_cmp_ts) = EXPOSED(expose_begin_ts);
EXPOSED(expose_end_ts) = get_now_monotonic();
EX(online) = true;
EX(dropped) = 0;
EX(expose_cmp_ts) = EX(expose_begin_ts);
EX(expose_end_ts) = get_now_monotonic();
LOG_VERBOSE("HTTP: Exposed new frame; full exposition time = %.06Lf",
EXPOSED(expose_end_ts) - EXPOSED(expose_begin_ts));
EX(expose_end_ts) - EX(expose_begin_ts));
# undef EXPOSED
return true; // Updated
}
static bool _expose_blank_picture(struct http_server_t *server) {
# define EXPOSED(_next) server->run->exposed->_next
EX(expose_begin_ts) = get_now_monotonic();
EX(expose_cmp_ts) = EX(expose_begin_ts);
EXPOSED(expose_begin_ts) = get_now_monotonic();
EXPOSED(expose_cmp_ts) = EXPOSED(expose_begin_ts);
# define EXPOSE_BLANK picture_copy(RUN(blank), EX(picture))
# define EXPOSE_BLANK picture_copy(server->run->blank, EXPOSED(picture))
if (EXPOSED(online)) { // Если переходим из online в offline
if (EX(online)) { // Если переходим из online в offline
if (server->last_as_blank < 0) { // Если last_as_blank выключено, просто покажем картинку
LOG_INFO("HTTP: Changed picture to BLANK");
EXPOSE_BLANK;
} else if (server->last_as_blank > 0) { // Если нужен таймер - запустим
LOG_INFO("HTTP: Freezing last alive frame for %d seconds", server->last_as_blank);
EXPOSED(last_as_blank_ts) = get_now_monotonic();
EX(last_as_blank_ts) = get_now_monotonic();
} else { // last_as_blank == 0 - показываем последний фрейм вечно
LOG_INFO("HTTP: Freezing last alive frame forever");
}
@@ -929,32 +908,30 @@ static bool _expose_blank_picture(struct http_server_t *server) {
if ( // Если уже оффлайн, включена фича last_as_blank с таймером и он запущен
server->last_as_blank > 0
&& EXPOSED(last_as_blank_ts) > 0
&& EXPOSED(last_as_blank_ts) + server->last_as_blank < EXPOSED(expose_begin_ts)
&& EX(last_as_blank_ts) > 0
&& EX(last_as_blank_ts) + server->last_as_blank < EX(expose_begin_ts)
) {
LOG_INFO("HTTP: Changed last alive frame to BLANK");
EXPOSE_BLANK;
EXPOSED(last_as_blank_ts) = 0; // Останавливаем таймер
EX(last_as_blank_ts) = 0; // Останавливаем таймер
goto updated;
}
# undef EXPOSE_BLANK
if (EXPOSED(dropped) < server->run->drop_same_frames_blank) {
LOG_PERF("HTTP: Dropped same frame (BLANK) number %u", EXPOSED(dropped));
EXPOSED(dropped) += 1;
EXPOSED(expose_end_ts) = get_now_monotonic();
if (EX(dropped) < RUN(drop_same_frames_blank)) {
LOG_PERF("HTTP: Dropped same frame (BLANK) number %u", EX(dropped));
EX(dropped) += 1;
EX(expose_end_ts) = get_now_monotonic();
return false; // Not updated
}
updated:
EXPOSED(captured_fps) = 0;
EXPOSED(online) = false;
EXPOSED(dropped) = 0;
EXPOSED(expose_end_ts) = get_now_monotonic();
EX(captured_fps) = 0;
EX(online) = false;
EX(dropped) = 0;
EX(expose_end_ts) = get_now_monotonic();
return true; // Updated
# undef EXPOSED
}
static void _format_bufferevent_reason(short what, char *reason) {
@@ -986,3 +963,7 @@ static void _format_bufferevent_reason(short what, char *reason) {
strcat(reason, ")");
}
#undef EX
#undef STREAM
#undef RUN

View File

@@ -76,8 +76,8 @@ struct _worker_t {
struct _workers_pool_t {
unsigned n_workers;
struct _worker_t *workers;
struct _worker_t *oldest_worker;
struct _worker_t *latest_worker;
struct _worker_t *oldest_wr;
struct _worker_t *latest_wr;
long double approx_comp_time;
@@ -101,8 +101,8 @@ static void _workers_pool_destroy(struct _workers_pool_t *pool);
static void *_worker_thread(void *v_worker);
static struct _worker_t *_workers_pool_wait(struct _workers_pool_t *pool);
static void _workers_pool_assign(struct _workers_pool_t *pool, struct _worker_t *ready_worker, unsigned buf_index);
static long double _workers_pool_get_fluency_delay(struct _workers_pool_t *pool, struct _worker_t *ready_worker);
static void _workers_pool_assign(struct _workers_pool_t *pool, struct _worker_t *ready_wr, unsigned buf_index);
static long double _workers_pool_get_fluency_delay(struct _workers_pool_t *pool, struct _worker_t *ready_wr);
struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) {
@@ -157,19 +157,19 @@ void stream_loop(struct stream_t *stream) {
picture_realloc_data(stream->picture, picture_get_generous_size(DEV(run->width), DEV(run->height)));
while (!atomic_load(&stream->proc->stop)) {
struct _worker_t *ready_worker;
struct _worker_t *ready_wr;
SEP_DEBUG('-');
LOG_DEBUG("Waiting for worker ...");
ready_worker = _workers_pool_wait(pool);
ready_wr = _workers_pool_wait(pool);
if (!ready_worker->job_failed) {
if (ready_worker->job_timely) {
_stream_expose_picture(stream, ready_worker->buf_index, captured_fps);
LOG_PERF("##### Encoded picture exposed; worker=%u", ready_worker->number);
if (!ready_wr->job_failed) {
if (ready_wr->job_timely) {
_stream_expose_picture(stream, ready_wr->buf_index, captured_fps);
LOG_PERF("##### Encoded picture exposed; worker=%u", ready_wr->number);
} else {
LOG_PERF("----- Encoded picture dropped; worker=%u", ready_worker->number);
LOG_PERF("----- Encoded picture dropped; worker=%u", ready_wr->number);
}
} else {
break;
@@ -228,7 +228,7 @@ void stream_loop(struct stream_t *stream) {
}
captured_fps_accum += 1;
long double fluency_delay = _workers_pool_get_fluency_delay(pool, ready_worker);
long double fluency_delay = _workers_pool_get_fluency_delay(pool, ready_wr);
grab_after = now + fluency_delay;
LOG_VERBOSE("Fluency: delay=%.03Lf, grab_after=%.03Lf", fluency_delay, grab_after);
@@ -247,7 +247,7 @@ void stream_loop(struct stream_t *stream) {
}
# endif
_workers_pool_assign(pool, ready_worker, buf_index);
_workers_pool_assign(pool, ready_wr, buf_index);
}
} else if (buf_index != -2) { // -2 for broken frame
break;
@@ -384,28 +384,28 @@ static struct _workers_pool_t *_workers_pool_init(struct stream_t *stream) {
# undef DEV
for (unsigned number = 0; number < pool->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next
# define WR(_next) pool->workers[number]._next
A_MUTEX_INIT(&WORKER(has_job_mutex));
atomic_init(&WORKER(has_job), false);
A_COND_INIT(&WORKER(has_job_cond));
A_MUTEX_INIT(&WR(has_job_mutex));
atomic_init(&WR(has_job), false);
A_COND_INIT(&WR(has_job_cond));
WORKER(number) = number;
WORKER(proc_stop) = &stream->proc->stop;
WORKER(workers_stop) = &pool->workers_stop;
WR(number) = number;
WR(proc_stop) = &stream->proc->stop;
WR(workers_stop) = &pool->workers_stop;
WORKER(free_workers_mutex) = &pool->free_workers_mutex;
WORKER(free_workers) = &pool->free_workers;
WORKER(free_workers_cond) = &pool->free_workers_cond;
WR(free_workers_mutex) = &pool->free_workers_mutex;
WR(free_workers) = &pool->free_workers;
WR(free_workers_cond) = &pool->free_workers_cond;
WORKER(dev) = stream->dev;
WORKER(encoder) = stream->encoder;
WR(dev) = stream->dev;
WR(encoder) = stream->encoder;
A_THREAD_CREATE(&WORKER(tid), _worker_thread, (void *)&(pool->workers[number]));
A_THREAD_CREATE(&WR(tid), _worker_thread, (void *)&(pool->workers[number]));
pool->free_workers += 1;
# undef WORKER
# undef WR
}
return pool;
}
@@ -415,18 +415,18 @@ static void _workers_pool_destroy(struct _workers_pool_t *pool) {
atomic_store(&pool->workers_stop, true);
for (unsigned number = 0; number < pool->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next
# define WR(_next) pool->workers[number]._next
A_MUTEX_LOCK(&WORKER(has_job_mutex));
atomic_store(&WORKER(has_job), true); // Final job: die
A_MUTEX_UNLOCK(&WORKER(has_job_mutex));
A_COND_SIGNAL(&WORKER(has_job_cond));
A_MUTEX_LOCK(&WR(has_job_mutex));
atomic_store(&WR(has_job), true); // Final job: die
A_MUTEX_UNLOCK(&WR(has_job_mutex));
A_COND_SIGNAL(&WR(has_job_cond));
A_THREAD_JOIN(WORKER(tid));
A_MUTEX_DESTROY(&WORKER(has_job_mutex));
A_COND_DESTROY(&WORKER(has_job_cond));
A_THREAD_JOIN(WR(tid));
A_MUTEX_DESTROY(&WR(has_job_mutex));
A_COND_DESTROY(&WR(has_job_cond));
# undef WORKER
# undef WR
}
A_MUTEX_DESTROY(&pool->free_workers_mutex);
@@ -437,121 +437,121 @@ static void _workers_pool_destroy(struct _workers_pool_t *pool) {
}
static void *_worker_thread(void *v_worker) {
struct _worker_t *worker = (struct _worker_t *)v_worker;
struct _worker_t *wr = (struct _worker_t *)v_worker;
A_THREAD_RENAME("worker-%u", worker->number);
LOG_DEBUG("Hello! I am a worker #%u ^_^", worker->number);
A_THREAD_RENAME("worker-%u", wr->number);
LOG_DEBUG("Hello! I am a worker #%u ^_^", wr->number);
while (!atomic_load(worker->proc_stop) && !atomic_load(worker->workers_stop)) {
LOG_DEBUG("Worker %u waiting for a new job ...", worker->number);
while (!atomic_load(wr->proc_stop) && !atomic_load(wr->workers_stop)) {
LOG_DEBUG("Worker %u waiting for a new job ...", wr->number);
A_MUTEX_LOCK(&worker->has_job_mutex);
A_COND_WAIT_TRUE(atomic_load(&worker->has_job), &worker->has_job_cond, &worker->has_job_mutex);
A_MUTEX_UNLOCK(&worker->has_job_mutex);
A_MUTEX_LOCK(&wr->has_job_mutex);
A_COND_WAIT_TRUE(atomic_load(&wr->has_job), &wr->has_job_cond, &wr->has_job_mutex);
A_MUTEX_UNLOCK(&wr->has_job_mutex);
if (!atomic_load(worker->workers_stop)) {
# define PICTURE(_next) worker->dev->run->pictures[worker->buf_index]->_next
if (!atomic_load(wr->workers_stop)) {
# define PIC(_next) wr->dev->run->pictures[wr->buf_index]->_next
LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", worker->number, worker->buf_index);
LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", wr->number, wr->buf_index);
worker->job_failed = (bool)encoder_compress_buffer(worker->encoder, worker->dev, worker->number, worker->buf_index);
wr->job_failed = (bool)encoder_compress_buffer(wr->encoder, wr->dev, wr->number, wr->buf_index);
if (device_release_buffer(worker->dev, worker->buf_index) == 0) {
if (!worker->job_failed) {
worker->job_start_ts = PICTURE(encode_begin_ts);
worker->last_comp_time = PICTURE(encode_end_ts) - worker->job_start_ts;
if (device_release_buffer(wr->dev, wr->buf_index) == 0) {
if (!wr->job_failed) {
wr->job_start_ts = PIC(encode_begin_ts);
wr->last_comp_time = PIC(encode_end_ts) - wr->job_start_ts;
LOG_VERBOSE("Compressed new JPEG: size=%zu, time=%0.3Lf, worker=%u, buffer=%u",
PICTURE(used), worker->last_comp_time, worker->number, worker->buf_index);
PIC(used), wr->last_comp_time, wr->number, wr->buf_index);
} else {
LOG_VERBOSE("Compression failed: worker=%u, buffer=%u", worker->number, worker->buf_index);
LOG_VERBOSE("Compression failed: worker=%u, buffer=%u", wr->number, wr->buf_index);
}
} else {
worker->job_failed = true;
wr->job_failed = true;
}
atomic_store(&worker->has_job, false);
atomic_store(&wr->has_job, false);
# undef PICTURE
# undef PIC
}
A_MUTEX_LOCK(worker->free_workers_mutex);
*worker->free_workers += 1;
A_MUTEX_UNLOCK(worker->free_workers_mutex);
A_COND_SIGNAL(worker->free_workers_cond);
A_MUTEX_LOCK(wr->free_workers_mutex);
*wr->free_workers += 1;
A_MUTEX_UNLOCK(wr->free_workers_mutex);
A_COND_SIGNAL(wr->free_workers_cond);
}
LOG_DEBUG("Bye-bye (worker %u)", worker->number);
LOG_DEBUG("Bye-bye (worker %u)", wr->number);
return NULL;
}
static struct _worker_t *_workers_pool_wait(struct _workers_pool_t *pool) {
struct _worker_t *ready_worker = NULL;
struct _worker_t *ready_wr = NULL;
A_MUTEX_LOCK(&pool->free_workers_mutex);
A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex);
A_MUTEX_UNLOCK(&pool->free_workers_mutex);
if (pool->oldest_worker && !atomic_load(&pool->oldest_worker->has_job)) {
ready_worker = pool->oldest_worker;
ready_worker->job_timely = true;
pool->oldest_worker = pool->oldest_worker->order_next;
if (pool->oldest_wr && !atomic_load(&pool->oldest_wr->has_job)) {
ready_wr = pool->oldest_wr;
ready_wr->job_timely = true;
pool->oldest_wr = pool->oldest_wr->order_next;
} else {
for (unsigned number = 0; number < pool->n_workers; ++number) {
if (
!atomic_load(&pool->workers[number].has_job) && (
ready_worker == NULL
|| ready_worker->job_start_ts < pool->workers[number].job_start_ts
ready_wr == NULL
|| ready_wr->job_start_ts < pool->workers[number].job_start_ts
)
) {
ready_worker = &pool->workers[number];
ready_wr = &pool->workers[number];
break;
}
}
assert(ready_worker != NULL);
ready_worker->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате)
assert(ready_wr != NULL);
ready_wr->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате)
}
return ready_worker;
return ready_wr;
}
static void _workers_pool_assign(struct _workers_pool_t *pool, struct _worker_t *ready_worker, unsigned buf_index) {
if (pool->oldest_worker == NULL) {
pool->oldest_worker = ready_worker;
pool->latest_worker = pool->oldest_worker;
static void _workers_pool_assign(struct _workers_pool_t *pool, struct _worker_t *ready_wr, unsigned buf_index) {
if (pool->oldest_wr == NULL) {
pool->oldest_wr = ready_wr;
pool->latest_wr = pool->oldest_wr;
} else {
if (ready_worker->order_next) {
ready_worker->order_next->order_prev = ready_worker->order_prev;
if (ready_wr->order_next) {
ready_wr->order_next->order_prev = ready_wr->order_prev;
}
if (ready_worker->order_prev) {
ready_worker->order_prev->order_next = ready_worker->order_next;
if (ready_wr->order_prev) {
ready_wr->order_prev->order_next = ready_wr->order_next;
}
ready_worker->order_prev = pool->latest_worker;
pool->latest_worker->order_next = ready_worker;
pool->latest_worker = ready_worker;
ready_wr->order_prev = pool->latest_wr;
pool->latest_wr->order_next = ready_wr;
pool->latest_wr = ready_wr;
}
pool->latest_worker->order_next = NULL;
pool->latest_wr->order_next = NULL;
A_MUTEX_LOCK(&ready_worker->has_job_mutex);
ready_worker->buf_index = buf_index;
atomic_store(&ready_worker->has_job, true);
A_MUTEX_UNLOCK(&ready_worker->has_job_mutex);
A_COND_SIGNAL(&ready_worker->has_job_cond);
A_MUTEX_LOCK(&ready_wr->has_job_mutex);
ready_wr->buf_index = buf_index;
atomic_store(&ready_wr->has_job, true);
A_MUTEX_UNLOCK(&ready_wr->has_job_mutex);
A_COND_SIGNAL(&ready_wr->has_job_cond);
A_MUTEX_LOCK(&pool->free_workers_mutex);
pool->free_workers -= 1;
A_MUTEX_UNLOCK(&pool->free_workers_mutex);
LOG_DEBUG("Assigned new frame in buffer %u to worker %u", buf_index, ready_worker->number);
LOG_DEBUG("Assigned new frame in buffer %u to worker %u", buf_index, ready_wr->number);
}
static long double _workers_pool_get_fluency_delay(struct _workers_pool_t *pool, struct _worker_t *ready_worker) {
static long double _workers_pool_get_fluency_delay(struct _workers_pool_t *pool, struct _worker_t *ready_wr) {
long double approx_comp_time;
long double min_delay;
approx_comp_time = pool->approx_comp_time * 0.9 + ready_worker->last_comp_time * 0.1;
approx_comp_time = pool->approx_comp_time * 0.9 + ready_wr->last_comp_time * 0.1;
LOG_VERBOSE("Correcting approx_comp_time: %.3Lf -> %.3Lf (last_comp_time=%.3Lf)",
pool->approx_comp_time, approx_comp_time, ready_worker->last_comp_time);
pool->approx_comp_time, approx_comp_time, ready_wr->last_comp_time);
pool->approx_comp_time = approx_comp_time;