jack: use our own per client loop, like jack.

This commit is contained in:
Wim Taymans 2019-09-09 17:16:18 +02:00
parent 7f7ed1e039
commit 2736c227a1

View file

@ -40,6 +40,7 @@
#include <pipewire/pipewire.h> #include <pipewire/pipewire.h>
#include <pipewire/private.h> #include <pipewire/private.h>
#include <pipewire/data-loop.h>
#include "extensions/client-node.h" #include "extensions/client-node.h"
@ -216,6 +217,8 @@ struct client {
struct context context; struct context context;
struct pw_data_loop *loop;
struct pw_remote *remote; struct pw_remote *remote;
struct spa_hook remote_listener; struct spa_hook remote_listener;
@ -292,6 +295,7 @@ struct client {
unsigned int active:1; unsigned int active:1;
unsigned int destroyed:1; unsigned int destroyed:1;
unsigned int first:1; unsigned int first:1;
unsigned int thread_entered:1;
jack_position_t jack_position; jack_position_t jack_position;
jack_transport_state_t jack_state; jack_transport_state_t jack_state;
@ -607,7 +611,7 @@ do_remove_sources(struct spa_loop *loop,
struct client *c = user_data; struct client *c = user_data;
if (c->socket_source) { if (c->socket_source) {
pw_loop_destroy_source(c->context.core->data_loop, c->socket_source); pw_loop_destroy_source(c->loop->loop, c->socket_source);
c->socket_source = NULL; c->socket_source = NULL;
} }
return 0; return 0;
@ -615,7 +619,7 @@ do_remove_sources(struct spa_loop *loop,
static void unhandle_socket(struct client *c) static void unhandle_socket(struct client *c)
{ {
pw_loop_invoke(c->context.core->data_loop, pw_loop_invoke(c->loop->loop,
do_remove_sources, 1, NULL, 0, true, c); do_remove_sources, 1, NULL, 0, true, c);
} }
@ -810,41 +814,32 @@ static inline jack_transport_state_t position_to_jack(struct pw_node_activation
return state; return state;
} }
static inline int wait_sync(struct client *c) static inline uint32_t cycle_run(struct client *c)
{ {
uint64_t cmd, nsec; uint64_t cmd, nsec;
int fd = c->socket_source->fd; int fd = c->socket_source->fd;
struct spa_io_position *pos = c->position;
struct pw_node_activation *activation = c->activation;
/* this is blocking if nothing ready */
if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd))
pw_log_warn(NAME" %p: read failed %m", c);
if (cmd > 1)
pw_log_warn(NAME" %p: missed %"PRIu64" wakeups", c, cmd - 1);
if (pos == NULL) {
pw_log_error(NAME" %p: missing position", c);
return -1;
}
nsec = pos->clock.nsec;
activation->status = PW_NODE_ACTIVATION_AWAKE;
activation->awake_time = nsec;
return 0;
}
static inline uint32_t cycle_wait(struct client *c)
{
uint32_t buffer_size, sample_rate; uint32_t buffer_size, sample_rate;
struct spa_io_position *pos = c->position; struct spa_io_position *pos = c->position;
struct pw_node_activation *activation = c->activation; struct pw_node_activation *activation = c->activation;
struct pw_node_activation *driver = c->driver_activation; struct pw_node_activation *driver = c->driver_activation;
if (wait_sync(c) < 0) /* this is blocking if nothing ready */
if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) {
pw_log_warn(NAME" %p: read failed %m", c);
if (errno == EWOULDBLOCK)
return 0; return 0;
}
if (cmd > 1)
pw_log_warn(NAME" %p: missed %"PRIu64" wakeups", c, cmd - 1);
if (pos == NULL) {
pw_log_error(NAME" %p: missing position", c);
return 0;
}
nsec = pos->clock.nsec;
activation->status = PW_NODE_ACTIVATION_AWAKE;
activation->awake_time = nsec;
if (c->first) { if (c->first) {
if (c->thread_init_callback) if (c->thread_init_callback)
c->thread_init_callback(c->thread_init_arg); c->thread_init_callback(c->thread_init_arg);
@ -887,6 +882,18 @@ static inline uint32_t cycle_wait(struct client *c)
return buffer_size; return buffer_size;
} }
static inline uint32_t cycle_wait(struct client *c)
{
int res;
res = pw_data_loop_wait(c->loop, -1);
if (res <= 0) {
pw_log_warn(NAME" %p: wait error %m", c);
return 0;
}
return cycle_run(c);
}
static inline void signal_sync(struct client *c) static inline void signal_sync(struct client *c)
{ {
struct timespec ts; struct timespec ts;
@ -962,15 +969,16 @@ on_rtsocket_condition(void *data, int fd, uint32_t mask)
return; return;
} }
if (c->thread_callback) { if (c->thread_callback) {
if (!c->thread_entered) {
c->thread_entered = true;
c->thread_callback(c->thread_arg); c->thread_callback(c->thread_arg);
return;
} }
return;
if (mask & SPA_IO_IN) { } else if (mask & SPA_IO_IN) {
uint32_t buffer_size; uint32_t buffer_size;
int status; int status;
buffer_size = cycle_wait(c); buffer_size = cycle_run(c);
status = c->process_callback ? c->process_callback(buffer_size, c->process_arg) : 0; status = c->process_callback ? c->process_callback(buffer_size, c->process_arg) : 0;
@ -993,6 +1001,8 @@ static void clean_transport(struct client *c)
if (c->node_id == SPA_ID_INVALID) if (c->node_id == SPA_ID_INVALID)
return; return;
pw_data_loop_stop(c->loop);
unhandle_socket(c); unhandle_socket(c);
pw_array_for_each(l, &c->links) pw_array_for_each(l, &c->links)
@ -1009,7 +1019,6 @@ static int client_node_transport(void *object,
uint32_t mem_id, uint32_t offset, uint32_t size) uint32_t mem_id, uint32_t offset, uint32_t size)
{ {
struct client *c = (struct client *) object; struct client *c = (struct client *) object;
struct pw_core *core = c->context.core;
clean_transport(c); clean_transport(c);
@ -1027,7 +1036,7 @@ static int client_node_transport(void *object,
c, readfd, writefd, node_id); c, readfd, writefd, node_id);
close(writefd); close(writefd);
c->socket_source = pw_loop_add_io(core->data_loop, c->socket_source = pw_loop_add_io(c->loop->loop,
readfd, readfd,
SPA_IO_ERR | SPA_IO_HUP, SPA_IO_ERR | SPA_IO_HUP,
true, on_rtsocket_condition, c); true, on_rtsocket_condition, c);
@ -1110,7 +1119,7 @@ static int client_node_command(void *object, const struct spa_command *command)
switch (SPA_NODE_COMMAND_ID(command)) { switch (SPA_NODE_COMMAND_ID(command)) {
case SPA_NODE_COMMAND_Pause: case SPA_NODE_COMMAND_Pause:
if (c->started) { if (c->started) {
pw_loop_update_io(c->context.core->data_loop, pw_loop_update_io(c->loop->loop,
c->socket_source, SPA_IO_ERR | SPA_IO_HUP); c->socket_source, SPA_IO_ERR | SPA_IO_HUP);
c->started = false; c->started = false;
@ -1119,11 +1128,12 @@ static int client_node_command(void *object, const struct spa_command *command)
case SPA_NODE_COMMAND_Start: case SPA_NODE_COMMAND_Start:
if (!c->started) { if (!c->started) {
pw_loop_update_io(c->context.core->data_loop, pw_loop_update_io(c->loop->loop,
c->socket_source, c->socket_source,
SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP); SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP);
c->started = true; c->started = true;
c->first = true; c->first = true;
c->thread_entered = false;
} }
break; break;
default: default:
@ -1808,6 +1818,8 @@ static void registry_event_global(void *data, uint32_t id,
pw_map_insert_at(&c->context.globals, size++, NULL); pw_map_insert_at(&c->context.globals, size++, NULL);
pw_map_insert_at(&c->context.globals, id, o); pw_map_insert_at(&c->context.globals, id, o);
pw_thread_loop_unlock(c->context.loop);
switch (type) { switch (type) {
case PW_TYPE_INTERFACE_Node: case PW_TYPE_INTERFACE_Node:
if (c->registration_callback) if (c->registration_callback)
@ -1824,6 +1836,8 @@ static void registry_event_global(void *data, uint32_t id,
c->connect_callback(o->port_link.src, o->port_link.dst, 1, c->connect_arg); c->connect_callback(o->port_link.src, o->port_link.dst, 1, c->connect_arg);
break; break;
} }
pw_thread_loop_lock(c->context.loop);
exit: exit:
return; return;
exit_free: exit_free:
@ -1842,6 +1856,8 @@ static void registry_event_global_remove(void *object, uint32_t id)
if (o == NULL) if (o == NULL)
return; return;
pw_thread_loop_unlock(c->context.loop);
switch (o->type) { switch (o->type) {
case PW_TYPE_INTERFACE_Node: case PW_TYPE_INTERFACE_Node:
if (c->registration_callback) if (c->registration_callback)
@ -1856,6 +1872,7 @@ static void registry_event_global_remove(void *object, uint32_t id)
c->connect_callback(o->port_link.src, o->port_link.dst, 0, c->connect_arg); c->connect_callback(o->port_link.src, o->port_link.dst, 0, c->connect_arg);
break; break;
} }
pw_thread_loop_lock(c->context.loop);
/* JACK clients expect the objects to hang around after /* JACK clients expect the objects to hang around after
* they are unregistered. We keep them in the map but reuse the * they are unregistered. We keep them in the map but reuse the
@ -1919,6 +1936,10 @@ jack_client_t * jack_client_open (const char *client_name,
#endif #endif
} }
client->loop = pw_data_loop_new(NULL);
if (client->loop == NULL)
goto init_failed;
pw_array_init(&client->links, 64); pw_array_init(&client->links, 64);
client->buffer_size = (uint32_t)-1; client->buffer_size = (uint32_t)-1;
@ -2148,6 +2169,8 @@ static int do_activate(struct client *c)
{ {
int res; int res;
pw_data_loop_start(c->loop);
pw_thread_loop_lock(c->context.loop); pw_thread_loop_lock(c->context.loop);
pw_log_debug(NAME" %p: activate", c); pw_log_debug(NAME" %p: activate", c);
pw_client_node_proxy_set_active(c->node_proxy, true); pw_client_node_proxy_set_active(c->node_proxy, true);
@ -2174,7 +2197,6 @@ int jack_activate (jack_client_t *client)
c->activation->pending_sync = true; c->activation->pending_sync = true;
c->active = true; c->active = true;
return 0; return 0;
} }
@ -2198,6 +2220,8 @@ int jack_deactivate (jack_client_t *client)
pw_thread_loop_unlock(c->context.loop); pw_thread_loop_unlock(c->context.loop);
pw_data_loop_stop(c->loop);
if (res < 0) if (res < 0)
return res; return res;