Compare commits

...

16 Commits
v3.24 ... v4.1

Author SHA1 Message Date
Devaev Maxim
767d8ac240 Bump version: 4.0 → 4.1 2021-06-10 18:56:00 +03:00
Devaev Maxim
4648ecba17 fixed install 2021-06-10 18:55:19 +03:00
Devaev Maxim
0a08ca657d Bump version: 3.27 → 4.0 2021-06-10 18:48:00 +03:00
Devaev Maxim
48f35ccd32 janus plugin 2021-06-10 18:42:16 +03:00
Devaev Maxim
3558089a22 get_now_monotonic_u64() 2021-06-08 18:55:52 +03:00
Devaev Maxim
47b735c51f Bump version: 3.26 → 3.27 2021-04-26 20:37:27 +03:00
Maxim Devaev
e60991974e Merge pull request #109 from netbr/master
Option to rotate image for Raspberry Pi Camera Module
2021-04-24 01:31:47 +03:00
netbr
3a16d17f8f Update options.c 2021-04-21 07:57:16 +02:00
netbr
08ddb351c7 Update device.h 2021-04-21 07:53:14 +02:00
netbr
3f251a2459 Update device.c 2021-04-21 07:52:06 +02:00
Devaev Maxim
a97f08eac8 Bump version: 3.25 → 3.26 2021-04-20 19:35:49 +03:00
Devaev Maxim
92ff097d7e show memsink has_client in the /state 2021-04-08 08:42:53 +03:00
Devaev Maxim
0eb0370bd3 moved python sources 2021-04-08 05:19:19 +03:00
Devaev Maxim
b329dfed1f Bump version: 3.24 → 3.25 2021-04-07 00:46:39 +03:00
Devaev Maxim
62dc2e5ad5 Fixed #105: using $(MAKE) 2021-04-06 23:43:32 +03:00
Devaev Maxim
7ea81fa627 fix 2021-04-03 06:42:33 +03:00
40 changed files with 976 additions and 43 deletions

View File

@@ -1,7 +1,7 @@
[bumpversion]
commit = True
tag = True
current_version = 3.24
current_version = 4.1
parse = (?P<major>\d+)\.(?P<minor>\d+)
serialize =
{major}.{minor}

1
.gitignore vendored
View File

