loop: keep loop and system around

So that we don't have to go through the context all the time.
This commit is contained in:
Wim Taymans 2023-04-28 11:12:02 +02:00
parent 8f7acb717c
commit 472a948974
7 changed files with 80 additions and 60 deletions

View file

@ -3406,7 +3406,7 @@ jack_client_t * jack_client_open (const char *client_name,
SPA_VERSION_THREAD_UTILS,
&thread_utils_impl, client);
client->loop = client->context.context->data_loop_impl;
client->loop = pw_context_get_data_loop(client->context.context);
pw_data_loop_stop(client->loop);
pw_context_set_object(client->context.context,

View file

@ -47,6 +47,8 @@ struct mix {
struct node_data {
struct pw_context *context;
struct pw_loop *data_loop;
struct spa_system *data_system;
struct pw_mempool *pool;
@ -108,12 +110,11 @@ do_deactivate_link(struct spa_loop *loop,
static void clear_link(struct node_data *data, struct link *link)
{
struct pw_context *context = data->context;
pw_log_debug("link %p", link);
pw_loop_invoke(context->data_loop,
pw_loop_invoke(data->data_loop,
do_deactivate_link, SPA_ID_INVALID, NULL, 0, true, link);
pw_memmap_free(link->map);
spa_system_close(context->data_system, link->signalfd);
spa_system_close(data->data_system, link->signalfd);
spa_list_remove(&link->link);
free(link);
}
@ -140,7 +141,7 @@ static void clean_transport(struct node_data *data)
pw_memmap_free(data->activation);
data->node->rt.activation = data->node->activation->map->ptr;
spa_system_close(data->context->data_system, data->rtwritefd);
spa_system_close(data->data_system, data->rtwritefd);
data->have_transport = false;
}
@ -169,7 +170,7 @@ deactivate_mix(struct node_data *data, struct mix *mix)
{
if (mix->active) {
pw_log_debug("node %p: mix %p deactivate", data, mix);
pw_loop_invoke(data->context->data_loop,
pw_loop_invoke(data->data_loop,
do_deactivate_mix, SPA_ID_INVALID, NULL, 0, true, mix);
mix->active = false;
}
@ -191,7 +192,7 @@ activate_mix(struct node_data *data, struct mix *mix)
{
if (!mix->active) {
pw_log_debug("node %p: mix %p activate", data, mix);
pw_loop_invoke(data->context->data_loop,
pw_loop_invoke(data->data_loop,
do_activate_mix, SPA_ID_INVALID, NULL, 0, false, mix);
mix->active = true;
}
@ -263,7 +264,7 @@ static int client_node_transport(void *_data,
proxy, readfd, writefd, data->remote_id, data->activation->ptr);
data->rtwritefd = writefd;
spa_system_close(data->context->data_system, data->node->source.fd);
spa_system_close(data->data_system, data->node->source.fd);
data->node->source.fd = readfd;
data->have_transport = true;
@ -849,7 +850,7 @@ exit:
static int link_signal_func(void *user_data)
{
struct link *link = user_data;
struct spa_system *data_system = link->data->context->data_system;
struct spa_system *data_system = link->data->data_system;
pw_log_trace_fp("link %p: signal %p", link, link->target.activation);
if (SPA_UNLIKELY(spa_system_eventfd_write(data_system, link->signalfd, 1) < 0))
@ -888,7 +889,7 @@ client_node_set_activation(void *_data,
if (data->remote_id == node_id) {
pw_log_debug("node %p: our activation %u: %u %u %u", node, node_id,
memid, offset, size);
spa_system_close(data->context->data_system, signalfd);
spa_system_close(data->data_system, signalfd);
return 0;
}
@ -922,7 +923,7 @@ client_node_set_activation(void *_data,
link->target.node = NULL;
spa_list_append(&data->links, &link->link);
pw_loop_invoke(data->context->data_loop,
pw_loop_invoke(data->data_loop,
do_activate_link, SPA_ID_INVALID, NULL, 0, false, link);
pw_log_debug("node %p: link %p: fd:%d id:%u state %p required %d, pending %d",
@ -1177,7 +1178,7 @@ static int node_ready(void *d, int status)
struct node_data *data = d;
struct pw_impl_node *node = data->node;
struct pw_node_activation *a = node->rt.activation;
struct spa_system *data_system = data->context->data_system;
struct spa_system *data_system = data->data_system;
struct timespec ts;
struct pw_impl_port *p;
@ -1254,6 +1255,8 @@ static struct pw_proxy *node_export(struct pw_core *core, void *object, bool do_
data->node = node;
data->do_free = do_free;
data->context = pw_impl_node_get_context(node);
data->data_loop = node->data_loop;
data->data_system = data->data_loop->system;
data->client_node = (struct pw_client_node *)client_node;
data->remote_id = SPA_ID_INVALID;

View file

@ -74,6 +74,8 @@ struct impl {
struct pw_context *context;
struct pw_properties *properties;
struct pw_loop *data_loop;
struct spa_hook context_listener;
struct spa_hook module_listener;
@ -284,7 +286,7 @@ static int do_stop(struct spa_loop *loop,
static void stop_listener(struct impl *impl)
{
if (impl->listening) {
pw_loop_invoke(impl->context->data_loop,
pw_loop_invoke(impl->data_loop,
do_stop, SPA_ID_INVALID, NULL, 0, true, impl);
impl->listening = false;
}
@ -338,7 +340,7 @@ global_bind(void *object, struct pw_impl_client *client, uint32_t permissions,
if (++impl->busy == 1) {
pw_log_info("%p: starting profiler", impl);
pw_loop_invoke(impl->context->data_loop,
pw_loop_invoke(impl->data_loop,
do_start, SPA_ID_INVALID, NULL, 0, false, impl);
impl->listening = true;
}
@ -411,6 +413,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->context = context;
impl->properties = props;
impl->data_loop = pw_context_get_data_loop(impl->context)->loop;
spa_ringbuffer_init(&impl->buffer);

View file

@ -107,6 +107,8 @@ struct filter {
const char *path;
struct pw_context *context;
struct pw_loop *main_loop;
struct pw_loop *data_loop;
enum pw_filter_flags flags;
@ -467,7 +469,7 @@ static int impl_set_io(void *object, uint32_t id, void *data, size_t size)
impl->position = data;
else
impl->position = NULL;
pw_loop_invoke(impl->context->data_loop,
pw_loop_invoke(impl->data_loop,
do_set_position, 1, NULL, 0, true, impl);
break;
}
@ -486,7 +488,7 @@ static int impl_send_command(void *object, const struct spa_command *command)
case SPA_NODE_COMMAND_Suspend:
case SPA_NODE_COMMAND_Flush:
case SPA_NODE_COMMAND_Pause:
pw_loop_invoke(impl->context->main_loop,
pw_loop_invoke(impl->main_loop,
NULL, 0, NULL, 0, false, impl);
if (filter->state == PW_FILTER_STATE_STREAMING) {
pw_log_debug("%p: pause", filter);
@ -987,7 +989,7 @@ static void call_process(struct filter *impl)
process, 0, impl->rt.position);
}
else {
pw_loop_invoke(impl->context->main_loop,
pw_loop_invoke(impl->main_loop,
do_call_process, 1, NULL, 0, false, impl);
}
}
@ -1006,7 +1008,7 @@ do_call_drained(struct spa_loop *loop,
static void call_drained(struct filter *impl)
{
pw_loop_invoke(impl->context->main_loop,
pw_loop_invoke(impl->main_loop,
do_call_drained, 1, NULL, 0, false, impl);
}
@ -1200,6 +1202,10 @@ filter_new(struct pw_context *context, const char *name,
goto error_cleanup;
}
impl->main_loop = pw_context_get_main_loop(context);
impl->data_loop = pw_data_loop_get_loop(
pw_context_get_data_loop(context));
this = &impl->this;
pw_log_debug("%p: new", impl);
@ -1399,7 +1405,7 @@ void pw_filter_destroy(struct pw_filter *filter)
struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this);
struct port *p;
ensure_loop(impl->context->main_loop, return);
ensure_loop(impl->main_loop, return);
pw_log_debug("%p: destroy", filter);
@ -1453,7 +1459,7 @@ void pw_filter_add_listener(struct pw_filter *filter,
{
struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this);
ensure_loop(impl->context->main_loop);
ensure_loop(impl->main_loop);
spa_hook_list_append(&filter->listener_list, listener, events, data);
if (events->process && impl->rt_callbacks.funcs == NULL) {
@ -1503,7 +1509,7 @@ int pw_filter_update_properties(struct pw_filter *filter, void *port_data, const
struct port *port = SPA_CONTAINER_OF(port_data, struct port, user_data);
int changed = 0;
ensure_loop(impl->context->main_loop, return -EIO);
ensure_loop(impl->main_loop, return -EIO);
if (port_data) {
changed = pw_properties_update(port->props, dict);
@ -1542,7 +1548,7 @@ pw_filter_connect(struct pw_filter *filter,
uint32_t i;
struct spa_dict_item items[1];
ensure_loop(impl->context->main_loop, return -EIO);
ensure_loop(impl->main_loop, return -EIO);
if (filter->proxy != NULL || filter->state != PW_FILTER_STATE_UNCONNECTED)
return -EBUSY;
@ -1637,7 +1643,7 @@ SPA_EXPORT
int pw_filter_disconnect(struct pw_filter *filter)
{
struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this);
ensure_loop(impl->context->main_loop, return -EIO);
ensure_loop(impl->main_loop, return -EIO);
return filter_disconnect(impl);
}
@ -1720,7 +1726,7 @@ void *pw_filter_add_port(struct pw_filter *filter,
struct port *p;
const char *str;
ensure_loop(impl->context->main_loop, return NULL);
ensure_loop(impl->main_loop, return NULL);
if (props == NULL)
props = pw_properties_new(NULL, NULL);
@ -1784,7 +1790,7 @@ int pw_filter_remove_port(void *port_data)
struct port *port = SPA_CONTAINER_OF(port_data, struct port, user_data);
struct filter *impl = port->filter;
ensure_loop(impl->context->main_loop, return -EIO);
ensure_loop(impl->main_loop, return -EIO);
free_port(impl, port);
return 0;
@ -1796,7 +1802,7 @@ int pw_filter_set_error(struct pw_filter *filter,
{
struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this);
ensure_loop(impl->context->main_loop, return -EIO);
ensure_loop(impl->main_loop, return -EIO);
if (res < 0) {
va_list args;
@ -1828,7 +1834,7 @@ int pw_filter_update_params(struct pw_filter *filter,
struct port *port;
int res;
ensure_loop(impl->context->main_loop, return -EIO);
ensure_loop(impl->main_loop, return -EIO);
pw_log_debug("%p: update params", filter);
@ -1851,7 +1857,7 @@ int pw_filter_set_active(struct pw_filter *filter, bool active)
{
struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this);
ensure_loop(impl->context->main_loop, return -EIO);
ensure_loop(impl->main_loop, return -EIO);
pw_log_debug("%p: active:%d", filter, active);
return 0;
@ -1969,7 +1975,7 @@ SPA_EXPORT
int pw_filter_flush(struct pw_filter *filter, bool drain)
{
struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this);
pw_loop_invoke(impl->context->data_loop,
pw_loop_invoke(impl->data_loop,
drain ? do_drain : do_flush, 1, NULL, 0, true, impl);
return 0;
}
@ -2013,10 +2019,10 @@ int pw_filter_trigger_process(struct pw_filter *filter)
pw_log_trace_fp("%p", impl);
if (!impl->driving) {
res = pw_loop_invoke(impl->context->main_loop,
res = pw_loop_invoke(impl->main_loop,
do_trigger_request_process, 1, NULL, 0, false, impl);
} else {
res = pw_loop_invoke(impl->context->data_loop,
res = pw_loop_invoke(impl->data_loop,
do_trigger_process, 1, NULL, 0, false, impl);
}
return res;

View file

@ -146,7 +146,7 @@ do_node_add(struct spa_loop *loop, bool async, uint32_t seq, const void *data, s
this->added = true;
if (this->source.loop == NULL) {
struct spa_system *data_system = this->context->data_system;
struct spa_system *data_system = this->data_loop->system;
uint64_t dummy;
int res;
@ -1126,7 +1126,7 @@ static inline int process_node(void *data)
struct pw_impl_node *this = data;
struct pw_impl_port *p;
struct pw_node_activation *a = this->rt.activation;
struct spa_system *data_system = this->context->data_system;
struct spa_system *data_system = this->data_loop->system;
int status;
uint64_t nsec;
@ -1177,7 +1177,7 @@ static inline int process_node(void *data)
static void node_on_fd_events(struct spa_source *source)
{
struct pw_impl_node *this = source->data;
struct spa_system *data_system = this->context->data_system;
struct spa_system *data_system = this->data_system;
if (SPA_UNLIKELY(source->rmask & (SPA_IO_ERR | SPA_IO_HUP))) {
pw_log_warn("%p: got socket error %08x", this, source->rmask);
@ -1235,7 +1235,6 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context,
struct impl *impl;
struct pw_impl_node *this;
size_t size;
struct spa_system *data_system = context->data_system;
int res;
impl = calloc(1, sizeof(struct impl) + user_data_size);
@ -1251,6 +1250,9 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context,
this->context = context;
this->name = strdup("node");
this->data_loop = pw_context_get_data_loop(context)->loop;
this->data_system = this->data_loop->system;
if (user_data_size > 0)
this->user_data = SPA_PTROFF(impl, sizeof(struct impl), void);
@ -1263,7 +1265,8 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context,
this->properties = properties;
if ((res = spa_system_eventfd_create(data_system, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK)) < 0)
if ((res = spa_system_eventfd_create(this->data_system,
SPA_FD_CLOEXEC | SPA_FD_NONBLOCK)) < 0)
goto error_clean;
pw_log_debug("%p: new fd:%d", this, res);
@ -1289,8 +1292,6 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context,
impl->work = pw_context_get_work_queue(this->context);
impl->pending_id = SPA_ID_INVALID;
this->data_loop = context->data_loop;
spa_list_init(&this->follower_list);
spa_hook_list_init(&this->listener_list);
@ -1334,7 +1335,7 @@ error_clean:
if (this->activation)
pw_memblock_unref(this->activation);
if (this->source.fd != -1)
spa_system_close(this->context->data_system, this->source.fd);
spa_system_close(this->data_system, this->source.fd);
free(impl);
error_exit:
pw_properties_free(properties);
@ -1642,7 +1643,7 @@ static int node_ready(void *data, int status)
struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
struct pw_impl_node *driver = node->driver_node;
struct pw_node_activation *a = node->rt.activation;
struct spa_system *data_system = node->context->data_system;
struct spa_system *data_system = node->data_system;
struct pw_node_target *t;
struct pw_impl_port *p;
uint64_t nsec;
@ -1961,7 +1962,7 @@ void pw_impl_node_destroy(struct pw_impl_node *node)
clear_info(node);
spa_system_close(context->data_system, node->source.fd);
spa_system_close(node->data_system, node->source.fd);
free(impl);
}

View file

@ -750,6 +750,7 @@ struct pw_impl_node {
struct spa_hook_list listener_list;
struct pw_loop *data_loop; /**< the data loop for this node */
struct spa_system *data_system;
struct spa_fraction latency; /**< requested latency */
struct spa_fraction max_latency; /**< maximum latency */

View file

@ -85,6 +85,9 @@ struct stream {
struct pw_context *context;
struct spa_hook context_listener;
struct pw_loop *main_loop;
struct pw_loop *data_loop;
enum spa_direction direction;
enum pw_stream_flags flags;
@ -426,7 +429,7 @@ static inline void call_process(struct stream *impl)
if (impl->process_rt)
spa_callbacks_call(&impl->rt_callbacks, struct pw_stream_events, process, 0);
else
pw_loop_invoke(impl->context->main_loop,
pw_loop_invoke(impl->main_loop,
do_call_process, 1, NULL, 0, false, impl);
}
@ -443,7 +446,7 @@ do_call_drained(struct spa_loop *loop,
static void call_drained(struct stream *impl)
{
pw_loop_invoke(impl->context->main_loop,
pw_loop_invoke(impl->main_loop,
do_call_drained, 1, NULL, 0, false, impl);
}
@ -460,7 +463,7 @@ do_call_trigger_done(struct spa_loop *loop,
static void call_trigger_done(struct stream *impl)
{
pw_loop_invoke(impl->context->main_loop,
pw_loop_invoke(impl->main_loop,
do_call_trigger_done, 1, NULL, 0, false, impl);
}
@ -494,7 +497,7 @@ static int impl_set_io(void *object, uint32_t id, void *data, size_t size)
else
impl->position = NULL;
pw_loop_invoke(impl->context->data_loop,
pw_loop_invoke(impl->data_loop,
do_set_position, 1, NULL, 0, true, impl);
break;
default:
@ -607,7 +610,7 @@ static int impl_send_command(void *object, const struct spa_command *command)
case SPA_NODE_COMMAND_Suspend:
case SPA_NODE_COMMAND_Flush:
case SPA_NODE_COMMAND_Pause:
pw_loop_invoke(impl->context->main_loop,
pw_loop_invoke(impl->main_loop,
NULL, 0, NULL, 0, false, impl);
if (stream->state == PW_STREAM_STATE_STREAMING) {
@ -1448,6 +1451,7 @@ stream_new(struct pw_context *context, const char *name,
res = -errno;
goto error_properties;
}
impl->main_loop = pw_context_get_main_loop(context);
this = &impl->this;
pw_log_debug("%p: new \"%s\"", impl, name);
@ -1649,7 +1653,7 @@ void pw_stream_destroy(struct pw_stream *stream)
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct control *c;
ensure_loop(impl->context->main_loop, return);
ensure_loop(impl->main_loop, return);
pw_log_debug("%p: destroy", stream);
@ -1706,7 +1710,7 @@ void pw_stream_add_listener(struct pw_stream *stream,
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
ensure_loop(impl->context->main_loop);
ensure_loop(impl->main_loop);
spa_hook_list_append(&stream->listener_list, listener, events, data);
@ -1744,7 +1748,7 @@ int pw_stream_update_properties(struct pw_stream *stream, const struct spa_dict
int changed, res = 0;
struct match match;
ensure_loop(impl->context->main_loop, return -EIO);
ensure_loop(impl->main_loop, return -EIO);
changed = pw_properties_update(stream->properties, dict);
if (!changed)
@ -1855,7 +1859,7 @@ pw_stream_connect(struct pw_stream *stream,
uint32_t i;
int res;
ensure_loop(impl->context->main_loop, return -EIO);
ensure_loop(impl->main_loop, return -EIO);
pw_log_debug("%p: connect target:%d", stream, target_id);
@ -2047,6 +2051,8 @@ pw_stream_connect(struct pw_stream *stream,
pw_impl_node_set_active(impl->node,
!SPA_FLAG_IS_SET(impl->flags, PW_STREAM_FLAG_INACTIVE));
impl->data_loop = impl->node->data_loop;
pw_log_debug("%p: export node %p", stream, impl->node);
stream->proxy = pw_core_export(stream->core,
PW_TYPE_INTERFACE_Node, NULL, impl->node, 0);
@ -2086,7 +2092,7 @@ SPA_EXPORT
int pw_stream_disconnect(struct pw_stream *stream)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
ensure_loop(impl->context->main_loop, return -EIO);
ensure_loop(impl->main_loop, return -EIO);
return stream_disconnect(impl);
}
@ -2096,7 +2102,7 @@ int pw_stream_set_error(struct pw_stream *stream,
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
ensure_loop(impl->context->main_loop, return -EIO);
ensure_loop(impl->main_loop, return -EIO);
if (res < 0) {
va_list args;
@ -2126,7 +2132,7 @@ int pw_stream_update_params(struct pw_stream *stream,
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
int res;
ensure_loop(impl->context->main_loop, return -EIO);
ensure_loop(impl->main_loop, return -EIO);
pw_log_debug("%p: update params", stream);
if ((res = update_params(impl, SPA_ID_INVALID, params, n_params)) < 0)
@ -2151,7 +2157,7 @@ SPA_EXPORT
int pw_stream_set_param(struct pw_stream *stream, uint32_t id, const struct spa_pod *param)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
ensure_loop(impl->context->main_loop, return -EIO);
ensure_loop(impl->main_loop, return -EIO);
if (impl->node == NULL)
return -EIO;
@ -2170,7 +2176,7 @@ int pw_stream_set_control(struct pw_stream *stream, uint32_t id, uint32_t n_valu
struct spa_pod *pod;
struct control *c;
ensure_loop(impl->context->main_loop, return -EIO);
ensure_loop(impl->main_loop, return -EIO);
if (impl->node == NULL)
return -EIO;
@ -2239,7 +2245,7 @@ int pw_stream_set_active(struct pw_stream *stream, bool active)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
ensure_loop(impl->context->main_loop, return -EIO);
ensure_loop(impl->main_loop, return -EIO);
pw_log_debug("%p: active:%d", stream, active);
@ -2360,7 +2366,7 @@ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer)
if (impl->direction == SPA_DIRECTION_OUTPUT &&
impl->driving && !impl->using_trigger) {
pw_log_debug("deprecated: use pw_stream_trigger_process() to drive the stream.");
res = pw_loop_invoke(impl->context->data_loop,
res = pw_loop_invoke(impl->data_loop,
do_trigger_deprecated, 1, NULL, 0, false, impl);
}
return res;
@ -2405,7 +2411,7 @@ int pw_stream_flush(struct pw_stream *stream, bool drain)
if (impl->node == NULL)
return -EIO;
pw_loop_invoke(impl->context->data_loop,
pw_loop_invoke(impl->data_loop,
drain ? do_drain : do_flush, 1, NULL, 0, true, impl);
if (!drain)
@ -2463,13 +2469,13 @@ int pw_stream_trigger_process(struct pw_stream *stream)
impl->using_trigger = true;
if (!impl->driving && !impl->trigger) {
res = pw_loop_invoke(impl->context->main_loop,
res = pw_loop_invoke(impl->main_loop,
do_trigger_request_process, 1, NULL, 0, false, impl);
} else {
if (!impl->process_rt)
call_process(impl);
res = pw_loop_invoke(impl->context->data_loop,
res = pw_loop_invoke(impl->data_loop,
do_trigger_process, 1, NULL, 0, false, impl);
}
return res;