Compare commits

...

17 Commits
v5.56 ... v5.58

Author SHA1 Message Date
Maxim Devaev
c32ea286f2 Bump version: 5.57 → 5.58 2024-03-02 19:11:06 +02:00
Maxim Devaev
b2fb857f5b refactoring 2024-03-02 19:09:53 +02:00
Maxim Devaev
20cdabc8a4 lint fix 2024-03-02 10:42:30 +02:00
Maxim Devaev
e2f4c193e3 refactoring 2024-03-02 10:08:06 +02:00
Maxim Devaev
b4aa9593dc refactoring 2024-03-02 09:59:41 +02:00
Maxim Devaev
20c729893b refactoring 2024-03-02 09:56:44 +02:00
Maxim Devaev
a00f49331c wait (select) device in grab function 2024-03-02 09:33:45 +02:00
Maxim Devaev
85308e48fd stream: null hw buffer pointer after encoding 2024-03-02 07:54:38 +02:00
Maxim Devaev
ff08a0fb25 refactoring 2024-03-02 07:23:18 +02:00
Maxim Devaev
6145b69c97 refactoring 2024-03-02 02:09:54 +02:00
Maxim Devaev
cfc5ae1b94 v4p: using /dev/dri/by-path/platform-gpu-card instead of card0 2024-03-02 01:14:15 +02:00
Maxim Devaev
54b221aabd moved exit_on_no_clients logic from http to stream loop 2024-03-01 09:40:51 +02:00
Maxim Devaev
dabee9d47a gitignores ustreamer-v4p 2024-03-01 08:22:14 +02:00
Maxim Devaev
e30520d9f3 refactoring 2024-03-01 08:11:37 +02:00
Maxim Devaev
8f0acb2176 Bump version: 5.56 → 5.57 2024-03-01 04:33:13 +02:00
Maxim Devaev
8edeff8160 fixed makefile 2024-03-01 04:21:45 +02:00
Maxim Devaev
cacec0d25c refactoring 2024-03-01 04:09:16 +02:00
22 changed files with 323 additions and 322 deletions

View File

@@ -1,7 +1,7 @@
[bumpversion]
commit = True
tag = True
current_version = 5.56
current_version = 5.58
parse = (?P<major>\d+)\.(?P<minor>\d+)
serialize =
{major}.{minor}

2
.gitignore vendored
View File

@@ -7,7 +7,7 @@
/python/ustreamer.egg-info/
/janus/build/
/ustreamer
/ustreamer-dump
/ustreamer-*
/config.mk
vgcore.*
*.sock

View File

