From 231f36038adf4d45d48e981c47c10ae93631c72c Mon Sep 17 00:00:00 2001 From: Devaev Maxim Date: Sun, 16 Sep 2018 11:52:56 +0300 Subject: [PATCH] continue threading --- Makefile | 9 +-- src/capture.c | 185 ++++++++++++++++++++++++++++++++------------------ src/capture.h | 43 ++++++++---- src/device.c | 41 +++++------ src/device.h | 16 ++--- src/jpeg.c | 2 +- src/jpeg.h | 2 +- src/main.c | 76 +++++++++++++++------ src/tools.c | 28 -------- src/tools.h | 69 +++++++++++++++++-- 10 files changed, 301 insertions(+), 170 deletions(-) delete mode 100644 src/tools.c diff --git a/Makefile b/Makefile index 67cada4..9c66bb1 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,7 @@ +LIBS = -lm -ljpeg -pthread CC = gcc -CFLAGS = -c -O3 -Wall -Wextra -pthread -LDFLAGS = -ljpeg -pthread +CFLAGS = -c -O3 -Wall -Wextra +LDFLAGS = SOURCES = $(shell ls src/*.c) OBJECTS = $(SOURCES:.c=.o) PROG = ustreamer @@ -10,11 +11,11 @@ all: $(SOURCES) $(PROG) $(PROG): $(OBJECTS) - $(CC) $(LDFLAGS) $(OBJECTS) -o $@ + $(CC) $(LIBS) $(LDFLAGS) $(OBJECTS) -o $@ .c.o: - $(CC) $(CFLAGS) $< -o $@ + $(CC) $(LIBS) $(CFLAGS) $< -o $@ clean: diff --git a/src/capture.c b/src/capture.c index c9b5218..536febc 100644 --- a/src/capture.c +++ b/src/capture.c @@ -11,29 +11,31 @@ #include "capture.h" -static int _capture_init_loop(struct device *dev, struct workers_pool *pool, sig_atomic_t *volatile stop); -static int _capture_init(struct device *dev, struct workers_pool *pool, sig_atomic_t *volatile stop); -static void _capture_init_workers(struct device *dev, struct workers_pool *pool, sig_atomic_t *volatile stop); -static void *_capture_worker_thread(void *index); -static void _capture_destroy_workers(struct device *dev, struct workers_pool *pool); -static int _capture_control(struct device *dev, const bool enable); -static int _capture_grab_buffer(struct device *dev, struct v4l2_buffer *buf); -static int _capture_release_buffer(struct device *dev, struct v4l2_buffer *buf); -static int _capture_handle_event(struct device *dev); +static int _capture_init_loop(struct device_t *dev, struct workers_pool_t *pool, sig_atomic_t *volatile global_stop); +static int _capture_init(struct device_t *dev, struct workers_pool_t *pool, sig_atomic_t *volatile global_stop); +static void _capture_init_workers(struct device_t *dev, struct workers_pool_t *pool, sig_atomic_t *volatile global_stop); +static void *_capture_worker_thread(void *v_ctx_ptr); +static void _capture_destroy_workers(struct device_t *dev, struct workers_pool_t *pool); +static int _capture_control(struct device_t *dev, const bool enable); +static int _capture_grab_buffer(struct device_t *dev, struct v4l2_buffer *buf_info); +static int _capture_release_buffer(struct device_t *dev, struct v4l2_buffer *buf_info); +static int _capture_handle_event(struct device_t *dev); -void capture_loop(struct device *dev, sig_atomic_t *volatile stop) { - struct workers_pool pool; +void capture_loop(struct device_t *dev, sig_atomic_t *volatile global_stop) { + struct workers_pool_t pool; + volatile sig_atomic_t workers_stop; MEMSET_ZERO(pool); + pool.workers_stop = (sig_atomic_t *volatile)&workers_stop; LOG_INFO("Using V4L2 device: %s", dev->path); LOG_INFO("Using JPEG quality: %d%%", dev->jpeg_quality); - while (_capture_init_loop(dev, &pool, stop) == 0) { + while (_capture_init_loop(dev, &pool, global_stop) == 0) { int frames_count = 0; - while (!(*stop)) { + while (!*global_stop) { SEP_DEBUG('-'); fd_set read_fds; @@ -69,11 +71,17 @@ void capture_loop(struct device *dev, sig_atomic_t *volatile stop) { } else { if (FD_ISSET(dev->run->fd, &read_fds)) { - LOG_DEBUG("Frame ready ..."); + LOG_DEBUG("Frame is ready, waiting for workers ..."); - struct v4l2_buffer buf; + assert(pthread_mutex_lock(&pool.has_free_workers_mutex) == 0); + while (!pool.has_free_workers) { + assert(pthread_cond_wait(&pool.has_free_workers_cond, &pool.has_free_workers_mutex) == 0); + } + assert(pthread_mutex_unlock(&pool.has_free_workers_mutex) == 0); - if (_capture_grab_buffer(dev, &buf) < 0) { + struct v4l2_buffer buf_info; + + if (_capture_grab_buffer(dev, &buf_info) < 0) { break; } @@ -92,20 +100,25 @@ void capture_loop(struct device *dev, sig_atomic_t *volatile stop) { // The good thing is such frames are quite small compared to the regular pictures. // For example a VGA (640x480) webcam picture is normally >= 8kByte large, // corrupted frames are smaller. - if (buf.bytesused < dev->min_frame_size) { - LOG_DEBUG("Dropping too small frame sized %d bytes, assuming it as broken", buf.bytesused); + if (buf_info.bytesused < dev->min_frame_size) { + LOG_DEBUG("Dropping too small frame sized %d bytes, assuming it as broken", buf_info.bytesused); goto pass_frame; } - LOG_DEBUG("Grabbed a new frame"); - jpeg_compress_buffer(dev, buf.index); - //usleep(100000); // TODO: process dev->run->buffers[buf.index].start, buf.bytesused + LOG_DEBUG("Grabbed a new frame to buffer %d", buf_info.index); + pool.workers[buf_info.index].ctx.buf_info = buf_info; + + assert(pthread_mutex_lock(&pool.workers[buf_info.index].has_job_mutex) == 0); + pool.workers[buf_info.index].has_job = true; + assert(pthread_mutex_unlock(&pool.workers[buf_info.index].has_job_mutex) == 0); + assert(pthread_cond_signal(&pool.workers[buf_info.index].has_job_cond) == 0); pass_frame: + {} // FIXME: for future mjpg support - if (_capture_release_buffer(dev, &buf) < 0) { + /*if (_capture_release_buffer(dev, &buf_info) < 0) { break; - } + }*/ } if (FD_ISSET(dev->run->fd, &write_fds)) { @@ -128,11 +141,12 @@ void capture_loop(struct device *dev, sig_atomic_t *volatile stop) { device_close(dev); } -static int _capture_init_loop(struct device *dev, struct workers_pool *pool, sig_atomic_t *volatile stop) { +static int _capture_init_loop(struct device_t *dev, struct workers_pool_t *pool, sig_atomic_t *volatile global_stop) { int retval = -1; - while (!(*stop)) { - if ((retval = _capture_init(dev, pool, stop)) < 0) { + LOG_DEBUG("%s: global_stop = %d", __FUNCTION__, *global_stop); + while (!*global_stop) { + if ((retval = _capture_init(dev, pool, global_stop)) < 0) { LOG_INFO("Sleeping %d seconds before new capture init ...", dev->error_timeout); sleep(dev->error_timeout); } else { @@ -142,7 +156,7 @@ static int _capture_init_loop(struct device *dev, struct workers_pool *pool, sig return retval; } -static int _capture_init(struct device *dev, struct workers_pool *pool, sig_atomic_t *volatile stop) { +static int _capture_init(struct device_t *dev, struct workers_pool_t *pool, sig_atomic_t *volatile global_stop) { SEP_INFO('='); _capture_destroy_workers(dev, pool); @@ -155,7 +169,7 @@ static int _capture_init(struct device *dev, struct workers_pool *pool, sig_atom if (_capture_control(dev, true) < 0) { goto error; } - _capture_init_workers(dev, pool, stop); + _capture_init_workers(dev, pool, global_stop); return 0; @@ -164,62 +178,99 @@ static int _capture_init(struct device *dev, struct workers_pool *pool, sig_atom return -1; } -static void _capture_init_workers(struct device *dev, struct workers_pool *pool, sig_atomic_t *volatile stop) { - LOG_INFO("Spawning %d workers ...", dev->run->n_buffers); +static void _capture_init_workers(struct device_t *dev, struct workers_pool_t *pool, sig_atomic_t *volatile global_stop) { + LOG_DEBUG("Spawning %d workers ...", dev->run->n_buffers); + *pool->workers_stop = false; assert((pool->workers = calloc(dev->run->n_buffers, sizeof(*pool->workers)))); - assert(!pthread_mutex_init(&pool->busy_mutex, NULL)); - assert(!pthread_cond_init(&pool->busy_cond, NULL)); + assert(pthread_mutex_init(&pool->has_free_workers_mutex, NULL) == 0); + assert(pthread_cond_init(&pool->has_free_workers_cond, NULL) == 0); for (unsigned index = 0; index < dev->run->n_buffers; ++index) { - assert(!pthread_mutex_init(&pool->workers[index].busy_mutex, NULL)); - assert(!pthread_cond_init(&pool->workers[index].busy_cond, NULL)); + assert(pthread_mutex_init(&pool->workers[index].has_job_mutex, NULL) == 0); + assert(pthread_cond_init(&pool->workers[index].has_job_cond, NULL) == 0); - pool->workers[index].params.index = index; - pool->workers[index].params.pool = pool; - pool->workers[index].params.dev = dev; - pool->workers[index].params.stop = stop; + pool->workers[index].ctx.index = index; + pool->workers[index].ctx.dev = dev; + pool->workers[index].ctx.global_stop = global_stop; + pool->workers[index].ctx.workers_stop = pool->workers_stop; - assert(!pthread_create( + pool->workers[index].ctx.has_job_mutex = &pool->workers[index].has_job_mutex; + pool->workers[index].ctx.has_job = &pool->workers[index].has_job; + pool->workers[index].ctx.has_job_cond = &pool->workers[index].has_job_cond; + + pool->workers[index].ctx.has_free_workers_mutex = &pool->has_free_workers_mutex; + pool->workers[index].ctx.has_free_workers = &pool->has_free_workers; + pool->workers[index].ctx.has_free_workers_cond = &pool->has_free_workers_cond; + + assert(pthread_create( &pool->workers[index].tid, NULL, _capture_worker_thread, - (void *)&pool->workers[index].params - )); + (void *)&pool->workers[index].ctx + ) == 0); } - - LOG_DEBUG("Spawned %d workers", dev->run->n_buffers); } -static void *_capture_worker_thread(void *params_ptr) { - struct worker_params params = *(struct worker_params *)params_ptr; +static void *_capture_worker_thread(void *v_ctx_ptr) { + struct worker_context_t *ctx = (struct worker_context_t *)v_ctx_ptr; - LOG_INFO("Hello! I am a worker #%d ^_^", params.index); - while (!*(params.stop)); - LOG_INFO("Bye"); + LOG_INFO("Hello! I am a worker #%d ^_^", ctx->index); + while (!*ctx->global_stop && !*ctx->workers_stop) { + assert(pthread_mutex_lock(ctx->has_free_workers_mutex) == 0); + *ctx->has_free_workers = true; + assert(pthread_mutex_unlock(ctx->has_free_workers_mutex) == 0); + assert(pthread_cond_signal(ctx->has_free_workers_cond) == 0); + + LOG_INFO("Worker %d waiting for a new job ...", ctx->index); + assert(pthread_mutex_lock(ctx->has_job_mutex) == 0); + while (!(*ctx->has_job)) { + assert(pthread_cond_wait(ctx->has_job_cond, ctx->has_job_mutex) == 0); + } + assert(pthread_mutex_unlock(ctx->has_job_mutex) == 0); + + if (!*ctx->workers_stop) { + LOG_INFO("Worker %d compressing JPEG ...", ctx->index); + + int compressed = jpeg_compress_buffer(ctx->dev, ctx->index); // FIXME + + LOG_INFO("Compressed JPEG size = %d bytes (worker %d)", compressed, ctx->index); + + assert(_capture_release_buffer(ctx->dev, &ctx->buf_info) == 0); // FIXME + *ctx->has_job = false; + } + } + + LOG_INFO("Bye-bye (worker %d)", ctx->index); return NULL; } -static void _capture_destroy_workers(struct device *dev, struct workers_pool *pool) { - LOG_DEBUG("Destroying JPEG workers ..."); +static void _capture_destroy_workers(struct device_t *dev, struct workers_pool_t *pool) { + LOG_INFO("Destroying workers ..."); if (pool->workers) { + *pool->workers_stop = true; for (unsigned index = 0; index < dev->run->n_buffers; ++index) { - assert(!pthread_join(pool->workers[index].tid, NULL)); - assert(!pthread_cond_destroy(&pool->workers[index].busy_cond)); - assert(!pthread_mutex_destroy(&pool->workers[index].busy_mutex)); + assert(pthread_mutex_lock(&pool->workers[index].has_job_mutex) == 0); + pool->workers[index].has_job = true; // Final job: die + assert(pthread_mutex_unlock(&pool->workers[index].has_job_mutex) == 0); + assert(pthread_cond_signal(&pool->workers[index].has_job_cond) == 0); + + assert(pthread_join(pool->workers[index].tid, NULL) == 0); + assert(pthread_mutex_destroy(&pool->workers[index].has_job_mutex) == 0); + assert(pthread_cond_destroy(&pool->workers[index].has_job_cond) == 0); } - assert(!pthread_cond_destroy(&pool->busy_cond)); - assert(!pthread_mutex_destroy(&pool->busy_mutex)); + assert(pthread_cond_destroy(&pool->has_free_workers_cond) == 0); + assert(pthread_mutex_destroy(&pool->has_free_workers_mutex) == 0); free(pool->workers); } pool->workers = NULL; } -static int _capture_control(struct device *dev, const bool enable) { +static int _capture_control(struct device_t *dev, const bool enable) { if (enable != dev->run->capturing) { enum v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE; @@ -237,35 +288,35 @@ static int _capture_control(struct device *dev, const bool enable) { return 0; } -static int _capture_grab_buffer(struct device *dev, struct v4l2_buffer *buf) { - memset(buf, 0, sizeof(struct v4l2_buffer)); - buf->type = V4L2_BUF_TYPE_VIDEO_CAPTURE; - buf->memory = V4L2_MEMORY_MMAP; +static int _capture_grab_buffer(struct device_t *dev, struct v4l2_buffer *buf_info) { + memset(buf_info, 0, sizeof(struct v4l2_buffer)); + buf_info->type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + buf_info->memory = V4L2_MEMORY_MMAP; LOG_DEBUG("Calling ioctl(VIDIOC_DQBUF) ..."); - if (xioctl(dev->run->fd, VIDIOC_DQBUF, buf) < 0) { + if (xioctl(dev->run->fd, VIDIOC_DQBUF, buf_info) < 0) { LOG_PERROR("Unable to dequeue buffer"); return -1; } - LOG_DEBUG("Got a new frame in buffer index=%d; bytesused=%d", buf->index, buf->bytesused); - if (buf->index >= dev->run->n_buffers) { - LOG_ERROR("Got invalid buffer index=%d; nbuffers=%d", buf->index, dev->run->n_buffers); + LOG_DEBUG("Got a new frame in buffer index=%d; bytesused=%d", buf_info->index, buf_info->bytesused); + if (buf_info->index >= dev->run->n_buffers) { + LOG_ERROR("Got invalid buffer index=%d; nbuffers=%d", buf_info->index, dev->run->n_buffers); return -1; } return 0; } -static int _capture_release_buffer(struct device *dev, struct v4l2_buffer *buf) { +static int _capture_release_buffer(struct device_t *dev, struct v4l2_buffer *buf_info) { LOG_DEBUG("Calling ioctl(VIDIOC_QBUF) ..."); - if (xioctl(dev->run->fd, VIDIOC_QBUF, buf) < 0) { + if (xioctl(dev->run->fd, VIDIOC_QBUF, buf_info) < 0) { LOG_PERROR("Unable to requeue buffer"); return -1; } return 0; } -static int _capture_handle_event(struct device *dev) { +static int _capture_handle_event(struct device_t *dev) { struct v4l2_event event; LOG_DEBUG("Calling ioctl(VIDIOC_DQEVENT) ..."); diff --git a/src/capture.h b/src/capture.h index b702418..2a6a091 100644 --- a/src/capture.h +++ b/src/capture.h @@ -1,30 +1,47 @@ #pragma once +#include +#include #include #include +#include #include "device.h" -struct worker_params { +struct worker_context_t { int index; - struct workers_pool *pool; - struct device *dev; - sig_atomic_t *volatile stop; + struct device_t *dev; + struct v4l2_buffer buf_info; + sig_atomic_t *volatile global_stop; + sig_atomic_t *volatile workers_stop; + + pthread_mutex_t *has_job_mutex; + bool *has_job; + pthread_cond_t *has_job_cond; + + pthread_mutex_t *has_free_workers_mutex; + bool *has_free_workers; + pthread_cond_t *has_free_workers_cond; }; -struct worker { +struct worker_t { + struct worker_context_t ctx; pthread_t tid; - pthread_mutex_t busy_mutex; - pthread_cond_t busy_cond; - struct worker_params params; + + pthread_mutex_t has_job_mutex; + bool has_job; + pthread_cond_t has_job_cond; }; -struct workers_pool { - struct worker *workers; - pthread_mutex_t busy_mutex; - pthread_cond_t busy_cond; +struct workers_pool_t { + struct worker_t *workers; + sig_atomic_t *volatile workers_stop; + + pthread_mutex_t has_free_workers_mutex; + bool has_free_workers; + pthread_cond_t has_free_workers_cond; }; -void capture_loop(struct device *dev, sig_atomic_t *volatile stop); +void capture_loop(struct device_t *dev, sig_atomic_t *volatile global_stop); diff --git a/src/device.c b/src/device.c index 21470f3..871a63c 100644 --- a/src/device.c +++ b/src/device.c @@ -8,6 +8,7 @@ #include #include #include +#include #include #include "tools.h" @@ -36,31 +37,31 @@ static const struct { }; -static int _device_open_check_cap(struct device *dev); -static int _device_open_dv_timings(struct device *dev); -static int _device_apply_dv_timings(struct device *dev); -static int _device_open_format(struct device *dev); -static void _device_open_alloc_picbufs(struct device *dev); -static int _device_open_mmap(struct device *dev); -static int _device_open_queue_buffers(struct device *dev); +static int _device_open_check_cap(struct device_t *dev); +static int _device_open_dv_timings(struct device_t *dev); +static int _device_apply_dv_timings(struct device_t *dev); +static int _device_open_format(struct device_t *dev); +static void _device_open_alloc_picbufs(struct device_t *dev); +static int _device_open_mmap(struct device_t *dev); +static int _device_open_queue_buffers(struct device_t *dev); static const char *_format_to_string_auto(char *buf, const size_t length, const unsigned format); static const char *_format_to_string_null(const unsigned format); static const char *_standard_to_string(const v4l2_std_id standard); -void device_init(struct device *dev, struct device_runtime *run) { +void device_init(struct device_t *dev, struct device_runtime_t *run) { LOG_DEBUG("Initializing a new device struct ..."); - memset(dev, 0, sizeof(struct device)); - memset(run, 0, sizeof(struct device_runtime)); + memset(dev, 0, sizeof(struct device_t)); + memset(run, 0, sizeof(struct device_runtime_t)); dev->path = (char *)DEFAULT_DEVICE; dev->width = 640; dev->height = 480; dev->format = V4L2_PIX_FMT_YUYV; dev->standard = V4L2_STD_UNKNOWN; - dev->n_buffers = 4; + dev->n_buffers = MAX(sysconf(_SC_NPROCESSORS_ONLN), 1); dev->jpeg_quality = 80; dev->timeout = 1; dev->error_timeout = 1; @@ -88,7 +89,7 @@ v4l2_std_id device_parse_standard(const char *const str) { return STANDARD_UNKNOWN; } -int device_open(struct device *dev) { +int device_open(struct device_t *dev) { if ((dev->run->fd = open(dev->path, O_RDWR|O_NONBLOCK)) < 0) { LOG_PERROR("Can't open device"); goto error; @@ -120,7 +121,7 @@ int device_open(struct device *dev) { return -1; } -void device_close(struct device *dev) { +void device_close(struct device_t *dev) { if (dev->run->pictures) { LOG_DEBUG("Releasing picture buffers ..."); for (unsigned index = 0; index < dev->run->n_buffers && dev->run->pictures[index]; ++index) { @@ -155,7 +156,7 @@ void device_close(struct device *dev) { } } -static int _device_open_check_cap(struct device *dev) { +static int _device_open_check_cap(struct device_t *dev) { struct v4l2_capability cap; MEMSET_ZERO(cap); @@ -188,7 +189,7 @@ static int _device_open_check_cap(struct device *dev) { return 0; } -static int _device_open_dv_timings(struct device *dev) { +static int _device_open_dv_timings(struct device_t *dev) { if (dev->dv_timings) { LOG_DEBUG("Using DV-timings"); @@ -214,7 +215,7 @@ static int _device_open_dv_timings(struct device *dev) { return 0; } -static int _device_apply_dv_timings(struct device *dev) { +static int _device_apply_dv_timings(struct device_t *dev) { struct v4l2_dv_timings dv_timings; MEMSET_ZERO(dv_timings); @@ -250,7 +251,7 @@ static int _device_apply_dv_timings(struct device *dev) { return 0; } -static int _device_open_format(struct device *dev) { +static int _device_open_format(struct device_t *dev) { struct v4l2_format fmt; MEMSET_ZERO(fmt); @@ -309,7 +310,7 @@ static int _device_open_format(struct device *dev) { return 0; } -static int _device_open_mmap(struct device *dev) { +static int _device_open_mmap(struct device_t *dev) { struct v4l2_requestbuffers req; MEMSET_ZERO(req); @@ -359,7 +360,7 @@ static int _device_open_mmap(struct device *dev) { return 0; } -static int _device_open_queue_buffers(struct device *dev) { +static int _device_open_queue_buffers(struct device_t *dev) { for (unsigned index = 0; index < dev->run->n_buffers; ++index) { struct v4l2_buffer buf; @@ -377,7 +378,7 @@ static int _device_open_queue_buffers(struct device *dev) { return 0; } -static void _device_open_alloc_picbufs(struct device *dev) { +static void _device_open_alloc_picbufs(struct device_t *dev) { LOG_DEBUG("Allocating picture buffers ..."); assert((dev->run->pictures = calloc(dev->run->n_buffers, sizeof(*dev->run->pictures)))); diff --git a/src/device.h b/src/device.h index a235c76..539bfce 100644 --- a/src/device.h +++ b/src/device.h @@ -9,23 +9,23 @@ #define STANDARD_UNKNOWN V4L2_STD_UNKNOWN -struct buffer { +struct buffer_t { void *start; size_t length; }; -struct device_runtime { +struct device_runtime_t { int fd; unsigned width; unsigned height; unsigned format; unsigned n_buffers; - struct buffer *buffers; + struct buffer_t *buffers; unsigned char **pictures; bool capturing; }; -struct device { +struct device_t { char *path; unsigned width; unsigned height; @@ -39,14 +39,14 @@ struct device { unsigned timeout; unsigned error_timeout; - struct device_runtime *run; + struct device_runtime_t *run; }; -void device_init(struct device *dev, struct device_runtime *run); +void device_init(struct device_t *dev, struct device_runtime_t *run); int device_parse_format(const char *const str); v4l2_std_id device_parse_standard(const char *const str); -int device_open(struct device *dev); -void device_close(struct device *dev); +int device_open(struct device_t *dev); +void device_close(struct device_t *dev); diff --git a/src/jpeg.c b/src/jpeg.c index 9d877f5..c7d9024 100644 --- a/src/jpeg.c +++ b/src/jpeg.c @@ -65,7 +65,7 @@ static boolean _jpeg_empty_output_buffer(j_compress_ptr jpeg); static void _jpeg_term_destination(j_compress_ptr jpeg); -int jpeg_compress_buffer(struct device *dev, int index) { +int jpeg_compress_buffer(struct device_t *dev, int index) { // This function based on compress_image_to_jpeg() from mjpg-streamer struct jpeg_compress_struct jpeg; diff --git a/src/jpeg.h b/src/jpeg.h index 3e5acee..5f6e541 100644 --- a/src/jpeg.h +++ b/src/jpeg.h @@ -5,4 +5,4 @@ #include "device.h" -int jpeg_compress_buffer(struct device *dev, int index); +int jpeg_compress_buffer(struct device_t *dev, int index); diff --git a/src/main.c b/src/main.c index 45bc06b..5a9794a 100644 --- a/src/main.c +++ b/src/main.c @@ -8,14 +8,16 @@ #include #include #include +#include #include +#include #include "tools.h" #include "device.h" #include "capture.h" -static const char _short_opts[] = "hd:f:s:e:tb:q:"; +static const char _short_opts[] = "hd:f:s:e:tb:n:q:"; static const struct option _long_opts[] = { {"help", no_argument, NULL, 'h'}, {"device", required_argument, NULL, 'd'}, @@ -40,17 +42,17 @@ static void _help(int exit_code) { exit(exit_code); } -static void _parse_options(int argc, char *argv[], struct device *dev) { +static void _parse_options(int argc, char *argv[], struct device_t *dev) { # define OPT_ARG(_dest) \ { _dest = optarg; break; } # define OPT_TRUE(_dest) \ { _dest = true; break; } -# define OPT_UNSIGNED(_dest, _name) \ +# define OPT_UNSIGNED(_dest, _name, _min) \ { int _tmp = strtol(optarg, NULL, 0); \ - if (errno || _tmp < 0) \ - { printf("Invalid value for: %s\n", _name); exit(EXIT_FAILURE); } \ + if (errno || _tmp < _min) \ + { printf("Invalid value for '%s=%u'; minimal=%u\n", _name, _tmp, _min); exit(EXIT_FAILURE); } \ _dest = _tmp; break; } # define OPT_PARSE(_dest, _func, _invalid, _name) \ @@ -71,15 +73,16 @@ static void _parse_options(int argc, char *argv[], struct device *dev) { case 'f': OPT_PARSE(dev->format, device_parse_format, FORMAT_UNKNOWN, "pixel format"); # pragma GCC diagnostic pop case 's': OPT_PARSE(dev->standard, device_parse_standard, STANDARD_UNKNOWN, "TV standard"); - case 'e': OPT_UNSIGNED(dev->every_frame, "--every-frame"); - case 'z': OPT_UNSIGNED(dev->min_frame_size, "--min-frame-size"); + case 'e': OPT_UNSIGNED(dev->every_frame, "--every-frame", 0); + case 'z': OPT_UNSIGNED(dev->min_frame_size, "--min-frame-size", 0); case 't': OPT_TRUE(dev->dv_timings); - case 'b': OPT_UNSIGNED(dev->n_buffers, "--buffers"); - case 'q': OPT_UNSIGNED(dev->jpeg_quality, "--jpeg-quality"); - case 1000: OPT_UNSIGNED(dev->width, "--width"); - case 1001: OPT_UNSIGNED(dev->height, "--height"); - case 1002: OPT_UNSIGNED(dev->timeout, "--timeout"); - case 1003: OPT_UNSIGNED(dev->error_timeout, "--error-timeout"); + case 'n': + case 'b': OPT_UNSIGNED(dev->n_buffers, "--buffers", 1); + case 'q': OPT_UNSIGNED(dev->jpeg_quality, "--jpeg-quality", 1); + case 1000: OPT_UNSIGNED(dev->width, "--width", 320); + case 1001: OPT_UNSIGNED(dev->height, "--height", 180); + case 1002: OPT_UNSIGNED(dev->timeout, "--timeout", 1); + case 1003: OPT_UNSIGNED(dev->error_timeout, "--error-timeout", 1); case 5000: OPT_TRUE(debug); case 'h': _help(EXIT_SUCCESS); break; default: _help(EXIT_FAILURE); break; @@ -88,6 +91,24 @@ static void _parse_options(int argc, char *argv[], struct device *dev) { } +struct threads_context { + struct device_t *dev; + sig_atomic_t *volatile global_stop; +}; + +static void *_capture_loop_thread(void *v_ctx_ptr) { + struct threads_context *ctx = (struct threads_context *)v_ctx_ptr; + sigset_t mask; + + assert(sigemptyset(&mask) == 0); + assert(sigaddset(&mask, SIGINT) == 0); + assert(sigaddset(&mask, SIGTERM) == 0); + assert(pthread_sigmask(SIG_BLOCK, &mask, NULL) == 0); + + capture_loop(ctx->dev, (sig_atomic_t *volatile)ctx->global_stop); + return NULL; +} + static volatile sig_atomic_t _global_stop = 0; static void _interrupt_handler(int signum) { @@ -95,20 +116,31 @@ static void _interrupt_handler(int signum) { _global_stop = 1; } - int main(int argc, char *argv[]) { - struct device dev; - struct device_runtime run; + struct device_t dev; + struct device_runtime_t run; + + pthread_t capture_loop_tid; + struct threads_context ctx = {&dev, (sig_atomic_t *volatile)&_global_stop}; + struct sigaction sig_act; + + MEMSET_ZERO(sig_act); + assert(sigemptyset(&sig_act.sa_mask) == 0); + sig_act.sa_handler = _interrupt_handler; + assert(sigaddset(&sig_act.sa_mask, SIGINT) == 0); + assert(sigaddset(&sig_act.sa_mask, SIGTERM) == 0); + + LOG_INFO("Installing SIGINT handler ..."); + assert(sigaction(SIGINT, &sig_act, NULL) == 0); + + LOG_INFO("Installing SIGTERM handler ..."); + assert(sigaction(SIGTERM, &sig_act, NULL) == 0); device_init(&dev, &run); _parse_options(argc, argv, &dev); - LOG_INFO("Installing SIGINT handler ..."); - signal(SIGINT, _interrupt_handler); + pthread_create(&capture_loop_tid, NULL, _capture_loop_thread, (void *)&ctx); + pthread_join(capture_loop_tid, NULL); - LOG_INFO("Installing SIGTERM handler ..."); - signal(SIGTERM, _interrupt_handler); - - capture_loop(&dev, (sig_atomic_t *volatile)&_global_stop); return 0; } diff --git a/src/tools.c b/src/tools.c deleted file mode 100644 index 13abc9b..0000000 --- a/src/tools.c +++ /dev/null @@ -1,28 +0,0 @@ -#include -#include -#include - -#include "tools.h" - - -int xioctl(const int fd, const int request, void *arg) { - int retries = XIOCTL_RETRIES; - int retval = -1; - - do { - retval = ioctl(fd, request, arg); - } while ( - retval - && retries-- - && ( - errno == EINTR - || errno == EAGAIN - || errno == ETIMEDOUT - ) - ); - - if (retval && retries <= 0) { - LOG_PERROR("ioctl(%d) retried %d times; giving up", request, XIOCTL_RETRIES); - } - return retval; -} diff --git a/src/tools.h b/src/tools.h index eae3f00..65bebed 100644 --- a/src/tools.h +++ b/src/tools.h @@ -3,7 +3,14 @@ #include #include #include +#include #include +#include +#include +#include +#include +#include +#include bool debug; @@ -12,25 +19,75 @@ bool debug; #define SEP_INFO(_x_ch) \ { for (int _i = 0; _i < 80; ++_i) putchar(_x_ch); putchar('\n'); } +#define INNER_LOG_MK_PARAMS \ + time_t _sec; long _msec; now_ms(&_sec, &_msec); pid_t _tid = syscall(SYS_gettid); + +#define INNER_LOG_PARAMS _sec, _msec, _tid + +#define INNER_LOG_PL "[%ld.%03ld tid=%d]" + #define LOG_INFO(_x_msg, ...) \ - printf("-- INFO -- " _x_msg "\n", ##__VA_ARGS__) + { INNER_LOG_MK_PARAMS; \ + printf("-- INFO " INNER_LOG_PL " -- " _x_msg "\n", INNER_LOG_PARAMS, ##__VA_ARGS__); } #define LOG_DEBUG(_x_msg, ...) \ - if (debug) { printf(" DEBUG -- " _x_msg "\n", ##__VA_ARGS__); } + { if (debug) { INNER_LOG_MK_PARAMS; \ + printf("-- DEBUG " INNER_LOG_PL " -- " _x_msg "\n", INNER_LOG_PARAMS, ##__VA_ARGS__); } } #define SEP_DEBUG(_x_ch) \ - if (debug) { SEP_INFO(_x_ch); } + { if (debug) { SEP_INFO(_x_ch); } } #define LOG_ERROR(_x_msg, ...) \ - printf("** ERROR -- " _x_msg "\n", ##__VA_ARGS__) + { INNER_LOG_MK_PARAMS; \ + printf("-- ERROR " INNER_LOG_PL " -- " _x_msg "\n", INNER_LOG_PARAMS, ##__VA_ARGS__); } #define LOG_PERROR(_x_msg, ...) \ - printf("** ERROR -- " _x_msg ": %s\n", ##__VA_ARGS__, strerror(errno)) + { INNER_LOG_MK_PARAMS; char _buf[1024]; strerror_r(errno, _buf, 1024); \ + printf("-- ERROR " INNER_LOG_PL " -- " _x_msg ": %s\n", INNER_LOG_PARAMS, ##__VA_ARGS__, _buf); } #define MEMSET_ZERO(_x_obj) memset(&(_x_obj), 0, sizeof(_x_obj)) +#define INLINE inline __attribute__((always_inline)) + #define XIOCTL_RETRIES 4 -int xioctl(const int fd, const int request, void *arg); +INLINE void now(struct timespec *spec) { + assert(!clock_gettime(CLOCK_REALTIME, spec)); +} + +INLINE void now_ms(time_t *sec, long *msec) { + struct timespec spec; + + now(&spec); + *sec = spec.tv_sec; + *msec = round(spec.tv_nsec / 1.0e6); + + if (*msec > 999) { + *sec += 1; + *msec = 0; + } +} + +INLINE int xioctl(const int fd, const int request, void *arg) { + int retries = XIOCTL_RETRIES; + int retval = -1; + + do { + retval = ioctl(fd, request, arg); + } while ( + retval + && retries-- + && ( + errno == EINTR + || errno == EAGAIN + || errno == ETIMEDOUT + ) + ); + + if (retval && retries <= 0) { + LOG_PERROR("ioctl(%d) retried %d times; giving up", request, XIOCTL_RETRIES); + } + return retval; +}