node: improve callbacks

Make separate callbacks for events and RT notifications.
This commit is contained in:
Wim Taymans 2017-05-11 10:29:20 +02:00
parent fb0919b8b7
commit 3b33e3d362
32 changed files with 557 additions and 481 deletions

View file

@ -1,4 +1,4 @@
/* Simple Plugin API
/* Pinos
* Copyright (C) 2016 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or

View file

@ -1,4 +1,4 @@
/* Simple Plugin API
/* Pinos
* Copyright (C) 2016 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or

View file

@ -1,4 +1,4 @@
/* Simple Plugin API
/* Pinos
* Copyright (C) 2016 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
@ -233,7 +233,7 @@ typedef struct {
uint32_t seq,
SpaDirection direction,
uint32_t port_id,
SpaPortFormatFlags flags,
uint32_t flags,
const SpaFormat *format);
void (*set_property) (void *object,
uint32_t seq,

View file

@ -1,4 +1,4 @@
/* Simple Plugin API
/* Pinos
* Copyright (C) 2016 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or

View file

@ -1,4 +1,4 @@
/* Simple Plugin API
/* Pinos
* Copyright (C) 2016 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or

View file

@ -378,7 +378,7 @@ static inline void
send_have_output (PinosStream *stream)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
SpaEvent ho = SPA_EVENT_INIT (stream->context->type.event_node.HaveOutput);
SpaEvent ho = SPA_EVENT_INIT (stream->context->type.event_transport.HaveOutput);
uint64_t cmd = 1;
pinos_transport_add_event (impl->trans, &ho);
@ -483,7 +483,7 @@ handle_rtnode_event (PinosStream *stream,
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosContext *context = impl->this.context;
if (SPA_EVENT_TYPE (event) == context->type.event_node.HaveOutput) {
if (SPA_EVENT_TYPE (event) == context->type.event_transport.HaveOutput) {
int i;
for (i = 0; i < impl->trans->area->n_inputs; i++) {
@ -498,7 +498,7 @@ handle_rtnode_event (PinosStream *stream,
}
send_need_input (stream);
}
else if (SPA_EVENT_TYPE (event) == context->type.event_node.NeedInput) {
else if (SPA_EVENT_TYPE (event) == context->type.event_transport.NeedInput) {
int i;
for (i = 0; i < impl->trans->area->n_outputs; i++) {
@ -516,8 +516,8 @@ handle_rtnode_event (PinosStream *stream,
pinos_signal_emit (&stream->need_buffer, stream);
impl->in_need_buffer = false;
}
else if (SPA_EVENT_TYPE (event) == context->type.event_node.ReuseBuffer) {
SpaEventNodeReuseBuffer *p = (SpaEventNodeReuseBuffer *) event;
else if (SPA_EVENT_TYPE (event) == context->type.event_transport.ReuseBuffer) {
PinosEventTransportReuseBuffer *p = (PinosEventTransportReuseBuffer *) event;
if (p->body.port_id.value != impl->port_id)
return;
@ -699,12 +699,12 @@ client_node_remove_port (void *object,
}
static void
client_node_set_format (void *object,
uint32_t seq,
SpaDirection direction,
uint32_t port_id,
SpaPortFormatFlags flags,
const SpaFormat *format)
client_node_set_format (void *object,
uint32_t seq,
SpaDirection direction,
uint32_t port_id,
uint32_t flags,
const SpaFormat *format)
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
@ -1138,8 +1138,8 @@ pinos_stream_recycle_buffer (PinosStream *stream,
uint32_t id)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
SpaEventNodeReuseBuffer rb = SPA_EVENT_NODE_REUSE_BUFFER_INIT (stream->context->type.event_node.ReuseBuffer,
impl->port_id, id);
PinosEventTransportReuseBuffer rb = PINOS_EVENT_TRANSPORT_REUSE_BUFFER_INIT
(stream->context->type.event_transport.ReuseBuffer, impl->port_id, id);
BufferId *bid;
uint64_t cmd = 1;

View file

@ -1,4 +1,4 @@
/* Simple Plugin API
/* Pinos
* Copyright (C) 2016 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
@ -83,6 +83,46 @@ SpaResult pinos_transport_next_event (PinosTransport *trans,
SpaResult pinos_transport_parse_event (PinosTransport *trans,
void *event);
#define PINOS_TYPE_EVENT__Transport SPA_TYPE_EVENT_BASE "Transport"
#define PINOS_TYPE_EVENT_TRANSPORT_BASE PINOS_TYPE_EVENT__Transport ":"
#define PINOS_TYPE_EVENT_TRANSPORT__HaveOutput PINOS_TYPE_EVENT_TRANSPORT_BASE "HaveOutput"
#define PINOS_TYPE_EVENT_TRANSPORT__NeedInput PINOS_TYPE_EVENT_TRANSPORT_BASE "NeedInput"
#define PINOS_TYPE_EVENT_TRANSPORT__ReuseBuffer PINOS_TYPE_EVENT_TRANSPORT_BASE "ReuseBuffer"
typedef struct {
uint32_t HaveOutput;
uint32_t NeedInput;
uint32_t ReuseBuffer;
} PinosTypeEventTransport;
static inline void
pinos_type_event_transport_map (SpaTypeMap *map, PinosTypeEventTransport *type)
{
if (type->HaveOutput == 0) {
type->HaveOutput = spa_type_map_get_id (map, PINOS_TYPE_EVENT_TRANSPORT__HaveOutput);
type->NeedInput = spa_type_map_get_id (map, PINOS_TYPE_EVENT_TRANSPORT__NeedInput);
type->ReuseBuffer = spa_type_map_get_id (map, PINOS_TYPE_EVENT_TRANSPORT__ReuseBuffer);
}
}
typedef struct {
SpaPODObjectBody body;
SpaPODInt port_id;
SpaPODInt buffer_id;
} PinosEventTransportReuseBufferBody;
typedef struct {
SpaPOD pod;
PinosEventTransportReuseBufferBody body;
} PinosEventTransportReuseBuffer;
#define PINOS_EVENT_TRANSPORT_REUSE_BUFFER_INIT(type,port_id,buffer_id) \
SPA_EVENT_INIT_COMPLEX (sizeof (PinosEventTransportReuseBufferBody), type, \
SPA_POD_INT_INIT (port_id), \
SPA_POD_INT_INIT (buffer_id))
#ifdef __cplusplus
} /* extern "C" */
#endif

