stream: Align with pw_filter

Remove some of the unused states in pw_stream. The app can know the
state by following the format and buffer events.

Make it possible to be notified of io are updates. This should make it
possible to follow the transport etc.

Make it possible to be notified of any param changes.

Rename finish_format to update_params because that is what it does.
Make this work in the same was as the filter: updating the params
removes all old params of the types and installs the new ones.

Don't get the Props and PropInfo from the node proxy, instead get them
directly from the adapter that we have locally. Update the controls
directly on the adapter instead of going to the server first.
This commit is contained in:
Wim Taymans 2019-11-21 16:14:50 +01:00
parent be53554def
commit 738603fd04
11 changed files with 232 additions and 268 deletions

View file

@ -310,7 +310,7 @@ snd_pcm_pipewire_process_record(snd_pcm_pipewire_t *pw, struct pw_buffer *b)
return 0;
}
static void on_stream_format_changed(void *data, const struct spa_pod *format)
static void on_stream_param_changed(void *data, uint32_t id, const struct spa_pod *param)
{
snd_pcm_pipewire_t *pw = data;
snd_pcm_ioplug_t *io = &pw->io;
@ -322,6 +322,9 @@ static void on_stream_format_changed(void *data, const struct spa_pod *format)
uint32_t buffers;
uint32_t size;
if (param == NULL || id != SPA_PARAM_Format)
return;
io->period_size = pw->min_avail;
buffers = SPA_CLAMP(io->buffer_size / io->period_size, MIN_BUFFERS, MAX_BUFFERS);
size = io->period_size * stride;
@ -337,7 +340,7 @@ static void on_stream_format_changed(void *data, const struct spa_pod *format)
SPA_PARAM_BUFFERS_stride, SPA_POD_Int(stride),
SPA_PARAM_BUFFERS_align, SPA_POD_Int(16));
pw_stream_finish_format(pw->stream, 0, params, n_params);
pw_stream_update_params(pw->stream, params, n_params);
}
static void on_stream_process(void *data)
@ -360,7 +363,7 @@ static void on_stream_process(void *data)
static const struct pw_stream_events stream_events = {
PW_VERSION_STREAM_EVENTS,
.format_changed = on_stream_format_changed,
.param_changed = on_stream_param_changed,
.process = on_stream_process,
};

View file

