don't use generic events for real-time messages

Use static event enumerations for real-time messages.
This commit is contained in:
Wim Taymans 2017-08-07 18:07:38 +02:00
parent 97de0de0b7
commit b90dac7656
5 changed files with 131 additions and 146 deletions

View file

@ -38,6 +38,8 @@ struct pw_client_node_proxy;
#define PW_VERSION_CLIENT_NODE 0
struct pw_client_node_message;
/** Shared structure between client and server \memberof pw_client_node */
struct pw_client_node_area {
uint32_t max_input_ports; /**< max input ports of the node */
@ -63,88 +65,86 @@ struct pw_client_node_transport {
void *output_data; /**< output memory for ringbuffer */
struct spa_ringbuffer *output_buffer; /**< ringbuffer for output memory */
/** Add an event to the transport
* \param trans the transport to send the event on
* \param event the event to add
/** Add a message to the transport
* \param trans the transport to send the message on
* \param message the message to add
* \return 0 on success, < 0 on error
*
* Write \a event to the shared ringbuffer.
* Write \a message to the shared ringbuffer.
*/
int (*add_event) (struct pw_client_node_transport *trans, struct spa_event *event);
int (*add_message) (struct pw_client_node_transport *trans, struct pw_client_node_message *message);
/** Get next event from a transport
* \param trans the transport to get the event of
* \param[out] event the event to read
* \return 0 on success, < 0 on error, SPA_RESULT_ENUM_END when no more events
/** Get next message from a transport
* \param trans the transport to get the message of
* \param[out] message the message to read
* \return 0 on success, < 0 on error, SPA_RESULT_ENUM_END when no more messages
* are available.
*
* Get the skeleton next event from \a trans into \a event. This function will
* only read the head and object body of the event.
* Get the skeleton next message from \a trans into \a message. This function will
* only read the head and object body of the message.
*
* After the complete size of the event has been calculated, you should call
* \ref parse_event() to read the complete event contents.
* After the complete size of the message has been calculated, you should call
* \ref parse_message() to read the complete message contents.
*/
int (*next_event) (struct pw_client_node_transport *trans, struct spa_event *event);
int (*next_message) (struct pw_client_node_transport *trans, struct pw_client_node_message *message);
/** Parse the complete event on transport
/** Parse the complete message on transport
* \param trans the transport to read from
* \param[out] event memory that can hold the complete event
* \param[out] message memory that can hold the complete message
* \return 0 on success, < 0 on error
*
* Use this function after \ref next_event().
*
* Use this function after \ref next_message().
*/
int (*parse_event) (struct pw_client_node_transport *trans, void *event);
int (*parse_message) (struct pw_client_node_transport *trans, void *message);
};
#define pw_client_node_transport_add_event(t,e) ((t)->add_event((t), (e)))
#define pw_client_node_transport_next_event(t,e) ((t)->next_event((t), (e)))
#define pw_client_node_transport_parse_event(t,e) ((t)->parse_event((t), (e)))
#define pw_client_node_transport_add_message(t,m) ((t)->add_message((t), (m)))
#define pw_client_node_transport_next_message(t,m) ((t)->next_message((t), (m)))
#define pw_client_node_transport_parse_message(t,m) ((t)->parse_message((t), (m)))
#define PW_TYPE_EVENT__ClientNode SPA_TYPE_EVENT_BASE "ClientNode"
#define PW_TYPE_EVENT_CLIENT_NODE_BASE PW_TYPE_EVENT__ClientNode ":"
#define PW_TYPE_EVENT_CLIENT_NODE__HaveOutput PW_TYPE_EVENT_CLIENT_NODE_BASE "HaveOutput"
#define PW_TYPE_EVENT_CLIENT_NODE__NeedInput PW_TYPE_EVENT_CLIENT_NODE_BASE "NeedInput"
#define PW_TYPE_EVENT_CLIENT_NODE__ReuseBuffer PW_TYPE_EVENT_CLIENT_NODE_BASE "ReuseBuffer"
#define PW_TYPE_EVENT_CLIENT_NODE__ProcessInput PW_TYPE_EVENT_CLIENT_NODE_BASE "ProcessInput"
#define PW_TYPE_EVENT_CLIENT_NODE__ProcessOutput PW_TYPE_EVENT_CLIENT_NODE_BASE "ProcessOutput"
struct pw_type_event_client_node {
uint32_t HaveOutput;
uint32_t NeedInput;
uint32_t ReuseBuffer;
uint32_t ProcessInput;
uint32_t ProcessOutput;
enum pw_client_node_message_type {
PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT,
PW_CLIENT_NODE_MESSAGE_NEED_INPUT,
PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER,
PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT,
PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT,
};
static inline void
pw_type_event_client_node_map(struct spa_type_map *map, struct pw_type_event_client_node *type)
{
if (type->HaveOutput == 0) {
type->HaveOutput = spa_type_map_get_id(map, PW_TYPE_EVENT_CLIENT_NODE__HaveOutput);
type->NeedInput = spa_type_map_get_id(map, PW_TYPE_EVENT_CLIENT_NODE__NeedInput);
type->ReuseBuffer = spa_type_map_get_id(map, PW_TYPE_EVENT_CLIENT_NODE__ReuseBuffer);
type->ProcessInput = spa_type_map_get_id(map, PW_TYPE_EVENT_CLIENT_NODE__ProcessInput);
type->ProcessOutput = spa_type_map_get_id(map, PW_TYPE_EVENT_CLIENT_NODE__ProcessOutput);
}
}
struct pw_event_client_node_reuse_buffer_body {
struct spa_pod_object_body body;
struct spa_pod_int port_id;
struct spa_pod_int buffer_id;
struct pw_client_node_message_body {
struct spa_pod_int type SPA_ALIGNED(8);
};
struct pw_event_client_node_reuse_buffer {
struct spa_pod pod;
struct pw_event_client_node_reuse_buffer_body body;
struct pw_client_node_message {
struct spa_pod_struct pod;
struct pw_client_node_message_body body;
};
#define PW_EVENT_CLIENT_NODE_REUSE_BUFFER_INIT(type,port_id,buffer_id) \
SPA_EVENT_INIT_COMPLEX(struct pw_event_client_node_reuse_buffer, \
sizeof(struct pw_event_client_node_reuse_buffer_body), type, \
SPA_POD_INT_INIT(port_id), \
struct pw_client_node_message_reuse_buffer_body {
struct spa_pod_int type SPA_ALIGNED(8);
struct spa_pod_int port_id SPA_ALIGNED(8);
struct spa_pod_int buffer_id SPA_ALIGNED(8);
};
struct pw_client_node_message_reuse_buffer {
struct spa_pod_struct pod;
struct pw_client_node_message_reuse_buffer_body body;
};
#define PW_CLIENT_NODE_MESSAGE_TYPE(message) (((struct pw_client_node_message*)(message))->body.type.value)
#define PW_CLIENT_NODE_MESSAGE_INIT(ev) (struct pw_client_node_message) \
{ { { sizeof(struct pw_client_node_message_body), SPA_POD_TYPE_STRUCT } }, \
{ SPA_POD_INT_INIT(ev) } }
#define PW_CLIENT_NODE_MESSAGE_INIT_VA(type,size,message,...) (type) \
{ { { size, SPA_POD_TYPE_STRUCT } }, \
{ SPA_POD_INT_INIT(message), __VA_ARGS__ } } \
#define PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER_INIT(port_id,buffer_id) \
PW_CLIENT_NODE_MESSAGE_INIT_VA(struct pw_client_node_message_reuse_buffer, \
sizeof(struct pw_client_node_message_reuse_buffer_body), \
PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER, \
SPA_POD_INT_INIT(port_id), \
SPA_POD_INT_INIT(buffer_id))

View file

@ -93,8 +93,6 @@ struct proxy {
struct spa_log *log;
struct spa_loop *data_loop;
struct pw_type_event_client_node type_event_client_node;
const struct spa_node_callbacks *callbacks;
void *callbacks_data;
@ -156,7 +154,7 @@ static inline void do_flush(struct proxy *this)
{
uint64_t cmd = 1;
if (write(this->writefd, &cmd, 8) != 8)
spa_log_warn(this->log, "proxy %p: error writing event: %s", this, strerror(errno));
spa_log_warn(this->log, "proxy %p: error flushing : %s", this, strerror(errno));
}
@ -721,9 +719,8 @@ spa_proxy_node_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32
spa_log_trace(this->log, "reuse buffer %d", buffer_id);
{
struct pw_event_client_node_reuse_buffer rb = PW_EVENT_CLIENT_NODE_REUSE_BUFFER_INIT
(this->type_event_client_node.ReuseBuffer, port_id, buffer_id);
pw_client_node_transport_add_event(impl->transport, (struct spa_event *) &rb);
struct pw_client_node_message_reuse_buffer rb = PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER_INIT(port_id, buffer_id);
pw_client_node_transport_add_message(impl->transport, (struct pw_client_node_message *) &rb);
}
return SPA_RESULT_OK;
@ -768,8 +765,8 @@ static int spa_proxy_node_process_input(struct spa_node *node)
impl->transport->inputs[i] = *io;
io->status = SPA_RESULT_NEED_BUFFER;
}
pw_client_node_transport_add_event(impl->transport,
&SPA_EVENT_INIT(this->type_event_client_node.ProcessInput));
pw_client_node_transport_add_message(impl->transport,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT));
do_flush(this);
if (this->callbacks->need_input)
@ -802,19 +799,19 @@ static int spa_proxy_node_process_output(struct spa_node *node)
*io = tmp;
pw_log_trace("%d %d %d", io->status, io->buffer_id, io->status);
}
pw_client_node_transport_add_event(impl->transport,
&SPA_EVENT_INIT(this->type_event_client_node.ProcessOutput));
pw_client_node_transport_add_message(impl->transport,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT));
do_flush(this);
return res;
}
static int handle_node_event(struct proxy *this, struct spa_event *event)
static int handle_node_message(struct proxy *this, struct pw_client_node_message *message)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, proxy);
int i;
if (SPA_EVENT_TYPE(event) == this->type_event_client_node.HaveOutput) {
if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT) {
for (i = 0; i < MAX_OUTPUTS; i++) {
struct spa_port_io *io = this->out_ports[i].io;
@ -825,11 +822,11 @@ static int handle_node_event(struct proxy *this, struct spa_event *event)
pw_log_trace("%d %d", io->status, io->buffer_id);
}
this->callbacks->have_output(this->callbacks_data);
} else if (SPA_EVENT_TYPE(event) == this->type_event_client_node.NeedInput) {
} else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_NEED_INPUT) {
this->callbacks->need_input(this->callbacks_data);
} else if (SPA_EVENT_TYPE(event) == this->type_event_client_node.ReuseBuffer) {
struct pw_event_client_node_reuse_buffer *p =
(struct pw_event_client_node_reuse_buffer *) event;
} else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) {
struct pw_client_node_message_reuse_buffer *p =
(struct pw_client_node_message_reuse_buffer *) message;
this->callbacks->reuse_buffer(this->callbacks_data, p->body.port_id.value,
p->body.buffer_id.value);
}
@ -930,18 +927,17 @@ static void proxy_on_data_fd_events(struct spa_source *source)
}
if (source->rmask & SPA_IO_IN) {
struct spa_event event;
struct pw_client_node_message message;
uint64_t cmd;
if (read(this->data_source.fd, &cmd, 8) != 8)
spa_log_warn(this->log, "proxy %p: error reading event: %s",
spa_log_warn(this->log, "proxy %p: error reading message: %s",
this, strerror(errno));
while (pw_client_node_transport_next_event(impl->transport, &event) == SPA_RESULT_OK) {
struct spa_event *ev = alloca(SPA_POD_SIZE(&event));
pw_client_node_transport_parse_event(impl->transport, ev);
pw_pod_remap(&ev->pod, &this->resource->client->types);;
handle_node_event(this, ev);
while (pw_client_node_transport_next_message(impl->transport, &message) == SPA_RESULT_OK) {
struct pw_client_node_message *msg = alloca(SPA_POD_SIZE(&message));
pw_client_node_transport_parse_message(impl->transport, msg);
handle_node_message(this, msg);
}
}
}
@ -999,8 +995,6 @@ proxy_init(struct proxy *this,
this->node = proxy_node;
pw_type_event_client_node_map(this->map, &this->type_event_client_node);
this->data_source.func = proxy_on_data_fd_events;
this->data_source.data = this;
this->data_source.fd = -1;

View file

@ -37,7 +37,7 @@ struct transport {
struct pw_memblock mem;
size_t offset;
struct spa_event current;
struct pw_client_node_message current;
uint32_t current_index;
};
/** \endcond */
@ -98,64 +98,64 @@ static void transport_reset_area(struct pw_client_node_transport *trans)
spa_ringbuffer_init(trans->output_buffer, OUTPUT_BUFFER_SIZE);
}
static int add_event(struct pw_client_node_transport *trans, struct spa_event *event)
static int add_message(struct pw_client_node_transport *trans, struct pw_client_node_message *message)
{
struct transport *impl = (struct transport *) trans;
int32_t filled, avail;
uint32_t size, index;
if (impl == NULL || event == NULL)
if (impl == NULL || message == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
filled = spa_ringbuffer_get_write_index(trans->output_buffer, &index);
avail = trans->output_buffer->size - filled;
size = SPA_POD_SIZE(event);
size = SPA_POD_SIZE(message);
if (avail < size)
return SPA_RESULT_ERROR;
spa_ringbuffer_write_data(trans->output_buffer,
trans->output_data,
index & trans->output_buffer->mask, event, size);
index & trans->output_buffer->mask, message, size);
spa_ringbuffer_write_update(trans->output_buffer, index + size);
return SPA_RESULT_OK;
}
static int next_event(struct pw_client_node_transport *trans, struct spa_event *event)
static int next_message(struct pw_client_node_transport *trans, struct pw_client_node_message *message)
{
struct transport *impl = (struct transport *) trans;
int32_t avail;
if (impl == NULL || event == NULL)
if (impl == NULL || message == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
avail = spa_ringbuffer_get_read_index(trans->input_buffer, &impl->current_index);
if (avail < sizeof(struct spa_event))
if (avail < sizeof(struct pw_client_node_message))
return SPA_RESULT_ENUM_END;
spa_ringbuffer_read_data(trans->input_buffer,
trans->input_data,
impl->current_index & trans->input_buffer->mask,
&impl->current, sizeof(struct spa_event));
&impl->current, sizeof(struct pw_client_node_message));
*event = impl->current;
*message = impl->current;
return SPA_RESULT_OK;
}
static int parse_event(struct pw_client_node_transport *trans, void *event)
static int parse_message(struct pw_client_node_transport *trans, void *message)
{
struct transport *impl = (struct transport *) trans;
uint32_t size;
if (impl == NULL || event == NULL)
if (impl == NULL || message == NULL)
return SPA_RESULT_INVALID_ARGUMENTS;
size = SPA_POD_SIZE(&impl->current);
spa_ringbuffer_read_data(trans->input_buffer,
trans->input_data,
impl->current_index & trans->input_buffer->mask, event, size);
impl->current_index & trans->input_buffer->mask, message, size);
spa_ringbuffer_read_update(trans->input_buffer, impl->current_index + size);
return SPA_RESULT_OK;
@ -194,9 +194,9 @@ pw_client_node_transport_new(uint32_t max_input_ports, uint32_t max_output_ports
transport_setup_area(impl->mem.ptr, trans);
transport_reset_area(trans);
trans->add_event = add_event;
trans->next_event = next_event;
trans->parse_event = parse_event;
trans->add_message = add_message;
trans->next_message = next_message;
trans->parse_message = parse_message;
return trans;
}
@ -236,9 +236,9 @@ pw_client_node_transport_new_from_info(struct pw_client_node_transport_info *inf
trans->output_data = trans->input_data;
trans->input_data = tmp;
trans->add_event = add_event;
trans->next_event = next_event;
trans->parse_event = parse_event;
trans->add_message = add_message;
trans->next_message = next_message;
trans->parse_message = parse_message;
return trans;

View file

@ -42,7 +42,6 @@
struct remote {
struct pw_remote this;
uint32_t type_client_node;
struct pw_type_event_client_node type_event_client_node;
struct pw_listener core_listener;
};
@ -221,7 +220,6 @@ struct pw_remote *pw_remote_new(struct pw_core *core,
this->properties = properties;
impl->type_client_node = spa_type_map_get_id(core->type.map, PW_TYPE_INTERFACE__ClientNode);
pw_type_event_client_node_map(core->type.map, &impl->type_event_client_node);
this->state = PW_REMOTE_STATE_UNCONNECTED;
pw_map_init(&this->objects, 64, 32);
@ -404,14 +402,12 @@ static void unhandle_socket(struct pw_proxy *proxy)
}
}
static void handle_rtnode_event(struct pw_proxy *proxy, struct spa_event *event)
static void handle_rtnode_message(struct pw_proxy *proxy, struct pw_client_node_message *message)
{
struct node_data *data = proxy->user_data;
struct pw_remote *remote = proxy->remote;
struct remote *this = SPA_CONTAINER_OF(remote, struct remote, this);
struct spa_graph_node *n = &data->node->rt.node;
if (SPA_EVENT_TYPE(event) == this->type_event_client_node.ProcessInput) {
if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT) {
struct spa_list ready;
struct spa_graph_port *port;
@ -422,13 +418,13 @@ static void handle_rtnode_event(struct pw_proxy *proxy, struct spa_event *event)
spa_graph_scheduler_chain(data->node->rt.sched, &ready);
}
else if (SPA_EVENT_TYPE(event) == this->type_event_client_node.ProcessOutput) {
else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT) {
n->callbacks->process_output(n->callbacks_data);
}
else if (SPA_EVENT_TYPE(event) == this->type_event_client_node.ReuseBuffer) {
else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) {
}
else {
pw_log_warn("unexpected node event %d", SPA_EVENT_TYPE(event));
pw_log_warn("unexpected node message %d", PW_CLIENT_NODE_MESSAGE_TYPE(message));
}
}
@ -446,15 +442,15 @@ on_rtsocket_condition(struct spa_loop_utils *utils,
}
if (mask & SPA_IO_IN) {
struct spa_event event;
struct pw_client_node_message message;
uint64_t cmd;
read(data->rtreadfd, &cmd, 8);
while (pw_client_node_transport_next_event(data->trans, &event) == SPA_RESULT_OK) {
struct spa_event *ev = alloca(SPA_POD_SIZE(&event));
pw_client_node_transport_parse_event(data->trans, ev);
handle_rtnode_event(proxy, ev);
while (pw_client_node_transport_next_message(data->trans, &message) == SPA_RESULT_OK) {
struct pw_client_node_message *msg = alloca(SPA_POD_SIZE(&message));
pw_client_node_transport_parse_message(data->trans, msg);
handle_rtnode_message(proxy, msg);
}
}
}
@ -913,22 +909,20 @@ static const struct pw_client_node_proxy_events client_node_events = {
static void node_need_input(void *data)
{
struct node_data *d = data;
struct remote *this = SPA_CONTAINER_OF(d->remote, struct remote, this);
uint64_t cmd = 1;
pw_client_node_transport_add_event(d->trans,
&SPA_EVENT_INIT(this->type_event_client_node.NeedInput));
pw_client_node_transport_add_message(d->trans,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_NEED_INPUT));
write(d->rtwritefd, &cmd, 8);
}
static void node_have_output(void *data)
{
struct node_data *d = data;
struct remote *this = SPA_CONTAINER_OF(d->remote, struct remote, this);
uint64_t cmd = 1;
pw_client_node_transport_add_event(d->trans,
&SPA_EVENT_INIT(this->type_event_client_node.HaveOutput));
pw_client_node_transport_add_message(d->trans,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT));
write(d->rtwritefd, &cmd, 8);
}

