mirror of
https://github.com/pikvm/ustreamer.git
synced 2026-02-19 16:26:30 +00:00
Compare commits
29 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
05246706f0 | ||
|
|
9355055a7f | ||
|
|
9f16de13fe | ||
|
|
36a987fb9d | ||
|
|
652397f569 | ||
|
|
3333fc56a3 | ||
|
|
47378a17db | ||
|
|
5f320786f5 | ||
|
|
498c6e1f5d | ||
|
|
6c09adc689 | ||
|
|
8bf7ac3005 | ||
|
|
6aebd7167e | ||
|
|
3f03575222 | ||
|
|
9ca43f17a3 | ||
|
|
66ef566c9a | ||
|
|
07ecc5b0a0 | ||
|
|
8d19711f2c | ||
|
|
c94117ae1e | ||
|
|
06a80df708 | ||
|
|
13dff256c8 | ||
|
|
07e9dbc0f7 | ||
|
|
c83dddbf0b | ||
|
|
5672d1aa75 | ||
|
|
078097efec | ||
|
|
3cd8338886 | ||
|
|
d2b57cc7d5 | ||
|
|
2c4f59c87a | ||
|
|
36eb7eeb76 | ||
|
|
3b6544db8a |
@@ -1,7 +1,7 @@
|
||||
[bumpversion]
|
||||
commit = True
|
||||
tag = True
|
||||
current_version = 0.74
|
||||
current_version = 0.78
|
||||
parse = (?P<major>\d+)\.(?P<minor>\d+)(\.(?P<patch>\d+)(\-(?P<release>[a-z]+))?)?
|
||||
serialize =
|
||||
{major}.{minor}
|
||||
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -2,6 +2,7 @@
|
||||
/pkg/arch/src/
|
||||
/pkg/arch/v*.tar.gz
|
||||
/pkg/arch/ustreamer-*.pkg.tar.xz
|
||||
/config.mk
|
||||
/vgcore.*
|
||||
/ustreamer
|
||||
/*.sock
|
||||
|
||||
54
Makefile
54
Makefile
@@ -1,23 +1,39 @@
|
||||
-include config.mk
|
||||
|
||||
PROG ?= ustreamer
|
||||
DESTDIR ?=
|
||||
PREFIX ?= /usr/local
|
||||
|
||||
CC ?= gcc
|
||||
CFLAGS ?= -O3
|
||||
LDFLAGS ?=
|
||||
CC ?= gcc
|
||||
|
||||
RPI_VC_HEADERS ?= /opt/vc/include
|
||||
RPI_VC_LIBS ?= /opt/vc/lib
|
||||
|
||||
|
||||
# =====
|
||||
LIBS = -lm -ljpeg -pthread -levent -levent_pthreads -luuid
|
||||
override CFLAGS += -c -std=c11 -Wall -Wextra -D_GNU_SOURCE
|
||||
SOURCES = $(shell ls src/*.c src/http/*.c src/encoders/cpu/*.c src/encoders/hw/*.c)
|
||||
PROG = ustreamer
|
||||
|
||||
ifeq ($(shell ls -d /opt/vc/include 2>/dev/null), /opt/vc/include)
|
||||
SOURCES += $(shell ls src/encoders/omx/*.c)
|
||||
LIBS += -lbcm_host -lvcos -lopenmaxil -L/opt/vc/lib
|
||||
override CFLAGS += -DWITH_OMX_ENCODER -DOMX_SKIP64BIT -I/opt/vc/include
|
||||
|
||||
define optbool
|
||||
$(filter $(shell echo $(1) | tr A-Z a-z), yes on 1)
|
||||
endef
|
||||
|
||||
|
||||
ifneq ($(call optbool,$(WITH_OMX_ENCODER)),)
|
||||
LIBS += -lbcm_host -lvcos -lopenmaxil -L$(RPI_VC_LIBS)
|
||||
override CFLAGS += -DWITH_OMX_ENCODER -DOMX_SKIP64BIT -I$(RPI_VC_HEADERS)
|
||||
SOURCES += $(shell ls src/encoders/omx/*.c)
|
||||
endif
|
||||
|
||||
OBJECTS = $(SOURCES:.c=.o)
|
||||
|
||||
ifneq ($(call optbool,$(WITH_WORKERS_GPIO_DEBUG)),)
|
||||
LIBS += -lwiringPi
|
||||
override CFLAGS += -DWITH_WORKERS_GPIO_DEBUG
|
||||
endif
|
||||
|
||||
|
||||
# =====
|
||||
@@ -28,17 +44,32 @@ install: $(PROG)
|
||||
install -Dm755 $(PROG) $(DESTDIR)$(PREFIX)/bin/$(PROG)
|
||||
|
||||
|
||||
install-strip: install
|
||||
strip $(DESTDIR)$(PREFIX)/bin/$(PROG)
|
||||
|
||||
|
||||
uninstall:
|
||||
rm $(DESTDIR)$(PREFIX)/bin/$(PROG)
|
||||
|
||||
|
||||
regen:
|
||||
tools/make-jpeg-h.py src/http/data/blank.jpeg src/http/data/blank_jpeg.h BLANK
|
||||
tools/make-html-h.py src/http/data/index.html src/http/data/index_html.h INDEX
|
||||
|
||||
|
||||
$(PROG): $(OBJECTS)
|
||||
$(CC) $(LIBS) $(LDFLAGS) $(OBJECTS) -o $@
|
||||
$(PROG): $(SOURCES:.c=.o)
|
||||
$(info -- LINKING $@)
|
||||
@ $(CC) $(SOURCES:.c=.o) -o $@ $(LDFLAGS) $(LIBS)
|
||||
$(info ===== Build complete =====)
|
||||
$(info == CC = $(CC))
|
||||
$(info == LIBS = $(LIBS))
|
||||
$(info == CFLAGS = $(CFLAGS))
|
||||
$(info == LDFLAGS = $(LDFLAGS))
|
||||
|
||||
|
||||
.c.o:
|
||||
$(CC) $(LIBS) $(CFLAGS) $< -o $@
|
||||
$(info -- CC $<)
|
||||
@ $(CC) $< -o $@ $(CFLAGS) $(LIBS)
|
||||
|
||||
|
||||
release:
|
||||
@@ -50,13 +81,14 @@ release:
|
||||
|
||||
|
||||
bump:
|
||||
bumpversion $(if $(V), $(V), minor)
|
||||
bumpversion $(if $(V),$(V),minor)
|
||||
|
||||
|
||||
push:
|
||||
git push
|
||||
git push --tags
|
||||
|
||||
|
||||
clean-all: clean
|
||||
clean:
|
||||
find src -name '*.o' -exec rm '{}' \;
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
| Multithreaded JPEG encoding |  Yes |  No |
|
||||
| [OpenMAX IL](https://www.khronos.org/openmaxil) hardware acceleration<br>on Raspberry Pi |  Yes |  No |
|
||||
| Behavior when the device<br>is disconnected while streaming |  Shows a black screen<br>with ```NO SIGNAL``` on it<br>until reconnected |  Stops the broadcast <sup>1</sup> |
|
||||
| [DV-timings](https://linuxtv.org/downloads/v4l-dvb-apis/uapi/v4l/dv-timings.html) support -<br>the ability to change resolution<br>on the fly by source signal |  Yes |  Partially yes <sup>2</sup> |
|
||||
| [DV-timings](https://linuxtv.org/downloads/v4l-dvb-apis/uapi/v4l/dv-timings.html) support -<br>the ability to change resolution<br>on the fly by source signal |  Yes |  Partially yes <sup>1</sup> |
|
||||
| Option to skip frames when streaming<br>static images by HTTP to save traffic |  Yes <sup>2</sup> |  No |
|
||||
| Streaming via UNIX domain socket |  Yes |  No |
|
||||
| Debug logs without recompiling,<br>performance statistics log,<br>access to HTTP broadcast parameters |  Yes |  No |
|
||||
@@ -31,7 +31,7 @@ If you're going to live-stream from your backyard webcam and need to control it,
|
||||
# Building
|
||||
You'll need ```make```, ```gcc```, ```libevent``` with ```pthreads``` support, ```libjpeg8```/```libjpeg-turbo``` and ```libuuid```.
|
||||
|
||||
It should compile automatically with OpenMAX IL on Raspberry Pi, if the corresponding headers are present in ```/opt/vc/include```.
|
||||
On Raspberry Pi you can build the program with OpenMAX IL. To do this pass option ```WITH_OMX_ENCODER=1``` to ```make```.
|
||||
|
||||
```
|
||||
$ git clone --depth=1 https://github.com/pi-kvm/ustreamer
|
||||
@@ -40,7 +40,8 @@ $ make
|
||||
$ ./ustreamer --help
|
||||
```
|
||||
|
||||
AUR has a package for Arch Linux: https://aur.archlinux.org/packages/ustreamer
|
||||
AUR has a package for Arch Linux: https://aur.archlinux.org/packages/ustreamer. It should compile automatically with OpenMAX IL on Raspberry Pi, if the corresponding headers are present in ```/opt/vc/include```.
|
||||
FreeBSD port: https://www.freshports.org/multimedia/ustreamer.
|
||||
|
||||
-----
|
||||
# Usage
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
| Многопоточное кодирование JPEG |  Есть |  Нет |
|
||||
| Аппаратное кодирование с помощью [OpenMAX IL](https://www.khronos.org/openmaxil) на Raspberry Pi |  Есть |  Нет |
|
||||
| Поведение при физическом отключении<br>устройства от сервера во время работы |  Транслирует черный экран<br>с надписью ```NO SIGNAL```,<br>пока устройство не будет подключено снова |  Прерывает трансляцию <sup>1</sup> |
|
||||
| Поддержка [DV-таймингов](https://linuxtv.org/downloads/v4l-dvb-apis/uapi/v4l/dv-timings.html) - возможности<br>изменения параметров разрешения<br>трансляции на лету по сигналу<br>источника (устройства видеозахвата) |  Есть |  Условно есть <sup>2</sup> |
|
||||
| Поддержка [DV-таймингов](https://linuxtv.org/downloads/v4l-dvb-apis/uapi/v4l/dv-timings.html) - возможности<br>изменения параметров разрешения<br>трансляции на лету по сигналу<br>источника (устройства видеозахвата) |  Есть |  Условно есть <sup>1</sup> |
|
||||
| Возможность пропуска фреймов при передаче<br>статического изображения по HTTP<br>для экономии трафика |  Есть <sup>2</sup> |  Нет |
|
||||
| Стрим через UNIX domain socket |  Есть |  Нет |
|
||||
| Дебаг-логи без перекомпиляции,<br>логгирование статистики производительности,<br>возможность получения параметров<br>трансляции по HTTP |  Есть |  Нет |
|
||||
@@ -31,7 +31,7 @@
|
||||
# Сборка
|
||||
Для сборки вам понадобятся ```make```, ```gcc```, ```libevent``` с поддержкой ```pthreads```, ```libjpeg8```/```libjpeg-turbo``` и ```libuuid```.
|
||||
|
||||
На Raspberry Pi програма автоматически собирается с поддержкой OpenMAX IL, если обнаружит нужные хедеры в ```/opt/vc/include```.
|
||||
На Raspberry Pi программу можно собрать с поддержкой OpenMAX IL. Для этого передайте ```make``` параметр ```WITH_OMX_ENCODER=1```.
|
||||
|
||||
```
|
||||
$ git clone --depth=1 https://github.com/pi-kvm/ustreamer
|
||||
@@ -40,7 +40,8 @@ $ make
|
||||
$ ./ustreamer --help
|
||||
```
|
||||
|
||||
Для Arch Linux в AUR есть готовый пакет: https://aur.archlinux.org/packages/ustreamer
|
||||
Для Arch Linux в AUR есть готовый пакет: https://aur.archlinux.org/packages/ustreamer. На Raspberry Pi програма автоматически собирается с поддержкой OpenMAX IL, если обнаружит нужные хедеры в ```/opt/vc/include```.
|
||||
Порт для FreeBSD: https://www.freshports.org/multimedia/ustreamer.
|
||||
|
||||
-----
|
||||
# Использование
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
|
||||
pkgname=ustreamer
|
||||
pkgver=0.74
|
||||
pkgver=0.78
|
||||
pkgrel=1
|
||||
pkgdesc="Lightweight and fast MJPG-HTTP streamer"
|
||||
url="https://github.com/pi-kvm/ustreamer"
|
||||
@@ -12,19 +12,23 @@ arch=(i686 x86_64 armv6h armv7h)
|
||||
depends=(libjpeg libevent libutil-linux)
|
||||
# optional: raspberrypi-firmware for OMX JPEG encoder
|
||||
makedepends=(gcc make)
|
||||
source=("$url/archive/v$pkgver.tar.gz")
|
||||
source=(${pkgname}::"git+https://github.com/pi-kvm/ustreamer#commit=v${pkgver}")
|
||||
md5sums=(SKIP)
|
||||
|
||||
|
||||
build() {
|
||||
cd $srcdir
|
||||
cd "$srcdir"
|
||||
rm -rf $pkgname-build
|
||||
cp -r $pkgname-$pkgver $pkgname-build
|
||||
cp -r $pkgname $pkgname-build
|
||||
cd $pkgname-build
|
||||
make CFLAGS="$CFLAGS" LDFLAGS="$LDFLAGS" $MAKEFLAGS
|
||||
|
||||
local _options=""
|
||||
[ -d /opt/vc/include ] && _options="$_options WITH_OMX_ENCODER=1"
|
||||
|
||||
make $_options CFLAGS="$CFLAGS" LDFLAGS="$LDFLAGS" $MAKEFLAGS
|
||||
}
|
||||
|
||||
package() {
|
||||
cd $srcdir/$pkgname-build
|
||||
cd "$srcdir/$pkgname-build"
|
||||
make DESTDIR="$pkgdir" PREFIX=/usr install
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
include $(TOPDIR)/rules.mk
|
||||
|
||||
PKG_NAME:=ustreamer
|
||||
PKG_VERSION:=0.74
|
||||
PKG_VERSION:=0.78
|
||||
PKG_RELEASE:=1
|
||||
PKG_MAINTAINER:=Maxim Devaev <mdevaev@gmail.com>
|
||||
|
||||
|
||||
@@ -22,4 +22,6 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#define VERSION "0.74"
|
||||
#ifndef VERSION
|
||||
# define VERSION "0.78"
|
||||
#endif
|
||||
|
||||
45
src/device.c
45
src/device.c
@@ -30,6 +30,7 @@
|
||||
#include <errno.h>
|
||||
#include <assert.h>
|
||||
|
||||
#include <sys/select.h>
|
||||
#include <sys/mman.h>
|
||||
#include <linux/videodev2.h>
|
||||
|
||||
@@ -85,6 +86,12 @@ struct device_t *device_init() {
|
||||
struct controls_t *ctl;
|
||||
struct device_runtime_t *run;
|
||||
struct device_t *dev;
|
||||
long cores_sysconf;
|
||||
unsigned cores_available;
|
||||
|
||||
cores_sysconf = sysconf(_SC_NPROCESSORS_ONLN);
|
||||
cores_sysconf = (cores_sysconf < 0 ? 0 : cores_sysconf);
|
||||
cores_available = max_u(min_u(cores_sysconf, 4), 1);
|
||||
|
||||
A_CALLOC(ctl, 1);
|
||||
|
||||
@@ -97,8 +104,8 @@ struct device_t *device_init() {
|
||||
dev->height = 480;
|
||||
dev->format = V4L2_PIX_FMT_YUYV;
|
||||
dev->standard = V4L2_STD_UNKNOWN;
|
||||
dev->n_buffers = max_u(min_u(sysconf(_SC_NPROCESSORS_ONLN), 4), 1) + 1;
|
||||
dev->n_workers = dev->n_buffers;
|
||||
dev->n_buffers = cores_available + 1;
|
||||
dev->n_workers = min_u(cores_available, dev->n_buffers);
|
||||
dev->timeout = 1;
|
||||
dev->error_delay = 1;
|
||||
dev->ctl = ctl;
|
||||
@@ -226,6 +233,39 @@ int device_switch_capturing(struct device_t *dev, bool enable) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int device_select(struct device_t *dev, bool *has_read, bool *has_write, bool *has_error) {
|
||||
struct timeval timeout;
|
||||
int retval;
|
||||
|
||||
# define INIT_FD_SET(_set) \
|
||||
fd_set _set; FD_ZERO(&_set); FD_SET(dev->run->fd, &_set);
|
||||
|
||||
INIT_FD_SET(read_fds);
|
||||
INIT_FD_SET(write_fds);
|
||||
INIT_FD_SET(error_fds);
|
||||
|
||||
# undef INIT_FD_SET
|
||||
|
||||
timeout.tv_sec = dev->timeout;
|
||||
timeout.tv_usec = 0;
|
||||
|
||||
LOG_DEBUG("Calling select() on video device ...");
|
||||
|
||||
retval = select(dev->run->fd + 1, &read_fds, &write_fds, &error_fds, &timeout);
|
||||
if (retval > 0) {
|
||||
*has_read = FD_ISSET(dev->run->fd, &read_fds);
|
||||
*has_write = FD_ISSET(dev->run->fd, &write_fds);
|
||||
*has_error = FD_ISSET(dev->run->fd, &error_fds);
|
||||
} else {
|
||||
has_read = false;
|
||||
has_write = false;
|
||||
has_error = false;
|
||||
}
|
||||
|
||||
LOG_DEBUG("Device select() --> %d", retval);
|
||||
return retval;
|
||||
}
|
||||
|
||||
int device_grab_buffer(struct device_t *dev) {
|
||||
struct v4l2_buffer buf_info;
|
||||
|
||||
@@ -247,6 +287,7 @@ int device_grab_buffer(struct device_t *dev) {
|
||||
|
||||
dev->run->hw_buffers[buf_info.index].used = buf_info.bytesused;
|
||||
memcpy(&dev->run->hw_buffers[buf_info.index].buf_info, &buf_info, sizeof(struct v4l2_buffer));
|
||||
dev->run->pictures[buf_info.index].grab_time = get_now_monotonic();
|
||||
return buf_info.index;
|
||||
}
|
||||
|
||||
|
||||
@@ -120,6 +120,7 @@ int device_open(struct device_t *dev);
|
||||
void device_close(struct device_t *dev);
|
||||
|
||||
int device_switch_capturing(struct device_t *dev, bool enable);
|
||||
int device_select(struct device_t *dev, bool *has_read, bool *has_write, bool *has_error);
|
||||
int device_grab_buffer(struct device_t *dev);
|
||||
int device_release_buffer(struct device_t *dev, unsigned index);
|
||||
int device_consume_event(struct device_t *dev);
|
||||
|
||||
@@ -188,6 +188,8 @@ int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, uns
|
||||
|
||||
assert(encoder->run->type != ENCODER_TYPE_UNKNOWN);
|
||||
|
||||
dev->run->pictures[buf_index].encode_begin_time = get_now_monotonic();
|
||||
|
||||
if (encoder->run->type == ENCODER_TYPE_CPU) {
|
||||
cpu_encoder_compress_buffer(dev, buf_index, encoder->run->quality);
|
||||
} else if (encoder->run->type == ENCODER_TYPE_HW) {
|
||||
@@ -201,6 +203,8 @@ int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, uns
|
||||
}
|
||||
# endif
|
||||
|
||||
dev->run->pictures[buf_index].encode_end_time = get_now_monotonic();
|
||||
|
||||
return 0;
|
||||
|
||||
# pragma GCC diagnostic ignored "-Wunused-label"
|
||||
|
||||
@@ -105,8 +105,6 @@ void cpu_encoder_compress_buffer(struct device_t *dev, unsigned index, unsigned
|
||||
|
||||
# undef WRITE_SCANLINES
|
||||
|
||||
// TODO: process jpeg errors:
|
||||
// https://stackoverflow.com/questions/19857766/error-handling-in-libjpeg
|
||||
jpeg_finish_compress(&jpeg);
|
||||
jpeg_destroy_compress(&jpeg);
|
||||
|
||||
|
||||
@@ -145,6 +145,7 @@ static int _jpeg_read_geometry(FILE *fp, unsigned *width, unsigned *height) {
|
||||
|
||||
jpeg_create_decompress(&jpeg);
|
||||
|
||||
// https://stackoverflow.com/questions/19857766/error-handling-in-libjpeg
|
||||
jpeg.err = jpeg_std_error((struct jpeg_error_mgr *)&jpeg_error);
|
||||
jpeg_error.mgr.error_exit = _jpeg_error_handler;
|
||||
if (setjmp(jpeg_error.jmp) < 0) {
|
||||
|
||||
@@ -77,7 +77,7 @@ static void _http_callback_stream_error(struct bufferevent *buf_event, short wha
|
||||
static void _http_exposed_refresh(int fd, short event, void *v_server);
|
||||
static void _http_queue_send_stream(struct http_server_t *server, bool stream_updated, bool picture_updated);
|
||||
|
||||
static bool _expose_new_picture(struct http_server_t *server);
|
||||
static bool _expose_new_picture_unsafe(struct http_server_t *server);
|
||||
static bool _expose_blank_picture(struct http_server_t *server);
|
||||
|
||||
|
||||
@@ -807,7 +807,7 @@ static void _http_exposed_refresh(UNUSED int fd, UNUSED short what, void *v_serv
|
||||
LOG_DEBUG("Refreshing HTTP exposed ...");
|
||||
A_MUTEX_LOCK(&server->run->stream->mutex);
|
||||
if (server->run->stream->picture.used > 0) { // If online
|
||||
picture_updated = _expose_new_picture(server);
|
||||
picture_updated = _expose_new_picture_unsafe(server);
|
||||
UNLOCK_STREAM;
|
||||
} else {
|
||||
UNLOCK_STREAM;
|
||||
@@ -825,7 +825,7 @@ static void _http_exposed_refresh(UNUSED int fd, UNUSED short what, void *v_serv
|
||||
_http_queue_send_stream(server, stream_updated, picture_updated);
|
||||
}
|
||||
|
||||
static bool _expose_new_picture(struct http_server_t *server) {
|
||||
static bool _expose_new_picture_unsafe(struct http_server_t *server) {
|
||||
# define STREAM(_next) server->run->stream->_next
|
||||
# define EXPOSED(_next) server->run->exposed->_next
|
||||
|
||||
|
||||
@@ -123,10 +123,10 @@ pthread_mutex_t log_mutex;
|
||||
|
||||
|
||||
INLINE char *errno_to_string(char *buf, size_t size) {
|
||||
#if defined(__GLIBC__) && defined(_GNU_SOURCE)
|
||||
# if defined(__GLIBC__) && defined(_GNU_SOURCE)
|
||||
return strerror_r(errno, buf, size);
|
||||
#else
|
||||
# else
|
||||
strerror_r(errno, buf, size);
|
||||
return buf;
|
||||
#endif
|
||||
# endif
|
||||
}
|
||||
|
||||
25
src/main.c
25
src/main.c
@@ -32,6 +32,10 @@
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
#ifdef WITH_WORKERS_GPIO_DEBUG
|
||||
# include <wiringPi.h>
|
||||
#endif
|
||||
|
||||
#include "config.h"
|
||||
#include "tools.h"
|
||||
#include "logging.h"
|
||||
@@ -129,19 +133,19 @@ static void _help(struct device_t *dev, struct encoder_t *encoder, struct http_s
|
||||
printf(" -z|--min-frame-size <N> ──────── Drop frames smaller then this limit. Useful if the device\n");
|
||||
printf(" produces small-sized garbage frames. Default: disabled.\n\n");
|
||||
printf(" -n|--persistent ──────────────── Don't re-initialize device on timeout. Default: disabled.\n\n");
|
||||
printf(" -t|--dv-timings ──────────────── Enable DV timings queriyng and events processing.\n");
|
||||
printf(" Supports automatic resolution changing. Default: disabled.\n\n");
|
||||
printf(" -t|--dv-timings ──────────────── Enable DV timings queriyng and events processing\n");
|
||||
printf(" to automatic resolution change. Default: disabled.\n\n");
|
||||
printf(" -b|--buffers <N> ─────────────── The number of buffers to receive data from the device.\n");
|
||||
printf(" Each buffer may processed using an intermediate thread.\n");
|
||||
printf(" Default: %u (the number of CPU cores (but not more 4) + 1).\n\n", dev->n_buffers);
|
||||
printf(" Default: %u (the number of CPU cores (but not more than 4) + 1).\n\n", dev->n_buffers);
|
||||
printf(" -w|--workers <N> ─────────────── The number of worker threads but not more than buffers.\n");
|
||||
printf(" Default: %u (== --buffers).\n\n", dev->n_workers);
|
||||
printf(" Default: %u (the number of CPU cores (but not more than 4)).\n\n", dev->n_workers);
|
||||
printf(" -q|--quality <N> ─────────────── Set quality of JPEG encoding from 1 to 100 (best). Default: %u.\n\n", encoder->quality);
|
||||
printf(" -c|--encoder <type> ──────────── Use specified encoder. It may affects to workers number.\n");
|
||||
printf(" Available: %s; default: CPU.\n\n", ENCODER_TYPES_STR);
|
||||
printf(" --device-timeout <seconds> ───── Timeout for device querying. Default: %u\n\n", dev->timeout);
|
||||
printf(" --device-timeout <seconds> ───── Timeout for device querying. Default: %u.\n\n", dev->timeout);
|
||||
printf(" --device-error-delay <seconds> ─ Delay before trying to connect to the device again\n");
|
||||
printf(" after an error (timeout for example). Default: %u\n\n", dev->error_delay);
|
||||
printf(" after an error (timeout for example). Default: %u.\n\n", dev->error_delay);
|
||||
printf("Image control options:\n");
|
||||
printf("══════════════════════\n");
|
||||
printf(" --brightness <N> ───────────── Set brightness. Default: no change.\n\n");
|
||||
@@ -377,6 +381,15 @@ int main(int argc, char *argv[]) {
|
||||
|
||||
LOGGING_INIT;
|
||||
|
||||
# ifdef WITH_WORKERS_GPIO_DEBUG
|
||||
if (wiringPiSetupGpio() < 0) {
|
||||
LOG_PERROR("Can't initialize wiringPi GPIO");
|
||||
return 1;
|
||||
} else {
|
||||
LOG_INFO("Using wiringPi to debug using GPIO");
|
||||
}
|
||||
# endif
|
||||
|
||||
dev = device_init();
|
||||
encoder = encoder_init();
|
||||
stream = stream_init(dev, encoder);
|
||||
|
||||
504
src/stream.c
504
src/stream.c
@@ -20,6 +20,7 @@
|
||||
*****************************************************************************/
|
||||
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <stdatomic.h>
|
||||
#include <unistd.h>
|
||||
#include <errno.h>
|
||||
@@ -28,7 +29,6 @@
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
#include <sys/select.h>
|
||||
#include <linux/videodev2.h>
|
||||
|
||||
#include "tools.h"
|
||||
@@ -38,16 +38,70 @@
|
||||
#include "encoder.h"
|
||||
#include "stream.h"
|
||||
|
||||
#ifdef WITH_WORKERS_GPIO_DEBUG
|
||||
# include <wiringPi.h>
|
||||
# ifndef WORKERS_GPIO_DEBUG_START_PIN
|
||||
# define WORKERS_GPIO_DEBUG_START_PIN 5
|
||||
# endif
|
||||
#endif
|
||||
|
||||
static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool);
|
||||
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index);
|
||||
|
||||
static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool);
|
||||
static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool);
|
||||
struct _worker_t {
|
||||
pthread_t tid;
|
||||
unsigned number;
|
||||
atomic_bool *proc_stop;
|
||||
atomic_bool *workers_stop;
|
||||
|
||||
static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool);
|
||||
static void *_stream_worker_thread(void *v_worker);
|
||||
static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool_t *pool);
|
||||
pthread_mutex_t last_comp_time_mutex;
|
||||
long double last_comp_time;
|
||||
|
||||
pthread_mutex_t has_job_mutex;
|
||||
int buf_index;
|
||||
atomic_bool has_job;
|
||||
bool job_timely;
|
||||
bool job_failed;
|
||||
long double job_start_time;
|
||||
pthread_cond_t has_job_cond;
|
||||
|
||||
pthread_mutex_t *free_workers_mutex;
|
||||
unsigned *free_workers;
|
||||
pthread_cond_t *free_workers_cond;
|
||||
|
||||
struct _worker_t *order_prev;
|
||||
struct _worker_t *order_next;
|
||||
|
||||
struct device_t *dev;
|
||||
struct encoder_t *encoder;
|
||||
};
|
||||
|
||||
struct _workers_pool_t {
|
||||
unsigned n_workers;
|
||||
struct _worker_t *workers;
|
||||
struct _worker_t *oldest_worker;
|
||||
struct _worker_t *latest_worker;
|
||||
|
||||
pthread_mutex_t free_workers_mutex;
|
||||
unsigned free_workers;
|
||||
pthread_cond_t free_workers_cond;
|
||||
|
||||
atomic_bool workers_stop;
|
||||
|
||||
long double desired_frames_interval;
|
||||
};
|
||||
|
||||
|
||||
static struct _workers_pool_t *_stream_init_loop(struct stream_t *stream);
|
||||
static struct _workers_pool_t *_stream_init(struct stream_t *stream);
|
||||
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index, unsigned captured_fps);
|
||||
|
||||
static struct _workers_pool_t *_workers_pool_init(struct stream_t *stream);
|
||||
static void _workers_pool_destroy(struct _workers_pool_t *pool);
|
||||
|
||||
static void *__worker_thread(void *v_worker);
|
||||
|
||||
static struct _worker_t *_workers_pool_wait(struct _workers_pool_t *pool);
|
||||
static void _workers_pool_assign(struct _workers_pool_t *pool, struct _worker_t *ready_worker, unsigned buf_index);
|
||||
static long double _workers_pool_get_fluency_delay(struct _workers_pool_t *pool);
|
||||
|
||||
|
||||
struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) {
|
||||
@@ -74,20 +128,15 @@ void stream_destroy(struct stream_t *stream) {
|
||||
}
|
||||
|
||||
void stream_loop(struct stream_t *stream) {
|
||||
struct workers_pool_t pool;
|
||||
|
||||
MEMSET_ZERO(pool);
|
||||
atomic_init(&pool.workers_stop, false);
|
||||
pool.encoder = stream->encoder;
|
||||
struct _workers_pool_t *pool;
|
||||
|
||||
LOG_INFO("Using V4L2 device: %s", stream->dev->path);
|
||||
LOG_INFO("Using desired FPS: %u", stream->dev->desired_fps);
|
||||
|
||||
while (_stream_init_loop(stream, &pool) == 0) {
|
||||
struct worker_t *oldest_worker = NULL;
|
||||
struct worker_t *last_worker = NULL;
|
||||
while ((pool = _stream_init_loop(stream)) != NULL) {
|
||||
long double grab_after = 0;
|
||||
unsigned fluency_passed = 0;
|
||||
unsigned captured_fps = 0;
|
||||
unsigned captured_fps_accum = 0;
|
||||
long long captured_fps_second = 0;
|
||||
bool persistent_timeout_reported = false;
|
||||
@@ -98,43 +147,22 @@ void stream_loop(struct stream_t *stream) {
|
||||
LOG_INFO("Capturing ...");
|
||||
|
||||
while (!atomic_load(&stream->proc->stop)) {
|
||||
int free_worker_number = -1;
|
||||
struct _worker_t *ready_worker;
|
||||
|
||||
SEP_DEBUG('-');
|
||||
LOG_DEBUG("Waiting for worker ...");
|
||||
|
||||
LOG_DEBUG("Waiting for workers ...");
|
||||
A_MUTEX_LOCK(&pool.free_workers_mutex);
|
||||
A_COND_WAIT_TRUE(pool.free_workers, &pool.free_workers_cond, &pool.free_workers_mutex);
|
||||
A_MUTEX_UNLOCK(&pool.free_workers_mutex);
|
||||
ready_worker = _workers_pool_wait(pool);
|
||||
|
||||
if (oldest_worker && !atomic_load(&oldest_worker->has_job) && oldest_worker->buf_index >= 0) {
|
||||
if (oldest_worker->job_failed) {
|
||||
break;
|
||||
if (!ready_worker->job_failed) {
|
||||
if (ready_worker->job_timely) {
|
||||
_stream_expose_picture(stream, ready_worker->buf_index, captured_fps);
|
||||
LOG_PERF("##### Encoded picture exposed; worker = %u", ready_worker->number);
|
||||
} else {
|
||||
LOG_PERF("----- Encoded picture dropped; worker = %u", ready_worker->number);
|
||||
}
|
||||
|
||||
_stream_expose_picture(stream, oldest_worker->buf_index);
|
||||
|
||||
free_worker_number = oldest_worker->number;
|
||||
oldest_worker = oldest_worker->order_next;
|
||||
|
||||
LOG_PERF("##### Raw frame accepted; worker = %u", free_worker_number);
|
||||
} else {
|
||||
for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) {
|
||||
if (
|
||||
!atomic_load(&pool.workers[number].has_job) && (
|
||||
free_worker_number == -1
|
||||
|| pool.workers[free_worker_number].job_start_time < pool.workers[number].job_start_time
|
||||
)
|
||||
) {
|
||||
free_worker_number = number;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assert(free_worker_number >= 0);
|
||||
assert(!atomic_load(&pool.workers[free_worker_number].has_job));
|
||||
|
||||
LOG_PERF("----- Raw frame dropped; worker = %u", free_worker_number);
|
||||
break;
|
||||
}
|
||||
|
||||
if (atomic_load(&stream->proc->stop)) {
|
||||
@@ -145,30 +173,18 @@ void stream_loop(struct stream_t *stream) {
|
||||
usleep(1000000);
|
||||
}
|
||||
|
||||
# define INIT_FD_SET(_set) \
|
||||
fd_set _set; FD_ZERO(&_set); FD_SET(stream->dev->run->fd, &_set);
|
||||
bool has_read;
|
||||
bool has_write;
|
||||
bool has_error;
|
||||
int selected = device_select(stream->dev, &has_read, &has_write, &has_error);
|
||||
|
||||
INIT_FD_SET(read_fds);
|
||||
INIT_FD_SET(write_fds);
|
||||
INIT_FD_SET(error_fds);
|
||||
|
||||
# undef INIT_FD_SET
|
||||
|
||||
struct timeval timeout;
|
||||
timeout.tv_sec = stream->dev->timeout;
|
||||
timeout.tv_usec = 0;
|
||||
|
||||
LOG_DEBUG("Calling select() on video device ...");
|
||||
int retval = select(stream->dev->run->fd + 1, &read_fds, &write_fds, &error_fds, &timeout);
|
||||
LOG_DEBUG("Device select() --> %d", retval);
|
||||
|
||||
if (retval < 0) {
|
||||
if (selected < 0) {
|
||||
if (errno != EINTR) {
|
||||
LOG_PERROR("Mainloop select() error");
|
||||
break;
|
||||
}
|
||||
|
||||
} else if (retval == 0) {
|
||||
} else if (selected == 0) {
|
||||
if (stream->dev->persistent) {
|
||||
if (!persistent_timeout_reported) {
|
||||
LOG_ERROR("Mainloop select() timeout, polling ...")
|
||||
@@ -183,7 +199,7 @@ void stream_loop(struct stream_t *stream) {
|
||||
} else {
|
||||
persistent_timeout_reported = false;
|
||||
|
||||
if (FD_ISSET(stream->dev->run->fd, &read_fds)) {
|
||||
if (has_read) {
|
||||
LOG_DEBUG("Frame is ready");
|
||||
|
||||
int buf_index;
|
||||
@@ -193,7 +209,6 @@ void stream_loop(struct stream_t *stream) {
|
||||
if ((buf_index = device_grab_buffer(stream->dev)) < 0) {
|
||||
break;
|
||||
}
|
||||
stream->dev->run->pictures[buf_index].grab_time = now;
|
||||
|
||||
// Workaround for broken, corrupted frames:
|
||||
// Under low light conditions corrupted frames may get captured.
|
||||
@@ -201,7 +216,7 @@ void stream_loop(struct stream_t *stream) {
|
||||
// For example a VGA (640x480) webcam picture is normally >= 8kByte large,
|
||||
// corrupted frames are smaller.
|
||||
if (stream->dev->run->hw_buffers[buf_index].used < stream->dev->min_frame_size) {
|
||||
LOG_DEBUG("Dropping too small frame sized %zu bytes, assuming it as broken",
|
||||
LOG_DEBUG("Dropped too small frame sized %zu bytes, assuming it was broken",
|
||||
stream->dev->run->hw_buffers[buf_index].used);
|
||||
goto pass_frame;
|
||||
}
|
||||
@@ -215,50 +230,20 @@ void stream_loop(struct stream_t *stream) {
|
||||
fluency_passed = 0;
|
||||
|
||||
if (now_second != captured_fps_second) {
|
||||
stream->captured_fps = captured_fps_accum;
|
||||
captured_fps = captured_fps_accum;
|
||||
captured_fps_accum = 0;
|
||||
captured_fps_second = now_second;
|
||||
LOG_PERF("Oldest worker complete, Captured-FPS = %u", stream->captured_fps);
|
||||
LOG_PERF("A new second has come, Captured-FPS = %u", captured_fps);
|
||||
}
|
||||
captured_fps_accum += 1;
|
||||
|
||||
long double fluency_delay = _stream_get_fluency_delay(stream->dev, &pool);
|
||||
long double fluency_delay = _workers_pool_get_fluency_delay(pool);
|
||||
|
||||
grab_after = now + fluency_delay;
|
||||
LOG_VERBOSE("Fluency: delay=%.03Lf; grab_after=%.03Lf", fluency_delay, grab_after);
|
||||
}
|
||||
|
||||
# define FREE_WORKER(_next) pool.workers[free_worker_number]._next
|
||||
|
||||
LOG_DEBUG("Grabbed a new frame to buffer %u", buf_index);
|
||||
|
||||
if (oldest_worker == NULL) {
|
||||
oldest_worker = &pool.workers[free_worker_number];
|
||||
last_worker = oldest_worker;
|
||||
} else {
|
||||
if (FREE_WORKER(order_next)) {
|
||||
FREE_WORKER(order_next->order_prev) = FREE_WORKER(order_prev);
|
||||
}
|
||||
if (FREE_WORKER(order_prev)) {
|
||||
FREE_WORKER(order_prev->order_next) = FREE_WORKER(order_next);
|
||||
}
|
||||
FREE_WORKER(order_prev) = last_worker;
|
||||
last_worker->order_next = &pool.workers[free_worker_number];
|
||||
last_worker = &pool.workers[free_worker_number];
|
||||
}
|
||||
last_worker->order_next = NULL;
|
||||
|
||||
A_MUTEX_LOCK(&FREE_WORKER(has_job_mutex));
|
||||
FREE_WORKER(buf_index) = buf_index;
|
||||
atomic_store(&FREE_WORKER(has_job), true);
|
||||
A_MUTEX_UNLOCK(&FREE_WORKER(has_job_mutex));
|
||||
A_COND_SIGNAL(&FREE_WORKER(has_job_cond));
|
||||
|
||||
# undef FREE_WORKER
|
||||
|
||||
A_MUTEX_LOCK(&pool.free_workers_mutex);
|
||||
pool.free_workers -= 1;
|
||||
A_MUTEX_UNLOCK(&pool.free_workers_mutex);
|
||||
_workers_pool_assign(pool, ready_worker, buf_index);
|
||||
|
||||
goto next_handlers; // Поток сам освободит буфер
|
||||
|
||||
@@ -271,12 +256,12 @@ void stream_loop(struct stream_t *stream) {
|
||||
|
||||
next_handlers:
|
||||
|
||||
if (FD_ISSET(stream->dev->run->fd, &write_fds)) {
|
||||
if (has_write) {
|
||||
LOG_ERROR("Got unexpected writing event, seems device was disconnected");
|
||||
break;
|
||||
}
|
||||
|
||||
if (FD_ISSET(stream->dev->run->fd, &error_fds)) {
|
||||
if (has_error) {
|
||||
LOG_INFO("Got V4L2 event");
|
||||
if (device_consume_event(stream->dev) < 0) {
|
||||
break;
|
||||
@@ -292,11 +277,11 @@ void stream_loop(struct stream_t *stream) {
|
||||
stream->height = 0;
|
||||
atomic_store(&stream->updated, true);
|
||||
A_MUTEX_UNLOCK(&stream->mutex);
|
||||
}
|
||||
|
||||
_stream_destroy_workers(stream, &pool);
|
||||
device_switch_capturing(stream->dev, false);
|
||||
device_close(stream->dev);
|
||||
_workers_pool_destroy(pool);
|
||||
device_switch_capturing(stream->dev, false);
|
||||
device_close(stream->dev);
|
||||
}
|
||||
}
|
||||
|
||||
void stream_loop_break(struct stream_t *stream) {
|
||||
@@ -307,7 +292,41 @@ void stream_switch_slowdown(struct stream_t *stream, bool slowdown) {
|
||||
atomic_store(&stream->proc->slowdown, slowdown);
|
||||
}
|
||||
|
||||
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) {
|
||||
static struct _workers_pool_t *_stream_init_loop(struct stream_t *stream) {
|
||||
struct _workers_pool_t *pool = NULL;
|
||||
|
||||
LOG_DEBUG("%s: stream->proc->stop = %d", __FUNCTION__, atomic_load(&stream->proc->stop));
|
||||
|
||||
while (!atomic_load(&stream->proc->stop)) {
|
||||
SEP_INFO('=');
|
||||
|
||||
if ((pool = _stream_init(stream)) == NULL) {
|
||||
LOG_INFO("Sleeping %u seconds before new stream init ...", stream->dev->error_delay);
|
||||
sleep(stream->dev->error_delay);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return pool;
|
||||
}
|
||||
|
||||
static struct _workers_pool_t *_stream_init(struct stream_t *stream) {
|
||||
if (device_open(stream->dev) < 0) {
|
||||
goto error;
|
||||
}
|
||||
if (device_switch_capturing(stream->dev, true) < 0) {
|
||||
goto error;
|
||||
}
|
||||
|
||||
encoder_prepare(stream->encoder, stream->dev);
|
||||
return _workers_pool_init(stream);
|
||||
|
||||
error:
|
||||
device_close(stream->dev);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index, unsigned captured_fps) {
|
||||
# define PICTURE(_next) stream->dev->run->pictures[buf_index]._next
|
||||
|
||||
A_MUTEX_LOCK(&stream->mutex);
|
||||
@@ -323,6 +342,7 @@ static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index)
|
||||
|
||||
stream->width = stream->dev->run->width;
|
||||
stream->height = stream->dev->run->height;
|
||||
stream->captured_fps = captured_fps;
|
||||
atomic_store(&stream->updated, true);
|
||||
|
||||
A_MUTEX_UNLOCK(&stream->mutex);
|
||||
@@ -330,118 +350,98 @@ static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index)
|
||||
# undef PICTURE
|
||||
}
|
||||
|
||||
static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool) {
|
||||
long double sum_comp_time = 0;
|
||||
long double avg_comp_time;
|
||||
long double min_delay;
|
||||
long double soft_delay;
|
||||
static struct _workers_pool_t *_workers_pool_init(struct stream_t *stream) {
|
||||
struct _workers_pool_t *pool;
|
||||
|
||||
for (unsigned number = 0; number < dev->run->n_workers; ++number) {
|
||||
# define WORKER(_next) pool->workers[number]._next
|
||||
LOG_INFO("Creating pool with %u workers ...", stream->dev->run->n_workers);
|
||||
|
||||
A_MUTEX_LOCK(&WORKER(last_comp_time_mutex));
|
||||
if (WORKER(last_comp_time) > 0) {
|
||||
sum_comp_time += WORKER(last_comp_time);
|
||||
}
|
||||
A_MUTEX_UNLOCK(&WORKER(last_comp_time_mutex));
|
||||
A_CALLOC(pool, 1);
|
||||
|
||||
# undef WORKER
|
||||
}
|
||||
avg_comp_time = sum_comp_time / dev->run->n_workers; // Среднее время работы воркеров
|
||||
|
||||
min_delay = avg_comp_time / dev->run->n_workers; // Среднее время работы размазывается на N воркеров
|
||||
|
||||
if (dev->desired_fps > 0 && min_delay > 0) {
|
||||
// Искусственное время задержки на основе желаемого FPS, если включен --desired-fps
|
||||
soft_delay = ((long double) 1) / dev->desired_fps - sum_comp_time;
|
||||
return (min_delay > soft_delay ? min_delay : soft_delay);
|
||||
}
|
||||
|
||||
return min_delay;
|
||||
}
|
||||
|
||||
static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool) {
|
||||
int retval = -1;
|
||||
|
||||
LOG_DEBUG("%s: stream->proc->stop = %d", __FUNCTION__, atomic_load(&stream->proc->stop));
|
||||
while (!atomic_load(&stream->proc->stop)) {
|
||||
if ((retval = _stream_init(stream, pool)) < 0) {
|
||||
LOG_INFO("Sleeping %u seconds before new stream init ...", stream->dev->error_delay);
|
||||
sleep(stream->dev->error_delay);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return retval;
|
||||
}
|
||||
|
||||
static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool) {
|
||||
_stream_destroy_workers(stream, pool);
|
||||
device_switch_capturing(stream->dev, false);
|
||||
device_close(stream->dev);
|
||||
|
||||
SEP_INFO('=');
|
||||
|
||||
if (device_open(stream->dev) < 0) {
|
||||
goto error;
|
||||
}
|
||||
if (device_switch_capturing(stream->dev, true) < 0) {
|
||||
goto error;
|
||||
}
|
||||
|
||||
encoder_prepare(pool->encoder, stream->dev);
|
||||
|
||||
_stream_init_workers(stream, pool);
|
||||
|
||||
return 0;
|
||||
|
||||
error:
|
||||
device_close(stream->dev);
|
||||
return -1;
|
||||
}
|
||||
|
||||
static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool) {
|
||||
LOG_INFO("Spawning %u workers ...", stream->dev->run->n_workers);
|
||||
|
||||
atomic_store(&pool->workers_stop, false);
|
||||
A_CALLOC(pool->workers, stream->dev->run->n_workers);
|
||||
pool->n_workers = stream->dev->run->n_workers;
|
||||
A_CALLOC(pool->workers, pool->n_workers);
|
||||
|
||||
A_MUTEX_INIT(&pool->free_workers_mutex);
|
||||
A_COND_INIT(&pool->free_workers_cond);
|
||||
|
||||
for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) {
|
||||
# define WORKER(_next) pool->workers[number]._next
|
||||
atomic_init(&pool->workers_stop, false);
|
||||
|
||||
pool->free_workers += 1;
|
||||
if (stream->dev->desired_fps > 0) {
|
||||
pool->desired_frames_interval = (long double)1 / stream->dev->desired_fps;
|
||||
}
|
||||
|
||||
for (unsigned number = 0; number < pool->n_workers; ++number) {
|
||||
# define WORKER(_next) pool->workers[number]._next
|
||||
|
||||
A_MUTEX_INIT(&WORKER(has_job_mutex));
|
||||
atomic_init(&WORKER(has_job), false);
|
||||
A_COND_INIT(&WORKER(has_job_cond));
|
||||
|
||||
WORKER(number) = number;
|
||||
WORKER(proc_stop) = &stream->proc->stop;
|
||||
WORKER(workers_stop) = &pool->workers_stop;
|
||||
WORKER(number) = number;
|
||||
WORKER(proc_stop) = &stream->proc->stop;
|
||||
WORKER(workers_stop) = &pool->workers_stop;
|
||||
|
||||
WORKER(free_workers_mutex) = &pool->free_workers_mutex;
|
||||
WORKER(free_workers) = &pool->free_workers;
|
||||
WORKER(free_workers_cond) = &pool->free_workers_cond;
|
||||
WORKER(free_workers_mutex) = &pool->free_workers_mutex;
|
||||
WORKER(free_workers) = &pool->free_workers;
|
||||
WORKER(free_workers_cond) = &pool->free_workers_cond;
|
||||
|
||||
WORKER(dev) = stream->dev;
|
||||
WORKER(encoder) = pool->encoder;
|
||||
WORKER(dev) = stream->dev;
|
||||
WORKER(encoder) = stream->encoder;
|
||||
|
||||
A_THREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&(pool->workers[number]));
|
||||
A_THREAD_CREATE(&WORKER(tid), __worker_thread, (void *)&(pool->workers[number]));
|
||||
|
||||
pool->free_workers += 1;
|
||||
|
||||
# undef WORKER
|
||||
}
|
||||
return pool;
|
||||
}
|
||||
|
||||
static void *_stream_worker_thread(void *v_worker) {
|
||||
struct worker_t *worker = (struct worker_t *)v_worker;
|
||||
static void _workers_pool_destroy(struct _workers_pool_t *pool) {
|
||||
LOG_INFO("Destroying workers pool ...");
|
||||
|
||||
atomic_store(&pool->workers_stop, true);
|
||||
for (unsigned number = 0; number < pool->n_workers; ++number) {
|
||||
# define WORKER(_next) pool->workers[number]._next
|
||||
|
||||
A_MUTEX_LOCK(&WORKER(has_job_mutex));
|
||||
atomic_store(&WORKER(has_job), true); // Final job: die
|
||||
A_MUTEX_UNLOCK(&WORKER(has_job_mutex));
|
||||
A_COND_SIGNAL(&WORKER(has_job_cond));
|
||||
|
||||
A_THREAD_JOIN(WORKER(tid));
|
||||
A_MUTEX_DESTROY(&WORKER(has_job_mutex));
|
||||
A_COND_DESTROY(&WORKER(has_job_cond));
|
||||
|
||||
# undef WORKER
|
||||
}
|
||||
|
||||
A_MUTEX_DESTROY(&pool->free_workers_mutex);
|
||||
A_COND_DESTROY(&pool->free_workers_cond);
|
||||
|
||||
free(pool->workers);
|
||||
free(pool);
|
||||
}
|
||||
|
||||
static void *__worker_thread(void *v_worker) {
|
||||
struct _worker_t *worker = (struct _worker_t *)v_worker;
|
||||
|
||||
LOG_DEBUG("Hello! I am a worker #%u ^_^", worker->number);
|
||||
|
||||
# ifdef WITH_WORKERS_GPIO_DEBUG
|
||||
# define WORKER_GPIO_DEBUG_BUSY digitalWrite(WORKERS_GPIO_DEBUG_START_PIN + worker->number, HIGH)
|
||||
# define WORKER_GPIO_DEBUG_FREE digitalWrite(WORKERS_GPIO_DEBUG_START_PIN + worker->number, LOW)
|
||||
pinMode(WORKERS_GPIO_DEBUG_START_PIN + worker->number, OUTPUT);
|
||||
WORKER_GPIO_DEBUG_FREE;
|
||||
# else
|
||||
# define WORKER_GPIO_DEBUG_BUSY
|
||||
# define WORKER_GPIO_DEBUG_FREE
|
||||
# endif
|
||||
|
||||
while (!atomic_load(worker->proc_stop) && !atomic_load(worker->workers_stop)) {
|
||||
LOG_DEBUG("Worker %u waiting for a new job ...", worker->number);
|
||||
|
||||
WORKER_GPIO_DEBUG_FREE;
|
||||
|
||||
A_MUTEX_LOCK(&worker->has_job_mutex);
|
||||
A_COND_WAIT_TRUE(atomic_load(&worker->has_job), &worker->has_job_cond, &worker->has_job_mutex);
|
||||
A_MUTEX_UNLOCK(&worker->has_job_mutex);
|
||||
@@ -451,11 +451,11 @@ static void *_stream_worker_thread(void *v_worker) {
|
||||
|
||||
LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", worker->number, worker->buf_index);
|
||||
|
||||
PICTURE(encode_begin_time) = get_now_monotonic();
|
||||
WORKER_GPIO_DEBUG_BUSY;
|
||||
|
||||
if (encoder_compress_buffer(worker->encoder, worker->dev, worker->number, worker->buf_index) < 0) {
|
||||
worker->job_failed = true;
|
||||
worker->job_failed = false;
|
||||
}
|
||||
PICTURE(encode_end_time) = get_now_monotonic();
|
||||
|
||||
if (device_release_buffer(worker->dev, worker->buf_index) == 0) {
|
||||
worker->job_start_time = PICTURE(encode_begin_time);
|
||||
@@ -484,34 +484,96 @@ static void *_stream_worker_thread(void *v_worker) {
|
||||
}
|
||||
|
||||
LOG_DEBUG("Bye-bye (worker %u)", worker->number);
|
||||
WORKER_GPIO_DEBUG_FREE;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool_t *pool) {
|
||||
if (pool->workers) {
|
||||
LOG_INFO("Destroying workers ...");
|
||||
static struct _worker_t *_workers_pool_wait(struct _workers_pool_t *pool) {
|
||||
struct _worker_t *ready_worker = NULL;
|
||||
|
||||
atomic_store(&pool->workers_stop, true);
|
||||
for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) {
|
||||
# define WORKER(_next) pool->workers[number]._next
|
||||
A_MUTEX_LOCK(&pool->free_workers_mutex);
|
||||
A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex);
|
||||
A_MUTEX_UNLOCK(&pool->free_workers_mutex);
|
||||
|
||||
A_MUTEX_LOCK(&WORKER(has_job_mutex));
|
||||
atomic_store(&WORKER(has_job), true); // Final job: die
|
||||
A_MUTEX_UNLOCK(&WORKER(has_job_mutex));
|
||||
A_COND_SIGNAL(&WORKER(has_job_cond));
|
||||
|
||||
A_THREAD_JOIN(WORKER(tid));
|
||||
A_MUTEX_DESTROY(&WORKER(has_job_mutex));
|
||||
A_COND_DESTROY(&WORKER(has_job_cond));
|
||||
|
||||
# undef WORKER
|
||||
if (pool->oldest_worker && !atomic_load(&pool->oldest_worker->has_job) && pool->oldest_worker->buf_index >= 0) {
|
||||
ready_worker = pool->oldest_worker;
|
||||
ready_worker->job_timely = true;
|
||||
pool->oldest_worker = pool->oldest_worker->order_next;
|
||||
} else {
|
||||
for (unsigned number = 0; number < pool->n_workers; ++number) {
|
||||
if (
|
||||
!atomic_load(&pool->workers[number].has_job) && (
|
||||
ready_worker == NULL
|
||||
|| ready_worker->job_start_time < pool->workers[number].job_start_time
|
||||
)
|
||||
) {
|
||||
ready_worker = &pool->workers[number];
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
A_MUTEX_DESTROY(&pool->free_workers_mutex);
|
||||
A_COND_DESTROY(&pool->free_workers_cond);
|
||||
|
||||
free(pool->workers);
|
||||
assert(ready_worker != NULL);
|
||||
ready_worker->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате)
|
||||
}
|
||||
pool->free_workers = 0;
|
||||
pool->workers = NULL;
|
||||
return ready_worker;
|
||||
}
|
||||
|
||||
static void _workers_pool_assign(struct _workers_pool_t *pool, struct _worker_t *ready_worker, unsigned buf_index) {
|
||||
if (pool->oldest_worker == NULL) {
|
||||
pool->oldest_worker = ready_worker;
|
||||
pool->latest_worker = pool->oldest_worker;
|
||||
} else {
|
||||
if (ready_worker->order_next) {
|
||||
ready_worker->order_next->order_prev = ready_worker->order_prev;
|
||||
}
|
||||
if (ready_worker->order_prev) {
|
||||
ready_worker->order_prev->order_next = ready_worker->order_next;
|
||||
}
|
||||
ready_worker->order_prev = pool->latest_worker;
|
||||
pool->latest_worker->order_next = ready_worker;
|
||||
pool->latest_worker = ready_worker;
|
||||
}
|
||||
pool->latest_worker->order_next = NULL;
|
||||
|
||||
A_MUTEX_LOCK(&ready_worker->has_job_mutex);
|
||||
ready_worker->buf_index = buf_index;
|
||||
atomic_store(&ready_worker->has_job, true);
|
||||
A_MUTEX_UNLOCK(&ready_worker->has_job_mutex);
|
||||
A_COND_SIGNAL(&ready_worker->has_job_cond);
|
||||
|
||||
A_MUTEX_LOCK(&pool->free_workers_mutex);
|
||||
pool->free_workers -= 1;
|
||||
A_MUTEX_UNLOCK(&pool->free_workers_mutex);
|
||||
|
||||
LOG_DEBUG("Assigned new frame in buffer %u to worker %u", buf_index, ready_worker->number);
|
||||
}
|
||||
|
||||
static long double _workers_pool_get_fluency_delay(struct _workers_pool_t *pool) {
|
||||
long double sum_comp_time = 0;
|
||||
long double avg_comp_time;
|
||||
long double min_delay;
|
||||
long double soft_delay;
|
||||
|
||||
for (unsigned number = 0; number < pool->n_workers; ++number) {
|
||||
# define WORKER(_next) pool->workers[number]._next
|
||||
|
||||
A_MUTEX_LOCK(&WORKER(last_comp_time_mutex));
|
||||
if (WORKER(last_comp_time) > 0) {
|
||||
sum_comp_time += WORKER(last_comp_time);
|
||||
}
|
||||
A_MUTEX_UNLOCK(&WORKER(last_comp_time_mutex));
|
||||
|
||||
# undef WORKER
|
||||
}
|
||||
|
||||
avg_comp_time = sum_comp_time / pool->n_workers; // Среднее время работы воркеров
|
||||
|
||||
min_delay = avg_comp_time / pool->n_workers; // Среднее время работы размазывается на N воркеров
|
||||
|
||||
if (pool->desired_frames_interval > 0 && min_delay > 0) {
|
||||
// Искусственное время задержки на основе желаемого FPS, если включен --desired-fps
|
||||
soft_delay = pool->desired_frames_interval - sum_comp_time;
|
||||
return (min_delay > soft_delay ? min_delay : soft_delay);
|
||||
}
|
||||
|
||||
return min_delay;
|
||||
}
|
||||
|
||||
41
src/stream.h
41
src/stream.h
@@ -22,53 +22,12 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <stdatomic.h>
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
#include "device.h"
|
||||
#include "encoder.h"
|
||||
|
||||
|
||||
struct worker_t {
|
||||
pthread_t tid;
|
||||
unsigned number;
|
||||
atomic_bool *proc_stop;
|
||||
atomic_bool *workers_stop;
|
||||
|
||||
pthread_mutex_t last_comp_time_mutex;
|
||||
long double last_comp_time;
|
||||
|
||||
pthread_mutex_t has_job_mutex;
|
||||
int buf_index;
|
||||
atomic_bool has_job;
|
||||
bool job_failed;
|
||||
long double job_start_time;
|
||||
pthread_cond_t has_job_cond;
|
||||
|
||||
pthread_mutex_t *free_workers_mutex;
|
||||
unsigned *free_workers;
|
||||
pthread_cond_t *free_workers_cond;
|
||||
|
||||
struct worker_t *order_prev;
|
||||
struct worker_t *order_next;
|
||||
|
||||
struct device_t *dev;
|
||||
struct encoder_t *encoder;
|
||||
};
|
||||
|
||||
struct workers_pool_t {
|
||||
struct worker_t *workers;
|
||||
atomic_bool workers_stop;
|
||||
|
||||
pthread_mutex_t free_workers_mutex;
|
||||
unsigned free_workers;
|
||||
pthread_cond_t free_workers_cond;
|
||||
|
||||
struct encoder_t *encoder;
|
||||
};
|
||||
|
||||
struct process_t {
|
||||
atomic_bool stop;
|
||||
atomic_bool slowdown;
|
||||
|
||||
Reference in New Issue
Block a user