pipewire/pinos/client/stream.c

1215 lines
33 KiB
C
Raw Normal View History

/* Pinos
 * Copyright (C) 2015 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include <unistd.h>
#include <sys/socket.h>
#include <string.h>
#include <sys/mman.h>
#include <errno.h>
#include <time.h>
#include "spa/lib/debug.h"
#include "pinos/client/pinos.h"
#include "pinos/client/interfaces.h"
#include "pinos/client/protocol-native.h"
#include "pinos/client/array.h"
#include "pinos/client/connection.h"
#include "pinos/client/context.h"
#include "pinos/client/stream.h"
#include "pinos/client/serialize.h"
#include "pinos/client/transport.h"
#include "pinos/client/utils.h"
/* Size and count limits. None of these is referenced in the part of the file
 * reviewed here -- presumably leftovers from an earlier protocol
 * implementation; TODO confirm before removing. */
#define MAX_BUFFER_SIZE 4096
#define MAX_FDS 32
#define MAX_INPUTS 64
#define MAX_OUTPUTS 64
/* Book-keeping for one block of memory announced through
 * client_node_add_mem(). Entries live in PinosStreamImpl.mem_ids. */
typedef struct {
/* memory id; find_mem() looks entries up by this value */
uint32_t id;
/* file descriptor received with the block; closed by clear_memid() */
int fd;
/* flags as received in client_node_add_mem(); stored but not interpreted
 * in this file */
uint32_t flags;
/* NULL until client_node_use_buffers() maps @fd; afterwards the mapped
 * address advanced by @offset */
void *ptr;
/* start of the block, in bytes from the beginning of the mapping */
uint32_t offset;
/* size of the block; the mapping covers @size + @offset bytes */
uint32_t size;
} MemId;
/* One buffer known to the stream, created in client_node_use_buffers().
 * Entries live in PinosStreamImpl.buffer_ids. */
typedef struct {
/* buffer id, copied from the id field of the SpaBuffer */
uint32_t id;
/* set by pinos_stream_send_buffer(); cleared again when a REUSE_BUFFER
 * event for this id is handled */
bool used;
/* start of this buffer's metadata/chunk area inside the mapped memory */
void *buf_ptr;
/* heap copy of the buffer description; freed by clear_buffers() */
SpaBuffer *buf;
} BufferId;
typedef struct
2015-04-16 16:58:33 +02:00
{
PinosStream this;
2015-05-27 18:16:52 +02:00
SpaNodeState node_state;
2015-06-04 16:34:47 +02:00
uint32_t n_possible_formats;
SpaFormat **possible_formats;
2015-05-27 18:16:52 +02:00
SpaFormat *format;
SpaPortInfo port_info;
SpaDirection direction;
uint32_t port_id;
uint32_t pending_seq;
PinosStreamFlags flags;
2015-05-27 18:16:52 +02:00
PinosStreamMode mode;
int rtfd;
SpaSource *rtsocket_source;
PinosProxy *node_proxy;
bool disconnecting;
PinosListener node_proxy_destroy;
PinosTransport *trans;
SpaSource *timeout_source;
PinosArray mem_ids;
PinosArray buffer_ids;
bool in_order;
2015-04-16 16:58:33 +02:00
int64_t last_ticks;
int32_t last_rate;
int64_t last_monotonic;
} PinosStreamImpl;
2015-04-16 16:58:33 +02:00
static void
clear_memid (MemId *mid)
{
if (mid->ptr != NULL)
munmap (mid->ptr, mid->size + mid->offset);
mid->ptr = NULL;
close (mid->fd);
}
/*
 * clear_mems:
 * @stream: a #PinosStream
 *
 * Release every memory entry of @stream and mark the list as empty.
 */
static void
clear_mems (PinosStream *stream)
{
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
  MemId *entry;

  pinos_array_for_each (entry, &priv->mem_ids) {
    clear_memid (entry);
  }

  /* only the element count is reset, the array storage is kept */
  priv->mem_ids.size = 0;
}
/*
 * clear_buffers:
 * @stream: a #PinosStream
 *
 * Emit remove_buffer for every known buffer, free the private buffer copies
 * and mark the buffer list as empty and in order again.
 */
static void
clear_buffers (PinosStream *stream)
{
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
  BufferId *entry;

  pinos_array_for_each (entry, &priv->buffer_ids) {
    /* notify first, the copy is released right after */
    pinos_signal_emit (&stream->remove_buffer, stream, entry->id);

    free (entry->buf);
    entry->used = false;
    entry->buf = NULL;
  }

  priv->in_order = true;
  priv->buffer_ids.size = 0;
}
2015-06-04 16:34:47 +02:00
static void
stream_set_state (PinosStream *stream,
2015-08-26 17:11:10 +02:00
PinosStreamState state,
char *error)
{
if (stream->state != state) {
if (stream->error)
free (stream->error);
stream->error = error;
stream->state = state;
pinos_signal_emit (&stream->state_changed, stream);
}
}
/**
 * pinos_stream_state_as_string:
 * @state: a #PinosStreamState
 *
 * Get a human readable name for @state.
 *
 * Returns: a static string naming @state, or "invalid-state" when @state is
 *          not one of the known values.
 */
