mirror of
https://github.com/pikvm/ustreamer.git
synced 2026-04-19 21:16:12 +00:00
refactoring
This commit is contained in:
195
src/stream.c
195
src/stream.c
@@ -46,17 +46,17 @@
|
||||
#endif
|
||||
|
||||
|
||||
static bool _stream_wait_worker(struct stream_t *stream, struct workers_pool_t *pool, struct worker_t **ready_worker);
|
||||
static struct worker_t *_workers_pool_wait(struct workers_pool_t *pool);
|
||||
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index);
|
||||
static void _stream_assign_worker(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index);
|
||||
static void _workers_pool_assign(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index);
|
||||
static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool);
|
||||
|
||||
static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool);
|
||||
static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool);
|
||||
static struct workers_pool_t *_stream_init_loop(struct stream_t *stream);
|
||||
static struct workers_pool_t *_stream_init(struct stream_t *stream);
|
||||
|
||||
static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool);
|
||||
static struct workers_pool_t *_workers_pool_init(struct stream_t *stream);
|
||||
static void *_stream_worker_thread(void *v_worker);
|
||||
static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool_t *pool);
|
||||
static void _workers_pool_destroy(struct workers_pool_t *pool);
|
||||
|
||||
|
||||
struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) {
|
||||
@@ -83,16 +83,12 @@ void stream_destroy(struct stream_t *stream) {
|
||||
}
|
||||
|
||||
void stream_loop(struct stream_t *stream) {
|
||||
struct workers_pool_t pool;
|
||||
|
||||
MEMSET_ZERO(pool);
|
||||
atomic_init(&pool.workers_stop, false);
|
||||
pool.encoder = stream->encoder;
|
||||
struct workers_pool_t *pool;
|
||||
|
||||
LOG_INFO("Using V4L2 device: %s", stream->dev->path);
|
||||
LOG_INFO("Using desired FPS: %u", stream->dev->desired_fps);
|
||||
|
||||
while (_stream_init_loop(stream, &pool) == 0) {
|
||||
while ((pool = _stream_init_loop(stream)) != NULL) {
|
||||
long double grab_after = 0;
|
||||
unsigned fluency_passed = 0;
|
||||
unsigned captured_fps_accum = 0;
|
||||
@@ -110,15 +106,17 @@ void stream_loop(struct stream_t *stream) {
|
||||
SEP_DEBUG('-');
|
||||
LOG_DEBUG("Waiting for worker ...");
|
||||
|
||||
if (_stream_wait_worker(stream, &pool, &ready_worker)) {
|
||||
_stream_expose_picture(stream, ready_worker->buf_index);
|
||||
LOG_PERF("##### Encoded picture exposed; worker = %u", ready_worker->number);
|
||||
} else {
|
||||
LOG_PERF("----- Encoded picture dropped; worker = %u", ready_worker->number);
|
||||
}
|
||||
ready_worker = _workers_pool_wait(pool);
|
||||
|
||||
if (ready_worker == NULL) {
|
||||
break; // Если произошел сбой сжатия
|
||||
if (!ready_worker->job_failed) {
|
||||
if (ready_worker->job_timely) {
|
||||
_stream_expose_picture(stream, ready_worker->buf_index);
|
||||
LOG_PERF("##### Encoded picture exposed; worker = %u", ready_worker->number);
|
||||
} else {
|
||||
LOG_PERF("----- Encoded picture dropped; worker = %u", ready_worker->number);
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
||||
if (atomic_load(&stream->proc->stop)) {
|
||||
@@ -193,13 +191,13 @@ void stream_loop(struct stream_t *stream) {
|
||||
}
|
||||
captured_fps_accum += 1;
|
||||
|
||||
long double fluency_delay = _stream_get_fluency_delay(stream->dev, &pool);
|
||||
long double fluency_delay = _stream_get_fluency_delay(stream->dev, pool);
|
||||
|
||||
grab_after = now + fluency_delay;
|
||||
LOG_VERBOSE("Fluency: delay=%.03Lf; grab_after=%.03Lf", fluency_delay, grab_after);
|
||||
}
|
||||
|
||||
_stream_assign_worker(&pool, ready_worker, buf_index);
|
||||
_workers_pool_assign(pool, ready_worker, buf_index);
|
||||
|
||||
goto next_handlers; // Поток сам освободит буфер
|
||||
|
||||
@@ -233,11 +231,11 @@ void stream_loop(struct stream_t *stream) {
|
||||
stream->height = 0;
|
||||
atomic_store(&stream->updated, true);
|
||||
A_MUTEX_UNLOCK(&stream->mutex);
|
||||
}
|
||||
|
||||
_stream_destroy_workers(stream, &pool);
|
||||
device_switch_capturing(stream->dev, false);
|
||||
device_close(stream->dev);
|
||||
_workers_pool_destroy(pool);
|
||||
device_switch_capturing(stream->dev, false);
|
||||
device_close(stream->dev);
|
||||
}
|
||||
}
|
||||
|
||||
void stream_loop_break(struct stream_t *stream) {
|
||||
@@ -248,40 +246,33 @@ void stream_switch_slowdown(struct stream_t *stream, bool slowdown) {
|
||||
atomic_store(&stream->proc->slowdown, slowdown);
|
||||
}
|
||||
|
||||
static bool _stream_wait_worker(struct stream_t *stream, struct workers_pool_t *pool, struct worker_t **ready_worker) {
|
||||
bool ok;
|
||||
static struct worker_t *_workers_pool_wait(struct workers_pool_t *pool) {
|
||||
struct worker_t *ready_worker = NULL;
|
||||
|
||||
A_MUTEX_LOCK(&pool->free_workers_mutex);
|
||||
A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex);
|
||||
A_MUTEX_UNLOCK(&pool->free_workers_mutex);
|
||||
|
||||
*ready_worker = NULL;
|
||||
|
||||
if (pool->oldest_worker && !atomic_load(&pool->oldest_worker->has_job) && pool->oldest_worker->buf_index >= 0) {
|
||||
*ready_worker = pool->oldest_worker;
|
||||
ready_worker = pool->oldest_worker;
|
||||
ready_worker->job_timely = true;
|
||||
pool->oldest_worker = pool->oldest_worker->order_next;
|
||||
ok = true; // Воркер успел вовремя
|
||||
} else {
|
||||
for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) {
|
||||
for (unsigned number = 0; number < pool->n_workers; ++number) {
|
||||
if (
|
||||
!atomic_load(&pool->workers[number].has_job) && (
|
||||
*ready_worker == NULL
|
||||
|| (*ready_worker)->job_start_time < pool->workers[number].job_start_time
|
||||
ready_worker == NULL
|
||||
|| ready_worker->job_start_time < pool->workers[number].job_start_time
|
||||
)
|
||||
) {
|
||||
*ready_worker = &pool->workers[number];
|
||||
ready_worker = &pool->workers[number];
|
||||
break;
|
||||
}
|
||||
}
|
||||
assert(*ready_worker != NULL);
|
||||
ok = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате)
|
||||
assert(ready_worker != NULL);
|
||||
ready_worker->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате)
|
||||
}
|
||||
|
||||
if ((*ready_worker)->job_failed) {
|
||||
*ready_worker = NULL;
|
||||
ok = false;
|
||||
}
|
||||
return ok;
|
||||
return ready_worker;
|
||||
}
|
||||
|
||||
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) {
|
||||
@@ -307,7 +298,7 @@ static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index)
|
||||
# undef PICTURE
|
||||
}
|
||||
|
||||
static void _stream_assign_worker(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index) {
|
||||
static void _workers_pool_assign(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index) {
|
||||
if (pool->oldest_worker == NULL) {
|
||||
pool->oldest_worker = ready_worker;
|
||||
pool->latest_worker = pool->oldest_worker;
|
||||
@@ -343,7 +334,7 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker
|
||||
long double min_delay;
|
||||
long double soft_delay;
|
||||
|
||||
for (unsigned number = 0; number < dev->run->n_workers; ++number) {
|
||||
for (unsigned number = 0; number < pool->n_workers; ++number) {
|
||||
# define WORKER(_next) pool->workers[number]._next
|
||||
|
||||
A_MUTEX_LOCK(&WORKER(last_comp_time_mutex));
|
||||
@@ -355,9 +346,9 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker
|
||||
# undef WORKER
|
||||
}
|
||||
|
||||
avg_comp_time = sum_comp_time / dev->run->n_workers; // Среднее время работы воркеров
|
||||
avg_comp_time = sum_comp_time / pool->n_workers; // Среднее время работы воркеров
|
||||
|
||||
min_delay = avg_comp_time / dev->run->n_workers; // Среднее время работы размазывается на N воркеров
|
||||
min_delay = avg_comp_time / pool->n_workers; // Среднее время работы размазывается на N воркеров
|
||||
|
||||
if (dev->desired_fps > 0 && min_delay > 0) {
|
||||
// Искусственное время задержки на основе желаемого FPS, если включен --desired-fps
|
||||
@@ -368,28 +359,25 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker
|
||||
return min_delay;
|
||||
}
|
||||
|
||||
static int _stream_init_loop(struct stream_t *stream, struct workers_pool_t *pool) {
|
||||
int retval = -1;
|
||||
static struct workers_pool_t *_stream_init_loop(struct stream_t *stream) {
|
||||
struct workers_pool_t *pool = NULL;
|
||||
|
||||
LOG_DEBUG("%s: stream->proc->stop = %d", __FUNCTION__, atomic_load(&stream->proc->stop));
|
||||
|
||||
while (!atomic_load(&stream->proc->stop)) {
|
||||
if ((retval = _stream_init(stream, pool)) < 0) {
|
||||
SEP_INFO('=');
|
||||
|
||||
if ((pool = _stream_init(stream)) == NULL) {
|
||||
LOG_INFO("Sleeping %u seconds before new stream init ...", stream->dev->error_delay);
|
||||
sleep(stream->dev->error_delay);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return retval;
|
||||
return pool;
|
||||
}
|
||||
|
||||
static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool) {
|
||||
_stream_destroy_workers(stream, pool);
|
||||
device_switch_capturing(stream->dev, false);
|
||||
device_close(stream->dev);
|
||||
|
||||
SEP_INFO('=');
|
||||
|
||||
static struct workers_pool_t *_stream_init(struct stream_t *stream) {
|
||||
if (device_open(stream->dev) < 0) {
|
||||
goto error;
|
||||
}
|
||||
@@ -397,50 +385,54 @@ static int _stream_init(struct stream_t *stream, struct workers_pool_t *pool) {
|
||||
goto error;
|
||||
}
|
||||
|
||||
encoder_prepare(pool->encoder, stream->dev);
|
||||
|
||||
_stream_init_workers(stream, pool);
|
||||
|
||||
return 0;
|
||||
encoder_prepare(stream->encoder, stream->dev);
|
||||
return _workers_pool_init(stream);
|
||||
|
||||
error:
|
||||
device_close(stream->dev);
|
||||
return -1;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t *pool) {
|
||||
LOG_INFO("Spawning %u workers ...", stream->dev->run->n_workers);
|
||||
static struct workers_pool_t *_workers_pool_init(struct stream_t *stream) {
|
||||
struct workers_pool_t *pool;
|
||||
|
||||
atomic_store(&pool->workers_stop, false);
|
||||
A_CALLOC(pool->workers, stream->dev->run->n_workers);
|
||||
LOG_INFO("Creating pool with %u workers ...", stream->dev->run->n_workers);
|
||||
|
||||
A_CALLOC(pool, 1);
|
||||
|
||||
pool->n_workers = stream->dev->run->n_workers;
|
||||
A_CALLOC(pool->workers, pool->n_workers);
|
||||
|
||||
atomic_init(&pool->workers_stop, false);
|
||||
|
||||
A_MUTEX_INIT(&pool->free_workers_mutex);
|
||||
A_COND_INIT(&pool->free_workers_cond);
|
||||
|
||||
for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) {
|
||||
for (unsigned number = 0; number < pool->n_workers; ++number) {
|
||||
# define WORKER(_next) pool->workers[number]._next
|
||||
|
||||
pool->free_workers += 1;
|
||||
|
||||
A_MUTEX_INIT(&WORKER(has_job_mutex));
|
||||
atomic_init(&WORKER(has_job), false);
|
||||
A_COND_INIT(&WORKER(has_job_cond));
|
||||
|
||||
WORKER(number) = number;
|
||||
WORKER(proc_stop) = &stream->proc->stop;
|
||||
WORKER(workers_stop) = &pool->workers_stop;
|
||||
WORKER(number) = number;
|
||||
WORKER(proc_stop) = &stream->proc->stop;
|
||||
WORKER(workers_stop) = &pool->workers_stop;
|
||||
|
||||
WORKER(free_workers_mutex) = &pool->free_workers_mutex;
|
||||
WORKER(free_workers) = &pool->free_workers;
|
||||
WORKER(free_workers_cond) = &pool->free_workers_cond;
|
||||
WORKER(free_workers_mutex) = &pool->free_workers_mutex;
|
||||
WORKER(free_workers) = &pool->free_workers;
|
||||
WORKER(free_workers_cond) = &pool->free_workers_cond;
|
||||
|
||||
WORKER(dev) = stream->dev;
|
||||
WORKER(encoder) = pool->encoder;
|
||||
WORKER(dev) = stream->dev;
|
||||
WORKER(encoder) = stream->encoder;
|
||||
|
||||
A_THREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&(pool->workers[number]));
|
||||
|
||||
pool->free_workers += 1;
|
||||
|
||||
# undef WORKER
|
||||
}
|
||||
return pool;
|
||||
}
|
||||
|
||||
static void *_stream_worker_thread(void *v_worker) {
|
||||
@@ -509,33 +501,28 @@ static void *_stream_worker_thread(void *v_worker) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool_t *pool) {
|
||||
if (pool->workers) {
|
||||
LOG_INFO("Destroying workers ...");
|
||||
static void _workers_pool_destroy(struct workers_pool_t *pool) {
|
||||
LOG_INFO("Destroying workers pool ...");
|
||||
|
||||
atomic_store(&pool->workers_stop, true);
|
||||
for (unsigned number = 0; number < stream->dev->run->n_workers; ++number) {
|
||||
# define WORKER(_next) pool->workers[number]._next
|
||||
atomic_store(&pool->workers_stop, true);
|
||||
for (unsigned number = 0; number < pool->n_workers; ++number) {
|
||||
# define WORKER(_next) pool->workers[number]._next
|
||||
|
||||
A_MUTEX_LOCK(&WORKER(has_job_mutex));
|
||||
atomic_store(&WORKER(has_job), true); // Final job: die
|
||||
A_MUTEX_UNLOCK(&WORKER(has_job_mutex));
|
||||
A_COND_SIGNAL(&WORKER(has_job_cond));
|
||||
A_MUTEX_LOCK(&WORKER(has_job_mutex));
|
||||
atomic_store(&WORKER(has_job), true); // Final job: die
|
||||
A_MUTEX_UNLOCK(&WORKER(has_job_mutex));
|
||||
A_COND_SIGNAL(&WORKER(has_job_cond));
|
||||
|
||||
A_THREAD_JOIN(WORKER(tid));
|
||||
A_MUTEX_DESTROY(&WORKER(has_job_mutex));
|
||||
A_COND_DESTROY(&WORKER(has_job_cond));
|
||||
A_THREAD_JOIN(WORKER(tid));
|
||||
A_MUTEX_DESTROY(&WORKER(has_job_mutex));
|
||||
A_COND_DESTROY(&WORKER(has_job_cond));
|
||||
|
||||
# undef WORKER
|
||||
}
|
||||
|
||||
A_MUTEX_DESTROY(&pool->free_workers_mutex);
|
||||
A_COND_DESTROY(&pool->free_workers_cond);
|
||||
|
||||
free(pool->workers);
|
||||
# undef WORKER
|
||||
}
|
||||
pool->free_workers = 0;
|
||||
pool->workers = NULL;
|
||||
pool->oldest_worker = NULL;
|
||||
pool->latest_worker = NULL;
|
||||
|
||||
A_MUTEX_DESTROY(&pool->free_workers_mutex);
|
||||
A_COND_DESTROY(&pool->free_workers_cond);
|
||||
|
||||
free(pool->workers);
|
||||
free(pool);
|
||||
}
|
||||
|
||||
@@ -43,6 +43,7 @@ struct worker_t {
|
||||
pthread_mutex_t has_job_mutex;
|
||||
int buf_index;
|
||||
atomic_bool has_job;
|
||||
bool job_timely;
|
||||
bool job_failed;
|
||||
long double job_start_time;
|
||||
pthread_cond_t has_job_cond;
|
||||
@@ -59,6 +60,7 @@ struct worker_t {
|
||||
};
|
||||
|
||||
struct workers_pool_t {
|
||||
unsigned n_workers;
|
||||
struct worker_t *workers;
|
||||
struct worker_t *oldest_worker;
|
||||
struct worker_t *latest_worker;
|
||||
@@ -68,8 +70,6 @@ struct workers_pool_t {
|
||||
pthread_cond_t free_workers_cond;
|
||||
|
||||
atomic_bool workers_stop;
|
||||
|
||||
struct encoder_t *encoder;
|
||||
};
|
||||
|
||||
struct process_t {
|
||||
|
||||
Reference in New Issue
Block a user