pipewire/pinos/client/stream.c
Wim Taymans f92b68c3c3 Use types with known sizes where we can, easier to serialize
Add iterator for POD and use it to implement some demarshalling.
2017-03-07 11:56:43 +01:00

1237 lines
33 KiB
C

/* Pinos
* Copyright (C) 2015 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include <unistd.h>
#include <sys/socket.h>
#include <string.h>
#include <sys/mman.h>
#include <errno.h>
#include <time.h>
#include "spa/lib/debug.h"
#include "pinos/client/pinos.h"
#include "pinos/client/interfaces.h"
#include "pinos/client/protocol-native.h"
#include "pinos/client/array.h"
#include "pinos/client/connection.h"
#include "pinos/client/context.h"
#include "pinos/client/stream.h"
#include "pinos/client/serialize.h"
#include "pinos/client/transport.h"
#include "pinos/client/utils.h"
#define MAX_BUFFER_SIZE 4096
#define MAX_FDS 32
#define MAX_INPUTS 64
#define MAX_OUTPUTS 64
typedef struct {
uint32_t id;
int fd;
uint32_t flags;
void *ptr;
uint32_t offset;
uint32_t size;
} MemId;
typedef struct {
uint32_t id;
bool used;
void *buf_ptr;
SpaBuffer *buf;
} BufferId;
typedef struct
{
PinosStream this;
SpaNodeState node_state;
uint32_t seq;
uint32_t n_possible_formats;
SpaFormat **possible_formats;
SpaFormat *format;
SpaPortInfo port_info;
SpaDirection direction;
uint32_t port_id;
uint32_t pending_seq;
PinosStreamFlags flags;
PinosStreamMode mode;
int rtfd;
SpaSource *rtsocket_source;
PinosProxy *node_proxy;
bool disconnecting;
PinosListener node_proxy_destroy;
PinosTransport *trans;
SpaSource *timeout_source;
PinosArray mem_ids;
PinosArray buffer_ids;
bool in_order;
int64_t last_ticks;
int32_t last_rate;
int64_t last_monotonic;
} PinosStreamImpl;
static void
clear_memid (MemId *mid)
{
if (mid->ptr != NULL)
munmap (mid->ptr, mid->size + mid->offset);
mid->ptr = NULL;
close (mid->fd);
}
static void
clear_mems (PinosStream *stream)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
MemId *mid;
pinos_array_for_each (mid, &impl->mem_ids)
clear_memid (mid);
impl->mem_ids.size = 0;
}
static void
clear_buffers (PinosStream *stream)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
BufferId *bid;
pinos_array_for_each (bid, &impl->buffer_ids) {
pinos_signal_emit (&stream->remove_buffer, stream, bid->id);
free (bid->buf);
bid->buf = NULL;
bid->used = false;
}
impl->buffer_ids.size = 0;
impl->in_order = true;
}
static void
stream_set_state (PinosStream *stream,
PinosStreamState state,
char *error)
{
if (stream->state != state) {
if (stream->error)
free (stream->error);
stream->error = error;
stream->state = state;
pinos_signal_emit (&stream->state_changed, stream);
}
}
/**
* pinos_stream_state_as_string:
* @state: a #PinosStreamState
*
* Return the string representation of @state.
*
* Returns: the string representation of @state.
*/
const char *
pinos_stream_state_as_string (PinosStreamState state)
{
switch (state) {
case PINOS_STREAM_STATE_ERROR:
return "error";
case PINOS_STREAM_STATE_UNCONNECTED:
return "unconnected";
case PINOS_STREAM_STATE_CONNECTING:
return "connecting";
case PINOS_STREAM_STATE_CONFIGURE:
return "configure";
case PINOS_STREAM_STATE_READY:
return "ready";
case PINOS_STREAM_STATE_PAUSED:
return "paused";
case PINOS_STREAM_STATE_STREAMING:
return "streaming";
}
return "invalid-state";
}
/**
* pinos_stream_new:
* @context: a #PinosContext
* @name: a stream name
* @properties: (transfer full): stream properties
*
* Make a new unconnected #PinosStream
*
* Returns: a new unconnected #PinosStream
*/
PinosStream *
pinos_stream_new (PinosContext *context,
const char *name,
PinosProperties *props)
{
PinosStreamImpl *impl;
PinosStream *this;
impl = calloc (1, sizeof (PinosStreamImpl));
if (impl == NULL)
return NULL;
this = &impl->this;
pinos_log_debug ("stream %p: new", impl);
if (props == NULL) {
props = pinos_properties_new ("media.name", name, NULL);
} else if (!pinos_properties_get (props, "media.name")) {
pinos_properties_set (props, "media.name", name);
}
if (props == NULL)
goto no_mem;
this->properties = props;
this->context = context;
this->name = strdup (name);
pinos_signal_init (&this->destroy_signal);
pinos_signal_init (&this->state_changed);
pinos_signal_init (&this->format_changed);
pinos_signal_init (&this->add_buffer);
pinos_signal_init (&this->remove_buffer);
pinos_signal_init (&this->new_buffer);
pinos_signal_init (&this->need_buffer);
this->state = PINOS_STREAM_STATE_UNCONNECTED;
impl->node_state = SPA_NODE_STATE_INIT;
pinos_array_init (&impl->mem_ids);
pinos_array_ensure_size (&impl->mem_ids, sizeof (MemId) * 64);
pinos_array_init (&impl->buffer_ids);
pinos_array_ensure_size (&impl->buffer_ids, sizeof (BufferId) * 64);
impl->pending_seq = SPA_ID_INVALID;
spa_list_insert (&context->stream_list, &this->link);
return this;
no_mem:
free (impl);
return NULL;
}
static void
unhandle_socket (PinosStream *stream)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
if (impl->rtsocket_source) {
pinos_loop_destroy_source (stream->context->loop, impl->rtsocket_source);
impl->rtsocket_source = NULL;
}
if (impl->timeout_source) {
pinos_loop_destroy_source (stream->context->loop, impl->timeout_source);
impl->timeout_source = NULL;
}
}
void
pinos_stream_destroy (PinosStream *stream)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
int i;
pinos_log_debug ("stream %p: destroy", stream);
pinos_signal_emit (&stream->destroy_signal, stream);
unhandle_socket (stream);
spa_list_remove (&stream->link);
if (impl->node_proxy)
pinos_signal_remove (&impl->node_proxy_destroy);
if (impl->possible_formats) {
for (i = 0; i < impl->n_possible_formats; i++) {
free (impl->possible_formats[i]);
}
free (impl->possible_formats);
}
if (impl->format)
free (impl->format);
if (stream->error)
free (stream->error);
clear_buffers (stream);
pinos_array_clear (&impl->buffer_ids);
clear_mems (stream);
pinos_array_clear (&impl->mem_ids);
if (stream->properties)
pinos_properties_free (stream->properties);
if (impl->trans)
pinos_transport_destroy (impl->trans);
if (stream->name)
free (stream->name);
free (impl);
}
static void
add_node_update (PinosStream *stream, uint32_t change_mask, bool flush)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
uint32_t max_input_ports = 0, max_output_ports = 0;
if (change_mask & PINOS_MESSAGE_NODE_UPDATE_MAX_INPUTS)
max_input_ports = impl->direction == SPA_DIRECTION_INPUT ? 1 : 0;
if (change_mask & PINOS_MESSAGE_NODE_UPDATE_MAX_OUTPUTS)
max_output_ports = impl->direction == SPA_DIRECTION_OUTPUT ? 1 : 0;
pinos_client_node_do_update (impl->node_proxy,
change_mask,
max_input_ports,
max_output_ports,
NULL);
}
static void
add_state_change (PinosStream *stream, SpaNodeState state, bool flush)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
if (impl->node_state != state) {
impl->node_state = state;
pinos_client_node_do_state_change (impl->node_proxy,
state);
}
}
static void
add_port_update (PinosStream *stream, uint32_t change_mask, bool flush)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
pinos_client_node_do_port_update (impl->node_proxy,
impl->direction,
impl->port_id,
change_mask,
impl->n_possible_formats,
(const SpaFormat **) impl->possible_formats,
(const SpaFormat *) impl->format,
NULL,
&impl->port_info);
}
static inline void
send_need_input (PinosStream *stream)
{
#if 0
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
SpaNodeEventNeedInput ni;
uint64_t cmd = 1;
pinos_log_debug ("stream %p: need input", stream);
ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT;
ni.event.size = sizeof (ni);
pinos_transport_add_event (impl->trans, &ni.event);
write (impl->rtfd, &cmd, 8);
#endif
}
static inline void
send_have_output (PinosStream *stream)
{
#if 0
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
SpaNodeEventHaveOutput ho;
uint64_t cmd = 1;
pinos_log_debug ("stream %p: have output", stream);
ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
ho.event.size = sizeof (ho);
pinos_transport_add_event (impl->trans, &ho.event);
write (impl->rtfd, &cmd, 8);
#endif
}
static void
add_request_clock_update (PinosStream *stream, bool flush)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
SpaNodeEventRequestClockUpdate rcu;
rcu.event.type = SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE;
rcu.event.size = sizeof (rcu);
rcu.update_mask = SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_TIME;
rcu.timestamp = 0;
rcu.offset = 0;
pinos_client_node_do_event (impl->node_proxy,
&rcu.event);
}
static void
add_async_complete (PinosStream *stream,
uint32_t seq,
SpaResult res,
bool flush)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
SpaNodeEventAsyncComplete ac;
ac.event.type = SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE;
ac.event.size = sizeof (ac);
ac.seq = seq;
ac.res = res;
pinos_client_node_do_event (impl->node_proxy,
&ac.event);
}
static void
do_node_init (PinosStream *stream)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
add_node_update (stream, PINOS_MESSAGE_NODE_UPDATE_MAX_INPUTS |
PINOS_MESSAGE_NODE_UPDATE_MAX_OUTPUTS,
false);
impl->port_info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
add_port_update (stream, PINOS_MESSAGE_PORT_UPDATE_POSSIBLE_FORMATS |
PINOS_MESSAGE_PORT_UPDATE_INFO,
false);
add_state_change (stream, SPA_NODE_STATE_CONFIGURE, true);
}
static void
on_timeout (SpaSource *source,
void *data)
{
PinosStream *stream = data;
add_request_clock_update (stream, true);
}
static MemId *
find_mem (PinosStream *stream, uint32_t id)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
MemId *mid;
pinos_array_for_each (mid, &impl->mem_ids) {
if (mid->id == id)
return mid;
}
return NULL;
}
static BufferId *
find_buffer (PinosStream *stream, uint32_t id)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
if (impl->in_order && pinos_array_check_index (&impl->buffer_ids, id, BufferId)) {
return pinos_array_get_unchecked (&impl->buffer_ids, id, BufferId);
} else {
BufferId *bid;
pinos_array_for_each (bid, &impl->buffer_ids) {
if (bid->id == id)
return bid;
}
}
return NULL;
}
static void
handle_rtnode_event (PinosStream *stream,
SpaNodeEvent *event)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
switch (event->type) {
case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT:
{
int i;
//pinos_log_debug ("stream %p: have output", stream);
for (i = 0; i < impl->trans->area->n_inputs; i++) {
SpaPortInput *input = &impl->trans->inputs[i];
if (input->buffer_id == SPA_ID_INVALID)
continue;
pinos_signal_emit (&stream->new_buffer, stream, input->buffer_id);
input->buffer_id = SPA_ID_INVALID;
}
send_need_input (stream);
break;
}
case SPA_NODE_EVENT_TYPE_NEED_INPUT:
//pinos_log_debug ("stream %p: need input", stream);
pinos_signal_emit (&stream->need_buffer, stream);
break;
case SPA_NODE_EVENT_TYPE_REUSE_BUFFER:
{
SpaNodeEventReuseBuffer *p = (SpaNodeEventReuseBuffer *) event;
BufferId *bid;
if (p->port_id != impl->port_id)
break;
if (impl->direction != SPA_DIRECTION_OUTPUT)
break;
if ((bid = find_buffer (stream, p->buffer_id)) && bid->used) {
bid->used = false;
pinos_signal_emit (&stream->new_buffer, stream, p->buffer_id);
}
break;
}
default:
pinos_log_warn ("unexpected node event %d", event->type);
break;
}
}
static void
on_rtsocket_condition (SpaSource *source,
int fd,
SpaIO mask,
void *data)
{
PinosStream *stream = data;
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
if (mask & (SPA_IO_ERR | SPA_IO_HUP)) {
pinos_log_warn ("got error");
unhandle_socket (stream);
return;
}
if (mask & SPA_IO_IN) {
SpaNodeEvent event;
uint64_t cmd;
read (impl->rtfd, &cmd, 8);
while (pinos_transport_next_event (impl->trans, &event) == SPA_RESULT_OK) {
SpaNodeEvent *ev = alloca (event.size);
pinos_transport_parse_event (impl->trans, ev);
handle_rtnode_event (stream, ev);
}
}
}
static void
handle_socket (PinosStream *stream, int rtfd)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
struct timespec interval;
impl->rtfd = rtfd;
impl->rtsocket_source = pinos_loop_add_io (stream->context->loop,
impl->rtfd,
SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP,
true,
on_rtsocket_condition,
stream);
impl->timeout_source = pinos_loop_add_timer (stream->context->loop,
on_timeout,
stream);
interval.tv_sec = 0;
interval.tv_nsec = 100000000;
pinos_loop_update_timer (stream->context->loop,
impl->timeout_source,
NULL,
&interval,
false);
return;
}
static void
handle_node_event (PinosStream *stream,
const SpaNodeEvent *event)
{
switch (event->type) {
case SPA_NODE_EVENT_TYPE_INVALID:
case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT:
case SPA_NODE_EVENT_TYPE_NEED_INPUT:
case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE:
case SPA_NODE_EVENT_TYPE_REUSE_BUFFER:
case SPA_NODE_EVENT_TYPE_ERROR:
case SPA_NODE_EVENT_TYPE_BUFFERING:
case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH:
case SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE:
pinos_log_warn ("unhandled node event %d", event->type);
break;
}
}
static bool
handle_node_command (PinosStream *stream,
uint32_t seq,
const SpaNodeCommand *command)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
switch (command->type) {
case SPA_NODE_COMMAND_INVALID:
break;
case SPA_NODE_COMMAND_PAUSE:
{
pinos_log_debug ("stream %p: pause %d", stream, seq);
add_state_change (stream, SPA_NODE_STATE_PAUSED, false);
add_async_complete (stream, seq, SPA_RESULT_OK, true);
stream_set_state (stream, PINOS_STREAM_STATE_PAUSED, NULL);
break;
}
case SPA_NODE_COMMAND_START:
{
pinos_log_debug ("stream %p: start %d", stream, seq);
add_state_change (stream, SPA_NODE_STATE_STREAMING, false);
add_async_complete (stream, seq, SPA_RESULT_OK, true);
if (impl->direction == SPA_DIRECTION_INPUT)
send_need_input (stream);
stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL);
break;
}
case SPA_NODE_COMMAND_FLUSH:
case SPA_NODE_COMMAND_DRAIN:
case SPA_NODE_COMMAND_MARKER:
{
pinos_log_warn ("unhandled node command %d", command->type);
add_async_complete (stream, seq, SPA_RESULT_NOT_IMPLEMENTED, true);
break;
}
case SPA_NODE_COMMAND_CLOCK_UPDATE:
{
SpaNodeCommandClockUpdate *cu = (SpaNodeCommandClockUpdate *) command;
if (cu->flags & SPA_NODE_COMMAND_CLOCK_UPDATE_FLAG_LIVE) {
pinos_properties_set (stream->properties,
"pinos.latency.is-live", "1");
pinos_properties_setf (stream->properties,
"pinos.latency.min", "%"PRId64, cu->latency);
}
impl->last_ticks = cu->ticks;
impl->last_rate = cu->rate;
impl->last_monotonic = cu->monotonic_time;
break;
}
}
return true;
}
static void
client_node_done (void *object,
uint32_t seq,
int datafd)
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
pinos_log_warn ("create client node done");
handle_socket (stream, datafd);
do_node_init (stream);
}
static void
client_node_event (void *object,
const SpaNodeEvent *event)
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
handle_node_event (stream, event);
}
static void
client_node_add_port (void *object,
uint32_t seq,
SpaDirection direction,
uint32_t port_id)
{
pinos_log_warn ("add port not supported");
}
static void
client_node_remove_port (void *object,
uint32_t seq,
SpaDirection direction,
uint32_t port_id)
{
pinos_log_warn ("remove port not supported");
}
static void
client_node_set_format (void *object,
uint32_t seq,
SpaDirection direction,
uint32_t port_id,
SpaPortFormatFlags flags,
const SpaFormat *format)
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
if (impl->format)
free (impl->format);
impl->format = spa_format_copy (format);
impl->pending_seq = seq;
pinos_signal_emit (&stream->format_changed, stream, impl->format);
stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
}
static void
client_node_set_property (void *object,
uint32_t seq,
uint32_t id,
uint32_t size,
const void *value)
{
pinos_log_warn ("set property not implemented");
}
static void
client_node_add_mem (void *object,
SpaDirection direction,
uint32_t port_id,
uint32_t mem_id,
SpaDataType type,
int memfd,
uint32_t flags,
uint32_t offset,
uint32_t size)
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
MemId *m;
m = find_mem (stream, mem_id);
if (m) {
pinos_log_debug ("update mem %u, fd %d, flags %d, off %d, size %d",
mem_id, memfd, flags, offset, size);
clear_memid (m);
} else {
m = pinos_array_add (&impl->mem_ids, sizeof (MemId));
pinos_log_debug ("add mem %u, fd %d, flags %d, off %d, size %d",
mem_id, memfd, flags, offset, size);
}
m->id = mem_id;
m->fd = memfd;
m->flags = flags;
m->ptr = NULL;
m->offset = offset;
m->size = size;
}
static void
client_node_use_buffers (void *object,
uint32_t seq,
SpaDirection direction,
uint32_t port_id,
uint32_t n_buffers,
PinosClientNodeBuffer *buffers)
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
BufferId *bid;
uint32_t i, j, len;
SpaBuffer *b;
/* clear previous buffers */
clear_buffers (stream);
for (i = 0; i < n_buffers; i++) {
off_t offset = 0;
MemId *mid = find_mem (stream, buffers[i].mem_id);
if (mid == NULL) {
pinos_log_warn ("unknown memory id %u", buffers[i].mem_id);
continue;
}
if (mid->ptr == NULL) {
mid->ptr = mmap (NULL, mid->size + mid->offset, PROT_READ | PROT_WRITE, MAP_SHARED, mid->fd, 0);
if (mid->ptr == MAP_FAILED) {
mid->ptr = NULL;
pinos_log_warn ("Failed to mmap memory %d %p: %s", mid->size, mid, strerror (errno));
continue;
}
mid->ptr = SPA_MEMBER (mid->ptr, mid->offset, void);
}
len = pinos_array_get_len (&impl->buffer_ids, BufferId);
bid = pinos_array_add (&impl->buffer_ids, sizeof (BufferId));
bid->used = false;
b = buffers[i].buffer;
bid->buf_ptr = SPA_MEMBER (mid->ptr, buffers[i].offset, void);
{
size_t size;
size = pinos_serialize_buffer_get_size (buffers[i].buffer);
b = bid->buf = malloc (size);
pinos_serialize_buffer_copy_into (b, buffers[i].buffer);
}
bid->id = b->id;
if (bid->id != len) {
pinos_log_warn ("unexpected id %u found, expected %u", bid->id, len);
impl->in_order = false;
}
pinos_log_debug ("add buffer %d %d %u", mid->id, bid->id, buffers[i].offset);
for (j = 0; j < b->n_metas; j++) {
SpaMeta *m = &b->metas[j];
m->data = SPA_MEMBER (bid->buf_ptr, offset, void);
offset += m->size;
}
for (j = 0; j < b->n_datas; j++) {
SpaData *d = &b->datas[j];
d->chunk = SPA_MEMBER (bid->buf_ptr, offset + sizeof (SpaChunk) * j, SpaChunk);
switch (d->type) {
case SPA_DATA_TYPE_ID:
{
MemId *bmid = find_mem (stream, SPA_PTR_TO_UINT32 (d->data));
d->type = SPA_DATA_TYPE_MEMFD;
d->data = NULL;
d->fd = bmid->fd;
pinos_log_debug (" data %d %u -> fd %d", j, bmid->id, bmid->fd);
break;
}
case SPA_DATA_TYPE_MEMPTR:
{
d->data = SPA_MEMBER (bid->buf_ptr, SPA_PTR_TO_INT (d->data), void);
d->fd = -1;
pinos_log_debug (" data %d %u -> mem %p", j, bid->id, d->data);
break;
}
default:
pinos_log_warn ("unknown buffer data type %d", d->type);
break;
}
}
pinos_signal_emit (&stream->add_buffer, stream, bid->id);
}
if (n_buffers) {
add_state_change (stream, SPA_NODE_STATE_PAUSED, false);
} else {
clear_mems (stream);
add_state_change (stream, SPA_NODE_STATE_READY, false);
}
add_async_complete (stream, seq, SPA_RESULT_OK, true);
if (n_buffers)
stream_set_state (stream, PINOS_STREAM_STATE_PAUSED, NULL);
else
stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
}
static void
client_node_node_command (void *object,
uint32_t seq,
const SpaNodeCommand *command)
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
handle_node_command (stream, seq, command);
}
static void
client_node_port_command (void *object,
uint32_t port_id,
const SpaNodeCommand *command)
{
pinos_log_warn ("port command not supported");
}
static void
client_node_transport (void *object,
int memfd,
uint32_t offset,
uint32_t size)
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosTransportInfo info;
info.memfd = memfd;
if (info.memfd == -1)
return;
info.offset = offset;
info.size = size;
if (impl->trans)
pinos_transport_destroy (impl->trans);
impl->trans = pinos_transport_new_from_info (&info);
pinos_log_debug ("transport update %d %p", impl->rtfd, impl->trans);
}
static const PinosClientNodeEvent client_node_events = {
&client_node_done,
&client_node_event,
&client_node_add_port,
&client_node_remove_port,
&client_node_set_format,
&client_node_set_property,
&client_node_add_mem,
&client_node_use_buffers,
&client_node_node_command,
&client_node_port_command,
&client_node_transport
};
static void
on_node_proxy_destroy (PinosListener *listener,
PinosProxy *proxy)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (listener, PinosStreamImpl, node_proxy_destroy);
PinosStream *this = &impl->this;
impl->disconnecting = false;
impl->node_proxy = NULL;
pinos_signal_remove (&impl->node_proxy_destroy);
stream_set_state (this, PINOS_STREAM_STATE_UNCONNECTED, NULL);
}
/**
* pinos_stream_connect:
* @stream: a #PinosStream
* @direction: the stream direction
* @mode: a #PinosStreamMode
* @port_path: the port path to connect to or %NULL to get the default port
* @flags: a #PinosStreamFlags
* @possible_formats: (transfer full): a #GPtrArray with possible accepted formats
*
* Connect @stream for input or output on @port_path.
*
* When @mode is #PINOS_STREAM_MODE_BUFFER, you should connect to the new-buffer
* signal and use pinos_stream_capture_buffer() to get the latest metadata and
* data.
*
* Returns: %true on success.
*/
bool
pinos_stream_connect (PinosStream *stream,
PinosDirection direction,
PinosStreamMode mode,
const char *port_path,
PinosStreamFlags flags,
uint32_t n_possible_formats,
SpaFormat **possible_formats)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
impl->direction = direction == PINOS_DIRECTION_INPUT ? SPA_DIRECTION_INPUT : SPA_DIRECTION_OUTPUT;
impl->port_id = 0;
impl->mode = mode;
impl->flags = flags;
impl->n_possible_formats = n_possible_formats;
impl->possible_formats = possible_formats;
stream_set_state (stream, PINOS_STREAM_STATE_CONNECTING, NULL);
if (stream->properties == NULL)
stream->properties = pinos_properties_new (NULL, NULL);
if (port_path)
pinos_properties_set (stream->properties,
"pinos.target.node", port_path);
impl->node_proxy = pinos_proxy_new (stream->context,
SPA_ID_INVALID,
stream->context->uri.client_node);
if (impl->node_proxy == NULL)
return false;
pinos_signal_add (&impl->node_proxy->destroy_signal,
&impl->node_proxy_destroy,
on_node_proxy_destroy);
impl->node_proxy->user_data = stream;
impl->node_proxy->event = &client_node_events;
impl->node_proxy->interface = &pinos_protocol_native_client_client_node_interface;
impl->node_proxy->demarshal = &pinos_protocol_native_client_client_node_demarshal;
pinos_core_do_create_client_node (stream->context->core_proxy,
++impl->seq,
"client-node",
&stream->properties->dict,
impl->node_proxy->id);
return true;
}
/**
* pinos_stream_finish_format:
* @stream: a #PinosStream
* @res: a #SpaResult
* @params: an array of pointers to #SpaAllocParam
* @n_params: number of elements in @params
*
* Complete the negotiation process with result code @res.
*
* This function should be called after notification of the format.
* When @res indicates success, @params contain the parameters for the
* allocation state.
*
* Returns: %true on success
*/
bool
pinos_stream_finish_format (PinosStream *stream,
SpaResult res,
SpaAllocParam **params,
uint32_t n_params)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
impl->port_info.params = params;
impl->port_info.n_params = n_params;
if (SPA_RESULT_IS_OK (res)) {
add_port_update (stream, PINOS_MESSAGE_PORT_UPDATE_INFO |
PINOS_MESSAGE_PORT_UPDATE_FORMAT,
false);
if (impl->format) {
add_state_change (stream, SPA_NODE_STATE_READY, false);
} else {
clear_buffers (stream);
add_state_change (stream, SPA_NODE_STATE_CONFIGURE, false);
}
}
impl->port_info.params = NULL;
impl->port_info.n_params = 0;
add_async_complete (stream, impl->pending_seq, res, true);
impl->pending_seq = SPA_ID_INVALID;
return true;
}
/**
* pinos_stream_start:
* @stream: a #PinosStream
*
* Start capturing from @stream.
*
* Returns: %true on success.
*/
bool
pinos_stream_start (PinosStream *stream)
{
return true;
}
/**
* pinos_stream_stop:
* @stream: a #PinosStream
*
* Stop capturing from @stream.
*
* Returns: %true on success.
*/
bool
pinos_stream_stop (PinosStream *stream)
{
return true;
}
/**
* pinos_stream_disconnect:
* @stream: a #PinosStream
*
* Disconnect @stream.
*
* Returns: %true on success
*/
bool
pinos_stream_disconnect (PinosStream *stream)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
impl->disconnecting = true;
unhandle_socket (stream);
pinos_client_node_do_destroy (impl->node_proxy,
++impl->seq);
return true;
}
bool
pinos_stream_get_time (PinosStream *stream,
PinosTime *time)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
int64_t now, elapsed;
struct timespec ts;
clock_gettime (CLOCK_MONOTONIC, &ts);
now = SPA_TIMESPEC_TO_TIME (&ts);
elapsed = (now - impl->last_monotonic) / 1000;
time->ticks = impl->last_ticks + (elapsed * impl->last_rate) / SPA_USEC_PER_SEC;
time->rate = impl->last_rate;
return true;
}
/**
* pinos_stream_get_empty_buffer:
* @stream: a #PinosStream
*
* Get the id of an empty buffer that can be filled
*
* Returns: the id of an empty buffer or #SPA_ID_INVALID when no buffer is
* available.
*/
uint32_t
pinos_stream_get_empty_buffer (PinosStream *stream)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
BufferId *bid;
pinos_array_for_each (bid, &impl->buffer_ids) {
if (!bid->used)
return bid->id;
}
return SPA_ID_INVALID;
}
/**
* pinos_stream_recycle_buffer:
* @stream: a #PinosStream
* @id: a buffer id
*
* Recycle the buffer with @id.
*
* Returns: %true on success.
*/
bool
pinos_stream_recycle_buffer (PinosStream *stream,
uint32_t id)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
SpaNodeEventReuseBuffer rb;
uint64_t cmd = 1;
rb.event.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER;
rb.event.size = sizeof (rb);
rb.port_id = impl->port_id;
rb.buffer_id = id;
pinos_transport_add_event (impl->trans, &rb.event);
write (impl->rtfd, &cmd, 8);
return true;
}
/**
* pinos_stream_peek_buffer:
* @stream: a #PinosStream
* @id: the buffer id
*
* Get the buffer with @id from @stream. This function should be called from
* the new-buffer signal callback.
*
* Returns: a #SpaBuffer or %NULL when there is no buffer.
*/
SpaBuffer *
pinos_stream_peek_buffer (PinosStream *stream,
uint32_t id)
{
BufferId *bid;
if ((bid = find_buffer (stream, id)))
return bid->buf;
return NULL;
}
/**
* pinos_stream_send_buffer:
* @stream: a #PinosStream
* @id: a buffer id
* @offset: the offset in the buffer
* @size: the size in the buffer
*
* Send a buffer with @id to @stream.
*
* For provider streams, this function should be called whenever there is a new frame
* available.
*
* Returns: %true when @id was handled
*/
bool
pinos_stream_send_buffer (PinosStream *stream,
uint32_t id)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
BufferId *bid;
if (impl->trans->outputs[0].buffer_id != SPA_ID_INVALID) {
pinos_log_debug ("can't send %u, pending buffer %u", id, impl->trans->outputs[0].buffer_id);
return false;
}
if ((bid = find_buffer (stream, id)) && !bid->used) {
bid->used = true;
impl->trans->outputs[0].buffer_id = id;
impl->trans->outputs[0].status = SPA_RESULT_OK;
send_have_output (stream);
} else {
pinos_log_debug ("stream %p: output %u was used", stream, id);
}
return true;
}