From c874929e9d8ae5517f1270dc3d3e2f015b130cca Mon Sep 17 00:00:00 2001 From: Maxim Devaev Date: Sun, 10 Jul 2022 02:54:32 +0300 Subject: [PATCH] long double queue timeout --- janus/src/audio.c | 8 ++++---- janus/src/queue.c | 12 ++++++------ janus/src/queue.h | 6 +++--- src/libs/tools.h | 13 +++++++++++++ 4 files changed, 26 insertions(+), 13 deletions(-) diff --git a/janus/src/audio.c b/janus/src/audio.c index 8ed653c..6b007ba 100644 --- a/janus/src/audio.c +++ b/janus/src/audio.c @@ -159,7 +159,7 @@ int audio_get_encoded(audio_s *audio, uint8_t *data, size_t *size, uint64_t *pts return -1; } _enc_buffer_s *buf; - if (!queue_get(audio->enc_queue, (void **)&buf, 1)) { + if (!queue_get(audio->enc_queue, (void **)&buf, 0.1)) { if (*size < buf->used) { free(buf); return -3; @@ -193,7 +193,7 @@ static void *_pcm_thread(void *v_audio) { _pcm_buffer_s *out; A_CALLOC(out, 1); memcpy(out->data, in, audio->pcm_size); - assert(!queue_put(audio->pcm_queue, out, 1)); + assert(!queue_put(audio->pcm_queue, out, 0.1)); } else { JLOG_ERROR("audio", "PCM queue is full"); } @@ -211,7 +211,7 @@ static void *_encoder_thread(void *v_audio) { while (!atomic_load(&audio->stop)) { _pcm_buffer_s *in; - if (!queue_get(audio->pcm_queue, (void **)&in, 1)) { + if (!queue_get(audio->pcm_queue, (void **)&in, 0.1)) { int16_t *in_ptr; if (audio->res) { assert(audio->pcm_hz != ENCODER_INPUT_HZ); @@ -239,7 +239,7 @@ static void *_encoder_thread(void *v_audio) { audio->pts += HZ_TO_FRAMES(ENCODER_INPUT_HZ); if (queue_get_free(audio->enc_queue)) { - assert(!queue_put(audio->enc_queue, out, 1)); + assert(!queue_put(audio->enc_queue, out, 0.1)); } else { JLOG_ERROR("audio", "OPUS encoder queue is full"); free(out); diff --git a/janus/src/queue.c b/janus/src/queue.c index 54d5825..8c3d2f5 100644 --- a/janus/src/queue.c +++ b/janus/src/queue.c @@ -45,11 +45,11 @@ void queue_destroy(queue_s *queue) { } #define WAIT_OR_UNLOCK(_var, _cond) { \ - struct timespec ts; \ - assert(!clock_gettime(CLOCK_MONOTONIC, &ts)); \ - ts.tv_sec += timeout; \ + struct timespec _ts; \ + assert(!clock_gettime(CLOCK_MONOTONIC, &_ts)); \ + ld_to_timespec(timespec_to_ld(&_ts) + timeout, &_ts); \ while (_var) { \ - int err = pthread_cond_timedwait(_cond, &queue->mutex, &ts); \ + int err = pthread_cond_timedwait(_cond, &queue->mutex, &_ts); \ if (err == ETIMEDOUT) { \ A_MUTEX_UNLOCK(&queue->mutex); \ return -1; \ @@ -58,7 +58,7 @@ void queue_destroy(queue_s *queue) { } \ } -int queue_put(queue_s *queue, void *item, unsigned timeout) { +int queue_put(queue_s *queue, void *item, long double timeout) { A_MUTEX_LOCK(&queue->mutex); WAIT_OR_UNLOCK(queue->size == queue->capacity, &queue->full_cond); queue->items[queue->in] = item; @@ -70,7 +70,7 @@ int queue_put(queue_s *queue, void *item, unsigned timeout) { return 0; } -int queue_get(queue_s *queue, void **item, unsigned timeout) { +int queue_get(queue_s *queue, void **item, long double timeout) { A_MUTEX_LOCK(&queue->mutex); WAIT_OR_UNLOCK(queue->size == 0, &queue->empty_cond); *item = queue->items[queue->out]; diff --git a/janus/src/queue.h b/janus/src/queue.h index 6929f59..6981574 100644 --- a/janus/src/queue.h +++ b/janus/src/queue.h @@ -50,7 +50,7 @@ typedef struct { #define QUEUE_FREE_ITEMS_AND_DESTROY(_queue, _free_item) { \ while (!queue_get_free(_queue)) { \ void *_ptr; \ - assert(!queue_get(_queue, &_ptr, 1)); \ + assert(!queue_get(_queue, &_ptr, 0.1)); \ _free_item(_ptr); \ } \ queue_destroy(_queue); \ @@ -60,6 +60,6 @@ typedef struct { queue_s *queue_init(unsigned capacity); void queue_destroy(queue_s *queue); -int queue_put(queue_s *queue, void *item, unsigned timeout); -int queue_get(queue_s *queue, void **item, unsigned timeout); +int queue_put(queue_s *queue, void *item, long double timeout); +int queue_get(queue_s *queue, void **item, long double timeout); int queue_get_free(queue_s *queue); diff --git a/src/libs/tools.h b/src/libs/tools.h index 8c166fa..f91f9cf 100644 --- a/src/libs/tools.h +++ b/src/libs/tools.h @@ -147,6 +147,19 @@ INLINE unsigned get_cores_available(void) { return max_u(min_u(cores_sysconf, 4), 1); } +INLINE void ld_to_timespec(long double ld, struct timespec *ts) { + ts->tv_sec = (long)ld; + ts->tv_nsec = (ld - ts->tv_sec) * 1000000000L; + if (ts->tv_nsec > 999999999L) { + ts->tv_sec += 1; + ts->tv_nsec = 0; + } +} + +INLINE long double timespec_to_ld(const struct timespec *ts) { + return ts->tv_sec + ((long double)ts->tv_nsec) / 1000000000; +} + INLINE int flock_timedwait_monotonic(int fd, long double timeout) { long double deadline_ts = get_now_monotonic() + timeout; int retval = -1;