diff --git a/src/device.h b/src/device.h index 3af7150..1e3c9ce 100644 --- a/src/device.h +++ b/src/device.h @@ -24,7 +24,6 @@ #include #include -#include #include @@ -106,7 +105,6 @@ struct device_t { struct controls_t *ctl; struct device_runtime_t *run; - sig_atomic_t volatile stop; }; diff --git a/src/http.c b/src/http.c index a155143..68e10ad 100644 --- a/src/http.c +++ b/src/http.c @@ -147,6 +147,10 @@ int http_server_listen(struct http_server_t *server) { server->run->drop_same_frames_blank = max_u(server->drop_same_frames, server->run->drop_same_frames_blank); + if (server->slowdown) { + stream_switch_slowdown(server->run->stream, true); + } + evhttp_set_timeout(server->run->http, server->timeout); if (server->unix_path) { @@ -406,6 +410,10 @@ static void _http_callback_stream(struct evhttp_request *request, void *v_server } server->run->stream_clients_count += 1; + if (server->slowdown && server->run->stream_clients_count == 1) { + stream_switch_slowdown(server->run->stream, false); + } + evhttp_connection_get_peer(conn, &client_addr, &client_port); LOG_INFO("HTTP: Registered the new stream client: [%s]:%u; id=%s; advance_headers=%s; dual_final_frames=%s; clients now: %u", client_addr, @@ -559,20 +567,25 @@ static void _http_callback_stream_error(UNUSED struct bufferevent *buf_event, UN char *client_addr = "???"; unsigned short client_port = 0; - client->server->run->stream_clients_count -= 1; +# define RUN(_next) client->server->run->_next + + RUN(stream_clients_count) -= 1; + if (client->server->slowdown && RUN(stream_clients_count) <= 0) { + stream_switch_slowdown(RUN(stream), true); + } conn = evhttp_request_get_connection(client->request); if (conn != NULL) { evhttp_connection_get_peer(conn, &client_addr, &client_port); } LOG_INFO("HTTP: Disconnected the stream client: [%s]:%u; clients now: %u", - client_addr, client_port, client->server->run->stream_clients_count); + client_addr, client_port, RUN(stream_clients_count)); if (conn != NULL) { evhttp_connection_free(conn); } if (client->prev == NULL) { - client->server->run->stream_clients = client->next; + RUN(stream_clients) = client->next; } else { client->prev->next = client->next; } @@ -581,6 +594,8 @@ static void _http_callback_stream_error(UNUSED struct bufferevent *buf_event, UN } free(client->key); free(client); + +# undef RUN } static void _http_queue_send_stream(struct http_server_t *server, bool stream_updated, bool picture_updated) { diff --git a/src/http.h b/src/http.h index 99eee35..2ea5bad 100644 --- a/src/http.h +++ b/src/http.h @@ -85,6 +85,7 @@ struct http_server_t { bool unix_rm; mode_t unix_mode; unsigned drop_same_frames; + bool slowdown; unsigned fake_width; unsigned fake_height; unsigned timeout; diff --git a/src/main.c b/src/main.c index b867b52..468528c 100644 --- a/src/main.c +++ b/src/main.c @@ -80,6 +80,7 @@ static const struct option _long_opts[] = { {"unix-rm", no_argument, NULL, 'r'}, {"unix-mode", required_argument, NULL, 'o'}, {"drop-same-frames", required_argument, NULL, 'e'}, + {"slowdown", no_argument, NULL, 3000}, {"fake-width", required_argument, NULL, 3001}, {"fake-height", required_argument, NULL, 3002}, {"server-timeout", required_argument, NULL, 3003}, @@ -162,6 +163,8 @@ static void _help(struct device_t *dev, struct encoder_t *encoder, struct http_s printf(" It can significantly reduce the outgoing traffic, but will increase\n"); printf(" the CPU loading. Don't use this option with analog signal sources\n"); printf(" or webcams, it's useless. Default: disabled.\n\n"); + printf(" --slowdown -- Slowdown capturing to 1 FPS or less when no stream clients connected.\n"); + printf(" Useful to reduce CPU cosumption. Default: disabled.\n\n"); printf(" --fake-width -- Override image width for /state. Default: disabled\n\n"); printf(" --fake-height -- Override image height for /state. Default: disabled.\n\n"); printf(" --server-timeout -- Timeout for client connections. Default: %u\n\n", server->timeout); @@ -271,6 +274,7 @@ static int _parse_options(int argc, char *argv[], struct device_t *dev, struct e case 'r': OPT_SET(server->unix_rm, true); case 'o': OPT_CHMOD(server->unix_mode, "--unix-mode"); case 'e': OPT_UNSIGNED(server->drop_same_frames, "--drop-same-frames", 0, 30); + case 3000: OPT_SET(server->slowdown, true); case 3001: OPT_UNSIGNED(server->fake_width, "--fake-width", 0, 1920); case 3002: OPT_UNSIGNED(server->fake_height, "--fake-height", 0, 1200); case 3003: OPT_UNSIGNED(server->timeout, "--server-timeout", 1, 60); @@ -377,8 +381,8 @@ int main(int argc, char *argv[]) { if ((exit_code = http_server_listen(server)) == 0) { A_PTHREAD_CREATE(&stream_loop_tid, _stream_loop_thread, NULL); A_PTHREAD_CREATE(&server_loop_tid, _server_loop_thread, NULL); - A_PTHREAD_JOIN(stream_loop_tid); A_PTHREAD_JOIN(server_loop_tid); + A_PTHREAD_JOIN(stream_loop_tid); } } diff --git a/src/stream.c b/src/stream.c index d1cbc2f..0867393 100644 --- a/src/stream.c +++ b/src/stream.c @@ -20,6 +20,7 @@ *****************************************************************************/ +#include #include #include #include @@ -40,12 +41,12 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool); static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index); -static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool); -static int _stream_init(struct device_t *dev, struct workers_pool_t *pool); +static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool); +static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool); -static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *pool); +static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool); static void *_stream_worker_thread(void *v_ctx); -static void _stream_destroy_workers(struct device_t *dev, struct workers_pool_t *pool); +static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool_t *pool); static int _stream_control(struct device_t *dev, bool enable); static int _stream_grab_buffer(struct device_t *dev, struct v4l2_buffer *buf_info); @@ -54,17 +55,22 @@ static int _stream_handle_event(struct device_t *dev); struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) { + struct process_t *proc; struct stream_t *stream; + A_CALLOC(proc, 1); + A_CALLOC(stream, 1); stream->dev = dev; stream->encoder = encoder; A_PTHREAD_M_INIT(&stream->mutex); + stream->proc = proc; return stream; } void stream_destroy(struct stream_t *stream) { A_PTHREAD_M_DESTROY(&stream->mutex); + free(stream->proc); free(stream); } @@ -79,7 +85,7 @@ void stream_loop(struct stream_t *stream) { LOG_INFO("Using V4L2 device: %s", stream->dev->path); LOG_INFO("Using desired FPS: %u", stream->dev->desired_fps); - while (_stream_init_loop(stream->dev, &pool) == 0) { + while (_stream_init_loop(stream, &pool) == 0) { struct worker_t *oldest_worker = NULL; struct worker_t *last_worker = NULL; long double grab_after = 0; @@ -93,7 +99,7 @@ void stream_loop(struct stream_t *stream) { LOG_INFO("Capturing ..."); - while (!stream->dev->stop) { + while (!stream->proc->stop) { int free_worker_number = -1; SEP_DEBUG('-'); @@ -133,10 +139,14 @@ void stream_loop(struct stream_t *stream) { LOG_PERF("----- Raw frame dropped; worker = %u", free_worker_number); } - if (stream->dev->stop) { + if (stream->proc->stop) { break; } + if (stream->proc->slowdown) { + usleep(1000000); + } + # define INIT_FD_SET(_set) \ fd_set _set; FD_ZERO(&_set); FD_SET(stream->dev->run->fd, &_set); @@ -286,13 +296,17 @@ void stream_loop(struct stream_t *stream) { A_PTHREAD_M_UNLOCK(&stream->mutex); } - _stream_destroy_workers(stream->dev, &pool); + _stream_destroy_workers(stream, &pool); _stream_control(stream->dev, false); device_close(stream->dev); } void stream_loop_break(struct stream_t *stream) { - stream->dev->stop = 1; + stream->proc->stop = 1; +} + +void stream_switch_slowdown(struct stream_t *stream, bool slowdown) { + stream->proc->slowdown = slowdown; } static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) { @@ -348,14 +362,14 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker return min_delay; } -static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool) { +static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool) { int retval = -1; - LOG_DEBUG("%s: *dev->stop = %d", __FUNCTION__, dev->stop); - while (!dev->stop) { - if ((retval = _stream_init(dev, pool)) < 0) { - LOG_INFO("Sleeping %u seconds before new stream init ...", dev->error_delay); - sleep(dev->error_delay); + LOG_DEBUG("%s: *stream->proc->stop = %d", __FUNCTION__, stream->proc->stop); + while (!stream->proc->stop) { + if ((retval = _stream_init(stream, pool)) < 0) { + LOG_INFO("Sleeping %u seconds before new stream init ...", stream->dev->error_delay); + sleep(stream->dev->error_delay); } else { break; } @@ -363,41 +377,41 @@ static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool) return retval; } -static int _stream_init(struct device_t *dev, struct workers_pool_t *pool) { +static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool) { SEP_INFO('='); - _stream_destroy_workers(dev, pool); - _stream_control(dev, false); - device_close(dev); + _stream_destroy_workers(stream, pool); + _stream_control(stream->dev, false); + device_close(stream->dev); - if (device_open(dev) < 0) { + if (device_open(stream->dev) < 0) { goto error; } - if (_stream_control(dev, true) < 0) { + if (_stream_control(stream->dev, true) < 0) { goto error; } - encoder_prepare_live(pool->encoder, dev); + encoder_prepare_live(pool->encoder, stream->dev); - _stream_init_workers(dev, pool); + _stream_init_workers(stream, pool); return 0; error: - device_close(dev); + device_close(stream->dev); return -1; } -static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *pool) { - LOG_INFO("Spawning %u workers ...", dev->n_workers); +static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool) { + LOG_INFO("Spawning %u workers ...", stream->dev->n_workers); *pool->workers_stop = false; - A_CALLOC(pool->workers, dev->n_workers); + A_CALLOC(pool->workers, stream->dev->n_workers); A_PTHREAD_M_INIT(&pool->free_workers_mutex); A_PTHREAD_C_INIT(&pool->free_workers_cond); - for (unsigned number = 0; number < dev->n_workers; ++number) { + for (unsigned number = 0; number < stream->dev->n_workers; ++number) { # define WORKER(_next) pool->workers[number]._next # define CTX(_next) WORKER(ctx._next) @@ -407,8 +421,8 @@ static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *po A_PTHREAD_C_INIT(&WORKER(has_job_cond)); CTX(number) = number; - CTX(dev) = dev; - CTX(dev_stop) = (sig_atomic_t *volatile)&dev->stop; + CTX(dev) = stream->dev; + CTX(proc_stop) = (sig_atomic_t *volatile)&(stream->proc->stop); CTX(workers_stop) = pool->workers_stop; CTX(encoder) = pool->encoder; @@ -438,7 +452,7 @@ static void *_stream_worker_thread(void *v_ctx) { LOG_DEBUG("Hello! I am a worker #%u ^_^", ctx->number); - while (!*ctx->dev_stop && !*ctx->workers_stop) { + while (!*ctx->proc_stop && !*ctx->workers_stop) { LOG_DEBUG("Worker %u waiting for a new job ...", ctx->number); A_PTHREAD_M_LOCK(ctx->has_job_mutex); A_PTHREAD_C_WAIT_TRUE(*ctx->has_job, ctx->has_job_cond, ctx->has_job_mutex); @@ -485,12 +499,12 @@ static void *_stream_worker_thread(void *v_ctx) { return NULL; } -static void _stream_destroy_workers(struct device_t *dev, struct workers_pool_t *pool) { +static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool_t *pool) { if (pool->workers) { LOG_INFO("Destroying workers ..."); *pool->workers_stop = true; - for (unsigned number = 0; number < dev->n_workers; ++number) { + for (unsigned number = 0; number < stream->dev->n_workers; ++number) { # define WORKER(_next) pool->workers[number]._next A_PTHREAD_M_LOCK(&WORKER(has_job_mutex)); diff --git a/src/stream.h b/src/stream.h index 7b33ec2..a25687e 100644 --- a/src/stream.h +++ b/src/stream.h @@ -36,7 +36,7 @@ struct worker_context_t { struct device_t *dev; int buf_index; struct v4l2_buffer buf_info; - sig_atomic_t *volatile dev_stop; + sig_atomic_t *volatile proc_stop; bool *workers_stop; struct encoder_t *encoder; @@ -83,6 +83,11 @@ struct workers_pool_t { struct encoder_t *encoder; }; +struct process_t { + sig_atomic_t volatile stop; + bool slowdown; +}; + struct stream_t { struct picture_t picture; unsigned width; @@ -90,6 +95,8 @@ struct stream_t { unsigned captured_fps; bool updated; pthread_mutex_t mutex; + + struct process_t *proc; struct device_t *dev; struct encoder_t *encoder; }; @@ -100,3 +107,4 @@ void stream_destroy(struct stream_t *stream); void stream_loop(struct stream_t *stream); void stream_loop_break(struct stream_t *stream); +void stream_switch_slowdown(struct stream_t *stream, bool slowdown);