event: make events dynamic

Use uri for the events.
This commit is contained in:
Wim Taymans 2017-03-21 20:39:20 +01:00
parent 5bccb1aeea
commit 4d9f2c5161
23 changed files with 299 additions and 285 deletions

View file

@ -160,7 +160,7 @@ typedef struct {
void (*state_change) (void *object,
SpaNodeState state);
void (*event) (void *object,
SpaNodeEvent *event);
SpaEvent *event);
void (*destroy) (void *object);
} PinosClientNodeMethods;
@ -173,8 +173,8 @@ typedef struct {
typedef struct {
void (*done) (void *object,
int datafd);
void (*event) (void *object,
const SpaNodeEvent *event);
void (*event) (void *object,
const SpaEvent *event);
void (*add_port) (void *object,
uint32_t seq,
SpaDirection direction,

View file

@ -557,8 +557,8 @@ client_node_marshal_state_change (void *object,
}
static void
client_node_marshal_event (void *object,
SpaNodeEvent *event)
client_node_marshal_event (void *object,
SpaEvent *event)
{
PinosProxy *proxy = object;
PinosConnection *connection = proxy->context->protocol_private;
@ -617,7 +617,7 @@ client_node_demarshal_event (void *object,
{
PinosProxy *proxy = object;
SpaPODIter it;
const SpaNodeEvent *event;
const SpaEvent *event;
if (!spa_pod_iter_struct (&it, data, size) ||
!spa_pod_iter_get (&it,

View file

@ -388,9 +388,10 @@ add_request_clock_update (PinosStream *stream, bool flush)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
SpaNodeEventRequestClockUpdate rcu =
SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_INIT (SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_TIME, 0, 0);
SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_INIT (stream->context->uri.node_events.RequestClockUpdate,
SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_TIME, 0, 0);
pinos_client_node_do_event (impl->node_proxy, (SpaNodeEvent*)&rcu);
pinos_client_node_do_event (impl->node_proxy, (SpaEvent*)&rcu);
}
static void
@ -400,8 +401,10 @@ add_async_complete (PinosStream *stream,
bool flush)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
SpaNodeEventAsyncComplete ac = SPA_NODE_EVENT_ASYNC_COMPLETE_INIT(seq, res);
pinos_client_node_do_event (impl->node_proxy, (SpaNodeEvent*)&ac);
SpaNodeEventAsyncComplete ac =
SPA_NODE_EVENT_ASYNC_COMPLETE_INIT (stream->context->uri.node_events.AsyncComplete,
seq, res);
pinos_client_node_do_event (impl->node_proxy, (SpaEvent*)&ac);
}
static void
@ -462,54 +465,47 @@ find_buffer (PinosStream *stream, uint32_t id)
static void
handle_rtnode_event (PinosStream *stream,
SpaNodeEvent *event)
SpaEvent *event)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosContext *context = impl->this.context;
switch (SPA_NODE_EVENT_TYPE (event)) {
case SPA_NODE_EVENT_HAVE_OUTPUT:
{
int i;
if (SPA_EVENT_TYPE (event) == context->uri.node_events.HaveOutput) {
int i;
//pinos_log_debug ("stream %p: have output", stream);
//pinos_log_debug ("stream %p: have output", stream);
for (i = 0; i < impl->trans->area->n_inputs; i++) {
SpaPortInput *input = &impl->trans->inputs[i];
for (i = 0; i < impl->trans->area->n_inputs; i++) {
SpaPortInput *input = &impl->trans->inputs[i];
if (input->buffer_id == SPA_ID_INVALID)
continue;
if (input->buffer_id == SPA_ID_INVALID)
continue;
pinos_signal_emit (&stream->new_buffer, stream, input->buffer_id);
input->buffer_id = SPA_ID_INVALID;
}
send_need_input (stream);
break;
pinos_signal_emit (&stream->new_buffer, stream, input->buffer_id);
input->buffer_id = SPA_ID_INVALID;
}
send_need_input (stream);
}
else if (SPA_EVENT_TYPE (event) == context->uri.node_events.NeedInput) {
//pinos_log_debug ("stream %p: need input", stream);
pinos_signal_emit (&stream->need_buffer, stream);
}
else if (SPA_EVENT_TYPE (event) == context->uri.node_events.ReuseBuffer) {
SpaNodeEventReuseBuffer *p = (SpaNodeEventReuseBuffer *) event;
BufferId *bid;
case SPA_NODE_EVENT_NEED_INPUT:
//pinos_log_debug ("stream %p: need input", stream);
pinos_signal_emit (&stream->need_buffer, stream);
break;
if (p->body.port_id.value != impl->port_id)
return;
if (impl->direction != SPA_DIRECTION_OUTPUT)
return;
case SPA_NODE_EVENT_REUSE_BUFFER:
{
SpaNodeEventReuseBuffer *p = (SpaNodeEventReuseBuffer *) event;
BufferId *bid;
if (p->body.port_id.value != impl->port_id)
break;
if (impl->direction != SPA_DIRECTION_OUTPUT)
break;
if ((bid = find_buffer (stream, p->body.buffer_id.value)) && bid->used) {
bid->used = false;
pinos_signal_emit (&stream->new_buffer, stream, p->body.buffer_id.value);
}
break;
if ((bid = find_buffer (stream, p->body.buffer_id.value)) && bid->used) {
bid->used = false;
pinos_signal_emit (&stream->new_buffer, stream, p->body.buffer_id.value);
}
default:
pinos_log_warn ("unexpected node event %d", SPA_NODE_EVENT_TYPE (event));
break;
}
else {
pinos_log_warn ("unexpected node event %d", SPA_EVENT_TYPE (event));
}
}
@ -529,13 +525,13 @@ on_rtsocket_condition (SpaSource *source,
}
if (mask & SPA_IO_IN) {
SpaNodeEvent event;
SpaEvent event;
uint64_t cmd;
read (impl->rtfd, &cmd, 8);
while (pinos_transport_next_event (impl->trans, &event) == SPA_RESULT_OK) {
SpaNodeEvent *ev = alloca (SPA_POD_SIZE (&event));
SpaEvent *ev = alloca (SPA_POD_SIZE (&event));
pinos_transport_parse_event (impl->trans, ev);
handle_rtnode_event (stream, ev);
}
@ -570,22 +566,10 @@ handle_socket (PinosStream *stream, int rtfd)
}
static void
handle_node_event (PinosStream *stream,
const SpaNodeEvent *event)
handle_node_event (PinosStream *stream,
const SpaEvent *event)
{
switch (SPA_NODE_EVENT_TYPE (event)) {
case SPA_NODE_EVENT_INVALID:
case SPA_NODE_EVENT_HAVE_OUTPUT:
case SPA_NODE_EVENT_NEED_INPUT:
case SPA_NODE_EVENT_ASYNC_COMPLETE:
case SPA_NODE_EVENT_REUSE_BUFFER:
case SPA_NODE_EVENT_ERROR:
case SPA_NODE_EVENT_BUFFERING:
case SPA_NODE_EVENT_REQUEST_REFRESH:
case SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE:
pinos_log_warn ("unhandled node event %d", SPA_NODE_EVENT_TYPE (event));
break;
}
pinos_log_warn ("unhandled node event %d", SPA_EVENT_TYPE (event));
}
static bool
@ -658,8 +642,8 @@ client_node_done (void *object,
}
static void
client_node_event (void *object,
const SpaNodeEvent *event)
client_node_event (void *object,
const SpaEvent *event)
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
@ -1154,10 +1138,11 @@ pinos_stream_recycle_buffer (PinosStream *stream,
uint32_t id)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
SpaNodeEventReuseBuffer rb = SPA_NODE_EVENT_REUSE_BUFFER_INIT (impl->port_id, id);
SpaNodeEventReuseBuffer rb = SPA_NODE_EVENT_REUSE_BUFFER_INIT (stream->context->uri.node_events.ReuseBuffer,
impl->port_id, id);
uint64_t cmd = 1;
pinos_transport_add_event (impl->trans, (SpaNodeEvent *)&rb);
pinos_transport_add_event (impl->trans, (SpaEvent *)&rb);
write (impl->rtfd, &cmd, 8);
return true;

View file

@ -39,7 +39,7 @@ typedef struct {
size_t offset;
SpaRingbufferArea areas[2];
SpaNodeEvent current;
SpaEvent current;
} PinosTransportImpl;
static size_t
@ -204,7 +204,7 @@ pinos_transport_get_info (PinosTransport *trans,
SpaResult
pinos_transport_add_event (PinosTransport *trans,
SpaNodeEvent *event)
SpaEvent *event)
{
PinosTransportImpl *impl = (PinosTransportImpl *) trans;
SpaRingbufferArea areas[2];
@ -230,7 +230,7 @@ pinos_transport_add_event (PinosTransport *trans,
SpaResult
pinos_transport_next_event (PinosTransport *trans,
SpaNodeEvent *event)
SpaEvent *event)
{
PinosTransportImpl *impl = (PinosTransportImpl *) trans;
size_t avail;
@ -239,14 +239,14 @@ pinos_transport_next_event (PinosTransport *trans,
return SPA_RESULT_INVALID_ARGUMENTS;
avail = spa_ringbuffer_get_read_areas (trans->input_buffer, impl->areas);
if (avail < sizeof (SpaNodeEvent))
if (avail < sizeof (SpaEvent))
return SPA_RESULT_ENUM_END;
spa_ringbuffer_read_data (trans->input_buffer,
trans->input_data,
impl->areas,
&impl->current,
sizeof (SpaNodeEvent));
sizeof (SpaEvent));
*event = impl->current;

View file

@ -77,10 +77,10 @@ SpaResult pinos_transport_get_info (PinosTransport *trans,
PinosTransportInfo *info);
SpaResult pinos_transport_add_event (PinosTransport *trans,
SpaNodeEvent *event);
SpaEvent *event);
SpaResult pinos_transport_next_event (PinosTransport *trans,
SpaNodeEvent *event);
SpaEvent *event);
SpaResult pinos_transport_parse_event (PinosTransport *trans,
void *event);

View file

@ -48,4 +48,6 @@ pinos_uri_init (PinosURI *uri)
uri->spa_node = spa_id_map_get_id (uri->map, SPA_NODE_URI);
uri->spa_clock = spa_id_map_get_id (uri->map, SPA_CLOCK_URI);
uri->spa_monitor = spa_id_map_get_id (uri->map, SPA_MONITOR_URI);
spa_node_events_map (uri->map, &uri->node_events);
}

View file

@ -29,6 +29,7 @@ extern "C" {
#include <pinos/client/map.h>
#include <spa/include/spa/id-map.h>
#include <spa/include/spa/node-event.h>
typedef struct _PinosURI PinosURI;
@ -52,6 +53,8 @@ struct _PinosURI {
uint32_t spa_node;
uint32_t spa_clock;
uint32_t spa_monitor;
SpaNodeEvents node_events;
};
void pinos_uri_init (PinosURI *uri);

View file

@ -129,8 +129,10 @@ typedef struct
static void
send_async_complete (SpaProxy *this, uint32_t seq, SpaResult res)
{
SpaNodeEventAsyncComplete ac = SPA_NODE_EVENT_ASYNC_COMPLETE_INIT (seq, res);
this->event_cb (&this->node, (SpaNodeEvent *)&ac, this->user_data);
PinosCore *core = this->pnode->core;
SpaNodeEventAsyncComplete ac = SPA_NODE_EVENT_ASYNC_COMPLETE_INIT (core->uri.node_events.AsyncComplete,
seq, res);
this->event_cb (&this->node, (SpaEvent *)&ac, this->user_data);
}
static SpaResult
@ -164,7 +166,7 @@ static void
send_need_input (SpaProxy *this)
{
PinosNode *pnode = this->pnode;
SpaNodeEvent event = SPA_NODE_EVENT_INIT (SPA_NODE_EVENT_NEED_INPUT);
SpaEvent event = SPA_EVENT_INIT (pnode->core->uri.node_events.NeedInput);
uint64_t cmd = 1;
pinos_transport_add_event (pnode->transport, &event);
@ -175,7 +177,7 @@ static void
send_have_output (SpaProxy *this)
{
PinosNode *pnode = this->pnode;
SpaNodeEvent event = SPA_NODE_EVENT_INIT (SPA_NODE_EVENT_HAVE_OUTPUT);
SpaEvent event = SPA_EVENT_INIT (pnode->core->uri.node_events.HaveOutput);
uint64_t cmd = 1;
pinos_transport_add_event (pnode->transport, &event);
@ -775,8 +777,9 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node,
return SPA_RESULT_INVALID_PORT;
{
SpaNodeEventReuseBuffer rb = SPA_NODE_EVENT_REUSE_BUFFER_INIT (port_id, buffer_id);
pinos_transport_add_event (pnode->transport, (SpaNodeEvent *)&rb);
SpaNodeEventReuseBuffer rb = SPA_NODE_EVENT_REUSE_BUFFER_INIT (pnode->core->uri.node_events.ReuseBuffer,
port_id, buffer_id);
pinos_transport_add_event (pnode->transport, (SpaEvent *)&rb);
//write (this->data_source.fd, &cmd, 8);
}
@ -847,25 +850,10 @@ spa_proxy_node_process_output (SpaNode *node)
}
static SpaResult
handle_node_event (SpaProxy *this,
SpaNodeEvent *event)
handle_node_event (SpaProxy *this,
SpaEvent *event)
{
switch (SPA_NODE_EVENT_TYPE (event)) {
case SPA_NODE_EVENT_INVALID:
break;
case SPA_NODE_EVENT_ASYNC_COMPLETE:
case SPA_NODE_EVENT_HAVE_OUTPUT:
case SPA_NODE_EVENT_NEED_INPUT:
case SPA_NODE_EVENT_REUSE_BUFFER:
case SPA_NODE_EVENT_ERROR:
case SPA_NODE_EVENT_BUFFERING:
case SPA_NODE_EVENT_REQUEST_REFRESH:
case SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE:
this->event_cb (&this->node, event, this->user_data);
break;
}
this->event_cb (&this->node, event, this->user_data);
return SPA_RESULT_OK;
}
@ -945,8 +933,8 @@ client_node_state_change (void *object,
}
static void
client_node_event (void *object,
SpaNodeEvent *event)
client_node_event (void *object,
SpaEvent *event)
{
PinosResource *resource = object;
PinosClientNode *node = resource->object;
@ -984,13 +972,13 @@ proxy_on_data_fd_events (SpaSource *source)
}
if (source->rmask & SPA_IO_IN) {
SpaNodeEvent event;
SpaEvent event;
uint64_t cmd;
read (this->data_source.fd, &cmd, 8);
while (pinos_transport_next_event (pnode->transport, &event) == SPA_RESULT_OK) {
SpaNodeEvent *ev = alloca (SPA_POD_SIZE (&event));
SpaEvent *ev = alloca (SPA_POD_SIZE (&event));
pinos_transport_parse_event (pnode->transport, ev);
this->event_cb (&this->node, ev, this->user_data);
}

View file

@ -250,126 +250,108 @@ send_clock_update (PinosNode *this)
}
static void
on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
on_node_event (SpaNode *node, SpaEvent *event, void *user_data)
{
PinosNode *this = user_data;
PinosNodeImpl *impl = SPA_CONTAINER_OF (this, PinosNodeImpl, this);
switch (SPA_NODE_EVENT_TYPE (event)) {
case SPA_NODE_EVENT_INVALID:
case SPA_NODE_EVENT_ERROR:
case SPA_NODE_EVENT_BUFFERING:
case SPA_NODE_EVENT_REQUEST_REFRESH:
break;
if (SPA_EVENT_TYPE (event) == this->core->uri.node_events.AsyncComplete) {
SpaNodeEventAsyncComplete *ac = (SpaNodeEventAsyncComplete *) event;
case SPA_NODE_EVENT_ASYNC_COMPLETE:
{
SpaNodeEventAsyncComplete *ac = (SpaNodeEventAsyncComplete *) event;
pinos_log_debug ("node %p: async complete event %d %d", this, ac->body.seq.value, ac->body.res.value);
if (!pinos_work_queue_complete (impl->work, this, ac->body.seq.value, ac->body.res.value)) {
pinos_signal_emit (&this->async_complete, this, ac->body.seq.value, ac->body.res.value);
}
break;
pinos_log_debug ("node %p: async complete event %d %d", this, ac->body.seq.value, ac->body.res.value);
if (!pinos_work_queue_complete (impl->work, this, ac->body.seq.value, ac->body.res.value)) {
pinos_signal_emit (&this->async_complete, this, ac->body.seq.value, ac->body.res.value);
}
}
else if (SPA_EVENT_TYPE (event) == this->core->uri.node_events.NeedInput) {
SpaResult res;
int i;
bool processed = false;
case SPA_NODE_EVENT_NEED_INPUT:
{
SpaResult res;
int i;
bool processed = false;
for (i = 0; i < this->transport->area->n_inputs; i++) {
PinosLink *link;
PinosPort *inport, *outport;
SpaPortInput *pi;
SpaPortOutput *po;
for (i = 0; i < this->transport->area->n_inputs; i++) {
PinosLink *link;
PinosPort *inport, *outport;
SpaPortInput *pi;
SpaPortOutput *po;
pi = &this->transport->inputs[i];
if (pi->buffer_id != SPA_ID_INVALID)
continue;
pi = &this->transport->inputs[i];
if (pi->buffer_id != SPA_ID_INVALID)
inport = this->input_port_map[i];
spa_list_for_each (link, &inport->rt.links, rt.input_link) {
if (link->rt.input == NULL || link->rt.output == NULL)
continue;
inport = this->input_port_map[i];
spa_list_for_each (link, &inport->rt.links, rt.input_link) {
if (link->rt.input == NULL || link->rt.output == NULL)
continue;
outport = link->rt.output;
po = &outport->node->transport->outputs[outport->port_id];
if (po->buffer_id != SPA_ID_INVALID) {
processed = true;
pi->buffer_id = po->buffer_id;
if ((res = spa_node_port_reuse_buffer (outport->node->node,
outport->port_id,
po->buffer_id)) < 0)
pinos_log_warn ("node %p: error reuse buffer: %d", outport->node, res);
po->buffer_id = SPA_ID_INVALID;
}
if ((res = spa_node_process_output (outport->node->node)) < 0)
pinos_log_warn ("node %p: got process output %d", outport->node, res);
}
}
if (processed) {
if ((res = spa_node_process_input (this->node)) < 0)
pinos_log_warn ("node %p: got process input %d", this, res);
}
break;
}
case SPA_NODE_EVENT_HAVE_OUTPUT:
{
SpaResult res;
int i;
bool processed = false;
for (i = 0; i < this->transport->area->n_outputs; i++) {
PinosLink *link;
PinosPort *inport, *outport;
SpaPortInput *pi;
SpaPortOutput *po;
po = &this->transport->outputs[i];
if (po->buffer_id == SPA_ID_INVALID)
continue;
outport = this->output_port_map[i];
spa_list_for_each (link, &outport->rt.links, rt.output_link) {
if (link->rt.input == NULL || link->rt.output == NULL)
continue;
inport = link->rt.input;
pi = &inport->node->transport->inputs[inport->port_id];
outport = link->rt.output;
po = &outport->node->transport->outputs[outport->port_id];
if (po->buffer_id != SPA_ID_INVALID) {
processed = true;
pi->buffer_id = po->buffer_id;
if ((res = spa_node_port_reuse_buffer (outport->node->node,
outport->port_id,
po->buffer_id)) < 0)
pinos_log_warn ("node %p: error reuse buffer: %d", outport->node, res);
if ((res = spa_node_process_input (inport->node->node)) < 0)
pinos_log_warn ("node %p: got process input %d", inport->node, res);
po->buffer_id = SPA_ID_INVALID;
}
if ((res = spa_node_port_reuse_buffer (this->node,
outport->port_id,
po->buffer_id)) < 0)
pinos_log_warn ("node %p: error reuse buffer: %d", this, res);
po->buffer_id = SPA_ID_INVALID;
if ((res = spa_node_process_output (outport->node->node)) < 0)
pinos_log_warn ("node %p: got process output %d", outport->node, res);
}
if (processed) {
if ((res = spa_node_process_output (this->node)) < 0)
pinos_log_warn ("node %p: got process output %d", this, res);
}
break;
}
case SPA_NODE_EVENT_REUSE_BUFFER:
break;
if (processed) {
if ((res = spa_node_process_input (this->node)) < 0)
pinos_log_warn ("node %p: got process input %d", this, res);
}
}
else if (SPA_EVENT_TYPE (event) == this->core->uri.node_events.HaveOutput) {
SpaResult res;
int i;
bool processed = false;
case SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE:
send_clock_update (this);
break;
for (i = 0; i < this->transport->area->n_outputs; i++) {
PinosLink *link;
PinosPort *inport, *outport;
SpaPortInput *pi;
SpaPortOutput *po;
po = &this->transport->outputs[i];
if (po->buffer_id == SPA_ID_INVALID)
continue;
outport = this->output_port_map[i];
spa_list_for_each (link, &outport->rt.links, rt.output_link) {
if (link->rt.input == NULL || link->rt.output == NULL)
continue;
inport = link->rt.input;
pi = &inport->node->transport->inputs[inport->port_id];
processed = true;
pi->buffer_id = po->buffer_id;
if ((res = spa_node_process_input (inport->node->node)) < 0)
pinos_log_warn ("node %p: got process input %d", inport->node, res);
}
if ((res = spa_node_port_reuse_buffer (this->node,
outport->port_id,
po->buffer_id)) < 0)
pinos_log_warn ("node %p: error reuse buffer: %d", this, res);
po->buffer_id = SPA_ID_INVALID;
}
if (processed) {
if ((res = spa_node_process_output (this->node)) < 0)
pinos_log_warn ("node %p: got process output %d", this, res);
}
}
else if (SPA_EVENT_TYPE (event) == this->core->uri.node_events.RequestClockUpdate) {
send_clock_update (this);
}
}

View file

@ -546,8 +546,8 @@ client_node_marshal_done (void *object,
}
static void
client_node_marshal_event (void *object,
const SpaNodeEvent *event)
client_node_marshal_event (void *object,
const SpaEvent *event)
{
PinosResource *resource = object;
PinosConnection *connection = resource->client->protocol_private;
@ -930,7 +930,7 @@ client_node_demarshal_event (void *object,
{
PinosResource *resource = object;
SpaPODIter it;
SpaNodeEvent *event;
SpaEvent *event;
if (!spa_pod_iter_struct (&it, data, size) ||
!spa_pod_iter_get (&it, SPA_POD_TYPE_OBJECT, &event, 0))