refactoring

This commit is contained in:
Maxim Devaev
2022-07-20 14:55:47 +03:00
parent 54af47fc43
commit b693c24411
13 changed files with 90 additions and 90 deletions

View File

@@ -118,8 +118,8 @@ us_audio_s *us_audio_init(const char *name, unsigned pcm_hz) {
US_JLOG_INFO("audio", "Pipeline configured on %uHz; capturing ...", audio->pcm_hz);
audio->tids_created = true;
US_THREAD_CREATE(&audio->enc_tid, _encoder_thread, audio);
US_THREAD_CREATE(&audio->pcm_tid, _pcm_thread, audio);
US_THREAD_CREATE(audio->enc_tid, _encoder_thread, audio);
US_THREAD_CREATE(audio->pcm_tid, _pcm_thread, audio);
return audio;

View File

@@ -38,11 +38,11 @@ us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_sessio
atomic_init(&client->stop, false);
client->video_queue = us_queue_init(1024);
US_THREAD_CREATE(&client->video_tid, _video_thread, client);
US_THREAD_CREATE(client->video_tid, _video_thread, client);
if (has_audio) {
client->audio_queue = us_queue_init(64);
US_THREAD_CREATE(&client->audio_tid, _audio_thread, client);
US_THREAD_CREATE(client->audio_tid, _audio_thread, client);
}
return client;
}

View File

