From 4ec3f1193572d86192f3eaba41f24a5e2a23b898 Mon Sep 17 00:00:00 2001 From: Maxim Devaev Date: Sat, 30 Mar 2024 16:10:46 +0200 Subject: [PATCH] refactoring --- src/ustreamer/http/server.c | 18 ++++++------- src/ustreamer/stream.c | 51 ++++++++++++++++++++----------------- src/ustreamer/stream.h | 24 +++++++++-------- 3 files changed, 51 insertions(+), 42 deletions(-) diff --git a/src/ustreamer/http/server.c b/src/ustreamer/http/server.c index cb1372f..6a52060 100644 --- a/src/ustreamer/http/server.c +++ b/src/ustreamer/http/server.c @@ -279,7 +279,7 @@ void us_server_loop_break(us_server_s *server) { static int _http_preprocess_request(struct evhttp_request *request, us_server_s *server) { const us_server_runtime_s *const run = server->run; - atomic_store(&server->stream->run->http_last_request_ts, us_get_now_monotonic()); + atomic_store(&server->stream->run->http->last_request_ts, us_get_now_monotonic()); if (server->allow_origin[0] != '\0') { const char *const cors_headers = _http_get_header(request, "Access-Control-Request-Headers"); @@ -481,7 +481,7 @@ static void _http_callback_state(struct evhttp_request *request, void *v_server) " \"h264\": {\"bitrate\": %u, \"gop\": %u, \"online\": %s},", stream->h264_bitrate, stream->h264_gop, - us_bool_to_string(atomic_load(&stream->run->http_h264_online)) + us_bool_to_string(atomic_load(&stream->run->http->h264_online)) ); } @@ -504,7 +504,7 @@ static void _http_callback_state(struct evhttp_request *request, void *v_server) } us_fpsi_meta_s captured_meta; - const uint captured_fps = us_fpsi_get(stream->run->http_captured_fpsi, &captured_meta); + const uint captured_fps = us_fpsi_get(stream->run->http->captured_fpsi, &captured_meta); _A_EVBUFFER_ADD_PRINTF(buf, " \"source\": {\"resolution\": {\"width\": %u, \"height\": %u}," " \"online\": %s, \"desired_fps\": %u, \"captured_fps\": %u}," @@ -551,7 +551,7 @@ static void _http_callback_snapshot(struct evhttp_request *request, void *v_serv client->request = request; client->request_ts = us_get_now_monotonic(); - atomic_fetch_add(&server->stream->run->http_snapshot_requested, 1); + atomic_fetch_add(&server->stream->run->http->snapshot_requested, 1); US_LIST_APPEND(server->run->snapshot_clients, client); } @@ -600,7 +600,7 @@ static void _http_callback_stream(struct evhttp_request *request, void *v_server US_LIST_APPEND_C(run->stream_clients, client, run->stream_clients_count); if (run->stream_clients_count == 1) { - atomic_store(&server->stream->run->http_has_clients, true); + atomic_store(&server->stream->run->http->has_clients, true); # ifdef WITH_GPIO us_gpio_set_has_http_clients(true); # endif @@ -779,7 +779,7 @@ static void _http_callback_stream_error(struct bufferevent *buf_event, short wha US_LIST_REMOVE_C(run->stream_clients, client, run->stream_clients_count); if (run->stream_clients_count == 0) { - atomic_store(&server->stream->run->http_has_clients, false); + atomic_store(&server->stream->run->http->has_clients, false); # ifdef WITH_GPIO us_gpio_set_has_http_clients(false); # endif @@ -861,12 +861,12 @@ static void _http_send_snapshot(us_server_s *server) { } us_fpsi_meta_s captured_meta; - us_fpsi_get(server->stream->run->http_captured_fpsi, &captured_meta); + us_fpsi_get(server->stream->run->http->captured_fpsi, &captured_meta); US_LIST_ITERATE(server->run->snapshot_clients, client, { // cppcheck-suppress constStatement struct evhttp_request *request = client->request; - const bool has_fresh_snapshot = (atomic_load(&server->stream->run->http_snapshot_requested) == 0); + const bool has_fresh_snapshot = (atomic_load(&server->stream->run->http->snapshot_requested) == 0); const bool timed_out = (client->request_ts + US_MAX((uint)1, server->stream->error_delay * 3) < us_get_now_monotonic()); if (has_fresh_snapshot || timed_out) { @@ -921,7 +921,7 @@ static void _http_refresher(int fd, short what, void *v_server) { us_server_s *server = v_server; us_server_exposed_s *ex = server->run->exposed; - us_ring_s *const ring = server->stream->run->http_jpeg_ring; + us_ring_s *const ring = server->stream->run->http->jpeg_ring; bool stream_updated = false; bool frame_updated = false; diff --git a/src/ustreamer/stream.c b/src/ustreamer/stream.c index cd0c0bb..3f2e2ae 100644 --- a/src/ustreamer/stream.c +++ b/src/ustreamer/stream.c @@ -95,16 +95,20 @@ static void _stream_check_suicide(us_stream_s *stream); us_stream_s *us_stream_init(us_capture_s *cap, us_encoder_s *enc) { + us_stream_http_s *http; + US_CALLOC(http, 1); + atomic_init(&http->h264_online, false); + US_RING_INIT_WITH_ITEMS(http->jpeg_ring, 4, us_frame_init); + atomic_init(&http->has_clients, false); + atomic_init(&http->snapshot_requested, 0); + atomic_init(&http->last_request_ts, 0); + http->captured_fpsi = us_fpsi_init("STREAM-CAPTURED", true); + us_stream_runtime_s *run; US_CALLOC(run, 1); - atomic_init(&run->http_h264_online, false); - US_RING_INIT_WITH_ITEMS(run->http_jpeg_ring, 4, us_frame_init); - atomic_init(&run->http_has_clients, false); - atomic_init(&run->http_snapshot_requested, 0); - atomic_init(&run->http_last_request_ts, 0); - run->http_captured_fpsi = us_fpsi_init("STREAM-CAPTURED", true); atomic_init(&run->stop, false); run->blank = us_blank_init(); + run->http = http; us_stream_s *stream; US_CALLOC(stream, 1); @@ -116,14 +120,15 @@ us_stream_s *us_stream_init(us_capture_s *cap, us_encoder_s *enc) { stream->run = run; us_blank_draw(run->blank, "< NO SIGNAL >", cap->width, cap->height); - us_fpsi_reset(run->http_captured_fpsi, run->blank->raw); + us_fpsi_reset(http->captured_fpsi, run->blank->raw); return stream; } void us_stream_destroy(us_stream_s *stream) { + us_fpsi_destroy(stream->run->http->captured_fpsi); + US_RING_DELETE_WITH_ITEMS(stream->run->http->jpeg_ring, us_frame_destroy); us_blank_destroy(stream->run->blank); - us_fpsi_destroy(stream->run->http_captured_fpsi); - US_RING_DELETE_WITH_ITEMS(stream->run->http_jpeg_ring, us_frame_destroy); + free(stream->run->http); free(stream->run); free(stream); } @@ -132,7 +137,7 @@ void us_stream_loop(us_stream_s *stream) { us_stream_runtime_s *const run = stream->run; us_capture_s *const cap = stream->cap; - atomic_store(&run->http_last_request_ts, us_get_now_monotonic()); + atomic_store(&run->http->last_request_ts, us_get_now_monotonic()); if (stream->h264_sink != NULL) { run->h264_enc = us_m2m_h264_encoder_init("H264", stream->h264_m2m_path, stream->h264_bitrate, stream->h264_gop); @@ -186,7 +191,7 @@ void us_stream_loop(us_stream_s *stream) { default: goto close; // Any error } - us_fpsi_bump(run->http_captured_fpsi, &hw->raw); + us_fpsi_bump(run->http->captured_fpsi, &hw->raw); # ifdef WITH_GPIO us_gpio_set_stream_online(true); @@ -307,8 +312,8 @@ static void *_jpeg_thread(void *v_ctx) { // pass } else if (ready_wr->job_timely) { _stream_expose_jpeg(stream, ready_job->dest); - if (atomic_load(&stream->run->http_snapshot_requested) > 0) { // Process real snapshots - atomic_fetch_sub(&stream->run->http_snapshot_requested, 1); + if (atomic_load(&stream->run->http->snapshot_requested) > 0) { // Process real snapshots + atomic_fetch_sub(&stream->run->http->snapshot_requested, 1); } US_LOG_PERF("JPEG: ##### Encoded JPEG exposed; worker=%s, latency=%.3Lf", ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts); @@ -481,8 +486,8 @@ static us_capture_hwbuf_s *_get_latest_hw(us_queue_s *queue) { static bool _stream_has_jpeg_clients_cached(us_stream_s *stream) { const us_stream_runtime_s *const run = stream->run; return ( - atomic_load(&run->http_has_clients) - || (atomic_load(&run->http_snapshot_requested) > 0) + atomic_load(&run->http->has_clients) + || (atomic_load(&run->http->snapshot_requested) > 0) || (stream->jpeg_sink != NULL && atomic_load(&stream->jpeg_sink->has_clients)) ); } @@ -552,7 +557,7 @@ static int _stream_init_loop(us_stream_s *stream) { height = stream->cap->height; } us_blank_draw(run->blank, "< NO SIGNAL >", width, height); - us_fpsi_reset(run->http_captured_fpsi, run->blank->raw); + us_fpsi_reset(run->http->captured_fpsi, run->blank->raw); _stream_expose_jpeg(stream, run->blank->jpeg); _stream_expose_raw(stream, run->blank->raw); @@ -591,14 +596,14 @@ close: static void _stream_expose_jpeg(us_stream_s *stream, const us_frame_s *frame) { us_stream_runtime_s *const run = stream->run; int ri; - while ((ri = us_ring_producer_acquire(run->http_jpeg_ring, 0)) < 0) { + while ((ri = us_ring_producer_acquire(run->http->jpeg_ring, 0)) < 0) { if (atomic_load(&run->stop)) { return; } } - us_frame_s *const dest = run->http_jpeg_ring->items[ri]; + us_frame_s *const dest = run->http->jpeg_ring->items[ri]; us_frame_copy(frame, dest); - us_ring_producer_release(run->http_jpeg_ring, ri); + us_ring_producer_release(run->http->jpeg_ring, ri); if (stream->jpeg_sink != NULL) { us_memsink_server_put(stream->jpeg_sink, dest, NULL); } @@ -631,7 +636,7 @@ static void _stream_encode_expose_h264(us_stream_s *stream, const us_frame_s *fr online = !us_memsink_server_put(stream->h264_sink, run->h264_dest, &run->h264_key_requested); } done: - atomic_store(&run->http_h264_online, online); + atomic_store(&run->http->h264_online, online); } static void _stream_check_suicide(us_stream_s *stream) { @@ -640,13 +645,13 @@ static void _stream_check_suicide(us_stream_s *stream) { } us_stream_runtime_s *const run = stream->run; const ldf now_ts = us_get_now_monotonic(); - const ull http_last_request_ts = atomic_load(&run->http_last_request_ts); // Seconds + const ull http_last_request_ts = atomic_load(&run->http->last_request_ts); // Seconds if (_stream_has_any_clients_cached(stream)) { - atomic_store(&run->http_last_request_ts, now_ts); + atomic_store(&run->http->last_request_ts, now_ts); } else if (http_last_request_ts + stream->exit_on_no_clients < now_ts) { US_LOG_INFO("No requests or HTTP/sink clients found in last %u seconds, exiting ...", stream->exit_on_no_clients); us_process_suicide(); - atomic_store(&run->http_last_request_ts, now_ts); + atomic_store(&run->http->last_request_ts, now_ts); } } diff --git a/src/ustreamer/stream.h b/src/ustreamer/stream.h index 1d48875..0264b5b 100644 --- a/src/ustreamer/stream.h +++ b/src/ustreamer/stream.h @@ -43,28 +43,32 @@ typedef struct { + atomic_bool h264_online; + + us_ring_s *jpeg_ring; + atomic_bool has_clients; + atomic_uint snapshot_requested; + atomic_ullong last_request_ts; // Seconds + us_fpsi_s *captured_fpsi; +} us_stream_http_s; + +typedef struct { + us_stream_http_s *http; + us_m2m_encoder_s *h264_enc; us_frame_s *h264_tmp_src; us_frame_s *h264_dest; bool h264_key_requested; - atomic_bool http_h264_online; - us_ring_s *http_jpeg_ring; - atomic_bool http_has_clients; - atomic_uint http_snapshot_requested; - atomic_ullong http_last_request_ts; // Seconds - us_fpsi_s *http_captured_fpsi; + us_blank_s *blank; - us_blank_s *blank; - - atomic_bool stop; + atomic_bool stop; } us_stream_runtime_s; typedef struct { us_capture_s *cap; us_encoder_s *enc; - int last_as_blank; bool slowdown; uint error_delay; uint exit_on_no_clients;