diff --git a/src/stream.c b/src/stream.c index f5517ae..289d0a6 100644 --- a/src/stream.c +++ b/src/stream.c @@ -46,18 +46,19 @@ #endif -static struct worker_t *_workers_pool_wait(struct workers_pool_t *pool); -static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index); -static void _workers_pool_assign(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index); -static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool); - static struct workers_pool_t *_stream_init_loop(struct stream_t *stream); static struct workers_pool_t *_stream_init(struct stream_t *stream); +static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index); static struct workers_pool_t *_workers_pool_init(struct stream_t *stream); -static void *_stream_worker_thread(void *v_worker); static void _workers_pool_destroy(struct workers_pool_t *pool); +static void *_worker_thread(void *v_worker); + +static struct worker_t *_workers_pool_wait(struct workers_pool_t *pool); +static void _workers_pool_assign(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index); +static long double _workers_pool_get_fluency_delay(struct workers_pool_t *pool); + struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) { struct process_t *proc; @@ -191,7 +192,7 @@ void stream_loop(struct stream_t *stream) { } captured_fps_accum += 1; - long double fluency_delay = _stream_get_fluency_delay(stream->dev, pool); + long double fluency_delay = _workers_pool_get_fluency_delay(pool); grab_after = now + fluency_delay; LOG_VERBOSE("Fluency: delay=%.03Lf; grab_after=%.03Lf", fluency_delay, grab_after); @@ -246,119 +247,6 @@ void stream_switch_slowdown(struct stream_t *stream, bool slowdown) { atomic_store(&stream->proc->slowdown, slowdown); } -static struct worker_t *_workers_pool_wait(struct workers_pool_t *pool) { - struct worker_t *ready_worker = NULL; - - A_MUTEX_LOCK(&pool->free_workers_mutex); - A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex); - A_MUTEX_UNLOCK(&pool->free_workers_mutex); - - if (pool->oldest_worker && !atomic_load(&pool->oldest_worker->has_job) && pool->oldest_worker->buf_index >= 0) { - ready_worker = pool->oldest_worker; - ready_worker->job_timely = true; - pool->oldest_worker = pool->oldest_worker->order_next; - } else { - for (unsigned number = 0; number < pool->n_workers; ++number) { - if ( - !atomic_load(&pool->workers[number].has_job) && ( - ready_worker == NULL - || ready_worker->job_start_time < pool->workers[number].job_start_time - ) - ) { - ready_worker = &pool->workers[number]; - break; - } - } - assert(ready_worker != NULL); - ready_worker->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате) - } - return ready_worker; -} - -static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) { -# define PICTURE(_next) stream->dev->run->pictures[buf_index]._next - - A_MUTEX_LOCK(&stream->mutex); - - stream->picture.used = PICTURE(used); - stream->picture.allocated = PICTURE(allocated); - - memcpy(stream->picture.data, PICTURE(data), stream->picture.used); - - stream->picture.grab_time = PICTURE(grab_time); - stream->picture.encode_begin_time = PICTURE(encode_begin_time); - stream->picture.encode_end_time = PICTURE(encode_end_time); - - stream->width = stream->dev->run->width; - stream->height = stream->dev->run->height; - atomic_store(&stream->updated, true); - - A_MUTEX_UNLOCK(&stream->mutex); - -# undef PICTURE -} - -static void _workers_pool_assign(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index) { - if (pool->oldest_worker == NULL) { - pool->oldest_worker = ready_worker; - pool->latest_worker = pool->oldest_worker; - } else { - if (ready_worker->order_next) { - ready_worker->order_next->order_prev = ready_worker->order_prev; - } - if (ready_worker->order_prev) { - ready_worker->order_prev->order_next = ready_worker->order_next; - } - ready_worker->order_prev = pool->latest_worker; - pool->latest_worker->order_next = ready_worker; - pool->latest_worker = ready_worker; - } - pool->latest_worker->order_next = NULL; - - A_MUTEX_LOCK(&ready_worker->has_job_mutex); - ready_worker->buf_index = buf_index; - atomic_store(&ready_worker->has_job, true); - A_MUTEX_UNLOCK(&ready_worker->has_job_mutex); - A_COND_SIGNAL(&ready_worker->has_job_cond); - - A_MUTEX_LOCK(&pool->free_workers_mutex); - pool->free_workers -= 1; - A_MUTEX_UNLOCK(&pool->free_workers_mutex); - - LOG_DEBUG("Assigned new frame in buffer %u to worker %u", buf_index, ready_worker->number); -} - -static long double _stream_get_fluency_delay(struct device_t *dev, struct workers_pool_t *pool) { - long double sum_comp_time = 0; - long double avg_comp_time; - long double min_delay; - long double soft_delay; - - for (unsigned number = 0; number < pool->n_workers; ++number) { -# define WORKER(_next) pool->workers[number]._next - - A_MUTEX_LOCK(&WORKER(last_comp_time_mutex)); - if (WORKER(last_comp_time) > 0) { - sum_comp_time += WORKER(last_comp_time); - } - A_MUTEX_UNLOCK(&WORKER(last_comp_time_mutex)); - -# undef WORKER - } - - avg_comp_time = sum_comp_time / pool->n_workers; // Среднее время работы воркеров - - min_delay = avg_comp_time / pool->n_workers; // Среднее время работы размазывается на N воркеров - - if (dev->desired_fps > 0 && min_delay > 0) { - // Искусственное время задержки на основе желаемого FPS, если включен --desired-fps - soft_delay = ((long double) 1) / dev->desired_fps - sum_comp_time; - return (min_delay > soft_delay ? min_delay : soft_delay); - } - - return min_delay; -} - static struct workers_pool_t *_stream_init_loop(struct stream_t *stream) { struct workers_pool_t *pool = NULL; @@ -393,6 +281,29 @@ static struct workers_pool_t *_stream_init(struct stream_t *stream) { return NULL; } +static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) { +# define PICTURE(_next) stream->dev->run->pictures[buf_index]._next + + A_MUTEX_LOCK(&stream->mutex); + + stream->picture.used = PICTURE(used); + stream->picture.allocated = PICTURE(allocated); + + memcpy(stream->picture.data, PICTURE(data), stream->picture.used); + + stream->picture.grab_time = PICTURE(grab_time); + stream->picture.encode_begin_time = PICTURE(encode_begin_time); + stream->picture.encode_end_time = PICTURE(encode_end_time); + + stream->width = stream->dev->run->width; + stream->height = stream->dev->run->height; + atomic_store(&stream->updated, true); + + A_MUTEX_UNLOCK(&stream->mutex); + +# undef PICTURE +} + static struct workers_pool_t *_workers_pool_init(struct stream_t *stream) { struct workers_pool_t *pool; @@ -403,11 +314,15 @@ static struct workers_pool_t *_workers_pool_init(struct stream_t *stream) { pool->n_workers = stream->dev->run->n_workers; A_CALLOC(pool->workers, pool->n_workers); - atomic_init(&pool->workers_stop, false); - A_MUTEX_INIT(&pool->free_workers_mutex); A_COND_INIT(&pool->free_workers_cond); + atomic_init(&pool->workers_stop, false); + + if (stream->dev->desired_fps > 0) { + pool->desired_frames_interval = (long double)1 / stream->dev->desired_fps; + } + for (unsigned number = 0; number < pool->n_workers; ++number) { # define WORKER(_next) pool->workers[number]._next @@ -426,7 +341,7 @@ static struct workers_pool_t *_workers_pool_init(struct stream_t *stream) { WORKER(dev) = stream->dev; WORKER(encoder) = stream->encoder; - A_THREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&(pool->workers[number])); + A_THREAD_CREATE(&WORKER(tid), _worker_thread, (void *)&(pool->workers[number])); pool->free_workers += 1; @@ -435,7 +350,33 @@ static struct workers_pool_t *_workers_pool_init(struct stream_t *stream) { return pool; } -static void *_stream_worker_thread(void *v_worker) { +static void _workers_pool_destroy(struct workers_pool_t *pool) { + LOG_INFO("Destroying workers pool ..."); + + atomic_store(&pool->workers_stop, true); + for (unsigned number = 0; number < pool->n_workers; ++number) { +# define WORKER(_next) pool->workers[number]._next + + A_MUTEX_LOCK(&WORKER(has_job_mutex)); + atomic_store(&WORKER(has_job), true); // Final job: die + A_MUTEX_UNLOCK(&WORKER(has_job_mutex)); + A_COND_SIGNAL(&WORKER(has_job_cond)); + + A_THREAD_JOIN(WORKER(tid)); + A_MUTEX_DESTROY(&WORKER(has_job_mutex)); + A_COND_DESTROY(&WORKER(has_job_cond)); + +# undef WORKER + } + + A_MUTEX_DESTROY(&pool->free_workers_mutex); + A_COND_DESTROY(&pool->free_workers_cond); + + free(pool->workers); + free(pool); +} + +static void *_worker_thread(void *v_worker) { struct worker_t *worker = (struct worker_t *)v_worker; LOG_DEBUG("Hello! I am a worker #%u ^_^", worker->number); @@ -501,28 +442,92 @@ static void *_stream_worker_thread(void *v_worker) { return NULL; } -static void _workers_pool_destroy(struct workers_pool_t *pool) { - LOG_INFO("Destroying workers pool ..."); +static struct worker_t *_workers_pool_wait(struct workers_pool_t *pool) { + struct worker_t *ready_worker = NULL; + + A_MUTEX_LOCK(&pool->free_workers_mutex); + A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex); + A_MUTEX_UNLOCK(&pool->free_workers_mutex); + + if (pool->oldest_worker && !atomic_load(&pool->oldest_worker->has_job) && pool->oldest_worker->buf_index >= 0) { + ready_worker = pool->oldest_worker; + ready_worker->job_timely = true; + pool->oldest_worker = pool->oldest_worker->order_next; + } else { + for (unsigned number = 0; number < pool->n_workers; ++number) { + if ( + !atomic_load(&pool->workers[number].has_job) && ( + ready_worker == NULL + || ready_worker->job_start_time < pool->workers[number].job_start_time + ) + ) { + ready_worker = &pool->workers[number]; + break; + } + } + assert(ready_worker != NULL); + ready_worker->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате) + } + return ready_worker; +} + +static void _workers_pool_assign(struct workers_pool_t *pool, struct worker_t *ready_worker, unsigned buf_index) { + if (pool->oldest_worker == NULL) { + pool->oldest_worker = ready_worker; + pool->latest_worker = pool->oldest_worker; + } else { + if (ready_worker->order_next) { + ready_worker->order_next->order_prev = ready_worker->order_prev; + } + if (ready_worker->order_prev) { + ready_worker->order_prev->order_next = ready_worker->order_next; + } + ready_worker->order_prev = pool->latest_worker; + pool->latest_worker->order_next = ready_worker; + pool->latest_worker = ready_worker; + } + pool->latest_worker->order_next = NULL; + + A_MUTEX_LOCK(&ready_worker->has_job_mutex); + ready_worker->buf_index = buf_index; + atomic_store(&ready_worker->has_job, true); + A_MUTEX_UNLOCK(&ready_worker->has_job_mutex); + A_COND_SIGNAL(&ready_worker->has_job_cond); + + A_MUTEX_LOCK(&pool->free_workers_mutex); + pool->free_workers -= 1; + A_MUTEX_UNLOCK(&pool->free_workers_mutex); + + LOG_DEBUG("Assigned new frame in buffer %u to worker %u", buf_index, ready_worker->number); +} + +static long double _workers_pool_get_fluency_delay(struct workers_pool_t *pool) { + long double sum_comp_time = 0; + long double avg_comp_time; + long double min_delay; + long double soft_delay; - atomic_store(&pool->workers_stop, true); for (unsigned number = 0; number < pool->n_workers; ++number) { # define WORKER(_next) pool->workers[number]._next - A_MUTEX_LOCK(&WORKER(has_job_mutex)); - atomic_store(&WORKER(has_job), true); // Final job: die - A_MUTEX_UNLOCK(&WORKER(has_job_mutex)); - A_COND_SIGNAL(&WORKER(has_job_cond)); - - A_THREAD_JOIN(WORKER(tid)); - A_MUTEX_DESTROY(&WORKER(has_job_mutex)); - A_COND_DESTROY(&WORKER(has_job_cond)); + A_MUTEX_LOCK(&WORKER(last_comp_time_mutex)); + if (WORKER(last_comp_time) > 0) { + sum_comp_time += WORKER(last_comp_time); + } + A_MUTEX_UNLOCK(&WORKER(last_comp_time_mutex)); # undef WORKER } - A_MUTEX_DESTROY(&pool->free_workers_mutex); - A_COND_DESTROY(&pool->free_workers_cond); + avg_comp_time = sum_comp_time / pool->n_workers; // Среднее время работы воркеров - free(pool->workers); - free(pool); + min_delay = avg_comp_time / pool->n_workers; // Среднее время работы размазывается на N воркеров + + if (pool->desired_frames_interval > 0 && min_delay > 0) { + // Искусственное время задержки на основе желаемого FPS, если включен --desired-fps + soft_delay = pool->desired_frames_interval - sum_comp_time; + return (min_delay > soft_delay ? min_delay : soft_delay); + } + + return min_delay; } diff --git a/src/stream.h b/src/stream.h index 8f6e9dd..6da9423 100644 --- a/src/stream.h +++ b/src/stream.h @@ -70,6 +70,8 @@ struct workers_pool_t { pthread_cond_t free_workers_cond; atomic_bool workers_stop; + + long double desired_frames_interval; }; struct process_t {