From e27331a25ec9d5bdfcf10b7317df22fe706a7429 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 9 Dec 2019 12:28:55 +0100 Subject: [PATCH] filter: use spa_node directy There is not need to make a pw_node first and export that, we ca directly export the spa_node. Fix property updates on node and port. Fix some leaks --- src/pipewire/filter.c | 160 +++++++++++++++++++++++------------------- 1 file changed, 88 insertions(+), 72 deletions(-) diff --git a/src/pipewire/filter.c b/src/pipewire/filter.c index 86a1b2fa3..89cf04d5f 100644 --- a/src/pipewire/filter.c +++ b/src/pipewire/filter.c @@ -119,8 +119,6 @@ struct filter { enum pw_filter_flags flags; - struct pw_node *node; - struct spa_node impl_node; struct spa_hook_list hooks; struct spa_callbacks callbacks; @@ -129,6 +127,8 @@ struct filter { struct spa_list port_list;; struct port *ports[2][MAX_PORTS]; + uint32_t change_mask_all; + struct spa_node_info info; struct spa_list param_list; struct spa_param_info params[5]; @@ -137,7 +137,6 @@ struct filter { struct pw_time time; unsigned int disconnecting:1; - unsigned int free_data:1; unsigned int free_proxy:1; unsigned int subscribe:1; unsigned int draining:1; @@ -187,12 +186,16 @@ static struct param *add_param(struct filter *impl, struct port *port, if (port) { spa_list_append(&port->param_list, &p->link); - if (idx != -1) + if (idx != -1) { + port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS; port->params[idx].flags |= SPA_PARAM_INFO_READ; + } } else { spa_list_append(&impl->param_list, &p->link); - if (idx != -1) + if (idx != -1) { + impl->info.change_mask |= SPA_NODE_CHANGE_MASK_PARAMS; impl->params[idx].flags |= SPA_PARAM_INFO_READ; + } } return p; } @@ -359,16 +362,13 @@ static int impl_send_command(void *object, const struct spa_command *command) return 0; } -static void emit_node_info(struct filter *d) +static void emit_node_info(struct filter *d, bool full) { - struct spa_node_info info; - - info = SPA_NODE_INFO_INIT(); - info.max_input_ports = MAX_PORTS; - info.max_output_ports = MAX_PORTS; - info.change_mask |= SPA_NODE_CHANGE_MASK_FLAGS; - info.flags = SPA_NODE_FLAG_RT; - spa_node_emit_info(&d->hooks, &info); + if (full) + d->info.change_mask = d->change_mask_all; + if (d->info.change_mask != 0) + spa_node_emit_info(&d->hooks, &d->info); + d->info.change_mask = 0; } static void emit_port_info(struct filter *d, struct port *p, bool full) @@ -377,6 +377,7 @@ static void emit_port_info(struct filter *d, struct port *p, bool full) p->info.change_mask = p->change_mask_all; if (p->info.change_mask != 0) spa_node_emit_port_info(&d->hooks, p->direction, p->id, &p->info); + p->info.change_mask = 0; } static int impl_add_listener(void *object, @@ -390,7 +391,7 @@ static int impl_add_listener(void *object, spa_hook_list_isolate(&d->hooks, &save, listener, events, data); - emit_node_info(d); + emit_node_info(d, true); spa_list_for_each(p, &d->port_list, link) emit_port_info(d, p, true); @@ -513,7 +514,7 @@ static int port_set_param(struct filter *impl, struct port *port, uint32_t id, uint32_t flags, const struct spa_pod *param) { struct pw_filter *filter = &impl->this; - int res, idx; + int res; pw_log_debug(NAME" %p: param changed: %p %d", impl, param, impl->disconnecting); if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG)) @@ -527,12 +528,11 @@ static int port_set_param(struct filter *impl, struct port *port, if (filter->state == PW_FILTER_STATE_ERROR) return -EIO; - idx = get_param_index(id); - if (idx != -1) { - impl->params[idx].flags |= SPA_PARAM_INFO_READ; - impl->params[idx].flags ^= SPA_PARAM_INFO_SERIAL; + if (port) emit_port_info(impl, port, false); - } + else + emit_node_info(impl, false); + return res; } @@ -882,6 +882,7 @@ static const struct pw_proxy_events proxy_events = { static void on_core_error(void *_data, uint32_t id, int seq, int res, const char *message) { struct pw_filter *filter = _data; + struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); pw_log_error(NAME" %p: error id:%u seq:%d res:%d (%s): %s", filter, id, seq, res, spa_strerror(res), message); @@ -948,6 +949,19 @@ filter_new(struct pw_core *core, const char *name, impl->core = core; + impl->info = SPA_NODE_INFO_INIT(); + impl->info.max_input_ports = MAX_PORTS; + impl->info.max_output_ports = MAX_PORTS; + impl->change_mask_all = + SPA_NODE_CHANGE_MASK_FLAGS | + SPA_NODE_CHANGE_MASK_PROPS | + SPA_NODE_CHANGE_MASK_PARAMS; + impl->info.flags = SPA_NODE_FLAG_RT; + impl->info.props = &this->properties->dict; + impl->info.params = impl->params; + impl->info.n_params = SPA_N_ELEMENTS(impl->params); + impl->info.change_mask = impl->change_mask_all; + return impl; error_properties: @@ -1010,7 +1024,6 @@ pw_filter_new_simple(struct pw_loop *loop, this = &impl->this; - impl->free_data = true; impl->data.core = core; pw_filter_add_listener(this, &impl->data.filter_listener, events, data); @@ -1054,7 +1067,6 @@ void pw_filter_destroy(struct pw_filter *filter) if (filter->core_proxy) { spa_hook_remove(&filter->core_listener); spa_list_remove(&filter->link); - filter->core_proxy = NULL; } clear_params(impl, NULL, SPA_ID_INVALID); @@ -1066,7 +1078,7 @@ void pw_filter_destroy(struct pw_filter *filter) free(filter->name); - if (impl->free_data) + if (impl->data.core) pw_core_destroy(impl->data.core); free(impl); @@ -1104,15 +1116,12 @@ const char *pw_filter_get_name(struct pw_filter *filter) SPA_EXPORT const struct pw_properties *pw_filter_get_properties(struct pw_filter *filter, void *port_data) { - struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); struct port *port = SPA_CONTAINER_OF(port_data, struct port, user_data); if (port_data) { - if (port->port) - return pw_port_get_properties(port->port); + return port->props; } else { - if (impl->node) - return pw_node_get_properties(impl->node); + return filter->properties; } return NULL; } @@ -1125,11 +1134,19 @@ int pw_filter_update_properties(struct pw_filter *filter, void *port_data, const int changed = 0; if (port_data) { - if (port->port) - changed = pw_port_update_properties(port->port, dict); + changed = pw_properties_update(port->props, dict); + port->info.props = &port->props->dict; + if (changed > 0) { + port->info.change_mask |= SPA_PORT_CHANGE_MASK_PROPS; + emit_port_info(impl, port, false); + } } else { - if (impl->node) - changed = pw_node_update_properties(impl->node, dict); + changed = pw_properties_update(filter->properties, dict); + impl->info.props = &filter->properties->dict; + if (changed > 0) { + impl->info.change_mask |= SPA_NODE_CHANGE_MASK_PROPS; + emit_node_info(impl, false); + } } return changed; } @@ -1142,7 +1159,6 @@ pw_filter_connect(struct pw_filter *filter, uint32_t n_params) { struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); - struct pw_properties *props; int res; uint32_t i; @@ -1154,6 +1170,12 @@ pw_filter_connect(struct pw_filter *filter, SPA_VERSION_NODE, &impl_node, impl); + impl->params[0] = SPA_PARAM_INFO(SPA_PARAM_EnumFormat, 0); + impl->params[1] = SPA_PARAM_INFO(SPA_PARAM_Meta, 0); + impl->params[2] = SPA_PARAM_INFO(SPA_PARAM_IO, 0); + impl->params[3] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE); + impl->params[4] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0); + clear_params(impl, NULL, SPA_ID_INVALID); for (i = 0; i < n_params; i++) { add_param(impl, NULL, SPA_ID_INVALID, params[i]); @@ -1175,22 +1197,9 @@ pw_filter_connect(struct pw_filter *filter, impl->free_proxy = true; } - pw_log_debug(NAME" %p: creating node", filter); - props = pw_properties_copy(filter->properties); - - impl->node = pw_node_new(impl->core, props, 0); - if (impl->node == NULL) { - res = -errno; - goto error_node; - } - - impl->node->port_user_data_size = sizeof(struct port); - - pw_node_set_implementation(impl->node, &impl->impl_node); - - pw_log_debug(NAME" %p: export node %p", filter, impl->node); + pw_log_debug(NAME" %p: export node %p", filter, &impl->impl_node); filter->proxy = pw_core_proxy_export(filter->core_proxy, - PW_TYPE_INTERFACE_Node, NULL, impl->node, 0); + SPA_TYPE_INTERFACE_Node, NULL, &impl->impl_node, 0); if (filter->proxy == NULL) { res = -errno; goto error_proxy; @@ -1198,17 +1207,11 @@ pw_filter_connect(struct pw_filter *filter, pw_proxy_add_listener(filter->proxy, &filter->proxy_listener, &proxy_events, filter); - if (!SPA_FLAG_IS_SET(impl->flags, PW_FILTER_FLAG_INACTIVE)) - pw_node_set_active(impl->node, true); - return 0; error_connect: pw_log_error(NAME" %p: can't connect: %s", filter, spa_strerror(res)); return res; -error_node: - pw_log_error(NAME" %p: can't make node: %s", filter, spa_strerror(res)); - return res; error_proxy: pw_log_error(NAME" %p: can't make proxy: %s", filter, spa_strerror(res)); return res; @@ -1228,17 +1231,11 @@ int pw_filter_disconnect(struct pw_filter *filter) pw_log_debug(NAME" %p: disconnect", filter); impl->disconnecting = true; - if (impl->node) - pw_node_set_active(impl->node, false); - if (filter->proxy) pw_proxy_destroy(filter->proxy); - if (impl->node) { - pw_node_destroy(impl->node); - impl->node = NULL; - } - if (filter->core_proxy && impl->free_proxy) { + if (impl->free_proxy) { + impl->free_proxy = false; spa_hook_remove(&filter->core_listener); spa_list_remove(&filter->link); pw_core_proxy_disconnect(filter->core_proxy); @@ -1324,16 +1321,15 @@ void *pw_filter_add_port(struct pw_filter *filter, struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); struct port *p; const char *str; + int res; if (props == NULL) props = pw_properties_new(NULL, NULL); if (props == NULL) return NULL; - if ((p = alloc_port(impl, direction, port_data_size)) == NULL) { - pw_properties_free(props); - return NULL; - } + if ((p = alloc_port(impl, direction, port_data_size)) == NULL) + goto error_cleanup; p->alloc_buffers = SPA_FLAG_IS_SET(flags, PW_FILTER_PORT_FLAG_ALLOC_BUFFERS); p->props = props; @@ -1348,7 +1344,8 @@ void *pw_filter_add_port(struct pw_filter *filter, add_video_dsp_port_params(impl, p); } /* then override with user provided if any */ - update_params(impl, p, SPA_ID_INVALID, params, n_params); + if ((res = update_params(impl, p, SPA_ID_INVALID, params, n_params)) < 0) + goto error_free; p->change_mask_all = SPA_PORT_CHANGE_MASK_FLAGS | SPA_PORT_CHANGE_MASK_PROPS; @@ -1369,6 +1366,15 @@ void *pw_filter_add_port(struct pw_filter *filter, emit_port_info(impl, p, true); return p->user_data; + + +error_free: + clear_params(impl, p, SPA_ID_INVALID); + free(p); +error_cleanup: + if (props) + pw_properties_free(props); + return NULL; } SPA_EXPORT @@ -1377,11 +1383,14 @@ int pw_filter_remove_port(void *port_data) struct port *port = SPA_CONTAINER_OF(port_data, struct port, user_data); struct filter *impl = port->filter; + spa_node_emit_port_info(&impl->hooks, port->direction, port->id, NULL); + spa_list_remove(&port->link); impl->ports[port->direction][port->id] = NULL; clear_buffers(port); clear_params(impl, port, SPA_ID_INVALID); + pw_properties_free(port->props); free(port); return 0; @@ -1416,21 +1425,28 @@ int pw_filter_update_params(struct pw_filter *filter, { struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); struct port *port; + int res; pw_log_debug(NAME" %p: update params", filter); port = port_data ? SPA_CONTAINER_OF(port_data, struct port, user_data) : NULL; - return update_params(impl, port, SPA_ID_INVALID, params, n_params); + res = update_params(impl, port, SPA_ID_INVALID, params, n_params); + if (res < 0) + return res; + + if (port) + emit_port_info(impl, port, false); + else + emit_node_info(impl, false); + + return res; } SPA_EXPORT int pw_filter_set_active(struct pw_filter *filter, bool active) { - struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); pw_log_debug(NAME" %p: active:%d", filter, active); - if (impl->node) - pw_node_set_active(impl->node, active); return 0; }