more async improvements

Make the sequence number an int.

Keep track of the last received seq number and pass it in error
replies.

Pass seq in for_each methods.
This commit is contained in:
Wim Taymans 2019-02-25 16:25:27 +01:00
parent d2c18c7b1a
commit f2ff6f393b
34 changed files with 377 additions and 347 deletions

View file

@ -89,7 +89,7 @@ static void *create_object(void *_data,
no_mem:
pw_log_error("can't create node");
pw_core_resource_error(pw_client_get_core_resource(client), new_id, -ENOMEM, "can't create node: no memory");
pw_resource_error(resource, -ENOMEM, "can't create node: no memory");
goto done;
done:
if (properties)

View file

@ -1213,14 +1213,18 @@ static void client_node_resource_destroy(void *data)
pw_node_destroy(this->node);
}
static void client_node_resource_error(void *data, int res, const char *message)
static void client_node_resource_error(void *data, int seq, int res, const char *message)
{
struct impl *impl = data;
struct node *this = &impl->node;
pw_log_error("client-node %p: error %d: %s", this, res, message);
struct spa_result_node_error result;
pw_log_error("client-node %p: error seq:%d %d (%s)", this, seq, res, message);
result.message = message;
this->callbacks->result(this->callbacks_data, seq, res, &result);
}
static void client_node_resource_done(void *data, uint32_t seq)
static void client_node_resource_done(void *data, int seq)
{
struct impl *impl = data;
struct node *this = &impl->node;

View file

@ -393,7 +393,7 @@ static int client_node_transport(void *object, uint32_t node_id,
if (data->node->active)
pw_client_node_proxy_set_active(data->node_proxy, true);
pw_remote_events_exported(remote, proxy->id, node_id);
pw_remote_emit_exported(remote, proxy->id, node_id);
return 0;
}

View file

@ -127,11 +127,13 @@ process_messages(struct client_data *data)
const struct pw_protocol_native_demarshal *demarshal;
const struct pw_protocol_marshal *marshal;
uint32_t permissions, required;
int seq;
int seq = 0;
if (!pw_protocol_native_connection_get_next(conn, &opcode, &id, &message, &size, &seq))
break;
client->seq = seq;
pw_log_trace("protocol-native %p: got message %d from %u", client->protocol,
opcode, id);
@ -145,10 +147,9 @@ process_messages(struct client_data *data)
pw_log_error("protocol-native %p: unknown resource %u",
client->protocol, id);
pw_resource_error(client->core_resource,
-EINVAL, "unknown resource %u", id);
-EINVAL, "unknown resource %u", id);
continue;
}
resource->seq = seq;
marshal = pw_resource_get_marshal(resource);
if (marshal == NULL || opcode >= marshal->n_methods)
@ -492,6 +493,8 @@ on_remote_data(void *data, int fd, enum spa_io mask)
pw_log_trace("protocol-native %p: got message %d from %u seq:%d",
this, opcode, id, seq);
this->seq = seq;
if (debug_messages) {
fprintf(stderr, "<<<<<<<<< in: %d %d %d %d\n", id, opcode, size, seq);
spa_debug_pod(0, NULL, (struct spa_pod *)message);
@ -503,7 +506,6 @@ on_remote_data(void *data, int fd, enum spa_io mask)
pw_log_error("protocol-native %p: could not find proxy %u", this, id);
continue;
}
proxy->seq = seq;
marshal = pw_proxy_get_marshal(proxy);
if (marshal == NULL || opcode >= marshal->n_events) {

View file

@ -45,22 +45,21 @@ static int core_method_marshal_hello(void *object, uint32_t version)
return pw_protocol_native_end_proxy(proxy, b);
}
static int core_method_marshal_sync(void *object, uint32_t id, uint32_t seq)
static int core_method_marshal_sync(void *object, uint32_t id, int seq)
{
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
int res;
b = pw_protocol_native_begin_proxy(proxy, PW_CORE_PROXY_METHOD_SYNC, &res);
b = pw_protocol_native_begin_proxy(proxy, PW_CORE_PROXY_METHOD_SYNC, &seq);
spa_pod_builder_add_struct(b,
SPA_POD_Int(id),
SPA_POD_Int(res));
SPA_POD_Int(seq));
return pw_protocol_native_end_proxy(proxy, b);
}
static int core_method_marshal_done(void *object, uint32_t id, uint32_t seq)
static int core_method_marshal_done(void *object, uint32_t id, int seq)
{
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
@ -74,7 +73,7 @@ static int core_method_marshal_done(void *object, uint32_t id, uint32_t seq)
return pw_protocol_native_end_proxy(proxy, b);
}
static int core_method_marshal_error(void *object, uint32_t id, int res, const char *error)
static int core_method_marshal_error(void *object, uint32_t id, int seq, int res, const char *error)
{
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
@ -83,6 +82,7 @@ static int core_method_marshal_error(void *object, uint32_t id, int res, const c
spa_pod_builder_add_struct(b,
SPA_POD_Int(id),
SPA_POD_Int(seq),
SPA_POD_Int(res),
SPA_POD_String(error));
@ -233,16 +233,18 @@ static int core_event_demarshal_error(void *object, void *data, size_t size)
struct pw_proxy *proxy = object;
struct spa_pod_parser prs;
uint32_t id, res;
int seq;
const char *error;
spa_pod_parser_init(&prs, data, size);
if (spa_pod_parser_get_struct(&prs,
SPA_POD_Int(&id),
SPA_POD_Int(&seq),
SPA_POD_Int(&res),
SPA_POD_String(&error)) < 0)
return -EINVAL;
return pw_proxy_notify(proxy, struct pw_core_proxy_events, error, 0, id, res, error);
return pw_proxy_notify(proxy, struct pw_core_proxy_events, error, 0, id, seq, res, error);
}
static int core_event_demarshal_remove_id(void *object, void *data, size_t size)
@ -282,7 +284,7 @@ static int core_event_marshal_info(void *object, const struct pw_core_info *info
return pw_protocol_native_end_resource(resource, b);
}
static int core_event_marshal_done(void *object, uint32_t id, uint32_t seq)
static int core_event_marshal_done(void *object, uint32_t id, int seq)
{
struct pw_resource *resource = object;
struct spa_pod_builder *b;
@ -296,22 +298,21 @@ static int core_event_marshal_done(void *object, uint32_t id, uint32_t seq)
return pw_protocol_native_end_resource(resource, b);
}
static int core_event_marshal_sync(void *object, uint32_t id, uint32_t seq)
static int core_event_marshal_sync(void *object, uint32_t id, int seq)
{
struct pw_resource *resource = object;
struct spa_pod_builder *b;
int res;
b = pw_protocol_native_begin_resource(resource, PW_CORE_PROXY_EVENT_SYNC, &res);
b = pw_protocol_native_begin_resource(resource, PW_CORE_PROXY_EVENT_SYNC, &seq);
spa_pod_builder_add_struct(b,
SPA_POD_Int(id),
SPA_POD_Int(res));
SPA_POD_Int(seq));
return pw_protocol_native_end_resource(resource, b);
}
static int core_event_marshal_error(void *object, uint32_t id, int res, const char *error)
static int core_event_marshal_error(void *object, uint32_t id, int seq, int res, const char *error)
{
struct pw_resource *resource = object;
struct spa_pod_builder *b;
@ -320,6 +321,7 @@ static int core_event_marshal_error(void *object, uint32_t id, int res, const ch
spa_pod_builder_add_struct(b,
SPA_POD_Int(id),
SPA_POD_Int(seq),
SPA_POD_Int(res),
SPA_POD_String(error));
@ -388,16 +390,18 @@ static int core_method_demarshal_error(void *object, void *data, size_t size)
struct pw_resource *resource = object;
struct spa_pod_parser prs;
uint32_t id, res;
int seq;
const char *error;
spa_pod_parser_init(&prs, data, size);
if (spa_pod_parser_get_struct(&prs,
SPA_POD_Int(&id),
SPA_POD_Int(&seq),
SPA_POD_Int(&res),
SPA_POD_String(&error)) < 0)
return -EINVAL;
return pw_resource_do(resource, struct pw_core_proxy_methods, error, 0, id, res, error);
return pw_resource_do(resource, struct pw_core_proxy_methods, error, 0, id, seq, res, error);
}
static int core_method_demarshal_get_registry(void *object, void *data, size_t size)
@ -646,7 +650,7 @@ static int device_demarshal_info(void *object, void *data, size_t size)
return pw_proxy_notify(proxy, struct pw_device_proxy_events, info, 0, &info);
}
static int device_marshal_param(void *object, uint32_t seq, uint32_t id, uint32_t index, uint32_t next,
static int device_marshal_param(void *object, int seq, uint32_t id, uint32_t index, uint32_t next,
const struct spa_pod *param)
{
struct pw_resource *resource = object;
@ -668,7 +672,8 @@ static int device_demarshal_param(void *object, void *data, size_t size)
{
struct pw_proxy *proxy = object;
struct spa_pod_parser prs;
uint32_t seq, id, index, next;
uint32_t id, index, next;
int seq;
struct spa_pod *param;
spa_pod_parser_init(&prs, data, size);
@ -684,17 +689,16 @@ static int device_demarshal_param(void *object, void *data, size_t size)
seq, id, index, next, param);
}
static int device_marshal_enum_params(void *object, uint32_t seq,
static int device_marshal_enum_params(void *object, int seq,
uint32_t id, uint32_t index, uint32_t num, const struct spa_pod *filter)
{
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
int res;
b = pw_protocol_native_begin_proxy(proxy, PW_DEVICE_PROXY_METHOD_ENUM_PARAMS, &res);
b = pw_protocol_native_begin_proxy(proxy, PW_DEVICE_PROXY_METHOD_ENUM_PARAMS, &seq);
spa_pod_builder_add_struct(b,
SPA_POD_Int(res),
SPA_POD_Int(seq),
SPA_POD_Id(id),
SPA_POD_Int(index),
SPA_POD_Int(num),
@ -707,7 +711,8 @@ static int device_demarshal_enum_params(void *object, void *data, size_t size)
{
struct pw_resource *resource = object;
struct spa_pod_parser prs;
uint32_t id, index, num, seq;
uint32_t id, index, num;
int seq;
struct spa_pod *filter;
spa_pod_parser_init(&prs, data, size);
@ -877,7 +882,7 @@ static int node_demarshal_info(void *object, void *data, size_t size)
return pw_proxy_notify(proxy, struct pw_node_proxy_events, info, 0, &info);
}
static int node_marshal_param(void *object, uint32_t seq, uint32_t id,
static int node_marshal_param(void *object, int seq, uint32_t id,
uint32_t index, uint32_t next, const struct spa_pod *param)
{
struct pw_resource *resource = object;
@ -899,7 +904,8 @@ static int node_demarshal_param(void *object, void *data, size_t size)
{
struct pw_proxy *proxy = object;
struct spa_pod_parser prs;
uint32_t seq, id, index, next;
uint32_t id, index, next;
int seq;
struct spa_pod *param;
spa_pod_parser_init(&prs, data, size);
@ -915,17 +921,16 @@ static int node_demarshal_param(void *object, void *data, size_t size)
seq, id, index, next, param);
}
static int node_marshal_enum_params(void *object, uint32_t seq, uint32_t id,
static int node_marshal_enum_params(void *object, int seq, uint32_t id,
uint32_t index, uint32_t num, const struct spa_pod *filter)
{
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
int res;
b = pw_protocol_native_begin_proxy(proxy, PW_NODE_PROXY_METHOD_ENUM_PARAMS, &res);
b = pw_protocol_native_begin_proxy(proxy, PW_NODE_PROXY_METHOD_ENUM_PARAMS, &seq);
spa_pod_builder_add_struct(b,
SPA_POD_Int(res),
SPA_POD_Int(seq),
SPA_POD_Id(id),
SPA_POD_Int(index),
SPA_POD_Int(num),
@ -938,7 +943,8 @@ static int node_demarshal_enum_params(void *object, void *data, size_t size)
{
struct pw_resource *resource = object;
struct spa_pod_parser prs;
uint32_t seq, id, index, num;
uint32_t id, index, num;
int seq;
struct spa_pod *filter;
spa_pod_parser_init(&prs, data, size);
@ -1064,7 +1070,7 @@ static int port_demarshal_info(void *object, void *data, size_t size)
return pw_proxy_notify(proxy, struct pw_port_proxy_events, info, 0, &info);
}
static int port_marshal_param(void *object, uint32_t seq, uint32_t id,
static int port_marshal_param(void *object, int seq, uint32_t id,
uint32_t index, uint32_t next, const struct spa_pod *param)
{
struct pw_resource *resource = object;
@ -1086,7 +1092,8 @@ static int port_demarshal_param(void *object, void *data, size_t size)
{
struct pw_proxy *proxy = object;
struct spa_pod_parser prs;
uint32_t seq, id, index, next;
uint32_t id, index, next;
int seq;
struct spa_pod *param;
spa_pod_parser_init(&prs, data, size);
@ -1102,17 +1109,16 @@ static int port_demarshal_param(void *object, void *data, size_t size)
seq, id, index, next, param);
}
static int port_marshal_enum_params(void *object, uint32_t seq, uint32_t id,
static int port_marshal_enum_params(void *object, int seq, uint32_t id,
uint32_t index, uint32_t num, const struct spa_pod *filter)
{
struct pw_proxy *proxy = object;
struct spa_pod_builder *b;
int res;
b = pw_protocol_native_begin_proxy(proxy, PW_PORT_PROXY_METHOD_ENUM_PARAMS, &res);
b = pw_protocol_native_begin_proxy(proxy, PW_PORT_PROXY_METHOD_ENUM_PARAMS, &seq);
spa_pod_builder_add_struct(b,
SPA_POD_Int(res),
SPA_POD_Int(seq),
SPA_POD_Id(id),
SPA_POD_Int(index),
SPA_POD_Int(num),
@ -1125,7 +1131,8 @@ static int port_demarshal_enum_params(void *object, void *data, size_t size)
{
struct pw_resource *resource = object;
struct spa_pod_parser prs;
uint32_t seq, id, index, num;
uint32_t id, index, num;
int seq;
struct spa_pod *filter;
spa_pod_parser_init(&prs, data, size);