refactoring

This commit is contained in:
Devaev Maxim
2019-03-06 03:07:02 +03:00
parent a5bca4cca3
commit 01e21e419d
3 changed files with 91 additions and 66 deletions

View File

@@ -79,8 +79,10 @@ void cpu_encoder_compress_buffer(struct device_t *dev, unsigned index, unsigned
jpeg.err = jpeg_std_error(&jpeg_error);
jpeg_create_compress(&jpeg);
dev->run->pictures[index].size = 0;
_jpeg_set_dest_picture(&jpeg, dev->run->pictures[index].data, &dev->run->pictures[index].size);
# define PICTURE(_next) dev->run->pictures[index]._next
PICTURE(size) = 0;
_jpeg_set_dest_picture(&jpeg, PICTURE(data), &PICTURE(size));
jpeg.image_width = dev->run->width;
jpeg.image_height = dev->run->height;
@@ -110,8 +112,8 @@ void cpu_encoder_compress_buffer(struct device_t *dev, unsigned index, unsigned
// https://stackoverflow.com/questions/19857766/error-handling-in-libjpeg
jpeg_finish_compress(&jpeg);
jpeg_destroy_compress(&jpeg);
assert(dev->run->pictures[index].size > 0);
assert(dev->run->pictures[index].size <= dev->run->max_picture_size);
assert(PICTURE(size) > 0);
assert(PICTURE(size) <= dev->run->max_picture_size);
}
static void _jpeg_set_dest_picture(j_compress_ptr jpeg, unsigned char *picture, size_t *written) {

View File

@@ -172,8 +172,13 @@ int omx_encoder_prepare_live(struct omx_encoder_t *omx, struct device_t *dev, un
}
int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, unsigned index) {
# define PICTURE(_next) dev->run->pictures[index]._next
# define HW_BUFFER(_next) dev->run->hw_buffers[index]._next
# define IN(_next) omx->input_buffer->_next
# define OUT(_next) omx->output_buffer->_next
OMX_ERRORTYPE error;
size_t slice_size = omx->input_buffer->nAllocLen;
size_t slice_size = IN(nAllocLen);
size_t pos = 0;
if ((error = OMX_FillThisBuffer(omx->encoder, omx->output_buffer)) != OMX_ErrorNone) {
@@ -181,7 +186,7 @@ int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev,
return -1;
}
dev->run->pictures[index].size = 0;
PICTURE(size) = 0;
omx->output_available = false;
omx->input_required = true;
@@ -193,16 +198,12 @@ int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev,
if (omx->output_available) {
omx->output_available = false;
memcpy(
dev->run->pictures[index].data + dev->run->pictures[index].size,
omx->output_buffer->pBuffer + omx->output_buffer->nOffset,
omx->output_buffer->nFilledLen
);
assert(dev->run->pictures[index].size + omx->output_buffer->nFilledLen <= dev->run->max_picture_size);
dev->run->pictures[index].size += omx->output_buffer->nFilledLen;
assert(PICTURE(size) + OUT(nFilledLen) <= dev->run->max_picture_size);
memcpy(PICTURE(data) + PICTURE(size), OUT(pBuffer) + OUT(nOffset), OUT(nFilledLen));
PICTURE(size) += OUT(nFilledLen);
if (omx->output_buffer->nFlags & OMX_BUFFERFLAG_ENDOFFRAME) {
omx->output_buffer->nFlags = 0;
if (OUT(nFlags) & OMX_BUFFERFLAG_ENDOFFRAME) {
OUT(nFlags) = 0;
break;
}
@@ -215,18 +216,18 @@ int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev,
if (omx->input_required) {
omx->input_required = false;
if (pos == dev->run->hw_buffers[index].length) {
if (pos == HW_BUFFER(length)) {
continue;
}
memcpy(omx->input_buffer->pBuffer, dev->run->hw_buffers[index].start + pos, slice_size);
omx->input_buffer->nOffset = 0;
omx->input_buffer->nFilledLen = slice_size;
memcpy(IN(pBuffer), HW_BUFFER(start) + pos, slice_size);
IN(nOffset) = 0;
IN(nFilledLen) = slice_size;
pos += slice_size;
if (pos + slice_size > dev->run->hw_buffers[index].length) {
slice_size = dev->run->hw_buffers[index].length - pos;
if (pos + slice_size > HW_BUFFER(length)) {
slice_size = HW_BUFFER(length) - pos;
}
if ((error = OMX_EmptyThisBuffer(omx->encoder, omx->input_buffer)) != OMX_ErrorNone) {
@@ -237,6 +238,11 @@ int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev,
vcos_semaphore_wait(&omx->handler_lock);
}
# undef OUT
# undef IN
# undef HW_BUFFER
# undef PICTURE
return 0;
}

View File

@@ -116,9 +116,12 @@ void stream_loop(struct stream_t *stream) {
LOG_PERF("##### Raw frame accepted; worker = %u", free_worker_number);
} else {
for (unsigned number = 0; number < stream->dev->n_workers; ++number) {
if (!pool.workers[number].has_job && (free_worker_number == -1
|| pool.workers[free_worker_number].job_start_time < pool.workers[number].job_start_time
)) {
if (
!pool.workers[number].has_job && (
free_worker_number == -1
|| pool.workers[free_worker_number].job_start_time < pool.workers[number].job_start_time
)
) {
free_worker_number = number;
break;
}
@@ -134,14 +137,14 @@ void stream_loop(struct stream_t *stream) {
break;
}
# define INIT_FD_SET(_set) \
fd_set _set; FD_ZERO(&_set); FD_SET(stream->dev->run->fd, &_set);
# define INIT_FD_SET(_set) \
fd_set _set; FD_ZERO(&_set); FD_SET(stream->dev->run->fd, &_set);
INIT_FD_SET(read_fds);
INIT_FD_SET(write_fds);
INIT_FD_SET(error_fds);
# undef INIT_FD_SET
# undef INIT_FD_SET
struct timeval timeout;
timeout.tv_sec = stream->dev->timeout;
@@ -216,30 +219,34 @@ void stream_loop(struct stream_t *stream) {
LOG_VERBOSE("Fluency: delay=%.03Lf; grab_after=%.03Lf", fluency_delay, grab_after);
}
# define FREE_WORKER(_next) pool.workers[free_worker_number]._next
LOG_DEBUG("Grabbed a new frame to buffer %u", buf_info.index);
pool.workers[free_worker_number].ctx.buf_info = buf_info;
FREE_WORKER(ctx.buf_info) = buf_info;
if (!oldest_worker) {
oldest_worker = &pool.workers[free_worker_number];
last_worker = oldest_worker;
} else {
if (pool.workers[free_worker_number].order_next) {
pool.workers[free_worker_number].order_next->order_prev = pool.workers[free_worker_number].order_prev;
if (FREE_WORKER(order_next)) {
FREE_WORKER(order_next->order_prev) = FREE_WORKER(order_prev);
}
if (pool.workers[free_worker_number].order_prev) {
pool.workers[free_worker_number].order_prev->order_next = pool.workers[free_worker_number].order_next;
if (FREE_WORKER(order_prev)) {
FREE_WORKER(order_prev->order_next) = FREE_WORKER(order_next);
}
pool.workers[free_worker_number].order_prev = last_worker;
FREE_WORKER(order_prev) = last_worker;
last_worker->order_next = &pool.workers[free_worker_number];
last_worker = &pool.workers[free_worker_number];
}
last_worker->order_next = NULL;
A_PTHREAD_M_LOCK(&pool.workers[free_worker_number].has_job_mutex);
pool.workers[free_worker_number].ctx.buf_index = buf_info.index;
pool.workers[free_worker_number].has_job = true;
A_PTHREAD_M_UNLOCK(&pool.workers[free_worker_number].has_job_mutex);
A_PTHREAD_C_SIGNAL(&pool.workers[free_worker_number].has_job_cond);
A_PTHREAD_M_LOCK(&FREE_WORKER(has_job_mutex));
FREE_WORKER(ctx.buf_index) = buf_info.index;
FREE_WORKER(has_job) = true;
A_PTHREAD_M_UNLOCK(&FREE_WORKER(has_job_mutex));
A_PTHREAD_C_SIGNAL(&FREE_WORKER(has_job_cond));
# undef FREE_WORKER
A_PTHREAD_M_LOCK(&pool.free_workers_mutex);
pool.free_workers -= 1;
@@ -319,11 +326,15 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker
long double soft_delay;
for (unsigned number = 0; number < dev->n_workers; ++number) {
A_PTHREAD_M_LOCK(&pool->workers[number].last_comp_time_mutex);
if (pool->workers[number].last_comp_time > 0) {
sum_comp_time += pool->workers[number].last_comp_time;
# define WORKER(_next) pool->workers[number]._next
A_PTHREAD_M_LOCK(&WORKER(last_comp_time_mutex));
if (WORKER(last_comp_time) > 0) {
sum_comp_time += WORKER(last_comp_time);
}
A_PTHREAD_M_UNLOCK(&pool->workers[number].last_comp_time_mutex);
A_PTHREAD_M_UNLOCK(&WORKER(last_comp_time_mutex));
# undef WORKER
}
avg_comp_time = sum_comp_time / dev->n_workers; // Среднее время работы воркеров
@@ -388,12 +399,13 @@ static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *po
A_PTHREAD_C_INIT(&pool->free_workers_cond);
for (unsigned number = 0; number < dev->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next
# define CTX(_next) WORKER(ctx._next)
pool->free_workers += 1;
A_PTHREAD_M_INIT(&pool->workers[number].has_job_mutex);
A_PTHREAD_C_INIT(&pool->workers[number].has_job_cond);
# define CTX(_next) pool->workers[number].ctx._next
A_PTHREAD_M_INIT(&WORKER(has_job_mutex));
A_PTHREAD_C_INIT(&WORKER(has_job_cond));
CTX(number) = number;
CTX(dev) = dev;
@@ -402,22 +414,23 @@ static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *po
CTX(encoder) = pool->encoder;
CTX(last_comp_time_mutex) = &pool->workers[number].last_comp_time_mutex;
CTX(last_comp_time) = &pool->workers[number].last_comp_time;
CTX(last_comp_time_mutex) = &WORKER(last_comp_time_mutex);
CTX(last_comp_time) = &WORKER(last_comp_time);
CTX(has_job_mutex) = &pool->workers[number].has_job_mutex;
CTX(has_job) = &pool->workers[number].has_job;
CTX(job_failed) = &pool->workers[number].job_failed;
CTX(job_start_time) = &pool->workers[number].job_start_time;
CTX(has_job_cond) = &pool->workers[number].has_job_cond;
CTX(has_job_mutex) = &WORKER(has_job_mutex);
CTX(has_job) = &WORKER(has_job);
CTX(job_failed) = &WORKER(job_failed);
CTX(job_start_time) = &WORKER(job_start_time);
CTX(has_job_cond) = &WORKER(has_job_cond);
CTX(free_workers_mutex) = &pool->free_workers_mutex;
CTX(free_workers) = &pool->free_workers;
CTX(free_workers_cond) = &pool->free_workers_cond;
# undef CTX
A_PTHREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&WORKER(ctx));
A_PTHREAD_CREATE(&pool->workers[number].tid, _stream_worker_thread, (void *)&pool->workers[number].ctx);
# undef CTX
# undef WORKER
}
}
@@ -432,9 +445,9 @@ static void *_stream_worker_thread(void *v_ctx) {
A_PTHREAD_C_WAIT_TRUE(*ctx->has_job, ctx->has_job_cond, ctx->has_job_mutex);
A_PTHREAD_M_UNLOCK(ctx->has_job_mutex);
# define PICTURE(_next) ctx->dev->run->pictures[ctx->buf_index]._next
if (!*ctx->workers_stop) {
# define PICTURE(_next) ctx->dev->run->pictures[ctx->buf_index]._next
LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", ctx->number, ctx->buf_index);
PICTURE(encode_begin_time) = get_now_monotonic();
@@ -461,9 +474,9 @@ static void *_stream_worker_thread(void *v_ctx) {
*ctx->job_failed = true;
*ctx->has_job = false;
}
}
# undef PICTURE
# undef PICTURE
}
A_PTHREAD_M_LOCK(ctx->free_workers_mutex);
*ctx->free_workers += 1;
@@ -481,14 +494,18 @@ static void _stream_destroy_workers(struct device_t *dev, struct workers_pool_t
*pool->workers_stop = true;
for (unsigned number = 0; number < dev->n_workers; ++number) {
A_PTHREAD_M_LOCK(&pool->workers[number].has_job_mutex);
pool->workers[number].has_job = true; // Final job: die
A_PTHREAD_M_UNLOCK(&pool->workers[number].has_job_mutex);
A_PTHREAD_C_SIGNAL(&pool->workers[number].has_job_cond);
# define WORKER(_next) pool->workers[number]._next
A_PTHREAD_JOIN(pool->workers[number].tid);
A_PTHREAD_M_DESTROY(&pool->workers[number].has_job_mutex);
A_PTHREAD_C_DESTROY(&pool->workers[number].has_job_cond);
A_PTHREAD_M_LOCK(&WORKER(has_job_mutex));
WORKER(has_job) = true; // Final job: die
A_PTHREAD_M_UNLOCK(&WORKER(has_job_mutex));
A_PTHREAD_C_SIGNAL(&WORKER(has_job_cond));
A_PTHREAD_JOIN(WORKER(tid));
A_PTHREAD_M_DESTROY(&WORKER(has_job_mutex));
A_PTHREAD_C_DESTROY(&WORKER(has_job_cond));
# undef WORKER
}
A_PTHREAD_M_DESTROY(&pool->free_workers_mutex);