diff --git a/janus/src/acap.c b/janus/src/acap.c index 469cf47..bfde469 100644 --- a/janus/src/acap.c +++ b/janus/src/acap.c @@ -53,14 +53,14 @@ static void *_encoder_thread(void *v_acap); bool us_acap_probe(const char *name) { - snd_pcm_t *pcm; + snd_pcm_t *dev; int err; US_JLOG_INFO("acap", "Probing PCM capture ..."); - if ((err = snd_pcm_open(&pcm, name, SND_PCM_STREAM_CAPTURE, 0)) < 0) { + if ((err = snd_pcm_open(&dev, name, SND_PCM_STREAM_CAPTURE, 0)) < 0) { _JLOG_PERROR_ALSA(err, "acap", "Can't probe PCM capture"); return false; } - snd_pcm_close(pcm); + snd_pcm_close(dev); US_JLOG_INFO("acap", "PCM capture is available"); return true; } @@ -76,15 +76,15 @@ us_acap_s *us_acap_init(const char *name, uint pcm_hz) { int err; { - if ((err = snd_pcm_open(&acap->pcm, name, SND_PCM_STREAM_CAPTURE, 0)) < 0) { - acap->pcm = NULL; + if ((err = snd_pcm_open(&acap->dev, name, SND_PCM_STREAM_CAPTURE, 0)) < 0) { + acap->dev = NULL; _JLOG_PERROR_ALSA(err, "acap", "Can't open PCM capture"); goto error; } - assert(!snd_pcm_hw_params_malloc(&acap->pcm_params)); + assert(!snd_pcm_hw_params_malloc(&acap->dev_params)); # define SET_PARAM(_msg, _func, ...) { \ - if ((err = _func(acap->pcm, acap->pcm_params, ##__VA_ARGS__)) < 0) { \ + if ((err = _func(acap->dev, acap->dev_params, ##__VA_ARGS__)) < 0) { \ _JLOG_PERROR_ALSA(err, "acap", _msg); \ goto error; \ } \ @@ -148,8 +148,8 @@ void us_acap_destroy(us_acap_s *acap) { } US_DELETE(acap->enc, opus_encoder_destroy); US_DELETE(acap->res, speex_resampler_destroy); - US_DELETE(acap->pcm, snd_pcm_close); - US_DELETE(acap->pcm_params, snd_pcm_hw_params_free); + US_DELETE(acap->dev, snd_pcm_close); + US_DELETE(acap->dev_params, snd_pcm_hw_params_free); US_RING_DELETE_WITH_ITEMS(acap->enc_ring, us_au_encoded_destroy); US_RING_DELETE_WITH_ITEMS(acap->pcm_ring, us_au_pcm_destroy); if (acap->tids_created) { @@ -167,7 +167,7 @@ int us_acap_get_encoded(us_acap_s *acap, u8 *data, uz *size, u64 *pts) { return US_ERROR_NO_DATA; } const us_au_encoded_s *const buf = acap->enc_ring->items[ri]; - if (*size < buf->used) { + if (buf->used == 0 || *size < buf->used) { us_ring_consumer_release(acap->enc_ring, ri); return US_ERROR_NO_DATA; } @@ -185,7 +185,7 @@ static void *_pcm_thread(void *v_acap) { u8 in[US_AU_MAX_BUF8]; while (!atomic_load(&acap->stop)) { - const int frames = snd_pcm_readi(acap->pcm, in, acap->pcm_frames); + const int frames = snd_pcm_readi(acap->dev, in, acap->pcm_frames); if (frames < 0) { _JLOG_PERROR_ALSA(frames, "acap", "Fatal: Can't capture PCM frames"); break; @@ -209,7 +209,7 @@ static void *_pcm_thread(void *v_acap) { } static void *_encoder_thread(void *v_acap) { - US_THREAD_SETTLE("us_a_enc"); + US_THREAD_SETTLE("us_ac_enc"); us_acap_s *const acap = v_acap; s16 in_res[US_AU_MAX_BUF16]; @@ -244,12 +244,13 @@ static void *_encoder_thread(void *v_acap) { const int size = opus_encode(acap->enc, in_ptr, US_AU_HZ_TO_FRAMES(US_RTP_OPUS_HZ), out->data, US_ARRAY_LEN(out->data)); us_ring_consumer_release(acap->pcm_ring, in_ri); - if (size >= 0) { + if (size > 0) { out->used = size; out->pts = acap->pts; // https://datatracker.ietf.org/doc/html/rfc7587#section-4.2 acap->pts += US_AU_HZ_TO_FRAMES(US_RTP_OPUS_HZ); } else { + out->used = 0; _JLOG_PERROR_OPUS(size, "acap", "Fatal: Can't encode PCM frame to OPUS"); } us_ring_producer_release(acap->enc_ring, out_ri); diff --git a/janus/src/acap.h b/janus/src/acap.h index 82b33dd..fb65094 100644 --- a/janus/src/acap.h +++ b/janus/src/acap.h @@ -34,11 +34,11 @@ typedef struct { - snd_pcm_t *pcm; + snd_pcm_t *dev; uint pcm_hz; uint pcm_frames; uz pcm_size; - snd_pcm_hw_params_t *pcm_params; + snd_pcm_hw_params_t *dev_params; SpeexResamplerState *res; OpusEncoder *enc; diff --git a/janus/src/au.c b/janus/src/au.c index dcd5e9d..840c39e 100644 --- a/janus/src/au.c +++ b/janus/src/au.c @@ -37,10 +37,42 @@ void us_au_pcm_destroy(us_au_pcm_s *pcm) { free(pcm); } +void us_au_pcm_mix(us_au_pcm_s *dest, us_au_pcm_s *src) { + const uz size = src->frames * US_RTP_OPUS_CH * 2; // 2 for 16 bit + if (src->frames == 0) { + return; + } else if (dest->frames == 0) { + memcpy(dest->data, src->data, size); + dest->frames = src->frames; + } else if (dest->frames == src->frames) { + // https://stackoverflow.com/questions/12089662 + for (uz index = 0; index < size; ++index) { + int a = dest->data[index]; + int b = src->data[index]; + int m; + + a += 32768; + b += 32768; + + if ((a < 32768) && (b < 32768)) { + m = a * b / 32768; + } else { + m = 2 * (a + b) - (a * b) / 32768 - 65536; + } + if (m == 65536) { + m = 65535; + } + m -= 32768; + + dest->data[index] = m; + } + } +} + us_au_encoded_s *us_au_encoded_init(void) { - us_au_encoded_s *enc; - US_CALLOC(enc, 1); - return enc; + us_au_encoded_s *enc; + US_CALLOC(enc, 1); + return enc; } void us_au_encoded_destroy(us_au_encoded_s *enc) { diff --git a/janus/src/au.h b/janus/src/au.h index 77b46ee..3bb8f7c 100644 --- a/janus/src/au.h +++ b/janus/src/au.h @@ -28,6 +28,7 @@ // A number of frames per 1 channel: // - https://github.com/xiph/opus/blob/7b05f44/src/opus_demo.c#L368 +#define US_AU_FRAME_MS 20 // #define _HZ_TO_FRAMES(_hz) (6 * (_hz) / 50) // 120ms #define US_AU_HZ_TO_FRAMES(_hz) ((_hz) / 50) // 20ms #define US_AU_HZ_TO_BUF16(_hz) (US_AU_HZ_TO_FRAMES(_hz) * US_RTP_OPUS_CH) // ... * 2: One stereo frame = (16bit L) + (16bit R) @@ -41,6 +42,7 @@ typedef struct { s16 data[US_AU_MAX_BUF16]; + uz frames; } us_au_pcm_s; typedef struct { @@ -52,6 +54,7 @@ typedef struct { us_au_pcm_s *us_au_pcm_init(void); void us_au_pcm_destroy(us_au_pcm_s *pcm); +void us_au_pcm_mix(us_au_pcm_s *a, us_au_pcm_s *b); us_au_encoded_s *us_au_encoded_init(void); void us_au_encoded_destroy(us_au_encoded_s *enc); diff --git a/janus/src/client.c b/janus/src/client.c index 89d3338..260ab9a 100644 --- a/janus/src/client.c +++ b/janus/src/client.c @@ -25,23 +25,29 @@ #include #include #include +#include #include #include +#include +#include #include "uslibs/types.h" #include "uslibs/tools.h" #include "uslibs/threading.h" +#include "uslibs/array.h" #include "uslibs/list.h" #include "uslibs/ring.h" #include "logging.h" +#include "au.h" #include "rtp.h" static void *_video_thread(void *v_client); static void *_acap_thread(void *v_client); -static void *_common_thread(void *v_client, bool video); +static void *_video_or_acap_thread(void *v_client, bool video); +static void *_aplay_thread(void *v_client); us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_session *session) { @@ -51,6 +57,7 @@ us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_sessio client->session = session; atomic_init(&client->transmit, false); atomic_init(&client->transmit_acap, false); + atomic_init(&client->transmit_aplay, false); atomic_init(&client->video_orient, 0); atomic_init(&client->stop, false); @@ -61,6 +68,10 @@ us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_sessio US_RING_INIT_WITH_ITEMS(client->acap_ring, 64, us_rtp_init); US_THREAD_CREATE(client->acap_tid, _acap_thread, client); + US_RING_INIT_WITH_ITEMS(client->aplay_enc_ring, 64, us_au_encoded_init); + US_RING_INIT_WITH_ITEMS(client->aplay_pcm_ring, 64, us_au_pcm_init); + US_THREAD_CREATE(client->aplay_tid, _aplay_thread, client); + return client; } @@ -73,6 +84,10 @@ void us_janus_client_destroy(us_janus_client_s *client) { US_THREAD_JOIN(client->acap_tid); US_RING_DELETE_WITH_ITEMS(client->acap_ring, us_rtp_destroy); + US_THREAD_JOIN(client->aplay_tid); + US_RING_DELETE_WITH_ITEMS(client->aplay_enc_ring, us_au_encoded_destroy); + US_RING_DELETE_WITH_ITEMS(client->aplay_pcm_ring, us_au_pcm_destroy); + free(client); } @@ -93,20 +108,65 @@ void us_janus_client_send(us_janus_client_s *client, const us_rtp_s *rtp) { } } +void us_janus_client_recv(us_janus_client_s *client, janus_plugin_rtp *packet) { + if ( + packet->video + || packet->length < sizeof(janus_rtp_header) + || !atomic_load(&client->transmit) + || !atomic_load(&client->transmit_aplay) + ) { + return; + } + + const janus_rtp_header *const header = (janus_rtp_header*)packet->buffer; + if (header->type != US_RTP_OPUS_PAYLOAD) { + return; + } + + const u16 seq = ntohs(header->seq_number); + if ( + seq >= client->aplay_seq_next // In order or missing + || (client->aplay_seq_next - seq) > 50 // In late sequence or sequence wrapped + ) { + client->aplay_seq_next = seq + 1; + + int size = 0; + const char *const data = janus_rtp_payload(packet->buffer, packet->length, &size); + if (data == NULL || size <= 0) { + return; + } + + us_ring_s *const ring = client->aplay_enc_ring; + const int ri = us_ring_producer_acquire(ring, 0); + if (ri < 0) { + // US_JLOG_ERROR("client", "Session %p aplay ring is full", client->session); + return; + } + us_au_encoded_s *enc = ring->items[ri]; + if ((uz)size < US_ARRAY_LEN(enc->data)) { + memcpy(enc->data, data, size); + enc->used = size; + } else { + enc->used = 0; + } + us_ring_producer_release(ring, ri); + } +} + static void *_video_thread(void *v_client) { - US_THREAD_SETTLE("us_c_video"); - return _common_thread(v_client, true); + US_THREAD_SETTLE("us_cx_vid"); + return _video_or_acap_thread(v_client, true); } static void *_acap_thread(void *v_client) { - US_THREAD_SETTLE("us_c_acap"); - return _common_thread(v_client, false); + US_THREAD_SETTLE("us_cx_ac"); + return _video_or_acap_thread(v_client, false); } -static void *_common_thread(void *v_client, bool video) { +static void *_video_or_acap_thread(void *v_client, bool video) { us_janus_client_s *const client = v_client; us_ring_s *const ring = (video ? client->video_ring : client->acap_ring); - assert(ring != NULL); // Audio may be NULL + assert(ring != NULL); while (!atomic_load(&client->stop)) { const int ri = us_ring_consumer_acquire(ring, 0.1); @@ -156,3 +216,48 @@ static void *_common_thread(void *v_client, bool video) { } return NULL; } + +static void *_aplay_thread(void *v_client) { + US_THREAD_SETTLE("us_cx_ap"); + + us_janus_client_s *const client = v_client; + + int err; + OpusDecoder *dec = opus_decoder_create(US_RTP_OPUS_HZ, US_RTP_OPUS_CH, &err); + assert(err == 0); + + while (!atomic_load(&client->stop)) { + const int in_ri = us_ring_consumer_acquire(client->aplay_enc_ring, 0.1); + if (in_ri < 0) { + continue; + } + us_au_encoded_s *in = client->aplay_enc_ring->items[in_ri]; + + if (in->used == 0) { + us_ring_consumer_release(client->aplay_enc_ring, in_ri); + continue; + } + + const int out_ri = us_ring_producer_acquire(client->aplay_pcm_ring, 0); + if (out_ri < 0) { + US_JLOG_ERROR("aplay", "OPUS decoder queue is full"); + us_ring_consumer_release(client->aplay_enc_ring, in_ri); + continue; + } + us_au_pcm_s *out = client->aplay_pcm_ring->items[out_ri]; + + const int frames = opus_decode(dec, in->data, in->used, out->data, US_AU_HZ_TO_FRAMES(US_RTP_OPUS_HZ), 0); + us_ring_consumer_release(client->aplay_enc_ring, in_ri); + + if (frames > 0) { + out->frames = frames; + } else { + out->frames = 0; + US_JLOG_ERROR("aplay", "Fatal: Can't decode OPUS to PCM frame: %s", opus_strerror(frames)); + } + us_ring_producer_release(client->aplay_pcm_ring, out_ri); + } + + opus_decoder_destroy(dec); + return NULL; +} diff --git a/janus/src/client.h b/janus/src/client.h index d93411f..016645a 100644 --- a/janus/src/client.h +++ b/janus/src/client.h @@ -39,15 +39,21 @@ typedef struct { janus_plugin_session *session; atomic_bool transmit; atomic_bool transmit_acap; + atomic_bool transmit_aplay; atomic_uint video_orient; pthread_t video_tid; pthread_t acap_tid; + pthread_t aplay_tid; atomic_bool stop; us_ring_s *video_ring; us_ring_s *acap_ring; + us_ring_s *aplay_enc_ring; + u16 aplay_seq_next; + us_ring_s *aplay_pcm_ring; + US_LIST_DECLARE; } us_janus_client_s; @@ -56,3 +62,4 @@ us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_sessio void us_janus_client_destroy(us_janus_client_s *client); void us_janus_client_send(us_janus_client_s *client, const us_rtp_s *rtp); +void us_janus_client_recv(us_janus_client_s *client, janus_plugin_rtp *packet); diff --git a/janus/src/config.c b/janus/src/config.c index 2f317f8..621a3a8 100644 --- a/janus/src/config.c +++ b/janus/src/config.c @@ -55,18 +55,16 @@ us_config_s *us_config_init(const char *config_dir_path) { } janus_config_print(jcfg); - if ( - (config->video_sink_name = _get_value(jcfg, "memsink", "object")) == NULL - && (config->video_sink_name = _get_value(jcfg, "video", "sink")) == NULL - ) { - US_JLOG_ERROR("config", "Missing config value: video.sink (ex. memsink.object)"); + if ((config->video_sink_name = _get_value(jcfg, "video", "sink")) == NULL) { + US_JLOG_ERROR("config", "Missing config value: video.sink"); goto error; } - if ((config->acap_dev_name = _get_value(jcfg, "audio", "device")) != NULL) { - if ((config->tc358743_dev_path = _get_value(jcfg, "audio", "tc358743")) == NULL) { - US_JLOG_INFO("config", "Missing config value: audio.tc358743"); + if ((config->acap_dev_name = _get_value(jcfg, "acap", "device")) != NULL) { + if ((config->tc358743_dev_path = _get_value(jcfg, "acap", "tc358743")) == NULL) { + US_JLOG_INFO("config", "Missing config value: acap.tc358743"); goto error; } + config->aplay_dev_name = _get_value(jcfg, "aplay", "device"); } goto ok; @@ -84,6 +82,7 @@ void us_config_destroy(us_config_s *config) { US_DELETE(config->video_sink_name, free); US_DELETE(config->acap_dev_name, free); US_DELETE(config->tc358743_dev_path, free); + US_DELETE(config->aplay_dev_name, free); free(config); } diff --git a/janus/src/config.h b/janus/src/config.h index b84f506..1cb4046 100644 --- a/janus/src/config.h +++ b/janus/src/config.h @@ -28,6 +28,8 @@ typedef struct { char *acap_dev_name; char *tc358743_dev_path; + + char *aplay_dev_name; } us_config_s; diff --git a/janus/src/plugin.c b/janus/src/plugin.c index 8e0db41..db8b242 100644 --- a/janus/src/plugin.c +++ b/janus/src/plugin.c @@ -33,7 +33,9 @@ #include #include #include +#include #include +#include #include "uslibs/types.h" #include "uslibs/const.h" @@ -48,6 +50,7 @@ #include "const.h" #include "logging.h" #include "client.h" +#include "au.h" #include "acap.h" #include "rtp.h" #include "rtpv.h" @@ -55,7 +58,6 @@ #include "memsinkfd.h" #include "config.h" - static us_config_s *_g_config = NULL; static const useconds_t _g_watchers_polling = 100000; @@ -63,7 +65,7 @@ static us_janus_client_s *_g_clients = NULL; static janus_callbacks *_g_gw = NULL; static us_ring_s *_g_video_ring = NULL; static us_rtpv_s *_g_rtpv = NULL; -static us_rtpa_s *_g_rtpa = NULL; +static us_rtpa_s *_g_rtpa = NULL; // Also indicates "audio capture is available" static pthread_t _g_video_rtp_tid; static atomic_bool _g_video_rtp_tid_created = false; @@ -71,13 +73,17 @@ static pthread_t _g_video_sink_tid; static atomic_bool _g_video_sink_tid_created = false; static pthread_t _g_acap_tid; static atomic_bool _g_acap_tid_created = false; +static pthread_t _g_aplay_tid; +static atomic_bool _g_aplay_tid_created = false; static pthread_mutex_t _g_video_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t _g_acap_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t _g_aplay_lock = PTHREAD_MUTEX_INITIALIZER; static atomic_bool _g_ready = false; static atomic_bool _g_stop = false; static atomic_bool _g_has_watchers = false; static atomic_bool _g_has_listeners = false; +static atomic_bool _g_has_speakers = false; static atomic_bool _g_key_required = false; @@ -87,13 +93,19 @@ static atomic_bool _g_key_required = false; #define _LOCK_ACAP US_MUTEX_LOCK(_g_acap_lock) #define _UNLOCK_ACAP US_MUTEX_UNLOCK(_g_acap_lock) -#define _LOCK_ALL { _LOCK_VIDEO; _LOCK_ACAP; } -#define _UNLOCK_ALL { _UNLOCK_ACAP; _UNLOCK_VIDEO; } +#define _LOCK_APLAY US_MUTEX_LOCK(_g_aplay_lock) +#define _UNLOCK_APLAY US_MUTEX_UNLOCK(_g_aplay_lock) + +#define _LOCK_ALL { _LOCK_VIDEO; _LOCK_ACAP; _LOCK_APLAY; } +#define _UNLOCK_ALL { _UNLOCK_APLAY; _UNLOCK_ACAP; _UNLOCK_VIDEO; } #define _READY atomic_load(&_g_ready) #define _STOP atomic_load(&_g_stop) #define _HAS_WATCHERS atomic_load(&_g_has_watchers) #define _HAS_LISTENERS atomic_load(&_g_has_listeners) +#define _HAS_SPEAKERS atomic_load(&_g_has_speakers) + +#define _IF_DISABLED(...) { if (!_READY || _STOP) { __VA_ARGS__ } } janus_plugin *create(void); @@ -101,7 +113,7 @@ janus_plugin *create(void); static void *_video_rtp_thread(void *arg) { (void)arg; - US_THREAD_SETTLE("us_video_rtp"); + US_THREAD_SETTLE("us_p_rtpv"); atomic_store(&_g_video_rtp_tid_created, true); while (!_STOP) { @@ -120,7 +132,7 @@ static void *_video_rtp_thread(void *arg) { static void *_video_sink_thread(void *arg) { (void)arg; - US_THREAD_SETTLE("us_video_sink"); + US_THREAD_SETTLE("us_p_vsink"); atomic_store(&_g_video_sink_tid_created, true); us_frame_s *drop = us_frame_init(); @@ -216,11 +228,12 @@ static int _check_tc358743_acap(uint *hz) { static void *_acap_thread(void *arg) { (void)arg; - US_THREAD_SETTLE("us_acap"); + US_THREAD_SETTLE("us_p_ac"); atomic_store(&_g_acap_tid_created, true); assert(_g_config->acap_dev_name != NULL); assert(_g_config->tc358743_dev_path != NULL); + assert(_g_rtpa != NULL); int once = 0; @@ -271,6 +284,90 @@ static void *_acap_thread(void *arg) { return NULL; } +static void *_aplay_thread(void *arg) { + (void)arg; + US_THREAD_SETTLE("us_p_ap"); + atomic_store(&_g_aplay_tid_created, true); + + assert(_g_config->aplay_dev_name != NULL); + + int once = 0; + snd_pcm_t *dev = NULL; + bool skip = false; + + while (!_STOP) { + usleep(US_AU_FRAME_MS * 1000 / 4); + + us_au_pcm_s mixed = {0}; + _LOCK_APLAY; + US_LIST_ITERATE(_g_clients, client, { + us_au_pcm_s last = {0}; + do { + const int ri = us_ring_consumer_acquire(client->aplay_pcm_ring, 0); + if (ri >= 0) { + const us_au_pcm_s *pcm = client->aplay_pcm_ring->items[ri]; + memcpy(&last, pcm, sizeof(us_au_pcm_s)); + us_ring_consumer_release(client->aplay_pcm_ring, ri); + } else { + break; + } + } while (skip && !_STOP); + us_au_pcm_mix(&mixed, &last); + // US_JLOG_INFO("++++++", "mixed %p", client); + }); + _UNLOCK_APLAY; + // US_JLOG_INFO("++++++", "--------------"); + + if (!_HAS_WATCHERS || !_HAS_LISTENERS || !_HAS_SPEAKERS) { + skip = true; + continue; + } + + if (dev == NULL) { + int err = snd_pcm_open(&dev, _g_config->aplay_dev_name, SND_PCM_STREAM_PLAYBACK, 0); + if (err < 0) { + US_ONCE({ US_JLOG_ERROR("aplay", "Can't open PCM playback: %s", snd_strerror(err)); }); + goto close_aplay; + } + + err = snd_pcm_set_params( + dev, + SND_PCM_FORMAT_S16_LE, SND_PCM_ACCESS_RW_INTERLEAVED, + US_RTP_OPUS_CH, US_RTP_OPUS_HZ, + 1 /* soft resample */, 50000 /* 50000 = 0.05sec */ + ); + if (err < 0) { + US_ONCE({ US_JLOG_ERROR("aplay", "Can't configure PCM playback: %s", snd_strerror(err)); }); + goto close_aplay; + } + + US_JLOG_INFO("aplay", "Playback opened, playing ..."); + } + + if (dev != NULL && mixed.frames > 0) { + snd_pcm_sframes_t frames = snd_pcm_writei(dev, mixed.data, mixed.frames); + if (frames < 0) { + frames = snd_pcm_recover(dev, frames, 1); + } else { + skip = false; + } + if (frames < 0) { + US_ONCE({ US_JLOG_ERROR("aplay", "Can't play to PCM: %s", snd_strerror(frames)); }); + skip = true; + } else { + skip = false; + } + } + + continue; + close_aplay: + US_DELETE(dev, snd_pcm_close); + } + + US_DELETE(dev, snd_pcm_close); + return NULL; +} + static void _relay_rtp_clients(const us_rtp_s *rtp) { US_LIST_ITERATE(_g_clients, client, { us_janus_client_send(client, rtp); @@ -295,6 +392,9 @@ static int _plugin_init(janus_callbacks *gw, const char *config_dir_path) { if (_g_config->acap_dev_name != NULL && us_acap_probe(_g_config->acap_dev_name)) { _g_rtpa = us_rtpa_init(_relay_rtp_clients); US_THREAD_CREATE(_g_acap_tid, _acap_thread, NULL); + if (_g_config->aplay_dev_name != NULL) { + US_THREAD_CREATE(_g_aplay_tid, _aplay_thread, NULL); + } } US_THREAD_CREATE(_g_video_rtp_tid, _video_rtp_thread, NULL); US_THREAD_CREATE(_g_video_sink_tid, _video_sink_thread, NULL); @@ -311,6 +411,7 @@ static void _plugin_destroy(void) { JOIN(_g_video_sink_tid); JOIN(_g_video_rtp_tid); JOIN(_g_acap_tid); + JOIN(_g_aplay_tid); # undef JOIN US_LIST_ITERATE(_g_clients, client, { @@ -325,8 +426,6 @@ static void _plugin_destroy(void) { US_DELETE(_g_config, us_config_destroy); } -#define _IF_DISABLED(...) { if (!_READY || _STOP) { __VA_ARGS__ } } - static void _plugin_create_session(janus_plugin_session *session, int *err) { _IF_DISABLED({ *err = -1; return; }); _LOCK_ALL; @@ -343,6 +442,7 @@ static void _plugin_destroy_session(janus_plugin_session* session, int *err) { bool found = false; bool has_watchers = false; bool has_listeners = false; + bool has_speakers = false; US_LIST_ITERATE(_g_clients, client, { if (client->session == session) { US_JLOG_INFO("main", "Removing session %p ...", session); @@ -352,6 +452,7 @@ static void _plugin_destroy_session(janus_plugin_session* session, int *err) { } else { has_watchers = (has_watchers || atomic_load(&client->transmit)); has_listeners = (has_listeners || atomic_load(&client->transmit_acap)); + has_speakers = (has_speakers || atomic_load(&client->transmit_aplay)); } }); if (!found) { @@ -360,6 +461,7 @@ static void _plugin_destroy_session(janus_plugin_session* session, int *err) { } atomic_store(&_g_has_watchers, has_watchers); atomic_store(&_g_has_listeners, has_listeners); + atomic_store(&_g_has_speakers, has_speakers); _UNLOCK_ALL; } @@ -398,8 +500,6 @@ static void _set_transmit(janus_plugin_session *session, const char *msg, bool t _UNLOCK_ALL; } -#undef _IF_DISABLED - static void _plugin_setup_media(janus_plugin_session *session) { _set_transmit(session, "Unmuted", true); } static void _plugin_hangup_media(janus_plugin_session *session) { _set_transmit(session, "Muted", false); } @@ -471,9 +571,9 @@ static struct janus_plugin_result *_plugin_handle_message( } } { - json_t *const obj = json_object_get(params, "microphone"); + json_t *const obj = json_object_get(params, "mic"); if (obj != NULL && json_is_boolean(obj)) { - with_aplay = (with_acap && json_boolean_value(obj)); // FIXME: also check playback + with_aplay = (_g_config->aplay_dev_name != NULL && with_acap && json_boolean_value(obj)); } } { @@ -521,19 +621,27 @@ static struct janus_plugin_result *_plugin_handle_message( { _LOCK_ALL; bool has_listeners = false; + bool has_speakers = false; US_LIST_ITERATE(_g_clients, client, { if (client->session == session) { atomic_store(&client->transmit_acap, with_acap); + atomic_store(&client->transmit_aplay, with_aplay); atomic_store(&client->video_orient, video_orient); } has_listeners = (has_listeners || atomic_load(&client->transmit_acap)); + has_speakers = (has_speakers || atomic_load(&client->transmit_aplay)); }); atomic_store(&_g_has_listeners, has_listeners); + atomic_store(&_g_has_speakers, has_speakers); _UNLOCK_ALL; } } else if (!strcmp(request_str, "features")) { - json_t *const features = json_pack("{sbsb}", "audio", (_g_rtpa != NULL), "microphone", false); + json_t *const features = json_pack( + "{sbsb}", + "audio", (_g_rtpa != NULL), + "mic", (_g_rtpa != NULL && _g_config->aplay_dev_name != NULL) + ); PUSH_STATUS("features", features, NULL); json_decref(features); @@ -558,10 +666,27 @@ done: # undef PUSH_ERROR } -static void _plugin_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *packet) { - (void)handle; - (void)packet; - if (packet->video && janus_rtcp_has_pli(packet->buffer, packet->length)) { +static void _plugin_incoming_rtp(janus_plugin_session *session, janus_plugin_rtp *packet) { + _IF_DISABLED({ return; }); + if (session == NULL || packet == NULL || packet->video) { + return; // Accept only valid audio + } + _LOCK_APLAY; + US_LIST_ITERATE(_g_clients, client, { + if (client->session == session) { + us_janus_client_recv(client, packet); + break; + } + }); + _UNLOCK_APLAY; +} + +static void _plugin_incoming_rtcp(janus_plugin_session *session, janus_plugin_rtcp *packet) { + _IF_DISABLED({ return; }); + if (session == NULL || packet == NULL || !packet->video) { + return; // Accept only valid video + } + if (janus_rtcp_has_pli(packet->buffer, packet->length)) { // US_JLOG_INFO("main", "Got video PLI"); atomic_store(&_g_key_required, true); } @@ -602,6 +727,7 @@ janus_plugin *create(void) { .get_author = _plugin_get_author, .get_package = _plugin_get_package, + .incoming_rtp = _plugin_incoming_rtp, .incoming_rtcp = _plugin_incoming_rtcp, ); # pragma GCC diagnostic pop