From 9206fb019f7f3929bc3010384c93f510cea77e6d Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Sun, 2 May 2021 23:16:05 +0200 Subject: [PATCH] stream: add support for Props param Rework the node and port info management a little. Make sure we emit a node info event when the Props param changed. Implement enum_param on the node. --- src/pipewire/stream.c | 196 ++++++++++++++++++++++++++---------------- 1 file changed, 120 insertions(+), 76 deletions(-) diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 9ad0ab0c7..157a7439c 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -118,11 +118,16 @@ struct stream { struct spa_io_position *position; } rt; - uint32_t change_mask_all; + uint32_t port_change_mask_all; struct spa_port_info port_info; struct pw_properties *port_props; + struct spa_param_info port_params[5]; + struct spa_list param_list; - struct spa_param_info params[5]; + + uint32_t change_mask_all; + struct spa_node_info info; + struct spa_param_info params[1]; uint32_t media_type; uint32_t media_subtype; @@ -149,6 +154,16 @@ struct stream { }; static int get_param_index(uint32_t id) +{ + switch (id) { + case SPA_PARAM_Props: + return 0; + default: + return -1; + } +} + +static int get_port_param_index(uint32_t id) { switch (id) { case SPA_PARAM_EnumFormat: @@ -213,11 +228,14 @@ static struct param *add_param(struct stream *impl, spa_list_append(&impl->param_list, &p->link); - idx = get_param_index(id); - if (idx != -1) { - impl->port_info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS; + if ((idx = get_param_index(id)) != -1) { + impl->info.change_mask |= SPA_NODE_CHANGE_MASK_PARAMS; impl->params[idx].flags ^= SPA_PARAM_INFO_SERIAL; impl->params[idx].flags |= SPA_PARAM_INFO_READ; + } else if ((idx = get_port_param_index(id)) != -1) { + impl->port_info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS; + impl->port_params[idx].flags ^= SPA_PARAM_INFO_SERIAL; + impl->port_params[idx].flags |= SPA_PARAM_INFO_READ; } return p; @@ -407,6 +425,57 @@ static int impl_set_io(void *object, uint32_t id, void *data, size_t size) return 0; } +static int enum_params(void *object, bool is_port, int seq, uint32_t id, uint32_t start, uint32_t num, + const struct spa_pod *filter) +{ + struct stream *d = object; + struct spa_result_node_params result; + uint8_t buffer[1024]; + struct spa_pod_builder b = { 0 }; + uint32_t count = 0; + struct param *p; + bool found = false; + + spa_return_val_if_fail(num != 0, -EINVAL); + + result.id = id; + result.next = 0; + + pw_log_debug(NAME" %p: param id %d (%s) start:%d num:%d", d, id, + spa_debug_type_find_name(spa_type_param, id), + start, num); + + spa_list_for_each(p, &d->param_list, link) { + struct spa_pod *param; + + result.index = result.next++; + if (result.index < start) + continue; + + param = p->param; + if (param == NULL || p->id != id) + continue; + + found = true; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + if (spa_pod_filter(&b, &result.param, param, filter) != 0) + continue; + + spa_node_emit_result(&d->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result); + + if (++count == num) + break; + } + return found ? 0 : -ENOENT; +} + +static int impl_enum_params(void *object, int seq, uint32_t id, uint32_t start, uint32_t num, + const struct spa_pod *filter) +{ + return enum_params(object, false, seq, id, start, num, filter); +} + static int impl_set_param(void *object, uint32_t id, uint32_t flags, const struct spa_pod *param) { struct stream *impl = object; @@ -459,33 +528,19 @@ static int impl_send_command(void *object, const struct spa_command *command) return 0; } -static void emit_node_info(struct stream *d) +static void emit_node_info(struct stream *d, bool full) { - struct spa_node_info info; - - info = SPA_NODE_INFO_INIT(); - if (d->direction == SPA_DIRECTION_INPUT) { - info.max_input_ports = 1; - info.max_output_ports = 0; - } else { - info.max_input_ports = 0; - info.max_output_ports = 1; - } - info.change_mask |= SPA_NODE_CHANGE_MASK_FLAGS; - /* we're always RT safe, if the stream was marked RT_PROCESS, - * the callback must be RT safe */ - info.flags = SPA_NODE_FLAG_RT; - /* if the callback was not marked RT_PROCESS, we will offload - * the process callback in the main thread and we are ASYNC */ - if (!d->process_rt) - info.flags |= SPA_NODE_FLAG_ASYNC; - spa_node_emit_info(&d->hooks, &info); + if (full) + d->info.change_mask = d->change_mask_all; + if (d->info.change_mask != 0) + spa_node_emit_info(&d->hooks, &d->info); + d->info.change_mask = 0; } static void emit_port_info(struct stream *d, bool full) { if (full) - d->port_info.change_mask = d->change_mask_all; + d->port_info.change_mask = d->port_change_mask_all; if (d->port_info.change_mask != 0) spa_node_emit_port_info(&d->hooks, d->direction, 0, &d->port_info); d->port_info.change_mask = 0; @@ -501,7 +556,7 @@ static int impl_add_listener(void *object, spa_hook_list_isolate(&d->hooks, &save, listener, events, data); - emit_node_info(d); + emit_node_info(d, true); emit_port_info(d, true); spa_hook_list_join(&d->hooks, &save); @@ -546,46 +601,7 @@ static int impl_port_enum_params(void *object, int seq, uint32_t id, uint32_t start, uint32_t num, const struct spa_pod *filter) { - struct stream *d = object; - struct spa_result_node_params result; - uint8_t buffer[1024]; - struct spa_pod_builder b = { 0 }; - uint32_t count = 0; - struct param *p; - bool found = false; - - spa_return_val_if_fail(num != 0, -EINVAL); - - result.id = id; - result.next = 0; - - pw_log_debug(NAME" %p: param id %d (%s) start:%d num:%d", d, id, - spa_debug_type_find_name(spa_type_param, id), - start, num); - - spa_list_for_each(p, &d->param_list, link) { - struct spa_pod *param; - - result.index = result.next++; - if (result.index < start) - continue; - - param = p->param; - if (param == NULL || p->id != id) - continue; - - found = true; - - spa_pod_builder_init(&b, buffer, sizeof(buffer)); - if (spa_pod_filter(&b, &result.param, param, filter) != 0) - continue; - - spa_node_emit_result(&d->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result); - - if (++count == num) - break; - } - return found ? 0 : -ENOENT; + return enum_params(object, true, seq, id, start, num, filter); } static int map_data(struct stream *impl, struct spa_data *data, int prot) @@ -892,6 +908,7 @@ static const struct spa_node_methods impl_node = { SPA_VERSION_NODE_METHODS, .add_listener = impl_add_listener, .set_callbacks = impl_set_callbacks, + .enum_params = impl_enum_params, .set_param = impl_set_param, .set_io = impl_set_io, .send_command = impl_send_command, @@ -1527,28 +1544,56 @@ pw_stream_connect(struct pw_stream *stream, else impl->node_methods.process = impl_node_process_output; + impl->process_rt = SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_RT_PROCESS); + impl->impl_node.iface = SPA_INTERFACE_INIT( SPA_TYPE_INTERFACE_Node, SPA_VERSION_NODE, &impl->node_methods, impl); impl->change_mask_all = + SPA_NODE_CHANGE_MASK_FLAGS | + SPA_NODE_CHANGE_MASK_PROPS | + SPA_NODE_CHANGE_MASK_PARAMS; + + impl->info = SPA_NODE_INFO_INIT(); + if (impl->direction == SPA_DIRECTION_INPUT) { + impl->info.max_input_ports = 1; + impl->info.max_output_ports = 0; + } else { + impl->info.max_input_ports = 0; + impl->info.max_output_ports = 1; + } + /* we're always RT safe, if the stream was marked RT_PROCESS, + * the callback must be RT safe */ + impl->info.flags = SPA_NODE_FLAG_RT; + /* if the callback was not marked RT_PROCESS, we will offload + * the process callback in the main thread and we are ASYNC */ + if (!impl->process_rt) + impl->info.flags |= SPA_NODE_FLAG_ASYNC; + impl->info.props = &stream->properties->dict; + impl->params[0] = SPA_PARAM_INFO(SPA_PARAM_Props, SPA_PARAM_INFO_WRITE); + impl->info.params = impl->params; + impl->info.n_params = 1; + impl->info.change_mask = impl->change_mask_all; + + impl->port_change_mask_all = SPA_PORT_CHANGE_MASK_FLAGS | SPA_PORT_CHANGE_MASK_PROPS | SPA_PORT_CHANGE_MASK_PARAMS; impl->port_info = SPA_PORT_INFO_INIT(); - impl->port_info.change_mask = impl->change_mask_all; + impl->port_info.change_mask = impl->port_change_mask_all; impl->port_info.flags = 0; if (SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_ALLOC_BUFFERS)) impl->port_info.flags |= SPA_PORT_FLAG_CAN_ALLOC_BUFFERS; - impl->params[0] = SPA_PARAM_INFO(SPA_PARAM_EnumFormat, 0); - impl->params[1] = SPA_PARAM_INFO(SPA_PARAM_Meta, 0); - impl->params[2] = SPA_PARAM_INFO(SPA_PARAM_IO, 0); - impl->params[3] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE); - impl->params[4] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0); + impl->port_params[0] = SPA_PARAM_INFO(SPA_PARAM_EnumFormat, 0); + impl->port_params[1] = SPA_PARAM_INFO(SPA_PARAM_Meta, 0); + impl->port_params[2] = SPA_PARAM_INFO(SPA_PARAM_IO, 0); + impl->port_params[3] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE); + impl->port_params[4] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0); impl->port_info.props = &impl->port_props->dict; - impl->port_info.params = impl->params; + impl->port_info.params = impl->port_params; impl->port_info.n_params = 5; clear_params(impl, SPA_ID_INVALID); @@ -1579,8 +1624,6 @@ pw_stream_connect(struct pw_stream *stream, if (flags & PW_STREAM_FLAG_DONT_RECONNECT) pw_properties_set(stream->properties, PW_KEY_NODE_DONT_RECONNECT, "true"); - impl->process_rt = SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_RT_PROCESS); - if ((str = pw_properties_get(stream->properties, "mem.warn-mlock")) != NULL) impl->warn_mlock = pw_properties_parse_bool(str); @@ -1766,6 +1809,7 @@ int pw_stream_update_params(struct pw_stream *stream, if ((res = update_params(impl, SPA_ID_INVALID, params, n_params)) < 0) return res; + emit_node_info(impl, false); emit_port_info(impl, false); return res;