reworked pool logic

This commit is contained in:
Maxim Devaev
2024-04-05 17:48:54 +03:00
parent fab4c47f17
commit 18038799f0
2 changed files with 36 additions and 64 deletions

View File

@@ -30,6 +30,7 @@
#include "../libs/tools.h"
#include "../libs/threading.h"
#include "../libs/logging.h"
#include "../libs/list.h"
static void *_worker_thread(void *v_worker);
@@ -53,13 +54,13 @@ us_workers_pool_s *us_workers_pool_init(
atomic_init(&pool->stop, false);
pool->n_workers = n_workers;
US_CALLOC(pool->workers, pool->n_workers);
US_MUTEX_INIT(pool->free_workers_mutex);
US_COND_INIT(pool->free_workers_cond);
for (uint index = 0; index < pool->n_workers; ++index) {
us_worker_s *const wr = &pool->workers[index];
us_worker_s *wr;
US_CALLOC(wr, 1);
wr->number = index;
US_ASPRINTF(wr->name, "%s-%u", wr_prefix, index);
@@ -73,6 +74,8 @@ us_workers_pool_s *us_workers_pool_init(
US_THREAD_CREATE(wr->tid, _worker_thread, (void*)wr);
pool->free_workers += 1;
US_LIST_APPEND(pool->workers, wr);
}
return pool;
}
@@ -81,9 +84,7 @@ void us_workers_pool_destroy(us_workers_pool_s *pool) {
US_LOG_INFO("Destroying workers pool %s ...", pool->name);
atomic_store(&pool->stop, true);
for (uint index = 0; index < pool->n_workers; ++index) {
us_worker_s *const wr = &pool->workers[index];
US_LIST_ITERATE(pool->workers, wr, { // cppcheck-suppress constStatement
US_MUTEX_LOCK(wr->has_job_mutex);
atomic_store(&wr->has_job, true); // Final job: die
US_MUTEX_UNLOCK(wr->has_job_mutex);
@@ -93,83 +94,56 @@ void us_workers_pool_destroy(us_workers_pool_s *pool) {
US_MUTEX_DESTROY(wr->has_job_mutex);
US_COND_DESTROY(wr->has_job_cond);
free(wr->name);
pool->job_destroy(wr->job);
}
free(wr->name);
free(wr);
});
US_MUTEX_DESTROY(pool->free_workers_mutex);
US_COND_DESTROY(pool->free_workers_cond);
free(pool->workers);
free(pool);
}
us_worker_s *us_workers_pool_wait(us_workers_pool_s *pool) {
us_worker_s *ready_wr = NULL;
US_MUTEX_LOCK(pool->free_workers_mutex);
US_COND_WAIT_FOR(pool->free_workers, pool->free_workers_cond, pool->free_workers_mutex);
US_MUTEX_UNLOCK(pool->free_workers_mutex);
if (pool->oldest_wr && !atomic_load(&pool->oldest_wr->has_job)) {
ready_wr = pool->oldest_wr;
ready_wr->job_timely = true;
} else {
for (uint index = 0; index < pool->n_workers; ++index) {
us_worker_s *const wr = &pool->workers[index];
if (
!atomic_load(&wr->has_job) && (
ready_wr == NULL
|| ready_wr->job_start_ts < wr->job_start_ts
)
) {
ready_wr = wr;
break;
}
us_worker_s *found = NULL;
US_LIST_ITERATE(pool->workers, wr, { // cppcheck-suppress constStatement
if (!atomic_load(&wr->has_job) && (found == NULL || found->job_start_ts <= wr->job_start_ts)) {
found = wr;
}
assert(ready_wr != NULL);
ready_wr->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате)
});
assert(found != NULL);
US_LIST_REMOVE(pool->workers, found);
US_LIST_APPEND(pool->workers, found); // Перемещаем в конец списка
found->job_timely = (found->job_start_ts > pool->job_timely_ts);
if (found->job_timely) {
pool->job_timely_ts = found->job_start_ts;
}
return ready_wr;
return found;
}
void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr/*, void *job*/) {
if (pool->oldest_wr == ready_wr) {
pool->oldest_wr = pool->oldest_wr->next_wr;
}
if (pool->oldest_wr == NULL) {
pool->oldest_wr = ready_wr;
pool->latest_wr = pool->oldest_wr;
} else {
if (ready_wr->next_wr != NULL) {
ready_wr->next_wr->prev_wr = ready_wr->prev_wr;
}
if (ready_wr->prev_wr != NULL) {
ready_wr->prev_wr->next_wr = ready_wr->next_wr;
}
ready_wr->prev_wr = pool->latest_wr;
pool->latest_wr->next_wr = ready_wr;
pool->latest_wr = ready_wr;
}
pool->latest_wr->next_wr = NULL;
US_MUTEX_LOCK(ready_wr->has_job_mutex);
//ready_wr->job = job;
atomic_store(&ready_wr->has_job, true);
US_MUTEX_UNLOCK(ready_wr->has_job_mutex);
US_COND_SIGNAL(ready_wr->has_job_cond);
void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *wr) {
US_MUTEX_LOCK(wr->has_job_mutex);
atomic_store(&wr->has_job, true);
US_MUTEX_UNLOCK(wr->has_job_mutex);
US_COND_SIGNAL(wr->has_job_cond);
US_MUTEX_LOCK(pool->free_workers_mutex);
pool->free_workers -= 1;
US_MUTEX_UNLOCK(pool->free_workers_mutex);
}
ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr) {
const ldf approx_job_time = pool->approx_job_time * 0.9 + ready_wr->last_job_time * 0.1;
ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *wr) {
const ldf approx_job_time = pool->approx_job_time * 0.9 + wr->last_job_time * 0.1;
US_LOG_VERBOSE("Correcting pool's %s approx_job_time: %.3Lf -> %.3Lf (last_job_time=%.3Lf)",
pool->name, pool->approx_job_time, approx_job_time, ready_wr->last_job_time);
pool->name, pool->approx_job_time, approx_job_time, wr->last_job_time);
pool->approx_job_time = approx_job_time;
@@ -203,7 +177,6 @@ static void *_worker_thread(void *v_worker) {
wr->job_start_ts = job_start_ts;
wr->last_job_time = us_get_now_monotonic() - wr->job_start_ts;
}
//wr->job = NULL;
atomic_store(&wr->has_job, false);
}

View File

@@ -27,6 +27,7 @@
#include <pthread.h>
#include "../libs/types.h"
#include "../libs/list.h"
typedef struct us_worker_sx {
@@ -44,10 +45,9 @@ typedef struct us_worker_sx {
ldf job_start_ts;
pthread_cond_t has_job_cond;
struct us_worker_sx *prev_wr;
struct us_worker_sx *next_wr;
struct us_workers_pool_sx *pool;
US_LIST_DECLARE;
} us_worker_s;
typedef void *(*us_workers_pool_job_init_f)(void *arg);
@@ -63,8 +63,7 @@ typedef struct us_workers_pool_sx {
uint n_workers;
us_worker_s *workers;
us_worker_s *oldest_wr;
us_worker_s *latest_wr;
ldf job_timely_ts;
ldf approx_job_time;
@@ -85,6 +84,6 @@ us_workers_pool_s *us_workers_pool_init(
void us_workers_pool_destroy(us_workers_pool_s *pool);
us_worker_s *us_workers_pool_wait(us_workers_pool_s *pool);
void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr/*, void *job*/);
void us_workers_pool_assign(us_workers_pool_s *pool, us_worker_s *ready_wr);
ldf us_workers_pool_get_fluency_delay(us_workers_pool_s *pool, const us_worker_s *ready_wr);