From 96d593cc345f076c30c1f6f4c35cbff7e6c2eb16 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 20 Jan 2025 16:44:57 +0100 Subject: [PATCH] rtp: idle the source when in timeout Idle the source when no packets are received and resume when new packets arrive. Add a stream.may-pause property to pause the stream when no packets are received during the timeout window. Make sure the rtp.streaming property is updated correctly and as soon as we get the first packet. Fixes #4456 --- src/modules/module-rtp-source.c | 62 ++++++++++++++++++++++++++------- src/modules/module-rtp/stream.c | 6 ++++ src/modules/module-rtp/stream.h | 1 + 3 files changed, 57 insertions(+), 12 deletions(-) diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index 911e2c729..2d134e329 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -56,6 +56,7 @@ * - `sess.latency.msec = `: target network latency in milliseconds, default 100 * - `sess.ignore-ssrc = `: ignore SSRC, default false * - `sess.media = `: the media type audio|midi|opus, default audio + * - `stream.may-pause = `: pause the stream when no data is reveived, default false * - `stream.props = {}`: properties to be passed to the stream * * ## General options @@ -166,9 +167,32 @@ struct impl { size_t buffer_size; bool receiving; - bool last_receiving; + bool may_pause; + bool standby; + bool waiting; }; +static int do_start(struct spa_loop *loop, bool async, uint32_t seq, const void *data, + size_t size, void *user_data) +{ + struct impl *impl = user_data; + if (impl->waiting) { + struct spa_dict_item item[1]; + + impl->waiting = false; + impl->standby = false; + + pw_log_info("resume RTP source"); + + item[0] = SPA_DICT_ITEM_INIT("rtp.receiving", "true"); + rtp_stream_update_properties(impl->stream, &SPA_DICT_INIT(item, 1)); + + if (impl->may_pause) + rtp_stream_set_active(impl->stream, true); + } + return 0; +} + static void on_rtp_io(void *data, int fd, uint32_t mask) { @@ -187,7 +211,10 @@ on_rtp_io(void *data, int fd, uint32_t mask) goto receive_error; } - impl->receiving = true; + if (!impl->receiving) { + impl->receiving = true; + pw_loop_invoke(impl->loop, do_start, 1, NULL, 0, false, impl); + } } return; @@ -353,7 +380,7 @@ static void stream_state_changed(void *data, bool started, const char *error) rtp_stream_set_error(impl->stream, res, "Can't start RTP stream"); } } else { - if (!impl->always_process) + if (!impl->always_process && !impl->standby) stream_stop(impl); } } @@ -430,17 +457,22 @@ static void on_timer_event(void *data, uint64_t expirations) { struct impl *impl = data; - if (impl->receiving != impl->last_receiving) { - struct spa_dict_item item[1]; - - impl->last_receiving = impl->receiving; - - item[0] = SPA_DICT_ITEM_INIT("rtp.receiving", impl->receiving ? "true" : "false"); - rtp_stream_update_properties(impl->stream, &SPA_DICT_INIT(item, 1)); - } + pw_log_debug("timer %d", impl->receiving); if (!impl->receiving) { - pw_log_info("timeout, inactive RTP source"); + if (!impl->standby) { + struct spa_dict_item item[1]; + + pw_log_info("timeout, standby RTP source"); + impl->standby = true; + impl->waiting = true; + + item[0] = SPA_DICT_ITEM_INIT("rtp.receiving", "false"); + rtp_stream_update_properties(impl->stream, &SPA_DICT_INIT(item, 1)); + + if (impl->may_pause) + rtp_stream_set_active(impl->stream, false); + } //pw_impl_module_schedule_destroy(impl->module); } else { pw_log_debug("timeout, keeping active RTP source"); @@ -591,6 +623,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) copy_props(impl, props, "sess.latency.msec"); copy_props(impl, props, "sess.ts-direct"); copy_props(impl, props, "sess.ignore-ssrc"); + copy_props(impl, props, "stream.may-pause"); str = pw_properties_get(props, "local.ifname"); impl->ifname = str ? strdup(str) : NULL; @@ -618,6 +651,11 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->always_process = pw_properties_get_bool(stream_props, PW_KEY_NODE_ALWAYS_PROCESS, true); + impl->may_pause = pw_properties_get_bool(stream_props, + "stream.may-pause", false); + impl->standby = false; + impl->waiting = true; + pw_properties_set(stream_props, "rtp.receiving", "false"); impl->cleanup_interval = pw_properties_get_uint32(props, "cleanup.sec", DEFAULT_CLEANUP_SEC); diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index f43ee1df4..63b4f2119 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -694,6 +694,12 @@ enum pw_stream_state rtp_stream_get_state(struct rtp_stream *s, const char **err return pw_stream_get_state(impl->stream, error); } +int rtp_stream_set_active(struct rtp_stream *s, bool active) +{ + struct impl *impl = (struct impl*)s; + + return pw_stream_set_active(impl->stream, active); +} int rtp_stream_set_param(struct rtp_stream *s, uint32_t id, const struct spa_pod *param) { diff --git a/src/modules/module-rtp/stream.h b/src/modules/module-rtp/stream.h index c14afcf59..2594e804a 100644 --- a/src/modules/module-rtp/stream.h +++ b/src/modules/module-rtp/stream.h @@ -59,6 +59,7 @@ size_t rtp_stream_get_mtu(struct rtp_stream *s); void rtp_stream_set_first(struct rtp_stream *s); +int rtp_stream_set_active(struct rtp_stream *s, bool active); void rtp_stream_set_error(struct rtp_stream *s, int res, const char *error); enum pw_stream_state rtp_stream_get_state(struct rtp_stream *s, const char **error);