@@ -78,11 +78,11 @@ static atomic_bool _g_stop = false;
static atomic_bool _g_has_watchers = false;
#define _LOCK_VIDEO US_MUTEX_LOCK(&_g_video_lock)
#define _UNLOCK_VIDEO US_MUTEX_UNLOCK(&_g_video_lock)
#define _LOCK_VIDEO US_MUTEX_LOCK(_g_video_lock)
#define _UNLOCK_VIDEO US_MUTEX_UNLOCK(_g_video_lock)
#define _LOCK_AUDIO US_MUTEX_LOCK(&_g_audio_lock)
#define _UNLOCK_AUDIO US_MUTEX_UNLOCK(&_g_audio_lock)
#define _LOCK_AUDIO US_MUTEX_LOCK(_g_audio_lock)
#define _UNLOCK_AUDIO US_MUTEX_UNLOCK(_g_audio_lock)
#define _LOCK_ALL { _LOCK_VIDEO; _LOCK_AUDIO; }
#define _UNLOCK_ALL { _UNLOCK_AUDIO; _UNLOCK_VIDEO; }
@@ -260,10 +260,10 @@ static int _plugin_init(janus_callbacks *gw, const char *config_dir_path) {
_g_rtpv = us_rtpv_init(_relay_rtp_clients, _g_config->video_zero_playout_delay);
if (_g_config->audio_dev_name != NULL) {
_g_rtpa = us_rtpa_init(_relay_rtp_clients);
US_THREAD_CREATE(&_g_audio_tid, _audio_thread, NULL);
US_THREAD_CREATE(_g_audio_tid, _audio_thread, NULL);
}
US_THREAD_CREATE(&_g_video_rtp_tid, _video_rtp_thread, NULL);
US_THREAD_CREATE(&_g_video_sink_tid, _video_sink_thread, NULL);
US_THREAD_CREATE(_g_video_rtp_tid, _video_rtp_thread, NULL);
US_THREAD_CREATE(_g_video_sink_tid, _video_sink_thread, NULL);
atomic_store(&_g_ready, true);
return 0;

View File

@@ -28,7 +28,7 @@ us_queue_s *us_queue_init(unsigned capacity) {
US_CALLOC(queue, 1);
US_CALLOC(queue->items, capacity);
queue->capacity = capacity;
US_MUTEX_INIT(&queue->mutex);
US_MUTEX_INIT(queue->mutex);
pthread_condattr_t attrs;
assert(!pthread_condattr_init(&attrs));
@@ -49,9 +49,9 @@ void us_queue_destroy(us_queue_s *queue) {
assert(!clock_gettime(CLOCK_MONOTONIC, &m_ts)); \
us_ld_to_timespec(us_timespec_to_ld(&m_ts) + timeout, &m_ts); \
while (x_var) { \
const int err = pthread_cond_timedwait(x_cond, &queue->mutex, &m_ts); \
const int err = pthread_cond_timedwait(&(x_cond), &queue->mutex, &m_ts); \
if (err == ETIMEDOUT) { \
US_MUTEX_UNLOCK(&queue->mutex); \
US_MUTEX_UNLOCK(queue->mutex); \
return -1; \
} \
assert(!err); \
@@ -59,32 +59,32 @@ void us_queue_destroy(us_queue_s *queue) {
}
int us_queue_put(us_queue_s *queue, void *item, long double timeout) {
US_MUTEX_LOCK(&queue->mutex);
US_MUTEX_LOCK(queue->mutex);
if (timeout == 0) {
if (queue->size == queue->capacity) {
US_MUTEX_UNLOCK(&queue->mutex);
US_MUTEX_UNLOCK(queue->mutex);
return -1;
}
} else {
_WAIT_OR_UNLOCK(queue->size == queue->capacity, &queue->full_cond);
_WAIT_OR_UNLOCK(queue->size == queue->capacity, queue->full_cond);
}
queue->items[queue->in] = item;
++queue->size;
++queue->in;
queue->in %= queue->capacity;
US_MUTEX_UNLOCK(&queue->mutex);
US_MUTEX_UNLOCK(queue->mutex);
assert(!pthread_cond_broadcast(&queue->empty_cond));
return 0;
}
int us_queue_get(us_queue_s *queue, void **item, long double timeout) {
US_MUTEX_LOCK(&queue->mutex);
_WAIT_OR_UNLOCK(queue->size == 0, &queue->empty_cond);
US_MUTEX_LOCK(queue->mutex);
_WAIT_OR_UNLOCK(queue->size == 0, queue->empty_cond);
*item = queue->items[queue->out];
--queue->size;
++queue->out;
queue->out %= queue->capacity;
US_MUTEX_UNLOCK(&queue->mutex);
US_MUTEX_UNLOCK(queue->mutex);
assert(!pthread_cond_broadcast(&queue->full_cond));
return 0;
}
@@ -92,8 +92,8 @@ int us_queue_get(us_queue_s *queue, void **item, long double timeout) {
#undef _WAIT_OR_UNLOCK
int us_queue_get_free(us_queue_s *queue) {
US_MUTEX_LOCK(&queue->mutex);
US_MUTEX_LOCK(queue->mutex);
const unsigned size = queue->size;
US_MUTEX_UNLOCK(&queue->mutex);
US_MUTEX_UNLOCK(queue->mutex);
return queue->capacity - size;
}

View File

@@ -38,12 +38,12 @@ us_rtpv_s *us_rtpv_init(us_rtp_callback_f callback, bool zero_playout_delay) {
rtpv->callback = callback;
rtpv->sps = us_frame_init();
rtpv->pps = us_frame_init();
US_MUTEX_INIT(&rtpv->mutex);
US_MUTEX_INIT(rtpv->mutex);
return rtpv;
}
void us_rtpv_destroy(us_rtpv_s *rtpv) {
US_MUTEX_DESTROY(&rtpv->mutex);
US_MUTEX_DESTROY(rtpv->mutex);
us_frame_destroy(rtpv->pps);
us_frame_destroy(rtpv->sps);
us_rtp_destroy(rtpv->rtp);
@@ -51,10 +51,10 @@ void us_rtpv_destroy(us_rtpv_s *rtpv) {
}
char *us_rtpv_make_sdp(us_rtpv_s *rtpv) {
US_MUTEX_LOCK(&rtpv->mutex);
US_MUTEX_LOCK(rtpv->mutex);
if (rtpv->sps->used == 0 || rtpv->pps->used == 0) {
US_MUTEX_UNLOCK(&rtpv->mutex);
US_MUTEX_UNLOCK(rtpv->mutex);
return NULL;
}
@@ -63,7 +63,7 @@ char *us_rtpv_make_sdp(us_rtpv_s *rtpv) {
us_base64_encode(rtpv->sps->data, rtpv->sps->used, &sps, NULL);
us_base64_encode(rtpv->pps->data, rtpv->pps->used, &pps, NULL);
US_MUTEX_UNLOCK(&rtpv->mutex);
US_MUTEX_UNLOCK(rtpv->mutex);
# define PAYLOAD rtpv->rtp->payload
// https://tools.ietf.org/html/rfc6184
@@ -145,9 +145,9 @@ void _rtpv_process_nalu(us_rtpv_s *rtpv, const uint8_t *data, size_t size, uint3
case 8: ps = rtpv->pps; break;
}
if (ps != NULL) {
US_MUTEX_LOCK(&rtpv->mutex);
US_MUTEX_LOCK(rtpv->mutex);
us_frame_set_data(ps, data, size);
US_MUTEX_UNLOCK(&rtpv->mutex);
US_MUTEX_UNLOCK(rtpv->mutex);
}
# define DG rtpv->rtp->datagram

View File

@@ -55,19 +55,19 @@ extern pthread_mutex_t us_log_mutex;
#define US_LOGGING_INIT { \
us_log_level = US_LOG_LEVEL_INFO; \
us_log_colored = isatty(2); \
US_MUTEX_INIT(&us_log_mutex); \
US_MUTEX_INIT(us_log_mutex); \
}
#define US_LOGGING_DESTROY US_MUTEX_DESTROY(&us_log_mutex)
#define US_LOGGING_DESTROY US_MUTEX_DESTROY(us_log_mutex)
#define US_LOGGING_LOCK US_MUTEX_LOCK(&us_log_mutex)
#define US_LOGGING_UNLOCK US_MUTEX_UNLOCK(&us_log_mutex)
#define US_LOGGING_LOCK US_MUTEX_LOCK(us_log_mutex)
#define US_LOGGING_UNLOCK US_MUTEX_UNLOCK(us_log_mutex)
#define US_COLOR_GRAY "\x1b[30;1m"
#define US_COLOR_RED "\x1b[31;1m"
#define US_COLOR_GREEN "\x1b[32;1m"
#define US_COLOR_YELLOW "\x1b[33;1m"
#define US_COLOR_YELLOW "\x1b[33;1m"
#define US_COLOR_BLUE "\x1b[34;1m"
#define US_COLOR_CYAN "\x1b[36;1m"
#define US_COLOR_RESET "\x1b[0m"

View File

@@ -46,7 +46,7 @@
# define US_MAX_THREAD_NAME ((size_t)16)
#endif
#define US_THREAD_CREATE(x_tid, x_func, x_arg) assert(!pthread_create((x_tid), NULL, (x_func), (x_arg)))
#define US_THREAD_CREATE(x_tid, x_func, x_arg) assert(!pthread_create(&(x_tid), NULL, (x_func), (x_arg)))
#define US_THREAD_JOIN(x_tid) assert(!pthread_join((x_tid), NULL))
#ifdef WITH_PTHREAD_NP
@@ -59,15 +59,15 @@
# define US_THREAD_RENAME(_fmt, ...)
#endif
#define US_MUTEX_INIT(x_mutex) assert(!pthread_mutex_init((x_mutex), NULL))
#define US_MUTEX_DESTROY(x_mutex) assert(!pthread_mutex_destroy(x_mutex))
#define US_MUTEX_LOCK(x_mutex) assert(!pthread_mutex_lock(x_mutex))
#define US_MUTEX_UNLOCK(x_mutex) assert(!pthread_mutex_unlock(x_mutex))
#define US_MUTEX_INIT(x_mutex) assert(!pthread_mutex_init(&(x_mutex), NULL))
#define US_MUTEX_DESTROY(x_mutex) assert(!pthread_mutex_destroy(&(x_mutex)))
#define US_MUTEX_LOCK(x_mutex) assert(!pthread_mutex_lock(&(x_mutex)))
#define US_MUTEX_UNLOCK(x_mutex) assert(!pthread_mutex_unlock(&(x_mutex)))
#define US_COND_INIT(x_cond) assert(!pthread_cond_init((x_cond), NULL))
#define US_COND_DESTROY(x_cond) assert(!pthread_cond_destroy(x_cond))
#define US_COND_SIGNAL(...) assert(!pthread_cond_signal(__VA_ARGS__))
#define US_COND_WAIT_TRUE(x_var, x_cond, x_mutex) { while(!(x_var)) assert(!pthread_cond_wait((x_cond), (x_mutex))); }
#define US_COND_INIT(x_cond) assert(!pthread_cond_init(&(x_cond), NULL))
#define US_COND_DESTROY(x_cond) assert(!pthread_cond_destroy(&(x_cond)))
#define US_COND_SIGNAL(x_cond) assert(!pthread_cond_signal(&(x_cond)))
#define US_COND_WAIT_TRUE(x_var, x_cond, x_mutex) { while(!(x_var)) assert(!pthread_cond_wait(&(x_cond), &(x_mutex))); }
#ifdef WITH_PTHREAD_NP

View File

@@ -51,7 +51,7 @@ us_encoder_s *us_encoder_init(void) {
US_CALLOC(run, 1);
run->type = US_ENCODER_TYPE_CPU;
run->quality = 80;
US_MUTEX_INIT(&run->mutex);
US_MUTEX_INIT(run->mutex);
us_encoder_s *enc;
US_CALLOC(enc, 1);
@@ -68,7 +68,7 @@ void us_encoder_destroy(us_encoder_s *enc) {
}
free(_ER(m2ms));
}
US_MUTEX_DESTROY(&_ER(mutex));
US_MUTEX_DESTROY(_ER(mutex));
free(enc->run);
free(enc);
}
@@ -148,13 +148,13 @@ us_workers_pool_s *us_encoder_workers_pool_init(us_encoder_s *enc, us_device_s *
US_LOG_INFO("Using JPEG quality: %u%%", quality);
}
US_MUTEX_LOCK(&_ER(mutex));
US_MUTEX_LOCK(_ER(mutex));
_ER(type) = type;
_ER(quality) = quality;
if (cpu_forced) {
_ER(cpu_forced) = true;
}
US_MUTEX_UNLOCK(&_ER(mutex));
US_MUTEX_UNLOCK(_ER(mutex));
const long double desired_interval = (
dev->desired_fps > 0 && (dev->desired_fps < dev->run->hw_fps || dev->run->hw_fps == 0)
@@ -172,10 +172,10 @@ us_workers_pool_s *us_encoder_workers_pool_init(us_encoder_s *enc, us_device_s *
}
void us_encoder_get_runtime_params(us_encoder_s *enc, us_encoder_type_e *type, unsigned *quality) {
US_MUTEX_LOCK(&_ER(mutex));
US_MUTEX_LOCK(_ER(mutex));
*type = _ER(type);
*quality = _ER(quality);
US_MUTEX_UNLOCK(&_ER(mutex));
US_MUTEX_UNLOCK(_ER(mutex));
}
static void *_worker_job_init(void *v_enc) {
@@ -236,8 +236,8 @@ static bool _worker_run_job(us_worker_s *wr) {
error:
US_LOG_ERROR("Compression failed: worker=%s, buffer=%u", wr->name, job->hw->buf.index);
US_LOG_ERROR("Error while compressing buffer, falling back to CPU");
US_MUTEX_LOCK(&_ER(mutex));
US_MUTEX_LOCK(_ER(mutex));
_ER(cpu_forced) = true;
US_MUTEX_UNLOCK(&_ER(mutex));
US_MUTEX_UNLOCK(_ER(mutex));
return false;
}

View File

@@ -57,7 +57,7 @@ void us_gpio_init(void) {
|| us_gpio.stream_online.pin >= 0
|| us_gpio.has_http_clients.pin >= 0
) {
US_MUTEX_INIT(&us_gpio.mutex);
US_MUTEX_INIT(us_gpio.mutex);
US_LOG_INFO("GPIO: Using chip device: %s", us_gpio.path);
if ((us_gpio.chip = gpiod_chip_open(us_gpio.path)) != NULL) {
_gpio_output_init(&us_gpio.prog_running);
@@ -76,7 +76,7 @@ void us_gpio_destroy(void) {
if (us_gpio.chip != NULL) {
gpiod_chip_close(us_gpio.chip);
us_gpio.chip = NULL;
US_MUTEX_DESTROY(&us_gpio.mutex);
US_MUTEX_DESTROY(us_gpio.mutex);
}
}
@@ -86,7 +86,7 @@ int us_gpio_inner_set(us_gpio_output_s *output, bool state) {
assert(us_gpio.chip != NULL);
assert(output->line != NULL);
assert(output->state != state); // Must be checked in macro for the performance
US_MUTEX_LOCK(&us_gpio.mutex);
US_MUTEX_LOCK(us_gpio.mutex);
if (gpiod_line_set_value(output->line, (int)state) < 0) { \
US_LOG_PERROR("GPIO: Can't write value %d to line %s (will be disabled)", state, output->consumer); \
@@ -94,7 +94,7 @@ int us_gpio_inner_set(us_gpio_output_s *output, bool state) {
retval = -1;
}
US_MUTEX_UNLOCK(&us_gpio.mutex);
US_MUTEX_UNLOCK(us_gpio.mutex);
return retval;
}

View File

@@ -856,7 +856,7 @@ static void _http_refresher(UNUSED int fd, UNUSED short what, void *v_server) {
static bool _expose_new_frame(us_server_s *server) {
bool updated = false;
US_MUTEX_LOCK(&_VID(mutex));
US_MUTEX_LOCK(_VID(mutex));
US_LOG_DEBUG("HTTP: Updating exposed frame (online=%d) ...", _VID(frame->online));
@@ -895,7 +895,7 @@ static bool _expose_new_frame(us_server_s *server) {
updated = true;
not_updated:
atomic_store(&_VID(updated), false);
US_MUTEX_UNLOCK(&_VID(mutex));
US_MUTEX_UNLOCK(_VID(mutex));
return updated;
}

View File

@@ -129,8 +129,8 @@ int main(int argc, char *argv[]) {
pthread_t stream_loop_tid;
pthread_t server_loop_tid;
US_THREAD_CREATE(&stream_loop_tid, _stream_loop_thread, NULL);
US_THREAD_CREATE(&server_loop_tid, _server_loop_thread, NULL);
US_THREAD_CREATE(stream_loop_tid, _stream_loop_thread, NULL);
US_THREAD_CREATE(server_loop_tid, _server_loop_thread, NULL);
US_THREAD_JOIN(server_loop_tid);
US_THREAD_JOIN(stream_loop_tid);
}

View File

@@ -52,7 +52,7 @@ us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) {
US_CALLOC(video, 1);
video->frame = us_frame_init();
atomic_init(&video->updated, false);
US_MUTEX_INIT(&video->mutex);
US_MUTEX_INIT(video->mutex);
atomic_init(&video->has_clients, false);
run->video = video;
@@ -69,7 +69,7 @@ us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) {
}
void us_stream_destroy(us_stream_s *stream) {
US_MUTEX_DESTROY(&_RUN(video->mutex));
US_MUTEX_DESTROY(_RUN(video->mutex));
us_frame_destroy(_RUN(video->frame));
free(_RUN(video));
free(stream->run);
@@ -295,7 +295,7 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame, unsigne
us_frame_s *new = NULL;
US_MUTEX_LOCK(&VID(mutex));
US_MUTEX_LOCK(VID(mutex));
if (frame != NULL) {
new = frame;
@@ -341,7 +341,7 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame, unsigne
VID(captured_fps) = captured_fps;
atomic_store(&VID(updated), true);
US_MUTEX_UNLOCK(&VID(mutex));
US_MUTEX_UNLOCK(VID(mutex));
new = (frame ? frame : stream->blank);
_SINK_PUT(sink, new);

View File

@@ -46,8 +46,8 @@ us_workers_pool_s *us_workers_pool_init(
pool->n_workers = n_workers;
US_CALLOC(pool->workers, pool->n_workers);
US_MUTEX_INIT(&pool->free_workers_mutex);
US_COND_INIT(&pool->free_workers_cond);
US_MUTEX_INIT(pool->free_workers_mutex);
US_COND_INIT(pool->free_workers_cond);
for (unsigned number = 0; number < pool->n_workers; ++number) {
# define WR(x_next) pool->workers[number].x_next
@@ -55,14 +55,14 @@ us_workers_pool_s *us_workers_pool_init(
WR(number) = number;
US_ASPRINTF(WR(name), "%s-%u", wr_prefix, number);
US_MUTEX_INIT(&WR(has_job_mutex));
US_MUTEX_INIT(WR(has_job_mutex));
atomic_init(&WR(has_job), false);
US_COND_INIT(&WR(has_job_cond));
US_COND_INIT(WR(has_job_cond));
WR(pool) = pool;
WR(job) = job_init(job_init_arg);
US_THREAD_CREATE(&WR(tid), _worker_thread, (void *)&(pool->workers[number]));
US_THREAD_CREATE(WR(tid), _worker_thread, (void *)&(pool->workers[number]));
pool->free_workers += 1;
# undef WR
@@ -77,14 +77,14 @@ void us_workers_pool_destroy(us_workers_pool_s *pool) {
for (unsigned number = 0; number < pool->n_workers; ++number) {
# define WR(x_next) pool->workers[number].x_next
US_MUTEX_LOCK(&WR(has_job_mutex));
US_MUTEX_LOCK(WR(has_job_mutex));
atomic_store(&WR(has_job), true); // Final job: die
US_MUTEX_UNLOCK(&WR(has_job_mutex));
US_COND_SIGNAL(&WR(has_job_cond));
US_MUTEX_UNLOCK(WR(has_job_mutex));
US_COND_SIGNAL(WR(has_job_cond));
US_THREAD_JOIN(WR(tid));
US_MUTEX_DESTROY(&WR(has_job_mutex));
US_COND_DESTROY(&WR(has_job_cond));
US_MUTEX_DESTROY(WR(has_job_mutex));
US_COND_DESTROY(WR(has_job_cond));
free(WR(name));
@@ -93,8 +93,8 @@ void us_workers_pool_destroy(us_workers_pool_s *pool) {
# undef WR
}
US_MUTEX_DESTROY(&pool->free_workers_mutex);
US_COND_DESTROY(&pool->free_workers_cond);
US_MUTEX_DESTROY(pool->free_workers_mutex);
US_COND_DESTROY(pool->free_workers_cond);
free(pool->workers);
free(pool);
@@ -103,9 +103,9 @@ void us_workers_pool_destroy(us_workers_pool_s *pool) {
us_worker_s *us_workers_pool_wait(us_workers_pool_s *pool) {
us_worker_s *ready_wr = NULL;
US_MUTEX_LOCK(&pool->free_workers_mutex);
US_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex);
US_MUTEX_UNLOCK(&pool->free_workers_mutex);
US_MUTEX_LOCK(pool->free_workers_mutex);
US_COND_WAIT_TRUE(pool->free_workers, pool->free_workers_cond, pool->free_workers_mutex);
US_MUTEX_UNLOCK(pool->free_workers_mutex);
if (pool->oldest_wr && !atomic_load(&pool->oldest_wr->has_job)) {
ready_wr = pool->oldest_wr;
@@ -146,15 +146,15 @@ void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr/*, vo
}
pool->latest_wr->next_wr = NULL;
US_MUTEX_LOCK(&ready_wr->has_job_mutex);
US_MUTEX_LOCK(ready_wr->has_job_mutex);
//ready_wr->job = job;
atomic_store(&ready_wr->has_job, true);
US_MUTEX_UNLOCK(&ready_wr->has_job_mutex);
US_COND_SIGNAL(&ready_wr->has_job_cond);
US_MUTEX_UNLOCK(ready_wr->has_job_mutex);
US_COND_SIGNAL(ready_wr->has_job_cond);
US_MUTEX_LOCK(&pool->free_workers_mutex);
US_MUTEX_LOCK(pool->free_workers_mutex);
pool->free_workers -= 1;
US_MUTEX_UNLOCK(&pool->free_workers_mutex);
US_MUTEX_UNLOCK(pool->free_workers_mutex);
}
long double us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, us_worker_s *ready_wr) {
@@ -184,9 +184,9 @@ static void *_worker_thread(void *v_worker) {
while (!atomic_load(&wr->pool->stop)) {
US_LOG_DEBUG("Worker %s waiting for a new job ...", wr->name);
US_MUTEX_LOCK(&wr->has_job_mutex);
US_COND_WAIT_TRUE(atomic_load(&wr->has_job), &wr->has_job_cond, &wr->has_job_mutex);
US_MUTEX_UNLOCK(&wr->has_job_mutex);
US_MUTEX_LOCK(wr->has_job_mutex);
US_COND_WAIT_TRUE(atomic_load(&wr->has_job), wr->has_job_cond, wr->has_job_mutex);
US_MUTEX_UNLOCK(wr->has_job_mutex);
if (!atomic_load(&wr->pool->stop)) {
const long double job_start_ts = us_get_now_monotonic();
@@ -199,10 +199,10 @@ static void *_worker_thread(void *v_worker) {
atomic_store(&wr->has_job, false);
}
US_MUTEX_LOCK(&wr->pool->free_workers_mutex);
US_MUTEX_LOCK(wr->pool->free_workers_mutex);
wr->pool->free_workers += 1;
US_MUTEX_UNLOCK(&wr->pool->free_workers_mutex);
US_COND_SIGNAL(&wr->pool->free_workers_cond);
US_MUTEX_UNLOCK(wr->pool->free_workers_mutex);
US_COND_SIGNAL(wr->pool->free_workers_cond);
}
US_LOG_DEBUG("Bye-bye (worker %s)", wr->name);