spi: implement processing

Implement processing with writing to provided buffer or in-place
Fix types.
Add more PortInfo flags
Remove release-id, we now use a callback, which is more practical and
still allows deallocation out of the rt-threads if needed as well as
recycling the buffers into an atomic pool
Add more comments
Allow NULL in set_params to reset params
This commit is contained in:
Wim Taymans 2016-06-02 16:40:20 +02:00
parent b44d2d86b6
commit 03046301bf
5 changed files with 333 additions and 109 deletions

View file

@ -60,10 +60,16 @@ typedef enum {
SPI_DATA_TYPE_FD, SPI_DATA_TYPE_FD,
} SpiDataType; } SpiDataType;
/**
* SpiDataFd
* fd: a file descriptor
* offset: offset in the data referenced by @fd
* @size: size of data referenced by fd
*/
typedef struct { typedef struct {
int fd; int fd;
int offset; unsigned int offset;
size_t size; size_t size;
} SpiDataFD; } SpiDataFD;
/** /**
@ -79,8 +85,6 @@ typedef struct {
size_t size; size_t size;
} SpiData; } SpiData;
typedef void (*SpiNotify) (void *data);
/** /**
* SpiBuffer: * SpiBuffer:
* @refcount: reference counter * @refcount: reference counter
@ -95,10 +99,31 @@ struct _SpiBuffer {
volatile int refcount; volatile int refcount;
SpiNotify notify; SpiNotify notify;
size_t size; size_t size;
int n_metas; unsigned int n_metas;
SpiMeta *metas; SpiMeta *metas;
int n_datas; unsigned int n_datas;
SpiData *datas; SpiData *datas;
}; };
static inline SpiBuffer *
spi_buffer_ref (SpiBuffer *buffer)
{
if (buffer != NULL)
buffer->refcount++;
return buffer;
}
static inline SpiBuffer *
spi_buffer_unref (SpiBuffer *buffer)
{
if (buffer != NULL) {
if (--buffer->refcount == 0) {
buffer->notify (buffer);
return NULL;
}
}
return buffer;
}
#endif /* __SPI_BUFFER_H__ */ #endif /* __SPI_BUFFER_H__ */

View file

