refactoring

This commit is contained in:
Maxim Devaev 2024-03-01 08:11:37 +02:00
parent 8f0acb2176
commit e30520d9f3
3 changed files with 57 additions and 48 deletions

View File

@ -91,7 +91,9 @@ const char *us_encoder_type_to_string(us_encoder_type_e type) {
return _ENCODER_TYPES[0].name;
}
us_workers_pool_s *us_encoder_workers_pool_init(us_encoder_s *enc, us_device_s *dev) {
void us_encoder_open(us_encoder_s *enc, us_device_s *dev) {
assert(enc->run->pool == NULL);
# define DR(x_next) dev->run->x_next
us_encoder_type_e type = (_ER(cpu_forced) ? US_ENCODER_TYPE_CPU : enc->type);
@ -162,7 +164,7 @@ us_workers_pool_s *us_encoder_workers_pool_init(us_encoder_s *enc, us_device_s *
: 0
);
return us_workers_pool_init(
enc->run->pool = us_workers_pool_init(
"JPEG", "jw", n_workers, desired_interval,
_worker_job_init, (void *)enc,
_worker_job_destroy,
@ -171,6 +173,11 @@ us_workers_pool_s *us_encoder_workers_pool_init(us_encoder_s *enc, us_device_s *
# undef DR
}
void us_encoder_close(us_encoder_s *enc) {
assert(enc->run->pool != NULL);
US_DELETE(enc->run->pool, us_workers_pool_destroy);
}
void us_encoder_get_runtime_params(us_encoder_s *enc, us_encoder_type_e *type, unsigned *quality) {
US_MUTEX_LOCK(_ER(mutex));
*type = _ER(type);

View File

@ -63,6 +63,8 @@ typedef struct {
unsigned n_m2ms;
us_m2m_encoder_s **m2ms;
us_workers_pool_s *pool;
} us_encoder_runtime_s;
typedef struct {
@ -86,7 +88,7 @@ void us_encoder_destroy(us_encoder_s *enc);
us_encoder_type_e us_encoder_parse_type(const char *str);
const char *us_encoder_type_to_string(us_encoder_type_e type);
us_workers_pool_s *us_encoder_workers_pool_init(us_encoder_s *enc, us_device_s *dev);
void us_encoder_get_runtime_params(us_encoder_s *enc, us_encoder_type_e *type, unsigned *quality);
void us_encoder_open(us_encoder_s *enc, us_device_s *dev);
void us_encoder_close(us_encoder_s *enc);
int us_encoder_compress(us_encoder_s *enc, unsigned worker_number, us_frame_s *src, us_frame_s *dest);
void us_encoder_get_runtime_params(us_encoder_s *enc, us_encoder_type_e *type, unsigned *quality);

View File

@ -23,7 +23,8 @@
#include "stream.h"
static us_workers_pool_s *_stream_init_loop(us_stream_s *stream);
static bool _stream_slowdown(us_stream_s *stream);
static int _stream_init_loop(us_stream_s *stream);
static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame);
@ -79,7 +80,7 @@ void us_stream_loop(us_stream_s *stream) {
_RUN(h264) = us_h264_stream_init(stream->h264_sink, stream->h264_m2m_path, stream->h264_bitrate, stream->h264_gop);
}
for (us_workers_pool_s *pool; (pool = _stream_init_loop(stream)) != NULL;) {
while (!_stream_init_loop(stream)) {
long double grab_after = 0;
unsigned fluency_passed = 0;
unsigned captured_fps_accum = 0;
@ -91,37 +92,25 @@ void us_stream_loop(us_stream_s *stream) {
US_SEP_DEBUG('-');
US_LOG_DEBUG("Waiting for worker ...");
us_worker_s *const ready_wr = us_workers_pool_wait(pool);
us_worker_s *const ready_wr = us_workers_pool_wait(stream->enc->run->pool);
us_encoder_job_s *const ready_job = (us_encoder_job_s *)(ready_wr->job);
if (ready_job->hw != NULL) {
if (us_device_release_buffer(stream->dev, ready_job->hw) < 0) {
ready_wr->job_failed = true;
}
ready_job->hw = NULL;
if (!ready_wr->job_failed) {
if (ready_wr->job_timely) {
_stream_expose_frame(stream, ready_job->dest);
US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf",
ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts);
} else {
US_LOG_PERF("----- Encoded JPEG dropped; worker=%s", ready_wr->name);
}
} else {
break;
}
}
bool h264_force_key = false;
if (stream->slowdown) {
unsigned slc = 0;
for (; slc < 10 && !atomic_load(&_RUN(stop)) && !us_stream_has_clients(stream); ++slc) {
usleep(100000);
if (ready_wr->job_failed) {
// pass
} else if (ready_wr->job_timely) {
_stream_expose_frame(stream, ready_job->dest);
US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf",
ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts);
} else {
US_LOG_PERF("----- Encoded JPEG dropped; worker=%s", ready_wr->name);
}
h264_force_key = (slc == 10);
}
const bool h264_force_key = _stream_slowdown(stream);
if (atomic_load(&_RUN(stop))) {
break;
}
@ -170,12 +159,12 @@ void us_stream_loop(us_stream_s *stream) {
}
captured_fps_accum += 1;
const long double fluency_delay = us_workers_pool_get_fluency_delay(pool, ready_wr);
const long double fluency_delay = us_workers_pool_get_fluency_delay(stream->enc->run->pool, ready_wr);
grab_after = now + fluency_delay;
US_LOG_VERBOSE("Fluency: delay=%.03Lf, grab_after=%.03Lf", fluency_delay, grab_after);
ready_job->hw = hw;
us_workers_pool_assign(pool, ready_wr);
us_workers_pool_assign(stream->enc->run->pool, ready_wr);
US_LOG_DEBUG("Assigned new frame in buffer=%d to worker=%s", buf_index, ready_wr->name);
_SINK_PUT(raw_sink, &hw->raw);
@ -192,7 +181,7 @@ void us_stream_loop(us_stream_s *stream) {
}
}
us_workers_pool_destroy(pool);
us_encoder_close(stream->enc);
us_device_close(stream->dev);
# ifdef WITH_GPIO
@ -216,12 +205,35 @@ bool us_stream_has_clients(us_stream_s *stream) {
);
}
static us_workers_pool_s *_stream_init_loop(us_stream_s *stream) {
static bool _stream_slowdown(us_stream_s *stream) {
if (stream->slowdown) {
unsigned count = 0;
while (count < 10 && !atomic_load(&_RUN(stop)) && !us_stream_has_clients(stream)) {
usleep(100000);
++count;
}
return (count >= 10);
}
return false;
}
static int _stream_init_loop(us_stream_s *stream) {
int access_errno = 0;
while (!atomic_load(&_RUN(stop))) {
unsigned width = stream->dev->run->width;
unsigned height = stream->dev->run->height;
if (width == 0 || height == 0) {
width = stream->dev->width;
height = stream->dev->height;
}
us_blank_draw(stream->run->blank, "< NO SIGNAL >", width, height);
atomic_store(&stream->run->captured_fps, 0);
_stream_expose_frame(stream, NULL);
_SINK_PUT(raw_sink, stream->run->blank->raw);
_H264_PUT(stream->run->blank->raw, false);
if (access(stream->dev->path, R_OK|W_OK) < 0) {
if (access_errno != errno) {
US_SEP_INFO('=');
@ -241,14 +253,15 @@ static us_workers_pool_s *_stream_init_loop(us_stream_s *stream) {
|| _RUN(h264) != NULL
);
if (us_device_open(stream->dev) == 0) {
return us_encoder_workers_pool_init(stream->enc, stream->dev);
us_encoder_open(stream->enc, stream->dev);
return 0;
}
US_LOG_INFO("Sleeping %u seconds before new stream init ...", stream->error_delay);
sleep_and_retry:
sleep(stream->error_delay);
}
return NULL;
return -1;
}
static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame) {
@ -263,14 +276,6 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame) {
US_LOG_DEBUG("Exposed ALIVE video frame");
} else {
unsigned width = stream->dev->run->width;
unsigned height = stream->dev->run->height;
if (width == 0 || height == 0) {
width = stream->dev->width;
height = stream->dev->height;
}
us_blank_draw(blank, "< NO SIGNAL >", width, height);
if (run->last_online) { // Если переходим из online в offline
if (stream->last_as_blank < 0) { // Если last_as_blank выключен, просто покажем старую картинку
new = blank->jpeg;
@ -320,9 +325,4 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame) {
us_ring_producer_release(run->http_jpeg_ring, ri);
_SINK_PUT(sink, (frame != NULL ? frame : blank->jpeg));
if (frame == NULL) {
_SINK_PUT(raw_sink, blank->raw);
_H264_PUT(blank->raw, false);
}
}