View file

@ -59,6 +59,8 @@ pinos_type_init (PinosType *type)
spa_type_alloc_param_buffers_map (type->map, &type->alloc_param_buffers);
spa_type_alloc_param_meta_enable_map (type->map, &type->alloc_param_meta_enable);
spa_type_alloc_param_video_padding_map (type->map, &type->alloc_param_video_padding);
pinos_type_event_transport_map (type->map, &type->event_transport);
}
bool

View file

@ -24,13 +24,15 @@
extern "C" {
#endif
#include <pinos/client/map.h>
#include <spa/include/spa/type-map.h>
#include <spa/include/spa/event-node.h>
#include <spa/include/spa/command-node.h>
#include <spa/include/spa/monitor.h>
#include <spa/include/spa/alloc-param.h>
#include <pinos/client/map.h>
#include <pinos/client/transport.h>
typedef struct _PinosType PinosType;
/**
@ -64,6 +66,7 @@ struct _PinosType {
SpaTypeAllocParamBuffers alloc_param_buffers;
SpaTypeAllocParamMetaEnable alloc_param_meta_enable;
SpaTypeAllocParamVideoPadding alloc_param_video_padding;
PinosTypeEventTransport event_transport;
};
void pinos_type_init (PinosType *type);

View file

@ -1,4 +1,4 @@
/* Simple Plugin API
/* Pinos
* Copyright (C) 2016 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or

View file

@ -96,7 +96,7 @@ struct _SpaProxy
SpaLoop *main_loop;
SpaLoop *data_loop;
SpaEventNodeCallback event_cb;
SpaNodeCallbacks callbacks;
void *user_data;
PinosResource *resource;
@ -172,7 +172,7 @@ static inline void
send_need_input (SpaProxy *this)
{
PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, proxy);
SpaEvent event = SPA_EVENT_INIT (impl->core->type.event_node.NeedInput);
SpaEvent event = SPA_EVENT_INIT (impl->core->type.event_transport.NeedInput);
pinos_transport_add_event (impl->transport, &event);
do_flush (this);
@ -182,7 +182,7 @@ static inline void
send_have_output (SpaProxy *this)
{
PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, proxy);
SpaEvent event = SPA_EVENT_INIT (impl->core->type.event_node.HaveOutput);
SpaEvent event = SPA_EVENT_INIT (impl->core->type.event_transport.HaveOutput);
pinos_transport_add_event (impl->transport, &event);
do_flush (this);
@ -225,9 +225,10 @@ spa_proxy_node_send_command (SpaNode *node,
}
static SpaResult
spa_proxy_node_set_event_callback (SpaNode *node,
SpaEventNodeCallback event,
void *user_data)
spa_proxy_node_set_callbacks (SpaNode *node,
const SpaNodeCallbacks *callbacks,
size_t callbacks_size,
void *user_data)
{
SpaProxy *this;
@ -235,7 +236,7 @@ spa_proxy_node_set_event_callback (SpaNode *node,
return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaProxy, node);
this->event_cb = event;
this->callbacks = *callbacks;
this->user_data = user_data;
return SPA_RESULT_OK;
@ -484,11 +485,11 @@ next:
}
static SpaResult
spa_proxy_node_port_set_format (SpaNode *node,
SpaDirection direction,
uint32_t port_id,
SpaPortFormatFlags flags,
const SpaFormat *format)
spa_proxy_node_port_set_format (SpaNode *node,
SpaDirection direction,
uint32_t port_id,
uint32_t flags,
const SpaFormat *format)
{
SpaProxy *this;
@ -768,8 +769,8 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node,
spa_log_trace (this->log, "reuse buffer %d", buffer_id);
{
SpaEventNodeReuseBuffer rb = SPA_EVENT_NODE_REUSE_BUFFER_INIT (impl->core->type.event_node.ReuseBuffer,
port_id, buffer_id);
PinosEventTransportReuseBuffer rb = PINOS_EVENT_TRANSPORT_REUSE_BUFFER_INIT
(impl->core->type.event_transport.ReuseBuffer, port_id, buffer_id);
pinos_transport_add_event (impl->transport, (SpaEvent *)&rb);
}
@ -840,8 +841,8 @@ spa_proxy_node_process_output (SpaNode *node)
continue;
if (io->buffer_id != SPA_ID_INVALID) {
SpaEventNodeReuseBuffer rb =
SPA_EVENT_NODE_REUSE_BUFFER_INIT (impl->core->type.event_node.ReuseBuffer, i, io->buffer_id);
PinosEventTransportReuseBuffer rb =
PINOS_EVENT_TRANSPORT_REUSE_BUFFER_INIT (impl->core->type.event_transport.ReuseBuffer, i, io->buffer_id);
spa_log_trace (this->log, "reuse buffer %d", io->buffer_id);
@ -875,7 +876,7 @@ handle_node_event (SpaProxy *this,
PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, proxy);
int i;
if (SPA_EVENT_TYPE (event) == impl->core->type.event_node.HaveOutput) {
if (SPA_EVENT_TYPE (event) == impl->core->type.event_transport.HaveOutput) {
for (i = 0; i < MAX_OUTPUTS; i++) {
SpaPortIO *io = this->out_ports[i].io;
@ -885,8 +886,15 @@ handle_node_event (SpaProxy *this,
*io = impl->transport->outputs[i];
pinos_log_trace ("%d %d", io->status, io->buffer_id);
}
this->callbacks.have_output (&this->node, this->user_data);
}
else if (SPA_EVENT_TYPE (event) == impl->core->type.event_transport.NeedInput) {
this->callbacks.need_input (&this->node, this->user_data);
}
else if (SPA_EVENT_TYPE (event) == impl->core->type.event_transport.ReuseBuffer) {
PinosEventTransportReuseBuffer *p = (PinosEventTransportReuseBuffer *) event;
this->callbacks.reuse_buffer (&this->node, p->body.port_id.value, p->body.buffer_id.value, this->user_data);
}
this->event_cb (&this->node, event, this->user_data);
return SPA_RESULT_OK;
}
@ -958,7 +966,7 @@ client_node_event (void *object,
PinosClientNodeImpl *impl = SPA_CONTAINER_OF (node, PinosClientNodeImpl, this);
SpaProxy *this = &impl->proxy;
handle_node_event (this, event);
this->callbacks.event (&this->node, event, this->user_data);
}
static void
@ -996,7 +1004,7 @@ proxy_on_data_fd_events (SpaSource *source)
while (pinos_transport_next_event (impl->transport, &event) == SPA_RESULT_OK) {
SpaEvent *ev = alloca (SPA_POD_SIZE (&event));
pinos_transport_parse_event (impl->transport, ev);
this->event_cb (&this->node, ev, this->user_data);
handle_node_event (this, ev);
}
}
}
@ -1007,7 +1015,7 @@ static const SpaNode proxy_node = {
spa_proxy_node_get_props,
spa_proxy_node_set_props,
spa_proxy_node_send_command,
spa_proxy_node_set_event_callback,
spa_proxy_node_set_callbacks,
spa_proxy_node_get_n_ports,
spa_proxy_node_get_port_ids,
spa_proxy_node_add_port,

View file

@ -364,7 +364,7 @@ do_allocation (PinosLink *this, uint32_t in_state, uint32_t out_state)
PinosLinkImpl *impl = SPA_CONTAINER_OF (this, PinosLinkImpl, this);
SpaResult res;
const SpaPortInfo *iinfo, *oinfo;
SpaPortInfoFlags in_flags, out_flags;
uint32_t in_flags, out_flags;
char *error = NULL;
if (in_state != PINOS_PORT_STATE_READY && out_state != PINOS_PORT_STATE_READY)

View file

@ -33,9 +33,6 @@ typedef struct
{
PinosNode this;
#define STATE_IN 0
#define STATE_OUT 1
int state;
PinosWorkQueue *work;
bool async_init;
@ -295,15 +292,15 @@ do_pull (PinosNode *this)
res = do_pull (outport->node);
pinos_log_trace ("node %p: pull return %d", outport->node, res);
}
else if (res < 0 && res != SPA_RESULT_HAVE_BUFFER) {
pinos_log_warn ("node %p: got process output %d", outport->node, res);
}
if (res == SPA_RESULT_HAVE_BUFFER) {
else if (res == SPA_RESULT_HAVE_BUFFER) {
*pi = *po;
pinos_log_trace ("node %p: have output %d %d", this, pi->status, pi->buffer_id);
have_output = true;
}
else if (res < 0) {
pinos_log_warn ("node %p: got process output %d", outport->node, res);
}
}
}
if (have_output) {
@ -326,69 +323,80 @@ on_node_event (SpaNode *node, SpaEvent *event, void *user_data)
pinos_work_queue_complete (impl->work, this, ac->body.seq.value, ac->body.res.value);
pinos_signal_emit (&this->async_complete, this, ac->body.seq.value, ac->body.res.value);
}
else if (SPA_EVENT_TYPE (event) == this->core->type.event_node.NeedInput) {
do_pull (this);
}
else if (SPA_EVENT_TYPE (event) == this->core->type.event_node.HaveOutput) {
SpaResult res;
PinosPort *outport;
spa_list_for_each (outport, &this->output_ports, link) {
PinosLink *link;
SpaPortIO *po;
po = &outport->io;
if (po->buffer_id == SPA_ID_INVALID)
continue;
pinos_log_trace ("node %p: have output %d", this, po->buffer_id);
spa_list_for_each (link, &outport->rt.links, rt.output_link) {
PinosPort *inport;
if (link->rt.input == NULL || link->rt.output == NULL)
continue;
inport = link->rt.input;
inport->io = *po;
pinos_log_trace ("node %p: do process input %d", this, po->buffer_id);
if ((res = spa_node_process_input (inport->node->node)) < 0)
pinos_log_warn ("node %p: got process input %d", inport->node, res);
}
po->status = SPA_RESULT_NEED_BUFFER;
}
res = spa_node_process_output (this->node);
if (res < 0 && res != SPA_RESULT_HAVE_BUFFER)
pinos_log_warn ("node %p: got process output %d", this, res);
}
else if (SPA_EVENT_TYPE (event) == this->core->type.event_node.ReuseBuffer) {
SpaEventNodeReuseBuffer *rb = (SpaEventNodeReuseBuffer *) event;
PinosPort *inport;
pinos_log_trace ("node %p: reuse buffer %u", this, rb->body.buffer_id.value);
spa_list_for_each (inport, &this->input_ports, link) {
PinosLink *link;
PinosPort *outport;
spa_list_for_each (link, &inport->rt.links, rt.input_link) {
if (link->rt.input == NULL || link->rt.output == NULL)
continue;
outport = link->rt.output;
outport->io.buffer_id = rb->body.buffer_id.value;
}
}
}
else if (SPA_EVENT_TYPE (event) == this->core->type.event_node.RequestClockUpdate) {
send_clock_update (this);
}
}
static void
on_node_need_input (SpaNode *node, void *user_data)
{
PinosNode *this = user_data;
do_pull (this);
}
static void
on_node_have_output (SpaNode *node, void *user_data)
{
PinosNode *this = user_data;
SpaResult res;
PinosPort *outport;
spa_list_for_each (outport, &this->output_ports, link) {
PinosLink *link;
SpaPortIO *po;
po = &outport->io;
if (po->buffer_id == SPA_ID_INVALID)
continue;
pinos_log_trace ("node %p: have output %d", this, po->buffer_id);
spa_list_for_each (link, &outport->rt.links, rt.output_link) {
PinosPort *inport;
if (link->rt.input == NULL || link->rt.output == NULL)
continue;
inport = link->rt.input;
inport->io = *po;
pinos_log_trace ("node %p: do process input %d", this, po->buffer_id);
if ((res = spa_node_process_input (inport->node->node)) < 0)
pinos_log_warn ("node %p: got process input %d", inport->node, res);
}
po->status = SPA_RESULT_NEED_BUFFER;
}
res = spa_node_process_output (this->node);
if (res != SPA_RESULT_OK)
pinos_log_warn ("node %p: got process output %d", this, res);
}
static void
on_node_reuse_buffer (SpaNode *node, uint32_t port_id, uint32_t buffer_id, void *user_data)
{
PinosNode *this = user_data;
PinosPort *inport;
pinos_log_trace ("node %p: reuse buffer %u", this, buffer_id);
spa_list_for_each (inport, &this->input_ports, link) {
PinosLink *link;
PinosPort *outport;
spa_list_for_each (link, &inport->rt.links, rt.input_link) {
if (link->rt.input == NULL || link->rt.output == NULL)
continue;
outport = link->rt.output;
outport->io.buffer_id = buffer_id;
}
}
}
static void
node_unbind_func (void *data)
{
@ -514,6 +522,13 @@ pinos_node_set_data_loop (PinosNode *node,
pinos_signal_emit (&node->loop_changed, node);
}
static const SpaNodeCallbacks node_callbacks = {
&on_node_event,
&on_node_need_input,
&on_node_have_output,
&on_node_reuse_buffer,
};
PinosNode *
pinos_node_new (PinosCore *core,
PinosClient *owner,
@ -546,7 +561,7 @@ pinos_node_new (PinosCore *core,
spa_list_init (&this->resource_list);
if (spa_node_set_event_callback (this->node, on_node_event, this) < 0)
if (spa_node_set_callbacks (this->node, &node_callbacks, sizeof (node_callbacks), this) < 0)
pinos_log_warn ("node %p: error setting callback", this);
pinos_signal_init (&this->destroy_signal);

View file

@ -607,12 +607,12 @@ client_node_marshal_remove_port (void *object,
}
static void
client_node_marshal_set_format (void *object,
uint32_t seq,
SpaDirection direction,
uint32_t port_id,
SpaPortFormatFlags flags,
const SpaFormat *format)
client_node_marshal_set_format (void *object,
uint32_t seq,
SpaDirection direction,
uint32_t port_id,
uint32_t flags,
const SpaFormat *format)
{
PinosResource *resource = object;
PinosConnection *connection = resource->client->protocol_private;