Issue #228: Request fresh snapshot from jpeg encoder

This commit is contained in:
Maxim Devaev 2024-03-03 04:59:12 +02:00
parent 8cb6fc4e78
commit c24d6338e2
4 changed files with 101 additions and 57 deletions

View File

@ -93,7 +93,8 @@ static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_c
static void _http_callback_stream_error(struct bufferevent *buf_event, short what, void *v_ctx);
static void _http_refresher(int fd, short event, void *v_server);
static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bool frame_updated);
static void _http_send_stream(us_server_s *server, bool stream_updated, bool frame_updated);
static void _http_send_snapshot(us_server_s *server);
static bool _expose_frame(us_server_s *server, const us_frame_s *frame);
@ -111,6 +112,9 @@ static char *_http_get_client_hostport(struct evhttp_request *request);
#define _A_EVBUFFER_ADD(x_buf, x_data, x_size) assert(!evbuffer_add(x_buf, x_data, x_size))
#define _A_EVBUFFER_ADD_PRINTF(x_buf, x_fmt, ...) assert(evbuffer_add_printf(x_buf, x_fmt, ##__VA_ARGS__) >= 0)
#define _A_ADD_HEADER(x_request, x_key, x_value) \
assert(!evhttp_add_header(evhttp_request_get_output_headers(x_request), x_key, x_value))
us_server_s *us_server_init(us_stream_s *stream) {
us_server_exposed_s *exposed;
@ -159,6 +163,10 @@ void us_server_destroy(us_server_s *server) {
libevent_global_shutdown();
# endif
US_LIST_ITERATE(run->snapshot_clients, client, { // cppcheck-suppress constStatement
free(client);
});
US_LIST_ITERATE(run->stream_clients, client, { // cppcheck-suppress constStatement
free(client->key);
free(client->hostport);
@ -265,8 +273,6 @@ void us_server_loop_break(us_server_s *server) {
event_base_loopbreak(server->run->base);
}
#define ADD_HEADER(x_key, x_value) assert(!evhttp_add_header(evhttp_request_get_output_headers(request), x_key, x_value))
static int _http_preprocess_request(struct evhttp_request *request, us_server_s *server) {
const us_server_runtime_s *const run = server->run;
@ -276,13 +282,13 @@ static int _http_preprocess_request(struct evhttp_request *request, us_server_s
const char *const cors_headers = _http_get_header(request, "Access-Control-Request-Headers");
const char *const cors_method = _http_get_header(request, "Access-Control-Request-Method");
ADD_HEADER("Access-Control-Allow-Origin", server->allow_origin);
ADD_HEADER("Access-Control-Allow-Credentials", "true");
_A_ADD_HEADER(request, "Access-Control-Allow-Origin", server->allow_origin);
_A_ADD_HEADER(request, "Access-Control-Allow-Credentials", "true");
if (cors_headers != NULL) {
ADD_HEADER("Access-Control-Allow-Headers", cors_headers);
_A_ADD_HEADER(request, "Access-Control-Allow-Headers", cors_headers);
}
if (cors_method != NULL) {
ADD_HEADER("Access-Control-Allow-Methods", cors_method);
_A_ADD_HEADER(request, "Access-Control-Allow-Methods", cors_method);
}
}
@ -295,7 +301,7 @@ static int _http_preprocess_request(struct evhttp_request *request, us_server_s
const char *const token = _http_get_header(request, "Authorization");
if (token == NULL || strcmp(token, run->auth_token) != 0) {
ADD_HEADER("WWW-Authenticate", "Basic realm=\"Restricted area\"");
_A_ADD_HEADER(request, "WWW-Authenticate", "Basic realm=\"Restricted area\"");
evhttp_send_reply(request, 401, "Unauthorized", NULL);
return -1;
}
@ -351,7 +357,7 @@ static void _http_callback_root(struct evhttp_request *request, void *v_server)
struct evbuffer *buf;
_A_EVBUFFER_NEW(buf);
_A_EVBUFFER_ADD_PRINTF(buf, "%s", US_HTML_INDEX_PAGE);
ADD_HEADER("Content-Type", "text/html");
_A_ADD_HEADER(request, "Content-Type", "text/html");
evhttp_send_reply(request, HTTP_OK, "OK", buf);
evbuffer_free(buf);
@ -365,7 +371,7 @@ static void _http_callback_favicon(struct evhttp_request *request, void *v_serve
struct evbuffer *buf;
_A_EVBUFFER_NEW(buf);
_A_EVBUFFER_ADD(buf, (const void*)US_FAVICON_ICO_DATA, US_FAVICON_ICO_DATA_SIZE);
ADD_HEADER("Content-Type", "image/x-icon");
_A_ADD_HEADER(request, "Content-Type", "image/x-icon");
evhttp_send_reply(request, HTTP_OK, "OK", buf);
evbuffer_free(buf);
@ -423,7 +429,7 @@ static void _http_callback_static(struct evhttp_request *request, void *v_server
// and will close it when finished transferring data
fd = -1;
ADD_HEADER("Content-Type", us_guess_mime_type(static_path));
_A_ADD_HEADER(request, "Content-Type", us_guess_mime_type(static_path));
evhttp_send_reply(request, HTTP_OK, "OK", buf);
goto cleanup;
}
@ -527,62 +533,26 @@ static void _http_callback_state(struct evhttp_request *request, void *v_server)
_A_EVBUFFER_ADD_PRINTF(buf, "}}}}");
ADD_HEADER("Content-Type", "application/json");
_A_ADD_HEADER(request, "Content-Type", "application/json");
evhttp_send_reply(request, HTTP_OK, "OK", buf);
evbuffer_free(buf);
}
static void _http_callback_snapshot(struct evhttp_request *request, void *v_server) {
us_server_s *const server = v_server;
us_server_exposed_s *const ex = server->run->exposed;
PREPROCESS_REQUEST;
struct evbuffer *buf;
_A_EVBUFFER_NEW(buf);
_A_EVBUFFER_ADD(buf, (const void*)ex->frame->data, ex->frame->used);
us_snapshot_client_s *client;
US_CALLOC(client, 1);
client->server = server;
client->request = request;
client->request_ts = us_get_now_monotonic();
ADD_HEADER("Cache-Control", "no-store, no-cache, must-revalidate, proxy-revalidate, pre-check=0, post-check=0, max-age=0");
ADD_HEADER("Pragma", "no-cache");
ADD_HEADER("Expires", "Mon, 3 Jan 2000 12:34:56 GMT");
char header_buf[256];
# define ADD_TIME_HEADER(x_key, x_value) { \
US_SNPRINTF(header_buf, 255, "%.06Lf", x_value); \
ADD_HEADER(x_key, header_buf); \
}
# define ADD_UNSIGNED_HEADER(x_key, x_value) { \
US_SNPRINTF(header_buf, 255, "%u", x_value); \
ADD_HEADER(x_key, header_buf); \
}
ADD_TIME_HEADER("X-Timestamp", us_get_now_real());
ADD_HEADER("X-UStreamer-Online", us_bool_to_string(ex->frame->online));
ADD_UNSIGNED_HEADER("X-UStreamer-Dropped", ex->dropped);
ADD_UNSIGNED_HEADER("X-UStreamer-Width", ex->frame->width);
ADD_UNSIGNED_HEADER("X-UStreamer-Height", ex->frame->height);
ADD_TIME_HEADER("X-UStreamer-Grab-Timestamp", ex->frame->grab_ts);
ADD_TIME_HEADER("X-UStreamer-Encode-Begin-Timestamp", ex->frame->encode_begin_ts);
ADD_TIME_HEADER("X-UStreamer-Encode-End-Timestamp", ex->frame->encode_end_ts);
ADD_TIME_HEADER("X-UStreamer-Expose-Begin-Timestamp", ex->expose_begin_ts);
ADD_TIME_HEADER("X-UStreamer-Expose-Cmp-Timestamp", ex->expose_cmp_ts);
ADD_TIME_HEADER("X-UStreamer-Expose-End-Timestamp", ex->expose_end_ts);
ADD_TIME_HEADER("X-UStreamer-Send-Timestamp", us_get_now_monotonic());
# undef ADD_UNSUGNED_HEADER
# undef ADD_TIME_HEADER
ADD_HEADER("Content-Type", "image/jpeg");
evhttp_send_reply(request, HTTP_OK, "OK", buf);
evbuffer_free(buf);
atomic_fetch_add(&server->stream->run->http_snapshot_requested, 1);
US_LIST_APPEND(server->run->snapshot_clients, client);
}
#undef ADD_HEADER
static void _http_callback_stream(struct evhttp_request *request, void *v_server) {
// https://github.com/libevent/libevent/blob/29cc8386a2f7911eaa9336692a2c5544d8b4734f/http.c#L2814
// https://github.com/libevent/libevent/blob/29cc8386a2f7911eaa9336692a2c5544d8b4734f/http.c#L2789
@ -826,7 +796,7 @@ static void _http_callback_stream_error(struct bufferevent *buf_event, short wha
free(client);
}
static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bool frame_updated) {
static void _http_send_stream(us_server_s *server, bool stream_updated, bool frame_updated) {
us_server_runtime_s *const run = server->run;
us_server_exposed_s *const ex = run->exposed;
@ -881,6 +851,63 @@ static void _http_queue_send_stream(us_server_s *server, bool stream_updated, bo
}
}
static void _http_send_snapshot(us_server_s *server) {
us_server_exposed_s *const ex = server->run->exposed;
# define ADD_TIME_HEADER(x_key, x_value) { \
US_SNPRINTF(header_buf, 255, "%.06Lf", x_value); \
_A_ADD_HEADER(request, x_key, header_buf); \
}
# define ADD_UNSIGNED_HEADER(x_key, x_value) { \
US_SNPRINTF(header_buf, 255, "%u", x_value); \
_A_ADD_HEADER(request, x_key, header_buf); \
}
US_LIST_ITERATE(server->run->snapshot_clients, client, { // cppcheck-suppress constStatement
struct evhttp_request *request = client->request;
if (
(atomic_load(&server->stream->run->http_snapshot_requested) == 0) // Request complete
|| (client->request_ts + US_MAX((uint)1, server->timeout / 2) < us_get_now_monotonic())
) {
struct evbuffer *buf;
_A_EVBUFFER_NEW(buf);
_A_EVBUFFER_ADD(buf, (const void*)ex->frame->data, ex->frame->used);
_A_ADD_HEADER(request, "Cache-Control", "no-store, no-cache, must-revalidate, proxy-revalidate, pre-check=0, post-check=0, max-age=0");
_A_ADD_HEADER(request, "Pragma", "no-cache");
_A_ADD_HEADER(request, "Expires", "Mon, 3 Jan 2000 12:34:56 GMT");
char header_buf[256];
ADD_TIME_HEADER("X-Timestamp", us_get_now_real());
_A_ADD_HEADER(request, "X-UStreamer-Online", us_bool_to_string(ex->frame->online));
ADD_UNSIGNED_HEADER("X-UStreamer-Dropped", ex->dropped);
ADD_UNSIGNED_HEADER("X-UStreamer-Width", ex->frame->width);
ADD_UNSIGNED_HEADER("X-UStreamer-Height", ex->frame->height);
ADD_TIME_HEADER("X-UStreamer-Grab-Timestamp", ex->frame->grab_ts);
ADD_TIME_HEADER("X-UStreamer-Encode-Begin-Timestamp", ex->frame->encode_begin_ts);
ADD_TIME_HEADER("X-UStreamer-Encode-End-Timestamp", ex->frame->encode_end_ts);
ADD_TIME_HEADER("X-UStreamer-Expose-Begin-Timestamp", ex->expose_begin_ts);
ADD_TIME_HEADER("X-UStreamer-Expose-Cmp-Timestamp", ex->expose_cmp_ts);
ADD_TIME_HEADER("X-UStreamer-Expose-End-Timestamp", ex->expose_end_ts);
ADD_TIME_HEADER("X-UStreamer-Send-Timestamp", us_get_now_monotonic());
_A_ADD_HEADER(request, "Content-Type", "image/jpeg");
evhttp_send_reply(request, HTTP_OK, "OK", buf);
evbuffer_free(buf);
US_LIST_REMOVE(server->run->snapshot_clients, client);
free(client);
}
});
# undef ADD_UNSUGNED_HEADER
# undef ADD_TIME_HEADER
}
static void _http_refresher(int fd, short what, void *v_server) {
(void)fd;
(void)what;
@ -907,7 +934,8 @@ static void _http_refresher(int fd, short what, void *v_server) {
stream_updated = true;
}
_http_queue_send_stream(server, stream_updated, frame_updated);
_http_send_stream(server, stream_updated, frame_updated);
_http_send_snapshot(server);
if (
frame_updated

View File

@ -57,6 +57,14 @@ typedef struct us_stream_client_sx {
US_LIST_STRUCT(struct us_stream_client_sx);
} us_stream_client_s;
typedef struct us_snapshot_client_sx {
struct us_server_sx *server;
struct evhttp_request *request;
ldf request_ts;
US_LIST_STRUCT(struct us_snapshot_client_sx);
} us_snapshot_client_s;
typedef struct {
us_frame_s *frame;
uint captured_fps;
@ -83,6 +91,8 @@ typedef struct {
us_stream_client_s *stream_clients;
uint stream_clients_count;
us_snapshot_client_s *snapshot_clients;
} us_server_runtime_s;
typedef struct us_server_sx {

View File

@ -95,6 +95,7 @@ us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) {
US_CALLOC(run, 1);
US_RING_INIT_WITH_ITEMS(run->http_jpeg_ring, 4, us_frame_init);
atomic_init(&run->http_has_clients, false);
atomic_init(&run->http_snapshot_requested, 0);
atomic_init(&run->http_last_request_ts, 0);
atomic_init(&run->http_captured_fps, 0);
atomic_init(&run->stop, false);
@ -267,6 +268,9 @@ static void *_jpeg_thread(void *v_ctx) {
// pass
} else if (ready_wr->job_timely) {
_stream_expose_frame(stream, ready_job->dest);
if (atomic_load(&stream->run->http_snapshot_requested) > 0) { // Process real snapshots
atomic_fetch_sub(&stream->run->http_snapshot_requested, 1);
}
US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf",
ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts);
} else {
@ -354,6 +358,7 @@ static bool _stream_has_any_clients(us_stream_s *stream) {
const us_stream_runtime_s *const run = stream->run;
return (
atomic_load(&run->http_has_clients)
|| (atomic_load(&run->http_snapshot_requested) > 0)
// has_clients синков НЕ обновляются в реальном времени
|| (stream->jpeg_sink != NULL && atomic_load(&stream->jpeg_sink->has_clients))
|| (run->h264 != NULL && /*run->h264->sink == NULL ||*/ atomic_load(&run->h264->sink->has_clients))

View File

@ -42,6 +42,7 @@ typedef struct {
us_ring_s *http_jpeg_ring;
atomic_bool http_has_clients;
atomic_uint http_snapshot_requested;
atomic_ullong http_last_request_ts; // Seconds
atomic_uint http_captured_fps;