module-rtp: add some rate limit to send/recv errors

This commit is contained in:
Wim Taymans 2025-07-02 18:31:17 +02:00
parent 47ee9ef10a
commit 616db9809e
2 changed files with 37 additions and 4 deletions

View file

@ -19,6 +19,7 @@
#include <spa/utils/result.h>
#include <spa/utils/ringbuffer.h>
#include <spa/utils/json.h>
#include <spa/utils/ratelimit.h>
#include <spa/param/audio/format-utils.h>
#include <spa/debug/types.h>
@ -175,6 +176,8 @@ struct impl {
struct pw_properties *stream_props;
struct rtp_stream *stream;
struct spa_ratelimit rate_limit;
unsigned int do_disconnect:1;
char *ifname;
@ -279,6 +282,13 @@ static void stream_destroy(void *d)
impl->stream = NULL;
}
static inline uint64_t get_time_ns(void)
{
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return SPA_TIMESPEC_TO_NSEC(&ts);
}
static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen)
{
struct impl *impl = data;
@ -293,8 +303,11 @@ static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen)
msg.msg_flags = 0;
n = sendmsg(impl->rtp_fd, &msg, MSG_NOSIGNAL);
if (n < 0)
pw_log_warn("sendmsg() failed: %m");
if (n < 0) {
int suppressed;
if ((suppressed = spa_ratelimit_test(&impl->rate_limit, get_time_ns())) >= 0)
pw_log_warn("(%d suppressed) sendmsg() failed: %m", suppressed);
}
}
static void stream_state_changed(void *data, bool started, const char *error)
@ -516,6 +529,9 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
}
impl->stream_props = stream_props;
impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
impl->rate_limit.burst = 1;
impl->module = module;
impl->context = context;
impl->loop = pw_context_get_main_loop(context);

View file

@ -21,6 +21,7 @@
#include <spa/utils/defs.h>
#include <spa/utils/dll.h>
#include <spa/utils/json.h>
#include <spa/utils/ratelimit.h>
#include <spa/param/audio/format-utils.h>
#include <spa/control/control.h>
#include <spa/debug/types.h>
@ -159,6 +160,8 @@ struct impl {
struct spa_hook core_proxy_listener;
unsigned int do_disconnect:1;
struct spa_ratelimit rate_limit;
char *ifname;
bool always_process;
uint32_t cleanup_interval;
@ -185,6 +188,13 @@ struct impl {
bool waiting;
};
static inline uint64_t get_time_ns(void)
{
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return SPA_TIMESPEC_TO_NSEC(&ts);
}
static int do_start(struct spa_loop *loop, bool async, uint32_t seq, const void *data,
size_t size, void *user_data)
{
@ -211,6 +221,7 @@ on_rtp_io(void *data, int fd, uint32_t mask)
{
struct impl *impl = data;
ssize_t len;
int suppressed;
if (mask & SPA_IO_IN) {
if ((len = recv(fd, impl->buffer, impl->buffer_size, 0)) < 0)
@ -232,10 +243,13 @@ on_rtp_io(void *data, int fd, uint32_t mask)
return;
receive_error:
pw_log_warn("recv error: %m");
if ((suppressed = spa_ratelimit_test(&impl->rate_limit, get_time_ns())) >= 0)
pw_log_warn("(%d suppressed) recv() error: %m", suppressed);
return;
short_packet:
pw_log_warn("short packet of len %zd received", len);
if ((suppressed = spa_ratelimit_test(&impl->rate_limit, get_time_ns())) >= 0)
pw_log_warn("(%d suppressed) short packet of len %zd received",
suppressed, len);
return;
}
@ -666,6 +680,9 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->main_loop = pw_context_get_main_loop(context);
impl->data_loop = pw_context_acquire_loop(context, &props->dict);
impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
impl->rate_limit.burst = 1;
if ((sess_name = pw_properties_get(props, "sess.name")) == NULL)
sess_name = pw_get_host_name();