mirror of
https://github.com/pikvm/ustreamer.git
synced 2026-02-20 16:56:30 +00:00
Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
92844fc3db | ||
|
|
7bb9434850 | ||
|
|
3104a00913 | ||
|
|
28c51f3f2f | ||
|
|
5c035f21c8 | ||
|
|
52ecaf7fd3 | ||
|
|
aa007c676f | ||
|
|
1f0b49e5fe | ||
|
|
d979209096 | ||
|
|
154d8e4c2b | ||
|
|
27d42d2545 | ||
|
|
142670c374 | ||
|
|
2a9c4d7e7a | ||
|
|
dc43f01a7d | ||
|
|
bac7a2595e | ||
|
|
50158397a0 | ||
|
|
484f89cb82 |
@@ -1,7 +1,7 @@
|
||||
[bumpversion]
|
||||
commit = True
|
||||
tag = True
|
||||
current_version = 0.55
|
||||
current_version = 0.59
|
||||
parse = (?P<major>\d+)\.(?P<minor>\d+)(\.(?P<patch>\d+)(\-(?P<release>[a-z]+))?)?
|
||||
serialize =
|
||||
{major}.{minor}
|
||||
|
||||
2
Makefile
2
Makefile
@@ -7,7 +7,7 @@ CC ?= gcc
|
||||
|
||||
# =====
|
||||
LIBS = -lm -ljpeg -pthread -levent -levent_pthreads -luuid
|
||||
override CFLAGS += -c -std=c99 -Wall -Wextra -D_GNU_SOURCE
|
||||
override CFLAGS += -c -std=c11 -Wall -Wextra -D_GNU_SOURCE
|
||||
SOURCES = $(shell ls src/*.c src/encoders/cpu/*.c src/encoders/hw/*.c)
|
||||
PROG = ustreamer
|
||||
|
||||
|
||||
2
PKGBUILD
2
PKGBUILD
@@ -3,7 +3,7 @@
|
||||
|
||||
|
||||
pkgname=ustreamer
|
||||
pkgver=0.55
|
||||
pkgver=0.59
|
||||
pkgrel=1
|
||||
pkgdesc="Lightweight and fast MJPG-HTTP streamer"
|
||||
url="https://github.com/pi-kvm/ustreamer"
|
||||
|
||||
@@ -56,6 +56,8 @@ The recommended way of running µStreamer with [Auvidea B101](https://www.raspbe
|
||||
$ ./ustreamer \
|
||||
--format=uyvy \ # Device input format
|
||||
--encoder=omx \ # Hardware encoding with OpenMAX
|
||||
--workers=3 \ # Maximum workers for OpenMAX
|
||||
--persistent \ # Don't re-initialize device on timeout (for example when HDMI cable was disconnected)
|
||||
--dv-timings \ # Use DV-timings
|
||||
--drop-same-frames=30 # Save that traffic
|
||||
```
|
||||
|
||||
@@ -56,6 +56,8 @@ $ ./ustreamer --help
|
||||
$ ./ustreamer \
|
||||
--format=uyvy \ # Настройка входного формата устройства
|
||||
--encoder=omx \ # Использование аппаратного кодирования с помощью OpenMAX
|
||||
--workers=3 \ # Максимум воркеров для OpenMAX
|
||||
--persistent \ # Не переинициализировать устройство при таймауте (например, когда был отключен HDMI-кабель)
|
||||
--dv-timings \ # Включение DV-таймингов
|
||||
--drop-same-frames=30 # Экономим трафик
|
||||
```
|
||||
|
||||
@@ -22,4 +22,4 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#define VERSION "0.55"
|
||||
#define VERSION "0.59"
|
||||
|
||||
67
src/device.c
67
src/device.c
@@ -95,7 +95,7 @@ struct device_t *device_init() {
|
||||
dev->height = 480;
|
||||
dev->format = V4L2_PIX_FMT_YUYV;
|
||||
dev->standard = V4L2_STD_UNKNOWN;
|
||||
dev->n_buffers = max_u(sysconf(_SC_NPROCESSORS_ONLN), 1) + 1;
|
||||
dev->n_buffers = max_u(min_u(sysconf(_SC_NPROCESSORS_ONLN), 4), 1) + 1;
|
||||
dev->n_workers = dev->n_buffers;
|
||||
dev->timeout = 1;
|
||||
dev->error_delay = 1;
|
||||
@@ -201,6 +201,71 @@ void device_close(struct device_t *dev) {
|
||||
}
|
||||
}
|
||||
|
||||
int device_switch_capturing(struct device_t *dev, bool enable) {
|
||||
if (enable != dev->run->capturing) {
|
||||
enum v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
|
||||
|
||||
LOG_DEBUG("Calling ioctl(%s) ...", (enable ? "VIDIOC_STREAMON" : "VIDIOC_STREAMOFF"));
|
||||
if (xioctl(dev->run->fd, (enable ? VIDIOC_STREAMON : VIDIOC_STREAMOFF), &type) < 0) {
|
||||
LOG_PERROR("Unable to %s capturing", (enable ? "start" : "stop"));
|
||||
if (enable) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
dev->run->capturing = enable;
|
||||
LOG_INFO("Capturing %s", (enable ? "started" : "stopped"));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int device_grab_buffer(struct device_t *dev, struct v4l2_buffer *buf_info) {
|
||||
MEMSET_ZERO_PTR(buf_info);
|
||||
buf_info->type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
|
||||
buf_info->memory = V4L2_MEMORY_MMAP;
|
||||
|
||||
LOG_DEBUG("Calling ioctl(VIDIOC_DQBUF) ...");
|
||||
if (xioctl(dev->run->fd, VIDIOC_DQBUF, buf_info) < 0) {
|
||||
LOG_PERROR("Unable to dequeue buffer");
|
||||
return -1;
|
||||
}
|
||||
|
||||
LOG_DEBUG("Got a new frame in buffer index=%u; bytesused=%u", buf_info->index, buf_info->bytesused);
|
||||
if (buf_info->index >= dev->run->n_buffers) {
|
||||
LOG_ERROR("Got invalid buffer index=%u; nbuffers=%u", buf_info->index, dev->run->n_buffers);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int device_release_buffer(struct device_t *dev, struct v4l2_buffer *buf_info) {
|
||||
LOG_DEBUG("Calling ioctl(VIDIOC_QBUF) ...");
|
||||
if (xioctl(dev->run->fd, VIDIOC_QBUF, buf_info) < 0) {
|
||||
LOG_PERROR("Unable to requeue buffer");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int device_consume_event(struct device_t *dev) {
|
||||
struct v4l2_event event;
|
||||
|
||||
LOG_DEBUG("Calling ioctl(VIDIOC_DQEVENT) ...");
|
||||
if (!xioctl(dev->run->fd, VIDIOC_DQEVENT, &event)) {
|
||||
switch (event.type) {
|
||||
case V4L2_EVENT_SOURCE_CHANGE:
|
||||
LOG_INFO("Got V4L2_EVENT_SOURCE_CHANGE: source changed");
|
||||
return -1;
|
||||
case V4L2_EVENT_EOS:
|
||||
LOG_INFO("Got V4L2_EVENT_EOS: end of stream (ignored)");
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
LOG_PERROR("Got some V4L2 device event, but where is it? ");
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int _device_open_check_cap(struct device_t *dev) {
|
||||
struct v4l2_capability cap;
|
||||
int input = dev->input; // Needs pointer to int for ioctl()
|
||||
|
||||
@@ -24,7 +24,6 @@
|
||||
|
||||
#include <stddef.h>
|
||||
#include <stdbool.h>
|
||||
#include <signal.h>
|
||||
|
||||
#include <linux/videodev2.h>
|
||||
|
||||
@@ -106,7 +105,6 @@ struct device_t {
|
||||
struct controls_t *ctl;
|
||||
|
||||
struct device_runtime_t *run;
|
||||
sig_atomic_t volatile stop;
|
||||
};
|
||||
|
||||
|
||||
@@ -118,3 +116,8 @@ v4l2_std_id device_parse_standard(const char *str);
|
||||
|
||||
int device_open(struct device_t *dev);
|
||||
void device_close(struct device_t *dev);
|
||||
|
||||
int device_switch_capturing(struct device_t *dev, bool enable);
|
||||
int device_grab_buffer(struct device_t *dev, struct v4l2_buffer *buf_info);
|
||||
int device_release_buffer(struct device_t *dev, struct v4l2_buffer *buf_info);
|
||||
int device_consume_event(struct device_t *dev);
|
||||
|
||||
@@ -57,7 +57,7 @@ struct encoder_t *encoder_init() {
|
||||
A_CALLOC(run, 1);
|
||||
run->type = ENCODER_TYPE_CPU;
|
||||
run->quality = 80;
|
||||
A_PTHREAD_M_INIT(&run->mutex);
|
||||
A_MUTEX_INIT(&run->mutex);
|
||||
|
||||
A_CALLOC(encoder, 1);
|
||||
encoder->type = run->type;
|
||||
@@ -119,7 +119,7 @@ void encoder_destroy(struct encoder_t *encoder) {
|
||||
free(encoder->run->omxs);
|
||||
}
|
||||
# endif
|
||||
A_PTHREAD_M_DESTROY(&encoder->run->mutex);
|
||||
A_MUTEX_DESTROY(&encoder->run->mutex);
|
||||
free(encoder->run);
|
||||
free(encoder);
|
||||
}
|
||||
@@ -150,9 +150,9 @@ void encoder_prepare_live(struct encoder_t *encoder, struct device_t *dev) {
|
||||
&& encoder->run->type != ENCODER_TYPE_HW
|
||||
) {
|
||||
LOG_INFO("Switching to HW JPEG encoder because the input format is (M)JPEG");
|
||||
A_PTHREAD_M_LOCK(&encoder->run->mutex);
|
||||
A_MUTEX_LOCK(&encoder->run->mutex);
|
||||
encoder->run->type = ENCODER_TYPE_HW;
|
||||
A_PTHREAD_M_UNLOCK(&encoder->run->mutex);
|
||||
A_MUTEX_UNLOCK(&encoder->run->mutex);
|
||||
}
|
||||
|
||||
if (encoder->run->type == ENCODER_TYPE_HW) {
|
||||
@@ -161,9 +161,9 @@ void encoder_prepare_live(struct encoder_t *encoder, struct device_t *dev) {
|
||||
goto use_fallback;
|
||||
}
|
||||
if (hw_encoder_prepare_live(dev, encoder->quality) < 0) {
|
||||
A_PTHREAD_M_LOCK(&encoder->run->mutex);
|
||||
A_MUTEX_LOCK(&encoder->run->mutex);
|
||||
encoder->run->quality = 0;
|
||||
A_PTHREAD_M_UNLOCK(&encoder->run->mutex);
|
||||
A_MUTEX_UNLOCK(&encoder->run->mutex);
|
||||
LOG_INFO("Using JPEG quality: HW-default");
|
||||
}
|
||||
}
|
||||
@@ -181,10 +181,10 @@ void encoder_prepare_live(struct encoder_t *encoder, struct device_t *dev) {
|
||||
return;
|
||||
|
||||
use_fallback:
|
||||
A_PTHREAD_M_LOCK(&encoder->run->mutex);
|
||||
A_MUTEX_LOCK(&encoder->run->mutex);
|
||||
encoder->run->type = ENCODER_TYPE_CPU;
|
||||
encoder->run->quality = encoder->quality;
|
||||
A_PTHREAD_M_UNLOCK(&encoder->run->mutex);
|
||||
A_MUTEX_UNLOCK(&encoder->run->mutex);
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic ignored "-Wunused-label"
|
||||
@@ -213,10 +213,10 @@ int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, uns
|
||||
# pragma GCC diagnostic push
|
||||
use_fallback:
|
||||
LOG_INFO("Error while compressing, falling back to CPU");
|
||||
A_PTHREAD_M_LOCK(&encoder->run->mutex);
|
||||
A_MUTEX_LOCK(&encoder->run->mutex);
|
||||
encoder->run->type = ENCODER_TYPE_CPU;
|
||||
encoder->run->quality = encoder->quality;
|
||||
A_PTHREAD_M_UNLOCK(&encoder->run->mutex);
|
||||
A_MUTEX_UNLOCK(&encoder->run->mutex);
|
||||
return -1;
|
||||
# pragma GCC diagnostic pop
|
||||
}
|
||||
|
||||
@@ -42,8 +42,8 @@
|
||||
#include "encoder.h"
|
||||
|
||||
|
||||
#define INPUT_PORT 340
|
||||
#define OUTPUT_PORT 341
|
||||
static const OMX_U32 _INPUT_PORT = 340;
|
||||
static const OMX_U32 _OUTPUT_PORT = 341;
|
||||
|
||||
|
||||
static int _i_omx = 0;
|
||||
@@ -300,7 +300,7 @@ static int _omx_setup_input(struct omx_encoder_t *omx, struct device_t *dev) {
|
||||
|
||||
LOG_DEBUG("Setting up OMX JPEG input port ...");
|
||||
|
||||
if (component_get_portdef(&omx->encoder, &portdef, INPUT_PORT) < 0) {
|
||||
if (component_get_portdef(&omx->encoder, &portdef, _INPUT_PORT) < 0) {
|
||||
LOG_ERROR("... first");
|
||||
return -1;
|
||||
}
|
||||
@@ -338,17 +338,17 @@ static int _omx_setup_input(struct omx_encoder_t *omx, struct device_t *dev) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (component_get_portdef(&omx->encoder, &portdef, INPUT_PORT) < 0) {
|
||||
if (component_get_portdef(&omx->encoder, &portdef, _INPUT_PORT) < 0) {
|
||||
LOG_ERROR("... second");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (component_enable_port(&omx->encoder, INPUT_PORT) < 0) {
|
||||
if (component_enable_port(&omx->encoder, _INPUT_PORT) < 0) {
|
||||
return -1;
|
||||
}
|
||||
omx->i_input_port_enabled = true;
|
||||
|
||||
if ((error = OMX_AllocateBuffer(omx->encoder, &omx->input_buffer, INPUT_PORT, NULL, portdef.nBufferSize)) != OMX_ErrorNone) {
|
||||
if ((error = OMX_AllocateBuffer(omx->encoder, &omx->input_buffer, _INPUT_PORT, NULL, portdef.nBufferSize)) != OMX_ErrorNone) {
|
||||
LOG_OMX_ERROR(error, "Can't allocate OMX JPEG input buffer");
|
||||
return -1;
|
||||
}
|
||||
@@ -361,7 +361,7 @@ static int _omx_setup_output(struct omx_encoder_t *omx, unsigned quality) {
|
||||
|
||||
LOG_DEBUG("Setting up OMX JPEG output port ...");
|
||||
|
||||
if (component_get_portdef(&omx->encoder, &portdef, OUTPUT_PORT) < 0) {
|
||||
if (component_get_portdef(&omx->encoder, &portdef, _OUTPUT_PORT) < 0) {
|
||||
LOG_ERROR("... first");
|
||||
return -1;
|
||||
}
|
||||
@@ -374,7 +374,7 @@ static int _omx_setup_output(struct omx_encoder_t *omx, unsigned quality) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (component_get_portdef(&omx->encoder, &portdef, OUTPUT_PORT) < 0) {
|
||||
if (component_get_portdef(&omx->encoder, &portdef, _OUTPUT_PORT) < 0) {
|
||||
LOG_ERROR("... second");
|
||||
return -1;
|
||||
}
|
||||
@@ -395,7 +395,7 @@ static int _omx_setup_output(struct omx_encoder_t *omx, unsigned quality) {
|
||||
OMX_PARAM_IJGSCALINGTYPE ijg;
|
||||
|
||||
OMX_INIT_STRUCTURE(ijg);
|
||||
ijg.nPortIndex = OUTPUT_PORT;
|
||||
ijg.nPortIndex = _OUTPUT_PORT;
|
||||
ijg.bEnabled = OMX_TRUE;
|
||||
|
||||
if ((error = OMX_SetParameter(omx->encoder, OMX_IndexParamBrcmEnableIJGTableScaling, &ijg)) != OMX_ErrorNone) {
|
||||
@@ -408,7 +408,7 @@ static int _omx_setup_output(struct omx_encoder_t *omx, unsigned quality) {
|
||||
OMX_IMAGE_PARAM_QFACTORTYPE qfactor;
|
||||
|
||||
OMX_INIT_STRUCTURE(qfactor);
|
||||
qfactor.nPortIndex = OUTPUT_PORT;
|
||||
qfactor.nPortIndex = _OUTPUT_PORT;
|
||||
qfactor.nQFactor = quality;
|
||||
|
||||
if ((error = OMX_SetParameter(omx->encoder, OMX_IndexParamQFactor, &qfactor)) != OMX_ErrorNone) {
|
||||
@@ -417,12 +417,12 @@ static int _omx_setup_output(struct omx_encoder_t *omx, unsigned quality) {
|
||||
}
|
||||
}
|
||||
|
||||
if (component_enable_port(&omx->encoder, OUTPUT_PORT) < 0) {
|
||||
if (component_enable_port(&omx->encoder, _OUTPUT_PORT) < 0) {
|
||||
return -1;
|
||||
}
|
||||
omx->i_output_port_enabled = true;
|
||||
|
||||
if ((error = OMX_AllocateBuffer(omx->encoder, &omx->output_buffer, OUTPUT_PORT, NULL, portdef.nBufferSize)) != OMX_ErrorNone) {
|
||||
if ((error = OMX_AllocateBuffer(omx->encoder, &omx->output_buffer, _OUTPUT_PORT, NULL, portdef.nBufferSize)) != OMX_ErrorNone) {
|
||||
LOG_OMX_ERROR(error, "Can't allocate OMX JPEG output buffer");
|
||||
return -1;
|
||||
}
|
||||
@@ -434,23 +434,23 @@ static int _omx_encoder_clear_ports(struct omx_encoder_t *omx) {
|
||||
int retcode = 0;
|
||||
|
||||
if (omx->i_output_port_enabled) {
|
||||
retcode -= component_disable_port(&omx->encoder, OUTPUT_PORT);
|
||||
retcode -= component_disable_port(&omx->encoder, _OUTPUT_PORT);
|
||||
omx->i_output_port_enabled = false;
|
||||
}
|
||||
if (omx->i_input_port_enabled) {
|
||||
retcode -= component_disable_port(&omx->encoder, INPUT_PORT);
|
||||
retcode -= component_disable_port(&omx->encoder, _INPUT_PORT);
|
||||
omx->i_input_port_enabled = false;
|
||||
}
|
||||
|
||||
if (omx->input_buffer) {
|
||||
if ((error = OMX_FreeBuffer(omx->encoder, INPUT_PORT, omx->input_buffer)) != OMX_ErrorNone) {
|
||||
if ((error = OMX_FreeBuffer(omx->encoder, _INPUT_PORT, omx->input_buffer)) != OMX_ErrorNone) {
|
||||
LOG_OMX_ERROR(error, "Can't free OMX JPEG input buffer");
|
||||
// retcode -= 1;
|
||||
}
|
||||
omx->input_buffer = NULL;
|
||||
}
|
||||
if (omx->output_buffer) {
|
||||
if ((error = OMX_FreeBuffer(omx->encoder, OUTPUT_PORT, omx->output_buffer)) != OMX_ErrorNone) {
|
||||
if ((error = OMX_FreeBuffer(omx->encoder, _OUTPUT_PORT, omx->output_buffer)) != OMX_ErrorNone) {
|
||||
LOG_OMX_ERROR(error, "Can't free OMX JPEG output buffer");
|
||||
// retcode -= 1;
|
||||
}
|
||||
|
||||
61
src/http.c
61
src/http.c
@@ -22,6 +22,7 @@
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stdbool.h>
|
||||
#include <stdatomic.h>
|
||||
#include <string.h>
|
||||
#include <strings.h>
|
||||
#include <unistd.h>
|
||||
@@ -147,6 +148,10 @@ int http_server_listen(struct http_server_t *server) {
|
||||
|
||||
server->run->drop_same_frames_blank = max_u(server->drop_same_frames, server->run->drop_same_frames_blank);
|
||||
|
||||
if (server->slowdown) {
|
||||
stream_switch_slowdown(server->run->stream, true);
|
||||
}
|
||||
|
||||
evhttp_set_timeout(server->run->http, server->timeout);
|
||||
|
||||
if (server->unix_path) {
|
||||
@@ -264,10 +269,10 @@ static void _http_callback_state(struct evhttp_request *request, void *v_server)
|
||||
|
||||
# define ENCODER(_next) server->run->stream->encoder->_next
|
||||
|
||||
A_PTHREAD_M_LOCK(&ENCODER(run->mutex));
|
||||
A_MUTEX_LOCK(&ENCODER(run->mutex));
|
||||
encoder_run_type = ENCODER(run->type);
|
||||
encoder_run_quality = ENCODER(run->quality);
|
||||
A_PTHREAD_M_UNLOCK(&ENCODER(run->mutex));
|
||||
A_MUTEX_UNLOCK(&ENCODER(run->mutex));
|
||||
|
||||
assert((buf = evbuffer_new()));
|
||||
|
||||
@@ -326,11 +331,15 @@ static void _http_callback_snapshot(struct evhttp_request *request, void *v_serv
|
||||
ADD_HEADER("Pragma", "no-cache");
|
||||
ADD_HEADER("Expires", "Mon, 3 Jan 2000 12:34:56 GMT");
|
||||
|
||||
# define ADD_TIME_HEADER(_key, _value) \
|
||||
{ sprintf(header_buf, "%.06Lf", _value); ADD_HEADER(_key, header_buf); }
|
||||
# define ADD_TIME_HEADER(_key, _value) { \
|
||||
sprintf(header_buf, "%.06Lf", _value); \
|
||||
ADD_HEADER(_key, header_buf); \
|
||||
}
|
||||
|
||||
# define ADD_UNSIGNED_HEADER(_key, _value) \
|
||||
{ sprintf(header_buf, "%u", _value); ADD_HEADER(_key, header_buf); }
|
||||
# define ADD_UNSIGNED_HEADER(_key, _value) { \
|
||||
sprintf(header_buf, "%u", _value); \
|
||||
ADD_HEADER(_key, header_buf); \
|
||||
}
|
||||
|
||||
ADD_TIME_HEADER("X-Timestamp", get_now_real());
|
||||
|
||||
@@ -406,14 +415,13 @@ static void _http_callback_stream(struct evhttp_request *request, void *v_server
|
||||
}
|
||||
server->run->stream_clients_count += 1;
|
||||
|
||||
if (server->slowdown && server->run->stream_clients_count == 1) {
|
||||
stream_switch_slowdown(server->run->stream, false);
|
||||
}
|
||||
|
||||
evhttp_connection_get_peer(conn, &client_addr, &client_port);
|
||||
LOG_INFO("HTTP: Registered the new stream client: [%s]:%u; id=%s; advance_headers=%s; dual_final_frames=%s; clients now: %u",
|
||||
client_addr,
|
||||
client_port,
|
||||
client->id,
|
||||
bool_to_string(client->advance_headers),
|
||||
bool_to_string(client->dual_final_frames),
|
||||
server->run->stream_clients_count);
|
||||
LOG_INFO("HTTP: Registered the new stream client: [%s]:%u; id=%s; clients now: %u",
|
||||
client_addr, client_port, client->id, server->run->stream_clients_count);
|
||||
|
||||
buf_event = evhttp_connection_get_bufferevent(conn);
|
||||
bufferevent_setcb(buf_event, NULL, NULL, _http_callback_stream_error, (void *)client);
|
||||
@@ -463,8 +471,8 @@ static void _http_callback_stream_write(struct bufferevent *buf_event, void *v_c
|
||||
// по тем же причинам, по которым у нас нет Content-Length.
|
||||
|
||||
# define ADD_ADVANCE_HEADERS \
|
||||
{ assert(evbuffer_add_printf(buf, \
|
||||
"Content-Type: image/jpeg" RN "X-Timestamp: %.06Lf" RN RN, get_now_real())); }
|
||||
assert(evbuffer_add_printf(buf, \
|
||||
"Content-Type: image/jpeg" RN "X-Timestamp: %.06Lf" RN RN, get_now_real()))
|
||||
|
||||
if (client->need_initial) {
|
||||
assert(evbuffer_add_printf(buf,
|
||||
@@ -559,20 +567,25 @@ static void _http_callback_stream_error(UNUSED struct bufferevent *buf_event, UN
|
||||
char *client_addr = "???";
|
||||
unsigned short client_port = 0;
|
||||
|
||||
client->server->run->stream_clients_count -= 1;
|
||||
# define RUN(_next) client->server->run->_next
|
||||
|
||||
RUN(stream_clients_count) -= 1;
|
||||
if (client->server->slowdown && RUN(stream_clients_count) <= 0) {
|
||||
stream_switch_slowdown(RUN(stream), true);
|
||||
}
|
||||
|
||||
conn = evhttp_request_get_connection(client->request);
|
||||
if (conn != NULL) {
|
||||
evhttp_connection_get_peer(conn, &client_addr, &client_port);
|
||||
}
|
||||
LOG_INFO("HTTP: Disconnected the stream client: [%s]:%u; clients now: %u",
|
||||
client_addr, client_port, client->server->run->stream_clients_count);
|
||||
client_addr, client_port, RUN(stream_clients_count));
|
||||
if (conn != NULL) {
|
||||
evhttp_connection_free(conn);
|
||||
}
|
||||
|
||||
if (client->prev == NULL) {
|
||||
client->server->run->stream_clients = client->next;
|
||||
RUN(stream_clients) = client->next;
|
||||
} else {
|
||||
client->prev->next = client->next;
|
||||
}
|
||||
@@ -581,6 +594,8 @@ static void _http_callback_stream_error(UNUSED struct bufferevent *buf_event, UN
|
||||
}
|
||||
free(client->key);
|
||||
free(client);
|
||||
|
||||
# undef RUN
|
||||
}
|
||||
|
||||
static void _http_queue_send_stream(struct http_server_t *server, bool stream_updated, bool picture_updated) {
|
||||
@@ -637,12 +652,14 @@ static void _http_exposed_refresh(UNUSED int fd, UNUSED short what, void *v_serv
|
||||
bool stream_updated = false;
|
||||
bool picture_updated = false;
|
||||
|
||||
# define UNLOCK_STREAM \
|
||||
{ server->run->stream->updated = false; A_PTHREAD_M_UNLOCK(&server->run->stream->mutex); }
|
||||
# define UNLOCK_STREAM { \
|
||||
atomic_store(&server->run->stream->updated, false); \
|
||||
A_MUTEX_UNLOCK(&server->run->stream->mutex); \
|
||||
}
|
||||
|
||||
if (server->run->stream->updated) {
|
||||
if (atomic_load(&server->run->stream->updated)) {
|
||||
LOG_DEBUG("Refreshing HTTP exposed ...");
|
||||
A_PTHREAD_M_LOCK(&server->run->stream->mutex);
|
||||
A_MUTEX_LOCK(&server->run->stream->mutex);
|
||||
if (server->run->stream->picture.size > 0) { // If online
|
||||
picture_updated = _expose_new_picture(server);
|
||||
UNLOCK_STREAM;
|
||||
|
||||
@@ -85,6 +85,7 @@ struct http_server_t {
|
||||
bool unix_rm;
|
||||
mode_t unix_mode;
|
||||
unsigned drop_same_frames;
|
||||
bool slowdown;
|
||||
unsigned fake_width;
|
||||
unsigned fake_height;
|
||||
unsigned timeout;
|
||||
|
||||
143
src/main.c
143
src/main.c
@@ -41,8 +41,8 @@
|
||||
#include "http.h"
|
||||
|
||||
|
||||
static const char _short_opts[] = "d:i:x:y:m:a:f:z:tb:w:q:c:s:p:u:ro:e:h";
|
||||
static const struct option _long_opts[] = {
|
||||
static const char _SHORT_OPTS[] = "d:i:x:y:m:a:f:z:ntb:w:q:c:s:p:u:ro:e:lhv";
|
||||
static const struct option _LONG_OPTS[] = {
|
||||
{"device", required_argument, NULL, 'd'},
|
||||
{"input", required_argument, NULL, 'i'},
|
||||
{"width", required_argument, NULL, 'x'},
|
||||
@@ -51,14 +51,14 @@ static const struct option _long_opts[] = {
|
||||
{"tv-standard", required_argument, NULL, 'a'},
|
||||
{"desired-fps", required_argument, NULL, 'f'},
|
||||
{"min-frame-size", required_argument, NULL, 'z'},
|
||||
{"persistent", no_argument, NULL, 'n'},
|
||||
{"dv-timings", no_argument, NULL, 't'},
|
||||
{"buffers", required_argument, NULL, 'b'},
|
||||
{"workers", required_argument, NULL, 'w'},
|
||||
{"quality", required_argument, NULL, 'q'},
|
||||
{"encoder", required_argument, NULL, 'c'},
|
||||
{"device-timeout", required_argument, NULL, 1000},
|
||||
{"device-persistent", no_argument, NULL, 1001},
|
||||
{"device-error-delay", required_argument, NULL, 1002},
|
||||
{"device-error-delay", required_argument, NULL, 1001},
|
||||
|
||||
{"brightness", required_argument, NULL, 2000},
|
||||
{"brightness-auto", no_argument, NULL, 2001},
|
||||
@@ -80,6 +80,7 @@ static const struct option _long_opts[] = {
|
||||
{"unix-rm", no_argument, NULL, 'r'},
|
||||
{"unix-mode", required_argument, NULL, 'o'},
|
||||
{"drop-same-frames", required_argument, NULL, 'e'},
|
||||
{"slowdown", no_argument, NULL, 'l'},
|
||||
{"fake-width", required_argument, NULL, 3001},
|
||||
{"fake-height", required_argument, NULL, 3002},
|
||||
{"server-timeout", required_argument, NULL, 3003},
|
||||
@@ -89,7 +90,7 @@ static const struct option _long_opts[] = {
|
||||
{"debug", no_argument, NULL, 5002},
|
||||
{"log-level", required_argument, NULL, 5010},
|
||||
{"help", no_argument, NULL, 'h'},
|
||||
{"version", no_argument, NULL, 6000},
|
||||
{"version", no_argument, NULL, 'v'},
|
||||
{NULL, 0, NULL, 0},
|
||||
};
|
||||
|
||||
@@ -105,76 +106,79 @@ static void _version(bool nl) {
|
||||
|
||||
static void _help(struct device_t *dev, struct encoder_t *encoder, struct http_server_t *server) {
|
||||
printf("\nuStreamer - Lightweight and fast MJPG-HTTP streamer\n");
|
||||
printf("===================================================\n\n");
|
||||
printf("═══════════════════════════════════════════════════\n\n");
|
||||
printf("Version: ");
|
||||
_version(false);
|
||||
printf("; license: GPLv3\n");
|
||||
printf("Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com>\n\n");
|
||||
printf("Capturing options:\n");
|
||||
printf("------------------\n");
|
||||
printf(" -d|--device </dev/path> -- Path to V4L2 device. Default: %s.\n\n", dev->path);
|
||||
printf(" -i|--input <N> -- Input channel. Default: %u.\n\n", dev->input);
|
||||
printf(" -x|--width <N> -- Initial image width. Default: %u.\n\n", dev->width);
|
||||
printf(" -y|--height <N> -- Initial image height. Default: %u.\n\n", dev->height);
|
||||
printf(" -m|--format <fmt> -- Image format.\n");
|
||||
printf(" Available: %s; default: YUYV.\n\n", FORMATS_STR);
|
||||
printf(" -a|--tv-standard <std> -- Force TV standard.\n");
|
||||
printf(" Available: %s; default: disabled.\n\n", STANDARDS_STR);
|
||||
printf(" -f|--desired-fps <N> -- Desired FPS. Default: maximum as possible.\n\n");
|
||||
printf(" -z|--min-frame-size <N> -- Drop frames smaller then this limit.\n");
|
||||
printf(" Useful if the device produces small-sized garbage frames.\n\n");
|
||||
printf(" -t|--dv-timings -- Enable DV timings queriyng and events processing.\n");
|
||||
printf(" Supports automatic resolution changing. Default: disabled.\n\n");
|
||||
printf(" -b|--buffers <N> -- The number of buffers to receive data from the device.\n");
|
||||
printf(" Each buffer may processed using an intermediate thread.\n");
|
||||
printf(" Default: %u (number of CPU cores + 1)\n\n", dev->n_buffers);
|
||||
printf(" -w|--workers <N> -- The number of compressing threads. Default: %u (== --buffers).\n\n", dev->n_workers);
|
||||
printf(" -q|--quality <N> -- Set quality of JPEG encoding from 1 to 100 (best). Default: %u.\n\n", encoder->quality);
|
||||
printf(" -c|--encoder <type> -- Use specified encoder. It may affects to workers number.\n");
|
||||
printf(" -- Available: %s; default: CPU.\n\n", ENCODER_TYPES_STR);
|
||||
printf(" --device-timeout <seconds> -- Timeout for device querying. Default: %u\n\n", dev->timeout);
|
||||
printf(" --device-persistent -- Don't re-initialize device on timeout. Default: disabled.\n\n");
|
||||
printf(" --device-error-delay <seconds> -- Delay before trying to connect to the device again\n");
|
||||
printf(" after a timeout. Default: %u\n\n", dev->error_delay);
|
||||
printf("══════════════════\n");
|
||||
printf(" -d|--device </dev/path> ──────── Path to V4L2 device. Default: %s.\n\n", dev->path);
|
||||
printf(" -i|--input <N> ───────────────── Input channel. Default: %u.\n\n", dev->input);
|
||||
printf(" -x|--width <N> ───────────────── Initial image width. Default: %u.\n\n", dev->width);
|
||||
printf(" -y|--height <N> ──────────────── Initial image height. Default: %u.\n\n", dev->height);
|
||||
printf(" -m|--format <fmt> ────────────── Image format.\n");
|
||||
printf(" Available: %s; default: YUYV.\n\n", FORMATS_STR);
|
||||
printf(" -a|--tv-standard <std> ───────── Force TV standard.\n");
|
||||
printf(" Available: %s; default: disabled.\n\n", STANDARDS_STR);
|
||||
printf(" -f|--desired-fps <N> ─────────── Desired FPS. Default: maximum as possible.\n\n");
|
||||
printf(" -z|--min-frame-size <N> ──────── Drop frames smaller then this limit.\n");
|
||||
printf(" Useful if the device produces small-sized garbage frames.\n\n");
|
||||
printf(" -n|--persistent ──────────────── Don't re-initialize device on timeout. Default: disabled.\n\n");
|
||||
printf(" -t|--dv-timings ──────────────── Enable DV timings queriyng and events processing.\n");
|
||||
printf(" Supports automatic resolution changing. Default: disabled.\n\n");
|
||||
printf(" -b|--buffers <N> ─────────────── The number of buffers to receive data from the device.\n");
|
||||
printf(" Each buffer may processed using an intermediate thread.\n");
|
||||
printf(" Default: %u (the number of CPU cores (but not more 4) + 1)\n\n", dev->n_buffers);
|
||||
printf(" -w|--workers <N> ─────────────── The number of worker threads. Default: %u (== --buffers).\n\n", dev->n_workers);
|
||||
printf(" -q|--quality <N> ─────────────── Set quality of JPEG encoding from 1 to 100 (best). Default: %u.\n\n", encoder->quality);
|
||||
printf(" -c|--encoder <type> ──────────── Use specified encoder. It may affects to workers number.\n");
|
||||
printf(" Available: %s; default: CPU.\n\n", ENCODER_TYPES_STR);
|
||||
printf(" --device-timeout <seconds> ───── Timeout for device querying. Default: %u\n\n", dev->timeout);
|
||||
printf(" --device-error-delay <seconds> ─ Delay before trying to connect to the device again\n");
|
||||
printf(" after timeout. Default: %u\n\n", dev->error_delay);
|
||||
printf("Image control options:\n");
|
||||
printf("---------------\n");
|
||||
printf(" --brightness <N> -- Set brightness. Default: no change.\n\n");
|
||||
printf(" --brightness-auto -- Enable automatic brightness control. Default: no change.\n\n");
|
||||
printf(" --contrast <N> -- Set contrast. Default: no change.\n\n");
|
||||
printf(" --saturation <N> -- Set saturation. Default: no change.\n\n");
|
||||
printf(" --hue <N> -- Set hue. Default: no change.\n\n");
|
||||
printf(" --hue-auto -- Enable automatic hue control. Default: no change.\n\n");
|
||||
printf(" --gamma <N> -- Set gamma. Default: no change.\n\n");
|
||||
printf(" --sharpness <N> -- Set sharpness. Default: no change.\n\n");
|
||||
printf(" --backlight-compensation <N> -- Set backlight compensation. Default: no change.\n\n");
|
||||
printf(" --white-balance <N> -- Set white balance. Default: no change.\n\n");
|
||||
printf(" --white-balance-auto -- Enable automatic white balance control. Default: no change.\n\n");
|
||||
printf(" --gain <N> -- Set gain. Default: no change.\n\n");
|
||||
printf(" --gain-auto -- Enable automatic gain control. Default: no change.\n\n");
|
||||
printf("══════════════════════\n");
|
||||
printf(" --brightness <N> ───────────── Set brightness. Default: no change.\n\n");
|
||||
printf(" --brightness-auto ──────────── Enable automatic brightness control. Default: no change.\n\n");
|
||||
printf(" --contrast <N> ─────────────── Set contrast. Default: no change.\n\n");
|
||||
printf(" --saturation <N> ───────────── Set saturation. Default: no change.\n\n");
|
||||
printf(" --hue <N> ──────────────────── Set hue. Default: no change.\n\n");
|
||||
printf(" --hue-auto ─────────────────── Enable automatic hue control. Default: no change.\n\n");
|
||||
printf(" --gamma <N> ────────────────── Set gamma. Default: no change.\n\n");
|
||||
printf(" --sharpness <N> ────────────── Set sharpness. Default: no change.\n\n");
|
||||
printf(" --backlight-compensation <N> ─ Set backlight compensation. Default: no change.\n\n");
|
||||
printf(" --white-balance <N> ────────── Set white balance. Default: no change.\n\n");
|
||||
printf(" --white-balance-auto ───────── Enable automatic white balance control. Default: no change.\n\n");
|
||||
printf(" --gain <N> ─────────────────── Set gain. Default: no change.\n\n");
|
||||
printf(" --gain-auto ────────────────── Enable automatic gain control. Default: no change.\n\n");
|
||||
printf("HTTP server options:\n");
|
||||
printf("--------------------\n");
|
||||
printf(" -s|--host <address> -- Listen on Hostname or IP. Default: %s\n\n", server->host);
|
||||
printf(" -p|--port <N> -- Bind to this TCP port. Default: %u\n\n", server->port);
|
||||
printf(" -u|--unix <path> -- Bind to UNIX domain socket. Default: disabled\n\n");
|
||||
printf(" -r|--unix-rm -- Try to remove old UNIX socket file before binding. Default: disabled\n\n");
|
||||
printf(" -o|--unix-mode <mode> -- Set UNIX socket file permissions (like 777). Default: disabled\n\n");
|
||||
printf(" -e|--drop-same-frames <N> -- Don't send same frames to clients, but no more than specified number.\n");
|
||||
printf("════════════════════\n");
|
||||
printf(" -s|--host <address> ──────── Listen on Hostname or IP. Default: %s\n\n", server->host);
|
||||
printf(" -p|--port <N> ────────────── Bind to this TCP port. Default: %u\n\n", server->port);
|
||||
printf(" -u|--unix <path> ─────────── Bind to UNIX domain socket. Default: disabled\n\n");
|
||||
printf(" -r|--unix-rm ─────────────── Try to remove old UNIX socket file before binding. Default: disabled\n\n");
|
||||
printf(" -o|--unix-mode <mode> ────── Set UNIX socket file permissions (like 777). Default: disabled\n\n");
|
||||
printf(" -e|--drop-same-frames <N> ── Don't send same frames to clients, but no more than specified number.\n");
|
||||
printf(" It can significantly reduce the outgoing traffic, but will increase\n");
|
||||
printf(" the CPU loading. Don't use this option with analog signal sources\n");
|
||||
printf(" or webcams, it's useless. Default: disabled.\n\n");
|
||||
printf(" --fake-width <N> -- Override image width for /state. Default: disabled\n\n");
|
||||
printf(" --fake-height <N> -- Override image height for /state. Default: disabled.\n\n");
|
||||
printf(" --server-timeout <seconds> -- Timeout for client connections. Default: %u\n\n", server->timeout);
|
||||
printf(" -l|--slowdown ────────────── Slowdown capturing to 1 FPS or less when no stream clients connected.\n");
|
||||
printf(" Useful to reduce CPU cosumption. Default: disabled.\n\n");
|
||||
printf(" --fake-width <N> ─────────── Override image width for /state. Default: disabled\n\n");
|
||||
printf(" --fake-height <N> ────────── Override image height for /state. Default: disabled.\n\n");
|
||||
printf(" --server-timeout <seconds> ─ Timeout for client connections. Default: %u\n\n", server->timeout);
|
||||
printf("Misc options:\n");
|
||||
printf("-------------\n");
|
||||
printf(" --log-level <N> -- Verbosity level of messages from 0 (info) to 3 (debug).\n");
|
||||
printf("═════════════\n");
|
||||
printf(" --log-level <N> ─ Verbosity level of messages from 0 (info) to 3 (debug).\n");
|
||||
printf(" Enabling debugging messages can slow down the program.\n");
|
||||
printf(" Available levels: 0=info, 1=performance, 2=verbose, 3=debug.\n");
|
||||
printf(" Default: %u.\n\n", log_level);
|
||||
printf(" --perf -- Enable performance messages (same as --log-level=1). Default: disabled.\n\n");
|
||||
printf(" --verbose -- Enable verbose messages and lower (same as --log-level=2). Default: disabled.\n\n");
|
||||
printf(" --debug -- Enable debug messages and lower (same as --log-level=3). Default: disabled.\n\n");
|
||||
printf(" -h|--help -- Print this messages and exit.\n\n");
|
||||
printf(" --perf ────────── Enable performance messages (same as --log-level=1). Default: disabled.\n\n");
|
||||
printf(" --verbose ─────── Enable verbose messages and lower (same as --log-level=2). Default: disabled.\n\n");
|
||||
printf(" --debug ───────── Enable debug messages and lower (same as --log-level=3). Default: disabled.\n\n");
|
||||
printf(" -h|--help ─────── Print this text and exit.\n\n");
|
||||
printf(" -v|--version ──── Print version and exit.\n\n");
|
||||
}
|
||||
|
||||
static int _parse_options(int argc, char *argv[], struct device_t *dev, struct encoder_t *encoder, struct http_server_t *server) {
|
||||
@@ -229,7 +233,7 @@ static int _parse_options(int argc, char *argv[], struct device_t *dev, struct e
|
||||
int ch;
|
||||
|
||||
log_level = LOG_LEVEL_INFO;
|
||||
while ((ch = getopt_long(argc, argv, _short_opts, _long_opts, &index)) >= 0) {
|
||||
while ((ch = getopt_long(argc, argv, _SHORT_OPTS, _LONG_OPTS, &index)) >= 0) {
|
||||
switch (ch) {
|
||||
case 'd': OPT_SET(dev->path, optarg);
|
||||
case 'i': OPT_UNSIGNED(dev->input, "--input", 0, 128);
|
||||
@@ -242,14 +246,14 @@ static int _parse_options(int argc, char *argv[], struct device_t *dev, struct e
|
||||
case 'a': OPT_PARSE(dev->standard, device_parse_standard, STANDARD_UNKNOWN, "TV standard");
|
||||
case 'f': OPT_UNSIGNED(dev->desired_fps, "--desired-fps", 0, 30);
|
||||
case 'z': OPT_UNSIGNED(dev->min_frame_size, "--min-frame-size", 0, 8192);
|
||||
case 'n': OPT_SET(dev->persistent, true);
|
||||
case 't': OPT_SET(dev->dv_timings, true);
|
||||
case 'b': OPT_UNSIGNED(dev->n_buffers, "--buffers", 1, 32);
|
||||
case 'w': OPT_UNSIGNED(dev->n_workers, "--workers", 1, 32);
|
||||
case 'q': OPT_UNSIGNED(encoder->quality, "--quality", 1, 100);
|
||||
case 'c': OPT_PARSE(encoder->type, encoder_parse_type, ENCODER_TYPE_UNKNOWN, "encoder type");
|
||||
case 1000: OPT_UNSIGNED(dev->timeout, "--device-timeout", 1, 60);
|
||||
case 1001: OPT_SET(dev->persistent, true);
|
||||
case 1002: OPT_UNSIGNED(dev->error_delay, "--device-error-delay", 1, 60);
|
||||
case 1001: OPT_UNSIGNED(dev->error_delay, "--device-error-delay", 1, 60);
|
||||
|
||||
case 2000: OPT_CTL(brightness);
|
||||
case 2001: OPT_CTL_AUTO(brightness);
|
||||
@@ -271,6 +275,7 @@ static int _parse_options(int argc, char *argv[], struct device_t *dev, struct e
|
||||
case 'r': OPT_SET(server->unix_rm, true);
|
||||
case 'o': OPT_CHMOD(server->unix_mode, "--unix-mode");
|
||||
case 'e': OPT_UNSIGNED(server->drop_same_frames, "--drop-same-frames", 0, 30);
|
||||
case 3000: OPT_SET(server->slowdown, true);
|
||||
case 3001: OPT_UNSIGNED(server->fake_width, "--fake-width", 0, 1920);
|
||||
case 3002: OPT_UNSIGNED(server->fake_height, "--fake-height", 0, 1200);
|
||||
case 3003: OPT_UNSIGNED(server->timeout, "--server-timeout", 1, 60);
|
||||
@@ -280,7 +285,7 @@ static int _parse_options(int argc, char *argv[], struct device_t *dev, struct e
|
||||
case 5002: OPT_SET(log_level, LOG_LEVEL_DEBUG);
|
||||
case 5010: OPT_UNSIGNED(log_level, "--log-level", 0, 3);
|
||||
case 'h': _help(dev, encoder, server); return 1;
|
||||
case 6000: _version(true); return 1;
|
||||
case 'v': _version(true); return 1;
|
||||
case 0: break;
|
||||
default: _help(dev, encoder, server); return -1;
|
||||
}
|
||||
@@ -375,10 +380,10 @@ int main(int argc, char *argv[]) {
|
||||
_ctx = &ctx;
|
||||
|
||||
if ((exit_code = http_server_listen(server)) == 0) {
|
||||
A_PTHREAD_CREATE(&stream_loop_tid, _stream_loop_thread, NULL);
|
||||
A_PTHREAD_CREATE(&server_loop_tid, _server_loop_thread, NULL);
|
||||
A_PTHREAD_JOIN(stream_loop_tid);
|
||||
A_PTHREAD_JOIN(server_loop_tid);
|
||||
A_THREAD_CREATE(&stream_loop_tid, _stream_loop_thread, NULL);
|
||||
A_THREAD_CREATE(&server_loop_tid, _server_loop_thread, NULL);
|
||||
A_THREAD_JOIN(server_loop_tid);
|
||||
A_THREAD_JOIN(stream_loop_tid);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
329
src/stream.c
329
src/stream.c
@@ -20,6 +20,8 @@
|
||||
*****************************************************************************/
|
||||
|
||||
|
||||
#include <stdatomic.h>
|
||||
#include <unistd.h>
|
||||
#include <errno.h>
|
||||
#include <time.h>
|
||||
#include <assert.h>
|
||||
@@ -40,46 +42,48 @@
|
||||
static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool);
|
||||
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index);
|
||||
|
||||
static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool);
|
||||
static int _stream_init(struct device_t *dev, struct workers_pool_t *pool);
|
||||
static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool);
|
||||
static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool);
|
||||
|
||||
static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *pool);
|
||||
static void *_stream_worker_thread(void *v_ctx);
|
||||
static void _stream_destroy_workers(struct device_t *dev, struct workers_pool_t *pool);
|
||||
|
||||
static int _stream_control(struct device_t *dev, bool enable);
|
||||
static int _stream_grab_buffer(struct device_t *dev, struct v4l2_buffer *buf_info);
|
||||
static int _stream_release_buffer(struct device_t *dev, struct v4l2_buffer *buf_info);
|
||||
static int _stream_handle_event(struct device_t *dev);
|
||||
static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool);
|
||||
static void *_stream_worker_thread(void *v_worker);
|
||||
static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool_t *pool);
|
||||
|
||||
|
||||
struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) {
|
||||
struct process_t *proc;
|
||||
struct stream_t *stream;
|
||||
|
||||
A_CALLOC(proc, 1);
|
||||
atomic_init(&proc->stop, false);
|
||||
atomic_init(&proc->slowdown, false);
|
||||
|
||||
A_CALLOC(stream, 1);
|
||||
stream->dev = dev;
|
||||
stream->encoder = encoder;
|
||||
A_PTHREAD_M_INIT(&stream->mutex);
|
||||
atomic_init(&stream->updated, false);
|
||||
A_MUTEX_INIT(&stream->mutex);
|
||||
stream->proc = proc;
|
||||
return stream;
|
||||
}
|
||||
|
||||
void stream_destroy(struct stream_t *stream) {
|
||||
A_PTHREAD_M_DESTROY(&stream->mutex);
|
||||
A_MUTEX_DESTROY(&stream->mutex);
|
||||
free(stream->proc);
|
||||
free(stream);
|
||||
}
|
||||
|
||||
void stream_loop(struct stream_t *stream) {
|
||||
struct workers_pool_t pool;
|
||||
bool workers_stop;
|
||||
|
||||
MEMSET_ZERO(pool);
|
||||
atomic_init(&pool.workers_stop, false);
|
||||
pool.encoder = stream->encoder;
|
||||
pool.workers_stop = &workers_stop;
|
||||
|
||||
LOG_INFO("Using V4L2 device: %s", stream->dev->path);
|
||||
LOG_INFO("Using desired FPS: %u", stream->dev->desired_fps);
|
||||
|
||||
while (_stream_init_loop(stream->dev, &pool) == 0) {
|
||||
while (_stream_init_loop(stream, &pool) == 0) {
|
||||
struct worker_t *oldest_worker = NULL;
|
||||
struct worker_t *last_worker = NULL;
|
||||
long double grab_after = 0;
|
||||
@@ -93,31 +97,31 @@ void stream_loop(struct stream_t *stream) {
|
||||
|
||||
LOG_INFO("Capturing ...");
|
||||
|
||||
while (!stream->dev->stop) {
|
||||
while (!atomic_load(&stream->proc->stop)) {
|
||||
int free_worker_number = -1;
|
||||
|
||||
SEP_DEBUG('-');
|
||||
|
||||
LOG_DEBUG("Waiting for workers ...");
|
||||
A_PTHREAD_M_LOCK(&pool.free_workers_mutex);
|
||||
A_PTHREAD_C_WAIT_TRUE(pool.free_workers, &pool.free_workers_cond, &pool.free_workers_mutex);
|
||||
A_PTHREAD_M_UNLOCK(&pool.free_workers_mutex);
|
||||
A_MUTEX_LOCK(&pool.free_workers_mutex);
|
||||
A_COND_WAIT_TRUE(pool.free_workers, &pool.free_workers_cond, &pool.free_workers_mutex);
|
||||
A_MUTEX_UNLOCK(&pool.free_workers_mutex);
|
||||
|
||||
if (oldest_worker && !oldest_worker->has_job && oldest_worker->ctx.buf_index >= 0) {
|
||||
if (oldest_worker && !atomic_load(&oldest_worker->has_job) && oldest_worker->buf_index >= 0) {
|
||||
if (oldest_worker->job_failed) {
|
||||
break;
|
||||
}
|
||||
|
||||
_stream_expose_picture(stream, oldest_worker->ctx.buf_index);
|
||||
_stream_expose_picture(stream, oldest_worker->buf_index);
|
||||
|
||||
free_worker_number = oldest_worker->ctx.number;
|
||||
free_worker_number = oldest_worker->number;
|
||||
oldest_worker = oldest_worker->order_next;
|
||||
|
||||
LOG_PERF("##### Raw frame accepted; worker = %u", free_worker_number);
|
||||
} else {
|
||||
for (unsigned number = 0; number < stream->dev->n_workers; ++number) {
|
||||
if (
|
||||
!pool.workers[number].has_job && (
|
||||
!atomic_load(&pool.workers[number].has_job) && (
|
||||
free_worker_number == -1
|
||||
|| pool.workers[free_worker_number].job_start_time < pool.workers[number].job_start_time
|
||||
)
|
||||
@@ -128,15 +132,19 @@ void stream_loop(struct stream_t *stream) {
|
||||
}
|
||||
|
||||
assert(free_worker_number >= 0);
|
||||
assert(!pool.workers[free_worker_number].has_job);
|
||||
assert(!atomic_load(&pool.workers[free_worker_number].has_job));
|
||||
|
||||
LOG_PERF("----- Raw frame dropped; worker = %u", free_worker_number);
|
||||
}
|
||||
|
||||
if (stream->dev->stop) {
|
||||
if (atomic_load(&stream->proc->stop)) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (atomic_load(&stream->proc->slowdown)) {
|
||||
usleep(1000000);
|
||||
}
|
||||
|
||||
# define INIT_FD_SET(_set) \
|
||||
fd_set _set; FD_ZERO(&_set); FD_SET(stream->dev->run->fd, &_set);
|
||||
|
||||
@@ -182,7 +190,7 @@ void stream_loop(struct stream_t *stream) {
|
||||
long double now = get_now_monotonic();
|
||||
long long now_second = floor_ms(now);
|
||||
|
||||
if (_stream_grab_buffer(stream->dev, &buf_info) < 0) {
|
||||
if (device_grab_buffer(stream->dev, &buf_info) < 0) {
|
||||
break;
|
||||
}
|
||||
stream->dev->run->pictures[buf_info.index].grab_time = now;
|
||||
@@ -222,7 +230,7 @@ void stream_loop(struct stream_t *stream) {
|
||||
# define FREE_WORKER(_next) pool.workers[free_worker_number]._next
|
||||
|
||||
LOG_DEBUG("Grabbed a new frame to buffer %u", buf_info.index);
|
||||
FREE_WORKER(ctx.buf_info) = buf_info;
|
||||
FREE_WORKER(buf_info) = buf_info;
|
||||
|
||||
if (!oldest_worker) {
|
||||
oldest_worker = &pool.workers[free_worker_number];
|
||||
@@ -240,23 +248,23 @@ void stream_loop(struct stream_t *stream) {
|
||||
}
|
||||
last_worker->order_next = NULL;
|
||||
|
||||
A_PTHREAD_M_LOCK(&FREE_WORKER(has_job_mutex));
|
||||
FREE_WORKER(ctx.buf_index) = buf_info.index;
|
||||
FREE_WORKER(has_job) = true;
|
||||
A_PTHREAD_M_UNLOCK(&FREE_WORKER(has_job_mutex));
|
||||
A_PTHREAD_C_SIGNAL(&FREE_WORKER(has_job_cond));
|
||||
A_MUTEX_LOCK(&FREE_WORKER(has_job_mutex));
|
||||
FREE_WORKER(buf_index) = buf_info.index;
|
||||
atomic_store(&FREE_WORKER(has_job), true);
|
||||
A_MUTEX_UNLOCK(&FREE_WORKER(has_job_mutex));
|
||||
A_COND_SIGNAL(&FREE_WORKER(has_job_cond));
|
||||
|
||||
# undef FREE_WORKER
|
||||
|
||||
A_PTHREAD_M_LOCK(&pool.free_workers_mutex);
|
||||
A_MUTEX_LOCK(&pool.free_workers_mutex);
|
||||
pool.free_workers -= 1;
|
||||
A_PTHREAD_M_UNLOCK(&pool.free_workers_mutex);
|
||||
A_MUTEX_UNLOCK(&pool.free_workers_mutex);
|
||||
|
||||
goto next_handlers; // Поток сам освободит буфер
|
||||
|
||||
pass_frame:
|
||||
|
||||
if (_stream_release_buffer(stream->dev, &buf_info) < 0) {
|
||||
if (device_release_buffer(stream->dev, &buf_info) < 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -270,35 +278,39 @@ void stream_loop(struct stream_t *stream) {
|
||||
|
||||
if (FD_ISSET(stream->dev->run->fd, &error_fds)) {
|
||||
LOG_INFO("Got V4L2 event");
|
||||
if (_stream_handle_event(stream->dev) < 0) {
|
||||
if (device_consume_event(stream->dev) < 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
A_PTHREAD_M_LOCK(&stream->mutex);
|
||||
A_MUTEX_LOCK(&stream->mutex);
|
||||
stream->picture.size = 0; // On stream offline
|
||||
free(stream->picture.data);
|
||||
stream->width = 0;
|
||||
stream->height = 0;
|
||||
stream->updated = true;
|
||||
A_PTHREAD_M_UNLOCK(&stream->mutex);
|
||||
atomic_store(&stream->updated, true);
|
||||
A_MUTEX_UNLOCK(&stream->mutex);
|
||||
}
|
||||
|
||||
_stream_destroy_workers(stream->dev, &pool);
|
||||
_stream_control(stream->dev, false);
|
||||
_stream_destroy_workers(stream, &pool);
|
||||
device_switch_capturing(stream->dev, false);
|
||||
device_close(stream->dev);
|
||||
}
|
||||
|
||||
void stream_loop_break(struct stream_t *stream) {
|
||||
stream->dev->stop = 1;
|
||||
atomic_store(&stream->proc->stop, true);
|
||||
}
|
||||
|
||||
void stream_switch_slowdown(struct stream_t *stream, bool slowdown) {
|
||||
atomic_store(&stream->proc->slowdown, slowdown);
|
||||
}
|
||||
|
||||
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) {
|
||||
# define PICTURE(_next) stream->dev->run->pictures[buf_index]._next
|
||||
|
||||
A_PTHREAD_M_LOCK(&stream->mutex);
|
||||
A_MUTEX_LOCK(&stream->mutex);
|
||||
|
||||
stream->picture.size = PICTURE(size);
|
||||
stream->picture.allocated = PICTURE(allocated);
|
||||
@@ -311,9 +323,9 @@ static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index)
|
||||
|
||||
stream->width = stream->dev->run->width;
|
||||
stream->height = stream->dev->run->height;
|
||||
stream->updated = true;
|
||||
atomic_store(&stream->updated, true);
|
||||
|
||||
A_PTHREAD_M_UNLOCK(&stream->mutex);
|
||||
A_MUTEX_UNLOCK(&stream->mutex);
|
||||
|
||||
# undef PICTURE
|
||||
}
|
||||
@@ -327,11 +339,11 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker
|
||||
for (unsigned number = 0; number < dev->n_workers; ++number) {
|
||||
# define WORKER(_next) pool->workers[number]._next
|
||||
|
||||
A_PTHREAD_M_LOCK(&WORKER(last_comp_time_mutex));
|
||||
A_MUTEX_LOCK(&WORKER(last_comp_time_mutex));
|
||||
if (WORKER(last_comp_time) > 0) {
|
||||
sum_comp_time += WORKER(last_comp_time);
|
||||
}
|
||||
A_PTHREAD_M_UNLOCK(&WORKER(last_comp_time_mutex));
|
||||
A_MUTEX_UNLOCK(&WORKER(last_comp_time_mutex));
|
||||
|
||||
# undef WORKER
|
||||
}
|
||||
@@ -348,14 +360,14 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker
|
||||
return min_delay;
|
||||
}
|
||||
|
||||
static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool) {
|
||||
static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool) {
|
||||
int retval = -1;
|
||||
|
||||
LOG_DEBUG("%s: *dev->stop = %d", __FUNCTION__, dev->stop);
|
||||
while (!dev->stop) {
|
||||
if ((retval = _stream_init(dev, pool)) < 0) {
|
||||
LOG_INFO("Sleeping %u seconds before new stream init ...", dev->error_delay);
|
||||
sleep(dev->error_delay);
|
||||
LOG_DEBUG("%s: stream->proc->stop = %d", __FUNCTION__, atomic_load(&stream->proc->stop));
|
||||
while (!atomic_load(&stream->proc->stop)) {
|
||||
if ((retval = _stream_init(stream, pool)) < 0) {
|
||||
LOG_INFO("Sleeping %u seconds before new stream init ...", stream->dev->error_delay);
|
||||
sleep(stream->dev->error_delay);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
@@ -363,218 +375,143 @@ static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool)
|
||||
return retval;
|
||||
}
|
||||
|
||||
static int _stream_init(struct device_t *dev, struct workers_pool_t *pool) {
|
||||
static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool) {
|
||||
SEP_INFO('=');
|
||||
|
||||
_stream_destroy_workers(dev, pool);
|
||||
_stream_control(dev, false);
|
||||
device_close(dev);
|
||||
_stream_destroy_workers(stream, pool);
|
||||
device_switch_capturing(stream->dev, false);
|
||||
device_close(stream->dev);
|
||||
|
||||
if (device_open(dev) < 0) {
|
||||
if (device_open(stream->dev) < 0) {
|
||||
goto error;
|
||||
}
|
||||
if (_stream_control(dev, true) < 0) {
|
||||
if (device_switch_capturing(stream->dev, true) < 0) {
|
||||
goto error;
|
||||
}
|
||||
|
||||
encoder_prepare_live(pool->encoder, dev);
|
||||
encoder_prepare_live(pool->encoder, stream->dev);
|
||||
|
||||
_stream_init_workers(dev, pool);
|
||||
_stream_init_workers(stream, pool);
|
||||
|
||||
return 0;
|
||||
|
||||
error:
|
||||
device_close(dev);
|
||||
device_close(stream->dev);
|
||||
return -1;
|
||||
}
|
||||
|
||||
static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *pool) {
|
||||
LOG_INFO("Spawning %u workers ...", dev->n_workers);
|
||||
static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool) {
|
||||
LOG_INFO("Spawning %u workers ...", stream->dev->n_workers);
|
||||
|
||||
*pool->workers_stop = false;
|
||||
A_CALLOC(pool->workers, dev->n_workers);
|
||||
atomic_store(&pool->workers_stop, false);
|
||||
A_CALLOC(pool->workers, stream->dev->n_workers);
|
||||
|
||||
A_PTHREAD_M_INIT(&pool->free_workers_mutex);
|
||||
A_PTHREAD_C_INIT(&pool->free_workers_cond);
|
||||
A_MUTEX_INIT(&pool->free_workers_mutex);
|
||||
A_COND_INIT(&pool->free_workers_cond);
|
||||
|
||||
for (unsigned number = 0; number < dev->n_workers; ++number) {
|
||||
for (unsigned number = 0; number < stream->dev->n_workers; ++number) {
|
||||
# define WORKER(_next) pool->workers[number]._next
|
||||
# define CTX(_next) WORKER(ctx._next)
|
||||
|
||||
pool->free_workers += 1;
|
||||
|
||||
A_PTHREAD_M_INIT(&WORKER(has_job_mutex));
|
||||
A_PTHREAD_C_INIT(&WORKER(has_job_cond));
|
||||
A_MUTEX_INIT(&WORKER(has_job_mutex));
|
||||
atomic_init(&WORKER(has_job), false);
|
||||
A_COND_INIT(&WORKER(has_job_cond));
|
||||
|
||||
CTX(number) = number;
|
||||
CTX(dev) = dev;
|
||||
CTX(dev_stop) = (sig_atomic_t *volatile)&dev->stop;
|
||||
CTX(workers_stop) = pool->workers_stop;
|
||||
WORKER(number) = number;
|
||||
WORKER(proc_stop) = &stream->proc->stop;
|
||||
WORKER(workers_stop) = &pool->workers_stop;
|
||||
|
||||
CTX(encoder) = pool->encoder;
|
||||
WORKER(free_workers_mutex) = &pool->free_workers_mutex;
|
||||
WORKER(free_workers) = &pool->free_workers;
|
||||
WORKER(free_workers_cond) = &pool->free_workers_cond;
|
||||
|
||||
CTX(last_comp_time_mutex) = &WORKER(last_comp_time_mutex);
|
||||
CTX(last_comp_time) = &WORKER(last_comp_time);
|
||||
WORKER(dev) = stream->dev;
|
||||
WORKER(encoder) = pool->encoder;
|
||||
|
||||
CTX(has_job_mutex) = &WORKER(has_job_mutex);
|
||||
CTX(has_job) = &WORKER(has_job);
|
||||
CTX(job_failed) = &WORKER(job_failed);
|
||||
CTX(job_start_time) = &WORKER(job_start_time);
|
||||
CTX(has_job_cond) = &WORKER(has_job_cond);
|
||||
A_THREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&(pool->workers[number]));
|
||||
|
||||
CTX(free_workers_mutex) = &pool->free_workers_mutex;
|
||||
CTX(free_workers) = &pool->free_workers;
|
||||
CTX(free_workers_cond) = &pool->free_workers_cond;
|
||||
|
||||
A_PTHREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&WORKER(ctx));
|
||||
|
||||
# undef CTX
|
||||
# undef WORKER
|
||||
}
|
||||
}
|
||||
|
||||
static void *_stream_worker_thread(void *v_ctx) {
|
||||
struct worker_context_t *ctx = (struct worker_context_t *)v_ctx;
|
||||
static void *_stream_worker_thread(void *v_worker) {
|
||||
struct worker_t *worker = (struct worker_t *)v_worker;
|
||||
|
||||
LOG_DEBUG("Hello! I am a worker #%u ^_^", ctx->number);
|
||||
LOG_DEBUG("Hello! I am a worker #%u ^_^", worker->number);
|
||||
|
||||
while (!*ctx->dev_stop && !*ctx->workers_stop) {
|
||||
LOG_DEBUG("Worker %u waiting for a new job ...", ctx->number);
|
||||
A_PTHREAD_M_LOCK(ctx->has_job_mutex);
|
||||
A_PTHREAD_C_WAIT_TRUE(*ctx->has_job, ctx->has_job_cond, ctx->has_job_mutex);
|
||||
A_PTHREAD_M_UNLOCK(ctx->has_job_mutex);
|
||||
while (!atomic_load(worker->proc_stop) && !atomic_load(worker->workers_stop)) {
|
||||
LOG_DEBUG("Worker %u waiting for a new job ...", worker->number);
|
||||
A_MUTEX_LOCK(&worker->has_job_mutex);
|
||||
A_COND_WAIT_TRUE(atomic_load(&worker->has_job), &worker->has_job_cond, &worker->has_job_mutex);
|
||||
A_MUTEX_UNLOCK(&worker->has_job_mutex);
|
||||
|
||||
if (!*ctx->workers_stop) {
|
||||
# define PICTURE(_next) ctx->dev->run->pictures[ctx->buf_index]._next
|
||||
if (!atomic_load(worker->workers_stop)) {
|
||||
# define PICTURE(_next) worker->dev->run->pictures[worker->buf_index]._next
|
||||
|
||||
LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", ctx->number, ctx->buf_index);
|
||||
LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", worker->number, worker->buf_index);
|
||||
|
||||
PICTURE(encode_begin_time) = get_now_monotonic();
|
||||
if (encoder_compress_buffer(ctx->encoder, ctx->dev, ctx->number, ctx->buf_index) < 0) {
|
||||
*ctx->job_failed = true;
|
||||
if (encoder_compress_buffer(worker->encoder, worker->dev, worker->number, worker->buf_index) < 0) {
|
||||
worker->job_failed = true;
|
||||
}
|
||||
PICTURE(encode_end_time) = get_now_monotonic();
|
||||
|
||||
if (_stream_release_buffer(ctx->dev, &ctx->buf_info) == 0) {
|
||||
*ctx->job_start_time = PICTURE(encode_begin_time);
|
||||
*ctx->has_job = false;
|
||||
if (device_release_buffer(worker->dev, &worker->buf_info) == 0) {
|
||||
worker->job_start_time = PICTURE(encode_begin_time);
|
||||
atomic_store(&worker->has_job, false);
|
||||
|
||||
long double last_comp_time = PICTURE(encode_end_time) - *ctx->job_start_time;
|
||||
long double last_comp_time = PICTURE(encode_end_time) - worker->job_start_time;
|
||||
|
||||
A_PTHREAD_M_LOCK(ctx->last_comp_time_mutex);
|
||||
*ctx->last_comp_time = last_comp_time;
|
||||
A_PTHREAD_M_UNLOCK(ctx->last_comp_time_mutex);
|
||||
A_MUTEX_LOCK(&worker->last_comp_time_mutex);
|
||||
worker->last_comp_time = last_comp_time;
|
||||
A_MUTEX_UNLOCK(&worker->last_comp_time_mutex);
|
||||
|
||||
LOG_VERBOSE("Compressed JPEG size=%zu; time=%0.3Lf; worker=%u; buffer=%u",
|
||||
PICTURE(size), last_comp_time, ctx->number, ctx->buf_index);
|
||||
PICTURE(size), last_comp_time, worker->number, worker->buf_index);
|
||||
} else {
|
||||
*ctx->job_failed = true;
|
||||
*ctx->has_job = false;
|
||||
worker->job_failed = true;
|
||||
atomic_store(&worker->has_job, false);
|
||||
}
|
||||
|
||||
# undef PICTURE
|
||||
}
|
||||
|
||||
A_PTHREAD_M_LOCK(ctx->free_workers_mutex);
|
||||
*ctx->free_workers += 1;
|
||||
A_PTHREAD_M_UNLOCK(ctx->free_workers_mutex);
|
||||
A_PTHREAD_C_SIGNAL(ctx->free_workers_cond);
|
||||
A_MUTEX_LOCK(worker->free_workers_mutex);
|
||||
*worker->free_workers += 1;
|
||||
A_MUTEX_UNLOCK(worker->free_workers_mutex);
|
||||
A_COND_SIGNAL(worker->free_workers_cond);
|
||||
}
|
||||
|
||||
LOG_DEBUG("Bye-bye (worker %u)", ctx->number);
|
||||
LOG_DEBUG("Bye-bye (worker %u)", worker->number);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void _stream_destroy_workers(struct device_t *dev, struct workers_pool_t *pool) {
|
||||
static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool_t *pool) {
|
||||
if (pool->workers) {
|
||||
LOG_INFO("Destroying workers ...");
|
||||
|
||||
*pool->workers_stop = true;
|
||||
for (unsigned number = 0; number < dev->n_workers; ++number) {
|
||||
atomic_store(&pool->workers_stop, true);
|
||||
for (unsigned number = 0; number < stream->dev->n_workers; ++number) {
|
||||
# define WORKER(_next) pool->workers[number]._next
|
||||
|
||||
A_PTHREAD_M_LOCK(&WORKER(has_job_mutex));
|
||||
WORKER(has_job) = true; // Final job: die
|
||||
A_PTHREAD_M_UNLOCK(&WORKER(has_job_mutex));
|
||||
A_PTHREAD_C_SIGNAL(&WORKER(has_job_cond));
|
||||
A_MUTEX_LOCK(&WORKER(has_job_mutex));
|
||||
atomic_store(&WORKER(has_job), true); // Final job: die
|
||||
A_MUTEX_UNLOCK(&WORKER(has_job_mutex));
|
||||
A_COND_SIGNAL(&WORKER(has_job_cond));
|
||||
|
||||
A_PTHREAD_JOIN(WORKER(tid));
|
||||
A_PTHREAD_M_DESTROY(&WORKER(has_job_mutex));
|
||||
A_PTHREAD_C_DESTROY(&WORKER(has_job_cond));
|
||||
A_THREAD_JOIN(WORKER(tid));
|
||||
A_MUTEX_DESTROY(&WORKER(has_job_mutex));
|
||||
A_COND_DESTROY(&WORKER(has_job_cond));
|
||||
|
||||
# undef WORKER
|
||||
}
|
||||
|
||||
A_PTHREAD_M_DESTROY(&pool->free_workers_mutex);
|
||||
A_PTHREAD_C_DESTROY(&pool->free_workers_cond);
|
||||
A_MUTEX_DESTROY(&pool->free_workers_mutex);
|
||||
A_COND_DESTROY(&pool->free_workers_cond);
|
||||
|
||||
free(pool->workers);
|
||||
}
|
||||
pool->free_workers = 0;
|
||||
pool->workers = NULL;
|
||||
}
|
||||
|
||||
static int _stream_control(struct device_t *dev, bool enable) {
|
||||
if (enable != dev->run->capturing) {
|
||||
enum v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
|
||||
|
||||
LOG_DEBUG("Calling ioctl(%s) ...", (enable ? "VIDIOC_STREAMON" : "VIDIOC_STREAMOFF"));
|
||||
if (xioctl(dev->run->fd, (enable ? VIDIOC_STREAMON : VIDIOC_STREAMOFF), &type) < 0) {
|
||||
LOG_PERROR("Unable to %s capturing", (enable ? "start" : "stop"));
|
||||
if (enable) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
dev->run->capturing = enable;
|
||||
LOG_INFO("Capturing %s", (enable ? "started" : "stopped"));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int _stream_grab_buffer(struct device_t *dev, struct v4l2_buffer *buf_info) {
|
||||
MEMSET_ZERO_PTR(buf_info);
|
||||
buf_info->type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
|
||||
buf_info->memory = V4L2_MEMORY_MMAP;
|
||||
|
||||
LOG_DEBUG("Calling ioctl(VIDIOC_DQBUF) ...");
|
||||
if (xioctl(dev->run->fd, VIDIOC_DQBUF, buf_info) < 0) {
|
||||
LOG_PERROR("Unable to dequeue buffer");
|
||||
return -1;
|
||||
}
|
||||
|
||||
LOG_DEBUG("Got a new frame in buffer index=%u; bytesused=%u", buf_info->index, buf_info->bytesused);
|
||||
if (buf_info->index >= dev->run->n_buffers) {
|
||||
LOG_ERROR("Got invalid buffer index=%u; nbuffers=%u", buf_info->index, dev->run->n_buffers);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int _stream_release_buffer(struct device_t *dev, struct v4l2_buffer *buf_info) {
|
||||
LOG_DEBUG("Calling ioctl(VIDIOC_QBUF) ...");
|
||||
if (xioctl(dev->run->fd, VIDIOC_QBUF, buf_info) < 0) {
|
||||
LOG_PERROR("Unable to requeue buffer");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int _stream_handle_event(struct device_t *dev) {
|
||||
struct v4l2_event event;
|
||||
|
||||
LOG_DEBUG("Calling ioctl(VIDIOC_DQEVENT) ...");
|
||||
if (!xioctl(dev->run->fd, VIDIOC_DQEVENT, &event)) {
|
||||
switch (event.type) {
|
||||
case V4L2_EVENT_SOURCE_CHANGE:
|
||||
LOG_INFO("Got V4L2_EVENT_SOURCE_CHANGE: source changed");
|
||||
return -1;
|
||||
case V4L2_EVENT_EOS:
|
||||
LOG_INFO("Got V4L2_EVENT_EOS: end of stream (ignored)");
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
LOG_PERROR("Got some V4L2 device event, but where is it? ");
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
61
src/stream.h
61
src/stream.h
@@ -23,7 +23,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <signal.h>
|
||||
#include <stdatomic.h>
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
@@ -31,50 +31,37 @@
|
||||
#include "encoder.h"
|
||||
|
||||
|
||||
struct worker_context_t {
|
||||
struct worker_t {
|
||||
pthread_t tid;
|
||||
unsigned number;
|
||||
struct device_t *dev;
|
||||
atomic_bool *proc_stop;
|
||||
atomic_bool *workers_stop;
|
||||
|
||||
pthread_mutex_t last_comp_time_mutex;
|
||||
long double last_comp_time;
|
||||
|
||||
pthread_mutex_t has_job_mutex;
|
||||
int buf_index;
|
||||
struct v4l2_buffer buf_info;
|
||||
sig_atomic_t *volatile dev_stop;
|
||||
bool *workers_stop;
|
||||
|
||||
struct encoder_t *encoder;
|
||||
|
||||
pthread_mutex_t *last_comp_time_mutex;
|
||||
long double *last_comp_time;
|
||||
|
||||
pthread_mutex_t *has_job_mutex;
|
||||
bool *has_job;
|
||||
bool *job_failed;
|
||||
long double *job_start_time;
|
||||
pthread_cond_t *has_job_cond;
|
||||
atomic_bool has_job;
|
||||
bool job_failed;
|
||||
long double job_start_time;
|
||||
pthread_cond_t has_job_cond;
|
||||
|
||||
pthread_mutex_t *free_workers_mutex;
|
||||
unsigned *free_workers;
|
||||
pthread_cond_t *free_workers_cond;
|
||||
};
|
||||
|
||||
struct worker_t {
|
||||
struct worker_context_t ctx;
|
||||
pthread_t tid;
|
||||
struct worker_t *order_prev;
|
||||
struct worker_t *order_next;
|
||||
|
||||
pthread_mutex_t last_comp_time_mutex;
|
||||
long double last_comp_time;
|
||||
|
||||
pthread_mutex_t has_job_mutex;
|
||||
bool has_job;
|
||||
bool job_failed;
|
||||
long double job_start_time;
|
||||
pthread_cond_t has_job_cond;
|
||||
|
||||
struct worker_t *order_prev;
|
||||
struct worker_t *order_next;
|
||||
struct device_t *dev;
|
||||
struct encoder_t *encoder;
|
||||
};
|
||||
|
||||
struct workers_pool_t {
|
||||
struct worker_t *workers;
|
||||
bool *workers_stop;
|
||||
atomic_bool workers_stop;
|
||||
|
||||
pthread_mutex_t free_workers_mutex;
|
||||
unsigned free_workers;
|
||||
@@ -83,13 +70,20 @@ struct workers_pool_t {
|
||||
struct encoder_t *encoder;
|
||||
};
|
||||
|
||||
struct process_t {
|
||||
atomic_bool stop;
|
||||
atomic_bool slowdown;
|
||||
};
|
||||
|
||||
struct stream_t {
|
||||
struct picture_t picture;
|
||||
unsigned width;
|
||||
unsigned height;
|
||||
unsigned captured_fps;
|
||||
bool updated;
|
||||
atomic_bool updated;
|
||||
pthread_mutex_t mutex;
|
||||
|
||||
struct process_t *proc;
|
||||
struct device_t *dev;
|
||||
struct encoder_t *encoder;
|
||||
};
|
||||
@@ -100,3 +94,4 @@ void stream_destroy(struct stream_t *stream);
|
||||
|
||||
void stream_loop(struct stream_t *stream);
|
||||
void stream_loop_break(struct stream_t *stream);
|
||||
void stream_switch_slowdown(struct stream_t *stream, bool slowdown);
|
||||
|
||||
26
src/tools.h
26
src/tools.h
@@ -35,18 +35,18 @@
|
||||
#include <sys/syscall.h>
|
||||
|
||||
|
||||
#define A_PTHREAD_CREATE(_tid, _func, _arg) assert(!pthread_create(_tid, NULL, _func, _arg))
|
||||
#define A_PTHREAD_JOIN(_tid) assert(!pthread_join(_tid, NULL))
|
||||
#define A_THREAD_CREATE(_tid, _func, _arg) assert(!pthread_create(_tid, NULL, _func, _arg))
|
||||
#define A_THREAD_JOIN(_tid) assert(!pthread_join(_tid, NULL))
|
||||
|
||||
#define A_PTHREAD_M_INIT(_mutex) assert(!pthread_mutex_init(_mutex, NULL))
|
||||
#define A_PTHREAD_M_DESTROY(_mutex) assert(!pthread_mutex_destroy(_mutex))
|
||||
#define A_PTHREAD_M_LOCK(_mutex) assert(!pthread_mutex_lock(_mutex))
|
||||
#define A_PTHREAD_M_UNLOCK(_mutex) assert(!pthread_mutex_unlock(_mutex))
|
||||
#define A_MUTEX_INIT(_mutex) assert(!pthread_mutex_init(_mutex, NULL))
|
||||
#define A_MUTEX_DESTROY(_mutex) assert(!pthread_mutex_destroy(_mutex))
|
||||
#define A_MUTEX_LOCK(_mutex) assert(!pthread_mutex_lock(_mutex))
|
||||
#define A_MUTEX_UNLOCK(_mutex) assert(!pthread_mutex_unlock(_mutex))
|
||||
|
||||
#define A_PTHREAD_C_INIT(_cond) assert(!pthread_cond_init(_cond, NULL))
|
||||
#define A_PTHREAD_C_DESTROY(_cond) assert(!pthread_cond_destroy(_cond))
|
||||
#define A_PTHREAD_C_SIGNAL(...) assert(!pthread_cond_signal(__VA_ARGS__))
|
||||
#define A_PTHREAD_C_WAIT_TRUE(_var, _cond, _mutex) { while(!_var) assert(!pthread_cond_wait(_cond, _mutex)); }
|
||||
#define A_COND_INIT(_cond) assert(!pthread_cond_init(_cond, NULL))
|
||||
#define A_COND_DESTROY(_cond) assert(!pthread_cond_destroy(_cond))
|
||||
#define A_COND_SIGNAL(...) assert(!pthread_cond_signal(__VA_ARGS__))
|
||||
#define A_COND_WAIT_TRUE(_var, _cond, _mutex) { while(!_var) assert(!pthread_cond_wait(_cond, _mutex)); }
|
||||
|
||||
#define A_CALLOC(_dest, _nmemb) assert((_dest = calloc(_nmemb, sizeof(*(_dest)))))
|
||||
#define A_REALLOC(_dest, _nmemb) assert((_dest = realloc(_dest, _nmemb * sizeof(*(_dest)))))
|
||||
@@ -63,12 +63,16 @@ INLINE char *bool_to_string(bool flag) {
|
||||
return (flag ? "true" : "false");
|
||||
}
|
||||
|
||||
INLINE unsigned min_u(unsigned a, unsigned b) {
|
||||
return (a < b ? a : b);
|
||||
}
|
||||
|
||||
INLINE unsigned max_u(unsigned a, unsigned b) {
|
||||
return (a > b ? a : b);
|
||||
}
|
||||
|
||||
INLINE long long floor_ms(long double now) {
|
||||
return (long long) now - (now < (long long) now); // floor()
|
||||
return (long long)now - (now < (long long)now); // floor()
|
||||
}
|
||||
|
||||
INLINE void get_now(clockid_t clk_id, time_t *sec, long *msec) {
|
||||
|
||||
Reference in New Issue
Block a user