const char *
pinos_stream_state_as_string (PinosStreamState state)
{
  const char *name = "invalid-state";

  switch (state) {
    case PINOS_STREAM_STATE_ERROR:
      name = "error";
      break;
    case PINOS_STREAM_STATE_UNCONNECTED:
      name = "unconnected";
      break;
    case PINOS_STREAM_STATE_CONNECTING:
      name = "connecting";
      break;
    case PINOS_STREAM_STATE_CONFIGURE:
      name = "configure";
      break;
    case PINOS_STREAM_STATE_READY:
      name = "ready";
      break;
    case PINOS_STREAM_STATE_PAUSED:
      name = "paused";
      break;
    case PINOS_STREAM_STATE_STREAMING:
      name = "streaming";
      break;
  }
  return name;
}
2015-04-16 16:58:33 +02:00
/**
* pinos_stream_new:
* @context: a #PinosContext
2015-04-16 16:58:33 +02:00
* @name: a stream name
2015-08-26 15:44:29 +02:00
* @properties: (transfer full): stream properties
2015-04-16 16:58:33 +02:00
*
* Make a new unconnected #PinosStream
2015-04-16 16:58:33 +02:00
*
* Returns: a new unconnected #PinosStream
2015-04-16 16:58:33 +02:00
*/
PinosStream *
pinos_stream_new (PinosContext *context,
const char *name,
PinosProperties *props)
2015-04-16 16:58:33 +02:00
{
PinosStreamImpl *impl;
PinosStream *this;
impl = calloc (1, sizeof (PinosStreamImpl));
if (impl == NULL)
return NULL;
this = &impl->this;
pinos_log_debug ("stream %p: new", impl);
2015-08-26 15:44:29 +02:00
if (props == NULL) {
props = pinos_properties_new ("media.name", name, NULL);
} else if (!pinos_properties_get (props, "media.name")) {
pinos_properties_set (props, "media.name", name);
}
if (props == NULL)
goto no_mem;
this->properties = props;
this->context = context;
this->name = strdup (name);
pinos_signal_init (&this->destroy_signal);
pinos_signal_init (&this->state_changed);
pinos_signal_init (&this->format_changed);
pinos_signal_init (&this->add_buffer);
pinos_signal_init (&this->remove_buffer);
pinos_signal_init (&this->new_buffer);
pinos_signal_init (&this->need_buffer);
2015-08-26 15:44:29 +02:00
this->state = PINOS_STREAM_STATE_UNCONNECTED;
2015-08-26 15:44:29 +02:00
impl->node_state = SPA_NODE_STATE_INIT;
pinos_array_init (&impl->mem_ids);
pinos_array_ensure_size (&impl->mem_ids, sizeof (MemId) * 64);
pinos_array_init (&impl->buffer_ids);
pinos_array_ensure_size (&impl->buffer_ids, sizeof (BufferId) * 64);
impl->pending_seq = SPA_ID_INVALID;
2015-04-16 16:58:33 +02:00
spa_list_insert (&context->stream_list, &this->link);
2015-04-16 16:58:33 +02:00
return this;
no_mem:
free (impl);
return NULL;
2015-04-16 16:58:33 +02:00
}
/*
 * unhandle_socket:
 * @stream: a #PinosStream
 *
 * Remove the wakeup-fd source and the clock timer from the context loop,
 * when they exist, and forget them.
 */
static void
unhandle_socket (PinosStream *stream)
{
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);

  if (priv->rtsocket_source != NULL) {
    pinos_loop_destroy_source (stream->context->loop, priv->rtsocket_source);
    priv->rtsocket_source = NULL;
  }

  if (priv->timeout_source != NULL) {
    pinos_loop_destroy_source (stream->context->loop, priv->timeout_source);
    priv->timeout_source = NULL;
  }
}
/**
 * pinos_stream_destroy:
 * @stream: a #PinosStream
 *
 * Emit destroy_signal, detach @stream from the loop and from the stream list
 * of its context, and free everything owned by it: possible formats, the
 * current format, the error string, buffers, memory entries, properties, the
 * transport, the name and the stream itself.
 */
void
pinos_stream_destroy (PinosStream *stream)
{
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);

  pinos_log_debug ("stream %p: destroy", stream);

  pinos_signal_emit (&stream->destroy_signal, stream);

  unhandle_socket (stream);
  spa_list_remove (&stream->link);

  if (priv->node_proxy != NULL)
    pinos_signal_remove (&priv->node_proxy_destroy);

  if (priv->possible_formats != NULL) {
    int idx = 0;

    while (idx < priv->n_possible_formats) {
      free (priv->possible_formats[idx]);
      idx++;
    }
    free (priv->possible_formats);
  }

  /* free() accepts NULL, no guards needed */
  free (priv->format);
  free (stream->error);

  clear_buffers (stream);
  pinos_array_clear (&priv->buffer_ids);

  clear_mems (stream);
  pinos_array_clear (&priv->mem_ids);

  if (stream->properties != NULL)
    pinos_properties_free (stream->properties);

  if (priv->trans != NULL)
    pinos_transport_destroy (priv->trans);

  free (stream->name);
  free (priv);
}
/*
 * add_node_update:
 * @stream: a #PinosStream
 * @change_mask: PINOS_MESSAGE_NODE_UPDATE_* bits to send
 * @flush: not used by this implementation
 *
 * Send a node update to the client-node proxy. The stream has one port in
 * its own direction, so the maximum for that direction is 1 and the other
 * is 0; a maximum whose bit is not in @change_mask is sent as 0.
 */
static void
add_node_update (PinosStream *stream, uint32_t change_mask, bool flush)
{
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
  uint32_t n_inputs = 0;
  uint32_t n_outputs = 0;

  if ((change_mask & PINOS_MESSAGE_NODE_UPDATE_MAX_INPUTS) &&
      priv->direction == SPA_DIRECTION_INPUT)
    n_inputs = 1;

  if ((change_mask & PINOS_MESSAGE_NODE_UPDATE_MAX_OUTPUTS) &&
      priv->direction == SPA_DIRECTION_OUTPUT)
    n_outputs = 1;

  pinos_client_node_do_update (priv->node_proxy,
                               change_mask,
                               n_inputs,
                               n_outputs,
                               NULL);
}
/*
 * add_state_change:
 * @stream: a #PinosStream
 * @state: the node state to announce
 * @flush: not used by this implementation
 *
 * Remember @state and send it to the client-node proxy. Nothing is sent
 * when @state equals the last announced state.
 */
