refactoring

This commit is contained in:
Devaev Maxim
2019-03-15 14:21:50 +03:00
parent 50158397a0
commit bac7a2595e
2 changed files with 58 additions and 82 deletions

View File

@@ -45,7 +45,7 @@ static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *poo
static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool);
static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool);
static void *_stream_worker_thread(void *v_ctx);
static void *_stream_worker_thread(void *v_worker);
static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool_t *pool);
static int _stream_control(struct device_t *dev, bool enable);
@@ -109,14 +109,14 @@ void stream_loop(struct stream_t *stream) {
A_COND_WAIT_TRUE(pool.free_workers, &pool.free_workers_cond, &pool.free_workers_mutex);
A_MUTEX_UNLOCK(&pool.free_workers_mutex);
if (oldest_worker && !oldest_worker->has_job && oldest_worker->ctx.buf_index >= 0) {
if (oldest_worker && !oldest_worker->has_job && oldest_worker->buf_index >= 0) {
if (oldest_worker->job_failed) {
break;
}
_stream_expose_picture(stream, oldest_worker->ctx.buf_index);
_stream_expose_picture(stream, oldest_worker->buf_index);
free_worker_number = oldest_worker->ctx.number;
free_worker_number = oldest_worker->number;
oldest_worker = oldest_worker->order_next;
LOG_PERF("##### Raw frame accepted; worker = %u", free_worker_number);
@@ -232,7 +232,7 @@ void stream_loop(struct stream_t *stream) {
# define FREE_WORKER(_next) pool.workers[free_worker_number]._next
LOG_DEBUG("Grabbed a new frame to buffer %u", buf_info.index);
FREE_WORKER(ctx.buf_info) = buf_info;
FREE_WORKER(buf_info) = buf_info;
if (!oldest_worker) {
oldest_worker = &pool.workers[free_worker_number];
@@ -251,7 +251,7 @@ void stream_loop(struct stream_t *stream) {
last_worker->order_next = NULL;
A_MUTEX_LOCK(&FREE_WORKER(has_job_mutex));
FREE_WORKER(ctx.buf_index) = buf_info.index;
FREE_WORKER(buf_index) = buf_info.index;
FREE_WORKER(has_job) = true;
A_MUTEX_UNLOCK(&FREE_WORKER(has_job_mutex));
A_COND_SIGNAL(&FREE_WORKER(has_job_cond));
@@ -413,89 +413,78 @@ static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t
for (unsigned number = 0; number < stream->dev->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next
# define CTX(_next) WORKER(ctx._next)
pool->free_workers += 1;
A_MUTEX_INIT(&WORKER(has_job_mutex));
A_COND_INIT(&WORKER(has_job_cond));
CTX(number) = number;
CTX(dev) = stream->dev;
CTX(proc_stop) = (sig_atomic_t *volatile)&(stream->proc->stop);
CTX(workers_stop) = pool->workers_stop;
WORKER(number) = number;
WORKER(proc_stop) = (sig_atomic_t *volatile)&(stream->proc->stop);
WORKER(workers_stop) = pool->workers_stop;
CTX(encoder) = pool->encoder;
WORKER(free_workers_mutex) = &pool->free_workers_mutex;
WORKER(free_workers) = &pool->free_workers;
WORKER(free_workers_cond) = &pool->free_workers_cond;
CTX(last_comp_time_mutex) = &WORKER(last_comp_time_mutex);
CTX(last_comp_time) = &WORKER(last_comp_time);
WORKER(dev) = stream->dev;
WORKER(encoder) = pool->encoder;
CTX(has_job_mutex) = &WORKER(has_job_mutex);
CTX(has_job) = &WORKER(has_job);
CTX(job_failed) = &WORKER(job_failed);
CTX(job_start_time) = &WORKER(job_start_time);
CTX(has_job_cond) = &WORKER(has_job_cond);
A_THREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&(pool->workers[number]));
CTX(free_workers_mutex) = &pool->free_workers_mutex;
CTX(free_workers) = &pool->free_workers;
CTX(free_workers_cond) = &pool->free_workers_cond;
A_THREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&WORKER(ctx));
# undef CTX
# undef WORKER
}
}
static void *_stream_worker_thread(void *v_ctx) {
struct worker_context_t *ctx = (struct worker_context_t *)v_ctx;
static void *_stream_worker_thread(void *v_worker) {
struct worker_t *worker = (struct worker_t *)v_worker;
LOG_DEBUG("Hello! I am a worker #%u ^_^", ctx->number);
LOG_DEBUG("Hello! I am a worker #%u ^_^", worker->number);
while (!*ctx->proc_stop && !*ctx->workers_stop) {
LOG_DEBUG("Worker %u waiting for a new job ...", ctx->number);
A_MUTEX_LOCK(ctx->has_job_mutex);
A_COND_WAIT_TRUE(*ctx->has_job, ctx->has_job_cond, ctx->has_job_mutex);
A_MUTEX_UNLOCK(ctx->has_job_mutex);
while (!*worker->proc_stop && !*worker->workers_stop) {
LOG_DEBUG("Worker %u waiting for a new job ...", worker->number);
A_MUTEX_LOCK(&worker->has_job_mutex);
A_COND_WAIT_TRUE(worker->has_job, &worker->has_job_cond, &worker->has_job_mutex);
A_MUTEX_UNLOCK(&worker->has_job_mutex);
if (!*ctx->workers_stop) {
# define PICTURE(_next) ctx->dev->run->pictures[ctx->buf_index]._next
if (!*worker->workers_stop) {
# define PICTURE(_next) worker->dev->run->pictures[worker->buf_index]._next
LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", ctx->number, ctx->buf_index);
LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", worker->number, worker->buf_index);
PICTURE(encode_begin_time) = get_now_monotonic();
if (encoder_compress_buffer(ctx->encoder, ctx->dev, ctx->number, ctx->buf_index) < 0) {
*ctx->job_failed = true;
if (encoder_compress_buffer(worker->encoder, worker->dev, worker->number, worker->buf_index) < 0) {
worker->job_failed = true;
}
PICTURE(encode_end_time) = get_now_monotonic();
if (_stream_release_buffer(ctx->dev, &ctx->buf_info) == 0) {
*ctx->job_start_time = PICTURE(encode_begin_time);
*ctx->has_job = false;
if (_stream_release_buffer(worker->dev, &worker->buf_info) == 0) {
worker->job_start_time = PICTURE(encode_begin_time);
worker->has_job = false;
long double last_comp_time = PICTURE(encode_end_time) - *ctx->job_start_time;
long double last_comp_time = PICTURE(encode_end_time) - worker->job_start_time;
A_MUTEX_LOCK(ctx->last_comp_time_mutex);
*ctx->last_comp_time = last_comp_time;
A_MUTEX_UNLOCK(ctx->last_comp_time_mutex);
A_MUTEX_LOCK(&worker->last_comp_time_mutex);
worker->last_comp_time = last_comp_time;
A_MUTEX_UNLOCK(&worker->last_comp_time_mutex);
LOG_VERBOSE("Compressed JPEG size=%zu; time=%0.3Lf; worker=%u; buffer=%u",
PICTURE(size), last_comp_time, ctx->number, ctx->buf_index);
PICTURE(size), last_comp_time, worker->number, worker->buf_index);
} else {
*ctx->job_failed = true;
*ctx->has_job = false;
worker->job_failed = true;
worker->has_job = false;
}
# undef PICTURE
}
A_MUTEX_LOCK(ctx->free_workers_mutex);
*ctx->free_workers += 1;
A_MUTEX_UNLOCK(ctx->free_workers_mutex);
A_COND_SIGNAL(ctx->free_workers_cond);
A_MUTEX_LOCK(worker->free_workers_mutex);
*worker->free_workers += 1;
A_MUTEX_UNLOCK(worker->free_workers_mutex);
A_COND_SIGNAL(worker->free_workers_cond);
}
LOG_DEBUG("Bye-bye (worker %u)", ctx->number);
LOG_DEBUG("Bye-bye (worker %u)", worker->number);
return NULL;
}

View File

@@ -31,45 +31,32 @@
#include "encoder.h"
struct worker_context_t {
struct worker_t {
pthread_t tid;
unsigned number;
struct device_t *dev;
int buf_index;
struct v4l2_buffer buf_info;
sig_atomic_t *volatile proc_stop;
bool *workers_stop;
struct encoder_t *encoder;
pthread_mutex_t last_comp_time_mutex;
long double last_comp_time;
pthread_mutex_t *last_comp_time_mutex;
long double *last_comp_time;
pthread_mutex_t *has_job_mutex;
bool *has_job;
bool *job_failed;
long double *job_start_time;
pthread_cond_t *has_job_cond;
pthread_mutex_t has_job_mutex;
int buf_index;
struct v4l2_buffer buf_info;
bool has_job;
bool job_failed;
long double job_start_time;
pthread_cond_t has_job_cond;
pthread_mutex_t *free_workers_mutex;
unsigned *free_workers;
pthread_cond_t *free_workers_cond;
};
struct worker_t {
struct worker_context_t ctx;
pthread_t tid;
struct worker_t *order_prev;
struct worker_t *order_next;
pthread_mutex_t last_comp_time_mutex;
long double last_comp_time;
pthread_mutex_t has_job_mutex;
bool has_job;
bool job_failed;
long double job_start_time;
pthread_cond_t has_job_cond;
struct worker_t *order_prev;
struct worker_t *order_next;
struct device_t *dev;
struct encoder_t *encoder;
};
struct workers_pool_t {