diff --git a/src/pipewire-jack.c b/src/pipewire-jack.c index 425f74adf..272b7cf4a 100644 --- a/src/pipewire-jack.c +++ b/src/pipewire-jack.c @@ -161,6 +161,13 @@ struct io { uint32_t memid; }; +struct link { + uint32_t node_id; + uint32_t memid; + struct pw_node_activation *activation; + int signalfd; +}; + struct mix { struct spa_list link; struct spa_list port_link; @@ -239,7 +246,6 @@ struct client { struct spa_hook proxy_listener; uint32_t node_id; - int writefd; struct spa_source *socket_source; bool active; @@ -286,7 +292,10 @@ struct client { struct spa_list ports[2]; struct spa_list free_ports[2]; - struct pw_array mems; + struct pw_array mems; + struct pw_array links; + + struct pw_node_activation *activation; bool started; int status; @@ -576,6 +585,17 @@ static const struct pw_proxy_events proxy_events = { .destroy = on_node_proxy_destroy, }; +static struct link *find_activation(struct pw_array *links, uint32_t node_id) +{ + struct link *l; + + pw_array_for_each(l, links) { + if (l->node_id == node_id) + return l; + } + return NULL; +} + static struct mem *find_mem(struct pw_array *mems, uint32_t id) { struct mem *m; @@ -688,10 +708,6 @@ do_remove_sources(struct spa_loop *loop, pw_loop_destroy_source(c->context.core->data_loop, c->socket_source); c->socket_source = NULL; } - if (c->writefd != -1) { - close(c->writefd); - c->writefd = -1; - } return 0; } @@ -772,6 +788,7 @@ static void on_rtsocket_condition(void *data, int fd, enum spa_io mask) { struct client *c = data; + struct timespec ts; if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { pw_log_warn("got error"); @@ -783,6 +800,7 @@ on_rtsocket_condition(void *data, int fd, enum spa_io mask) uint64_t cmd, nsec, frame; int64_t delay; uint32_t buffer_size, sample_rate; + struct link *l; if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) pw_log_warn("jack %p: read failed %m", c); @@ -809,6 +827,11 @@ on_rtsocket_condition(void *data, int fd, enum spa_io mask) delay = 0; } + clock_gettime(CLOCK_MONOTONIC, &ts); + nsec = SPA_TIMESPEC_TO_NSEC(&ts); + c->activation->status = AWAKE; + c->activation->finish_time = nsec; + if (buffer_size != c->buffer_size) { pw_log_info("jack %p: buffersize %d", c, buffer_size); c->buffer_size = buffer_size; @@ -851,15 +874,37 @@ on_rtsocket_condition(void *data, int fd, enum spa_io mask) } process_tee(c); + clock_gettime(CLOCK_MONOTONIC, &ts); + nsec = SPA_TIMESPEC_TO_NSEC(&ts); + c->activation->status = FINISHED; + c->activation->finish_time = nsec; + cmd = 1; - if (write(c->writefd, &cmd, sizeof(cmd)) != sizeof(cmd)) - pw_log_warn("jack %p: write failed %m", c); + pw_array_for_each(l, &c->links) { + struct spa_graph_state *state; + + if (l->activation == NULL) + continue; + + state = &l->activation->state[0]; + + pw_log_trace("link %p %p %d/%d", l, state, state->pending, state->required); + if (spa_graph_state_dec(state, 1)) { + l->activation->status = TRIGGERED; + l->activation->signal_time = nsec; + + pw_log_trace("signal %p %p", l, state); + if (write(l->signalfd, &cmd, sizeof(cmd)) != sizeof(cmd)) + pw_log_warn("jack %p: write failed %m", c); + } + } } } static void clean_transport(struct client *c) { struct mem *m; + struct link *l; if (c->node_id == SPA_ID_INVALID) return; @@ -869,8 +914,9 @@ static void clean_transport(struct client *c) pw_array_for_each(m, &c->mems) clear_mem(c, m); pw_array_clear(&c->mems); - - close(c->writefd); + pw_array_for_each(l, &c->links) + close(l->signalfd); + pw_array_clear(&c->links); c->node_id = SPA_ID_INVALID; } @@ -890,8 +936,8 @@ static void client_node_transport(void *object, pw_log_debug("client %p: create client transport with fds %d %d for node %u", c, readfd, writefd, node_id); - c->writefd = writefd; - c->socket_source = pw_loop_add_io(core->data_loop, + close(writefd); + c->socket_source = pw_loop_add_io(core->data_loop, readfd, SPA_IO_ERR | SPA_IO_HUP, true, on_rtsocket_condition, c); @@ -1458,6 +1504,63 @@ static void client_node_port_set_io(void *object, pw_client_node_proxy_done(c->node_proxy, seq, res); } +static void client_node_set_activation(void *object, + uint32_t node_id, + int signalfd, + uint32_t mem_id, + uint32_t offset, + uint32_t size) +{ + struct client *c = (struct client *) object; + struct mem *m; + struct link *link; + void *ptr; + + if (mem_id == SPA_ID_INVALID) { + ptr = NULL; + size = 0; + } + else { + m = find_mem(&c->mems, mem_id); + if (m == NULL) { + pw_log_warn("unknown memory id %u", mem_id); + return; + } + if ((ptr = mem_map(c, m, offset, size)) == NULL) { + return; + } + m->ref++; + } + + pw_log_debug("node %p: set activation %u: %u %u %u %p", c, node_id, + mem_id, offset, size, ptr); + + if (c->node_id == node_id) { + pw_log_debug("node %p: our activation %u: %u %u %u %p", c, node_id, + mem_id, offset, size, ptr); + if (ptr) + c->activation = ptr; + return; + } + + if (ptr) { + link = pw_array_add(&c->links, sizeof(struct link)); + link->node_id = node_id; + link->memid = mem_id; + link->activation = ptr; + link->signalfd = signalfd; + } + else { + link = find_activation(&c->links, node_id); + if (link == NULL) + return; + + link->node_id = SPA_ID_INVALID; + link->activation = NULL; + close(link->signalfd); + } +} + static const struct pw_client_node_proxy_events client_node_events = { PW_VERSION_CLIENT_NODE_PROXY_EVENTS, .add_mem = client_node_add_mem, @@ -1472,6 +1575,7 @@ static const struct pw_client_node_proxy_events client_node_events = { .port_use_buffers = client_node_port_use_buffers, .port_command = client_node_port_command, .port_set_io = client_node_port_set_io, + .set_activation = client_node_set_activation, }; static jack_port_type_id_t string_to_type(const char *port_type) @@ -1725,8 +1829,9 @@ jack_client_t * jack_client_open (const char *client_name, #endif } - pw_array_init(&client->mems, 64); - pw_array_ensure_size(&client->mems, sizeof(struct mem) * 64); + pw_array_init(&client->mems, 64); + pw_array_ensure_size(&client->mems, sizeof(struct mem) * 64); + pw_array_init(&client->links, 64); client->buffer_size = (uint32_t)-1; client->sample_rate = (uint32_t)-1;