From 2736c227a19b1db48dd849af6f7f3e929bb53c00 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 9 Sep 2019 17:16:18 +0200 Subject: [PATCH] jack: use our own per client loop, like jack. --- src/pipewire-jack.c | 100 +++++++++++++++++++++++++++----------------- 1 file changed, 62 insertions(+), 38 deletions(-) diff --git a/src/pipewire-jack.c b/src/pipewire-jack.c index a2d3f1704..32e7fd6c4 100644 --- a/src/pipewire-jack.c +++ b/src/pipewire-jack.c @@ -40,6 +40,7 @@ #include #include +#include #include "extensions/client-node.h" @@ -216,6 +217,8 @@ struct client { struct context context; + struct pw_data_loop *loop; + struct pw_remote *remote; struct spa_hook remote_listener; @@ -292,6 +295,7 @@ struct client { unsigned int active:1; unsigned int destroyed:1; unsigned int first:1; + unsigned int thread_entered:1; jack_position_t jack_position; jack_transport_state_t jack_state; @@ -607,7 +611,7 @@ do_remove_sources(struct spa_loop *loop, struct client *c = user_data; if (c->socket_source) { - pw_loop_destroy_source(c->context.core->data_loop, c->socket_source); + pw_loop_destroy_source(c->loop->loop, c->socket_source); c->socket_source = NULL; } return 0; @@ -615,7 +619,7 @@ do_remove_sources(struct spa_loop *loop, static void unhandle_socket(struct client *c) { - pw_loop_invoke(c->context.core->data_loop, + pw_loop_invoke(c->loop->loop, do_remove_sources, 1, NULL, 0, true, c); } @@ -810,41 +814,32 @@ static inline jack_transport_state_t position_to_jack(struct pw_node_activation return state; } -static inline int wait_sync(struct client *c) +static inline uint32_t cycle_run(struct client *c) { uint64_t cmd, nsec; int fd = c->socket_source->fd; - struct spa_io_position *pos = c->position; - struct pw_node_activation *activation = c->activation; - - /* this is blocking if nothing ready */ - if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) - pw_log_warn(NAME" %p: read failed %m", c); - if (cmd > 1) - pw_log_warn(NAME" %p: missed %"PRIu64" wakeups", c, cmd - 1); - - if (pos == NULL) { - pw_log_error(NAME" %p: missing position", c); - return -1; - } - - nsec = pos->clock.nsec; - activation->status = PW_NODE_ACTIVATION_AWAKE; - activation->awake_time = nsec; - - return 0; -} - -static inline uint32_t cycle_wait(struct client *c) -{ uint32_t buffer_size, sample_rate; struct spa_io_position *pos = c->position; struct pw_node_activation *activation = c->activation; struct pw_node_activation *driver = c->driver_activation; - if (wait_sync(c) < 0) - return 0; + /* this is blocking if nothing ready */ + if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) { + pw_log_warn(NAME" %p: read failed %m", c); + if (errno == EWOULDBLOCK) + return 0; + } + if (cmd > 1) + pw_log_warn(NAME" %p: missed %"PRIu64" wakeups", c, cmd - 1); + if (pos == NULL) { + pw_log_error(NAME" %p: missing position", c); + return 0; + } + + nsec = pos->clock.nsec; + activation->status = PW_NODE_ACTIVATION_AWAKE; + activation->awake_time = nsec; if (c->first) { if (c->thread_init_callback) c->thread_init_callback(c->thread_init_arg); @@ -887,6 +882,18 @@ static inline uint32_t cycle_wait(struct client *c) return buffer_size; } +static inline uint32_t cycle_wait(struct client *c) +{ + int res; + + res = pw_data_loop_wait(c->loop, -1); + if (res <= 0) { + pw_log_warn(NAME" %p: wait error %m", c); + return 0; + } + return cycle_run(c); +} + static inline void signal_sync(struct client *c) { struct timespec ts; @@ -962,15 +969,16 @@ on_rtsocket_condition(void *data, int fd, uint32_t mask) return; } if (c->thread_callback) { - c->thread_callback(c->thread_arg); + if (!c->thread_entered) { + c->thread_entered = true; + c->thread_callback(c->thread_arg); + } return; - } - - if (mask & SPA_IO_IN) { + } else if (mask & SPA_IO_IN) { uint32_t buffer_size; int status; - buffer_size = cycle_wait(c); + buffer_size = cycle_run(c); status = c->process_callback ? c->process_callback(buffer_size, c->process_arg) : 0; @@ -993,6 +1001,8 @@ static void clean_transport(struct client *c) if (c->node_id == SPA_ID_INVALID) return; + pw_data_loop_stop(c->loop); + unhandle_socket(c); pw_array_for_each(l, &c->links) @@ -1009,7 +1019,6 @@ static int client_node_transport(void *object, uint32_t mem_id, uint32_t offset, uint32_t size) { struct client *c = (struct client *) object; - struct pw_core *core = c->context.core; clean_transport(c); @@ -1027,7 +1036,7 @@ static int client_node_transport(void *object, c, readfd, writefd, node_id); close(writefd); - c->socket_source = pw_loop_add_io(core->data_loop, + c->socket_source = pw_loop_add_io(c->loop->loop, readfd, SPA_IO_ERR | SPA_IO_HUP, true, on_rtsocket_condition, c); @@ -1110,7 +1119,7 @@ static int client_node_command(void *object, const struct spa_command *command) switch (SPA_NODE_COMMAND_ID(command)) { case SPA_NODE_COMMAND_Pause: if (c->started) { - pw_loop_update_io(c->context.core->data_loop, + pw_loop_update_io(c->loop->loop, c->socket_source, SPA_IO_ERR | SPA_IO_HUP); c->started = false; @@ -1119,11 +1128,12 @@ static int client_node_command(void *object, const struct spa_command *command) case SPA_NODE_COMMAND_Start: if (!c->started) { - pw_loop_update_io(c->context.core->data_loop, + pw_loop_update_io(c->loop->loop, c->socket_source, SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP); c->started = true; c->first = true; + c->thread_entered = false; } break; default: @@ -1808,6 +1818,8 @@ static void registry_event_global(void *data, uint32_t id, pw_map_insert_at(&c->context.globals, size++, NULL); pw_map_insert_at(&c->context.globals, id, o); + pw_thread_loop_unlock(c->context.loop); + switch (type) { case PW_TYPE_INTERFACE_Node: if (c->registration_callback) @@ -1824,6 +1836,8 @@ static void registry_event_global(void *data, uint32_t id, c->connect_callback(o->port_link.src, o->port_link.dst, 1, c->connect_arg); break; } + pw_thread_loop_lock(c->context.loop); + exit: return; exit_free: @@ -1842,6 +1856,8 @@ static void registry_event_global_remove(void *object, uint32_t id) if (o == NULL) return; + pw_thread_loop_unlock(c->context.loop); + switch (o->type) { case PW_TYPE_INTERFACE_Node: if (c->registration_callback) @@ -1856,6 +1872,7 @@ static void registry_event_global_remove(void *object, uint32_t id) c->connect_callback(o->port_link.src, o->port_link.dst, 0, c->connect_arg); break; } + pw_thread_loop_lock(c->context.loop); /* JACK clients expect the objects to hang around after * they are unregistered. We keep them in the map but reuse the @@ -1919,6 +1936,10 @@ jack_client_t * jack_client_open (const char *client_name, #endif } + client->loop = pw_data_loop_new(NULL); + if (client->loop == NULL) + goto init_failed; + pw_array_init(&client->links, 64); client->buffer_size = (uint32_t)-1; @@ -2148,6 +2169,8 @@ static int do_activate(struct client *c) { int res; + pw_data_loop_start(c->loop); + pw_thread_loop_lock(c->context.loop); pw_log_debug(NAME" %p: activate", c); pw_client_node_proxy_set_active(c->node_proxy, true); @@ -2174,7 +2197,6 @@ int jack_activate (jack_client_t *client) c->activation->pending_sync = true; c->active = true; - return 0; } @@ -2198,6 +2220,8 @@ int jack_deactivate (jack_client_t *client) pw_thread_loop_unlock(c->context.loop); + pw_data_loop_stop(c->loop); + if (res < 0) return res;