port: select a mixer when we set a format

When the port receives a format, look if we can find a mixer for it
and configure it.

Use the float32 mono mixer when possible.

Use the new pw_buffers in the link.

Let the port allocate buffers between the mixer and node when
requested.

The client-node doesn't need a mixer because mixing is done on the
client.

Remove all mixer and buffer negotiation code from adapter because
it is now done at the port level.
This commit is contained in:
Wim Taymans 2019-10-02 18:07:10 +02:00
parent 4a47bf4706
commit 5cfb6634f4
7 changed files with 211 additions and 431 deletions

View file

@ -92,7 +92,6 @@ test('pw-test-protocol-native',
pipewire_module_adapter = shared_library('pipewire-module-adapter',
[ 'module-adapter.c',
'module-adapter/adapter.c',
'module-adapter/floatmix.c',
'spa/spa-node.c' ],
c_args : pipewire_module_c_args,
include_directories : [configinc, spa_inc],

View file

@ -49,32 +49,12 @@
#define PORT_BUFFERS 1
#define MAX_BUFFER_SIZE 2048
extern const struct spa_handle_factory spa_floatmix_factory;
struct buffer {
struct spa_buffer buf;
struct spa_data datas[1];
struct spa_chunk chunk[1];
};
struct node;
struct port {
struct spa_list link;
struct pw_port *port;
struct node *node;
struct buffer buffers[PORT_BUFFERS];
struct spa_buffer *bufs[PORT_BUFFERS];
struct spa_handle *spa_handle;
struct spa_node *spa_node;
float empty[MAX_BUFFER_SIZE + 15];
};
struct node {
struct pw_core *core;
@ -94,103 +74,19 @@ struct node {
};
/** \endcond */
static void init_buffer(struct port *port, uint32_t id)
{
struct buffer *b = &port->buffers[id];
b->buf.n_metas = 0;
b->buf.metas = NULL;
b->buf.n_datas = 1;
b->buf.datas = b->datas;
b->datas[0].type = SPA_DATA_MemPtr;
b->datas[0].flags = SPA_DATA_FLAG_DYNAMIC;
b->datas[0].fd = -1;
b->datas[0].mapoffset = 0;
b->datas[0].maxsize = SPA_ROUND_DOWN_N(sizeof(port->empty), 16);
b->datas[0].data = SPA_PTR_ALIGN(port->empty, 16, void);
b->datas[0].chunk = b->chunk;
b->datas[0].chunk->offset = 0;
b->datas[0].chunk->size = 0;
b->datas[0].chunk->stride = 0;
port->bufs[id] = &b->buf;
memset(port->empty, 0, sizeof(port->empty));
pw_log_debug("%p %d", b->datas[0].data, b->datas[0].maxsize);
}
static void init_port(struct port *p, enum spa_direction direction)
{
int i;
for (i = 0; i < PORT_BUFFERS; i++)
init_buffer(p, i);
}
static int port_use_buffers(void *data,
uint32_t flags,
struct spa_buffer **buffers, uint32_t n_buffers)
{
struct port *p = data;
struct pw_port *port = p->port;
struct pw_node *node = port->node;
int res, i;
pw_log_debug(NAME " %p: port %p %p %d", p->node->node, port, buffers, n_buffers);
if (n_buffers > 0) {
for (i = 0; i < PORT_BUFFERS; i++)
init_buffer(p, i);
n_buffers = PORT_BUFFERS;
buffers = p->bufs;
}
res = spa_node_port_use_buffers(port->mix,
pw_direction_reverse(port->direction), 0,
flags,
buffers, n_buffers);
res = spa_node_port_use_buffers(node->node,
port->direction, port->port_id,
flags,
buffers, n_buffers);
return res;
}
static const struct pw_port_implementation port_implementation = {
.use_buffers = port_use_buffers,
};
static void node_destroy(void *data)
{
struct node *n = data;
struct port *p;
spa_list_for_each(p, &n->ports, link)
pw_port_set_mix(p->port, NULL, 0);
}
static void node_free(void *data)
{
struct node *n = data;
struct port *p;
spa_list_consume(p, &n->ports, link) {
spa_list_remove(&p->link);
spa_handle_clear(p->spa_handle);
free(p);
}
pw_properties_free(n->props);
}
static void node_port_init(void *data, struct pw_port *port)
{
struct node *n = data;
struct port *p;
const struct pw_properties *old;
enum pw_direction direction;
struct pw_properties *new;
const char *str, *path, *node_name, *media_class;
void *iface;
const struct spa_support *support;
uint32_t n_support;
char position[8], *prefix;
bool is_monitor, is_device;
@ -251,38 +147,10 @@ static void node_port_init(void *data, struct pw_port *port)
pw_port_update_properties(port, &new->dict);
pw_properties_free(new);
if (direction != n->direction)
return;
if (n->media_type == SPA_MEDIA_TYPE_audio) {
p = calloc(1, sizeof(struct port) +
spa_handle_factory_get_size(&spa_floatmix_factory, NULL));
p->node = n;
p->port = port;
init_port(p, direction);
p->spa_handle = SPA_MEMBER(p, sizeof(struct port), struct spa_handle);
support = pw_core_get_support(n->core, &n_support);
spa_handle_factory_init(&spa_floatmix_factory,
p->spa_handle, NULL,
support, n_support);
spa_handle_get_interface(p->spa_handle, SPA_TYPE_INTERFACE_Node, &iface);
p->spa_node = iface;
if (direction == PW_DIRECTION_INPUT) {
pw_log_debug("mix node %p", p->spa_node);
pw_port_set_mix(port, p->spa_node, PW_PORT_MIX_FLAG_MULTI);
port->impl = SPA_CALLBACKS_INIT(&port_implementation, p);
}
spa_list_append(&n->ports, &p->link);
}
}
static const struct pw_node_events node_events = {
PW_VERSION_NODE_EVENTS,
.destroy = node_destroy,
.free = node_free,
.port_init = node_port_init,
};

View file

@ -1379,13 +1379,7 @@ impl_mix_port_set_param(void *object,
uint32_t id, uint32_t flags,
const struct spa_pod *param)
{
struct port *port = object;
if (port->direction != direction)
return -ENOTSUP;
return impl_node_port_set_param(&port->node->node, direction, port->id,
id, flags, param);
return -ENOTSUP;
}
static int
@ -1507,6 +1501,8 @@ static void node_port_added(void *data, struct pw_port *port)
PW_PORT_MIX_FLAG_MULTI |
PW_PORT_MIX_FLAG_MIX_ONLY);
port->flags |= PW_PORT_FLAG_NO_MIXER;
port->impl = SPA_CALLBACKS_INIT(&port_impl, p);
port->owner_data = impl;
}

View file

@ -750,8 +750,7 @@ client_node_port_set_io(void *object,
}
if ((res = spa_node_port_set_io(mix->port->mix,
direction, mix_id,
id, ptr, size)) < 0) {
direction, mix_id, id, ptr, size)) < 0) {
if (res == -ENOTSUP)
res = 0;
else

View file

@ -368,154 +368,6 @@ error:
return res;
}
static struct spa_pod *find_param(struct spa_pod **params, uint32_t n_params, uint32_t type)
{
uint32_t i;
for (i = 0; i < n_params; i++) {
if (spa_pod_is_object_type(params[i], type))
return params[i];
}
return NULL;
}
/* Allocate an array of buffers that can be shared */
static int alloc_buffers(struct pw_link *this,
uint32_t n_buffers,
uint32_t n_params,
struct spa_pod **params,
uint32_t n_datas,
uint32_t *data_sizes,
int32_t *data_strides,
uint32_t *data_aligns,
uint32_t flags,
struct allocation *allocation)
{
struct spa_buffer **buffers, *bp;
uint32_t i;
uint32_t n_metas;
struct spa_meta *metas;
struct spa_data *datas;
struct pw_memblock *m;
struct spa_buffer_alloc_info info = { 0, };
n_metas = 0;
metas = alloca(sizeof(struct spa_meta) * n_params);
datas = alloca(sizeof(struct spa_data) * n_datas);
/* collect metadata */
for (i = 0; i < n_params; i++) {
if (spa_pod_is_object_type (params[i], SPA_TYPE_OBJECT_ParamMeta)) {
uint32_t type, size;
if (spa_pod_parse_object(params[i],
SPA_TYPE_OBJECT_ParamMeta, NULL,
SPA_PARAM_META_type, SPA_POD_Id(&type),
SPA_PARAM_META_size, SPA_POD_Int(&size)) < 0)
continue;
pw_log_debug(NAME" %p: enable meta %d %d", this, type, size);
metas[n_metas].type = type;
metas[n_metas].size = size;
n_metas++;
}
}
for (i = 0; i < n_datas; i++) {
struct spa_data *d = &datas[i];
spa_zero(*d);
if (data_sizes[i] > 0) {
d->type = SPA_DATA_MemPtr;
d->maxsize = data_sizes[i];
SPA_FLAG_SET(d->flags, SPA_DATA_FLAG_READWRITE);
} else {
d->type = SPA_ID_INVALID;
d->maxsize = 0;
}
if (SPA_FLAG_CHECK(flags, SPA_PORT_FLAG_DYNAMIC_DATA))
SPA_FLAG_SET(d->flags, SPA_DATA_FLAG_DYNAMIC);
}
spa_buffer_alloc_fill_info(&info, n_metas, metas, n_datas, datas, data_aligns);
buffers = calloc(n_buffers, info.skel_size + sizeof(struct spa_buffer *));
if (buffers == NULL)
return -errno;
/* pointer to buffer structures */
bp = SPA_MEMBER(buffers, n_buffers * sizeof(struct spa_buffer *), struct spa_buffer);
m = pw_mempool_alloc(this->core->pool,
PW_MEMBLOCK_FLAG_READWRITE |
PW_MEMBLOCK_FLAG_SEAL |
PW_MEMBLOCK_FLAG_MAP,
SPA_DATA_MemFd,
n_buffers * info.mem_size);
if (m == NULL)
return -errno;
pw_log_debug(NAME" %p: layout buffers %p data %p", this, bp, m->map->ptr);
spa_buffer_alloc_layout_array(&info, n_buffers, buffers, bp, m->map->ptr);
allocation->mem = m;
allocation->n_buffers = n_buffers;
allocation->buffers = buffers;
return 0;
}
static int
param_filter(struct pw_link *this,
struct pw_port *in_port,
struct pw_port *out_port,
uint32_t id,
struct spa_pod_builder *result)
{
uint8_t ibuf[4096];
struct spa_pod_builder ib = { 0 };
struct spa_pod *oparam, *iparam;
uint32_t iidx, oidx, num = 0;
int res;
for (iidx = 0;;) {
spa_pod_builder_init(&ib, ibuf, sizeof(ibuf));
pw_log_debug(NAME" %p: input param %d id:%d", this, iidx, id);
if ((res = spa_node_port_enum_params_sync(in_port->node->node,
in_port->direction, in_port->port_id,
id, &iidx, NULL, &iparam, &ib)) < 0)
break;
if (res != 1) {
if (num > 0)
break;
iparam = NULL;
}
if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG) && iparam != NULL)
spa_debug_pod(2, NULL, iparam);
for (oidx = 0;;) {
pw_log_debug(NAME" %p: output param %d id:%d", this, oidx, id);
if (spa_node_port_enum_params_sync(out_port->node->node,
out_port->direction, out_port->port_id,
id, &oidx, iparam, &oparam, result) != 1) {
break;
}
if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG))
spa_debug_pod(2, NULL, oparam);
num++;
}
if (iparam == NULL && num == 0)
break;
}
return num;
}
static int port_set_io(struct pw_link *this, struct pw_port *port, uint32_t id,
void *data, size_t size, struct pw_port_mix *mix)
{
@ -565,7 +417,7 @@ static int do_allocation(struct pw_link *this)
uint32_t in_flags, out_flags;
char *error = NULL;
struct pw_port *input, *output;
struct allocation allocation = { NULL, };
struct pw_buffers allocation = { NULL, };
if (this->info.state > PW_LINK_STATE_ALLOCATING)
return 0;
@ -594,87 +446,27 @@ static int do_allocation(struct pw_link *this)
output->allocation.n_buffers, output->allocation.buffers);
this->rt.out_mix.have_buffers = true;
} else {
struct spa_pod **params, *param;
uint8_t buffer[4096];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
uint32_t i, offset, n_params, flags;
uint32_t max_buffers;
size_t minsize = 8192, stride = 0, align;
uint32_t data_sizes[1];
int32_t data_strides[1];
uint32_t data_aligns[1];
uint32_t flags, alloc_flags;
n_params = param_filter(this, input, output, SPA_PARAM_Buffers, &b);
n_params += param_filter(this, input, output, SPA_PARAM_Meta, &b);
params = alloca(n_params * sizeof(struct spa_pod *));
for (i = 0, offset = 0; i < n_params; i++) {
params[i] = SPA_MEMBER(buffer, offset, struct spa_pod);
spa_pod_fixate(params[i]);
pw_log_debug(NAME" %p: fixated param %d:", this, i);
if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG))
spa_debug_pod(2, NULL, params[i]);
offset += SPA_ROUND_UP_N(SPA_POD_SIZE(params[i]), 8);
}
max_buffers = MAX_BUFFERS;
minsize = stride = 0;
align = 8;
param = find_param(params, n_params, SPA_TYPE_OBJECT_ParamBuffers);
if (param) {
uint32_t qmax_buffers = max_buffers,
qminsize = minsize, qstride = stride, qalign = align;
spa_pod_parse_object(param,
SPA_TYPE_OBJECT_ParamBuffers, NULL,
SPA_PARAM_BUFFERS_buffers, SPA_POD_Int(&qmax_buffers),
SPA_PARAM_BUFFERS_size, SPA_POD_Int(&qminsize),
SPA_PARAM_BUFFERS_stride, SPA_POD_Int(&qstride),
SPA_PARAM_BUFFERS_align, SPA_POD_Int(&qalign));
max_buffers =
qmax_buffers == 0 ? max_buffers : SPA_MIN(qmax_buffers,
max_buffers);
minsize = SPA_MAX(minsize, qminsize);
stride = SPA_MAX(stride, qstride);
align = SPA_MAX(align, qalign);
pw_log_debug(NAME" %p: %d %d %d %d -> %zd %zd %d %zd", this,
qminsize, qstride, qmax_buffers, qalign,
minsize, stride, max_buffers, align);
} else {
pw_log_warn(NAME" %p: no buffers param", this);
minsize = 8192;
max_buffers = 4;
}
/* when the output can allocate buffer memory, set the minsize to
* 0 to make sure we don't allocate memory in the shared memory */
flags = 0;
if ((out_flags & SPA_PORT_FLAG_CAN_ALLOC_BUFFERS)) {
minsize = 0;
/* always shared buffers for the link */
alloc_flags = PW_BUFFERS_FLAG_SHARED;
/* if output port can alloc buffers, alloc skeleton buffers */
if (SPA_FLAG_IS_SET(out_flags, SPA_PORT_FLAG_CAN_ALLOC_BUFFERS)) {
SPA_FLAG_SET(alloc_flags, PW_BUFFERS_FLAG_NO_MEM);
flags |= SPA_NODE_BUFFERS_FLAG_ALLOC;
}
data_sizes[0] = minsize;
data_strides[0] = stride;
data_aligns[0] = align;
if ((res = alloc_buffers(this,
max_buffers,
n_params,
params,
1,
data_sizes, data_strides,
data_aligns,
out_flags & in_flags,
&allocation)) < 0) {
asprintf(&error, "error alloc buffers: %d", res);
if ((res = pw_buffers_negotiate(this->core, alloc_flags,
output->node->node, output->port_id,
input->node->node, input->port_id,
&allocation)) < 0) {
asprintf(&error, "error alloc buffers: %s", spa_strerror(res));
goto error;
}
pw_log_debug(NAME" %p: allocating %d buffers %p %zd %zd", this,
allocation.n_buffers, allocation.buffers, minsize, stride);
pw_log_debug(NAME" %p: allocating %d buffers %p", this,
allocation.n_buffers, allocation.buffers);
if ((res = pw_port_use_buffers(output, &this->rt.out_mix,
flags, allocation.buffers, allocation.n_buffers)) < 0) {
@ -682,7 +474,7 @@ static int do_allocation(struct pw_link *this)
spa_strerror(res));
goto error;
}
move_allocation(&allocation, &output->allocation);
pw_buffers_move(&output->allocation, &allocation);
if (SPA_RESULT_IS_ASYNC(res)) {
res = spa_node_sync(output->node->node, res),
@ -716,7 +508,7 @@ static int do_allocation(struct pw_link *this)
return 0;
error:
free_allocation(&output->allocation);
pw_buffers_clear(&output->allocation);
pw_link_update_state(this, PW_LINK_STATE_ERROR, error);
return res;
}

View file

@ -27,6 +27,7 @@
#include <errno.h>
#include <spa/pod/parser.h>
#include <spa/param/audio/format-utils.h>
#include <spa/node/utils.h>
#include <spa/debug/types.h>
@ -38,6 +39,8 @@
#define NAME "port"
extern const struct spa_handle_factory spa_floatmix_factory;
/** \cond */
struct impl {
struct pw_port this;
@ -446,14 +449,100 @@ SPA_EXPORT
int pw_port_set_mix(struct pw_port *port, struct spa_node *node, uint32_t flags)
{
struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this);
struct pw_port_mix *mix;
if (node == NULL) {
node = &impl->mix_node;
flags = 0;
}
pw_log_debug(NAME" %p: mix node %p->%p", port, port->mix, node);
port->mix = node;
if (port->mix != NULL && port->mix != node) {
spa_list_for_each(mix, &port->mix_list, link)
spa_node_remove_port(port->mix, mix->port.direction, mix->port.port_id);
spa_node_port_set_io(port->mix,
pw_direction_reverse(port->direction), 0,
SPA_IO_Buffers, NULL, 0);
}
if (port->mix_handle != NULL) {
spa_handle_clear(port->mix_handle);
free(port->mix_handle);
port->mix_handle = NULL;
}
port->mix_flags = flags;
port->mix = node;
if (port->mix) {
spa_list_for_each(mix, &port->mix_list, link)
spa_node_add_port(port->mix, mix->port.direction, mix->port.port_id, NULL);
spa_node_port_set_io(port->mix,
pw_direction_reverse(port->direction), 0,
SPA_IO_Buffers,
&port->rt.io, sizeof(port->rt.io));
}
return 0;
}
static int setup_mixer(struct pw_port *port, const struct spa_pod *param)
{
uint32_t media_type, media_subtype;
int res;
const struct spa_handle_factory *factory = NULL;
struct spa_handle *handle;
const struct spa_support *support;
uint32_t n_support;
void *iface;
if ((res = spa_format_parse(param, &media_type, &media_subtype)) < 0)
return res;
pw_log_debug(NAME" %p: %s/%s", port,
spa_debug_type_find_name(spa_type_media_type, media_type),
spa_debug_type_find_name(spa_type_media_subtype, media_subtype));
switch (media_type) {
case SPA_MEDIA_TYPE_audio:
switch (media_subtype) {
case SPA_MEDIA_SUBTYPE_raw:
{
struct spa_audio_info_raw info;
if ((res = spa_format_audio_raw_parse(param, &info)) < 0)
return res;
if (info.format != SPA_AUDIO_FORMAT_F32P || info.channels != 1)
return -ENOTSUP;
factory = &spa_floatmix_factory;
break;
}
default:
return -ENOTSUP;
}
break;
default:
return -ENOTSUP;
}
if (factory == NULL)
return -EIO;
handle = calloc(1, spa_handle_factory_get_size(factory, NULL));
support = pw_core_get_support(port->node->core, &n_support);
spa_handle_factory_init(factory, handle, NULL, support, n_support);
spa_handle_get_interface(handle, SPA_TYPE_INTERFACE_Node, &iface);
pw_log_debug("mix node %p", iface);
pw_port_set_mix(port, (struct spa_node*)iface,
PW_PORT_MIX_FLAG_MULTI |
PW_PORT_MIX_FLAG_NEGOTIATE);
port->mix_handle = handle;
return 0;
}
@ -863,6 +952,9 @@ static void pw_port_remove(struct pw_port *port)
pw_map_remove(&node->output_port_map, port->port_id);
node->info.n_output_ports--;
}
pw_port_set_mix(port, NULL, 0);
spa_list_remove(&port->link);
pw_node_emit_port_removed(node, port);
port->node = NULL;
@ -892,7 +984,7 @@ void pw_port_destroy(struct pw_port *port)
pw_log_debug(NAME" %p: free", port);
pw_port_emit_free(port);
free_allocation(&port->allocation);
pw_buffers_clear(&port->allocation);
free((void*)port->error);
pw_map_clear(&port->mix_port_map);
@ -1035,15 +1127,30 @@ SPA_EXPORT
int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags,
const struct spa_pod *param)
{
int res = 0;
int res;
struct pw_node *node = port->node;
pw_log_debug(NAME" %p: %d set param %d %p", port, port->state, id, param);
/* set parameter on node */
res = spa_node_port_set_param(node->node,
port->direction, port->port_id,
id, flags, param);
pw_log_debug(NAME" %p: %d set param on node %d:%d %s: %d (%s)", port, port->state,
port->direction, port->port_id,
spa_debug_type_find_name(spa_type_param, id), res, spa_strerror(res));
/* set the parameters on all ports of the mixer node if possible */
{
if (res >= 0) {
struct pw_port_mix *mix;
if (port->direction == PW_DIRECTION_INPUT &&
id == SPA_PARAM_Format && param != NULL &&
!SPA_FLAG_IS_SET(port->flags, PW_PORT_FLAG_NO_MIXER)) {
setup_mixer(port, param);
}
spa_list_for_each(mix, &port->mix_list, link) {
spa_node_port_set_param(port->mix,
mix->port.direction, mix->port.port_id,
@ -1054,20 +1161,11 @@ int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags,
id, flags, param);
}
/* then set parameter on node */
res = spa_node_port_set_param(node->node,
port->direction, port->port_id,
id, flags, param);
pw_log_debug(NAME" %p: %d set param on node %d:%d %s: %d (%s)", port, port->state,
port->direction, port->port_id,
spa_debug_type_find_name(spa_type_param, id), res, spa_strerror(res));
if (id == SPA_PARAM_Format) {
pw_log_debug(NAME" %p: %d %p %d", port, port->state, param, res);
/* setting the format always destroys the negotiated buffers */
free_allocation(&port->allocation);
pw_buffers_clear(&port->allocation);
if (param == NULL || res < 0) {
pw_port_update_state(port, PW_PORT_STATE_CONFIGURE, NULL);
@ -1079,12 +1177,61 @@ int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags,
return res;
}
static int negotiate_mixer_buffers(struct pw_port *port, uint32_t flags,
struct spa_buffer **buffers, uint32_t n_buffers)
{
int res;
struct pw_node *node = port->node;
if (SPA_FLAG_IS_SET(port->mix_flags, PW_PORT_MIX_FLAG_MIX_ONLY))
return 0;
if (SPA_FLAG_IS_SET(port->mix_flags, PW_PORT_MIX_FLAG_NEGOTIATE)) {
struct pw_buffers allocation = { NULL, };
int alloc_flags;
/* try dynamic data */
alloc_flags = PW_BUFFERS_FLAG_DYNAMIC;
pw_log_debug(NAME" %p: %d.%d negotiate buffers on node: %p",
port, port->direction, port->port_id, node->node);
if ((res = pw_buffers_negotiate(node->core, alloc_flags,
port->mix, 0,
node->node, port->port_id,
&allocation)) < 0) {
pw_log_warn(NAME" %p: can't negotiate buffers: %s",
port, spa_strerror(res));
return res;
}
pw_buffers_clear(&port->mix_buffers);
pw_buffers_move(&port->mix_buffers, &allocation);
buffers = port->mix_buffers.buffers;
n_buffers = port->mix_buffers.n_buffers;
flags = 0;
}
pw_log_debug(NAME" %p: %d.%d use buffers on node: %p",
port, port->direction, port->port_id, node->node);
res = spa_node_port_use_buffers(node->node,
port->direction, port->port_id,
flags, buffers, n_buffers);
if (SPA_RESULT_IS_OK(res)) {
spa_node_port_use_buffers(port->mix,
pw_direction_reverse(port->direction), 0,
0, buffers, n_buffers);
}
return res;
}
SPA_EXPORT
int pw_port_use_buffers(struct pw_port *port, struct pw_port_mix *mix, uint32_t flags,
struct spa_buffer **buffers, uint32_t n_buffers)
{
int res = 0, res2;
struct pw_node *node = port->node;
pw_log_debug(NAME" %p: %d:%d.%d: %d buffers state:%d n_mix:%d", port,
port->direction, port->port_id, mix->id,
@ -1100,37 +1247,36 @@ int pw_port_use_buffers(struct pw_port *port, struct pw_port_mix *mix, uint32_t
if (port->n_mix == 1)
pw_port_update_state(port, PW_PORT_STATE_READY, NULL);
}
/* first negotiate with the node, this makes it possible to let the
* node allocate buffer memory if needed */
if (port->state == PW_PORT_STATE_READY) {
if (!SPA_FLAG_CHECK(port->mix_flags, PW_PORT_MIX_FLAG_MIX_ONLY)) {
pw_log_debug(NAME" %p: use buffers on node: %p", port,
node->node);
res = spa_node_port_use_buffers(node->node,
port->direction, port->port_id,
flags, buffers, n_buffers);
if (res < 0) {
pw_log_error(NAME" %p: use buffers on node: %d (%s)",
port, res, spa_strerror(res));
pw_port_update_state(port, PW_PORT_STATE_ERROR,
"can't use buffers on port");
return res;
}
}
if ((res2 = pw_port_call_use_buffers(port, flags, buffers, n_buffers)) < 0) {
pw_log_warn(NAME" %p: implementation alloc failed: %d (%s)",
port, res2, spa_strerror(res2));
}
if (n_buffers > 0 && !SPA_RESULT_IS_ASYNC(res))
res = negotiate_mixer_buffers(port, flags, buffers, n_buffers);
if (res < 0) {
pw_log_error(NAME" %p: negotiate buffers on node: %d (%s)",
port, res, spa_strerror(res));
pw_port_update_state(port, PW_PORT_STATE_ERROR,
"can't negotiate buffers on port");
} else if (n_buffers > 0 && !SPA_RESULT_IS_ASYNC(res)) {
pw_port_update_state(port, PW_PORT_STATE_PAUSED, NULL);
}
}
if ((res2 = spa_node_port_use_buffers(port->mix,
mix->port.direction, mix->port.port_id, flags,
buffers, n_buffers)) < 0) {
if (res2 != -ENOTSUP)
/* then use the buffers on the mixer */
flags &= ~SPA_NODE_BUFFERS_FLAG_ALLOC;
res2 = spa_node_port_use_buffers(port->mix,
mix->port.direction, mix->port.port_id, flags,
buffers, n_buffers);
if (res2 < 0) {
if (res2 != -ENOTSUP) {
pw_log_warn(NAME" %p: mix use buffers failed: %d (%s)",
port, res2, spa_strerror(res2));
return res2;
}
}
if (SPA_RESULT_IS_ASYNC(res2))
else if (SPA_RESULT_IS_ASYNC(res2))
res = res2;
return res;

View file

@ -32,6 +32,7 @@ extern "C" {
#include <sys/socket.h>
#include <sys/types.h> /* for pthread_t */
#include "pipewire/buffers.h"
#include "pipewire/map.h"
#include "pipewire/remote.h"
#include "pipewire/mem.h"
@ -258,29 +259,6 @@ struct pw_main_loop {
unsigned int running:1;
};
struct allocation {
struct pw_memblock *mem; /**< allocated buffer memory */
struct spa_buffer **buffers; /**< port buffers */
uint32_t n_buffers; /**< number of port buffers */
};
static inline void move_allocation(struct allocation *alloc, struct allocation *dest)
{
*dest = *alloc;
alloc->mem = NULL;
}
static inline void free_allocation(struct allocation *alloc)
{
if (alloc->mem) {
pw_memblock_unref(alloc->mem);
free(alloc->buffers);
}
alloc->mem = NULL;
alloc->buffers = NULL;
alloc->n_buffers = 0;
}
#define pw_device_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_device_events, m, v, ##__VA_ARGS__)
#define pw_device_emit_destroy(m) pw_device_emit(m, destroy, 0)
#define pw_device_emit_free(m) pw_device_emit(m, free, 0)
@ -535,7 +513,6 @@ struct pw_port_implementation {
int (*init_mix) (void *data, struct pw_port_mix *mix);
int (*release_mix) (void *data, struct pw_port_mix *mix);
int (*use_buffers) (void *data, uint32_t flags, struct spa_buffer **buffers, uint32_t n_buffers);
};
#define pw_port_call(p,m,v,...) \
@ -549,7 +526,6 @@ struct pw_port_implementation {
#define pw_port_call_init_mix(p,m) pw_port_call(p,init_mix,0,m)
#define pw_port_call_release_mix(p,m) pw_port_call(p,release_mix,0,m)
#define pw_port_call_use_buffers(p,f,b,n) pw_port_call(p,use_buffers,0,f,b,n)
#define pw_port_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_port_events, m, v, ##__VA_ARGS__)
#define pw_port_emit_destroy(p) pw_port_emit(p, destroy, 0)
@ -575,6 +551,7 @@ struct pw_port {
* implementation when destroyed */
#define PW_PORT_FLAG_BUFFERS (1<<1) /**< port has data */
#define PW_PORT_FLAG_CONTROL (1<<2) /**< port has control */
#define PW_PORT_FLAG_NO_MIXER (1<<3) /**< don't try to add mixer to port */
uint32_t flags;
uint32_t spa_flags;
@ -588,7 +565,7 @@ struct pw_port {
struct pw_port_info info;
struct spa_param_info params[MAX_PARAMS];
struct allocation allocation;
struct pw_buffers allocation;
struct spa_list links; /**< list of \ref pw_link */
@ -601,7 +578,10 @@ struct pw_port {
struct spa_node *mix; /**< port buffer mix/split */
#define PW_PORT_MIX_FLAG_MULTI (1<<0) /**< multi input or output */
#define PW_PORT_MIX_FLAG_MIX_ONLY (1<<1) /**< only negotiate mix ports */
#define PW_PORT_MIX_FLAG_NEGOTIATE (1<<2) /**< negotiate buffers */
uint32_t mix_flags; /**< flags for the mixing */
struct spa_handle *mix_handle; /**< mix plugin handle */
struct pw_buffers mix_buffers; /**< buffers between mixer and node */
struct spa_list mix_list; /**< list of \ref pw_port_mix */
struct pw_map mix_port_map; /**< map from port_id from mixer */