Compare commits

...

4 Commits
v5.4 ... v5.6

Author SHA1 Message Date
Maxim Devaev
1cff2545b1 Bump version: 5.5 → 5.6 2022-06-02 02:18:13 +03:00
Maxim Devaev
3d994d6e67 fixed deps 2022-06-02 02:15:50 +03:00
Maxim Devaev
7175f8d569 Bump version: 5.4 → 5.5 2022-06-02 01:47:54 +03:00
Maxim Devaev
d4a9862a18 webrtc audio 2022-06-01 07:57:34 +03:00
22 changed files with 1081 additions and 319 deletions

View File

@@ -1,7 +1,7 @@
[bumpversion]
commit = True
tag = True
current_version = 5.4
current_version = 5.6
parse = (?P<major>\d+)\.(?P<minor>\d+)
serialize =
{major}.{minor}

View File

@@ -10,7 +10,7 @@ LDFLAGS ?=
_PLUGIN = libjanus_ustreamer.so
_CFLAGS = -fPIC -MD -c -std=c11 -Wall -Wextra -D_GNU_SOURCE $(shell pkg-config --cflags glib-2.0) $(CFLAGS)
_LDFLAGS = -shared -lm -pthread -lrt -ljansson $(shell pkg-config --libs glib-2.0) $(LDFLAGS)
_LDFLAGS = -shared -lm -pthread -lrt -ljansson -lopus -lasound $(shell pkg-config --libs glib-2.0) $(LDFLAGS)
_SRCS = $(shell ls src/*.c)

249
janus/src/audio.c Normal file
View File

@@ -0,0 +1,249 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
# #
# This source file is partially based on this code: #
# - https://github.com/catid/kvm/blob/master/kvm_pipeline/src #
# #
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#include "audio.h"
#define JLOG_PERROR_ALSA(_err, _prefix, _msg, ...) JLOG_ERROR(_prefix, _msg ": %s", ##__VA_ARGS__, snd_strerror(_err))
#define JLOG_PERROR_OPUS(_err, _prefix, _msg, ...) JLOG_ERROR(_prefix, _msg ": %s", ##__VA_ARGS__, opus_strerror(_err))
// https://github.com/xiph/opus/blob/7b05f44/src/opus_demo.c#L368
#define PCM_BITRATE 48000
#define PCM_FRAMES (6 * PCM_BITRATE / 50) // 120ms
#define PCM_DATA_SIZE (PCM_FRAMES * 2)
#define RAW_DATA_SIZE (PCM_DATA_SIZE * sizeof(opus_int16))
typedef struct {
opus_int16 data[PCM_DATA_SIZE];
} _pcm_buf_s;
typedef struct {
uint8_t data[RAW_DATA_SIZE]; // Worst
size_t used;
uint64_t pts;
} _enc_buf_s;
static void *_pcm_thread(void *v_audio);
static void *_encoder_thread(void *v_audio);
audio_s *audio_init(const char *name) {
audio_s *audio;
A_CALLOC(audio, 1);
audio->pcm_queue = queue_init(8);
audio->enc_queue = queue_init(8);
atomic_init(&audio->run, true);
{
int err;
if ((err = snd_pcm_open(&audio->pcm, name, SND_PCM_STREAM_CAPTURE, 0)) < 0) {
audio->pcm = NULL;
JLOG_PERROR_ALSA(err, "audio", "Can't open PCM capture");
goto error;
}
assert(!snd_pcm_hw_params_malloc(&audio->pcm_params));
# define SET_PARAM(_msg, _func, ...) { \
if ((err = _func(audio->pcm, audio->pcm_params, ##__VA_ARGS__)) < 0) { \
JLOG_PERROR_ALSA(err, "audio", _msg); \
goto error; \
} \
}
SET_PARAM("Can't initialize PCM params", snd_pcm_hw_params_any);
SET_PARAM("Can't set PCM access type", snd_pcm_hw_params_set_access, SND_PCM_ACCESS_RW_INTERLEAVED);
SET_PARAM("Can't set PCM channels numbre", snd_pcm_hw_params_set_channels, 2);
SET_PARAM("Can't set PCM sampling format", snd_pcm_hw_params_set_format, SND_PCM_FORMAT_S16_LE);
unsigned pcm_bitrate = PCM_BITRATE;
SET_PARAM("Can't set PCM sampling rate", snd_pcm_hw_params_set_rate_near, &pcm_bitrate, 0);
if (pcm_bitrate != PCM_BITRATE) {
JLOG_ERROR("audio", "PCM bitrate mismatch: %u, should be %u", pcm_bitrate, PCM_BITRATE);
goto error;
}
SET_PARAM("Can't apply PCM params", snd_pcm_hw_params);
# undef SET_PARAM
}
{
int err;
// OPUS_APPLICATION_VOIP
// OPUS_APPLICATION_RESTRICTED_LOWDELAY
audio->enc = opus_encoder_create(PCM_BITRATE, 2, OPUS_APPLICATION_AUDIO, &err);
if (err < 0) {
audio->enc = NULL;
JLOG_PERROR_OPUS(err, "audio", "Can't create OPUS encoder");
goto error;
}
# define SET_PARAM(_msg, _ctl) { \
if ((err = opus_encoder_ctl(audio->enc, _ctl)) < 0) { \
JLOG_PERROR_OPUS(err, "audio", _msg); \
goto error; \
} \
}
SET_PARAM("Can't set OPUS bitrate", OPUS_SET_BITRATE(48000));
SET_PARAM("Can't set OPUS max bandwidth", OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_FULLBAND));
SET_PARAM("Can't set OPUS signal type", OPUS_SET_SIGNAL(OPUS_SIGNAL_MUSIC));
// Also see rtpa.c
// SET_PARAM("Can't set OPUS FEC", OPUS_SET_INBAND_FEC(1));
// SET_PARAM("Can't set OPUS exploss", OPUS_SET_PACKET_LOSS_PERC(10));
# undef SET_PARAM
}
JLOG_INFO("audio", "PCM & OPUS prepared; capturing ...");
audio->tids_created = true;
A_THREAD_CREATE(&audio->enc_tid, _encoder_thread, audio);
A_THREAD_CREATE(&audio->pcm_tid, _pcm_thread, audio);
return audio;
error:
audio_destroy(audio);
return NULL;
}
void audio_destroy(audio_s *audio) {
if (audio->tids_created) {
atomic_store(&audio->run, false);
A_THREAD_JOIN(audio->pcm_tid);
A_THREAD_JOIN(audio->enc_tid);
}
if (audio->enc) {
opus_encoder_destroy(audio->enc);
}
if (audio->pcm) {
snd_pcm_close(audio->pcm);
}
if (audio->pcm_params) {
snd_pcm_hw_params_free(audio->pcm_params);
}
# define FREE_QUEUE(_suffix) { \
while (!queue_get_free(audio->_suffix##_queue)) { \
_##_suffix##_buf_s *ptr; \
assert(!queue_get(audio->_suffix##_queue, (void **)&ptr, 1)); \
free(ptr); \
} \
queue_destroy(audio->_suffix##_queue); \
}
FREE_QUEUE(enc);
FREE_QUEUE(pcm);
# undef FREE_QUEUE
if (audio->tids_created) {
JLOG_INFO("audio", "Pipeline closed");
}
free(audio);
}
int audio_copy_encoded(audio_s *audio, uint8_t *data, size_t *size, uint64_t *pts) {
if (!atomic_load(&audio->run)) {
return -1;
}
_enc_buf_s *in;
if (!queue_get(audio->enc_queue, (void **)&in, 1)) {
if (*size < in->used) {
free(in);
return -3;
}
memcpy(data, in->data, in->used);
*size = in->used;
*pts = in->pts;
free(in);
return 0;
}
return -2;
}
static void *_pcm_thread(void *v_audio) {
A_THREAD_RENAME("us_a_pcm");
audio_s *audio = (audio_s *)v_audio;
while (atomic_load(&audio->run)) {
uint8_t in[RAW_DATA_SIZE];
int frames = snd_pcm_readi(audio->pcm, in, PCM_FRAMES);
if (frames < 0) {
JLOG_PERROR_ALSA(frames, "audio", "Can't capture PCM frames; breaking audio ...");
break;
} else if (frames < PCM_FRAMES) {
JLOG_ERROR("audio", "Too few PCM frames captured; breaking audio ...");
break;
}
if (queue_get_free(audio->pcm_queue)) {
_pcm_buf_s *out;
A_CALLOC(out, 1);
/*for (unsigned index = 0; index < RAW_DATA_SIZE; ++index) {
out->data[index] = (opus_int16)in[index * 2 + 1] << 8 | in[index * 2];
}*/
memcpy(out->data, in, RAW_DATA_SIZE);
assert(!queue_put(audio->pcm_queue, out, 1));
} else {
JLOG_ERROR("audio", "PCM queue is full");
}
}
atomic_store(&audio->run, false);
return NULL;
}
static void *_encoder_thread(void *v_audio) {
A_THREAD_RENAME("us_a_enc");
audio_s *audio = (audio_s *)v_audio;
while (atomic_load(&audio->run)) {
_pcm_buf_s *in;
if (!queue_get(audio->pcm_queue, (void **)&in, 1)) {
_enc_buf_s *out;
A_CALLOC(out, 1);
int size = opus_encode(audio->enc, in->data, PCM_FRAMES, out->data, RAW_DATA_SIZE);
free(in);
if (size < 0) {
JLOG_PERROR_OPUS(size, "audio", "Can't encode PCM frame to OPUS; breaking audio ...");
free(out);
break;
}
out->used = size;
out->pts = audio->pts;
audio->pts += PCM_FRAMES;
if (queue_get_free(audio->enc_queue)) {
assert(!queue_put(audio->enc_queue, out, 1));
} else {
JLOG_ERROR("audio", "OPUS encoder queue is full");
free(out);
}
}
}
atomic_store(&audio->run, false);
return NULL;
}