@@ -8,6 +8,7 @@
/src/build/
/src/*.bin
/python/build/
/janus/build/
/ustreamer
/ustreamer-dump
/config.mk

View File

@@ -25,29 +25,38 @@ endef
# =====
all:
+ make apps
+ $(MAKE) apps
ifneq ($(call optbool,$(WITH_PYTHON)),)
+ make python
else
@ true
+ $(MAKE) python
endif
ifneq ($(call optbool,$(WITH_JANUS)),)
+ $(MAKE) janus
endif
apps:
make -C src
$(MAKE) -C src
@ ln -sf src/ustreamer.bin ustreamer
@ ln -sf src/ustreamer-dump.bin ustreamer-dump
python:
make -C python
$(MAKE) -C python
@ ln -sf python/build/lib.*/*.so .
janus:
$(MAKE) -C janus
@ ln -sf janus/*.so .
install: all
make -C src install
$(MAKE) -C src install
ifneq ($(call optbool,$(WITH_PYTHON)),)
make -C python install
$(MAKE) -C python install
endif
ifneq ($(call optbool,$(WITH_JANUS)),)
$(MAKE) -C janus install
endif
mkdir -p $(DESTDIR)$(MANPREFIX)/man1
for man in $(shell ls man); do \
@@ -57,21 +66,21 @@ endif
install-strip: install
make -C src install-strip
$(MAKE) -C src install-strip
regen:
tools/make-jpeg-h.py src/ustreamer/data/blank.jpeg src/ustreamer/data/blank_jpeg.c BLANK
tools/make-html-h.py src/ustreamer/data/index.html src/ustreamer/data/index_html.c INDEX
tools/$(MAKE)-jpeg-h.py src/ustreamer/data/blank.jpeg src/ustreamer/data/blank_jpeg.c BLANK
tools/$(MAKE)-html-h.py src/ustreamer/data/index.html src/ustreamer/data/index_html.c INDEX
release:
make clean
make tox
make push
make bump V=$(V)
make push
make clean
$(MAKE) clean
$(MAKE) tox
$(MAKE) push
$(MAKE) bump V=$(V)
$(MAKE) push
$(MAKE) clean
tox: linters
@@ -110,8 +119,9 @@ clean-all: linters clean
clean:
rm -rf pkg/arch/pkg pkg/arch/src pkg/arch/v*.tar.gz pkg/arch/ustreamer-*.pkg.tar.{xz,zst}
rm -f ustreamer ustreamer-dump *.so
make -C src clean
make -C python clean
$(MAKE) -C src clean
$(MAKE) -C python clean
$(MAKE) -C janus clean
.PHONY: python linters
.PHONY: python janus linters

54
janus/Makefile Normal file
View File

@@ -0,0 +1,54 @@
DESTDIR ?=
PREFIX ?= /usr/local
CC ?= gcc
CFLAGS ?= -O3
LDFLAGS ?=
# =====
_PLUGIN = libjanus_ustreamer.so
_CFLAGS = -fPIC -MD -c -std=c11 -Wall -Wextra -D_GNU_SOURCE $(shell pkg-config --cflags glib-2.0) $(CFLAGS)
_LDFLAGS = -shared -lm -pthread -lrt -ljansson $(shell pkg-config --libs glib-2.0) $(LDFLAGS)
_SRCS = $(shell ls src/*.c)
_BUILD = build
define optbool
$(filter $(shell echo $(1) | tr A-Z a-z), yes on 1)
endef
WITH_PTHREAD_NP ?= 1
ifneq ($(call optbool,$(WITH_PTHREAD_NP)),)
override _CFLAGS += -DWITH_PTHREAD_NP
endif
# =====
$(_PLUGIN): $(_SRCS:%.c=$(_BUILD)/%.o)
$(info == SO $@)
@ $(CC) $^ -o $@ $(_LDFLAGS)
$(_BUILD)/%.o: %.c
$(info -- CC $<)
@ mkdir -p $(dir $@) || true
@ $(CC) $< -o $@ $(_CFLAGS)
install: $(_PLUGIN)
mkdir -p $(DESTDIR)$(PREFIX)/lib/ustreamer/janus
install -m755 $(_PLUGIN) $(DESTDIR)$(PREFIX)/lib/ustreamer/janus/$(PLUGIN)
clean:
rm -rf $(_PLUGIN) $(_BUILD)
_OBJS = $(_SRCS:%.c=$(_BUILD)/%.o)
-include $(_OBJS:%.o=%.d)

1
janus/src/base64.c Symbolic link
View File

@@ -0,0 +1 @@
../../src/libs/base64.c

1
janus/src/base64.h Symbolic link
View File

@@ -0,0 +1 @@
../../src/libs/base64.h

1
janus/src/config.h Symbolic link
View File

@@ -0,0 +1 @@
../../src/libs/config.h

1
janus/src/frame.c Symbolic link
View File

@@ -0,0 +1 @@
../../src/libs/frame.c

1
janus/src/frame.h Symbolic link
View File

@@ -0,0 +1 @@
../../src/libs/frame.h

1
janus/src/list.h Symbolic link
View File

@@ -0,0 +1 @@
../../src/libs/list.h

1
janus/src/memsinksh.h Symbolic link
View File

@@ -0,0 +1 @@
../../src/libs/memsinksh.h

522
janus/src/plugin.c Normal file
View File

@@ -0,0 +1,522 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPG-HTTP streamer. #
# #
# Copyright (C) 2018-2021 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#include <stdint.h>
#include <stdbool.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <assert.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <linux/videodev2.h>
#include <pthread.h>
#include <jansson.h>
#include <janus/config.h>
#include <janus/plugins/plugin.h>
#include "config.h"
#include "tools.h"
#include "threading.h"
#include "list.h"
#include "memsinksh.h"
#include "rtp.h"
static int _plugin_init(janus_callbacks *gw, const char *config_file_path);
static void _plugin_destroy(void);
static void _plugin_create_session(janus_plugin_session *session, int *error);
static void _plugin_destroy_session(janus_plugin_session *session, int *error);
static json_t *_plugin_query_session(janus_plugin_session *session);
static void _plugin_setup_media(janus_plugin_session *session);
static void _plugin_hangup_media(janus_plugin_session *session);
static struct janus_plugin_result *_plugin_handle_message(
janus_plugin_session *session, char *transaction, json_t *msg, json_t *jsep);
static int _plugin_get_api_compatibility(void);
static int _plugin_get_version(void);
static const char *_plugin_get_version_string(void);
static const char *_plugin_get_description(void);
static const char *_plugin_get_name(void);
static const char *_plugin_get_author(void);
static const char *_plugin_get_package(void);
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Woverride-init"
static janus_plugin _plugin = JANUS_PLUGIN_INIT(
.init = _plugin_init,
.destroy = _plugin_destroy,
.create_session = _plugin_create_session,
.destroy_session = _plugin_destroy_session,
.query_session = _plugin_query_session,
.setup_media = _plugin_setup_media,
.hangup_media = _plugin_hangup_media,
.handle_message = _plugin_handle_message,
.get_api_compatibility = _plugin_get_api_compatibility,
.get_version = _plugin_get_version,
.get_version_string = _plugin_get_version_string,
.get_description = _plugin_get_description,
.get_name = _plugin_get_name,
.get_author = _plugin_get_author,
.get_package = _plugin_get_package,
);
#pragma GCC diagnostic pop
janus_plugin *create(void) { // cppcheck-suppress unusedFunction
return &_plugin;
}
typedef struct _client_sx {
janus_plugin_session *session;
bool transmit;
LIST_STRUCT(struct _client_sx);
} _client_s;
static char *_g_memsink_obj = NULL;
const long double _g_wait_timeout = 1;
const long double _g_lock_timeout = 1;
const useconds_t _g_lock_polling = 1000;
const useconds_t _g_watchers_polling = 100000;
static _client_s *_g_clients = NULL;
static janus_callbacks *_g_gw = NULL;
static rtp_s *_g_rtp = NULL;
static pthread_t _g_tid;
static pthread_mutex_t _g_lock = PTHREAD_MUTEX_INITIALIZER;
static atomic_bool _g_ready = ATOMIC_VAR_INIT(false);
static atomic_bool _g_stop = ATOMIC_VAR_INIT(false);
static atomic_bool _g_has_watchers = ATOMIC_VAR_INIT(false);
#define JLOG_INFO(_msg, ...) JANUS_LOG(LOG_INFO, "== %s -- " _msg "\n", _plugin_get_name(), ##__VA_ARGS__)
#define JLOG_WARN(_msg, ...) JANUS_LOG(LOG_WARN, "== %s -- " _msg "\n", _plugin_get_name(), ##__VA_ARGS__)
#define JLOG_ERROR(_msg, ...) JANUS_LOG(LOG_ERR, "== %s -- " _msg "\n", _plugin_get_name(), ##__VA_ARGS__)
#define JLOG_PERROR(_msg, ...) { \
char _perror_buf[1024] = {0}; \
char *_perror_ptr = errno_to_string(errno, _perror_buf, 1023); \
JANUS_LOG(LOG_ERR, "[%s] " _msg ": %s\n", _plugin_get_name(), ##__VA_ARGS__, _perror_ptr); \
}
#define LOCK A_MUTEX_LOCK(&_g_lock)
#define UNLOCK A_MUTEX_UNLOCK(&_g_lock)
#define READY atomic_load(&_g_ready)
#define STOP atomic_load(&_g_stop)
#define HAS_WATCHERS atomic_load(&_g_has_watchers)
static void _relay_rtp_clients(const uint8_t *datagram, size_t size) {
janus_plugin_rtp packet = {
.video = true,
.buffer = (char *)datagram,
.length = size,
};
janus_plugin_rtp_extensions_reset(&packet.extensions);
LIST_ITERATE(_g_clients, client, {
if (client->transmit) {
_g_gw->relay_rtp(client->session, &packet);
}
});
}
static int _wait_frame(int fd, memsink_shared_s* mem, uint64_t last_id) {
long double deadline_ts = get_now_monotonic() + _g_wait_timeout;
long double now;
do {
int retval = flock_timedwait_monotonic(fd, _g_lock_timeout);
now = get_now_monotonic();
if (retval < 0 && errno != EWOULDBLOCK) {
JLOG_PERROR("Can't lock memsink");
return -1;
} else if (retval == 0) {
if (mem->magic == MEMSINK_MAGIC && mem->version == MEMSINK_VERSION && mem->id != last_id) {
return 0;
}
if (flock(fd, LOCK_UN) < 0) {
JLOG_PERROR("Can't unlock memsink");
return -1;
}
}
usleep(_g_lock_polling);
} while (now < deadline_ts);
return -2;
}
static int _get_frame(int fd, memsink_shared_s *mem, frame_s *frame, uint64_t *frame_id) {
frame_set_data(frame, mem->data, mem->used);
FRAME_COPY_META(mem, frame);
*frame_id = mem->id;
mem->last_client_ts = get_now_monotonic();
int retval = 0;
if (frame->format != V4L2_PIX_FMT_H264) {
JLOG_ERROR("Got non-H264 frame from memsink");
retval = -1;
}
if (flock(fd, LOCK_UN) < 0) {
JLOG_PERROR("Can't unlock memsink");
retval = -1;
}
return retval;
}
static void *_clients_thread(UNUSED void *arg) {
A_THREAD_RENAME("us_clients");
atomic_store(&_g_ready, true);
frame_s *frame = frame_init();
uint64_t frame_id = 0;
unsigned error_reported = 0;
# define IF_NOT_REPORTED(_error, ...) { \
if (error_reported != _error) { __VA_ARGS__; error_reported = _error; } \
}
while (!STOP) {
if (!HAS_WATCHERS) {
IF_NOT_REPORTED(1, {
JLOG_INFO("No active watchers, memsink disconnected");
});
usleep(_g_watchers_polling);
continue;
}
int fd = -1;
memsink_shared_s *mem = NULL;
if ((fd = shm_open(_g_memsink_obj, O_RDWR, 0)) <= 0) {
IF_NOT_REPORTED(2, {
JLOG_PERROR("Can't open memsink");
});
goto close_memsink;
}
if ((mem = memsink_shared_map(fd)) == NULL) {
IF_NOT_REPORTED(3, {
JLOG_PERROR("Can't map memsink");
});
goto close_memsink;
}
error_reported = 0;
JLOG_INFO("Memsink opened; reading frames ...");
while (!STOP && HAS_WATCHERS) {
int result = _wait_frame(fd, mem, frame_id);
if (result == 0) {
if (_get_frame(fd, mem, frame, &frame_id) != 0) {
goto close_memsink;
}
LOCK;
rtp_wrap_h264(_g_rtp, frame, _relay_rtp_clients);
UNLOCK;
} else if (result == -1) {
goto close_memsink;
}
}
close_memsink:
if (mem != NULL) {
JLOG_INFO("Memsink closed");
memsink_shared_unmap(mem);
mem = NULL;
}
if (fd > 0) {
close(fd);
fd = -1;
}
sleep(1); // error_delay
}
# undef IF_NOT_REPORTED
frame_destroy(frame);
return NULL;
}
static int _read_config(const char *config_dir_path) {
char *config_file_path;
janus_config *config = NULL;
A_ASPRINTF(config_file_path, "%s/%s.jcfg", config_dir_path, _plugin_get_package());
JLOG_INFO("Reading config file '%s' ...", config_file_path);
config = janus_config_parse(config_file_path);
if (config == NULL) {
JLOG_ERROR("Can't read config");
goto error;
}
janus_config_print(config);
janus_config_category *config_memsink = janus_config_get_create(config, NULL, janus_config_type_category, "memsink");
janus_config_item *config_memsink_obj = janus_config_get(config, config_memsink, janus_config_type_item, "object");
if (config_memsink_obj == NULL || config_memsink_obj->value == NULL || config_memsink_obj->value[0] == '\0') {
JLOG_ERROR("Missing config value: memsink.object");
goto error;
}
_g_memsink_obj = strdup(config_memsink_obj->value);
int retval = 0;
goto ok;
error:
retval = -1;
ok:
if (config) {
janus_config_destroy(config);
}
free(config_file_path);
return retval;
}
static int _plugin_init(janus_callbacks *gw, const char *config_dir_path) {
// https://groups.google.com/g/meetecho-janus/c/xoWIQfaoJm8
// sysctl -w net.core.rmem_default=500000
// sysctl -w net.core.wmem_default=500000
// sysctl -w net.core.rmem_max=1000000
// sysctl -w net.core.wmem_max=1000000
JLOG_INFO("Initializing plugin ...");
assert(!READY);
assert(!STOP);
if (gw == NULL || config_dir_path == NULL || _read_config(config_dir_path) < 0) {
return -1;
}
_g_gw = gw;
_g_rtp = rtp_init();
A_THREAD_CREATE(&_g_tid, _clients_thread, NULL);
return 0;
}
static void _plugin_destroy(void) {
JLOG_INFO("Destroying plugin ...");
atomic_store(&_g_stop, true);
if (READY) {
A_THREAD_JOIN(_g_tid);
}
LIST_ITERATE(_g_clients, client, {
LIST_REMOVE(_g_clients, client);
free(client);
});
_g_clients = NULL;
rtp_destroy(_g_rtp);
_g_rtp = NULL;
_g_gw = NULL;
if (_g_memsink_obj) {
free(_g_memsink_obj);
_g_memsink_obj = NULL;
}
}
#define IF_DISABLED(...) { if (!READY || STOP) { __VA_ARGS__ } }
static void _plugin_create_session(janus_plugin_session *session, int *error) {
IF_DISABLED({ *error = -1; return; });
LOCK;
JLOG_INFO("Creating session %p ...", session);
_client_s *client;
A_CALLOC(client, 1);
client->session = session;
client->transmit = true;
LIST_APPEND(_g_clients, client);
atomic_store(&_g_has_watchers, true);
UNLOCK;
}
static void _plugin_destroy_session(janus_plugin_session* session, int *error) {
IF_DISABLED({ *error = -1; return; });
LOCK;
bool found = false;
bool has_watchers = false;
LIST_ITERATE(_g_clients, client, {
if (client->session == session) {
JLOG_INFO("Removing session %p ...", session);
LIST_REMOVE(_g_clients, client);
free(client);
found = true;
} else {
has_watchers = (has_watchers || client->transmit);
}
});
if (!found) {
JLOG_WARN("No session %p", session);
*error = -2;
}
atomic_store(&_g_has_watchers, has_watchers);
UNLOCK;
}
static json_t *_plugin_query_session(janus_plugin_session *session) {
IF_DISABLED({ return NULL; });
json_t *info = NULL;
LOCK;
LIST_ITERATE(_g_clients, client, {
if (client->session == session) {
info = json_string("session_found");
break;
}
});
UNLOCK;
return info;
}
static void _set_transmit(janus_plugin_session *session, UNUSED const char *msg, bool transmit) {
IF_DISABLED({ return; });
LOCK;
bool found = false;
bool has_watchers = false;
LIST_ITERATE(_g_clients, client, {
if (client->session == session) {
client->transmit = transmit;
//JLOG_INFO("%s session %p", msg, session);
found = true;
}
has_watchers = (has_watchers || client->transmit);
});
if (!found) {
JLOG_WARN("No session %p", session);
}
atomic_store(&_g_has_watchers, has_watchers);
UNLOCK;
}
#undef IF_DISABLED
static void _plugin_setup_media(janus_plugin_session *session) { _set_transmit(session, "Unmuted", true); }
static void _plugin_hangup_media(janus_plugin_session *session) { _set_transmit(session, "Muted", false); }
static struct janus_plugin_result *_plugin_handle_message(
janus_plugin_session *session, char *transaction, json_t *msg, json_t *jsep) {
assert(transaction != NULL);
# define FREE_MSG_JSEP { \
if (msg) json_decref(msg); \
if (jsep) json_decref(jsep); \
}
if (session == NULL || msg == NULL) {
free(transaction);
FREE_MSG_JSEP;
return janus_plugin_result_new(JANUS_PLUGIN_ERROR, (msg ? "No session" : "No message"), NULL);
}
# define PUSH_ERROR(_error, _reason) { \
/*JLOG_ERROR("Message error in session %p: %s", session, _reason);*/ \
json_t *_event = json_object(); \
json_object_set_new(_event, "ustreamer", json_string("event")); \
json_object_set_new(_event, "error_code", json_integer(_error)); \
json_object_set_new(_event, "error", json_string(_reason)); \
_g_gw->push_event(session, &_plugin, transaction, _event, NULL); \
json_decref(_event); \
}
json_t *request_obj = json_object_get(msg, "request");
if (request_obj == NULL) {
PUSH_ERROR(400, "Request missing");
goto ok_wait;
}
const char *request_str = json_string_value(request_obj);
if (!request_str) {
PUSH_ERROR(400, "Request not a string");
goto ok_wait;
}
//JLOG_INFO("Message: %s", request_str);
# define PUSH_STATUS(_status, _jsep) { \
json_t *_event = json_object(); \
json_object_set_new(_event, "ustreamer", json_string("event")); \
json_t *_result = json_object(); \
json_object_set_new(_result, "status", json_string(_status)); \
json_object_set_new(_event, "result", _result); \
_g_gw->push_event(session, &_plugin, transaction, _event, _jsep); \
json_decref(_event); \
}
if (!strcmp(request_str, "start")) {
PUSH_STATUS("started", NULL);
} else if (!strcmp(request_str, "stop")) {
PUSH_STATUS("stopped", NULL);
} else if (!strcmp(request_str, "watch")) {
char *sdp = rtp_make_sdp(_g_rtp);
if (sdp == NULL) {
PUSH_ERROR(503, "Haven't received SPS/PPS from memsink yet");
goto ok_wait;
}
//JLOG_INFO("SDP generated:\n%s", sdp);
json_t *offer_jsep = json_pack("{ssss}", "type", "offer", "sdp", sdp);
free(sdp);
PUSH_STATUS("started", offer_jsep);
json_decref(offer_jsep);
} else {
PUSH_ERROR(405, "Not implemented");
}
ok_wait:
FREE_MSG_JSEP;
return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, NULL, NULL);
# undef PUSH_ERROR
# undef FREE_MSG_JSEP
}
static int _plugin_get_api_compatibility(void) { return JANUS_PLUGIN_API_VERSION; }
static int _plugin_get_version(void) { return VERSION_U; }
static const char *_plugin_get_version_string(void) { return VERSION; }
static const char *_plugin_get_description(void) { return "Pi-KVM uStreamer Janus plugin for H.264 video"; }
static const char *_plugin_get_name(void) { return "ustreamer"; }
static const char *_plugin_get_author(void) { return "Maxim Devaev <mdevaev@gmail.com>"; }
static const char *_plugin_get_package(void) { return "janus.plugin.ustreamer"; }
#undef STOP
#undef READY
#undef UNLOCK
#undef LOCK
#undef JLOG_PERROR
#undef JLOG_ERROR
#undef JLOG_WARN
#undef JLOG_INFO

