From 9c679f6d5ded5e656b40696362e1ac06ea6e4ca4 Mon Sep 17 00:00:00 2001 From: Devaev Maxim Date: Fri, 8 Jan 2021 03:33:14 +0300 Subject: [PATCH] refactoring --- src/ustreamer/encoder.c | 90 ++++++++++++++++++++++++++++++++--------- src/ustreamer/encoder.h | 12 ++++-- src/ustreamer/stream.c | 70 +------------------------------- 3 files changed, 81 insertions(+), 91 deletions(-) diff --git a/src/ustreamer/encoder.c b/src/ustreamer/encoder.c index cd61126..f0d94a4 100644 --- a/src/ustreamer/encoder.c +++ b/src/ustreamer/encoder.c @@ -36,8 +36,12 @@ static const struct { }; +static void *_worker_job_init(worker_s *wr, void *v_enc); +static void _worker_job_destroy(void *v_job); +static bool _worker_run_job(worker_s *wr); + + #define ER(_next) enc->run->_next -#define DR(_next) dev->run->_next encoder_s *encoder_init(void) { @@ -89,13 +93,14 @@ const char *encoder_type_to_string(encoder_type_e type) { return _ENCODER_TYPES[0].name; } -void encoder_prepare(encoder_s *enc, device_s *dev) { +workers_pool_s *encoder_workers_pool_init(encoder_s *enc, device_s *dev) { +# define DR(_next) dev->run->_next + encoder_type_e type = (ER(cpu_forced) ? ENCODER_TYPE_CPU : enc->type); unsigned quality = dev->jpeg_quality; + unsigned n_workers = min_u(enc->n_workers, DR(n_bufs)); bool cpu_forced = false; - ER(n_workers) = min_u(enc->n_workers, DR(n_bufs)); - if (is_jpeg(DR(format)) && type != ENCODER_TYPE_HW) { LOG_INFO("Switching to HW encoder: the input is (M)JPEG ..."); type = ENCODER_TYPE_HW; @@ -107,7 +112,7 @@ void encoder_prepare(encoder_s *enc, device_s *dev) { goto use_cpu; } quality = DR(jpeg_quality); - ER(n_workers) = 1; + n_workers = 1; } # ifdef WITH_OMX else if (type == ENCODER_TYPE_OMX) { @@ -118,9 +123,9 @@ void encoder_prepare(encoder_s *enc, device_s *dev) { LOG_DEBUG("Preparing OMX encoder ..."); - if (ER(n_workers) > OMX_MAX_ENCODERS) { + if (n_workers > OMX_MAX_ENCODERS) { LOG_INFO("OMX encoder sets limit for worker threads: %u", OMX_MAX_ENCODERS); - ER(n_workers) = OMX_MAX_ENCODERS; + n_workers = OMX_MAX_ENCODERS; } if (ER(omxs) == NULL) { @@ -128,7 +133,7 @@ void encoder_prepare(encoder_s *enc, device_s *dev) { } // Начинаем с нуля и доинициализируем на следующих заходах при необходимости - for (; ER(n_omxs) < ER(n_workers); ++ER(n_omxs)) { + for (; ER(n_omxs) < n_workers; ++ER(n_omxs)) { if ((ER(omxs[ER(n_omxs)]) = omx_encoder_init()) == NULL) { LOG_ERROR("Can't initialize OMX encoder, falling back to CPU"); goto force_cpu; @@ -153,7 +158,7 @@ void encoder_prepare(encoder_s *enc, device_s *dev) { } # endif else if (type == ENCODER_TYPE_NOOP) { - ER(n_workers) = 1; + n_workers = 1; quality = 0; } @@ -185,6 +190,19 @@ void encoder_prepare(encoder_s *enc, device_s *dev) { ER(cpu_forced) = true; } A_MUTEX_UNLOCK(&ER(mutex)); + + long double desired_interval = 0; + if (dev->desired_fps > 0 && (dev->desired_fps < dev->run->hw_fps || dev->run->hw_fps == 0)) { + desired_interval = (long double)1 / dev->desired_fps; + } + + return workers_pool_init( + "JPEG", "jw", n_workers, desired_interval, + _worker_job_init, (void *)enc, + _worker_job_destroy, + _worker_run_job); + +# undef DR } void encoder_get_runtime_params(encoder_s *enc, encoder_type_e *type, unsigned *quality) { @@ -194,10 +212,36 @@ void encoder_get_runtime_params(encoder_s *enc, encoder_type_e *type, unsigned * A_MUTEX_UNLOCK(&ER(mutex)); } -#pragma GCC diagnostic ignored "-Wunused-parameter" -#pragma GCC diagnostic push -int encoder_compress(encoder_s *enc, unsigned worker_number, frame_s *src, frame_s *dest) { -#pragma GCC diagnostic pop +static void *_worker_job_init(worker_s *wr, void *v_enc) { + encoder_job_s *job; + A_CALLOC(job, 1); + job->enc = (encoder_s *)v_enc; + + const size_t dest_role_len = strlen(wr->name) + 16; + A_CALLOC(job->dest_role, dest_role_len); + snprintf(job->dest_role, dest_role_len, "%s_dest", wr->name); + job->dest = frame_init(job->dest_role); + + return (void *)job; +} + +static void _worker_job_destroy(void *v_job) { + encoder_job_s *job = (encoder_job_s *)v_job; + frame_destroy(job->dest); + free(job->dest_role); + free(job); +} + +#undef ER + +static bool _worker_run_job(worker_s *wr) { + encoder_job_s *job = (encoder_job_s *)wr->job; + frame_s *src = &job->hw->raw; + frame_s *dest = job->dest; + +# define ER(_next) job->enc->run->_next + + LOG_DEBUG("Worker %s compressing JPEG from buffer %u ...", wr->name, job->hw->buf_info.index); assert(ER(type) != ENCODER_TYPE_UNKNOWN); assert(src->used > 0); @@ -218,7 +262,7 @@ int encoder_compress(encoder_s *enc, unsigned worker_number, frame_s *src, frame # ifdef WITH_OMX else if (ER(type) == ENCODER_TYPE_OMX) { LOG_VERBOSE("Compressing buffer using OMX"); - if (omx_encoder_compress(ER(omxs[worker_number]), src, dest) < 0) { + if (omx_encoder_compress(ER(omxs[wr->number]), src, dest) < 0) { goto error; } } @@ -229,17 +273,23 @@ int encoder_compress(encoder_s *enc, unsigned worker_number, frame_s *src, frame } dest->encode_end_ts = get_now_monotonic(); - return 0; + LOG_VERBOSE("Compressed new JPEG: size=%zu, time=%0.3Lf, worker=%s, buffer=%u", + job->dest->used, + job->dest->encode_end_ts - job->dest->encode_begin_ts, + wr->name, + job->hw->buf_info.index); + + return true; # ifdef WITH_OMX error: - LOG_INFO("Error while compressing buffer, falling back to CPU"); + LOG_ERROR("Compression failed: worker=%s, buffer=%u", wr->name, job->hw->buf_info.index); + LOG_ERROR("Error while compressing buffer, falling back to CPU"); A_MUTEX_LOCK(&ER(mutex)); ER(cpu_forced) = true; A_MUTEX_UNLOCK(&ER(mutex)); - return -1; + return false; # endif -} -#undef DR -#undef ER +# undef ER +} diff --git a/src/ustreamer/encoder.h b/src/ustreamer/encoder.h index 5ce14fd..0395a92 100644 --- a/src/ustreamer/encoder.h +++ b/src/ustreamer/encoder.h @@ -36,6 +36,7 @@ #include "../libs/frame.h" #include "device.h" +#include "workers.h" #include "encoders/cpu/encoder.h" #include "encoders/hw/encoder.h" @@ -69,8 +70,6 @@ typedef struct { bool cpu_forced; pthread_mutex_t mutex; - unsigned n_workers; - # ifdef WITH_OMX unsigned n_omxs; omx_encoder_s **omxs; @@ -84,6 +83,13 @@ typedef struct { encoder_runtime_s *run; } encoder_s; +typedef struct { + encoder_s *enc; + hw_buffer_s *hw; + char *dest_role; + frame_s *dest; +} encoder_job_s; + encoder_s *encoder_init(void); void encoder_destroy(encoder_s *enc); @@ -91,7 +97,7 @@ void encoder_destroy(encoder_s *enc); encoder_type_e encoder_parse_type(const char *str); const char *encoder_type_to_string(encoder_type_e type); -void encoder_prepare(encoder_s *enc, device_s *dev); +workers_pool_s *encoder_workers_pool_init(encoder_s *enc, device_s *dev); void encoder_get_runtime_params(encoder_s *enc, encoder_type_e *type, unsigned *quality); int encoder_compress(encoder_s *enc, unsigned worker_number, frame_s *src, frame_s *dest); diff --git a/src/ustreamer/stream.c b/src/ustreamer/stream.c index 28f55ad..ce6d573 100644 --- a/src/ustreamer/stream.c +++ b/src/ustreamer/stream.c @@ -23,22 +23,10 @@ #include "stream.h" -typedef struct { - encoder_s *enc; - hw_buffer_s *hw; - char *dest_role; - frame_s *dest; -} _job_s; - - static workers_pool_s *_stream_init_loop(stream_s *stream); static workers_pool_s *_stream_init_one(stream_s *stream); static bool _stream_expose_frame(stream_s *stream, frame_s *frame, unsigned captured_fps); -static void *_worker_job_init(worker_s *wr, void *v_enc); -static void _worker_job_destroy(void *v_job); -static bool _worker_run_job(worker_s *wr); - #ifdef WITH_OMX static h264_stream_s *_h264_stream_init(memsink_s *sink, unsigned bitrate, unsigned gop); static void _h264_stream_destroy(h264_stream_s *h264); @@ -105,7 +93,7 @@ void stream_loop(stream_s *stream) { LOG_DEBUG("Waiting for worker ..."); worker_s *ready_wr = workers_pool_wait(pool); - _job_s *ready_job = (_job_s *)(ready_wr->job); + encoder_job_s *ready_job = (encoder_job_s *)(ready_wr->job); if (ready_job->hw) { if (device_release_buffer(stream->dev, ready_job->hw) < 0) { @@ -290,24 +278,7 @@ static workers_pool_s *_stream_init_one(stream_s *stream) { if (device_switch_capturing(stream->dev, true) < 0) { goto error; } - - encoder_prepare(stream->enc, stream->dev); - -# define DEV(_next) stream->dev->_next -# define RUN(_next) stream->dev->run->_next - long double desired_interval = 0; - if (DEV(desired_fps) > 0 && (DEV(desired_fps) < RUN(hw_fps) || RUN(hw_fps) == 0)) { - desired_interval = (long double)1 / DEV(desired_fps); - } -# undef DEV -# undef RUN - - return workers_pool_init( - "JPEG", "jw", stream->enc->run->n_workers, desired_interval, - _worker_job_init, (void *)stream->enc, - _worker_job_destroy, - _worker_run_job); - + return encoder_workers_pool_init(stream->enc, stream->dev); error: device_close(stream->dev); return NULL; @@ -370,43 +341,6 @@ static bool _stream_expose_frame(stream_s *stream, frame_s *frame, unsigned capt # undef VID } -static void *_worker_job_init(worker_s *wr, void *v_enc) { - _job_s *job; - A_CALLOC(job, 1); - job->enc = (encoder_s *)v_enc; - - const size_t dest_role_len = strlen(wr->name) + 16; - A_CALLOC(job->dest_role, dest_role_len); - snprintf(job->dest_role, dest_role_len, "%s_dest", wr->name); - job->dest = frame_init(job->dest_role); - - return (void *)job; -} - -static void _worker_job_destroy(void *v_job) { - _job_s *job = (_job_s *)v_job; - frame_destroy(job->dest); - free(job->dest_role); - free(job); -} - -static bool _worker_run_job(worker_s *wr) { - _job_s *job = (_job_s *)wr->job; - - LOG_DEBUG("Worker %s compressing JPEG from buffer %u ...", wr->name, job->hw->buf_info.index); - bool ok = !encoder_compress(job->enc, wr->number, &job->hw->raw, job->dest); - if (ok) { - LOG_VERBOSE("Compressed new JPEG: size=%zu, time=%0.3Lf, worker=%s, buffer=%u", - job->dest->used, - job->dest->encode_end_ts - job->dest->encode_begin_ts, - wr->name, - job->hw->buf_info.index); - } else { - LOG_VERBOSE("Compression failed: worker=%s, buffer=%u", wr->name, job->hw->buf_info.index); - } - return ok; -} - #ifdef WITH_OMX static h264_stream_s *_h264_stream_init(memsink_s *sink, unsigned bitrate, unsigned gop) { h264_stream_s *h264;