Compare commits

...

4 Commits
v0.55 ... v0.56

Author SHA1 Message Date
Devaev Maxim
dc43f01a7d Bump version: 0.55 → 0.56 2019-03-15 23:59:25 +03:00
Devaev Maxim
bac7a2595e refactoring 2019-03-15 14:21:50 +03:00
Devaev Maxim
50158397a0 refactoring 2019-03-15 13:34:06 +03:00
Devaev Maxim
484f89cb82 slowdown 2019-03-15 13:26:53 +03:00
11 changed files with 193 additions and 177 deletions

View File

@@ -1,7 +1,7 @@
[bumpversion] [bumpversion]
commit = True commit = True
tag = True tag = True
current_version = 0.55 current_version = 0.56
parse = (?P<major>\d+)\.(?P<minor>\d+)(\.(?P<patch>\d+)(\-(?P<release>[a-z]+))?)? parse = (?P<major>\d+)\.(?P<minor>\d+)(\.(?P<patch>\d+)(\-(?P<release>[a-z]+))?)?
serialize = serialize =
{major}.{minor} {major}.{minor}

View File

@@ -3,7 +3,7 @@
pkgname=ustreamer pkgname=ustreamer
pkgver=0.55 pkgver=0.56
pkgrel=1 pkgrel=1
pkgdesc="Lightweight and fast MJPG-HTTP streamer" pkgdesc="Lightweight and fast MJPG-HTTP streamer"
url="https://github.com/pi-kvm/ustreamer" url="https://github.com/pi-kvm/ustreamer"

View File

@@ -22,4 +22,4 @@
#pragma once #pragma once
#define VERSION "0.55" #define VERSION "0.56"

View File

@@ -24,7 +24,6 @@
#include <stddef.h> #include <stddef.h>
#include <stdbool.h> #include <stdbool.h>
#include <signal.h>
#include <linux/videodev2.h> #include <linux/videodev2.h>
@@ -106,7 +105,6 @@ struct device_t {
struct controls_t *ctl; struct controls_t *ctl;
struct device_runtime_t *run; struct device_runtime_t *run;
sig_atomic_t volatile stop;
}; };

View File

@@ -57,7 +57,7 @@ struct encoder_t *encoder_init() {
A_CALLOC(run, 1); A_CALLOC(run, 1);
run->type = ENCODER_TYPE_CPU; run->type = ENCODER_TYPE_CPU;
run->quality = 80; run->quality = 80;
A_PTHREAD_M_INIT(&run->mutex); A_MUTEX_INIT(&run->mutex);
A_CALLOC(encoder, 1); A_CALLOC(encoder, 1);
encoder->type = run->type; encoder->type = run->type;
@@ -119,7 +119,7 @@ void encoder_destroy(struct encoder_t *encoder) {
free(encoder->run->omxs); free(encoder->run->omxs);
} }
# endif # endif
A_PTHREAD_M_DESTROY(&encoder->run->mutex); A_MUTEX_DESTROY(&encoder->run->mutex);
free(encoder->run); free(encoder->run);
free(encoder); free(encoder);
} }
@@ -150,9 +150,9 @@ void encoder_prepare_live(struct encoder_t *encoder, struct device_t *dev) {
&& encoder->run->type != ENCODER_TYPE_HW && encoder->run->type != ENCODER_TYPE_HW
) { ) {
LOG_INFO("Switching to HW JPEG encoder because the input format is (M)JPEG"); LOG_INFO("Switching to HW JPEG encoder because the input format is (M)JPEG");
A_PTHREAD_M_LOCK(&encoder->run->mutex); A_MUTEX_LOCK(&encoder->run->mutex);
encoder->run->type = ENCODER_TYPE_HW; encoder->run->type = ENCODER_TYPE_HW;
A_PTHREAD_M_UNLOCK(&encoder->run->mutex); A_MUTEX_UNLOCK(&encoder->run->mutex);
} }
if (encoder->run->type == ENCODER_TYPE_HW) { if (encoder->run->type == ENCODER_TYPE_HW) {
@@ -161,9 +161,9 @@ void encoder_prepare_live(struct encoder_t *encoder, struct device_t *dev) {
goto use_fallback; goto use_fallback;
} }
if (hw_encoder_prepare_live(dev, encoder->quality) < 0) { if (hw_encoder_prepare_live(dev, encoder->quality) < 0) {
A_PTHREAD_M_LOCK(&encoder->run->mutex); A_MUTEX_LOCK(&encoder->run->mutex);
encoder->run->quality = 0; encoder->run->quality = 0;
A_PTHREAD_M_UNLOCK(&encoder->run->mutex); A_MUTEX_UNLOCK(&encoder->run->mutex);
LOG_INFO("Using JPEG quality: HW-default"); LOG_INFO("Using JPEG quality: HW-default");
} }
} }
@@ -181,10 +181,10 @@ void encoder_prepare_live(struct encoder_t *encoder, struct device_t *dev) {
return; return;
use_fallback: use_fallback:
A_PTHREAD_M_LOCK(&encoder->run->mutex); A_MUTEX_LOCK(&encoder->run->mutex);
encoder->run->type = ENCODER_TYPE_CPU; encoder->run->type = ENCODER_TYPE_CPU;
encoder->run->quality = encoder->quality; encoder->run->quality = encoder->quality;
A_PTHREAD_M_UNLOCK(&encoder->run->mutex); A_MUTEX_UNLOCK(&encoder->run->mutex);
} }
#pragma GCC diagnostic ignored "-Wunused-label" #pragma GCC diagnostic ignored "-Wunused-label"
@@ -213,10 +213,10 @@ int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, uns
# pragma GCC diagnostic push # pragma GCC diagnostic push
use_fallback: use_fallback:
LOG_INFO("Error while compressing, falling back to CPU"); LOG_INFO("Error while compressing, falling back to CPU");
A_PTHREAD_M_LOCK(&encoder->run->mutex); A_MUTEX_LOCK(&encoder->run->mutex);
encoder->run->type = ENCODER_TYPE_CPU; encoder->run->type = ENCODER_TYPE_CPU;
encoder->run->quality = encoder->quality; encoder->run->quality = encoder->quality;
A_PTHREAD_M_UNLOCK(&encoder->run->mutex); A_MUTEX_UNLOCK(&encoder->run->mutex);
return -1; return -1;
# pragma GCC diagnostic pop # pragma GCC diagnostic pop
} }

