jack-tunnel: run graph from JACK thread

Stop our own data-loop and enter/iterate/leave it from the jack thread.
This runs all our nodes in the JACK thread and removes 2 context
switches (jack to and from pw thread).

We can possibly do this nicer by only pushing our own streams onto a
new custom data-loop but that's for later.
This commit is contained in:
Wim Taymans 2023-04-28 17:06:29 +02:00
parent 2112216d28
commit d89df144f0

View file

@ -13,7 +13,6 @@
#include <signal.h> #include <signal.h>
#include <limits.h> #include <limits.h>
#include <math.h> #include <math.h>
#include <semaphore.h>
#include "config.h" #include "config.h"
@ -114,9 +113,7 @@ static const struct spa_dict_item module_props[] = {
struct impl { struct impl {
struct pw_context *context; struct pw_context *context;
struct pw_loop *main_loop; struct pw_loop *main_loop;
struct spa_system *system; struct pw_data_loop *data_loop;
sem_t sem;
#define MODE_SINK (1<<0) #define MODE_SINK (1<<0)
#define MODE_SOURCE (1<<1) #define MODE_SOURCE (1<<1)
@ -160,12 +157,13 @@ struct impl {
uint32_t pw_xrun; uint32_t pw_xrun;
uint32_t jack_xrun; uint32_t jack_xrun;
uint32_t total_xrun;
unsigned int do_disconnect:1; unsigned int do_disconnect:1;
unsigned int source_running:1; unsigned int source_running:1;
unsigned int sink_running:1; unsigned int sink_running:1;
unsigned int done:1; unsigned int done:1;
unsigned int new_xrun:1; unsigned int running:1;
}; };
static void source_stream_destroy(void *d) static void source_stream_destroy(void *d)
@ -229,9 +227,9 @@ static void sink_stream_process(void *d)
} }
pw_stream_queue_buffer(impl->sink, buf); pw_stream_queue_buffer(impl->sink, buf);
done: done:
pw_log_trace_fp("done %u", impl->frames);
impl->done = true; impl->done = true;
sem_post(&impl->sem); impl->running = false;
pw_log_trace_fp("done %u", impl->frames);
} }
static void source_stream_process(void *d) static void source_stream_process(void *d)
@ -461,20 +459,28 @@ static int jack_process(jack_nframes_t nframes, void *arg)
impl->nframes = nframes; impl->nframes = nframes;
if (sink_running && source_running) { if (sink_running && source_running) {
pw_loop_enter(impl->data_loop->loop);
while (sem_trywait(&impl->sem) == 0); impl->running = true;
impl->done = false; impl->done = false;
/* clear some stray events */
while (pw_loop_iterate(impl->data_loop->loop, 0) > 0);
/* start processing the graph */
pw_stream_trigger_process(impl->sink); pw_stream_trigger_process(impl->sink);
sem_wait(&impl->sem); while (impl->running) {
if (!impl->done) { if (pw_loop_iterate(impl->data_loop->loop, 1000) <= 0)
impl->pw_xrun++; break;
impl->new_xrun = true;
} }
if (impl->new_xrun) { pw_loop_leave(impl->data_loop->loop);
if (!impl->done)
impl->pw_xrun++;
if (impl->total_xrun != impl->pw_xrun + impl->jack_xrun) {
pw_log_warn("Xrun JACK:%u PipeWire:%u", impl->jack_xrun, impl->pw_xrun); pw_log_warn("Xrun JACK:%u PipeWire:%u", impl->jack_xrun, impl->pw_xrun);
impl->new_xrun = false; impl->total_xrun = impl->pw_xrun + impl->jack_xrun;
} }
} }
pw_log_trace_fp("done %u", impl->frames); pw_log_trace_fp("done %u", impl->frames);
@ -485,11 +491,10 @@ static int jack_process(jack_nframes_t nframes, void *arg)
static int jack_xrun(void *arg) static int jack_xrun(void *arg)
{ {
struct impl *impl = arg; struct impl *impl = arg;
if (impl->done) { if (impl->done)
impl->jack_xrun++; impl->jack_xrun++;
impl->new_xrun = true; impl->running = false;
} pw_loop_invoke(impl->data_loop->loop, NULL, 1, NULL, 0, false, impl);
sem_post(&impl->sem);
return 0; return 0;
} }
@ -671,6 +676,8 @@ static int start_jack_clients(struct impl *impl)
const char **ports; const char **ports;
uint32_t i; uint32_t i;
pw_data_loop_stop(impl->data_loop);
jack_activate(impl->client); jack_activate(impl->client);
ports = jack_get_ports(impl->client, NULL, NULL, ports = jack_get_ports(impl->client, NULL, NULL,
@ -736,6 +743,7 @@ static void impl_destroy(struct impl *impl)
if (impl->client) { if (impl->client) {
jack_deactivate(impl->client); jack_deactivate(impl->client);
jack_client_close(impl->client); jack_client_close(impl->client);
pw_data_loop_start(impl->data_loop);
} }
if (impl->source) if (impl->source)
pw_stream_destroy(impl->source); pw_stream_destroy(impl->source);
@ -748,8 +756,6 @@ static void impl_destroy(struct impl *impl)
pw_properties_free(impl->source_props); pw_properties_free(impl->source_props);
pw_properties_free(impl->props); pw_properties_free(impl->props);
sem_destroy(&impl->sem);
free(impl); free(impl);
} }
@ -834,12 +840,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
pw_log_debug("module %p: new %s", impl, args); pw_log_debug("module %p: new %s", impl, args);
if (sem_init(&impl->sem, 0, 0) < 0) {
res = -errno;
pw_log_error( "can't create semaphore: %m");
goto error;
}
if (args == NULL) if (args == NULL)
args = ""; args = "";
@ -862,8 +862,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->module = module; impl->module = module;
impl->context = context; impl->context = context;
impl->main_loop = pw_context_get_main_loop(context); impl->main_loop = pw_context_get_main_loop(context);
impl->system = impl->main_loop->system; impl->data_loop = pw_context_get_data_loop(context);
impl->mode = MODE_DUPLEX; impl->mode = MODE_DUPLEX;
if ((str = pw_properties_get(props, "tunnel.mode")) != NULL) { if ((str = pw_properties_get(props, "tunnel.mode")) != NULL) {