module-rtp-source: close socket while idle

Reduce network bandwidth when many streams are present but not used
This commit is contained in:
Dmitry Sharshakov 2023-01-25 19:03:17 +03:00 committed by Wim Taymans
parent 62766d8175
commit 785694db31

View file

@ -85,6 +85,7 @@
* #sap.port = 9875 * #sap.port = 9875
* #local.ifname = eth0 * #local.ifname = eth0
* sess.latency.msec = 100 * sess.latency.msec = 100
* #node.always-process = false # true to receive even when not running
* stream.props = { * stream.props = {
* #media.class = "Audio/Source" * #media.class = "Audio/Source"
* #node.name = "rtp-source" * #node.name = "rtp-source"
@ -170,6 +171,7 @@ struct impl {
char *ifname; char *ifname;
char *sap_ip; char *sap_ip;
bool always_process;
int sap_port; int sap_port;
int sess_latency_msec; int sess_latency_msec;
uint32_t cleanup_interval; uint32_t cleanup_interval;
@ -330,25 +332,6 @@ static void stream_process(void *data)
pw_stream_queue_buffer(sess->stream, buf); pw_stream_queue_buffer(sess->stream, buf);
} }
static void on_stream_state_changed(void *d, enum pw_stream_state old,
enum pw_stream_state state, const char *error)
{
struct session *sess = d;
struct impl *impl = sess->impl;
switch (state) {
case PW_STREAM_STATE_UNCONNECTED:
pw_log_info("stream disconnected, unloading");
pw_impl_module_schedule_destroy(impl->module);
break;
case PW_STREAM_STATE_ERROR:
pw_log_error("stream error: %s", error);
break;
default:
break;
}
}
static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size) static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size)
{ {
struct session *sess = data; struct session *sess = data;
@ -359,14 +342,6 @@ static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size
} }
} }
static const struct pw_stream_events out_stream_events = {
PW_VERSION_STREAM_EVENTS,
.destroy = stream_destroy,
.state_changed = on_stream_state_changed,
.io_changed = stream_io_changed,
.process = stream_process
};
static void static void
on_rtp_io(void *data, int fd, uint32_t mask) on_rtp_io(void *data, int fd, uint32_t mask)
{ {
@ -401,7 +376,8 @@ on_rtp_io(void *data, int fd, uint32_t mask)
seq = ntohs(hdr->sequence_number); seq = ntohs(hdr->sequence_number);
if (sess->have_seq && sess->expected_seq != seq) { if (sess->have_seq && sess->expected_seq != seq) {
pw_log_warn("unexpected seq (%d != %d)", seq, sess->expected_seq); pw_log_info("unexpected seq (%d != %d)", seq, sess->expected_seq);
sess->have_sync = false;
} }
sess->expected_seq = seq + 1; sess->expected_seq = seq + 1;
sess->have_seq = true; sess->have_seq = true;
@ -603,6 +579,78 @@ static int rule_matched(void *data, const char *location, const char *action,
return res; return res;
} }
static int session_start(struct impl *impl, struct session *session) {
int fd;
if (session->source)
return 0;
pw_log_info("starting RTP listener");
if ((fd = make_socket((const struct sockaddr *)&session->info.sa,
session->info.salen, impl->ifname)) < 0) {
pw_log_error("failed to create socket: %m");
return fd;
}
session->source = pw_loop_add_io(impl->data_loop, fd,
SPA_IO_IN, true, on_rtp_io, session);
if (session->source == NULL) {
pw_log_error("can't create io source: %m");
close(fd);
return -errno;
}
return 0;
}
static void session_stop(struct impl *impl, struct session *session) {
if (!session->source)
return;
pw_log_info("stopping RTP listener");
pw_loop_destroy_source(
session->impl->data_loop,
session->source
);
session->source = NULL;
}
static void on_stream_state_changed(void *d, enum pw_stream_state old,
enum pw_stream_state state, const char *error)
{
struct session *sess = d;
struct impl *impl = sess->impl;
switch (state) {
case PW_STREAM_STATE_UNCONNECTED:
pw_log_info("stream disconnected, unloading");
pw_impl_module_schedule_destroy(impl->module);
break;
case PW_STREAM_STATE_ERROR:
pw_log_error("stream error: %s", error);
break;
case PW_STREAM_STATE_STREAMING:
if ((errno = -session_start(impl, sess)) < 0)
pw_log_error("failed to start RTP stream: %m");
break;
case PW_STREAM_STATE_PAUSED:
if (!impl->always_process)
session_stop(impl, sess);
break;
default:
break;
}
}
static const struct pw_stream_events out_stream_events = {
PW_VERSION_STREAM_EVENTS,
.destroy = stream_destroy,
.state_changed = on_stream_state_changed,
.io_changed = stream_io_changed,
.process = stream_process
};
static int session_new(struct impl *impl, struct sdp_info *info) static int session_new(struct impl *impl, struct sdp_info *info)
{ {
struct session *session; struct session *session;
@ -611,7 +659,7 @@ static int session_new(struct impl *impl, struct sdp_info *info)
uint32_t n_params; uint32_t n_params;
uint8_t buffer[1024]; uint8_t buffer[1024];
struct pw_properties *props; struct pw_properties *props;
int res, fd, sess_latency_msec; int res, sess_latency_msec;
const char *str; const char *str;
if (impl->n_sessions >= MAX_SESSIONS) { if (impl->n_sessions >= MAX_SESSIONS) {
@ -709,21 +757,10 @@ static int session_new(struct impl *impl, struct sdp_info *info)
goto error; goto error;
} }
if ((fd = make_socket((const struct sockaddr *)&info->sa, if (impl->always_process &&
info->salen, impl->ifname)) < 0) { (res = session_start(impl, session)) < 0)
res = fd;
goto error; goto error;
}
session->source = pw_loop_add_io(impl->data_loop, fd,
SPA_IO_IN, true, on_rtp_io, session);
if (session->source == NULL) {
res = -errno;
pw_log_error("can't create io source: %m");
goto error;
}
pw_log_info("starting RTP listener");
session_touch(session); session_touch(session);
session->impl = impl; session->impl = impl;
@ -1175,6 +1212,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
str = pw_properties_get(impl->props, "local.ifname"); str = pw_properties_get(impl->props, "local.ifname");
impl->ifname = str ? strdup(str) : NULL; impl->ifname = str ? strdup(str) : NULL;
impl->always_process = pw_properties_get_bool(impl->props, PW_KEY_NODE_ALWAYS_PROCESS, false);
str = pw_properties_get(impl->props, "sap.ip"); str = pw_properties_get(impl->props, "sap.ip");
impl->sap_ip = strdup(str ? str : DEFAULT_SAP_IP); impl->sap_ip = strdup(str ? str : DEFAULT_SAP_IP);
impl->sap_port = pw_properties_get_uint32(impl->props, impl->sap_port = pw_properties_get_uint32(impl->props,