Move transport to client-node extension

This commit is contained in:
Wim Taymans 2017-08-07 10:25:02 +02:00
parent 0602d76b9e
commit 97de0de0b7
13 changed files with 370 additions and 378 deletions

View file

@ -38,6 +38,116 @@ struct pw_client_node_proxy;
#define PW_VERSION_CLIENT_NODE 0 #define PW_VERSION_CLIENT_NODE 0
/** Shared structure between client and server \memberof pw_client_node */
struct pw_client_node_area {
uint32_t max_input_ports; /**< max input ports of the node */
uint32_t n_input_ports; /**< number of input ports of the node */
uint32_t max_output_ports; /**< max output ports of the node */
uint32_t n_output_ports; /**< number of output ports of the node */
};
/** \class pw_client_node_transport
*
* \brief Transport object
*
* The transport object contains shared data and ringbuffers to exchange
* events and data between the server and the client in a low-latency and
* lockfree way.
*/
struct pw_client_node_transport {
struct pw_client_node_area *area; /**< the transport area */
struct spa_port_io *inputs; /**< array of input port io */
struct spa_port_io *outputs; /**< array of output port io */
void *input_data; /**< input memory for ringbuffer */
struct spa_ringbuffer *input_buffer; /**< ringbuffer for input memory */
void *output_data; /**< output memory for ringbuffer */
struct spa_ringbuffer *output_buffer; /**< ringbuffer for output memory */
/** Add an event to the transport
* \param trans the transport to send the event on
* \param event the event to add
* \return 0 on success, < 0 on error
*
* Write \a event to the shared ringbuffer.
*/
int (*add_event) (struct pw_client_node_transport *trans, struct spa_event *event);
/** Get next event from a transport
* \param trans the transport to get the event of
* \param[out] event the event to read
* \return 0 on success, < 0 on error, SPA_RESULT_ENUM_END when no more events
* are available.
*
* Get the skeleton next event from \a trans into \a event. This function will
* only read the head and object body of the event.
*
* After the complete size of the event has been calculated, you should call
* \ref parse_event() to read the complete event contents.
*/
int (*next_event) (struct pw_client_node_transport *trans, struct spa_event *event);
/** Parse the complete event on transport
* \param trans the transport to read from
* \param[out] event memory that can hold the complete event
* \return 0 on success, < 0 on error
*
* Use this function after \ref next_event().
*
*/
int (*parse_event) (struct pw_client_node_transport *trans, void *event);
};
#define pw_client_node_transport_add_event(t,e) ((t)->add_event((t), (e)))
#define pw_client_node_transport_next_event(t,e) ((t)->next_event((t), (e)))
#define pw_client_node_transport_parse_event(t,e) ((t)->parse_event((t), (e)))
#define PW_TYPE_EVENT__ClientNode SPA_TYPE_EVENT_BASE "ClientNode"
#define PW_TYPE_EVENT_CLIENT_NODE_BASE PW_TYPE_EVENT__ClientNode ":"
#define PW_TYPE_EVENT_CLIENT_NODE__HaveOutput PW_TYPE_EVENT_CLIENT_NODE_BASE "HaveOutput"
#define PW_TYPE_EVENT_CLIENT_NODE__NeedInput PW_TYPE_EVENT_CLIENT_NODE_BASE "NeedInput"
#define PW_TYPE_EVENT_CLIENT_NODE__ReuseBuffer PW_TYPE_EVENT_CLIENT_NODE_BASE "ReuseBuffer"
#define PW_TYPE_EVENT_CLIENT_NODE__ProcessInput PW_TYPE_EVENT_CLIENT_NODE_BASE "ProcessInput"
#define PW_TYPE_EVENT_CLIENT_NODE__ProcessOutput PW_TYPE_EVENT_CLIENT_NODE_BASE "ProcessOutput"
struct pw_type_event_client_node {
uint32_t HaveOutput;
uint32_t NeedInput;
uint32_t ReuseBuffer;
uint32_t ProcessInput;
uint32_t ProcessOutput;
};
static inline void
pw_type_event_client_node_map(struct spa_type_map *map, struct pw_type_event_client_node *type)
{
if (type->HaveOutput == 0) {
type->HaveOutput = spa_type_map_get_id(map, PW_TYPE_EVENT_CLIENT_NODE__HaveOutput);
type->NeedInput = spa_type_map_get_id(map, PW_TYPE_EVENT_CLIENT_NODE__NeedInput);
type->ReuseBuffer = spa_type_map_get_id(map, PW_TYPE_EVENT_CLIENT_NODE__ReuseBuffer);
type->ProcessInput = spa_type_map_get_id(map, PW_TYPE_EVENT_CLIENT_NODE__ProcessInput);
type->ProcessOutput = spa_type_map_get_id(map, PW_TYPE_EVENT_CLIENT_NODE__ProcessOutput);
}
}
struct pw_event_client_node_reuse_buffer_body {
struct spa_pod_object_body body;
struct spa_pod_int port_id;
struct spa_pod_int buffer_id;
};
struct pw_event_client_node_reuse_buffer {
struct spa_pod pod;
struct pw_event_client_node_reuse_buffer_body body;
};
#define PW_EVENT_CLIENT_NODE_REUSE_BUFFER_INIT(type,port_id,buffer_id) \
SPA_EVENT_INIT_COMPLEX(struct pw_event_client_node_reuse_buffer, \
sizeof(struct pw_event_client_node_reuse_buffer_body), type, \
SPA_POD_INT_INIT(port_id), \
SPA_POD_INT_INIT(buffer_id))
/** information about a buffer */ /** information about a buffer */
struct pw_client_node_buffer { struct pw_client_node_buffer {
uint32_t mem_id; /**< the memory id for the metadata */ uint32_t mem_id; /**< the memory id for the metadata */
@ -200,17 +310,13 @@ struct pw_client_node_proxy_events {
* \param node_id the node id created for this client node * \param node_id the node id created for this client node
* \param readfd fd for signal data can be read * \param readfd fd for signal data can be read
* \param writefd fd for signal data can be written * \param writefd fd for signal data can be written
* \param memfd the memory fd of the area * \param transport the shared transport area
* \param offset the offset to map
* \param size the size to map
*/ */
void (*transport) (void *object, void (*transport) (void *object,
uint32_t node_id, uint32_t node_id,
int readfd, int readfd,
int writefd, int writefd,
int memfd, struct pw_client_node_transport *transport);
uint32_t offset,
uint32_t size);
/** /**
* Notify of a property change * Notify of a property change
* *

View file

@ -36,6 +36,7 @@ pipewire_module_mixer = shared_library('pipewire-module-mixer',
pipewire_module_client_node = shared_library('pipewire-module-client-node', pipewire_module_client_node = shared_library('pipewire-module-client-node',
[ 'module-client-node.c', [ 'module-client-node.c',
'module-client-node/client-node.c', 'module-client-node/client-node.c',
'module-client-node/transport.c',
'module-client-node/protocol-native.c', 'module-client-node/protocol-native.c',
'module-protocol-native/connection.c', 'module-protocol-native/connection.c',
'spa/spa-node.c', ], 'spa/spa-node.c', ],

View file

@ -35,11 +35,11 @@
#include "pipewire/pipewire.h" #include "pipewire/pipewire.h"
#include "pipewire/private.h" #include "pipewire/private.h"
#include "pipewire/interfaces.h" #include "pipewire/interfaces.h"
#include "pipewire/transport.h"
#include "pipewire/core.h" #include "pipewire/core.h"
#include "modules/spa/spa-node.h" #include "modules/spa/spa-node.h"
#include "client-node.h" #include "client-node.h"
#include "transport.h"
/** \cond */ /** \cond */
@ -93,6 +93,8 @@ struct proxy {
struct spa_log *log; struct spa_log *log;
struct spa_loop *data_loop; struct spa_loop *data_loop;
struct pw_type_event_client_node type_event_client_node;
const struct spa_node_callbacks *callbacks; const struct spa_node_callbacks *callbacks;
void *callbacks_data; void *callbacks_data;
@ -120,7 +122,7 @@ struct impl {
struct proxy proxy; struct proxy proxy;
struct pw_transport *transport; struct pw_client_node_transport *transport;
struct pw_listener node_listener; struct pw_listener node_listener;
struct pw_listener resource_listener; struct pw_listener resource_listener;
@ -719,9 +721,9 @@ spa_proxy_node_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32
spa_log_trace(this->log, "reuse buffer %d", buffer_id); spa_log_trace(this->log, "reuse buffer %d", buffer_id);
{ {
struct pw_event_transport_reuse_buffer rb = PW_EVENT_TRANSPORT_REUSE_BUFFER_INIT struct pw_event_client_node_reuse_buffer rb = PW_EVENT_CLIENT_NODE_REUSE_BUFFER_INIT
(impl->t->event_transport.ReuseBuffer, port_id, buffer_id); (this->type_event_client_node.ReuseBuffer, port_id, buffer_id);
pw_transport_add_event(impl->transport, (struct spa_event *) &rb); pw_client_node_transport_add_event(impl->transport, (struct spa_event *) &rb);
} }
return SPA_RESULT_OK; return SPA_RESULT_OK;
@ -766,8 +768,8 @@ static int spa_proxy_node_process_input(struct spa_node *node)
impl->transport->inputs[i] = *io; impl->transport->inputs[i] = *io;
io->status = SPA_RESULT_NEED_BUFFER; io->status = SPA_RESULT_NEED_BUFFER;
} }
pw_transport_add_event(impl->transport, pw_client_node_transport_add_event(impl->transport,
&SPA_EVENT_INIT(impl->t->event_transport.ProcessInput)); &SPA_EVENT_INIT(this->type_event_client_node.ProcessInput));
do_flush(this); do_flush(this);
if (this->callbacks->need_input) if (this->callbacks->need_input)
@ -800,8 +802,8 @@ static int spa_proxy_node_process_output(struct spa_node *node)
*io = tmp; *io = tmp;
pw_log_trace("%d %d %d", io->status, io->buffer_id, io->status); pw_log_trace("%d %d %d", io->status, io->buffer_id, io->status);
} }
pw_transport_add_event(impl->transport, pw_client_node_transport_add_event(impl->transport,
&SPA_EVENT_INIT(impl->t->event_transport.ProcessOutput)); &SPA_EVENT_INIT(this->type_event_client_node.ProcessOutput));
do_flush(this); do_flush(this);
return res; return res;
@ -812,7 +814,7 @@ static int handle_node_event(struct proxy *this, struct spa_event *event)
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, proxy); struct impl *impl = SPA_CONTAINER_OF(this, struct impl, proxy);
int i; int i;
if (SPA_EVENT_TYPE(event) == impl->t->event_transport.HaveOutput) { if (SPA_EVENT_TYPE(event) == this->type_event_client_node.HaveOutput) {
for (i = 0; i < MAX_OUTPUTS; i++) { for (i = 0; i < MAX_OUTPUTS; i++) {
struct spa_port_io *io = this->out_ports[i].io; struct spa_port_io *io = this->out_ports[i].io;
@ -823,11 +825,11 @@ static int handle_node_event(struct proxy *this, struct spa_event *event)
pw_log_trace("%d %d", io->status, io->buffer_id); pw_log_trace("%d %d", io->status, io->buffer_id);
} }
this->callbacks->have_output(this->callbacks_data); this->callbacks->have_output(this->callbacks_data);
} else if (SPA_EVENT_TYPE(event) == impl->t->event_transport.NeedInput) { } else if (SPA_EVENT_TYPE(event) == this->type_event_client_node.NeedInput) {
this->callbacks->need_input(this->callbacks_data); this->callbacks->need_input(this->callbacks_data);
} else if (SPA_EVENT_TYPE(event) == impl->t->event_transport.ReuseBuffer) { } else if (SPA_EVENT_TYPE(event) == this->type_event_client_node.ReuseBuffer) {
struct pw_event_transport_reuse_buffer *p = struct pw_event_client_node_reuse_buffer *p =
(struct pw_event_transport_reuse_buffer *) event; (struct pw_event_client_node_reuse_buffer *) event;
this->callbacks->reuse_buffer(this->callbacks_data, p->body.port_id.value, this->callbacks->reuse_buffer(this->callbacks_data, p->body.port_id.value,
p->body.buffer_id.value); p->body.buffer_id.value);
} }
@ -935,9 +937,10 @@ static void proxy_on_data_fd_events(struct spa_source *source)
spa_log_warn(this->log, "proxy %p: error reading event: %s", spa_log_warn(this->log, "proxy %p: error reading event: %s",
this, strerror(errno)); this, strerror(errno));
while (pw_transport_next_event(impl->transport, &event) == SPA_RESULT_OK) { while (pw_client_node_transport_next_event(impl->transport, &event) == SPA_RESULT_OK) {
struct spa_event *ev = alloca(SPA_POD_SIZE(&event)); struct spa_event *ev = alloca(SPA_POD_SIZE(&event));
pw_transport_parse_event(impl->transport, ev); pw_client_node_transport_parse_event(impl->transport, ev);
pw_pod_remap(&ev->pod, &this->resource->client->types);;
handle_node_event(this, ev); handle_node_event(this, ev);
} }
} }
@ -982,13 +985,22 @@ proxy_init(struct proxy *this,
this->log = support[i].data; this->log = support[i].data;
else if (strcmp(support[i].type, SPA_TYPE_LOOP__DataLoop) == 0) else if (strcmp(support[i].type, SPA_TYPE_LOOP__DataLoop) == 0)
this->data_loop = support[i].data; this->data_loop = support[i].data;
else if (strcmp(support[i].type, SPA_TYPE__TypeMap) == 0)
this->map = support[i].data;
} }
if (this->data_loop == NULL) { if (this->data_loop == NULL) {
spa_log_error(this->log, "a data-loop is needed"); spa_log_error(this->log, "a data-loop is needed");
return SPA_RESULT_ERROR;
}
if (this->map == NULL) {
spa_log_error(this->log, "a type map is needed");
return SPA_RESULT_ERROR;
} }
this->node = proxy_node; this->node = proxy_node;
pw_type_event_client_node_map(this->map, &this->type_event_client_node);
this->data_source.func = proxy_on_data_fd_events; this->data_source.func = proxy_on_data_fd_events;
this->data_source.data = this; this->data_source.data = this;
this->data_source.fd = -1; this->data_source.fd = -1;
@ -1035,22 +1047,20 @@ static void node_initialized(void *data)
struct impl *impl = data; struct impl *impl = data;
struct pw_client_node *this = &impl->this; struct pw_client_node *this = &impl->this;
struct pw_node *node = this->node; struct pw_node *node = this->node;
struct pw_transport_info info;
int readfd, writefd; int readfd, writefd;
const struct pw_node_info *i = pw_node_get_info(node); const struct pw_node_info *i = pw_node_get_info(node);
if (this->resource == NULL) if (this->resource == NULL)
return; return;
impl->transport = pw_transport_new(i->max_input_ports, i->max_output_ports, 0); impl->transport = pw_client_node_transport_new(i->max_input_ports, i->max_output_ports);
impl->transport->area->n_input_ports = i->n_input_ports; impl->transport->area->n_input_ports = i->n_input_ports;
impl->transport->area->n_output_ports = i->n_output_ports; impl->transport->area->n_output_ports = i->n_output_ports;
client_node_get_fds(this, &readfd, &writefd); client_node_get_fds(this, &readfd, &writefd);
pw_transport_get_info(impl->transport, &info);
pw_client_node_resource_transport(this->resource, pw_global_get_id(pw_node_get_global(node)), pw_client_node_resource_transport(this->resource, pw_global_get_id(pw_node_get_global(node)),
readfd, writefd, info.memfd, info.offset, info.size); readfd, writefd, impl->transport);
} }
static int proxy_clear(struct proxy *this) static int proxy_clear(struct proxy *this)
@ -1093,7 +1103,7 @@ static void node_free(void *data)
proxy_clear(&impl->proxy); proxy_clear(&impl->proxy);
if (impl->transport) if (impl->transport)
pw_transport_destroy(impl->transport); pw_client_node_transport_destroy(impl->transport);
pw_listener_remove(&impl->node_listener); pw_listener_remove(&impl->node_listener);

