diff --git a/src/device.c b/src/device.c index d577102..4e9ae29 100644 --- a/src/device.c +++ b/src/device.c @@ -156,6 +156,8 @@ int device_open(struct device_t *dev) { _device_open_alloc_picbufs(dev); _device_apply_controls(dev); + dev->run->n_workers = min_u(dev->run->n_buffers, dev->n_workers); + LOG_DEBUG("Device fd=%d initialized", dev->run->fd); return 0; @@ -165,6 +167,8 @@ int device_open(struct device_t *dev) { } void device_close(struct device_t *dev) { + dev->run->n_workers = 0; + if (dev->run->pictures) { LOG_DEBUG("Releasing picture buffers ..."); for (unsigned index = 0; index < dev->run->n_buffers && dev->run->pictures[index].data; ++index) { diff --git a/src/device.h b/src/device.h index 6a7fcdb..63032f0 100644 --- a/src/device.h +++ b/src/device.h @@ -63,7 +63,7 @@ struct device_runtime_t { unsigned height; unsigned format; unsigned n_buffers; -// unsigned n_workers; // FIXME + unsigned n_workers; struct hw_buffer_t *hw_buffers; struct picture_t *pictures; size_t max_raw_image_size; diff --git a/src/encoder.c b/src/encoder.c index 2dbe5f3..a33c801 100644 --- a/src/encoder.c +++ b/src/encoder.c @@ -20,6 +20,7 @@ *****************************************************************************/ +#include #include #include @@ -66,48 +67,6 @@ struct encoder_t *encoder_init() { return encoder; } -#pragma GCC diagnostic ignored "-Wunused-parameter" -#pragma GCC diagnostic push -void encoder_prepare(struct encoder_t *encoder, struct device_t *dev) { -#pragma GCC diagnostic pop - - assert(encoder->type != ENCODER_TYPE_UNKNOWN); - // XXX: Тут нет гонки, потому что encoder_prepare() запускается еще до существования других потоков - encoder->run->type = encoder->type; - encoder->run->quality = encoder->quality; - - LOG_INFO("Using JPEG quality: %u%%", encoder->quality); - -# ifdef WITH_OMX_ENCODER - if (encoder->run->type == ENCODER_TYPE_OMX) { - LOG_DEBUG("Preparing OMX JPEG encoder ..."); - - if (dev->n_workers > OMX_MAX_ENCODERS) { - LOG_INFO("OMX JPEG encoder sets limit for worker threads: %u", OMX_MAX_ENCODERS); - dev->n_workers = OMX_MAX_ENCODERS; - } - encoder->run->n_omxs = dev->n_workers; - - A_CALLOC(encoder->run->omxs, encoder->run->n_omxs); - for (unsigned index = 0; index < encoder->run->n_omxs; ++index) { - if ((encoder->run->omxs[index] = omx_encoder_init()) == NULL) { - goto use_fallback; - } - } - } -# endif - - return; - -# pragma GCC diagnostic ignored "-Wunused-label" -# pragma GCC diagnostic push - use_fallback: - LOG_ERROR("Can't initialize selected encoder, using CPU instead it"); - encoder->run->type = ENCODER_TYPE_CPU; - encoder->run->quality = encoder->quality; -# pragma GCC diagnostic pop -} - void encoder_destroy(struct encoder_t *encoder) { # ifdef WITH_OMX_ENCODER if (encoder->run->omxs) { @@ -142,52 +101,87 @@ const char *encoder_type_to_string(enum encoder_type_t type) { return _ENCODER_TYPES[0].name; } -void encoder_prepare_live(struct encoder_t *encoder, struct device_t *dev) { - assert(encoder->run->type != ENCODER_TYPE_UNKNOWN); +void encoder_prepare(struct encoder_t *encoder, struct device_t *dev) { + enum encoder_type_t type = (encoder->run->cpu_forced ? ENCODER_TYPE_CPU : encoder->type); + unsigned quality = encoder->quality; + bool cpu_forced = false; - if ( - (dev->run->format == V4L2_PIX_FMT_MJPEG || dev->run->format == V4L2_PIX_FMT_JPEG) - && encoder->run->type != ENCODER_TYPE_HW - ) { + if ((dev->run->format == V4L2_PIX_FMT_MJPEG || dev->run->format == V4L2_PIX_FMT_JPEG) && type != ENCODER_TYPE_HW) { LOG_INFO("Switching to HW JPEG encoder because the input format is (M)JPEG"); - A_MUTEX_LOCK(&encoder->run->mutex); - encoder->run->type = ENCODER_TYPE_HW; - A_MUTEX_UNLOCK(&encoder->run->mutex); + type = ENCODER_TYPE_HW; } - if (encoder->run->type == ENCODER_TYPE_HW) { + if (type == ENCODER_TYPE_HW) { if (dev->run->format != V4L2_PIX_FMT_MJPEG && dev->run->format != V4L2_PIX_FMT_JPEG) { LOG_INFO("Switching to CPU JPEG encoder because the input format is not (M)JPEG"); - goto use_fallback; + goto use_cpu; } - if (hw_encoder_prepare_live(dev, encoder->quality) < 0) { - A_MUTEX_LOCK(&encoder->run->mutex); - encoder->run->quality = 0; - A_MUTEX_UNLOCK(&encoder->run->mutex); - LOG_INFO("Using JPEG quality: HW-default"); + + if (hw_encoder_prepare(dev, quality) < 0) { + quality = 0; } + + dev->run->n_workers = 1; } # ifdef WITH_OMX_ENCODER - else if (encoder->run->type == ENCODER_TYPE_OMX) { + else if (type == ENCODER_TYPE_OMX) { + LOG_DEBUG("Preparing OMX JPEG encoder ..."); + + if (dev->run->n_workers > OMX_MAX_ENCODERS) { + LOG_INFO("OMX JPEG encoder sets limit for worker threads: %u", OMX_MAX_ENCODERS); + dev->run->n_workers = OMX_MAX_ENCODERS; + } + + if (encoder->run->omxs == NULL) { + A_CALLOC(encoder->run->omxs, OMX_MAX_ENCODERS); + } + + // Начинаем с нуля и доинициализируем на следующих заходах при необходимости + for (; encoder->run->n_omxs < dev->run->n_workers; ++encoder->run->n_omxs) { + if ((encoder->run->omxs[encoder->run->n_omxs] = omx_encoder_init()) == NULL) { + LOG_ERROR("Can't initialize OMX JPEG encoder, falling back to CPU"); + goto force_cpu; + } + } + for (unsigned index = 0; index < encoder->run->n_omxs; ++index) { - if (omx_encoder_prepare_live(encoder->run->omxs[index], dev, encoder->quality) < 0) { + if (omx_encoder_prepare(encoder->run->omxs[index], dev, quality) < 0) { LOG_ERROR("Can't prepare OMX JPEG encoder, falling back to CPU"); - goto use_fallback; + goto force_cpu; } } } # endif - return; + goto ok; + +# pragma GCC diagnostic ignored "-Wunused-label" +# pragma GCC diagnostic push + force_cpu: + cpu_forced = true; +# pragma GCC diagnostic pop + + use_cpu: + type = ENCODER_TYPE_CPU; + quality = encoder->quality; + + ok: + if (quality == 0) { + LOG_INFO("Using JPEG quality: encoder default"); + } else { + LOG_INFO("Using JPEG quality: %u%%", quality); + } - use_fallback: A_MUTEX_LOCK(&encoder->run->mutex); - encoder->run->type = ENCODER_TYPE_CPU; - encoder->run->quality = encoder->quality; + encoder->run->type = type; + encoder->run->quality = quality; + if (cpu_forced) { + encoder->run->cpu_forced = true; + } A_MUTEX_UNLOCK(&encoder->run->mutex); } -#pragma GCC diagnostic ignored "-Wunused-label" +#pragma GCC diagnostic ignored "-Wunused-parameter" #pragma GCC diagnostic push int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, unsigned worker_number, unsigned buf_index) { #pragma GCC diagnostic pop @@ -195,14 +189,14 @@ int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, uns assert(encoder->run->type != ENCODER_TYPE_UNKNOWN); if (encoder->run->type == ENCODER_TYPE_CPU) { - cpu_encoder_compress_buffer(dev, buf_index, encoder->quality); + cpu_encoder_compress_buffer(dev, buf_index, encoder->run->quality); } else if (encoder->run->type == ENCODER_TYPE_HW) { hw_encoder_compress_buffer(dev, buf_index); } # ifdef WITH_OMX_ENCODER else if (encoder->run->type == ENCODER_TYPE_OMX) { if (omx_encoder_compress_buffer(encoder->run->omxs[worker_number], dev, buf_index) < 0) { - goto use_fallback; + goto error; } } # endif @@ -211,11 +205,10 @@ int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, uns # pragma GCC diagnostic ignored "-Wunused-label" # pragma GCC diagnostic push - use_fallback: - LOG_INFO("Error while compressing, falling back to CPU"); + error: + LOG_INFO("Error while compressing buffer, falling back to CPU"); A_MUTEX_LOCK(&encoder->run->mutex); - encoder->run->type = ENCODER_TYPE_CPU; - encoder->run->quality = encoder->quality; + encoder->run->cpu_forced = true; A_MUTEX_UNLOCK(&encoder->run->mutex); return -1; # pragma GCC diagnostic pop diff --git a/src/encoder.h b/src/encoder.h index 444cfbc..a1c1c9b 100644 --- a/src/encoder.h +++ b/src/encoder.h @@ -22,6 +22,8 @@ #pragma once +#include + #include "pthread.h" #include "tools.h" @@ -51,6 +53,7 @@ enum encoder_type_t { struct encoder_runtime_t { enum encoder_type_t type; unsigned quality; + bool cpu_forced; pthread_mutex_t mutex; # ifdef WITH_OMX_ENCODER @@ -74,6 +77,4 @@ enum encoder_type_t encoder_parse_type(const char *str); const char *encoder_type_to_string(enum encoder_type_t type); void encoder_prepare(struct encoder_t *encoder, struct device_t *dev); -void encoder_prepare_live(struct encoder_t *encoder, struct device_t *dev); - int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, unsigned worker_number, unsigned buf_index); diff --git a/src/encoders/hw/encoder.c b/src/encoders/hw/encoder.c index 63c7419..9ab6f2e 100644 --- a/src/encoders/hw/encoder.c +++ b/src/encoders/hw/encoder.c @@ -44,7 +44,7 @@ static bool _is_huffman(const unsigned char *data); static size_t _memcpy_with_huffman(unsigned char *dest, const unsigned char *src, size_t size); -int hw_encoder_prepare_live(struct device_t *dev, unsigned quality) { +int hw_encoder_prepare(struct device_t *dev, unsigned quality) { struct v4l2_jpegcompression comp; MEMSET_ZERO(comp); diff --git a/src/encoders/hw/encoder.h b/src/encoders/hw/encoder.h index b498f52..2ee06e6 100644 --- a/src/encoders/hw/encoder.h +++ b/src/encoders/hw/encoder.h @@ -25,5 +25,5 @@ #include "../../device.h" -int hw_encoder_prepare_live(struct device_t *dev, unsigned quality); +int hw_encoder_prepare(struct device_t *dev, unsigned quality); void hw_encoder_compress_buffer(struct device_t *dev, unsigned index); diff --git a/src/encoders/omx/encoder.c b/src/encoders/omx/encoder.c index 7aa6e3f..7633138 100644 --- a/src/encoders/omx/encoder.c +++ b/src/encoders/omx/encoder.c @@ -153,7 +153,7 @@ void omx_encoder_destroy(struct omx_encoder_t *omx) { free(omx); } -int omx_encoder_prepare_live(struct omx_encoder_t *omx, struct device_t *dev, unsigned quality) { +int omx_encoder_prepare(struct omx_encoder_t *omx, struct device_t *dev, unsigned quality) { if (component_set_state(&omx->encoder, OMX_StateIdle) < 0) { return -1; } diff --git a/src/encoders/omx/encoder.h b/src/encoders/omx/encoder.h index 9719973..8a5dc65 100644 --- a/src/encoders/omx/encoder.h +++ b/src/encoders/omx/encoder.h @@ -53,5 +53,5 @@ struct omx_encoder_t { struct omx_encoder_t *omx_encoder_init(); void omx_encoder_destroy(struct omx_encoder_t *omx); -int omx_encoder_prepare_live(struct omx_encoder_t *omx, struct device_t *dev, unsigned quality); +int omx_encoder_prepare(struct omx_encoder_t *omx, struct device_t *dev, unsigned quality); int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, unsigned index); diff --git a/src/main.c b/src/main.c index aae4bfd..24e87f2 100644 --- a/src/main.c +++ b/src/main.c @@ -134,7 +134,8 @@ static void _help(struct device_t *dev, struct encoder_t *encoder, struct http_s printf(" -b|--buffers ─────────────── The number of buffers to receive data from the device.\n"); printf(" Each buffer may processed using an intermediate thread.\n"); printf(" Default: %u (the number of CPU cores (but not more 4) + 1).\n\n", dev->n_buffers); - printf(" -w|--workers ─────────────── The number of worker threads. Default: %u (== --buffers).\n\n", dev->n_workers); + printf(" -w|--workers ─────────────── The number of worker threads but not more than buffers.\n"); + printf(" Default: %u (== --buffers).\n\n", dev->n_workers); printf(" -q|--quality ─────────────── Set quality of JPEG encoding from 1 to 100 (best). Default: %u.\n\n", encoder->quality); printf(" -c|--encoder ──────────── Use specified encoder. It may affects to workers number.\n"); printf(" Available: %s; default: CPU.\n\n", ENCODER_TYPES_STR); @@ -383,7 +384,6 @@ int main(int argc, char *argv[]) { if ((exit_code = _parse_options(argc, argv, dev, encoder, server)) == 0) { _install_signal_handlers(); - encoder_prepare(encoder, dev); pthread_t stream_loop_tid; pthread_t server_loop_tid; diff --git a/src/stream.c b/src/stream.c index 6913ee5..5fde24e 100644 --- a/src/stream.c +++ b/src/stream.c @@ -119,7 +119,7 @@ void stream_loop(struct stream_t *stream) { LOG_PERF("##### Raw frame accepted; worker = %u", free_worker_number); } else { - for (unsigned number = 0; number < stream->dev->n_workers; ++number) { + for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) { if ( !atomic_load(&pool.workers[number].has_job) && ( free_worker_number == -1 @@ -336,7 +336,7 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker long double min_delay; long double soft_delay; - for (unsigned number = 0; number < dev->n_workers; ++number) { + for (unsigned number = 0; number < dev->run->n_workers; ++number) { # define WORKER(_next) pool->workers[number]._next A_MUTEX_LOCK(&WORKER(last_comp_time_mutex)); @@ -347,9 +347,9 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker # undef WORKER } - avg_comp_time = sum_comp_time / dev->n_workers; // Среднее время работы воркеров + avg_comp_time = sum_comp_time / dev->run->n_workers; // Среднее время работы воркеров - min_delay = avg_comp_time / dev->n_workers; // Среднее время работы размазывается на N воркеров + min_delay = avg_comp_time / dev->run->n_workers; // Среднее время работы размазывается на N воркеров if (dev->desired_fps > 0 && min_delay > 0) { // Искусственное время задержки на основе желаемого FPS, если включен --desired-fps @@ -389,7 +389,7 @@ static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool) { goto error; } - encoder_prepare_live(pool->encoder, stream->dev); + encoder_prepare(pool->encoder, stream->dev); _stream_init_workers(stream, pool); @@ -401,15 +401,15 @@ static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool) { } static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool) { - LOG_INFO("Spawning %u workers ...", stream->dev->n_workers); + LOG_INFO("Spawning %u workers ...", stream->dev->run->n_workers); atomic_store(&pool->workers_stop, false); - A_CALLOC(pool->workers, stream->dev->n_workers); + A_CALLOC(pool->workers, stream->dev->run->n_workers); A_MUTEX_INIT(&pool->free_workers_mutex); A_COND_INIT(&pool->free_workers_cond); - for (unsigned number = 0; number < stream->dev->n_workers; ++number) { + for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) { # define WORKER(_next) pool->workers[number]._next pool->free_workers += 1; @@ -492,7 +492,7 @@ static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool LOG_INFO("Destroying workers ..."); atomic_store(&pool->workers_stop, true); - for (unsigned number = 0; number < stream->dev->n_workers; ++number) { + for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) { # define WORKER(_next) pool->workers[number]._next A_MUTEX_LOCK(&WORKER(has_job_mutex));