mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-10-31 22:25:38 -04:00
module-rtp: add packet timer
When multiple packets need to be flushed (because sess.latency is set and larger than ptime) use a timer to space the packets uniformly in the current quantum to avoid bursts. See !1873
This commit is contained in:
parent
3958bed5c3
commit
78055736a2
2 changed files with 85 additions and 11 deletions
|
|
@ -192,6 +192,17 @@ unexpected_ssrc:
|
|||
return -EINVAL;
|
||||
}
|
||||
|
||||
static void set_timer(struct impl *impl, uint64_t time, uint64_t itime)
|
||||
{
|
||||
struct itimerspec ts;
|
||||
ts.it_value.tv_sec = time / SPA_NSEC_PER_SEC;
|
||||
ts.it_value.tv_nsec = time % SPA_NSEC_PER_SEC;
|
||||
ts.it_interval.tv_sec = itime / SPA_NSEC_PER_SEC;
|
||||
ts.it_interval.tv_nsec = itime % SPA_NSEC_PER_SEC;
|
||||
spa_system_timerfd_settime(impl->data_loop->system,
|
||||
impl->timer->fd, SPA_FD_TIMER_ABSTIME, &ts, NULL);
|
||||
}
|
||||
|
||||
static inline void
|
||||
set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size,
|
||||
uint32_t offset, struct iovec *iov, uint32_t len)
|
||||
|
|
@ -202,7 +213,7 @@ set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size,
|
|||
iov[1].iov_base = buffer;
|
||||
}
|
||||
|
||||
static void rtp_audio_flush_packets(struct impl *impl)
|
||||
static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets)
|
||||
{
|
||||
int32_t avail, tosend;
|
||||
uint32_t stride, timestamp;
|
||||
|
|
@ -211,9 +222,10 @@ static void rtp_audio_flush_packets(struct impl *impl)
|
|||
|
||||
avail = spa_ringbuffer_get_read_index(&impl->ring, ×tamp);
|
||||
tosend = impl->psamples;
|
||||
|
||||
if (avail < tosend)
|
||||
return;
|
||||
goto done;
|
||||
|
||||
num_packets = SPA_MIN(num_packets, (uint32_t)(avail / tosend));
|
||||
|
||||
stride = impl->stride;
|
||||
|
||||
|
|
@ -225,7 +237,7 @@ static void rtp_audio_flush_packets(struct impl *impl)
|
|||
iov[0].iov_base = &header;
|
||||
iov[0].iov_len = sizeof(header);
|
||||
|
||||
while (avail >= tosend) {
|
||||
while (num_packets > 0) {
|
||||
if (impl->marker_on_first && impl->first)
|
||||
header.m = 1;
|
||||
else
|
||||
|
|
@ -238,7 +250,8 @@ static void rtp_audio_flush_packets(struct impl *impl)
|
|||
(timestamp * stride) & BUFFER_MASK,
|
||||
&iov[1], tosend * stride);
|
||||
|
||||
pw_log_trace("sending %d avail:%d ts_offset:%d timestamp:%d", tosend, avail, impl->ts_offset, timestamp);
|
||||
pw_log_trace("sending %d packet:%d ts_offset:%d timestamp:%d",
|
||||
tosend, num_packets, impl->ts_offset, timestamp);
|
||||
|
||||
rtp_stream_emit_send_packet(impl, iov, 3);
|
||||
|
||||
|
|
@ -246,8 +259,19 @@ static void rtp_audio_flush_packets(struct impl *impl)
|
|||
impl->first = false;
|
||||
timestamp += tosend;
|
||||
avail -= tosend;
|
||||
num_packets--;
|
||||
}
|
||||
spa_ringbuffer_read_update(&impl->ring, timestamp);
|
||||
done:
|
||||
if (avail < tosend)
|
||||
set_timer(impl, 0, 0);
|
||||
}
|
||||
|
||||
static void rtp_audio_flush_timeout(struct impl *impl, uint64_t expirations)
|
||||
{
|
||||
if (expirations > 1)
|
||||
pw_log_warn("missing timeout %"PRIu64, expirations);
|
||||
rtp_audio_flush_packets(impl, expirations);
|
||||
}
|
||||
|
||||
static void rtp_audio_process_capture(void *data)
|
||||
|
|
@ -257,6 +281,9 @@ static void rtp_audio_process_capture(void *data)
|
|||
struct spa_data *d;
|
||||
uint32_t offs, size, timestamp, expected_timestamp, stride;
|
||||
int32_t filled, wanted;
|
||||
uint32_t pending, num_queued;
|
||||
struct spa_io_position *pos;
|
||||
uint64_t next_nsec, quantum;
|
||||
|
||||
if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) {
|
||||
pw_log_info("Out of stream buffers: %m");
|
||||
|
|
@ -271,11 +298,17 @@ static void rtp_audio_process_capture(void *data)
|
|||
|
||||
filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_timestamp);
|
||||
|
||||
if (SPA_LIKELY(impl->io_position)) {
|
||||
uint32_t rate = impl->io_position->clock.rate.denom;
|
||||
timestamp = impl->io_position->clock.position * impl->rate / rate;
|
||||
} else
|
||||
pos = impl->io_position;
|
||||
if (SPA_LIKELY(pos)) {
|
||||
uint32_t rate = pos->clock.rate.denom;
|
||||
timestamp = pos->clock.position * impl->rate / rate;
|
||||
next_nsec = pos->clock.next_nsec;
|
||||
quantum = pos->clock.duration * SPA_NSEC_PER_SEC / (rate * pos->clock.rate_diff);
|
||||
} else {
|
||||
timestamp = expected_timestamp;
|
||||
next_nsec = 0;
|
||||
quantum = 0;
|
||||
}
|
||||
|
||||
if (!impl->have_sync) {
|
||||
pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u",
|
||||
|
|
@ -284,6 +317,7 @@ static void rtp_audio_process_capture(void *data)
|
|||
memset(impl->buffer, 0, BUFFER_SIZE);
|
||||
impl->have_sync = true;
|
||||
expected_timestamp = timestamp;
|
||||
filled = 0;
|
||||
} else {
|
||||
if (SPA_ABS((int32_t)expected_timestamp - (int32_t)timestamp) > 32) {
|
||||
pw_log_warn("expected %u != timestamp %u", expected_timestamp, timestamp);
|
||||
|
|
@ -291,6 +325,7 @@ static void rtp_audio_process_capture(void *data)
|
|||
} else if (filled + wanted > (int32_t)(BUFFER_SIZE / stride)) {
|
||||
pw_log_warn("overrun %u + %u > %u", filled, wanted, BUFFER_SIZE / stride);
|
||||
impl->have_sync = false;
|
||||
filled = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -304,7 +339,22 @@ static void rtp_audio_process_capture(void *data)
|
|||
|
||||
pw_stream_queue_buffer(impl->stream, buf);
|
||||
|
||||
rtp_audio_flush_packets(impl);
|
||||
pending = filled / impl->psamples;
|
||||
num_queued = (filled + wanted) / impl->psamples;
|
||||
|
||||
if (num_queued > 0) {
|
||||
/* flush all previous packets plus new one right away */
|
||||
rtp_audio_flush_packets(impl, pending + 1);
|
||||
num_queued -= SPA_MIN(num_queued, pending + 1);
|
||||
|
||||
if (num_queued > 0) {
|
||||
/* schedule timer for remaining */
|
||||
int64_t interval = quantum / (num_queued + 1);
|
||||
uint64_t time = next_nsec - num_queued * interval;
|
||||
pw_log_trace("%u %u %"PRIu64" %"PRIu64, pending, num_queued, time, interval);
|
||||
set_timer(impl, time, interval);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int rtp_audio_init(struct impl *impl, enum spa_direction direction)
|
||||
|
|
@ -314,5 +364,6 @@ static int rtp_audio_init(struct impl *impl, enum spa_direction direction)
|
|||
else
|
||||
impl->stream_events.process = rtp_audio_process_playback;
|
||||
impl->receive_rtp = rtp_audio_receive;
|
||||
impl->flush_timeout = rtp_audio_flush_timeout;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -85,7 +85,11 @@ struct impl {
|
|||
unsigned receiving:1;
|
||||
unsigned first:1;
|
||||
|
||||
struct pw_loop *data_loop;
|
||||
struct spa_source *timer;
|
||||
|
||||
int (*receive_rtp)(struct impl *impl, uint8_t *buffer, ssize_t len);
|
||||
void (*flush_timeout)(struct impl *impl, uint64_t expirations);
|
||||
};
|
||||
|
||||
#include "module-rtp/audio.c"
|
||||
|
|
@ -271,6 +275,12 @@ static float samples_to_msec(struct impl *impl, uint32_t samples)
|
|||
return samples * 1000.0f / impl->rate;
|
||||
}
|
||||
|
||||
static void on_flush_timeout(void *d, uint64_t expirations)
|
||||
{
|
||||
struct impl *impl = d;
|
||||
impl->flush_timeout(d, expirations);
|
||||
}
|
||||
|
||||
struct rtp_stream *rtp_stream_new(struct pw_core *core,
|
||||
enum pw_direction direction, struct pw_properties *props,
|
||||
const struct rtp_stream_events *events, void *data)
|
||||
|
|
@ -286,16 +296,26 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core,
|
|||
enum pw_stream_flags flags;
|
||||
float latency_msec;
|
||||
int res;
|
||||
struct pw_data_loop *data_loop;
|
||||
struct pw_context *context;
|
||||
|
||||
impl = calloc(1, sizeof(*impl));
|
||||
if (impl == NULL) {
|
||||
res = -errno;
|
||||
goto out;
|
||||
return NULL;
|
||||
}
|
||||
impl->first = true;
|
||||
spa_hook_list_init(&impl->listener_list);
|
||||
impl->stream_events = stream_events;
|
||||
context = pw_core_get_context(core);
|
||||
data_loop = pw_context_get_data_loop(context);
|
||||
impl->data_loop = pw_data_loop_get_loop(data_loop);
|
||||
impl->timer = pw_loop_add_timer(impl->data_loop, on_flush_timeout, impl);
|
||||
if (impl->timer == NULL) {
|
||||
res = -errno;
|
||||
pw_log_error("can't create timer");
|
||||
goto out;
|
||||
}
|
||||
|
||||
if ((str = pw_properties_get(props, "sess.media")) == NULL)
|
||||
str = "audio";
|
||||
|
|
@ -561,6 +581,9 @@ void rtp_stream_destroy(struct rtp_stream *s)
|
|||
if (impl->stream)
|
||||
pw_stream_destroy(impl->stream);
|
||||
|
||||
if (impl->timer)
|
||||
pw_loop_destroy_source(impl->data_loop, impl->timer);
|
||||
|
||||
spa_hook_list_clean(&impl->listener_list);
|
||||
free(impl);
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue