module-rtp: Replace state_changed callbacks

The state_changed callbacks fulfill multiple roles, which is both a problem
regarding separation of concerns and regarding code clarity. De facto,
these callbacks cover error reporting, opening connections, and closing
connection, all in one, depending on a state that is arguably an internal
stream detail. The code in these callbacks tie these internal states to
assumptions that opening/closing callbacks is directly tied to specific
state changes in a common way, which is not always true. For example,
stopping the stream may not _actually_ stop it if a background send timer
is still running.

The notion of a "state_changed" callback is also problematic because the
pw_streams that are used in rtp-sink and rtp-source also have a callback
for state changes, causing confusion.

Solve this by replacing state_changed with three new callbacks:

1. report_error : Used for reporting nonrecoverable errors to the caller.
   Note that currently, no one does such error reporting, but the feature
   does exist, so this callback is introduced to preserve said feature.
2. open_connection : Used for opening a connection. Its optional return
   value informs about success or failure.
3. close_connection : Used for opening a connection. Its optional return
   value informs about success or failure.

Importantly, these callbacks do not export any internal stream state. This
improves encapsulation, and also makes it possible to invoke these
callbacks in situations that may not neatly map to a state change. One
example could be to close the connection as part of a stream_start call
to close any connection(s) left over from a previous run. (Followup commits
will in fact introduce such measures.)
This commit is contained in:
Carlos Rafael Giani 2025-08-21 13:30:11 +02:00 committed by Wim Taymans
parent 2f22c1d595
commit 3476e77714
6 changed files with 156 additions and 78 deletions

View file

@ -1545,14 +1545,12 @@ static void stream_destroy(void *d)
impl->stream = NULL;
}
static void stream_state_changed(void *data, bool started, const char *error)
static void stream_report_error(void *data, const char *error)
{
struct impl *impl = data;
if (error) {
pw_log_error("stream error: %s", error);
pw_impl_module_schedule_destroy(impl->module);
return;
}
}
@ -1726,7 +1724,7 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *
static const struct rtp_stream_events stream_events = {
RTP_VERSION_STREAM_EVENTS,
.destroy = stream_destroy,
.state_changed = stream_state_changed,
.report_error = stream_report_error,
.param_changed = stream_param_changed,
.send_packet = stream_send_packet
};

View file

@ -478,18 +478,23 @@ static void send_destroy(void *data)
{
}
static void send_state_changed(void *data, bool started, const char *error)
static void send_open_connection(void *data, int *result)
{
struct session *sess = data;
sess->sending = true;
if (result)
*result = 1;
session_establish(sess);
}
if (started) {
sess->sending = true;
session_establish(sess);
} else {
sess->sending = false;
if (!sess->receiving)
session_stop(sess);
}
static void send_close_connection(void *data, int *result)
{
struct session *sess = data;
sess->sending = false;
if (result)
*result = 1;
if (!sess->receiving)
session_stop(sess);
}
static void send_send_packet(void *data, struct iovec *iov, size_t iovlen)
@ -516,17 +521,24 @@ static void send_send_packet(void *data, struct iovec *iov, size_t iovlen)
static void recv_destroy(void *data)
{
}
static void recv_state_changed(void *data, bool started, const char *error)
static void recv_open_connection(void *data, int *result)
{
struct session *sess = data;
if (started) {
sess->receiving = true;
session_establish(sess);
} else {
sess->receiving = false;
if (!sess->sending)
session_stop(sess);
}
sess->receiving = true;
if (result)
*result = 1;
session_establish(sess);
}
static void recv_close_connection(void *data, int *result)
{
struct session *sess = data;
sess->receiving = false;
if (result)
*result = 1;
if (!sess->sending)
session_stop(sess);
}
static void recv_send_feedback(void *data, uint32_t seqnum)
@ -560,14 +572,16 @@ static void recv_send_feedback(void *data, uint32_t seqnum)
static const struct rtp_stream_events send_stream_events = {
RTP_VERSION_STREAM_EVENTS,
.destroy = send_destroy,
.state_changed = send_state_changed,
.open_connection = send_open_connection,
.close_connection = send_close_connection,
.send_packet = send_send_packet,
};
static const struct rtp_stream_events recv_stream_events = {
RTP_VERSION_STREAM_EVENTS,
.destroy = recv_destroy,
.state_changed = recv_state_changed,
.open_connection = recv_open_connection,
.close_connection = recv_close_connection,
.send_feedback = recv_send_feedback,
};

View file

