client-node: remove limit on ports

Use a pw_map to keep track of the ports by index so that we don't
have an upper limit on the node ports anymore.
This commit is contained in:
Wim Taymans 2022-02-09 15:01:41 +01:00
parent d0d924c33c
commit 9bebad6ed3

View file

@ -47,29 +47,15 @@ PW_LOG_TOPIC_EXTERN(mod_topic);
/** \cond */ /** \cond */
#define MAX_INPUTS 1024
#define MAX_OUTPUTS 1024
#define MAX_BUFFERS 64 #define MAX_BUFFERS 64
#define MAX_METAS 16u #define MAX_METAS 16u
#define MAX_DATAS 64u #define MAX_DATAS 64u
#define MAX_AREAS 2048 #define MAX_AREAS 2048
#define MAX_MIX 128 #define MAX_MIX 128
#define CHECK_IN_PORT_ID(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) < MAX_INPUTS) #define CHECK_FREE_PORT(this,d,p) (p <= pw_map_get_size(&this->ports[d]) && !CHECK_PORT(this,d,p))
#define CHECK_OUT_PORT_ID(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) < MAX_OUTPUTS) #define CHECK_PORT(this,d,p) (pw_map_lookup(&this->ports[d], p) != NULL)
#define CHECK_PORT_ID(this,d,p) (CHECK_IN_PORT_ID(this,d,p) || CHECK_OUT_PORT_ID(this,d,p)) #define GET_PORT(this,d,p) (pw_map_lookup(&this->ports[d], p))
#define CHECK_FREE_IN_PORT(this,d,p) (CHECK_IN_PORT_ID(this,d,p) && (this)->ports[d][p] == NULL)
#define CHECK_FREE_OUT_PORT(this,d,p) (CHECK_OUT_PORT_ID(this,d,p) && (this)->ports[d][p] == NULL)
#define CHECK_FREE_PORT(this,d,p) (CHECK_FREE_IN_PORT (this,d,p) || CHECK_FREE_OUT_PORT (this,d,p))
#define CHECK_IN_PORT(this,d,p) (CHECK_IN_PORT_ID(this,d,p) && (this)->ports[d][p])
#define CHECK_OUT_PORT(this,d,p) (CHECK_OUT_PORT_ID(this,d,p) && (this)->ports[d][p])
#define CHECK_PORT(this,d,p) (CHECK_IN_PORT (this,d,p) || CHECK_OUT_PORT (this,d,p))
#define GET_IN_PORT(this,p) (this->ports[SPA_DIRECTION_INPUT][p])
#define GET_OUT_PORT(this,p) (this->ports[SPA_DIRECTION_OUTPUT][p])
#define GET_PORT(this,d,p) (this->ports[d][p])
#define CHECK_PORT_BUFFER(this,b,p) (b < p->n_buffers) #define CHECK_PORT_BUFFER(this,b,p) (b < p->n_buffers)
@ -130,8 +116,7 @@ struct node {
struct spa_source data_source; struct spa_source data_source;
int writefd; int writefd;
uint32_t n_ports[2]; struct pw_map ports[2];
struct port *ports[2][MAX_INPUTS];
struct port dummy; struct port dummy;
@ -431,19 +416,19 @@ static int impl_node_add_listener(void *object,
{ {
struct node *this = object; struct node *this = object;
struct spa_hook_list save; struct spa_hook_list save;
uint32_t i; union pw_map_item *item;
spa_return_val_if_fail(this != NULL, -EINVAL); spa_return_val_if_fail(this != NULL, -EINVAL);
spa_hook_list_isolate(&this->hooks, &save, listener, events, data); spa_hook_list_isolate(&this->hooks, &save, listener, events, data);
for (i = 0; i < MAX_INPUTS; i++) { pw_array_for_each(item, &this->ports[SPA_DIRECTION_INPUT].items) {
if (this->ports[SPA_DIRECTION_INPUT][i]) if (item->data)
emit_port_info(this, this->ports[SPA_DIRECTION_INPUT][i]); emit_port_info(this, item->data);
} }
for (i = 0; i < MAX_OUTPUTS; i++) { pw_array_for_each(item, &this->ports[SPA_DIRECTION_OUTPUT].items) {
if (this->ports[SPA_DIRECTION_OUTPUT][i]) if (item->data)
emit_port_info(this, this->ports[SPA_DIRECTION_OUTPUT][i]); emit_port_info(this, item->data);
} }
spa_hook_list_join(&this->hooks, &save); spa_hook_list_join(&this->hooks, &save);
@ -536,10 +521,8 @@ clear_port(struct node *this, struct port *port)
pw_array_clear(&port->mix); pw_array_clear(&port->mix);
pw_array_init(&port->mix, sizeof(struct mix) * 2); pw_array_init(&port->mix, sizeof(struct mix) * 2);
if (this->ports[port->direction][port->id] == port) { pw_map_insert_at(&this->ports[port->direction], port->id, NULL);
this->ports[port->direction][port->id] = NULL;
this->n_ports[port->direction]--;
}
if (!port->removed) if (!port->removed)
spa_node_emit_port_info(&this->hooks, port->direction, port->id, NULL); spa_node_emit_port_info(&this->hooks, port->direction, port->id, NULL);
} }
@ -589,9 +572,9 @@ impl_node_port_enum_params(void *object, int seq,
spa_return_val_if_fail(this != NULL, -EINVAL); spa_return_val_if_fail(this != NULL, -EINVAL);
spa_return_val_if_fail(num != 0, -EINVAL); spa_return_val_if_fail(num != 0, -EINVAL);
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
port = GET_PORT(this, direction, port_id); port = GET_PORT(this, direction, port_id);
spa_return_val_if_fail(port != NULL, -EINVAL);
pw_log_debug("%p: seq:%d port %d.%d id:%u start:%u num:%u n_params:%d", pw_log_debug("%p: seq:%d port %d.%d id:%u start:%u num:%u n_params:%d",
this, seq, direction, port_id, id, start, num, port->n_params); this, seq, direction, port_id, id, start, num, port->n_params);
@ -640,15 +623,15 @@ impl_node_port_set_param(void *object,
struct mix *mix; struct mix *mix;
spa_return_val_if_fail(this != NULL, -EINVAL); spa_return_val_if_fail(this != NULL, -EINVAL);
if(!CHECK_PORT(this, direction, port_id))
port = GET_PORT(this, direction, port_id);
if(port == NULL)
return param == NULL ? 0 : -EINVAL; return param == NULL ? 0 : -EINVAL;
pw_log_debug("%p: port %d.%d set param %s %d", this, pw_log_debug("%p: port %d.%d set param %s %d", this,
direction, port_id, direction, port_id,
spa_debug_type_find_name(spa_type_param, id), id); spa_debug_type_find_name(spa_type_param, id), id);
port = GET_PORT(this, direction, port_id);
if (id == SPA_PARAM_Format) { if (id == SPA_PARAM_Format) {
pw_array_for_each(mix, &port->mix) pw_array_for_each(mix, &port->mix)
clear_buffers(this, mix); clear_buffers(this, mix);
@ -678,10 +661,9 @@ static int do_port_set_io(struct impl *impl,
direction == SPA_DIRECTION_INPUT ? "input" : "output", direction == SPA_DIRECTION_INPUT ? "input" : "output",
port_id, mix_id, data, size); port_id, mix_id, data, size);
if (!CHECK_PORT(this, direction, port_id))
return data == NULL ? 0 : -EINVAL;
port = GET_PORT(this, direction, port_id); port = GET_PORT(this, direction, port_id);
if (port == NULL)
return data == NULL ? 0 : -EINVAL;
if ((mix = find_mix(port, mix_id)) == NULL || !mix->valid) if ((mix = find_mix(port, mix_id)) == NULL || !mix->valid)
return -EINVAL; return -EINVAL;
@ -743,13 +725,13 @@ do_port_use_buffers(struct impl *impl,
uint32_t i, j, peer_id; uint32_t i, j, peer_id;
struct pw_client_node_buffer *mb; struct pw_client_node_buffer *mb;
if (!CHECK_PORT(this, direction, port_id)) p = GET_PORT(this, direction, port_id);
if (p == NULL)
return n_buffers == 0 ? 0 : -EINVAL; return n_buffers == 0 ? 0 : -EINVAL;
if (n_buffers > MAX_BUFFERS) if (n_buffers > MAX_BUFFERS)
return -ENOSPC; return -ENOSPC;
p = GET_PORT(this, direction, port_id);
spa_log_debug(this->log, "%p: %s port %d.%d use buffers %p %u flags:%08x", this, spa_log_debug(this->log, "%p: %s port %d.%d use buffers %p %u flags:%08x", this,
direction == SPA_DIRECTION_INPUT ? "input" : "output", direction == SPA_DIRECTION_INPUT ? "input" : "output",
@ -911,7 +893,7 @@ impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id)
struct node *this = object; struct node *this = object;
spa_return_val_if_fail(this != NULL, -EINVAL); spa_return_val_if_fail(this != NULL, -EINVAL);
spa_return_val_if_fail(CHECK_OUT_PORT(this, SPA_DIRECTION_OUTPUT, port_id), -EINVAL); spa_return_val_if_fail(CHECK_PORT(this, SPA_DIRECTION_OUTPUT, port_id), -EINVAL);
spa_log_trace_fp(this->log, "reuse buffer %d", buffer_id); spa_log_trace_fp(this->log, "reuse buffer %d", buffer_id);
@ -1002,8 +984,6 @@ client_node_port_update(void *data,
spa_log_debug(this->log, "%p: got port update change:%08x params:%d", spa_log_debug(this->log, "%p: got port update change:%08x params:%d",
this, change_mask, n_params); this, change_mask, n_params);
if (!CHECK_PORT_ID(this, direction, port_id))
return -EINVAL;
remove = (change_mask == 0); remove = (change_mask == 0);
@ -1018,6 +998,9 @@ client_node_port_update(void *data,
struct port *target; struct port *target;
if (port == NULL) { if (port == NULL) {
if (!CHECK_FREE_PORT(this, direction, port_id))
return -EINVAL;
target = &this->dummy; target = &this->dummy;
spa_zero(this->dummy); spa_zero(this->dummy);
target->direction = direction; target->direction = direction;
@ -1067,9 +1050,8 @@ static int client_node_port_buffers(void *data,
direction == SPA_DIRECTION_INPUT ? "input" : "output", direction == SPA_DIRECTION_INPUT ? "input" : "output",
port_id, mix_id, buffers, n_buffers); port_id, mix_id, buffers, n_buffers);
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
p = GET_PORT(this, direction, port_id); p = GET_PORT(this, direction, port_id);
spa_return_val_if_fail(p != NULL, -EINVAL);
if (direction == SPA_DIRECTION_OUTPUT) if (direction == SPA_DIRECTION_OUTPUT)
mix_id = SPA_ID_INVALID; mix_id = SPA_ID_INVALID;
@ -1365,6 +1347,8 @@ static void node_free(void *data)
if (impl->io_areas) if (impl->io_areas)
pw_memblock_unref(impl->io_areas); pw_memblock_unref(impl->io_areas);
pw_map_clear(&impl->node.ports[0]);
pw_map_clear(&impl->node.ports[1]);
pw_map_clear(&impl->io_map); pw_map_clear(&impl->io_map);
if (impl->fds[0] != -1) if (impl->fds[0] != -1)
@ -1556,8 +1540,7 @@ static void node_port_init(void *data, struct pw_impl_port *port)
&impl_port_mix, p); &impl_port_mix, p);
ensure_mix(impl, p, SPA_ID_INVALID); ensure_mix(impl, p, SPA_ID_INVALID);
this->ports[p->direction][p->id] = p; pw_map_insert_at(&this->ports[p->direction], p->id, p);
this->n_ports[p->direction]++;
return; return;
} }
@ -1727,6 +1710,8 @@ struct pw_impl_client_node *pw_impl_client_node_new(struct pw_resource *resource
impl->node.client = client; impl->node.client = client;
this->flags = do_register ? 0 : 1; this->flags = do_register ? 0 : 1;
pw_map_init(&impl->node.ports[0], 64, 64);
pw_map_init(&impl->node.ports[1], 64, 64);
pw_map_init(&impl->io_map, 64, 64); pw_map_init(&impl->io_map, 64, 64);
this->resource = resource; this->resource = resource;