mirror of
				https://gitlab.freedesktop.org/pipewire/pipewire.git
				synced 2025-11-03 09:01:54 -05:00 
			
		
		
		
	node: fix support for remote driver nodes
This commit is contained in:
		
							parent
							
								
									c725f1f7b6
								
							
						
					
					
						commit
						a7341ce583
					
				
					 8 changed files with 106 additions and 77 deletions
				
			
		| 
						 | 
					@ -52,13 +52,13 @@ static inline void spa_graph_state_reset(struct spa_graph_state *state)
 | 
				
			||||||
struct spa_graph_link {
 | 
					struct spa_graph_link {
 | 
				
			||||||
	struct spa_list link;
 | 
						struct spa_list link;
 | 
				
			||||||
	struct spa_graph_state *state;
 | 
						struct spa_graph_state *state;
 | 
				
			||||||
	int (*signal) (void *data, void *target);
 | 
						int (*signal) (void *data);
 | 
				
			||||||
	void *signal_data;
 | 
						void *signal_data;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#define spa_graph_link_signal(l,t)	((l)->signal((l)->signal_data,(t)))
 | 
					#define spa_graph_link_signal(l)	((l)->signal((l)->signal_data))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static inline int spa_graph_link_trigger(struct spa_graph_link *link, void *target)
 | 
					static inline int spa_graph_link_trigger(struct spa_graph_link *link)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	struct spa_graph_state *state = link->state;
 | 
						struct spa_graph_state *state = link->state;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -68,7 +68,7 @@ static inline int spa_graph_link_trigger(struct spa_graph_link *link, void *targ
 | 
				
			||||||
		spa_debug("link %p: pending %d required %d", link,
 | 
							spa_debug("link %p: pending %d required %d", link,
 | 
				
			||||||
	                        state->pending, state->required);
 | 
						                        state->pending, state->required);
 | 
				
			||||||
		if (__atomic_sub_fetch(&state->pending, 1, __ATOMIC_SEQ_CST) == 0)
 | 
							if (__atomic_sub_fetch(&state->pending, 1, __ATOMIC_SEQ_CST) == 0)
 | 
				
			||||||
			spa_graph_link_signal(link, target);
 | 
								spa_graph_link_signal(link);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
        return state->status;
 | 
					        return state->status;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -131,13 +131,13 @@ struct spa_graph_port {
 | 
				
			||||||
	struct spa_graph_port *peer;	/**< peer */
 | 
						struct spa_graph_port *peer;	/**< peer */
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static inline int spa_graph_link_signal_node(void *data, void *arg)
 | 
					static inline int spa_graph_link_signal_node(void *data)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	struct spa_graph_node *node = data;
 | 
						struct spa_graph_node *node = data;
 | 
				
			||||||
	return spa_graph_node_process(node);
 | 
						return spa_graph_node_process(node);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static inline int spa_graph_link_signal_graph(void *data, void *arg)
 | 
					static inline int spa_graph_link_signal_graph(void *data)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	struct spa_graph_node *node = data;
 | 
						struct spa_graph_node *node = data;
 | 
				
			||||||
	if (node->graph)
 | 
						if (node->graph)
 | 
				
			||||||
| 
						 | 
					@ -201,7 +201,7 @@ static inline int spa_graph_node_impl_trigger(void *data, struct spa_graph_node
 | 
				
			||||||
	struct spa_graph_link *l, *t;
 | 
						struct spa_graph_link *l, *t;
 | 
				
			||||||
	spa_debug("node %p trigger", node);
 | 
						spa_debug("node %p trigger", node);
 | 
				
			||||||
	spa_list_for_each_safe(l, t, &node->links, link)
 | 
						spa_list_for_each_safe(l, t, &node->links, link)
 | 
				
			||||||
		spa_graph_link_trigger(l, node);
 | 
							spa_graph_link_trigger(l);
 | 
				
			||||||
	return 0;
 | 
						return 0;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -305,7 +305,7 @@ static inline int spa_graph_node_impl_process(void *data, struct spa_graph_node
 | 
				
			||||||
	struct spa_node *n = data;
 | 
						struct spa_node *n = data;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	spa_debug("node %p: process %d", node, node->state->status);
 | 
						spa_debug("node %p: process %d", node, node->state->status);
 | 
				
			||||||
	if ((node->state->status = spa_node_process(n)) == SPA_STATUS_HAVE_BUFFER)
 | 
						if ((node->state->status = spa_node_process(n)) != SPA_STATUS_OK)
 | 
				
			||||||
		spa_graph_node_trigger(node);
 | 
							spa_graph_node_trigger(node);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        return node->state->status;
 | 
					        return node->state->status;
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -601,7 +601,7 @@ static int impl_node_process(struct spa_node *node)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		input->status = SPA_STATUS_OK;
 | 
							input->status = SPA_STATUS_OK;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return SPA_STATUS_OK;
 | 
						return SPA_STATUS_HAVE_BUFFER;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static const struct spa_dict_item node_info_items[] = {
 | 
					static const struct spa_dict_item node_info_items[] = {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -77,9 +77,13 @@ static void on_timeout(void *userdata, uint64_t expirations)
 | 
				
			||||||
	uint8_t *p, *map;
 | 
						uint8_t *p, *map;
 | 
				
			||||||
	struct spa_meta_header *h;
 | 
						struct spa_meta_header *h;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						pw_log_trace("timeout");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	id = pw_stream_get_empty_buffer(data->stream);
 | 
						id = pw_stream_get_empty_buffer(data->stream);
 | 
				
			||||||
	if (id == SPA_ID_INVALID)
 | 
						if (id == SPA_ID_INVALID) {
 | 
				
			||||||
 | 
							pw_log_warn("out of buffers");
 | 
				
			||||||
		return;
 | 
							return;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	buf = pw_stream_peek_buffer(data->stream, id);
 | 
						buf = pw_stream_peek_buffer(data->stream, id);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -901,15 +901,10 @@ impl_node_port_send_command(struct spa_node *node,
 | 
				
			||||||
static int impl_node_process(struct spa_node *node)
 | 
					static int impl_node_process(struct spa_node *node)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	struct node *this = SPA_CONTAINER_OF(node, struct node, node);
 | 
						struct node *this = SPA_CONTAINER_OF(node, struct node, node);
 | 
				
			||||||
	struct impl *impl = this->impl;
 | 
						spa_log_trace(this->log, "%p: process", this);
 | 
				
			||||||
 | 
					 | 
				
			||||||
	if (impl->this.node->driver)
 | 
					 | 
				
			||||||
		return SPA_STATUS_HAVE_BUFFER;
 | 
					 | 
				
			||||||
	else {
 | 
					 | 
				
			||||||
	send_process(this);
 | 
						send_process(this);
 | 
				
			||||||
	return SPA_STATUS_OK;
 | 
						return SPA_STATUS_OK;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
static void
 | 
					static void
 | 
				
			||||||
client_node_done(void *data, int seq, int res)
 | 
					client_node_done(void *data, int seq, int res)
 | 
				
			||||||
| 
						 | 
					@ -1030,6 +1025,7 @@ static void node_on_data_fd_events(struct spa_source *source)
 | 
				
			||||||
			spa_log_warn(this->log, "node %p: error reading message: %s",
 | 
								spa_log_warn(this->log, "node %p: error reading message: %s",
 | 
				
			||||||
					this, strerror(errno));
 | 
										this, strerror(errno));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							spa_log_trace(this->log, "node %p: got process", this);
 | 
				
			||||||
		this->callbacks->process(this->callbacks_data, SPA_STATUS_HAVE_BUFFER);
 | 
							this->callbacks->process(this->callbacks_data, SPA_STATUS_HAVE_BUFFER);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -1275,18 +1271,11 @@ static void node_port_added(void *data, struct pw_port *port)
 | 
				
			||||||
	port->owner_data = impl;
 | 
						port->owner_data = impl;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static void node_finish(void *data)
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
	struct impl *impl = data;
 | 
					 | 
				
			||||||
	send_process(&impl->node);
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
static const struct pw_node_events node_events = {
 | 
					static const struct pw_node_events node_events = {
 | 
				
			||||||
	PW_VERSION_NODE_EVENTS,
 | 
						PW_VERSION_NODE_EVENTS,
 | 
				
			||||||
	.free = node_free,
 | 
						.free = node_free,
 | 
				
			||||||
	.initialized = node_initialized,
 | 
						.initialized = node_initialized,
 | 
				
			||||||
	.port_added = node_port_added,
 | 
						.port_added = node_port_added,
 | 
				
			||||||
	.finish = node_finish,
 | 
					 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static const struct pw_resource_events resource_events = {
 | 
					static const struct pw_resource_events resource_events = {
 | 
				
			||||||
| 
						 | 
					@ -1351,6 +1340,8 @@ struct pw_client_node *pw_client_node_new(struct pw_resource *resource,
 | 
				
			||||||
	if (this->node == NULL)
 | 
						if (this->node == NULL)
 | 
				
			||||||
		goto error_no_node;
 | 
							goto error_no_node;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						this->node->remote = true;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	str = pw_properties_get(properties, "pipewire.client.reuse");
 | 
						str = pw_properties_get(properties, "pipewire.client.reuse");
 | 
				
			||||||
	impl->client_reuse = str && pw_properties_parse_bool(str);
 | 
						impl->client_reuse = str && pw_properties_parse_bool(str);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -408,7 +408,7 @@ static void check_properties(struct pw_node *node)
 | 
				
			||||||
static inline int driver_impl_finish(void *data)
 | 
					static inline int driver_impl_finish(void *data)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	struct impl *impl = SPA_CONTAINER_OF(data, struct impl, driver_data);
 | 
						struct impl *impl = SPA_CONTAINER_OF(data, struct impl, driver_data);
 | 
				
			||||||
	struct spa_graph_data *d = &impl->graph_data;
 | 
						struct spa_graph_data *d = &impl->driver_data;
 | 
				
			||||||
	struct pw_node *this = &impl->this;
 | 
						struct pw_node *this = &impl->this;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pw_log_trace("graph %p finish %p", d->graph, impl);
 | 
						pw_log_trace("graph %p finish %p", d->graph, impl);
 | 
				
			||||||
| 
						 | 
					@ -572,15 +572,21 @@ static void node_process(void *data, int status)
 | 
				
			||||||
	struct pw_node *node = data;
 | 
						struct pw_node *node = data;
 | 
				
			||||||
	struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
 | 
						struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pw_log_trace("node %p: process %d", node, node->driver);
 | 
						pw_log_trace("node %p: process %d %d", node, node->driver, node->exported);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	spa_hook_list_call(&node->listener_list, struct pw_node_events, process);
 | 
						spa_hook_list_call(&node->listener_list, struct pw_node_events, process);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (node->driver)
 | 
						if (node->driver) {
 | 
				
			||||||
 | 
							if (!node->exported) {
 | 
				
			||||||
 | 
								if (impl->driver_graph.state->pending == 0 || !node->remote)
 | 
				
			||||||
				spa_graph_run(&impl->driver_graph);
 | 
									spa_graph_run(&impl->driver_graph);
 | 
				
			||||||
			else
 | 
								else
 | 
				
			||||||
				spa_graph_node_trigger(&node->rt.node);
 | 
									spa_graph_node_trigger(&node->rt.node);
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						else
 | 
				
			||||||
 | 
							spa_graph_node_trigger(&node->rt.node);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static void node_reuse_buffer(void *data, uint32_t port_id, uint32_t buffer_id)
 | 
					static void node_reuse_buffer(void *data, uint32_t port_id, uint32_t buffer_id)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -252,6 +252,8 @@ struct pw_node {
 | 
				
			||||||
	bool active;			/**< if the node is active */
 | 
						bool active;			/**< if the node is active */
 | 
				
			||||||
	bool live;			/**< if the node is live */
 | 
						bool live;			/**< if the node is live */
 | 
				
			||||||
	bool driver;			/**< if the node drives the graph */
 | 
						bool driver;			/**< if the node drives the graph */
 | 
				
			||||||
 | 
						bool exported;			/**< if the node is exported */
 | 
				
			||||||
 | 
						bool remote;			/**< if the node is implemented remotely */
 | 
				
			||||||
	struct spa_clock *clock;	/**< handle to SPA clock if any */
 | 
						struct spa_clock *clock;	/**< handle to SPA clock if any */
 | 
				
			||||||
	struct spa_node *node;		/**< SPA node implementation */
 | 
						struct spa_node *node;		/**< SPA node implementation */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -483,7 +483,15 @@ static void node_process(void *data)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	struct node_data *d = data;
 | 
						struct node_data *d = data;
 | 
				
			||||||
	uint64_t cmd = 1;
 | 
						uint64_t cmd = 1;
 | 
				
			||||||
	pw_log_trace("remote %p: process", data);
 | 
						pw_log_trace("remote %p: send process", data);
 | 
				
			||||||
 | 
						write(d->rtwritefd, &cmd, 8);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static void node_finish(void *data)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						struct node_data *d = data;
 | 
				
			||||||
 | 
						uint64_t cmd = 1;
 | 
				
			||||||
 | 
						pw_log_trace("remote %p: send process", data);
 | 
				
			||||||
	write(d->rtwritefd, &cmd, 8);
 | 
						write(d->rtwritefd, &cmd, 8);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -507,8 +515,7 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask)
 | 
				
			||||||
			pw_log_warn("proxy %p: read failed %m", proxy);
 | 
								pw_log_warn("proxy %p: read failed %m", proxy);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		pw_log_trace("remote %p: process", data->remote);
 | 
							pw_log_trace("remote %p: process", data->remote);
 | 
				
			||||||
		spa_graph_run(node->graph);
 | 
							spa_graph_run(node->graph->parent->graph);
 | 
				
			||||||
		node_process(data);
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1160,7 +1167,6 @@ static void node_destroy(void *data)
 | 
				
			||||||
	pw_log_debug("%p: destroy", d);
 | 
						pw_log_debug("%p: destroy", d);
 | 
				
			||||||
	pw_client_node_proxy_destroy(d->node_proxy);
 | 
						pw_client_node_proxy_destroy(d->node_proxy);
 | 
				
			||||||
	pw_proxy_destroy((struct pw_proxy *)d->node_proxy);
 | 
						pw_proxy_destroy((struct pw_proxy *)d->node_proxy);
 | 
				
			||||||
	d->node_proxy = NULL;
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static void node_active_changed(void *data, bool active)
 | 
					static void node_active_changed(void *data, bool active)
 | 
				
			||||||
| 
						 | 
					@ -1176,6 +1182,7 @@ static const struct pw_node_events node_events = {
 | 
				
			||||||
	.destroy = node_destroy,
 | 
						.destroy = node_destroy,
 | 
				
			||||||
	.active_changed = node_active_changed,
 | 
						.active_changed = node_active_changed,
 | 
				
			||||||
	.process = node_process,
 | 
						.process = node_process,
 | 
				
			||||||
 | 
						.finish = node_finish,
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static int
 | 
					static int
 | 
				
			||||||
| 
						 | 
					@ -1244,6 +1251,8 @@ struct pw_proxy *pw_remote_export(struct pw_remote *remote,
 | 
				
			||||||
	data->t = pw_core_get_type(data->core);
 | 
						data->t = pw_core_get_type(data->core);
 | 
				
			||||||
	data->node_proxy = (struct pw_client_node_proxy *)proxy;
 | 
						data->node_proxy = (struct pw_client_node_proxy *)proxy;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						node->exported = true;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	spa_list_init(&data->free_mix);
 | 
						spa_list_init(&data->free_mix);
 | 
				
			||||||
	spa_list_init(&data->mix[0]);
 | 
						spa_list_init(&data->mix[0]);
 | 
				
			||||||
	spa_list_init(&data->mix[1]);
 | 
						spa_list_init(&data->mix[1]);
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -52,7 +52,7 @@ static inline void init_type(struct type *type, struct spa_type_map *map)
 | 
				
			||||||
struct buffer {
 | 
					struct buffer {
 | 
				
			||||||
	struct spa_list link;
 | 
						struct spa_list link;
 | 
				
			||||||
	uint32_t id;
 | 
						uint32_t id;
 | 
				
			||||||
#define BUFFER_FLAG_OUT		(1 << 0)
 | 
					#define BUFFER_FLAG_READY	(1 << 0)
 | 
				
			||||||
#define BUFFER_FLAG_MAPPED	(1 << 1)
 | 
					#define BUFFER_FLAG_MAPPED	(1 << 1)
 | 
				
			||||||
	uint32_t flags;
 | 
						uint32_t flags;
 | 
				
			||||||
	struct spa_buffer *buffer;
 | 
						struct spa_buffer *buffer;
 | 
				
			||||||
| 
						 | 
					@ -81,7 +81,9 @@ struct stream {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	struct buffer buffers[MAX_BUFFERS];
 | 
						struct buffer buffers[MAX_BUFFERS];
 | 
				
			||||||
	int n_buffers;
 | 
						int n_buffers;
 | 
				
			||||||
	struct spa_list queue;
 | 
						struct spa_list free;
 | 
				
			||||||
 | 
						struct spa_list ready;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	bool in_need_buffer;
 | 
						bool in_need_buffer;
 | 
				
			||||||
	bool in_new_buffer;
 | 
						bool in_new_buffer;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -102,6 +104,36 @@ struct stream {
 | 
				
			||||||
	int64_t last_monotonic;
 | 
						int64_t last_monotonic;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static inline void queue_free(struct stream *stream, struct buffer *buffer)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						spa_list_append(&stream->free, &buffer->link);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static inline struct buffer *dequeue_free(struct stream *stream)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						struct buffer *b = NULL;
 | 
				
			||||||
 | 
						if (!spa_list_is_empty(&stream->free)) {
 | 
				
			||||||
 | 
							b = spa_list_first(&stream->free, struct buffer, link);
 | 
				
			||||||
 | 
							spa_list_remove(&b->link);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return b;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static inline void queue_ready(struct stream *stream, struct buffer *buffer)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						spa_list_append(&stream->ready, &buffer->link);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static inline struct buffer *dequeue_ready(struct stream *stream)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						struct buffer *b = NULL;
 | 
				
			||||||
 | 
						if (!spa_list_is_empty(&stream->ready)) {
 | 
				
			||||||
 | 
							b = spa_list_first(&stream->ready, struct buffer, link);
 | 
				
			||||||
 | 
							spa_list_remove(&b->link);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return b;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state state, char *error)
 | 
					static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state state, char *error)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	enum pw_stream_state old = stream->state;
 | 
						enum pw_stream_state old = stream->state;
 | 
				
			||||||
| 
						 | 
					@ -125,7 +157,9 @@ static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state stat
 | 
				
			||||||
static struct buffer *find_buffer(struct pw_stream *stream, uint32_t id)
 | 
					static struct buffer *find_buffer(struct pw_stream *stream, uint32_t id)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
 | 
						struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
 | 
				
			||||||
 | 
						if (id < impl->n_buffers)
 | 
				
			||||||
		return &impl->buffers[id];
 | 
							return &impl->buffers[id];
 | 
				
			||||||
 | 
						return NULL;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static int impl_send_command(struct spa_node *node, const struct spa_command *command)
 | 
					static int impl_send_command(struct spa_node *node, const struct spa_command *command)
 | 
				
			||||||
| 
						 | 
					@ -249,19 +283,15 @@ static int impl_port_enum_params(struct spa_node *node,
 | 
				
			||||||
	struct spa_pod *param;
 | 
						struct spa_pod *param;
 | 
				
			||||||
	uint32_t last_id = SPA_ID_INVALID;
 | 
						uint32_t last_id = SPA_ID_INVALID;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pw_log_debug("start %d", *index);
 | 
					 | 
				
			||||||
	while (true) {
 | 
						while (true) {
 | 
				
			||||||
		if (*index < d->n_init_params) {
 | 
							if (*index < d->n_init_params) {
 | 
				
			||||||
			param = d->init_params[*index];
 | 
								param = d->init_params[*index];
 | 
				
			||||||
			pw_log_debug("init params %d", *index);
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		else if (*index < d->n_init_params + d->n_params) {
 | 
							else if (*index < d->n_init_params + d->n_params) {
 | 
				
			||||||
			param = d->params[*index - d->n_init_params];
 | 
								param = d->params[*index - d->n_init_params];
 | 
				
			||||||
			pw_log_debug("params %d", *index);
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		else if (*index == (d->n_params + d->n_init_params) && d->format) {
 | 
							else if (*index == (d->n_params + d->n_init_params) && d->format) {
 | 
				
			||||||
			param = d->format;
 | 
								param = d->format;
 | 
				
			||||||
			pw_log_debug("format %d", *index);
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		else if (last_id != SPA_ID_INVALID)
 | 
							else if (last_id != SPA_ID_INVALID)
 | 
				
			||||||
			return 1;
 | 
								return 1;
 | 
				
			||||||
| 
						 | 
					@ -291,9 +321,6 @@ static int impl_port_enum_params(struct spa_node *node,
 | 
				
			||||||
				break;
 | 
									break;
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	pw_log_debug("result %d", *index);
 | 
					 | 
				
			||||||
	spa_debug_pod(*result, 0);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return 1;
 | 
						return 1;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -384,8 +411,7 @@ static int impl_port_use_buffers(struct spa_node *node, enum spa_direction direc
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		b->buffer = buffers[i];
 | 
							b->buffer = buffers[i];
 | 
				
			||||||
		pw_log_info("got buffer %d size %d", i, datas[0].maxsize);
 | 
							pw_log_info("got buffer %d size %d", i, datas[0].maxsize);
 | 
				
			||||||
		spa_list_append(&d->queue, &b->link);
 | 
							queue_free(d, b);
 | 
				
			||||||
		SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT);
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	d->n_buffers = n_buffers;
 | 
						d->n_buffers = n_buffers;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -400,11 +426,8 @@ static int impl_port_use_buffers(struct spa_node *node, enum spa_direction direc
 | 
				
			||||||
static inline void reuse_buffer(struct stream *d, uint32_t id)
 | 
					static inline void reuse_buffer(struct stream *d, uint32_t id)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	pw_log_trace("export-source %p: recycle buffer %d", d, id);
 | 
						pw_log_trace("export-source %p: recycle buffer %d", d, id);
 | 
				
			||||||
	if (id < d->n_buffers) {
 | 
						if (id < d->n_buffers)
 | 
				
			||||||
		struct buffer *b = &d->buffers[id];
 | 
							queue_free(d, &d->buffers[id]);
 | 
				
			||||||
	        spa_list_append(&d->queue, &b->link);
 | 
					 | 
				
			||||||
		SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT);
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static int impl_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id)
 | 
					static int impl_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id)
 | 
				
			||||||
| 
						 | 
					@ -432,12 +455,12 @@ static int impl_node_process(struct spa_node *node)
 | 
				
			||||||
		if ((b = find_buffer(stream, buffer_id)) == NULL)
 | 
							if ((b = find_buffer(stream, buffer_id)) == NULL)
 | 
				
			||||||
			return SPA_STATUS_NEED_BUFFER;
 | 
								return SPA_STATUS_NEED_BUFFER;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							queue_ready(impl, b);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if (impl->client_reuse)
 | 
							if (impl->client_reuse)
 | 
				
			||||||
			io->buffer_id = SPA_ID_INVALID;
 | 
								io->buffer_id = SPA_ID_INVALID;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if (io->status == SPA_STATUS_HAVE_BUFFER) {
 | 
							if (io->status == SPA_STATUS_HAVE_BUFFER) {
 | 
				
			||||||
	                SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			impl->in_new_buffer = true;
 | 
								impl->in_new_buffer = true;
 | 
				
			||||||
			spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
 | 
								spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
 | 
				
			||||||
				 new_buffer, buffer_id);
 | 
									 new_buffer, buffer_id);
 | 
				
			||||||
| 
						 | 
					@ -450,11 +473,18 @@ static int impl_node_process(struct spa_node *node)
 | 
				
			||||||
		io->buffer_id = SPA_ID_INVALID;
 | 
							io->buffer_id = SPA_ID_INVALID;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		pw_log_trace("stream %p: process output", stream);
 | 
							pw_log_trace("stream %p: process output", stream);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if ((b = dequeue_ready(impl)) == NULL) {
 | 
				
			||||||
			impl->in_need_buffer = true;
 | 
								impl->in_need_buffer = true;
 | 
				
			||||||
			spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
 | 
								spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
 | 
				
			||||||
					need_buffer);
 | 
										need_buffer);
 | 
				
			||||||
			impl->in_need_buffer = false;
 | 
								impl->in_need_buffer = false;
 | 
				
			||||||
 | 
								b = dequeue_ready(impl);
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if (b != NULL) {
 | 
				
			||||||
 | 
								io->buffer_id = b->id;
 | 
				
			||||||
 | 
								io->status = SPA_STATUS_HAVE_BUFFER;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		res = SPA_STATUS_HAVE_BUFFER;
 | 
							res = SPA_STATUS_HAVE_BUFFER;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return res;
 | 
						return res;
 | 
				
			||||||
| 
						 | 
					@ -535,7 +565,8 @@ struct pw_stream * pw_stream_new(struct pw_remote *remote, const char *name,
 | 
				
			||||||
	str = pw_properties_get(props, "pipewire.client.reuse");
 | 
						str = pw_properties_get(props, "pipewire.client.reuse");
 | 
				
			||||||
	impl->client_reuse = str && pw_properties_parse_bool(str);
 | 
						impl->client_reuse = str && pw_properties_parse_bool(str);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	spa_list_init(&impl->queue);
 | 
						spa_list_init(&impl->free);
 | 
				
			||||||
 | 
						spa_list_init(&impl->ready);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	spa_hook_list_init(&this->listener_list);
 | 
						spa_hook_list_init(&this->listener_list);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -798,12 +829,8 @@ uint32_t pw_stream_get_empty_buffer(struct pw_stream *stream)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
 | 
						struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
 | 
				
			||||||
	struct buffer *b;
 | 
						struct buffer *b;
 | 
				
			||||||
 | 
						if ((b = dequeue_free(impl)) == NULL)
 | 
				
			||||||
	if (spa_list_is_empty(&impl->queue))
 | 
					 | 
				
			||||||
		return SPA_ID_INVALID;
 | 
							return SPA_ID_INVALID;
 | 
				
			||||||
 | 
					 | 
				
			||||||
	b = spa_list_first(&impl->queue, struct buffer, link);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return b->id;
 | 
						return b->id;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -812,12 +839,10 @@ int pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id)
 | 
				
			||||||
	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
 | 
						struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
 | 
				
			||||||
	struct buffer *b;
 | 
						struct buffer *b;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if ((b = find_buffer(stream, id)) == NULL ||
 | 
						if ((b = find_buffer(stream, id)) == NULL)
 | 
				
			||||||
	    !SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT))
 | 
					 | 
				
			||||||
		return -EINVAL;
 | 
							return -EINVAL;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT);
 | 
						queue_free(impl, b);
 | 
				
			||||||
	spa_list_append(&impl->queue, &b->link);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (impl->in_new_buffer) {
 | 
						if (impl->in_new_buffer) {
 | 
				
			||||||
		struct spa_io_buffers *io = impl->io;
 | 
							struct spa_io_buffers *io = impl->io;
 | 
				
			||||||
| 
						 | 
					@ -852,18 +877,10 @@ int pw_stream_send_buffer(struct pw_stream *stream, uint32_t id)
 | 
				
			||||||
	struct buffer *b;
 | 
						struct buffer *b;
 | 
				
			||||||
	struct spa_io_buffers *io = impl->io;
 | 
						struct spa_io_buffers *io = impl->io;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (io->buffer_id != SPA_ID_INVALID) {
 | 
						if ((b = find_buffer(stream, id))) {
 | 
				
			||||||
		pw_log_debug("can't send %u, pending buffer %u", id,
 | 
							queue_ready(impl, b);
 | 
				
			||||||
			     io->buffer_id);
 | 
							pw_log_trace("stream %p: send buffer %d %p", stream, id, io);
 | 
				
			||||||
		return -EIO;
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if ((b = find_buffer(stream, id)) && !SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT)) {
 | 
					 | 
				
			||||||
		SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT);
 | 
					 | 
				
			||||||
		spa_list_remove(&b->link);
 | 
					 | 
				
			||||||
		io->buffer_id = id;
 | 
					 | 
				
			||||||
		io->status = SPA_STATUS_HAVE_BUFFER;
 | 
					 | 
				
			||||||
		pw_log_trace("stream %p: send buffer %d", stream, id);
 | 
					 | 
				
			||||||
		if (!impl->in_need_buffer)
 | 
							if (!impl->in_need_buffer)
 | 
				
			||||||
			pw_loop_invoke(impl->core->data_loop,
 | 
								pw_loop_invoke(impl->core->data_loop,
 | 
				
			||||||
	                       do_process, 1, NULL, 0, false, impl);
 | 
						                       do_process, 1, NULL, 0, false, impl);
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue