diff --git a/spa/include/spa/graph/graph.h b/spa/include/spa/graph/graph.h index 25b1d7405..f9c5781c3 100644 --- a/spa/include/spa/graph/graph.h +++ b/spa/include/spa/graph/graph.h @@ -52,13 +52,13 @@ static inline void spa_graph_state_reset(struct spa_graph_state *state) struct spa_graph_link { struct spa_list link; struct spa_graph_state *state; - int (*signal) (void *data, void *target); + int (*signal) (void *data); void *signal_data; }; -#define spa_graph_link_signal(l,t) ((l)->signal((l)->signal_data,(t))) +#define spa_graph_link_signal(l) ((l)->signal((l)->signal_data)) -static inline int spa_graph_link_trigger(struct spa_graph_link *link, void *target) +static inline int spa_graph_link_trigger(struct spa_graph_link *link) { struct spa_graph_state *state = link->state; @@ -68,7 +68,7 @@ static inline int spa_graph_link_trigger(struct spa_graph_link *link, void *targ spa_debug("link %p: pending %d required %d", link, state->pending, state->required); if (__atomic_sub_fetch(&state->pending, 1, __ATOMIC_SEQ_CST) == 0) - spa_graph_link_signal(link, target); + spa_graph_link_signal(link); } return state->status; } @@ -131,13 +131,13 @@ struct spa_graph_port { struct spa_graph_port *peer; /**< peer */ }; -static inline int spa_graph_link_signal_node(void *data, void *arg) +static inline int spa_graph_link_signal_node(void *data) { struct spa_graph_node *node = data; return spa_graph_node_process(node); } -static inline int spa_graph_link_signal_graph(void *data, void *arg) +static inline int spa_graph_link_signal_graph(void *data) { struct spa_graph_node *node = data; if (node->graph) @@ -201,7 +201,7 @@ static inline int spa_graph_node_impl_trigger(void *data, struct spa_graph_node struct spa_graph_link *l, *t; spa_debug("node %p trigger", node); spa_list_for_each_safe(l, t, &node->links, link) - spa_graph_link_trigger(l, node); + spa_graph_link_trigger(l); return 0; } @@ -305,7 +305,7 @@ static inline int spa_graph_node_impl_process(void *data, struct spa_graph_node struct spa_node *n = data; spa_debug("node %p: process %d", node, node->state->status); - if ((node->state->status = spa_node_process(n)) == SPA_STATUS_HAVE_BUFFER) + if ((node->state->status = spa_node_process(n)) != SPA_STATUS_OK) spa_graph_node_trigger(node); return node->state->status; diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index 3a6c8e0dd..0e4167dfb 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -601,7 +601,7 @@ static int impl_node_process(struct spa_node *node) input->status = SPA_STATUS_OK; } - return SPA_STATUS_OK; + return SPA_STATUS_HAVE_BUFFER; } static const struct spa_dict_item node_info_items[] = { diff --git a/src/examples/video-src.c b/src/examples/video-src.c index d4430bda3..16a0979c4 100644 --- a/src/examples/video-src.c +++ b/src/examples/video-src.c @@ -77,9 +77,13 @@ static void on_timeout(void *userdata, uint64_t expirations) uint8_t *p, *map; struct spa_meta_header *h; + pw_log_trace("timeout"); + id = pw_stream_get_empty_buffer(data->stream); - if (id == SPA_ID_INVALID) + if (id == SPA_ID_INVALID) { + pw_log_warn("out of buffers"); return; + } buf = pw_stream_peek_buffer(data->stream, id); diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 2c4d4ad21..8226e555c 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -901,14 +901,9 @@ impl_node_port_send_command(struct spa_node *node, static int impl_node_process(struct spa_node *node) { struct node *this = SPA_CONTAINER_OF(node, struct node, node); - struct impl *impl = this->impl; - - if (impl->this.node->driver) - return SPA_STATUS_HAVE_BUFFER; - else { - send_process(this); - return SPA_STATUS_OK; - } + spa_log_trace(this->log, "%p: process", this); + send_process(this); + return SPA_STATUS_OK; } static void @@ -1030,6 +1025,7 @@ static void node_on_data_fd_events(struct spa_source *source) spa_log_warn(this->log, "node %p: error reading message: %s", this, strerror(errno)); + spa_log_trace(this->log, "node %p: got process", this); this->callbacks->process(this->callbacks_data, SPA_STATUS_HAVE_BUFFER); } } @@ -1275,18 +1271,11 @@ static void node_port_added(void *data, struct pw_port *port) port->owner_data = impl; } -static void node_finish(void *data) -{ - struct impl *impl = data; - send_process(&impl->node); -} - static const struct pw_node_events node_events = { PW_VERSION_NODE_EVENTS, .free = node_free, .initialized = node_initialized, .port_added = node_port_added, - .finish = node_finish, }; static const struct pw_resource_events resource_events = { @@ -1351,6 +1340,8 @@ struct pw_client_node *pw_client_node_new(struct pw_resource *resource, if (this->node == NULL) goto error_no_node; + this->node->remote = true; + str = pw_properties_get(properties, "pipewire.client.reuse"); impl->client_reuse = str && pw_properties_parse_bool(str); diff --git a/src/pipewire/node.c b/src/pipewire/node.c index 912524201..bbc833046 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -408,7 +408,7 @@ static void check_properties(struct pw_node *node) static inline int driver_impl_finish(void *data) { struct impl *impl = SPA_CONTAINER_OF(data, struct impl, driver_data); - struct spa_graph_data *d = &impl->graph_data; + struct spa_graph_data *d = &impl->driver_data; struct pw_node *this = &impl->this; pw_log_trace("graph %p finish %p", d->graph, impl); @@ -572,12 +572,18 @@ static void node_process(void *data, int status) struct pw_node *node = data; struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); - pw_log_trace("node %p: process %d", node, node->driver); + pw_log_trace("node %p: process %d %d", node, node->driver, node->exported); spa_hook_list_call(&node->listener_list, struct pw_node_events, process); - if (node->driver) - spa_graph_run(&impl->driver_graph); + if (node->driver) { + if (!node->exported) { + if (impl->driver_graph.state->pending == 0 || !node->remote) + spa_graph_run(&impl->driver_graph); + else + spa_graph_node_trigger(&node->rt.node); + } + } else spa_graph_node_trigger(&node->rt.node); } diff --git a/src/pipewire/private.h b/src/pipewire/private.h index e84ad4aff..6c96ae031 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -252,6 +252,8 @@ struct pw_node { bool active; /**< if the node is active */ bool live; /**< if the node is live */ bool driver; /**< if the node drives the graph */ + bool exported; /**< if the node is exported */ + bool remote; /**< if the node is implemented remotely */ struct spa_clock *clock; /**< handle to SPA clock if any */ struct spa_node *node; /**< SPA node implementation */ diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index 3385acd31..902503dfc 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -483,7 +483,15 @@ static void node_process(void *data) { struct node_data *d = data; uint64_t cmd = 1; - pw_log_trace("remote %p: process", data); + pw_log_trace("remote %p: send process", data); + write(d->rtwritefd, &cmd, 8); +} + +static void node_finish(void *data) +{ + struct node_data *d = data; + uint64_t cmd = 1; + pw_log_trace("remote %p: send process", data); write(d->rtwritefd, &cmd, 8); } @@ -507,8 +515,7 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask) pw_log_warn("proxy %p: read failed %m", proxy); pw_log_trace("remote %p: process", data->remote); - spa_graph_run(node->graph); - node_process(data); + spa_graph_run(node->graph->parent->graph); } } @@ -1160,7 +1167,6 @@ static void node_destroy(void *data) pw_log_debug("%p: destroy", d); pw_client_node_proxy_destroy(d->node_proxy); pw_proxy_destroy((struct pw_proxy *)d->node_proxy); - d->node_proxy = NULL; } static void node_active_changed(void *data, bool active) @@ -1176,6 +1182,7 @@ static const struct pw_node_events node_events = { .destroy = node_destroy, .active_changed = node_active_changed, .process = node_process, + .finish = node_finish, }; static int @@ -1244,6 +1251,8 @@ struct pw_proxy *pw_remote_export(struct pw_remote *remote, data->t = pw_core_get_type(data->core); data->node_proxy = (struct pw_client_node_proxy *)proxy; + node->exported = true; + spa_list_init(&data->free_mix); spa_list_init(&data->mix[0]); spa_list_init(&data->mix[1]); diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 852c66997..50746d3c0 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -52,7 +52,7 @@ static inline void init_type(struct type *type, struct spa_type_map *map) struct buffer { struct spa_list link; uint32_t id; -#define BUFFER_FLAG_OUT (1 << 0) +#define BUFFER_FLAG_READY (1 << 0) #define BUFFER_FLAG_MAPPED (1 << 1) uint32_t flags; struct spa_buffer *buffer; @@ -81,7 +81,9 @@ struct stream { struct buffer buffers[MAX_BUFFERS]; int n_buffers; - struct spa_list queue; + struct spa_list free; + struct spa_list ready; + bool in_need_buffer; bool in_new_buffer; @@ -102,6 +104,36 @@ struct stream { int64_t last_monotonic; }; +static inline void queue_free(struct stream *stream, struct buffer *buffer) +{ + spa_list_append(&stream->free, &buffer->link); +} + +static inline struct buffer *dequeue_free(struct stream *stream) +{ + struct buffer *b = NULL; + if (!spa_list_is_empty(&stream->free)) { + b = spa_list_first(&stream->free, struct buffer, link); + spa_list_remove(&b->link); + } + return b; +} + +static inline void queue_ready(struct stream *stream, struct buffer *buffer) +{ + spa_list_append(&stream->ready, &buffer->link); +} + +static inline struct buffer *dequeue_ready(struct stream *stream) +{ + struct buffer *b = NULL; + if (!spa_list_is_empty(&stream->ready)) { + b = spa_list_first(&stream->ready, struct buffer, link); + spa_list_remove(&b->link); + } + return b; +} + static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state state, char *error) { enum pw_stream_state old = stream->state; @@ -125,7 +157,9 @@ static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state stat static struct buffer *find_buffer(struct pw_stream *stream, uint32_t id) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - return &impl->buffers[id]; + if (id < impl->n_buffers) + return &impl->buffers[id]; + return NULL; } static int impl_send_command(struct spa_node *node, const struct spa_command *command) @@ -249,19 +283,15 @@ static int impl_port_enum_params(struct spa_node *node, struct spa_pod *param; uint32_t last_id = SPA_ID_INVALID; - pw_log_debug("start %d", *index); while (true) { if (*index < d->n_init_params) { param = d->init_params[*index]; - pw_log_debug("init params %d", *index); } else if (*index < d->n_init_params + d->n_params) { param = d->params[*index - d->n_init_params]; - pw_log_debug("params %d", *index); } else if (*index == (d->n_params + d->n_init_params) && d->format) { param = d->format; - pw_log_debug("format %d", *index); } else if (last_id != SPA_ID_INVALID) return 1; @@ -291,9 +321,6 @@ static int impl_port_enum_params(struct spa_node *node, break; } } - pw_log_debug("result %d", *index); - spa_debug_pod(*result, 0); - return 1; } @@ -384,8 +411,7 @@ static int impl_port_use_buffers(struct spa_node *node, enum spa_direction direc } b->buffer = buffers[i]; pw_log_info("got buffer %d size %d", i, datas[0].maxsize); - spa_list_append(&d->queue, &b->link); - SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT); + queue_free(d, b); } d->n_buffers = n_buffers; @@ -400,11 +426,8 @@ static int impl_port_use_buffers(struct spa_node *node, enum spa_direction direc static inline void reuse_buffer(struct stream *d, uint32_t id) { pw_log_trace("export-source %p: recycle buffer %d", d, id); - if (id < d->n_buffers) { - struct buffer *b = &d->buffers[id]; - spa_list_append(&d->queue, &b->link); - SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT); - } + if (id < d->n_buffers) + queue_free(d, &d->buffers[id]); } static int impl_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id) @@ -432,12 +455,12 @@ static int impl_node_process(struct spa_node *node) if ((b = find_buffer(stream, buffer_id)) == NULL) return SPA_STATUS_NEED_BUFFER; + queue_ready(impl, b); + if (impl->client_reuse) io->buffer_id = SPA_ID_INVALID; if (io->status == SPA_STATUS_HAVE_BUFFER) { - SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); - impl->in_new_buffer = true; spa_hook_list_call(&stream->listener_list, struct pw_stream_events, new_buffer, buffer_id); @@ -450,11 +473,18 @@ static int impl_node_process(struct spa_node *node) io->buffer_id = SPA_ID_INVALID; pw_log_trace("stream %p: process output", stream); - impl->in_need_buffer = true; - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, - need_buffer); - impl->in_need_buffer = false; + if ((b = dequeue_ready(impl)) == NULL) { + impl->in_need_buffer = true; + spa_hook_list_call(&stream->listener_list, struct pw_stream_events, + need_buffer); + impl->in_need_buffer = false; + b = dequeue_ready(impl); + } + if (b != NULL) { + io->buffer_id = b->id; + io->status = SPA_STATUS_HAVE_BUFFER; + } res = SPA_STATUS_HAVE_BUFFER; } return res; @@ -535,7 +565,8 @@ struct pw_stream * pw_stream_new(struct pw_remote *remote, const char *name, str = pw_properties_get(props, "pipewire.client.reuse"); impl->client_reuse = str && pw_properties_parse_bool(str); - spa_list_init(&impl->queue); + spa_list_init(&impl->free); + spa_list_init(&impl->ready); spa_hook_list_init(&this->listener_list); @@ -798,12 +829,8 @@ uint32_t pw_stream_get_empty_buffer(struct pw_stream *stream) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct buffer *b; - - if (spa_list_is_empty(&impl->queue)) + if ((b = dequeue_free(impl)) == NULL) return SPA_ID_INVALID; - - b = spa_list_first(&impl->queue, struct buffer, link); - return b->id; } @@ -812,12 +839,10 @@ int pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id) struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct buffer *b; - if ((b = find_buffer(stream, id)) == NULL || - !SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT)) + if ((b = find_buffer(stream, id)) == NULL) return -EINVAL; - SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT); - spa_list_append(&impl->queue, &b->link); + queue_free(impl, b); if (impl->in_new_buffer) { struct spa_io_buffers *io = impl->io; @@ -852,18 +877,10 @@ int pw_stream_send_buffer(struct pw_stream *stream, uint32_t id) struct buffer *b; struct spa_io_buffers *io = impl->io; - if (io->buffer_id != SPA_ID_INVALID) { - pw_log_debug("can't send %u, pending buffer %u", id, - io->buffer_id); - return -EIO; - } + if ((b = find_buffer(stream, id))) { + queue_ready(impl, b); + pw_log_trace("stream %p: send buffer %d %p", stream, id, io); - if ((b = find_buffer(stream, id)) && !SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT)) { - SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); - spa_list_remove(&b->link); - io->buffer_id = id; - io->status = SPA_STATUS_HAVE_BUFFER; - pw_log_trace("stream %p: send buffer %d", stream, id); if (!impl->in_need_buffer) pw_loop_invoke(impl->core->data_loop, do_process, 1, NULL, 0, false, impl);