queue fix

This commit is contained in:
Maxim Devaev
2022-06-06 02:04:16 +03:00
parent c505a423af
commit 0f753dc654

View File

@@ -44,13 +44,14 @@ void queue_destroy(queue_s *queue) {
free(queue); free(queue);
} }
#define WAIT_LOOP(_var, _cond) { \ #define WAIT_OR_UNLOCK(_var, _cond) { \
struct timespec ts; \ struct timespec ts; \
assert(!clock_gettime(CLOCK_MONOTONIC, &ts)); \ assert(!clock_gettime(CLOCK_MONOTONIC, &ts)); \
ts.tv_sec += timeout; \ ts.tv_sec += timeout; \
while (_var) { \ while (_var) { \
int err = pthread_cond_timedwait(_cond, &queue->mutex, &ts); \ int err = pthread_cond_timedwait(_cond, &queue->mutex, &ts); \
if (err == ETIMEDOUT) { \ if (err == ETIMEDOUT) { \
A_MUTEX_UNLOCK(&queue->mutex); \
return -1; \ return -1; \
} \ } \
assert(!err); \ assert(!err); \
@@ -59,7 +60,7 @@ void queue_destroy(queue_s *queue) {
int queue_put(queue_s *queue, void *item, unsigned timeout) { int queue_put(queue_s *queue, void *item, unsigned timeout) {
A_MUTEX_LOCK(&queue->mutex); A_MUTEX_LOCK(&queue->mutex);
WAIT_LOOP(queue->size == queue->capacity, &queue->full_cond); WAIT_OR_UNLOCK(queue->size == queue->capacity, &queue->full_cond);
queue->items[queue->in] = item; queue->items[queue->in] = item;
++queue->size; ++queue->size;
++queue->in; ++queue->in;
@@ -71,7 +72,7 @@ int queue_put(queue_s *queue, void *item, unsigned timeout) {
int queue_get(queue_s *queue, void **item, unsigned timeout) { int queue_get(queue_s *queue, void **item, unsigned timeout) {
A_MUTEX_LOCK(&queue->mutex); A_MUTEX_LOCK(&queue->mutex);
WAIT_LOOP(queue->size == 0, &queue->empty_cond); WAIT_OR_UNLOCK(queue->size == 0, &queue->empty_cond);
*item = queue->items[queue->out]; *item = queue->items[queue->out];
--queue->size; --queue->size;
++queue->out; ++queue->out;
@@ -81,7 +82,7 @@ int queue_get(queue_s *queue, void **item, unsigned timeout) {
return 0; return 0;
} }
#undef WAIT_LOOP #undef WAIT_OR_UNLOCK
int queue_get_free(queue_s *queue) { int queue_get_free(queue_s *queue) {
A_MUTEX_LOCK(&queue->mutex); A_MUTEX_LOCK(&queue->mutex);