diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index c34fe99fc..e76b94c13 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -112,6 +112,7 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define ERROR_MSEC 2 #define MAX_SESSIONS 16 #define MTU 1500 +#define CLEANUP_INTERVAL_SEC 20 struct impl { struct pw_impl_module *module; @@ -125,6 +126,7 @@ struct impl { struct spa_hook core_listener; struct spa_hook core_proxy_listener; + struct spa_source *timer; struct spa_source *sap_source; struct pw_properties *stream_props; @@ -163,6 +165,8 @@ struct session { struct impl *impl; struct spa_list link; + uint64_t timestamp; + struct sdp_info info; struct spa_source *source; @@ -328,6 +332,7 @@ on_rtp_io(void *data, int fd, uint32_t mask) if (!sess->have_sync) { sess->ring.readindex = sess->ring.writeindex = index = expected_index; + filled = 0; sess->have_sync = true; sess->buffering = true; pw_log_info("sync to timestamp %u", index); @@ -336,10 +341,12 @@ on_rtp_io(void *data, int fd, uint32_t mask) index / sess->info.stride, expected_index / sess->info.stride); index = expected_index; + filled = 0; } if (filled + len > BUFFER_SIZE) { pw_log_warn("capture overrun %u %zd", filled, len); + sess->have_sync = false; } else { spa_ringbuffer_write_data(&sess->ring, sess->buffer, @@ -452,6 +459,13 @@ static uint32_t msec_to_bytes(struct sdp_info *info, uint32_t msec) return msec * info->stride * info->info.rate / 1000; } +static void session_touch(struct session *sess) +{ + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + sess->timestamp = SPA_TIMESPEC_TO_NSEC(&ts); +} + static int session_new(struct impl *impl, struct sdp_info *info) { struct session *session; @@ -533,6 +547,7 @@ static int session_new(struct impl *impl, struct sdp_info *info) if (session->source == NULL) return -errno; + session_touch(session); spa_list_append(&impl->sessions, &session->link); impl->n_sessions++; @@ -541,6 +556,7 @@ static int session_new(struct impl *impl, struct sdp_info *info) static void session_free(struct session *sess) { + pw_log_info("free session %s %s", sess->info.origin, sess->info.session); sess->impl->n_sessions--; spa_list_remove(&sess->link); if (sess->stream) @@ -723,6 +739,8 @@ static int parse_sap(struct impl *impl, void *data, size_t len) struct sdp_info info; struct session *sess; int res; + size_t offs; + bool bye; if (len < 8) return -EINVAL; @@ -731,7 +749,17 @@ static int parse_sap(struct impl *impl, void *data, size_t len) if (header->v != 1) return -EINVAL; - mime = SPA_PTROFF(data, 8, char); + if (header->e) + return -ENOTSUP; + if (header->c) + return -ENOTSUP; + + offs = header->a ? 12 : 8; + offs += header->auth_len * 4; + if (len <= offs) + return -EINVAL; + + mime = SPA_PTROFF(data, offs, char); if (spa_strstartswith(mime, "v=0")) { sdp = mime; mime = "application/sdp"; @@ -746,12 +774,18 @@ static int parse_sap(struct impl *impl, void *data, size_t len) if ((res = parse_sdp(impl, sdp, &info)) < 0) return res; + bye = header->t; + sess = session_find(impl, &info); if (sess == NULL) { - session_new(impl, &info); + if (!bye) + session_new(impl, &info); } else { + if (bye) + session_free(sess); + else + session_touch(sess); } - return res; } @@ -814,6 +848,23 @@ error: } +static void on_timer_event(void *data, uint64_t expirations) +{ + struct impl *impl = data; + struct timespec now; + struct session *sess, *tmp; + uint64_t timestamp, interval; + + clock_gettime(CLOCK_MONOTONIC, &now); + timestamp = SPA_TIMESPEC_TO_NSEC(&now); + interval = CLEANUP_INTERVAL_SEC * SPA_NSEC_PER_SEC; + + spa_list_for_each_safe(sess, tmp, &impl->sessions, link) { + if (sess->timestamp + interval < timestamp) + session_free(sess); + } +} + static void core_destroy(void *d) { struct impl *impl = d; @@ -835,6 +886,11 @@ static void impl_destroy(struct impl *impl) if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); + if (impl->sap_source) + pw_loop_destroy_source(impl->loop, impl->sap_source); + if (impl->timer) + pw_loop_destroy_source(impl->loop, impl->timer); + pw_properties_free(impl->stream_props); pw_properties_free(impl->props); @@ -878,6 +934,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) struct impl *impl; struct pw_properties *props = NULL, *stream_props = NULL; const char *str; + struct timespec value, interval; int res = 0; PW_LOG_TOPIC_INIT(mod_topic); @@ -955,6 +1012,18 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) &impl->core_listener, &core_events, impl); + impl->timer = pw_loop_add_timer(impl->loop, on_timer_event, impl); + if (impl->timer == NULL) { + res = -errno; + pw_log_error("can't create timer source: %m"); + goto out; + } + value.tv_sec = 0; + value.tv_nsec = 1; + interval.tv_sec = CLEANUP_INTERVAL_SEC; + interval.tv_nsec = 0; + pw_loop_update_timer(impl->loop, impl->timer, &value, &interval, false); + if ((res = start_sap_listener(impl)) < 0) goto out;