diff --git a/docs/h264.md b/docs/h264.md index 4a8be50..bf0b347 100644 --- a/docs/h264.md +++ b/docs/h264.md @@ -189,7 +189,7 @@ The client-side JavaScript application uses the following control flow: success: function (pluginHandle) { uStreamerPluginHandle = pluginHandle; // Instruct the µStreamer Janus plugin to initiate streaming. - uStreamerPluginHandle.send({ message: { request: "watch" } }); + uStreamerPluginHandle.send({ message: { request: "watch", params: {audio: true} } }); }, // Callback function if the server fails to attach the plugin. @@ -197,13 +197,6 @@ The client-side JavaScript application uses the following control flow: // Callback function for processing messages from the Janus server. onmessage: function (msg, jsepOffer) { - // 503 indicates that the plugin is not ready to stream yet. Retry the - // watch request until the video stream is available. - if (msg.error_code === 503) { - uStreamerPluginHandle.send({ message: { request: "watch" } }); - return; - } - // If there is a JSEP offer, respond to it. This starts the WebRTC // connection. if (jsepOffer) { diff --git a/janus/src/client.c b/janus/src/client.c index 4c03237..58c52e7 100644 --- a/janus/src/client.c +++ b/janus/src/client.c @@ -28,53 +28,50 @@ static void *_audio_thread(void *v_client); static void *_common_thread(void *v_client, bool video); -us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_session *session, bool has_audio) { +us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_session *session) { us_janus_client_s *client; US_CALLOC(client, 1); client->gw = gw; client->session = session; - atomic_init(&client->transmit, true); + atomic_init(&client->transmit, false); + atomic_init(&client->transmit_audio, false); atomic_init(&client->stop, false); client->video_queue = us_queue_init(1024); US_THREAD_CREATE(client->video_tid, _video_thread, client); - if (has_audio) { - client->audio_queue = us_queue_init(64); - US_THREAD_CREATE(client->audio_tid, _audio_thread, client); - } + client->audio_queue = us_queue_init(64); + US_THREAD_CREATE(client->audio_tid, _audio_thread, client); + return client; } void us_janus_client_destroy(us_janus_client_s *client) { atomic_store(&client->stop, true); us_queue_put(client->video_queue, NULL, 0); - if (client->audio_queue != NULL) { - us_queue_put(client->audio_queue, NULL, 0); - } + us_queue_put(client->audio_queue, NULL, 0); US_THREAD_JOIN(client->video_tid); US_QUEUE_DELETE_WITH_ITEMS(client->video_queue, us_rtp_destroy); - if (client->audio_queue != NULL) { - US_THREAD_JOIN(client->audio_tid); - US_QUEUE_DELETE_WITH_ITEMS(client->audio_queue, us_rtp_destroy); - } + + US_THREAD_JOIN(client->audio_tid); + US_QUEUE_DELETE_WITH_ITEMS(client->audio_queue, us_rtp_destroy); + free(client); } void us_janus_client_send(us_janus_client_s *client, const us_rtp_s *rtp) { if ( - !atomic_load(&client->transmit) - || (!rtp->video && client->audio_queue == NULL) + atomic_load(&client->transmit) + && (rtp->video || atomic_load(&client->transmit_audio)) ) { - return; - } - us_rtp_s *const new = us_rtp_dup(rtp); - if (us_queue_put((new->video ? client->video_queue : client->audio_queue), new, 0) != 0) { - US_JLOG_ERROR("client", "Session %p %s queue is full", - client->session, (new->video ? "video" : "audio")); - us_rtp_destroy(new); + us_rtp_s *const new = us_rtp_dup(rtp); + if (us_queue_put((new->video ? client->video_queue : client->audio_queue), new, 0) != 0) { + US_JLOG_ERROR("client", "Session %p %s queue is full", + client->session, (new->video ? "video" : "audio")); + us_rtp_destroy(new); + } } } @@ -97,7 +94,11 @@ static void *_common_thread(void *v_client, bool video) { if (rtp == NULL) { break; } - if (atomic_load(&client->transmit)) { + + if ( + atomic_load(&client->transmit) + && (video || atomic_load(&client->transmit_audio)) + ) { janus_plugin_rtp packet = {0}; packet.video = rtp->video; packet.buffer = (char *)rtp->datagram; @@ -107,6 +108,7 @@ static void *_common_thread(void *v_client, bool video) { // (if available) in stream index 1. packet.mindex = (rtp->video ? 0 : 1); # endif + janus_plugin_rtp_extensions_reset(&packet.extensions); /*if (rtp->zero_playout_delay) { // https://github.com/pikvm/pikvm/issues/784 @@ -116,6 +118,7 @@ static void *_common_thread(void *v_client, bool video) { packet.extensions.min_delay = 0; packet.extensions.max_delay = 1000; }*/ + client->gw->relay_rtp(client->session, &packet); } us_rtp_destroy(rtp); diff --git a/janus/src/client.h b/janus/src/client.h index 8e044ee..3ac7919 100644 --- a/janus/src/client.h +++ b/janus/src/client.h @@ -41,8 +41,9 @@ typedef struct us_janus_client_sx { janus_callbacks *gw; - janus_plugin_session *session; - atomic_bool transmit; + janus_plugin_session *session; + atomic_bool transmit; + atomic_bool transmit_audio; pthread_t video_tid; pthread_t audio_tid; @@ -55,7 +56,7 @@ typedef struct us_janus_client_sx { } us_janus_client_s; -us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_session *session, bool has_audio); +us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_session *session); void us_janus_client_destroy(us_janus_client_s *client); void us_janus_client_send(us_janus_client_s *client, const us_rtp_s *rtp); diff --git a/janus/src/plugin.c b/janus/src/plugin.c index b73d9f7..0c18306 100644 --- a/janus/src/plugin.c +++ b/janus/src/plugin.c @@ -77,6 +77,7 @@ static pthread_mutex_t _g_audio_lock = PTHREAD_MUTEX_INITIALIZER; static atomic_bool _g_ready = false; static atomic_bool _g_stop = false; static atomic_bool _g_has_watchers = false; +static atomic_bool _g_has_listeners = false; static atomic_bool _g_key_required = false; @@ -92,6 +93,7 @@ static atomic_bool _g_key_required = false; #define _READY atomic_load(&_g_ready) #define _STOP atomic_load(&_g_stop) #define _HAS_WATCHERS atomic_load(&_g_has_watchers) +#define _HAS_LISTENERS atomic_load(&_g_has_listeners) janus_plugin *create(void); @@ -184,7 +186,7 @@ static void *_audio_thread(UNUSED void *arg) { int once = 0; while (!_STOP) { - if (!_HAS_WATCHERS) { + if (!_HAS_WATCHERS || !_HAS_LISTENERS) { usleep(_g_watchers_polling); continue; } @@ -206,7 +208,7 @@ static void *_audio_thread(UNUSED void *arg) { once = 0; - while (!_STOP && _HAS_WATCHERS) { + while (!_STOP && _HAS_WATCHERS && _HAS_LISTENERS) { if ( us_tc358743_read_info(_g_config->tc358743_dev_path, &info) < 0 || !info.has_audio @@ -295,7 +297,7 @@ static void _plugin_create_session(janus_plugin_session *session, int *err) { _IF_DISABLED({ *err = -1; return; }); _LOCK_ALL; US_JLOG_INFO("main", "Creating session %p ...", session); - us_janus_client_s *const client = us_janus_client_init(_g_gw, session, (_g_config->audio_dev_name != NULL)); + us_janus_client_s *const client = us_janus_client_init(_g_gw, session); US_LIST_APPEND(_g_clients, client); atomic_store(&_g_has_watchers, true); _UNLOCK_ALL; @@ -306,6 +308,7 @@ static void _plugin_destroy_session(janus_plugin_session* session, int *err) { _LOCK_ALL; bool found = false; bool has_watchers = false; + bool has_listeners = false; US_LIST_ITERATE(_g_clients, client, { if (client->session == session) { US_JLOG_INFO("main", "Removing session %p ...", session); @@ -314,6 +317,7 @@ static void _plugin_destroy_session(janus_plugin_session* session, int *err) { found = true; } else { has_watchers = (has_watchers || atomic_load(&client->transmit)); + has_listeners = (has_listeners || atomic_load(&client->transmit_audio)); } }); if (!found) { @@ -321,6 +325,7 @@ static void _plugin_destroy_session(janus_plugin_session* session, int *err) { *err = -2; } atomic_store(&_g_has_watchers, has_watchers); + atomic_store(&_g_has_listeners, has_listeners); _UNLOCK_ALL; } @@ -419,10 +424,21 @@ static struct janus_plugin_result *_plugin_handle_message( PUSH_STATUS("stopped", NULL); } else if (!strcmp(request_str, "watch")) { - char *sdp; + bool with_audio = false; { + json_t *const params_obj = json_object_get(msg, "params"); + if (params_obj != NULL) { + json_t *const audio_obj = json_object_get(params_obj, "audio"); + if (audio_obj != NULL && json_is_boolean(audio_obj)) { + with_audio = (_g_rtpa != NULL && json_boolean_value(audio_obj)); + } + } + } + + { + char *sdp; char *const video_sdp = us_rtpv_make_sdp(_g_rtpv); - char *const audio_sdp = (_g_rtpa ? us_rtpa_make_sdp(_g_rtpa) : us_strdup("")); + char *const audio_sdp = (with_audio ? us_rtpa_make_sdp(_g_rtpa) : us_strdup("")); US_ASPRINTF(sdp, "v=0" RN "o=- %" PRIu64 " 1 IN IP4 0.0.0.0" RN @@ -440,13 +456,25 @@ static struct janus_plugin_result *_plugin_handle_message( audio_sdp, video_sdp # endif ); + json_t *const offer_jsep = json_pack("{ssss}", "type", "offer", "sdp", sdp); + PUSH_STATUS("started", offer_jsep); + json_decref(offer_jsep); free(audio_sdp); free(video_sdp); + free(sdp); + } + + if (with_audio) { + _LOCK_ALL; + US_LIST_ITERATE(_g_clients, client, { + if (client->session == session) { + atomic_store(&client->transmit_audio, true); + break; + } + }); + atomic_store(&_g_has_listeners, true); + _UNLOCK_ALL; } - json_t *const offer_jsep = json_pack("{ssss}", "type", "offer", "sdp", sdp); - free(sdp); - PUSH_STATUS("started", offer_jsep); - json_decref(offer_jsep); } else if (!strcmp(request_str, "key_required")) { // US_JLOG_INFO("main", "Got key_required message");