From 01e21e419d7000ae9ff7170a7496828d931ac8d4 Mon Sep 17 00:00:00 2001 From: Devaev Maxim Date: Wed, 6 Mar 2019 03:07:02 +0300 Subject: [PATCH] refactoring --- src/encoders/cpu/encoder.c | 10 ++-- src/encoders/omx/encoder.c | 40 ++++++++------ src/stream.c | 107 +++++++++++++++++++++---------------- 3 files changed, 91 insertions(+), 66 deletions(-) diff --git a/src/encoders/cpu/encoder.c b/src/encoders/cpu/encoder.c index 8f86c52..0fc9337 100644 --- a/src/encoders/cpu/encoder.c +++ b/src/encoders/cpu/encoder.c @@ -79,8 +79,10 @@ void cpu_encoder_compress_buffer(struct device_t *dev, unsigned index, unsigned jpeg.err = jpeg_std_error(&jpeg_error); jpeg_create_compress(&jpeg); - dev->run->pictures[index].size = 0; - _jpeg_set_dest_picture(&jpeg, dev->run->pictures[index].data, &dev->run->pictures[index].size); +# define PICTURE(_next) dev->run->pictures[index]._next + + PICTURE(size) = 0; + _jpeg_set_dest_picture(&jpeg, PICTURE(data), &PICTURE(size)); jpeg.image_width = dev->run->width; jpeg.image_height = dev->run->height; @@ -110,8 +112,8 @@ void cpu_encoder_compress_buffer(struct device_t *dev, unsigned index, unsigned // https://stackoverflow.com/questions/19857766/error-handling-in-libjpeg jpeg_finish_compress(&jpeg); jpeg_destroy_compress(&jpeg); - assert(dev->run->pictures[index].size > 0); - assert(dev->run->pictures[index].size <= dev->run->max_picture_size); + assert(PICTURE(size) > 0); + assert(PICTURE(size) <= dev->run->max_picture_size); } static void _jpeg_set_dest_picture(j_compress_ptr jpeg, unsigned char *picture, size_t *written) { diff --git a/src/encoders/omx/encoder.c b/src/encoders/omx/encoder.c index d77824c..fed33e8 100644 --- a/src/encoders/omx/encoder.c +++ b/src/encoders/omx/encoder.c @@ -172,8 +172,13 @@ int omx_encoder_prepare_live(struct omx_encoder_t *omx, struct device_t *dev, un } int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, unsigned index) { +# define PICTURE(_next) dev->run->pictures[index]._next +# define HW_BUFFER(_next) dev->run->hw_buffers[index]._next +# define IN(_next) omx->input_buffer->_next +# define OUT(_next) omx->output_buffer->_next + OMX_ERRORTYPE error; - size_t slice_size = omx->input_buffer->nAllocLen; + size_t slice_size = IN(nAllocLen); size_t pos = 0; if ((error = OMX_FillThisBuffer(omx->encoder, omx->output_buffer)) != OMX_ErrorNone) { @@ -181,7 +186,7 @@ int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, return -1; } - dev->run->pictures[index].size = 0; + PICTURE(size) = 0; omx->output_available = false; omx->input_required = true; @@ -193,16 +198,12 @@ int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, if (omx->output_available) { omx->output_available = false; - memcpy( - dev->run->pictures[index].data + dev->run->pictures[index].size, - omx->output_buffer->pBuffer + omx->output_buffer->nOffset, - omx->output_buffer->nFilledLen - ); - assert(dev->run->pictures[index].size + omx->output_buffer->nFilledLen <= dev->run->max_picture_size); - dev->run->pictures[index].size += omx->output_buffer->nFilledLen; + assert(PICTURE(size) + OUT(nFilledLen) <= dev->run->max_picture_size); + memcpy(PICTURE(data) + PICTURE(size), OUT(pBuffer) + OUT(nOffset), OUT(nFilledLen)); + PICTURE(size) += OUT(nFilledLen); - if (omx->output_buffer->nFlags & OMX_BUFFERFLAG_ENDOFFRAME) { - omx->output_buffer->nFlags = 0; + if (OUT(nFlags) & OMX_BUFFERFLAG_ENDOFFRAME) { + OUT(nFlags) = 0; break; } @@ -215,18 +216,18 @@ int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, if (omx->input_required) { omx->input_required = false; - if (pos == dev->run->hw_buffers[index].length) { + if (pos == HW_BUFFER(length)) { continue; } - memcpy(omx->input_buffer->pBuffer, dev->run->hw_buffers[index].start + pos, slice_size); - omx->input_buffer->nOffset = 0; - omx->input_buffer->nFilledLen = slice_size; + memcpy(IN(pBuffer), HW_BUFFER(start) + pos, slice_size); + IN(nOffset) = 0; + IN(nFilledLen) = slice_size; pos += slice_size; - if (pos + slice_size > dev->run->hw_buffers[index].length) { - slice_size = dev->run->hw_buffers[index].length - pos; + if (pos + slice_size > HW_BUFFER(length)) { + slice_size = HW_BUFFER(length) - pos; } if ((error = OMX_EmptyThisBuffer(omx->encoder, omx->input_buffer)) != OMX_ErrorNone) { @@ -237,6 +238,11 @@ int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, vcos_semaphore_wait(&omx->handler_lock); } + +# undef OUT +# undef IN +# undef HW_BUFFER +# undef PICTURE return 0; } diff --git a/src/stream.c b/src/stream.c index 08ef062..4e45ac1 100644 --- a/src/stream.c +++ b/src/stream.c @@ -116,9 +116,12 @@ void stream_loop(struct stream_t *stream) { LOG_PERF("##### Raw frame accepted; worker = %u", free_worker_number); } else { for (unsigned number = 0; number < stream->dev->n_workers; ++number) { - if (!pool.workers[number].has_job && (free_worker_number == -1 - || pool.workers[free_worker_number].job_start_time < pool.workers[number].job_start_time - )) { + if ( + !pool.workers[number].has_job && ( + free_worker_number == -1 + || pool.workers[free_worker_number].job_start_time < pool.workers[number].job_start_time + ) + ) { free_worker_number = number; break; } @@ -134,14 +137,14 @@ void stream_loop(struct stream_t *stream) { break; } -# define INIT_FD_SET(_set) \ - fd_set _set; FD_ZERO(&_set); FD_SET(stream->dev->run->fd, &_set); +# define INIT_FD_SET(_set) \ + fd_set _set; FD_ZERO(&_set); FD_SET(stream->dev->run->fd, &_set); INIT_FD_SET(read_fds); INIT_FD_SET(write_fds); INIT_FD_SET(error_fds); -# undef INIT_FD_SET +# undef INIT_FD_SET struct timeval timeout; timeout.tv_sec = stream->dev->timeout; @@ -216,30 +219,34 @@ void stream_loop(struct stream_t *stream) { LOG_VERBOSE("Fluency: delay=%.03Lf; grab_after=%.03Lf", fluency_delay, grab_after); } +# define FREE_WORKER(_next) pool.workers[free_worker_number]._next + LOG_DEBUG("Grabbed a new frame to buffer %u", buf_info.index); - pool.workers[free_worker_number].ctx.buf_info = buf_info; + FREE_WORKER(ctx.buf_info) = buf_info; if (!oldest_worker) { oldest_worker = &pool.workers[free_worker_number]; last_worker = oldest_worker; } else { - if (pool.workers[free_worker_number].order_next) { - pool.workers[free_worker_number].order_next->order_prev = pool.workers[free_worker_number].order_prev; + if (FREE_WORKER(order_next)) { + FREE_WORKER(order_next->order_prev) = FREE_WORKER(order_prev); } - if (pool.workers[free_worker_number].order_prev) { - pool.workers[free_worker_number].order_prev->order_next = pool.workers[free_worker_number].order_next; + if (FREE_WORKER(order_prev)) { + FREE_WORKER(order_prev->order_next) = FREE_WORKER(order_next); } - pool.workers[free_worker_number].order_prev = last_worker; + FREE_WORKER(order_prev) = last_worker; last_worker->order_next = &pool.workers[free_worker_number]; last_worker = &pool.workers[free_worker_number]; } last_worker->order_next = NULL; - A_PTHREAD_M_LOCK(&pool.workers[free_worker_number].has_job_mutex); - pool.workers[free_worker_number].ctx.buf_index = buf_info.index; - pool.workers[free_worker_number].has_job = true; - A_PTHREAD_M_UNLOCK(&pool.workers[free_worker_number].has_job_mutex); - A_PTHREAD_C_SIGNAL(&pool.workers[free_worker_number].has_job_cond); + A_PTHREAD_M_LOCK(&FREE_WORKER(has_job_mutex)); + FREE_WORKER(ctx.buf_index) = buf_info.index; + FREE_WORKER(has_job) = true; + A_PTHREAD_M_UNLOCK(&FREE_WORKER(has_job_mutex)); + A_PTHREAD_C_SIGNAL(&FREE_WORKER(has_job_cond)); + +# undef FREE_WORKER A_PTHREAD_M_LOCK(&pool.free_workers_mutex); pool.free_workers -= 1; @@ -319,11 +326,15 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker long double soft_delay; for (unsigned number = 0; number < dev->n_workers; ++number) { - A_PTHREAD_M_LOCK(&pool->workers[number].last_comp_time_mutex); - if (pool->workers[number].last_comp_time > 0) { - sum_comp_time += pool->workers[number].last_comp_time; +# define WORKER(_next) pool->workers[number]._next + + A_PTHREAD_M_LOCK(&WORKER(last_comp_time_mutex)); + if (WORKER(last_comp_time) > 0) { + sum_comp_time += WORKER(last_comp_time); } - A_PTHREAD_M_UNLOCK(&pool->workers[number].last_comp_time_mutex); + A_PTHREAD_M_UNLOCK(&WORKER(last_comp_time_mutex)); + +# undef WORKER } avg_comp_time = sum_comp_time / dev->n_workers; // Среднее время работы воркеров @@ -388,12 +399,13 @@ static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *po A_PTHREAD_C_INIT(&pool->free_workers_cond); for (unsigned number = 0; number < dev->n_workers; ++number) { +# define WORKER(_next) pool->workers[number]._next +# define CTX(_next) WORKER(ctx._next) + pool->free_workers += 1; - A_PTHREAD_M_INIT(&pool->workers[number].has_job_mutex); - A_PTHREAD_C_INIT(&pool->workers[number].has_job_cond); - -# define CTX(_next) pool->workers[number].ctx._next + A_PTHREAD_M_INIT(&WORKER(has_job_mutex)); + A_PTHREAD_C_INIT(&WORKER(has_job_cond)); CTX(number) = number; CTX(dev) = dev; @@ -402,22 +414,23 @@ static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *po CTX(encoder) = pool->encoder; - CTX(last_comp_time_mutex) = &pool->workers[number].last_comp_time_mutex; - CTX(last_comp_time) = &pool->workers[number].last_comp_time; + CTX(last_comp_time_mutex) = &WORKER(last_comp_time_mutex); + CTX(last_comp_time) = &WORKER(last_comp_time); - CTX(has_job_mutex) = &pool->workers[number].has_job_mutex; - CTX(has_job) = &pool->workers[number].has_job; - CTX(job_failed) = &pool->workers[number].job_failed; - CTX(job_start_time) = &pool->workers[number].job_start_time; - CTX(has_job_cond) = &pool->workers[number].has_job_cond; + CTX(has_job_mutex) = &WORKER(has_job_mutex); + CTX(has_job) = &WORKER(has_job); + CTX(job_failed) = &WORKER(job_failed); + CTX(job_start_time) = &WORKER(job_start_time); + CTX(has_job_cond) = &WORKER(has_job_cond); CTX(free_workers_mutex) = &pool->free_workers_mutex; CTX(free_workers) = &pool->free_workers; CTX(free_workers_cond) = &pool->free_workers_cond; -# undef CTX + A_PTHREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&WORKER(ctx)); - A_PTHREAD_CREATE(&pool->workers[number].tid, _stream_worker_thread, (void *)&pool->workers[number].ctx); +# undef CTX +# undef WORKER } } @@ -432,9 +445,9 @@ static void *_stream_worker_thread(void *v_ctx) { A_PTHREAD_C_WAIT_TRUE(*ctx->has_job, ctx->has_job_cond, ctx->has_job_mutex); A_PTHREAD_M_UNLOCK(ctx->has_job_mutex); -# define PICTURE(_next) ctx->dev->run->pictures[ctx->buf_index]._next - if (!*ctx->workers_stop) { +# define PICTURE(_next) ctx->dev->run->pictures[ctx->buf_index]._next + LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", ctx->number, ctx->buf_index); PICTURE(encode_begin_time) = get_now_monotonic(); @@ -461,9 +474,9 @@ static void *_stream_worker_thread(void *v_ctx) { *ctx->job_failed = true; *ctx->has_job = false; } - } -# undef PICTURE +# undef PICTURE + } A_PTHREAD_M_LOCK(ctx->free_workers_mutex); *ctx->free_workers += 1; @@ -481,14 +494,18 @@ static void _stream_destroy_workers(struct device_t *dev, struct workers_pool_t *pool->workers_stop = true; for (unsigned number = 0; number < dev->n_workers; ++number) { - A_PTHREAD_M_LOCK(&pool->workers[number].has_job_mutex); - pool->workers[number].has_job = true; // Final job: die - A_PTHREAD_M_UNLOCK(&pool->workers[number].has_job_mutex); - A_PTHREAD_C_SIGNAL(&pool->workers[number].has_job_cond); +# define WORKER(_next) pool->workers[number]._next - A_PTHREAD_JOIN(pool->workers[number].tid); - A_PTHREAD_M_DESTROY(&pool->workers[number].has_job_mutex); - A_PTHREAD_C_DESTROY(&pool->workers[number].has_job_cond); + A_PTHREAD_M_LOCK(&WORKER(has_job_mutex)); + WORKER(has_job) = true; // Final job: die + A_PTHREAD_M_UNLOCK(&WORKER(has_job_mutex)); + A_PTHREAD_C_SIGNAL(&WORKER(has_job_cond)); + + A_PTHREAD_JOIN(WORKER(tid)); + A_PTHREAD_M_DESTROY(&WORKER(has_job_mutex)); + A_PTHREAD_C_DESTROY(&WORKER(has_job_cond)); + +# undef WORKER } A_PTHREAD_M_DESTROY(&pool->free_workers_mutex);