@ -242,27 +242,20 @@ static void stream_state_changed(void *data, enum pw_stream_state old,
case PW_STREAM_STATE_CONNECTING:
pa_stream_set_state(s, PA_STREAM_CREATING);
break;
case PW_STREAM_STATE_CONFIGURE:
case PW_STREAM_STATE_READY:
break;
case PW_STREAM_STATE_PAUSED:
if (old < PW_STREAM_STATE_PAUSED) {
if (s->suspended && s->suspended_callback) {
s->suspended = false;
s->suspended_callback(s, s->suspended_userdata);
}
configure_device(s);
configure_buffers(s);
pa_stream_set_state(s, PA_STREAM_READY);
}
else {
if (!s->suspended && s->suspended_callback) {
s->suspended = true;
s->suspended_callback(s, s->suspended_userdata);
}
if (!s->suspended && s->suspended_callback) {
s->suspended = true;
s->suspended_callback(s, s->suspended_userdata);
}
break;
case PW_STREAM_STATE_STREAMING:
if (s->suspended && s->suspended_callback) {
s->suspended = false;
s->suspended_callback(s, s->suspended_userdata);
}
configure_device(s);
configure_buffers(s);
pa_stream_set_state(s, PA_STREAM_READY);
break;
}
}
@ -359,7 +352,7 @@ static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flag
dump_buffer_attr(s, attr);
}
static void stream_format_changed(void *data, const struct spa_pod *format)
static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *param)
{
pa_stream *s = data;
const struct spa_pod *params[4];
@ -367,28 +360,25 @@ static void stream_format_changed(void *data, const struct spa_pod *format)
uint8_t buffer[4096];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
struct spa_audio_info info = { 0 };
int res;
unsigned int i;
if (format == NULL) {
res = 0;
goto done;
}
if (param == NULL || id != SPA_PARAM_Format)
return;
spa_format_parse(format, &info.media_type, &info.media_subtype);
spa_format_parse(param, &info.media_type, &info.media_subtype);
if (info.media_type != SPA_MEDIA_TYPE_audio ||
info.media_subtype != SPA_MEDIA_SUBTYPE_raw ||
spa_format_audio_raw_parse(format, &info.info.raw) < 0 ||
spa_format_audio_raw_parse(param, &info.info.raw) < 0 ||
!SPA_AUDIO_FORMAT_IS_INTERLEAVED(info.info.raw.format)) {
res = -EINVAL;
goto done;
pw_stream_set_error(s->stream, -EINVAL, "unhandled format");
return;
}
s->sample_spec.format = format_id2pa(s, info.info.raw.format);
if (s->sample_spec.format == PA_SAMPLE_INVALID) {
res = -EINVAL;
goto done;
pw_stream_set_error(s->stream, -EINVAL, "invalid format");
return;
}
s->sample_spec.rate = info.info.raw.rate;
s->sample_spec.channels = info.info.raw.channels;
@ -409,10 +399,7 @@ static void stream_format_changed(void *data, const struct spa_pod *format)
params[n_params++] = get_buffers_param(s, &s->buffer_attr, &b);
res = 0;
done:
pw_stream_finish_format(s->stream, res, params, n_params);
pw_stream_update_params(s->stream, params, n_params);
}
static void stream_control_info(void *data, uint32_t id, const struct pw_stream_control *control)
@ -526,7 +513,7 @@ static const struct pw_stream_events stream_events =
{
PW_VERSION_STREAM_EVENTS,
.state_changed = stream_state_changed,
.format_changed = stream_format_changed,
.param_changed = stream_param_changed,
.control_info = stream_control_info,
.add_buffer = stream_add_buffer,
.remove_buffer = stream_remove_buffer,

View file

@ -220,7 +220,7 @@ static void on_stream_state_changed(void *_data, enum pw_stream_state old,
case PW_STREAM_STATE_UNCONNECTED:
pw_main_loop_quit(data->loop);
break;
case PW_STREAM_STATE_CONFIGURE:
case PW_STREAM_STATE_PAUSED:
/* because we started inactive, activate ourselves now */
pw_stream_set_active(data->stream, true);
break;
@ -229,7 +229,8 @@ static void on_stream_state_changed(void *_data, enum pw_stream_state old,
}
}
/* Be notified when the stream format changes.
/* Be notified when the stream param changes. We're only looking at the
* format changes.
*
* We are now supposed to call pw_stream_finish_format() with success or
* failure, depending on if we can support the format. Because we gave
@ -240,7 +241,7 @@ static void on_stream_state_changed(void *_data, enum pw_stream_state old,
* that we would like on our buffer, the size, alignment, etc.
*/
static void
on_stream_format_changed(void *_data, const struct spa_pod *format)
on_stream_param_changed(void *_data, uint32_t id, const struct spa_pod *param)
{
struct data *data = _data;
struct pw_stream *stream = data->stream;
@ -251,16 +252,14 @@ on_stream_format_changed(void *_data, const struct spa_pod *format)
void *d;
/* NULL means to clear the format */
if (format == NULL) {
pw_stream_finish_format(stream, 0, NULL, 0);
if (param == NULL || id != SPA_PARAM_Format)
return;
}
fprintf(stderr, "got format:\n");
spa_debug_format(2, NULL, format);
spa_debug_format(2, NULL, param);
/* call a helper function to parse the format for us. */
spa_format_video_raw_parse(format, &data->format);
spa_format_video_raw_parse(param, &data->format);
if (data->format.format == SPA_VIDEO_FORMAT_RGBA_F32)
sdl_format = SDL_PIXELFORMAT_RGBA32;
@ -268,7 +267,7 @@ on_stream_format_changed(void *_data, const struct spa_pod *format)
sdl_format = id_to_sdl_format(data->format.format);
if (sdl_format == SDL_PIXELFORMAT_UNKNOWN) {
pw_stream_finish_format(stream, -EINVAL, NULL, 0);
pw_stream_set_error(stream, -EINVAL, "unknown pixel format");
return;
}
@ -317,14 +316,14 @@ on_stream_format_changed(void *_data, const struct spa_pod *format)
CURSOR_META_SIZE(256,256)));
/* we are done */
pw_stream_finish_format(stream, 0, params, 4);
pw_stream_update_params(stream, params, 4);
}
/* these are the stream events we listen for */
static const struct pw_stream_events stream_events = {
PW_VERSION_STREAM_EVENTS,
.state_changed = on_stream_state_changed,
.format_changed = on_stream_format_changed,
.param_changed = on_stream_param_changed,
.process = on_process,
};

View file

@ -188,7 +188,7 @@ static void on_stream_state_changed(void *_data, enum pw_stream_state old, enum
printf("stream state: \"%s\"\n", pw_stream_state_as_string(state));
switch (state) {
case PW_STREAM_STATE_CONFIGURE:
case PW_STREAM_STATE_PAUSED:
printf("node id: %d\n", pw_stream_get_node_id(data->stream));
break;
case PW_STREAM_STATE_STREAMING:
@ -271,18 +271,19 @@ static void on_stream_remove_buffer(void *_data, struct pw_buffer *buffer)
close(d[0].fd);
}
/* Be notified when the stream format changes.
/* Be notified when the stream param changes. We're only looking at the
* format param.
*
* We are now supposed to call pw_stream_finish_format() with success or
* We are now supposed to call pw_stream_update_params() with success or
* failure, depending on if we can support the format. Because we gave
* a list of supported formats, this should be ok.
*
* As part of pw_stream_finish_format() we can provide parameters that
* As part of pw_stream_update_params() we can provide parameters that
* will control the buffer memory allocation. This includes the metadata
* that we would like on our buffer, the size, alignment, etc.
*/
static void
on_stream_format_changed(void *_data, const struct spa_pod *format)
on_stream_param_changed(void *_data, uint32_t id, const struct spa_pod *param)
{
struct data *data = _data;
struct pw_stream *stream = data->stream;
@ -290,11 +291,10 @@ on_stream_format_changed(void *_data, const struct spa_pod *format)
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(params_buffer, sizeof(params_buffer));
const struct spa_pod *params[5];
if (format == NULL) {
pw_stream_finish_format(stream, 0, NULL, 0);
if (param == NULL || id != SPA_PARAM_Format)
return;
}
spa_format_video_raw_parse(format, &data->format);
spa_format_video_raw_parse(param, &data->format);
data->stride = SPA_ROUND_UP_N(data->format.size.width * BPP, 4);
@ -330,13 +330,13 @@ on_stream_format_changed(void *_data, const struct spa_pod *format)
SPA_PARAM_META_size, SPA_POD_Int(
CURSOR_META_SIZE(CURSOR_WIDTH,CURSOR_HEIGHT)));
pw_stream_finish_format(stream, 0, params, 5);
pw_stream_update_params(stream, params, 5);
}
static const struct pw_stream_events stream_events = {
PW_VERSION_STREAM_EVENTS,
.state_changed = on_stream_state_changed,
.format_changed = on_stream_format_changed,
.param_changed = on_stream_param_changed,
.add_buffer = on_stream_add_buffer,
.remove_buffer = on_stream_remove_buffer,
};