static void
add_state_change (PinosStream *stream, SpaNodeState state, bool flush)
{
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);

  if (priv->node_state == state)
    return;

  priv->node_state = state;
  pinos_client_node_do_state_change (priv->node_proxy, state);
}
/*
 * add_port_update:
 * @stream: a #PinosStream
 * @change_mask: PINOS_MESSAGE_PORT_UPDATE_* bits to send
 * @flush: not used by this implementation
 *
 * Send the possible formats, the current format and the port info of the
 * stream port to the client-node proxy. No properties are sent.
 */
static void
add_port_update (PinosStream *stream, uint32_t change_mask, bool flush)
{
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
  const SpaFormat **possible = (const SpaFormat **) priv->possible_formats;
  const SpaFormat *current = (const SpaFormat *) priv->format;

  pinos_client_node_do_port_update (priv->node_proxy,
                                    priv->direction,
                                    priv->port_id,
                                    change_mask,
                                    priv->n_possible_formats,
                                    possible,
                                    current,
                                    NULL,
                                    &priv->port_info);
}
/* Currently a no-op: the whole body is compiled out with #if 0.
 * The disabled code would queue a NEED_INPUT event on the transport and
 * write a wakeup value to rtfd. */
static inline void
send_need_input (PinosStream *stream)
{
#if 0
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
SpaNodeEventNeedInput ni;
uint64_t cmd = 1;
pinos_log_debug ("stream %p: need input", stream);
ni.event.type = SPA_NODE_EVENT_NEED_INPUT;
ni.event.size = sizeof (ni);
pinos_transport_add_event (impl->trans, &ni.event);
write (impl->rtfd, &cmd, 8);
#endif
}
/* Currently a no-op: the whole body is compiled out with #if 0.
 * The disabled code would queue a HAVE_OUTPUT event on the transport and
 * write a wakeup value to rtfd. */
static inline void
send_have_output (PinosStream *stream)
{
#if 0
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
SpaNodeEventHaveOutput ho;
uint64_t cmd = 1;
pinos_log_debug ("stream %p: have output", stream);
ho.event.type = SPA_NODE_EVENT_HAVE_OUTPUT;
ho.event.size = sizeof (ho);
pinos_transport_add_event (impl->trans, &ho.event);
write (impl->rtfd, &cmd, 8);
#endif
}
/*
 * add_request_clock_update:
 * @stream: a #PinosStream
 * @flush: not used by this implementation
 *
 * Send a REQUEST_CLOCK_UPDATE event asking for the time to the client-node
 * proxy.
 */
static void
add_request_clock_update (PinosStream *stream, bool flush)
{
  SpaNodeEventRequestClockUpdate request =
      SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_INIT (SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_TIME, 0, 0);
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);

  pinos_client_node_do_event (priv->node_proxy, (SpaNodeEvent *) &request);
}
/*
 * add_async_complete:
 * @stream: a #PinosStream
 * @seq: the sequence number of the request being completed
 * @res: the result of the request
 * @flush: not used by this implementation
 *
 * Send an ASYNC_COMPLETE event for @seq with @res to the client-node proxy.
 */
static void
add_async_complete (PinosStream *stream, uint32_t seq, SpaResult res, bool flush)
{
  SpaNodeEventAsyncComplete complete = SPA_NODE_EVENT_ASYNC_COMPLETE_INIT(seq, res);
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);

  pinos_client_node_do_event (priv->node_proxy, (SpaNodeEvent *) &complete);
}
/*
 * do_node_init:
 * @stream: a #PinosStream
 *
 * Describe the freshly created client node to the server: port limits,
 * possible formats and port info, then announce the CONFIGURE state.
 */
static void
do_node_init (PinosStream *stream)
{
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);

  add_node_update (stream,
                   PINOS_MESSAGE_NODE_UPDATE_MAX_INPUTS | PINOS_MESSAGE_NODE_UPDATE_MAX_OUTPUTS,
                   false);

  /* the flag has to be in place before the port info is sent */
  priv->port_info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;

  add_port_update (stream,
                   PINOS_MESSAGE_PORT_UPDATE_POSSIBLE_FORMATS | PINOS_MESSAGE_PORT_UPDATE_INFO,
                   false);

  add_state_change (stream, SPA_NODE_STATE_CONFIGURE, true);
}
/*
 * on_timeout:
 * @source: the timer source (unused)
 * @data: the #PinosStream registered with the timer
 *
 * Timer callback: request a clock update for the stream.
 */
static void
on_timeout (SpaSource *source, void *data)
{
  add_request_clock_update ((PinosStream *) data, true);
}
/*
 * find_mem:
 * @stream: a #PinosStream
 * @id: a memory id
 *
 * Search the memory entries of @stream for @id.
 *
 * Returns: the first entry with @id or %NULL when there is none.
 */
static MemId *
find_mem (PinosStream *stream, uint32_t id)
{
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
  MemId *cur;

  pinos_array_for_each (cur, &priv->mem_ids) {
    if (cur->id != id)
      continue;
    return cur;
  }
  return NULL;
}
/*
 * find_buffer:
 * @stream: a #PinosStream
 * @id: a buffer id
 *
 * Look up the buffer entry for @id. While the buffers are in order and @id
 * is a valid index, the entry at that index is returned directly; otherwise
 * the entries are searched one by one.
 *
 * Returns: the entry for @id or %NULL when there is none.
 */
static BufferId *
find_buffer (PinosStream *stream, uint32_t id)
{
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
  BufferId *cur;

  if (priv->in_order && pinos_array_check_index (&priv->buffer_ids, id, BufferId))
    return pinos_array_get_unchecked (&priv->buffer_ids, id, BufferId);

  pinos_array_for_each (cur, &priv->buffer_ids) {
    if (cur->id == id)
      return cur;
  }
  return NULL;
}
/*
 * handle_rtnode_event:
 * @stream: a #PinosStream
 * @event: an event read from the transport
 *
 * Dispatch one event received over the transport:
 *  - HAVE_OUTPUT: emit new_buffer for every input that holds a buffer id
 *    and reset that input;
 *  - NEED_INPUT: emit need_buffer;
 *  - REUSE_BUFFER: on an output stream, mark the buffer as unused again and
 *    emit new_buffer for it;
 *  - anything else is logged as unexpected.
 */
