diff --git a/src/stream.c b/src/stream.c index fe4961d..b5d7a58 100644 --- a/src/stream.c +++ b/src/stream.c @@ -45,7 +45,7 @@ static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *poo static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool); static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool); -static void *_stream_worker_thread(void *v_ctx); +static void *_stream_worker_thread(void *v_worker); static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool_t *pool); static int _stream_control(struct device_t *dev, bool enable); @@ -109,14 +109,14 @@ void stream_loop(struct stream_t *stream) { A_COND_WAIT_TRUE(pool.free_workers, &pool.free_workers_cond, &pool.free_workers_mutex); A_MUTEX_UNLOCK(&pool.free_workers_mutex); - if (oldest_worker && !oldest_worker->has_job && oldest_worker->ctx.buf_index >= 0) { + if (oldest_worker && !oldest_worker->has_job && oldest_worker->buf_index >= 0) { if (oldest_worker->job_failed) { break; } - _stream_expose_picture(stream, oldest_worker->ctx.buf_index); + _stream_expose_picture(stream, oldest_worker->buf_index); - free_worker_number = oldest_worker->ctx.number; + free_worker_number = oldest_worker->number; oldest_worker = oldest_worker->order_next; LOG_PERF("##### Raw frame accepted; worker = %u", free_worker_number); @@ -232,7 +232,7 @@ void stream_loop(struct stream_t *stream) { # define FREE_WORKER(_next) pool.workers[free_worker_number]._next LOG_DEBUG("Grabbed a new frame to buffer %u", buf_info.index); - FREE_WORKER(ctx.buf_info) = buf_info; + FREE_WORKER(buf_info) = buf_info; if (!oldest_worker) { oldest_worker = &pool.workers[free_worker_number]; @@ -251,7 +251,7 @@ void stream_loop(struct stream_t *stream) { last_worker->order_next = NULL; A_MUTEX_LOCK(&FREE_WORKER(has_job_mutex)); - FREE_WORKER(ctx.buf_index) = buf_info.index; + FREE_WORKER(buf_index) = buf_info.index; FREE_WORKER(has_job) = true; A_MUTEX_UNLOCK(&FREE_WORKER(has_job_mutex)); A_COND_SIGNAL(&FREE_WORKER(has_job_cond)); @@ -413,89 +413,78 @@ static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t for (unsigned number = 0; number < stream->dev->n_workers; ++number) { # define WORKER(_next) pool->workers[number]._next -# define CTX(_next) WORKER(ctx._next) pool->free_workers += 1; A_MUTEX_INIT(&WORKER(has_job_mutex)); A_COND_INIT(&WORKER(has_job_cond)); - CTX(number) = number; - CTX(dev) = stream->dev; - CTX(proc_stop) = (sig_atomic_t *volatile)&(stream->proc->stop); - CTX(workers_stop) = pool->workers_stop; + WORKER(number) = number; + WORKER(proc_stop) = (sig_atomic_t *volatile)&(stream->proc->stop); + WORKER(workers_stop) = pool->workers_stop; - CTX(encoder) = pool->encoder; + WORKER(free_workers_mutex) = &pool->free_workers_mutex; + WORKER(free_workers) = &pool->free_workers; + WORKER(free_workers_cond) = &pool->free_workers_cond; - CTX(last_comp_time_mutex) = &WORKER(last_comp_time_mutex); - CTX(last_comp_time) = &WORKER(last_comp_time); + WORKER(dev) = stream->dev; + WORKER(encoder) = pool->encoder; - CTX(has_job_mutex) = &WORKER(has_job_mutex); - CTX(has_job) = &WORKER(has_job); - CTX(job_failed) = &WORKER(job_failed); - CTX(job_start_time) = &WORKER(job_start_time); - CTX(has_job_cond) = &WORKER(has_job_cond); + A_THREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&(pool->workers[number])); - CTX(free_workers_mutex) = &pool->free_workers_mutex; - CTX(free_workers) = &pool->free_workers; - CTX(free_workers_cond) = &pool->free_workers_cond; - - A_THREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&WORKER(ctx)); - -# undef CTX # undef WORKER } } -static void *_stream_worker_thread(void *v_ctx) { - struct worker_context_t *ctx = (struct worker_context_t *)v_ctx; +static void *_stream_worker_thread(void *v_worker) { + struct worker_t *worker = (struct worker_t *)v_worker; - LOG_DEBUG("Hello! I am a worker #%u ^_^", ctx->number); + LOG_DEBUG("Hello! I am a worker #%u ^_^", worker->number); - while (!*ctx->proc_stop && !*ctx->workers_stop) { - LOG_DEBUG("Worker %u waiting for a new job ...", ctx->number); - A_MUTEX_LOCK(ctx->has_job_mutex); - A_COND_WAIT_TRUE(*ctx->has_job, ctx->has_job_cond, ctx->has_job_mutex); - A_MUTEX_UNLOCK(ctx->has_job_mutex); + while (!*worker->proc_stop && !*worker->workers_stop) { + LOG_DEBUG("Worker %u waiting for a new job ...", worker->number); + A_MUTEX_LOCK(&worker->has_job_mutex); + A_COND_WAIT_TRUE(worker->has_job, &worker->has_job_cond, &worker->has_job_mutex); + A_MUTEX_UNLOCK(&worker->has_job_mutex); - if (!*ctx->workers_stop) { -# define PICTURE(_next) ctx->dev->run->pictures[ctx->buf_index]._next + if (!*worker->workers_stop) { +# define PICTURE(_next) worker->dev->run->pictures[worker->buf_index]._next - LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", ctx->number, ctx->buf_index); + LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", worker->number, worker->buf_index); PICTURE(encode_begin_time) = get_now_monotonic(); - if (encoder_compress_buffer(ctx->encoder, ctx->dev, ctx->number, ctx->buf_index) < 0) { - *ctx->job_failed = true; + if (encoder_compress_buffer(worker->encoder, worker->dev, worker->number, worker->buf_index) < 0) { + worker->job_failed = true; } PICTURE(encode_end_time) = get_now_monotonic(); - if (_stream_release_buffer(ctx->dev, &ctx->buf_info) == 0) { - *ctx->job_start_time = PICTURE(encode_begin_time); - *ctx->has_job = false; + if (_stream_release_buffer(worker->dev, &worker->buf_info) == 0) { + worker->job_start_time = PICTURE(encode_begin_time); + worker->has_job = false; - long double last_comp_time = PICTURE(encode_end_time) - *ctx->job_start_time; + long double last_comp_time = PICTURE(encode_end_time) - worker->job_start_time; - A_MUTEX_LOCK(ctx->last_comp_time_mutex); - *ctx->last_comp_time = last_comp_time; - A_MUTEX_UNLOCK(ctx->last_comp_time_mutex); + A_MUTEX_LOCK(&worker->last_comp_time_mutex); + worker->last_comp_time = last_comp_time; + A_MUTEX_UNLOCK(&worker->last_comp_time_mutex); LOG_VERBOSE("Compressed JPEG size=%zu; time=%0.3Lf; worker=%u; buffer=%u", - PICTURE(size), last_comp_time, ctx->number, ctx->buf_index); + PICTURE(size), last_comp_time, worker->number, worker->buf_index); } else { - *ctx->job_failed = true; - *ctx->has_job = false; + worker->job_failed = true; + worker->has_job = false; } # undef PICTURE } - A_MUTEX_LOCK(ctx->free_workers_mutex); - *ctx->free_workers += 1; - A_MUTEX_UNLOCK(ctx->free_workers_mutex); - A_COND_SIGNAL(ctx->free_workers_cond); + A_MUTEX_LOCK(worker->free_workers_mutex); + *worker->free_workers += 1; + A_MUTEX_UNLOCK(worker->free_workers_mutex); + A_COND_SIGNAL(worker->free_workers_cond); } - LOG_DEBUG("Bye-bye (worker %u)", ctx->number); + LOG_DEBUG("Bye-bye (worker %u)", worker->number); return NULL; } diff --git a/src/stream.h b/src/stream.h index a25687e..2676887 100644 --- a/src/stream.h +++ b/src/stream.h @@ -31,45 +31,32 @@ #include "encoder.h" -struct worker_context_t { +struct worker_t { + pthread_t tid; unsigned number; - struct device_t *dev; - int buf_index; - struct v4l2_buffer buf_info; sig_atomic_t *volatile proc_stop; bool *workers_stop; - struct encoder_t *encoder; + pthread_mutex_t last_comp_time_mutex; + long double last_comp_time; - pthread_mutex_t *last_comp_time_mutex; - long double *last_comp_time; - - pthread_mutex_t *has_job_mutex; - bool *has_job; - bool *job_failed; - long double *job_start_time; - pthread_cond_t *has_job_cond; + pthread_mutex_t has_job_mutex; + int buf_index; + struct v4l2_buffer buf_info; + bool has_job; + bool job_failed; + long double job_start_time; + pthread_cond_t has_job_cond; pthread_mutex_t *free_workers_mutex; unsigned *free_workers; pthread_cond_t *free_workers_cond; -}; -struct worker_t { - struct worker_context_t ctx; - pthread_t tid; + struct worker_t *order_prev; + struct worker_t *order_next; - pthread_mutex_t last_comp_time_mutex; - long double last_comp_time; - - pthread_mutex_t has_job_mutex; - bool has_job; - bool job_failed; - long double job_start_time; - pthread_cond_t has_job_cond; - - struct worker_t *order_prev; - struct worker_t *order_next; + struct device_t *dev; + struct encoder_t *encoder; }; struct workers_pool_t {