mirror of
https://github.com/pikvm/ustreamer.git
synced 2026-03-13 19:13:41 +00:00
janus: using ring buffers for rtp pipelines
This commit is contained in:
@@ -33,7 +33,7 @@
|
|||||||
#include "uslibs/tools.h"
|
#include "uslibs/tools.h"
|
||||||
#include "uslibs/threading.h"
|
#include "uslibs/threading.h"
|
||||||
#include "uslibs/list.h"
|
#include "uslibs/list.h"
|
||||||
#include "uslibs/queue.h"
|
#include "uslibs/ring.h"
|
||||||
|
|
||||||
#include "logging.h"
|
#include "logging.h"
|
||||||
#include "rtp.h"
|
#include "rtp.h"
|
||||||
@@ -54,10 +54,10 @@ us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_sessio
|
|||||||
|
|
||||||
atomic_init(&client->stop, false);
|
atomic_init(&client->stop, false);
|
||||||
|
|
||||||
client->video_queue = us_queue_init(2048);
|
US_RING_INIT_WITH_ITEMS(client->video_ring, 2048, us_rtp_init);
|
||||||
US_THREAD_CREATE(client->video_tid, _video_thread, client);
|
US_THREAD_CREATE(client->video_tid, _video_thread, client);
|
||||||
|
|
||||||
client->audio_queue = us_queue_init(64);
|
US_RING_INIT_WITH_ITEMS(client->audio_ring, 64, us_rtp_init);
|
||||||
US_THREAD_CREATE(client->audio_tid, _audio_thread, client);
|
US_THREAD_CREATE(client->audio_tid, _audio_thread, client);
|
||||||
|
|
||||||
return client;
|
return client;
|
||||||
@@ -65,14 +65,12 @@ us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_sessio
|
|||||||
|
|
||||||
void us_janus_client_destroy(us_janus_client_s *client) {
|
void us_janus_client_destroy(us_janus_client_s *client) {
|
||||||
atomic_store(&client->stop, true);
|
atomic_store(&client->stop, true);
|
||||||
us_queue_put(client->video_queue, NULL, 0);
|
|
||||||
us_queue_put(client->audio_queue, NULL, 0);
|
|
||||||
|
|
||||||
US_THREAD_JOIN(client->video_tid);
|
US_THREAD_JOIN(client->video_tid);
|
||||||
US_QUEUE_DELETE_WITH_ITEMS(client->video_queue, us_rtp_destroy);
|
US_RING_DELETE_WITH_ITEMS(client->video_ring, us_rtp_destroy);
|
||||||
|
|
||||||
US_THREAD_JOIN(client->audio_tid);
|
US_THREAD_JOIN(client->audio_tid);
|
||||||
US_QUEUE_DELETE_WITH_ITEMS(client->audio_queue, us_rtp_destroy);
|
US_RING_DELETE_WITH_ITEMS(client->audio_ring, us_rtp_destroy);
|
||||||
|
|
||||||
free(client);
|
free(client);
|
||||||
}
|
}
|
||||||
@@ -82,12 +80,15 @@ void us_janus_client_send(us_janus_client_s *client, const us_rtp_s *rtp) {
|
|||||||
atomic_load(&client->transmit)
|
atomic_load(&client->transmit)
|
||||||
&& (rtp->video || atomic_load(&client->transmit_audio))
|
&& (rtp->video || atomic_load(&client->transmit_audio))
|
||||||
) {
|
) {
|
||||||
us_rtp_s *const new = us_rtp_dup(rtp);
|
us_ring_s *const ring = (rtp->video ? client->video_ring : client->audio_ring);
|
||||||
if (us_queue_put((new->video ? client->video_queue : client->audio_queue), new, 0) != 0) {
|
const int ri = us_ring_producer_acquire(ring, 0);
|
||||||
US_JLOG_ERROR("client", "Session %p %s queue is full",
|
if (ri < 0) {
|
||||||
client->session, (new->video ? "video" : "audio"));
|
US_JLOG_ERROR("client", "Session %p %s ring is full",
|
||||||
us_rtp_destroy(new);
|
client->session, (rtp->video ? "video" : "audio"));
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
memcpy(ring->items[ri], rtp, sizeof(us_rtp_s));
|
||||||
|
us_ring_producer_release(ring, ri);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -101,45 +102,45 @@ static void *_audio_thread(void *v_client) {
|
|||||||
|
|
||||||
static void *_common_thread(void *v_client, bool video) {
|
static void *_common_thread(void *v_client, bool video) {
|
||||||
us_janus_client_s *const client = (us_janus_client_s *)v_client;
|
us_janus_client_s *const client = (us_janus_client_s *)v_client;
|
||||||
us_queue_s *const queue = (video ? client->video_queue : client->audio_queue);
|
us_ring_s *const ring = (video ? client->video_ring : client->audio_ring);
|
||||||
assert(queue != NULL); // Audio may be NULL
|
assert(ring != NULL); // Audio may be NULL
|
||||||
|
|
||||||
while (!atomic_load(&client->stop)) {
|
while (!atomic_load(&client->stop)) {
|
||||||
us_rtp_s *rtp;
|
const int ri = us_ring_consumer_acquire(ring, 0.1);
|
||||||
if (!us_queue_get(queue, (void **)&rtp, 0.1)) {
|
if (ri < 0) {
|
||||||
if (rtp == NULL) {
|
continue;
|
||||||
break;
|
}
|
||||||
}
|
us_rtp_s rtp;
|
||||||
|
memcpy(&rtp, ring->items[ri], sizeof(us_rtp_s));
|
||||||
|
us_ring_consumer_release(ring, ri);
|
||||||
|
|
||||||
if (
|
if (
|
||||||
atomic_load(&client->transmit)
|
atomic_load(&client->transmit)
|
||||||
&& (video || atomic_load(&client->transmit_audio))
|
&& (video || atomic_load(&client->transmit_audio))
|
||||||
) {
|
) {
|
||||||
janus_plugin_rtp packet = {
|
janus_plugin_rtp packet = {
|
||||||
.video = rtp->video,
|
.video = rtp.video,
|
||||||
.buffer = (char *)rtp->datagram,
|
.buffer = (char*)rtp.datagram,
|
||||||
.length = rtp->used,
|
.length = rtp.used,
|
||||||
# if JANUS_PLUGIN_API_VERSION >= 100
|
# if JANUS_PLUGIN_API_VERSION >= 100
|
||||||
// The uStreamer Janus plugin places video in stream index 0 and audio
|
// The uStreamer Janus plugin places video in stream index 0 and audio
|
||||||
// (if available) in stream index 1.
|
// (if available) in stream index 1.
|
||||||
.mindex = (rtp->video ? 0 : 1),
|
.mindex = (rtp.video ? 0 : 1),
|
||||||
# endif
|
# endif
|
||||||
};
|
};
|
||||||
janus_plugin_rtp_extensions_reset(&packet.extensions);
|
janus_plugin_rtp_extensions_reset(&packet.extensions);
|
||||||
/*if (rtp->zero_playout_delay) {
|
/*if (rtp->zero_playout_delay) {
|
||||||
// https://github.com/pikvm/pikvm/issues/784
|
// https://github.com/pikvm/pikvm/issues/784
|
||||||
packet.extensions.min_delay = 0;
|
packet.extensions.min_delay = 0;
|
||||||
packet.extensions.max_delay = 0;
|
packet.extensions.max_delay = 0;
|
||||||
} else {
|
} else {
|
||||||
packet.extensions.min_delay = 0;
|
packet.extensions.min_delay = 0;
|
||||||
// 10s - Chromium/WebRTC default
|
// 10s - Chromium/WebRTC default
|
||||||
// 3s - Firefox default
|
// 3s - Firefox default
|
||||||
packet.extensions.max_delay = 300; // == 3s, i.e. 10ms granularity
|
packet.extensions.max_delay = 300; // == 3s, i.e. 10ms granularity
|
||||||
}*/
|
}*/
|
||||||
|
|
||||||
client->gw->relay_rtp(client->session, &packet);
|
client->gw->relay_rtp(client->session, &packet);
|
||||||
}
|
|
||||||
us_rtp_destroy(rtp);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|||||||
@@ -29,7 +29,7 @@
|
|||||||
|
|
||||||
#include "uslibs/types.h"
|
#include "uslibs/types.h"
|
||||||
#include "uslibs/list.h"
|
#include "uslibs/list.h"
|
||||||
#include "uslibs/queue.h"
|
#include "uslibs/ring.h"
|
||||||
|
|
||||||
#include "rtp.h"
|
#include "rtp.h"
|
||||||
|
|
||||||
@@ -44,8 +44,8 @@ typedef struct us_janus_client_sx {
|
|||||||
pthread_t audio_tid;
|
pthread_t audio_tid;
|
||||||
atomic_bool stop;
|
atomic_bool stop;
|
||||||
|
|
||||||
us_queue_s *video_queue;
|
us_ring_s *video_ring;
|
||||||
us_queue_s *audio_queue;
|
us_ring_s *audio_ring;
|
||||||
|
|
||||||
US_LIST_STRUCT(struct us_janus_client_sx);
|
US_LIST_STRUCT(struct us_janus_client_sx);
|
||||||
} us_janus_client_s;
|
} us_janus_client_s;
|
||||||
|
|||||||
@@ -31,26 +31,22 @@
|
|||||||
#include "uslibs/tools.h"
|
#include "uslibs/tools.h"
|
||||||
|
|
||||||
|
|
||||||
us_rtp_s *us_rtp_init(uint payload, bool video) {
|
us_rtp_s *us_rtp_init(void) {
|
||||||
us_rtp_s *rtp;
|
us_rtp_s *rtp;
|
||||||
US_CALLOC(rtp, 1);
|
US_CALLOC(rtp, 1);
|
||||||
rtp->payload = payload;
|
|
||||||
rtp->video = video;
|
|
||||||
rtp->ssrc = us_triple_u32(us_get_now_monotonic_u64());
|
|
||||||
return rtp;
|
return rtp;
|
||||||
}
|
}
|
||||||
|
|
||||||
us_rtp_s *us_rtp_dup(const us_rtp_s *rtp) {
|
|
||||||
us_rtp_s *new;
|
|
||||||
US_CALLOC(new, 1);
|
|
||||||
memcpy(new, rtp, sizeof(us_rtp_s));
|
|
||||||
return new;
|
|
||||||
}
|
|
||||||
|
|
||||||
void us_rtp_destroy(us_rtp_s *rtp) {
|
void us_rtp_destroy(us_rtp_s *rtp) {
|
||||||
free(rtp);
|
free(rtp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void us_rtp_assign(us_rtp_s *rtp, uint payload, bool video) {
|
||||||
|
rtp->payload = payload;
|
||||||
|
rtp->video = video;
|
||||||
|
rtp->ssrc = us_triple_u32(us_get_now_monotonic_u64());
|
||||||
|
}
|
||||||
|
|
||||||
void us_rtp_write_header(us_rtp_s *rtp, u32 pts, bool marked) {
|
void us_rtp_write_header(us_rtp_s *rtp, u32 pts, bool marked) {
|
||||||
u32 word0 = 0x80000000;
|
u32 word0 = 0x80000000;
|
||||||
if (marked) {
|
if (marked) {
|
||||||
|
|||||||
@@ -44,8 +44,8 @@ typedef struct {
|
|||||||
typedef void (*us_rtp_callback_f)(const us_rtp_s *rtp);
|
typedef void (*us_rtp_callback_f)(const us_rtp_s *rtp);
|
||||||
|
|
||||||
|
|
||||||
us_rtp_s *us_rtp_init(uint payload, bool video);
|
us_rtp_s *us_rtp_init(void);
|
||||||
us_rtp_s *us_rtp_dup(const us_rtp_s *rtp);
|
|
||||||
void us_rtp_destroy(us_rtp_s *rtp);
|
void us_rtp_destroy(us_rtp_s *rtp);
|
||||||
|
|
||||||
|
void us_rtp_assign(us_rtp_s *rtp, uint payload, bool video);
|
||||||
void us_rtp_write_header(us_rtp_s *rtp, u32 pts, bool marked);
|
void us_rtp_write_header(us_rtp_s *rtp, u32 pts, bool marked);
|
||||||
|
|||||||
@@ -32,7 +32,8 @@
|
|||||||
us_rtpa_s *us_rtpa_init(us_rtp_callback_f callback) {
|
us_rtpa_s *us_rtpa_init(us_rtp_callback_f callback) {
|
||||||
us_rtpa_s *rtpa;
|
us_rtpa_s *rtpa;
|
||||||
US_CALLOC(rtpa, 1);
|
US_CALLOC(rtpa, 1);
|
||||||
rtpa->rtp = us_rtp_init(111, false);
|
rtpa->rtp = us_rtp_init();
|
||||||
|
us_rtp_assign(rtpa->rtp, 111, false);
|
||||||
rtpa->callback = callback;
|
rtpa->callback = callback;
|
||||||
return rtpa;
|
return rtpa;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -44,7 +44,8 @@ static sz _find_annexb(const u8 *data, uz size);
|
|||||||
us_rtpv_s *us_rtpv_init(us_rtp_callback_f callback) {
|
us_rtpv_s *us_rtpv_init(us_rtp_callback_f callback) {
|
||||||
us_rtpv_s *rtpv;
|
us_rtpv_s *rtpv;
|
||||||
US_CALLOC(rtpv, 1);
|
US_CALLOC(rtpv, 1);
|
||||||
rtpv->rtp = us_rtp_init(96, true);
|
rtpv->rtp = us_rtp_init();
|
||||||
|
us_rtp_assign(rtpv->rtp, 96, true);
|
||||||
rtpv->callback = callback;
|
rtpv->callback = callback;
|
||||||
return rtpv;
|
return rtpv;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user