View file

@ -186,7 +186,7 @@ static void on_stream_state_changed(void *_data, enum pw_stream_state old, enum
printf("stream state: \"%s\"\n", pw_stream_state_as_string(state));
switch (state) {
case PW_STREAM_STATE_CONFIGURE:
case PW_STREAM_STATE_PAUSED:
printf("node id: %d\n", pw_stream_get_node_id(data->stream));
break;
case PW_STREAM_STATE_STREAMING:
@ -210,7 +210,7 @@ static void on_stream_state_changed(void *_data, enum pw_stream_state old, enum
}
static void
on_stream_format_changed(void *_data, const struct spa_pod *format)
on_stream_param_changed(void *_data, uint32_t id, const struct spa_pod *param)
{
struct data *data = _data;
struct pw_stream *stream = data->stream;
@ -218,11 +218,10 @@ on_stream_format_changed(void *_data, const struct spa_pod *format)
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(params_buffer, sizeof(params_buffer));
const struct spa_pod *params[5];
if (format == NULL) {
pw_stream_finish_format(stream, 0, NULL, 0);
if (param == NULL || id != SPA_PARAM_Format)
return;
}
spa_format_video_raw_parse(format, &data->format);
spa_format_video_raw_parse(param, &data->format);
data->stride = SPA_ROUND_UP_N(data->format.size.width * BPP, 4);
@ -258,13 +257,13 @@ on_stream_format_changed(void *_data, const struct spa_pod *format)
SPA_PARAM_META_size, SPA_POD_Int(
CURSOR_META_SIZE(CURSOR_WIDTH,CURSOR_HEIGHT)));
pw_stream_finish_format(stream, 0, params, 5);
pw_stream_update_params(stream, params, 5);
}
static const struct pw_stream_events stream_events = {
PW_VERSION_STREAM_EVENTS,
.state_changed = on_stream_state_changed,
.format_changed = on_stream_format_changed,
.param_changed = on_stream_param_changed,
};
static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remote_state state, const char *error)

View file

