Work on node creation

Implements the basics of the PORT_UPDATE control message
Add the ports to the proxy node with whe PORT_UPDATE control message.
Let the proxy node check the events and create dbus objects based on
added/removed ports.
This commit is contained in:
Wim Taymans 2016-08-05 19:46:37 +02:00
parent ac5d22ec79
commit de53315f6e
17 changed files with 350 additions and 215 deletions

View file

@ -96,6 +96,10 @@ typedef struct {
const SpaProps *props;
} SpaControlCmdNodeUpdate;
#define SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS (1 << 0)
#define SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS (1 << 1)
#define SPA_CONTROL_CMD_NODE_UPDATE_PROPS (1 << 2)
/* SPA_CONTROL_CMD_PORT_UPDATE */
typedef struct {
uint32_t port_id;
@ -107,6 +111,11 @@ typedef struct {
const SpaPortInfo *info;
} SpaControlCmdPortUpdate;
#define SPA_CONTROL_CMD_PORT_UPDATE_DIRECTION (1 << 0)
#define SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS (1 << 1)
#define SPA_CONTROL_CMD_PORT_UPDATE_PROPS (1 << 2)
#define SPA_CONTROL_CMD_PORT_UPDATE_INFO (1 << 3)
/* SPA_CONTROL_CMD_PORT_REMOVED */
typedef struct {
uint32_t port_id;

View file

@ -49,6 +49,8 @@ typedef struct _SpaEvent SpaEvent;
*/
typedef enum {
SPA_EVENT_TYPE_INVALID = 0,
SPA_EVENT_TYPE_PORT_ADDED,
SPA_EVENT_TYPE_PORT_REMOVED,
SPA_EVENT_TYPE_STATE_CHANGE,
SPA_EVENT_TYPE_CAN_PULL_OUTPUT,
SPA_EVENT_TYPE_CAN_PUSH_INPUT,
@ -70,6 +72,10 @@ struct _SpaEvent {
size_t size;
};
typedef struct {
SpaDirection direction;
} SpaEventPortAdded;
typedef struct {
uint32_t buffer_id;
off_t offset;

View file

@ -74,18 +74,18 @@ typedef struct {
* @SPA_PORT_INFO_FLAG_NONE: no flags
* @SPA_PORT_INFO_FLAG_REMOVABLE: port can be removed
* @SPA_PORT_INFO_FLAG_OPTIONAL: processing on port is optional
* @SPA_PORT_INFO_FLAG_CAN_GIVE_BUFFER: the port can give a buffer
* @SPA_PORT_INFO_FLAG_CAN_USE_BUFFER: the port can use a provided buffer
* @SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS: the port can give a buffer
* @SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS: the port can use a provided buffer
* @SPA_PORT_INFO_FLAG_IN_PLACE: the port can process data in-place and will need
* a writable input buffer when no output buffer is specified.
* a writable input buffer
* @SPA_PORT_INFO_FLAG_NO_REF: the port does not keep a ref on the buffer
*/
typedef enum {
SPA_PORT_INFO_FLAG_NONE = 0,
SPA_PORT_INFO_FLAG_REMOVABLE = 1 << 0,
SPA_PORT_INFO_FLAG_OPTIONAL = 1 << 1,
SPA_PORT_INFO_FLAG_CAN_GIVE_BUFFER = 1 << 2,
SPA_PORT_INFO_FLAG_CAN_USE_BUFFER = 1 << 3,
SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS = 1 << 2,
SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS = 1 << 3,
SPA_PORT_INFO_FLAG_IN_PLACE = 1 << 4,
SPA_PORT_INFO_FLAG_NO_REF = 1 << 5,
} SpaPortInfoFlags;
@ -103,10 +103,10 @@ typedef enum {
*/
typedef struct {
SpaPortInfoFlags flags;
unsigned int maxbuffering;
uint64_t maxbuffering;
uint64_t latency;
SpaAllocParam **params;
uint32_t n_params;
unsigned int n_params;
const char **features;
} SpaPortInfo;

View file

@ -398,10 +398,15 @@ spa_control_iter_parse_cmd (SpaControlIter *iter,
switch (si->cmd) {
/* C -> S */
case SPA_CONTROL_CMD_NODE_UPDATE:
case SPA_CONTROL_CMD_PORT_UPDATE:
fprintf (stderr, "implement iter of %d\n", si->cmd);
break;
case SPA_CONTROL_CMD_PORT_UPDATE:
if (si->size < sizeof (SpaControlCmdPortUpdate))
return SPA_RESULT_ERROR;
memcpy (command, si->data, sizeof (SpaControlCmdPortUpdate));
break;
case SPA_CONTROL_CMD_PORT_REMOVED:
if (si->size < sizeof (SpaControlCmdPortRemoved))
return SPA_RESULT_ERROR;
@ -722,6 +727,55 @@ builder_add_cmd (struct stack_builder *sb, SpaControlCmd cmd, size_t size)
return p;
}
static size_t
calc_props_len (const SpaProps *props)
{
size_t len;
unsigned int i, j;
SpaPropInfo *pi;
SpaPropRangeInfo *ri;
/* props and unset mask */
len = sizeof (SpaProps) + sizeof (uint32_t);
for (i = 0; i < props->n_prop_info; i++) {
pi = (SpaPropInfo *) &props->prop_info[i];
len += sizeof (SpaPropInfo);
len += pi->name ? strlen (pi->name) + 1 : 0;
len += pi->description ? strlen (pi->description) + 1 : 0;
/* for the value and the default value */
len += pi->maxsize + pi->default_size;
for (j = 0; j < pi->n_range_values; j++) {
ri = (SpaPropRangeInfo *)&pi->range_values[j];
len += sizeof (SpaPropRangeInfo);
len += ri->name ? strlen (ri->name) + 1 : 0;
len += ri->description ? strlen (ri->description) + 1 : 0;
/* the size of the range value */
len += ri->size;
}
}
return len;
}
static size_t
calc_format_len (const SpaFormat *format)
{
return calc_props_len (&format->props) - sizeof (SpaProps) + sizeof (SpaFormat);
}
static void
builder_add_port_update (struct stack_builder *sb, SpaControlCmdPortUpdate *pu)
{
size_t len;
void *base;
/* calc len */
len = sizeof (SpaControlCmdPortUpdate);
base = builder_add_cmd (sb, SPA_CONTROL_CMD_PORT_UPDATE, len);
memcpy (base, pu, sizeof (SpaControlCmdPortUpdate));
/* FIXME add more things */
}
static void
builder_add_set_format (struct stack_builder *sb, SpaControlCmdSetFormat *sf)
{
@ -738,24 +792,7 @@ builder_add_set_format (struct stack_builder *sb, SpaControlCmdSetFormat *sf)
/* calculate length */
/* port_id + format + mask */
len = sizeof (uint32_t) + sizeof (SpaFormat) + sizeof (uint32_t);
for (i = 0; i < sp->n_prop_info; i++) {
pi = (SpaPropInfo *) &sp->prop_info[i];
len += sizeof (SpaPropInfo);
len += pi->name ? strlen (pi->name) + 1 : 0;
len += pi->description ? strlen (pi->description) + 1 : 0;
/* for the value and the default value */
len += pi->maxsize + pi->default_size;
for (j = 0; j < pi->n_range_values; j++) {
ri = (SpaPropRangeInfo *)&pi->range_values[j];
len += sizeof (SpaPropRangeInfo);
len += ri->name ? strlen (ri->name) + 1 : 0;
len += ri->description ? strlen (ri->description) + 1 : 0;
/* the size of the range value */
len += ri->size;
}
}
len = sizeof (uint32_t) + calc_format_len (sf->format);
base = builder_add_cmd (sb, SPA_CONTROL_CMD_SET_FORMAT, len);
memcpy (base, &sf->port_id, sizeof (uint32_t));
@ -881,10 +918,13 @@ spa_control_builder_add_cmd (SpaControlBuilder *builder,
switch (cmd) {
/* C -> S */
case SPA_CONTROL_CMD_NODE_UPDATE:
case SPA_CONTROL_CMD_PORT_UPDATE:
fprintf (stderr, "implement builder of %d\n", cmd);
break;
case SPA_CONTROL_CMD_PORT_UPDATE:
builder_add_port_update (sb, command);
break;
case SPA_CONTROL_CMD_PORT_REMOVED:
p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdPortRemoved));
memcpy (p, command, sizeof (SpaControlCmdPortRemoved));

View file

@ -34,7 +34,7 @@ spa_debug_port_info (const SpaPortInfo *info)
fprintf (stderr, "SpaPortInfo %p:\n", info);
fprintf (stderr, " flags: \t%08x\n", info->flags);
fprintf (stderr, " maxbuffering: \t%u\n", info->maxbuffering);
fprintf (stderr, " maxbuffering: \t%"PRIu64"\n", info->maxbuffering);
fprintf (stderr, " latency: \t%" PRIu64 "\n", info->latency);
fprintf (stderr, " n_params: \t%d\n", info->n_params);
for (i = 0; i < info->n_params; i++) {

View file

@ -275,7 +275,7 @@ spa_audiomixer_node_add_port (SpaNode *node,
this->ports[port_id].valid = true;
this->port_count++;
this->ports[port_id].info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFER |
this->ports[port_id].info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS |
SPA_PORT_INFO_FLAG_REMOVABLE |
SPA_PORT_INFO_FLAG_OPTIONAL |
SPA_PORT_INFO_FLAG_IN_PLACE;
@ -798,8 +798,8 @@ spa_audiomixer_init (const SpaHandleFactory *factory,
reset_audiomixer_props (&this->props[1]);
this->ports[0].valid = true;
this->ports[0].info.flags = SPA_PORT_INFO_FLAG_CAN_GIVE_BUFFER |
SPA_PORT_INFO_FLAG_CAN_USE_BUFFER |
this->ports[0].info.flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS |
SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS |
SPA_PORT_INFO_FLAG_NO_REF;
return SPA_RESULT_OK;
}

View file

@ -585,7 +585,7 @@ audiotestsrc_init (const SpaHandleFactory *factory,
this->props[1].props.prop_info = prop_info;
reset_audiotestsrc_props (&this->props[1]);
this->info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFER |
this->info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS |
SPA_PORT_INFO_FLAG_NO_REF;
this->status.flags = SPA_PORT_STATUS_FLAG_HAVE_OUTPUT;

View file

@ -57,7 +57,6 @@ typedef struct {
SpaFormat formats[2];
unsigned int n_buffers;
SpaBuffer **buffers;
SpaBuffer *remote_buffers[128];
} SpaProxyPort;
struct _SpaProxy {
@ -334,6 +333,59 @@ spa_proxy_node_get_port_ids (SpaNode *node,
return SPA_RESULT_OK;
}
static void
do_init_port (SpaProxy *this,
uint32_t port_id,
SpaDirection direction)
{
SpaEvent event;
SpaProxyPort *port;
SpaEventPortAdded pa;
fprintf (stderr, "%p: adding port %d, %d\n", this, port_id, direction);
port = &this->ports[port_id];
port->direction = direction;
port->valid = true;
port->have_format = false;
if (direction == SPA_DIRECTION_INPUT)
this->n_inputs++;
else
this->n_outputs++;
event.type = SPA_EVENT_TYPE_PORT_ADDED;
event.port_id = port_id;
event.data = &pa;
event.size = sizeof (pa);
pa.direction = direction;
this->event_cb (&this->node, &event, this->user_data);
}
static void
do_uninit_port (SpaProxy *this,
uint32_t port_id)
{
SpaEvent event;
SpaProxyPort *port;
fprintf (stderr, "%p: removing port %d\n", this, port_id);
port = &this->ports[port_id];
if (port->direction == SPA_DIRECTION_INPUT)
this->n_inputs--;
else
this->n_outputs--;
port->direction = SPA_DIRECTION_INVALID;
port->valid = false;
port->have_format = false;
event.type = SPA_EVENT_TYPE_PORT_REMOVED;
event.port_id = port_id;
event.data = NULL;
event.size = 0;
this->event_cb (&this->node, &event, this->user_data);
}
static SpaResult
spa_proxy_node_add_port (SpaNode *node,
@ -341,7 +393,6 @@ spa_proxy_node_add_port (SpaNode *node,
uint32_t port_id)
{
SpaProxy *this;
SpaProxyPort *port;
if (node == NULL || node->handle == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
@ -351,12 +402,7 @@ spa_proxy_node_add_port (SpaNode *node,
if (!CHECK_FREE_PORT_ID (this, port_id))
return SPA_RESULT_INVALID_PORT;
fprintf (stderr, "%p: adding port %d, %d\n", node, port_id, direction);
port = &this->ports[port_id];
port->direction = direction;
port->valid = true;
port->have_format = false;
do_init_port (this, port_id, direction);
return SPA_RESULT_OK;
}
@ -366,7 +412,6 @@ spa_proxy_node_remove_port (SpaNode *node,
uint32_t port_id)
{
SpaProxy *this;
SpaProxyPort *port;
if (node == NULL || node->handle == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
@ -376,8 +421,7 @@ spa_proxy_node_remove_port (SpaNode *node,
if (!CHECK_PORT_ID (this, port_id))
return SPA_RESULT_INVALID_PORT;
port = &this->ports[port_id];
port->valid = false;
do_uninit_port (this, port_id);
return SPA_RESULT_OK;
}
@ -868,11 +912,31 @@ parse_control (SpaProxy *this,
break;
case SPA_CONTROL_CMD_NODE_UPDATE:
case SPA_CONTROL_CMD_PORT_UPDATE:
case SPA_CONTROL_CMD_PORT_REMOVED:
fprintf (stderr, "proxy %p: command not implemented %d\n", this, cmd);
break;
case SPA_CONTROL_CMD_PORT_UPDATE:
{
SpaControlCmdPortUpdate pu;
SpaProxyPort *port;
fprintf (stderr, "proxy %p: got port update %d\n", this, cmd);
if (spa_control_iter_parse_cmd (&it, &pu) < 0)
break;
if (pu.port_id >= MAX_PORTS)
break;
port = &this->ports[pu.port_id];
if (!port->valid && pu.direction != SPA_DIRECTION_INVALID) {
do_init_port (this, pu.port_id, pu.direction);
} else {
do_uninit_port (this, pu.port_id);
}
break;
}
case SPA_CONTROL_CMD_STATE_CHANGE:
{
SpaControlCmdStateChange sc;

View file

@ -398,7 +398,8 @@ spa_v4l2_set_format (SpaV4l2Source *this, V4l2Format *f, bool try_only)
return 0;
state->fmt = fmt;
state->info.flags = SPA_PORT_INFO_FLAG_CAN_GIVE_BUFFER;
state->info.flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS |
SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
state->info.maxbuffering = -1;
state->info.latency = -1;

View file

@ -695,10 +695,10 @@ volume_instantiate (const SpaHandleFactory *factory,
this->props[1].props.prop_info = prop_info;
reset_volume_props (&this->props[1]);
this->ports[0].info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFER |
this->ports[0].info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS |
SPA_PORT_INFO_FLAG_IN_PLACE;
this->ports[1].info.flags = SPA_PORT_INFO_FLAG_CAN_GIVE_BUFFER |
SPA_PORT_INFO_FLAG_CAN_USE_BUFFER |
this->ports[1].info.flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS |
SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS |
SPA_PORT_INFO_FLAG_NO_REF;
this->ports[0].status.flags = SPA_PORT_STATUS_FLAG_NEED_INPUT;