jack: rework locking

Ensure all callbacks are called from the thread_loop and release
the thread lock before calling the callback.

Introduce a new rt_lock that is taken while the callbacks are called.
Only call the process_callback when we can acquire the rt_lock in the
data thread. This ensures the process callback is never called
concurrently with any other callback.

Give a warning when we try to call do_sync from the thread_loop
itself. We would deadlock because the thread that is supposed to do
the sync operation would be blocked in wait().

Fixes #1313
This commit is contained in:
Wim Taymans 2021-06-17 10:46:04 +02:00
parent 3e52c6598b
commit 0f9fd45a58

View file

@ -359,7 +359,7 @@ struct client {
struct spa_list target_links; struct spa_list target_links;
} rt; } rt;
int pending; pthread_mutex_t rt_lock;
unsigned int started:1; unsigned int started:1;
unsigned int active:1; unsigned int active:1;
@ -680,6 +680,29 @@ void jack_get_version(int *major_ptr, int *minor_ptr, int *micro_ptr, int *proto
*proto_ptr = 0; *proto_ptr = 0;
} }
#define do_callback(c,callback,...) \
({ \
if (c->callback) { \
pw_thread_loop_unlock(c->context.loop); \
pthread_mutex_lock(&c->rt_lock); \
c->callback(__VA_ARGS__); \
pthread_mutex_unlock(&c->rt_lock); \
pw_thread_loop_lock(c->context.loop); \
} \
})
#define do_rt_callback_res(c,callback,...) \
({ \
int res = 0; \
if (c->callback) { \
if (pthread_mutex_trylock(&c->rt_lock) == 0) { \
res = c->callback(__VA_ARGS__); \
pthread_mutex_unlock(&c->rt_lock); \
} \
} \
res; \
})
SPA_EXPORT SPA_EXPORT
const char * const char *
jack_get_version_string(void) jack_get_version_string(void)
@ -709,8 +732,8 @@ static void on_error(void *data, uint32_t id, int seq, int res, const char *mess
if (id == PW_ID_CORE) { if (id == PW_ID_CORE) {
client->error = true; client->error = true;
client->last_res = res; client->last_res = res;
if (client->shutdown_callback && !client->destroyed) if (!client->destroyed)
client->shutdown_callback(client->shutdown_arg); do_callback(client, shutdown_callback, client->shutdown_arg);
} }
pw_thread_loop_signal(client->context.loop, false); pw_thread_loop_signal(client->context.loop, false);
} }
@ -725,6 +748,11 @@ static int do_sync(struct client *client)
{ {
int seq; int seq;
if (pw_thread_loop_in_thread(client->context.loop)) {
pw_log_warn("sync requested from callback");
return 0;
}
seq = pw_proxy_sync((struct pw_proxy*)client->core, client->last_sync); seq = pw_proxy_sync((struct pw_proxy*)client->core, client->last_sync);
while (true) { while (true) {
@ -1073,25 +1101,18 @@ do_buffer_frames(struct spa_loop *loop,
{ {
uint32_t buffer_frames = *((uint32_t*)data); uint32_t buffer_frames = *((uint32_t*)data);
struct client *c = user_data; struct client *c = user_data;
if (c->bufsize_callback) do_callback(c, bufsize_callback, buffer_frames, c->bufsize_arg);
c->bufsize_callback(buffer_frames, c->bufsize_arg);
ATOMIC_DEC(c->pending);
return 0; return 0;
} }
static inline void check_buffer_frames(struct client *c, struct spa_io_position *pos, bool rt) static inline void check_buffer_frames(struct client *c, struct spa_io_position *pos)
{ {
uint32_t buffer_frames = pos->clock.duration; uint32_t buffer_frames = pos->clock.duration;
if (SPA_UNLIKELY(buffer_frames != c->buffer_frames)) { if (SPA_UNLIKELY(buffer_frames != c->buffer_frames)) {
pw_log_info(NAME" %p: bufferframes %d", c, buffer_frames); pw_log_info(NAME" %p: bufferframes %d", c, buffer_frames);
ATOMIC_INC(c->pending);
c->buffer_frames = buffer_frames; c->buffer_frames = buffer_frames;
if (rt)
pw_loop_invoke(c->context.l, do_buffer_frames, 0, pw_loop_invoke(c->context.l, do_buffer_frames, 0,
&buffer_frames, sizeof(buffer_frames), false, c); &buffer_frames, sizeof(buffer_frames), false, c);
else
do_buffer_frames(c->context.l->loop, false, 0,
&buffer_frames, sizeof(buffer_frames), c);
} }
} }
@ -1101,25 +1122,18 @@ do_sample_rate(struct spa_loop *loop,
{ {
struct client *c = user_data; struct client *c = user_data;
uint32_t sample_rate = *((uint32_t*)data); uint32_t sample_rate = *((uint32_t*)data);
if (c->srate_callback) do_callback(c, srate_callback, sample_rate, c->srate_arg);
c->srate_callback(sample_rate, c->srate_arg);
ATOMIC_DEC(c->pending);
return 0; return 0;
} }
static inline void check_sample_rate(struct client *c, struct spa_io_position *pos, bool rt) static inline void check_sample_rate(struct client *c, struct spa_io_position *pos)
{ {
uint32_t sample_rate = pos->clock.rate.denom; uint32_t sample_rate = pos->clock.rate.denom;
if (SPA_UNLIKELY(sample_rate != c->sample_rate)) { if (SPA_UNLIKELY(sample_rate != c->sample_rate)) {
pw_log_info(NAME" %p: sample_rate %d", c, sample_rate); pw_log_info(NAME" %p: sample_rate %d", c, sample_rate);
ATOMIC_INC(c->pending);
c->sample_rate = sample_rate; c->sample_rate = sample_rate;
if (rt)
pw_loop_invoke(c->context.l, do_sample_rate, 0, pw_loop_invoke(c->context.l, do_sample_rate, 0,
&sample_rate, sizeof(sample_rate), false, c); &sample_rate, sizeof(sample_rate), false, c);
else
do_sample_rate(c->context.l->loop, false, 0,
&sample_rate, sizeof(sample_rate), c);
} }
} }
@ -1161,8 +1175,8 @@ static inline uint32_t cycle_run(struct client *c)
return 0; return 0;
} }
check_buffer_frames(c, pos, true); check_buffer_frames(c, pos);
check_sample_rate(c, pos, true); check_sample_rate(c, pos);
if (SPA_LIKELY(driver)) { if (SPA_LIKELY(driver)) {
c->jack_state = position_to_jack(driver, &c->jack_position); c->jack_state = position_to_jack(driver, &c->jack_position);
@ -1281,8 +1295,7 @@ on_rtsocket_condition(void *data, int fd, uint32_t mask)
buffer_frames = cycle_run(c); buffer_frames = cycle_run(c);
if (!ATOMIC_LOAD(c->pending) && c->process_callback) status = do_rt_callback_res(c, process_callback, buffer_frames, c->process_arg);
status = c->process_callback(buffer_frames, c->process_arg);
cycle_signal(c, status); cycle_signal(c, status);
} }
@ -1417,8 +1430,7 @@ static int update_driver_activation(struct client *c)
freewheeling = SPA_FLAG_IS_SET(c->position->clock.flags, SPA_IO_CLOCK_FLAG_FREEWHEEL); freewheeling = SPA_FLAG_IS_SET(c->position->clock.flags, SPA_IO_CLOCK_FLAG_FREEWHEEL);
if (c->freewheeling != freewheeling) { if (c->freewheeling != freewheeling) {
c->freewheeling = freewheeling; c->freewheeling = freewheeling;
if (c->freewheel_callback) do_callback(c, freewheel_callback, freewheeling, c->freewheel_arg);
c->freewheel_callback(freewheeling, c->freewheel_arg);
} }
link = find_activation(&c->links, c->driver_id); link = find_activation(&c->links, c->driver_id);
@ -1463,7 +1475,7 @@ static int client_node_set_io(void *object,
c->driver_id = ptr ? c->position->clock.id : SPA_ID_INVALID; c->driver_id = ptr ? c->position->clock.id : SPA_ID_INVALID;
update_driver_activation(c); update_driver_activation(c);
if (ptr) if (ptr)
check_sample_rate(c, c->position, false); check_sample_rate(c, c->position);
break; break;
default: default:
break; break;
@ -1813,7 +1825,7 @@ static int port_set_latency(struct client *c, struct port *p,
mode = JackCaptureLatency; mode = JackCaptureLatency;
if (c->latency_callback) if (c->latency_callback)
c->latency_callback(mode, c->latency_arg); do_callback(c, latency_callback, mode, c->latency_arg);
else else
default_latency_callback(mode, c); default_latency_callback(mode, c);
@ -1906,7 +1918,6 @@ static int client_node_port_use_buffers(void *object,
res = -EINVAL; res = -EINVAL;
goto done; goto done;
} }
if ((mix = ensure_mix(c, p, mix_id)) == NULL) { if ((mix = ensure_mix(c, p, mix_id)) == NULL) {
res = -ENOMEM; res = -ENOMEM;
goto done; goto done;
@ -2625,18 +2636,19 @@ static void registry_event_global(void *data, uint32_t id,
switch (o->type) { switch (o->type) {
case INTERFACE_Node: case INTERFACE_Node:
if (c->registration_callback && is_first) if (is_first)
c->registration_callback(o->node.name, 1, c->registration_arg); do_callback(c, registration_callback,
o->node.name, 1, c->registration_arg);
break; break;
case INTERFACE_Port: case INTERFACE_Port:
if (c->portregistration_callback) do_callback(c, portregistration_callback,
c->portregistration_callback(o->id, 1, c->portregistration_arg); o->id, 1, c->portregistration_arg);
break; break;
case INTERFACE_Link: case INTERFACE_Link:
if (c->connect_callback) do_callback(c, connect_callback,
c->connect_callback(o->port_link.src, o->port_link.dst, 1, c->connect_arg); o->port_link.src, o->port_link.dst, 1, c->connect_arg);
break; break;
} }
@ -2683,16 +2695,17 @@ static void registry_event_global_remove(void *object, uint32_t id)
if (spa_streq(o->node.node_name, c->metadata->default_audio_source)) if (spa_streq(o->node.node_name, c->metadata->default_audio_source))
c->metadata->default_audio_source[0] = '\0'; c->metadata->default_audio_source[0] = '\0';
} }
if (c->registration_callback && is_last) if (is_last)
c->registration_callback(o->node.name, 0, c->registration_arg); do_callback(c, registration_callback,
o->node.name, 0, c->registration_arg);
break; break;
case INTERFACE_Port: case INTERFACE_Port:
if (c->portregistration_callback) do_callback(c, portregistration_callback,
c->portregistration_callback(o->id, 0, c->portregistration_arg); o->id, 0, c->portregistration_arg);
break; break;
case INTERFACE_Link: case INTERFACE_Link:
if (c->connect_callback) do_callback(c, connect_callback,
c->connect_callback(o->port_link.src, o->port_link.dst, 0, c->connect_arg); o->port_link.src, o->port_link.dst, 0, c->connect_arg);
break; break;
} }
@ -2814,6 +2827,7 @@ jack_client_t * jack_client_open (const char *client_name,
spa_list_init(&client->context.free_objects); spa_list_init(&client->context.free_objects);
pthread_mutex_init(&client->context.lock, NULL); pthread_mutex_init(&client->context.lock, NULL);
pthread_mutex_init(&client->rt_lock, NULL);
spa_list_init(&client->context.nodes); spa_list_init(&client->context.nodes);
spa_list_init(&client->context.ports); spa_list_init(&client->context.ports);
spa_list_init(&client->context.links); spa_list_init(&client->context.links);
@ -3000,6 +3014,7 @@ int jack_client_close (jack_client_t *client)
pw_log_debug(NAME" %p: free", client); pw_log_debug(NAME" %p: free", client);
pw_map_clear(&c->context.globals); pw_map_clear(&c->context.globals);
pthread_mutex_destroy(&c->context.lock); pthread_mutex_destroy(&c->context.lock);
pthread_mutex_destroy(&c->rt_lock);
pw_properties_free(c->props); pw_properties_free(c->props);
free(c); free(c);
@ -3093,14 +3108,11 @@ static int do_activate(struct client *c)
{ {
int res; int res;
pw_thread_loop_lock(c->context.loop);
pw_log_info(NAME" %p: activate", c); pw_log_info(NAME" %p: activate", c);
pw_client_node_set_active(c->node, true); pw_client_node_set_active(c->node, true);
res = do_sync(c); res = do_sync(c);
pw_thread_loop_unlock(c->context.loop);
return res; return res;
} }
@ -3108,22 +3120,27 @@ SPA_EXPORT
int jack_activate (jack_client_t *client) int jack_activate (jack_client_t *client)
{ {
struct client *c = (struct client *) client; struct client *c = (struct client *) client;
int res; int res = 0;
spa_return_val_if_fail(c != NULL, -EINVAL); spa_return_val_if_fail(c != NULL, -EINVAL);
if (c->active) if (c->active)
return 0; return 0;
pw_thread_loop_lock(c->context.loop);
if ((res = do_activate(c)) < 0) if ((res = do_activate(c)) < 0)
return res; goto done;
c->activation->pending_new_pos = true; c->activation->pending_new_pos = true;
c->activation->pending_sync = true; c->activation->pending_sync = true;
c->active = true; c->active = true;
if (c->position) if (c->position)
check_buffer_frames(c, c->position, false); check_buffer_frames(c, c->position);
done:
pw_thread_loop_unlock(c->context.loop);
return 0; return 0;
} }
@ -4159,6 +4176,12 @@ int jack_port_set_alias (jack_port_t *port, const char *alias)
pw_thread_loop_lock(c->context.loop); pw_thread_loop_lock(c->context.loop);
p = GET_PORT(c, GET_DIRECTION(o->port.flags), o->port.port_id);
if (p == NULL || !p->valid) {
res = -EINVAL;
goto done;
}
if (o->port.alias1[0] == '\0') { if (o->port.alias1[0] == '\0') {
key = PW_KEY_OBJECT_PATH; key = PW_KEY_OBJECT_PATH;
snprintf(o->port.alias1, sizeof(o->port.alias1), "%s", alias); snprintf(o->port.alias1, sizeof(o->port.alias1), "%s", alias);
@ -4172,12 +4195,6 @@ int jack_port_set_alias (jack_port_t *port, const char *alias)
goto done; goto done;
} }
p = GET_PORT(c, GET_DIRECTION(o->port.flags), o->port.port_id);
if (p == NULL || !p->valid) {
res = -EINVAL;
goto done;
}
pw_properties_set(p->props, key, alias); pw_properties_set(p->props, key, alias);
p->info.change_mask |= SPA_PORT_CHANGE_MASK_PROPS; p->info.change_mask |= SPA_PORT_CHANGE_MASK_PROPS;
@ -4213,6 +4230,12 @@ int jack_port_unset_alias (jack_port_t *port, const char *alias)
pw_thread_loop_lock(c->context.loop); pw_thread_loop_lock(c->context.loop);
p = GET_PORT(c, GET_DIRECTION(o->port.flags), o->port.port_id);
if (p == NULL || !p->valid) {
res = -EINVAL;
goto done;
}
if (spa_streq(o->port.alias1, alias)) if (spa_streq(o->port.alias1, alias))
key = PW_KEY_OBJECT_PATH; key = PW_KEY_OBJECT_PATH;
else if (spa_streq(o->port.alias2, alias)) else if (spa_streq(o->port.alias2, alias))
@ -4222,12 +4245,6 @@ int jack_port_unset_alias (jack_port_t *port, const char *alias)
goto done; goto done;
} }
p = GET_PORT(c, GET_DIRECTION(o->port.flags), o->port.port_id);
if (p == NULL || !p->valid) {
res = -EINVAL;
goto done;
}
pw_properties_set(p->props, key, NULL); pw_properties_set(p->props, key, NULL);
p->info.change_mask |= SPA_PORT_CHANGE_MASK_PROPS; p->info.change_mask |= SPA_PORT_CHANGE_MASK_PROPS;
@ -5052,19 +5069,24 @@ int jack_set_sync_callback (jack_client_t *client,
JackSyncCallback sync_callback, JackSyncCallback sync_callback,
void *arg) void *arg)
{ {
int res; int res = 0;
struct client *c = (struct client *) client; struct client *c = (struct client *) client;
spa_return_val_if_fail(c != NULL, -EINVAL); spa_return_val_if_fail(c != NULL, -EINVAL);
pw_thread_loop_lock(c->context.loop);
c->sync_callback = sync_callback; c->sync_callback = sync_callback;
c->sync_arg = arg; c->sync_arg = arg;
if ((res = do_activate(c)) < 0) if ((res = do_activate(c)) < 0)
return res; goto done;
c->activation->pending_sync = true; c->activation->pending_sync = true;
return 0; done:
pw_thread_loop_unlock(c->context.loop);
return res;
} }
SPA_EXPORT SPA_EXPORT
@ -5090,12 +5112,14 @@ int jack_set_timebase_callback (jack_client_t *client,
JackTimebaseCallback timebase_callback, JackTimebaseCallback timebase_callback,
void *arg) void *arg)
{ {
int res; int res = 0;
struct client *c = (struct client *) client; struct client *c = (struct client *) client;
spa_return_val_if_fail(c != NULL, -EINVAL); spa_return_val_if_fail(c != NULL, -EINVAL);
spa_return_val_if_fail(timebase_callback != NULL, -EINVAL); spa_return_val_if_fail(timebase_callback != NULL, -EINVAL);
pw_thread_loop_lock(c->context.loop);
c->timebase_callback = timebase_callback; c->timebase_callback = timebase_callback;
c->timebase_arg = arg; c->timebase_arg = arg;
c->timeowner_pending = true; c->timeowner_pending = true;
@ -5105,11 +5129,13 @@ int jack_set_timebase_callback (jack_client_t *client,
pw_log_debug(NAME" %p: timebase set id:%u", c, c->node_id); pw_log_debug(NAME" %p: timebase set id:%u", c, c->node_id);
if ((res = do_activate(c)) < 0) if ((res = do_activate(c)) < 0)
return res; goto done;
c->activation->pending_new_pos = true; c->activation->pending_new_pos = true;
done:
pw_thread_loop_unlock(c->context.loop);
return 0; return res;
} }
SPA_EXPORT SPA_EXPORT