mirror of
				https://gitlab.freedesktop.org/pipewire/pipewire.git
				synced 2025-11-03 09:01:54 -05:00 
			
		
		
		
	client-node: Rework scheduling
Only send data to a client when it has sent a NEED_INPUT otherwise recycle the buffers immediately. Explicitly recycle buffers when the client is not going to do this.
This commit is contained in:
		
							parent
							
								
									d594444059
								
							
						
					
					
						commit
						61555ab3b5
					
				
					 1 changed files with 38 additions and 29 deletions
				
			
		| 
						 | 
					@ -115,6 +115,8 @@ struct proxy {
 | 
				
			||||||
struct impl {
 | 
					struct impl {
 | 
				
			||||||
	struct pw_client_node this;
 | 
						struct pw_client_node this;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						bool client_reuse;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	struct pw_core *core;
 | 
						struct pw_core *core;
 | 
				
			||||||
	struct pw_type *t;
 | 
						struct pw_type *t;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -128,7 +130,7 @@ struct impl {
 | 
				
			||||||
	int fds[2];
 | 
						int fds[2];
 | 
				
			||||||
	int other_fds[2];
 | 
						int other_fds[2];
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	bool client_reuse;
 | 
						uint32_t input_ready;
 | 
				
			||||||
	bool out_pending;
 | 
						bool out_pending;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -743,32 +745,39 @@ spa_proxy_node_port_send_command(struct spa_node *node,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static int spa_proxy_node_process_input(struct spa_node *node)
 | 
					static int spa_proxy_node_process_input(struct spa_node *node)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	struct impl *impl;
 | 
						struct proxy *this = SPA_CONTAINER_OF(node, struct proxy, node);
 | 
				
			||||||
	struct proxy *this;
 | 
						struct impl *impl = this->impl;
 | 
				
			||||||
	struct spa_graph_node *n;
 | 
						struct spa_graph_node *n = &impl->this.node->rt.node;
 | 
				
			||||||
	struct spa_graph_port *p;
 | 
						bool client_reuse = impl->client_reuse;
 | 
				
			||||||
 | 
						struct spa_graph_port *p, *pp;
 | 
				
			||||||
	this = SPA_CONTAINER_OF(node, struct proxy, node);
 | 
						int res;
 | 
				
			||||||
	impl = this->impl;
 | 
					 | 
				
			||||||
	n = &impl->this.node->rt.node;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if (impl->input_ready == 0) {
 | 
				
			||||||
 | 
							/* the client is not ready to receive our buffers, recycle them */
 | 
				
			||||||
 | 
							pw_log_trace("node not ready, recycle buffers");
 | 
				
			||||||
 | 
							spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link)
 | 
				
			||||||
 | 
								p->io->status = SPA_RESULT_NEED_BUFFER;
 | 
				
			||||||
 | 
							res = SPA_RESULT_NEED_BUFFER;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						else {
 | 
				
			||||||
		spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
 | 
							spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
 | 
				
			||||||
			struct spa_port_io *io = p->io;
 | 
								struct spa_port_io *io = p->io;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								pw_log_trace("set io status to %d %d", io->status, io->buffer_id);
 | 
				
			||||||
			impl->transport->inputs[p->port_id] = *io;
 | 
								impl->transport->inputs[p->port_id] = *io;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		pw_log_trace("%d %d -> %d %d", io->status, io->buffer_id,
 | 
								/* explicitly recycle buffers when the client is not going to do it */
 | 
				
			||||||
				impl->transport->inputs[p->port_id].status,
 | 
								if (!client_reuse && (pp = p->peer))
 | 
				
			||||||
				impl->transport->inputs[p->port_id].buffer_id);
 | 
							                spa_node_port_reuse_buffer(pp->node->implementation, pp->port_id, io->buffer_id);
 | 
				
			||||||
 | 
					 | 
				
			||||||
		if (impl->client_reuse)
 | 
					 | 
				
			||||||
			io->buffer_id = SPA_ID_INVALID;
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		pw_client_node_transport_add_message(impl->transport,
 | 
							pw_client_node_transport_add_message(impl->transport,
 | 
				
			||||||
			       &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT));
 | 
								       &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT));
 | 
				
			||||||
		do_flush(this);
 | 
							do_flush(this);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return SPA_RESULT_OK;
 | 
							impl->input_ready--;
 | 
				
			||||||
 | 
							res = SPA_RESULT_OK;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return res;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static int spa_proxy_node_process_output(struct spa_node *node)
 | 
					static int spa_proxy_node_process_output(struct spa_node *node)
 | 
				
			||||||
| 
						 | 
					@ -783,7 +792,7 @@ static int spa_proxy_node_process_output(struct spa_node *node)
 | 
				
			||||||
	n = &impl->this.node->rt.node;
 | 
						n = &impl->this.node->rt.node;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (impl->out_pending)
 | 
						if (impl->out_pending)
 | 
				
			||||||
		return SPA_RESULT_OK;
 | 
							goto done;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	impl->out_pending = true;
 | 
						impl->out_pending = true;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -797,6 +806,7 @@ static int spa_proxy_node_process_output(struct spa_node *node)
 | 
				
			||||||
				impl->transport->outputs[p->port_id].buffer_id);
 | 
									impl->transport->outputs[p->port_id].buffer_id);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      done:
 | 
				
			||||||
	pw_client_node_transport_add_message(impl->transport,
 | 
						pw_client_node_transport_add_message(impl->transport,
 | 
				
			||||||
			       &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT));
 | 
								       &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT));
 | 
				
			||||||
	do_flush(this);
 | 
						do_flush(this);
 | 
				
			||||||
| 
						 | 
					@ -815,17 +825,16 @@ static int handle_node_message(struct proxy *this, struct pw_client_node_message
 | 
				
			||||||
	if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT) {
 | 
						if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT) {
 | 
				
			||||||
		spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) {
 | 
							spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) {
 | 
				
			||||||
			*p->io = impl->transport->outputs[p->port_id];
 | 
								*p->io = impl->transport->outputs[p->port_id];
 | 
				
			||||||
			pw_log_trace("%d %d", p->io->status, p->io->buffer_id);
 | 
								pw_log_trace("have output %d %d", p->io->status, p->io->buffer_id);
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		impl->out_pending = false;
 | 
							impl->out_pending = false;
 | 
				
			||||||
		this->callbacks->have_output(this->callbacks_data);
 | 
							this->callbacks->have_output(this->callbacks_data);
 | 
				
			||||||
	} else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_NEED_INPUT) {
 | 
						} else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_NEED_INPUT) {
 | 
				
			||||||
		spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
 | 
							spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
 | 
				
			||||||
			*p->io = impl->transport->inputs[p->port_id];
 | 
								*p->io = impl->transport->inputs[p->port_id];
 | 
				
			||||||
			if (impl->client_reuse)
 | 
								pw_log_trace("need input %d %d", p->io->status, p->io->buffer_id);
 | 
				
			||||||
				p->io->buffer_id = SPA_ID_INVALID;
 | 
					 | 
				
			||||||
			pw_log_trace("%d %d", p->io->status, p->io->buffer_id);
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							impl->input_ready++;
 | 
				
			||||||
		this->callbacks->need_input(this->callbacks_data);
 | 
							this->callbacks->need_input(this->callbacks_data);
 | 
				
			||||||
	} else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) {
 | 
						} else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) {
 | 
				
			||||||
		if (impl->client_reuse) {
 | 
							if (impl->client_reuse) {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue