moved exit_on_no_clients logic from http to stream loop

This commit is contained in:
Maxim Devaev 2024-03-01 09:40:51 +02:00
parent dabee9d47a
commit 54b221aabd
5 changed files with 37 additions and 47 deletions

View File

@ -37,7 +37,6 @@ static void _http_callback_stream(struct evhttp_request *request, void *v_server
static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_ctx);
static void _http_callback_stream_error(struct bufferevent *buf_event, short what, void *v_ctx);
static void _http_request_watcher(int fd, short event, void *v_server);
static void _http_refresher(int fd, short event, void *v_server);
static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bool frame_updated);
@ -91,11 +90,6 @@ void us_server_destroy(us_server_s *server) {
event_free(run->refresher);
}
if (run->request_watcher != NULL) {
event_del(run->request_watcher);
event_free(run->request_watcher);
}
evhttp_free(run->http);
US_CLOSE_FD(run->ext_fd);
event_base_free(run->base);
@ -140,14 +134,6 @@ int us_server_listen(us_server_s *server) {
ex->notify_last_width = ex->frame->width;
ex->notify_last_height = ex->frame->height;
if (server->exit_on_no_clients > 0) {
run->last_request_ts = us_get_now_monotonic();
struct timeval interval = {0};
interval.tv_usec = 100000;
assert((run->request_watcher = event_new(run->base, -1, EV_PERSIST, _http_request_watcher, server)) != NULL);
assert(!event_add(run->request_watcher, &interval));
}
{
struct timeval interval = {0};
if (stream->dev->desired_fps > 0) {
@ -223,7 +209,7 @@ void us_server_loop_break(us_server_s *server) {
static int _http_preprocess_request(struct evhttp_request *request, us_server_s *server) {
us_server_runtime_s *const run = server->run;
run->last_request_ts = us_get_now_monotonic();
atomic_store(&server->stream->run->http_last_request_ts, us_get_now_monotonic());
if (server->allow_origin[0] != '\0') {
const char *const cors_headers = _http_get_header(request, "Access-Control-Request-Headers");
@ -834,24 +820,6 @@ static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bo
}
}
static void _http_request_watcher(int fd, short what, void *v_server) {
(void)fd;
(void)what;
us_server_s *const server = (us_server_s *)v_server;
us_server_runtime_s *const run = server->run;
const long double now = us_get_now_monotonic();
if (us_stream_has_clients(server->stream)) {
run->last_request_ts = now;
} else if (run->last_request_ts + server->exit_on_no_clients < now) {
US_LOG_INFO("HTTP: No requests or HTTP/sink clients found in last %u seconds, exiting ...",
server->exit_on_no_clients);
us_process_suicide();
run->last_request_ts = now;
}
}
static void _http_refresher(int fd, short what, void *v_server) {
(void)fd;
(void)what;

View File

@ -120,9 +120,6 @@ typedef struct {
char *auth_token;
struct event *request_watcher;
long double last_request_ts;
struct event *refresher;
us_server_exposed_s *exposed;
@ -156,9 +153,8 @@ typedef struct us_server_sx {
unsigned fake_height;
bool notify_parent;
unsigned exit_on_no_clients;
us_stream_s *stream;
us_stream_s *stream;
us_server_runtime_s *run;
} us_server_s;

View File

@ -457,7 +457,7 @@ int options_parse(us_options_s *options, us_device_s *dev, us_encoder_s *enc, us
};
break;
# endif
case _O_EXIT_ON_NO_CLIENTS: OPT_NUMBER("--exit-on-no-clients", server->exit_on_no_clients, 0, 86400, 0);
case _O_EXIT_ON_NO_CLIENTS: OPT_NUMBER("--exit-on-no-clients", stream->exit_on_no_clients, 0, 86400, 0);
# ifdef WITH_SETPROCTITLE
case _O_PROCESS_NAME_PREFIX: OPT_SET(process_name_prefix, optarg);
# endif

View File

@ -23,6 +23,8 @@
#include "stream.h"
static bool _stream_is_stopped(us_stream_s *stream);
static bool _stream_has_any_clients(us_stream_s *stream);
static bool _stream_slowdown(us_stream_s *stream);
static int _stream_init_loop(us_stream_s *stream);
static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame);
@ -49,6 +51,7 @@ us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) {
US_CALLOC(run, 1);
US_RING_INIT_WITH_ITEMS(run->http_jpeg_ring, 4, us_frame_init);
atomic_init(&run->http_has_clients, false);
atomic_init(&run->http_last_request_ts, 0);
atomic_init(&run->captured_fps, 0);
atomic_init(&run->stop, false);
run->blank = us_blank_init();
@ -76,6 +79,8 @@ void us_stream_loop(us_stream_s *stream) {
US_LOG_INFO("Using V4L2 device: %s", stream->dev->path);
US_LOG_INFO("Using desired FPS: %u", stream->dev->desired_fps);
atomic_store(&stream->run->http_last_request_ts, us_get_now_monotonic());
if (stream->h264_sink != NULL) {
_RUN(h264) = us_h264_stream_init(stream->h264_sink, stream->h264_m2m_path, stream->h264_bitrate, stream->h264_gop);
}
@ -88,7 +93,7 @@ void us_stream_loop(us_stream_s *stream) {
US_LOG_INFO("Capturing ...");
while (!atomic_load(&_RUN(stop))) {
while (!_stream_is_stopped(stream)) {
US_SEP_DEBUG('-');
US_LOG_DEBUG("Waiting for worker ...");
@ -111,7 +116,7 @@ void us_stream_loop(us_stream_s *stream) {
}
const bool h264_force_key = _stream_slowdown(stream);
if (atomic_load(&_RUN(stop))) {
if (_stream_is_stopped(stream)) {
break;
}
@ -196,7 +201,27 @@ void us_stream_loop_break(us_stream_s *stream) {
atomic_store(&_RUN(stop), true);
}
bool us_stream_has_clients(us_stream_s *stream) {
static bool _stream_is_stopped(us_stream_s *stream) {
const bool stop = atomic_load(&_RUN(stop));
if (stop) {
return true;
}
if (stream->exit_on_no_clients > 0) {
const long double now = us_get_now_monotonic();
const uint64_t http_last_request_ts = atomic_load(&_RUN(http_last_request_ts)); // Seconds
if (_stream_has_any_clients(stream)) {
atomic_store(&_RUN(http_last_request_ts), now);
} else if (http_last_request_ts + stream->exit_on_no_clients < now) {
US_LOG_INFO("No requests or HTTP/sink clients found in last %u seconds, exiting ...",
stream->exit_on_no_clients);
us_process_suicide();
atomic_store(&_RUN(http_last_request_ts), now);
}
}
return false;
}
static bool _stream_has_any_clients(us_stream_s *stream) {
return (
atomic_load(&_RUN(http_has_clients))
// has_clients синков НЕ обновляются в реальном времени
@ -208,7 +233,7 @@ bool us_stream_has_clients(us_stream_s *stream) {
static bool _stream_slowdown(us_stream_s *stream) {
if (stream->slowdown) {
unsigned count = 0;
while (count < 10 && !atomic_load(&_RUN(stop)) && !us_stream_has_clients(stream)) {
while (count < 10 && !_stream_is_stopped(stream) && !_stream_has_any_clients(stream)) {
usleep(100000);
++count;
}
@ -219,7 +244,7 @@ static bool _stream_slowdown(us_stream_s *stream) {
static int _stream_init_loop(us_stream_s *stream) {
int access_errno = 0;
while (!atomic_load(&_RUN(stop))) {
while (!_stream_is_stopped(stream)) {
unsigned width = stream->dev->run->width;
unsigned height = stream->dev->run->height;
if (width == 0 || height == 0) {
@ -304,7 +329,7 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame) {
int ri = -1;
while (
!atomic_load(&_RUN(stop))
!_stream_is_stopped(stream)
&& ((ri = us_ring_producer_acquire(run->http_jpeg_ring, 0)) < 0)
) {
US_LOG_ERROR("Can't push JPEG to HTTP ring (no free slots)");

View File

@ -34,6 +34,7 @@
#include "../libs/tools.h"
#include "../libs/threading.h"
#include "../libs/process.h"
#include "../libs/logging.h"
#include "../libs/ring.h"
#include "../libs/frame.h"
@ -52,6 +53,7 @@
typedef struct {
us_ring_s *http_jpeg_ring;
atomic_bool http_has_clients;
atomic_uint_least64_t http_last_request_ts; // Seconds
atomic_uint captured_fps;
bool last_online;
@ -71,6 +73,7 @@ typedef struct {
int last_as_blank;
bool slowdown;
unsigned error_delay;
unsigned exit_on_no_clients;
us_memsink_s *sink;
us_memsink_s *raw_sink;
@ -89,5 +92,3 @@ void us_stream_destroy(us_stream_s *stream);
void us_stream_loop(us_stream_s *stream);
void us_stream_loop_break(us_stream_s *stream);
bool us_stream_has_clients(us_stream_s *stream);