port: work on port commands

This commit is contained in:
Wim Taymans 2017-11-14 15:44:48 +01:00
parent 157a8b6ddf
commit c78fe6a353
7 changed files with 124 additions and 58 deletions

View file

@ -108,15 +108,15 @@ struct pw_client_node_transport {
#define pw_client_node_transport_parse_message(t,m) ((t)->parse_message((t), (m)))
enum pw_client_node_message_type {
PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT,
PW_CLIENT_NODE_MESSAGE_NEED_INPUT,
PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER,
PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT,
PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT,
PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT, /*< signal that the node has output */
PW_CLIENT_NODE_MESSAGE_NEED_INPUT, /*< signal that the node needs input */
PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT, /*< instruct the node to process input */
PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT, /*< instruct the node output is processed */
PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER, /*< reuse a buffer */
};
struct pw_client_node_message_body {
struct spa_pod_int type SPA_ALIGNED(8);
struct spa_pod_int type SPA_ALIGNED(8); /*< one of enum pw_client_node_message_type */
};
struct pw_client_node_message {
@ -124,15 +124,15 @@ struct pw_client_node_message {
struct pw_client_node_message_body body;
};
struct pw_client_node_message_reuse_buffer_body {
struct spa_pod_int type SPA_ALIGNED(8);
struct spa_pod_int port_id SPA_ALIGNED(8);
struct spa_pod_int buffer_id SPA_ALIGNED(8);
struct pw_client_node_message_port_reuse_buffer_body {
struct spa_pod_int type SPA_ALIGNED(8); /*< PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER */
struct spa_pod_int port_id SPA_ALIGNED(8); /*< port id */
struct spa_pod_int buffer_id SPA_ALIGNED(8); /*< buffer id to reuse */
};
struct pw_client_node_message_reuse_buffer {
struct pw_client_node_message_port_reuse_buffer {
struct spa_pod_struct pod;
struct pw_client_node_message_reuse_buffer_body body;
struct pw_client_node_message_port_reuse_buffer_body body;
};
#define PW_CLIENT_NODE_MESSAGE_TYPE(message) (((struct pw_client_node_message*)(message))->body.type.value)
@ -145,14 +145,13 @@ struct pw_client_node_message_reuse_buffer {
{ { { size, SPA_POD_TYPE_STRUCT } }, \
{ SPA_POD_INT_INIT(message), __VA_ARGS__ } } \
#define PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER_INIT(port_id,buffer_id) \
PW_CLIENT_NODE_MESSAGE_INIT_VA(struct pw_client_node_message_reuse_buffer, \
sizeof(struct pw_client_node_message_reuse_buffer_body), \
PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER, \
#define PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER_INIT(port_id,buffer_id) \
PW_CLIENT_NODE_MESSAGE_INIT_VA(struct pw_client_node_message_port_reuse_buffer, \
sizeof(struct pw_client_node_message_port_reuse_buffer_body), \
PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER, \
SPA_POD_INT_INIT(port_id), \
SPA_POD_INT_INIT(buffer_id))
/** information about a buffer */
struct pw_client_node_buffer {
uint32_t mem_id; /**< the memory id for the metadata */
@ -434,7 +433,6 @@ struct pw_client_node_proxy_events {
enum spa_direction direction,
uint32_t port_id,
const struct spa_command *command);
};
static inline void

View file

@ -683,7 +683,7 @@ spa_proxy_node_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32
spa_log_trace(this->log, "reuse buffer %d", buffer_id);
pw_client_node_transport_add_message(impl->transport, (struct pw_client_node_message *)
&PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER_INIT(port_id, buffer_id));
&PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER_INIT(port_id, buffer_id));
return 0;
}
@ -694,14 +694,27 @@ spa_proxy_node_port_send_command(struct spa_node *node,
uint32_t port_id, const struct spa_command *command)
{
struct proxy *this;
struct impl *impl;
struct pw_type *t;
if (node == NULL || command == NULL)
return -EINVAL;
this = SPA_CONTAINER_OF(node, struct proxy, node);
spa_log_warn(this->log, "unhandled command %d", SPA_COMMAND_TYPE(command));
return -ENOTSUP;
if (this->resource == NULL)
return 0;
impl = this->impl;
t = impl->t;
spa_log_trace(this->log, "send command %s",
spa_type_map_get_type(t->map, SPA_COMMAND_TYPE(command)));
pw_client_node_resource_port_command(this->resource,
direction, port_id,
command);
return 0;
}
static int spa_proxy_node_process_input(struct spa_node *node)
@ -783,27 +796,37 @@ static int handle_node_message(struct proxy *this, struct pw_client_node_message
n = &impl->this.node->rt.node;
if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT) {
switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) {
case PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT:
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) {
*p->io = impl->transport->outputs[p->port_id];
pw_log_trace("have output %d %d", p->io->status, p->io->buffer_id);
}
impl->out_pending = false;
this->callbacks->have_output(this->callbacks_data);
} else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_NEED_INPUT) {
break;
case PW_CLIENT_NODE_MESSAGE_NEED_INPUT:
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
*p->io = impl->transport->inputs[p->port_id];
pw_log_trace("need input %d %d", p->io->status, p->io->buffer_id);
}
impl->input_ready++;
this->callbacks->need_input(this->callbacks_data);
} else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) {
break;
case PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER:
if (impl->client_reuse) {
struct pw_client_node_message_reuse_buffer *p =
(struct pw_client_node_message_reuse_buffer *) message;
struct pw_client_node_message_port_reuse_buffer *p =
(struct pw_client_node_message_port_reuse_buffer *) message;
this->callbacks->reuse_buffer(this->callbacks_data, p->body.port_id.value,
p->body.buffer_id.value);
}
break;
default:
pw_log_warn("unhandled message %d", PW_CLIENT_NODE_MESSAGE_TYPE(message));
return -ENOTSUP;
}
return 0;
}

