Compare commits

...

17 Commits
v0.76 ... v0.78

Author SHA1 Message Date
Devaev Maxim
05246706f0 Bump version: 0.77 → 0.78 2019-06-02 21:07:18 +03:00
Devaev Maxim
9355055a7f Revert "fluency delay based on involved workers counter"
This reverts commit 9f16de13fe.
2019-06-02 21:06:30 +03:00
Devaev Maxim
9f16de13fe fluency delay based on involved workers counter 2019-06-02 20:23:39 +03:00
Devaev Maxim
36a987fb9d locked captured_fps 2019-06-02 19:35:53 +03:00
Devaev Maxim
652397f569 refactoring 2019-06-02 16:31:50 +03:00
Devaev Maxim
3333fc56a3 refactoring 2019-06-02 16:20:30 +03:00
Devaev Maxim
47378a17db refactoring 2019-06-02 15:22:34 +03:00
Devaev Maxim
5f320786f5 fixed workers cleanup on restart 2019-06-01 05:31:44 +03:00
Devaev Maxim
498c6e1f5d refactoring 2019-06-01 05:00:09 +03:00
Devaev Maxim
6c09adc689 free_worker -> ready_worker 2019-06-01 04:58:01 +03:00
Devaev Maxim
8bf7ac3005 refactoring 2019-06-01 04:47:51 +03:00
Devaev Maxim
6aebd7167e config.mk 2019-05-31 01:02:03 +03:00
Devaev Maxim
3f03575222 refactoring 2019-05-30 06:53:01 +03:00
Devaev Maxim
9ca43f17a3 debug workers using gpio on rpi 2019-05-26 22:32:51 +03:00
Devaev Maxim
66ef566c9a Makefile: on/off WITH_* options 2019-05-26 18:47:53 +03:00
Devaev Maxim
07ecc5b0a0 Bump version: 0.76 → 0.77 2019-05-26 06:03:59 +03:00
Devaev Maxim
8d19711f2c fixed pkgbuild 2019-05-26 06:03:48 +03:00
13 changed files with 360 additions and 272 deletions

View File

@@ -1,7 +1,7 @@
[bumpversion]
commit = True
tag = True
current_version = 0.76
current_version = 0.78
parse = (?P<major>\d+)\.(?P<minor>\d+)(\.(?P<patch>\d+)(\-(?P<release>[a-z]+))?)?
serialize =
{major}.{minor}

1
.gitignore vendored
View File