static void
handle_rtnode_event (PinosStream *stream, SpaNodeEvent *event)
{
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);

  switch (SPA_NODE_EVENT_TYPE (event)) {
    case SPA_NODE_EVENT_HAVE_OUTPUT:
    {
      int idx;

      for (idx = 0; idx < priv->trans->area->n_inputs; idx++) {
        SpaPortInput *in = &priv->trans->inputs[idx];

        if (in->buffer_id != SPA_ID_INVALID) {
          pinos_signal_emit (&stream->new_buffer, stream, in->buffer_id);
          in->buffer_id = SPA_ID_INVALID;
        }
      }
      send_need_input (stream);
      break;
    }

    case SPA_NODE_EVENT_NEED_INPUT:
      pinos_signal_emit (&stream->need_buffer, stream);
      break;

    case SPA_NODE_EVENT_REUSE_BUFFER:
    {
      SpaNodeEventReuseBuffer *reuse = (SpaNodeEventReuseBuffer *) event;
      BufferId *entry;

      /* only our own port, and only when we are the producer */
      if (reuse->body.port_id.value != priv->port_id)
        break;
      if (priv->direction != SPA_DIRECTION_OUTPUT)
        break;

      entry = find_buffer (stream, reuse->body.buffer_id.value);
      if (entry == NULL || !entry->used)
        break;

      entry->used = false;
      pinos_signal_emit (&stream->new_buffer, stream, reuse->body.buffer_id.value);
      break;
    }

    default:
      pinos_log_warn ("unexpected node event %d", SPA_NODE_EVENT_TYPE (event));
      break;
  }
}
/*
 * on_rtsocket_condition:
 * @source: the io source (unused)
 * @fd: the fd that triggered (unused, impl->rtfd is read instead)
 * @mask: the conditions that are set
 * @data: the #PinosStream registered with the source
 *
 * Loop callback for the wakeup fd. On error or hangup the loop sources of
 * the stream are removed. On input the wakeup value is consumed and every
 * event queued on the transport is parsed and dispatched.
 */
static void
on_rtsocket_condition (SpaSource *source,
                       int        fd,
                       SpaIO      mask,
                       void      *data)
{
  PinosStream *stream = data;
  PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);

  if (mask & (SPA_IO_ERR | SPA_IO_HUP)) {
    pinos_log_warn ("got error");
    unhandle_socket (stream);
    return;
  }

  if (mask & SPA_IO_IN) {
    SpaNodeEvent event;
    uint64_t cmd;
    ssize_t res;

    /* The value itself is not used; a failed read is reported instead of
     * silently ignored, but queued events are still processed. */
    res = read (impl->rtfd, &cmd, sizeof (cmd));
    if (res != (ssize_t) sizeof (cmd))
      pinos_log_warn ("stream %p: wakeup read returned %d: %s", stream, (int) res,
                      res < 0 ? strerror (errno) : "short read");

    while (pinos_transport_next_event (impl->trans, &event) == SPA_RESULT_OK) {
      /* NOTE(review): alloca() storage is only released when this function
       * returns, so stack use grows with the number of queued events. */
      SpaNodeEvent *ev = alloca (SPA_POD_SIZE (&event));

      pinos_transport_parse_event (impl->trans, ev);
      handle_rtnode_event (stream, ev);
    }
  }
}
/*
 * handle_socket:
 * @stream: a #PinosStream
 * @rtfd: the wakeup fd received from the server
 *
 * Remember @rtfd, watch it on the context loop and start the timer that
 * periodically requests a clock update (interval of 100000000 nanoseconds).
 */
static void
handle_socket (PinosStream *stream, int rtfd)
{
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
  struct timespec interval = { .tv_sec = 0, .tv_nsec = 100000000 };

  priv->rtfd = rtfd;

  priv->rtsocket_source = pinos_loop_add_io (stream->context->loop,
                                             priv->rtfd,
                                             SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP,
                                             true,
                                             on_rtsocket_condition,
                                             stream);

  priv->timeout_source = pinos_loop_add_timer (stream->context->loop,
                                               on_timeout,
                                               stream);

  pinos_loop_update_timer (stream->context->loop,
                           priv->timeout_source,
                           NULL,
                           &interval,
                           false);
}
/*
 * handle_node_event:
 * @stream: a #PinosStream
 * @event: an event received through the client-node proxy
 *
 * None of the known event types is acted upon here; each of them is only
 * logged as unhandled. Types that are not listed are ignored without a log.
 */
static void
handle_node_event (PinosStream *stream, const SpaNodeEvent *event)
{
  switch (SPA_NODE_EVENT_TYPE (event)) {
    case SPA_NODE_EVENT_INVALID:
    case SPA_NODE_EVENT_HAVE_OUTPUT:
    case SPA_NODE_EVENT_NEED_INPUT:
    case SPA_NODE_EVENT_ASYNC_COMPLETE:
    case SPA_NODE_EVENT_REUSE_BUFFER:
    case SPA_NODE_EVENT_ERROR:
    case SPA_NODE_EVENT_BUFFERING:
    case SPA_NODE_EVENT_REQUEST_REFRESH:
    case SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE:
    {
      pinos_log_warn ("unhandled node event %d", SPA_NODE_EVENT_TYPE (event));
      break;
    }
  }
}
/*
 * handle_node_command:
 * @stream: a #PinosStream
 * @seq: the sequence number of the command
 * @command: the command to execute
 *
 * Execute a node command:
 *  - PAUSE / START: announce the matching node state, complete @seq with
 *    SPA_RESULT_OK and move the stream to PAUSED / STREAMING;
 *  - FLUSH, DRAIN, MARKER: complete @seq with SPA_RESULT_NOT_IMPLEMENTED;
 *  - CLOCK_UPDATE: store ticks, rate and monotonic time for
 *    pinos_stream_get_time() and, for live clocks, publish the latency in
 *    the stream properties.
 *
 * Returns: always %true.
 */
