diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index f4b42d5a0..65044964e 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -210,7 +210,6 @@ loop_invoke(struct spa_loop *loop, } else { int32_t filled, avail; uint32_t idx, offset, l0; - uint64_t count = 1; filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx); if (filled < 0 || filled > impl->buffer.size) { @@ -249,9 +248,11 @@ loop_invoke(struct spa_loop *loop, spa_loop_utils_signal_event(&impl->utils, impl->wakeup); if (block) { + uint64_t count = 1; if (read(impl->ack_fd, &count, sizeof(uint64_t)) != sizeof(uint64_t)) spa_log_warn(impl->log, NAME " %p: failed to read event fd: %s", impl, strerror(errno)); + res = item->res; } else { diff --git a/src/extensions/client-node.h b/src/extensions/client-node.h index d893a4e4e..d90f83ca3 100644 --- a/src/extensions/client-node.h +++ b/src/extensions/client-node.h @@ -108,15 +108,15 @@ struct pw_client_node_transport { #define pw_client_node_transport_parse_message(t,m) ((t)->parse_message((t), (m))) enum pw_client_node_message_type { - PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT, - PW_CLIENT_NODE_MESSAGE_NEED_INPUT, - PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER, - PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT, - PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT, + PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT, /*< signal that the node has output */ + PW_CLIENT_NODE_MESSAGE_NEED_INPUT, /*< signal that the node needs input */ + PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT, /*< instruct the node to process input */ + PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT, /*< instruct the node output is processed */ + PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER, /*< reuse a buffer */ }; struct pw_client_node_message_body { - struct spa_pod_int type SPA_ALIGNED(8); + struct spa_pod_int type SPA_ALIGNED(8); /*< one of enum pw_client_node_message_type */ }; struct pw_client_node_message { @@ -124,15 +124,15 @@ struct pw_client_node_message { struct pw_client_node_message_body body; }; -struct pw_client_node_message_reuse_buffer_body { - struct spa_pod_int type SPA_ALIGNED(8); - struct spa_pod_int port_id SPA_ALIGNED(8); - struct spa_pod_int buffer_id SPA_ALIGNED(8); +struct pw_client_node_message_port_reuse_buffer_body { + struct spa_pod_int type SPA_ALIGNED(8); /*< PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER */ + struct spa_pod_int port_id SPA_ALIGNED(8); /*< port id */ + struct spa_pod_int buffer_id SPA_ALIGNED(8); /*< buffer id to reuse */ }; -struct pw_client_node_message_reuse_buffer { +struct pw_client_node_message_port_reuse_buffer { struct spa_pod_struct pod; - struct pw_client_node_message_reuse_buffer_body body; + struct pw_client_node_message_port_reuse_buffer_body body; }; #define PW_CLIENT_NODE_MESSAGE_TYPE(message) (((struct pw_client_node_message*)(message))->body.type.value) @@ -145,14 +145,13 @@ struct pw_client_node_message_reuse_buffer { { { { size, SPA_POD_TYPE_STRUCT } }, \ { SPA_POD_INT_INIT(message), __VA_ARGS__ } } \ -#define PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER_INIT(port_id,buffer_id) \ - PW_CLIENT_NODE_MESSAGE_INIT_VA(struct pw_client_node_message_reuse_buffer, \ - sizeof(struct pw_client_node_message_reuse_buffer_body), \ - PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER, \ +#define PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER_INIT(port_id,buffer_id) \ + PW_CLIENT_NODE_MESSAGE_INIT_VA(struct pw_client_node_message_port_reuse_buffer, \ + sizeof(struct pw_client_node_message_port_reuse_buffer_body), \ + PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER, \ SPA_POD_INT_INIT(port_id), \ SPA_POD_INT_INIT(buffer_id)) - /** information about a buffer */ struct pw_client_node_buffer { uint32_t mem_id; /**< the memory id for the metadata */ @@ -434,7 +433,6 @@ struct pw_client_node_proxy_events { enum spa_direction direction, uint32_t port_id, const struct spa_command *command); - }; static inline void diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index ebeca022e..6e5d2cca3 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -683,7 +683,7 @@ spa_proxy_node_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32 spa_log_trace(this->log, "reuse buffer %d", buffer_id); pw_client_node_transport_add_message(impl->transport, (struct pw_client_node_message *) - &PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER_INIT(port_id, buffer_id)); + &PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER_INIT(port_id, buffer_id)); return 0; } @@ -694,14 +694,27 @@ spa_proxy_node_port_send_command(struct spa_node *node, uint32_t port_id, const struct spa_command *command) { struct proxy *this; + struct impl *impl; + struct pw_type *t; if (node == NULL || command == NULL) return -EINVAL; this = SPA_CONTAINER_OF(node, struct proxy, node); - spa_log_warn(this->log, "unhandled command %d", SPA_COMMAND_TYPE(command)); - return -ENOTSUP; + if (this->resource == NULL) + return 0; + + impl = this->impl; + t = impl->t; + + spa_log_trace(this->log, "send command %s", + spa_type_map_get_type(t->map, SPA_COMMAND_TYPE(command))); + + pw_client_node_resource_port_command(this->resource, + direction, port_id, + command); + return 0; } static int spa_proxy_node_process_input(struct spa_node *node) @@ -783,27 +796,37 @@ static int handle_node_message(struct proxy *this, struct pw_client_node_message n = &impl->this.node->rt.node; - if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT) { + switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) { + case PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT: spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) { *p->io = impl->transport->outputs[p->port_id]; pw_log_trace("have output %d %d", p->io->status, p->io->buffer_id); } impl->out_pending = false; this->callbacks->have_output(this->callbacks_data); - } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_NEED_INPUT) { + break; + + case PW_CLIENT_NODE_MESSAGE_NEED_INPUT: spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { *p->io = impl->transport->inputs[p->port_id]; pw_log_trace("need input %d %d", p->io->status, p->io->buffer_id); } impl->input_ready++; this->callbacks->need_input(this->callbacks_data); - } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) { + break; + + case PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER: if (impl->client_reuse) { - struct pw_client_node_message_reuse_buffer *p = - (struct pw_client_node_message_reuse_buffer *) message; + struct pw_client_node_message_port_reuse_buffer *p = + (struct pw_client_node_message_port_reuse_buffer *) message; this->callbacks->reuse_buffer(this->callbacks_data, p->body.port_id.value, p->body.buffer_id.value); } + break; + + default: + pw_log_warn("unhandled message %d", PW_CLIENT_NODE_MESSAGE_TYPE(message)); + return -ENOTSUP; } return 0; } diff --git a/src/pipewire/port.c b/src/pipewire/port.c index b547aef2d..905661fcf 100644 --- a/src/pipewire/port.c +++ b/src/pipewire/port.c @@ -355,13 +355,28 @@ void pw_port_destroy(struct pw_port *port) } static int -do_port_pause(struct spa_loop *loop, +do_port_command(struct spa_loop *loop, bool async, uint32_t seq, size_t size, const void *data, void *user_data) { struct pw_port *port = user_data; struct pw_node *node = port->node; - return spa_node_port_send_command(node->node, port->direction, port->port_id, - &SPA_COMMAND_INIT(node->core->type.command_node.Pause)); + return spa_node_port_send_command(node->node, port->direction, port->port_id, data); +} + +int pw_port_send_command(struct pw_port *port, bool block, const struct spa_command *command) +{ + return pw_loop_invoke(port->node->data_loop, do_port_command, 0, + SPA_POD_SIZE(command), command, block, port); +} + +int pw_port_pause(struct pw_port *port) +{ + if (port->state > PW_PORT_STATE_PAUSED) { + pw_port_send_command(port, true, + &SPA_COMMAND_INIT(port->node->core->type.command_node.Pause)); + port_update_state (port, PW_PORT_STATE_PAUSED); + } + return 0; } int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags, @@ -393,6 +408,7 @@ int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags, int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers) { int res; + struct pw_node *node = port->node; if (n_buffers == 0 && port->state <= PW_PORT_STATE_READY) return 0; @@ -400,14 +416,10 @@ int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint3 if (n_buffers > 0 && port->state < PW_PORT_STATE_READY) return -EIO; - if (port->state > PW_PORT_STATE_PAUSED) { - pw_loop_invoke(port->node->data_loop, - do_port_pause, 0, 0, NULL, true, port); - port_update_state (port, PW_PORT_STATE_PAUSED); - } + pw_port_pause(port); pw_log_debug("port %p: use %d buffers", port, n_buffers); - res = spa_node_port_use_buffers(port->node->node, port->direction, port->port_id, buffers, n_buffers); + res = spa_node_port_use_buffers(node->node, port->direction, port->port_id, buffers, n_buffers); if (port->allocated) { free(port->buffers); @@ -430,19 +442,16 @@ int pw_port_alloc_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t *n_buffers) { int res; + struct pw_node *node = port->node; if (port->state < PW_PORT_STATE_READY) return -EIO; - if (port->state > PW_PORT_STATE_PAUSED) { - pw_loop_invoke(port->node->data_loop, - do_port_pause, 0, 0, NULL, true, port); - port_update_state (port, PW_PORT_STATE_PAUSED); - } + pw_port_pause(port); pw_log_debug("port %p: alloc %d buffers", port, *n_buffers); - res = spa_node_port_alloc_buffers(port->node->node, port->direction, port->port_id, + res = spa_node_port_alloc_buffers(node->node, port->direction, port->port_id, params, n_params, buffers, n_buffers); if (port->allocated) { diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 7b53d4bc5..c0e5dd839 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -441,6 +441,12 @@ int pw_port_alloc_buffers(struct pw_port *port, struct spa_pod **params, uint32_t n_params, struct spa_buffer **buffers, uint32_t *n_buffers); +/** Send a command to a port */ +int pw_port_send_command(struct pw_port *port, bool block, const struct spa_command *command); + +/** pause the port */ +int pw_port_pause(struct pw_port *port); + /** Change the state of the node */ int pw_node_set_state(struct pw_node *node, enum pw_node_state state); diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index 1922bbd74..1695fcb48 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -452,17 +452,21 @@ static void handle_rtnode_message(struct pw_proxy *proxy, struct pw_client_node_ { struct node_data *data = proxy->user_data; - if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT) { + switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) { + case PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT: pw_log_trace("remote %p: process input", data->remote); spa_graph_have_output(data->node->rt.graph, &data->in_node); - } - else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT) { + break; + + case PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT: pw_log_trace("remote %p: process output", data->remote); spa_graph_need_input(data->node->rt.graph, &data->out_node); - } - else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) { - struct pw_client_node_message_reuse_buffer *rb = - (struct pw_client_node_message_reuse_buffer *) message; + break; + + case PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER: + { + struct pw_client_node_message_port_reuse_buffer *rb = + (struct pw_client_node_message_port_reuse_buffer *) message; uint32_t port_id = rb->body.port_id.value; uint32_t buffer_id = rb->body.buffer_id.value; struct spa_graph_port *p, *pp; @@ -475,9 +479,11 @@ static void handle_rtnode_message(struct pw_proxy *proxy, struct pw_client_node_ pp->port_id, buffer_id); break; } + break; } - else { + default: pw_log_warn("unexpected node message %d", PW_CLIENT_NODE_MESSAGE_TYPE(message)); + break; } } @@ -817,6 +823,8 @@ client_node_port_set_param(void *object, goto done; } + pw_port_pause(port->port); + res = pw_port_set_param(port->port, id, flags, param); if (res < 0) goto done; @@ -927,6 +935,8 @@ client_node_port_use_buffers(void *object, goto done; } + pw_port_pause(port->port); + prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0); /* clear previous buffers */ @@ -1041,7 +1051,15 @@ client_node_port_command(void *object, uint32_t port_id, const struct spa_command *command) { - pw_log_warn("port command not supported"); + struct pw_proxy *proxy = object; + struct node_data *data = proxy->user_data; + static struct port *port; + + port = find_port(data, direction, port_id); + if (port == NULL) + return; + + pw_port_send_command(port->port, true, command); } static const struct pw_client_node_proxy_events client_node_events = { diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 69feb3b28..93e2e6c2a 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -457,7 +457,7 @@ static inline void send_reuse_buffer(struct pw_stream *stream, uint32_t id) uint64_t cmd = 1; pw_client_node_transport_add_message(impl->trans, (struct pw_client_node_message*) - &PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER_INIT(impl->port_id, id)); + &PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER_INIT(impl->port_id, id)); write(impl->rtwritefd, &cmd, 8); } @@ -551,7 +551,9 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT) { + switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) { + case PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT: + { int i; for (i = 0; i < impl->trans->area->n_input_ports; i++) { @@ -581,7 +583,10 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod input->status = SPA_STATUS_NEED_BUFFER; } send_need_input(stream); - } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT) { + break; + } + case PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT: + { int i; for (i = 0; i < impl->trans->area->n_output_ports; i++) { @@ -597,9 +602,12 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod impl->in_need_buffer = true; spa_hook_list_call(&stream->listener_list, struct pw_stream_events, need_buffer); impl->in_need_buffer = false; - } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) { - struct pw_client_node_message_reuse_buffer *p = - (struct pw_client_node_message_reuse_buffer *) message; + break; + } + case PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER: + { + struct pw_client_node_message_port_reuse_buffer *p = + (struct pw_client_node_message_port_reuse_buffer *) message; if (p->body.port_id.value != impl->port_id) return; @@ -607,8 +615,11 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod return; reuse_buffer(stream, p->body.buffer_id.value); - } else { + break; + } + default: pw_log_warn("unexpected node message %d", PW_CLIENT_NODE_MESSAGE_TYPE(message)); + break; } }