WIP object model change

Work on cleanup
This commit is contained in:
Wim Taymans 2016-11-15 13:29:58 +01:00
parent c25ccbb4ba
commit 0d21d633c9
26 changed files with 912 additions and 821 deletions

View file

@ -46,7 +46,7 @@ void pinos_log_logv (SpaLogLevel level,
#if __STDC_VERSION__ >= 199901L
#define pinos_log_logc(lev,...) \
if (SPA_UNLIKELY (lev >= pinos_log_level)) \
if (SPA_UNLIKELY (lev <= pinos_log_level)) \
pinos_log_log(lev,__VA_ARGS__)
#define pinos_log_error(...) pinos_log_logc(SPA_LOG_LEVEL_ERROR,__FILE__,__LINE__,__func__,__VA_ARGS__)

View file

@ -20,7 +20,7 @@
#ifndef __PINOS_PROPERTIES_H__
#define __PINOS_PROPERTIES_H__
#include <glib.h>
#include <glib-object.h>
G_BEGIN_DECLS

View file

@ -30,12 +30,10 @@ const char * pinos_split_walk (const char *str,
const char *delimiter,
size_t *len,
const char **state);
char ** pinos_split_strv (const char *str,
const char *delimiter,
int max_tokens,
int *n_tokens);
void pinos_free_strv (char **str);
#ifdef __cplusplus

View file

@ -71,8 +71,7 @@ add_item (PinosSpaMonitor *this, SpaMonitorItem *item)
return;
}
if ((res = spa_handle_get_interface (handle, impl->core->registry.uri.spa_clock, &clock_iface)) < 0) {
pinos_log_error ("can't get CLOCK interface: %d", res);
return;
pinos_log_info ("no CLOCK interface: %d", res);
}
if (item->info) {

View file

@ -30,7 +30,6 @@
#include <sys/eventfd.h>
#include "pinos/client/pinos.h"
#include "pinos/client/enumtypes.h"
#include "pinos/client/private.h"
#include "pinos/client/connection.h"
#include "pinos/client/serialize.h"
@ -40,7 +39,6 @@
#include "pinos/server/client-node.h"
#include "spa/include/spa/node.h"
#include "spa/lib/memfd-wrappers.h"
#define MAX_INPUTS 64
#define MAX_OUTPUTS 64

View file

@ -30,7 +30,6 @@ typedef struct
guint id;
PinosClient1 *iface;
gchar *object_path;
GList *objects;
} PinosClientImpl;
@ -191,24 +190,5 @@ pinos_client_destroy (PinosClient * client)
pinos_properties_free (client->properties);
g_clear_object (&impl->iface);
free (impl->object_path);
free (impl);
}
/**
* pinos_client_get_object_path:
* @client: a #PinosClient
*
* Get the object path of @client.
*
* Returns: the object path of @client
*/
const gchar *
pinos_client_get_object_path (PinosClient *client)
{
PinosClientImpl *impl = SPA_CONTAINER_OF (client, PinosClientImpl, this);
g_return_val_if_fail (client, NULL);
return impl->object_path;
}

View file

@ -54,8 +54,6 @@ PinosClient * pinos_client_new (PinosCore *core,
PinosProperties *properties);
void pinos_client_destroy (PinosClient *client);
const gchar * pinos_client_get_object_path (PinosClient *client);
void pinos_client_add_object (PinosClient *client,
PinosObject *object);

View file

@ -65,6 +65,8 @@ pinos_core_new (PinosMainLoop *main_loop)
pinos_signal_init (&this->port_removed);
pinos_signal_init (&this->port_unlinked);
pinos_signal_init (&this->link_state_changed);
pinos_signal_init (&this->node_unlink);
pinos_signal_init (&this->node_unlink_done);
return this;
}

View file

@ -98,7 +98,10 @@ struct _PinosCore {
PinosPort *port));
PINOS_SIGNAL (link_state_changed, (PinosListener *listener,
PinosLink *link));
PINOS_SIGNAL (node_unlink, (PinosListener *listener,
PinosNode *node));
PINOS_SIGNAL (node_unlink_done, (PinosListener *listener,
PinosNode *node));
};
PinosCore * pinos_core_new (PinosMainLoop *main_loop);

View file

