refactoring

This commit is contained in:
Maxim Devaev 2024-02-29 22:40:45 +02:00
parent df63ad4678
commit 15f160c874
4 changed files with 69 additions and 80 deletions

View File

@ -41,7 +41,7 @@ static void _http_request_watcher(int fd, short event, void *v_server);
static void _http_refresher(int fd, short event, void *v_server); static void _http_refresher(int fd, short event, void *v_server);
static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bool frame_updated); static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bool frame_updated);
static bool _expose_new_frame(us_server_s *server); static bool _expose_frame(us_server_s *server, us_frame_s *frame);
static const char *_http_get_header(struct evhttp_request *request, const char *key); static const char *_http_get_header(struct evhttp_request *request, const char *key);
static char *_http_get_client_hostport(struct evhttp_request *request); static char *_http_get_client_hostport(struct evhttp_request *request);
@ -51,8 +51,6 @@ static char *_http_get_client_hostport(struct evhttp_request *request);
#define _A_EVBUFFER_ADD(x_buf, x_data, x_size) assert(!evbuffer_add(x_buf, x_data, x_size)) #define _A_EVBUFFER_ADD(x_buf, x_data, x_size) assert(!evbuffer_add(x_buf, x_data, x_size))
#define _A_EVBUFFER_ADD_PRINTF(x_buf, x_fmt, ...) assert(evbuffer_add_printf(x_buf, x_fmt, ##__VA_ARGS__) >= 0) #define _A_EVBUFFER_ADD_PRINTF(x_buf, x_fmt, ...) assert(evbuffer_add_printf(x_buf, x_fmt, ##__VA_ARGS__) >= 0)
#define _VID(x_next) server->run->stream->run->video->x_next
us_server_s *us_server_init(us_stream_s *stream) { us_server_s *us_server_init(us_stream_s *stream) {
us_server_exposed_s *exposed; us_server_exposed_s *exposed;
@ -62,7 +60,6 @@ us_server_s *us_server_init(us_stream_s *stream) {
us_server_runtime_s *run; us_server_runtime_s *run;
US_CALLOC(run, 1); US_CALLOC(run, 1);
run->ext_fd = -1; run->ext_fd = -1;
run->stream = stream;
run->exposed = exposed; run->exposed = exposed;
us_server_s *server; us_server_s *server;
@ -76,6 +73,7 @@ us_server_s *us_server_init(us_stream_s *stream) {
server->allow_origin = ""; server->allow_origin = "";
server->instance_id = ""; server->instance_id = "";
server->timeout = 10; server->timeout = 10;
server->stream = stream;
server->run = run; server->run = run;
assert(!evthread_use_pthreads()); assert(!evthread_use_pthreads());
@ -123,7 +121,7 @@ void us_server_destroy(us_server_s *server) {
int us_server_listen(us_server_s *server) { int us_server_listen(us_server_s *server) {
us_server_runtime_s *const run = server->run; us_server_runtime_s *const run = server->run;
us_server_exposed_s *const ex = run->exposed; us_server_exposed_s *const ex = run->exposed;
us_stream_s *const stream = run->stream; us_stream_s *const stream = server->stream;
{ {
if (server->static_path[0] != '\0') { if (server->static_path[0] != '\0') {
@ -405,7 +403,7 @@ static void _http_callback_state(struct evhttp_request *request, void *v_server)
us_server_s *const server = (us_server_s *)v_server; us_server_s *const server = (us_server_s *)v_server;
us_server_runtime_s *const run = server->run; us_server_runtime_s *const run = server->run;
us_server_exposed_s *const ex = run->exposed; us_server_exposed_s *const ex = run->exposed;
us_stream_s *const stream = run->stream; us_stream_s *const stream = server->stream;
PREPROCESS_REQUEST; PREPROCESS_REQUEST;
@ -460,7 +458,7 @@ static void _http_callback_state(struct evhttp_request *request, void *v_server)
(server->fake_height ? server->fake_height : ex->frame->height), (server->fake_height ? server->fake_height : ex->frame->height),
us_bool_to_string(ex->frame->online), us_bool_to_string(ex->frame->online),
stream->dev->desired_fps, stream->dev->desired_fps,
ex->captured_fps, atomic_load(&stream->run->captured_fps),
ex->queued_fps, ex->queued_fps,
run->stream_clients_count run->stream_clients_count
); );
@ -576,7 +574,7 @@ static void _http_callback_stream(struct evhttp_request *request, void *v_server
US_LIST_APPEND_C(run->stream_clients, client, run->stream_clients_count); US_LIST_APPEND_C(run->stream_clients, client, run->stream_clients_count);
if (run->stream_clients_count == 1) { if (run->stream_clients_count == 1) {
atomic_store(&_VID(has_clients), true); atomic_store(&server->stream->run->http_has_clients, true);
# ifdef WITH_GPIO # ifdef WITH_GPIO
us_gpio_set_has_http_clients(true); us_gpio_set_has_http_clients(true);
# endif # endif
@ -762,7 +760,7 @@ static void _http_callback_stream_error(struct bufferevent *buf_event, short wha
US_LIST_REMOVE_C(run->stream_clients, client, run->stream_clients_count); US_LIST_REMOVE_C(run->stream_clients, client, run->stream_clients_count);
if (run->stream_clients_count == 0) { if (run->stream_clients_count == 0) {
atomic_store(&_VID(has_clients), false); atomic_store(&server->stream->run->http_has_clients, false);
# ifdef WITH_GPIO # ifdef WITH_GPIO
us_gpio_set_has_http_clients(false); us_gpio_set_has_http_clients(false);
# endif # endif
@ -844,7 +842,7 @@ static void _http_request_watcher(int fd, short what, void *v_server) {
us_server_runtime_s *const run = server->run; us_server_runtime_s *const run = server->run;
const long double now = us_get_now_monotonic(); const long double now = us_get_now_monotonic();
if (us_stream_has_clients(run->stream)) { if (us_stream_has_clients(server->stream)) {
run->last_request_ts = now; run->last_request_ts = now;
} else if (run->last_request_ts + server->exit_on_no_clients < now) { } else if (run->last_request_ts + server->exit_on_no_clients < now) {
US_LOG_INFO("HTTP: No requests or HTTP/sink clients found in last %u seconds, exiting ...", US_LOG_INFO("HTTP: No requests or HTTP/sink clients found in last %u seconds, exiting ...",
@ -860,12 +858,17 @@ static void _http_refresher(int fd, short what, void *v_server) {
us_server_s *server = (us_server_s *)v_server; us_server_s *server = (us_server_s *)v_server;
us_server_exposed_s *ex = server->run->exposed; us_server_exposed_s *ex = server->run->exposed;
us_ring_s *const ring = server->stream->run->http_jpeg_ring;
bool stream_updated = false; bool stream_updated = false;
bool frame_updated = false; bool frame_updated = false;
if (atomic_load(&_VID(updated))) { const int ri = us_ring_consumer_acquire(ring, 0);
frame_updated = _expose_new_frame(server); if (ri >= 0) {
us_frame_s *const frame = ring->items[ri];
frame_updated = _expose_frame(server, frame);
stream_updated = true; stream_updated = true;
us_ring_consumer_release(ring, ri);
} else if (ex->expose_end_ts + 1 < us_get_now_monotonic()) { } else if (ex->expose_end_ts + 1 < us_get_now_monotonic()) {
US_LOG_DEBUG("HTTP: Repeating exposed ..."); US_LOG_DEBUG("HTTP: Repeating exposed ...");
ex->expose_begin_ts = us_get_now_monotonic(); ex->expose_begin_ts = us_get_now_monotonic();
@ -893,31 +896,25 @@ static void _http_refresher(int fd, short what, void *v_server) {
} }
} }
static bool _expose_new_frame(us_server_s *server) { static bool _expose_frame(us_server_s *server, us_frame_s *frame) {
us_server_exposed_s *const ex = server->run->exposed; us_server_exposed_s *const ex = server->run->exposed;
bool updated = false; US_LOG_DEBUG("HTTP: Updating exposed frame (online=%d) ...", frame->online);
US_MUTEX_LOCK(_VID(mutex));
US_LOG_DEBUG("HTTP: Updating exposed frame (online=%d) ...", _VID(frame->online));
ex->captured_fps = _VID(captured_fps);
ex->expose_begin_ts = us_get_now_monotonic(); ex->expose_begin_ts = us_get_now_monotonic();
if (server->drop_same_frames && _VID(frame->online)) { if (server->drop_same_frames && frame->online) {
bool need_drop = false; bool need_drop = false;
bool maybe_same = false; bool maybe_same = false;
if ( if (
(need_drop = (ex->dropped < server->drop_same_frames)) (need_drop = (ex->dropped < server->drop_same_frames))
&& (maybe_same = us_frame_compare(ex->frame, _VID(frame))) && (maybe_same = us_frame_compare(ex->frame, frame))
) { ) {
ex->expose_cmp_ts = us_get_now_monotonic(); ex->expose_cmp_ts = us_get_now_monotonic();
ex->expose_end_ts = ex->expose_cmp_ts; ex->expose_end_ts = ex->expose_cmp_ts;
US_LOG_VERBOSE("HTTP: Dropped same frame number %u; cmp_time=%.06Lf", US_LOG_VERBOSE("HTTP: Dropped same frame number %u; cmp_time=%.06Lf",
ex->dropped, (ex->expose_cmp_ts - ex->expose_begin_ts)); ex->dropped, (ex->expose_cmp_ts - ex->expose_begin_ts));
ex->dropped += 1; ex->dropped += 1;
goto not_updated; return false; // Not updated
} else { } else {
ex->expose_cmp_ts = us_get_now_monotonic(); ex->expose_cmp_ts = us_get_now_monotonic();
US_LOG_VERBOSE("HTTP: Passed same frame check (need_drop=%d, maybe_same=%d); cmp_time=%.06Lf", US_LOG_VERBOSE("HTTP: Passed same frame check (need_drop=%d, maybe_same=%d); cmp_time=%.06Lf",
@ -925,7 +922,12 @@ static bool _expose_new_frame(us_server_s *server) {
} }
} }
us_frame_copy(_VID(frame), ex->frame); if (frame->used == 0) {
// Фрейм нулевой длины означает, что мы просто должны повторить то что уже есть.
US_FRAME_COPY_META(frame, ex->frame);
} else {
us_frame_copy(frame, ex->frame);
}
ex->dropped = 0; ex->dropped = 0;
ex->expose_cmp_ts = ex->expose_begin_ts; ex->expose_cmp_ts = ex->expose_begin_ts;
@ -933,13 +935,7 @@ static bool _expose_new_frame(us_server_s *server) {
US_LOG_VERBOSE("HTTP: Exposed frame: online=%d, exp_time=%.06Lf", US_LOG_VERBOSE("HTTP: Exposed frame: online=%d, exp_time=%.06Lf",
ex->frame->online, (ex->expose_end_ts - ex->expose_begin_ts)); ex->frame->online, (ex->expose_end_ts - ex->expose_begin_ts));
return true; // Updated
updated = true;
not_updated:
atomic_store(&_VID(updated), false);
US_MUTEX_UNLOCK(_VID(mutex));
return updated;
} }
static const char *_http_get_header(struct evhttp_request *request, const char *key) { static const char *_http_get_header(struct evhttp_request *request, const char *key) {

View File

@ -124,7 +124,6 @@ typedef struct {
long double last_request_ts; long double last_request_ts;
struct event *refresher; struct event *refresher;
us_stream_s *stream;
us_server_exposed_s *exposed; us_server_exposed_s *exposed;
us_stream_client_s *stream_clients; us_stream_client_s *stream_clients;
@ -159,6 +158,8 @@ typedef struct us_server_sx {
bool notify_parent; bool notify_parent;
unsigned exit_on_no_clients; unsigned exit_on_no_clients;
us_stream_s *stream;
us_server_runtime_s *run; us_server_runtime_s *run;
} us_server_s; } us_server_s;

View File

@ -24,7 +24,7 @@
static us_workers_pool_s *_stream_init_loop(us_stream_s *stream); static us_workers_pool_s *_stream_init_loop(us_stream_s *stream);
static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame, unsigned captured_fps); static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame);
#define _RUN(x_next) stream->run->x_next #define _RUN(x_next) stream->run->x_next
@ -46,16 +46,11 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame, unsigne
us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) { us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) {
us_stream_runtime_s *run; us_stream_runtime_s *run;
US_CALLOC(run, 1); US_CALLOC(run, 1);
US_RING_INIT_WITH_ITEMS(run->http_jpeg_ring, 4, us_frame_init);
atomic_init(&run->http_has_clients, false);
atomic_init(&run->captured_fps, 0);
atomic_init(&run->stop, false); atomic_init(&run->stop, false);
us_video_s *video;
US_CALLOC(video, 1);
video->frame = us_frame_init();
atomic_init(&video->updated, false);
US_MUTEX_INIT(video->mutex);
atomic_init(&video->has_clients, false);
run->video = video;
us_stream_s *stream; us_stream_s *stream;
US_CALLOC(stream, 1); US_CALLOC(stream, 1);
stream->dev = dev; stream->dev = dev;
@ -69,9 +64,7 @@ us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) {
} }
void us_stream_destroy(us_stream_s *stream) { void us_stream_destroy(us_stream_s *stream) {
US_MUTEX_DESTROY(_RUN(video->mutex)); US_RING_DELETE_WITH_ITEMS(stream->run->http_jpeg_ring, us_frame_destroy);
us_frame_destroy(_RUN(video->frame));
free(_RUN(video));
free(stream->run); free(stream->run);
free(stream); free(stream);
} }
@ -89,7 +82,6 @@ void us_stream_loop(us_stream_s *stream) {
for (us_workers_pool_s *pool; (pool = _stream_init_loop(stream)) != NULL;) { for (us_workers_pool_s *pool; (pool = _stream_init_loop(stream)) != NULL;) {
long double grab_after = 0; long double grab_after = 0;
unsigned fluency_passed = 0; unsigned fluency_passed = 0;
unsigned captured_fps = 0;
unsigned captured_fps_accum = 0; unsigned captured_fps_accum = 0;
long long captured_fps_second = 0; long long captured_fps_second = 0;
@ -110,7 +102,7 @@ void us_stream_loop(us_stream_s *stream) {
if (!ready_wr->job_failed) { if (!ready_wr->job_failed) {
if (ready_wr->job_timely) { if (ready_wr->job_timely) {
_stream_expose_frame(stream, ready_job->dest, captured_fps); _stream_expose_frame(stream, ready_job->dest);
US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf", US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf",
ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts); ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts);
} else { } else {
@ -171,10 +163,10 @@ void us_stream_loop(us_stream_s *stream) {
fluency_passed = 0; fluency_passed = 0;
if (now_second != captured_fps_second) { if (now_second != captured_fps_second) {
captured_fps = captured_fps_accum; US_LOG_PERF_FPS("A new second has come; captured_fps=%u", captured_fps_accum);
atomic_store(&stream->run->captured_fps, captured_fps_accum);
captured_fps_accum = 0; captured_fps_accum = 0;
captured_fps_second = now_second; captured_fps_second = now_second;
US_LOG_PERF_FPS("A new second has come; captured_fps=%u", captured_fps);
} }
captured_fps_accum += 1; captured_fps_accum += 1;
@ -217,7 +209,7 @@ void us_stream_loop_break(us_stream_s *stream) {
bool us_stream_has_clients(us_stream_s *stream) { bool us_stream_has_clients(us_stream_s *stream) {
return ( return (
atomic_load(&_RUN(video->has_clients)) atomic_load(&_RUN(http_has_clients))
// has_clients синков НЕ обновляются в реальном времени // has_clients синков НЕ обновляются в реальном времени
|| (stream->sink != NULL && atomic_load(&stream->sink->has_clients)) || (stream->sink != NULL && atomic_load(&stream->sink->has_clients))
|| (_RUN(h264) != NULL && /*_RUN(h264->sink) == NULL ||*/ atomic_load(&_RUN(h264->sink->has_clients))) || (_RUN(h264) != NULL && /*_RUN(h264->sink) == NULL ||*/ atomic_load(&_RUN(h264->sink->has_clients)))
@ -227,7 +219,8 @@ bool us_stream_has_clients(us_stream_s *stream) {
static us_workers_pool_s *_stream_init_loop(us_stream_s *stream) { static us_workers_pool_s *_stream_init_loop(us_stream_s *stream) {
int access_errno = 0; int access_errno = 0;
while (!atomic_load(&_RUN(stop))) { while (!atomic_load(&_RUN(stop))) {
_stream_expose_frame(stream, NULL, 0); atomic_store(&stream->run->captured_fps, 0);
_stream_expose_frame(stream, NULL);
if (access(stream->dev->path, R_OK|W_OK) < 0) { if (access(stream->dev->path, R_OK|W_OK) < 0) {
if (access_errno != errno) { if (access_errno != errno) {
@ -258,24 +251,18 @@ static us_workers_pool_s *_stream_init_loop(us_stream_s *stream) {
return NULL; return NULL;
} }
static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame, unsigned captured_fps) { static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame) {
# define VID(x_next) _RUN(video->x_next) us_stream_runtime_s *const run = stream->run;
us_frame_s *new = NULL; us_frame_s *new = NULL;
US_MUTEX_LOCK(VID(mutex));
if (frame != NULL) { if (frame != NULL) {
new = frame; new = frame;
_RUN(last_as_blank_ts) = 0; // Останавливаем таймер _RUN(last_as_blank_ts) = 0; // Останавливаем таймер
US_LOG_DEBUG("Exposed ALIVE video frame"); US_LOG_DEBUG("Exposed ALIVE video frame");
} else { } else {
if (VID(frame->used == 0)) { if (run->last_online) { // Если переходим из online в offline
new = stream->blank; // Инициализация
_RUN(last_as_blank_ts) = 0;
} else if (VID(frame->online)) { // Если переходим из online в offline
if (stream->last_as_blank < 0) { // Если last_as_blank выключен, просто покажем старую картинку if (stream->last_as_blank < 0) { // Если last_as_blank выключен, просто покажем старую картинку
new = stream->blank; new = stream->blank;
US_LOG_INFO("Changed video frame to BLANK"); US_LOG_INFO("Changed video frame to BLANK");
@ -285,7 +272,6 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame, unsigne
} else { // last_as_blank == 0 - показываем последний фрейм вечно } else { // last_as_blank == 0 - показываем последний фрейм вечно
US_LOG_INFO("Freezed last ALIVE video frame forever"); US_LOG_INFO("Freezed last ALIVE video frame forever");
} }
} else if (stream->last_as_blank < 0) { } else if (stream->last_as_blank < 0) {
new = stream->blank; new = stream->blank;
// US_LOG_INFO("Changed video frame to BLANK"); // US_LOG_INFO("Changed video frame to BLANK");
@ -297,27 +283,37 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame, unsigne
&& _RUN(last_as_blank_ts) < us_get_now_monotonic() && _RUN(last_as_blank_ts) < us_get_now_monotonic()
) { ) {
new = stream->blank; new = stream->blank;
_RUN(last_as_blank_ts) = 0; // // Останавливаем таймер _RUN(last_as_blank_ts) = 0; // Останавливаем таймер
US_LOG_INFO("Changed last ALIVE video frame to BLANK"); US_LOG_INFO("Changed last ALIVE video frame to BLANK");
} }
} }
if (new != NULL) { int ri = -1;
us_frame_copy(new, VID(frame)); while (
!atomic_load(&_RUN(stop))
&& ((ri = us_ring_producer_acquire(run->http_jpeg_ring, 0)) < 0)
) {
US_LOG_ERROR("Can't push JPEG to HTTP ring (no free slots)");
}
if (ri < 0) {
return;
} }
VID(frame->online) = (frame != NULL);
VID(captured_fps) = captured_fps;
atomic_store(&VID(updated), true);
US_MUTEX_UNLOCK(VID(mutex)); us_frame_s *const dest = run->http_jpeg_ring->items[ri];
if (new == NULL) {
dest->used = 0;
dest->online = false;
} else {
us_frame_copy(new, dest);
dest->online = true;
}
run->last_online = (frame != NULL);
us_ring_producer_release(run->http_jpeg_ring, ri);
new = (frame ? frame : stream->blank); _SINK_PUT(sink, (frame != NULL ? frame : stream->blank));
_SINK_PUT(sink, new);
if (frame == NULL) { if (frame == NULL) {
_SINK_PUT(raw_sink, stream->blank); _SINK_PUT(raw_sink, stream->blank);
_H264_PUT(stream->blank, false); _H264_PUT(stream->blank, false);
} }
# undef VID
} }

View File

@ -35,6 +35,7 @@
#include "../libs/tools.h" #include "../libs/tools.h"
#include "../libs/threading.h" #include "../libs/threading.h"
#include "../libs/logging.h" #include "../libs/logging.h"
#include "../libs/ring.h"
#include "../libs/frame.h" #include "../libs/frame.h"
#include "../libs/memsink.h" #include "../libs/memsink.h"
#include "../libs/device.h" #include "../libs/device.h"
@ -49,16 +50,11 @@
typedef struct { typedef struct {
us_frame_s *frame; us_ring_s *http_jpeg_ring;
unsigned captured_fps; atomic_bool http_has_clients;
atomic_bool updated; atomic_uint captured_fps;
pthread_mutex_t mutex;
atomic_bool has_clients; // For slowdown bool last_online;
} us_video_s;
typedef struct {
us_video_s *video;
long double last_as_blank_ts; long double last_as_blank_ts;
us_h264_stream_s *h264; us_h264_stream_s *h264;