diff --git a/src/device.c b/src/device.c index 7c03c8f..a765411 100644 --- a/src/device.c +++ b/src/device.c @@ -30,6 +30,7 @@ #include #include +#include #include #include @@ -232,6 +233,39 @@ int device_switch_capturing(struct device_t *dev, bool enable) { return 0; } +int device_select(struct device_t *dev, bool *has_read, bool *has_write, bool *has_error) { + struct timeval timeout; + int retval; + +# define INIT_FD_SET(_set) \ + fd_set _set; FD_ZERO(&_set); FD_SET(dev->run->fd, &_set); + + INIT_FD_SET(read_fds); + INIT_FD_SET(write_fds); + INIT_FD_SET(error_fds); + +# undef INIT_FD_SET + + timeout.tv_sec = dev->timeout; + timeout.tv_usec = 0; + + LOG_DEBUG("Calling select() on video device ..."); + + retval = select(dev->run->fd + 1, &read_fds, &write_fds, &error_fds, &timeout); + if (retval > 0) { + *has_read = FD_ISSET(dev->run->fd, &read_fds); + *has_write = FD_ISSET(dev->run->fd, &write_fds); + *has_error = FD_ISSET(dev->run->fd, &error_fds); + } else { + has_read = false; + has_write = false; + has_error = false; + } + + LOG_DEBUG("Device select() --> %d", retval); + return retval; +} + int device_grab_buffer(struct device_t *dev) { struct v4l2_buffer buf_info; diff --git a/src/device.h b/src/device.h index 63032f0..f7f4a84 100644 --- a/src/device.h +++ b/src/device.h @@ -120,6 +120,7 @@ int device_open(struct device_t *dev); void device_close(struct device_t *dev); int device_switch_capturing(struct device_t *dev, bool enable); +int device_select(struct device_t *dev, bool *has_read, bool *has_write, bool *has_error); int device_grab_buffer(struct device_t *dev); int device_release_buffer(struct device_t *dev, unsigned index); int device_consume_event(struct device_t *dev); diff --git a/src/http/server.c b/src/http/server.c index 87b4737..4ebadb9 100644 --- a/src/http/server.c +++ b/src/http/server.c @@ -77,7 +77,7 @@ static void _http_callback_stream_error(struct bufferevent *buf_event, short wha static void _http_exposed_refresh(int fd, short event, void *v_server); static void _http_queue_send_stream(struct http_server_t *server, bool stream_updated, bool picture_updated); -static bool _expose_new_picture(struct http_server_t *server); +static bool _expose_new_picture_unsafe(struct http_server_t *server); static bool _expose_blank_picture(struct http_server_t *server); @@ -807,7 +807,7 @@ static void _http_exposed_refresh(UNUSED int fd, UNUSED short what, void *v_serv LOG_DEBUG("Refreshing HTTP exposed ..."); A_MUTEX_LOCK(&server->run->stream->mutex); if (server->run->stream->picture.used > 0) { // If online - picture_updated = _expose_new_picture(server); + picture_updated = _expose_new_picture_unsafe(server); UNLOCK_STREAM; } else { UNLOCK_STREAM; @@ -825,7 +825,7 @@ static void _http_exposed_refresh(UNUSED int fd, UNUSED short what, void *v_serv _http_queue_send_stream(server, stream_updated, picture_updated); } -static bool _expose_new_picture(struct http_server_t *server) { +static bool _expose_new_picture_unsafe(struct http_server_t *server) { # define STREAM(_next) server->run->stream->_next # define EXPOSED(_next) server->run->exposed->_next diff --git a/src/stream.c b/src/stream.c index e1c8dc5..c0f8e1c 100644 --- a/src/stream.c +++ b/src/stream.c @@ -20,6 +20,7 @@ *****************************************************************************/ +#include #include #include #include @@ -28,7 +29,6 @@ #include -#include #include #include "tools.h" @@ -46,8 +46,10 @@ #endif -static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool); +static bool _stream_wait_worker(struct stream_t *stream, struct workers_pool_t *pool, struct worker_t **ready_worker); static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index); +static void _stream_give_buf_to_worker(struct workers_pool_t *pool, struct worker_t *free_worker, unsigned buf_index); +static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool); static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool); static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool); @@ -91,8 +93,6 @@ void stream_loop(struct stream_t *stream) { LOG_INFO("Using desired FPS: %u", stream->dev->desired_fps); while (_stream_init_loop(stream, &pool) == 0) { - struct worker_t *oldest_worker = NULL; - struct worker_t *last_worker = NULL; long double grab_after = 0; unsigned fluency_passed = 0; unsigned captured_fps_accum = 0; @@ -105,43 +105,20 @@ void stream_loop(struct stream_t *stream) { LOG_INFO("Capturing ..."); while (!atomic_load(&stream->proc->stop)) { - int free_worker_number = -1; + struct worker_t *ready_worker; SEP_DEBUG('-'); + LOG_DEBUG("Waiting for worker ..."); - LOG_DEBUG("Waiting for workers ..."); - A_MUTEX_LOCK(&pool.free_workers_mutex); - A_COND_WAIT_TRUE(pool.free_workers, &pool.free_workers_cond, &pool.free_workers_mutex); - A_MUTEX_UNLOCK(&pool.free_workers_mutex); - - if (oldest_worker && !atomic_load(&oldest_worker->has_job) && oldest_worker->buf_index >= 0) { - if (oldest_worker->job_failed) { - break; - } - - _stream_expose_picture(stream, oldest_worker->buf_index); - - free_worker_number = oldest_worker->number; - oldest_worker = oldest_worker->order_next; - - LOG_PERF("##### Raw frame accepted; worker = %u", free_worker_number); + if (_stream_wait_worker(stream, &pool, &ready_worker)) { + _stream_expose_picture(stream, ready_worker->buf_index); + LOG_PERF("##### Encoded picture exposed; worker = %u", ready_worker->number); } else { - for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) { - if ( - !atomic_load(&pool.workers[number].has_job) && ( - free_worker_number == -1 - || pool.workers[free_worker_number].job_start_time < pool.workers[number].job_start_time - ) - ) { - free_worker_number = number; - break; - } - } + LOG_PERF("----- Encoded picture dropped; worker = %u", ready_worker->number); + } - assert(free_worker_number >= 0); - assert(!atomic_load(&pool.workers[free_worker_number].has_job)); - - LOG_PERF("----- Raw frame dropped; worker = %u", free_worker_number); + if (ready_worker == NULL) { + break; } if (atomic_load(&stream->proc->stop)) { @@ -152,30 +129,18 @@ void stream_loop(struct stream_t *stream) { usleep(1000000); } -# define INIT_FD_SET(_set) \ - fd_set _set; FD_ZERO(&_set); FD_SET(stream->dev->run->fd, &_set); + bool has_read; + bool has_write; + bool has_error; + int selected = device_select(stream->dev, &has_read, &has_write, &has_error); - INIT_FD_SET(read_fds); - INIT_FD_SET(write_fds); - INIT_FD_SET(error_fds); - -# undef INIT_FD_SET - - struct timeval timeout; - timeout.tv_sec = stream->dev->timeout; - timeout.tv_usec = 0; - - LOG_DEBUG("Calling select() on video device ..."); - int retval = select(stream->dev->run->fd + 1, &read_fds, &write_fds, &error_fds, &timeout); - LOG_DEBUG("Device select() --> %d", retval); - - if (retval < 0) { + if (selected < 0) { if (errno != EINTR) { LOG_PERROR("Mainloop select() error"); break; } - } else if (retval == 0) { + } else if (selected == 0) { if (stream->dev->persistent) { if (!persistent_timeout_reported) { LOG_ERROR("Mainloop select() timeout, polling ...") @@ -190,7 +155,7 @@ void stream_loop(struct stream_t *stream) { } else { persistent_timeout_reported = false; - if (FD_ISSET(stream->dev->run->fd, &read_fds)) { + if (has_read) { LOG_DEBUG("Frame is ready"); int buf_index; @@ -225,7 +190,7 @@ void stream_loop(struct stream_t *stream) { stream->captured_fps = captured_fps_accum; captured_fps_accum = 0; captured_fps_second = now_second; - LOG_PERF("Oldest worker complete, Captured-FPS = %u", stream->captured_fps); + LOG_PERF("A new second has come, Captured-FPS = %u", stream->captured_fps); } captured_fps_accum += 1; @@ -235,37 +200,9 @@ void stream_loop(struct stream_t *stream) { LOG_VERBOSE("Fluency: delay=%.03Lf; grab_after=%.03Lf", fluency_delay, grab_after); } -# define FREE_WORKER(_next) pool.workers[free_worker_number]._next - LOG_DEBUG("Grabbed a new frame to buffer %u", buf_index); - if (oldest_worker == NULL) { - oldest_worker = &pool.workers[free_worker_number]; - last_worker = oldest_worker; - } else { - if (FREE_WORKER(order_next)) { - FREE_WORKER(order_next->order_prev) = FREE_WORKER(order_prev); - } - if (FREE_WORKER(order_prev)) { - FREE_WORKER(order_prev->order_next) = FREE_WORKER(order_next); - } - FREE_WORKER(order_prev) = last_worker; - last_worker->order_next = &pool.workers[free_worker_number]; - last_worker = &pool.workers[free_worker_number]; - } - last_worker->order_next = NULL; - - A_MUTEX_LOCK(&FREE_WORKER(has_job_mutex)); - FREE_WORKER(buf_index) = buf_index; - atomic_store(&FREE_WORKER(has_job), true); - A_MUTEX_UNLOCK(&FREE_WORKER(has_job_mutex)); - A_COND_SIGNAL(&FREE_WORKER(has_job_cond)); - -# undef FREE_WORKER - - A_MUTEX_LOCK(&pool.free_workers_mutex); - pool.free_workers -= 1; - A_MUTEX_UNLOCK(&pool.free_workers_mutex); + _stream_give_buf_to_worker(&pool, ready_worker, buf_index); goto next_handlers; // Поток сам освободит буфер @@ -278,12 +215,12 @@ void stream_loop(struct stream_t *stream) { next_handlers: - if (FD_ISSET(stream->dev->run->fd, &write_fds)) { + if (has_write) { LOG_ERROR("Got unexpected writing event, seems device was disconnected"); break; } - if (FD_ISSET(stream->dev->run->fd, &error_fds)) { + if (has_error) { LOG_INFO("Got V4L2 event"); if (device_consume_event(stream->dev) < 0) { break; @@ -314,6 +251,42 @@ void stream_switch_slowdown(struct stream_t *stream, bool slowdown) { atomic_store(&stream->proc->slowdown, slowdown); } +static bool _stream_wait_worker(struct stream_t *stream, struct workers_pool_t *pool, struct worker_t **ready_worker) { + bool ok; + + A_MUTEX_LOCK(&pool->free_workers_mutex); + A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex); + A_MUTEX_UNLOCK(&pool->free_workers_mutex); + + *ready_worker = NULL; + + if (pool->oldest_worker && !atomic_load(&pool->oldest_worker->has_job) && pool->oldest_worker->buf_index >= 0) { + *ready_worker = pool->oldest_worker; + pool->oldest_worker = pool->oldest_worker->order_next; + ok = true; // Воркер успел вовремя + } else { + for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) { + if ( + !atomic_load(&pool->workers[number].has_job) && ( + *ready_worker == NULL + || (*ready_worker)->job_start_time < pool->workers[number].job_start_time + ) + ) { + *ready_worker = &pool->workers[number]; + break; + } + } + assert(*ready_worker != NULL); + ok = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате) + } + + if ((*ready_worker)->job_failed) { + *ready_worker = NULL; + ok = false; + } + return ok; +} + static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) { # define PICTURE(_next) stream->dev->run->pictures[buf_index]._next @@ -337,6 +310,34 @@ static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) # undef PICTURE } +static void _stream_give_buf_to_worker(struct workers_pool_t *pool, struct worker_t *free_worker, unsigned buf_index) { + if (pool->oldest_worker == NULL) { + pool->oldest_worker = free_worker; + pool->latest_worker = pool->oldest_worker; + } else { + if (free_worker->order_next) { + free_worker->order_next->order_prev = free_worker->order_prev; + } + if (free_worker->order_prev) { + free_worker->order_prev->order_next = free_worker->order_next; + } + free_worker->order_prev = pool->latest_worker; + pool->latest_worker->order_next = free_worker; + pool->latest_worker = free_worker; + } + pool->latest_worker->order_next = NULL; + + A_MUTEX_LOCK(&free_worker->has_job_mutex); + free_worker->buf_index = buf_index; + atomic_store(&free_worker->has_job, true); + A_MUTEX_UNLOCK(&free_worker->has_job_mutex); + A_COND_SIGNAL(&free_worker->has_job_cond); + + A_MUTEX_LOCK(&pool->free_workers_mutex); + pool->free_workers -= 1; + A_MUTEX_UNLOCK(&pool->free_workers_mutex); +} + static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool) { long double sum_comp_time = 0; long double avg_comp_time; @@ -475,7 +476,7 @@ static void *_stream_worker_thread(void *v_worker) { PICTURE(encode_begin_time) = get_now_monotonic(); if (encoder_compress_buffer(worker->encoder, worker->dev, worker->number, worker->buf_index) < 0) { - worker->job_failed = true; + worker->job_failed = false; } PICTURE(encode_end_time) = get_now_monotonic(); diff --git a/src/stream.h b/src/stream.h index 5503c5d..903cb86 100644 --- a/src/stream.h +++ b/src/stream.h @@ -60,12 +60,15 @@ struct worker_t { struct workers_pool_t { struct worker_t *workers; - atomic_bool workers_stop; + struct worker_t *oldest_worker; + struct worker_t *latest_worker; pthread_mutex_t free_workers_mutex; unsigned free_workers; pthread_cond_t free_workers_cond; + atomic_bool workers_stop; + struct encoder_t *encoder; };