@ -64,8 +64,8 @@ static void try_link_port (PinosNode *node, PinosPort *port, PinosDaemon *daemon
static PinosClient *
sender_get_client (PinosDaemon *daemon,
const gchar *sender,
gboolean create)
const char *sender,
bool create)
{
PinosDaemonImpl *impl = SPA_CONTAINER_OF (daemon, PinosDaemonImpl, this);
PinosClient *client;
@ -77,11 +77,11 @@ sender_get_client (PinosDaemon *daemon,
return client;
}
static gboolean
static bool
handle_create_node (PinosDaemon1 *interface,
GDBusMethodInvocation *invocation,
const gchar *arg_factory_name,
const gchar *arg_name,
const char *arg_factory_name,
const char *arg_name,
GVariant *arg_properties,
gpointer user_data)
{
@ -90,7 +90,7 @@ handle_create_node (PinosDaemon1 *interface,
PinosNodeFactory *factory;
PinosNode *node;
PinosClient *client;
const gchar *sender, *object_path;
const char *sender, *object_path;
PinosProperties *props;
sender = g_dbus_method_invocation_get_sender (invocation);
@ -163,12 +163,12 @@ on_link_state_changed (PinosListener *listener,
switch (state) {
case PINOS_LINK_STATE_ERROR:
{
pinos_log_debug ("daemon %p: link %p: state error: %s", impl, link, link->error->message);
pinos_log_debug ("daemon %p: link %p: state error: %s", impl, link, link->error);
if (link->input && link->input->node)
pinos_node_report_error (link->input->node, g_error_copy (link->error));
pinos_node_report_error (link->input->node, strdup (link->error));
if (link->output && link->output->node)
pinos_node_report_error (link->output->node, g_error_copy (link->error));
pinos_node_report_error (link->output->node, strdup (link->error));
break;
}
@ -204,7 +204,7 @@ try_link_port (PinosNode *node, PinosPort *port, PinosDaemon *this)
//PinosClient *client;
PinosProperties *props;
const char *path;
GError *error = NULL;
char *error = NULL;
PinosLink *link;
props = node->properties;
@ -272,15 +272,13 @@ static void
on_node_created (PinosNode *node,
PinosDaemonImpl *impl)
{
GList *ports, *walk;
PinosPort *port;
ports = pinos_node_get_ports (node, PINOS_DIRECTION_INPUT);
for (walk = ports; walk; walk = g_list_next (walk))
on_port_added (&impl->port_added, node, walk->data);
spa_list_for_each (port, &node->input_ports, link)
on_port_added (&impl->port_added, node, port);
ports = pinos_node_get_ports (node, PINOS_DIRECTION_OUTPUT);
for (walk = ports; walk; walk = g_list_next (walk))
on_port_added (&impl->port_added, node, walk->data);
spa_list_for_each (port, &node->output_ports, link)
on_port_added (&impl->port_added, node, port);
}
static void
@ -317,13 +315,12 @@ static void
on_node_removed (PinosDaemon *daemon, PinosNode *node)
{
pinos_log_debug ("daemon %p: node %p removed", daemon, node);
g_signal_handlers_disconnect_by_data (node, daemon);
}
static gboolean
static bool
handle_create_client_node (PinosDaemon1 *interface,
GDBusMethodInvocation *invocation,
const gchar *arg_name,
const char *arg_name,
GVariant *arg_properties,
gpointer user_data)
{
@ -404,7 +401,7 @@ export_server_object (PinosDaemon *daemon,
static void
bus_acquired_handler (GDBusConnection *connection,
const gchar *name,
const char *name,
gpointer user_data)
{
PinosDaemonImpl *impl = user_data;
@ -417,14 +414,14 @@ bus_acquired_handler (GDBusConnection *connection,
static void
name_acquired_handler (GDBusConnection *connection,
const gchar *name,
const char *name,
gpointer user_data)
{
}
static void
name_lost_handler (GDBusConnection *connection,
const gchar *name,
const char *name,
gpointer user_data)
{
PinosDaemonImpl *impl = user_data;
@ -547,11 +544,11 @@ pinos_daemon_find_port (PinosDaemon *daemon,
PinosPort *other_port,
const char *name,
PinosProperties *props,
GPtrArray *format_filters,
GError **error)
SpaFormat **format_filters,
char **error)
{
PinosPort *best = NULL;
gboolean have_name;
bool have_name;
PinosNode *n;
g_return_val_if_fail (daemon, NULL);
@ -575,10 +572,7 @@ pinos_daemon_find_port (PinosDaemon *daemon,
}
}
if (best == NULL) {
g_set_error (error,
G_IO_ERROR,
G_IO_ERROR_NOT_FOUND,
"No matching Node found");
asprintf (error, "No matching Node found");
}
return best;
}

View file

@ -20,9 +20,9 @@
#ifndef __PINOS_DAEMON_H__
#define __PINOS_DAEMON_H__
#include <glib-object.h>
G_BEGIN_DECLS
#ifdef __cplusplus
extern "C" {
#endif
#define PINOS_DAEMON_URI "http://pinos.org/ns/daemon"
#define PINOS_DAEMON_PREFIX PINOS_DAEMON_URI "#"
@ -32,6 +32,7 @@ typedef struct _PinosDaemon PinosDaemon;
#include <pinos/client/properties.h>
#include <pinos/server/core.h>
#include <pinos/server/node.h>
#include <pinos/server/port.h>
/**
* PinosDaemon:
@ -64,9 +65,11 @@ PinosPort * pinos_daemon_find_port (PinosDaemon *daemon,
PinosPort *other_port,
const char *name,
PinosProperties *props,
GPtrArray *format_filter,
GError **error);
SpaFormat **format_filter,
char **error);
G_END_DECLS
#ifdef __cplusplus
}
#endif
#endif /* __PINOS_DAEMON_H__ */

View file

@ -26,15 +26,13 @@
#include <limits.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <pthread.h>
#include "spa/include/spa/ringbuffer.h"
#include "pinos/client/log.h"
#include "pinos/client/rtkit.h"
#include "pinos/server/data-loop.h"
#define PINOS_DATA_LOOP_GET_PRIVATE(loop) \
(G_TYPE_INSTANCE_GET_PRIVATE ((loop), PINOS_TYPE_DATA_LOOP, PinosDataLoopPrivate))
#define DATAS_SIZE (4096 * 8)
typedef struct {
@ -127,7 +125,6 @@ loop (void *user_data)
SpaPollNotifyData ndata;
unsigned int n_idle = 0;
int r;
struct timespec ts;
/* prepare */
for (i = 0; i < impl->n_poll; i++) {
@ -202,9 +199,6 @@ loop (void *user_data)
continue;
}
// clock_gettime (CLOCK_MONOTONIC, &ts);
// fprintf (stderr, "%llu\n", SPA_TIMESPEC_TO_TIME (&ts));
/* after */
for (i = 0; i < impl->n_poll; i++) {
SpaPollItem *p = &impl->poll[i];
@ -420,7 +414,7 @@ pinos_data_loop_destroy (PinosDataLoop * loop)
PinosDataLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosDataLoopImpl, this);
pinos_log_debug ("data-loop %p: destroy", impl);
stop_thread (impl, FALSE);
stop_thread (impl, false);
close (impl->fds[0].fd);
free (impl);
}

View file

@ -20,9 +20,9 @@
#ifndef __PINOS_DATA_LOOP_H__
#define __PINOS_DATA_LOOP_H__
#include <glib-object.h>
G_BEGIN_DECLS
#ifdef __cplusplus
extern "C" {
#endif
#include <spa/include/spa/poll.h>
@ -42,6 +42,8 @@ void pinos_data_loop_destroy (PinosDataLoop *loop);
bool pinos_data_loop_in_thread (PinosDataLoop *loop);
G_END_DECLS
#ifdef __cplusplus
}
#endif
#endif /* __PINOS_DATA_LOOP_H__ */

View file

@ -42,7 +42,7 @@ typedef struct
PinosLink1 *iface;
GPtrArray *format_filter;
SpaFormat **format_filter;
PinosProperties *properties;
PinosListener input_port_destroy;
@ -50,7 +50,7 @@ typedef struct
PinosListener output_port_destroy;
PinosListener output_async_complete;
gboolean allocated;
bool allocated;
PinosMemblock buffer_mem;
SpaBuffer **buffers;
unsigned int n_buffers;
@ -60,7 +60,8 @@ static void
pinos_link_update_state (PinosLink *link, PinosLinkState state)
{
if (state != link->state) {
g_clear_error (&link->error);
free (link->error);
link->error = NULL;
pinos_log_debug ("link %p: update state %s -> %s", link,
pinos_link_state_as_string (link->state),
pinos_link_state_as_string (state));
@ -70,12 +71,12 @@ pinos_link_update_state (PinosLink *link, PinosLinkState state)
}
static void
pinos_link_report_error (PinosLink *link, GError *error)
pinos_link_report_error (PinosLink *link, char *error)
{
g_clear_error (&link->error);
free (link->error);
link->error = error;
link->state = PINOS_LINK_STATE_ERROR;
pinos_log_debug ("link %p: got error state %s", link, error->message);
pinos_log_debug ("link %p: got error state %s", link, error);
pinos_signal_emit (&link->core->link_state_changed, link);
}
@ -85,7 +86,7 @@ do_negotiate (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
SpaResult res;
SpaFormat *filter = NULL, *format;
void *istate = NULL, *ostate = NULL;
GError *error = NULL;
char *error = NULL;
if (in_state != SPA_NODE_STATE_CONFIGURE && out_state != SPA_NODE_STATE_CONFIGURE)
return SPA_RESULT_OK;
@ -98,15 +99,12 @@ do_negotiate (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
again:
if ((res = spa_node_port_enum_formats (this->input->node->node,
SPA_DIRECTION_INPUT,
this->input->port,
this->input->port_id,
&filter,
NULL,
&istate)) < 0) {
if (res == SPA_RESULT_ENUM_END && istate != NULL) {
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_FORMAT_NEGOTIATION,
"error input enum formats: %d", res);
asprintf (&error, "error input enum formats: %d", res);
goto error;
}
}
@ -115,7 +113,7 @@ again:
if ((res = spa_node_port_enum_formats (this->output->node->node,
SPA_DIRECTION_OUTPUT,
this->output->port,
this->output->port_id,
&format,
filter,
&ostate)) < 0) {
@ -123,10 +121,7 @@ again:
ostate = NULL;
goto again;
}
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_FORMAT_NEGOTIATION,
"error output enum formats: %d", res);
asprintf (&error, "error output enum formats: %d", res);
goto error;
}
pinos_log_debug ("Got filtered:");
@ -136,24 +131,18 @@ again:
/* only input needs format */
if ((res = spa_node_port_get_format (this->output->node->node,
SPA_DIRECTION_OUTPUT,
this->output->port,
this->output->port_id,
(const SpaFormat **)&format)) < 0) {
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_FORMAT_NEGOTIATION,
"error get output format: %d", res);
asprintf (&error, "error get output format: %d", res);
goto error;
}
} else if (out_state == SPA_NODE_STATE_CONFIGURE && in_state > SPA_NODE_STATE_CONFIGURE) {
/* only output needs format */
if ((res = spa_node_port_get_format (this->input->node->node,
SPA_DIRECTION_INPUT,
this->input->port,
this->input->port_id,
(const SpaFormat **)&format)) < 0) {
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_FORMAT_NEGOTIATION,
"error get input format: %d", res);
asprintf (&error, "error get input format: %d", res);
goto error;
}
} else
@ -166,26 +155,20 @@ again:
pinos_log_debug ("link %p: doing set format on output", this);
if ((res = spa_node_port_set_format (this->output->node->node,
SPA_DIRECTION_OUTPUT,
this->output->port,
this->output->port_id,
SPA_PORT_FORMAT_FLAG_NEAREST,
format)) < 0) {
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_FORMAT_NEGOTIATION,
"error set output format: %d", res);
asprintf (&error, "error set output format: %d", res);
goto error;
}
} else if (in_state == SPA_NODE_STATE_CONFIGURE) {
pinos_log_debug ("link %p: doing set format on input", this);
if ((res = spa_node_port_set_format (this->input->node->node,
SPA_DIRECTION_INPUT,
this->input->port,
this->input->port_id,
SPA_PORT_FORMAT_FLAG_NEAREST,
format)) < 0) {
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_FORMAT_NEGOTIATION,
"error set input format: %d", res);
asprintf (&error, "error set input format: %d", res);
goto error;
}
}
@ -231,7 +214,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
SpaResult res;
const SpaPortInfo *iinfo, *oinfo;
SpaPortInfoFlags in_flags, out_flags;
GError *error = NULL;
char *error = NULL;
if (in_state != SPA_NODE_STATE_READY && out_state != SPA_NODE_STATE_READY)
return SPA_RESULT_OK;
@ -242,22 +225,16 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
/* find out what's possible */
if ((res = spa_node_port_get_info (this->output->node->node,
SPA_DIRECTION_OUTPUT,
this->output->port,
this->output->port_id,
&oinfo)) < 0) {
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_BUFFER_ALLOCATION,
"error get output port info: %d", res);
asprintf (&error, "error get output port info: %d", res);
goto error;
}
if ((res = spa_node_port_get_info (this->input->node->node,
SPA_DIRECTION_INPUT,
this->input->port,
this->input->port_id,
&iinfo)) < 0) {
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_BUFFER_ALLOCATION,
"error get input port info: %d", res);
asprintf (&error, "error get input port info: %d", res);
goto error;
}
spa_debug_port_info (oinfo);
@ -290,10 +267,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
out_flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS;
in_flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS;
} else {
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_BUFFER_ALLOCATION,
"no common buffer alloc found");
asprintf (&error, "no common buffer alloc found");
res = SPA_RESULT_ERROR;
goto error;
}
@ -309,7 +283,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
if (impl->buffers == NULL) {
SpaAllocParamBuffers *in_alloc, *out_alloc;
SpaAllocParamMetaEnableRingbuffer *in_me, *out_me;
guint max_buffers;
unsigned int max_buffers;
size_t minsize, stride, blocks;
in_me = find_meta_enable (iinfo, SPA_META_TYPE_RINGBUFFER);
@ -349,10 +323,10 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
impl->allocated = FALSE;
pinos_log_debug ("reusing %d output buffers %p", impl->n_buffers, impl->buffers);
} else {
guint i, j;
unsigned int i, j;
size_t hdr_size, buf_size, arr_size;
void *p;
guint n_metas, n_datas;
unsigned int n_metas, n_datas;
n_metas = 0;
n_datas = 1;
@ -388,7 +362,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
SpaBuffer *b;
SpaData *d;
void *pd;
guint mi;
unsigned int mi;
b = impl->buffers[i] = SPA_MEMBER (p, buf_size * i, SpaBuffer);
@ -446,13 +420,10 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
if (out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) {
if ((res = spa_node_port_alloc_buffers (this->output->node->node,
SPA_DIRECTION_OUTPUT,
this->output->port,
this->output->port_id,
iinfo->params, iinfo->n_params,
impl->buffers, &impl->n_buffers)) < 0) {
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_BUFFER_ALLOCATION,
"error alloc output buffers: %d", res);
asprintf (&error, "error alloc output buffers: %d", res);
goto error;
}
this->output->buffers = impl->buffers;
@ -464,13 +435,10 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
} else if (in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) {
if ((res = spa_node_port_alloc_buffers (this->input->node->node,
SPA_DIRECTION_INPUT,
this->input->port,
this->input->port_id,
oinfo->params, oinfo->n_params,
impl->buffers, &impl->n_buffers)) < 0) {
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_BUFFER_ALLOCATION,
"error alloc input buffers: %d", res);
asprintf (&error, "error alloc input buffers: %d", res);
goto error;
}
this->input->buffers = impl->buffers;
@ -486,13 +454,10 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
pinos_log_debug ("using %d buffers %p on input port", impl->n_buffers, impl->buffers);
if ((res = spa_node_port_use_buffers (this->input->node->node,
SPA_DIRECTION_INPUT,
this->input->port,
this->input->port_id,
impl->buffers,
impl->n_buffers)) < 0) {
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_BUFFER_ALLOCATION,
"error use input buffers: %d", res);
asprintf (&error, "error use input buffers: %d", res);
goto error;
}
this->input->buffers = impl->buffers;
@ -503,23 +468,17 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
pinos_log_debug ("using %d buffers %p on output port", impl->n_buffers, impl->buffers);
if ((res = spa_node_port_use_buffers (this->output->node->node,
SPA_DIRECTION_OUTPUT,
this->output->port,
this->output->port_id,
impl->buffers,
impl->n_buffers)) < 0) {
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_BUFFER_ALLOCATION,
"error use output buffers: %d", res);
asprintf (&error, "error use output buffers: %d", res);
goto error;
}
this->output->buffers = impl->buffers;
this->output->n_buffers = impl->n_buffers;
this->output->allocated = FALSE;
} else {
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_BUFFER_ALLOCATION,
"no common buffer alloc found");
asprintf (&error, "no common buffer alloc found");
goto error;
}
@ -598,7 +557,7 @@ again:
return SPA_RESULT_OK;
exit:
pinos_main_loop_defer (this->core->main_loop, this, res, (PinosDeferFunc) check_states, this, NULL);
pinos_main_loop_defer (this->core->main_loop, this, res, (PinosDeferFunc) check_states, this);
return res;
}
@ -676,8 +635,12 @@ on_port_destroy (PinosLink *this,
SpaResult res = SPA_RESULT_OK;
if (port == this->input) {
pinos_log_debug ("link %p: input port destroyed %p", this, port);
this->input = NULL;
other = this->output;
} else if (port == this->output) {
pinos_log_debug ("link %p: output port destroyed %p", this, port);
this->output = NULL;
other = this->input;
} else
return;
@ -686,17 +649,15 @@ on_port_destroy (PinosLink *this,
impl->buffers = NULL;
impl->n_buffers = 0;
pinos_log_debug ("link %p: clear input allocated buffers on port %p", link, other);
pinos_log_debug ("link %p: clear input allocated buffers on port %p", this, other);
pinos_port_clear_buffers (other);
}
res = pinos_port_unlink (port, this);
pinos_main_loop_defer (this->core->main_loop,
port,
res,
(PinosDeferFunc) on_port_unlinked,
this,
NULL);
this);
}
static void
@ -738,7 +699,7 @@ PinosLink *
pinos_link_new (PinosCore *core,
PinosPort *output,
PinosPort *input,
GPtrArray *format_filter,
SpaFormat **format_filter,
PinosProperties *properties)
{
PinosLinkImpl *impl;
@ -777,8 +738,8 @@ pinos_link_new (PinosCore *core,
on_output_async_complete_notify);
pinos_log_debug ("link %p: constructed %p:%d -> %p:%d", impl,
this->output->node, this->output->port,
this->input->node, this->input->port);
this->output->node, this->output->port_id,
this->input->node, this->input->port_id);
spa_list_insert (core->link_list.prev, &this->list);
@ -810,18 +771,15 @@ pinos_link_destroy (PinosLink * this)
if (this->input) {
pinos_signal_remove (&impl->input_port_destroy);
pinos_signal_remove (&impl->input_async_complete);
pinos_port_unlink (this->input, this);
}
if (this->output) {
pinos_signal_remove (&impl->output_port_destroy);
pinos_signal_remove (&impl->output_async_complete);
pinos_port_unlink (this->output, this);
}
pinos_main_loop_defer_cancel (this->core->main_loop, this, 0);
pinos_core_remove_global (this->core, this->global);
spa_list_remove (&this->list);
g_clear_object (&impl->iface);

