Implement subscription with a signal

Use a signal for subscription events
Work on handling OOM errors and other errors.
This commit is contained in:
Wim Taymans 2016-12-22 16:50:01 +01:00
parent 1b66bbcffd
commit 85d375e4bb
32 changed files with 531 additions and 176 deletions

View file

@ -1234,6 +1234,9 @@ pinos_client_node_new (PinosClient *client,
PinosClientNode *this;
impl = calloc (1, sizeof (PinosClientNodeImpl));
if (impl == NULL)
return NULL;
this = &impl->this;
impl->core = client->core;
impl->data_fd = -1;
@ -1248,6 +1251,8 @@ pinos_client_node_new (PinosClient *client,
&impl->proxy.node,
NULL,
properties);
if (this->node == NULL)
goto error_no_node;
impl->proxy.pnode = this->node;
@ -1264,12 +1269,21 @@ pinos_client_node_new (PinosClient *client,
client->core->uri.client_node,
this,
(PinosDestroy) client_node_resource_destroy);
impl->proxy.resource = this->resource;
if (this->resource == NULL)
goto error_no_resource;
impl->proxy.resource = this->resource;
this->resource->dispatch_func = client_node_dispatch_func;
this->resource->dispatch_data = this;
return this;
error_no_resource:
pinos_node_destroy (this->node);
error_no_node:
proxy_clear (&impl->proxy);
free (impl);
return NULL;
}
static void

View file

