refactoring

This commit is contained in:
Maxim Devaev 2024-03-29 22:02:03 +02:00
parent 3f69dd785f
commit f8a703f166
12 changed files with 122 additions and 203 deletions

View File

@ -223,7 +223,7 @@ static int _dump_sink(
us_frame_s *frame = us_frame_init();
us_memsink_s *sink = NULL;
if ((sink = us_memsink_init("input", sink_name, false, 0, false, 0, sink_timeout)) == NULL) {
if ((sink = us_memsink_init_opened("input", sink_name, false, 0, false, 0, sink_timeout)) == NULL) {
goto error;
}

View File

@ -199,6 +199,8 @@ int us_capture_open(us_capture_s *cap) {
}
}
US_LOG_INFO("Using V4L2 device: %s", cap->path);
if (_capture_open_check_cap(cap) < 0) {
goto error;
}

View File

@ -40,7 +40,7 @@
#include "memsinksh.h"
us_memsink_s *us_memsink_init(
us_memsink_s *us_memsink_init_opened(
const char *name, const char *obj, bool server,
mode_t mode, bool rm, uint client_ttl, uint timeout) {

View File

@ -50,7 +50,7 @@ typedef struct {
} us_memsink_s;
us_memsink_s *us_memsink_init(
us_memsink_s *us_memsink_init_opened(
const char *name, const char *obj, bool server,
mode_t mode, bool rm, uint client_ttl, uint timeout);

View File

@ -1,78 +0,0 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
# #
# Copyright (C) 2018-2024 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#include "h264.h"
#include <stdatomic.h>
#include "../libs/types.h"
#include "../libs/tools.h"
#include "../libs/logging.h"
#include "../libs/frame.h"
#include "../libs/memsink.h"
#include "../libs/unjpeg.h"
#include "m2m.h"
us_h264_stream_s *us_h264_stream_init(us_memsink_s *sink, const char *path, uint bitrate, uint gop) {
us_h264_stream_s *h264;
US_CALLOC(h264, 1);
h264->sink = sink;
h264->tmp_src = us_frame_init();
h264->dest = us_frame_init();
atomic_init(&h264->online, false);
h264->enc = us_m2m_h264_encoder_init("H264", path, bitrate, gop);
return h264;
}
void us_h264_stream_destroy(us_h264_stream_s *h264) {
us_m2m_encoder_destroy(h264->enc);
us_frame_destroy(h264->dest);
us_frame_destroy(h264->tmp_src);
free(h264);
}
void us_h264_stream_process(us_h264_stream_s *h264, const us_frame_s *frame, bool force_key) {
if (us_is_jpeg(frame->format)) {
const ldf now_ts = us_get_now_monotonic();
US_LOG_DEBUG("H264: Input frame is JPEG; decoding ...");
if (us_unjpeg(frame, h264->tmp_src, true) < 0) {
atomic_store(&h264->online, false);
return;
}
frame = h264->tmp_src;
US_LOG_VERBOSE("H264: JPEG decoded; time=%.3Lf", us_get_now_monotonic() - now_ts);
}
if (h264->key_requested) {
US_LOG_INFO("H264: Requested keyframe by a sink client");
h264->key_requested = false;
force_key = true;
}
bool online = false;
if (!us_m2m_encoder_compress(h264->enc, frame, h264->dest, force_key)) {
online = !us_memsink_server_put(h264->sink, h264->dest, &h264->key_requested);
}
atomic_store(&h264->online, online);
}

View File

@ -1,46 +0,0 @@
/*****************************************************************************
# #
# uStreamer - Lightweight and fast MJPEG-HTTP streamer. #
# #
# Copyright (C) 2018-2024 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#pragma once
#include <stdatomic.h>
#include "../libs/types.h"
#include "../libs/frame.h"
#include "../libs/memsink.h"
#include "m2m.h"
typedef struct {
us_memsink_s *sink;
bool key_requested;
us_frame_s *tmp_src;
us_frame_s *dest;
us_m2m_encoder_s *enc;
atomic_bool online;
} us_h264_stream_s;
us_h264_stream_s *us_h264_stream_init(us_memsink_s *sink, const char *path, uint bitrate, uint gop);
void us_h264_stream_destroy(us_h264_stream_s *h264);
void us_h264_stream_process(us_h264_stream_s *h264, const us_frame_s *frame, bool force_key);

View File

@ -473,12 +473,12 @@ static void _http_callback_state(struct evhttp_request *request, void *v_server)
enc_quality
);
if (stream->run->h264 != NULL) {
if (stream->h264_sink != NULL) {
_A_EVBUFFER_ADD_PRINTF(buf,
" \"h264\": {\"bitrate\": %u, \"gop\": %u, \"online\": %s},",
stream->h264_bitrate,
stream->h264_gop,
us_bool_to_string(atomic_load(&stream->run->h264->online))
us_bool_to_string(atomic_load(&stream->run->http_h264_online))
);
}

View File

@ -95,15 +95,31 @@ void us_m2m_encoder_destroy(us_m2m_encoder_s *enc) {
int us_m2m_encoder_compress(us_m2m_encoder_s *enc, const us_frame_s *src, us_frame_s *dest, bool force_key) {
us_m2m_encoder_runtime_s *const run = enc->run;
us_frame_encoding_begin(src, dest, (enc->output_format == V4L2_PIX_FMT_MJPEG ? V4L2_PIX_FMT_JPEG : enc->output_format));
const ldf now_ts = us_get_now_monotonic();
uint dest_format = enc->output_format;
switch (enc->output_format) {
case V4L2_PIX_FMT_JPEG:
force_key = false;
// fall through
case V4L2_PIX_FMT_MJPEG:
dest_format = V4L2_PIX_FMT_JPEG;
break;
case V4L2_PIX_FMT_H264:
force_key = (
force_key
|| run->last_online != src->online
|| run->last_encode_ts + 0.5 < now_ts
);
break;
}
us_frame_encoding_begin(src, dest, dest_format);
_m2m_encoder_ensure(enc, src);
if (!run->ready) { // Already prepared but failed
return -1;
}
force_key = (enc->output_format == V4L2_PIX_FMT_H264 && (force_key || run->last_online != src->online));
_LOG_DEBUG("Compressing new frame; force_key=%d ...", force_key);
if (_m2m_encoder_compress_raw(enc, src, dest, force_key) < 0) {
@ -118,6 +134,7 @@ int us_m2m_encoder_compress(us_m2m_encoder_s *enc, const us_frame_s *src, us_fra
dest->used, dest->encode_end_ts - dest->encode_begin_ts, force_key);
run->last_online = src->online;
run->last_encode_ts = now_ts;
return 0;
}

View File

@ -47,6 +47,7 @@ typedef struct {
bool ready;
int last_online;
ldf last_encode_ts;
} us_m2m_encoder_runtime_s;
typedef struct {

View File

@ -513,7 +513,7 @@ int options_parse(us_options_s *options, us_capture_s *cap, us_encoder_s *enc, u
# define ADD_SINK(x_label, x_prefix) { \
if (x_prefix##_name && x_prefix##_name[0] != '\0') { \
options->x_prefix = us_memsink_init( \
options->x_prefix = us_memsink_init_opened( \
x_label, \
x_prefix##_name, \
true, \

View File

@ -41,6 +41,7 @@
#include "../libs/frame.h"
#include "../libs/memsink.h"
#include "../libs/capture.h"
#include "../libs/unjpeg.h"
#ifdef WITH_V4P
# include "../libs/drm/drm.h"
#endif
@ -48,7 +49,7 @@
#include "blank.h"
#include "encoder.h"
#include "workers.h"
#include "h264.h"
#include "m2m.h"
#ifdef WITH_GPIO
# include "gpio/gpio.h"
#endif
@ -74,8 +75,8 @@ static void _stream_set_capture_state(us_stream_s *stream, uint width, uint heig
static void *_releaser_thread(void *v_ctx);
static void *_jpeg_thread(void *v_ctx);
static void *_h264_thread(void *v_ctx);
static void *_raw_thread(void *v_ctx);
static void *_h264_thread(void *v_ctx);
#ifdef WITH_V4P
static void *_drm_thread(void *v_ctx);
#endif
@ -90,12 +91,14 @@ static void _stream_drm_ensure_no_signal(us_stream_s *stream);
#endif
static void _stream_expose_jpeg(us_stream_s *stream, const us_frame_s *frame);
static void _stream_expose_raw(us_stream_s *stream, const us_frame_s *frame);
static void _stream_encode_expose_h264(us_stream_s *stream, const us_frame_s *frame, bool force_key);
static void _stream_check_suicide(us_stream_s *stream);
us_stream_s *us_stream_init(us_capture_s *cap, us_encoder_s *enc) {
us_stream_runtime_s *run;
US_CALLOC(run, 1);
atomic_init(&run->http_h264_online, false);
US_RING_INIT_WITH_ITEMS(run->http_jpeg_ring, 4, us_frame_init);
atomic_init(&run->http_has_clients, false);
atomic_init(&run->http_snapshot_requested, 0);
@ -129,13 +132,12 @@ void us_stream_loop(us_stream_s *stream) {
us_stream_runtime_s *const run = stream->run;
us_capture_s *const cap = stream->cap;
US_LOG_INFO("Using V4L2 device: %s", cap->path);
US_LOG_INFO("Using desired FPS: %u", cap->desired_fps);
atomic_store(&run->http_last_request_ts, us_get_now_monotonic());
if (stream->h264_sink != NULL) {
run->h264 = us_h264_stream_init(stream->h264_sink, stream->h264_m2m_path, stream->h264_bitrate, stream->h264_gop);
run->h264_enc = us_m2m_h264_encoder_init("H264", stream->h264_m2m_path, stream->h264_bitrate, stream->h264_gop);
run->h264_tmp_src = us_frame_init();
run->h264_dest = us_frame_init();
}
while (!_stream_init_loop(stream)) {
@ -166,8 +168,8 @@ void us_stream_loop(us_stream_s *stream) {
US_THREAD_CREATE(x_ctx->tid, (x_thread), x_ctx); \
}
CREATE_WORKER(true, jpeg_ctx, _jpeg_thread, cap->run->n_bufs);
CREATE_WORKER((run->h264 != NULL), h264_ctx, _h264_thread, cap->run->n_bufs);
CREATE_WORKER((stream->raw_sink != NULL), raw_ctx, _raw_thread, 2);
CREATE_WORKER((stream->h264_sink != NULL), h264_ctx, _h264_thread, cap->run->n_bufs);
# ifdef WITH_V4P
CREATE_WORKER((stream->drm != NULL), drm_ctx, _drm_thread, cap->run->n_bufs); // cppcheck-suppress assertWithSideEffect
# endif
@ -207,8 +209,8 @@ void us_stream_loop(us_stream_s *stream) {
us_queue_put(x_ctx->queue, hw, 0); \
}
QUEUE_HW(jpeg_ctx);
QUEUE_HW(h264_ctx);
QUEUE_HW(raw_ctx);
QUEUE_HW(h264_ctx);
# ifdef WITH_V4P
QUEUE_HW(drm_ctx);
# endif
@ -237,8 +239,8 @@ void us_stream_loop(us_stream_s *stream) {
# ifdef WITH_V4P
DELETE_WORKER(drm_ctx);
# endif
DELETE_WORKER(raw_ctx);
DELETE_WORKER(h264_ctx);
DELETE_WORKER(raw_ctx);
DELETE_WORKER(jpeg_ctx);
# undef DELETE_WORKER
@ -259,7 +261,9 @@ void us_stream_loop(us_stream_s *stream) {
}
}
US_DELETE(run->h264, us_h264_stream_destroy);
US_DELETE(run->h264_enc, us_m2m_encoder_destroy);
US_DELETE(run->h264_tmp_src, us_frame_destroy);
US_DELETE(run->h264_dest, us_frame_destroy);
}
void us_stream_loop_break(us_stream_s *stream) {
@ -376,48 +380,6 @@ static void *_jpeg_thread(void *v_ctx) {
return NULL;
}
static void *_h264_thread(void *v_ctx) {
US_THREAD_SETTLE("str_h264");
_worker_context_s *ctx = v_ctx;
us_h264_stream_s *h264 = ctx->stream->run->h264;
ldf grab_after_ts = 0;
ldf last_encode_ts = us_get_now_monotonic();
while (!atomic_load(ctx->stop)) {
us_capture_hwbuf_s *hw = _get_latest_hw(ctx->queue);
if (hw == NULL) {
continue;
}
if (!us_memsink_server_check(h264->sink, NULL)) {
US_LOG_VERBOSE("H264: Passed encoding because nobody is watching");
goto decref;
}
if (hw->raw.grab_ts < grab_after_ts) {
US_LOG_VERBOSE("H264: Passed encoding for FPS limit: %u", h264->enc->run->fps_limit);
goto decref;
}
// Форсим кейфрейм, если от захвата давно не было фреймов
const ldf now_ts = us_get_now_monotonic();
const bool force_key = (last_encode_ts + 0.5 < now_ts);
us_h264_stream_process(h264, &hw->raw, force_key);
last_encode_ts = now_ts;
// M2M-енкодер увеличивает задержку на 100 милисекунд при 1080p, если скормить ему больше 30 FPS.
// Поэтому у нас есть два режима: 60 FPS для маленьких видео и 30 для 1920x1080(1200).
// Следующй фрейм захватывается не раньше, чем это требуется по FPS, минус небольшая
// погрешность (если захват неравномерный) - немного меньше 1/60, и примерно треть от 1/30.
const ldf frame_interval = (ldf)1 / h264->enc->run->fps_limit;
grab_after_ts = hw->raw.grab_ts + frame_interval - 0.01;
decref:
us_capture_hwbuf_decref(hw);
}
return NULL;
}
static void *_raw_thread(void *v_ctx) {
US_THREAD_SETTLE("str_raw");
_worker_context_s *ctx = v_ctx;
@ -438,6 +400,45 @@ static void *_raw_thread(void *v_ctx) {
return NULL;
}
static void *_h264_thread(void *v_ctx) {
US_THREAD_SETTLE("str_h264");
_worker_context_s *ctx = v_ctx;
us_stream_s *stream = ctx->stream;
ldf grab_after_ts = 0;
while (!atomic_load(ctx->stop)) {
us_capture_hwbuf_s *hw = _get_latest_hw(ctx->queue);
if (hw == NULL) {
continue;
}
if (!us_memsink_server_check(stream->h264_sink, NULL)) {
US_LOG_VERBOSE("H264: Passed encoding because nobody is watching");
goto decref;
}
if (hw->raw.grab_ts < grab_after_ts) {
US_LOG_DEBUG("H264: Passed encoding for FPS limit");
goto decref;
}
_stream_encode_expose_h264(ctx->stream, &hw->raw, false);
// M2M-енкодер увеличивает задержку на 100 милисекунд при 1080p, если скормить ему больше 30 FPS.
// Поэтому у нас есть два режима: 60 FPS для маленьких видео и 30 для 1920x1080(1200).
// Следующй фрейм захватывается не раньше, чем это требуется по FPS, минус небольшая
// погрешность (если захват неравномерный) - немного меньше 1/60, и примерно треть от 1/30.
const uint fps_limit = stream->run->h264_enc->run->fps_limit;
if (fps_limit > 0) {
const ldf frame_interval = (ldf)1 / fps_limit;
grab_after_ts = hw->raw.grab_ts + frame_interval - 0.01;
}
decref:
us_capture_hwbuf_decref(hw);
}
return NULL;
}
#ifdef WITH_V4P
static void *_drm_thread(void *v_ctx) {
US_THREAD_SETTLE("str_drm");
@ -451,7 +452,7 @@ static void *_drm_thread(void *v_ctx) {
while (!atomic_load(ctx->stop)) {
# define CHECK(x_arg) if ((x_arg) < 0) { goto close; }
# define SLOWDOWN { \
ldf m_next_ts = us_get_now_monotonic() + 1; \
const ldf m_next_ts = us_get_now_monotonic() + 1; \
while (!atomic_load(ctx->stop) && us_get_now_monotonic() < m_next_ts) { \
us_capture_hwbuf_s *m_pass_hw = _get_latest_hw(ctx->queue); \
if (m_pass_hw != NULL) { \
@ -517,10 +518,9 @@ static bool _stream_has_jpeg_clients_cached(us_stream_s *stream) {
}
static bool _stream_has_any_clients_cached(us_stream_s *stream) {
const us_stream_runtime_s *const run = stream->run;
return (
_stream_has_jpeg_clients_cached(stream)
|| (run->h264 != NULL && atomic_load(&run->h264->sink->has_clients))
|| (stream->h264_sink != NULL && atomic_load(&stream->h264_sink->has_clients))
|| (stream->raw_sink != NULL && atomic_load(&stream->raw_sink->has_clients))
# ifdef WITH_V4P
|| (stream->drm != NULL)
@ -539,22 +539,18 @@ static int _stream_init_loop(us_stream_s *stream) {
// Флаги has_clients у синков не обновляются сами по себе, поэтому обновим их
// на каждой итерации старта стрима. После старта этим будут заниматься воркеры.
if (stream->jpeg_sink != NULL) {
us_memsink_server_check(stream->jpeg_sink, NULL);
}
if (stream->run->h264 != NULL) {
us_memsink_server_check(stream->run->h264->sink, NULL);
}
if (stream->raw_sink != NULL) {
us_memsink_server_check(stream->raw_sink, NULL);
}
# define UPDATE_SINK(x_sink) if (x_sink != NULL) { us_memsink_server_check(x_sink, NULL); }
UPDATE_SINK(stream->jpeg_sink);
UPDATE_SINK(stream->raw_sink);
UPDATE_SINK(stream->h264_sink);
# undef UPDATE_SINK
_stream_check_suicide(stream);
stream->cap->dma_export = (
stream->enc->type == US_ENCODER_TYPE_M2M_VIDEO
|| stream->enc->type == US_ENCODER_TYPE_M2M_IMAGE
|| run->h264 != NULL
|| stream->h264_sink != NULL
# ifdef WITH_V4P
|| stream->drm != NULL
# endif
@ -590,10 +586,8 @@ static int _stream_init_loop(us_stream_s *stream) {
_stream_set_capture_state(stream, width, height, false, 0);
_stream_expose_jpeg(stream, run->blank->jpeg);
if (run->h264 != NULL) {
us_h264_stream_process(run->h264, run->blank->raw, true);
}
_stream_expose_raw(stream, run->blank->raw);
_stream_encode_expose_h264(stream, run->blank->raw, true);
# ifdef WITH_V4P
_stream_drm_ensure_no_signal(stream);
@ -647,6 +641,30 @@ static void _stream_expose_raw(us_stream_s *stream, const us_frame_s *frame) {
}
}
static void _stream_encode_expose_h264(us_stream_s *stream, const us_frame_s *frame, bool force_key) {
us_stream_runtime_s *run = stream->run;
if (stream->h264_sink == NULL) {
return;
}
bool online = false;
if (us_is_jpeg(frame->format)) {
if (us_unjpeg(frame, run->h264_tmp_src, true) < 0) {
goto done;
}
frame = run->h264_tmp_src;
}
if (run->h264_key_requested) {
US_LOG_INFO("H264: Requested keyframe by a sink client");
run->h264_key_requested = false;
force_key = true;
}
if (!us_m2m_encoder_compress(run->h264_enc, frame, run->h264_dest, force_key)) {
online = !us_memsink_server_put(stream->h264_sink, run->h264_dest, &run->h264_key_requested);
}
done:
atomic_store(&run->http_h264_online, online);
}
static void _stream_check_suicide(us_stream_s *stream) {
if (stream->exit_on_no_clients == 0) {
return;

View File

@ -29,6 +29,7 @@
#include "../libs/types.h"
#include "../libs/queue.h"
#include "../libs/ring.h"
#include "../libs/frame.h"
#include "../libs/memsink.h"
#include "../libs/capture.h"
#ifdef WITH_V4P
@ -37,11 +38,15 @@
#include "blank.h"
#include "encoder.h"
#include "h264.h"
#include "m2m.h"
typedef struct {
us_h264_stream_s *h264;
us_m2m_encoder_s *h264_enc;
us_frame_s *h264_tmp_src;
us_frame_s *h264_dest;
bool h264_key_requested;
atomic_bool http_h264_online;
us_ring_s *http_jpeg_ring;
atomic_bool http_has_clients;