View file

@ -20,9 +20,9 @@
#ifndef __PINOS_LINK_H__
#define __PINOS_LINK_H__
#include <glib-object.h>
G_BEGIN_DECLS
#ifdef __cplusplus
extern "C" {
#endif
typedef struct _PinosLink PinosLink;
@ -50,34 +50,43 @@ struct _PinosLink {
PinosProperties *properties;
PinosLinkState state;
GError *error;
char *error;
PINOS_SIGNAL (destroy_signal, (PinosListener *,
PinosLink *));
PinosPort *output;
SpaList output_link;
PinosPort *input;
SpaList input_link;
uint32_t queue[64];
SpaRingbuffer ringbuffer;
gint in_ready;
struct {
unsigned int in_ready;
PinosPort *input;
PinosPort *output;
SpaList input_link;
SpaList output_link;
} rt;
};
PinosLink * pinos_link_new (PinosCore *core,
PinosPort *output,
PinosPort *input,
GPtrArray *format_filter,
SpaFormat **format_filter,
PinosProperties *properties);
void pinos_link_destroy (PinosLink *link);
bool pinos_link_activate (PinosLink *link);
bool pinos_link_deactivate (PinosLink *link);
#ifdef __cplusplus
}
#endif
const gchar * pinos_link_get_object_path (PinosLink *link);
G_END_DECLS
#endif /* __PINOS_LINK_H__ */

View file