@ -69,6 +69,8 @@ client_bind_func (PinosGlobal *global,
global->core->uri.client,
global->object,
client_unbind_func);
if (resource == NULL)
goto no_mem;
resource->dispatch_func = client_dispatch_func;
resource->dispatch_data = global;
@ -86,6 +88,12 @@ client_bind_func (PinosGlobal *global,
PINOS_MESSAGE_CLIENT_INFO,
&m,
true);
return;
no_mem:
pinos_resource_send_error (client->core_resource,
SPA_RESULT_NO_MEMORY,
"no memory");
}
/**
@ -105,6 +113,9 @@ pinos_client_new (PinosCore *core,
PinosClientImpl *impl;
impl = calloc (1, sizeof (PinosClientImpl));
if (impl == NULL)
return NULL;
pinos_log_debug ("client %p: new", impl);
this = &impl->this;

View file

@ -68,6 +68,8 @@ parse_command_module_load (const char * line, char ** err)
PinosCommandImpl *impl;
impl = calloc (1, sizeof (PinosCommandImpl));
if (impl == NULL)
goto no_mem;
impl->func = execute_command_module_load;
impl->args = pinos_split_strv (line, whitespace, 3, &impl->n_args);
@ -83,6 +85,9 @@ no_module:
asprintf (err, "%s requires a module name", impl->args[0]);
pinos_free_strv (impl->args);
return NULL;
no_mem:
asprintf (err, "no memory");
return NULL;
}
static bool
@ -125,7 +130,7 @@ pinos_command_free (PinosCommand * command)
* Returns: The command or %NULL when @err is set.
*/
PinosCommand *
pinos_command_parse (const char *line,
pinos_command_parse (const char *line,
char **err)
{
PinosCommand *command = NULL;

View file

@ -51,15 +51,19 @@ registry_dispatch_func (void *object,
break;
if (&global->link == &this->global_list) {
pinos_log_error ("unknown object id %d", m->id);
pinos_resource_send_error (resource,
SPA_RESULT_INVALID_OBJECT_ID,
"unknown object id %u", m->id);
return SPA_RESULT_ERROR;
}
if (global->bind == NULL) {
pinos_log_error ("can't bind object id %d", m->id);
pinos_resource_send_error (resource,
SPA_RESULT_NOT_IMPLEMENTED,
"can't bind object id %d", m->id);
return SPA_RESULT_ERROR;
}
pinos_log_error ("global %p: bind object id %d", global, m->id);
pinos_log_debug ("global %p: bind object id %d", global, m->id);
global->bind (global, client, 0, m->id);
break;
}
@ -109,6 +113,8 @@ core_dispatch_func (void *object,
this->uri.registry,
this,
destroy_registry_resource);
if (registry_resource == NULL)
goto no_mem;
registry_resource->dispatch_func = registry_dispatch_func;
registry_resource->dispatch_data = this;
@ -142,6 +148,9 @@ core_dispatch_func (void *object,
PinosProperties *props;
props = pinos_properties_new (NULL, NULL);
if (props == NULL)
goto no_mem;
for (i = 0; i < m->props->n_items; i++) {
pinos_properties_set (props, m->props->items[i].key,
m->props->items[i].value);
@ -151,9 +160,13 @@ core_dispatch_func (void *object,
m->new_id,
m->name,
props);
if (node == NULL)
goto no_mem;
if ((res = pinos_client_node_get_data_socket (node, &data_fd)) < 0) {
pinos_log_error ("can't get data fd");
pinos_resource_send_error (resource,
SPA_RESULT_ERROR,
"can't get data fd");
break;
}
@ -170,6 +183,12 @@ core_dispatch_func (void *object,
break;
}
return SPA_RESULT_OK;
no_mem:
pinos_resource_send_error (resource,
SPA_RESULT_NO_MEMORY,
"no memory");
return SPA_RESULT_NO_MEMORY;
}
static void
@ -188,6 +207,8 @@ core_bind_func (PinosGlobal *global,
global->core->uri.core,
global->object,
NULL);
if (resource == NULL)
goto no_mem;
resource->dispatch_func = core_dispatch_func;
resource->dispatch_data = this;
@ -211,6 +232,10 @@ core_bind_func (PinosGlobal *global,
PINOS_MESSAGE_CORE_INFO,
&m,
true);
return;
no_mem:
pinos_log_error ("can't create core resource");
}
PinosCore *
@ -220,14 +245,19 @@ pinos_core_new (PinosMainLoop *main_loop)
PinosCore *this;
impl = calloc (1, sizeof (PinosCoreImpl));
if (impl == NULL)
return NULL;
this = &impl->this;
pinos_uri_init (&this->uri);
pinos_map_init (&this->objects, 512);
this->data_loop = pinos_data_loop_new ();
if (this->data_loop == NULL)
goto no_data_loop;
this->main_loop = main_loop;
pinos_uri_init (&this->uri);
pinos_map_init (&this->objects, 512);
impl->support[0].uri = SPA_ID_MAP_URI;
impl->support[0].data = this->uri.map;
impl->support[1].uri = SPA_LOG_URI;
@ -266,6 +296,10 @@ pinos_core_new (PinosMainLoop *main_loop)
core_bind_func);
return this;
no_data_loop:
free (impl);
return NULL;
}
void
@ -292,6 +326,9 @@ pinos_core_add_global (PinosCore *core,
PinosMessageNotifyGlobal ng;
global = calloc (1, sizeof (PinosGlobal));
if (global == NULL)
return NULL;
global->core = core;
global->type = type;
global->version = version;

View file

@ -131,16 +131,26 @@ pinos_data_loop_new (void)
PinosDataLoop *this;
impl = calloc (1, sizeof (PinosDataLoopImpl));
if (impl == NULL)
return NULL;
pinos_log_debug ("data-loop %p: new", impl);
this = &impl->this;
this->loop = pinos_loop_new ();
if (this->loop == NULL)
goto no_loop;
pinos_signal_init (&this->destroy_signal);
impl->event = pinos_loop_add_event (this->loop,
do_stop,
impl);
return this;
no_loop:
free (impl);
return NULL;
}
void

View file