View file

@ -30,6 +30,8 @@
#include "extensions/protocol-native.h" #include "extensions/protocol-native.h"
#include "extensions/client-node.h" #include "extensions/client-node.h"
#include "transport.h"
static void static void
client_node_marshal_done(void *object, int seq, int res) client_node_marshal_done(void *object, int seq, int res)
{ {
@ -371,8 +373,10 @@ static bool client_node_demarshal_transport(void *object, void *data, size_t siz
{ {
struct pw_proxy *proxy = object; struct pw_proxy *proxy = object;
struct spa_pod_iter it; struct spa_pod_iter it;
uint32_t node_id, ridx, widx, memfd_idx, offset, sz; uint32_t node_id, ridx, widx, memfd_idx;
int readfd, writefd, memfd; int readfd, writefd;
struct pw_client_node_transport_info info;
struct pw_client_node_transport *transport;
if (!spa_pod_iter_struct(&it, data, size) || if (!spa_pod_iter_struct(&it, data, size) ||
!spa_pod_iter_get(&it, !spa_pod_iter_get(&it,
@ -380,19 +384,21 @@ static bool client_node_demarshal_transport(void *object, void *data, size_t siz
SPA_POD_TYPE_INT, &ridx, SPA_POD_TYPE_INT, &ridx,
SPA_POD_TYPE_INT, &widx, SPA_POD_TYPE_INT, &widx,
SPA_POD_TYPE_INT, &memfd_idx, SPA_POD_TYPE_INT, &memfd_idx,
SPA_POD_TYPE_INT, &offset, SPA_POD_TYPE_INT, &info.offset,
SPA_POD_TYPE_INT, &sz, 0)) SPA_POD_TYPE_INT, &info.size, 0))
return false; return false;
readfd = pw_protocol_native_get_proxy_fd(proxy, ridx); readfd = pw_protocol_native_get_proxy_fd(proxy, ridx);
writefd = pw_protocol_native_get_proxy_fd(proxy, widx); writefd = pw_protocol_native_get_proxy_fd(proxy, widx);
memfd = pw_protocol_native_get_proxy_fd(proxy, memfd_idx); info.memfd = pw_protocol_native_get_proxy_fd(proxy, memfd_idx);
if (readfd == -1 || writefd == -1 || memfd_idx == -1)
if (readfd == -1 || writefd == -1 || info.memfd == -1)
return false; return false;
transport = pw_client_node_transport_new_from_info(&info);
pw_proxy_notify(proxy, struct pw_client_node_proxy_events, transport, node_id, pw_proxy_notify(proxy, struct pw_client_node_proxy_events, transport, node_id,
readfd, writefd, readfd, writefd, transport);
memfd, offset, sz);
return true; return true;
} }
@ -616,11 +622,14 @@ client_node_marshal_port_command(void *object,
} }
static void client_node_marshal_transport(void *object, uint32_t node_id, int readfd, int writefd, static void client_node_marshal_transport(void *object, uint32_t node_id, int readfd, int writefd,
int memfd, uint32_t offset, uint32_t size) struct pw_client_node_transport *transport)
{ {
struct pw_resource *resource = object; struct pw_resource *resource = object;
struct spa_pod_builder *b; struct spa_pod_builder *b;
struct spa_pod_frame f; struct spa_pod_frame f;
struct pw_client_node_transport_info info;
pw_client_node_transport_get_info(transport, &info);
b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_TRANSPORT); b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_TRANSPORT);
@ -628,8 +637,9 @@ static void client_node_marshal_transport(void *object, uint32_t node_id, int re
SPA_POD_TYPE_INT, node_id, SPA_POD_TYPE_INT, node_id,
SPA_POD_TYPE_INT, pw_protocol_native_add_resource_fd(resource, readfd), SPA_POD_TYPE_INT, pw_protocol_native_add_resource_fd(resource, readfd),
SPA_POD_TYPE_INT, pw_protocol_native_add_resource_fd(resource, writefd), SPA_POD_TYPE_INT, pw_protocol_native_add_resource_fd(resource, writefd),
SPA_POD_TYPE_INT, pw_protocol_native_add_resource_fd(resource, memfd), SPA_POD_TYPE_INT, pw_protocol_native_add_resource_fd(resource, info.memfd),
SPA_POD_TYPE_INT, offset, SPA_POD_TYPE_INT, size); SPA_POD_TYPE_INT, info.offset,
SPA_POD_TYPE_INT, info.size);
pw_protocol_native_end_resource(resource, b); pw_protocol_native_end_resource(resource, b);
} }

