Support for "client-reuse" streams

Add a PW property "pipewire.client.reuse". If set, the client-node
doesn't immediately reuse a buffer after sending
PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT to the client. Instead, it waits
for reuse-buffer from the client. The SPA_GRAPH_NODE_FLAG_ASYNC is
used for this, together with adapted logic in process_input().

In stream.c, if the property is set, the handling of incoming buffers
for PW_DIRECTION_INPUT streams is changed. Each buffer has to be
recycled, so we make sure new_buffer is emitted for each intermediate
buffer, if buffer_id in the IO area has moved past some buffers.

Change-Id: I137a12b702b857cc73369930d7029ecbd69d63ff
This commit is contained in:
David Svensson Fors 2017-09-28 10:12:54 +02:00 committed by Wim Taymans
parent de8e0c8f8c
commit 28bf6137d3
2 changed files with 58 additions and 9 deletions

View file

@ -31,11 +31,13 @@
#include "spa/node.h" #include "spa/node.h"
#include "spa/format-builder.h" #include "spa/format-builder.h"
#include "spa/lib/format.h" #include "spa/lib/format.h"
#include "spa/graph.h"
#include "pipewire/pipewire.h" #include "pipewire/pipewire.h"
#include "pipewire/interfaces.h" #include "pipewire/interfaces.h"
#include "pipewire/core.h" #include "pipewire/core.h"
#include "pipewire/private.h"
#include "modules/spa/spa-node.h" #include "modules/spa/spa-node.h"
#include "client-node.h" #include "client-node.h"
#include "transport.h" #include "transport.h"
@ -126,6 +128,8 @@ struct impl {
int fds[2]; int fds[2];
int other_fds[2]; int other_fds[2];
bool client_reuse;
}; };
/** \endcond */ /** \endcond */
@ -762,13 +766,19 @@ static int spa_proxy_node_process_input(struct spa_node *node)
pw_log_trace("%d %d", io->status, io->buffer_id); pw_log_trace("%d %d", io->status, io->buffer_id);
impl->transport->inputs[i] = *io; impl->transport->inputs[i] = *io;
if (impl->client_reuse) {
io->status = SPA_RESULT_OK;
io->buffer_id = SPA_ID_INVALID;
} else {
io->status = SPA_RESULT_NEED_BUFFER; io->status = SPA_RESULT_NEED_BUFFER;
} }
}
pw_client_node_transport_add_message(impl->transport, pw_client_node_transport_add_message(impl->transport,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT)); &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT));
do_flush(this); do_flush(this);
if (this->callbacks->need_input) if (this->callbacks->need_input || impl->client_reuse)
return SPA_RESULT_OK; return SPA_RESULT_OK;
else else
return SPA_RESULT_NEED_BUFFER; return SPA_RESULT_NEED_BUFFER;
@ -1149,6 +1159,7 @@ struct pw_client_node *pw_client_node_new(struct pw_resource *resource,
const struct spa_support *support; const struct spa_support *support;
uint32_t n_support; uint32_t n_support;
const char *name = "client-node"; const char *name = "client-node";
const char *str;
impl = calloc(1, sizeof(struct impl)); impl = calloc(1, sizeof(struct impl));
if (impl == NULL) if (impl == NULL)
@ -1178,6 +1189,13 @@ struct pw_client_node *pw_client_node_new(struct pw_resource *resource,
if (this->node == NULL) if (this->node == NULL)
goto error_no_node; goto error_no_node;
str = pw_properties_get(properties, "pipewire.client.reuse");
impl->client_reuse = str && strcmp(str, "1") == 0;
if (impl->client_reuse)
this->node->rt.node.flags |= SPA_GRAPH_NODE_FLAG_ASYNC;
else
this->node->rt.node.flags &= ~SPA_GRAPH_NODE_FLAG_ASYNC;
pw_resource_add_listener(this->resource, pw_resource_add_listener(this->resource,
&impl->resource_listener, &impl->resource_listener,
&resource_events, &resource_events,

View file

@ -95,6 +95,9 @@ struct stream {
struct pw_array buffer_ids; struct pw_array buffer_ids;
bool in_order; bool in_order;
bool client_reuse;
uint32_t *last_buffer_id;
struct spa_list free; struct spa_list free;
bool in_need_buffer; bool in_need_buffer;
@ -201,6 +204,7 @@ struct pw_stream *pw_stream_new(struct pw_remote *remote,
{ {
struct stream *impl; struct stream *impl;
struct pw_stream *this; struct pw_stream *this;
const char *str;
impl = calloc(1, sizeof(struct stream)); impl = calloc(1, sizeof(struct stream));
if (impl == NULL) if (impl == NULL)
@ -224,6 +228,9 @@ struct pw_stream *pw_stream_new(struct pw_remote *remote,
impl->type_client_node = spa_type_map_get_id(remote->core->type.map, PW_TYPE_INTERFACE__ClientNode); impl->type_client_node = spa_type_map_get_id(remote->core->type.map, PW_TYPE_INTERFACE__ClientNode);
impl->rtwritefd = -1; impl->rtwritefd = -1;
str = pw_properties_get(props, "pipewire.client.reuse");
impl->client_reuse = str && strcmp(str, "1") == 0;
spa_hook_list_init(&this->listener_list); spa_hook_list_init(&this->listener_list);
this->state = PW_STREAM_STATE_UNCONNECTED; this->state = PW_STREAM_STATE_UNCONNECTED;
@ -521,19 +528,34 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod
for (i = 0; i < impl->trans->area->n_input_ports; i++) { for (i = 0; i < impl->trans->area->n_input_ports; i++) {
struct spa_port_io *input = &impl->trans->inputs[i]; struct spa_port_io *input = &impl->trans->inputs[i];
struct buffer_id *bid; uint32_t len, first, id;
pw_log_trace("stream %p: process input %d %d", stream, input->status, pw_log_trace("stream %p: process input %d %d", stream, input->status,
input->buffer_id); input->buffer_id);
if (input->buffer_id == SPA_ID_INVALID) if (input->buffer_id == SPA_ID_INVALID)
continue; continue;
bid = find_buffer(stream, input->buffer_id); len = pw_array_get_len(&impl->buffer_ids, struct buffer_id);
if (impl->client_reuse && impl->last_buffer_id[i] != SPA_ID_INVALID)
first = (impl->last_buffer_id[i] + 1) % len;
else
first = input->buffer_id;
id = first;
while (true) {
struct buffer_id *bid;
bid = find_buffer(stream, id);
bid->used = true; bid->used = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events, spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
new_buffer, input->buffer_id); new_buffer, id);
input->buffer_id = SPA_ID_INVALID; impl->last_buffer_id[i] = id;
if (id == input->buffer_id)
break;
id = (id + 1) % len;
}
} }
send_need_input(stream); send_need_input(stream);
} else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT) { } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT) {
@ -891,12 +913,19 @@ static void client_node_transport(void *data, uint32_t node_id,
{ {
struct stream *impl = data; struct stream *impl = data;
struct pw_stream *stream = &impl->this; struct pw_stream *stream = &impl->this;
int i;
stream->node_id = node_id; stream->node_id = node_id;
if (impl->trans) if (impl->trans) {
pw_client_node_transport_destroy(impl->trans); pw_client_node_transport_destroy(impl->trans);
free (impl->last_buffer_id);
}
impl->trans = transport; impl->trans = transport;
impl->last_buffer_id = malloc(impl->trans->area->max_input_ports * sizeof(uint32_t));
for (i = 0; i < impl->trans->area->max_input_ports; i++) {
impl->last_buffer_id[i] = SPA_ID_INVALID;
}
pw_log_info("stream %p: create client transport %p with fds %d %d for node %u", pw_log_info("stream %p: create client transport %p with fds %d %d for node %u",
stream, impl->trans, readfd, writefd, node_id); stream, impl->trans, readfd, writefd, node_id);
@ -1026,6 +1055,8 @@ void pw_stream_disconnect(struct pw_stream *stream)
if (impl->trans) { if (impl->trans) {
pw_client_node_transport_destroy(impl->trans); pw_client_node_transport_destroy(impl->trans);
impl->trans = NULL; impl->trans = NULL;
free(impl->last_buffer_id);
impl->last_buffer_id = NULL;
} }
} }