@ -36,11 +36,20 @@ typedef struct _SpiEvent SpiEvent;
* @SPI_PORT_INFO_FLAG_NONE: no flags * @SPI_PORT_INFO_FLAG_NONE: no flags
* @SPI_PORT_INFO_FLAG_REMOVABLE: port can be removed * @SPI_PORT_INFO_FLAG_REMOVABLE: port can be removed
* @SPI_PORT_INFO_FLAG_OPTIONAL: processing on port is optional * @SPI_PORT_INFO_FLAG_OPTIONAL: processing on port is optional
* @SPI_PORT_INFO_FLAG_CAN_GIVE_BUFFER: the port can give a buffer
* @SPI_PORT_INFO_FLAG_CAN_USE_BUFFER: the port can use a provided buffer
* @SPI_PORT_INFO_FLAG_IN_PLACE: the port can process data in-place and will need
* a writable input buffer when no output buffer is specified.
* @SPI_PORT_INFO_FLAG_NO_REF: the port does not keep a ref on the buffer
*/ */
typedef enum { typedef enum {
SPI_PORT_INFO_FLAG_NONE = 0, SPI_PORT_INFO_FLAG_NONE = 0,
SPI_PORT_INFO_FLAG_REMOVABLE = 1 << 0, SPI_PORT_INFO_FLAG_REMOVABLE = 1 << 0,
SPI_PORT_INFO_FLAG_OPTIONAL = 1 << 1, SPI_PORT_INFO_FLAG_OPTIONAL = 1 << 1,
SPI_PORT_INFO_FLAG_CAN_GIVE_BUFFER = 1 << 2,
SPI_PORT_INFO_FLAG_CAN_USE_BUFFER = 1 << 3,
SPI_PORT_INFO_FLAG_IN_PLACE = 1 << 4,
SPI_PORT_INFO_FLAG_NO_REF = 1 << 5,
} SpiPortInfoFlags; } SpiPortInfoFlags;
/** /**
@ -50,16 +59,16 @@ typedef enum {
* @align: required alignment of the data * @align: required alignment of the data
* @maxbuffering: the maximum amount of bytes that the element will keep * @maxbuffering: the maximum amount of bytes that the element will keep
* around internally * around internally
* @latency: latency on this port * @latency: latency on this port in nanoseconds
* @features: NULL terminated array of extra port features * @features: NULL terminated array of extra port features
* *
*/ */
typedef struct { typedef struct {
SpiPortInfoFlags flags; SpiPortInfoFlags flags;
int minsize; size_t minsize;
int align; uint32_t align;
int maxbuffering; unsigned int maxbuffering;
int latency; uint64_t latency;
const char **features; const char **features;
} SpiPortInfo; } SpiPortInfo;
@ -67,12 +76,12 @@ typedef struct {
* SpiPortStatusFlags: * SpiPortStatusFlags:
* @SPI_PORT_STATUS_FLAG_NONE: no status flags * @SPI_PORT_STATUS_FLAG_NONE: no status flags
* @SPI_PORT_STATUS_FLAG_HAVE_OUTPUT: port has output * @SPI_PORT_STATUS_FLAG_HAVE_OUTPUT: port has output
* @SPI_PORT_STATUS_FLAG_ACCEPT_INPUT: port accepts input * @SPI_PORT_STATUS_FLAG_NEED_INPUT: port needs input
*/ */
typedef enum { typedef enum {
SPI_PORT_STATUS_FLAG_NONE = 0, SPI_PORT_STATUS_FLAG_NONE = 0,
SPI_PORT_STATUS_FLAG_HAVE_OUTPUT = 1 << 0, SPI_PORT_STATUS_FLAG_HAVE_OUTPUT = 1 << 0,
SPI_PORT_STATUS_FLAG_ACCEPT_INPUT = 1 << 1, SPI_PORT_STATUS_FLAG_NEED_INPUT = 1 << 1,
} SpiPortStatusFlags; } SpiPortStatusFlags;
typedef struct { typedef struct {
@ -86,31 +95,36 @@ typedef enum {
SPI_EVENT_TYPE_HAVE_OUTPUT, SPI_EVENT_TYPE_HAVE_OUTPUT,
SPI_EVENT_TYPE_NEED_INPUT, SPI_EVENT_TYPE_NEED_INPUT,
SPI_EVENT_TYPE_REQUEST_DATA, SPI_EVENT_TYPE_REQUEST_DATA,
SPI_EVENT_TYPE_RELEASE_ID,
SPI_EVENT_TYPE_DRAINED, SPI_EVENT_TYPE_DRAINED,
SPI_EVENT_TYPE_MARKER, SPI_EVENT_TYPE_MARKER,
SPI_EVENT_TYPE_ERROR, SPI_EVENT_TYPE_ERROR,
} SpiEventType; } SpiEventType;
typedef struct {
int n_ids;
void **ids;
} SpiEventReleaseID;
struct _SpiEvent { struct _SpiEvent {
const void *id; volatile int refcount;
SpiNotify notify;
SpiEventType type; SpiEventType type;
int port_id; uint32_t port_id;
void *data; void *data;
size_t size; size_t size;
}; };
/**
* SpiDataFlags:
* @SPI_DATA_FLAG_NONE: no flag
* @SPI_DATA_FLAG_DISCARD: the buffer can be discarded
* @SPI_DATA_FLAG_FORMAT_CHANGED: the format of this port changed
* @SPI_DATA_FLAG_PROPERTIES_CHANGED: properties of this port changed
* @SPI_DATA_FLAG_REMOVED: this port is removed
* @SPI_DATA_FLAG_NO_BUFFER: no buffer was produced
*/
typedef enum { typedef enum {
SPI_DATA_FLAG_NONE = 0, SPI_DATA_FLAG_NONE = 0,
SPI_DATA_FLAG_FORMAT_CHANGED = (1 << 0), SPI_DATA_FLAG_DISCARD = (1 << 0),
SPI_DATA_FLAG_PROPERTIES_CHANGED = (1 << 1), SPI_DATA_FLAG_FORMAT_CHANGED = (1 << 1),
SPI_DATA_FLAG_EOS = (1 << 2), SPI_DATA_FLAG_PROPERTIES_CHANGED = (1 << 2),
SPI_DATA_FLAG_NO_BUFFER = (1 << 3), SPI_DATA_FLAG_REMOVED = (1 << 3),
SPI_DATA_FLAG_NO_BUFFER = (1 << 4),
} SpiDataFlags; } SpiDataFlags;
/** /**
@ -121,7 +135,7 @@ typedef enum {
* @event: an event * @event: an event
*/ */
typedef struct { typedef struct {
int port_id; uint32_t port_id;
SpiDataFlags flags; SpiDataFlags flags;
SpiBuffer *buffer; SpiBuffer *buffer;
SpiEvent *event; SpiEvent *event;
@ -139,8 +153,10 @@ typedef enum {
} SpiCommandType; } SpiCommandType;
typedef struct { typedef struct {
int port_id; volatile int refcount;
SpiNotify notify;
SpiCommandType type; SpiCommandType type;
uint32_t port_id;
void *data; void *data;
size_t size; size_t size;
} SpiCommand; } SpiCommand;
@ -156,19 +172,72 @@ typedef void (*SpiEventCallback) (SpiNode *node,
void *user_data); void *user_data);
/** /**
* SpiNodeInterface: * SpiNode:
* *
* Spi node interface. * The main processing nodes.
*/ */
struct _SpiNode { struct _SpiNode {
/* user_data that can be set by the application */
void * user_data; void * user_data;
int size;
/* the total size of this node. This can be used to expand this
* structure in the future */
size_t size;
/**
* SpiNode::get_params:
* @Node: a #SpiNode
* @props: a location for a #SpiParams pointer
*
* Get the configurable parameters of @node.
*
* The returned @props is a snapshot of the current configuration and
* can be modified. The modifications will take effect after a call
* to SpiNode::set_params.
*
* Returns: #SPI_RESULT_OK on success
* #SPI_RESULT_INVALID_ARGUMENTS when node or props are %NULL
* #SPI_RESULT_NOT_IMPLEMENTED when there are no properties
* implemented on @node
*/
SpiResult (*get_params) (SpiNode *node, SpiResult (*get_params) (SpiNode *node,
SpiParams **props); SpiParams **props);
/**
* SpiNode::set_params:
* @Node: a #SpiNode
* @props: a #SpiParams
*
* Set the configurable parameters in @node.
*
* Usually, @props will be obtained from SpiNode::get_params and then
* modified but it is also possible to set another #SpiParams object
* as long as its keys and types match those of SpiParams::get_params.
*
* Properties with keys that are not known are ignored.
*
* If @props is NULL, all the parameters are reset to their defaults.
*
* Returns: #SPI_RESULT_OK on success
* #SPI_RESULT_INVALID_ARGUMENTS when node is %NULL
* #SPI_RESULT_NOT_IMPLEMENTED when no properties can be
* modified on @node.
* #SPI_RESULT_WRONG_PARAM_TYPE when a property has the wrong
* type.
*/
SpiResult (*set_params) (SpiNode *node, SpiResult (*set_params) (SpiNode *node,
const SpiParams *props); const SpiParams *props);
/**
* SpiNode::send_command:
* @Node: a #SpiNode
* @command: a #SpiCommand
*
* Send a command to @node.
*
* Returns: #SPI_RESULT_OK on success
* #SPI_RESULT_INVALID_ARGUMENTS when node or command is %NULL
* #SPI_RESULT_NOT_IMPLEMENTED when this node can't process commands
* #SPI_RESULT_INVALID_COMMAND @command is an invalid command
*/
SpiResult (*send_command) (SpiNode *node, SpiResult (*send_command) (SpiNode *node,
SpiCommand *command); SpiCommand *command);
SpiResult (*get_event) (SpiNode *node, SpiResult (*get_event) (SpiNode *node,
@ -176,55 +245,82 @@ struct _SpiNode {
SpiResult (*set_event_callback) (SpiNode *node, SpiResult (*set_event_callback) (SpiNode *node,
SpiEventCallback callback, SpiEventCallback callback,
void *user_data); void *user_data);
/**
* SpiNode::get_n_ports:
* @Node: a #SpiNode
* @n_input_ports: location to hold the number of input ports or %NULL
* @max_input_ports: location to hold the maximum number of input ports or %NULL
* @n_output_ports: location to hold the number of output ports or %NULL
* @max_output_ports: location to hold the maximum number of output ports or %NULL
*
* Get the current number of input and output ports and also the maximum
* number of ports.
*
* Returns: #SPI_RESULT_OK on success
* #SPI_RESULT_INVALID_ARGUMENTS when node is %NULL
*/
SpiResult (*get_n_ports) (SpiNode *node, SpiResult (*get_n_ports) (SpiNode *node,
int *n_input_ports, unsigned int *n_input_ports,
int *max_input_ports, unsigned int *max_input_ports,
int *n_output_ports, unsigned int *n_output_ports,
int *max_output_ports); unsigned int *max_output_ports);
/**
* SpiNode::get_port_ids:
* @Node: a #SpiNode
* @n_input_ports: size of the @input_ids array
* @input_ids: array to store the input stream ids
* @n_output_ports: size of the @output_ids array
* @output_ids: array to store the output stream ids
*
* Get the current number of input and output ports and also the maximum
* number of ports.
*
* Returns: #SPI_RESULT_OK on success
* #SPI_RESULT_INVALID_ARGUMENTS when node is %NULL
*/
SpiResult (*get_port_ids) (SpiNode *node, SpiResult (*get_port_ids) (SpiNode *node,
int n_input_ports, unsigned int n_input_ports,
int *input_ids, uint32_t *input_ids,
int n_output_ports, unsigned int n_output_ports,
int *output_ids); uint32_t *output_ids);
SpiResult (*add_port) (SpiNode *node, SpiResult (*add_port) (SpiNode *node,
SpiDirection direction, SpiDirection direction,
int *port_id); uint32_t *port_id);
SpiResult (*remove_port) (SpiNode *node, SpiResult (*remove_port) (SpiNode *node,
int port_id); uint32_t port_id);
SpiResult (*get_port_formats) (SpiNode *node, SpiResult (*get_port_formats) (SpiNode *node,
int port_id, uint32_t port_id,
int format_idx, unsigned int format_idx,
SpiParams **format); SpiParams **format);
SpiResult (*set_port_format) (SpiNode *node, SpiResult (*set_port_format) (SpiNode *node,
int port_id, uint32_t port_id,
int test_only, int test_only,
const SpiParams *format); const SpiParams *format);
SpiResult (*get_port_format) (SpiNode *node, SpiResult (*get_port_format) (SpiNode *node,
int port_id, uint32_t port_id,
const SpiParams **format); const SpiParams **format);
SpiResult (*get_port_info) (SpiNode *node, SpiResult (*get_port_info) (SpiNode *node,
int port_id, uint32_t port_id,
SpiPortInfo *info); SpiPortInfo *info);
SpiResult (*get_port_params) (SpiNode *node, SpiResult (*get_port_params) (SpiNode *node,
int port_id, uint32_t port_id,
SpiParams **params); SpiParams **params);
SpiResult (*set_port_params) (SpiNode *node, SpiResult (*set_port_params) (SpiNode *node,
int port_id, uint32_t port_id,
const SpiParams *params); const SpiParams *params);
SpiResult (*get_port_status) (SpiNode *node, SpiResult (*get_port_status) (SpiNode *node,
int port_id, uint32_t port_id,
SpiPortStatus *status); SpiPortStatus *status);
SpiResult (*send_port_data) (SpiNode *node, SpiResult (*send_port_data) (SpiNode *node,
SpiDataInfo *data); SpiDataInfo *data);
SpiResult (*receive_port_data) (SpiNode *node, SpiResult (*receive_port_data) (SpiNode *node,
int n_data, unsigned int n_data,
SpiDataInfo *data); SpiDataInfo *data);
}; };

View file

@ -47,6 +47,8 @@ typedef enum {
SPI_RESULT_INVALID_ARGUMENTS = -21, SPI_RESULT_INVALID_ARGUMENTS = -21,
} SpiResult; } SpiResult;
typedef void (*SpiNotify) (void *data);
G_END_DECLS G_END_DECLS
#endif /* __SPI_RESULT_H__ */ #endif /* __SPI_RESULT_H__ */

View file

@ -43,6 +43,7 @@ struct _SpiVolume {
SpiNode node; SpiNode node;
SpiVolumeParams params; SpiVolumeParams params;
SpiVolumeParams tmp_params;
SpiEvent *event; SpiEvent *event;
SpiEvent last_event; SpiEvent last_event;
@ -161,15 +162,24 @@ get_param (const SpiParams *params,
return res; return res;
} }
static void
reset_volume_params (SpiVolumeParams *params)
{
params->volume = default_volume;
params->mute = default_mute;
}
static SpiResult static SpiResult
spi_volume_node_get_params (SpiNode *node, spi_volume_node_get_params (SpiNode *node,
SpiParams **params) SpiParams **params)
{ {
static SpiVolumeParams p;
SpiVolume *this = (SpiVolume *) node; SpiVolume *this = (SpiVolume *) node;
memcpy (&p, &this->params, sizeof (p)); if (node == NULL || params == NULL)
*params = &p.param; return SPI_RESULT_INVALID_ARGUMENTS;
memcpy (&this->tmp_params, &this->params, sizeof (this->tmp_params));
*params = &this->tmp_params.param;
return SPI_RESULT_OK; return SPI_RESULT_OK;
} }
@ -184,6 +194,14 @@ spi_volume_node_set_params (SpiNode *node,
size_t size; size_t size;
const void *value; const void *value;
if (node == NULL)
return SPI_RESULT_INVALID_ARGUMENTS;
if (params == NULL) {
reset_volume_params (p);
return SPI_RESULT_OK;
}
if (params->get_param (params, 0, &type, &size, &value) == 0) { if (params->get_param (params, 0, &type, &size, &value) == 0) {
if (type != SPI_PARAM_TYPE_DOUBLE) if (type != SPI_PARAM_TYPE_DOUBLE)
return SPI_RESULT_WRONG_PARAM_TYPE; return SPI_RESULT_WRONG_PARAM_TYPE;
@ -204,6 +222,9 @@ spi_volume_node_send_command (SpiNode *node,
SpiVolume *this = (SpiVolume *) node; SpiVolume *this = (SpiVolume *) node;
SpiResult res = SPI_RESULT_NOT_IMPLEMENTED; SpiResult res = SPI_RESULT_NOT_IMPLEMENTED;
if (node == NULL || command == NULL)
return SPI_RESULT_INVALID_ARGUMENTS;
switch (command->type) { switch (command->type) {
case SPI_COMMAND_INVALID: case SPI_COMMAND_INVALID:
res = SPI_RESULT_INVALID_COMMAND; res = SPI_RESULT_INVALID_COMMAND;
@ -266,24 +287,32 @@ spi_volume_node_set_event_callback (SpiNode *node,
static SpiResult static SpiResult
spi_volume_node_get_n_ports (SpiNode *node, spi_volume_node_get_n_ports (SpiNode *node,
int *n_input_ports, unsigned int *n_input_ports,
int *max_input_ports, unsigned int *max_input_ports,
int *n_output_ports, unsigned int *n_output_ports,
int *max_output_ports) unsigned int *max_output_ports)
{ {
*n_input_ports = 1; if (node == NULL)
*n_output_ports = 1; return SPI_RESULT_INVALID_ARGUMENTS;
*max_input_ports = 1;
*max_output_ports = 1; if (n_input_ports)
*n_input_ports = 1;
if (n_output_ports)
*n_output_ports = 1;
if (max_input_ports)
*max_input_ports = 1;
if (max_output_ports)
*max_output_ports = 1;
return SPI_RESULT_OK; return SPI_RESULT_OK;
} }
static SpiResult static SpiResult
spi_volume_node_get_port_ids (SpiNode *node, spi_volume_node_get_port_ids (SpiNode *node,
int n_input_ports, unsigned int n_input_ports,
int *input_ids, uint32_t *input_ids,
int n_output_ports, unsigned int n_output_ports,
int *output_ids) uint32_t *output_ids)
{ {
if (n_input_ports > 0) if (n_input_ports > 0)
input_ids[0] = 0; input_ids[0] = 0;
@ -297,14 +326,14 @@ spi_volume_node_get_port_ids (SpiNode *node,
static SpiResult static SpiResult
spi_volume_node_add_port (SpiNode *node, spi_volume_node_add_port (SpiNode *node,
SpiDirection direction, SpiDirection direction,
int *port_id) uint32_t *port_id)
{ {
return SPI_RESULT_NOT_IMPLEMENTED; return SPI_RESULT_NOT_IMPLEMENTED;
} }
static SpiResult static SpiResult
spi_volume_node_remove_port (SpiNode *node, spi_volume_node_remove_port (SpiNode *node,
int port_id) uint32_t port_id)
{ {
return SPI_RESULT_NOT_IMPLEMENTED; return SPI_RESULT_NOT_IMPLEMENTED;
} }
@ -504,8 +533,8 @@ get_format_param (const SpiParams *params,
static SpiResult static SpiResult
spi_volume_node_get_port_formats (SpiNode *node, spi_volume_node_get_port_formats (SpiNode *node,
int port_id, uint32_t port_id,
int format_idx, unsigned int format_idx,
SpiParams **format) SpiParams **format)
{ {
static SpiVolumeFormat fmt; static SpiVolumeFormat fmt;
@ -531,7 +560,7 @@ spi_volume_node_get_port_formats (SpiNode *node,
static SpiResult static SpiResult
spi_volume_node_set_port_format (SpiNode *node, spi_volume_node_set_port_format (SpiNode *node,
int port_id, uint32_t port_id,
int test_only, int test_only,
const SpiParams *format) const SpiParams *format)
{ {
@ -600,7 +629,7 @@ spi_volume_node_set_port_format (SpiNode *node,
static SpiResult static SpiResult
spi_volume_node_get_port_format (SpiNode *node, spi_volume_node_get_port_format (SpiNode *node,
int port_id, uint32_t port_id,
const SpiParams **format) const SpiParams **format)
{ {
SpiVolume *this = (SpiVolume *) node; SpiVolume *this = (SpiVolume *) node;
@ -618,20 +647,28 @@ spi_volume_node_get_port_format (SpiNode *node,
static SpiResult static SpiResult
spi_volume_node_get_port_info (SpiNode *node, spi_volume_node_get_port_info (SpiNode *node,
int port_id, uint32_t port_id,
SpiPortInfo *info) SpiPortInfo *info)
{ {
if (port_id != 0) switch (port_id) {
return SPI_RESULT_INVALID_PORT; case 0:
info->flags = SPI_PORT_INFO_FLAG_CAN_USE_BUFFER |
info->flags = SPI_PORT_INFO_FLAG_NONE; SPI_PORT_INFO_FLAG_IN_PLACE;
break;
case 1:
info->flags = SPI_PORT_INFO_FLAG_CAN_GIVE_BUFFER |
SPI_PORT_INFO_FLAG_CAN_USE_BUFFER |
SPI_PORT_INFO_FLAG_NO_REF;
break;
default:
return SPI_RESULT_INVALID_PORT;
}
return SPI_RESULT_OK; return SPI_RESULT_OK;
} }
static SpiResult static SpiResult
spi_volume_node_get_port_params (SpiNode *node, spi_volume_node_get_port_params (SpiNode *node,
int port_id, uint32_t port_id,
SpiParams **params) SpiParams **params)
{ {
return SPI_RESULT_NOT_IMPLEMENTED; return SPI_RESULT_NOT_IMPLEMENTED;
@ -639,7 +676,7 @@ spi_volume_node_get_port_params (SpiNode *node,
static SpiResult static SpiResult
spi_volume_node_set_port_params (SpiNode *node, spi_volume_node_set_port_params (SpiNode *node,
int port_id, uint32_t port_id,
const SpiParams *params) const SpiParams *params)
{ {
return SPI_RESULT_NOT_IMPLEMENTED; return SPI_RESULT_NOT_IMPLEMENTED;
@ -647,7 +684,7 @@ spi_volume_node_set_port_params (SpiNode *node,
static SpiResult static SpiResult
spi_volume_node_get_port_status (SpiNode *node, spi_volume_node_get_port_status (SpiNode *node,
int port_id, uint32_t port_id,
SpiPortStatus *status) SpiPortStatus *status)
{ {
SpiVolume *this = (SpiVolume *) node; SpiVolume *this = (SpiVolume *) node;
@ -659,7 +696,7 @@ spi_volume_node_get_port_status (SpiNode *node,
switch (port_id) { switch (port_id) {
case 0: case 0:
if (this->input_buffer == NULL) if (this->input_buffer == NULL)
flags |= SPI_PORT_STATUS_FLAG_ACCEPT_INPUT; flags |= SPI_PORT_STATUS_FLAG_NEED_INPUT;
break; break;
case 1: case 1:
if (this->input_buffer != NULL) if (this->input_buffer != NULL)
@ -678,12 +715,18 @@ spi_volume_node_send_port_data (SpiNode *node,
SpiDataInfo *data) SpiDataInfo *data)
{ {
SpiVolume *this = (SpiVolume *) node; SpiVolume *this = (SpiVolume *) node;
SpiBuffer *buffer = data->buffer; SpiBuffer *buffer;
SpiEvent *event = data->event; SpiEvent *event;
if (node == NULL || data == NULL)
return SPI_RESULT_INVALID_ARGUMENTS;
if (data->port_id != 0) if (data->port_id != 0)
return SPI_RESULT_INVALID_PORT; return SPI_RESULT_INVALID_PORT;
event = data->event;
buffer = data->buffer;
if (buffer == NULL && event == NULL) if (buffer == NULL && event == NULL)
return SPI_RESULT_INVALID_ARGUMENTS; return SPI_RESULT_INVALID_ARGUMENTS;
@ -694,8 +737,7 @@ spi_volume_node_send_port_data (SpiNode *node,
if (this->input_buffer != NULL) if (this->input_buffer != NULL)
return SPI_RESULT_HAVE_ENOUGH_INPUT; return SPI_RESULT_HAVE_ENOUGH_INPUT;
buffer->refcount++; this->input_buffer = spi_buffer_ref (buffer);
this->input_buffer = buffer;
} }
if (event) { if (event) {
switch (event->type) { switch (event->type) {
@ -709,11 +751,18 @@ spi_volume_node_send_port_data (SpiNode *node,
static SpiResult static SpiResult
spi_volume_node_receive_port_data (SpiNode *node, spi_volume_node_receive_port_data (SpiNode *node,
int n_data, unsigned int n_data,
SpiDataInfo *data) SpiDataInfo *data)
{ {
SpiVolume *this = (SpiVolume *) node; SpiVolume *this = (SpiVolume *) node;
int i, n_samples; unsigned int si, di, i, n_samples, n_bytes, soff, doff ;
SpiBuffer *sbuf, *dbuf;
SpiData *sd, *dd;
uint16_t *src, *dst;
double volume;
if (node == NULL || n_data == 0 || data == NULL)
return SPI_RESULT_INVALID_ARGUMENTS;
if (data->port_id != 1) if (data->port_id != 1)
return SPI_RESULT_INVALID_PORT; return SPI_RESULT_INVALID_PORT;
@ -724,13 +773,56 @@ spi_volume_node_receive_port_data (SpiNode *node,
if (this->input_buffer == NULL) if (this->input_buffer == NULL)
return SPI_RESULT_NEED_MORE_INPUT; return SPI_RESULT_NEED_MORE_INPUT;
n_samples = 4096; volume = this->params.volume;
for (i = 0; i < n_samples; i++) { sbuf = this->input_buffer;
dbuf = data->buffer ? data->buffer : this->input_buffer;
si = di = 0;
soff = doff = 0;
while (TRUE) {
if (si == sbuf->n_datas || di == dbuf->n_datas)
break;
sd = &sbuf->datas[si];
dd = &dbuf->datas[di];
if (sd->type != SPI_DATA_TYPE_MEMPTR) {
si++;
continue;
}
if (dd->type != SPI_DATA_TYPE_MEMPTR) {
di++;
continue;
}
src = (uint16_t*) ((uint8_t*)sd->data + soff);
dst = (uint16_t*) ((uint8_t*)dd->data + doff);
n_bytes = MIN (sd->size - soff, dd->size - doff);
n_samples = n_bytes / sizeof (uint16_t);
for (i = 0; i < n_samples; i++)
*src++ = *dst++ * volume;
soff += n_bytes;
doff += n_bytes;
if (soff >= sd->size) {
si++;
soff = 0;
}
if (doff >= dd->size) {
di++;
doff = 0;
}
} }
data->buffer = this->input_buffer; if (sbuf != dbuf)
spi_buffer_unref (sbuf);
this->input_buffer = NULL; this->input_buffer = NULL;
data->buffer = dbuf;
return SPI_RESULT_OK; return SPI_RESULT_OK;
} }
@ -766,6 +858,7 @@ spi_volume_new (void)
this->params.param.get_param_info = get_param_info; this->params.param.get_param_info = get_param_info;
this->params.param.set_param = set_param; this->params.param.set_param = set_param;
this->params.param.get_param = get_param; this->params.param.get_param = get_param;
reset_volume_params (&this->params);
return node; return node;
} }

View file

@ -146,7 +146,7 @@ inspect_node (SpiNode *node)
{ {
SpiResult res; SpiResult res;
SpiParams *params; SpiParams *params;
int n_input, max_input, n_output, max_output, i; unsigned int n_input, max_input, n_output, max_output, i;
SpiParams *format; SpiParams *format;
const SpiParams *cformat; const SpiParams *cformat;
uint32_t samplerate; uint32_t samplerate;
@ -235,9 +235,6 @@ handle_event (SpiNode *node)
case SPI_EVENT_TYPE_REQUEST_DATA: case SPI_EVENT_TYPE_REQUEST_DATA:
printf ("got request-data notify\n"); printf ("got request-data notify\n");
break; break;
case SPI_EVENT_TYPE_RELEASE_ID:
printf ("got release-id notify\n");
break;
case SPI_EVENT_TYPE_DRAINED: case SPI_EVENT_TYPE_DRAINED:
printf ("got drained notify\n"); printf ("got drained notify\n");
break; break;
@ -315,8 +312,7 @@ push_input (SpiNode *node)
mybuf = free_list; mybuf = free_list;
free_list = mybuf->next; free_list = mybuf->next;
printf ("alloc buffer %p\n", mybuf); printf ("alloc input buffer %p\n", mybuf);
mybuf->buffer.refcount = 1; mybuf->buffer.refcount = 1;
info.port_id = 0; info.port_id = 0;
@ -324,11 +320,9 @@ push_input (SpiNode *node)
info.buffer = &mybuf->buffer; info.buffer = &mybuf->buffer;
info.event = NULL; info.event = NULL;
if ((res = node->send_port_data (node, &info)) < 0) res = node->send_port_data (node, &info);
printf ("got error %d\n", res);
if (--mybuf->buffer.refcount == 0) spi_buffer_unref (&mybuf->buffer);
mybuf->buffer.notify (mybuf);
return res; return res;
} }
@ -338,20 +332,24 @@ pull_output (SpiNode *node)
{ {
SpiDataInfo info[1] = { { 0, }, }; SpiDataInfo info[1] = { { 0, }, };
SpiResult res; SpiResult res;
MyBuffer *mybuf;
SpiBuffer *buf; SpiBuffer *buf;
mybuf = free_list;
free_list = mybuf->next;
printf ("alloc output buffer %p\n", mybuf);
mybuf->buffer.refcount = 1;
info[0].port_id = 1; info[0].port_id = 1;
info[0].buffer = NULL; info[0].buffer = &mybuf->buffer;
info[0].event = NULL; info[0].event = NULL;
if ((res = node->receive_port_data (node, 1, info)) < 0) res = node->receive_port_data (node, 1, info);
printf ("got error %d\n", res);
buf = info[0].buffer; buf = info[0].buffer;
if (buf) { spi_buffer_unref (buf);
if (--buf->refcount == 0)
buf->notify (buf);
}
return res; return res;
} }
@ -387,6 +385,8 @@ main (gint argc, gchar *argv[])
state = 0; state = 0;
while (TRUE) { while (TRUE) {
SpiPortStatus status;
if (state == 0) { if (state == 0) {
if ((res = push_input (node)) < 0) { if ((res = push_input (node)) < 0) {
if (res == SPI_RESULT_HAVE_ENOUGH_INPUT) if (res == SPI_RESULT_HAVE_ENOUGH_INPUT)
@ -396,6 +396,10 @@ main (gint argc, gchar *argv[])
break; break;
} }
} }
if ((res = node->get_port_status (node, 1, &status)) < 0)
printf ("got error %d\n", res);
else if (status.flags & SPI_PORT_STATUS_FLAG_HAVE_OUTPUT)
state = 1;
} }
if (state == 1) { if (state == 1) {
if ((res = pull_output (node)) < 0) { if ((res = pull_output (node)) < 0) {
@ -406,6 +410,10 @@ main (gint argc, gchar *argv[])
break; break;
} }
} }
if ((res = node->get_port_status (node, 0, &status)) < 0)
printf ("got error %d\n", res);
else if (status.flags & SPI_PORT_STATUS_FLAG_NEED_INPUT)
state = 0;
} }
} }