65
janus/src/audio.h Normal file
View File

@@ -0,0 +1,65 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
# #
# This source file is partially based on this code: #
# - https://github.com/catid/kvm/blob/master/kvm_pipeline/src #
# #
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#pragma once
#include <stdlib.h>
#include <stdint.h>
#include <stdbool.h>
#include <stdatomic.h>
#include <assert.h>
#include <sys/types.h>
#include <pthread.h>
#include <alsa/asoundlib.h>
#include <opus/opus.h>
#include "tools.h"
#include "jlogging.h"
#include "threading.h"
#include "queue.h"
typedef struct {
snd_pcm_t *pcm;
snd_pcm_hw_params_t *pcm_params;
OpusEncoder *enc;
queue_s *pcm_queue;
queue_s *enc_queue;
uint32_t pts;
pthread_t pcm_tid;
pthread_t enc_tid;
bool tids_created;
atomic_bool run;
} audio_s;
audio_s *audio_init(const char *name);
void audio_destroy(audio_s *audio);
int audio_copy_encoded(audio_s *audio, uint8_t *data, size_t *size, uint64_t *pts);

36
janus/src/jlogging.h Normal file
View File

@@ -0,0 +1,36 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
# #
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#include <janus/plugins/plugin.h>
#include "tools.h"
#define JLOG_INFO(_prefix, _msg, ...) JANUS_LOG(LOG_INFO, "== ustreamer/%-9s -- " _msg "\n", _prefix, ##__VA_ARGS__)
#define JLOG_WARN(_prefix, _msg, ...) JANUS_LOG(LOG_WARN, "== ustreamer/%-9s -- " _msg "\n", _prefix, ##__VA_ARGS__)
#define JLOG_ERROR(_prefix, _msg, ...) JANUS_LOG(LOG_ERR, "== ustreamer/%-9s -- " _msg "\n", _prefix, ##__VA_ARGS__)
#define JLOG_PERROR(_prefix, _msg, ...) { \
char _perror_buf[1024] = {0}; \
char *_perror_ptr = errno_to_string(errno, _perror_buf, 1023); \
JANUS_LOG(LOG_ERR, "[ustreamer/%-9s] " _msg ": %s\n", _prefix, ##__VA_ARGS__, _perror_ptr); \
}

View File

