make per mix port io and buffers

Move the io areas to a separate memory block.
Make per link io areas for the ports
Send per mix port io area and buffers
This commit is contained in:
Wim Taymans 2018-03-01 17:39:17 +01:00
parent 07f12c9713
commit 4f680c224b
11 changed files with 242 additions and 161 deletions

View file

@ -2,7 +2,7 @@
load-module libpipewire-module-rtkit load-module libpipewire-module-rtkit
load-module libpipewire-module-protocol-native load-module libpipewire-module-protocol-native
load-module libpipewire-module-suspend-on-idle load-module libpipewire-module-suspend-on-idle
#load-module libpipewire-module-spa-monitor alsa/libspa-alsa alsa-monitor alsa load-module libpipewire-module-spa-monitor alsa/libspa-alsa alsa-monitor alsa
load-module libpipewire-module-spa-monitor v4l2/libspa-v4l2 v4l2-monitor v4l2 load-module libpipewire-module-spa-monitor v4l2/libspa-v4l2 v4l2-monitor v4l2
#load-module libpipewire-module-spa-monitor bluez5/libspa-bluez5 bluez5-monitor bluez5 #load-module libpipewire-module-spa-monitor bluez5/libspa-bluez5 bluez5-monitor bluez5
#load-module libpipewire-module-spa-node videotestsrc/libspa-videotestsrc videotestsrc videotestsrc Spa:POD:Object:Props:patternType=Spa:POD:Object:Props:patternType:snow #load-module libpipewire-module-spa-node videotestsrc/libspa-videotestsrc videotestsrc videotestsrc Spa:POD:Object:Props:patternType=Spa:POD:Object:Props:patternType:snow
@ -10,6 +10,5 @@ load-module libpipewire-module-autolink
#load-module libpipewire-module-mixer #load-module libpipewire-module-mixer
load-module libpipewire-module-client-node load-module libpipewire-module-client-node
load-module libpipewire-module-flatpak load-module libpipewire-module-flatpak
#load-module libpipewire-module-audio-dsp load-module libpipewire-module-audio-dsp
#load-module libpipewire-module-link-factory load-module libpipewire-module-link-factory
#load-module libpipewire-module-jack

View file