227
janus/src/rtp.c Normal file
View File

@@ -0,0 +1,227 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPG-HTTP streamer. #
# #
# This source file is partially based on this code: #
# - https://github.com/catid/kvm/blob/master/kvm_pipeline/src #
# #
# Copyright (C) 2018-2021 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#include "rtp.h"
#define PAYLOAD 96 // Payload type
#define PRE 3 // Annex B prefix length
void _rtp_process_nalu(rtp_s *rtp, const uint8_t *data, size_t size, uint32_t pts, rtp_callback_f callback);
static void _rtp_write_header(rtp_s *rtp, uint32_t pts, bool marked);
static ssize_t _find_annexb(const uint8_t *data, size_t size);
rtp_s *rtp_init(void) {
rtp_s *rtp;
A_CALLOC(rtp, 1);
rtp->ssrc = triple_u32(get_now_monotonic_u64());
rtp->sps = frame_init();
rtp->pps = frame_init();
A_MUTEX_INIT(&rtp->mutex);
return rtp;
}
void rtp_destroy(rtp_s *rtp) {
A_MUTEX_DESTROY(&rtp->mutex);
frame_destroy(rtp->pps);
frame_destroy(rtp->sps);
free(rtp);
}
char *rtp_make_sdp(rtp_s *rtp) {
A_MUTEX_LOCK(&rtp->mutex);
if (rtp->sps->used == 0 || rtp->pps->used == 0) {
A_MUTEX_UNLOCK(&rtp->mutex);
return NULL;
}
char *sps = NULL;
char *pps = NULL;
base64_encode(rtp->sps->data, rtp->sps->used, &sps, NULL);
base64_encode(rtp->pps->data, rtp->pps->used, &pps, NULL);
A_MUTEX_UNLOCK(&rtp->mutex);
// https://tools.ietf.org/html/rfc6184
// https://github.com/meetecho/janus-gateway/issues/2443
char *sdp;
A_ASPRINTF(sdp,
"v=0" RN
"o=- %" PRIu64 " 1 IN IP4 127.0.0.1" RN
"s=Pi-KVM uStreamer" RN
"t=0 0" RN
"m=video 1 RTP/SAVPF %d" RN
"c=IN IP4 0.0.0.0" RN
"a=rtpmap:%d H264/90000" RN
"a=fmtp:%d profile-level-id=42E01F" RN
"a=fmtp:%d packetization-mode=1" RN
"a=fmtp:%d sprop-sps=%s" RN
"a=fmtp:%d sprop-pps=%s" RN
"a=rtcp-fb:%d nack" RN
"a=rtcp-fb:%d nack pli" RN
"a=rtcp-fb:%d goog-remb" RN
"a=sendonly" RN,
get_now_id() >> 1, PAYLOAD, PAYLOAD, PAYLOAD, PAYLOAD,
PAYLOAD, sps,
PAYLOAD, pps,
PAYLOAD, PAYLOAD, PAYLOAD
);
free(sps);
free(pps);
return sdp;
}
void rtp_wrap_h264(rtp_s *rtp, const frame_s *frame, rtp_callback_f callback) {
assert(frame->format == V4L2_PIX_FMT_H264);
const uint32_t pts = get_now_monotonic_u64() * 9 / 100; // PTS units are in 90 kHz
ssize_t last_offset = -PRE;
while (true) { // Find and iterate by nalus
const size_t next_start = last_offset + PRE;
ssize_t offset = _find_annexb(frame->data + next_start, frame->used - next_start);
if (offset < 0) {
break;
}
offset += next_start;
if (last_offset >= 0) {
const uint8_t *data = frame->data + last_offset + PRE;
size_t size = offset - last_offset - PRE;
if (data[size - 1] == 0) { // Check for extra 00
--size;
}
_rtp_process_nalu(rtp, data, size, pts, callback);
}
last_offset = offset;
}
if (last_offset >= 0) {
const uint8_t *data = frame->data + last_offset + PRE;
size_t size = frame->used - last_offset - PRE;
_rtp_process_nalu(rtp, data, size, pts, callback);
}
}
void _rtp_process_nalu(rtp_s *rtp, const uint8_t *data, size_t size, uint32_t pts, rtp_callback_f callback) {
const unsigned ref_idc = (data[0] >> 5) & 3;
const unsigned type = data[0] & 0x1F;
bool marked = false; // M=1 if this is an access unit
frame_s *ps = NULL;
switch (type) {
case 1 ... 5: marked = true; break;
case 7: ps = rtp->sps; break;
case 8: ps = rtp->pps; break;
}
if (ps) {
A_MUTEX_LOCK(&rtp->mutex);
frame_set_data(ps, data, size);
A_MUTEX_UNLOCK(&rtp->mutex);
}
# define HEADER_SIZE 12
if (size + HEADER_SIZE <= RTP_DATAGRAM_SIZE) {
_rtp_write_header(rtp, pts, marked);
memcpy(rtp->datagram + HEADER_SIZE, data, size);
callback(rtp->datagram, size + HEADER_SIZE);
return;
}
const size_t fu_overhead = HEADER_SIZE + 2; // FU-A overhead
const uint8_t *src = data + 1;
ssize_t remaining = size - 1;
bool first = true;
while (remaining > 0) {
ssize_t frag_size = RTP_DATAGRAM_SIZE - fu_overhead;
const bool last = (remaining <= frag_size);
if (last) {
frag_size = remaining;
}
_rtp_write_header(rtp, pts, (marked && last));
rtp->datagram[HEADER_SIZE] = 28 | (ref_idc << 5);
uint8_t fu = type;
if (first) {
fu |= 0x80;
}
if (last) {
fu |= 0x40;
}
rtp->datagram[HEADER_SIZE + 1] = fu;
memcpy(rtp->datagram + fu_overhead, src, frag_size);
callback(rtp->datagram, fu_overhead + frag_size);
src += frag_size;
remaining -= frag_size;
first = false;
}
# undef HEADER_SIZE
}
static void _rtp_write_header(rtp_s *rtp, uint32_t pts, bool marked) {
uint32_t word0 = 0x80000000;
if (marked) {
word0 |= 1 << 23;
}
word0 |= (PAYLOAD & 0x7F) << 16;
word0 |= rtp->seq;
++rtp->seq;
# define WRITE_BE_U32(_offset, _value) *((uint32_t *)(rtp->datagram + _offset)) = __builtin_bswap32(_value)
WRITE_BE_U32(0, word0);
WRITE_BE_U32(4, pts);
WRITE_BE_U32(8, rtp->ssrc);
# undef WRITE_BE_U32
}
static ssize_t _find_annexb(const uint8_t *data, size_t size) {
// Parses buffer for 00 00 01 start codes
if (size >= PRE) {
for (size_t index = 0; index <= size - PRE; ++index) {
if (data[index] == 0 && data[index + 1] == 0 && data[index + 2] == 1) {
return index;
}
}
}
return -1;
}
#undef PRE
#undef PAYLOAD

