refactoring

This commit is contained in:
Maxim Devaev 2024-04-04 19:32:45 +03:00
parent 1ed39790ba
commit 0d974a5faf
2 changed files with 53 additions and 52 deletions

View File

@ -22,12 +22,21 @@
#include "workers.h"
#include <stdatomic.h>
#include <pthread.h>
#include "../libs/types.h"
#include "../libs/tools.h"
#include "../libs/threading.h"
#include "../libs/logging.h"
static void *_worker_thread(void *v_worker);
us_workers_pool_s *us_workers_pool_init(
const char *name, const char *wr_prefix, unsigned n_workers, long double desired_interval,
const char *name, const char *wr_prefix, uint n_workers, ldf desired_interval,
us_workers_pool_job_init_f job_init, void *job_init_arg,
us_workers_pool_job_destroy_f job_destroy,
us_workers_pool_run_job_f run_job) {
@ -49,23 +58,21 @@ us_workers_pool_s *us_workers_pool_init(
US_MUTEX_INIT(pool->free_workers_mutex);
US_COND_INIT(pool->free_workers_cond);
for (unsigned number = 0; number < pool->n_workers; ++number) {
# define WR(x_next) pool->workers[number].x_next
for (uint index = 0; index < pool->n_workers; ++index) {
us_worker_s *const wr = &pool->workers[index];
WR(number) = number;
US_ASPRINTF(WR(name), "%s-%u", wr_prefix, number);
wr->number = index;
US_ASPRINTF(wr->name, "%s-%u", wr_prefix, index);
US_MUTEX_INIT(WR(has_job_mutex));
atomic_init(&WR(has_job), false);
US_COND_INIT(WR(has_job_cond));
US_MUTEX_INIT(wr->has_job_mutex);
atomic_init(&wr->has_job, false);
US_COND_INIT(wr->has_job_cond);
WR(pool) = pool;
WR(job) = job_init(job_init_arg);
wr->pool = pool;
wr->job = job_init(job_init_arg);
US_THREAD_CREATE(WR(tid), _worker_thread, (void*)&(pool->workers[number]));
US_THREAD_CREATE(wr->tid, _worker_thread, (void*)wr);
pool->free_workers += 1;
# undef WR
}
return pool;
}
@ -74,23 +81,21 @@ void us_workers_pool_destroy(us_workers_pool_s *pool) {
US_LOG_INFO("Destroying workers pool %s ...", pool->name);
atomic_store(&pool->stop, true);
for (unsigned number = 0; number < pool->n_workers; ++number) {
# define WR(x_next) pool->workers[number].x_next
for (uint index = 0; index < pool->n_workers; ++index) {
us_worker_s *const wr = &pool->workers[index];
US_MUTEX_LOCK(WR(has_job_mutex));
atomic_store(&WR(has_job), true); // Final job: die
US_MUTEX_UNLOCK(WR(has_job_mutex));
US_COND_SIGNAL(WR(has_job_cond));
US_MUTEX_LOCK(wr->has_job_mutex);
atomic_store(&wr->has_job, true); // Final job: die
US_MUTEX_UNLOCK(wr->has_job_mutex);
US_COND_SIGNAL(wr->has_job_cond);
US_THREAD_JOIN(WR(tid));
US_MUTEX_DESTROY(WR(has_job_mutex));
US_COND_DESTROY(WR(has_job_cond));
US_THREAD_JOIN(wr->tid);
US_MUTEX_DESTROY(wr->has_job_mutex);
US_COND_DESTROY(wr->has_job_cond);
free(WR(name));
free(wr->name);
pool->job_destroy(WR(job));
# undef WR
pool->job_destroy(wr->job);
}
US_MUTEX_DESTROY(pool->free_workers_mutex);
@ -112,14 +117,15 @@ us_worker_s *us_workers_pool_wait(us_workers_pool_s *pool) {
ready_wr->job_timely = true;
pool->oldest_wr = pool->oldest_wr->next_wr;
} else {
for (unsigned number = 0; number < pool->n_workers; ++number) {
for (uint index = 0; index < pool->n_workers; ++index) {
us_worker_s *const wr = &pool->workers[index];
if (
!atomic_load(&pool->workers[number].has_job) && (
!atomic_load(&wr->has_job) && (
ready_wr == NULL
|| ready_wr->job_start_ts < pool->workers[number].job_start_ts
|| ready_wr->job_start_ts < wr->job_start_ts
)
) {
ready_wr = &pool->workers[number];
ready_wr = wr;
break;
}
}
@ -157,15 +163,15 @@ void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr/*, vo
US_MUTEX_UNLOCK(pool->free_workers_mutex);
}
long double us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr) {
const long double approx_job_time = pool->approx_job_time * 0.9 + ready_wr->last_job_time * 0.1;
ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr) {
const ldf approx_job_time = pool->approx_job_time * 0.9 + ready_wr->last_job_time * 0.1;
US_LOG_VERBOSE("Correcting pool's %s approx_job_time: %.3Lf -> %.3Lf (last_job_time=%.3Lf)",
pool->name, pool->approx_job_time, approx_job_time, ready_wr->last_job_time);
pool->approx_job_time = approx_job_time;
const long double min_delay = pool->approx_job_time / pool->n_workers; // Среднее время работы размазывается на N воркеров
const ldf min_delay = pool->approx_job_time / pool->n_workers; // Среднее время работы размазывается на N воркеров
if (pool->desired_interval > 0 && min_delay > 0 && pool->desired_interval > min_delay) {
// Искусственное время задержки на основе желаемого FPS, если включен --desired-fps
@ -176,7 +182,7 @@ long double us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_
}
static void *_worker_thread(void *v_worker) {
us_worker_s *wr = v_worker;
us_worker_s *const wr = v_worker;
US_THREAD_SETTLE("%s", wr->name);
US_LOG_DEBUG("Hello! I am a worker %s ^_^", wr->name);
@ -189,7 +195,7 @@ static void *_worker_thread(void *v_worker) {
US_MUTEX_UNLOCK(wr->has_job_mutex);
if (!atomic_load(&wr->pool->stop)) {
const long double job_start_ts = us_get_now_monotonic();
const ldf job_start_ts = us_get_now_monotonic();
wr->job_failed = !wr->pool->run_job(wr);
if (!wr->job_failed) {
wr->job_start_ts = job_start_ts;

View File

@ -22,31 +22,26 @@
#pragma once
#include <stdbool.h>
#include <stdatomic.h>
#include <sys/types.h>
#include <pthread.h>
#include "../libs/tools.h"
#include "../libs/threading.h"
#include "../libs/logging.h"
#include "../libs/types.h"
typedef struct us_worker_sx {
pthread_t tid;
unsigned number;
char *name;
pthread_t tid;
uint number;
char *name;
long double last_job_time;
ldf last_job_time;
pthread_mutex_t has_job_mutex;
void *job;
atomic_bool has_job;
bool job_timely;
bool job_failed;
long double job_start_ts;
ldf job_start_ts;
pthread_cond_t has_job_cond;
struct us_worker_sx *prev_wr;
@ -61,20 +56,20 @@ typedef bool (*us_workers_pool_run_job_f)(us_worker_s *wr);
typedef struct us_workers_pool_sx {
const char *name;
long double desired_interval;
ldf desired_interval;
us_workers_pool_job_destroy_f job_destroy;
us_workers_pool_run_job_f run_job;
unsigned n_workers;
uint n_workers;
us_worker_s *workers;
us_worker_s *oldest_wr;
us_worker_s *latest_wr;
long double approx_job_time;
ldf approx_job_time;
pthread_mutex_t free_workers_mutex;
unsigned free_workers;
uint free_workers;
pthread_cond_t free_workers_cond;
atomic_bool stop;
@ -82,7 +77,7 @@ typedef struct us_workers_pool_sx {
us_workers_pool_s *us_workers_pool_init(
const char *name, const char *wr_prefix, unsigned n_workers, long double desired_interval,
const char *name, const char *wr_prefix, uint n_workers, ldf desired_interval,
us_workers_pool_job_init_f job_init, void *job_init_arg,
us_workers_pool_job_destroy_f job_destroy,
us_workers_pool_run_job_f run_job);
@ -92,4 +87,4 @@ void us_workers_pool_destroy(us_workers_pool_s *pool);
us_worker_s *us_workers_pool_wait(us_workers_pool_s *pool);
void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr/*, void *job*/);
long double us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr);
ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr);