refactoring

This commit is contained in:
Devaev Maxim
2020-12-08 07:28:35 +03:00
parent 7bb0aae71e
commit 5c48faa832
13 changed files with 146 additions and 121 deletions

View File

@@ -25,6 +25,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <unistd.h>
#include <errno.h>
#include <math.h>
#include <time.h>
@@ -95,3 +96,11 @@ INLINE long double get_now_real(void) {
get_now(CLOCK_REALTIME, &sec, &msec);
return (long double)sec + ((long double)msec) / 1000;
}
INLINE unsigned get_cores_available(void) {
long cores_sysconf;
cores_sysconf = sysconf(_SC_NPROCESSORS_ONLN);
cores_sysconf = (cores_sysconf < 0 ? 0 : cores_sysconf);
return max_u(min_u(cores_sysconf, 4), 1);
}

View File

@@ -44,7 +44,6 @@
#include "../common/threading.h"
#include "xioctl.h"
#include "picture.h"
static const struct {
@@ -87,7 +86,6 @@ static int _device_open_io_method(struct device_t *dev);
static int _device_open_io_method_mmap(struct device_t *dev);
static int _device_open_io_method_userptr(struct device_t *dev);
static int _device_open_queue_buffers(struct device_t *dev);
static void _device_open_alloc_picbufs(struct device_t *dev);
static int _device_apply_resolution(struct device_t *dev, unsigned width, unsigned height);
static void _device_apply_controls(struct device_t *dev);
@@ -111,12 +109,6 @@ static const char *_io_method_to_string_supported(enum v4l2_memory io_method);
struct device_t *device_init(void) {
struct device_runtime_t *run;
struct device_t *dev;
long cores_sysconf;
unsigned cores_available;
cores_sysconf = sysconf(_SC_NPROCESSORS_ONLN);
cores_sysconf = (cores_sysconf < 0 ? 0 : cores_sysconf);
cores_available = max_u(min_u(cores_sysconf, 4), 1);
A_CALLOC(run, 1);
run->fd = -1;
@@ -128,15 +120,18 @@ struct device_t *device_init(void) {
dev->format = V4L2_PIX_FMT_YUYV;
dev->standard = V4L2_STD_UNKNOWN;
dev->io_method = V4L2_MEMORY_MMAP;
dev->n_buffers = cores_available + 1;
dev->n_workers = min_u(cores_available, dev->n_buffers);
dev->n_buffers = get_cores_available() + 1;
dev->min_frame_size = 128;
dev->timeout = 1;
dev->error_delay = 1; // XXX: not device param
# ifdef WITH_RAWSINK // XXX: not device param
// FIXME: Not device params
dev->error_delay = 1;
# ifdef WITH_RAWSINK
dev->rawsink_name = "";
dev->rawsink_mode = 0660;
# endif
// end-of-fixme
dev->run = run;
return dev;
}
@@ -196,11 +191,8 @@ int device_open(struct device_t *dev) {
if (_device_open_queue_buffers(dev) < 0) {
goto error;
}
_device_open_alloc_picbufs(dev);
_device_apply_controls(dev);
RUN(n_workers) = min_u(RUN(n_buffers), dev->n_workers);
LOG_DEBUG("Device fd=%d initialized", RUN(fd));
return 0;
@@ -211,16 +203,6 @@ int device_open(struct device_t *dev) {
void device_close(struct device_t *dev) {
RUN(persistent_timeout_reported) = false;
RUN(n_workers) = 0;
if (RUN(pictures)) {
LOG_DEBUG("Releasing picture buffers ...");
for (unsigned index = 0; index < RUN(n_buffers); ++index) {
picture_destroy(RUN(pictures[index]));
}
free(RUN(pictures));
RUN(pictures) = NULL;
}
if (RUN(hw_buffers)) {
LOG_DEBUG("Releasing device buffers ...");
@@ -373,7 +355,7 @@ int device_grab_buffer(struct device_t *dev) {
HW(used) = buf_info.bytesused;
memcpy(&HW(buf_info), &buf_info, sizeof(struct v4l2_buffer));
RUN(pictures)[buf_info.index]->grab_ts = get_now_monotonic();
HW(grab_ts) = get_now_monotonic();
# undef HW
return buf_info.index;
@@ -751,19 +733,6 @@ static int _device_open_queue_buffers(struct device_t *dev) {
return 0;
}
static void _device_open_alloc_picbufs(struct device_t *dev) {
size_t picture_size = picture_get_generous_size(RUN(width), RUN(height));
LOG_DEBUG("Allocating picture buffers ...");
A_CALLOC(RUN(pictures), RUN(n_buffers));
for (unsigned index = 0; index < RUN(n_buffers); ++index) {
RUN(pictures)[index] = picture_init();
LOG_DEBUG("Pre-allocating picture buffer %u sized %zu bytes... ", index, picture_size);
picture_realloc_data(RUN(pictures)[index], picture_size);
}
}
static int _device_apply_resolution(struct device_t *dev, unsigned width, unsigned height) {
// Тут VIDEO_MIN_* не используются из-за странностей минимального разрешения при отсутствии сигнала
// у некоторых устройств, например Auvidea B101

View File

@@ -32,8 +32,6 @@
# include "../rawsink/rawsink.h"
#endif
#include "picture.h"
#define VIDEO_MIN_WIDTH ((unsigned)160)
#define VIDEO_MAX_WIDTH ((unsigned)10240)
@@ -57,6 +55,8 @@ struct hw_buffer_t {
unsigned char *data;
size_t used;
size_t allocated;
long double grab_ts;
struct v4l2_buffer buf_info;
pthread_mutex_t grabbed_mutex;
@@ -71,9 +71,7 @@ struct device_runtime_t {
unsigned hw_fps;
size_t raw_size;
unsigned n_buffers;
unsigned n_workers;
struct hw_buffer_t *hw_buffers;
struct picture_t **pictures;
bool capturing;
bool persistent_timeout_reported;
};
@@ -115,17 +113,19 @@ struct device_t {
enum v4l2_memory io_method;
bool dv_timings;
unsigned n_buffers;
unsigned n_workers;
unsigned desired_fps;
size_t min_frame_size;
bool persistent;
unsigned timeout;
unsigned error_delay; // XXX: not device param
# ifdef WITH_RAWSINK // XXX: not device params
// FIXME: Not device params
unsigned error_delay;
# ifdef WITH_RAWSINK
char *rawsink_name;
mode_t rawsink_mode;
bool rawsink_rm;
# endif
// end-of-fixme
struct controls_t ctl;

View File

@@ -34,6 +34,7 @@
#include "../common/logging.h"
#include "device.h"
#include "picture.h"
#include "encoders/cpu/encoder.h"
#include "encoders/hw/encoder.h"
@@ -57,6 +58,10 @@ static const struct {
};
#define ER(_next) encoder->run->_next
#define DR(_next) dev->run->_next
struct encoder_t *encoder_init(void) {
struct encoder_runtime_t *run;
struct encoder_t *encoder;
@@ -69,22 +74,23 @@ struct encoder_t *encoder_init(void) {
A_CALLOC(encoder, 1);
encoder->type = run->type;
encoder->quality = run->quality;
encoder->n_workers = get_cores_available();
encoder->run = run;
return encoder;
}
void encoder_destroy(struct encoder_t *encoder) {
# ifdef WITH_OMX
if (encoder->run->omxs) {
for (unsigned index = 0; index < encoder->run->n_omxs; ++index) {
if (encoder->run->omxs[index]) {
omx_encoder_destroy(encoder->run->omxs[index]);
if (ER(omxs)) {
for (unsigned index = 0; index < ER(n_omxs); ++index) {
if (ER(omxs[index])) {
omx_encoder_destroy(ER(omxs[index]));
}
}
free(encoder->run->omxs);
free(ER(omxs));
}
# endif
A_MUTEX_DESTROY(&encoder->run->mutex);
A_MUTEX_DESTROY(&ER(mutex));
free(encoder->run);
free(encoder);
}
@@ -108,17 +114,19 @@ const char *encoder_type_to_string(enum encoder_type_t type) {
}
void encoder_prepare(struct encoder_t *encoder, struct device_t *dev) {
enum encoder_type_t type = (encoder->run->cpu_forced ? ENCODER_TYPE_CPU : encoder->type);
enum encoder_type_t type = (ER(cpu_forced) ? ENCODER_TYPE_CPU : encoder->type);
unsigned quality = encoder->quality;
bool cpu_forced = false;
if ((dev->run->format == V4L2_PIX_FMT_MJPEG || dev->run->format == V4L2_PIX_FMT_JPEG) && type != ENCODER_TYPE_HW) {
ER(n_workers) = min_u(encoder->n_workers, DR(n_buffers));
if ((DR(format) == V4L2_PIX_FMT_MJPEG || DR(format) == V4L2_PIX_FMT_JPEG) && type != ENCODER_TYPE_HW) {
LOG_INFO("Switching to HW encoder because the input format is (M)JPEG");
type = ENCODER_TYPE_HW;
}
if (type == ENCODER_TYPE_HW) {
if (dev->run->format != V4L2_PIX_FMT_MJPEG && dev->run->format != V4L2_PIX_FMT_JPEG) {
if (DR(format) != V4L2_PIX_FMT_MJPEG && DR(format) != V4L2_PIX_FMT_JPEG) {
LOG_INFO("Switching to CPU encoder because the input format is not (M)JPEG");
goto use_cpu;
}
@@ -127,42 +135,42 @@ void encoder_prepare(struct encoder_t *encoder, struct device_t *dev) {
quality = 0;
}
dev->run->n_workers = 1;
ER(n_workers) = 1;
}
# ifdef WITH_OMX
else if (type == ENCODER_TYPE_OMX) {
for (unsigned index = 0; index < encoder->n_glitched_resolutions; ++index) {
if (
encoder->glitched_resolutions[index][0] == dev->run->width
&& encoder->glitched_resolutions[index][1] == dev->run->height
encoder->glitched_resolutions[index][0] == DR(width)
&& encoder->glitched_resolutions[index][1] == DR(height)
) {
LOG_INFO("Switching to CPU encoder the resolution %ux%u marked as glitchy for OMX",
dev->run->width, dev->run->height);
DR(width), DR(height));
goto use_cpu;
}
}
LOG_DEBUG("Preparing OMX encoder ...");
if (dev->run->n_workers > OMX_MAX_ENCODERS) {
if (ER(n_workers) > OMX_MAX_ENCODERS) {
LOG_INFO("OMX encoder sets limit for worker threads: %u", OMX_MAX_ENCODERS);
dev->run->n_workers = OMX_MAX_ENCODERS;
ER(n_workers) = OMX_MAX_ENCODERS;
}
if (encoder->run->omxs == NULL) {
A_CALLOC(encoder->run->omxs, OMX_MAX_ENCODERS);
if (ER(omxs) == NULL) {
A_CALLOC(ER(omxs), OMX_MAX_ENCODERS);
}
// Начинаем с нуля и доинициализируем на следующих заходах при необходимости
for (; encoder->run->n_omxs < dev->run->n_workers; ++encoder->run->n_omxs) {
if ((encoder->run->omxs[encoder->run->n_omxs] = omx_encoder_init()) == NULL) {
for (; ER(n_omxs) < ER(n_workers); ++ER(n_omxs)) {
if ((ER(omxs[ER(n_omxs)]) = omx_encoder_init()) == NULL) {
LOG_ERROR("Can't initialize OMX encoder, falling back to CPU");
goto force_cpu;
}
}
for (unsigned index = 0; index < encoder->run->n_omxs; ++index) {
if (omx_encoder_prepare(encoder->run->omxs[index], dev, quality) < 0) {
for (unsigned index = 0; index < ER(n_omxs); ++index) {
if (omx_encoder_prepare(ER(omxs[index]), dev, quality) < 0) {
LOG_ERROR("Can't prepare OMX encoder, falling back to CPU");
goto force_cpu;
}
@@ -194,51 +202,54 @@ void encoder_prepare(struct encoder_t *encoder, struct device_t *dev) {
LOG_INFO("Using JPEG quality: %u%%", quality);
}
A_MUTEX_LOCK(&encoder->run->mutex);
encoder->run->type = type;
encoder->run->quality = quality;
A_MUTEX_LOCK(&ER(mutex));
ER(type) = type;
ER(quality) = quality;
if (cpu_forced) {
encoder->run->cpu_forced = true;
ER(cpu_forced) = true;
}
A_MUTEX_UNLOCK(&encoder->run->mutex);
A_MUTEX_UNLOCK(&ER(mutex));
}
#pragma GCC diagnostic ignored "-Wunused-parameter"
#pragma GCC diagnostic push
int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, unsigned worker_number, unsigned buf_index) {
int encoder_compress_buffer(
struct encoder_t *encoder, struct device_t *dev,
unsigned worker_number, unsigned buf_index,
struct picture_t *picture) {
#pragma GCC diagnostic pop
assert(encoder->run->type != ENCODER_TYPE_UNKNOWN);
assert(dev->run->hw_buffers[buf_index].used > 0);
assert(ER(type) != ENCODER_TYPE_UNKNOWN);
assert(DR(hw_buffers[buf_index].used) > 0);
dev->run->pictures[buf_index]->encode_begin_ts = get_now_monotonic();
picture->grab_ts = DR(hw_buffers[buf_index].grab_ts);
picture->encode_begin_ts = get_now_monotonic();
if (encoder->run->type == ENCODER_TYPE_CPU) {
if (ER(type) == ENCODER_TYPE_CPU) {
LOG_VERBOSE("Compressing buffer %u using CPU", buf_index);
cpu_encoder_compress_buffer(dev, buf_index, encoder->run->quality);
} else if (encoder->run->type == ENCODER_TYPE_HW) {
cpu_encoder_compress_buffer(dev, buf_index, ER(quality), picture);
} else if (ER(type) == ENCODER_TYPE_HW) {
LOG_VERBOSE("Compressing buffer %u using HW (just copying)", buf_index);
hw_encoder_compress_buffer(dev, buf_index);
hw_encoder_compress_buffer(dev, buf_index, picture);
}
# ifdef WITH_OMX
else if (encoder->run->type == ENCODER_TYPE_OMX) {
else if (ER(type) == ENCODER_TYPE_OMX) {
LOG_VERBOSE("Compressing buffer %u using OMX", buf_index);
if (omx_encoder_compress_buffer(encoder->run->omxs[worker_number], dev, buf_index) < 0) {
if (omx_encoder_compress_buffer(ER(omxs[worker_number]), dev, buf_index, picture) < 0) {
goto error;
}
}
# endif
# ifdef WITH_RAWSINK
else if (encoder->run->type == ENCODER_TYPE_NOOP) {
else if (ER(type) == ENCODER_TYPE_NOOP) {
LOG_VERBOSE("Compressing buffer %u using NOOP (do nothing)", buf_index);
}
# endif
picture->encode_end_ts = get_now_monotonic();
dev->run->pictures[buf_index]->encode_end_ts = get_now_monotonic();
dev->run->pictures[buf_index]->width = dev->run->width;
dev->run->pictures[buf_index]->height = dev->run->height;
picture->width = DR(width);
picture->height = DR(height);
return 0;
@@ -247,9 +258,12 @@ int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, uns
// cppcheck-suppress unusedLabel
error:
LOG_INFO("Error while compressing buffer, falling back to CPU");
A_MUTEX_LOCK(&encoder->run->mutex);
encoder->run->cpu_forced = true;
A_MUTEX_UNLOCK(&encoder->run->mutex);
A_MUTEX_LOCK(&ER(mutex));
ER(cpu_forced) = true;
A_MUTEX_UNLOCK(&ER(mutex));
return -1;
# pragma GCC diagnostic pop
}
#undef DR
#undef ER

View File

@@ -27,6 +27,7 @@
#include <pthread.h>
#include "device.h"
#include "picture.h"
#ifdef WITH_OMX
# include "encoders/omx/encoder.h"
@@ -71,6 +72,8 @@ struct encoder_runtime_t {
bool cpu_forced;
pthread_mutex_t mutex;
unsigned n_workers;
# ifdef WITH_OMX
unsigned n_omxs;
struct omx_encoder_t **omxs;
@@ -80,6 +83,7 @@ struct encoder_runtime_t {
struct encoder_t {
enum encoder_type_t type;
unsigned quality;
unsigned n_workers;
# ifdef WITH_OMX
unsigned n_glitched_resolutions;
unsigned glitched_resolutions[2][MAX_GLITCHED_RESOLUTIONS];
@@ -96,4 +100,8 @@ enum encoder_type_t encoder_parse_type(const char *str);
const char *encoder_type_to_string(enum encoder_type_t type);
void encoder_prepare(struct encoder_t *encoder, struct device_t *dev);
int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, unsigned worker_number, unsigned buf_index);
int encoder_compress_buffer(
struct encoder_t *encoder, struct device_t *dev,
unsigned worker_number, unsigned buf_index,
struct picture_t *picture);

View File

@@ -70,7 +70,10 @@ static boolean _jpeg_empty_output_buffer(j_compress_ptr jpeg);
static void _jpeg_term_destination(j_compress_ptr jpeg);
void cpu_encoder_compress_buffer(struct device_t *dev, unsigned index, unsigned quality) {
void cpu_encoder_compress_buffer(
struct device_t *dev, unsigned index, unsigned quality,
struct picture_t *picture) {
// This function based on compress_image_to_jpeg() from mjpg-streamer
struct jpeg_compress_struct jpeg;
@@ -79,7 +82,7 @@ void cpu_encoder_compress_buffer(struct device_t *dev, unsigned index, unsigned
jpeg.err = jpeg_std_error(&jpeg_error);
jpeg_create_compress(&jpeg);
_jpeg_set_picture(&jpeg, dev->run->pictures[index]);
_jpeg_set_picture(&jpeg, picture);
jpeg.image_width = dev->run->width;
jpeg.image_height = dev->run->height;
@@ -108,7 +111,7 @@ void cpu_encoder_compress_buffer(struct device_t *dev, unsigned index, unsigned
jpeg_finish_compress(&jpeg);
jpeg_destroy_compress(&jpeg);
assert(dev->run->pictures[index]->used > 0);
assert(picture->used > 0);
}
static void _jpeg_set_picture(j_compress_ptr jpeg, struct picture_t *picture) {

View File

@@ -23,6 +23,9 @@
#pragma once
#include "../../device.h"
#include "../../picture.h"
void cpu_encoder_compress_buffer(struct device_t *dev, unsigned index, unsigned quality);
void cpu_encoder_compress_buffer(
struct device_t *dev, unsigned index, unsigned quality,
struct picture_t *picture);

View File

@@ -63,11 +63,11 @@ int hw_encoder_prepare(struct device_t *dev, unsigned quality) {
return 0;
}
void hw_encoder_compress_buffer(struct device_t *dev, unsigned index) {
void hw_encoder_compress_buffer(struct device_t *dev, unsigned index, struct picture_t *picture) {
if (dev->run->format != V4L2_PIX_FMT_MJPEG && dev->run->format != V4L2_PIX_FMT_JPEG) {
assert(0 && "Unsupported input format for HW encoder");
}
_copy_plus_huffman(&dev->run->hw_buffers[index], dev->run->pictures[index]);
_copy_plus_huffman(&dev->run->hw_buffers[index], picture);
}
void _copy_plus_huffman(const struct hw_buffer_t *src, struct picture_t *dest) {

View File

@@ -23,7 +23,8 @@
#pragma once
#include "../../device.h"
#include "../../picture.h"
int hw_encoder_prepare(struct device_t *dev, unsigned quality);
void hw_encoder_compress_buffer(struct device_t *dev, unsigned index);
void hw_encoder_compress_buffer(struct device_t *dev, unsigned index, struct picture_t *picture);

View File

@@ -38,8 +38,8 @@
#include "../../../common/logging.h"
#include "../../../common/tools.h"
#include "../../picture.h"
#include "../../device.h"
#include "../../picture.h"
#include "formatters.h"
#include "component.h"
@@ -176,7 +176,10 @@ int omx_encoder_prepare(struct omx_encoder_t *omx, struct device_t *dev, unsigne
return 0;
}
int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, unsigned index) {
int omx_encoder_compress_buffer(
struct omx_encoder_t *omx, struct device_t *dev, unsigned index,
struct picture_t *picture) {
# define HW(_next) dev->run->hw_buffers[index]._next
# define IN(_next) omx->input_buffer->_next
# define OUT(_next) omx->output_buffer->_next
@@ -190,7 +193,7 @@ int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev,
return -1;
}
dev->run->pictures[index]->used = 0;
picture->used = 0;
omx->output_available = false;
omx->input_required = true;
@@ -202,7 +205,7 @@ int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev,
if (omx->output_available) {
omx->output_available = false;
picture_append_data(dev->run->pictures[index], OUT(pBuffer) + OUT(nOffset), OUT(nFilledLen));
picture_append_data(picture, OUT(pBuffer) + OUT(nOffset), OUT(nFilledLen));
if (OUT(nFlags) & OMX_BUFFERFLAG_ENDOFFRAME) {
OUT(nFlags) = 0;

View File

@@ -28,6 +28,7 @@
#include <interface/vcos/vcos_semaphore.h>
#include "../../device.h"
#include "../../picture.h"
#ifndef CFG_OMX_MAX_ENCODERS
# define CFG_OMX_MAX_ENCODERS 3 // Raspberry Pi limitation
@@ -55,4 +56,7 @@ struct omx_encoder_t *omx_encoder_init(void);
void omx_encoder_destroy(struct omx_encoder_t *omx);
int omx_encoder_prepare(struct omx_encoder_t *omx, struct device_t *dev, unsigned quality);
int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, unsigned index);
int omx_encoder_compress_buffer(
struct omx_encoder_t *omx, struct device_t *dev, unsigned index,
struct picture_t *picture);

View File

@@ -360,7 +360,7 @@ int options_parse(struct options_t *options, struct device_t *dev, struct encode
case _O_PERSISTENT: OPT_SET(dev->persistent, true);
case _O_DV_TIMINGS: OPT_SET(dev->dv_timings, true);
case _O_BUFFERS: OPT_NUMBER("--buffers", dev->n_buffers, 1, 32, 0);
case _O_WORKERS: OPT_NUMBER("--workers", dev->n_workers, 1, 32, 0);
case _O_WORKERS: OPT_NUMBER("--workers", encoder->n_workers, 1, 32, 0);
case _O_QUALITY: OPT_NUMBER("--quality", encoder->quality, 1, 100, 0);
case _O_ENCODER: OPT_PARSE("encoder type", encoder->type, encoder_parse_type, ENCODER_TYPE_UNKNOWN, ENCODER_TYPES_STR);
# ifdef WITH_OMX
@@ -610,7 +610,7 @@ static void _help(struct device_t *dev, struct encoder_t *encoder, struct http_s
printf(" Each buffer may processed using an independent thread.\n");
printf(" Default: %u (the number of CPU cores (but not more than 4) + 1).\n\n", dev->n_buffers);
printf(" -w|--workers <N> ──────────────────── The number of worker threads but not more than buffers.\n");
printf(" Default: %u (the number of CPU cores (but not more than 4)).\n\n", dev->n_workers);
printf(" Default: %u (the number of CPU cores (but not more than 4)).\n\n", encoder->n_workers);
printf(" -q|--quality <N> ──────────────────── Set quality of JPEG encoding from 1 to 100 (best). Default: %u.\n", encoder->quality);
printf(" Note: If HW encoding is used (JPEG source format selected),\n");
printf(" this parameter attempts to configure the camera\n");

View File

@@ -53,6 +53,7 @@ struct _worker_t {
atomic_bool *workers_stop;
long double last_comp_time;
struct picture_t *picture;
pthread_mutex_t has_job_mutex;
unsigned buf_index;
@@ -69,8 +70,7 @@ struct _worker_t {
struct _worker_t *order_prev;
struct _worker_t *order_next;
struct device_t *dev;
struct encoder_t *encoder;
struct stream_t *stream;
};
struct _workers_pool_t {
@@ -93,7 +93,7 @@ struct _workers_pool_t {
static struct _workers_pool_t *_stream_init_loop(struct stream_t *stream);
static struct _workers_pool_t *_stream_init_one(struct stream_t *stream);
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index, unsigned captured_fps);
static void _stream_expose_picture(struct stream_t *stream, struct picture_t *picture, unsigned captured_fps);
static struct _workers_pool_t *_workers_pool_init(struct stream_t *stream);
static void _workers_pool_destroy(struct _workers_pool_t *pool);
@@ -166,7 +166,7 @@ void stream_loop(struct stream_t *stream) {
if (!ready_wr->job_failed) {
if (ready_wr->job_timely) {
_stream_expose_picture(stream, ready_wr->buf_index, captured_fps);
_stream_expose_picture(stream, ready_wr->picture, captured_fps);
LOG_PERF("##### Encoded picture exposed; worker=%u", ready_wr->number);
} else {
LOG_PERF("----- Encoded picture dropped; worker=%u", ready_wr->number);
@@ -242,7 +242,7 @@ void stream_loop(struct stream_t *stream) {
DEV(run->format),
DEV(run->width),
DEV(run->height),
DEV(run->pictures[buf_index]->grab_ts)
DEV(run->hw_buffers[buf_index].grab_ts)
);
}
# endif
@@ -346,10 +346,10 @@ static struct _workers_pool_t *_stream_init_one(struct stream_t *stream) {
return NULL;
}
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index, unsigned captured_fps) {
static void _stream_expose_picture(struct stream_t *stream, struct picture_t *picture, unsigned captured_fps) {
A_MUTEX_LOCK(&stream->mutex);
picture_copy(stream->dev->run->pictures[buf_index], stream->picture);
picture_copy(picture, stream->picture);
stream->online = true;
stream->captured_fps = captured_fps;
@@ -359,16 +359,17 @@ static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index,
}
static struct _workers_pool_t *_workers_pool_init(struct stream_t *stream) {
struct _workers_pool_t *pool;
# define DEV(_next) stream->dev->_next
# define RUN(_next) stream->dev->run->_next
LOG_INFO("Creating pool with %u workers ...", RUN(n_workers));
struct _workers_pool_t *pool;
size_t picture_size = picture_get_generous_size(RUN(width), RUN(height));
LOG_INFO("Creating pool with %u workers ...", stream->encoder->run->n_workers);
A_CALLOC(pool, 1);
pool->n_workers = RUN(n_workers);
pool->n_workers = stream->encoder->run->n_workers;
A_CALLOC(pool->workers, pool->n_workers);
A_MUTEX_INIT(&pool->free_workers_mutex);
@@ -386,6 +387,9 @@ static struct _workers_pool_t *_workers_pool_init(struct stream_t *stream) {
for (unsigned number = 0; number < pool->n_workers; ++number) {
# define WR(_next) pool->workers[number]._next
WR(picture) = picture_init();
picture_realloc_data(WR(picture), picture_size);
A_MUTEX_INIT(&WR(has_job_mutex));
atomic_init(&WR(has_job), false);
A_COND_INIT(&WR(has_job_cond));
@@ -398,8 +402,7 @@ static struct _workers_pool_t *_workers_pool_init(struct stream_t *stream) {
WR(free_workers) = &pool->free_workers;
WR(free_workers_cond) = &pool->free_workers_cond;
WR(dev) = stream->dev;
WR(encoder) = stream->encoder;
WR(stream) = stream;
A_THREAD_CREATE(&WR(tid), _worker_thread, (void *)&(pool->workers[number]));
@@ -426,6 +429,8 @@ static void _workers_pool_destroy(struct _workers_pool_t *pool) {
A_MUTEX_DESTROY(&WR(has_job_mutex));
A_COND_DESTROY(&WR(has_job_cond));
picture_destroy(WR(picture));
# undef WR
}
@@ -450,13 +455,19 @@ static void *_worker_thread(void *v_worker) {
A_MUTEX_UNLOCK(&wr->has_job_mutex);
if (!atomic_load(wr->workers_stop)) {
# define PIC(_next) wr->dev->run->pictures[wr->buf_index]->_next
# define PIC(_next) wr->picture->_next
LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", wr->number, wr->buf_index);
wr->job_failed = (bool)encoder_compress_buffer(wr->encoder, wr->dev, wr->number, wr->buf_index);
wr->job_failed = (bool)encoder_compress_buffer(
wr->stream->encoder,
wr->stream->dev,
wr->number,
wr->buf_index,
wr->picture
);
if (device_release_buffer(wr->dev, wr->buf_index) == 0) {
if (device_release_buffer(wr->stream->dev, wr->buf_index) == 0) {
if (!wr->job_failed) {
wr->job_start_ts = PIC(encode_begin_ts);
wr->last_comp_time = PIC(encode_end_ts) - wr->job_start_ts;