diff --git a/Makefile b/Makefile index 0f4ecd4..e6de824 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ CC ?= gcc # ===== LIBS = -lm -ljpeg -pthread -levent -levent_pthreads -luuid -override CFLAGS += -c -std=c99 -Wall -Wextra -D_GNU_SOURCE +override CFLAGS += -c -std=c11 -Wall -Wextra -D_GNU_SOURCE SOURCES = $(shell ls src/*.c src/encoders/cpu/*.c src/encoders/hw/*.c) PROG = ustreamer diff --git a/src/http.c b/src/http.c index cb16c63..7a691e6 100644 --- a/src/http.c +++ b/src/http.c @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -330,11 +331,15 @@ static void _http_callback_snapshot(struct evhttp_request *request, void *v_serv ADD_HEADER("Pragma", "no-cache"); ADD_HEADER("Expires", "Mon, 3 Jan 2000 12:34:56 GMT"); -# define ADD_TIME_HEADER(_key, _value) \ - { sprintf(header_buf, "%.06Lf", _value); ADD_HEADER(_key, header_buf); } +# define ADD_TIME_HEADER(_key, _value) { \ + sprintf(header_buf, "%.06Lf", _value); \ + ADD_HEADER(_key, header_buf); \ + } -# define ADD_UNSIGNED_HEADER(_key, _value) \ - { sprintf(header_buf, "%u", _value); ADD_HEADER(_key, header_buf); } +# define ADD_UNSIGNED_HEADER(_key, _value) { \ + sprintf(header_buf, "%u", _value); \ + ADD_HEADER(_key, header_buf); \ + } ADD_TIME_HEADER("X-Timestamp", get_now_real()); @@ -471,8 +476,8 @@ static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_c // по тем же причинам, по которым у нас нет Content-Length. # define ADD_ADVANCE_HEADERS \ - { assert(evbuffer_add_printf(buf, \ - "Content-Type: image/jpeg" RN "X-Timestamp: %.06Lf" RN RN, get_now_real())); } + assert(evbuffer_add_printf(buf, \ + "Content-Type: image/jpeg" RN "X-Timestamp: %.06Lf" RN RN, get_now_real())) if (client->need_initial) { assert(evbuffer_add_printf(buf, @@ -652,10 +657,12 @@ static void _http_exposed_refresh(UNUSED int fd, UNUSED short what, void *v_serv bool stream_updated = false; bool picture_updated = false; -# define UNLOCK_STREAM \ - { server->run->stream->updated = false; A_MUTEX_UNLOCK(&server->run->stream->mutex); } +# define UNLOCK_STREAM { \ + atomic_store(&server->run->stream->updated, false); \ + A_MUTEX_UNLOCK(&server->run->stream->mutex); \ + } - if (server->run->stream->updated) { + if (atomic_load(&server->run->stream->updated)) { LOG_DEBUG("Refreshing HTTP exposed ..."); A_MUTEX_LOCK(&server->run->stream->mutex); if (server->run->stream->picture.size > 0) { // If online diff --git a/src/stream.c b/src/stream.c index b5d7a58..ed1e803 100644 --- a/src/stream.c +++ b/src/stream.c @@ -20,6 +20,7 @@ *****************************************************************************/ +#include #include #include #include @@ -59,10 +60,13 @@ struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) { struct stream_t *stream; A_CALLOC(proc, 1); + atomic_init(&proc->stop, false); + atomic_init(&proc->slowdown, false); A_CALLOC(stream, 1); stream->dev = dev; stream->encoder = encoder; + atomic_init(&stream->updated, false); A_MUTEX_INIT(&stream->mutex); stream->proc = proc; return stream; @@ -76,11 +80,10 @@ void stream_destroy(struct stream_t *stream) { void stream_loop(struct stream_t *stream) { struct workers_pool_t pool; - bool workers_stop; MEMSET_ZERO(pool); + atomic_init(&pool.workers_stop, false); pool.encoder = stream->encoder; - pool.workers_stop = &workers_stop; LOG_INFO("Using V4L2 device: %s", stream->dev->path); LOG_INFO("Using desired FPS: %u", stream->dev->desired_fps); @@ -99,7 +102,7 @@ void stream_loop(struct stream_t *stream) { LOG_INFO("Capturing ..."); - while (!stream->proc->stop) { + while (!atomic_load(&stream->proc->stop)) { int free_worker_number = -1; SEP_DEBUG('-'); @@ -109,7 +112,7 @@ void stream_loop(struct stream_t *stream) { A_COND_WAIT_TRUE(pool.free_workers, &pool.free_workers_cond, &pool.free_workers_mutex); A_MUTEX_UNLOCK(&pool.free_workers_mutex); - if (oldest_worker && !oldest_worker->has_job && oldest_worker->buf_index >= 0) { + if (oldest_worker && !atomic_load(&oldest_worker->has_job) && oldest_worker->buf_index >= 0) { if (oldest_worker->job_failed) { break; } @@ -123,7 +126,7 @@ void stream_loop(struct stream_t *stream) { } else { for (unsigned number = 0; number < stream->dev->n_workers; ++number) { if ( - !pool.workers[number].has_job && ( + !atomic_load(&pool.workers[number].has_job) && ( free_worker_number == -1 || pool.workers[free_worker_number].job_start_time < pool.workers[number].job_start_time ) @@ -134,16 +137,16 @@ void stream_loop(struct stream_t *stream) { } assert(free_worker_number >= 0); - assert(!pool.workers[free_worker_number].has_job); + assert(!atomic_load(&pool.workers[free_worker_number].has_job)); LOG_PERF("----- Raw frame dropped; worker = %u", free_worker_number); } - if (stream->proc->stop) { + if (atomic_load(&stream->proc->stop)) { break; } - if (stream->proc->slowdown) { + if (atomic_load(&stream->proc->slowdown)) { usleep(1000000); } @@ -252,7 +255,7 @@ void stream_loop(struct stream_t *stream) { A_MUTEX_LOCK(&FREE_WORKER(has_job_mutex)); FREE_WORKER(buf_index) = buf_info.index; - FREE_WORKER(has_job) = true; + atomic_store(&FREE_WORKER(has_job), true); A_MUTEX_UNLOCK(&FREE_WORKER(has_job_mutex)); A_COND_SIGNAL(&FREE_WORKER(has_job_cond)); @@ -292,7 +295,7 @@ void stream_loop(struct stream_t *stream) { free(stream->picture.data); stream->width = 0; stream->height = 0; - stream->updated = true; + atomic_store(&stream->updated, true); A_MUTEX_UNLOCK(&stream->mutex); } @@ -302,11 +305,11 @@ void stream_loop(struct stream_t *stream) { } void stream_loop_break(struct stream_t *stream) { - stream->proc->stop = 1; + atomic_store(&stream->proc->stop, true); } void stream_switch_slowdown(struct stream_t *stream, bool slowdown) { - stream->proc->slowdown = slowdown; + atomic_store(&stream->proc->slowdown, slowdown); } static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) { @@ -325,7 +328,7 @@ static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) stream->width = stream->dev->run->width; stream->height = stream->dev->run->height; - stream->updated = true; + atomic_store(&stream->updated, true); A_MUTEX_UNLOCK(&stream->mutex); @@ -365,8 +368,8 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool) { int retval = -1; - LOG_DEBUG("%s: *stream->proc->stop = %d", __FUNCTION__, stream->proc->stop); - while (!stream->proc->stop) { + LOG_DEBUG("%s: stream->proc->stop = %d", __FUNCTION__, atomic_load(&stream->proc->stop)); + while (!atomic_load(&stream->proc->stop)) { if ((retval = _stream_init(stream, pool)) < 0) { LOG_INFO("Sleeping %u seconds before new stream init ...", stream->dev->error_delay); sleep(stream->dev->error_delay); @@ -405,7 +408,7 @@ static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool) { static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool) { LOG_INFO("Spawning %u workers ...", stream->dev->n_workers); - *pool->workers_stop = false; + atomic_store(&pool->workers_stop, false); A_CALLOC(pool->workers, stream->dev->n_workers); A_MUTEX_INIT(&pool->free_workers_mutex); @@ -417,11 +420,12 @@ static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t pool->free_workers += 1; A_MUTEX_INIT(&WORKER(has_job_mutex)); + atomic_init(&WORKER(has_job), false); A_COND_INIT(&WORKER(has_job_cond)); WORKER(number) = number; - WORKER(proc_stop) = (sig_atomic_t *volatile)&(stream->proc->stop); - WORKER(workers_stop) = pool->workers_stop; + WORKER(proc_stop) = &stream->proc->stop; + WORKER(workers_stop) = &pool->workers_stop; WORKER(free_workers_mutex) = &pool->free_workers_mutex; WORKER(free_workers) = &pool->free_workers; @@ -441,13 +445,13 @@ static void *_stream_worker_thread(void *v_worker) { LOG_DEBUG("Hello! I am a worker #%u ^_^", worker->number); - while (!*worker->proc_stop && !*worker->workers_stop) { + while (!atomic_load(worker->proc_stop) && !atomic_load(worker->workers_stop)) { LOG_DEBUG("Worker %u waiting for a new job ...", worker->number); A_MUTEX_LOCK(&worker->has_job_mutex); - A_COND_WAIT_TRUE(worker->has_job, &worker->has_job_cond, &worker->has_job_mutex); + A_COND_WAIT_TRUE(atomic_load(&worker->has_job), &worker->has_job_cond, &worker->has_job_mutex); A_MUTEX_UNLOCK(&worker->has_job_mutex); - if (!*worker->workers_stop) { + if (!atomic_load(worker->workers_stop)) { # define PICTURE(_next) worker->dev->run->pictures[worker->buf_index]._next LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", worker->number, worker->buf_index); @@ -460,7 +464,7 @@ static void *_stream_worker_thread(void *v_worker) { if (_stream_release_buffer(worker->dev, &worker->buf_info) == 0) { worker->job_start_time = PICTURE(encode_begin_time); - worker->has_job = false; + atomic_store(&worker->has_job, false); long double last_comp_time = PICTURE(encode_end_time) - worker->job_start_time; @@ -472,7 +476,7 @@ static void *_stream_worker_thread(void *v_worker) { PICTURE(size), last_comp_time, worker->number, worker->buf_index); } else { worker->job_failed = true; - worker->has_job = false; + atomic_store(&worker->has_job, false); } # undef PICTURE @@ -492,12 +496,12 @@ static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool if (pool->workers) { LOG_INFO("Destroying workers ..."); - *pool->workers_stop = true; + atomic_store(&pool->workers_stop, true); for (unsigned number = 0; number < stream->dev->n_workers; ++number) { # define WORKER(_next) pool->workers[number]._next A_MUTEX_LOCK(&WORKER(has_job_mutex)); - WORKER(has_job) = true; // Final job: die + atomic_store(&WORKER(has_job), true); // Final job: die A_MUTEX_UNLOCK(&WORKER(has_job_mutex)); A_COND_SIGNAL(&WORKER(has_job_cond)); diff --git a/src/stream.h b/src/stream.h index 2676887..5f98b63 100644 --- a/src/stream.h +++ b/src/stream.h @@ -23,7 +23,7 @@ #pragma once #include -#include +#include #include @@ -34,8 +34,8 @@ struct worker_t { pthread_t tid; unsigned number; - sig_atomic_t *volatile proc_stop; - bool *workers_stop; + atomic_bool *proc_stop; + atomic_bool *workers_stop; pthread_mutex_t last_comp_time_mutex; long double last_comp_time; @@ -43,7 +43,7 @@ struct worker_t { pthread_mutex_t has_job_mutex; int buf_index; struct v4l2_buffer buf_info; - bool has_job; + atomic_bool has_job; bool job_failed; long double job_start_time; pthread_cond_t has_job_cond; @@ -61,7 +61,7 @@ struct worker_t { struct workers_pool_t { struct worker_t *workers; - bool *workers_stop; + atomic_bool workers_stop; pthread_mutex_t free_workers_mutex; unsigned free_workers; @@ -71,8 +71,8 @@ struct workers_pool_t { }; struct process_t { - sig_atomic_t volatile stop; - bool slowdown; + atomic_bool stop; + atomic_bool slowdown; }; struct stream_t { @@ -80,7 +80,7 @@ struct stream_t { unsigned width; unsigned height; unsigned captured_fps; - bool updated; + atomic_bool updated; pthread_mutex_t mutex; struct process_t *proc;