jack: make a real notify queue

Use a ringbuffer to store notify events. Emit them when no method is
running.

See #3183
This commit is contained in:
Wim Taymans 2023-05-11 18:21:57 +02:00
parent 5d5576f4a8
commit 41ec84185f

View file

@ -24,6 +24,7 @@
#include <spa/debug/pod.h>
#include <spa/utils/json.h>
#include <spa/utils/string.h>
#include <spa/utils/ringbuffer.h>
#include <pipewire/pipewire.h>
#include <pipewire/private.h>
@ -72,6 +73,28 @@ PW_LOG_TOPIC_STATIC(jack_log_topic, "jack");
#define SELF_CONNECT_FAIL_ALL -2
#define SELF_CONNECT_IGNORE_ALL 2
#define NOTIFY_BUFFER_SIZE (1u<<13)
#define NOTIFY_BUFFER_MASK (NOTIFY_BUFFER_SIZE-1)
struct notify {
#define NOTIFY_ACTIVE_FLAG (1<<0)
#define NOTIFY_TYPE_NONE ((0<<4)|NOTIFY_ACTIVE_FLAG)
#define NOTIFY_TYPE_REGISTRATION ((1<<4))
#define NOTIFY_TYPE_PORTREGISTRATION ((2<<4)|NOTIFY_ACTIVE_FLAG)
#define NOTIFY_TYPE_CONNECT ((3<<4)|NOTIFY_ACTIVE_FLAG)
#define NOTIFY_TYPE_GRAPH ((4<<4)|NOTIFY_ACTIVE_FLAG)
#define NOTIFY_TYPE_BUFFER_FRAMES ((5<<4)|NOTIFY_ACTIVE_FLAG)
#define NOTIFY_TYPE_SAMPLE_RATE ((6<<4)|NOTIFY_ACTIVE_FLAG)
#define NOTIFY_TYPE_FREEWHEEL ((7<<4)|NOTIFY_ACTIVE_FLAG)
#define NOTIFY_TYPE_SHUTDOWN ((8<<4)|NOTIFY_ACTIVE_FLAG)
#define NOTIFY_TYPE_LATENCY ((9<<4)|NOTIFY_ACTIVE_FLAG)
int type;
struct object *object;
int arg1;
const char *msg;
};
struct client;
struct port;
@ -145,8 +168,6 @@ struct object {
struct spa_hook object_listener;
unsigned int removing:1;
unsigned int removed:1;
unsigned int register_pending:1;
int register_arg;
};
struct midi_buffer {
@ -300,8 +321,12 @@ struct client {
uint32_t node_id;
uint32_t serial;
struct object *object;
struct spa_source *socket_source;
struct spa_source *notify_event;
struct spa_source *notify_source;
void *notify_buffer;
struct spa_ringbuffer notify_ring;
JackThreadCallback thread_callback;
void *thread_arg;
@ -808,9 +833,9 @@ void jack_get_version(int *major_ptr, int *minor_ptr, int *micro_ptr, int *proto
*proto_ptr = 0;
}
#define do_callback_expr(c,expr,callback,active,...) \
#define do_callback_expr(c,expr,callback,do_emit,...) \
({ \
if (c->callback && active) { \
if (c->callback && do_emit) { \
pw_thread_loop_unlock(c->context.loop); \
if (c->locked_process) \
pthread_mutex_lock(&c->rt_lock); \
@ -823,12 +848,12 @@ void jack_get_version(int *major_ptr, int *minor_ptr, int *micro_ptr, int *proto
} else { \
(expr); \
pw_log_debug("skip " #callback \
" cb:%p active:%d", c->callback, \
active); \
" cb:%p do_emit:%d", c->callback, \
do_emit); \
} \
})
#define do_callback(c,callback,active,...) do_callback_expr(c,(void)0,callback,active,__VA_ARGS__)
#define do_callback(c,callback,do_emit,...) do_callback_expr(c,(void)0,callback,do_emit,__VA_ARGS__)
#define do_rt_callback_res(c,callback,...) \
({ \
@ -867,68 +892,193 @@ static void recompute_latencies(struct client *c)
(c)->frozen_callbacks++; \
})
#define check_callbacks(c) \
({ \
if ((c)->frozen_callbacks == 0 && (c)->pending_callbacks) \
pw_loop_signal_event((c)->context.l, (c)->notify_source); \
})
#define thaw_callbacks(c) \
({ \
(c)->frozen_callbacks--; \
if ((c)->frozen_callbacks == 0 && (c)->pending_callbacks) \
pw_loop_signal_event((c)->context.l, (c)->notify_event); \
check_callbacks(c); \
})
static void emit_callbacks(struct client *c)
{
struct object *o, *t;
struct object *o;
int32_t avail;
uint32_t index;
struct notify *notify;
if (c->frozen_callbacks != 0 || !c->pending_callbacks)
return;
pw_log_debug("%p: enter", c);
pw_log_debug("%p: enter active:%u", c, c->active);
c->pending_callbacks = false;
freeze_callbacks(c);
spa_list_for_each_safe(o, t, &c->context.objects, link) {
if (o->removed)
continue;
if (o->register_pending) {
o->register_pending = false;
switch (o->type) {
case INTERFACE_Node:
do_callback(c, registration_callback, c->active,
o->node.name,
o->register_arg,
c->registration_arg);
break;
case INTERFACE_Port:
pw_log_debug("%p: port %u %s %d", c, o->serial, o->port.name, c->active);
do_callback(c, portregistration_callback, c->active,
o->serial,
o->register_arg,
c->portregistration_arg);
break;
case INTERFACE_Link:
do_callback(c, connect_callback, c->active,
o->port_link.src_serial,
o->port_link.dst_serial,
o->register_arg,
c->connect_arg);
break;
avail = spa_ringbuffer_get_read_index(&c->notify_ring, &index);
while (avail > 0) {
notify = SPA_PTROFF(c->notify_buffer, index & NOTIFY_BUFFER_MASK, struct notify);
o = notify->object;
pw_log_debug("%p: dequeue notify index:%08x %p type:%d %p arg1:%d\n", c,
index, notify, notify->type, o, notify->arg1);
switch (notify->type) {
case NOTIFY_TYPE_REGISTRATION:
pw_log_debug("%p: node %u %s %u", c, o->serial,
o->node.name, notify->arg1);
do_callback(c, registration_callback, true,
o->node.name,
notify->arg1,
c->registration_arg);
break;
case NOTIFY_TYPE_PORTREGISTRATION:
pw_log_debug("%p: port %u %s %u", c, o->serial,
o->port.name, notify->arg1);
do_callback(c, portregistration_callback, c->active,
o->serial,
notify->arg1,
c->portregistration_arg);
break;
case NOTIFY_TYPE_CONNECT:
pw_log_debug("%p: link %u %u -> %u %u", c, o->serial,
o->port_link.src_serial,
o->port_link.dst, notify->arg1);
do_callback(c, connect_callback, c->active,
o->port_link.src_serial,
o->port_link.dst_serial,
notify->arg1,
c->connect_arg);
break;
case NOTIFY_TYPE_GRAPH:
pw_log_debug("%p: graph", c);
recompute_latencies(c);
do_callback(c, graph_callback, c->active, c->graph_arg);
break;
case NOTIFY_TYPE_BUFFER_FRAMES:
pw_log_debug("%p: buffer frames %d", c, notify->arg1);
if (c->buffer_frames != (uint32_t)notify->arg1) {
do_callback_expr(c, c->buffer_frames = notify->arg1,
bufsize_callback, c->active,
notify->arg1, c->bufsize_arg);
recompute_latencies(c);
}
break;
case NOTIFY_TYPE_SAMPLE_RATE:
pw_log_debug("%p: sample rate %d", c, notify->arg1);
if (c->sample_rate != (uint32_t)notify->arg1) {
do_callback_expr(c, c->sample_rate = notify->arg1,
srate_callback, c->active,
notify->arg1, c->srate_arg);
}
break;
case NOTIFY_TYPE_FREEWHEEL:
pw_log_debug("%p: freewheel %d", c, notify->arg1);
do_callback(c, freewheel_callback, c->active,
notify->arg1, c->freewheel_arg);
break;
case NOTIFY_TYPE_SHUTDOWN:
pw_log_debug("%p: shutdown %d %s", c, notify->arg1, notify->msg);
if (c->info_shutdown_callback)
do_callback(c, info_shutdown_callback, c->active,
notify->arg1, notify->msg,
c->info_shutdown_arg);
else
do_callback(c, shutdown_callback, c->active, c->shutdown_arg);
break;
case NOTIFY_TYPE_LATENCY:
pw_log_debug("%p: latency %d", c, notify->arg1);
do_callback(c, latency_callback, c->active, notify->arg1, c->latency_arg);
break;
default:
break;
}
if (o->removing) {
if (o != NULL && notify->arg1 == 0 && o->removing) {
o->removing = false;
free_object(c, o);
}
}
if (c->graph_callback_pending) {
c->graph_callback_pending = false;
recompute_latencies(c);
do_callback(c, graph_callback, c->active, c->graph_arg);
avail -= sizeof(struct notify);
index += sizeof(struct notify);
spa_ringbuffer_read_update(&c->notify_ring, index);
}
thaw_callbacks(c);
pw_log_debug("%p: leave", c);
}
static void notify_event(void *data, uint64_t count)
static int queue_notify(struct client *c, int type, struct object *o, int arg1, const char *msg)
{
int32_t filled;
uint32_t index;
struct notify *notify;
bool emit = false;;
if ((type & NOTIFY_ACTIVE_FLAG) && !c->active)
return 0;
switch (type) {
case NOTIFY_TYPE_REGISTRATION:
emit = c->registration_callback != NULL && o != NULL;
break;
case NOTIFY_TYPE_PORTREGISTRATION:
emit = c->portregistration_callback != NULL && o != NULL;
break;
case NOTIFY_TYPE_CONNECT:
emit = c->connect_callback != NULL && o != NULL;
break;
case NOTIFY_TYPE_GRAPH:
emit = c->graph_callback != NULL || c->latency_callback != NULL;
break;
case NOTIFY_TYPE_BUFFER_FRAMES:
emit = c->bufsize_callback != NULL;
break;
case NOTIFY_TYPE_SAMPLE_RATE:
emit = c->srate_callback != NULL;
break;
case NOTIFY_TYPE_FREEWHEEL:
emit = c->freewheel_callback != NULL;
break;
case NOTIFY_TYPE_SHUTDOWN:
emit = c->info_shutdown_callback != NULL || c->shutdown_callback != NULL;
break;
case NOTIFY_TYPE_LATENCY:
emit = c->latency_callback != NULL;
break;
default:
break;
}
if (!emit) {
pw_log_debug("%p: skip notify %d", c, type);
if (o != NULL && arg1 == 0 && o->removing) {
o->removing = false;
free_object(c, o);
}
return 0;
}
filled = spa_ringbuffer_get_write_index(&c->notify_ring, &index);
if (filled < 0 || filled + sizeof(struct notify) > NOTIFY_BUFFER_SIZE) {
pw_log_warn("%p: notify queue full %d", c, type);
return -ENOSPC;
}
notify = SPA_PTROFF(c->notify_buffer, index & NOTIFY_BUFFER_MASK, struct notify);
notify->type = type;
notify->object = o;
notify->arg1 = arg1;
notify->msg = msg;
pw_log_debug("%p: queue notify index:%08x %p type:%d %p arg1:%d msg:%s\n", c,
index, notify, notify->type, o, notify->arg1, notify->msg);
index += sizeof(struct notify);
spa_ringbuffer_write_update(&c->notify_ring, index);
c->pending_callbacks = true;
check_callbacks(c);
return 0;
}
static void on_notify_event(void *data, uint64_t count)
{
struct client *c = data;
emit_callbacks(c);
@ -944,17 +1094,6 @@ static void on_sync_reply(void *data, uint32_t id, int seq)
pw_thread_loop_signal(client->context.loop, false);
}
static void do_shutdown(struct client *c)
{
if (c->info_shutdown_callback) {
jack_status_t status = JackFailure | JackServerError;
do_callback(c, info_shutdown_callback, c->active,
status, "JACK server has been closed", c->info_shutdown_arg);
}
else
do_callback(c, shutdown_callback, c->active, c->shutdown_arg);
}
static void on_error(void *data, uint32_t id, int seq, int res, const char *message)
{
struct client *client = data;
@ -964,8 +1103,11 @@ static void on_error(void *data, uint32_t id, int seq, int res, const char *mess
if (id == PW_ID_CORE) {
client->last_res = res;
if (res == -EPIPE && !client->destroyed)
do_shutdown(client);
if (res == -EPIPE && !client->destroyed) {
queue_notify(client, NOTIFY_TYPE_SHUTDOWN,
NULL, JackFailure | JackServerError,
"JACK server has been closed");
}
}
pw_thread_loop_signal(client->context.loop, false);
}
@ -989,6 +1131,8 @@ static int do_sync(struct client *client)
client->last_res = 0;
client->pending_sync = pw_proxy_sync((struct pw_proxy*)client->core, client->pending_sync);
if (client->pending_sync < 0)
return client->pending_sync;
while (true) {
if (in_data_thread) {
@ -1061,15 +1205,6 @@ static void client_remove_source(struct client *c)
}
}
static int
do_remove_sources(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct client *c = user_data;
client_remove_source(c);
return 0;
}
static inline void reuse_buffer(struct client *c, struct mix *mix, uint32_t id)
{
struct buffer *b;
@ -1431,19 +1566,6 @@ static inline jack_transport_state_t position_to_jack(struct pw_node_activation
return state;
}
static int
do_buffer_frames(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
uint32_t buffer_frames = *((uint32_t*)data);
struct client *c = user_data;
if (c->buffer_frames != buffer_frames)
do_callback_expr(c, c->buffer_frames = buffer_frames,
bufsize_callback, c->active, buffer_frames, c->bufsize_arg);
recompute_latencies(c);
return 0;
}
static inline int check_buffer_frames(struct client *c, struct spa_io_position *pos)
{
uint32_t buffer_frames = pos->clock.duration;
@ -1451,25 +1573,13 @@ static inline int check_buffer_frames(struct client *c, struct spa_io_position *
pw_log_info("%p: bufferframes old:%d new:%d cb:%p", c,
c->buffer_frames, buffer_frames, c->bufsize_callback);
if (c->buffer_frames != (uint32_t)-1)
pw_loop_invoke(c->context.l, do_buffer_frames, 0,
&buffer_frames, sizeof(buffer_frames), false, c);
queue_notify(c, NOTIFY_TYPE_BUFFER_FRAMES, NULL, buffer_frames, NULL);
else
c->buffer_frames = buffer_frames;
}
return c->buffer_frames == buffer_frames;
}
static int
do_sample_rate(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct client *c = user_data;
uint32_t sample_rate = *((uint32_t*)data);
do_callback_expr(c, c->sample_rate = sample_rate,
srate_callback, c->active, sample_rate, c->srate_arg);
return 0;
}
static inline int check_sample_rate(struct client *c, struct spa_io_position *pos)
{
uint32_t sample_rate = pos->clock.rate.denom;
@ -1477,8 +1587,7 @@ static inline int check_sample_rate(struct client *c, struct spa_io_position *po
pw_log_info("%p: sample_rate old:%d new:%d cb:%p", c,
c->sample_rate, sample_rate, c->srate_callback);
if (c->srate_callback != NULL) {
pw_loop_invoke(c->context.l, do_sample_rate, 0,
&sample_rate, sizeof(sample_rate), false, c);
queue_notify(c, NOTIFY_TYPE_SAMPLE_RATE, NULL, sample_rate, NULL);
} else {
c->sample_rate = sample_rate;
}
@ -1660,25 +1769,27 @@ on_rtsocket_condition(void *data, int fd, uint32_t mask)
}
}
static int
do_clear_link(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
static void free_link(struct link *link)
{
struct link *link = user_data;
spa_list_remove(&link->target_link);
return 0;
}
static void clear_link(struct client *c, struct link *link)
{
pw_data_loop_invoke(c->loop,
do_clear_link, 1, NULL, 0, !c->data_locked, link);
pw_log_debug("free link %p", link);
pw_memmap_free(link->mem);
close(link->signalfd);
spa_list_remove(&link->link);
free(link);
}
static int
do_clean_transport(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct client *c = user_data;
struct link *l;
pw_log_debug("%p: clean transport", c);
client_remove_source(c);
spa_list_consume(l, &c->rt.target_links, target_link)
spa_list_remove(&l->target_link);
return 0;
}
static void clean_transport(struct client *c)
{
struct link *l;
@ -1686,12 +1797,15 @@ static void clean_transport(struct client *c)
if (!c->has_transport)
return;
pw_data_loop_invoke(c->loop,
do_remove_sources, 1, NULL, 0, !c->data_locked, c);
spa_list_consume(l, &c->links, link)
clear_link(c, l);
/* We assume the data-loop is unlocked now and can process our
* clean function. This is reasonable, the cleanup function is run when
* closing the client, which should join the data-thread. */
pw_data_loop_invoke(c->loop, do_clean_transport, 1, NULL, 0, true, c);
spa_list_consume(l, &c->links, link) {
spa_list_remove(&l->link);
free_link(l);
}
c->has_transport = false;
}
@ -1802,7 +1916,7 @@ static int update_driver_activation(struct client *c)
jack_drop_real_time_scheduling(thr);
}
do_callback(c, freewheel_callback, c->active, freewheeling, c->freewheel_arg);
queue_notify(c, NOTIFY_TYPE_FREEWHEEL, NULL, freewheeling, NULL);
if (!freewheeling && thr) {
jack_acquire_real_time_scheduling(thr,
@ -2150,6 +2264,26 @@ static void port_update_latency(struct port *p)
p->info.change_mask = 0;
}
static void port_check_latency(struct port *p, const struct spa_latency_info *latency)
{
struct spa_latency_info *current;
struct client *c = p->client;
struct object *o = p->object;
current = &o->port.latency[latency->direction];
if (spa_latency_info_compare(current, latency) == 0)
return;
*current = *latency;
pw_log_info("%p: %s update %s latency %f-%f %d-%d %"PRIu64"-%"PRIu64, c,
o->port.name,
latency->direction == SPA_DIRECTION_INPUT ? "playback" : "capture",
latency->min_quantum, latency->max_quantum,
latency->min_rate, latency->max_rate,
latency->min_ns, latency->max_ns);
port_update_latency(p);
}
/* called from thread-loop */
static void default_latency(struct client *c, enum spa_direction direction,
struct spa_latency_info *latency)
@ -2175,9 +2309,9 @@ static void default_latency(struct client *c, enum spa_direction direction,
/* called from thread-loop */
static void default_latency_callback(jack_latency_callback_mode_t mode, struct client *c)
{
struct spa_latency_info latency, *current;
enum spa_direction direction;
struct spa_latency_info latency;
union pw_map_item *item;
enum spa_direction direction;
struct port *p;
if (mode == JackPlaybackLatency)
@ -2187,21 +2321,11 @@ static void default_latency_callback(jack_latency_callback_mode_t mode, struct c
default_latency(c, direction, &latency);
pw_log_info("client %p: update %s latency %f-%f %d-%d %"PRIu64"-%"PRIu64, c,
latency.direction == SPA_DIRECTION_INPUT ? "playback" : "capture",
latency.min_quantum, latency.max_quantum,
latency.min_rate, latency.max_rate,
latency.min_ns, latency.max_ns);
pw_array_for_each(item, &c->ports[direction].items) {
if (pw_map_item_is_free(item))
continue;
p = item->data;
current = &p->object->port.latency[direction];
if (spa_latency_info_compare(current, &latency) == 0)
continue;
*current = latency;
port_update_latency(p);
port_check_latency(p, &latency);
}
}
@ -2241,7 +2365,7 @@ static int port_set_latency(struct client *c, struct port *p,
mode = JackCaptureLatency;
if (c->latency_callback)
do_callback(c, latency_callback, c->active, mode, c->latency_arg);
queue_notify(c, NOTIFY_TYPE_LATENCY, NULL, mode, NULL);
else
default_latency_callback(mode, c);
@ -2520,6 +2644,17 @@ do_activate_link(struct spa_loop *loop,
return 0;
}
static int
do_deactivate_link(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct link *link = user_data;
pw_log_trace("link %p activate", link);
spa_list_remove(&link->target_link);
free_link(link);
return 0;
}
static int client_node_set_activation(void *data,
uint32_t node_id,
int signalfd,
@ -2580,7 +2715,10 @@ static int client_node_set_activation(void *data,
res = -EINVAL;
goto exit;
}
clear_link(c, link);
spa_list_remove(&link->link);
pw_data_loop_invoke(c->loop,
do_deactivate_link, SPA_ID_INVALID, NULL, 0, false, link);
}
if (c->driver_id == node_id)
@ -2639,10 +2777,8 @@ static int client_node_port_set_mix_info(void *data,
pw_log_info("%p: our link %u/%u -> %u/%u completed", c,
l->port_link.src, l->port_link.src_serial,
l->port_link.dst, l->port_link.dst_serial);
l->register_pending = true;
l->register_arg = 1;
c->graph_callback_pending = true;
c->pending_callbacks = true;
queue_notify(c, NOTIFY_TYPE_CONNECT, l, 1, NULL);
queue_notify(c, NOTIFY_TYPE_GRAPH, NULL, 0, NULL);
emit_callbacks(c);
}
}
@ -3021,6 +3157,7 @@ static void registry_event_global(void *data, uint32_t id,
if (id == c->node_id) {
pw_log_debug("%p: add our node %d", c, id);
snprintf(c->name, sizeof(c->name), "%s", o->node.name);
c->object = o;
c->serial = serial;
}
@ -3252,17 +3389,13 @@ static void registry_event_global(void *data, uint32_t id,
case INTERFACE_Node:
if (is_first) {
pw_log_info("%p: client added \"%s\"", c, o->node.name);
o->register_pending = true;
o->register_arg = 1;
c->pending_callbacks = true;
queue_notify(c, NOTIFY_TYPE_REGISTRATION, o, 1, NULL);
}
break;
case INTERFACE_Port:
pw_log_info("%p: port added %u/%u \"%s\"", c, o->id, o->serial, o->port.name);
o->register_pending = true;
o->register_arg = 1;
c->pending_callbacks = true;
queue_notify(c, NOTIFY_TYPE_PORTREGISTRATION, o, 1, NULL);
break;
case INTERFACE_Link:
@ -3271,10 +3404,8 @@ static void registry_event_global(void *data, uint32_t id,
o->port_link.dst, o->port_link.dst_serial,
o->port_link.is_complete);
if (o->port_link.is_complete) {
o->register_pending = true;
o->register_arg = 1;
c->graph_callback_pending = true;
c->pending_callbacks = true;
queue_notify(c, NOTIFY_TYPE_CONNECT, o, 1, NULL);
queue_notify(c, NOTIFY_TYPE_GRAPH, NULL, 0, NULL);
}
break;
}
@ -3302,7 +3433,6 @@ static void registry_event_global_remove(void *data, uint32_t id)
o->proxy = NULL;
}
o->removing = true;
c->pending_callbacks = true;
switch (o->type) {
case INTERFACE_Node:
@ -3314,14 +3444,14 @@ static void registry_event_global_remove(void *data, uint32_t id)
}
if (find_node(c, o->node.name) == NULL) {
pw_log_info("%p: client %u removed \"%s\"", c, o->id, o->node.name);
o->register_pending = true;
o->register_arg = 0;
queue_notify(c, NOTIFY_TYPE_REGISTRATION, o, 0, NULL);
} else {
free_object(c, o);
}
break;
case INTERFACE_Port:
pw_log_info("%p: port %u/%u removed \"%s\"", c, o->id, o->serial, o->port.name);
o->register_pending = true;
o->register_arg = 0;
queue_notify(c, NOTIFY_TYPE_PORTREGISTRATION, o, 0, NULL);
break;
case INTERFACE_Link:
if (o->port_link.is_complete &&
@ -3331,12 +3461,13 @@ static void registry_event_global_remove(void *data, uint32_t id)
o->port_link.src, o->port_link.src_serial,
o->port_link.dst, o->port_link.dst_serial);
o->port_link.is_complete = false;
o->register_pending = true;
o->register_arg = 0;
c->graph_callback_pending = true;
} else
queue_notify(c, NOTIFY_TYPE_CONNECT, o, 0, NULL);
queue_notify(c, NOTIFY_TYPE_GRAPH, NULL, 0, NULL);
} else {
pw_log_warn("unlink between unknown ports %d and %d",
o->port_link.src, o->port_link.dst);
free_object(c, o);
}
break;
}
emit_callbacks(c);
@ -3454,8 +3585,10 @@ jack_client_t * jack_client_open (const char *client_name,
if (client->context.context == NULL)
goto no_props;
client->notify_event = pw_loop_add_event(client->context.l,
notify_event, client);
client->notify_source = pw_loop_add_event(client->context.l,
on_notify_event, client);
client->notify_buffer = calloc(1, NOTIFY_BUFFER_SIZE + sizeof(struct notify));
spa_ringbuffer_init(&client->notify_ring);
client->allow_mlock = client->context.context->settings.mem_allow_mlock;
client->warn_mlock = client->context.context->settings.mem_warn_mlock;
@ -3693,8 +3826,11 @@ int jack_client_close (jack_client_t *client)
clean_transport(c);
if (c->context.loop)
if (c->context.loop) {
queue_notify(c, NOTIFY_TYPE_REGISTRATION, c->object, 0, NULL);
pw_loop_invoke(c->context.l, NULL, 0, NULL, 0, false, c);
pw_thread_loop_stop(c->context.loop);
}
if (c->registry) {
spa_hook_remove(&c->registry_listener);
@ -3715,8 +3851,9 @@ int jack_client_close (jack_client_t *client)
if (c->context.context)
pw_context_destroy(c->context.context);
if (c->notify_event)
pw_loop_destroy_source(c->context.l, c->notify_event);
if (c->notify_source)
pw_loop_destroy_source(c->context.l, c->notify_source);
free(c->notify_buffer);
if (c->context.loop)
pw_thread_loop_destroy(c->context.loop);
@ -3913,14 +4050,13 @@ int jack_activate (jack_client_t *client)
if (o->type != INTERFACE_Port || o->port.port == NULL ||
o->port.port->client != c || !o->port.port->valid)
continue;
o->register_pending = true;
o->register_arg = 1;
c->pending_callbacks = true;
queue_notify(c, NOTIFY_TYPE_PORTREGISTRATION, o, 1, NULL);
}
done:
if (res < 0)
pw_data_loop_stop(c->loop);
pw_log_debug("%p: activate result:%d", c, res);
thaw_callbacks(c);
pw_thread_loop_unlock(c->context.loop);
@ -3943,15 +4079,11 @@ int jack_deactivate (jack_client_t *client)
pw_thread_loop_lock(c->context.loop);
freeze_callbacks(c);
c->active = false;
pw_data_loop_stop(c->loop);
pw_client_node_set_active(c->node, false);
c->activation->pending_new_pos = false;
c->activation->pending_sync = false;
spa_list_for_each(o, &c->context.objects, link) {
if (o->type != INTERFACE_Link || o->removed)
continue;
@ -3963,10 +4095,13 @@ int jack_deactivate (jack_client_t *client)
if (o->type != INTERFACE_Port || o->port.port == NULL ||
o->port.port->client != c || !o->port.port->valid)
continue;
o->register_pending = true;
o->register_arg = 0;
c->pending_callbacks = true;
queue_notify(c, NOTIFY_TYPE_PORTREGISTRATION, o, 0, NULL);
}
c->activation->pending_new_pos = false;
c->activation->pending_sync = false;
c->active = false;
res = do_sync(c);
thaw_callbacks(c);
@ -4608,12 +4743,24 @@ error_free:
return NULL;
}
static int
do_free_port(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct port *p = user_data;
struct client *c = p->client;
free_port(c, p, !c->active);
return 0;
}
static int
do_invalidate_port(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct port *p = user_data;
struct client *c = p->client;
p->valid = false;
pw_loop_invoke(c->context.l, do_free_port, 0, NULL, 0, false, p);
return 0;
}
@ -4638,8 +4785,7 @@ int jack_port_unregister (jack_client_t *client, jack_port_t *port)
res = -EINVAL;
goto done;
}
pw_data_loop_invoke(c->loop,
do_invalidate_port, 1, NULL, 0, !c->data_locked, p);
pw_data_loop_invoke(c->loop, do_invalidate_port, 1, NULL, 0, false, p);
pw_log_info("%p: port %p unregister \"%s\"", client, port, o->port.name);
@ -4653,7 +4799,6 @@ int jack_port_unregister (jack_client_t *client, jack_port_t *port)
pw_log_warn("can't unregister port %s: %s", o->port.name,
spa_strerror(res));
}
free_port(c, p, !c->active);
done:
thaw_callbacks(c);
pw_thread_loop_unlock(c->context.loop);
@ -5563,11 +5708,12 @@ void jack_port_get_latency_range (jack_port_t *port, jack_latency_callback_mode_
}
static int
do_port_update_latency(struct spa_loop *loop,
do_port_check_latency(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct port *p = user_data;
port_update_latency(p);
const struct spa_latency_info *latency = data;
port_check_latency(p, latency);
return 0;
}
@ -5577,7 +5723,7 @@ void jack_port_set_latency_range (jack_port_t *port, jack_latency_callback_mode_
struct object *o = (struct object *) port;
struct client *c;
enum spa_direction direction;
struct spa_latency_info *current, latency;
struct spa_latency_info latency;
jack_nframes_t nframes;
struct port *p;
@ -5611,43 +5757,19 @@ void jack_port_set_latency_range (jack_port_t *port, jack_latency_callback_mode_
latency.max_rate %= nframes;
}
current = &o->port.latency[direction];
if ((p = o->port.port) == NULL)
return;
if (spa_latency_info_compare(current, &latency) == 0)
return;
pw_log_info("%p: %s update %s latency %f-%f %d-%d %"PRIu64"-%"PRIu64, c,
o->port.name,
latency.direction == SPA_DIRECTION_INPUT ? "playback" : "capture",
latency.min_quantum, latency.max_quantum,
latency.min_rate, latency.max_rate,
latency.min_ns, latency.max_ns);
*current = latency;
pw_loop_invoke(c->context.l, do_port_update_latency, 0,
NULL, 0, false, p);
}
static int
do_recompute_latencies(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct client *c = user_data;
pw_log_debug("start");
recompute_latencies(c);
pw_log_debug("stop");
return 0;
pw_loop_invoke(c->context.l, do_port_check_latency, 0,
&latency, sizeof(latency), false, p);
}
SPA_EXPORT
int jack_recompute_total_latencies (jack_client_t *client)
{
struct client *c = (struct client *) client;
pw_loop_invoke(c->context.l, do_recompute_latencies, 0,
NULL, 0, false, c);
queue_notify(c, NOTIFY_TYPE_LATENCY, NULL, JackCaptureLatency, NULL);
queue_notify(c, NOTIFY_TYPE_LATENCY, NULL, JackPlaybackLatency, NULL);
return 0;
}