@@ -2,6 +2,7 @@
/pkg/arch/src/
/pkg/arch/v*.tar.gz
/pkg/arch/ustreamer-*.pkg.tar.xz
/config.mk
/vgcore.*
/ustreamer
/*.sock

View File

@@ -1,3 +1,5 @@
-include config.mk
PROG ?= ustreamer
DESTDIR ?=
PREFIX ?= /usr/local
@@ -15,14 +17,25 @@ LIBS = -lm -ljpeg -pthread -levent -levent_pthreads -luuid
override CFLAGS += -c -std=c11 -Wall -Wextra -D_GNU_SOURCE
SOURCES = $(shell ls src/*.c src/http/*.c src/encoders/cpu/*.c src/encoders/hw/*.c)
ifeq ($(WITH_OMX_ENCODER),)
else
define optbool
$(filter $(shell echo $(1) | tr A-Z a-z), yes on 1)
endef
ifneq ($(call optbool,$(WITH_OMX_ENCODER)),)
LIBS += -lbcm_host -lvcos -lopenmaxil -L$(RPI_VC_LIBS)
override CFLAGS += -DWITH_OMX_ENCODER -DOMX_SKIP64BIT -I$(RPI_VC_HEADERS)
SOURCES += $(shell ls src/encoders/omx/*.c)
endif
ifneq ($(call optbool,$(WITH_WORKERS_GPIO_DEBUG)),)
LIBS += -lwiringPi
override CFLAGS += -DWITH_WORKERS_GPIO_DEBUG
endif
# =====
all: $(SOURCES) $(PROG)

View File

@@ -3,7 +3,7 @@
pkgname=ustreamer
pkgver=0.76
pkgver=0.78
pkgrel=1
pkgdesc="Lightweight and fast MJPG-HTTP streamer"
url="https://github.com/pi-kvm/ustreamer"
@@ -12,7 +12,7 @@ arch=(i686 x86_64 armv6h armv7h)
depends=(libjpeg libevent libutil-linux)
# optional: raspberrypi-firmware for OMX JPEG encoder
makedepends=(gcc make)
source=(${pkgname}::"git+https://github.com/pi-kvm/ustreamer#commit=${pkgver}")
source=(${pkgname}::"git+https://github.com/pi-kvm/ustreamer#commit=v${pkgver}")
md5sums=(SKIP)

View File

@@ -6,7 +6,7 @@
include $(TOPDIR)/rules.mk
PKG_NAME:=ustreamer
PKG_VERSION:=0.76
PKG_VERSION:=0.78
PKG_RELEASE:=1
PKG_MAINTAINER:=Maxim Devaev <mdevaev@gmail.com>

View File

@@ -23,5 +23,5 @@
#pragma once
#ifndef VERSION
# define VERSION "0.76"
# define VERSION "0.78"
#endif

View File

@@ -30,6 +30,7 @@
#include <errno.h>
#include <assert.h>
#include <sys/select.h>
#include <sys/mman.h>
#include <linux/videodev2.h>
@@ -232,6 +233,39 @@ int device_switch_capturing(struct device_t *dev, bool enable) {
return 0;
}
int device_select(struct device_t *dev, bool *has_read, bool *has_write, bool *has_error) {
struct timeval timeout;
int retval;
# define INIT_FD_SET(_set) \
fd_set _set; FD_ZERO(&_set); FD_SET(dev->run->fd, &_set);
INIT_FD_SET(read_fds);
INIT_FD_SET(write_fds);
INIT_FD_SET(error_fds);
# undef INIT_FD_SET
timeout.tv_sec = dev->timeout;
timeout.tv_usec = 0;
LOG_DEBUG("Calling select() on video device ...");
retval = select(dev->run->fd + 1, &read_fds, &write_fds, &error_fds, &timeout);
if (retval > 0) {
*has_read = FD_ISSET(dev->run->fd, &read_fds);
*has_write = FD_ISSET(dev->run->fd, &write_fds);
*has_error = FD_ISSET(dev->run->fd, &error_fds);
} else {
has_read = false;
has_write = false;
has_error = false;
}
LOG_DEBUG("Device select() --> %d", retval);
return retval;
}
int device_grab_buffer(struct device_t *dev) {
struct v4l2_buffer buf_info;
@@ -253,6 +287,7 @@ int device_grab_buffer(struct device_t *dev) {
dev->run->hw_buffers[buf_info.index].used = buf_info.bytesused;
memcpy(&dev->run->hw_buffers[buf_info.index].buf_info, &buf_info, sizeof(struct v4l2_buffer));
dev->run->pictures[buf_info.index].grab_time = get_now_monotonic();
return buf_info.index;
}

View File

@@ -120,6 +120,7 @@ int device_open(struct device_t *dev);
void device_close(struct device_t *dev);
int device_switch_capturing(struct device_t *dev, bool enable);
int device_select(struct device_t *dev, bool *has_read, bool *has_write, bool *has_error);
int device_grab_buffer(struct device_t *dev);
int device_release_buffer(struct device_t *dev, unsigned index);
int device_consume_event(struct device_t *dev);

View File

@@ -188,6 +188,8 @@ int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, uns
assert(encoder->run->type != ENCODER_TYPE_UNKNOWN);
dev->run->pictures[buf_index].encode_begin_time = get_now_monotonic();
if (encoder->run->type == ENCODER_TYPE_CPU) {
cpu_encoder_compress_buffer(dev, buf_index, encoder->run->quality);
} else if (encoder->run->type == ENCODER_TYPE_HW) {
@@ -201,6 +203,8 @@ int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, uns
}
# endif
dev->run->pictures[buf_index].encode_end_time = get_now_monotonic();
return 0;
# pragma GCC diagnostic ignored "-Wunused-label"

View File

@@ -77,7 +77,7 @@ static void _http_callback_stream_error(struct bufferevent *buf_event, short wha
static void _http_exposed_refresh(int fd, short event, void *v_server);
static void _http_queue_send_stream(struct http_server_t *server, bool stream_updated, bool picture_updated);
static bool _expose_new_picture(struct http_server_t *server);
static bool _expose_new_picture_unsafe(struct http_server_t *server);
static bool _expose_blank_picture(struct http_server_t *server);
@@ -807,7 +807,7 @@ static void _http_exposed_refresh(UNUSED int fd, UNUSED short what, void *v_serv
LOG_DEBUG("Refreshing HTTP exposed ...");
A_MUTEX_LOCK(&server->run->stream->mutex);
if (server->run->stream->picture.used > 0) { // If online
picture_updated = _expose_new_picture(server);
picture_updated = _expose_new_picture_unsafe(server);
UNLOCK_STREAM;
} else {
UNLOCK_STREAM;
@@ -825,7 +825,7 @@ static void _http_exposed_refresh(UNUSED int fd, UNUSED short what, void *v_serv
_http_queue_send_stream(server, stream_updated, picture_updated);
}
static bool _expose_new_picture(struct http_server_t *server) {
static bool _expose_new_picture_unsafe(struct http_server_t *server) {
# define STREAM(_next) server->run->stream->_next
# define EXPOSED(_next) server->run->exposed->_next

View File

@@ -32,6 +32,10 @@
#include <pthread.h>
#ifdef WITH_WORKERS_GPIO_DEBUG
# include <wiringPi.h>
#endif
#include "config.h"
#include "tools.h"
#include "logging.h"
@@ -377,6 +381,15 @@ int main(int argc, char *argv[]) {
LOGGING_INIT;
# ifdef WITH_WORKERS_GPIO_DEBUG
if (wiringPiSetupGpio() < 0) {
LOG_PERROR("Can't initialize wiringPi GPIO");
return 1;
} else {
LOG_INFO("Using wiringPi to debug using GPIO");
}
# endif
dev = device_init();
encoder = encoder_init();
stream = stream_init(dev, encoder);

View File

@@ -20,6 +20,7 @@
*****************************************************************************/
#include <stdbool.h>
#include <stdatomic.h>
#include <unistd.h>
#include <errno.h>
@@ -28,7 +29,6 @@
#include <pthread.h>
#include <sys/select.h>
#include <linux/videodev2.h>
#include "tools.h"
@@ -38,16 +38,70 @@
#include "encoder.h"
#include "stream.h"
#ifdef WITH_WORKERS_GPIO_DEBUG
# include <wiringPi.h>
# ifndef WORKERS_GPIO_DEBUG_START_PIN
# define WORKERS_GPIO_DEBUG_START_PIN 5
# endif
#endif
static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool);
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index);
static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool);
static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool);
struct _worker_t {
pthread_t tid;
unsigned number;
atomic_bool *proc_stop;
atomic_bool *workers_stop;
static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool);
static void *_stream_worker_thread(void *v_worker);
static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool_t *pool);
pthread_mutex_t last_comp_time_mutex;
long double last_comp_time;
pthread_mutex_t has_job_mutex;
int buf_index;
atomic_bool has_job;
bool job_timely;
bool job_failed;
long double job_start_time;
pthread_cond_t has_job_cond;
pthread_mutex_t *free_workers_mutex;
unsigned *free_workers;
pthread_cond_t *free_workers_cond;
struct _worker_t *order_prev;
struct _worker_t *order_next;
struct device_t *dev;
struct encoder_t *encoder;
};
struct _workers_pool_t {
unsigned n_workers;
struct _worker_t *workers;
struct _worker_t *oldest_worker;
struct _worker_t *latest_worker;
pthread_mutex_t free_workers_mutex;
unsigned free_workers;
pthread_cond_t free_workers_cond;
atomic_bool workers_stop;
long double desired_frames_interval;
};
static struct _workers_pool_t *_stream_init_loop(struct stream_t *stream);
static struct _workers_pool_t *_stream_init(struct stream_t *stream);
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index, unsigned captured_fps);
static struct _workers_pool_t *_workers_pool_init(struct stream_t *stream);
static void _workers_pool_destroy(struct _workers_pool_t *pool);
static void *__worker_thread(void *v_worker);
static struct _worker_t *_workers_pool_wait(struct _workers_pool_t *pool);
static void _workers_pool_assign(struct _workers_pool_t *pool, struct _worker_t *ready_worker, unsigned buf_index);
static long double _workers_pool_get_fluency_delay(struct _workers_pool_t *pool);
struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) {
@@ -74,20 +128,15 @@ void stream_destroy(struct stream_t *stream) {
}
void stream_loop(struct stream_t *stream) {
struct workers_pool_t pool;
MEMSET_ZERO(pool);
atomic_init(&pool.workers_stop, false);
pool.encoder = stream->encoder;
struct _workers_pool_t *pool;
LOG_INFO("Using V4L2 device: %s", stream->dev->path);
LOG_INFO("Using desired FPS: %u", stream->dev->desired_fps);
while (_stream_init_loop(stream, &pool) == 0) {
struct worker_t *oldest_worker = NULL;
struct worker_t *last_worker = NULL;
while ((pool = _stream_init_loop(stream)) != NULL) {
long double grab_after = 0;
unsigned fluency_passed = 0;
unsigned captured_fps = 0;
unsigned captured_fps_accum = 0;
long long captured_fps_second = 0;
bool persistent_timeout_reported = false;
@@ -98,43 +147,22 @@ void stream_loop(struct stream_t *stream) {
LOG_INFO("Capturing ...");
while (!atomic_load(&stream->proc->stop)) {
int free_worker_number = -1;
struct _worker_t *ready_worker;
SEP_DEBUG('-');
LOG_DEBUG("Waiting for worker ...");
LOG_DEBUG("Waiting for workers ...");
A_MUTEX_LOCK(&pool.free_workers_mutex);
A_COND_WAIT_TRUE(pool.free_workers, &pool.free_workers_cond, &pool.free_workers_mutex);
A_MUTEX_UNLOCK(&pool.free_workers_mutex);
ready_worker = _workers_pool_wait(pool);
if (oldest_worker && !atomic_load(&oldest_worker->has_job) && oldest_worker->buf_index >= 0) {
if (oldest_worker->job_failed) {
break;
if (!ready_worker->job_failed) {
if (ready_worker->job_timely) {
_stream_expose_picture(stream, ready_worker->buf_index, captured_fps);
LOG_PERF("##### Encoded picture exposed; worker = %u", ready_worker->number);
} else {
LOG_PERF("----- Encoded picture dropped; worker = %u", ready_worker->number);
}
_stream_expose_picture(stream, oldest_worker->buf_index);
free_worker_number = oldest_worker->number;
oldest_worker = oldest_worker->order_next;
LOG_PERF("##### Raw frame accepted; worker = %u", free_worker_number);
} else {
for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) {
if (
!atomic_load(&pool.workers[number].has_job) && (
free_worker_number == -1
|| pool.workers[free_worker_number].job_start_time < pool.workers[number].job_start_time
)
) {
free_worker_number = number;
break;
}
}
assert(free_worker_number >= 0);
assert(!atomic_load(&pool.workers[free_worker_number].has_job));
LOG_PERF("----- Raw frame dropped; worker = %u", free_worker_number);
break;
}
if (atomic_load(&stream->proc->stop)) {
@@ -145,30 +173,18 @@ void stream_loop(struct stream_t *stream) {
usleep(1000000);
}
# define INIT_FD_SET(_set) \
fd_set _set; FD_ZERO(&_set); FD_SET(stream->dev->run->fd, &_set);
bool has_read;
bool has_write;
bool has_error;
int selected = device_select(stream->dev, &has_read, &has_write, &has_error);
INIT_FD_SET(read_fds);
INIT_FD_SET(write_fds);
INIT_FD_SET(error_fds);
# undef INIT_FD_SET
struct timeval timeout;
timeout.tv_sec = stream->dev->timeout;
timeout.tv_usec = 0;
LOG_DEBUG("Calling select() on video device ...");
int retval = select(stream->dev->run->fd + 1, &read_fds, &write_fds, &error_fds, &timeout);
LOG_DEBUG("Device select() --> %d", retval);
if (retval < 0) {
if (selected < 0) {
if (errno != EINTR) {
LOG_PERROR("Mainloop select() error");
break;
}
} else if (retval == 0) {
} else if (selected == 0) {
if (stream->dev->persistent) {
if (!persistent_timeout_reported) {
LOG_ERROR("Mainloop select() timeout, polling ...")
@@ -183,7 +199,7 @@ void stream_loop(struct stream_t *stream) {
} else {
persistent_timeout_reported = false;
if (FD_ISSET(stream->dev->run->fd, &read_fds)) {
if (has_read) {
LOG_DEBUG("Frame is ready");
int buf_index;
@@ -193,7 +209,6 @@ void stream_loop(struct stream_t *stream) {
if ((buf_index = device_grab_buffer(stream->dev)) < 0) {
break;
}
stream->dev->run->pictures[buf_index].grab_time = now;
// Workaround for broken, corrupted frames:
// Under low light conditions corrupted frames may get captured.
@@ -201,7 +216,7 @@ void stream_loop(struct stream_t *stream) {
// For example a VGA (640x480) webcam picture is normally >= 8kByte large,
// corrupted frames are smaller.
if (stream->dev->run->hw_buffers[buf_index].used < stream->dev->min_frame_size) {
LOG_DEBUG("Dropping too small frame sized %zu bytes, assuming it as broken",
LOG_DEBUG("Dropped too small frame sized %zu bytes, assuming it was broken",
stream->dev->run->hw_buffers[buf_index].used);
goto pass_frame;
}
@@ -215,50 +230,20 @@ void stream_loop(struct stream_t *stream) {
fluency_passed = 0;
if (now_second != captured_fps_second) {
stream->captured_fps = captured_fps_accum;
captured_fps = captured_fps_accum;
captured_fps_accum = 0;
captured_fps_second = now_second;
LOG_PERF("Oldest worker complete, Captured-FPS = %u", stream->captured_fps);
LOG_PERF("A new second has come, Captured-FPS = %u", captured_fps);
}
captured_fps_accum += 1;
long double fluency_delay = _stream_get_fluency_delay(stream->dev, &pool);
long double fluency_delay = _workers_pool_get_fluency_delay(pool);
grab_after = now + fluency_delay;
LOG_VERBOSE("Fluency: delay=%.03Lf; grab_after=%.03Lf", fluency_delay, grab_after);
}
# define FREE_WORKER(_next) pool.workers[free_worker_number]._next
LOG_DEBUG("Grabbed a new frame to buffer %u", buf_index);
if (oldest_worker == NULL) {
oldest_worker = &pool.workers[free_worker_number];
last_worker = oldest_worker;
} else {
if (FREE_WORKER(order_next)) {
FREE_WORKER(order_next->order_prev) = FREE_WORKER(order_prev);
}
if (FREE_WORKER(order_prev)) {
FREE_WORKER(order_prev->order_next) = FREE_WORKER(order_next);
}
FREE_WORKER(order_prev) = last_worker;
last_worker->order_next = &pool.workers[free_worker_number];
last_worker = &pool.workers[free_worker_number];
}
last_worker->order_next = NULL;
A_MUTEX_LOCK(&FREE_WORKER(has_job_mutex));
FREE_WORKER(buf_index) = buf_index;
atomic_store(&FREE_WORKER(has_job), true);
A_MUTEX_UNLOCK(&FREE_WORKER(has_job_mutex));
A_COND_SIGNAL(&FREE_WORKER(has_job_cond));
# undef FREE_WORKER
A_MUTEX_LOCK(&pool.free_workers_mutex);
pool.free_workers -= 1;
A_MUTEX_UNLOCK(&pool.free_workers_mutex);
_workers_pool_assign(pool, ready_worker, buf_index);
goto next_handlers; // Поток сам освободит буфер
@@ -271,12 +256,12 @@ void stream_loop(struct stream_t *stream) {
next_handlers:
if (FD_ISSET(stream->dev->run->fd, &write_fds)) {
if (has_write) {
LOG_ERROR("Got unexpected writing event, seems device was disconnected");
break;
}
if (FD_ISSET(stream->dev->run->fd, &error_fds)) {
if (has_error) {
LOG_INFO("Got V4L2 event");
if (device_consume_event(stream->dev) < 0) {
break;
@@ -292,11 +277,11 @@ void stream_loop(struct stream_t *stream) {
stream->height = 0;
atomic_store(&stream->updated, true);
A_MUTEX_UNLOCK(&stream->mutex);
}
_stream_destroy_workers(stream, &pool);
device_switch_capturing(stream->dev, false);
device_close(stream->dev);
_workers_pool_destroy(pool);
device_switch_capturing(stream->dev, false);
device_close(stream->dev);
}
}
void stream_loop_break(struct stream_t *stream) {
@@ -307,7 +292,41 @@ void stream_switch_slowdown(struct stream_t *stream, bool slowdown) {
atomic_store(&stream->proc->slowdown, slowdown);
}
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) {
static struct _workers_pool_t *_stream_init_loop(struct stream_t *stream) {
struct _workers_pool_t *pool = NULL;
LOG_DEBUG("%s: stream->proc->stop = %d", __FUNCTION__, atomic_load(&stream->proc->stop));
while (!atomic_load(&stream->proc->stop)) {
SEP_INFO('=');
if ((pool = _stream_init(stream)) == NULL) {
LOG_INFO("Sleeping %u seconds before new stream init ...", stream->dev->error_delay);
sleep(stream->dev->error_delay);
} else {
break;
}
}
return pool;
}
static struct _workers_pool_t *_stream_init(struct stream_t *stream) {
if (device_open(stream->dev) < 0) {
goto error;
}
if (device_switch_capturing(stream->dev, true) < 0) {
goto error;
}
encoder_prepare(stream->encoder, stream->dev);
return _workers_pool_init(stream);
error:
device_close(stream->dev);
return NULL;
}
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index, unsigned captured_fps) {
# define PICTURE(_next) stream->dev->run->pictures[buf_index]._next
A_MUTEX_LOCK(&stream->mutex);
@@ -323,6 +342,7 @@ static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index)
stream->width = stream->dev->run->width;
stream->height = stream->dev->run->height;
stream->captured_fps = captured_fps;
atomic_store(&stream->updated, true);
A_MUTEX_UNLOCK(&stream->mutex);
@@ -330,118 +350,98 @@ static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index)
# undef PICTURE
}
static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool) {
long double sum_comp_time = 0;
long double avg_comp_time;
long double min_delay;
long double soft_delay;
static struct _workers_pool_t *_workers_pool_init(struct stream_t *stream) {
struct _workers_pool_t *pool;
for (unsigned number = 0; number < dev->run->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next
LOG_INFO("Creating pool with %u workers ...", stream->dev->run->n_workers);
A_MUTEX_LOCK(&WORKER(last_comp_time_mutex));
if (WORKER(last_comp_time) > 0) {
sum_comp_time += WORKER(last_comp_time);
}
A_MUTEX_UNLOCK(&WORKER(last_comp_time_mutex));
A_CALLOC(pool, 1);
# undef WORKER
}
avg_comp_time = sum_comp_time / dev->run->n_workers; // Среднее время работы воркеров
min_delay = avg_comp_time / dev->run->n_workers; // Среднее время работы размазывается на N воркеров
if (dev->desired_fps > 0 && min_delay > 0) {
// Искусственное время задержки на основе желаемого FPS, если включен --desired-fps
soft_delay = ((long double) 1) / dev->desired_fps - sum_comp_time;
return (min_delay > soft_delay ? min_delay : soft_delay);
}
return min_delay;
}
static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool) {
int retval = -1;
LOG_DEBUG("%s: stream->proc->stop = %d", __FUNCTION__, atomic_load(&stream->proc->stop));
while (!atomic_load(&stream->proc->stop)) {
if ((retval = _stream_init(stream, pool)) < 0) {
LOG_INFO("Sleeping %u seconds before new stream init ...", stream->dev->error_delay);
sleep(stream->dev->error_delay);
} else {
break;
}
}
return retval;
}
static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool) {
_stream_destroy_workers(stream, pool);
device_switch_capturing(stream->dev, false);
device_close(stream->dev);
SEP_INFO('=');
if (device_open(stream->dev) < 0) {
goto error;
}
if (device_switch_capturing(stream->dev, true) < 0) {
goto error;
}
encoder_prepare(pool->encoder, stream->dev);
_stream_init_workers(stream, pool);
return 0;
error:
device_close(stream->dev);
return -1;
}
static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool) {
LOG_INFO("Spawning %u workers ...", stream->dev->run->n_workers);
atomic_store(&pool->workers_stop, false);
A_CALLOC(pool->workers, stream->dev->run->n_workers);
pool->n_workers = stream->dev->run->n_workers;
A_CALLOC(pool->workers, pool->n_workers);
A_MUTEX_INIT(&pool->free_workers_mutex);
A_COND_INIT(&pool->free_workers_cond);
for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next
atomic_init(&pool->workers_stop, false);
pool->free_workers += 1;
if (stream->dev->desired_fps > 0) {
pool->desired_frames_interval = (long double)1 / stream->dev->desired_fps;
}
for (unsigned number = 0; number < pool->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next
A_MUTEX_INIT(&WORKER(has_job_mutex));
atomic_init(&WORKER(has_job), false);
A_COND_INIT(&WORKER(has_job_cond));
WORKER(number) = number;
WORKER(proc_stop) = &stream->proc->stop;
WORKER(workers_stop) = &pool->workers_stop;
WORKER(number) = number;
WORKER(proc_stop) = &stream->proc->stop;
WORKER(workers_stop) = &pool->workers_stop;
WORKER(free_workers_mutex) = &pool->free_workers_mutex;
WORKER(free_workers) = &pool->free_workers;
WORKER(free_workers_cond) = &pool->free_workers_cond;
WORKER(free_workers_mutex) = &pool->free_workers_mutex;
WORKER(free_workers) = &pool->free_workers;
WORKER(free_workers_cond) = &pool->free_workers_cond;
WORKER(dev) = stream->dev;
WORKER(encoder) = pool->encoder;
WORKER(dev) = stream->dev;
WORKER(encoder) = stream->encoder;
A_THREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&(pool->workers[number]));
A_THREAD_CREATE(&WORKER(tid), __worker_thread, (void *)&(pool->workers[number]));
pool->free_workers += 1;
# undef WORKER
}
return pool;
}
static void *_stream_worker_thread(void *v_worker) {
struct worker_t *worker = (struct worker_t *)v_worker;
static void _workers_pool_destroy(struct _workers_pool_t *pool) {
LOG_INFO("Destroying workers pool ...");
atomic_store(&pool->workers_stop, true);
for (unsigned number = 0; number < pool->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next
A_MUTEX_LOCK(&WORKER(has_job_mutex));
atomic_store(&WORKER(has_job), true); // Final job: die
A_MUTEX_UNLOCK(&WORKER(has_job_mutex));
A_COND_SIGNAL(&WORKER(has_job_cond));
A_THREAD_JOIN(WORKER(tid));
A_MUTEX_DESTROY(&WORKER(has_job_mutex));
A_COND_DESTROY(&WORKER(has_job_cond));
# undef WORKER
}
A_MUTEX_DESTROY(&pool->free_workers_mutex);
A_COND_DESTROY(&pool->free_workers_cond);
free(pool->workers);
free(pool);
}
static void *__worker_thread(void *v_worker) {
struct _worker_t *worker = (struct _worker_t *)v_worker;
LOG_DEBUG("Hello! I am a worker #%u ^_^", worker->number);
# ifdef WITH_WORKERS_GPIO_DEBUG
# define WORKER_GPIO_DEBUG_BUSY digitalWrite(WORKERS_GPIO_DEBUG_START_PIN + worker->number, HIGH)
# define WORKER_GPIO_DEBUG_FREE digitalWrite(WORKERS_GPIO_DEBUG_START_PIN + worker->number, LOW)
pinMode(WORKERS_GPIO_DEBUG_START_PIN + worker->number, OUTPUT);
WORKER_GPIO_DEBUG_FREE;
# else
# define WORKER_GPIO_DEBUG_BUSY
# define WORKER_GPIO_DEBUG_FREE
# endif
while (!atomic_load(worker->proc_stop) && !atomic_load(worker->workers_stop)) {
LOG_DEBUG("Worker %u waiting for a new job ...", worker->number);
WORKER_GPIO_DEBUG_FREE;
A_MUTEX_LOCK(&worker->has_job_mutex);
A_COND_WAIT_TRUE(atomic_load(&worker->has_job), &worker->has_job_cond, &worker->has_job_mutex);
A_MUTEX_UNLOCK(&worker->has_job_mutex);
@@ -451,11 +451,11 @@ static void *_stream_worker_thread(void *v_worker) {
LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", worker->number, worker->buf_index);
PICTURE(encode_begin_time) = get_now_monotonic();
WORKER_GPIO_DEBUG_BUSY;
if (encoder_compress_buffer(worker->encoder, worker->dev, worker->number, worker->buf_index) < 0) {
worker->job_failed = true;
worker->job_failed = false;
}
PICTURE(encode_end_time) = get_now_monotonic();
if (device_release_buffer(worker->dev, worker->buf_index) == 0) {
worker->job_start_time = PICTURE(encode_begin_time);
@@ -484,34 +484,96 @@ static void *_stream_worker_thread(void *v_worker) {
}
LOG_DEBUG("Bye-bye (worker %u)", worker->number);
WORKER_GPIO_DEBUG_FREE;
return NULL;
}
static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool_t *pool) {
if (pool->workers) {
LOG_INFO("Destroying workers ...");
static struct _worker_t *_workers_pool_wait(struct _workers_pool_t *pool) {
struct _worker_t *ready_worker = NULL;
atomic_store(&pool->workers_stop, true);
for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next
A_MUTEX_LOCK(&pool->free_workers_mutex);
A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex);
A_MUTEX_UNLOCK(&pool->free_workers_mutex);
A_MUTEX_LOCK(&WORKER(has_job_mutex));
atomic_store(&WORKER(has_job), true); // Final job: die
A_MUTEX_UNLOCK(&WORKER(has_job_mutex));
A_COND_SIGNAL(&WORKER(has_job_cond));
A_THREAD_JOIN(WORKER(tid));
A_MUTEX_DESTROY(&WORKER(has_job_mutex));
A_COND_DESTROY(&WORKER(has_job_cond));
# undef WORKER
if (pool->oldest_worker && !atomic_load(&pool->oldest_worker->has_job) && pool->oldest_worker->buf_index >= 0) {
ready_worker = pool->oldest_worker;
ready_worker->job_timely = true;
pool->oldest_worker = pool->oldest_worker->order_next;
} else {
for (unsigned number = 0; number < pool->n_workers; ++number) {
if (
!atomic_load(&pool->workers[number].has_job) && (
ready_worker == NULL
|| ready_worker->job_start_time < pool->workers[number].job_start_time
)
) {
ready_worker = &pool->workers[number];
break;
}
}
A_MUTEX_DESTROY(&pool->free_workers_mutex);
A_COND_DESTROY(&pool->free_workers_cond);
free(pool->workers);
assert(ready_worker != NULL);
ready_worker->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате)
}
pool->free_workers = 0;
pool->workers = NULL;
return ready_worker;
}
static void _workers_pool_assign(struct _workers_pool_t *pool, struct _worker_t *ready_worker, unsigned buf_index) {
if (pool->oldest_worker == NULL) {
pool->oldest_worker = ready_worker;
pool->latest_worker = pool->oldest_worker;
} else {
if (ready_worker->order_next) {
ready_worker->order_next->order_prev = ready_worker->order_prev;
}
if (ready_worker->order_prev) {
ready_worker->order_prev->order_next = ready_worker->order_next;
}
ready_worker->order_prev = pool->latest_worker;
pool->latest_worker->order_next = ready_worker;
pool->latest_worker = ready_worker;
}
pool->latest_worker->order_next = NULL;
A_MUTEX_LOCK(&ready_worker->has_job_mutex);
ready_worker->buf_index = buf_index;
atomic_store(&ready_worker->has_job, true);
A_MUTEX_UNLOCK(&ready_worker->has_job_mutex);
A_COND_SIGNAL(&ready_worker->has_job_cond);
A_MUTEX_LOCK(&pool->free_workers_mutex);
pool->free_workers -= 1;
A_MUTEX_UNLOCK(&pool->free_workers_mutex);
LOG_DEBUG("Assigned new frame in buffer %u to worker %u", buf_index, ready_worker->number);
}
static long double _workers_pool_get_fluency_delay(struct _workers_pool_t *pool) {
long double sum_comp_time = 0;
long double avg_comp_time;
long double min_delay;
long double soft_delay;
for (unsigned number = 0; number < pool->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next
A_MUTEX_LOCK(&WORKER(last_comp_time_mutex));
if (WORKER(last_comp_time) > 0) {
sum_comp_time += WORKER(last_comp_time);
}
A_MUTEX_UNLOCK(&WORKER(last_comp_time_mutex));
# undef WORKER
}
avg_comp_time = sum_comp_time / pool->n_workers; // Среднее время работы воркеров
min_delay = avg_comp_time / pool->n_workers; // Среднее время работы размазывается на N воркеров
if (pool->desired_frames_interval > 0 && min_delay > 0) {
// Искусственное время задержки на основе желаемого FPS, если включен --desired-fps
soft_delay = pool->desired_frames_interval - sum_comp_time;
return (min_delay > soft_delay ? min_delay : soft_delay);
}
return min_delay;
}

View File

@@ -22,53 +22,12 @@
#pragma once
#include <stdbool.h>
#include <stdatomic.h>
#include <pthread.h>
#include "device.h"
#include "encoder.h"
struct worker_t {
pthread_t tid;
unsigned number;
atomic_bool *proc_stop;
atomic_bool *workers_stop;
pthread_mutex_t last_comp_time_mutex;
long double last_comp_time;
pthread_mutex_t has_job_mutex;
int buf_index;
atomic_bool has_job;
bool job_failed;
long double job_start_time;
pthread_cond_t has_job_cond;
pthread_mutex_t *free_workers_mutex;
unsigned *free_workers;
pthread_cond_t *free_workers_cond;
struct worker_t *order_prev;
struct worker_t *order_next;
struct device_t *dev;
struct encoder_t *encoder;
};
struct workers_pool_t {
struct worker_t *workers;
atomic_bool workers_stop;
pthread_mutex_t free_workers_mutex;
unsigned free_workers;
pthread_cond_t free_workers_cond;
struct encoder_t *encoder;
};
struct process_t {
atomic_bool stop;
atomic_bool slowdown;