diff --git a/src/device.c b/src/device.c index 71f2611..1517fc4 100644 --- a/src/device.c +++ b/src/device.c @@ -82,6 +82,7 @@ struct device_t *device_init() { dev->format = V4L2_PIX_FMT_YUYV; dev->standard = V4L2_STD_UNKNOWN; dev->n_buffers = max_u(sysconf(_SC_NPROCESSORS_ONLN), 1) + 1; + dev->n_workers = dev->n_buffers; dev->jpeg_quality = 80; dev->timeout = 1; dev->error_timeout = 1; diff --git a/src/device.h b/src/device.h index 097d831..ee9d96a 100644 --- a/src/device.h +++ b/src/device.h @@ -62,6 +62,7 @@ struct device_t { v4l2_std_id standard; bool dv_timings; unsigned n_buffers; + unsigned n_workers; unsigned every_frame; unsigned min_frame_size; unsigned jpeg_quality; diff --git a/src/logging.h b/src/logging.h index 8721f53..66deff9 100644 --- a/src/logging.h +++ b/src/logging.h @@ -37,8 +37,8 @@ pthread_mutex_t log_mutex; #define LOG_LEVEL_INFO 0 -#define LOG_LEVEL_VERBOSE 1 -#define LOG_LEVEL_PERF 2 +#define LOG_LEVEL_PERF 1 +#define LOG_LEVEL_VERBOSE 2 #define LOG_LEVEL_DEBUG 3 @@ -88,14 +88,6 @@ pthread_mutex_t log_mutex; printf("-- INFO [%.03Lf tid=%ld] -- " _x_msg "\n", now_ms_ld(), syscall(SYS_gettid), ##__VA_ARGS__); \ } -#define LOG_VERBOSE(_x_msg, ...) { \ - if (log_level >= LOG_LEVEL_VERBOSE) { \ - LOGGING_LOCK; \ - printf("-- VERB [%.03Lf tid=%ld] -- " _x_msg "\n", now_ms_ld(), syscall(SYS_gettid), ##__VA_ARGS__); \ - LOGGING_UNLOCK; \ - } \ - } - #define LOG_PERF(_x_msg, ...) { \ if (log_level >= LOG_LEVEL_PERF) { \ LOGGING_LOCK; \ @@ -104,6 +96,14 @@ pthread_mutex_t log_mutex; } \ } +#define LOG_VERBOSE(_x_msg, ...) { \ + if (log_level >= LOG_LEVEL_VERBOSE) { \ + LOGGING_LOCK; \ + printf("-- VERB [%.03Lf tid=%ld] -- " _x_msg "\n", now_ms_ld(), syscall(SYS_gettid), ##__VA_ARGS__); \ + LOGGING_UNLOCK; \ + } \ + } + #define LOG_DEBUG(_x_msg, ...) { \ if (log_level >= LOG_LEVEL_DEBUG) { \ LOGGING_LOCK; \ diff --git a/src/main.c b/src/main.c index ef2fe97..2ce5418 100644 --- a/src/main.c +++ b/src/main.c @@ -38,7 +38,7 @@ #include "http.h" -static const char _short_opts[] = "d:x:y:f:a:e:z:tn:q:s:p:h"; +static const char _short_opts[] = "d:x:y:f:a:e:z:tn:w:q:s:p:h"; static const struct option _long_opts[] = { {"device", required_argument, NULL, 'd'}, {"width", required_argument, NULL, 'x'}, @@ -49,6 +49,7 @@ static const struct option _long_opts[] = { {"min-frame-size", required_argument, NULL, 'z'}, {"dv-timings", no_argument, NULL, 't'}, {"buffers", required_argument, NULL, 'n'}, + {"workers", required_argument, NULL, 'w'}, {"jpeg-quality", required_argument, NULL, 'q'}, {"device-timeout", required_argument, NULL, 1000}, {"device-error-timeout", required_argument, NULL, 1001}, @@ -81,8 +82,9 @@ static void _help(struct device_t *dev, struct http_server_t *server) { printf(" -t|--dv-timings -- Enable DV timings queriyng and events processing.\n"); printf(" Supports automatic resolution changing. Default: disabled.\n\n"); printf(" -n|--buffers -- The number of buffers to receive data from the device.\n"); - printf(" Each buffer is processed using an intermediate thread.\n"); + printf(" Each buffer may processed using an intermediate thread.\n"); printf(" Default: %d (number of CPU cores + 1)\n\n", dev->n_buffers); + printf(" -w|--workers -- The number of compressing threads. Default: %d (== --buffers).\n\n", dev->n_workers); printf(" -q|--jpeg-quality -- Set quality of JPEG encoding from 1 to 100 (best). Default: %d\n\n", dev->jpeg_quality); printf(" --device-timeout -- Timeout for device querying. Default: %d\n\n", dev->timeout); printf(" --device-error-timeout -- Delay before trying to connect to the device again\n"); @@ -97,7 +99,7 @@ static void _help(struct device_t *dev, struct http_server_t *server) { printf(" --debug -- Enabled debug messages (same as --log-level=3). Default: disabled.\n\n"); printf(" --log-level -- Verbosity level of messages from 0 (info) to 3 (debug).\n"); printf(" Enabling debugging messages can slow down the program.\n"); - printf(" Available levels: 0=info, 1=verbose, 2=performace, 3=debug.\n"); + printf(" Available levels: 0=info, 1=performance, 2=verbose, 3=debug.\n"); printf(" Default: %d\n\n", log_level); printf(" -h|--help -- Print this messages and exit\n\n"); } @@ -138,6 +140,7 @@ static int _parse_options(int argc, char *argv[], struct device_t *dev, struct h case 'z': OPT_UNSIGNED(dev->min_frame_size, "--min-frame-size", 0, 8192); case 't': OPT_TRUE(dev->dv_timings); case 'n': OPT_UNSIGNED(dev->n_buffers, "--buffers", 1, 32); + case 'w': OPT_UNSIGNED(dev->n_workers, "--workers", 1, 32); case 'q': OPT_UNSIGNED(dev->jpeg_quality, "--jpeg-quality", 1, 100); case 1000: OPT_UNSIGNED(dev->timeout, "--timeout", 1, 60); case 1001: OPT_UNSIGNED(dev->error_timeout, "--error-timeout", 1, 60); diff --git a/src/stream.c b/src/stream.c index cf7a855..abe2b1a 100644 --- a/src/stream.c +++ b/src/stream.c @@ -27,14 +27,6 @@ #include #include -#ifdef DUMP_STREAM_JPEGS -# warning Enabled DUMP_STREAM_JPEGS -# include -# include -# include -# include -#endif - #include "tools.h" #include "logging.h" #include "device.h" @@ -43,11 +35,15 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool); +static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index); + static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool); static int _stream_init(struct device_t *dev, struct workers_pool_t *pool); + static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *pool); static void *_stream_worker_thread(void *v_ctx); static void _stream_destroy_workers(struct device_t *dev, struct workers_pool_t *pool); + static int _stream_control(struct device_t *dev, const bool enable); static int _stream_grab_buffer(struct device_t *dev, struct v4l2_buffer *buf_info); static int _stream_release_buffer(struct device_t *dev, struct v4l2_buffer *buf_info); @@ -68,27 +64,6 @@ void stream_destroy(struct stream_t *stream) { free(stream); } -#ifdef DUMP_STREAM_JPEGS -static void _stream_dump(struct stream_t *stream) { - static unsigned count = 0; - char path[1024]; - - errno = 0; - mkdir("stream", 0777); - assert(errno == 0 || errno == EEXIST); - - sprintf(path, "stream/img_%06u.jpg", count); - int fd = open(path, O_CREAT|O_TRUNC|O_WRONLY, 0644); - assert(fd); - assert(write(fd, stream->picture.data, stream->picture.size) == (ssize_t)stream->picture.size); - assert(!close(fd)); - - LOG_INFO("-DDUMP_STREAM_JPEGS dumped %s", path); - - ++count; -} -#endif - void stream_loop(struct stream_t *stream) { struct workers_pool_t pool; bool workers_stop; @@ -112,38 +87,40 @@ void stream_loop(struct stream_t *stream) { A_CALLOC(stream->picture.data, stream->dev->run->max_picture_size); while (!stream->dev->stop) { + int free_worker_number = -1; + SEP_DEBUG('-'); LOG_DEBUG("Waiting for workers ..."); - A_PTHREAD_M_LOCK(&pool.has_free_workers_mutex); - A_PTHREAD_C_WAIT_TRUE(pool.has_free_workers, &pool.has_free_workers_cond, &pool.has_free_workers_mutex); - A_PTHREAD_M_UNLOCK(&pool.has_free_workers_mutex); + A_PTHREAD_M_LOCK(&pool.free_workers_mutex); + A_PTHREAD_C_WAIT_TRUE(pool.free_workers, &pool.free_workers_cond, &pool.free_workers_mutex); + A_PTHREAD_M_UNLOCK(&pool.free_workers_mutex); - if (oldest_worker && !oldest_worker->has_job && stream->dev->run->pictures[oldest_worker->ctx.index].data) { + if (oldest_worker && !oldest_worker->has_job && oldest_worker->ctx.buf_index >= 0) { if (oldest_worker->job_failed) { break; } - A_PTHREAD_M_LOCK(&stream->mutex); - - stream->picture.size = stream->dev->run->pictures[oldest_worker->ctx.index].size; - stream->picture.allocated = stream->dev->run->pictures[oldest_worker->ctx.index].allocated; - memcpy( - stream->picture.data, stream->dev->run->pictures[oldest_worker->ctx.index].data, - stream->picture.size * sizeof(*stream->picture.data) - ); - - stream->width = stream->dev->run->width; - stream->height = stream->dev->run->height; - stream->updated = true; - - A_PTHREAD_M_UNLOCK(&stream->mutex); + _stream_expose_picture(stream, oldest_worker->ctx.buf_index); + free_worker_number = oldest_worker->ctx.number; oldest_worker = oldest_worker->order_next; -# ifdef DUMP_STREAM_JPEGS - _stream_dump(stream); -# endif + LOG_PERF("##### ACCEPT : %u", free_worker_number); + } else { + for (unsigned number = 0; number < stream->dev->n_workers; ++number) { + if (!pool.workers[number].has_job && (free_worker_number == -1 + || pool.workers[free_worker_number].job_start_time < pool.workers[number].job_start_time + )) { + free_worker_number = number; + break; + } + } + + assert(free_worker_number >= 0); + assert(!pool.workers[free_worker_number].has_job); + + LOG_PERF("----- DROP : %u", free_worker_number); } if (stream->dev->stop) { @@ -209,14 +186,14 @@ void stream_loop(struct stream_t *stream) { if (now < grab_after) { fluency_passed += 1; - LOG_PERF("Passed %u frames for fluency: now=%.03Lf; grab_after=%.03Lf", fluency_passed, now, grab_after); + LOG_VERBOSE("Passed %u frames for fluency: now=%.03Lf; grab_after=%.03Lf", fluency_passed, now, grab_after); goto pass_frame; } fluency_passed = 0; - if (log_level >= LOG_LEVEL_VERBOSE) { + if (log_level >= LOG_LEVEL_PERF) { if ((long long)now != fps_second) { - LOG_VERBOSE("Oldest worker complete, encoding FPS = %u", fps); + LOG_PERF("Oldest worker complete, encoding FPS = %u", fps); fps = 0; fps_second = (long long)now; } @@ -226,24 +203,37 @@ void stream_loop(struct stream_t *stream) { long double fluency_delay = _stream_get_fluency_delay(stream->dev, &pool); grab_after = now + fluency_delay; - LOG_PERF("Fluency: delay=%.03Lf; grab_after=%.03Lf", fluency_delay, grab_after); + LOG_VERBOSE("Fluency: delay=%.03Lf; grab_after=%.03Lf", fluency_delay, grab_after); } LOG_DEBUG("Grabbed a new frame to buffer %d", buf_info.index); - pool.workers[buf_info.index].ctx.buf_info = buf_info; + pool.workers[free_worker_number].ctx.buf_info = buf_info; if (!oldest_worker) { - oldest_worker = &pool.workers[buf_info.index]; + oldest_worker = &pool.workers[free_worker_number]; last_worker = oldest_worker; } else { - last_worker->order_next = &pool.workers[buf_info.index]; - last_worker = last_worker->order_next; + if (pool.workers[free_worker_number].order_next) { + pool.workers[free_worker_number].order_next->order_prev = pool.workers[free_worker_number].order_prev; + } + if (pool.workers[free_worker_number].order_prev) { + pool.workers[free_worker_number].order_prev->order_next = pool.workers[free_worker_number].order_next; + } + pool.workers[free_worker_number].order_prev = last_worker; + last_worker->order_next = &pool.workers[free_worker_number]; + last_worker = &pool.workers[free_worker_number]; } + last_worker->order_next = NULL; - A_PTHREAD_M_LOCK(&pool.workers[buf_info.index].has_job_mutex); - pool.workers[buf_info.index].has_job = true; - A_PTHREAD_M_UNLOCK(&pool.workers[buf_info.index].has_job_mutex); - A_PTHREAD_C_SIGNAL(&pool.workers[buf_info.index].has_job_cond); + A_PTHREAD_M_LOCK(&pool.workers[free_worker_number].has_job_mutex); + pool.workers[free_worker_number].ctx.buf_index = buf_info.index; + pool.workers[free_worker_number].has_job = true; + A_PTHREAD_M_UNLOCK(&pool.workers[free_worker_number].has_job_mutex); + A_PTHREAD_C_SIGNAL(&pool.workers[free_worker_number].has_job_cond); + + A_PTHREAD_M_LOCK(&pool.free_workers_mutex); + pool.free_workers -= 1; + A_PTHREAD_M_UNLOCK(&pool.free_workers_mutex); goto next_handlers; // Поток сам освободит буфер @@ -288,18 +278,35 @@ void stream_loop_break(struct stream_t *stream) { stream->dev->stop = 1; } +static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) { + A_PTHREAD_M_LOCK(&stream->mutex); + + stream->picture.size = stream->dev->run->pictures[buf_index].size; + stream->picture.allocated = stream->dev->run->pictures[buf_index].allocated; + memcpy( + stream->picture.data, stream->dev->run->pictures[buf_index].data, + stream->picture.size * sizeof(*stream->picture.data) + ); + + stream->width = stream->dev->run->width; + stream->height = stream->dev->run->height; + stream->updated = true; + + A_PTHREAD_M_UNLOCK(&stream->mutex); +} + static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool) { long double delay = 0; - for (unsigned index = 0; index < dev->run->n_buffers; ++index) { - A_PTHREAD_M_LOCK(&pool->workers[index].last_comp_time_mutex); - if (pool->workers[index].last_comp_time > 0) { - delay += pool->workers[index].last_comp_time; + for (unsigned number = 0; number < dev->n_workers; ++number) { + A_PTHREAD_M_LOCK(&pool->workers[number].last_comp_time_mutex); + if (pool->workers[number].last_comp_time > 0) { + delay += pool->workers[number].last_comp_time; } - A_PTHREAD_M_UNLOCK(&pool->workers[index].last_comp_time_mutex); + A_PTHREAD_M_UNLOCK(&pool->workers[number].last_comp_time_mutex); } // Среднее арифметическое деленное на количество воркеров - return delay / dev->run->n_buffers / dev->run->n_buffers; + return delay / dev->n_workers / dev->n_workers; } static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool) { @@ -340,94 +347,90 @@ static int _stream_init(struct device_t *dev, struct workers_pool_t *pool) { } static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *pool) { - LOG_DEBUG("Spawning %d workers ...", dev->run->n_buffers); + LOG_INFO("Spawning %d workers ...", dev->n_workers); *pool->workers_stop = false; - A_CALLOC(pool->workers, dev->run->n_buffers); + A_CALLOC(pool->workers, dev->n_workers); - A_PTHREAD_M_INIT(&pool->has_free_workers_mutex); - A_PTHREAD_C_INIT(&pool->has_free_workers_cond); + A_PTHREAD_M_INIT(&pool->free_workers_mutex); + A_PTHREAD_C_INIT(&pool->free_workers_cond); - for (unsigned index = 0; index < dev->run->n_buffers; ++index) { - A_PTHREAD_M_INIT(&pool->workers[index].has_job_mutex); - A_PTHREAD_C_INIT(&pool->workers[index].has_job_cond); + for (unsigned number = 0; number < dev->n_workers; ++number) { + pool->free_workers += 1; - pool->workers[index].ctx.index = index; - pool->workers[index].ctx.dev = dev; - pool->workers[index].ctx.dev_stop = (sig_atomic_t *volatile)&dev->stop; - pool->workers[index].ctx.workers_stop = pool->workers_stop; + A_PTHREAD_M_INIT(&pool->workers[number].has_job_mutex); + A_PTHREAD_C_INIT(&pool->workers[number].has_job_cond); - pool->workers[index].ctx.last_comp_time_mutex = &pool->workers[index].last_comp_time_mutex; - pool->workers[index].ctx.last_comp_time = &pool->workers[index].last_comp_time; + pool->workers[number].ctx.number = number; + pool->workers[number].ctx.dev = dev; + pool->workers[number].ctx.dev_stop = (sig_atomic_t *volatile)&dev->stop; + pool->workers[number].ctx.workers_stop = pool->workers_stop; - pool->workers[index].ctx.has_job_mutex = &pool->workers[index].has_job_mutex; - pool->workers[index].ctx.has_job = &pool->workers[index].has_job; - pool->workers[index].ctx.job_failed = &pool->workers[index].job_failed; - pool->workers[index].ctx.has_job_cond = &pool->workers[index].has_job_cond; + pool->workers[number].ctx.last_comp_time_mutex = &pool->workers[number].last_comp_time_mutex; + pool->workers[number].ctx.last_comp_time = &pool->workers[number].last_comp_time; - pool->workers[index].ctx.has_free_workers_mutex = &pool->has_free_workers_mutex; - pool->workers[index].ctx.has_free_workers = &pool->has_free_workers; - pool->workers[index].ctx.has_free_workers_cond = &pool->has_free_workers_cond; + pool->workers[number].ctx.has_job_mutex = &pool->workers[number].has_job_mutex; + pool->workers[number].ctx.has_job = &pool->workers[number].has_job; + pool->workers[number].ctx.job_failed = &pool->workers[number].job_failed; + pool->workers[number].ctx.job_start_time = &pool->workers[number].job_start_time; + pool->workers[number].ctx.has_job_cond = &pool->workers[number].has_job_cond; - A_PTHREAD_CREATE(&pool->workers[index].tid, _stream_worker_thread, (void *)&pool->workers[index].ctx); + pool->workers[number].ctx.free_workers_mutex = &pool->free_workers_mutex; + pool->workers[number].ctx.free_workers = &pool->free_workers; + pool->workers[number].ctx.free_workers_cond = &pool->free_workers_cond; + + A_PTHREAD_CREATE(&pool->workers[number].tid, _stream_worker_thread, (void *)&pool->workers[number].ctx); } } static void *_stream_worker_thread(void *v_ctx) { struct worker_context_t *ctx = (struct worker_context_t *)v_ctx; - LOG_DEBUG("Hello! I am a worker #%d ^_^", ctx->index); + LOG_DEBUG("Hello! I am a worker #%u ^_^", ctx->number); while (!*ctx->dev_stop && !*ctx->workers_stop) { - A_PTHREAD_M_LOCK(ctx->has_free_workers_mutex); - *ctx->has_free_workers = true; - A_PTHREAD_M_UNLOCK(ctx->has_free_workers_mutex); - A_PTHREAD_C_SIGNAL(ctx->has_free_workers_cond); - - LOG_DEBUG("Worker %d waiting for a new job ...", ctx->index); + LOG_DEBUG("Worker %u waiting for a new job ...", ctx->number); A_PTHREAD_M_LOCK(ctx->has_job_mutex); A_PTHREAD_C_WAIT_TRUE(*ctx->has_job, ctx->has_job_cond, ctx->has_job_mutex); A_PTHREAD_M_UNLOCK(ctx->has_job_mutex); if (!*ctx->workers_stop) { - time_t start_sec; - time_t stop_sec; - long start_msec; - long stop_msec; + long double start_time; long double last_comp_time; - now_ms(&start_sec, &start_msec); + start_time = now_ms_ld(); - LOG_DEBUG("Worker %d compressing JPEG ...", ctx->index); + LOG_DEBUG("Worker %u compressing JPEG from buffer %d ...", ctx->number, ctx->buf_index); - jpeg_compress_buffer(ctx->dev, ctx->index); + jpeg_compress_buffer(ctx->dev, ctx->buf_index); if (_stream_release_buffer(ctx->dev, &ctx->buf_info) == 0) { + *ctx->job_start_time = start_time; *ctx->has_job = false; - now_ms(&stop_sec, &stop_msec); - if (start_sec <= stop_sec) { - last_comp_time = (stop_sec - start_sec) + ((long double)(stop_msec - start_msec)) / 1000; - } else { - last_comp_time = 0; - } + last_comp_time = now_ms_ld() - start_time; A_PTHREAD_M_LOCK(ctx->last_comp_time_mutex); *ctx->last_comp_time = last_comp_time; A_PTHREAD_M_UNLOCK(ctx->last_comp_time_mutex); - LOG_PERF( - "Compressed JPEG size=%ld; time=%0.3Lf (worker %d)", - ctx->dev->run->pictures[ctx->index].size, last_comp_time, ctx->index + LOG_VERBOSE( + "Compressed JPEG size=%ld; time=%0.3Lf; worker=%u; buffer=%d", + ctx->dev->run->pictures[ctx->buf_index].size, last_comp_time, ctx->number, ctx->buf_index ); } else { *ctx->job_failed = true; *ctx->has_job = false; } } + + A_PTHREAD_M_LOCK(ctx->free_workers_mutex); + *ctx->free_workers += 1; + A_PTHREAD_M_UNLOCK(ctx->free_workers_mutex); + A_PTHREAD_C_SIGNAL(ctx->free_workers_cond); } - LOG_DEBUG("Bye-bye (worker %d)", ctx->index); + LOG_DEBUG("Bye-bye (worker %d)", ctx->number); return NULL; } @@ -436,19 +439,19 @@ static void _stream_destroy_workers(struct device_t *dev, struct workers_pool_t LOG_INFO("Destroying workers ..."); *pool->workers_stop = true; - for (unsigned index = 0; index < dev->run->n_buffers; ++index) { - A_PTHREAD_M_LOCK(&pool->workers[index].has_job_mutex); - pool->workers[index].has_job = true; // Final job: die - A_PTHREAD_M_UNLOCK(&pool->workers[index].has_job_mutex); - A_PTHREAD_C_SIGNAL(&pool->workers[index].has_job_cond); + for (unsigned number = 0; number < dev->n_workers; ++number) { + A_PTHREAD_M_LOCK(&pool->workers[number].has_job_mutex); + pool->workers[number].has_job = true; // Final job: die + A_PTHREAD_M_UNLOCK(&pool->workers[number].has_job_mutex); + A_PTHREAD_C_SIGNAL(&pool->workers[number].has_job_cond); - A_PTHREAD_JOIN(pool->workers[index].tid); - A_PTHREAD_M_DESTROY(&pool->workers[index].has_job_mutex); - A_PTHREAD_C_DESTROY(&pool->workers[index].has_job_cond); + A_PTHREAD_JOIN(pool->workers[number].tid); + A_PTHREAD_M_DESTROY(&pool->workers[number].has_job_mutex); + A_PTHREAD_C_DESTROY(&pool->workers[number].has_job_cond); } - A_PTHREAD_M_DESTROY(&pool->has_free_workers_mutex); - A_PTHREAD_C_DESTROY(&pool->has_free_workers_cond); + A_PTHREAD_M_DESTROY(&pool->free_workers_mutex); + A_PTHREAD_C_DESTROY(&pool->free_workers_cond); free(pool->workers); } diff --git a/src/stream.h b/src/stream.h index 7fd6e62..8d8c076 100644 --- a/src/stream.h +++ b/src/stream.h @@ -29,8 +29,9 @@ struct worker_context_t { - int index; + unsigned number; struct device_t *dev; + int buf_index; struct v4l2_buffer buf_info; sig_atomic_t *volatile dev_stop; bool *workers_stop; @@ -41,11 +42,12 @@ struct worker_context_t { pthread_mutex_t *has_job_mutex; bool *has_job; bool *job_failed; + long double *job_start_time; pthread_cond_t *has_job_cond; - pthread_mutex_t *has_free_workers_mutex; - bool *has_free_workers; - pthread_cond_t *has_free_workers_cond; + pthread_mutex_t *free_workers_mutex; + unsigned *free_workers; + pthread_cond_t *free_workers_cond; }; struct worker_t { @@ -58,8 +60,10 @@ struct worker_t { pthread_mutex_t has_job_mutex; bool has_job; bool job_failed; + long double job_start_time; pthread_cond_t has_job_cond; + struct worker_t *order_prev; struct worker_t *order_next; }; @@ -67,9 +71,9 @@ struct workers_pool_t { struct worker_t *workers; bool *workers_stop; - pthread_mutex_t has_free_workers_mutex; - bool has_free_workers; - pthread_cond_t has_free_workers_cond; + pthread_mutex_t free_workers_mutex; + unsigned free_workers; + pthread_cond_t free_workers_cond; }; struct stream_t {