diff --git a/pipewire-pulseaudio b/pipewire-pulseaudio index 571cb214d..d4cf47ec5 160000 --- a/pipewire-pulseaudio +++ b/pipewire-pulseaudio @@ -1 +1 @@ -Subproject commit 571cb214d536936317c2750aaf1e1ec90eec25a4 +Subproject commit d4cf47ec558a0404bcc8c266dcab0c08cd5982ec diff --git a/src/modules/module-protocol-native/protocol-native.c b/src/modules/module-protocol-native/protocol-native.c index db3810a1f..e67497c8f 100644 --- a/src/modules/module-protocol-native/protocol-native.c +++ b/src/modules/module-protocol-native/protocol-native.c @@ -970,6 +970,38 @@ static int node_demarshal_param(void *object, void *data, size_t size) seq, id, index, next, param); } +static int node_marshal_subscribe_params(void *object, uint32_t *ids, uint32_t n_ids) +{ + struct pw_proxy *proxy = object; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_proxy(proxy, PW_NODE_PROXY_METHOD_SUBSCRIBE_PARAMS, NULL); + + spa_pod_builder_add_struct(b, + SPA_POD_Array(sizeof(uint32_t), SPA_TYPE_Id, n_ids, ids)); + + return pw_protocol_native_end_proxy(proxy, b); +} + +static int node_demarshal_subscribe_params(void *object, void *data, size_t size) +{ + struct pw_resource *resource = object; + struct spa_pod_parser prs; + uint32_t csize, ctype, n_ids; + uint32_t *ids; + + spa_pod_parser_init(&prs, data, size); + if (spa_pod_parser_get_struct(&prs, + SPA_POD_Array(&csize, &ctype, &n_ids, &ids)) < 0) + return -EINVAL; + + if (ctype != SPA_TYPE_Id) + return -EINVAL; + + return pw_resource_do(resource, struct pw_node_proxy_methods, subscribe_params, 0, + ids, n_ids); +} + static int node_marshal_enum_params(void *object, int seq, uint32_t id, uint32_t index, uint32_t num, const struct spa_pod *filter) { @@ -1769,12 +1801,14 @@ static const struct pw_protocol_marshal pw_protocol_native_device_marshal = { static const struct pw_node_proxy_methods pw_protocol_native_node_method_marshal = { PW_VERSION_NODE_PROXY_METHODS, + &node_marshal_subscribe_params, &node_marshal_enum_params, &node_marshal_set_param, &node_marshal_send_command, }; static const struct pw_protocol_native_demarshal pw_protocol_native_node_method_demarshal[] = { + { &node_demarshal_subscribe_params, 0, }, { &node_demarshal_enum_params, 0, }, { &node_demarshal_set_param, PW_PERM_W, }, { &node_demarshal_send_command, PW_PERM_W, }, diff --git a/src/pipewire/interfaces.h b/src/pipewire/interfaces.h index 7ad0a94ac..5ea0ec40d 100644 --- a/src/pipewire/interfaces.h +++ b/src/pipewire/interfaces.h @@ -621,15 +621,27 @@ pw_device_proxy_add_listener(struct pw_device_proxy *device, #define PW_VERSION_NODE 0 -#define PW_NODE_PROXY_METHOD_ENUM_PARAMS 0 -#define PW_NODE_PROXY_METHOD_SET_PARAM 1 -#define PW_NODE_PROXY_METHOD_SEND_COMMAND 2 -#define PW_NODE_PROXY_METHOD_NUM 3 +#define PW_NODE_PROXY_METHOD_SUBSCRIBE_PARAMS 0 +#define PW_NODE_PROXY_METHOD_ENUM_PARAMS 1 +#define PW_NODE_PROXY_METHOD_SET_PARAM 2 +#define PW_NODE_PROXY_METHOD_SEND_COMMAND 3 +#define PW_NODE_PROXY_METHOD_NUM 4 /** Node methods */ struct pw_node_proxy_methods { #define PW_VERSION_NODE_PROXY_METHODS 0 uint32_t version; + /** + * Subscribe to parameter changes + * + * Automatically emit param events for the given ids when + * they are changed. + * + * \param ids an array of param ids + * \param n_ids the number of ids in \a ids + */ + int (*subscribe_params) (void *object, uint32_t *ids, uint32_t n_ids); + /** * Enumerate node parameters * @@ -665,6 +677,13 @@ struct pw_node_proxy_methods { }; /** Node */ +static inline int +pw_node_proxy_subscribe_params(struct pw_node_proxy *node, uint32_t *ids, uint32_t n_ids) +{ + return pw_proxy_do((struct pw_proxy*)node, struct pw_node_proxy_methods, subscribe_params, + ids, n_ids); +} + static inline int pw_node_proxy_enum_params(struct pw_node_proxy *node, int seq, uint32_t id, uint32_t index, uint32_t num, const struct spa_pod *filter) diff --git a/src/pipewire/node.c b/src/pipewire/node.c index 9fd229de9..6668ffae1 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -62,6 +62,8 @@ struct resource_data { struct spa_hook resource_listener; struct pw_node *node; struct pw_resource *resource; + uint32_t subscribe_ids[64]; + uint32_t n_subscribe_ids; }; /** \endcond */ @@ -193,6 +195,62 @@ static void emit_info_changed(struct pw_node *node) node->info.change_mask = 0; } +static int resource_is_subscribed(struct pw_resource *resource, uint32_t id) +{ + struct resource_data *data = pw_resource_get_user_data(resource); + uint32_t i; + + for (i = 0; i < data->n_subscribe_ids; i++) { + if (data->subscribe_ids[i] == id) + return 1; + } + return 0; +} + +static int notify_param(void *data, int seq, uint32_t id, + uint32_t index, uint32_t next, struct spa_pod *param) +{ + struct pw_node *node = data; + struct pw_resource *resource; + + spa_list_for_each(resource, &node->global->resource_list, link) { + if (!resource_is_subscribed(resource, id)) + continue; + + pw_log_debug("resource %p: notify param %d", resource, id); + pw_node_resource_param(resource, seq, id, index, next, param); + } + return 0; +} + +static void emit_params(struct pw_node *node, uint32_t *changed_ids, uint32_t n_changed_ids) +{ + uint32_t i; + int res; + + if (node->global == NULL) + return; + + pw_log_debug("node %p: emit %d params", node, n_changed_ids); + + for (i = 0; i < n_changed_ids; i++) { + struct pw_resource *resource; + int subscribed = 0; + + /* first check if anyone is subscribed */ + spa_list_for_each(resource, &node->global->resource_list, link) { + if ((subscribed = resource_is_subscribed(resource, changed_ids[i]))) + break; + } + if (!subscribed) + continue; + + if ((res = pw_node_for_each_param(node, 1, changed_ids[i], 0, UINT32_MAX, + NULL, notify_param, node)) < 0) { + pw_log_error("node %p: error %d (%s)", node, res, spa_strerror(res)); + } + } +} static void node_update_state(struct pw_node *node, enum pw_node_state state, char *error) { @@ -302,6 +360,24 @@ static int node_enum_params(void *object, int seq, uint32_t id, return 0; } +static int node_subscribe_params(void *object, uint32_t *ids, uint32_t n_ids) +{ + struct pw_resource *resource = object; + struct resource_data *data = pw_resource_get_user_data(resource); + uint32_t i; + + n_ids = SPA_MIN(n_ids, SPA_N_ELEMENTS(data->subscribe_ids)); + data->n_subscribe_ids = n_ids; + + for (i = 0; i < n_ids; i++) { + data->subscribe_ids[i] = ids[i]; + pw_log_debug("resource %p: subscribe param %s", resource, + spa_debug_type_find_name(spa_type_param, ids[i])); + node_enum_params(resource, 1, ids[i], 0, UINT32_MAX, NULL); + } + return 0; +} + static int node_set_param(void *object, uint32_t id, uint32_t flags, const struct spa_pod *param) { @@ -310,6 +386,9 @@ static int node_set_param(void *object, uint32_t id, uint32_t flags, struct pw_node *node = data->node; int res; + pw_log_debug("resource %p: set param %s %08x", resource, + spa_debug_type_find_name(spa_type_param, id), flags); + if ((res = spa_node_set_param(node->node, id, flags, param)) < 0) { pw_log_error("resource %p: %d error %d (%s)", resource, resource->id, res, spa_strerror(res)); @@ -337,6 +416,7 @@ static int node_send_command(void *object, const struct spa_command *command) static const struct pw_node_proxy_methods node_methods = { PW_VERSION_NODE_PROXY_METHODS, + .subscribe_params = node_subscribe_params, .enum_params = node_enum_params, .set_param = node_set_param, .send_command = node_send_command @@ -869,6 +949,7 @@ int pw_node_update_properties(struct pw_node *node, const struct spa_dict *dict) static void node_info(void *data, const struct spa_node_info *info) { struct pw_node *node = data; + uint32_t changed_ids[MAX_PARAMS], n_changed_ids = 0; node->info.max_input_ports = info->max_input_ports; node->info.max_output_ports = info->max_output_ports; @@ -881,12 +962,25 @@ static void node_info(void *data, const struct spa_node_info *info) update_properties(node, info->props); } if (info->change_mask & SPA_NODE_CHANGE_MASK_PARAMS) { + uint32_t i; + node->info.change_mask |= PW_NODE_CHANGE_MASK_PARAMS; node->info.n_params = SPA_MIN(info->n_params, SPA_N_ELEMENTS(node->params)); - memcpy(node->info.params, info->params, - node->info.n_params * sizeof(struct spa_param_info)); + + for (i = 0; i < node->info.n_params; i++) { + if (node->info.params[i].flags == info->params[i].flags) + continue; + + if (info->params[i].flags & SPA_PARAM_INFO_READ) + changed_ids[n_changed_ids++] = info->params[i].id; + + node->info.params[i] = info->params[i]; + } } emit_info_changed(node); + + if (info->change_mask & SPA_NODE_CHANGE_MASK_PARAMS) + emit_params(node, changed_ids, n_changed_ids); } static void node_port_info(void *data, enum spa_direction direction, uint32_t port_id, diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index b8561f7e3..deceb08b4 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -80,6 +80,7 @@ struct param { struct control { uint32_t id; + uint32_t type; struct spa_list link; struct pw_stream_control control; struct spa_pod *info; @@ -120,8 +121,6 @@ struct stream { const struct spa_node_callbacks *callbacks; void *callbacks_data; struct spa_io_buffers *io; - struct spa_io_sequence *io_control; - struct spa_io_sequence *io_notify; struct spa_io_position *position; uint32_t io_control_size; uint32_t io_notify_size; @@ -146,6 +145,7 @@ struct stream { int async_connect:1; int disconnecting:1; int free_data:1; + int subscribe:1; }; static int get_param_index(uint32_t id) @@ -427,24 +427,6 @@ static int impl_port_set_io(struct spa_node *node, enum spa_direction direction, else impl->io = NULL; break; - case SPA_IO_Control: - if (data && size >= sizeof(struct spa_io_sequence)) { - impl->io_control = data; - impl->io_control_size = size; - } else { - impl->io_control = NULL; - impl->io_control_size = 0; - } - break; - case SPA_IO_Notify: - if (data && size >= sizeof(struct spa_io_sequence)) { - impl->io_notify = data; - impl->io_notify_size = size; - } else { - impl->io_notify = NULL; - impl->io_notify_size = 0; - } - break; default: return -ENOENT; } @@ -690,36 +672,6 @@ static int impl_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint3 return 0; } -static int process_control(struct stream *impl, void *data, uint32_t size) -{ - return 0; -} - -static int process_notify(struct stream *impl, void *data, uint32_t size) -{ - struct spa_pod_builder b = { 0 }; - bool changed; - struct spa_pod_frame f[2]; - struct spa_pod *pod; - - spa_pod_builder_init(&b, data, size); - spa_pod_builder_push_sequence(&b, &f[0], 0); - if ((changed = impl->props.changed)) { - spa_pod_builder_control(&b, 0, SPA_CONTROL_Properties); - spa_pod_builder_push_object(&b, &f[1], SPA_TYPE_OBJECT_Props, SPA_PARAM_Props); - spa_pod_builder_prop(&b, SPA_PROP_volume, 0); - spa_pod_builder_float(&b, impl->props.volume); - spa_pod_builder_pop(&b, &f[1]); - impl->props.changed = false; - } - pod = spa_pod_builder_pop(&b, &f[0]); - - if (changed && pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG) && pod) - spa_debug_pod(2, NULL, pod); - - return 0; -} - static inline void copy_position(struct stream *impl, int64_t queued) { struct spa_io_position *p = impl->position; @@ -732,11 +684,6 @@ static inline void copy_position(struct stream *impl, int64_t queued) impl->time.queued = queued; __atomic_add_fetch(&impl->seq, 1, __ATOMIC_SEQ_CST); } - - if (impl->io_control) - process_control(impl, &impl->io_control->sequence, impl->io_control_size); - if (impl->io_notify) - process_notify(impl, &impl->io_notify->sequence, impl->io_notify_size); } static int impl_node_process_input(struct spa_node *node) @@ -863,27 +810,26 @@ static void node_event_info(void *object, const struct pw_node_info *info) { struct pw_stream *stream = object; struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + uint32_t subscribe[info->n_params], n_subscribe = 0; uint32_t i; - if (info->change_mask & PW_NODE_CHANGE_MASK_PARAMS) { + if (info->change_mask & PW_NODE_CHANGE_MASK_PARAMS && !impl->subscribe) { for (i = 0; i < info->n_params; i++) { - if (!(info->params[i].flags & SPA_PARAM_INFO_READ)) - continue; switch (info->params[i].id) { case SPA_PARAM_PropInfo: - if (info->params[i].flags == impl->param_propinfo) - break; - impl->param_propinfo = info->params[i].flags; - /* fallthrough */ case SPA_PARAM_Props: - pw_node_proxy_enum_params((struct pw_node_proxy*)stream->proxy, - 0, info->params[i].id, 0, -1, NULL); + subscribe[n_subscribe++] = info->params[i].id; break; default: break; } } + if (n_subscribe > 0) { + pw_node_proxy_subscribe_params((struct pw_node_proxy*)stream->proxy, + subscribe, n_subscribe); + impl->subscribe = true; + } } } @@ -925,6 +871,7 @@ static void node_event_param(void *object, int seq, pod = spa_pod_get_values(type, &n_vals, &choice); + c->type = SPA_POD_TYPE(pod); if (spa_pod_is_float(pod)) vals = SPA_POD_BODY(pod); else if (spa_pod_is_bool(pod) && n_vals > 0) { @@ -1310,16 +1257,6 @@ static void add_params(struct pw_stream *stream) SPA_TYPE_OBJECT_ParamIO, SPA_PARAM_IO, SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers), SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers)))); - add_param(stream, PARAM_TYPE_INIT, - spa_pod_builder_add_object(&b, - SPA_TYPE_OBJECT_ParamIO, SPA_PARAM_IO, - SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Notify), - SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_sequence) + 1024))); - add_param(stream, PARAM_TYPE_INIT, - spa_pod_builder_add_object(&b, - SPA_TYPE_OBJECT_ParamIO, SPA_PARAM_IO, - SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Control), - SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_sequence)))); } SPA_EXPORT @@ -1431,18 +1368,43 @@ void pw_stream_finish_format(struct pw_stream *stream, } SPA_EXPORT -int pw_stream_set_control(struct pw_stream *stream, uint32_t id, float value) +int pw_stream_set_control(struct pw_stream *stream, uint32_t id, float value, ...) { - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + va_list varargs; + char buf[1024]; + struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, sizeof(buf)); + struct spa_pod_frame f[1]; + struct spa_pod *pod; + struct control *c; - switch (id) { - case SPA_PROP_volume: - impl->props.volume = value; - break; - default: - return -ENOTSUP; + pw_log_debug("stream %p: set control %d %f", stream, id, value); + + va_start(varargs, value); + + spa_pod_builder_push_object(&b, &f[0], SPA_TYPE_OBJECT_Props, SPA_PARAM_Props); + while (1) { + if ((c = find_control(stream, id))) { + spa_pod_builder_prop(&b, id, 0); + switch (c->type) { + case SPA_TYPE_Float: + spa_pod_builder_float(&b, value); + break; + case SPA_TYPE_Bool: + spa_pod_builder_bool(&b, value < 0.5 ? false : true); + break; + default: + spa_pod_builder_none(&b); + break; + } + } + if ((id = va_arg(varargs, uint32_t)) == 0) + break; + value = va_arg(varargs, double); } - impl->props.changed = true; + pod = spa_pod_builder_pop(&b, &f[0]); + + pw_node_proxy_set_param((struct pw_node_proxy*)stream->proxy, + SPA_PARAM_Props, 0, pod); return 0; } diff --git a/src/pipewire/stream.h b/src/pipewire/stream.h index 097bd0859..b53bf5089 100644 --- a/src/pipewire/stream.h +++ b/src/pipewire/stream.h @@ -323,8 +323,8 @@ pw_stream_finish_format(struct pw_stream *stream, /**< a \ref pw_stream */ uint32_t n_params /**< number of elements in \a params */); -/** Set a control value */ -int pw_stream_set_control(struct pw_stream *stream, uint32_t id, float value); +/** Set control values */ +int pw_stream_set_control(struct pw_stream *stream, uint32_t id, float value, ...); /** Get control information */ const struct pw_stream_control * pw_stream_get_control(struct pw_stream *stream, uint32_t id);