diff --git a/janus/src/memsinkfd.c b/janus/src/memsinkfd.c index fc4d042..d2f1f96 100644 --- a/janus/src/memsinkfd.c +++ b/janus/src/memsinkfd.c @@ -46,12 +46,15 @@ int us_memsink_fd_wait_frame(int fd, us_memsink_shared_s* mem, uint64_t last_id) return -2; } -us_frame_s *us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, uint64_t *frame_id) { +us_frame_s *us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, uint64_t *frame_id, bool key_required) { us_frame_s *frame = us_frame_init(); us_frame_set_data(frame, mem->data, mem->used); US_FRAME_COPY_META(mem, frame); *frame_id = mem->id; mem->last_client_ts = us_get_now_monotonic(); + if (key_required) { + mem->key_requested = true; + } bool ok = true; if (frame->format != V4L2_PIX_FMT_H264) { diff --git a/janus/src/memsinkfd.h b/janus/src/memsinkfd.h index 0414abc..29d9e5e 100644 --- a/janus/src/memsinkfd.h +++ b/janus/src/memsinkfd.h @@ -22,6 +22,7 @@ #pragma once +#include #include #include @@ -35,4 +36,4 @@ int us_memsink_fd_wait_frame(int fd, us_memsink_shared_s* mem, uint64_t last_id); -us_frame_s *us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, uint64_t *frame_id); +us_frame_s *us_memsink_fd_get_frame(int fd, us_memsink_shared_s *mem, uint64_t *frame_id, bool key_required); diff --git a/janus/src/plugin.c b/janus/src/plugin.c index 9468929..d4e467e 100644 --- a/janus/src/plugin.c +++ b/janus/src/plugin.c @@ -76,6 +76,7 @@ static pthread_mutex_t _g_audio_lock = PTHREAD_MUTEX_INITIALIZER; static atomic_bool _g_ready = false; static atomic_bool _g_stop = false; static atomic_bool _g_has_watchers = false; +static atomic_bool _g_key_required = false; #define _LOCK_VIDEO US_MUTEX_LOCK(_g_video_lock) @@ -149,10 +150,13 @@ static void *_video_sink_thread(UNUSED void *arg) { while (!_STOP && _HAS_WATCHERS) { const int result = us_memsink_fd_wait_frame(fd, mem, frame_id); if (result == 0) { - us_frame_s *const frame = us_memsink_fd_get_frame(fd, mem, &frame_id); + us_frame_s *const frame = us_memsink_fd_get_frame(fd, mem, &frame_id, atomic_load(&_g_key_required)); if (frame == NULL) { goto close_memsink; } +// if (frame->key) { +// atomic_store(&_g_key_required, false); +// } if (us_queue_put(_g_video_queue, frame, 0) != 0) { _IF_NOT_REPORTED({ US_JLOG_PERROR("video", "Video queue is full"); }); us_frame_destroy(frame); @@ -423,6 +427,7 @@ static struct janus_plugin_result *_plugin_handle_message( } else if (!strcmp(request_str, "watch")) { char *sdp; { +// atomic_store(&_g_key_required, true); char *const video_sdp = us_rtpv_make_sdp(_g_rtpv); if (video_sdp == NULL) { PUSH_ERROR(503, "Haven't received SPS/PPS from memsink yet"); diff --git a/man/ustreamer-dump.1 b/man/ustreamer-dump.1 index a15a8e0..c4a66b8 100644 --- a/man/ustreamer-dump.1 +++ b/man/ustreamer-dump.1 @@ -44,6 +44,9 @@ Limit the number of frames. Default: 0 (infinite). .TP .BR \-i ", "\-\-interval\ \fIsec Delay between reading frames (float). Default: 0. +.TP +.BR \-k ", " \-\-key +Request keyframe from the sink. Default: disabled. .SS "Logging options" .TP diff --git a/python/src/ustreamer.c b/python/src/ustreamer.c index ee312bb..f2d9caf 100644 --- a/python/src/ustreamer.c +++ b/python/src/ustreamer.c @@ -181,12 +181,18 @@ static int _wait_frame(_MemsinkObject *self) { return -2; } -static PyObject *_MemsinkObject_wait_frame(_MemsinkObject *self, PyObject *Py_UNUSED(ignored)) { +static PyObject *_MemsinkObject_wait_frame(_MemsinkObject *self, PyObject *args, PyObject *kwargs) { if (self->mem == NULL || self->fd <= 0) { PyErr_SetString(PyExc_RuntimeError, "Closed"); return NULL; } + bool key_required = false; + static char *kws[] = {"key_required", NULL}; + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|p", kws, &key_required)) { + return NULL; + } + switch (_wait_frame(self)) { case 0: break; case -2: Py_RETURN_NONE; @@ -198,6 +204,9 @@ static PyObject *_MemsinkObject_wait_frame(_MemsinkObject *self, PyObject *Py_UN self->frame_id = _MEM(id); self->frame_ts = us_get_now_monotonic(); _MEM(last_client_ts) = self->frame_ts; + if (key_required) { + _MEM(key_requested) = true; + } if (flock(self->fd, LOCK_UN) < 0) { return PyErr_SetFromErrno(PyExc_OSError); @@ -260,7 +269,7 @@ static PyMethodDef _MemsinkObject_methods[] = { ADD_METHOD("close", close, METH_NOARGS), ADD_METHOD("__enter__", enter, METH_NOARGS), ADD_METHOD("__exit__", exit, METH_VARARGS), - ADD_METHOD("wait_frame", wait_frame, METH_NOARGS), + ADD_METHOD("wait_frame", wait_frame, METH_VARARGS | METH_KEYWORDS), ADD_METHOD("is_opened", is_opened, METH_NOARGS), {}, # undef ADD_METHOD diff --git a/src/dump/main.c b/src/dump/main.c index 4b790b8..c59f689 100644 --- a/src/dump/main.c +++ b/src/dump/main.c @@ -48,6 +48,7 @@ enum _OPT_VALUES { _O_OUTPUT_JSON = 'j', _O_COUNT = 'c', _O_INTERVAL = 'i', + _O_KEY = 'k', _O_HELP = 'h', _O_VERSION = 'v', @@ -67,6 +68,7 @@ static const struct option _LONG_OPTS[] = { {"output-json", no_argument, NULL, _O_OUTPUT_JSON}, {"count", required_argument, NULL, _O_COUNT}, {"interval", required_argument, NULL, _O_INTERVAL}, + {"key", no_argument, NULL, _O_KEY}, {"log-level", required_argument, NULL, _O_LOG_LEVEL}, {"perf", no_argument, NULL, _O_PERF}, @@ -98,6 +100,7 @@ static void _install_signal_handlers(void); static int _dump_sink( const char *sink_name, unsigned sink_timeout, long long count, long double interval, + bool key_required, _output_context_s *ctx); static void _help(FILE *fp); @@ -113,6 +116,7 @@ int main(int argc, char *argv[]) { bool output_json = false; long long count = 0; long double interval = 0; + bool key_required = false; # define OPT_SET(_dest, _value) { \ _dest = _value; \ @@ -150,6 +154,7 @@ int main(int argc, char *argv[]) { case _O_OUTPUT_JSON: OPT_SET(output_json, true); case _O_COUNT: OPT_NUMBER("--count", count, 0, LLONG_MAX, 0); case _O_INTERVAL: OPT_LDOUBLE("--interval", interval, 0, 60); + case _O_KEY: OPT_SET(key_required, true); case _O_LOG_LEVEL: OPT_NUMBER("--log-level", us_g_log_level, US_LOG_LEVEL_INFO, US_LOG_LEVEL_DEBUG, 0); case _O_PERF: OPT_SET(us_g_log_level, US_LOG_LEVEL_PERF); @@ -186,7 +191,7 @@ int main(int argc, char *argv[]) { } _install_signal_handlers(); - const int retval = abs(_dump_sink(sink_name, sink_timeout, count, interval, &ctx)); + const int retval = abs(_dump_sink(sink_name, sink_timeout, count, interval, key_required, &ctx)); if (ctx.v_output && ctx.destroy) { ctx.destroy(ctx.v_output); } @@ -223,6 +228,7 @@ static void _install_signal_handlers(void) { static int _dump_sink( const char *sink_name, unsigned sink_timeout, long long count, long double interval, + bool key_required, _output_context_s *ctx) { if (count == 0) { @@ -245,8 +251,10 @@ static int _dump_sink( long double last_ts = 0; while (!_g_stop) { - int error = us_memsink_client_get(sink, frame); + const int error = us_memsink_client_get(sink, frame, key_required); if (error == 0) { + key_required = false; + const long double now = us_get_now_monotonic(); const long long now_second = us_floor_ms(now); @@ -322,6 +330,7 @@ static void _help(FILE *fp) { SAY(" -j|--output-json ──────── Format output as JSON. Required option --output. Default: disabled.\n"); SAY(" -c|--count ───────── Limit the number of frames. Default: 0 (infinite).\n"); SAY(" -i|--interval ───── Delay between reading frames (float). Default: 0.\n"); + SAY(" -k|--key ──────────────── Request keyframe from the sink. Default: disabled.\n"); SAY("Logging options:"); SAY("════════════════"); SAY(" --log-level ──── Verbosity level of messages from 0 (info) to 3 (debug)."); diff --git a/src/libs/memsink.c b/src/libs/memsink.c index c22f3d3..739412f 100644 --- a/src/libs/memsink.c +++ b/src/libs/memsink.c @@ -117,7 +117,7 @@ bool us_memsink_server_check(us_memsink_s *sink, const us_frame_s *frame) { return (has_clients || !US_FRAME_COMPARE_META_USED_NOTS(sink->mem, frame));; } -int us_memsink_server_put(us_memsink_s *sink, const us_frame_s *frame) { +int us_memsink_server_put(us_memsink_s *sink, const us_frame_s *frame, bool *const key_requested) { assert(sink->server); const long double now = us_get_now_monotonic(); @@ -133,6 +133,12 @@ int us_memsink_server_put(us_memsink_s *sink, const us_frame_s *frame) { sink->last_id = us_get_now_id(); sink->mem->id = sink->last_id; + if (sink->mem->key_requested && frame->key) { + sink->mem->key_requested = false; + } + if (key_requested != NULL) { + *key_requested = sink->mem->key_requested; + } memcpy(sink->mem->data, frame->data, frame->used); sink->mem->used = frame->used; @@ -140,6 +146,7 @@ int us_memsink_server_put(us_memsink_s *sink, const us_frame_s *frame) { sink->mem->magic = US_MEMSINK_MAGIC; sink->mem->version = US_MEMSINK_VERSION; + atomic_store(&sink->has_clients, (sink->mem->last_client_ts + sink->client_ttl > us_get_now_monotonic())); if (flock(sink->fd, LOCK_UN) < 0) { @@ -159,7 +166,7 @@ int us_memsink_server_put(us_memsink_s *sink, const us_frame_s *frame) { return 0; } -int us_memsink_client_get(us_memsink_s *sink, us_frame_s *frame) { // cppcheck-suppress unusedFunction +int us_memsink_client_get(us_memsink_s *sink, us_frame_s *frame, bool key_required) { // cppcheck-suppress unusedFunction assert(!sink->server); // Client only if (us_flock_timedwait_monotonic(sink->fd, sink->timeout) < 0) { @@ -185,6 +192,9 @@ int us_memsink_client_get(us_memsink_s *sink, us_frame_s *frame) { // cppcheck-s retval = 0; } sink->mem->last_client_ts = us_get_now_monotonic(); + if (key_required) { + sink->mem->key_requested = true; + } } done: diff --git a/src/libs/memsink.h b/src/libs/memsink.h index d38d45d..79cf3c8 100644 --- a/src/libs/memsink.h +++ b/src/libs/memsink.h @@ -63,6 +63,6 @@ us_memsink_s *us_memsink_init( void us_memsink_destroy(us_memsink_s *sink); bool us_memsink_server_check(us_memsink_s *sink, const us_frame_s *frame); -int us_memsink_server_put(us_memsink_s *sink, const us_frame_s *frame); +int us_memsink_server_put(us_memsink_s *sink, const us_frame_s *frame, bool *const key_requested); -int us_memsink_client_get(us_memsink_s *sink, us_frame_s *frame); +int us_memsink_client_get(us_memsink_s *sink, us_frame_s *frame, bool key_required); diff --git a/src/libs/memsinksh.h b/src/libs/memsinksh.h index 583f2b7..6a90424 100644 --- a/src/libs/memsinksh.h +++ b/src/libs/memsinksh.h @@ -30,7 +30,7 @@ #define US_MEMSINK_MAGIC ((uint64_t)0xCAFEBABECAFEBABE) -#define US_MEMSINK_VERSION ((uint32_t)2) +#define US_MEMSINK_VERSION ((uint32_t)3) #ifndef US_CFG_MEMSINK_MAX_DATA # define US_CFG_MEMSINK_MAX_DATA 33554432 @@ -57,6 +57,7 @@ typedef struct { long double encode_end_ts; long double last_client_ts; + bool key_requested; uint8_t data[US_MEMSINK_MAX_DATA]; } us_memsink_shared_s; diff --git a/src/ustreamer/h264/stream.c b/src/ustreamer/h264/stream.c index d395e2c..2b6bf9a 100644 --- a/src/ustreamer/h264/stream.c +++ b/src/ustreamer/h264/stream.c @@ -56,9 +56,15 @@ void us_h264_stream_process(us_h264_stream_s *h264, const us_frame_s *frame, boo US_LOG_VERBOSE("H264: JPEG decoded; time=%.3Lf", us_get_now_monotonic() - now); } + if (h264->key_requested) { + US_LOG_INFO("H264: Requested keyframe by a sink client"); + h264->key_requested = false; + force_key = true; + } + bool online = false; if (!us_m2m_encoder_compress(h264->enc, frame, h264->dest, force_key)) { - online = !us_memsink_server_put(h264->sink, h264->dest); + online = !us_memsink_server_put(h264->sink, h264->dest, &h264->key_requested); } atomic_store(&h264->online, online); } diff --git a/src/ustreamer/h264/stream.h b/src/ustreamer/h264/stream.h index 416b740..c68a642 100644 --- a/src/ustreamer/h264/stream.h +++ b/src/ustreamer/h264/stream.h @@ -36,6 +36,7 @@ typedef struct { us_memsink_s *sink; + bool key_requested; us_frame_s *tmp_src; us_frame_s *dest; us_m2m_encoder_s *enc; diff --git a/src/ustreamer/stream.c b/src/ustreamer/stream.c index 4b4749e..fdf31dd 100644 --- a/src/ustreamer/stream.c +++ b/src/ustreamer/stream.c @@ -32,7 +32,7 @@ static void _stream_expose_frame(us_stream_s *stream, us_frame_s *frame, unsigne #define _SINK_PUT(x_sink, x_frame) { \ if (stream->x_sink && us_memsink_server_check(stream->x_sink, x_frame)) {\ - us_memsink_server_put(stream->x_sink, x_frame); \ + us_memsink_server_put(stream->x_sink, x_frame, NULL); \ } \ }