diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index df32c7871..dff8471d7 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -34,6 +34,7 @@ #include #include #include +#include #include #include @@ -105,77 +106,106 @@ struct impl { struct spa_source *sap_source; - struct pw_stream *playback; - struct spa_hook playback_listener; struct pw_properties *playback_props; unsigned int do_disconnect:1; - uint32_t stride; char *local_ip; int local_port; int sess_latency_msec; + + struct spa_list sessions; +}; + +struct sdp_info { + char origin[128]; + char session[256]; + + struct sockaddr_storage sa; + socklen_t salen; + + uint16_t port; + uint8_t payload; + + struct spa_audio_info_raw info; + uint32_t stride; +}; + +#define BUFFER_SIZE (1u<<16) +#define BUFFER_MASK (BUFFER_SIZE-1) + +struct session { + struct impl *impl; + struct spa_list link; + + struct sdp_info info; + + struct spa_source *source; + + struct pw_stream *playback; + struct spa_hook playback_listener; + + struct spa_ringbuffer ring; + uint8_t buffer[BUFFER_SIZE]; }; static void stream_destroy(void *d) { - struct impl *data = d; - spa_hook_remove(&data->playback_listener); - data->playback = NULL; + struct session *sess = d; + spa_hook_remove(&sess->playback_listener); + sess->playback = NULL; } static void playback_process(void *data) { - struct impl *impl = data; - struct pw_buffer *b; - struct spa_buffer *buf; - uint8_t *dst; + struct session *sess = data; + struct pw_buffer *buf; + struct spa_data *d; + uint32_t index; + int32_t avail, wanted; - if ((b = pw_stream_dequeue_buffer(impl->playback)) == NULL) { + if ((buf = pw_stream_dequeue_buffer(sess->playback)) == NULL) { pw_log_debug("Out of playback buffers: %m"); return; } + d = buf->buffer->datas; - buf = b->buffer; - if ((dst = buf->datas[0].data) == NULL) - return; + wanted = buf->requested ? + SPA_MIN(buf->requested * sess->info.stride, d[0].maxsize) + : d[0].maxsize; - buf->datas[0].chunk->offset = 0; - buf->datas[0].chunk->stride = impl->stride; - buf->datas[0].chunk->size = 0; + avail = spa_ringbuffer_get_read_index(&sess->ring, &index); + if (avail < wanted) { + pw_log_debug("capture underrun %d < %d", avail, wanted); + memset(d[0].data, 0, wanted); + } else { + spa_ringbuffer_read_data(&sess->ring, + sess->buffer, + BUFFER_SIZE, + index & BUFFER_MASK, + d[0].data, wanted); + index += wanted; + spa_ringbuffer_read_update(&sess->ring, index); + } + d[0].chunk->size = wanted; + d[0].chunk->stride = sess->info.stride; + d[0].chunk->offset = 0; + buf->size = wanted / sess->info.stride; - buf->datas[0].chunk->size = 0; - b->size = 0 / impl->stride; - - pw_stream_queue_buffer(impl->playback, b); + pw_stream_queue_buffer(sess->playback, buf); } -static void on_core_error(void *d, uint32_t id, int seq, int res, const char *message) -{ - struct impl *data = d; - - pw_log_error("error id:%u seq:%d res:%d (%s): %s", - id, seq, res, spa_strerror(res), message); - - if (id == PW_ID_CORE && res == -EPIPE) - pw_impl_module_schedule_destroy(data->module); -} - -static const struct pw_core_events core_events = { - PW_VERSION_CORE_EVENTS, - .error = on_core_error, -}; - static void on_stream_state_changed(void *d, enum pw_stream_state old, enum pw_stream_state state, const char *error) { - struct impl *data = d; + struct session *sess = d; + struct impl *impl = sess->impl; switch (state) { case PW_STREAM_STATE_UNCONNECTED: pw_log_info("stream disconnected, unloading"); - pw_impl_module_schedule_destroy(data->module); + pw_impl_module_schedule_destroy(impl->module); break; case PW_STREAM_STATE_ERROR: pw_log_error("stream error: %s", error); @@ -192,85 +222,44 @@ static const struct pw_stream_events out_stream_events = { .process = playback_process }; -static void core_destroy(void *d) -{ - struct impl *data = d; - spa_hook_remove(&data->core_listener); - data->core = NULL; - pw_impl_module_schedule_destroy(data->module); -} - -static const struct pw_proxy_events core_proxy_events = { - .destroy = core_destroy, -}; - -static void impl_destroy(struct impl *data) -{ - if (data->playback) - pw_stream_destroy(data->playback); - if (data->core && data->do_disconnect) - pw_core_disconnect(data->core); - - pw_properties_free(data->playback_props); - pw_properties_free(data->props); - - free(data->local_ip); - free(data); -} - -static void module_destroy(void *d) -{ - struct impl *data = d; - spa_hook_remove(&data->module_listener); - impl_destroy(data); -} - -static const struct pw_impl_module_events module_events = { - PW_VERSION_IMPL_MODULE_EVENTS, - .destroy = module_destroy, -}; - -struct sdp_info { - const char *origin; - const char *session; - - struct sockaddr_storage sa; - socklen_t salen; - - uint16_t port; - uint8_t payload; - - struct spa_audio_info_raw info; -}; - -static int rtp_session_new(struct impl *data, struct sdp_info *sdp) +static int session_new(struct impl *impl, struct sdp_info *sdp) { + struct session *session; const struct spa_pod *params[1]; struct spa_pod_builder b; uint32_t n_params; uint8_t buffer[1024]; + struct pw_properties *props; int res; - data->stride = sdp->info.channels * sizeof(float); - - pw_properties_setf(data->playback_props, PW_KEY_NODE_RATE, "1/%d", sdp->info.rate); - - data->playback = pw_stream_new(data->core, - "rtp-source playback", data->playback_props); - data->playback_props = NULL; - if (data->playback == NULL) + session = calloc(1, sizeof(struct session)); + if (session == NULL) return -errno; - pw_stream_add_listener(data->playback, - &data->playback_listener, - &out_stream_events, data); + session->impl = impl; + session->info = *sdp; + + props = pw_properties_copy(impl->playback_props); + if (props == NULL) + return -errno; + + pw_properties_setf(props, PW_KEY_NODE_RATE, "1/%d", sdp->info.rate); + + session->playback = pw_stream_new(impl->core, + "rtp-source playback", props); + if (session->playback == NULL) + return -errno; + + pw_stream_add_listener(session->playback, + &session->playback_listener, + &out_stream_events, session); n_params = 0; spa_pod_builder_init(&b, buffer, sizeof(buffer)); params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, &sdp->info); - if ((res = pw_stream_connect(data->playback, + if ((res = pw_stream_connect(session->playback, PW_DIRECTION_OUTPUT, PW_ID_ANY, PW_STREAM_FLAG_MAP_BUFFERS | @@ -282,6 +271,14 @@ static int rtp_session_new(struct impl *data, struct sdp_info *sdp) return 0; } +static void session_free(struct session *sess) +{ + spa_list_remove(&sess->link); + if (sess->playback) + pw_stream_destroy(sess->playback); + free(sess); +} + static int parse_sdp_c(struct impl *impl, char *c, struct sdp_info *info) { int res; @@ -364,6 +361,7 @@ static int parse_sdp_a(struct impl *impl, char *c, struct sdp_info *info) c += len; if (spa_strstartswith(c, "L16/")) { info->info.format = SPA_AUDIO_FORMAT_S16_BE; + info->stride = 2; c += 4; } else return -EINVAL; @@ -381,6 +379,8 @@ static int parse_sdp_a(struct impl *impl, char *c, struct sdp_info *info) } else return -EINVAL; + info->stride *= info->info.channels; + return 0; } @@ -401,9 +401,9 @@ static int parse_sdp(struct impl *impl, char *sdp, struct sdp_info *info) goto invalid_version; if (spa_strstartswith(s, "o=")) - info->origin = &s[2]; + snprintf(info->origin, sizeof(info->origin), "%s", &s[2]); else if (spa_strstartswith(s, "s=")) - info->session = &s[2]; + snprintf(info->session, sizeof(info->session), "%s", &s[2]); else if (spa_strstartswith(s, "c=")) res = parse_sdp_c(impl, s, info); else if (spa_strstartswith(s, "m=")) @@ -566,6 +566,62 @@ error: } +static void core_destroy(void *d) +{ + struct impl *impl = d; + spa_hook_remove(&impl->core_listener); + impl->core = NULL; + pw_impl_module_schedule_destroy(impl->module); +} + +static const struct pw_proxy_events core_proxy_events = { + .destroy = core_destroy, +}; + +static void impl_destroy(struct impl *impl) +{ + struct session *sess; + spa_list_consume(sess, &impl->sessions, link) + session_free(sess); + + if (impl->core && impl->do_disconnect) + pw_core_disconnect(impl->core); + + pw_properties_free(impl->playback_props); + pw_properties_free(impl->props); + + free(impl->local_ip); + free(impl); +} + +static void module_destroy(void *d) +{ + struct impl *impl = d; + spa_hook_remove(&impl->module_listener); + impl_destroy(impl); +} + +static const struct pw_impl_module_events module_events = { + PW_VERSION_IMPL_MODULE_EVENTS, + .destroy = module_destroy, +}; + +static void on_core_error(void *d, uint32_t id, int seq, int res, const char *message) +{ + struct impl *impl = d; + + pw_log_error("error id:%u seq:%d res:%d (%s): %s", + id, seq, res, spa_strerror(res), message); + + if (id == PW_ID_CORE && res == -EPIPE) + pw_impl_module_schedule_destroy(impl->module); +} + +static const struct pw_core_events core_events = { + PW_VERSION_CORE_EVENTS, + .error = on_core_error, +}; + static const struct spa_dict_item module_info[] = { { PW_KEY_MODULE_AUTHOR, "Wim Taymans " }, { PW_KEY_MODULE_DESCRIPTION, "rtp source" }, @@ -600,6 +656,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) pw_log_error( "can't create properties: %m"); goto out; } + spa_list_init(&impl->sessions); impl->props = props; playback_props = pw_properties_new(NULL, NULL);