mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-11-02 09:01:50 -05:00
stream: use 2 eventfds for client <-> server signaling
This commit is contained in:
parent
f0aafb5b51
commit
7d1d3bd666
9 changed files with 80 additions and 56 deletions
|
|
@ -217,7 +217,8 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
void (*done) (void *object,
|
void (*done) (void *object,
|
||||||
int datafd);
|
int readfd,
|
||||||
|
int writefd);
|
||||||
void (*event) (void *object,
|
void (*event) (void *object,
|
||||||
const SpaEvent *event);
|
const SpaEvent *event);
|
||||||
void (*add_port) (void *object,
|
void (*add_port) (void *object,
|
||||||
|
|
|
||||||
|
|
@ -576,17 +576,22 @@ client_node_demarshal_done (void *object,
|
||||||
PinosProxy *proxy = object;
|
PinosProxy *proxy = object;
|
||||||
SpaPODIter it;
|
SpaPODIter it;
|
||||||
PinosConnection *connection = proxy->context->protocol_private;
|
PinosConnection *connection = proxy->context->protocol_private;
|
||||||
int32_t idx;
|
int32_t ridx, widx;
|
||||||
int fd;
|
int readfd, writefd;
|
||||||
|
|
||||||
if (!spa_pod_iter_struct (&it, data, size) ||
|
if (!spa_pod_iter_struct (&it, data, size) ||
|
||||||
!spa_pod_iter_get (&it,
|
!spa_pod_iter_get (&it,
|
||||||
SPA_POD_TYPE_INT, &idx,
|
SPA_POD_TYPE_INT, &ridx,
|
||||||
|
SPA_POD_TYPE_INT, &widx,
|
||||||
0))
|
0))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
fd = pinos_connection_get_fd (connection, idx);
|
readfd = pinos_connection_get_fd (connection, ridx);
|
||||||
((PinosClientNodeEvents*)proxy->implementation)->done (proxy, fd);
|
writefd = pinos_connection_get_fd (connection, widx);
|
||||||
|
if (readfd == -1 || writefd == -1)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
((PinosClientNodeEvents*)proxy->implementation)->done (proxy, readfd, writefd);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -75,7 +75,8 @@ typedef struct
|
||||||
|
|
||||||
PinosStreamMode mode;
|
PinosStreamMode mode;
|
||||||
|
|
||||||
int rtfd;
|
int rtreadfd;
|
||||||
|
int rtwritefd;
|
||||||
SpaSource *rtsocket_source;
|
SpaSource *rtsocket_source;
|
||||||
|
|
||||||
PinosProxy *node_proxy;
|
PinosProxy *node_proxy;
|
||||||
|
|
@ -372,7 +373,7 @@ send_need_input (PinosStream *stream)
|
||||||
ni.event.type = SPA_EVENT_NODE_NEED_INPUT;
|
ni.event.type = SPA_EVENT_NODE_NEED_INPUT;
|
||||||
ni.event.size = sizeof (ni);
|
ni.event.size = sizeof (ni);
|
||||||
pinos_transport_add_event (impl->trans, &ni.event);
|
pinos_transport_add_event (impl->trans, &ni.event);
|
||||||
write (impl->rtfd, &cmd, 8);
|
write (impl->rtwritefd, &cmd, 8);
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -384,7 +385,7 @@ send_have_output (PinosStream *stream)
|
||||||
uint64_t cmd = 1;
|
uint64_t cmd = 1;
|
||||||
|
|
||||||
pinos_transport_add_event (impl->trans, &ho);
|
pinos_transport_add_event (impl->trans, &ho);
|
||||||
write (impl->rtfd, &cmd, 8);
|
write (impl->rtwritefd, &cmd, 8);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
|
@ -553,7 +554,7 @@ on_rtsocket_condition (SpaLoopUtils *utils,
|
||||||
SpaEvent event;
|
SpaEvent event;
|
||||||
uint64_t cmd;
|
uint64_t cmd;
|
||||||
|
|
||||||
read (impl->rtfd, &cmd, 8);
|
read (impl->rtreadfd, &cmd, 8);
|
||||||
|
|
||||||
while (pinos_transport_next_event (impl->trans, &event) == SPA_RESULT_OK) {
|
while (pinos_transport_next_event (impl->trans, &event) == SPA_RESULT_OK) {
|
||||||
SpaEvent *ev = alloca (SPA_POD_SIZE (&event));
|
SpaEvent *ev = alloca (SPA_POD_SIZE (&event));
|
||||||
|
|
@ -564,14 +565,15 @@ on_rtsocket_condition (SpaLoopUtils *utils,
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
handle_socket (PinosStream *stream, int rtfd)
|
handle_socket (PinosStream *stream, int rtreadfd, int rtwritefd)
|
||||||
{
|
{
|
||||||
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
|
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
|
||||||
struct timespec interval;
|
struct timespec interval;
|
||||||
|
|
||||||
impl->rtfd = rtfd;
|
impl->rtreadfd = rtreadfd;
|
||||||
|
impl->rtwritefd = rtwritefd;
|
||||||
impl->rtsocket_source = pinos_loop_add_io (stream->context->loop,
|
impl->rtsocket_source = pinos_loop_add_io (stream->context->loop,
|
||||||
impl->rtfd,
|
impl->rtreadfd,
|
||||||
SPA_IO_ERR | SPA_IO_HUP,
|
SPA_IO_ERR | SPA_IO_HUP,
|
||||||
true,
|
true,
|
||||||
on_rtsocket_condition,
|
on_rtsocket_condition,
|
||||||
|
|
@ -659,13 +661,14 @@ handle_node_command (PinosStream *stream,
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_node_done (void *object,
|
client_node_done (void *object,
|
||||||
int datafd)
|
int readfd,
|
||||||
|
int writefd)
|
||||||
{
|
{
|
||||||
PinosProxy *proxy = object;
|
PinosProxy *proxy = object;
|
||||||
PinosStream *stream = proxy->user_data;
|
PinosStream *stream = proxy->user_data;
|
||||||
|
|
||||||
pinos_log_info ("strean %p: create client node done with fd %d", stream, datafd);
|
pinos_log_info ("strean %p: create client node done with fds %d %d", stream, readfd, writefd);
|
||||||
handle_socket (stream, datafd);
|
handle_socket (stream, readfd, writefd);
|
||||||
do_node_init (stream);
|
do_node_init (stream);
|
||||||
|
|
||||||
stream_set_state (stream, PINOS_STREAM_STATE_CONFIGURE, NULL);
|
stream_set_state (stream, PINOS_STREAM_STATE_CONFIGURE, NULL);
|
||||||
|
|
@ -927,7 +930,7 @@ client_node_transport (void *object,
|
||||||
pinos_transport_destroy (impl->trans);
|
pinos_transport_destroy (impl->trans);
|
||||||
impl->trans = pinos_transport_new_from_info (&info);
|
impl->trans = pinos_transport_new_from_info (&info);
|
||||||
|
|
||||||
pinos_log_debug ("transport update %d %p", impl->rtfd, impl->trans);
|
pinos_log_debug ("transport update %p", impl->trans);
|
||||||
}
|
}
|
||||||
|
|
||||||
static const PinosClientNodeEvents client_node_events = {
|
static const PinosClientNodeEvents client_node_events = {
|
||||||
|
|
@ -1156,7 +1159,7 @@ pinos_stream_recycle_buffer (PinosStream *stream,
|
||||||
spa_list_insert (impl->free.prev, &bid->link);
|
spa_list_insert (impl->free.prev, &bid->link);
|
||||||
|
|
||||||
pinos_transport_add_event (impl->trans, (SpaEvent *)&rb);
|
pinos_transport_add_event (impl->trans, (SpaEvent *)&rb);
|
||||||
write (impl->rtfd, &cmd, 8);
|
write (impl->rtwritefd, &cmd, 8);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -394,8 +394,8 @@ handle_create_client_node (PinosDaemon1 *interface,
|
||||||
PinosProperties *props;
|
PinosProperties *props;
|
||||||
GError *error = NULL;
|
GError *error = NULL;
|
||||||
GUnixFDList *fdlist;
|
GUnixFDList *fdlist;
|
||||||
int ctrl_fd, data_fd;
|
int ctrl_fd, data_rfd, data_wfd;
|
||||||
int ctrl_idx, data_idx;
|
int ctrl_idx, data_ridx, data_widx;
|
||||||
PinosProtocolDBusObject *object;
|
PinosProtocolDBusObject *object;
|
||||||
int fd[2];
|
int fd[2];
|
||||||
|
|
||||||
|
|
@ -426,7 +426,7 @@ handle_create_client_node (PinosDaemon1 *interface,
|
||||||
if (object == NULL)
|
if (object == NULL)
|
||||||
goto object_failed;
|
goto object_failed;
|
||||||
|
|
||||||
if ((res = pinos_client_node_get_data_socket (node, &data_fd)) < 0)
|
if ((res = pinos_client_node_get_fds (node, &data_rfd, &data_wfd)) < 0)
|
||||||
goto no_socket;
|
goto no_socket;
|
||||||
|
|
||||||
object_path = object->object_path;
|
object_path = object->object_path;
|
||||||
|
|
@ -434,10 +434,11 @@ handle_create_client_node (PinosDaemon1 *interface,
|
||||||
|
|
||||||
fdlist = g_unix_fd_list_new ();
|
fdlist = g_unix_fd_list_new ();
|
||||||
ctrl_idx = g_unix_fd_list_append (fdlist, ctrl_fd, &error);
|
ctrl_idx = g_unix_fd_list_append (fdlist, ctrl_fd, &error);
|
||||||
data_idx = g_unix_fd_list_append (fdlist, data_fd, &error);
|
data_ridx = g_unix_fd_list_append (fdlist, data_rfd, &error);
|
||||||
|
data_widx = g_unix_fd_list_append (fdlist, data_wfd, &error);
|
||||||
|
|
||||||
g_dbus_method_invocation_return_value_with_unix_fd_list (invocation,
|
g_dbus_method_invocation_return_value_with_unix_fd_list (invocation,
|
||||||
g_variant_new ("(ohh)", object_path, ctrl_idx, data_idx), fdlist);
|
g_variant_new ("(ohhh)", object_path, ctrl_idx, data_ridx, data_widx), fdlist);
|
||||||
g_object_unref (fdlist);
|
g_object_unref (fdlist);
|
||||||
|
|
||||||
return TRUE;
|
return TRUE;
|
||||||
|
|
|
||||||
|
|
@ -102,6 +102,7 @@ struct _SpaProxy
|
||||||
PinosResource *resource;
|
PinosResource *resource;
|
||||||
|
|
||||||
SpaSource data_source;
|
SpaSource data_source;
|
||||||
|
int writefd;
|
||||||
|
|
||||||
uint32_t max_inputs;
|
uint32_t max_inputs;
|
||||||
uint32_t n_inputs;
|
uint32_t n_inputs;
|
||||||
|
|
@ -129,7 +130,8 @@ struct _PinosClientNodeImpl
|
||||||
PinosListener loop_changed;
|
PinosListener loop_changed;
|
||||||
PinosListener global_added;
|
PinosListener global_added;
|
||||||
|
|
||||||
int data_fd;
|
int fds[2];
|
||||||
|
int other_fds[2];
|
||||||
};
|
};
|
||||||
|
|
||||||
static SpaResult
|
static SpaResult
|
||||||
|
|
@ -163,7 +165,7 @@ static inline void
|
||||||
do_flush (SpaProxy *this)
|
do_flush (SpaProxy *this)
|
||||||
{
|
{
|
||||||
uint64_t cmd = 1;
|
uint64_t cmd = 1;
|
||||||
write (this->data_source.fd, &cmd, 8);
|
write (this->writefd, &cmd, 8);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void
|
static inline void
|
||||||
|
|
@ -1140,10 +1142,9 @@ client_node_resource_destroy (PinosResource *resource)
|
||||||
pinos_signal_remove (&impl->loop_changed);
|
pinos_signal_remove (&impl->loop_changed);
|
||||||
pinos_signal_remove (&impl->initialized);
|
pinos_signal_remove (&impl->initialized);
|
||||||
|
|
||||||
if (proxy->data_source.fd != -1) {
|
if (proxy->data_source.fd != -1)
|
||||||
spa_loop_remove_source (proxy->data_loop, &proxy->data_source);
|
spa_loop_remove_source (proxy->data_loop, &proxy->data_source);
|
||||||
close (proxy->data_source.fd);
|
|
||||||
}
|
|
||||||
pinos_node_destroy (this->node);
|
pinos_node_destroy (this->node);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1161,8 +1162,10 @@ on_node_free (PinosListener *listener,
|
||||||
if (impl->transport)
|
if (impl->transport)
|
||||||
pinos_transport_destroy (impl->transport);
|
pinos_transport_destroy (impl->transport);
|
||||||
|
|
||||||
if (impl->data_fd != -1)
|
if (impl->fds[0] != -1)
|
||||||
close (impl->data_fd);
|
close (impl->fds[0]);
|
||||||
|
if (impl->fds[1] != -1)
|
||||||
|
close (impl->fds[1]);
|
||||||
free (impl);
|
free (impl);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1193,7 +1196,7 @@ pinos_client_node_new (PinosClient *client,
|
||||||
this->client = client;
|
this->client = client;
|
||||||
|
|
||||||
impl->core = client->core;
|
impl->core = client->core;
|
||||||
impl->data_fd = -1;
|
impl->fds[0] = impl->fds[1] = -1;
|
||||||
pinos_log_debug ("client-node %p: new", impl);
|
pinos_log_debug ("client-node %p: new", impl);
|
||||||
|
|
||||||
pinos_signal_init (&this->destroy_signal);
|
pinos_signal_init (&this->destroy_signal);
|
||||||
|
|
@ -1257,39 +1260,45 @@ pinos_client_node_destroy (PinosClientNode * this)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* pinos_client_node_get_data_socket:
|
* pinos_client_node_get_fds:
|
||||||
* @node: a #PinosClientNode
|
* @node: a #PinosClientNode
|
||||||
* @error: a #GError
|
* @readfd: an fd for reading
|
||||||
|
* @writefd: an fd for writing
|
||||||
*
|
*
|
||||||
* Create or return a previously create socket pair for @node. The
|
* Create or return a previously create set of fds for @node.
|
||||||
* Socket for the other end is returned.
|
|
||||||
*
|
*
|
||||||
* Returns: %SPA_RESULT_OK on success
|
* Returns: %SPA_RESULT_OK on success
|
||||||
*/
|
*/
|
||||||
SpaResult
|
SpaResult
|
||||||
pinos_client_node_get_data_socket (PinosClientNode *this,
|
pinos_client_node_get_fds (PinosClientNode *this,
|
||||||
int *fd)
|
int *readfd,
|
||||||
|
int *writefd)
|
||||||
{
|
{
|
||||||
PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, this);
|
PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, this);
|
||||||
|
|
||||||
if (impl->data_fd == -1) {
|
if (impl->fds[0] == -1) {
|
||||||
#if 1
|
#if 0
|
||||||
int fd[2];
|
if (socketpair (AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, impl->fds) != 0)
|
||||||
|
|
||||||
if (socketpair (AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, fd) != 0)
|
|
||||||
return SPA_RESULT_ERRNO;
|
return SPA_RESULT_ERRNO;
|
||||||
|
|
||||||
impl->proxy.data_source.fd = fd[0];
|
impl->proxy.data_source.fd = impl->fds[0];
|
||||||
impl->data_fd = fd[1];
|
impl->proxy.writefd = impl->fds[0];
|
||||||
|
impl->other_fds[0] = impl->fds[1];
|
||||||
|
impl->other_fds[1] = impl->fds[1];
|
||||||
#else
|
#else
|
||||||
|
impl->fds[0] = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK);
|
||||||
impl->proxy.data_source.fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK);
|
impl->fds[1] = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK);
|
||||||
impl->data_fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK);
|
impl->proxy.data_source.fd = impl->fds[0];
|
||||||
|
impl->proxy.writefd = impl->fds[1];
|
||||||
|
impl->other_fds[0] = impl->fds[1];
|
||||||
|
impl->other_fds[1] = impl->fds[0];
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
spa_loop_add_source (impl->proxy.data_loop, &impl->proxy.data_source);
|
spa_loop_add_source (impl->proxy.data_loop, &impl->proxy.data_source);
|
||||||
pinos_log_debug ("client-node %p: add data fd %d", this, impl->proxy.data_source.fd);
|
pinos_log_debug ("client-node %p: add data fd %d", this, impl->proxy.data_source.fd);
|
||||||
}
|
}
|
||||||
*fd = impl->data_fd;
|
*readfd = impl->other_fds[0];
|
||||||
|
*writefd = impl->other_fds[1];
|
||||||
|
|
||||||
return SPA_RESULT_OK;
|
return SPA_RESULT_OK;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -52,7 +52,9 @@ PinosClientNode * pinos_client_node_new (PinosClient *client,
|
||||||
PinosProperties *properties);
|
PinosProperties *properties);
|
||||||
void pinos_client_node_destroy (PinosClientNode *node);
|
void pinos_client_node_destroy (PinosClientNode *node);
|
||||||
|
|
||||||
SpaResult pinos_client_node_get_data_socket (PinosClientNode *node, int *fd);
|
SpaResult pinos_client_node_get_fds (PinosClientNode *node,
|
||||||
|
int *readfd,
|
||||||
|
int *writefd);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -203,7 +203,7 @@ async_create_client_node_complete (PinosAccessData *data)
|
||||||
PinosClient *client = resource->client;
|
PinosClient *client = resource->client;
|
||||||
PinosClientNode *node;
|
PinosClientNode *node;
|
||||||
SpaResult res;
|
SpaResult res;
|
||||||
int data_fd;
|
int readfd, writefd;
|
||||||
|
|
||||||
if (data->res != SPA_RESULT_OK)
|
if (data->res != SPA_RESULT_OK)
|
||||||
goto denied;
|
goto denied;
|
||||||
|
|
@ -215,16 +215,17 @@ async_create_client_node_complete (PinosAccessData *data)
|
||||||
if (node == NULL)
|
if (node == NULL)
|
||||||
goto no_mem;
|
goto no_mem;
|
||||||
|
|
||||||
if ((res = pinos_client_node_get_data_socket (node, &data_fd)) < 0) {
|
if ((res = pinos_client_node_get_fds (node, &readfd, &writefd)) < 0) {
|
||||||
pinos_core_notify_error (client->core_resource,
|
pinos_core_notify_error (client->core_resource,
|
||||||
resource->id,
|
resource->id,
|
||||||
SPA_RESULT_ERROR,
|
SPA_RESULT_ERROR,
|
||||||
"can't get data fd");
|
"can't get data fds");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pinos_client_node_notify_done (node->resource,
|
pinos_client_node_notify_done (node->resource,
|
||||||
data_fd);
|
readfd,
|
||||||
|
writefd);
|
||||||
goto done;
|
goto done;
|
||||||
|
|
||||||
no_mem:
|
no_mem:
|
||||||
|
|
|
||||||
|
|
@ -530,7 +530,8 @@ client_marshal_info (void *object,
|
||||||
|
|
||||||
static void
|
static void
|
||||||
client_node_marshal_done (void *object,
|
client_node_marshal_done (void *object,
|
||||||
int datafd)
|
int readfd,
|
||||||
|
int writefd)
|
||||||
{
|
{
|
||||||
PinosResource *resource = object;
|
PinosResource *resource = object;
|
||||||
PinosConnection *connection = resource->client->protocol_private;
|
PinosConnection *connection = resource->client->protocol_private;
|
||||||
|
|
@ -540,7 +541,8 @@ client_node_marshal_done (void *object,
|
||||||
core_update_map (resource->client);
|
core_update_map (resource->client);
|
||||||
|
|
||||||
spa_pod_builder_struct (&b.b, &f,
|
spa_pod_builder_struct (&b.b, &f,
|
||||||
SPA_POD_TYPE_INT, pinos_connection_add_fd (connection, datafd));
|
SPA_POD_TYPE_INT, pinos_connection_add_fd (connection, readfd),
|
||||||
|
SPA_POD_TYPE_INT, pinos_connection_add_fd (connection, writefd));
|
||||||
|
|
||||||
pinos_connection_end_write (connection, resource->id, PINOS_CLIENT_NODE_EVENT_DONE, b.b.offset);
|
pinos_connection_end_write (connection, resource->id, PINOS_CLIENT_NODE_EVENT_DONE, b.b.offset);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -282,7 +282,7 @@ make_nodes (AppData *data)
|
||||||
spa_pod_builder_init (&b, buffer, sizeof (buffer));
|
spa_pod_builder_init (&b, buffer, sizeof (buffer));
|
||||||
spa_pod_builder_props (&b, &f[0], data->type.props,
|
spa_pod_builder_props (&b, &f[0], data->type.props,
|
||||||
SPA_POD_PROP (&f[1], data->type.props_device, 0, SPA_POD_TYPE_STRING, 1, "hw:1"),
|
SPA_POD_PROP (&f[1], data->type.props_device, 0, SPA_POD_TYPE_STRING, 1, "hw:1"),
|
||||||
SPA_POD_PROP (&f[1], data->type.props_min_latency, 0, SPA_POD_TYPE_INT, 1, 128));
|
SPA_POD_PROP (&f[1], data->type.props_min_latency, 0, SPA_POD_TYPE_INT, 1, 64));
|
||||||
props = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaProps);
|
props = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaProps);
|
||||||
|
|
||||||
if ((res = spa_node_set_props (data->sink, props)) < 0)
|
if ((res = spa_node_set_props (data->sink, props)) < 0)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue