diff --git a/pipewire-jack/src/pipewire-jack.c b/pipewire-jack/src/pipewire-jack.c index 5069fa2c3..c9eae9072 100644 --- a/pipewire-jack/src/pipewire-jack.c +++ b/pipewire-jack/src/pipewire-jack.c @@ -99,14 +99,14 @@ struct globals { jack_thread_creator_t creator; pthread_mutex_t lock; struct pw_array descriptions; - struct spa_list free_objects[3]; - struct pw_map cache; + struct spa_list free_objects; }; static struct globals globals; static bool mlock_warned = false; -#define OBJECT_CHUNK 8 +#define OBJECT_CHUNK 8 +#define RECYCLE_THRESHOLD 128 typedef void (*mix2_func) (float *dst, float *src1, float *src2, int n_samples); @@ -122,7 +122,6 @@ struct object { #define INTERFACE_Link 2 uint32_t type; uint32_t id; - uint32_t idx; union { struct { @@ -137,8 +136,6 @@ struct object { bool src_ours; bool dst_ours; bool is_complete; - uint32_t src_idx; - uint32_t dst_idx; struct port *our_input; struct port *our_output; } port_link; @@ -164,6 +161,7 @@ struct object { struct spa_hook proxy_listener; struct spa_hook object_listener; unsigned int removing:1; + unsigned int removed:1; }; struct midi_buffer { @@ -264,10 +262,8 @@ struct context { struct pw_context *context; pthread_mutex_t lock; /* protects map and lists below, in addition to thread_lock */ - struct pw_map globals; - struct spa_list ports; - struct spa_list nodes; - struct spa_list links; + struct spa_list objects; + uint32_t free_count; }; #define GET_DIRECTION(f) ((f) & JackPortIsInput ? SPA_DIRECTION_INPUT : SPA_DIRECTION_OUTPUT) @@ -420,57 +416,64 @@ static void init_port_pool(struct client *c, enum spa_direction direction) c->n_port_pool[direction] = 0; } -static struct object * find_cache(uint32_t type, uint32_t idx) -{ - struct object *o; - pthread_mutex_lock(&globals.lock); - o = pw_map_lookup(&globals.cache, idx); - if (o != NULL && o->type != type) - o = NULL; - pthread_mutex_unlock(&globals.lock); - return o; -} - static struct object * alloc_object(struct client *c, int type) { struct object *o; int i; pthread_mutex_lock(&globals.lock); - if (spa_list_is_empty(&globals.free_objects[type])) { + if (spa_list_is_empty(&globals.free_objects)) { o = calloc(OBJECT_CHUNK, sizeof(struct object)); if (o == NULL) { pthread_mutex_unlock(&globals.lock); return NULL; } - for (i = 0; i < OBJECT_CHUNK; i++) { - o[i].idx = pw_map_insert_new(&globals.cache, &o[i]); - spa_list_append(&globals.free_objects[type], &o[i].link); - } + for (i = 0; i < OBJECT_CHUNK; i++) + spa_list_append(&globals.free_objects, &o[i].link); } - o = spa_list_first(&globals.free_objects[type], struct object, link); + o = spa_list_first(&globals.free_objects, struct object, link); spa_list_remove(&o->link); pthread_mutex_unlock(&globals.lock); o->client = c; + o->removed = false; o->type = type; pw_log_debug("%p: object:%p type:%d", c, o, type); return o; } +static void recycle_objects(struct client *c, uint32_t remain) +{ + struct object *o, *t; + pthread_mutex_lock(&globals.lock); + spa_list_for_each_safe(o, t, &c->context.objects, link) { + if (o->removed) { + pw_log_info("%p: recycle object:%p type:%d id:%u", c, o, o->type, o->id); + spa_list_remove(&o->link); + memset(o, 0, sizeof(struct object)); + spa_list_append(&globals.free_objects, &o->link); + if (--c->context.free_count == remain) + break; + } + } + pthread_mutex_unlock(&globals.lock); +} + +/* JACK clients expect the objects to hang around after + * they are unregistered and freed. We mark the object removed and + * move it to the end of the queue. */ static void free_object(struct client *c, struct object *o) { + pw_log_debug("%p: object:%p type:%d", c, o, o->type); pthread_mutex_lock(&c->context.lock); spa_list_remove(&o->link); + o->removed = true; + spa_list_append(&c->context.objects, &o->link); + if (++c->context.free_count > RECYCLE_THRESHOLD) + recycle_objects(c, RECYCLE_THRESHOLD / 2); pthread_mutex_unlock(&c->context.lock); - pw_log_debug("%p: object:%p type:%d", c, o, o->type); - - pthread_mutex_lock(&globals.lock); - spa_list_append(&globals.free_objects[o->type], &o->link); - o->client = NULL; - pthread_mutex_unlock(&globals.lock); } static void init_mix(struct mix *mix, uint32_t mix_id, struct port *port) @@ -603,7 +606,7 @@ static struct port * alloc_port(struct client *c, enum spa_direction direction) spa_list_append(&c->ports[direction], &p->link); pthread_mutex_lock(&c->context.lock); - spa_list_append(&c->context.ports, &o->link); + spa_list_append(&c->context.objects, &o->link); pthread_mutex_unlock(&c->context.lock); return p; @@ -630,8 +633,10 @@ static struct object *find_node(struct client *c, const char *name) { struct object *o; - spa_list_for_each(o, &c->context.nodes, link) { - if (!o->removing && spa_streq(o->node.name, name)) + spa_list_for_each(o, &c->context.objects, link) { + if (o->removing || o->removed || o->type != INTERFACE_Node) + continue; + if (spa_streq(o->node.name, name)) return o; } return NULL; @@ -656,7 +661,9 @@ static struct object *find_port(struct client *c, const char *name) { struct object *o; - spa_list_for_each(o, &c->context.ports, link) { + spa_list_for_each(o, &c->context.objects, link) { + if (o->type != INTERFACE_Port || o->removed) + continue; if (spa_streq(o->port.name, name) || spa_streq(o->port.alias1, name) || spa_streq(o->port.alias2, name)) @@ -667,9 +674,19 @@ static struct object *find_port(struct client *c, const char *name) return NULL; } +static struct object *find_object(struct client *c, uint32_t id) +{ + struct object *o; + spa_list_for_each(o, &c->context.objects, link) { + if (o->id == id) + return o; + } + return NULL; +} + static struct object *find_id(struct client *c, uint32_t id, bool valid) { - struct object *o = pw_map_lookup(&c->context.globals, id); + struct object *o = find_object(c, id); if (o != NULL && (!valid || o->client == c)) return o; return NULL; @@ -687,7 +704,9 @@ static struct object *find_link(struct client *c, uint32_t src, uint32_t dst) { struct object *l; - spa_list_for_each(l, &c->context.links, link) { + spa_list_for_each(l, &c->context.objects, link) { + if (l->type != INTERFACE_Link || l->removed) + continue; if (l->port_link.src == src && l->port_link.dst == dst) { return l; @@ -2408,11 +2427,10 @@ static int client_node_port_set_mix_info(void *object, if (!l->port_link.is_complete) { l->port_link.is_complete = true; - pw_log_info("%p: our link %d/%u -> %d/%u completed", c, - l->port_link.src, l->port_link.src_idx, - l->port_link.dst, l->port_link.dst_idx); + pw_log_info("%p: our link %d -> %d completed", c, + l->port_link.src, l->port_link.dst); do_callback(c, connect_callback, - l->port_link.src_idx, l->port_link.dst_idx, 1, c->connect_arg); + l->port_link.src, l->port_link.dst, 1, c->connect_arg); recompute_latencies(c); do_callback(c, graph_callback, c->graph_arg); } @@ -2622,7 +2640,6 @@ static void registry_event_global(void *data, uint32_t id, struct client *c = (struct client *) data; struct object *o, *ot, *op; const char *str; - size_t size; bool is_first = false, graph_changed = false; if (props == NULL) @@ -2686,7 +2703,7 @@ static void registry_event_global(void *data, uint32_t id, pw_log_debug("%p: add node %d", c, id); pthread_mutex_lock(&c->context.lock); - spa_list_append(&c->context.nodes, &o->link); + spa_list_append(&c->context.objects, &o->link); pthread_mutex_unlock(&c->context.lock); } else if (spa_streq(type, PW_TYPE_INTERFACE_Port)) { @@ -2745,30 +2762,16 @@ static void registry_event_global(void *data, uint32_t id, snprintf(tmp, sizeof(tmp), "%s:%s", c->name, str); o = find_port(c, tmp); if (o != NULL) - pw_log_debug("%p: %s found our port %p", c, tmp, o); + pw_log_info("%p: %s found our port %p", c, tmp, o); } if (o == NULL) { + if ((ot = find_type(c, node_id, INTERFACE_Node, true)) == NULL) + goto exit; + o = alloc_object(c, INTERFACE_Port); if (o == NULL) goto exit; - pthread_mutex_lock(&c->context.lock); - spa_list_append(&c->context.ports, &o->link); - pthread_mutex_unlock(&c->context.lock); - - if ((ot = find_type(c, node_id, INTERFACE_Node, true)) == NULL) - goto exit_free; - - if (is_monitor && !c->merge_monitor) - snprintf(tmp, sizeof(tmp), "%.*s%s:%s", - (int)(JACK_CLIENT_NAME_SIZE-(sizeof(MONITOR_EXT)-1)), - ot->node.name, MONITOR_EXT, str); - else - snprintf(tmp, sizeof(tmp), "%s:%s", ot->node.name, str); - - if (c->filter_name) - filter_name(tmp, FILTER_PORT); - o->port.system_id = 0; o->port.port_id = SPA_ID_INVALID; o->port.priority = ot->node.priority; @@ -2789,6 +2792,26 @@ static void registry_event_global(void *data, uint32_t id, pw_port_subscribe_params((struct pw_port*)o->proxy, ids, 1); } + pthread_mutex_lock(&c->context.lock); + spa_list_append(&c->context.objects, &o->link); + pthread_mutex_unlock(&c->context.lock); + + if (is_monitor && !c->merge_monitor) + snprintf(tmp, sizeof(tmp), "%.*s%s:%s", + (int)(JACK_CLIENT_NAME_SIZE-(sizeof(MONITOR_EXT)-1)), + ot->node.name, MONITOR_EXT, str); + else + snprintf(tmp, sizeof(tmp), "%s:%s", ot->node.name, str); + + if (c->filter_name) + filter_name(tmp, FILTER_PORT); + + op = find_port(c, tmp); + if (op != NULL) + snprintf(o->port.name, sizeof(o->port.name), "%.*s-%d", + (int)(sizeof(tmp)-11), tmp, id); + else + snprintf(o->port.name, sizeof(o->port.name), "%s", tmp); } if ((str = spa_dict_lookup(props, PW_KEY_OBJECT_PATH)) != NULL) @@ -2810,14 +2833,7 @@ static void registry_event_global(void *data, uint32_t id, o->port.node_id = node_id; o->port.is_monitor = is_monitor; - op = find_port(c, tmp); - if (op != NULL && op != o) - snprintf(o->port.name, sizeof(o->port.name), "%.*s-%d", - (int)(sizeof(tmp)-11), tmp, id); - else - snprintf(o->port.name, sizeof(o->port.name), "%s", tmp); - - pw_log_debug("%p: add port %d name:%s %d", c, id, + pw_log_debug("%p: %p add port %d name:%s %d", c, o, id, o->port.name, type_id); } else if (spa_streq(type, PW_TYPE_INTERFACE_Link)) { @@ -2826,7 +2842,7 @@ static void registry_event_global(void *data, uint32_t id, o = alloc_object(c, INTERFACE_Link); pthread_mutex_lock(&c->context.lock); - spa_list_append(&c->context.links, &o->link); + spa_list_append(&c->context.objects, &o->link); pthread_mutex_unlock(&c->context.lock); if ((str = spa_dict_lookup(props, PW_KEY_LINK_OUTPUT_PORT)) == NULL) @@ -2835,7 +2851,6 @@ static void registry_event_global(void *data, uint32_t id, if ((p = find_type(c, o->port_link.src, INTERFACE_Port, true)) == NULL) goto exit_free; - o->port_link.src_idx = p->idx; o->port_link.src_ours = p->port.port != NULL && p->port.port->client == c; @@ -2849,7 +2864,6 @@ static void registry_event_global(void *data, uint32_t id, if ((p = find_type(c, o->port_link.dst, INTERFACE_Port, true)) == NULL) goto exit_free; - o->port_link.dst_idx = p->idx; o->port_link.dst_ours = p->port.port != NULL && p->port.port->client == c; if (o->port_link.dst_ours) @@ -2887,13 +2901,6 @@ static void registry_event_global(void *data, uint32_t id, o->id = id; - pthread_mutex_lock(&c->context.lock); - size = pw_map_get_size(&c->context.globals); - while (id > size) - pw_map_insert_at(&c->context.globals, size++, NULL); - pw_map_insert_at(&c->context.globals, id, o); - pthread_mutex_unlock(&c->context.lock); - switch (o->type) { case INTERFACE_Node: if (is_first) { @@ -2905,20 +2912,19 @@ static void registry_event_global(void *data, uint32_t id, break; case INTERFACE_Port: - pw_log_info("%p: port added %d/%d \"%s\"", c, o->id, o->idx, o->port.name); + pw_log_info("%p: port added %d \"%s\"", c, o->id, o->port.name); do_callback(c, portregistration_callback, - o->idx, 1, c->portregistration_arg); + o->id, 1, c->portregistration_arg); graph_changed = true; break; case INTERFACE_Link: - pw_log_info("%p: link %u/%u %d/%d -> %d/%d added complete:%d", c, o->id, o->idx, - o->port_link.src, o->port_link.src_idx, - o->port_link.dst, o->port_link.dst_idx, + pw_log_info("%p: link %u %d -> %d added complete:%d", c, + o->id, o->port_link.src, o->port_link.dst, o->port_link.is_complete); if (o->port_link.is_complete) { do_callback(c, connect_callback, - o->port_link.src_idx, o->port_link.dst_idx, 1, c->connect_arg); + o->port_link.src, o->port_link.dst, 1, c->connect_arg); graph_changed = true; } break; @@ -2968,21 +2974,20 @@ static void registry_event_global_remove(void *object, uint32_t id) } break; case INTERFACE_Port: - pw_log_info("%p: port %u/%u removed \"%s\"", c, o->id, o->idx, o->port.name); + pw_log_info("%p: port %u removed \"%s\"", c, o->id, o->port.name); do_callback(c, portregistration_callback, - o->idx, 0, c->portregistration_arg); + o->id, 0, c->portregistration_arg); graph_changed = true; break; case INTERFACE_Link: if (o->port_link.is_complete && find_type(c, o->port_link.src, INTERFACE_Port, true) != NULL && find_type(c, o->port_link.dst, INTERFACE_Port, true) != NULL) { - pw_log_info("%p: link %u/%u %d/%u -> %d/%u removed", c, o->id, o->idx, - o->port_link.src, o->port_link.src_idx, - o->port_link.dst, o->port_link.dst_idx); + pw_log_info("%p: link %u %d -> %d removed", c, o->id, + o->port_link.src, o->port_link.dst); o->port_link.is_complete = false; do_callback(c, connect_callback, - o->port_link.src_idx, o->port_link.dst_idx, 0, c->connect_arg); + o->port_link.src, o->port_link.dst, 0, c->connect_arg); graph_changed = true; } else pw_log_warn("unlink between unknown ports %d and %d", @@ -2994,16 +2999,8 @@ static void registry_event_global_remove(void *object, uint32_t id) do_callback(c, graph_callback, c->graph_arg); } - /* JACK clients expect the objects to hang around after - * they are unregistered. We keep the memory around for that - * reason but reuse it when we can. */ o->removing = false; free_object(c, o); - /* we keep the object available with the id because jack clients - * tend to access the objects with it later. - * - * pw_map_insert_at(&c->context.globals, id, NULL); - */ return; } @@ -3136,9 +3133,7 @@ jack_client_t * jack_client_open (const char *client_name, pthread_mutex_init(&client->context.lock, NULL); pthread_mutex_init(&client->rt_lock, NULL); - spa_list_init(&client->context.nodes); - spa_list_init(&client->context.ports); - spa_list_init(&client->context.links); + spa_list_init(&client->context.objects); support = pw_context_get_support(client->context.context, &n_support); @@ -3166,8 +3161,6 @@ jack_client_t * jack_client_open (const char *client_name, init_port_pool(client, SPA_DIRECTION_INPUT); init_port_pool(client, SPA_DIRECTION_OUTPUT); - pw_map_init(&client->context.globals, 64, 64); - pw_thread_loop_start(client->context.loop); pw_thread_loop_lock(client->context.loop); @@ -3325,14 +3318,10 @@ int jack_client_close (jack_client_t *client) pw_log_debug("%p: free", client); - spa_list_consume(o, &c->context.nodes, link) - free_object(c, o); - spa_list_consume(o, &c->context.ports, link) - free_object(c, o); - spa_list_consume(o, &c->context.links, link) + spa_list_consume(o, &c->context.objects, link) free_object(c, o); + recycle_objects(c, 0); - pw_map_clear(&c->context.globals); pthread_mutex_destroy(&c->context.lock); pthread_mutex_destroy(&c->rt_lock); pw_properties_free(c->props); @@ -3413,7 +3402,9 @@ char *jack_get_uuid_for_client_name (jack_client_t *client, pthread_mutex_lock(&c->context.lock); - spa_list_for_each(o, &c->context.nodes, link) { + spa_list_for_each(o, &c->context.objects, link) { + if (o->type != INTERFACE_Node) + continue; if (spa_streq(o->node.name, client_name) || (monitor && spa_strneq(o->node.name, client_name, strlen(client_name) - strlen(MONITOR_EXT)))) { @@ -3445,7 +3436,9 @@ char *jack_get_client_name_by_uuid (jack_client_t *client, monitor = uuid & (1 << 30); pthread_mutex_lock(&c->context.lock); - spa_list_for_each(o, &c->context.nodes, link) { + spa_list_for_each(o, &c->context.objects, link) { + if (o->type != INTERFACE_Node) + continue; if (client_make_uuid(o->id, monitor) == uuid) { pw_log_debug("%p: uuid %s (%"PRIu64")-> %s", client, client_uuid, uuid, o->node.name); @@ -3534,10 +3527,11 @@ int jack_deactivate (jack_client_t *client) c->activation->pending_new_pos = false; c->activation->pending_sync = false; - spa_list_for_each(l, &c->context.links, link) { - if (l->port_link.src_ours || l->port_link.dst_ours) { + spa_list_for_each(l, &c->context.objects, link) { + if (l->type != INTERFACE_Link || l->removed) + continue; + if (l->port_link.src_ours || l->port_link.dst_ours) pw_registry_destroy(c->registry, l->id); - } } res = do_sync(c); @@ -4044,8 +4038,8 @@ jack_port_t * jack_port_register (jack_client_t *client, spa_return_val_if_fail(port_name != NULL, NULL); spa_return_val_if_fail(port_type != NULL, NULL); - pw_log_info("%p: port register \"%s\" \"%s\" %08lx %ld", - c, port_name, port_type, flags, buffer_frames); + pw_log_info("%p: port register \"%s:%s\" \"%s\" %08lx %ld", + c, c->name, port_name, port_type, flags, buffer_frames); if (flags & JackPortIsInput) direction = PW_DIRECTION_INPUT; @@ -4422,7 +4416,9 @@ int jack_port_connected (const jack_port_t *port) c = o->client; pthread_mutex_lock(&c->context.lock); - spa_list_for_each(l, &c->context.links, link) { + spa_list_for_each(l, &c->context.objects, link) { + if (l->type != INTERFACE_Link || l->removed) + continue; if (!l->port_link.is_complete) continue; if (l->port_link.src == o->id || @@ -4505,7 +4501,9 @@ const char ** jack_port_get_all_connections (const jack_client_t *client, res = malloc(sizeof(char*) * (CONNECTION_NUM_FOR_PORT + 1)); pthread_mutex_lock(&c->context.lock); - spa_list_for_each(l, &c->context.links, link) { + spa_list_for_each(l, &c->context.objects, link) { + if (l->type != INTERFACE_Link || l->removed) + continue; if (l->port_link.src == o->id) p = find_type(c, l->port_link.dst, INTERFACE_Port, true); else if (l->port_link.dst == o->id) @@ -4970,7 +4968,9 @@ int jack_port_disconnect (jack_client_t *client, jack_port_t *port) pw_thread_loop_lock(c->context.loop); - spa_list_for_each(l, &c->context.links, link) { + spa_list_for_each(l, &c->context.objects, link) { + if (l->type != INTERFACE_Link || l->removed) + continue; if (l->port_link.src == o->id || l->port_link.dst == o->id) { pw_registry_destroy(c->registry, l->id); @@ -5284,7 +5284,9 @@ const char ** jack_get_ports (jack_client_t *client, pthread_mutex_lock(&c->context.lock); count = 0; - spa_list_for_each(o, &c->context.ports, link) { + spa_list_for_each(o, &c->context.objects, link) { + if (o->type != INTERFACE_Port || o->removed) + continue; pw_log_debug("%p: check port type:%d flags:%08lx name:\"%s\"", c, o->port.type_id, o->port.flags, o->port.name); if (count == JACK_PORT_MAX) @@ -5362,9 +5364,10 @@ jack_port_t * jack_port_by_id (jack_client_t *client, spa_return_val_if_fail(c != NULL, NULL); - res = find_cache(INTERFACE_Port, port_id); - + pthread_mutex_lock(&c->context.lock); + res = find_type(c, port_id, INTERFACE_Port, false); pw_log_debug("%p: port %d -> %p", c, port_id, res); + pthread_mutex_unlock(&c->context.lock); if (res == NULL) pw_log_info("%p: port %d not found", c, port_id); @@ -6169,8 +6172,5 @@ static void reg(void) PW_LOG_TOPIC_INIT(jack_log_topic); pthread_mutex_init(&globals.lock, NULL); pw_array_init(&globals.descriptions, 16); - spa_list_init(&globals.free_objects[INTERFACE_Port]); - spa_list_init(&globals.free_objects[INTERFACE_Node]); - spa_list_init(&globals.free_objects[INTERFACE_Link]); - pw_map_init(&globals.cache, 64, 64); + spa_list_init(&globals.free_objects); }