From 60062432b8188862492732bf6bdbd552ea1f6070 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 26 Mar 2026 09:34:45 +0100 Subject: [PATCH] module-rtp: handle the send_packet/feedback as callbacks They are emited from the streaming thread and therefore can be emitted concurrently with the events on the main thread. This can cause crashes when the hook list is iterated. Instead, make those events into callbacks that are more efficient, and threadsafe. --- src/modules/module-rtp/audio.c | 2 +- src/modules/module-rtp/midi.c | 6 +++--- src/modules/module-rtp/opus.c | 2 +- src/modules/module-rtp/stream.c | 10 ++++++++-- 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/src/modules/module-rtp/audio.c b/src/modules/module-rtp/audio.c index eec37317f..e03ae7ce9 100644 --- a/src/modules/module-rtp/audio.c +++ b/src/modules/module-rtp/audio.c @@ -561,7 +561,7 @@ static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets, uin (double)timestamp * impl->io_position->clock.rate.num / impl->io_position->clock.rate.denom); - rtp_stream_emit_send_packet(impl, iov, 3); + rtp_stream_call_send_packet(impl, iov, 3); impl->seq++; impl->first = false; diff --git a/src/modules/module-rtp/midi.c b/src/modules/module-rtp/midi.c index d5c55740d..1237f66c6 100644 --- a/src/modules/module-rtp/midi.c +++ b/src/modules/module-rtp/midi.c @@ -151,7 +151,7 @@ static int parse_journal(struct impl *impl, uint8_t *packet, uint16_t seq, uint3 return -EINVAL; j = (struct rtp_midi_journal*)packet; uint16_t seqnum = ntohs(j->checkpoint_seqnum); - rtp_stream_emit_send_feedback(impl, seqnum); + rtp_stream_call_send_feedback(impl, seqnum); return 0; } @@ -453,7 +453,7 @@ static void rtp_midi_flush_packets(struct impl *impl, pw_log_trace("sending %d timestamp:%d %u %u", len, timestamp + base, offset, impl->psamples); - rtp_stream_emit_send_packet(impl, iov, 3); + rtp_stream_call_send_packet(impl, iov, 3); impl->seq++; len = 0; @@ -491,7 +491,7 @@ static void rtp_midi_flush_packets(struct impl *impl, iov[2].iov_len = len; pw_log_trace("sending %d timestamp:%d", len, base); - rtp_stream_emit_send_packet(impl, iov, 3); + rtp_stream_call_send_packet(impl, iov, 3); impl->seq++; } } diff --git a/src/modules/module-rtp/opus.c b/src/modules/module-rtp/opus.c index d13a4efaf..9175d4a31 100644 --- a/src/modules/module-rtp/opus.c +++ b/src/modules/module-rtp/opus.c @@ -252,7 +252,7 @@ static void rtp_opus_flush_packets(struct impl *impl) pw_log_trace("sending %d len:%d timestamp:%d", tosend, res, timestamp); iov[1].iov_len = res; - rtp_stream_emit_send_packet(impl, iov, 2); + rtp_stream_call_send_packet(impl, iov, 2); impl->seq++; timestamp += tosend; diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index 992563aea..11bba4f98 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -48,8 +48,11 @@ PW_LOG_TOPIC_EXTERN(mod_topic); #define rtp_stream_emit_open_connection(s,r) rtp_stream_emit(s, open_connection, 0,r) #define rtp_stream_emit_close_connection(s,r) rtp_stream_emit(s, close_connection, 0,r) #define rtp_stream_emit_param_changed(s,i,p) rtp_stream_emit(s, param_changed,0,i,p) -#define rtp_stream_emit_send_packet(s,i,l) rtp_stream_emit(s, send_packet,0,i,l) -#define rtp_stream_emit_send_feedback(s,seq) rtp_stream_emit(s, send_feedback,0,seq) + +#define rtp_stream_call(s,m,v,...) spa_callbacks_call_fast(&s->rtp_callbacks, \ + struct rtp_stream_events, m, v, ##__VA_ARGS__) +#define rtp_stream_call_send_packet(s,i,l) rtp_stream_call(s, send_packet,0,i,l) +#define rtp_stream_call_send_feedback(s,seq) rtp_stream_call(s, send_feedback,0,seq) enum rtp_stream_internal_state { /* The state when the stream is idle / stopped. The background @@ -85,6 +88,8 @@ struct impl { struct spa_hook stream_listener; struct pw_stream_events stream_events; + struct spa_callbacks rtp_callbacks; + struct spa_hook_list listener_list; struct spa_hook listener; @@ -1003,6 +1008,7 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, (res = stream_start(impl)) < 0) goto out; + impl->rtp_callbacks = SPA_CALLBACKS_INIT(events, data); spa_hook_list_append(&impl->listener_list, &impl->listener, events, data); return (struct rtp_stream*)impl;