From d4a9862a189814b240538290181481b32ab87a72 Mon Sep 17 00:00:00 2001 From: Maxim Devaev Date: Sun, 29 May 2022 01:29:01 +0300 Subject: [PATCH] webrtc audio --- janus/Makefile | 2 +- janus/src/audio.c | 249 +++++++++++++++++++++++++++++++++++++++ janus/src/audio.h | 65 +++++++++++ janus/src/jlogging.h | 36 ++++++ janus/src/plugin.c | 272 ++++++++++++++++++++++++++----------------- janus/src/queue.c | 91 +++++++++++++++ janus/src/queue.h | 54 +++++++++ janus/src/rtp.c | 182 +---------------------------- janus/src/rtp.h | 35 ++---- janus/src/rtpa.c | 69 +++++++++++ janus/src/rtpa.h | 51 ++++++++ janus/src/rtpv.c | 211 +++++++++++++++++++++++++++++++++ janus/src/rtpv.h | 59 ++++++++++ src/libs/threading.h | 2 +- src/libs/tools.h | 4 +- 15 files changed, 1072 insertions(+), 310 deletions(-) create mode 100644 janus/src/audio.c create mode 100644 janus/src/audio.h create mode 100644 janus/src/jlogging.h create mode 100644 janus/src/queue.c create mode 100644 janus/src/queue.h create mode 100644 janus/src/rtpa.c create mode 100644 janus/src/rtpa.h create mode 100644 janus/src/rtpv.c create mode 100644 janus/src/rtpv.h diff --git a/janus/Makefile b/janus/Makefile index 0278d30..b8b8ba6 100644 --- a/janus/Makefile +++ b/janus/Makefile @@ -10,7 +10,7 @@ LDFLAGS ?= _PLUGIN = libjanus_ustreamer.so _CFLAGS = -fPIC -MD -c -std=c11 -Wall -Wextra -D_GNU_SOURCE $(shell pkg-config --cflags glib-2.0) $(CFLAGS) -_LDFLAGS = -shared -lm -pthread -lrt -ljansson $(shell pkg-config --libs glib-2.0) $(LDFLAGS) +_LDFLAGS = -shared -lm -pthread -lrt -ljansson -lopus -lasound $(shell pkg-config --libs glib-2.0) $(LDFLAGS) _SRCS = $(shell ls src/*.c) diff --git a/janus/src/audio.c b/janus/src/audio.c new file mode 100644 index 0000000..26e4ceb --- /dev/null +++ b/janus/src/audio.c @@ -0,0 +1,249 @@ +/***************************************************************************** +# # +# uStreamer - Lightweight and fast MJPEG-HTTP streamer. # +# # +# This source file is partially based on this code: # +# - https://github.com/catid/kvm/blob/master/kvm_pipeline/src # +# # +# Copyright (C) 2018-2022 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#include "audio.h" + + +#define JLOG_PERROR_ALSA(_err, _prefix, _msg, ...) JLOG_ERROR(_prefix, _msg ": %s", ##__VA_ARGS__, snd_strerror(_err)) +#define JLOG_PERROR_OPUS(_err, _prefix, _msg, ...) JLOG_ERROR(_prefix, _msg ": %s", ##__VA_ARGS__, opus_strerror(_err)) + + +// https://github.com/xiph/opus/blob/7b05f44/src/opus_demo.c#L368 +#define PCM_BITRATE 48000 +#define PCM_FRAMES (6 * PCM_BITRATE / 50) // 120ms +#define PCM_DATA_SIZE (PCM_FRAMES * 2) +#define RAW_DATA_SIZE (PCM_DATA_SIZE * sizeof(opus_int16)) + + +typedef struct { + opus_int16 data[PCM_DATA_SIZE]; +} _pcm_buf_s; + +typedef struct { + uint8_t data[RAW_DATA_SIZE]; // Worst + size_t used; + uint64_t pts; +} _enc_buf_s; + + +static void *_pcm_thread(void *v_audio); +static void *_encoder_thread(void *v_audio); + + +audio_s *audio_init(const char *name) { + audio_s *audio; + A_CALLOC(audio, 1); + audio->pcm_queue = queue_init(8); + audio->enc_queue = queue_init(8); + atomic_init(&audio->run, true); + + { + int err; + if ((err = snd_pcm_open(&audio->pcm, name, SND_PCM_STREAM_CAPTURE, 0)) < 0) { + audio->pcm = NULL; + JLOG_PERROR_ALSA(err, "audio", "Can't open PCM capture"); + goto error; + } + assert(!snd_pcm_hw_params_malloc(&audio->pcm_params)); + +# define SET_PARAM(_msg, _func, ...) { \ + if ((err = _func(audio->pcm, audio->pcm_params, ##__VA_ARGS__)) < 0) { \ + JLOG_PERROR_ALSA(err, "audio", _msg); \ + goto error; \ + } \ + } + + SET_PARAM("Can't initialize PCM params", snd_pcm_hw_params_any); + SET_PARAM("Can't set PCM access type", snd_pcm_hw_params_set_access, SND_PCM_ACCESS_RW_INTERLEAVED); + SET_PARAM("Can't set PCM channels numbre", snd_pcm_hw_params_set_channels, 2); + SET_PARAM("Can't set PCM sampling format", snd_pcm_hw_params_set_format, SND_PCM_FORMAT_S16_LE); + unsigned pcm_bitrate = PCM_BITRATE; + SET_PARAM("Can't set PCM sampling rate", snd_pcm_hw_params_set_rate_near, &pcm_bitrate, 0); + if (pcm_bitrate != PCM_BITRATE) { + JLOG_ERROR("audio", "PCM bitrate mismatch: %u, should be %u", pcm_bitrate, PCM_BITRATE); + goto error; + } + SET_PARAM("Can't apply PCM params", snd_pcm_hw_params); + +# undef SET_PARAM + } + + { + int err; + // OPUS_APPLICATION_VOIP + // OPUS_APPLICATION_RESTRICTED_LOWDELAY + audio->enc = opus_encoder_create(PCM_BITRATE, 2, OPUS_APPLICATION_AUDIO, &err); + if (err < 0) { + audio->enc = NULL; + JLOG_PERROR_OPUS(err, "audio", "Can't create OPUS encoder"); + goto error; + } + +# define SET_PARAM(_msg, _ctl) { \ + if ((err = opus_encoder_ctl(audio->enc, _ctl)) < 0) { \ + JLOG_PERROR_OPUS(err, "audio", _msg); \ + goto error; \ + } \ + } + + SET_PARAM("Can't set OPUS bitrate", OPUS_SET_BITRATE(48000)); + SET_PARAM("Can't set OPUS max bandwidth", OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_FULLBAND)); + SET_PARAM("Can't set OPUS signal type", OPUS_SET_SIGNAL(OPUS_SIGNAL_MUSIC)); + // Also see rtpa.c + // SET_PARAM("Can't set OPUS FEC", OPUS_SET_INBAND_FEC(1)); + // SET_PARAM("Can't set OPUS exploss", OPUS_SET_PACKET_LOSS_PERC(10)); + +# undef SET_PARAM + } + + JLOG_INFO("audio", "PCM & OPUS prepared; capturing ..."); + audio->tids_created = true; + A_THREAD_CREATE(&audio->enc_tid, _encoder_thread, audio); + A_THREAD_CREATE(&audio->pcm_tid, _pcm_thread, audio); + + return audio; + + error: + audio_destroy(audio); + return NULL; +} + +void audio_destroy(audio_s *audio) { + if (audio->tids_created) { + atomic_store(&audio->run, false); + A_THREAD_JOIN(audio->pcm_tid); + A_THREAD_JOIN(audio->enc_tid); + } + if (audio->enc) { + opus_encoder_destroy(audio->enc); + } + if (audio->pcm) { + snd_pcm_close(audio->pcm); + } + if (audio->pcm_params) { + snd_pcm_hw_params_free(audio->pcm_params); + } +# define FREE_QUEUE(_suffix) { \ + while (!queue_get_free(audio->_suffix##_queue)) { \ + _##_suffix##_buf_s *ptr; \ + assert(!queue_get(audio->_suffix##_queue, (void **)&ptr, 1)); \ + free(ptr); \ + } \ + queue_destroy(audio->_suffix##_queue); \ + } + FREE_QUEUE(enc); + FREE_QUEUE(pcm); +# undef FREE_QUEUE + if (audio->tids_created) { + JLOG_INFO("audio", "Pipeline closed"); + } + free(audio); +} + +int audio_copy_encoded(audio_s *audio, uint8_t *data, size_t *size, uint64_t *pts) { + if (!atomic_load(&audio->run)) { + return -1; + } + _enc_buf_s *in; + if (!queue_get(audio->enc_queue, (void **)&in, 1)) { + if (*size < in->used) { + free(in); + return -3; + } + memcpy(data, in->data, in->used); + *size = in->used; + *pts = in->pts; + free(in); + return 0; + } + return -2; +} + +static void *_pcm_thread(void *v_audio) { + A_THREAD_RENAME("us_a_pcm"); + + audio_s *audio = (audio_s *)v_audio; + + while (atomic_load(&audio->run)) { + uint8_t in[RAW_DATA_SIZE]; + int frames = snd_pcm_readi(audio->pcm, in, PCM_FRAMES); + if (frames < 0) { + JLOG_PERROR_ALSA(frames, "audio", "Can't capture PCM frames; breaking audio ..."); + break; + } else if (frames < PCM_FRAMES) { + JLOG_ERROR("audio", "Too few PCM frames captured; breaking audio ..."); + break; + } + + if (queue_get_free(audio->pcm_queue)) { + _pcm_buf_s *out; + A_CALLOC(out, 1); + /*for (unsigned index = 0; index < RAW_DATA_SIZE; ++index) { + out->data[index] = (opus_int16)in[index * 2 + 1] << 8 | in[index * 2]; + }*/ + memcpy(out->data, in, RAW_DATA_SIZE); + assert(!queue_put(audio->pcm_queue, out, 1)); + } else { + JLOG_ERROR("audio", "PCM queue is full"); + } + } + + atomic_store(&audio->run, false); + return NULL; +} + +static void *_encoder_thread(void *v_audio) { + A_THREAD_RENAME("us_a_enc"); + + audio_s *audio = (audio_s *)v_audio; + + while (atomic_load(&audio->run)) { + _pcm_buf_s *in; + if (!queue_get(audio->pcm_queue, (void **)&in, 1)) { + _enc_buf_s *out; + A_CALLOC(out, 1); + int size = opus_encode(audio->enc, in->data, PCM_FRAMES, out->data, RAW_DATA_SIZE); + free(in); + if (size < 0) { + JLOG_PERROR_OPUS(size, "audio", "Can't encode PCM frame to OPUS; breaking audio ..."); + free(out); + break; + } + out->used = size; + out->pts = audio->pts; + audio->pts += PCM_FRAMES; + + if (queue_get_free(audio->enc_queue)) { + assert(!queue_put(audio->enc_queue, out, 1)); + } else { + JLOG_ERROR("audio", "OPUS encoder queue is full"); + free(out); + } + } + } + + atomic_store(&audio->run, false); + return NULL; +} diff --git a/janus/src/audio.h b/janus/src/audio.h new file mode 100644 index 0000000..075c063 --- /dev/null +++ b/janus/src/audio.h @@ -0,0 +1,65 @@ +/***************************************************************************** +# # +# uStreamer - Lightweight and fast MJPEG-HTTP streamer. # +# # +# This source file is partially based on this code: # +# - https://github.com/catid/kvm/blob/master/kvm_pipeline/src # +# # +# Copyright (C) 2018-2022 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#pragma once + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include + +#include "tools.h" +#include "jlogging.h" +#include "threading.h" +#include "queue.h" + + +typedef struct { + snd_pcm_t *pcm; + snd_pcm_hw_params_t *pcm_params; + OpusEncoder *enc; + + queue_s *pcm_queue; + queue_s *enc_queue; + uint32_t pts; + + pthread_t pcm_tid; + pthread_t enc_tid; + bool tids_created; + atomic_bool run; +} audio_s; + + +audio_s *audio_init(const char *name); +void audio_destroy(audio_s *audio); + +int audio_copy_encoded(audio_s *audio, uint8_t *data, size_t *size, uint64_t *pts); diff --git a/janus/src/jlogging.h b/janus/src/jlogging.h new file mode 100644 index 0000000..0bc80c6 --- /dev/null +++ b/janus/src/jlogging.h @@ -0,0 +1,36 @@ +/***************************************************************************** +# # +# uStreamer - Lightweight and fast MJPEG-HTTP streamer. # +# # +# Copyright (C) 2018-2022 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#include + +#include "tools.h" + + +#define JLOG_INFO(_prefix, _msg, ...) JANUS_LOG(LOG_INFO, "== ustreamer/%-9s -- " _msg "\n", _prefix, ##__VA_ARGS__) +#define JLOG_WARN(_prefix, _msg, ...) JANUS_LOG(LOG_WARN, "== ustreamer/%-9s -- " _msg "\n", _prefix, ##__VA_ARGS__) +#define JLOG_ERROR(_prefix, _msg, ...) JANUS_LOG(LOG_ERR, "== ustreamer/%-9s -- " _msg "\n", _prefix, ##__VA_ARGS__) + +#define JLOG_PERROR(_prefix, _msg, ...) { \ + char _perror_buf[1024] = {0}; \ + char *_perror_ptr = errno_to_string(errno, _perror_buf, 1023); \ + JANUS_LOG(LOG_ERR, "[ustreamer/%-9s] " _msg ": %s\n", _prefix, ##__VA_ARGS__, _perror_ptr); \ + } diff --git a/janus/src/plugin.c b/janus/src/plugin.c index 6f5ec4f..144c250 100644 --- a/janus/src/plugin.c +++ b/janus/src/plugin.c @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -39,18 +40,20 @@ #include "config.h" #include "tools.h" +#include "jlogging.h" #include "threading.h" #include "list.h" #include "memsinksh.h" - -#include "rtp.h" +#include "audio.h" +#include "rtpv.h" +#include "rtpa.h" static int _plugin_init(janus_callbacks *gw, const char *config_file_path); static void _plugin_destroy(void); -static void _plugin_create_session(janus_plugin_session *session, int *error); -static void _plugin_destroy_session(janus_plugin_session *session, int *error); +static void _plugin_create_session(janus_plugin_session *session, int *err); +static void _plugin_destroy_session(janus_plugin_session *session, int *err); static json_t *_plugin_query_session(janus_plugin_session *session); static void _plugin_setup_media(janus_plugin_session *session); @@ -59,14 +62,17 @@ static void _plugin_hangup_media(janus_plugin_session *session); static struct janus_plugin_result *_plugin_handle_message( janus_plugin_session *session, char *transaction, json_t *msg, json_t *jsep); -static int _plugin_get_api_compatibility(void); -static int _plugin_get_version(void); -static const char *_plugin_get_version_string(void); -static const char *_plugin_get_description(void); -static const char *_plugin_get_name(void); -static const char *_plugin_get_author(void); -static const char *_plugin_get_package(void); +static int _plugin_get_api_compatibility(void) { return JANUS_PLUGIN_API_VERSION; } +static int _plugin_get_version(void) { return VERSION_U; } +static const char *_plugin_get_version_string(void) { return VERSION; } +static const char *_plugin_get_description(void) { return "PiKVM uStreamer Janus plugin for H.264 video"; } +static const char *_plugin_get_name(void) { return "ustreamer"; } +static const char *_plugin_get_author(void) { return "Maxim Devaev "; } +static const char *_plugin_get_package(void) { return "janus.plugin.ustreamer"; } + +// Just a stub to avoid logging spam about the plugin's purpose. +static void _plugin_incoming_rtp(UNUSED janus_plugin_session *handle, UNUSED janus_plugin_rtp *packet) {} #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Woverride-init" @@ -90,6 +96,8 @@ static janus_plugin _plugin = JANUS_PLUGIN_INIT( .get_name = _plugin_get_name, .get_author = _plugin_get_author, .get_package = _plugin_get_package, + + .incoming_rtp = _plugin_incoming_rtp, ); #pragma GCC diagnostic pop @@ -112,27 +120,24 @@ const long double _g_lock_timeout = 1; const useconds_t _g_lock_polling = 1000; const useconds_t _g_watchers_polling = 100000; +static char *_g_audio_dev = NULL; + static _client_s *_g_clients = NULL; static janus_callbacks *_g_gw = NULL; -static rtp_s *_g_rtp = NULL; +static rtpv_s *_g_rtpv = NULL; +static rtpa_s *_g_rtpa = NULL; + +static pthread_t _g_video_tid; +static atomic_bool _g_video_tid_created = ATOMIC_VAR_INIT(false); +static pthread_t _g_audio_tid; +static atomic_bool _g_audio_tid_created = ATOMIC_VAR_INIT(false); -static pthread_t _g_tid; static pthread_mutex_t _g_lock = PTHREAD_MUTEX_INITIALIZER; static atomic_bool _g_ready = ATOMIC_VAR_INIT(false); static atomic_bool _g_stop = ATOMIC_VAR_INIT(false); static atomic_bool _g_has_watchers = ATOMIC_VAR_INIT(false); -#define JLOG_INFO(_msg, ...) JANUS_LOG(LOG_INFO, "== %s -- " _msg "\n", _plugin_get_name(), ##__VA_ARGS__) -#define JLOG_WARN(_msg, ...) JANUS_LOG(LOG_WARN, "== %s -- " _msg "\n", _plugin_get_name(), ##__VA_ARGS__) -#define JLOG_ERROR(_msg, ...) JANUS_LOG(LOG_ERR, "== %s -- " _msg "\n", _plugin_get_name(), ##__VA_ARGS__) - -#define JLOG_PERROR(_msg, ...) { \ - char _perror_buf[1024] = {0}; \ - char *_perror_ptr = errno_to_string(errno, _perror_buf, 1023); \ - JANUS_LOG(LOG_ERR, "[%s] " _msg ": %s\n", _plugin_get_name(), ##__VA_ARGS__, _perror_ptr); \ - } - #define LOCK A_MUTEX_LOCK(&_g_lock) #define UNLOCK A_MUTEX_UNLOCK(&_g_lock) #define READY atomic_load(&_g_ready) @@ -140,20 +145,6 @@ static atomic_bool _g_has_watchers = ATOMIC_VAR_INIT(false); #define HAS_WATCHERS atomic_load(&_g_has_watchers) -static void _relay_rtp_clients(const uint8_t *datagram, size_t size) { - janus_plugin_rtp packet = { - .video = true, - .buffer = (char *)datagram, - .length = size, - }; - janus_plugin_rtp_extensions_reset(&packet.extensions); - LIST_ITERATE(_g_clients, client, { - if (client->transmit) { - _g_gw->relay_rtp(client->session, &packet); - } - }); -} - static int _wait_frame(int fd, memsink_shared_s* mem, uint64_t last_id) { long double deadline_ts = get_now_monotonic() + _g_wait_timeout; long double now; @@ -161,14 +152,14 @@ static int _wait_frame(int fd, memsink_shared_s* mem, uint64_t last_id) { int retval = flock_timedwait_monotonic(fd, _g_lock_timeout); now = get_now_monotonic(); if (retval < 0 && errno != EWOULDBLOCK) { - JLOG_PERROR("Can't lock memsink"); + JLOG_PERROR("video", "Can't lock memsink"); return -1; } else if (retval == 0) { if (mem->magic == MEMSINK_MAGIC && mem->version == MEMSINK_VERSION && mem->id != last_id) { return 0; } if (flock(fd, LOCK_UN) < 0) { - JLOG_PERROR("Can't unlock memsink"); + JLOG_PERROR("video", "Can't unlock memsink"); return -1; } } @@ -184,18 +175,32 @@ static int _get_frame(int fd, memsink_shared_s *mem, frame_s *frame, uint64_t *f mem->last_client_ts = get_now_monotonic(); int retval = 0; if (frame->format != V4L2_PIX_FMT_H264) { - JLOG_ERROR("Got non-H264 frame from memsink"); + JLOG_ERROR("video", "Got non-H264 frame from memsink"); retval = -1; } if (flock(fd, LOCK_UN) < 0) { - JLOG_PERROR("Can't unlock memsink"); + JLOG_PERROR("video", "Can't unlock memsink"); retval = -1; } return retval; } -static void *_clients_thread(UNUSED void *arg) { - A_THREAD_RENAME("us_clients"); +static void _relay_rtp_clients(const rtp_s *rtp) { + janus_plugin_rtp packet = {0}; + packet.video = rtp->video; + packet.buffer = (char *)rtp->datagram; + packet.length = rtp->used; + janus_plugin_rtp_extensions_reset(&packet.extensions); + LIST_ITERATE(_g_clients, client, { + if (client->transmit) { + _g_gw->relay_rtp(client->session, &packet); + } + }); +} + +static void *_clients_video_thread(UNUSED void *arg) { + A_THREAD_RENAME("us_v_clients"); + atomic_store(&_g_video_tid_created, true); atomic_store(&_g_ready, true); frame_s *frame = frame_init(); @@ -209,9 +214,7 @@ static void *_clients_thread(UNUSED void *arg) { while (!STOP) { if (!HAS_WATCHERS) { - IF_NOT_REPORTED(1, { - JLOG_INFO("No active watchers, memsink disconnected"); - }); + IF_NOT_REPORTED(1, { JLOG_INFO("video", "No active watchers, memsink disconnected"); }); usleep(_g_watchers_polling); continue; } @@ -220,22 +223,18 @@ static void *_clients_thread(UNUSED void *arg) { memsink_shared_s *mem = NULL; if ((fd = shm_open(_g_memsink_obj, O_RDWR, 0)) <= 0) { - IF_NOT_REPORTED(2, { - JLOG_PERROR("Can't open memsink"); - }); + IF_NOT_REPORTED(2, { JLOG_PERROR("video", "Can't open memsink"); }); goto close_memsink; } if ((mem = memsink_shared_map(fd)) == NULL) { - IF_NOT_REPORTED(3, { - JLOG_PERROR("Can't map memsink"); - }); + IF_NOT_REPORTED(3, { JLOG_PERROR("video", "Can't map memsink"); }); goto close_memsink; } error_reported = 0; - JLOG_INFO("Memsink opened; reading frames ..."); + JLOG_INFO("video", "Memsink opened; reading frames ..."); while (!STOP && HAS_WATCHERS) { int result = _wait_frame(fd, mem, frame_id); if (result == 0) { @@ -243,7 +242,7 @@ static void *_clients_thread(UNUSED void *arg) { goto close_memsink; } LOCK; - rtp_wrap_h264(_g_rtp, frame, _relay_rtp_clients); + rtpv_wrap(_g_rtpv, frame); UNLOCK; } else if (result == -1) { goto close_memsink; @@ -252,7 +251,7 @@ static void *_clients_thread(UNUSED void *arg) { close_memsink: if (mem != NULL) { - JLOG_INFO("Memsink closed"); + JLOG_INFO("video", "Memsink closed"); memsink_shared_unmap(mem); mem = NULL; } @@ -269,27 +268,75 @@ static void *_clients_thread(UNUSED void *arg) { return NULL; } +static void *_clients_audio_thread(UNUSED void *arg) { + A_THREAD_RENAME("us_a_clients"); + atomic_store(&_g_audio_tid_created, true); + assert(_g_audio_dev); + + while (!STOP) { + if (!HAS_WATCHERS) { + usleep(_g_watchers_polling); + continue; + } + + audio_s *audio = NULL; + if ((audio = audio_init(_g_audio_dev)) == NULL) { + goto close_audio; + } + + while (!STOP && HAS_WATCHERS) { + size_t size = RTP_DATAGRAM_SIZE - RTP_HEADER_SIZE; + uint8_t data[size]; + uint64_t pts; + int result = audio_copy_encoded(audio, data, &size, &pts); + if (result == 0) { + LOCK; + rtpa_wrap(_g_rtpa, data, size, pts); + UNLOCK; + } else if (result == -1) { + goto close_audio; + } + } + + close_audio: + if (audio != NULL) { + audio_destroy(audio); + } + sleep(1); // error_delay + } + return NULL; +} + +static char *_get_config_value(janus_config *config, const char *section, const char *option) { + janus_config_category *section_obj = janus_config_get_create(config, NULL, janus_config_type_category, section); + janus_config_item *option_obj = janus_config_get(config, section_obj, janus_config_type_item, option); + if (option_obj == NULL || option_obj->value == NULL || option_obj->value[0] == '\0') { + return NULL; + } + return strdup(option_obj->value); +} + static int _read_config(const char *config_dir_path) { char *config_file_path; janus_config *config = NULL; A_ASPRINTF(config_file_path, "%s/%s.jcfg", config_dir_path, _plugin_get_package()); - JLOG_INFO("Reading config file '%s' ...", config_file_path); + JLOG_INFO("main", "Reading config file '%s' ...", config_file_path); config = janus_config_parse(config_file_path); if (config == NULL) { - JLOG_ERROR("Can't read config"); + JLOG_ERROR("main", "Can't read config"); goto error; } janus_config_print(config); - janus_config_category *config_memsink = janus_config_get_create(config, NULL, janus_config_type_category, "memsink"); - janus_config_item *config_memsink_obj = janus_config_get(config, config_memsink, janus_config_type_item, "object"); - if (config_memsink_obj == NULL || config_memsink_obj->value == NULL || config_memsink_obj->value[0] == '\0') { - JLOG_ERROR("Missing config value: memsink.object"); + if ((_g_memsink_obj = _get_config_value(config, "memsink", "object")) == NULL) { + JLOG_ERROR("main", "Missing config value: memsink.object"); goto error; } - _g_memsink_obj = strdup(config_memsink_obj->value); + if ((_g_audio_dev = _get_config_value(config, "audio", "device")) != NULL) { + JLOG_INFO("main", "Enabled the experimental AUDIO feature"); + } int retval = 0; goto ok; @@ -310,23 +357,32 @@ static int _plugin_init(janus_callbacks *gw, const char *config_dir_path) { // sysctl -w net.core.rmem_max=1000000 // sysctl -w net.core.wmem_max=1000000 - JLOG_INFO("Initializing plugin ..."); + JLOG_INFO("main", "Initializing plugin ..."); + assert(!atomic_load(&_g_video_tid_created)); + assert(!atomic_load(&_g_audio_tid_created)); assert(!READY); assert(!STOP); if (gw == NULL || config_dir_path == NULL || _read_config(config_dir_path) < 0) { return -1; } _g_gw = gw; - _g_rtp = rtp_init(); - A_THREAD_CREATE(&_g_tid, _clients_thread, NULL); + _g_rtpv = rtpv_init(_relay_rtp_clients); + if (_g_audio_dev) { + _g_rtpa = rtpa_init(_relay_rtp_clients); + A_THREAD_CREATE(&_g_audio_tid, _clients_audio_thread, NULL); + } + A_THREAD_CREATE(&_g_video_tid, _clients_video_thread, NULL); return 0; } static void _plugin_destroy(void) { - JLOG_INFO("Destroying plugin ..."); + JLOG_INFO("main", "Destroying plugin ..."); atomic_store(&_g_stop, true); - if (READY) { - A_THREAD_JOIN(_g_tid); + if (atomic_load(&_g_video_tid_created)) { + A_THREAD_JOIN(_g_video_tid); + } + if (atomic_load(&_g_audio_tid_created)) { + A_THREAD_JOIN(_g_audio_tid); } LIST_ITERATE(_g_clients, client, { @@ -335,11 +391,21 @@ static void _plugin_destroy(void) { }); _g_clients = NULL; - rtp_destroy(_g_rtp); - _g_rtp = NULL; + if (_g_rtpa) { + rtpa_destroy(_g_rtpa); + _g_rtpa = NULL; + } + + rtpv_destroy(_g_rtpv); + _g_rtpv = NULL; _g_gw = NULL; + if (_g_audio_dev) { + free(_g_audio_dev); + _g_audio_dev = NULL; + } + if (_g_memsink_obj) { free(_g_memsink_obj); _g_memsink_obj = NULL; @@ -348,10 +414,10 @@ static void _plugin_destroy(void) { #define IF_DISABLED(...) { if (!READY || STOP) { __VA_ARGS__ } } -static void _plugin_create_session(janus_plugin_session *session, int *error) { - IF_DISABLED({ *error = -1; return; }); +static void _plugin_create_session(janus_plugin_session *session, int *err) { + IF_DISABLED({ *err = -1; return; }); LOCK; - JLOG_INFO("Creating session %p ...", session); + JLOG_INFO("main", "Creating session %p ...", session); _client_s *client; A_CALLOC(client, 1); client->session = session; @@ -361,14 +427,14 @@ static void _plugin_create_session(janus_plugin_session *session, int *error) { UNLOCK; } -static void _plugin_destroy_session(janus_plugin_session* session, int *error) { - IF_DISABLED({ *error = -1; return; }); +static void _plugin_destroy_session(janus_plugin_session* session, int *err) { + IF_DISABLED({ *err = -1; return; }); LOCK; bool found = false; bool has_watchers = false; LIST_ITERATE(_g_clients, client, { if (client->session == session) { - JLOG_INFO("Removing session %p ...", session); + JLOG_INFO("main", "Removing session %p ...", session); LIST_REMOVE(_g_clients, client); free(client); found = true; @@ -377,8 +443,8 @@ static void _plugin_destroy_session(janus_plugin_session* session, int *error) { } }); if (!found) { - JLOG_WARN("No session %p", session); - *error = -2; + JLOG_WARN("main", "No session %p", session); + *err = -2; } atomic_store(&_g_has_watchers, has_watchers); UNLOCK; @@ -406,13 +472,13 @@ static void _set_transmit(janus_plugin_session *session, UNUSED const char *msg, LIST_ITERATE(_g_clients, client, { if (client->session == session) { client->transmit = transmit; - //JLOG_INFO("%s session %p", msg, session); + // JLOG_INFO("main", "%s session %p", msg, session); found = true; } has_watchers = (has_watchers || client->transmit); }); if (!found) { - JLOG_WARN("No session %p", session); + JLOG_WARN("main", "No session %p", session); } atomic_store(&_g_has_watchers, has_watchers); UNLOCK; @@ -440,7 +506,7 @@ static struct janus_plugin_result *_plugin_handle_message( } # define PUSH_ERROR(_error, _reason) { \ - /*JLOG_ERROR("Message error in session %p: %s", session, _reason);*/ \ + /*JLOG_ERROR("main", "Message error in session %p: %s", session, _reason);*/ \ json_t *_event = json_object(); \ json_object_set_new(_event, "ustreamer", json_string("event")); \ json_object_set_new(_event, "error_code", json_integer(_error)); \ @@ -460,7 +526,7 @@ static struct janus_plugin_result *_plugin_handle_message( PUSH_ERROR(400, "Request not a string"); goto ok_wait; } - //JLOG_INFO("Message: %s", request_str); + // JLOG_INFO("main", "Message: %s", request_str); # define PUSH_STATUS(_status, _jsep) { \ json_t *_event = json_object(); \ @@ -479,12 +545,25 @@ static struct janus_plugin_result *_plugin_handle_message( PUSH_STATUS("stopped", NULL); } else if (!strcmp(request_str, "watch")) { - char *sdp = rtp_make_sdp(_g_rtp); - if (sdp == NULL) { - PUSH_ERROR(503, "Haven't received SPS/PPS from memsink yet"); - goto ok_wait; + char *sdp; + { + char *video_sdp = rtpv_make_sdp(_g_rtpv); + if (video_sdp == NULL) { + PUSH_ERROR(503, "Haven't received SPS/PPS from memsink yet"); + goto ok_wait; + } + char *audio_sdp = (_g_rtpa ? rtpa_make_sdp(_g_rtpa) : strdup("")); + A_ASPRINTF(sdp, + "v=0" RN + "o=- %" PRIu64 " 1 IN IP4 0.0.0.0" RN + "s=PiKVM uStreamer" RN + "t=0 0" RN + "%s%s", + get_now_id() >> 1, audio_sdp, video_sdp + ); + free(audio_sdp); + free(video_sdp); } - //JLOG_INFO("SDP generated:\n%s", sdp); json_t *offer_jsep = json_pack("{ssss}", "type", "offer", "sdp", sdp); free(sdp); PUSH_STATUS("started", offer_jsep); @@ -501,22 +580,3 @@ static struct janus_plugin_result *_plugin_handle_message( # undef PUSH_ERROR # undef FREE_MSG_JSEP } - -static int _plugin_get_api_compatibility(void) { return JANUS_PLUGIN_API_VERSION; } -static int _plugin_get_version(void) { return VERSION_U; } -static const char *_plugin_get_version_string(void) { return VERSION; } -static const char *_plugin_get_description(void) { return "Pi-KVM uStreamer Janus plugin for H.264 video"; } -static const char *_plugin_get_name(void) { return "ustreamer"; } -static const char *_plugin_get_author(void) { return "Maxim Devaev "; } -static const char *_plugin_get_package(void) { return "janus.plugin.ustreamer"; } - - -#undef STOP -#undef READY -#undef UNLOCK -#undef LOCK - -#undef JLOG_PERROR -#undef JLOG_ERROR -#undef JLOG_WARN -#undef JLOG_INFO diff --git a/janus/src/queue.c b/janus/src/queue.c new file mode 100644 index 0000000..62cef33 --- /dev/null +++ b/janus/src/queue.c @@ -0,0 +1,91 @@ +/***************************************************************************** +# # +# uStreamer - Lightweight and fast MJPEG-HTTP streamer. # +# # +# Copyright (C) 2018-2022 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#include "queue.h" + + +queue_s *queue_init(unsigned capacity) { + queue_s *queue; + A_CALLOC(queue, 1); + A_CALLOC(queue->items, capacity); + queue->capacity = capacity; + A_MUTEX_INIT(&queue->mutex); + + pthread_condattr_t attrs; + assert(!pthread_condattr_init(&attrs)); + assert(!pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC)); + assert(!pthread_cond_init(&queue->full_cond, &attrs)); + assert(!pthread_cond_init(&queue->empty_cond, &attrs)); + assert(!pthread_condattr_destroy(&attrs)); + return queue; +} + +void queue_destroy(queue_s *queue) { + free(queue->items); + free(queue); +} + +#define WAIT_LOOP(_var, _cond) { \ + struct timespec ts; \ + assert(!clock_gettime(CLOCK_MONOTONIC, &ts)); \ + ts.tv_sec += timeout; \ + while (_var) { \ + int err = pthread_cond_timedwait(_cond, &queue->mutex, &ts); \ + if (err == ETIMEDOUT) { \ + return -1; \ + } \ + assert(!err); \ + } \ + } + +int queue_put(queue_s *queue, void *item, unsigned timeout) { + A_MUTEX_LOCK(&queue->mutex); + WAIT_LOOP(queue->size == queue->capacity, &queue->full_cond); + queue->items[queue->in] = item; + ++queue->size; + ++queue->in; + queue->in %= queue->capacity; + A_MUTEX_UNLOCK(&queue->mutex); + assert(!pthread_cond_broadcast(&queue->empty_cond)); + return 0; +} + +int queue_get(queue_s *queue, void **item, unsigned timeout) { + A_MUTEX_LOCK(&queue->mutex); + WAIT_LOOP(queue->size == 0, &queue->empty_cond); + *item = queue->items[queue->out]; + --queue->size; + ++queue->out; + queue->out %= queue->capacity; + A_MUTEX_UNLOCK(&queue->mutex); + assert(!pthread_cond_broadcast(&queue->full_cond)); + return 0; +} + +#undef WAIT_LOOP + +int queue_get_free(queue_s *queue) { + A_MUTEX_LOCK(&queue->mutex); + unsigned size = queue->size; + A_MUTEX_UNLOCK(&queue->mutex); + return queue->capacity - size; +} diff --git a/janus/src/queue.h b/janus/src/queue.h new file mode 100644 index 0000000..8ac67cc --- /dev/null +++ b/janus/src/queue.h @@ -0,0 +1,54 @@ +/***************************************************************************** +# # +# uStreamer - Lightweight and fast MJPEG-HTTP streamer. # +# # +# Copyright (C) 2018-2022 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#pragma once + +#include +#include + +#include + +#include "tools.h" +#include "threading.h" + + +// Based on https://github.com/seifzadeh/c-pthread-queue/blob/master/queue.h + +typedef struct { + void **items; + unsigned size; + unsigned capacity; + unsigned in; + unsigned out; + + pthread_mutex_t mutex; + pthread_cond_t full_cond; + pthread_cond_t empty_cond; +} queue_s; + + +queue_s *queue_init(unsigned capacity); +void queue_destroy(queue_s *queue); + +int queue_put(queue_s *queue, void *item, unsigned timeout); +int queue_get(queue_s *queue, void **item, unsigned timeout); +int queue_get_free(queue_s *queue); diff --git a/janus/src/rtp.c b/janus/src/rtp.c index f6047fb..915112a 100644 --- a/janus/src/rtp.c +++ b/janus/src/rtp.c @@ -26,182 +26,25 @@ #include "rtp.h" -#define PAYLOAD 96 // Payload type -#define PRE 3 // Annex B prefix length - - -void _rtp_process_nalu(rtp_s *rtp, const uint8_t *data, size_t size, uint32_t pts, bool marked, rtp_callback_f callback); -static void _rtp_write_header(rtp_s *rtp, uint32_t pts, bool marked); - -static ssize_t _find_annexb(const uint8_t *data, size_t size); - - -rtp_s *rtp_init(void) { +rtp_s *rtp_init(unsigned payload, bool video) { rtp_s *rtp; A_CALLOC(rtp, 1); + rtp->payload = payload; + rtp->video = video; rtp->ssrc = triple_u32(get_now_monotonic_u64()); - rtp->sps = frame_init(); - rtp->pps = frame_init(); - A_MUTEX_INIT(&rtp->mutex); return rtp; } void rtp_destroy(rtp_s *rtp) { - A_MUTEX_DESTROY(&rtp->mutex); - frame_destroy(rtp->pps); - frame_destroy(rtp->sps); free(rtp); } -char *rtp_make_sdp(rtp_s *rtp) { - A_MUTEX_LOCK(&rtp->mutex); - - if (rtp->sps->used == 0 || rtp->pps->used == 0) { - A_MUTEX_UNLOCK(&rtp->mutex); - return NULL; - } - - char *sps = NULL; - char *pps = NULL; - base64_encode(rtp->sps->data, rtp->sps->used, &sps, NULL); - base64_encode(rtp->pps->data, rtp->pps->used, &pps, NULL); - - A_MUTEX_UNLOCK(&rtp->mutex); - - // https://tools.ietf.org/html/rfc6184 - // https://github.com/meetecho/janus-gateway/issues/2443 - char *sdp; - A_ASPRINTF(sdp, - "v=0" RN - "o=- %" PRIu64 " 1 IN IP4 127.0.0.1" RN - "s=Pi-KVM uStreamer" RN - "t=0 0" RN - "m=video 1 RTP/SAVPF %d" RN - "c=IN IP4 0.0.0.0" RN - "a=rtpmap:%d H264/90000" RN - "a=fmtp:%d profile-level-id=42E01F" RN - "a=fmtp:%d packetization-mode=1" RN - "a=fmtp:%d sprop-sps=%s" RN - "a=fmtp:%d sprop-pps=%s" RN - "a=rtcp-fb:%d nack" RN - "a=rtcp-fb:%d nack pli" RN - "a=rtcp-fb:%d goog-remb" RN - "a=sendonly" RN, - get_now_id() >> 1, PAYLOAD, PAYLOAD, PAYLOAD, PAYLOAD, - PAYLOAD, sps, - PAYLOAD, pps, - PAYLOAD, PAYLOAD, PAYLOAD - ); - - free(sps); - free(pps); - return sdp; -} - -void rtp_wrap_h264(rtp_s *rtp, const frame_s *frame, rtp_callback_f callback) { - // There is a complicated logic here but everything works as it should: - // - https://github.com/pikvm/ustreamer/issues/115#issuecomment-893071775 - - assert(frame->format == V4L2_PIX_FMT_H264); - - const uint32_t pts = get_now_monotonic_u64() * 9 / 100; // PTS units are in 90 kHz - ssize_t last_offset = -PRE; - - while (true) { // Find and iterate by nalus - const size_t next_start = last_offset + PRE; - ssize_t offset = _find_annexb(frame->data + next_start, frame->used - next_start); - if (offset < 0) { - break; - } - offset += next_start; - - if (last_offset >= 0) { - const uint8_t *data = frame->data + last_offset + PRE; - size_t size = offset - last_offset - PRE; - if (data[size - 1] == 0) { // Check for extra 00 - --size; - } - _rtp_process_nalu(rtp, data, size, pts, false, callback); - } - - last_offset = offset; - } - - if (last_offset >= 0) { - const uint8_t *data = frame->data + last_offset + PRE; - size_t size = frame->used - last_offset - PRE; - _rtp_process_nalu(rtp, data, size, pts, true, callback); - } -} - -void _rtp_process_nalu(rtp_s *rtp, const uint8_t *data, size_t size, uint32_t pts, bool marked, rtp_callback_f callback) { - const unsigned ref_idc = (data[0] >> 5) & 3; - const unsigned type = data[0] & 0x1F; - - frame_s *ps = NULL; - switch (type) { - case 7: ps = rtp->sps; break; - case 8: ps = rtp->pps; break; - } - if (ps) { - A_MUTEX_LOCK(&rtp->mutex); - frame_set_data(ps, data, size); - A_MUTEX_UNLOCK(&rtp->mutex); - } - -# define HEADER_SIZE 12 - - if (size + HEADER_SIZE <= RTP_DATAGRAM_SIZE) { - _rtp_write_header(rtp, pts, marked); - memcpy(rtp->datagram + HEADER_SIZE, data, size); - callback(rtp->datagram, size + HEADER_SIZE); - return; - } - - const size_t fu_overhead = HEADER_SIZE + 2; // FU-A overhead - - const uint8_t *src = data + 1; - ssize_t remaining = size - 1; - - bool first = true; - while (remaining > 0) { - ssize_t frag_size = RTP_DATAGRAM_SIZE - fu_overhead; - const bool last = (remaining <= frag_size); - if (last) { - frag_size = remaining; - } - - _rtp_write_header(rtp, pts, (marked && last)); - - rtp->datagram[HEADER_SIZE] = 28 | (ref_idc << 5); - - uint8_t fu = type; - if (first) { - fu |= 0x80; - } - if (last) { - fu |= 0x40; - } - rtp->datagram[HEADER_SIZE + 1] = fu; - - memcpy(rtp->datagram + fu_overhead, src, frag_size); - - callback(rtp->datagram, fu_overhead + frag_size); - - src += frag_size; - remaining -= frag_size; - first = false; - } - -# undef HEADER_SIZE -} - -static void _rtp_write_header(rtp_s *rtp, uint32_t pts, bool marked) { +void rtp_write_header(rtp_s *rtp, uint32_t pts, bool marked) { uint32_t word0 = 0x80000000; if (marked) { word0 |= 1 << 23; } - word0 |= (PAYLOAD & 0x7F) << 16; + word0 |= (rtp->payload & 0x7F) << 16; word0 |= rtp->seq; ++rtp->seq; @@ -211,18 +54,3 @@ static void _rtp_write_header(rtp_s *rtp, uint32_t pts, bool marked) { WRITE_BE_U32(8, rtp->ssrc); # undef WRITE_BE_U32 } - -static ssize_t _find_annexb(const uint8_t *data, size_t size) { - // Parses buffer for 00 00 01 start codes - if (size >= PRE) { - for (size_t index = 0; index <= size - PRE; ++index) { - if (data[index] == 0 && data[index + 1] == 0 && data[index + 2] == 1) { - return index; - } - } - } - return -1; -} - -#undef PRE -#undef PAYLOAD diff --git a/janus/src/rtp.h b/janus/src/rtp.h index b1ba606..36dc09a 100644 --- a/janus/src/rtp.h +++ b/janus/src/rtp.h @@ -26,44 +26,33 @@ #pragma once #include -#include #include -#include -#include -#include +#include #include -#include - -#include #include "tools.h" -#include "threading.h" -#include "frame.h" -#include "base64.h" // https://stackoverflow.com/questions/47635545/why-webrtc-chose-rtp-max-packet-size-to-1200-bytes -#define RTP_DATAGRAM_SIZE 1200 +#define RTP_DATAGRAM_SIZE 1200 +#define RTP_HEADER_SIZE 12 typedef struct { - uint32_t ssrc; - uint16_t seq; + unsigned payload; + bool video; + uint32_t ssrc; - uint8_t datagram[RTP_DATAGRAM_SIZE]; - - frame_s *sps; // Actually not a frame, just a bytes storage - frame_s *pps; - - pthread_mutex_t mutex; + uint16_t seq; + uint8_t datagram[RTP_DATAGRAM_SIZE]; + size_t used; } rtp_s; -typedef void (*rtp_callback_f)(const uint8_t *datagram, size_t size); +typedef void (*rtp_callback_f)(const rtp_s *rtp); -rtp_s *rtp_init(void); +rtp_s *rtp_init(unsigned payload, bool video); void rtp_destroy(rtp_s *rtp); -char *rtp_make_sdp(rtp_s *rtp); -void rtp_wrap_h264(rtp_s *rtp, const frame_s *frame, rtp_callback_f callback); +void rtp_write_header(rtp_s *rtp, uint32_t pts, bool marked); diff --git a/janus/src/rtpa.c b/janus/src/rtpa.c new file mode 100644 index 0000000..8882b78 --- /dev/null +++ b/janus/src/rtpa.c @@ -0,0 +1,69 @@ +/***************************************************************************** +# # +# uStreamer - Lightweight and fast MJPEG-HTTP streamer. # +# # +# This source file is partially based on this code: # +# - https://github.com/catid/kvm/blob/master/kvm_pipeline/src # +# # +# Copyright (C) 2018-2022 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#include "rtpa.h" + + +rtpa_s *rtpa_init(rtp_callback_f callback) { + rtpa_s *rtpa; + A_CALLOC(rtpa, 1); + rtpa->rtp = rtp_init(111, false); + rtpa->callback = callback; + return rtpa; +} + +void rtpa_destroy(rtpa_s *rtpa) { + rtp_destroy(rtpa->rtp); + free(rtpa); +} + +char *rtpa_make_sdp(rtpa_s *rtpa) { +# define PAYLOAD rtpa->rtp->payload + char *sdp; + A_ASPRINTF(sdp, + "m=audio 1 RTP/SAVPF %u" RN + "c=IN IP4 0.0.0.0" RN + "a=rtpmap:%u OPUS/48000/2" RN + // "a=fmtp:%u useinbandfec=1" RN + "a=rtcp-fb:%u nack" RN + "a=rtcp-fb:%u nack pli" RN + "a=rtcp-fb:%u goog-remb" RN + "a=ssrc:%" PRIu32 " cname:ustreamer" RN + "a=sendonly" RN, + PAYLOAD, PAYLOAD, PAYLOAD, PAYLOAD, PAYLOAD, // PAYLOAD, + rtpa->rtp->ssrc + ); +# undef PAYLOAD + return sdp; +} + +void rtpa_wrap(rtpa_s *rtpa, const uint8_t *data, size_t size, uint32_t pts) { + if (size + RTP_HEADER_SIZE <= RTP_DATAGRAM_SIZE) { + rtp_write_header(rtpa->rtp, pts, false); + memcpy(rtpa->rtp->datagram + RTP_HEADER_SIZE, data, size); + rtpa->rtp->used = size + RTP_HEADER_SIZE; + rtpa->callback(rtpa->rtp); + } +} diff --git a/janus/src/rtpa.h b/janus/src/rtpa.h new file mode 100644 index 0000000..90399b8 --- /dev/null +++ b/janus/src/rtpa.h @@ -0,0 +1,51 @@ +/***************************************************************************** +# # +# uStreamer - Lightweight and fast MJPEG-HTTP streamer. # +# # +# This source file is partially based on this code: # +# - https://github.com/catid/kvm/blob/master/kvm_pipeline/src # +# # +# Copyright (C) 2018-2022 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#pragma once + +#include +#include +#include +#include + +#include + +#include "tools.h" +#include "threading.h" +#include "audio.h" +#include "rtp.h" + + +typedef struct { + rtp_s *rtp; + rtp_callback_f callback; +} rtpa_s; + + +rtpa_s *rtpa_init(rtp_callback_f callback); +void rtpa_destroy(rtpa_s *rtpa); + +char *rtpa_make_sdp(rtpa_s *rtpa); +void rtpa_wrap(rtpa_s *rtpa, const uint8_t *data, size_t size, uint32_t pts); diff --git a/janus/src/rtpv.c b/janus/src/rtpv.c new file mode 100644 index 0000000..4b550fd --- /dev/null +++ b/janus/src/rtpv.c @@ -0,0 +1,211 @@ +/***************************************************************************** +# # +# uStreamer - Lightweight and fast MJPEG-HTTP streamer. # +# # +# This source file is partially based on this code: # +# - https://github.com/catid/kvm/blob/master/kvm_pipeline/src # +# # +# Copyright (C) 2018-2022 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#include "rtpv.h" + + +void _rtpv_process_nalu(rtpv_s *rtpv, const uint8_t *data, size_t size, uint32_t pts, bool marked); + +static ssize_t _find_annexb(const uint8_t *data, size_t size); + + +rtpv_s *rtpv_init(rtp_callback_f callback) { + rtpv_s *rtpv; + A_CALLOC(rtpv, 1); + rtpv->rtp = rtp_init(96, true); + rtpv->callback = callback; + rtpv->sps = frame_init(); + rtpv->pps = frame_init(); + A_MUTEX_INIT(&rtpv->mutex); + return rtpv; +} + +void rtpv_destroy(rtpv_s *rtpv) { + A_MUTEX_DESTROY(&rtpv->mutex); + frame_destroy(rtpv->pps); + frame_destroy(rtpv->sps); + rtp_destroy(rtpv->rtp); + free(rtpv); +} + +char *rtpv_make_sdp(rtpv_s *rtpv) { + A_MUTEX_LOCK(&rtpv->mutex); + + if (rtpv->sps->used == 0 || rtpv->pps->used == 0) { + A_MUTEX_UNLOCK(&rtpv->mutex); + return NULL; + } + + char *sps = NULL; + char *pps = NULL; + base64_encode(rtpv->sps->data, rtpv->sps->used, &sps, NULL); + base64_encode(rtpv->pps->data, rtpv->pps->used, &pps, NULL); + + A_MUTEX_UNLOCK(&rtpv->mutex); + +# define PAYLOAD rtpv->rtp->payload + // https://tools.ietf.org/html/rfc6184 + // https://github.com/meetecho/janus-gateway/issues/2443 + char *sdp; + A_ASPRINTF(sdp, + "m=video 1 RTP/SAVPF %u" RN + "c=IN IP4 0.0.0.0" RN + "a=rtpmap:%u H264/90000" RN + "a=fmtp:%u profile-level-id=42E01F" RN + "a=fmtp:%u packetization-mode=1" RN + "a=fmtp:%u sprop-sps=%s" RN + "a=fmtp:%u sprop-pps=%s" RN + "a=rtcp-fb:%u nack" RN + "a=rtcp-fb:%u nack pli" RN + "a=rtcp-fb:%u goog-remb" RN + "a=ssrc:%" PRIu32 " cname:ustreamer" RN + "a=sendonly" RN, + PAYLOAD, PAYLOAD, PAYLOAD, PAYLOAD, + PAYLOAD, sps, + PAYLOAD, pps, + PAYLOAD, PAYLOAD, PAYLOAD, + rtpv->rtp->ssrc + ); +# undef PAYLOAD + + free(sps); + free(pps); + return sdp; +} + +#define PRE 3 // Annex B prefix length + +void rtpv_wrap(rtpv_s *rtpv, const frame_s *frame) { + // There is a complicated logic here but everything works as it should: + // - https://github.com/pikvm/ustreamer/issues/115#issuecomment-893071775 + + assert(frame->format == V4L2_PIX_FMT_H264); + + const uint32_t pts = get_now_monotonic_u64() * 9 / 100; // PTS units are in 90 kHz + ssize_t last_offset = -PRE; + + while (true) { // Find and iterate by nalus + const size_t next_start = last_offset + PRE; + ssize_t offset = _find_annexb(frame->data + next_start, frame->used - next_start); + if (offset < 0) { + break; + } + offset += next_start; + + if (last_offset >= 0) { + const uint8_t *data = frame->data + last_offset + PRE; + size_t size = offset - last_offset - PRE; + if (data[size - 1] == 0) { // Check for extra 00 + --size; + } + _rtpv_process_nalu(rtpv, data, size, pts, false); + } + + last_offset = offset; + } + + if (last_offset >= 0) { + const uint8_t *data = frame->data + last_offset + PRE; + size_t size = frame->used - last_offset - PRE; + _rtpv_process_nalu(rtpv, data, size, pts, true); + } +} + +void _rtpv_process_nalu(rtpv_s *rtpv, const uint8_t *data, size_t size, uint32_t pts, bool marked) { + const unsigned ref_idc = (data[0] >> 5) & 3; + const unsigned type = data[0] & 0x1F; + + frame_s *ps = NULL; + switch (type) { + case 7: ps = rtpv->sps; break; + case 8: ps = rtpv->pps; break; + } + if (ps) { + A_MUTEX_LOCK(&rtpv->mutex); + frame_set_data(ps, data, size); + A_MUTEX_UNLOCK(&rtpv->mutex); + } + +# define DG rtpv->rtp->datagram + + if (size + RTP_HEADER_SIZE <= RTP_DATAGRAM_SIZE) { + rtp_write_header(rtpv->rtp, pts, marked); + memcpy(DG + RTP_HEADER_SIZE, data, size); + rtpv->rtp->used = size + RTP_HEADER_SIZE; + rtpv->callback(rtpv->rtp); + return; + } + + const size_t fu_overhead = RTP_HEADER_SIZE + 2; // FU-A overhead + + const uint8_t *src = data + 1; + ssize_t remaining = size - 1; + + bool first = true; + while (remaining > 0) { + ssize_t frag_size = RTP_DATAGRAM_SIZE - fu_overhead; + const bool last = (remaining <= frag_size); + if (last) { + frag_size = remaining; + } + + rtp_write_header(rtpv->rtp, pts, (marked && last)); + + DG[RTP_HEADER_SIZE] = 28 | (ref_idc << 5); + + uint8_t fu = type; + if (first) { + fu |= 0x80; + } + if (last) { + fu |= 0x40; + } + DG[RTP_HEADER_SIZE + 1] = fu; + + memcpy(DG + fu_overhead, src, frag_size); + rtpv->rtp->used = fu_overhead + frag_size; + rtpv->callback(rtpv->rtp); + + src += frag_size; + remaining -= frag_size; + first = false; + } + +# undef DG +} + +static ssize_t _find_annexb(const uint8_t *data, size_t size) { + // Parses buffer for 00 00 01 start codes + if (size >= PRE) { + for (size_t index = 0; index <= size - PRE; ++index) { + if (data[index] == 0 && data[index + 1] == 0 && data[index + 2] == 1) { + return index; + } + } + } + return -1; +} + +#undef PRE diff --git a/janus/src/rtpv.h b/janus/src/rtpv.h new file mode 100644 index 0000000..4ca7e58 --- /dev/null +++ b/janus/src/rtpv.h @@ -0,0 +1,59 @@ +/***************************************************************************** +# # +# uStreamer - Lightweight and fast MJPEG-HTTP streamer. # +# # +# This source file is partially based on this code: # +# - https://github.com/catid/kvm/blob/master/kvm_pipeline/src # +# # +# Copyright (C) 2018-2022 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include "tools.h" +#include "threading.h" +#include "frame.h" +#include "base64.h" +#include "rtp.h" + + +typedef struct { + rtp_s *rtp; + rtp_callback_f callback; + frame_s *sps; // Actually not a frame, just a bytes storage + frame_s *pps; + pthread_mutex_t mutex; +} rtpv_s; + + +rtpv_s *rtpv_init(rtp_callback_f callback); +void rtpv_destroy(rtpv_s *rtpv); + +char *rtpv_make_sdp(rtpv_s *rtpv); +void rtpv_wrap(rtpv_s *rtpv, const frame_s *frame); diff --git a/src/libs/threading.h b/src/libs/threading.h index a283ee2..924136f 100644 --- a/src/libs/threading.h +++ b/src/libs/threading.h @@ -67,7 +67,7 @@ #define A_COND_INIT(_cond) assert(!pthread_cond_init(_cond, NULL)) #define A_COND_DESTROY(_cond) assert(!pthread_cond_destroy(_cond)) #define A_COND_SIGNAL(...) assert(!pthread_cond_signal(__VA_ARGS__)) -#define A_COND_WAIT_TRUE(_var, _cond, _mutex) { while(!_var) assert(!pthread_cond_wait(_cond, _mutex)); } +#define A_COND_WAIT_TRUE(_var, _cond, _mutex) { while(!(_var)) assert(!pthread_cond_wait(_cond, _mutex)); } #ifdef WITH_PTHREAD_NP diff --git a/src/libs/tools.h b/src/libs/tools.h index b4f1be9..38668c2 100644 --- a/src/libs/tools.h +++ b/src/libs/tools.h @@ -126,13 +126,13 @@ INLINE uint64_t get_now_monotonic_u64(void) { return (uint64_t)(ts.tv_nsec / 1000) + (uint64_t)ts.tv_sec * 1000000; } +#undef X_CLOCK_MONOTONIC + INLINE uint64_t get_now_id(void) { uint64_t now = get_now_monotonic_u64(); return (uint64_t)triple_u32(now) | ((uint64_t)triple_u32(now + 12345) << 32); } -#undef X_CLOCK_MONOTONIC - INLINE long double get_now_real(void) { time_t sec; long msec;