static bool
handle_node_command (PinosStream *stream, uint32_t seq, const SpaNodeCommand *command)
{
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);

  switch (SPA_NODE_COMMAND_TYPE (command)) {
    case SPA_NODE_COMMAND_INVALID:
      break;

    case SPA_NODE_COMMAND_PAUSE:
      pinos_log_debug ("stream %p: pause %d", stream, seq);

      add_state_change (stream, SPA_NODE_STATE_PAUSED, false);
      add_async_complete (stream, seq, SPA_RESULT_OK, true);
      stream_set_state (stream, PINOS_STREAM_STATE_PAUSED, NULL);
      break;

    case SPA_NODE_COMMAND_START:
      pinos_log_debug ("stream %p: start %d", stream, seq);

      add_state_change (stream, SPA_NODE_STATE_STREAMING, false);
      add_async_complete (stream, seq, SPA_RESULT_OK, true);

      if (priv->direction == SPA_DIRECTION_INPUT)
        send_need_input (stream);

      stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL);
      break;

    case SPA_NODE_COMMAND_FLUSH:
    case SPA_NODE_COMMAND_DRAIN:
    case SPA_NODE_COMMAND_MARKER:
      pinos_log_warn ("unhandled node command %d", SPA_NODE_COMMAND_TYPE (command));
      add_async_complete (stream, seq, SPA_RESULT_NOT_IMPLEMENTED, true);
      break;

    case SPA_NODE_COMMAND_CLOCK_UPDATE:
    {
      SpaNodeCommandClockUpdate *update = (SpaNodeCommandClockUpdate *) command;

      if (update->body.flags.value & SPA_NODE_COMMAND_CLOCK_UPDATE_FLAG_LIVE) {
        pinos_properties_set (stream->properties,
                              "pinos.latency.is-live", "1");
        pinos_properties_setf (stream->properties,
                               "pinos.latency.min", "%"PRId64, update->body.latency.value);
      }

      priv->last_ticks = update->body.ticks.value;
      priv->last_rate = update->body.rate.value;
      priv->last_monotonic = update->body.monotonic_time.value;
      break;
    }
  }
  return true;
}
2017-03-03 17:43:23 +01:00
static void
client_node_done (void *object,
int datafd)
{
2017-03-03 17:43:23 +01:00
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
2017-03-09 19:42:35 +01:00
pinos_log_info ("strean %p: create client node done with fd %d", stream, datafd);
2017-03-03 17:43:23 +01:00
handle_socket (stream, datafd);
do_node_init (stream);
}
static void
client_node_event (void *object,
const SpaNodeEvent *event)
2017-03-03 17:43:23 +01:00
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
handle_node_event (stream, event);
}
/*
 * client_node_add_port:
 *
 * Adding ports is not supported by a stream; the request is only logged and
 * all arguments are ignored.
 */
static void
client_node_add_port (void *object, uint32_t seq, SpaDirection direction, uint32_t port_id)
{
  (void) object;
  (void) seq;
  (void) direction;
  (void) port_id;

  pinos_log_warn ("add port not supported");
}
/*
 * client_node_remove_port:
 *
 * Removing ports is not supported by a stream; the request is only logged
 * and all arguments are ignored.
 */
static void
client_node_remove_port (void *object, uint32_t seq, SpaDirection direction, uint32_t port_id)
{
  (void) object;
  (void) seq;
  (void) direction;
  (void) port_id;

  pinos_log_warn ("remove port not supported");
}
/*
 * client_node_set_format:
 * @object: the client-node #PinosProxy
 * @seq: sequence number to complete in pinos_stream_finish_format()
 * @direction: port direction (unused)
 * @port_id: port id (unused)
 * @flags: format flags (unused)
 * @format: the new format
 *
 * Replace the stored format with a copy of @format, remember @seq as the
 * pending request, emit format_changed and move the stream to READY.
 */
static void
client_node_set_format (void               *object,
                        uint32_t            seq,
                        SpaDirection        direction,
                        uint32_t            port_id,
                        SpaPortFormatFlags  flags,
                        const SpaFormat    *format)
{
  PinosStream *stream = ((PinosProxy *) object)->user_data;
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);

  /* free() accepts NULL, so the previous format needs no guard */
  free (priv->format);
  priv->format = spa_format_copy (format);
  priv->pending_seq = seq;

  pinos_signal_emit (&stream->format_changed, stream, priv->format);
  stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
}
/*
 * client_node_set_property:
 *
 * Setting properties is not implemented; the request is only logged and all
 * arguments are ignored.
 */
