From 54b221aabdd75a53304e54b5932f76e14c6f84b5 Mon Sep 17 00:00:00 2001 From: Maxim Devaev Date: Fri, 1 Mar 2024 09:40:51 +0200 Subject: [PATCH] moved exit_on_no_clients logic from http to stream loop --- src/ustreamer/http/server.c | 34 +--------------------------------- src/ustreamer/http/server.h | 6 +----- src/ustreamer/options.c | 2 +- src/ustreamer/stream.c | 37 +++++++++++++++++++++++++++++++------ src/ustreamer/stream.h | 5 +++-- 5 files changed, 37 insertions(+), 47 deletions(-) diff --git a/src/ustreamer/http/server.c b/src/ustreamer/http/server.c index dba0fc8..b34d44c 100644 --- a/src/ustreamer/http/server.c +++ b/src/ustreamer/http/server.c @@ -37,7 +37,6 @@ static void _http_callback_stream(struct evhttp_request *request, void *v_server static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_ctx); static void _http_callback_stream_error(struct bufferevent *buf_event, short what, void *v_ctx); -static void _http_request_watcher(int fd, short event, void *v_server); static void _http_refresher(int fd, short event, void *v_server); static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bool frame_updated); @@ -91,11 +90,6 @@ void us_server_destroy(us_server_s *server) { event_free(run->refresher); } - if (run->request_watcher != NULL) { - event_del(run->request_watcher); - event_free(run->request_watcher); - } - evhttp_free(run->http); US_CLOSE_FD(run->ext_fd); event_base_free(run->base); @@ -140,14 +134,6 @@ int us_server_listen(us_server_s *server) { ex->notify_last_width = ex->frame->width; ex->notify_last_height = ex->frame->height; - if (server->exit_on_no_clients > 0) { - run->last_request_ts = us_get_now_monotonic(); - struct timeval interval = {0}; - interval.tv_usec = 100000; - assert((run->request_watcher = event_new(run->base, -1, EV_PERSIST, _http_request_watcher, server)) != NULL); - assert(!event_add(run->request_watcher, &interval)); - } - { struct timeval interval = {0}; if (stream->dev->desired_fps > 0) { @@ -223,7 +209,7 @@ void us_server_loop_break(us_server_s *server) { static int _http_preprocess_request(struct evhttp_request *request, us_server_s *server) { us_server_runtime_s *const run = server->run; - run->last_request_ts = us_get_now_monotonic(); + atomic_store(&server->stream->run->http_last_request_ts, us_get_now_monotonic()); if (server->allow_origin[0] != '\0') { const char *const cors_headers = _http_get_header(request, "Access-Control-Request-Headers"); @@ -834,24 +820,6 @@ static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bo } } -static void _http_request_watcher(int fd, short what, void *v_server) { - (void)fd; - (void)what; - - us_server_s *const server = (us_server_s *)v_server; - us_server_runtime_s *const run = server->run; - const long double now = us_get_now_monotonic(); - - if (us_stream_has_clients(server->stream)) { - run->last_request_ts = now; - } else if (run->last_request_ts + server->exit_on_no_clients < now) { - US_LOG_INFO("HTTP: No requests or HTTP/sink clients found in last %u seconds, exiting ...", - server->exit_on_no_clients); - us_process_suicide(); - run->last_request_ts = now; - } -} - static void _http_refresher(int fd, short what, void *v_server) { (void)fd; (void)what; diff --git a/src/ustreamer/http/server.h b/src/ustreamer/http/server.h index 40c9063..c661cc6 100644 --- a/src/ustreamer/http/server.h +++ b/src/ustreamer/http/server.h @@ -120,9 +120,6 @@ typedef struct { char *auth_token; - struct event *request_watcher; - long double last_request_ts; - struct event *refresher; us_server_exposed_s *exposed; @@ -156,9 +153,8 @@ typedef struct us_server_sx { unsigned fake_height; bool notify_parent; - unsigned exit_on_no_clients; - us_stream_s *stream; + us_stream_s *stream; us_server_runtime_s *run; } us_server_s; diff --git a/src/ustreamer/options.c b/src/ustreamer/options.c index 6a30636..7099031 100644 --- a/src/ustreamer/options.c +++ b/src/ustreamer/options.c @@ -457,7 +457,7 @@ int options_parse(us_options_s *options, us_device_s *dev, us_encoder_s *enc, us }; break; # endif - case _O_EXIT_ON_NO_CLIENTS: OPT_NUMBER("--exit-on-no-clients", server->exit_on_no_clients, 0, 86400, 0); + case _O_EXIT_ON_NO_CLIENTS: OPT_NUMBER("--exit-on-no-clients", stream->exit_on_no_clients, 0, 86400, 0); # ifdef WITH_SETPROCTITLE case _O_PROCESS_NAME_PREFIX: OPT_SET(process_name_prefix, optarg); # endif diff --git a/src/ustreamer/stream.c b/src/ustreamer/stream.c index 57a48d0..6b9e7e0 100644 --- a/src/ustreamer/stream.c +++ b/src/ustreamer/stream.c @@ -23,6 +23,8 @@ #include "stream.h" +static bool _stream_is_stopped(us_stream_s *stream); +static bool _stream_has_any_clients(us_stream_s *stream); static bool _stream_slowdown(us_stream_s *stream); static int _stream_init_loop(us_stream_s *stream); static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame); @@ -49,6 +51,7 @@ us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) { US_CALLOC(run, 1); US_RING_INIT_WITH_ITEMS(run->http_jpeg_ring, 4, us_frame_init); atomic_init(&run->http_has_clients, false); + atomic_init(&run->http_last_request_ts, 0); atomic_init(&run->captured_fps, 0); atomic_init(&run->stop, false); run->blank = us_blank_init(); @@ -76,6 +79,8 @@ void us_stream_loop(us_stream_s *stream) { US_LOG_INFO("Using V4L2 device: %s", stream->dev->path); US_LOG_INFO("Using desired FPS: %u", stream->dev->desired_fps); + atomic_store(&stream->run->http_last_request_ts, us_get_now_monotonic()); + if (stream->h264_sink != NULL) { _RUN(h264) = us_h264_stream_init(stream->h264_sink, stream->h264_m2m_path, stream->h264_bitrate, stream->h264_gop); } @@ -88,7 +93,7 @@ void us_stream_loop(us_stream_s *stream) { US_LOG_INFO("Capturing ..."); - while (!atomic_load(&_RUN(stop))) { + while (!_stream_is_stopped(stream)) { US_SEP_DEBUG('-'); US_LOG_DEBUG("Waiting for worker ..."); @@ -111,7 +116,7 @@ void us_stream_loop(us_stream_s *stream) { } const bool h264_force_key = _stream_slowdown(stream); - if (atomic_load(&_RUN(stop))) { + if (_stream_is_stopped(stream)) { break; } @@ -196,7 +201,27 @@ void us_stream_loop_break(us_stream_s *stream) { atomic_store(&_RUN(stop), true); } -bool us_stream_has_clients(us_stream_s *stream) { +static bool _stream_is_stopped(us_stream_s *stream) { + const bool stop = atomic_load(&_RUN(stop)); + if (stop) { + return true; + } + if (stream->exit_on_no_clients > 0) { + const long double now = us_get_now_monotonic(); + const uint64_t http_last_request_ts = atomic_load(&_RUN(http_last_request_ts)); // Seconds + if (_stream_has_any_clients(stream)) { + atomic_store(&_RUN(http_last_request_ts), now); + } else if (http_last_request_ts + stream->exit_on_no_clients < now) { + US_LOG_INFO("No requests or HTTP/sink clients found in last %u seconds, exiting ...", + stream->exit_on_no_clients); + us_process_suicide(); + atomic_store(&_RUN(http_last_request_ts), now); + } + } + return false; +} + +static bool _stream_has_any_clients(us_stream_s *stream) { return ( atomic_load(&_RUN(http_has_clients)) // has_clients синков НЕ обновляются в реальном времени @@ -208,7 +233,7 @@ bool us_stream_has_clients(us_stream_s *stream) { static bool _stream_slowdown(us_stream_s *stream) { if (stream->slowdown) { unsigned count = 0; - while (count < 10 && !atomic_load(&_RUN(stop)) && !us_stream_has_clients(stream)) { + while (count < 10 && !_stream_is_stopped(stream) && !_stream_has_any_clients(stream)) { usleep(100000); ++count; } @@ -219,7 +244,7 @@ static bool _stream_slowdown(us_stream_s *stream) { static int _stream_init_loop(us_stream_s *stream) { int access_errno = 0; - while (!atomic_load(&_RUN(stop))) { + while (!_stream_is_stopped(stream)) { unsigned width = stream->dev->run->width; unsigned height = stream->dev->run->height; if (width == 0 || height == 0) { @@ -304,7 +329,7 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame) { int ri = -1; while ( - !atomic_load(&_RUN(stop)) + !_stream_is_stopped(stream) && ((ri = us_ring_producer_acquire(run->http_jpeg_ring, 0)) < 0) ) { US_LOG_ERROR("Can't push JPEG to HTTP ring (no free slots)"); diff --git a/src/ustreamer/stream.h b/src/ustreamer/stream.h index 4faa576..09f0283 100644 --- a/src/ustreamer/stream.h +++ b/src/ustreamer/stream.h @@ -34,6 +34,7 @@ #include "../libs/tools.h" #include "../libs/threading.h" +#include "../libs/process.h" #include "../libs/logging.h" #include "../libs/ring.h" #include "../libs/frame.h" @@ -52,6 +53,7 @@ typedef struct { us_ring_s *http_jpeg_ring; atomic_bool http_has_clients; + atomic_uint_least64_t http_last_request_ts; // Seconds atomic_uint captured_fps; bool last_online; @@ -71,6 +73,7 @@ typedef struct { int last_as_blank; bool slowdown; unsigned error_delay; + unsigned exit_on_no_clients; us_memsink_s *sink; us_memsink_s *raw_sink; @@ -89,5 +92,3 @@ void us_stream_destroy(us_stream_s *stream); void us_stream_loop(us_stream_s *stream); void us_stream_loop_break(us_stream_s *stream); - -bool us_stream_has_clients(us_stream_s *stream);