From 5c48faa83215fdf3ad41f2f4d4117fef2397f2bd Mon Sep 17 00:00:00 2001 From: Devaev Maxim Date: Tue, 8 Dec 2020 07:28:35 +0300 Subject: [PATCH] refactoring --- src/common/tools.h | 9 +++ src/ustreamer/device.c | 47 +++--------- src/ustreamer/device.h | 14 ++-- src/ustreamer/encoder.c | 102 +++++++++++++++------------ src/ustreamer/encoder.h | 10 ++- src/ustreamer/encoders/cpu/encoder.c | 9 ++- src/ustreamer/encoders/cpu/encoder.h | 5 +- src/ustreamer/encoders/hw/encoder.c | 4 +- src/ustreamer/encoders/hw/encoder.h | 3 +- src/ustreamer/encoders/omx/encoder.c | 11 +-- src/ustreamer/encoders/omx/encoder.h | 6 +- src/ustreamer/options.c | 4 +- src/ustreamer/stream.c | 43 ++++++----- 13 files changed, 146 insertions(+), 121 deletions(-) diff --git a/src/common/tools.h b/src/common/tools.h index d604e86..f394ee4 100644 --- a/src/common/tools.h +++ b/src/common/tools.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -95,3 +96,11 @@ INLINE long double get_now_real(void) { get_now(CLOCK_REALTIME, &sec, &msec); return (long double)sec + ((long double)msec) / 1000; } + +INLINE unsigned get_cores_available(void) { + long cores_sysconf; + + cores_sysconf = sysconf(_SC_NPROCESSORS_ONLN); + cores_sysconf = (cores_sysconf < 0 ? 0 : cores_sysconf); + return max_u(min_u(cores_sysconf, 4), 1); +} diff --git a/src/ustreamer/device.c b/src/ustreamer/device.c index 62d4c84..2987770 100644 --- a/src/ustreamer/device.c +++ b/src/ustreamer/device.c @@ -44,7 +44,6 @@ #include "../common/threading.h" #include "xioctl.h" -#include "picture.h" static const struct { @@ -87,7 +86,6 @@ static int _device_open_io_method(struct device_t *dev); static int _device_open_io_method_mmap(struct device_t *dev); static int _device_open_io_method_userptr(struct device_t *dev); static int _device_open_queue_buffers(struct device_t *dev); -static void _device_open_alloc_picbufs(struct device_t *dev); static int _device_apply_resolution(struct device_t *dev, unsigned width, unsigned height); static void _device_apply_controls(struct device_t *dev); @@ -111,12 +109,6 @@ static const char *_io_method_to_string_supported(enum v4l2_memory io_method); struct device_t *device_init(void) { struct device_runtime_t *run; struct device_t *dev; - long cores_sysconf; - unsigned cores_available; - - cores_sysconf = sysconf(_SC_NPROCESSORS_ONLN); - cores_sysconf = (cores_sysconf < 0 ? 0 : cores_sysconf); - cores_available = max_u(min_u(cores_sysconf, 4), 1); A_CALLOC(run, 1); run->fd = -1; @@ -128,15 +120,18 @@ struct device_t *device_init(void) { dev->format = V4L2_PIX_FMT_YUYV; dev->standard = V4L2_STD_UNKNOWN; dev->io_method = V4L2_MEMORY_MMAP; - dev->n_buffers = cores_available + 1; - dev->n_workers = min_u(cores_available, dev->n_buffers); + dev->n_buffers = get_cores_available() + 1; dev->min_frame_size = 128; dev->timeout = 1; - dev->error_delay = 1; // XXX: not device param -# ifdef WITH_RAWSINK // XXX: not device param + + // FIXME: Not device params + dev->error_delay = 1; +# ifdef WITH_RAWSINK dev->rawsink_name = ""; dev->rawsink_mode = 0660; # endif + // end-of-fixme + dev->run = run; return dev; } @@ -196,11 +191,8 @@ int device_open(struct device_t *dev) { if (_device_open_queue_buffers(dev) < 0) { goto error; } - _device_open_alloc_picbufs(dev); _device_apply_controls(dev); - RUN(n_workers) = min_u(RUN(n_buffers), dev->n_workers); - LOG_DEBUG("Device fd=%d initialized", RUN(fd)); return 0; @@ -211,16 +203,6 @@ int device_open(struct device_t *dev) { void device_close(struct device_t *dev) { RUN(persistent_timeout_reported) = false; - RUN(n_workers) = 0; - - if (RUN(pictures)) { - LOG_DEBUG("Releasing picture buffers ..."); - for (unsigned index = 0; index < RUN(n_buffers); ++index) { - picture_destroy(RUN(pictures[index])); - } - free(RUN(pictures)); - RUN(pictures) = NULL; - } if (RUN(hw_buffers)) { LOG_DEBUG("Releasing device buffers ..."); @@ -373,7 +355,7 @@ int device_grab_buffer(struct device_t *dev) { HW(used) = buf_info.bytesused; memcpy(&HW(buf_info), &buf_info, sizeof(struct v4l2_buffer)); - RUN(pictures)[buf_info.index]->grab_ts = get_now_monotonic(); + HW(grab_ts) = get_now_monotonic(); # undef HW return buf_info.index; @@ -751,19 +733,6 @@ static int _device_open_queue_buffers(struct device_t *dev) { return 0; } -static void _device_open_alloc_picbufs(struct device_t *dev) { - size_t picture_size = picture_get_generous_size(RUN(width), RUN(height)); - - LOG_DEBUG("Allocating picture buffers ..."); - A_CALLOC(RUN(pictures), RUN(n_buffers)); - - for (unsigned index = 0; index < RUN(n_buffers); ++index) { - RUN(pictures)[index] = picture_init(); - LOG_DEBUG("Pre-allocating picture buffer %u sized %zu bytes... ", index, picture_size); - picture_realloc_data(RUN(pictures)[index], picture_size); - } -} - static int _device_apply_resolution(struct device_t *dev, unsigned width, unsigned height) { // Тут VIDEO_MIN_* не используются из-за странностей минимального разрешения при отсутствии сигнала // у некоторых устройств, например Auvidea B101 diff --git a/src/ustreamer/device.h b/src/ustreamer/device.h index 6bb69c1..771fd9c 100644 --- a/src/ustreamer/device.h +++ b/src/ustreamer/device.h @@ -32,8 +32,6 @@ # include "../rawsink/rawsink.h" #endif -#include "picture.h" - #define VIDEO_MIN_WIDTH ((unsigned)160) #define VIDEO_MAX_WIDTH ((unsigned)10240) @@ -57,6 +55,8 @@ struct hw_buffer_t { unsigned char *data; size_t used; size_t allocated; + long double grab_ts; + struct v4l2_buffer buf_info; pthread_mutex_t grabbed_mutex; @@ -71,9 +71,7 @@ struct device_runtime_t { unsigned hw_fps; size_t raw_size; unsigned n_buffers; - unsigned n_workers; struct hw_buffer_t *hw_buffers; - struct picture_t **pictures; bool capturing; bool persistent_timeout_reported; }; @@ -115,17 +113,19 @@ struct device_t { enum v4l2_memory io_method; bool dv_timings; unsigned n_buffers; - unsigned n_workers; unsigned desired_fps; size_t min_frame_size; bool persistent; unsigned timeout; - unsigned error_delay; // XXX: not device param -# ifdef WITH_RAWSINK // XXX: not device params + + // FIXME: Not device params + unsigned error_delay; +# ifdef WITH_RAWSINK char *rawsink_name; mode_t rawsink_mode; bool rawsink_rm; # endif + // end-of-fixme struct controls_t ctl; diff --git a/src/ustreamer/encoder.c b/src/ustreamer/encoder.c index 27ef65a..95fb16b 100644 --- a/src/ustreamer/encoder.c +++ b/src/ustreamer/encoder.c @@ -34,6 +34,7 @@ #include "../common/logging.h" #include "device.h" +#include "picture.h" #include "encoders/cpu/encoder.h" #include "encoders/hw/encoder.h" @@ -57,6 +58,10 @@ static const struct { }; +#define ER(_next) encoder->run->_next +#define DR(_next) dev->run->_next + + struct encoder_t *encoder_init(void) { struct encoder_runtime_t *run; struct encoder_t *encoder; @@ -69,22 +74,23 @@ struct encoder_t *encoder_init(void) { A_CALLOC(encoder, 1); encoder->type = run->type; encoder->quality = run->quality; + encoder->n_workers = get_cores_available(); encoder->run = run; return encoder; } void encoder_destroy(struct encoder_t *encoder) { # ifdef WITH_OMX - if (encoder->run->omxs) { - for (unsigned index = 0; index < encoder->run->n_omxs; ++index) { - if (encoder->run->omxs[index]) { - omx_encoder_destroy(encoder->run->omxs[index]); + if (ER(omxs)) { + for (unsigned index = 0; index < ER(n_omxs); ++index) { + if (ER(omxs[index])) { + omx_encoder_destroy(ER(omxs[index])); } } - free(encoder->run->omxs); + free(ER(omxs)); } # endif - A_MUTEX_DESTROY(&encoder->run->mutex); + A_MUTEX_DESTROY(&ER(mutex)); free(encoder->run); free(encoder); } @@ -108,17 +114,19 @@ const char *encoder_type_to_string(enum encoder_type_t type) { } void encoder_prepare(struct encoder_t *encoder, struct device_t *dev) { - enum encoder_type_t type = (encoder->run->cpu_forced ? ENCODER_TYPE_CPU : encoder->type); + enum encoder_type_t type = (ER(cpu_forced) ? ENCODER_TYPE_CPU : encoder->type); unsigned quality = encoder->quality; bool cpu_forced = false; - if ((dev->run->format == V4L2_PIX_FMT_MJPEG || dev->run->format == V4L2_PIX_FMT_JPEG) && type != ENCODER_TYPE_HW) { + ER(n_workers) = min_u(encoder->n_workers, DR(n_buffers)); + + if ((DR(format) == V4L2_PIX_FMT_MJPEG || DR(format) == V4L2_PIX_FMT_JPEG) && type != ENCODER_TYPE_HW) { LOG_INFO("Switching to HW encoder because the input format is (M)JPEG"); type = ENCODER_TYPE_HW; } if (type == ENCODER_TYPE_HW) { - if (dev->run->format != V4L2_PIX_FMT_MJPEG && dev->run->format != V4L2_PIX_FMT_JPEG) { + if (DR(format) != V4L2_PIX_FMT_MJPEG && DR(format) != V4L2_PIX_FMT_JPEG) { LOG_INFO("Switching to CPU encoder because the input format is not (M)JPEG"); goto use_cpu; } @@ -127,42 +135,42 @@ void encoder_prepare(struct encoder_t *encoder, struct device_t *dev) { quality = 0; } - dev->run->n_workers = 1; + ER(n_workers) = 1; } # ifdef WITH_OMX else if (type == ENCODER_TYPE_OMX) { for (unsigned index = 0; index < encoder->n_glitched_resolutions; ++index) { if ( - encoder->glitched_resolutions[index][0] == dev->run->width - && encoder->glitched_resolutions[index][1] == dev->run->height + encoder->glitched_resolutions[index][0] == DR(width) + && encoder->glitched_resolutions[index][1] == DR(height) ) { LOG_INFO("Switching to CPU encoder the resolution %ux%u marked as glitchy for OMX", - dev->run->width, dev->run->height); + DR(width), DR(height)); goto use_cpu; } } LOG_DEBUG("Preparing OMX encoder ..."); - if (dev->run->n_workers > OMX_MAX_ENCODERS) { + if (ER(n_workers) > OMX_MAX_ENCODERS) { LOG_INFO("OMX encoder sets limit for worker threads: %u", OMX_MAX_ENCODERS); - dev->run->n_workers = OMX_MAX_ENCODERS; + ER(n_workers) = OMX_MAX_ENCODERS; } - if (encoder->run->omxs == NULL) { - A_CALLOC(encoder->run->omxs, OMX_MAX_ENCODERS); + if (ER(omxs) == NULL) { + A_CALLOC(ER(omxs), OMX_MAX_ENCODERS); } // Начинаем с нуля и доинициализируем на следующих заходах при необходимости - for (; encoder->run->n_omxs < dev->run->n_workers; ++encoder->run->n_omxs) { - if ((encoder->run->omxs[encoder->run->n_omxs] = omx_encoder_init()) == NULL) { + for (; ER(n_omxs) < ER(n_workers); ++ER(n_omxs)) { + if ((ER(omxs[ER(n_omxs)]) = omx_encoder_init()) == NULL) { LOG_ERROR("Can't initialize OMX encoder, falling back to CPU"); goto force_cpu; } } - for (unsigned index = 0; index < encoder->run->n_omxs; ++index) { - if (omx_encoder_prepare(encoder->run->omxs[index], dev, quality) < 0) { + for (unsigned index = 0; index < ER(n_omxs); ++index) { + if (omx_encoder_prepare(ER(omxs[index]), dev, quality) < 0) { LOG_ERROR("Can't prepare OMX encoder, falling back to CPU"); goto force_cpu; } @@ -194,51 +202,54 @@ void encoder_prepare(struct encoder_t *encoder, struct device_t *dev) { LOG_INFO("Using JPEG quality: %u%%", quality); } - A_MUTEX_LOCK(&encoder->run->mutex); - encoder->run->type = type; - encoder->run->quality = quality; + A_MUTEX_LOCK(&ER(mutex)); + ER(type) = type; + ER(quality) = quality; if (cpu_forced) { - encoder->run->cpu_forced = true; + ER(cpu_forced) = true; } - A_MUTEX_UNLOCK(&encoder->run->mutex); + A_MUTEX_UNLOCK(&ER(mutex)); } #pragma GCC diagnostic ignored "-Wunused-parameter" #pragma GCC diagnostic push -int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, unsigned worker_number, unsigned buf_index) { +int encoder_compress_buffer( + struct encoder_t *encoder, struct device_t *dev, + unsigned worker_number, unsigned buf_index, + struct picture_t *picture) { #pragma GCC diagnostic pop - assert(encoder->run->type != ENCODER_TYPE_UNKNOWN); - assert(dev->run->hw_buffers[buf_index].used > 0); + assert(ER(type) != ENCODER_TYPE_UNKNOWN); + assert(DR(hw_buffers[buf_index].used) > 0); - dev->run->pictures[buf_index]->encode_begin_ts = get_now_monotonic(); + picture->grab_ts = DR(hw_buffers[buf_index].grab_ts); + picture->encode_begin_ts = get_now_monotonic(); - if (encoder->run->type == ENCODER_TYPE_CPU) { + if (ER(type) == ENCODER_TYPE_CPU) { LOG_VERBOSE("Compressing buffer %u using CPU", buf_index); - cpu_encoder_compress_buffer(dev, buf_index, encoder->run->quality); - } else if (encoder->run->type == ENCODER_TYPE_HW) { + cpu_encoder_compress_buffer(dev, buf_index, ER(quality), picture); + } else if (ER(type) == ENCODER_TYPE_HW) { LOG_VERBOSE("Compressing buffer %u using HW (just copying)", buf_index); - hw_encoder_compress_buffer(dev, buf_index); + hw_encoder_compress_buffer(dev, buf_index, picture); } # ifdef WITH_OMX - else if (encoder->run->type == ENCODER_TYPE_OMX) { + else if (ER(type) == ENCODER_TYPE_OMX) { LOG_VERBOSE("Compressing buffer %u using OMX", buf_index); - if (omx_encoder_compress_buffer(encoder->run->omxs[worker_number], dev, buf_index) < 0) { + if (omx_encoder_compress_buffer(ER(omxs[worker_number]), dev, buf_index, picture) < 0) { goto error; } } # endif # ifdef WITH_RAWSINK - else if (encoder->run->type == ENCODER_TYPE_NOOP) { + else if (ER(type) == ENCODER_TYPE_NOOP) { LOG_VERBOSE("Compressing buffer %u using NOOP (do nothing)", buf_index); } # endif + picture->encode_end_ts = get_now_monotonic(); - dev->run->pictures[buf_index]->encode_end_ts = get_now_monotonic(); - - dev->run->pictures[buf_index]->width = dev->run->width; - dev->run->pictures[buf_index]->height = dev->run->height; + picture->width = DR(width); + picture->height = DR(height); return 0; @@ -247,9 +258,12 @@ int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, uns // cppcheck-suppress unusedLabel error: LOG_INFO("Error while compressing buffer, falling back to CPU"); - A_MUTEX_LOCK(&encoder->run->mutex); - encoder->run->cpu_forced = true; - A_MUTEX_UNLOCK(&encoder->run->mutex); + A_MUTEX_LOCK(&ER(mutex)); + ER(cpu_forced) = true; + A_MUTEX_UNLOCK(&ER(mutex)); return -1; # pragma GCC diagnostic pop } + +#undef DR +#undef ER diff --git a/src/ustreamer/encoder.h b/src/ustreamer/encoder.h index 025b18b..8a05d4d 100644 --- a/src/ustreamer/encoder.h +++ b/src/ustreamer/encoder.h @@ -27,6 +27,7 @@ #include #include "device.h" +#include "picture.h" #ifdef WITH_OMX # include "encoders/omx/encoder.h" @@ -71,6 +72,8 @@ struct encoder_runtime_t { bool cpu_forced; pthread_mutex_t mutex; + unsigned n_workers; + # ifdef WITH_OMX unsigned n_omxs; struct omx_encoder_t **omxs; @@ -80,6 +83,7 @@ struct encoder_runtime_t { struct encoder_t { enum encoder_type_t type; unsigned quality; + unsigned n_workers; # ifdef WITH_OMX unsigned n_glitched_resolutions; unsigned glitched_resolutions[2][MAX_GLITCHED_RESOLUTIONS]; @@ -96,4 +100,8 @@ enum encoder_type_t encoder_parse_type(const char *str); const char *encoder_type_to_string(enum encoder_type_t type); void encoder_prepare(struct encoder_t *encoder, struct device_t *dev); -int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, unsigned worker_number, unsigned buf_index); + +int encoder_compress_buffer( + struct encoder_t *encoder, struct device_t *dev, + unsigned worker_number, unsigned buf_index, + struct picture_t *picture); diff --git a/src/ustreamer/encoders/cpu/encoder.c b/src/ustreamer/encoders/cpu/encoder.c index cca5675..4231b6f 100644 --- a/src/ustreamer/encoders/cpu/encoder.c +++ b/src/ustreamer/encoders/cpu/encoder.c @@ -70,7 +70,10 @@ static boolean _jpeg_empty_output_buffer(j_compress_ptr jpeg); static void _jpeg_term_destination(j_compress_ptr jpeg); -void cpu_encoder_compress_buffer(struct device_t *dev, unsigned index, unsigned quality) { +void cpu_encoder_compress_buffer( + struct device_t *dev, unsigned index, unsigned quality, + struct picture_t *picture) { + // This function based on compress_image_to_jpeg() from mjpg-streamer struct jpeg_compress_struct jpeg; @@ -79,7 +82,7 @@ void cpu_encoder_compress_buffer(struct device_t *dev, unsigned index, unsigned jpeg.err = jpeg_std_error(&jpeg_error); jpeg_create_compress(&jpeg); - _jpeg_set_picture(&jpeg, dev->run->pictures[index]); + _jpeg_set_picture(&jpeg, picture); jpeg.image_width = dev->run->width; jpeg.image_height = dev->run->height; @@ -108,7 +111,7 @@ void cpu_encoder_compress_buffer(struct device_t *dev, unsigned index, unsigned jpeg_finish_compress(&jpeg); jpeg_destroy_compress(&jpeg); - assert(dev->run->pictures[index]->used > 0); + assert(picture->used > 0); } static void _jpeg_set_picture(j_compress_ptr jpeg, struct picture_t *picture) { diff --git a/src/ustreamer/encoders/cpu/encoder.h b/src/ustreamer/encoders/cpu/encoder.h index 2fef16f..7701296 100644 --- a/src/ustreamer/encoders/cpu/encoder.h +++ b/src/ustreamer/encoders/cpu/encoder.h @@ -23,6 +23,9 @@ #pragma once #include "../../device.h" +#include "../../picture.h" -void cpu_encoder_compress_buffer(struct device_t *dev, unsigned index, unsigned quality); +void cpu_encoder_compress_buffer( + struct device_t *dev, unsigned index, unsigned quality, + struct picture_t *picture); diff --git a/src/ustreamer/encoders/hw/encoder.c b/src/ustreamer/encoders/hw/encoder.c index f6b9ca8..6294189 100644 --- a/src/ustreamer/encoders/hw/encoder.c +++ b/src/ustreamer/encoders/hw/encoder.c @@ -63,11 +63,11 @@ int hw_encoder_prepare(struct device_t *dev, unsigned quality) { return 0; } -void hw_encoder_compress_buffer(struct device_t *dev, unsigned index) { +void hw_encoder_compress_buffer(struct device_t *dev, unsigned index, struct picture_t *picture) { if (dev->run->format != V4L2_PIX_FMT_MJPEG && dev->run->format != V4L2_PIX_FMT_JPEG) { assert(0 && "Unsupported input format for HW encoder"); } - _copy_plus_huffman(&dev->run->hw_buffers[index], dev->run->pictures[index]); + _copy_plus_huffman(&dev->run->hw_buffers[index], picture); } void _copy_plus_huffman(const struct hw_buffer_t *src, struct picture_t *dest) { diff --git a/src/ustreamer/encoders/hw/encoder.h b/src/ustreamer/encoders/hw/encoder.h index 2ee06e6..3268735 100644 --- a/src/ustreamer/encoders/hw/encoder.h +++ b/src/ustreamer/encoders/hw/encoder.h @@ -23,7 +23,8 @@ #pragma once #include "../../device.h" +#include "../../picture.h" int hw_encoder_prepare(struct device_t *dev, unsigned quality); -void hw_encoder_compress_buffer(struct device_t *dev, unsigned index); +void hw_encoder_compress_buffer(struct device_t *dev, unsigned index, struct picture_t *picture); diff --git a/src/ustreamer/encoders/omx/encoder.c b/src/ustreamer/encoders/omx/encoder.c index e58da3e..cd59f16 100644 --- a/src/ustreamer/encoders/omx/encoder.c +++ b/src/ustreamer/encoders/omx/encoder.c @@ -38,8 +38,8 @@ #include "../../../common/logging.h" #include "../../../common/tools.h" -#include "../../picture.h" #include "../../device.h" +#include "../../picture.h" #include "formatters.h" #include "component.h" @@ -176,7 +176,10 @@ int omx_encoder_prepare(struct omx_encoder_t *omx, struct device_t *dev, unsigne return 0; } -int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, unsigned index) { +int omx_encoder_compress_buffer( + struct omx_encoder_t *omx, struct device_t *dev, unsigned index, + struct picture_t *picture) { + # define HW(_next) dev->run->hw_buffers[index]._next # define IN(_next) omx->input_buffer->_next # define OUT(_next) omx->output_buffer->_next @@ -190,7 +193,7 @@ int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, return -1; } - dev->run->pictures[index]->used = 0; + picture->used = 0; omx->output_available = false; omx->input_required = true; @@ -202,7 +205,7 @@ int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, if (omx->output_available) { omx->output_available = false; - picture_append_data(dev->run->pictures[index], OUT(pBuffer) + OUT(nOffset), OUT(nFilledLen)); + picture_append_data(picture, OUT(pBuffer) + OUT(nOffset), OUT(nFilledLen)); if (OUT(nFlags) & OMX_BUFFERFLAG_ENDOFFRAME) { OUT(nFlags) = 0; diff --git a/src/ustreamer/encoders/omx/encoder.h b/src/ustreamer/encoders/omx/encoder.h index bc2ca99..9645524 100644 --- a/src/ustreamer/encoders/omx/encoder.h +++ b/src/ustreamer/encoders/omx/encoder.h @@ -28,6 +28,7 @@ #include #include "../../device.h" +#include "../../picture.h" #ifndef CFG_OMX_MAX_ENCODERS # define CFG_OMX_MAX_ENCODERS 3 // Raspberry Pi limitation @@ -55,4 +56,7 @@ struct omx_encoder_t *omx_encoder_init(void); void omx_encoder_destroy(struct omx_encoder_t *omx); int omx_encoder_prepare(struct omx_encoder_t *omx, struct device_t *dev, unsigned quality); -int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, unsigned index); + +int omx_encoder_compress_buffer( + struct omx_encoder_t *omx, struct device_t *dev, unsigned index, + struct picture_t *picture); diff --git a/src/ustreamer/options.c b/src/ustreamer/options.c index 5e9c061..f34a7c5 100644 --- a/src/ustreamer/options.c +++ b/src/ustreamer/options.c @@ -360,7 +360,7 @@ int options_parse(struct options_t *options, struct device_t *dev, struct encode case _O_PERSISTENT: OPT_SET(dev->persistent, true); case _O_DV_TIMINGS: OPT_SET(dev->dv_timings, true); case _O_BUFFERS: OPT_NUMBER("--buffers", dev->n_buffers, 1, 32, 0); - case _O_WORKERS: OPT_NUMBER("--workers", dev->n_workers, 1, 32, 0); + case _O_WORKERS: OPT_NUMBER("--workers", encoder->n_workers, 1, 32, 0); case _O_QUALITY: OPT_NUMBER("--quality", encoder->quality, 1, 100, 0); case _O_ENCODER: OPT_PARSE("encoder type", encoder->type, encoder_parse_type, ENCODER_TYPE_UNKNOWN, ENCODER_TYPES_STR); # ifdef WITH_OMX @@ -610,7 +610,7 @@ static void _help(struct device_t *dev, struct encoder_t *encoder, struct http_s printf(" Each buffer may processed using an independent thread.\n"); printf(" Default: %u (the number of CPU cores (but not more than 4) + 1).\n\n", dev->n_buffers); printf(" -w|--workers ──────────────────── The number of worker threads but not more than buffers.\n"); - printf(" Default: %u (the number of CPU cores (but not more than 4)).\n\n", dev->n_workers); + printf(" Default: %u (the number of CPU cores (but not more than 4)).\n\n", encoder->n_workers); printf(" -q|--quality ──────────────────── Set quality of JPEG encoding from 1 to 100 (best). Default: %u.\n", encoder->quality); printf(" Note: If HW encoding is used (JPEG source format selected),\n"); printf(" this parameter attempts to configure the camera\n"); diff --git a/src/ustreamer/stream.c b/src/ustreamer/stream.c index fdba25b..0293b7d 100644 --- a/src/ustreamer/stream.c +++ b/src/ustreamer/stream.c @@ -53,6 +53,7 @@ struct _worker_t { atomic_bool *workers_stop; long double last_comp_time; + struct picture_t *picture; pthread_mutex_t has_job_mutex; unsigned buf_index; @@ -69,8 +70,7 @@ struct _worker_t { struct _worker_t *order_prev; struct _worker_t *order_next; - struct device_t *dev; - struct encoder_t *encoder; + struct stream_t *stream; }; struct _workers_pool_t { @@ -93,7 +93,7 @@ struct _workers_pool_t { static struct _workers_pool_t *_stream_init_loop(struct stream_t *stream); static struct _workers_pool_t *_stream_init_one(struct stream_t *stream); -static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index, unsigned captured_fps); +static void _stream_expose_picture(struct stream_t *stream, struct picture_t *picture, unsigned captured_fps); static struct _workers_pool_t *_workers_pool_init(struct stream_t *stream); static void _workers_pool_destroy(struct _workers_pool_t *pool); @@ -166,7 +166,7 @@ void stream_loop(struct stream_t *stream) { if (!ready_wr->job_failed) { if (ready_wr->job_timely) { - _stream_expose_picture(stream, ready_wr->buf_index, captured_fps); + _stream_expose_picture(stream, ready_wr->picture, captured_fps); LOG_PERF("##### Encoded picture exposed; worker=%u", ready_wr->number); } else { LOG_PERF("----- Encoded picture dropped; worker=%u", ready_wr->number); @@ -242,7 +242,7 @@ void stream_loop(struct stream_t *stream) { DEV(run->format), DEV(run->width), DEV(run->height), - DEV(run->pictures[buf_index]->grab_ts) + DEV(run->hw_buffers[buf_index].grab_ts) ); } # endif @@ -346,10 +346,10 @@ static struct _workers_pool_t *_stream_init_one(struct stream_t *stream) { return NULL; } -static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index, unsigned captured_fps) { +static void _stream_expose_picture(struct stream_t *stream, struct picture_t *picture, unsigned captured_fps) { A_MUTEX_LOCK(&stream->mutex); - picture_copy(stream->dev->run->pictures[buf_index], stream->picture); + picture_copy(picture, stream->picture); stream->online = true; stream->captured_fps = captured_fps; @@ -359,16 +359,17 @@ static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index, } static struct _workers_pool_t *_workers_pool_init(struct stream_t *stream) { - struct _workers_pool_t *pool; - # define DEV(_next) stream->dev->_next # define RUN(_next) stream->dev->run->_next - LOG_INFO("Creating pool with %u workers ...", RUN(n_workers)); + struct _workers_pool_t *pool; + size_t picture_size = picture_get_generous_size(RUN(width), RUN(height)); + + LOG_INFO("Creating pool with %u workers ...", stream->encoder->run->n_workers); A_CALLOC(pool, 1); - pool->n_workers = RUN(n_workers); + pool->n_workers = stream->encoder->run->n_workers; A_CALLOC(pool->workers, pool->n_workers); A_MUTEX_INIT(&pool->free_workers_mutex); @@ -386,6 +387,9 @@ static struct _workers_pool_t *_workers_pool_init(struct stream_t *stream) { for (unsigned number = 0; number < pool->n_workers; ++number) { # define WR(_next) pool->workers[number]._next + WR(picture) = picture_init(); + picture_realloc_data(WR(picture), picture_size); + A_MUTEX_INIT(&WR(has_job_mutex)); atomic_init(&WR(has_job), false); A_COND_INIT(&WR(has_job_cond)); @@ -398,8 +402,7 @@ static struct _workers_pool_t *_workers_pool_init(struct stream_t *stream) { WR(free_workers) = &pool->free_workers; WR(free_workers_cond) = &pool->free_workers_cond; - WR(dev) = stream->dev; - WR(encoder) = stream->encoder; + WR(stream) = stream; A_THREAD_CREATE(&WR(tid), _worker_thread, (void *)&(pool->workers[number])); @@ -426,6 +429,8 @@ static void _workers_pool_destroy(struct _workers_pool_t *pool) { A_MUTEX_DESTROY(&WR(has_job_mutex)); A_COND_DESTROY(&WR(has_job_cond)); + picture_destroy(WR(picture)); + # undef WR } @@ -450,13 +455,19 @@ static void *_worker_thread(void *v_worker) { A_MUTEX_UNLOCK(&wr->has_job_mutex); if (!atomic_load(wr->workers_stop)) { -# define PIC(_next) wr->dev->run->pictures[wr->buf_index]->_next +# define PIC(_next) wr->picture->_next LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", wr->number, wr->buf_index); - wr->job_failed = (bool)encoder_compress_buffer(wr->encoder, wr->dev, wr->number, wr->buf_index); + wr->job_failed = (bool)encoder_compress_buffer( + wr->stream->encoder, + wr->stream->dev, + wr->number, + wr->buf_index, + wr->picture + ); - if (device_release_buffer(wr->dev, wr->buf_index) == 0) { + if (device_release_buffer(wr->stream->dev, wr->buf_index) == 0) { if (!wr->job_failed) { wr->job_start_ts = PIC(encode_begin_ts); wr->last_comp_time = PIC(encode_end_ts) - wr->job_start_ts;