Compare commits

...

58 Commits
v6.10 ... v6.25

Author SHA1 Message Date
Maxim Devaev
b1e7c82131 Bump version: 6.24 → 6.25 2025-01-20 00:23:11 +02:00
Maxim Devaev
3d7685ac48 bunch of mic fixes 2025-01-20 00:21:36 +02:00
Maxim Devaev
37e79995fe Bump version: 6.23 → 6.24 2025-01-19 18:19:30 +02:00
Maxim Devaev
1ee096b17c mic support 2025-01-19 18:15:08 +02:00
Maxim Devaev
918688e91d refactoring 2025-01-18 18:32:41 +02:00
Maxim Devaev
a94ff667b0 refactoring, increased bitrate, reduced buffers 2025-01-18 17:16:55 +02:00
Maxim Devaev
10595a13e9 refactoring 2025-01-18 05:09:32 +02:00
Maxim Devaev
80ffc8b2bd Bump version: 6.22 → 6.23 2025-01-17 20:53:21 +02:00
Maxim Devaev
ba246d90c0 refactoring 2025-01-17 20:40:18 +02:00
Maxim Devaev
29c98e3908 Bump version: 6.21 → 6.22 2025-01-13 17:17:27 +02:00
Maxim Devaev
acc8cecbe4 lint fix 2025-01-13 17:15:55 +02:00
Maxim Devaev
8c31af2f03 janus: sendonly/sendrecv audio flag 2025-01-13 17:10:42 +02:00
Maxim Devaev
a727c9b7c5 Bump version: 6.20 → 6.21 2024-12-27 05:22:35 +02:00
Maxim Devaev
eabc8d8343 fixed bug with reversed logic of parent notification 2024-12-27 05:20:22 +02:00
Maxim Devaev
4e4ae21a83 Bump version: 6.19 → 6.20 2024-12-26 04:31:23 +02:00
Maxim Devaev
412a1775a6 hotfixed online flag 2024-12-26 04:29:15 +02:00
Maxim Devaev
c404c49c6d Bump version: 6.18 → 6.19 2024-12-26 04:09:49 +02:00
Maxim Devaev
481e359153 janus: reduces opus frame length to 20ms 2024-12-26 04:05:53 +02:00
Maxim Devaev
04114bba86 refactoring 2024-12-15 11:34:41 +02:00
Maxim Devaev
c848756d53 Bump version: 6.17 → 6.18 2024-11-29 22:26:02 +02:00
Maxim Devaev
2a8aaabe48 janus: Fixed return value of message handler + memory leak with transaction 2024-11-29 22:03:49 +02:00
Maxim Devaev
239db92a85 Issue #295: Fixed double json_decref() 2024-11-27 16:08:29 +02:00
Maxim Devaev
740e09c70d Bump version: 6.16 → 6.17 2024-11-07 12:38:32 +02:00
Maxim Devaev
e030479aae lint fixes 2024-11-07 12:36:16 +02:00
Maxim Devaev
4db730abd9 fixed missing argument 2024-11-07 12:24:05 +02:00
Frank Müller
79020143c7 scale the blank image for NO SIGNAL to the resolution in the options 2024-11-07 12:06:35 +02:00
Maxim Devaev
1f96925181 Bump version: 6.15 → 6.16 2024-09-11 01:09:17 +03:00
Maxim Devaev
74dc1dc146 Janus: Added sprop-stereo=1 2024-09-11 01:06:19 +03:00
Maxim Devaev
6f8e8205b3 Bump version: 6.14 → 6.15 2024-09-06 22:21:34 +03:00
Maxim Devaev
5f932d862b Small refactoring of #289 + manpage 2024-09-06 20:40:23 +03:00
zefir-o
590a73f9ec Add option to which allows to handle truncated frames. (#289)
Extension of c96559e4ac.
Some cheap Chinise cameras produces frames which are detected as 'broken'. However they
are later handled well.
Introduce an option which allows disable the check on demand.
2024-09-06 19:32:48 +03:00
Maxim Devaev
79bbafdc98 Bump version: 6.13 → 6.14 2024-09-04 18:56:32 +03:00
Maxim Devaev
fcecc12229 Revert "refactoring"
This reverts commit 3e228c1fb8.
2024-09-04 18:34:41 +03:00
Maxim Devaev
f79a663839 added pkgconf to deps 2024-09-04 18:31:48 +03:00
Maxim Devaev
3e228c1fb8 refactoring 2024-09-04 15:49:55 +03:00
Maxim Devaev
53ec87b416 Issue #264: Properly checking of pkg-config 2024-08-17 05:40:03 +03:00
Maxim Devaev
de8cb85605 Bump version: 6.12 → 6.13 2024-08-16 07:07:54 +03:00
Maxim Devaev
000be92a0b lint fix 2024-08-16 07:04:21 +03:00
Maxim Devaev
f2779f7b44 check for pkg-config 2024-08-16 06:38:52 +03:00
yuri@FreeBSD
dcddfddf56 Fix crash on FreeBSD due to incorrect thr_self system call invocation (#285)
The correct signature is:
int thr_self(long *id);

It was called as thr_self() which caused memory corruption.
2024-08-16 06:38:07 +03:00
Randolf Richardson 張文道
793f24c48e Update README.md (#275)
Minor spelling correction
2024-05-29 12:59:48 +03:00
Maxim Devaev
25d87d5fa8 Bump version: 6.11 → 6.12 2024-05-16 00:13:24 +03:00
Maxim Devaev
e8a7fb32ac lint fixes 2024-05-16 00:10:53 +03:00
Maxim Devaev
9d5eb8bacb fixed edid path 2024-05-16 00:01:03 +03:00
Maxim Devaev
353e58d7ca fix 2024-05-16 00:00:10 +03:00
Fabrice Fontaine
6c24c9ea61 src/libs/types.h: include sys/types.h (#273)
Include sys/types.h to avoid the following uclibc build failure since
version 5.52 and
2d6716aa47:

In file included from libs/base64.h:25,
                 from libs/base64.c:23:
libs/types.h:30:9: error: unknown type name 'ssize_t'
   30 | typedef ssize_t sz;
      |         ^~~~~~~

Fixes:
 - http://autobuild.buildroot.org/results/24498049d7beb4afaaf9f9a0c2fc0bcd26a3ee04

Signed-off-by: Fabrice Fontaine <fontaine.fabrice@gmail.com>
2024-05-15 20:56:49 +03:00
Maxim Devaev
dfeefe5a1c Bump version: 6.10 → 6.11 2024-04-05 19:31:57 +03:00
Maxim Devaev
aae090ab4e list: clean next pointer on append 2024-04-05 19:29:13 +03:00
Maxim Devaev
18038799f0 reworked pool logic 2024-04-05 19:21:42 +03:00
Maxim Devaev
fab4c47f17 list: clean prev/next pointers on remove 2024-04-05 17:48:26 +03:00
Maxim Devaev
c40b3ee225 refactoring 2024-04-04 23:25:06 +03:00
Maxim Devaev
fca69db680 us_workers_pool_wait() without side effect 2024-04-04 23:21:34 +03:00
Maxim Devaev
0d974a5faf refactoring 2024-04-04 19:37:03 +03:00
Maxim Devaev
1ed39790ba use JCS_EXT_BGR on libjpeg-turbo 2024-04-04 15:20:16 +03:00
Maxim Devaev
75a193f997 syntax fix 2024-04-04 03:58:45 +03:00
Maxim Devaev
65c652e624 encoder: removed cpu_forced logic 2024-04-04 03:44:20 +03:00
Maxim Devaev
ae2f270f50 refactoring 2024-04-04 02:36:28 +03:00
Maxim Devaev
0a639eabca deprecated noop jpeg encoder 2024-04-03 20:23:35 +03:00
49 changed files with 1153 additions and 886 deletions

View File

@@ -1,7 +1,7 @@
[bumpversion]
commit = True
tag = True
current_version = 6.10
current_version = 6.25
parse = (?P<major>\d+)\.(?P<minor>\d+)
serialize =
{major}.{minor}

View File

@@ -16,6 +16,12 @@ export
_LINTERS_IMAGE ?= ustreamer-linters
# =====
ifeq (__not_found__,$(shell which pkg-config 2>/dev/null || echo "__not_found__"))
$(error "No pkg-config found in $(PATH)")
endif
# =====
define optbool
$(filter $(shell echo $(1) | tr A-Z a-z), yes on 1)

View File

@@ -23,7 +23,7 @@
| Compatibility with mjpg-streamer's API | ✔ | :) |
Footnotes:
* ```1``` Long before µStreamer, I made a [patch](https://github.com/jacksonliam/mjpg-streamer/pull/164) to add DV-timings support to mjpg-streamer and to keep it from hanging up no device disconnection. Alas, the patch is far from perfect and I can't guarantee it will work every time - mjpg-streamer's source code is very complicated and its structure is hard to understand. With this in mind, along with needing multithreading and JPEG hardware acceleration in the future, I decided to make my own stream server from scratch instead of supporting legacy code.
* ```1``` Long before µStreamer, I made a [patch](https://github.com/jacksonliam/mjpg-streamer/pull/164) to add DV-timings support to mjpg-streamer and to keep it from hanging up on device disconnection. Alas, the patch is far from perfect and I can't guarantee it will work every time - mjpg-streamer's source code is very complicated and its structure is hard to understand. With this in mind, along with needing multithreading and JPEG hardware acceleration in the future, I decided to make my own stream server from scratch instead of supporting legacy code.
* ```2``` This feature allows to cut down outgoing traffic several-fold when streaming HDMI, but it increases CPU usage a little bit. The idea is that HDMI is a fully digital interface and each captured frame can be identical to the previous one byte-wise. There's no need to stream the same image over the net several times a second. With the `--drop-same-frames=20` option enabled, µStreamer will drop all the matching frames (with a limit of 20 in a row). Each new frame is matched with the previous one first by length, then using ```memcmp()```.

256
janus/src/acap.c Normal file
View File

@@ -0,0 +1,256 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
# #
# Copyright (C) 2018-2024 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#include "acap.h"
#include <stdlib.h>
#include <stdatomic.h>
#include <assert.h>
#include <pthread.h>
#include <alsa/asoundlib.h>
#include <speex/speex_resampler.h>
#include <opus/opus.h>
#include "uslibs/types.h"
#include "uslibs/errors.h"
#include "uslibs/tools.h"
#include "uslibs/array.h"
#include "uslibs/ring.h"
#include "uslibs/threading.h"
#include "rtp.h"
#include "au.h"
#include "logging.h"
static void *_pcm_thread(void *v_acap);
static void *_encoder_thread(void *v_acap);
bool us_acap_probe(const char *name) {
snd_pcm_t *dev;
int err;
US_JLOG_INFO("acap", "Probing PCM capture ...");
if ((err = snd_pcm_open(&dev, name, SND_PCM_STREAM_CAPTURE, 0)) < 0) {
US_JLOG_PERROR_ALSA(err, "acap", "Can't probe PCM capture");
return false;
}
snd_pcm_close(dev);
US_JLOG_INFO("acap", "PCM capture is available");
return true;
}
us_acap_s *us_acap_init(const char *name, uint pcm_hz) {
us_acap_s *acap;
US_CALLOC(acap, 1);
acap->pcm_hz = pcm_hz;
US_RING_INIT_WITH_ITEMS(acap->pcm_ring, 8, us_au_pcm_init);
US_RING_INIT_WITH_ITEMS(acap->enc_ring, 8, us_au_encoded_init);
atomic_init(&acap->stop, false);
int err;
{
if ((err = snd_pcm_open(&acap->dev, name, SND_PCM_STREAM_CAPTURE, 0)) < 0) {
acap->dev = NULL;
US_JLOG_PERROR_ALSA(err, "acap", "Can't open PCM capture");
goto error;
}
assert(!snd_pcm_hw_params_malloc(&acap->dev_params));
# define SET_PARAM(_msg, _func, ...) { \
if ((err = _func(acap->dev, acap->dev_params, ##__VA_ARGS__)) < 0) { \
US_JLOG_PERROR_ALSA(err, "acap", _msg); \
goto error; \
} \
}
SET_PARAM("Can't initialize PCM params", snd_pcm_hw_params_any);
SET_PARAM("Can't set PCM access type", snd_pcm_hw_params_set_access, SND_PCM_ACCESS_RW_INTERLEAVED);
SET_PARAM("Can't set PCM channels number", snd_pcm_hw_params_set_channels, US_RTP_OPUS_CH);
SET_PARAM("Can't set PCM sampling format", snd_pcm_hw_params_set_format, SND_PCM_FORMAT_S16_LE);
SET_PARAM("Can't set PCM sampling rate", snd_pcm_hw_params_set_rate_near, &acap->pcm_hz, 0);
if (acap->pcm_hz < US_AU_MIN_PCM_HZ || acap->pcm_hz > US_AU_MAX_PCM_HZ) {
US_JLOG_ERROR("acap", "Unsupported PCM freq: %u; should be: %u <= F <= %u",
acap->pcm_hz, US_AU_MIN_PCM_HZ, US_AU_MAX_PCM_HZ);
goto error;
}
acap->pcm_frames = US_AU_HZ_TO_FRAMES(acap->pcm_hz);
acap->pcm_size = US_AU_HZ_TO_BUF8(acap->pcm_hz);
SET_PARAM("Can't apply PCM params", snd_pcm_hw_params);
# undef SET_PARAM
}
if (acap->pcm_hz != US_RTP_OPUS_HZ) {
acap->res = speex_resampler_init(US_RTP_OPUS_CH, acap->pcm_hz, US_RTP_OPUS_HZ, SPEEX_RESAMPLER_QUALITY_DESKTOP, &err);
if (err < 0) {
acap->res = NULL;
US_JLOG_PERROR_RES(err, "acap", "Can't create resampler");
goto error;
}
}
{
// OPUS_APPLICATION_VOIP, OPUS_APPLICATION_RESTRICTED_LOWDELAY
acap->enc = opus_encoder_create(US_RTP_OPUS_HZ, US_RTP_OPUS_CH, OPUS_APPLICATION_AUDIO, &err);
assert(err == 0);
// https://github.com/meetecho/janus-gateway/blob/3cdd6ff/src/plugins/janus_audiobridge.c#L2272
// https://datatracker.ietf.org/doc/html/rfc7587#section-3.1.1
assert(!opus_encoder_ctl(acap->enc, OPUS_SET_BITRATE(128000)));
assert(!opus_encoder_ctl(acap->enc, OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_FULLBAND)));
assert(!opus_encoder_ctl(acap->enc, OPUS_SET_SIGNAL(OPUS_SIGNAL_MUSIC)));
// OPUS_SET_INBAND_FEC(1), OPUS_SET_PACKET_LOSS_PERC(10): see rtpa.c
}
US_JLOG_INFO("acap", "Capture configured on %uHz; capturing ...", acap->pcm_hz);
acap->tids_created = true;
US_THREAD_CREATE(acap->enc_tid, _encoder_thread, acap);
US_THREAD_CREATE(acap->pcm_tid, _pcm_thread, acap);
return acap;
error:
us_acap_destroy(acap);
return NULL;
}
void us_acap_destroy(us_acap_s *acap) {
if (acap->tids_created) {
atomic_store(&acap->stop, true);
US_THREAD_JOIN(acap->pcm_tid);
US_THREAD_JOIN(acap->enc_tid);
}
US_DELETE(acap->enc, opus_encoder_destroy);
US_DELETE(acap->res, speex_resampler_destroy);
US_DELETE(acap->dev, snd_pcm_close);
US_DELETE(acap->dev_params, snd_pcm_hw_params_free);
US_RING_DELETE_WITH_ITEMS(acap->enc_ring, us_au_encoded_destroy);
US_RING_DELETE_WITH_ITEMS(acap->pcm_ring, us_au_pcm_destroy);
if (acap->tids_created) {
US_JLOG_INFO("acap", "Capture closed");
}
free(acap);
}
int us_acap_get_encoded(us_acap_s *acap, u8 *data, uz *size, u64 *pts) {
if (atomic_load(&acap->stop)) {
return -1;
}
const int ri = us_ring_consumer_acquire(acap->enc_ring, 0.1);
if (ri < 0) {
return US_ERROR_NO_DATA;
}
const us_au_encoded_s *const buf = acap->enc_ring->items[ri];
if (buf->used == 0 || *size < buf->used) {
us_ring_consumer_release(acap->enc_ring, ri);
return US_ERROR_NO_DATA;
}
memcpy(data, buf->data, buf->used);
*size = buf->used;
*pts = buf->pts;
us_ring_consumer_release(acap->enc_ring, ri);
return 0;
}
static void *_pcm_thread(void *v_acap) {
US_THREAD_SETTLE("us_ac_pcm");
us_acap_s *const acap = v_acap;
u8 in[US_AU_MAX_BUF8];
while (!atomic_load(&acap->stop)) {
const int frames = snd_pcm_readi(acap->dev, in, acap->pcm_frames);
if (frames < 0) {
US_JLOG_PERROR_ALSA(frames, "acap", "Fatal: Can't capture PCM frames");
break;
} else if (frames < (int)acap->pcm_frames) {
US_JLOG_ERROR("acap", "Fatal: Too few PCM frames captured");
break;
}
const int ri = us_ring_producer_acquire(acap->pcm_ring, 0);
if (ri >= 0) {
us_au_pcm_s *const out = acap->pcm_ring->items[ri];
memcpy(out->data, in, acap->pcm_size);
us_ring_producer_release(acap->pcm_ring, ri);
} else {
US_JLOG_ERROR("acap", "PCM ring is full");
}
}
atomic_store(&acap->stop, true);
return NULL;
}
static void *_encoder_thread(void *v_acap) {
US_THREAD_SETTLE("us_ac_enc");
us_acap_s *const acap = v_acap;
s16 in_res[US_AU_MAX_BUF16];
while (!atomic_load(&acap->stop)) {
const int in_ri = us_ring_consumer_acquire(acap->pcm_ring, 0.1);
if (in_ri < 0) {
continue;
}
us_au_pcm_s *const in = acap->pcm_ring->items[in_ri];
s16 *in_ptr;
if (acap->res != NULL) {
assert(acap->pcm_hz != US_RTP_OPUS_HZ);
u32 in_count = acap->pcm_frames;
u32 out_count = US_AU_HZ_TO_FRAMES(US_RTP_OPUS_HZ);
speex_resampler_process_interleaved_int(acap->res, in->data, &in_count, in_res, &out_count);
in_ptr = in_res;
} else {
assert(acap->pcm_hz == US_RTP_OPUS_HZ);
in_ptr = in->data;
}
const int out_ri = us_ring_producer_acquire(acap->enc_ring, 0);
if (out_ri < 0) {
US_JLOG_ERROR("acap", "OPUS encoder queue is full");
us_ring_consumer_release(acap->pcm_ring, in_ri);
continue;
}
us_au_encoded_s *const out = acap->enc_ring->items[out_ri];
const int size = opus_encode(acap->enc, in_ptr, US_AU_HZ_TO_FRAMES(US_RTP_OPUS_HZ), out->data, US_ARRAY_LEN(out->data));
us_ring_consumer_release(acap->pcm_ring, in_ri);
if (size > 0) {
out->used = size;
out->pts = acap->pts;
// https://datatracker.ietf.org/doc/html/rfc7587#section-4.2
acap->pts += US_AU_HZ_TO_FRAMES(US_RTP_OPUS_HZ);
} else {
out->used = 0;
US_JLOG_PERROR_OPUS(size, "acap", "Fatal: Can't encode PCM frame to OPUS");
}
us_ring_producer_release(acap->enc_ring, out_ri);
}
atomic_store(&acap->stop, true);
return NULL;
}

View File

@@ -34,11 +34,11 @@
typedef struct {
snd_pcm_t *pcm;
snd_pcm_t *dev;
uint pcm_hz;
uint pcm_frames;
uz pcm_size;
snd_pcm_hw_params_t *pcm_params;
snd_pcm_hw_params_t *dev_params;
SpeexResamplerState *res;
OpusEncoder *enc;
@@ -50,12 +50,12 @@ typedef struct {
pthread_t enc_tid;
bool tids_created;
atomic_bool stop;
} us_audio_s;
} us_acap_s;
bool us_audio_probe(const char *name);
bool us_acap_probe(const char *name);
us_audio_s *us_audio_init(const char *name, uint pcm_hz);
void us_audio_destroy(us_audio_s *audio);
us_acap_s *us_acap_init(const char *name, uint pcm_hz);
void us_acap_destroy(us_acap_s *acap);
int us_audio_get_encoded(us_audio_s *audio, u8 *data, uz *size, u64 *pts);
int us_acap_get_encoded(us_acap_s *acap, u8 *data, uz *size, u64 *pts);

View File

@@ -20,33 +20,61 @@
*****************************************************************************/
#include "uri.h"
#include "au.h"
#include <event2/util.h>
#include <event2/http.h>
#include <event2/keyvalq_struct.h>
#include <stdlib.h>
#include "../../libs/types.h"
#include "uslibs/tools.h"
bool us_uri_get_true(struct evkeyvalq *params, const char *key) {
const char *value_str = evhttp_find_header(params, key);
if (value_str != NULL) {
if (
value_str[0] == '1'
|| !evutil_ascii_strcasecmp(value_str, "true")
|| !evutil_ascii_strcasecmp(value_str, "yes")
) {
return true;
us_au_pcm_s *us_au_pcm_init(void) {
us_au_pcm_s *pcm;
US_CALLOC(pcm, 1);
return pcm;
}
void us_au_pcm_destroy(us_au_pcm_s *pcm) {
free(pcm);
}
void us_au_pcm_mix(us_au_pcm_s *dest, us_au_pcm_s *src) {
const uz size = src->frames * US_RTP_OPUS_CH * 2; // 2 for 16 bit
if (src->frames == 0) {
return;
} else if (dest->frames == 0) {
memcpy(dest->data, src->data, size);
dest->frames = src->frames;
} else if (dest->frames == src->frames) {
// https://stackoverflow.com/questions/12089662
for (uz index = 0; index < size; ++index) {
int a = dest->data[index];
int b = src->data[index];
int m;
a += 32768;
b += 32768;
if ((a < 32768) && (b < 32768)) {
m = a * b / 32768;
} else {
m = 2 * (a + b) - (a * b) / 32768 - 65536;
}
if (m == 65536) {
m = 65535;
}
m -= 32768;
dest->data[index] = m;
}
}
return false;
}
char *us_uri_get_string(struct evkeyvalq *params, const char *key) {
const char *const value_str = evhttp_find_header(params, key);
if (value_str != NULL) {
return evhttp_encode_uri(value_str);
}
return NULL;
us_au_encoded_s *us_au_encoded_init(void) {
us_au_encoded_s *enc;
US_CALLOC(enc, 1);
return enc;
}
void us_au_encoded_destroy(us_au_encoded_s *enc) {
free(enc);
}

View File

@@ -22,10 +22,39 @@
#pragma once
#include <event2/keyvalq_struct.h>
#include "uslibs/types.h"
#include "../../libs/types.h"
#include "rtp.h"
// A number of frames per 1 channel:
// - https://github.com/xiph/opus/blob/7b05f44/src/opus_demo.c#L368
#define US_AU_FRAME_MS 20
// #define _HZ_TO_FRAMES(_hz) (6 * (_hz) / 50) // 120ms
#define US_AU_HZ_TO_FRAMES(_hz) ((_hz) / 50) // 20ms
#define US_AU_HZ_TO_BUF16(_hz) (US_AU_HZ_TO_FRAMES(_hz) * US_RTP_OPUS_CH) // ... * 2: One stereo frame = (16bit L) + (16bit R)
#define US_AU_HZ_TO_BUF8(_hz) (US_AU_HZ_TO_BUF16(_hz) * sizeof(s16))
#define US_AU_MIN_PCM_HZ 8000
#define US_AU_MAX_PCM_HZ 192000
#define US_AU_MAX_BUF16 US_AU_HZ_TO_BUF16(US_AU_MAX_PCM_HZ)
#define US_AU_MAX_BUF8 US_AU_HZ_TO_BUF8(US_AU_MAX_PCM_HZ)
bool us_uri_get_true(struct evkeyvalq *params, const char *key);
char *us_uri_get_string(struct evkeyvalq *params, const char *key);
typedef struct {
s16 data[US_AU_MAX_BUF16];
uz frames;
} us_au_pcm_s;
typedef struct {
u8 data[US_RTP_PAYLOAD_SIZE];
uz used;
u64 pts;
} us_au_encoded_s;
us_au_pcm_s *us_au_pcm_init(void);
void us_au_pcm_destroy(us_au_pcm_s *pcm);
void us_au_pcm_mix(us_au_pcm_s *a, us_au_pcm_s *b);
us_au_encoded_s *us_au_encoded_init(void);
void us_au_encoded_destroy(us_au_encoded_s *enc);

View File

@@ -1,294 +0,0 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
# #
# Copyright (C) 2018-2024 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#include "audio.h"
#include <stdlib.h>
#include <stdatomic.h>
#include <assert.h>
#include <pthread.h>
#include <alsa/asoundlib.h>
#include <speex/speex_resampler.h>
#include <opus/opus.h>
#include "uslibs/types.h"
#include "uslibs/errors.h"
#include "uslibs/tools.h"
#include "uslibs/array.h"
#include "uslibs/ring.h"
#include "uslibs/threading.h"
#include "logging.h"
#define _JLOG_PERROR_ALSA(_err, _prefix, _msg, ...) US_JLOG_ERROR(_prefix, _msg ": %s", ##__VA_ARGS__, snd_strerror(_err))
#define _JLOG_PERROR_RES(_err, _prefix, _msg, ...) US_JLOG_ERROR(_prefix, _msg ": %s", ##__VA_ARGS__, speex_resampler_strerror(_err))
#define _JLOG_PERROR_OPUS(_err, _prefix, _msg, ...) US_JLOG_ERROR(_prefix, _msg ": %s", ##__VA_ARGS__, opus_strerror(_err))
// A number of frames per 1 channel:
// - https://github.com/xiph/opus/blob/7b05f44/src/opus_demo.c#L368
#define _HZ_TO_FRAMES(_hz) (6 * (_hz) / 50) // 120ms
#define _HZ_TO_BUF16(_hz) (_HZ_TO_FRAMES(_hz) * 2) // One stereo frame = (16bit L) + (16bit R)
#define _HZ_TO_BUF8(_hz) (_HZ_TO_BUF16(_hz) * sizeof(s16))
#define _MIN_PCM_HZ 8000
#define _MAX_PCM_HZ 192000
#define _MAX_BUF16 _HZ_TO_BUF16(_MAX_PCM_HZ)
#define _MAX_BUF8 _HZ_TO_BUF8(_MAX_PCM_HZ)
#define _ENCODER_INPUT_HZ 48000
typedef struct {
s16 data[_MAX_BUF16];
} _pcm_buffer_s;
typedef struct {
u8 data[_MAX_BUF8]; // Worst case
uz used;
u64 pts;
} _enc_buffer_s;
static _pcm_buffer_s *_pcm_buffer_init(void);
static _enc_buffer_s *_enc_buffer_init(void);
static void *_pcm_thread(void *v_audio);
static void *_encoder_thread(void *v_audio);
bool us_audio_probe(const char *name) {
snd_pcm_t *pcm;
int err;
US_JLOG_INFO("audio", "Probing PCM capture ...");
if ((err = snd_pcm_open(&pcm, name, SND_PCM_STREAM_CAPTURE, 0)) < 0) {
_JLOG_PERROR_ALSA(err, "audio", "Can't probe PCM capture");
return false;
}
snd_pcm_close(pcm);
US_JLOG_INFO("audio", "PCM capture is available");
return true;
}
us_audio_s *us_audio_init(const char *name, uint pcm_hz) {
us_audio_s *audio;
US_CALLOC(audio, 1);
audio->pcm_hz = pcm_hz;
US_RING_INIT_WITH_ITEMS(audio->pcm_ring, 8, _pcm_buffer_init);
US_RING_INIT_WITH_ITEMS(audio->enc_ring, 8, _enc_buffer_init);
atomic_init(&audio->stop, false);
int err;
{
if ((err = snd_pcm_open(&audio->pcm, name, SND_PCM_STREAM_CAPTURE, 0)) < 0) {
audio->pcm = NULL;
_JLOG_PERROR_ALSA(err, "audio", "Can't open PCM capture");
goto error;
}
assert(!snd_pcm_hw_params_malloc(&audio->pcm_params));
# define SET_PARAM(_msg, _func, ...) { \
if ((err = _func(audio->pcm, audio->pcm_params, ##__VA_ARGS__)) < 0) { \
_JLOG_PERROR_ALSA(err, "audio", _msg); \
goto error; \
} \
}
SET_PARAM("Can't initialize PCM params", snd_pcm_hw_params_any);
SET_PARAM("Can't set PCM access type", snd_pcm_hw_params_set_access, SND_PCM_ACCESS_RW_INTERLEAVED);
SET_PARAM("Can't set PCM channels numbre", snd_pcm_hw_params_set_channels, 2);
SET_PARAM("Can't set PCM sampling format", snd_pcm_hw_params_set_format, SND_PCM_FORMAT_S16_LE);
SET_PARAM("Can't set PCM sampling rate", snd_pcm_hw_params_set_rate_near, &audio->pcm_hz, 0);
if (audio->pcm_hz < _MIN_PCM_HZ || audio->pcm_hz > _MAX_PCM_HZ) {
US_JLOG_ERROR("audio", "Unsupported PCM freq: %u; should be: %u <= F <= %u",
audio->pcm_hz, _MIN_PCM_HZ, _MAX_PCM_HZ);
goto error;
}
audio->pcm_frames = _HZ_TO_FRAMES(audio->pcm_hz);
audio->pcm_size = _HZ_TO_BUF8(audio->pcm_hz);
SET_PARAM("Can't apply PCM params", snd_pcm_hw_params);
# undef SET_PARAM
}
if (audio->pcm_hz != _ENCODER_INPUT_HZ) {
audio->res = speex_resampler_init(2, audio->pcm_hz, _ENCODER_INPUT_HZ, SPEEX_RESAMPLER_QUALITY_DESKTOP, &err);
if (err < 0) {
audio->res = NULL;
_JLOG_PERROR_RES(err, "audio", "Can't create resampler");
goto error;
}
}
{
// OPUS_APPLICATION_VOIP, OPUS_APPLICATION_RESTRICTED_LOWDELAY
audio->enc = opus_encoder_create(_ENCODER_INPUT_HZ, 2, OPUS_APPLICATION_AUDIO, &err);
assert(err == 0);
assert(!opus_encoder_ctl(audio->enc, OPUS_SET_BITRATE(48000)));
assert(!opus_encoder_ctl(audio->enc, OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_FULLBAND)));
assert(!opus_encoder_ctl(audio->enc, OPUS_SET_SIGNAL(OPUS_SIGNAL_MUSIC)));
// OPUS_SET_INBAND_FEC(1), OPUS_SET_PACKET_LOSS_PERC(10): see rtpa.c
}
US_JLOG_INFO("audio", "Pipeline configured on %uHz; capturing ...", audio->pcm_hz);
audio->tids_created = true;
US_THREAD_CREATE(audio->enc_tid, _encoder_thread, audio);
US_THREAD_CREATE(audio->pcm_tid, _pcm_thread, audio);
return audio;
error:
us_audio_destroy(audio);
return NULL;
}
void us_audio_destroy(us_audio_s *audio) {
if (audio->tids_created) {
atomic_store(&audio->stop, true);
US_THREAD_JOIN(audio->pcm_tid);
US_THREAD_JOIN(audio->enc_tid);
}
US_DELETE(audio->enc, opus_encoder_destroy);
US_DELETE(audio->res, speex_resampler_destroy);
US_DELETE(audio->pcm, snd_pcm_close);
US_DELETE(audio->pcm_params, snd_pcm_hw_params_free);
US_RING_DELETE_WITH_ITEMS(audio->enc_ring, free);
US_RING_DELETE_WITH_ITEMS(audio->pcm_ring, free);
if (audio->tids_created) {
US_JLOG_INFO("audio", "Pipeline closed");
}
free(audio);
}
int us_audio_get_encoded(us_audio_s *audio, u8 *data, uz *size, u64 *pts) {
if (atomic_load(&audio->stop)) {
return -1;
}
const int ri = us_ring_consumer_acquire(audio->enc_ring, 0.1);
if (ri < 0) {
return US_ERROR_NO_DATA;
}
const _enc_buffer_s *const buf = audio->enc_ring->items[ri];
if (*size < buf->used) {
us_ring_consumer_release(audio->enc_ring, ri);
return US_ERROR_NO_DATA;
}
memcpy(data, buf->data, buf->used);
*size = buf->used;
*pts = buf->pts;
us_ring_consumer_release(audio->enc_ring, ri);
return 0;
}
static _pcm_buffer_s *_pcm_buffer_init(void) {
_pcm_buffer_s *buf;
US_CALLOC(buf, 1);
return buf;
}
static _enc_buffer_s *_enc_buffer_init(void) {
_enc_buffer_s *buf;
US_CALLOC(buf, 1);
return buf;
}
static void *_pcm_thread(void *v_audio) {
US_THREAD_SETTLE("us_a_pcm");
us_audio_s *const audio = v_audio;
u8 in[_MAX_BUF8];
while (!atomic_load(&audio->stop)) {
const int frames = snd_pcm_readi(audio->pcm, in, audio->pcm_frames);
if (frames < 0) {
_JLOG_PERROR_ALSA(frames, "audio", "Fatal: Can't capture PCM frames");
break;
} else if (frames < (int)audio->pcm_frames) {
US_JLOG_ERROR("audio", "Fatal: Too few PCM frames captured");
break;
}
const int ri = us_ring_producer_acquire(audio->pcm_ring, 0);
if (ri >= 0) {
_pcm_buffer_s *const out = audio->pcm_ring->items[ri];
memcpy(out->data, in, audio->pcm_size);
us_ring_producer_release(audio->pcm_ring, ri);
} else {
US_JLOG_ERROR("audio", "PCM ring is full");
}
}
atomic_store(&audio->stop, true);
return NULL;
}
static void *_encoder_thread(void *v_audio) {
US_THREAD_SETTLE("us_a_enc");
us_audio_s *const audio = v_audio;
s16 in_res[_MAX_BUF16];
while (!atomic_load(&audio->stop)) {
const int in_ri = us_ring_consumer_acquire(audio->pcm_ring, 0.1);
if (in_ri < 0) {
continue;
}
_pcm_buffer_s *const in = audio->pcm_ring->items[in_ri];
s16 *in_ptr;
if (audio->res != NULL) {
assert(audio->pcm_hz != _ENCODER_INPUT_HZ);
u32 in_count = audio->pcm_frames;
u32 out_count = _HZ_TO_FRAMES(_ENCODER_INPUT_HZ);
speex_resampler_process_interleaved_int(audio->res, in->data, &in_count, in_res, &out_count);
in_ptr = in_res;
} else {
assert(audio->pcm_hz == _ENCODER_INPUT_HZ);
in_ptr = in->data;
}
const int out_ri = us_ring_producer_acquire(audio->enc_ring, 0);
if (out_ri < 0) {
US_JLOG_ERROR("audio", "OPUS encoder queue is full");
us_ring_consumer_release(audio->pcm_ring, in_ri);
continue;
}
_enc_buffer_s *const out = audio->enc_ring->items[out_ri];
const int size = opus_encode(audio->enc, in_ptr, _HZ_TO_FRAMES(_ENCODER_INPUT_HZ), out->data, US_ARRAY_LEN(out->data));
us_ring_consumer_release(audio->pcm_ring, in_ri);
if (size >= 0) {
out->used = size;
out->pts = audio->pts;
// https://datatracker.ietf.org/doc/html/rfc7587#section-4.2
audio->pts += _HZ_TO_FRAMES(_ENCODER_INPUT_HZ);
} else {
_JLOG_PERROR_OPUS(size, "audio", "Fatal: Can't encode PCM frame to OPUS");
}
us_ring_producer_release(audio->enc_ring, out_ri);
}
atomic_store(&audio->stop, true);
return NULL;
}

View File

@@ -25,23 +25,29 @@
#include <stdlib.h>
#include <stdatomic.h>
#include <string.h>
#include <assert.h>
#include <pthread.h>
#include <janus/plugins/plugin.h>
#include <janus/rtp.h>
#include <opus/opus.h>
#include "uslibs/types.h"
#include "uslibs/tools.h"
#include "uslibs/threading.h"
#include "uslibs/array.h"
#include "uslibs/list.h"
#include "uslibs/ring.h"
#include "logging.h"
#include "au.h"
#include "rtp.h"
static void *_video_thread(void *v_client);
static void *_audio_thread(void *v_client);
static void *_common_thread(void *v_client, bool video);
static void *_acap_thread(void *v_client);
static void *_video_or_acap_thread(void *v_client, bool video);
static void *_aplay_thread(void *v_client);
us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_session *session) {
@@ -50,7 +56,8 @@ us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_sessio
client->gw = gw;
client->session = session;
atomic_init(&client->transmit, false);
atomic_init(&client->transmit_audio, false);
atomic_init(&client->transmit_acap, false);
atomic_init(&client->transmit_aplay, false);
atomic_init(&client->video_orient, 0);
atomic_init(&client->stop, false);
@@ -58,8 +65,12 @@ us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_sessio
US_RING_INIT_WITH_ITEMS(client->video_ring, 2048, us_rtp_init);
US_THREAD_CREATE(client->video_tid, _video_thread, client);
US_RING_INIT_WITH_ITEMS(client->audio_ring, 64, us_rtp_init);
US_THREAD_CREATE(client->audio_tid, _audio_thread, client);
US_RING_INIT_WITH_ITEMS(client->acap_ring, 64, us_rtp_init);
US_THREAD_CREATE(client->acap_tid, _acap_thread, client);
US_RING_INIT_WITH_ITEMS(client->aplay_enc_ring, 64, us_au_encoded_init);
US_RING_INIT_WITH_ITEMS(client->aplay_pcm_ring, 64, us_au_pcm_init);
US_THREAD_CREATE(client->aplay_tid, _aplay_thread, client);
return client;
}
@@ -70,8 +81,12 @@ void us_janus_client_destroy(us_janus_client_s *client) {
US_THREAD_JOIN(client->video_tid);
US_RING_DELETE_WITH_ITEMS(client->video_ring, us_rtp_destroy);
US_THREAD_JOIN(client->audio_tid);
US_RING_DELETE_WITH_ITEMS(client->audio_ring, us_rtp_destroy);
US_THREAD_JOIN(client->acap_tid);
US_RING_DELETE_WITH_ITEMS(client->acap_ring, us_rtp_destroy);
US_THREAD_JOIN(client->aplay_tid);
US_RING_DELETE_WITH_ITEMS(client->aplay_enc_ring, us_au_encoded_destroy);
US_RING_DELETE_WITH_ITEMS(client->aplay_pcm_ring, us_au_pcm_destroy);
free(client);
}
@@ -79,13 +94,13 @@ void us_janus_client_destroy(us_janus_client_s *client) {
void us_janus_client_send(us_janus_client_s *client, const us_rtp_s *rtp) {
if (
atomic_load(&client->transmit)
&& (rtp->video || atomic_load(&client->transmit_audio))
&& (rtp->video || atomic_load(&client->transmit_acap))
) {
us_ring_s *const ring = (rtp->video ? client->video_ring : client->audio_ring);
us_ring_s *const ring = (rtp->video ? client->video_ring : client->acap_ring);
const int ri = us_ring_producer_acquire(ring, 0);
if (ri < 0) {
US_JLOG_ERROR("client", "Session %p %s ring is full",
client->session, (rtp->video ? "video" : "audio"));
client->session, (rtp->video ? "video" : "acap"));
return;
}
memcpy(ring->items[ri], rtp, sizeof(us_rtp_s));
@@ -93,20 +108,65 @@ void us_janus_client_send(us_janus_client_s *client, const us_rtp_s *rtp) {
}
}
void us_janus_client_recv(us_janus_client_s *client, janus_plugin_rtp *packet) {
if (
packet->video
|| packet->length < sizeof(janus_rtp_header)
|| !atomic_load(&client->transmit)
|| !atomic_load(&client->transmit_aplay)
) {
return;
}
const janus_rtp_header *const header = (janus_rtp_header*)packet->buffer;
if (header->type != US_RTP_OPUS_PAYLOAD) {
return;
}
const u16 seq = ntohs(header->seq_number);
if (
seq >= client->aplay_seq_next // In order or missing
|| (client->aplay_seq_next - seq) > 50 // In late sequence or sequence wrapped
) {
client->aplay_seq_next = seq + 1;
int size = 0;
const char *const data = janus_rtp_payload(packet->buffer, packet->length, &size);
if (data == NULL || size <= 0) {
return;
}
us_ring_s *const ring = client->aplay_enc_ring;
const int ri = us_ring_producer_acquire(ring, 0);
if (ri < 0) {
// US_JLOG_ERROR("client", "Session %p aplay ring is full", client->session);
return;
}
us_au_encoded_s *enc = ring->items[ri];
if ((uz)size < US_ARRAY_LEN(enc->data)) {
memcpy(enc->data, data, size);
enc->used = size;
} else {
enc->used = 0;
}
us_ring_producer_release(ring, ri);
}
}
static void *_video_thread(void *v_client) {
US_THREAD_SETTLE("us_c_video");
return _common_thread(v_client, true);
US_THREAD_SETTLE("us_cx_vid");
return _video_or_acap_thread(v_client, true);
}
static void *_audio_thread(void *v_client) {
US_THREAD_SETTLE("us_c_audio");
return _common_thread(v_client, false);
static void *_acap_thread(void *v_client) {
US_THREAD_SETTLE("us_cx_ac");
return _video_or_acap_thread(v_client, false);
}
static void *_common_thread(void *v_client, bool video) {
static void *_video_or_acap_thread(void *v_client, bool video) {
us_janus_client_s *const client = v_client;
us_ring_s *const ring = (video ? client->video_ring : client->audio_ring);
assert(ring != NULL); // Audio may be NULL
us_ring_s *const ring = (video ? client->video_ring : client->acap_ring);
assert(ring != NULL);
while (!atomic_load(&client->stop)) {
const int ri = us_ring_consumer_acquire(ring, 0.1);
@@ -119,7 +179,7 @@ static void *_common_thread(void *v_client, bool video) {
if (
atomic_load(&client->transmit)
&& (video || atomic_load(&client->transmit_audio))
&& (video || atomic_load(&client->transmit_acap))
) {
janus_plugin_rtp packet = {
.video = rtp.video,
@@ -156,3 +216,48 @@ static void *_common_thread(void *v_client, bool video) {
}
return NULL;
}
static void *_aplay_thread(void *v_client) {
US_THREAD_SETTLE("us_cx_ap");
us_janus_client_s *const client = v_client;
int err;
OpusDecoder *dec = opus_decoder_create(US_RTP_OPUS_HZ, US_RTP_OPUS_CH, &err);
assert(err == 0);
while (!atomic_load(&client->stop)) {
const int in_ri = us_ring_consumer_acquire(client->aplay_enc_ring, 0.1);
if (in_ri < 0) {
continue;
}
us_au_encoded_s *in = client->aplay_enc_ring->items[in_ri];
if (in->used == 0) {
us_ring_consumer_release(client->aplay_enc_ring, in_ri);
continue;
}
const int out_ri = us_ring_producer_acquire(client->aplay_pcm_ring, 0);
if (out_ri < 0) {
US_JLOG_ERROR("aplay", "OPUS decoder queue is full");
us_ring_consumer_release(client->aplay_enc_ring, in_ri);
continue;
}
us_au_pcm_s *out = client->aplay_pcm_ring->items[out_ri];
const int frames = opus_decode(dec, in->data, in->used, out->data, US_AU_HZ_TO_FRAMES(US_RTP_OPUS_HZ), 0);
us_ring_consumer_release(client->aplay_enc_ring, in_ri);
if (frames > 0) {
out->frames = frames;
} else {
out->frames = 0;
US_JLOG_PERROR_OPUS(frames, "aplay", "Fatal: Can't decode OPUS to PCM frame");
}
us_ring_producer_release(client->aplay_pcm_ring, out_ri);
}
opus_decoder_destroy(dec);
return NULL;
}

View File

@@ -38,15 +38,21 @@ typedef struct {
janus_callbacks *gw;
janus_plugin_session *session;
atomic_bool transmit;
atomic_bool transmit_audio;
atomic_bool transmit_acap;
atomic_bool transmit_aplay;
atomic_uint video_orient;
pthread_t video_tid;
pthread_t audio_tid;
pthread_t acap_tid;
pthread_t aplay_tid;
atomic_bool stop;
us_ring_s *video_ring;
us_ring_s *audio_ring;
us_ring_s *acap_ring;
us_ring_s *aplay_enc_ring;
u16 aplay_seq_next;
us_ring_s *aplay_pcm_ring;
US_LIST_DECLARE;
} us_janus_client_s;
@@ -56,3 +62,4 @@ us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_sessio
void us_janus_client_destroy(us_janus_client_s *client);
void us_janus_client_send(us_janus_client_s *client, const us_rtp_s *rtp);
void us_janus_client_recv(us_janus_client_s *client, janus_plugin_rtp *packet);

View File

@@ -55,18 +55,16 @@ us_config_s *us_config_init(const char *config_dir_path) {
}
janus_config_print(jcfg);
if (
(config->video_sink_name = _get_value(jcfg, "memsink", "object")) == NULL
&& (config->video_sink_name = _get_value(jcfg, "video", "sink")) == NULL
) {
US_JLOG_ERROR("config", "Missing config value: video.sink (ex. memsink.object)");
if ((config->video_sink_name = _get_value(jcfg, "video", "sink")) == NULL) {
US_JLOG_ERROR("config", "Missing config value: video.sink");
goto error;
}
if ((config->audio_dev_name = _get_value(jcfg, "audio", "device")) != NULL) {
if ((config->tc358743_dev_path = _get_value(jcfg, "audio", "tc358743")) == NULL) {
US_JLOG_INFO("config", "Missing config value: audio.tc358743");
if ((config->acap_dev_name = _get_value(jcfg, "acap", "device")) != NULL) {
if ((config->tc358743_dev_path = _get_value(jcfg, "acap", "tc358743")) == NULL) {
US_JLOG_INFO("config", "Missing config value: acap.tc358743");
goto error;
}
config->aplay_dev_name = _get_value(jcfg, "aplay", "device");
}
goto ok;
@@ -82,8 +80,9 @@ ok:
void us_config_destroy(us_config_s *config) {
US_DELETE(config->video_sink_name, free);
US_DELETE(config->audio_dev_name, free);
US_DELETE(config->acap_dev_name, free);
US_DELETE(config->tc358743_dev_path, free);
US_DELETE(config->aplay_dev_name, free);
free(config);
}

View File

@@ -26,8 +26,10 @@
typedef struct {
char *video_sink_name;
char *audio_dev_name;
char *acap_dev_name;
char *tc358743_dev_path;
char *aplay_dev_name;
} us_config_s;

View File

@@ -36,3 +36,8 @@
JANUS_LOG(LOG_ERR, "[%s/%-9s] " x_msg ": %s\n", US_PLUGIN_NAME, x_prefix, ##__VA_ARGS__, m_perror_str); \
free(m_perror_str); \
}
// We don't include alsa, speex and opus headers here
#define US_JLOG_PERROR_ALSA(_err, _prefix, _msg, ...) US_JLOG_ERROR(_prefix, _msg ": %s", ##__VA_ARGS__, snd_strerror(_err))
#define US_JLOG_PERROR_RES(_err, _prefix, _msg, ...) US_JLOG_ERROR(_prefix, _msg ": %s", ##__VA_ARGS__, speex_resampler_strerror(_err))
#define US_JLOG_PERROR_OPUS(_err, _prefix, _msg, ...) US_JLOG_ERROR(_prefix, _msg ": %s", ##__VA_ARGS__, opus_strerror(_err))

View File

@@ -25,6 +25,7 @@
#include <inttypes.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <sys/mman.h>
@@ -33,7 +34,9 @@
#include <pthread.h>
#include <jansson.h>
#include <janus/plugins/plugin.h>
#include <janus/rtp.h>
#include <janus/rtcp.h>
#include <alsa/asoundlib.h>
#include "uslibs/types.h"
#include "uslibs/const.h"
@@ -48,14 +51,14 @@
#include "const.h"
#include "logging.h"
#include "client.h"
#include "audio.h"
#include "au.h"
#include "acap.h"
#include "rtp.h"
#include "rtpv.h"
#include "rtpa.h"
#include "memsinkfd.h"
#include "config.h"
static us_config_s *_g_config = NULL;
static const useconds_t _g_watchers_polling = 100000;
@@ -63,37 +66,47 @@ static us_janus_client_s *_g_clients = NULL;
static janus_callbacks *_g_gw = NULL;
static us_ring_s *_g_video_ring = NULL;
static us_rtpv_s *_g_rtpv = NULL;
static us_rtpa_s *_g_rtpa = NULL;
static us_rtpa_s *_g_rtpa = NULL; // Also indicates "audio capture is available"
static pthread_t _g_video_rtp_tid;
static atomic_bool _g_video_rtp_tid_created = false;
static pthread_t _g_video_sink_tid;
static atomic_bool _g_video_sink_tid_created = false;
static pthread_t _g_audio_tid;
static atomic_bool _g_audio_tid_created = false;
static pthread_t _g_acap_tid;
static atomic_bool _g_acap_tid_created = false;
static pthread_t _g_aplay_tid;
static atomic_bool _g_aplay_tid_created = false;
static pthread_mutex_t _g_video_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _g_audio_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _g_acap_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t _g_aplay_lock = PTHREAD_MUTEX_INITIALIZER;
static atomic_bool _g_ready = false;
static atomic_bool _g_stop = false;
static atomic_bool _g_has_watchers = false;
static atomic_bool _g_has_listeners = false;
static atomic_bool _g_has_speakers = false;
static atomic_bool _g_key_required = false;
#define _LOCK_VIDEO US_MUTEX_LOCK(_g_video_lock)
#define _UNLOCK_VIDEO US_MUTEX_UNLOCK(_g_video_lock)
#define _LOCK_AUDIO US_MUTEX_LOCK(_g_audio_lock)
#define _UNLOCK_AUDIO US_MUTEX_UNLOCK(_g_audio_lock)
#define _LOCK_ACAP US_MUTEX_LOCK(_g_acap_lock)
#define _UNLOCK_ACAP US_MUTEX_UNLOCK(_g_acap_lock)
#define _LOCK_ALL { _LOCK_VIDEO; _LOCK_AUDIO; }
#define _UNLOCK_ALL { _UNLOCK_AUDIO; _UNLOCK_VIDEO; }
#define _LOCK_APLAY US_MUTEX_LOCK(_g_aplay_lock)
#define _UNLOCK_APLAY US_MUTEX_UNLOCK(_g_aplay_lock)
#define _LOCK_ALL { _LOCK_VIDEO; _LOCK_ACAP; _LOCK_APLAY; }
#define _UNLOCK_ALL { _UNLOCK_APLAY; _UNLOCK_ACAP; _UNLOCK_VIDEO; }
#define _READY atomic_load(&_g_ready)
#define _STOP atomic_load(&_g_stop)
#define _HAS_WATCHERS atomic_load(&_g_has_watchers)
#define _HAS_LISTENERS atomic_load(&_g_has_listeners)
#define _HAS_SPEAKERS atomic_load(&_g_has_speakers)
#define _IF_DISABLED(...) { if (!_READY || _STOP) { __VA_ARGS__ } }
janus_plugin *create(void);
@@ -101,7 +114,7 @@ janus_plugin *create(void);
static void *_video_rtp_thread(void *arg) {
(void)arg;
US_THREAD_SETTLE("us_video_rtp");
US_THREAD_SETTLE("us_p_rtpv");
atomic_store(&_g_video_rtp_tid_created, true);
while (!_STOP) {
@@ -120,7 +133,7 @@ static void *_video_rtp_thread(void *arg) {
static void *_video_sink_thread(void *arg) {
(void)arg;
US_THREAD_SETTLE("us_video_sink");
US_THREAD_SETTLE("us_p_vsink");
atomic_store(&_g_video_sink_tid_created, true);
us_frame_s *drop = us_frame_init();
@@ -198,15 +211,15 @@ static void *_video_sink_thread(void *arg) {
return NULL;
}
static int _check_tc358743_audio(uint *audio_hz) {
static int _check_tc358743_acap(uint *hz) {
int fd;
if ((fd = open(_g_config->tc358743_dev_path, O_RDWR)) < 0) {
US_JLOG_PERROR("audio", "Can't open TC358743 V4L2 device");
US_JLOG_PERROR("acap", "Can't open TC358743 V4L2 device");
return -1;
}
const int checked = us_tc358743_xioctl_get_audio_hz(fd, audio_hz);
const int checked = us_tc358743_xioctl_get_audio_hz(fd, hz);
if (checked < 0) {
US_JLOG_PERROR("audio", "Can't check TC358743 audio state (%d)", checked);
US_JLOG_PERROR("acap", "Can't check TC358743 audio state (%d)", checked);
close(fd);
return -1;
}
@@ -214,13 +227,14 @@ static int _check_tc358743_audio(uint *audio_hz) {
return 0;
}
static void *_audio_thread(void *arg) {
static void *_acap_thread(void *arg) {
(void)arg;
US_THREAD_SETTLE("us_audio");
atomic_store(&_g_audio_tid_created, true);
US_THREAD_SETTLE("us_p_ac");
atomic_store(&_g_acap_tid_created, true);
assert(_g_config->audio_dev_name != NULL);
assert(_g_config->acap_dev_name != NULL);
assert(_g_config->tc358743_dev_path != NULL);
assert(_g_rtpa != NULL);
int once = 0;
@@ -230,53 +244,166 @@ static void *_audio_thread(void *arg) {
continue;
}
uint audio_hz = 0;
us_audio_s *audio = NULL;
uint hz = 0;
us_acap_s *acap = NULL;
if (_check_tc358743_audio(&audio_hz) < 0) {
goto close_audio;
if (_check_tc358743_acap(&hz) < 0) {
goto close_acap;
}
if (audio_hz == 0) {
US_ONCE({ US_JLOG_INFO("audio", "No audio presented from the host"); });
goto close_audio;
if (hz == 0) {
US_ONCE({ US_JLOG_INFO("acap", "No audio presented from the host"); });
goto close_acap;
}
US_ONCE({ US_JLOG_INFO("audio", "Detected host audio"); });
if ((audio = us_audio_init(_g_config->audio_dev_name, audio_hz)) == NULL) {
goto close_audio;
US_ONCE({ US_JLOG_INFO("acap", "Detected host audio"); });
if ((acap = us_acap_init(_g_config->acap_dev_name, hz)) == NULL) {
goto close_acap;
}
once = 0;
while (!_STOP && _HAS_WATCHERS && _HAS_LISTENERS) {
if (_check_tc358743_audio(&audio_hz) < 0 || audio->pcm_hz != audio_hz) {
goto close_audio;
if (_check_tc358743_acap(&hz) < 0 || acap->pcm_hz != hz) {
goto close_acap;
}
uz size = US_RTP_DATAGRAM_SIZE - US_RTP_HEADER_SIZE;
u8 data[size];
u64 pts;
const int result = us_audio_get_encoded(audio, data, &size, &pts);
const int result = us_acap_get_encoded(acap, data, &size, &pts);
if (result == 0) {
_LOCK_AUDIO;
_LOCK_ACAP;
us_rtpa_wrap(_g_rtpa, data, size, pts);
_UNLOCK_AUDIO;
_UNLOCK_ACAP;
} else if (result == -1) {
goto close_audio;
goto close_acap;
}
}
close_audio:
US_DELETE(audio, us_audio_destroy);
close_acap:
US_DELETE(acap, us_acap_destroy);
sleep(1); // error_delay
}
return NULL;
}
static void *_aplay_thread(void *arg) {
(void)arg;
US_THREAD_SETTLE("us_p_ap");
atomic_store(&_g_aplay_tid_created, true);
assert(_g_config->aplay_dev_name != NULL);
int once = 0;
while (!_STOP) {
snd_pcm_t *dev = NULL;
bool skip = true;
while (!_STOP) {
usleep((US_AU_FRAME_MS / 4) * 1000);
us_au_pcm_s mixed = {0};
_LOCK_APLAY;
US_LIST_ITERATE(_g_clients, client, {
us_au_pcm_s last = {0};
do {
const int ri = us_ring_consumer_acquire(client->aplay_pcm_ring, 0);
if (ri >= 0) {
const us_au_pcm_s *pcm = client->aplay_pcm_ring->items[ri];
memcpy(&last, pcm, sizeof(us_au_pcm_s));
us_ring_consumer_release(client->aplay_pcm_ring, ri);
} else {
break;
}
} while (skip && !_STOP);
us_au_pcm_mix(&mixed, &last);
// US_JLOG_INFO("++++++", "mixed %p", client);
});
_UNLOCK_APLAY;
// US_JLOG_INFO("++++++", "--------------");
if (skip) {
static uint skipped = 0;
if (skipped < (1000 / (US_AU_FRAME_MS / 4))) {
++skipped;
continue;
} else {
skipped = 0;
}
}
if (!_HAS_WATCHERS || !_HAS_LISTENERS || !_HAS_SPEAKERS) {
goto close_aplay;
}
if (dev == NULL) {
int err = snd_pcm_open(&dev, _g_config->aplay_dev_name, SND_PCM_STREAM_PLAYBACK, 0);
if (err < 0) {
US_ONCE({ US_JLOG_PERROR_ALSA(err, "aplay", "Can't open PCM playback"); });
goto close_aplay;
}
err = snd_pcm_set_params(dev, SND_PCM_FORMAT_S16_LE, SND_PCM_ACCESS_RW_INTERLEAVED,
US_RTP_OPUS_CH, US_RTP_OPUS_HZ, 1 /* soft resample */, 50000 /* 50000 = 0.05sec */
);
if (err < 0) {
US_ONCE({ US_JLOG_PERROR_ALSA(err, "aplay", "Can't configure PCM playback"); });
goto close_aplay;
}
US_JLOG_INFO("aplay", "Playback opened, playing ...");
once = 0;
}
if (dev != NULL && mixed.frames > 0) {
snd_pcm_sframes_t frames = snd_pcm_writei(dev, mixed.data, mixed.frames);
if (frames < 0) {
frames = snd_pcm_recover(dev, frames, 1);
} else {
if (once != 0) {
US_JLOG_INFO("aplay", "Playing resumed (snd_pcm_writei) ...");
}
once = 0;
skip = false;
}
if (frames < 0) {
US_ONCE({ US_JLOG_PERROR_ALSA(frames, "aplay", "Can't play to PCM playback"); });
if (frames == -ENODEV) {
goto close_aplay;
}
skip = true;
} else {
if (once != 0) {
US_JLOG_INFO("aplay", "Playing resumed (snd_pcm_recover) ...");
}
once = 0;
skip = false;
}
}
}
close_aplay:
if (dev != NULL) {
US_DELETE(dev, snd_pcm_close);
US_JLOG_INFO("aplay", "Playback closed");
}
}
return NULL;
}
static void _relay_rtp_clients(const us_rtp_s *rtp) {
US_LIST_ITERATE(_g_clients, client, {
us_janus_client_send(client, rtp);
});
}
static void _alsa_quiet(const char *file, int line, const char *func, int err, const char *fmt, ...) {
(void)file;
(void)line;
(void)func;
(void)err;
(void)fmt;
}
static int _plugin_init(janus_callbacks *gw, const char *config_dir_path) {
// https://groups.google.com/g/meetecho-janus/c/xoWIQfaoJm8
// sysctl -w net.core.rmem_default=500000
@@ -290,11 +417,16 @@ static int _plugin_init(janus_callbacks *gw, const char *config_dir_path) {
}
_g_gw = gw;
snd_lib_error_set_handler(_alsa_quiet);
US_RING_INIT_WITH_ITEMS(_g_video_ring, 64, us_frame_init);
_g_rtpv = us_rtpv_init(_relay_rtp_clients);
if (_g_config->audio_dev_name != NULL && us_audio_probe(_g_config->audio_dev_name)) {
if (_g_config->acap_dev_name != NULL && us_acap_probe(_g_config->acap_dev_name)) {
_g_rtpa = us_rtpa_init(_relay_rtp_clients);
US_THREAD_CREATE(_g_audio_tid, _audio_thread, NULL);
US_THREAD_CREATE(_g_acap_tid, _acap_thread, NULL);
if (_g_config->aplay_dev_name != NULL) {
US_THREAD_CREATE(_g_aplay_tid, _aplay_thread, NULL);
}
}
US_THREAD_CREATE(_g_video_rtp_tid, _video_rtp_thread, NULL);
US_THREAD_CREATE(_g_video_sink_tid, _video_sink_thread, NULL);
@@ -310,7 +442,8 @@ static void _plugin_destroy(void) {
# define JOIN(_tid) { if (atomic_load(&_tid##_created)) { US_THREAD_JOIN(_tid); } }
JOIN(_g_video_sink_tid);
JOIN(_g_video_rtp_tid);
JOIN(_g_audio_tid);
JOIN(_g_acap_tid);
JOIN(_g_aplay_tid);
# undef JOIN
US_LIST_ITERATE(_g_clients, client, {
@@ -325,8 +458,6 @@ static void _plugin_destroy(void) {
US_DELETE(_g_config, us_config_destroy);
}
#define _IF_DISABLED(...) { if (!_READY || _STOP) { __VA_ARGS__ } }
static void _plugin_create_session(janus_plugin_session *session, int *err) {
_IF_DISABLED({ *err = -1; return; });
_LOCK_ALL;
@@ -343,6 +474,7 @@ static void _plugin_destroy_session(janus_plugin_session* session, int *err) {
bool found = false;
bool has_watchers = false;
bool has_listeners = false;
bool has_speakers = false;
US_LIST_ITERATE(_g_clients, client, {
if (client->session == session) {
US_JLOG_INFO("main", "Removing session %p ...", session);
@@ -351,7 +483,8 @@ static void _plugin_destroy_session(janus_plugin_session* session, int *err) {
found = true;
} else {
has_watchers = (has_watchers || atomic_load(&client->transmit));
has_listeners = (has_listeners || atomic_load(&client->transmit_audio));
has_listeners = (has_listeners || atomic_load(&client->transmit_acap));
has_speakers = (has_speakers || atomic_load(&client->transmit_aplay));
}
});
if (!found) {
@@ -360,6 +493,7 @@ static void _plugin_destroy_session(janus_plugin_session* session, int *err) {
}
atomic_store(&_g_has_watchers, has_watchers);
atomic_store(&_g_has_listeners, has_listeners);
atomic_store(&_g_has_speakers, has_speakers);
_UNLOCK_ALL;
}
@@ -398,25 +532,19 @@ static void _set_transmit(janus_plugin_session *session, const char *msg, bool t
_UNLOCK_ALL;
}
#undef _IF_DISABLED
static void _plugin_setup_media(janus_plugin_session *session) { _set_transmit(session, "Unmuted", true); }
static void _plugin_hangup_media(janus_plugin_session *session) { _set_transmit(session, "Muted", false); }
static struct janus_plugin_result *_plugin_handle_message(
janus_plugin_session *session, char *transaction, json_t *msg, json_t *jsep) {
assert(transaction != NULL);
# define FREE_MSG_JSEP { \
US_DELETE(msg, json_decref); \
US_DELETE(jsep, json_decref); \
}
janus_plugin_result_type result_type = JANUS_PLUGIN_OK;
char *result_msg = NULL;
if (session == NULL || msg == NULL) {
free(transaction);
FREE_MSG_JSEP;
return janus_plugin_result_new(JANUS_PLUGIN_ERROR, (msg ? "No session" : "No message"), NULL);
result_type = JANUS_PLUGIN_ERROR;
result_msg = (msg ? "No session" : "No message");
goto done;
}
# define PUSH_ERROR(x_error, x_reason) { \
@@ -425,20 +553,20 @@ static struct janus_plugin_result *_plugin_handle_message(
json_object_set_new(m_event, "ustreamer", json_string("event")); \
json_object_set_new(m_event, "error_code", json_integer(x_error)); \
json_object_set_new(m_event, "error", json_string(x_reason)); \
_g_gw->push_event(session, create(), transaction, m_event, NULL); \
_g_gw->push_event(session, create(), NULL, m_event, NULL); \
json_decref(m_event); \
}
json_t *const request = json_object_get(msg, "request");
if (request == NULL) {
PUSH_ERROR(400, "Request missing");
goto ok_wait;
goto done;
}
const char *const request_str = json_string_value(request);
if (request_str == NULL) {
PUSH_ERROR(400, "Request not a string");
goto ok_wait;
goto done;
}
// US_JLOG_INFO("main", "Message: %s", request_str);
@@ -448,10 +576,10 @@ static struct janus_plugin_result *_plugin_handle_message(
json_t *const m_result = json_object(); \
json_object_set_new(m_result, "status", json_string(x_status)); \
if (x_payload != NULL) { \
json_object_set_new(m_result, x_status, x_payload); \
json_object_set(m_result, x_status, x_payload); \
} \
json_object_set_new(m_event, "result", m_result); \
_g_gw->push_event(session, create(), transaction, m_event, x_jsep); \
_g_gw->push_event(session, create(), NULL, m_event, x_jsep); \
json_decref(m_event); \
}
@@ -462,15 +590,22 @@ static struct janus_plugin_result *_plugin_handle_message(
PUSH_STATUS("stopped", NULL, NULL);
} else if (!strcmp(request_str, "watch")) {
bool with_audio = false;
uint video_orient = 0;
bool with_acap = false;
bool with_aplay = false;
{
json_t *const params = json_object_get(msg, "params");
if (params != NULL) {
{
json_t *const obj = json_object_get(params, "audio");
if (obj != NULL && json_is_boolean(obj)) {
with_audio = (_g_rtpa != NULL && json_boolean_value(obj));
with_acap = (_g_rtpa != NULL && json_boolean_value(obj));
}
}
{
json_t *const obj = json_object_get(params, "mic");
if (obj != NULL && json_is_boolean(obj)) {
with_aplay = (_g_config->aplay_dev_name != NULL && with_acap && json_boolean_value(obj));
}
}
{
@@ -489,7 +624,7 @@ static struct janus_plugin_result *_plugin_handle_message(
{
char *sdp;
char *const video_sdp = us_rtpv_make_sdp(_g_rtpv);
char *const audio_sdp = (with_audio ? us_rtpa_make_sdp(_g_rtpa) : us_strdup(""));
char *const audio_sdp = (with_acap ? us_rtpa_make_sdp(_g_rtpa, with_aplay) : us_strdup(""));
US_ASPRINTF(sdp,
"v=0" RN
"o=- %" PRIu64 " 1 IN IP4 0.0.0.0" RN
@@ -518,19 +653,27 @@ static struct janus_plugin_result *_plugin_handle_message(
{
_LOCK_ALL;
bool has_listeners = false;
bool has_speakers = false;
US_LIST_ITERATE(_g_clients, client, {
if (client->session == session) {
atomic_store(&client->transmit_audio, with_audio);
atomic_store(&client->transmit_acap, with_acap);
atomic_store(&client->transmit_aplay, with_aplay);
atomic_store(&client->video_orient, video_orient);
}
has_listeners = (has_listeners || atomic_load(&client->transmit_audio));
has_listeners = (has_listeners || atomic_load(&client->transmit_acap));
has_speakers = (has_speakers || atomic_load(&client->transmit_aplay));
});
atomic_store(&_g_has_listeners, has_listeners);
atomic_store(&_g_has_speakers, has_speakers);
_UNLOCK_ALL;
}
} else if (!strcmp(request_str, "features")) {
json_t *const features = json_pack("{sb}", "audio", (_g_rtpa != NULL));
json_t *const features = json_pack(
"{sbsb}",
"audio", (_g_rtpa != NULL),
"mic", (_g_rtpa != NULL && _g_config->aplay_dev_name != NULL)
);
PUSH_STATUS("features", features, NULL);
json_decref(features);
@@ -542,19 +685,40 @@ static struct janus_plugin_result *_plugin_handle_message(
PUSH_ERROR(405, "Not implemented");
}
ok_wait:
FREE_MSG_JSEP;
return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, NULL, NULL);
done:
US_DELETE(transaction, free);
US_DELETE(msg, json_decref);
US_DELETE(jsep, json_decref);
return janus_plugin_result_new(
result_type, result_msg,
(result_type == JANUS_PLUGIN_OK ? json_pack("{sb}", "ok", 1) : NULL));
# undef PUSH_STATUS
# undef PUSH_ERROR
# undef FREE_MSG_JSEP
}
static void _plugin_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *packet) {
(void)handle;
(void)packet;
if (packet->video && janus_rtcp_has_pli(packet->buffer, packet->length)) {
static void _plugin_incoming_rtp(janus_plugin_session *session, janus_plugin_rtp *packet) {
_IF_DISABLED({ return; });
if (session == NULL || packet == NULL || packet->video) {
return; // Accept only valid audio
}
_LOCK_APLAY;
US_LIST_ITERATE(_g_clients, client, {
if (client->session == session) {
us_janus_client_recv(client, packet);
break;
}
});
_UNLOCK_APLAY;
}
static void _plugin_incoming_rtcp(janus_plugin_session *session, janus_plugin_rtcp *packet) {
_IF_DISABLED({ return; });
if (session == NULL || packet == NULL || !packet->video) {
return; // Accept only valid video
}
if (janus_rtcp_has_pli(packet->buffer, packet->length)) {
// US_JLOG_INFO("main", "Got video PLI");
atomic_store(&_g_key_required, true);
}
@@ -595,6 +759,7 @@ janus_plugin *create(void) {
.get_author = _plugin_get_author,
.get_package = _plugin_get_package,
.incoming_rtp = _plugin_incoming_rtp,
.incoming_rtcp = _plugin_incoming_rtcp,
);
# pragma GCC diagnostic pop

View File

@@ -28,6 +28,13 @@
// https://stackoverflow.com/questions/47635545/why-webrtc-chose-rtp-max-packet-size-to-1200-bytes
#define US_RTP_DATAGRAM_SIZE 1200
#define US_RTP_HEADER_SIZE 12
#define US_RTP_PAYLOAD_SIZE (US_RTP_DATAGRAM_SIZE - US_RTP_HEADER_SIZE)
#define US_RTP_H264_PAYLOAD 96
#define US_RTP_OPUS_PAYLOAD 111
#define US_RTP_OPUS_HZ 48000
#define US_RTP_OPUS_CH 2
typedef struct {

View File

@@ -33,7 +33,7 @@ us_rtpa_s *us_rtpa_init(us_rtp_callback_f callback) {
us_rtpa_s *rtpa;
US_CALLOC(rtpa, 1);
rtpa->rtp = us_rtp_init();
us_rtp_assign(rtpa->rtp, 111, false);
us_rtp_assign(rtpa->rtp, US_RTP_OPUS_PAYLOAD, false);
rtpa->callback = callback;
return rtpa;
}
@@ -43,21 +43,24 @@ void us_rtpa_destroy(us_rtpa_s *rtpa) {
free(rtpa);
}
char *us_rtpa_make_sdp(us_rtpa_s *rtpa) {
char *us_rtpa_make_sdp(us_rtpa_s *rtpa, bool mic) {
const uint pl = rtpa->rtp->payload;
char *sdp;
US_ASPRINTF(sdp,
"m=audio 1 RTP/SAVPF %u" RN
"c=IN IP4 0.0.0.0" RN
"a=rtpmap:%u OPUS/48000/2" RN
// "a=fmtp:%u useinbandfec=1" RN
"a=rtpmap:%u OPUS/%u/%u" RN
"a=fmtp:%u sprop-stereo=1" RN // useinbandfec=1
"a=rtcp-fb:%u nack" RN
"a=rtcp-fb:%u nack pli" RN
"a=rtcp-fb:%u goog-remb" RN
"a=ssrc:%" PRIu32 " cname:ustreamer" RN
"a=sendonly" RN,
pl, pl, pl, pl, pl, // pl,
rtpa->rtp->ssrc
"a=%s" RN,
pl, pl,
US_RTP_OPUS_HZ, US_RTP_OPUS_CH,
pl, pl, pl, pl,
rtpa->rtp->ssrc,
(mic ? "sendrecv" : "sendonly")
);
return sdp;
}

View File

@@ -36,5 +36,5 @@ typedef struct {
us_rtpa_s *us_rtpa_init(us_rtp_callback_f callback);
void us_rtpa_destroy(us_rtpa_s *rtpa);
char *us_rtpa_make_sdp(us_rtpa_s *rtpa);
char *us_rtpa_make_sdp(us_rtpa_s *rtpa, bool mic);
void us_rtpa_wrap(us_rtpa_s *rtpa, const u8 *data, uz size, u32 pts);

View File

@@ -45,7 +45,7 @@ us_rtpv_s *us_rtpv_init(us_rtp_callback_f callback) {
us_rtpv_s *rtpv;
US_CALLOC(rtpv, 1);
rtpv->rtp = us_rtp_init();
us_rtp_assign(rtpv->rtp, 96, true);
us_rtp_assign(rtpv->rtp, US_RTP_H264_PAYLOAD, true);
rtpv->callback = callback;
return rtpv;
}

View File

@@ -3,7 +3,7 @@ envlist = cppcheck, flake8, pylint, mypy, vulture, htmlhint
skipsdist = true
[testenv]
basepython = python3.11
basepython = python3.13
changedir = /src
[testenv:cppcheck]
@@ -13,8 +13,10 @@ commands = cppcheck \
--std=c17 \
--error-exitcode=1 \
--quiet \
--check-level=exhaustive \
--enable=warning,portability,performance,style \
--suppress=assignmentInAssert \
--suppress=assertWithSideEffect \
--suppress=variableScope \
--inline-suppr \
--library=python \
@@ -25,7 +27,7 @@ commands = cppcheck \
allowlist_externals = bash
commands = bash -c 'flake8 --config=linters/flake8.ini tools/*.py' python/*.py
deps =
flake8==5.0.4
flake8
flake8-quotes
[testenv:pylint]
@@ -33,6 +35,7 @@ allowlist_externals = bash
commands = bash -c 'pylint --rcfile=linters/pylint.ini --output-format=colorized --reports=no tools/*.py python/*.py'
deps =
pylint
setuptools
[testenv:mypy]
allowlist_externals = bash

View File

@@ -1,6 +1,6 @@
.\" Manpage for ustreamer-dump.
.\" Open an issue or pull request to https://github.com/pikvm/ustreamer to correct errors or typos
.TH USTREAMER-DUMP 1 "version 6.10" "January 2021"
.TH USTREAMER-DUMP 1 "version 6.25" "January 2021"
.SH NAME
ustreamer-dump \- Dump uStreamer's memory sink to file

View File

@@ -1,6 +1,6 @@
.\" Manpage for ustreamer.
.\" Open an issue or pull request to https://github.com/pikvm/ustreamer to correct errors or typos
.TH USTREAMER 1 "version 6.10" "November 2020"
.TH USTREAMER 1 "version 6.25" "November 2020"
.SH NAME
ustreamer \- stream MJPEG video from any V4L2 device to the network
@@ -68,6 +68,9 @@ Desired FPS. Default: maximum possible.
.BR \-z\ \fIN ", " \-\-min\-frame\-size\ \fIN
Drop frames smaller then this limit. Useful if the device produces small\-sized garbage frames. Default: 128 bytes.
.TP
.BR \-T ", " \-\-allow\-truncated\-frames
Allows to handle truncated frames. Useful if the device produces incorrect but still acceptable frames. Default: disabled.
.TP
.BR \-n ", " \-\-persistent
Suppress repetitive signal source errors. Default: disabled.
.TP
@@ -96,8 +99,6 @@ HW ─ Use pre-encoded MJPEG frames directly from camera hardware.
M2M-VIDEO ─ GPU-accelerated MJPEG encoding.
M2M-IMAGE ─ GPU-accelerated JPEG encoding.
NOOP ─ Don't compress MJPEG stream (do nothing).
.TP
.BR \-g\ \fIWxH,... ", " \-\-glitched\-resolutions\ \fIWxH,...
It doesn't do anything. Still here for compatibility.

View File

@@ -3,14 +3,14 @@
pkgname=ustreamer
pkgver=6.10
pkgver=6.25
pkgrel=1
pkgdesc="Lightweight and fast MJPEG-HTTP streamer"
url="https://github.com/pikvm/ustreamer"
license=(GPL)
arch=(i686 x86_64 armv6h armv7h aarch64)
depends=(libjpeg libevent libbsd libgpiod systemd)
makedepends=(gcc make systemd)
makedepends=(gcc make pkgconf systemd)
source=(${pkgname}::"git+https://github.com/pikvm/ustreamer#commit=v${pkgver}")
md5sums=(SKIP)

View File

@@ -24,7 +24,7 @@ RUN apk add --no-cache \
WORKDIR /ustreamer
COPY --from=build /build/ustreamer/src/ustreamer.bin ustreamer
RUN wget https://raw.githubusercontent.com/pikvm/kvmd/master/configs/kvmd/edid/v3-hdmi.hex -O /edid.hex
RUN wget https://raw.githubusercontent.com/pikvm/kvmd/master/configs/kvmd/edid/v2.hex -O /edid.hex
COPY pkg/docker/entry.sh /
EXPOSE 8080

View File

@@ -6,7 +6,7 @@
include $(TOPDIR)/rules.mk
PKG_NAME:=ustreamer
PKG_VERSION:=6.10
PKG_VERSION:=6.25
PKG_RELEASE:=1
PKG_MAINTAINER:=Maxim Devaev <mdevaev@gmail.com>

View File

@@ -17,7 +17,7 @@ def _find_sources(suffix: str) -> list[str]:
if __name__ == "__main__":
setup(
name="ustreamer",
version="6.10",
version="6.25",
description="uStreamer tools",
author="Maxim Devaev",
author_email="mdevaev@gmail.com",

View File

@@ -111,9 +111,9 @@ int main(int argc, char *argv[]) {
US_LOGGING_INIT;
US_THREAD_RENAME("main");
char *sink_name = NULL;
const char *sink_name = NULL;
unsigned sink_timeout = 1;
char *output_path = NULL;
const char *output_path = NULL;
bool output_json = false;
long long count = 0;
long double interval = 0;

View File

@@ -83,9 +83,9 @@ static const struct {
};
static int _capture_wait_buffer(us_capture_s *cap);
static int _capture_consume_event(us_capture_s *cap);
static int _capture_consume_event(const us_capture_s *cap);
static void _v4l2_buffer_copy(const struct v4l2_buffer *src, struct v4l2_buffer *dest);
static bool _capture_is_buffer_valid(us_capture_s *cap, const struct v4l2_buffer *buf, const u8 *data);
static bool _capture_is_buffer_valid(const us_capture_s *cap, const struct v4l2_buffer *buf, const u8 *data);
static int _capture_open_check_cap(us_capture_s *cap);
static int _capture_open_dv_timings(us_capture_s *cap, bool apply);
static int _capture_open_format(us_capture_s *cap, bool first);
@@ -98,12 +98,12 @@ static int _capture_open_queue_buffers(us_capture_s *cap);
static int _capture_open_export_to_dma(us_capture_s *cap);
static int _capture_apply_resolution(us_capture_s *cap, uint width, uint height, float hz);
static void _capture_apply_controls(us_capture_s *cap);
static void _capture_apply_controls(const us_capture_s *cap);
static int _capture_query_control(
us_capture_s *cap, struct v4l2_queryctrl *query,
const us_capture_s *cap, struct v4l2_queryctrl *query,
const char *name, uint cid, bool quiet);
static void _capture_set_control(
us_capture_s *cap, const struct v4l2_queryctrl *query,
const us_capture_s *cap, const struct v4l2_queryctrl *query,
const char *name, uint cid, int value, bool quiet);
static const char *_format_to_string_nullable(uint format);
@@ -421,7 +421,7 @@ int us_capture_hwbuf_grab(us_capture_s *cap, us_capture_hwbuf_s **hw) {
return buf.index;
}
int us_capture_hwbuf_release(us_capture_s *cap, us_capture_hwbuf_s *hw) {
int us_capture_hwbuf_release(const us_capture_s *cap, us_capture_hwbuf_s *hw) {
assert(atomic_load(&hw->refs) == 0);
const uint index = hw->buf.index;
_LOG_DEBUG("Releasing HW buffer=%u ...", index);
@@ -486,7 +486,7 @@ int _capture_wait_buffer(us_capture_s *cap) {
return 0;
}
static int _capture_consume_event(us_capture_s *cap) {
static int _capture_consume_event(const us_capture_s *cap) {
struct v4l2_event event;
if (us_xioctl(cap->run->fd, VIDIOC_DQEVENT, &event) < 0) {
_LOG_PERROR("Can't consume V4L2 event");
@@ -513,7 +513,7 @@ static void _v4l2_buffer_copy(const struct v4l2_buffer *src, struct v4l2_buffer
}
}
bool _capture_is_buffer_valid(us_capture_s *cap, const struct v4l2_buffer *buf, const u8 *data) {
bool _capture_is_buffer_valid(const us_capture_s *cap, const struct v4l2_buffer *buf, const u8 *data) {
// Workaround for broken, corrupted frames:
// Under low light conditions corrupted frames may get captured.
// The good thing is such frames are quite small compared to the regular frames.
@@ -544,8 +544,11 @@ bool _capture_is_buffer_valid(us_capture_s *cap, const struct v4l2_buffer *buf,
const u8 *const eoi_ptr = end_ptr - 2;
const u16 eoi_marker = (((u16)(eoi_ptr[0]) << 8) | eoi_ptr[1]);
if (eoi_marker != 0xFFD9 && eoi_marker != 0xD900 && eoi_marker != 0x0000) {
_LOG_DEBUG("Discarding truncated JPEG frame: eoi_marker=0x%04x, bytesused=%u", eoi_marker, buf->bytesused);
return false;
if (!cap->allow_truncated_frames) {
_LOG_DEBUG("Discarding truncated JPEG frame: eoi_marker=0x%04x, bytesused=%u", eoi_marker, buf->bytesused);
return false;
}
_LOG_DEBUG("Got truncated JPEG frame: eoi_marker=0x%04x, bytesused=%u", eoi_marker, buf->bytesused);
}
}
@@ -737,7 +740,7 @@ static int _capture_open_format(us_capture_s *cap, bool first) {
_format_to_string_supported(cap->format),
_format_to_string_supported(FMT(pixelformat)));
char *format_str;
const char *format_str;
if ((format_str = (char*)_format_to_string_nullable(FMT(pixelformat))) != NULL) {
_LOG_INFO("Falling back to format=%s", format_str);
} else {
@@ -1037,7 +1040,7 @@ static int _capture_apply_resolution(us_capture_s *cap, uint width, uint height,
return 0;
}
static void _capture_apply_controls(us_capture_s *cap) {
static void _capture_apply_controls(const us_capture_s *cap) {
# define SET_CID_VALUE(x_cid, x_field, x_value, x_quiet) { \
struct v4l2_queryctrl m_query; \
if (_capture_query_control(cap, &m_query, #x_field, x_cid, x_quiet) == 0) { \
@@ -1094,7 +1097,7 @@ static void _capture_apply_controls(us_capture_s *cap) {
}
static int _capture_query_control(
us_capture_s *cap, struct v4l2_queryctrl *query,
const us_capture_s *cap, struct v4l2_queryctrl *query,
const char *name, uint cid, bool quiet) {
// cppcheck-suppress redundantPointerOp
@@ -1111,7 +1114,7 @@ static int _capture_query_control(
}
static void _capture_set_control(
us_capture_s *cap, const struct v4l2_queryctrl *query,
const us_capture_s *cap, const struct v4l2_queryctrl *query,
const char *name, uint cid, int value, bool quiet) {
if (value < query->minimum || value > query->maximum || value % query->step != 0) {

View File

@@ -115,6 +115,7 @@ typedef struct {
bool dma_required;
uint desired_fps;
uz min_frame_size;
bool allow_truncated_frames;
bool persistent;
uint timeout;
us_controls_s ctl;
@@ -133,7 +134,7 @@ int us_capture_open(us_capture_s *cap);
void us_capture_close(us_capture_s *cap);
int us_capture_hwbuf_grab(us_capture_s *cap, us_capture_hwbuf_s **hw);
int us_capture_hwbuf_release(us_capture_s *cap, us_capture_hwbuf_s *hw);
int us_capture_hwbuf_release(const us_capture_s *cap, us_capture_hwbuf_s *hw);
void us_capture_hwbuf_incref(us_capture_hwbuf_s *hw);
void us_capture_hwbuf_decref(us_capture_hwbuf_s *hw);

View File

@@ -26,7 +26,7 @@
#define US_VERSION_MAJOR 6
#define US_VERSION_MINOR 10
#define US_VERSION_MINOR 25
#define US_MAKE_VERSION2(_major, _minor) #_major "." #_minor
#define US_MAKE_VERSION1(_major, _minor) US_MAKE_VERSION2(_major, _minor)

View File

@@ -614,7 +614,7 @@ static int _drm_find_sink(us_drm_s *drm, uint width, uint height, float hz) {
goto done;
}
drmModeModeInfo *best;
const drmModeModeInfo *best;
if ((best = _find_best_mode(conn, width, height, hz)) == NULL) {
_LOG_ERROR("Can't find any appropriate display modes");
drmModeFreeConnector(conn);

View File

@@ -46,6 +46,7 @@
x_item->prev = m_last; \
m_last->next = x_item; \
} \
x_item->next = NULL; \
}
#define US_LIST_APPEND_C(x_first, x_item, x_count) { \
@@ -64,6 +65,8 @@
__typeof__(x_first) m_next = x_item->next; \
m_next->prev = x_item->prev; \
} \
x_item->prev = NULL; \
x_item->next = NULL; \
}
#define US_LIST_REMOVE_C(x_first, x_item, x_count) { \

View File

@@ -113,7 +113,9 @@ INLINE void us_thread_get_name(char *name) { // Always required for logging
#if defined(__linux__)
const pid_t tid = syscall(SYS_gettid);
#elif defined(__FreeBSD__)
const pid_t tid = syscall(SYS_thr_self);
long id;
assert(!syscall(SYS_thr_self, &id));
const pid_t tid = id;
#elif defined(__OpenBSD__)
const pid_t tid = syscall(SYS_getthrid);
#elif defined(__NetBSD__)

View File

@@ -25,6 +25,8 @@
#include <stdbool.h>
#include <stdint.h>
#include <sys/types.h>
typedef long long sll;
typedef ssize_t sz;

View File

@@ -22,6 +22,26 @@
#include "encoder.h"
#include <stdlib.h>
#include <strings.h>
#include <assert.h>
#include <pthread.h>
#include "../libs/types.h"
#include "../libs/tools.h"
#include "../libs/array.h"
#include "../libs/threading.h"
#include "../libs/logging.h"
#include "../libs/frame.h"
#include "../libs/capture.h"
#include "workers.h"
#include "m2m.h"
#include "encoders/cpu/encoder.h"
#include "encoders/hw/encoder.h"
static const struct {
const char *name;
@@ -34,7 +54,7 @@ static const struct {
{"M2M-MJPEG", US_ENCODER_TYPE_M2M_VIDEO},
{"M2M-JPEG", US_ENCODER_TYPE_M2M_IMAGE},
{"OMX", US_ENCODER_TYPE_M2M_IMAGE},
{"NOOP", US_ENCODER_TYPE_NOOP},
{"NOOP", US_ENCODER_TYPE_CPU},
};
@@ -43,9 +63,6 @@ static void _worker_job_destroy(void *v_job);
static bool _worker_run_job(us_worker_s *wr);
#define _ER(x_next) enc->run->x_next
us_encoder_s *us_encoder_init(void) {
us_encoder_runtime_s *run;
US_CALLOC(run, 1);
@@ -62,14 +79,15 @@ us_encoder_s *us_encoder_init(void) {
}
void us_encoder_destroy(us_encoder_s *enc) {
if (_ER(m2ms) != NULL) {
for (unsigned index = 0; index < _ER(n_m2ms); ++index) {
US_DELETE(_ER(m2ms[index]), us_m2m_encoder_destroy)
us_encoder_runtime_s *const run = enc->run;
if (run->m2ms != NULL) {
for (uint index = 0; index < run->n_m2ms; ++index) {
US_DELETE(run->m2ms[index], us_m2m_encoder_destroy);
}
free(_ER(m2ms));
free(run->m2ms);
}
US_MUTEX_DESTROY(_ER(mutex));
free(enc->run);
US_MUTEX_DESTROY(run->mutex);
free(run);
free(enc);
}
@@ -92,85 +110,69 @@ const char *us_encoder_type_to_string(us_encoder_type_e type) {
}
void us_encoder_open(us_encoder_s *enc, us_capture_s *cap) {
assert(enc->run->pool == NULL);
us_encoder_runtime_s *const run = enc->run;
us_capture_runtime_s *const cr = cap->run;
# define DR(x_next) cap->run->x_next
assert(run->pool == NULL);
us_encoder_type_e type = (_ER(cpu_forced) ? US_ENCODER_TYPE_CPU : enc->type);
unsigned quality = cap->jpeg_quality;
unsigned n_workers = US_MIN(enc->n_workers, DR(n_bufs));
bool cpu_forced = false;
us_encoder_type_e type = enc->type;
uint quality = cap->jpeg_quality;
uint n_workers = US_MIN(enc->n_workers, cr->n_bufs);
if (us_is_jpeg(DR(format)) && type != US_ENCODER_TYPE_HW) {
if (us_is_jpeg(cr->format) && type != US_ENCODER_TYPE_HW) {
US_LOG_INFO("Switching to HW encoder: the input is (M)JPEG ...");
type = US_ENCODER_TYPE_HW;
}
if (type == US_ENCODER_TYPE_HW) {
if (!us_is_jpeg(DR(format))) {
if (us_is_jpeg(cr->format)) {
quality = cr->jpeg_quality;
n_workers = 1;
} else {
US_LOG_INFO("Switching to CPU encoder: the input format is not (M)JPEG ...");
goto use_cpu;
type = US_ENCODER_TYPE_CPU;
quality = cap->jpeg_quality;
}
quality = DR(jpeg_quality);
n_workers = 1;
} else if (type == US_ENCODER_TYPE_M2M_VIDEO || type == US_ENCODER_TYPE_M2M_IMAGE) {
US_LOG_DEBUG("Preparing M2M-%s encoder ...", (type == US_ENCODER_TYPE_M2M_VIDEO ? "VIDEO" : "IMAGE"));
if (_ER(m2ms) == NULL) {
US_CALLOC(_ER(m2ms), n_workers);
if (run->m2ms == NULL) {
US_CALLOC(run->m2ms, n_workers);
}
for (; _ER(n_m2ms) < n_workers; ++_ER(n_m2ms)) {
for (; run->n_m2ms < n_workers; ++run->n_m2ms) {
// Начинаем с нуля и доинициализируем на следующих заходах при необходимости
char name[32];
US_SNPRINTF(name, 31, "JPEG-%u", _ER(n_m2ms));
US_SNPRINTF(name, 31, "JPEG-%u", run->n_m2ms);
if (type == US_ENCODER_TYPE_M2M_VIDEO) {
_ER(m2ms[_ER(n_m2ms)]) = us_m2m_mjpeg_encoder_init(name, enc->m2m_path, quality);
run->m2ms[run->n_m2ms] = us_m2m_mjpeg_encoder_init(name, enc->m2m_path, quality);
} else {
_ER(m2ms[_ER(n_m2ms)]) = us_m2m_jpeg_encoder_init(name, enc->m2m_path, quality);
run->m2ms[run->n_m2ms] = us_m2m_jpeg_encoder_init(name, enc->m2m_path, quality);
}
}
} else if (type == US_ENCODER_TYPE_NOOP) {
n_workers = 1;
quality = 0;
}
goto ok;
if (quality == 0) {
US_LOG_INFO("Using JPEG quality: encoder default");
} else {
US_LOG_INFO("Using JPEG quality: %u%%", quality);
}
use_cpu:
type = US_ENCODER_TYPE_CPU;
quality = cap->jpeg_quality;
US_MUTEX_LOCK(run->mutex);
run->type = type;
run->quality = quality;
US_MUTEX_UNLOCK(run->mutex);
ok:
if (type == US_ENCODER_TYPE_NOOP) {
US_LOG_INFO("Using JPEG NOOP encoder");
} else if (quality == 0) {
US_LOG_INFO("Using JPEG quality: encoder default");
} else {
US_LOG_INFO("Using JPEG quality: %u%%", quality);
}
const ldf desired_interval = (
cap->desired_fps > 0 && (cap->desired_fps < cap->run->hw_fps || cap->run->hw_fps == 0)
? (ldf)1 / cap->desired_fps
: 0
);
US_MUTEX_LOCK(_ER(mutex));
_ER(type) = type;
_ER(quality) = quality;
if (cpu_forced) {
_ER(cpu_forced) = true;
}
US_MUTEX_UNLOCK(_ER(mutex));
const long double desired_interval = (
cap->desired_fps > 0 && (cap->desired_fps < cap->run->hw_fps || cap->run->hw_fps == 0)
? (long double)1 / cap->desired_fps
: 0
);
enc->run->pool = us_workers_pool_init(
"JPEG", "jw", n_workers, desired_interval,
_worker_job_init, (void*)enc,
_worker_job_destroy,
_worker_run_job);
# undef DR
enc->run->pool = us_workers_pool_init(
"JPEG", "jw", n_workers, desired_interval,
_worker_job_init, (void*)enc,
_worker_job_destroy,
_worker_run_job);
}
void us_encoder_close(us_encoder_s *enc) {
@@ -178,11 +180,12 @@ void us_encoder_close(us_encoder_s *enc) {
US_DELETE(enc->run->pool, us_workers_pool_destroy);
}
void us_encoder_get_runtime_params(us_encoder_s *enc, us_encoder_type_e *type, unsigned *quality) {
US_MUTEX_LOCK(_ER(mutex));
*type = _ER(type);
*quality = _ER(quality);
US_MUTEX_UNLOCK(_ER(mutex));
void us_encoder_get_runtime_params(us_encoder_s *enc, us_encoder_type_e *type, uint *quality) {
us_encoder_runtime_s *const run = enc->run;
US_MUTEX_LOCK(run->mutex);
*type = run->type;
*quality = run->quality;
US_MUTEX_UNLOCK(run->mutex);
}
static void *_worker_job_init(void *v_enc) {
@@ -200,35 +203,28 @@ static void _worker_job_destroy(void *v_job) {
}
static bool _worker_run_job(us_worker_s *wr) {
us_encoder_job_s *job = wr->job;
us_encoder_s *enc = job->enc; // Just for _ER()
const us_frame_s *src = &job->hw->raw;
us_frame_s *dest = job->dest;
us_encoder_job_s *const job = wr->job;
us_encoder_runtime_s *const run = job->enc->run;
const us_frame_s *const src = &job->hw->raw;
us_frame_s *const dest = job->dest;
if (_ER(type) == US_ENCODER_TYPE_CPU) {
if (run->type == US_ENCODER_TYPE_CPU) {
US_LOG_VERBOSE("Compressing JPEG using CPU: worker=%s, buffer=%u",
wr->name, job->hw->buf.index);
us_cpu_encoder_compress(src, dest, _ER(quality));
us_cpu_encoder_compress(src, dest, run->quality);
} else if (_ER(type) == US_ENCODER_TYPE_HW) {
} else if (run->type == US_ENCODER_TYPE_HW) {
US_LOG_VERBOSE("Compressing JPEG using HW (just copying): worker=%s, buffer=%u",
wr->name, job->hw->buf.index);
us_hw_encoder_compress(src, dest);
} else if (_ER(type) == US_ENCODER_TYPE_M2M_VIDEO || _ER(type) == US_ENCODER_TYPE_M2M_IMAGE) {
} else if (run->type == US_ENCODER_TYPE_M2M_VIDEO || run->type == US_ENCODER_TYPE_M2M_IMAGE) {
US_LOG_VERBOSE("Compressing JPEG using M2M-%s: worker=%s, buffer=%u",
(_ER(type) == US_ENCODER_TYPE_M2M_VIDEO ? "VIDEO" : "IMAGE"), wr->name, job->hw->buf.index);
if (us_m2m_encoder_compress(_ER(m2ms[wr->number]), src, dest, false) < 0) {
(run->type == US_ENCODER_TYPE_M2M_VIDEO ? "VIDEO" : "IMAGE"), wr->name, job->hw->buf.index);
if (us_m2m_encoder_compress(run->m2ms[wr->number], src, dest, false) < 0) {
goto error;
}
} else if (_ER(type) == US_ENCODER_TYPE_NOOP) {
US_LOG_VERBOSE("Compressing JPEG using NOOP (do nothing): worker=%s, buffer=%u",
wr->name, job->hw->buf.index);
us_frame_encoding_begin(src, dest, V4L2_PIX_FMT_JPEG);
usleep(5000); // Просто чтобы работала логика desired_fps
dest->encode_end_ts = us_get_now_monotonic(); // us_frame_encoding_end()
} else {
assert(0 && "Unknown encoder type");
}
@@ -238,14 +234,9 @@ static bool _worker_run_job(us_worker_s *wr) {
job->dest->encode_end_ts - job->dest->encode_begin_ts,
wr->name,
job->hw->buf.index);
return true;
error:
US_LOG_ERROR("Compression failed: worker=%s, buffer=%u", wr->name, job->hw->buf.index);
US_LOG_ERROR("Error while compressing buffer, falling back to CPU");
US_MUTEX_LOCK(_ER(mutex));
_ER(cpu_forced) = true;
US_MUTEX_UNLOCK(_ER(mutex));
return false;
error:
US_LOG_ERROR("Compression failed: worker=%s, buffer=%u", wr->name, job->hw->buf.index);
return false;
}

View File

@@ -22,45 +22,32 @@
#pragma once
#include <stdlib.h>
#include <stdbool.h>
#include <strings.h>
#include <assert.h>
#include <pthread.h>
#include <linux/videodev2.h>
#include "../libs/tools.h"
#include "../libs/array.h"
#include "../libs/threading.h"
#include "../libs/logging.h"
#include "../libs/types.h"
#include "../libs/frame.h"
#include "../libs/capture.h"
#include "workers.h"
#include "m2m.h"
#include "encoders/cpu/encoder.h"
#include "encoders/hw/encoder.h"
#define ENCODER_TYPES_STR "CPU, HW, M2M-VIDEO, M2M-IMAGE"
#define ENCODER_TYPES_STR "CPU, HW, M2M-VIDEO, M2M-IMAGE, NOOP"
typedef enum {
US_ENCODER_TYPE_CPU,
US_ENCODER_TYPE_HW,
US_ENCODER_TYPE_M2M_VIDEO,
US_ENCODER_TYPE_M2M_IMAGE,
US_ENCODER_TYPE_NOOP,
} us_encoder_type_e;
typedef struct {
us_encoder_type_e type;
unsigned quality;
bool cpu_forced;
uint quality;
pthread_mutex_t mutex;
unsigned n_m2ms;
uint n_m2ms;
us_m2m_encoder_s **m2ms;
us_workers_pool_s *pool;
@@ -68,7 +55,7 @@ typedef struct {
typedef struct {
us_encoder_type_e type;
unsigned n_workers;
uint n_workers;
char *m2m_path;
us_encoder_runtime_s *run;
@@ -90,4 +77,4 @@ const char *us_encoder_type_to_string(us_encoder_type_e type);
void us_encoder_open(us_encoder_s *enc, us_capture_s *cap);
void us_encoder_close(us_encoder_s *enc);
void us_encoder_get_runtime_params(us_encoder_s *enc, us_encoder_type_e *type, unsigned *quality);
void us_encoder_get_runtime_params(us_encoder_s *enc, us_encoder_type_e *type, uint *quality);

View File

@@ -40,7 +40,10 @@ static void _jpeg_set_dest_frame(j_compress_ptr jpeg, us_frame_s *frame);
static void _jpeg_write_scanlines_yuv(struct jpeg_compress_struct *jpeg, const us_frame_s *frame);
static void _jpeg_write_scanlines_rgb565(struct jpeg_compress_struct *jpeg, const us_frame_s *frame);
static void _jpeg_write_scanlines_rgb24(struct jpeg_compress_struct *jpeg, const us_frame_s *frame);
#ifndef JCS_EXTENSIONS
#warning JCS_EXT_BGR is not supported, please use libjpeg-turbo
static void _jpeg_write_scanlines_bgr24(struct jpeg_compress_struct *jpeg, const us_frame_s *frame);
#endif
static void _jpeg_init_destination(j_compress_ptr jpeg);
static boolean _jpeg_empty_output_buffer(j_compress_ptr jpeg);
@@ -67,6 +70,9 @@ void us_cpu_encoder_compress(const us_frame_s *src, us_frame_s *dest, unsigned q
case V4L2_PIX_FMT_YUYV:
case V4L2_PIX_FMT_YVYU:
case V4L2_PIX_FMT_UYVY: jpeg.in_color_space = JCS_YCbCr; break;
# ifdef JCS_EXTENSIONS
case V4L2_PIX_FMT_BGR24: jpeg.in_color_space = JCS_EXT_BGR; break;
# endif
default: jpeg.in_color_space = JCS_RGB; break;
}
@@ -82,7 +88,13 @@ void us_cpu_encoder_compress(const us_frame_s *src, us_frame_s *dest, unsigned q
case V4L2_PIX_FMT_UYVY: _jpeg_write_scanlines_yuv(&jpeg, src); break;
case V4L2_PIX_FMT_RGB565: _jpeg_write_scanlines_rgb565(&jpeg, src); break;
case V4L2_PIX_FMT_RGB24: _jpeg_write_scanlines_rgb24(&jpeg, src); break;
case V4L2_PIX_FMT_BGR24: _jpeg_write_scanlines_bgr24(&jpeg, src); break;
case V4L2_PIX_FMT_BGR24:
# ifdef JCS_EXTENSIONS
_jpeg_write_scanlines_rgb24(&jpeg, src); // Use native JCS_EXT_BGR
# else
_jpeg_write_scanlines_bgr24(&jpeg, src);
# endif
break;
default: assert(0 && "Unsupported input format for CPU encoder"); return;
}
@@ -196,6 +208,7 @@ static void _jpeg_write_scanlines_rgb24(struct jpeg_compress_struct *jpeg, const
}
}
#ifndef JCS_EXTENSIONS
static void _jpeg_write_scanlines_bgr24(struct jpeg_compress_struct *jpeg, const us_frame_s *frame) {
uint8_t *line_buf;
US_CALLOC(line_buf, frame->width * 3);
@@ -222,6 +235,7 @@ static void _jpeg_write_scanlines_bgr24(struct jpeg_compress_struct *jpeg, const
free(line_buf);
}
#endif
#define JPEG_OUTPUT_BUFFER_SIZE ((size_t)4096)

View File

@@ -1,65 +0,0 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
# #
# Copyright (C) 2018-2024 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#include "bev.h"
#include <string.h>
#include <errno.h>
#include <event2/util.h>
#include <event2/bufferevent.h>
#include "../../libs/tools.h"
char *us_bufferevent_format_reason(short what) {
char *reason;
US_CALLOC(reason, 2048);
// evutil_socket_error_to_string() is not thread-safe
char *const perror_str = us_errno_to_string(EVUTIL_SOCKET_ERROR());
bool first = true;
strncat(reason, perror_str, 1023);
free(perror_str);
strcat(reason, " (");
# define FILL_REASON(x_bev, x_name) { \
if (what & x_bev) { \
if (first) { \
first = false; \
} else { \
strcat(reason, ","); \
} \
strcat(reason, x_name); \
} \
}
FILL_REASON(BEV_EVENT_READING, "reading");
FILL_REASON(BEV_EVENT_WRITING, "writing");
FILL_REASON(BEV_EVENT_ERROR, "error");
FILL_REASON(BEV_EVENT_TIMEOUT, "timeout");
FILL_REASON(BEV_EVENT_EOF, "eof"); // cppcheck-suppress unreadVariable
# undef FILL_REASON
strcat(reason, ")");
return reason;
}

View File

@@ -1,26 +0,0 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
# #
# Copyright (C) 2018-2024 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#pragma once
char *us_bufferevent_format_reason(short what);

View File

@@ -40,7 +40,7 @@ char *us_simplify_request_path(const char *str) {
char pre1; // The one before
char pre2; // The one before that
char *simplified;
char *start;
const char *start;
char *out;
char *slash;

View File

@@ -56,7 +56,6 @@
#include "../../libs/tools.h"
#include "../../libs/threading.h"
#include "../../libs/logging.h"
#include "../../libs/process.h"
#include "../../libs/frame.h"
#include "../../libs/base64.h"
#include "../../libs/list.h"
@@ -68,9 +67,7 @@
# include "../gpio/gpio.h"
#endif
#include "bev.h"
#include "unix.h"
#include "uri.h"
#include "tools.h"
#include "mime.h"
#include "static.h"
#ifdef WITH_SYSTEMD
@@ -98,9 +95,6 @@ static void _http_send_snapshot(us_server_s *server);
static bool _expose_frame(us_server_s *server, const us_frame_s *frame);
static const char *_http_get_header(struct evhttp_request *request, const char *key);
static char *_http_get_client_hostport(struct evhttp_request *request);
#define _LOG_ERROR(x_msg, ...) US_LOG_ERROR("HTTP: " x_msg, ##__VA_ARGS__)
#define _LOG_PERROR(x_msg, ...) US_LOG_PERROR("HTTP: " x_msg, ##__VA_ARGS__)
@@ -203,8 +197,6 @@ int us_server_listen(us_server_s *server) {
}
us_frame_copy(stream->run->blank->jpeg, ex->frame);
ex->notify_last_width = ex->frame->width;
ex->notify_last_height = ex->frame->height;
{
struct timeval interval = {0};
@@ -282,8 +274,8 @@ static int _http_preprocess_request(struct evhttp_request *request, us_server_s
atomic_store(&server->stream->run->http->last_request_ts, us_get_now_monotonic());
if (server->allow_origin[0] != '\0') {
const char *const cors_headers = _http_get_header(request, "Access-Control-Request-Headers");
const char *const cors_method = _http_get_header(request, "Access-Control-Request-Method");
const char *const cors_headers = us_evhttp_get_header(request, "Access-Control-Request-Headers");
const char *const cors_method = us_evhttp_get_header(request, "Access-Control-Request-Method");
_A_ADD_HEADER(request, "Access-Control-Allow-Origin", server->allow_origin);
_A_ADD_HEADER(request, "Access-Control-Allow-Credentials", "true");
@@ -301,7 +293,7 @@ static int _http_preprocess_request(struct evhttp_request *request, us_server_s
}
if (run->auth_token != NULL) {
const char *const token = _http_get_header(request, "Authorization");
const char *const token = us_evhttp_get_header(request, "Authorization");
if (token == NULL || strcmp(token, run->auth_token) != 0) {
_A_ADD_HEADER(request, "WWW-Authenticate", "Basic realm=\"Restricted area\"");
evhttp_send_reply(request, 401, "Unauthorized", NULL);
@@ -593,7 +585,7 @@ static void _http_callback_stream(struct evhttp_request *request, void *v_server
struct evkeyvalq params;
evhttp_parse_query(evhttp_request_get_uri(request), &params);
# define PARSE_PARAM(x_type, x_name) client->x_name = us_uri_get_##x_type(&params, #x_name)
# define PARSE_PARAM(x_type, x_name) client->x_name = us_evkeyvalq_get_##x_type(&params, #x_name)
PARSE_PARAM(string, key);
PARSE_PARAM(true, extra_headers);
PARSE_PARAM(true, advance_headers);
@@ -602,7 +594,7 @@ static void _http_callback_stream(struct evhttp_request *request, void *v_server
# undef PARSE_PARAM
evhttp_clear_headers(&params);
client->hostport = _http_get_client_hostport(request);
client->hostport = us_evhttp_get_hostport(request);
client->id = us_get_now_id();
{
@@ -682,8 +674,8 @@ static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_c
_A_EVBUFFER_ADD_PRINTF(buf, "HTTP/1.0 200 OK" RN);
if (client->server->allow_origin[0] != '\0') {
const char *const cors_headers = _http_get_header(client->request, "Access-Control-Request-Headers");
const char *const cors_method = _http_get_header(client->request, "Access-Control-Request-Method");
const char *const cors_headers = us_evhttp_get_header(client->request, "Access-Control-Request-Headers");
const char *const cors_method = us_evhttp_get_header(client->request, "Access-Control-Request-Method");
_A_EVBUFFER_ADD_PRINTF(buf,
"Access-Control-Allow-Origin: %s" RN
@@ -957,21 +949,6 @@ static void _http_refresher(int fd, short what, void *v_server) {
_http_send_stream(server, stream_updated, frame_updated);
_http_send_snapshot(server);
if (
frame_updated
&& server->notify_parent
&& (
ex->notify_last_online != ex->frame->online
|| ex->notify_last_width != ex->frame->width
|| ex->notify_last_height != ex->frame->height
)
) {
ex->notify_last_online = ex->frame->online;
ex->notify_last_width = ex->frame->width;
ex->notify_last_height = ex->frame->height;
us_process_notify_parent();
}
}
static bool _expose_frame(us_server_s *server, const us_frame_s *frame) {
@@ -1016,39 +993,3 @@ static bool _expose_frame(us_server_s *server, const us_frame_s *frame) {
ex->frame->online, (ex->expose_end_ts - ex->expose_begin_ts));
return true; // Updated
}
static const char *_http_get_header(struct evhttp_request *request, const char *key) {
return evhttp_find_header(evhttp_request_get_input_headers(request), key);
}
static char *_http_get_client_hostport(struct evhttp_request *request) {
char *addr = NULL;
unsigned short port = 0;
struct evhttp_connection *conn = evhttp_request_get_connection(request);
if (conn != NULL) {
char *peer;
evhttp_connection_get_peer(conn, &peer, &port);
addr = us_strdup(peer);
}
const char *xff = _http_get_header(request, "X-Forwarded-For");
if (xff != NULL) {
US_DELETE(addr, free);
assert((addr = strndup(xff, 1024)) != NULL);
for (uint index = 0; addr[index]; ++index) {
if (addr[index] == ',') {
addr[index] = '\0';
break;
}
}
}
if (addr == NULL) {
addr = us_strdup("???");
}
char *hostport;
US_ASPRINTF(hostport, "[%s]:%u", addr, port);
free(addr);
return hostport;
}

View File

@@ -72,10 +72,6 @@ typedef struct {
ldf expose_begin_ts;
ldf expose_cmp_ts;
ldf expose_end_ts;
bool notify_last_online;
uint notify_last_width;
uint notify_last_height;
} us_server_exposed_s;
typedef struct {
@@ -121,8 +117,6 @@ typedef struct us_server_sx {
uint fake_width;
uint fake_height;
bool notify_parent;
us_server_runtime_s *run;
} us_server_s;

View File

@@ -20,7 +20,7 @@
*****************************************************************************/
#include "unix.h"
#include "tools.h"
#include <string.h>
#include <unistd.h>
@@ -33,6 +33,8 @@
#include <event2/http.h>
#include <event2/util.h>
#include <event2/keyvalq_struct.h>
#include <event2/bufferevent.h>
#include "../../libs/types.h"
#include "../../libs/tools.h"
@@ -79,3 +81,94 @@ evutil_socket_t us_evhttp_bind_unix(struct evhttp *http, const char *path, bool
}
return fd;
}
const char *us_evhttp_get_header(struct evhttp_request *request, const char *key) {
return evhttp_find_header(evhttp_request_get_input_headers(request), key);
}
char *us_evhttp_get_hostport(struct evhttp_request *request) {
char *addr = NULL;
unsigned short port = 0;
struct evhttp_connection *conn = evhttp_request_get_connection(request);
if (conn != NULL) {
char *peer;
evhttp_connection_get_peer(conn, &peer, &port);
addr = us_strdup(peer);
}
const char *xff = us_evhttp_get_header(request, "X-Forwarded-For");
if (xff != NULL) {
US_DELETE(addr, free);
assert((addr = strndup(xff, 1024)) != NULL);
for (uint index = 0; addr[index]; ++index) {
if (addr[index] == ',') {
addr[index] = '\0';
break;
}
}
}
if (addr == NULL) {
addr = us_strdup("???");
}
char *hostport;
US_ASPRINTF(hostport, "[%s]:%u", addr, port);
free(addr);
return hostport;
}
bool us_evkeyvalq_get_true(struct evkeyvalq *params, const char *key) {
const char *value_str = evhttp_find_header(params, key);
if (value_str != NULL) {
if (
value_str[0] == '1'
|| !evutil_ascii_strcasecmp(value_str, "true")
|| !evutil_ascii_strcasecmp(value_str, "yes")
) {
return true;
}
}
return false;
}
char *us_evkeyvalq_get_string(struct evkeyvalq *params, const char *key) {
const char *const value_str = evhttp_find_header(params, key);
if (value_str != NULL) {
return evhttp_encode_uri(value_str);
}
return NULL;
}
char *us_bufferevent_format_reason(short what) {
char *reason;
US_CALLOC(reason, 2048);
// evutil_socket_error_to_string() is not thread-safe
char *const perror_str = us_errno_to_string(EVUTIL_SOCKET_ERROR());
bool first = true;
strncat(reason, perror_str, 1023);
free(perror_str);
strcat(reason, " (");
# define FILL_REASON(x_bev, x_name) { \
if (what & x_bev) { \
if (first) { \
first = false; \
} else { \
strcat(reason, ","); \
} \
strcat(reason, x_name); \
} \
}
FILL_REASON(BEV_EVENT_READING, "reading");
FILL_REASON(BEV_EVENT_WRITING, "writing");
FILL_REASON(BEV_EVENT_ERROR, "error");
FILL_REASON(BEV_EVENT_TIMEOUT, "timeout");
FILL_REASON(BEV_EVENT_EOF, "eof"); // cppcheck-suppress unreadVariable
# undef FILL_REASON
strcat(reason, ")");
return reason;
}

View File

@@ -25,9 +25,17 @@
#include <sys/stat.h>
#include <event2/http.h>
#include <event2/util.h>
#include <event2/keyvalq_struct.h>
#include "../../libs/types.h"
evutil_socket_t us_evhttp_bind_unix(struct evhttp *http, const char *path, bool rm, mode_t mode);
const char *us_evhttp_get_header(struct evhttp_request *request, const char *key);
char *us_evhttp_get_hostport(struct evhttp_request *request);
bool us_evkeyvalq_get_true(struct evkeyvalq *params, const char *key);
char *us_evkeyvalq_get_string(struct evkeyvalq *params, const char *key);
char *us_bufferevent_format_reason(short what);

View File

@@ -90,6 +90,7 @@ int main(int argc, char *argv[]) {
_g_server = us_server_init(_g_stream);
if ((exit_code = options_parse(options, cap, enc, _g_stream, _g_server)) == 0) {
us_stream_update_blank(_g_stream, cap);
# ifdef WITH_GPIO
us_gpio_init();
# endif

View File

@@ -32,6 +32,7 @@ enum _US_OPT_VALUES {
_O_IO_METHOD = 'I',
_O_DESIRED_FPS = 'f',
_O_MIN_FRAME_SIZE = 'z',
_O_ALLOW_TRUNCATED_FRAMES = 'T',
_O_PERSISTENT = 'n',
_O_DV_TIMINGS = 't',
_O_BUFFERS = 'b',
@@ -142,6 +143,7 @@ static const struct option _LONG_OPTS[] = {
{"io-method", required_argument, NULL, _O_IO_METHOD},
{"desired-fps", required_argument, NULL, _O_DESIRED_FPS},
{"min-frame-size", required_argument, NULL, _O_MIN_FRAME_SIZE},
{"allow-truncated-frames", no_argument, NULL, _O_ALLOW_TRUNCATED_FRAMES},
{"persistent", no_argument, NULL, _O_PERSISTENT},
{"dv-timings", no_argument, NULL, _O_DV_TIMINGS},
{"buffers", required_argument, NULL, _O_BUFFERS},
@@ -353,7 +355,7 @@ int options_parse(us_options_s *options, us_capture_s *cap, us_encoder_s *enc, u
}
# define ADD_SINK(x_prefix) \
char *x_prefix##_name = NULL; \
const char *x_prefix##_name = NULL; \
mode_t x_prefix##_mode = 0660; \
bool x_prefix##_rm = false; \
unsigned x_prefix##_client_ttl = 10; \
@@ -364,7 +366,7 @@ int options_parse(us_options_s *options, us_capture_s *cap, us_encoder_s *enc, u
# undef ADD_SINK
# ifdef WITH_SETPROCTITLE
char *process_name_prefix = NULL;
const char *process_name_prefix = NULL;
# endif
char short_opts[128];
@@ -384,6 +386,7 @@ int options_parse(us_options_s *options, us_capture_s *cap, us_encoder_s *enc, u
case _O_IO_METHOD: OPT_PARSE_ENUM("IO method", cap->io_method, us_capture_parse_io_method, US_IO_METHODS_STR);
case _O_DESIRED_FPS: OPT_NUMBER("--desired-fps", cap->desired_fps, 0, US_VIDEO_MAX_FPS, 0);
case _O_MIN_FRAME_SIZE: OPT_NUMBER("--min-frame-size", cap->min_frame_size, 1, 8192, 0);
case _O_ALLOW_TRUNCATED_FRAMES: OPT_SET(cap->allow_truncated_frames, true);
case _O_PERSISTENT: OPT_SET(cap->persistent, true);
case _O_DV_TIMINGS: OPT_SET(cap->dv_timings, true);
case _O_BUFFERS: OPT_NUMBER("--buffers", cap->n_bufs, 1, 32, 0);
@@ -491,7 +494,7 @@ int options_parse(us_options_s *options, us_capture_s *cap, us_encoder_s *enc, u
# ifdef WITH_SETPROCTITLE
case _O_PROCESS_NAME_PREFIX: OPT_SET(process_name_prefix, optarg);
# endif
case _O_NOTIFY_PARENT: OPT_SET(server->notify_parent, true);
case _O_NOTIFY_PARENT: OPT_SET(stream->notify_parent, true);
case _O_LOG_LEVEL: OPT_NUMBER("--log-level", us_g_log_level, US_LOG_LEVEL_INFO, US_LOG_LEVEL_DEBUG, 0);
case _O_PERF: OPT_SET(us_g_log_level, US_LOG_LEVEL_PERF);
@@ -632,6 +635,8 @@ static void _help(FILE *fp, const us_capture_s *cap, const us_encoder_s *enc, co
SAY(" -f|--desired-fps <N> ──────────────── Desired FPS. Default: maximum possible.\n");
SAY(" -z|--min-frame-size <N> ───────────── Drop frames smaller then this limit. Useful if the device");
SAY(" produces small-sized garbage frames. Default: %zu bytes.\n", cap->min_frame_size);
SAY(" -T|--allow-truncated-frames ───────── Allows to handle truncated frames. Useful if the device");
SAY(" produces incorrect but still acceptable frames. Default: disabled.\n");
SAY(" -n|--persistent ───────────────────── Don't re-initialize device on timeout. Default: disabled.\n");
SAY(" -t|--dv-timings ───────────────────── Enable DV-timings querying and events processing");
SAY(" to automatic resolution change. Default: disabled.\n");
@@ -651,8 +656,7 @@ static void _help(FILE *fp, const us_capture_s *cap, const us_encoder_s *enc, co
SAY(" * CPU ──────── Software MJPEG encoding (default);");
SAY(" * HW ───────── Use pre-encoded MJPEG frames directly from camera hardware;");
SAY(" * M2M-VIDEO ── GPU-accelerated MJPEG encoding using V4L2 M2M video interface;");
SAY(" * M2M-IMAGE ── GPU-accelerated JPEG encoding using V4L2 M2M image interface;");
SAY(" * NOOP ─────── Don't compress MJPEG stream (do nothing).\n");
SAY(" * M2M-IMAGE ── GPU-accelerated JPEG encoding using V4L2 M2M image interface.\n");
SAY(" -g|--glitched-resolutions <WxH,...> ─ It doesn't do anything. Still here for compatibility.\n");
SAY(" -k|--blank <path> ─────────────────── It doesn't do anything. Still here for compatibility.\n");
SAY(" -K|--last-as-blank <sec> ──────────── It doesn't do anything. Still here for compatibility.\n");

View File

@@ -85,6 +85,7 @@ static us_capture_hwbuf_s *_get_latest_hw(us_queue_s *queue);
static bool _stream_has_jpeg_clients_cached(us_stream_s *stream);
static bool _stream_has_any_clients_cached(us_stream_s *stream);
static int _stream_init_loop(us_stream_s *stream);
static void _stream_update_captured_fpsi(us_stream_s *stream, const us_frame_s *frame, bool bump);
#ifdef WITH_V4P
static void _stream_drm_ensure_no_signal(us_stream_s *stream);
#endif
@@ -122,13 +123,17 @@ us_stream_s *us_stream_init(us_capture_s *cap, us_encoder_s *enc) {
stream->h264_gop = 30;
stream->run = run;
us_blank_draw(run->blank, "< NO SIGNAL >", cap->width, cap->height);
us_fpsi_meta_s meta = {0};
us_fpsi_frame_to_meta(run->blank->raw, &meta);
us_fpsi_update(http->captured_fpsi, false, &meta);
us_stream_update_blank(stream, cap); // Init blank
return stream;
}
void us_stream_update_blank(us_stream_s *stream, const us_capture_s *cap) {
us_stream_runtime_s *const run = stream->run;
us_blank_draw(run->blank, "< NO SIGNAL >", cap->width, cap->height);
us_fpsi_frame_to_meta(run->blank->raw, &run->notify_meta); // Initial "unchanged" meta
_stream_update_captured_fpsi(stream, run->blank->raw, false);
}
void us_stream_destroy(us_stream_s *stream) {
us_fpsi_destroy(stream->run->http->captured_fpsi);
US_RING_DELETE_WITH_ITEMS(stream->run->http->jpeg_ring, us_frame_destroy);
@@ -200,9 +205,7 @@ void us_stream_loop(us_stream_s *stream) {
default: goto close; // Any error
}
us_fpsi_meta_s meta = {0};
us_fpsi_frame_to_meta(&hw->raw, &meta);
us_fpsi_update(run->http->captured_fpsi, true, &meta);
_stream_update_captured_fpsi(stream, &hw->raw, true);
# ifdef WITH_GPIO
us_gpio_set_stream_online(true);
@@ -313,23 +316,23 @@ static void *_jpeg_thread(void *v_ctx) {
uint fluency_passed = 0;
while (!atomic_load(ctx->stop)) {
us_worker_s *const ready_wr = us_workers_pool_wait(stream->enc->run->pool);
us_encoder_job_s *const ready_job = ready_wr->job;
us_worker_s *const wr = us_workers_pool_wait(stream->enc->run->pool);
us_encoder_job_s *const job = wr->job;
if (ready_job->hw != NULL) {
us_capture_hwbuf_decref(ready_job->hw);
ready_job->hw = NULL;
if (ready_wr->job_failed) {
if (job->hw != NULL) {
us_capture_hwbuf_decref(job->hw);
job->hw = NULL;
if (wr->job_failed) {
// pass
} else if (ready_wr->job_timely) {
_stream_expose_jpeg(stream, ready_job->dest);
} else if (wr->job_timely) {
_stream_expose_jpeg(stream, job->dest);
if (atomic_load(&stream->run->http->snapshot_requested) > 0) { // Process real snapshots
atomic_fetch_sub(&stream->run->http->snapshot_requested, 1);
}
US_LOG_PERF("JPEG: ##### Encoded JPEG exposed; worker=%s, latency=%.3Lf",
ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts);
wr->name, us_get_now_monotonic() - job->dest->grab_ts);
} else {
US_LOG_PERF("JPEG: ----- Encoded JPEG dropped; worker=%s", ready_wr->name);
US_LOG_PERF("JPEG: ----- Encoded JPEG dropped; worker=%s", wr->name);
}
}
@@ -355,13 +358,13 @@ static void *_jpeg_thread(void *v_ctx) {
}
fluency_passed = 0;
const ldf fluency_delay = us_workers_pool_get_fluency_delay(stream->enc->run->pool, ready_wr);
const ldf fluency_delay = us_workers_pool_get_fluency_delay(stream->enc->run->pool, wr);
grab_after_ts = now_ts + fluency_delay;
US_LOG_VERBOSE("JPEG: Fluency: delay=%.03Lf, grab_after=%.03Lf", fluency_delay, grab_after_ts);
ready_job->hw = hw;
us_workers_pool_assign(stream->enc->run->pool, ready_wr);
US_LOG_DEBUG("JPEG: Assigned new frame in buffer=%d to worker=%s", hw->buf.index, ready_wr->name);
job->hw = hw;
us_workers_pool_assign(stream->enc->run->pool, wr);
US_LOG_DEBUG("JPEG: Assigned new frame in buffer=%d to worker=%s", hw->buf.index, wr->name);
}
return NULL;
}
@@ -576,10 +579,7 @@ static int _stream_init_loop(us_stream_s *stream) {
}
us_blank_draw(run->blank, "< NO SIGNAL >", width, height);
us_fpsi_meta_s meta = {0};
us_fpsi_frame_to_meta(run->blank->raw, &meta);
us_fpsi_update(run->http->captured_fpsi, false, &meta);
_stream_update_captured_fpsi(stream, run->blank->raw, false);
_stream_expose_jpeg(stream, run->blank->jpeg);
_stream_expose_raw(stream, run->blank->raw);
_stream_encode_expose_h264(stream, run->blank->raw, true);
@@ -594,6 +594,19 @@ static int _stream_init_loop(us_stream_s *stream) {
return -1;
}
static void _stream_update_captured_fpsi(us_stream_s *stream, const us_frame_s *frame, bool bump) {
us_stream_runtime_s *const run = stream->run;
us_fpsi_meta_s meta = {0};
us_fpsi_frame_to_meta(frame, &meta);
us_fpsi_update(run->http->captured_fpsi, bump, &meta);
if (stream->notify_parent && memcmp(&run->notify_meta, &meta, sizeof(us_fpsi_meta_s))) {
memcpy(&run->notify_meta, &meta, sizeof(us_fpsi_meta_s));
us_process_notify_parent();
}
}
#ifdef WITH_V4P
static void _stream_drm_ensure_no_signal(us_stream_s *stream) {
if (stream->drm == NULL) {

View File

@@ -68,6 +68,8 @@ typedef struct {
us_blank_s *blank;
us_fpsi_meta_s notify_meta;
atomic_bool stop;
} us_stream_runtime_s;
@@ -75,6 +77,7 @@ typedef struct {
us_capture_s *cap;
us_encoder_s *enc;
bool notify_parent;
bool slowdown;
uint error_delay;
uint exit_on_no_clients;
@@ -96,6 +99,7 @@ typedef struct {
us_stream_s *us_stream_init(us_capture_s *cap, us_encoder_s *enc);
void us_stream_update_blank(us_stream_s *stream, const us_capture_s *cap);
void us_stream_destroy(us_stream_s *stream);
void us_stream_loop(us_stream_s *stream);

View File

@@ -22,12 +22,22 @@
#include "workers.h"
#include <stdatomic.h>
#include <pthread.h>
#include "../libs/types.h"
#include "../libs/tools.h"
#include "../libs/threading.h"
#include "../libs/logging.h"
#include "../libs/list.h"
static void *_worker_thread(void *v_worker);
us_workers_pool_s *us_workers_pool_init(
const char *name, const char *wr_prefix, unsigned n_workers, long double desired_interval,
const char *name, const char *wr_prefix, uint n_workers, ldf desired_interval,
us_workers_pool_job_init_f job_init, void *job_init_arg,
us_workers_pool_job_destroy_f job_destroy,
us_workers_pool_run_job_f run_job) {
@@ -44,28 +54,28 @@ us_workers_pool_s *us_workers_pool_init(
atomic_init(&pool->stop, false);
pool->n_workers = n_workers;
US_CALLOC(pool->workers, pool->n_workers);
US_MUTEX_INIT(pool->free_workers_mutex);
US_COND_INIT(pool->free_workers_cond);
for (unsigned number = 0; number < pool->n_workers; ++number) {
# define WR(x_next) pool->workers[number].x_next
for (uint index = 0; index < pool->n_workers; ++index) {
us_worker_s *wr;
US_CALLOC(wr, 1);
WR(number) = number;
US_ASPRINTF(WR(name), "%s-%u", wr_prefix, number);
wr->number = index;
US_ASPRINTF(wr->name, "%s-%u", wr_prefix, index);
US_MUTEX_INIT(WR(has_job_mutex));
atomic_init(&WR(has_job), false);
US_COND_INIT(WR(has_job_cond));
US_MUTEX_INIT(wr->has_job_mutex);
atomic_init(&wr->has_job, false);
US_COND_INIT(wr->has_job_cond);
WR(pool) = pool;
WR(job) = job_init(job_init_arg);
wr->pool = pool;
wr->job = job_init(job_init_arg);
US_THREAD_CREATE(WR(tid), _worker_thread, (void*)&(pool->workers[number]));
US_THREAD_CREATE(wr->tid, _worker_thread, (void*)wr);
pool->free_workers += 1;
# undef WR
US_LIST_APPEND(pool->workers, wr);
}
return pool;
}
@@ -74,98 +84,70 @@ void us_workers_pool_destroy(us_workers_pool_s *pool) {
US_LOG_INFO("Destroying workers pool %s ...", pool->name);
atomic_store(&pool->stop, true);
for (unsigned number = 0; number < pool->n_workers; ++number) {
# define WR(x_next) pool->workers[number].x_next
US_LIST_ITERATE(pool->workers, wr, { // cppcheck-suppress constStatement
US_MUTEX_LOCK(wr->has_job_mutex);
atomic_store(&wr->has_job, true); // Final job: die
US_MUTEX_UNLOCK(wr->has_job_mutex);
US_COND_SIGNAL(wr->has_job_cond);
US_MUTEX_LOCK(WR(has_job_mutex));
atomic_store(&WR(has_job), true); // Final job: die
US_MUTEX_UNLOCK(WR(has_job_mutex));
US_COND_SIGNAL(WR(has_job_cond));
US_THREAD_JOIN(wr->tid);
US_MUTEX_DESTROY(wr->has_job_mutex);
US_COND_DESTROY(wr->has_job_cond);
US_THREAD_JOIN(WR(tid));
US_MUTEX_DESTROY(WR(has_job_mutex));
US_COND_DESTROY(WR(has_job_cond));
pool->job_destroy(wr->job);
free(WR(name));
pool->job_destroy(WR(job));
# undef WR
}
free(wr->name);
free(wr);
});
US_MUTEX_DESTROY(pool->free_workers_mutex);
US_COND_DESTROY(pool->free_workers_cond);
free(pool->workers);
free(pool);
}
us_worker_s *us_workers_pool_wait(us_workers_pool_s *pool) {
us_worker_s *ready_wr = NULL;
US_MUTEX_LOCK(pool->free_workers_mutex);
US_COND_WAIT_FOR(pool->free_workers, pool->free_workers_cond, pool->free_workers_mutex);
US_MUTEX_UNLOCK(pool->free_workers_mutex);
if (pool->oldest_wr && !atomic_load(&pool->oldest_wr->has_job)) {
ready_wr = pool->oldest_wr;
ready_wr->job_timely = true;
pool->oldest_wr = pool->oldest_wr->next_wr;
} else {
for (unsigned number = 0; number < pool->n_workers; ++number) {
if (
!atomic_load(&pool->workers[number].has_job) && (
ready_wr == NULL
|| ready_wr->job_start_ts < pool->workers[number].job_start_ts
)
) {
ready_wr = &pool->workers[number];
break;
}
us_worker_s *found = NULL;
US_LIST_ITERATE(pool->workers, wr, { // cppcheck-suppress constStatement
if (!atomic_load(&wr->has_job) && (found == NULL || found->job_start_ts <= wr->job_start_ts)) {
found = wr;
}
assert(ready_wr != NULL);
ready_wr->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате)
});
assert(found != NULL);
US_LIST_REMOVE(pool->workers, found);
US_LIST_APPEND(pool->workers, found); // Перемещаем в конец списка
found->job_timely = (found->job_start_ts > pool->job_timely_ts);
if (found->job_timely) {
pool->job_timely_ts = found->job_start_ts;
}
return ready_wr;
return found;
}
void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr/*, void *job*/) {
if (pool->oldest_wr == NULL) {
pool->oldest_wr = ready_wr;
pool->latest_wr = pool->oldest_wr;
} else {
if (ready_wr->next_wr != NULL) {
ready_wr->next_wr->prev_wr = ready_wr->prev_wr;
}
if (ready_wr->prev_wr != NULL) {
ready_wr->prev_wr->next_wr = ready_wr->next_wr;
}
ready_wr->prev_wr = pool->latest_wr;
pool->latest_wr->next_wr = ready_wr;
pool->latest_wr = ready_wr;
}
pool->latest_wr->next_wr = NULL;
US_MUTEX_LOCK(ready_wr->has_job_mutex);
//ready_wr->job = job;
atomic_store(&ready_wr->has_job, true);
US_MUTEX_UNLOCK(ready_wr->has_job_mutex);
US_COND_SIGNAL(ready_wr->has_job_cond);
void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *wr) {
US_MUTEX_LOCK(wr->has_job_mutex);
atomic_store(&wr->has_job, true);
US_MUTEX_UNLOCK(wr->has_job_mutex);
US_COND_SIGNAL(wr->has_job_cond);
US_MUTEX_LOCK(pool->free_workers_mutex);
pool->free_workers -= 1;
US_MUTEX_UNLOCK(pool->free_workers_mutex);
}
long double us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr) {
const long double approx_job_time = pool->approx_job_time * 0.9 + ready_wr->last_job_time * 0.1;
ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *wr) {
const ldf approx_job_time = pool->approx_job_time * 0.9 + wr->last_job_time * 0.1;
US_LOG_VERBOSE("Correcting pool's %s approx_job_time: %.3Lf -> %.3Lf (last_job_time=%.3Lf)",
pool->name, pool->approx_job_time, approx_job_time, ready_wr->last_job_time);
pool->name, pool->approx_job_time, approx_job_time, wr->last_job_time);
pool->approx_job_time = approx_job_time;
const long double min_delay = pool->approx_job_time / pool->n_workers; // Среднее время работы размазывается на N воркеров
const ldf min_delay = pool->approx_job_time / pool->n_workers; // Среднее время работы размазывается на N воркеров
if (pool->desired_interval > 0 && min_delay > 0 && pool->desired_interval > min_delay) {
// Искусственное время задержки на основе желаемого FPS, если включен --desired-fps
@@ -176,7 +158,7 @@ long double us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_
}
static void *_worker_thread(void *v_worker) {
us_worker_s *wr = v_worker;
us_worker_s *const wr = v_worker;
US_THREAD_SETTLE("%s", wr->name);
US_LOG_DEBUG("Hello! I am a worker %s ^_^", wr->name);
@@ -189,13 +171,12 @@ static void *_worker_thread(void *v_worker) {
US_MUTEX_UNLOCK(wr->has_job_mutex);
if (!atomic_load(&wr->pool->stop)) {
const long double job_start_ts = us_get_now_monotonic();
const ldf job_start_ts = us_get_now_monotonic();
wr->job_failed = !wr->pool->run_job(wr);
if (!wr->job_failed) {
wr->job_start_ts = job_start_ts;
wr->last_job_time = us_get_now_monotonic() - wr->job_start_ts;
}
//wr->job = NULL;
atomic_store(&wr->has_job, false);
}

View File

@@ -22,37 +22,32 @@
#pragma once
#include <stdbool.h>
#include <stdatomic.h>
#include <sys/types.h>
#include <pthread.h>
#include "../libs/tools.h"
#include "../libs/threading.h"
#include "../libs/logging.h"
#include "../libs/types.h"
#include "../libs/list.h"
typedef struct us_worker_sx {
pthread_t tid;
unsigned number;
char *name;
pthread_t tid;
uint number;
char *name;
long double last_job_time;
ldf last_job_time;
pthread_mutex_t has_job_mutex;
void *job;
atomic_bool has_job;
bool job_timely;
bool job_failed;
long double job_start_ts;
ldf job_start_ts;
pthread_cond_t has_job_cond;
struct us_worker_sx *prev_wr;
struct us_worker_sx *next_wr;
struct us_workers_pool_sx *pool;
US_LIST_DECLARE;
} us_worker_s;
typedef void *(*us_workers_pool_job_init_f)(void *arg);
@@ -61,20 +56,19 @@ typedef bool (*us_workers_pool_run_job_f)(us_worker_s *wr);
typedef struct us_workers_pool_sx {
const char *name;
long double desired_interval;
ldf desired_interval;
us_workers_pool_job_destroy_f job_destroy;
us_workers_pool_run_job_f run_job;
unsigned n_workers;
uint n_workers;
us_worker_s *workers;
us_worker_s *oldest_wr;
us_worker_s *latest_wr;
ldf job_timely_ts;
long double approx_job_time;
ldf approx_job_time;
pthread_mutex_t free_workers_mutex;
unsigned free_workers;
uint free_workers;
pthread_cond_t free_workers_cond;
atomic_bool stop;
@@ -82,7 +76,7 @@ typedef struct us_workers_pool_sx {
us_workers_pool_s *us_workers_pool_init(
const char *name, const char *wr_prefix, unsigned n_workers, long double desired_interval,
const char *name, const char *wr_prefix, uint n_workers, ldf desired_interval,
us_workers_pool_job_init_f job_init, void *job_init_arg,
us_workers_pool_job_destroy_f job_destroy,
us_workers_pool_run_job_f run_job);
@@ -90,6 +84,6 @@ us_workers_pool_s *us_workers_pool_init(
void us_workers_pool_destroy(us_workers_pool_s *pool);
us_worker_s *us_workers_pool_wait(us_workers_pool_s *pool);
void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr/*, void *job*/);
void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr);
long double us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr);
ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr);