mirror of
https://github.com/pikvm/ustreamer.git
synced 2026-03-01 21:26:33 +00:00
separated workers pool
This commit is contained in:
@@ -23,64 +23,22 @@
|
||||
#include "stream.h"
|
||||
|
||||
|
||||
typedef struct _worker_sx {
|
||||
pthread_t tid;
|
||||
unsigned number;
|
||||
atomic_bool *proc_stop;
|
||||
atomic_bool *workers_stop;
|
||||
|
||||
long double last_comp_time;
|
||||
char *frame_role;
|
||||
frame_s *frame;
|
||||
|
||||
pthread_mutex_t has_job_mutex;
|
||||
unsigned buf_index;
|
||||
atomic_bool has_job;
|
||||
bool job_timely;
|
||||
bool job_failed;
|
||||
long double job_start_ts;
|
||||
pthread_cond_t has_job_cond;
|
||||
|
||||
pthread_mutex_t *free_workers_mutex;
|
||||
unsigned *free_workers;
|
||||
pthread_cond_t *free_workers_cond;
|
||||
|
||||
struct _worker_sx *order_prev;
|
||||
struct _worker_sx *order_next;
|
||||
|
||||
stream_s *stream;
|
||||
} _worker_s;
|
||||
|
||||
typedef struct {
|
||||
unsigned n_workers;
|
||||
_worker_s *workers;
|
||||
_worker_s *oldest_wr;
|
||||
_worker_s *latest_wr;
|
||||
|
||||
long double approx_comp_time;
|
||||
|
||||
pthread_mutex_t free_workers_mutex;
|
||||
unsigned free_workers;
|
||||
pthread_cond_t free_workers_cond;
|
||||
|
||||
atomic_bool workers_stop;
|
||||
|
||||
long double desired_frames_interval;
|
||||
} _pool_s;
|
||||
device_s *dev;
|
||||
encoder_s *encoder;
|
||||
unsigned buf_index;
|
||||
char *dest_role;
|
||||
frame_s *dest;
|
||||
} _job_s;
|
||||
|
||||
|
||||
static _pool_s *_stream_init_loop(stream_s *stream);
|
||||
static _pool_s *_stream_init_one(stream_s *stream);
|
||||
static workers_pool_s *_stream_init_loop(stream_s *stream);
|
||||
static workers_pool_s *_stream_init_one(stream_s *stream);
|
||||
static bool _stream_expose_frame(stream_s *stream, frame_s *frame, unsigned captured_fps);
|
||||
|
||||
static _pool_s *_workers_pool_init(stream_s *stream);
|
||||
static void _workers_pool_destroy(_pool_s *pool);
|
||||
|
||||
static void *_worker_thread(void *v_worker);
|
||||
|
||||
static _worker_s *_workers_pool_wait(_pool_s *pool);
|
||||
static void _workers_pool_assign(_pool_s *pool, _worker_s *ready_wr, unsigned buf_index);
|
||||
static long double _workers_pool_get_fluency_delay(_pool_s *pool, _worker_s *ready_wr);
|
||||
static void *_worker_job_init(worker_s *wr);
|
||||
static void _worker_job_destroy(void *v_job);
|
||||
static bool _worker_run_job(worker_s *wr);
|
||||
|
||||
|
||||
stream_s *stream_init(device_s *dev, encoder_s *encoder) {
|
||||
@@ -120,7 +78,7 @@ void stream_loop(stream_s *stream) {
|
||||
LOG_INFO("Using V4L2 device: %s", DEV(path));
|
||||
LOG_INFO("Using desired FPS: %u", DEV(desired_fps));
|
||||
|
||||
for (_pool_s *pool; (pool = _stream_init_loop(stream)) != NULL;) {
|
||||
for (workers_pool_s *pool; (pool = _stream_init_loop(stream)) != NULL;) {
|
||||
long double grab_after = 0;
|
||||
unsigned fluency_passed = 0;
|
||||
unsigned captured_fps = 0;
|
||||
@@ -133,14 +91,14 @@ void stream_loop(stream_s *stream) {
|
||||
SEP_DEBUG('-');
|
||||
LOG_DEBUG("Waiting for worker ...");
|
||||
|
||||
_worker_s *ready_wr = _workers_pool_wait(pool);
|
||||
worker_s *ready_wr = workers_pool_wait(pool);
|
||||
|
||||
if (!ready_wr->job_failed) {
|
||||
if (ready_wr->job_timely) {
|
||||
_stream_expose_frame(stream, ready_wr->frame, captured_fps);
|
||||
LOG_PERF("##### Encoded frame exposed; worker=%u", ready_wr->number);
|
||||
_stream_expose_frame(stream, ((_job_s *)(ready_wr->job))->dest, captured_fps);
|
||||
LOG_PERF("##### Encoded frame exposed; worker=%s", ready_wr->name);
|
||||
} else {
|
||||
LOG_PERF("----- Encoded frame dropped; worker=%u", ready_wr->number);
|
||||
LOG_PERF("----- Encoded frame dropped; worker=%s", ready_wr->name);
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
@@ -199,18 +157,25 @@ void stream_loop(stream_s *stream) {
|
||||
}
|
||||
captured_fps_accum += 1;
|
||||
|
||||
const long double fluency_delay = _workers_pool_get_fluency_delay(pool, ready_wr);
|
||||
const long double fluency_delay = workers_pool_get_fluency_delay(pool, ready_wr);
|
||||
grab_after = now + fluency_delay;
|
||||
LOG_VERBOSE("Fluency: delay=%.03Lf, grab_after=%.03Lf", fluency_delay, grab_after);
|
||||
|
||||
# ifdef WITH_MEMSINK
|
||||
if (stream->raw_sink && memsink_server_put(stream->raw_sink, &DEV(run->hw_buffers[buf_index].raw)) < 0) {
|
||||
stream->raw_sink = NULL;
|
||||
LOG_ERROR("RAW sink completely disabled due error");
|
||||
if (stream->raw_sink) {
|
||||
if (memsink_server_put(stream->raw_sink, &DEV(run->hw_buffers[buf_index].raw)) < 0) {
|
||||
stream->raw_sink = NULL;
|
||||
LOG_ERROR("RAW sink completely disabled due error");
|
||||
}
|
||||
}
|
||||
# endif
|
||||
|
||||
_workers_pool_assign(pool, ready_wr, buf_index);
|
||||
_job_s *job = (_job_s *)ready_wr->job;
|
||||
job->dev = stream->dev;
|
||||
job->encoder = stream->encoder;
|
||||
job->buf_index = buf_index;
|
||||
workers_pool_assign(pool, ready_wr);
|
||||
LOG_DEBUG("Assigned new frame in buffer %u to worker %s", buf_index, ready_wr->name);
|
||||
}
|
||||
} else if (buf_index != -2) { // -2 for broken frame
|
||||
break;
|
||||
@@ -231,7 +196,7 @@ void stream_loop(stream_s *stream) {
|
||||
}
|
||||
}
|
||||
|
||||
_workers_pool_destroy(pool);
|
||||
workers_pool_destroy(pool);
|
||||
device_switch_capturing(stream->dev, false);
|
||||
device_close(stream->dev);
|
||||
|
||||
@@ -251,8 +216,8 @@ void stream_switch_slowdown(stream_s *stream, bool slowdown) {
|
||||
atomic_store(&stream->proc->slowdown, slowdown);
|
||||
}
|
||||
|
||||
static _pool_s *_stream_init_loop(stream_s *stream) {
|
||||
_pool_s *pool = NULL;
|
||||
static workers_pool_s *_stream_init_loop(stream_s *stream) {
|
||||
workers_pool_s *pool = NULL;
|
||||
int access_error = 0;
|
||||
|
||||
LOG_DEBUG("%s: stream->proc->stop=%d", __FUNCTION__, atomic_load(&stream->proc->stop));
|
||||
@@ -260,9 +225,11 @@ static _pool_s *_stream_init_loop(stream_s *stream) {
|
||||
while (!atomic_load(&stream->proc->stop)) {
|
||||
if (_stream_expose_frame(stream, NULL, 0)) {
|
||||
# ifdef WITH_MEMSINK
|
||||
if (stream->raw_sink && memsink_server_put(stream->raw_sink, stream->blank) < 0) {
|
||||
stream->raw_sink = NULL;
|
||||
LOG_ERROR("RAW sink completely disabled due error");
|
||||
if (stream->raw_sink) {
|
||||
if (memsink_server_put(stream->raw_sink, stream->blank) < 0) {
|
||||
stream->raw_sink = NULL;
|
||||
LOG_ERROR("RAW sink completely disabled due error");
|
||||
}
|
||||
}
|
||||
# endif
|
||||
}
|
||||
@@ -291,7 +258,7 @@ static _pool_s *_stream_init_loop(stream_s *stream) {
|
||||
return pool;
|
||||
}
|
||||
|
||||
static _pool_s *_stream_init_one(stream_s *stream) {
|
||||
static workers_pool_s *_stream_init_one(stream_s *stream) {
|
||||
if (device_open(stream->dev) < 0) {
|
||||
goto error;
|
||||
}
|
||||
@@ -300,7 +267,19 @@ static _pool_s *_stream_init_one(stream_s *stream) {
|
||||
}
|
||||
|
||||
encoder_prepare(stream->encoder, stream->dev);
|
||||
return _workers_pool_init(stream);
|
||||
|
||||
# define DEV(_next) stream->dev->_next
|
||||
# define RUN(_next) stream->dev->run->_next
|
||||
long double desired_interval = 0;
|
||||
if (DEV(desired_fps) > 0 && (DEV(desired_fps) < RUN(hw_fps) || RUN(hw_fps) == 0)) {
|
||||
desired_interval = (long double)1 / DEV(desired_fps);
|
||||
}
|
||||
# undef DEV
|
||||
# undef RUN
|
||||
|
||||
return workers_pool_init(
|
||||
"jpeg", stream->encoder->run->n_workers, desired_interval,
|
||||
_worker_job_init, _worker_job_destroy, _worker_run_job);
|
||||
|
||||
error:
|
||||
device_close(stream->dev);
|
||||
@@ -364,214 +343,46 @@ static bool _stream_expose_frame(stream_s *stream, frame_s *frame, unsigned capt
|
||||
# undef VID
|
||||
}
|
||||
|
||||
static _pool_s *_workers_pool_init(stream_s *stream) {
|
||||
# define DEV(_next) stream->dev->_next
|
||||
# define RUN(_next) stream->dev->run->_next
|
||||
static void *_worker_job_init(worker_s *wr) {
|
||||
_job_s *job;
|
||||
A_CALLOC(job, 1);
|
||||
|
||||
LOG_INFO("Creating pool with %u workers ...", stream->encoder->run->n_workers);
|
||||
const size_t dest_role_len = strlen(wr->name) + 16;
|
||||
A_CALLOC(job->dest_role, dest_role_len);
|
||||
snprintf(job->dest_role, dest_role_len, "%s_dest", wr->name);
|
||||
job->dest = frame_init(job->dest_role);
|
||||
|
||||
_pool_s *pool;
|
||||
A_CALLOC(pool, 1);
|
||||
|
||||
pool->n_workers = stream->encoder->run->n_workers;
|
||||
A_CALLOC(pool->workers, pool->n_workers);
|
||||
|
||||
A_MUTEX_INIT(&pool->free_workers_mutex);
|
||||
A_COND_INIT(&pool->free_workers_cond);
|
||||
|
||||
atomic_init(&pool->workers_stop, false);
|
||||
|
||||
if (DEV(desired_fps) > 0 && (DEV(desired_fps) < RUN(hw_fps) || RUN(hw_fps) == 0)) {
|
||||
pool->desired_frames_interval = (long double)1 / DEV(desired_fps);
|
||||
}
|
||||
|
||||
# undef RUN
|
||||
# undef DEV
|
||||
|
||||
for (unsigned number = 0; number < pool->n_workers; ++number) {
|
||||
# define WR(_next) pool->workers[number]._next
|
||||
|
||||
A_CALLOC(WR(frame_role), 32);
|
||||
sprintf(WR(frame_role), "worker_dest_%u", number);
|
||||
WR(frame) = frame_init(WR(frame_role));
|
||||
|
||||
A_MUTEX_INIT(&WR(has_job_mutex));
|
||||
atomic_init(&WR(has_job), false);
|
||||
A_COND_INIT(&WR(has_job_cond));
|
||||
|
||||
WR(number) = number;
|
||||
WR(proc_stop) = &stream->proc->stop;
|
||||
WR(workers_stop) = &pool->workers_stop;
|
||||
|
||||
WR(free_workers_mutex) = &pool->free_workers_mutex;
|
||||
WR(free_workers) = &pool->free_workers;
|
||||
WR(free_workers_cond) = &pool->free_workers_cond;
|
||||
|
||||
WR(stream) = stream;
|
||||
|
||||
A_THREAD_CREATE(&WR(tid), _worker_thread, (void *)&(pool->workers[number]));
|
||||
|
||||
pool->free_workers += 1;
|
||||
|
||||
# undef WR
|
||||
}
|
||||
return pool;
|
||||
return (void *)job;
|
||||
}
|
||||
static void _worker_job_destroy(void *v_job) {
|
||||
_job_s *job = (_job_s *)v_job;
|
||||
frame_destroy(job->dest);
|
||||
free(job->dest_role);
|
||||
free(job);
|
||||
}
|
||||
|
||||
static void _workers_pool_destroy(_pool_s *pool) {
|
||||
LOG_INFO("Destroying workers pool ...");
|
||||
static bool _worker_run_job(worker_s *wr) {
|
||||
_job_s *job = (_job_s *)wr->job;
|
||||
|
||||
atomic_store(&pool->workers_stop, true);
|
||||
for (unsigned number = 0; number < pool->n_workers; ++number) {
|
||||
# define WR(_next) pool->workers[number]._next
|
||||
LOG_DEBUG("Worker %s compressing JPEG from buffer %u ...", wr->name, job->buf_index);
|
||||
bool ok = !encoder_compress(
|
||||
job->encoder,
|
||||
wr->number,
|
||||
&job->dev->run->hw_buffers[job->buf_index].raw,
|
||||
job->dest);
|
||||
|
||||
A_MUTEX_LOCK(&WR(has_job_mutex));
|
||||
atomic_store(&WR(has_job), true); // Final job: die
|
||||
A_MUTEX_UNLOCK(&WR(has_job_mutex));
|
||||
A_COND_SIGNAL(&WR(has_job_cond));
|
||||
|
||||
A_THREAD_JOIN(WR(tid));
|
||||
A_MUTEX_DESTROY(&WR(has_job_mutex));
|
||||
A_COND_DESTROY(&WR(has_job_cond));
|
||||
|
||||
frame_destroy(WR(frame));
|
||||
free(WR(frame_role));
|
||||
|
||||
# undef WR
|
||||
}
|
||||
|
||||
A_MUTEX_DESTROY(&pool->free_workers_mutex);
|
||||
A_COND_DESTROY(&pool->free_workers_cond);
|
||||
|
||||
free(pool->workers);
|
||||
free(pool);
|
||||
}
|
||||
|
||||
static void *_worker_thread(void *v_worker) {
|
||||
_worker_s *wr = (_worker_s *)v_worker;
|
||||
|
||||
A_THREAD_RENAME("worker-%u", wr->number);
|
||||
LOG_DEBUG("Hello! I am a worker #%u ^_^", wr->number);
|
||||
|
||||
while (!atomic_load(wr->proc_stop) && !atomic_load(wr->workers_stop)) {
|
||||
LOG_DEBUG("Worker %u waiting for a new job ...", wr->number);
|
||||
|
||||
A_MUTEX_LOCK(&wr->has_job_mutex);
|
||||
A_COND_WAIT_TRUE(atomic_load(&wr->has_job), &wr->has_job_cond, &wr->has_job_mutex);
|
||||
A_MUTEX_UNLOCK(&wr->has_job_mutex);
|
||||
|
||||
if (!atomic_load(wr->workers_stop)) {
|
||||
# define PIC(_next) wr->frame->_next
|
||||
|
||||
LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", wr->number, wr->buf_index);
|
||||
wr->job_failed = (bool)encoder_compress(
|
||||
wr->stream->encoder,
|
||||
wr->number,
|
||||
&wr->stream->dev->run->hw_buffers[wr->buf_index].raw,
|
||||
wr->frame
|
||||
);
|
||||
|
||||
if (device_release_buffer(wr->stream->dev, wr->buf_index) == 0) {
|
||||
if (!wr->job_failed) {
|
||||
wr->job_start_ts = PIC(encode_begin_ts);
|
||||
wr->last_comp_time = PIC(encode_end_ts) - wr->job_start_ts;
|
||||
LOG_VERBOSE("Compressed new JPEG: size=%zu, time=%0.3Lf, worker=%u, buffer=%u",
|
||||
PIC(used), wr->last_comp_time, wr->number, wr->buf_index);
|
||||
} else {
|
||||
LOG_VERBOSE("Compression failed: worker=%u, buffer=%u", wr->number, wr->buf_index);
|
||||
}
|
||||
} else {
|
||||
wr->job_failed = true;
|
||||
}
|
||||
|
||||
atomic_store(&wr->has_job, false);
|
||||
|
||||
# undef PIC
|
||||
if (device_release_buffer(job->dev, job->buf_index) == 0) {
|
||||
if (ok) {
|
||||
LOG_VERBOSE("Compressed new JPEG: size=%zu, time=%0.3Lf, worker=%s, buffer=%u",
|
||||
job->dest->used,
|
||||
job->dest->encode_end_ts - job->dest->encode_begin_ts,
|
||||
wr->name,
|
||||
job->buf_index);
|
||||
} else {
|
||||
LOG_VERBOSE("Compression failed: worker=%s, buffer=%u", wr->name, job->buf_index);
|
||||
}
|
||||
|
||||
A_MUTEX_LOCK(wr->free_workers_mutex);
|
||||
*wr->free_workers += 1;
|
||||
A_MUTEX_UNLOCK(wr->free_workers_mutex);
|
||||
A_COND_SIGNAL(wr->free_workers_cond);
|
||||
}
|
||||
|
||||
LOG_DEBUG("Bye-bye (worker %u)", wr->number);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static _worker_s *_workers_pool_wait(_pool_s *pool) {
|
||||
_worker_s *ready_wr = NULL;
|
||||
|
||||
A_MUTEX_LOCK(&pool->free_workers_mutex);
|
||||
A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex);
|
||||
A_MUTEX_UNLOCK(&pool->free_workers_mutex);
|
||||
|
||||
if (pool->oldest_wr && !atomic_load(&pool->oldest_wr->has_job)) {
|
||||
ready_wr = pool->oldest_wr;
|
||||
ready_wr->job_timely = true;
|
||||
pool->oldest_wr = pool->oldest_wr->order_next;
|
||||
} else {
|
||||
for (unsigned number = 0; number < pool->n_workers; ++number) {
|
||||
if (
|
||||
!atomic_load(&pool->workers[number].has_job) && (
|
||||
ready_wr == NULL
|
||||
|| ready_wr->job_start_ts < pool->workers[number].job_start_ts
|
||||
)
|
||||
) {
|
||||
ready_wr = &pool->workers[number];
|
||||
break;
|
||||
}
|
||||
}
|
||||
assert(ready_wr != NULL);
|
||||
ready_wr->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате)
|
||||
ok = false;
|
||||
}
|
||||
return ready_wr;
|
||||
}
|
||||
|
||||
static void _workers_pool_assign(_pool_s *pool, _worker_s *ready_wr, unsigned buf_index) {
|
||||
if (pool->oldest_wr == NULL) {
|
||||
pool->oldest_wr = ready_wr;
|
||||
pool->latest_wr = pool->oldest_wr;
|
||||
} else {
|
||||
if (ready_wr->order_next) {
|
||||
ready_wr->order_next->order_prev = ready_wr->order_prev;
|
||||
}
|
||||
if (ready_wr->order_prev) {
|
||||
ready_wr->order_prev->order_next = ready_wr->order_next;
|
||||
}
|
||||
ready_wr->order_prev = pool->latest_wr;
|
||||
pool->latest_wr->order_next = ready_wr;
|
||||
pool->latest_wr = ready_wr;
|
||||
}
|
||||
pool->latest_wr->order_next = NULL;
|
||||
|
||||
A_MUTEX_LOCK(&ready_wr->has_job_mutex);
|
||||
ready_wr->buf_index = buf_index;
|
||||
atomic_store(&ready_wr->has_job, true);
|
||||
A_MUTEX_UNLOCK(&ready_wr->has_job_mutex);
|
||||
A_COND_SIGNAL(&ready_wr->has_job_cond);
|
||||
|
||||
A_MUTEX_LOCK(&pool->free_workers_mutex);
|
||||
pool->free_workers -= 1;
|
||||
A_MUTEX_UNLOCK(&pool->free_workers_mutex);
|
||||
|
||||
LOG_DEBUG("Assigned new frame in buffer %u to worker %u", buf_index, ready_wr->number);
|
||||
}
|
||||
|
||||
static long double _workers_pool_get_fluency_delay(_pool_s *pool, _worker_s *ready_wr) {
|
||||
const long double approx_comp_time = pool->approx_comp_time * 0.9 + ready_wr->last_comp_time * 0.1;
|
||||
|
||||
LOG_VERBOSE("Correcting approx_comp_time: %.3Lf -> %.3Lf (last_comp_time=%.3Lf)",
|
||||
pool->approx_comp_time, approx_comp_time, ready_wr->last_comp_time);
|
||||
|
||||
pool->approx_comp_time = approx_comp_time;
|
||||
|
||||
const long double min_delay = pool->approx_comp_time / pool->n_workers; // Среднее время работы размазывается на N воркеров
|
||||
|
||||
if (pool->desired_frames_interval > 0 && min_delay > 0 && pool->desired_frames_interval > min_delay) {
|
||||
// Искусственное время задержки на основе желаемого FPS, если включен --desired-fps
|
||||
// и аппаратный fps не попадает точно в желаемое значение
|
||||
return pool->desired_frames_interval;
|
||||
}
|
||||
return min_delay;
|
||||
return ok;
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
#include "blank.h"
|
||||
#include "device.h"
|
||||
#include "encoder.h"
|
||||
#include "workers.h"
|
||||
#ifdef WITH_MEMSINK
|
||||
# include "../libs/memsink/memsink.h"
|
||||
#endif
|
||||
|
||||
213
src/ustreamer/workers.c
Normal file
213
src/ustreamer/workers.c
Normal file
@@ -0,0 +1,213 @@
|
||||
/*****************************************************************************
|
||||
# #
|
||||
# uStreamer - Lightweight and fast MJPG-HTTP streamer. #
|
||||
# #
|
||||
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
|
||||
# #
|
||||
# This program is free software: you can redistribute it and/or modify #
|
||||
# it under the terms of the GNU General Public License as published by #
|
||||
# the Free Software Foundation, either version 3 of the License, or #
|
||||
# (at your option) any later version. #
|
||||
# #
|
||||
# This program is distributed in the hope that it will be useful, #
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||
# GNU General Public License for more details. #
|
||||
# #
|
||||
# You should have received a copy of the GNU General Public License #
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||
# #
|
||||
*****************************************************************************/
|
||||
|
||||
|
||||
#include "workers.h"
|
||||
|
||||
|
||||
static void *_worker_thread(void *v_worker);
|
||||
|
||||
|
||||
workers_pool_s *workers_pool_init(
|
||||
const char *name, unsigned n_workers, long double desired_interval,
|
||||
void *(*job_init)(worker_s *wr),
|
||||
void (*job_destroy)(void *),
|
||||
bool (*run_job)(worker_s *)) {
|
||||
|
||||
LOG_INFO("Creating pool %s with %u workers ...", name, n_workers);
|
||||
|
||||
workers_pool_s *pool;
|
||||
A_CALLOC(pool, 1);
|
||||
pool->name = name;
|
||||
pool->desired_interval = desired_interval;
|
||||
pool->job_destroy = job_destroy;
|
||||
pool->run_job = run_job;
|
||||
|
||||
atomic_init(&pool->stop, false);
|
||||
|
||||
pool->n_workers = n_workers;
|
||||
A_CALLOC(pool->workers, pool->n_workers);
|
||||
|
||||
A_MUTEX_INIT(&pool->free_workers_mutex);
|
||||
A_COND_INIT(&pool->free_workers_cond);
|
||||
|
||||
const size_t name_len = strlen(name) + 64;
|
||||
|
||||
for (unsigned number = 0; number < pool->n_workers; ++number) {
|
||||
# define WR(_next) pool->workers[number]._next
|
||||
|
||||
WR(number) = number;
|
||||
A_CALLOC(WR(name), name_len);
|
||||
snprintf(WR(name), name_len, "%s-%u", name, number);
|
||||
|
||||
A_MUTEX_INIT(&WR(has_job_mutex));
|
||||
atomic_init(&WR(has_job), false);
|
||||
A_COND_INIT(&WR(has_job_cond));
|
||||
|
||||
WR(pool) = pool;
|
||||
WR(job) = job_init(&pool->workers[number]);
|
||||
|
||||
A_THREAD_CREATE(&WR(tid), _worker_thread, (void *)&(pool->workers[number]));
|
||||
pool->free_workers += 1;
|
||||
|
||||
# undef WR
|
||||
}
|
||||
return pool;
|
||||
}
|
||||
|
||||
void workers_pool_destroy(workers_pool_s *pool) {
|
||||
LOG_INFO("Destroying pool %s ...", pool->name);
|
||||
|
||||
atomic_store(&pool->stop, true);
|
||||
for (unsigned number = 0; number < pool->n_workers; ++number) {
|
||||
# define WR(_next) pool->workers[number]._next
|
||||
|
||||
A_MUTEX_LOCK(&WR(has_job_mutex));
|
||||
atomic_store(&WR(has_job), true); // Final job: die
|
||||
A_MUTEX_UNLOCK(&WR(has_job_mutex));
|
||||
A_COND_SIGNAL(&WR(has_job_cond));
|
||||
|
||||
A_THREAD_JOIN(WR(tid));
|
||||
A_MUTEX_DESTROY(&WR(has_job_mutex));
|
||||
A_COND_DESTROY(&WR(has_job_cond));
|
||||
|
||||
free(WR(name));
|
||||
|
||||
pool->job_destroy(WR(job));
|
||||
|
||||
# undef WR
|
||||
}
|
||||
|
||||
A_MUTEX_DESTROY(&pool->free_workers_mutex);
|
||||
A_COND_DESTROY(&pool->free_workers_cond);
|
||||
|
||||
free(pool->workers);
|
||||
free(pool);
|
||||
}
|
||||
|
||||
worker_s *workers_pool_wait(workers_pool_s *pool) {
|
||||
worker_s *ready_wr = NULL;
|
||||
|
||||
A_MUTEX_LOCK(&pool->free_workers_mutex);
|
||||
A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex);
|
||||
A_MUTEX_UNLOCK(&pool->free_workers_mutex);
|
||||
|
||||
if (pool->oldest_wr && !atomic_load(&pool->oldest_wr->has_job)) {
|
||||
ready_wr = pool->oldest_wr;
|
||||
ready_wr->job_timely = true;
|
||||
pool->oldest_wr = pool->oldest_wr->order_next;
|
||||
} else {
|
||||
for (unsigned number = 0; number < pool->n_workers; ++number) {
|
||||
if (
|
||||
!atomic_load(&pool->workers[number].has_job) && (
|
||||
ready_wr == NULL
|
||||
|| ready_wr->job_start_ts < pool->workers[number].job_start_ts
|
||||
)
|
||||
) {
|
||||
ready_wr = &pool->workers[number];
|
||||
break;
|
||||
}
|
||||
}
|
||||
assert(ready_wr != NULL);
|
||||
ready_wr->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате)
|
||||
}
|
||||
return ready_wr;
|
||||
}
|
||||
|
||||
void workers_pool_assign(workers_pool_s *pool, worker_s *ready_wr/*, void *job*/) {
|
||||
if (pool->oldest_wr == NULL) {
|
||||
pool->oldest_wr = ready_wr;
|
||||
pool->latest_wr = pool->oldest_wr;
|
||||
} else {
|
||||
if (ready_wr->order_next) {
|
||||
ready_wr->order_next->order_prev = ready_wr->order_prev;
|
||||
}
|
||||
if (ready_wr->order_prev) {
|
||||
ready_wr->order_prev->order_next = ready_wr->order_next;
|
||||
}
|
||||
ready_wr->order_prev = pool->latest_wr;
|
||||
pool->latest_wr->order_next = ready_wr;
|
||||
pool->latest_wr = ready_wr;
|
||||
}
|
||||
pool->latest_wr->order_next = NULL;
|
||||
|
||||
A_MUTEX_LOCK(&ready_wr->has_job_mutex);
|
||||
//ready_wr->job = job;
|
||||
atomic_store(&ready_wr->has_job, true);
|
||||
A_MUTEX_UNLOCK(&ready_wr->has_job_mutex);
|
||||
A_COND_SIGNAL(&ready_wr->has_job_cond);
|
||||
|
||||
A_MUTEX_LOCK(&pool->free_workers_mutex);
|
||||
pool->free_workers -= 1;
|
||||
A_MUTEX_UNLOCK(&pool->free_workers_mutex);
|
||||
}
|
||||
|
||||
long double workers_pool_get_fluency_delay(workers_pool_s *pool, worker_s *ready_wr) {
|
||||
const long double approx_job_time = pool->approx_job_time * 0.9 + ready_wr->last_job_time * 0.1;
|
||||
|
||||
LOG_VERBOSE("Correcting pool's %s approx_job_time: %.3Lf -> %.3Lf (last_job_time=%.3Lf)",
|
||||
pool->name, pool->approx_job_time, approx_job_time, ready_wr->last_job_time);
|
||||
|
||||
pool->approx_job_time = approx_job_time;
|
||||
|
||||
const long double min_delay = pool->approx_job_time / pool->n_workers; // Среднее время работы размазывается на N воркеров
|
||||
|
||||
if (pool->desired_interval > 0 && min_delay > 0 && pool->desired_interval > min_delay) {
|
||||
// Искусственное время задержки на основе желаемого FPS, если включен --desired-fps
|
||||
// и аппаратный fps не попадает точно в желаемое значение
|
||||
return pool->desired_interval;
|
||||
}
|
||||
return min_delay;
|
||||
}
|
||||
|
||||
static void *_worker_thread(void *v_worker) {
|
||||
worker_s *wr = (worker_s *)v_worker;
|
||||
|
||||
A_THREAD_RENAME(wr->name);
|
||||
LOG_DEBUG("Hello! I am a worker %s ^_^", wr->name);
|
||||
|
||||
while (!atomic_load(&wr->pool->stop)) {
|
||||
LOG_DEBUG("Worker %s waiting for a new job ...", wr->name);
|
||||
|
||||
A_MUTEX_LOCK(&wr->has_job_mutex);
|
||||
A_COND_WAIT_TRUE(atomic_load(&wr->has_job), &wr->has_job_cond, &wr->has_job_mutex);
|
||||
A_MUTEX_UNLOCK(&wr->has_job_mutex);
|
||||
|
||||
if (!atomic_load(&wr->pool->stop)) {
|
||||
long double job_start_ts = get_now_monotonic();
|
||||
wr->job_failed = !wr->pool->run_job(wr);
|
||||
if (!wr->job_failed) {
|
||||
wr->job_start_ts = job_start_ts;
|
||||
wr->last_job_time = get_now_monotonic() - wr->job_start_ts;
|
||||
}
|
||||
//wr->job = NULL;
|
||||
atomic_store(&wr->has_job, false);
|
||||
}
|
||||
|
||||
A_MUTEX_LOCK(&wr->pool->free_workers_mutex);
|
||||
wr->pool->free_workers += 1;
|
||||
A_MUTEX_UNLOCK(&wr->pool->free_workers_mutex);
|
||||
A_COND_SIGNAL(&wr->pool->free_workers_cond);
|
||||
}
|
||||
|
||||
LOG_DEBUG("Bye-bye (worker %s)", wr->name);
|
||||
return NULL;
|
||||
}
|
||||
91
src/ustreamer/workers.h
Normal file
91
src/ustreamer/workers.h
Normal file
@@ -0,0 +1,91 @@
|
||||
/*****************************************************************************
|
||||
# #
|
||||
# uStreamer - Lightweight and fast MJPG-HTTP streamer. #
|
||||
# #
|
||||
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
|
||||
# #
|
||||
# This program is free software: you can redistribute it and/or modify #
|
||||
# it under the terms of the GNU General Public License as published by #
|
||||
# the Free Software Foundation, either version 3 of the License, or #
|
||||
# (at your option) any later version. #
|
||||
# #
|
||||
# This program is distributed in the hope that it will be useful, #
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||
# GNU General Public License for more details. #
|
||||
# #
|
||||
# You should have received a copy of the GNU General Public License #
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||
# #
|
||||
*****************************************************************************/
|
||||
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <stdatomic.h>
|
||||
|
||||
#include <sys/types.h>
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
#include "../libs/common/tools.h"
|
||||
#include "../libs/common/threading.h"
|
||||
#include "../libs/common/logging.h"
|
||||
|
||||
|
||||
typedef struct worker_sx {
|
||||
pthread_t tid;
|
||||
unsigned number;
|
||||
char *name;
|
||||
|
||||
long double last_job_time;
|
||||
|
||||
pthread_mutex_t has_job_mutex;
|
||||
void *job;
|
||||
atomic_bool has_job;
|
||||
bool job_timely;
|
||||
bool job_failed;
|
||||
long double job_start_ts;
|
||||
pthread_cond_t has_job_cond;
|
||||
|
||||
struct worker_sx *order_prev;
|
||||
struct worker_sx *order_next;
|
||||
|
||||
struct workers_pool_sx *pool;
|
||||
} worker_s;
|
||||
|
||||
typedef struct workers_pool_sx {
|
||||
const char *name;
|
||||
long double desired_interval;
|
||||
|
||||
bool (*run_job)(worker_s *wr);
|
||||
void (*job_destroy)(void *job);
|
||||
|
||||
unsigned n_workers;
|
||||
worker_s *workers;
|
||||
worker_s *oldest_wr;
|
||||
worker_s *latest_wr;
|
||||
|
||||
long double approx_job_time;
|
||||
|
||||
pthread_mutex_t free_workers_mutex;
|
||||
unsigned free_workers;
|
||||
pthread_cond_t free_workers_cond;
|
||||
|
||||
atomic_bool stop;
|
||||
} workers_pool_s;
|
||||
|
||||
|
||||
workers_pool_s *workers_pool_init(
|
||||
const char *name, unsigned n_workers, long double desired_interval,
|
||||
void *(*job_init)(worker_s *wr),
|
||||
void (*job_destroy)(void *job),
|
||||
bool (*run_job)(worker_s *));
|
||||
|
||||
void workers_pool_destroy(workers_pool_s *pool);
|
||||
|
||||
worker_s *workers_pool_wait(workers_pool_s *pool);
|
||||
void workers_pool_assign(workers_pool_s *pool, worker_s *ready_wr/*, void *job*/);
|
||||
|
||||
long double workers_pool_get_fluency_delay(workers_pool_s *pool, worker_s *ready_wr);
|
||||
Reference in New Issue
Block a user