mirror of
https://github.com/pikvm/ustreamer.git
synced 2026-03-12 18:43:42 +00:00
releaser threads
This commit is contained in:
@@ -49,7 +49,17 @@
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
us_device_s *dev;
|
||||||
|
us_queue_s *queue;
|
||||||
|
pthread_mutex_t *mutex;
|
||||||
|
atomic_bool *stop;
|
||||||
|
} _releaser_context_s;
|
||||||
|
|
||||||
|
|
||||||
|
static void *_releaser_thread(void *v_ctx);
|
||||||
|
|
||||||
|
static void _stream_release_buffer(us_stream_s *stream, us_hw_buffer_s *hw);
|
||||||
static bool _stream_is_stopped(us_stream_s *stream);
|
static bool _stream_is_stopped(us_stream_s *stream);
|
||||||
static bool _stream_has_any_clients(us_stream_s *stream);
|
static bool _stream_has_any_clients(us_stream_s *stream);
|
||||||
static bool _stream_slowdown(us_stream_s *stream);
|
static bool _stream_slowdown(us_stream_s *stream);
|
||||||
@@ -74,6 +84,8 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame);
|
|||||||
us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) {
|
us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) {
|
||||||
us_stream_runtime_s *run;
|
us_stream_runtime_s *run;
|
||||||
US_CALLOC(run, 1);
|
US_CALLOC(run, 1);
|
||||||
|
US_MUTEX_INIT(run->release_mutex);
|
||||||
|
atomic_init(&run->release_stop, false);
|
||||||
US_RING_INIT_WITH_ITEMS(run->http_jpeg_ring, 4, us_frame_init);
|
US_RING_INIT_WITH_ITEMS(run->http_jpeg_ring, 4, us_frame_init);
|
||||||
atomic_init(&run->http_has_clients, false);
|
atomic_init(&run->http_has_clients, false);
|
||||||
atomic_init(&run->http_last_request_ts, 0);
|
atomic_init(&run->http_last_request_ts, 0);
|
||||||
@@ -96,6 +108,7 @@ us_stream_s *us_stream_init(us_device_s *dev, us_encoder_s *enc) {
|
|||||||
void us_stream_destroy(us_stream_s *stream) {
|
void us_stream_destroy(us_stream_s *stream) {
|
||||||
us_blank_destroy(stream->run->blank);
|
us_blank_destroy(stream->run->blank);
|
||||||
US_RING_DELETE_WITH_ITEMS(stream->run->http_jpeg_ring, us_frame_destroy);
|
US_RING_DELETE_WITH_ITEMS(stream->run->http_jpeg_ring, us_frame_destroy);
|
||||||
|
US_MUTEX_DESTROY(stream->run->release_mutex);
|
||||||
free(stream->run);
|
free(stream->run);
|
||||||
free(stream);
|
free(stream);
|
||||||
}
|
}
|
||||||
@@ -113,6 +126,19 @@ void us_stream_loop(us_stream_s *stream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
while (!_stream_init_loop(stream)) {
|
while (!_stream_init_loop(stream)) {
|
||||||
|
const uint n_releasers = stream->dev->run->n_bufs;
|
||||||
|
US_CALLOC(run->releasers, n_releasers);
|
||||||
|
for (uint index = 0; index < n_releasers; ++index) {
|
||||||
|
run->releasers[index].queue = us_queue_init(1);
|
||||||
|
_releaser_context_s *ctx;
|
||||||
|
US_CALLOC(ctx, 1);
|
||||||
|
ctx->dev = stream->dev;
|
||||||
|
ctx->queue = run->releasers[index].queue;
|
||||||
|
ctx->mutex = &run->release_mutex;
|
||||||
|
ctx->stop = &run->release_stop;
|
||||||
|
US_THREAD_CREATE(run->releasers[index].tid, _releaser_thread, ctx);
|
||||||
|
}
|
||||||
|
|
||||||
ldf grab_after = 0;
|
ldf grab_after = 0;
|
||||||
uint fluency_passed = 0;
|
uint fluency_passed = 0;
|
||||||
uint captured_fps_accum = 0;
|
uint captured_fps_accum = 0;
|
||||||
@@ -120,7 +146,7 @@ void us_stream_loop(us_stream_s *stream) {
|
|||||||
|
|
||||||
US_LOG_INFO("Capturing ...");
|
US_LOG_INFO("Capturing ...");
|
||||||
|
|
||||||
while (!_stream_is_stopped(stream)) {
|
while (!_stream_is_stopped(stream) && !atomic_load(&run->release_stop)) {
|
||||||
US_SEP_DEBUG('-');
|
US_SEP_DEBUG('-');
|
||||||
US_LOG_DEBUG("Waiting for worker ...");
|
US_LOG_DEBUG("Waiting for worker ...");
|
||||||
|
|
||||||
@@ -128,9 +154,7 @@ void us_stream_loop(us_stream_s *stream) {
|
|||||||
us_encoder_job_s *const ready_job = ready_wr->job;
|
us_encoder_job_s *const ready_job = ready_wr->job;
|
||||||
|
|
||||||
if (ready_job->hw != NULL) {
|
if (ready_job->hw != NULL) {
|
||||||
if (us_device_release_buffer(stream->dev, ready_job->hw) < 0) {
|
_stream_release_buffer(stream, ready_job->hw);
|
||||||
goto close;
|
|
||||||
}
|
|
||||||
ready_job->hw = NULL;
|
ready_job->hw = NULL;
|
||||||
if (ready_wr->job_failed) {
|
if (ready_wr->job_failed) {
|
||||||
// pass
|
// pass
|
||||||
@@ -167,9 +191,7 @@ void us_stream_loop(us_stream_s *stream) {
|
|||||||
fluency_passed += 1;
|
fluency_passed += 1;
|
||||||
US_LOG_VERBOSE("Passed %u frames for fluency: now=%.03Lf, grab_after=%.03Lf",
|
US_LOG_VERBOSE("Passed %u frames for fluency: now=%.03Lf, grab_after=%.03Lf",
|
||||||
fluency_passed, now_ts, grab_after);
|
fluency_passed, now_ts, grab_after);
|
||||||
if (us_device_release_buffer(stream->dev, hw) < 0) {
|
_stream_release_buffer(stream, hw);
|
||||||
goto close;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
fluency_passed = 0;
|
fluency_passed = 0;
|
||||||
|
|
||||||
@@ -196,6 +218,14 @@ void us_stream_loop(us_stream_s *stream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
close:
|
close:
|
||||||
|
atomic_store(&run->release_stop, true);
|
||||||
|
for (uint index = 0; index < n_releasers; ++index) {
|
||||||
|
US_THREAD_JOIN(run->releasers[index].tid);
|
||||||
|
us_queue_destroy(run->releasers[index].queue);
|
||||||
|
}
|
||||||
|
free(run->releasers);
|
||||||
|
atomic_store(&run->release_stop, false);
|
||||||
|
|
||||||
us_encoder_close(stream->enc);
|
us_encoder_close(stream->enc);
|
||||||
us_device_close(stream->dev);
|
us_device_close(stream->dev);
|
||||||
|
|
||||||
@@ -211,6 +241,28 @@ void us_stream_loop_break(us_stream_s *stream) {
|
|||||||
atomic_store(&stream->run->stop, true);
|
atomic_store(&stream->run->stop, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void *_releaser_thread(void *v_ctx) {
|
||||||
|
_releaser_context_s *ctx = v_ctx;
|
||||||
|
while (!atomic_load(ctx->stop)) {
|
||||||
|
us_hw_buffer_s *hw;
|
||||||
|
if (!us_queue_get(ctx->queue, (void**)&hw, 0.1)) {
|
||||||
|
US_MUTEX_LOCK(*ctx->mutex);
|
||||||
|
const int released = us_device_release_buffer(ctx->dev, hw);
|
||||||
|
US_MUTEX_UNLOCK(*ctx->mutex);
|
||||||
|
if (released < 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
atomic_store(ctx->stop, true); // Stop all other guys
|
||||||
|
free(ctx);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void _stream_release_buffer(us_stream_s *stream, us_hw_buffer_s *hw) {
|
||||||
|
assert(!us_queue_put(stream->run->releasers[hw->buf.index].queue, hw, 0));
|
||||||
|
}
|
||||||
|
|
||||||
static bool _stream_is_stopped(us_stream_s *stream) {
|
static bool _stream_is_stopped(us_stream_s *stream) {
|
||||||
us_stream_runtime_s *const run = stream->run;
|
us_stream_runtime_s *const run = stream->run;
|
||||||
const bool stop = atomic_load(&run->stop);
|
const bool stop = atomic_load(&run->stop);
|
||||||
|
|||||||
@@ -24,7 +24,10 @@
|
|||||||
|
|
||||||
#include <stdatomic.h>
|
#include <stdatomic.h>
|
||||||
|
|
||||||
|
#include <pthread.h>
|
||||||
|
|
||||||
#include "../libs/types.h"
|
#include "../libs/types.h"
|
||||||
|
#include "../libs/queue.h"
|
||||||
#include "../libs/ring.h"
|
#include "../libs/ring.h"
|
||||||
#include "../libs/memsink.h"
|
#include "../libs/memsink.h"
|
||||||
#include "../libs/device.h"
|
#include "../libs/device.h"
|
||||||
@@ -35,6 +38,15 @@
|
|||||||
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
pthread_t tid;
|
||||||
|
us_queue_s *queue;
|
||||||
|
} us_stream_releaser_s;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
us_stream_releaser_s *releasers;
|
||||||
|
pthread_mutex_t release_mutex;
|
||||||
|
atomic_bool release_stop;
|
||||||
|
|
||||||
us_ring_s *http_jpeg_ring;
|
us_ring_s *http_jpeg_ring;
|
||||||
atomic_bool http_has_clients;
|
atomic_bool http_has_clients;
|
||||||
atomic_ullong http_last_request_ts; // Seconds
|
atomic_ullong http_last_request_ts; // Seconds
|
||||||
|
|||||||
Reference in New Issue
Block a user