69
janus/src/rtp.h Normal file
View File

@@ -0,0 +1,69 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPG-HTTP streamer. #
# #
# This source file is partially based on this code: #
# - https://github.com/catid/kvm/blob/master/kvm_pipeline/src #
# #
# Copyright (C) 2018-2021 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#pragma once
#include <stdlib.h>
#include <stdbool.h>
#include <stdint.h>
#include <inttypes.h>
#include <string.h>
#include <assert.h>
#include <sys/types.h>
#include <linux/videodev2.h>
#include <pthread.h>
#include "tools.h"
#include "threading.h"
#include "frame.h"
#include "base64.h"
// https://stackoverflow.com/questions/47635545/why-webrtc-chose-rtp-max-packet-size-to-1200-bytes
#define RTP_DATAGRAM_SIZE 1200
typedef struct {
uint32_t ssrc;
uint16_t seq;
uint8_t datagram[RTP_DATAGRAM_SIZE];
frame_s *sps; // Actually not a frame, just a bytes storage
frame_s *pps;
pthread_mutex_t mutex;
} rtp_s;
typedef void (*rtp_callback_f)(const uint8_t *datagram, size_t size);
rtp_s *rtp_init(void);
void rtp_destroy(rtp_s *rtp);
char *rtp_make_sdp(rtp_s *rtp);
void rtp_wrap_h264(rtp_s *rtp, const frame_s *frame, rtp_callback_f callback);

