mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-11-01 22:58:50 -04:00
only send events
Only send events between client and server and use the socket simply to wake up the other end.
This commit is contained in:
parent
cae971e106
commit
c8648eaf59
3 changed files with 90 additions and 117 deletions
|
|
@ -311,7 +311,27 @@ static inline void
|
||||||
send_need_input (PinosStream *stream)
|
send_need_input (PinosStream *stream)
|
||||||
{
|
{
|
||||||
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
|
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
|
||||||
uint8_t cmd = PINOS_TRANSPORT_CMD_NEED_DATA;
|
SpaNodeEventNeedInput ni;
|
||||||
|
uint8_t cmd = 0;
|
||||||
|
|
||||||
|
ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT;
|
||||||
|
ni.event.size = sizeof (ni);
|
||||||
|
ni.port_id = 0;
|
||||||
|
pinos_transport_add_event (impl->trans, &ni.event);
|
||||||
|
write (impl->rtfd, &cmd, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline void
|
||||||
|
send_have_output (PinosStream *stream)
|
||||||
|
{
|
||||||
|
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
|
||||||
|
SpaNodeEventHaveOutput ho;
|
||||||
|
uint8_t cmd = 0;
|
||||||
|
|
||||||
|
ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
|
||||||
|
ho.event.size = sizeof (ho);
|
||||||
|
ho.port_id = 0;
|
||||||
|
pinos_transport_add_event (impl->trans, &ho.event);
|
||||||
write (impl->rtfd, &cmd, 1);
|
write (impl->rtfd, &cmd, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -419,8 +439,24 @@ handle_rtnode_event (PinosStream *stream,
|
||||||
|
|
||||||
switch (event->type) {
|
switch (event->type) {
|
||||||
case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT:
|
case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT:
|
||||||
|
{
|
||||||
|
int i;
|
||||||
|
|
||||||
|
for (i = 0; i < impl->trans->area->n_inputs; i++) {
|
||||||
|
SpaPortInput *input = &impl->trans->inputs[i];
|
||||||
|
|
||||||
|
if (input->buffer_id == SPA_ID_INVALID)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
pinos_signal_emit (&stream->new_buffer, stream, input->buffer_id);
|
||||||
|
input->buffer_id = SPA_ID_INVALID;
|
||||||
|
}
|
||||||
|
send_need_input (stream);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
case SPA_NODE_EVENT_TYPE_NEED_INPUT:
|
case SPA_NODE_EVENT_TYPE_NEED_INPUT:
|
||||||
pinos_log_warn ("unhandled node event %d", event->type);
|
pinos_signal_emit (&stream->need_buffer, stream);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case SPA_NODE_EVENT_TYPE_REUSE_BUFFER:
|
case SPA_NODE_EVENT_TYPE_REUSE_BUFFER:
|
||||||
|
|
@ -459,33 +495,15 @@ on_rtsocket_condition (SpaSource *source,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mask & SPA_IO_IN) {
|
if (mask & SPA_IO_IN) {
|
||||||
|
SpaNodeEvent event;
|
||||||
uint8_t cmd;
|
uint8_t cmd;
|
||||||
int i;
|
|
||||||
|
|
||||||
read (impl->rtfd, &cmd, 1);
|
read (impl->rtfd, &cmd, 1);
|
||||||
|
|
||||||
if (cmd & PINOS_TRANSPORT_CMD_HAVE_DATA) {
|
while (pinos_transport_next_event (impl->trans, &event) == SPA_RESULT_OK) {
|
||||||
for (i = 0; i < impl->trans->area->n_inputs; i++) {
|
SpaNodeEvent *ev = alloca (event.size);
|
||||||
SpaPortInput *input = &impl->trans->inputs[i];
|
pinos_transport_parse_event (impl->trans, ev);
|
||||||
|
handle_rtnode_event (stream, ev);
|
||||||
if (input->buffer_id == SPA_ID_INVALID)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
pinos_signal_emit (&stream->new_buffer, stream, input->buffer_id);
|
|
||||||
input->buffer_id = SPA_ID_INVALID;
|
|
||||||
}
|
|
||||||
send_need_input (stream);
|
|
||||||
}
|
|
||||||
if (cmd & PINOS_TRANSPORT_CMD_HAVE_EVENT) {
|
|
||||||
SpaNodeEvent event;
|
|
||||||
while (pinos_transport_next_event (impl->trans, &event) == SPA_RESULT_OK) {
|
|
||||||
SpaNodeEvent *ev = alloca (event.size);
|
|
||||||
pinos_transport_parse_event (impl->trans, ev);
|
|
||||||
handle_rtnode_event (stream, ev);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (cmd & PINOS_TRANSPORT_CMD_NEED_DATA) {
|
|
||||||
pinos_signal_emit (&stream->need_buffer, stream);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1034,7 +1052,7 @@ pinos_stream_recycle_buffer (PinosStream *stream,
|
||||||
{
|
{
|
||||||
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
|
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
|
||||||
SpaNodeEventReuseBuffer rb;
|
SpaNodeEventReuseBuffer rb;
|
||||||
uint8_t cmd = PINOS_TRANSPORT_CMD_HAVE_EVENT;
|
uint8_t cmd = 0;
|
||||||
|
|
||||||
rb.event.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER;
|
rb.event.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER;
|
||||||
rb.event.size = sizeof (rb);
|
rb.event.size = sizeof (rb);
|
||||||
|
|
@ -1088,14 +1106,12 @@ pinos_stream_send_buffer (PinosStream *stream,
|
||||||
{
|
{
|
||||||
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
|
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
|
||||||
BufferId *bid;
|
BufferId *bid;
|
||||||
uint8_t cmd = PINOS_TRANSPORT_CMD_HAVE_DATA;
|
|
||||||
|
|
||||||
if ((bid = find_buffer (stream, id))) {
|
if ((bid = find_buffer (stream, id))) {
|
||||||
bid->used = true;
|
bid->used = true;
|
||||||
impl->trans->outputs[0].buffer_id = id;
|
impl->trans->outputs[0].buffer_id = id;
|
||||||
impl->trans->outputs[0].status = SPA_RESULT_OK;
|
impl->trans->outputs[0].status = SPA_RESULT_OK;
|
||||||
if (write (impl->rtfd, &cmd, 1) < 1)
|
send_have_output (stream);
|
||||||
perror ("write");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|
|
||||||
|
|
@ -36,12 +36,6 @@ typedef struct _PinosTransportArea PinosTransportArea;
|
||||||
#include <pinos/client/mem.h>
|
#include <pinos/client/mem.h>
|
||||||
#include <pinos/client/sig.h>
|
#include <pinos/client/sig.h>
|
||||||
|
|
||||||
#define PINOS_TRANSPORT_CMD_NONE 0
|
|
||||||
#define PINOS_TRANSPORT_CMD_NEED_DATA (1<<0)
|
|
||||||
#define PINOS_TRANSPORT_CMD_HAVE_DATA (1<<1)
|
|
||||||
#define PINOS_TRANSPORT_CMD_HAVE_EVENT (1<<2)
|
|
||||||
#define PINOS_TRANSPORT_CMD_SYNC (1<<3)
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int memfd;
|
int memfd;
|
||||||
off_t offset;
|
off_t offset;
|
||||||
|
|
|
||||||
|
|
@ -166,6 +166,34 @@ spa_proxy_node_set_props (SpaNode *node,
|
||||||
return SPA_RESULT_NOT_IMPLEMENTED;
|
return SPA_RESULT_NOT_IMPLEMENTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
send_need_input (SpaProxy *this)
|
||||||
|
{
|
||||||
|
PinosNode *pnode = this->pnode;
|
||||||
|
SpaNodeEventNeedInput ni;
|
||||||
|
uint8_t cmd = 0;
|
||||||
|
|
||||||
|
ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT;
|
||||||
|
ni.event.size = sizeof (ni);
|
||||||
|
ni.port_id = 0;
|
||||||
|
pinos_transport_add_event (pnode->transport, &ni.event);
|
||||||
|
write (this->data_source.fd, &cmd, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
send_have_output (SpaProxy *this)
|
||||||
|
{
|
||||||
|
PinosNode *pnode = this->pnode;
|
||||||
|
SpaNodeEventHaveOutput ho;
|
||||||
|
uint8_t cmd = 0;
|
||||||
|
|
||||||
|
ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
|
||||||
|
ho.event.size = sizeof (ho);
|
||||||
|
ho.port_id = 0;
|
||||||
|
pinos_transport_add_event (pnode->transport, &ho.event);
|
||||||
|
write (this->data_source.fd, &cmd, 1);
|
||||||
|
}
|
||||||
|
|
||||||
static SpaResult
|
static SpaResult
|
||||||
spa_proxy_node_send_command (SpaNode *node,
|
spa_proxy_node_send_command (SpaNode *node,
|
||||||
SpaNodeCommand *command)
|
SpaNodeCommand *command)
|
||||||
|
|
@ -201,10 +229,9 @@ spa_proxy_node_send_command (SpaNode *node,
|
||||||
PINOS_MESSAGE_NODE_COMMAND,
|
PINOS_MESSAGE_NODE_COMMAND,
|
||||||
&cnc,
|
&cnc,
|
||||||
true);
|
true);
|
||||||
if (command->type == SPA_NODE_COMMAND_START) {
|
if (command->type == SPA_NODE_COMMAND_START)
|
||||||
uint8_t cmd = PINOS_TRANSPORT_CMD_NEED_DATA;
|
send_need_input (this);
|
||||||
write (this->data_source.fd, &cmd, 1);
|
|
||||||
}
|
|
||||||
res = SPA_RESULT_RETURN_ASYNC (cnc.seq);
|
res = SPA_RESULT_RETURN_ASYNC (cnc.seq);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
@ -830,7 +857,7 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node,
|
||||||
SpaProxy *this;
|
SpaProxy *this;
|
||||||
SpaNodeEventReuseBuffer rb;
|
SpaNodeEventReuseBuffer rb;
|
||||||
PinosNode *pnode;
|
PinosNode *pnode;
|
||||||
uint8_t cmd;
|
uint8_t cmd = 0;
|
||||||
|
|
||||||
if (node == NULL)
|
if (node == NULL)
|
||||||
return SPA_RESULT_INVALID_ARGUMENTS;
|
return SPA_RESULT_INVALID_ARGUMENTS;
|
||||||
|
|
@ -846,8 +873,6 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node,
|
||||||
rb.port_id = port_id;
|
rb.port_id = port_id;
|
||||||
rb.buffer_id = buffer_id;
|
rb.buffer_id = buffer_id;
|
||||||
pinos_transport_add_event (pnode->transport, &rb.event);
|
pinos_transport_add_event (pnode->transport, &rb.event);
|
||||||
|
|
||||||
cmd = PINOS_TRANSPORT_CMD_HAVE_EVENT;
|
|
||||||
write (this->data_source.fd, &cmd, 1);
|
write (this->data_source.fd, &cmd, 1);
|
||||||
|
|
||||||
return SPA_RESULT_OK;
|
return SPA_RESULT_OK;
|
||||||
|
|
@ -890,39 +915,13 @@ static SpaResult
|
||||||
spa_proxy_node_process_input (SpaNode *node)
|
spa_proxy_node_process_input (SpaNode *node)
|
||||||
{
|
{
|
||||||
SpaProxy *this;
|
SpaProxy *this;
|
||||||
uint8_t cmd;
|
|
||||||
|
|
||||||
if (node == NULL)
|
if (node == NULL)
|
||||||
return SPA_RESULT_INVALID_ARGUMENTS;
|
return SPA_RESULT_INVALID_ARGUMENTS;
|
||||||
|
|
||||||
this = SPA_CONTAINER_OF (node, SpaProxy, node);
|
this = SPA_CONTAINER_OF (node, SpaProxy, node);
|
||||||
|
|
||||||
#if 0
|
send_have_output (this);
|
||||||
{
|
|
||||||
unsigned int i;
|
|
||||||
bool have_error = false;
|
|
||||||
|
|
||||||
for (i = 0; i < this->n_inputs; i++) {
|
|
||||||
SpaProxyPort *port = &this->in_ports[i];
|
|
||||||
SpaPortInput *input;
|
|
||||||
|
|
||||||
if ((input = port->io) == NULL)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
if (!CHECK_PORT_BUFFER (this, input->buffer_id, port)) {
|
|
||||||
input->status = SPA_RESULT_INVALID_BUFFER_ID;
|
|
||||||
have_error = true;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
copy_meta_out (this, port, input->buffer_id);
|
|
||||||
}
|
|
||||||
if (have_error)
|
|
||||||
return SPA_RESULT_ERROR;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
cmd = PINOS_TRANSPORT_CMD_HAVE_DATA;
|
|
||||||
write (this->data_source.fd, &cmd, 1);
|
|
||||||
|
|
||||||
return SPA_RESULT_OK;
|
return SPA_RESULT_OK;
|
||||||
}
|
}
|
||||||
|
|
@ -931,37 +930,13 @@ static SpaResult
|
||||||
spa_proxy_node_process_output (SpaNode *node)
|
spa_proxy_node_process_output (SpaNode *node)
|
||||||
{
|
{
|
||||||
SpaProxy *this;
|
SpaProxy *this;
|
||||||
uint8_t cmd;
|
|
||||||
|
|
||||||
if (node == NULL)
|
if (node == NULL)
|
||||||
return SPA_RESULT_INVALID_ARGUMENTS;
|
return SPA_RESULT_INVALID_ARGUMENTS;
|
||||||
|
|
||||||
this = SPA_CONTAINER_OF (node, SpaProxy, node);
|
this = SPA_CONTAINER_OF (node, SpaProxy, node);
|
||||||
|
|
||||||
#if 0
|
send_need_input (this);
|
||||||
{
|
|
||||||
unsigned int i;
|
|
||||||
bool have_error = false;
|
|
||||||
|
|
||||||
for (i = 0; i < this->n_outputs; i++) {
|
|
||||||
SpaProxyPort *port = &this->out_ports[i];
|
|
||||||
SpaPortOutput *output;
|
|
||||||
|
|
||||||
if ((output = port->io) == NULL)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
copy_meta_in (this, port, output->buffer_id);
|
|
||||||
|
|
||||||
if (output->status != SPA_RESULT_OK)
|
|
||||||
have_error = true;
|
|
||||||
}
|
|
||||||
if (have_error)
|
|
||||||
return SPA_RESULT_ERROR;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
cmd = PINOS_TRANSPORT_CMD_NEED_DATA;
|
|
||||||
write (this->data_source.fd, &cmd, 1);
|
|
||||||
|
|
||||||
return SPA_RESULT_OK;
|
return SPA_RESULT_OK;
|
||||||
}
|
}
|
||||||
|
|
@ -1078,33 +1053,21 @@ proxy_on_data_fd_events (SpaSource *source)
|
||||||
SpaProxy *this = source->data;
|
SpaProxy *this = source->data;
|
||||||
PinosNode *pnode = this->pnode;
|
PinosNode *pnode = this->pnode;
|
||||||
|
|
||||||
|
if (source->rmask & (SPA_IO_ERR | SPA_IO_HUP)) {
|
||||||
|
spa_log_warn (this->log, "proxy %p: got error", this);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (source->rmask & SPA_IO_IN) {
|
if (source->rmask & SPA_IO_IN) {
|
||||||
|
SpaNodeEvent event;
|
||||||
uint8_t cmd;
|
uint8_t cmd;
|
||||||
|
|
||||||
if (read (this->data_source.fd, &cmd, 1) < 1)
|
read (this->data_source.fd, &cmd, 1);
|
||||||
return;
|
|
||||||
|
|
||||||
if (cmd & PINOS_TRANSPORT_CMD_HAVE_EVENT) {
|
while (pinos_transport_next_event (pnode->transport, &event) == SPA_RESULT_OK) {
|
||||||
SpaNodeEvent event;
|
SpaNodeEvent *ev = alloca (event.size);
|
||||||
while (pinos_transport_next_event (pnode->transport, &event) == SPA_RESULT_OK) {
|
pinos_transport_parse_event (pnode->transport, ev);
|
||||||
SpaNodeEvent *ev = alloca (event.size);
|
this->event_cb (&this->node, ev, this->user_data);
|
||||||
pinos_transport_parse_event (pnode->transport, ev);
|
|
||||||
this->event_cb (&this->node, ev, this->user_data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (cmd & PINOS_TRANSPORT_CMD_HAVE_DATA) {
|
|
||||||
SpaNodeEventHaveOutput ho;
|
|
||||||
ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
|
|
||||||
ho.event.size = sizeof (ho);
|
|
||||||
ho.port_id = 0;
|
|
||||||
this->event_cb (&this->node, &ho.event, this->user_data);
|
|
||||||
}
|
|
||||||
if (cmd & PINOS_TRANSPORT_CMD_NEED_DATA) {
|
|
||||||
SpaNodeEventNeedInput ni;
|
|
||||||
ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT;
|
|
||||||
ni.event.size = sizeof (ni);
|
|
||||||
ni.port_id = 0;
|
|
||||||
this->event_cb (&this->node, &ni.event, this->user_data);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue