refactoring

This commit is contained in:
Devaev Maxim
2019-03-15 13:34:06 +03:00
parent 484f89cb82
commit 50158397a0
5 changed files with 65 additions and 65 deletions

View File

@@ -57,7 +57,7 @@ struct encoder_t *encoder_init() {
A_CALLOC(run, 1); A_CALLOC(run, 1);
run->type = ENCODER_TYPE_CPU; run->type = ENCODER_TYPE_CPU;
run->quality = 80; run->quality = 80;
A_PTHREAD_M_INIT(&run->mutex); A_MUTEX_INIT(&run->mutex);
A_CALLOC(encoder, 1); A_CALLOC(encoder, 1);
encoder->type = run->type; encoder->type = run->type;
@@ -119,7 +119,7 @@ void encoder_destroy(struct encoder_t *encoder) {
free(encoder->run->omxs); free(encoder->run->omxs);
} }
# endif # endif
A_PTHREAD_M_DESTROY(&encoder->run->mutex); A_MUTEX_DESTROY(&encoder->run->mutex);
free(encoder->run); free(encoder->run);
free(encoder); free(encoder);
} }
@@ -150,9 +150,9 @@ void encoder_prepare_live(struct encoder_t *encoder, struct device_t *dev) {
&& encoder->run->type != ENCODER_TYPE_HW && encoder->run->type != ENCODER_TYPE_HW
) { ) {
LOG_INFO("Switching to HW JPEG encoder because the input format is (M)JPEG"); LOG_INFO("Switching to HW JPEG encoder because the input format is (M)JPEG");
A_PTHREAD_M_LOCK(&encoder->run->mutex); A_MUTEX_LOCK(&encoder->run->mutex);
encoder->run->type = ENCODER_TYPE_HW; encoder->run->type = ENCODER_TYPE_HW;
A_PTHREAD_M_UNLOCK(&encoder->run->mutex); A_MUTEX_UNLOCK(&encoder->run->mutex);
} }
if (encoder->run->type == ENCODER_TYPE_HW) { if (encoder->run->type == ENCODER_TYPE_HW) {
@@ -161,9 +161,9 @@ void encoder_prepare_live(struct encoder_t *encoder, struct device_t *dev) {
goto use_fallback; goto use_fallback;
} }
if (hw_encoder_prepare_live(dev, encoder->quality) < 0) { if (hw_encoder_prepare_live(dev, encoder->quality) < 0) {
A_PTHREAD_M_LOCK(&encoder->run->mutex); A_MUTEX_LOCK(&encoder->run->mutex);
encoder->run->quality = 0; encoder->run->quality = 0;
A_PTHREAD_M_UNLOCK(&encoder->run->mutex); A_MUTEX_UNLOCK(&encoder->run->mutex);
LOG_INFO("Using JPEG quality: HW-default"); LOG_INFO("Using JPEG quality: HW-default");
} }
} }
@@ -181,10 +181,10 @@ void encoder_prepare_live(struct encoder_t *encoder, struct device_t *dev) {
return; return;
use_fallback: use_fallback:
A_PTHREAD_M_LOCK(&encoder->run->mutex); A_MUTEX_LOCK(&encoder->run->mutex);
encoder->run->type = ENCODER_TYPE_CPU; encoder->run->type = ENCODER_TYPE_CPU;
encoder->run->quality = encoder->quality; encoder->run->quality = encoder->quality;
A_PTHREAD_M_UNLOCK(&encoder->run->mutex); A_MUTEX_UNLOCK(&encoder->run->mutex);
} }
#pragma GCC diagnostic ignored "-Wunused-label" #pragma GCC diagnostic ignored "-Wunused-label"
@@ -213,10 +213,10 @@ int encoder_compress_buffer(struct encoder_t *encoder, struct device_t *dev, uns
# pragma GCC diagnostic push # pragma GCC diagnostic push
use_fallback: use_fallback:
LOG_INFO("Error while compressing, falling back to CPU"); LOG_INFO("Error while compressing, falling back to CPU");
A_PTHREAD_M_LOCK(&encoder->run->mutex); A_MUTEX_LOCK(&encoder->run->mutex);
encoder->run->type = ENCODER_TYPE_CPU; encoder->run->type = ENCODER_TYPE_CPU;
encoder->run->quality = encoder->quality; encoder->run->quality = encoder->quality;
A_PTHREAD_M_UNLOCK(&encoder->run->mutex); A_MUTEX_UNLOCK(&encoder->run->mutex);
return -1; return -1;
# pragma GCC diagnostic pop # pragma GCC diagnostic pop
} }

