diff --git a/pipewire/server/link.c b/pipewire/server/link.c index 96c52a8bb..3c02ca19b 100644 --- a/pipewire/server/link.c +++ b/pipewire/server/link.c @@ -38,6 +38,8 @@ struct impl { struct pw_link this; + bool active; + struct pw_work_queue *work; struct spa_format *format_filter; @@ -160,34 +162,29 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st if (out_state == PW_PORT_STATE_CONFIGURE) { pw_log_debug("link %p: doing set format on output", this); - if ((res = spa_node_port_set_format(this->output->node->node, - SPA_DIRECTION_OUTPUT, - this->output->port_id, - SPA_PORT_FORMAT_FLAG_NEAREST, format)) < 0) { + if ((res = pw_port_set_format(this->output, SPA_PORT_FORMAT_FLAG_NEAREST, format)) < 0) { asprintf(&error, "error set output format: %d", res); goto error; } - pw_work_queue_add(impl->work, this->output->node, res, complete_ready, - this->output); + if (SPA_RESULT_IS_ASYNC(res)) + pw_work_queue_add(impl->work, this->output->node, res, complete_ready, + this->output); } if (in_state == PW_PORT_STATE_CONFIGURE) { pw_log_debug("link %p: doing set format on input", this); - if ((res2 = spa_node_port_set_format(this->input->node->node, - SPA_DIRECTION_INPUT, - this->input->port_id, - SPA_PORT_FORMAT_FLAG_NEAREST, format)) < 0) { + if ((res2 = pw_port_set_format(this->input, SPA_PORT_FORMAT_FLAG_NEAREST, format)) < 0) { asprintf(&error, "error set input format: %d", res2); goto error; } - pw_work_queue_add(impl->work, this->input->node, res2, complete_ready, this->input); - res = res2 != SPA_RESULT_OK ? res2 : res; + if (SPA_RESULT_IS_ASYNC(res2)) + pw_work_queue_add(impl->work, this->input->node, res2, complete_ready, this->input); } if (this->info.format) free(this->info.format); this->info.format = format; - return res; + return SPA_RESULT_OK; error: pw_link_update_state(this, PW_LINK_STATE_ERROR, error); @@ -592,39 +589,27 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s } if (out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) { - if ((res = spa_node_port_alloc_buffers(this->output->node->node, - SPA_DIRECTION_OUTPUT, - this->output->port_id, - params, n_params, - impl->buffers, - &impl->n_buffers)) < 0) { + if ((res = pw_port_alloc_buffers(this->output, params, n_params, + impl->buffers, &impl->n_buffers)) < 0) { asprintf(&error, "error alloc output buffers: %d", res); goto error; } - pw_work_queue_add(impl->work, this->output->node, res, complete_paused, - this->output); - this->output->buffers = impl->buffers; - this->output->n_buffers = impl->n_buffers; - this->output->allocated = true; + if (SPA_RESULT_IS_ASYNC(res)) + pw_work_queue_add(impl->work, this->output->node, res, complete_paused, + this->output); this->output->buffer_mem = impl->buffer_mem; impl->buffer_owner = this->output; pw_log_debug("allocated %d buffers %p from output port", impl->n_buffers, impl->buffers); } else if (in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) { - if ((res = spa_node_port_alloc_buffers(this->input->node->node, - SPA_DIRECTION_INPUT, - this->input->port_id, - params, n_params, - impl->buffers, - &impl->n_buffers)) < 0) { + if ((res = pw_port_alloc_buffers(this->input, params, n_params, + impl->buffers, &impl->n_buffers)) < 0) { asprintf(&error, "error alloc input buffers: %d", res); goto error; } - pw_work_queue_add(impl->work, this->input->node, res, complete_paused, - this->input); - this->input->buffers = impl->buffers; - this->input->n_buffers = impl->n_buffers; - this->input->allocated = true; + if (SPA_RESULT_IS_ASYNC(res)) + pw_work_queue_add(impl->work, this->input->node, res, complete_paused, + this->input); this->input->buffer_mem = impl->buffer_mem; impl->buffer_owner = this->input; pw_log_debug("allocated %d buffers %p from input port", impl->n_buffers, @@ -634,37 +619,26 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s if (in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) { pw_log_debug("using %d buffers %p on input port", impl->n_buffers, impl->buffers); - if ((res = spa_node_port_use_buffers(this->input->node->node, - SPA_DIRECTION_INPUT, - this->input->port_id, - impl->buffers, impl->n_buffers)) < 0) { + if ((res = pw_port_use_buffers(this->input, impl->buffers, impl->n_buffers)) < 0) { asprintf(&error, "error use input buffers: %d", res); goto error; } - pw_work_queue_add(impl->work, this->input->node, res, complete_paused, this->input); - this->input->buffers = impl->buffers; - this->input->n_buffers = impl->n_buffers; - this->input->allocated = false; + if (SPA_RESULT_IS_ASYNC(res)) + pw_work_queue_add(impl->work, this->input->node, res, complete_paused, this->input); } else if (out_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) { pw_log_debug("using %d buffers %p on output port", impl->n_buffers, impl->buffers); - if ((res = spa_node_port_use_buffers(this->output->node->node, - SPA_DIRECTION_OUTPUT, - this->output->port_id, - impl->buffers, impl->n_buffers)) < 0) { + if ((res = pw_port_use_buffers(this->output, impl->buffers, impl->n_buffers)) < 0) { asprintf(&error, "error use output buffers: %d", res); goto error; } - pw_work_queue_add(impl->work, this->output->node, res, complete_paused, - this->output); - this->output->buffers = impl->buffers; - this->output->n_buffers = impl->n_buffers; - this->output->allocated = false; + if (SPA_RESULT_IS_ASYNC(res)) + pw_work_queue_add(impl->work, this->output->node, res, complete_paused, this->output); } else { asprintf(&error, "no common buffer alloc found"); goto error; } - return res; + return SPA_RESULT_OK; error: this->output->buffers = NULL; @@ -679,27 +653,43 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s static int do_start(struct pw_link *this, uint32_t in_state, uint32_t out_state) { - int res = SPA_RESULT_OK; struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); + char *error = NULL; + int res; if (in_state < PW_PORT_STATE_PAUSED || out_state < PW_PORT_STATE_PAUSED) return SPA_RESULT_OK; - else if (in_state == PW_PORT_STATE_STREAMING && out_state == PW_PORT_STATE_STREAMING) { - pw_link_update_state(this, PW_LINK_STATE_RUNNING, NULL); - } else { - pw_link_update_state(this, PW_LINK_STATE_PAUSED, NULL); - if (in_state == PW_PORT_STATE_PAUSED) { - res = pw_node_set_state(this->input->node, PW_NODE_STATE_RUNNING); + pw_link_update_state(this, PW_LINK_STATE_PAUSED, NULL); + + if (in_state == PW_PORT_STATE_PAUSED) { + if ((res = pw_node_set_state(this->input->node, PW_NODE_STATE_RUNNING)) < 0) { + asprintf(&error, "error starting input node: %d", res); + goto error; + } + + if (SPA_RESULT_IS_ASYNC(res)) pw_work_queue_add(impl->work, this->input->node, res, complete_streaming, this->input); + else + complete_streaming(this->input->node, this->input, res, 0); + } + if (out_state == PW_PORT_STATE_PAUSED) { + if ((res = pw_node_set_state(this->output->node, PW_NODE_STATE_RUNNING)) < 0) { + asprintf(&error, "error starting output node: %d", res); + goto error; } - if (out_state == PW_PORT_STATE_PAUSED) { - res = pw_node_set_state(this->output->node, PW_NODE_STATE_RUNNING); + + if (SPA_RESULT_IS_ASYNC(res)) pw_work_queue_add(impl->work, this->output->node, res, complete_streaming, this->output); - } + else + complete_streaming(this->output->node, this->output, res, 0); } + return SPA_RESULT_OK; + + error: + pw_link_update_state(this, PW_LINK_STATE_ERROR, error); return res; } @@ -721,11 +711,13 @@ static int check_states(struct pw_link *this, void *user_data, int res) in_state = this->input->state; out_state = this->output->state; - if (in_state == PW_PORT_STATE_STREAMING && out_state == PW_PORT_STATE_STREAMING) - return SPA_RESULT_OK; - pw_log_debug("link %p: input state %d, output state %d", this, in_state, out_state); + if (in_state == PW_PORT_STATE_STREAMING && out_state == PW_PORT_STATE_STREAMING) { + pw_link_update_state(this, PW_LINK_STATE_RUNNING, NULL); + return SPA_RESULT_OK; + } + if ((res = do_negotiate(this, in_state, out_state)) != SPA_RESULT_OK) goto exit; @@ -791,7 +783,7 @@ static void on_port_destroy(struct pw_link *this, struct pw_port *port) impl->n_buffers = 0; pw_log_debug("link %p: clear input allocated buffers on port %p", this, other); - pw_port_clear_buffers(other); + pw_port_use_buffers(other, NULL, 0); } pw_signal_emit(&this->port_unlinked, this, port); @@ -818,6 +810,11 @@ bool pw_link_activate(struct pw_link *this) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); + if (impl->active) + return true; + + impl->active = true; + pw_log_debug("link %p: activate", this); pw_work_queue_add(impl->work, this, SPA_RESULT_WAIT_SYNC, (pw_work_func_t) check_states, this); @@ -826,6 +823,8 @@ bool pw_link_activate(struct pw_link *this) bool pw_link_deactivate(struct pw_link *this) { + struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); + impl->active = false; return true; } @@ -931,15 +930,8 @@ static void clear_port_buffers(struct pw_link *link, struct pw_port *port) { struct impl *impl = SPA_CONTAINER_OF(link, struct impl, this); - if (impl->buffer_owner != port && port->state > PW_PORT_STATE_READY) { - pw_log_debug("link %p: clear buffers on port %p", link, port); - spa_node_port_use_buffers(port->node->node, - port->direction, port->port_id, NULL, 0); - port->buffers = NULL; - port->n_buffers = 0; - port->state = PW_PORT_STATE_READY; - pw_log_debug("port %p: state READY", port); - } + if (impl->buffer_owner != port) + pw_port_use_buffers(port, NULL, 0); } static int diff --git a/pipewire/server/node.c b/pipewire/server/node.c index 697ac4d3a..4ce63d90f 100644 --- a/pipewire/server/node.c +++ b/pipewire/server/node.c @@ -91,9 +91,8 @@ static void update_port_ids(struct pw_node *node) pw_log_debug("node %p: input port added %d", node, input_port_ids[i]); np = pw_port_new(node, PW_DIRECTION_INPUT, input_port_ids[i]); - if ((res = - spa_node_port_set_io(node->node, SPA_DIRECTION_INPUT, np->port_id, - &np->io)) < 0) + if ((res = spa_node_port_set_io(node->node, SPA_DIRECTION_INPUT, + np->port_id, &np->io)) < 0) pw_log_warn("node %p: can't set input IO %d", node, res); spa_list_insert(ports, &np->link); @@ -133,9 +132,8 @@ static void update_port_ids(struct pw_node *node) pw_log_debug("node %p: output port added %d", node, output_port_ids[i]); np = pw_port_new(node, PW_DIRECTION_OUTPUT, output_port_ids[i]); - if ((res = - spa_node_port_set_io(node->node, SPA_DIRECTION_OUTPUT, np->port_id, - &np->io)) < 0) + if ((res = spa_node_port_set_io(node->node, SPA_DIRECTION_OUTPUT, + np->port_id, &np->io)) < 0) pw_log_warn("node %p: can't set output IO %d", node, res); spa_list_insert(ports, &np->link); @@ -198,29 +196,13 @@ static int suspend_node(struct pw_node *this) pw_log_debug("node %p: suspend node", this); spa_list_for_each(p, &this->input_ports, link) { - if ((res = - spa_node_port_set_format(this->node, SPA_DIRECTION_INPUT, p->port_id, 0, - NULL)) < 0) + if ((res = pw_port_set_format(p, 0, NULL)) < 0) pw_log_warn("error unset format input: %d", res); - p->buffers = NULL; - p->n_buffers = 0; - if (p->allocated) - pw_memblock_free(&p->buffer_mem); - p->allocated = false; - p->state = PW_PORT_STATE_CONFIGURE; } spa_list_for_each(p, &this->output_ports, link) { - if ((res = - spa_node_port_set_format(this->node, SPA_DIRECTION_OUTPUT, p->port_id, 0, - NULL)) < 0) + if ((res = pw_port_set_format(p, 0, NULL)) < 0) pw_log_warn("error unset format output: %d", res); - p->buffers = NULL; - p->n_buffers = 0; - if (p->allocated) - pw_memblock_free(&p->buffer_mem); - p->allocated = false; - p->state = PW_PORT_STATE_CONFIGURE; } return res; } diff --git a/pipewire/server/port.c b/pipewire/server/port.c index ea1300ca2..cc7a23208 100644 --- a/pipewire/server/port.c +++ b/pipewire/server/port.c @@ -68,6 +68,14 @@ void pw_port_destroy(struct pw_port *port) free(port); } +static void port_update_state(struct pw_port *port, enum pw_port_state state) +{ + if (port->state != state) { + pw_log_debug("port %p: state %d -> %d", port, port->state, state); + port->state = state; + } +} + static int do_add_link(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) @@ -169,8 +177,7 @@ int pw_port_pause_rt(struct pw_port *port) port->port_id, &SPA_COMMAND_INIT(port->node->core->type.command_node. Pause)); - port->state = PW_PORT_STATE_PAUSED; - pw_log_debug("port %p: state PAUSED", port); + port_update_state (port, PW_PORT_STATE_PAUSED); return res; } @@ -204,6 +211,9 @@ int pw_port_unlink(struct pw_port *port, struct pw_link *link) res = pw_loop_invoke(node->data_loop->loop, do_remove_link, impl->seq++, sizeof(struct pw_link *), &link, true, port); + if (port->state > PW_PORT_STATE_PAUSED) + port_update_state (port, PW_PORT_STATE_PAUSED); + pw_log_debug("port %p: finish unlink", port); if (port->direction == PW_DIRECTION_OUTPUT) { if (link->output) { @@ -219,49 +229,105 @@ int pw_port_unlink(struct pw_port *port, struct pw_link *link) } } - if (!port->allocated && port->state > PW_PORT_STATE_READY) { - pw_log_debug("port %p: clear buffers on port", port); - spa_node_port_use_buffers(node->node, - port->direction, port->port_id, NULL, 0); - port->buffers = NULL; - port->n_buffers = 0; - port->state = PW_PORT_STATE_READY; - pw_log_debug("port %p: state READY", port); - } + if (!port->allocated) + pw_port_use_buffers(port, NULL, 0); - if (node->n_used_output_links == 0 && node->n_used_input_links == 0) { + if (node->n_used_output_links == 0 && node->n_used_input_links == 0) pw_node_update_state(node, PW_NODE_STATE_IDLE, NULL); - } + return res; } static int -do_clear_buffers(struct spa_loop *loop, - bool async, uint32_t seq, size_t size, void *data, void *user_data) +do_port_pause(struct spa_loop *loop, + bool async, uint32_t seq, size_t size, void *data, void *user_data) { struct pw_port *port = user_data; return pw_port_pause_rt(port); } -int pw_port_clear_buffers(struct pw_port *port) +int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format) { int res; + + res = spa_node_port_set_format(port->node->node, port->direction, port->port_id, flags, format); + + pw_log_debug("port %p: set format %d", port, res); + + if (!SPA_RESULT_IS_ASYNC(res)) { + if (format == NULL) { + port->buffers = NULL; + port->n_buffers = 0; + if (port->allocated) + pw_memblock_free(&port->buffer_mem); + port->allocated = false; + port_update_state (port, PW_PORT_STATE_CONFIGURE); + } + else { + port_update_state (port, PW_PORT_STATE_READY); + } + } + return res; +} + +int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers) +{ struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this); + int res; - pw_log_debug("port %p: clear buffers", port); - - res = pw_loop_invoke(port->node->data_loop->loop, - do_clear_buffers, impl->seq++, 0, NULL, true, port); - - if (port->state <= PW_PORT_STATE_READY) + if (n_buffers == 0 && port->state <= PW_PORT_STATE_READY) return SPA_RESULT_OK; - pw_log_debug("port %p: clear buffers finish", port); - res = spa_node_port_use_buffers(port->node->node, port->direction, port->port_id, NULL, 0); - port->buffers = NULL; - port->n_buffers = 0; - port->state = PW_PORT_STATE_READY; - pw_log_debug("port %p: state READY", port); + if (n_buffers > 0 && port->state < PW_PORT_STATE_READY) + return SPA_RESULT_NO_FORMAT; + + if (port->state > PW_PORT_STATE_PAUSED) { + res = pw_loop_invoke(port->node->data_loop->loop, + do_port_pause, impl->seq++, 0, NULL, true, port); + port_update_state (port, PW_PORT_STATE_PAUSED); + } + + pw_log_debug("port %p: use %d buffers", port, n_buffers); + res = spa_node_port_use_buffers(port->node->node, port->direction, port->port_id, buffers, n_buffers); + port->buffers = buffers; + port->n_buffers = n_buffers; + if (port->allocated) + pw_memblock_free(&port->buffer_mem); + port->allocated = false; + + if (port->n_buffers == 0) + port_update_state (port, PW_PORT_STATE_READY); + else if (!SPA_RESULT_IS_ASYNC(res)) + port_update_state (port, PW_PORT_STATE_PAUSED); + + return res; +} + +int pw_port_alloc_buffers(struct pw_port *port, + struct spa_param **params, uint32_t n_params, + struct spa_buffer **buffers, uint32_t *n_buffers) +{ + struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this); + int res; + + if (port->state < PW_PORT_STATE_READY) + return SPA_RESULT_NO_FORMAT; + + if (port->state > PW_PORT_STATE_PAUSED) { + res = pw_loop_invoke(port->node->data_loop->loop, + do_port_pause, impl->seq++, 0, NULL, true, port); + port_update_state (port, PW_PORT_STATE_PAUSED); + } + + pw_log_debug("port %p: alloc %d buffers", port, *n_buffers); + res = spa_node_port_alloc_buffers(port->node->node, port->direction, port->port_id, + params, n_params, buffers, n_buffers); + port->buffers = buffers; + port->n_buffers = *n_buffers; + port->allocated = true; + + if (!SPA_RESULT_IS_ASYNC(res)) + port_update_state (port, PW_PORT_STATE_PAUSED); return res; } diff --git a/pipewire/server/port.h b/pipewire/server/port.h index 069dde2bb..a1b6b9fb2 100644 --- a/pipewire/server/port.h +++ b/pipewire/server/port.h @@ -102,12 +102,21 @@ pw_port_link(struct pw_port *output_port, /**< output port */ /** Unlink a port \memberof pw_port */ int pw_port_unlink(struct pw_port *port, struct pw_link *link); +/** Set a format on a port \memberof pw_port */ +int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format); + +/** Use buffers on a port \memberof pw_port */ +int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers); + +/** Allocate memory for buffers on a port \memberof pw_port */ +int pw_port_alloc_buffers(struct pw_port *port, + struct spa_param **params, uint32_t n_params, + struct spa_buffer **buffers, uint32_t *n_buffers); + + /** Pause a port, should be called from data thread \memberof pw_port */ int pw_port_pause_rt(struct pw_port *port); -/** Clear the buffers on a port \memberof pw_port */ -int pw_port_clear_buffers(struct pw_port *port); - #ifdef __cplusplus } #endif