@@ -36,8 +36,8 @@ endif
apps:
$(MAKE) -C src
for i in src/ustreamer.bin src/ustreamer-*.bin; do \
test ! -x $$i || ln -sf $$i .; \
for i in src/*.bin; do \
test ! -x $$i || ln -sf $$i `basename $$i .bin`; \
done

View File

@@ -170,7 +170,7 @@ int us_audio_get_encoded(us_audio_s *audio, uint8_t *data, size_t *size, uint64_
if (ri < 0) {
return -2;
}
_enc_buffer_s *const buf = audio->enc_ring->items[ri];
const _enc_buffer_s *const buf = audio->enc_ring->items[ri];
if (*size < buf->used) {
us_ring_consumer_release(audio->enc_ring, ri);
return -3;

View File

@@ -107,7 +107,7 @@ static void *_video_rtp_thread(void *arg) {
while (!_STOP) {
const int ri = us_ring_consumer_acquire(_g_video_ring, 0.1);
if (ri >= 0) {
us_frame_s *frame = _g_video_ring->items[ri];
const us_frame_s *const frame = _g_video_ring->items[ri];
_LOCK_VIDEO;
const bool zero_playout_delay = (frame->gop == 0);
us_rtpv_wrap(_g_rtpv, frame, zero_playout_delay);

View File

@@ -1,6 +1,6 @@
.\" Manpage for ustreamer-dump.
.\" Open an issue or pull request to https://github.com/pikvm/ustreamer to correct errors or typos
.TH USTREAMER-DUMP 1 "version 5.56" "January 2021"
.TH USTREAMER-DUMP 1 "version 5.58" "January 2021"
.SH NAME
ustreamer-dump \- Dump uStreamer's memory sink to file

View File

@@ -1,6 +1,6 @@
.\" Manpage for ustreamer.
.\" Open an issue or pull request to https://github.com/pikvm/ustreamer to correct errors or typos
.TH USTREAMER 1 "version 5.56" "November 2020"
.TH USTREAMER 1 "version 5.58" "November 2020"
.SH NAME
ustreamer \- stream MJPEG video from any V4L2 device to the network

View File

@@ -3,7 +3,7 @@
pkgname=ustreamer
pkgver=5.56
pkgver=5.58
pkgrel=1
pkgdesc="Lightweight and fast MJPEG-HTTP streamer"
url="https://github.com/pikvm/ustreamer"

View File

@@ -6,7 +6,7 @@
include $(TOPDIR)/rules.mk
PKG_NAME:=ustreamer
PKG_VERSION:=5.56
PKG_VERSION:=5.58
PKG_RELEASE:=1
PKG_MAINTAINER:=Maxim Devaev <mdevaev@gmail.com>

View File

@@ -17,7 +17,7 @@ def _find_sources(suffix: str) -> list[str]:
if __name__ == "__main__":
setup(
name="ustreamer",
version="5.56",
version="5.58",
description="uStreamer tools",
author="Maxim Devaev",
author_email="mdevaev@gmail.com",

View File

@@ -26,7 +26,7 @@
#define US_VERSION_MAJOR 5
#define US_VERSION_MINOR 56
#define US_VERSION_MINOR 58
#define US_MAKE_VERSION2(_major, _minor) #_major "." #_minor
#define US_MAKE_VERSION1(_major, _minor) US_MAKE_VERSION2(_major, _minor)

View File

@@ -80,7 +80,8 @@ static const struct {
{"USERPTR", V4L2_MEMORY_USERPTR},
};
static int _device_wait_buffer(us_device_s *dev);
static int _device_consume_event(us_device_s *dev);
static void _v4l2_buffer_copy(const struct v4l2_buffer *src, struct v4l2_buffer *dest);
static bool _device_is_buffer_valid(us_device_s *dev, const struct v4l2_buffer *buf, const u8 *data);
static int _device_open_check_cap(us_device_s *dev);
@@ -149,16 +150,16 @@ int us_device_parse_format(const char *str) {
return item->format;
}
});
return US_FORMAT_UNKNOWN;
return -1;
}
v4l2_std_id us_device_parse_standard(const char *str) {
US_ARRAY_ITERATE(_STANDARDS, 1, item, {
int us_device_parse_standard(const char *str) {
US_ARRAY_ITERATE(_STANDARDS, 0, item, {
if (!strcasecmp(item->name, str)) {
return item->standard;
}
});
return US_STANDARD_UNKNOWN;
return -1;
}
int us_device_parse_io_method(const char *str) {
@@ -167,7 +168,7 @@ int us_device_parse_io_method(const char *str) {
return item->io_method;
}
});
return US_IO_METHOD_UNKNOWN;
return -1;
}
int us_device_open(us_device_s *dev) {
@@ -258,52 +259,23 @@ void us_device_close(us_device_s *dev) {
run->persistent_timeout_reported = false;
}
int us_device_select(us_device_s *dev, bool *has_read, bool *has_error) {
us_device_runtime_s *const run = dev->run;
# define INIT_FD_SET(x_set) \
fd_set x_set; FD_ZERO(&x_set); FD_SET(run->fd, &x_set);
INIT_FD_SET(read_fds);
INIT_FD_SET(error_fds);
# undef INIT_FD_SET
// Раньше мы проверяли и has_write, но потом выяснилось, что libcamerify зачем-то
// генерирует эвенты на запись, вероятно ошибочно. Судя по всему, игнорирование
// has_write не делает никому плохо.
struct timeval timeout;
timeout.tv_sec = dev->timeout;
timeout.tv_usec = 0;
_D_LOG_DEBUG("Calling select() on video device ...");
int retval = select(run->fd + 1, &read_fds, NULL, &error_fds, &timeout);
if (retval > 0) {
*has_read = FD_ISSET(run->fd, &read_fds);
*has_error = FD_ISSET(run->fd, &error_fds);
} else {
*has_read = false;
*has_error = false;
}
_D_LOG_DEBUG("Device select() --> %d; has_read=%d, has_error=%d", retval, *has_read, *has_error);
if (retval > 0) {
run->persistent_timeout_reported = false;
} else if (retval == 0) {
if (dev->persistent) {
if (!run->persistent_timeout_reported) {
_D_LOG_ERROR("Persistent device timeout (unplugged)");
run->persistent_timeout_reported = true;
}
} else {
// Если устройство не персистентное, то таймаут является ошибкой
retval = -1;
}
}
return retval;
}
int us_device_grab_buffer(us_device_s *dev, us_hw_buffer_s **hw) {
// Это сложная функция, которая делает сразу много всего, чтобы получить новый фрейм.
// - Вызывается _device_wait_buffer() с select() внутри, чтобы подождать новый фрейм
// или эвент V4L2. Обработка эвентов более приоритетна, чем кадров.
// - При таймауте select() в _device_wait_buffer() возвращаем -2 для персистентных
// устройств типа TC358743, для остальных же возвращаем ошибку -1.
// - Если есть новые фреймы, то пропустить их все, пока не закончатся и вернуть
// самый-самый свежий, содержащий при этом валидные данные.
// - Если таковых не нашлось, вернуть -3.
// - Ошибка -1 возвращается при любых сбоях.
switch (_device_wait_buffer(dev)) {
case 0: break; // New frame
case -2: return -2; // Persistent timeout
default: return -1; // Error
}
us_device_runtime_s *const run = dev->run;
*hw = NULL;
@@ -384,7 +356,7 @@ int us_device_grab_buffer(us_device_s *dev, us_hw_buffer_s **hw) {
if (buf_got) {
break; // Process any latest valid frame
} else if (broken) {
return -2; // If we have only broken frames on this capture session
return -3; // If we have only broken frames on this capture session
}
}
_D_LOG_PERROR("Can't grab device buffer");
@@ -419,20 +391,71 @@ int us_device_release_buffer(us_device_s *dev, us_hw_buffer_s *hw) {
return 0;
}
int us_device_consume_event(us_device_s *dev) {
struct v4l2_event event;
_D_LOG_INFO("Consuming V4L2 event ...");
if (us_xioctl(dev->run->fd, VIDIOC_DQEVENT, &event) == 0) {
switch (event.type) {
case V4L2_EVENT_SOURCE_CHANGE:
_D_LOG_INFO("Got V4L2_EVENT_SOURCE_CHANGE: source changed");
return -1;
case V4L2_EVENT_EOS:
_D_LOG_INFO("Got V4L2_EVENT_EOS: end of stream (ignored)");
return 0;
int _device_wait_buffer(us_device_s *dev) {
us_device_runtime_s *const run = dev->run;
# define INIT_FD_SET(x_set) \
fd_set x_set; FD_ZERO(&x_set); FD_SET(run->fd, &x_set);
INIT_FD_SET(read_fds);
INIT_FD_SET(error_fds);
# undef INIT_FD_SET
// Раньше мы проверяли и has_write, но потом выяснилось, что libcamerify зачем-то
// генерирует эвенты на запись, вероятно ошибочно. Судя по всему, игнорирование
// has_write не делает никому плохо.
struct timeval timeout;
timeout.tv_sec = dev->timeout;
timeout.tv_usec = 0;
_D_LOG_DEBUG("Calling select() on video device ...");
bool has_read = false;
bool has_error = false;
const int selected = select(run->fd + 1, &read_fds, NULL, &error_fds, &timeout);
if (selected > 0) {
has_read = FD_ISSET(run->fd, &read_fds);
has_error = FD_ISSET(run->fd, &error_fds);
}
_D_LOG_DEBUG("Device select() --> %d; has_read=%d, has_error=%d", selected, has_read, has_error);
if (selected < 0) {
if (errno != EINTR) {
_D_LOG_PERROR("Device select() error");
}
return -1;
} else if (selected == 0) {
if (!dev->persistent) {
// Если устройство не персистентное, то таймаут является ошибкой
return -1;
}
if (!run->persistent_timeout_reported) {
_D_LOG_ERROR("Persistent device timeout (unplugged)");
run->persistent_timeout_reported = true;
}
return -2; // Таймаут, нет новых фреймов
} else {
_D_LOG_PERROR("Got some V4L2 device event, but where is it? ");
run->persistent_timeout_reported = false;
if (has_error && _device_consume_event(dev) < 0) {
return -1; // Restart required
}
}
return 0;
}
static int _device_consume_event(us_device_s *dev) {
struct v4l2_event event;
if (us_xioctl(dev->run->fd, VIDIOC_DQEVENT, &event) < 0) {
_D_LOG_PERROR("Can't consume V4L2 event");
return -1;
}
switch (event.type) {
case V4L2_EVENT_SOURCE_CHANGE:
_D_LOG_INFO("Got V4L2_EVENT_SOURCE_CHANGE: Source changed");
return -1;
case V4L2_EVENT_EOS:
_D_LOG_INFO("Got V4L2_EVENT_EOS: End of stream");
return -1;
}
return 0;
}
@@ -659,7 +682,7 @@ static int _device_open_format(us_device_s *dev, bool first) {
_format_to_string_supported(FMT(pixelformat)));
char *format_str;
if ((format_str = (char *)_format_to_string_nullable(FMT(pixelformat))) != NULL) {
if ((format_str = (char*)_format_to_string_nullable(FMT(pixelformat))) != NULL) {
_D_LOG_INFO("Falling back to format=%s", format_str);
} else {
char fourcc_str[8];
@@ -1060,7 +1083,7 @@ static const char *_standard_to_string(v4l2_std_id standard) {
return item->name;
}
});
return _STANDARDS[0].name;
return "???";
}
static const char *_io_method_to_string_supported(enum v4l2_memory io_method) {

View File

@@ -36,13 +36,8 @@
#define US_VIDEO_MAX_FPS ((uint)120)
#define US_STANDARD_UNKNOWN V4L2_STD_UNKNOWN
#define US_STANDARDS_STR "PAL, NTSC, SECAM"
#define US_FORMAT_UNKNOWN -1
#define US_FORMATS_STR "YUYV, YVYU, UYVY, RGB565, RGB24, BGR24, MJPEG, JPEG"
#define US_IO_METHOD_UNKNOWN -1
#define US_IO_METHODS_STR "MMAP, USERPTR"
@@ -126,13 +121,11 @@ us_device_s *us_device_init(void);
void us_device_destroy(us_device_s *dev);
int us_device_parse_format(const char *str);
v4l2_std_id us_device_parse_standard(const char *str);
int us_device_parse_standard(const char *str);
int us_device_parse_io_method(const char *str);
int us_device_open(us_device_s *dev);
void us_device_close(us_device_s *dev);
int us_device_select(us_device_s *dev, bool *has_read, bool *has_error);
int us_device_grab_buffer(us_device_s *dev, us_hw_buffer_s **hw);
int us_device_release_buffer(us_device_s *dev, us_hw_buffer_s *hw);
int us_device_consume_event(us_device_s *dev);

View File

@@ -73,13 +73,13 @@ void us_encoder_destroy(us_encoder_s *enc) {
free(enc);
}
us_encoder_type_e us_encoder_parse_type(const char *str) {
int us_encoder_parse_type(const char *str) {
US_ARRAY_ITERATE(_ENCODER_TYPES, 0, item, {
if (!strcasecmp(item->name, str)) {
return item->type;
}
});
return US_ENCODER_TYPE_UNKNOWN;
return -1;
}
const char *us_encoder_type_to_string(us_encoder_type_e type) {
@@ -91,7 +91,9 @@ const char *us_encoder_type_to_string(us_encoder_type_e type) {
return _ENCODER_TYPES[0].name;
}
us_workers_pool_s *us_encoder_workers_pool_init(us_encoder_s *enc, us_device_s *dev) {
void us_encoder_open(us_encoder_s *enc, us_device_s *dev) {
assert(enc->run->pool == NULL);
# define DR(x_next) dev->run->x_next
us_encoder_type_e type = (_ER(cpu_forced) ? US_ENCODER_TYPE_CPU : enc->type);
@@ -162,7 +164,7 @@ us_workers_pool_s *us_encoder_workers_pool_init(us_encoder_s *enc, us_device_s *
: 0
);
return us_workers_pool_init(
enc->run->pool = us_workers_pool_init(
"JPEG", "jw", n_workers, desired_interval,
_worker_job_init, (void *)enc,
_worker_job_destroy,
@@ -171,6 +173,11 @@ us_workers_pool_s *us_encoder_workers_pool_init(us_encoder_s *enc, us_device_s *
# undef DR
}
void us_encoder_close(us_encoder_s *enc) {
assert(enc->run->pool != NULL);
US_DELETE(enc->run->pool, us_workers_pool_destroy);
}
void us_encoder_get_runtime_params(us_encoder_s *enc, us_encoder_type_e *type, unsigned *quality) {
US_MUTEX_LOCK(_ER(mutex));
*type = _ER(type);
@@ -198,8 +205,6 @@ static bool _worker_run_job(us_worker_s *wr) {
const us_frame_s *src = &job->hw->raw;
us_frame_s *dest = job->dest;
assert(_ER(type) != US_ENCODER_TYPE_UNKNOWN);
if (_ER(type) == US_ENCODER_TYPE_CPU) {
US_LOG_VERBOSE("Compressing JPEG using CPU: worker=%s, buffer=%u",
wr->name, job->hw->buf.index);
@@ -223,6 +228,9 @@ static bool _worker_run_job(us_worker_s *wr) {
us_frame_encoding_begin(src, dest, V4L2_PIX_FMT_JPEG);
usleep(5000); // Просто чтобы работала логика desired_fps
dest->encode_end_ts = us_get_now_monotonic(); // us_frame_encoding_end()
} else {
assert(0 && "Unknown encoder type");
}
US_LOG_VERBOSE("Compressed new JPEG: size=%zu, time=%0.3Lf, worker=%s, buffer=%u",

View File

@@ -47,7 +47,6 @@
#define ENCODER_TYPES_STR "CPU, HW, M2M-VIDEO, M2M-IMAGE, NOOP"
typedef enum {
US_ENCODER_TYPE_UNKNOWN, // Only for us_encoder_parse_type() and main()
US_ENCODER_TYPE_CPU,
US_ENCODER_TYPE_HW,
US_ENCODER_TYPE_M2M_VIDEO,
@@ -63,6 +62,8 @@ typedef struct {
unsigned n_m2ms;
us_m2m_encoder_s **m2ms;
us_workers_pool_s *pool;
} us_encoder_runtime_s;
typedef struct {
@@ -83,10 +84,10 @@ typedef struct {
us_encoder_s *us_encoder_init(void);
void us_encoder_destroy(us_encoder_s *enc);
us_encoder_type_e us_encoder_parse_type(const char *str);
int us_encoder_parse_type(const char *str);
const char *us_encoder_type_to_string(us_encoder_type_e type);
us_workers_pool_s *us_encoder_workers_pool_init(us_encoder_s *enc, us_device_s *dev);
void us_encoder_get_runtime_params(us_encoder_s *enc, us_encoder_type_e *type, unsigned *quality);
void us_encoder_open(us_encoder_s *enc, us_device_s *dev);
void us_encoder_close(us_encoder_s *enc);
int us_encoder_compress(us_encoder_s *enc, unsigned worker_number, us_frame_s *src, us_frame_s *dest);
void us_encoder_get_runtime_params(us_encoder_s *enc, us_encoder_type_e *type, unsigned *quality);

View File

@@ -37,7 +37,6 @@ static void _http_callback_stream(struct evhttp_request *request, void *v_server
static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_ctx);
static void _http_callback_stream_error(struct bufferevent *buf_event, short what, void *v_ctx);
static void _http_request_watcher(int fd, short event, void *v_server);
static void _http_refresher(int fd, short event, void *v_server);
static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bool frame_updated);
@@ -91,11 +90,6 @@ void us_server_destroy(us_server_s *server) {
event_free(run->refresher);
}
if (run->request_watcher != NULL) {
event_del(run->request_watcher);
event_free(run->request_watcher);
}
evhttp_free(run->http);
US_CLOSE_FD(run->ext_fd);
event_base_free(run->base);
@@ -140,14 +134,6 @@ int us_server_listen(us_server_s *server) {
ex->notify_last_width = ex->frame->width;
ex->notify_last_height = ex->frame->height;
if (server->exit_on_no_clients > 0) {
run->last_request_ts = us_get_now_monotonic();
struct timeval interval = {0};
interval.tv_usec = 100000;
assert((run->request_watcher = event_new(run->base, -1, EV_PERSIST, _http_request_watcher, server)) != NULL);
assert(!event_add(run->request_watcher, &interval));
}
{
struct timeval interval = {0};
if (stream->dev->desired_fps > 0) {
@@ -221,9 +207,9 @@ void us_server_loop_break(us_server_s *server) {
#define ADD_HEADER(x_key, x_value) assert(!evhttp_add_header(evhttp_request_get_output_headers(request), x_key, x_value))
static int _http_preprocess_request(struct evhttp_request *request, us_server_s *server) {
us_server_runtime_s *const run = server->run;
const us_server_runtime_s *const run = server->run;
run->last_request_ts = us_get_now_monotonic();
atomic_store(&server->stream->run->http_last_request_ts, us_get_now_monotonic());
if (server->allow_origin[0] != '\0') {
const char *const cors_headers = _http_get_header(request, "Access-Control-Request-Headers");
@@ -834,24 +820,6 @@ static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bo
}
}
static void _http_request_watcher(int fd, short what, void *v_server) {
(void)fd;
(void)what;
us_server_s *const server = (us_server_s *)v_server;
us_server_runtime_s *const run = server->run;
const long double now = us_get_now_monotonic();
if (us_stream_has_clients(server->stream)) {
run->last_request_ts = now;
} else if (run->last_request_ts + server->exit_on_no_clients < now) {
US_LOG_INFO("HTTP: No requests or HTTP/sink clients found in last %u seconds, exiting ...",
server->exit_on_no_clients);
us_process_suicide();
run->last_request_ts = now;
}
}
static void _http_refresher(int fd, short what, void *v_server) {
(void)fd;
(void)what;

View File

@@ -120,9 +120,6 @@ typedef struct {
char *auth_token;
struct event *request_watcher;
long double last_request_ts;
struct event *refresher;
us_server_exposed_s *exposed;
@@ -156,9 +153,8 @@ typedef struct us_server_sx {
unsigned fake_height;
bool notify_parent;
unsigned exit_on_no_clients;
us_stream_s *stream;
us_stream_s *stream;
us_server_runtime_s *run;
} us_server_s;

View File

@@ -296,11 +296,13 @@ int options_parse(us_options_s *options, us_device_s *dev, us_encoder_s *enc, us
break; \
}
# define OPT_PARSE(x_name, x_dest, x_func, x_invalid, x_available) { \
if ((x_dest = x_func(optarg)) == x_invalid) { \
# define OPT_PARSE_ENUM(x_name, x_dest, x_func, x_available) { \
const int m_value = x_func(optarg); \
if (m_value < 0) { \
printf("Unknown " x_name ": %s; available: %s\n", optarg, x_available); \
return -1; \
} \
x_dest = m_value; \
break; \
}
@@ -355,10 +357,10 @@ int options_parse(us_options_s *options, us_device_s *dev, us_encoder_s *enc, us
case _O_RESOLUTION: OPT_RESOLUTION("--resolution", dev->width, dev->height, true);
# pragma GCC diagnostic ignored "-Wsign-compare"
# pragma GCC diagnostic push
case _O_FORMAT: OPT_PARSE("pixel format", dev->format, us_device_parse_format, US_FORMAT_UNKNOWN, US_FORMATS_STR);
case _O_FORMAT: OPT_PARSE_ENUM("pixel format", dev->format, us_device_parse_format, US_FORMATS_STR);
# pragma GCC diagnostic pop
case _O_TV_STANDARD: OPT_PARSE("TV standard", dev->standard, us_device_parse_standard, US_STANDARD_UNKNOWN, US_STANDARDS_STR);
case _O_IO_METHOD: OPT_PARSE("IO method", dev->io_method, us_device_parse_io_method, US_IO_METHOD_UNKNOWN, US_IO_METHODS_STR);
case _O_TV_STANDARD: OPT_PARSE_ENUM("TV standard", dev->standard, us_device_parse_standard, US_STANDARDS_STR);
case _O_IO_METHOD: OPT_PARSE_ENUM("IO method", dev->io_method, us_device_parse_io_method, US_IO_METHODS_STR);
case _O_DESIRED_FPS: OPT_NUMBER("--desired-fps", dev->desired_fps, 0, US_VIDEO_MAX_FPS, 0);
case _O_MIN_FRAME_SIZE: OPT_NUMBER("--min-frame-size", dev->min_frame_size, 1, 8192, 0);
case _O_PERSISTENT: OPT_SET(dev->persistent, true);
@@ -366,7 +368,7 @@ int options_parse(us_options_s *options, us_device_s *dev, us_encoder_s *enc, us
case _O_BUFFERS: OPT_NUMBER("--buffers", dev->n_bufs, 1, 32, 0);
case _O_WORKERS: OPT_NUMBER("--workers", enc->n_workers, 1, 32, 0);
case _O_QUALITY: OPT_NUMBER("--quality", dev->jpeg_quality, 1, 100, 0);
case _O_ENCODER: OPT_PARSE("encoder type", enc->type, us_encoder_parse_type, US_ENCODER_TYPE_UNKNOWN, ENCODER_TYPES_STR);
case _O_ENCODER: OPT_PARSE_ENUM("encoder type", enc->type, us_encoder_parse_type, ENCODER_TYPES_STR);
case _O_GLITCHED_RESOLUTIONS: break; // Deprecated
case _O_BLANK: break; // Deprecated
case _O_LAST_AS_BLANK: OPT_NUMBER("--last-as-blank", stream->last_as_blank, 0, 86400, 0);
@@ -457,7 +459,7 @@ int options_parse(us_options_s *options, us_device_s *dev, us_encoder_s *enc, us
};
break;
# endif
case _O_EXIT_ON_NO_CLIENTS: OPT_NUMBER("--exit-on-no-clients", server->exit_on_no_clients, 0, 86400, 0);
case _O_EXIT_ON_NO_CLIENTS: OPT_NUMBER("--exit-on-no-clients", stream->exit_on_no_clients, 0, 86400, 0);
# ifdef WITH_SETPROCTITLE
case _O_PROCESS_NAME_PREFIX: OPT_SET(process_name_prefix, optarg);
# endif

View File

@@ -22,13 +22,41 @@
#include "stream.h"
#include <stdlib.h>
#include <stdatomic.h>
#include <unistd.h>
#include <errno.h>
#include <assert.h>
static us_workers_pool_s *_stream_init_loop(us_stream_s *stream);
#include <pthread.h>
#include "../libs/types.h"
#include "../libs/tools.h"
#include "../libs/threading.h"
#include "../libs/process.h"
#include "../libs/logging.h"
#include "../libs/ring.h"
#include "../libs/frame.h"
#include "../libs/memsink.h"
#include "../libs/device.h"
#include "blank.h"
#include "encoder.h"
#include "workers.h"
#include "h264.h"
#ifdef WITH_GPIO
# include "gpio/gpio.h"
#endif
static bool _stream_is_stopped(us_stream_s *stream);
static bool _stream_has_any_clients(us_stream_s *stream);
static bool _stream_slowdown(us_stream_s *stream);
static int _stream_init_loop(us_stream_s *stream);
static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame);
#define _RUN(x_next) stream->run->x_next
#define _SINK_PUT(x_sink, x_frame) { \
if (stream->x_sink && us_memsink_server_check(stream->x_sink, x_frame)) {\
bool m_key_requested; /* Unused */ \
@@ -37,8 +65,8 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame);
}
#define _H264_PUT(x_frame, x_force_key) { \
if (_RUN(h264)) { \
us_h264_stream_process(_RUN(h264), x_frame, x_force_key); \
if (stream->run->h264) { \
us_h264_stream_process(stream->run->h264, x_frame, x_force_key); \
} \
}
@@ -48,6 +76,7 @@ us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) {
US_CALLOC(run, 1);
US_RING_INIT_WITH_ITEMS(run->http_jpeg_ring, 4, us_frame_init);
atomic_init(&run->http_has_clients, false);
atomic_init(&run->http_last_request_ts, 0);
atomic_init(&run->captured_fps, 0);
atomic_init(&run->stop, false);
run->blank = us_blank_init();
@@ -72,127 +101,102 @@ void us_stream_destroy(us_stream_s *stream) {
}
void us_stream_loop(us_stream_s *stream) {
us_stream_runtime_s *const run = stream->run;
US_LOG_INFO("Using V4L2 device: %s", stream->dev->path);
US_LOG_INFO("Using desired FPS: %u", stream->dev->desired_fps);
atomic_store(&run->http_last_request_ts, us_get_now_monotonic());
if (stream->h264_sink != NULL) {
_RUN(h264) = us_h264_stream_init(stream->h264_sink, stream->h264_m2m_path, stream->h264_bitrate, stream->h264_gop);
run->h264 = us_h264_stream_init(stream->h264_sink, stream->h264_m2m_path, stream->h264_bitrate, stream->h264_gop);
}
for (us_workers_pool_s *pool; (pool = _stream_init_loop(stream)) != NULL;) {
long double grab_after = 0;
unsigned fluency_passed = 0;
unsigned captured_fps_accum = 0;
long long captured_fps_second = 0;
while (!_stream_init_loop(stream)) {
ldf grab_after = 0;
uint fluency_passed = 0;
uint captured_fps_accum = 0;
sll captured_fps_ts = 0;
US_LOG_INFO("Capturing ...");
while (!atomic_load(&_RUN(stop))) {
while (!_stream_is_stopped(stream)) {
US_SEP_DEBUG('-');
US_LOG_DEBUG("Waiting for worker ...");
us_worker_s *const ready_wr = us_workers_pool_wait(pool);
us_encoder_job_s *const ready_job = (us_encoder_job_s *)(ready_wr->job);
us_worker_s *const ready_wr = us_workers_pool_wait(stream->enc->run->pool);
us_encoder_job_s *const ready_job = ready_wr->job;
if (ready_job->hw != NULL) {
if (us_device_release_buffer(stream->dev, ready_job->hw) < 0) {
ready_wr->job_failed = true;
goto close;
}
ready_job->hw = NULL;
if (!ready_wr->job_failed) {
if (ready_wr->job_timely) {
_stream_expose_frame(stream, ready_job->dest);
US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf",
ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts);
} else {
US_LOG_PERF("----- Encoded JPEG dropped; worker=%s", ready_wr->name);
}
if (ready_wr->job_failed) {
// pass
} else if (ready_wr->job_timely) {
_stream_expose_frame(stream, ready_job->dest);
US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf",
ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts);
} else {
break;
US_LOG_PERF("----- Encoded JPEG dropped; worker=%s", ready_wr->name);
}
}
bool h264_force_key = false;
if (stream->slowdown) {
unsigned slc = 0;
for (; slc < 10 && !atomic_load(&_RUN(stop)) && !us_stream_has_clients(stream); ++slc) {
usleep(100000);
}
h264_force_key = (slc == 10);
const bool h264_force_key = _stream_slowdown(stream);
if (_stream_is_stopped(stream)) {
goto close;
}
if (atomic_load(&_RUN(stop))) {
break;
us_hw_buffer_s *hw;
const int buf_index = us_device_grab_buffer(stream->dev, &hw);
switch (buf_index) {
case -3: continue; // Broken frame
case -2: // Persistent timeout
case -1: goto close; // Any error
}
assert(buf_index >= 0);
bool has_read;
bool has_error;
const int selected = us_device_select(stream->dev, &has_read, &has_error);
# ifdef WITH_GPIO
us_gpio_set_stream_online(true);
# endif
if (selected < 0) {
if (errno != EINTR) {
US_LOG_PERROR("Mainloop select() error");
break;
const ldf now_ts = us_get_now_monotonic();
if (now_ts < grab_after) {
fluency_passed += 1;
US_LOG_VERBOSE("Passed %u frames for fluency: now=%.03Lf, grab_after=%.03Lf",
fluency_passed, now_ts, grab_after);
if (us_device_release_buffer(stream->dev, hw) < 0) {
goto close;
}
} else if (selected == 0) { // Persistent timeout
# ifdef WITH_GPIO
us_gpio_set_stream_online(false);
# endif
} else {
if (has_read) {
# ifdef WITH_GPIO
us_gpio_set_stream_online(true);
# endif
fluency_passed = 0;
const long double now = us_get_now_monotonic();
const long long now_second = us_floor_ms(now);
us_hw_buffer_s *hw;
const int buf_index = us_device_grab_buffer(stream->dev, &hw);
if (buf_index >= 0) {
if (now < grab_after) {
fluency_passed += 1;
US_LOG_VERBOSE("Passed %u frames for fluency: now=%.03Lf, grab_after=%.03Lf",
fluency_passed, now, grab_after);
if (us_device_release_buffer(stream->dev, hw) < 0) {
break;
}
} else {
fluency_passed = 0;
if (now_second != captured_fps_second) {
US_LOG_PERF_FPS("A new second has come; captured_fps=%u", captured_fps_accum);
atomic_store(&stream->run->captured_fps, captured_fps_accum);
captured_fps_accum = 0;
captured_fps_second = now_second;
}
captured_fps_accum += 1;
const long double fluency_delay = us_workers_pool_get_fluency_delay(pool, ready_wr);
grab_after = now + fluency_delay;
US_LOG_VERBOSE("Fluency: delay=%.03Lf, grab_after=%.03Lf", fluency_delay, grab_after);
ready_job->hw = hw;
us_workers_pool_assign(pool, ready_wr);
US_LOG_DEBUG("Assigned new frame in buffer=%d to worker=%s", buf_index, ready_wr->name);
_SINK_PUT(raw_sink, &hw->raw);
_H264_PUT(&hw->raw, h264_force_key);
}
} else if (buf_index != -2) { // -2 for broken frame
break;
}
const sll now_sec_ts = us_floor_ms(now_ts);
if (now_sec_ts != captured_fps_ts) {
US_LOG_PERF_FPS("A new second has come; captured_fps=%u", captured_fps_accum);
atomic_store(&run->captured_fps, captured_fps_accum);
captured_fps_accum = 0;
captured_fps_ts = now_sec_ts;
}
captured_fps_accum += 1;
if (has_error && us_device_consume_event(stream->dev) < 0) {
break;
}
const ldf fluency_delay = us_workers_pool_get_fluency_delay(stream->enc->run->pool, ready_wr);
grab_after = now_ts + fluency_delay;
US_LOG_VERBOSE("Fluency: delay=%.03Lf, grab_after=%.03Lf", fluency_delay, grab_after);
ready_job->hw = hw;
us_workers_pool_assign(stream->enc->run->pool, ready_wr);
US_LOG_DEBUG("Assigned new frame in buffer=%d to worker=%s", buf_index, ready_wr->name);
_SINK_PUT(raw_sink, &hw->raw);
_H264_PUT(&hw->raw, h264_force_key);
}
}
us_workers_pool_destroy(pool);
close:
us_encoder_close(stream->enc);
us_device_close(stream->dev);
# ifdef WITH_GPIO
@@ -200,28 +204,75 @@ void us_stream_loop(us_stream_s *stream) {
# endif
}
US_DELETE(_RUN(h264), us_h264_stream_destroy);
US_DELETE(run->h264, us_h264_stream_destroy);
}
void us_stream_loop_break(us_stream_s *stream) {
atomic_store(&_RUN(stop), true);
atomic_store(&stream->run->stop, true);
}
bool us_stream_has_clients(us_stream_s *stream) {
static bool _stream_is_stopped(us_stream_s *stream) {
us_stream_runtime_s *const run = stream->run;
const bool stop = atomic_load(&run->stop);
if (stop) {
return true;
}
if (stream->exit_on_no_clients > 0) {
const ldf now_ts = us_get_now_monotonic();
const ull http_last_request_ts = atomic_load(&run->http_last_request_ts); // Seconds
if (_stream_has_any_clients(stream)) {
atomic_store(&run->http_last_request_ts, now_ts);
} else if (http_last_request_ts + stream->exit_on_no_clients < now_ts) {
US_LOG_INFO("No requests or HTTP/sink clients found in last %u seconds, exiting ...",
stream->exit_on_no_clients);
us_process_suicide();
atomic_store(&run->http_last_request_ts, now_ts);
}
}
return false;
}
static bool _stream_has_any_clients(us_stream_s *stream) {
const us_stream_runtime_s *const run = stream->run;
return (
atomic_load(&_RUN(http_has_clients))
atomic_load(&run->http_has_clients)
// has_clients синков НЕ обновляются в реальном времени
|| (stream->sink != NULL && atomic_load(&stream->sink->has_clients))
|| (_RUN(h264) != NULL && /*_RUN(h264->sink) == NULL ||*/ atomic_load(&_RUN(h264->sink->has_clients)))
|| (run->h264 != NULL && /*run->h264->sink == NULL ||*/ atomic_load(&run->h264->sink->has_clients))
);
}
static us_workers_pool_s *_stream_init_loop(us_stream_s *stream) {
static bool _stream_slowdown(us_stream_s *stream) {
if (stream->slowdown) {
unsigned count = 0;
while (count < 10 && !_stream_is_stopped(stream) && !_stream_has_any_clients(stream)) {
usleep(100000);
++count;
}
return (count >= 10);
}
return false;
}
static int _stream_init_loop(us_stream_s *stream) {
us_stream_runtime_s *const run = stream->run;
int access_errno = 0;
while (!atomic_load(&_RUN(stop))) {
atomic_store(&stream->run->captured_fps, 0);
while (!_stream_is_stopped(stream)) {
unsigned width = stream->dev->run->width;
unsigned height = stream->dev->run->height;
if (width == 0 || height == 0) {
width = stream->dev->width;
height = stream->dev->height;
}
us_blank_draw(run->blank, "< NO SIGNAL >", width, height);
atomic_store(&run->captured_fps, 0);
_stream_expose_frame(stream, NULL);
_SINK_PUT(raw_sink, run->blank->raw);
_H264_PUT(run->blank->raw, false);
if (access(stream->dev->path, R_OK|W_OK) < 0) {
if (access_errno != errno) {
US_SEP_INFO('=');
@@ -238,68 +289,60 @@ static us_workers_pool_s *_stream_init_loop(us_stream_s *stream) {
stream->dev->dma_export = (
stream->enc->type == US_ENCODER_TYPE_M2M_VIDEO
|| stream->enc->type == US_ENCODER_TYPE_M2M_IMAGE
|| _RUN(h264) != NULL
|| run->h264 != NULL
);
if (us_device_open(stream->dev) == 0) {
return us_encoder_workers_pool_init(stream->enc, stream->dev);
us_encoder_open(stream->enc, stream->dev);
return 0;
}
US_LOG_INFO("Sleeping %u seconds before new stream init ...", stream->error_delay);
sleep_and_retry:
sleep(stream->error_delay);
}
return NULL;
return -1;
}
static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame) {
us_stream_runtime_s *const run = stream->run;
us_blank_s *const blank = run->blank;
us_frame_s *new = NULL;
if (frame != NULL) {
new = frame;
_RUN(last_as_blank_ts) = 0; // Останавливаем таймер
run->last_as_blank_ts = 0; // Останавливаем таймер
US_LOG_DEBUG("Exposed ALIVE video frame");
} else {
unsigned width = stream->dev->run->width;
unsigned height = stream->dev->run->height;
if (width == 0 || height == 0) {
width = stream->dev->width;
height = stream->dev->height;
}
us_blank_draw(blank, "< NO SIGNAL >", width, height);
if (run->last_online) { // Если переходим из online в offline
if (stream->last_as_blank < 0) { // Если last_as_blank выключен, просто покажем старую картинку
new = blank->jpeg;
new = run->blank->jpeg;
US_LOG_INFO("Changed video frame to BLANK");
} else if (stream->last_as_blank > 0) { // // Если нужен таймер - запустим
_RUN(last_as_blank_ts) = us_get_now_monotonic() + stream->last_as_blank;
run->last_as_blank_ts = us_get_now_monotonic() + stream->last_as_blank;
US_LOG_INFO("Freezed last ALIVE video frame for %d seconds", stream->last_as_blank);
} else { // last_as_blank == 0 - показываем последний фрейм вечно
US_LOG_INFO("Freezed last ALIVE video frame forever");
}
} else if (stream->last_as_blank < 0) {
new = blank->jpeg;
new = run->blank->jpeg;
// US_LOG_INFO("Changed video frame to BLANK");
}
if ( // Если уже оффлайн, включена фича last_as_blank с таймером и он запущен
stream->last_as_blank > 0
&& _RUN(last_as_blank_ts) != 0
&& _RUN(last_as_blank_ts) < us_get_now_monotonic()
&& run->last_as_blank_ts != 0
&& run->last_as_blank_ts < us_get_now_monotonic()
) {
new = blank->jpeg;
_RUN(last_as_blank_ts) = 0; // Останавливаем таймер
new = run->blank->jpeg;
run->last_as_blank_ts = 0; // Останавливаем таймер
US_LOG_INFO("Changed last ALIVE video frame to BLANK");
}
}
int ri = -1;
while (
!atomic_load(&_RUN(stop))
!_stream_is_stopped(stream)
&& ((ri = us_ring_producer_acquire(run->http_jpeg_ring, 0)) < 0)
) {
US_LOG_ERROR("Can't push JPEG to HTTP ring (no free slots)");
@@ -319,10 +362,5 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame) {
run->last_online = (frame != NULL);
us_ring_producer_release(run->http_jpeg_ring, ri);
_SINK_PUT(sink, (frame != NULL ? frame : blank->jpeg));
if (frame == NULL) {
_SINK_PUT(raw_sink, blank->raw);
_H264_PUT(blank->raw, false);
}
_SINK_PUT(sink, (frame != NULL ? frame : run->blank->jpeg));
}

View File

@@ -22,36 +22,22 @@
#pragma once
#include <stdlib.h>
#include <stdbool.h>
#include <stdatomic.h>
#include <unistd.h>
#include <errno.h>
#include <assert.h>
#include <pthread.h>
#include <linux/videodev2.h>
#include "../libs/tools.h"
#include "../libs/threading.h"
#include "../libs/logging.h"
#include "../libs/types.h"
#include "../libs/ring.h"
#include "../libs/frame.h"
#include "../libs/memsink.h"
#include "../libs/device.h"
#include "blank.h"
#include "encoder.h"
#include "workers.h"
#include "h264.h"
#ifdef WITH_GPIO
# include "gpio/gpio.h"
#endif
typedef struct {
us_ring_s *http_jpeg_ring;
atomic_bool http_has_clients;
atomic_ullong http_last_request_ts; // Seconds
atomic_uint captured_fps;
bool last_online;
@@ -71,6 +57,7 @@ typedef struct {
int last_as_blank;
bool slowdown;
unsigned error_delay;
unsigned exit_on_no_clients;
us_memsink_s *sink;
us_memsink_s *raw_sink;
@@ -89,5 +76,3 @@ void us_stream_destroy(us_stream_s *stream);
void us_stream_loop(us_stream_s *stream);
void us_stream_loop_break(us_stream_s *stream);
bool us_stream_has_clients(us_stream_s *stream);

View File

@@ -75,7 +75,8 @@ us_drm_s *us_drm_init(void) {
us_drm_s *drm;
US_CALLOC(drm, 1);
drm->path = "/dev/dri/card0";
// drm->path = "/dev/dri/card0";
drm->path = "/dev/dri/by-path/platform-gpu-card";
drm->port = "HDMI-A-1";
drm->n_bufs = 4;
drm->timeout = 5;

View File

@@ -216,40 +216,26 @@ static void _main_loop(void) {
continue;
}
bool has_read;
bool has_error;
const int selected = us_device_select(dev, &has_read, &has_error);
if (selected < 0) {
if (errno != EINTR) {
US_LOG_PERROR("Mainloop select() error");
goto close;
}
} else if (selected == 0) { // Persistent timeout
if (us_drm_expose(drm, US_DRM_EXPOSE_NO_SIGNAL, NULL, 0) < 0) {
_slowdown();
continue;
}
} else {
if (has_read) {
us_hw_buffer_s *hw;
const int buf_index = us_device_grab_buffer(dev, &hw);
if (buf_index >= 0) {
const int exposed = us_drm_expose(drm, US_DRM_EXPOSE_FRAME, &hw->raw, dev->run->hz);
if (us_device_release_buffer(dev, hw) < 0) {
goto close;
}
if (exposed < 0) {
_slowdown();
continue;
}
} else if (buf_index != -2) { // -2 for broken frame
goto close;
us_hw_buffer_s *hw;
const int buf_index = us_device_grab_buffer(dev, &hw);
switch (buf_index) {
case -3: continue; // Broken frame
case -2: // Persistent timeout
if (us_drm_expose(drm, US_DRM_EXPOSE_NO_SIGNAL, NULL, 0) < 0) {
_slowdown();
continue;
}
}
if (has_error && us_device_consume_event(dev) < 0) {
goto close;
}
case -1: goto close; // Any error
}
assert(buf_index >= 0);
const int exposed = us_drm_expose(drm, US_DRM_EXPOSE_FRAME, &hw->raw, dev->run->hz);
if (us_device_release_buffer(dev, hw) < 0) {
goto close;
}
if (exposed < 0) {
_slowdown();
continue;
}
}