View File

@@ -268,10 +268,10 @@ static void _http_callback_state(struct evhttp_request *request, void *v_server)
# define ENCODER(_next) server->run->stream->encoder->_next # define ENCODER(_next) server->run->stream->encoder->_next
A_PTHREAD_M_LOCK(&ENCODER(run->mutex)); A_MUTEX_LOCK(&ENCODER(run->mutex));
encoder_run_type = ENCODER(run->type); encoder_run_type = ENCODER(run->type);
encoder_run_quality = ENCODER(run->quality); encoder_run_quality = ENCODER(run->quality);
A_PTHREAD_M_UNLOCK(&ENCODER(run->mutex)); A_MUTEX_UNLOCK(&ENCODER(run->mutex));
assert((buf = evbuffer_new())); assert((buf = evbuffer_new()));
@@ -653,11 +653,11 @@ static void _http_exposed_refresh(UNUSED int fd, UNUSED short what, void *v_serv
bool picture_updated = false; bool picture_updated = false;
# define UNLOCK_STREAM \ # define UNLOCK_STREAM \
{ server->run->stream->updated = false; A_PTHREAD_M_UNLOCK(&server->run->stream->mutex); } { server->run->stream->updated = false; A_MUTEX_UNLOCK(&server->run->stream->mutex); }
if (server->run->stream->updated) { if (server->run->stream->updated) {
LOG_DEBUG("Refreshing HTTP exposed ..."); LOG_DEBUG("Refreshing HTTP exposed ...");
A_PTHREAD_M_LOCK(&server->run->stream->mutex); A_MUTEX_LOCK(&server->run->stream->mutex);
if (server->run->stream->picture.size > 0) { // If online if (server->run->stream->picture.size > 0) { // If online
picture_updated = _expose_new_picture(server); picture_updated = _expose_new_picture(server);
UNLOCK_STREAM; UNLOCK_STREAM;

View File

@@ -379,10 +379,10 @@ int main(int argc, char *argv[]) {
_ctx = &ctx; _ctx = &ctx;
if ((exit_code = http_server_listen(server)) == 0) { if ((exit_code = http_server_listen(server)) == 0) {
A_PTHREAD_CREATE(&stream_loop_tid, _stream_loop_thread, NULL); A_THREAD_CREATE(&stream_loop_tid, _stream_loop_thread, NULL);
A_PTHREAD_CREATE(&server_loop_tid, _server_loop_thread, NULL); A_THREAD_CREATE(&server_loop_tid, _server_loop_thread, NULL);
A_PTHREAD_JOIN(server_loop_tid); A_THREAD_JOIN(server_loop_tid);
A_PTHREAD_JOIN(stream_loop_tid); A_THREAD_JOIN(stream_loop_tid);
} }
} }

View File

@@ -63,13 +63,13 @@ struct stream_t *stream_init(struct device_t *dev, struct encoder_t *encoder) {
A_CALLOC(stream, 1); A_CALLOC(stream, 1);
stream->dev = dev; stream->dev = dev;
stream->encoder = encoder; stream->encoder = encoder;
A_PTHREAD_M_INIT(&stream->mutex); A_MUTEX_INIT(&stream->mutex);
stream->proc = proc; stream->proc = proc;
return stream; return stream;
} }
void stream_destroy(struct stream_t *stream) { void stream_destroy(struct stream_t *stream) {
A_PTHREAD_M_DESTROY(&stream->mutex); A_MUTEX_DESTROY(&stream->mutex);
free(stream->proc); free(stream->proc);
free(stream); free(stream);
} }
@@ -105,9 +105,9 @@ void stream_loop(struct stream_t *stream) {
SEP_DEBUG('-'); SEP_DEBUG('-');
LOG_DEBUG("Waiting for workers ..."); LOG_DEBUG("Waiting for workers ...");
A_PTHREAD_M_LOCK(&pool.free_workers_mutex); A_MUTEX_LOCK(&pool.free_workers_mutex);
A_PTHREAD_C_WAIT_TRUE(pool.free_workers, &pool.free_workers_cond, &pool.free_workers_mutex); A_COND_WAIT_TRUE(pool.free_workers, &pool.free_workers_cond, &pool.free_workers_mutex);
A_PTHREAD_M_UNLOCK(&pool.free_workers_mutex); A_MUTEX_UNLOCK(&pool.free_workers_mutex);
if (oldest_worker && !oldest_worker->has_job && oldest_worker->ctx.buf_index >= 0) { if (oldest_worker && !oldest_worker->has_job && oldest_worker->ctx.buf_index >= 0) {
if (oldest_worker->job_failed) { if (oldest_worker->job_failed) {
@@ -250,17 +250,17 @@ void stream_loop(struct stream_t *stream) {
} }
last_worker->order_next = NULL; last_worker->order_next = NULL;
A_PTHREAD_M_LOCK(&FREE_WORKER(has_job_mutex)); A_MUTEX_LOCK(&FREE_WORKER(has_job_mutex));
FREE_WORKER(ctx.buf_index) = buf_info.index; FREE_WORKER(ctx.buf_index) = buf_info.index;
FREE_WORKER(has_job) = true; FREE_WORKER(has_job) = true;
A_PTHREAD_M_UNLOCK(&FREE_WORKER(has_job_mutex)); A_MUTEX_UNLOCK(&FREE_WORKER(has_job_mutex));
A_PTHREAD_C_SIGNAL(&FREE_WORKER(has_job_cond)); A_COND_SIGNAL(&FREE_WORKER(has_job_cond));
# undef FREE_WORKER # undef FREE_WORKER
A_PTHREAD_M_LOCK(&pool.free_workers_mutex); A_MUTEX_LOCK(&pool.free_workers_mutex);
pool.free_workers -= 1; pool.free_workers -= 1;
A_PTHREAD_M_UNLOCK(&pool.free_workers_mutex); A_MUTEX_UNLOCK(&pool.free_workers_mutex);
goto next_handlers; // Поток сам освободит буфер goto next_handlers; // Поток сам освободит буфер
@@ -287,13 +287,13 @@ void stream_loop(struct stream_t *stream) {
} }
} }
A_PTHREAD_M_LOCK(&stream->mutex); A_MUTEX_LOCK(&stream->mutex);
stream->picture.size = 0; // On stream offline stream->picture.size = 0; // On stream offline
free(stream->picture.data); free(stream->picture.data);
stream->width = 0; stream->width = 0;
stream->height = 0; stream->height = 0;
stream->updated = true; stream->updated = true;
A_PTHREAD_M_UNLOCK(&stream->mutex); A_MUTEX_UNLOCK(&stream->mutex);
} }
_stream_destroy_workers(stream, &pool); _stream_destroy_workers(stream, &pool);
@@ -312,7 +312,7 @@ void stream_switch_slowdown(struct stream_t *stream, bool slowdown) {
static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) { static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index) {
# define PICTURE(_next) stream->dev->run->pictures[buf_index]._next # define PICTURE(_next) stream->dev->run->pictures[buf_index]._next
A_PTHREAD_M_LOCK(&stream->mutex); A_MUTEX_LOCK(&stream->mutex);
stream->picture.size = PICTURE(size); stream->picture.size = PICTURE(size);
stream->picture.allocated = PICTURE(allocated); stream->picture.allocated = PICTURE(allocated);
@@ -327,7 +327,7 @@ static void _stream_expose_picture(struct stream_t *stream, unsigned buf_index)
stream->height = stream->dev->run->height; stream->height = stream->dev->run->height;
stream->updated = true; stream->updated = true;
A_PTHREAD_M_UNLOCK(&stream->mutex); A_MUTEX_UNLOCK(&stream->mutex);
# undef PICTURE # undef PICTURE
} }
@@ -341,11 +341,11 @@ static long double _stream_get_fluency_delay(struct device_t *dev, struct worker
for (unsigned number = 0; number < dev->n_workers; ++number) { for (unsigned number = 0; number < dev->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next # define WORKER(_next) pool->workers[number]._next
A_PTHREAD_M_LOCK(&WORKER(last_comp_time_mutex)); A_MUTEX_LOCK(&WORKER(last_comp_time_mutex));
if (WORKER(last_comp_time) > 0) { if (WORKER(last_comp_time) > 0) {
sum_comp_time += WORKER(last_comp_time); sum_comp_time += WORKER(last_comp_time);
} }
A_PTHREAD_M_UNLOCK(&WORKER(last_comp_time_mutex)); A_MUTEX_UNLOCK(&WORKER(last_comp_time_mutex));
# undef WORKER # undef WORKER
} }
@@ -408,8 +408,8 @@ static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t
*pool->workers_stop = false; *pool->workers_stop = false;
A_CALLOC(pool->workers, stream->dev->n_workers); A_CALLOC(pool->workers, stream->dev->n_workers);
A_PTHREAD_M_INIT(&pool->free_workers_mutex); A_MUTEX_INIT(&pool->free_workers_mutex);
A_PTHREAD_C_INIT(&pool->free_workers_cond); A_COND_INIT(&pool->free_workers_cond);
for (unsigned number = 0; number < stream->dev->n_workers; ++number) { for (unsigned number = 0; number < stream->dev->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next # define WORKER(_next) pool->workers[number]._next
@@ -417,8 +417,8 @@ static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t
pool->free_workers += 1; pool->free_workers += 1;
A_PTHREAD_M_INIT(&WORKER(has_job_mutex)); A_MUTEX_INIT(&WORKER(has_job_mutex));
A_PTHREAD_C_INIT(&WORKER(has_job_cond)); A_COND_INIT(&WORKER(has_job_cond));
CTX(number) = number; CTX(number) = number;
CTX(dev) = stream->dev; CTX(dev) = stream->dev;
@@ -440,7 +440,7 @@ static void _stream_init_workers(struct stream_t *stream, struct workers_pool_t
CTX(free_workers) = &pool->free_workers; CTX(free_workers) = &pool->free_workers;
CTX(free_workers_cond) = &pool->free_workers_cond; CTX(free_workers_cond) = &pool->free_workers_cond;
A_PTHREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&WORKER(ctx)); A_THREAD_CREATE(&WORKER(tid), _stream_worker_thread, (void *)&WORKER(ctx));
# undef CTX # undef CTX
# undef WORKER # undef WORKER
@@ -454,9 +454,9 @@ static void *_stream_worker_thread(void *v_ctx) {
while (!*ctx->proc_stop && !*ctx->workers_stop) { while (!*ctx->proc_stop && !*ctx->workers_stop) {
LOG_DEBUG("Worker %u waiting for a new job ...", ctx->number); LOG_DEBUG("Worker %u waiting for a new job ...", ctx->number);
A_PTHREAD_M_LOCK(ctx->has_job_mutex); A_MUTEX_LOCK(ctx->has_job_mutex);
A_PTHREAD_C_WAIT_TRUE(*ctx->has_job, ctx->has_job_cond, ctx->has_job_mutex); A_COND_WAIT_TRUE(*ctx->has_job, ctx->has_job_cond, ctx->has_job_mutex);
A_PTHREAD_M_UNLOCK(ctx->has_job_mutex); A_MUTEX_UNLOCK(ctx->has_job_mutex);
if (!*ctx->workers_stop) { if (!*ctx->workers_stop) {
# define PICTURE(_next) ctx->dev->run->pictures[ctx->buf_index]._next # define PICTURE(_next) ctx->dev->run->pictures[ctx->buf_index]._next
@@ -475,9 +475,9 @@ static void *_stream_worker_thread(void *v_ctx) {
long double last_comp_time = PICTURE(encode_end_time) - *ctx->job_start_time; long double last_comp_time = PICTURE(encode_end_time) - *ctx->job_start_time;
A_PTHREAD_M_LOCK(ctx->last_comp_time_mutex); A_MUTEX_LOCK(ctx->last_comp_time_mutex);
*ctx->last_comp_time = last_comp_time; *ctx->last_comp_time = last_comp_time;
A_PTHREAD_M_UNLOCK(ctx->last_comp_time_mutex); A_MUTEX_UNLOCK(ctx->last_comp_time_mutex);
LOG_VERBOSE("Compressed JPEG size=%zu; time=%0.3Lf; worker=%u; buffer=%u", LOG_VERBOSE("Compressed JPEG size=%zu; time=%0.3Lf; worker=%u; buffer=%u",
PICTURE(size), last_comp_time, ctx->number, ctx->buf_index); PICTURE(size), last_comp_time, ctx->number, ctx->buf_index);
@@ -489,10 +489,10 @@ static void *_stream_worker_thread(void *v_ctx) {
# undef PICTURE # undef PICTURE
} }
A_PTHREAD_M_LOCK(ctx->free_workers_mutex); A_MUTEX_LOCK(ctx->free_workers_mutex);
*ctx->free_workers += 1; *ctx->free_workers += 1;
A_PTHREAD_M_UNLOCK(ctx->free_workers_mutex); A_MUTEX_UNLOCK(ctx->free_workers_mutex);
A_PTHREAD_C_SIGNAL(ctx->free_workers_cond); A_COND_SIGNAL(ctx->free_workers_cond);
} }
LOG_DEBUG("Bye-bye (worker %u)", ctx->number); LOG_DEBUG("Bye-bye (worker %u)", ctx->number);
@@ -507,20 +507,20 @@ static void _stream_destroy_workers(struct stream_t *stream, struct workers_pool
for (unsigned number = 0; number < stream->dev->n_workers; ++number) { for (unsigned number = 0; number < stream->dev->n_workers; ++number) {
# define WORKER(_next) pool->workers[number]._next # define WORKER(_next) pool->workers[number]._next
A_PTHREAD_M_LOCK(&WORKER(has_job_mutex)); A_MUTEX_LOCK(&WORKER(has_job_mutex));
WORKER(has_job) = true; // Final job: die WORKER(has_job) = true; // Final job: die
A_PTHREAD_M_UNLOCK(&WORKER(has_job_mutex)); A_MUTEX_UNLOCK(&WORKER(has_job_mutex));
A_PTHREAD_C_SIGNAL(&WORKER(has_job_cond)); A_COND_SIGNAL(&WORKER(has_job_cond));
A_PTHREAD_JOIN(WORKER(tid)); A_THREAD_JOIN(WORKER(tid));
A_PTHREAD_M_DESTROY(&WORKER(has_job_mutex)); A_MUTEX_DESTROY(&WORKER(has_job_mutex));
A_PTHREAD_C_DESTROY(&WORKER(has_job_cond)); A_COND_DESTROY(&WORKER(has_job_cond));
# undef WORKER # undef WORKER
} }
A_PTHREAD_M_DESTROY(&pool->free_workers_mutex); A_MUTEX_DESTROY(&pool->free_workers_mutex);
A_PTHREAD_C_DESTROY(&pool->free_workers_cond); A_COND_DESTROY(&pool->free_workers_cond);
free(pool->workers); free(pool->workers);
} }

View File

@@ -35,18 +35,18 @@
#include <sys/syscall.h> #include <sys/syscall.h>
#define A_PTHREAD_CREATE(_tid, _func, _arg) assert(!pthread_create(_tid, NULL, _func, _arg)) #define A_THREAD_CREATE(_tid, _func, _arg) assert(!pthread_create(_tid, NULL, _func, _arg))
#define A_PTHREAD_JOIN(_tid) assert(!pthread_join(_tid, NULL)) #define A_THREAD_JOIN(_tid) assert(!pthread_join(_tid, NULL))
#define A_PTHREAD_M_INIT(_mutex) assert(!pthread_mutex_init(_mutex, NULL)) #define A_MUTEX_INIT(_mutex) assert(!pthread_mutex_init(_mutex, NULL))
#define A_PTHREAD_M_DESTROY(_mutex) assert(!pthread_mutex_destroy(_mutex)) #define A_MUTEX_DESTROY(_mutex) assert(!pthread_mutex_destroy(_mutex))
#define A_PTHREAD_M_LOCK(_mutex) assert(!pthread_mutex_lock(_mutex)) #define A_MUTEX_LOCK(_mutex) assert(!pthread_mutex_lock(_mutex))
#define A_PTHREAD_M_UNLOCK(_mutex) assert(!pthread_mutex_unlock(_mutex)) #define A_MUTEX_UNLOCK(_mutex) assert(!pthread_mutex_unlock(_mutex))
#define A_PTHREAD_C_INIT(_cond) assert(!pthread_cond_init(_cond, NULL)) #define A_COND_INIT(_cond) assert(!pthread_cond_init(_cond, NULL))
#define A_PTHREAD_C_DESTROY(_cond) assert(!pthread_cond_destroy(_cond)) #define A_COND_DESTROY(_cond) assert(!pthread_cond_destroy(_cond))
#define A_PTHREAD_C_SIGNAL(...) assert(!pthread_cond_signal(__VA_ARGS__)) #define A_COND_SIGNAL(...) assert(!pthread_cond_signal(__VA_ARGS__))
#define A_PTHREAD_C_WAIT_TRUE(_var, _cond, _mutex) { while(!_var) assert(!pthread_cond_wait(_cond, _mutex)); } #define A_COND_WAIT_TRUE(_var, _cond, _mutex) { while(!_var) assert(!pthread_cond_wait(_cond, _mutex)); }
#define A_CALLOC(_dest, _nmemb) assert((_dest = calloc(_nmemb, sizeof(*(_dest))))) #define A_CALLOC(_dest, _nmemb) assert((_dest = calloc(_nmemb, sizeof(*(_dest)))))
#define A_REALLOC(_dest, _nmemb) assert((_dest = realloc(_dest, _nmemb * sizeof(*(_dest))))) #define A_REALLOC(_dest, _nmemb) assert((_dest = realloc(_dest, _nmemb * sizeof(*(_dest)))))