@ -310,28 +310,49 @@ static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen)
}
}
static void stream_state_changed(void *data, bool started, const char *error)
static void stream_report_error(void *data, const char *error)
{
struct impl *impl = data;
if (error) {
pw_log_error("stream error: %s", error);
pw_impl_module_schedule_destroy(impl->module);
} else if (started) {
int res;
}
}
if ((res = make_socket(&impl->src_addr, impl->src_len,
&impl->dst_addr, impl->dst_len,
impl->mcast_loop, impl->ttl, impl->dscp,
impl->ifname)) < 0) {
pw_log_error("can't make socket: %s", spa_strerror(res));
rtp_stream_set_error(impl->stream, res, "Can't make socket");
return;
}
impl->rtp_fd = res;
} else {
static void stream_open_connection(void *data, int *result)
{
int res;
struct impl *impl = data;
if ((res = make_socket(&impl->src_addr, impl->src_len,
&impl->dst_addr, impl->dst_len,
impl->mcast_loop, impl->ttl, impl->dscp,
impl->ifname)) < 0) {
pw_log_error("can't make socket: %s", spa_strerror(res));
rtp_stream_set_error(impl->stream, res, "Can't make socket");
if (result)
*result = res;
return;
}
if (result)
*result = 1;
impl->rtp_fd = res;
}
static void stream_close_connection(void *data, int *result)
{
struct impl *impl = data;
if (impl->rtp_fd > 0) {
if (result)
*result = 1;
close(impl->rtp_fd);
impl->rtp_fd = -1;
} else {
if (result)
*result = 0;
}
}
@ -417,7 +438,9 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *
static const struct rtp_stream_events stream_events = {
RTP_VERSION_STREAM_EVENTS,
.destroy = stream_destroy,
.state_changed = stream_state_changed,
.report_error = stream_report_error,
.open_connection = stream_open_connection,
.close_connection = stream_close_connection,
.param_changed = stream_param_changed,
.send_packet = stream_send_packet,
};
@ -442,8 +465,10 @@ static void impl_destroy(struct impl *impl)
if (impl->core && impl->do_disconnect)
pw_core_disconnect(impl->core);
if (impl->rtp_fd != -1)
if (impl->rtp_fd != -1) {
pw_log_info("closing socket with FD %d as part of shutdown", impl->rtp_fd);
close(impl->rtp_fd);
}
pw_properties_free(impl->stream_props);
pw_properties_free(impl->props);

View file

@ -385,13 +385,13 @@ error:
return res;
}
static int stream_start(struct impl *impl);
static void stream_open_connection(void *data, int *result);
static void on_stream_start_retry_timer_event(void *data, uint64_t expirations)
static void on_open_connection_retry_timer_event(void *data, uint64_t expirations)
{
struct impl *impl = data;
pw_log_debug("trying again to start RTP listener after previous attempt failed with ENODEV");
stream_start(impl);
pw_log_debug("trying again to open connection after previous attempt failed with ENODEV");
stream_open_connection(impl, NULL);
}
static void destroy_stream_start_retry_timer(struct impl *impl)
@ -402,12 +402,23 @@ static void destroy_stream_start_retry_timer(struct impl *impl)
}
}
static int stream_start(struct impl *impl)
static void stream_report_error(void *data, const char *error)
{
struct impl *impl = data;
if (error) {
pw_log_error("stream error: %s", error);
pw_impl_module_schedule_destroy(impl->module);
}
}
static void stream_open_connection(void *data, int *result)
{
int res = 0;
int fd;
struct impl *impl = data;
if (impl->source != NULL)
return 0;
goto finish;
pw_log_info("starting RTP listener");
@ -431,7 +442,7 @@ static int stream_start(struct impl *impl)
struct timespec value, interval;
impl->stream_start_retry_timer = pw_loop_add_timer(impl->main_loop,
on_stream_start_retry_timer_event, impl);
on_open_connection_retry_timer_event, impl);
/* Use a 1-second retry interval. The network interfaces
* are likely to be up and running then. */
value.tv_sec = 1;
@ -445,14 +456,16 @@ static int stream_start(struct impl *impl)
/* It is important to return 0 in this case. Otherwise, the nonzero return
* value will later be propagated through the core as an error. */
return 0;
res = 0;
goto finish;
} else {
pw_log_error("failed to create socket: %m");
/* If ENODEV was returned earlier, and the stream_start_retry_timer
* was consequently created, but then a non-ENODEV error occurred,
* the timer must be stopped and removed. */
destroy_stream_start_retry_timer(impl);
return -errno;
res = -errno;
goto finish;
}
}
@ -465,13 +478,27 @@ static int stream_start(struct impl *impl)
if (impl->source == NULL) {
pw_log_error("can't create io source: %m");
close(fd);
return -errno;
res = -errno;
goto finish;
}
return 0;
finish:
if (res != 0) {
pw_log_error("failed to start RTP stream: %s", spa_strerror(res));
rtp_stream_set_error(impl->stream, res, "Can't start RTP stream");
}
if (result)
*result = res;
}
static void stream_stop(struct impl *impl)
static void stream_close_connection(void *data, int *result)
{
struct impl *impl = data;
if (result)
*result = 0;
if (!impl->source)
return;
@ -489,25 +516,6 @@ static void stream_destroy(void *d)
impl->stream = NULL;
}
static void stream_state_changed(void *data, bool started, const char *error)
{
struct impl *impl = data;
int res;
if (error) {
pw_log_error("stream error: %s", error);
pw_impl_module_schedule_destroy(impl->module);
} else if (started) {
if ((res = stream_start(impl)) < 0) {
pw_log_error("failed to start RTP stream: %s", spa_strerror(res));
rtp_stream_set_error(impl->stream, res, "Can't start RTP stream");
}
} else {
if (!impl->always_process && !impl->standby)
stream_stop(impl);
}
}
static void stream_props_changed(struct impl *impl, uint32_t id, const struct spa_pod *param)
{
struct spa_pod_object *obj = (struct spa_pod_object *)param;
@ -572,7 +580,9 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *
static const struct rtp_stream_events stream_events = {
RTP_VERSION_STREAM_EVENTS,
.destroy = stream_destroy,
.state_changed = stream_state_changed,
.report_error = stream_report_error,
.open_connection = stream_open_connection,
.close_connection = stream_close_connection,
.param_changed = stream_param_changed,
};

View file

@ -34,10 +34,17 @@ PW_LOG_TOPIC_EXTERN(mod_topic);
#define BUFFER_SIZE2 (BUFFER_SIZE>>1)
#define BUFFER_MASK2 (BUFFER_SIZE2-1)
/* IMPORTANT: When using calls that have return values, like
* rtp_stream_emit_open_connection, callers must set the variables
* that receive the return values to a default value, because in
* cases where the callback is not actually set, no call is made,
* and thus, uninitialized return variables remain uninitialized.*/
#define rtp_stream_emit(s,m,v,...) spa_hook_list_call(&s->listener_list, \
struct rtp_stream_events, m, v, ##__VA_ARGS__)
#define rtp_stream_emit_destroy(s) rtp_stream_emit(s, destroy, 0)
#define rtp_stream_emit_state_changed(s,n,e) rtp_stream_emit(s, state_changed,0,n,e)
#define rtp_stream_emit_report_error(s,e) rtp_stream_emit(s, report_error, 0,e)
#define rtp_stream_emit_open_connection(s,r) rtp_stream_emit(s, open_connection, 0,r)
#define rtp_stream_emit_close_connection(s,r) rtp_stream_emit(s, close_connection, 0,r)
#define rtp_stream_emit_param_changed(s,i,p) rtp_stream_emit(s, param_changed,0,i,p)
#define rtp_stream_emit_send_packet(s,i,l) rtp_stream_emit(s, send_packet,0,i,l)
#define rtp_stream_emit_send_feedback(s,seq) rtp_stream_emit(s, send_feedback,0,seq)
@ -140,9 +147,14 @@ struct impl {
static int do_emit_state_changed(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct impl *impl = user_data;
bool *started = (bool *)data;
bool started = *((bool *)data);
if (started) {
rtp_stream_emit_open_connection(impl, NULL);
} else {
rtp_stream_emit_close_connection(impl, NULL);
}
rtp_stream_emit_state_changed(impl, *started, NULL);
return 0;
}
@ -191,6 +203,8 @@ static void stream_destroy(void *d)
static int stream_start(struct impl *impl)
{
int res;
if (impl->started)
return 0;
@ -206,7 +220,14 @@ static int stream_start(struct impl *impl)
impl->reset_ringbuffer(impl);
rtp_stream_emit_state_changed(impl, true, NULL);
res = 0;
rtp_stream_emit_open_connection(impl, &res);
if (res > 0) {
pw_log_debug("opened new connection");
} else if (res < 0) {
pw_log_error("could not open connection: %s", spa_strerror(res));
return res;
}
if (impl->separate_sender) {
struct spa_dict_item items[1];
@ -230,7 +251,7 @@ static int stream_stop(struct impl *impl)
/* if timer is running, the state changed event must be emitted by the timer after all packets have been sent */
if (!impl->timer_running)
rtp_stream_emit_state_changed(impl, false, NULL);
rtp_stream_emit_close_connection(impl, NULL);
if (impl->separate_sender) {
struct spa_dict_item items[1];

View file

@ -35,7 +35,17 @@ struct rtp_stream_events {
void (*destroy) (void *data);
void (*state_changed) (void *data, bool started, const char *error);
void (*report_error) (void *data, const char *error);
/* Requests the network connection to be opened. If result is non-NULL,
* the call sets it to >0 in case of success, and a negative errno error
* code in case of failure. (Result value 0 is unused.) */
void (*open_connection) (void *data, int *result);
/* Requests the network connection to be closed. If result is non-NULL,
* the call sets it to >0 in case of success, 0 if the connection was
* already closed, and a negative errno error code in case of failure. */
void (*close_connection) (void *data, int *result);
void (*param_changed) (void *data, uint32_t id, const struct spa_pod *param);