refactoring

This commit is contained in:
Devaev Maxim 2018-09-17 00:33:21 +03:00
parent 4e89ac2624
commit 2964c63249
2 changed files with 39 additions and 39 deletions

View File

@ -74,11 +74,11 @@ void capture_loop(struct device_t *dev, sig_atomic_t *volatile global_stop) {
if (FD_ISSET(dev->run->fd, &read_fds)) {
LOG_DEBUG("Frame is ready, waiting for workers ...");
assert(pthread_mutex_lock(&pool.has_free_workers_mutex) == 0);
assert(!pthread_mutex_lock(&pool.has_free_workers_mutex));
while (!pool.has_free_workers) {
assert(pthread_cond_wait(&pool.has_free_workers_cond, &pool.has_free_workers_mutex) == 0);
assert(!pthread_cond_wait(&pool.has_free_workers_cond, &pool.has_free_workers_mutex));
}
assert(pthread_mutex_unlock(&pool.has_free_workers_mutex) == 0);
assert(!pthread_mutex_unlock(&pool.has_free_workers_mutex));
struct v4l2_buffer buf_info;
@ -109,10 +109,10 @@ void capture_loop(struct device_t *dev, sig_atomic_t *volatile global_stop) {
LOG_DEBUG("Grabbed a new frame to buffer %d", buf_info.index);
pool.workers[buf_info.index].ctx.buf_info = buf_info;
assert(pthread_mutex_lock(&pool.workers[buf_info.index].has_job_mutex) == 0);
assert(!pthread_mutex_lock(&pool.workers[buf_info.index].has_job_mutex));
pool.workers[buf_info.index].has_job = true;
assert(pthread_mutex_unlock(&pool.workers[buf_info.index].has_job_mutex) == 0);
assert(pthread_cond_signal(&pool.workers[buf_info.index].has_job_cond) == 0);
assert(!pthread_mutex_unlock(&pool.workers[buf_info.index].has_job_mutex));
assert(!pthread_cond_signal(&pool.workers[buf_info.index].has_job_cond));
pass_frame:
{} // FIXME: for future mjpg support
@ -185,12 +185,12 @@ static void _capture_init_workers(struct device_t *dev, struct workers_pool_t *p
*pool->workers_stop = false;
assert((pool->workers = calloc(dev->run->n_buffers, sizeof(*pool->workers))));
assert(pthread_mutex_init(&pool->has_free_workers_mutex, NULL) == 0);
assert(pthread_cond_init(&pool->has_free_workers_cond, NULL) == 0);
assert(!pthread_mutex_init(&pool->has_free_workers_mutex, NULL));
assert(!pthread_cond_init(&pool->has_free_workers_cond, NULL));
for (unsigned index = 0; index < dev->run->n_buffers; ++index) {
assert(pthread_mutex_init(&pool->workers[index].has_job_mutex, NULL) == 0);
assert(pthread_cond_init(&pool->workers[index].has_job_cond, NULL) == 0);
assert(!pthread_mutex_init(&pool->workers[index].has_job_mutex, NULL));
assert(!pthread_cond_init(&pool->workers[index].has_job_cond, NULL));
pool->workers[index].ctx.index = index;
pool->workers[index].ctx.dev = dev;
@ -208,12 +208,12 @@ static void _capture_init_workers(struct device_t *dev, struct workers_pool_t *p
pool->workers[index].ctx.has_free_workers = &pool->has_free_workers;
pool->workers[index].ctx.has_free_workers_cond = &pool->has_free_workers_cond;
assert(pthread_create(
assert(!pthread_create(
&pool->workers[index].tid,
NULL,
_capture_worker_thread,
(void *)&pool->workers[index].ctx
) == 0);
));
}
}
@ -223,17 +223,17 @@ static void *_capture_worker_thread(void *v_ctx_ptr) {
LOG_INFO("Hello! I am a worker #%d ^_^", ctx->index);
while (!*ctx->global_stop && !*ctx->workers_stop) {
assert(pthread_mutex_lock(ctx->has_free_workers_mutex) == 0);
assert(!pthread_mutex_lock(ctx->has_free_workers_mutex));
*ctx->has_free_workers = true;
assert(pthread_mutex_unlock(ctx->has_free_workers_mutex) == 0);
assert(pthread_cond_signal(ctx->has_free_workers_cond) == 0);
assert(!pthread_mutex_unlock(ctx->has_free_workers_mutex));
assert(!pthread_cond_signal(ctx->has_free_workers_cond));
LOG_DEBUG("Worker %d waiting for a new job ...", ctx->index);
assert(pthread_mutex_lock(ctx->has_job_mutex) == 0);
while (!(*ctx->has_job)) {
assert(pthread_cond_wait(ctx->has_job_cond, ctx->has_job_mutex) == 0);
assert(!pthread_mutex_lock(ctx->has_job_mutex));
while (!*ctx->has_job) {
assert(!pthread_cond_wait(ctx->has_job_cond, ctx->has_job_mutex));
}
assert(pthread_mutex_unlock(ctx->has_job_mutex) == 0);
assert(!pthread_mutex_unlock(ctx->has_job_mutex));
if (!*ctx->workers_stop) {
int compressed;
@ -249,7 +249,7 @@ static void *_capture_worker_thread(void *v_ctx_ptr) {
compressed = jpeg_compress_buffer(ctx->dev, ctx->index); // FIXME
assert(_capture_release_buffer(ctx->dev, &ctx->buf_info) == 0); // FIXME
assert(!_capture_release_buffer(ctx->dev, &ctx->buf_info)); // FIXME
*ctx->has_job = false;
now_ms(&stop_sec, &stop_msec);
@ -259,9 +259,9 @@ static void *_capture_worker_thread(void *v_ctx_ptr) {
last_comp_time = 0;
}
assert(pthread_mutex_lock(ctx->last_comp_time_mutex) == 0);
assert(!pthread_mutex_lock(ctx->last_comp_time_mutex));
*ctx->last_comp_time = last_comp_time;
assert(pthread_mutex_unlock(ctx->last_comp_time_mutex) == 0);
assert(!pthread_mutex_unlock(ctx->last_comp_time_mutex));
LOG_INFO("Compressed JPEG size=%d; time=%LG (worker %d)", compressed, last_comp_time, ctx->index);
}
@ -276,18 +276,18 @@ static void _capture_destroy_workers(struct device_t *dev, struct workers_pool_t
if (pool->workers) {
*pool->workers_stop = true;
for (unsigned index = 0; index < dev->run->n_buffers; ++index) {
assert(pthread_mutex_lock(&pool->workers[index].has_job_mutex) == 0);
assert(!pthread_mutex_lock(&pool->workers[index].has_job_mutex));
pool->workers[index].has_job = true; // Final job: die
assert(pthread_mutex_unlock(&pool->workers[index].has_job_mutex) == 0);
assert(pthread_cond_signal(&pool->workers[index].has_job_cond) == 0);
assert(!pthread_mutex_unlock(&pool->workers[index].has_job_mutex));
assert(!pthread_cond_signal(&pool->workers[index].has_job_cond));
assert(pthread_join(pool->workers[index].tid, NULL) == 0);
assert(pthread_mutex_destroy(&pool->workers[index].has_job_mutex) == 0);
assert(pthread_cond_destroy(&pool->workers[index].has_job_cond) == 0);
assert(!pthread_join(pool->workers[index].tid, NULL));
assert(!pthread_mutex_destroy(&pool->workers[index].has_job_mutex));
assert(!pthread_cond_destroy(&pool->workers[index].has_job_cond));
}
assert(pthread_cond_destroy(&pool->has_free_workers_cond) == 0);
assert(pthread_mutex_destroy(&pool->has_free_workers_mutex) == 0);
assert(!pthread_cond_destroy(&pool->has_free_workers_cond));
assert(!pthread_mutex_destroy(&pool->has_free_workers_mutex));
free(pool->workers);
}

View File

@ -100,10 +100,10 @@ static void *_capture_loop_thread(void *v_ctx_ptr) {
struct threads_context *ctx = (struct threads_context *)v_ctx_ptr;
sigset_t mask;
assert(sigemptyset(&mask) == 0);
assert(sigaddset(&mask, SIGINT) == 0);
assert(sigaddset(&mask, SIGTERM) == 0);
assert(pthread_sigmask(SIG_BLOCK, &mask, NULL) == 0);
assert(!sigemptyset(&mask));
assert(!sigaddset(&mask, SIGINT));
assert(!sigaddset(&mask, SIGTERM));
assert(!pthread_sigmask(SIG_BLOCK, &mask, NULL));
capture_loop(ctx->dev, (sig_atomic_t *volatile)ctx->global_stop);
return NULL;
@ -125,16 +125,16 @@ int main(int argc, char *argv[]) {
struct sigaction sig_act;
MEMSET_ZERO(sig_act);
assert(sigemptyset(&sig_act.sa_mask) == 0);
assert(!sigemptyset(&sig_act.sa_mask));
sig_act.sa_handler = _interrupt_handler;
assert(sigaddset(&sig_act.sa_mask, SIGINT) == 0);
assert(sigaddset(&sig_act.sa_mask, SIGTERM) == 0);
assert(!sigaddset(&sig_act.sa_mask, SIGINT));
assert(!sigaddset(&sig_act.sa_mask, SIGTERM));
LOG_INFO("Installing SIGINT handler ...");
assert(sigaction(SIGINT, &sig_act, NULL) == 0);
assert(!sigaction(SIGINT, &sig_act, NULL));
LOG_INFO("Installing SIGTERM handler ...");
assert(sigaction(SIGTERM, &sig_act, NULL) == 0);
assert(!sigaction(SIGTERM, &sig_act, NULL));
device_init(&dev, &run);
_parse_options(argc, argv, &dev);