View file

@ -355,13 +355,28 @@ void pw_port_destroy(struct pw_port *port)
}
static int
do_port_pause(struct spa_loop *loop,
do_port_command(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, const void *data, void *user_data)
{
struct pw_port *port = user_data;
struct pw_node *node = port->node;
return spa_node_port_send_command(node->node, port->direction, port->port_id,
&SPA_COMMAND_INIT(node->core->type.command_node.Pause));
return spa_node_port_send_command(node->node, port->direction, port->port_id, data);
}
int pw_port_send_command(struct pw_port *port, bool block, const struct spa_command *command)
{
return pw_loop_invoke(port->node->data_loop, do_port_command, 0,
SPA_POD_SIZE(command), command, block, port);
}
int pw_port_pause(struct pw_port *port)
{
if (port->state > PW_PORT_STATE_PAUSED) {
pw_port_send_command(port, true,
&SPA_COMMAND_INIT(port->node->core->type.command_node.Pause));
port_update_state (port, PW_PORT_STATE_PAUSED);
}
return 0;
}
int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags,
@ -393,6 +408,7 @@ int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags,
int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers)
{
int res;
struct pw_node *node = port->node;
if (n_buffers == 0 && port->state <= PW_PORT_STATE_READY)
return 0;
@ -400,14 +416,10 @@ int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint3
if (n_buffers > 0 && port->state < PW_PORT_STATE_READY)
return -EIO;
if (port->state > PW_PORT_STATE_PAUSED) {
pw_loop_invoke(port->node->data_loop,
do_port_pause, 0, 0, NULL, true, port);
port_update_state (port, PW_PORT_STATE_PAUSED);
}
pw_port_pause(port);
pw_log_debug("port %p: use %d buffers", port, n_buffers);
res = spa_node_port_use_buffers(port->node->node, port->direction, port->port_id, buffers, n_buffers);
res = spa_node_port_use_buffers(node->node, port->direction, port->port_id, buffers, n_buffers);
if (port->allocated) {
free(port->buffers);
@ -430,19 +442,16 @@ int pw_port_alloc_buffers(struct pw_port *port,
struct spa_buffer **buffers, uint32_t *n_buffers)
{
int res;
struct pw_node *node = port->node;
if (port->state < PW_PORT_STATE_READY)
return -EIO;
if (port->state > PW_PORT_STATE_PAUSED) {
pw_loop_invoke(port->node->data_loop,
do_port_pause, 0, 0, NULL, true, port);
port_update_state (port, PW_PORT_STATE_PAUSED);
}
pw_port_pause(port);
pw_log_debug("port %p: alloc %d buffers", port, *n_buffers);
res = spa_node_port_alloc_buffers(port->node->node, port->direction, port->port_id,
res = spa_node_port_alloc_buffers(node->node, port->direction, port->port_id,
params, n_params,
buffers, n_buffers);
if (port->allocated) {

View file

@ -441,6 +441,12 @@ int pw_port_alloc_buffers(struct pw_port *port,
struct spa_pod **params, uint32_t n_params,
struct spa_buffer **buffers, uint32_t *n_buffers);
/** Send a command to a port */
int pw_port_send_command(struct pw_port *port, bool block, const struct spa_command *command);
/** pause the port */
int pw_port_pause(struct pw_port *port);
/** Change the state of the node */
int pw_node_set_state(struct pw_node *node, enum pw_node_state state);

View file

@ -452,17 +452,21 @@ static void handle_rtnode_message(struct pw_proxy *proxy, struct pw_client_node_
{
struct node_data *data = proxy->user_data;
if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT) {
switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) {
case PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT:
pw_log_trace("remote %p: process input", data->remote);
spa_graph_have_output(data->node->rt.graph, &data->in_node);
}
else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT) {
break;
case PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT:
pw_log_trace("remote %p: process output", data->remote);
spa_graph_need_input(data->node->rt.graph, &data->out_node);
}
else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) {
struct pw_client_node_message_reuse_buffer *rb =
(struct pw_client_node_message_reuse_buffer *) message;
break;
case PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER:
{
struct pw_client_node_message_port_reuse_buffer *rb =
(struct pw_client_node_message_port_reuse_buffer *) message;
uint32_t port_id = rb->body.port_id.value;
uint32_t buffer_id = rb->body.buffer_id.value;
struct spa_graph_port *p, *pp;
@ -475,9 +479,11 @@ static void handle_rtnode_message(struct pw_proxy *proxy, struct pw_client_node_
pp->port_id, buffer_id);
break;
}
break;
}
else {
default:
pw_log_warn("unexpected node message %d", PW_CLIENT_NODE_MESSAGE_TYPE(message));
break;
}
}
@ -817,6 +823,8 @@ client_node_port_set_param(void *object,
goto done;
}
pw_port_pause(port->port);
res = pw_port_set_param(port->port, id, flags, param);
if (res < 0)
goto done;
@ -927,6 +935,8 @@ client_node_port_use_buffers(void *object,
goto done;
}
pw_port_pause(port->port);
prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0);
/* clear previous buffers */
@ -1041,7 +1051,15 @@ client_node_port_command(void *object,
uint32_t port_id,
const struct spa_command *command)
{
pw_log_warn("port command not supported");
struct pw_proxy *proxy = object;
struct node_data *data = proxy->user_data;
static struct port *port;
port = find_port(data, direction, port_id);
if (port == NULL)
return;
pw_port_send_command(port->port, true, command);
}
static const struct pw_client_node_proxy_events client_node_events = {

View file

@ -457,7 +457,7 @@ static inline void send_reuse_buffer(struct pw_stream *stream, uint32_t id)
uint64_t cmd = 1;
pw_client_node_transport_add_message(impl->trans, (struct pw_client_node_message*)
&PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER_INIT(impl->port_id, id));
&PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER_INIT(impl->port_id, id));
write(impl->rtwritefd, &cmd, 8);
}
@ -551,7 +551,9 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT) {
switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) {
case PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT:
{
int i;
for (i = 0; i < impl->trans->area->n_input_ports; i++) {
@ -581,7 +583,10 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod
input->status = SPA_STATUS_NEED_BUFFER;
}
send_need_input(stream);
} else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT) {
break;
}
case PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT:
{
int i;
for (i = 0; i < impl->trans->area->n_output_ports; i++) {
@ -597,9 +602,12 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod
impl->in_need_buffer = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events, need_buffer);
impl->in_need_buffer = false;
} else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) {
struct pw_client_node_message_reuse_buffer *p =
(struct pw_client_node_message_reuse_buffer *) message;
break;
}
case PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER:
{
struct pw_client_node_message_port_reuse_buffer *p =
(struct pw_client_node_message_port_reuse_buffer *) message;
if (p->body.port_id.value != impl->port_id)
return;
@ -607,8 +615,11 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod
return;
reuse_buffer(stream, p->body.buffer_id.value);
} else {
break;
}
default:
pw_log_warn("unexpected node message %d", PW_CLIENT_NODE_MESSAGE_TYPE(message));
break;
}
}