static void
client_node_set_property (void *object, uint32_t seq, uint32_t id, uint32_t size, const void *value)
{
  (void) object;
  (void) seq;
  (void) id;
  (void) size;
  (void) value;

  pinos_log_warn ("set property not implemented");
}
2017-03-03 17:43:23 +01:00
static void
client_node_add_mem (void *object,
SpaDirection direction,
uint32_t port_id,
uint32_t mem_id,
SpaDataType type,
int memfd,
uint32_t flags,
uint32_t offset,
uint32_t size)
2017-03-03 17:43:23 +01:00
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
MemId *m;
2017-03-03 17:43:23 +01:00
m = find_mem (stream, mem_id);
if (m) {
pinos_log_debug ("update mem %u, fd %d, flags %d, off %d, size %d",
2017-03-03 17:43:23 +01:00
mem_id, memfd, flags, offset, size);
clear_memid (m);
} else {
m = pinos_array_add (&impl->mem_ids, sizeof (MemId));
pinos_log_debug ("add mem %u, fd %d, flags %d, off %d, size %d",
2017-03-03 17:43:23 +01:00
mem_id, memfd, flags, offset, size);
}
m->id = mem_id;
m->fd = memfd;
m->flags = flags;
m->ptr = NULL;
m->offset = offset;
m->size = size;
}
2017-03-03 17:43:23 +01:00
static void
client_node_use_buffers (void *object,
uint32_t seq,
SpaDirection direction,
uint32_t port_id,
uint32_t n_buffers,
2017-03-03 17:43:23 +01:00
PinosClientNodeBuffer *buffers)
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
BufferId *bid;
uint32_t i, j, len;
2017-03-03 17:43:23 +01:00
SpaBuffer *b;
2017-03-03 17:43:23 +01:00
/* clear previous buffers */
clear_buffers (stream);
for (i = 0; i < n_buffers; i++) {
off_t offset = 0;
MemId *mid = find_mem (stream, buffers[i].mem_id);
if (mid == NULL) {
pinos_log_warn ("unknown memory id %u", buffers[i].mem_id);
continue;
}
2017-03-03 17:43:23 +01:00
if (mid->ptr == NULL) {
mid->ptr = mmap (NULL, mid->size + mid->offset, PROT_READ | PROT_WRITE, MAP_SHARED, mid->fd, 0);
if (mid->ptr == MAP_FAILED) {
mid->ptr = NULL;
pinos_log_warn ("Failed to mmap memory %d %p: %s", mid->size, mid, strerror (errno));
2017-03-03 17:43:23 +01:00
continue;
}
2017-03-03 17:43:23 +01:00
mid->ptr = SPA_MEMBER (mid->ptr, mid->offset, void);
}
2017-03-03 17:43:23 +01:00
len = pinos_array_get_len (&impl->buffer_ids, BufferId);
bid = pinos_array_add (&impl->buffer_ids, sizeof (BufferId));
bid->used = false;
b = buffers[i].buffer;
bid->buf_ptr = SPA_MEMBER (mid->ptr, buffers[i].offset, void);
{
2017-03-03 17:43:23 +01:00
size_t size;
2017-03-03 17:43:23 +01:00
size = pinos_serialize_buffer_get_size (buffers[i].buffer);
b = bid->buf = malloc (size);
pinos_serialize_buffer_copy_into (b, buffers[i].buffer);
}
bid->id = b->id;
2017-03-03 17:43:23 +01:00
if (bid->id != len) {
pinos_log_warn ("unexpected id %u found, expected %u", bid->id, len);
impl->in_order = false;
}
pinos_log_debug ("add buffer %d %d %u", mid->id, bid->id, buffers[i].offset);
2017-03-03 17:43:23 +01:00
for (j = 0; j < b->n_metas; j++) {
SpaMeta *m = &b->metas[j];
m->data = SPA_MEMBER (bid->buf_ptr, offset, void);
offset += m->size;
}
2017-03-03 17:43:23 +01:00
for (j = 0; j < b->n_datas; j++) {
SpaData *d = &b->datas[j];
2017-03-03 17:43:23 +01:00
d->chunk = SPA_MEMBER (bid->buf_ptr, offset + sizeof (SpaChunk) * j, SpaChunk);
2017-03-03 17:43:23 +01:00
switch (d->type) {
case SPA_DATA_TYPE_ID:
{
2017-03-03 17:43:23 +01:00
MemId *bmid = find_mem (stream, SPA_PTR_TO_UINT32 (d->data));
d->type = SPA_DATA_TYPE_MEMFD;
d->data = NULL;
d->fd = bmid->fd;
pinos_log_debug (" data %d %u -> fd %d", j, bmid->id, bmid->fd);
break;
}
2017-03-03 17:43:23 +01:00
case SPA_DATA_TYPE_MEMPTR:
{
d->data = SPA_MEMBER (bid->buf_ptr, SPA_PTR_TO_INT (d->data), void);
d->fd = -1;
pinos_log_debug (" data %d %u -> mem %p", j, bid->id, d->data);
break;
}
2017-03-03 17:43:23 +01:00
default:
pinos_log_warn ("unknown buffer data type %d", d->type);
break;
}
}
pinos_signal_emit (&stream->add_buffer, stream, bid->id);
}
2017-03-03 17:43:23 +01:00
if (n_buffers) {
add_state_change (stream, SPA_NODE_STATE_PAUSED, false);
} else {
clear_mems (stream);
add_state_change (stream, SPA_NODE_STATE_READY, false);
}
add_async_complete (stream, seq, SPA_RESULT_OK, true);
2017-03-03 17:43:23 +01:00
if (n_buffers)
stream_set_state (stream, PINOS_STREAM_STATE_PAUSED, NULL);
else
stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
}
2017-03-03 17:43:23 +01:00
static void
client_node_node_command (void *object,
uint32_t seq,
const SpaNodeCommand *command)
2017-03-03 17:43:23 +01:00
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
handle_node_command (stream, seq, command);
}
2017-03-03 17:43:23 +01:00
static void
client_node_port_command (void *object,
uint32_t port_id,
const SpaNodeCommand *command)
2017-03-03 17:43:23 +01:00
{
pinos_log_warn ("port command not supported");
}
2017-03-03 17:43:23 +01:00
static void
client_node_transport (void *object,
int memfd,
uint32_t offset,
uint32_t size)
2017-03-03 17:43:23 +01:00
{
PinosProxy *proxy = object;
PinosStream *stream = proxy->user_data;
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
PinosTransportInfo info;
2017-03-03 17:43:23 +01:00
info.memfd = memfd;
if (info.memfd == -1)
return;
info.offset = offset;
info.size = size;
2017-03-03 17:43:23 +01:00
if (impl->trans)
pinos_transport_destroy (impl->trans);
impl->trans = pinos_transport_new_from_info (&info);
pinos_log_debug ("transport update %d %p", impl->rtfd, impl->trans);
}
static const PinosClientNodeEvents client_node_events = {
2017-03-03 17:43:23 +01:00
&client_node_done,
&client_node_event,
&client_node_add_port,
&client_node_remove_port,
&client_node_set_format,
&client_node_set_property,
&client_node_add_mem,
&client_node_use_buffers,
&client_node_node_command,
&client_node_port_command,
&client_node_transport
};
/*
 * on_node_proxy_destroy:
 * @listener: the listener embedded in the stream implementation
 * @proxy: the proxy being destroyed (unused)
 *
 * Called when the client-node proxy goes away: forget the proxy, stop
 * listening, clear the disconnecting flag and move the stream to
 * UNCONNECTED.
 */
