mirror of
https://github.com/pikvm/ustreamer.git
synced 2026-02-18 02:55:46 +00:00
refactoring
This commit is contained in:
parent
2964c63249
commit
968df5d82b
@ -74,11 +74,9 @@ void capture_loop(struct device_t *dev, sig_atomic_t *volatile global_stop) {
|
||||
if (FD_ISSET(dev->run->fd, &read_fds)) {
|
||||
LOG_DEBUG("Frame is ready, waiting for workers ...");
|
||||
|
||||
assert(!pthread_mutex_lock(&pool.has_free_workers_mutex));
|
||||
while (!pool.has_free_workers) {
|
||||
assert(!pthread_cond_wait(&pool.has_free_workers_cond, &pool.has_free_workers_mutex));
|
||||
}
|
||||
assert(!pthread_mutex_unlock(&pool.has_free_workers_mutex));
|
||||
A_PTHREAD_M_LOCK(&pool.has_free_workers_mutex);
|
||||
A_PTHREAD_C_WAIT_TRUE(pool.has_free_workers, &pool.has_free_workers_cond, &pool.has_free_workers_mutex);
|
||||
A_PTHREAD_M_UNLOCK(&pool.has_free_workers_mutex);
|
||||
|
||||
struct v4l2_buffer buf_info;
|
||||
|
||||
@ -109,10 +107,10 @@ void capture_loop(struct device_t *dev, sig_atomic_t *volatile global_stop) {
|
||||
LOG_DEBUG("Grabbed a new frame to buffer %d", buf_info.index);
|
||||
pool.workers[buf_info.index].ctx.buf_info = buf_info;
|
||||
|
||||
assert(!pthread_mutex_lock(&pool.workers[buf_info.index].has_job_mutex));
|
||||
A_PTHREAD_M_LOCK(&pool.workers[buf_info.index].has_job_mutex);
|
||||
pool.workers[buf_info.index].has_job = true;
|
||||
assert(!pthread_mutex_unlock(&pool.workers[buf_info.index].has_job_mutex));
|
||||
assert(!pthread_cond_signal(&pool.workers[buf_info.index].has_job_cond));
|
||||
A_PTHREAD_M_UNLOCK(&pool.workers[buf_info.index].has_job_mutex);
|
||||
A_PTHREAD_C_SIGNAL(&pool.workers[buf_info.index].has_job_cond);
|
||||
|
||||
pass_frame:
|
||||
{} // FIXME: for future mjpg support
|
||||
@ -183,14 +181,14 @@ static void _capture_init_workers(struct device_t *dev, struct workers_pool_t *p
|
||||
LOG_DEBUG("Spawning %d workers ...", dev->run->n_buffers);
|
||||
|
||||
*pool->workers_stop = false;
|
||||
assert((pool->workers = calloc(dev->run->n_buffers, sizeof(*pool->workers))));
|
||||
A_CALLOC(pool->workers, dev->run->n_buffers, sizeof(*pool->workers));
|
||||
|
||||
assert(!pthread_mutex_init(&pool->has_free_workers_mutex, NULL));
|
||||
assert(!pthread_cond_init(&pool->has_free_workers_cond, NULL));
|
||||
A_PTHREAD_M_INIT(&pool->has_free_workers_mutex);
|
||||
A_PTHREAD_C_INIT(&pool->has_free_workers_cond);
|
||||
|
||||
for (unsigned index = 0; index < dev->run->n_buffers; ++index) {
|
||||
assert(!pthread_mutex_init(&pool->workers[index].has_job_mutex, NULL));
|
||||
assert(!pthread_cond_init(&pool->workers[index].has_job_cond, NULL));
|
||||
A_PTHREAD_M_INIT(&pool->workers[index].has_job_mutex);
|
||||
A_PTHREAD_C_INIT(&pool->workers[index].has_job_cond);
|
||||
|
||||
pool->workers[index].ctx.index = index;
|
||||
pool->workers[index].ctx.dev = dev;
|
||||
@ -208,12 +206,7 @@ static void _capture_init_workers(struct device_t *dev, struct workers_pool_t *p
|
||||
pool->workers[index].ctx.has_free_workers = &pool->has_free_workers;
|
||||
pool->workers[index].ctx.has_free_workers_cond = &pool->has_free_workers_cond;
|
||||
|
||||
assert(!pthread_create(
|
||||
&pool->workers[index].tid,
|
||||
NULL,
|
||||
_capture_worker_thread,
|
||||
(void *)&pool->workers[index].ctx
|
||||
));
|
||||
A_PTHREAD_CREATE(&pool->workers[index].tid, _capture_worker_thread, (void *)&pool->workers[index].ctx);
|
||||
}
|
||||
}
|
||||
|
||||
@ -223,17 +216,15 @@ static void *_capture_worker_thread(void *v_ctx_ptr) {
|
||||
LOG_INFO("Hello! I am a worker #%d ^_^", ctx->index);
|
||||
|
||||
while (!*ctx->global_stop && !*ctx->workers_stop) {
|
||||
assert(!pthread_mutex_lock(ctx->has_free_workers_mutex));
|
||||
A_PTHREAD_M_LOCK(ctx->has_free_workers_mutex);
|
||||
*ctx->has_free_workers = true;
|
||||
assert(!pthread_mutex_unlock(ctx->has_free_workers_mutex));
|
||||
assert(!pthread_cond_signal(ctx->has_free_workers_cond));
|
||||
A_PTHREAD_M_UNLOCK(ctx->has_free_workers_mutex);
|
||||
A_PTHREAD_C_SIGNAL(ctx->has_free_workers_cond);
|
||||
|
||||
LOG_DEBUG("Worker %d waiting for a new job ...", ctx->index);
|
||||
assert(!pthread_mutex_lock(ctx->has_job_mutex));
|
||||
while (!*ctx->has_job) {
|
||||
assert(!pthread_cond_wait(ctx->has_job_cond, ctx->has_job_mutex));
|
||||
}
|
||||
assert(!pthread_mutex_unlock(ctx->has_job_mutex));
|
||||
A_PTHREAD_M_LOCK(ctx->has_job_mutex);
|
||||
A_PTHREAD_C_WAIT_TRUE(*ctx->has_job, ctx->has_job_cond, ctx->has_job_mutex);
|
||||
A_PTHREAD_M_UNLOCK(ctx->has_job_mutex);
|
||||
|
||||
if (!*ctx->workers_stop) {
|
||||
int compressed;
|
||||
@ -259,9 +250,9 @@ static void *_capture_worker_thread(void *v_ctx_ptr) {
|
||||
last_comp_time = 0;
|
||||
}
|
||||
|
||||
assert(!pthread_mutex_lock(ctx->last_comp_time_mutex));
|
||||
A_PTHREAD_M_LOCK(ctx->last_comp_time_mutex);
|
||||
*ctx->last_comp_time = last_comp_time;
|
||||
assert(!pthread_mutex_unlock(ctx->last_comp_time_mutex));
|
||||
A_PTHREAD_M_UNLOCK(ctx->last_comp_time_mutex);
|
||||
|
||||
LOG_INFO("Compressed JPEG size=%d; time=%LG (worker %d)", compressed, last_comp_time, ctx->index);
|
||||
}
|
||||
@ -276,18 +267,18 @@ static void _capture_destroy_workers(struct device_t *dev, struct workers_pool_t
|
||||
if (pool->workers) {
|
||||
*pool->workers_stop = true;
|
||||
for (unsigned index = 0; index < dev->run->n_buffers; ++index) {
|
||||
assert(!pthread_mutex_lock(&pool->workers[index].has_job_mutex));
|
||||
A_PTHREAD_M_LOCK(&pool->workers[index].has_job_mutex);
|
||||
pool->workers[index].has_job = true; // Final job: die
|
||||
assert(!pthread_mutex_unlock(&pool->workers[index].has_job_mutex));
|
||||
assert(!pthread_cond_signal(&pool->workers[index].has_job_cond));
|
||||
A_PTHREAD_M_UNLOCK(&pool->workers[index].has_job_mutex);
|
||||
A_PTHREAD_C_SIGNAL(&pool->workers[index].has_job_cond);
|
||||
|
||||
assert(!pthread_join(pool->workers[index].tid, NULL));
|
||||
assert(!pthread_mutex_destroy(&pool->workers[index].has_job_mutex));
|
||||
assert(!pthread_cond_destroy(&pool->workers[index].has_job_cond));
|
||||
A_PTHREAD_JOIN(pool->workers[index].tid);
|
||||
A_PTHREAD_M_DESTROY(&pool->workers[index].has_job_mutex);
|
||||
A_PTHREAD_C_DESTROY(&pool->workers[index].has_job_cond);
|
||||
}
|
||||
|
||||
assert(!pthread_cond_destroy(&pool->has_free_workers_cond));
|
||||
assert(!pthread_mutex_destroy(&pool->has_free_workers_mutex));
|
||||
A_PTHREAD_M_DESTROY(&pool->has_free_workers_mutex);
|
||||
A_PTHREAD_C_DESTROY(&pool->has_free_workers_cond);
|
||||
|
||||
free(pool->workers);
|
||||
}
|
||||
|
||||
@ -5,7 +5,6 @@
|
||||
#include <strings.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <assert.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/mman.h>
|
||||
#include <sys/param.h>
|
||||
@ -333,8 +332,7 @@ static int _device_open_mmap(struct device_t *dev) {
|
||||
|
||||
LOG_DEBUG("Allocating device buffers ...");
|
||||
|
||||
assert((dev->run->buffers = calloc(req.count, sizeof(*dev->run->buffers))));
|
||||
|
||||
A_CALLOC(dev->run->buffers, req.count, sizeof(*dev->run->buffers));
|
||||
for (dev->run->n_buffers = 0; dev->run->n_buffers < req.count; ++dev->run->n_buffers) {
|
||||
struct v4l2_buffer buf;
|
||||
|
||||
@ -381,13 +379,13 @@ static int _device_open_queue_buffers(struct device_t *dev) {
|
||||
static void _device_open_alloc_picbufs(struct device_t *dev) {
|
||||
LOG_DEBUG("Allocating picture buffers ...");
|
||||
|
||||
assert((dev->run->pictures = calloc(dev->run->n_buffers, sizeof(*dev->run->pictures))));
|
||||
A_CALLOC(dev->run->pictures, dev->run->n_buffers, sizeof(*dev->run->pictures));
|
||||
|
||||
unsigned picture_size = dev->run->width * dev->run->height << 1;
|
||||
|
||||
for (unsigned index = 0; index < dev->run->n_buffers; ++index) {
|
||||
LOG_DEBUG("Allocating picture buffer %d ...", index);
|
||||
assert((dev->run->pictures[index] = malloc(picture_size)));
|
||||
A_MALLOC(dev->run->pictures[index], picture_size);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -73,7 +73,7 @@ int jpeg_compress_buffer(struct device_t *dev, int index) {
|
||||
unsigned char *line_buffer;
|
||||
int written = -1;
|
||||
|
||||
assert((line_buffer = calloc(dev->run->width * 3, sizeof(unsigned char))));
|
||||
A_CALLOC(line_buffer, dev->run->width * 3, sizeof(unsigned char));
|
||||
|
||||
jpeg.err = jpeg_std_error(&jpeg_error);
|
||||
jpeg_create_compress(&jpeg);
|
||||
|
||||
@ -36,7 +36,6 @@ static const struct option _long_opts[] = {
|
||||
{NULL, 0, NULL, 0},
|
||||
};
|
||||
|
||||
|
||||
static void _help(int exit_code) {
|
||||
printf("No manual yet\n");
|
||||
exit(exit_code);
|
||||
@ -90,7 +89,6 @@ static void _parse_options(int argc, char *argv[], struct device_t *dev) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
struct threads_context {
|
||||
struct device_t *dev;
|
||||
sig_atomic_t *volatile global_stop;
|
||||
@ -139,8 +137,8 @@ int main(int argc, char *argv[]) {
|
||||
device_init(&dev, &run);
|
||||
_parse_options(argc, argv, &dev);
|
||||
|
||||
pthread_create(&capture_loop_tid, NULL, _capture_loop_thread, (void *)&ctx);
|
||||
pthread_join(capture_loop_tid, NULL);
|
||||
A_PTHREAD_CREATE(&capture_loop_tid, _capture_loop_thread, (void *)&ctx);
|
||||
A_PTHREAD_JOIN(capture_loop_tid);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
22
src/tools.h
22
src/tools.h
@ -1,11 +1,13 @@
|
||||
#pragma once
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdbool.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
#include <errno.h>
|
||||
#include <math.h>
|
||||
#include <pthread.h>
|
||||
#include <time.h>
|
||||
#include <assert.h>
|
||||
#include <sys/ioctl.h>
|
||||
@ -46,7 +48,25 @@ bool debug;
|
||||
printf("-- ERROR " INNER_LOG_PL " -- " _x_msg ": %s\n", INNER_LOG_PARAMS, ##__VA_ARGS__, _buf); }
|
||||
|
||||
|
||||
#define MEMSET_ZERO(_x_obj) memset(&(_x_obj), 0, sizeof(_x_obj))
|
||||
#define A_PTHREAD_CREATE(_tid, _func, _arg) assert(!pthread_create(_tid, NULL, _func, _arg))
|
||||
#define A_PTHREAD_JOIN(_tid) assert(!pthread_join(_tid, NULL))
|
||||
|
||||
#define A_PTHREAD_M_INIT(_mutex) assert(!pthread_mutex_init(_mutex, NULL))
|
||||
#define A_PTHREAD_M_DESTROY(_mutex) assert(!pthread_mutex_destroy(_mutex))
|
||||
#define A_PTHREAD_M_LOCK(...) assert(!pthread_mutex_lock(__VA_ARGS__))
|
||||
#define A_PTHREAD_M_UNLOCK(...) assert(!pthread_mutex_unlock(__VA_ARGS__))
|
||||
|
||||
#define A_PTHREAD_C_INIT(_cond) assert(!pthread_cond_init(_cond, NULL))
|
||||
#define A_PTHREAD_C_DESTROY(_cond) assert(!pthread_cond_destroy(_cond))
|
||||
#define A_PTHREAD_C_SIGNAL(...) assert(!pthread_cond_signal(__VA_ARGS__))
|
||||
#define A_PTHREAD_C_WAIT_TRUE(_var, _cond, _mutex) \
|
||||
{ while(!_var) assert(!pthread_cond_wait(_cond, _mutex)); }
|
||||
|
||||
|
||||
#define A_CALLOC(_dest, _nmemb, _size) assert((_dest = calloc(_nmemb, _size)))
|
||||
#define A_MALLOC(_dest, _size) assert((_dest = malloc(_size)));
|
||||
#define MEMSET_ZERO(_x_obj) memset(&(_x_obj), 0, sizeof(_x_obj))
|
||||
|
||||
|
||||
#define INLINE inline __attribute__((always_inline))
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user