n_workers

This commit is contained in:
Devaev Maxim
2018-09-22 03:52:53 +03:00
parent 0a5bf0411b
commit 4cac722c6d
6 changed files with 155 additions and 143 deletions

View File

@@ -82,6 +82,7 @@ struct device_t *device_init() {
dev->format = V4L2_PIX_FMT_YUYV; dev->format = V4L2_PIX_FMT_YUYV;
dev->standard = V4L2_STD_UNKNOWN; dev->standard = V4L2_STD_UNKNOWN;
dev->n_buffers = max_u(sysconf(_SC_NPROCESSORS_ONLN), 1) + 1; dev->n_buffers = max_u(sysconf(_SC_NPROCESSORS_ONLN), 1) + 1;
dev->n_workers = dev->n_buffers;
dev->jpeg_quality = 80; dev->jpeg_quality = 80;
dev->timeout = 1; dev->timeout = 1;
dev->error_timeout = 1; dev->error_timeout = 1;

View File

@@ -62,6 +62,7 @@ struct device_t {
v4l2_std_id standard; v4l2_std_id standard;
bool dv_timings; bool dv_timings;
unsigned n_buffers; unsigned n_buffers;
unsigned n_workers;
unsigned every_frame; unsigned every_frame;
unsigned min_frame_size; unsigned min_frame_size;
unsigned jpeg_quality; unsigned jpeg_quality;

View File

@@ -37,8 +37,8 @@ pthread_mutex_t log_mutex;
#define LOG_LEVEL_INFO 0 #define LOG_LEVEL_INFO 0
#define LOG_LEVEL_VERBOSE 1 #define LOG_LEVEL_PERF 1
#define LOG_LEVEL_PERF 2 #define LOG_LEVEL_VERBOSE 2
#define LOG_LEVEL_DEBUG 3 #define LOG_LEVEL_DEBUG 3
@@ -88,14 +88,6 @@ pthread_mutex_t log_mutex;
printf("-- INFO [%.03Lf tid=%ld] -- " _x_msg "\n", now_ms_ld(), syscall(SYS_gettid), ##__VA_ARGS__); \ printf("-- INFO [%.03Lf tid=%ld] -- " _x_msg "\n", now_ms_ld(), syscall(SYS_gettid), ##__VA_ARGS__); \
} }
#define LOG_VERBOSE(_x_msg, ...) { \
if (log_level >= LOG_LEVEL_VERBOSE) { \
LOGGING_LOCK; \
printf("-- VERB [%.03Lf tid=%ld] -- " _x_msg "\n", now_ms_ld(), syscall(SYS_gettid), ##__VA_ARGS__); \
LOGGING_UNLOCK; \
} \
}
#define LOG_PERF(_x_msg, ...) { \ #define LOG_PERF(_x_msg, ...) { \
if (log_level >= LOG_LEVEL_PERF) { \ if (log_level >= LOG_LEVEL_PERF) { \
LOGGING_LOCK; \ LOGGING_LOCK; \
@@ -104,6 +96,14 @@ pthread_mutex_t log_mutex;
} \ } \
} }
#define LOG_VERBOSE(_x_msg, ...) { \
if (log_level >= LOG_LEVEL_VERBOSE) { \
LOGGING_LOCK; \
printf("-- VERB [%.03Lf tid=%ld] -- " _x_msg "\n", now_ms_ld(), syscall(SYS_gettid), ##__VA_ARGS__); \
LOGGING_UNLOCK; \
} \
}
#define LOG_DEBUG(_x_msg, ...) { \ #define LOG_DEBUG(_x_msg, ...) { \
if (log_level >= LOG_LEVEL_DEBUG) { \ if (log_level >= LOG_LEVEL_DEBUG) { \
LOGGING_LOCK; \ LOGGING_LOCK; \

View File

@@ -38,7 +38,7 @@
#include "http.h" #include "http.h"
static const char _short_opts[] = "d:x:y:f:a:e:z:tn:q:s:p:h"; static const char _short_opts[] = "d:x:y:f:a:e:z:tn:w:q:s:p:h";
static const struct option _long_opts[] = { static const struct option _long_opts[] = {
{"device", required_argument, NULL, 'd'}, {"device", required_argument, NULL, 'd'},
{"width", required_argument, NULL, 'x'}, {"width", required_argument, NULL, 'x'},
@@ -49,6 +49,7 @@ static const struct option _long_opts[] = {
{"min-frame-size", required_argument, NULL, 'z'}, {"min-frame-size", required_argument, NULL, 'z'},
{"dv-timings", no_argument, NULL, 't'}, {"dv-timings", no_argument, NULL, 't'},
{"buffers", required_argument, NULL, 'n'}, {"buffers", required_argument, NULL, 'n'},
{"workers", required_argument, NULL, 'w'},
{"jpeg-quality", required_argument, NULL, 'q'}, {"jpeg-quality", required_argument, NULL, 'q'},
{"device-timeout", required_argument, NULL, 1000}, {"device-timeout", required_argument, NULL, 1000},
{"device-error-timeout", required_argument, NULL, 1001}, {"device-error-timeout", required_argument, NULL, 1001},
@@ -81,8 +82,9 @@ static void _help(struct device_t *dev, struct http_server_t *server) {
printf(" -t|--dv-timings -- Enable DV timings queriyng and events processing.\n"); printf(" -t|--dv-timings -- Enable DV timings queriyng and events processing.\n");
printf(" Supports automatic resolution changing. Default: disabled.\n\n"); printf(" Supports automatic resolution changing. Default: disabled.\n\n");
printf(" -n|--buffers <N> -- The number of buffers to receive data from the device.\n"); printf(" -n|--buffers <N> -- The number of buffers to receive data from the device.\n");
printf(" Each buffer is processed using an intermediate thread.\n"); printf(" Each buffer may processed using an intermediate thread.\n");
printf(" Default: %d (number of CPU cores + 1)\n\n", dev->n_buffers); printf(" Default: %d (number of CPU cores + 1)\n\n", dev->n_buffers);
printf(" -w|--workers <N> -- The number of compressing threads. Default: %d (== --buffers).\n\n", dev->n_workers);
printf(" -q|--jpeg-quality <N> -- Set quality of JPEG encoding from 1 to 100 (best). Default: %d\n\n", dev->jpeg_quality); printf(" -q|--jpeg-quality <N> -- Set quality of JPEG encoding from 1 to 100 (best). Default: %d\n\n", dev->jpeg_quality);
printf(" --device-timeout <seconds> -- Timeout for device querying. Default: %d\n\n", dev->timeout); printf(" --device-timeout <seconds> -- Timeout for device querying. Default: %d\n\n", dev->timeout);
printf(" --device-error-timeout <seconds> -- Delay before trying to connect to the device again\n"); printf(" --device-error-timeout <seconds> -- Delay before trying to connect to the device again\n");
@@ -97,7 +99,7 @@ static void _help(struct device_t *dev, struct http_server_t *server) {
printf(" --debug -- Enabled debug messages (same as --log-level=3). Default: disabled.\n\n"); printf(" --debug -- Enabled debug messages (same as --log-level=3). Default: disabled.\n\n");
printf(" --log-level <N> -- Verbosity level of messages from 0 (info) to 3 (debug).\n"); printf(" --log-level <N> -- Verbosity level of messages from 0 (info) to 3 (debug).\n");
printf(" Enabling debugging messages can slow down the program.\n"); printf(" Enabling debugging messages can slow down the program.\n");
printf(" Available levels: 0=info, 1=verbose, 2=performace, 3=debug.\n"); printf(" Available levels: 0=info, 1=performance, 2=verbose, 3=debug.\n");
printf(" Default: %d\n\n", log_level); printf(" Default: %d\n\n", log_level);
printf(" -h|--help -- Print this messages and exit\n\n"); printf(" -h|--help -- Print this messages and exit\n\n");
} }
@@ -138,6 +140,7 @@ static int _parse_options(int argc, char *argv[], struct device_t *dev, struct h
case 'z': OPT_UNSIGNED(dev->min_frame_size, "--min-frame-size", 0, 8192); case 'z': OPT_UNSIGNED(dev->min_frame_size, "--min-frame-size", 0, 8192);
case 't': OPT_TRUE(dev->dv_timings); case 't': OPT_TRUE(dev->dv_timings);
case 'n': OPT_UNSIGNED(dev->n_buffers, "--buffers", 1, 32); case 'n': OPT_UNSIGNED(dev->n_buffers, "--buffers", 1, 32);
case 'w': OPT_UNSIGNED(dev->n_workers, "--workers", 1, 32);
case 'q': OPT_UNSIGNED(dev->jpeg_quality, "--jpeg-quality", 1, 100); case 'q': OPT_UNSIGNED(dev->jpeg_quality, "--jpeg-quality", 1, 100);
case 1000: OPT_UNSIGNED(dev->timeout, "--timeout", 1, 60); case 1000: OPT_UNSIGNED(dev->timeout, "--timeout", 1, 60);
case 1001: OPT_UNSIGNED(dev->error_timeout, "--error-timeout", 1, 60); case 1001: OPT_UNSIGNED(dev->error_timeout, "--error-timeout", 1, 60);

View File

@@ -27,14 +27,6 @@
#include <sys/select.h> #include <sys/select.h>
#include <linux/videodev2.h> #include <linux/videodev2.h>
#ifdef DUMP_STREAM_JPEGS
# warning Enabled DUMP_STREAM_JPEGS
# include <stdio.h>
# include <fcntl.h>
# include <sys/stat.h>
# include <sys/types.h>
#endif
#include "tools.h" #include "tools.h"
#include "logging.h" #include "logging.h"
#include "device.h" #include "device.h"
@@ -43,11 +35,15 @@
static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool); static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool);
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index);
static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool); static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool);
static int _stream_init(struct device_t *dev, struct workers_pool_t *pool); static int _stream_init(struct device_t *dev, struct workers_pool_t *pool);
static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *pool); static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *pool);
static void *_stream_worker_thread(void *v_ctx); static void *_stream_worker_thread(void *v_ctx);
static void _stream_destroy_workers(struct device_t *dev, struct workers_pool_t *pool); static void _stream_destroy_workers(struct device_t *dev, struct workers_pool_t *pool);
static int _stream_control(struct device_t *dev, const bool enable); static int _stream_control(struct device_t *dev, const bool enable);
static int _stream_grab_buffer(struct device_t *dev, struct v4l2_buffer *buf_info); static int _stream_grab_buffer(struct device_t *dev, struct v4l2_buffer *buf_info);
static int _stream_release_buffer(struct device_t *dev, struct v4l2_buffer *buf_info); static int _stream_release_buffer(struct device_t *dev, struct v4l2_buffer *buf_info);
@@ -68,27 +64,6 @@ void stream_destroy(struct stream_t *stream) {
free(stream); free(stream);
} }
#ifdef DUMP_STREAM_JPEGS
static void _stream_dump(struct stream_t *stream) {
static unsigned count = 0;
char path[1024];
errno = 0;
mkdir("stream", 0777);
assert(errno == 0 || errno == EEXIST);
sprintf(path, "stream/img_%06u.jpg", count);
int fd = open(path, O_CREAT|O_TRUNC|O_WRONLY, 0644);
assert(fd);
assert(write(fd, stream->picture.data, stream->picture.size) == (ssize_t)stream->picture.size);
assert(!close(fd));
LOG_INFO("-DDUMP_STREAM_JPEGS dumped %s", path);
++count;
}
#endif
void stream_loop(struct stream_t *stream) { void stream_loop(struct stream_t *stream) {
struct workers_pool_t pool; struct workers_pool_t pool;
bool workers_stop; bool workers_stop;
@@ -112,38 +87,40 @@ void stream_loop(struct stream_t *stream) {
A_CALLOC(stream->picture.data, stream->dev->run->max_picture_size); A_CALLOC(stream->picture.data, stream->dev->run->max_picture_size);
while (!stream->dev->stop) { while (!stream->dev->stop) {
int free_worker_number = -1;
SEP_DEBUG('-'); SEP_DEBUG('-');
LOG_DEBUG("Waiting for workers ..."); LOG_DEBUG("Waiting for workers ...");
A_PTHREAD_M_LOCK(&pool.has_free_workers_mutex); A_PTHREAD_M_LOCK(&pool.free_workers_mutex);
A_PTHREAD_C_WAIT_TRUE(pool.has_free_workers, &pool.has_free_workers_cond, &pool.has_free_workers_mutex); A_PTHREAD_C_WAIT_TRUE(pool.free_workers, &pool.free_workers_cond, &pool.free_workers_mutex);
A_PTHREAD_M_UNLOCK(&pool.has_free_workers_mutex); A_PTHREAD_M_UNLOCK(&pool.free_workers_mutex);
if (oldest_worker && !oldest_worker->has_job && stream->dev->run->pictures[oldest_worker->ctx.index].data) { if (oldest_worker && !oldest_worker->has_job && oldest_worker->ctx.buf_index >= 0) {
if (oldest_worker->job_failed) { if (oldest_worker->job_failed) {
break; break;
} }
A_PTHREAD_M_LOCK(&stream->mutex); _stream_expose_picture(stream, oldest_worker->ctx.buf_index);
stream->picture.size = stream->dev->run->pictures[oldest_worker->ctx.index].size;
stream->picture.allocated = stream->dev->run->pictures[oldest_worker->ctx.index].allocated;
memcpy(
stream->picture.data, stream->dev->run->pictures[oldest_worker->ctx.index].data,
stream->picture.size * sizeof(*stream->picture.data)
);
stream->width = stream->dev->run->width;
stream->height = stream->dev->run->height;
stream->updated = true;
A_PTHREAD_M_UNLOCK(&stream->mutex);
free_worker_number = oldest_worker->ctx.number;
oldest_worker = oldest_worker->order_next; oldest_worker = oldest_worker->order_next;
# ifdef DUMP_STREAM_JPEGS LOG_PERF("##### ACCEPT : %u", free_worker_number);
_stream_dump(stream); } else {
# endif for (unsigned number = 0; number < stream->dev->n_workers; ++number) {
if (!pool.workers[number].has_job && (free_worker_number == -1
|| pool.workers[free_worker_number].job_start_time < pool.workers[number].job_start_time
)) {
free_worker_number = number;
break;
}
}
assert(free_worker_number >= 0);
assert(!pool.workers[free_worker_number].has_job);
LOG_PERF("----- DROP : %u", free_worker_number);
} }
if (stream->dev->stop) { if (stream->dev->stop) {
@@ -209,14 +186,14 @@ void stream_loop(struct stream_t *stream) {
if (now < grab_after) { if (now < grab_after) {
fluency_passed += 1; fluency_passed += 1;
LOG_PERF("Passed %u frames for fluency: now=%.03Lf; grab_after=%.03Lf", fluency_passed, now, grab_after); LOG_VERBOSE("Passed %u frames for fluency: now=%.03Lf; grab_after=%.03Lf", fluency_passed, now, grab_after);
goto pass_frame; goto pass_frame;
} }
fluency_passed = 0; fluency_passed = 0;
if (log_level >= LOG_LEVEL_VERBOSE) { if (log_level >= LOG_LEVEL_PERF) {
if ((long long)now != fps_second) { if ((long long)now != fps_second) {
LOG_VERBOSE("Oldest worker complete, encoding FPS = %u", fps); LOG_PERF("Oldest worker complete, encoding FPS = %u", fps);
fps = 0; fps = 0;
fps_second = (long long)now; fps_second = (long long)now;
} }
@@ -226,24 +203,37 @@ void stream_loop(struct stream_t *stream) {
long double fluency_delay = _stream_get_fluency_delay(stream->dev, &pool); long double fluency_delay = _stream_get_fluency_delay(stream->dev, &pool);
grab_after = now + fluency_delay; grab_after = now + fluency_delay;
LOG_PERF("Fluency: delay=%.03Lf; grab_after=%.03Lf", fluency_delay, grab_after); LOG_VERBOSE("Fluency: delay=%.03Lf; grab_after=%.03Lf", fluency_delay, grab_after);
} }
LOG_DEBUG("Grabbed a new frame to buffer %d", buf_info.index); LOG_DEBUG("Grabbed a new frame to buffer %d", buf_info.index);
pool.workers[buf_info.index].ctx.buf_info = buf_info; pool.workers[free_worker_number].ctx.buf_info = buf_info;
if (!oldest_worker) { if (!oldest_worker) {
oldest_worker = &pool.workers[buf_info.index]; oldest_worker = &pool.workers[free_worker_number];
last_worker = oldest_worker; last_worker = oldest_worker;
} else { } else {
last_worker->order_next = &pool.workers[buf_info.index]; if (pool.workers[free_worker_number].order_next) {
last_worker = last_worker->order_next; pool.workers[free_worker_number].order_next->order_prev = pool.workers[free_worker_number].order_prev;
}
if (pool.workers[free_worker_number].order_prev) {
pool.workers[free_worker_number].order_prev->order_next = pool.workers[free_worker_number].order_next;
}
pool.workers[free_worker_number].order_prev = last_worker;
last_worker->order_next = &pool.workers[free_worker_number];
last_worker = &pool.workers[free_worker_number];
} }
last_worker->order_next = NULL;
A_PTHREAD_M_LOCK(&pool.workers[buf_info.index].has_job_mutex); A_PTHREAD_M_LOCK(&pool.workers[free_worker_number].has_job_mutex);
pool.workers[buf_info.index].has_job = true; pool.workers[free_worker_number].ctx.buf_index = buf_info.index;
A_PTHREAD_M_UNLOCK(&pool.workers[buf_info.index].has_job_mutex); pool.workers[free_worker_number].has_job = true;
A_PTHREAD_C_SIGNAL(&pool.workers[buf_info.index].has_job_cond); A_PTHREAD_M_UNLOCK(&pool.workers[free_worker_number].has_job_mutex);
A_PTHREAD_C_SIGNAL(&pool.workers[free_worker_number].has_job_cond);
A_PTHREAD_M_LOCK(&pool.free_workers_mutex);
pool.free_workers -= 1;
A_PTHREAD_M_UNLOCK(&pool.free_workers_mutex);
goto next_handlers; // Поток сам освободит буфер goto next_handlers; // Поток сам освободит буфер
@@ -288,18 +278,35 @@ void stream_loop_break(struct stream_t *stream) {
stream->dev->stop = 1; stream->dev->stop = 1;
} }
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) {
A_PTHREAD_M_LOCK(&stream->mutex);
stream->picture.size = stream->dev->run->pictures[buf_index].size;
stream->picture.allocated = stream->dev->run->pictures[buf_index].allocated;
memcpy(
stream->picture.data, stream->dev->run->pictures[buf_index].data,
stream->picture.size * sizeof(*stream->picture.data)
);
stream->width = stream->dev->run->width;
stream->height = stream->dev->run->height;
stream->updated = true;
A_PTHREAD_M_UNLOCK(&stream->mutex);
}
static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool) { static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool) {
long double delay = 0; long double delay = 0;
for (unsigned index = 0; index < dev->run->n_buffers; ++index) { for (unsigned number = 0; number < dev->n_workers; ++number) {
A_PTHREAD_M_LOCK(&pool->workers[index].last_comp_time_mutex); A_PTHREAD_M_LOCK(&pool->workers[number].last_comp_time_mutex);
if (pool->workers[index].last_comp_time > 0) { if (pool->workers[number].last_comp_time > 0) {
delay += pool->workers[index].last_comp_time; delay += pool->workers[number].last_comp_time;
} }
A_PTHREAD_M_UNLOCK(&pool->workers[index].last_comp_time_mutex); A_PTHREAD_M_UNLOCK(&pool->workers[number].last_comp_time_mutex);
} }
// Среднее арифметическое деленное на количество воркеров // Среднее арифметическое деленное на количество воркеров
return delay / dev->run->n_buffers / dev->run->n_buffers; return delay / dev->n_workers / dev->n_workers;
} }
static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool) { static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool) {
@@ -340,94 +347,90 @@ static int _stream_init(struct device_t *dev, struct workers_pool_t *pool) {
} }
static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *pool) { static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *pool) {
LOG_DEBUG("Spawning %d workers ...", dev->run->n_buffers); LOG_INFO("Spawning %d workers ...", dev->n_workers);
*pool->workers_stop = false; *pool->workers_stop = false;
A_CALLOC(pool->workers, dev->run->n_buffers); A_CALLOC(pool->workers, dev->n_workers);
A_PTHREAD_M_INIT(&pool->has_free_workers_mutex); A_PTHREAD_M_INIT(&pool->free_workers_mutex);
A_PTHREAD_C_INIT(&pool->has_free_workers_cond); A_PTHREAD_C_INIT(&pool->free_workers_cond);
for (unsigned index = 0; index < dev->run->n_buffers; ++index) { for (unsigned number = 0; number < dev->n_workers; ++number) {
A_PTHREAD_M_INIT(&pool->workers[index].has_job_mutex); pool->free_workers += 1;
A_PTHREAD_C_INIT(&pool->workers[index].has_job_cond);
pool->workers[index].ctx.index = index; A_PTHREAD_M_INIT(&pool->workers[number].has_job_mutex);
pool->workers[index].ctx.dev = dev; A_PTHREAD_C_INIT(&pool->workers[number].has_job_cond);
pool->workers[index].ctx.dev_stop = (sig_atomic_t *volatile)&dev->stop;
pool->workers[index].ctx.workers_stop = pool->workers_stop;
pool->workers[index].ctx.last_comp_time_mutex = &pool->workers[index].last_comp_time_mutex; pool->workers[number].ctx.number = number;
pool->workers[index].ctx.last_comp_time = &pool->workers[index].last_comp_time; pool->workers[number].ctx.dev = dev;
pool->workers[number].ctx.dev_stop = (sig_atomic_t *volatile)&dev->stop;
pool->workers[number].ctx.workers_stop = pool->workers_stop;
pool->workers[index].ctx.has_job_mutex = &pool->workers[index].has_job_mutex; pool->workers[number].ctx.last_comp_time_mutex = &pool->workers[number].last_comp_time_mutex;
pool->workers[index].ctx.has_job = &pool->workers[index].has_job; pool->workers[number].ctx.last_comp_time = &pool->workers[number].last_comp_time;
pool->workers[index].ctx.job_failed = &pool->workers[index].job_failed;
pool->workers[index].ctx.has_job_cond = &pool->workers[index].has_job_cond;
pool->workers[index].ctx.has_free_workers_mutex = &pool->has_free_workers_mutex; pool->workers[number].ctx.has_job_mutex = &pool->workers[number].has_job_mutex;
pool->workers[index].ctx.has_free_workers = &pool->has_free_workers; pool->workers[number].ctx.has_job = &pool->workers[number].has_job;
pool->workers[index].ctx.has_free_workers_cond = &pool->has_free_workers_cond; pool->workers[number].ctx.job_failed = &pool->workers[number].job_failed;
pool->workers[number].ctx.job_start_time = &pool->workers[number].job_start_time;
pool->workers[number].ctx.has_job_cond = &pool->workers[number].has_job_cond;
A_PTHREAD_CREATE(&pool->workers[index].tid, _stream_worker_thread, (void *)&pool->workers[index].ctx); pool->workers[number].ctx.free_workers_mutex = &pool->free_workers_mutex;
pool->workers[number].ctx.free_workers = &pool->free_workers;
pool->workers[number].ctx.free_workers_cond = &pool->free_workers_cond;
A_PTHREAD_CREATE(&pool->workers[number].tid, _stream_worker_thread, (void *)&pool->workers[number].ctx);
} }
} }
static void *_stream_worker_thread(void *v_ctx) { static void *_stream_worker_thread(void *v_ctx) {
struct worker_context_t *ctx = (struct worker_context_t *)v_ctx; struct worker_context_t *ctx = (struct worker_context_t *)v_ctx;
LOG_DEBUG("Hello! I am a worker #%d ^_^", ctx->index); LOG_DEBUG("Hello! I am a worker #%u ^_^", ctx->number);
while (!*ctx->dev_stop && !*ctx->workers_stop) { while (!*ctx->dev_stop && !*ctx->workers_stop) {
A_PTHREAD_M_LOCK(ctx->has_free_workers_mutex); LOG_DEBUG("Worker %u waiting for a new job ...", ctx->number);
*ctx->has_free_workers = true;
A_PTHREAD_M_UNLOCK(ctx->has_free_workers_mutex);
A_PTHREAD_C_SIGNAL(ctx->has_free_workers_cond);
LOG_DEBUG("Worker %d waiting for a new job ...", ctx->index);
A_PTHREAD_M_LOCK(ctx->has_job_mutex); A_PTHREAD_M_LOCK(ctx->has_job_mutex);
A_PTHREAD_C_WAIT_TRUE(*ctx->has_job, ctx->has_job_cond, ctx->has_job_mutex); A_PTHREAD_C_WAIT_TRUE(*ctx->has_job, ctx->has_job_cond, ctx->has_job_mutex);
A_PTHREAD_M_UNLOCK(ctx->has_job_mutex); A_PTHREAD_M_UNLOCK(ctx->has_job_mutex);
if (!*ctx->workers_stop) { if (!*ctx->workers_stop) {
time_t start_sec; long double start_time;
time_t stop_sec;
long start_msec;
long stop_msec;
long double last_comp_time; long double last_comp_time;
now_ms(&start_sec, &start_msec); start_time = now_ms_ld();
LOG_DEBUG("Worker %d compressing JPEG ...", ctx->index); LOG_DEBUG("Worker %u compressing JPEG from buffer %d ...", ctx->number, ctx->buf_index);
jpeg_compress_buffer(ctx->dev, ctx->index); jpeg_compress_buffer(ctx->dev, ctx->buf_index);
if (_stream_release_buffer(ctx->dev, &ctx->buf_info) == 0) { if (_stream_release_buffer(ctx->dev, &ctx->buf_info) == 0) {
*ctx->job_start_time = start_time;
*ctx->has_job = false; *ctx->has_job = false;
now_ms(&stop_sec, &stop_msec); last_comp_time = now_ms_ld() - start_time;
if (start_sec <= stop_sec) {
last_comp_time = (stop_sec - start_sec) + ((long double)(stop_msec - start_msec)) / 1000;
} else {
last_comp_time = 0;
}
A_PTHREAD_M_LOCK(ctx->last_comp_time_mutex); A_PTHREAD_M_LOCK(ctx->last_comp_time_mutex);
*ctx->last_comp_time = last_comp_time; *ctx->last_comp_time = last_comp_time;
A_PTHREAD_M_UNLOCK(ctx->last_comp_time_mutex); A_PTHREAD_M_UNLOCK(ctx->last_comp_time_mutex);
LOG_PERF( LOG_VERBOSE(
"Compressed JPEG size=%ld; time=%0.3Lf (worker %d)", "Compressed JPEG size=%ld; time=%0.3Lf; worker=%u; buffer=%d",
ctx->dev->run->pictures[ctx->index].size, last_comp_time, ctx->index ctx->dev->run->pictures[ctx->buf_index].size, last_comp_time, ctx->number, ctx->buf_index
); );
} else { } else {
*ctx->job_failed = true; *ctx->job_failed = true;
*ctx->has_job = false; *ctx->has_job = false;
} }
} }
A_PTHREAD_M_LOCK(ctx->free_workers_mutex);
*ctx->free_workers += 1;
A_PTHREAD_M_UNLOCK(ctx->free_workers_mutex);
A_PTHREAD_C_SIGNAL(ctx->free_workers_cond);
} }
LOG_DEBUG("Bye-bye (worker %d)", ctx->index); LOG_DEBUG("Bye-bye (worker %d)", ctx->number);
return NULL; return NULL;
} }
@@ -436,19 +439,19 @@ static void _stream_destroy_workers(struct device_t *dev, struct workers_pool_t
LOG_INFO("Destroying workers ..."); LOG_INFO("Destroying workers ...");
*pool->workers_stop = true; *pool->workers_stop = true;
for (unsigned index = 0; index < dev->run->n_buffers; ++index) { for (unsigned number = 0; number < dev->n_workers; ++number) {
A_PTHREAD_M_LOCK(&pool->workers[index].has_job_mutex); A_PTHREAD_M_LOCK(&pool->workers[number].has_job_mutex);
pool->workers[index].has_job = true; // Final job: die pool->workers[number].has_job = true; // Final job: die
A_PTHREAD_M_UNLOCK(&pool->workers[index].has_job_mutex); A_PTHREAD_M_UNLOCK(&pool->workers[number].has_job_mutex);
A_PTHREAD_C_SIGNAL(&pool->workers[index].has_job_cond); A_PTHREAD_C_SIGNAL(&pool->workers[number].has_job_cond);
A_PTHREAD_JOIN(pool->workers[index].tid); A_PTHREAD_JOIN(pool->workers[number].tid);
A_PTHREAD_M_DESTROY(&pool->workers[index].has_job_mutex); A_PTHREAD_M_DESTROY(&pool->workers[number].has_job_mutex);
A_PTHREAD_C_DESTROY(&pool->workers[index].has_job_cond); A_PTHREAD_C_DESTROY(&pool->workers[number].has_job_cond);
} }
A_PTHREAD_M_DESTROY(&pool->has_free_workers_mutex); A_PTHREAD_M_DESTROY(&pool->free_workers_mutex);
A_PTHREAD_C_DESTROY(&pool->has_free_workers_cond); A_PTHREAD_C_DESTROY(&pool->free_workers_cond);
free(pool->workers); free(pool->workers);
} }

