From 47378a17dbd40228c2916a0b5d1ad319f127d667 Mon Sep 17 00:00:00 2001 From: Devaev Maxim Date: Sun, 2 Jun 2019 15:22:34 +0300 Subject: [PATCH] refactoring --- src/stream.c | 195 ++++++++++++++++++++++++--------------------------- src/stream.h | 4 +- 2 files changed, 93 insertions(+), 106 deletions(-) diff --git a/src/stream.c b/src/stream.c index 8c5a595..f5517ae 100644 --- a/src/stream.c +++ b/src/stream.c @@ -46,17 +46,17 @@ #endif -static bool _stream_wait_worker(struct stream_t *stream, struct workers_pool_t *pool, struct worker_t **ready_worker); +static struct worker_t *_workers_pool_wait(struct workers_pool_t *pool); static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index); -static void _stream_assign_worker(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index); +static void _workers_pool_assign(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index); static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool); -static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool); -static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool); +static struct workers_pool_t *_stream_init_loop(struct stream_t *stream); +static struct workers_pool_t *_stream_init(struct stream_t *stream); -static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool); +static struct workers_pool_t *_workers_pool_init(struct stream_t *stream); static void *_stream_worker_thread(void *v_worker); -static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool_t *pool); +static void _workers_pool_destroy(struct workers_pool_t *pool); struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) { @@ -83,16 +83,12 @@ void stream_destroy(struct stream_t *stream) { } void stream_loop(struct stream_t *stream) { - struct workers_pool_t pool; - - MEMSET_ZERO(pool); - atomic_init(&pool.workers_stop, false); - pool.encoder = stream->encoder; + struct workers_pool_t *pool; LOG_INFO("Using V4L2 device: %s", stream->dev->path); LOG_INFO("Using desired FPS: %u", stream->dev->desired_fps); - while (_stream_init_loop(stream, &pool) == 0) { + while ((pool = _stream_init_loop(stream)) != NULL) { long double grab_after = 0; unsigned fluency_passed = 0; unsigned captured_fps_accum = 0; @@ -110,15 +106,17 @@ void stream_loop(struct stream_t *stream) { SEP_DEBUG('-'); LOG_DEBUG("Waiting for worker ..."); - if (_stream_wait_worker(stream, &pool, &ready_worker)) { - _stream_expose_picture(stream, ready_worker->buf_index); - LOG_PERF("##### Encoded picture exposed; worker = %u", ready_worker->number); - } else { - LOG_PERF("----- Encoded picture dropped; worker = %u", ready_worker->number); - } + ready_worker = _workers_pool_wait(pool); - if (ready_worker == NULL) { - break; // Если произошел сбой сжатия + if (!ready_worker->job_failed) { + if (ready_worker->job_timely) { + _stream_expose_picture(stream, ready_worker->buf_index); + LOG_PERF("##### Encoded picture exposed; worker = %u", ready_worker->number); + } else { + LOG_PERF("----- Encoded picture dropped; worker = %u", ready_worker->number); + } + } else { + break; } if (atomic_load(&stream->proc->stop)) { @@ -193,13 +191,13 @@ void stream_loop(struct stream_t *stream) { } captured_fps_accum += 1; - long double fluency_delay = _stream_get_fluency_delay(stream->dev, &pool); + long double fluency_delay = _stream_get_fluency_delay(stream->dev, pool); grab_after = now + fluency_delay; LOG_VERBOSE("Fluency: delay=%.03Lf; grab_after=%.03Lf", fluency_delay, grab_after); } - _stream_assign_worker(&pool, ready_worker, buf_index); + _workers_pool_assign(pool, ready_worker, buf_index); goto next_handlers; // Поток сам освободит буфер @@ -233,11 +231,11 @@ void stream_loop(struct stream_t *stream) { stream->height = 0; atomic_store(&stream->updated, true); A_MUTEX_UNLOCK(&stream->mutex); - } - _stream_destroy_workers(stream, &pool); - device_switch_capturing(stream->dev, false); - device_close(stream->dev); + _workers_pool_destroy(pool); + device_switch_capturing(stream->dev, false); + device_close(stream->dev); + } } void stream_loop_break(struct stream_t *stream) { @@ -248,40 +246,33 @@ void stream_switch_slowdown(struct stream_t *stream, bool slowdown) { atomic_store(&stream->proc->slowdown, slowdown); } -static bool _stream_wait_worker(struct stream_t *stream, struct workers_pool_t *pool, struct worker_t **ready_worker) { - bool ok; +static struct worker_t *_workers_pool_wait(struct workers_pool_t *pool) { + struct worker_t *ready_worker = NULL; A_MUTEX_LOCK(&pool->free_workers_mutex); A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex); A_MUTEX_UNLOCK(&pool->free_workers_mutex); - *ready_worker = NULL; - if (pool->oldest_worker && !atomic_load(&pool->oldest_worker->has_job) && pool->oldest_worker->buf_index >= 0) { - *ready_worker = pool->oldest_worker; + ready_worker = pool->oldest_worker; + ready_worker->job_timely = true; pool->oldest_worker = pool->oldest_worker->order_next; - ok = true; // Воркер успел вовремя } else { - for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) { + for (unsigned number = 0; number < pool->n_workers; ++number) { if ( !atomic_load(&pool->workers[number].has_job) && ( - *ready_worker == NULL - || (*ready_worker)->job_start_time < pool->workers[number].job_start_time + ready_worker == NULL + || ready_worker->job_start_time < pool->workers[number].job_start_time ) ) { - *ready_worker = &pool->workers[number]; + ready_worker = &pool->workers[number]; break; } } - assert(*ready_worker != NULL); - ok = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате) + assert(ready_worker != NULL); + ready_worker->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате) } - - if ((*ready_worker)->job_failed) { - *ready_worker = NULL; - ok = false; - } - return ok; + return ready_worker; } static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) { @@ -307,7 +298,7 @@ static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) # undef PICTURE } -static void _stream_assign_worker(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index) { +static void _workers_pool_assign(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index) { if (pool->oldest_worker == NULL) { pool->oldest_worker = ready_worker; pool->latest_worker = pool->oldest_worker; @@ -343,7 +334,7 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker long double min_delay; long double soft_delay; - for (unsigned number = 0; number < dev->run->n_workers; ++number) { + for (unsigned number = 0; number < pool->n_workers; ++number) { # define WORKER(_next) pool->workers[number]._next A_MUTEX_LOCK(&WORKER(last_comp_time_mutex)); @@ -355,9 +346,9 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker # undef WORKER } - avg_comp_time = sum_comp_time / dev->run->n_workers; // Среднее время работы воркеров + avg_comp_time = sum_comp_time / pool->n_workers; // Среднее время работы воркеров - min_delay = avg_comp_time / dev->run->n_workers; // Среднее время работы размазывается на N воркеров + min_delay = avg_comp_time / pool->n_workers; // Среднее время работы размазывается на N воркеров if (dev->desired_fps > 0 && min_delay > 0) { // Искусственное время задержки на основе желаемого FPS, если включен --desired-fps @@ -368,28 +359,25 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker return min_delay; } -static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool) { - int retval = -1; +static struct workers_pool_t *_stream_init_loop(struct stream_t *stream) { + struct workers_pool_t *pool = NULL; LOG_DEBUG("%s: stream->proc->stop = %d", __FUNCTION__, atomic_load(&stream->proc->stop)); + while (!atomic_load(&stream->proc->stop)) { - if ((retval = _stream_init(stream, pool)) < 0) { + SEP_INFO('='); + + if ((pool = _stream_init(stream)) == NULL) { LOG_INFO("Sleeping %u seconds before new stream init ...", stream->dev->error_delay); sleep(stream->dev->error_delay); } else { break; } } - return retval; + return pool; } -static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool) { - _stream_destroy_workers(stream, pool); - device_switch_capturing(stream->dev, false); - device_close(stream->dev); - - SEP_INFO('='); - +static struct workers_pool_t *_stream_init(struct stream_t *stream) { if (device_open(stream->dev) < 0) { goto error; } @@ -397,50 +385,54 @@ static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool) { goto error; } - encoder_prepare(pool->encoder, stream->dev); - - _stream_init_workers(stream, pool); - - return 0; + encoder_prepare(stream->encoder, stream->dev); + return _workers_pool_init(stream); error: device_close(stream->dev); - return -1; + return NULL; } -static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool) { - LOG_INFO("Spawning %u workers ...", stream->dev->run->n_workers); +static struct workers_pool_t *_workers_pool_init(struct stream_t *stream) { + struct workers_pool_t *pool; - atomic_store(&pool->workers_stop, false); - A_CALLOC(pool->workers, stream->dev->run->n_workers); + LOG_INFO("Creating pool with %u workers ...", stream->dev->run->n_workers); + + A_CALLOC(pool, 1); + + pool->n_workers = stream->dev->run->n_workers; + A_CALLOC(pool->workers, pool->n_workers); + + atomic_init(&pool->workers_stop, false); A_MUTEX_INIT(&pool->free_workers_mutex); A_COND_INIT(&pool->free_workers_cond); - for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) { + for (unsigned number = 0; number < pool->n_workers; ++number) { # define WORKER(_next) pool->workers[number]._next - pool->free_workers += 1; - A_MUTEX_INIT(&WORKER(has_job_mutex)); atomic_init(&WORKER(has_job), false); A_COND_INIT(&WORKER(has_job_cond)); - WORKER(number) = number; - WORKER(proc_stop) = &stream->proc->stop; - WORKER(workers_stop) = &pool->workers_stop; + WORKER(number) = number; + WORKER(proc_stop) = &stream->proc->stop; + WORKER(workers_stop) = &pool->workers_stop; - WORKER(free_workers_mutex) = &pool->free_workers_mutex; - WORKER(free_workers) = &pool->free_workers; - WORKER(free_workers_cond) = &pool->free_workers_cond; + WORKER(free_workers_mutex) = &pool->free_workers_mutex; + WORKER(free_workers) = &pool->free_workers; + WORKER(free_workers_cond) = &pool->free_workers_cond; - WORKER(dev) = stream->dev; - WORKER(encoder) = pool->encoder; + WORKER(dev) = stream->dev; + WORKER(encoder) = stream->encoder; A_THREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&(pool->workers[number])); + pool->free_workers += 1; + # undef WORKER } + return pool; } static void *_stream_worker_thread(void *v_worker) { @@ -509,33 +501,28 @@ static void *_stream_worker_thread(void *v_worker) { return NULL; } -static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool_t *pool) { - if (pool->workers) { - LOG_INFO("Destroying workers ..."); +static void _workers_pool_destroy(struct workers_pool_t *pool) { + LOG_INFO("Destroying workers pool ..."); - atomic_store(&pool->workers_stop, true); - for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) { -# define WORKER(_next) pool->workers[number]._next + atomic_store(&pool->workers_stop, true); + for (unsigned number = 0; number < pool->n_workers; ++number) { +# define WORKER(_next) pool->workers[number]._next - A_MUTEX_LOCK(&WORKER(has_job_mutex)); - atomic_store(&WORKER(has_job), true); // Final job: die - A_MUTEX_UNLOCK(&WORKER(has_job_mutex)); - A_COND_SIGNAL(&WORKER(has_job_cond)); + A_MUTEX_LOCK(&WORKER(has_job_mutex)); + atomic_store(&WORKER(has_job), true); // Final job: die + A_MUTEX_UNLOCK(&WORKER(has_job_mutex)); + A_COND_SIGNAL(&WORKER(has_job_cond)); - A_THREAD_JOIN(WORKER(tid)); - A_MUTEX_DESTROY(&WORKER(has_job_mutex)); - A_COND_DESTROY(&WORKER(has_job_cond)); + A_THREAD_JOIN(WORKER(tid)); + A_MUTEX_DESTROY(&WORKER(has_job_mutex)); + A_COND_DESTROY(&WORKER(has_job_cond)); -# undef WORKER - } - - A_MUTEX_DESTROY(&pool->free_workers_mutex); - A_COND_DESTROY(&pool->free_workers_cond); - - free(pool->workers); +# undef WORKER } - pool->free_workers = 0; - pool->workers = NULL; - pool->oldest_worker = NULL; - pool->latest_worker = NULL; + + A_MUTEX_DESTROY(&pool->free_workers_mutex); + A_COND_DESTROY(&pool->free_workers_cond); + + free(pool->workers); + free(pool); } diff --git a/src/stream.h b/src/stream.h index 903cb86..8f6e9dd 100644 --- a/src/stream.h +++ b/src/stream.h @@ -43,6 +43,7 @@ struct worker_t { pthread_mutex_t has_job_mutex; int buf_index; atomic_bool has_job; + bool job_timely; bool job_failed; long double job_start_time; pthread_cond_t has_job_cond; @@ -59,6 +60,7 @@ struct worker_t { }; struct workers_pool_t { + unsigned n_workers; struct worker_t *workers; struct worker_t *oldest_worker; struct worker_t *latest_worker; @@ -68,8 +70,6 @@ struct workers_pool_t { pthread_cond_t free_workers_cond; atomic_bool workers_stop; - - struct encoder_t *encoder; }; struct process_t {