1
janus/src/threading.h Symbolic link
View File

@@ -0,0 +1 @@
../../src/libs/threading.h

1
janus/src/tools.h Symbolic link
View File

@@ -0,0 +1 @@
../../src/libs/tools.h

View File

@@ -1,3 +1,4 @@
#define CHAR_BIT 8
#define WITH_OMX
#define WITH_GPIO
#define JANUS_PLUGIN_INIT(...) { __VA_ARGS__ }

View File

@@ -1,6 +1,6 @@
.\" Manpage for ustreamer-dump.
.\" Open an issue or pull request to https://github.com/pikvm/ustreamer to correct errors or typos
.TH USTREAMER-DUMP 1 "version 3.24" "January 2021"
.TH USTREAMER-DUMP 1 "version 4.1" "January 2021"
.SH NAME
ustreamer-dump \- Dump uStreamer's memory sink to file

View File

@@ -1,6 +1,6 @@
.\" Manpage for ustreamer.
.\" Open an issue or pull request to https://github.com/pikvm/ustreamer to correct errors or typos
.TH USTREAMER 1 "version 3.24" "November 2020"
.TH USTREAMER 1 "version 4.1" "November 2020"
.SH NAME
ustreamer \- stream MJPG video from any V4L2 device to the network

View File

@@ -3,7 +3,7 @@
pkgname=ustreamer
pkgver=3.24
pkgver=4.1
pkgrel=1
pkgdesc="Lightweight and fast MJPG-HTTP streamer"
url="https://github.com/pikvm/ustreamer"

