diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index 25510c88b..4e8126521 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -39,6 +39,8 @@ #include "extensions/protocol-native.h" #include "extensions/client-node.h" +#define MAX_MIX 4096 + /** \cond */ struct remote { struct pw_remote this; @@ -65,14 +67,11 @@ struct buffer_id { struct mem_id **mem; }; -struct port { - struct spa_graph_port output; - struct spa_graph_port input; - +struct mix { struct pw_port *port; - + uint32_t mix_id; + struct pw_port_mix mix; struct pw_array buffer_ids; - bool in_order; }; struct node_data { @@ -85,12 +84,7 @@ struct node_data { struct spa_source *rtsocket_source; struct pw_client_node_transport *trans; - struct spa_node out_node_impl; - struct spa_graph_node out_node; - struct port *out_ports; - struct spa_node in_node_impl; - struct spa_graph_node in_node; - struct port *in_ports; + struct mix mix[MAX_MIX]; struct pw_array mem_ids; @@ -485,19 +479,110 @@ static void unhandle_socket(struct pw_proxy *proxy) do_remove_source, 1, NULL, 0, true, data); } +static void do_push(struct node_data *data, enum spa_direction direction) +{ + struct spa_graph_node *node = &data->node->rt.node; + struct spa_graph_port *p; + + spa_list_for_each(p, &node->ports[direction], link) { + if (p->peer) + spa_node_process_input(p->peer->node->implementation); + } +} + +static void do_pull(struct node_data *data, enum spa_direction direction) +{ + struct spa_graph_node *node = &data->node->rt.node; + struct spa_graph_port *p; + + spa_list_for_each(p, &node->ports[direction], link) { + if (p->peer) + spa_node_process_output(p->peer->node->implementation); + } +} + +static void node_need_input(void *data) +{ + struct node_data *d = data; + uint64_t cmd = 1; + pw_log_trace("remote %p: send need input", data); + do_pull(data, SPA_DIRECTION_INPUT); + pw_client_node_transport_add_message(d->trans, + &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_NEED_INPUT)); + write(d->rtwritefd, &cmd, 8); +} + +static void node_have_output(void *data) +{ + struct node_data *d = data; + uint64_t cmd = 1; + + pw_log_trace("remote %p: have output", data); + do_push(data, SPA_DIRECTION_OUTPUT); + pw_client_node_transport_add_message(d->trans, + &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT)); + write(d->rtwritefd, &cmd, 8); +} + +static int process_input(struct node_data *data) +{ + struct spa_graph_node *node = &data->node->rt.node; + int res; + + pw_log_trace("remote %p: process input", data->remote); + do_push(data, SPA_DIRECTION_INPUT); + + if (node->implementation->process_input == NULL) + res = SPA_STATUS_HAVE_BUFFER; + else + res = spa_node_process_input(node->implementation); + + switch (res) { + case SPA_STATUS_HAVE_BUFFER: + node_have_output(data); + break; + case SPA_STATUS_NEED_BUFFER: + node_need_input(data); + break; + } + return res; +} + +static int process_output(struct node_data *data) +{ + struct spa_graph_node *node = &data->node->rt.node; + int res; + + pw_log_trace("remote %p: process output", data->remote); + do_pull(data, SPA_DIRECTION_OUTPUT); + + if (node->implementation->process_output == NULL) + res = SPA_STATUS_NEED_BUFFER; + else + res = spa_node_process_output(node->implementation); + + switch (res) { + case SPA_STATUS_HAVE_BUFFER: + node_have_output(data); + break; + case SPA_STATUS_NEED_BUFFER: + node_need_input(data); + break; + } + return res; +} + static void handle_rtnode_message(struct pw_proxy *proxy, struct pw_client_node_message *message) { struct node_data *data = proxy->user_data; switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) { case PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT: - pw_log_trace("remote %p: process input", data->remote); - spa_graph_have_output(data->node->rt.graph, &data->in_node); + process_input(data); break; case PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT: - pw_log_trace("remote %p: process output", data->remote); - spa_graph_need_input(data->node->rt.graph, &data->out_node); + process_output(data); break; case PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER: @@ -506,16 +591,9 @@ static void handle_rtnode_message(struct pw_proxy *proxy, struct pw_client_node_ (struct pw_client_node_message_port_reuse_buffer *) message; uint32_t port_id = rb->body.port_id.value; uint32_t buffer_id = rb->body.buffer_id.value; - struct spa_graph_port *p, *pp; + struct spa_graph_node *node = &data->node->rt.node; - spa_list_for_each(p, &data->out_node.ports[SPA_DIRECTION_INPUT], link) { - if (p->port_id != port_id || (pp = p->peer) == NULL) - continue; - - spa_node_port_reuse_buffer(pp->node->implementation, - pp->port_id, buffer_id); - break; - } + spa_node_port_reuse_buffer(node->implementation, port_id, buffer_id); break; } default: @@ -618,7 +696,6 @@ static void clear_memid(struct node_data *data, struct mem_id *mid) static void clean_transport(struct pw_proxy *proxy) { struct node_data *data = proxy->user_data; - struct pw_port *port; struct mem_id *mid; if (data->trans == NULL) @@ -626,46 +703,67 @@ static void clean_transport(struct pw_proxy *proxy) unhandle_socket(proxy); - spa_list_for_each(port, &data->node->input_ports, link) { - spa_graph_port_remove(&data->in_ports[port->port_id].output); - spa_graph_port_remove(&data->in_ports[port->port_id].input); - } - spa_list_for_each(port, &data->node->output_ports, link) { - spa_graph_port_remove(&data->out_ports[port->port_id].output); - spa_graph_port_remove(&data->out_ports[port->port_id].input); - } - pw_array_for_each(mid, &data->mem_ids) clear_memid(data, mid); pw_array_clear(&data->mem_ids); - free(data->in_ports); - free(data->out_ports); pw_client_node_transport_destroy(data->trans); close(data->rtwritefd); data->trans = NULL; } -static void port_init(struct port *port) +static void mix_init(struct mix *mix, struct pw_port *port, uint32_t mix_id) { - pw_array_init(&port->buffer_ids, 32); - pw_array_ensure_size(&port->buffer_ids, sizeof(struct buffer_id) * 64); - port->in_order = true; + mix->port = port; + mix->mix_id = mix_id; + pw_port_init_mix(port, &mix->mix); + pw_array_init(&mix->buffer_ids, 32); + pw_array_ensure_size(&mix->buffer_ids, sizeof(struct buffer_id) * 64); } -static struct port *find_port(struct node_data *data, enum spa_direction direction, uint32_t port_id) +static int +do_activate_mix(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) { - if (direction == SPA_DIRECTION_INPUT) { - if (port_id > data->trans->area->max_input_ports) - return NULL; - return &data->in_ports[port_id]; - } - else { - if (port_id > data->trans->area->max_output_ports) - return NULL; - return &data->out_ports[port_id]; + struct mix *mix = user_data; + SPA_FLAG_UNSET(mix->mix.port.flags, SPA_GRAPH_PORT_FLAG_DISABLED); + spa_graph_port_add(&mix->port->rt.mix_node, &mix->mix.port); + return 0; +} + +static struct mix *find_mix(struct node_data *data, + enum spa_direction direction, uint32_t port_id, uint32_t mix_id) +{ + struct mix *mix, *empty = NULL; + struct pw_port *port; + int i; + + for (i = 0; i < MAX_MIX; i++) { + mix = &data->mix[i]; + + if (mix->port == NULL) { + empty = mix; + continue; + } + if (mix->port->direction == (enum pw_direction) direction && + mix->port->port_id == port_id && + mix->mix_id == mix_id) + return mix; } + if (empty == NULL) + return NULL; + + port = pw_node_find_port(data->node, direction, port_id); + if (port == NULL) + return NULL; + + mix_init(empty, port, mix_id); + + pw_loop_invoke(data->core->data_loop, + do_activate_mix, SPA_ID_INVALID, NULL, 0, false, empty); + + return empty; } static void client_node_add_mem(void *object, @@ -700,8 +798,6 @@ static void client_node_transport(void *object, uint32_t node_id, { struct pw_proxy *proxy = object; struct node_data *data = proxy->user_data; - struct pw_port *port; - int i; clean_transport(proxy); @@ -711,47 +807,6 @@ static void client_node_transport(void *object, uint32_t node_id, pw_log_info("remote-node %p: create transport %p with fds %d %d for node %u", proxy, data->trans, readfd, writefd, node_id); - data->in_ports = calloc(data->trans->area->max_input_ports, - sizeof(struct port)); - data->out_ports = calloc(data->trans->area->max_output_ports, - sizeof(struct port)); - - for (i = 0; i < data->trans->area->max_input_ports; i++) { - port_init(&data->in_ports[i]); - spa_graph_port_init(&data->in_ports[i].input, - SPA_DIRECTION_INPUT, i, - 0, - NULL); - spa_graph_port_init(&data->in_ports[i].output, - SPA_DIRECTION_OUTPUT, i, - 0, - NULL); - spa_graph_port_add(&data->in_node, &data->in_ports[i].output); - spa_graph_port_link(&data->in_ports[i].output, &data->in_ports[i].input); - } - spa_list_for_each(port, &data->node->input_ports, link) { - spa_graph_port_add(&port->rt.mix_node, &data->in_ports[port->port_id].input); - data->in_ports[port->port_id].port = port; - } - - for (i = 0; i < data->trans->area->max_output_ports; i++) { - port_init(&data->out_ports[i]); - spa_graph_port_init(&data->out_ports[i].output, - SPA_DIRECTION_OUTPUT, i, - 0, - NULL); - spa_graph_port_init(&data->out_ports[i].input, - SPA_DIRECTION_INPUT, i, - 0, - NULL); - spa_graph_port_add(&data->out_node, &data->out_ports[i].input); - spa_graph_port_link(&data->out_ports[i].output, &data->out_ports[i].input); - } - spa_list_for_each(port, &data->node->output_ports, link) { - spa_graph_port_add(&port->rt.mix_node, &data->out_ports[port->port_id].output); - data->out_ports[port->port_id].port = port; - } - data->rtwritefd = writefd; data->rtsocket_source = pw_loop_add_io(proxy->remote->core->data_loop, readfd, @@ -833,24 +888,6 @@ static void client_node_event(void *object, const struct spa_event *event) } -static void node_need_input(void *data) -{ - struct node_data *d = data; - uint64_t cmd = 1; - pw_client_node_transport_add_message(d->trans, - &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_NEED_INPUT)); - write(d->rtwritefd, &cmd, 8); -} - -static void node_have_output(void *data) -{ - struct node_data *d = data; - uint64_t cmd = 1; - pw_client_node_transport_add_message(d->trans, - &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT)); - write(d->rtwritefd, &cmd, 8); -} - static void client_node_command(void *object, uint32_t seq, const struct spa_command *command) { struct pw_proxy *proxy = object; @@ -871,8 +908,6 @@ static void client_node_command(void *object, uint32_t seq, const struct spa_com pw_client_node_proxy_done(data->node_proxy, seq, res); } else if (SPA_COMMAND_TYPE(command) == remote->core->type.command_node.Start) { - int i; - pw_log_debug("node %p: start %d", proxy, seq); pw_loop_update_io(remote->core->data_loop, @@ -882,12 +917,7 @@ static void client_node_command(void *object, uint32_t seq, const struct spa_com if ((res = spa_node_send_command(data->node->node, command)) < 0) pw_log_warn("node %p: start failed", proxy); - /* FIXME we should call process_output on the node and see what its - * status is */ - for (i = 0; i < data->trans->area->max_input_ports; i++) - data->in_ports[i].input.io->status = SPA_STATUS_NEED_BUFFER; - node_need_input(data); - + process_output(data); pw_client_node_proxy_done(data->node_proxy, seq, res); } else if (SPA_COMMAND_TYPE(command) == remote->core->type.command_node.ClockUpdate) { @@ -932,20 +962,20 @@ client_node_port_set_param(void *object, { struct pw_proxy *proxy = object; struct node_data *data = proxy->user_data; - struct port *port; + struct pw_port *port; int res; - port = find_port(data, direction, port_id); - if (port == NULL || port->port == NULL) { + port = pw_node_find_port(data->node, direction, port_id); + if (port == NULL) { res = -EINVAL; goto done; } - res = pw_port_set_param(port->port, id, flags, param); + res = pw_port_set_param(port, id, flags, param); if (res < 0) goto done; - add_port_update(proxy, port->port, + add_port_update(proxy, port, PW_CLIENT_NODE_PORT_UPDATE_PARAMS | PW_CLIENT_NODE_PORT_UPDATE_INFO); @@ -953,15 +983,16 @@ client_node_port_set_param(void *object, pw_client_node_proxy_done(data->node_proxy, seq, res); } -static void clear_buffers(struct node_data *data, struct port *port) +static void clear_buffers(struct node_data *data, struct mix *mix) { + struct pw_port *port = mix->port; struct buffer_id *bid; int i; pw_log_debug("port %p: clear buffers", port); - pw_port_use_buffers(port->port, NULL, 0); + pw_port_use_buffers(port, NULL, 0); - pw_array_for_each(bid, &port->buffer_ids) { + pw_array_for_each(bid, &mix->buffer_ids) { if (bid->ptr != NULL) { if (munmap(bid->ptr, bid->map.size) < 0) pw_log_warn("failed to unmap: %m"); @@ -978,7 +1009,7 @@ static void clear_buffers(struct node_data *data, struct port *port) free(bid->buf); bid->buf = NULL; } - port->buffer_ids.size = 0; + mix->buffer_ids.size = 0; } static void @@ -992,13 +1023,13 @@ client_node_port_use_buffers(void *object, struct buffer_id *bid; uint32_t i, j, len; struct spa_buffer *b, **bufs; - struct port *port; + struct mix *mix; struct pw_core *core = proxy->remote->core; struct pw_type *t = &core->type; int res, prot; - port = find_port(data, direction, port_id); - if (port == NULL) { + mix = find_mix(data, direction, port_id, mix_id); + if (mix == NULL) { res = -EINVAL; goto done; } @@ -1006,7 +1037,7 @@ client_node_port_use_buffers(void *object, prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0); /* clear previous buffers */ - clear_buffers(data, port); + clear_buffers(data, mix); bufs = alloca(n_buffers * sizeof(struct spa_buffer *)); @@ -1020,8 +1051,8 @@ client_node_port_use_buffers(void *object, goto cleanup; } - len = pw_array_get_len(&port->buffer_ids, struct buffer_id); - bid = pw_array_add(&port->buffer_ids, sizeof(struct buffer_id)); + len = pw_array_get_len(&mix->buffer_ids, struct buffer_id); + bid = pw_array_add(&mix->buffer_ids, sizeof(struct buffer_id)); pw_map_range_init(&bid->map, buffers[i].offset, buffers[i].size, core->sc_pagesize); @@ -1114,14 +1145,14 @@ client_node_port_use_buffers(void *object, bufs[i] = b; } - res = pw_port_use_buffers(port->port, bufs, n_buffers); + res = pw_port_use_buffers(mix->port, bufs, n_buffers); done: pw_client_node_proxy_done(data->node_proxy, seq, res); return; cleanup: - clear_buffers(data, port); + clear_buffers(data, mix); goto done; } @@ -1134,13 +1165,13 @@ client_node_port_command(void *object, { struct pw_proxy *proxy = object; struct node_data *data = proxy->user_data; - struct port *port; + struct pw_port *port; - port = find_port(data, direction, port_id); + port = pw_node_find_port(data->node, direction, port_id); if (port == NULL) return; - pw_port_send_command(port->port, true, command); + pw_port_send_command(port, true, command); } static void @@ -1158,12 +1189,12 @@ client_node_port_set_io(void *object, struct node_data *data = proxy->user_data; struct pw_core *core = proxy->remote->core; struct pw_type *t = &core->type; - struct port *port; + struct mix *mix; struct mem_id *mid; void *ptr; - port = find_port(data, direction, port_id); - if (port == NULL) + mix = find_mix(data, direction, port_id, mix_id); + if (mix == NULL) return; if (memid == SPA_ID_INVALID) { @@ -1182,13 +1213,20 @@ client_node_port_set_io(void *object, } - pw_log_debug("port %p: set io %s %p", port, spa_type_map_get_type(core->type.map, id), ptr); + pw_log_debug("port %p: set io %s %p", mix->port, + spa_type_map_get_type(core->type.map, id), ptr); if (id == t->io.Buffers) { - port->input.io = ptr; - port->output.io = ptr; + mix->mix.port.io = ptr; + if (ptr) { + mix->mix.port.io->buffer_id = SPA_ID_INVALID; + if (direction == SPA_DIRECTION_INPUT) + mix->mix.port.io->status = SPA_STATUS_NEED_BUFFER; + else + mix->mix.port.io->status = SPA_STATUS_OK; + } } else { - spa_node_port_set_io(port->port->node->node, + spa_node_port_set_io(mix->port->node->node, direction, port_id, id, ptr, @@ -1254,6 +1292,7 @@ static void node_active_changed(void *data, bool active) pw_client_node_proxy_set_active(d->node_proxy, active); } + static const struct pw_node_events node_events = { PW_VERSION_NODE_EVENTS, .destroy = node_destroy, @@ -1262,10 +1301,13 @@ static const struct pw_node_events node_events = { .have_output = node_have_output, }; -static void clear_port(struct node_data *data, struct port *port) +static void clear_mix(struct node_data *data, struct mix *mix) { - clear_buffers(data, port); - pw_array_clear(&port->buffer_ids); + if (mix->port) { + clear_buffers(data, mix); + pw_array_clear(&mix->buffer_ids); + mix->port = NULL; + } } static void node_proxy_destroy(void *_data) @@ -1275,10 +1317,8 @@ static void node_proxy_destroy(void *_data) int i; if (data->trans) { - for (i = 0; i < data->trans->area->max_input_ports; i++) - clear_port(data, &data->in_ports[i]); - for (i = 0; i < data->trans->area->max_output_ports; i++) - clear_port(data, &data->out_ports[i]); + for (i = 0; i < MAX_MIX; i++) + clear_mix(data, &data->mix[i]); } clean_transport(proxy); @@ -1290,36 +1330,6 @@ static const struct pw_proxy_events proxy_events = { .destroy = node_proxy_destroy, }; -static int impl_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id) -{ - pw_log_trace("node %p: reuse buffer %d %d", node, port_id, buffer_id); - return 0; -} - -static int impl_process_input(struct spa_node *node) -{ - struct node_data *data = SPA_CONTAINER_OF(node, struct node_data, out_node_impl); - pw_log_trace("node %p: process input", node); - node_have_output(data); - return 0; -} - -static int impl_process_output(struct spa_node *node) -{ - struct node_data *data = SPA_CONTAINER_OF(node, struct node_data, in_node_impl); - pw_log_trace("node %p: process output", node); - node_need_input(data); - return 0; -} - -static const struct spa_node node_impl = { - SPA_VERSION_NODE, - NULL, - .process_input = impl_process_input, - .process_output = impl_process_output, - .port_reuse_buffer = impl_port_reuse_buffer, -}; - struct pw_proxy *pw_remote_export(struct pw_remote *remote, struct pw_node *node) { @@ -1342,17 +1352,10 @@ struct pw_proxy *pw_remote_export(struct pw_remote *remote, data->core = pw_node_get_core(node); data->t = pw_core_get_type(data->core); data->node_proxy = (struct pw_client_node_proxy *)proxy; - data->in_node_impl = node_impl; - data->out_node_impl = node_impl; pw_array_init(&data->mem_ids, 64); pw_array_ensure_size(&data->mem_ids, sizeof(struct mem_id) * 64); - spa_graph_node_init(&data->in_node); - spa_graph_node_set_implementation(&data->in_node, &data->in_node_impl); - spa_graph_node_init(&data->out_node); - spa_graph_node_set_implementation(&data->out_node, &data->out_node_impl); - pw_proxy_add_listener(proxy, &data->proxy_listener, &proxy_events, data); pw_node_add_listener(node, &data->node_listener, &node_events, data);