From c1f48fca4061413acdc47ddd80bf5917c70ee332 Mon Sep 17 00:00:00 2001 From: Devaev Maxim Date: Thu, 20 Sep 2018 00:00:33 +0300 Subject: [PATCH] stream prototype --- src/device.c | 4 +- src/http.c | 242 ++++++++++++++++++++++++++++++++++++++++----------- src/http.h | 23 ++++- src/main.c | 6 +- 4 files changed, 216 insertions(+), 59 deletions(-) diff --git a/src/device.c b/src/device.c index b34b42d..9960af2 100644 --- a/src/device.c +++ b/src/device.c @@ -14,8 +14,6 @@ #include "device.h" -static const char DEFAULT_DEVICE[] = "/dev/video0"; - static const struct { const char *name; const v4l2_std_id standard; @@ -57,7 +55,7 @@ struct device_t *device_init() { run->fd = -1; A_CALLOC(dev, 1); - dev->path = (char *)DEFAULT_DEVICE; + dev->path = "/dev/video0"; dev->width = 640; dev->height = 480; dev->format = V4L2_PIX_FMT_YUYV; diff --git a/src/http.c b/src/http.c index d21b297..92b8317 100644 --- a/src/http.c +++ b/src/http.c @@ -4,10 +4,10 @@ #include #include #include -#include #include #include #include +#include #ifndef EVTHREAD_USE_PTHREADS_IMPLEMENTED # error Required libevent-pthreads support @@ -19,14 +19,20 @@ #include "http.h" -static const char DEFAULT_HOST[] = "localhost"; +#define BOUNDARY "boundarydonotcross" +#define RN "\r\n" static void _http_callback_root(struct evhttp_request *request, void *arg); -static void _http_callback_stream_ping(struct evhttp_request *request, void *v_server); -static void _http_callback_stream_snapshot(struct evhttp_request *request, void *v_server); -static void _http_update_exposed(struct http_server_t *server); -static void _http_add_header(struct evhttp_request *request, const char *key, const char *value); +static void _http_callback_ping(struct evhttp_request *request, void *v_server); +static void _http_callback_snapshot(struct evhttp_request *request, void *v_server); + +static void _http_callback_stream(struct evhttp_request *request, void *v_server); +static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_ctx); +static void _http_callback_stream_error(struct bufferevent *buf_event, short what, void *v_ctx); + +static void _http_send_stream(struct http_server_t *server); +static void _http_exposed_refresh(int fd, short event, void *v_server); struct http_server_t *http_server_init(struct stream_t *stream) { @@ -39,10 +45,13 @@ struct http_server_t *http_server_init(struct stream_t *stream) { A_CALLOC(run, 1); run->stream = stream; run->exposed = exposed; + run->refresh_interval.tv_sec = 0; + run->refresh_interval.tv_usec = 30000; // ~30 refreshes per second A_CALLOC(server, 1); - server->host = (char *)DEFAULT_HOST; + server->host = "localhost"; server->port = 8080; + server->timeout = 10; server->run = run; assert(!evthread_use_pthreads()); @@ -51,23 +60,31 @@ struct http_server_t *http_server_init(struct stream_t *stream) { evhttp_set_allowed_methods(run->http, EVHTTP_REQ_GET); // TODO: HEAD assert(!evhttp_set_cb(run->http, "/", _http_callback_root, NULL)); - assert(!evhttp_set_cb(run->http, "/ping", _http_callback_stream_ping, (void *)server)); - assert(!evhttp_set_cb(run->http, "/snapshot", _http_callback_stream_snapshot, (void *)server)); + assert(!evhttp_set_cb(run->http, "/ping", _http_callback_ping, (void *)exposed)); + assert(!evhttp_set_cb(run->http, "/snapshot", _http_callback_snapshot, (void *)exposed)); + assert(!evhttp_set_cb(run->http, "/stream", _http_callback_stream, (void *)server)); + + assert((run->refresh = event_new(run->base, -1, EV_PERSIST, _http_exposed_refresh, server))); + assert(!event_add(run->refresh, &run->refresh_interval)); return server; } void http_server_destroy(struct http_server_t *server) { + event_del(server->run->refresh); + event_free(server->run->refresh); evhttp_free(server->run->http); event_base_free(server->run->base); + libevent_global_shutdown(); + free(server->run->exposed->picture.data); free(server->run->exposed); free(server->run); free(server); - libevent_global_shutdown(); } int http_server_listen(struct http_server_t *server) { LOG_DEBUG("Binding HTTP to [%s]:%d ...", server->host, server->port); + //evhttp_set_timeout(server->run->http, server->timeout); // FIXME if (evhttp_bind_socket(server->run->http, server->host, server->port) != 0) { LOG_PERROR("Can't listen HTTP on [%s]:%d", server->host, server->port) return -1; @@ -87,6 +104,10 @@ void http_server_loop_break(struct http_server_t *server) { } +INLINE static void _http_add_header(struct evhttp_request *request, const char *key, const char *value) { + assert(!evhttp_add_header(evhttp_request_get_output_headers(request), key, value)); +} + static void _http_callback_root(struct evhttp_request *request, UNUSED void *arg) { struct evbuffer *buf; @@ -96,79 +117,200 @@ static void _http_callback_root(struct evhttp_request *request, UNUSED void *arg "uStreamer
    " "
  • /ping
  • " "
  • /snapshot
  • " + "
  • /stream
  • " "" )); _http_add_header(request, "Content-Type", "text/html"); - evhttp_send_reply(request, 200, "OK", buf); + evhttp_send_reply(request, HTTP_OK, "OK", buf); evbuffer_free(buf); } -static void _http_callback_stream_ping(struct evhttp_request *request, void *v_server) { - struct http_server_t *server = (struct http_server_t *)v_server; +static void _http_callback_ping(struct evhttp_request *request, void *v_exposed) { + struct exposed_t *exposed = (struct exposed_t *)v_exposed; struct evbuffer *buf; - _http_update_exposed(server); - assert((buf = evbuffer_new())); assert(evbuffer_add_printf(buf, "{\"stream\": {\"resolution\":" " {\"width\": %u, \"height\": %u}," " \"online\": %s}}", - server->run->exposed->width, - server->run->exposed->height, - (server->run->exposed->online ? "true" : "false") + exposed->width, exposed->height, + (exposed->online ? "true" : "false") )); _http_add_header(request, "Content-Type", "application/json"); - evhttp_send_reply(request, 200, "OK", buf); + evhttp_send_reply(request, HTTP_OK, "OK", buf); evbuffer_free(buf); } -static void _http_callback_stream_snapshot(struct evhttp_request *request, void *v_server) { - struct http_server_t *server = (struct http_server_t *)v_server; +static void _http_callback_snapshot(struct evhttp_request *request, void *v_exposed) { + struct exposed_t *exposed = (struct exposed_t *)v_exposed; struct evbuffer *buf; - struct timespec now_spec; - char now_str[64]; - - _http_update_exposed(server); + struct timespec x_timestamp_spec; + char x_timestamp_buf[64]; assert((buf = evbuffer_new())); - assert(!evbuffer_add(buf, (const void *)server->run->exposed->picture.data, server->run->exposed->picture.size)); + assert(!evbuffer_add(buf, (const void *)exposed->picture.data, exposed->picture.size)); - assert(!clock_gettime(CLOCK_REALTIME, &now_spec)); - sprintf(now_str, "%u.%06u", (unsigned)now_spec.tv_sec, (unsigned)(now_spec.tv_nsec / 1000)); // TODO: round? + assert(!clock_gettime(CLOCK_REALTIME, &x_timestamp_spec)); + sprintf( + x_timestamp_buf, "%u.%06u", + (unsigned)x_timestamp_spec.tv_sec, + (unsigned)(x_timestamp_spec.tv_nsec / 1000) // TODO: round? + ); - _http_add_header(request, "Access-Control-Allow-Origin:", "*"); // TODO: need this? + _http_add_header(request, "Access-Control-Allow-Origin:", "*"); _http_add_header(request, "Cache-Control", "no-store, no-cache, must-revalidate, pre-check=0, post-check=0, max-age=0"); _http_add_header(request, "Pragma", "no-cache"); _http_add_header(request, "Expires", "Mon, 3 Jan 2000 12:34:56 GMT"); - _http_add_header(request, "X-Timestamp", now_str); + _http_add_header(request, "X-Timestamp", x_timestamp_buf); _http_add_header(request, "Content-Type", "image/jpeg"); - evhttp_send_reply(request, 200, "OK", buf); + evhttp_send_reply(request, HTTP_OK, "OK", buf); evbuffer_free(buf); } -static void _http_update_exposed(struct http_server_t *server) { - if (server->run->stream->updated) { - A_PTHREAD_M_LOCK(&server->run->stream->mutex); - if (server->run->stream->picture.allocated > server->run->exposed->picture.allocated) { - A_REALLOC(server->run->exposed->picture.data, server->run->stream->picture.allocated); - server->run->exposed->picture.allocated = server->run->stream->picture.allocated; +static void _http_callback_stream(struct evhttp_request *request, void *v_server) { + // https://github.com/libevent/libevent/blob/29cc8386a2f7911eaa9336692a2c5544d8b4734f/http.c#L2814 + // https://github.com/libevent/libevent/blob/29cc8386a2f7911eaa9336692a2c5544d8b4734f/http.c#L2789 + // https://github.com/libevent/libevent/blob/29cc8386a2f7911eaa9336692a2c5544d8b4734f/http.c#L362 + // https://github.com/libevent/libevent/blob/29cc8386a2f7911eaa9336692a2c5544d8b4734f/http.c#L791 + // https://github.com/libevent/libevent/blob/29cc8386a2f7911eaa9336692a2c5544d8b4734f/http.c#L1458 + + struct http_server_t *server = (struct http_server_t *)v_server; + struct evhttp_connection *conn; + struct bufferevent *buf_event; + struct stream_client_t *client; + + conn = evhttp_request_get_connection(request); + if (conn != NULL) { + A_CALLOC(client, 1); + client->server = server; + client->request = request; + client->need_initial = true; + + if (server->run->stream_clients == NULL) { + server->run->stream_clients = client; + } else { + struct stream_client_t *last = server->run->stream_clients; + + for (; last->next != NULL; last = last->next); + client->prev = last; + last->next = client; } - memcpy( - server->run->exposed->picture.data, - server->run->stream->picture.data, - server->run->stream->picture.size * sizeof(*server->run->exposed->picture.data) - ); - server->run->exposed->picture.size = server->run->stream->picture.size; - server->run->exposed->width = server->run->stream->width; - server->run->exposed->height = server->run->stream->height; - server->run->exposed->online = server->run->stream->online; - server->run->stream->updated = false; - A_PTHREAD_M_UNLOCK(&server->run->stream->mutex); + + buf_event = evhttp_connection_get_bufferevent(conn); + bufferevent_setcb(buf_event, NULL, NULL, _http_callback_stream_error, (void *)client); + bufferevent_enable(buf_event, EV_READ); + } else { + evhttp_request_free(request); } } -static void _http_add_header(struct evhttp_request *request, const char *key, const char *value) { - assert(!evhttp_add_header(evhttp_request_get_output_headers(request), key, value)); +static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_client) { + struct stream_client_t *client = (struct stream_client_t *)v_client; + struct evbuffer *buf; + struct timespec x_timestamp_spec; + + assert((buf = evbuffer_new())); + assert(!clock_gettime(CLOCK_REALTIME, &x_timestamp_spec)); + + if (client->need_initial) { + assert(evbuffer_add_printf(buf, + "HTTP/1.0 200 OK" RN + "Access-Control-Allow-Origin: *" RN + "Cache-Control: no-store, no-cache, must-revalidate, pre-check=0, post-check=0, max-age=0" RN + "Pragma: no-cache" RN + "Expires: Mon, 3 Jan 2000 12:34:56 GMT" RN + "Content-Type: multipart/x-mixed-replace;boundary=" BOUNDARY RN + RN + "--" BOUNDARY RN + )); + assert(!bufferevent_write_buffer(buf_event, buf)); // FIXME + client->need_initial = false; + } + + assert(evbuffer_add_printf(buf, + "Content-Type: image/jpeg" RN + "Content-Length: %lu" RN + "X-Timestamp: %u.%06u" RN + RN, + client->server->run->exposed->picture.size * sizeof(*client->server->run->exposed->picture.data), + (unsigned)x_timestamp_spec.tv_sec, + (unsigned)(x_timestamp_spec.tv_nsec / 1000) // TODO: round? + )); + assert(!evbuffer_add(buf, + (void *)client->server->run->exposed->picture.data, + client->server->run->exposed->picture.size * sizeof(*client->server->run->exposed->picture.data) + )); + assert(evbuffer_add_printf(buf, RN "--" BOUNDARY RN)); + + assert(!bufferevent_write_buffer(buf_event, buf)); // FIXME + evbuffer_free(buf); + + bufferevent_setcb(buf_event, NULL, NULL, _http_callback_stream_error, (void *)client); + bufferevent_enable(buf_event, EV_READ); +} + +static void _http_callback_stream_error(UNUSED struct bufferevent *buf_event, UNUSED short what, void *v_client) { + struct stream_client_t *client = (struct stream_client_t *)v_client; + struct evhttp_connection *conn; + + conn = evhttp_request_get_connection(client->request); + if (conn != NULL) { + evhttp_connection_free(conn); + } + + if (client->prev == NULL) { + client->server->run->stream_clients = client->next; + } else { + client->prev->next = client->next; + } + if (client->next != NULL) { + client->next->prev = client->prev; + } + free(client); +} + +static void _http_send_stream(struct http_server_t *server) { + struct stream_client_t *client; + struct evhttp_connection *conn; + struct bufferevent *buf_event; + + for (client = server->run->stream_clients; client != NULL; client = client->next) { + conn = evhttp_request_get_connection(client->request); + if (conn != NULL) { + buf_event = evhttp_connection_get_bufferevent(conn); + bufferevent_setcb(buf_event, NULL, _http_callback_stream_write, _http_callback_stream_error, (void *)client); + bufferevent_enable(buf_event, EV_READ|EV_WRITE); + } + } +} + +static void _http_exposed_refresh(UNUSED int fd, UNUSED short what, void *v_server) { + struct http_server_t *server = (struct http_server_t *)v_server; + + if (server->run->stream->updated) { + LOG_DEBUG("Refreshing HTTP exposed ..."); + + A_PTHREAD_M_LOCK(&server->run->stream->mutex); + if (server->run->stream->picture.size > 0) { + if (server->run->stream->picture.allocated > server->run->exposed->picture.allocated) { + A_REALLOC(server->run->exposed->picture.data, server->run->stream->picture.allocated); + server->run->exposed->picture.allocated = server->run->stream->picture.allocated; + } + memcpy( + server->run->exposed->picture.data, + server->run->stream->picture.data, + server->run->stream->picture.size * sizeof(*server->run->exposed->picture.data) + ); + server->run->exposed->picture.size = server->run->stream->picture.size; + server->run->exposed->width = server->run->stream->width; + server->run->exposed->height = server->run->stream->height; + server->run->exposed->online = server->run->stream->online; + server->run->stream->updated = false; + } + A_PTHREAD_M_UNLOCK(&server->run->stream->mutex); + + _http_send_stream(server); + } } diff --git a/src/http.h b/src/http.h index 15a0db1..bcd3026 100644 --- a/src/http.h +++ b/src/http.h @@ -1,4 +1,5 @@ #include +#include #include #include @@ -6,6 +7,15 @@ #include "stream.h" +struct stream_client_t { + struct http_server_t *server; + struct evhttp_request *request; + bool need_initial; + + struct stream_client_t *prev; + struct stream_client_t *next; +}; + struct exposed_t { struct picture_t picture; unsigned width; @@ -14,15 +24,20 @@ struct exposed_t { }; struct http_server_runtime_t { - struct event_base *base; - struct evhttp *http; - struct stream_t *stream; - struct exposed_t *exposed; + struct event_base *base; + struct evhttp *http; + struct event *refresh; + struct stream_t *stream; + struct exposed_t *exposed; + struct timeval refresh_interval; + + struct stream_client_t *stream_clients; }; struct http_server_t { char *host; unsigned port; + unsigned timeout; struct http_server_runtime_t *run; }; diff --git a/src/main.c b/src/main.c index 52ebcaa..7252255 100644 --- a/src/main.c +++ b/src/main.c @@ -34,6 +34,7 @@ static const struct option _long_opts[] = { {"host", required_argument, NULL, 's'}, {"port", required_argument, NULL, 'p'}, + {"server-timeout", required_argument, NULL, 2000}, {"debug", no_argument, NULL, 5000}, {"log-level", required_argument, NULL, 5001}, @@ -87,6 +88,7 @@ static int _parse_options(int argc, char *argv[], struct device_t *dev, struct h case 's': server->host = optarg; break; case 'p': OPT_UNSIGNED(server->port, "--port", 1); + case 2000: OPT_UNSIGNED(server->timeout, "--server-timeout", 1); case 5000: log_level = LOG_LEVEL_DEBUG; break; case 5001: OPT_UNSIGNED(log_level, "--log-level", 0); @@ -118,13 +120,13 @@ static void _block_thread_signals() { assert(!pthread_sigmask(SIG_BLOCK, &mask, NULL)); } -static void *_stream_loop_thread(UNUSED void *_) { +static void *_stream_loop_thread(UNUSED void *arg) { _block_thread_signals(); stream_loop(_ctx->stream); return NULL; } -static void *_server_loop_thread(UNUSED void *_) { +static void *_server_loop_thread(UNUSED void *arg) { _block_thread_signals(); http_server_loop(_ctx->server); return NULL;