View File

@@ -6,7 +6,7 @@
include $(TOPDIR)/rules.mk
PKG_NAME:=ustreamer
PKG_VERSION:=3.24
PKG_VERSION:=4.1
PKG_RELEASE:=1
PKG_MAINTAINER:=Maxim Devaev <mdevaev@gmail.com>

View File

@@ -1 +0,0 @@
../src/libs/frame.c

View File

@@ -1 +0,0 @@
../src/libs/frame.h

View File

@@ -1 +0,0 @@
../src/libs/memsinksh.h

View File

@@ -8,7 +8,7 @@ from distutils.core import setup
if __name__ == "__main__":
setup(
name="ustreamer",
version="3.24",
version="4.1",
description="uStreamer tools",
author="Maxim Devaev",
author_email="mdevaev@gmail.com",
@@ -18,8 +18,8 @@ if __name__ == "__main__":
"ustreamer",
libraries=["rt", "m", "pthread"],
undef_macros=["NDEBUG"],
sources=[name for name in os.listdir(".") if name.endswith(".c")],
depends=[name for name in os.listdir(".") if name.endswith(".h")],
sources=["src/" + name for name in os.listdir("src") if name.endswith(".c")],
depends=["src/" + name for name in os.listdir("src") if name.endswith(".h")],
),
],
)

