module-rtp: handle the send_packet/feedback as callbacks

They are emited from the streaming thread and therefore can be emitted
concurrently with the events on the main thread. This can cause crashes
when the hook list is iterated.

Instead, make those events into callbacks that are more efficient,
and threadsafe.
This commit is contained in:
Wim Taymans 2026-03-26 09:34:45 +01:00
parent 50fcf64058
commit 60062432b8
4 changed files with 13 additions and 7 deletions

View file

@ -561,7 +561,7 @@ static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets, uin
(double)timestamp * impl->io_position->clock.rate.num /
impl->io_position->clock.rate.denom);
rtp_stream_emit_send_packet(impl, iov, 3);
rtp_stream_call_send_packet(impl, iov, 3);
impl->seq++;
impl->first = false;

View file

@ -151,7 +151,7 @@ static int parse_journal(struct impl *impl, uint8_t *packet, uint16_t seq, uint3
return -EINVAL;
j = (struct rtp_midi_journal*)packet;
uint16_t seqnum = ntohs(j->checkpoint_seqnum);
rtp_stream_emit_send_feedback(impl, seqnum);
rtp_stream_call_send_feedback(impl, seqnum);
return 0;
}
@ -453,7 +453,7 @@ static void rtp_midi_flush_packets(struct impl *impl,
pw_log_trace("sending %d timestamp:%d %u %u",
len, timestamp + base,
offset, impl->psamples);
rtp_stream_emit_send_packet(impl, iov, 3);
rtp_stream_call_send_packet(impl, iov, 3);
impl->seq++;
len = 0;
@ -491,7 +491,7 @@ static void rtp_midi_flush_packets(struct impl *impl,
iov[2].iov_len = len;
pw_log_trace("sending %d timestamp:%d", len, base);
rtp_stream_emit_send_packet(impl, iov, 3);
rtp_stream_call_send_packet(impl, iov, 3);
impl->seq++;
}
}

View file

@ -252,7 +252,7 @@ static void rtp_opus_flush_packets(struct impl *impl)
pw_log_trace("sending %d len:%d timestamp:%d", tosend, res, timestamp);
iov[1].iov_len = res;
rtp_stream_emit_send_packet(impl, iov, 2);
rtp_stream_call_send_packet(impl, iov, 2);
impl->seq++;
timestamp += tosend;

View file

@ -48,8 +48,11 @@ PW_LOG_TOPIC_EXTERN(mod_topic);
#define rtp_stream_emit_open_connection(s,r) rtp_stream_emit(s, open_connection, 0,r)
#define rtp_stream_emit_close_connection(s,r) rtp_stream_emit(s, close_connection, 0,r)
#define rtp_stream_emit_param_changed(s,i,p) rtp_stream_emit(s, param_changed,0,i,p)
#define rtp_stream_emit_send_packet(s,i,l) rtp_stream_emit(s, send_packet,0,i,l)
#define rtp_stream_emit_send_feedback(s,seq) rtp_stream_emit(s, send_feedback,0,seq)
#define rtp_stream_call(s,m,v,...) spa_callbacks_call_fast(&s->rtp_callbacks, \
struct rtp_stream_events, m, v, ##__VA_ARGS__)
#define rtp_stream_call_send_packet(s,i,l) rtp_stream_call(s, send_packet,0,i,l)
#define rtp_stream_call_send_feedback(s,seq) rtp_stream_call(s, send_feedback,0,seq)
enum rtp_stream_internal_state {
/* The state when the stream is idle / stopped. The background
@ -85,6 +88,8 @@ struct impl {
struct spa_hook stream_listener;
struct pw_stream_events stream_events;
struct spa_callbacks rtp_callbacks;
struct spa_hook_list listener_list;
struct spa_hook listener;
@ -1003,6 +1008,7 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core,
(res = stream_start(impl)) < 0)
goto out;
impl->rtp_callbacks = SPA_CALLBACKS_INIT(events, data);
spa_hook_list_append(&impl->listener_list, &impl->listener, events, data);
return (struct rtp_stream*)impl;