@ -56,8 +56,6 @@ struct pw_client_node_area {
*/ */
struct pw_client_node_transport { struct pw_client_node_transport {
struct pw_client_node_area *area; /**< the transport area */ struct pw_client_node_area *area; /**< the transport area */
struct spa_io_buffers *inputs; /**< array of buffer input io */
struct spa_io_buffers *outputs; /**< array of buffer output io */
void *input_data; /**< input memory for ringbuffer */ void *input_data; /**< input memory for ringbuffer */
struct spa_ringbuffer *input_buffer; /**< ringbuffer for input memory */ struct spa_ringbuffer *input_buffer; /**< ringbuffer for input memory */
void *output_data; /**< output memory for ringbuffer */ void *output_data; /**< output memory for ringbuffer */
@ -308,6 +306,15 @@ struct pw_client_node_proxy_events {
/** /**
* Memory was added to a node * Memory was added to a node
* *
* Memory is given to a node as an fd in \a memfd of a certain
* memory \a type.
*
* Further references to this fd will be made with the per memory
* unique identifier \a mem_id.
*
* Buffers or controls will reference the memory by \a mem_id and
* mapping the specified area will give access to the memory.
*
* \param mem_id the id of the memory * \param mem_id the id of the memory
* \param type the memory type * \param type the memory type
* \param memfd the fd of the memory * \param memfd the fd of the memory
@ -406,6 +413,7 @@ struct pw_client_node_proxy_events {
* \param seq a sequence number * \param seq a sequence number
* \param direction a port direction * \param direction a port direction
* \param port_id the port id * \param port_id the port id
* \param mix_id the mixer port id
* \param n_buffer the number of buffers * \param n_buffer the number of buffers
* \param buffers and array of buffer descriptions * \param buffers and array of buffer descriptions
*/ */
@ -413,6 +421,7 @@ struct pw_client_node_proxy_events {
uint32_t seq, uint32_t seq,
enum spa_direction direction, enum spa_direction direction,
uint32_t port_id, uint32_t port_id,
uint32_t mix_id,
uint32_t n_buffers, uint32_t n_buffers,
struct pw_client_node_buffer *buffers); struct pw_client_node_buffer *buffers);
/** /**
@ -433,6 +442,7 @@ struct pw_client_node_proxy_events {
* \param seq a sequence number * \param seq a sequence number
* \param direction the direction of the port * \param direction the direction of the port
* \param port_id the port id * \param port_id the port id
* \param mix_id the mixer port id
* \param id the id of the io area to set * \param id the id of the io area to set
* \param mem_id the id of the memory to use * \param mem_id the id of the memory to use
* \param offset offset of io area in memory * \param offset offset of io area in memory
@ -442,6 +452,7 @@ struct pw_client_node_proxy_events {
uint32_t seq, uint32_t seq,
enum spa_direction direction, enum spa_direction direction,
uint32_t port_id, uint32_t port_id,
uint32_t mix_id,
uint32_t id, uint32_t id,
uint32_t mem_id, uint32_t mem_id,
uint32_t offset, uint32_t offset,

View file

@ -42,10 +42,11 @@
/** \cond */ /** \cond */
#define MAX_INPUTS 64 #define MAX_INPUTS 64
#define MAX_OUTPUTS 64 #define MAX_OUTPUTS 64
#define MAX_BUFFERS 64 #define MAX_BUFFERS 64
#define MAX_AREAS 1024
#define CHECK_IN_PORT_ID(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) < MAX_INPUTS) #define CHECK_IN_PORT_ID(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) < MAX_INPUTS)
#define CHECK_OUT_PORT_ID(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) < MAX_OUTPUTS) #define CHECK_OUT_PORT_ID(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) < MAX_OUTPUTS)
@ -136,6 +137,9 @@ struct impl {
struct pw_client_node_transport *transport; struct pw_client_node_transport *transport;
struct pw_memblock *io_areas;
uint32_t io_memid;
struct spa_hook node_listener; struct spa_hook node_listener;
struct spa_hook resource_listener; struct spa_hook resource_listener;
@ -593,52 +597,7 @@ impl_node_port_set_io(struct spa_node *node,
uint32_t id, uint32_t id,
void *data, size_t size) void *data, size_t size)
{ {
struct node *this; return -ENOTSUP;
struct impl *impl;
struct pw_type *t;
struct pw_memblock *mem;
struct mem *m;
uint32_t memid, mem_offset, mem_size;
if (node == NULL)
return -EINVAL;
this = SPA_CONTAINER_OF(node, struct node, node);
impl = this->impl;
t = impl->t;
if (this->resource == NULL)
return 0;
if (!CHECK_PORT(this, direction, port_id))
return -EINVAL;
if (data) {
if ((mem = pw_memblock_find(data)) == NULL)
return -EINVAL;
mem_offset = SPA_PTRDIFF(data, mem->ptr);
mem_size = mem->size;
if (mem_size - mem_offset < size)
return -EINVAL;
mem_offset += mem->offset;
m = ensure_mem(impl, mem->fd, t->data.MemFd, mem->flags);
memid = m->id;
}
else {
memid = SPA_ID_INVALID;
mem_offset = mem_size = 0;
}
pw_client_node_resource_port_set_io(this->resource,
this->seq,
direction, port_id,
id,
memid,
mem_offset, mem_size);
return SPA_RESULT_RETURN_ASYNC(this->seq++);
} }
static int static int
@ -750,7 +709,7 @@ impl_node_port_use_buffers(struct spa_node *node,
pw_client_node_resource_port_use_buffers(this->resource, pw_client_node_resource_port_use_buffers(this->resource,
this->seq, this->seq,
direction, port_id, direction, port_id, 0,
n_buffers, mb); n_buffers, mb);
return SPA_RESULT_RETURN_ASYNC(this->seq++); return SPA_RESULT_RETURN_ASYNC(this->seq++);
@ -855,7 +814,6 @@ static int impl_node_process_input(struct spa_node *node)
struct spa_io_buffers *io = p->io; struct spa_io_buffers *io = p->io;
pw_log_trace("set io status to %d %d", io->status, io->buffer_id); pw_log_trace("set io status to %d %d", io->status, io->buffer_id);
impl->transport->inputs[p->port_id] = *io;
/* explicitly recycle buffers when the client is not going to do it */ /* explicitly recycle buffers when the client is not going to do it */
if (!client_reuse && (pp = p->peer)) if (!client_reuse && (pp = p->peer))
@ -876,29 +834,10 @@ static int impl_node_process_output(struct spa_node *node)
{ {
struct node *this; struct node *this;
struct impl *impl; struct impl *impl;
struct spa_graph_node *n;
struct spa_graph_port *p;
this = SPA_CONTAINER_OF(node, struct node, node); this = SPA_CONTAINER_OF(node, struct node, node);
impl = this->impl; impl = this->impl;
n = &impl->this.node->rt.node;
if (impl->out_pending)
goto done;
impl->out_pending = true;
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) {
struct spa_io_buffers *io = p->io;
impl->transport->outputs[p->port_id] = *io;
pw_log_trace("%d %d -> %d %d", io->status, io->buffer_id,
impl->transport->outputs[p->port_id].status,
impl->transport->outputs[p->port_id].buffer_id);
}
done:
pw_client_node_transport_add_message(impl->transport, pw_client_node_transport_add_message(impl->transport,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT)); &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT));
do_flush(this); do_flush(this);
@ -916,18 +855,15 @@ static int handle_node_message(struct node *this, struct pw_client_node_message
switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) { switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) {
case PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT: case PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT:
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) {
*p->io = impl->transport->outputs[p->port_id];
pw_log_trace("have output %d %d", p->io->status, p->io->buffer_id);
}
impl->out_pending = false; impl->out_pending = false;
this->callbacks->have_output(this->callbacks_data); this->callbacks->have_output(this->callbacks_data);
break; break;
case PW_CLIENT_NODE_MESSAGE_NEED_INPUT: case PW_CLIENT_NODE_MESSAGE_NEED_INPUT:
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) {
*p->io = impl->transport->inputs[p->port_id]; struct spa_graph_node *ni = p->peer->node;
pw_log_trace("need input %d %d", p->io->status, p->io->buffer_id); spa_node_process_output(ni->implementation);
pw_log_trace("need input %p %d %d", p->io, p->io->status, p->io->buffer_id);
} }
impl->input_ready++; impl->input_ready++;
this->callbacks->need_input(this->callbacks_data); this->callbacks->need_input(this->callbacks_data);
@ -1205,6 +1141,8 @@ static void node_initialized(void *data)
struct impl *impl = data; struct impl *impl = data;
struct pw_client_node *this = &impl->this; struct pw_client_node *this = &impl->this;
struct pw_node *node = this->node; struct pw_node *node = this->node;
struct pw_type *t = impl->t;
struct mem *m;
if (this->resource == NULL) if (this->resource == NULL)
return; return;
@ -1219,6 +1157,16 @@ static void node_initialized(void *data)
spa_loop_add_source(impl->node.data_loop, &impl->node.data_source); spa_loop_add_source(impl->node.data_loop, &impl->node.data_source);
pw_log_debug("client-node %p: transport fd %d %d", node, impl->fds[0], impl->fds[1]); pw_log_debug("client-node %p: transport fd %d %d", node, impl->fds[0], impl->fds[1]);
if (pw_memblock_alloc(PW_MEMBLOCK_FLAG_WITH_FD |
PW_MEMBLOCK_FLAG_MAP_READWRITE |
PW_MEMBLOCK_FLAG_SEAL,
sizeof(struct spa_io_buffers) * MAX_AREAS,
&impl->io_areas) < 0)
return;
m = ensure_mem(impl, impl->io_areas->fd, t->data.MemFd, impl->io_areas->flags);
impl->io_memid = m->id;
pw_client_node_resource_transport(this->resource, pw_client_node_resource_transport(this->resource,
pw_global_get_id(pw_node_get_global(node)), pw_global_get_id(pw_node_get_global(node)),
impl->other_fds[0], impl->other_fds[0],
@ -1247,10 +1195,98 @@ static void node_free(void *data)
free(impl); free(impl);
} }
static void *port_get_io(void *data, uint32_t id, size_t size)
{
struct impl *impl = data;
return impl->io_areas->ptr;
}
static const struct pw_port_implementation port_impl = {
PW_VERSION_PORT_IMPLEMENTATION,
.get_io = port_get_io,
};
static int mix_port_set_io(struct spa_node *node,
enum spa_direction direction, uint32_t port_id,
uint32_t id, void *data, size_t size)
{
struct pw_port *p = SPA_CONTAINER_OF(node, struct pw_port, mix_node);
struct impl *impl = p->owner_data;
struct node *this = &impl->node;
struct pw_type *t = impl->t;
struct pw_memblock *mem;
struct mem *m;
uint32_t memid, mem_offset, mem_size;
pw_log_debug("client-node %p: mix port %d set io %p, %zd", impl, port_id, data, size);
p->rt.port.io = data;
p->rt.mix_port.io = data;
if (this->resource == NULL)
return 0;
if (!CHECK_PORT(this, direction, port_id))
return -EINVAL;
if (data) {
if ((mem = pw_memblock_find(data)) == NULL)
return -EINVAL;
mem_offset = mem->offset;
mem_size = mem->size;
if (mem_size - mem_offset < size)
return -EINVAL;
m = ensure_mem(impl, mem->fd, t->data.MemFd, mem->flags);
memid = m->id;
}
else {
memid = SPA_ID_INVALID;
mem_offset = mem_size = 0;
}
pw_client_node_resource_port_set_io(this->resource,
this->seq,
direction, port_id, 0,
id,
memid,
mem_offset, mem_size);
return SPA_RESULT_RETURN_ASYNC(this->seq++);
}
static int mix_port_process_input(struct spa_node *data)
{
return SPA_STATUS_HAVE_BUFFER;
}
static int mix_port_process_output(struct spa_node *data)
{
return SPA_STATUS_NEED_BUFFER;
}
static void node_port_added(void *data, struct pw_port *port)
{
struct impl *impl = data;
pw_log_debug("client-node %p: port added", &impl->this);
port->mix_node.port_set_io = mix_port_set_io;
port->mix_node.process_input = mix_port_process_input;
port->mix_node.process_output = mix_port_process_output;
port->implementation = &port_impl;
port->implementation_data = impl;
port->owner_data = impl;
}
static const struct pw_node_events node_events = { static const struct pw_node_events node_events = {
PW_VERSION_NODE_EVENTS, PW_VERSION_NODE_EVENTS,
.free = node_free, .free = node_free,
.initialized = node_initialized, .initialized = node_initialized,
.port_added = node_port_added,
}; };
static const struct pw_resource_events resource_events = { static const struct pw_resource_events resource_events = {

View file

@ -333,7 +333,7 @@ static int client_node_demarshal_port_use_buffers(void *object, void *data, size
{ {
struct pw_proxy *proxy = object; struct pw_proxy *proxy = object;
struct spa_pod_parser prs; struct spa_pod_parser prs;
uint32_t seq, direction, port_id, n_buffers, data_id; uint32_t seq, direction, port_id, mix_id, n_buffers, data_id;
struct pw_client_node_buffer *buffers; struct pw_client_node_buffer *buffers;
int i, j; int i, j;
@ -343,6 +343,7 @@ static int client_node_demarshal_port_use_buffers(void *object, void *data, size
"i", &seq, "i", &seq,
"i", &direction, "i", &direction,
"i", &port_id, "i", &port_id,
"i", &mix_id,
"i", &n_buffers, NULL) < 0) "i", &n_buffers, NULL) < 0)
return -EINVAL; return -EINVAL;
@ -388,6 +389,7 @@ static int client_node_demarshal_port_use_buffers(void *object, void *data, size
pw_proxy_notify(proxy, struct pw_client_node_proxy_events, port_use_buffers, seq, pw_proxy_notify(proxy, struct pw_client_node_proxy_events, port_use_buffers, seq,
direction, direction,
port_id, port_id,
mix_id,
n_buffers, buffers); n_buffers, buffers);
return 0; return 0;
} }
@ -417,7 +419,7 @@ static int client_node_demarshal_port_set_io(void *object, void *data, size_t si
{ {
struct pw_proxy *proxy = object; struct pw_proxy *proxy = object;
struct spa_pod_parser prs; struct spa_pod_parser prs;
uint32_t seq, direction, port_id, id, memid, off, sz; uint32_t seq, direction, port_id, mix_id, id, memid, off, sz;
spa_pod_parser_init(&prs, data, size, 0); spa_pod_parser_init(&prs, data, size, 0);
if (spa_pod_parser_get(&prs, if (spa_pod_parser_get(&prs,
@ -425,6 +427,7 @@ static int client_node_demarshal_port_set_io(void *object, void *data, size_t si
"i", &seq, "i", &seq,
"i", &direction, "i", &direction,
"i", &port_id, "i", &port_id,
"i", &mix_id,
"I", &id, "I", &id,
"i", &memid, "i", &memid,
"i", &off, "i", &off,
@ -433,7 +436,7 @@ static int client_node_demarshal_port_set_io(void *object, void *data, size_t si
pw_proxy_notify(proxy, struct pw_client_node_proxy_events, port_set_io, pw_proxy_notify(proxy, struct pw_client_node_proxy_events, port_set_io,
seq, seq,
direction, port_id, direction, port_id, mix_id,
id, memid, id, memid,
off, sz); off, sz);
return 0; return 0;
@ -588,6 +591,7 @@ client_node_marshal_port_use_buffers(void *object,
uint32_t seq, uint32_t seq,
enum spa_direction direction, enum spa_direction direction,
uint32_t port_id, uint32_t port_id,
uint32_t mix_id,
uint32_t n_buffers, struct pw_client_node_buffer *buffers) uint32_t n_buffers, struct pw_client_node_buffer *buffers)
{ {
struct pw_resource *resource = object; struct pw_resource *resource = object;
@ -601,6 +605,7 @@ client_node_marshal_port_use_buffers(void *object,
"i", seq, "i", seq,
"i", direction, "i", direction,
"i", port_id, "i", port_id,
"i", mix_id,
"i", n_buffers, NULL); "i", n_buffers, NULL);
for (i = 0; i < n_buffers; i++) { for (i = 0; i < n_buffers; i++) {
@ -659,6 +664,7 @@ client_node_marshal_port_set_io(void *object,
uint32_t seq, uint32_t seq,
uint32_t direction, uint32_t direction,
uint32_t port_id, uint32_t port_id,
uint32_t mix_id,
uint32_t id, uint32_t id,
uint32_t memid, uint32_t memid,
uint32_t offset, uint32_t offset,
@ -673,6 +679,7 @@ client_node_marshal_port_set_io(void *object,
"i", seq, "i", seq,
"i", direction, "i", direction,
"i", port_id, "i", port_id,
"i", mix_id,
"I", id, "I", id,
"i", memid, "i", memid,
"i", offset, "i", offset,

View file

@ -48,8 +48,6 @@ static size_t area_get_size(struct pw_client_node_area *area)
{ {
size_t size; size_t size;
size = sizeof(struct pw_client_node_area); size = sizeof(struct pw_client_node_area);
size += area->max_input_ports * sizeof(struct spa_io_buffers);
size += area->max_output_ports * sizeof(struct spa_io_buffers);
size += sizeof(struct spa_ringbuffer); size += sizeof(struct spa_ringbuffer);
size += INPUT_BUFFER_SIZE; size += INPUT_BUFFER_SIZE;
size += sizeof(struct spa_ringbuffer); size += sizeof(struct spa_ringbuffer);
@ -64,12 +62,6 @@ static void transport_setup_area(void *p, struct pw_client_node_transport *trans
trans->area = a = p; trans->area = a = p;
p = SPA_MEMBER(p, sizeof(struct pw_client_node_area), struct spa_io_buffers); p = SPA_MEMBER(p, sizeof(struct pw_client_node_area), struct spa_io_buffers);
trans->inputs = p;
p = SPA_MEMBER(p, a->max_input_ports * sizeof(struct spa_io_buffers), void);
trans->outputs = p;
p = SPA_MEMBER(p, a->max_output_ports * sizeof(struct spa_io_buffers), void);
trans->input_buffer = p; trans->input_buffer = p;
p = SPA_MEMBER(p, sizeof(struct spa_ringbuffer), void); p = SPA_MEMBER(p, sizeof(struct spa_ringbuffer), void);
@ -85,17 +77,6 @@ static void transport_setup_area(void *p, struct pw_client_node_transport *trans
static void transport_reset_area(struct pw_client_node_transport *trans) static void transport_reset_area(struct pw_client_node_transport *trans)
{ {
int i;
struct pw_client_node_area *a = trans->area;
for (i = 0; i < a->max_input_ports; i++) {
trans->inputs[i].status = SPA_STATUS_OK;
trans->inputs[i].buffer_id = SPA_ID_INVALID;
}
for (i = 0; i < a->max_output_ports; i++) {
trans->outputs[i].status = SPA_STATUS_OK;
trans->outputs[i].buffer_id = SPA_ID_INVALID;
}
spa_ringbuffer_init(trans->input_buffer); spa_ringbuffer_init(trans->input_buffer);
spa_ringbuffer_init(trans->output_buffer); spa_ringbuffer_init(trans->output_buffer);
} }

View file

@ -903,7 +903,7 @@ static int module_init(struct pw_module *module, struct pw_properties *propertie
pw_protocol_native_init(this); pw_protocol_native_init(this);
pw_log_debug("protocol-native %p: new", this); pw_log_debug("protocol-native %p: new %d", this, debug_messages);
d = pw_protocol_get_user_data(this); d = pw_protocol_get_user_data(this);
d->protocol = this; d->protocol = this;

View file

@ -480,6 +480,47 @@ param_filter(struct pw_link *this,
return num; return num;
} }
static void port_set_io(struct pw_link *this, struct pw_port *port, void *data, size_t size,
struct spa_graph_port *p)
{
struct pw_type *t = &this->core->type;
int res;
p->io = data;
pw_log_debug("link %p: port %p %d.%d set io: %p", this, port, port->port_id, p->port_id, data);
if (port->mix_node.port_set_io) {
if ((res = spa_node_port_set_io(&port->mix_node,
p->direction,
p->port_id,
t->io.Buffers,
data, size)) < 0)
pw_log_warn("port %p: can't set io: %s", port, spa_strerror(res));
}
}
static int select_io(struct pw_link *this)
{
struct spa_io_buffers *io;
struct pw_type *t = &this->core->type;
if (this->output->implementation && this->output->implementation->get_io)
io = this->output->implementation->get_io(this->output->implementation_data,
t->io.Buffers, sizeof(struct spa_io_buffers));
else if (this->input->implementation && this->input->implementation->get_io)
io = this->input->implementation->get_io(this->input->implementation_data,
t->io.Buffers, sizeof(struct spa_io_buffers));
else
io = &this->io;
if (io == NULL)
return -EIO;
port_set_io(this, this->input, io, sizeof(struct spa_io_buffers), &this->rt.in_port);
port_set_io(this, this->output, io, sizeof(struct spa_io_buffers), &this->rt.out_port);
return 0;
}
static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_state) static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_state)
{ {
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
@ -500,17 +541,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
output = this->output; output = this->output;
pw_log_debug("link %p: doing alloc buffers %p %p", this, output->node, input->node); pw_log_debug("link %p: doing alloc buffers %p %p", this, output->node, input->node);
/* find out what's possible */ oinfo = output->spa_info;
if ((res = spa_node_port_get_info(output->node->node, output->direction, output->port_id, iinfo = input->spa_info;
&oinfo)) < 0) {
asprintf(&error, "error get output port info: %d", res);
goto error;
}
if ((res = spa_node_port_get_info(input->node->node, input->direction, input->port_id,
&iinfo)) < 0) {
asprintf(&error, "error get input port info: %d", res);
goto error;
}
in_flags = iinfo->flags; in_flags = iinfo->flags;
out_flags = oinfo->flags; out_flags = oinfo->flags;
@ -707,6 +739,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
goto error; goto error;
} }
select_io(this);
return 0; return 0;
error: error:
@ -1287,7 +1321,6 @@ void pw_link_destroy(struct pw_link *link)
spa_list_remove(&link->link); spa_list_remove(&link->link);
input_remove(link, link->input); input_remove(link, link->input);
output_remove(link, link->output); output_remove(link, link->output);
if (link->global) { if (link->global) {

View file

@ -138,7 +138,9 @@ int pw_memblock_map(struct pw_memblock *mem)
} else { } else {
mem->ptr = NULL; mem->ptr = NULL;
} }
pw_log_debug("mem %p: map", mem);
pw_log_debug("mem %p: map to %p", mem, mem->ptr);
return 0; return 0;
} }

View file

@ -300,6 +300,13 @@ struct pw_node {
void *user_data; /**< extra user data */ void *user_data; /**< extra user data */
}; };
struct pw_port_implementation {
#define PW_VERSION_PORT_IMPLEMENTATION 0
uint32_t version;
void *(*get_io) (void *data, uint32_t id, size_t size);
};
struct pw_port { struct pw_port {
struct spa_list link; /**< link in node port_list */ struct spa_list link; /**< link in node port_list */
@ -330,6 +337,9 @@ struct pw_port {
struct spa_hook_list listener_list; struct spa_hook_list listener_list;
const struct pw_port_implementation *implementation;
void *implementation_data;
struct spa_node *mix; /**< optional port buffer mix/split */ struct spa_node *mix; /**< optional port buffer mix/split */
struct spa_node mix_node; /**< mix node implementation */ struct spa_node mix_node; /**< mix node implementation */
struct pw_map mix_port_map; /**< map from port_id from mixer */ struct pw_map mix_port_map; /**< map from port_id from mixer */
@ -341,6 +351,7 @@ struct pw_port {
struct spa_graph_node mix_node; /**< mixer node */ struct spa_graph_node mix_node; /**< mixer node */
} rt; /**< data only accessed from the data thread */ } rt; /**< data only accessed from the data thread */
void *owner_data; /**< extra owner data */
void *user_data; /**< extra user data */ void *user_data; /**< extra user data */
}; };

View file

@ -718,20 +718,16 @@ static void client_node_transport(void *object, uint32_t node_id,
for (i = 0; i < data->trans->area->max_input_ports; i++) { for (i = 0; i < data->trans->area->max_input_ports; i++) {
port_init(&data->in_ports[i]); port_init(&data->in_ports[i]);
data->trans->inputs[i] = SPA_IO_BUFFERS_INIT;
spa_graph_port_init(&data->in_ports[i].input, spa_graph_port_init(&data->in_ports[i].input,
SPA_DIRECTION_INPUT, SPA_DIRECTION_INPUT, i,
i,
0, 0,
&data->trans->inputs[i]); NULL);
spa_graph_port_init(&data->in_ports[i].output, spa_graph_port_init(&data->in_ports[i].output,
SPA_DIRECTION_OUTPUT, SPA_DIRECTION_OUTPUT, i,
i,
0, 0,
&data->trans->inputs[i]); NULL);
spa_graph_port_add(&data->in_node, &data->in_ports[i].output); spa_graph_port_add(&data->in_node, &data->in_ports[i].output);
spa_graph_port_link(&data->in_ports[i].output, &data->in_ports[i].input); spa_graph_port_link(&data->in_ports[i].output, &data->in_ports[i].input);
pw_log_info("transport in %d %p", i, &data->trans->inputs[i]);
} }
spa_list_for_each(port, &data->node->input_ports, link) { spa_list_for_each(port, &data->node->input_ports, link) {
spa_graph_port_add(&port->rt.mix_node, &data->in_ports[port->port_id].input); spa_graph_port_add(&port->rt.mix_node, &data->in_ports[port->port_id].input);
@ -740,20 +736,16 @@ static void client_node_transport(void *object, uint32_t node_id,
for (i = 0; i < data->trans->area->max_output_ports; i++) { for (i = 0; i < data->trans->area->max_output_ports; i++) {
port_init(&data->out_ports[i]); port_init(&data->out_ports[i]);
data->trans->outputs[i] = SPA_IO_BUFFERS_INIT;
spa_graph_port_init(&data->out_ports[i].output, spa_graph_port_init(&data->out_ports[i].output,
SPA_DIRECTION_OUTPUT, SPA_DIRECTION_OUTPUT, i,
i,
0, 0,
&data->trans->outputs[i]); NULL);
spa_graph_port_init(&data->out_ports[i].input, spa_graph_port_init(&data->out_ports[i].input,
SPA_DIRECTION_INPUT, SPA_DIRECTION_INPUT, i,
i,
0, 0,
&data->trans->outputs[i]); NULL);
spa_graph_port_add(&data->out_node, &data->out_ports[i].input); spa_graph_port_add(&data->out_node, &data->out_ports[i].input);
spa_graph_port_link(&data->out_ports[i].output, &data->out_ports[i].input); spa_graph_port_link(&data->out_ports[i].output, &data->out_ports[i].input);
pw_log_info("transport out %d %p", i, &data->trans->inputs[i]);
} }
spa_list_for_each(port, &data->node->output_ports, link) { spa_list_for_each(port, &data->node->output_ports, link) {
spa_graph_port_add(&port->rt.mix_node, &data->out_ports[port->port_id].output); spa_graph_port_add(&port->rt.mix_node, &data->out_ports[port->port_id].output);
@ -893,7 +885,7 @@ static void client_node_command(void *object, uint32_t seq, const struct spa_com
/* FIXME we should call process_output on the node and see what its /* FIXME we should call process_output on the node and see what its
* status is */ * status is */
for (i = 0; i < data->trans->area->max_input_ports; i++) for (i = 0; i < data->trans->area->max_input_ports; i++)
data->trans->inputs[i].status = SPA_STATUS_NEED_BUFFER; data->in_ports[i].input.io->status = SPA_STATUS_NEED_BUFFER;
node_need_input(data); node_need_input(data);
pw_client_node_proxy_done(data->node_proxy, seq, res); pw_client_node_proxy_done(data->node_proxy, seq, res);
@ -992,7 +984,7 @@ static void clear_buffers(struct node_data *data, struct port *port)
static void static void
client_node_port_use_buffers(void *object, client_node_port_use_buffers(void *object,
uint32_t seq, uint32_t seq,
enum spa_direction direction, uint32_t port_id, enum spa_direction direction, uint32_t port_id, uint32_t mix_id,
uint32_t n_buffers, struct pw_client_node_buffer *buffers) uint32_t n_buffers, struct pw_client_node_buffer *buffers)
{ {
struct pw_proxy *proxy = object; struct pw_proxy *proxy = object;
@ -1156,6 +1148,7 @@ client_node_port_set_io(void *object,
uint32_t seq, uint32_t seq,
uint32_t direction, uint32_t direction,
uint32_t port_id, uint32_t port_id,
uint32_t mix_id,
uint32_t id, uint32_t id,
uint32_t memid, uint32_t memid,
uint32_t offset, uint32_t offset,
@ -1164,6 +1157,7 @@ client_node_port_set_io(void *object,
struct pw_proxy *proxy = object; struct pw_proxy *proxy = object;
struct node_data *data = proxy->user_data; struct node_data *data = proxy->user_data;
struct pw_core *core = proxy->remote->core; struct pw_core *core = proxy->remote->core;
struct pw_type *t = &core->type;
struct port *port; struct port *port;
struct mem_id *mid; struct mem_id *mid;
void *ptr; void *ptr;
@ -1190,11 +1184,16 @@ client_node_port_set_io(void *object,
pw_log_debug("port %p: set io %s %p", port, spa_type_map_get_type(core->type.map, id), ptr); pw_log_debug("port %p: set io %s %p", port, spa_type_map_get_type(core->type.map, id), ptr);
spa_node_port_set_io(port->port->node->node, if (id == t->io.Buffers) {
direction, port_id, port->input.io = ptr;
id, port->output.io = ptr;
ptr, } else {
size); spa_node_port_set_io(port->port->node->node,
direction, port_id,
id,
ptr,
size);
}
} }

View file

@ -591,22 +591,22 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod
int i; int i;
for (i = 0; i < impl->trans->area->n_input_ports; i++) { for (i = 0; i < impl->trans->area->n_input_ports; i++) {
struct spa_io_buffers *input = &impl->trans->inputs[i]; struct spa_io_buffers *io = impl->io;
struct buffer_id *bid; struct buffer_id *bid;
uint32_t buffer_id; uint32_t buffer_id;
buffer_id = input->buffer_id; buffer_id = io->buffer_id;
pw_log_trace("stream %p: process input %d %d", stream, input->status, pw_log_trace("stream %p: process input %d %d", stream, io->status,
buffer_id); buffer_id);
if ((bid = find_buffer(stream, buffer_id)) == NULL) if ((bid = find_buffer(stream, buffer_id)) == NULL)
continue; continue;
if (impl->client_reuse) if (impl->client_reuse)
input->buffer_id = SPA_ID_INVALID; io->buffer_id = SPA_ID_INVALID;
if (input->status == SPA_STATUS_HAVE_BUFFER) { if (io->status == SPA_STATUS_HAVE_BUFFER) {
bid->used = true; bid->used = true;
impl->in_new_buffer = true; impl->in_new_buffer = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events, spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
@ -614,7 +614,7 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod
impl->in_new_buffer = false; impl->in_new_buffer = false;
} }
input->status = SPA_STATUS_NEED_BUFFER; io->status = SPA_STATUS_NEED_BUFFER;
} }
send_need_input(stream); send_need_input(stream);
break; break;
@ -624,13 +624,13 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod
int i; int i;
for (i = 0; i < impl->trans->area->n_output_ports; i++) { for (i = 0; i < impl->trans->area->n_output_ports; i++) {
struct spa_io_buffers *output = &impl->trans->outputs[i]; struct spa_io_buffers *io = impl->io;
if (output->buffer_id == SPA_ID_INVALID) if (io->buffer_id == SPA_ID_INVALID)
continue; continue;
reuse_buffer(stream, output->buffer_id); reuse_buffer(stream, io->buffer_id);
output->buffer_id = SPA_ID_INVALID; io->buffer_id = SPA_ID_INVALID;
} }
pw_log_trace("stream %p: process output", stream); pw_log_trace("stream %p: process output", stream);
impl->in_need_buffer = true; impl->in_need_buffer = true;
@ -747,7 +747,7 @@ static void client_node_command(void *data, uint32_t seq, const struct spa_comma
if (impl->direction == SPA_DIRECTION_INPUT) { if (impl->direction == SPA_DIRECTION_INPUT) {
for (i = 0; i < impl->trans->area->max_input_ports; i++) for (i = 0; i < impl->trans->area->max_input_ports; i++)
impl->trans->inputs[i].status = SPA_STATUS_NEED_BUFFER; impl->io->status = SPA_STATUS_NEED_BUFFER;
send_need_input(stream); send_need_input(stream);
} }
else { else {
@ -861,7 +861,7 @@ client_node_add_mem(void *data,
static void static void
client_node_port_use_buffers(void *data, client_node_port_use_buffers(void *data,
uint32_t seq, uint32_t seq,
enum spa_direction direction, uint32_t port_id, enum spa_direction direction, uint32_t port_id, uint32_t mix_id,
uint32_t n_buffers, struct pw_client_node_buffer *buffers) uint32_t n_buffers, struct pw_client_node_buffer *buffers)
{ {
struct stream *impl = data; struct stream *impl = data;
@ -1020,6 +1020,7 @@ static void client_node_port_set_io(void *data,
uint32_t seq, uint32_t seq,
enum spa_direction direction, enum spa_direction direction,
uint32_t port_id, uint32_t port_id,
uint32_t mix_id,
uint32_t id, uint32_t id,
uint32_t mem_id, uint32_t mem_id,
uint32_t offset, uint32_t offset,
@ -1052,7 +1053,8 @@ static void client_node_port_set_io(void *data,
if (id == t->io.Buffers) { if (id == t->io.Buffers) {
impl->io = ptr; impl->io = ptr;
pw_log_debug("stream %p: set io id %u %p", stream, id, ptr); pw_log_debug("stream %p: %u.%u set io id %u %p", stream,
port_id, mix_id, id, ptr);
} }
res = 0; res = 0;
@ -1236,8 +1238,8 @@ int pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id)
int i; int i;
for (i = 0; i < impl->trans->area->n_input_ports; i++) { for (i = 0; i < impl->trans->area->n_input_ports; i++) {
struct spa_io_buffers *input = &impl->trans->inputs[i]; struct spa_io_buffers *io = impl->io;
input->buffer_id = id; io->buffer_id = id;
} }
} else { } else {
send_reuse_buffer(stream, id); send_reuse_buffer(stream, id);
@ -1261,17 +1263,17 @@ int pw_stream_send_buffer(struct pw_stream *stream, uint32_t id)
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer_id *bid; struct buffer_id *bid;
if (impl->trans->outputs[0].buffer_id != SPA_ID_INVALID) { if (impl->io->buffer_id != SPA_ID_INVALID) {
pw_log_debug("can't send %u, pending buffer %u", id, pw_log_debug("can't send %u, pending buffer %u", id,
impl->trans->outputs[0].buffer_id); impl->io->buffer_id);
return -EIO; return -EIO;
} }
if ((bid = find_buffer(stream, id)) && !bid->used) { if ((bid = find_buffer(stream, id)) && !bid->used) {
bid->used = true; bid->used = true;
spa_list_remove(&bid->link); spa_list_remove(&bid->link);
impl->trans->outputs[0].buffer_id = id; impl->io->buffer_id = id;
impl->trans->outputs[0].status = SPA_STATUS_HAVE_BUFFER; impl->io->status = SPA_STATUS_HAVE_BUFFER;
pw_log_trace("stream %p: send buffer %d", stream, id); pw_log_trace("stream %p: send buffer %d", stream, id);
if (!impl->in_need_buffer) if (!impl->in_need_buffer)
send_have_output(stream); send_have_output(stream);