From c2b82312ec6f842f83eaef9f67656195b0c31002 Mon Sep 17 00:00:00 2001 From: Devaev Maxim Date: Mon, 24 Sep 2018 11:17:39 +0300 Subject: [PATCH] omx prototype --- Makefile | 9 +++- src/device.c | 2 + src/device.h | 1 + src/encoder.c | 105 ++++++++++++++++++++++++++++++++++++++++++++ src/encoder.h | 53 ++++++++++++++++++++++ src/jpeg/encoder.c | 3 +- src/jpeg/encoder.h | 2 +- src/main.c | 6 ++- src/omx/component.c | 2 +- src/omx/encoder.c | 15 ++++--- src/omx/encoder.h | 4 +- src/stream.c | 26 +++++++---- src/stream.h | 18 +++++--- 13 files changed, 217 insertions(+), 29 deletions(-) create mode 100644 src/encoder.c create mode 100644 src/encoder.h diff --git a/Makefile b/Makefile index e031979..6e4597e 100644 --- a/Makefile +++ b/Makefile @@ -13,6 +13,13 @@ PROG = ustreamer all: $(SOURCES) $(PROG) +rpi: + make all \ + SOURCES="$(SOURCES) $(shell ls src/omx/*.c)" \ + LIBS="$(LIBS) -lbcm_host -lvcos -lopenmaxil -L/opt/vc/lib" \ + CFLAGS="$(CFLAGS) -DOMX_ENCODER -DOMX_SKIP64BIT -I/opt/vc/include" + + install: $(PROG) install -Dm755 $(PROG) $(DESTDIR)$(PREFIX)/bin/$(PROG) @@ -30,4 +37,4 @@ $(PROG): $(OBJECTS) clean: - rm -f src/*.o vgcore.* $(PROG) + rm -f src/*.o src/{jpeg,omx}/*.o vgcore.* $(PROG) diff --git a/src/device.c b/src/device.c index d60b001..a262728 100644 --- a/src/device.c +++ b/src/device.c @@ -138,6 +138,8 @@ int device_open(struct device_t *dev) { } _device_open_alloc_picbufs(dev); + dev->run->n_workers = dev->n_workers; + LOG_DEBUG("Device fd=%d initialized", dev->run->fd); return 0; diff --git a/src/device.h b/src/device.h index 86e4777..a43767a 100644 --- a/src/device.h +++ b/src/device.h @@ -49,6 +49,7 @@ struct device_runtime_t { unsigned height; unsigned format; unsigned n_buffers; + unsigned n_workers; struct hw_buffer_t *hw_buffers; struct picture_t *pictures; unsigned long max_picture_size; diff --git a/src/encoder.c b/src/encoder.c new file mode 100644 index 0000000..df8a765 --- /dev/null +++ b/src/encoder.c @@ -0,0 +1,105 @@ +/***************************************************************************** +# uStreamer - Lightweight and fast MJPG-HTTP streamer. # +# # +# Copyright (C) 2018 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#include "tools.h" +#include "device.h" +#include "encoder.h" + +#include "jpeg/encoder.h" + +#ifdef OMX_ENCODER +# include "omx/encoder.h" +#endif + +struct encoder_t *encoder_init(enum encoder_type_t type) { + struct encoder_t *encoder; + + A_CALLOC(encoder, 1); + encoder->type = type; + +# ifdef OMX_ENCODER + if (type == ENCODER_TYPE_OMX) { + if ((encoder->omx = omx_encoder_init()) == NULL) { + goto use_fallback; + } + } +# endif + + return encoder; + +# pragma GCC diagnostic ignored "-Wunused-label" +# pragma GCC diagnostic push + use_fallback: + LOG_ERROR("Can't initialize selected encoder, using CPU instead it"); + encoder->type = ENCODER_TYPE_CPU; + return encoder; +# pragma GCC diagnostic pop +} + +void encoder_destroy(struct encoder_t *encoder) { +# ifdef OMX_ENCODER + if (encoder->omx) { + omx_encoder_destroy(encoder->omx); + } +# endif + free(encoder); +} + +#pragma GCC diagnostic ignored "-Wunused-parameter" +#pragma GCC diagnostic push +void encoder_prepare(struct encoder_t *encoder, struct device_t *dev) { +#pragma GCC diagnostic pop +# ifdef OMX_ENCODER + if (encoder->type == ENCODER_TYPE_OMX) { + if (omx_encoder_prepare(encoder->omx, dev) < 0) { + goto use_fallback; + } + } + if (dev->run->n_workers > 1) { + LOG_INFO("OMX encoder can only work with one worker thread; forcing n_workers to 1"); + dev->run->n_workers = 1; + } +# endif + + return; + +# pragma GCC diagnostic ignored "-Wunused-label" +# pragma GCC diagnostic push + use_fallback: + LOG_ERROR("Can't prepare selected encoder, falling back to CPU"); + encoder->type = ENCODER_TYPE_CPU; +# pragma GCC diagnostic pop +} + +void encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, int index) { +# ifdef OMX_ENCODER + if (encoder->type == ENCODER_TYPE_OMX) { + if (omx_encoder_compress_buffer(encoder->omx, dev, index) < 0) { + LOG_INFO("OMX compressor error, falling back to CPU method"); + encoder->type = ENCODER_TYPE_CPU; + } + } +# endif + + if (encoder->type == ENCODER_TYPE_CPU) { + jpeg_encoder_compress_buffer(dev, index); + } +} diff --git a/src/encoder.h b/src/encoder.h new file mode 100644 index 0000000..c8f2ec9 --- /dev/null +++ b/src/encoder.h @@ -0,0 +1,53 @@ +/***************************************************************************** +# uStreamer - Lightweight and fast MJPG-HTTP streamer. # +# # +# Copyright (C) 2018 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#pragma once + +#include "tools.h" +#include "device.h" + +#ifdef OMX_ENCODER +# include "omx/encoder.h" +#endif + + +enum encoder_type_t { + ENCODER_TYPE_CPU, + +#ifdef OMX_ENCODER + ENCODER_TYPE_OMX, +#endif +}; + +struct encoder_t { + enum encoder_type_t type; + +#ifdef OMX_ENCODER + struct omx_encoder_t *omx; +#endif +}; + + +struct encoder_t *encoder_init(enum encoder_type_t type); +void encoder_destroy(struct encoder_t *encoder); + +void encoder_prepare(struct encoder_t *encoder, struct device_t *dev); +void encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, int index); diff --git a/src/jpeg/encoder.c b/src/jpeg/encoder.c index b9372c5..adc2351 100644 --- a/src/jpeg/encoder.c +++ b/src/jpeg/encoder.c @@ -68,7 +68,7 @@ static boolean _jpeg_empty_output_buffer(j_compress_ptr jpeg); static void _jpeg_term_destination(j_compress_ptr jpeg); -int jpeg_compress_buffer(struct device_t *dev, int index) { +void jpeg_encoder_compress_buffer(struct device_t *dev, int index) { // This function based on compress_image_to_jpeg() from mjpg-streamer struct jpeg_compress_struct jpeg; @@ -110,7 +110,6 @@ int jpeg_compress_buffer(struct device_t *dev, int index) { jpeg_destroy_compress(&jpeg); free(line_buffer); assert(dev->run->pictures[index].size > 0); - return 0; } static void _jpeg_set_dest_picture(j_compress_ptr jpeg, unsigned char *picture, unsigned long *written) { diff --git a/src/jpeg/encoder.h b/src/jpeg/encoder.h index 48dfda5..c56ab84 100644 --- a/src/jpeg/encoder.h +++ b/src/jpeg/encoder.h @@ -27,4 +27,4 @@ #include "../device.h" -int jpeg_compress_buffer(struct device_t *dev, int index); +void jpeg_encoder_compress_buffer(struct device_t *dev, int index); diff --git a/src/main.c b/src/main.c index 7ba6525..dbfb840 100644 --- a/src/main.c +++ b/src/main.c @@ -35,6 +35,7 @@ #include "tools.h" #include "logging.h" #include "device.h" +#include "encoder.h" #include "stream.h" #include "http.h" @@ -218,6 +219,7 @@ static void _install_signal_handlers() { int main(int argc, char *argv[]) { struct device_t *dev; + struct encoder_t *encoder; struct stream_t *stream; struct http_server_t *server; int exit_code = 0; @@ -225,7 +227,8 @@ int main(int argc, char *argv[]) { LOGGING_INIT; dev = device_init(); - stream = stream_init(dev); + encoder = encoder_init(ENCODER_TYPE_CPU); + stream = stream_init(dev, encoder); server = http_server_init(stream); if ((exit_code = _parse_options(argc, argv, dev, server)) == 0) { @@ -249,6 +252,7 @@ int main(int argc, char *argv[]) { http_server_destroy(server); stream_destroy(stream); + encoder_destroy(encoder); device_destroy(dev); LOGGING_DESTROY; diff --git a/src/omx/component.c b/src/omx/component.c index 312d102..4ad8f84 100644 --- a/src/omx/component.c +++ b/src/omx/component.c @@ -52,7 +52,7 @@ int component_get_portdef(OMX_HANDLETYPE *component, OMX_PARAM_PORTDEFINITIONTYP int component_set_portdef(OMX_HANDLETYPE *component, OMX_PARAM_PORTDEFINITIONTYPE *portdef) { OMX_ERRORTYPE error; - LOG_DEBUG("Writing OMX port %u definition ...", port); + LOG_DEBUG("Writing OMX port %u definition ...", portdef->nPortIndex); if ((error = OMX_SetParameter(*component, OMX_IndexParamPortDefinition, portdef)) != OMX_ErrorNone) { LOG_OMX_ERROR(error, "Can't set OMX port %u definition", portdef->nPortIndex); return -1; diff --git a/src/omx/encoder.c b/src/omx/encoder.c index 2cc2e1f..a7fe1ad 100644 --- a/src/omx/encoder.c +++ b/src/omx/encoder.c @@ -113,7 +113,7 @@ void omx_encoder_destroy(struct omx_encoder_t *omx) { free(omx); } -int omx_encoder_set_device(struct omx_encoder_t *omx, struct device_t *dev) { +int omx_encoder_prepare(struct omx_encoder_t *omx, struct device_t *dev) { if (component_set_state(&omx->encoder, OMX_StateIdle) < 0) { return -1; } @@ -132,13 +132,13 @@ int omx_encoder_set_device(struct omx_encoder_t *omx, struct device_t *dev) { return 0; } -void omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, int index) { +int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, int index) { OMX_ERRORTYPE error; bool loaded = false; if ((error = OMX_FillThisBuffer(omx->encoder, omx->output_buffer)) != OMX_ErrorNone) { LOG_OMX_ERROR(error, "Failed to request filling of the output buffer on encoder"); - assert(0); // TODO + return -1; } dev->run->pictures[index].size = 0; @@ -146,7 +146,9 @@ void omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev omx->input_required = true; while (true) { - assert(!omx->failed); // FIXME + if (omx->failed) { + return -1; + } if (omx->output_available) { omx->output_available = false; @@ -165,7 +167,7 @@ void omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev if ((error = OMX_FillThisBuffer(omx->encoder, omx->output_buffer)) != OMX_ErrorNone) { LOG_OMX_ERROR(error, "Failed to request filling of the output buffer on encoder"); - assert(0); // TODO + return -1; } } @@ -182,12 +184,13 @@ void omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev if ((error = OMX_EmptyThisBuffer(omx->encoder, omx->input_buffer)) != OMX_ErrorNone) { LOG_OMX_ERROR(error, "Failed to request emptying of the input buffer on encoder"); - assert(0); // TODO + return -1; } } vcos_semaphore_wait(&omx->handler_lock); } + return 0; } static int _omx_init_component(struct omx_encoder_t *omx) { diff --git a/src/omx/encoder.h b/src/omx/encoder.h index 636677a..66742c3 100644 --- a/src/omx/encoder.h +++ b/src/omx/encoder.h @@ -28,5 +28,5 @@ struct omx_encoder_t { struct omx_encoder_t *omx_encoder_init(); void omx_encoder_destroy(struct omx_encoder_t *omx); -int omx_encoder_set_device(struct omx_encoder_t *omx, struct device_t *dev); -void omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, int index); +int omx_encoder_prepare(struct omx_encoder_t *omx, struct device_t *dev); +int omx_encoder_compress_buffer(struct omx_encoder_t *omx, struct device_t *dev, int index); diff --git a/src/stream.c b/src/stream.c index 652ab0f..2fb451a 100644 --- a/src/stream.c +++ b/src/stream.c @@ -32,6 +32,7 @@ #include "tools.h" #include "logging.h" #include "device.h" +#include "encoder.h" #include "stream.h" #include "jpeg/encoder.h" @@ -53,11 +54,12 @@ static int _stream_release_buffer(struct device_t *dev, struct v4l2_buffer *buf_ static int _stream_handle_event(struct device_t *dev); -struct stream_t *stream_init(struct device_t *dev) { +struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) { struct stream_t *stream; A_CALLOC(stream, 1); stream->dev = dev; + stream->encoder = encoder; A_PTHREAD_M_INIT(&stream->mutex); return stream; } @@ -72,6 +74,7 @@ void stream_loop(struct stream_t *stream) { bool workers_stop; MEMSET_ZERO(pool); + pool.encoder = stream->encoder; pool.workers_stop = &workers_stop; LOG_INFO("Using V4L2 device: %s", stream->dev->path); @@ -111,7 +114,7 @@ void stream_loop(struct stream_t *stream) { LOG_PERF("##### ACCEPT : %u", free_worker_number); } else { - for (unsigned number = 0; number < stream->dev->n_workers; ++number) { + for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) { if (!pool.workers[number].has_job && (free_worker_number == -1 || pool.workers[free_worker_number].job_start_time < pool.workers[number].job_start_time )) { @@ -301,7 +304,7 @@ static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool) { long double delay = 0; - for (unsigned number = 0; number < dev->n_workers; ++number) { + for (unsigned number = 0; number < dev->run->n_workers; ++number) { A_PTHREAD_M_LOCK(&pool->workers[number].last_comp_time_mutex); if (pool->workers[number].last_comp_time > 0) { delay += pool->workers[number].last_comp_time; @@ -309,7 +312,7 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker A_PTHREAD_M_UNLOCK(&pool->workers[number].last_comp_time_mutex); } // Среднее арифметическое деленное на количество воркеров - return delay / dev->n_workers / dev->n_workers; + return delay / dev->run->n_workers / dev->run->n_workers; } static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool) { @@ -340,6 +343,9 @@ static int _stream_init(struct device_t *dev, struct workers_pool_t *pool) { if (_stream_control(dev, true) < 0) { goto error; } + + encoder_prepare(pool->encoder, dev); + _stream_init_workers(dev, pool); return 0; @@ -350,15 +356,15 @@ static int _stream_init(struct device_t *dev, struct workers_pool_t *pool) { } static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *pool) { - LOG_INFO("Spawning %d workers ...", dev->n_workers); + LOG_INFO("Spawning %d workers ...", dev->run->n_workers); *pool->workers_stop = false; - A_CALLOC(pool->workers, dev->n_workers); + A_CALLOC(pool->workers, dev->run->n_workers); A_PTHREAD_M_INIT(&pool->free_workers_mutex); A_PTHREAD_C_INIT(&pool->free_workers_cond); - for (unsigned number = 0; number < dev->n_workers; ++number) { + for (unsigned number = 0; number < dev->run->n_workers; ++number) { pool->free_workers += 1; A_PTHREAD_M_INIT(&pool->workers[number].has_job_mutex); @@ -369,6 +375,8 @@ static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *po pool->workers[number].ctx.dev_stop = (sig_atomic_t *volatile)&dev->stop; pool->workers[number].ctx.workers_stop = pool->workers_stop; + pool->workers[number].ctx.encoder = pool->encoder; + pool->workers[number].ctx.last_comp_time_mutex = &pool->workers[number].last_comp_time_mutex; pool->workers[number].ctx.last_comp_time = &pool->workers[number].last_comp_time; @@ -405,7 +413,7 @@ static void *_stream_worker_thread(void *v_ctx) { LOG_DEBUG("Worker %u compressing JPEG from buffer %d ...", ctx->number, ctx->buf_index); - jpeg_compress_buffer(ctx->dev, ctx->buf_index); + encoder_compress_buffer(ctx->encoder, ctx->dev, ctx->buf_index); if (_stream_release_buffer(ctx->dev, &ctx->buf_info) == 0) { *ctx->job_start_time = start_time; @@ -442,7 +450,7 @@ static void _stream_destroy_workers(struct device_t *dev, struct workers_pool_t LOG_INFO("Destroying workers ..."); *pool->workers_stop = true; - for (unsigned number = 0; number < dev->n_workers; ++number) { + for (unsigned number = 0; number < dev->run->n_workers; ++number) { A_PTHREAD_M_LOCK(&pool->workers[number].has_job_mutex); pool->workers[number].has_job = true; // Final job: die A_PTHREAD_M_UNLOCK(&pool->workers[number].has_job_mutex); diff --git a/src/stream.h b/src/stream.h index a89908a..fb1bab2 100644 --- a/src/stream.h +++ b/src/stream.h @@ -27,6 +27,7 @@ #include #include "device.h" +#include "encoder.h" struct worker_context_t { @@ -37,6 +38,8 @@ struct worker_context_t { sig_atomic_t *volatile dev_stop; bool *workers_stop; + struct encoder_t *encoder; + pthread_mutex_t *last_comp_time_mutex; long double *last_comp_time; @@ -69,12 +72,14 @@ struct worker_t { }; struct workers_pool_t { - struct worker_t *workers; - bool *workers_stop; + struct worker_t*workers; + bool *workers_stop; - pthread_mutex_t free_workers_mutex; - unsigned free_workers; - pthread_cond_t free_workers_cond; + pthread_mutex_t free_workers_mutex; + unsigned free_workers; + pthread_cond_t free_workers_cond; + + struct encoder_t *encoder; }; struct stream_t { @@ -84,10 +89,11 @@ struct stream_t { bool updated; pthread_mutex_t mutex; struct device_t *dev; + struct encoder_t *encoder; }; -struct stream_t *stream_init(struct device_t *dev); +struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder); void stream_destroy(struct stream_t *stream); void stream_loop(struct stream_t *stream);