impl-port: make the rt.mix_list private

The rt.mix_lst is really something internal to the tee and fallback
mixer in the ports so make it private.

Use the port_set_io call to add the Buffer io area to the mix_list
tee and fallback mixer on the port, like we do for remote-node. We can
then remove the custom code to do this in remote-node and impl-link.

Remove an unused field (clock) in the port struct.

Remove the unused io_set field in impl-link, it is always in sync with
the activated field.
This commit is contained in:
Wim Taymans 2023-06-13 15:19:46 +02:00
parent 9ba3792038
commit b080b31848
4 changed files with 77 additions and 90 deletions

View file

@ -42,7 +42,6 @@ struct mix {
uint32_t mix_id; uint32_t mix_id;
struct pw_impl_port_mix mix; struct pw_impl_port_mix mix;
struct pw_array buffers; struct pw_array buffers;
bool active;
}; };
struct node_data { struct node_data {
@ -152,54 +151,10 @@ static void mix_init(struct mix *mix, struct pw_impl_port *port, uint32_t mix_id
mix->port = port; mix->port = port;
mix->mix_id = mix_id; mix->mix_id = mix_id;
pw_impl_port_init_mix(port, &mix->mix); pw_impl_port_init_mix(port, &mix->mix);
mix->active = false;
pw_array_init(&mix->buffers, 32); pw_array_init(&mix->buffers, 32);
pw_array_ensure_size(&mix->buffers, sizeof(struct buffer) * 64); pw_array_ensure_size(&mix->buffers, sizeof(struct buffer) * 64);
} }
static int
do_deactivate_mix(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct mix *mix = user_data;
spa_list_remove(&mix->mix.rt_link);
return 0;
}
static int
deactivate_mix(struct node_data *data, struct mix *mix)
{
if (mix->active) {
pw_log_debug("node %p: mix %p deactivate", data, mix);
pw_loop_invoke(data->data_loop,
do_deactivate_mix, SPA_ID_INVALID, NULL, 0, true, mix);
mix->active = false;
}
return 0;
}
static int
do_activate_mix(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct mix *mix = user_data;
spa_list_append(&mix->port->rt.mix_list, &mix->mix.rt_link);
return 0;
}
static int
activate_mix(struct node_data *data, struct mix *mix)
{
if (!mix->active) {
pw_log_debug("node %p: mix %p activate", data, mix);
pw_loop_invoke(data->data_loop,
do_activate_mix, SPA_ID_INVALID, NULL, 0, false, mix);
mix->active = true;
}
return 0;
}
static struct mix *find_mix(struct node_data *data, static struct mix *find_mix(struct node_data *data,
enum spa_direction direction, uint32_t port_id, uint32_t mix_id) enum spa_direction direction, uint32_t port_id, uint32_t mix_id)
{ {
@ -829,11 +784,6 @@ client_node_port_set_io(void *_data,
pw_log_debug("port %p: set io:%s new:%p old:%p", mix->port, pw_log_debug("port %p: set io:%s new:%p old:%p", mix->port,
spa_debug_type_find_name(spa_type_io, id), ptr, mix->mix.io); spa_debug_type_find_name(spa_type_io, id), ptr, mix->mix.io);
if (id == SPA_IO_Buffers) {
if (ptr == NULL && mix->mix.io)
deactivate_mix(data, mix);
}
if ((res = spa_node_port_set_io(mix->port->mix, if ((res = spa_node_port_set_io(mix->port->mix,
direction, mix->mix.port.port_id, id, ptr, size)) < 0) { direction, mix->mix.port.port_id, id, ptr, size)) < 0) {
if (res == -ENOTSUP) if (res == -ENOTSUP)
@ -841,11 +791,6 @@ client_node_port_set_io(void *_data,
else else
goto exit_free; goto exit_free;
} }
if (id == SPA_IO_Buffers) {
mix->mix.io = ptr;
if (ptr)
activate_mix(data, mix);
}
exit_free: exit_free:
pw_memmap_free(old); pw_memmap_free(old);
exit: exit:
@ -984,8 +929,6 @@ static void clear_mix(struct node_data *data, struct mix *mix)
{ {
pw_log_debug("port %p: mix clear %d.%d", mix->port, mix->port->port_id, mix->mix_id); pw_log_debug("port %p: mix clear %d.%d", mix->port, mix->port->port_id, mix->mix_id);
deactivate_mix(data, mix);
spa_list_remove(&mix->link); spa_list_remove(&mix->link);
clear_buffers(data, mix); clear_buffers(data, mix);

View file

@ -27,7 +27,6 @@ PW_LOG_TOPIC_EXTERN(log_link);
struct impl { struct impl {
struct pw_impl_link this; struct pw_impl_link this;
unsigned int io_set:1;
unsigned int activated:1; unsigned int activated:1;
struct pw_work_queue *work; struct pw_work_queue *work;
@ -639,15 +638,9 @@ do_activate_link(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data) bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{ {
struct pw_impl_link *this = user_data; struct pw_impl_link *this = user_data;
pw_log_trace("%p: activate", this); pw_log_trace("%p: activate", this);
spa_list_append(&this->output->rt.mix_list, &this->rt.out_mix.rt_link);
spa_list_append(&this->input->rt.mix_list, &this->rt.in_mix.rt_link);
if (this->peer) if (this->peer)
pw_node_peer_activate(this->peer); pw_node_peer_activate(this->peer);
return 0; return 0;
} }
@ -663,16 +656,14 @@ int pw_impl_link_activate(struct pw_impl_link *this)
!impl->inode->runnable || !impl->onode->runnable) !impl->inode->runnable || !impl->onode->runnable)
return 0; return 0;
if (!impl->io_set) { if ((res = port_set_io(this, this->input, SPA_IO_Buffers, this->io,
if ((res = port_set_io(this, this->output, SPA_IO_Buffers, this->io, sizeof(struct spa_io_buffers), &this->rt.in_mix)) < 0)
sizeof(struct spa_io_buffers), &this->rt.out_mix)) < 0) return res;
return res;
if ((res = port_set_io(this, this->output, SPA_IO_Buffers, this->io,
sizeof(struct spa_io_buffers), &this->rt.out_mix)) < 0)
return res;
if ((res = port_set_io(this, this->input, SPA_IO_Buffers, this->io,
sizeof(struct spa_io_buffers), &this->rt.in_mix)) < 0)
return res;
impl->io_set = true;
}
pw_loop_invoke(this->output->node->data_loop, pw_loop_invoke(this->output->node->data_loop,
do_activate_link, SPA_ID_INVALID, NULL, 0, false, this); do_activate_link, SPA_ID_INVALID, NULL, 0, false, this);
@ -842,15 +833,9 @@ do_deactivate_link(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data) bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{ {
struct pw_impl_link *this = user_data; struct pw_impl_link *this = user_data;
pw_log_trace("%p: disable out %p", this, &this->rt.out_mix);
pw_log_trace("%p: disable %p and %p", this, &this->rt.in_mix, &this->rt.out_mix);
spa_list_remove(&this->rt.out_mix.rt_link);
spa_list_remove(&this->rt.in_mix.rt_link);
if (this->peer) if (this->peer)
pw_node_peer_deactivate(this->peer); pw_node_peer_deactivate(this->peer);
return 0; return 0;
} }
@ -871,7 +856,6 @@ int pw_impl_link_deactivate(struct pw_impl_link *this)
port_set_io(this, this->input, SPA_IO_Buffers, NULL, 0, port_set_io(this, this->input, SPA_IO_Buffers, NULL, 0,
&this->rt.in_mix); &this->rt.in_mix);
impl->io_set = false;
impl->activated = false; impl->activated = false;
pw_log_info("(%s) deactivated", this->name); pw_log_info("(%s) deactivated", this->name);
link_update_state(this, PW_LINK_STATE_PAUSED, 0, NULL); link_update_state(this, PW_LINK_STATE_PAUSED, 0, NULL);

View file

@ -27,6 +27,7 @@ PW_LOG_TOPIC_EXTERN(log_port);
struct impl { struct impl {
struct pw_impl_port this; struct pw_impl_port this;
struct spa_node mix_node; /**< mix node implementation */ struct spa_node mix_node; /**< mix node implementation */
struct spa_list mix_list;
struct spa_list param_list; struct spa_list param_list;
struct spa_list pending_list; struct spa_list pending_list;
@ -111,6 +112,66 @@ void pw_impl_port_update_state(struct pw_impl_port *port, enum pw_impl_port_stat
} }
} }
static struct pw_impl_port_mix *find_mix(struct pw_impl_port *port,
enum spa_direction direction, uint32_t port_id)
{
struct pw_impl_port_mix *mix;
spa_list_for_each(mix, &port->mix_list, link) {
if (mix->port.direction == direction && mix->port.port_id == port_id)
return mix;
}
return NULL;
}
static int
do_add_mix(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct pw_impl_port_mix *mix = user_data;
struct pw_impl_port *this = mix->p;
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
pw_log_trace("%p: add mix %p", this, mix);
spa_list_append(&impl->mix_list, &mix->rt_link);
return 0;
}
static int
do_remove_mix(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct pw_impl_port_mix *mix = user_data;
struct pw_impl_port *this = mix->p;
pw_log_trace("%p: remove mix %p", this, mix);
spa_list_remove(&mix->rt_link);
return 0;
}
static int port_set_io(void *object,
enum spa_direction direction, uint32_t port_id, uint32_t id,
void *data, size_t size)
{
struct impl *impl = object;
struct pw_impl_port *this = &impl->this;
struct pw_impl_port_mix *mix;
mix = find_mix(this, direction, port_id);
if (mix == NULL)
return 0;
if (id == SPA_IO_Buffers) {
if (data == NULL || size == 0) {
pw_loop_invoke(this->node->data_loop,
do_remove_mix, SPA_ID_INVALID, NULL, 0, true, mix);
mix->io = NULL;
} else if (data != NULL && size >= sizeof(struct spa_io_buffers)) {
mix->io = data;
pw_loop_invoke(this->node->data_loop,
do_add_mix, SPA_ID_INVALID, NULL, 0, false, mix);
}
}
return 0;
}
static int tee_process(void *object) static int tee_process(void *object)
{ {
struct impl *impl = object; struct impl *impl = object;
@ -119,7 +180,7 @@ static int tee_process(void *object)
struct spa_io_buffers *io = &this->rt.io; struct spa_io_buffers *io = &this->rt.io;
pw_log_trace_fp("%p: tee input %d %d", this, io->status, io->buffer_id); pw_log_trace_fp("%p: tee input %d %d", this, io->status, io->buffer_id);
spa_list_for_each(mix, &this->rt.mix_list, rt_link) { spa_list_for_each(mix, &impl->mix_list, rt_link) {
pw_log_trace_fp("%p: port %d %p->%p %d", this, pw_log_trace_fp("%p: port %d %p->%p %d", this,
mix->port.port_id, io, mix->io, mix->io->buffer_id); mix->port.port_id, io, mix->io, mix->io->buffer_id);
*mix->io = *io; *mix->io = *io;
@ -136,13 +197,13 @@ static int tee_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id)
pw_log_trace_fp("%p: tee reuse buffer %d %d", this, port_id, buffer_id); pw_log_trace_fp("%p: tee reuse buffer %d %d", this, port_id, buffer_id);
spa_node_port_reuse_buffer(this->node->node, this->port_id, buffer_id); spa_node_port_reuse_buffer(this->node->node, this->port_id, buffer_id);
return 0; return 0;
} }
static const struct spa_node_methods schedule_tee_node = { static const struct spa_node_methods schedule_tee_node = {
SPA_VERSION_NODE_METHODS, SPA_VERSION_NODE_METHODS,
.process = tee_process, .process = tee_process,
.port_set_io = port_set_io,
.port_reuse_buffer = tee_reuse_buffer, .port_reuse_buffer = tee_reuse_buffer,
}; };
@ -156,7 +217,7 @@ static int schedule_mix_input(void *object)
if (SPA_UNLIKELY(PW_IMPL_PORT_IS_CONTROL(this))) if (SPA_UNLIKELY(PW_IMPL_PORT_IS_CONTROL(this)))
return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA; return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA;
spa_list_for_each(mix, &this->rt.mix_list, rt_link) { spa_list_for_each(mix, &impl->mix_list, rt_link) {
pw_log_trace_fp("%p: mix input %d %p->%p %d %d", this, pw_log_trace_fp("%p: mix input %d %p->%p %d %d", this,
mix->port.port_id, mix->io, io, mix->io->status, mix->io->buffer_id); mix->port.port_id, mix->io, io, mix->io->status, mix->io->buffer_id);
*io = *mix->io; *io = *mix->io;
@ -169,11 +230,10 @@ static int schedule_mix_input(void *object)
static int schedule_mix_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id) static int schedule_mix_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id)
{ {
struct impl *impl = object; struct impl *impl = object;
struct pw_impl_port *this = &impl->this;
struct pw_impl_port_mix *mix; struct pw_impl_port_mix *mix;
spa_list_for_each(mix, &this->rt.mix_list, rt_link) { spa_list_for_each(mix, &impl->mix_list, rt_link) {
pw_log_trace_fp("%p: reuse buffer %d %d", this, port_id, buffer_id); pw_log_trace_fp("%p: reuse buffer %d %d", impl, port_id, buffer_id);
/* FIXME send reuse buffer to peer */ /* FIXME send reuse buffer to peer */
break; break;
} }
@ -183,6 +243,7 @@ static int schedule_mix_reuse_buffer(void *object, uint32_t port_id, uint32_t bu
static const struct spa_node_methods schedule_mix_node = { static const struct spa_node_methods schedule_mix_node = {
SPA_VERSION_NODE_METHODS, SPA_VERSION_NODE_METHODS,
.process = schedule_mix_input, .process = schedule_mix_input,
.port_set_io = port_set_io,
.port_reuse_buffer = schedule_mix_reuse_buffer, .port_reuse_buffer = schedule_mix_reuse_buffer,
}; };
@ -472,8 +533,10 @@ struct pw_impl_port *pw_context_create_port(
spa_list_init(&impl->param_list); spa_list_init(&impl->param_list);
spa_list_init(&impl->pending_list); spa_list_init(&impl->pending_list);
impl->cache_params = true; impl->cache_params = true;
spa_list_init(&impl->mix_list);
this = &impl->this; this = &impl->this;
pw_log_debug("%p: new %s %d", this, pw_log_debug("%p: new %s %d", this,
pw_direction_as_string(direction), port_id); pw_direction_as_string(direction), port_id);
@ -512,7 +575,6 @@ struct pw_impl_port *pw_context_create_port(
spa_list_init(&this->links); spa_list_init(&this->links);
spa_list_init(&this->mix_list); spa_list_init(&this->mix_list);
spa_list_init(&this->rt.mix_list);
spa_list_init(&this->control_list[0]); spa_list_init(&this->control_list[0]);
spa_list_init(&this->control_list[1]); spa_list_init(&this->control_list[1]);

View file

@ -911,8 +911,6 @@ struct pw_impl_port {
struct { struct {
struct spa_io_buffers io; /**< io area of the port */ struct spa_io_buffers io; /**< io area of the port */
struct spa_io_clock clock; /**< io area of the clock */
struct spa_list mix_list;
struct spa_list node_link; struct spa_list node_link;
} rt; /**< data only accessed from the data thread */ } rt; /**< data only accessed from the data thread */
unsigned int added:1; unsigned int added:1;