static void
on_node_proxy_destroy (PinosListener *listener, PinosProxy *proxy)
{
  PinosStreamImpl *priv = SPA_CONTAINER_OF (listener, PinosStreamImpl, node_proxy_destroy);

  priv->node_proxy = NULL;
  priv->disconnecting = false;
  pinos_signal_remove (&priv->node_proxy_destroy);

  stream_set_state (&priv->this, PINOS_STREAM_STATE_UNCONNECTED, NULL);
}
2015-04-16 16:58:33 +02:00
/**
* pinos_stream_connect:
* @stream: a #PinosStream
* @direction: the stream direction
2016-08-18 12:43:25 +02:00
* @mode: a #PinosStreamMode
* @port_path: the port path to connect to or %NULL to get the default port
* @flags: a #PinosStreamFlags
* @possible_formats: (transfer full): a #GPtrArray with possible accepted formats
2015-04-16 16:58:33 +02:00
*
* Connect @stream for input or output on @port_path.
2015-04-16 16:58:33 +02:00
*
2016-08-18 12:43:25 +02:00
* When @mode is #PINOS_STREAM_MODE_BUFFER, you should connect to the new-buffer
* signal and use pinos_stream_capture_buffer() to get the latest metadata and
* data.
*
* Returns: %true on success.
2015-04-16 16:58:33 +02:00
*/
bool
pinos_stream_connect (PinosStream *stream,
PinosDirection direction,
2016-08-18 12:43:25 +02:00
PinosStreamMode mode,
const char *port_path,
PinosStreamFlags flags,
uint32_t n_possible_formats,
SpaFormat **possible_formats)
2015-04-16 16:58:33 +02:00
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
impl->direction = direction == PINOS_DIRECTION_INPUT ? SPA_DIRECTION_INPUT : SPA_DIRECTION_OUTPUT;
impl->port_id = 0;
impl->mode = mode;
impl->flags = flags;
impl->n_possible_formats = n_possible_formats;
impl->possible_formats = possible_formats;
stream_set_state (stream, PINOS_STREAM_STATE_CONNECTING, NULL);
if (stream->properties == NULL)
stream->properties = pinos_properties_new (NULL, NULL);
if (port_path)
pinos_properties_set (stream->properties,
"pinos.target.node", port_path);
impl->node_proxy = pinos_proxy_new (stream->context,
SPA_ID_INVALID,
stream->context->uri.client_node);
if (impl->node_proxy == NULL)
return false;
pinos_signal_add (&impl->node_proxy->destroy_signal,
&impl->node_proxy_destroy,
on_node_proxy_destroy);
2017-03-03 17:43:23 +01:00
impl->node_proxy->user_data = stream;
impl->node_proxy->implementation = &client_node_events;
2016-11-25 13:38:49 +01:00
2017-03-03 17:43:23 +01:00
pinos_core_do_create_client_node (stream->context->core_proxy,
"client-node",
&stream->properties->dict,
impl->node_proxy->id);
return true;
}
/**
 * pinos_stream_finish_format:
 * @stream: a #PinosStream
 * @res: a #SpaResult
 * @params: an array of pointers to #SpaAllocParam
 * @n_params: number of elements in @params
 *
 * Complete the negotiation process with result code @res.
 *
 * This function should be called after notification of the format.
 * When @res indicates success, the port info, including @params, and the
 * format are sent to the server and the node moves to READY, or back to
 * CONFIGURE (dropping all buffers) when no format is set. @params is only
 * referenced during this call. The pending set_format request is completed
 * with @res in all cases.
 *
 * Returns: %true on success
 */
bool
pinos_stream_finish_format (PinosStream    *stream,
                            SpaResult       res,
                            SpaAllocParam **params,
                            uint32_t        n_params)
{
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);

  /* expose the params only for the duration of the port update */
  priv->port_info.n_params = n_params;
  priv->port_info.params = params;

  if (SPA_RESULT_IS_OK (res)) {
    add_port_update (stream,
                     PINOS_MESSAGE_PORT_UPDATE_INFO | PINOS_MESSAGE_PORT_UPDATE_FORMAT,
                     false);

    if (priv->format == NULL) {
      clear_buffers (stream);
      add_state_change (stream, SPA_NODE_STATE_CONFIGURE, false);
    } else {
      add_state_change (stream, SPA_NODE_STATE_READY, false);
    }
  }

  priv->port_info.n_params = 0;
  priv->port_info.params = NULL;

  add_async_complete (stream, priv->pending_seq, res, true);
  priv->pending_seq = SPA_ID_INVALID;

  return true;
}
/**
* pinos_stream_start:
* @stream: a #PinosStream
*
2016-08-18 12:43:25 +02:00
* Start capturing from @stream.
*
* Returns: %true on success.
*/
bool
2016-08-18 12:43:25 +02:00
pinos_stream_start (PinosStream *stream)
{
return true;
}
/**
 * pinos_stream_stop:
 * @stream: a #PinosStream
 *
 * Stop capturing from @stream.
 *
 * This implementation does nothing with @stream.
 *
 * Returns: %true on success.
 */
