diff --git a/src/ustreamer/stream.c b/src/ustreamer/stream.c index 3b4c252..2392193 100644 --- a/src/ustreamer/stream.c +++ b/src/ustreamer/stream.c @@ -23,64 +23,22 @@ #include "stream.h" -typedef struct _worker_sx { - pthread_t tid; - unsigned number; - atomic_bool *proc_stop; - atomic_bool *workers_stop; - - long double last_comp_time; - char *frame_role; - frame_s *frame; - - pthread_mutex_t has_job_mutex; - unsigned buf_index; - atomic_bool has_job; - bool job_timely; - bool job_failed; - long double job_start_ts; - pthread_cond_t has_job_cond; - - pthread_mutex_t *free_workers_mutex; - unsigned *free_workers; - pthread_cond_t *free_workers_cond; - - struct _worker_sx *order_prev; - struct _worker_sx *order_next; - - stream_s *stream; -} _worker_s; - typedef struct { - unsigned n_workers; - _worker_s *workers; - _worker_s *oldest_wr; - _worker_s *latest_wr; - - long double approx_comp_time; - - pthread_mutex_t free_workers_mutex; - unsigned free_workers; - pthread_cond_t free_workers_cond; - - atomic_bool workers_stop; - - long double desired_frames_interval; -} _pool_s; + device_s *dev; + encoder_s *encoder; + unsigned buf_index; + char *dest_role; + frame_s *dest; +} _job_s; -static _pool_s *_stream_init_loop(stream_s *stream); -static _pool_s *_stream_init_one(stream_s *stream); +static workers_pool_s *_stream_init_loop(stream_s *stream); +static workers_pool_s *_stream_init_one(stream_s *stream); static bool _stream_expose_frame(stream_s *stream, frame_s *frame, unsigned captured_fps); -static _pool_s *_workers_pool_init(stream_s *stream); -static void _workers_pool_destroy(_pool_s *pool); - -static void *_worker_thread(void *v_worker); - -static _worker_s *_workers_pool_wait(_pool_s *pool); -static void _workers_pool_assign(_pool_s *pool, _worker_s *ready_wr, unsigned buf_index); -static long double _workers_pool_get_fluency_delay(_pool_s *pool, _worker_s *ready_wr); +static void *_worker_job_init(worker_s *wr); +static void _worker_job_destroy(void *v_job); +static bool _worker_run_job(worker_s *wr); stream_s *stream_init(device_s *dev, encoder_s *encoder) { @@ -120,7 +78,7 @@ void stream_loop(stream_s *stream) { LOG_INFO("Using V4L2 device: %s", DEV(path)); LOG_INFO("Using desired FPS: %u", DEV(desired_fps)); - for (_pool_s *pool; (pool = _stream_init_loop(stream)) != NULL;) { + for (workers_pool_s *pool; (pool = _stream_init_loop(stream)) != NULL;) { long double grab_after = 0; unsigned fluency_passed = 0; unsigned captured_fps = 0; @@ -133,14 +91,14 @@ void stream_loop(stream_s *stream) { SEP_DEBUG('-'); LOG_DEBUG("Waiting for worker ..."); - _worker_s *ready_wr = _workers_pool_wait(pool); + worker_s *ready_wr = workers_pool_wait(pool); if (!ready_wr->job_failed) { if (ready_wr->job_timely) { - _stream_expose_frame(stream, ready_wr->frame, captured_fps); - LOG_PERF("##### Encoded frame exposed; worker=%u", ready_wr->number); + _stream_expose_frame(stream, ((_job_s *)(ready_wr->job))->dest, captured_fps); + LOG_PERF("##### Encoded frame exposed; worker=%s", ready_wr->name); } else { - LOG_PERF("----- Encoded frame dropped; worker=%u", ready_wr->number); + LOG_PERF("----- Encoded frame dropped; worker=%s", ready_wr->name); } } else { break; @@ -199,18 +157,25 @@ void stream_loop(stream_s *stream) { } captured_fps_accum += 1; - const long double fluency_delay = _workers_pool_get_fluency_delay(pool, ready_wr); + const long double fluency_delay = workers_pool_get_fluency_delay(pool, ready_wr); grab_after = now + fluency_delay; LOG_VERBOSE("Fluency: delay=%.03Lf, grab_after=%.03Lf", fluency_delay, grab_after); # ifdef WITH_MEMSINK - if (stream->raw_sink && memsink_server_put(stream->raw_sink, &DEV(run->hw_buffers[buf_index].raw)) < 0) { - stream->raw_sink = NULL; - LOG_ERROR("RAW sink completely disabled due error"); + if (stream->raw_sink) { + if (memsink_server_put(stream->raw_sink, &DEV(run->hw_buffers[buf_index].raw)) < 0) { + stream->raw_sink = NULL; + LOG_ERROR("RAW sink completely disabled due error"); + } } # endif - _workers_pool_assign(pool, ready_wr, buf_index); + _job_s *job = (_job_s *)ready_wr->job; + job->dev = stream->dev; + job->encoder = stream->encoder; + job->buf_index = buf_index; + workers_pool_assign(pool, ready_wr); + LOG_DEBUG("Assigned new frame in buffer %u to worker %s", buf_index, ready_wr->name); } } else if (buf_index != -2) { // -2 for broken frame break; @@ -231,7 +196,7 @@ void stream_loop(stream_s *stream) { } } - _workers_pool_destroy(pool); + workers_pool_destroy(pool); device_switch_capturing(stream->dev, false); device_close(stream->dev); @@ -251,8 +216,8 @@ void stream_switch_slowdown(stream_s *stream, bool slowdown) { atomic_store(&stream->proc->slowdown, slowdown); } -static _pool_s *_stream_init_loop(stream_s *stream) { - _pool_s *pool = NULL; +static workers_pool_s *_stream_init_loop(stream_s *stream) { + workers_pool_s *pool = NULL; int access_error = 0; LOG_DEBUG("%s: stream->proc->stop=%d", __FUNCTION__, atomic_load(&stream->proc->stop)); @@ -260,9 +225,11 @@ static _pool_s *_stream_init_loop(stream_s *stream) { while (!atomic_load(&stream->proc->stop)) { if (_stream_expose_frame(stream, NULL, 0)) { # ifdef WITH_MEMSINK - if (stream->raw_sink && memsink_server_put(stream->raw_sink, stream->blank) < 0) { - stream->raw_sink = NULL; - LOG_ERROR("RAW sink completely disabled due error"); + if (stream->raw_sink) { + if (memsink_server_put(stream->raw_sink, stream->blank) < 0) { + stream->raw_sink = NULL; + LOG_ERROR("RAW sink completely disabled due error"); + } } # endif } @@ -291,7 +258,7 @@ static _pool_s *_stream_init_loop(stream_s *stream) { return pool; } -static _pool_s *_stream_init_one(stream_s *stream) { +static workers_pool_s *_stream_init_one(stream_s *stream) { if (device_open(stream->dev) < 0) { goto error; } @@ -300,7 +267,19 @@ static _pool_s *_stream_init_one(stream_s *stream) { } encoder_prepare(stream->encoder, stream->dev); - return _workers_pool_init(stream); + +# define DEV(_next) stream->dev->_next +# define RUN(_next) stream->dev->run->_next + long double desired_interval = 0; + if (DEV(desired_fps) > 0 && (DEV(desired_fps) < RUN(hw_fps) || RUN(hw_fps) == 0)) { + desired_interval = (long double)1 / DEV(desired_fps); + } +# undef DEV +# undef RUN + + return workers_pool_init( + "jpeg", stream->encoder->run->n_workers, desired_interval, + _worker_job_init, _worker_job_destroy, _worker_run_job); error: device_close(stream->dev); @@ -364,214 +343,46 @@ static bool _stream_expose_frame(stream_s *stream, frame_s *frame, unsigned capt # undef VID } -static _pool_s *_workers_pool_init(stream_s *stream) { -# define DEV(_next) stream->dev->_next -# define RUN(_next) stream->dev->run->_next +static void *_worker_job_init(worker_s *wr) { + _job_s *job; + A_CALLOC(job, 1); - LOG_INFO("Creating pool with %u workers ...", stream->encoder->run->n_workers); + const size_t dest_role_len = strlen(wr->name) + 16; + A_CALLOC(job->dest_role, dest_role_len); + snprintf(job->dest_role, dest_role_len, "%s_dest", wr->name); + job->dest = frame_init(job->dest_role); - _pool_s *pool; - A_CALLOC(pool, 1); - - pool->n_workers = stream->encoder->run->n_workers; - A_CALLOC(pool->workers, pool->n_workers); - - A_MUTEX_INIT(&pool->free_workers_mutex); - A_COND_INIT(&pool->free_workers_cond); - - atomic_init(&pool->workers_stop, false); - - if (DEV(desired_fps) > 0 && (DEV(desired_fps) < RUN(hw_fps) || RUN(hw_fps) == 0)) { - pool->desired_frames_interval = (long double)1 / DEV(desired_fps); - } - -# undef RUN -# undef DEV - - for (unsigned number = 0; number < pool->n_workers; ++number) { -# define WR(_next) pool->workers[number]._next - - A_CALLOC(WR(frame_role), 32); - sprintf(WR(frame_role), "worker_dest_%u", number); - WR(frame) = frame_init(WR(frame_role)); - - A_MUTEX_INIT(&WR(has_job_mutex)); - atomic_init(&WR(has_job), false); - A_COND_INIT(&WR(has_job_cond)); - - WR(number) = number; - WR(proc_stop) = &stream->proc->stop; - WR(workers_stop) = &pool->workers_stop; - - WR(free_workers_mutex) = &pool->free_workers_mutex; - WR(free_workers) = &pool->free_workers; - WR(free_workers_cond) = &pool->free_workers_cond; - - WR(stream) = stream; - - A_THREAD_CREATE(&WR(tid), _worker_thread, (void *)&(pool->workers[number])); - - pool->free_workers += 1; - -# undef WR - } - return pool; + return (void *)job; +} +static void _worker_job_destroy(void *v_job) { + _job_s *job = (_job_s *)v_job; + frame_destroy(job->dest); + free(job->dest_role); + free(job); } -static void _workers_pool_destroy(_pool_s *pool) { - LOG_INFO("Destroying workers pool ..."); +static bool _worker_run_job(worker_s *wr) { + _job_s *job = (_job_s *)wr->job; - atomic_store(&pool->workers_stop, true); - for (unsigned number = 0; number < pool->n_workers; ++number) { -# define WR(_next) pool->workers[number]._next + LOG_DEBUG("Worker %s compressing JPEG from buffer %u ...", wr->name, job->buf_index); + bool ok = !encoder_compress( + job->encoder, + wr->number, + &job->dev->run->hw_buffers[job->buf_index].raw, + job->dest); - A_MUTEX_LOCK(&WR(has_job_mutex)); - atomic_store(&WR(has_job), true); // Final job: die - A_MUTEX_UNLOCK(&WR(has_job_mutex)); - A_COND_SIGNAL(&WR(has_job_cond)); - - A_THREAD_JOIN(WR(tid)); - A_MUTEX_DESTROY(&WR(has_job_mutex)); - A_COND_DESTROY(&WR(has_job_cond)); - - frame_destroy(WR(frame)); - free(WR(frame_role)); - -# undef WR - } - - A_MUTEX_DESTROY(&pool->free_workers_mutex); - A_COND_DESTROY(&pool->free_workers_cond); - - free(pool->workers); - free(pool); -} - -static void *_worker_thread(void *v_worker) { - _worker_s *wr = (_worker_s *)v_worker; - - A_THREAD_RENAME("worker-%u", wr->number); - LOG_DEBUG("Hello! I am a worker #%u ^_^", wr->number); - - while (!atomic_load(wr->proc_stop) && !atomic_load(wr->workers_stop)) { - LOG_DEBUG("Worker %u waiting for a new job ...", wr->number); - - A_MUTEX_LOCK(&wr->has_job_mutex); - A_COND_WAIT_TRUE(atomic_load(&wr->has_job), &wr->has_job_cond, &wr->has_job_mutex); - A_MUTEX_UNLOCK(&wr->has_job_mutex); - - if (!atomic_load(wr->workers_stop)) { -# define PIC(_next) wr->frame->_next - - LOG_DEBUG("Worker %u compressing JPEG from buffer %u ...", wr->number, wr->buf_index); - wr->job_failed = (bool)encoder_compress( - wr->stream->encoder, - wr->number, - &wr->stream->dev->run->hw_buffers[wr->buf_index].raw, - wr->frame - ); - - if (device_release_buffer(wr->stream->dev, wr->buf_index) == 0) { - if (!wr->job_failed) { - wr->job_start_ts = PIC(encode_begin_ts); - wr->last_comp_time = PIC(encode_end_ts) - wr->job_start_ts; - LOG_VERBOSE("Compressed new JPEG: size=%zu, time=%0.3Lf, worker=%u, buffer=%u", - PIC(used), wr->last_comp_time, wr->number, wr->buf_index); - } else { - LOG_VERBOSE("Compression failed: worker=%u, buffer=%u", wr->number, wr->buf_index); - } - } else { - wr->job_failed = true; - } - - atomic_store(&wr->has_job, false); - -# undef PIC + if (device_release_buffer(job->dev, job->buf_index) == 0) { + if (ok) { + LOG_VERBOSE("Compressed new JPEG: size=%zu, time=%0.3Lf, worker=%s, buffer=%u", + job->dest->used, + job->dest->encode_end_ts - job->dest->encode_begin_ts, + wr->name, + job->buf_index); + } else { + LOG_VERBOSE("Compression failed: worker=%s, buffer=%u", wr->name, job->buf_index); } - - A_MUTEX_LOCK(wr->free_workers_mutex); - *wr->free_workers += 1; - A_MUTEX_UNLOCK(wr->free_workers_mutex); - A_COND_SIGNAL(wr->free_workers_cond); - } - - LOG_DEBUG("Bye-bye (worker %u)", wr->number); - return NULL; -} - -static _worker_s *_workers_pool_wait(_pool_s *pool) { - _worker_s *ready_wr = NULL; - - A_MUTEX_LOCK(&pool->free_workers_mutex); - A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex); - A_MUTEX_UNLOCK(&pool->free_workers_mutex); - - if (pool->oldest_wr && !atomic_load(&pool->oldest_wr->has_job)) { - ready_wr = pool->oldest_wr; - ready_wr->job_timely = true; - pool->oldest_wr = pool->oldest_wr->order_next; } else { - for (unsigned number = 0; number < pool->n_workers; ++number) { - if ( - !atomic_load(&pool->workers[number].has_job) && ( - ready_wr == NULL - || ready_wr->job_start_ts < pool->workers[number].job_start_ts - ) - ) { - ready_wr = &pool->workers[number]; - break; - } - } - assert(ready_wr != NULL); - ready_wr->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате) + ok = false; } - return ready_wr; -} - -static void _workers_pool_assign(_pool_s *pool, _worker_s *ready_wr, unsigned buf_index) { - if (pool->oldest_wr == NULL) { - pool->oldest_wr = ready_wr; - pool->latest_wr = pool->oldest_wr; - } else { - if (ready_wr->order_next) { - ready_wr->order_next->order_prev = ready_wr->order_prev; - } - if (ready_wr->order_prev) { - ready_wr->order_prev->order_next = ready_wr->order_next; - } - ready_wr->order_prev = pool->latest_wr; - pool->latest_wr->order_next = ready_wr; - pool->latest_wr = ready_wr; - } - pool->latest_wr->order_next = NULL; - - A_MUTEX_LOCK(&ready_wr->has_job_mutex); - ready_wr->buf_index = buf_index; - atomic_store(&ready_wr->has_job, true); - A_MUTEX_UNLOCK(&ready_wr->has_job_mutex); - A_COND_SIGNAL(&ready_wr->has_job_cond); - - A_MUTEX_LOCK(&pool->free_workers_mutex); - pool->free_workers -= 1; - A_MUTEX_UNLOCK(&pool->free_workers_mutex); - - LOG_DEBUG("Assigned new frame in buffer %u to worker %u", buf_index, ready_wr->number); -} - -static long double _workers_pool_get_fluency_delay(_pool_s *pool, _worker_s *ready_wr) { - const long double approx_comp_time = pool->approx_comp_time * 0.9 + ready_wr->last_comp_time * 0.1; - - LOG_VERBOSE("Correcting approx_comp_time: %.3Lf -> %.3Lf (last_comp_time=%.3Lf)", - pool->approx_comp_time, approx_comp_time, ready_wr->last_comp_time); - - pool->approx_comp_time = approx_comp_time; - - const long double min_delay = pool->approx_comp_time / pool->n_workers; // Среднее время работы размазывается на N воркеров - - if (pool->desired_frames_interval > 0 && min_delay > 0 && pool->desired_frames_interval > min_delay) { - // Искусственное время задержки на основе желаемого FPS, если включен --desired-fps - // и аппаратный fps не попадает точно в желаемое значение - return pool->desired_frames_interval; - } - return min_delay; + return ok; } diff --git a/src/ustreamer/stream.h b/src/ustreamer/stream.h index 7649610..01d1cb6 100644 --- a/src/ustreamer/stream.h +++ b/src/ustreamer/stream.h @@ -40,6 +40,7 @@ #include "blank.h" #include "device.h" #include "encoder.h" +#include "workers.h" #ifdef WITH_MEMSINK # include "../libs/memsink/memsink.h" #endif diff --git a/src/ustreamer/workers.c b/src/ustreamer/workers.c new file mode 100644 index 0000000..4482764 --- /dev/null +++ b/src/ustreamer/workers.c @@ -0,0 +1,213 @@ +/***************************************************************************** +# # +# uStreamer - Lightweight and fast MJPG-HTTP streamer. # +# # +# Copyright (C) 2018 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#include "workers.h" + + +static void *_worker_thread(void *v_worker); + + +workers_pool_s *workers_pool_init( + const char *name, unsigned n_workers, long double desired_interval, + void *(*job_init)(worker_s *wr), + void (*job_destroy)(void *), + bool (*run_job)(worker_s *)) { + + LOG_INFO("Creating pool %s with %u workers ...", name, n_workers); + + workers_pool_s *pool; + A_CALLOC(pool, 1); + pool->name = name; + pool->desired_interval = desired_interval; + pool->job_destroy = job_destroy; + pool->run_job = run_job; + + atomic_init(&pool->stop, false); + + pool->n_workers = n_workers; + A_CALLOC(pool->workers, pool->n_workers); + + A_MUTEX_INIT(&pool->free_workers_mutex); + A_COND_INIT(&pool->free_workers_cond); + + const size_t name_len = strlen(name) + 64; + + for (unsigned number = 0; number < pool->n_workers; ++number) { +# define WR(_next) pool->workers[number]._next + + WR(number) = number; + A_CALLOC(WR(name), name_len); + snprintf(WR(name), name_len, "%s-%u", name, number); + + A_MUTEX_INIT(&WR(has_job_mutex)); + atomic_init(&WR(has_job), false); + A_COND_INIT(&WR(has_job_cond)); + + WR(pool) = pool; + WR(job) = job_init(&pool->workers[number]); + + A_THREAD_CREATE(&WR(tid), _worker_thread, (void *)&(pool->workers[number])); + pool->free_workers += 1; + +# undef WR + } + return pool; +} + +void workers_pool_destroy(workers_pool_s *pool) { + LOG_INFO("Destroying pool %s ...", pool->name); + + atomic_store(&pool->stop, true); + for (unsigned number = 0; number < pool->n_workers; ++number) { +# define WR(_next) pool->workers[number]._next + + A_MUTEX_LOCK(&WR(has_job_mutex)); + atomic_store(&WR(has_job), true); // Final job: die + A_MUTEX_UNLOCK(&WR(has_job_mutex)); + A_COND_SIGNAL(&WR(has_job_cond)); + + A_THREAD_JOIN(WR(tid)); + A_MUTEX_DESTROY(&WR(has_job_mutex)); + A_COND_DESTROY(&WR(has_job_cond)); + + free(WR(name)); + + pool->job_destroy(WR(job)); + +# undef WR + } + + A_MUTEX_DESTROY(&pool->free_workers_mutex); + A_COND_DESTROY(&pool->free_workers_cond); + + free(pool->workers); + free(pool); +} + +worker_s *workers_pool_wait(workers_pool_s *pool) { + worker_s *ready_wr = NULL; + + A_MUTEX_LOCK(&pool->free_workers_mutex); + A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex); + A_MUTEX_UNLOCK(&pool->free_workers_mutex); + + if (pool->oldest_wr && !atomic_load(&pool->oldest_wr->has_job)) { + ready_wr = pool->oldest_wr; + ready_wr->job_timely = true; + pool->oldest_wr = pool->oldest_wr->order_next; + } else { + for (unsigned number = 0; number < pool->n_workers; ++number) { + if ( + !atomic_load(&pool->workers[number].has_job) && ( + ready_wr == NULL + || ready_wr->job_start_ts < pool->workers[number].job_start_ts + ) + ) { + ready_wr = &pool->workers[number]; + break; + } + } + assert(ready_wr != NULL); + ready_wr->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате) + } + return ready_wr; +} + +void workers_pool_assign(workers_pool_s *pool, worker_s *ready_wr/*, void *job*/) { + if (pool->oldest_wr == NULL) { + pool->oldest_wr = ready_wr; + pool->latest_wr = pool->oldest_wr; + } else { + if (ready_wr->order_next) { + ready_wr->order_next->order_prev = ready_wr->order_prev; + } + if (ready_wr->order_prev) { + ready_wr->order_prev->order_next = ready_wr->order_next; + } + ready_wr->order_prev = pool->latest_wr; + pool->latest_wr->order_next = ready_wr; + pool->latest_wr = ready_wr; + } + pool->latest_wr->order_next = NULL; + + A_MUTEX_LOCK(&ready_wr->has_job_mutex); + //ready_wr->job = job; + atomic_store(&ready_wr->has_job, true); + A_MUTEX_UNLOCK(&ready_wr->has_job_mutex); + A_COND_SIGNAL(&ready_wr->has_job_cond); + + A_MUTEX_LOCK(&pool->free_workers_mutex); + pool->free_workers -= 1; + A_MUTEX_UNLOCK(&pool->free_workers_mutex); +} + +long double workers_pool_get_fluency_delay(workers_pool_s *pool, worker_s *ready_wr) { + const long double approx_job_time = pool->approx_job_time * 0.9 + ready_wr->last_job_time * 0.1; + + LOG_VERBOSE("Correcting pool's %s approx_job_time: %.3Lf -> %.3Lf (last_job_time=%.3Lf)", + pool->name, pool->approx_job_time, approx_job_time, ready_wr->last_job_time); + + pool->approx_job_time = approx_job_time; + + const long double min_delay = pool->approx_job_time / pool->n_workers; // Среднее время работы размазывается на N воркеров + + if (pool->desired_interval > 0 && min_delay > 0 && pool->desired_interval > min_delay) { + // Искусственное время задержки на основе желаемого FPS, если включен --desired-fps + // и аппаратный fps не попадает точно в желаемое значение + return pool->desired_interval; + } + return min_delay; +} + +static void *_worker_thread(void *v_worker) { + worker_s *wr = (worker_s *)v_worker; + + A_THREAD_RENAME(wr->name); + LOG_DEBUG("Hello! I am a worker %s ^_^", wr->name); + + while (!atomic_load(&wr->pool->stop)) { + LOG_DEBUG("Worker %s waiting for a new job ...", wr->name); + + A_MUTEX_LOCK(&wr->has_job_mutex); + A_COND_WAIT_TRUE(atomic_load(&wr->has_job), &wr->has_job_cond, &wr->has_job_mutex); + A_MUTEX_UNLOCK(&wr->has_job_mutex); + + if (!atomic_load(&wr->pool->stop)) { + long double job_start_ts = get_now_monotonic(); + wr->job_failed = !wr->pool->run_job(wr); + if (!wr->job_failed) { + wr->job_start_ts = job_start_ts; + wr->last_job_time = get_now_monotonic() - wr->job_start_ts; + } + //wr->job = NULL; + atomic_store(&wr->has_job, false); + } + + A_MUTEX_LOCK(&wr->pool->free_workers_mutex); + wr->pool->free_workers += 1; + A_MUTEX_UNLOCK(&wr->pool->free_workers_mutex); + A_COND_SIGNAL(&wr->pool->free_workers_cond); + } + + LOG_DEBUG("Bye-bye (worker %s)", wr->name); + return NULL; +} diff --git a/src/ustreamer/workers.h b/src/ustreamer/workers.h new file mode 100644 index 0000000..f743f2d --- /dev/null +++ b/src/ustreamer/workers.h @@ -0,0 +1,91 @@ +/***************************************************************************** +# # +# uStreamer - Lightweight and fast MJPG-HTTP streamer. # +# # +# Copyright (C) 2018 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +*****************************************************************************/ + + +#pragma once + +#include +#include + +#include + +#include + +#include "../libs/common/tools.h" +#include "../libs/common/threading.h" +#include "../libs/common/logging.h" + + +typedef struct worker_sx { + pthread_t tid; + unsigned number; + char *name; + + long double last_job_time; + + pthread_mutex_t has_job_mutex; + void *job; + atomic_bool has_job; + bool job_timely; + bool job_failed; + long double job_start_ts; + pthread_cond_t has_job_cond; + + struct worker_sx *order_prev; + struct worker_sx *order_next; + + struct workers_pool_sx *pool; +} worker_s; + +typedef struct workers_pool_sx { + const char *name; + long double desired_interval; + + bool (*run_job)(worker_s *wr); + void (*job_destroy)(void *job); + + unsigned n_workers; + worker_s *workers; + worker_s *oldest_wr; + worker_s *latest_wr; + + long double approx_job_time; + + pthread_mutex_t free_workers_mutex; + unsigned free_workers; + pthread_cond_t free_workers_cond; + + atomic_bool stop; +} workers_pool_s; + + +workers_pool_s *workers_pool_init( + const char *name, unsigned n_workers, long double desired_interval, + void *(*job_init)(worker_s *wr), + void (*job_destroy)(void *job), + bool (*run_job)(worker_s *)); + +void workers_pool_destroy(workers_pool_s *pool); + +worker_s *workers_pool_wait(workers_pool_s *pool); +void workers_pool_assign(workers_pool_s *pool, worker_s *ready_wr/*, void *job*/); + +long double workers_pool_get_fluency_delay(workers_pool_s *pool, worker_s *ready_wr);