node: improve async handling

Remove the done and error callbacks. The error callback is in an
error message. The done callback is replace with spa_pending.

Make enum_params take a callback and data for the results. This allows
us to push the results one after another to the app and avoids ownership
issues of the passed data. We can then extend this to handle the async
case by doing a _wait call with a spa_pending+callback+data that will
be called when the _enum_params returns and async result.
Add a sync method.

All methods can now return SPA_RESULT_IS_ASYNC return values and you
can use spa_node_wait() to register a callback when they complete
with optional extra parameters. This makes it easier to sync and
handle the reply.

Make helper methods to simulate the sync enum_params behaviour for
sync nodes.

Let the transport generate the sequence number for pw_resource_sync()
and pw_proxy_sync(). That way we don't need to keep track of numbers
ourselves and we can match the reply to the request easily.
This commit is contained in:
Wim Taymans 2019-02-20 17:51:05 +01:00
parent b743518f78
commit 7b12212eeb
67 changed files with 1894 additions and 1209 deletions

View file

@ -34,6 +34,7 @@
#include <sys/eventfd.h>
#include <spa/node/node.h>
#include <spa/node/utils.h>
#include <spa/buffer/alloc.h>
#include <spa/pod/parser.h>
#include <spa/pod/filter.h>
@ -68,8 +69,6 @@ struct node {
const struct spa_node_callbacks *callbacks;
void *callbacks_data;
uint32_t seq;
};
struct impl {
@ -83,7 +82,6 @@ struct impl {
struct spa_hook node_listener;
struct spa_hook client_node_listener;
struct spa_hook resource_listener;
enum spa_direction direction;
@ -110,24 +108,28 @@ struct impl {
/** \endcond */
static int impl_node_enum_params(struct spa_node *node,
uint32_t id, uint32_t *index,
uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter,
struct spa_pod **result,
struct spa_pod_builder *builder)
spa_result_func_t func, void *data)
{
struct node *this;
struct impl *impl;
struct spa_pod *param;
struct spa_pod_builder b = { 0 };
uint8_t buffer[1024];
struct spa_result_node_enum_params result;
uint32_t count = 0;
int res;
spa_return_val_if_fail(node != NULL, -EINVAL);
spa_return_val_if_fail(index != NULL, -EINVAL);
spa_return_val_if_fail(builder != NULL, -EINVAL);
spa_return_val_if_fail(num != 0, -EINVAL);
spa_return_val_if_fail(func != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
impl = this->impl;
result.next = start;
next:
spa_pod_builder_init(&b, buffer, sizeof(buffer));
@ -138,10 +140,10 @@ static int impl_node_enum_params(struct spa_node *node,
SPA_PARAM_EnumFormat,
SPA_PARAM_Format };
if (*index < SPA_N_ELEMENTS(list))
if (result.next < SPA_N_ELEMENTS(list))
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamList, id,
SPA_PARAM_LIST_id, SPA_POD_Id(list[*index]));
SPA_PARAM_LIST_id, SPA_POD_Id(list[result.next]));
else
return 0;
break;
@ -149,8 +151,7 @@ static int impl_node_enum_params(struct spa_node *node,
case SPA_PARAM_Props:
if (impl->adapter != impl->cnode) {
return spa_node_enum_params(impl->adapter,
id, index,
filter, result, builder);
id, start, num, filter, func, data);
}
return 0;
@ -158,18 +159,24 @@ static int impl_node_enum_params(struct spa_node *node,
case SPA_PARAM_Format:
return spa_node_port_enum_params(impl->cnode,
impl->direction, 0,
id, index,
filter, result, builder);
id, start, num,
filter, func, data);
default:
return -ENOENT;
}
(*index)++;
result.next++;
if (spa_pod_filter(builder, result, param, filter) < 0)
if (spa_pod_filter(&b, &result.param, param, filter) < 0)
goto next;
return 1;
if ((res = func(data, count, 1, &result)) != 0)
return res;
if (++count != num)
goto next;
return 0;
}
static void try_link_controls(struct impl *impl)
@ -323,11 +330,11 @@ impl_node_set_callbacks(struct spa_node *node,
if (this->callbacks && impl->adapter && impl->adapter != impl->cnode)
spa_node_set_callbacks(impl->adapter, &adapter_node_callbacks, impl);
return 0;
return spa_node_sync(impl->cnode);
}
static int
impl_node_sync(struct spa_node *node, uint32_t seq)
impl_node_sync(struct spa_node *node)
{
struct node *this;
struct impl *impl;
@ -337,7 +344,22 @@ impl_node_sync(struct spa_node *node, uint32_t seq)
this = SPA_CONTAINER_OF(node, struct node, node);
impl = this->impl;
return spa_node_sync(impl->cnode, seq);
return spa_node_sync(impl->cnode);
}
static int
impl_node_wait(struct spa_node *node, int seq, struct spa_pending *pending,
spa_result_func_t func, void *data)
{
struct node *this;
struct impl *impl;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
impl = this->impl;
return spa_node_wait(impl->cnode, seq, pending, func, data);
}
static int
@ -389,17 +411,16 @@ impl_node_remove_port(struct spa_node *node, enum spa_direction direction, uint3
static int
impl_node_port_enum_params(struct spa_node *node,
enum spa_direction direction, uint32_t port_id,
uint32_t id, uint32_t *index,
uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter,
struct spa_pod **result,
struct spa_pod_builder *builder)
spa_result_func_t func, void *data)
{
struct node *this;
struct impl *impl;
spa_return_val_if_fail(node != NULL, -EINVAL);
spa_return_val_if_fail(index != NULL, -EINVAL);
spa_return_val_if_fail(builder != NULL, -EINVAL);
spa_return_val_if_fail(num != 0, -EINVAL);
spa_return_val_if_fail(func != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
impl = this->impl;
@ -408,7 +429,7 @@ impl_node_port_enum_params(struct spa_node *node,
return -EINVAL;
return spa_node_port_enum_params(impl->adapter, direction, port_id, id,
index, filter, result, builder);
start, num, filter, func, data);
}
static int debug_params(struct impl *impl, struct spa_node *node,
@ -426,11 +447,11 @@ static int debug_params(struct impl *impl, struct spa_node *node,
state = 0;
while (true) {
spa_pod_builder_init(&b, buffer, sizeof(buffer));
res = spa_node_port_enum_params(node,
res = spa_node_port_enum_params_sync(node,
direction, port_id,
id, &state,
NULL, &param, &b);
if (res <= 0) {
if (res != 1) {
if (res < 0)
spa_log_error(this->log, " error: %s", spa_strerror(res));
break;
@ -460,11 +481,11 @@ static int negotiate_format(struct impl *impl)
spa_log_debug(this->log, NAME "%p: negiotiate", impl);
state = 0;
if ((res = spa_node_port_enum_params(impl->adapter_mix,
if ((res = spa_node_port_enum_params_sync(impl->adapter_mix,
SPA_DIRECTION_REVERSE(impl->direction),
impl->adapter_mix_port,
SPA_PARAM_EnumFormat, &state,
NULL, &format, &b)) <= 0) {
NULL, &format, &b)) != 1) {
debug_params(impl, impl->adapter_mix,
SPA_DIRECTION_REVERSE(impl->direction),
impl->adapter_mix_port,
@ -473,10 +494,10 @@ static int negotiate_format(struct impl *impl)
}
state = 0;
if ((res = spa_node_port_enum_params(impl->cnode,
if ((res = spa_node_port_enum_params_sync(impl->cnode,
impl->direction, 0,
SPA_PARAM_EnumFormat, &state,
format, &format, &b)) <= 0) {
format, &format, &b)) != 1) {
debug_params(impl, impl->cnode, impl->direction, 0,
SPA_PARAM_EnumFormat, format);
return -ENOTSUP;
@ -524,22 +545,22 @@ static int negotiate_buffers(struct impl *impl)
return 0;
state = 0;
if ((res = spa_node_port_enum_params(impl->adapter_mix,
if ((res = spa_node_port_enum_params_sync(impl->adapter_mix,
SPA_DIRECTION_REVERSE(impl->direction),
impl->adapter_mix_port,
SPA_PARAM_Buffers, &state,
param, &param, &b)) <= 0) {
param, &param, &b)) < 0) {
debug_params(impl, impl->adapter_mix,
SPA_DIRECTION_REVERSE(impl->direction),
impl->adapter_mix_port,
SPA_PARAM_Buffers, param);
return -ENOTSUP;
}
if (res == 0)
if (res != 1)
param = NULL;
state = 0;
if ((res = spa_node_port_enum_params(impl->cnode,
if ((res = spa_node_port_enum_params_sync(impl->cnode,
impl->direction, 0,
SPA_PARAM_Buffers, &state,
param, &param, &b)) < 0) {
@ -833,6 +854,7 @@ static const struct spa_node impl_node = {
SPA_VERSION_NODE,
.set_callbacks = impl_node_set_callbacks,
.sync = impl_node_sync,
.wait = impl_node_wait,
.enum_params = impl_node_enum_params,
.set_param = impl_node_set_param,
.set_io = impl_node_set_io,
@ -861,10 +883,7 @@ node_init(struct node *this,
this->log = support[i].data;
}
this->node = impl_node;
this->seq = 1;
return SPA_RESULT_RETURN_ASYNC(this->seq++);
return 0;
}
static int do_port_info(void *data, struct pw_port *port)
@ -957,10 +976,10 @@ static void client_node_initialized(void *data)
state = 0;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
if ((res = spa_node_port_enum_params(impl->cnode,
if ((res = spa_node_port_enum_params_sync(impl->cnode,
impl->direction, 0,
SPA_PARAM_EnumFormat, &state,
NULL, &format, &b)) <= 0) {
NULL, &format, &b)) != 1) {
pw_log_warn("client-stream %p: no format given", &impl->this);
impl->adapter = impl->cnode;
impl->adapter_mix = impl->client_port->mix;
@ -1049,15 +1068,6 @@ static void client_node_initialized(void *data)
items[0] = SPA_DICT_ITEM_INIT("media.class", media_class);
pw_node_update_properties(impl->this.node, &SPA_DICT_INIT(items, 1));
pw_node_register(impl->this.node,
pw_resource_get_client(impl->client_node->resource),
impl->client_node->parent,
NULL);
pw_log_debug("client-stream %p: activating", &impl->this);
pw_node_set_active(impl->this.node, true);
}
static void cleanup(struct impl *impl)
@ -1087,15 +1097,6 @@ static void client_node_destroy(void *data)
cleanup(impl);
}
static void client_node_async_complete(void *data, uint32_t seq, int res)
{
struct impl *impl = data;
struct node *node = &impl->node;
pw_log_debug("client-stream %p: async complete %d %d", &impl->this, seq, res);
node->callbacks->done(node->callbacks_data, seq);
}
static void client_node_active_changed(void *data, bool active)
{
struct impl *impl = data;
@ -1118,7 +1119,6 @@ static const struct pw_node_events client_node_events = {
PW_VERSION_NODE_EVENTS,
.destroy = client_node_destroy,
.initialized = client_node_initialized,
.async_complete = client_node_async_complete,
.active_changed = client_node_active_changed,
.info_changed = client_node_info_changed,
};
@ -1225,7 +1225,7 @@ struct pw_client_stream *pw_client_stream_new(struct pw_resource *resource,
client,
parent,
name,
PW_SPA_NODE_FLAG_ASYNC,
PW_SPA_NODE_FLAG_ACTIVATE,
&impl->node.node,
NULL,
properties, 0);