continue threading

This commit is contained in:
Devaev Maxim 2018-09-16 11:52:56 +03:00
parent 1d3c767617
commit 231f36038a
10 changed files with 301 additions and 170 deletions

View File

@ -1,6 +1,7 @@
LIBS = -lm -ljpeg -pthread
CC = gcc
CFLAGS = -c -O3 -Wall -Wextra -pthread
LDFLAGS = -ljpeg -pthread
CFLAGS = -c -O3 -Wall -Wextra
LDFLAGS =
SOURCES = $(shell ls src/*.c)
OBJECTS = $(SOURCES:.c=.o)
PROG = ustreamer
@ -10,11 +11,11 @@ all: $(SOURCES) $(PROG)
$(PROG): $(OBJECTS)
$(CC) $(LDFLAGS) $(OBJECTS) -o $@
$(CC) $(LIBS) $(LDFLAGS) $(OBJECTS) -o $@
.c.o:
$(CC) $(CFLAGS) $< -o $@
$(CC) $(LIBS) $(CFLAGS) $< -o $@
clean:

View File

@ -11,29 +11,31 @@
#include "capture.h"
static int _capture_init_loop(struct device *dev, struct workers_pool *pool, sig_atomic_t *volatile stop);
static int _capture_init(struct device *dev, struct workers_pool *pool, sig_atomic_t *volatile stop);
static void _capture_init_workers(struct device *dev, struct workers_pool *pool, sig_atomic_t *volatile stop);
static void *_capture_worker_thread(void *index);
static void _capture_destroy_workers(struct device *dev, struct workers_pool *pool);
static int _capture_control(struct device *dev, const bool enable);
static int _capture_grab_buffer(struct device *dev, struct v4l2_buffer *buf);
static int _capture_release_buffer(struct device *dev, struct v4l2_buffer *buf);
static int _capture_handle_event(struct device *dev);
static int _capture_init_loop(struct device_t *dev, struct workers_pool_t *pool, sig_atomic_t *volatile global_stop);
static int _capture_init(struct device_t *dev, struct workers_pool_t *pool, sig_atomic_t *volatile global_stop);
static void _capture_init_workers(struct device_t *dev, struct workers_pool_t *pool, sig_atomic_t *volatile global_stop);
static void *_capture_worker_thread(void *v_ctx_ptr);
static void _capture_destroy_workers(struct device_t *dev, struct workers_pool_t *pool);
static int _capture_control(struct device_t *dev, const bool enable);
static int _capture_grab_buffer(struct device_t *dev, struct v4l2_buffer *buf_info);
static int _capture_release_buffer(struct device_t *dev, struct v4l2_buffer *buf_info);
static int _capture_handle_event(struct device_t *dev);
void capture_loop(struct device *dev, sig_atomic_t *volatile stop) {
struct workers_pool pool;
void capture_loop(struct device_t *dev, sig_atomic_t *volatile global_stop) {
struct workers_pool_t pool;
volatile sig_atomic_t workers_stop;
MEMSET_ZERO(pool);
pool.workers_stop = (sig_atomic_t *volatile)&workers_stop;
LOG_INFO("Using V4L2 device: %s", dev->path);
LOG_INFO("Using JPEG quality: %d%%", dev->jpeg_quality);
while (_capture_init_loop(dev, &pool, stop) == 0) {
while (_capture_init_loop(dev, &pool, global_stop) == 0) {
int frames_count = 0;
while (!(*stop)) {
while (!*global_stop) {
SEP_DEBUG('-');
fd_set read_fds;
@ -69,11 +71,17 @@ void capture_loop(struct device *dev, sig_atomic_t *volatile stop) {
} else {
if (FD_ISSET(dev->run->fd, &read_fds)) {
LOG_DEBUG("Frame ready ...");
LOG_DEBUG("Frame is ready, waiting for workers ...");
struct v4l2_buffer buf;
assert(pthread_mutex_lock(&pool.has_free_workers_mutex) == 0);
while (!pool.has_free_workers) {
assert(pthread_cond_wait(&pool.has_free_workers_cond, &pool.has_free_workers_mutex) == 0);
}
assert(pthread_mutex_unlock(&pool.has_free_workers_mutex) == 0);
if (_capture_grab_buffer(dev, &buf) < 0) {
struct v4l2_buffer buf_info;
if (_capture_grab_buffer(dev, &buf_info) < 0) {
break;
}
@ -92,20 +100,25 @@ void capture_loop(struct device *dev, sig_atomic_t *volatile stop) {
// The good thing is such frames are quite small compared to the regular pictures.
// For example a VGA (640x480) webcam picture is normally >= 8kByte large,
// corrupted frames are smaller.
if (buf.bytesused < dev->min_frame_size) {
LOG_DEBUG("Dropping too small frame sized %d bytes, assuming it as broken", buf.bytesused);
if (buf_info.bytesused < dev->min_frame_size) {
LOG_DEBUG("Dropping too small frame sized %d bytes, assuming it as broken", buf_info.bytesused);
goto pass_frame;
}
LOG_DEBUG("Grabbed a new frame");
jpeg_compress_buffer(dev, buf.index);
//usleep(100000); // TODO: process dev->run->buffers[buf.index].start, buf.bytesused
LOG_DEBUG("Grabbed a new frame to buffer %d", buf_info.index);
pool.workers[buf_info.index].ctx.buf_info = buf_info;
assert(pthread_mutex_lock(&pool.workers[buf_info.index].has_job_mutex) == 0);
pool.workers[buf_info.index].has_job = true;
assert(pthread_mutex_unlock(&pool.workers[buf_info.index].has_job_mutex) == 0);
assert(pthread_cond_signal(&pool.workers[buf_info.index].has_job_cond) == 0);
pass_frame:
{} // FIXME: for future mjpg support
if (_capture_release_buffer(dev, &buf) < 0) {
/*if (_capture_release_buffer(dev, &buf_info) < 0) {
break;
}
}*/
}
if (FD_ISSET(dev->run->fd, &write_fds)) {
@ -128,11 +141,12 @@ void capture_loop(struct device *dev, sig_atomic_t *volatile stop) {
device_close(dev);
}
static int _capture_init_loop(struct device *dev, struct workers_pool *pool, sig_atomic_t *volatile stop) {
static int _capture_init_loop(struct device_t *dev, struct workers_pool_t *pool, sig_atomic_t *volatile global_stop) {
int retval = -1;
while (!(*stop)) {
if ((retval = _capture_init(dev, pool, stop)) < 0) {
LOG_DEBUG("%s: global_stop = %d", __FUNCTION__, *global_stop);
while (!*global_stop) {
if ((retval = _capture_init(dev, pool, global_stop)) < 0) {
LOG_INFO("Sleeping %d seconds before new capture init ...", dev->error_timeout);
sleep(dev->error_timeout);
} else {
@ -142,7 +156,7 @@ static int _capture_init_loop(struct device *dev, struct workers_pool *pool, sig
return retval;
}
static int _capture_init(struct device *dev, struct workers_pool *pool, sig_atomic_t *volatile stop) {
static int _capture_init(struct device_t *dev, struct workers_pool_t *pool, sig_atomic_t *volatile global_stop) {
SEP_INFO('=');
_capture_destroy_workers(dev, pool);
@ -155,7 +169,7 @@ static int _capture_init(struct device *dev, struct workers_pool *pool, sig_atom
if (_capture_control(dev, true) < 0) {
goto error;
}
_capture_init_workers(dev, pool, stop);
_capture_init_workers(dev, pool, global_stop);
return 0;
@ -164,62 +178,99 @@ static int _capture_init(struct device *dev, struct workers_pool *pool, sig_atom
return -1;
}
static void _capture_init_workers(struct device *dev, struct workers_pool *pool, sig_atomic_t *volatile stop) {
LOG_INFO("Spawning %d workers ...", dev->run->n_buffers);
static void _capture_init_workers(struct device_t *dev, struct workers_pool_t *pool, sig_atomic_t *volatile global_stop) {
LOG_DEBUG("Spawning %d workers ...", dev->run->n_buffers);
*pool->workers_stop = false;
assert((pool->workers = calloc(dev->run->n_buffers, sizeof(*pool->workers))));
assert(!pthread_mutex_init(&pool->busy_mutex, NULL));
assert(!pthread_cond_init(&pool->busy_cond, NULL));
assert(pthread_mutex_init(&pool->has_free_workers_mutex, NULL) == 0);
assert(pthread_cond_init(&pool->has_free_workers_cond, NULL) == 0);
for (unsigned index = 0; index < dev->run->n_buffers; ++index) {
assert(!pthread_mutex_init(&pool->workers[index].busy_mutex, NULL));
assert(!pthread_cond_init(&pool->workers[index].busy_cond, NULL));
assert(pthread_mutex_init(&pool->workers[index].has_job_mutex, NULL) == 0);
assert(pthread_cond_init(&pool->workers[index].has_job_cond, NULL) == 0);
pool->workers[index].params.index = index;
pool->workers[index].params.pool = pool;
pool->workers[index].params.dev = dev;
pool->workers[index].params.stop = stop;
pool->workers[index].ctx.index = index;
pool->workers[index].ctx.dev = dev;
pool->workers[index].ctx.global_stop = global_stop;
pool->workers[index].ctx.workers_stop = pool->workers_stop;
assert(!pthread_create(
pool->workers[index].ctx.has_job_mutex = &pool->workers[index].has_job_mutex;
pool->workers[index].ctx.has_job = &pool->workers[index].has_job;
pool->workers[index].ctx.has_job_cond = &pool->workers[index].has_job_cond;
pool->workers[index].ctx.has_free_workers_mutex = &pool->has_free_workers_mutex;
pool->workers[index].ctx.has_free_workers = &pool->has_free_workers;
pool->workers[index].ctx.has_free_workers_cond = &pool->has_free_workers_cond;
assert(pthread_create(
&pool->workers[index].tid,
NULL,
_capture_worker_thread,
(void *)&pool->workers[index].params
));
(void *)&pool->workers[index].ctx
) == 0);
}
LOG_DEBUG("Spawned %d workers", dev->run->n_buffers);
}
static void *_capture_worker_thread(void *params_ptr) {
struct worker_params params = *(struct worker_params *)params_ptr;
static void *_capture_worker_thread(void *v_ctx_ptr) {
struct worker_context_t *ctx = (struct worker_context_t *)v_ctx_ptr;
LOG_INFO("Hello! I am a worker #%d ^_^", params.index);
while (!*(params.stop));
LOG_INFO("Bye");
LOG_INFO("Hello! I am a worker #%d ^_^", ctx->index);
while (!*ctx->global_stop && !*ctx->workers_stop) {
assert(pthread_mutex_lock(ctx->has_free_workers_mutex) == 0);
*ctx->has_free_workers = true;
assert(pthread_mutex_unlock(ctx->has_free_workers_mutex) == 0);
assert(pthread_cond_signal(ctx->has_free_workers_cond) == 0);
LOG_INFO("Worker %d waiting for a new job ...", ctx->index);
assert(pthread_mutex_lock(ctx->has_job_mutex) == 0);
while (!(*ctx->has_job)) {
assert(pthread_cond_wait(ctx->has_job_cond, ctx->has_job_mutex) == 0);
}
assert(pthread_mutex_unlock(ctx->has_job_mutex) == 0);
if (!*ctx->workers_stop) {
LOG_INFO("Worker %d compressing JPEG ...", ctx->index);
int compressed = jpeg_compress_buffer(ctx->dev, ctx->index); // FIXME
LOG_INFO("Compressed JPEG size = %d bytes (worker %d)", compressed, ctx->index);
assert(_capture_release_buffer(ctx->dev, &ctx->buf_info) == 0); // FIXME
*ctx->has_job = false;
}
}
LOG_INFO("Bye-bye (worker %d)", ctx->index);
return NULL;
}
static void _capture_destroy_workers(struct device *dev, struct workers_pool *pool) {
LOG_DEBUG("Destroying JPEG workers ...");
static void _capture_destroy_workers(struct device_t *dev, struct workers_pool_t *pool) {
LOG_INFO("Destroying workers ...");
if (pool->workers) {
*pool->workers_stop = true;
for (unsigned index = 0; index < dev->run->n_buffers; ++index) {
assert(!pthread_join(pool->workers[index].tid, NULL));
assert(!pthread_cond_destroy(&pool->workers[index].busy_cond));
assert(!pthread_mutex_destroy(&pool->workers[index].busy_mutex));
assert(pthread_mutex_lock(&pool->workers[index].has_job_mutex) == 0);
pool->workers[index].has_job = true; // Final job: die
assert(pthread_mutex_unlock(&pool->workers[index].has_job_mutex) == 0);
assert(pthread_cond_signal(&pool->workers[index].has_job_cond) == 0);
assert(pthread_join(pool->workers[index].tid, NULL) == 0);
assert(pthread_mutex_destroy(&pool->workers[index].has_job_mutex) == 0);
assert(pthread_cond_destroy(&pool->workers[index].has_job_cond) == 0);
}
assert(!pthread_cond_destroy(&pool->busy_cond));
assert(!pthread_mutex_destroy(&pool->busy_mutex));
assert(pthread_cond_destroy(&pool->has_free_workers_cond) == 0);
assert(pthread_mutex_destroy(&pool->has_free_workers_mutex) == 0);
free(pool->workers);
}
pool->workers = NULL;
}
static int _capture_control(struct device *dev, const bool enable) {
static int _capture_control(struct device_t *dev, const bool enable) {
if (enable != dev->run->capturing) {
enum v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
@ -237,35 +288,35 @@ static int _capture_control(struct device *dev, const bool enable) {
return 0;
}
static int _capture_grab_buffer(struct device *dev, struct v4l2_buffer *buf) {
memset(buf, 0, sizeof(struct v4l2_buffer));
buf->type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf->memory = V4L2_MEMORY_MMAP;
static int _capture_grab_buffer(struct device_t *dev, struct v4l2_buffer *buf_info) {
memset(buf_info, 0, sizeof(struct v4l2_buffer));
buf_info->type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf_info->memory = V4L2_MEMORY_MMAP;
LOG_DEBUG("Calling ioctl(VIDIOC_DQBUF) ...");
if (xioctl(dev->run->fd, VIDIOC_DQBUF, buf) < 0) {
if (xioctl(dev->run->fd, VIDIOC_DQBUF, buf_info) < 0) {
LOG_PERROR("Unable to dequeue buffer");
return -1;
}
LOG_DEBUG("Got a new frame in buffer index=%d; bytesused=%d", buf->index, buf->bytesused);
if (buf->index >= dev->run->n_buffers) {
LOG_ERROR("Got invalid buffer index=%d; nbuffers=%d", buf->index, dev->run->n_buffers);
LOG_DEBUG("Got a new frame in buffer index=%d; bytesused=%d", buf_info->index, buf_info->bytesused);
if (buf_info->index >= dev->run->n_buffers) {
LOG_ERROR("Got invalid buffer index=%d; nbuffers=%d", buf_info->index, dev->run->n_buffers);
return -1;
}
return 0;
}
static int _capture_release_buffer(struct device *dev, struct v4l2_buffer *buf) {
static int _capture_release_buffer(struct device_t *dev, struct v4l2_buffer *buf_info) {
LOG_DEBUG("Calling ioctl(VIDIOC_QBUF) ...");
if (xioctl(dev->run->fd, VIDIOC_QBUF, buf) < 0) {
if (xioctl(dev->run->fd, VIDIOC_QBUF, buf_info) < 0) {
LOG_PERROR("Unable to requeue buffer");
return -1;
}
return 0;
}
static int _capture_handle_event(struct device *dev) {
static int _capture_handle_event(struct device_t *dev) {
struct v4l2_event event;
LOG_DEBUG("Calling ioctl(VIDIOC_DQEVENT) ...");

View File

@ -1,30 +1,47 @@
#pragma once
#include <signal.h>
#include <stdbool.h>
#include <signal.h>
#include <pthread.h>
#include <linux/videodev2.h>
#include "device.h"
struct worker_params {
struct worker_context_t {
int index;
struct workers_pool *pool;
struct device *dev;
sig_atomic_t *volatile stop;
struct device_t *dev;
struct v4l2_buffer buf_info;
sig_atomic_t *volatile global_stop;
sig_atomic_t *volatile workers_stop;
pthread_mutex_t *has_job_mutex;
bool *has_job;
pthread_cond_t *has_job_cond;
pthread_mutex_t *has_free_workers_mutex;
bool *has_free_workers;
pthread_cond_t *has_free_workers_cond;
};
struct worker {
struct worker_t {
struct worker_context_t ctx;
pthread_t tid;
pthread_mutex_t busy_mutex;
pthread_cond_t busy_cond;
struct worker_params params;
pthread_mutex_t has_job_mutex;
bool has_job;
pthread_cond_t has_job_cond;
};
struct workers_pool {
struct worker *workers;
pthread_mutex_t busy_mutex;
pthread_cond_t busy_cond;
struct workers_pool_t {
struct worker_t *workers;
sig_atomic_t *volatile workers_stop;
pthread_mutex_t has_free_workers_mutex;
bool has_free_workers;
pthread_cond_t has_free_workers_cond;
};
void capture_loop(struct device *dev, sig_atomic_t *volatile stop);
void capture_loop(struct device_t *dev, sig_atomic_t *volatile global_stop);

View File

@ -8,6 +8,7 @@
#include <assert.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <sys/param.h>
#include <linux/videodev2.h>
#include "tools.h"
@ -36,31 +37,31 @@ static const struct {
};
static int _device_open_check_cap(struct device *dev);
static int _device_open_dv_timings(struct device *dev);
static int _device_apply_dv_timings(struct device *dev);
static int _device_open_format(struct device *dev);
static void _device_open_alloc_picbufs(struct device *dev);
static int _device_open_mmap(struct device *dev);
static int _device_open_queue_buffers(struct device *dev);
static int _device_open_check_cap(struct device_t *dev);
static int _device_open_dv_timings(struct device_t *dev);
static int _device_apply_dv_timings(struct device_t *dev);
static int _device_open_format(struct device_t *dev);
static void _device_open_alloc_picbufs(struct device_t *dev);
static int _device_open_mmap(struct device_t *dev);
static int _device_open_queue_buffers(struct device_t *dev);
static const char *_format_to_string_auto(char *buf, const size_t length, const unsigned format);
static const char *_format_to_string_null(const unsigned format);
static const char *_standard_to_string(const v4l2_std_id standard);
void device_init(struct device *dev, struct device_runtime *run) {
void device_init(struct device_t *dev, struct device_runtime_t *run) {
LOG_DEBUG("Initializing a new device struct ...");
memset(dev, 0, sizeof(struct device));
memset(run, 0, sizeof(struct device_runtime));
memset(dev, 0, sizeof(struct device_t));
memset(run, 0, sizeof(struct device_runtime_t));
dev->path = (char *)DEFAULT_DEVICE;
dev->width = 640;
dev->height = 480;
dev->format = V4L2_PIX_FMT_YUYV;
dev->standard = V4L2_STD_UNKNOWN;
dev->n_buffers = 4;
dev->n_buffers = MAX(sysconf(_SC_NPROCESSORS_ONLN), 1);
dev->jpeg_quality = 80;
dev->timeout = 1;
dev->error_timeout = 1;
@ -88,7 +89,7 @@ v4l2_std_id device_parse_standard(const char *const str) {
return STANDARD_UNKNOWN;
}
int device_open(struct device *dev) {
int device_open(struct device_t *dev) {
if ((dev->run->fd = open(dev->path, O_RDWR|O_NONBLOCK)) < 0) {
LOG_PERROR("Can't open device");
goto error;
@ -120,7 +121,7 @@ int device_open(struct device *dev) {
return -1;
}
void device_close(struct device *dev) {
void device_close(struct device_t *dev) {
if (dev->run->pictures) {
LOG_DEBUG("Releasing picture buffers ...");
for (unsigned index = 0; index < dev->run->n_buffers && dev->run->pictures[index]; ++index) {
@ -155,7 +156,7 @@ void device_close(struct device *dev) {
}
}
static int _device_open_check_cap(struct device *dev) {
static int _device_open_check_cap(struct device_t *dev) {
struct v4l2_capability cap;
MEMSET_ZERO(cap);
@ -188,7 +189,7 @@ static int _device_open_check_cap(struct device *dev) {
return 0;
}
static int _device_open_dv_timings(struct device *dev) {
static int _device_open_dv_timings(struct device_t *dev) {
if (dev->dv_timings) {
LOG_DEBUG("Using DV-timings");
@ -214,7 +215,7 @@ static int _device_open_dv_timings(struct device *dev) {
return 0;
}
static int _device_apply_dv_timings(struct device *dev) {
static int _device_apply_dv_timings(struct device_t *dev) {
struct v4l2_dv_timings dv_timings;
MEMSET_ZERO(dv_timings);
@ -250,7 +251,7 @@ static int _device_apply_dv_timings(struct device *dev) {
return 0;
}
static int _device_open_format(struct device *dev) {
static int _device_open_format(struct device_t *dev) {
struct v4l2_format fmt;
MEMSET_ZERO(fmt);
@ -309,7 +310,7 @@ static int _device_open_format(struct device *dev) {
return 0;
}
static int _device_open_mmap(struct device *dev) {
static int _device_open_mmap(struct device_t *dev) {
struct v4l2_requestbuffers req;
MEMSET_ZERO(req);
@ -359,7 +360,7 @@ static int _device_open_mmap(struct device *dev) {
return 0;
}
static int _device_open_queue_buffers(struct device *dev) {
static int _device_open_queue_buffers(struct device_t *dev) {
for (unsigned index = 0; index < dev->run->n_buffers; ++index) {
struct v4l2_buffer buf;
@ -377,7 +378,7 @@ static int _device_open_queue_buffers(struct device *dev) {
return 0;
}
static void _device_open_alloc_picbufs(struct device *dev) {
static void _device_open_alloc_picbufs(struct device_t *dev) {
LOG_DEBUG("Allocating picture buffers ...");
assert((dev->run->pictures = calloc(dev->run->n_buffers, sizeof(*dev->run->pictures))));

View File

@ -9,23 +9,23 @@
#define STANDARD_UNKNOWN V4L2_STD_UNKNOWN
struct buffer {
struct buffer_t {
void *start;
size_t length;
};
struct device_runtime {
struct device_runtime_t {
int fd;
unsigned width;
unsigned height;
unsigned format;
unsigned n_buffers;
struct buffer *buffers;
struct buffer_t *buffers;
unsigned char **pictures;
bool capturing;
};
struct device {
struct device_t {
char *path;
unsigned width;
unsigned height;
@ -39,14 +39,14 @@ struct device {
unsigned timeout;
unsigned error_timeout;
struct device_runtime *run;
struct device_runtime_t *run;
};
void device_init(struct device *dev, struct device_runtime *run);
void device_init(struct device_t *dev, struct device_runtime_t *run);
int device_parse_format(const char *const str);
v4l2_std_id device_parse_standard(const char *const str);
int device_open(struct device *dev);
void device_close(struct device *dev);
int device_open(struct device_t *dev);
void device_close(struct device_t *dev);

View File

@ -65,7 +65,7 @@ static boolean _jpeg_empty_output_buffer(j_compress_ptr jpeg);
static void _jpeg_term_destination(j_compress_ptr jpeg);
int jpeg_compress_buffer(struct device *dev, int index) {
int jpeg_compress_buffer(struct device_t *dev, int index) {
// This function based on compress_image_to_jpeg() from mjpg-streamer
struct jpeg_compress_struct jpeg;

View File

@ -5,4 +5,4 @@
#include "device.h"
int jpeg_compress_buffer(struct device *dev, int index);
int jpeg_compress_buffer(struct device_t *dev, int index);

View File

@ -8,14 +8,16 @@
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <pthread.h>
#include <getopt.h>
#include <sys/param.h>
#include "tools.h"
#include "device.h"
#include "capture.h"
static const char _short_opts[] = "hd:f:s:e:tb:q:";
static const char _short_opts[] = "hd:f:s:e:tb:n:q:";
static const struct option _long_opts[] = {
{"help", no_argument, NULL, 'h'},
{"device", required_argument, NULL, 'd'},
@ -40,17 +42,17 @@ static void _help(int exit_code) {
exit(exit_code);
}
static void _parse_options(int argc, char *argv[], struct device *dev) {
static void _parse_options(int argc, char *argv[], struct device_t *dev) {
# define OPT_ARG(_dest) \
{ _dest = optarg; break; }
# define OPT_TRUE(_dest) \
{ _dest = true; break; }
# define OPT_UNSIGNED(_dest, _name) \
# define OPT_UNSIGNED(_dest, _name, _min) \
{ int _tmp = strtol(optarg, NULL, 0); \
if (errno || _tmp < 0) \
{ printf("Invalid value for: %s\n", _name); exit(EXIT_FAILURE); } \
if (errno || _tmp < _min) \
{ printf("Invalid value for '%s=%u'; minimal=%u\n", _name, _tmp, _min); exit(EXIT_FAILURE); } \
_dest = _tmp; break; }
# define OPT_PARSE(_dest, _func, _invalid, _name) \
@ -71,15 +73,16 @@ static void _parse_options(int argc, char *argv[], struct device *dev) {
case 'f': OPT_PARSE(dev->format, device_parse_format, FORMAT_UNKNOWN, "pixel format");
# pragma GCC diagnostic pop
case 's': OPT_PARSE(dev->standard, device_parse_standard, STANDARD_UNKNOWN, "TV standard");
case 'e': OPT_UNSIGNED(dev->every_frame, "--every-frame");
case 'z': OPT_UNSIGNED(dev->min_frame_size, "--min-frame-size");
case 'e': OPT_UNSIGNED(dev->every_frame, "--every-frame", 0);
case 'z': OPT_UNSIGNED(dev->min_frame_size, "--min-frame-size", 0);
case 't': OPT_TRUE(dev->dv_timings);
case 'b': OPT_UNSIGNED(dev->n_buffers, "--buffers");
case 'q': OPT_UNSIGNED(dev->jpeg_quality, "--jpeg-quality");
case 1000: OPT_UNSIGNED(dev->width, "--width");
case 1001: OPT_UNSIGNED(dev->height, "--height");
case 1002: OPT_UNSIGNED(dev->timeout, "--timeout");
case 1003: OPT_UNSIGNED(dev->error_timeout, "--error-timeout");
case 'n':
case 'b': OPT_UNSIGNED(dev->n_buffers, "--buffers", 1);
case 'q': OPT_UNSIGNED(dev->jpeg_quality, "--jpeg-quality", 1);
case 1000: OPT_UNSIGNED(dev->width, "--width", 320);
case 1001: OPT_UNSIGNED(dev->height, "--height", 180);
case 1002: OPT_UNSIGNED(dev->timeout, "--timeout", 1);
case 1003: OPT_UNSIGNED(dev->error_timeout, "--error-timeout", 1);
case 5000: OPT_TRUE(debug);
case 'h': _help(EXIT_SUCCESS); break;
default: _help(EXIT_FAILURE); break;
@ -88,6 +91,24 @@ static void _parse_options(int argc, char *argv[], struct device *dev) {
}
struct threads_context {
struct device_t *dev;
sig_atomic_t *volatile global_stop;
};
static void *_capture_loop_thread(void *v_ctx_ptr) {
struct threads_context *ctx = (struct threads_context *)v_ctx_ptr;
sigset_t mask;
assert(sigemptyset(&mask) == 0);
assert(sigaddset(&mask, SIGINT) == 0);
assert(sigaddset(&mask, SIGTERM) == 0);
assert(pthread_sigmask(SIG_BLOCK, &mask, NULL) == 0);
capture_loop(ctx->dev, (sig_atomic_t *volatile)ctx->global_stop);
return NULL;
}
static volatile sig_atomic_t _global_stop = 0;
static void _interrupt_handler(int signum) {
@ -95,20 +116,31 @@ static void _interrupt_handler(int signum) {
_global_stop = 1;
}
int main(int argc, char *argv[]) {
struct device dev;
struct device_runtime run;
struct device_t dev;
struct device_runtime_t run;
pthread_t capture_loop_tid;
struct threads_context ctx = {&dev, (sig_atomic_t *volatile)&_global_stop};
struct sigaction sig_act;
MEMSET_ZERO(sig_act);
assert(sigemptyset(&sig_act.sa_mask) == 0);
sig_act.sa_handler = _interrupt_handler;
assert(sigaddset(&sig_act.sa_mask, SIGINT) == 0);
assert(sigaddset(&sig_act.sa_mask, SIGTERM) == 0);
LOG_INFO("Installing SIGINT handler ...");
assert(sigaction(SIGINT, &sig_act, NULL) == 0);
LOG_INFO("Installing SIGTERM handler ...");
assert(sigaction(SIGTERM, &sig_act, NULL) == 0);
device_init(&dev, &run);
_parse_options(argc, argv, &dev);
LOG_INFO("Installing SIGINT handler ...");
signal(SIGINT, _interrupt_handler);
pthread_create(&capture_loop_tid, NULL, _capture_loop_thread, (void *)&ctx);
pthread_join(capture_loop_tid, NULL);
LOG_INFO("Installing SIGTERM handler ...");
signal(SIGTERM, _interrupt_handler);
capture_loop(&dev, (sig_atomic_t *volatile)&_global_stop);
return 0;
}

View File

@ -1,28 +0,0 @@
#include <string.h>
#include <errno.h>
#include <sys/ioctl.h>
#include "tools.h"
int xioctl(const int fd, const int request, void *arg) {
int retries = XIOCTL_RETRIES;
int retval = -1;
do {
retval = ioctl(fd, request, arg);
} while (
retval
&& retries--
&& (
errno == EINTR
|| errno == EAGAIN
|| errno == ETIMEDOUT
)
);
if (retval && retries <= 0) {
LOG_PERROR("ioctl(%d) retried %d times; giving up", request, XIOCTL_RETRIES);
}
return retval;
}

View File

@ -3,7 +3,14 @@
#include <stdio.h>
#include <stdbool.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <math.h>
#include <time.h>
#include <assert.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/syscall.h>
bool debug;
@ -12,25 +19,75 @@ bool debug;
#define SEP_INFO(_x_ch) \
{ for (int _i = 0; _i < 80; ++_i) putchar(_x_ch); putchar('\n'); }
#define INNER_LOG_MK_PARAMS \
time_t _sec; long _msec; now_ms(&_sec, &_msec); pid_t _tid = syscall(SYS_gettid);
#define INNER_LOG_PARAMS _sec, _msec, _tid
#define INNER_LOG_PL "[%ld.%03ld tid=%d]"
#define LOG_INFO(_x_msg, ...) \
printf("-- INFO -- " _x_msg "\n", ##__VA_ARGS__)
{ INNER_LOG_MK_PARAMS; \
printf("-- INFO " INNER_LOG_PL " -- " _x_msg "\n", INNER_LOG_PARAMS, ##__VA_ARGS__); }
#define LOG_DEBUG(_x_msg, ...) \
if (debug) { printf(" DEBUG -- " _x_msg "\n", ##__VA_ARGS__); }
{ if (debug) { INNER_LOG_MK_PARAMS; \
printf("-- DEBUG " INNER_LOG_PL " -- " _x_msg "\n", INNER_LOG_PARAMS, ##__VA_ARGS__); } }
#define SEP_DEBUG(_x_ch) \
if (debug) { SEP_INFO(_x_ch); }
{ if (debug) { SEP_INFO(_x_ch); } }
#define LOG_ERROR(_x_msg, ...) \
printf("** ERROR -- " _x_msg "\n", ##__VA_ARGS__)
{ INNER_LOG_MK_PARAMS; \
printf("-- ERROR " INNER_LOG_PL " -- " _x_msg "\n", INNER_LOG_PARAMS, ##__VA_ARGS__); }
#define LOG_PERROR(_x_msg, ...) \
printf("** ERROR -- " _x_msg ": %s\n", ##__VA_ARGS__, strerror(errno))
{ INNER_LOG_MK_PARAMS; char _buf[1024]; strerror_r(errno, _buf, 1024); \
printf("-- ERROR " INNER_LOG_PL " -- " _x_msg ": %s\n", INNER_LOG_PARAMS, ##__VA_ARGS__, _buf); }
#define MEMSET_ZERO(_x_obj) memset(&(_x_obj), 0, sizeof(_x_obj))
#define INLINE inline __attribute__((always_inline))
#define XIOCTL_RETRIES 4
int xioctl(const int fd, const int request, void *arg);
INLINE void now(struct timespec *spec) {
assert(!clock_gettime(CLOCK_REALTIME, spec));
}
INLINE void now_ms(time_t *sec, long *msec) {
struct timespec spec;
now(&spec);
*sec = spec.tv_sec;
*msec = round(spec.tv_nsec / 1.0e6);
if (*msec > 999) {
*sec += 1;
*msec = 0;
}
}
INLINE int xioctl(const int fd, const int request, void *arg) {
int retries = XIOCTL_RETRIES;
int retval = -1;
do {
retval = ioctl(fd, request, arg);
} while (
retval
&& retries--
&& (
errno == EINTR
|| errno == EAGAIN
|| errno == ETIMEDOUT
)
);
if (retval && retries <= 0) {
LOG_PERROR("ioctl(%d) retried %d times; giving up", request, XIOCTL_RETRIES);
}
return retval;
}