diff --git a/.gitignore b/.gitignore index 31c3e4c..88349ef 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ /src/build/ /src/*.bin /python/build/ +/janus/build/ /ustreamer /ustreamer-dump /config.mk diff --git a/Makefile b/Makefile index 92d7428..7f218ef 100644 --- a/Makefile +++ b/Makefile @@ -29,6 +29,9 @@ all: ifneq ($(call optbool,$(WITH_PYTHON)),) + $(MAKE) python endif +ifneq ($(call optbool,$(WITH_JANUS)),) + + $(MAKE) janus +endif apps: @@ -42,10 +45,18 @@ python: @ ln -sf python/build/lib.*/*.so . +janus: + $(MAKE) -C janus + @ ln -sf janus/*.so . + + install: all $(MAKE) -C src install ifneq ($(call optbool,$(WITH_PYTHON)),) $(MAKE) -C python install +endif +ifneq ($(call optbool,$(WITH_PYTHON)),) + $(MAKE) -C janus install endif mkdir -p $(DESTDIR)$(MANPREFIX)/man1 for man in $(shell ls man); do \ @@ -110,6 +121,7 @@ clean: rm -f ustreamer ustreamer-dump *.so $(MAKE) -C src clean $(MAKE) -C python clean + $(MAKE) -C janus clean -.PHONY: python linters +.PHONY: python janus linters diff --git a/janus/Makefile b/janus/Makefile new file mode 100644 index 0000000..0278d30 --- /dev/null +++ b/janus/Makefile @@ -0,0 +1,54 @@ +DESTDIR ?= +PREFIX ?= /usr/local + +CC ?= gcc +CFLAGS ?= -O3 +LDFLAGS ?= + + +# ===== +_PLUGIN = libjanus_ustreamer.so + +_CFLAGS = -fPIC -MD -c -std=c11 -Wall -Wextra -D_GNU_SOURCE $(shell pkg-config --cflags glib-2.0) $(CFLAGS) +_LDFLAGS = -shared -lm -pthread -lrt -ljansson $(shell pkg-config --libs glib-2.0) $(LDFLAGS) + +_SRCS = $(shell ls src/*.c) + +_BUILD = build + + +define optbool +$(filter $(shell echo $(1) | tr A-Z a-z), yes on 1) +endef + + +WITH_PTHREAD_NP ?= 1 +ifneq ($(call optbool,$(WITH_PTHREAD_NP)),) +override _CFLAGS += -DWITH_PTHREAD_NP +endif + + +# ===== +$(_PLUGIN): $(_SRCS:%.c=$(_BUILD)/%.o) + $(info == SO $@) + @ $(CC) $^ -o $@ $(_LDFLAGS) + + +$(_BUILD)/%.o: %.c + $(info -- CC $<) + @ mkdir -p $(dir $@) || true + @ $(CC) $< -o $@ $(_CFLAGS) + + + +install: $(_PLUGIN) + mkdir -p $(DESTDIR)$(PREFIX)/lib/ustreamer/janus + install -m755 $(_PLUGIN) $(DESTDIR)$(PREFIX)/lib/ustreamer/janus/$(PLUGIN) + + +clean: + rm -rf $(_PLUGIN) $(_BUILD) + + +_OBJS = $(_SRCS:%.c=$(_BUILD)/%.o) +-include $(_OBJS:%.o=%.d) diff --git a/janus/src/base64.c b/janus/src/base64.c new file mode 120000 index 0000000..a911067 --- /dev/null +++ b/janus/src/base64.c @@ -0,0 +1 @@ +../../src/libs/base64.c \ No newline at end of file diff --git a/janus/src/base64.h b/janus/src/base64.h new file mode 120000 index 0000000..1a74653 --- /dev/null +++ b/janus/src/base64.h @@ -0,0 +1 @@ +../../src/libs/base64.h \ No newline at end of file diff --git a/janus/src/config.h b/janus/src/config.h new file mode 120000 index 0000000..4bf94da --- /dev/null +++ b/janus/src/config.h @@ -0,0 +1 @@ +../../src/libs/config.h \ No newline at end of file diff --git a/janus/src/frame.c b/janus/src/frame.c new file mode 120000 index 0000000..3bc454d --- /dev/null +++ b/janus/src/frame.c @@ -0,0 +1 @@ +../../src/libs/frame.c \ No newline at end of file diff --git a/janus/src/frame.h b/janus/src/frame.h new file mode 120000 index 0000000..73ed450 --- /dev/null +++ b/janus/src/frame.h @@ -0,0 +1 @@ +../../src/libs/frame.h \ No newline at end of file diff --git a/janus/src/list.h b/janus/src/list.h new file mode 120000 index 0000000..faaa81a --- /dev/null +++ b/janus/src/list.h @@ -0,0 +1 @@ +../../src/libs/list.h \ No newline at end of file diff --git a/janus/src/memsinksh.h b/janus/src/memsinksh.h new file mode 120000 index 0000000..52d4728 --- /dev/null +++ b/janus/src/memsinksh.h @@ -0,0 +1 @@ +../../src/libs/memsinksh.h \ No newline at end of file diff --git a/janus/src/plugin.c b/janus/src/plugin.c new file mode 100644 index 0000000..abf5422 --- /dev/null +++ b/janus/src/plugin.c @@ -0,0 +1,522 @@ +/***************************************************************************** +# # +# uStreamer - Lightweight and fast MJPG-HTTP streamer. # +# # +# Copyright (C) 2018-2021 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include "config.h" +#include "tools.h" +#include "threading.h" +#include "list.h" +#include "memsinksh.h" + +#include "rtp.h" + + +static int _plugin_init(janus_callbacks *gw, const char *config_file_path); +static void _plugin_destroy(void); + +static void _plugin_create_session(janus_plugin_session *session, int *error); +static void _plugin_destroy_session(janus_plugin_session *session, int *error); +static json_t *_plugin_query_session(janus_plugin_session *session); + +static void _plugin_setup_media(janus_plugin_session *session); +static void _plugin_hangup_media(janus_plugin_session *session); + +static struct janus_plugin_result *_plugin_handle_message( + janus_plugin_session *session, char *transaction, json_t *msg, json_t *jsep); + +static int _plugin_get_api_compatibility(void); +static int _plugin_get_version(void); +static const char *_plugin_get_version_string(void); +static const char *_plugin_get_description(void); +static const char *_plugin_get_name(void); +static const char *_plugin_get_author(void); +static const char *_plugin_get_package(void); + + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Woverride-init" +static janus_plugin _plugin = JANUS_PLUGIN_INIT( + .init = _plugin_init, + .destroy = _plugin_destroy, + + .create_session = _plugin_create_session, + .destroy_session = _plugin_destroy_session, + .query_session = _plugin_query_session, + + .setup_media = _plugin_setup_media, + .hangup_media = _plugin_hangup_media, + + .handle_message = _plugin_handle_message, + + .get_api_compatibility = _plugin_get_api_compatibility, + .get_version = _plugin_get_version, + .get_version_string = _plugin_get_version_string, + .get_description = _plugin_get_description, + .get_name = _plugin_get_name, + .get_author = _plugin_get_author, + .get_package = _plugin_get_package, +); +#pragma GCC diagnostic pop + +janus_plugin *create(void) { // cppcheck-suppress unusedFunction + return &_plugin; +} + + +typedef struct _client_sx { + janus_plugin_session *session; + bool transmit; + + LIST_STRUCT(struct _client_sx); +} _client_s; + + +static char *_g_memsink_obj = NULL; +const long double _g_wait_timeout = 1; +const long double _g_lock_timeout = 1; +const useconds_t _g_lock_polling = 1000; +const useconds_t _g_watchers_polling = 100000; + +static _client_s *_g_clients = NULL; +static janus_callbacks *_g_gw = NULL; +static rtp_s *_g_rtp = NULL; + +static pthread_t _g_tid; +static pthread_mutex_t _g_lock = PTHREAD_MUTEX_INITIALIZER; +static atomic_bool _g_ready = ATOMIC_VAR_INIT(false); +static atomic_bool _g_stop = ATOMIC_VAR_INIT(false); +static atomic_bool _g_has_watchers = ATOMIC_VAR_INIT(false); + + +#define JLOG_INFO(_msg, ...) JANUS_LOG(LOG_INFO, "== %s -- " _msg "\n", _plugin_get_name(), ##__VA_ARGS__) +#define JLOG_WARN(_msg, ...) JANUS_LOG(LOG_WARN, "== %s -- " _msg "\n", _plugin_get_name(), ##__VA_ARGS__) +#define JLOG_ERROR(_msg, ...) JANUS_LOG(LOG_ERR, "== %s -- " _msg "\n", _plugin_get_name(), ##__VA_ARGS__) + +#define JLOG_PERROR(_msg, ...) { \ + char _perror_buf[1024] = {0}; \ + char *_perror_ptr = errno_to_string(errno, _perror_buf, 1023); \ + JANUS_LOG(LOG_ERR, "[%s] " _msg ": %s\n", _plugin_get_name(), ##__VA_ARGS__, _perror_ptr); \ + } + +#define LOCK A_MUTEX_LOCK(&_g_lock) +#define UNLOCK A_MUTEX_UNLOCK(&_g_lock) +#define READY atomic_load(&_g_ready) +#define STOP atomic_load(&_g_stop) +#define HAS_WATCHERS atomic_load(&_g_has_watchers) + + +static void _relay_rtp_clients(const uint8_t *datagram, size_t size) { + janus_plugin_rtp packet = { + .video = true, + .buffer = (char *)datagram, + .length = size, + }; + janus_plugin_rtp_extensions_reset(&packet.extensions); + LIST_ITERATE(_g_clients, client, { + if (client->transmit) { + _g_gw->relay_rtp(client->session, &packet); + } + }); +} + +static int _wait_frame(int fd, memsink_shared_s* mem, uint64_t last_id) { + long double deadline_ts = get_now_monotonic() + _g_wait_timeout; + long double now; + do { + int retval = flock_timedwait_monotonic(fd, _g_lock_timeout); + now = get_now_monotonic(); + if (retval < 0 && errno != EWOULDBLOCK) { + JLOG_PERROR("Can't lock memsink"); + return -1; + } else if (retval == 0) { + if (mem->magic == MEMSINK_MAGIC && mem->version == MEMSINK_VERSION && mem->id != last_id) { + return 0; + } + if (flock(fd, LOCK_UN) < 0) { + JLOG_PERROR("Can't unlock memsink"); + return -1; + } + } + usleep(_g_lock_polling); + } while (now < deadline_ts); + return -2; +} + +static int _get_frame(int fd, memsink_shared_s *mem, frame_s *frame, uint64_t *frame_id) { + frame_set_data(frame, mem->data, mem->used); + FRAME_COPY_META(mem, frame); + *frame_id = mem->id; + mem->last_client_ts = get_now_monotonic(); + int retval = 0; + if (frame->format != V4L2_PIX_FMT_H264) { + JLOG_ERROR("Got non-H264 frame from memsink"); + retval = -1; + } + if (flock(fd, LOCK_UN) < 0) { + JLOG_PERROR("Can't unlock memsink"); + retval = -1; + } + return retval; +} + +static void *_clients_thread(UNUSED void *arg) { + A_THREAD_RENAME("us_clients"); + atomic_store(&_g_ready, true); + + frame_s *frame = frame_init(); + uint64_t frame_id = 0; + + unsigned error_reported = 0; + +# define IF_NOT_REPORTED(_error, ...) { \ + if (error_reported != _error) { __VA_ARGS__; error_reported = _error; } \ + } + + while (!STOP) { + if (!HAS_WATCHERS) { + IF_NOT_REPORTED(1, { + JLOG_INFO("No active watchers, memsink disconnected"); + }); + usleep(_g_watchers_polling); + continue; + } + + int fd = -1; + memsink_shared_s *mem = NULL; + + if ((fd = shm_open(_g_memsink_obj, O_RDWR, 0)) <= 0) { + IF_NOT_REPORTED(2, { + JLOG_PERROR("Can't open memsink"); + }); + goto close_memsink; + } + + if ((mem = memsink_shared_map(fd)) == NULL) { + IF_NOT_REPORTED(3, { + JLOG_PERROR("Can't map memsink"); + }); + goto close_memsink; + } + + error_reported = 0; + + JLOG_INFO("Memsink opened; reading frames ..."); + while (!STOP && HAS_WATCHERS) { + int result = _wait_frame(fd, mem, frame_id); + if (result == 0) { + if (_get_frame(fd, mem, frame, &frame_id) != 0) { + goto close_memsink; + } + LOCK; + rtp_wrap_h264(_g_rtp, frame, _relay_rtp_clients); + UNLOCK; + } else if (result == -1) { + goto close_memsink; + } + } + + close_memsink: + if (mem != NULL) { + JLOG_INFO("Memsink closed"); + memsink_shared_unmap(mem); + mem = NULL; + } + if (fd > 0) { + close(fd); + fd = -1; + } + sleep(1); // error_delay + } + +# undef IF_NOT_REPORTED + + frame_destroy(frame); + return NULL; +} + +static int _read_config(const char *config_dir_path) { + char *config_file_path; + janus_config *config = NULL; + + A_ASPRINTF(config_file_path, "%s/%s.jcfg", config_dir_path, _plugin_get_package()); + JLOG_INFO("Reading config file '%s' ...", config_file_path); + + config = janus_config_parse(config_file_path); + if (config == NULL) { + JLOG_ERROR("Can't read config"); + goto error; + } + janus_config_print(config); + + janus_config_category *config_memsink = janus_config_get_create(config, NULL, janus_config_type_category, "memsink"); + janus_config_item *config_memsink_obj = janus_config_get(config, config_memsink, janus_config_type_item, "object"); + if (config_memsink_obj == NULL || config_memsink_obj->value == NULL || config_memsink_obj->value[0] == '\0') { + JLOG_ERROR("Missing config value: memsink.object"); + goto error; + } + _g_memsink_obj = strdup(config_memsink_obj->value); + + int retval = 0; + goto ok; + error: + retval = -1; + ok: + if (config) { + janus_config_destroy(config); + } + free(config_file_path); + return retval; +} + +static int _plugin_init(janus_callbacks *gw, const char *config_dir_path) { + // https://groups.google.com/g/meetecho-janus/c/xoWIQfaoJm8 + // sysctl -w net.core.rmem_default=500000 + // sysctl -w net.core.wmem_default=500000 + // sysctl -w net.core.rmem_max=1000000 + // sysctl -w net.core.wmem_max=1000000 + + JLOG_INFO("Initializing plugin ..."); + assert(!READY); + assert(!STOP); + if (gw == NULL || config_dir_path == NULL || _read_config(config_dir_path) < 0) { + return -1; + } + _g_gw = gw; + _g_rtp = rtp_init(); + A_THREAD_CREATE(&_g_tid, _clients_thread, NULL); + return 0; +} + +static void _plugin_destroy(void) { + JLOG_INFO("Destroying plugin ..."); + atomic_store(&_g_stop, true); + if (READY) { + A_THREAD_JOIN(_g_tid); + } + + LIST_ITERATE(_g_clients, client, { + LIST_REMOVE(_g_clients, client); + free(client); + }); + _g_clients = NULL; + + rtp_destroy(_g_rtp); + _g_rtp = NULL; + + _g_gw = NULL; + + if (_g_memsink_obj) { + free(_g_memsink_obj); + _g_memsink_obj = NULL; + } +} + +#define IF_DISABLED(...) { if (!READY || STOP) { __VA_ARGS__ } } + +static void _plugin_create_session(janus_plugin_session *session, int *error) { + IF_DISABLED({ *error = -1; return; }); + LOCK; + JLOG_INFO("Creating session %p ...", session); + _client_s *client; + A_CALLOC(client, 1); + client->session = session; + client->transmit = true; + LIST_APPEND(_g_clients, client); + atomic_store(&_g_has_watchers, true); + UNLOCK; +} + +static void _plugin_destroy_session(janus_plugin_session* session, int *error) { + IF_DISABLED({ *error = -1; return; }); + LOCK; + bool found = false; + bool has_watchers = false; + LIST_ITERATE(_g_clients, client, { + if (client->session == session) { + JLOG_INFO("Removing session %p ...", session); + LIST_REMOVE(_g_clients, client); + free(client); + found = true; + } else { + has_watchers = (has_watchers || client->transmit); + } + }); + if (!found) { + JLOG_WARN("No session %p", session); + *error = -2; + } + atomic_store(&_g_has_watchers, has_watchers); + UNLOCK; +} + +static json_t *_plugin_query_session(janus_plugin_session *session) { + IF_DISABLED({ return NULL; }); + json_t *info = NULL; + LOCK; + LIST_ITERATE(_g_clients, client, { + if (client->session == session) { + info = json_string("session_found"); + break; + } + }); + UNLOCK; + return info; +} + +static void _set_transmit(janus_plugin_session *session, UNUSED const char *msg, bool transmit) { + IF_DISABLED({ return; }); + LOCK; + bool found = false; + bool has_watchers = false; + LIST_ITERATE(_g_clients, client, { + if (client->session == session) { + client->transmit = transmit; + //JLOG_INFO("%s session %p", msg, session); + found = true; + } + has_watchers = (has_watchers || client->transmit); + }); + if (!found) { + JLOG_WARN("No session %p", session); + } + atomic_store(&_g_has_watchers, has_watchers); + UNLOCK; +} + +#undef IF_DISABLED + +static void _plugin_setup_media(janus_plugin_session *session) { _set_transmit(session, "Unmuted", true); } +static void _plugin_hangup_media(janus_plugin_session *session) { _set_transmit(session, "Muted", false); } + +static struct janus_plugin_result *_plugin_handle_message( + janus_plugin_session *session, char *transaction, json_t *msg, json_t *jsep) { + + assert(transaction != NULL); + +# define FREE_MSG_JSEP { \ + if (msg) json_decref(msg); \ + if (jsep) json_decref(jsep); \ + } + + if (session == NULL || msg == NULL) { + free(transaction); + FREE_MSG_JSEP; + return janus_plugin_result_new(JANUS_PLUGIN_ERROR, (msg ? "No session" : "No message"), NULL); + } + +# define PUSH_ERROR(_error, _reason) { \ + /*JLOG_ERROR("Message error in session %p: %s", session, _reason);*/ \ + json_t *_event = json_object(); \ + json_object_set_new(_event, "ustreamer", json_string("event")); \ + json_object_set_new(_event, "error_code", json_integer(_error)); \ + json_object_set_new(_event, "error", json_string(_reason)); \ + _g_gw->push_event(session, &_plugin, transaction, _event, NULL); \ + json_decref(_event); \ + } + + json_t *request_obj = json_object_get(msg, "request"); + if (request_obj == NULL) { + PUSH_ERROR(400, "Request missing"); + goto ok_wait; + } + + const char *request_str = json_string_value(request_obj); + if (!request_str) { + PUSH_ERROR(400, "Request not a string"); + goto ok_wait; + } + //JLOG_INFO("Message: %s", request_str); + +# define PUSH_STATUS(_status, _jsep) { \ + json_t *_event = json_object(); \ + json_object_set_new(_event, "ustreamer", json_string("event")); \ + json_t *_result = json_object(); \ + json_object_set_new(_result, "status", json_string(_status)); \ + json_object_set_new(_event, "result", _result); \ + _g_gw->push_event(session, &_plugin, transaction, _event, _jsep); \ + json_decref(_event); \ + } + + if (!strcmp(request_str, "start")) { + PUSH_STATUS("started", NULL); + + } else if (!strcmp(request_str, "stop")) { + PUSH_STATUS("stopped", NULL); + + } else if (!strcmp(request_str, "watch")) { + char *sdp = rtp_make_sdp(_g_rtp); + if (sdp == NULL) { + PUSH_ERROR(503, "Haven't received SPS/PPS from memsink yet"); + goto ok_wait; + } + //JLOG_INFO("SDP generated:\n%s", sdp); + json_t *offer_jsep = json_pack("{ssss}", "type", "offer", "sdp", sdp); + free(sdp); + PUSH_STATUS("started", offer_jsep); + json_decref(offer_jsep); + + } else { + PUSH_ERROR(405, "Not implemented"); + } + + ok_wait: + FREE_MSG_JSEP; + return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, NULL, NULL); + +# undef PUSH_ERROR +# undef FREE_MSG_JSEP +} + +static int _plugin_get_api_compatibility(void) { return JANUS_PLUGIN_API_VERSION; } +static int _plugin_get_version(void) { return VERSION_U; } +static const char *_plugin_get_version_string(void) { return VERSION; } +static const char *_plugin_get_description(void) { return "Pi-KVM uStreamer Janus plugin for H.264 video"; } +static const char *_plugin_get_name(void) { return "ustreamer"; } +static const char *_plugin_get_author(void) { return "Maxim Devaev "; } +static const char *_plugin_get_package(void) { return "janus.plugin.ustreamer"; } + + +#undef STOP +#undef READY +#undef UNLOCK +#undef LOCK + +#undef JLOG_PERROR +#undef JLOG_ERROR +#undef JLOG_WARN +#undef JLOG_INFO diff --git a/janus/src/rtp.c b/janus/src/rtp.c new file mode 100644 index 0000000..90c0652 --- /dev/null +++ b/janus/src/rtp.c @@ -0,0 +1,227 @@ +/***************************************************************************** +# # +# uStreamer - Lightweight and fast MJPG-HTTP streamer. # +# # +# This source file is partially based on this code: # +# - https://github.com/catid/kvm/blob/master/kvm_pipeline/src # +# # +# Copyright (C) 2018-2021 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#include "rtp.h" + + +#define PAYLOAD 96 // Payload type +#define PRE 3 // Annex B prefix length + + +void _rtp_process_nalu(rtp_s *rtp, const uint8_t *data, size_t size, uint32_t pts, rtp_callback_f callback); +static void _rtp_write_header(rtp_s *rtp, uint32_t pts, bool marked); + +static ssize_t _find_annexb(const uint8_t *data, size_t size); + + +rtp_s *rtp_init(void) { + rtp_s *rtp; + A_CALLOC(rtp, 1); + rtp->ssrc = triple_u32(get_now_monotonic_u64()); + rtp->sps = frame_init(); + rtp->pps = frame_init(); + A_MUTEX_INIT(&rtp->mutex); + return rtp; +} + +void rtp_destroy(rtp_s *rtp) { + A_MUTEX_DESTROY(&rtp->mutex); + frame_destroy(rtp->pps); + frame_destroy(rtp->sps); + free(rtp); +} + +char *rtp_make_sdp(rtp_s *rtp) { + A_MUTEX_LOCK(&rtp->mutex); + + if (rtp->sps->used == 0 || rtp->pps->used == 0) { + A_MUTEX_UNLOCK(&rtp->mutex); + return NULL; + } + + char *sps = NULL; + char *pps = NULL; + base64_encode(rtp->sps->data, rtp->sps->used, &sps, NULL); + base64_encode(rtp->pps->data, rtp->pps->used, &pps, NULL); + + A_MUTEX_UNLOCK(&rtp->mutex); + + // https://tools.ietf.org/html/rfc6184 + // https://github.com/meetecho/janus-gateway/issues/2443 + char *sdp; + A_ASPRINTF(sdp, + "v=0" RN + "o=- %" PRIu64 " 1 IN IP4 127.0.0.1" RN + "s=Pi-KVM uStreamer" RN + "t=0 0" RN + "m=video 1 RTP/SAVPF %d" RN + "c=IN IP4 0.0.0.0" RN + "a=rtpmap:%d H264/90000" RN + "a=fmtp:%d profile-level-id=42E01F" RN + "a=fmtp:%d packetization-mode=1" RN + "a=fmtp:%d sprop-sps=%s" RN + "a=fmtp:%d sprop-pps=%s" RN + "a=rtcp-fb:%d nack" RN + "a=rtcp-fb:%d nack pli" RN + "a=rtcp-fb:%d goog-remb" RN + "a=sendonly" RN, + get_now_id() >> 1, PAYLOAD, PAYLOAD, PAYLOAD, PAYLOAD, + PAYLOAD, sps, + PAYLOAD, pps, + PAYLOAD, PAYLOAD, PAYLOAD + ); + + free(sps); + free(pps); + return sdp; +} + +void rtp_wrap_h264(rtp_s *rtp, const frame_s *frame, rtp_callback_f callback) { + assert(frame->format == V4L2_PIX_FMT_H264); + + const uint32_t pts = get_now_monotonic_u64() * 9 / 100; // PTS units are in 90 kHz + ssize_t last_offset = -PRE; + + while (true) { // Find and iterate by nalus + const size_t next_start = last_offset + PRE; + ssize_t offset = _find_annexb(frame->data + next_start, frame->used - next_start); + if (offset < 0) { + break; + } + offset += next_start; + + if (last_offset >= 0) { + const uint8_t *data = frame->data + last_offset + PRE; + size_t size = offset - last_offset - PRE; + if (data[size - 1] == 0) { // Check for extra 00 + --size; + } + _rtp_process_nalu(rtp, data, size, pts, callback); + } + + last_offset = offset; + } + + if (last_offset >= 0) { + const uint8_t *data = frame->data + last_offset + PRE; + size_t size = frame->used - last_offset - PRE; + _rtp_process_nalu(rtp, data, size, pts, callback); + } +} + +void _rtp_process_nalu(rtp_s *rtp, const uint8_t *data, size_t size, uint32_t pts, rtp_callback_f callback) { + const unsigned ref_idc = (data[0] >> 5) & 3; + const unsigned type = data[0] & 0x1F; + + bool marked = false; // M=1 if this is an access unit + frame_s *ps = NULL; + switch (type) { + case 1 ... 5: marked = true; break; + case 7: ps = rtp->sps; break; + case 8: ps = rtp->pps; break; + } + if (ps) { + A_MUTEX_LOCK(&rtp->mutex); + frame_set_data(ps, data, size); + A_MUTEX_UNLOCK(&rtp->mutex); + } + +# define HEADER_SIZE 12 + + if (size + HEADER_SIZE <= RTP_DATAGRAM_SIZE) { + _rtp_write_header(rtp, pts, marked); + memcpy(rtp->datagram + HEADER_SIZE, data, size); + callback(rtp->datagram, size + HEADER_SIZE); + return; + } + + const size_t fu_overhead = HEADER_SIZE + 2; // FU-A overhead + + const uint8_t *src = data + 1; + ssize_t remaining = size - 1; + + bool first = true; + while (remaining > 0) { + ssize_t frag_size = RTP_DATAGRAM_SIZE - fu_overhead; + const bool last = (remaining <= frag_size); + if (last) { + frag_size = remaining; + } + + _rtp_write_header(rtp, pts, (marked && last)); + + rtp->datagram[HEADER_SIZE] = 28 | (ref_idc << 5); + + uint8_t fu = type; + if (first) { + fu |= 0x80; + } + if (last) { + fu |= 0x40; + } + rtp->datagram[HEADER_SIZE + 1] = fu; + + memcpy(rtp->datagram + fu_overhead, src, frag_size); + + callback(rtp->datagram, fu_overhead + frag_size); + + src += frag_size; + remaining -= frag_size; + first = false; + } + +# undef HEADER_SIZE +} + +static void _rtp_write_header(rtp_s *rtp, uint32_t pts, bool marked) { + uint32_t word0 = 0x80000000; + if (marked) { + word0 |= 1 << 23; + } + word0 |= (PAYLOAD & 0x7F) << 16; + word0 |= rtp->seq; + ++rtp->seq; + +# define WRITE_BE_U32(_offset, _value) *((uint32_t *)(rtp->datagram + _offset)) = __builtin_bswap32(_value) + WRITE_BE_U32(0, word0); + WRITE_BE_U32(4, pts); + WRITE_BE_U32(8, rtp->ssrc); +# undef WRITE_BE_U32 +} + +static ssize_t _find_annexb(const uint8_t *data, size_t size) { + // Parses buffer for 00 00 01 start codes + if (size >= PRE) { + for (size_t index = 0; index <= size - PRE; ++index) { + if (data[index] == 0 && data[index + 1] == 0 && data[index + 2] == 1) { + return index; + } + } + } + return -1; +} + +#undef PRE +#undef PAYLOAD diff --git a/janus/src/rtp.h b/janus/src/rtp.h new file mode 100644 index 0000000..370c4bd --- /dev/null +++ b/janus/src/rtp.h @@ -0,0 +1,69 @@ +/***************************************************************************** +# # +# uStreamer - Lightweight and fast MJPG-HTTP streamer. # +# # +# This source file is partially based on this code: # +# - https://github.com/catid/kvm/blob/master/kvm_pipeline/src # +# # +# Copyright (C) 2018-2021 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include "tools.h" +#include "threading.h" +#include "frame.h" +#include "base64.h" + + +// https://stackoverflow.com/questions/47635545/why-webrtc-chose-rtp-max-packet-size-to-1200-bytes +#define RTP_DATAGRAM_SIZE 1200 + + +typedef struct { + uint32_t ssrc; + uint16_t seq; + + uint8_t datagram[RTP_DATAGRAM_SIZE]; + + frame_s *sps; // Actually not a frame, just a bytes storage + frame_s *pps; + + pthread_mutex_t mutex; +} rtp_s; + +typedef void (*rtp_callback_f)(const uint8_t *datagram, size_t size); + + +rtp_s *rtp_init(void); +void rtp_destroy(rtp_s *rtp); + +char *rtp_make_sdp(rtp_s *rtp); +void rtp_wrap_h264(rtp_s *rtp, const frame_s *frame, rtp_callback_f callback); diff --git a/janus/src/threading.h b/janus/src/threading.h new file mode 120000 index 0000000..44824c3 --- /dev/null +++ b/janus/src/threading.h @@ -0,0 +1 @@ +../../src/libs/threading.h \ No newline at end of file diff --git a/janus/src/tools.h b/janus/src/tools.h new file mode 120000 index 0000000..c2a07b7 --- /dev/null +++ b/janus/src/tools.h @@ -0,0 +1 @@ +../../src/libs/tools.h \ No newline at end of file diff --git a/linters/cppcheck.h b/linters/cppcheck.h index 58af919..1cf2c8b 100644 --- a/linters/cppcheck.h +++ b/linters/cppcheck.h @@ -1,3 +1,4 @@ #define CHAR_BIT 8 #define WITH_OMX #define WITH_GPIO +#define JANUS_PLUGIN_INIT(...) { __VA_ARGS__ }