mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-11-03 09:01:54 -05:00
rework subscription
Rework subscription so that we can use it for client and server. Move source and source-output to client to allow client provided sources. Still needs some work but registration seems to work partly. Rework DBUS API: move CreateSourceOutput to Client1 interface, remove Add/RemoveProvider and Device1 interface. Rework SourceOutput1 to allow for reconfigure. Add a client to test v4l2 source.
This commit is contained in:
parent
75d5fa91e2
commit
752494621c
19 changed files with 775 additions and 388 deletions
|
|
@ -33,7 +33,7 @@ struct _PvStreamPrivate
|
|||
gchar *target;
|
||||
PvStreamState state;
|
||||
|
||||
PvCapture1 *capture;
|
||||
gchar *source_output_sender;
|
||||
gchar *source_output_path;
|
||||
PvSourceOutput1 *source_output;
|
||||
|
||||
|
|
@ -275,6 +275,14 @@ pv_stream_get_state (PvStream *stream)
|
|||
return stream->priv->state;
|
||||
}
|
||||
|
||||
static void
|
||||
on_request_reconfigure (PvSourceOutput1 *interface,
|
||||
GVariant *props,
|
||||
gpointer user_data)
|
||||
{
|
||||
g_print ("on request reconfigure\n");
|
||||
}
|
||||
|
||||
static void
|
||||
on_source_output1_proxy (GObject *source_object,
|
||||
GAsyncResult *res,
|
||||
|
|
@ -285,13 +293,21 @@ on_source_output1_proxy (GObject *source_object,
|
|||
GError *error = NULL;
|
||||
|
||||
priv->source_output = pv_source_output1_proxy_new_finish (res, &error);
|
||||
if (priv->source_output == NULL) {
|
||||
if (priv->source_output == NULL)
|
||||
goto source_output_failed;
|
||||
|
||||
g_signal_connect (priv->source_output, "request-reconfigure", (GCallback) on_request_reconfigure, stream);
|
||||
stream_set_state (stream, PV_STREAM_STATE_READY);
|
||||
return;
|
||||
|
||||
/* ERRORS */
|
||||
source_output_failed:
|
||||
{
|
||||
stream_set_state (stream, PV_STREAM_STATE_ERROR);
|
||||
g_error ("failed to get source output proxy: %s", error->message);
|
||||
g_clear_error (&error);
|
||||
return;
|
||||
}
|
||||
stream_set_state (stream, PV_STREAM_STATE_READY);
|
||||
}
|
||||
|
||||
static void
|
||||
|
|
@ -303,22 +319,31 @@ on_source_output_created (GObject *source_object,
|
|||
PvStreamPrivate *priv = stream->priv;
|
||||
PvContext *context = priv->context;
|
||||
GError *error = NULL;
|
||||
PvClient1 *proxy;
|
||||
|
||||
if (!pv_capture1_call_create_source_output_finish (priv->capture,
|
||||
&priv->source_output_path, res, &error)) {
|
||||
proxy = PV_CLIENT1 (pv_context_get_client_proxy (priv->context));
|
||||
|
||||
if (!pv_client1_call_create_source_output_finish (proxy,
|
||||
&priv->source_output_sender, &priv->source_output_path, res, &error))
|
||||
goto create_failed;
|
||||
|
||||
pv_source_output1_proxy_new (pv_context_get_connection (context),
|
||||
G_DBUS_PROXY_FLAGS_NONE,
|
||||
priv->source_output_sender,
|
||||
priv->source_output_path,
|
||||
NULL,
|
||||
on_source_output1_proxy,
|
||||
stream);
|
||||
return;
|
||||
|
||||
/* ERRORS */
|
||||
create_failed:
|
||||
{
|
||||
stream_set_state (stream, PV_STREAM_STATE_ERROR);
|
||||
g_print ("failed to get connect capture: %s", error->message);
|
||||
g_clear_error (&error);
|
||||
return;
|
||||
}
|
||||
|
||||
pv_source_output1_proxy_new (pv_context_get_connection (context),
|
||||
G_DBUS_PROXY_FLAGS_NONE,
|
||||
PV_DBUS_SERVICE,
|
||||
priv->source_output_path,
|
||||
NULL,
|
||||
on_source_output1_proxy,
|
||||
stream);
|
||||
}
|
||||
|
||||
static gboolean
|
||||
|
|
@ -326,16 +351,19 @@ create_source_output (PvStream *stream)
|
|||
{
|
||||
PvStreamPrivate *priv = stream->priv;
|
||||
GVariantBuilder builder;
|
||||
PvClient1 *proxy;
|
||||
|
||||
g_variant_builder_init (&builder, G_VARIANT_TYPE ("a{sv}"));
|
||||
g_variant_builder_add (&builder, "{sv}", "name", g_variant_new_string ("hello"));
|
||||
|
||||
pv_capture1_call_create_source_output (priv->capture,
|
||||
priv->target ? priv->target : "/", /* const gchar *arg_source */
|
||||
g_variant_builder_end (&builder), /* GVariant *arg_props */
|
||||
NULL, /* GCancellable *cancellable */
|
||||
on_source_output_created,
|
||||
stream);
|
||||
proxy = PV_CLIENT1 (pv_context_get_client_proxy (priv->context));
|
||||
|
||||
pv_client1_call_create_source_output (proxy,
|
||||
priv->target ? priv->target : "/", /* const gchar *arg_source */
|
||||
g_variant_builder_end (&builder), /* GVariant *arg_props */
|
||||
NULL, /* GCancellable *cancellable */
|
||||
on_source_output_created,
|
||||
stream);
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
|
|
@ -348,13 +376,14 @@ on_source_output_removed (GObject *source_object,
|
|||
PvStreamPrivate *priv = stream->priv;
|
||||
GError *error = NULL;
|
||||
|
||||
if (!pv_capture1_call_remove_source_output_finish (priv->capture,
|
||||
if (!pv_source_output1_call_remove_finish (priv->source_output,
|
||||
res, &error)) {
|
||||
stream_set_state (stream, PV_STREAM_STATE_ERROR);
|
||||
g_print ("failed to disconnect: %s", error->message);
|
||||
g_clear_error (&error);
|
||||
return;
|
||||
}
|
||||
g_clear_pointer (&priv->source_output_sender, g_free);
|
||||
g_clear_pointer (&priv->source_output_path, g_free);
|
||||
g_clear_object (&priv->source_output);
|
||||
}
|
||||
|
|
@ -364,35 +393,13 @@ remove_source_output (PvStream *stream)
|
|||
{
|
||||
PvStreamPrivate *priv = stream->priv;
|
||||
|
||||
pv_capture1_call_remove_source_output (priv->capture,
|
||||
priv->source_output_path,
|
||||
NULL, /* GCancellable *cancellable */
|
||||
on_source_output_removed,
|
||||
stream);
|
||||
pv_source_output1_call_remove (priv->source_output,
|
||||
NULL, /* GCancellable *cancellable */
|
||||
on_source_output_removed,
|
||||
stream);
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
static void
|
||||
on_capture_proxy (GObject *source_object,
|
||||
GAsyncResult *res,
|
||||
gpointer user_data)
|
||||
{
|
||||
PvStream *stream = user_data;
|
||||
PvStreamPrivate *priv = stream->priv;
|
||||
GError *error = NULL;
|
||||
|
||||
priv->capture = pv_capture1_proxy_new_finish (res, &error);
|
||||
if (priv->capture == NULL) {
|
||||
stream_set_state (stream, PV_STREAM_STATE_ERROR);
|
||||
g_error ("failed to get capture proxy: %s", error->message);
|
||||
g_clear_error (&error);
|
||||
return;
|
||||
}
|
||||
|
||||
create_source_output (stream);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* pv_stream_connect_capture:
|
||||
* @stream: a #PvStream
|
||||
|
|
@ -420,19 +427,7 @@ pv_stream_connect_capture (PvStream *stream,
|
|||
|
||||
stream_set_state (stream, PV_STREAM_STATE_CONNECTING);
|
||||
|
||||
if (priv->capture == NULL) {
|
||||
pv_capture1_proxy_new (pv_context_get_connection (context),
|
||||
G_DBUS_PROXY_FLAGS_NONE,
|
||||
PV_DBUS_SERVICE,
|
||||
pv_context_get_client_path (context),
|
||||
NULL,
|
||||
on_capture_proxy,
|
||||
stream);
|
||||
|
||||
return TRUE;
|
||||
} else {
|
||||
return create_source_output (stream);
|
||||
}
|
||||
return create_source_output (stream);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -452,7 +447,7 @@ pv_stream_disconnect (PvStream *stream)
|
|||
g_return_val_if_fail (PV_IS_STREAM (stream), FALSE);
|
||||
priv = stream->priv;
|
||||
g_return_val_if_fail (priv->state >= PV_STREAM_STATE_READY, FALSE);
|
||||
g_return_val_if_fail (priv->capture != NULL, FALSE);
|
||||
g_return_val_if_fail (priv->source_output != NULL, FALSE);
|
||||
context = priv->context;
|
||||
g_return_val_if_fail (pv_context_get_state (context) == PV_CONTEXT_STATE_READY, FALSE);
|
||||
|
||||
|
|
@ -528,12 +523,8 @@ handle_socket (PvStream *stream, gint fd)
|
|||
|
||||
g_print ("got fd %d\n", fd);
|
||||
priv->socket = g_socket_new_from_fd (fd, &error);
|
||||
if (priv->socket == NULL) {
|
||||
stream_set_state (stream, PV_STREAM_STATE_ERROR);
|
||||
g_error ("failed to create socket: %s", error->message);
|
||||
g_clear_error (&error);
|
||||
return;
|
||||
}
|
||||
if (priv->socket == NULL)
|
||||
goto socket_failed;
|
||||
|
||||
switch (priv->mode) {
|
||||
case PV_STREAM_MODE_SOCKET:
|
||||
|
|
@ -554,6 +545,16 @@ handle_socket (PvStream *stream, gint fd)
|
|||
default:
|
||||
break;
|
||||
}
|
||||
return;
|
||||
|
||||
/* ERRORS */
|
||||
socket_failed:
|
||||
{
|
||||
stream_set_state (stream, PV_STREAM_STATE_ERROR);
|
||||
g_error ("failed to create socket: %s", error->message);
|
||||
g_clear_error (&error);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
|
|
@ -573,9 +574,9 @@ unhandle_socket (PvStream *stream)
|
|||
}
|
||||
|
||||
static void
|
||||
on_stream_acquired (GObject *source_object,
|
||||
GAsyncResult *res,
|
||||
gpointer user_data)
|
||||
on_stream_started (GObject *source_object,
|
||||
GAsyncResult *res,
|
||||
gpointer user_data)
|
||||
{
|
||||
PvStream *stream = user_data;
|
||||
PvStreamPrivate *priv = stream->priv;
|
||||
|
|
@ -585,13 +586,12 @@ on_stream_acquired (GObject *source_object,
|
|||
GError *error = NULL;
|
||||
GVariant *result;
|
||||
|
||||
result = g_dbus_proxy_call_with_unix_fd_list_finish (G_DBUS_PROXY (priv->source_output), &out_fd_list, res, &error);
|
||||
if (result == NULL) {
|
||||
stream_set_state (stream, PV_STREAM_STATE_ERROR);
|
||||
g_error ("failed to acquire: %s", error->message);
|
||||
g_clear_error (&error);
|
||||
return;
|
||||
}
|
||||
result = g_dbus_proxy_call_with_unix_fd_list_finish (G_DBUS_PROXY (priv->source_output),
|
||||
&out_fd_list,
|
||||
res,
|
||||
&error);
|
||||
if (result == NULL)
|
||||
goto start_failed;
|
||||
|
||||
g_variant_get (result,
|
||||
"(h@a{sv})",
|
||||
|
|
@ -601,16 +601,32 @@ on_stream_acquired (GObject *source_object,
|
|||
g_variant_unref (result);
|
||||
g_variant_unref (out_props);
|
||||
|
||||
if ((fd = g_unix_fd_list_get (out_fd_list, fd_idx, &error)) < 0) {
|
||||
stream_set_state (stream, PV_STREAM_STATE_ERROR);
|
||||
g_error ("failed to get FD: %s", error->message);
|
||||
g_clear_error (&error);
|
||||
return;
|
||||
}
|
||||
if ((fd = g_unix_fd_list_get (out_fd_list, fd_idx, &error)) < 0)
|
||||
goto fd_failed;
|
||||
|
||||
handle_socket (stream, fd);
|
||||
|
||||
stream_set_state (stream, PV_STREAM_STATE_STREAMING);
|
||||
|
||||
return;
|
||||
|
||||
/* ERRORS */
|
||||
start_failed:
|
||||
{
|
||||
g_error ("failed to start: %s", error->message);
|
||||
goto exit_error;
|
||||
}
|
||||
fd_failed:
|
||||
{
|
||||
g_error ("failed to get FD: %s", error->message);
|
||||
goto exit_error;
|
||||
}
|
||||
exit_error:
|
||||
{
|
||||
stream_set_state (stream, PV_STREAM_STATE_ERROR);
|
||||
g_clear_error (&error);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -648,34 +664,42 @@ pv_stream_start (PvStream *stream, PvStreamMode mode)
|
|||
g_variant_builder_init (&builder, G_VARIANT_TYPE ("a{sv}"));
|
||||
g_variant_builder_add (&builder, "{sv}", "name", g_variant_new_string ("hello"));
|
||||
|
||||
pv_source_output1_call_acquire (priv->source_output,
|
||||
g_variant_builder_end (&builder), /* GVariant *arg_properties */
|
||||
NULL, /* GCancellable *cancellable */
|
||||
on_stream_acquired,
|
||||
stream);
|
||||
pv_source_output1_call_start (priv->source_output,
|
||||
g_variant_builder_end (&builder), /* GVariant *arg_properties */
|
||||
NULL, /* GCancellable *cancellable */
|
||||
on_stream_started,
|
||||
stream);
|
||||
}
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
static void
|
||||
on_stream_released (GObject *source_object,
|
||||
GAsyncResult *res,
|
||||
gpointer user_data)
|
||||
on_stream_stopped (GObject *source_object,
|
||||
GAsyncResult *res,
|
||||
gpointer user_data)
|
||||
{
|
||||
PvStream *stream = user_data;
|
||||
PvStreamPrivate *priv = stream->priv;
|
||||
GError *error = NULL;
|
||||
|
||||
if (!pv_source_output1_call_release_finish (priv->source_output,
|
||||
res, &error)) {
|
||||
if (!pv_source_output1_call_stop_finish (priv->source_output,
|
||||
res, &error))
|
||||
goto call_failed;
|
||||
|
||||
unhandle_socket (stream);
|
||||
|
||||
stream_set_state (stream, PV_STREAM_STATE_READY);
|
||||
|
||||
return;
|
||||
|
||||
/* ERRORS */
|
||||
call_failed:
|
||||
{
|
||||
stream_set_state (stream, PV_STREAM_STATE_ERROR);
|
||||
g_error ("failed to release: %s", error->message);
|
||||
g_clear_error (&error);
|
||||
return;
|
||||
}
|
||||
unhandle_socket (stream);
|
||||
|
||||
stream_set_state (stream, PV_STREAM_STATE_READY);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -696,10 +720,10 @@ pv_stream_stop (PvStream *stream)
|
|||
priv = stream->priv;
|
||||
g_return_val_if_fail (priv->state == PV_STREAM_STATE_STREAMING, FALSE);
|
||||
|
||||
pv_source_output1_call_release (priv->source_output,
|
||||
NULL, /* GCancellable *cancellable */
|
||||
on_stream_released,
|
||||
stream);
|
||||
pv_source_output1_call_stop (priv->source_output,
|
||||
NULL, /* GCancellable *cancellable */
|
||||
on_stream_stopped,
|
||||
stream);
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue