From 968df5d82bf2351af332ffc852185466a851e71f Mon Sep 17 00:00:00 2001 From: Devaev Maxim Date: Mon, 17 Sep 2018 01:11:34 +0300 Subject: [PATCH] refactoring --- src/capture.c | 65 ++++++++++++++++++++++----------------------------- src/device.c | 8 +++---- src/jpeg.c | 2 +- src/main.c | 6 ++--- src/tools.h | 22 ++++++++++++++++- 5 files changed, 55 insertions(+), 48 deletions(-) diff --git a/src/capture.c b/src/capture.c index ef36d94..b95ff28 100644 --- a/src/capture.c +++ b/src/capture.c @@ -74,11 +74,9 @@ void capture_loop(struct device_t *dev, sig_atomic_t *volatile global_stop) { if (FD_ISSET(dev->run->fd, &read_fds)) { LOG_DEBUG("Frame is ready, waiting for workers ..."); - assert(!pthread_mutex_lock(&pool.has_free_workers_mutex)); - while (!pool.has_free_workers) { - assert(!pthread_cond_wait(&pool.has_free_workers_cond, &pool.has_free_workers_mutex)); - } - assert(!pthread_mutex_unlock(&pool.has_free_workers_mutex)); + A_PTHREAD_M_LOCK(&pool.has_free_workers_mutex); + A_PTHREAD_C_WAIT_TRUE(pool.has_free_workers, &pool.has_free_workers_cond, &pool.has_free_workers_mutex); + A_PTHREAD_M_UNLOCK(&pool.has_free_workers_mutex); struct v4l2_buffer buf_info; @@ -109,10 +107,10 @@ void capture_loop(struct device_t *dev, sig_atomic_t *volatile global_stop) { LOG_DEBUG("Grabbed a new frame to buffer %d", buf_info.index); pool.workers[buf_info.index].ctx.buf_info = buf_info; - assert(!pthread_mutex_lock(&pool.workers[buf_info.index].has_job_mutex)); + A_PTHREAD_M_LOCK(&pool.workers[buf_info.index].has_job_mutex); pool.workers[buf_info.index].has_job = true; - assert(!pthread_mutex_unlock(&pool.workers[buf_info.index].has_job_mutex)); - assert(!pthread_cond_signal(&pool.workers[buf_info.index].has_job_cond)); + A_PTHREAD_M_UNLOCK(&pool.workers[buf_info.index].has_job_mutex); + A_PTHREAD_C_SIGNAL(&pool.workers[buf_info.index].has_job_cond); pass_frame: {} // FIXME: for future mjpg support @@ -183,14 +181,14 @@ static void _capture_init_workers(struct device_t *dev, struct workers_pool_t *p LOG_DEBUG("Spawning %d workers ...", dev->run->n_buffers); *pool->workers_stop = false; - assert((pool->workers = calloc(dev->run->n_buffers, sizeof(*pool->workers)))); + A_CALLOC(pool->workers, dev->run->n_buffers, sizeof(*pool->workers)); - assert(!pthread_mutex_init(&pool->has_free_workers_mutex, NULL)); - assert(!pthread_cond_init(&pool->has_free_workers_cond, NULL)); + A_PTHREAD_M_INIT(&pool->has_free_workers_mutex); + A_PTHREAD_C_INIT(&pool->has_free_workers_cond); for (unsigned index = 0; index < dev->run->n_buffers; ++index) { - assert(!pthread_mutex_init(&pool->workers[index].has_job_mutex, NULL)); - assert(!pthread_cond_init(&pool->workers[index].has_job_cond, NULL)); + A_PTHREAD_M_INIT(&pool->workers[index].has_job_mutex); + A_PTHREAD_C_INIT(&pool->workers[index].has_job_cond); pool->workers[index].ctx.index = index; pool->workers[index].ctx.dev = dev; @@ -208,12 +206,7 @@ static void _capture_init_workers(struct device_t *dev, struct workers_pool_t *p pool->workers[index].ctx.has_free_workers = &pool->has_free_workers; pool->workers[index].ctx.has_free_workers_cond = &pool->has_free_workers_cond; - assert(!pthread_create( - &pool->workers[index].tid, - NULL, - _capture_worker_thread, - (void *)&pool->workers[index].ctx - )); + A_PTHREAD_CREATE(&pool->workers[index].tid, _capture_worker_thread, (void *)&pool->workers[index].ctx); } } @@ -223,17 +216,15 @@ static void *_capture_worker_thread(void *v_ctx_ptr) { LOG_INFO("Hello! I am a worker #%d ^_^", ctx->index); while (!*ctx->global_stop && !*ctx->workers_stop) { - assert(!pthread_mutex_lock(ctx->has_free_workers_mutex)); + A_PTHREAD_M_LOCK(ctx->has_free_workers_mutex); *ctx->has_free_workers = true; - assert(!pthread_mutex_unlock(ctx->has_free_workers_mutex)); - assert(!pthread_cond_signal(ctx->has_free_workers_cond)); + A_PTHREAD_M_UNLOCK(ctx->has_free_workers_mutex); + A_PTHREAD_C_SIGNAL(ctx->has_free_workers_cond); LOG_DEBUG("Worker %d waiting for a new job ...", ctx->index); - assert(!pthread_mutex_lock(ctx->has_job_mutex)); - while (!*ctx->has_job) { - assert(!pthread_cond_wait(ctx->has_job_cond, ctx->has_job_mutex)); - } - assert(!pthread_mutex_unlock(ctx->has_job_mutex)); + A_PTHREAD_M_LOCK(ctx->has_job_mutex); + A_PTHREAD_C_WAIT_TRUE(*ctx->has_job, ctx->has_job_cond, ctx->has_job_mutex); + A_PTHREAD_M_UNLOCK(ctx->has_job_mutex); if (!*ctx->workers_stop) { int compressed; @@ -259,9 +250,9 @@ static void *_capture_worker_thread(void *v_ctx_ptr) { last_comp_time = 0; } - assert(!pthread_mutex_lock(ctx->last_comp_time_mutex)); + A_PTHREAD_M_LOCK(ctx->last_comp_time_mutex); *ctx->last_comp_time = last_comp_time; - assert(!pthread_mutex_unlock(ctx->last_comp_time_mutex)); + A_PTHREAD_M_UNLOCK(ctx->last_comp_time_mutex); LOG_INFO("Compressed JPEG size=%d; time=%LG (worker %d)", compressed, last_comp_time, ctx->index); } @@ -276,18 +267,18 @@ static void _capture_destroy_workers(struct device_t *dev, struct workers_pool_t if (pool->workers) { *pool->workers_stop = true; for (unsigned index = 0; index < dev->run->n_buffers; ++index) { - assert(!pthread_mutex_lock(&pool->workers[index].has_job_mutex)); + A_PTHREAD_M_LOCK(&pool->workers[index].has_job_mutex); pool->workers[index].has_job = true; // Final job: die - assert(!pthread_mutex_unlock(&pool->workers[index].has_job_mutex)); - assert(!pthread_cond_signal(&pool->workers[index].has_job_cond)); + A_PTHREAD_M_UNLOCK(&pool->workers[index].has_job_mutex); + A_PTHREAD_C_SIGNAL(&pool->workers[index].has_job_cond); - assert(!pthread_join(pool->workers[index].tid, NULL)); - assert(!pthread_mutex_destroy(&pool->workers[index].has_job_mutex)); - assert(!pthread_cond_destroy(&pool->workers[index].has_job_cond)); + A_PTHREAD_JOIN(pool->workers[index].tid); + A_PTHREAD_M_DESTROY(&pool->workers[index].has_job_mutex); + A_PTHREAD_C_DESTROY(&pool->workers[index].has_job_cond); } - assert(!pthread_cond_destroy(&pool->has_free_workers_cond)); - assert(!pthread_mutex_destroy(&pool->has_free_workers_mutex)); + A_PTHREAD_M_DESTROY(&pool->has_free_workers_mutex); + A_PTHREAD_C_DESTROY(&pool->has_free_workers_cond); free(pool->workers); } diff --git a/src/device.c b/src/device.c index 871a63c..cb99d4c 100644 --- a/src/device.c +++ b/src/device.c @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include @@ -333,8 +332,7 @@ static int _device_open_mmap(struct device_t *dev) { LOG_DEBUG("Allocating device buffers ..."); - assert((dev->run->buffers = calloc(req.count, sizeof(*dev->run->buffers)))); - + A_CALLOC(dev->run->buffers, req.count, sizeof(*dev->run->buffers)); for (dev->run->n_buffers = 0; dev->run->n_buffers < req.count; ++dev->run->n_buffers) { struct v4l2_buffer buf; @@ -381,13 +379,13 @@ static int _device_open_queue_buffers(struct device_t *dev) { static void _device_open_alloc_picbufs(struct device_t *dev) { LOG_DEBUG("Allocating picture buffers ..."); - assert((dev->run->pictures = calloc(dev->run->n_buffers, sizeof(*dev->run->pictures)))); + A_CALLOC(dev->run->pictures, dev->run->n_buffers, sizeof(*dev->run->pictures)); unsigned picture_size = dev->run->width * dev->run->height << 1; for (unsigned index = 0; index < dev->run->n_buffers; ++index) { LOG_DEBUG("Allocating picture buffer %d ...", index); - assert((dev->run->pictures[index] = malloc(picture_size))); + A_MALLOC(dev->run->pictures[index], picture_size); } } diff --git a/src/jpeg.c b/src/jpeg.c index c7d9024..3c0a94a 100644 --- a/src/jpeg.c +++ b/src/jpeg.c @@ -73,7 +73,7 @@ int jpeg_compress_buffer(struct device_t *dev, int index) { unsigned char *line_buffer; int written = -1; - assert((line_buffer = calloc(dev->run->width * 3, sizeof(unsigned char)))); + A_CALLOC(line_buffer, dev->run->width * 3, sizeof(unsigned char)); jpeg.err = jpeg_std_error(&jpeg_error); jpeg_create_compress(&jpeg); diff --git a/src/main.c b/src/main.c index 50985ef..35a4776 100644 --- a/src/main.c +++ b/src/main.c @@ -36,7 +36,6 @@ static const struct option _long_opts[] = { {NULL, 0, NULL, 0}, }; - static void _help(int exit_code) { printf("No manual yet\n"); exit(exit_code); @@ -90,7 +89,6 @@ static void _parse_options(int argc, char *argv[], struct device_t *dev) { } } - struct threads_context { struct device_t *dev; sig_atomic_t *volatile global_stop; @@ -139,8 +137,8 @@ int main(int argc, char *argv[]) { device_init(&dev, &run); _parse_options(argc, argv, &dev); - pthread_create(&capture_loop_tid, NULL, _capture_loop_thread, (void *)&ctx); - pthread_join(capture_loop_tid, NULL); + A_PTHREAD_CREATE(&capture_loop_tid, _capture_loop_thread, (void *)&ctx); + A_PTHREAD_JOIN(capture_loop_tid); return 0; } diff --git a/src/tools.h b/src/tools.h index f0d422f..70322ed 100644 --- a/src/tools.h +++ b/src/tools.h @@ -1,11 +1,13 @@ #pragma once #include +#include #include #include #include #include #include +#include #include #include #include @@ -46,7 +48,25 @@ bool debug; printf("-- ERROR " INNER_LOG_PL " -- " _x_msg ": %s\n", INNER_LOG_PARAMS, ##__VA_ARGS__, _buf); } -#define MEMSET_ZERO(_x_obj) memset(&(_x_obj), 0, sizeof(_x_obj)) +#define A_PTHREAD_CREATE(_tid, _func, _arg) assert(!pthread_create(_tid, NULL, _func, _arg)) +#define A_PTHREAD_JOIN(_tid) assert(!pthread_join(_tid, NULL)) + +#define A_PTHREAD_M_INIT(_mutex) assert(!pthread_mutex_init(_mutex, NULL)) +#define A_PTHREAD_M_DESTROY(_mutex) assert(!pthread_mutex_destroy(_mutex)) +#define A_PTHREAD_M_LOCK(...) assert(!pthread_mutex_lock(__VA_ARGS__)) +#define A_PTHREAD_M_UNLOCK(...) assert(!pthread_mutex_unlock(__VA_ARGS__)) + +#define A_PTHREAD_C_INIT(_cond) assert(!pthread_cond_init(_cond, NULL)) +#define A_PTHREAD_C_DESTROY(_cond) assert(!pthread_cond_destroy(_cond)) +#define A_PTHREAD_C_SIGNAL(...) assert(!pthread_cond_signal(__VA_ARGS__)) +#define A_PTHREAD_C_WAIT_TRUE(_var, _cond, _mutex) \ + { while(!_var) assert(!pthread_cond_wait(_cond, _mutex)); } + + +#define A_CALLOC(_dest, _nmemb, _size) assert((_dest = calloc(_nmemb, _size))) +#define A_MALLOC(_dest, _size) assert((_dest = malloc(_size))); +#define MEMSET_ZERO(_x_obj) memset(&(_x_obj), 0, sizeof(_x_obj)) + #define INLINE inline __attribute__((always_inline))