documentation

thread_main_loop -> thread_loop
This commit is contained in:
Wim Taymans 2017-06-01 19:25:01 +02:00
parent ebaaedef75
commit 6a3b5b1bf7
18 changed files with 578 additions and 251 deletions

View file

@ -24,6 +24,72 @@
extern "C" { extern "C" {
#endif #endif
/** \page page_client_api Client API
*
* \section sec_client_api_overview Overview
*
* The client side API allows you to connect to the PipeWire server and
* perform actions on the PipeWire graph. This includes
*
* \li introspecting the objects on the server
* \li Creating nodes
* \li Linking nodes on their ports
* \li providing media to the server for playback or consumption
* \li retrieving media from the server
*
* \section sec_client_api_loop Event Loop Abstraction
*
* Most API is asynchronous and based around an event loop. Methods will
* start an operation which will cause a state change of the \ref pw_context
* object. Connect to the state_changed signal to be notified of these
* state changes.
*
* The most convenient way to deal with the asynchronous calls is probably
* with the thread loop (See \subpage page_thread_loop for more details).
*
* \section sec_client_api_context Context
*
* \subsection ssec_context_create Create
*
* To create a new context use pw_context_new(). You will
* need to pass a \ref pw_loop implementation to use as the event loop.
*
* A typical loop would be created with pw_thread_loop_new() but
* other implementation are possible.
*
* You will also need to pass properties for the context. Use
* pw_fill_context_properties() to get a default set of properties.
*
* After creating the context, you can track the state of the context
* by listening for the state_changed signal.
*
* \subsection ssec_client_api_context_connect Connecting
*
* A context must be connected to a server before any operation can be
* issued. Calling pw_context_connect() will initiate the connection
* procedure.
*
* When connecting, the context will automatically create a registry
* proxy to get notified of server objects. This behaviour can be disabled
* by passing the \ref PW_CONTEXT_FLAG_NO_REGISTRY. You can create your
* own registry later from the core_proxy member of the context.
*
* The context will automatically create proxies for all remote objects
* and will bind to them. Use the subscription signal to reveive
* notifications about objects. You can also disable this behaviour
* with the \ref PW_CONTEXT_FLAG_NO_PROXY flag and manually bind to
* the objects you are interested in.
*
* \subsection ssec_client_api_context_functions Streams
*
* Data exchange with the PipeWire server is done with the \ref pw_stream
* object. \subpage page_streams
*
* \subsection ssec_client_api_context_disconnect Disconnect
*
* Use pw_context_disconnect() to disconnect from the server.
*/
#include <pipewire/client/map.h> #include <pipewire/client/map.h>
#include <pipewire/client/loop.h> #include <pipewire/client/loop.h>
#include <pipewire/client/properties.h> #include <pipewire/client/properties.h>
@ -42,7 +108,7 @@ enum pw_context_state {
/** Convert a \ref pw_context_state to a readable string \memberof pw_context */ /** Convert a \ref pw_context_state to a readable string \memberof pw_context */
const char *pw_context_state_as_string(enum pw_context_state state); const char *pw_context_state_as_string(enum pw_context_state state);
/** \enum pw_context_flags Extra flags passed to \ref pw_context_connect() \memberof pw_context */ /** \enum pw_context_flags Extra flags passed to pw_context_connect() \memberof pw_context */
enum pw_context_flags { enum pw_context_flags {
PW_CONTEXT_FLAG_NONE = 0, /**< no flags */ PW_CONTEXT_FLAG_NONE = 0, /**< no flags */
PW_CONTEXT_FLAG_NO_REGISTRY = (1 << 0), /**< don't create the registry object */ PW_CONTEXT_FLAG_NO_REGISTRY = (1 << 0), /**< don't create the registry object */
@ -57,6 +123,8 @@ enum pw_context_flags {
* a \ref pw_context is created and used to connect to the server. * a \ref pw_context is created and used to connect to the server.
* A \ref pw_proxy for the core object will automatically be created * A \ref pw_proxy for the core object will automatically be created
* when connecting. * when connecting.
*
* See also \ref page_client_api
*/ */
struct pw_context { struct pw_context {
char *name; /**< the application name */ char *name; /**< the application name */
@ -64,7 +132,7 @@ struct pw_context {
struct pw_type type; /**< the type map */ struct pw_type type; /**< the type map */
struct pw_loop *loop; /**< the main loop */ struct pw_loop *loop; /**< the loop */
struct pw_proxy *core_proxy; /**< proxy for the core object */ struct pw_proxy *core_proxy; /**< proxy for the core object */
struct pw_proxy *registry_proxy; /**< proxy for the registry object. Can struct pw_proxy *registry_proxy; /**< proxy for the registry object. Can

View file

@ -32,6 +32,22 @@ extern "C" {
#include <pipewire/client/introspect.h> #include <pipewire/client/introspect.h>
/**
* \page page_pipewire The PipeWire protocol
* \section page_ifaces_pipewire Interfaces
* - \subpage page_iface_pw_core - core global object
* - \subpage page_iface_pw_registry - global registry object
*/
/**
* \page page_iface_pw_core pw_core
* \section page_iface_pw_core_desc Description
*
* The core global object. This is a special singleton object. It
* is used for internal Wayland protocol features.
* \section page_iface_pw_core API
*/
#define PW_CORE_METHOD_CLIENT_UPDATE 0 #define PW_CORE_METHOD_CLIENT_UPDATE 0
#define PW_CORE_METHOD_SYNC 1 #define PW_CORE_METHOD_SYNC 1
#define PW_CORE_METHOD_GET_REGISTRY 2 #define PW_CORE_METHOD_GET_REGISTRY 2
@ -40,15 +56,14 @@ extern "C" {
#define PW_CORE_METHOD_UPDATE_TYPES 5 #define PW_CORE_METHOD_UPDATE_TYPES 5
#define PW_CORE_METHOD_NUM 6 #define PW_CORE_METHOD_NUM 6
/** \file /**
* \struct pw_core_methods
* \brief Core methods
* *
* The object interfaces * The core global object. This is a singleton object used for
* * creating new objects in the PipeWire server. It is also used
* Methods are sent from client to server and events from * for internal features.
* server to client.
*/ */
/** Core methods */
struct pw_core_methods { struct pw_core_methods {
/** /**
* Update the client properties * Update the client properties
@ -127,7 +142,10 @@ struct pw_core_methods {
#define PW_CORE_EVENT_UPDATE_TYPES 4 #define PW_CORE_EVENT_UPDATE_TYPES 4
#define PW_CORE_EVENT_NUM 5 #define PW_CORE_EVENT_NUM 5
/** Core events */ /** \struct pw_core_events
* \brief Core events
* \ingroup pw_core_interface The pw_core interface
*/
struct pw_core_events { struct pw_core_events {
/** /**
* Notify new core info * Notify new core info

View file

@ -16,7 +16,7 @@ pipewire_headers = [
'sig.h', 'sig.h',
'stream.h', 'stream.h',
'subscribe.h', 'subscribe.h',
'thread-mainloop.h', 'thread-loop.h',
'transport.h', 'transport.h',
'type.h', 'type.h',
'utils.h', 'utils.h',
@ -36,7 +36,7 @@ pipewire_sources = [
'stream.c', 'stream.c',
'pipewire.c', 'pipewire.c',
'rtkit.c', 'rtkit.c',
'thread-mainloop.c', 'thread-loop.c',
'transport.c', 'transport.c',
'type.c', 'type.c',
'utils.c', 'utils.c',

View file

@ -29,7 +29,7 @@ extern "C" {
#include <pipewire/client/log.h> #include <pipewire/client/log.h>
#include <pipewire/client/loop.h> #include <pipewire/client/loop.h>
#include <pipewire/client/mem.h> #include <pipewire/client/mem.h>
#include <pipewire/client/thread-mainloop.h> #include <pipewire/client/thread-loop.h>
#include <pipewire/client/properties.h> #include <pipewire/client/properties.h>
#include <pipewire/client/stream.h> #include <pipewire/client/stream.h>
#include <pipewire/client/subscribe.h> #include <pipewire/client/subscribe.h>
@ -37,6 +37,54 @@ extern "C" {
#include <spa/type-map.h> #include <spa/type-map.h>
/** \mainpage
*
* \section sec_intro Introduction
*
* This document describes the API for the PipeWire multimedia server.
* The API consists of two parts:
*
* \li The client side API (See \subpage page_client_api)
* \li The server side API and tools to build new modules (See
* \subpage page_server_api)
*
* \section sec_errors Error reporting
*
* Functions return either NULL or a negative int error code when an
* error occurs. Error codes are used from the SPA plugin library on
* which PipeWire is built.
*
* Some functions might return asynchronously. The error code for such
* functions is positive and SPA_RESULT_IS_ASYNC() will return true.
* SPA_RESULT_ASYNC_SEQ() can be used to get the unique sequence number
* associated with the async operation.
*
* The object returning the async result code will have some way to
* signal the completion of the async operation (with, for example, a
* callback). The sequence number can be used to see which operation
* completed.
*
* \section sec_logging Logging
*
* The 'PIPEWIRE_DEBUG' environment variable can be used to enable
* more debugging. The format is:
*
* <level>[:<category>,...]
*
* - <level>: specifies the log level:
* + `0`: no logging is enabled
* + `1`: Error logging is enabled
* + `2`: Warnings are enabled
* + `3`: Informational messages are enabled
* + `4`: Debug messages are enabled
* + `5`: Trace messages are enabled. These messages can be logged
* from the realtime threads.
*
* - <category>: Specifies a string category to enable. Many categories
* can be separated by commas. Current categories are:
* + `connection`: to log connection messages
*/
/** \class pw_pipewire /** \class pw_pipewire
* *
* \brief PipeWire initalization and infrasctructure functions * \brief PipeWire initalization and infrasctructure functions

View file

@ -29,6 +29,133 @@
extern "C" { extern "C" {
#endif #endif
/** \page page_streams Media Streams
*
* \section sec_overview Overview
*
* Media streams are used to exchange data with the PipeWire server. A
* stream is a wrapper around a \ref pw_client_node with one port.
*
* Streams can be used to:
*
* \li Consume a stream from PipeWire. This is a PW_DIRECTION_INPUT stream.
* \li Produce a stream to PipeWire. This is a PW_DIRECTION_OUTPUT stream
*
* You can connect the stream port to a specific server port or let PipeWire
* choose a port for you.
*
* For more complicated nodes such as filters or ports with multiple
* inputs and/or outputs you will need to manage the \ref pw_client_node proxy
* yourself.
*
* \section sec_create Create
*
* Make a new stream with \ref pw_stream_new(). You will need to specify
* a name for the stream and extra properties. You can use \ref
* pw_fill_stream_properties() to get a basic set of properties for the
* stream.
*
* Once the stream is created, the state_changed signal should be used to
* track the state of the stream.
*
* \section sec_connect Connect
*
* The stream is initially unconnected. To connect the stream, use
* \ref pw_stream_connect(). Pass the desired direction as an argument.
*
* \subsection ssec_stream_mode Stream modes
*
* The stream mode specifies how the data will be exchanged with PipeWire.
* The following stream modes are available
*
* \li \ref PW_STREAM_MODE_BUFFER: data is exchanged with fixed size
* buffers. This is ideal for video frames or equal sized audio
* frames.
* \li \ref PW_STREAM_MODE_RINGBUFFER: data is exhanged with a fixed
* size ringbuffer. This is ideal for variable sized audio packets
* or compressed media.
*
* \subsection ssec_stream_target Stream target
*
* To make the newly connected stream automatically connect to an existing
* PipeWire node, use the \ref PW_STREAM_FLAG_AUTOCONNECT and the port_path
* argument while connecting.
*
* \subsection ssec_stream_formats Stream formats
*
* An array of possible formats that this stream can consume or provide
* must be specified.
*
* \section sec_format Format negotiation
*
* After connecting the stream, it will transition to the \ref
* PW_STREAM_STATE_CONFIGURE state. In this state the format will be
* negotiated by the PipeWire server.
*
* Once the format has been selected, the format_changed signal is
* emited with the configured format as a parameter.
*
* The client should now prepare itself to deal with the format and
* complete the negotiation procedure with a call to \ref
* pw_stream_finish_format().
*
* As arguments to \ref pw_stream_finish_format() an array of spa_param
* structures must be given. They contain parameters such as buffer size,
* number of buffers, required metadata and other parameters for the
* media buffers.
*
* \section sec_buffers Buffer negotiation
*
* After completing the format negotiation, PipeWire will allocate and
* notify the stream of the buffers that will be used to exchange data
* between client and server.
*
* With the add_buffer signal, a stream will be notified of a new buffer
* that can be used for data transport.
*
* Afer the buffers are negotiated, the stream will transition to the
* \ref PW_STREAM_STATE_PAUSED state.
*
* \section sec_streaming Streaming
*
* From the \ref PW_STREAM_STATE_PAUSED state, the stream can be set to
* the \ref PW_STREAM_STATE_STREAMING state by the PipeWire server when
* data transport is started.
*
* Depending on how the stream was connected it will need to Produce or
* Consume data for/from PipeWire as explained in the following
* subsections.
*
* \subsection ssec_consume Consume data
*
* The new_buffer signal is emited for each new buffer can can be
* consumed.
*
* \ref pw_stream_peek_buffer() should be used to get the data and metadata
* of the buffer.
*
* When the buffer is no longer in use, call \ref pw_stream_recycle_buffer()
* to let PipeWire reuse the buffer.
*
* \subsection ssec_produce Produce data
*
* The need_buffer signal is emited when PipeWire needs a new buffer for this
* stream.
*
* \ref pw_stream_get_empty_buffer() gives the id of an empty buffer.
* Use \ref pw_stream_peek_buffer() to get the data and metadata that should
* be filled.
*
* To send the filled buffer, use \ref pw_stream_send_buffer().
*
* The new_buffer signal is emited when PipeWire no longer uses the buffer
* and it can be safely reused.
*
* \section sec_stream_disconnect Disconnect
*
* Use \ref pw_stream_disconnect() to disconnect a stream after use.
*/
/** \enum pw_stream_state The state of a stream \memberof pw_stream */ /** \enum pw_stream_state The state of a stream \memberof pw_stream */
enum pw_stream_state { enum pw_stream_state {
PW_STREAM_STATE_ERROR = -1, /**< the strean is in error */ PW_STREAM_STATE_ERROR = -1, /**< the strean is in error */
@ -47,7 +174,7 @@ const char * pw_stream_state_as_string(enum pw_stream_state state);
/** \enum pw_stream_flags Extra flags that can be used in \ref pw_stream_connect() \memberof pw_stream */ /** \enum pw_stream_flags Extra flags that can be used in \ref pw_stream_connect() \memberof pw_stream */
enum pw_stream_flags { enum pw_stream_flags {
PW_STREAM_FLAG_NONE = 0, /**< no flags */ PW_STREAM_FLAG_NONE = 0, /**< no flags */
PW_STREAM_FLAG_AUTOCONNECT = (1 << 0), /**< don't try to automatically connect PW_STREAM_FLAG_AUTOCONNECT = (1 << 0), /**< try to automatically connect
* this stream */ * this stream */
PW_STREAM_FLAG_CLOCK_UPDATE = (1 << 1), /**< request periodic clock updates for PW_STREAM_FLAG_CLOCK_UPDATE = (1 << 1), /**< request periodic clock updates for
* this stream */ * this stream */
@ -72,6 +199,8 @@ struct pw_time {
* *
* The stream object provides a convenient way to send and * The stream object provides a convenient way to send and
* receive data streams from/to PipeWire. * receive data streams from/to PipeWire.
*
* See also \ref page_streams and \ref page_client_api
*/ */
struct pw_stream { struct pw_stream {
struct pw_context *context; /**< the owner context */ struct pw_context *context; /**< the owner context */
@ -109,7 +238,8 @@ struct pw_stream {
struct pw_stream * struct pw_stream *
pw_stream_new(struct pw_context *context, pw_stream_new(struct pw_context *context,
const char *name, struct pw_properties *props); const char *name,
struct pw_properties *props);
void void
pw_stream_destroy(struct pw_stream *stream); pw_stream_destroy(struct pw_stream *stream);

View file

@ -44,8 +44,7 @@ extern "C" {
#define PIPEWIRE_TYPE__Module "PipeWire:Object:Module" #define PIPEWIRE_TYPE__Module "PipeWire:Object:Module"
#define PIPEWIRE_TYPE_MODULE_BASE PIPEWIRE_TYPE__Module ":" #define PIPEWIRE_TYPE_MODULE_BASE PIPEWIRE_TYPE__Module ":"
/** \class pw_subscribe /** \enum pw_subscription_event
*
* subscription events * subscription events
*/ */
enum pw_subscription_event { enum pw_subscription_event {

View file

@ -20,11 +20,11 @@
#include <pthread.h> #include <pthread.h>
#include "pipewire.h" #include "pipewire.h"
#include "thread-mainloop.h" #include "thread-loop.h"
/** \cond */ /** \cond */
struct thread_main_loop { struct thread_loop {
struct pw_thread_main_loop this; struct pw_thread_loop this;
char *name; char *name;
@ -44,48 +44,48 @@ struct thread_main_loop {
static void pre_hook(struct spa_loop_control *ctrl, void *data) static void pre_hook(struct spa_loop_control *ctrl, void *data)
{ {
struct thread_main_loop *impl = data; struct thread_loop *impl = data;
pthread_mutex_unlock(&impl->lock); pthread_mutex_unlock(&impl->lock);
} }
static void post_hook(struct spa_loop_control *ctrl, void *data) static void post_hook(struct spa_loop_control *ctrl, void *data)
{ {
struct thread_main_loop *impl = data; struct thread_loop *impl = data;
pthread_mutex_lock(&impl->lock); pthread_mutex_lock(&impl->lock);
} }
static void do_stop(struct spa_loop_utils *utils, struct spa_source *source, void *data) static void do_stop(struct spa_loop_utils *utils, struct spa_source *source, void *data)
{ {
struct thread_main_loop *impl = data; struct thread_loop *impl = data;
impl->running = false; impl->running = false;
} }
/** Create a new \ref pw_thread_main_loop /** Create a new \ref pw_thread_loop
* *
* \param loop the loop to wrap * \param loop the loop to wrap
* \param name the name of the thread or NULL * \param name the name of the thread or NULL
* \return a newly allocated \ref pw_thread_main_loop * \return a newly allocated \ref pw_thread_loop
* *
* Make a new \ref pw_thread_main_loop that will run a mainloop on \a loop in * Make a new \ref pw_thread_loop that will run \a loop in
* a thread with \a name. * a thread with \a name.
* *
* After this function you should probably call pw_thread_main_loop_start() to * After this function you should probably call pw_thread_loop_start() to
* actually start the thread * actually start the thread
* *
* \memberof pw_thread_main_loop * \memberof pw_thread_loop
*/ */
struct pw_thread_main_loop *pw_thread_main_loop_new(struct pw_loop *loop, const char *name) struct pw_thread_loop *pw_thread_loop_new(struct pw_loop *loop, const char *name)
{ {
struct thread_main_loop *impl; struct thread_loop *impl;
struct pw_thread_main_loop *this; struct pw_thread_loop *this;
pthread_mutexattr_t attr; pthread_mutexattr_t attr;
impl = calloc(1, sizeof(struct thread_main_loop)); impl = calloc(1, sizeof(struct thread_loop));
if (impl == NULL) if (impl == NULL)
return NULL; return NULL;
this = &impl->this; this = &impl->this;
pw_log_debug("thread-mainloop %p: new", impl); pw_log_debug("thread-loop %p: new", impl);
this->loop = loop; this->loop = loop;
this->name = name ? strdup(name) : NULL; this->name = name ? strdup(name) : NULL;
@ -105,14 +105,14 @@ struct pw_thread_main_loop *pw_thread_main_loop_new(struct pw_loop *loop, const
return this; return this;
} }
/** Destroy a threaded main loop \memberof pw_thread_main_loop */ /** Destroy a threaded loop \memberof pw_thread_loop */
void pw_thread_main_loop_destroy(struct pw_thread_main_loop *loop) void pw_thread_loop_destroy(struct pw_thread_loop *loop)
{ {
struct thread_main_loop *impl = SPA_CONTAINER_OF(loop, struct thread_main_loop, this); struct thread_loop *impl = SPA_CONTAINER_OF(loop, struct thread_loop, this);
pw_signal_emit(&loop->destroy_signal, loop); pw_signal_emit(&loop->destroy_signal, loop);
pw_thread_main_loop_stop(loop); pw_thread_loop_stop(loop);
if (loop->name) if (loop->name)
free(loop->name); free(loop->name);
@ -125,19 +125,19 @@ void pw_thread_main_loop_destroy(struct pw_thread_main_loop *loop)
static void *do_loop(void *user_data) static void *do_loop(void *user_data)
{ {
struct thread_main_loop *impl = user_data; struct thread_loop *impl = user_data;
struct pw_thread_main_loop *this = &impl->this; struct pw_thread_loop *this = &impl->this;
int res; int res;
pthread_mutex_lock(&impl->lock); pthread_mutex_lock(&impl->lock);
pw_log_debug("thread-mainloop %p: enter thread", this); pw_log_debug("thread-loop %p: enter thread", this);
pw_loop_enter(this->loop); pw_loop_enter(this->loop);
while (impl->running) { while (impl->running) {
if ((res = pw_loop_iterate(this->loop, -1)) < 0) if ((res = pw_loop_iterate(this->loop, -1)) < 0)
pw_log_warn("thread-mainloop %p: iterate error %d", this, res); pw_log_warn("thread-loop %p: iterate error %d", this, res);
} }
pw_log_debug("thread-mainloop %p: leave thread", this); pw_log_debug("thread-loop %p: leave thread", this);
pw_loop_leave(this->loop); pw_loop_leave(this->loop);
pthread_mutex_unlock(&impl->lock); pthread_mutex_unlock(&impl->lock);
@ -146,21 +146,21 @@ static void *do_loop(void *user_data)
/** Start the thread to handle \a loop /** Start the thread to handle \a loop
* *
* \param loop a \ref pw_thread_main_loop * \param loop a \ref pw_thread_loop
* \return \ref SPA_RESULT_OK on success * \return \ref SPA_RESULT_OK on success
* *
* \memberof pw_thread_main_loop * \memberof pw_thread_loop
*/ */
int pw_thread_main_loop_start(struct pw_thread_main_loop *loop) int pw_thread_loop_start(struct pw_thread_loop *loop)
{ {
struct thread_main_loop *impl = SPA_CONTAINER_OF(loop, struct thread_main_loop, this); struct thread_loop *impl = SPA_CONTAINER_OF(loop, struct thread_loop, this);
if (!impl->running) { if (!impl->running) {
int err; int err;
impl->running = true; impl->running = true;
if ((err = pthread_create(&impl->thread, NULL, do_loop, impl)) != 0) { if ((err = pthread_create(&impl->thread, NULL, do_loop, impl)) != 0) {
pw_log_warn("thread-mainloop %p: can't create thread: %s", impl, pw_log_warn("thread-loop %p: can't create thread: %s", impl,
strerror(err)); strerror(err));
impl->running = false; impl->running = false;
return SPA_RESULT_ERROR; return SPA_RESULT_ERROR;
@ -169,65 +169,65 @@ int pw_thread_main_loop_start(struct pw_thread_main_loop *loop)
return SPA_RESULT_OK; return SPA_RESULT_OK;
} }
/** Quit the main loop and stop its thread /** Quit the loop and stop its thread
* *
* \param loop a \ref pw_thread_main_loop * \param loop a \ref pw_thread_loop
* *
* \memberof pw_thread_main_loop * \memberof pw_thread_loop
*/ */
void pw_thread_main_loop_stop(struct pw_thread_main_loop *loop) void pw_thread_loop_stop(struct pw_thread_loop *loop)
{ {
struct thread_main_loop *impl = SPA_CONTAINER_OF(loop, struct thread_main_loop, this); struct thread_loop *impl = SPA_CONTAINER_OF(loop, struct thread_loop, this);
pw_log_debug("thread-mainloop: %p stopping", impl); pw_log_debug("thread-loop: %p stopping", impl);
if (impl->running) { if (impl->running) {
pw_log_debug("thread-mainloop: %p signal", impl); pw_log_debug("thread-loop: %p signal", impl);
pw_loop_signal_event(loop->loop, impl->event); pw_loop_signal_event(loop->loop, impl->event);
pw_log_debug("thread-mainloop: %p join", impl); pw_log_debug("thread-loop: %p join", impl);
pthread_join(impl->thread, NULL); pthread_join(impl->thread, NULL);
pw_log_debug("thread-mainloop: %p joined", impl); pw_log_debug("thread-loop: %p joined", impl);
impl->running = false; impl->running = false;
} }
pw_log_debug("thread-mainloop: %p stopped", impl); pw_log_debug("thread-loop: %p stopped", impl);
} }
/** Lock the mutex associated with \a loop /** Lock the mutex associated with \a loop
* *
* \param loop a \ref pw_thread_main_loop * \param loop a \ref pw_thread_loop
* *
* \memberof pw_thread_main_loop * \memberof pw_thread_loop
*/ */
void pw_thread_main_loop_lock(struct pw_thread_main_loop *loop) void pw_thread_loop_lock(struct pw_thread_loop *loop)
{ {
struct thread_main_loop *impl = SPA_CONTAINER_OF(loop, struct thread_main_loop, this); struct thread_loop *impl = SPA_CONTAINER_OF(loop, struct thread_loop, this);
pthread_mutex_lock(&impl->lock); pthread_mutex_lock(&impl->lock);
} }
/** Unlock the mutex associated with \a loop /** Unlock the mutex associated with \a loop
* *
* \param loop a \ref pw_thread_main_loop * \param loop a \ref pw_thread_loop
* *
* \memberof pw_thread_main_loop * \memberof pw_thread_loop
*/ */
void pw_thread_main_loop_unlock(struct pw_thread_main_loop *loop) void pw_thread_loop_unlock(struct pw_thread_loop *loop)
{ {
struct thread_main_loop *impl = SPA_CONTAINER_OF(loop, struct thread_main_loop, this); struct thread_loop *impl = SPA_CONTAINER_OF(loop, struct thread_loop, this);
pthread_mutex_unlock(&impl->lock); pthread_mutex_unlock(&impl->lock);
} }
/** Signal the main thread /** Signal the thread
* *
* \param loop a \ref pw_thread_main_loop to signal * \param loop a \ref pw_thread_loop to signal
* \param wait_for_accept if we need to wait for accept * \param wait_for_accept if we need to wait for accept
* *
* Signal the main thread of \a loop. If \a wait_for_accept is true, * Signal the thread of \a loop. If \a wait_for_accept is true,
* this function waits until \ref pw_thread_main_loop_accept() is called. * this function waits until \ref pw_thread_loop_accept() is called.
* *
* \memberof pw_thread_main_loop * \memberof pw_thread_loop
*/ */
void pw_thread_main_loop_signal(struct pw_thread_main_loop *loop, bool wait_for_accept) void pw_thread_loop_signal(struct pw_thread_loop *loop, bool wait_for_accept)
{ {
struct thread_main_loop *impl = SPA_CONTAINER_OF(loop, struct thread_main_loop, this); struct thread_loop *impl = SPA_CONTAINER_OF(loop, struct thread_loop, this);
if (impl->n_waiting > 0) if (impl->n_waiting > 0)
pthread_cond_broadcast(&impl->cond); pthread_cond_broadcast(&impl->cond);
@ -240,15 +240,15 @@ void pw_thread_main_loop_signal(struct pw_thread_main_loop *loop, bool wait_for_
} }
} }
/** Wait for the loop thread to call \ref pw_thread_main_loop_signal() /** Wait for the loop thread to call \ref pw_thread_loop_signal()
* *
* \param loop a \ref pw_thread_main_loop to signal * \param loop a \ref pw_thread_loop to signal
* *
* \memberof pw_thread_main_loop * \memberof pw_thread_loop
*/ */
void pw_thread_main_loop_wait(struct pw_thread_main_loop *loop) void pw_thread_loop_wait(struct pw_thread_loop *loop)
{ {
struct thread_main_loop *impl = SPA_CONTAINER_OF(loop, struct thread_main_loop, this); struct thread_loop *impl = SPA_CONTAINER_OF(loop, struct thread_loop, this);
impl->n_waiting++; impl->n_waiting++;
@ -256,15 +256,15 @@ void pw_thread_main_loop_wait(struct pw_thread_main_loop *loop)
impl->n_waiting--; impl->n_waiting--;
} }
/** Signal the loop thread waiting for accept with \ref pw_thread_main_loop_signal() /** Signal the loop thread waiting for accept with \ref pw_thread_loop_signal()
* *
* \param loop a \ref pw_thread_main_loop to signal * \param loop a \ref pw_thread_loop to signal
* *
* \memberof pw_thread_main_loop * \memberof pw_thread_loop
*/ */
void pw_thread_main_loop_accept(struct pw_thread_main_loop *loop) void pw_thread_loop_accept(struct pw_thread_loop *loop)
{ {
struct thread_main_loop *impl = SPA_CONTAINER_OF(loop, struct thread_main_loop, this); struct thread_loop *impl = SPA_CONTAINER_OF(loop, struct thread_loop, this);
impl->n_waiting_for_accept--; impl->n_waiting_for_accept--;
pthread_cond_signal(&impl->accept_cond); pthread_cond_signal(&impl->accept_cond);
@ -272,13 +272,13 @@ void pw_thread_main_loop_accept(struct pw_thread_main_loop *loop)
/** Check if we are inside the thread of the loop /** Check if we are inside the thread of the loop
* *
* \param loop a \ref pw_thread_main_loop to signal * \param loop a \ref pw_thread_loop to signal
* \return true when called inside the thread of \a loop. * \return true when called inside the thread of \a loop.
* *
* \memberof pw_thread_main_loop * \memberof pw_thread_loop
*/ */
bool pw_thread_main_loop_in_thread(struct pw_thread_main_loop *loop) bool pw_thread_loop_in_thread(struct pw_thread_loop *loop)
{ {
struct thread_main_loop *impl = SPA_CONTAINER_OF(loop, struct thread_main_loop, this); struct thread_loop *impl = SPA_CONTAINER_OF(loop, struct thread_loop, this);
return pthread_self() == impl->thread; return pthread_self() == impl->thread;
} }

View file

@ -0,0 +1,133 @@
/* PipeWire
* Copyright (C) 2015 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __PIPEWIRE_THREAD_LOOP_H__
#define __PIPEWIRE_THREAD_LOOP_H__
#include <pipewire/client/loop.h>
#ifdef __cplusplus
extern "C" {
#endif
/** \page page_thread_loop Threaded Loop
*
* \section sec_thread_loop_overview Overview
*
* The threaded loop implementation is a special wrapper around the
* regular \ref pw_loop implementation.
*
* The added feature in the threaded loop is that it spawns a new thread
* that runs the wrapped loop. This allows a synchronous application to use
* the asynchronous API without risking to stall the PipeWire library.
*
* \section sec_thread_loop_create Creation
*
* A \ref pw_thread_loop object is created using pw_thread_loop_new().
* The \ref pw_loop to wrap must be given as an argument along with the name
* for the thread that will be spawned.
*
* After allocating the object, the thread must be started with
* pw_thread_loop_start()
*
* \section sec_thread_loop_destruction Destruction
*
* When the PipeWire connection has been terminated, the thread must be
* stopped and the resources freed. Stopping the thread is done using
* pw_thread_loop_stop(), which must be called without the lock (see
* below) held. When that function returns, the thread is stopped and the
* \ref pw_thread_loop object can be freed using pw_thread_loop_destroy().
*
* \section sec_thread_loop_locking Locking
*
* Since the PipeWire API doesn't allow concurrent accesses to objects,
* a locking scheme must be used to guarantee safe usage. The threaded
* loop API provides such a scheme through the functions
* pw_thread_loop_lock() and pw_thread_loop_unlock().
*
* The lock is recursive, so it's safe to use it multiple times from the same
* thread. Just make sure you call pw_thread_loop_unlock() the same
* number of times you called pw_thread_loop_lock().
*
* The lock needs to be held whenever you call any PipeWire function that
* uses an object associated with this loop. Make sure you do not hold
* on to the lock more than necessary though, as the threaded loop stops
* while the lock is held.
*
* \section sec_thread_loop_signals Signals and Callbacks
*
* All signals and callbacks are called with the thread lock held.
*
*/
/** \class pw_thread_loop
*
* \brief PipeWire threaded loop object
*
* The threaded loop object runs a \ref pw_loop in a separate thread
* and ensures proper locking is done.
*
* All of the loop callbacks will be executed with the loop
* lock held.
*
* See also \ref page_thread_loop
*/
struct pw_thread_loop {
struct pw_loop *loop; /**< the \ref pw_loop that is wrapped */
char *name; /**< the thread name */
/** Emited when the threaded loop is destroyed */
PW_SIGNAL(destroy_signal, (struct pw_listener *listener,
struct pw_thread_loop *loop));
};
struct pw_thread_loop *
pw_thread_loop_new(struct pw_loop *loop, const char *name);
void
pw_thread_loop_destroy(struct pw_thread_loop *loop);
int
pw_thread_loop_start(struct pw_thread_loop *loop);
void
pw_thread_loop_stop(struct pw_thread_loop *loop);
void
pw_thread_loop_lock(struct pw_thread_loop *loop);
void
pw_thread_loop_unlock(struct pw_thread_loop *loop);
void
pw_thread_loop_wait(struct pw_thread_loop *loop);
void
pw_thread_loop_signal(struct pw_thread_loop *loop, bool wait_for_accept);
void
pw_thread_loop_accept(struct pw_thread_loop *loop);
bool
pw_thread_loop_in_thread(struct pw_thread_loop *loop);
#ifdef __cplusplus
}
#endif
#endif /* __PIPEWIRE_THREAD_LOOP_H__ */

View file

@ -1,82 +0,0 @@
/* PipeWire
* Copyright (C) 2015 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __PIPEWIRE_THREAD_MAIN_LOOP_H__
#define __PIPEWIRE_THREAD_MAIN_LOOP_H__
#include <pipewire/client/loop.h>
#ifdef __cplusplus
extern "C" {
#endif
/** \class pw_thread_main_loop
*
* \brief PipeWire threaded main loop object
*
* The threaded main loop object runs a \ref pw_loop in a separate thread
* and ensures proper locking is done.
*
* All of the loop callbacks will be executed with the main loop
* lock held.
*/
struct pw_thread_main_loop {
struct pw_loop *loop; /**< the \ref pw_loop that is wrapped */
char *name; /**< the thread name */
/** Emited when the mainloop is destroyed */
PW_SIGNAL(destroy_signal, (struct pw_listener *listener,
struct pw_thread_main_loop *loop));
};
struct pw_thread_main_loop *
pw_thread_main_loop_new(struct pw_loop *loop, const char *name);
void
pw_thread_main_loop_destroy(struct pw_thread_main_loop *loop);
int
pw_thread_main_loop_start(struct pw_thread_main_loop *loop);
void
pw_thread_main_loop_stop(struct pw_thread_main_loop *loop);
void
pw_thread_main_loop_lock(struct pw_thread_main_loop *loop);
void
pw_thread_main_loop_unlock(struct pw_thread_main_loop *loop);
void
pw_thread_main_loop_wait(struct pw_thread_main_loop *loop);
void
pw_thread_main_loop_signal(struct pw_thread_main_loop *loop, bool wait_for_accept);
void
pw_thread_main_loop_accept(struct pw_thread_main_loop *loop);
bool
pw_thread_main_loop_in_thread(struct pw_thread_main_loop *loop);
#ifdef __cplusplus
}
#endif
#endif /* __PIPEWIRE_THREAD_MAIN_LOOP_H__ */

View file

@ -443,7 +443,7 @@ on_context_state_changed (struct pw_listener *listener,
GST_ERROR_OBJECT (self, "context error: %s", context->error); GST_ERROR_OBJECT (self, "context error: %s", context->error);
break; break;
} }
pw_thread_main_loop_signal (self->main_loop, FALSE); pw_thread_loop_signal (self->main_loop, FALSE);
} }
static gboolean static gboolean
@ -455,17 +455,17 @@ gst_pipewire_device_provider_start (GstDeviceProvider * provider)
self->loop = pw_loop_new (); self->loop = pw_loop_new ();
if (!(self->main_loop = pw_thread_main_loop_new (self->loop, "pipewire-device-monitor"))) { if (!(self->main_loop = pw_thread_loop_new (self->loop, "pipewire-device-monitor"))) {
GST_ERROR_OBJECT (self, "Could not create PipeWire mainloop"); GST_ERROR_OBJECT (self, "Could not create PipeWire mainloop");
goto failed_main_loop; goto failed_main_loop;
} }
if (pw_thread_main_loop_start (self->main_loop) != SPA_RESULT_OK) { if (pw_thread_loop_start (self->main_loop) != SPA_RESULT_OK) {
GST_ERROR_OBJECT (self, "Could not start PipeWire mainloop"); GST_ERROR_OBJECT (self, "Could not start PipeWire mainloop");
goto failed_start; goto failed_start;
} }
pw_thread_main_loop_lock (self->main_loop); pw_thread_loop_lock (self->main_loop);
if (!(self->context = pw_context_new (self->loop, self->client_name, NULL))) { if (!(self->context = pw_context_new (self->loop, self->client_name, NULL))) {
GST_ERROR_OBJECT (self, "Failed to create context"); GST_ERROR_OBJECT (self, "Failed to create context");
@ -494,13 +494,13 @@ gst_pipewire_device_provider_start (GstDeviceProvider * provider)
break; break;
/* Wait until something happens */ /* Wait until something happens */
pw_thread_main_loop_wait (self->main_loop); pw_thread_loop_wait (self->main_loop);
} }
GST_DEBUG_OBJECT (self, "connected"); GST_DEBUG_OBJECT (self, "connected");
pw_context_get_core_info (self->context, pw_context_get_core_info (self->context,
get_core_info_cb, get_core_info_cb,
self); self);
pw_thread_main_loop_unlock (self->main_loop); pw_thread_loop_unlock (self->main_loop);
return TRUE; return TRUE;
@ -508,9 +508,9 @@ not_running:
pw_context_destroy (self->context); pw_context_destroy (self->context);
self->context = NULL; self->context = NULL;
failed_context: failed_context:
pw_thread_main_loop_unlock (self->main_loop); pw_thread_loop_unlock (self->main_loop);
failed_start: failed_start:
pw_thread_main_loop_destroy (self->main_loop); pw_thread_loop_destroy (self->main_loop);
self->main_loop = NULL; self->main_loop = NULL;
failed_main_loop: failed_main_loop:
pw_loop_destroy (self->loop); pw_loop_destroy (self->loop);
@ -529,7 +529,7 @@ gst_pipewire_device_provider_stop (GstDeviceProvider * provider)
self->context = NULL; self->context = NULL;
} }
if (self->main_loop) { if (self->main_loop) {
pw_thread_main_loop_destroy (self->main_loop); pw_thread_loop_destroy (self->main_loop);
self->main_loop = NULL; self->main_loop = NULL;
} }
if (self->loop) { if (self->loop) {

View file

@ -82,7 +82,7 @@ struct _GstPipeWireDeviceProvider {
gchar *client_name; gchar *client_name;
struct pw_loop *loop; struct pw_loop *loop;
struct pw_thread_main_loop *main_loop; struct pw_thread_loop *main_loop;
struct pw_context *context; struct pw_context *context;
struct pw_listener ctx_state_changed; struct pw_listener ctx_state_changed;

View file

@ -118,7 +118,7 @@ gst_pipewire_sink_finalize (GObject * object)
g_object_unref (pwsink->pool); g_object_unref (pwsink->pool);
pw_thread_main_loop_destroy (pwsink->main_loop); pw_thread_loop_destroy (pwsink->main_loop);
pwsink->main_loop = NULL; pwsink->main_loop = NULL;
pw_loop_destroy (pwsink->loop); pw_loop_destroy (pwsink->loop);
@ -282,9 +282,9 @@ pool_activated (GstPipeWirePool *pool, GstPipeWireSink *sink)
PROP (&f[1], ctx->type.param_alloc_meta_enable.ringbufferAlign, SPA_POD_TYPE_INT, 16)); PROP (&f[1], ctx->type.param_alloc_meta_enable.ringbufferAlign, SPA_POD_TYPE_INT, 16));
port_params[2] = SPA_POD_BUILDER_DEREF (&b, f[0].ref, struct spa_param); port_params[2] = SPA_POD_BUILDER_DEREF (&b, f[0].ref, struct spa_param);
pw_thread_main_loop_lock (sink->main_loop); pw_thread_loop_lock (sink->main_loop);
pw_stream_finish_format (sink->stream, SPA_RESULT_OK, port_params, 2); pw_stream_finish_format (sink->stream, SPA_RESULT_OK, port_params, 2);
pw_thread_main_loop_unlock (sink->main_loop); pw_thread_loop_unlock (sink->main_loop);
} }
static void static void
@ -303,7 +303,7 @@ gst_pipewire_sink_init (GstPipeWireSink * sink)
g_queue_init (&sink->queue); g_queue_init (&sink->queue);
sink->loop = pw_loop_new (); sink->loop = pw_loop_new ();
sink->main_loop = pw_thread_main_loop_new (sink->loop, "pipewire-sink-loop"); sink->main_loop = pw_thread_loop_new (sink->loop, "pipewire-sink-loop");
GST_DEBUG ("loop %p %p", sink->loop, sink->main_loop); GST_DEBUG ("loop %p %p", sink->loop, sink->main_loop);
} }
@ -484,7 +484,7 @@ on_add_buffer (struct pw_listener *listener,
gst_pipewire_pool_add_buffer (pwsink->pool, buf); gst_pipewire_pool_add_buffer (pwsink->pool, buf);
g_hash_table_insert (pwsink->buf_ids, GINT_TO_POINTER (id), buf); g_hash_table_insert (pwsink->buf_ids, GINT_TO_POINTER (id), buf);
pw_thread_main_loop_signal (pwsink->main_loop, FALSE); pw_thread_loop_signal (pwsink->main_loop, FALSE);
} }
static void static void
@ -524,7 +524,7 @@ on_new_buffer (struct pw_listener *listener,
if (buf) { if (buf) {
gst_buffer_unref (buf); gst_buffer_unref (buf);
pw_thread_main_loop_signal (pwsink->main_loop, FALSE); pw_thread_loop_signal (pwsink->main_loop, FALSE);
} }
} }
@ -559,7 +559,7 @@ do_send_buffer (GstPipeWireSink *pwsink)
if (!(res = pw_stream_send_buffer (pwsink->stream, data->id))) { if (!(res = pw_stream_send_buffer (pwsink->stream, data->id))) {
g_warning ("can't send buffer"); g_warning ("can't send buffer");
pw_thread_main_loop_signal (pwsink->main_loop, FALSE); pw_thread_loop_signal (pwsink->main_loop, FALSE);
} else } else
pwsink->need_ready--; pwsink->need_ready--;
} }
@ -599,7 +599,7 @@ on_state_changed (struct pw_listener *listener,
("stream error: %s", stream->error), (NULL)); ("stream error: %s", stream->error), (NULL));
break; break;
} }
pw_thread_main_loop_signal (pwsink->main_loop, FALSE); pw_thread_loop_signal (pwsink->main_loop, FALSE);
} }
static void static void
@ -625,7 +625,7 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
possible = gst_caps_to_format_all (caps); possible = gst_caps_to_format_all (caps);
pw_thread_main_loop_lock (pwsink->main_loop); pw_thread_loop_lock (pwsink->main_loop);
state = pwsink->stream->state; state = pwsink->stream->state;
if (state == PW_STREAM_STATE_ERROR) if (state == PW_STREAM_STATE_ERROR)
@ -654,12 +654,12 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
if (state == PW_STREAM_STATE_ERROR) if (state == PW_STREAM_STATE_ERROR)
goto start_error; goto start_error;
pw_thread_main_loop_wait (pwsink->main_loop); pw_thread_loop_wait (pwsink->main_loop);
} }
} }
res = TRUE; res = TRUE;
pw_thread_main_loop_unlock (pwsink->main_loop); pw_thread_loop_unlock (pwsink->main_loop);
pwsink->negotiated = res; pwsink->negotiated = res;
@ -668,7 +668,7 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
start_error: start_error:
{ {
GST_ERROR ("could not start stream"); GST_ERROR ("could not start stream");
pw_thread_main_loop_unlock (pwsink->main_loop); pw_thread_loop_unlock (pwsink->main_loop);
g_ptr_array_unref (possible); g_ptr_array_unref (possible);
return FALSE; return FALSE;
} }
@ -685,7 +685,7 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
if (!pwsink->negotiated) if (!pwsink->negotiated)
goto not_negotiated; goto not_negotiated;
pw_thread_main_loop_lock (pwsink->main_loop); pw_thread_loop_lock (pwsink->main_loop);
if (pwsink->stream->state != PW_STREAM_STATE_STREAMING) if (pwsink->stream->state != PW_STREAM_STATE_STREAMING)
goto done; goto done;
@ -715,7 +715,7 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
do_send_buffer (pwsink); do_send_buffer (pwsink);
done: done:
pw_thread_main_loop_unlock (pwsink->main_loop); pw_thread_loop_unlock (pwsink->main_loop);
return res; return res;
@ -754,7 +754,7 @@ gst_pipewire_sink_start (GstBaseSink * basesink)
props = NULL; props = NULL;
} }
pw_thread_main_loop_lock (pwsink->main_loop); pw_thread_loop_lock (pwsink->main_loop);
pwsink->stream = pw_stream_new (pwsink->ctx, pwsink->client_name, props); pwsink->stream = pw_stream_new (pwsink->ctx, pwsink->client_name, props);
pwsink->pool->stream = pwsink->stream; pwsink->pool->stream = pwsink->stream;
@ -764,7 +764,7 @@ gst_pipewire_sink_start (GstBaseSink * basesink)
pw_signal_add (&pwsink->stream->remove_buffer, &pwsink->stream_remove_buffer, on_remove_buffer); pw_signal_add (&pwsink->stream->remove_buffer, &pwsink->stream_remove_buffer, on_remove_buffer);
pw_signal_add (&pwsink->stream->new_buffer, &pwsink->stream_new_buffer, on_new_buffer); pw_signal_add (&pwsink->stream->new_buffer, &pwsink->stream_new_buffer, on_new_buffer);
pw_signal_add (&pwsink->stream->need_buffer, &pwsink->stream_need_buffer, on_need_buffer); pw_signal_add (&pwsink->stream->need_buffer, &pwsink->stream_need_buffer, on_need_buffer);
pw_thread_main_loop_unlock (pwsink->main_loop); pw_thread_loop_unlock (pwsink->main_loop);
return TRUE; return TRUE;
} }
@ -774,14 +774,14 @@ gst_pipewire_sink_stop (GstBaseSink * basesink)
{ {
GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (basesink); GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (basesink);
pw_thread_main_loop_lock (pwsink->main_loop); pw_thread_loop_lock (pwsink->main_loop);
if (pwsink->stream) { if (pwsink->stream) {
pw_stream_disconnect (pwsink->stream); pw_stream_disconnect (pwsink->stream);
pw_stream_destroy (pwsink->stream); pw_stream_destroy (pwsink->stream);
pwsink->stream = NULL; pwsink->stream = NULL;
pwsink->pool->stream = NULL; pwsink->pool->stream = NULL;
} }
pw_thread_main_loop_unlock (pwsink->main_loop); pw_thread_loop_unlock (pwsink->main_loop);
pwsink->negotiated = FALSE; pwsink->negotiated = FALSE;
@ -808,16 +808,16 @@ on_ctx_state_changed (struct pw_listener *listener,
("context error: %s", ctx->error), (NULL)); ("context error: %s", ctx->error), (NULL));
break; break;
} }
pw_thread_main_loop_signal (pwsink->main_loop, FALSE); pw_thread_loop_signal (pwsink->main_loop, FALSE);
} }
static gboolean static gboolean
gst_pipewire_sink_open (GstPipeWireSink * pwsink) gst_pipewire_sink_open (GstPipeWireSink * pwsink)
{ {
if (pw_thread_main_loop_start (pwsink->main_loop) != SPA_RESULT_OK) if (pw_thread_loop_start (pwsink->main_loop) != SPA_RESULT_OK)
goto mainloop_error; goto mainloop_error;
pw_thread_main_loop_lock (pwsink->main_loop); pw_thread_loop_lock (pwsink->main_loop);
pwsink->ctx = pw_context_new (pwsink->loop, g_get_application_name (), NULL); pwsink->ctx = pw_context_new (pwsink->loop, g_get_application_name (), NULL);
pw_signal_add (&pwsink->ctx->state_changed, &pwsink->ctx_state_changed, on_ctx_state_changed); pw_signal_add (&pwsink->ctx->state_changed, &pwsink->ctx_state_changed, on_ctx_state_changed);
@ -833,9 +833,9 @@ gst_pipewire_sink_open (GstPipeWireSink * pwsink)
if (state == PW_CONTEXT_STATE_ERROR) if (state == PW_CONTEXT_STATE_ERROR)
goto connect_error; goto connect_error;
pw_thread_main_loop_wait (pwsink->main_loop); pw_thread_loop_wait (pwsink->main_loop);
} }
pw_thread_main_loop_unlock (pwsink->main_loop); pw_thread_loop_unlock (pwsink->main_loop);
return TRUE; return TRUE;
@ -848,7 +848,7 @@ mainloop_error:
} }
connect_error: connect_error:
{ {
pw_thread_main_loop_unlock (pwsink->main_loop); pw_thread_loop_unlock (pwsink->main_loop);
return FALSE; return FALSE;
} }
} }
@ -856,7 +856,7 @@ connect_error:
static gboolean static gboolean
gst_pipewire_sink_close (GstPipeWireSink * pwsink) gst_pipewire_sink_close (GstPipeWireSink * pwsink)
{ {
pw_thread_main_loop_lock (pwsink->main_loop); pw_thread_loop_lock (pwsink->main_loop);
if (pwsink->stream) { if (pwsink->stream) {
pw_stream_disconnect (pwsink->stream); pw_stream_disconnect (pwsink->stream);
} }
@ -872,12 +872,12 @@ gst_pipewire_sink_close (GstPipeWireSink * pwsink)
if (state == PW_CONTEXT_STATE_ERROR) if (state == PW_CONTEXT_STATE_ERROR)
break; break;
pw_thread_main_loop_wait (pwsink->main_loop); pw_thread_loop_wait (pwsink->main_loop);
} }
} }
pw_thread_main_loop_unlock (pwsink->main_loop); pw_thread_loop_unlock (pwsink->main_loop);
pw_thread_main_loop_stop (pwsink->main_loop); pw_thread_loop_stop (pwsink->main_loop);
if (pwsink->stream) { if (pwsink->stream) {
pw_stream_destroy (pwsink->stream); pw_stream_destroy (pwsink->stream);

View file

@ -78,7 +78,7 @@ struct _GstPipeWireSink {
gboolean negotiated; gboolean negotiated;
struct pw_loop *loop; struct pw_loop *loop;
struct pw_thread_main_loop *main_loop; struct pw_thread_loop *main_loop;
struct pw_context *ctx; struct pw_context *ctx;
struct pw_listener ctx_state_changed; struct pw_listener ctx_state_changed;

View file

@ -196,7 +196,7 @@ gst_pipewire_src_finalize (GObject * object)
clear_queue (pwsrc); clear_queue (pwsrc);
pw_thread_main_loop_destroy (pwsrc->main_loop); pw_thread_loop_destroy (pwsrc->main_loop);
pwsrc->main_loop = NULL; pwsrc->main_loop = NULL;
pw_loop_destroy (pwsrc->loop); pw_loop_destroy (pwsrc->loop);
pwsrc->loop = NULL; pwsrc->loop = NULL;
@ -309,7 +309,7 @@ gst_pipewire_src_init (GstPipeWireSrc * src)
src->buf_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) gst_buffer_unref); src->buf_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) gst_buffer_unref);
src->loop = pw_loop_new (); src->loop = pw_loop_new ();
src->main_loop = pw_thread_main_loop_new (src->loop, "pipewire-main-loop"); src->main_loop = pw_thread_loop_new (src->loop, "pipewire-main-loop");
GST_DEBUG ("loop %p, mainloop %p", src->loop, src->main_loop); GST_DEBUG ("loop %p, mainloop %p", src->loop, src->main_loop);
} }
@ -392,9 +392,9 @@ buffer_recycle (GstMiniObject *obj)
src = data->src; src = data->src;
GST_LOG_OBJECT (obj, "recycle buffer"); GST_LOG_OBJECT (obj, "recycle buffer");
pw_thread_main_loop_lock (src->main_loop); pw_thread_loop_lock (src->main_loop);
pw_stream_recycle_buffer (src->stream, data->id); pw_stream_recycle_buffer (src->stream, data->id);
pw_thread_main_loop_unlock (src->main_loop); pw_thread_loop_unlock (src->main_loop);
return FALSE; return FALSE;
} }
@ -527,7 +527,7 @@ on_new_buffer (struct pw_listener *listener,
g_queue_push_tail (&pwsrc->queue, buf); g_queue_push_tail (&pwsrc->queue, buf);
pw_thread_main_loop_signal (pwsrc->main_loop, FALSE); pw_thread_loop_signal (pwsrc->main_loop, FALSE);
return; return;
} }
@ -553,7 +553,7 @@ on_state_changed (struct pw_listener *listener,
("stream error: %s", stream->error), (NULL)); ("stream error: %s", stream->error), (NULL));
break; break;
} }
pw_thread_main_loop_signal (pwsrc->main_loop, FALSE); pw_thread_loop_signal (pwsrc->main_loop, FALSE);
} }
static void static void
@ -581,7 +581,7 @@ parse_stream_properties (GstPipeWireSrc *pwsrc, struct pw_properties *props)
static gboolean static gboolean
gst_pipewire_src_stream_start (GstPipeWireSrc *pwsrc) gst_pipewire_src_stream_start (GstPipeWireSrc *pwsrc)
{ {
pw_thread_main_loop_lock (pwsrc->main_loop); pw_thread_loop_lock (pwsrc->main_loop);
GST_DEBUG_OBJECT (pwsrc, "doing stream start"); GST_DEBUG_OBJECT (pwsrc, "doing stream start");
while (TRUE) { while (TRUE) {
enum pw_stream_state state = pwsrc->stream->state; enum pw_stream_state state = pwsrc->stream->state;
@ -596,24 +596,24 @@ gst_pipewire_src_stream_start (GstPipeWireSrc *pwsrc)
if (pwsrc->ctx->state == PW_CONTEXT_STATE_ERROR) if (pwsrc->ctx->state == PW_CONTEXT_STATE_ERROR)
goto start_error; goto start_error;
pw_thread_main_loop_wait (pwsrc->main_loop); pw_thread_loop_wait (pwsrc->main_loop);
} }
parse_stream_properties (pwsrc, pwsrc->stream->properties); parse_stream_properties (pwsrc, pwsrc->stream->properties);
pw_thread_main_loop_unlock (pwsrc->main_loop); pw_thread_loop_unlock (pwsrc->main_loop);
pw_thread_main_loop_lock (pwsrc->main_loop); pw_thread_loop_lock (pwsrc->main_loop);
GST_DEBUG_OBJECT (pwsrc, "signal started"); GST_DEBUG_OBJECT (pwsrc, "signal started");
pwsrc->started = TRUE; pwsrc->started = TRUE;
pw_thread_main_loop_signal (pwsrc->main_loop, FALSE); pw_thread_loop_signal (pwsrc->main_loop, FALSE);
pw_thread_main_loop_unlock (pwsrc->main_loop); pw_thread_loop_unlock (pwsrc->main_loop);
return TRUE; return TRUE;
start_error: start_error:
{ {
GST_DEBUG_OBJECT (pwsrc, "error starting stream"); GST_DEBUG_OBJECT (pwsrc, "error starting stream");
pw_thread_main_loop_unlock (pwsrc->main_loop); pw_thread_loop_unlock (pwsrc->main_loop);
return FALSE; return FALSE;
} }
} }
@ -623,7 +623,7 @@ wait_negotiated (GstPipeWireSrc *this)
{ {
enum pw_stream_state state; enum pw_stream_state state;
pw_thread_main_loop_lock (this->main_loop); pw_thread_loop_lock (this->main_loop);
while (TRUE) { while (TRUE) {
state = this->stream->state; state = this->stream->state;
@ -639,10 +639,10 @@ wait_negotiated (GstPipeWireSrc *this)
if (this->started) if (this->started)
break; break;
pw_thread_main_loop_wait (this->main_loop); pw_thread_loop_wait (this->main_loop);
} }
GST_DEBUG_OBJECT (this, "got started signal"); GST_DEBUG_OBJECT (this, "got started signal");
pw_thread_main_loop_unlock (this->main_loop); pw_thread_loop_unlock (this->main_loop);
return state; return state;
} }
@ -687,7 +687,7 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
possible = gst_caps_to_format_all (caps); possible = gst_caps_to_format_all (caps);
/* first disconnect */ /* first disconnect */
pw_thread_main_loop_lock (pwsrc->main_loop); pw_thread_loop_lock (pwsrc->main_loop);
if (pwsrc->stream->state != PW_STREAM_STATE_UNCONNECTED) { if (pwsrc->stream->state != PW_STREAM_STATE_UNCONNECTED) {
GST_DEBUG_OBJECT (basesrc, "disconnect capture"); GST_DEBUG_OBJECT (basesrc, "disconnect capture");
pw_stream_disconnect (pwsrc->stream); pw_stream_disconnect (pwsrc->stream);
@ -703,7 +703,7 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
goto connect_error; goto connect_error;
} }
pw_thread_main_loop_wait (pwsrc->main_loop); pw_thread_loop_wait (pwsrc->main_loop);
} }
} }
@ -730,9 +730,9 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
if (pwsrc->ctx->state == PW_CONTEXT_STATE_ERROR) if (pwsrc->ctx->state == PW_CONTEXT_STATE_ERROR)
goto connect_error; goto connect_error;
pw_thread_main_loop_wait (pwsrc->main_loop); pw_thread_loop_wait (pwsrc->main_loop);
} }
pw_thread_main_loop_unlock (pwsrc->main_loop); pw_thread_loop_unlock (pwsrc->main_loop);
result = gst_pipewire_src_stream_start (pwsrc); result = gst_pipewire_src_stream_start (pwsrc);
@ -767,7 +767,7 @@ no_common_caps:
} }
connect_error: connect_error:
{ {
pw_thread_main_loop_unlock (pwsrc->main_loop); pw_thread_loop_unlock (pwsrc->main_loop);
return FALSE; return FALSE;
} }
} }
@ -831,11 +831,11 @@ gst_pipewire_src_unlock (GstBaseSrc * basesrc)
{ {
GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc); GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc);
pw_thread_main_loop_lock (pwsrc->main_loop); pw_thread_loop_lock (pwsrc->main_loop);
GST_DEBUG_OBJECT (pwsrc, "setting flushing"); GST_DEBUG_OBJECT (pwsrc, "setting flushing");
pwsrc->flushing = TRUE; pwsrc->flushing = TRUE;
pw_thread_main_loop_signal (pwsrc->main_loop, FALSE); pw_thread_loop_signal (pwsrc->main_loop, FALSE);
pw_thread_main_loop_unlock (pwsrc->main_loop); pw_thread_loop_unlock (pwsrc->main_loop);
return TRUE; return TRUE;
} }
@ -845,10 +845,10 @@ gst_pipewire_src_unlock_stop (GstBaseSrc * basesrc)
{ {
GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc); GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc);
pw_thread_main_loop_lock (pwsrc->main_loop); pw_thread_loop_lock (pwsrc->main_loop);
GST_DEBUG_OBJECT (pwsrc, "unsetting flushing"); GST_DEBUG_OBJECT (pwsrc, "unsetting flushing");
pwsrc->flushing = FALSE; pwsrc->flushing = FALSE;
pw_thread_main_loop_unlock (pwsrc->main_loop); pw_thread_loop_unlock (pwsrc->main_loop);
return TRUE; return TRUE;
} }
@ -934,7 +934,7 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
if (!pwsrc->negotiated) if (!pwsrc->negotiated)
goto not_negotiated; goto not_negotiated;
pw_thread_main_loop_lock (pwsrc->main_loop); pw_thread_loop_lock (pwsrc->main_loop);
while (TRUE) { while (TRUE) {
enum pw_stream_state state; enum pw_stream_state state;
@ -953,9 +953,9 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
if (*buffer != NULL) if (*buffer != NULL)
break; break;
pw_thread_main_loop_wait (pwsrc->main_loop); pw_thread_loop_wait (pwsrc->main_loop);
} }
pw_thread_main_loop_unlock (pwsrc->main_loop); pw_thread_loop_unlock (pwsrc->main_loop);
if (pwsrc->is_live) if (pwsrc->is_live)
base_time = GST_ELEMENT_CAST (psrc)->base_time; base_time = GST_ELEMENT_CAST (psrc)->base_time;
@ -986,12 +986,12 @@ not_negotiated:
} }
streaming_error: streaming_error:
{ {
pw_thread_main_loop_unlock (pwsrc->main_loop); pw_thread_loop_unlock (pwsrc->main_loop);
return GST_FLOW_ERROR; return GST_FLOW_ERROR;
} }
streaming_stopped: streaming_stopped:
{ {
pw_thread_main_loop_unlock (pwsrc->main_loop); pw_thread_loop_unlock (pwsrc->main_loop);
return GST_FLOW_FLUSHING; return GST_FLOW_FLUSHING;
} }
} }
@ -1009,9 +1009,9 @@ gst_pipewire_src_stop (GstBaseSrc * basesrc)
pwsrc = GST_PIPEWIRE_SRC (basesrc); pwsrc = GST_PIPEWIRE_SRC (basesrc);
pw_thread_main_loop_lock (pwsrc->main_loop); pw_thread_loop_lock (pwsrc->main_loop);
clear_queue (pwsrc); clear_queue (pwsrc);
pw_thread_main_loop_unlock (pwsrc->main_loop); pw_thread_loop_unlock (pwsrc->main_loop);
return TRUE; return TRUE;
} }
@ -1035,7 +1035,7 @@ on_ctx_state_changed (struct pw_listener *listener,
("context error: %s", ctx->error), (NULL)); ("context error: %s", ctx->error), (NULL));
break; break;
} }
pw_thread_main_loop_signal (pwsrc->main_loop, FALSE); pw_thread_loop_signal (pwsrc->main_loop, FALSE);
} }
static gboolean static gboolean
@ -1057,10 +1057,10 @@ gst_pipewire_src_open (GstPipeWireSrc * pwsrc)
{ {
struct pw_properties *props; struct pw_properties *props;
if (pw_thread_main_loop_start (pwsrc->main_loop) != SPA_RESULT_OK) if (pw_thread_loop_start (pwsrc->main_loop) != SPA_RESULT_OK)
goto mainloop_failed; goto mainloop_failed;
pw_thread_main_loop_lock (pwsrc->main_loop); pw_thread_loop_lock (pwsrc->main_loop);
pwsrc->ctx = pw_context_new (pwsrc->loop, g_get_application_name (), NULL); pwsrc->ctx = pw_context_new (pwsrc->loop, g_get_application_name (), NULL);
pw_signal_add (&pwsrc->ctx->state_changed, &pwsrc->ctx_state_changed, on_ctx_state_changed); pw_signal_add (&pwsrc->ctx->state_changed, &pwsrc->ctx_state_changed, on_ctx_state_changed);
@ -1077,7 +1077,7 @@ gst_pipewire_src_open (GstPipeWireSrc * pwsrc)
if (state == PW_CONTEXT_STATE_ERROR) if (state == PW_CONTEXT_STATE_ERROR)
goto connect_error; goto connect_error;
pw_thread_main_loop_wait (pwsrc->main_loop); pw_thread_loop_wait (pwsrc->main_loop);
} }
if (pwsrc->properties) { if (pwsrc->properties) {
@ -1096,7 +1096,7 @@ gst_pipewire_src_open (GstPipeWireSrc * pwsrc)
pw_signal_add (&pwsrc->stream->new_buffer, &pwsrc->stream_new_buffer, on_new_buffer); pw_signal_add (&pwsrc->stream->new_buffer, &pwsrc->stream_new_buffer, on_new_buffer);
pwsrc->clock = gst_pipewire_clock_new (pwsrc->stream); pwsrc->clock = gst_pipewire_clock_new (pwsrc->stream);
pw_thread_main_loop_unlock (pwsrc->main_loop); pw_thread_loop_unlock (pwsrc->main_loop);
return TRUE; return TRUE;
@ -1108,7 +1108,7 @@ mainloop_failed:
} }
connect_error: connect_error:
{ {
pw_thread_main_loop_unlock (pwsrc->main_loop); pw_thread_loop_unlock (pwsrc->main_loop);
return FALSE; return FALSE;
} }
} }
@ -1118,7 +1118,7 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc)
{ {
clear_queue (pwsrc); clear_queue (pwsrc);
pw_thread_main_loop_stop (pwsrc->main_loop); pw_thread_loop_stop (pwsrc->main_loop);
g_hash_table_remove_all (pwsrc->buf_ids); g_hash_table_remove_all (pwsrc->buf_ids);

View file

@ -65,7 +65,7 @@ struct _GstPipeWireSrc {
GstClockTime max_latency; GstClockTime max_latency;
struct pw_loop *loop; struct pw_loop *loop;
struct pw_thread_main_loop *main_loop; struct pw_thread_loop *main_loop;
struct pw_context *ctx; struct pw_context *ctx;
struct pw_listener ctx_state_changed; struct pw_listener ctx_state_changed;

View file

@ -1178,9 +1178,13 @@ struct pw_client_node *pw_client_node_new(struct pw_client *client,
return NULL; return NULL;
} }
void pw_client_node_destroy(struct pw_client_node *this) /** Destroy a client node
* \param node the client node to destroy
* \memberof pw_client_node
*/
void pw_client_node_destroy(struct pw_client_node *node)
{ {
pw_resource_destroy(this->resource); pw_resource_destroy(node->resource);
} }
/** Get the set of fds for this \ref pw_client_node /** Get the set of fds for this \ref pw_client_node

View file

@ -27,7 +27,7 @@ extern "C" {
#include <pipewire/server/core.h> #include <pipewire/server/core.h>
/** \class command /** \class pw_command
* *
* A configuration command * A configuration command
*/ */

View file

@ -36,6 +36,13 @@ struct pw_global;
#include <pipewire/server/link.h> #include <pipewire/server/link.h>
#include <pipewire/server/node-factory.h> #include <pipewire/server/node-factory.h>
/** \page page_server_api Server API
*
* \section page_server_overview Overview
*
*
*/
typedef int (*pw_bind_func_t) (struct pw_global *global, typedef int (*pw_bind_func_t) (struct pw_global *global,
struct pw_client *client, uint32_t version, uint32_t id); struct pw_client *client, uint32_t version, uint32_t id);
@ -67,6 +74,8 @@ struct pw_global {
* *
* The server core object manages all resources available on the * The server core object manages all resources available on the
* server. * server.
*
* See \ref page_server_api
*/ */
struct pw_core { struct pw_core {
struct pw_global *global; /**< the global of the core */ struct pw_global *global; /**< the global of the core */