Improve async handling

Don't use special callback in node to receive the results. Instead,
use a generic result callback to receive the result. This makes things
a bit more symetric and generic again because then you can choose how
to match the result to the request and you have a generic way to handle
both the sync and async case. We can then also remove the wait method.
This also makes the remote interface and spa interface to objects very
similar.

Make a helper object to receive and dispatch results. Use this in the
helper for enum_params.

Make device use the same result callbacks.
This commit is contained in:
Wim Taymans 2019-02-25 12:29:57 +01:00
parent 98463b689b
commit d2c18c7b1a
64 changed files with 1298 additions and 1141 deletions

View file

@ -103,34 +103,37 @@ struct impl {
struct spa_buffer **buffers;
uint32_t n_buffers;
struct pw_memblock *mem;
struct spa_pending client_node_pending;
};
/** \endcond */
static int impl_node_enum_params(struct spa_node *node,
static int impl_node_enum_params(struct spa_node *node, int seq,
uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter,
spa_result_func_t func, void *data)
const struct spa_pod *filter)
{
struct node *this;
struct impl *impl;
struct spa_pod *param;
struct spa_pod_builder b = { 0 };
uint8_t buffer[1024];
struct spa_result_node_enum_params result;
struct spa_result_node_params result;
uint32_t count = 0;
int res;
spa_return_val_if_fail(node != NULL, -EINVAL);
spa_return_val_if_fail(num != 0, -EINVAL);
spa_return_val_if_fail(func != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
spa_return_val_if_fail(this->callbacks && this->callbacks->result, -EIO);
impl = this->impl;
result.id = id;
result.next = start;
next:
result.index = result.next++;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
switch (id) {
@ -140,37 +143,41 @@ static int impl_node_enum_params(struct spa_node *node,
SPA_PARAM_EnumFormat,
SPA_PARAM_Format };
if (result.next < SPA_N_ELEMENTS(list))
if (result.index < SPA_N_ELEMENTS(list))
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamList, id,
SPA_PARAM_LIST_id, SPA_POD_Id(list[result.next]));
SPA_PARAM_LIST_id, SPA_POD_Id(list[result.index]));
else
return 0;
break;
}
case SPA_PARAM_Props:
if (impl->adapter != impl->cnode) {
return spa_node_enum_params(impl->adapter,
id, start, num, filter, func, data);
}
return 0;
if (impl->adapter == impl->cnode)
return 0;
if ((res = spa_node_enum_params_sync(impl->adapter,
id, &start, filter, &param, &b,
impl->this.node->pending)) != 1)
return res;
break;
case SPA_PARAM_EnumFormat:
case SPA_PARAM_Format:
return spa_node_port_enum_params(impl->cnode,
if ((res = spa_node_port_enum_params_sync(impl->cnode,
impl->direction, 0,
id, start, num,
filter, func, data);
id, &start, filter, &param, &b,
impl->client_node->node->pending)) != 1)
return res;
break;
default:
return -ENOENT;
}
result.next++;
if (spa_pod_filter(&b, &result.param, param, filter) < 0)
goto next;
if ((res = func(data, count, &result)) != 0)
if ((res = this->callbacks->result(this->callbacks_data, seq, 0, &result)) != 0)
return res;
if (++count != num)
@ -306,10 +313,17 @@ static int adapter_port_info(void *data,
}
return 0;
}
static int adapter_result(void *data, int seq, int res, const void *result)
{
struct impl *impl = data;
struct node *this = &impl->node;
return this->callbacks->result(this->callbacks_data, seq, res, result);
}
static const struct spa_node_callbacks adapter_node_callbacks = {
SPA_VERSION_NODE_CALLBACKS,
.port_info = adapter_port_info,
.result = adapter_result,
};
static int
@ -331,11 +345,11 @@ impl_node_set_callbacks(struct spa_node *node,
if (this->callbacks && impl->adapter && impl->adapter != impl->cnode)
spa_node_set_callbacks(impl->adapter, &adapter_node_callbacks, impl);
return spa_node_sync(impl->cnode);
return spa_node_sync(impl->cnode, 0);
}
static int
impl_node_sync(struct spa_node *node)
impl_node_sync(struct spa_node *node, int seq)
{
struct node *this;
struct impl *impl;
@ -345,22 +359,7 @@ impl_node_sync(struct spa_node *node)
this = SPA_CONTAINER_OF(node, struct node, node);
impl = this->impl;
return spa_node_sync(impl->cnode);
}
static int
impl_node_wait(struct spa_node *node, int seq, struct spa_pending *pending,
spa_pending_func_t func, void *data)
{
struct node *this;
struct impl *impl;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
impl = this->impl;
return spa_node_wait(impl->cnode, seq, pending, func, data);
return spa_node_sync(impl->cnode, seq);
}
static int
@ -410,27 +409,28 @@ impl_node_remove_port(struct spa_node *node, enum spa_direction direction, uint3
}
static int
impl_node_port_enum_params(struct spa_node *node,
impl_node_port_enum_params(struct spa_node *node, int seq,
enum spa_direction direction, uint32_t port_id,
uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter,
spa_result_func_t func, void *data)
const struct spa_pod *filter)
{
struct node *this;
struct impl *impl;
spa_return_val_if_fail(node != NULL, -EINVAL);
spa_return_val_if_fail(num != 0, -EINVAL);
spa_return_val_if_fail(func != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
spa_return_val_if_fail(this->callbacks && this->callbacks->result, -EIO);
impl = this->impl;
if (direction != impl->direction)
return -EINVAL;
return spa_node_port_enum_params(impl->adapter, direction, port_id, id,
start, num, filter, func, data);
pw_log_debug("%p: %d %u", this, seq, id);
return spa_node_port_enum_params(impl->adapter, seq, direction, port_id, id,
start, num, filter);
}
static int debug_params(struct impl *impl, struct spa_node *node,
@ -451,7 +451,8 @@ static int debug_params(struct impl *impl, struct spa_node *node,
res = spa_node_port_enum_params_sync(node,
direction, port_id,
id, &state,
NULL, &param, &b);
NULL, &param, &b,
impl->this.node->pending);
if (res != 1) {
if (res < 0)
spa_log_error(this->log, " error: %s", spa_strerror(res));
@ -486,7 +487,8 @@ static int negotiate_format(struct impl *impl)
SPA_DIRECTION_REVERSE(impl->direction),
impl->adapter_mix_port,
SPA_PARAM_EnumFormat, &state,
NULL, &format, &b)) != 1) {
NULL, &format, &b,
impl->this.node->pending)) != 1) {
debug_params(impl, impl->adapter_mix,
SPA_DIRECTION_REVERSE(impl->direction),
impl->adapter_mix_port,
@ -496,9 +498,10 @@ static int negotiate_format(struct impl *impl)
state = 0;
if ((res = spa_node_port_enum_params_sync(impl->cnode,
impl->direction, 0,
SPA_PARAM_EnumFormat, &state,
format, &format, &b)) != 1) {
impl->direction, 0,
SPA_PARAM_EnumFormat, &state,
format, &format, &b,
impl->client_node->node->pending)) != 1) {
debug_params(impl, impl->cnode, impl->direction, 0,
SPA_PARAM_EnumFormat, format);
return -ENOTSUP;
@ -547,10 +550,11 @@ static int negotiate_buffers(struct impl *impl)
state = 0;
if ((res = spa_node_port_enum_params_sync(impl->adapter_mix,
SPA_DIRECTION_REVERSE(impl->direction),
impl->adapter_mix_port,
SPA_PARAM_Buffers, &state,
param, &param, &b)) < 0) {
SPA_DIRECTION_REVERSE(impl->direction),
impl->adapter_mix_port,
SPA_PARAM_Buffers, &state,
param, &param, &b,
impl->this.node->pending)) != 1) {
debug_params(impl, impl->adapter_mix,
SPA_DIRECTION_REVERSE(impl->direction),
impl->adapter_mix_port,
@ -562,9 +566,10 @@ static int negotiate_buffers(struct impl *impl)
state = 0;
if ((res = spa_node_port_enum_params_sync(impl->cnode,
impl->direction, 0,
SPA_PARAM_Buffers, &state,
param, &param, &b)) < 0) {
impl->direction, 0,
SPA_PARAM_Buffers, &state,
param, &param, &b,
impl->client_node->node->pending)) < 0) {
debug_params(impl, impl->cnode, impl->direction, 0,
SPA_PARAM_Buffers, param);
return res;
@ -855,7 +860,6 @@ static const struct spa_node impl_node = {
SPA_VERSION_NODE,
.set_callbacks = impl_node_set_callbacks,
.sync = impl_node_sync,
.wait = impl_node_wait,
.enum_params = impl_node_enum_params,
.set_param = impl_node_set_param,
.set_io = impl_node_set_io,
@ -980,7 +984,8 @@ static void client_node_initialized(void *data)
if ((res = spa_node_port_enum_params_sync(impl->cnode,
impl->direction, 0,
SPA_PARAM_EnumFormat, &state,
NULL, &format, &b)) != 1) {
NULL, &format, &b,
impl->client_node->node->pending)) != 1) {
pw_log_warn("client-stream %p: no format given", &impl->this);
impl->adapter = impl->cnode;
impl->adapter_mix = impl->client_port->mix;
@ -1098,6 +1103,14 @@ static void client_node_destroy(void *data)
cleanup(impl);
}
static void client_node_result(void *data, int seq, int res, const void *result)
{
struct impl *impl = data;
struct node *node = &impl->node;
pw_log_debug("client-stream %p: result %d %d", &impl->this, seq, res);
node->callbacks->result(node->callbacks_data, seq, res, result);
}
static void client_node_active_changed(void *data, bool active)
{
struct impl *impl = data;
@ -1106,7 +1119,7 @@ static void client_node_active_changed(void *data, bool active)
impl->active = active;
}
static void client_node_info_changed (void *data, const struct pw_node_info *info)
static void client_node_info_changed(void *data, const struct pw_node_info *info)
{
struct impl *impl = data;
struct pw_client_stream *this = &impl->this;
@ -1120,6 +1133,7 @@ static const struct pw_node_events client_node_events = {
PW_VERSION_NODE_EVENTS,
.destroy = client_node_destroy,
.initialized = client_node_initialized,
.result = client_node_result,
.active_changed = client_node_active_changed,
.info_changed = client_node_info_changed,
};
@ -1145,13 +1159,13 @@ static void node_initialized(void *data)
static void node_peer_added(void *data, struct pw_node *peer)
{
struct impl *impl = data;
pw_node_events_peer_added(impl->client_node->node, peer);
pw_node_emit_peer_added(impl->client_node->node, peer);
}
static void node_peer_removed(void *data, struct pw_node *peer)
{
struct impl *impl = data;
pw_node_events_peer_removed(impl->client_node->node, peer);
pw_node_emit_peer_removed(impl->client_node->node, peer);
}
static void node_driver_changed(void *data, struct pw_node *old, struct pw_node *driver)
@ -1159,7 +1173,7 @@ static void node_driver_changed(void *data, struct pw_node *old, struct pw_node
struct impl *impl = data;
pw_log_debug("client-stream %p: driver changed %p->%p", &impl->this, old, driver);
impl->client_node->node->driver_node = driver;
pw_node_events_driver_changed(impl->client_node->node, old, driver);
pw_node_emit_driver_changed(impl->client_node->node, old, driver);
}
static const struct pw_node_events node_events = {