@ -31,9 +31,6 @@
#include "pinos/client/object.h"
#include "pinos/server/main-loop.h"
#define PINOS_MAIN_LOOP_GET_PRIVATE(loop) \
(G_TYPE_INSTANCE_GET_PRIVATE ((loop), PINOS_TYPE_MAIN_LOOP, PinosMainLoopPrivate))
#define DATAS_SIZE (4096 * 8)
typedef struct {
@ -48,14 +45,14 @@ typedef struct {
typedef struct _WorkItem WorkItem;
struct _WorkItem {
gulong id;
uint32_t id;
void *obj;
uint32_t seq;
SpaResult res;
PinosDeferFunc func;
void *data;
GDestroyNotify notify;
SpaList list;
bool sync;
SpaList link;
};
typedef struct
@ -75,8 +72,8 @@ typedef struct
SpaPollFd fds[1];
SpaPollItem wakeup;
SpaList work;
WorkItem *free_list;
SpaList work_list;
SpaList free_list;
gulong work_id;
} PinosMainLoopImpl;
@ -86,7 +83,7 @@ typedef struct {
SpaPollItem item;
} PollData;
static gboolean
static bool
poll_event (GIOChannel *source,
GIOCondition condition,
void * user_data)
@ -179,7 +176,7 @@ do_invoke (SpaPoll *poll,
void *user_data)
{
PinosMainLoopImpl *impl = SPA_CONTAINER_OF (poll, PinosMainLoopImpl, poll);
gboolean in_thread = FALSE;
bool in_thread = false;
SpaRingbufferArea areas[2];
InvokeItem *item;
uint64_t u = 1;
@ -220,7 +217,7 @@ do_invoke (SpaPoll *poll,
return res;
}
static gboolean
static bool
process_work_queue (PinosMainLoop *this)
{
PinosMainLoopImpl *impl = SPA_CONTAINER_OF (this, PinosMainLoopImpl, this);
@ -228,48 +225,52 @@ process_work_queue (PinosMainLoop *this)
impl->work_id = 0;
spa_list_for_each_safe (item, tmp, &impl->work, list) {
if (item->seq != SPA_ID_INVALID)
pinos_log_debug ("main-loop %p: %p %p", this, impl->work_list.next, &impl->work_list);
spa_list_for_each_safe (item, tmp, &impl->work_list, link) {
pinos_log_debug ("main-loop %p: %p %p %d %p", this, &item->link, impl->work_list.next, item->sync, item->obj);
if (item->sync) {
if (&item->link == impl->work_list.next) {
pinos_log_debug ("main-loop %p: found sync item %p", this, item->obj);
} else {
continue;
}
} else if (item->seq != SPA_ID_INVALID)
continue;
spa_list_remove (&item->list);
spa_list_remove (&item->link);
if (item->func) {
pinos_log_debug ("main-loop %p: process work item %p", this, item->obj);
pinos_log_debug ("main-loop %p: process work item %p %d %d", this, item->obj, item->sync, item->seq);
item->func (item->obj, item->data, item->res, item->id);
}
if (item->notify)
item->notify (item->data);
item->list.next = &impl->free_list->list;
impl->free_list = item;
spa_list_insert (impl->free_list.prev, &item->link);
}
return FALSE;
return false;
}
static uint32_t
main_loop_defer (PinosMainLoop *loop,
do_add_work (PinosMainLoop *loop,
void *obj,
SpaResult res,
PinosDeferFunc func,
void *data,
GDestroyNotify notify)
bool sync)
{
PinosMainLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosMainLoopImpl, this);
WorkItem *item;
bool have_work = false;
if (impl->free_list) {
item = impl->free_list;
impl->free_list = SPA_CONTAINER_OF (item->list.next, WorkItem, list);
if (!spa_list_is_empty (&impl->free_list)) {
item = spa_list_first (&impl->free_list, WorkItem, link);
spa_list_remove (&item->link);
} else {
item = g_slice_new (WorkItem);
item = malloc (sizeof (WorkItem));
}
item->id = ++impl->counter;
item->obj = obj;
item->func = func;
item->data = data;
item->notify = notify;
item->sync = sync;
if (SPA_RESULT_IS_ASYNC (res)) {
item->seq = SPA_RESULT_ASYNC_SEQ (res);
@ -279,9 +280,9 @@ main_loop_defer (PinosMainLoop *loop,
item->seq = SPA_ID_INVALID;
item->res = res;
have_work = TRUE;
pinos_log_debug ("main-loop %p: defer object %p", loop, obj);
pinos_log_debug ("main-loop %p: defer object %p %d", loop, obj, sync);
}
spa_list_insert (impl->work.prev, &item->list);
spa_list_insert (impl->work_list.prev, &item->link);
if (impl->work_id == 0 && have_work)
impl->work_id = g_idle_add ((GSourceFunc) process_work_queue, loop);
@ -289,6 +290,16 @@ main_loop_defer (PinosMainLoop *loop,
return item->id;
}
static uint32_t
main_loop_defer (PinosMainLoop *loop,
void *obj,
SpaResult res,
PinosDeferFunc func,
void *data)
{
return do_add_work (loop, obj, res, func, data, false);
}
static void
main_loop_defer_cancel (PinosMainLoop *loop,
void *obj,
@ -298,7 +309,7 @@ main_loop_defer_cancel (PinosMainLoop *loop,
bool have_work = false;
WorkItem *item;
spa_list_for_each (item, &impl->work, list) {
spa_list_for_each (item, &impl->work_list, link) {
if ((id == 0 || item->id == id) && (obj == NULL || item->obj == obj)) {
pinos_log_debug ("main-loop %p: cancel defer %d for object %p", loop, item->seq, item->obj);
item->seq = SPA_ID_INVALID;
@ -320,7 +331,7 @@ main_loop_defer_complete (PinosMainLoop *loop,
PinosMainLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosMainLoopImpl, this);
bool have_work = false;
spa_list_for_each (item, &impl->work, list) {
spa_list_for_each (item, &impl->work_list, link) {
if (item->obj == obj && item->seq == seq) {
pinos_log_debug ("main-loop %p: found defered %d for object %p", loop, seq, obj);
item->seq = SPA_ID_INVALID;
@ -337,6 +348,15 @@ main_loop_defer_complete (PinosMainLoop *loop,
return have_work;
}
static uint32_t
main_loop_sync (PinosMainLoop *loop,
void *obj,
PinosDeferFunc func,
void *data)
{
return do_add_work (loop, obj, SPA_RESULT_OK, func, data, true);
}
static void
main_loop_quit (PinosMainLoop *loop)
{
@ -377,6 +397,7 @@ pinos_main_loop_new (GMainContext *context)
this->defer = main_loop_defer;
this->defer_cancel = main_loop_defer_cancel;
this->defer_complete = main_loop_defer_complete;
this->sync = main_loop_sync;
impl->poll.size = sizeof (SpaPoll);
impl->poll.info = NULL;
@ -386,10 +407,11 @@ pinos_main_loop_new (GMainContext *context)
impl->poll.invoke = do_invoke;
this->poll = &impl->poll;
spa_list_init (&impl->work);
spa_list_init (&impl->work_list);
spa_list_init (&impl->free_list);
spa_ringbuffer_init (&impl->buffer, DATAS_SIZE);
impl->loop = g_main_loop_new (impl->context, FALSE);
impl->loop = g_main_loop_new (impl->context, false);
impl->fds[0].fd = eventfd (0, 0);
impl->fds[0].events = POLLIN | POLLPRI | POLLERR;
@ -412,6 +434,7 @@ void
pinos_main_loop_destroy (PinosMainLoop *loop)
{
PinosMainLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosMainLoopImpl, this);
WorkItem *item, *tmp;
pinos_log_debug ("main-loop %p: destroy", impl);
@ -419,6 +442,7 @@ pinos_main_loop_destroy (PinosMainLoop *loop)
close (impl->fds[0].fd);
g_slice_free_chain (WorkItem, impl->free_list, list.next);
spa_list_for_each_safe (item, tmp, &impl->free_list, link)
free (item);
free (impl);
}

View file

@ -20,9 +20,9 @@
#ifndef __PINOS_MAIN_LOOP_H__
#define __PINOS_MAIN_LOOP_H__
#include <glib-object.h>
G_BEGIN_DECLS
#ifdef __cplusplus
extern "C" {
#endif
#include <spa/include/spa/poll.h>
#include <spa/include/spa/node-event.h>
@ -32,7 +32,7 @@ typedef struct _PinosMainLoop PinosMainLoop;
typedef void (*PinosDeferFunc) (void *obj,
void *data,
SpaResult res,
gulong id);
uint32_t id);
/**
* PinosMainLoop:
@ -49,8 +49,7 @@ struct _PinosMainLoop {
void *obj,
SpaResult res,
PinosDeferFunc func,
void *data,
GDestroyNotify notify);
void *data);
void (*defer_cancel) (PinosMainLoop *loop,
void *obj,
uint32_t id);
@ -58,6 +57,10 @@ struct _PinosMainLoop {
void *obj,
uint32_t seq,
SpaResult res);
uint32_t (*sync) (PinosMainLoop *loop,
void *obj,
PinosDeferFunc func,
void *data);
};
PinosMainLoop * pinos_main_loop_new (GMainContext *context);
@ -69,7 +72,10 @@ void pinos_main_loop_destroy (PinosMainLoop *loop
#define pinos_main_loop_defer(m,...) (m)->defer(m,__VA_ARGS__)
#define pinos_main_loop_defer_cancel(m,...) (m)->defer_cancel(m,__VA_ARGS__)
#define pinos_main_loop_defer_complete(m,...) (m)->defer_complete(m,__VA_ARGS__)
#define pinos_main_loop_sync(m,...) (m)->sync(m,__VA_ARGS__)
G_END_DECLS
#ifdef __cplusplus
}
#endif
#endif /* __PINOS_MAIN_LOOP_H__ */

View file

@ -10,6 +10,7 @@ pinoscore_headers = [
'module.h',
'node.h',
'node-factory.h',
'port.h',
'registry.h',
]
@ -25,6 +26,7 @@ pinoscore_sources = [
'module.c',
'node.c',
'node-factory.c',
'port.c',
'registry.c',
]

View file

@ -21,15 +21,19 @@
#ifndef __PINOS_MODULE_H__
#define __PINOS_MODULE_H__
#ifdef __cplusplus
extern "C" {
#endif
#include <pinos/server/core.h>
typedef struct _PinosModule PinosModule;
struct _PinosModule {
SpaList link;
gchar *name;
PinosCore *core;
SpaList link;
char *name;
PINOS_SIGNAL (destroy_signal, (PinosListener *listener,
PinosModule *module));
@ -53,6 +57,8 @@ PinosModule * pinos_module_load (PinosCore *core,
char **err);
void pinos_module_destroy (PinosModule *module);
G_END_DECLS
#ifdef __cplusplus
}
#endif
#endif /* __PINOS_MODULE_H__ */

View file

@ -20,9 +20,9 @@
#ifndef __PINOS_NODE_FACTORY_H__
#define __PINOS_NODE_FACTORY_H__
#include <glib-object.h>
G_BEGIN_DECLS
#ifdef __cplusplus
extern "C" {
#endif
#define PINOS_NODE_FACTORY_URI "http://pinos.org/ns/node-factory"
#define PINOS_NODE_FACTORY_PREFIX PINOS_NODE_FACTORY_URI "#"
@ -42,12 +42,14 @@ struct _PinosNodeFactory {
PinosNode * (*create_node) (PinosNodeFactory *factory,
PinosClient *client,
const gchar *name,
const char *name,
PinosProperties *properties);
};
#define pinos_node_factory_create_node(f,...) (f)->create_node((f),__VA_ARGS__)
G_END_DECLS
#ifdef __cplusplus
}
#endif
#endif /* __PINOS_NODE_FACTORY_H__ */

View file

@ -31,26 +31,6 @@
#include "pinos/dbus/org-pinos.h"
static PinosPort *
new_pinos_port (PinosNode *node, PinosDirection direction, uint32_t port)
{
PinosPort *np;
np = calloc (1, sizeof (PinosPort));
np->node = node;
np->direction = direction;
np->port = port;
np->links = g_ptr_array_new ();
pinos_signal_init (&np->destroy_signal);
return np;
}
static void
free_node_port (PinosPort *np)
{
g_ptr_array_free (np->links, TRUE);
g_slice_free (PinosPort, np);
}
typedef struct
{
PinosNode this;
@ -60,31 +40,21 @@ typedef struct
uint32_t seq;
gboolean async_init;
GList *input_ports;
GList *output_ports;
guint n_used_output_links;
guint n_used_input_links;
bool async_init;
GError *error;
guint idle_timeout;
struct {
GPtrArray *links;
} rt;
} PinosNodeImpl;
static void init_complete (PinosNode *this);
static void
update_port_ids (PinosNode *node, gboolean create)
update_port_ids (PinosNode *node, bool create)
{
PinosNodeImpl *impl = SPA_CONTAINER_OF (node, PinosNodeImpl, this);
uint32_t *input_port_ids, *output_port_ids;
guint n_input_ports, n_output_ports, max_input_ports, max_output_ports;
guint i;
GList *ports;
unsigned int n_input_ports, n_output_ports, max_input_ports, max_output_ports;
unsigned int i;
SpaList *ports;
if (node->node == NULL)
return;
@ -104,79 +74,81 @@ update_port_ids (PinosNode *node, gboolean create)
max_output_ports,
output_port_ids);
node->input_port_map = realloc (node->input_port_map, sizeof (PinosPort *) * max_input_ports);
node->output_port_map = realloc (node->output_port_map, sizeof (PinosPort *) * max_output_ports);
pinos_log_debug ("node %p: update_port ids %u/%u, %u/%u", node,
n_input_ports, max_input_ports, n_output_ports, max_output_ports);
i = 0;
ports = impl->input_ports;
ports = &node->input_ports;
while (true) {
PinosPort *p = (ports ? ports->data : NULL);
PinosPort *p = (ports == &node->input_ports) ? NULL : SPA_CONTAINER_OF (ports, PinosPort, link);
if (p && i < n_input_ports && p->port == input_port_ids[i]) {
if (p && i < n_input_ports && p->port_id == input_port_ids[i]) {
node->input_port_map[p->port_id] = p;
pinos_log_debug ("node %p: exiting input port %d", node, input_port_ids[i]);
i++;
ports = g_list_next (ports);
} else if ((p && i < n_input_ports && input_port_ids[i] < p->port) || i < n_input_ports) {
ports = ports->next;
} else if ((p && i < n_input_ports && input_port_ids[i] < p->port_id) || i < n_input_ports) {
PinosPort *np;
pinos_log_debug ("node %p: input port added %d", node, input_port_ids[i]);
np = new_pinos_port (node, PINOS_DIRECTION_INPUT, input_port_ids[i]);
impl->input_ports = g_list_insert_before (impl->input_ports, ports, np);
np = pinos_port_new (node, PINOS_DIRECTION_INPUT, input_port_ids[i]);
spa_list_insert (ports, &np->link);
ports = np->link.next;
node->input_port_map[np->port_id] = np;
if (!impl->async_init)
pinos_signal_emit (&node->core->port_added, node, np);
i++;
} else if (p) {
GList *next;
pinos_log_debug ("node %p: input port removed %d", node, p->port);
next = g_list_next (ports);
impl->input_ports = g_list_delete_link (impl->input_ports, ports);
ports = next;
node->input_port_map[p->port_id] = NULL;
ports = ports->next;
if (!impl->async_init)
pinos_signal_emit (&node->core->port_removed, node, p);
free_node_port (p);
} else
pinos_log_debug ("node %p: input port removed %d", node, p->port_id);
pinos_port_destroy (p);
} else {
pinos_log_debug ("node %p: no more input ports", node);
break;
}
}
i = 0;
ports = impl->output_ports;
ports = &node->output_ports;
while (true) {
PinosPort *p = (ports ? ports->data : NULL);
PinosPort *p = (ports == &node->output_ports) ? NULL : SPA_CONTAINER_OF (ports, PinosPort, link);
if (p && i < n_output_ports && p->port == output_port_ids[i]) {
if (p && i < n_output_ports && p->port_id == output_port_ids[i]) {
pinos_log_debug ("node %p: exiting output port %d", node, output_port_ids[i]);
i++;
ports = g_list_next (ports);
} else if ((p && i < n_output_ports && output_port_ids[i] < p->port) || i < n_output_ports) {
ports = ports->next;
node->output_port_map[p->port_id] = p;
} else if ((p && i < n_output_ports && output_port_ids[i] < p->port_id) || i < n_output_ports) {
PinosPort *np;
pinos_log_debug ("node %p: output port added %d", node, output_port_ids[i]);
np = new_pinos_port (node, PINOS_DIRECTION_OUTPUT, output_port_ids[i]);
impl->output_ports = g_list_insert_before (impl->output_ports, ports, np);
np = pinos_port_new (node, PINOS_DIRECTION_OUTPUT, output_port_ids[i]);
spa_list_insert (ports, &np->link);
ports = np->link.next;
node->output_port_map[np->port_id] = np;
if (!impl->async_init)
pinos_signal_emit (&node->core->port_added, node, np);
i++;
} else if (p) {
GList *next;
pinos_log_debug ("node %p: output port removed %d", node, p->port);
next = g_list_next (ports);
impl->output_ports = g_list_delete_link (impl->output_ports, ports);
ports = next;
node->output_port_map[p->port_id] = NULL;
ports = ports->next;
if (!impl->async_init)
pinos_signal_emit (&node->core->port_removed, node, p);
free_node_port (p);
} else
pinos_log_debug ("node %p: output port removed %d", node, p->port_id);
pinos_port_destroy (p);
} else {
pinos_log_debug ("node %p: no more output ports", node);
break;
}
node->have_inputs = n_input_ports > 0;
node->have_outputs = n_output_ports > 0;
}
node->transport = pinos_transport_new (max_input_ports,
max_output_ports);
@ -227,31 +199,29 @@ start_node (PinosNode *this)
static SpaResult
suspend_node (PinosNode *this)
{
PinosNodeImpl *impl = SPA_CONTAINER_OF (this, PinosNodeImpl, this);
SpaResult res = SPA_RESULT_OK;
GList *walk;
PinosPort *p;
pinos_log_debug ("node %p: suspend node", this);
for (walk = impl->input_ports; walk; walk = g_list_next (walk)) {
PinosPort *p = walk->data;
if ((res = spa_node_port_set_format (this->node, SPA_DIRECTION_INPUT, p->port, 0, NULL)) < 0)
pinos_log_warn ("error unset format output: %d", res);
spa_list_for_each (p, &this->input_ports, link) {
if ((res = spa_node_port_set_format (this->node, SPA_DIRECTION_INPUT, p->port_id, 0, NULL)) < 0)
pinos_log_warn ("error unset format input: %d", res);
p->buffers = NULL;
p->n_buffers = 0;
if (p->allocated)
pinos_memblock_free (&p->buffer_mem);
p->allocated = FALSE;
p->allocated = false;
}
for (walk = impl->output_ports; walk; walk = g_list_next (walk)) {
PinosPort *p = walk->data;
if ((res = spa_node_port_set_format (this->node, SPA_DIRECTION_OUTPUT, p->port, 0, NULL)) < 0)
spa_list_for_each (p, &this->output_ports, link) {
if ((res = spa_node_port_set_format (this->node, SPA_DIRECTION_OUTPUT, p->port_id, 0, NULL)) < 0)
pinos_log_warn ("error unset format output: %d", res);
p->buffers = NULL;
p->n_buffers = 0;
if (p->allocated)
pinos_memblock_free (&p->buffer_mem);
p->allocated = FALSE;
p->allocated = false;
}
return res;
}
@ -297,19 +267,19 @@ do_read_link (SpaPoll *poll,
size_t offset;
SpaResult res;
if (link->input == NULL)
if (link->rt.input == NULL)
return SPA_RESULT_OK;
while (link->in_ready > 0 && spa_ringbuffer_get_read_offset (&link->ringbuffer, &offset) > 0) {
SpaPortInput *input = &this->transport->inputs[link->input->port];
while (link->rt.in_ready > 0 && spa_ringbuffer_get_read_offset (&link->ringbuffer, &offset) > 0) {
SpaPortInput *input = &this->transport->inputs[link->rt.input->port_id];
input->buffer_id = link->queue[offset];
if ((res = spa_node_process_input (link->input->node->node)) < 0)
if ((res = spa_node_process_input (link->rt.input->node->node)) < 0)
pinos_log_warn ("node %p: error pushing buffer: %d, %d", this, res, input->status);
spa_ringbuffer_read_advance (&link->ringbuffer, 1);
link->in_ready--;
link->rt.in_ready--;
}
return SPA_RESULT_OK;
}
@ -319,7 +289,6 @@ static void
on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
{
PinosNode *this = user_data;
PinosNodeImpl *impl = SPA_CONTAINER_OF (this, PinosNodeImpl, this);
switch (event->type) {
case SPA_NODE_EVENT_TYPE_INVALID:
@ -342,21 +311,20 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
case SPA_NODE_EVENT_TYPE_NEED_INPUT:
{
SpaNodeEventNeedInput *ni = (SpaNodeEventNeedInput *) event;
guint i;
PinosPort *port = this->input_port_map[ni->port_id];
PinosLink *link;
for (i = 0; i < impl->rt.links->len; i++) {
PinosLink *link = g_ptr_array_index (impl->rt.links, i);
if (link->input == NULL || link->input->port != ni->port_id)
spa_list_for_each (link, &port->rt.links, rt.input_link) {
if (link->rt.input == NULL || link->rt.output == NULL)
continue;
link->in_ready++;
spa_poll_invoke (&link->input->node->data_loop->poll,
link->rt.in_ready++;
spa_poll_invoke (&link->rt.input->node->data_loop->poll,
do_read_link,
SPA_ID_INVALID,
sizeof (PinosLink *),
&link,
link->input->node);
link->rt.input->node);
}
break;
}
@ -364,36 +332,33 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
{
SpaNodeEventHaveOutput *ho = (SpaNodeEventHaveOutput *) event;
SpaResult res;
guint i;
gboolean pushed = FALSE;
bool pushed = false;
SpaPortOutput *po = &this->transport->outputs[ho->port_id];
PinosPort *port = this->output_port_map[ho->port_id];
PinosLink *link;
if ((res = spa_node_process_output (node)) < 0) {
pinos_log_warn ("node %p: got pull error %d, %d", this, res, po->status);
break;
}
for (i = 0; i < impl->rt.links->len; i++) {
PinosLink *link = g_ptr_array_index (impl->rt.links, i);
PinosPort *output = link->output;
PinosPort *input = link->input;
spa_list_for_each (link, &port->rt.links, rt.output_link) {
size_t offset;
if (output == NULL || input == NULL ||
output->node->node != node || output->port != ho->port_id)
if (link->rt.input == NULL || link->rt.output == NULL)
continue;
if (spa_ringbuffer_get_write_offset (&link->ringbuffer, &offset) > 0) {
link->queue[offset] = po->buffer_id;
spa_ringbuffer_write_advance (&link->ringbuffer, 1);
spa_poll_invoke (&link->input->node->data_loop->poll,
spa_poll_invoke (&link->rt.input->node->data_loop->poll,
do_read_link,
SPA_ID_INVALID,
sizeof (PinosLink *),
&link,
link->input->node);
pushed = TRUE;
link->rt.input->node);
pushed = true;
}
}
if (!pushed) {
@ -406,16 +371,15 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
{
SpaResult res;
SpaNodeEventReuseBuffer *rb = (SpaNodeEventReuseBuffer *) event;
guint i;
PinosPort *port = this->input_port_map[rb->port_id];
PinosLink *link;
for (i = 0; i < impl->rt.links->len; i++) {
PinosLink *link = g_ptr_array_index (impl->rt.links, i);
if (link->input == NULL || link->input->port != rb->port_id || link->output == NULL)
spa_list_for_each (link, &port->rt.links, rt.input_link) {
if (link->rt.input == NULL || link->rt.output == NULL)
continue;
if ((res = spa_node_port_reuse_buffer (link->output->node->node,
link->output->port,
if ((res = spa_node_port_reuse_buffer (link->rt.output->node->node,
link->rt.output->port_id,
rb->buffer_id)) < 0)
pinos_log_warn ("node %p: error reuse buffer: %d", node, res);
}
@ -427,7 +391,7 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
}
}
static gboolean
static bool
handle_remove (PinosNode1 *interface,
GDBusMethodInvocation *invocation,
gpointer user_data)
@ -439,7 +403,7 @@ handle_remove (PinosNode1 *interface,
g_dbus_method_invocation_return_value (invocation,
g_variant_new ("()"));
return TRUE;
return true;
}
static void
@ -503,14 +467,13 @@ pinos_node_new (PinosCore *core,
impl->iface = pinos_node1_skeleton_new ();
g_signal_connect (impl->iface, "handle-remove",
(GCallback) handle_remove,
node);
this);
this->state = PINOS_NODE_STATE_CREATING;
pinos_node1_set_state (impl->iface, this->state);
impl->rt.links = g_ptr_array_new_full (256, NULL);
//g_signal_connect (this, "notify", (GCallback) on_property_notify, this);
spa_list_init (&this->input_ports);
spa_list_init (&this->output_ports);
if (this->node->info) {
unsigned int i;
@ -527,12 +490,11 @@ pinos_node_new (PinosCore *core,
if (this->node->state > SPA_NODE_STATE_INIT) {
init_complete (this);
} else {
impl->async_init = TRUE;
impl->async_init = true;
pinos_main_loop_defer (this->core->main_loop,
this,
SPA_RESULT_RETURN_ASYNC (0),
(PinosDeferFunc) init_complete,
NULL,
NULL);
}
spa_list_insert (core->node_list.prev, &this->list);
@ -547,6 +509,75 @@ pinos_node_new (PinosCore *core,
return this;
}
static SpaResult
do_node_remove_done (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosNode *this = user_data;
PinosNodeImpl *impl = SPA_CONTAINER_OF (this, PinosNodeImpl, this);
PinosPort *port, *tmp;
pinos_main_loop_defer_cancel (this->core->main_loop, this, 0);
spa_list_for_each_safe (port, tmp, &this->input_ports, link)
pinos_port_destroy (port);
spa_list_for_each_safe (port, tmp, &this->output_ports, link)
pinos_port_destroy (port);
g_clear_object (&impl->iface);
free (this->name);
free (this->error);
if (this->properties)
pinos_properties_free (this->properties);
free (impl);
return SPA_RESULT_OK;
}
static SpaResult
do_node_remove (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosNode *this = user_data;
PinosPort *port, *tmp;
SpaResult res;
pause_node (this);
spa_list_for_each_safe (port, tmp, &this->input_ports, link) {
PinosLink *link, *tlink;
spa_list_for_each_safe (link, tlink, &port->rt.links, rt.input_link) {
spa_list_remove (&link->rt.input_link);
link->rt.input = NULL;
}
}
spa_list_for_each_safe (port, tmp, &this->output_ports, link) {
PinosLink *link, *tlink;
spa_list_for_each_safe (link, tlink, &port->rt.links, rt.output_link) {
spa_list_remove (&link->rt.output_link);
link->rt.output = NULL;
}
}
res = spa_poll_invoke (this->core->main_loop->poll,
do_node_remove_done,
seq,
0,
NULL,
this);
return res;
}
/**
* pinos_node_destroy:
* @node: a #PinosNode
@ -562,19 +593,15 @@ pinos_node_destroy (PinosNode * this)
pinos_log_debug ("node %p: destroy", impl);
pinos_signal_emit (&this->destroy_signal, this);
pinos_node_set_state (this, PINOS_NODE_STATE_SUSPENDED);
spa_list_remove (&this->list);
pinos_core_remove_global (this->core, this->global);
pinos_main_loop_defer_cancel (this->core->main_loop, this, 0);
g_clear_object (&impl->iface);
free (this->name);
g_clear_error (&impl->error);
if (this->properties)
pinos_properties_free (this->properties);
free (impl);
spa_poll_invoke (&this->data_loop->poll,
do_node_remove,
impl->seq++,
0,
NULL,
this);
}
/**
@ -605,37 +632,35 @@ PinosPort *
pinos_node_get_free_port (PinosNode *node,
PinosDirection direction)
{
PinosNodeImpl *impl = SPA_CONTAINER_OF (node, PinosNodeImpl, this);
guint free_port, n_ports, max_ports;
GList *ports, *walk;
PinosPort *port = NULL;
unsigned int free_port, n_ports, max_ports;
SpaList *ports;
PinosPort *port = NULL, *p;
g_return_val_if_fail (node, NULL);
if (direction == PINOS_DIRECTION_INPUT) {
max_ports = node->transport->area->max_inputs;
n_ports = node->transport->area->n_inputs;
ports = impl->input_ports;
ports = &node->input_ports;
} else {
max_ports = node->transport->area->max_outputs;
n_ports = node->transport->area->n_outputs;
ports = impl->output_ports;
ports = &node->output_ports;
}
free_port = 0;
pinos_log_debug ("node %p: direction %d max %u, n %u, free_port %u", node, direction, max_ports, n_ports, free_port);
for (walk = ports; walk; walk = g_list_next (walk)) {
PinosPort *p = walk->data;
if (free_port < p->port) {
spa_list_for_each (p, ports, link) {
if (free_port < p->port_id) {
port = p;
break;
}
free_port = p->port + 1;
free_port = p->port_id + 1;
}
if (free_port >= max_ports && ports) {
port = ports->data;
if (free_port >= max_ports && !spa_list_is_empty (ports)) {
port = spa_list_first (ports, PinosPort, link);
} else
return NULL;
@ -643,343 +668,6 @@ pinos_node_get_free_port (PinosNode *node,
}
static SpaResult
do_add_link (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosNode *this = user_data;
PinosNodeImpl *impl = SPA_CONTAINER_OF (this, PinosNodeImpl, this);
PinosLink *link = ((PinosLink**)data)[0];
g_ptr_array_add (impl->rt.links, link);
return SPA_RESULT_OK;
}
static PinosLink *
find_link (PinosPort *output_port, PinosPort *input_port)
{
guint i;
for (i = 0; i < output_port->links->len; i++) {
PinosLink *pl = g_ptr_array_index (output_port->links, i);
if (pl->input == input_port) {
return pl;
}
}
return NULL;
}
PinosLink *
pinos_port_get_link (PinosPort *output_port,
PinosPort *input_port)
{
g_return_val_if_fail (output_port != NULL, NULL);
g_return_val_if_fail (input_port != NULL, NULL);
return find_link (output_port, input_port);
}
/**
* pinos_port_link:
* @output_port: an output port
* @input_port: an input port
* @format_filter: a format filter
* @properties: extra properties
* @error: an error or %NULL
*
* Make a link between @output_port and @input_port
*
* If the ports were already linked, the existing links will be returned.
*
* Returns: a new #PinosLink or %NULL and @error is set.
*/
PinosLink *
pinos_port_link (PinosPort *output_port,
PinosPort *input_port,
GPtrArray *format_filter,
PinosProperties *properties,
GError **error)
{
PinosNodeImpl *input_impl, *output_impl;
PinosNode *input_node, *output_node;
PinosLink *link;
g_return_val_if_fail (output_port != NULL, NULL);
g_return_val_if_fail (input_port != NULL, NULL);
output_node = output_port->node;
output_impl = SPA_CONTAINER_OF (output_node, PinosNodeImpl, this);
input_node = input_port->node;
input_impl = SPA_CONTAINER_OF (input_node, PinosNodeImpl, this);
pinos_log_debug ("port link %p:%u -> %p:%u", output_node, output_port->port, input_node, input_port->port);
if (output_node == input_node)
goto same_node;
if (input_port->links->len > 0)
goto was_linked;
link = find_link (output_port, input_port);
if (link == NULL) {
input_node->live = output_node->live;
if (output_node->clock)
input_node->clock = output_node->clock;
pinos_log_debug ("node %p: clock %p, live %d", output_node, output_node->clock, output_node->live);
link = pinos_link_new (output_node->core,
output_port,
input_port,
format_filter,
properties);
g_ptr_array_add (output_port->links, link);
g_ptr_array_add (input_port->links, link);
output_impl->n_used_output_links++;
input_impl->n_used_input_links++;
spa_poll_invoke (&output_node->data_loop->poll,
do_add_link,
SPA_ID_INVALID,
sizeof (PinosLink *),
&link,
output_node);
spa_poll_invoke (&input_node->data_loop->poll,
do_add_link,
SPA_ID_INVALID,
sizeof (PinosLink *),
&link,
input_node);
}
return link;
same_node:
{
g_set_error (error,
PINOS_ERROR,
PINOS_ERROR_NODE_LINK,
"can't link a node to itself");
return NULL;
}
was_linked:
{
g_set_error (error,
PINOS_ERROR,
PINOS_ERROR_NODE_LINK,
"input port was already linked");
return NULL;
}
}
static SpaResult
pinos_port_pause (PinosPort *port)
{
SpaNodeCommand cmd;
cmd.type = SPA_NODE_COMMAND_PAUSE;
cmd.size = sizeof (cmd);
return spa_node_port_send_command (port->node->node,
port->direction,
port->port,
&cmd);
}
static SpaResult
do_remove_link_done (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosPort *port = user_data;
PinosNode *this = port->node;
PinosNodeImpl *impl = SPA_CONTAINER_OF (this, PinosNodeImpl, this);
PinosLink *link = ((PinosLink**)data)[0];
pinos_log_debug ("port %p: finish unlink", port);
if (port->direction == PINOS_DIRECTION_OUTPUT) {
if (g_ptr_array_remove_fast (port->links, link))
impl->n_used_output_links--;
link->output = NULL;
} else {
if (g_ptr_array_remove_fast (port->links, link))
impl->n_used_input_links--;
link->input = NULL;
}
if (impl->n_used_output_links == 0 &&
impl->n_used_input_links == 0) {
pinos_node_report_idle (this);
}
if (!port->allocated) {
pinos_log_debug ("port %p: clear buffers on port", port);
spa_node_port_use_buffers (port->node->node,
port->direction,
port->port,
NULL, 0);
port->buffers = NULL;
port->n_buffers = 0;
}
pinos_main_loop_defer_complete (this->core->main_loop,
port,
seq,
SPA_RESULT_OK);
g_object_unref (link);
g_object_unref (port->node);
return SPA_RESULT_OK;
}
static SpaResult
do_remove_link (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosPort *port = user_data;
PinosNode *this = port->node;
PinosNodeImpl *impl = SPA_CONTAINER_OF (this, PinosNodeImpl, this);
PinosLink *link = ((PinosLink**)data)[0];
SpaResult res;
#if 0
/* FIXME we should only pause when all links are gone */
pinos_port_pause (port);
#endif
g_ptr_array_remove_fast (impl->rt.links, link);
res = spa_poll_invoke (this->core->main_loop->poll,
do_remove_link_done,
seq,
sizeof (PinosLink *),
&link,
port);
return res;
}
SpaResult
pinos_port_unlink (PinosPort *port, PinosLink *link)
{
SpaResult res;
PinosNodeImpl *impl = SPA_CONTAINER_OF (port->node, PinosNodeImpl, this);
pinos_log_debug ("port %p: start unlink %p", port, link);
g_object_ref (link);
g_object_ref (port->node);
res = spa_poll_invoke (&port->node->data_loop->poll,
do_remove_link,
impl->seq++,
sizeof (PinosLink *),
&link,
port);
return res;
}
static SpaResult
do_clear_buffers_done (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosPort *port = user_data;
PinosNode *this = port->node;
SpaResult res;
pinos_log_debug ("port %p: clear buffers finish", port);
res = spa_node_port_use_buffers (port->node->node,
port->direction,
port->port,
NULL, 0);
port->buffers = NULL;
port->n_buffers = 0;
pinos_main_loop_defer_complete (this->core->main_loop,
port,
seq,
res);
return res;
}
static SpaResult
do_clear_buffers (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosPort *port = user_data;
PinosNode *this = port->node;
SpaResult res;
pinos_port_pause (port);
res = spa_poll_invoke (this->core->main_loop->poll,
do_clear_buffers_done,
seq,
0, NULL,
port);
return res;
}
SpaResult
pinos_port_clear_buffers (PinosPort *port)
{
SpaResult res;
PinosNodeImpl *impl = SPA_CONTAINER_OF (port->node, PinosNodeImpl, this);
pinos_log_debug ("port %p: clear buffers", port);
res = spa_poll_invoke (&port->node->data_loop->poll,
do_clear_buffers,
impl->seq++,
0, NULL,
port);
return res;
}
/**
* pinos_node_get_ports:
* @node: a #PinosNode
* @direction: a #PinosDirection
*
* Get the port in @node.
*
* Returns: a #GList of #PinosPort g_list_free after usage.
*/
GList *
pinos_node_get_ports (PinosNode *node, PinosDirection direction)
{
GList *ports;
PinosNodeImpl *impl = SPA_CONTAINER_OF (node, PinosNodeImpl, this);
g_return_val_if_fail (node, NULL);
if (direction == PINOS_DIRECTION_INPUT) {
ports = impl->input_ports;
} else {
ports = impl->output_ports;
}
return ports;
}
static void
remove_idle_timeout (PinosNode *node)
{
@ -998,12 +686,10 @@ on_state_complete (PinosNode *node,
{
PinosNodeState state = GPOINTER_TO_INT (data);
pinos_log_debug ("node %p: state complete %d", node, res);
if (SPA_RESULT_IS_ERROR (res)) {
GError *error = NULL;
g_set_error (&error,
PINOS_ERROR,
PINOS_ERROR_NODE_STATE,
"error changing node state: %d", res);
char *error;
asprintf (&error, "error changing node state: %d", res);
pinos_node_report_error (node, error);
} else
pinos_node_update_state (node, state);
@ -1060,8 +746,7 @@ pinos_node_set_state (PinosNode *node,
node,
res,
(PinosDeferFunc) on_state_complete,
GINT_TO_POINTER (state),
NULL);
GINT_TO_POINTER (state));
return res;
}
@ -1098,30 +783,30 @@ pinos_node_update_state (PinosNode *node,
/**
* pinos_node_report_error:
* @node: a #PinosNode
* @error: a #GError
* @error: an error message
*
* Report an error from within @node.
*/
void
pinos_node_report_error (PinosNode *node,
GError *error)
char *error)
{
PinosNodeImpl *impl = SPA_CONTAINER_OF (node, PinosNodeImpl, this);
PinosNodeState old;
g_return_if_fail (node);
g_clear_error (&impl->error);
free (node->error);
remove_idle_timeout (node);
impl->error = error;
node->error = error;
old = node->state;
node->state = PINOS_NODE_STATE_ERROR;
pinos_log_debug ("node %p: got error state %s", node, error->message);
pinos_log_debug ("node %p: got error state %s", node, error);
pinos_node1_set_state (impl->iface, PINOS_NODE_STATE_ERROR);
pinos_signal_emit (&node->core->node_state_changed, node, old, node->state);
}
static gboolean
static bool
idle_timeout (PinosNode *node)
{
PinosNodeImpl *impl = SPA_CONTAINER_OF (node, PinosNodeImpl, this);

View file

@ -20,14 +20,13 @@
#ifndef __PINOS_NODE_H__
#define __PINOS_NODE_H__
#include <glib-object.h>
G_BEGIN_DECLS
#ifdef __cplusplus
extern "C" {
#endif
#define PINOS_NODE_URI "http://pinos.org/ns/node"
#define PINOS_NODE_PREFIX PINOS_NODE_URI "#"
typedef struct _PinosPort PinosPort;
typedef struct _PinosNode PinosNode;
#include <spa/include/spa/node.h>
@ -36,36 +35,12 @@ typedef struct _PinosNode PinosNode;
#include <pinos/client/mem.h>
#include <pinos/client/transport.h>
#include <pinos/server/daemon.h>
#include <pinos/server/core.h>
#include <pinos/server/port.h>
#include <pinos/server/link.h>
#include <pinos/server/client.h>
#include <pinos/server/data-loop.h>
struct _PinosPort {
PINOS_SIGNAL (destroy_signal, (PinosListener *listener, PinosPort *));
PinosNode *node;
PinosDirection direction;
uint32_t port;
gboolean allocated;
PinosMemblock buffer_mem;
SpaBuffer **buffers;
guint n_buffers;
GPtrArray *links;
};
typedef struct {
uint32_t seq;
SpaResult res;
} PinosNodeAsyncCompleteData;
typedef struct {
PinosNodeState old;
PinosNodeState state;
} PinosNodeStateChangeData;
/**
* PinosNode:
*
@ -76,17 +51,26 @@ struct _PinosNode {
SpaList list;
PinosGlobal *global;
bool unlinking;
char *name;
PinosProperties *properties;
PinosNodeState state;
char *error;
SpaHandle *handle;
SpaNode *node;
bool live;
SpaClock *clock;
gboolean have_inputs;
gboolean have_outputs;
SpaList input_ports;
SpaList output_ports;
PinosPort **input_port_map;
PinosPort **output_port_map;
unsigned int n_used_output_links;
unsigned int n_used_input_links;
PinosTransport *transport;
PINOS_SIGNAL (transport_changed, (PinosListener *listener,
@ -104,13 +88,6 @@ struct _PinosNode {
PINOS_SIGNAL (loop_changed, (PinosListener *listener,
PinosNode *object));
PinosPort * (*get_free_port) (PinosNode *node,
PinosDirection direction);
PinosPort * (*enum_ports) (PinosNode *node,
PinosDirection direction,
void **state);
SpaResult (*set_state) (PinosNode *node,
PinosNodeState state);
};
PinosNode * pinos_node_new (PinosCore *core,
@ -129,27 +106,16 @@ PinosClient * pinos_node_get_client (PinosNode *node);
PinosPort * pinos_node_get_free_port (PinosNode *node,
PinosDirection direction);
GList * pinos_node_get_ports (PinosNode *node,
PinosDirection direction);
gboolean pinos_node_set_state (PinosNode *node, PinosNodeState state);
SpaResult pinos_node_set_state (PinosNode *node, PinosNodeState state);
void pinos_node_update_state (PinosNode *node, PinosNodeState state);
void pinos_node_report_error (PinosNode *node, GError *error);
void pinos_node_report_error (PinosNode *node, char *error);
void pinos_node_report_idle (PinosNode *node);
void pinos_node_report_busy (PinosNode *node);
PinosLink * pinos_port_link (PinosPort *output_port,
PinosPort *input_port,
GPtrArray *format_filter,
PinosProperties *properties,
GError **error);
SpaResult pinos_port_unlink (PinosPort *port,
PinosLink *link);
#ifdef __cplusplus
}
#endif
SpaResult pinos_port_clear_buffers (PinosPort *port);
G_END_DECLS
#endif /* __PINOS_NODE_H__ */

376
pinos/server/port.c Normal file
View file

@ -0,0 +1,376 @@
/* Pinos
* Copyright (C) 2015 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include "pinos/client/pinos.h"
#include "pinos/client/enumtypes.h"
#include "pinos/server/port.h"
typedef struct
{
PinosPort this;
uint32_t seq;
} PinosPortImpl;
PinosPort *
pinos_port_new (PinosNode *node,
PinosDirection direction,
uint32_t port_id)
{
PinosPortImpl *impl;
PinosPort *this;
impl = calloc (1, sizeof (PinosPortImpl));
this = &impl->this;
this->node = node;
this->direction = direction;
this->port_id = port_id;
spa_list_init (&this->links);
spa_list_init (&this->rt.links);
pinos_signal_init (&this->destroy_signal);
return this;
}
void
pinos_port_destroy (PinosPort *port)
{
pinos_log_debug ("port %p: destroy", port);
pinos_signal_emit (&port->destroy_signal, port);
spa_node_port_use_buffers (port->node->node,
port->direction,
port->port_id,
NULL, 0);
port->buffers = NULL;
port->n_buffers = 0;
spa_list_remove (&port->link);
free (port);
}
static SpaResult
do_add_link (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosPort *this = user_data;
PinosLink *link = ((PinosLink**)data)[0];
if (this->direction == PINOS_DIRECTION_INPUT) {
spa_list_insert (this->rt.links.prev, &link->rt.input_link);
link->rt.input = this;
}
else {
spa_list_insert (this->rt.links.prev, &link->rt.output_link);
link->rt.output = this;
}
return SPA_RESULT_OK;
}
static PinosLink *
find_link (PinosPort *output_port, PinosPort *input_port)
{
PinosLink *pl;
spa_list_for_each (pl, &output_port->links, output_link) {
if (pl->input == input_port)
return pl;
}
return NULL;
}
PinosLink *
pinos_port_get_link (PinosPort *output_port,
PinosPort *input_port)
{
return find_link (output_port, input_port);
}
/**
* pinos_port_link:
* @output_port: an output port
* @input_port: an input port
* @format_filter: a format filter
* @properties: extra properties
* @error: an error or %NULL
*
* Make a link between @output_port and @input_port
*
* If the ports were already linked, the existing links will be returned.
*
* Returns: a new #PinosLink or %NULL and @error is set.
*/
PinosLink *
pinos_port_link (PinosPort *output_port,
PinosPort *input_port,
SpaFormat **format_filter,
PinosProperties *properties,
char **error)
{
PinosNode *input_node, *output_node;
PinosLink *link;
output_node = output_port->node;
input_node = input_port->node;
pinos_log_debug ("port link %p:%u -> %p:%u", output_node, output_port->port_id, input_node, input_port->port_id);
if (output_node == input_node)
goto same_node;
if (!spa_list_is_empty (&input_port->links))
goto was_linked;
link = find_link (output_port, input_port);
if (link == NULL) {
input_node->live = output_node->live;
if (output_node->clock)
input_node->clock = output_node->clock;
pinos_log_debug ("node %p: clock %p, live %d", output_node, output_node->clock, output_node->live);
link = pinos_link_new (output_node->core,
output_port,
input_port,
format_filter,
properties);
spa_list_insert (output_port->links.prev, &link->output_link);
spa_list_insert (input_port->links.prev, &link->input_link);
output_node->n_used_output_links++;
input_node->n_used_input_links++;
spa_poll_invoke (&output_node->data_loop->poll,
do_add_link,
SPA_ID_INVALID,
sizeof (PinosLink *),
&link,
output_port);
spa_poll_invoke (&input_node->data_loop->poll,
do_add_link,
SPA_ID_INVALID,
sizeof (PinosLink *),
&link,
input_port);
}
return link;
same_node:
{
asprintf (error, "can't link a node to itself");
return NULL;
}
was_linked:
{
asprintf (error, "input port was already linked");
return NULL;
}
}
static SpaResult
pinos_port_pause (PinosPort *port)
{
SpaNodeCommand cmd;
cmd.type = SPA_NODE_COMMAND_PAUSE;
cmd.size = sizeof (cmd);
return spa_node_port_send_command (port->node->node,
port->direction,
port->port_id,
&cmd);
}
static SpaResult
do_remove_link_done (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosPort *port = user_data;
PinosNode *node = port->node;
PinosLink *link = ((PinosLink**)data)[0];
pinos_log_debug ("port %p: finish unlink", port);
if (port->direction == PINOS_DIRECTION_OUTPUT) {
if (link->output) {
spa_list_remove (&link->output_link);
node->n_used_output_links--;
link->output = NULL;
}
} else {
if (link->input) {
spa_list_remove (&link->input_link);
node->n_used_input_links--;
link->input = NULL;
}
}
if (node->n_used_output_links == 0 &&
node->n_used_input_links == 0) {
pinos_node_report_idle (node);
}
if (!port->allocated) {
pinos_log_debug ("port %p: clear buffers on port", port);
spa_node_port_use_buffers (port->node->node,
port->direction,
port->port_id,
NULL, 0);
port->buffers = NULL;
port->n_buffers = 0;
}
pinos_main_loop_defer_complete (node->core->main_loop,
port,
seq,
SPA_RESULT_OK);
return SPA_RESULT_OK;
}
static SpaResult
do_remove_link (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosPort *port = user_data;
PinosNode *this = port->node;
PinosLink *link = ((PinosLink**)data)[0];
SpaResult res;
if (port->direction == PINOS_DIRECTION_INPUT) {
spa_list_remove (&link->rt.input_link);
link->rt.input = NULL;
} else {
spa_list_remove (&link->rt.output_link);
link->rt.output = NULL;
}
#if 0
if (spa_list_is_empty (&port->rt.links))
pinos_port_pause (port);
#endif
res = spa_poll_invoke (this->core->main_loop->poll,
do_remove_link_done,
seq,
sizeof (PinosLink *),
&link,
port);
return res;
}
SpaResult
pinos_port_unlink (PinosPort *port, PinosLink *link)
{
SpaResult res;
PinosPortImpl *impl = SPA_CONTAINER_OF (port, PinosPortImpl, this);
pinos_log_debug ("port %p: start unlink %p", port, link);
res = spa_poll_invoke (&port->node->data_loop->poll,
do_remove_link,
impl->seq++,
sizeof (PinosLink *),
&link,
port);
return res;
}
static SpaResult
do_clear_buffers_done (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosPort *port = user_data;
PinosNode *node = port->node;
SpaResult res;
pinos_log_debug ("port %p: clear buffers finish", port);
res = spa_node_port_use_buffers (port->node->node,
port->direction,
port->port_id,
NULL, 0);
port->buffers = NULL;
port->n_buffers = 0;
pinos_main_loop_defer_complete (node->core->main_loop,
port,
seq,
res);
return res;
}
static SpaResult
do_clear_buffers (SpaPoll *poll,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosPort *port = user_data;
PinosNode *node = port->node;
SpaResult res;
pinos_port_pause (port);
res = spa_poll_invoke (node->core->main_loop->poll,
do_clear_buffers_done,
seq,
0, NULL,
port);
return res;
}
SpaResult
pinos_port_clear_buffers (PinosPort *port)
{
SpaResult res;
PinosPortImpl *impl = SPA_CONTAINER_OF (port, PinosPortImpl, this);
pinos_log_debug ("port %p: clear buffers", port);
res = spa_poll_invoke (&port->node->data_loop->poll,
do_clear_buffers,
impl->seq++,
0, NULL,
port);
return res;
}

82
pinos/server/port.h Normal file
View file

@ -0,0 +1,82 @@
/* Pinos
* Copyright (C) 2015 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __PINOS_PORT_H__
#define __PINOS_PORT_H__
#ifdef __cplusplus
extern "C" {
#endif
#define PINOS_PORT_URI "http://pinos.org/ns/port"
#define PINOS_PORT_PREFIX PINOS_PORT_URI "#"
typedef struct _PinosPort PinosPort;
#include <spa/include/spa/node.h>
#include <pinos/client/introspect.h>
#include <pinos/client/mem.h>
#include <pinos/server/core.h>
#include <pinos/server/link.h>
struct _PinosPort {
SpaList link;
PINOS_SIGNAL (destroy_signal, (PinosListener *listener, PinosPort *));
PinosNode *node;
PinosDirection direction;
uint32_t port_id;
bool allocated;
PinosMemblock buffer_mem;
SpaBuffer **buffers;
unsigned int n_buffers;
SpaList links;
struct {
SpaList links;
} rt;
};
PinosPort * pinos_port_new (PinosNode *node,
PinosDirection direction,
uint32_t port_id);
void pinos_port_destroy (PinosPort *port);
PinosLink * pinos_port_link (PinosPort *output_port,
PinosPort *input_port,
SpaFormat **format_filter,
PinosProperties *properties,
char **error);
SpaResult pinos_port_unlink (PinosPort *port,
PinosLink *link);
SpaResult pinos_port_clear_buffers (PinosPort *port);
#ifdef __cplusplus
}
#endif
#endif /* __PINOS_PORT_H__ */

View file

@ -270,6 +270,7 @@ do_pause (SpaPoll *poll,
0,
cmd);
if (async) {
ac.event.type = SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE;
ac.event.size = sizeof (SpaNodeEventAsyncComplete);
ac.seq = seq;
@ -280,7 +281,8 @@ do_pause (SpaPoll *poll,
sizeof (ac),
&ac,
this);
return SPA_RESULT_OK;
}
return res;
}
static SpaResult
@ -322,6 +324,7 @@ do_start (SpaPoll *poll,
0,
cmd);
if (async) {
ac.event.type = SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE;
ac.event.size = sizeof (SpaNodeEventAsyncComplete);
ac.seq = seq;
@ -332,6 +335,7 @@ do_start (SpaPoll *poll,
sizeof (ac),
&ac,
this);
}
return SPA_RESULT_OK;
}