interfaces: improve remote API

Add return values to events and method callbacks. This makes it
possible to pass any error or async return value.
Add sync/done/error in both directions so that both proxy and resource
and perform/reply sync and produce errors.
Return a SPA_ASYNC from remote method calls (and events), keep the
sequence number in the connection.
With the core sync/done we can remove the client-node done method and
it's internal sequence number along with the seq number in method calls.
We can also use the method/event async return value to perform a sync
with as the unique sequence number for this method.
Add sync method and done/error event to proxy and resource.
This commit is contained in:
Wim Taymans 2019-02-18 12:31:36 +01:00
parent 0d8821096a
commit eea062ee53
41 changed files with 1180 additions and 817 deletions

View file

@ -343,9 +343,9 @@ static struct mix *ensure_mix(struct node_data *data,
return mix;
}
static void client_node_add_mem(void *object,
uint32_t mem_id,
uint32_t type, int memfd, uint32_t flags)
static int client_node_add_mem(void *object,
uint32_t mem_id,
uint32_t type, int memfd, uint32_t flags)
{
struct pw_proxy *proxy = object;
struct node_data *data = proxy->user_data;
@ -355,7 +355,7 @@ static void client_node_add_mem(void *object,
if (m) {
pw_log_warn("duplicate mem %u, fd %d, flags %d",
mem_id, memfd, flags);
return;
return -EINVAL;
}
m = pw_array_add(&data->mems, sizeof(struct mem));
@ -367,10 +367,11 @@ static void client_node_add_mem(void *object,
m->ref = 0;
m->map.map = PW_MAP_RANGE_INIT;
m->map.ptr = NULL;
return 0;
}
static void client_node_transport(void *object, uint32_t node_id,
int readfd, int writefd)
static int client_node_transport(void *object, uint32_t node_id,
int readfd, int writefd)
{
struct pw_proxy *proxy = object;
struct node_data *data = proxy->user_data;
@ -392,14 +393,16 @@ static void client_node_transport(void *object, uint32_t node_id,
pw_client_node_proxy_set_active(data->node_proxy, true);
pw_remote_events_exported(remote, proxy->id, node_id);
return 0;
}
static void add_port_update(struct pw_proxy *proxy, struct pw_port *port, uint32_t change_mask)
static int add_port_update(struct pw_proxy *proxy, struct pw_port *port, uint32_t change_mask)
{
struct node_data *data = proxy->user_data;
struct spa_port_info pi = SPA_PORT_INFO_INIT();
uint32_t n_params = 0;
struct spa_pod **params = NULL;
int res;
if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_PARAMS) {
uint32_t idx1, idx2, id;
@ -446,7 +449,7 @@ static void add_port_update(struct pw_proxy *proxy, struct pw_port *port, uint32
pi.flags &= ~SPA_PORT_FLAG_CAN_ALLOC_BUFFERS;
}
pw_client_node_proxy_port_update(data->node_proxy,
res = pw_client_node_proxy_port_update(data->node_proxy,
port->direction,
port->port_id,
change_mask,
@ -458,17 +461,19 @@ static void add_port_update(struct pw_proxy *proxy, struct pw_port *port, uint32
free(params[--n_params]);
free(params);
}
return res;
}
static void
client_node_set_param(void *object, uint32_t seq, uint32_t id, uint32_t flags,
static int
client_node_set_param(void *object, uint32_t id, uint32_t flags,
const struct spa_pod *param)
{
pw_log_warn("set param not implemented");
return -ENOTSUP;
}
static void
static int
client_node_set_io(void *object,
uint32_t id,
uint32_t memid,
@ -488,12 +493,12 @@ client_node_set_io(void *object,
m = find_mem(data, memid);
if (m == NULL) {
pw_log_warn("unknown memory id %u", memid);
return;
return -EINVAL;
}
ptr = mem_map(data, &m->map, m->fd,
PROT_READ|PROT_WRITE, offset, size);
if (ptr == NULL)
return;
return -errno;
m->ref++;
}
@ -508,12 +513,13 @@ client_node_set_io(void *object,
}
data->position = ptr;
}
spa_node_set_io(data->node->node, id, ptr, size);
return spa_node_set_io(data->node->node, id, ptr, size);
}
static void client_node_event(void *object, const struct spa_event *event)
static int client_node_event(void *object, const struct spa_event *event)
{
pw_log_warn("unhandled node event %d", SPA_EVENT_TYPE(event));
return -ENOTSUP;
}
static int
@ -527,7 +533,7 @@ do_pause_source(struct spa_loop *loop,
return 0;
}
static void client_node_command(void *object, uint32_t seq, const struct spa_command *command)
static int client_node_command(void *object, const struct spa_command *command)
{
struct pw_proxy *proxy = object;
struct node_data *data = proxy->user_data;
@ -536,50 +542,58 @@ static void client_node_command(void *object, uint32_t seq, const struct spa_com
switch (SPA_NODE_COMMAND_ID(command)) {
case SPA_NODE_COMMAND_Pause:
pw_log_debug("node %p: pause %d", proxy, seq);
pw_log_debug("node %p: pause", proxy);
if (data->rtsocket_source) {
pw_loop_invoke(data->core->data_loop,
do_pause_source, 1, NULL, 0, true, data);
}
if ((res = pw_node_set_state(data->node, PW_NODE_STATE_IDLE)) < 0)
if ((res = pw_node_set_state(data->node, PW_NODE_STATE_IDLE)) < 0) {
pw_log_warn("node %p: pause failed", proxy);
pw_proxy_error(proxy, res, "pause failed");
}
pw_client_node_proxy_done(data->node_proxy, seq, res);
break;
case SPA_NODE_COMMAND_Start:
pw_log_debug("node %p: start %d", proxy, seq);
pw_log_debug("node %p: start", proxy);
if ((res = pw_node_set_state(data->node, PW_NODE_STATE_RUNNING)) < 0) {
pw_log_warn("node %p: start failed", proxy);
pw_proxy_error(proxy, res, "start failed");
}
else if (data->rtsocket_source) {
pw_loop_update_io(remote->core->data_loop,
data->rtsocket_source,
SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP);
}
pw_client_node_proxy_done(data->node_proxy, seq, res);
break;
default:
pw_log_warn("unhandled node command %d", SPA_NODE_COMMAND_ID(command));
pw_client_node_proxy_done(data->node_proxy, seq, -ENOTSUP);
res = -ENOTSUP;
pw_proxy_error(proxy, res, "command %d not supported", SPA_NODE_COMMAND_ID(command));
}
return res;
}
static void
client_node_add_port(void *object, uint32_t seq, enum spa_direction direction, uint32_t port_id)
static int
client_node_add_port(void *object, enum spa_direction direction, uint32_t port_id)
{
struct pw_proxy *proxy = object;
pw_log_warn("add port not supported");
pw_proxy_error(proxy, -ENOTSUP, "add port not supported");
return -ENOTSUP;
}
static void
client_node_remove_port(void *object, uint32_t seq, enum spa_direction direction, uint32_t port_id)
static int
client_node_remove_port(void *object, enum spa_direction direction, uint32_t port_id)
{
struct pw_proxy *proxy = object;
pw_log_warn("remove port not supported");
pw_proxy_error(proxy, -ENOTSUP, "remove port not supported");
return -ENOTSUP;
}
static void clear_buffers(struct node_data *data, struct mix *mix)
static int clear_buffers(struct node_data *data, struct mix *mix)
{
struct pw_port *port = mix->port;
struct buffer *b;
@ -589,7 +603,7 @@ static void clear_buffers(struct node_data *data, struct mix *mix)
pw_log_debug("port %p: clear buffers %d", port, mix->mix_id);
if ((res = pw_port_use_buffers(port, mix->mix_id, NULL, 0)) < 0) {
pw_log_error("port %p: error clear buffers %s", port, spa_strerror(res));
return;
return res;
}
pw_array_for_each(b, &mix->buffers) {
@ -608,11 +622,11 @@ static void clear_buffers(struct node_data *data, struct mix *mix)
free(b->buf);
}
mix->buffers.size = 0;
return 0;
}
static void
static int
client_node_port_set_param(void *object,
uint32_t seq,
enum spa_direction direction, uint32_t port_id,
uint32_t id, uint32_t flags,
const struct spa_pod *param)
@ -625,6 +639,7 @@ client_node_port_set_param(void *object,
port = pw_node_find_port(data->node, direction, port_id);
if (port == NULL) {
res = -EINVAL;
pw_proxy_error(proxy, res, "unknown port");
goto done;
}
@ -639,20 +654,22 @@ client_node_port_set_param(void *object,
}
res = pw_port_set_param(port, SPA_ID_INVALID, id, flags, param);
if (res < 0)
if (res < 0) {
pw_proxy_error(proxy, res, "can't set port param: %s", spa_strerror(res));
goto done;
}
add_port_update(proxy, port,
if ((res = add_port_update(proxy, port,
PW_CLIENT_NODE_PORT_UPDATE_PARAMS |
PW_CLIENT_NODE_PORT_UPDATE_INFO);
PW_CLIENT_NODE_PORT_UPDATE_INFO)) < 0)
pw_proxy_error(proxy, res, "can't add port update");
done:
pw_client_node_proxy_done(data->node_proxy, seq, res);
return res;
}
static void
static int
client_node_port_use_buffers(void *object,
uint32_t seq,
enum spa_direction direction, uint32_t port_id, uint32_t mix_id,
uint32_t n_buffers, struct pw_client_node_buffer *buffers)
{
@ -667,6 +684,7 @@ client_node_port_use_buffers(void *object,
mix = ensure_mix(data, direction, port_id, mix_id);
if (mix == NULL) {
res = -EINVAL;
pw_proxy_error(proxy, res, "can add mix");
goto done;
}
@ -687,6 +705,7 @@ client_node_port_use_buffers(void *object,
if (m == NULL) {
pw_log_error("unknown memory id %u", buffers[i].mem_id);
res = -EINVAL;
pw_proxy_error(proxy, res, "unknown memory %u", buffers[i].mem_id);
goto cleanup;
}
@ -698,6 +717,7 @@ client_node_port_use_buffers(void *object,
buffers[i].offset, buffers[i].size);
if (bmem.map.ptr == NULL) {
res = -errno;
pw_proxy_error(proxy, res, "can't mmap memory: %s", spa_strerror(res));
goto cleanup;
}
if (mlock(bmem.map.ptr, bmem.map.map.size) < 0)
@ -716,6 +736,7 @@ client_node_port_use_buffers(void *object,
b = bid->buf = malloc(size);
if (b == NULL) {
res = -ENOMEM;
pw_proxy_error(proxy, res, "can't alloc memory: %s", spa_strerror(res));
goto cleanup;
}
memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer));
@ -757,6 +778,7 @@ client_node_port_use_buffers(void *object,
if (bm == NULL) {
pw_log_error("unknown buffer mem %u", mem_id);
res = -EINVAL;
pw_proxy_error(proxy, res, "unknown buffer mem %u", mem_id);
goto cleanup;
}
@ -783,11 +805,11 @@ client_node_port_use_buffers(void *object,
bufs[i] = b;
}
res = pw_port_use_buffers(mix->port, mix->mix_id, bufs, n_buffers);
if ((res = pw_port_use_buffers(mix->port, mix->mix_id, bufs, n_buffers)) < 0)
pw_proxy_error(proxy, res, "can't use buffers: %s", spa_strerror(res));
done:
pw_client_node_proxy_done(data->node_proxy, seq, res);
return;
return res;
cleanup:
clear_buffers(data, mix);
@ -795,9 +817,8 @@ client_node_port_use_buffers(void *object,
}
static void
static int
client_node_port_set_io(void *object,
uint32_t seq,
uint32_t direction,
uint32_t port_id,
uint32_t mix_id,
@ -811,10 +832,14 @@ client_node_port_set_io(void *object,
struct mix *mix;
struct mem *m;
void *ptr;
int res = 0;
mix = ensure_mix(data, direction, port_id, mix_id);
if (mix == NULL)
return;
if (mix == NULL) {
res = -EINVAL;
pw_proxy_error(proxy, res, "can't get mixer: %s", spa_strerror(res));
return res;
}
if (memid == SPA_ID_INVALID) {
ptr = NULL;
@ -824,12 +849,17 @@ client_node_port_set_io(void *object,
m = find_mem(data, memid);
if (m == NULL) {
pw_log_warn("unknown memory id %u", memid);
return;
res = -EINVAL;
pw_proxy_error(proxy, res, "unknown memory id %u", memid);
return res;
}
ptr = mem_map(data, &m->map, m->fd,
PROT_READ|PROT_WRITE, offset, size);
if (ptr == NULL)
return;
if (ptr == NULL) {
res = -errno;
pw_proxy_error(proxy, res, "mmap failed: %s", spa_strerror(res));
return res;
}
m->ref++;
}
@ -848,12 +878,14 @@ client_node_port_set_io(void *object,
if (ptr)
activate_mix(data, mix);
} else {
spa_node_port_set_io(mix->port->node->node,
if ((res = spa_node_port_set_io(mix->port->node->node,
direction, port_id,
id,
ptr,
size);
size)) < 0)
pw_proxy_error(proxy, res, "set_io failed: %s", spa_strerror(res));
}
return res;
}
#if 0
@ -868,7 +900,7 @@ static int link_signal_func(void *user_data)
}
#endif
static void
static int
client_node_set_activation(void *object,
uint32_t node_id,
int signalfd,
@ -881,6 +913,7 @@ client_node_set_activation(void *object,
struct pw_node *node = data->node;
struct mem *m;
struct pw_node_activation *ptr;
int res = 0;
if (memid == SPA_ID_INVALID) {
ptr = NULL;
@ -890,12 +923,17 @@ client_node_set_activation(void *object,
m = find_mem(data, memid);
if (m == NULL) {
pw_log_warn("unknown memory id %u", memid);
return;
res = -EINVAL;
pw_proxy_error(proxy, res, "unknown memory id %u", memid);
return res;
}
ptr = mem_map(data, &m->map, m->fd,
PROT_READ|PROT_WRITE, offset, size);
if (ptr == NULL)
return;
if (ptr == NULL) {
res = -errno;
pw_proxy_error(proxy, res, "mmap failed: %s", spa_strerror(res));
return res;
}
m->ref++;
}
pw_log_debug("node %p: set activation %d", node, node_id);
@ -914,6 +952,7 @@ client_node_set_activation(void *object,
link->link.state->pending);
}
#endif
return res;
}
static const struct pw_client_node_proxy_events client_node_events = {
@ -956,7 +995,6 @@ static void do_node_init(struct pw_proxy *proxy)
PW_CLIENT_NODE_PORT_UPDATE_PARAMS |
PW_CLIENT_NODE_PORT_UPDATE_INFO);
}
pw_client_node_proxy_done(data->node_proxy, 0, 0);
}
static void clear_mix(struct node_data *data, struct mix *mix)