1
python/src/frame.c Symbolic link
View File

@@ -0,0 +1 @@
../../src/libs/frame.c

1
python/src/frame.h Symbolic link
View File

@@ -0,0 +1 @@
../../src/libs/frame.h

1
python/src/memsinksh.h Symbolic link
View File

@@ -0,0 +1 @@
../../src/libs/memsinksh.h

1
python/src/tools.h Symbolic link
View File

@@ -0,0 +1 @@
../../src/libs/tools.h

View File

@@ -1 +0,0 @@
../src/libs/tools.h

View File

@@ -22,8 +22,8 @@
#pragma once
#define VERSION_MAJOR 3
#define VERSION_MINOR 24
#define VERSION_MAJOR 4
#define VERSION_MINOR 1
#define MAKE_VERSION2(_major, _minor) #_major "." #_minor
#define MAKE_VERSION1(_major, _minor) MAKE_VERSION2(_major, _minor)

View File

@@ -37,6 +37,7 @@ memsink_s *memsink_init(
sink->timeout = timeout;
sink->fd = -1;
sink->mem = MAP_FAILED;
atomic_init(&sink->has_clients, false);
LOG_INFO("Using %s-sink: %s", name, obj);
@@ -92,16 +93,16 @@ bool memsink_server_check(memsink_s *sink, const frame_s *frame) {
if (flock(sink->fd, LOCK_EX | LOCK_NB) < 0) {
if (errno == EWOULDBLOCK) {
sink->has_clients = true;
atomic_store(&sink->has_clients, true);
return true;
}
LOG_PERROR("%s-sink: Can't lock memory", sink->name);
return false;
}
sink->has_clients = (sink->mem->last_client_ts + sink->client_ttl > get_now_monotonic());
atomic_store(&sink->has_clients, (sink->mem->last_client_ts + sink->client_ttl > get_now_monotonic()));
bool retval = (sink->has_clients || !FRAME_COMPARE_META_USED_NOTS(sink->mem, frame));
bool retval = (atomic_load(&sink->has_clients) || !FRAME_COMPARE_META_USED_NOTS(sink->mem, frame));
if (flock(sink->fd, LOCK_UN) < 0) {
LOG_PERROR("%s-sink: Can't unlock memory", sink->name);
@@ -133,7 +134,7 @@ int memsink_server_put(memsink_s *sink, const frame_s *frame) {
sink->mem->magic = MEMSINK_MAGIC;
sink->mem->version = MEMSINK_VERSION;
sink->has_clients = (sink->mem->last_client_ts + sink->client_ttl > get_now_monotonic());
atomic_store(&sink->has_clients, (sink->mem->last_client_ts + sink->client_ttl > get_now_monotonic()));
if (flock(sink->fd, LOCK_UN) < 0) {
LOG_PERROR("%s-sink: Can't unlock memory", sink->name);

View File

@@ -24,6 +24,7 @@
#include <stdint.h>
#include <stdbool.h>
#include <stdatomic.h>
#include <string.h>
#include <fcntl.h>
#include <errno.h>
@@ -51,7 +52,7 @@ typedef struct {
int fd;
memsink_shared_s *mem;
uint64_t last_id;
bool has_clients; // Only for server
atomic_bool has_clients; // Only for server
} memsink_s;

View File

@@ -110,10 +110,14 @@ INLINE long double get_now_monotonic(void) {
return (long double)sec + ((long double)msec) / 1000;
}
INLINE uint64_t get_now_id(void) {
INLINE uint64_t get_now_monotonic_u64(void) {
struct timespec ts;
assert(!clock_gettime(X_CLOCK_MONOTONIC, &ts));
uint64_t now = (uint64_t)(ts.tv_nsec / 1000) + (uint64_t)ts.tv_sec * 1000000;
return (uint64_t)(ts.tv_nsec / 1000) + (uint64_t)ts.tv_sec * 1000000;
}
INLINE uint64_t get_now_id(void) {
uint64_t now = get_now_monotonic_u64();
return (uint64_t)triple_u32(now) | ((uint64_t)triple_u32(now + 12345) << 32);
}

View File

@@ -852,6 +852,7 @@ static void _device_apply_controls(device_s *dev) {
CONTROL_AUTO_CID (V4L2_CID_AUTO_WHITE_BALANCE, V4L2_CID_WHITE_BALANCE_TEMPERATURE, white_balance);
CONTROL_AUTO_CID (V4L2_CID_AUTOGAIN, V4L2_CID_GAIN, gain);
CONTROL_MANUAL_CID ( V4L2_CID_COLORFX, color_effect);
CONTROL_MANUAL_CID ( V4L2_CID_ROTATE, rotate);
CONTROL_MANUAL_CID ( V4L2_CID_VFLIP, flip_vertical);
CONTROL_MANUAL_CID ( V4L2_CID_HFLIP, flip_horizontal);

View File

@@ -122,6 +122,7 @@ typedef struct {
control_s white_balance;
control_s gain;
control_s color_effect;
control_s rotate;
control_s flip_vertical;
control_s flip_horizontal;
} controls_s;

View File

@@ -342,6 +342,7 @@ static void _http_callback_state(struct evhttp_request *request, void *v_server)
encoder_type_to_string(enc_type),
enc_quality
));
# ifdef WITH_OMX
if (STREAM(run->h264)) {
assert(evbuffer_add_printf(buf,
@@ -352,6 +353,32 @@ static void _http_callback_state(struct evhttp_request *request, void *v_server)
));
}
# endif
if (
STREAM(sink)
# ifdef WITH_OMX
|| STREAM(h264_sink)
# endif
) {
assert(evbuffer_add_printf(buf, " \"sinks\": {"));
if (STREAM(sink)) {
assert(evbuffer_add_printf(buf,
"\"jpeg\": {\"has_clients\": %s}",
bool_to_string(atomic_load(&STREAM(sink->has_clients)))
));
}
# ifdef WITH_OMX
if (STREAM(h264_sink)) {
assert(evbuffer_add_printf(buf,
"%s\"h264\": {\"has_clients\": %s}",
(STREAM(sink) ? ", " : ""),
bool_to_string(atomic_load(&STREAM(h264_sink->has_clients)))
));
}
# endif
assert(evbuffer_add_printf(buf, "},"));
}
assert(evbuffer_add_printf(buf,
" \"source\": {\"resolution\": {\"width\": %u, \"height\": %u},"
" \"online\": %s, \"desired_fps\": %u, \"captured_fps\": %u},"

View File

@@ -72,6 +72,7 @@ enum _OPT_VALUES {
_O_WHITE_BALANCE,
_O_GAIN,
_O_COLOR_EFFECT,
_O_ROTATE,
_O_FLIP_VERTICAL,
_O_FLIP_HORIZONTAL,
@@ -158,6 +159,7 @@ static const struct option _LONG_OPTS[] = {
{"white-balance", required_argument, NULL, _O_WHITE_BALANCE},
{"gain", required_argument, NULL, _O_GAIN},
{"color-effect", required_argument, NULL, _O_COLOR_EFFECT},
{"rotate", required_argument, NULL, _O_ROTATE},
{"flip-vertical", required_argument, NULL, _O_FLIP_VERTICAL},
{"flip-horizontal", required_argument, NULL, _O_FLIP_HORIZONTAL},
@@ -394,6 +396,7 @@ int options_parse(options_s *options, device_s *dev, encoder_s *enc, stream_s *s
OPT_CTL_DEFAULT_NOBREAK(white_balance);
OPT_CTL_DEFAULT_NOBREAK(gain);
OPT_CTL_DEFAULT_NOBREAK(color_effect);
OPT_CTL_DEFAULT_NOBREAK(rotate);
OPT_CTL_DEFAULT_NOBREAK(flip_vertical);
OPT_CTL_DEFAULT_NOBREAK(flip_horizontal);
break;
@@ -407,6 +410,7 @@ int options_parse(options_s *options, device_s *dev, encoder_s *enc, stream_s *s
case _O_WHITE_BALANCE: OPT_CTL_AUTO(white_balance);
case _O_GAIN: OPT_CTL_AUTO(gain);
case _O_COLOR_EFFECT: OPT_CTL_MANUAL(color_effect);
case _O_ROTATE: OPT_CTL_MANUAL(rotate);
case _O_FLIP_VERTICAL: OPT_CTL_MANUAL(flip_vertical);
case _O_FLIP_HORIZONTAL: OPT_CTL_MANUAL(flip_horizontal);
@@ -637,6 +641,7 @@ static void _help(FILE *fp, device_s *dev, encoder_s *enc, stream_s *stream, ser
SAY(" --white-balance <N|auto|default> ───── Set white balance. Default: no change.\n");
SAY(" --gain <N|auto|default> ────────────── Set gain. Default: no change.\n");
SAY(" --color-effect <N|default> ─────────── Set color effect. Default: no change.\n");
SAY(" --rotate <N|default> ───────────────── Set rotation. Default: no change.\n");
SAY(" --flip-vertical <1|0|default> ──────── Set vertical flip. Default: no change.\n");
SAY(" --flip-horizontal <1|0|default> ────── Set horizontal flip. Default: no change.\n");
SAY(" Hint: use v4l2-ctl --list-ctrls-menus to query available controls of the device.\n");

View File

@@ -136,9 +136,9 @@ void stream_loop(stream_s *stream) {
&& !atomic_load(&RUN(stop))
&& !atomic_load(&RUN(video->has_clients))
// has_clients синков НЕ обновляются в реальном времени
&& (stream->sink == NULL || !stream->sink->has_clients)
&& (stream->sink == NULL || !atomic_load(&stream->sink->has_clients))
# ifdef WITH_OMX
&& (RUN(h264) == NULL || /*RUN(h264->sink) == NULL ||*/ !RUN(h264->sink->has_clients))
&& (RUN(h264) == NULL || /*RUN(h264->sink) == NULL ||*/ !atomic_load(&RUN(h264->sink->has_clients)))
# endif
) {
usleep(100000);