View File

@@ -147,6 +147,10 @@ int http_server_listen(struct http_server_t *server) {
server->run->drop_same_frames_blank = max_u(server->drop_same_frames, server->run->drop_same_frames_blank); server->run->drop_same_frames_blank = max_u(server->drop_same_frames, server->run->drop_same_frames_blank);
if (server->slowdown) {
stream_switch_slowdown(server->run->stream, true);
}
evhttp_set_timeout(server->run->http, server->timeout); evhttp_set_timeout(server->run->http, server->timeout);
if (server->unix_path) { if (server->unix_path) {
@@ -264,10 +268,10 @@ static void _http_callback_state(struct evhttp_request *request, void *v_server)
# define ENCODER(_next) server->run->stream->encoder->_next # define ENCODER(_next) server->run->stream->encoder->_next
A_PTHREAD_M_LOCK(&ENCODER(run->mutex)); A_MUTEX_LOCK(&ENCODER(run->mutex));
encoder_run_type = ENCODER(run->type); encoder_run_type = ENCODER(run->type);
encoder_run_quality = ENCODER(run->quality); encoder_run_quality = ENCODER(run->quality);
A_PTHREAD_M_UNLOCK(&ENCODER(run->mutex)); A_MUTEX_UNLOCK(&ENCODER(run->mutex));
assert((buf = evbuffer_new())); assert((buf = evbuffer_new()));
@@ -406,6 +410,10 @@ static void _http_callback_stream(struct evhttp_request *request, void *v_server
} }
server->run->stream_clients_count += 1; server->run->stream_clients_count += 1;
if (server->slowdown && server->run->stream_clients_count == 1) {
stream_switch_slowdown(server->run->stream, false);
}
evhttp_connection_get_peer(conn, &client_addr, &client_port); evhttp_connection_get_peer(conn, &client_addr, &client_port);
LOG_INFO("HTTP: Registered the new stream client: [%s]:%u; id=%s; advance_headers=%s; dual_final_frames=%s; clients now: %u", LOG_INFO("HTTP: Registered the new stream client: [%s]:%u; id=%s; advance_headers=%s; dual_final_frames=%s; clients now: %u",
client_addr, client_addr,
@@ -559,20 +567,25 @@ static void _http_callback_stream_error(UNUSED struct bufferevent *buf_event, UN
char *client_addr = "???"; char *client_addr = "???";
unsigned short client_port = 0; unsigned short client_port = 0;
client->server->run->stream_clients_count -= 1; # define RUN(_next) client->server->run->_next
RUN(stream_clients_count) -= 1;
if (client->server->slowdown && RUN(stream_clients_count) <= 0) {
stream_switch_slowdown(RUN(stream), true);
}
conn = evhttp_request_get_connection(client->request); conn = evhttp_request_get_connection(client->request);
if (conn != NULL) { if (conn != NULL) {
evhttp_connection_get_peer(conn, &client_addr, &client_port); evhttp_connection_get_peer(conn, &client_addr, &client_port);
} }
LOG_INFO("HTTP: Disconnected the stream client: [%s]:%u; clients now: %u", LOG_INFO("HTTP: Disconnected the stream client: [%s]:%u; clients now: %u",
client_addr, client_port, client->server->run->stream_clients_count); client_addr, client_port, RUN(stream_clients_count));
if (conn != NULL) { if (conn != NULL) {
evhttp_connection_free(conn); evhttp_connection_free(conn);
} }
if (client->prev == NULL) { if (client->prev == NULL) {
client->server->run->stream_clients = client->next; RUN(stream_clients) = client->next;
} else { } else {
client->prev->next = client->next; client->prev->next = client->next;
} }
@@ -581,6 +594,8 @@ static void _http_callback_stream_error(UNUSED struct bufferevent *buf_event, UN
} }
free(client->key); free(client->key);
free(client); free(client);
# undef RUN
} }
static void _http_queue_send_stream(struct http_server_t *server, bool stream_updated, bool picture_updated) { static void _http_queue_send_stream(struct http_server_t *server, bool stream_updated, bool picture_updated) {
@@ -638,11 +653,11 @@ static void _http_exposed_refresh(UNUSED int fd, UNUSED short what, void *v_serv
bool picture_updated = false; bool picture_updated = false;
# define UNLOCK_STREAM \ # define UNLOCK_STREAM \
{ server->run->stream->updated = false; A_PTHREAD_M_UNLOCK(&server->run->stream->mutex); } { server->run->stream->updated = false; A_MUTEX_UNLOCK(&server->run->stream->mutex); }
if (server->run->stream->updated) { if (server->run->stream->updated) {
LOG_DEBUG("Refreshing HTTP exposed ..."); LOG_DEBUG("Refreshing HTTP exposed ...");
A_PTHREAD_M_LOCK(&server->run->stream->mutex); A_MUTEX_LOCK(&server->run->stream->mutex);
if (server->run->stream->picture.size > 0) { // If online if (server->run->stream->picture.size > 0) { // If online
picture_updated = _expose_new_picture(server); picture_updated = _expose_new_picture(server);
UNLOCK_STREAM; UNLOCK_STREAM;

View File

@@ -85,6 +85,7 @@ struct http_server_t {
bool unix_rm; bool unix_rm;
mode_t unix_mode; mode_t unix_mode;
unsigned drop_same_frames; unsigned drop_same_frames;
bool slowdown;
unsigned fake_width; unsigned fake_width;
unsigned fake_height; unsigned fake_height;
unsigned timeout; unsigned timeout;

View File

@@ -80,6 +80,7 @@ static const struct option _long_opts[] = {
{"unix-rm", no_argument, NULL, 'r'}, {"unix-rm", no_argument, NULL, 'r'},
{"unix-mode", required_argument, NULL, 'o'}, {"unix-mode", required_argument, NULL, 'o'},
{"drop-same-frames", required_argument, NULL, 'e'}, {"drop-same-frames", required_argument, NULL, 'e'},
{"slowdown", no_argument, NULL, 3000},
{"fake-width", required_argument, NULL, 3001}, {"fake-width", required_argument, NULL, 3001},
{"fake-height", required_argument, NULL, 3002}, {"fake-height", required_argument, NULL, 3002},
{"server-timeout", required_argument, NULL, 3003}, {"server-timeout", required_argument, NULL, 3003},
@@ -162,6 +163,8 @@ static void _help(struct device_t *dev, struct encoder_t *encoder, struct http_s
printf(" It can significantly reduce the outgoing traffic, but will increase\n"); printf(" It can significantly reduce the outgoing traffic, but will increase\n");
printf(" the CPU loading. Don't use this option with analog signal sources\n"); printf(" the CPU loading. Don't use this option with analog signal sources\n");
printf(" or webcams, it's useless. Default: disabled.\n\n"); printf(" or webcams, it's useless. Default: disabled.\n\n");
printf(" --slowdown -- Slowdown capturing to 1 FPS or less when no stream clients connected.\n");
printf(" Useful to reduce CPU cosumption. Default: disabled.\n\n");
printf(" --fake-width <N> -- Override image width for /state. Default: disabled\n\n"); printf(" --fake-width <N> -- Override image width for /state. Default: disabled\n\n");
printf(" --fake-height <N> -- Override image height for /state. Default: disabled.\n\n"); printf(" --fake-height <N> -- Override image height for /state. Default: disabled.\n\n");
printf(" --server-timeout <seconds> -- Timeout for client connections. Default: %u\n\n", server->timeout); printf(" --server-timeout <seconds> -- Timeout for client connections. Default: %u\n\n", server->timeout);
@@ -271,6 +274,7 @@ static int _parse_options(int argc, char *argv[], struct device_t *dev, struct e
case 'r': OPT_SET(server->unix_rm, true); case 'r': OPT_SET(server->unix_rm, true);
case 'o': OPT_CHMOD(server->unix_mode, "--unix-mode"); case 'o': OPT_CHMOD(server->unix_mode, "--unix-mode");
case 'e': OPT_UNSIGNED(server->drop_same_frames, "--drop-same-frames", 0, 30); case 'e': OPT_UNSIGNED(server->drop_same_frames, "--drop-same-frames", 0, 30);
case 3000: OPT_SET(server->slowdown, true);
case 3001: OPT_UNSIGNED(server->fake_width, "--fake-width", 0, 1920); case 3001: OPT_UNSIGNED(server->fake_width, "--fake-width", 0, 1920);
case 3002: OPT_UNSIGNED(server->fake_height, "--fake-height", 0, 1200); case 3002: OPT_UNSIGNED(server->fake_height, "--fake-height", 0, 1200);
case 3003: OPT_UNSIGNED(server->timeout, "--server-timeout", 1, 60); case 3003: OPT_UNSIGNED(server->timeout, "--server-timeout", 1, 60);
@@ -375,10 +379,10 @@ int main(int argc, char *argv[]) {
_ctx = &ctx; _ctx = &ctx;
if ((exit_code = http_server_listen(server)) == 0) { if ((exit_code = http_server_listen(server)) == 0) {
A_PTHREAD_CREATE(&stream_loop_tid, _stream_loop_thread, NULL); A_THREAD_CREATE(&stream_loop_tid, _stream_loop_thread, NULL);
A_PTHREAD_CREATE(&server_loop_tid, _server_loop_thread, NULL); A_THREAD_CREATE(&server_loop_tid, _server_loop_thread, NULL);
A_PTHREAD_JOIN(stream_loop_tid); A_THREAD_JOIN(server_loop_tid);
A_PTHREAD_JOIN(server_loop_tid); A_THREAD_JOIN(stream_loop_tid);
} }
} }

View File

@@ -20,6 +20,7 @@
*****************************************************************************/ *****************************************************************************/
#include <unistd.h>
#include <errno.h> #include <errno.h>
#include <time.h> #include <time.h>
#include <assert.h> #include <assert.h>
@@ -40,12 +41,12 @@
static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool); static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool);
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index); static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index);
static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool); static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool);
static int _stream_init(struct device_t *dev, struct workers_pool_t *pool); static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool);
static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *pool); static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool);
static void *_stream_worker_thread(void *v_ctx); static void *_stream_worker_thread(void *v_worker);
static void _stream_destroy_workers(struct device_t *dev, struct workers_pool_t *pool); static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool_t *pool);
static int _stream_control(struct device_t *dev, bool enable); static int _stream_control(struct device_t *dev, bool enable);
static int _stream_grab_buffer(struct device_t *dev, struct v4l2_buffer *buf_info); static int _stream_grab_buffer(struct device_t *dev, struct v4l2_buffer *buf_info);
@@ -54,17 +55,22 @@ static int _stream_handle_event(struct device_t *dev);
struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) { struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) {
struct process_t *proc;
struct stream_t *stream; struct stream_t *stream;
A_CALLOC(proc, 1);
A_CALLOC(stream, 1); A_CALLOC(stream, 1);
stream->dev = dev; stream->dev = dev;
stream->encoder = encoder; stream->encoder = encoder;
A_PTHREAD_M_INIT(&stream->mutex); A_MUTEX_INIT(&stream->mutex);
stream->proc = proc;
return stream; return stream;
} }
void stream_destroy(struct stream_t *stream) { void stream_destroy(struct stream_t *stream) {
A_PTHREAD_M_DESTROY(&stream->mutex); A_MUTEX_DESTROY(&stream->mutex);
free(stream->proc);
free(stream); free(stream);
} }
@@ -79,7 +85,7 @@ void stream_loop(struct stream_t *stream) {
LOG_INFO("Using V4L2 device: %s", stream->dev->path); LOG_INFO("Using V4L2 device: %s", stream->dev->path);
LOG_INFO("Using desired FPS: %u", stream->dev->desired_fps); LOG_INFO("Using desired FPS: %u", stream->dev->desired_fps);
while (_stream_init_loop(stream->dev, &pool) == 0) { while (_stream_init_loop(stream, &pool) == 0) {
struct worker_t *oldest_worker = NULL; struct worker_t *oldest_worker = NULL;
struct worker_t *last_worker = NULL; struct worker_t *last_worker = NULL;
long double grab_after = 0; long double grab_after = 0;
@@ -93,24 +99,24 @@ void stream_loop(struct stream_t *stream) {
LOG_INFO("Capturing ..."); LOG_INFO("Capturing ...");
while (!stream->dev->stop) { while (!stream->proc->stop) {
int free_worker_number = -1; int free_worker_number = -1;
SEP_DEBUG('-'); SEP_DEBUG('-');
LOG_DEBUG("Waiting for workers ..."); LOG_DEBUG("Waiting for workers ...");
A_PTHREAD_M_LOCK(&pool.free_workers_mutex); A_MUTEX_LOCK(&pool.free_workers_mutex);
A_PTHREAD_C_WAIT_TRUE(pool.free_workers, &pool.free_workers_cond, &pool.free_workers_mutex); A_COND_WAIT_TRUE(pool.free_workers, &pool.free_workers_cond, &pool.free_workers_mutex);
A_PTHREAD_M_UNLOCK(&pool.free_workers_mutex); A_MUTEX_UNLOCK(&pool.free_workers_mutex);
if (oldest_worker && !oldest_worker->has_job && oldest_worker->ctx.buf_index >= 0) { if (oldest_worker && !oldest_worker->has_job && oldest_worker->buf_index >= 0) {
if (oldest_worker->job_failed) { if (oldest_worker->job_failed) {
break; break;
} }
_stream_expose_picture(stream, oldest_worker->ctx.buf_index); _stream_expose_picture(stream, oldest_worker->buf_index);
free_worker_number = oldest_worker->ctx.number; free_worker_number = oldest_worker->number;
oldest_worker = oldest_worker->order_next; oldest_worker = oldest_worker->order_next;
LOG_PERF("##### Raw frame accepted; worker = %u", free_worker_number); LOG_PERF("##### Raw frame accepted; worker = %u", free_worker_number);
@@ -133,10 +139,14 @@ void stream_loop(struct stream_t *stream) {
LOG_PERF("----- Raw frame dropped; worker = %u", free_worker_number); LOG_PERF("----- Raw frame dropped; worker = %u", free_worker_number);
} }
if (stream->dev->stop) { if (stream->proc->stop) {
break; break;
} }
if (stream->proc->slowdown) {
usleep(1000000);
}
# define INIT_FD_SET(_set) \ # define INIT_FD_SET(_set) \
fd_set _set; FD_ZERO(&_set); FD_SET(stream->dev->run->fd, &_set); fd_set _set; FD_ZERO(&_set); FD_SET(stream->dev->run->fd, &_set);
@@ -222,7 +232,7 @@ void stream_loop(struct stream_t *stream) {
# define FREE_WORKER(_next) pool.workers[free_worker_number]._next # define FREE_WORKER(_next) pool.workers[free_worker_number]._next
LOG_DEBUG("Grabbed a new frame to buffer %u", buf_info.index); LOG_DEBUG("Grabbed a new frame to buffer %u", buf_info.index);
FREE_WORKER(ctx.buf_info) = buf_info; FREE_WORKER(buf_info) = buf_info;
if (!oldest_worker) { if (!oldest_worker) {
oldest_worker = &pool.workers[free_worker_number]; oldest_worker = &pool.workers[free_worker_number];
@@ -240,17 +250,17 @@ void stream_loop(struct stream_t *stream) {
} }
last_worker->order_next = NULL; last_worker->order_next = NULL;
A_PTHREAD_M_LOCK(&FREE_WORKER(has_job_mutex)); A_MUTEX_LOCK(&FREE_WORKER(has_job_mutex));
FREE_WORKER(ctx.buf_index) = buf_info.index; FREE_WORKER(buf_index) = buf_info.index;
FREE_WORKER(has_job) = true; FREE_WORKER(has_job) = true;
A_PTHREAD_M_UNLOCK(&FREE_WORKER(has_job_mutex)); A_MUTEX_UNLOCK(&FREE_WORKER(has_job_mutex));
A_PTHREAD_C_SIGNAL(&FREE_WORKER(has_job_cond)); A_COND_SIGNAL(&FREE_WORKER(has_job_cond));
# undef FREE_WORKER # undef FREE_WORKER
A_PTHREAD_M_LOCK(&pool.free_workers_mutex); A_MUTEX_LOCK(&pool.free_workers_mutex);
pool.free_workers -= 1; pool.free_workers -= 1;
A_PTHREAD_M_UNLOCK(&pool.free_workers_mutex); A_MUTEX_UNLOCK(&pool.free_workers_mutex);
goto next_handlers; // Поток сам освободит буфер goto next_handlers; // Поток сам освободит буфер
@@ -277,28 +287,32 @@ void stream_loop(struct stream_t *stream) {
} }
} }
A_PTHREAD_M_LOCK(&stream->mutex); A_MUTEX_LOCK(&stream->mutex);
stream->picture.size = 0; // On stream offline stream->picture.size = 0; // On stream offline
free(stream->picture.data); free(stream->picture.data);
stream->width = 0; stream->width = 0;
stream->height = 0; stream->height = 0;
stream->updated = true; stream->updated = true;
A_PTHREAD_M_UNLOCK(&stream->mutex); A_MUTEX_UNLOCK(&stream->mutex);
} }
_stream_destroy_workers(stream->dev, &pool); _stream_destroy_workers(stream, &pool);
_stream_control(stream->dev, false); _stream_control(stream->dev, false);
device_close(stream->dev); device_close(stream->dev);
} }
void stream_loop_break(struct stream_t *stream) { void stream_loop_break(struct stream_t *stream) {
stream->dev->stop = 1; stream->proc->stop = 1;
}
void stream_switch_slowdown(struct stream_t *stream, bool slowdown) {
stream->proc->slowdown = slowdown;
} }
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) { static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) {
# define PICTURE(_next) stream->dev->run->pictures[buf_index]._next # define PICTURE(_next) stream->dev->run->pictures[buf_index]._next
A_PTHREAD_M_LOCK(&stream->mutex); A_MUTEX_LOCK(&stream->mutex);
stream->picture.size = PICTURE(size); stream->picture.size = PICTURE(size);
stream->picture.allocated = PICTURE(allocated); stream->picture.allocated = PICTURE(allocated);
@@ -313,7 +327,7 @@ static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index)
stream->height = stream->dev->run->height; stream->height = stream->dev->run->height;
stream->updated = true; stream->updated = true;
A_PTHREAD_M_UNLOCK(&stream->mutex); A_MUTEX_UNLOCK(&stream->mutex);
# undef PICTURE # undef PICTURE
} }
@@ -327,11 +341,11 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker
for (unsigned number = 0; number < dev->n_workers; ++number) { for (unsigned number = 0; number < dev->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next # define WORKER(_next) pool->workers[number]._next
A_PTHREAD_M_LOCK(&WORKER(last_comp_time_mutex)); A_MUTEX_LOCK(&WORKER(last_comp_time_mutex));
if (WORKER(last_comp_time) > 0) { if (WORKER(last_comp_time) > 0) {
sum_comp_time += WORKER(last_comp_time); sum_comp_time += WORKER(last_comp_time);
} }
A_PTHREAD_M_UNLOCK(&WORKER(last_comp_time_mutex)); A_MUTEX_UNLOCK(&WORKER(last_comp_time_mutex));
# undef WORKER # undef WORKER
} }
@@ -348,14 +362,14 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker
return min_delay; return min_delay;
} }
static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool) { static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool) {
int retval = -1; int retval = -1;
LOG_DEBUG("%s: *dev->stop = %d", __FUNCTION__, dev->stop); LOG_DEBUG("%s: *stream->proc->stop = %d", __FUNCTION__, stream->proc->stop);
while (!dev->stop) { while (!stream->proc->stop) {
if ((retval = _stream_init(dev, pool)) < 0) { if ((retval = _stream_init(stream, pool)) < 0) {
LOG_INFO("Sleeping %u seconds before new stream init ...", dev->error_delay); LOG_INFO("Sleeping %u seconds before new stream init ...", stream->dev->error_delay);
sleep(dev->error_delay); sleep(stream->dev->error_delay);
} else { } else {
break; break;
} }
@@ -363,150 +377,139 @@ static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool)
return retval; return retval;
} }
static int _stream_init(struct device_t *dev, struct workers_pool_t *pool) { static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool) {
SEP_INFO('='); SEP_INFO('=');
_stream_destroy_workers(dev, pool); _stream_destroy_workers(stream, pool);
_stream_control(dev, false); _stream_control(stream->dev, false);
device_close(dev); device_close(stream->dev);
if (device_open(dev) < 0) { if (device_open(stream->dev) < 0) {
goto error; goto error;
} }
if (_stream_control(dev, true) < 0) { if (_stream_control(stream->dev, true) < 0) {
goto error; goto error;
} }
encoder_prepare_live(pool->encoder, dev); encoder_prepare_live(pool->encoder, stream->dev);
_stream_init_workers(dev, pool); _stream_init_workers(stream, pool);
return 0; return 0;
error: error:
device_close(dev); device_close(stream->dev);
return -1; return -1;
} }
static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *pool) { static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool) {
LOG_INFO("Spawning %u workers ...", dev->n_workers); LOG_INFO("Spawning %u workers ...", stream->dev->n_workers);
*pool->workers_stop = false; *pool->workers_stop = false;
A_CALLOC(pool->workers, dev->n_workers); A_CALLOC(pool->workers, stream->dev->n_workers);
A_PTHREAD_M_INIT(&pool->free_workers_mutex); A_MUTEX_INIT(&pool->free_workers_mutex);
A_PTHREAD_C_INIT(&pool->free_workers_cond); A_COND_INIT(&pool->free_workers_cond);
for (unsigned number = 0; number < dev->n_workers; ++number) { for (unsigned number = 0; number < stream->dev->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next # define WORKER(_next) pool->workers[number]._next
# define CTX(_next) WORKER(ctx._next)
pool->free_workers += 1; pool->free_workers += 1;
A_PTHREAD_M_INIT(&WORKER(has_job_mutex)); A_MUTEX_INIT(&WORKER(has_job_mutex));
A_PTHREAD_C_INIT(&WORKER(has_job_cond)); A_COND_INIT(&WORKER(has_job_cond));
CTX(number) = number; WORKER(number) = number;
CTX(dev) = dev; WORKER(proc_stop) = (sig_atomic_t *volatile)&(stream->proc->stop);
CTX(dev_stop) = (sig_atomic_t *volatile)&dev->stop; WORKER(workers_stop) = pool->workers_stop;
CTX(workers_stop) = pool->workers_stop;
CTX(encoder) = pool->encoder; WORKER(free_workers_mutex) = &pool->free_workers_mutex;
WORKER(free_workers) = &pool->free_workers;
WORKER(free_workers_cond) = &pool->free_workers_cond;
CTX(last_comp_time_mutex) = &WORKER(last_comp_time_mutex); WORKER(dev) = stream->dev;
CTX(last_comp_time) = &WORKER(last_comp_time); WORKER(encoder) = pool->encoder;
CTX(has_job_mutex) = &WORKER(has_job_mutex); A_THREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&(pool->workers[number]));
CTX(has_job) = &WORKER(has_job);
CTX(job_failed) = &WORKER(job_failed);
CTX(job_start_time) = &WORKER(job_start_time);
CTX(has_job_cond) = &WORKER(has_job_cond);
CTX(free_workers_mutex) = &pool->free_workers_mutex;
CTX(free_workers) = &pool->free_workers;
CTX(free_workers_cond) = &pool->free_workers_cond;
A_PTHREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&WORKER(ctx));
# undef CTX
# undef WORKER # undef WORKER
} }
} }
static void *_stream_worker_thread(void *v_ctx) { static void *_stream_worker_thread(void *v_worker) {
struct worker_context_t *ctx = (struct worker_context_t *)v_ctx; struct worker_t *worker = (struct worker_t *)v_worker;
LOG_DEBUG("Hello! I am a worker #%u ^_^", ctx->number); LOG_DEBUG("Hello! I am a worker #%u ^_^", worker->number);
while (!*ctx->dev_stop && !*ctx->workers_stop) { while (!*worker->proc_stop && !*worker->workers_stop) {
LOG_DEBUG("Worker %u waiting for a new job ...", ctx->number); LOG_DEBUG("Worker %u waiting for a new job ...", worker->number);
A_PTHREAD_M_LOCK(ctx->has_job_mutex); A_MUTEX_LOCK(&worker->has_job_mutex);
A_PTHREAD_C_WAIT_TRUE(*ctx->has_job, ctx->has_job_cond, ctx->has_job_mutex); A_COND_WAIT_TRUE(worker->has_job, &worker->has_job_cond, &worker->has_job_mutex);
A_PTHREAD_M_UNLOCK(ctx->has_job_mutex); A_MUTEX_UNLOCK(&worker->has_job_mutex);
if (!*ctx->workers_stop) { if (!*worker->workers_stop) {
# define PICTURE(_next) ctx->dev->run->pictures[ctx->buf_index]._next # define PICTURE(_next) worker->dev->run->pictures[worker->buf_index]._next
LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", ctx->number, ctx->buf_index); LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", worker->number, worker->buf_index);
PICTURE(encode_begin_time) = get_now_monotonic(); PICTURE(encode_begin_time) = get_now_monotonic();
if (encoder_compress_buffer(ctx->encoder, ctx->dev, ctx->number, ctx->buf_index) < 0) { if (encoder_compress_buffer(worker->encoder, worker->dev, worker->number, worker->buf_index) < 0) {
*ctx->job_failed = true; worker->job_failed = true;
} }
PICTURE(encode_end_time) = get_now_monotonic(); PICTURE(encode_end_time) = get_now_monotonic();
if (_stream_release_buffer(ctx->dev, &ctx->buf_info) == 0) { if (_stream_release_buffer(worker->dev, &worker->buf_info) == 0) {
*ctx->job_start_time = PICTURE(encode_begin_time); worker->job_start_time = PICTURE(encode_begin_time);
*ctx->has_job = false; worker->has_job = false;
long double last_comp_time = PICTURE(encode_end_time) - *ctx->job_start_time; long double last_comp_time = PICTURE(encode_end_time) - worker->job_start_time;
A_PTHREAD_M_LOCK(ctx->last_comp_time_mutex); A_MUTEX_LOCK(&worker->last_comp_time_mutex);
*ctx->last_comp_time = last_comp_time; worker->last_comp_time = last_comp_time;
A_PTHREAD_M_UNLOCK(ctx->last_comp_time_mutex); A_MUTEX_UNLOCK(&worker->last_comp_time_mutex);
LOG_VERBOSE("Compressed JPEG size=%zu; time=%0.3Lf; worker=%u; buffer=%u", LOG_VERBOSE("Compressed JPEG size=%zu; time=%0.3Lf; worker=%u; buffer=%u",
PICTURE(size), last_comp_time, ctx->number, ctx->buf_index); PICTURE(size), last_comp_time, worker->number, worker->buf_index);
} else { } else {
*ctx->job_failed = true; worker->job_failed = true;
*ctx->has_job = false; worker->has_job = false;
} }
# undef PICTURE # undef PICTURE
} }
A_PTHREAD_M_LOCK(ctx->free_workers_mutex); A_MUTEX_LOCK(worker->free_workers_mutex);
*ctx->free_workers += 1; *worker->free_workers += 1;
A_PTHREAD_M_UNLOCK(ctx->free_workers_mutex); A_MUTEX_UNLOCK(worker->free_workers_mutex);
A_PTHREAD_C_SIGNAL(ctx->free_workers_cond); A_COND_SIGNAL(worker->free_workers_cond);
} }
LOG_DEBUG("Bye-bye (worker %u)", ctx->number); LOG_DEBUG("Bye-bye (worker %u)", worker->number);
return NULL; return NULL;
} }
static void _stream_destroy_workers(struct device_t *dev, struct workers_pool_t *pool) { static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool_t *pool) {
if (pool->workers) { if (pool->workers) {
LOG_INFO("Destroying workers ..."); LOG_INFO("Destroying workers ...");
*pool->workers_stop = true; *pool->workers_stop = true;
for (unsigned number = 0; number < dev->n_workers; ++number) { for (unsigned number = 0; number < stream->dev->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next # define WORKER(_next) pool->workers[number]._next
A_PTHREAD_M_LOCK(&WORKER(has_job_mutex)); A_MUTEX_LOCK(&WORKER(has_job_mutex));
WORKER(has_job) = true; // Final job: die WORKER(has_job) = true; // Final job: die
A_PTHREAD_M_UNLOCK(&WORKER(has_job_mutex)); A_MUTEX_UNLOCK(&WORKER(has_job_mutex));
A_PTHREAD_C_SIGNAL(&WORKER(has_job_cond)); A_COND_SIGNAL(&WORKER(has_job_cond));
A_PTHREAD_JOIN(WORKER(tid)); A_THREAD_JOIN(WORKER(tid));
A_PTHREAD_M_DESTROY(&WORKER(has_job_mutex)); A_MUTEX_DESTROY(&WORKER(has_job_mutex));
A_PTHREAD_C_DESTROY(&WORKER(has_job_cond)); A_COND_DESTROY(&WORKER(has_job_cond));
# undef WORKER # undef WORKER
} }
A_PTHREAD_M_DESTROY(&pool->free_workers_mutex); A_MUTEX_DESTROY(&pool->free_workers_mutex);
A_PTHREAD_C_DESTROY(&pool->free_workers_cond); A_COND_DESTROY(&pool->free_workers_cond);
free(pool->workers); free(pool->workers);
} }

View File

@@ -31,45 +31,32 @@
#include "encoder.h" #include "encoder.h"
struct worker_context_t { struct worker_t {
pthread_t tid;
unsigned number; unsigned number;
struct device_t *dev; sig_atomic_t *volatile proc_stop;
int buf_index;
struct v4l2_buffer buf_info;
sig_atomic_t *volatile dev_stop;
bool *workers_stop; bool *workers_stop;
struct encoder_t *encoder; pthread_mutex_t last_comp_time_mutex;
long double last_comp_time;
pthread_mutex_t *last_comp_time_mutex; pthread_mutex_t has_job_mutex;
long double *last_comp_time; int buf_index;
struct v4l2_buffer buf_info;
pthread_mutex_t *has_job_mutex; bool has_job;
bool *has_job; bool job_failed;
bool *job_failed; long double job_start_time;
long double *job_start_time; pthread_cond_t has_job_cond;
pthread_cond_t *has_job_cond;
pthread_mutex_t *free_workers_mutex; pthread_mutex_t *free_workers_mutex;
unsigned *free_workers; unsigned *free_workers;
pthread_cond_t *free_workers_cond; pthread_cond_t *free_workers_cond;
};
struct worker_t { struct worker_t *order_prev;
struct worker_context_t ctx; struct worker_t *order_next;
pthread_t tid;
pthread_mutex_t last_comp_time_mutex; struct device_t *dev;
long double last_comp_time; struct encoder_t *encoder;
pthread_mutex_t has_job_mutex;
bool has_job;
bool job_failed;
long double job_start_time;
pthread_cond_t has_job_cond;
struct worker_t *order_prev;
struct worker_t *order_next;
}; };
struct workers_pool_t { struct workers_pool_t {
@@ -83,6 +70,11 @@ struct workers_pool_t {
struct encoder_t *encoder; struct encoder_t *encoder;
}; };
struct process_t {
sig_atomic_t volatile stop;
bool slowdown;
};
struct stream_t { struct stream_t {
struct picture_t picture; struct picture_t picture;
unsigned width; unsigned width;
@@ -90,6 +82,8 @@ struct stream_t {
unsigned captured_fps; unsigned captured_fps;
bool updated; bool updated;
pthread_mutex_t mutex; pthread_mutex_t mutex;
struct process_t *proc;
struct device_t *dev; struct device_t *dev;
struct encoder_t *encoder; struct encoder_t *encoder;
}; };
@@ -100,3 +94,4 @@ void stream_destroy(struct stream_t *stream);
void stream_loop(struct stream_t *stream); void stream_loop(struct stream_t *stream);
void stream_loop_break(struct stream_t *stream); void stream_loop_break(struct stream_t *stream);
void stream_switch_slowdown(struct stream_t *stream, bool slowdown);

View File

@@ -35,18 +35,18 @@
#include <sys/syscall.h> #include <sys/syscall.h>
#define A_PTHREAD_CREATE(_tid, _func, _arg) assert(!pthread_create(_tid, NULL, _func, _arg)) #define A_THREAD_CREATE(_tid, _func, _arg) assert(!pthread_create(_tid, NULL, _func, _arg))
#define A_PTHREAD_JOIN(_tid) assert(!pthread_join(_tid, NULL)) #define A_THREAD_JOIN(_tid) assert(!pthread_join(_tid, NULL))
#define A_PTHREAD_M_INIT(_mutex) assert(!pthread_mutex_init(_mutex, NULL)) #define A_MUTEX_INIT(_mutex) assert(!pthread_mutex_init(_mutex, NULL))
#define A_PTHREAD_M_DESTROY(_mutex) assert(!pthread_mutex_destroy(_mutex)) #define A_MUTEX_DESTROY(_mutex) assert(!pthread_mutex_destroy(_mutex))
#define A_PTHREAD_M_LOCK(_mutex) assert(!pthread_mutex_lock(_mutex)) #define A_MUTEX_LOCK(_mutex) assert(!pthread_mutex_lock(_mutex))
#define A_PTHREAD_M_UNLOCK(_mutex) assert(!pthread_mutex_unlock(_mutex)) #define A_MUTEX_UNLOCK(_mutex) assert(!pthread_mutex_unlock(_mutex))
#define A_PTHREAD_C_INIT(_cond) assert(!pthread_cond_init(_cond, NULL)) #define A_COND_INIT(_cond) assert(!pthread_cond_init(_cond, NULL))
#define A_PTHREAD_C_DESTROY(_cond) assert(!pthread_cond_destroy(_cond)) #define A_COND_DESTROY(_cond) assert(!pthread_cond_destroy(_cond))
#define A_PTHREAD_C_SIGNAL(...) assert(!pthread_cond_signal(__VA_ARGS__)) #define A_COND_SIGNAL(...) assert(!pthread_cond_signal(__VA_ARGS__))
#define A_PTHREAD_C_WAIT_TRUE(_var, _cond, _mutex) { while(!_var) assert(!pthread_cond_wait(_cond, _mutex)); } #define A_COND_WAIT_TRUE(_var, _cond, _mutex) { while(!_var) assert(!pthread_cond_wait(_cond, _mutex)); }
#define A_CALLOC(_dest, _nmemb) assert((_dest = calloc(_nmemb, sizeof(*(_dest))))) #define A_CALLOC(_dest, _nmemb) assert((_dest = calloc(_nmemb, sizeof(*(_dest)))))
#define A_REALLOC(_dest, _nmemb) assert((_dest = realloc(_dest, _nmemb * sizeof(*(_dest))))) #define A_REALLOC(_dest, _nmemb) assert((_dest = realloc(_dest, _nmemb * sizeof(*(_dest)))))