jpeg in a separate thread

This commit is contained in:
Maxim Devaev
2024-03-03 00:24:21 +02:00
parent f2f560a345
commit ef47fa4c74
3 changed files with 128 additions and 100 deletions

View File

@@ -366,7 +366,7 @@ int us_device_grab_buffer(us_device_s *dev, us_hw_buffer_s **hw) {
} while (true); } while (true);
*hw = &run->hw_bufs[buf.index]; *hw = &run->hw_bufs[buf.index];
atomic_store(&(*hw)->busy, 0); atomic_store(&(*hw)->refs, 0);
(*hw)->raw.dma_fd = (*hw)->dma_fd; (*hw)->raw.dma_fd = (*hw)->dma_fd;
(*hw)->raw.used = buf.bytesused; (*hw)->raw.used = buf.bytesused;
(*hw)->raw.width = run->width; (*hw)->raw.width = run->width;
@@ -383,6 +383,7 @@ int us_device_grab_buffer(us_device_s *dev, us_hw_buffer_s **hw) {
} }
int us_device_release_buffer(us_device_s *dev, us_hw_buffer_s *hw) { int us_device_release_buffer(us_device_s *dev, us_hw_buffer_s *hw) {
assert(atomic_load(&hw->refs) == 0);
const uint index = hw->buf.index; const uint index = hw->buf.index;
_D_LOG_DEBUG("Releasing device buffer=%u ...", index); _D_LOG_DEBUG("Releasing device buffer=%u ...", index);
if (us_xioctl(dev->run->fd, VIDIOC_QBUF, &hw->buf) < 0) { if (us_xioctl(dev->run->fd, VIDIOC_QBUF, &hw->buf) < 0) {
@@ -390,10 +391,17 @@ int us_device_release_buffer(us_device_s *dev, us_hw_buffer_s *hw) {
return -1; return -1;
} }
hw->grabbed = false; hw->grabbed = false;
atomic_store(&hw->busy, 0);
return 0; return 0;
} }
// Registers one more user of the hardware buffer by atomically adding 1
// to its reference counter (hw->refs).
void us_device_buffer_incref(us_hw_buffer_s *hw) {
	atomic_int *const counter = &hw->refs;
	atomic_fetch_add(counter, 1);
}
// Drops one reference on the hardware buffer by atomically subtracting 1
// from hw->refs.
//
// The counter must be positive on entry. us_device_release_buffer() asserts
// that refs == 0 before requeueing the buffer, so an unbalanced decref would
// otherwise only be noticed later and far away from the offending call.
void us_device_buffer_decref(us_hw_buffer_s *hw) {
	// atomic_fetch_sub() returns the value held *before* the subtraction
	const int prev_refs = atomic_fetch_sub(&hw->refs, 1);
	assert(prev_refs > 0); // Catch a refcount underflow right where it happens
	(void)prev_refs; // Avoids an unused-variable warning when NDEBUG strips the assert
}
int _device_wait_buffer(us_device_s *dev) { int _device_wait_buffer(us_device_s *dev) {
us_device_runtime_s *const run = dev->run; us_device_runtime_s *const run = dev->run;
@@ -834,7 +842,7 @@ static int _device_open_io_method_mmap(us_device_s *dev) {
} }
us_hw_buffer_s *hw = &run->hw_bufs[run->n_bufs]; us_hw_buffer_s *hw = &run->hw_bufs[run->n_bufs];
atomic_init(&hw->busy, false); atomic_init(&hw->refs, 0);
const uz buf_size = (run->capture_mplane ? buf.m.planes[0].length : buf.length); const uz buf_size = (run->capture_mplane ? buf.m.planes[0].length : buf.length);
const off_t buf_offset = (run->capture_mplane ? buf.m.planes[0].m.mem_offset : buf.m.offset); const off_t buf_offset = (run->capture_mplane ? buf.m.planes[0].m.mem_offset : buf.m.offset);

View File

@@ -48,7 +48,7 @@ typedef struct {
struct v4l2_buffer buf; struct v4l2_buffer buf;
int dma_fd; int dma_fd;
bool grabbed; bool grabbed;
atomic_int busy; atomic_int refs;
} us_hw_buffer_s; } us_hw_buffer_s;
typedef struct { typedef struct {
@@ -132,3 +132,6 @@ void us_device_close(us_device_s *dev);
int us_device_grab_buffer(us_device_s *dev, us_hw_buffer_s **hw); int us_device_grab_buffer(us_device_s *dev, us_hw_buffer_s **hw);
int us_device_release_buffer(us_device_s *dev, us_hw_buffer_s *hw); int us_device_release_buffer(us_device_s *dev, us_hw_buffer_s *hw);
void us_device_buffer_incref(us_hw_buffer_s *hw);
void us_device_buffer_decref(us_hw_buffer_s *hw);

View File

@@ -26,7 +26,6 @@
#include <stdatomic.h> #include <stdatomic.h>
#include <unistd.h> #include <unistd.h>
#include <errno.h> #include <errno.h>
#include <assert.h>
#include <pthread.h> #include <pthread.h>
@@ -49,6 +48,13 @@
#endif #endif
// Context handed to _jpeg_thread() by us_stream_loop().
typedef struct {
pthread_t tid; // Thread handle used with US_THREAD_CREATE() / US_THREAD_JOIN()
us_queue_s *queue; // Captured hw buffers put here by us_stream_loop() for the JPEG thread
us_stream_s *stream; // Stream whose encoder workers pool the thread uses
atomic_bool *stop; // Stop flag polled by the thread; us_stream_loop() points it at threads_stop
} _jpeg_context_s;
typedef struct { typedef struct {
pthread_t tid; pthread_t tid;
us_device_s *dev; us_device_s *dev;
@@ -66,11 +72,11 @@ typedef struct {
} _releaser_context_s; } _releaser_context_s;
static void *_jpeg_thread(void *v_ctx);
static void *_h264_thread(void *v_ctx); static void *_h264_thread(void *v_ctx);
static void *_releaser_thread(void *v_ctx); static void *_releaser_thread(void *v_ctx);
static bool _stream_has_any_clients(us_stream_s *stream); static bool _stream_has_any_clients(us_stream_s *stream);
static bool _stream_slowdown(us_stream_s *stream);
static int _stream_init_loop(us_stream_s *stream); static int _stream_init_loop(us_stream_s *stream);
static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame); static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame);
static void _stream_check_suicide(us_stream_s *stream); static void _stream_check_suicide(us_stream_s *stream);
@@ -142,6 +148,13 @@ void us_stream_loop(us_stream_s *stream) {
US_THREAD_CREATE(releasers[index].tid, _releaser_thread, &releasers[index]); US_THREAD_CREATE(releasers[index].tid, _releaser_thread, &releasers[index]);
} }
_jpeg_context_s jpeg_ctx = {
.queue = us_queue_init(stream->dev->run->n_bufs),
.stream = stream,
.stop = &threads_stop,
};
US_THREAD_CREATE(jpeg_ctx.tid, _jpeg_thread, &jpeg_ctx);
_h264_context_s h264_ctx; _h264_context_s h264_ctx;
if (run->h264 != NULL) { if (run->h264 != NULL) {
h264_ctx.dev = stream->dev; h264_ctx.dev = stream->dev;
@@ -151,94 +164,53 @@ void us_stream_loop(us_stream_s *stream) {
US_THREAD_CREATE(h264_ctx.tid, _h264_thread, &h264_ctx); US_THREAD_CREATE(h264_ctx.tid, _h264_thread, &h264_ctx);
} }
ldf grab_after = 0;
uint fluency_passed = 0;
uint captured_fps_accum = 0; uint captured_fps_accum = 0;
sll captured_fps_ts = 0; sll captured_fps_ts = 0;
US_LOG_INFO("Capturing ..."); US_LOG_INFO("Capturing ...");
uint slowdown_count = 0;
while (!atomic_load(&run->stop) && !atomic_load(&threads_stop)) { while (!atomic_load(&run->stop) && !atomic_load(&threads_stop)) {
_stream_check_suicide(stream); _stream_check_suicide(stream);
if (stream->slowdown && !_stream_has_any_clients(stream)) {
US_SEP_DEBUG('-'); usleep(100 * 1000);
US_LOG_DEBUG("Waiting for worker ..."); slowdown_count = (slowdown_count + 1) % 10;
if (slowdown_count > 0) {
us_worker_s *const ready_wr = us_workers_pool_wait(stream->enc->run->pool); continue;
us_encoder_job_s *const ready_job = ready_wr->job;
if (ready_job->hw != NULL) {
assert(!us_queue_put(releasers[ready_job->hw->buf.index].queue, ready_job->hw, 0));
atomic_fetch_sub(&ready_job->hw->busy, 1);
ready_job->hw = NULL;
if (ready_wr->job_failed) {
// pass
} else if (ready_wr->job_timely) {
_stream_expose_frame(stream, ready_job->dest);
US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf",
ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts);
} else {
US_LOG_PERF("----- Encoded JPEG dropped; worker=%s", ready_wr->name);
} }
} }
const bool h264_force_key = _stream_slowdown(stream);
if (atomic_load(&run->stop) || atomic_load(&threads_stop)) {
goto close;
}
us_hw_buffer_s *hw; us_hw_buffer_s *hw;
const int buf_index = us_device_grab_buffer(stream->dev, &hw); const int buf_index = us_device_grab_buffer(stream->dev, &hw);
switch (buf_index) { switch (buf_index) {
case -3: continue; // Broken frame case -3: continue; // Broken frame
case -2: // Persistent timeout case -2: // Persistent timeout
case -1: goto close; // Any error case -1: goto close; // Error
} }
assert(buf_index >= 0); assert(buf_index >= 0);
const sll now_sec_ts = us_floor_ms(us_get_now_monotonic());
if (now_sec_ts != captured_fps_ts) {
US_LOG_PERF_FPS("A new second has come; captured_fps=%u", captured_fps_accum);
atomic_store(&run->http_captured_fps, captured_fps_accum);
captured_fps_accum = 0;
captured_fps_ts = now_sec_ts;
}
captured_fps_accum += 1;
# ifdef WITH_GPIO # ifdef WITH_GPIO
us_gpio_set_stream_online(true); us_gpio_set_stream_online(true);
# endif # endif
const ldf now_ts = us_get_now_monotonic(); us_device_buffer_incref(hw); // JPEG
us_queue_put(jpeg_ctx.queue, hw, 0);
if (now_ts < grab_after) { if (run->h264 != NULL) {
fluency_passed += 1; us_device_buffer_incref(hw); // H264
US_LOG_VERBOSE("Passed %u frames for fluency: now=%.03Lf, grab_after=%.03Lf", us_queue_put(h264_ctx.queue, hw, 0);
fluency_passed, now_ts, grab_after);
assert(!us_queue_put(releasers[hw->buf.index].queue, hw, 0));
} else {
int hw_busy = 1;
if (run->h264 != NULL) {
hw_busy += 1;
}
atomic_store(&hw->busy, hw_busy);
fluency_passed = 0;
const sll now_sec_ts = us_floor_ms(now_ts);
if (now_sec_ts != captured_fps_ts) {
US_LOG_PERF_FPS("A new second has come; captured_fps=%u", captured_fps_accum);
atomic_store(&run->http_captured_fps, captured_fps_accum);
captured_fps_accum = 0;
captured_fps_ts = now_sec_ts;
}
captured_fps_accum += 1;
const ldf fluency_delay = us_workers_pool_get_fluency_delay(stream->enc->run->pool, ready_wr);
grab_after = now_ts + fluency_delay;
US_LOG_VERBOSE("Fluency: delay=%.03Lf, grab_after=%.03Lf", fluency_delay, grab_after);
ready_job->hw = hw;
us_workers_pool_assign(stream->enc->run->pool, ready_wr);
US_LOG_DEBUG("Assigned new frame in buffer=%d to worker=%s", buf_index, ready_wr->name);
_SINK_PUT(raw_sink, &hw->raw);
if (run->h264 != NULL) {
us_queue_put(h264_ctx.queue, hw, h264_force_key);
}
} }
us_queue_put(releasers[buf_index].queue, hw, 0); // Plan to release
_SINK_PUT(raw_sink, &hw->raw);
} }
close: close:
@@ -249,6 +221,9 @@ void us_stream_loop(us_stream_s *stream) {
us_queue_destroy(h264_ctx.queue); us_queue_destroy(h264_ctx.queue);
} }
US_THREAD_JOIN(jpeg_ctx.tid);
us_queue_destroy(jpeg_ctx.queue);
for (uint index = 0; index < n_releasers; ++index) { for (uint index = 0; index < n_releasers; ++index) {
US_THREAD_JOIN(releasers[index].tid); US_THREAD_JOIN(releasers[index].tid);
us_queue_destroy(releasers[index].queue); us_queue_destroy(releasers[index].queue);
@@ -273,14 +248,66 @@ void us_stream_loop_break(us_stream_s *stream) {
atomic_store(&stream->run->stop, true); atomic_store(&stream->run->stop, true);
} }
// JPEG thread body: pairs encoder workers with captured hardware buffers.
//
// v_ctx must point to a _jpeg_context_s. Until *ctx->stop becomes true,
// each iteration:
//   1. obtains a worker from the encoder pool via us_workers_pool_wait();
//      if that worker still carries a buffer from its previous job, the
//      buffer reference is dropped and the encoded frame is exposed
//      (timely job), logged as dropped (late job) or ignored (failed job);
//   2. takes the next buffer from ctx->queue (timeout argument 0.1,
//      presumably seconds - confirm against us_queue_get()), restarting
//      the iteration if nothing arrived;
//   3. either skips the buffer to keep the output fluent (dropping its
//      reference immediately) or assigns it to the worker.
//
// us_stream_loop() calls us_device_buffer_incref() before queueing a buffer
// here, so every path that consumes a buffer ends in exactly one
// us_device_buffer_decref(): right away on a fluency skip, or on a later
// iteration when the worker that encoded it is picked up again.
//
// Always returns NULL.
static void *_jpeg_thread(void *v_ctx) {
	_jpeg_context_s *ctx = v_ctx;
	us_stream_s *stream = ctx->stream;

	ldf grab_after = 0; // Buffers fetched before this monotonic time are skipped
	uint fluency_passed = 0; // Consecutive skipped buffers, only used for logging

	while (!atomic_load(ctx->stop)) {
		us_worker_s *const ready_wr = us_workers_pool_wait(stream->enc->run->pool);
		us_encoder_job_s *const ready_job = ready_wr->job;

		if (ready_job->hw != NULL) {
			// The worker is done with its previous buffer, let the releaser have it
			us_device_buffer_decref(ready_job->hw);
			ready_job->hw = NULL;
			if (ready_wr->job_failed) {
				// pass
			} else if (ready_wr->job_timely) {
				_stream_expose_frame(stream, ready_job->dest);
				US_LOG_PERF("##### Encoded JPEG exposed; worker=%s, latency=%.3Lf",
					ready_wr->name, us_get_now_monotonic() - ready_job->dest->grab_ts);
			} else {
				US_LOG_PERF("----- Encoded JPEG dropped; worker=%s", ready_wr->name);
			}
		}

		us_hw_buffer_s *hw;
		if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) {
			// Nothing captured yet; go around so the stop flag is re-checked
			continue;
		}

		const ldf now_ts = us_get_now_monotonic();
		if (now_ts < grab_after) {
			fluency_passed += 1;
			US_LOG_VERBOSE("Passed %u frames for fluency: now=%.03Lf, grab_after=%.03Lf",
				fluency_passed, now_ts, grab_after);
			// Skipped buffer: drop the reference taken for JPEG by the producer
			us_device_buffer_decref(hw);
			continue;
		}
		fluency_passed = 0;

		const ldf fluency_delay = us_workers_pool_get_fluency_delay(stream->enc->run->pool, ready_wr);
		grab_after = now_ts + fluency_delay;
		US_LOG_VERBOSE("Fluency: delay=%.03Lf, grab_after=%.03Lf", fluency_delay, grab_after);

		// The reference now travels with the job until the worker is picked up again
		ready_job->hw = hw;
		us_workers_pool_assign(stream->enc->run->pool, ready_wr);
		// buf.index is unsigned, so it is printed with %u (as us_device_release_buffer() does)
		US_LOG_DEBUG("Assigned new frame in buffer=%u to worker=%s", hw->buf.index, ready_wr->name);
	}
	return NULL;
}
static void *_h264_thread(void *v_ctx) { static void *_h264_thread(void *v_ctx) {
_h264_context_s *ctx = v_ctx; _h264_context_s *ctx = v_ctx;
while (!atomic_load(ctx->stop)) { while (!atomic_load(ctx->stop)) {
us_hw_buffer_s *hw; us_hw_buffer_s *hw;
if (!us_queue_get(ctx->queue, (void**)&hw, 0.1)) { if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) {
us_h264_stream_process(ctx->h264, &hw->raw, false); continue;
atomic_fetch_sub(&hw->busy, 1);
} }
us_h264_stream_process(ctx->h264, &hw->raw, false);
us_device_buffer_decref(hw);
} }
return NULL; return NULL;
} }
@@ -289,21 +316,23 @@ static void *_releaser_thread(void *v_ctx) {
_releaser_context_s *ctx = v_ctx; _releaser_context_s *ctx = v_ctx;
while (!atomic_load(ctx->stop)) { while (!atomic_load(ctx->stop)) {
us_hw_buffer_s *hw; us_hw_buffer_s *hw;
if (!us_queue_get(ctx->queue, (void**)&hw, 0.1)) { if (us_queue_get(ctx->queue, (void**)&hw, 0.1) < 0) {
while (atomic_load(&hw->busy) > 0) { continue;
if (atomic_load(ctx->stop)) { }
break; while (atomic_load(&hw->refs) > 0) {
} if (atomic_load(ctx->stop)) {
usleep(5 * 1000); goto done;
}
US_MUTEX_LOCK(*ctx->mutex);
const int released = us_device_release_buffer(ctx->dev, hw);
US_MUTEX_UNLOCK(*ctx->mutex);
if (released < 0) {
break;
} }
usleep(5 * 1000);
}
US_MUTEX_LOCK(*ctx->mutex);
const int released = us_device_release_buffer(ctx->dev, hw);
US_MUTEX_UNLOCK(*ctx->mutex);
if (released < 0) {
goto done;
} }
} }
done:
atomic_store(ctx->stop, true); // Stop all other guys on error atomic_store(ctx->stop, true); // Stop all other guys on error
return NULL; return NULL;
} }
@@ -318,18 +347,6 @@ static bool _stream_has_any_clients(us_stream_s *stream) {
); );
} }
// Idle throttle for the capture loop.
//
// When stream->slowdown is set, sleeps in steps of 100000 us (at most 10
// steps) for as long as the stream is not being stopped and
// _stream_has_any_clients() reports no clients.
//
// Returns true only if all 10 steps were slept through; returns false when
// slowdown is disabled or the wait ended early.
static bool _stream_slowdown(us_stream_s *stream) {
	if (!stream->slowdown) {
		return false;
	}

	unsigned naps = 0;
	for (; naps < 10; ++naps) {
		// Same check order as before: stop flag first, then clients
		if (atomic_load(&stream->run->stop) || _stream_has_any_clients(stream)) {
			break;
		}
		usleep(100000);
	}
	return (naps >= 10);
}
static int _stream_init_loop(us_stream_s *stream) { static int _stream_init_loop(us_stream_s *stream) {
us_stream_runtime_s *const run = stream->run; us_stream_runtime_s *const run = stream->run;
@@ -447,10 +464,10 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame) {
} }
static void _stream_check_suicide(us_stream_s *stream) { static void _stream_check_suicide(us_stream_s *stream) {
us_stream_runtime_s *const run = stream->run; if (stream->exit_on_no_clients == 0) {
if (stream->exit_on_no_clients <= 0) {
return; return;
} }
us_stream_runtime_s *const run = stream->run;
const ldf now_ts = us_get_now_monotonic(); const ldf now_ts = us_get_now_monotonic();
const ull http_last_request_ts = atomic_load(&run->http_last_request_ts); // Seconds const ull http_last_request_ts = atomic_load(&run->http_last_request_ts); // Seconds
if (_stream_has_any_clients(stream)) { if (_stream_has_any_clients(stream)) {