examples: add video upload example

Add an example of a node that makes a video available.
Improve buffer reuse in stream.
Add more video formats
This commit is contained in:
Wim Taymans 2017-04-18 17:57:04 +02:00
parent c7333c46cc
commit db16de85bb
16 changed files with 466 additions and 93 deletions

View file

@ -51,6 +51,7 @@ typedef struct {
} MemId;
typedef struct {
SpaList link;
uint32_t id;
bool used;
void *buf_ptr;
@ -72,8 +73,6 @@ typedef struct
uint32_t port_id;
uint32_t pending_seq;
PinosStreamFlags flags;
PinosStreamMode mode;
int rtfd;
@ -91,6 +90,9 @@ typedef struct
PinosArray buffer_ids;
bool in_order;
SpaList free;
bool in_need_buffer;
int64_t last_ticks;
int32_t last_rate;
int64_t last_monotonic;
@ -130,6 +132,7 @@ clear_buffers (PinosStream *stream)
}
impl->buffer_ids.size = 0;
impl->in_order = true;
spa_list_init (&impl->free);
}
static void
@ -230,6 +233,7 @@ pinos_stream_new (PinosContext *context,
pinos_array_init (&impl->buffer_ids, 32);
pinos_array_ensure_size (&impl->buffer_ids, sizeof (BufferId) * 64);
impl->pending_seq = SPA_ID_INVALID;
spa_list_init (&impl->free);
spa_list_insert (&context->stream_list, &this->link);
@ -255,11 +259,32 @@ unhandle_socket (PinosStream *stream)
}
}
static void
set_possible_formats (PinosStream *stream,
int n_possible_formats,
SpaFormat **possible_formats)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
int i;
if (impl->possible_formats) {
for (i = 0; i < impl->n_possible_formats; i++)
free (impl->possible_formats[i]);
free (impl->possible_formats);
impl->possible_formats = NULL;
}
impl->n_possible_formats = n_possible_formats;
if (n_possible_formats > 0) {
impl->possible_formats = malloc (n_possible_formats * sizeof (SpaFormat *));
for (i = 0; i < n_possible_formats; i++)
impl->possible_formats[i] = spa_format_copy (possible_formats[i]);
}
}
void
pinos_stream_destroy (PinosStream *stream)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
int i;
pinos_log_debug ("stream %p: destroy", stream);
@ -272,12 +297,7 @@ pinos_stream_destroy (PinosStream *stream)
if (impl->node_proxy)
pinos_signal_remove (&impl->node_proxy_destroy);
if (impl->possible_formats) {
for (i = 0; i < impl->n_possible_formats; i++) {
free (impl->possible_formats[i]);
}
free (impl->possible_formats);
}
set_possible_formats (stream, 0, NULL);
if (impl->format)
free (impl->format);
@ -332,7 +352,7 @@ add_port_update (PinosStream *stream, uint32_t change_mask)
change_mask,
impl->n_possible_formats,
(const SpaFormat **) impl->possible_formats,
(const SpaFormat *) impl->format,
impl->format,
NULL,
&impl->port_info);
}
@ -357,18 +377,12 @@ send_need_input (PinosStream *stream)
static inline void
send_have_output (PinosStream *stream)
{
#if 0
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
SpaEventNodeHaveOutput ho;
SpaEvent ho = SPA_EVENT_INIT (stream->context->type.event_node.HaveOutput);
uint64_t cmd = 1;
pinos_log_debug ("stream %p: have output", stream);
ho.event.type = SPA_EVENT_NODE_HAVE_OUTPUT;
ho.event.size = sizeof (ho);
pinos_transport_add_event (impl->trans, &ho.event);
pinos_transport_add_event (impl->trans, &ho);
write (impl->rtfd, &cmd, 8);
#endif
}
static void
@ -451,11 +465,13 @@ find_buffer (PinosStream *stream, uint32_t id)
static inline void
reuse_buffer (PinosStream *stream, uint32_t id)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
BufferId *bid;
if ((bid = find_buffer (stream, id)) && bid->used) {
pinos_log_trace ("stream %p: reuse buffer %u", stream, id);
bid->used = false;
spa_list_insert (impl->free.prev, &bid->link);
pinos_signal_emit (&stream->new_buffer, stream, id);
}
}
@ -470,8 +486,6 @@ handle_rtnode_event (PinosStream *stream,
if (SPA_EVENT_TYPE (event) == context->type.event_node.HaveOutput) {
int i;
pinos_log_trace ("stream %p: have output", stream);
for (i = 0; i < impl->trans->area->n_inputs; i++) {
SpaPortIO *input = &impl->trans->inputs[i];
@ -498,7 +512,9 @@ handle_rtnode_event (PinosStream *stream,
output->buffer_id = SPA_ID_INVALID;
}
pinos_log_trace ("stream %p: need input", stream);
impl->in_need_buffer = true;
pinos_signal_emit (&stream->need_buffer, stream);
impl->in_need_buffer = false;
}
else if (SPA_EVENT_TYPE (event) == context->type.event_node.ReuseBuffer) {
SpaEventNodeReuseBuffer *p = (SpaEventNodeReuseBuffer *) event;
@ -607,8 +623,11 @@ handle_node_command (PinosStream *stream,
if (impl->direction == SPA_DIRECTION_INPUT)
send_need_input (stream);
else
else {
impl->in_need_buffer = true;
pinos_signal_emit (&stream->need_buffer, stream);
impl->in_need_buffer = false;
}
stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL);
}
@ -773,7 +792,13 @@ client_node_use_buffers (void *object,
}
len = pinos_array_get_len (&impl->buffer_ids, BufferId);
bid = pinos_array_add (&impl->buffer_ids, sizeof (BufferId));
bid->used = false;
if (impl->direction == SPA_DIRECTION_OUTPUT) {
bid->used = false;
spa_list_insert (impl->free.prev, &bid->link);
}
else {
bid->used = true;
}
b = buffers[i].buffer;
@ -926,7 +951,8 @@ on_node_proxy_destroy (PinosListener *listener,
* @mode: a #PinosStreamMode
* @port_path: the port path to connect to or %NULL to get the default port
* @flags: a #PinosStreamFlags
* @possible_formats: (transfer full): a #GPtrArray with possible accepted formats
* @n_possible_formats: number of items in @possible_formats
* @possible_formats: an array with possible accepted formats
*
* Connect @stream for input or output on @port_path.
*
@ -951,10 +977,7 @@ pinos_stream_connect (PinosStream *stream,
impl->port_id = 0;
impl->mode = mode;
impl->flags = flags;
impl->n_possible_formats = n_possible_formats;
impl->possible_formats = possible_formats;
set_possible_formats (stream, n_possible_formats, possible_formats);
stream_set_state (stream, PINOS_STREAM_STATE_CONNECTING, NULL);
@ -963,6 +986,9 @@ pinos_stream_connect (PinosStream *stream,
if (port_path)
pinos_properties_set (stream->properties,
"pinos.target.node", port_path);
if (flags & PINOS_STREAM_FLAG_AUTOCONNECT)
pinos_properties_set (stream->properties,
"pinos.autoconnect", "1");
impl->node_proxy = pinos_proxy_new (stream->context,
SPA_ID_INVALID,
@ -1028,34 +1054,6 @@ pinos_stream_finish_format (PinosStream *stream,
return true;
}
/**
* pinos_stream_start:
* @stream: a #PinosStream
*
* Start capturing from @stream.
*
* Returns: %true on success.
*/
bool
pinos_stream_start (PinosStream *stream)
{
return true;
}
/**
* pinos_stream_stop:
* @stream: a #PinosStream
*
* Stop capturing from @stream.
*
* Returns: %true on success.
*/
bool
pinos_stream_stop (PinosStream *stream)
{
return true;
}
/**
* pinos_stream_disconnect:
* @stream: a #PinosStream
@ -1111,11 +1109,12 @@ pinos_stream_get_empty_buffer (PinosStream *stream)
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
BufferId *bid;
pinos_array_for_each (bid, &impl->buffer_ids) {
if (!bid->used)
return bid->id;
}
return SPA_ID_INVALID;
if (spa_list_is_empty (&impl->free))
return SPA_ID_INVALID;
bid = spa_list_first (&impl->free, BufferId, link);
return bid->id;
}
/**
@ -1134,8 +1133,15 @@ pinos_stream_recycle_buffer (PinosStream *stream,
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
SpaEventNodeReuseBuffer rb = SPA_EVENT_NODE_REUSE_BUFFER_INIT (stream->context->type.event_node.ReuseBuffer,
impl->port_id, id);
BufferId *bid;
uint64_t cmd = 1;
if ((bid = find_buffer (stream, id)) == NULL || !bid->used)
return false;
bid->used = false;
spa_list_insert (impl->free.prev, &bid->link);
pinos_transport_add_event (impl->trans, (SpaEvent *)&rb);
write (impl->rtfd, &cmd, 8);
@ -1192,10 +1198,12 @@ pinos_stream_send_buffer (PinosStream *stream,
if ((bid = find_buffer (stream, id)) && !bid->used) {
bid->used = true;
spa_list_remove (&bid->link);
impl->trans->outputs[0].buffer_id = id;
impl->trans->outputs[0].status = SPA_RESULT_HAVE_OUTPUT;
pinos_log_trace ("stream %p: send buffer %d", stream, id);
send_have_output (stream);
if (!impl->in_need_buffer)
send_have_output (stream);
} else {
pinos_log_debug ("stream %p: output %u was used", stream, id);
}