@ -265,7 +265,7 @@ pool_activated (GstPipeWirePool *pool, GstPipeWireSink *sink)
pw_thread_loop_lock (sink->main_loop);
pw_stream_finish_format (sink->stream, 0, port_params, 2);
pw_stream_update_params (sink->stream, port_params, 2);
pw_thread_loop_unlock (sink->main_loop);
}
@ -488,8 +488,6 @@ on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state sta
switch (state) {
case PW_STREAM_STATE_UNCONNECTED:
case PW_STREAM_STATE_CONNECTING:
case PW_STREAM_STATE_CONFIGURE:
case PW_STREAM_STATE_READY:
case PW_STREAM_STATE_PAUSED:
case PW_STREAM_STATE_STREAMING:
break;
@ -502,11 +500,11 @@ on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state sta
}
static void
on_format_changed (void *data, const struct spa_pod *format)
on_param_changed (void *data, uint32_t id, const struct spa_pod *param)
{
GstPipeWireSink *pwsink = data;
if (format == NULL)
if (param == NULL || id != SPA_PARAM_Format)
return;
if (gst_buffer_pool_is_active (GST_BUFFER_POOL_CAST (pwsink->pool)))
@ -550,7 +548,7 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
while (TRUE) {
state = pw_stream_get_state (pwsink->stream, &error);
if (state == PW_STREAM_STATE_READY)
if (state == PW_STREAM_STATE_PAUSED)
break;
if (state == PW_STREAM_STATE_ERROR)
@ -645,7 +643,7 @@ copy_properties (GQuark field_id,
static const struct pw_stream_events stream_events = {
PW_VERSION_STREAM_EVENTS,
.state_changed = on_state_changed,
.format_changed = on_format_changed,
.param_changed = on_param_changed,
.add_buffer = on_add_buffer,
.remove_buffer = on_remove_buffer,
.process = on_process,

View file

@ -454,8 +454,6 @@ on_state_changed (void *data,
switch (state) {
case PW_STREAM_STATE_UNCONNECTED:
case PW_STREAM_STATE_CONNECTING:
case PW_STREAM_STATE_CONFIGURE:
case PW_STREAM_STATE_READY:
case PW_STREAM_STATE_PAUSED:
case PW_STREAM_STATE_STREAMING:
break;
@ -686,22 +684,21 @@ connect_error:
}
static void
on_format_changed (void *data,
const struct spa_pod *format)
on_param_changed (void *data, uint32_t id,
const struct spa_pod *param)
{
GstPipeWireSrc *pwsrc = data;
GstCaps *caps;
gboolean res;
if (format == NULL) {
if (param == NULL || id != SPA_PARAM_Format) {
GST_DEBUG_OBJECT (pwsrc, "clear format");
pw_stream_finish_format (pwsrc->stream, 0, NULL, 0);
return;
}
gst_pipewire_clock_reset (GST_PIPEWIRE_CLOCK (pwsrc->clock), 0);
caps = gst_caps_from_format (format);
caps = gst_caps_from_format (param);
GST_DEBUG_OBJECT (pwsrc, "we got format %" GST_PTR_FORMAT, caps);
res = gst_base_src_set_caps (GST_BASE_SRC (pwsrc), caps);
gst_caps_unref (caps);
@ -726,10 +723,10 @@ on_format_changed (void *data,
SPA_PARAM_META_size, SPA_POD_Int(sizeof (struct spa_meta_header)));
GST_DEBUG_OBJECT (pwsrc, "doing finish format");
pw_stream_finish_format (pwsrc->stream, 0, params, 2);
pw_stream_update_params (pwsrc->stream, params, 2);
} else {
GST_WARNING_OBJECT (pwsrc, "finish format with error");
pw_stream_finish_format (pwsrc->stream, -EINVAL, NULL, 0);
pw_stream_set_error (pwsrc->stream, -EINVAL, "unhandled format");
}
}
@ -960,7 +957,7 @@ static const struct pw_remote_events remote_events = {
static const struct pw_stream_events stream_events = {
PW_VERSION_STREAM_EVENTS,
.state_changed = on_state_changed,
.format_changed = on_format_changed,
.param_changed = on_param_changed,
.add_buffer = on_add_buffer,
.remove_buffer = on_remove_buffer,
.process = on_process,

View file

@ -757,7 +757,8 @@ struct pw_remote {
#define pw_stream_emit(s,m,v,...) spa_hook_list_call(&s->listener_list, struct pw_stream_events, m, v, ##__VA_ARGS__)
#define pw_stream_emit_destroy(s) pw_stream_emit(s, destroy, 0)
#define pw_stream_emit_state_changed(s,o,n,e) pw_stream_emit(s, state_changed,0,o,n,e)
#define pw_stream_emit_format_changed(s,f) pw_stream_emit(s, format_changed,0,f)
#define pw_stream_emit_io_changed(s,i,a,t) pw_stream_emit(s, io_changed,0,i,a,t)
#define pw_stream_emit_param_changed(s,i,p) pw_stream_emit(s, param_changed,0,i,p)
#define pw_stream_emit_add_buffer(s,b) pw_stream_emit(s, add_buffer, 0, b)
#define pw_stream_emit_remove_buffer(s,b) pw_stream_emit(s, remove_buffer, 0, b)
#define pw_stream_emit_process(s) pw_stream_emit(s, process, 0)

View file

@ -72,10 +72,7 @@ struct data {
};
struct param {
#define PARAM_TYPE_INIT (1 << 0)
#define PARAM_TYPE_OTHER (1 << 1)
#define PARAM_TYPE_FORMAT (1 << 2)
int type;
uint32_t id;
struct spa_list link;
struct spa_pod *param;
};
@ -121,8 +118,6 @@ struct stream {
struct buffer buffers[MAX_BUFFERS];
uint32_t n_buffers;
uint32_t pending_seq;
struct queue dequeued;
struct queue queued;
@ -133,7 +128,6 @@ struct stream {
unsigned int async_connect:1;
unsigned int disconnecting:1;
unsigned int free_data:1;
unsigned int subscribe:1;
unsigned int alloc_buffers:1;
unsigned int draining:1;
};
@ -156,30 +150,29 @@ static int get_param_index(uint32_t id)
}
}
static struct param *add_param(struct pw_stream *stream,
int type, const struct spa_pod *param)
static struct param *add_param(struct stream *impl,
uint32_t id, const struct spa_pod *param)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct param *p;
uint32_t id;
int idx;
if (param == NULL || !spa_pod_is_object(param)) {
errno = EINVAL;
return NULL;
}
if (id == SPA_ID_INVALID)
id = SPA_POD_OBJECT_ID(param);
p = malloc(sizeof(struct param) + SPA_POD_SIZE(param));
if (p == NULL)
return NULL;
p->type = type;
p->id = id;
p->param = SPA_MEMBER(p, sizeof(struct param), struct spa_pod);
memcpy(p->param, param, SPA_POD_SIZE(param));
spa_list_append(&impl->param_list, &p->link);
id = ((const struct spa_pod_object *)param)->body.id;
idx = get_param_index(id);
if (idx != -1)
impl->params[idx].flags |= SPA_PARAM_INFO_READ;
@ -187,19 +180,43 @@ static struct param *add_param(struct pw_stream *stream,
return p;
}
static void clear_params(struct pw_stream *stream, int type)
static void clear_params(struct stream *impl, uint32_t id)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct param *p, *t;
spa_list_for_each_safe(p, t, &impl->param_list, link) {
if ((p->type & type) != 0) {
if (id == SPA_ID_INVALID || p->id == id) {
spa_list_remove(&p->link);
free(p);
}
}
}
static int update_params(struct stream *impl, uint32_t id,
const struct spa_pod **params, uint32_t n_params)
{
uint32_t i;
int res = 0;
if (id != SPA_ID_INVALID) {
clear_params(impl, id);
} else {
for (i = 0; i < n_params; i++) {
if (!spa_pod_is_object(params[i]))
continue;
clear_params(impl, SPA_POD_OBJECT_ID(params[i]));
}
}
for (i = 0; i < n_params; i++) {
if (add_param(impl, id, params[i]) == NULL) {
res = -errno;
break;
}
}
return res;
}
static inline int push_queue(struct stream *stream, struct queue *queue, struct buffer *buffer)
{
uint32_t index;
@ -496,71 +513,38 @@ static int impl_port_enum_params(void *object, int seq,
return 0;
}
static int port_set_format(struct stream *impl,
enum spa_direction direction, uint32_t port_id,
uint32_t flags, const struct spa_pod *format)
{
struct pw_stream *stream = &impl->this;
struct param *p;
int count, res;
pw_log_debug(NAME" %p: format changed: %p %d", impl, format, impl->disconnecting);
if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG))
spa_debug_format(2, NULL, format);
clear_params(stream, PARAM_TYPE_FORMAT);
if (format && spa_pod_is_object_type(format, SPA_TYPE_OBJECT_Format)) {
p = add_param(stream, PARAM_TYPE_FORMAT, format);
if (p == NULL) {
res = -errno;
goto error_exit;
}
((struct spa_pod_object*)p->param)->body.id = SPA_PARAM_Format;
}
else
p = NULL;
count = pw_stream_emit_format_changed(stream, p ? p->param : NULL);
if (count == 0)
pw_stream_finish_format(stream, 0, NULL, 0);
if (stream->state == PW_STREAM_STATE_ERROR)
return -EIO;
impl->params[3].flags |= SPA_PARAM_INFO_READ;
impl->params[3].flags ^= SPA_PARAM_INFO_SERIAL;
emit_port_info(impl);
stream_set_state(stream,
p ?
PW_STREAM_STATE_READY :
PW_STREAM_STATE_CONFIGURE,
NULL);
return 0;
error_exit:
pw_stream_finish_format(stream, res, NULL, 0);
return res;
}
static int impl_port_set_param(void *object,
enum spa_direction direction, uint32_t port_id,
uint32_t id, uint32_t flags,
const struct spa_pod *param)
{
struct stream *impl = object;
struct pw_stream *stream = &impl->this;
int res, idx;
if (impl->disconnecting)
return param == NULL ? 0 : -EIO;
if (id == SPA_PARAM_Format) {
return port_set_format(impl, direction, port_id, flags, param);
pw_log_debug(NAME" %p: param changed: %p %d", impl, param, impl->disconnecting);
if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG))
spa_debug_pod(2, NULL, param);
if ((res = update_params(impl, id, &param, 1)) < 0)
return res;
pw_stream_emit_param_changed(stream, id, param);
if (stream->state == PW_STREAM_STATE_ERROR)
return -EIO;
idx = get_param_index(id);
if (idx != -1) {
impl->params[idx].flags |= SPA_PARAM_INFO_READ;
impl->params[idx].flags ^= SPA_PARAM_INFO_SERIAL;
emit_port_info(impl);
}
else
return -ENOENT;
return 0;
}
static int map_data(struct stream *impl, struct spa_data *data, int prot)
@ -689,12 +673,6 @@ static int impl_port_use_buffers(void *object,
impl->n_buffers = n_buffers;
stream_set_state(stream,
n_buffers > 0 ?
PW_STREAM_STATE_PAUSED :
PW_STREAM_STATE_READY,
NULL);
return 0;
}
@ -846,33 +824,6 @@ static const struct pw_proxy_events proxy_events = {
.error = proxy_error,
};
static void node_event_info(void *object, const struct pw_node_info *info)
{
struct pw_stream *stream = object;
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
uint32_t subscribe[info->n_params], n_subscribe = 0;
uint32_t i;
if (info->change_mask & PW_NODE_CHANGE_MASK_PARAMS && !impl->subscribe) {
for (i = 0; i < info->n_params; i++) {
switch (info->params[i].id) {
case SPA_PARAM_PropInfo:
case SPA_PARAM_Props:
subscribe[n_subscribe++] = info->params[i].id;
break;
default:
break;
}
}
if (n_subscribe > 0) {
pw_node_proxy_subscribe_params((struct pw_node_proxy*)stream->proxy,
subscribe, n_subscribe);
impl->subscribe = true;
}
}
}
static struct control *find_control(struct pw_stream *stream, uint32_t id)
{
struct control *c;
@ -883,9 +834,9 @@ static struct control *find_control(struct pw_stream *stream, uint32_t id)
return NULL;
}
static void node_event_param(void *object, int seq,
static int node_event_param(void *object, int seq,
uint32_t id, uint32_t index, uint32_t next,
const struct spa_pod *param)
struct spa_pod *param)
{
struct pw_stream *stream = object;
@ -910,7 +861,7 @@ static void node_event_param(void *object, int seq,
SPA_PROP_INFO_id, SPA_POD_Id(&iid),
SPA_PROP_INFO_name, SPA_POD_String(&c->control.name),
SPA_PROP_INFO_type, SPA_POD_PodChoice(&type)) < 0)
return;
return -EINVAL;
pod = spa_pod_get_values(type, &n_vals, &choice);
@ -924,19 +875,19 @@ static void node_event_param(void *object, int seq,
n_vals = 3;
}
else
return;
return -ENOTSUP;
switch (choice) {
case SPA_CHOICE_None:
if (n_vals < 1)
return;
return -EINVAL;
c->control.n_values = 1;
c->control.max_values = 1;
c->control.values[0] = c->control.def = c->control.min = c->control.max = vals[0];
break;
case SPA_CHOICE_Range:
if (n_vals < 3)
return;
return -EINVAL;
c->control.n_values = 1;
c->control.max_values = 1;
c->control.values[0] = vals[0];
@ -945,7 +896,7 @@ static void node_event_param(void *object, int seq,
c->control.max = vals[2];
break;
default:
return;
return -ENOTSUP;
}
c->id = iid;
@ -1006,12 +957,37 @@ static void node_event_param(void *object, int seq,
default:
break;
}
return 0;
}
static const struct pw_node_proxy_events node_events = {
PW_VERSION_NODE_PROXY_EVENTS,
.info = node_event_info,
.param = node_event_param,
static void node_event_info(void *object, const struct pw_node_info *info)
{
struct pw_stream *stream = object;
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
uint32_t i;
if (info->change_mask & PW_NODE_CHANGE_MASK_PARAMS) {
for (i = 0; i < info->n_params; i++) {
switch (info->params[i].id) {
case SPA_PARAM_PropInfo:
case SPA_PARAM_Props:
pw_node_for_each_param(impl->node,
0, info->params[i].id,
0, UINT32_MAX,
NULL,
node_event_param,
stream);
break;
default:
break;
}
}
}
}
static const struct pw_node_events node_events = {
PW_VERSION_NODE_EVENTS,
.info_changed = node_event_info,
};
static int handle_connect(struct pw_stream *stream)
@ -1074,8 +1050,9 @@ static int handle_connect(struct pw_stream *stream)
}
pw_proxy_add_listener(stream->proxy, &stream->proxy_listener, &proxy_events, stream);
pw_node_proxy_add_listener((struct pw_node_proxy*)stream->proxy,
&stream->node_listener, &node_events, stream);
pw_node_add_listener(impl->node, &stream->node_listener, &node_events, stream);
return 0;
@ -1118,7 +1095,7 @@ static void on_remote_exported(void *_data, uint32_t proxy_id, uint32_t global_i
struct pw_stream *stream = _data;
if (stream->proxy && stream->proxy->id == proxy_id) {
stream->node_id = global_id;
stream_set_state(stream, PW_STREAM_STATE_CONFIGURE, NULL);
stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL);
}
}
@ -1184,7 +1161,6 @@ struct pw_stream * pw_stream_new(struct pw_remote *remote, const char *name,
this->state = PW_STREAM_STATE_UNCONNECTED;
impl->core = remote->core;
impl->pending_seq = SPA_ID_INVALID;
pw_remote_add_listener(remote, &impl->remote_listener, &remote_events, this);
@ -1250,10 +1226,6 @@ const char *pw_stream_state_as_string(enum pw_stream_state state)
return "unconnected";
case PW_STREAM_STATE_CONNECTING:
return "connecting";
case PW_STREAM_STATE_CONFIGURE:
return "configure";
case PW_STREAM_STATE_READY:
return "ready";
case PW_STREAM_STATE_PAUSED:
return "paused";
case PW_STREAM_STATE_STREAMING:
@ -1277,7 +1249,7 @@ void pw_stream_destroy(struct pw_stream *stream)
spa_hook_remove(&impl->remote_listener);
spa_list_remove(&stream->link);
clear_params(stream, PARAM_TYPE_INIT | PARAM_TYPE_OTHER | PARAM_TYPE_FORMAT);
clear_params(impl, SPA_ID_INVALID);
pw_log_debug(NAME" %p: free", stream);
free(stream->error);
@ -1349,14 +1321,14 @@ struct pw_remote *pw_stream_get_remote(struct pw_stream *stream)
return stream->remote;
}
static void add_params(struct pw_stream *stream)
static void add_params(struct stream *impl)
{
uint8_t buffer[4096];
struct spa_pod_builder b;
spa_pod_builder_init(&b, buffer, 4096);
add_param(stream, PARAM_TYPE_INIT,
add_param(impl, SPA_PARAM_IO,
spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamIO, SPA_PARAM_IO,
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers),
@ -1444,11 +1416,11 @@ pw_stream_connect(struct pw_stream *stream,
impl->params[3] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE);
impl->params[4] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0);
clear_params(stream, PARAM_TYPE_INIT | PARAM_TYPE_OTHER | PARAM_TYPE_FORMAT);
clear_params(impl, SPA_ID_INVALID);
for (i = 0; i < n_params; i++)
add_param(stream, PARAM_TYPE_INIT, params[i]);
add_param(impl, SPA_ID_INVALID, params[i]);
add_params(stream);
add_params(impl);
if ((res = find_format(impl, direction, &impl->media_type, &impl->media_subtype)) < 0)
return res;
@ -1514,32 +1486,40 @@ int pw_stream_disconnect(struct pw_stream *stream)
}
SPA_EXPORT
void pw_stream_finish_format(struct pw_stream *stream,
int res,
int pw_stream_set_error(struct pw_stream *stream,
int res, const char *error, ...)
{
if (res < 0) {
va_list args;
char *value;
va_start(args, error);
vasprintf(&value, error, args);
if (stream->proxy)
pw_proxy_error(stream->proxy, res, value);
stream_set_state(stream, PW_STREAM_STATE_ERROR, value);
va_end(args);
free(value);
}
return res;
}
SPA_EXPORT
int pw_stream_update_params(struct pw_stream *stream,
const struct spa_pod **params,
uint32_t n_params)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
uint32_t i;
pw_log_debug(NAME" %p: finish format %d %d", stream, res, impl->pending_seq);
if (res < 0) {
pw_proxy_error(stream->proxy, res, "format failed");
stream_set_state(stream, PW_STREAM_STATE_ERROR, "format error");
return;
}
clear_params(stream, PARAM_TYPE_OTHER);
for (i = 0; i < n_params; i++)
add_param(stream, PARAM_TYPE_OTHER, params[i]);
impl->pending_seq = SPA_ID_INVALID;
pw_log_debug(NAME" %p: update params", stream);
return update_params(impl, SPA_ID_INVALID, params, n_params);
}
SPA_EXPORT
int pw_stream_set_control(struct pw_stream *stream, uint32_t id, uint32_t n_values, float *values, ...)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
va_list varargs;
char buf[1024];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, sizeof(buf));
@ -1581,8 +1561,7 @@ int pw_stream_set_control(struct pw_stream *stream, uint32_t id, uint32_t n_valu
}
pod = spa_pod_builder_pop(&b, &f[0]);
pw_node_proxy_set_param((struct pw_node_proxy*)stream->proxy,
SPA_PARAM_Props, 0, pod);
pw_node_set_param(impl->node, SPA_PARAM_Props, 0, pod);
return 0;
}

View file

@ -35,7 +35,8 @@ extern "C" {
*
* Media streams are used to exchange data with the PipeWire server. A
* stream is a wrapper around a proxy for a \ref pw_client_node with
* just one port.
* an adapter. This means the stream will automatically do conversion
* to the type required by the server.
*
* Streams can be used to:
*
@ -46,8 +47,8 @@ extern "C" {
* choose a port for you.
*
* For more complicated nodes such as filters or ports with multiple
* inputs and/or outputs you will need to create a pw_node yourself and
* export it with \ref pw_remote_export.
* inputs and/or outputs you will need to use the pw_filter or make
* a pw_node yourself and export it with \ref pw_remote_export.
*
* \section sec_create Create
*
@ -77,18 +78,15 @@ extern "C" {
*
* \section sec_format Format negotiation
*
* After connecting the stream, it will transition to the \ref
* PW_STREAM_STATE_CONFIGURE state. In this state the format will be
* negotiated by the PipeWire server.
* After connecting the stream, the server will want to configure some
* parameters on the stream. You will be notified of these changes
* with the param_changed event.
*
* Once the format has been selected, the format_changed event is
* emited with the configured format as a parameter.
* When a format param change is emited, the client should now prepare
* itself to deal with the format and complete the negotiation procedure
* with a call to \ref pw_stream_update_params().
*
* The client should now prepare itself to deal with the format and
* complete the negotiation procedure with a call to \ref
* pw_stream_finish_format().
*
* As arguments to \ref pw_stream_finish_format() an array of spa_param
* As arguments to \ref pw_stream_update_params() an array of spa_param
* structures must be given. They contain parameters such as buffer size,
* number of buffers, required metadata and other parameters for the
* media buffers.
@ -161,11 +159,8 @@ enum pw_stream_state {
PW_STREAM_STATE_ERROR = -1, /**< the strean is in error */
PW_STREAM_STATE_UNCONNECTED = 0, /**< unconnected */
PW_STREAM_STATE_CONNECTING = 1, /**< connection is in progress */
PW_STREAM_STATE_CONFIGURE = 2, /**< stream is being configured */
PW_STREAM_STATE_READY = 3, /**< stream is ready */
PW_STREAM_STATE_PAUSED = 4, /**< paused, fully configured but not
* processing data yet */
PW_STREAM_STATE_STREAMING = 5 /**< streaming */
PW_STREAM_STATE_PAUSED = 2, /**< paused */
PW_STREAM_STATE_STREAMING = 3 /**< streaming */
};
struct pw_buffer {
@ -203,10 +198,10 @@ struct pw_stream_events {
/** Notify information about a control. */
void (*control_info) (void *data, uint32_t id, const struct pw_stream_control *control);
/** when the format changed. The listener should call
* pw_stream_finish_format() from within this callback or later to complete
* the format negotiation and start the buffer negotiation. */
void (*format_changed) (void *data, const struct spa_pod *format);
/** when io changed on the stream. */
void (*io_changed) (void *data, uint32_t id, void *area, uint32_t size);
/** when a parameter changed */
void (*param_changed) (void *data, uint32_t id, const struct spa_pod *param);
/** when a new buffer was created for this stream */
void (*add_buffer) (void *data, struct pw_buffer *buffer);
@ -306,21 +301,24 @@ pw_stream_get_node_id(struct pw_stream *stream);
/** Disconnect \a stream \memberof pw_stream */
int pw_stream_disconnect(struct pw_stream *stream);
/** Set the stream in error state */
int pw_stream_set_error(struct pw_stream *stream, /**< a \ref pw_stream */
int res, /**< a result code */
const char *error, ... /**< an error message */) SPA_PRINTF_FUNC(3, 4);
/** Complete the negotiation process with result code \a res \memberof pw_stream
*
* This function should be called after notification of the format.
* When \a res indicates success, \a params contain the parameters for the
* allocation state. */
void
pw_stream_finish_format(struct pw_stream *stream, /**< a \ref pw_stream */
int res, /**< a result code */
int
pw_stream_update_params(struct pw_stream *stream, /**< a \ref pw_stream */
const struct spa_pod **params, /**< an array of params. The params should
* ideally contain parameters for doing
* buffer allocation. */
uint32_t n_params /**< number of elements in \a params */);
/** Set control values */
int pw_stream_set_control(struct pw_stream *stream, uint32_t id, uint32_t n_values, float *values, ...);

View file

@ -41,7 +41,8 @@ static void test_abi(void)
void (*state_changed) (void *data, enum pw_stream_state old,
enum pw_stream_state state, const char *error);
void (*control_info) (void *data, uint32_t id, const struct pw_stream_control *control);
void (*format_changed) (void *data, const struct spa_pod *format);
void (*io_changed) (void *data, uint32_t id, void *area, uint32_t size);
void (*param_changed) (void *data, uint32_t id, const struct spa_pod *param);
void (*add_buffer) (void *data, struct pw_buffer *buffer);
void (*remove_buffer) (void *data, struct pw_buffer *buffer);
void (*process) (void *data);
@ -51,7 +52,8 @@ static void test_abi(void)
TEST_FUNC(ev, test, destroy);
TEST_FUNC(ev, test, state_changed);
TEST_FUNC(ev, test, control_info);
TEST_FUNC(ev, test, format_changed);
TEST_FUNC(ev, test, io_changed);
TEST_FUNC(ev, test, param_changed);
TEST_FUNC(ev, test, add_buffer);
TEST_FUNC(ev, test, remove_buffer);
TEST_FUNC(ev, test, process);
@ -66,16 +68,12 @@ static void test_abi(void)
spa_assert(PW_STREAM_STATE_ERROR == -1);
spa_assert(PW_STREAM_STATE_UNCONNECTED == 0);
spa_assert(PW_STREAM_STATE_CONNECTING == 1);
spa_assert(PW_STREAM_STATE_CONFIGURE == 2);
spa_assert(PW_STREAM_STATE_READY == 3);
spa_assert(PW_STREAM_STATE_PAUSED == 4);
spa_assert(PW_STREAM_STATE_STREAMING == 5);
spa_assert(PW_STREAM_STATE_PAUSED == 2);
spa_assert(PW_STREAM_STATE_STREAMING == 3);
spa_assert(pw_stream_state_as_string(PW_STREAM_STATE_ERROR) != NULL);
spa_assert(pw_stream_state_as_string(PW_STREAM_STATE_UNCONNECTED) != NULL);
spa_assert(pw_stream_state_as_string(PW_STREAM_STATE_CONNECTING) != NULL);
spa_assert(pw_stream_state_as_string(PW_STREAM_STATE_CONFIGURE) != NULL);
spa_assert(pw_stream_state_as_string(PW_STREAM_STATE_READY) != NULL);
spa_assert(pw_stream_state_as_string(PW_STREAM_STATE_PAUSED) != NULL);
spa_assert(pw_stream_state_as_string(PW_STREAM_STATE_STREAMING) != NULL);
}
@ -89,7 +87,11 @@ static void stream_state_changed_error(void *data, enum pw_stream_state old,
{
spa_assert_not_reached();
}
static void stream_format_changed_error(void *data, const struct spa_pod *format)
static void stream_io_changed_error(void *data, uint32_t id, void *area, uint32_t size)
{
spa_assert_not_reached();
}
static void stream_param_changed_error(void *data, uint32_t id, const struct spa_pod *format)
{
spa_assert_not_reached();
}
@ -115,7 +117,8 @@ static const struct pw_stream_events stream_events_error =
PW_VERSION_STREAM_EVENTS,
.destroy = stream_destroy_error,
.state_changed = stream_state_changed_error,
.format_changed = stream_format_changed_error,
.io_changed = stream_io_changed_error,
.param_changed = stream_param_changed_error,
.add_buffer = stream_add_buffer_error,
.remove_buffer = stream_remove_buffer_error,
.process = stream_process_error,