View file

@ -63,7 +63,6 @@ struct stream {
struct pw_stream this;
uint32_t type_client_node;
struct pw_type_event_client_node type_event_client_node;
uint32_t n_possible_formats;
struct spa_format **possible_formats;
@ -208,7 +207,6 @@ struct pw_stream *pw_stream_new(struct pw_remote *remote,
this->remote = remote;
this->name = strdup(name);
impl->type_client_node = spa_type_map_get_id(remote->core->type.map, PW_TYPE_INTERFACE__ClientNode);
pw_type_event_client_node_map(remote->core->type.map, &impl->type_event_client_node);
pw_listener_list_init(&this->listener_list);
@ -386,8 +384,8 @@ static inline void send_need_input(struct pw_stream *stream)
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
uint64_t cmd = 1;
pw_client_node_transport_add_event(impl->trans,
&SPA_EVENT_INIT(impl->type_event_client_node.NeedInput));
pw_client_node_transport_add_message(impl->trans,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_NEED_INPUT));
write(impl->rtwritefd, &cmd, 8);
#endif
}
@ -397,8 +395,8 @@ static inline void send_have_output(struct pw_stream *stream)
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
uint64_t cmd = 1;
pw_client_node_transport_add_event(impl->trans,
&SPA_EVENT_INIT(impl->type_event_client_node.HaveOutput));
pw_client_node_transport_add_message(impl->trans,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT));
write(impl->rtwritefd, &cmd, 8);
}
@ -482,11 +480,11 @@ static inline void reuse_buffer(struct pw_stream *stream, uint32_t id)
}
}
static void handle_rtnode_event(struct pw_stream *stream, struct spa_event *event)
static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_node_message *message)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
if (SPA_EVENT_TYPE(event) == impl->type_event_client_node.ProcessInput) {
if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT) {
int i;
for (i = 0; i < impl->trans->area->n_input_ports; i++) {
@ -502,7 +500,7 @@ static void handle_rtnode_event(struct pw_stream *stream, struct spa_event *even
input->buffer_id = SPA_ID_INVALID;
}
send_need_input(stream);
} else if (SPA_EVENT_TYPE(event) == impl->type_event_client_node.ProcessOutput) {
} else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT) {
int i;
for (i = 0; i < impl->trans->area->n_output_ports; i++) {
@ -518,9 +516,9 @@ static void handle_rtnode_event(struct pw_stream *stream, struct spa_event *even
impl->in_need_buffer = true;
pw_listener_list_emit_na(&stream->listener_list, struct pw_stream_events, need_buffer);
impl->in_need_buffer = false;
} else if (SPA_EVENT_TYPE(event) == impl->type_event_client_node.ReuseBuffer) {
struct pw_event_client_node_reuse_buffer *p =
(struct pw_event_client_node_reuse_buffer *) event;
} else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) {
struct pw_client_node_message_reuse_buffer *p =
(struct pw_client_node_message_reuse_buffer *) message;
if (p->body.port_id.value != impl->port_id)
return;
@ -529,7 +527,7 @@ static void handle_rtnode_event(struct pw_stream *stream, struct spa_event *even
reuse_buffer(stream, p->body.buffer_id.value);
} else {
pw_log_warn("unexpected node event %d", SPA_EVENT_TYPE(event));
pw_log_warn("unexpected node message %d", PW_CLIENT_NODE_MESSAGE_TYPE(message));
}
}
@ -547,16 +545,15 @@ on_rtsocket_condition(struct spa_loop_utils *utils,
}
if (mask & SPA_IO_IN) {
struct spa_event event;
struct pw_client_node_message message;
uint64_t cmd;
read(impl->rtreadfd, &cmd, 8);
while (pw_client_node_transport_next_event(impl->trans, &event) == SPA_RESULT_OK) {
struct spa_event *ev = alloca(SPA_POD_SIZE(&event));
pw_client_node_transport_parse_event(impl->trans, ev);
pw_pod_remap(&ev->pod, &stream->remote->types);;
handle_rtnode_event(stream, ev);
while (pw_client_node_transport_next_message(impl->trans, &message) == SPA_RESULT_OK) {
struct pw_client_node_message *msg = alloca(SPA_POD_SIZE(&message));
pw_client_node_transport_parse_message(impl->trans, msg);
handle_rtnode_message(stream, msg);
}
}
}
@ -1018,8 +1015,8 @@ uint32_t pw_stream_get_empty_buffer(struct pw_stream *stream)
bool pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct pw_event_client_node_reuse_buffer rb = PW_EVENT_CLIENT_NODE_REUSE_BUFFER_INIT
(impl->type_event_client_node.ReuseBuffer, impl->port_id, id);
struct pw_client_node_message_reuse_buffer rb = PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER_INIT
(impl->port_id, id);
struct buffer_id *bid;
uint64_t cmd = 1;
@ -1029,7 +1026,7 @@ bool pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id)
bid->used = false;
spa_list_insert(impl->free.prev, &bid->link);
pw_client_node_transport_add_event(impl->trans, (struct spa_event *) &rb);
pw_client_node_transport_add_message(impl->trans, (struct pw_client_node_message *) &rb);
write(impl->rtwritefd, &cmd, 8);
return true;