rtp: idle the source when in timeout

Idle the source when no packets are received and resume when new packets
arrive.

Add a stream.may-pause property to pause the stream when no packets are
received during the timeout window.

Make sure the rtp.streaming property is updated correctly and as soon as
we get the first packet.

Fixes #4456
This commit is contained in:
Wim Taymans 2025-01-20 16:44:57 +01:00
parent 980d37fb80
commit 96d593cc34
3 changed files with 57 additions and 12 deletions

View file

@ -56,6 +56,7 @@
* - `sess.latency.msec = <float>`: target network latency in milliseconds, default 100
* - `sess.ignore-ssrc = <bool>`: ignore SSRC, default false
* - `sess.media = <string>`: the media type audio|midi|opus, default audio
* - `stream.may-pause = <bool>`: pause the stream when no data is reveived, default false
* - `stream.props = {}`: properties to be passed to the stream
*
* ## General options
@ -166,9 +167,32 @@ struct impl {
size_t buffer_size;
bool receiving;
bool last_receiving;
bool may_pause;
bool standby;
bool waiting;
};
static int do_start(struct spa_loop *loop, bool async, uint32_t seq, const void *data,
size_t size, void *user_data)
{
struct impl *impl = user_data;
if (impl->waiting) {
struct spa_dict_item item[1];
impl->waiting = false;
impl->standby = false;
pw_log_info("resume RTP source");
item[0] = SPA_DICT_ITEM_INIT("rtp.receiving", "true");
rtp_stream_update_properties(impl->stream, &SPA_DICT_INIT(item, 1));
if (impl->may_pause)
rtp_stream_set_active(impl->stream, true);
}
return 0;
}
static void
on_rtp_io(void *data, int fd, uint32_t mask)
{
@ -187,7 +211,10 @@ on_rtp_io(void *data, int fd, uint32_t mask)
goto receive_error;
}
impl->receiving = true;
if (!impl->receiving) {
impl->receiving = true;
pw_loop_invoke(impl->loop, do_start, 1, NULL, 0, false, impl);
}
}
return;
@ -353,7 +380,7 @@ static void stream_state_changed(void *data, bool started, const char *error)
rtp_stream_set_error(impl->stream, res, "Can't start RTP stream");
}
} else {
if (!impl->always_process)
if (!impl->always_process && !impl->standby)
stream_stop(impl);
}
}
@ -430,17 +457,22 @@ static void on_timer_event(void *data, uint64_t expirations)
{
struct impl *impl = data;
if (impl->receiving != impl->last_receiving) {
struct spa_dict_item item[1];
impl->last_receiving = impl->receiving;
item[0] = SPA_DICT_ITEM_INIT("rtp.receiving", impl->receiving ? "true" : "false");
rtp_stream_update_properties(impl->stream, &SPA_DICT_INIT(item, 1));
}
pw_log_debug("timer %d", impl->receiving);
if (!impl->receiving) {
pw_log_info("timeout, inactive RTP source");
if (!impl->standby) {
struct spa_dict_item item[1];
pw_log_info("timeout, standby RTP source");
impl->standby = true;
impl->waiting = true;
item[0] = SPA_DICT_ITEM_INIT("rtp.receiving", "false");
rtp_stream_update_properties(impl->stream, &SPA_DICT_INIT(item, 1));
if (impl->may_pause)
rtp_stream_set_active(impl->stream, false);
}
//pw_impl_module_schedule_destroy(impl->module);
} else {
pw_log_debug("timeout, keeping active RTP source");
@ -591,6 +623,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
copy_props(impl, props, "sess.latency.msec");
copy_props(impl, props, "sess.ts-direct");
copy_props(impl, props, "sess.ignore-ssrc");
copy_props(impl, props, "stream.may-pause");
str = pw_properties_get(props, "local.ifname");
impl->ifname = str ? strdup(str) : NULL;
@ -618,6 +651,11 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->always_process = pw_properties_get_bool(stream_props,
PW_KEY_NODE_ALWAYS_PROCESS, true);
impl->may_pause = pw_properties_get_bool(stream_props,
"stream.may-pause", false);
impl->standby = false;
impl->waiting = true;
pw_properties_set(stream_props, "rtp.receiving", "false");
impl->cleanup_interval = pw_properties_get_uint32(props,
"cleanup.sec", DEFAULT_CLEANUP_SEC);

View file

@ -694,6 +694,12 @@ enum pw_stream_state rtp_stream_get_state(struct rtp_stream *s, const char **err
return pw_stream_get_state(impl->stream, error);
}
int rtp_stream_set_active(struct rtp_stream *s, bool active)
{
struct impl *impl = (struct impl*)s;
return pw_stream_set_active(impl->stream, active);
}
int rtp_stream_set_param(struct rtp_stream *s, uint32_t id, const struct spa_pod *param)
{

View file

@ -59,6 +59,7 @@ size_t rtp_stream_get_mtu(struct rtp_stream *s);
void rtp_stream_set_first(struct rtp_stream *s);
int rtp_stream_set_active(struct rtp_stream *s, bool active);
void rtp_stream_set_error(struct rtp_stream *s, int res, const char *error);
enum pw_stream_state rtp_stream_get_state(struct rtp_stream *s, const char **error);