From 28bf6137d3c4765e992fcc87c5bc9fefa50a4090 Mon Sep 17 00:00:00 2001 From: David Svensson Fors Date: Thu, 28 Sep 2017 10:12:54 +0200 Subject: [PATCH] Support for "client-reuse" streams Add a PW property "pipewire.client.reuse". If set, the client-node doesn't immediately reuse a buffer after sending PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT to the client. Instead, it waits for reuse-buffer from the client. The SPA_GRAPH_NODE_FLAG_ASYNC is used for this, together with adapted logic in process_input(). In stream.c, if the property is set, the handling of incoming buffers for PW_DIRECTION_INPUT streams is changed. Each buffer has to be recycled, so we make sure new_buffer is emitted for each intermediate buffer, if buffer_id in the IO area has moved past some buffers. Change-Id: I137a12b702b857cc73369930d7029ecbd69d63ff --- src/modules/module-client-node/client-node.c | 22 +++++++++- src/pipewire/stream.c | 45 +++++++++++++++++--- 2 files changed, 58 insertions(+), 9 deletions(-) diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 958efd146..eab38ff40 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -31,11 +31,13 @@ #include "spa/node.h" #include "spa/format-builder.h" #include "spa/lib/format.h" +#include "spa/graph.h" #include "pipewire/pipewire.h" #include "pipewire/interfaces.h" #include "pipewire/core.h" +#include "pipewire/private.h" #include "modules/spa/spa-node.h" #include "client-node.h" #include "transport.h" @@ -126,6 +128,8 @@ struct impl { int fds[2]; int other_fds[2]; + + bool client_reuse; }; /** \endcond */ @@ -762,13 +766,19 @@ static int spa_proxy_node_process_input(struct spa_node *node) pw_log_trace("%d %d", io->status, io->buffer_id); impl->transport->inputs[i] = *io; - io->status = SPA_RESULT_NEED_BUFFER; + + if (impl->client_reuse) { + io->status = SPA_RESULT_OK; + io->buffer_id = SPA_ID_INVALID; + } else { + io->status = SPA_RESULT_NEED_BUFFER; + } } pw_client_node_transport_add_message(impl->transport, &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT)); do_flush(this); - if (this->callbacks->need_input) + if (this->callbacks->need_input || impl->client_reuse) return SPA_RESULT_OK; else return SPA_RESULT_NEED_BUFFER; @@ -1149,6 +1159,7 @@ struct pw_client_node *pw_client_node_new(struct pw_resource *resource, const struct spa_support *support; uint32_t n_support; const char *name = "client-node"; + const char *str; impl = calloc(1, sizeof(struct impl)); if (impl == NULL) @@ -1178,6 +1189,13 @@ struct pw_client_node *pw_client_node_new(struct pw_resource *resource, if (this->node == NULL) goto error_no_node; + str = pw_properties_get(properties, "pipewire.client.reuse"); + impl->client_reuse = str && strcmp(str, "1") == 0; + if (impl->client_reuse) + this->node->rt.node.flags |= SPA_GRAPH_NODE_FLAG_ASYNC; + else + this->node->rt.node.flags &= ~SPA_GRAPH_NODE_FLAG_ASYNC; + pw_resource_add_listener(this->resource, &impl->resource_listener, &resource_events, diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 0127102eb..15f111bc8 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -95,6 +95,9 @@ struct stream { struct pw_array buffer_ids; bool in_order; + bool client_reuse; + uint32_t *last_buffer_id; + struct spa_list free; bool in_need_buffer; @@ -201,6 +204,7 @@ struct pw_stream *pw_stream_new(struct pw_remote *remote, { struct stream *impl; struct pw_stream *this; + const char *str; impl = calloc(1, sizeof(struct stream)); if (impl == NULL) @@ -224,6 +228,9 @@ struct pw_stream *pw_stream_new(struct pw_remote *remote, impl->type_client_node = spa_type_map_get_id(remote->core->type.map, PW_TYPE_INTERFACE__ClientNode); impl->rtwritefd = -1; + str = pw_properties_get(props, "pipewire.client.reuse"); + impl->client_reuse = str && strcmp(str, "1") == 0; + spa_hook_list_init(&this->listener_list); this->state = PW_STREAM_STATE_UNCONNECTED; @@ -521,19 +528,34 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod for (i = 0; i < impl->trans->area->n_input_ports; i++) { struct spa_port_io *input = &impl->trans->inputs[i]; - struct buffer_id *bid; + uint32_t len, first, id; pw_log_trace("stream %p: process input %d %d", stream, input->status, input->buffer_id); if (input->buffer_id == SPA_ID_INVALID) continue; - bid = find_buffer(stream, input->buffer_id); - bid->used = true; + len = pw_array_get_len(&impl->buffer_ids, struct buffer_id); + if (impl->client_reuse && impl->last_buffer_id[i] != SPA_ID_INVALID) + first = (impl->last_buffer_id[i] + 1) % len; + else + first = input->buffer_id; - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, - new_buffer, input->buffer_id); - input->buffer_id = SPA_ID_INVALID; + id = first; + while (true) { + struct buffer_id *bid; + + bid = find_buffer(stream, id); + bid->used = true; + + spa_hook_list_call(&stream->listener_list, struct pw_stream_events, + new_buffer, id); + impl->last_buffer_id[i] = id; + + if (id == input->buffer_id) + break; + id = (id + 1) % len; + } } send_need_input(stream); } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT) { @@ -891,12 +913,19 @@ static void client_node_transport(void *data, uint32_t node_id, { struct stream *impl = data; struct pw_stream *stream = &impl->this; + int i; stream->node_id = node_id; - if (impl->trans) + if (impl->trans) { pw_client_node_transport_destroy(impl->trans); + free (impl->last_buffer_id); + } impl->trans = transport; + impl->last_buffer_id = malloc(impl->trans->area->max_input_ports * sizeof(uint32_t)); + for (i = 0; i < impl->trans->area->max_input_ports; i++) { + impl->last_buffer_id[i] = SPA_ID_INVALID; + } pw_log_info("stream %p: create client transport %p with fds %d %d for node %u", stream, impl->trans, readfd, writefd, node_id); @@ -1026,6 +1055,8 @@ void pw_stream_disconnect(struct pw_stream *stream) if (impl->trans) { pw_client_node_transport_destroy(impl->trans); impl->trans = NULL; + free(impl->last_buffer_id); + impl->last_buffer_id = NULL; } }