janus: optional audio for each client

This commit is contained in:
Maxim Devaev 2022-11-27 06:51:00 +03:00
parent 900d7e1112
commit 9b4f3229f2
4 changed files with 68 additions and 43 deletions

View File

@ -189,7 +189,7 @@ The client-side JavaScript application uses the following control flow:
success: function (pluginHandle) {
uStreamerPluginHandle = pluginHandle;
// Instruct the µStreamer Janus plugin to initiate streaming.
uStreamerPluginHandle.send({ message: { request: "watch" } });
uStreamerPluginHandle.send({ message: { request: "watch", params: {audio: true} } });
},
// Callback function if the server fails to attach the plugin.
@ -197,13 +197,6 @@ The client-side JavaScript application uses the following control flow:
// Callback function for processing messages from the Janus server.
onmessage: function (msg, jsepOffer) {
// 503 indicates that the plugin is not ready to stream yet. Retry the
// watch request until the video stream is available.
if (msg.error_code === 503) {
uStreamerPluginHandle.send({ message: { request: "watch" } });
return;
}
// If there is a JSEP offer, respond to it. This starts the WebRTC
// connection.
if (jsepOffer) {

View File

@ -28,53 +28,50 @@ static void *_audio_thread(void *v_client);
static void *_common_thread(void *v_client, bool video);
us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_session *session, bool has_audio) {
us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_session *session) {
us_janus_client_s *client;
US_CALLOC(client, 1);
client->gw = gw;
client->session = session;
atomic_init(&client->transmit, true);
atomic_init(&client->transmit, false);
atomic_init(&client->transmit_audio, false);
atomic_init(&client->stop, false);
client->video_queue = us_queue_init(1024);
US_THREAD_CREATE(client->video_tid, _video_thread, client);
if (has_audio) {
client->audio_queue = us_queue_init(64);
US_THREAD_CREATE(client->audio_tid, _audio_thread, client);
}
client->audio_queue = us_queue_init(64);
US_THREAD_CREATE(client->audio_tid, _audio_thread, client);
return client;
}
void us_janus_client_destroy(us_janus_client_s *client) {
atomic_store(&client->stop, true);
us_queue_put(client->video_queue, NULL, 0);
if (client->audio_queue != NULL) {
us_queue_put(client->audio_queue, NULL, 0);
}
us_queue_put(client->audio_queue, NULL, 0);
US_THREAD_JOIN(client->video_tid);
US_QUEUE_DELETE_WITH_ITEMS(client->video_queue, us_rtp_destroy);
if (client->audio_queue != NULL) {
US_THREAD_JOIN(client->audio_tid);
US_QUEUE_DELETE_WITH_ITEMS(client->audio_queue, us_rtp_destroy);
}
US_THREAD_JOIN(client->audio_tid);
US_QUEUE_DELETE_WITH_ITEMS(client->audio_queue, us_rtp_destroy);
free(client);
}
void us_janus_client_send(us_janus_client_s *client, const us_rtp_s *rtp) {
if (
!atomic_load(&client->transmit)
|| (!rtp->video && client->audio_queue == NULL)
atomic_load(&client->transmit)
&& (rtp->video || atomic_load(&client->transmit_audio))
) {
return;
}
us_rtp_s *const new = us_rtp_dup(rtp);
if (us_queue_put((new->video ? client->video_queue : client->audio_queue), new, 0) != 0) {
US_JLOG_ERROR("client", "Session %p %s queue is full",
client->session, (new->video ? "video" : "audio"));
us_rtp_destroy(new);
us_rtp_s *const new = us_rtp_dup(rtp);
if (us_queue_put((new->video ? client->video_queue : client->audio_queue), new, 0) != 0) {
US_JLOG_ERROR("client", "Session %p %s queue is full",
client->session, (new->video ? "video" : "audio"));
us_rtp_destroy(new);
}
}
}
@ -97,7 +94,11 @@ static void *_common_thread(void *v_client, bool video) {
if (rtp == NULL) {
break;
}
if (atomic_load(&client->transmit)) {
if (
atomic_load(&client->transmit)
&& (video || atomic_load(&client->transmit_audio))
) {
janus_plugin_rtp packet = {0};
packet.video = rtp->video;
packet.buffer = (char *)rtp->datagram;
@ -107,6 +108,7 @@ static void *_common_thread(void *v_client, bool video) {
// (if available) in stream index 1.
packet.mindex = (rtp->video ? 0 : 1);
# endif
janus_plugin_rtp_extensions_reset(&packet.extensions);
/*if (rtp->zero_playout_delay) {
// https://github.com/pikvm/pikvm/issues/784
@ -116,6 +118,7 @@ static void *_common_thread(void *v_client, bool video) {
packet.extensions.min_delay = 0;
packet.extensions.max_delay = 1000;
}*/
client->gw->relay_rtp(client->session, &packet);
}
us_rtp_destroy(rtp);

View File

@ -41,8 +41,9 @@
typedef struct us_janus_client_sx {
janus_callbacks *gw;
janus_plugin_session *session;
atomic_bool transmit;
janus_plugin_session *session;
atomic_bool transmit;
atomic_bool transmit_audio;
pthread_t video_tid;
pthread_t audio_tid;
@ -55,7 +56,7 @@ typedef struct us_janus_client_sx {
} us_janus_client_s;
us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_session *session, bool has_audio);
us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_session *session);
void us_janus_client_destroy(us_janus_client_s *client);
void us_janus_client_send(us_janus_client_s *client, const us_rtp_s *rtp);

View File

@ -77,6 +77,7 @@ static pthread_mutex_t _g_audio_lock = PTHREAD_MUTEX_INITIALIZER;
static atomic_bool _g_ready = false;
static atomic_bool _g_stop = false;
static atomic_bool _g_has_watchers = false;
static atomic_bool _g_has_listeners = false;
static atomic_bool _g_key_required = false;
@ -92,6 +93,7 @@ static atomic_bool _g_key_required = false;
#define _READY atomic_load(&_g_ready)
#define _STOP atomic_load(&_g_stop)
#define _HAS_WATCHERS atomic_load(&_g_has_watchers)
#define _HAS_LISTENERS atomic_load(&_g_has_listeners)
janus_plugin *create(void);
@ -184,7 +186,7 @@ static void *_audio_thread(UNUSED void *arg) {
int once = 0;
while (!_STOP) {
if (!_HAS_WATCHERS) {
if (!_HAS_WATCHERS || !_HAS_LISTENERS) {
usleep(_g_watchers_polling);
continue;
}
@ -206,7 +208,7 @@ static void *_audio_thread(UNUSED void *arg) {
once = 0;
while (!_STOP && _HAS_WATCHERS) {
while (!_STOP && _HAS_WATCHERS && _HAS_LISTENERS) {
if (
us_tc358743_read_info(_g_config->tc358743_dev_path, &info) < 0
|| !info.has_audio
@ -295,7 +297,7 @@ static void _plugin_create_session(janus_plugin_session *session, int *err) {
_IF_DISABLED({ *err = -1; return; });
_LOCK_ALL;
US_JLOG_INFO("main", "Creating session %p ...", session);
us_janus_client_s *const client = us_janus_client_init(_g_gw, session, (_g_config->audio_dev_name != NULL));
us_janus_client_s *const client = us_janus_client_init(_g_gw, session);
US_LIST_APPEND(_g_clients, client);
atomic_store(&_g_has_watchers, true);
_UNLOCK_ALL;
@ -306,6 +308,7 @@ static void _plugin_destroy_session(janus_plugin_session* session, int *err) {
_LOCK_ALL;
bool found = false;
bool has_watchers = false;
bool has_listeners = false;
US_LIST_ITERATE(_g_clients, client, {
if (client->session == session) {
US_JLOG_INFO("main", "Removing session %p ...", session);
@ -314,6 +317,7 @@ static void _plugin_destroy_session(janus_plugin_session* session, int *err) {
found = true;
} else {
has_watchers = (has_watchers || atomic_load(&client->transmit));
has_listeners = (has_listeners || atomic_load(&client->transmit_audio));
}
});
if (!found) {
@ -321,6 +325,7 @@ static void _plugin_destroy_session(janus_plugin_session* session, int *err) {
*err = -2;
}
atomic_store(&_g_has_watchers, has_watchers);
atomic_store(&_g_has_listeners, has_listeners);
_UNLOCK_ALL;
}
@ -419,10 +424,21 @@ static struct janus_plugin_result *_plugin_handle_message(
PUSH_STATUS("stopped", NULL);
} else if (!strcmp(request_str, "watch")) {
char *sdp;
bool with_audio = false;
{
json_t *const params_obj = json_object_get(msg, "params");
if (params_obj != NULL) {
json_t *const audio_obj = json_object_get(params_obj, "audio");
if (audio_obj != NULL && json_is_boolean(audio_obj)) {
with_audio = (_g_rtpa != NULL && json_boolean_value(audio_obj));
}
}
}
{
char *sdp;
char *const video_sdp = us_rtpv_make_sdp(_g_rtpv);
char *const audio_sdp = (_g_rtpa ? us_rtpa_make_sdp(_g_rtpa) : us_strdup(""));
char *const audio_sdp = (with_audio ? us_rtpa_make_sdp(_g_rtpa) : us_strdup(""));
US_ASPRINTF(sdp,
"v=0" RN
"o=- %" PRIu64 " 1 IN IP4 0.0.0.0" RN
@ -440,13 +456,25 @@ static struct janus_plugin_result *_plugin_handle_message(
audio_sdp, video_sdp
# endif
);
json_t *const offer_jsep = json_pack("{ssss}", "type", "offer", "sdp", sdp);
PUSH_STATUS("started", offer_jsep);
json_decref(offer_jsep);
free(audio_sdp);
free(video_sdp);
free(sdp);
}
if (with_audio) {
_LOCK_ALL;
US_LIST_ITERATE(_g_clients, client, {
if (client->session == session) {
atomic_store(&client->transmit_audio, true);
break;
}
});
atomic_store(&_g_has_listeners, true);
_UNLOCK_ALL;
}
json_t *const offer_jsep = json_pack("{ssss}", "type", "offer", "sdp", sdp);
free(sdp);
PUSH_STATUS("started", offer_jsep);
json_decref(offer_jsep);
} else if (!strcmp(request_str, "key_required")) {
// US_JLOG_INFO("main", "Got key_required message");