refactoring

This commit is contained in:
Devaev Maxim
2019-06-02 16:20:30 +03:00
parent 47378a17db
commit 3333fc56a3
2 changed files with 146 additions and 139 deletions

View File

@@ -46,18 +46,19 @@
#endif
static struct worker_t *_workers_pool_wait(struct workers_pool_t *pool);
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index);
static void _workers_pool_assign(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index);
static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool);
static struct workers_pool_t *_stream_init_loop(struct stream_t *stream);
static struct workers_pool_t *_stream_init(struct stream_t *stream);
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index);
static struct workers_pool_t *_workers_pool_init(struct stream_t *stream);
static void *_stream_worker_thread(void *v_worker);
static void _workers_pool_destroy(struct workers_pool_t *pool);
static void *_worker_thread(void *v_worker);
static struct worker_t *_workers_pool_wait(struct workers_pool_t *pool);
static void _workers_pool_assign(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index);
static long double _workers_pool_get_fluency_delay(struct workers_pool_t *pool);
struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) {
struct process_t *proc;
@@ -191,7 +192,7 @@ void stream_loop(struct stream_t *stream) {
}
captured_fps_accum += 1;
long double fluency_delay = _stream_get_fluency_delay(stream->dev, pool);
long double fluency_delay = _workers_pool_get_fluency_delay(pool);
grab_after = now + fluency_delay;
LOG_VERBOSE("Fluency: delay=%.03Lf; grab_after=%.03Lf", fluency_delay, grab_after);
@@ -246,119 +247,6 @@ void stream_switch_slowdown(struct stream_t *stream, bool slowdown) {
atomic_store(&stream->proc->slowdown, slowdown);
}
// Blocks until at least one worker is free, then chooses which worker should
// receive the next captured frame. Preference goes to the oldest assigned
// worker (FIFO order) so results are exposed in capture order ("timely");
// otherwise any free worker is taken and the job is marked as out-of-order.
static struct worker_t *_workers_pool_wait(struct workers_pool_t *pool) {
	struct worker_t *ready_worker = NULL;

	// Wait for pool->free_workers > 0; the counter is guarded by the mutex
	// and signalled by workers as they finish their jobs.
	A_MUTEX_LOCK(&pool->free_workers_mutex);
	A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex);
	A_MUTEX_UNLOCK(&pool->free_workers_mutex);

	// Fast path: the oldest assigned worker has finished, so its result keeps
	// the output in order. buf_index >= 0 excludes workers never assigned yet.
	if (pool->oldest_worker && !atomic_load(&pool->oldest_worker->has_job) && pool->oldest_worker->buf_index >= 0) {
		ready_worker = pool->oldest_worker;
		ready_worker->job_timely = true;
		pool->oldest_worker = pool->oldest_worker->order_next;
	} else {
		// Fallback: scan for any free worker.
		// NOTE(review): the `break` fires on the first free worker found, so the
		// job_start_time comparison can never execute with a non-NULL
		// ready_worker — presumably "take any free worker" is intended; confirm.
		for (unsigned number = 0; number < pool->n_workers; ++number) {
			if (
				!atomic_load(&pool->workers[number].has_job) && (
					ready_worker == NULL
					|| ready_worker->job_start_time < pool->workers[number].job_start_time
				)
			) {
				ready_worker = &pool->workers[number];
				break;
			}
		}
		assert(ready_worker != NULL); // The condvar wait above guarantees a free worker exists
		ready_worker->job_timely = false; // A worker that got its job later became free (or the very first one on the very first capture)
	}
	return ready_worker;
}
// Copies the encoded picture from device buffer `buf_index` into the shared
// stream picture under the stream mutex and flags it as updated for readers.
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) {
#	define SRC(_field) stream->dev->run->pictures[buf_index]._field
	A_MUTEX_LOCK(&stream->mutex);

	stream->width = stream->dev->run->width;
	stream->height = stream->dev->run->height;

	// `used` must be assigned before the memcpy() below relies on it.
	stream->picture.used = SRC(used);
	stream->picture.allocated = SRC(allocated);
	memcpy(stream->picture.data, SRC(data), stream->picture.used);

	stream->picture.grab_time = SRC(grab_time);
	stream->picture.encode_begin_time = SRC(encode_begin_time);
	stream->picture.encode_end_time = SRC(encode_end_time);

	atomic_store(&stream->updated, true);
	A_MUTEX_UNLOCK(&stream->mutex);
#	undef SRC
}
// Hands the device buffer `buf_index` to `ready_worker`: moves the worker to
// the tail of the pool's age-ordered list, publishes the job under the
// worker's mutex, and decrements the free-worker counter.
static void _workers_pool_assign(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index) {
	if (pool->oldest_worker == NULL) {
		// First assignment ever: the list consists of this single worker.
		pool->oldest_worker = ready_worker;
		pool->latest_worker = pool->oldest_worker;
	} else {
		// Unlink the worker from its current position (if any) and append it
		// at the tail as the most recently assigned worker.
		// NOTE(review): if ready_worker is still pool->oldest_worker here (the
		// untimely path in _workers_pool_wait does not pop it), oldest_worker
		// is not advanced by this relinking — verify against the caller.
		if (ready_worker->order_next) {
			ready_worker->order_next->order_prev = ready_worker->order_prev;
		}
		if (ready_worker->order_prev) {
			ready_worker->order_prev->order_next = ready_worker->order_next;
		}
		ready_worker->order_prev = pool->latest_worker;
		pool->latest_worker->order_next = ready_worker;
		pool->latest_worker = ready_worker;
	}
	pool->latest_worker->order_next = NULL;

	// Publish the job: buf_index must be visible before has_job flips to true,
	// hence both happen under the worker's mutex before the signal.
	A_MUTEX_LOCK(&ready_worker->has_job_mutex);
	ready_worker->buf_index = buf_index;
	atomic_store(&ready_worker->has_job, true);
	A_MUTEX_UNLOCK(&ready_worker->has_job_mutex);
	A_COND_SIGNAL(&ready_worker->has_job_cond);

	A_MUTEX_LOCK(&pool->free_workers_mutex);
	pool->free_workers -= 1;
	A_MUTEX_UNLOCK(&pool->free_workers_mutex);

	LOG_DEBUG("Assigned new frame in buffer %u to worker %u", buf_index, ready_worker->number);
}
static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool) {
long double sum_comp_time = 0;
long double avg_comp_time;
long double min_delay;
long double soft_delay;
for (unsigned number = 0; number < pool->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next
A_MUTEX_LOCK(&WORKER(last_comp_time_mutex));
if (WORKER(last_comp_time) > 0) {
sum_comp_time += WORKER(last_comp_time);
}
A_MUTEX_UNLOCK(&WORKER(last_comp_time_mutex));
# undef WORKER
}
avg_comp_time = sum_comp_time / pool->n_workers; // Среднее время работы воркеров
min_delay = avg_comp_time / pool->n_workers; // Среднее время работы размазывается на N воркеров
if (dev->desired_fps > 0 && min_delay > 0) {
// Искусственное время задержки на основе желаемого FPS, если включен --desired-fps
soft_delay = ((long double) 1) / dev->desired_fps - sum_comp_time;
return (min_delay > soft_delay ? min_delay : soft_delay);
}
return min_delay;
}
static struct workers_pool_t *_stream_init_loop(struct stream_t *stream) {
struct workers_pool_t *pool = NULL;
@@ -393,6 +281,29 @@ static struct workers_pool_t *_stream_init(struct stream_t *stream) {
return NULL;
}
// Copies the encoded picture from device buffer `buf_index` into the shared
// stream picture under the stream mutex and marks it updated for consumers.
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) {
#	define PICTURE(_next) stream->dev->run->pictures[buf_index]._next
	A_MUTEX_LOCK(&stream->mutex);
	// `used` is assigned first because the memcpy() below depends on it.
	stream->picture.used = PICTURE(used);
	stream->picture.allocated = PICTURE(allocated);
	memcpy(stream->picture.data, PICTURE(data), stream->picture.used);
	stream->picture.grab_time = PICTURE(grab_time);
	stream->picture.encode_begin_time = PICTURE(encode_begin_time);
	stream->picture.encode_end_time = PICTURE(encode_end_time);
	// Current device geometry is exposed together with the frame.
	stream->width = stream->dev->run->width;
	stream->height = stream->dev->run->height;
	atomic_store(&stream->updated, true);
	A_MUTEX_UNLOCK(&stream->mutex);
#	undef PICTURE
}
static struct workers_pool_t *_workers_pool_init(struct stream_t *stream) {
struct workers_pool_t *pool;
@@ -403,11 +314,15 @@ static struct workers_pool_t *_workers_pool_init(struct stream_t *stream) {
pool->n_workers = stream->dev->run->n_workers;
A_CALLOC(pool->workers, pool->n_workers);
atomic_init(&pool->workers_stop, false);
A_MUTEX_INIT(&pool->free_workers_mutex);
A_COND_INIT(&pool->free_workers_cond);
atomic_init(&pool->workers_stop, false);
if (stream->dev->desired_fps > 0) {
pool->desired_frames_interval = (long double)1 / stream->dev->desired_fps;
}
for (unsigned number = 0; number < pool->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next
@@ -426,7 +341,7 @@ static struct workers_pool_t *_workers_pool_init(struct stream_t *stream) {
WORKER(dev) = stream->dev;
WORKER(encoder) = stream->encoder;
A_THREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&(pool->workers[number]));
A_THREAD_CREATE(&WORKER(tid), _worker_thread, (void *)&(pool->workers[number]));
pool->free_workers += 1;
@@ -435,7 +350,33 @@ static struct workers_pool_t *_workers_pool_init(struct stream_t *stream) {
return pool;
}
static void *_stream_worker_thread(void *v_worker) {
static void _workers_pool_destroy(struct workers_pool_t *pool) {
LOG_INFO("Destroying workers pool ...");
atomic_store(&pool->workers_stop, true);
for (unsigned number = 0; number < pool->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next
A_MUTEX_LOCK(&WORKER(has_job_mutex));
atomic_store(&WORKER(has_job), true); // Final job: die
A_MUTEX_UNLOCK(&WORKER(has_job_mutex));
A_COND_SIGNAL(&WORKER(has_job_cond));
A_THREAD_JOIN(WORKER(tid));
A_MUTEX_DESTROY(&WORKER(has_job_mutex));
A_COND_DESTROY(&WORKER(has_job_cond));
# undef WORKER
}
A_MUTEX_DESTROY(&pool->free_workers_mutex);
A_COND_DESTROY(&pool->free_workers_cond);
free(pool->workers);
free(pool);
}
static void *_worker_thread(void *v_worker) {
struct worker_t *worker = (struct worker_t *)v_worker;
LOG_DEBUG("Hello! I am a worker #%u ^_^", worker->number);
@@ -501,28 +442,92 @@ static void *_stream_worker_thread(void *v_worker) {
return NULL;
}
static void _workers_pool_destroy(struct workers_pool_t *pool) {
LOG_INFO("Destroying workers pool ...");
// Blocks until at least one worker is free, then chooses which worker should
// receive the next captured frame. The oldest assigned worker is preferred
// (FIFO order) so results are exposed in capture order ("timely"); otherwise
// any free worker is taken and the job is flagged as out-of-order.
static struct worker_t *_workers_pool_wait(struct workers_pool_t *pool) {
	struct worker_t *ready_worker = NULL;

	// Wait for pool->free_workers > 0; the counter is guarded by the mutex
	// and signalled by workers as they finish their jobs.
	A_MUTEX_LOCK(&pool->free_workers_mutex);
	A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex);
	A_MUTEX_UNLOCK(&pool->free_workers_mutex);

	// Fast path: the oldest assigned worker is done, so output stays ordered.
	// buf_index >= 0 excludes workers that never received a job yet.
	if (pool->oldest_worker && !atomic_load(&pool->oldest_worker->has_job) && pool->oldest_worker->buf_index >= 0) {
		ready_worker = pool->oldest_worker;
		ready_worker->job_timely = true;
		pool->oldest_worker = pool->oldest_worker->order_next;
	} else {
		// Fallback: scan for any free worker.
		// NOTE(review): the `break` fires on the first free worker found, so the
		// job_start_time comparison never runs with non-NULL ready_worker —
		// presumably "take any free worker" is intended; confirm upstream.
		for (unsigned number = 0; number < pool->n_workers; ++number) {
			if (
				!atomic_load(&pool->workers[number].has_job) && (
					ready_worker == NULL
					|| ready_worker->job_start_time < pool->workers[number].job_start_time
				)
			) {
				ready_worker = &pool->workers[number];
				break;
			}
		}
		assert(ready_worker != NULL); // The condvar wait above guarantees a free worker exists
		ready_worker->job_timely = false; // A worker that got its job later became free (or the very first one on the very first capture)
	}
	return ready_worker;
}
// Hands the device buffer `buf_index` to `ready_worker`: appends the worker
// to the tail of the pool's age-ordered list, publishes the job under the
// worker's mutex, and decrements the free-worker counter.
static void _workers_pool_assign(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index) {
	if (pool->oldest_worker == NULL) {
		// First assignment ever: the order list is just this worker.
		pool->oldest_worker = ready_worker;
		pool->latest_worker = pool->oldest_worker;
	} else {
		// Unlink the worker from its current position (if any) and append it
		// at the tail as the most recently assigned worker.
		// NOTE(review): if ready_worker is still pool->oldest_worker here (the
		// untimely path in _workers_pool_wait does not pop it), oldest_worker
		// is not advanced by this relinking — verify against the caller.
		if (ready_worker->order_next) {
			ready_worker->order_next->order_prev = ready_worker->order_prev;
		}
		if (ready_worker->order_prev) {
			ready_worker->order_prev->order_next = ready_worker->order_next;
		}
		ready_worker->order_prev = pool->latest_worker;
		pool->latest_worker->order_next = ready_worker;
		pool->latest_worker = ready_worker;
	}
	pool->latest_worker->order_next = NULL;

	// Publish the job: buf_index is written before has_job flips to true, both
	// under the worker's mutex, before signalling its condvar.
	A_MUTEX_LOCK(&ready_worker->has_job_mutex);
	ready_worker->buf_index = buf_index;
	atomic_store(&ready_worker->has_job, true);
	A_MUTEX_UNLOCK(&ready_worker->has_job_mutex);
	A_COND_SIGNAL(&ready_worker->has_job_cond);

	A_MUTEX_LOCK(&pool->free_workers_mutex);
	pool->free_workers -= 1;
	A_MUTEX_UNLOCK(&pool->free_workers_mutex);

	LOG_DEBUG("Assigned new frame in buffer %u to worker %u", buf_index, ready_worker->number);
}
// NOTE(review): this span is a diff-rendering artifact — lines removed from
// the old _workers_pool_destroy() are interleaved with the added body of
// _workers_pool_get_fluency_delay(). As shown it is not valid C (e.g. `pool`
// is dereferenced after free(pool), and thread-join/destroy calls appear
// inside the timing loop). Restore both functions from the repository before
// editing; do not use this text as source of truth.
static long double _workers_pool_get_fluency_delay(struct workers_pool_t *pool) {
	long double sum_comp_time = 0;
	long double avg_comp_time;
	long double min_delay;
	long double soft_delay;
	// NOTE(review): line below belongs to the removed _workers_pool_destroy().
	atomic_store(&pool->workers_stop, true);
	for (unsigned number = 0; number < pool->n_workers; ++number) {
#	define WORKER(_next) pool->workers[number]._next
	// NOTE(review): the has_job/join/destroy lines below belong to the removed
	// _workers_pool_destroy(), not to the fluency-delay computation.
	A_MUTEX_LOCK(&WORKER(has_job_mutex));
	atomic_store(&WORKER(has_job), true); // Final job: die
	A_MUTEX_UNLOCK(&WORKER(has_job_mutex));
	A_COND_SIGNAL(&WORKER(has_job_cond));
	A_THREAD_JOIN(WORKER(tid));
	A_MUTEX_DESTROY(&WORKER(has_job_mutex));
	A_COND_DESTROY(&WORKER(has_job_cond));
	// Sum each worker's last measured computation time (skip never-run workers).
	A_MUTEX_LOCK(&WORKER(last_comp_time_mutex));
	if (WORKER(last_comp_time) > 0) {
		sum_comp_time += WORKER(last_comp_time);
	}
	A_MUTEX_UNLOCK(&WORKER(last_comp_time_mutex));
#	undef WORKER
	}
	// NOTE(review): the two destroy calls below also belong to _workers_pool_destroy().
	A_MUTEX_DESTROY(&pool->free_workers_mutex);
	A_COND_DESTROY(&pool->free_workers_cond);
	avg_comp_time = sum_comp_time / pool->n_workers; // Average worker computation time
	// NOTE(review): the free() calls below belong to _workers_pool_destroy();
	// here they would free `pool` before the dereferences that follow.
	free(pool->workers);
	free(pool);
	min_delay = avg_comp_time / pool->n_workers; // Average computation time spread across N workers
	if (pool->desired_frames_interval > 0 && min_delay > 0) {
		// Artificial delay derived from the desired FPS (--desired-fps enabled)
		soft_delay = pool->desired_frames_interval - sum_comp_time;
		return (min_delay > soft_delay ? min_delay : soft_delay);
	}
	return min_delay;
}

View File

@@ -70,6 +70,8 @@ struct workers_pool_t {
pthread_cond_t free_workers_cond;
atomic_bool workers_stop;
long double desired_frames_interval;
};
struct process_t {