diff --git a/pipewire-alsa/alsa-plugins/pcm_pipewire.c b/pipewire-alsa/alsa-plugins/pcm_pipewire.c index 453ae9b0f..ed729ebc7 100644 --- a/pipewire-alsa/alsa-plugins/pcm_pipewire.c +++ b/pipewire-alsa/alsa-plugins/pcm_pipewire.c @@ -310,7 +310,7 @@ snd_pcm_pipewire_process_record(snd_pcm_pipewire_t *pw, struct pw_buffer *b) return 0; } -static void on_stream_format_changed(void *data, const struct spa_pod *format) +static void on_stream_param_changed(void *data, uint32_t id, const struct spa_pod *param) { snd_pcm_pipewire_t *pw = data; snd_pcm_ioplug_t *io = &pw->io; @@ -322,6 +322,9 @@ static void on_stream_format_changed(void *data, const struct spa_pod *format) uint32_t buffers; uint32_t size; + if (param == NULL || id != SPA_PARAM_Format) + return; + io->period_size = pw->min_avail; buffers = SPA_CLAMP(io->buffer_size / io->period_size, MIN_BUFFERS, MAX_BUFFERS); size = io->period_size * stride; @@ -337,7 +340,7 @@ static void on_stream_format_changed(void *data, const struct spa_pod *format) SPA_PARAM_BUFFERS_stride, SPA_POD_Int(stride), SPA_PARAM_BUFFERS_align, SPA_POD_Int(16)); - pw_stream_finish_format(pw->stream, 0, params, n_params); + pw_stream_update_params(pw->stream, params, n_params); } static void on_stream_process(void *data) @@ -360,7 +363,7 @@ static void on_stream_process(void *data) static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, - .format_changed = on_stream_format_changed, + .param_changed = on_stream_param_changed, .process = on_stream_process, }; diff --git a/pipewire-pulseaudio/src/stream.c b/pipewire-pulseaudio/src/stream.c index 71df4c296..30bd9d000 100644 --- a/pipewire-pulseaudio/src/stream.c +++ b/pipewire-pulseaudio/src/stream.c @@ -242,27 +242,20 @@ static void stream_state_changed(void *data, enum pw_stream_state old, case PW_STREAM_STATE_CONNECTING: pa_stream_set_state(s, PA_STREAM_CREATING); break; - case PW_STREAM_STATE_CONFIGURE: - case PW_STREAM_STATE_READY: - break; case PW_STREAM_STATE_PAUSED: - if (old < PW_STREAM_STATE_PAUSED) { - if (s->suspended && s->suspended_callback) { - s->suspended = false; - s->suspended_callback(s, s->suspended_userdata); - } - configure_device(s); - configure_buffers(s); - pa_stream_set_state(s, PA_STREAM_READY); - } - else { - if (!s->suspended && s->suspended_callback) { - s->suspended = true; - s->suspended_callback(s, s->suspended_userdata); - } + if (!s->suspended && s->suspended_callback) { + s->suspended = true; + s->suspended_callback(s, s->suspended_userdata); } break; case PW_STREAM_STATE_STREAMING: + if (s->suspended && s->suspended_callback) { + s->suspended = false; + s->suspended_callback(s, s->suspended_userdata); + } + configure_device(s); + configure_buffers(s); + pa_stream_set_state(s, PA_STREAM_READY); break; } } @@ -359,7 +352,7 @@ static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flag dump_buffer_attr(s, attr); } -static void stream_format_changed(void *data, const struct spa_pod *format) +static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *param) { pa_stream *s = data; const struct spa_pod *params[4]; @@ -367,28 +360,25 @@ static void stream_format_changed(void *data, const struct spa_pod *format) uint8_t buffer[4096]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); struct spa_audio_info info = { 0 }; - int res; unsigned int i; - if (format == NULL) { - res = 0; - goto done; - } + if (param == NULL || id != SPA_PARAM_Format) + return; - spa_format_parse(format, &info.media_type, &info.media_subtype); + spa_format_parse(param, &info.media_type, &info.media_subtype); if (info.media_type != SPA_MEDIA_TYPE_audio || info.media_subtype != SPA_MEDIA_SUBTYPE_raw || - spa_format_audio_raw_parse(format, &info.info.raw) < 0 || + spa_format_audio_raw_parse(param, &info.info.raw) < 0 || !SPA_AUDIO_FORMAT_IS_INTERLEAVED(info.info.raw.format)) { - res = -EINVAL; - goto done; + pw_stream_set_error(s->stream, -EINVAL, "unhandled format"); + return; } s->sample_spec.format = format_id2pa(s, info.info.raw.format); if (s->sample_spec.format == PA_SAMPLE_INVALID) { - res = -EINVAL; - goto done; + pw_stream_set_error(s->stream, -EINVAL, "invalid format"); + return; } s->sample_spec.rate = info.info.raw.rate; s->sample_spec.channels = info.info.raw.channels; @@ -409,10 +399,7 @@ static void stream_format_changed(void *data, const struct spa_pod *format) params[n_params++] = get_buffers_param(s, &s->buffer_attr, &b); - res = 0; - - done: - pw_stream_finish_format(s->stream, res, params, n_params); + pw_stream_update_params(s->stream, params, n_params); } static void stream_control_info(void *data, uint32_t id, const struct pw_stream_control *control) @@ -526,7 +513,7 @@ static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .state_changed = stream_state_changed, - .format_changed = stream_format_changed, + .param_changed = stream_param_changed, .control_info = stream_control_info, .add_buffer = stream_add_buffer, .remove_buffer = stream_remove_buffer, diff --git a/src/examples/video-play.c b/src/examples/video-play.c index a5fb1f43a..ac6678c75 100644 --- a/src/examples/video-play.c +++ b/src/examples/video-play.c @@ -220,7 +220,7 @@ static void on_stream_state_changed(void *_data, enum pw_stream_state old, case PW_STREAM_STATE_UNCONNECTED: pw_main_loop_quit(data->loop); break; - case PW_STREAM_STATE_CONFIGURE: + case PW_STREAM_STATE_PAUSED: /* because we started inactive, activate ourselves now */ pw_stream_set_active(data->stream, true); break; @@ -229,7 +229,8 @@ static void on_stream_state_changed(void *_data, enum pw_stream_state old, } } -/* Be notified when the stream format changes. +/* Be notified when the stream param changes. We're only looking at the + * format changes. * * We are now supposed to call pw_stream_finish_format() with success or * failure, depending on if we can support the format. Because we gave @@ -240,7 +241,7 @@ static void on_stream_state_changed(void *_data, enum pw_stream_state old, * that we would like on our buffer, the size, alignment, etc. */ static void -on_stream_format_changed(void *_data, const struct spa_pod *format) +on_stream_param_changed(void *_data, uint32_t id, const struct spa_pod *param) { struct data *data = _data; struct pw_stream *stream = data->stream; @@ -251,16 +252,14 @@ on_stream_format_changed(void *_data, const struct spa_pod *format) void *d; /* NULL means to clear the format */ - if (format == NULL) { - pw_stream_finish_format(stream, 0, NULL, 0); + if (param == NULL || id != SPA_PARAM_Format) return; - } fprintf(stderr, "got format:\n"); - spa_debug_format(2, NULL, format); + spa_debug_format(2, NULL, param); /* call a helper function to parse the format for us. */ - spa_format_video_raw_parse(format, &data->format); + spa_format_video_raw_parse(param, &data->format); if (data->format.format == SPA_VIDEO_FORMAT_RGBA_F32) sdl_format = SDL_PIXELFORMAT_RGBA32; @@ -268,7 +267,7 @@ on_stream_format_changed(void *_data, const struct spa_pod *format) sdl_format = id_to_sdl_format(data->format.format); if (sdl_format == SDL_PIXELFORMAT_UNKNOWN) { - pw_stream_finish_format(stream, -EINVAL, NULL, 0); + pw_stream_set_error(stream, -EINVAL, "unknown pixel format"); return; } @@ -317,14 +316,14 @@ on_stream_format_changed(void *_data, const struct spa_pod *format) CURSOR_META_SIZE(256,256))); /* we are done */ - pw_stream_finish_format(stream, 0, params, 4); + pw_stream_update_params(stream, params, 4); } /* these are the stream events we listen for */ static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .state_changed = on_stream_state_changed, - .format_changed = on_stream_format_changed, + .param_changed = on_stream_param_changed, .process = on_process, }; diff --git a/src/examples/video-src-alloc.c b/src/examples/video-src-alloc.c index 7fbfb1875..db182fbc8 100644 --- a/src/examples/video-src-alloc.c +++ b/src/examples/video-src-alloc.c @@ -188,7 +188,7 @@ static void on_stream_state_changed(void *_data, enum pw_stream_state old, enum printf("stream state: \"%s\"\n", pw_stream_state_as_string(state)); switch (state) { - case PW_STREAM_STATE_CONFIGURE: + case PW_STREAM_STATE_PAUSED: printf("node id: %d\n", pw_stream_get_node_id(data->stream)); break; case PW_STREAM_STATE_STREAMING: @@ -271,18 +271,19 @@ static void on_stream_remove_buffer(void *_data, struct pw_buffer *buffer) close(d[0].fd); } -/* Be notified when the stream format changes. +/* Be notified when the stream param changes. We're only looking at the + * format param. * - * We are now supposed to call pw_stream_finish_format() with success or + * We are now supposed to call pw_stream_update_params() with success or * failure, depending on if we can support the format. Because we gave * a list of supported formats, this should be ok. * - * As part of pw_stream_finish_format() we can provide parameters that + * As part of pw_stream_update_params() we can provide parameters that * will control the buffer memory allocation. This includes the metadata * that we would like on our buffer, the size, alignment, etc. */ static void -on_stream_format_changed(void *_data, const struct spa_pod *format) +on_stream_param_changed(void *_data, uint32_t id, const struct spa_pod *param) { struct data *data = _data; struct pw_stream *stream = data->stream; @@ -290,11 +291,10 @@ on_stream_format_changed(void *_data, const struct spa_pod *format) struct spa_pod_builder b = SPA_POD_BUILDER_INIT(params_buffer, sizeof(params_buffer)); const struct spa_pod *params[5]; - if (format == NULL) { - pw_stream_finish_format(stream, 0, NULL, 0); + if (param == NULL || id != SPA_PARAM_Format) return; - } - spa_format_video_raw_parse(format, &data->format); + + spa_format_video_raw_parse(param, &data->format); data->stride = SPA_ROUND_UP_N(data->format.size.width * BPP, 4); @@ -330,13 +330,13 @@ on_stream_format_changed(void *_data, const struct spa_pod *format) SPA_PARAM_META_size, SPA_POD_Int( CURSOR_META_SIZE(CURSOR_WIDTH,CURSOR_HEIGHT))); - pw_stream_finish_format(stream, 0, params, 5); + pw_stream_update_params(stream, params, 5); } static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .state_changed = on_stream_state_changed, - .format_changed = on_stream_format_changed, + .param_changed = on_stream_param_changed, .add_buffer = on_stream_add_buffer, .remove_buffer = on_stream_remove_buffer, }; diff --git a/src/examples/video-src.c b/src/examples/video-src.c index e78c31fef..35a818544 100644 --- a/src/examples/video-src.c +++ b/src/examples/video-src.c @@ -186,7 +186,7 @@ static void on_stream_state_changed(void *_data, enum pw_stream_state old, enum printf("stream state: \"%s\"\n", pw_stream_state_as_string(state)); switch (state) { - case PW_STREAM_STATE_CONFIGURE: + case PW_STREAM_STATE_PAUSED: printf("node id: %d\n", pw_stream_get_node_id(data->stream)); break; case PW_STREAM_STATE_STREAMING: @@ -210,7 +210,7 @@ static void on_stream_state_changed(void *_data, enum pw_stream_state old, enum } static void -on_stream_format_changed(void *_data, const struct spa_pod *format) +on_stream_param_changed(void *_data, uint32_t id, const struct spa_pod *param) { struct data *data = _data; struct pw_stream *stream = data->stream; @@ -218,11 +218,10 @@ on_stream_format_changed(void *_data, const struct spa_pod *format) struct spa_pod_builder b = SPA_POD_BUILDER_INIT(params_buffer, sizeof(params_buffer)); const struct spa_pod *params[5]; - if (format == NULL) { - pw_stream_finish_format(stream, 0, NULL, 0); + if (param == NULL || id != SPA_PARAM_Format) return; - } - spa_format_video_raw_parse(format, &data->format); + + spa_format_video_raw_parse(param, &data->format); data->stride = SPA_ROUND_UP_N(data->format.size.width * BPP, 4); @@ -258,13 +257,13 @@ on_stream_format_changed(void *_data, const struct spa_pod *format) SPA_PARAM_META_size, SPA_POD_Int( CURSOR_META_SIZE(CURSOR_WIDTH,CURSOR_HEIGHT))); - pw_stream_finish_format(stream, 0, params, 5); + pw_stream_update_params(stream, params, 5); } static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .state_changed = on_stream_state_changed, - .format_changed = on_stream_format_changed, + .param_changed = on_stream_param_changed, }; static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remote_state state, const char *error) diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index d9ebd7225..fa77fe3a4 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -265,7 +265,7 @@ pool_activated (GstPipeWirePool *pool, GstPipeWireSink *sink) pw_thread_loop_lock (sink->main_loop); - pw_stream_finish_format (sink->stream, 0, port_params, 2); + pw_stream_update_params (sink->stream, port_params, 2); pw_thread_loop_unlock (sink->main_loop); } @@ -488,8 +488,6 @@ on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state sta switch (state) { case PW_STREAM_STATE_UNCONNECTED: case PW_STREAM_STATE_CONNECTING: - case PW_STREAM_STATE_CONFIGURE: - case PW_STREAM_STATE_READY: case PW_STREAM_STATE_PAUSED: case PW_STREAM_STATE_STREAMING: break; @@ -502,11 +500,11 @@ on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state sta } static void -on_format_changed (void *data, const struct spa_pod *format) +on_param_changed (void *data, uint32_t id, const struct spa_pod *param) { GstPipeWireSink *pwsink = data; - if (format == NULL) + if (param == NULL || id != SPA_PARAM_Format) return; if (gst_buffer_pool_is_active (GST_BUFFER_POOL_CAST (pwsink->pool))) @@ -550,7 +548,7 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) while (TRUE) { state = pw_stream_get_state (pwsink->stream, &error); - if (state == PW_STREAM_STATE_READY) + if (state == PW_STREAM_STATE_PAUSED) break; if (state == PW_STREAM_STATE_ERROR) @@ -645,7 +643,7 @@ copy_properties (GQuark field_id, static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .state_changed = on_state_changed, - .format_changed = on_format_changed, + .param_changed = on_param_changed, .add_buffer = on_add_buffer, .remove_buffer = on_remove_buffer, .process = on_process, diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 683a75c30..cc27445d5 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -454,8 +454,6 @@ on_state_changed (void *data, switch (state) { case PW_STREAM_STATE_UNCONNECTED: case PW_STREAM_STATE_CONNECTING: - case PW_STREAM_STATE_CONFIGURE: - case PW_STREAM_STATE_READY: case PW_STREAM_STATE_PAUSED: case PW_STREAM_STATE_STREAMING: break; @@ -686,22 +684,21 @@ connect_error: } static void -on_format_changed (void *data, - const struct spa_pod *format) +on_param_changed (void *data, uint32_t id, + const struct spa_pod *param) { GstPipeWireSrc *pwsrc = data; GstCaps *caps; gboolean res; - if (format == NULL) { + if (param == NULL || id != SPA_PARAM_Format) { GST_DEBUG_OBJECT (pwsrc, "clear format"); - pw_stream_finish_format (pwsrc->stream, 0, NULL, 0); return; } gst_pipewire_clock_reset (GST_PIPEWIRE_CLOCK (pwsrc->clock), 0); - caps = gst_caps_from_format (format); + caps = gst_caps_from_format (param); GST_DEBUG_OBJECT (pwsrc, "we got format %" GST_PTR_FORMAT, caps); res = gst_base_src_set_caps (GST_BASE_SRC (pwsrc), caps); gst_caps_unref (caps); @@ -726,10 +723,10 @@ on_format_changed (void *data, SPA_PARAM_META_size, SPA_POD_Int(sizeof (struct spa_meta_header))); GST_DEBUG_OBJECT (pwsrc, "doing finish format"); - pw_stream_finish_format (pwsrc->stream, 0, params, 2); + pw_stream_update_params (pwsrc->stream, params, 2); } else { GST_WARNING_OBJECT (pwsrc, "finish format with error"); - pw_stream_finish_format (pwsrc->stream, -EINVAL, NULL, 0); + pw_stream_set_error (pwsrc->stream, -EINVAL, "unhandled format"); } } @@ -960,7 +957,7 @@ static const struct pw_remote_events remote_events = { static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .state_changed = on_state_changed, - .format_changed = on_format_changed, + .param_changed = on_param_changed, .add_buffer = on_add_buffer, .remove_buffer = on_remove_buffer, .process = on_process, diff --git a/src/pipewire/private.h b/src/pipewire/private.h index c330c323d..ead19dc32 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -757,7 +757,8 @@ struct pw_remote { #define pw_stream_emit(s,m,v,...) spa_hook_list_call(&s->listener_list, struct pw_stream_events, m, v, ##__VA_ARGS__) #define pw_stream_emit_destroy(s) pw_stream_emit(s, destroy, 0) #define pw_stream_emit_state_changed(s,o,n,e) pw_stream_emit(s, state_changed,0,o,n,e) -#define pw_stream_emit_format_changed(s,f) pw_stream_emit(s, format_changed,0,f) +#define pw_stream_emit_io_changed(s,i,a,t) pw_stream_emit(s, io_changed,0,i,a,t) +#define pw_stream_emit_param_changed(s,i,p) pw_stream_emit(s, param_changed,0,i,p) #define pw_stream_emit_add_buffer(s,b) pw_stream_emit(s, add_buffer, 0, b) #define pw_stream_emit_remove_buffer(s,b) pw_stream_emit(s, remove_buffer, 0, b) #define pw_stream_emit_process(s) pw_stream_emit(s, process, 0) diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 81b3d4878..be798df23 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -72,10 +72,7 @@ struct data { }; struct param { -#define PARAM_TYPE_INIT (1 << 0) -#define PARAM_TYPE_OTHER (1 << 1) -#define PARAM_TYPE_FORMAT (1 << 2) - int type; + uint32_t id; struct spa_list link; struct spa_pod *param; }; @@ -121,8 +118,6 @@ struct stream { struct buffer buffers[MAX_BUFFERS]; uint32_t n_buffers; - uint32_t pending_seq; - struct queue dequeued; struct queue queued; @@ -133,7 +128,6 @@ struct stream { unsigned int async_connect:1; unsigned int disconnecting:1; unsigned int free_data:1; - unsigned int subscribe:1; unsigned int alloc_buffers:1; unsigned int draining:1; }; @@ -156,30 +150,29 @@ static int get_param_index(uint32_t id) } } -static struct param *add_param(struct pw_stream *stream, - int type, const struct spa_pod *param) +static struct param *add_param(struct stream *impl, + uint32_t id, const struct spa_pod *param) { - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct param *p; - uint32_t id; int idx; if (param == NULL || !spa_pod_is_object(param)) { errno = EINVAL; return NULL; } + if (id == SPA_ID_INVALID) + id = SPA_POD_OBJECT_ID(param); p = malloc(sizeof(struct param) + SPA_POD_SIZE(param)); if (p == NULL) return NULL; - p->type = type; + p->id = id; p->param = SPA_MEMBER(p, sizeof(struct param), struct spa_pod); memcpy(p->param, param, SPA_POD_SIZE(param)); spa_list_append(&impl->param_list, &p->link); - id = ((const struct spa_pod_object *)param)->body.id; idx = get_param_index(id); if (idx != -1) impl->params[idx].flags |= SPA_PARAM_INFO_READ; @@ -187,19 +180,43 @@ static struct param *add_param(struct pw_stream *stream, return p; } -static void clear_params(struct pw_stream *stream, int type) +static void clear_params(struct stream *impl, uint32_t id) { - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct param *p, *t; spa_list_for_each_safe(p, t, &impl->param_list, link) { - if ((p->type & type) != 0) { + if (id == SPA_ID_INVALID || p->id == id) { spa_list_remove(&p->link); free(p); } } } +static int update_params(struct stream *impl, uint32_t id, + const struct spa_pod **params, uint32_t n_params) +{ + uint32_t i; + int res = 0; + + if (id != SPA_ID_INVALID) { + clear_params(impl, id); + } else { + for (i = 0; i < n_params; i++) { + if (!spa_pod_is_object(params[i])) + continue; + clear_params(impl, SPA_POD_OBJECT_ID(params[i])); + } + } + for (i = 0; i < n_params; i++) { + if (add_param(impl, id, params[i]) == NULL) { + res = -errno; + break; + } + } + return res; +} + + static inline int push_queue(struct stream *stream, struct queue *queue, struct buffer *buffer) { uint32_t index; @@ -496,71 +513,38 @@ static int impl_port_enum_params(void *object, int seq, return 0; } -static int port_set_format(struct stream *impl, - enum spa_direction direction, uint32_t port_id, - uint32_t flags, const struct spa_pod *format) -{ - struct pw_stream *stream = &impl->this; - struct param *p; - int count, res; - - pw_log_debug(NAME" %p: format changed: %p %d", impl, format, impl->disconnecting); - if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG)) - spa_debug_format(2, NULL, format); - - clear_params(stream, PARAM_TYPE_FORMAT); - if (format && spa_pod_is_object_type(format, SPA_TYPE_OBJECT_Format)) { - p = add_param(stream, PARAM_TYPE_FORMAT, format); - if (p == NULL) { - res = -errno; - goto error_exit; - } - - ((struct spa_pod_object*)p->param)->body.id = SPA_PARAM_Format; - } - else - p = NULL; - - count = pw_stream_emit_format_changed(stream, p ? p->param : NULL); - - if (count == 0) - pw_stream_finish_format(stream, 0, NULL, 0); - - if (stream->state == PW_STREAM_STATE_ERROR) - return -EIO; - - impl->params[3].flags |= SPA_PARAM_INFO_READ; - impl->params[3].flags ^= SPA_PARAM_INFO_SERIAL; - emit_port_info(impl); - - stream_set_state(stream, - p ? - PW_STREAM_STATE_READY : - PW_STREAM_STATE_CONFIGURE, - NULL); - - return 0; - -error_exit: - pw_stream_finish_format(stream, res, NULL, 0); - return res; -} - static int impl_port_set_param(void *object, enum spa_direction direction, uint32_t port_id, uint32_t id, uint32_t flags, const struct spa_pod *param) { struct stream *impl = object; + struct pw_stream *stream = &impl->this; + int res, idx; if (impl->disconnecting) return param == NULL ? 0 : -EIO; - if (id == SPA_PARAM_Format) { - return port_set_format(impl, direction, port_id, flags, param); + pw_log_debug(NAME" %p: param changed: %p %d", impl, param, impl->disconnecting); + if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG)) + spa_debug_pod(2, NULL, param); + + if ((res = update_params(impl, id, ¶m, 1)) < 0) + return res; + + pw_stream_emit_param_changed(stream, id, param); + + if (stream->state == PW_STREAM_STATE_ERROR) + return -EIO; + + idx = get_param_index(id); + if (idx != -1) { + impl->params[idx].flags |= SPA_PARAM_INFO_READ; + impl->params[idx].flags ^= SPA_PARAM_INFO_SERIAL; + emit_port_info(impl); } - else - return -ENOENT; + + return 0; } static int map_data(struct stream *impl, struct spa_data *data, int prot) @@ -689,12 +673,6 @@ static int impl_port_use_buffers(void *object, impl->n_buffers = n_buffers; - stream_set_state(stream, - n_buffers > 0 ? - PW_STREAM_STATE_PAUSED : - PW_STREAM_STATE_READY, - NULL); - return 0; } @@ -846,33 +824,6 @@ static const struct pw_proxy_events proxy_events = { .error = proxy_error, }; -static void node_event_info(void *object, const struct pw_node_info *info) -{ - struct pw_stream *stream = object; - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - uint32_t subscribe[info->n_params], n_subscribe = 0; - uint32_t i; - - if (info->change_mask & PW_NODE_CHANGE_MASK_PARAMS && !impl->subscribe) { - for (i = 0; i < info->n_params; i++) { - - switch (info->params[i].id) { - case SPA_PARAM_PropInfo: - case SPA_PARAM_Props: - subscribe[n_subscribe++] = info->params[i].id; - break; - default: - break; - } - } - if (n_subscribe > 0) { - pw_node_proxy_subscribe_params((struct pw_node_proxy*)stream->proxy, - subscribe, n_subscribe); - impl->subscribe = true; - } - } -} - static struct control *find_control(struct pw_stream *stream, uint32_t id) { struct control *c; @@ -883,9 +834,9 @@ static struct control *find_control(struct pw_stream *stream, uint32_t id) return NULL; } -static void node_event_param(void *object, int seq, +static int node_event_param(void *object, int seq, uint32_t id, uint32_t index, uint32_t next, - const struct spa_pod *param) + struct spa_pod *param) { struct pw_stream *stream = object; @@ -910,7 +861,7 @@ static void node_event_param(void *object, int seq, SPA_PROP_INFO_id, SPA_POD_Id(&iid), SPA_PROP_INFO_name, SPA_POD_String(&c->control.name), SPA_PROP_INFO_type, SPA_POD_PodChoice(&type)) < 0) - return; + return -EINVAL; pod = spa_pod_get_values(type, &n_vals, &choice); @@ -924,19 +875,19 @@ static void node_event_param(void *object, int seq, n_vals = 3; } else - return; + return -ENOTSUP; switch (choice) { case SPA_CHOICE_None: if (n_vals < 1) - return; + return -EINVAL; c->control.n_values = 1; c->control.max_values = 1; c->control.values[0] = c->control.def = c->control.min = c->control.max = vals[0]; break; case SPA_CHOICE_Range: if (n_vals < 3) - return; + return -EINVAL; c->control.n_values = 1; c->control.max_values = 1; c->control.values[0] = vals[0]; @@ -945,7 +896,7 @@ static void node_event_param(void *object, int seq, c->control.max = vals[2]; break; default: - return; + return -ENOTSUP; } c->id = iid; @@ -1006,12 +957,37 @@ static void node_event_param(void *object, int seq, default: break; } + return 0; } -static const struct pw_node_proxy_events node_events = { - PW_VERSION_NODE_PROXY_EVENTS, - .info = node_event_info, - .param = node_event_param, +static void node_event_info(void *object, const struct pw_node_info *info) +{ + struct pw_stream *stream = object; + struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + uint32_t i; + + if (info->change_mask & PW_NODE_CHANGE_MASK_PARAMS) { + for (i = 0; i < info->n_params; i++) { + switch (info->params[i].id) { + case SPA_PARAM_PropInfo: + case SPA_PARAM_Props: + pw_node_for_each_param(impl->node, + 0, info->params[i].id, + 0, UINT32_MAX, + NULL, + node_event_param, + stream); + break; + default: + break; + } + } + } +} + +static const struct pw_node_events node_events = { + PW_VERSION_NODE_EVENTS, + .info_changed = node_event_info, }; static int handle_connect(struct pw_stream *stream) @@ -1074,8 +1050,9 @@ static int handle_connect(struct pw_stream *stream) } pw_proxy_add_listener(stream->proxy, &stream->proxy_listener, &proxy_events, stream); - pw_node_proxy_add_listener((struct pw_node_proxy*)stream->proxy, - &stream->node_listener, &node_events, stream); + + + pw_node_add_listener(impl->node, &stream->node_listener, &node_events, stream); return 0; @@ -1118,7 +1095,7 @@ static void on_remote_exported(void *_data, uint32_t proxy_id, uint32_t global_i struct pw_stream *stream = _data; if (stream->proxy && stream->proxy->id == proxy_id) { stream->node_id = global_id; - stream_set_state(stream, PW_STREAM_STATE_CONFIGURE, NULL); + stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL); } } @@ -1184,7 +1161,6 @@ struct pw_stream * pw_stream_new(struct pw_remote *remote, const char *name, this->state = PW_STREAM_STATE_UNCONNECTED; impl->core = remote->core; - impl->pending_seq = SPA_ID_INVALID; pw_remote_add_listener(remote, &impl->remote_listener, &remote_events, this); @@ -1250,10 +1226,6 @@ const char *pw_stream_state_as_string(enum pw_stream_state state) return "unconnected"; case PW_STREAM_STATE_CONNECTING: return "connecting"; - case PW_STREAM_STATE_CONFIGURE: - return "configure"; - case PW_STREAM_STATE_READY: - return "ready"; case PW_STREAM_STATE_PAUSED: return "paused"; case PW_STREAM_STATE_STREAMING: @@ -1277,7 +1249,7 @@ void pw_stream_destroy(struct pw_stream *stream) spa_hook_remove(&impl->remote_listener); spa_list_remove(&stream->link); - clear_params(stream, PARAM_TYPE_INIT | PARAM_TYPE_OTHER | PARAM_TYPE_FORMAT); + clear_params(impl, SPA_ID_INVALID); pw_log_debug(NAME" %p: free", stream); free(stream->error); @@ -1349,14 +1321,14 @@ struct pw_remote *pw_stream_get_remote(struct pw_stream *stream) return stream->remote; } -static void add_params(struct pw_stream *stream) +static void add_params(struct stream *impl) { uint8_t buffer[4096]; struct spa_pod_builder b; spa_pod_builder_init(&b, buffer, 4096); - add_param(stream, PARAM_TYPE_INIT, + add_param(impl, SPA_PARAM_IO, spa_pod_builder_add_object(&b, SPA_TYPE_OBJECT_ParamIO, SPA_PARAM_IO, SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers), @@ -1444,11 +1416,11 @@ pw_stream_connect(struct pw_stream *stream, impl->params[3] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE); impl->params[4] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0); - clear_params(stream, PARAM_TYPE_INIT | PARAM_TYPE_OTHER | PARAM_TYPE_FORMAT); + clear_params(impl, SPA_ID_INVALID); for (i = 0; i < n_params; i++) - add_param(stream, PARAM_TYPE_INIT, params[i]); + add_param(impl, SPA_ID_INVALID, params[i]); - add_params(stream); + add_params(impl); if ((res = find_format(impl, direction, &impl->media_type, &impl->media_subtype)) < 0) return res; @@ -1514,32 +1486,40 @@ int pw_stream_disconnect(struct pw_stream *stream) } SPA_EXPORT -void pw_stream_finish_format(struct pw_stream *stream, - int res, +int pw_stream_set_error(struct pw_stream *stream, + int res, const char *error, ...) +{ + if (res < 0) { + va_list args; + char *value; + + va_start(args, error); + vasprintf(&value, error, args); + + if (stream->proxy) + pw_proxy_error(stream->proxy, res, value); + stream_set_state(stream, PW_STREAM_STATE_ERROR, value); + + va_end(args); + free(value); + } + return res; +} + +SPA_EXPORT +int pw_stream_update_params(struct pw_stream *stream, const struct spa_pod **params, uint32_t n_params) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - uint32_t i; - - pw_log_debug(NAME" %p: finish format %d %d", stream, res, impl->pending_seq); - - if (res < 0) { - pw_proxy_error(stream->proxy, res, "format failed"); - stream_set_state(stream, PW_STREAM_STATE_ERROR, "format error"); - return; - } - - clear_params(stream, PARAM_TYPE_OTHER); - for (i = 0; i < n_params; i++) - add_param(stream, PARAM_TYPE_OTHER, params[i]); - - impl->pending_seq = SPA_ID_INVALID; + pw_log_debug(NAME" %p: update params", stream); + return update_params(impl, SPA_ID_INVALID, params, n_params); } SPA_EXPORT int pw_stream_set_control(struct pw_stream *stream, uint32_t id, uint32_t n_values, float *values, ...) { + struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); va_list varargs; char buf[1024]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, sizeof(buf)); @@ -1581,8 +1561,7 @@ int pw_stream_set_control(struct pw_stream *stream, uint32_t id, uint32_t n_valu } pod = spa_pod_builder_pop(&b, &f[0]); - pw_node_proxy_set_param((struct pw_node_proxy*)stream->proxy, - SPA_PARAM_Props, 0, pod); + pw_node_set_param(impl->node, SPA_PARAM_Props, 0, pod); return 0; } diff --git a/src/pipewire/stream.h b/src/pipewire/stream.h index a220c3721..75ab4f586 100644 --- a/src/pipewire/stream.h +++ b/src/pipewire/stream.h @@ -35,7 +35,8 @@ extern "C" { * * Media streams are used to exchange data with the PipeWire server. A * stream is a wrapper around a proxy for a \ref pw_client_node with - * just one port. + * an adapter. This means the stream will automatically do conversion + * to the type required by the server. * * Streams can be used to: * @@ -46,8 +47,8 @@ extern "C" { * choose a port for you. * * For more complicated nodes such as filters or ports with multiple - * inputs and/or outputs you will need to create a pw_node yourself and - * export it with \ref pw_remote_export. + * inputs and/or outputs you will need to use the pw_filter or make + * a pw_node yourself and export it with \ref pw_remote_export. * * \section sec_create Create * @@ -77,18 +78,15 @@ extern "C" { * * \section sec_format Format negotiation * - * After connecting the stream, it will transition to the \ref - * PW_STREAM_STATE_CONFIGURE state. In this state the format will be - * negotiated by the PipeWire server. + * After connecting the stream, the server will want to configure some + * parameters on the stream. You will be notified of these changes + * with the param_changed event. * - * Once the format has been selected, the format_changed event is - * emited with the configured format as a parameter. + * When a format param change is emited, the client should now prepare + * itself to deal with the format and complete the negotiation procedure + * with a call to \ref pw_stream_update_params(). * - * The client should now prepare itself to deal with the format and - * complete the negotiation procedure with a call to \ref - * pw_stream_finish_format(). - * - * As arguments to \ref pw_stream_finish_format() an array of spa_param + * As arguments to \ref pw_stream_update_params() an array of spa_param * structures must be given. They contain parameters such as buffer size, * number of buffers, required metadata and other parameters for the * media buffers. @@ -161,11 +159,8 @@ enum pw_stream_state { PW_STREAM_STATE_ERROR = -1, /**< the strean is in error */ PW_STREAM_STATE_UNCONNECTED = 0, /**< unconnected */ PW_STREAM_STATE_CONNECTING = 1, /**< connection is in progress */ - PW_STREAM_STATE_CONFIGURE = 2, /**< stream is being configured */ - PW_STREAM_STATE_READY = 3, /**< stream is ready */ - PW_STREAM_STATE_PAUSED = 4, /**< paused, fully configured but not - * processing data yet */ - PW_STREAM_STATE_STREAMING = 5 /**< streaming */ + PW_STREAM_STATE_PAUSED = 2, /**< paused */ + PW_STREAM_STATE_STREAMING = 3 /**< streaming */ }; struct pw_buffer { @@ -203,10 +198,10 @@ struct pw_stream_events { /** Notify information about a control. */ void (*control_info) (void *data, uint32_t id, const struct pw_stream_control *control); - /** when the format changed. The listener should call - * pw_stream_finish_format() from within this callback or later to complete - * the format negotiation and start the buffer negotiation. */ - void (*format_changed) (void *data, const struct spa_pod *format); + /** when io changed on the stream. */ + void (*io_changed) (void *data, uint32_t id, void *area, uint32_t size); + /** when a parameter changed */ + void (*param_changed) (void *data, uint32_t id, const struct spa_pod *param); /** when a new buffer was created for this stream */ void (*add_buffer) (void *data, struct pw_buffer *buffer); @@ -306,21 +301,24 @@ pw_stream_get_node_id(struct pw_stream *stream); /** Disconnect \a stream \memberof pw_stream */ int pw_stream_disconnect(struct pw_stream *stream); +/** Set the stream in error state */ +int pw_stream_set_error(struct pw_stream *stream, /**< a \ref pw_stream */ + int res, /**< a result code */ + const char *error, ... /**< an error message */) SPA_PRINTF_FUNC(3, 4); + /** Complete the negotiation process with result code \a res \memberof pw_stream * * This function should be called after notification of the format. * When \a res indicates success, \a params contain the parameters for the * allocation state. */ -void -pw_stream_finish_format(struct pw_stream *stream, /**< a \ref pw_stream */ - int res, /**< a result code */ +int +pw_stream_update_params(struct pw_stream *stream, /**< a \ref pw_stream */ const struct spa_pod **params, /**< an array of params. The params should * ideally contain parameters for doing * buffer allocation. */ uint32_t n_params /**< number of elements in \a params */); - /** Set control values */ int pw_stream_set_control(struct pw_stream *stream, uint32_t id, uint32_t n_values, float *values, ...); diff --git a/src/tests/test-stream.c b/src/tests/test-stream.c index 443965b7f..bc73d75b3 100644 --- a/src/tests/test-stream.c +++ b/src/tests/test-stream.c @@ -41,7 +41,8 @@ static void test_abi(void) void (*state_changed) (void *data, enum pw_stream_state old, enum pw_stream_state state, const char *error); void (*control_info) (void *data, uint32_t id, const struct pw_stream_control *control); - void (*format_changed) (void *data, const struct spa_pod *format); + void (*io_changed) (void *data, uint32_t id, void *area, uint32_t size); + void (*param_changed) (void *data, uint32_t id, const struct spa_pod *param); void (*add_buffer) (void *data, struct pw_buffer *buffer); void (*remove_buffer) (void *data, struct pw_buffer *buffer); void (*process) (void *data); @@ -51,7 +52,8 @@ static void test_abi(void) TEST_FUNC(ev, test, destroy); TEST_FUNC(ev, test, state_changed); TEST_FUNC(ev, test, control_info); - TEST_FUNC(ev, test, format_changed); + TEST_FUNC(ev, test, io_changed); + TEST_FUNC(ev, test, param_changed); TEST_FUNC(ev, test, add_buffer); TEST_FUNC(ev, test, remove_buffer); TEST_FUNC(ev, test, process); @@ -66,16 +68,12 @@ static void test_abi(void) spa_assert(PW_STREAM_STATE_ERROR == -1); spa_assert(PW_STREAM_STATE_UNCONNECTED == 0); spa_assert(PW_STREAM_STATE_CONNECTING == 1); - spa_assert(PW_STREAM_STATE_CONFIGURE == 2); - spa_assert(PW_STREAM_STATE_READY == 3); - spa_assert(PW_STREAM_STATE_PAUSED == 4); - spa_assert(PW_STREAM_STATE_STREAMING == 5); + spa_assert(PW_STREAM_STATE_PAUSED == 2); + spa_assert(PW_STREAM_STATE_STREAMING == 3); spa_assert(pw_stream_state_as_string(PW_STREAM_STATE_ERROR) != NULL); spa_assert(pw_stream_state_as_string(PW_STREAM_STATE_UNCONNECTED) != NULL); spa_assert(pw_stream_state_as_string(PW_STREAM_STATE_CONNECTING) != NULL); - spa_assert(pw_stream_state_as_string(PW_STREAM_STATE_CONFIGURE) != NULL); - spa_assert(pw_stream_state_as_string(PW_STREAM_STATE_READY) != NULL); spa_assert(pw_stream_state_as_string(PW_STREAM_STATE_PAUSED) != NULL); spa_assert(pw_stream_state_as_string(PW_STREAM_STATE_STREAMING) != NULL); } @@ -89,7 +87,11 @@ static void stream_state_changed_error(void *data, enum pw_stream_state old, { spa_assert_not_reached(); } -static void stream_format_changed_error(void *data, const struct spa_pod *format) +static void stream_io_changed_error(void *data, uint32_t id, void *area, uint32_t size) +{ + spa_assert_not_reached(); +} +static void stream_param_changed_error(void *data, uint32_t id, const struct spa_pod *format) { spa_assert_not_reached(); } @@ -115,7 +117,8 @@ static const struct pw_stream_events stream_events_error = PW_VERSION_STREAM_EVENTS, .destroy = stream_destroy_error, .state_changed = stream_state_changed_error, - .format_changed = stream_format_changed_error, + .io_changed = stream_io_changed_error, + .param_changed = stream_param_changed_error, .add_buffer = stream_add_buffer_error, .remove_buffer = stream_remove_buffer_error, .process = stream_process_error,