View file

@ -22,7 +22,9 @@
#include <sys/mman.h> #include <sys/mman.h>
#include <pipewire/log.h> #include <pipewire/log.h>
#include <pipewire/transport.h> #include <extensions/client-node.h>
#include "transport.h"
/** \cond */ /** \cond */
@ -30,7 +32,7 @@
#define OUTPUT_BUFFER_SIZE (1<<12) #define OUTPUT_BUFFER_SIZE (1<<12)
struct transport { struct transport {
struct pw_transport trans; struct pw_client_node_transport trans;
struct pw_memblock mem; struct pw_memblock mem;
size_t offset; size_t offset;
@ -40,10 +42,10 @@ struct transport {
}; };
/** \endcond */ /** \endcond */
static size_t transport_area_get_size(struct pw_transport_area *area) static size_t area_get_size(struct pw_client_node_area *area)
{ {
size_t size; size_t size;
size = sizeof(struct pw_transport_area); size = sizeof(struct pw_client_node_area);
size += area->max_input_ports * sizeof(struct spa_port_io); size += area->max_input_ports * sizeof(struct spa_port_io);
size += area->max_output_ports * sizeof(struct spa_port_io); size += area->max_output_ports * sizeof(struct spa_port_io);
size += sizeof(struct spa_ringbuffer); size += sizeof(struct spa_ringbuffer);
@ -53,12 +55,12 @@ static size_t transport_area_get_size(struct pw_transport_area *area)
return size; return size;
} }
static void transport_setup_area(void *p, struct pw_transport *trans) static void transport_setup_area(void *p, struct pw_client_node_transport *trans)
{ {
struct pw_transport_area *a; struct pw_client_node_area *a;
trans->area = a = p; trans->area = a = p;
p = SPA_MEMBER(p, sizeof(struct pw_transport_area), struct spa_port_io); p = SPA_MEMBER(p, sizeof(struct pw_client_node_area), struct spa_port_io);
trans->inputs = p; trans->inputs = p;
p = SPA_MEMBER(p, a->max_input_ports * sizeof(struct spa_port_io), void); p = SPA_MEMBER(p, a->max_input_ports * sizeof(struct spa_port_io), void);
@ -79,10 +81,10 @@ static void transport_setup_area(void *p, struct pw_transport *trans)
p = SPA_MEMBER(p, OUTPUT_BUFFER_SIZE, void); p = SPA_MEMBER(p, OUTPUT_BUFFER_SIZE, void);
} }
static void transport_reset_area(struct pw_transport *trans) static void transport_reset_area(struct pw_client_node_transport *trans)
{ {
int i; int i;
struct pw_transport_area *a = trans->area; struct pw_client_node_area *a = trans->area;
for (i = 0; i < a->max_input_ports; i++) { for (i = 0; i < a->max_input_ports; i++) {
trans->inputs[i].status = SPA_RESULT_OK; trans->inputs[i].status = SPA_RESULT_OK;
@ -96,59 +98,122 @@ static void transport_reset_area(struct pw_transport *trans)
spa_ringbuffer_init(trans->output_buffer, OUTPUT_BUFFER_SIZE); spa_ringbuffer_init(trans->output_buffer, OUTPUT_BUFFER_SIZE);
} }
static int add_event(struct pw_client_node_transport *trans, struct spa_event *event)
{
struct transport *impl = (struct transport *) trans;
int32_t filled, avail;
uint32_t size, index;
if (impl == NULL || event == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
filled = spa_ringbuffer_get_write_index(trans->output_buffer, &index);
avail = trans->output_buffer->size - filled;
size = SPA_POD_SIZE(event);
if (avail < size)
return SPA_RESULT_ERROR;
spa_ringbuffer_write_data(trans->output_buffer,
trans->output_data,
index & trans->output_buffer->mask, event, size);
spa_ringbuffer_write_update(trans->output_buffer, index + size);
return SPA_RESULT_OK;
}
static int next_event(struct pw_client_node_transport *trans, struct spa_event *event)
{
struct transport *impl = (struct transport *) trans;
int32_t avail;
if (impl == NULL || event == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
avail = spa_ringbuffer_get_read_index(trans->input_buffer, &impl->current_index);
if (avail < sizeof(struct spa_event))
return SPA_RESULT_ENUM_END;
spa_ringbuffer_read_data(trans->input_buffer,
trans->input_data,
impl->current_index & trans->input_buffer->mask,
&impl->current, sizeof(struct spa_event));
*event = impl->current;
return SPA_RESULT_OK;
}
static int parse_event(struct pw_client_node_transport *trans, void *event)
{
struct transport *impl = (struct transport *) trans;
uint32_t size;
if (impl == NULL || event == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
size = SPA_POD_SIZE(&impl->current);
spa_ringbuffer_read_data(trans->input_buffer,
trans->input_data,
impl->current_index & trans->input_buffer->mask, event, size);
spa_ringbuffer_read_update(trans->input_buffer, impl->current_index + size);
return SPA_RESULT_OK;
}
/** Create a new transport /** Create a new transport
* \param max_input_ports maximum number of input_ports * \param max_input_ports maximum number of input_ports
* \param max_output_ports maximum number of output_ports * \param max_output_ports maximum number of output_ports
* \return a newly allocated \ref pw_transport * \return a newly allocated \ref pw_client_node_transport
* \memberof pw_transport * \memberof pw_client_node_transport
*/ */
struct pw_transport *pw_transport_new(uint32_t max_input_ports, uint32_t max_output_ports, size_t user_data_size) struct pw_client_node_transport *
pw_client_node_transport_new(uint32_t max_input_ports, uint32_t max_output_ports)
{ {
struct transport *impl; struct transport *impl;
struct pw_transport *trans; struct pw_client_node_transport *trans;
struct pw_transport_area area; struct pw_client_node_area area;
area.max_input_ports = max_input_ports; area.max_input_ports = max_input_ports;
area.n_input_ports = 0; area.n_input_ports = 0;
area.max_output_ports = max_output_ports; area.max_output_ports = max_output_ports;
area.n_output_ports = 0; area.n_output_ports = 0;
impl = calloc(1, sizeof(struct transport) + user_data_size); impl = calloc(1, sizeof(struct transport));
if (impl == NULL) if (impl == NULL)
return NULL; return NULL;
trans = &impl->trans; trans = &impl->trans;
impl->offset = 0; impl->offset = 0;
if(user_data_size > 0)
trans->user_data = SPA_MEMBER(impl, sizeof(struct transport), void);
pw_memblock_alloc(PW_MEMBLOCK_FLAG_WITH_FD | pw_memblock_alloc(PW_MEMBLOCK_FLAG_WITH_FD |
PW_MEMBLOCK_FLAG_MAP_READWRITE | PW_MEMBLOCK_FLAG_MAP_READWRITE |
PW_MEMBLOCK_FLAG_SEAL, transport_area_get_size(&area), &impl->mem); PW_MEMBLOCK_FLAG_SEAL, area_get_size(&area), &impl->mem);
memcpy(impl->mem.ptr, &area, sizeof(struct pw_transport_area)); memcpy(impl->mem.ptr, &area, sizeof(struct pw_client_node_area));
transport_setup_area(impl->mem.ptr, trans); transport_setup_area(impl->mem.ptr, trans);
transport_reset_area(trans); transport_reset_area(trans);
trans->add_event = add_event;
trans->next_event = next_event;
trans->parse_event = parse_event;
return trans; return trans;
} }
struct pw_transport *pw_transport_new_from_info(struct pw_transport_info *info, size_t user_data_size) struct pw_client_node_transport *
pw_client_node_transport_new_from_info(struct pw_client_node_transport_info *info)
{ {
struct transport *impl; struct transport *impl;
struct pw_transport *trans; struct pw_client_node_transport *trans;
void *tmp; void *tmp;
impl = calloc(1, sizeof(struct transport) + user_data_size); impl = calloc(1, sizeof(struct transport));
if (impl == NULL) if (impl == NULL)
return NULL; return NULL;
trans = &impl->trans; trans = &impl->trans;
if(user_data_size > 0)
trans->user_data = SPA_MEMBER(impl, sizeof(struct transport), void);
impl->mem.flags = PW_MEMBLOCK_FLAG_MAP_READWRITE | PW_MEMBLOCK_FLAG_WITH_FD; impl->mem.flags = PW_MEMBLOCK_FLAG_MAP_READWRITE | PW_MEMBLOCK_FLAG_WITH_FD;
impl->mem.fd = info->memfd; impl->mem.fd = info->memfd;
impl->mem.offset = info->offset; impl->mem.offset = info->offset;
@ -171,6 +236,10 @@ struct pw_transport *pw_transport_new_from_info(struct pw_transport_info *info,
trans->output_data = trans->input_data; trans->output_data = trans->input_data;
trans->input_data = tmp; trans->input_data = tmp;
trans->add_event = add_event;
trans->next_event = next_event;
trans->parse_event = parse_event;
return trans; return trans;
mmap_failed: mmap_failed:
@ -180,9 +249,9 @@ struct pw_transport *pw_transport_new_from_info(struct pw_transport_info *info,
/** Destroy a transport /** Destroy a transport
* \param trans a transport to destroy * \param trans a transport to destroy
* \memberof pw_transport * \memberof pw_client_node_transport
*/ */
void pw_transport_destroy(struct pw_transport *trans) void pw_client_node_transport_destroy(struct pw_client_node_transport *trans)
{ {
struct transport *impl = (struct transport *) trans; struct transport *impl = (struct transport *) trans;
@ -200,9 +269,10 @@ void pw_transport_destroy(struct pw_transport *trans)
* Fill \a info with the transport info of \a trans. This information can be * Fill \a info with the transport info of \a trans. This information can be
* passed to the client to set up the shared transport. * passed to the client to set up the shared transport.
* *
* \memberof pw_transport * \memberof pw_client_node_transport
*/ */
int pw_transport_get_info(struct pw_transport *trans, struct pw_transport_info *info) int pw_client_node_transport_get_info(struct pw_client_node_transport *trans,
struct pw_client_node_transport_info *info)
{ {
struct transport *impl = (struct transport *) trans; struct transport *impl = (struct transport *) trans;
@ -212,99 +282,3 @@ int pw_transport_get_info(struct pw_transport *trans, struct pw_transport_info *
return SPA_RESULT_OK; return SPA_RESULT_OK;
} }
/** Add an event to the transport
* \param trans the transport to send the event on
* \param event the event to add
* \return 0 on success, < 0 on error
*
* Write \a event to the shared ringbuffer and signal the other side that
* new data can be read.
*
* \memberof pw_transport
*/
int pw_transport_add_event(struct pw_transport *trans, struct spa_event *event)
{
struct transport *impl = (struct transport *) trans;
int32_t filled, avail;
uint32_t size, index;
if (impl == NULL || event == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
filled = spa_ringbuffer_get_write_index(trans->output_buffer, &index);
avail = trans->output_buffer->size - filled;
size = SPA_POD_SIZE(event);
if (avail < size)
return SPA_RESULT_ERROR;
spa_ringbuffer_write_data(trans->output_buffer,
trans->output_data,
index & trans->output_buffer->mask, event, size);
spa_ringbuffer_write_update(trans->output_buffer, index + size);
return SPA_RESULT_OK;
}
/** Get next event from a transport
* \param trans the transport to get the event of
* \param[out] event the event to read
* \return 0 on success, < 0 on error, SPA_RESULT_ENUM_END when no more events
* are available.
*
* Get the skeleton next event from \a trans into \a event. This function will
* only read the head and object body of the event.
*
* After the complete size of the event has been calculated, you should call
* \ref pw_transport_parse_event() to read the complete event contents.
*
* \memberof pw_transport
*/
int pw_transport_next_event(struct pw_transport *trans, struct spa_event *event)
{
struct transport *impl = (struct transport *) trans;
int32_t avail;
if (impl == NULL || event == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
avail = spa_ringbuffer_get_read_index(trans->input_buffer, &impl->current_index);
if (avail < sizeof(struct spa_event))
return SPA_RESULT_ENUM_END;
spa_ringbuffer_read_data(trans->input_buffer,
trans->input_data,
impl->current_index & trans->input_buffer->mask,
&impl->current, sizeof(struct spa_event));
*event = impl->current;
return SPA_RESULT_OK;
}
/** Parse the complete event on transport
* \param trans the transport to read from
* \param[out] event memory that can hold the complete event
* \return 0 on success, < 0 on error
*
* Use this function after \ref pw_transport_next_event().
*
* \memberof pw_transport
*/
int pw_transport_parse_event(struct pw_transport *trans, void *event)
{
struct transport *impl = (struct transport *) trans;
uint32_t size;
if (impl == NULL || event == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
size = SPA_POD_SIZE(&impl->current);
spa_ringbuffer_read_data(trans->input_buffer,
trans->input_data,
impl->current_index & trans->input_buffer->mask, event, size);
spa_ringbuffer_read_update(trans->input_buffer, impl->current_index + size);
return SPA_RESULT_OK;
}

View file

@ -0,0 +1,58 @@
/* PipeWire
* Copyright (C) 2016 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __PIPEWIRE_CLIENT_NODE_TRANSPORT_H__
#define __PIPEWIRE_CLIENT_NODE_TRANSPORT_H__
#ifdef __cplusplus
extern "C" {
#endif
#include <string.h>
#include <spa/defs.h>
#include <spa/node.h>
#include <pipewire/mem.h>
/** information about the transport region \memberof pw_client_node */
struct pw_client_node_transport_info {
int memfd; /**< the memfd of the transport area */
uint32_t offset; /**< offset to map \a memfd at */
uint32_t size; /**< size of memfd mapping */
};
struct pw_client_node_transport *
pw_client_node_transport_new(uint32_t max_input_ports, uint32_t max_output_ports);
struct pw_client_node_transport *
pw_client_node_transport_new_from_info(struct pw_client_node_transport_info *info);
void
pw_client_node_transport_destroy(struct pw_client_node_transport *trans);
int
pw_client_node_transport_get_info(struct pw_client_node_transport *trans,
struct pw_client_node_transport_info *info);
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif /* __PIPEWIRE_CLIENT_NODE_TRANSPORT_H__ */

View file

@ -26,7 +26,6 @@ pipewire_headers = [
'rtkit.h', 'rtkit.h',
'stream.h', 'stream.h',
'thread-loop.h', 'thread-loop.h',
'transport.h',
'type.h', 'type.h',
'utils.h', 'utils.h',
'work-queue.h', 'work-queue.h',
@ -56,7 +55,6 @@ pipewire_sources = [
'stream.c', 'stream.c',
'rtkit.c', 'rtkit.c',
'thread-loop.c', 'thread-loop.c',
'transport.c',
'type.c', 'type.c',
'utils.c', 'utils.c',
'work-queue.c', 'work-queue.c',

View file

@ -46,7 +46,6 @@ struct pw_node;
#include <pipewire/mem.h> #include <pipewire/mem.h>
#include <pipewire/introspect.h> #include <pipewire/introspect.h>
#include <pipewire/transport.h>
#include <pipewire/core.h> #include <pipewire/core.h>
#include <pipewire/port.h> #include <pipewire/port.h>

View file

@ -42,6 +42,7 @@
struct remote { struct remote {
struct pw_remote this; struct pw_remote this;
uint32_t type_client_node; uint32_t type_client_node;
struct pw_type_event_client_node type_event_client_node;
struct pw_listener core_listener; struct pw_listener core_listener;
}; };
@ -70,7 +71,9 @@ struct node_data {
int rtreadfd; int rtreadfd;
int rtwritefd; int rtwritefd;
struct spa_source *rtsocket_source; struct spa_source *rtsocket_source;
struct pw_transport *trans; struct pw_client_node_transport *trans;
struct spa_graph_port *in_ports;
struct spa_graph_port *out_ports;
struct pw_node *node; struct pw_node *node;
struct pw_listener node_listener; struct pw_listener node_listener;
@ -83,11 +86,6 @@ struct node_data {
bool in_order; bool in_order;
}; };
struct trans_data {
struct spa_graph_port *in_ports;
struct spa_graph_port *out_ports;
/* memory for ports follows */
};
/** \endcond */ /** \endcond */
@ -223,6 +221,7 @@ struct pw_remote *pw_remote_new(struct pw_core *core,
this->properties = properties; this->properties = properties;
impl->type_client_node = spa_type_map_get_id(core->type.map, PW_TYPE_INTERFACE__ClientNode); impl->type_client_node = spa_type_map_get_id(core->type.map, PW_TYPE_INTERFACE__ClientNode);
pw_type_event_client_node_map(core->type.map, &impl->type_event_client_node);
this->state = PW_REMOTE_STATE_UNCONNECTED; this->state = PW_REMOTE_STATE_UNCONNECTED;
pw_map_init(&this->objects, 64, 32); pw_map_init(&this->objects, 64, 32);
@ -409,9 +408,10 @@ static void handle_rtnode_event(struct pw_proxy *proxy, struct spa_event *event)
{ {
struct node_data *data = proxy->user_data; struct node_data *data = proxy->user_data;
struct pw_remote *remote = proxy->remote; struct pw_remote *remote = proxy->remote;
struct remote *this = SPA_CONTAINER_OF(remote, struct remote, this);
struct spa_graph_node *n = &data->node->rt.node; struct spa_graph_node *n = &data->node->rt.node;
if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.ProcessInput) { if (SPA_EVENT_TYPE(event) == this->type_event_client_node.ProcessInput) {
struct spa_list ready; struct spa_list ready;
struct spa_graph_port *port; struct spa_graph_port *port;
@ -422,10 +422,10 @@ static void handle_rtnode_event(struct pw_proxy *proxy, struct spa_event *event)
spa_graph_scheduler_chain(data->node->rt.sched, &ready); spa_graph_scheduler_chain(data->node->rt.sched, &ready);
} }
else if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.ProcessOutput) { else if (SPA_EVENT_TYPE(event) == this->type_event_client_node.ProcessOutput) {
n->callbacks->process_output(n->callbacks_data); n->callbacks->process_output(n->callbacks_data);
} }
else if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.ReuseBuffer) { else if (SPA_EVENT_TYPE(event) == this->type_event_client_node.ReuseBuffer) {
} }
else { else {
pw_log_warn("unexpected node event %d", SPA_EVENT_TYPE(event)); pw_log_warn("unexpected node event %d", SPA_EVENT_TYPE(event));
@ -451,45 +451,34 @@ on_rtsocket_condition(struct spa_loop_utils *utils,
read(data->rtreadfd, &cmd, 8); read(data->rtreadfd, &cmd, 8);
while (pw_transport_next_event(data->trans, &event) == SPA_RESULT_OK) { while (pw_client_node_transport_next_event(data->trans, &event) == SPA_RESULT_OK) {
struct spa_event *ev = alloca(SPA_POD_SIZE(&event)); struct spa_event *ev = alloca(SPA_POD_SIZE(&event));
pw_transport_parse_event(data->trans, ev); pw_client_node_transport_parse_event(data->trans, ev);
handle_rtnode_event(proxy, ev); handle_rtnode_event(proxy, ev);
} }
} }
} }
static void client_node_transport(void *object, uint32_t node_id, static void client_node_transport(void *object, uint32_t node_id,
int readfd, int writefd, int memfd, uint32_t offset, uint32_t size) int readfd, int writefd,
struct pw_client_node_transport *transport)
{ {
struct pw_proxy *proxy = object; struct pw_proxy *proxy = object;
struct node_data *data = proxy->user_data; struct node_data *data = proxy->user_data;
struct pw_transport_info info;
struct trans_data *t;
struct pw_port *port; struct pw_port *port;
int i; int i;
data->node_id = node_id; data->node_id = node_id;
data->trans = transport;
info.memfd = memfd;
if (info.memfd == -1)
return;
info.offset = offset;
info.size = size;
if (data->trans)
pw_transport_destroy(data->trans);
data->trans = pw_transport_new_from_info(&info, sizeof(struct trans_data));
t = data->trans->user_data;
pw_log_info("remote-node %p: create transport %p with fds %d %d for node %u", pw_log_info("remote-node %p: create transport %p with fds %d %d for node %u",
proxy, data->trans, readfd, writefd, node_id); proxy, data->trans, readfd, writefd, node_id);
t->in_ports = calloc(data->trans->area->max_input_ports, sizeof(struct spa_graph_port)); data->in_ports = calloc(data->trans->area->max_input_ports, sizeof(struct spa_graph_port));
t->out_ports = calloc(data->trans->area->max_output_ports, sizeof(struct spa_graph_port)); data->out_ports = calloc(data->trans->area->max_output_ports, sizeof(struct spa_graph_port));
for (i = 0; i < data->trans->area->max_input_ports; i++) { for (i = 0; i < data->trans->area->max_input_ports; i++) {
spa_graph_port_init(&t->in_ports[i], spa_graph_port_init(&data->in_ports[i],
SPA_DIRECTION_INPUT, SPA_DIRECTION_INPUT,
i, i,
0, 0,
@ -497,10 +486,10 @@ static void client_node_transport(void *object, uint32_t node_id,
pw_log_info("transport in %d %p", i, &data->trans->inputs[i]); pw_log_info("transport in %d %p", i, &data->trans->inputs[i]);
} }
spa_list_for_each(port, &data->node->input_ports, link) spa_list_for_each(port, &data->node->input_ports, link)
spa_graph_port_add(&port->rt.mix_node, &t->in_ports[port->port_id]); spa_graph_port_add(&port->rt.mix_node, &data->in_ports[port->port_id]);
for (i = 0; i < data->trans->area->max_output_ports; i++) { for (i = 0; i < data->trans->area->max_output_ports; i++) {
spa_graph_port_init(&t->out_ports[i], spa_graph_port_init(&data->out_ports[i],
SPA_DIRECTION_OUTPUT, SPA_DIRECTION_OUTPUT,
i, i,
0, 0,
@ -508,7 +497,7 @@ static void client_node_transport(void *object, uint32_t node_id,
pw_log_info("transport out %d %p", i, &data->trans->inputs[i]); pw_log_info("transport out %d %p", i, &data->trans->inputs[i]);
} }
spa_list_for_each(port, &data->node->output_ports, link) spa_list_for_each(port, &data->node->output_ports, link)
spa_graph_port_add(&port->rt.mix_node, &t->out_ports[port->port_id]); spa_graph_port_add(&port->rt.mix_node, &data->out_ports[port->port_id]);
data->rtreadfd = readfd; data->rtreadfd = readfd;
data->rtwritefd = writefd; data->rtwritefd = writefd;
@ -924,20 +913,22 @@ static const struct pw_client_node_proxy_events client_node_events = {
static void node_need_input(void *data) static void node_need_input(void *data)
{ {
struct node_data *d = data; struct node_data *d = data;
struct remote *this = SPA_CONTAINER_OF(d->remote, struct remote, this);
uint64_t cmd = 1; uint64_t cmd = 1;
pw_transport_add_event(d->trans, pw_client_node_transport_add_event(d->trans,
&SPA_EVENT_INIT(d->t->event_transport.NeedInput)); &SPA_EVENT_INIT(this->type_event_client_node.NeedInput));
write(d->rtwritefd, &cmd, 8); write(d->rtwritefd, &cmd, 8);
} }
static void node_have_output(void *data) static void node_have_output(void *data)
{ {
struct node_data *d = data; struct node_data *d = data;
struct remote *this = SPA_CONTAINER_OF(d->remote, struct remote, this);
uint64_t cmd = 1; uint64_t cmd = 1;
pw_transport_add_event(d->trans, pw_client_node_transport_add_event(d->trans,
&SPA_EVENT_INIT(d->t->event_transport.HaveOutput)); &SPA_EVENT_INIT(this->type_event_client_node.HaveOutput));
write(d->rtwritefd, &cmd, 8); write(d->rtwritefd, &cmd, 8);
} }

View file

@ -31,7 +31,6 @@
#include "pipewire/interfaces.h" #include "pipewire/interfaces.h"
#include "pipewire/array.h" #include "pipewire/array.h"
#include "pipewire/stream.h" #include "pipewire/stream.h"
#include "pipewire/transport.h"
#include "pipewire/utils.h" #include "pipewire/utils.h"
#include "pipewire/stream.h" #include "pipewire/stream.h"
#include "extensions/client-node.h" #include "extensions/client-node.h"
@ -64,6 +63,7 @@ struct stream {
struct pw_stream this; struct pw_stream this;
uint32_t type_client_node; uint32_t type_client_node;
struct pw_type_event_client_node type_event_client_node;
uint32_t n_possible_formats; uint32_t n_possible_formats;
struct spa_format **possible_formats; struct spa_format **possible_formats;
@ -88,7 +88,7 @@ struct stream {
struct pw_listener node_listener; struct pw_listener node_listener;
struct pw_listener proxy_listener; struct pw_listener proxy_listener;
struct pw_transport *trans; struct pw_client_node_transport *trans;
struct spa_source *timeout_source; struct spa_source *timeout_source;
@ -208,6 +208,7 @@ struct pw_stream *pw_stream_new(struct pw_remote *remote,
this->remote = remote; this->remote = remote;
this->name = strdup(name); this->name = strdup(name);
impl->type_client_node = spa_type_map_get_id(remote->core->type.map, PW_TYPE_INTERFACE__ClientNode); impl->type_client_node = spa_type_map_get_id(remote->core->type.map, PW_TYPE_INTERFACE__ClientNode);
pw_type_event_client_node_map(remote->core->type.map, &impl->type_event_client_node);
pw_listener_list_init(&this->listener_list); pw_listener_list_init(&this->listener_list);
@ -341,9 +342,6 @@ void pw_stream_destroy(struct pw_stream *stream)
if (stream->properties) if (stream->properties)
pw_properties_free(stream->properties); pw_properties_free(stream->properties);
if (impl->trans)
pw_transport_destroy(impl->trans);
if (stream->name) if (stream->name)
free(stream->name); free(stream->name);
@ -388,8 +386,8 @@ static inline void send_need_input(struct pw_stream *stream)
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
uint64_t cmd = 1; uint64_t cmd = 1;
pw_transport_add_event(impl->trans, pw_client_node_transport_add_event(impl->trans,
&SPA_EVENT_INIT(stream->remote->core->type.event_transport.NeedInput)); &SPA_EVENT_INIT(impl->type_event_client_node.NeedInput));
write(impl->rtwritefd, &cmd, 8); write(impl->rtwritefd, &cmd, 8);
#endif #endif
} }
@ -399,8 +397,8 @@ static inline void send_have_output(struct pw_stream *stream)
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
uint64_t cmd = 1; uint64_t cmd = 1;
pw_transport_add_event(impl->trans, pw_client_node_transport_add_event(impl->trans,
&SPA_EVENT_INIT(stream->remote->core->type.event_transport.HaveOutput)); &SPA_EVENT_INIT(impl->type_event_client_node.HaveOutput));
write(impl->rtwritefd, &cmd, 8); write(impl->rtwritefd, &cmd, 8);
} }
@ -487,9 +485,8 @@ static inline void reuse_buffer(struct pw_stream *stream, uint32_t id)
static void handle_rtnode_event(struct pw_stream *stream, struct spa_event *event) static void handle_rtnode_event(struct pw_stream *stream, struct spa_event *event)
{ {
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct pw_remote *remote = impl->this.remote;
if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.ProcessInput) { if (SPA_EVENT_TYPE(event) == impl->type_event_client_node.ProcessInput) {
int i; int i;
for (i = 0; i < impl->trans->area->n_input_ports; i++) { for (i = 0; i < impl->trans->area->n_input_ports; i++) {
@ -505,7 +502,7 @@ static void handle_rtnode_event(struct pw_stream *stream, struct spa_event *even
input->buffer_id = SPA_ID_INVALID; input->buffer_id = SPA_ID_INVALID;
} }
send_need_input(stream); send_need_input(stream);
} else if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.ProcessOutput) { } else if (SPA_EVENT_TYPE(event) == impl->type_event_client_node.ProcessOutput) {
int i; int i;
for (i = 0; i < impl->trans->area->n_output_ports; i++) { for (i = 0; i < impl->trans->area->n_output_ports; i++) {
@ -521,9 +518,9 @@ static void handle_rtnode_event(struct pw_stream *stream, struct spa_event *even
impl->in_need_buffer = true; impl->in_need_buffer = true;
pw_listener_list_emit_na(&stream->listener_list, struct pw_stream_events, need_buffer); pw_listener_list_emit_na(&stream->listener_list, struct pw_stream_events, need_buffer);
impl->in_need_buffer = false; impl->in_need_buffer = false;
} else if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.ReuseBuffer) { } else if (SPA_EVENT_TYPE(event) == impl->type_event_client_node.ReuseBuffer) {
struct pw_event_transport_reuse_buffer *p = struct pw_event_client_node_reuse_buffer *p =
(struct pw_event_transport_reuse_buffer *) event; (struct pw_event_client_node_reuse_buffer *) event;
if (p->body.port_id.value != impl->port_id) if (p->body.port_id.value != impl->port_id)
return; return;
@ -555,9 +552,10 @@ on_rtsocket_condition(struct spa_loop_utils *utils,
read(impl->rtreadfd, &cmd, 8); read(impl->rtreadfd, &cmd, 8);
while (pw_transport_next_event(impl->trans, &event) == SPA_RESULT_OK) { while (pw_client_node_transport_next_event(impl->trans, &event) == SPA_RESULT_OK) {
struct spa_event *ev = alloca(SPA_POD_SIZE(&event)); struct spa_event *ev = alloca(SPA_POD_SIZE(&event));
pw_transport_parse_event(impl->trans, ev); pw_client_node_transport_parse_event(impl->trans, ev);
pw_pod_remap(&ev->pod, &stream->remote->types);;
handle_rtnode_event(stream, ev); handle_rtnode_event(stream, ev);
} }
} }
@ -856,23 +854,15 @@ client_node_port_command(void *data,
} }
static void client_node_transport(void *data, uint32_t node_id, static void client_node_transport(void *data, uint32_t node_id,
int readfd, int writefd, int memfd, uint32_t offset, uint32_t size) int readfd, int writefd,
struct pw_client_node_transport *transport)
{ {
struct stream *impl = data; struct stream *impl = data;
struct pw_stream *stream = &impl->this; struct pw_stream *stream = &impl->this;
struct pw_transport_info info;
stream->node_id = node_id; stream->node_id = node_id;
info.memfd = memfd; impl->trans = transport;
if (info.memfd == -1)
return;
info.offset = offset;
info.size = size;
if (impl->trans)
pw_transport_destroy(impl->trans);
impl->trans = pw_transport_new_from_info(&info, 0);
pw_log_info("stream %p: create client transport %p with fds %d %d for node %u", pw_log_info("stream %p: create client transport %p with fds %d %d for node %u",
stream, impl->trans, readfd, writefd, node_id); stream, impl->trans, readfd, writefd, node_id);
@ -1028,8 +1018,8 @@ uint32_t pw_stream_get_empty_buffer(struct pw_stream *stream)
bool pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id) bool pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id)
{ {
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct pw_event_transport_reuse_buffer rb = PW_EVENT_TRANSPORT_REUSE_BUFFER_INIT struct pw_event_client_node_reuse_buffer rb = PW_EVENT_CLIENT_NODE_REUSE_BUFFER_INIT
(stream->remote->core->type.event_transport.ReuseBuffer, impl->port_id, id); (impl->type_event_client_node.ReuseBuffer, impl->port_id, id);
struct buffer_id *bid; struct buffer_id *bid;
uint64_t cmd = 1; uint64_t cmd = 1;
@ -1039,7 +1029,7 @@ bool pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id)
bid->used = false; bid->used = false;
spa_list_insert(impl->free.prev, &bid->link); spa_list_insert(impl->free.prev, &bid->link);
pw_transport_add_event(impl->trans, (struct spa_event *) &rb); pw_client_node_transport_add_event(impl->trans, (struct spa_event *) &rb);
write(impl->rtwritefd, &cmd, 8); write(impl->rtwritefd, &cmd, 8);
return true; return true;

View file

@ -1,141 +0,0 @@
/* PipeWire
* Copyright (C) 2016 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __PIPEWIRE_TRANSPORT_H__
#define __PIPEWIRE_TRANSPORT_H__
#ifdef __cplusplus
extern "C" {
#endif
#include <string.h>
#include <spa/defs.h>
#include <spa/node.h>
#include <pipewire/mem.h>
/** information about the transport region \memberof pw_transport */
struct pw_transport_info {
int memfd; /**< the memfd of the transport area */
uint32_t offset; /**< offset to map \a memfd at */
uint32_t size; /**< size of memfd mapping */
};
/** Shared structure between client and server \memberof pw_transport */
struct pw_transport_area {
uint32_t max_input_ports; /**< max input ports of the node */
uint32_t n_input_ports; /**< number of input ports of the node */
uint32_t max_output_ports; /**< max output ports of the node */
uint32_t n_output_ports; /**< number of output ports of the node */
};
/** \class pw_transport
*
* \brief Transport object
*
* The transport object contains shared data and ringbuffers to exchange
* events and data between the server and the client in a low-latency and
* lockfree way.
*/
struct pw_transport {
struct pw_transport_area *area; /**< the transport area */
struct spa_port_io *inputs; /**< array of input port io */
struct spa_port_io *outputs; /**< array of output port io */
void *input_data; /**< input memory for ringbuffer */
struct spa_ringbuffer *input_buffer; /**< ringbuffer for input memory */
void *output_data; /**< output memory for ringbuffer */
struct spa_ringbuffer *output_buffer; /**< ringbuffer for output memory */
void *user_data;
};
struct pw_transport *
pw_transport_new(uint32_t max_input_ports, uint32_t max_output_ports, size_t user_data_size);
struct pw_transport *
pw_transport_new_from_info(struct pw_transport_info *info, size_t user_data_size);
void
pw_transport_destroy(struct pw_transport *trans);
int
pw_transport_get_info(struct pw_transport *trans, struct pw_transport_info *info);
int
pw_transport_add_event(struct pw_transport *trans, struct spa_event *event);
int
pw_transport_next_event(struct pw_transport *trans, struct spa_event *event);
int
pw_transport_parse_event(struct pw_transport *trans, void *event);
#define PW_TYPE_EVENT__Transport SPA_TYPE_EVENT_BASE "Transport"
#define PW_TYPE_EVENT_TRANSPORT_BASE PW_TYPE_EVENT__Transport ":"
#define PW_TYPE_EVENT_TRANSPORT__HaveOutput PW_TYPE_EVENT_TRANSPORT_BASE "HaveOutput"
#define PW_TYPE_EVENT_TRANSPORT__NeedInput PW_TYPE_EVENT_TRANSPORT_BASE "NeedInput"
#define PW_TYPE_EVENT_TRANSPORT__ReuseBuffer PW_TYPE_EVENT_TRANSPORT_BASE "ReuseBuffer"
#define PW_TYPE_EVENT_TRANSPORT__ProcessInput PW_TYPE_EVENT_TRANSPORT_BASE "ProcessInput"
#define PW_TYPE_EVENT_TRANSPORT__ProcessOutput PW_TYPE_EVENT_TRANSPORT_BASE "ProcessOutput"
struct pw_type_event_transport {
uint32_t HaveOutput;
uint32_t NeedInput;
uint32_t ReuseBuffer;
uint32_t ProcessInput;
uint32_t ProcessOutput;
};
static inline void
pw_type_event_transport_map(struct spa_type_map *map, struct pw_type_event_transport *type)
{
if (type->HaveOutput == 0) {
type->HaveOutput = spa_type_map_get_id(map, PW_TYPE_EVENT_TRANSPORT__HaveOutput);
type->NeedInput = spa_type_map_get_id(map, PW_TYPE_EVENT_TRANSPORT__NeedInput);
type->ReuseBuffer = spa_type_map_get_id(map, PW_TYPE_EVENT_TRANSPORT__ReuseBuffer);
type->ProcessInput = spa_type_map_get_id(map, PW_TYPE_EVENT_TRANSPORT__ProcessInput);
type->ProcessOutput = spa_type_map_get_id(map, PW_TYPE_EVENT_TRANSPORT__ProcessOutput);
}
}
struct pw_event_transport_reuse_buffer_body {
struct spa_pod_object_body body;
struct spa_pod_int port_id;
struct spa_pod_int buffer_id;
};
struct pw_event_transport_reuse_buffer {
struct spa_pod pod;
struct pw_event_transport_reuse_buffer_body body;
};
#define PW_EVENT_TRANSPORT_REUSE_BUFFER_INIT(type,port_id,buffer_id) \
SPA_EVENT_INIT_COMPLEX(struct pw_event_transport_reuse_buffer, \
sizeof(struct pw_event_transport_reuse_buffer_body), type, \
SPA_POD_INT_INIT(port_id), \
SPA_POD_INT_INIT(buffer_id))
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif /* __PIPEWIRE_TRANSPORT_H__ */

View file

@ -62,8 +62,6 @@ void pw_type_init(struct pw_type *type)
spa_type_param_alloc_buffers_map(type->map, &type->param_alloc_buffers); spa_type_param_alloc_buffers_map(type->map, &type->param_alloc_buffers);
spa_type_param_alloc_meta_enable_map(type->map, &type->param_alloc_meta_enable); spa_type_param_alloc_meta_enable_map(type->map, &type->param_alloc_meta_enable);
spa_type_param_alloc_video_padding_map(type->map, &type->param_alloc_video_padding); spa_type_param_alloc_video_padding_map(type->map, &type->param_alloc_video_padding);
pw_type_event_transport_map(type->map, &type->event_transport);
} }
bool pw_pod_remap_data(uint32_t type, void *body, uint32_t size, struct pw_map *types) bool pw_pod_remap_data(uint32_t type, void *body, uint32_t size, struct pw_map *types)

View file

@ -31,7 +31,6 @@ extern "C" {
#include <spa/param-alloc.h> #include <spa/param-alloc.h>
#include <pipewire/map.h> #include <pipewire/map.h>
#include <pipewire/transport.h>
#define PW_TYPE_BASE "PipeWire:" #define PW_TYPE_BASE "PipeWire:"
@ -72,7 +71,6 @@ struct pw_type {
struct spa_type_param_alloc_buffers param_alloc_buffers; struct spa_type_param_alloc_buffers param_alloc_buffers;
struct spa_type_param_alloc_meta_enable param_alloc_meta_enable; struct spa_type_param_alloc_meta_enable param_alloc_meta_enable;
struct spa_type_param_alloc_video_padding param_alloc_video_padding; struct spa_type_param_alloc_video_padding param_alloc_video_padding;
struct pw_type_event_transport event_transport;
}; };
void void