jack: don't emit callbacks from do_wait()

Some functions need to wait for the reply of the server before they can
complete but the JACK API does not allow us to emit notifications while
blocking a function.

Delay emiting notifications when we are in selected methods and send a
notify to an eventfd to call the queued notifications.

Fixes #3183
This commit is contained in:
Wim Taymans 2023-05-02 14:41:46 +02:00
parent bcec0ad103
commit be59d2b5d0

View file

@ -145,6 +145,8 @@ struct object {
struct spa_hook object_listener; struct spa_hook object_listener;
unsigned int removing:1; unsigned int removing:1;
unsigned int removed:1; unsigned int removed:1;
unsigned int register_pending:1;
int register_arg;
}; };
struct midi_buffer { struct midi_buffer {
@ -299,6 +301,7 @@ struct client {
uint32_t node_id; uint32_t node_id;
uint32_t serial; uint32_t serial;
struct spa_source *socket_source; struct spa_source *socket_source;
struct spa_source *notify_event;
JackThreadCallback thread_callback; JackThreadCallback thread_callback;
void *thread_arg; void *thread_arg;
@ -389,6 +392,8 @@ struct client {
unsigned int fix_midi_events:1; unsigned int fix_midi_events:1;
unsigned int global_buffer_size:1; unsigned int global_buffer_size:1;
unsigned int passive_links:1; unsigned int passive_links:1;
unsigned int graph_callback_pending:1;
unsigned int in_function:1;
char filter_char; char filter_char;
uint32_t max_ports; uint32_t max_ports;
@ -801,7 +806,7 @@ void jack_get_version(int *major_ptr, int *minor_ptr, int *micro_ptr, int *proto
#define do_callback_expr(c,expr,callback,...) \ #define do_callback_expr(c,expr,callback,...) \
({ \ ({ \
if (c->callback && c->active) { \ if (c->callback) { \
pw_thread_loop_unlock(c->context.loop); \ pw_thread_loop_unlock(c->context.loop); \
if (c->locked_process) \ if (c->locked_process) \
pthread_mutex_lock(&c->rt_lock); \ pthread_mutex_lock(&c->rt_lock); \
@ -812,8 +817,7 @@ void jack_get_version(int *major_ptr, int *minor_ptr, int *micro_ptr, int *proto
pthread_mutex_unlock(&c->rt_lock); \ pthread_mutex_unlock(&c->rt_lock); \
pw_thread_loop_lock(c->context.loop); \ pw_thread_loop_lock(c->context.loop); \
} else { \ } else { \
if (c->active) \ (expr); \
(expr); \
pw_log_debug("skip " #callback \ pw_log_debug("skip " #callback \
" cb:%p active:%d", c->callback, \ " cb:%p active:%d", c->callback, \
c->active); \ c->active); \
@ -848,6 +852,68 @@ jack_get_version_string(void)
return name; return name;
} }
static void recompute_latencies(struct client *c)
{
do_callback(c, latency_callback, JackCaptureLatency, c->latency_arg);
do_callback(c, latency_callback, JackPlaybackLatency, c->latency_arg);
}
static void emit_callbacks(struct client *c)
{
struct object *o, *t;
spa_list_for_each_safe(o, t, &c->context.objects, link) {
if (o->register_pending) {
switch (o->type) {
case INTERFACE_Node:
do_callback(c, registration_callback,
o->node.name,
o->register_arg,
c->registration_arg);
break;
case INTERFACE_Port:
do_callback(c, portregistration_callback,
o->serial,
o->register_arg,
c->portregistration_arg);
break;
case INTERFACE_Link:
do_callback(c, connect_callback,
o->port_link.src_serial,
o->port_link.dst_serial,
o->register_arg,
c->connect_arg);
break;
}
o->register_pending = false;
}
if (o->removing) {
o->removing = false;
free_object(c, o);
}
}
if (c->graph_callback_pending) {
c->graph_callback_pending = false;
recompute_latencies(c);
do_callback(c, graph_callback, c->graph_arg);
}
}
static void trigger_callbacks(struct client *c)
{
if (c->in_function) {
pw_loop_signal_event(c->context.l, c->notify_event);
} else {
emit_callbacks(c);
}
}
static void notify_event(void *data, uint64_t count)
{
struct client *c = data;
emit_callbacks(c);
}
static void on_sync_reply(void *data, uint32_t id, int seq) static void on_sync_reply(void *data, uint32_t id, int seq)
{ {
struct client *client = data; struct client *client = data;
@ -1335,12 +1401,6 @@ static inline jack_transport_state_t position_to_jack(struct pw_node_activation
return state; return state;
} }
static void recompute_latencies(struct client *c)
{
do_callback(c, latency_callback, JackCaptureLatency, c->latency_arg);
do_callback(c, latency_callback, JackPlaybackLatency, c->latency_arg);
}
static int static int
do_buffer_frames(struct spa_loop *loop, do_buffer_frames(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data) bool async, uint32_t seq, const void *data, size_t size, void *user_data)
@ -2547,10 +2607,10 @@ static int client_node_port_set_mix_info(void *data,
pw_log_info("%p: our link %u/%u -> %u/%u completed", c, pw_log_info("%p: our link %u/%u -> %u/%u completed", c,
l->port_link.src, l->port_link.src_serial, l->port_link.src, l->port_link.src_serial,
l->port_link.dst, l->port_link.dst_serial); l->port_link.dst, l->port_link.dst_serial);
do_callback(c, connect_callback, l->register_pending = true;
l->port_link.src_serial, l->port_link.dst_serial, 1, c->connect_arg); l->register_arg = 1;
recompute_latencies(c); c->graph_callback_pending = true;
do_callback(c, graph_callback, c->graph_arg); trigger_callbacks(c);
} }
} }
@ -2865,7 +2925,7 @@ static void registry_event_global(void *data, uint32_t id,
struct client *c = (struct client *) data; struct client *c = (struct client *) data;
struct object *o, *ot, *op; struct object *o, *ot, *op;
const char *str; const char *str;
bool is_first = false, graph_changed = false; bool is_first = false;
uint32_t serial; uint32_t serial;
if (props == NULL) if (props == NULL)
@ -3160,17 +3220,17 @@ static void registry_event_global(void *data, uint32_t id,
case INTERFACE_Node: case INTERFACE_Node:
if (is_first) { if (is_first) {
pw_log_info("%p: client added \"%s\"", c, o->node.name); pw_log_info("%p: client added \"%s\"", c, o->node.name);
do_callback(c, registration_callback, o->register_pending = true;
o->node.name, 1, c->registration_arg); o->register_arg = 1;
graph_changed = true; c->graph_callback_pending = true;
} }
break; break;
case INTERFACE_Port: case INTERFACE_Port:
pw_log_info("%p: port added %u/%u \"%s\"", c, o->id, o->serial, o->port.name); pw_log_info("%p: port added %u/%u \"%s\"", c, o->id, o->serial, o->port.name);
do_callback(c, portregistration_callback, o->register_pending = true;
o->serial, 1, c->portregistration_arg); o->register_arg = 1;
graph_changed = true; c->graph_callback_pending = true;
break; break;
case INTERFACE_Link: case INTERFACE_Link:
@ -3179,17 +3239,13 @@ static void registry_event_global(void *data, uint32_t id,
o->port_link.dst, o->port_link.dst_serial, o->port_link.dst, o->port_link.dst_serial,
o->port_link.is_complete); o->port_link.is_complete);
if (o->port_link.is_complete) { if (o->port_link.is_complete) {
do_callback(c, connect_callback, o->register_pending = true;
o->port_link.src_serial, o->register_arg = 1;
o->port_link.dst_serial, 1, c->connect_arg); c->graph_callback_pending = true;
graph_changed = true;
} }
break; break;
} }
if (graph_changed) { trigger_callbacks(c);
recompute_latencies(c);
do_callback(c, graph_callback, c->graph_arg);
}
exit: exit:
return; return;
@ -3202,7 +3258,6 @@ static void registry_event_global_remove(void *data, uint32_t id)
{ {
struct client *c = (struct client *) data; struct client *c = (struct client *) data;
struct object *o; struct object *o;
bool graph_changed = false;
pw_log_debug("%p: removed: %u", c, id); pw_log_debug("%p: removed: %u", c, id);
@ -3225,16 +3280,16 @@ static void registry_event_global_remove(void *data, uint32_t id)
} }
if (find_node(c, o->node.name) == NULL) { if (find_node(c, o->node.name) == NULL) {
pw_log_info("%p: client %u removed \"%s\"", c, o->id, o->node.name); pw_log_info("%p: client %u removed \"%s\"", c, o->id, o->node.name);
do_callback(c, registration_callback, o->register_pending = true;
o->node.name, 0, c->registration_arg); o->register_arg = 0;
graph_changed = true; c->graph_callback_pending = true;
} }
break; break;
case INTERFACE_Port: case INTERFACE_Port:
pw_log_info("%p: port %u/%u removed \"%s\"", c, o->id, o->serial, o->port.name); pw_log_info("%p: port %u/%u removed \"%s\"", c, o->id, o->serial, o->port.name);
do_callback(c, portregistration_callback, o->register_pending = true;
o->serial, 0, c->portregistration_arg); o->register_arg = 0;
graph_changed = true; c->graph_callback_pending = true;
break; break;
case INTERFACE_Link: case INTERFACE_Link:
if (o->port_link.is_complete && if (o->port_link.is_complete &&
@ -3244,21 +3299,15 @@ static void registry_event_global_remove(void *data, uint32_t id)
o->port_link.src, o->port_link.src_serial, o->port_link.src, o->port_link.src_serial,
o->port_link.dst, o->port_link.dst_serial); o->port_link.dst, o->port_link.dst_serial);
o->port_link.is_complete = false; o->port_link.is_complete = false;
do_callback(c, connect_callback, o->register_pending = true;
o->port_link.src_serial, o->port_link.dst_serial, 0, c->connect_arg); o->register_arg = 0;
graph_changed = true; c->graph_callback_pending = true;
} else } else
pw_log_warn("unlink between unknown ports %d and %d", pw_log_warn("unlink between unknown ports %d and %d",
o->port_link.src, o->port_link.dst); o->port_link.src, o->port_link.dst);
break; break;
} }
if (graph_changed) { trigger_callbacks(c);
recompute_latencies(c);
do_callback(c, graph_callback, c->graph_arg);
}
o->removing = false;
free_object(c, o);
return; return;
} }
@ -3373,6 +3422,9 @@ jack_client_t * jack_client_open (const char *client_name,
if (client->context.context == NULL) if (client->context.context == NULL)
goto no_props; goto no_props;
client->notify_event = pw_loop_add_event(client->context.l,
notify_event, client);
client->allow_mlock = client->context.context->settings.mem_allow_mlock; client->allow_mlock = client->context.context->settings.mem_allow_mlock;
client->warn_mlock = client->context.context->settings.mem_warn_mlock; client->warn_mlock = client->context.context->settings.mem_warn_mlock;
@ -3630,6 +3682,9 @@ int jack_client_close (jack_client_t *client)
if (c->context.context) if (c->context.context)
pw_context_destroy(c->context.context); pw_context_destroy(c->context.context);
if (c->notify_event)
pw_loop_destroy_source(c->context.l, c->notify_event);
if (c->context.loop) if (c->context.loop)
pw_thread_loop_destroy(c->context.loop); pw_thread_loop_destroy(c->context.loop);
@ -3809,6 +3864,8 @@ int jack_activate (jack_client_t *client)
return 0; return 0;
pw_thread_loop_lock(c->context.loop); pw_thread_loop_lock(c->context.loop);
c->in_function = true;
pw_data_loop_start(c->loop); pw_data_loop_start(c->loop);
if ((res = do_activate(c)) < 0) if ((res = do_activate(c)) < 0)
@ -3823,13 +3880,16 @@ int jack_activate (jack_client_t *client)
if (o->type != INTERFACE_Port || o->port.port == NULL || if (o->type != INTERFACE_Port || o->port.port == NULL ||
o->port.port->client != c || !o->port.port->valid) o->port.port->client != c || !o->port.port->valid)
continue; continue;
do_callback(c, portregistration_callback, o->serial, 1, c->portregistration_arg); o->register_pending = true;
o->register_arg = 1;
c->graph_callback_pending = true;
} }
do_callback(c, graph_callback, c->graph_arg); trigger_callbacks(c);
done: done:
if (res < 0) if (res < 0)
pw_data_loop_stop(c->loop); pw_data_loop_stop(c->loop);
c->in_function = false;
pw_thread_loop_unlock(c->context.loop); pw_thread_loop_unlock(c->context.loop);
return res; return res;
@ -3850,6 +3910,7 @@ int jack_deactivate (jack_client_t *client)
return 0; return 0;
pw_thread_loop_lock(c->context.loop); pw_thread_loop_lock(c->context.loop);
c->in_function = true;
c->active = false; c->active = false;
pw_data_loop_stop(c->loop); pw_data_loop_stop(c->loop);
@ -3870,12 +3931,13 @@ int jack_deactivate (jack_client_t *client)
if (o->type != INTERFACE_Port || o->port.port == NULL || if (o->type != INTERFACE_Port || o->port.port == NULL ||
o->port.port->client != c || !o->port.port->valid) o->port.port->client != c || !o->port.port->valid)
continue; continue;
pw_thread_loop_unlock(c->context.loop); o->register_pending = true;
c->portregistration_callback(o->serial, 0, c->portregistration_arg); o->register_arg = 0;
pw_thread_loop_lock(c->context.loop);
} }
res = do_sync(c); res = do_sync(c);
trigger_callbacks(c);
c->in_function = false;
pw_thread_loop_unlock(c->context.loop); pw_thread_loop_unlock(c->context.loop);
return res; return res;
@ -4482,6 +4544,7 @@ jack_port_t * jack_port_register (jack_client_t *client,
param_latency_other(c, p, &params[n_params++], &b); param_latency_other(c, p, &params[n_params++], &b);
pw_thread_loop_lock(c->context.loop); pw_thread_loop_lock(c->context.loop);
c->in_function = true;
pw_client_node_port_update(c->node, pw_client_node_port_update(c->node,
direction, direction,
@ -4496,6 +4559,7 @@ jack_port_t * jack_port_register (jack_client_t *client,
res = do_sync(c); res = do_sync(c);
c->in_function = false;
pw_thread_loop_unlock(c->context.loop); pw_thread_loop_unlock(c->context.loop);
if (res < 0) { if (res < 0) {
@ -4532,6 +4596,7 @@ int jack_port_unregister (jack_client_t *client, jack_port_t *port)
return_val_if_fail(o != NULL, -EINVAL); return_val_if_fail(o != NULL, -EINVAL);
pw_thread_loop_lock(c->context.loop); pw_thread_loop_lock(c->context.loop);
c->in_function = true;
p = o->port.port; p = o->port.port;
if (o->type != INTERFACE_Port || p == NULL || !p->valid || if (o->type != INTERFACE_Port || p == NULL || !p->valid ||
@ -4557,6 +4622,7 @@ int jack_port_unregister (jack_client_t *client, jack_port_t *port)
} }
free_port(c, p); free_port(c, p);
done: done:
c->in_function = false;
pw_thread_loop_unlock(c->context.loop); pw_thread_loop_unlock(c->context.loop);
return res; return res;
@ -5242,6 +5308,7 @@ int jack_connect (jack_client_t *client,
pw_log_info("%p: connect %s %s", client, source_port, destination_port); pw_log_info("%p: connect %s %s", client, source_port, destination_port);
pw_thread_loop_lock(c->context.loop); pw_thread_loop_lock(c->context.loop);
c->in_function = true;
src = find_port_by_name(c, source_port); src = find_port_by_name(c, source_port);
dst = find_port_by_name(c, destination_port); dst = find_port_by_name(c, destination_port);
@ -5294,6 +5361,7 @@ int jack_connect (jack_client_t *client,
pw_proxy_destroy(proxy); pw_proxy_destroy(proxy);
exit: exit:
c->in_function = false;
pw_thread_loop_unlock(c->context.loop); pw_thread_loop_unlock(c->context.loop);
return -res; return -res;
@ -5315,6 +5383,7 @@ int jack_disconnect (jack_client_t *client,
pw_log_info("%p: disconnect %s %s", client, source_port, destination_port); pw_log_info("%p: disconnect %s %s", client, source_port, destination_port);
pw_thread_loop_lock(c->context.loop); pw_thread_loop_lock(c->context.loop);
c->in_function = true;
src = find_port_by_name(c, source_port); src = find_port_by_name(c, source_port);
dst = find_port_by_name(c, destination_port); dst = find_port_by_name(c, destination_port);
@ -5341,6 +5410,7 @@ int jack_disconnect (jack_client_t *client,
res = do_sync(c); res = do_sync(c);
exit: exit:
c->in_function = false;
pw_thread_loop_unlock(c->context.loop); pw_thread_loop_unlock(c->context.loop);
return -res; return -res;
@ -5360,6 +5430,7 @@ int jack_port_disconnect (jack_client_t *client, jack_port_t *port)
pw_log_debug("%p: disconnect %p", client, port); pw_log_debug("%p: disconnect %p", client, port);
pw_thread_loop_lock(c->context.loop); pw_thread_loop_lock(c->context.loop);
c->in_function = true;
spa_list_for_each(l, &c->context.objects, link) { spa_list_for_each(l, &c->context.objects, link) {
if (l->type != INTERFACE_Link || l->removed) if (l->type != INTERFACE_Link || l->removed)
@ -5371,6 +5442,7 @@ int jack_port_disconnect (jack_client_t *client, jack_port_t *port)
} }
res = do_sync(c); res = do_sync(c);
c->in_function = false;
pw_thread_loop_unlock(c->context.loop); pw_thread_loop_unlock(c->context.loop);
return -res; return -res;
@ -5956,6 +6028,7 @@ int jack_set_sync_callback (jack_client_t *client,
return_val_if_fail(c != NULL, -EINVAL); return_val_if_fail(c != NULL, -EINVAL);
pw_thread_loop_lock(c->context.loop); pw_thread_loop_lock(c->context.loop);
c->in_function = true;
c->sync_callback = sync_callback; c->sync_callback = sync_callback;
c->sync_arg = arg; c->sync_arg = arg;
@ -5965,6 +6038,7 @@ int jack_set_sync_callback (jack_client_t *client,
c->activation->pending_sync = true; c->activation->pending_sync = true;
done: done:
c->in_function = false;
pw_thread_loop_unlock(c->context.loop); pw_thread_loop_unlock(c->context.loop);
return res; return res;
@ -6004,6 +6078,7 @@ int jack_set_timebase_callback (jack_client_t *client,
return_val_if_fail(timebase_callback != NULL, -EINVAL); return_val_if_fail(timebase_callback != NULL, -EINVAL);
pw_thread_loop_lock(c->context.loop); pw_thread_loop_lock(c->context.loop);
c->in_function = true;
c->timebase_callback = timebase_callback; c->timebase_callback = timebase_callback;
c->timebase_arg = arg; c->timebase_arg = arg;
@ -6017,6 +6092,7 @@ int jack_set_timebase_callback (jack_client_t *client,
c->activation->pending_new_pos = true; c->activation->pending_new_pos = true;
done: done:
c->in_function = false;
pw_thread_loop_unlock(c->context.loop); pw_thread_loop_unlock(c->context.loop);
return res; return res;