mirror of
https://github.com/pikvm/ustreamer.git
synced 2026-03-03 06:11:42 +00:00
slowdown
This commit is contained in:
80
src/stream.c
80
src/stream.c
@@ -20,6 +20,7 @@
|
||||
*****************************************************************************/
|
||||
|
||||
|
||||
#include <unistd.h>
|
||||
#include <errno.h>
|
||||
#include <time.h>
|
||||
#include <assert.h>
|
||||
@@ -40,12 +41,12 @@
|
||||
static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool);
|
||||
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index);
|
||||
|
||||
static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool);
|
||||
static int _stream_init(struct device_t *dev, struct workers_pool_t *pool);
|
||||
static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool);
|
||||
static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool);
|
||||
|
||||
static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *pool);
|
||||
static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool);
|
||||
static void *_stream_worker_thread(void *v_ctx);
|
||||
static void _stream_destroy_workers(struct device_t *dev, struct workers_pool_t *pool);
|
||||
static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool_t *pool);
|
||||
|
||||
static int _stream_control(struct device_t *dev, bool enable);
|
||||
static int _stream_grab_buffer(struct device_t *dev, struct v4l2_buffer *buf_info);
|
||||
@@ -54,17 +55,22 @@ static int _stream_handle_event(struct device_t *dev);
|
||||
|
||||
|
||||
struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) {
|
||||
struct process_t *proc;
|
||||
struct stream_t *stream;
|
||||
|
||||
A_CALLOC(proc, 1);
|
||||
|
||||
A_CALLOC(stream, 1);
|
||||
stream->dev = dev;
|
||||
stream->encoder = encoder;
|
||||
A_PTHREAD_M_INIT(&stream->mutex);
|
||||
stream->proc = proc;
|
||||
return stream;
|
||||
}
|
||||
|
||||
void stream_destroy(struct stream_t *stream) {
|
||||
A_PTHREAD_M_DESTROY(&stream->mutex);
|
||||
free(stream->proc);
|
||||
free(stream);
|
||||
}
|
||||
|
||||
@@ -79,7 +85,7 @@ void stream_loop(struct stream_t *stream) {
|
||||
LOG_INFO("Using V4L2 device: %s", stream->dev->path);
|
||||
LOG_INFO("Using desired FPS: %u", stream->dev->desired_fps);
|
||||
|
||||
while (_stream_init_loop(stream->dev, &pool) == 0) {
|
||||
while (_stream_init_loop(stream, &pool) == 0) {
|
||||
struct worker_t *oldest_worker = NULL;
|
||||
struct worker_t *last_worker = NULL;
|
||||
long double grab_after = 0;
|
||||
@@ -93,7 +99,7 @@ void stream_loop(struct stream_t *stream) {
|
||||
|
||||
LOG_INFO("Capturing ...");
|
||||
|
||||
while (!stream->dev->stop) {
|
||||
while (!stream->proc->stop) {
|
||||
int free_worker_number = -1;
|
||||
|
||||
SEP_DEBUG('-');
|
||||
@@ -133,10 +139,14 @@ void stream_loop(struct stream_t *stream) {
|
||||
LOG_PERF("----- Raw frame dropped; worker = %u", free_worker_number);
|
||||
}
|
||||
|
||||
if (stream->dev->stop) {
|
||||
if (stream->proc->stop) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (stream->proc->slowdown) {
|
||||
usleep(1000000);
|
||||
}
|
||||
|
||||
# define INIT_FD_SET(_set) \
|
||||
fd_set _set; FD_ZERO(&_set); FD_SET(stream->dev->run->fd, &_set);
|
||||
|
||||
@@ -286,13 +296,17 @@ void stream_loop(struct stream_t *stream) {
|
||||
A_PTHREAD_M_UNLOCK(&stream->mutex);
|
||||
}
|
||||
|
||||
_stream_destroy_workers(stream->dev, &pool);
|
||||
_stream_destroy_workers(stream, &pool);
|
||||
_stream_control(stream->dev, false);
|
||||
device_close(stream->dev);
|
||||
}
|
||||
|
||||
void stream_loop_break(struct stream_t *stream) {
|
||||
stream->dev->stop = 1;
|
||||
stream->proc->stop = 1;
|
||||
}
|
||||
|
||||
void stream_switch_slowdown(struct stream_t *stream, bool slowdown) {
|
||||
stream->proc->slowdown = slowdown;
|
||||
}
|
||||
|
||||
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) {
|
||||
@@ -348,14 +362,14 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker
|
||||
return min_delay;
|
||||
}
|
||||
|
||||
static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool) {
|
||||
static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool) {
|
||||
int retval = -1;
|
||||
|
||||
LOG_DEBUG("%s: *dev->stop = %d", __FUNCTION__, dev->stop);
|
||||
while (!dev->stop) {
|
||||
if ((retval = _stream_init(dev, pool)) < 0) {
|
||||
LOG_INFO("Sleeping %u seconds before new stream init ...", dev->error_delay);
|
||||
sleep(dev->error_delay);
|
||||
LOG_DEBUG("%s: *stream->proc->stop = %d", __FUNCTION__, stream->proc->stop);
|
||||
while (!stream->proc->stop) {
|
||||
if ((retval = _stream_init(stream, pool)) < 0) {
|
||||
LOG_INFO("Sleeping %u seconds before new stream init ...", stream->dev->error_delay);
|
||||
sleep(stream->dev->error_delay);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
@@ -363,41 +377,41 @@ static int _stream_init_loop(struct device_t *dev, struct workers_pool_t *pool)
|
||||
return retval;
|
||||
}
|
||||
|
||||
static int _stream_init(struct device_t *dev, struct workers_pool_t *pool) {
|
||||
static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool) {
|
||||
SEP_INFO('=');
|
||||
|
||||
_stream_destroy_workers(dev, pool);
|
||||
_stream_control(dev, false);
|
||||
device_close(dev);
|
||||
_stream_destroy_workers(stream, pool);
|
||||
_stream_control(stream->dev, false);
|
||||
device_close(stream->dev);
|
||||
|
||||
if (device_open(dev) < 0) {
|
||||
if (device_open(stream->dev) < 0) {
|
||||
goto error;
|
||||
}
|
||||
if (_stream_control(dev, true) < 0) {
|
||||
if (_stream_control(stream->dev, true) < 0) {
|
||||
goto error;
|
||||
}
|
||||
|
||||
encoder_prepare_live(pool->encoder, dev);
|
||||
encoder_prepare_live(pool->encoder, stream->dev);
|
||||
|
||||
_stream_init_workers(dev, pool);
|
||||
_stream_init_workers(stream, pool);
|
||||
|
||||
return 0;
|
||||
|
||||
error:
|
||||
device_close(dev);
|
||||
device_close(stream->dev);
|
||||
return -1;
|
||||
}
|
||||
|
||||
static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *pool) {
|
||||
LOG_INFO("Spawning %u workers ...", dev->n_workers);
|
||||
static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool) {
|
||||
LOG_INFO("Spawning %u workers ...", stream->dev->n_workers);
|
||||
|
||||
*pool->workers_stop = false;
|
||||
A_CALLOC(pool->workers, dev->n_workers);
|
||||
A_CALLOC(pool->workers, stream->dev->n_workers);
|
||||
|
||||
A_PTHREAD_M_INIT(&pool->free_workers_mutex);
|
||||
A_PTHREAD_C_INIT(&pool->free_workers_cond);
|
||||
|
||||
for (unsigned number = 0; number < dev->n_workers; ++number) {
|
||||
for (unsigned number = 0; number < stream->dev->n_workers; ++number) {
|
||||
# define WORKER(_next) pool->workers[number]._next
|
||||
# define CTX(_next) WORKER(ctx._next)
|
||||
|
||||
@@ -407,8 +421,8 @@ static void _stream_init_workers(struct device_t *dev, struct workers_pool_t *po
|
||||
A_PTHREAD_C_INIT(&WORKER(has_job_cond));
|
||||
|
||||
CTX(number) = number;
|
||||
CTX(dev) = dev;
|
||||
CTX(dev_stop) = (sig_atomic_t *volatile)&dev->stop;
|
||||
CTX(dev) = stream->dev;
|
||||
CTX(proc_stop) = (sig_atomic_t *volatile)&(stream->proc->stop);
|
||||
CTX(workers_stop) = pool->workers_stop;
|
||||
|
||||
CTX(encoder) = pool->encoder;
|
||||
@@ -438,7 +452,7 @@ static void *_stream_worker_thread(void *v_ctx) {
|
||||
|
||||
LOG_DEBUG("Hello! I am a worker #%u ^_^", ctx->number);
|
||||
|
||||
while (!*ctx->dev_stop && !*ctx->workers_stop) {
|
||||
while (!*ctx->proc_stop && !*ctx->workers_stop) {
|
||||
LOG_DEBUG("Worker %u waiting for a new job ...", ctx->number);
|
||||
A_PTHREAD_M_LOCK(ctx->has_job_mutex);
|
||||
A_PTHREAD_C_WAIT_TRUE(*ctx->has_job, ctx->has_job_cond, ctx->has_job_mutex);
|
||||
@@ -485,12 +499,12 @@ static void *_stream_worker_thread(void *v_ctx) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void _stream_destroy_workers(struct device_t *dev, struct workers_pool_t *pool) {
|
||||
static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool_t *pool) {
|
||||
if (pool->workers) {
|
||||
LOG_INFO("Destroying workers ...");
|
||||
|
||||
*pool->workers_stop = true;
|
||||
for (unsigned number = 0; number < dev->n_workers; ++number) {
|
||||
for (unsigned number = 0; number < stream->dev->n_workers; ++number) {
|
||||
# define WORKER(_next) pool->workers[number]._next
|
||||
|
||||
A_PTHREAD_M_LOCK(&WORKER(has_job_mutex));
|
||||
|
||||
Reference in New Issue
Block a user