long double queue timeout

This commit is contained in:
Maxim Devaev
2022-07-10 02:54:32 +03:00
parent db5b9d3cd7
commit c874929e9d
4 changed files with 26 additions and 13 deletions

View File

@@ -159,7 +159,7 @@ int audio_get_encoded(audio_s *audio, uint8_t *data, size_t *size, uint64_t *pts
return -1; return -1;
} }
_enc_buffer_s *buf; _enc_buffer_s *buf;
if (!queue_get(audio->enc_queue, (void **)&buf, 1)) { if (!queue_get(audio->enc_queue, (void **)&buf, 0.1)) {
if (*size < buf->used) { if (*size < buf->used) {
free(buf); free(buf);
return -3; return -3;
@@ -193,7 +193,7 @@ static void *_pcm_thread(void *v_audio) {
_pcm_buffer_s *out; _pcm_buffer_s *out;
A_CALLOC(out, 1); A_CALLOC(out, 1);
memcpy(out->data, in, audio->pcm_size); memcpy(out->data, in, audio->pcm_size);
assert(!queue_put(audio->pcm_queue, out, 1)); assert(!queue_put(audio->pcm_queue, out, 0.1));
} else { } else {
JLOG_ERROR("audio", "PCM queue is full"); JLOG_ERROR("audio", "PCM queue is full");
} }
@@ -211,7 +211,7 @@ static void *_encoder_thread(void *v_audio) {
while (!atomic_load(&audio->stop)) { while (!atomic_load(&audio->stop)) {
_pcm_buffer_s *in; _pcm_buffer_s *in;
if (!queue_get(audio->pcm_queue, (void **)&in, 1)) { if (!queue_get(audio->pcm_queue, (void **)&in, 0.1)) {
int16_t *in_ptr; int16_t *in_ptr;
if (audio->res) { if (audio->res) {
assert(audio->pcm_hz != ENCODER_INPUT_HZ); assert(audio->pcm_hz != ENCODER_INPUT_HZ);
@@ -239,7 +239,7 @@ static void *_encoder_thread(void *v_audio) {
audio->pts += HZ_TO_FRAMES(ENCODER_INPUT_HZ); audio->pts += HZ_TO_FRAMES(ENCODER_INPUT_HZ);
if (queue_get_free(audio->enc_queue)) { if (queue_get_free(audio->enc_queue)) {
assert(!queue_put(audio->enc_queue, out, 1)); assert(!queue_put(audio->enc_queue, out, 0.1));
} else { } else {
JLOG_ERROR("audio", "OPUS encoder queue is full"); JLOG_ERROR("audio", "OPUS encoder queue is full");
free(out); free(out);

View File

@@ -45,11 +45,11 @@ void queue_destroy(queue_s *queue) {
} }
#define WAIT_OR_UNLOCK(_var, _cond) { \ #define WAIT_OR_UNLOCK(_var, _cond) { \
struct timespec ts; \ struct timespec _ts; \
assert(!clock_gettime(CLOCK_MONOTONIC, &ts)); \ assert(!clock_gettime(CLOCK_MONOTONIC, &_ts)); \
ts.tv_sec += timeout; \ ld_to_timespec(timespec_to_ld(&_ts) + timeout, &_ts); \
while (_var) { \ while (_var) { \
int err = pthread_cond_timedwait(_cond, &queue->mutex, &ts); \ int err = pthread_cond_timedwait(_cond, &queue->mutex, &_ts); \
if (err == ETIMEDOUT) { \ if (err == ETIMEDOUT) { \
A_MUTEX_UNLOCK(&queue->mutex); \ A_MUTEX_UNLOCK(&queue->mutex); \
return -1; \ return -1; \
@@ -58,7 +58,7 @@ void queue_destroy(queue_s *queue) {
} \ } \
} }
int queue_put(queue_s *queue, void *item, unsigned timeout) { int queue_put(queue_s *queue, void *item, long double timeout) {
A_MUTEX_LOCK(&queue->mutex); A_MUTEX_LOCK(&queue->mutex);
WAIT_OR_UNLOCK(queue->size == queue->capacity, &queue->full_cond); WAIT_OR_UNLOCK(queue->size == queue->capacity, &queue->full_cond);
queue->items[queue->in] = item; queue->items[queue->in] = item;
@@ -70,7 +70,7 @@ int queue_put(queue_s *queue, void *item, unsigned timeout) {
return 0; return 0;
} }
int queue_get(queue_s *queue, void **item, unsigned timeout) { int queue_get(queue_s *queue, void **item, long double timeout) {
A_MUTEX_LOCK(&queue->mutex); A_MUTEX_LOCK(&queue->mutex);
WAIT_OR_UNLOCK(queue->size == 0, &queue->empty_cond); WAIT_OR_UNLOCK(queue->size == 0, &queue->empty_cond);
*item = queue->items[queue->out]; *item = queue->items[queue->out];

View File

@@ -50,7 +50,7 @@ typedef struct {
#define QUEUE_FREE_ITEMS_AND_DESTROY(_queue, _free_item) { \ #define QUEUE_FREE_ITEMS_AND_DESTROY(_queue, _free_item) { \
while (!queue_get_free(_queue)) { \ while (!queue_get_free(_queue)) { \
void *_ptr; \ void *_ptr; \
assert(!queue_get(_queue, &_ptr, 1)); \ assert(!queue_get(_queue, &_ptr, 0.1)); \
_free_item(_ptr); \ _free_item(_ptr); \
} \ } \
queue_destroy(_queue); \ queue_destroy(_queue); \
@@ -60,6 +60,6 @@ typedef struct {
queue_s *queue_init(unsigned capacity); queue_s *queue_init(unsigned capacity);
void queue_destroy(queue_s *queue); void queue_destroy(queue_s *queue);
int queue_put(queue_s *queue, void *item, unsigned timeout); int queue_put(queue_s *queue, void *item, long double timeout);
int queue_get(queue_s *queue, void **item, unsigned timeout); int queue_get(queue_s *queue, void **item, long double timeout);
int queue_get_free(queue_s *queue); int queue_get_free(queue_s *queue);

View File

@@ -147,6 +147,19 @@ INLINE unsigned get_cores_available(void) {
return max_u(min_u(cores_sysconf, 4), 1); return max_u(min_u(cores_sysconf, 4), 1);
} }
INLINE void ld_to_timespec(long double ld, struct timespec *ts) {
ts->tv_sec = (long)ld;
ts->tv_nsec = (ld - ts->tv_sec) * 1000000000L;
if (ts->tv_nsec > 999999999L) {
ts->tv_sec += 1;
ts->tv_nsec = 0;
}
}
INLINE long double timespec_to_ld(const struct timespec *ts) {
return ts->tv_sec + ((long double)ts->tv_nsec) / 1000000000;
}
INLINE int flock_timedwait_monotonic(int fd, long double timeout) { INLINE int flock_timedwait_monotonic(int fd, long double timeout) {
long double deadline_ts = get_now_monotonic() + timeout; long double deadline_ts = get_now_monotonic() + timeout;
int retval = -1; int retval = -1;