mirror of
https://github.com/pikvm/ustreamer.git
synced 2026-03-16 04:23:42 +00:00
refactoring
This commit is contained in:
90
src/stream.c
90
src/stream.c
@@ -46,18 +46,62 @@
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
static struct workers_pool_t *_stream_init_loop(struct stream_t *stream);
|
struct _worker_t {
|
||||||
static struct workers_pool_t *_stream_init(struct stream_t *stream);
|
pthread_t tid;
|
||||||
|
unsigned number;
|
||||||
|
atomic_bool *proc_stop;
|
||||||
|
atomic_bool *workers_stop;
|
||||||
|
|
||||||
|
pthread_mutex_t last_comp_time_mutex;
|
||||||
|
long double last_comp_time;
|
||||||
|
|
||||||
|
pthread_mutex_t has_job_mutex;
|
||||||
|
int buf_index;
|
||||||
|
atomic_bool has_job;
|
||||||
|
bool job_timely;
|
||||||
|
bool job_failed;
|
||||||
|
long double job_start_time;
|
||||||
|
pthread_cond_t has_job_cond;
|
||||||
|
|
||||||
|
pthread_mutex_t *free_workers_mutex;
|
||||||
|
unsigned *free_workers;
|
||||||
|
pthread_cond_t *free_workers_cond;
|
||||||
|
|
||||||
|
struct _worker_t *order_prev;
|
||||||
|
struct _worker_t *order_next;
|
||||||
|
|
||||||
|
struct device_t *dev;
|
||||||
|
struct encoder_t *encoder;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct _workers_pool_t {
|
||||||
|
unsigned n_workers;
|
||||||
|
struct _worker_t *workers;
|
||||||
|
struct _worker_t *oldest_worker;
|
||||||
|
struct _worker_t *latest_worker;
|
||||||
|
|
||||||
|
pthread_mutex_t free_workers_mutex;
|
||||||
|
unsigned free_workers;
|
||||||
|
pthread_cond_t free_workers_cond;
|
||||||
|
|
||||||
|
atomic_bool workers_stop;
|
||||||
|
|
||||||
|
long double desired_frames_interval;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
static struct _workers_pool_t *_stream_init_loop(struct stream_t *stream);
|
||||||
|
static struct _workers_pool_t *_stream_init(struct stream_t *stream);
|
||||||
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index);
|
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index);
|
||||||
|
|
||||||
static struct workers_pool_t *_workers_pool_init(struct stream_t *stream);
|
static struct _workers_pool_t *_workers_pool_init(struct stream_t *stream);
|
||||||
static void _workers_pool_destroy(struct workers_pool_t *pool);
|
static void _workers_pool_destroy(struct _workers_pool_t *pool);
|
||||||
|
|
||||||
static void *_worker_thread(void *v_worker);
|
static void *__worker_thread(void *v_worker);
|
||||||
|
|
||||||
static struct worker_t *_workers_pool_wait(struct workers_pool_t *pool);
|
static struct _worker_t *_workers_pool_wait(struct _workers_pool_t *pool);
|
||||||
static void _workers_pool_assign(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index);
|
static void _workers_pool_assign(struct _workers_pool_t *pool, struct _worker_t *ready_worker, unsigned buf_index);
|
||||||
static long double _workers_pool_get_fluency_delay(struct workers_pool_t *pool);
|
static long double _workers_pool_get_fluency_delay(struct _workers_pool_t *pool);
|
||||||
|
|
||||||
|
|
||||||
struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) {
|
struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) {
|
||||||
@@ -84,7 +128,7 @@ void stream_destroy(struct stream_t *stream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void stream_loop(struct stream_t *stream) {
|
void stream_loop(struct stream_t *stream) {
|
||||||
struct workers_pool_t *pool;
|
struct _workers_pool_t *pool;
|
||||||
|
|
||||||
LOG_INFO("Using V4L2 device: %s", stream->dev->path);
|
LOG_INFO("Using V4L2 device: %s", stream->dev->path);
|
||||||
LOG_INFO("Using desired FPS: %u", stream->dev->desired_fps);
|
LOG_INFO("Using desired FPS: %u", stream->dev->desired_fps);
|
||||||
@@ -102,7 +146,7 @@ void stream_loop(struct stream_t *stream) {
|
|||||||
LOG_INFO("Capturing ...");
|
LOG_INFO("Capturing ...");
|
||||||
|
|
||||||
while (!atomic_load(&stream->proc->stop)) {
|
while (!atomic_load(&stream->proc->stop)) {
|
||||||
struct worker_t *ready_worker;
|
struct _worker_t *ready_worker;
|
||||||
|
|
||||||
SEP_DEBUG('-');
|
SEP_DEBUG('-');
|
||||||
LOG_DEBUG("Waiting for worker ...");
|
LOG_DEBUG("Waiting for worker ...");
|
||||||
@@ -247,8 +291,8 @@ void stream_switch_slowdown(struct stream_t *stream, bool slowdown) {
|
|||||||
atomic_store(&stream->proc->slowdown, slowdown);
|
atomic_store(&stream->proc->slowdown, slowdown);
|
||||||
}
|
}
|
||||||
|
|
||||||
static struct workers_pool_t *_stream_init_loop(struct stream_t *stream) {
|
static struct _workers_pool_t *_stream_init_loop(struct stream_t *stream) {
|
||||||
struct workers_pool_t *pool = NULL;
|
struct _workers_pool_t *pool = NULL;
|
||||||
|
|
||||||
LOG_DEBUG("%s: stream->proc->stop = %d", __FUNCTION__, atomic_load(&stream->proc->stop));
|
LOG_DEBUG("%s: stream->proc->stop = %d", __FUNCTION__, atomic_load(&stream->proc->stop));
|
||||||
|
|
||||||
@@ -265,7 +309,7 @@ static struct workers_pool_t *_stream_init_loop(struct stream_t *stream) {
|
|||||||
return pool;
|
return pool;
|
||||||
}
|
}
|
||||||
|
|
||||||
static struct workers_pool_t *_stream_init(struct stream_t *stream) {
|
static struct _workers_pool_t *_stream_init(struct stream_t *stream) {
|
||||||
if (device_open(stream->dev) < 0) {
|
if (device_open(stream->dev) < 0) {
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
@@ -304,8 +348,8 @@ static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index)
|
|||||||
# undef PICTURE
|
# undef PICTURE
|
||||||
}
|
}
|
||||||
|
|
||||||
static struct workers_pool_t *_workers_pool_init(struct stream_t *stream) {
|
static struct _workers_pool_t *_workers_pool_init(struct stream_t *stream) {
|
||||||
struct workers_pool_t *pool;
|
struct _workers_pool_t *pool;
|
||||||
|
|
||||||
LOG_INFO("Creating pool with %u workers ...", stream->dev->run->n_workers);
|
LOG_INFO("Creating pool with %u workers ...", stream->dev->run->n_workers);
|
||||||
|
|
||||||
@@ -341,7 +385,7 @@ static struct workers_pool_t *_workers_pool_init(struct stream_t *stream) {
|
|||||||
WORKER(dev) = stream->dev;
|
WORKER(dev) = stream->dev;
|
||||||
WORKER(encoder) = stream->encoder;
|
WORKER(encoder) = stream->encoder;
|
||||||
|
|
||||||
A_THREAD_CREATE(&WORKER(tid), _worker_thread, (void *)&(pool->workers[number]));
|
A_THREAD_CREATE(&WORKER(tid), __worker_thread, (void *)&(pool->workers[number]));
|
||||||
|
|
||||||
pool->free_workers += 1;
|
pool->free_workers += 1;
|
||||||
|
|
||||||
@@ -350,7 +394,7 @@ static struct workers_pool_t *_workers_pool_init(struct stream_t *stream) {
|
|||||||
return pool;
|
return pool;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void _workers_pool_destroy(struct workers_pool_t *pool) {
|
static void _workers_pool_destroy(struct _workers_pool_t *pool) {
|
||||||
LOG_INFO("Destroying workers pool ...");
|
LOG_INFO("Destroying workers pool ...");
|
||||||
|
|
||||||
atomic_store(&pool->workers_stop, true);
|
atomic_store(&pool->workers_stop, true);
|
||||||
@@ -376,8 +420,8 @@ static void _workers_pool_destroy(struct workers_pool_t *pool) {
|
|||||||
free(pool);
|
free(pool);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *_worker_thread(void *v_worker) {
|
static void *__worker_thread(void *v_worker) {
|
||||||
struct worker_t *worker = (struct worker_t *)v_worker;
|
struct _worker_t *worker = (struct _worker_t *)v_worker;
|
||||||
|
|
||||||
LOG_DEBUG("Hello! I am a worker #%u ^_^", worker->number);
|
LOG_DEBUG("Hello! I am a worker #%u ^_^", worker->number);
|
||||||
|
|
||||||
@@ -442,8 +486,8 @@ static void *_worker_thread(void *v_worker) {
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static struct worker_t *_workers_pool_wait(struct workers_pool_t *pool) {
|
static struct _worker_t *_workers_pool_wait(struct _workers_pool_t *pool) {
|
||||||
struct worker_t *ready_worker = NULL;
|
struct _worker_t *ready_worker = NULL;
|
||||||
|
|
||||||
A_MUTEX_LOCK(&pool->free_workers_mutex);
|
A_MUTEX_LOCK(&pool->free_workers_mutex);
|
||||||
A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex);
|
A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex);
|
||||||
@@ -471,7 +515,7 @@ static struct worker_t *_workers_pool_wait(struct workers_pool_t *pool) {
|
|||||||
return ready_worker;
|
return ready_worker;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void _workers_pool_assign(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index) {
|
static void _workers_pool_assign(struct _workers_pool_t *pool, struct _worker_t *ready_worker, unsigned buf_index) {
|
||||||
if (pool->oldest_worker == NULL) {
|
if (pool->oldest_worker == NULL) {
|
||||||
pool->oldest_worker = ready_worker;
|
pool->oldest_worker = ready_worker;
|
||||||
pool->latest_worker = pool->oldest_worker;
|
pool->latest_worker = pool->oldest_worker;
|
||||||
@@ -501,7 +545,7 @@ static void _workers_pool_assign(struct workers_pool_t *pool, struct worker_t *r
|
|||||||
LOG_DEBUG("Assigned new frame in buffer %u to worker %u", buf_index, ready_worker->number);
|
LOG_DEBUG("Assigned new frame in buffer %u to worker %u", buf_index, ready_worker->number);
|
||||||
}
|
}
|
||||||
|
|
||||||
static long double _workers_pool_get_fluency_delay(struct workers_pool_t *pool) {
|
static long double _workers_pool_get_fluency_delay(struct _workers_pool_t *pool) {
|
||||||
long double sum_comp_time = 0;
|
long double sum_comp_time = 0;
|
||||||
long double avg_comp_time;
|
long double avg_comp_time;
|
||||||
long double min_delay;
|
long double min_delay;
|
||||||
|
|||||||
46
src/stream.h
46
src/stream.h
@@ -22,58 +22,12 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <stdbool.h>
|
|
||||||
#include <stdatomic.h>
|
#include <stdatomic.h>
|
||||||
|
|
||||||
#include <pthread.h>
|
|
||||||
|
|
||||||
#include "device.h"
|
#include "device.h"
|
||||||
#include "encoder.h"
|
#include "encoder.h"
|
||||||
|
|
||||||
|
|
||||||
struct worker_t {
|
|
||||||
pthread_t tid;
|
|
||||||
unsigned number;
|
|
||||||
atomic_bool *proc_stop;
|
|
||||||
atomic_bool *workers_stop;
|
|
||||||
|
|
||||||
pthread_mutex_t last_comp_time_mutex;
|
|
||||||
long double last_comp_time;
|
|
||||||
|
|
||||||
pthread_mutex_t has_job_mutex;
|
|
||||||
int buf_index;
|
|
||||||
atomic_bool has_job;
|
|
||||||
bool job_timely;
|
|
||||||
bool job_failed;
|
|
||||||
long double job_start_time;
|
|
||||||
pthread_cond_t has_job_cond;
|
|
||||||
|
|
||||||
pthread_mutex_t *free_workers_mutex;
|
|
||||||
unsigned *free_workers;
|
|
||||||
pthread_cond_t *free_workers_cond;
|
|
||||||
|
|
||||||
struct worker_t *order_prev;
|
|
||||||
struct worker_t *order_next;
|
|
||||||
|
|
||||||
struct device_t *dev;
|
|
||||||
struct encoder_t *encoder;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct workers_pool_t {
|
|
||||||
unsigned n_workers;
|
|
||||||
struct worker_t *workers;
|
|
||||||
struct worker_t *oldest_worker;
|
|
||||||
struct worker_t *latest_worker;
|
|
||||||
|
|
||||||
pthread_mutex_t free_workers_mutex;
|
|
||||||
unsigned free_workers;
|
|
||||||
pthread_cond_t free_workers_cond;
|
|
||||||
|
|
||||||
atomic_bool workers_stop;
|
|
||||||
|
|
||||||
long double desired_frames_interval;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct process_t {
|
struct process_t {
|
||||||
atomic_bool stop;
|
atomic_bool stop;
|
||||||
atomic_bool slowdown;
|
atomic_bool slowdown;
|
||||||
|
|||||||
Reference in New Issue
Block a user