refactoring

This commit is contained in:
Devaev Maxim
2019-05-30 06:53:01 +03:00
parent 9ca43f17a3
commit 3f03575222
5 changed files with 131 additions and 92 deletions

View File

@@ -30,6 +30,7 @@
#include <errno.h>
#include <assert.h>
#include <sys/select.h>
#include <sys/mman.h>
#include <linux/videodev2.h>
@@ -232,6 +233,39 @@ int device_switch_capturing(struct device_t *dev, bool enable) {
return 0;
}
int device_select(struct device_t *dev, bool *has_read, bool *has_write, bool *has_error) {
struct timeval timeout;
int retval;
# define INIT_FD_SET(_set) \
fd_set _set; FD_ZERO(&_set); FD_SET(dev->run->fd, &_set);
INIT_FD_SET(read_fds);
INIT_FD_SET(write_fds);
INIT_FD_SET(error_fds);
# undef INIT_FD_SET
timeout.tv_sec = dev->timeout;
timeout.tv_usec = 0;
LOG_DEBUG("Calling select() on video device ...");
retval = select(dev->run->fd + 1, &read_fds, &write_fds, &error_fds, &timeout);
if (retval > 0) {
*has_read = FD_ISSET(dev->run->fd, &read_fds);
*has_write = FD_ISSET(dev->run->fd, &write_fds);
*has_error = FD_ISSET(dev->run->fd, &error_fds);
} else {
has_read = false;
has_write = false;
has_error = false;
}
LOG_DEBUG("Device select() --> %d", retval);
return retval;
}
int device_grab_buffer(struct device_t *dev) {
struct v4l2_buffer buf_info;

View File

@@ -120,6 +120,7 @@ int device_open(struct device_t *dev);
void device_close(struct device_t *dev);
int device_switch_capturing(struct device_t *dev, bool enable);
int device_select(struct device_t *dev, bool *has_read, bool *has_write, bool *has_error);
int device_grab_buffer(struct device_t *dev);
int device_release_buffer(struct device_t *dev, unsigned index);
int device_consume_event(struct device_t *dev);

View File

@@ -77,7 +77,7 @@ static void _http_callback_stream_error(struct bufferevent *buf_event, short wha
static void _http_exposed_refresh(int fd, short event, void *v_server);
static void _http_queue_send_stream(struct http_server_t *server, bool stream_updated, bool picture_updated);
static bool _expose_new_picture(struct http_server_t *server);
static bool _expose_new_picture_unsafe(struct http_server_t *server);
static bool _expose_blank_picture(struct http_server_t *server);
@@ -807,7 +807,7 @@ static void _http_exposed_refresh(UNUSED int fd, UNUSED short what, void *v_serv
LOG_DEBUG("Refreshing HTTP exposed ...");
A_MUTEX_LOCK(&server->run->stream->mutex);
if (server->run->stream->picture.used > 0) { // If online
picture_updated = _expose_new_picture(server);
picture_updated = _expose_new_picture_unsafe(server);
UNLOCK_STREAM;
} else {
UNLOCK_STREAM;
@@ -825,7 +825,7 @@ static void _http_exposed_refresh(UNUSED int fd, UNUSED short what, void *v_serv
_http_queue_send_stream(server, stream_updated, picture_updated);
}
static bool _expose_new_picture(struct http_server_t *server) {
static bool _expose_new_picture_unsafe(struct http_server_t *server) {
# define STREAM(_next) server->run->stream->_next
# define EXPOSED(_next) server->run->exposed->_next

View File

@@ -20,6 +20,7 @@
*****************************************************************************/
#include <stdbool.h>
#include <stdatomic.h>
#include <unistd.h>
#include <errno.h>
@@ -28,7 +29,6 @@
#include <pthread.h>
#include <sys/select.h>
#include <linux/videodev2.h>
#include "tools.h"
@@ -46,8 +46,10 @@
#endif
static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool);
static bool _stream_wait_worker(struct stream_t *stream, struct workers_pool_t *pool, struct worker_t **ready_worker);
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index);
static void _stream_give_buf_to_worker(struct workers_pool_t *pool, struct worker_t *free_worker, unsigned buf_index);
static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool);
static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool);
static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool);
@@ -91,8 +93,6 @@ void stream_loop(struct stream_t *stream) {
LOG_INFO("Using desired FPS: %u", stream->dev->desired_fps);
while (_stream_init_loop(stream, &pool) == 0) {
struct worker_t *oldest_worker = NULL;
struct worker_t *last_worker = NULL;
long double grab_after = 0;
unsigned fluency_passed = 0;
unsigned captured_fps_accum = 0;
@@ -105,43 +105,20 @@ void stream_loop(struct stream_t *stream) {
LOG_INFO("Capturing ...");
while (!atomic_load(&stream->proc->stop)) {
int free_worker_number = -1;
struct worker_t *ready_worker;
SEP_DEBUG('-');
LOG_DEBUG("Waiting for worker ...");
LOG_DEBUG("Waiting for workers ...");
A_MUTEX_LOCK(&pool.free_workers_mutex);
A_COND_WAIT_TRUE(pool.free_workers, &pool.free_workers_cond, &pool.free_workers_mutex);
A_MUTEX_UNLOCK(&pool.free_workers_mutex);
if (oldest_worker && !atomic_load(&oldest_worker->has_job) && oldest_worker->buf_index >= 0) {
if (oldest_worker->job_failed) {
break;
}
_stream_expose_picture(stream, oldest_worker->buf_index);
free_worker_number = oldest_worker->number;
oldest_worker = oldest_worker->order_next;
LOG_PERF("##### Raw frame accepted; worker = %u", free_worker_number);
if (_stream_wait_worker(stream, &pool, &ready_worker)) {
_stream_expose_picture(stream, ready_worker->buf_index);
LOG_PERF("##### Encoded picture exposed; worker = %u", ready_worker->number);
} else {
for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) {
if (
!atomic_load(&pool.workers[number].has_job) && (
free_worker_number == -1
|| pool.workers[free_worker_number].job_start_time < pool.workers[number].job_start_time
)
) {
free_worker_number = number;
break;
}
}
LOG_PERF("----- Encoded picture dropped; worker = %u", ready_worker->number);
}
assert(free_worker_number >= 0);
assert(!atomic_load(&pool.workers[free_worker_number].has_job));
LOG_PERF("----- Raw frame dropped; worker = %u", free_worker_number);
if (ready_worker == NULL) {
break;
}
if (atomic_load(&stream->proc->stop)) {
@@ -152,30 +129,18 @@ void stream_loop(struct stream_t *stream) {
usleep(1000000);
}
# define INIT_FD_SET(_set) \
fd_set _set; FD_ZERO(&_set); FD_SET(stream->dev->run->fd, &_set);
bool has_read;
bool has_write;
bool has_error;
int selected = device_select(stream->dev, &has_read, &has_write, &has_error);
INIT_FD_SET(read_fds);
INIT_FD_SET(write_fds);
INIT_FD_SET(error_fds);
# undef INIT_FD_SET
struct timeval timeout;
timeout.tv_sec = stream->dev->timeout;
timeout.tv_usec = 0;
LOG_DEBUG("Calling select() on video device ...");
int retval = select(stream->dev->run->fd + 1, &read_fds, &write_fds, &error_fds, &timeout);
LOG_DEBUG("Device select() --> %d", retval);
if (retval < 0) {
if (selected < 0) {
if (errno != EINTR) {
LOG_PERROR("Mainloop select() error");
break;
}
} else if (retval == 0) {
} else if (selected == 0) {
if (stream->dev->persistent) {
if (!persistent_timeout_reported) {
LOG_ERROR("Mainloop select() timeout, polling ...")
@@ -190,7 +155,7 @@ void stream_loop(struct stream_t *stream) {
} else {
persistent_timeout_reported = false;
if (FD_ISSET(stream->dev->run->fd, &read_fds)) {
if (has_read) {
LOG_DEBUG("Frame is ready");
int buf_index;
@@ -225,7 +190,7 @@ void stream_loop(struct stream_t *stream) {
stream->captured_fps = captured_fps_accum;
captured_fps_accum = 0;
captured_fps_second = now_second;
LOG_PERF("Oldest worker complete, Captured-FPS = %u", stream->captured_fps);
LOG_PERF("A new second has come, Captured-FPS = %u", stream->captured_fps);
}
captured_fps_accum += 1;
@@ -235,37 +200,9 @@ void stream_loop(struct stream_t *stream) {
LOG_VERBOSE("Fluency: delay=%.03Lf; grab_after=%.03Lf", fluency_delay, grab_after);
}
# define FREE_WORKER(_next) pool.workers[free_worker_number]._next
LOG_DEBUG("Grabbed a new frame to buffer %u", buf_index);
if (oldest_worker == NULL) {
oldest_worker = &pool.workers[free_worker_number];
last_worker = oldest_worker;
} else {
if (FREE_WORKER(order_next)) {
FREE_WORKER(order_next->order_prev) = FREE_WORKER(order_prev);
}
if (FREE_WORKER(order_prev)) {
FREE_WORKER(order_prev->order_next) = FREE_WORKER(order_next);
}
FREE_WORKER(order_prev) = last_worker;
last_worker->order_next = &pool.workers[free_worker_number];
last_worker = &pool.workers[free_worker_number];
}
last_worker->order_next = NULL;
A_MUTEX_LOCK(&FREE_WORKER(has_job_mutex));
FREE_WORKER(buf_index) = buf_index;
atomic_store(&FREE_WORKER(has_job), true);
A_MUTEX_UNLOCK(&FREE_WORKER(has_job_mutex));
A_COND_SIGNAL(&FREE_WORKER(has_job_cond));
# undef FREE_WORKER
A_MUTEX_LOCK(&pool.free_workers_mutex);
pool.free_workers -= 1;
A_MUTEX_UNLOCK(&pool.free_workers_mutex);
_stream_give_buf_to_worker(&pool, ready_worker, buf_index);
goto next_handlers; // Поток сам освободит буфер
@@ -278,12 +215,12 @@ void stream_loop(struct stream_t *stream) {
next_handlers:
if (FD_ISSET(stream->dev->run->fd, &write_fds)) {
if (has_write) {
LOG_ERROR("Got unexpected writing event, seems device was disconnected");
break;
}
if (FD_ISSET(stream->dev->run->fd, &error_fds)) {
if (has_error) {
LOG_INFO("Got V4L2 event");
if (device_consume_event(stream->dev) < 0) {
break;
@@ -314,6 +251,42 @@ void stream_switch_slowdown(struct stream_t *stream, bool slowdown) {
atomic_store(&stream->proc->slowdown, slowdown);
}
static bool _stream_wait_worker(struct stream_t *stream, struct workers_pool_t *pool, struct worker_t **ready_worker) {
bool ok;
A_MUTEX_LOCK(&pool->free_workers_mutex);
A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex);
A_MUTEX_UNLOCK(&pool->free_workers_mutex);
*ready_worker = NULL;
if (pool->oldest_worker && !atomic_load(&pool->oldest_worker->has_job) && pool->oldest_worker->buf_index >= 0) {
*ready_worker = pool->oldest_worker;
pool->oldest_worker = pool->oldest_worker->order_next;
ok = true; // Воркер успел вовремя
} else {
for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) {
if (
!atomic_load(&pool->workers[number].has_job) && (
*ready_worker == NULL
|| (*ready_worker)->job_start_time < pool->workers[number].job_start_time
)
) {
*ready_worker = &pool->workers[number];
break;
}
}
assert(*ready_worker != NULL);
ok = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате)
}
if ((*ready_worker)->job_failed) {
*ready_worker = NULL;
ok = false;
}
return ok;
}
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) {
# define PICTURE(_next) stream->dev->run->pictures[buf_index]._next
@@ -337,6 +310,34 @@ static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index)
# undef PICTURE
}
static void _stream_give_buf_to_worker(struct workers_pool_t *pool, struct worker_t *free_worker, unsigned buf_index) {
if (pool->oldest_worker == NULL) {
pool->oldest_worker = free_worker;
pool->latest_worker = pool->oldest_worker;
} else {
if (free_worker->order_next) {
free_worker->order_next->order_prev = free_worker->order_prev;
}
if (free_worker->order_prev) {
free_worker->order_prev->order_next = free_worker->order_next;
}
free_worker->order_prev = pool->latest_worker;
pool->latest_worker->order_next = free_worker;
pool->latest_worker = free_worker;
}
pool->latest_worker->order_next = NULL;
A_MUTEX_LOCK(&free_worker->has_job_mutex);
free_worker->buf_index = buf_index;
atomic_store(&free_worker->has_job, true);
A_MUTEX_UNLOCK(&free_worker->has_job_mutex);
A_COND_SIGNAL(&free_worker->has_job_cond);
A_MUTEX_LOCK(&pool->free_workers_mutex);
pool->free_workers -= 1;
A_MUTEX_UNLOCK(&pool->free_workers_mutex);
}
static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool) {
long double sum_comp_time = 0;
long double avg_comp_time;
@@ -475,7 +476,7 @@ static void *_stream_worker_thread(void *v_worker) {
PICTURE(encode_begin_time) = get_now_monotonic();
if (encoder_compress_buffer(worker->encoder, worker->dev, worker->number, worker->buf_index) < 0) {
worker->job_failed = true;
worker->job_failed = false;
}
PICTURE(encode_end_time) = get_now_monotonic();

View File

@@ -60,12 +60,15 @@ struct worker_t {
struct workers_pool_t {
struct worker_t *workers;
atomic_bool workers_stop;
struct worker_t *oldest_worker;
struct worker_t *latest_worker;
pthread_mutex_t free_workers_mutex;
unsigned free_workers;
pthread_cond_t free_workers_cond;
atomic_bool workers_stop;
struct encoder_t *encoder;
};