View File

@@ -29,8 +29,9 @@
struct worker_context_t { struct worker_context_t {
int index; unsigned number;
struct device_t *dev; struct device_t *dev;
int buf_index;
struct v4l2_buffer buf_info; struct v4l2_buffer buf_info;
sig_atomic_t *volatile dev_stop; sig_atomic_t *volatile dev_stop;
bool *workers_stop; bool *workers_stop;
@@ -41,11 +42,12 @@ struct worker_context_t {
pthread_mutex_t *has_job_mutex; pthread_mutex_t *has_job_mutex;
bool *has_job; bool *has_job;
bool *job_failed; bool *job_failed;
long double *job_start_time;
pthread_cond_t *has_job_cond; pthread_cond_t *has_job_cond;
pthread_mutex_t *has_free_workers_mutex; pthread_mutex_t *free_workers_mutex;
bool *has_free_workers; unsigned *free_workers;
pthread_cond_t *has_free_workers_cond; pthread_cond_t *free_workers_cond;
}; };
struct worker_t { struct worker_t {
@@ -58,8 +60,10 @@ struct worker_t {
pthread_mutex_t has_job_mutex; pthread_mutex_t has_job_mutex;
bool has_job; bool has_job;
bool job_failed; bool job_failed;
long double job_start_time;
pthread_cond_t has_job_cond; pthread_cond_t has_job_cond;
struct worker_t *order_prev;
struct worker_t *order_next; struct worker_t *order_next;
}; };
@@ -67,9 +71,9 @@ struct workers_pool_t {
struct worker_t *workers; struct worker_t *workers;
bool *workers_stop; bool *workers_stop;
pthread_mutex_t has_free_workers_mutex; pthread_mutex_t free_workers_mutex;
bool has_free_workers; unsigned free_workers;
pthread_cond_t has_free_workers_cond; pthread_cond_t free_workers_cond;
}; };
struct stream_t { struct stream_t {