From 06446e0d643125fc420a2575046ba2bb741957c8 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 7 Aug 2019 12:56:57 +0200 Subject: [PATCH] port: simplify buffer allocation Use just one function to do buffer allocation on a port. Remove some unused variables. --- spa/plugins/audioconvert/audioadapter.c | 1 + src/modules/module-client-node/client-node.c | 2 +- src/modules/module-client-node/remote-node.c | 4 +- src/pipewire/link.c | 170 ++++++++----------- src/pipewire/port.c | 91 +++------- src/pipewire/private.h | 8 +- 6 files changed, 91 insertions(+), 185 deletions(-) diff --git a/spa/plugins/audioconvert/audioadapter.c b/spa/plugins/audioconvert/audioadapter.c index 46a5b15f7..337aede36 100644 --- a/spa/plugins/audioconvert/audioadapter.c +++ b/spa/plugins/audioconvert/audioadapter.c @@ -549,6 +549,7 @@ static int negotiate_format(struct impl *this) } spa_pod_fixate(format); + spa_log_debug(this->log, NAME "%p: configure format:", this); if (spa_log_level_enabled(this->log, SPA_LOG_LEVEL_DEBUG)) spa_debug_format(0, NULL, format); diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index ddfd2fd68..417a290b0 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -334,7 +334,7 @@ static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size) struct impl *impl = this->impl; struct pw_memmap *mm; uint32_t memid, mem_offset, mem_size; - uint32_t tag[5] = { impl->node_id, id }; + uint32_t tag[5] = { impl->node_id, id, }; if (impl->this.flags & 1) return 0; diff --git a/src/modules/module-client-node/remote-node.c b/src/modules/module-client-node/remote-node.c index 230eec92c..75210d906 100644 --- a/src/modules/module-client-node/remote-node.c +++ b/src/modules/module-client-node/remote-node.c @@ -498,7 +498,7 @@ static int clear_buffers(struct node_data *data, struct mix *mix) pw_log_debug("port %p: clear buffers mix:%d %zd", port, mix->mix_id, mix->buffers.size); - if ((res = pw_port_use_buffers(port, mix->mix_id, 0, NULL, 0)) < 0) { + if ((res = pw_port_use_buffers(port, &mix->mix, 0, NULL, 0)) < 0) { pw_log_error("port %p: error clear buffers %s", port, spa_strerror(res)); return res; } @@ -672,7 +672,7 @@ client_node_port_use_buffers(void *object, bufs[i] = b; } - if ((res = pw_port_use_buffers(mix->port, mix_id, flags, bufs, n_buffers)) < 0) + if ((res = pw_port_use_buffers(mix->port, &mix->mix, flags, bufs, n_buffers)) < 0) goto error_exit_cleanup; if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC) { diff --git a/src/pipewire/link.c b/src/pipewire/link.c index 375f88044..3094296af 100644 --- a/src/pipewire/link.c +++ b/src/pipewire/link.c @@ -228,7 +228,10 @@ static int do_negotiate(struct pw_link *this) input = this->input; output = this->output; - if ((res = pw_core_find_format(this->core, output, input, NULL, 0, NULL, &format, &b, &error)) < 0) + /* find a common format for the ports */ + if ((res = pw_core_find_format(this->core, + output, input, NULL, 0, NULL, + &format, &b, &error)) < 0) goto error; format = spa_pod_copy(format); @@ -236,6 +239,7 @@ static int do_negotiate(struct pw_link *this) spa_pod_builder_init(&b, buffer, sizeof(buffer)); + /* if output port had format and is idle, check if it changed. If so, renegotiate */ if (out_state > PW_PORT_STATE_CONFIGURE && output->node->info.state == PW_NODE_STATE_IDLE) { index = 0; res = spa_node_port_enum_params_sync(output->node->node, @@ -271,6 +275,7 @@ static int do_negotiate(struct pw_link *this) changed = false; } } + /* if input port had format and is idle, check if it changed. If so, renegotiate */ if (in_state > PW_PORT_STATE_CONFIGURE && input->node->info.state == PW_NODE_STATE_IDLE) { index = 0; res = spa_node_port_enum_params_sync(input->node->node, @@ -554,28 +559,27 @@ static int select_io(struct pw_link *this) static int do_allocation(struct pw_link *this) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); - int res, out_res = 0, in_res = 0; + int res; uint32_t in_flags, out_flags; char *error = NULL; struct pw_port *input, *output; struct allocation allocation = { NULL, }; - bool out_alloc = false; if (this->info.state > PW_LINK_STATE_ALLOCATING) return 0; - input = this->input; output = this->output; + input = this->input; - pw_log_debug("link %p: in_state:%d out_state:%d", this, input->state, output->state); + pw_log_debug("link %p: out-state:%d in-state:%d", this, output->state, input->state); pw_link_update_state(this, PW_LINK_STATE_ALLOCATING, NULL); - in_flags = input->spa_flags; out_flags = output->spa_flags; + in_flags = input->spa_flags; - pw_log_debug("link %p: doing alloc buffers %p %p: in_flags:%08x out_flags:%08x", - this, output->node, input->node, in_flags, out_flags); + pw_log_debug("link %p: out-node:%p in-node:%p: out-flags:%08x in-flags:%08x", + this, output->node, input->node, out_flags, in_flags); if (out_flags & SPA_PORT_FLAG_LIVE) { pw_log_debug("setting link as live"); @@ -591,7 +595,7 @@ static int do_allocation(struct pw_link *this) struct spa_pod **params, *param; uint8_t buffer[4096]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); - uint32_t i, offset, n_params; + uint32_t i, offset, n_params, flags; uint32_t max_buffers; size_t minsize = 8192, stride = 0, align; uint32_t data_sizes[1]; @@ -644,8 +648,11 @@ static int do_allocation(struct pw_link *this) /* when the output can allocate buffer memory, set the minsize to * 0 to make sure we don't allocate memory in the shared memory */ - if ((out_flags & SPA_PORT_FLAG_CAN_ALLOC_BUFFERS)) + flags = 0; + if ((out_flags & SPA_PORT_FLAG_CAN_ALLOC_BUFFERS)) { minsize = 0; + flags |= SPA_NODE_BUFFERS_FLAG_ALLOC; + } data_sizes[0] = minsize; data_strides[0] = stride; @@ -658,7 +665,7 @@ static int do_allocation(struct pw_link *this) 1, data_sizes, data_strides, data_aligns, - in_flags & out_flags, + out_flags & in_flags, &allocation)) < 0) { asprintf(&error, "error alloc buffers: %d", res); goto error; @@ -667,71 +674,46 @@ static int do_allocation(struct pw_link *this) pw_log_debug("link %p: allocating %d buffers %p %zd %zd", this, allocation.n_buffers, allocation.buffers, minsize, stride); - if (out_flags & SPA_PORT_FLAG_CAN_ALLOC_BUFFERS) { - if ((res = pw_port_alloc_buffers(output, - allocation.buffers, - allocation.n_buffers)) < 0) { - asprintf(&error, "error alloc output buffers: %d", res); - goto error; - } - out_res = res; - out_alloc = true; - - pw_log_debug("link %p: allocated %d buffers %p from output port: %s", this, - allocation.n_buffers, allocation.buffers, spa_strerror(out_res)); - } else { - pw_log_debug("link %p: using %d buffers %p on output port", this, - allocation.n_buffers, allocation.buffers); - - if ((res = pw_port_use_buffers(output, - this->rt.out_mix.port.port_id, 0, - allocation.buffers, - allocation.n_buffers)) < 0) { - asprintf(&error, "link %p: error use output buffers: %s", this, - spa_strerror(res)); - goto error; - } - out_res = res; + if ((res = pw_port_use_buffers(output, &this->rt.out_mix, + flags, allocation.buffers, allocation.n_buffers)) < 0) { + asprintf(&error, "error use output buffers: %d", res); + goto error; } move_allocation(&allocation, &output->allocation); - if (SPA_RESULT_IS_ASYNC(out_res)) { - pw_work_queue_add(impl->work, output->node, - spa_node_sync(output->node->node, out_res), + if (SPA_RESULT_IS_ASYNC(res)) { + res = spa_node_sync(output->node->node, res), + pw_work_queue_add(impl->work, output->node, res, complete_paused, this); - if (out_alloc) + if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC) return 0; } else { - complete_paused(output->node, this, out_res, 0); + complete_paused(output->node, this, res, 0); } } pw_log_debug("link %p: using %d buffers %p on input port", this, output->allocation.n_buffers, output->allocation.buffers); - if ((res = pw_port_use_buffers(input, - this->rt.in_mix.port.port_id, - 0, - output->allocation.buffers, - output->allocation.n_buffers)) < 0) { + if ((res = pw_port_use_buffers(input, &this->rt.in_mix, 0, + output->allocation.buffers, + output->allocation.n_buffers)) < 0) { asprintf(&error, "link %p: error use input buffers: %s", this, spa_strerror(res)); goto error; } - in_res = res; - if (SPA_RESULT_IS_ASYNC(in_res)) { - pw_work_queue_add(impl->work, input->node, - spa_node_sync(input->node->node, in_res), + if (SPA_RESULT_IS_ASYNC(res)) { + res = spa_node_sync(input->node->node, res), + pw_work_queue_add(impl->work, input->node, res, complete_paused, this); } else { - complete_paused(input->node, this, in_res, 0); + complete_paused(input->node, this, res, 0); } return 0; error: free_allocation(&output->allocation); - free_allocation(&input->allocation); pw_link_update_state(this, PW_LINK_STATE_ERROR, error); return res; } @@ -772,14 +754,14 @@ int pw_link_activate(struct pw_link *this) pw_link_prepare(this); - if ((res = port_set_io(this, this->input, SPA_IO_Buffers, this->io, - sizeof(struct spa_io_buffers), &this->rt.in_mix)) < 0) - return res; - if ((res = port_set_io(this, this->output, SPA_IO_Buffers, this->io, sizeof(struct spa_io_buffers), &this->rt.out_mix)) < 0) return res; + if ((res = port_set_io(this, this->input, SPA_IO_Buffers, this->io, + sizeof(struct spa_io_buffers), &this->rt.in_mix)) < 0) + return res; + if (this->info.state == PW_LINK_STATE_PAUSED) { pw_loop_invoke(this->output->node->data_loop, do_activate_link, SPA_ID_INVALID, NULL, 0, false, this); @@ -800,36 +782,36 @@ static void check_states(void *obj, void *user_data, int res, uint32_t id) if (this->info.state == PW_LINK_STATE_PAUSED) return; - input = this->input; output = this->output; + input = this->input; - if (input == NULL || output == NULL) { + if (output == NULL || input == NULL) { pw_link_update_state(this, PW_LINK_STATE_ERROR, strdup("link without input or output port")); return; } - if (input->node->info.state == PW_NODE_STATE_ERROR || - output->node->info.state == PW_NODE_STATE_ERROR) { - pw_log_warn("link %p: one of the nodes is in error in:%d out:%d", this, - input->node->info.state, - output->node->info.state); + if (output->node->info.state == PW_NODE_STATE_ERROR || + input->node->info.state == PW_NODE_STATE_ERROR) { + pw_log_warn("link %p: one of the nodes is in error out:%d in:%d", this, + output->node->info.state, + input->node->info.state); return; } - in_state = input->state; out_state = output->state; + in_state = input->state; - pw_log_debug("link %p: input state %d, output state %d", this, in_state, out_state); + pw_log_debug("link %p: output state %d, input state %d", this, out_state, in_state); - if (in_state == PW_PORT_STATE_ERROR || out_state == PW_PORT_STATE_ERROR) { + if (out_state == PW_PORT_STATE_ERROR || in_state == PW_PORT_STATE_ERROR) { pw_link_update_state(this, PW_LINK_STATE_ERROR, strdup("ports are in error")); return; } if (PW_PORT_IS_CONTROL(output) && PW_PORT_IS_CONTROL(input)) { - pw_port_update_state(input, PW_PORT_STATE_PAUSED, NULL); pw_port_update_state(output, PW_PORT_STATE_PAUSED, NULL); + pw_port_update_state(input, PW_PORT_STATE_PAUSED, NULL); pw_link_update_state(this, PW_LINK_STATE_PAUSED, NULL); } @@ -849,31 +831,11 @@ exit: this, -EBUSY, (pw_work_func_t) check_states, this); } -static void clear_port_buffers(struct pw_link *link, struct pw_port *port) -{ - int res; - struct pw_port_mix *mix; - - pw_log_debug("%d %p", spa_list_is_empty(&port->links), port->allocation.mem); - - /* we don't clear output buffers when the link goes away. They will get - * cleared when the node goes to suspend */ - if (port->direction == PW_DIRECTION_OUTPUT) - return; - - if (port->direction == PW_DIRECTION_OUTPUT) - mix = &link->rt.out_mix; - else - mix = &link->rt.in_mix; - - if ((res = pw_port_use_buffers(port, mix->port.port_id, 0, NULL, 0)) < 0) - pw_log_warn("link %p: port %p clear error %s", link, port, spa_strerror(res)); -} - static void input_remove(struct pw_link *this, struct pw_port *port) { struct impl *impl = (struct impl *) this; struct pw_port_mix *mix = &this->rt.in_mix; + int res; pw_log_debug("link %p: remove input port %p", this, port); spa_hook_remove(&impl->input_port_listener); @@ -883,8 +845,9 @@ static void input_remove(struct pw_link *this, struct pw_port *port) spa_list_remove(&this->input_link); pw_port_emit_link_removed(this->input, this); - clear_port_buffers(this, port); - + if ((res = pw_port_use_buffers(port, mix, 0, NULL, 0)) < 0) { + pw_log_warn("link %p: port %p clear error %s", this, port, spa_strerror(res)); + } pw_port_release_mix(port, mix); this->input = NULL; } @@ -902,7 +865,8 @@ static void output_remove(struct pw_link *this, struct pw_port *port) spa_list_remove(&this->output_link); pw_port_emit_link_removed(this->output, this); - clear_port_buffers(this, port); + /* we don't clear output buffers when the link goes away. They will get + * cleared when the node goes to suspend */ pw_port_release_mix(port, mix); this->output = NULL; @@ -996,21 +960,21 @@ int pw_link_deactivate(struct pw_link *this) pw_loop_invoke(this->output->node->data_loop, do_deactivate_link, SPA_ID_INVALID, NULL, 0, true, this); - port_set_io(this, this->input, SPA_IO_Buffers, NULL, 0, &this->rt.in_mix); port_set_io(this, this->output, SPA_IO_Buffers, NULL, 0, &this->rt.out_mix); + port_set_io(this, this->input, SPA_IO_Buffers, NULL, 0, &this->rt.in_mix); impl->activated = false; } - input_node = this->input->node; output_node = this->output->node; + input_node = this->input->node; - input_node->n_used_input_links--; output_node->n_used_output_links--; + input_node->n_used_input_links--; if (impl->passive) { - input_node->idle_used_input_links--; output_node->idle_used_output_links--; + input_node->idle_used_input_links--; } debug_link(this); @@ -1309,8 +1273,8 @@ struct pw_link *pw_link_new(struct pw_core *core, if (check_permission(core, output, input, properties) < 0) goto error_link_not_allowed; - input_node = input->node; output_node = output->node; + input_node = input->node; impl = calloc(1, sizeof(struct impl) + user_data_size); if (impl == NULL) @@ -1318,7 +1282,7 @@ struct pw_link *pw_link_new(struct pw_core *core, this = &impl->this; this->feedback = pw_node_can_reach(input_node, output_node); - pw_log_debug("link %p: new %p -> %p", this, input, output); + pw_log_debug("link %p: new out-port:%p -> in-port:%p", this, output, input); if (user_data_size > 0) this->user_data = SPA_MEMBER(impl, sizeof(struct impl), void); @@ -1329,8 +1293,8 @@ struct pw_link *pw_link_new(struct pw_core *core, this->properties = properties; this->info.state = PW_LINK_STATE_INIT; - this->input = input; this->output = output; + this->input = input; if (properties) { const char *str = pw_properties_get(properties, PW_KEY_LINK_PASSIVE); @@ -1380,7 +1344,7 @@ struct pw_link *pw_link_new(struct pw_core *core, this->rt.target.signal = impl->inode->rt.target.signal; this->rt.target.data = impl->inode->rt.target.data; - pw_log_debug("link %p: constructed %p:%d.%d -> %p:%d.%d", impl, + pw_log_debug("link %p: constructed out:%p:%d.%d -> in:%p:%d.%d", impl, output_node, output->port_id, this->rt.out_mix.port.port_id, input_node, input->port_id, this->rt.in_mix.port.port_id); @@ -1447,7 +1411,7 @@ int pw_link_register(struct pw_link *link, struct pw_properties *properties) { struct pw_core *core = link->core; - struct pw_node *input_node, *output_node; + struct pw_node *output_node, *input_node; if (link->registered) goto error_existed; @@ -1457,16 +1421,16 @@ int pw_link_register(struct pw_link *link, if (properties == NULL) return -errno; - input_node = link->input->node; output_node = link->output->node; + input_node = link->input->node; link->info.output_node_id = output_node->global->id; link->info.output_port_id = link->output->global->id; link->info.input_node_id = input_node->global->id; link->info.input_port_id = link->input->global->id; - pw_properties_setf(properties, PW_KEY_LINK_INPUT_PORT, "%d", link->info.input_port_id); pw_properties_setf(properties, PW_KEY_LINK_OUTPUT_PORT, "%d", link->info.output_port_id); + pw_properties_setf(properties, PW_KEY_LINK_INPUT_PORT, "%d", link->info.input_port_id); link->global = pw_global_new(core, PW_TYPE_INTERFACE_Link, @@ -1516,8 +1480,8 @@ void pw_link_destroy(struct pw_link *link) try_unlink_controls(impl, link->output, link->input); - input_remove(link, link->input); output_remove(link, link->output); + input_remove(link, link->input); if (link->global) { spa_hook_remove(&link->global_listener); diff --git a/src/pipewire/port.c b/src/pipewire/port.c index 37be23ef9..683dbb768 100644 --- a/src/pipewire/port.c +++ b/src/pipewire/port.c @@ -1038,7 +1038,6 @@ int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags, /* setting the format always destroys the negotiated buffers */ free_allocation(&port->allocation); - port->allocated = false; if (param == NULL || res < 0) { pw_port_update_state(port, PW_PORT_STATE_CONFIGURE, NULL); @@ -1051,15 +1050,14 @@ int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags, } SPA_EXPORT -int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id, uint32_t flags, +int pw_port_use_buffers(struct pw_port *port, struct pw_port_mix *mix, uint32_t flags, struct spa_buffer **buffers, uint32_t n_buffers) { - int res = 0; + int res = 0, res2; struct pw_node *node = port->node; - struct pw_port_mix *mix = NULL; pw_log_debug("port %p: %d:%d.%d: %d buffers state:%d n_mix:%d", port, - port->direction, port->port_id, mix_id, + port->direction, port->port_id, mix->id, n_buffers, port->state, port->n_mix); if (n_buffers == 0 && port->state <= PW_PORT_STATE_READY) @@ -1068,18 +1066,6 @@ int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id, uint32_t flags, if (n_buffers > 0 && port->state < PW_PORT_STATE_READY) return -EIO; - if ((mix = pw_map_lookup(&port->mix_port_map, mix_id)) != NULL) { - res = spa_node_port_use_buffers(port->mix, - mix->port.direction, mix->port.port_id, flags, - buffers, n_buffers); - - pw_log_debug("port %p: use buffers on mix: %p %d (%s)", - port, port->mix, res, spa_strerror(res)); - - if (res == -ENOTSUP) - res = 0; - } - if (n_buffers == 0) { if (port->n_mix == 1) pw_port_update_state(port, PW_PORT_STATE_READY, NULL); @@ -1091,67 +1077,28 @@ int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id, uint32_t flags, res = spa_node_port_use_buffers(node->node, port->direction, port->port_id, flags, buffers, n_buffers); - if (res < 0) + if (res < 0) { pw_log_error("port %p: use buffers on node: %d (%s)", port, res, spa_strerror(res)); + pw_port_update_state(port, PW_PORT_STATE_ERROR, + "can't use buffers on port"); + return res; + } } - port->allocated = false; - free_allocation(&port->allocation); - - pw_port_call_use_buffers(port, flags, buffers, n_buffers); - } - - if (n_buffers > 0 && !SPA_RESULT_IS_ASYNC(res)) - pw_port_update_state(port, PW_PORT_STATE_PAUSED, NULL); - - return res; -} - -SPA_EXPORT -int pw_port_alloc_buffers(struct pw_port *port, - struct spa_buffer **buffers, uint32_t n_buffers) -{ - int res, res2; - struct pw_node *node = port->node; - - if (port->state < PW_PORT_STATE_READY) - return -EIO; - - if ((res = spa_node_port_use_buffers(node->node, - port->direction, port->port_id, - SPA_NODE_BUFFERS_FLAG_ALLOC, - buffers, n_buffers)) < 0) { - pw_log_error("port %p: %d alloc failed: %d (%s)", port, port->port_id, - res, spa_strerror(res)); - } - - if (res >= 0) { - res2 = pw_port_call_use_buffers(port, - SPA_NODE_BUFFERS_FLAG_ALLOC, - buffers, n_buffers); - if (res2 < 0) { - pw_log_error("port %p: %d implementation alloc failed: %d (%s)", - port, port->port_id, res, spa_strerror(res2)); + if ((res2 = pw_port_call_use_buffers(port, flags, buffers, n_buffers)) < 0) { + pw_log_warn("port %p: implementation alloc failed: %d (%s)", + port, res2, spa_strerror(res2)); } + if (n_buffers > 0 && !SPA_RESULT_IS_ASYNC(res)) + pw_port_update_state(port, PW_PORT_STATE_PAUSED, NULL); } - pw_log_debug("port %p: %d alloc %d buffers: %d (%s)", port, - port->port_id, n_buffers, res, spa_strerror(res)); - - free_allocation(&port->allocation); - - if (res < 0) { - port->allocated = false; - } else { - port->allocated = true; - } - - if (n_buffers == 0) { - if (port->n_mix == 1) - pw_port_update_state(port, PW_PORT_STATE_READY, NULL); - } - else if (!SPA_RESULT_IS_ASYNC(res)) { - pw_port_update_state(port, PW_PORT_STATE_PAUSED, NULL); + if ((res2 = spa_node_port_use_buffers(port->mix, + mix->port.direction, mix->port.port_id, flags, + buffers, n_buffers)) < 0) { + if (res2 != -ENOTSUP) + pw_log_warn("port %p: mix use buffers failed: %d (%s)", + port, res2, spa_strerror(res2)); } return res; diff --git a/src/pipewire/private.h b/src/pipewire/private.h index fc70a8554..1c1dcb6fe 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -545,8 +545,6 @@ struct pw_port { #define PW_PORT_MIX_FLAG_MIX_ONLY (1<<1) /**< only negotiate mix ports */ uint32_t mix_flags; /**< flags for the mixing */ - unsigned int allocated:1; /**< if buffers are allocated */ - struct spa_list mix_list; /**< list of \ref pw_port_mix */ struct pw_map mix_port_map; /**< map from port_id from mixer */ uint32_t n_mix; @@ -874,13 +872,9 @@ int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags, const struct spa_pod *param); /** Use buffers on a port \memberof pw_port */ -int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id, uint32_t flags, +int pw_port_use_buffers(struct pw_port *port, struct pw_port_mix *mix, uint32_t flags, struct spa_buffer **buffers, uint32_t n_buffers); -/** Allocate memory for buffers on a port \memberof pw_port */ -int pw_port_alloc_buffers(struct pw_port *port, - struct spa_buffer **buffers, uint32_t n_buffers); - /** Change the state of the node */ int pw_node_set_state(struct pw_node *node, enum pw_node_state state);