@@ -24,6 +24,7 @@
#include <stdbool.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <inttypes.h>
#include <unistd.h>
#include <fcntl.h>
#include <assert.h>
@@ -39,18 +40,20 @@
#include "config.h"
#include "tools.h"
#include "jlogging.h"
#include "threading.h"
#include "list.h"
#include "memsinksh.h"
#include "rtp.h"
#include "audio.h"
#include "rtpv.h"
#include "rtpa.h"
static int _plugin_init(janus_callbacks *gw, const char *config_file_path);
static void _plugin_destroy(void);
static void _plugin_create_session(janus_plugin_session *session, int *error);
static void _plugin_destroy_session(janus_plugin_session *session, int *error);
static void _plugin_create_session(janus_plugin_session *session, int *err);
static void _plugin_destroy_session(janus_plugin_session *session, int *err);
static json_t *_plugin_query_session(janus_plugin_session *session);
static void _plugin_setup_media(janus_plugin_session *session);
@@ -59,14 +62,17 @@ static void _plugin_hangup_media(janus_plugin_session *session);
static struct janus_plugin_result *_plugin_handle_message(
janus_plugin_session *session, char *transaction, json_t *msg, json_t *jsep);
static int _plugin_get_api_compatibility(void);
static int _plugin_get_version(void);
static const char *_plugin_get_version_string(void);
static const char *_plugin_get_description(void);
static const char *_plugin_get_name(void);
static const char *_plugin_get_author(void);
static const char *_plugin_get_package(void);
static int _plugin_get_api_compatibility(void) { return JANUS_PLUGIN_API_VERSION; }
static int _plugin_get_version(void) { return VERSION_U; }
static const char *_plugin_get_version_string(void) { return VERSION; }
static const char *_plugin_get_description(void) { return "PiKVM uStreamer Janus plugin for H.264 video"; }
static const char *_plugin_get_name(void) { return "ustreamer"; }
static const char *_plugin_get_author(void) { return "Maxim Devaev <mdevaev@gmail.com>"; }
static const char *_plugin_get_package(void) { return "janus.plugin.ustreamer"; }
// Just a stub to avoid logging spam about the plugin's purpose.
static void _plugin_incoming_rtp(UNUSED janus_plugin_session *handle, UNUSED janus_plugin_rtp *packet) {}
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Woverride-init"
@@ -90,6 +96,8 @@ static janus_plugin _plugin = JANUS_PLUGIN_INIT(
.get_name = _plugin_get_name,
.get_author = _plugin_get_author,
.get_package = _plugin_get_package,
.incoming_rtp = _plugin_incoming_rtp,
);
#pragma GCC diagnostic pop
@@ -112,27 +120,24 @@ const long double _g_lock_timeout = 1;
const useconds_t _g_lock_polling = 1000;
const useconds_t _g_watchers_polling = 100000;
static char *_g_audio_dev = NULL;
static _client_s *_g_clients = NULL;
static janus_callbacks *_g_gw = NULL;
static rtp_s *_g_rtp = NULL;
static rtpv_s *_g_rtpv = NULL;
static rtpa_s *_g_rtpa = NULL;
static pthread_t _g_video_tid;
static atomic_bool _g_video_tid_created = ATOMIC_VAR_INIT(false);
static pthread_t _g_audio_tid;
static atomic_bool _g_audio_tid_created = ATOMIC_VAR_INIT(false);
static pthread_t _g_tid;
static pthread_mutex_t _g_lock = PTHREAD_MUTEX_INITIALIZER;
static atomic_bool _g_ready = ATOMIC_VAR_INIT(false);
static atomic_bool _g_stop = ATOMIC_VAR_INIT(false);
static atomic_bool _g_has_watchers = ATOMIC_VAR_INIT(false);
#define JLOG_INFO(_msg, ...) JANUS_LOG(LOG_INFO, "== %s -- " _msg "\n", _plugin_get_name(), ##__VA_ARGS__)
#define JLOG_WARN(_msg, ...) JANUS_LOG(LOG_WARN, "== %s -- " _msg "\n", _plugin_get_name(), ##__VA_ARGS__)
#define JLOG_ERROR(_msg, ...) JANUS_LOG(LOG_ERR, "== %s -- " _msg "\n", _plugin_get_name(), ##__VA_ARGS__)
#define JLOG_PERROR(_msg, ...) { \
char _perror_buf[1024] = {0}; \
char *_perror_ptr = errno_to_string(errno, _perror_buf, 1023); \
JANUS_LOG(LOG_ERR, "[%s] " _msg ": %s\n", _plugin_get_name(), ##__VA_ARGS__, _perror_ptr); \
}
#define LOCK A_MUTEX_LOCK(&_g_lock)
#define UNLOCK A_MUTEX_UNLOCK(&_g_lock)
#define READY atomic_load(&_g_ready)
@@ -140,20 +145,6 @@ static atomic_bool _g_has_watchers = ATOMIC_VAR_INIT(false);
#define HAS_WATCHERS atomic_load(&_g_has_watchers)
static void _relay_rtp_clients(const uint8_t *datagram, size_t size) {
janus_plugin_rtp packet = {
.video = true,
.buffer = (char *)datagram,
.length = size,
};
janus_plugin_rtp_extensions_reset(&packet.extensions);
LIST_ITERATE(_g_clients, client, {
if (client->transmit) {
_g_gw->relay_rtp(client->session, &packet);
}
});
}
static int _wait_frame(int fd, memsink_shared_s* mem, uint64_t last_id) {
long double deadline_ts = get_now_monotonic() + _g_wait_timeout;
long double now;
@@ -161,14 +152,14 @@ static int _wait_frame(int fd, memsink_shared_s* mem, uint64_t last_id) {
int retval = flock_timedwait_monotonic(fd, _g_lock_timeout);
now = get_now_monotonic();
if (retval < 0 && errno != EWOULDBLOCK) {
JLOG_PERROR("Can't lock memsink");
JLOG_PERROR("video", "Can't lock memsink");
return -1;
} else if (retval == 0) {
if (mem->magic == MEMSINK_MAGIC && mem->version == MEMSINK_VERSION && mem->id != last_id) {
return 0;
}
if (flock(fd, LOCK_UN) < 0) {
JLOG_PERROR("Can't unlock memsink");
JLOG_PERROR("video", "Can't unlock memsink");
return -1;
}
}
@@ -184,18 +175,32 @@ static int _get_frame(int fd, memsink_shared_s *mem, frame_s *frame, uint64_t *f
mem->last_client_ts = get_now_monotonic();
int retval = 0;
if (frame->format != V4L2_PIX_FMT_H264) {
JLOG_ERROR("Got non-H264 frame from memsink");
JLOG_ERROR("video", "Got non-H264 frame from memsink");
retval = -1;
}
if (flock(fd, LOCK_UN) < 0) {
JLOG_PERROR("Can't unlock memsink");
JLOG_PERROR("video", "Can't unlock memsink");
retval = -1;
}
return retval;
}
static void *_clients_thread(UNUSED void *arg) {
A_THREAD_RENAME("us_clients");
static void _relay_rtp_clients(const rtp_s *rtp) {
janus_plugin_rtp packet = {0};
packet.video = rtp->video;
packet.buffer = (char *)rtp->datagram;
packet.length = rtp->used;
janus_plugin_rtp_extensions_reset(&packet.extensions);
LIST_ITERATE(_g_clients, client, {
if (client->transmit) {
_g_gw->relay_rtp(client->session, &packet);
}
});
}
static void *_clients_video_thread(UNUSED void *arg) {
A_THREAD_RENAME("us_v_clients");
atomic_store(&_g_video_tid_created, true);
atomic_store(&_g_ready, true);
frame_s *frame = frame_init();
@@ -209,9 +214,7 @@ static void *_clients_thread(UNUSED void *arg) {
while (!STOP) {
if (!HAS_WATCHERS) {
IF_NOT_REPORTED(1, {
JLOG_INFO("No active watchers, memsink disconnected");
});
IF_NOT_REPORTED(1, { JLOG_INFO("video", "No active watchers, memsink disconnected"); });
usleep(_g_watchers_polling);
continue;
}
@@ -220,22 +223,18 @@ static void *_clients_thread(UNUSED void *arg) {
memsink_shared_s *mem = NULL;
if ((fd = shm_open(_g_memsink_obj, O_RDWR, 0)) <= 0) {
IF_NOT_REPORTED(2, {
JLOG_PERROR("Can't open memsink");
});
IF_NOT_REPORTED(2, { JLOG_PERROR("video", "Can't open memsink"); });
goto close_memsink;
}
if ((mem = memsink_shared_map(fd)) == NULL) {
IF_NOT_REPORTED(3, {
JLOG_PERROR("Can't map memsink");
});
IF_NOT_REPORTED(3, { JLOG_PERROR("video", "Can't map memsink"); });
goto close_memsink;
}
error_reported = 0;
JLOG_INFO("Memsink opened; reading frames ...");
JLOG_INFO("video", "Memsink opened; reading frames ...");
while (!STOP && HAS_WATCHERS) {
int result = _wait_frame(fd, mem, frame_id);
if (result == 0) {
@@ -243,7 +242,7 @@ static void *_clients_thread(UNUSED void *arg) {
goto close_memsink;
}
LOCK;
rtp_wrap_h264(_g_rtp, frame, _relay_rtp_clients);
rtpv_wrap(_g_rtpv, frame);
UNLOCK;
} else if (result == -1) {
goto close_memsink;
@@ -252,7 +251,7 @@ static void *_clients_thread(UNUSED void *arg) {
close_memsink:
if (mem != NULL) {
JLOG_INFO("Memsink closed");
JLOG_INFO("video", "Memsink closed");
memsink_shared_unmap(mem);
mem = NULL;
}
@@ -269,27 +268,75 @@ static void *_clients_thread(UNUSED void *arg) {
return NULL;
}
static void *_clients_audio_thread(UNUSED void *arg) {
A_THREAD_RENAME("us_a_clients");
atomic_store(&_g_audio_tid_created, true);
assert(_g_audio_dev);
while (!STOP) {
if (!HAS_WATCHERS) {
usleep(_g_watchers_polling);
continue;
}
audio_s *audio = NULL;
if ((audio = audio_init(_g_audio_dev)) == NULL) {
goto close_audio;
}
while (!STOP && HAS_WATCHERS) {
size_t size = RTP_DATAGRAM_SIZE - RTP_HEADER_SIZE;
uint8_t data[size];
uint64_t pts;
int result = audio_copy_encoded(audio, data, &size, &pts);
if (result == 0) {
LOCK;
rtpa_wrap(_g_rtpa, data, size, pts);
UNLOCK;
} else if (result == -1) {
goto close_audio;
}
}
close_audio:
if (audio != NULL) {
audio_destroy(audio);
}
sleep(1); // error_delay
}
return NULL;
}
static char *_get_config_value(janus_config *config, const char *section, const char *option) {
janus_config_category *section_obj = janus_config_get_create(config, NULL, janus_config_type_category, section);
janus_config_item *option_obj = janus_config_get(config, section_obj, janus_config_type_item, option);
if (option_obj == NULL || option_obj->value == NULL || option_obj->value[0] == '\0') {
return NULL;
}
return strdup(option_obj->value);
}
static int _read_config(const char *config_dir_path) {
char *config_file_path;
janus_config *config = NULL;
A_ASPRINTF(config_file_path, "%s/%s.jcfg", config_dir_path, _plugin_get_package());
JLOG_INFO("Reading config file '%s' ...", config_file_path);
JLOG_INFO("main", "Reading config file '%s' ...", config_file_path);
config = janus_config_parse(config_file_path);
if (config == NULL) {
JLOG_ERROR("Can't read config");
JLOG_ERROR("main", "Can't read config");
goto error;
}
janus_config_print(config);
janus_config_category *config_memsink = janus_config_get_create(config, NULL, janus_config_type_category, "memsink");
janus_config_item *config_memsink_obj = janus_config_get(config, config_memsink, janus_config_type_item, "object");
if (config_memsink_obj == NULL || config_memsink_obj->value == NULL || config_memsink_obj->value[0] == '\0') {
JLOG_ERROR("Missing config value: memsink.object");
if ((_g_memsink_obj = _get_config_value(config, "memsink", "object")) == NULL) {
JLOG_ERROR("main", "Missing config value: memsink.object");
goto error;
}
_g_memsink_obj = strdup(config_memsink_obj->value);
if ((_g_audio_dev = _get_config_value(config, "audio", "device")) != NULL) {
JLOG_INFO("main", "Enabled the experimental AUDIO feature");
}
int retval = 0;
goto ok;
@@ -310,23 +357,32 @@ static int _plugin_init(janus_callbacks *gw, const char *config_dir_path) {
// sysctl -w net.core.rmem_max=1000000
// sysctl -w net.core.wmem_max=1000000
JLOG_INFO("Initializing plugin ...");
JLOG_INFO("main", "Initializing plugin ...");
assert(!atomic_load(&_g_video_tid_created));
assert(!atomic_load(&_g_audio_tid_created));
assert(!READY);
assert(!STOP);
if (gw == NULL || config_dir_path == NULL || _read_config(config_dir_path) < 0) {
return -1;
}
_g_gw = gw;
_g_rtp = rtp_init();
A_THREAD_CREATE(&_g_tid, _clients_thread, NULL);
_g_rtpv = rtpv_init(_relay_rtp_clients);
if (_g_audio_dev) {
_g_rtpa = rtpa_init(_relay_rtp_clients);
A_THREAD_CREATE(&_g_audio_tid, _clients_audio_thread, NULL);
}
A_THREAD_CREATE(&_g_video_tid, _clients_video_thread, NULL);
return 0;
}
static void _plugin_destroy(void) {
JLOG_INFO("Destroying plugin ...");
JLOG_INFO("main", "Destroying plugin ...");
atomic_store(&_g_stop, true);
if (READY) {
A_THREAD_JOIN(_g_tid);
if (atomic_load(&_g_video_tid_created)) {
A_THREAD_JOIN(_g_video_tid);
}
if (atomic_load(&_g_audio_tid_created)) {
A_THREAD_JOIN(_g_audio_tid);
}
LIST_ITERATE(_g_clients, client, {
@@ -335,11 +391,21 @@ static void _plugin_destroy(void) {
});
_g_clients = NULL;
rtp_destroy(_g_rtp);
_g_rtp = NULL;
if (_g_rtpa) {
rtpa_destroy(_g_rtpa);
_g_rtpa = NULL;
}
rtpv_destroy(_g_rtpv);
_g_rtpv = NULL;
_g_gw = NULL;
if (_g_audio_dev) {
free(_g_audio_dev);
_g_audio_dev = NULL;
}
if (_g_memsink_obj) {
free(_g_memsink_obj);
_g_memsink_obj = NULL;
@@ -348,10 +414,10 @@ static void _plugin_destroy(void) {
#define IF_DISABLED(...) { if (!READY || STOP) { __VA_ARGS__ } }
static void _plugin_create_session(janus_plugin_session *session, int *error) {
IF_DISABLED({ *error = -1; return; });
static void _plugin_create_session(janus_plugin_session *session, int *err) {
IF_DISABLED({ *err = -1; return; });
LOCK;
JLOG_INFO("Creating session %p ...", session);
JLOG_INFO("main", "Creating session %p ...", session);
_client_s *client;
A_CALLOC(client, 1);
client->session = session;
@@ -361,14 +427,14 @@ static void _plugin_create_session(janus_plugin_session *session, int *error) {
UNLOCK;
}
static void _plugin_destroy_session(janus_plugin_session* session, int *error) {
IF_DISABLED({ *error = -1; return; });
static void _plugin_destroy_session(janus_plugin_session* session, int *err) {
IF_DISABLED({ *err = -1; return; });
LOCK;
bool found = false;
bool has_watchers = false;
LIST_ITERATE(_g_clients, client, {
if (client->session == session) {
JLOG_INFO("Removing session %p ...", session);
JLOG_INFO("main", "Removing session %p ...", session);
LIST_REMOVE(_g_clients, client);
free(client);
found = true;
@@ -377,8 +443,8 @@ static void _plugin_destroy_session(janus_plugin_session* session, int *error) {
}
});
if (!found) {
JLOG_WARN("No session %p", session);
*error = -2;
JLOG_WARN("main", "No session %p", session);
*err = -2;
}
atomic_store(&_g_has_watchers, has_watchers);
UNLOCK;
@@ -406,13 +472,13 @@ static void _set_transmit(janus_plugin_session *session, UNUSED const char *msg,
LIST_ITERATE(_g_clients, client, {
if (client->session == session) {
client->transmit = transmit;
//JLOG_INFO("%s session %p", msg, session);
// JLOG_INFO("main", "%s session %p", msg, session);
found = true;
}
has_watchers = (has_watchers || client->transmit);
});
if (!found) {
JLOG_WARN("No session %p", session);
JLOG_WARN("main", "No session %p", session);
}
atomic_store(&_g_has_watchers, has_watchers);
UNLOCK;
@@ -440,7 +506,7 @@ static struct janus_plugin_result *_plugin_handle_message(
}
# define PUSH_ERROR(_error, _reason) { \
/*JLOG_ERROR("Message error in session %p: %s", session, _reason);*/ \
/*JLOG_ERROR("main", "Message error in session %p: %s", session, _reason);*/ \
json_t *_event = json_object(); \
json_object_set_new(_event, "ustreamer", json_string("event")); \
json_object_set_new(_event, "error_code", json_integer(_error)); \
@@ -460,7 +526,7 @@ static struct janus_plugin_result *_plugin_handle_message(
PUSH_ERROR(400, "Request not a string");
goto ok_wait;
}
//JLOG_INFO("Message: %s", request_str);
// JLOG_INFO("main", "Message: %s", request_str);
# define PUSH_STATUS(_status, _jsep) { \
json_t *_event = json_object(); \
@@ -479,12 +545,25 @@ static struct janus_plugin_result *_plugin_handle_message(
PUSH_STATUS("stopped", NULL);
} else if (!strcmp(request_str, "watch")) {
char *sdp = rtp_make_sdp(_g_rtp);
if (sdp == NULL) {
PUSH_ERROR(503, "Haven't received SPS/PPS from memsink yet");
goto ok_wait;
char *sdp;
{
char *video_sdp = rtpv_make_sdp(_g_rtpv);
if (video_sdp == NULL) {
PUSH_ERROR(503, "Haven't received SPS/PPS from memsink yet");
goto ok_wait;
}
char *audio_sdp = (_g_rtpa ? rtpa_make_sdp(_g_rtpa) : strdup(""));
A_ASPRINTF(sdp,
"v=0" RN
"o=- %" PRIu64 " 1 IN IP4 0.0.0.0" RN
"s=PiKVM uStreamer" RN
"t=0 0" RN
"%s%s",
get_now_id() >> 1, audio_sdp, video_sdp
);
free(audio_sdp);
free(video_sdp);
}
//JLOG_INFO("SDP generated:\n%s", sdp);
json_t *offer_jsep = json_pack("{ssss}", "type", "offer", "sdp", sdp);
free(sdp);
PUSH_STATUS("started", offer_jsep);
@@ -501,22 +580,3 @@ static struct janus_plugin_result *_plugin_handle_message(
# undef PUSH_ERROR
# undef FREE_MSG_JSEP
}
static int _plugin_get_api_compatibility(void) { return JANUS_PLUGIN_API_VERSION; }
static int _plugin_get_version(void) { return VERSION_U; }
static const char *_plugin_get_version_string(void) { return VERSION; }
static const char *_plugin_get_description(void) { return "Pi-KVM uStreamer Janus plugin for H.264 video"; }
static const char *_plugin_get_name(void) { return "ustreamer"; }
static const char *_plugin_get_author(void) { return "Maxim Devaev <mdevaev@gmail.com>"; }
static const char *_plugin_get_package(void) { return "janus.plugin.ustreamer"; }
#undef STOP
#undef READY
#undef UNLOCK
#undef LOCK
#undef JLOG_PERROR
#undef JLOG_ERROR
#undef JLOG_WARN
#undef JLOG_INFO

91
janus/src/queue.c Normal file
View File

@@ -0,0 +1,91 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
# #
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#include "queue.h"
queue_s *queue_init(unsigned capacity) {
queue_s *queue;
A_CALLOC(queue, 1);
A_CALLOC(queue->items, capacity);
queue->capacity = capacity;
A_MUTEX_INIT(&queue->mutex);
pthread_condattr_t attrs;
assert(!pthread_condattr_init(&attrs));
assert(!pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC));
assert(!pthread_cond_init(&queue->full_cond, &attrs));
assert(!pthread_cond_init(&queue->empty_cond, &attrs));
assert(!pthread_condattr_destroy(&attrs));
return queue;
}
void queue_destroy(queue_s *queue) {
free(queue->items);
free(queue);
}
#define WAIT_LOOP(_var, _cond) { \
struct timespec ts; \
assert(!clock_gettime(CLOCK_MONOTONIC, &ts)); \
ts.tv_sec += timeout; \
while (_var) { \
int err = pthread_cond_timedwait(_cond, &queue->mutex, &ts); \
if (err == ETIMEDOUT) { \
return -1; \
} \
assert(!err); \
} \
}
int queue_put(queue_s *queue, void *item, unsigned timeout) {
A_MUTEX_LOCK(&queue->mutex);
WAIT_LOOP(queue->size == queue->capacity, &queue->full_cond);
queue->items[queue->in] = item;
++queue->size;
++queue->in;
queue->in %= queue->capacity;
A_MUTEX_UNLOCK(&queue->mutex);
assert(!pthread_cond_broadcast(&queue->empty_cond));
return 0;
}
int queue_get(queue_s *queue, void **item, unsigned timeout) {
A_MUTEX_LOCK(&queue->mutex);
WAIT_LOOP(queue->size == 0, &queue->empty_cond);
*item = queue->items[queue->out];
--queue->size;
++queue->out;
queue->out %= queue->capacity;
A_MUTEX_UNLOCK(&queue->mutex);
assert(!pthread_cond_broadcast(&queue->full_cond));
return 0;
}
#undef WAIT_LOOP
int queue_get_free(queue_s *queue) {
A_MUTEX_LOCK(&queue->mutex);
unsigned size = queue->size;
A_MUTEX_UNLOCK(&queue->mutex);
return queue->capacity - size;
}

54
janus/src/queue.h Normal file
View File

@@ -0,0 +1,54 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
# #
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#pragma once
#include <errno.h>
#include <time.h>
#include <pthread.h>
#include "tools.h"
#include "threading.h"
// Based on https://github.com/seifzadeh/c-pthread-queue/blob/master/queue.h
typedef struct {
void **items;
unsigned size;
unsigned capacity;
unsigned in;
unsigned out;
pthread_mutex_t mutex;
pthread_cond_t full_cond;
pthread_cond_t empty_cond;
} queue_s;
queue_s *queue_init(unsigned capacity);
void queue_destroy(queue_s *queue);
int queue_put(queue_s *queue, void *item, unsigned timeout);
int queue_get(queue_s *queue, void **item, unsigned timeout);
int queue_get_free(queue_s *queue);

View File

@@ -26,182 +26,25 @@
#include "rtp.h"
#define PAYLOAD 96 // Payload type
#define PRE 3 // Annex B prefix length
void _rtp_process_nalu(rtp_s *rtp, const uint8_t *data, size_t size, uint32_t pts, bool marked, rtp_callback_f callback);
static void _rtp_write_header(rtp_s *rtp, uint32_t pts, bool marked);
static ssize_t _find_annexb(const uint8_t *data, size_t size);
rtp_s *rtp_init(void) {
rtp_s *rtp_init(unsigned payload, bool video) {
rtp_s *rtp;
A_CALLOC(rtp, 1);
rtp->payload = payload;
rtp->video = video;
rtp->ssrc = triple_u32(get_now_monotonic_u64());
rtp->sps = frame_init();
rtp->pps = frame_init();
A_MUTEX_INIT(&rtp->mutex);
return rtp;
}
void rtp_destroy(rtp_s *rtp) {
A_MUTEX_DESTROY(&rtp->mutex);
frame_destroy(rtp->pps);
frame_destroy(rtp->sps);
free(rtp);
}
char *rtp_make_sdp(rtp_s *rtp) {
A_MUTEX_LOCK(&rtp->mutex);
if (rtp->sps->used == 0 || rtp->pps->used == 0) {
A_MUTEX_UNLOCK(&rtp->mutex);
return NULL;
}
char *sps = NULL;
char *pps = NULL;
base64_encode(rtp->sps->data, rtp->sps->used, &sps, NULL);
base64_encode(rtp->pps->data, rtp->pps->used, &pps, NULL);
A_MUTEX_UNLOCK(&rtp->mutex);
// https://tools.ietf.org/html/rfc6184
// https://github.com/meetecho/janus-gateway/issues/2443
char *sdp;
A_ASPRINTF(sdp,
"v=0" RN
"o=- %" PRIu64 " 1 IN IP4 127.0.0.1" RN
"s=Pi-KVM uStreamer" RN
"t=0 0" RN
"m=video 1 RTP/SAVPF %d" RN
"c=IN IP4 0.0.0.0" RN
"a=rtpmap:%d H264/90000" RN
"a=fmtp:%d profile-level-id=42E01F" RN
"a=fmtp:%d packetization-mode=1" RN
"a=fmtp:%d sprop-sps=%s" RN
"a=fmtp:%d sprop-pps=%s" RN
"a=rtcp-fb:%d nack" RN
"a=rtcp-fb:%d nack pli" RN
"a=rtcp-fb:%d goog-remb" RN
"a=sendonly" RN,
get_now_id() >> 1, PAYLOAD, PAYLOAD, PAYLOAD, PAYLOAD,
PAYLOAD, sps,
PAYLOAD, pps,
PAYLOAD, PAYLOAD, PAYLOAD
);
free(sps);
free(pps);
return sdp;
}
void rtp_wrap_h264(rtp_s *rtp, const frame_s *frame, rtp_callback_f callback) {
// There is a complicated logic here but everything works as it should:
// - https://github.com/pikvm/ustreamer/issues/115#issuecomment-893071775
assert(frame->format == V4L2_PIX_FMT_H264);
const uint32_t pts = get_now_monotonic_u64() * 9 / 100; // PTS units are in 90 kHz
ssize_t last_offset = -PRE;
while (true) { // Find and iterate by nalus
const size_t next_start = last_offset + PRE;
ssize_t offset = _find_annexb(frame->data + next_start, frame->used - next_start);
if (offset < 0) {
break;
}
offset += next_start;
if (last_offset >= 0) {
const uint8_t *data = frame->data + last_offset + PRE;
size_t size = offset - last_offset - PRE;
if (data[size - 1] == 0) { // Check for extra 00
--size;
}
_rtp_process_nalu(rtp, data, size, pts, false, callback);
}
last_offset = offset;
}
if (last_offset >= 0) {
const uint8_t *data = frame->data + last_offset + PRE;
size_t size = frame->used - last_offset - PRE;
_rtp_process_nalu(rtp, data, size, pts, true, callback);
}
}
void _rtp_process_nalu(rtp_s *rtp, const uint8_t *data, size_t size, uint32_t pts, bool marked, rtp_callback_f callback) {
const unsigned ref_idc = (data[0] >> 5) & 3;
const unsigned type = data[0] & 0x1F;
frame_s *ps = NULL;
switch (type) {
case 7: ps = rtp->sps; break;
case 8: ps = rtp->pps; break;
}
if (ps) {
A_MUTEX_LOCK(&rtp->mutex);
frame_set_data(ps, data, size);
A_MUTEX_UNLOCK(&rtp->mutex);
}
# define HEADER_SIZE 12
if (size + HEADER_SIZE <= RTP_DATAGRAM_SIZE) {
_rtp_write_header(rtp, pts, marked);
memcpy(rtp->datagram + HEADER_SIZE, data, size);
callback(rtp->datagram, size + HEADER_SIZE);
return;
}
const size_t fu_overhead = HEADER_SIZE + 2; // FU-A overhead
const uint8_t *src = data + 1;
ssize_t remaining = size - 1;
bool first = true;
while (remaining > 0) {
ssize_t frag_size = RTP_DATAGRAM_SIZE - fu_overhead;
const bool last = (remaining <= frag_size);
if (last) {
frag_size = remaining;
}
_rtp_write_header(rtp, pts, (marked && last));
rtp->datagram[HEADER_SIZE] = 28 | (ref_idc << 5);
uint8_t fu = type;
if (first) {
fu |= 0x80;
}
if (last) {
fu |= 0x40;
}
rtp->datagram[HEADER_SIZE + 1] = fu;
memcpy(rtp->datagram + fu_overhead, src, frag_size);
callback(rtp->datagram, fu_overhead + frag_size);
src += frag_size;
remaining -= frag_size;
first = false;
}
# undef HEADER_SIZE
}
static void _rtp_write_header(rtp_s *rtp, uint32_t pts, bool marked) {
void rtp_write_header(rtp_s *rtp, uint32_t pts, bool marked) {
uint32_t word0 = 0x80000000;
if (marked) {
word0 |= 1 << 23;
}
word0 |= (PAYLOAD & 0x7F) << 16;
word0 |= (rtp->payload & 0x7F) << 16;
word0 |= rtp->seq;
++rtp->seq;
@@ -211,18 +54,3 @@ static void _rtp_write_header(rtp_s *rtp, uint32_t pts, bool marked) {
WRITE_BE_U32(8, rtp->ssrc);
# undef WRITE_BE_U32
}
static ssize_t _find_annexb(const uint8_t *data, size_t size) {
// Parses buffer for 00 00 01 start codes
if (size >= PRE) {
for (size_t index = 0; index <= size - PRE; ++index) {
if (data[index] == 0 && data[index + 1] == 0 && data[index + 2] == 1) {
return index;
}
}
}
return -1;
}
#undef PRE
#undef PAYLOAD

View File

@@ -26,44 +26,33 @@
#pragma once
#include <stdlib.h>
#include <stdbool.h>
#include <stdint.h>
#include <inttypes.h>
#include <string.h>
#include <assert.h>
#include <stdbool.h>
#include <sys/types.h>
#include <linux/videodev2.h>
#include <pthread.h>
#include "tools.h"
#include "threading.h"
#include "frame.h"
#include "base64.h"
// https://stackoverflow.com/questions/47635545/why-webrtc-chose-rtp-max-packet-size-to-1200-bytes
#define RTP_DATAGRAM_SIZE 1200
#define RTP_DATAGRAM_SIZE 1200
#define RTP_HEADER_SIZE 12
typedef struct {
uint32_t ssrc;
uint16_t seq;
unsigned payload;
bool video;
uint32_t ssrc;
uint8_t datagram[RTP_DATAGRAM_SIZE];
frame_s *sps; // Actually not a frame, just a bytes storage
frame_s *pps;
pthread_mutex_t mutex;
uint16_t seq;
uint8_t datagram[RTP_DATAGRAM_SIZE];
size_t used;
} rtp_s;
typedef void (*rtp_callback_f)(const uint8_t *datagram, size_t size);
typedef void (*rtp_callback_f)(const rtp_s *rtp);
rtp_s *rtp_init(void);
rtp_s *rtp_init(unsigned payload, bool video);
void rtp_destroy(rtp_s *rtp);
char *rtp_make_sdp(rtp_s *rtp);
void rtp_wrap_h264(rtp_s *rtp, const frame_s *frame, rtp_callback_f callback);
void rtp_write_header(rtp_s *rtp, uint32_t pts, bool marked);

69
janus/src/rtpa.c Normal file
View File

@@ -0,0 +1,69 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
# #
# This source file is partially based on this code: #
# - https://github.com/catid/kvm/blob/master/kvm_pipeline/src #
# #
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#include "rtpa.h"
rtpa_s *rtpa_init(rtp_callback_f callback) {
rtpa_s *rtpa;
A_CALLOC(rtpa, 1);
rtpa->rtp = rtp_init(111, false);
rtpa->callback = callback;
return rtpa;
}
void rtpa_destroy(rtpa_s *rtpa) {
rtp_destroy(rtpa->rtp);
free(rtpa);
}
char *rtpa_make_sdp(rtpa_s *rtpa) {
# define PAYLOAD rtpa->rtp->payload
char *sdp;
A_ASPRINTF(sdp,
"m=audio 1 RTP/SAVPF %u" RN
"c=IN IP4 0.0.0.0" RN
"a=rtpmap:%u OPUS/48000/2" RN
// "a=fmtp:%u useinbandfec=1" RN
"a=rtcp-fb:%u nack" RN
"a=rtcp-fb:%u nack pli" RN
"a=rtcp-fb:%u goog-remb" RN
"a=ssrc:%" PRIu32 " cname:ustreamer" RN
"a=sendonly" RN,
PAYLOAD, PAYLOAD, PAYLOAD, PAYLOAD, PAYLOAD, // PAYLOAD,
rtpa->rtp->ssrc
);
# undef PAYLOAD
return sdp;
}
void rtpa_wrap(rtpa_s *rtpa, const uint8_t *data, size_t size, uint32_t pts) {
if (size + RTP_HEADER_SIZE <= RTP_DATAGRAM_SIZE) {
rtp_write_header(rtpa->rtp, pts, false);
memcpy(rtpa->rtp->datagram + RTP_HEADER_SIZE, data, size);
rtpa->rtp->used = size + RTP_HEADER_SIZE;
rtpa->callback(rtpa->rtp);
}
}

51
janus/src/rtpa.h Normal file
View File

@@ -0,0 +1,51 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
# #
# This source file is partially based on this code: #
# - https://github.com/catid/kvm/blob/master/kvm_pipeline/src #
# #
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#pragma once
#include <stdlib.h>
#include <stdint.h>
#include <stdbool.h>
#include <inttypes.h>
#include <sys/types.h>
#include "tools.h"
#include "threading.h"
#include "audio.h"
#include "rtp.h"
typedef struct {
rtp_s *rtp;
rtp_callback_f callback;
} rtpa_s;
rtpa_s *rtpa_init(rtp_callback_f callback);
void rtpa_destroy(rtpa_s *rtpa);
char *rtpa_make_sdp(rtpa_s *rtpa);
void rtpa_wrap(rtpa_s *rtpa, const uint8_t *data, size_t size, uint32_t pts);

211
janus/src/rtpv.c Normal file
View File

@@ -0,0 +1,211 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
# #
# This source file is partially based on this code: #
# - https://github.com/catid/kvm/blob/master/kvm_pipeline/src #
# #
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#include "rtpv.h"
void _rtpv_process_nalu(rtpv_s *rtpv, const uint8_t *data, size_t size, uint32_t pts, bool marked);
static ssize_t _find_annexb(const uint8_t *data, size_t size);
rtpv_s *rtpv_init(rtp_callback_f callback) {
rtpv_s *rtpv;
A_CALLOC(rtpv, 1);
rtpv->rtp = rtp_init(96, true);
rtpv->callback = callback;
rtpv->sps = frame_init();
rtpv->pps = frame_init();
A_MUTEX_INIT(&rtpv->mutex);
return rtpv;
}
void rtpv_destroy(rtpv_s *rtpv) {
A_MUTEX_DESTROY(&rtpv->mutex);
frame_destroy(rtpv->pps);
frame_destroy(rtpv->sps);
rtp_destroy(rtpv->rtp);
free(rtpv);
}
char *rtpv_make_sdp(rtpv_s *rtpv) {
A_MUTEX_LOCK(&rtpv->mutex);
if (rtpv->sps->used == 0 || rtpv->pps->used == 0) {
A_MUTEX_UNLOCK(&rtpv->mutex);
return NULL;
}
char *sps = NULL;
char *pps = NULL;
base64_encode(rtpv->sps->data, rtpv->sps->used, &sps, NULL);
base64_encode(rtpv->pps->data, rtpv->pps->used, &pps, NULL);
A_MUTEX_UNLOCK(&rtpv->mutex);
# define PAYLOAD rtpv->rtp->payload
// https://tools.ietf.org/html/rfc6184
// https://github.com/meetecho/janus-gateway/issues/2443
char *sdp;
A_ASPRINTF(sdp,
"m=video 1 RTP/SAVPF %u" RN
"c=IN IP4 0.0.0.0" RN
"a=rtpmap:%u H264/90000" RN
"a=fmtp:%u profile-level-id=42E01F" RN
"a=fmtp:%u packetization-mode=1" RN
"a=fmtp:%u sprop-sps=%s" RN
"a=fmtp:%u sprop-pps=%s" RN
"a=rtcp-fb:%u nack" RN
"a=rtcp-fb:%u nack pli" RN
"a=rtcp-fb:%u goog-remb" RN
"a=ssrc:%" PRIu32 " cname:ustreamer" RN
"a=sendonly" RN,
PAYLOAD, PAYLOAD, PAYLOAD, PAYLOAD,
PAYLOAD, sps,
PAYLOAD, pps,
PAYLOAD, PAYLOAD, PAYLOAD,
rtpv->rtp->ssrc
);
# undef PAYLOAD
free(sps);
free(pps);
return sdp;
}
#define PRE 3 // Annex B prefix length
void rtpv_wrap(rtpv_s *rtpv, const frame_s *frame) {
// There is a complicated logic here but everything works as it should:
// - https://github.com/pikvm/ustreamer/issues/115#issuecomment-893071775
assert(frame->format == V4L2_PIX_FMT_H264);
const uint32_t pts = get_now_monotonic_u64() * 9 / 100; // PTS units are in 90 kHz
ssize_t last_offset = -PRE;
while (true) { // Find and iterate by nalus
const size_t next_start = last_offset + PRE;
ssize_t offset = _find_annexb(frame->data + next_start, frame->used - next_start);
if (offset < 0) {
break;
}
offset += next_start;
if (last_offset >= 0) {
const uint8_t *data = frame->data + last_offset + PRE;
size_t size = offset - last_offset - PRE;
if (data[size - 1] == 0) { // Check for extra 00
--size;
}
_rtpv_process_nalu(rtpv, data, size, pts, false);
}
last_offset = offset;
}
if (last_offset >= 0) {
const uint8_t *data = frame->data + last_offset + PRE;
size_t size = frame->used - last_offset - PRE;
_rtpv_process_nalu(rtpv, data, size, pts, true);
}
}
void _rtpv_process_nalu(rtpv_s *rtpv, const uint8_t *data, size_t size, uint32_t pts, bool marked) {
const unsigned ref_idc = (data[0] >> 5) & 3;
const unsigned type = data[0] & 0x1F;
frame_s *ps = NULL;
switch (type) {
case 7: ps = rtpv->sps; break;
case 8: ps = rtpv->pps; break;
}
if (ps) {
A_MUTEX_LOCK(&rtpv->mutex);
frame_set_data(ps, data, size);
A_MUTEX_UNLOCK(&rtpv->mutex);
}
# define DG rtpv->rtp->datagram
if (size + RTP_HEADER_SIZE <= RTP_DATAGRAM_SIZE) {
rtp_write_header(rtpv->rtp, pts, marked);
memcpy(DG + RTP_HEADER_SIZE, data, size);
rtpv->rtp->used = size + RTP_HEADER_SIZE;
rtpv->callback(rtpv->rtp);
return;
}
const size_t fu_overhead = RTP_HEADER_SIZE + 2; // FU-A overhead
const uint8_t *src = data + 1;
ssize_t remaining = size - 1;
bool first = true;
while (remaining > 0) {
ssize_t frag_size = RTP_DATAGRAM_SIZE - fu_overhead;
const bool last = (remaining <= frag_size);
if (last) {
frag_size = remaining;
}
rtp_write_header(rtpv->rtp, pts, (marked && last));
DG[RTP_HEADER_SIZE] = 28 | (ref_idc << 5);
uint8_t fu = type;
if (first) {
fu |= 0x80;
}
if (last) {
fu |= 0x40;
}
DG[RTP_HEADER_SIZE + 1] = fu;
memcpy(DG + fu_overhead, src, frag_size);
rtpv->rtp->used = fu_overhead + frag_size;
rtpv->callback(rtpv->rtp);
src += frag_size;
remaining -= frag_size;
first = false;
}
# undef DG
}
static ssize_t _find_annexb(const uint8_t *data, size_t size) {
// Parses buffer for 00 00 01 start codes
if (size >= PRE) {
for (size_t index = 0; index <= size - PRE; ++index) {
if (data[index] == 0 && data[index + 1] == 0 && data[index + 2] == 1) {
return index;
}
}
}
return -1;
}
#undef PRE

59
janus/src/rtpv.h Normal file
View File

@@ -0,0 +1,59 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
# #
# This source file is partially based on this code: #
# - https://github.com/catid/kvm/blob/master/kvm_pipeline/src #
# #
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#pragma once
#include <stdlib.h>
#include <stdbool.h>
#include <stdint.h>
#include <inttypes.h>
#include <assert.h>
#include <sys/types.h>
#include <linux/videodev2.h>
#include <pthread.h>
#include "tools.h"
#include "threading.h"
#include "frame.h"
#include "base64.h"
#include "rtp.h"
typedef struct {
rtp_s *rtp;
rtp_callback_f callback;
frame_s *sps; // Actually not a frame, just a bytes storage
frame_s *pps;
pthread_mutex_t mutex;
} rtpv_s;
rtpv_s *rtpv_init(rtp_callback_f callback);
void rtpv_destroy(rtpv_s *rtpv);
char *rtpv_make_sdp(rtpv_s *rtpv);
void rtpv_wrap(rtpv_s *rtpv, const frame_s *frame);

View File

@@ -1,6 +1,6 @@
.\" Manpage for ustreamer-dump.
.\" Open an issue or pull request to https://github.com/pikvm/ustreamer to correct errors or typos
.TH USTREAMER-DUMP 1 "version 5.4" "January 2021"
.TH USTREAMER-DUMP 1 "version 5.6" "January 2021"
.SH NAME
ustreamer-dump \- Dump uStreamer's memory sink to file

View File

@@ -1,6 +1,6 @@
.\" Manpage for ustreamer.
.\" Open an issue or pull request to https://github.com/pikvm/ustreamer to correct errors or typos
.TH USTREAMER 1 "version 5.4" "November 2020"
.TH USTREAMER 1 "version 5.6" "November 2020"
.SH NAME
ustreamer \- stream MJPEG video from any V4L2 device to the network

View File

@@ -3,7 +3,7 @@
pkgname=ustreamer
pkgver=5.4
pkgver=5.6
pkgrel=1
pkgdesc="Lightweight and fast MJPEG-HTTP streamer"
url="https://github.com/pikvm/ustreamer"
@@ -22,8 +22,8 @@ if [ -e /usr/bin/python3 ]; then
makedepends+=(python-setuptools)
fi
if [ -e /usr/include/janus/plugins/plugin.h ];then
depends+=(janus-gateway-pikvm)
makedepends+=(janus-gateway-pikvm)
depends+=(janus-gateway-pikvm alsa-lib opus)
makedepends+=(janus-gateway-pikvm alsa-lib opus)
_options="$_options WITH_JANUS=1"
fi

View File

@@ -6,7 +6,7 @@
include $(TOPDIR)/rules.mk
PKG_NAME:=ustreamer
PKG_VERSION:=5.4
PKG_VERSION:=5.6
PKG_RELEASE:=1
PKG_MAINTAINER:=Maxim Devaev <mdevaev@gmail.com>

View File

@@ -8,7 +8,7 @@ from setuptools import setup
if __name__ == "__main__":
setup(
name="ustreamer",
version="5.4",
version="5.6",
description="uStreamer tools",
author="Maxim Devaev",
author_email="mdevaev@gmail.com",

View File

@@ -23,7 +23,7 @@
#pragma once
#define VERSION_MAJOR 5
#define VERSION_MINOR 4
#define VERSION_MINOR 6
#define MAKE_VERSION2(_major, _minor) #_major "." #_minor
#define MAKE_VERSION1(_major, _minor) MAKE_VERSION2(_major, _minor)

View File

@@ -67,7 +67,7 @@
#define A_COND_INIT(_cond) assert(!pthread_cond_init(_cond, NULL))
#define A_COND_DESTROY(_cond) assert(!pthread_cond_destroy(_cond))
#define A_COND_SIGNAL(...) assert(!pthread_cond_signal(__VA_ARGS__))
#define A_COND_WAIT_TRUE(_var, _cond, _mutex) { while(!_var) assert(!pthread_cond_wait(_cond, _mutex)); }
#define A_COND_WAIT_TRUE(_var, _cond, _mutex) { while(!(_var)) assert(!pthread_cond_wait(_cond, _mutex)); }
#ifdef WITH_PTHREAD_NP

View File

@@ -126,13 +126,13 @@ INLINE uint64_t get_now_monotonic_u64(void) {
return (uint64_t)(ts.tv_nsec / 1000) + (uint64_t)ts.tv_sec * 1000000;
}
#undef X_CLOCK_MONOTONIC
INLINE uint64_t get_now_id(void) {
uint64_t now = get_now_monotonic_u64();
return (uint64_t)triple_u32(now) | ((uint64_t)triple_u32(now + 12345) << 32);
}
#undef X_CLOCK_MONOTONIC
INLINE long double get_now_real(void) {
time_t sec;
long msec;