diff --git a/src/extensions/client-node.h b/src/extensions/client-node.h index 7b9e30d93..1c6cd4606 100644 --- a/src/extensions/client-node.h +++ b/src/extensions/client-node.h @@ -38,6 +38,8 @@ struct pw_client_node_proxy; #define PW_VERSION_CLIENT_NODE 0 +struct pw_client_node_message; + /** Shared structure between client and server \memberof pw_client_node */ struct pw_client_node_area { uint32_t max_input_ports; /**< max input ports of the node */ @@ -63,88 +65,86 @@ struct pw_client_node_transport { void *output_data; /**< output memory for ringbuffer */ struct spa_ringbuffer *output_buffer; /**< ringbuffer for output memory */ - /** Add an event to the transport - * \param trans the transport to send the event on - * \param event the event to add + /** Add a message to the transport + * \param trans the transport to send the message on + * \param message the message to add * \return 0 on success, < 0 on error * - * Write \a event to the shared ringbuffer. + * Write \a message to the shared ringbuffer. */ - int (*add_event) (struct pw_client_node_transport *trans, struct spa_event *event); + int (*add_message) (struct pw_client_node_transport *trans, struct pw_client_node_message *message); - /** Get next event from a transport - * \param trans the transport to get the event of - * \param[out] event the event to read - * \return 0 on success, < 0 on error, SPA_RESULT_ENUM_END when no more events + /** Get next message from a transport + * \param trans the transport to get the message of + * \param[out] message the message to read + * \return 0 on success, < 0 on error, SPA_RESULT_ENUM_END when no more messages * are available. * - * Get the skeleton next event from \a trans into \a event. This function will - * only read the head and object body of the event. + * Get the skeleton next message from \a trans into \a message. This function will + * only read the head and object body of the message. * - * After the complete size of the event has been calculated, you should call - * \ref parse_event() to read the complete event contents. + * After the complete size of the message has been calculated, you should call + * \ref parse_message() to read the complete message contents. */ - int (*next_event) (struct pw_client_node_transport *trans, struct spa_event *event); + int (*next_message) (struct pw_client_node_transport *trans, struct pw_client_node_message *message); - /** Parse the complete event on transport + /** Parse the complete message on transport * \param trans the transport to read from - * \param[out] event memory that can hold the complete event + * \param[out] message memory that can hold the complete message * \return 0 on success, < 0 on error * - * Use this function after \ref next_event(). - * + * Use this function after \ref next_message(). */ - int (*parse_event) (struct pw_client_node_transport *trans, void *event); + int (*parse_message) (struct pw_client_node_transport *trans, void *message); }; -#define pw_client_node_transport_add_event(t,e) ((t)->add_event((t), (e))) -#define pw_client_node_transport_next_event(t,e) ((t)->next_event((t), (e))) -#define pw_client_node_transport_parse_event(t,e) ((t)->parse_event((t), (e))) +#define pw_client_node_transport_add_message(t,m) ((t)->add_message((t), (m))) +#define pw_client_node_transport_next_message(t,m) ((t)->next_message((t), (m))) +#define pw_client_node_transport_parse_message(t,m) ((t)->parse_message((t), (m))) -#define PW_TYPE_EVENT__ClientNode SPA_TYPE_EVENT_BASE "ClientNode" -#define PW_TYPE_EVENT_CLIENT_NODE_BASE PW_TYPE_EVENT__ClientNode ":" - -#define PW_TYPE_EVENT_CLIENT_NODE__HaveOutput PW_TYPE_EVENT_CLIENT_NODE_BASE "HaveOutput" -#define PW_TYPE_EVENT_CLIENT_NODE__NeedInput PW_TYPE_EVENT_CLIENT_NODE_BASE "NeedInput" -#define PW_TYPE_EVENT_CLIENT_NODE__ReuseBuffer PW_TYPE_EVENT_CLIENT_NODE_BASE "ReuseBuffer" -#define PW_TYPE_EVENT_CLIENT_NODE__ProcessInput PW_TYPE_EVENT_CLIENT_NODE_BASE "ProcessInput" -#define PW_TYPE_EVENT_CLIENT_NODE__ProcessOutput PW_TYPE_EVENT_CLIENT_NODE_BASE "ProcessOutput" - -struct pw_type_event_client_node { - uint32_t HaveOutput; - uint32_t NeedInput; - uint32_t ReuseBuffer; - uint32_t ProcessInput; - uint32_t ProcessOutput; +enum pw_client_node_message_type { + PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT, + PW_CLIENT_NODE_MESSAGE_NEED_INPUT, + PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER, + PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT, + PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT, }; -static inline void -pw_type_event_client_node_map(struct spa_type_map *map, struct pw_type_event_client_node *type) -{ - if (type->HaveOutput == 0) { - type->HaveOutput = spa_type_map_get_id(map, PW_TYPE_EVENT_CLIENT_NODE__HaveOutput); - type->NeedInput = spa_type_map_get_id(map, PW_TYPE_EVENT_CLIENT_NODE__NeedInput); - type->ReuseBuffer = spa_type_map_get_id(map, PW_TYPE_EVENT_CLIENT_NODE__ReuseBuffer); - type->ProcessInput = spa_type_map_get_id(map, PW_TYPE_EVENT_CLIENT_NODE__ProcessInput); - type->ProcessOutput = spa_type_map_get_id(map, PW_TYPE_EVENT_CLIENT_NODE__ProcessOutput); - } -} - -struct pw_event_client_node_reuse_buffer_body { - struct spa_pod_object_body body; - struct spa_pod_int port_id; - struct spa_pod_int buffer_id; +struct pw_client_node_message_body { + struct spa_pod_int type SPA_ALIGNED(8); }; -struct pw_event_client_node_reuse_buffer { - struct spa_pod pod; - struct pw_event_client_node_reuse_buffer_body body; +struct pw_client_node_message { + struct spa_pod_struct pod; + struct pw_client_node_message_body body; }; -#define PW_EVENT_CLIENT_NODE_REUSE_BUFFER_INIT(type,port_id,buffer_id) \ - SPA_EVENT_INIT_COMPLEX(struct pw_event_client_node_reuse_buffer, \ - sizeof(struct pw_event_client_node_reuse_buffer_body), type, \ - SPA_POD_INT_INIT(port_id), \ +struct pw_client_node_message_reuse_buffer_body { + struct spa_pod_int type SPA_ALIGNED(8); + struct spa_pod_int port_id SPA_ALIGNED(8); + struct spa_pod_int buffer_id SPA_ALIGNED(8); +}; + +struct pw_client_node_message_reuse_buffer { + struct spa_pod_struct pod; + struct pw_client_node_message_reuse_buffer_body body; +}; + +#define PW_CLIENT_NODE_MESSAGE_TYPE(message) (((struct pw_client_node_message*)(message))->body.type.value) + +#define PW_CLIENT_NODE_MESSAGE_INIT(ev) (struct pw_client_node_message) \ + { { { sizeof(struct pw_client_node_message_body), SPA_POD_TYPE_STRUCT } }, \ + { SPA_POD_INT_INIT(ev) } } + +#define PW_CLIENT_NODE_MESSAGE_INIT_VA(type,size,message,...) (type) \ + { { { size, SPA_POD_TYPE_STRUCT } }, \ + { SPA_POD_INT_INIT(message), __VA_ARGS__ } } \ + +#define PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER_INIT(port_id,buffer_id) \ + PW_CLIENT_NODE_MESSAGE_INIT_VA(struct pw_client_node_message_reuse_buffer, \ + sizeof(struct pw_client_node_message_reuse_buffer_body), \ + PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER, \ + SPA_POD_INT_INIT(port_id), \ SPA_POD_INT_INIT(buffer_id)) diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 888460e3d..58487493d 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -93,8 +93,6 @@ struct proxy { struct spa_log *log; struct spa_loop *data_loop; - struct pw_type_event_client_node type_event_client_node; - const struct spa_node_callbacks *callbacks; void *callbacks_data; @@ -156,7 +154,7 @@ static inline void do_flush(struct proxy *this) { uint64_t cmd = 1; if (write(this->writefd, &cmd, 8) != 8) - spa_log_warn(this->log, "proxy %p: error writing event: %s", this, strerror(errno)); + spa_log_warn(this->log, "proxy %p: error flushing : %s", this, strerror(errno)); } @@ -721,9 +719,8 @@ spa_proxy_node_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32 spa_log_trace(this->log, "reuse buffer %d", buffer_id); { - struct pw_event_client_node_reuse_buffer rb = PW_EVENT_CLIENT_NODE_REUSE_BUFFER_INIT - (this->type_event_client_node.ReuseBuffer, port_id, buffer_id); - pw_client_node_transport_add_event(impl->transport, (struct spa_event *) &rb); + struct pw_client_node_message_reuse_buffer rb = PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER_INIT(port_id, buffer_id); + pw_client_node_transport_add_message(impl->transport, (struct pw_client_node_message *) &rb); } return SPA_RESULT_OK; @@ -768,8 +765,8 @@ static int spa_proxy_node_process_input(struct spa_node *node) impl->transport->inputs[i] = *io; io->status = SPA_RESULT_NEED_BUFFER; } - pw_client_node_transport_add_event(impl->transport, - &SPA_EVENT_INIT(this->type_event_client_node.ProcessInput)); + pw_client_node_transport_add_message(impl->transport, + &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT)); do_flush(this); if (this->callbacks->need_input) @@ -802,19 +799,19 @@ static int spa_proxy_node_process_output(struct spa_node *node) *io = tmp; pw_log_trace("%d %d %d", io->status, io->buffer_id, io->status); } - pw_client_node_transport_add_event(impl->transport, - &SPA_EVENT_INIT(this->type_event_client_node.ProcessOutput)); + pw_client_node_transport_add_message(impl->transport, + &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT)); do_flush(this); return res; } -static int handle_node_event(struct proxy *this, struct spa_event *event) +static int handle_node_message(struct proxy *this, struct pw_client_node_message *message) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, proxy); int i; - if (SPA_EVENT_TYPE(event) == this->type_event_client_node.HaveOutput) { + if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT) { for (i = 0; i < MAX_OUTPUTS; i++) { struct spa_port_io *io = this->out_ports[i].io; @@ -825,11 +822,11 @@ static int handle_node_event(struct proxy *this, struct spa_event *event) pw_log_trace("%d %d", io->status, io->buffer_id); } this->callbacks->have_output(this->callbacks_data); - } else if (SPA_EVENT_TYPE(event) == this->type_event_client_node.NeedInput) { + } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_NEED_INPUT) { this->callbacks->need_input(this->callbacks_data); - } else if (SPA_EVENT_TYPE(event) == this->type_event_client_node.ReuseBuffer) { - struct pw_event_client_node_reuse_buffer *p = - (struct pw_event_client_node_reuse_buffer *) event; + } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) { + struct pw_client_node_message_reuse_buffer *p = + (struct pw_client_node_message_reuse_buffer *) message; this->callbacks->reuse_buffer(this->callbacks_data, p->body.port_id.value, p->body.buffer_id.value); } @@ -930,18 +927,17 @@ static void proxy_on_data_fd_events(struct spa_source *source) } if (source->rmask & SPA_IO_IN) { - struct spa_event event; + struct pw_client_node_message message; uint64_t cmd; if (read(this->data_source.fd, &cmd, 8) != 8) - spa_log_warn(this->log, "proxy %p: error reading event: %s", + spa_log_warn(this->log, "proxy %p: error reading message: %s", this, strerror(errno)); - while (pw_client_node_transport_next_event(impl->transport, &event) == SPA_RESULT_OK) { - struct spa_event *ev = alloca(SPA_POD_SIZE(&event)); - pw_client_node_transport_parse_event(impl->transport, ev); - pw_pod_remap(&ev->pod, &this->resource->client->types);; - handle_node_event(this, ev); + while (pw_client_node_transport_next_message(impl->transport, &message) == SPA_RESULT_OK) { + struct pw_client_node_message *msg = alloca(SPA_POD_SIZE(&message)); + pw_client_node_transport_parse_message(impl->transport, msg); + handle_node_message(this, msg); } } } @@ -999,8 +995,6 @@ proxy_init(struct proxy *this, this->node = proxy_node; - pw_type_event_client_node_map(this->map, &this->type_event_client_node); - this->data_source.func = proxy_on_data_fd_events; this->data_source.data = this; this->data_source.fd = -1; diff --git a/src/modules/module-client-node/transport.c b/src/modules/module-client-node/transport.c index 7b256f522..b2c815029 100644 --- a/src/modules/module-client-node/transport.c +++ b/src/modules/module-client-node/transport.c @@ -37,7 +37,7 @@ struct transport { struct pw_memblock mem; size_t offset; - struct spa_event current; + struct pw_client_node_message current; uint32_t current_index; }; /** \endcond */ @@ -98,64 +98,64 @@ static void transport_reset_area(struct pw_client_node_transport *trans) spa_ringbuffer_init(trans->output_buffer, OUTPUT_BUFFER_SIZE); } -static int add_event(struct pw_client_node_transport *trans, struct spa_event *event) +static int add_message(struct pw_client_node_transport *trans, struct pw_client_node_message *message) { struct transport *impl = (struct transport *) trans; int32_t filled, avail; uint32_t size, index; - if (impl == NULL || event == NULL) + if (impl == NULL || message == NULL) return SPA_RESULT_INVALID_ARGUMENTS; filled = spa_ringbuffer_get_write_index(trans->output_buffer, &index); avail = trans->output_buffer->size - filled; - size = SPA_POD_SIZE(event); + size = SPA_POD_SIZE(message); if (avail < size) return SPA_RESULT_ERROR; spa_ringbuffer_write_data(trans->output_buffer, trans->output_data, - index & trans->output_buffer->mask, event, size); + index & trans->output_buffer->mask, message, size); spa_ringbuffer_write_update(trans->output_buffer, index + size); return SPA_RESULT_OK; } -static int next_event(struct pw_client_node_transport *trans, struct spa_event *event) +static int next_message(struct pw_client_node_transport *trans, struct pw_client_node_message *message) { struct transport *impl = (struct transport *) trans; int32_t avail; - if (impl == NULL || event == NULL) + if (impl == NULL || message == NULL) return SPA_RESULT_INVALID_ARGUMENTS; avail = spa_ringbuffer_get_read_index(trans->input_buffer, &impl->current_index); - if (avail < sizeof(struct spa_event)) + if (avail < sizeof(struct pw_client_node_message)) return SPA_RESULT_ENUM_END; spa_ringbuffer_read_data(trans->input_buffer, trans->input_data, impl->current_index & trans->input_buffer->mask, - &impl->current, sizeof(struct spa_event)); + &impl->current, sizeof(struct pw_client_node_message)); - *event = impl->current; + *message = impl->current; return SPA_RESULT_OK; } -static int parse_event(struct pw_client_node_transport *trans, void *event) +static int parse_message(struct pw_client_node_transport *trans, void *message) { struct transport *impl = (struct transport *) trans; uint32_t size; - if (impl == NULL || event == NULL) + if (impl == NULL || message == NULL) return SPA_RESULT_INVALID_ARGUMENTS; size = SPA_POD_SIZE(&impl->current); spa_ringbuffer_read_data(trans->input_buffer, trans->input_data, - impl->current_index & trans->input_buffer->mask, event, size); + impl->current_index & trans->input_buffer->mask, message, size); spa_ringbuffer_read_update(trans->input_buffer, impl->current_index + size); return SPA_RESULT_OK; @@ -194,9 +194,9 @@ pw_client_node_transport_new(uint32_t max_input_ports, uint32_t max_output_ports transport_setup_area(impl->mem.ptr, trans); transport_reset_area(trans); - trans->add_event = add_event; - trans->next_event = next_event; - trans->parse_event = parse_event; + trans->add_message = add_message; + trans->next_message = next_message; + trans->parse_message = parse_message; return trans; } @@ -236,9 +236,9 @@ pw_client_node_transport_new_from_info(struct pw_client_node_transport_info *inf trans->output_data = trans->input_data; trans->input_data = tmp; - trans->add_event = add_event; - trans->next_event = next_event; - trans->parse_event = parse_event; + trans->add_message = add_message; + trans->next_message = next_message; + trans->parse_message = parse_message; return trans; diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index c660399f8..b92f37863 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -42,7 +42,6 @@ struct remote { struct pw_remote this; uint32_t type_client_node; - struct pw_type_event_client_node type_event_client_node; struct pw_listener core_listener; }; @@ -221,7 +220,6 @@ struct pw_remote *pw_remote_new(struct pw_core *core, this->properties = properties; impl->type_client_node = spa_type_map_get_id(core->type.map, PW_TYPE_INTERFACE__ClientNode); - pw_type_event_client_node_map(core->type.map, &impl->type_event_client_node); this->state = PW_REMOTE_STATE_UNCONNECTED; pw_map_init(&this->objects, 64, 32); @@ -404,14 +402,12 @@ static void unhandle_socket(struct pw_proxy *proxy) } } -static void handle_rtnode_event(struct pw_proxy *proxy, struct spa_event *event) +static void handle_rtnode_message(struct pw_proxy *proxy, struct pw_client_node_message *message) { struct node_data *data = proxy->user_data; - struct pw_remote *remote = proxy->remote; - struct remote *this = SPA_CONTAINER_OF(remote, struct remote, this); struct spa_graph_node *n = &data->node->rt.node; - if (SPA_EVENT_TYPE(event) == this->type_event_client_node.ProcessInput) { + if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT) { struct spa_list ready; struct spa_graph_port *port; @@ -422,13 +418,13 @@ static void handle_rtnode_event(struct pw_proxy *proxy, struct spa_event *event) spa_graph_scheduler_chain(data->node->rt.sched, &ready); } - else if (SPA_EVENT_TYPE(event) == this->type_event_client_node.ProcessOutput) { + else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT) { n->callbacks->process_output(n->callbacks_data); } - else if (SPA_EVENT_TYPE(event) == this->type_event_client_node.ReuseBuffer) { + else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) { } else { - pw_log_warn("unexpected node event %d", SPA_EVENT_TYPE(event)); + pw_log_warn("unexpected node message %d", PW_CLIENT_NODE_MESSAGE_TYPE(message)); } } @@ -446,15 +442,15 @@ on_rtsocket_condition(struct spa_loop_utils *utils, } if (mask & SPA_IO_IN) { - struct spa_event event; + struct pw_client_node_message message; uint64_t cmd; read(data->rtreadfd, &cmd, 8); - while (pw_client_node_transport_next_event(data->trans, &event) == SPA_RESULT_OK) { - struct spa_event *ev = alloca(SPA_POD_SIZE(&event)); - pw_client_node_transport_parse_event(data->trans, ev); - handle_rtnode_event(proxy, ev); + while (pw_client_node_transport_next_message(data->trans, &message) == SPA_RESULT_OK) { + struct pw_client_node_message *msg = alloca(SPA_POD_SIZE(&message)); + pw_client_node_transport_parse_message(data->trans, msg); + handle_rtnode_message(proxy, msg); } } } @@ -913,22 +909,20 @@ static const struct pw_client_node_proxy_events client_node_events = { static void node_need_input(void *data) { struct node_data *d = data; - struct remote *this = SPA_CONTAINER_OF(d->remote, struct remote, this); uint64_t cmd = 1; - pw_client_node_transport_add_event(d->trans, - &SPA_EVENT_INIT(this->type_event_client_node.NeedInput)); + pw_client_node_transport_add_message(d->trans, + &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_NEED_INPUT)); write(d->rtwritefd, &cmd, 8); } static void node_have_output(void *data) { struct node_data *d = data; - struct remote *this = SPA_CONTAINER_OF(d->remote, struct remote, this); uint64_t cmd = 1; - pw_client_node_transport_add_event(d->trans, - &SPA_EVENT_INIT(this->type_event_client_node.HaveOutput)); + pw_client_node_transport_add_message(d->trans, + &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT)); write(d->rtwritefd, &cmd, 8); } diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 2ca06bd78..e682175d0 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -63,7 +63,6 @@ struct stream { struct pw_stream this; uint32_t type_client_node; - struct pw_type_event_client_node type_event_client_node; uint32_t n_possible_formats; struct spa_format **possible_formats; @@ -208,7 +207,6 @@ struct pw_stream *pw_stream_new(struct pw_remote *remote, this->remote = remote; this->name = strdup(name); impl->type_client_node = spa_type_map_get_id(remote->core->type.map, PW_TYPE_INTERFACE__ClientNode); - pw_type_event_client_node_map(remote->core->type.map, &impl->type_event_client_node); pw_listener_list_init(&this->listener_list); @@ -386,8 +384,8 @@ static inline void send_need_input(struct pw_stream *stream) struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); uint64_t cmd = 1; - pw_client_node_transport_add_event(impl->trans, - &SPA_EVENT_INIT(impl->type_event_client_node.NeedInput)); + pw_client_node_transport_add_message(impl->trans, + &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_NEED_INPUT)); write(impl->rtwritefd, &cmd, 8); #endif } @@ -397,8 +395,8 @@ static inline void send_have_output(struct pw_stream *stream) struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); uint64_t cmd = 1; - pw_client_node_transport_add_event(impl->trans, - &SPA_EVENT_INIT(impl->type_event_client_node.HaveOutput)); + pw_client_node_transport_add_message(impl->trans, + &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT)); write(impl->rtwritefd, &cmd, 8); } @@ -482,11 +480,11 @@ static inline void reuse_buffer(struct pw_stream *stream, uint32_t id) } } -static void handle_rtnode_event(struct pw_stream *stream, struct spa_event *event) +static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_node_message *message) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - if (SPA_EVENT_TYPE(event) == impl->type_event_client_node.ProcessInput) { + if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT) { int i; for (i = 0; i < impl->trans->area->n_input_ports; i++) { @@ -502,7 +500,7 @@ static void handle_rtnode_event(struct pw_stream *stream, struct spa_event *even input->buffer_id = SPA_ID_INVALID; } send_need_input(stream); - } else if (SPA_EVENT_TYPE(event) == impl->type_event_client_node.ProcessOutput) { + } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT) { int i; for (i = 0; i < impl->trans->area->n_output_ports; i++) { @@ -518,9 +516,9 @@ static void handle_rtnode_event(struct pw_stream *stream, struct spa_event *even impl->in_need_buffer = true; pw_listener_list_emit_na(&stream->listener_list, struct pw_stream_events, need_buffer); impl->in_need_buffer = false; - } else if (SPA_EVENT_TYPE(event) == impl->type_event_client_node.ReuseBuffer) { - struct pw_event_client_node_reuse_buffer *p = - (struct pw_event_client_node_reuse_buffer *) event; + } else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) { + struct pw_client_node_message_reuse_buffer *p = + (struct pw_client_node_message_reuse_buffer *) message; if (p->body.port_id.value != impl->port_id) return; @@ -529,7 +527,7 @@ static void handle_rtnode_event(struct pw_stream *stream, struct spa_event *even reuse_buffer(stream, p->body.buffer_id.value); } else { - pw_log_warn("unexpected node event %d", SPA_EVENT_TYPE(event)); + pw_log_warn("unexpected node message %d", PW_CLIENT_NODE_MESSAGE_TYPE(message)); } } @@ -547,16 +545,15 @@ on_rtsocket_condition(struct spa_loop_utils *utils, } if (mask & SPA_IO_IN) { - struct spa_event event; + struct pw_client_node_message message; uint64_t cmd; read(impl->rtreadfd, &cmd, 8); - while (pw_client_node_transport_next_event(impl->trans, &event) == SPA_RESULT_OK) { - struct spa_event *ev = alloca(SPA_POD_SIZE(&event)); - pw_client_node_transport_parse_event(impl->trans, ev); - pw_pod_remap(&ev->pod, &stream->remote->types);; - handle_rtnode_event(stream, ev); + while (pw_client_node_transport_next_message(impl->trans, &message) == SPA_RESULT_OK) { + struct pw_client_node_message *msg = alloca(SPA_POD_SIZE(&message)); + pw_client_node_transport_parse_message(impl->trans, msg); + handle_rtnode_message(stream, msg); } } } @@ -1018,8 +1015,8 @@ uint32_t pw_stream_get_empty_buffer(struct pw_stream *stream) bool pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct pw_event_client_node_reuse_buffer rb = PW_EVENT_CLIENT_NODE_REUSE_BUFFER_INIT - (impl->type_event_client_node.ReuseBuffer, impl->port_id, id); + struct pw_client_node_message_reuse_buffer rb = PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER_INIT + (impl->port_id, id); struct buffer_id *bid; uint64_t cmd = 1; @@ -1029,7 +1026,7 @@ bool pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id) bid->used = false; spa_list_insert(impl->free.prev, &bid->link); - pw_client_node_transport_add_event(impl->trans, (struct spa_event *) &rb); + pw_client_node_transport_add_message(impl->trans, (struct pw_client_node_message *) &rb); write(impl->rtwritefd, &cmd, 8); return true;