module-rtp: add session timeout and bye

Handle recovery better.
This commit is contained in:
Wim Taymans 2022-10-05 13:47:28 +02:00
parent afc3c12bea
commit 04cc036f94

View file

@ -112,6 +112,7 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
#define ERROR_MSEC 2 #define ERROR_MSEC 2
#define MAX_SESSIONS 16 #define MAX_SESSIONS 16
#define MTU 1500 #define MTU 1500
#define CLEANUP_INTERVAL_SEC 20
struct impl { struct impl {
struct pw_impl_module *module; struct pw_impl_module *module;
@ -125,6 +126,7 @@ struct impl {
struct spa_hook core_listener; struct spa_hook core_listener;
struct spa_hook core_proxy_listener; struct spa_hook core_proxy_listener;
struct spa_source *timer;
struct spa_source *sap_source; struct spa_source *sap_source;
struct pw_properties *stream_props; struct pw_properties *stream_props;
@ -163,6 +165,8 @@ struct session {
struct impl *impl; struct impl *impl;
struct spa_list link; struct spa_list link;
uint64_t timestamp;
struct sdp_info info; struct sdp_info info;
struct spa_source *source; struct spa_source *source;
@ -328,6 +332,7 @@ on_rtp_io(void *data, int fd, uint32_t mask)
if (!sess->have_sync) { if (!sess->have_sync) {
sess->ring.readindex = sess->ring.writeindex = sess->ring.readindex = sess->ring.writeindex =
index = expected_index; index = expected_index;
filled = 0;
sess->have_sync = true; sess->have_sync = true;
sess->buffering = true; sess->buffering = true;
pw_log_info("sync to timestamp %u", index); pw_log_info("sync to timestamp %u", index);
@ -336,10 +341,12 @@ on_rtp_io(void *data, int fd, uint32_t mask)
index / sess->info.stride, index / sess->info.stride,
expected_index / sess->info.stride); expected_index / sess->info.stride);
index = expected_index; index = expected_index;
filled = 0;
} }
if (filled + len > BUFFER_SIZE) { if (filled + len > BUFFER_SIZE) {
pw_log_warn("capture overrun %u %zd", filled, len); pw_log_warn("capture overrun %u %zd", filled, len);
sess->have_sync = false;
} else { } else {
spa_ringbuffer_write_data(&sess->ring, spa_ringbuffer_write_data(&sess->ring,
sess->buffer, sess->buffer,
@ -452,6 +459,13 @@ static uint32_t msec_to_bytes(struct sdp_info *info, uint32_t msec)
return msec * info->stride * info->info.rate / 1000; return msec * info->stride * info->info.rate / 1000;
} }
static void session_touch(struct session *sess)
{
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
sess->timestamp = SPA_TIMESPEC_TO_NSEC(&ts);
}
static int session_new(struct impl *impl, struct sdp_info *info) static int session_new(struct impl *impl, struct sdp_info *info)
{ {
struct session *session; struct session *session;
@ -533,6 +547,7 @@ static int session_new(struct impl *impl, struct sdp_info *info)
if (session->source == NULL) if (session->source == NULL)
return -errno; return -errno;
session_touch(session);
spa_list_append(&impl->sessions, &session->link); spa_list_append(&impl->sessions, &session->link);
impl->n_sessions++; impl->n_sessions++;
@ -541,6 +556,7 @@ static int session_new(struct impl *impl, struct sdp_info *info)
static void session_free(struct session *sess) static void session_free(struct session *sess)
{ {
pw_log_info("free session %s %s", sess->info.origin, sess->info.session);
sess->impl->n_sessions--; sess->impl->n_sessions--;
spa_list_remove(&sess->link); spa_list_remove(&sess->link);
if (sess->stream) if (sess->stream)
@ -723,6 +739,8 @@ static int parse_sap(struct impl *impl, void *data, size_t len)
struct sdp_info info; struct sdp_info info;
struct session *sess; struct session *sess;
int res; int res;
size_t offs;
bool bye;
if (len < 8) if (len < 8)
return -EINVAL; return -EINVAL;
@ -731,7 +749,17 @@ static int parse_sap(struct impl *impl, void *data, size_t len)
if (header->v != 1) if (header->v != 1)
return -EINVAL; return -EINVAL;
mime = SPA_PTROFF(data, 8, char); if (header->e)
return -ENOTSUP;
if (header->c)
return -ENOTSUP;
offs = header->a ? 12 : 8;
offs += header->auth_len * 4;
if (len <= offs)
return -EINVAL;
mime = SPA_PTROFF(data, offs, char);
if (spa_strstartswith(mime, "v=0")) { if (spa_strstartswith(mime, "v=0")) {
sdp = mime; sdp = mime;
mime = "application/sdp"; mime = "application/sdp";
@ -746,12 +774,18 @@ static int parse_sap(struct impl *impl, void *data, size_t len)
if ((res = parse_sdp(impl, sdp, &info)) < 0) if ((res = parse_sdp(impl, sdp, &info)) < 0)
return res; return res;
bye = header->t;
sess = session_find(impl, &info); sess = session_find(impl, &info);
if (sess == NULL) { if (sess == NULL) {
if (!bye)
session_new(impl, &info); session_new(impl, &info);
} else { } else {
if (bye)
session_free(sess);
else
session_touch(sess);
} }
return res; return res;
} }
@ -814,6 +848,23 @@ error:
} }
static void on_timer_event(void *data, uint64_t expirations)
{
struct impl *impl = data;
struct timespec now;
struct session *sess, *tmp;
uint64_t timestamp, interval;
clock_gettime(CLOCK_MONOTONIC, &now);
timestamp = SPA_TIMESPEC_TO_NSEC(&now);
interval = CLEANUP_INTERVAL_SEC * SPA_NSEC_PER_SEC;
spa_list_for_each_safe(sess, tmp, &impl->sessions, link) {
if (sess->timestamp + interval < timestamp)
session_free(sess);
}
}
static void core_destroy(void *d) static void core_destroy(void *d)
{ {
struct impl *impl = d; struct impl *impl = d;
@ -835,6 +886,11 @@ static void impl_destroy(struct impl *impl)
if (impl->core && impl->do_disconnect) if (impl->core && impl->do_disconnect)
pw_core_disconnect(impl->core); pw_core_disconnect(impl->core);
if (impl->sap_source)
pw_loop_destroy_source(impl->loop, impl->sap_source);
if (impl->timer)
pw_loop_destroy_source(impl->loop, impl->timer);
pw_properties_free(impl->stream_props); pw_properties_free(impl->stream_props);
pw_properties_free(impl->props); pw_properties_free(impl->props);
@ -878,6 +934,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
struct impl *impl; struct impl *impl;
struct pw_properties *props = NULL, *stream_props = NULL; struct pw_properties *props = NULL, *stream_props = NULL;
const char *str; const char *str;
struct timespec value, interval;
int res = 0; int res = 0;
PW_LOG_TOPIC_INIT(mod_topic); PW_LOG_TOPIC_INIT(mod_topic);
@ -955,6 +1012,18 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
&impl->core_listener, &impl->core_listener,
&core_events, impl); &core_events, impl);
impl->timer = pw_loop_add_timer(impl->loop, on_timer_event, impl);
if (impl->timer == NULL) {
res = -errno;
pw_log_error("can't create timer source: %m");
goto out;
}
value.tv_sec = 0;
value.tv_nsec = 1;
interval.tv_sec = CLEANUP_INTERVAL_SEC;
interval.tv_nsec = 0;
pw_loop_update_timer(impl->loop, impl->timer, &value, &interval, false);
if ((res = start_sap_listener(impl)) < 0) if ((res = start_sap_listener(impl)) < 0)
goto out; goto out;