From bf0942124086a610884b3ab9c3a5652f3a5cde16 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 12 May 2017 11:00:25 +0200 Subject: [PATCH] pinossink: improve sink --- pinos/client/stream.c | 15 +++- pinos/gst/gstpinospool.c | 47 +++++++++++- pinos/gst/gstpinossink.c | 153 ++++++++++++++++++++------------------- pinos/server/node.c | 2 - 4 files changed, 135 insertions(+), 82 deletions(-) diff --git a/pinos/client/stream.c b/pinos/client/stream.c index dd08bcb9c..fc854249c 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -123,6 +123,8 @@ clear_buffers (PinosStream *stream) PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); BufferId *bid; + pinos_log_debug ("stream %p: clear buffers", stream); + pinos_array_for_each (bid, &impl->buffer_ids) { pinos_signal_emit (&stream->remove_buffer, stream, bid->id); free (bid->buf); @@ -144,6 +146,12 @@ stream_set_state (PinosStream *stream, if (stream->error) free (stream->error); stream->error = error; + + pinos_log_debug ("stream %p: update state from %s -> %s (%s)", stream, + pinos_stream_state_as_string (stream->state), + pinos_stream_state_as_string (state), + stream->error); + stream->state = state; pinos_signal_emit (&stream->state_changed, stream); } @@ -504,7 +512,6 @@ handle_rtnode_event (PinosStream *stream, for (i = 0; i < impl->trans->area->n_outputs; i++) { SpaPortIO *output = &impl->trans->outputs[i]; - pinos_log_trace ("stream %p: buffer %d %u", stream, output->status, output->buffer_id); if (output->buffer_id == SPA_ID_INVALID) continue; @@ -664,7 +671,7 @@ client_node_done (void *object, PinosProxy *proxy = object; PinosStream *stream = proxy->user_data; - pinos_log_info ("strean %p: create client node done with fds %d %d", stream, readfd, writefd); + pinos_log_info ("stream %p: create client node done with fds %d %d", stream, readfd, writefd); handle_socket (stream, readfd, writefd); do_node_init (stream); @@ -1048,8 +1055,10 @@ pinos_stream_finish_format (PinosStream *stream, add_port_update (stream, (n_params ? PINOS_MESSAGE_PORT_UPDATE_INFO : 0) | PINOS_MESSAGE_PORT_UPDATE_FORMAT); - if (!impl->format) + if (!impl->format) { clear_buffers (stream); + clear_mems (stream); + } } impl->port_info.params = NULL; impl->port_info.n_params = 0; diff --git a/pinos/gst/gstpinospool.c b/pinos/gst/gstpinospool.c index 270537a63..6f81d30e8 100644 --- a/pinos/gst/gstpinospool.c +++ b/pinos/gst/gstpinospool.c @@ -30,6 +30,16 @@ GST_DEBUG_CATEGORY_STATIC (gst_pinos_pool_debug_category); G_DEFINE_TYPE (GstPinosPool, gst_pinos_pool, GST_TYPE_BUFFER_POOL); +enum +{ + ACTIVATED, + /* FILL ME */ + LAST_SIGNAL +}; + + +static guint pool_signals[LAST_SIGNAL] = { 0 }; + GstPinosPool * gst_pinos_pool_new (void) { @@ -57,14 +67,16 @@ gst_pinos_pool_add_buffer (GstPinosPool *pool, GstBuffer *buffer) gboolean gst_pinos_pool_remove_buffer (GstPinosPool *pool, GstBuffer *buffer) { + gboolean res; + g_return_val_if_fail (GST_IS_PINOS_POOL (pool), FALSE); g_return_val_if_fail (GST_IS_BUFFER (buffer), FALSE); GST_OBJECT_LOCK (pool); - g_queue_remove (&pool->available, buffer); + res = g_queue_remove (&pool->available, buffer); GST_OBJECT_UNLOCK (pool); - return TRUE; + return res; } static GstFlowReturn @@ -74,7 +86,13 @@ acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer, GstPinosPool *p = GST_PINOS_POOL (pool); GST_OBJECT_LOCK (pool); - while (p->available.length == 0) { + while (TRUE) { + if (G_UNLIKELY (GST_BUFFER_POOL_IS_FLUSHING (pool))) + goto flushing; + + if (p->available.length > 0) + break; + GST_WARNING ("queue empty"); g_cond_wait (&p->cond, GST_OBJECT_GET_LOCK (pool)); } @@ -83,6 +101,23 @@ acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer, GST_DEBUG ("acquire buffer %p", *buffer); return GST_FLOW_OK; + +flushing: + { + GST_OBJECT_UNLOCK (pool); + return GST_FLOW_FLUSHING; + } +} + +static void +flush_start (GstBufferPool * pool) +{ + GstPinosPool *p = GST_PINOS_POOL (pool); + + GST_DEBUG ("flush start"); + GST_OBJECT_LOCK (pool); + g_cond_signal (&p->cond); + GST_OBJECT_UNLOCK (pool); } static void @@ -100,6 +135,7 @@ release_buffer (GstBufferPool * pool, GstBuffer *buffer) static gboolean do_start (GstBufferPool * pool) { + g_signal_emit (pool, pool_signals[ACTIVATED], 0, NULL); return TRUE; } @@ -122,9 +158,14 @@ gst_pinos_pool_class_init (GstPinosPoolClass * klass) gobject_class->finalize = gst_pinos_pool_finalize; bufferpool_class->start = do_start; + bufferpool_class->flush_start = flush_start; bufferpool_class->acquire_buffer = acquire_buffer; bufferpool_class->release_buffer = release_buffer; + pool_signals[ACTIVATED] = + g_signal_new ("activated", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, + 0, NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 0, G_TYPE_NONE); + GST_DEBUG_CATEGORY_INIT (gst_pinos_pool_debug_category, "pinospool", 0, "debug category for pinospool object"); } diff --git a/pinos/gst/gstpinossink.c b/pinos/gst/gstpinossink.c index 1dd54200e..449b3b259 100644 --- a/pinos/gst/gstpinossink.c +++ b/pinos/gst/gstpinossink.c @@ -219,6 +219,65 @@ gst_pinos_sink_class_init (GstPinosSinkClass * klass) process_mem_data_quark = g_quark_from_static_string ("GstPinosSinkProcessMemQuark"); } + +#define PROP(f,key,type,...) \ + SPA_POD_PROP (f,key,0,type,1,__VA_ARGS__) +#define PROP_R(f,key,type,...) \ + SPA_POD_PROP (f,key,SPA_POD_PROP_FLAG_READONLY,type,1,__VA_ARGS__) +#define PROP_MM(f,key,type,...) \ + SPA_POD_PROP (f,key,SPA_POD_PROP_RANGE_MIN_MAX,type,3,__VA_ARGS__) +#define PROP_U_MM(f,key,type,...) \ + SPA_POD_PROP (f,key,SPA_POD_PROP_FLAG_UNSET | \ + SPA_POD_PROP_RANGE_MIN_MAX,type,3,__VA_ARGS__) +#define PROP_EN(f,key,type,n,...) \ + SPA_POD_PROP (f,key,SPA_POD_PROP_RANGE_ENUM,type,n,__VA_ARGS__) +#define PROP_U_EN(f,key,type,n,...) \ + SPA_POD_PROP (f,key,SPA_POD_PROP_FLAG_UNSET | \ + SPA_POD_PROP_RANGE_ENUM,type,n,__VA_ARGS__) +static void +pool_activated (GstPinosPool *pool, GstPinosSink *sink) +{ + PinosContext *ctx = sink->stream->context; + GstStructure *config; + GstCaps *caps; + guint size; + guint min_buffers; + guint max_buffers; + SpaAllocParam *port_params[3]; + SpaPODBuilder b = { NULL }; + uint8_t buffer[1024]; + SpaPODFrame f[2]; + + config = gst_buffer_pool_get_config (GST_BUFFER_POOL (pool)); + gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers); + + spa_pod_builder_init (&b, buffer, sizeof (buffer)); + spa_pod_builder_object (&b, &f[0], 0, ctx->type.alloc_param_buffers.Buffers, + PROP (&f[1], ctx->type.alloc_param_buffers.size, SPA_POD_TYPE_INT, size), + PROP (&f[1], ctx->type.alloc_param_buffers.stride, SPA_POD_TYPE_INT, 0), + PROP_MM (&f[1], ctx->type.alloc_param_buffers.buffers, SPA_POD_TYPE_INT, min_buffers, min_buffers, max_buffers), + PROP (&f[1], ctx->type.alloc_param_buffers.align, SPA_POD_TYPE_INT, 16)); + port_params[0] = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaAllocParam); + + spa_pod_builder_object (&b, &f[0], 0, ctx->type.alloc_param_meta_enable.MetaEnable, + PROP (&f[1], ctx->type.alloc_param_meta_enable.type, SPA_POD_TYPE_ID, ctx->type.meta.Header), + PROP (&f[1], ctx->type.alloc_param_meta_enable.size, SPA_POD_TYPE_INT, sizeof (SpaMetaHeader))); + port_params[1] = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaAllocParam); + + spa_pod_builder_object (&b, &f[0], 0, ctx->type.alloc_param_meta_enable.MetaEnable, + PROP (&f[1], ctx->type.alloc_param_meta_enable.type, SPA_POD_TYPE_ID, ctx->type.meta.Ringbuffer), + PROP (&f[1], ctx->type.alloc_param_meta_enable.size, SPA_POD_TYPE_INT, sizeof (SpaRingbuffer)), + PROP (&f[1], ctx->type.alloc_param_meta_enable.ringbufferSize, SPA_POD_TYPE_INT, + size * SPA_MAX (4, + SPA_MAX (min_buffers, max_buffers))), + PROP (&f[1], ctx->type.alloc_param_meta_enable.ringbufferStride, SPA_POD_TYPE_INT, 0), + PROP (&f[1], ctx->type.alloc_param_meta_enable.ringbufferBlocks, SPA_POD_TYPE_INT, 1), + PROP (&f[1], ctx->type.alloc_param_meta_enable.ringbufferAlign, SPA_POD_TYPE_INT, 16)); + port_params[2] = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaAllocParam); + + pinos_stream_finish_format (sink->stream, SPA_RESULT_OK, port_params, 2); +} + static void gst_pinos_sink_init (GstPinosSink * sink) { @@ -227,6 +286,8 @@ gst_pinos_sink_init (GstPinosSink * sink) sink->client_name = pinos_client_name(); sink->mode = DEFAULT_PROP_MODE; + g_signal_connect (sink->pool, "activated", G_CALLBACK (pool_activated), sink); + sink->buf_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) gst_buffer_unref); @@ -427,10 +488,14 @@ on_remove_buffer (PinosListener *listener, GST_LOG_OBJECT (pinossink, "remove buffer"); buf = g_hash_table_lookup (pinossink->buf_ids, GINT_TO_POINTER (id)); - GST_MINI_OBJECT_CAST (buf)->dispose = NULL; - - gst_pinos_pool_remove_buffer (pinossink->pool, buf); - g_hash_table_remove (pinossink->buf_ids, GINT_TO_POINTER (id)); + if (buf) { + GST_MINI_OBJECT_CAST (buf)->dispose = NULL; + if (!gst_pinos_pool_remove_buffer (pinossink->pool, buf)) + gst_buffer_ref (buf); + if (g_queue_remove (&pinossink->queue, buf)) + gst_buffer_unref (buf); + g_hash_table_remove (pinossink->buf_ids, GINT_TO_POINTER (id)); + } } static void @@ -449,7 +514,6 @@ on_new_buffer (PinosListener *listener, buf = g_hash_table_lookup (pinossink->buf_ids, GINT_TO_POINTER (id)); if (buf) { - gst_buffer_unref (buf); pinos_thread_main_loop_signal (pinossink->main_loop, FALSE); } } @@ -485,10 +549,11 @@ do_send_buffer (GstPinosSink *pinossink) if (!(res = pinos_stream_send_buffer (pinossink->stream, data->id))) { g_warning ("can't send buffer"); - gst_buffer_unref (buffer); pinos_thread_main_loop_signal (pinossink->main_loop, FALSE); } else pinossink->need_ready--; + + gst_buffer_unref (buffer); } @@ -529,66 +594,15 @@ on_state_changed (PinosListener *listener, pinos_thread_main_loop_signal (pinossink->main_loop, FALSE); } -#define PROP(f,key,type,...) \ - SPA_POD_PROP (f,key,0,type,1,__VA_ARGS__) -#define PROP_R(f,key,type,...) \ - SPA_POD_PROP (f,key,SPA_POD_PROP_FLAG_READONLY,type,1,__VA_ARGS__) -#define PROP_MM(f,key,type,...) \ - SPA_POD_PROP (f,key,SPA_POD_PROP_RANGE_MIN_MAX,type,3,__VA_ARGS__) -#define PROP_U_MM(f,key,type,...) \ - SPA_POD_PROP (f,key,SPA_POD_PROP_FLAG_UNSET | \ - SPA_POD_PROP_RANGE_MIN_MAX,type,3,__VA_ARGS__) -#define PROP_EN(f,key,type,n,...) \ - SPA_POD_PROP (f,key,SPA_POD_PROP_RANGE_ENUM,type,n,__VA_ARGS__) -#define PROP_U_EN(f,key,type,n,...) \ - SPA_POD_PROP (f,key,SPA_POD_PROP_FLAG_UNSET | \ - SPA_POD_PROP_RANGE_ENUM,type,n,__VA_ARGS__) - static void on_format_changed (PinosListener *listener, PinosStream *stream, SpaFormat *format) { GstPinosSink *pinossink = SPA_CONTAINER_OF (listener, GstPinosSink, stream_format_changed); - PinosContext *ctx = stream->context; - GstStructure *config; - GstCaps *caps; - guint size; - guint min_buffers; - guint max_buffers; - SpaAllocParam *port_params[3]; - SpaPODBuilder b = { NULL }; - uint8_t buffer[1024]; - SpaPODFrame f[2]; - config = gst_buffer_pool_get_config (GST_BUFFER_POOL (pinossink->pool)); - gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers); - - spa_pod_builder_init (&b, buffer, sizeof (buffer)); - spa_pod_builder_object (&b, &f[0], 0, ctx->type.alloc_param_buffers.Buffers, - PROP (&f[1], ctx->type.alloc_param_buffers.size, SPA_POD_TYPE_INT, size), - PROP (&f[1], ctx->type.alloc_param_buffers.stride, SPA_POD_TYPE_INT, 0), - PROP_MM (&f[1], ctx->type.alloc_param_buffers.buffers, SPA_POD_TYPE_INT, min_buffers, min_buffers, max_buffers), - PROP (&f[1], ctx->type.alloc_param_buffers.align, SPA_POD_TYPE_INT, 16)); - port_params[0] = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaAllocParam); - - spa_pod_builder_object (&b, &f[0], 0, ctx->type.alloc_param_meta_enable.MetaEnable, - PROP (&f[1], ctx->type.alloc_param_meta_enable.type, SPA_POD_TYPE_ID, ctx->type.meta.Header), - PROP (&f[1], ctx->type.alloc_param_meta_enable.size, SPA_POD_TYPE_INT, sizeof (SpaMetaHeader))); - port_params[1] = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaAllocParam); - - spa_pod_builder_object (&b, &f[0], 0, ctx->type.alloc_param_meta_enable.MetaEnable, - PROP (&f[1], ctx->type.alloc_param_meta_enable.type, SPA_POD_TYPE_ID, ctx->type.meta.Ringbuffer), - PROP (&f[1], ctx->type.alloc_param_meta_enable.size, SPA_POD_TYPE_INT, sizeof (SpaRingbuffer)), - PROP (&f[1], ctx->type.alloc_param_meta_enable.ringbufferSize, SPA_POD_TYPE_INT, - size * SPA_MAX (4, - SPA_MAX (min_buffers, max_buffers))), - PROP (&f[1], ctx->type.alloc_param_meta_enable.ringbufferStride, SPA_POD_TYPE_INT, 0), - PROP (&f[1], ctx->type.alloc_param_meta_enable.ringbufferBlocks, SPA_POD_TYPE_INT, 1), - PROP (&f[1], ctx->type.alloc_param_meta_enable.ringbufferAlign, SPA_POD_TYPE_INT, 16)); - port_params[2] = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaAllocParam); - - pinos_stream_finish_format (pinossink->stream, SPA_RESULT_OK, port_params, 2); + if (gst_buffer_pool_is_active (GST_BUFFER_POOL_CAST (pinossink->pool))) + pool_activated (pinossink->pool, pinossink); } static gboolean @@ -609,13 +623,6 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) if (state == PINOS_STREAM_STATE_ERROR) goto start_error; - if (!gst_buffer_pool_is_active (GST_BUFFER_POOL_CAST (pinossink->pool))) { - GstStructure *config = gst_buffer_pool_get_config (GST_BUFFER_POOL_CAST (pinossink->pool)); - gst_buffer_pool_config_set_params (config, caps, 8192, 16, 32); - gst_buffer_pool_set_config (GST_BUFFER_POOL_CAST (pinossink->pool), config); - gst_buffer_pool_set_active (GST_BUFFER_POOL_CAST (pinossink->pool), TRUE); - } - if (state == PINOS_STREAM_STATE_UNCONNECTED) { PinosStreamFlags flags = 0; @@ -633,7 +640,7 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) while (TRUE) { state = pinossink->stream->state; - if (state == PINOS_STREAM_STATE_READY) + if (state == PINOS_STREAM_STATE_CONFIGURE) break; if (state == PINOS_STREAM_STATE_ERROR) @@ -678,6 +685,9 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) GstBuffer *b = NULL; GstMapInfo info = { 0, }; + if (!gst_buffer_pool_is_active (GST_BUFFER_POOL_CAST (pinossink->pool))) + gst_buffer_pool_set_active (GST_BUFFER_POOL_CAST (pinossink->pool), TRUE); + if ((res = gst_buffer_pool_acquire_buffer (GST_BUFFER_POOL_CAST (pinossink->pool), &b, NULL)) != GST_FLOW_OK) goto done; @@ -692,8 +702,8 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) GST_DEBUG ("push buffer in queue"); g_queue_push_tail (&pinossink->queue, buffer); -// if (pinossink->need_ready) -// do_send_buffer (pinossink); + if (pinossink->need_ready && pinossink->mode == GST_PINOS_SINK_MODE_PROVIDE) + do_send_buffer (pinossink); done: pinos_thread_main_loop_unlock (pinossink->main_loop); @@ -704,11 +714,6 @@ not_negotiated: { return GST_FLOW_NOT_NEGOTIATED; } -//streaming_error: -// { -// pinos_thread_main_loop_unlock (pinossink->main_loop); -// return GST_FLOW_ERROR; -// } } static gboolean diff --git a/pinos/server/node.c b/pinos/server/node.c index de45ae837..8d9af3918 100644 --- a/pinos/server/node.c +++ b/pinos/server/node.c @@ -371,8 +371,6 @@ on_node_have_output (SpaNode *node, void *user_data) po->status = SPA_RESULT_NEED_BUFFER; } res = spa_node_process_output (this->node); - if (res != SPA_RESULT_OK) - pinos_log_warn ("node %p: got process output %d", this, res); } static void