diff --git a/src/encoder.c b/src/encoder.c index 141b11c..2dbe5f3 100644 --- a/src/encoder.c +++ b/src/encoder.c @@ -57,7 +57,7 @@ struct encoder_t *encoder_init() { A_CALLOC(run, 1); run->type = ENCODER_TYPE_CPU; run->quality = 80; - A_PTHREAD_M_INIT(&run->mutex); + A_MUTEX_INIT(&run->mutex); A_CALLOC(encoder, 1); encoder->type = run->type; @@ -119,7 +119,7 @@ void encoder_destroy(struct encoder_t *encoder) { free(encoder->run->omxs); } # endif - A_PTHREAD_M_DESTROY(&encoder->run->mutex); + A_MUTEX_DESTROY(&encoder->run->mutex); free(encoder->run); free(encoder); } @@ -150,9 +150,9 @@ void encoder_prepare_live(struct encoder_t *encoder, struct device_t *dev) { && encoder->run->type != ENCODER_TYPE_HW ) { LOG_INFO("Switching to HW JPEG encoder because the input format is (M)JPEG"); - A_PTHREAD_M_LOCK(&encoder->run->mutex); + A_MUTEX_LOCK(&encoder->run->mutex); encoder->run->type = ENCODER_TYPE_HW; - A_PTHREAD_M_UNLOCK(&encoder->run->mutex); + A_MUTEX_UNLOCK(&encoder->run->mutex); } if (encoder->run->type == ENCODER_TYPE_HW) { @@ -161,9 +161,9 @@ void encoder_prepare_live(struct encoder_t *encoder, struct device_t *dev) { goto use_fallback; } if (hw_encoder_prepare_live(dev, encoder->quality) < 0) { - A_PTHREAD_M_LOCK(&encoder->run->mutex); + A_MUTEX_LOCK(&encoder->run->mutex); encoder->run->quality = 0; - A_PTHREAD_M_UNLOCK(&encoder->run->mutex); + A_MUTEX_UNLOCK(&encoder->run->mutex); LOG_INFO("Using JPEG quality: HW-default"); } } @@ -181,10 +181,10 @@ void encoder_prepare_live(struct encoder_t *encoder, struct device_t *dev) { return; use_fallback: - A_PTHREAD_M_LOCK(&encoder->run->mutex); + A_MUTEX_LOCK(&encoder->run->mutex); encoder->run->type = ENCODER_TYPE_CPU; encoder->run->quality = encoder->quality; - A_PTHREAD_M_UNLOCK(&encoder->run->mutex); + A_MUTEX_UNLOCK(&encoder->run->mutex); } #pragma GCC diagnostic ignored "-Wunused-label" @@ -213,10 +213,10 @@ int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, uns # pragma GCC diagnostic push use_fallback: LOG_INFO("Error while compressing, falling back to CPU"); - A_PTHREAD_M_LOCK(&encoder->run->mutex); + A_MUTEX_LOCK(&encoder->run->mutex); encoder->run->type = ENCODER_TYPE_CPU; encoder->run->quality = encoder->quality; - A_PTHREAD_M_UNLOCK(&encoder->run->mutex); + A_MUTEX_UNLOCK(&encoder->run->mutex); return -1; # pragma GCC diagnostic pop } diff --git a/src/http.c b/src/http.c index 68e10ad..cb16c63 100644 --- a/src/http.c +++ b/src/http.c @@ -268,10 +268,10 @@ static void _http_callback_state(struct evhttp_request *request, void *v_server) # define ENCODER(_next) server->run->stream->encoder->_next - A_PTHREAD_M_LOCK(&ENCODER(run->mutex)); + A_MUTEX_LOCK(&ENCODER(run->mutex)); encoder_run_type = ENCODER(run->type); encoder_run_quality = ENCODER(run->quality); - A_PTHREAD_M_UNLOCK(&ENCODER(run->mutex)); + A_MUTEX_UNLOCK(&ENCODER(run->mutex)); assert((buf = evbuffer_new())); @@ -653,11 +653,11 @@ static void _http_exposed_refresh(UNUSED int fd, UNUSED short what, void *v_serv bool picture_updated = false; # define UNLOCK_STREAM \ - { server->run->stream->updated = false; A_PTHREAD_M_UNLOCK(&server->run->stream->mutex); } + { server->run->stream->updated = false; A_MUTEX_UNLOCK(&server->run->stream->mutex); } if (server->run->stream->updated) { LOG_DEBUG("Refreshing HTTP exposed ..."); - A_PTHREAD_M_LOCK(&server->run->stream->mutex); + A_MUTEX_LOCK(&server->run->stream->mutex); if (server->run->stream->picture.size > 0) { // If online picture_updated = _expose_new_picture(server); UNLOCK_STREAM; diff --git a/src/main.c b/src/main.c index 468528c..1024c4e 100644 --- a/src/main.c +++ b/src/main.c @@ -379,10 +379,10 @@ int main(int argc, char *argv[]) { _ctx = &ctx; if ((exit_code = http_server_listen(server)) == 0) { - A_PTHREAD_CREATE(&stream_loop_tid, _stream_loop_thread, NULL); - A_PTHREAD_CREATE(&server_loop_tid, _server_loop_thread, NULL); - A_PTHREAD_JOIN(server_loop_tid); - A_PTHREAD_JOIN(stream_loop_tid); + A_THREAD_CREATE(&stream_loop_tid, _stream_loop_thread, NULL); + A_THREAD_CREATE(&server_loop_tid, _server_loop_thread, NULL); + A_THREAD_JOIN(server_loop_tid); + A_THREAD_JOIN(stream_loop_tid); } } diff --git a/src/stream.c b/src/stream.c index 0867393..fe4961d 100644 --- a/src/stream.c +++ b/src/stream.c @@ -63,13 +63,13 @@ struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) { A_CALLOC(stream, 1); stream->dev = dev; stream->encoder = encoder; - A_PTHREAD_M_INIT(&stream->mutex); + A_MUTEX_INIT(&stream->mutex); stream->proc = proc; return stream; } void stream_destroy(struct stream_t *stream) { - A_PTHREAD_M_DESTROY(&stream->mutex); + A_MUTEX_DESTROY(&stream->mutex); free(stream->proc); free(stream); } @@ -105,9 +105,9 @@ void stream_loop(struct stream_t *stream) { SEP_DEBUG('-'); LOG_DEBUG("Waiting for workers ..."); - A_PTHREAD_M_LOCK(&pool.free_workers_mutex); - A_PTHREAD_C_WAIT_TRUE(pool.free_workers, &pool.free_workers_cond, &pool.free_workers_mutex); - A_PTHREAD_M_UNLOCK(&pool.free_workers_mutex); + A_MUTEX_LOCK(&pool.free_workers_mutex); + A_COND_WAIT_TRUE(pool.free_workers, &pool.free_workers_cond, &pool.free_workers_mutex); + A_MUTEX_UNLOCK(&pool.free_workers_mutex); if (oldest_worker && !oldest_worker->has_job && oldest_worker->ctx.buf_index >= 0) { if (oldest_worker->job_failed) { @@ -250,17 +250,17 @@ void stream_loop(struct stream_t *stream) { } last_worker->order_next = NULL; - A_PTHREAD_M_LOCK(&FREE_WORKER(has_job_mutex)); + A_MUTEX_LOCK(&FREE_WORKER(has_job_mutex)); FREE_WORKER(ctx.buf_index) = buf_info.index; FREE_WORKER(has_job) = true; - A_PTHREAD_M_UNLOCK(&FREE_WORKER(has_job_mutex)); - A_PTHREAD_C_SIGNAL(&FREE_WORKER(has_job_cond)); + A_MUTEX_UNLOCK(&FREE_WORKER(has_job_mutex)); + A_COND_SIGNAL(&FREE_WORKER(has_job_cond)); # undef FREE_WORKER - A_PTHREAD_M_LOCK(&pool.free_workers_mutex); + A_MUTEX_LOCK(&pool.free_workers_mutex); pool.free_workers -= 1; - A_PTHREAD_M_UNLOCK(&pool.free_workers_mutex); + A_MUTEX_UNLOCK(&pool.free_workers_mutex); goto next_handlers; // Поток сам освободит буфер @@ -287,13 +287,13 @@ void stream_loop(struct stream_t *stream) { } } - A_PTHREAD_M_LOCK(&stream->mutex); + A_MUTEX_LOCK(&stream->mutex); stream->picture.size = 0; // On stream offline free(stream->picture.data); stream->width = 0; stream->height = 0; stream->updated = true; - A_PTHREAD_M_UNLOCK(&stream->mutex); + A_MUTEX_UNLOCK(&stream->mutex); } _stream_destroy_workers(stream, &pool); @@ -312,7 +312,7 @@ void stream_switch_slowdown(struct stream_t *stream, bool slowdown) { static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) { # define PICTURE(_next) stream->dev->run->pictures[buf_index]._next - A_PTHREAD_M_LOCK(&stream->mutex); + A_MUTEX_LOCK(&stream->mutex); stream->picture.size = PICTURE(size); stream->picture.allocated = PICTURE(allocated); @@ -327,7 +327,7 @@ static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) stream->height = stream->dev->run->height; stream->updated = true; - A_PTHREAD_M_UNLOCK(&stream->mutex); + A_MUTEX_UNLOCK(&stream->mutex); # undef PICTURE } @@ -341,11 +341,11 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker for (unsigned number = 0; number < dev->n_workers; ++number) { # define WORKER(_next) pool->workers[number]._next - A_PTHREAD_M_LOCK(&WORKER(last_comp_time_mutex)); + A_MUTEX_LOCK(&WORKER(last_comp_time_mutex)); if (WORKER(last_comp_time) > 0) { sum_comp_time += WORKER(last_comp_time); } - A_PTHREAD_M_UNLOCK(&WORKER(last_comp_time_mutex)); + A_MUTEX_UNLOCK(&WORKER(last_comp_time_mutex)); # undef WORKER } @@ -408,8 +408,8 @@ static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool->workers_stop = false; A_CALLOC(pool->workers, stream->dev->n_workers); - A_PTHREAD_M_INIT(&pool->free_workers_mutex); - A_PTHREAD_C_INIT(&pool->free_workers_cond); + A_MUTEX_INIT(&pool->free_workers_mutex); + A_COND_INIT(&pool->free_workers_cond); for (unsigned number = 0; number < stream->dev->n_workers; ++number) { # define WORKER(_next) pool->workers[number]._next @@ -417,8 +417,8 @@ static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t pool->free_workers += 1; - A_PTHREAD_M_INIT(&WORKER(has_job_mutex)); - A_PTHREAD_C_INIT(&WORKER(has_job_cond)); + A_MUTEX_INIT(&WORKER(has_job_mutex)); + A_COND_INIT(&WORKER(has_job_cond)); CTX(number) = number; CTX(dev) = stream->dev; @@ -440,7 +440,7 @@ static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t CTX(free_workers) = &pool->free_workers; CTX(free_workers_cond) = &pool->free_workers_cond; - A_PTHREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&WORKER(ctx)); + A_THREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&WORKER(ctx)); # undef CTX # undef WORKER @@ -454,9 +454,9 @@ static void *_stream_worker_thread(void *v_ctx) { while (!*ctx->proc_stop && !*ctx->workers_stop) { LOG_DEBUG("Worker %u waiting for a new job ...", ctx->number); - A_PTHREAD_M_LOCK(ctx->has_job_mutex); - A_PTHREAD_C_WAIT_TRUE(*ctx->has_job, ctx->has_job_cond, ctx->has_job_mutex); - A_PTHREAD_M_UNLOCK(ctx->has_job_mutex); + A_MUTEX_LOCK(ctx->has_job_mutex); + A_COND_WAIT_TRUE(*ctx->has_job, ctx->has_job_cond, ctx->has_job_mutex); + A_MUTEX_UNLOCK(ctx->has_job_mutex); if (!*ctx->workers_stop) { # define PICTURE(_next) ctx->dev->run->pictures[ctx->buf_index]._next @@ -475,9 +475,9 @@ static void *_stream_worker_thread(void *v_ctx) { long double last_comp_time = PICTURE(encode_end_time) - *ctx->job_start_time; - A_PTHREAD_M_LOCK(ctx->last_comp_time_mutex); + A_MUTEX_LOCK(ctx->last_comp_time_mutex); *ctx->last_comp_time = last_comp_time; - A_PTHREAD_M_UNLOCK(ctx->last_comp_time_mutex); + A_MUTEX_UNLOCK(ctx->last_comp_time_mutex); LOG_VERBOSE("Compressed JPEG size=%zu; time=%0.3Lf; worker=%u; buffer=%u", PICTURE(size), last_comp_time, ctx->number, ctx->buf_index); @@ -489,10 +489,10 @@ static void *_stream_worker_thread(void *v_ctx) { # undef PICTURE } - A_PTHREAD_M_LOCK(ctx->free_workers_mutex); + A_MUTEX_LOCK(ctx->free_workers_mutex); *ctx->free_workers += 1; - A_PTHREAD_M_UNLOCK(ctx->free_workers_mutex); - A_PTHREAD_C_SIGNAL(ctx->free_workers_cond); + A_MUTEX_UNLOCK(ctx->free_workers_mutex); + A_COND_SIGNAL(ctx->free_workers_cond); } LOG_DEBUG("Bye-bye (worker %u)", ctx->number); @@ -507,20 +507,20 @@ static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool for (unsigned number = 0; number < stream->dev->n_workers; ++number) { # define WORKER(_next) pool->workers[number]._next - A_PTHREAD_M_LOCK(&WORKER(has_job_mutex)); + A_MUTEX_LOCK(&WORKER(has_job_mutex)); WORKER(has_job) = true; // Final job: die - A_PTHREAD_M_UNLOCK(&WORKER(has_job_mutex)); - A_PTHREAD_C_SIGNAL(&WORKER(has_job_cond)); + A_MUTEX_UNLOCK(&WORKER(has_job_mutex)); + A_COND_SIGNAL(&WORKER(has_job_cond)); - A_PTHREAD_JOIN(WORKER(tid)); - A_PTHREAD_M_DESTROY(&WORKER(has_job_mutex)); - A_PTHREAD_C_DESTROY(&WORKER(has_job_cond)); + A_THREAD_JOIN(WORKER(tid)); + A_MUTEX_DESTROY(&WORKER(has_job_mutex)); + A_COND_DESTROY(&WORKER(has_job_cond)); # undef WORKER } - A_PTHREAD_M_DESTROY(&pool->free_workers_mutex); - A_PTHREAD_C_DESTROY(&pool->free_workers_cond); + A_MUTEX_DESTROY(&pool->free_workers_mutex); + A_COND_DESTROY(&pool->free_workers_cond); free(pool->workers); } diff --git a/src/tools.h b/src/tools.h index 830c398..bdb2835 100644 --- a/src/tools.h +++ b/src/tools.h @@ -35,18 +35,18 @@ #include -#define A_PTHREAD_CREATE(_tid, _func, _arg) assert(!pthread_create(_tid, NULL, _func, _arg)) -#define A_PTHREAD_JOIN(_tid) assert(!pthread_join(_tid, NULL)) +#define A_THREAD_CREATE(_tid, _func, _arg) assert(!pthread_create(_tid, NULL, _func, _arg)) +#define A_THREAD_JOIN(_tid) assert(!pthread_join(_tid, NULL)) -#define A_PTHREAD_M_INIT(_mutex) assert(!pthread_mutex_init(_mutex, NULL)) -#define A_PTHREAD_M_DESTROY(_mutex) assert(!pthread_mutex_destroy(_mutex)) -#define A_PTHREAD_M_LOCK(_mutex) assert(!pthread_mutex_lock(_mutex)) -#define A_PTHREAD_M_UNLOCK(_mutex) assert(!pthread_mutex_unlock(_mutex)) +#define A_MUTEX_INIT(_mutex) assert(!pthread_mutex_init(_mutex, NULL)) +#define A_MUTEX_DESTROY(_mutex) assert(!pthread_mutex_destroy(_mutex)) +#define A_MUTEX_LOCK(_mutex) assert(!pthread_mutex_lock(_mutex)) +#define A_MUTEX_UNLOCK(_mutex) assert(!pthread_mutex_unlock(_mutex)) -#define A_PTHREAD_C_INIT(_cond) assert(!pthread_cond_init(_cond, NULL)) -#define A_PTHREAD_C_DESTROY(_cond) assert(!pthread_cond_destroy(_cond)) -#define A_PTHREAD_C_SIGNAL(...) assert(!pthread_cond_signal(__VA_ARGS__)) -#define A_PTHREAD_C_WAIT_TRUE(_var, _cond, _mutex) { while(!_var) assert(!pthread_cond_wait(_cond, _mutex)); } +#define A_COND_INIT(_cond) assert(!pthread_cond_init(_cond, NULL)) +#define A_COND_DESTROY(_cond) assert(!pthread_cond_destroy(_cond)) +#define A_COND_SIGNAL(...) assert(!pthread_cond_signal(__VA_ARGS__)) +#define A_COND_WAIT_TRUE(_var, _cond, _mutex) { while(!_var) assert(!pthread_cond_wait(_cond, _mutex)); } #define A_CALLOC(_dest, _nmemb) assert((_dest = calloc(_nmemb, sizeof(*(_dest))))) #define A_REALLOC(_dest, _nmemb) assert((_dest = realloc(_dest, _nmemb * sizeof(*(_dest)))))