module-rtp: work on session setup

This commit is contained in:
Wim Taymans 2022-10-04 20:44:07 +02:00
parent 14194e137f
commit 6f1e96bb59

View file

@ -34,6 +34,7 @@
#include <spa/param/audio/format-utils.h> #include <spa/param/audio/format-utils.h>
#include <spa/utils/hook.h> #include <spa/utils/hook.h>
#include <spa/debug/mem.h> #include <spa/debug/mem.h>
#include <spa/utils/ringbuffer.h>
#include <pipewire/pipewire.h> #include <pipewire/pipewire.h>
#include <pipewire/private.h> #include <pipewire/private.h>
@ -105,77 +106,106 @@ struct impl {
struct spa_source *sap_source; struct spa_source *sap_source;
struct pw_stream *playback;
struct spa_hook playback_listener;
struct pw_properties *playback_props; struct pw_properties *playback_props;
unsigned int do_disconnect:1; unsigned int do_disconnect:1;
uint32_t stride;
char *local_ip; char *local_ip;
int local_port; int local_port;
int sess_latency_msec; int sess_latency_msec;
struct spa_list sessions;
};
struct sdp_info {
char origin[128];
char session[256];
struct sockaddr_storage sa;
socklen_t salen;
uint16_t port;
uint8_t payload;
struct spa_audio_info_raw info;
uint32_t stride;
};
#define BUFFER_SIZE (1u<<16)
#define BUFFER_MASK (BUFFER_SIZE-1)
struct session {
struct impl *impl;
struct spa_list link;
struct sdp_info info;
struct spa_source *source;
struct pw_stream *playback;
struct spa_hook playback_listener;
struct spa_ringbuffer ring;
uint8_t buffer[BUFFER_SIZE];
}; };
static void stream_destroy(void *d) static void stream_destroy(void *d)
{ {
struct impl *data = d; struct session *sess = d;
spa_hook_remove(&data->playback_listener); spa_hook_remove(&sess->playback_listener);
data->playback = NULL; sess->playback = NULL;
} }
static void playback_process(void *data) static void playback_process(void *data)
{ {
struct impl *impl = data; struct session *sess = data;
struct pw_buffer *b; struct pw_buffer *buf;
struct spa_buffer *buf; struct spa_data *d;
uint8_t *dst; uint32_t index;
int32_t avail, wanted;
if ((b = pw_stream_dequeue_buffer(impl->playback)) == NULL) { if ((buf = pw_stream_dequeue_buffer(sess->playback)) == NULL) {
pw_log_debug("Out of playback buffers: %m"); pw_log_debug("Out of playback buffers: %m");
return; return;
} }
d = buf->buffer->datas;
buf = b->buffer; wanted = buf->requested ?
if ((dst = buf->datas[0].data) == NULL) SPA_MIN(buf->requested * sess->info.stride, d[0].maxsize)
return; : d[0].maxsize;
buf->datas[0].chunk->offset = 0; avail = spa_ringbuffer_get_read_index(&sess->ring, &index);
buf->datas[0].chunk->stride = impl->stride;
buf->datas[0].chunk->size = 0;
if (avail < wanted) {
pw_log_debug("capture underrun %d < %d", avail, wanted);
memset(d[0].data, 0, wanted);
} else {
spa_ringbuffer_read_data(&sess->ring,
sess->buffer,
BUFFER_SIZE,
index & BUFFER_MASK,
d[0].data, wanted);
index += wanted;
spa_ringbuffer_read_update(&sess->ring, index);
}
d[0].chunk->size = wanted;
d[0].chunk->stride = sess->info.stride;
d[0].chunk->offset = 0;
buf->size = wanted / sess->info.stride;
buf->datas[0].chunk->size = 0; pw_stream_queue_buffer(sess->playback, buf);
b->size = 0 / impl->stride;
pw_stream_queue_buffer(impl->playback, b);
} }
static void on_core_error(void *d, uint32_t id, int seq, int res, const char *message)
{
struct impl *data = d;
pw_log_error("error id:%u seq:%d res:%d (%s): %s",
id, seq, res, spa_strerror(res), message);
if (id == PW_ID_CORE && res == -EPIPE)
pw_impl_module_schedule_destroy(data->module);
}
static const struct pw_core_events core_events = {
PW_VERSION_CORE_EVENTS,
.error = on_core_error,
};
static void on_stream_state_changed(void *d, enum pw_stream_state old, static void on_stream_state_changed(void *d, enum pw_stream_state old,
enum pw_stream_state state, const char *error) enum pw_stream_state state, const char *error)
{ {
struct impl *data = d; struct session *sess = d;
struct impl *impl = sess->impl;
switch (state) { switch (state) {
case PW_STREAM_STATE_UNCONNECTED: case PW_STREAM_STATE_UNCONNECTED:
pw_log_info("stream disconnected, unloading"); pw_log_info("stream disconnected, unloading");
pw_impl_module_schedule_destroy(data->module); pw_impl_module_schedule_destroy(impl->module);
break; break;
case PW_STREAM_STATE_ERROR: case PW_STREAM_STATE_ERROR:
pw_log_error("stream error: %s", error); pw_log_error("stream error: %s", error);
@ -192,85 +222,44 @@ static const struct pw_stream_events out_stream_events = {
.process = playback_process .process = playback_process
}; };
static void core_destroy(void *d) static int session_new(struct impl *impl, struct sdp_info *sdp)
{
struct impl *data = d;
spa_hook_remove(&data->core_listener);
data->core = NULL;
pw_impl_module_schedule_destroy(data->module);
}
static const struct pw_proxy_events core_proxy_events = {
.destroy = core_destroy,
};
static void impl_destroy(struct impl *data)
{
if (data->playback)
pw_stream_destroy(data->playback);
if (data->core && data->do_disconnect)
pw_core_disconnect(data->core);
pw_properties_free(data->playback_props);
pw_properties_free(data->props);
free(data->local_ip);
free(data);
}
static void module_destroy(void *d)
{
struct impl *data = d;
spa_hook_remove(&data->module_listener);
impl_destroy(data);
}
static const struct pw_impl_module_events module_events = {
PW_VERSION_IMPL_MODULE_EVENTS,
.destroy = module_destroy,
};
struct sdp_info {
const char *origin;
const char *session;
struct sockaddr_storage sa;
socklen_t salen;
uint16_t port;
uint8_t payload;
struct spa_audio_info_raw info;
};
static int rtp_session_new(struct impl *data, struct sdp_info *sdp)
{ {
struct session *session;
const struct spa_pod *params[1]; const struct spa_pod *params[1];
struct spa_pod_builder b; struct spa_pod_builder b;
uint32_t n_params; uint32_t n_params;
uint8_t buffer[1024]; uint8_t buffer[1024];
struct pw_properties *props;
int res; int res;
data->stride = sdp->info.channels * sizeof(float); session = calloc(1, sizeof(struct session));
if (session == NULL)
pw_properties_setf(data->playback_props, PW_KEY_NODE_RATE, "1/%d", sdp->info.rate);
data->playback = pw_stream_new(data->core,
"rtp-source playback", data->playback_props);
data->playback_props = NULL;
if (data->playback == NULL)
return -errno; return -errno;
pw_stream_add_listener(data->playback, session->impl = impl;
&data->playback_listener, session->info = *sdp;
&out_stream_events, data);
props = pw_properties_copy(impl->playback_props);
if (props == NULL)
return -errno;
pw_properties_setf(props, PW_KEY_NODE_RATE, "1/%d", sdp->info.rate);
session->playback = pw_stream_new(impl->core,
"rtp-source playback", props);
if (session->playback == NULL)
return -errno;
pw_stream_add_listener(session->playback,
&session->playback_listener,
&out_stream_events, session);
n_params = 0; n_params = 0;
spa_pod_builder_init(&b, buffer, sizeof(buffer)); spa_pod_builder_init(&b, buffer, sizeof(buffer));
params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat,
&sdp->info); &sdp->info);
if ((res = pw_stream_connect(data->playback, if ((res = pw_stream_connect(session->playback,
PW_DIRECTION_OUTPUT, PW_DIRECTION_OUTPUT,
PW_ID_ANY, PW_ID_ANY,
PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_MAP_BUFFERS |
@ -282,6 +271,14 @@ static int rtp_session_new(struct impl *data, struct sdp_info *sdp)
return 0; return 0;
} }
static void session_free(struct session *sess)
{
spa_list_remove(&sess->link);
if (sess->playback)
pw_stream_destroy(sess->playback);
free(sess);
}
static int parse_sdp_c(struct impl *impl, char *c, struct sdp_info *info) static int parse_sdp_c(struct impl *impl, char *c, struct sdp_info *info)
{ {
int res; int res;
@ -364,6 +361,7 @@ static int parse_sdp_a(struct impl *impl, char *c, struct sdp_info *info)
c += len; c += len;
if (spa_strstartswith(c, "L16/")) { if (spa_strstartswith(c, "L16/")) {
info->info.format = SPA_AUDIO_FORMAT_S16_BE; info->info.format = SPA_AUDIO_FORMAT_S16_BE;
info->stride = 2;
c += 4; c += 4;
} else } else
return -EINVAL; return -EINVAL;
@ -381,6 +379,8 @@ static int parse_sdp_a(struct impl *impl, char *c, struct sdp_info *info)
} else } else
return -EINVAL; return -EINVAL;
info->stride *= info->info.channels;
return 0; return 0;
} }
@ -401,9 +401,9 @@ static int parse_sdp(struct impl *impl, char *sdp, struct sdp_info *info)
goto invalid_version; goto invalid_version;
if (spa_strstartswith(s, "o=")) if (spa_strstartswith(s, "o="))
info->origin = &s[2]; snprintf(info->origin, sizeof(info->origin), "%s", &s[2]);
else if (spa_strstartswith(s, "s=")) else if (spa_strstartswith(s, "s="))
info->session = &s[2]; snprintf(info->session, sizeof(info->session), "%s", &s[2]);
else if (spa_strstartswith(s, "c=")) else if (spa_strstartswith(s, "c="))
res = parse_sdp_c(impl, s, info); res = parse_sdp_c(impl, s, info);
else if (spa_strstartswith(s, "m=")) else if (spa_strstartswith(s, "m="))
@ -566,6 +566,62 @@ error:
} }
static void core_destroy(void *d)
{
struct impl *impl = d;
spa_hook_remove(&impl->core_listener);
impl->core = NULL;
pw_impl_module_schedule_destroy(impl->module);
}
static const struct pw_proxy_events core_proxy_events = {
.destroy = core_destroy,
};
static void impl_destroy(struct impl *impl)
{
struct session *sess;
spa_list_consume(sess, &impl->sessions, link)
session_free(sess);
if (impl->core && impl->do_disconnect)
pw_core_disconnect(impl->core);
pw_properties_free(impl->playback_props);
pw_properties_free(impl->props);
free(impl->local_ip);
free(impl);
}
static void module_destroy(void *d)
{
struct impl *impl = d;
spa_hook_remove(&impl->module_listener);
impl_destroy(impl);
}
static const struct pw_impl_module_events module_events = {
PW_VERSION_IMPL_MODULE_EVENTS,
.destroy = module_destroy,
};
static void on_core_error(void *d, uint32_t id, int seq, int res, const char *message)
{
struct impl *impl = d;
pw_log_error("error id:%u seq:%d res:%d (%s): %s",
id, seq, res, spa_strerror(res), message);
if (id == PW_ID_CORE && res == -EPIPE)
pw_impl_module_schedule_destroy(impl->module);
}
static const struct pw_core_events core_events = {
PW_VERSION_CORE_EVENTS,
.error = on_core_error,
};
static const struct spa_dict_item module_info[] = { static const struct spa_dict_item module_info[] = {
{ PW_KEY_MODULE_AUTHOR, "Wim Taymans <wim.taymans@gmail.com>" }, { PW_KEY_MODULE_AUTHOR, "Wim Taymans <wim.taymans@gmail.com>" },
{ PW_KEY_MODULE_DESCRIPTION, "rtp source" }, { PW_KEY_MODULE_DESCRIPTION, "rtp source" },
@ -600,6 +656,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
pw_log_error( "can't create properties: %m"); pw_log_error( "can't create properties: %m");
goto out; goto out;
} }
spa_list_init(&impl->sessions);
impl->props = props; impl->props = props;
playback_props = pw_properties_new(NULL, NULL); playback_props = pw_properties_new(NULL, NULL);