diff --git a/src/pipewire/filter.c b/src/pipewire/filter.c index cc564400f..a9a26d58d 100644 --- a/src/pipewire/filter.c +++ b/src/pipewire/filter.c @@ -147,6 +147,7 @@ struct filter { unsigned int disconnecting:1; unsigned int disconnect_core:1; unsigned int draining:1; + unsigned int drained:1; unsigned int allow_mlock:1; unsigned int warn_mlock:1; unsigned int process_rt:1; @@ -1002,7 +1003,6 @@ do_call_drained(struct spa_loop *loop, struct pw_filter *filter = &impl->this; pw_log_trace("%p: drained", filter); pw_filter_emit_drained(filter); - impl->draining = false; return 0; } @@ -1086,6 +1086,7 @@ static int impl_node_process(void *object) } } } + impl->drained = drained; if (drained && impl->draining) call_drained(impl); @@ -1129,7 +1130,7 @@ static void proxy_error(void *_data, int seq, int res, const char *message) struct pw_filter *filter = _data; /* we just emit the state change here to inform the application. * If this is supposed to be a permanent error, the app should - * do a pw_stream_set_error() */ + * do a pw_filter_set_error() */ pw_filter_emit_state_changed(filter, filter->state, PW_FILTER_STATE_ERROR, message); } @@ -1203,8 +1204,6 @@ filter_new(struct pw_context *context, const char *name, } impl->main_loop = pw_context_get_main_loop(context); - impl->data_loop = pw_data_loop_get_loop( - pw_context_get_data_loop(context)); this = &impl->this; pw_log_debug("%p: new", impl); @@ -1373,11 +1372,17 @@ static int filter_disconnect(struct filter *impl) return -EBUSY; impl->disconnecting = true; + if (filter->node) + pw_impl_node_set_active(filter->node, false); if (filter->proxy) { pw_proxy_destroy(filter->proxy); filter->proxy = NULL; } + + if (filter->node) + pw_impl_node_destroy(filter->node); + if (impl->disconnect_core) { impl->disconnect_core = false; spa_hook_remove(&filter->core_listener); @@ -1536,17 +1541,28 @@ int pw_filter_update_properties(struct pw_filter *filter, void *port_data, const return changed; } -SPA_EXPORT -int +static void node_event_destroy(void *data) +{ + struct pw_filter *filter = data; + spa_hook_remove(&filter->node_listener); + filter->node = NULL; +} + +static const struct pw_impl_node_events node_events = { + PW_VERSION_IMPL_NODE_EVENTS, + .destroy = node_event_destroy, +}; + +SPA_EXPORT int pw_filter_connect(struct pw_filter *filter, enum pw_filter_flags flags, const struct spa_pod **params, uint32_t n_params) { struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + struct pw_properties *props = NULL; int res; uint32_t i; - struct spa_dict_item items[1]; ensure_loop(impl->main_loop, return -EIO); @@ -1610,12 +1626,27 @@ pw_filter_connect(struct pw_filter *filter, impl->disconnect_core = true; } - pw_log_debug("%p: export node %p", filter, &impl->impl_node); + pw_log_debug("%p: creating node", filter); + props = pw_properties_copy(filter->properties); + if (props == NULL) { + res = -errno; + goto error_node; + } + + filter->node = pw_context_create_node(impl->context, props, 0); + props = NULL; + if (filter->node == NULL) { + res = -errno; + goto error_node; + } + pw_impl_node_set_implementation(filter->node, &impl->impl_node); + + impl->data_loop = filter->node->data_loop; + + pw_log_debug("%p: export node %p", filter, filter->node); - items[0] = SPA_DICT_ITEM_INIT(PW_KEY_OBJECT_REGISTER, "false"); filter->proxy = pw_core_export(filter->core, - SPA_TYPE_INTERFACE_Node, &SPA_DICT_INIT_ARRAY(items), - &impl->impl_node, 0); + PW_TYPE_INTERFACE_Node, NULL, filter->node, 0); if (filter->proxy == NULL) { res = -errno; goto error_proxy; @@ -1623,13 +1654,24 @@ pw_filter_connect(struct pw_filter *filter, pw_proxy_add_listener(filter->proxy, &filter->proxy_listener, &proxy_events, filter); + pw_impl_node_add_listener(filter->node, &filter->node_listener, &node_events, filter); + + pw_impl_node_set_active(filter->node, + !SPA_FLAG_IS_SET(impl->flags, PW_FILTER_FLAG_INACTIVE)); + return 0; error_connect: pw_log_error("%p: can't connect: %s", filter, spa_strerror(res)); - return res; + goto exit_cleanup; +error_node: + pw_log_error("%p: can't make node: %s", filter, spa_strerror(res)); + goto exit_cleanup; error_proxy: pw_log_error("%p: can't make proxy: %s", filter, spa_strerror(res)); + goto exit_cleanup; +exit_cleanup: + pw_properties_free(props); return res; } @@ -1860,6 +1902,13 @@ int pw_filter_set_active(struct pw_filter *filter, bool active) ensure_loop(impl->main_loop, return -EIO); pw_log_debug("%p: active:%d", filter, active); + if (filter->node == NULL) + return -EIO; + + pw_impl_node_set_active(filter->node, active); + + if (!active || impl->drained) + impl->drained = impl->draining = false; return 0; } @@ -1968,6 +2017,7 @@ do_drain(struct spa_loop *loop, { struct filter *impl = user_data; impl->draining = true; + impl->drained = false; return 0; } diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 24b6f2205..1139b4a1a 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -1120,6 +1120,9 @@ struct pw_filter { struct pw_proxy *proxy; struct spa_hook proxy_listener; + struct pw_impl_node *node; + struct spa_hook node_listener; + struct spa_list controls; };