diff --git a/src/stream.c b/src/stream.c index 289d0a6..0c8a0e1 100644 --- a/src/stream.c +++ b/src/stream.c @@ -46,18 +46,62 @@ #endif -static struct workers_pool_t *_stream_init_loop(struct stream_t *stream); -static struct workers_pool_t *_stream_init(struct stream_t *stream); +struct _worker_t { + pthread_t tid; + unsigned number; + atomic_bool *proc_stop; + atomic_bool *workers_stop; + + pthread_mutex_t last_comp_time_mutex; + long double last_comp_time; + + pthread_mutex_t has_job_mutex; + int buf_index; + atomic_bool has_job; + bool job_timely; + bool job_failed; + long double job_start_time; + pthread_cond_t has_job_cond; + + pthread_mutex_t *free_workers_mutex; + unsigned *free_workers; + pthread_cond_t *free_workers_cond; + + struct _worker_t *order_prev; + struct _worker_t *order_next; + + struct device_t *dev; + struct encoder_t *encoder; +}; + +struct _workers_pool_t { + unsigned n_workers; + struct _worker_t *workers; + struct _worker_t *oldest_worker; + struct _worker_t *latest_worker; + + pthread_mutex_t free_workers_mutex; + unsigned free_workers; + pthread_cond_t free_workers_cond; + + atomic_bool workers_stop; + + long double desired_frames_interval; +}; + + +static struct _workers_pool_t *_stream_init_loop(struct stream_t *stream); +static struct _workers_pool_t *_stream_init(struct stream_t *stream); static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index); -static struct workers_pool_t *_workers_pool_init(struct stream_t *stream); -static void _workers_pool_destroy(struct workers_pool_t *pool); +static struct _workers_pool_t *_workers_pool_init(struct stream_t *stream); +static void _workers_pool_destroy(struct _workers_pool_t *pool); -static void *_worker_thread(void *v_worker); +static void *__worker_thread(void *v_worker); -static struct worker_t *_workers_pool_wait(struct workers_pool_t *pool); -static void _workers_pool_assign(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index); -static long double _workers_pool_get_fluency_delay(struct workers_pool_t *pool); +static struct _worker_t *_workers_pool_wait(struct _workers_pool_t *pool); +static void _workers_pool_assign(struct _workers_pool_t *pool, struct _worker_t *ready_worker, unsigned buf_index); +static long double _workers_pool_get_fluency_delay(struct _workers_pool_t *pool); struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) { @@ -84,7 +128,7 @@ void stream_destroy(struct stream_t *stream) { } void stream_loop(struct stream_t *stream) { - struct workers_pool_t *pool; + struct _workers_pool_t *pool; LOG_INFO("Using V4L2 device: %s", stream->dev->path); LOG_INFO("Using desired FPS: %u", stream->dev->desired_fps); @@ -102,7 +146,7 @@ void stream_loop(struct stream_t *stream) { LOG_INFO("Capturing ..."); while (!atomic_load(&stream->proc->stop)) { - struct worker_t *ready_worker; + struct _worker_t *ready_worker; SEP_DEBUG('-'); LOG_DEBUG("Waiting for worker ..."); @@ -247,8 +291,8 @@ void stream_switch_slowdown(struct stream_t *stream, bool slowdown) { atomic_store(&stream->proc->slowdown, slowdown); } -static struct workers_pool_t *_stream_init_loop(struct stream_t *stream) { - struct workers_pool_t *pool = NULL; +static struct _workers_pool_t *_stream_init_loop(struct stream_t *stream) { + struct _workers_pool_t *pool = NULL; LOG_DEBUG("%s: stream->proc->stop = %d", __FUNCTION__, atomic_load(&stream->proc->stop)); @@ -265,7 +309,7 @@ static struct workers_pool_t *_stream_init_loop(struct stream_t *stream) { return pool; } -static struct workers_pool_t *_stream_init(struct stream_t *stream) { +static struct _workers_pool_t *_stream_init(struct stream_t *stream) { if (device_open(stream->dev) < 0) { goto error; } @@ -304,8 +348,8 @@ static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) # undef PICTURE } -static struct workers_pool_t *_workers_pool_init(struct stream_t *stream) { - struct workers_pool_t *pool; +static struct _workers_pool_t *_workers_pool_init(struct stream_t *stream) { + struct _workers_pool_t *pool; LOG_INFO("Creating pool with %u workers ...", stream->dev->run->n_workers); @@ -341,7 +385,7 @@ static struct workers_pool_t *_workers_pool_init(struct stream_t *stream) { WORKER(dev) = stream->dev; WORKER(encoder) = stream->encoder; - A_THREAD_CREATE(&WORKER(tid), _worker_thread, (void *)&(pool->workers[number])); + A_THREAD_CREATE(&WORKER(tid), __worker_thread, (void *)&(pool->workers[number])); pool->free_workers += 1; @@ -350,7 +394,7 @@ static struct workers_pool_t *_workers_pool_init(struct stream_t *stream) { return pool; } -static void _workers_pool_destroy(struct workers_pool_t *pool) { +static void _workers_pool_destroy(struct _workers_pool_t *pool) { LOG_INFO("Destroying workers pool ..."); atomic_store(&pool->workers_stop, true); @@ -376,8 +420,8 @@ static void _workers_pool_destroy(struct workers_pool_t *pool) { free(pool); } -static void *_worker_thread(void *v_worker) { - struct worker_t *worker = (struct worker_t *)v_worker; +static void *__worker_thread(void *v_worker) { + struct _worker_t *worker = (struct _worker_t *)v_worker; LOG_DEBUG("Hello! I am a worker #%u ^_^", worker->number); @@ -442,8 +486,8 @@ static void *_worker_thread(void *v_worker) { return NULL; } -static struct worker_t *_workers_pool_wait(struct workers_pool_t *pool) { - struct worker_t *ready_worker = NULL; +static struct _worker_t *_workers_pool_wait(struct _workers_pool_t *pool) { + struct _worker_t *ready_worker = NULL; A_MUTEX_LOCK(&pool->free_workers_mutex); A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex); @@ -471,7 +515,7 @@ static struct worker_t *_workers_pool_wait(struct workers_pool_t *pool) { return ready_worker; } -static void _workers_pool_assign(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index) { +static void _workers_pool_assign(struct _workers_pool_t *pool, struct _worker_t *ready_worker, unsigned buf_index) { if (pool->oldest_worker == NULL) { pool->oldest_worker = ready_worker; pool->latest_worker = pool->oldest_worker; @@ -501,7 +545,7 @@ static void _workers_pool_assign(struct workers_pool_t *pool, struct worker_t *r LOG_DEBUG("Assigned new frame in buffer %u to worker %u", buf_index, ready_worker->number); } -static long double _workers_pool_get_fluency_delay(struct workers_pool_t *pool) { +static long double _workers_pool_get_fluency_delay(struct _workers_pool_t *pool) { long double sum_comp_time = 0; long double avg_comp_time; long double min_delay; diff --git a/src/stream.h b/src/stream.h index 6da9423..097798d 100644 --- a/src/stream.h +++ b/src/stream.h @@ -22,58 +22,12 @@ #pragma once -#include #include -#include - #include "device.h" #include "encoder.h" -struct worker_t { - pthread_t tid; - unsigned number; - atomic_bool *proc_stop; - atomic_bool *workers_stop; - - pthread_mutex_t last_comp_time_mutex; - long double last_comp_time; - - pthread_mutex_t has_job_mutex; - int buf_index; - atomic_bool has_job; - bool job_timely; - bool job_failed; - long double job_start_time; - pthread_cond_t has_job_cond; - - pthread_mutex_t *free_workers_mutex; - unsigned *free_workers; - pthread_cond_t *free_workers_cond; - - struct worker_t *order_prev; - struct worker_t *order_next; - - struct device_t *dev; - struct encoder_t *encoder; -}; - -struct workers_pool_t { - unsigned n_workers; - struct worker_t *workers; - struct worker_t *oldest_worker; - struct worker_t *latest_worker; - - pthread_mutex_t free_workers_mutex; - unsigned free_workers; - pthread_cond_t free_workers_cond; - - atomic_bool workers_stop; - - long double desired_frames_interval; -}; - struct process_t { atomic_bool stop; atomic_bool slowdown;