bool
pinos_stream_stop (PinosStream *stream)
{
  (void) stream;

  return true;
}
2015-04-16 16:58:33 +02:00
/**
* pinos_stream_disconnect:
* @stream: a #PinosStream
2015-04-16 16:58:33 +02:00
*
* Disconnect @stream.
*
* Returns: %true on success
2015-04-16 16:58:33 +02:00
*/
bool
pinos_stream_disconnect (PinosStream *stream)
2015-04-16 16:58:33 +02:00
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
2015-04-16 16:58:33 +02:00
impl->disconnecting = true;
unhandle_socket (stream);
2017-03-09 19:21:50 +01:00
pinos_client_node_do_destroy (impl->node_proxy);
return true;
2015-04-16 16:58:33 +02:00
}
/**
 * pinos_stream_get_time:
 * @stream: a #PinosStream
 * @time: a #PinosTime to fill in
 *
 * Estimate the current stream time by extrapolating the values of the last
 * clock update with the time that passed on CLOCK_MONOTONIC since then.
 *
 * Returns: always %true.
 */
bool
pinos_stream_get_time (PinosStream *stream, PinosTime *time)
{
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
  struct timespec ts;
  int64_t current;
  int64_t since_update;

  clock_gettime (CLOCK_MONOTONIC, &ts);
  current = SPA_TIMESPEC_TO_TIME (&ts);

  /* presumably nanoseconds scaled down to microseconds, matching the
   * SPA_USEC_PER_SEC divisor below -- TODO confirm */
  since_update = (current - priv->last_monotonic) / 1000;

  time->rate = priv->last_rate;
  time->ticks = priv->last_ticks + (since_update * priv->last_rate) / SPA_USEC_PER_SEC;

  return true;
}
/**
 * pinos_stream_get_empty_buffer:
 * @stream: a #PinosStream
 *
 * Get the id of the first buffer that is not in use and can be filled.
 *
 * Returns: the id of an empty buffer or #SPA_ID_INVALID when no buffer is
 *          available.
 */
uint32_t
pinos_stream_get_empty_buffer (PinosStream *stream)
{
  PinosStreamImpl *priv = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
  BufferId *cur;

  pinos_array_for_each (cur, &priv->buffer_ids) {
    if (cur->used)
      continue;
    return cur->id;
  }
  return SPA_ID_INVALID;
}
/**
 * pinos_stream_recycle_buffer:
 * @stream: a #PinosStream
 * @id: a buffer id
 *
 * Recycle the buffer with @id: a REUSE_BUFFER event for the stream port is
 * queued on the transport and the peer is woken up through the wakeup fd.
 *
 * Returns: %true on success, %false when the stream has no transport yet or
 *          when the wakeup could not be written.
 */
bool
pinos_stream_recycle_buffer (PinosStream *stream,
                             uint32_t     id)
{
  PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
  SpaNodeEventReuseBuffer rb = SPA_NODE_EVENT_REUSE_BUFFER_INIT (impl->port_id, id);
  uint64_t cmd = 1;
  ssize_t res;

  /* the transport only exists after client_node_transport() was called */
  if (impl->trans == NULL) {
    pinos_log_warn ("stream %p: can't recycle buffer %u without transport", stream, id);
    return false;
  }

  pinos_transport_add_event (impl->trans, (SpaNodeEvent *)&rb);

  res = write (impl->rtfd, &cmd, sizeof (cmd));
  if (res != (ssize_t) sizeof (cmd)) {
    pinos_log_warn ("stream %p: wakeup write for buffer %u returned %d: %s", stream, id,
                    (int) res, res < 0 ? strerror (errno) : "short write");
    return false;
  }
  return true;
}
2015-04-16 16:58:33 +02:00
/**
* pinos_stream_peek_buffer:
* @stream: a #PinosStream
* @id: the buffer id
2015-04-16 16:58:33 +02:00
*
* Get the buffer with @id from @stream. This function should be called from
* the new-buffer signal callback.
2015-04-16 16:58:33 +02:00
*
* Returns: a #SpaBuffer or %NULL when there is no buffer.
2015-04-16 16:58:33 +02:00
*/
SpaBuffer *
pinos_stream_peek_buffer (PinosStream *stream,
uint32_t id)
2015-04-16 16:58:33 +02:00
{
BufferId *bid;
2015-04-16 16:58:33 +02:00
if ((bid = find_buffer (stream, id)))
return bid->buf;
return NULL;
2015-04-16 16:58:33 +02:00
}
/**
 * pinos_stream_send_buffer:
 * @stream: a #PinosStream
 * @id: a buffer id
 *
 * Send a buffer with @id to @stream.
 *
 * For provider streams, this function should be called whenever there is a
 * new frame available. The buffer is marked as used and placed in the first
 * output of the transport.
 *
 * Returns: %false when the stream has no transport yet or when a previously
 *          sent buffer is still pending in the output; %true otherwise.
 *          Note that %true is also returned when @id is unknown or already
 *          in use; that case is only logged.
 */
bool
pinos_stream_send_buffer (PinosStream *stream,
                          uint32_t     id)
{
  PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
  BufferId *bid;

  /* the transport only exists after client_node_transport() was called */
  if (impl->trans == NULL) {
    pinos_log_debug ("stream %p: can't send %u, no transport", stream, id);
    return false;
  }

  if (impl->trans->outputs[0].buffer_id != SPA_ID_INVALID) {
    pinos_log_debug ("can't send %u, pending buffer %u", id, impl->trans->outputs[0].buffer_id);
    return false;
  }

  if ((bid = find_buffer (stream, id)) && !bid->used) {
    bid->used = true;
    impl->trans->outputs[0].buffer_id = id;
    impl->trans->outputs[0].status = SPA_RESULT_OK;
    send_have_output (stream);
  } else {
    pinos_log_debug ("stream %p: output %u was used", stream, id);
  }
  return true;
}