remote: work directly with the port mixers

Add port_mixer on the ports
Work directly with the port mixers
simplify scheduling
This commit is contained in:
Wim Taymans 2018-03-02 13:30:10 +01:00
parent c0865581b9
commit 3bb988820e

View file

@ -39,6 +39,8 @@
#include "extensions/protocol-native.h" #include "extensions/protocol-native.h"
#include "extensions/client-node.h" #include "extensions/client-node.h"
#define MAX_MIX 4096
/** \cond */ /** \cond */
struct remote { struct remote {
struct pw_remote this; struct pw_remote this;
@ -65,14 +67,11 @@ struct buffer_id {
struct mem_id **mem; struct mem_id **mem;
}; };
struct port { struct mix {
struct spa_graph_port output;
struct spa_graph_port input;
struct pw_port *port; struct pw_port *port;
uint32_t mix_id;
struct pw_port_mix mix;
struct pw_array buffer_ids; struct pw_array buffer_ids;
bool in_order;
}; };
struct node_data { struct node_data {
@ -85,12 +84,7 @@ struct node_data {
struct spa_source *rtsocket_source; struct spa_source *rtsocket_source;
struct pw_client_node_transport *trans; struct pw_client_node_transport *trans;
struct spa_node out_node_impl; struct mix mix[MAX_MIX];
struct spa_graph_node out_node;
struct port *out_ports;
struct spa_node in_node_impl;
struct spa_graph_node in_node;
struct port *in_ports;
struct pw_array mem_ids; struct pw_array mem_ids;
@ -485,19 +479,110 @@ static void unhandle_socket(struct pw_proxy *proxy)
do_remove_source, 1, NULL, 0, true, data); do_remove_source, 1, NULL, 0, true, data);
} }
static void do_push(struct node_data *data, enum spa_direction direction)
{
struct spa_graph_node *node = &data->node->rt.node;
struct spa_graph_port *p;
spa_list_for_each(p, &node->ports[direction], link) {
if (p->peer)
spa_node_process_input(p->peer->node->implementation);
}
}
static void do_pull(struct node_data *data, enum spa_direction direction)
{
struct spa_graph_node *node = &data->node->rt.node;
struct spa_graph_port *p;
spa_list_for_each(p, &node->ports[direction], link) {
if (p->peer)
spa_node_process_output(p->peer->node->implementation);
}
}
static void node_need_input(void *data)
{
struct node_data *d = data;
uint64_t cmd = 1;
pw_log_trace("remote %p: send need input", data);
do_pull(data, SPA_DIRECTION_INPUT);
pw_client_node_transport_add_message(d->trans,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_NEED_INPUT));
write(d->rtwritefd, &cmd, 8);
}
static void node_have_output(void *data)
{
struct node_data *d = data;
uint64_t cmd = 1;
pw_log_trace("remote %p: have output", data);
do_push(data, SPA_DIRECTION_OUTPUT);
pw_client_node_transport_add_message(d->trans,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT));
write(d->rtwritefd, &cmd, 8);
}
static int process_input(struct node_data *data)
{
struct spa_graph_node *node = &data->node->rt.node;
int res;
pw_log_trace("remote %p: process input", data->remote);
do_push(data, SPA_DIRECTION_INPUT);
if (node->implementation->process_input == NULL)
res = SPA_STATUS_HAVE_BUFFER;
else
res = spa_node_process_input(node->implementation);
switch (res) {
case SPA_STATUS_HAVE_BUFFER:
node_have_output(data);
break;
case SPA_STATUS_NEED_BUFFER:
node_need_input(data);
break;
}
return res;
}
static int process_output(struct node_data *data)
{
struct spa_graph_node *node = &data->node->rt.node;
int res;
pw_log_trace("remote %p: process output", data->remote);
do_pull(data, SPA_DIRECTION_OUTPUT);
if (node->implementation->process_output == NULL)
res = SPA_STATUS_NEED_BUFFER;
else
res = spa_node_process_output(node->implementation);
switch (res) {
case SPA_STATUS_HAVE_BUFFER:
node_have_output(data);
break;
case SPA_STATUS_NEED_BUFFER:
node_need_input(data);
break;
}
return res;
}
static void handle_rtnode_message(struct pw_proxy *proxy, struct pw_client_node_message *message) static void handle_rtnode_message(struct pw_proxy *proxy, struct pw_client_node_message *message)
{ {
struct node_data *data = proxy->user_data; struct node_data *data = proxy->user_data;
switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) { switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) {
case PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT: case PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT:
pw_log_trace("remote %p: process input", data->remote); process_input(data);
spa_graph_have_output(data->node->rt.graph, &data->in_node);
break; break;
case PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT: case PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT:
pw_log_trace("remote %p: process output", data->remote); process_output(data);
spa_graph_need_input(data->node->rt.graph, &data->out_node);
break; break;
case PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER: case PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER:
@ -506,16 +591,9 @@ static void handle_rtnode_message(struct pw_proxy *proxy, struct pw_client_node_
(struct pw_client_node_message_port_reuse_buffer *) message; (struct pw_client_node_message_port_reuse_buffer *) message;
uint32_t port_id = rb->body.port_id.value; uint32_t port_id = rb->body.port_id.value;
uint32_t buffer_id = rb->body.buffer_id.value; uint32_t buffer_id = rb->body.buffer_id.value;
struct spa_graph_port *p, *pp; struct spa_graph_node *node = &data->node->rt.node;
spa_list_for_each(p, &data->out_node.ports[SPA_DIRECTION_INPUT], link) { spa_node_port_reuse_buffer(node->implementation, port_id, buffer_id);
if (p->port_id != port_id || (pp = p->peer) == NULL)
continue;
spa_node_port_reuse_buffer(pp->node->implementation,
pp->port_id, buffer_id);
break;
}
break; break;
} }
default: default:
@ -618,7 +696,6 @@ static void clear_memid(struct node_data *data, struct mem_id *mid)
static void clean_transport(struct pw_proxy *proxy) static void clean_transport(struct pw_proxy *proxy)
{ {
struct node_data *data = proxy->user_data; struct node_data *data = proxy->user_data;
struct pw_port *port;
struct mem_id *mid; struct mem_id *mid;
if (data->trans == NULL) if (data->trans == NULL)
@ -626,46 +703,67 @@ static void clean_transport(struct pw_proxy *proxy)
unhandle_socket(proxy); unhandle_socket(proxy);
spa_list_for_each(port, &data->node->input_ports, link) {
spa_graph_port_remove(&data->in_ports[port->port_id].output);
spa_graph_port_remove(&data->in_ports[port->port_id].input);
}
spa_list_for_each(port, &data->node->output_ports, link) {
spa_graph_port_remove(&data->out_ports[port->port_id].output);
spa_graph_port_remove(&data->out_ports[port->port_id].input);
}
pw_array_for_each(mid, &data->mem_ids) pw_array_for_each(mid, &data->mem_ids)
clear_memid(data, mid); clear_memid(data, mid);
pw_array_clear(&data->mem_ids); pw_array_clear(&data->mem_ids);
free(data->in_ports);
free(data->out_ports);
pw_client_node_transport_destroy(data->trans); pw_client_node_transport_destroy(data->trans);
close(data->rtwritefd); close(data->rtwritefd);
data->trans = NULL; data->trans = NULL;
} }
static void port_init(struct port *port) static void mix_init(struct mix *mix, struct pw_port *port, uint32_t mix_id)
{ {
pw_array_init(&port->buffer_ids, 32); mix->port = port;
pw_array_ensure_size(&port->buffer_ids, sizeof(struct buffer_id) * 64); mix->mix_id = mix_id;
port->in_order = true; pw_port_init_mix(port, &mix->mix);
pw_array_init(&mix->buffer_ids, 32);
pw_array_ensure_size(&mix->buffer_ids, sizeof(struct buffer_id) * 64);
} }
static struct port *find_port(struct node_data *data, enum spa_direction direction, uint32_t port_id) static int
do_activate_mix(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{ {
if (direction == SPA_DIRECTION_INPUT) { struct mix *mix = user_data;
if (port_id > data->trans->area->max_input_ports) SPA_FLAG_UNSET(mix->mix.port.flags, SPA_GRAPH_PORT_FLAG_DISABLED);
return NULL; spa_graph_port_add(&mix->port->rt.mix_node, &mix->mix.port);
return &data->in_ports[port_id]; return 0;
} }
else {
if (port_id > data->trans->area->max_output_ports) static struct mix *find_mix(struct node_data *data,
return NULL; enum spa_direction direction, uint32_t port_id, uint32_t mix_id)
return &data->out_ports[port_id]; {
struct mix *mix, *empty = NULL;
struct pw_port *port;
int i;
for (i = 0; i < MAX_MIX; i++) {
mix = &data->mix[i];
if (mix->port == NULL) {
empty = mix;
continue;
}
if (mix->port->direction == (enum pw_direction) direction &&
mix->port->port_id == port_id &&
mix->mix_id == mix_id)
return mix;
} }
if (empty == NULL)
return NULL;
port = pw_node_find_port(data->node, direction, port_id);
if (port == NULL)
return NULL;
mix_init(empty, port, mix_id);
pw_loop_invoke(data->core->data_loop,
do_activate_mix, SPA_ID_INVALID, NULL, 0, false, empty);
return empty;
} }
static void client_node_add_mem(void *object, static void client_node_add_mem(void *object,
@ -700,8 +798,6 @@ static void client_node_transport(void *object, uint32_t node_id,
{ {
struct pw_proxy *proxy = object; struct pw_proxy *proxy = object;
struct node_data *data = proxy->user_data; struct node_data *data = proxy->user_data;
struct pw_port *port;
int i;
clean_transport(proxy); clean_transport(proxy);
@ -711,47 +807,6 @@ static void client_node_transport(void *object, uint32_t node_id,
pw_log_info("remote-node %p: create transport %p with fds %d %d for node %u", pw_log_info("remote-node %p: create transport %p with fds %d %d for node %u",
proxy, data->trans, readfd, writefd, node_id); proxy, data->trans, readfd, writefd, node_id);
data->in_ports = calloc(data->trans->area->max_input_ports,
sizeof(struct port));
data->out_ports = calloc(data->trans->area->max_output_ports,
sizeof(struct port));
for (i = 0; i < data->trans->area->max_input_ports; i++) {
port_init(&data->in_ports[i]);
spa_graph_port_init(&data->in_ports[i].input,
SPA_DIRECTION_INPUT, i,
0,
NULL);
spa_graph_port_init(&data->in_ports[i].output,
SPA_DIRECTION_OUTPUT, i,
0,
NULL);
spa_graph_port_add(&data->in_node, &data->in_ports[i].output);
spa_graph_port_link(&data->in_ports[i].output, &data->in_ports[i].input);
}
spa_list_for_each(port, &data->node->input_ports, link) {
spa_graph_port_add(&port->rt.mix_node, &data->in_ports[port->port_id].input);
data->in_ports[port->port_id].port = port;
}
for (i = 0; i < data->trans->area->max_output_ports; i++) {
port_init(&data->out_ports[i]);
spa_graph_port_init(&data->out_ports[i].output,
SPA_DIRECTION_OUTPUT, i,
0,
NULL);
spa_graph_port_init(&data->out_ports[i].input,
SPA_DIRECTION_INPUT, i,
0,
NULL);
spa_graph_port_add(&data->out_node, &data->out_ports[i].input);
spa_graph_port_link(&data->out_ports[i].output, &data->out_ports[i].input);
}
spa_list_for_each(port, &data->node->output_ports, link) {
spa_graph_port_add(&port->rt.mix_node, &data->out_ports[port->port_id].output);
data->out_ports[port->port_id].port = port;
}
data->rtwritefd = writefd; data->rtwritefd = writefd;
data->rtsocket_source = pw_loop_add_io(proxy->remote->core->data_loop, data->rtsocket_source = pw_loop_add_io(proxy->remote->core->data_loop,
readfd, readfd,
@ -833,24 +888,6 @@ static void client_node_event(void *object, const struct spa_event *event)
} }
static void node_need_input(void *data)
{
struct node_data *d = data;
uint64_t cmd = 1;
pw_client_node_transport_add_message(d->trans,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_NEED_INPUT));
write(d->rtwritefd, &cmd, 8);
}
static void node_have_output(void *data)
{
struct node_data *d = data;
uint64_t cmd = 1;
pw_client_node_transport_add_message(d->trans,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT));
write(d->rtwritefd, &cmd, 8);
}
static void client_node_command(void *object, uint32_t seq, const struct spa_command *command) static void client_node_command(void *object, uint32_t seq, const struct spa_command *command)
{ {
struct pw_proxy *proxy = object; struct pw_proxy *proxy = object;
@ -871,8 +908,6 @@ static void client_node_command(void *object, uint32_t seq, const struct spa_com
pw_client_node_proxy_done(data->node_proxy, seq, res); pw_client_node_proxy_done(data->node_proxy, seq, res);
} }
else if (SPA_COMMAND_TYPE(command) == remote->core->type.command_node.Start) { else if (SPA_COMMAND_TYPE(command) == remote->core->type.command_node.Start) {
int i;
pw_log_debug("node %p: start %d", proxy, seq); pw_log_debug("node %p: start %d", proxy, seq);
pw_loop_update_io(remote->core->data_loop, pw_loop_update_io(remote->core->data_loop,
@ -882,12 +917,7 @@ static void client_node_command(void *object, uint32_t seq, const struct spa_com
if ((res = spa_node_send_command(data->node->node, command)) < 0) if ((res = spa_node_send_command(data->node->node, command)) < 0)
pw_log_warn("node %p: start failed", proxy); pw_log_warn("node %p: start failed", proxy);
/* FIXME we should call process_output on the node and see what its process_output(data);
* status is */
for (i = 0; i < data->trans->area->max_input_ports; i++)
data->in_ports[i].input.io->status = SPA_STATUS_NEED_BUFFER;
node_need_input(data);
pw_client_node_proxy_done(data->node_proxy, seq, res); pw_client_node_proxy_done(data->node_proxy, seq, res);
} }
else if (SPA_COMMAND_TYPE(command) == remote->core->type.command_node.ClockUpdate) { else if (SPA_COMMAND_TYPE(command) == remote->core->type.command_node.ClockUpdate) {
@ -932,20 +962,20 @@ client_node_port_set_param(void *object,
{ {
struct pw_proxy *proxy = object; struct pw_proxy *proxy = object;
struct node_data *data = proxy->user_data; struct node_data *data = proxy->user_data;
struct port *port; struct pw_port *port;
int res; int res;
port = find_port(data, direction, port_id); port = pw_node_find_port(data->node, direction, port_id);
if (port == NULL || port->port == NULL) { if (port == NULL) {
res = -EINVAL; res = -EINVAL;
goto done; goto done;
} }
res = pw_port_set_param(port->port, id, flags, param); res = pw_port_set_param(port, id, flags, param);
if (res < 0) if (res < 0)
goto done; goto done;
add_port_update(proxy, port->port, add_port_update(proxy, port,
PW_CLIENT_NODE_PORT_UPDATE_PARAMS | PW_CLIENT_NODE_PORT_UPDATE_PARAMS |
PW_CLIENT_NODE_PORT_UPDATE_INFO); PW_CLIENT_NODE_PORT_UPDATE_INFO);
@ -953,15 +983,16 @@ client_node_port_set_param(void *object,
pw_client_node_proxy_done(data->node_proxy, seq, res); pw_client_node_proxy_done(data->node_proxy, seq, res);
} }
static void clear_buffers(struct node_data *data, struct port *port) static void clear_buffers(struct node_data *data, struct mix *mix)
{ {
struct pw_port *port = mix->port;
struct buffer_id *bid; struct buffer_id *bid;
int i; int i;
pw_log_debug("port %p: clear buffers", port); pw_log_debug("port %p: clear buffers", port);
pw_port_use_buffers(port->port, NULL, 0); pw_port_use_buffers(port, NULL, 0);
pw_array_for_each(bid, &port->buffer_ids) { pw_array_for_each(bid, &mix->buffer_ids) {
if (bid->ptr != NULL) { if (bid->ptr != NULL) {
if (munmap(bid->ptr, bid->map.size) < 0) if (munmap(bid->ptr, bid->map.size) < 0)
pw_log_warn("failed to unmap: %m"); pw_log_warn("failed to unmap: %m");
@ -978,7 +1009,7 @@ static void clear_buffers(struct node_data *data, struct port *port)
free(bid->buf); free(bid->buf);
bid->buf = NULL; bid->buf = NULL;
} }
port->buffer_ids.size = 0; mix->buffer_ids.size = 0;
} }
static void static void
@ -992,13 +1023,13 @@ client_node_port_use_buffers(void *object,
struct buffer_id *bid; struct buffer_id *bid;
uint32_t i, j, len; uint32_t i, j, len;
struct spa_buffer *b, **bufs; struct spa_buffer *b, **bufs;
struct port *port; struct mix *mix;
struct pw_core *core = proxy->remote->core; struct pw_core *core = proxy->remote->core;
struct pw_type *t = &core->type; struct pw_type *t = &core->type;
int res, prot; int res, prot;
port = find_port(data, direction, port_id); mix = find_mix(data, direction, port_id, mix_id);
if (port == NULL) { if (mix == NULL) {
res = -EINVAL; res = -EINVAL;
goto done; goto done;
} }
@ -1006,7 +1037,7 @@ client_node_port_use_buffers(void *object,
prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0); prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0);
/* clear previous buffers */ /* clear previous buffers */
clear_buffers(data, port); clear_buffers(data, mix);
bufs = alloca(n_buffers * sizeof(struct spa_buffer *)); bufs = alloca(n_buffers * sizeof(struct spa_buffer *));
@ -1020,8 +1051,8 @@ client_node_port_use_buffers(void *object,
goto cleanup; goto cleanup;
} }
len = pw_array_get_len(&port->buffer_ids, struct buffer_id); len = pw_array_get_len(&mix->buffer_ids, struct buffer_id);
bid = pw_array_add(&port->buffer_ids, sizeof(struct buffer_id)); bid = pw_array_add(&mix->buffer_ids, sizeof(struct buffer_id));
pw_map_range_init(&bid->map, buffers[i].offset, buffers[i].size, core->sc_pagesize); pw_map_range_init(&bid->map, buffers[i].offset, buffers[i].size, core->sc_pagesize);
@ -1114,14 +1145,14 @@ client_node_port_use_buffers(void *object,
bufs[i] = b; bufs[i] = b;
} }
res = pw_port_use_buffers(port->port, bufs, n_buffers); res = pw_port_use_buffers(mix->port, bufs, n_buffers);
done: done:
pw_client_node_proxy_done(data->node_proxy, seq, res); pw_client_node_proxy_done(data->node_proxy, seq, res);
return; return;
cleanup: cleanup:
clear_buffers(data, port); clear_buffers(data, mix);
goto done; goto done;
} }
@ -1134,13 +1165,13 @@ client_node_port_command(void *object,
{ {
struct pw_proxy *proxy = object; struct pw_proxy *proxy = object;
struct node_data *data = proxy->user_data; struct node_data *data = proxy->user_data;
struct port *port; struct pw_port *port;
port = find_port(data, direction, port_id); port = pw_node_find_port(data->node, direction, port_id);
if (port == NULL) if (port == NULL)
return; return;
pw_port_send_command(port->port, true, command); pw_port_send_command(port, true, command);
} }
static void static void
@ -1158,12 +1189,12 @@ client_node_port_set_io(void *object,
struct node_data *data = proxy->user_data; struct node_data *data = proxy->user_data;
struct pw_core *core = proxy->remote->core; struct pw_core *core = proxy->remote->core;
struct pw_type *t = &core->type; struct pw_type *t = &core->type;
struct port *port; struct mix *mix;
struct mem_id *mid; struct mem_id *mid;
void *ptr; void *ptr;
port = find_port(data, direction, port_id); mix = find_mix(data, direction, port_id, mix_id);
if (port == NULL) if (mix == NULL)
return; return;
if (memid == SPA_ID_INVALID) { if (memid == SPA_ID_INVALID) {
@ -1182,13 +1213,20 @@ client_node_port_set_io(void *object,
} }
pw_log_debug("port %p: set io %s %p", port, spa_type_map_get_type(core->type.map, id), ptr); pw_log_debug("port %p: set io %s %p", mix->port,
spa_type_map_get_type(core->type.map, id), ptr);
if (id == t->io.Buffers) { if (id == t->io.Buffers) {
port->input.io = ptr; mix->mix.port.io = ptr;
port->output.io = ptr; if (ptr) {
mix->mix.port.io->buffer_id = SPA_ID_INVALID;
if (direction == SPA_DIRECTION_INPUT)
mix->mix.port.io->status = SPA_STATUS_NEED_BUFFER;
else
mix->mix.port.io->status = SPA_STATUS_OK;
}
} else { } else {
spa_node_port_set_io(port->port->node->node, spa_node_port_set_io(mix->port->node->node,
direction, port_id, direction, port_id,
id, id,
ptr, ptr,
@ -1254,6 +1292,7 @@ static void node_active_changed(void *data, bool active)
pw_client_node_proxy_set_active(d->node_proxy, active); pw_client_node_proxy_set_active(d->node_proxy, active);
} }
static const struct pw_node_events node_events = { static const struct pw_node_events node_events = {
PW_VERSION_NODE_EVENTS, PW_VERSION_NODE_EVENTS,
.destroy = node_destroy, .destroy = node_destroy,
@ -1262,10 +1301,13 @@ static const struct pw_node_events node_events = {
.have_output = node_have_output, .have_output = node_have_output,
}; };
static void clear_port(struct node_data *data, struct port *port) static void clear_mix(struct node_data *data, struct mix *mix)
{ {
clear_buffers(data, port); if (mix->port) {
pw_array_clear(&port->buffer_ids); clear_buffers(data, mix);
pw_array_clear(&mix->buffer_ids);
mix->port = NULL;
}
} }
static void node_proxy_destroy(void *_data) static void node_proxy_destroy(void *_data)
@ -1275,10 +1317,8 @@ static void node_proxy_destroy(void *_data)
int i; int i;
if (data->trans) { if (data->trans) {
for (i = 0; i < data->trans->area->max_input_ports; i++) for (i = 0; i < MAX_MIX; i++)
clear_port(data, &data->in_ports[i]); clear_mix(data, &data->mix[i]);
for (i = 0; i < data->trans->area->max_output_ports; i++)
clear_port(data, &data->out_ports[i]);
} }
clean_transport(proxy); clean_transport(proxy);
@ -1290,36 +1330,6 @@ static const struct pw_proxy_events proxy_events = {
.destroy = node_proxy_destroy, .destroy = node_proxy_destroy,
}; };
static int impl_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id)
{
pw_log_trace("node %p: reuse buffer %d %d", node, port_id, buffer_id);
return 0;
}
static int impl_process_input(struct spa_node *node)
{
struct node_data *data = SPA_CONTAINER_OF(node, struct node_data, out_node_impl);
pw_log_trace("node %p: process input", node);
node_have_output(data);
return 0;
}
static int impl_process_output(struct spa_node *node)
{
struct node_data *data = SPA_CONTAINER_OF(node, struct node_data, in_node_impl);
pw_log_trace("node %p: process output", node);
node_need_input(data);
return 0;
}
static const struct spa_node node_impl = {
SPA_VERSION_NODE,
NULL,
.process_input = impl_process_input,
.process_output = impl_process_output,
.port_reuse_buffer = impl_port_reuse_buffer,
};
struct pw_proxy *pw_remote_export(struct pw_remote *remote, struct pw_proxy *pw_remote_export(struct pw_remote *remote,
struct pw_node *node) struct pw_node *node)
{ {
@ -1342,17 +1352,10 @@ struct pw_proxy *pw_remote_export(struct pw_remote *remote,
data->core = pw_node_get_core(node); data->core = pw_node_get_core(node);
data->t = pw_core_get_type(data->core); data->t = pw_core_get_type(data->core);
data->node_proxy = (struct pw_client_node_proxy *)proxy; data->node_proxy = (struct pw_client_node_proxy *)proxy;
data->in_node_impl = node_impl;
data->out_node_impl = node_impl;
pw_array_init(&data->mem_ids, 64); pw_array_init(&data->mem_ids, 64);
pw_array_ensure_size(&data->mem_ids, sizeof(struct mem_id) * 64); pw_array_ensure_size(&data->mem_ids, sizeof(struct mem_id) * 64);
spa_graph_node_init(&data->in_node);
spa_graph_node_set_implementation(&data->in_node, &data->in_node_impl);
spa_graph_node_init(&data->out_node);
spa_graph_node_set_implementation(&data->out_node, &data->out_node_impl);
pw_proxy_add_listener(proxy, &data->proxy_listener, &proxy_events, data); pw_proxy_add_listener(proxy, &data->proxy_listener, &proxy_events, data);
pw_node_add_listener(node, &data->node_listener, &node_events, data); pw_node_add_listener(node, &data->node_listener, &node_events, data);