mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-11-04 13:30:12 -05:00
Add support for client fd memory
Remove the node buffers reply again. We don't need it. Instead add a new method to the client-node to upload an array of buffer datas. This method is called after the client has allocated buffer mem. It will update the buffers on the server side with the client allocated memory. Wait for the async reply of use_buffers when doing alloc_buffers so that we can get the updated buffer mem before we continue. Let the link follow the states of the ports. Add some error code to the port error states. Add PW_STREAM_FLAG_ALLOC_BUFFERS flag to make the client alloc buffer memory.
This commit is contained in:
parent
deb6c52f76
commit
e76a7abceb
18 changed files with 375 additions and 147 deletions
|
|
@ -786,6 +786,9 @@ do_port_use_buffers(struct impl *impl,
|
|||
|
||||
memcpy(&b->buffer.datas[j], d, sizeof(struct spa_data));
|
||||
|
||||
if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC)
|
||||
continue;
|
||||
|
||||
if (d->type == SPA_DATA_DmaBuf ||
|
||||
d->type == SPA_DATA_MemFd) {
|
||||
uint32_t flags = PW_MEMBLOCK_FLAG_DONT_CLOSE;
|
||||
|
|
@ -974,6 +977,57 @@ static int client_node_event(void *data, const struct spa_event *event)
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int client_node_port_buffers(void *data,
|
||||
enum spa_direction direction,
|
||||
uint32_t port_id,
|
||||
uint32_t mix_id,
|
||||
uint32_t n_buffers,
|
||||
struct spa_buffer **buffers)
|
||||
{
|
||||
struct impl *impl = data;
|
||||
struct node *this = &impl->node;
|
||||
struct port *p;
|
||||
struct mix *mix;
|
||||
uint32_t i, j;
|
||||
|
||||
spa_log_debug(this->log, NAME " %p: %s port %d.%d buffers %p %u", impl,
|
||||
direction == SPA_DIRECTION_INPUT ? "input" : "output",
|
||||
port_id, mix_id, buffers, n_buffers);
|
||||
|
||||
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
|
||||
|
||||
p = GET_PORT(this, direction, port_id);
|
||||
if (!p->have_format)
|
||||
return -EIO;
|
||||
|
||||
if ((mix = find_mix(p, mix_id)) == NULL || !mix->valid)
|
||||
return -EINVAL;
|
||||
|
||||
|
||||
for (i = 0; i < n_buffers; i++) {
|
||||
struct spa_buffer *oldbuf, *newbuf;
|
||||
|
||||
oldbuf = mix->buffers[i].outbuf;
|
||||
newbuf = buffers[i];
|
||||
|
||||
spa_log_debug(this->log, "buffer %d n_datas:%d", i, newbuf->n_datas);
|
||||
|
||||
if (oldbuf->n_datas != newbuf->n_datas)
|
||||
return -EINVAL;
|
||||
|
||||
for (j = 0; j < newbuf->n_datas; j++) {
|
||||
oldbuf->datas[j] = newbuf->datas[j];
|
||||
|
||||
spa_log_debug(this->log, " data %d type:%d fd:%d", j,
|
||||
newbuf->datas[j].type,
|
||||
(int) newbuf->datas[j].fd);
|
||||
}
|
||||
}
|
||||
mix->n_buffers = n_buffers;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static struct pw_client_node_proxy_methods client_node_methods = {
|
||||
PW_VERSION_CLIENT_NODE_PROXY_METHODS,
|
||||
.get_node = client_node_get_node,
|
||||
|
|
@ -981,6 +1035,7 @@ static struct pw_client_node_proxy_methods client_node_methods = {
|
|||
.port_update = client_node_port_update,
|
||||
.set_active = client_node_set_active,
|
||||
.event = client_node_event,
|
||||
.port_buffers = client_node_port_buffers,
|
||||
};
|
||||
|
||||
static void node_on_data_fd_events(struct spa_source *source)
|
||||
|
|
|
|||
|
|
@ -262,6 +262,49 @@ static int client_node_marshal_event_method(void *object, const struct spa_event
|
|||
return pw_protocol_native_end_proxy(proxy, b);
|
||||
}
|
||||
|
||||
static int
|
||||
client_node_marshal_port_buffers(void *object,
|
||||
enum spa_direction direction,
|
||||
uint32_t port_id,
|
||||
uint32_t mix_id,
|
||||
uint32_t n_buffers,
|
||||
struct spa_buffer **buffers)
|
||||
{
|
||||
struct pw_proxy *proxy = object;
|
||||
struct spa_pod_builder *b;
|
||||
struct spa_pod_frame f[2];
|
||||
uint32_t i, j;
|
||||
|
||||
b = pw_protocol_native_begin_proxy(proxy, PW_CLIENT_NODE_PROXY_METHOD_PORT_BUFFERS, NULL);
|
||||
|
||||
spa_pod_builder_push_struct(b, &f[0]);
|
||||
spa_pod_builder_add(b,
|
||||
SPA_POD_Int(direction),
|
||||
SPA_POD_Int(port_id),
|
||||
SPA_POD_Int(mix_id),
|
||||
SPA_POD_Int(n_buffers), NULL);
|
||||
|
||||
for (i = 0; i < n_buffers; i++) {
|
||||
struct spa_buffer *buf = buffers[i];
|
||||
|
||||
spa_pod_builder_add(b,
|
||||
SPA_POD_Int(buf->n_datas), NULL);
|
||||
|
||||
for (j = 0; j < buf->n_datas; j++) {
|
||||
struct spa_data *d = &buf->datas[j];
|
||||
spa_pod_builder_add(b,
|
||||
SPA_POD_Id(d->type),
|
||||
SPA_POD_Int(pw_protocol_native_add_proxy_fd(proxy, d->fd)),
|
||||
SPA_POD_Int(d->flags),
|
||||
SPA_POD_Int(d->mapoffset),
|
||||
SPA_POD_Int(d->maxsize), NULL);
|
||||
}
|
||||
}
|
||||
spa_pod_builder_pop(b, &f[0]);
|
||||
|
||||
return pw_protocol_native_end_proxy(proxy, b);
|
||||
}
|
||||
|
||||
static int client_node_demarshal_transport(void *object, const struct pw_protocol_native_message *msg)
|
||||
{
|
||||
struct pw_proxy *proxy = object;
|
||||
|
|
@ -687,11 +730,11 @@ client_node_marshal_port_use_buffers(void *object,
|
|||
|
||||
spa_pod_builder_push_struct(b, &f);
|
||||
spa_pod_builder_add(b,
|
||||
SPA_POD_Int(direction),
|
||||
SPA_POD_Int(port_id),
|
||||
SPA_POD_Int(mix_id),
|
||||
SPA_POD_Int(flags),
|
||||
SPA_POD_Int(n_buffers), NULL);
|
||||
SPA_POD_Int(direction),
|
||||
SPA_POD_Int(port_id),
|
||||
SPA_POD_Int(mix_id),
|
||||
SPA_POD_Int(flags),
|
||||
SPA_POD_Int(n_buffers), NULL);
|
||||
|
||||
for (i = 0; i < n_buffers; i++) {
|
||||
struct spa_buffer *buf = buffers[i].buffer;
|
||||
|
|
@ -997,6 +1040,56 @@ static int client_node_demarshal_event_method(void *object, const struct pw_prot
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int client_node_demarshal_port_buffers(void *object, const struct pw_protocol_native_message *msg)
|
||||
{
|
||||
struct pw_resource *resource = object;
|
||||
struct spa_pod_parser prs;
|
||||
struct spa_pod_frame f;
|
||||
uint32_t i, j, direction, port_id, mix_id, n_buffers, data_id;
|
||||
struct spa_buffer **buffers = NULL;
|
||||
|
||||
spa_pod_parser_init(&prs, msg->data, msg->size);
|
||||
if (spa_pod_parser_push_struct(&prs, &f) < 0 ||
|
||||
spa_pod_parser_get(&prs,
|
||||
SPA_POD_Int(&direction),
|
||||
SPA_POD_Int(&port_id),
|
||||
SPA_POD_Int(&mix_id),
|
||||
SPA_POD_Int(&n_buffers), NULL) < 0)
|
||||
return -EINVAL;
|
||||
|
||||
buffers = alloca(sizeof(struct spa_buffer*) * n_buffers);
|
||||
for (i = 0; i < n_buffers; i++) {
|
||||
struct spa_buffer *buf = buffers[i] = alloca(sizeof(struct spa_buffer));
|
||||
|
||||
buf->n_metas = 0;
|
||||
buf->metas = NULL;
|
||||
|
||||
if (spa_pod_parser_get(&prs,
|
||||
SPA_POD_Int(&buf->n_datas), NULL) < 0)
|
||||
return -EINVAL;
|
||||
|
||||
buf->datas = alloca(sizeof(struct spa_data) * buf->n_datas);
|
||||
for (j = 0; j < buf->n_datas; j++) {
|
||||
struct spa_data *d = &buf->datas[j];
|
||||
|
||||
if (spa_pod_parser_get(&prs,
|
||||
SPA_POD_Id(&d->type),
|
||||
SPA_POD_Int(&data_id),
|
||||
SPA_POD_Int(&d->flags),
|
||||
SPA_POD_Int(&d->mapoffset),
|
||||
SPA_POD_Int(&d->maxsize), NULL) < 0)
|
||||
return -EINVAL;
|
||||
|
||||
d->fd = pw_protocol_native_get_resource_fd(resource, data_id);
|
||||
}
|
||||
}
|
||||
|
||||
pw_resource_notify(resource, struct pw_client_node_proxy_methods, port_buffers, 0,
|
||||
direction, port_id, mix_id, n_buffers, buffers);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static const struct pw_client_node_proxy_methods pw_protocol_native_client_node_method_marshal = {
|
||||
PW_VERSION_CLIENT_NODE_PROXY_METHODS,
|
||||
.add_listener = &client_node_marshal_add_listener,
|
||||
|
|
@ -1004,7 +1097,8 @@ static const struct pw_client_node_proxy_methods pw_protocol_native_client_node_
|
|||
.update = &client_node_marshal_update,
|
||||
.port_update = &client_node_marshal_port_update,
|
||||
.set_active = &client_node_marshal_set_active,
|
||||
.event = &client_node_marshal_event_method
|
||||
.event = &client_node_marshal_event_method,
|
||||
.port_buffers = &client_node_marshal_port_buffers
|
||||
};
|
||||
|
||||
static const struct pw_protocol_native_demarshal
|
||||
|
|
@ -1015,7 +1109,8 @@ pw_protocol_native_client_node_method_demarshal[PW_CLIENT_NODE_PROXY_METHOD_NUM]
|
|||
[PW_CLIENT_NODE_PROXY_METHOD_UPDATE] = { &client_node_demarshal_update, 0 },
|
||||
[PW_CLIENT_NODE_PROXY_METHOD_PORT_UPDATE] = { &client_node_demarshal_port_update, 0 },
|
||||
[PW_CLIENT_NODE_PROXY_METHOD_SET_ACTIVE] = { &client_node_demarshal_set_active, 0 },
|
||||
[PW_CLIENT_NODE_PROXY_METHOD_EVENT] = { &client_node_demarshal_event_method, 0 }
|
||||
[PW_CLIENT_NODE_PROXY_METHOD_EVENT] = { &client_node_demarshal_event_method, 0 },
|
||||
[PW_CLIENT_NODE_PROXY_METHOD_PORT_BUFFERS] = { &client_node_demarshal_port_buffers, 0 }
|
||||
};
|
||||
|
||||
static const struct pw_client_node_proxy_events pw_protocol_native_client_node_event_marshal = {
|
||||
|
|
|
|||
|
|
@ -359,8 +359,7 @@ static int add_port_update(struct pw_proxy *proxy, struct pw_port *port, uint32_
|
|||
pi.flags = port->spa_flags;
|
||||
pi.rate = SPA_FRACTION(0, 1);
|
||||
pi.props = &port->properties->dict;
|
||||
SPA_FLAG_UNSET(pi.flags,
|
||||
SPA_PORT_FLAG_CAN_ALLOC_BUFFERS | SPA_PORT_FLAG_DYNAMIC_DATA);
|
||||
SPA_FLAG_UNSET(pi.flags, SPA_PORT_FLAG_DYNAMIC_DATA);
|
||||
pi.n_params = port->info.n_params;
|
||||
pi.params = port->info.params;
|
||||
}
|
||||
|
|
@ -667,9 +666,15 @@ client_node_port_use_buffers(void *object,
|
|||
bufs[i] = b;
|
||||
}
|
||||
|
||||
if ((res = pw_port_use_buffers(mix->port, mix->mix_id, flags, bufs, n_buffers)) < 0)
|
||||
if ((res = pw_port_use_buffers(mix->port, mix_id, flags, bufs, n_buffers)) < 0)
|
||||
goto error_exit_cleanup;
|
||||
|
||||
if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC) {
|
||||
pw_client_node_proxy_port_buffers(data->node_proxy,
|
||||
direction, port_id, mix_id,
|
||||
n_buffers,
|
||||
bufs);
|
||||
}
|
||||
return res;
|
||||
|
||||
error_exit_cleanup:
|
||||
|
|
@ -968,7 +973,6 @@ static void node_active_changed(void *data, bool active)
|
|||
pw_client_node_proxy_set_active(d->node_proxy, active);
|
||||
}
|
||||
|
||||
|
||||
static const struct pw_node_events node_events = {
|
||||
PW_VERSION_NODE_EVENTS,
|
||||
.destroy = node_destroy,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue