refactoring

This commit is contained in:
Devaev Maxim
2021-01-08 03:33:14 +03:00
parent f14f49dc92
commit 9c679f6d5d
3 changed files with 81 additions and 91 deletions

View File

@@ -36,8 +36,12 @@ static const struct {
};
static void *_worker_job_init(worker_s *wr, void *v_enc);
static void _worker_job_destroy(void *v_job);
static bool _worker_run_job(worker_s *wr);
#define ER(_next) enc->run->_next
#define DR(_next) dev->run->_next
encoder_s *encoder_init(void) {
@@ -89,13 +93,14 @@ const char *encoder_type_to_string(encoder_type_e type) {
return _ENCODER_TYPES[0].name;
}
void encoder_prepare(encoder_s *enc, device_s *dev) {
workers_pool_s *encoder_workers_pool_init(encoder_s *enc, device_s *dev) {
# define DR(_next) dev->run->_next
encoder_type_e type = (ER(cpu_forced) ? ENCODER_TYPE_CPU : enc->type);
unsigned quality = dev->jpeg_quality;
unsigned n_workers = min_u(enc->n_workers, DR(n_bufs));
bool cpu_forced = false;
ER(n_workers) = min_u(enc->n_workers, DR(n_bufs));
if (is_jpeg(DR(format)) && type != ENCODER_TYPE_HW) {
LOG_INFO("Switching to HW encoder: the input is (M)JPEG ...");
type = ENCODER_TYPE_HW;
@@ -107,7 +112,7 @@ void encoder_prepare(encoder_s *enc, device_s *dev) {
goto use_cpu;
}
quality = DR(jpeg_quality);
ER(n_workers) = 1;
n_workers = 1;
}
# ifdef WITH_OMX
else if (type == ENCODER_TYPE_OMX) {
@@ -118,9 +123,9 @@ void encoder_prepare(encoder_s *enc, device_s *dev) {
LOG_DEBUG("Preparing OMX encoder ...");
if (ER(n_workers) > OMX_MAX_ENCODERS) {
if (n_workers > OMX_MAX_ENCODERS) {
LOG_INFO("OMX encoder sets limit for worker threads: %u", OMX_MAX_ENCODERS);
ER(n_workers) = OMX_MAX_ENCODERS;
n_workers = OMX_MAX_ENCODERS;
}
if (ER(omxs) == NULL) {
@@ -128,7 +133,7 @@ void encoder_prepare(encoder_s *enc, device_s *dev) {
}
// Начинаем с нуля и доинициализируем на следующих заходах при необходимости
for (; ER(n_omxs) < ER(n_workers); ++ER(n_omxs)) {
for (; ER(n_omxs) < n_workers; ++ER(n_omxs)) {
if ((ER(omxs[ER(n_omxs)]) = omx_encoder_init()) == NULL) {
LOG_ERROR("Can't initialize OMX encoder, falling back to CPU");
goto force_cpu;
@@ -153,7 +158,7 @@ void encoder_prepare(encoder_s *enc, device_s *dev) {
}
# endif
else if (type == ENCODER_TYPE_NOOP) {
ER(n_workers) = 1;
n_workers = 1;
quality = 0;
}
@@ -185,6 +190,19 @@ void encoder_prepare(encoder_s *enc, device_s *dev) {
ER(cpu_forced) = true;
}
A_MUTEX_UNLOCK(&ER(mutex));
long double desired_interval = 0;
if (dev->desired_fps > 0 && (dev->desired_fps < dev->run->hw_fps || dev->run->hw_fps == 0)) {
desired_interval = (long double)1 / dev->desired_fps;
}
return workers_pool_init(
"JPEG", "jw", n_workers, desired_interval,
_worker_job_init, (void *)enc,
_worker_job_destroy,
_worker_run_job);
# undef DR
}
void encoder_get_runtime_params(encoder_s *enc, encoder_type_e *type, unsigned *quality) {
@@ -194,10 +212,36 @@ void encoder_get_runtime_params(encoder_s *enc, encoder_type_e *type, unsigned *
A_MUTEX_UNLOCK(&ER(mutex));
}
#pragma GCC diagnostic ignored "-Wunused-parameter"
#pragma GCC diagnostic push
int encoder_compress(encoder_s *enc, unsigned worker_number, frame_s *src, frame_s *dest) {
#pragma GCC diagnostic pop
static void *_worker_job_init(worker_s *wr, void *v_enc) {
encoder_job_s *job;
A_CALLOC(job, 1);
job->enc = (encoder_s *)v_enc;
const size_t dest_role_len = strlen(wr->name) + 16;
A_CALLOC(job->dest_role, dest_role_len);
snprintf(job->dest_role, dest_role_len, "%s_dest", wr->name);
job->dest = frame_init(job->dest_role);
return (void *)job;
}
static void _worker_job_destroy(void *v_job) {
encoder_job_s *job = (encoder_job_s *)v_job;
frame_destroy(job->dest);
free(job->dest_role);
free(job);
}
#undef ER
static bool _worker_run_job(worker_s *wr) {
encoder_job_s *job = (encoder_job_s *)wr->job;
frame_s *src = &job->hw->raw;
frame_s *dest = job->dest;
# define ER(_next) job->enc->run->_next
LOG_DEBUG("Worker %s compressing JPEG from buffer %u ...", wr->name, job->hw->buf_info.index);
assert(ER(type) != ENCODER_TYPE_UNKNOWN);
assert(src->used > 0);
@@ -218,7 +262,7 @@ int encoder_compress(encoder_s *enc, unsigned worker_number, frame_s *src, frame
# ifdef WITH_OMX
else if (ER(type) == ENCODER_TYPE_OMX) {
LOG_VERBOSE("Compressing buffer using OMX");
if (omx_encoder_compress(ER(omxs[worker_number]), src, dest) < 0) {
if (omx_encoder_compress(ER(omxs[wr->number]), src, dest) < 0) {
goto error;
}
}
@@ -229,17 +273,23 @@ int encoder_compress(encoder_s *enc, unsigned worker_number, frame_s *src, frame
}
dest->encode_end_ts = get_now_monotonic();
return 0;
LOG_VERBOSE("Compressed new JPEG: size=%zu, time=%0.3Lf, worker=%s, buffer=%u",
job->dest->used,
job->dest->encode_end_ts - job->dest->encode_begin_ts,
wr->name,
job->hw->buf_info.index);
return true;
# ifdef WITH_OMX
error:
LOG_INFO("Error while compressing buffer, falling back to CPU");
LOG_ERROR("Compression failed: worker=%s, buffer=%u", wr->name, job->hw->buf_info.index);
LOG_ERROR("Error while compressing buffer, falling back to CPU");
A_MUTEX_LOCK(&ER(mutex));
ER(cpu_forced) = true;
A_MUTEX_UNLOCK(&ER(mutex));
return -1;
return false;
# endif
}
#undef DR
#undef ER
# undef ER
}

View File

@@ -36,6 +36,7 @@
#include "../libs/frame.h"
#include "device.h"
#include "workers.h"
#include "encoders/cpu/encoder.h"
#include "encoders/hw/encoder.h"
@@ -69,8 +70,6 @@ typedef struct {
bool cpu_forced;
pthread_mutex_t mutex;
unsigned n_workers;
# ifdef WITH_OMX
unsigned n_omxs;
omx_encoder_s **omxs;
@@ -84,6 +83,13 @@ typedef struct {
encoder_runtime_s *run;
} encoder_s;
typedef struct {
encoder_s *enc;
hw_buffer_s *hw;
char *dest_role;
frame_s *dest;
} encoder_job_s;
encoder_s *encoder_init(void);
void encoder_destroy(encoder_s *enc);
@@ -91,7 +97,7 @@ void encoder_destroy(encoder_s *enc);
encoder_type_e encoder_parse_type(const char *str);
const char *encoder_type_to_string(encoder_type_e type);
void encoder_prepare(encoder_s *enc, device_s *dev);
workers_pool_s *encoder_workers_pool_init(encoder_s *enc, device_s *dev);
void encoder_get_runtime_params(encoder_s *enc, encoder_type_e *type, unsigned *quality);
int encoder_compress(encoder_s *enc, unsigned worker_number, frame_s *src, frame_s *dest);

View File

@@ -23,22 +23,10 @@
#include "stream.h"
typedef struct {
encoder_s *enc;
hw_buffer_s *hw;
char *dest_role;
frame_s *dest;
} _job_s;
static workers_pool_s *_stream_init_loop(stream_s *stream);
static workers_pool_s *_stream_init_one(stream_s *stream);
static bool _stream_expose_frame(stream_s *stream, frame_s *frame, unsigned captured_fps);
static void *_worker_job_init(worker_s *wr, void *v_enc);
static void _worker_job_destroy(void *v_job);
static bool _worker_run_job(worker_s *wr);
#ifdef WITH_OMX
static h264_stream_s *_h264_stream_init(memsink_s *sink, unsigned bitrate, unsigned gop);
static void _h264_stream_destroy(h264_stream_s *h264);
@@ -105,7 +93,7 @@ void stream_loop(stream_s *stream) {
LOG_DEBUG("Waiting for worker ...");
worker_s *ready_wr = workers_pool_wait(pool);
_job_s *ready_job = (_job_s *)(ready_wr->job);
encoder_job_s *ready_job = (encoder_job_s *)(ready_wr->job);
if (ready_job->hw) {
if (device_release_buffer(stream->dev, ready_job->hw) < 0) {
@@ -290,24 +278,7 @@ static workers_pool_s *_stream_init_one(stream_s *stream) {
if (device_switch_capturing(stream->dev, true) < 0) {
goto error;
}
encoder_prepare(stream->enc, stream->dev);
# define DEV(_next) stream->dev->_next
# define RUN(_next) stream->dev->run->_next
long double desired_interval = 0;
if (DEV(desired_fps) > 0 && (DEV(desired_fps) < RUN(hw_fps) || RUN(hw_fps) == 0)) {
desired_interval = (long double)1 / DEV(desired_fps);
}
# undef DEV
# undef RUN
return workers_pool_init(
"JPEG", "jw", stream->enc->run->n_workers, desired_interval,
_worker_job_init, (void *)stream->enc,
_worker_job_destroy,
_worker_run_job);
return encoder_workers_pool_init(stream->enc, stream->dev);
error:
device_close(stream->dev);
return NULL;
@@ -370,43 +341,6 @@ static bool _stream_expose_frame(stream_s *stream, frame_s *frame, unsigned capt
# undef VID
}
static void *_worker_job_init(worker_s *wr, void *v_enc) {
_job_s *job;
A_CALLOC(job, 1);
job->enc = (encoder_s *)v_enc;
const size_t dest_role_len = strlen(wr->name) + 16;
A_CALLOC(job->dest_role, dest_role_len);
snprintf(job->dest_role, dest_role_len, "%s_dest", wr->name);
job->dest = frame_init(job->dest_role);
return (void *)job;
}
static void _worker_job_destroy(void *v_job) {
_job_s *job = (_job_s *)v_job;
frame_destroy(job->dest);
free(job->dest_role);
free(job);
}
static bool _worker_run_job(worker_s *wr) {
_job_s *job = (_job_s *)wr->job;
LOG_DEBUG("Worker %s compressing JPEG from buffer %u ...", wr->name, job->hw->buf_info.index);
bool ok = !encoder_compress(job->enc, wr->number, &job->hw->raw, job->dest);
if (ok) {
LOG_VERBOSE("Compressed new JPEG: size=%zu, time=%0.3Lf, worker=%s, buffer=%u",
job->dest->used,
job->dest->encode_end_ts - job->dest->encode_begin_ts,
wr->name,
job->hw->buf_info.index);
} else {
LOG_VERBOSE("Compression failed: worker=%s, buffer=%u", wr->name, job->hw->buf_info.index);
}
return ok;
}
#ifdef WITH_OMX
static h264_stream_s *_h264_stream_init(memsink_s *sink, unsigned bitrate, unsigned gop) {
h264_stream_s *h264;