@ -49,29 +49,24 @@ typedef struct
} PinosLinkImpl;
static void
pinos_link_update_state (PinosLink *link, PinosLinkState state)
pinos_link_update_state (PinosLink *link,
PinosLinkState state,
char *error)
{
if (state != link->state) {
free (link->error);
link->error = NULL;
pinos_log_debug ("link %p: update state %s -> %s", link,
pinos_link_state_as_string (link->state),
pinos_link_state_as_string (state));
link->state = state;
if (link->error)
free (link->error);
link->error = error;
pinos_signal_emit (&link->core->link_state_changed, link);
}
}
static void
pinos_link_report_error (PinosLink *link, char *error)
{
free (link->error);
link->error = error;
link->state = PINOS_LINK_STATE_ERROR;
pinos_log_debug ("link %p: got error state %s", link, error);
pinos_signal_emit (&link->core->link_state_changed, link);
}
static SpaResult
do_negotiate (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
{
@ -83,7 +78,7 @@ do_negotiate (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
if (in_state != SPA_NODE_STATE_CONFIGURE && out_state != SPA_NODE_STATE_CONFIGURE)
return SPA_RESULT_OK;
pinos_link_update_state (this, PINOS_LINK_STATE_NEGOTIATING);
pinos_link_update_state (this, PINOS_LINK_STATE_NEGOTIATING, NULL);
/* both ports need a format */
if (in_state == SPA_NODE_STATE_CONFIGURE && out_state == SPA_NODE_STATE_CONFIGURE) {
@ -170,7 +165,7 @@ again:
error:
{
pinos_link_report_error (this, error);
pinos_link_update_state (this, PINOS_LINK_STATE_ERROR, error);
return res;
}
}
@ -352,7 +347,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
if (in_state != SPA_NODE_STATE_READY && out_state != SPA_NODE_STATE_READY)
return SPA_RESULT_OK;
pinos_link_update_state (this, PINOS_LINK_STATE_ALLOCATING);
pinos_link_update_state (this, PINOS_LINK_STATE_ALLOCATING, NULL);
pinos_log_debug ("link %p: doing alloc buffers %p %p", this, this->output->node, this->input->node);
/* find out what's possible */
@ -552,7 +547,7 @@ error:
this->input->buffers = NULL;
this->input->n_buffers = 0;
this->input->allocated = false;
pinos_link_report_error (this, error);
pinos_link_update_state (this, PINOS_LINK_STATE_ERROR, error);
return res;
}
}
@ -565,9 +560,9 @@ do_start (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
if (in_state < SPA_NODE_STATE_PAUSED || out_state < SPA_NODE_STATE_PAUSED)
return SPA_RESULT_OK;
else if (in_state == SPA_NODE_STATE_STREAMING && out_state == SPA_NODE_STATE_STREAMING) {
pinos_link_update_state (this, PINOS_LINK_STATE_RUNNING);
pinos_link_update_state (this, PINOS_LINK_STATE_RUNNING, NULL);
} else {
pinos_link_update_state (this, PINOS_LINK_STATE_PAUSED);
pinos_link_update_state (this, PINOS_LINK_STATE_PAUSED, NULL);
if (in_state == SPA_NODE_STATE_PAUSED) {
res = pinos_node_set_state (this->input->node, PINOS_NODE_STATE_RUNNING);
@ -587,9 +582,16 @@ check_states (PinosLink *this,
SpaNodeState in_state, out_state;
again:
if (this->state == PINOS_LINK_STATE_ERROR)
return SPA_RESULT_ERROR;
if (this->input == NULL || this->output == NULL)
return SPA_RESULT_OK;
if (this->input->node->state == PINOS_NODE_STATE_ERROR ||
this->output->node->state == PINOS_NODE_STATE_ERROR)
return SPA_RESULT_ERROR;
in_state = this->input->node->node->state;
out_state = this->output->node->node->state;
@ -652,7 +654,7 @@ on_port_unlinked (PinosPort *port, PinosLink *this, SpaResult res, uint32_t id)
pinos_signal_emit (&this->core->port_unlinked, this, port);
if (this->input == NULL || this->output == NULL) {
pinos_link_update_state (this, PINOS_LINK_STATE_UNLINKED);
pinos_link_update_state (this, PINOS_LINK_STATE_UNLINKED, NULL);
pinos_link_destroy (this);
}
@ -719,7 +721,11 @@ bool
pinos_link_activate (PinosLink *this)
{
spa_ringbuffer_init (&this->ringbuffer, SPA_N_ELEMENTS (this->queue));
check_states (this, NULL, SPA_RESULT_OK);
pinos_main_loop_defer (this->core->main_loop,
this,
SPA_RESULT_WAIT_SYNC,
(PinosDeferFunc) check_states,
this);
return true;
}
@ -766,6 +772,8 @@ link_bind_func (PinosGlobal *global,
global->core->uri.link,
global->object,
link_unbind_func);
if (resource == NULL)
goto no_mem;
resource->dispatch_func = link_dispatch_func;
resource->dispatch_data = global;
@ -785,6 +793,12 @@ link_bind_func (PinosGlobal *global,
PINOS_MESSAGE_LINK_INFO,
&m,
true);
return;
no_mem:
pinos_resource_send_error (client->core_resource,
SPA_RESULT_NO_MEMORY,
"no memory");
}
PinosLink *
@ -798,6 +812,9 @@ pinos_link_new (PinosCore *core,
PinosLink *this;
impl = calloc (1, sizeof (PinosLinkImpl));
if (impl == NULL)
return NULL;
this = &impl->this;
pinos_log_debug ("link %p: new", this);

View file

@ -97,6 +97,8 @@ main_loop_defer (PinosMainLoop *loop,
spa_list_remove (&item->link);
} else {
item = malloc (sizeof (WorkItem));
if (item == NULL)
return SPA_ID_INVALID;
}
item->id = ++impl->counter;
item->obj = obj;
@ -189,10 +191,15 @@ pinos_main_loop_new (void)
PinosMainLoop *this;
impl = calloc (1, sizeof (PinosMainLoopImpl));
if (impl == NULL)
return NULL;
pinos_log_debug ("main-loop %p: new", impl);
this = &impl->this;
this->loop = pinos_loop_new ();
if (this->loop == NULL)
goto no_loop;
pinos_signal_init (&this->destroy_signal);
@ -208,6 +215,10 @@ pinos_main_loop_new (void)
spa_list_init (&impl->free_list);
return this;
no_loop:
free (impl);
return NULL;
}
void

View file

@ -123,6 +123,8 @@ module_bind_func (PinosGlobal *global,
global->core->uri.module,
global->object,
NULL);
if (resource == NULL)
goto no_mem;
resource->dispatch_func = module_dispatch_func;
resource->dispatch_data = global;
@ -141,6 +143,12 @@ module_bind_func (PinosGlobal *global,
PINOS_MESSAGE_MODULE_INFO,
&m,
true);
return;
no_mem:
pinos_resource_send_error (resource,
SPA_RESULT_NO_MEMORY,
"no memory");
}
/**
@ -201,6 +209,9 @@ pinos_module_load (PinosCore *core,
goto no_pinos_module;
impl = calloc (1, sizeof (PinosModuleImpl));
if (impl == NULL)
goto no_mem;
impl->hnd = hnd;
this = &impl->this;
@ -229,10 +240,11 @@ not_found:
}
open_failed:
{
asprintf (err, "Failed to open module: %s", dlerror ());
asprintf (err, "Failed to open module: \"%s\" %s", filename, dlerror ());
free (filename);
return NULL;
}
no_mem:
no_pinos_module:
{
asprintf (err, "\"%s\" is not a pinos module", name);

View file

@ -422,6 +422,8 @@ node_bind_func (PinosGlobal *global,
global->core->uri.registry,
global->object,
node_unbind_func);
if (resource == NULL)
goto no_mem;
resource->dispatch_func = node_dispatch_func;
resource->dispatch_data = global;
@ -435,12 +437,19 @@ node_bind_func (PinosGlobal *global,
info.change_mask = ~0;
info.name = this->name;
info.state = this->state;
info.error = this->error;
info.props = this->properties ? &this->properties->dict : NULL;
pinos_resource_send_message (resource,
PINOS_MESSAGE_NODE_INFO,
&m,
true);
return;
no_mem:
pinos_resource_send_error (resource,
SPA_RESULT_NO_MEMORY,
"no memory");
}
static void
@ -482,6 +491,9 @@ pinos_node_new (PinosCore *core,
PinosNode *this;
impl = calloc (1, sizeof (PinosNodeImpl));
if (impl == NULL)
return NULL;
this = &impl->this;
this->core = core;
pinos_log_debug ("node %p: new", this);
@ -514,6 +526,9 @@ pinos_node_new (PinosCore *core,
if (this->properties == NULL)
this->properties = pinos_properties_new (NULL, NULL);
if (this->properties)
goto no_mem;
for (i = 0; i < this->node->info->n_items; i++)
pinos_properties_set (this->properties,
this->node->info->items[i].key,
@ -532,6 +547,11 @@ pinos_node_new (PinosCore *core,
}
return this;
no_mem:
free (this->name);
free (impl);
return NULL;
}
static SpaResult
@ -814,6 +834,7 @@ pinos_node_update_state (PinosNode *node,
m.info = &info;
info.change_mask = 1 << 1;
info.state = node->state;
info.error = node->error;
spa_list_for_each (resource, &node->resource_list, link) {
info.id = node->global->id;

View file

@ -41,6 +41,9 @@ pinos_port_new (PinosNode *node,
PinosPort *this;
impl = calloc (1, sizeof (PinosPortImpl));
if (impl == NULL)
return NULL;
this = &impl->this;
this->node = node;
this->direction = direction;
@ -177,6 +180,8 @@ pinos_port_link (PinosPort *output_port,
input_port,
format_filter,
properties);
if (link == NULL)
goto no_mem;
spa_list_insert (output_port->links.prev, &link->output_link);
spa_list_insert (input_port->links.prev, &link->input_link);
@ -209,6 +214,8 @@ was_linked:
asprintf (error, "input port was already linked");
return NULL;
}
no_mem:
return NULL;
}
static SpaResult

View file

@ -31,6 +31,9 @@ pinos_resource_new (PinosClient *client,
PinosResource *this;
this = calloc (1, sizeof (PinosResource));
if (this == NULL)
return NULL;
this->core = client->core;
this->client = client;
this->id = id;
@ -95,15 +98,42 @@ pinos_resource_send_message (PinosResource *resource,
void *message,
bool flush)
{
if (resource->send_func)
return resource->send_func (resource,
resource->id,
opcode,
message,
flush,
resource->send_data);
if (!resource->send_func) {
pinos_log_error ("resource %p: send func not implemented", resource);
return SPA_RESULT_NOT_IMPLEMENTED;
}
pinos_log_error ("resource %p: send func not implemented", resource);
return SPA_RESULT_NOT_IMPLEMENTED;
return resource->send_func (resource,
resource->id,
opcode,
message,
flush,
resource->send_data);
}
SpaResult
pinos_resource_send_error (PinosResource *resource,
SpaResult res,
const char *message,
...)
{
PinosClient *client = resource->client;
PinosMessageError m;
char buffer[128];
va_list ap;
va_start (ap, message);
vsnprintf (buffer, sizeof (buffer), message, ap);
va_end (ap);
m.id = resource->id;
m.res = res;
m.error = buffer;
pinos_log_error ("resource %p: %u send error %d %s", resource, resource->id, res, buffer);
return pinos_resource_send_message (client->core_resource,
PINOS_MESSAGE_ERROR,
&m,
true);
}

View file

@ -76,6 +76,9 @@ SpaResult pinos_resource_send_message (PinosResource *resource,
uint32_t opcode,
void *message,
bool flush);
SpaResult pinos_resource_send_error (PinosResource *resource,
SpaResult res,
const char *message, ...);
#ifdef __cplusplus
}