use interfaces in client

This commit is contained in:
Wim Taymans 2017-03-03 17:43:23 +01:00
parent e0813b679d
commit b9a0b067be
10 changed files with 1163 additions and 842 deletions

View file

@ -307,33 +307,29 @@ static void
add_node_update (PinosStream *stream, uint32_t change_mask, bool flush)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosMessageNodeUpdate nu = { 0, };
unsigned int max_input_ports = 0, max_output_ports = 0;
nu.change_mask = change_mask;
if (change_mask & PINOS_MESSAGE_NODE_UPDATE_MAX_INPUTS)
nu.max_input_ports = impl->direction == SPA_DIRECTION_INPUT ? 1 : 0;
max_input_ports = impl->direction == SPA_DIRECTION_INPUT ? 1 : 0;
if (change_mask & PINOS_MESSAGE_NODE_UPDATE_MAX_OUTPUTS)
nu.max_output_ports = impl->direction == SPA_DIRECTION_OUTPUT ? 1 : 0;
nu.props = NULL;
max_output_ports = impl->direction == SPA_DIRECTION_OUTPUT ? 1 : 0;
pinos_proxy_send_message (impl->node_proxy,
PINOS_MESSAGE_NODE_UPDATE,
&nu,
flush);
pinos_client_node_do_update (impl->node_proxy,
change_mask,
max_input_ports,
max_output_ports,
NULL);
}
static void
add_state_change (PinosStream *stream, SpaNodeState state, bool flush)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosMessageNodeStateChange sc;
if (impl->node_state != state) {
sc.state = impl->node_state = state;
pinos_proxy_send_message (impl->node_proxy,
PINOS_MESSAGE_NODE_STATE_CHANGE,
&sc,
flush);
impl->node_state = state;
pinos_client_node_do_state_change (impl->node_proxy,
state);
}
}
@ -341,26 +337,16 @@ static void
add_port_update (PinosStream *stream, uint32_t change_mask, bool flush)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosMessagePortUpdate pu = { 0, };;
pu.direction = impl->direction;
pu.port_id = impl->port_id;
pu.change_mask = change_mask;
if (change_mask & PINOS_MESSAGE_PORT_UPDATE_POSSIBLE_FORMATS) {
pu.n_possible_formats = impl->n_possible_formats;
pu.possible_formats = impl->possible_formats;
}
if (change_mask & PINOS_MESSAGE_PORT_UPDATE_FORMAT) {
pu.format = impl->format;
}
pu.props = NULL;
if (change_mask & PINOS_MESSAGE_PORT_UPDATE_INFO) {
pu.info = &impl->port_info;
}
pinos_proxy_send_message (impl->node_proxy,
PINOS_MESSAGE_PORT_UPDATE,
&pu,
flush);
pinos_client_node_do_port_update (impl->node_proxy,
impl->direction,
impl->port_id,
change_mask,
impl->n_possible_formats,
impl->possible_formats,
impl->format,
NULL,
&impl->port_info);
}
static inline void
@ -401,19 +387,15 @@ static void
add_request_clock_update (PinosStream *stream, bool flush)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosMessageNodeEvent cne;
SpaNodeEventRequestClockUpdate rcu;
cne.event = &rcu.event;
rcu.event.type = SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE;
rcu.event.size = sizeof (rcu);
rcu.update_mask = SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_TIME;
rcu.timestamp = 0;
rcu.offset = 0;
pinos_proxy_send_message (impl->node_proxy,
PINOS_MESSAGE_NODE_EVENT,
&cne,
flush);
pinos_client_node_do_event (impl->node_proxy,
&rcu.event);
}
static void
@ -423,18 +405,14 @@ add_async_complete (PinosStream *stream,
bool flush)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosMessageNodeEvent cne;
SpaNodeEventAsyncComplete ac;
cne.event = &ac.event;
ac.event.type = SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE;
ac.event.size = sizeof (ac);
ac.seq = seq;
ac.res = res;
pinos_proxy_send_message (impl->node_proxy,
PINOS_MESSAGE_NODE_EVENT,
&cne,
flush);
pinos_client_node_do_event (impl->node_proxy,
&ac.event);
}
static void
@ -678,220 +656,283 @@ handle_node_command (PinosStream *stream,
return true;
}
static SpaResult
stream_dispatch_func (void *object,
PinosMessageType type,
void *message,
void *data)
static void
client_node_done (void *object,
uint32_t seq,
int datafd)
{
PinosStream *stream = data;
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
switch (type) {
case PINOS_MESSAGE_CREATE_NODE_DONE:
pinos_log_warn ("create node done %d", type);
break;
case PINOS_MESSAGE_CREATE_CLIENT_NODE_DONE:
{
PinosMessageCreateClientNodeDone *cnd = message;
pinos_log_warn ("create client node done %d", type);
handle_socket (stream, cnd->datafd);
do_node_init (stream);
break;
}
case PINOS_MESSAGE_ADD_PORT:
case PINOS_MESSAGE_REMOVE_PORT:
pinos_log_warn ("add/remove port not supported");
break;
case PINOS_MESSAGE_SET_FORMAT:
{
PinosMessageSetFormat *p = message;
void *mem;
if (impl->format)
free (impl->format);
mem = malloc (pinos_serialize_format_get_size (p->format));
impl->format = pinos_serialize_format_copy_into (mem, p->format);
impl->pending_seq = p->seq;
pinos_signal_emit (&stream->format_changed, stream, impl->format);
stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
break;
}
case PINOS_MESSAGE_SET_PROPERTY:
pinos_log_warn ("set property not implemented");
break;
case PINOS_MESSAGE_ADD_MEM:
{
PinosMessageAddMem *p = message;
MemId *m;
m = find_mem (stream, p->mem_id);
if (m) {
pinos_log_debug ("update mem %u, fd %d, flags %d, off %zd, size %zd",
p->mem_id, p->memfd, p->flags, p->offset, p->size);
clear_memid (m);
} else {
m = pinos_array_add (&impl->mem_ids, sizeof (MemId));
pinos_log_debug ("add mem %u, fd %d, flags %d, off %zd, size %zd",
p->mem_id, p->memfd, p->flags, p->offset, p->size);
}
m->id = p->mem_id;
m->fd = p->memfd;
m->flags = p->flags;
m->ptr = NULL;
m->offset = p->offset;
m->size = p->size;
break;
}
case PINOS_MESSAGE_USE_BUFFERS:
{
PinosMessageUseBuffers *p = message;
BufferId *bid;
unsigned int i, j, len;
SpaBuffer *b;
/* clear previous buffers */
clear_buffers (stream);
for (i = 0; i < p->n_buffers; i++) {
off_t offset = 0;
MemId *mid = find_mem (stream, p->buffers[i].mem_id);
if (mid == NULL) {
pinos_log_warn ("unknown memory id %u", p->buffers[i].mem_id);
continue;
}
if (mid->ptr == NULL) {
mid->ptr = mmap (NULL, mid->size + mid->offset, PROT_READ | PROT_WRITE, MAP_SHARED, mid->fd, 0);
if (mid->ptr == MAP_FAILED) {
mid->ptr = NULL;
pinos_log_warn ("Failed to mmap memory %zd %p: %s", mid->size, mid, strerror (errno));
continue;
}
mid->ptr = SPA_MEMBER (mid->ptr, mid->offset, void);
}
len = pinos_array_get_len (&impl->buffer_ids, BufferId);
bid = pinos_array_add (&impl->buffer_ids, sizeof (BufferId));
bid->used = false;
b = p->buffers[i].buffer;
bid->buf_ptr = SPA_MEMBER (mid->ptr, p->buffers[i].offset, void);
{
size_t size;
size = pinos_serialize_buffer_get_size (p->buffers[i].buffer);
b = bid->buf = malloc (size);
pinos_serialize_buffer_copy_into (b, p->buffers[i].buffer);
}
bid->id = b->id;
if (bid->id != len) {
pinos_log_warn ("unexpected id %u found, expected %u", bid->id, len);
impl->in_order = false;
}
pinos_log_debug ("add buffer %d %d %zd", mid->id, bid->id, p->buffers[i].offset);
for (j = 0; j < b->n_metas; j++) {
SpaMeta *m = &b->metas[j];
m->data = SPA_MEMBER (bid->buf_ptr, offset, void);
offset += m->size;
}
for (j = 0; j < b->n_datas; j++) {
SpaData *d = &b->datas[j];
d->chunk = SPA_MEMBER (bid->buf_ptr, offset + sizeof (SpaChunk) * j, SpaChunk);
switch (d->type) {
case SPA_DATA_TYPE_ID:
{
MemId *bmid = find_mem (stream, SPA_PTR_TO_UINT32 (d->data));
d->type = SPA_DATA_TYPE_MEMFD;
d->data = NULL;
d->fd = bmid->fd;
pinos_log_debug (" data %d %u -> fd %d", j, bmid->id, bmid->fd);
break;
}
case SPA_DATA_TYPE_MEMPTR:
{
d->data = SPA_MEMBER (bid->buf_ptr, SPA_PTR_TO_INT (d->data), void);
d->fd = -1;
pinos_log_debug (" data %d %u -> mem %p", j, bid->id, d->data);
break;
}
default:
pinos_log_warn ("unknown buffer data type %d", d->type);
break;
}
}
pinos_signal_emit (&stream->add_buffer, stream, bid->id);
}
if (p->n_buffers) {
add_state_change (stream, SPA_NODE_STATE_PAUSED, false);
} else {
clear_mems (stream);
add_state_change (stream, SPA_NODE_STATE_READY, false);
}
add_async_complete (stream, p->seq, SPA_RESULT_OK, true);
if (p->n_buffers)
stream_set_state (stream, PINOS_STREAM_STATE_PAUSED, NULL);
else
stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
break;
}
case PINOS_MESSAGE_NODE_EVENT:
{
PinosMessageNodeEvent *p = message;
handle_node_event (stream, p->event);
break;
}
case PINOS_MESSAGE_NODE_COMMAND:
{
PinosMessageNodeCommand *p = message;
handle_node_command (stream, p->seq, p->command);
break;
}
case PINOS_MESSAGE_PORT_COMMAND:
{
break;
}
case PINOS_MESSAGE_TRANSPORT_UPDATE:
{
PinosMessageTransportUpdate *p = message;
PinosTransportInfo info;
info.memfd = p->memfd;
if (info.memfd == -1)
break;
info.offset = p->offset;
info.size = p->size;
if (impl->trans)
pinos_transport_destroy (impl->trans);
impl->trans = pinos_transport_new_from_info (&info);
pinos_log_debug ("transport update %d %p", impl->rtfd, impl->trans);
break;
}
default:
case PINOS_MESSAGE_INVALID:
pinos_log_warn ("unhandled message %d", type);
break;
}
return SPA_RESULT_OK;
pinos_log_warn ("create client node done");
handle_socket (stream, datafd);
do_node_init (stream);
}
static void
client_node_event (void *object,
SpaNodeEvent *event)
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
handle_node_event (stream, event);
}
static void
client_node_add_port (void *object,
uint32_t seq,
SpaDirection direction,
uint32_t port_id)
{
pinos_log_warn ("add port not supported");
}
static void
client_node_remove_port (void *object,
uint32_t seq,
SpaDirection direction,
uint32_t port_id)
{
pinos_log_warn ("remove port not supported");
}
static void
client_node_set_format (void *object,
uint32_t seq,
SpaDirection direction,
uint32_t port_id,
SpaPortFormatFlags flags,
const SpaFormat *format)
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
void *mem;
if (impl->format)
free (impl->format);
mem = malloc (pinos_serialize_format_get_size (format));
impl->format = pinos_serialize_format_copy_into (mem, format);
impl->pending_seq = seq;
pinos_signal_emit (&stream->format_changed, stream, impl->format);
stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
}
static void
client_node_set_property (void *object,
uint32_t seq,
uint32_t id,
size_t size,
void *value)
{
pinos_log_warn ("set property not implemented");
}
static void
client_node_add_mem (void *object,
SpaDirection direction,
uint32_t port_id,
uint32_t mem_id,
SpaDataType type,
int memfd,
uint32_t flags,
off_t offset,
size_t size)
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
MemId *m;
m = find_mem (stream, mem_id);
if (m) {
pinos_log_debug ("update mem %u, fd %d, flags %d, off %zd, size %zd",
mem_id, memfd, flags, offset, size);
clear_memid (m);
} else {
m = pinos_array_add (&impl->mem_ids, sizeof (MemId));
pinos_log_debug ("add mem %u, fd %d, flags %d, off %zd, size %zd",
mem_id, memfd, flags, offset, size);
}
m->id = mem_id;
m->fd = memfd;
m->flags = flags;
m->ptr = NULL;
m->offset = offset;
m->size = size;
}
static void
client_node_use_buffers (void *object,
uint32_t seq,
SpaDirection direction,
uint32_t port_id,
unsigned int n_buffers,
PinosClientNodeBuffer *buffers)
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
BufferId *bid;
unsigned int i, j, len;
SpaBuffer *b;
/* clear previous buffers */
clear_buffers (stream);
for (i = 0; i < n_buffers; i++) {
off_t offset = 0;
MemId *mid = find_mem (stream, buffers[i].mem_id);
if (mid == NULL) {
pinos_log_warn ("unknown memory id %u", buffers[i].mem_id);
continue;
}
if (mid->ptr == NULL) {
mid->ptr = mmap (NULL, mid->size + mid->offset, PROT_READ | PROT_WRITE, MAP_SHARED, mid->fd, 0);
if (mid->ptr == MAP_FAILED) {
mid->ptr = NULL;
pinos_log_warn ("Failed to mmap memory %zd %p: %s", mid->size, mid, strerror (errno));
continue;
}
mid->ptr = SPA_MEMBER (mid->ptr, mid->offset, void);
}
len = pinos_array_get_len (&impl->buffer_ids, BufferId);
bid = pinos_array_add (&impl->buffer_ids, sizeof (BufferId));
bid->used = false;
b = buffers[i].buffer;
bid->buf_ptr = SPA_MEMBER (mid->ptr, buffers[i].offset, void);
{
size_t size;
size = pinos_serialize_buffer_get_size (buffers[i].buffer);
b = bid->buf = malloc (size);
pinos_serialize_buffer_copy_into (b, buffers[i].buffer);
}
bid->id = b->id;
if (bid->id != len) {
pinos_log_warn ("unexpected id %u found, expected %u", bid->id, len);
impl->in_order = false;
}
pinos_log_debug ("add buffer %d %d %zd", mid->id, bid->id, buffers[i].offset);
for (j = 0; j < b->n_metas; j++) {
SpaMeta *m = &b->metas[j];
m->data = SPA_MEMBER (bid->buf_ptr, offset, void);
offset += m->size;
}
for (j = 0; j < b->n_datas; j++) {
SpaData *d = &b->datas[j];
d->chunk = SPA_MEMBER (bid->buf_ptr, offset + sizeof (SpaChunk) * j, SpaChunk);
switch (d->type) {
case SPA_DATA_TYPE_ID:
{
MemId *bmid = find_mem (stream, SPA_PTR_TO_UINT32 (d->data));
d->type = SPA_DATA_TYPE_MEMFD;
d->data = NULL;
d->fd = bmid->fd;
pinos_log_debug (" data %d %u -> fd %d", j, bmid->id, bmid->fd);
break;
}
case SPA_DATA_TYPE_MEMPTR:
{
d->data = SPA_MEMBER (bid->buf_ptr, SPA_PTR_TO_INT (d->data), void);
d->fd = -1;
pinos_log_debug (" data %d %u -> mem %p", j, bid->id, d->data);
break;
}
default:
pinos_log_warn ("unknown buffer data type %d", d->type);
break;
}
}
pinos_signal_emit (&stream->add_buffer, stream, bid->id);
}
if (n_buffers) {
add_state_change (stream, SPA_NODE_STATE_PAUSED, false);
} else {
clear_mems (stream);
add_state_change (stream, SPA_NODE_STATE_READY, false);
}
add_async_complete (stream, seq, SPA_RESULT_OK, true);
if (n_buffers)
stream_set_state (stream, PINOS_STREAM_STATE_PAUSED, NULL);
else
stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
}
static void
client_node_node_command (void *object,
uint32_t seq,
SpaNodeCommand *command)
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
handle_node_command (stream, seq, command);
}
static void
client_node_port_command (void *object,
uint32_t port_id,
SpaNodeCommand *command)
{
pinos_log_warn ("port command not supported");
}
static void
client_node_transport (void *object,
int memfd,
off_t offset,
size_t size)
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosTransportInfo info;
info.memfd = memfd;
if (info.memfd == -1)
return;
info.offset = offset;
info.size = size;
if (impl->trans)
pinos_transport_destroy (impl->trans);
impl->trans = pinos_transport_new_from_info (&info);
pinos_log_debug ("transport update %d %p", impl->rtfd, impl->trans);
}
static const PinosClientNodeEvent client_node_events = {
&client_node_done,
&client_node_event,
&client_node_add_port,
&client_node_remove_port,
&client_node_set_format,
&client_node_set_property,
&client_node_add_mem,
&client_node_use_buffers,
&client_node_node_command,
&client_node_port_command,
&client_node_transport
};
typedef void (*MarshallFunc) (void *object, void *data, size_t size);
extern const PinosClientNodeInterface client_node_interface;
extern const MarshallFunc client_node_marshall[];
static void
on_node_proxy_destroy (PinosListener *listener,
PinosProxy *proxy)
@ -932,7 +973,6 @@ pinos_stream_connect (PinosStream *stream,
SpaFormat **possible_formats)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosMessageCreateClientNode ccn;
impl->direction = direction == PINOS_DIRECTION_INPUT ? SPA_DIRECTION_INPUT : SPA_DIRECTION_OUTPUT;
impl->port_id = 0;
@ -961,19 +1001,16 @@ pinos_stream_connect (PinosStream *stream,
&impl->node_proxy_destroy,
on_node_proxy_destroy);
pinos_proxy_set_dispatch (impl->node_proxy,
stream_dispatch_func,
impl);
impl->node_proxy->user_data = stream;
impl->node_proxy->interface = &client_node_interface;
impl->node_proxy->event = &client_node_events;
impl->node_proxy->marshall = &client_node_marshall;
ccn.seq = ++impl->seq;
ccn.name = "client-node";
ccn.props = &stream->properties->dict;
ccn.new_id = impl->node_proxy->id;
pinos_proxy_send_message (stream->context->core_proxy,
PINOS_MESSAGE_CREATE_CLIENT_NODE,
&ccn,
true);
pinos_core_do_create_client_node (stream->context->core_proxy,
++impl->seq,
"client-node",
&stream->properties->dict,
impl->node_proxy->id);
return true;
}
@ -1066,17 +1103,13 @@ bool
pinos_stream_disconnect (PinosStream *stream)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosMessageDestroy msg;
impl->disconnecting = true;
unhandle_socket (stream);
msg.seq = ++impl->seq;
pinos_proxy_send_message (impl->node_proxy,
PINOS_MESSAGE_DESTROY,
&msg,
true);
pinos_client_node_do_destroy (impl->node_proxy,
++impl->seq);
return true;
}