stream prototype

This commit is contained in:
Devaev Maxim 2018-09-20 00:00:33 +03:00
parent 3d7da070dd
commit c1f48fca40
4 changed files with 216 additions and 59 deletions

View File

@ -14,8 +14,6 @@
#include "device.h"
static const char DEFAULT_DEVICE[] = "/dev/video0";
static const struct {
const char *name;
const v4l2_std_id standard;
@ -57,7 +55,7 @@ struct device_t *device_init() {
run->fd = -1;
A_CALLOC(dev, 1);
dev->path = (char *)DEFAULT_DEVICE;
dev->path = "/dev/video0";
dev->width = 640;
dev->height = 480;
dev->format = V4L2_PIX_FMT_YUYV;

View File

@ -4,10 +4,10 @@
#include <time.h>
#include <assert.h>
#include <event2/event.h>
#include <event2/event-config.h>
#include <event2/thread.h>
#include <event2/http.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#ifndef EVTHREAD_USE_PTHREADS_IMPLEMENTED
# error Required libevent-pthreads support
@ -19,14 +19,20 @@
#include "http.h"
static const char DEFAULT_HOST[] = "localhost";
#define BOUNDARY "boundarydonotcross"
#define RN "\r\n"
static void _http_callback_root(struct evhttp_request *request, void *arg);
static void _http_callback_stream_ping(struct evhttp_request *request, void *v_server);
static void _http_callback_stream_snapshot(struct evhttp_request *request, void *v_server);
static void _http_update_exposed(struct http_server_t *server);
static void _http_add_header(struct evhttp_request *request, const char *key, const char *value);
static void _http_callback_ping(struct evhttp_request *request, void *v_server);
static void _http_callback_snapshot(struct evhttp_request *request, void *v_server);
static void _http_callback_stream(struct evhttp_request *request, void *v_server);
static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_ctx);
static void _http_callback_stream_error(struct bufferevent *buf_event, short what, void *v_ctx);
static void _http_send_stream(struct http_server_t *server);
static void _http_exposed_refresh(int fd, short event, void *v_server);
struct http_server_t *http_server_init(struct stream_t *stream) {
@ -39,10 +45,13 @@ struct http_server_t *http_server_init(struct stream_t *stream) {
A_CALLOC(run, 1);
run->stream = stream;
run->exposed = exposed;
run->refresh_interval.tv_sec = 0;
run->refresh_interval.tv_usec = 30000; // ~30 refreshes per second
A_CALLOC(server, 1);
server->host = (char *)DEFAULT_HOST;
server->host = "localhost";
server->port = 8080;
server->timeout = 10;
server->run = run;
assert(!evthread_use_pthreads());
@ -51,23 +60,31 @@ struct http_server_t *http_server_init(struct stream_t *stream) {
evhttp_set_allowed_methods(run->http, EVHTTP_REQ_GET); // TODO: HEAD
assert(!evhttp_set_cb(run->http, "/", _http_callback_root, NULL));
assert(!evhttp_set_cb(run->http, "/ping", _http_callback_stream_ping, (void *)server));
assert(!evhttp_set_cb(run->http, "/snapshot", _http_callback_stream_snapshot, (void *)server));
assert(!evhttp_set_cb(run->http, "/ping", _http_callback_ping, (void *)exposed));
assert(!evhttp_set_cb(run->http, "/snapshot", _http_callback_snapshot, (void *)exposed));
assert(!evhttp_set_cb(run->http, "/stream", _http_callback_stream, (void *)server));
assert((run->refresh = event_new(run->base, -1, EV_PERSIST, _http_exposed_refresh, server)));
assert(!event_add(run->refresh, &run->refresh_interval));
return server;
}
void http_server_destroy(struct http_server_t *server) {
event_del(server->run->refresh);
event_free(server->run->refresh);
evhttp_free(server->run->http);
event_base_free(server->run->base);
libevent_global_shutdown();
free(server->run->exposed->picture.data);
free(server->run->exposed);
free(server->run);
free(server);
libevent_global_shutdown();
}
int http_server_listen(struct http_server_t *server) {
LOG_DEBUG("Binding HTTP to [%s]:%d ...", server->host, server->port);
//evhttp_set_timeout(server->run->http, server->timeout); // FIXME
if (evhttp_bind_socket(server->run->http, server->host, server->port) != 0) {
LOG_PERROR("Can't listen HTTP on [%s]:%d", server->host, server->port)
return -1;
@ -87,6 +104,10 @@ void http_server_loop_break(struct http_server_t *server) {
}
INLINE static void _http_add_header(struct evhttp_request *request, const char *key, const char *value) {
assert(!evhttp_add_header(evhttp_request_get_output_headers(request), key, value));
}
static void _http_callback_root(struct evhttp_request *request, UNUSED void *arg) {
struct evbuffer *buf;
@ -96,61 +117,183 @@ static void _http_callback_root(struct evhttp_request *request, UNUSED void *arg
"<title>uStreamer</title></head><body><ul>"
"<li><a href=\"/ping\">/ping</a></li>"
"<li><a href=\"/snapshot\">/snapshot</a></li>"
"<li><a href=\"/stream\">/stream</a></li>"
"</body></html>"
));
_http_add_header(request, "Content-Type", "text/html");
evhttp_send_reply(request, 200, "OK", buf);
evhttp_send_reply(request, HTTP_OK, "OK", buf);
evbuffer_free(buf);
}
static void _http_callback_stream_ping(struct evhttp_request *request, void *v_server) {
struct http_server_t *server = (struct http_server_t *)v_server;
static void _http_callback_ping(struct evhttp_request *request, void *v_exposed) {
struct exposed_t *exposed = (struct exposed_t *)v_exposed;
struct evbuffer *buf;
_http_update_exposed(server);
assert((buf = evbuffer_new()));
assert(evbuffer_add_printf(buf,
"{\"stream\": {\"resolution\":"
" {\"width\": %u, \"height\": %u},"
" \"online\": %s}}",
server->run->exposed->width,
server->run->exposed->height,
(server->run->exposed->online ? "true" : "false")
exposed->width, exposed->height,
(exposed->online ? "true" : "false")
));
_http_add_header(request, "Content-Type", "application/json");
evhttp_send_reply(request, 200, "OK", buf);
evhttp_send_reply(request, HTTP_OK, "OK", buf);
evbuffer_free(buf);
}
static void _http_callback_stream_snapshot(struct evhttp_request *request, void *v_server) {
struct http_server_t *server = (struct http_server_t *)v_server;
static void _http_callback_snapshot(struct evhttp_request *request, void *v_exposed) {
struct exposed_t *exposed = (struct exposed_t *)v_exposed;
struct evbuffer *buf;
struct timespec now_spec;
char now_str[64];
_http_update_exposed(server);
struct timespec x_timestamp_spec;
char x_timestamp_buf[64];
assert((buf = evbuffer_new()));
assert(!evbuffer_add(buf, (const void *)server->run->exposed->picture.data, server->run->exposed->picture.size));
assert(!evbuffer_add(buf, (const void *)exposed->picture.data, exposed->picture.size));
assert(!clock_gettime(CLOCK_REALTIME, &now_spec));
sprintf(now_str, "%u.%06u", (unsigned)now_spec.tv_sec, (unsigned)(now_spec.tv_nsec / 1000)); // TODO: round?
assert(!clock_gettime(CLOCK_REALTIME, &x_timestamp_spec));
sprintf(
x_timestamp_buf, "%u.%06u",
(unsigned)x_timestamp_spec.tv_sec,
(unsigned)(x_timestamp_spec.tv_nsec / 1000) // TODO: round?
);
_http_add_header(request, "Access-Control-Allow-Origin:", "*"); // TODO: need this?
_http_add_header(request, "Access-Control-Allow-Origin:", "*");
_http_add_header(request, "Cache-Control", "no-store, no-cache, must-revalidate, pre-check=0, post-check=0, max-age=0");
_http_add_header(request, "Pragma", "no-cache");
_http_add_header(request, "Expires", "Mon, 3 Jan 2000 12:34:56 GMT");
_http_add_header(request, "X-Timestamp", now_str);
_http_add_header(request, "X-Timestamp", x_timestamp_buf);
_http_add_header(request, "Content-Type", "image/jpeg");
evhttp_send_reply(request, 200, "OK", buf);
evhttp_send_reply(request, HTTP_OK, "OK", buf);
evbuffer_free(buf);
}
static void _http_update_exposed(struct http_server_t *server) {
static void _http_callback_stream(struct evhttp_request *request, void *v_server) {
// https://github.com/libevent/libevent/blob/29cc8386a2f7911eaa9336692a2c5544d8b4734f/http.c#L2814
// https://github.com/libevent/libevent/blob/29cc8386a2f7911eaa9336692a2c5544d8b4734f/http.c#L2789
// https://github.com/libevent/libevent/blob/29cc8386a2f7911eaa9336692a2c5544d8b4734f/http.c#L362
// https://github.com/libevent/libevent/blob/29cc8386a2f7911eaa9336692a2c5544d8b4734f/http.c#L791
// https://github.com/libevent/libevent/blob/29cc8386a2f7911eaa9336692a2c5544d8b4734f/http.c#L1458
struct http_server_t *server = (struct http_server_t *)v_server;
struct evhttp_connection *conn;
struct bufferevent *buf_event;
struct stream_client_t *client;
conn = evhttp_request_get_connection(request);
if (conn != NULL) {
A_CALLOC(client, 1);
client->server = server;
client->request = request;
client->need_initial = true;
if (server->run->stream_clients == NULL) {
server->run->stream_clients = client;
} else {
struct stream_client_t *last = server->run->stream_clients;
for (; last->next != NULL; last = last->next);
client->prev = last;
last->next = client;
}
buf_event = evhttp_connection_get_bufferevent(conn);
bufferevent_setcb(buf_event, NULL, NULL, _http_callback_stream_error, (void *)client);
bufferevent_enable(buf_event, EV_READ);
} else {
evhttp_request_free(request);
}
}
static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_client) {
struct stream_client_t *client = (struct stream_client_t *)v_client;
struct evbuffer *buf;
struct timespec x_timestamp_spec;
assert((buf = evbuffer_new()));
assert(!clock_gettime(CLOCK_REALTIME, &x_timestamp_spec));
if (client->need_initial) {
assert(evbuffer_add_printf(buf,
"HTTP/1.0 200 OK" RN
"Access-Control-Allow-Origin: *" RN
"Cache-Control: no-store, no-cache, must-revalidate, pre-check=0, post-check=0, max-age=0" RN
"Pragma: no-cache" RN
"Expires: Mon, 3 Jan 2000 12:34:56 GMT" RN
"Content-Type: multipart/x-mixed-replace;boundary=" BOUNDARY RN
RN
"--" BOUNDARY RN
));
assert(!bufferevent_write_buffer(buf_event, buf)); // FIXME
client->need_initial = false;
}
assert(evbuffer_add_printf(buf,
"Content-Type: image/jpeg" RN
"Content-Length: %lu" RN
"X-Timestamp: %u.%06u" RN
RN,
client->server->run->exposed->picture.size * sizeof(*client->server->run->exposed->picture.data),
(unsigned)x_timestamp_spec.tv_sec,
(unsigned)(x_timestamp_spec.tv_nsec / 1000) // TODO: round?
));
assert(!evbuffer_add(buf,
(void *)client->server->run->exposed->picture.data,
client->server->run->exposed->picture.size * sizeof(*client->server->run->exposed->picture.data)
));
assert(evbuffer_add_printf(buf, RN "--" BOUNDARY RN));
assert(!bufferevent_write_buffer(buf_event, buf)); // FIXME
evbuffer_free(buf);
bufferevent_setcb(buf_event, NULL, NULL, _http_callback_stream_error, (void *)client);
bufferevent_enable(buf_event, EV_READ);
}
static void _http_callback_stream_error(UNUSED struct bufferevent *buf_event, UNUSED short what, void *v_client) {
struct stream_client_t *client = (struct stream_client_t *)v_client;
struct evhttp_connection *conn;
conn = evhttp_request_get_connection(client->request);
if (conn != NULL) {
evhttp_connection_free(conn);
}
if (client->prev == NULL) {
client->server->run->stream_clients = client->next;
} else {
client->prev->next = client->next;
}
if (client->next != NULL) {
client->next->prev = client->prev;
}
free(client);
}
static void _http_send_stream(struct http_server_t *server) {
struct stream_client_t *client;
struct evhttp_connection *conn;
struct bufferevent *buf_event;
for (client = server->run->stream_clients; client != NULL; client = client->next) {
conn = evhttp_request_get_connection(client->request);
if (conn != NULL) {
buf_event = evhttp_connection_get_bufferevent(conn);
bufferevent_setcb(buf_event, NULL, _http_callback_stream_write, _http_callback_stream_error, (void *)client);
bufferevent_enable(buf_event, EV_READ|EV_WRITE);
}
}
}
static void _http_exposed_refresh(UNUSED int fd, UNUSED short what, void *v_server) {
struct http_server_t *server = (struct http_server_t *)v_server;
if (server->run->stream->updated) {
LOG_DEBUG("Refreshing HTTP exposed ...");
A_PTHREAD_M_LOCK(&server->run->stream->mutex);
if (server->run->stream->picture.size > 0) {
if (server->run->stream->picture.allocated > server->run->exposed->picture.allocated) {
A_REALLOC(server->run->exposed->picture.data, server->run->stream->picture.allocated);
server->run->exposed->picture.allocated = server->run->stream->picture.allocated;
@ -165,10 +308,9 @@ static void _http_update_exposed(struct http_server_t *server) {
server->run->exposed->height = server->run->stream->height;
server->run->exposed->online = server->run->stream->online;
server->run->stream->updated = false;
}
A_PTHREAD_M_UNLOCK(&server->run->stream->mutex);
_http_send_stream(server);
}
}
static void _http_add_header(struct evhttp_request *request, const char *key, const char *value) {
assert(!evhttp_add_header(evhttp_request_get_output_headers(request), key, value));
}

View File

@ -1,4 +1,5 @@
#include <stdbool.h>
#include <time.h>
#include <event2/event.h>
#include <event2/http.h>
@ -6,6 +7,15 @@
#include "stream.h"
struct stream_client_t {
struct http_server_t *server;
struct evhttp_request *request;
bool need_initial;
struct stream_client_t *prev;
struct stream_client_t *next;
};
struct exposed_t {
struct picture_t picture;
unsigned width;
@ -16,13 +26,18 @@ struct exposed_t {
struct http_server_runtime_t {
struct event_base *base;
struct evhttp *http;
struct event *refresh;
struct stream_t *stream;
struct exposed_t *exposed;
struct timeval refresh_interval;
struct stream_client_t *stream_clients;
};
struct http_server_t {
char *host;
unsigned port;
unsigned timeout;
struct http_server_runtime_t *run;
};

View File

@ -34,6 +34,7 @@ static const struct option _long_opts[] = {
{"host", required_argument, NULL, 's'},
{"port", required_argument, NULL, 'p'},
{"server-timeout", required_argument, NULL, 2000},
{"debug", no_argument, NULL, 5000},
{"log-level", required_argument, NULL, 5001},
@ -87,6 +88,7 @@ static int _parse_options(int argc, char *argv[], struct device_t *dev, struct h
case 's': server->host = optarg; break;
case 'p': OPT_UNSIGNED(server->port, "--port", 1);
case 2000: OPT_UNSIGNED(server->timeout, "--server-timeout", 1);
case 5000: log_level = LOG_LEVEL_DEBUG; break;
case 5001: OPT_UNSIGNED(log_level, "--log-level", 0);
@ -118,13 +120,13 @@ static void _block_thread_signals() {
assert(!pthread_sigmask(SIG_BLOCK, &mask, NULL));
}
static void *_stream_loop_thread(UNUSED void *_) {
static void *_stream_loop_thread(UNUSED void *arg) {
_block_thread_signals();
stream_loop(_ctx->stream);
return NULL;
}
static void *_server_loop_thread(UNUSED void *_) {
static void *_server_loop_thread(UNUSED void *arg) {
_block_thread_signals();
http_server_loop(_ctx->server);
return NULL;