diff --git a/janus/src/audio.c b/janus/src/audio.c index 10b6de2..9b068ba 100644 --- a/janus/src/audio.c +++ b/janus/src/audio.c @@ -118,8 +118,8 @@ us_audio_s *us_audio_init(const char *name, unsigned pcm_hz) { US_JLOG_INFO("audio", "Pipeline configured on %uHz; capturing ...", audio->pcm_hz); audio->tids_created = true; - US_THREAD_CREATE(&audio->enc_tid, _encoder_thread, audio); - US_THREAD_CREATE(&audio->pcm_tid, _pcm_thread, audio); + US_THREAD_CREATE(audio->enc_tid, _encoder_thread, audio); + US_THREAD_CREATE(audio->pcm_tid, _pcm_thread, audio); return audio; diff --git a/janus/src/client.c b/janus/src/client.c index 7734cf7..6f32368 100644 --- a/janus/src/client.c +++ b/janus/src/client.c @@ -38,11 +38,11 @@ us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_sessio atomic_init(&client->stop, false); client->video_queue = us_queue_init(1024); - US_THREAD_CREATE(&client->video_tid, _video_thread, client); + US_THREAD_CREATE(client->video_tid, _video_thread, client); if (has_audio) { client->audio_queue = us_queue_init(64); - US_THREAD_CREATE(&client->audio_tid, _audio_thread, client); + US_THREAD_CREATE(client->audio_tid, _audio_thread, client); } return client; } diff --git a/janus/src/plugin.c b/janus/src/plugin.c index 5c9da7a..18046d9 100644 --- a/janus/src/plugin.c +++ b/janus/src/plugin.c @@ -78,11 +78,11 @@ static atomic_bool _g_stop = false; static atomic_bool _g_has_watchers = false; -#define _LOCK_VIDEO US_MUTEX_LOCK(&_g_video_lock) -#define _UNLOCK_VIDEO US_MUTEX_UNLOCK(&_g_video_lock) +#define _LOCK_VIDEO US_MUTEX_LOCK(_g_video_lock) +#define _UNLOCK_VIDEO US_MUTEX_UNLOCK(_g_video_lock) -#define _LOCK_AUDIO US_MUTEX_LOCK(&_g_audio_lock) -#define _UNLOCK_AUDIO US_MUTEX_UNLOCK(&_g_audio_lock) +#define _LOCK_AUDIO US_MUTEX_LOCK(_g_audio_lock) +#define _UNLOCK_AUDIO US_MUTEX_UNLOCK(_g_audio_lock) #define _LOCK_ALL { _LOCK_VIDEO; _LOCK_AUDIO; } #define _UNLOCK_ALL { _UNLOCK_AUDIO; _UNLOCK_VIDEO; } @@ -260,10 +260,10 @@ static int _plugin_init(janus_callbacks *gw, const char *config_dir_path) { _g_rtpv = us_rtpv_init(_relay_rtp_clients, _g_config->video_zero_playout_delay); if (_g_config->audio_dev_name != NULL) { _g_rtpa = us_rtpa_init(_relay_rtp_clients); - US_THREAD_CREATE(&_g_audio_tid, _audio_thread, NULL); + US_THREAD_CREATE(_g_audio_tid, _audio_thread, NULL); } - US_THREAD_CREATE(&_g_video_rtp_tid, _video_rtp_thread, NULL); - US_THREAD_CREATE(&_g_video_sink_tid, _video_sink_thread, NULL); + US_THREAD_CREATE(_g_video_rtp_tid, _video_rtp_thread, NULL); + US_THREAD_CREATE(_g_video_sink_tid, _video_sink_thread, NULL); atomic_store(&_g_ready, true); return 0; diff --git a/janus/src/queue.c b/janus/src/queue.c index dc51f62..566e697 100644 --- a/janus/src/queue.c +++ b/janus/src/queue.c @@ -28,7 +28,7 @@ us_queue_s *us_queue_init(unsigned capacity) { US_CALLOC(queue, 1); US_CALLOC(queue->items, capacity); queue->capacity = capacity; - US_MUTEX_INIT(&queue->mutex); + US_MUTEX_INIT(queue->mutex); pthread_condattr_t attrs; assert(!pthread_condattr_init(&attrs)); @@ -49,9 +49,9 @@ void us_queue_destroy(us_queue_s *queue) { assert(!clock_gettime(CLOCK_MONOTONIC, &m_ts)); \ us_ld_to_timespec(us_timespec_to_ld(&m_ts) + timeout, &m_ts); \ while (x_var) { \ - const int err = pthread_cond_timedwait(x_cond, &queue->mutex, &m_ts); \ + const int err = pthread_cond_timedwait(&(x_cond), &queue->mutex, &m_ts); \ if (err == ETIMEDOUT) { \ - US_MUTEX_UNLOCK(&queue->mutex); \ + US_MUTEX_UNLOCK(queue->mutex); \ return -1; \ } \ assert(!err); \ @@ -59,32 +59,32 @@ void us_queue_destroy(us_queue_s *queue) { } int us_queue_put(us_queue_s *queue, void *item, long double timeout) { - US_MUTEX_LOCK(&queue->mutex); + US_MUTEX_LOCK(queue->mutex); if (timeout == 0) { if (queue->size == queue->capacity) { - US_MUTEX_UNLOCK(&queue->mutex); + US_MUTEX_UNLOCK(queue->mutex); return -1; } } else { - _WAIT_OR_UNLOCK(queue->size == queue->capacity, &queue->full_cond); + _WAIT_OR_UNLOCK(queue->size == queue->capacity, queue->full_cond); } queue->items[queue->in] = item; ++queue->size; ++queue->in; queue->in %= queue->capacity; - US_MUTEX_UNLOCK(&queue->mutex); + US_MUTEX_UNLOCK(queue->mutex); assert(!pthread_cond_broadcast(&queue->empty_cond)); return 0; } int us_queue_get(us_queue_s *queue, void **item, long double timeout) { - US_MUTEX_LOCK(&queue->mutex); - _WAIT_OR_UNLOCK(queue->size == 0, &queue->empty_cond); + US_MUTEX_LOCK(queue->mutex); + _WAIT_OR_UNLOCK(queue->size == 0, queue->empty_cond); *item = queue->items[queue->out]; --queue->size; ++queue->out; queue->out %= queue->capacity; - US_MUTEX_UNLOCK(&queue->mutex); + US_MUTEX_UNLOCK(queue->mutex); assert(!pthread_cond_broadcast(&queue->full_cond)); return 0; } @@ -92,8 +92,8 @@ int us_queue_get(us_queue_s *queue, void **item, long double timeout) { #undef _WAIT_OR_UNLOCK int us_queue_get_free(us_queue_s *queue) { - US_MUTEX_LOCK(&queue->mutex); + US_MUTEX_LOCK(queue->mutex); const unsigned size = queue->size; - US_MUTEX_UNLOCK(&queue->mutex); + US_MUTEX_UNLOCK(queue->mutex); return queue->capacity - size; } diff --git a/janus/src/rtpv.c b/janus/src/rtpv.c index 02ef45c..de02961 100644 --- a/janus/src/rtpv.c +++ b/janus/src/rtpv.c @@ -38,12 +38,12 @@ us_rtpv_s *us_rtpv_init(us_rtp_callback_f callback, bool zero_playout_delay) { rtpv->callback = callback; rtpv->sps = us_frame_init(); rtpv->pps = us_frame_init(); - US_MUTEX_INIT(&rtpv->mutex); + US_MUTEX_INIT(rtpv->mutex); return rtpv; } void us_rtpv_destroy(us_rtpv_s *rtpv) { - US_MUTEX_DESTROY(&rtpv->mutex); + US_MUTEX_DESTROY(rtpv->mutex); us_frame_destroy(rtpv->pps); us_frame_destroy(rtpv->sps); us_rtp_destroy(rtpv->rtp); @@ -51,10 +51,10 @@ void us_rtpv_destroy(us_rtpv_s *rtpv) { } char *us_rtpv_make_sdp(us_rtpv_s *rtpv) { - US_MUTEX_LOCK(&rtpv->mutex); + US_MUTEX_LOCK(rtpv->mutex); if (rtpv->sps->used == 0 || rtpv->pps->used == 0) { - US_MUTEX_UNLOCK(&rtpv->mutex); + US_MUTEX_UNLOCK(rtpv->mutex); return NULL; } @@ -63,7 +63,7 @@ char *us_rtpv_make_sdp(us_rtpv_s *rtpv) { us_base64_encode(rtpv->sps->data, rtpv->sps->used, &sps, NULL); us_base64_encode(rtpv->pps->data, rtpv->pps->used, &pps, NULL); - US_MUTEX_UNLOCK(&rtpv->mutex); + US_MUTEX_UNLOCK(rtpv->mutex); # define PAYLOAD rtpv->rtp->payload // https://tools.ietf.org/html/rfc6184 @@ -145,9 +145,9 @@ void _rtpv_process_nalu(us_rtpv_s *rtpv, const uint8_t *data, size_t size, uint3 case 8: ps = rtpv->pps; break; } if (ps != NULL) { - US_MUTEX_LOCK(&rtpv->mutex); + US_MUTEX_LOCK(rtpv->mutex); us_frame_set_data(ps, data, size); - US_MUTEX_UNLOCK(&rtpv->mutex); + US_MUTEX_UNLOCK(rtpv->mutex); } # define DG rtpv->rtp->datagram diff --git a/src/libs/logging.h b/src/libs/logging.h index 39867a1..f2a4df3 100644 --- a/src/libs/logging.h +++ b/src/libs/logging.h @@ -55,19 +55,19 @@ extern pthread_mutex_t us_log_mutex; #define US_LOGGING_INIT { \ us_log_level = US_LOG_LEVEL_INFO; \ us_log_colored = isatty(2); \ - US_MUTEX_INIT(&us_log_mutex); \ + US_MUTEX_INIT(us_log_mutex); \ } -#define US_LOGGING_DESTROY US_MUTEX_DESTROY(&us_log_mutex) +#define US_LOGGING_DESTROY US_MUTEX_DESTROY(us_log_mutex) -#define US_LOGGING_LOCK US_MUTEX_LOCK(&us_log_mutex) -#define US_LOGGING_UNLOCK US_MUTEX_UNLOCK(&us_log_mutex) +#define US_LOGGING_LOCK US_MUTEX_LOCK(us_log_mutex) +#define US_LOGGING_UNLOCK US_MUTEX_UNLOCK(us_log_mutex) #define US_COLOR_GRAY "\x1b[30;1m" #define US_COLOR_RED "\x1b[31;1m" #define US_COLOR_GREEN "\x1b[32;1m" -#define US_COLOR_YELLOW "\x1b[33;1m" +#define US_COLOR_YELLOW "\x1b[33;1m" #define US_COLOR_BLUE "\x1b[34;1m" #define US_COLOR_CYAN "\x1b[36;1m" #define US_COLOR_RESET "\x1b[0m" diff --git a/src/libs/threading.h b/src/libs/threading.h index a5c3527..7f94e8c 100644 --- a/src/libs/threading.h +++ b/src/libs/threading.h @@ -46,7 +46,7 @@ # define US_MAX_THREAD_NAME ((size_t)16) #endif -#define US_THREAD_CREATE(x_tid, x_func, x_arg) assert(!pthread_create((x_tid), NULL, (x_func), (x_arg))) +#define US_THREAD_CREATE(x_tid, x_func, x_arg) assert(!pthread_create(&(x_tid), NULL, (x_func), (x_arg))) #define US_THREAD_JOIN(x_tid) assert(!pthread_join((x_tid), NULL)) #ifdef WITH_PTHREAD_NP @@ -59,15 +59,15 @@ # define US_THREAD_RENAME(_fmt, ...) #endif -#define US_MUTEX_INIT(x_mutex) assert(!pthread_mutex_init((x_mutex), NULL)) -#define US_MUTEX_DESTROY(x_mutex) assert(!pthread_mutex_destroy(x_mutex)) -#define US_MUTEX_LOCK(x_mutex) assert(!pthread_mutex_lock(x_mutex)) -#define US_MUTEX_UNLOCK(x_mutex) assert(!pthread_mutex_unlock(x_mutex)) +#define US_MUTEX_INIT(x_mutex) assert(!pthread_mutex_init(&(x_mutex), NULL)) +#define US_MUTEX_DESTROY(x_mutex) assert(!pthread_mutex_destroy(&(x_mutex))) +#define US_MUTEX_LOCK(x_mutex) assert(!pthread_mutex_lock(&(x_mutex))) +#define US_MUTEX_UNLOCK(x_mutex) assert(!pthread_mutex_unlock(&(x_mutex))) -#define US_COND_INIT(x_cond) assert(!pthread_cond_init((x_cond), NULL)) -#define US_COND_DESTROY(x_cond) assert(!pthread_cond_destroy(x_cond)) -#define US_COND_SIGNAL(...) assert(!pthread_cond_signal(__VA_ARGS__)) -#define US_COND_WAIT_TRUE(x_var, x_cond, x_mutex) { while(!(x_var)) assert(!pthread_cond_wait((x_cond), (x_mutex))); } +#define US_COND_INIT(x_cond) assert(!pthread_cond_init(&(x_cond), NULL)) +#define US_COND_DESTROY(x_cond) assert(!pthread_cond_destroy(&(x_cond))) +#define US_COND_SIGNAL(x_cond) assert(!pthread_cond_signal(&(x_cond))) +#define US_COND_WAIT_TRUE(x_var, x_cond, x_mutex) { while(!(x_var)) assert(!pthread_cond_wait(&(x_cond), &(x_mutex))); } #ifdef WITH_PTHREAD_NP diff --git a/src/ustreamer/encoder.c b/src/ustreamer/encoder.c index 681ed37..250257a 100644 --- a/src/ustreamer/encoder.c +++ b/src/ustreamer/encoder.c @@ -51,7 +51,7 @@ us_encoder_s *us_encoder_init(void) { US_CALLOC(run, 1); run->type = US_ENCODER_TYPE_CPU; run->quality = 80; - US_MUTEX_INIT(&run->mutex); + US_MUTEX_INIT(run->mutex); us_encoder_s *enc; US_CALLOC(enc, 1); @@ -68,7 +68,7 @@ void us_encoder_destroy(us_encoder_s *enc) { } free(_ER(m2ms)); } - US_MUTEX_DESTROY(&_ER(mutex)); + US_MUTEX_DESTROY(_ER(mutex)); free(enc->run); free(enc); } @@ -148,13 +148,13 @@ us_workers_pool_s *us_encoder_workers_pool_init(us_encoder_s *enc, us_device_s * US_LOG_INFO("Using JPEG quality: %u%%", quality); } - US_MUTEX_LOCK(&_ER(mutex)); + US_MUTEX_LOCK(_ER(mutex)); _ER(type) = type; _ER(quality) = quality; if (cpu_forced) { _ER(cpu_forced) = true; } - US_MUTEX_UNLOCK(&_ER(mutex)); + US_MUTEX_UNLOCK(_ER(mutex)); const long double desired_interval = ( dev->desired_fps > 0 && (dev->desired_fps < dev->run->hw_fps || dev->run->hw_fps == 0) @@ -172,10 +172,10 @@ us_workers_pool_s *us_encoder_workers_pool_init(us_encoder_s *enc, us_device_s * } void us_encoder_get_runtime_params(us_encoder_s *enc, us_encoder_type_e *type, unsigned *quality) { - US_MUTEX_LOCK(&_ER(mutex)); + US_MUTEX_LOCK(_ER(mutex)); *type = _ER(type); *quality = _ER(quality); - US_MUTEX_UNLOCK(&_ER(mutex)); + US_MUTEX_UNLOCK(_ER(mutex)); } static void *_worker_job_init(void *v_enc) { @@ -236,8 +236,8 @@ static bool _worker_run_job(us_worker_s *wr) { error: US_LOG_ERROR("Compression failed: worker=%s, buffer=%u", wr->name, job->hw->buf.index); US_LOG_ERROR("Error while compressing buffer, falling back to CPU"); - US_MUTEX_LOCK(&_ER(mutex)); + US_MUTEX_LOCK(_ER(mutex)); _ER(cpu_forced) = true; - US_MUTEX_UNLOCK(&_ER(mutex)); + US_MUTEX_UNLOCK(_ER(mutex)); return false; } diff --git a/src/ustreamer/gpio/gpio.c b/src/ustreamer/gpio/gpio.c index 7f2d416..f5cb2c5 100644 --- a/src/ustreamer/gpio/gpio.c +++ b/src/ustreamer/gpio/gpio.c @@ -57,7 +57,7 @@ void us_gpio_init(void) { || us_gpio.stream_online.pin >= 0 || us_gpio.has_http_clients.pin >= 0 ) { - US_MUTEX_INIT(&us_gpio.mutex); + US_MUTEX_INIT(us_gpio.mutex); US_LOG_INFO("GPIO: Using chip device: %s", us_gpio.path); if ((us_gpio.chip = gpiod_chip_open(us_gpio.path)) != NULL) { _gpio_output_init(&us_gpio.prog_running); @@ -76,7 +76,7 @@ void us_gpio_destroy(void) { if (us_gpio.chip != NULL) { gpiod_chip_close(us_gpio.chip); us_gpio.chip = NULL; - US_MUTEX_DESTROY(&us_gpio.mutex); + US_MUTEX_DESTROY(us_gpio.mutex); } } @@ -86,7 +86,7 @@ int us_gpio_inner_set(us_gpio_output_s *output, bool state) { assert(us_gpio.chip != NULL); assert(output->line != NULL); assert(output->state != state); // Must be checked in macro for the performance - US_MUTEX_LOCK(&us_gpio.mutex); + US_MUTEX_LOCK(us_gpio.mutex); if (gpiod_line_set_value(output->line, (int)state) < 0) { \ US_LOG_PERROR("GPIO: Can't write value %d to line %s (will be disabled)", state, output->consumer); \ @@ -94,7 +94,7 @@ int us_gpio_inner_set(us_gpio_output_s *output, bool state) { retval = -1; } - US_MUTEX_UNLOCK(&us_gpio.mutex); + US_MUTEX_UNLOCK(us_gpio.mutex); return retval; } diff --git a/src/ustreamer/http/server.c b/src/ustreamer/http/server.c index 3e1a1de..c5e6cb8 100644 --- a/src/ustreamer/http/server.c +++ b/src/ustreamer/http/server.c @@ -856,7 +856,7 @@ static void _http_refresher(UNUSED int fd, UNUSED short what, void *v_server) { static bool _expose_new_frame(us_server_s *server) { bool updated = false; - US_MUTEX_LOCK(&_VID(mutex)); + US_MUTEX_LOCK(_VID(mutex)); US_LOG_DEBUG("HTTP: Updating exposed frame (online=%d) ...", _VID(frame->online)); @@ -895,7 +895,7 @@ static bool _expose_new_frame(us_server_s *server) { updated = true; not_updated: atomic_store(&_VID(updated), false); - US_MUTEX_UNLOCK(&_VID(mutex)); + US_MUTEX_UNLOCK(_VID(mutex)); return updated; } diff --git a/src/ustreamer/main.c b/src/ustreamer/main.c index 5c20b7d..bb497a6 100644 --- a/src/ustreamer/main.c +++ b/src/ustreamer/main.c @@ -129,8 +129,8 @@ int main(int argc, char *argv[]) { pthread_t stream_loop_tid; pthread_t server_loop_tid; - US_THREAD_CREATE(&stream_loop_tid, _stream_loop_thread, NULL); - US_THREAD_CREATE(&server_loop_tid, _server_loop_thread, NULL); + US_THREAD_CREATE(stream_loop_tid, _stream_loop_thread, NULL); + US_THREAD_CREATE(server_loop_tid, _server_loop_thread, NULL); US_THREAD_JOIN(server_loop_tid); US_THREAD_JOIN(stream_loop_tid); } diff --git a/src/ustreamer/stream.c b/src/ustreamer/stream.c index 360cea1..4b4749e 100644 --- a/src/ustreamer/stream.c +++ b/src/ustreamer/stream.c @@ -52,7 +52,7 @@ us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) { US_CALLOC(video, 1); video->frame = us_frame_init(); atomic_init(&video->updated, false); - US_MUTEX_INIT(&video->mutex); + US_MUTEX_INIT(video->mutex); atomic_init(&video->has_clients, false); run->video = video; @@ -69,7 +69,7 @@ us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) { } void us_stream_destroy(us_stream_s *stream) { - US_MUTEX_DESTROY(&_RUN(video->mutex)); + US_MUTEX_DESTROY(_RUN(video->mutex)); us_frame_destroy(_RUN(video->frame)); free(_RUN(video)); free(stream->run); @@ -295,7 +295,7 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame, unsigne us_frame_s *new = NULL; - US_MUTEX_LOCK(&VID(mutex)); + US_MUTEX_LOCK(VID(mutex)); if (frame != NULL) { new = frame; @@ -341,7 +341,7 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame, unsigne VID(captured_fps) = captured_fps; atomic_store(&VID(updated), true); - US_MUTEX_UNLOCK(&VID(mutex)); + US_MUTEX_UNLOCK(VID(mutex)); new = (frame ? frame : stream->blank); _SINK_PUT(sink, new); diff --git a/src/ustreamer/workers.c b/src/ustreamer/workers.c index d3a0021..1fbb53b 100644 --- a/src/ustreamer/workers.c +++ b/src/ustreamer/workers.c @@ -46,8 +46,8 @@ us_workers_pool_s *us_workers_pool_init( pool->n_workers = n_workers; US_CALLOC(pool->workers, pool->n_workers); - US_MUTEX_INIT(&pool->free_workers_mutex); - US_COND_INIT(&pool->free_workers_cond); + US_MUTEX_INIT(pool->free_workers_mutex); + US_COND_INIT(pool->free_workers_cond); for (unsigned number = 0; number < pool->n_workers; ++number) { # define WR(x_next) pool->workers[number].x_next @@ -55,14 +55,14 @@ us_workers_pool_s *us_workers_pool_init( WR(number) = number; US_ASPRINTF(WR(name), "%s-%u", wr_prefix, number); - US_MUTEX_INIT(&WR(has_job_mutex)); + US_MUTEX_INIT(WR(has_job_mutex)); atomic_init(&WR(has_job), false); - US_COND_INIT(&WR(has_job_cond)); + US_COND_INIT(WR(has_job_cond)); WR(pool) = pool; WR(job) = job_init(job_init_arg); - US_THREAD_CREATE(&WR(tid), _worker_thread, (void *)&(pool->workers[number])); + US_THREAD_CREATE(WR(tid), _worker_thread, (void *)&(pool->workers[number])); pool->free_workers += 1; # undef WR @@ -77,14 +77,14 @@ void us_workers_pool_destroy(us_workers_pool_s *pool) { for (unsigned number = 0; number < pool->n_workers; ++number) { # define WR(x_next) pool->workers[number].x_next - US_MUTEX_LOCK(&WR(has_job_mutex)); + US_MUTEX_LOCK(WR(has_job_mutex)); atomic_store(&WR(has_job), true); // Final job: die - US_MUTEX_UNLOCK(&WR(has_job_mutex)); - US_COND_SIGNAL(&WR(has_job_cond)); + US_MUTEX_UNLOCK(WR(has_job_mutex)); + US_COND_SIGNAL(WR(has_job_cond)); US_THREAD_JOIN(WR(tid)); - US_MUTEX_DESTROY(&WR(has_job_mutex)); - US_COND_DESTROY(&WR(has_job_cond)); + US_MUTEX_DESTROY(WR(has_job_mutex)); + US_COND_DESTROY(WR(has_job_cond)); free(WR(name)); @@ -93,8 +93,8 @@ void us_workers_pool_destroy(us_workers_pool_s *pool) { # undef WR } - US_MUTEX_DESTROY(&pool->free_workers_mutex); - US_COND_DESTROY(&pool->free_workers_cond); + US_MUTEX_DESTROY(pool->free_workers_mutex); + US_COND_DESTROY(pool->free_workers_cond); free(pool->workers); free(pool); @@ -103,9 +103,9 @@ void us_workers_pool_destroy(us_workers_pool_s *pool) { us_worker_s *us_workers_pool_wait(us_workers_pool_s *pool) { us_worker_s *ready_wr = NULL; - US_MUTEX_LOCK(&pool->free_workers_mutex); - US_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex); - US_MUTEX_UNLOCK(&pool->free_workers_mutex); + US_MUTEX_LOCK(pool->free_workers_mutex); + US_COND_WAIT_TRUE(pool->free_workers, pool->free_workers_cond, pool->free_workers_mutex); + US_MUTEX_UNLOCK(pool->free_workers_mutex); if (pool->oldest_wr && !atomic_load(&pool->oldest_wr->has_job)) { ready_wr = pool->oldest_wr; @@ -146,15 +146,15 @@ void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr/*, vo } pool->latest_wr->next_wr = NULL; - US_MUTEX_LOCK(&ready_wr->has_job_mutex); + US_MUTEX_LOCK(ready_wr->has_job_mutex); //ready_wr->job = job; atomic_store(&ready_wr->has_job, true); - US_MUTEX_UNLOCK(&ready_wr->has_job_mutex); - US_COND_SIGNAL(&ready_wr->has_job_cond); + US_MUTEX_UNLOCK(ready_wr->has_job_mutex); + US_COND_SIGNAL(ready_wr->has_job_cond); - US_MUTEX_LOCK(&pool->free_workers_mutex); + US_MUTEX_LOCK(pool->free_workers_mutex); pool->free_workers -= 1; - US_MUTEX_UNLOCK(&pool->free_workers_mutex); + US_MUTEX_UNLOCK(pool->free_workers_mutex); } long double us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, us_worker_s *ready_wr) { @@ -184,9 +184,9 @@ static void *_worker_thread(void *v_worker) { while (!atomic_load(&wr->pool->stop)) { US_LOG_DEBUG("Worker %s waiting for a new job ...", wr->name); - US_MUTEX_LOCK(&wr->has_job_mutex); - US_COND_WAIT_TRUE(atomic_load(&wr->has_job), &wr->has_job_cond, &wr->has_job_mutex); - US_MUTEX_UNLOCK(&wr->has_job_mutex); + US_MUTEX_LOCK(wr->has_job_mutex); + US_COND_WAIT_TRUE(atomic_load(&wr->has_job), wr->has_job_cond, wr->has_job_mutex); + US_MUTEX_UNLOCK(wr->has_job_mutex); if (!atomic_load(&wr->pool->stop)) { const long double job_start_ts = us_get_now_monotonic(); @@ -199,10 +199,10 @@ static void *_worker_thread(void *v_worker) { atomic_store(&wr->has_job, false); } - US_MUTEX_LOCK(&wr->pool->free_workers_mutex); + US_MUTEX_LOCK(wr->pool->free_workers_mutex); wr->pool->free_workers += 1; - US_MUTEX_UNLOCK(&wr->pool->free_workers_mutex); - US_COND_SIGNAL(&wr->pool->free_workers_cond); + US_MUTEX_UNLOCK(wr->pool->free_workers_mutex); + US_COND_SIGNAL(wr->pool->free_workers_cond); } US_LOG_DEBUG("Bye-bye (worker %s)", wr->name);