janus: using ring for audio pipeline

This commit is contained in:
Maxim Devaev
2024-02-29 08:06:48 +02:00
parent 12937b93d5
commit 4296d5dada
2 changed files with 72 additions and 51 deletions

View File

@@ -51,6 +51,9 @@ typedef struct {
} _enc_buffer_s; } _enc_buffer_s;
static _pcm_buffer_s *_pcm_buffer_init(void);
static _enc_buffer_s *_enc_buffer_init(void);
static void *_pcm_thread(void *v_audio); static void *_pcm_thread(void *v_audio);
static void *_encoder_thread(void *v_audio); static void *_encoder_thread(void *v_audio);
@@ -72,8 +75,8 @@ us_audio_s *us_audio_init(const char *name, unsigned pcm_hz) {
us_audio_s *audio; us_audio_s *audio;
US_CALLOC(audio, 1); US_CALLOC(audio, 1);
audio->pcm_hz = pcm_hz; audio->pcm_hz = pcm_hz;
audio->pcm_queue = us_queue_init(8); US_RING_INIT_WITH_ITEMS(audio->pcm_ring, 8, _pcm_buffer_init);
audio->enc_queue = us_queue_init(8); US_RING_INIT_WITH_ITEMS(audio->enc_ring, 8, _enc_buffer_init);
atomic_init(&audio->stop, false); atomic_init(&audio->stop, false);
int err; int err;
@@ -151,8 +154,8 @@ void us_audio_destroy(us_audio_s *audio) {
US_DELETE(audio->res, speex_resampler_destroy); US_DELETE(audio->res, speex_resampler_destroy);
US_DELETE(audio->pcm, snd_pcm_close); US_DELETE(audio->pcm, snd_pcm_close);
US_DELETE(audio->pcm_params, snd_pcm_hw_params_free); US_DELETE(audio->pcm_params, snd_pcm_hw_params_free);
US_QUEUE_DELETE_WITH_ITEMS(audio->enc_queue, free); US_RING_DELETE_WITH_ITEMS(audio->enc_ring, free);
US_QUEUE_DELETE_WITH_ITEMS(audio->pcm_queue, free); US_RING_DELETE_WITH_ITEMS(audio->pcm_ring, free);
if (audio->tids_created) { if (audio->tids_created) {
US_JLOG_INFO("audio", "Pipeline closed"); US_JLOG_INFO("audio", "Pipeline closed");
} }
@@ -163,19 +166,32 @@ int us_audio_get_encoded(us_audio_s *audio, uint8_t *data, size_t *size, uint64_
if (atomic_load(&audio->stop)) { if (atomic_load(&audio->stop)) {
return -1; return -1;
} }
_enc_buffer_s *buf; const int ri = us_ring_consumer_acquire(audio->enc_ring, 0.1);
if (!us_queue_get(audio->enc_queue, (void **)&buf, 0.1)) { if (ri < 0) {
if (*size < buf->used) { return -2;
free(buf);
return -3;
}
memcpy(data, buf->data, buf->used);
*size = buf->used;
*pts = buf->pts;
free(buf);
return 0;
} }
return -2; _enc_buffer_s *const buf = audio->enc_ring->items[ri];
if (*size < buf->used) {
us_ring_consumer_release(audio->enc_ring, ri);
return -3;
}
memcpy(data, buf->data, buf->used);
*size = buf->used;
*pts = buf->pts;
us_ring_consumer_release(audio->enc_ring, ri);
return 0;
}
static _pcm_buffer_s *_pcm_buffer_init(void) {
_pcm_buffer_s *buf;
US_CALLOC(buf, 1);
return buf;
}
static _enc_buffer_s *_enc_buffer_init(void) {
_enc_buffer_s *buf;
US_CALLOC(buf, 1);
return buf;
} }
static void *_pcm_thread(void *v_audio) { static void *_pcm_thread(void *v_audio) {
@@ -194,13 +210,13 @@ static void *_pcm_thread(void *v_audio) {
break; break;
} }
if (us_queue_get_free(audio->pcm_queue)) { const int ri = us_ring_producer_acquire(audio->pcm_ring, 0);
_pcm_buffer_s *out; if (ri >= 0) {
US_CALLOC(out, 1); _pcm_buffer_s *const out = audio->pcm_ring->items[ri];
memcpy(out->data, in, audio->pcm_size); memcpy(out->data, in, audio->pcm_size);
assert(!us_queue_put(audio->pcm_queue, out, 0)); us_ring_producer_release(audio->pcm_ring, ri);
} else { } else {
US_JLOG_ERROR("audio", "PCM queue is full"); US_JLOG_ERROR("audio", "PCM ring is full");
} }
} }
@@ -215,39 +231,44 @@ static void *_encoder_thread(void *v_audio) {
int16_t in_res[_MAX_BUF16]; int16_t in_res[_MAX_BUF16];
while (!atomic_load(&audio->stop)) { while (!atomic_load(&audio->stop)) {
_pcm_buffer_s *in; const int in_ri = us_ring_consumer_acquire(audio->pcm_ring, 0.1);
if (!us_queue_get(audio->pcm_queue, (void **)&in, 0.1)) { if (in_ri < 0) {
int16_t *in_ptr; continue;
if (audio->res != NULL) { }
assert(audio->pcm_hz != _ENCODER_INPUT_HZ); _pcm_buffer_s *const in = audio->pcm_ring->items[in_ri];
uint32_t in_count = audio->pcm_frames;
uint32_t out_count = _HZ_TO_FRAMES(_ENCODER_INPUT_HZ);
speex_resampler_process_interleaved_int(audio->res, in->data, &in_count, in_res, &out_count);
in_ptr = in_res;
} else {
assert(audio->pcm_hz == _ENCODER_INPUT_HZ);
in_ptr = in->data;
}
_enc_buffer_s *out; int16_t *in_ptr;
US_CALLOC(out, 1); if (audio->res != NULL) {
const int size = opus_encode(audio->enc, in_ptr, _HZ_TO_FRAMES(_ENCODER_INPUT_HZ), out->data, US_ARRAY_LEN(out->data)); assert(audio->pcm_hz != _ENCODER_INPUT_HZ);
free(in); uint32_t in_count = audio->pcm_frames;
if (size < 0) { uint32_t out_count = _HZ_TO_FRAMES(_ENCODER_INPUT_HZ);
_JLOG_PERROR_OPUS(size, "audio", "Fatal: Can't encode PCM frame to OPUS"); speex_resampler_process_interleaved_int(audio->res, in->data, &in_count, in_res, &out_count);
free(out); in_ptr = in_res;
break; } else {
} assert(audio->pcm_hz == _ENCODER_INPUT_HZ);
in_ptr = in->data;
}
const int out_ri = us_ring_producer_acquire(audio->enc_ring, 0);
if (out_ri < 0) {
US_JLOG_ERROR("audio", "OPUS encoder queue is full");
us_ring_consumer_release(audio->pcm_ring, in_ri);
continue;
}
_enc_buffer_s *const out = audio->enc_ring->items[out_ri];
const int size = opus_encode(audio->enc, in_ptr, _HZ_TO_FRAMES(_ENCODER_INPUT_HZ), out->data, US_ARRAY_LEN(out->data));
us_ring_consumer_release(audio->pcm_ring, in_ri);
if (size >= 0) {
out->used = size; out->used = size;
out->pts = audio->pts; out->pts = audio->pts;
// https://datatracker.ietf.org/doc/html/rfc7587#section-4.2 // https://datatracker.ietf.org/doc/html/rfc7587#section-4.2
audio->pts += _HZ_TO_FRAMES(_ENCODER_INPUT_HZ); audio->pts += _HZ_TO_FRAMES(_ENCODER_INPUT_HZ);
} else {
if (us_queue_put(audio->enc_queue, out, 0) != 0) { _JLOG_PERROR_OPUS(size, "audio", "Fatal: Can't encode PCM frame to OPUS");
US_JLOG_ERROR("audio", "OPUS encoder queue is full");
free(out);
}
} }
us_ring_producer_release(audio->enc_ring, out_ri);
} }
atomic_store(&audio->stop, true); atomic_store(&audio->stop, true);

View File

@@ -37,7 +37,7 @@
#include "uslibs/tools.h" #include "uslibs/tools.h"
#include "uslibs/array.h" #include "uslibs/array.h"
#include "uslibs/queue.h" #include "uslibs/ring.h"
#include "uslibs/threading.h" #include "uslibs/threading.h"
#include "logging.h" #include "logging.h"
@@ -52,8 +52,8 @@ typedef struct {
SpeexResamplerState *res; SpeexResamplerState *res;
OpusEncoder *enc; OpusEncoder *enc;
us_queue_s *pcm_queue; us_ring_s *pcm_ring;
us_queue_s *enc_queue; us_ring_s *enc_ring;
uint32_t pts; uint32_t pts;
pthread_t pcm_tid; pthread_t pcm_tid;