threading skel

This commit is contained in:
Devaev Maxim 2018-09-16 05:36:26 +03:00
parent 65f61cdae9
commit be3fff5483
6 changed files with 125 additions and 28 deletions

View File

@ -1,6 +1,6 @@
CC = gcc
CFLAGS = -c -Wall -Wextra
LDFLAGS = -ljpeg
CFLAGS = -c -O3 -Wall -Wextra -pthread
LDFLAGS = -ljpeg -pthread
SOURCES = $(shell ls src/*.c)
OBJECTS = $(SOURCES:.c=.o)
PROG = ustreamer

View File

@ -1,29 +1,39 @@
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <assert.h>
#include <sys/select.h>
#include <linux/videodev2.h>
#include "tools.h"
#include "device.h"
#include "jpeg.h"
#include "capture.h"
static int _capture_init_loop(struct device *dev);
static int _capture_init(struct device *dev);
static int _capture_init_loop(struct device *dev, struct workers_pool *pool, sig_atomic_t *volatile stop);
static int _capture_init(struct device *dev, struct workers_pool *pool, sig_atomic_t *volatile stop);
static int _capture_init_workers(struct device *dev, struct workers_pool *pool, sig_atomic_t *volatile stop);
static void *_capture_worker_thread(void *index);
static void _capture_destroy_workers(struct device *dev, struct workers_pool *pool);
static int _capture_control(struct device *dev, const bool enable);
static int _capture_grab_buffer(struct device *dev, struct v4l2_buffer *buf);
static int _capture_release_buffer(struct device *dev, struct v4l2_buffer *buf);
static int _capture_handle_event(struct device *dev);
void capture_loop(struct device *dev) {
void capture_loop(struct device *dev, sig_atomic_t *volatile stop) {
struct workers_pool pool;
MEMSET_ZERO(pool);
LOG_INFO("Using V4L2 device: %s", dev->path);
LOG_INFO("Using JPEG quality: %d%%", dev->jpeg_quality);
while (_capture_init_loop(dev) == 0) {
while (_capture_init_loop(dev, &pool, stop) == 0) {
int frames_count = 0;
while (!(*dev->stop)) {
while (!(*stop)) {
SEP_DEBUG('-');
fd_set read_fds;
@ -112,15 +122,17 @@ void capture_loop(struct device *dev) {
}
}
}
_capture_destroy_workers(dev, &pool);
_capture_control(dev, false);
device_close(dev);
}
static int _capture_init_loop(struct device *dev) {
static int _capture_init_loop(struct device *dev, struct workers_pool *pool, sig_atomic_t *volatile stop) {
int retval = -1;
while (!(*dev->stop)) {
if ((retval = _capture_init(dev)) < 0) {
while (!(*stop)) {
if ((retval = _capture_init(dev, pool, stop)) < 0) {
LOG_INFO("Sleeping %d seconds before new capture init ...", dev->error_timeout);
sleep(dev->error_timeout);
} else {
@ -130,9 +142,10 @@ static int _capture_init_loop(struct device *dev) {
return retval;
}
static int _capture_init(struct device *dev) {
static int _capture_init(struct device *dev, struct workers_pool *pool, sig_atomic_t *volatile stop) {
SEP_INFO('=');
_capture_destroy_workers(dev, pool);
_capture_control(dev, false);
device_close(dev);
@ -142,6 +155,9 @@ static int _capture_init(struct device *dev) {
if (_capture_control(dev, true) < 0) {
goto error;
}
if (_capture_init_workers(dev, pool, stop) < 0) {
goto error;
}
return 0;
@ -150,6 +166,66 @@ static int _capture_init(struct device *dev) {
return -1;
}
static int _capture_init_workers(struct device *dev, struct workers_pool *pool, sig_atomic_t *volatile stop) {
LOG_INFO("Spawning %d workers ...", dev->run->n_buffers);
pool->workers = NULL;
if ((pool->workers = calloc(dev->run->n_buffers, sizeof(*pool->workers))) == NULL) {
LOG_PERROR("Can't allocate workers pool");
return -1;
}
assert(!pthread_mutex_init(&pool->busy_mutex, NULL));
assert(!pthread_cond_init(&pool->busy_cond, NULL));
for (unsigned index = 0; index < dev->run->n_buffers; ++index) {
assert(!pthread_mutex_init(&pool->workers[index].busy_mutex, NULL));
assert(!pthread_cond_init(&pool->workers[index].busy_cond, NULL));
pool->workers[index].params.index = index;
pool->workers[index].params.pool = pool;
pool->workers[index].params.dev = dev;
pool->workers[index].params.stop = stop;
assert(!pthread_create(
&pool->workers[index].tid,
NULL,
_capture_worker_thread,
(void *)&pool->workers[index].params
));
}
LOG_DEBUG("Spawned %d workers", dev->run->n_buffers);
return 0;
}
static void *_capture_worker_thread(void *params_ptr) {
struct worker_params params = *(struct worker_params *)params_ptr;
LOG_INFO("Hello! I am a worker #%d ^_^", params.index);
while (!*(params.stop));
LOG_INFO("Bye");
return NULL;
}
static void _capture_destroy_workers(struct device *dev, struct workers_pool *pool) {
LOG_DEBUG("Destroying JPEG workers ...");
if (pool->workers) {
for (unsigned index = 0; index < dev->run->n_buffers; ++index) {
assert(!pthread_join(pool->workers[index].tid, NULL));
assert(!pthread_cond_destroy(&pool->workers[index].busy_cond));
assert(!pthread_mutex_destroy(&pool->workers[index].busy_mutex));
}
assert(!pthread_cond_destroy(&pool->busy_cond));
assert(!pthread_mutex_destroy(&pool->busy_mutex));
free(pool->workers);
}
pool->workers = NULL;
}
static int _capture_control(struct device *dev, const bool enable) {
if (enable != dev->run->capturing) {
enum v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE;

View File

@ -1,6 +1,30 @@
#pragma once
#include <signal.h>
#include <pthread.h>
#include "device.h"
void capture_loop(struct device *dev);
struct worker_params {
int index;
struct workers_pool *pool;
struct device *dev;
sig_atomic_t *volatile stop;
};
struct worker {
pthread_t tid;
pthread_mutex_t busy_mutex;
pthread_cond_t busy_cond;
struct worker_params params;
};
struct workers_pool {
struct worker *workers;
pthread_mutex_t busy_mutex;
pthread_cond_t busy_cond;
};
void capture_loop(struct device *dev, sig_atomic_t *volatile stop);

View File

@ -48,7 +48,7 @@ static const char *_format_to_string_null(const unsigned format);
static const char *_standard_to_string(const v4l2_std_id standard);
void device_init(struct device *dev, struct device_runtime *run, bool *const stop) {
void device_init(struct device *dev, struct device_runtime *run) {
LOG_DEBUG("Initializing a new device struct ...");
memset(dev, 0, sizeof(struct device));
@ -66,7 +66,6 @@ void device_init(struct device *dev, struct device_runtime *run, bool *const sto
dev->run = run;
dev->run->fd = -1;
dev->stop = stop;
LOG_DEBUG("We have a clear device!");
}
@ -336,7 +335,7 @@ static int _device_open_mmap(struct device *dev) {
dev->run->buffers = NULL;
if ((dev->run->buffers = calloc(req.count, sizeof(*dev->run->buffers))) == NULL) {
LOG_PERROR("Can't allocate device buffers");
LOG_PERROR("Can't allocate device buffers pool");
return -1;
}
@ -388,14 +387,15 @@ static int _device_open_alloc_picbufs(struct device *dev) {
dev->run->pictures = NULL;
if ((dev->run->pictures = calloc(dev->run->n_buffers, sizeof(*dev->run->pictures))) == NULL) {
LOG_PERROR("Can't allocate picture buffers");
LOG_PERROR("Can't allocate picture buffers pool");
return -1;
}
dev->run->picture_size = dev->run->width * dev->run->height << 1;
unsigned picture_size = dev->run->width * dev->run->height << 1;
for (unsigned index = 0; index < dev->run->n_buffers; ++index) {
LOG_DEBUG("Allocating picture buffer %d ...", index);
if ((dev->run->pictures[index] = malloc(dev->run->picture_size)) == NULL) {
if ((dev->run->pictures[index] = malloc(picture_size)) == NULL) {
LOG_PERROR("Can't allocate picture buffer %d", index);
return -1;
}

View File

@ -5,8 +5,8 @@
#include <linux/videodev2.h>
#define FORMAT_UNKNOWN -1
#define STANDARD_UNKNOWN V4L2_STD_UNKNOWN
#define FORMAT_UNKNOWN -1
#define STANDARD_UNKNOWN V4L2_STD_UNKNOWN
struct buffer {
@ -21,7 +21,6 @@ struct device_runtime {
unsigned format;
unsigned n_buffers;
struct buffer *buffers;
unsigned picture_size;
unsigned char **pictures;
bool capturing;
};
@ -41,11 +40,10 @@ struct device {
unsigned error_timeout;
struct device_runtime *run;
bool *stop;
};
void device_init(struct device *dev, struct device_runtime *run, bool *const stop);
void device_init(struct device *dev, struct device_runtime *run);
int device_parse_format(const char *const str);
v4l2_std_id device_parse_standard(const char *const str);

View File

@ -1,6 +1,5 @@
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
@ -84,11 +83,11 @@ static void _parse_options(int argc, char *argv[], struct device *dev) {
}
static bool _global_stop = false;
static volatile sig_atomic_t _global_stop = 0;
static void _interrupt_handler(int signum) {
LOG_INFO("===== Stopping by %s =====", strsignal(signum));
_global_stop = true;
_global_stop = 1;
}
@ -96,7 +95,7 @@ int main(int argc, char *argv[]) {
struct device dev;
struct device_runtime run;
device_init(&dev, &run, &_global_stop);
device_init(&dev, &run);
_parse_options(argc, argv, &dev);
LOG_INFO("Installing SIGINT handler ...");
@ -105,6 +104,6 @@ int main(int argc, char *argv[]) {
LOG_INFO("Installing SIGTERM handler ...");
signal(SIGTERM, _interrupt_handler);
capture_loop(&dev);
capture_loop(&dev, (sig_atomic_t *volatile)&_global_stop);
return 0;
}