diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index bae21b0f7..d23474166 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -85,6 +85,7 @@ * #sap.port = 9875 * #local.ifname = eth0 * sess.latency.msec = 100 + * #node.always-process = false # true to receive even when not running * stream.props = { * #media.class = "Audio/Source" * #node.name = "rtp-source" @@ -170,6 +171,7 @@ struct impl { char *ifname; char *sap_ip; + bool always_process; int sap_port; int sess_latency_msec; uint32_t cleanup_interval; @@ -330,25 +332,6 @@ static void stream_process(void *data) pw_stream_queue_buffer(sess->stream, buf); } -static void on_stream_state_changed(void *d, enum pw_stream_state old, - enum pw_stream_state state, const char *error) -{ - struct session *sess = d; - struct impl *impl = sess->impl; - - switch (state) { - case PW_STREAM_STATE_UNCONNECTED: - pw_log_info("stream disconnected, unloading"); - pw_impl_module_schedule_destroy(impl->module); - break; - case PW_STREAM_STATE_ERROR: - pw_log_error("stream error: %s", error); - break; - default: - break; - } -} - static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size) { struct session *sess = data; @@ -359,14 +342,6 @@ static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size } } -static const struct pw_stream_events out_stream_events = { - PW_VERSION_STREAM_EVENTS, - .destroy = stream_destroy, - .state_changed = on_stream_state_changed, - .io_changed = stream_io_changed, - .process = stream_process -}; - static void on_rtp_io(void *data, int fd, uint32_t mask) { @@ -401,7 +376,8 @@ on_rtp_io(void *data, int fd, uint32_t mask) seq = ntohs(hdr->sequence_number); if (sess->have_seq && sess->expected_seq != seq) { - pw_log_warn("unexpected seq (%d != %d)", seq, sess->expected_seq); + pw_log_info("unexpected seq (%d != %d)", seq, sess->expected_seq); + sess->have_sync = false; } sess->expected_seq = seq + 1; sess->have_seq = true; @@ -603,6 +579,78 @@ static int rule_matched(void *data, const char *location, const char *action, return res; } +static int session_start(struct impl *impl, struct session *session) { + int fd; + if (session->source) + return 0; + + pw_log_info("starting RTP listener"); + + if ((fd = make_socket((const struct sockaddr *)&session->info.sa, + session->info.salen, impl->ifname)) < 0) { + pw_log_error("failed to create socket: %m"); + return fd; + } + + session->source = pw_loop_add_io(impl->data_loop, fd, + SPA_IO_IN, true, on_rtp_io, session); + if (session->source == NULL) { + pw_log_error("can't create io source: %m"); + close(fd); + return -errno; + } + return 0; +} + +static void session_stop(struct impl *impl, struct session *session) { + if (!session->source) + return; + + pw_log_info("stopping RTP listener"); + + pw_loop_destroy_source( + session->impl->data_loop, + session->source + ); + + session->source = NULL; +} + +static void on_stream_state_changed(void *d, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct session *sess = d; + struct impl *impl = sess->impl; + + switch (state) { + case PW_STREAM_STATE_UNCONNECTED: + pw_log_info("stream disconnected, unloading"); + pw_impl_module_schedule_destroy(impl->module); + break; + case PW_STREAM_STATE_ERROR: + pw_log_error("stream error: %s", error); + break; + case PW_STREAM_STATE_STREAMING: + if ((errno = -session_start(impl, sess)) < 0) + pw_log_error("failed to start RTP stream: %m"); + break; + case PW_STREAM_STATE_PAUSED: + if (!impl->always_process) + session_stop(impl, sess); + break; + default: + break; + } +} + +static const struct pw_stream_events out_stream_events = { + PW_VERSION_STREAM_EVENTS, + .destroy = stream_destroy, + .state_changed = on_stream_state_changed, + .io_changed = stream_io_changed, + .process = stream_process +}; + static int session_new(struct impl *impl, struct sdp_info *info) { struct session *session; @@ -611,7 +659,7 @@ static int session_new(struct impl *impl, struct sdp_info *info) uint32_t n_params; uint8_t buffer[1024]; struct pw_properties *props; - int res, fd, sess_latency_msec; + int res, sess_latency_msec; const char *str; if (impl->n_sessions >= MAX_SESSIONS) { @@ -709,21 +757,10 @@ static int session_new(struct impl *impl, struct sdp_info *info) goto error; } - if ((fd = make_socket((const struct sockaddr *)&info->sa, - info->salen, impl->ifname)) < 0) { - res = fd; + if (impl->always_process && + (res = session_start(impl, session)) < 0) goto error; - } - session->source = pw_loop_add_io(impl->data_loop, fd, - SPA_IO_IN, true, on_rtp_io, session); - if (session->source == NULL) { - res = -errno; - pw_log_error("can't create io source: %m"); - goto error; - } - - pw_log_info("starting RTP listener"); session_touch(session); session->impl = impl; @@ -1175,6 +1212,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) str = pw_properties_get(impl->props, "local.ifname"); impl->ifname = str ? strdup(str) : NULL; + impl->always_process = pw_properties_get_bool(impl->props, PW_KEY_NODE_ALWAYS_PROCESS, false); + str = pw_properties_get(impl->props, "sap.ip"); impl->sap_ip = strdup(str ? str : DEFAULT_SAP_IP); impl->sap_port = pw_properties_get_uint32(impl->props,