pipewire/pinos/client/stream.c

1138 lines
31 KiB
C
Raw Normal View History

2015-06-30 18:06:36 +02:00
/* Pinos
2015-04-16 16:58:33 +02:00
* Copyright (C) 2015 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include <unistd.h>
#include <sys/socket.h>
#include <string.h>
#include <sys/mman.h>
#include <errno.h>
#include "spa/lib/debug.h"
#include "pinos/client/pinos.h"
#include "pinos/client/array.h"
#include "pinos/client/connection.h"
#include "pinos/client/context.h"
#include "pinos/client/stream.h"
#include "pinos/client/format.h"
#include "pinos/client/serialize.h"
#include "pinos/client/transport.h"
#define MAX_BUFFER_SIZE 4096
#define MAX_FDS 32
#define MAX_INPUTS 64
#define MAX_OUTPUTS 64
typedef struct {
uint32_t id;
int fd;
uint32_t flags;
void *ptr;
size_t size;
} MemId;
typedef struct {
uint32_t id;
bool used;
void *buf_ptr;
SpaBuffer *buf;
SpaData *datas;
} BufferId;
typedef struct
2015-04-16 16:58:33 +02:00
{
PinosStream this;
2015-05-27 18:16:52 +02:00
SpaNodeState node_state;
2015-06-04 16:34:47 +02:00
uint32_t seq;
2015-04-16 16:58:33 +02:00
unsigned int n_possible_formats;
SpaFormat **possible_formats;
2015-05-27 18:16:52 +02:00
SpaFormat *format;
SpaPortInfo port_info;
SpaDirection direction;
uint32_t port_id;
uint32_t pending_seq;
PinosStreamFlags flags;
2015-05-27 18:16:52 +02:00
bool disconnecting;
2015-04-16 16:58:33 +02:00
PinosStreamMode mode;
int rtfd;
SpaSource *rtsocket_source;
PinosProxy *node_proxy;
PinosTransport *trans;
SpaSource *timeout_source;
PinosArray mem_ids;
PinosArray buffer_ids;
bool in_order;
2015-04-16 16:58:33 +02:00
int64_t last_ticks;
int32_t last_rate;
int64_t last_monotonic;
} PinosStreamImpl;
2015-04-16 16:58:33 +02:00
static void
clear_buffers (PinosStream *stream)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
BufferId *bid;
pinos_array_for_each (bid, &impl->buffer_ids) {
pinos_signal_emit (&stream->remove_buffer, stream, bid->id);
bid->buf = NULL;
}
impl->buffer_ids.size = 0;
impl->in_order = true;
}
2015-06-04 16:34:47 +02:00
static void
stream_set_state (PinosStream *stream,
2015-08-26 17:11:10 +02:00
PinosStreamState state,
char *error)
{
if (stream->state != state) {
if (stream->error)
free (stream->error);
stream->error = error;
stream->state = state;
pinos_signal_emit (&stream->state_changed, stream);
}
}
/**
* pinos_stream_state_as_string:
* @state: a #PinosStreamState
*
* Return the string representation of @state.
*
* Returns: the string representation of @state.
*/
const char *
pinos_stream_state_as_string (PinosStreamState state)
{
switch (state) {
case PINOS_STREAM_STATE_ERROR:
return "error";
case PINOS_STREAM_STATE_UNCONNECTED:
return "unconnected";
case PINOS_STREAM_STATE_CONNECTING:
return "connecting";
case PINOS_STREAM_STATE_CONFIGURE:
return "configure";
case PINOS_STREAM_STATE_READY:
return "ready";
case PINOS_STREAM_STATE_PAUSED:
return "paused";
case PINOS_STREAM_STATE_STREAMING:
return "streaming";
}
return "invalid-state";
}
2015-04-16 16:58:33 +02:00
/**
* pinos_stream_new:
* @context: a #PinosContext
2015-04-16 16:58:33 +02:00
* @name: a stream name
2015-08-26 15:44:29 +02:00
* @properties: (transfer full): stream properties
2015-04-16 16:58:33 +02:00
*
* Make a new unconnected #PinosStream
2015-04-16 16:58:33 +02:00
*
* Returns: a new unconnected #PinosStream
2015-04-16 16:58:33 +02:00
*/
PinosStream *
pinos_stream_new (PinosContext *context,
const char *name,
PinosProperties *props)
2015-04-16 16:58:33 +02:00
{
PinosStreamImpl *impl;
PinosStream *this;
impl = calloc (1, sizeof (PinosStreamImpl));
this = &impl->this;
pinos_log_debug ("stream %p: new", impl);
2015-08-26 15:44:29 +02:00
this->context = context;
this->name = strdup (name);
2015-04-16 16:58:33 +02:00
if (props == NULL) {
props = pinos_properties_new ("media.name", name, NULL);
} else if (!pinos_properties_get (props, "media.name")) {
pinos_properties_set (props, "media.name", name);
}
this->properties = props;
pinos_signal_init (&this->destroy_signal);
pinos_signal_init (&this->state_changed);
pinos_signal_init (&this->format_changed);
pinos_signal_init (&this->add_buffer);
pinos_signal_init (&this->remove_buffer);
pinos_signal_init (&this->new_buffer);
2015-08-26 15:44:29 +02:00
this->state = PINOS_STREAM_STATE_UNCONNECTED;
2015-08-26 15:44:29 +02:00
impl->node_state = SPA_NODE_STATE_INIT;
pinos_array_init (&impl->mem_ids);
pinos_array_ensure_size (&impl->mem_ids, sizeof (MemId) * 64);
pinos_array_init (&impl->buffer_ids);
pinos_array_ensure_size (&impl->buffer_ids, sizeof (BufferId) * 64);
impl->pending_seq = SPA_ID_INVALID;
2015-04-16 16:58:33 +02:00
spa_list_insert (&context->stream_list, &this->link);
2015-04-16 16:58:33 +02:00
return this;
2015-04-16 16:58:33 +02:00
}
void
pinos_stream_destroy (PinosStream *stream)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
pinos_log_debug ("stream %p: destroy", stream);
pinos_signal_emit (&stream->destroy_signal, stream);
spa_list_remove (&stream->link);
if (impl->format)
free (impl->format);
if (stream->error)
free (stream->error);
pinos_array_clear (&impl->buffer_ids);
pinos_array_clear (&impl->mem_ids);
if (stream->properties)
pinos_properties_free (stream->properties);
if (stream->name)
free (stream->name);
free (impl);
}
static void
add_node_update (PinosStream *stream, uint32_t change_mask, bool flush)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosMessageNodeUpdate nu = { 0, };
nu.change_mask = change_mask;
if (change_mask & PINOS_MESSAGE_NODE_UPDATE_MAX_INPUTS)
nu.max_input_ports = impl->direction == SPA_DIRECTION_INPUT ? 1 : 0;
if (change_mask & PINOS_MESSAGE_NODE_UPDATE_MAX_OUTPUTS)
nu.max_output_ports = impl->direction == SPA_DIRECTION_OUTPUT ? 1 : 0;
nu.props = NULL;
pinos_proxy_send_message (impl->node_proxy,
PINOS_MESSAGE_NODE_UPDATE,
&nu,
flush);
}
static void
add_state_change (PinosStream *stream, SpaNodeState state, bool flush)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosMessageNodeStateChange sc;
if (impl->node_state != state) {
sc.state = impl->node_state = state;
pinos_proxy_send_message (impl->node_proxy,
PINOS_MESSAGE_NODE_STATE_CHANGE,
&sc,
flush);
2016-09-26 12:15:52 +02:00
}
}
static void
add_port_update (PinosStream *stream, uint32_t change_mask, bool flush)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosMessagePortUpdate pu = { 0, };;
pu.direction = impl->direction;
pu.port_id = impl->port_id;
pu.change_mask = change_mask;
if (change_mask & PINOS_MESSAGE_PORT_UPDATE_POSSIBLE_FORMATS) {
pu.n_possible_formats = impl->n_possible_formats;
pu.possible_formats = impl->possible_formats;
}
if (change_mask & PINOS_MESSAGE_PORT_UPDATE_FORMAT) {
pu.format = impl->format;
}
pu.props = NULL;
if (change_mask & PINOS_MESSAGE_PORT_UPDATE_INFO) {
pu.info = &impl->port_info;
}
pinos_proxy_send_message (impl->node_proxy,
PINOS_MESSAGE_PORT_UPDATE,
&pu,
flush);
}
static inline void
send_need_input (PinosStream *stream)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
uint8_t cmd = PINOS_TRANSPORT_CMD_NEED_DATA;
write (impl->rtfd, &cmd, 1);
}
static void
add_request_clock_update (PinosStream *stream, bool flush)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosMessageNodeEvent cne;
SpaNodeEventRequestClockUpdate rcu;
2016-10-24 15:30:15 +02:00
cne.event = &rcu.event;
rcu.event.type = SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE;
rcu.event.size = sizeof (rcu);
rcu.update_mask = SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_TIME;
rcu.timestamp = 0;
rcu.offset = 0;
pinos_proxy_send_message (impl->node_proxy,
PINOS_MESSAGE_NODE_EVENT,
&cne,
flush);
}
static void
add_async_complete (PinosStream *stream,
uint32_t seq,
SpaResult res,
bool flush)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosMessageNodeEvent cne;
SpaNodeEventAsyncComplete ac;
2016-10-24 15:30:15 +02:00
cne.event = &ac.event;
ac.event.type = SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE;
ac.event.size = sizeof (ac);
ac.seq = seq;
ac.res = res;
pinos_proxy_send_message (impl->node_proxy,
PINOS_MESSAGE_NODE_EVENT,
&cne,
flush);
}
static void
do_node_init (PinosStream *stream)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
add_node_update (stream, PINOS_MESSAGE_NODE_UPDATE_MAX_INPUTS |
PINOS_MESSAGE_NODE_UPDATE_MAX_OUTPUTS,
false);
impl->port_info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
add_port_update (stream, PINOS_MESSAGE_PORT_UPDATE_POSSIBLE_FORMATS |
PINOS_MESSAGE_PORT_UPDATE_INFO,
false);
add_state_change (stream, SPA_NODE_STATE_CONFIGURE, true);
}
static void
on_timeout (SpaSource *source,
void *data)
{
PinosStream *stream = data;
add_request_clock_update (stream, true);
}
static MemId *
find_mem (PinosStream *stream, uint32_t id)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
MemId *mid;
pinos_array_for_each (mid, &impl->mem_ids) {
if (mid->id == id)
return mid;
}
return NULL;
}
static BufferId *
find_buffer (PinosStream *stream, uint32_t id)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
if (impl->in_order && pinos_array_check_index (&impl->buffer_ids, id, BufferId)) {
return pinos_array_get_unchecked (&impl->buffer_ids, id, BufferId);
} else {
BufferId *bid;
pinos_array_for_each (bid, &impl->buffer_ids) {
if (bid->id == id)
return bid;
}
}
return NULL;
}
static void
handle_rtnode_event (PinosStream *stream,
SpaNodeEvent *event)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
switch (event->type) {
case SPA_NODE_EVENT_TYPE_INVALID:
case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE:
case SPA_NODE_EVENT_TYPE_ERROR:
case SPA_NODE_EVENT_TYPE_BUFFERING:
case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH:
case SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE:
pinos_log_warn ("unexpected node event %d", event->type);
break;
case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT:
case SPA_NODE_EVENT_TYPE_NEED_INPUT:
pinos_log_warn ("unhandled node event %d", event->type);
break;
case SPA_NODE_EVENT_TYPE_REUSE_BUFFER:
{
SpaNodeEventReuseBuffer *p = (SpaNodeEventReuseBuffer *) event;
BufferId *bid;
if (p->port_id != impl->port_id)
break;
if (impl->direction != SPA_DIRECTION_OUTPUT)
break;
if ((bid = find_buffer (stream, p->buffer_id))) {
bid->used = FALSE;
pinos_signal_emit (&stream->new_buffer, stream, p->buffer_id);
}
break;
}
}
}
static void
on_rtsocket_condition (SpaSource *source,
int fd,
SpaIO mask,
void *data)
{
PinosStream *stream = data;
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
if (mask & (SPA_IO_ERR | SPA_IO_HUP)) {
pinos_log_warn ("got error");
}
if (mask & SPA_IO_IN) {
uint8_t cmd;
int i;
read (impl->rtfd, &cmd, 1);
if (cmd & PINOS_TRANSPORT_CMD_HAVE_DATA) {
BufferId *bid;
for (i = 0; i < impl->trans->area->n_inputs; i++) {
SpaPortInput *input = &impl->trans->inputs[i];
if (input->buffer_id == SPA_ID_INVALID)
continue;
if ((bid = find_buffer (stream, input->buffer_id))) {
for (i = 0; i < bid->buf->n_datas; i++) {
bid->buf->datas[i].size = bid->datas[i].size;
}
pinos_signal_emit (&stream->new_buffer, stream, bid->id);
}
input->buffer_id = SPA_ID_INVALID;
}
send_need_input (stream);
}
if (cmd & PINOS_TRANSPORT_CMD_HAVE_EVENT) {
SpaNodeEvent event;
while (pinos_transport_next_event (impl->trans, &event) == SPA_RESULT_OK) {
SpaNodeEvent *ev = alloca (event.size);
pinos_transport_parse_event (impl->trans, ev);
handle_rtnode_event (stream, ev);
}
}
}
}
static void
handle_socket (PinosStream *stream, gint rtfd)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
struct timespec interval;
impl->rtfd = rtfd;
impl->rtsocket_source = pinos_loop_add_io (stream->context->loop,
impl->rtfd,
SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP,
false,
on_rtsocket_condition,
stream);
pinos_log_debug ("socket %d", impl->rtfd);
impl->timeout_source = pinos_loop_add_timer (stream->context->loop,
on_timeout,
stream);
interval.tv_sec = 0;
interval.tv_nsec = 100000000;
pinos_loop_update_timer (stream->context->loop,
impl->timeout_source,
NULL,
&interval,
false);
return;
}
static void
unhandle_socket (PinosStream *stream)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
if (impl->rtsocket_source) {
pinos_loop_destroy_source (stream->context->loop, impl->rtsocket_source);
impl->rtsocket_source = NULL;
}
}
static void
handle_node_event (PinosStream *stream,
SpaNodeEvent *event)
{
switch (event->type) {
case SPA_NODE_EVENT_TYPE_INVALID:
case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT:
case SPA_NODE_EVENT_TYPE_NEED_INPUT:
case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE:
case SPA_NODE_EVENT_TYPE_REUSE_BUFFER:
case SPA_NODE_EVENT_TYPE_ERROR:
case SPA_NODE_EVENT_TYPE_BUFFERING:
case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH:
case SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE:
pinos_log_warn ("unhandled node event %d", event->type);
break;
}
}
static gboolean
handle_node_command (PinosStream *stream,
uint32_t seq,
SpaNodeCommand *command)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
switch (command->type) {
case SPA_NODE_COMMAND_INVALID:
break;
case SPA_NODE_COMMAND_PAUSE:
{
pinos_log_debug ("stream %p: pause %d", stream, seq);
add_state_change (stream, SPA_NODE_STATE_PAUSED, false);
add_async_complete (stream, seq, SPA_RESULT_OK, true);
stream_set_state (stream, PINOS_STREAM_STATE_PAUSED, NULL);
break;
}
case SPA_NODE_COMMAND_START:
{
pinos_log_debug ("stream %p: start %d", stream, seq);
add_state_change (stream, SPA_NODE_STATE_STREAMING, false);
add_async_complete (stream, seq, SPA_RESULT_OK, true);
if (impl->direction == SPA_DIRECTION_INPUT)
send_need_input (stream);
stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL);
break;
}
case SPA_NODE_COMMAND_FLUSH:
case SPA_NODE_COMMAND_DRAIN:
case SPA_NODE_COMMAND_MARKER:
{
pinos_log_warn ("unhandled node command %d", command->type);
add_async_complete (stream, seq, SPA_RESULT_NOT_IMPLEMENTED, true);
break;
}
case SPA_NODE_COMMAND_CLOCK_UPDATE:
{
2016-10-24 15:30:15 +02:00
SpaNodeCommandClockUpdate *cu = (SpaNodeCommandClockUpdate *) command;
if (cu->flags & SPA_NODE_COMMAND_CLOCK_UPDATE_FLAG_LIVE) {
pinos_properties_set (stream->properties,
"pinos.latency.is-live", "1");
pinos_properties_setf (stream->properties,
"pinos.latency.min", "%"PRId64, cu->latency);
}
impl->last_ticks = cu->ticks;
impl->last_rate = cu->rate;
impl->last_monotonic = cu->monotonic_time;
break;
}
}
return TRUE;
}
static SpaResult
stream_dispatch_func (void *object,
PinosMessageType type,
void *message,
void *data)
{
PinosStream *stream = data;
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
switch (type) {
case PINOS_MESSAGE_SYNC:
case PINOS_MESSAGE_SUBSCRIBE:
case PINOS_MESSAGE_CREATE_NODE:
case PINOS_MESSAGE_CREATE_CLIENT_NODE:
case PINOS_MESSAGE_NODE_UPDATE:
case PINOS_MESSAGE_PORT_UPDATE:
case PINOS_MESSAGE_PORT_STATUS_CHANGE:
case PINOS_MESSAGE_NODE_STATE_CHANGE:
pinos_log_warn ("got unexpected message %d", type);
break;
case PINOS_MESSAGE_NOTIFY_DONE:
pinos_log_warn ("notify done %d", type);
break;
case PINOS_MESSAGE_NOTIFY_GLOBAL:
pinos_log_warn ("notify global %d", type);
break;
case PINOS_MESSAGE_NOTIFY_GLOBAL_REMOVE:
pinos_log_warn ("notify global %d", type);
break;
case PINOS_MESSAGE_CREATE_NODE_DONE:
pinos_log_warn ("create node done %d", type);
break;
case PINOS_MESSAGE_CREATE_CLIENT_NODE_DONE:
{
PinosMessageCreateClientNodeDone *cnd = message;
pinos_log_warn ("create client node done %d", type);
handle_socket (stream, cnd->datafd);
do_node_init (stream);
break;
}
case PINOS_MESSAGE_ADD_PORT:
case PINOS_MESSAGE_REMOVE_PORT:
pinos_log_warn ("add/remove port not supported");
break;
case PINOS_MESSAGE_SET_FORMAT:
{
PinosMessageSetFormat *p = message;
gpointer mem;
if (impl->format)
free (impl->format);
mem = malloc (pinos_serialize_format_get_size (p->format));
impl->format = pinos_serialize_format_copy_into (mem, p->format);
impl->pending_seq = p->seq;
pinos_signal_emit (&stream->format_changed, stream, impl->format);
stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
break;
}
case PINOS_MESSAGE_SET_PROPERTY:
pinos_log_warn ("set property not implemented");
break;
case PINOS_MESSAGE_ADD_MEM:
{
PinosMessageAddMem *p = message;
MemId *m;
m = find_mem (stream, p->mem_id);
if (m) {
pinos_log_debug ("update mem %u, fd %d, flags %d, size %zd", p->mem_id, p->memfd, p->flags, p->size);
} else {
m = pinos_array_add (&impl->mem_ids, sizeof (MemId));
pinos_log_debug ("add mem %u, fd %d, flags %d, size %zd", p->mem_id, p->memfd, p->flags, p->size);
}
m->id = p->mem_id;
m->fd = p->memfd;
m->flags = p->flags;
m->ptr = NULL;
m->size = p->size;
break;
}
case PINOS_MESSAGE_USE_BUFFERS:
{
PinosMessageUseBuffers *p = message;
BufferId *bid;
unsigned int i, j, len;
SpaBuffer *b;
/* clear previous buffers */
clear_buffers (stream);
for (i = 0; i < p->n_buffers; i++) {
MemId *mid = find_mem (stream, p->buffers[i].mem_id);
if (mid == NULL) {
pinos_log_warn ("unknown memory id %u", mid->id);
continue;
}
if (mid->ptr == NULL) {
mid->ptr = mmap (NULL, mid->size, PROT_READ | PROT_WRITE, MAP_SHARED, mid->fd, 0);
if (mid->ptr == MAP_FAILED) {
mid->ptr = NULL;
pinos_log_warn ("Failed to mmap memory %zd %p: %s", mid->size, mid, strerror (errno));
continue;
}
}
len = pinos_array_get_len (&impl->buffer_ids, BufferId);
bid = pinos_array_add (&impl->buffer_ids, sizeof (BufferId));
bid->buf_ptr = SPA_MEMBER (mid->ptr, p->buffers[i].offset, void);
{
size_t size;
unsigned int i;
SpaMeta *m;
b = bid->buf_ptr;
size = sizeof (SpaBuffer);
m = SPA_MEMBER (b, SPA_PTR_TO_INT (b->metas), SpaMeta);
for (i = 0; i < b->n_metas; i++)
size += sizeof (SpaMeta) + m[i].size;
for (i = 0; i < b->n_datas; i++)
size += sizeof (SpaData);
b = bid->buf = g_memdup (bid->buf_ptr, size);
if (b->metas)
b->metas = SPA_MEMBER (b, SPA_PTR_TO_INT (b->metas), SpaMeta);
if (b->datas) {
bid->datas = SPA_MEMBER (bid->buf_ptr, SPA_PTR_TO_INT (b->datas), SpaData);
b->datas = SPA_MEMBER (b, SPA_PTR_TO_INT (b->datas), SpaData);
}
}
bid->id = b->id;
if (bid->id != len) {
pinos_log_warn ("unexpected id %u found, expected %u", bid->id, len);
impl->in_order = FALSE;
}
pinos_log_debug ("add buffer %d %d %zd", mid->id, bid->id, p->buffers[i].offset);
for (j = 0; j < b->n_metas; j++) {
SpaMeta *m = &b->metas[j];
if (m->data)
m->data = SPA_MEMBER (bid->buf_ptr, SPA_PTR_TO_INT (m->data), void);
}
for (j = 0; j < b->n_datas; j++) {
SpaData *d = &b->datas[j];
switch (d->type) {
case SPA_DATA_TYPE_ID:
{
MemId *bmid = find_mem (stream, SPA_PTR_TO_UINT32 (d->data));
d->type = SPA_DATA_TYPE_MEMFD;
d->data = NULL;
d->fd = bmid->fd;
pinos_log_debug (" data %d %u -> fd %d", j, bmid->id, bmid->fd);
break;
}
case SPA_DATA_TYPE_MEMPTR:
{
d->data = SPA_MEMBER (bid->buf_ptr, SPA_PTR_TO_INT (d->data), void);
d->fd = -1;
pinos_log_debug (" data %d %u -> mem %p", j, bid->id, d->data);
break;
}
default:
pinos_log_warn ("unknown buffer data type %d", d->type);
break;
}
}
pinos_signal_emit (&stream->add_buffer, stream, bid->id);
}
if (p->n_buffers) {
add_state_change (stream, SPA_NODE_STATE_PAUSED, false);
} else {
add_state_change (stream, SPA_NODE_STATE_READY, false);
}
add_async_complete (stream, p->seq, SPA_RESULT_OK, true);
if (p->n_buffers)
stream_set_state (stream, PINOS_STREAM_STATE_PAUSED, NULL);
else
stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
break;
}
case PINOS_MESSAGE_NODE_EVENT:
{
PinosMessageNodeEvent *p = message;
handle_node_event (stream, p->event);
break;
}
case PINOS_MESSAGE_NODE_COMMAND:
{
PinosMessageNodeCommand *p = message;
handle_node_command (stream, p->seq, p->command);
break;
}
case PINOS_MESSAGE_PORT_COMMAND:
{
break;
}
case PINOS_MESSAGE_TRANSPORT_UPDATE:
{
PinosMessageTransportUpdate *p = message;
PinosTransportInfo info;
info.memfd = p->memfd;
if (info.memfd == -1)
break;
info.offset = p->offset;
info.size = p->size;
if (impl->trans)
pinos_transport_free (impl->trans);
impl->trans = pinos_transport_new_from_info (&info);
pinos_log_debug ("transport update %d %p", impl->rtfd, impl->trans);
break;
}
case PINOS_MESSAGE_INVALID:
pinos_log_warn ("unhandled message %d", type);
break;
}
return SPA_RESULT_OK;
}
2015-04-16 16:58:33 +02:00
/**
* pinos_stream_connect:
* @stream: a #PinosStream
* @direction: the stream direction
2016-08-18 12:43:25 +02:00
* @mode: a #PinosStreamMode
* @port_path: the port path to connect to or %NULL to get the default port
* @flags: a #PinosStreamFlags
* @possible_formats: (transfer full): a #GPtrArray with possible accepted formats
2015-04-16 16:58:33 +02:00
*
* Connect @stream for input or output on @port_path.
2015-04-16 16:58:33 +02:00
*
2016-08-18 12:43:25 +02:00
* When @mode is #PINOS_STREAM_MODE_BUFFER, you should connect to the new-buffer
* signal and use pinos_stream_capture_buffer() to get the latest metadata and
* data.
*
2015-04-16 16:58:33 +02:00
* Returns: %TRUE on success.
*/
bool
pinos_stream_connect (PinosStream *stream,
PinosDirection direction,
2016-08-18 12:43:25 +02:00
PinosStreamMode mode,
const gchar *port_path,
PinosStreamFlags flags,
unsigned int n_possible_formats,
SpaFormat **possible_formats)
2015-04-16 16:58:33 +02:00
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosMessageCreateClientNode ccn;
SpaDict dict;
SpaDictItem items[1];
impl->direction = direction == PINOS_DIRECTION_INPUT ? SPA_DIRECTION_INPUT : SPA_DIRECTION_OUTPUT;
impl->port_id = 0;
impl->mode = mode;
impl->flags = flags;
impl->n_possible_formats = n_possible_formats;
impl->possible_formats = possible_formats;
stream_set_state (stream, PINOS_STREAM_STATE_CONNECTING, NULL);
if (stream->properties == NULL)
stream->properties = pinos_properties_new (NULL, NULL);
if (port_path)
pinos_properties_set (stream->properties,
"pinos.target.node", port_path);
impl->node_proxy = pinos_proxy_new (stream->context,
SPA_ID_INVALID,
0);
impl->node_proxy->dispatch_func = stream_dispatch_func;
impl->node_proxy->dispatch_data = impl;
ccn.seq = ++impl->seq;
ccn.name = "client-node";
dict.n_items = 1;
dict.items = items;
items[0].key = "pinos.target.node";
items[0].value = port_path;
ccn.props = &dict;
ccn.id = impl->node_proxy->id;
pinos_proxy_send_message (stream->context->core_proxy,
PINOS_MESSAGE_CREATE_CLIENT_NODE,
&ccn,
true);
return true;
}
/**
* pinos_stream_finish_format:
* @stream: a #PinosStream
* @res: a #SpaResult
* @params: an array of pointers to #SpaAllocParam
* @n_params: number of elements in @params
*
* Complete the negotiation process with result code @res.
*
* This function should be called after notification of the format.
* When @res indicates success, @params contain the parameters for the
* allocation state.
*
* Returns: %TRUE on success
*/
bool
pinos_stream_finish_format (PinosStream *stream,
SpaResult res,
SpaAllocParam **params,
unsigned int n_params)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
impl->port_info.params = params;
impl->port_info.n_params = n_params;
if (SPA_RESULT_IS_OK (res)) {
add_port_update (stream, PINOS_MESSAGE_PORT_UPDATE_INFO |
PINOS_MESSAGE_PORT_UPDATE_FORMAT,
false);
if (impl->format) {
add_state_change (stream, SPA_NODE_STATE_READY, false);
} else {
clear_buffers (stream);
add_state_change (stream, SPA_NODE_STATE_CONFIGURE, false);
}
}
impl->port_info.params = NULL;
impl->port_info.n_params = 0;
add_async_complete (stream, impl->pending_seq, res, true);
impl->pending_seq = SPA_ID_INVALID;
return true;
}
/**
* pinos_stream_start:
* @stream: a #PinosStream
*
2016-08-18 12:43:25 +02:00
* Start capturing from @stream.
*
*
* Returns: %TRUE on success.
*/
bool
2016-08-18 12:43:25 +02:00
pinos_stream_start (PinosStream *stream)
{
return true;
}
/**
* pinos_stream_stop:
* @stream: a #PinosStream
*
* Stop capturing from @stream.
*
* Returns: %TRUE on success.
*/
bool
pinos_stream_stop (PinosStream *stream)
{
return true;
}
2015-04-16 16:58:33 +02:00
/**
* pinos_stream_disconnect:
* @stream: a #PinosStream
2015-04-16 16:58:33 +02:00
*
* Disconnect @stream.
*
* Returns: %TRUE on success
*/
bool
pinos_stream_disconnect (PinosStream *stream)
2015-04-16 16:58:33 +02:00
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
2015-04-16 16:58:33 +02:00
impl->disconnecting = true;
return true;
2015-04-16 16:58:33 +02:00
}
bool
pinos_stream_get_time (PinosStream *stream,
PinosTime *time)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
int64_t now, elapsed;
struct timespec ts;
clock_gettime (CLOCK_MONOTONIC, &ts);
now = SPA_TIMESPEC_TO_TIME (&ts);
elapsed = (now - impl->last_monotonic) / 1000;
time->ticks = impl->last_ticks + (elapsed * impl->last_rate) / SPA_USEC_PER_SEC;
time->rate = impl->last_rate;
return true;
}
/**
* pinos_stream_get_empty_buffer:
* @stream: a #PinosStream
*
* Get the id of an empty buffer that can be filled
*
* Returns: the id of an empty buffer or #SPA_ID_INVALID when no buffer is
* available.
*/
uint32_t
pinos_stream_get_empty_buffer (PinosStream *stream)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
BufferId *bid;
pinos_array_for_each (bid, &impl->buffer_ids) {
if (!bid->used)
return bid->id;
}
return SPA_ID_INVALID;
}
/**
* pinos_stream_recycle_buffer:
* @stream: a #PinosStream
* @id: a buffer id
*
* Recycle the buffer with @id.
*
* Returns: %TRUE on success.
*/
bool
pinos_stream_recycle_buffer (PinosStream *stream,
uint32_t id)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
SpaNodeEventReuseBuffer rb;
uint8_t cmd = PINOS_TRANSPORT_CMD_HAVE_EVENT;
rb.event.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER;
rb.event.size = sizeof (rb);
rb.port_id = impl->port_id;
rb.buffer_id = id;
pinos_transport_add_event (impl->trans, &rb.event);
write (impl->rtfd, &cmd, 1);
return true;
}
2015-04-16 16:58:33 +02:00
/**
* pinos_stream_peek_buffer:
* @stream: a #PinosStream
* @id: the buffer id
2015-04-16 16:58:33 +02:00
*
* Get the buffer with @id from @stream. This function should be called from
* the new-buffer signal callback.
2015-04-16 16:58:33 +02:00
*
* Returns: a #SpaBuffer or %NULL when there is no buffer.
2015-04-16 16:58:33 +02:00
*/
SpaBuffer *
pinos_stream_peek_buffer (PinosStream *stream,
uint32_t id)
2015-04-16 16:58:33 +02:00
{
BufferId *bid;
2015-04-16 16:58:33 +02:00
if ((bid = find_buffer (stream, id)))
return bid->buf;
return NULL;
2015-04-16 16:58:33 +02:00
}
/**
* pinos_stream_send_buffer:
* @stream: a #PinosStream
* @id: a buffer id
* @offset: the offset in the buffer
* @size: the size in the buffer
*
* Send a buffer with @id to @stream.
*
* For provider streams, this function should be called whenever there is a new frame
* available.
*
* Returns: %TRUE when @id was handled
*/
bool
pinos_stream_send_buffer (PinosStream *stream,
uint32_t id)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
BufferId *bid;
guint i;
if ((bid = find_buffer (stream, id))) {
uint8_t cmd = PINOS_TRANSPORT_CMD_HAVE_DATA;
bid->used = TRUE;
for (i = 0; i < bid->buf->n_datas; i++) {
bid->datas[i].size = bid->buf->datas[i].size;
}
impl->trans->outputs[0].buffer_id = id;
impl->trans->outputs[0].status = SPA_RESULT_OK;
write (impl->rtfd, &cmd, 1);
return true;
} else {
return true;
}
}