diff --git a/pipewire/client/context.c b/pipewire/client/context.c index 68b364e8d..bd783fd0a 100644 --- a/pipewire/client/context.c +++ b/pipewire/client/context.c @@ -119,7 +119,8 @@ static void core_event_done(void *object, uint32_t seq) pw_core_do_sync(this->core_proxy, 1); } else if (seq == 1) { context_set_state(this, PW_CONTEXT_STATE_CONNECTED, NULL); - } + } else + pw_signal_emit(&this->sync_done, this, seq); } static void core_event_error(void *object, uint32_t id, int res, const char *error, ...) @@ -467,6 +468,7 @@ struct pw_context *pw_context_new(struct pw_loop *loop, pw_signal_init(&this->state_changed); pw_signal_init(&this->subscription); pw_signal_init(&this->destroy_signal); + pw_signal_init(&this->sync_done); return this; diff --git a/pipewire/client/context.h b/pipewire/client/context.h index 33a31ad86..43a8e65fa 100644 --- a/pipewire/client/context.h +++ b/pipewire/client/context.h @@ -173,6 +173,8 @@ struct pw_context { /** Signal emited when the context is destroyed */ PW_SIGNAL(destroy_signal, (struct pw_listener *listener, struct pw_context *context)); + + PW_SIGNAL(sync_done, (struct pw_listener *listener, struct pw_context *context, int seq)); }; struct pw_context * diff --git a/pipewire/client/interfaces.h b/pipewire/client/interfaces.h index 01e8c3cb2..0462fcec0 100644 --- a/pipewire/client/interfaces.h +++ b/pipewire/client/interfaces.h @@ -307,14 +307,18 @@ struct pw_client_node_buffer { struct spa_buffer *buffer; /**< buffer describing metadata and buffer memory */ }; -#define PW_CLIENT_NODE_METHOD_UPDATE 0 -#define PW_CLIENT_NODE_METHOD_PORT_UPDATE 1 -#define PW_CLIENT_NODE_METHOD_EVENT 2 -#define PW_CLIENT_NODE_METHOD_DESTROY 3 -#define PW_CLIENT_NODE_METHOD_NUM 4 +#define PW_CLIENT_NODE_METHOD_DONE 0 +#define PW_CLIENT_NODE_METHOD_UPDATE 1 +#define PW_CLIENT_NODE_METHOD_PORT_UPDATE 2 +#define PW_CLIENT_NODE_METHOD_EVENT 3 +#define PW_CLIENT_NODE_METHOD_DESTROY 4 +#define PW_CLIENT_NODE_METHOD_NUM 5 /** \ref pw_client_node methods */ struct pw_client_node_methods { + /** Complete an async operation */ + void (*done) (void *object, int seq, int res); + /** * Update the node ports and properties * @@ -373,37 +377,27 @@ struct pw_client_node_methods { void (*destroy) (void *object); }; +#define pw_client_node_do_done(r,...) ((struct pw_client_node_methods*)r->iface->methods)->done(r,__VA_ARGS__) #define pw_client_node_do_update(r,...) ((struct pw_client_node_methods*)r->iface->methods)->update(r,__VA_ARGS__) #define pw_client_node_do_port_update(r,...) ((struct pw_client_node_methods*)r->iface->methods)->port_update(r,__VA_ARGS__) #define pw_client_node_do_event(r,...) ((struct pw_client_node_methods*)r->iface->methods)->event(r,__VA_ARGS__) #define pw_client_node_do_destroy(r) ((struct pw_client_node_methods*)r->iface->methods)->destroy(r) -#define PW_CLIENT_NODE_EVENT_DONE 0 -#define PW_CLIENT_NODE_EVENT_SET_PROPS 1 -#define PW_CLIENT_NODE_EVENT_EVENT 2 -#define PW_CLIENT_NODE_EVENT_ADD_PORT 3 -#define PW_CLIENT_NODE_EVENT_REMOVE_PORT 4 -#define PW_CLIENT_NODE_EVENT_SET_FORMAT 5 -#define PW_CLIENT_NODE_EVENT_SET_PARAM 6 -#define PW_CLIENT_NODE_EVENT_ADD_MEM 7 -#define PW_CLIENT_NODE_EVENT_USE_BUFFERS 8 -#define PW_CLIENT_NODE_EVENT_NODE_COMMAND 9 -#define PW_CLIENT_NODE_EVENT_PORT_COMMAND 10 -#define PW_CLIENT_NODE_EVENT_TRANSPORT 11 -#define PW_CLIENT_NODE_EVENT_NUM 12 +#define PW_CLIENT_NODE_EVENT_SET_PROPS 0 +#define PW_CLIENT_NODE_EVENT_EVENT 1 +#define PW_CLIENT_NODE_EVENT_ADD_PORT 2 +#define PW_CLIENT_NODE_EVENT_REMOVE_PORT 3 +#define PW_CLIENT_NODE_EVENT_SET_FORMAT 4 +#define PW_CLIENT_NODE_EVENT_SET_PARAM 5 +#define PW_CLIENT_NODE_EVENT_ADD_MEM 6 +#define PW_CLIENT_NODE_EVENT_USE_BUFFERS 7 +#define PW_CLIENT_NODE_EVENT_NODE_COMMAND 8 +#define PW_CLIENT_NODE_EVENT_PORT_COMMAND 9 +#define PW_CLIENT_NODE_EVENT_TRANSPORT 10 +#define PW_CLIENT_NODE_EVENT_NUM 11 /** \ref pw_client_node events */ struct pw_client_node_events { - /** - * Notify the creation of the client node - * - * A set of sockets are exchanged that are used to notify when - * commands are available for reading and writing. - * - * \param readfd the fd used for receiving commands - * \param writefd the fd used for sending command - */ - void (*done) (void *object, int readfd, int writefd); /** * Notify of a property change * @@ -538,10 +532,9 @@ struct pw_client_node_events { * \param offset the offset to map * \param size the size to map */ - void (*transport) (void *object, int memfd, uint32_t offset, uint32_t size); + void (*transport) (void *object, int readfd, int writefd, int memfd, uint32_t offset, uint32_t size); }; -#define pw_client_node_notify_done(r,...) ((struct pw_client_node_events*)r->iface->events)->done(r,__VA_ARGS__) #define pw_client_node_notify_set_props(r,...) ((struct pw_client_node_events*)r->iface->events)->props(r,__VA_ARGS__) #define pw_client_node_notify_event(r,...) ((struct pw_client_node_events*)r->iface->events)->event(r,__VA_ARGS__) #define pw_client_node_notify_add_port(r,...) ((struct pw_client_node_events*)r->iface->events)->add_port(r,__VA_ARGS__) diff --git a/pipewire/client/protocol-native.c b/pipewire/client/protocol-native.c index 97ef68aa9..e7ec11d33 100644 --- a/pipewire/client/protocol-native.c +++ b/pipewire/client/protocol-native.c @@ -409,6 +409,26 @@ static bool node_demarshal_info(void *object, void *data, size_t size) return true; } +static void +client_node_marshal_done(void *object, int seq, int res) +{ + struct pw_proxy *proxy = object; + struct pw_connection *connection = proxy->context->protocol_private; + struct builder b = { {NULL, 0, 0, NULL, write_pod}, connection }; + struct spa_pod_frame f; + + if (connection == NULL) + return; + + core_update_map(proxy->context); + + spa_pod_builder_struct(&b.b, &f, + SPA_POD_TYPE_INT, seq, + SPA_POD_TYPE_INT, res); + + pw_connection_end_write(connection, proxy->id, PW_CLIENT_NODE_METHOD_DONE, b.b.offset); +} + static void client_node_marshal_update(void *object, uint32_t change_mask, @@ -519,27 +539,6 @@ static void client_node_marshal_destroy(void *object) pw_connection_end_write(connection, proxy->id, PW_CLIENT_NODE_METHOD_DESTROY, b.b.offset); } -static bool client_node_demarshal_done(void *object, void *data, size_t size) -{ - struct pw_proxy *proxy = object; - struct spa_pod_iter it; - struct pw_connection *connection = proxy->context->protocol_private; - int32_t ridx, widx; - int readfd, writefd; - - if (!spa_pod_iter_struct(&it, data, size) || - !spa_pod_iter_get(&it, SPA_POD_TYPE_INT, &ridx, SPA_POD_TYPE_INT, &widx, 0)) - return false; - - readfd = pw_connection_get_fd(connection, ridx); - writefd = pw_connection_get_fd(connection, widx); - if (readfd == -1 || writefd == -1) - return false; - - ((struct pw_client_node_events *) proxy->implementation)->done(proxy, readfd, writefd); - return true; -} - static bool client_node_demarshal_set_props(void *object, void *data, size_t size) { struct pw_proxy *proxy = object; @@ -787,19 +786,27 @@ static bool client_node_demarshal_transport(void *object, void *data, size_t siz struct pw_proxy *proxy = object; struct spa_pod_iter it; struct pw_connection *connection = proxy->context->protocol_private; - uint32_t memfd_idx, offset, sz; - int memfd; + uint32_t ridx, widx, memfd_idx, offset, sz; + int readfd, writefd, memfd; if (!spa_pod_iter_struct(&it, data, size) || !spa_pod_iter_get(&it, + SPA_POD_TYPE_INT, &ridx, + SPA_POD_TYPE_INT, &widx, SPA_POD_TYPE_INT, &memfd_idx, SPA_POD_TYPE_INT, &offset, SPA_POD_TYPE_INT, &sz, 0)) return false; + readfd = pw_connection_get_fd(connection, ridx); + writefd = pw_connection_get_fd(connection, widx); memfd = pw_connection_get_fd(connection, memfd_idx); - ((struct pw_client_node_events *) proxy->implementation)->transport(proxy, memfd, offset, - sz); + if (readfd == -1 || writefd == -1 || memfd_idx == -1) + return false; + + ((struct pw_client_node_events *) proxy->implementation)->transport(proxy, + readfd, writefd, + memfd, offset, sz); return true; } @@ -941,6 +948,7 @@ static const struct pw_interface pw_protocol_native_client_registry_interface = }; static const struct pw_client_node_methods pw_protocol_native_client_client_node_methods = { + &client_node_marshal_done, &client_node_marshal_update, &client_node_marshal_port_update, &client_node_marshal_event, @@ -948,7 +956,6 @@ static const struct pw_client_node_methods pw_protocol_native_client_client_node }; static const demarshal_func_t pw_protocol_native_client_client_node_demarshal[] = { - &client_node_demarshal_done, &client_node_demarshal_set_props, &client_node_demarshal_event, &client_node_demarshal_add_port, diff --git a/pipewire/client/stream.c b/pipewire/client/stream.c index 92a7f83b7..a4df972e1 100644 --- a/pipewire/client/stream.c +++ b/pipewire/client/stream.c @@ -84,6 +84,7 @@ struct stream { struct pw_proxy *node_proxy; bool disconnecting; struct pw_listener node_proxy_destroy; + struct pw_listener node_proxy_sync_done; struct pw_transport *trans; @@ -405,11 +406,7 @@ static void add_async_complete(struct pw_stream *stream, uint32_t seq, int res) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - pw_client_node_do_event(impl->node_proxy, (struct spa_event *) - &SPA_EVENT_NODE_ASYNC_COMPLETE_INIT(stream->context->type. - event_node.AsyncComplete, seq, - res)); - + pw_client_node_do_done(impl->node_proxy, seq, res); } static void do_node_init(struct pw_stream *stream) @@ -625,18 +622,6 @@ handle_node_command(struct pw_stream *stream, uint32_t seq, const struct spa_com return true; } -static void client_node_done(void *object, int readfd, int writefd) -{ - struct pw_proxy *proxy = object; - struct pw_stream *stream = proxy->user_data; - - pw_log_info("stream %p: create client node done with fds %d %d", stream, readfd, writefd); - handle_socket(stream, readfd, writefd); - do_node_init(stream); - - stream_set_state(stream, PW_STREAM_STATE_CONFIGURE, NULL); -} - static void client_node_set_props(void *object, uint32_t seq, const struct spa_props *props) { @@ -856,7 +841,7 @@ client_node_port_command(void *object, pw_log_warn("port command not supported"); } -static void client_node_transport(void *object, int memfd, uint32_t offset, uint32_t size) +static void client_node_transport(void *object, int readfd, int writefd, int memfd, uint32_t offset, uint32_t size) { struct pw_proxy *proxy = object; struct pw_stream *stream = proxy->user_data; @@ -873,11 +858,13 @@ static void client_node_transport(void *object, int memfd, uint32_t offset, uint pw_transport_destroy(impl->trans); impl->trans = pw_transport_new_from_info(&info); - pw_log_debug("transport update %p", impl->trans); + pw_log_info("stream %p: create client transport %p with fds %d %d", stream, impl->trans, readfd, writefd); + handle_socket(stream, readfd, writefd); + + stream_set_state(stream, PW_STREAM_STATE_CONFIGURE, NULL); } static const struct pw_client_node_events client_node_events = { - &client_node_done, &client_node_set_props, &client_node_event, &client_node_add_port, @@ -902,6 +889,20 @@ static void on_node_proxy_destroy(struct pw_listener *listener, struct pw_proxy stream_set_state(this, PW_STREAM_STATE_UNCONNECTED, NULL); } +static void on_node_proxy_sync_done(struct pw_listener *listener, struct pw_context *context, int seq) +{ + struct stream *impl = SPA_CONTAINER_OF(listener, struct stream, node_proxy_sync_done); + struct pw_stream *this = &impl->this; + + if (seq != 2) + return; + + pw_log_info("stream %p: sync done %d", this, seq); + do_node_init(this); + + pw_signal_remove(&impl->node_proxy_sync_done); +} + /** Connect a stream for input or output on \a port_path. * \param stream a \ref pw_stream * \param direction the stream direction @@ -960,6 +961,10 @@ pw_stream_connect(struct pw_stream *stream, "client-node", "client-node", &stream->properties->dict, impl->node_proxy->id); + + pw_signal_add(&stream->context->sync_done, + &impl->node_proxy_sync_done, on_node_proxy_sync_done); + return true; } diff --git a/pipewire/modules/module-client-node.c b/pipewire/modules/module-client-node.c index efdd0035d..61efdcf80 100644 --- a/pipewire/modules/module-client-node.c +++ b/pipewire/modules/module-client-node.c @@ -43,19 +43,20 @@ static struct pw_node *create_node(struct pw_node_factory *factory, uint32_t new_id) { struct pw_client_node *node; - int res; - int readfd, writefd; node = pw_client_node_new(client, new_id, name, properties); if (node == NULL) goto no_mem; +#if 0 if ((res = pw_client_node_get_fds(node, &readfd, &writefd)) < 0) { pw_core_notify_error(client->core_resource, client->core_resource->id, SPA_RESULT_ERROR, "can't get data fds"); return NULL; } pw_client_node_notify_done(node->resource, readfd, writefd); +#endif + pw_core_notify_done(client->core_resource, 2); return node->node; diff --git a/pipewire/modules/module-protocol-native.c b/pipewire/modules/module-protocol-native.c index 0dfbe2683..67fc9eb38 100644 --- a/pipewire/modules/module-protocol-native.c +++ b/pipewire/modules/module-protocol-native.c @@ -112,7 +112,7 @@ static void on_before_iterate(struct pw_listener *listener, struct pw_loop *loop struct native_client *client, *tmp; spa_list_for_each_safe(client, tmp, &this->client_list, link) - pw_connection_flush(client->connection); + pw_connection_flush(client->connection); } static void diff --git a/pipewire/server/client-node.c b/pipewire/server/client-node.c index 63441b417..3406a519f 100644 --- a/pipewire/server/client-node.c +++ b/pipewire/server/client-node.c @@ -862,6 +862,18 @@ static int handle_node_event(struct proxy *this, struct spa_event *event) return SPA_RESULT_OK; } +static void +client_node_done(void *object, int seq, int res) +{ + struct pw_resource *resource = object; + struct pw_client_node *node = resource->object; + struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); + struct proxy *this = &impl->proxy; + + this->callbacks.done(&this->node, seq, res, this->user_data); +} + + static void client_node_update(void *object, uint32_t change_mask, @@ -935,6 +947,7 @@ static void client_node_destroy(void *object) } static struct pw_client_node_methods client_node_methods = { + &client_node_done, &client_node_update, &client_node_port_update, &client_node_event, @@ -1030,6 +1043,7 @@ static void on_initialized(struct pw_listener *listener, struct pw_node *node) struct impl *impl = SPA_CONTAINER_OF(listener, struct impl, initialized); struct pw_client_node *this = &impl->this; struct pw_transport_info info; + int readfd, writefd; if (this->resource == NULL) return; @@ -1038,8 +1052,10 @@ static void on_initialized(struct pw_listener *listener, struct pw_node *node) impl->transport->area->n_input_ports = node->info.n_input_ports; impl->transport->area->n_output_ports = node->info.n_output_ports; + pw_client_node_get_fds(this, &readfd, &writefd); pw_transport_get_info(impl->transport, &info); - pw_client_node_notify_transport(this->resource, info.memfd, info.offset, info.size); + + pw_client_node_notify_transport(this->resource, readfd, writefd, info.memfd, info.offset, info.size); } static void on_loop_changed(struct pw_listener *listener, struct pw_node *node) diff --git a/pipewire/server/node.c b/pipewire/server/node.c index 737d51b1b..faa1c2ecd 100644 --- a/pipewire/server/node.c +++ b/pipewire/server/node.c @@ -313,20 +313,21 @@ static int do_pull(struct pw_node *this) return res; } -static void on_node_event(struct spa_node *node, struct spa_event *event, void *user_data) +static void on_node_done(struct spa_node *node, int seq, int res, void *user_data) { struct pw_node *this = user_data; struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); - if (SPA_EVENT_TYPE(event) == this->core->type.event_node.AsyncComplete) { - struct spa_event_node_async_complete *ac = - (struct spa_event_node_async_complete *) event; + pw_log_debug("node %p: async complete event %d %d", this, seq, res); + pw_work_queue_complete(impl->work, this, seq, res); + pw_signal_emit(&this->async_complete, this, seq, res); +} - pw_log_debug("node %p: async complete event %d %d", this, ac->body.seq.value, - ac->body.res.value); - pw_work_queue_complete(impl->work, this, ac->body.seq.value, ac->body.res.value); - pw_signal_emit(&this->async_complete, this, ac->body.seq.value, ac->body.res.value); - } else if (SPA_EVENT_TYPE(event) == this->core->type.event_node.RequestClockUpdate) { +static void on_node_event(struct spa_node *node, struct spa_event *event, void *user_data) +{ + struct pw_node *this = user_data; + + if (SPA_EVENT_TYPE(event) == this->core->type.event_node.RequestClockUpdate) { send_clock_update(this); } } @@ -509,6 +510,7 @@ void pw_node_set_data_loop(struct pw_node *node, struct pw_data_loop *loop) } static const struct spa_node_callbacks node_callbacks = { + &on_node_done, &on_node_event, &on_node_need_input, &on_node_have_output, diff --git a/pipewire/server/protocol-native.c b/pipewire/server/protocol-native.c index 3edb0efd2..b214ca74e 100644 --- a/pipewire/server/protocol-native.c +++ b/pipewire/server/protocol-native.c @@ -463,22 +463,6 @@ static void client_marshal_info(void *object, struct pw_client_info *info) pw_connection_end_write(connection, resource->id, PW_CLIENT_EVENT_INFO, b.b.offset); } -static void client_node_marshal_done(void *object, int readfd, int writefd) -{ - struct pw_resource *resource = object; - struct pw_connection *connection = resource->client->protocol_private; - struct builder b = { {NULL, 0, 0, NULL, write_pod}, connection }; - struct spa_pod_frame f; - - core_update_map(resource->client); - - spa_pod_builder_struct(&b.b, &f, - SPA_POD_TYPE_INT, pw_connection_add_fd(connection, readfd), - SPA_POD_TYPE_INT, pw_connection_add_fd(connection, writefd)); - - pw_connection_end_write(connection, resource->id, PW_CLIENT_NODE_EVENT_DONE, b.b.offset); -} - static void client_node_marshal_set_props(void *object, uint32_t seq, const struct spa_props *props) { @@ -716,7 +700,8 @@ client_node_marshal_port_command(void *object, b.b.offset); } -static void client_node_marshal_transport(void *object, int memfd, uint32_t offset, uint32_t size) +static void client_node_marshal_transport(void *object, int readfd, int writefd, + int memfd, uint32_t offset, uint32_t size) { struct pw_resource *resource = object; struct pw_connection *connection = resource->client->protocol_private; @@ -726,6 +711,8 @@ static void client_node_marshal_transport(void *object, int memfd, uint32_t offs core_update_map(resource->client); spa_pod_builder_struct(&b.b, &f, + SPA_POD_TYPE_INT, pw_connection_add_fd(connection, readfd), + SPA_POD_TYPE_INT, pw_connection_add_fd(connection, writefd), SPA_POD_TYPE_INT, pw_connection_add_fd(connection, memfd), SPA_POD_TYPE_INT, offset, SPA_POD_TYPE_INT, size); @@ -733,6 +720,23 @@ static void client_node_marshal_transport(void *object, int memfd, uint32_t offs b.b.offset); } + +static bool client_node_demarshal_done(void *object, void *data, size_t size) +{ + struct pw_resource *resource = object; + struct spa_pod_iter it; + uint32_t seq, res; + + if (!spa_pod_iter_struct(&it, data, size) || + !spa_pod_iter_get(&it, + SPA_POD_TYPE_INT, &seq, + SPA_POD_TYPE_INT, &res, 0)) + return false; + + ((struct pw_client_node_methods *) resource->implementation)->done(resource, seq, res); + return true; +} + static bool client_node_demarshal_update(void *object, void *data, size_t size) { struct pw_resource *resource = object; @@ -925,6 +929,7 @@ const struct pw_interface pw_protocol_native_server_client_interface = { }; static const demarshal_func_t pw_protocol_native_server_client_node_demarshal[] = { + &client_node_demarshal_done, &client_node_demarshal_update, &client_node_demarshal_port_update, &client_node_demarshal_event, @@ -932,7 +937,6 @@ static const demarshal_func_t pw_protocol_native_server_client_node_demarshal[] }; static const struct pw_client_node_events pw_protocol_native_server_client_node_events = { - &client_node_marshal_done, &client_node_marshal_set_props, &client_node_marshal_event, &client_node_marshal_add_port, diff --git a/spa/include/spa/buffer.h b/spa/include/spa/buffer.h index e7ebf8f63..f5566b615 100644 --- a/spa/include/spa/buffer.h +++ b/spa/include/spa/buffer.h @@ -28,6 +28,12 @@ extern "C" { #include #include +/** \page page_buffer Buffers + * + * Buffers describe the data and metadata that is exchanged between + * ports of a node. + * + */ #define SPA_TYPE__Buffer SPA_TYPE_POINTER_BASE "Buffer" #define SPA_TYPE_BUFFER_BASE SPA_TYPE__Buffer ":" @@ -56,54 +62,34 @@ static inline void spa_type_data_map(struct spa_type_map *map, struct spa_type_d } } -/** - * spa_chunk: - * @offset: offset of valid data - * @size: size of valid data - * @stride: stride of data if applicable - */ +/** Chunk of memory */ struct spa_chunk { - uint32_t offset; - uint32_t size; - int32_t stride; + uint32_t offset; /**< offset of valid data */ + uint32_t size; /**< size of valid data */ + int32_t stride; /**< stride of valid data */ }; -/** - * spa_data: - * @type: memory type - * @flags: memory flags - * @fd: file descriptor - * @mapoffset: start offset when mapping @fd - * @maxsize: maximum size of the memory - * @data: pointer to memory - * @chunk: pointer to chunk with valid offset - */ +/** Data for a buffer */ struct spa_data { - uint32_t type; - uint32_t flags; - int fd; - uint32_t mapoffset; - uint32_t maxsize; - void *data; - struct spa_chunk *chunk; + uint32_t type; /**< memory type */ + uint32_t flags; /**< data flags */ + int fd; /**< optional fd for data */ + uint32_t mapoffset; /**< offset to map fd at */ + uint32_t maxsize; /**< max size of data */ + void *data; /**< optional data pointer */ + struct spa_chunk *chunk; /**< valid chunk of memory */ }; -/** - * spa_buffer: - * @id: buffer id - * @n_metas: number of metadata - * @metas: offset of array of @n_metas metadata - * @n_datas: number of data pointers - * @datas: offset of array of @n_datas data pointers - */ +/** A Buffer */ struct spa_buffer { - uint32_t id; - uint32_t n_metas; - struct spa_meta *metas; - uint32_t n_datas; - struct spa_data *datas; + uint32_t id; /**< the id of this buffer */ + uint32_t n_metas; /**< number of metadata elements */ + struct spa_meta *metas; /**< array of metadata */ + uint32_t n_datas; /**< number of data members */ + struct spa_data *datas; /**< array of data members */ }; +/** Find metadata in a buffer */ static inline void *spa_buffer_find_meta(struct spa_buffer *b, uint32_t type) { uint32_t i; diff --git a/spa/include/spa/event-node.h b/spa/include/spa/event-node.h index 505dfbd06..030f25ef3 100644 --- a/spa/include/spa/event-node.h +++ b/spa/include/spa/event-node.h @@ -32,14 +32,12 @@ extern "C" { #define SPA_TYPE_EVENT__Node SPA_TYPE_EVENT_BASE "Node" #define SPA_TYPE_EVENT_NODE_BASE SPA_TYPE_EVENT__Node ":" -#define SPA_TYPE_EVENT_NODE__AsyncComplete SPA_TYPE_EVENT_NODE_BASE "AsyncComplete" #define SPA_TYPE_EVENT_NODE__Error SPA_TYPE_EVENT_NODE_BASE "Error" #define SPA_TYPE_EVENT_NODE__Buffering SPA_TYPE_EVENT_NODE_BASE "Buffering" #define SPA_TYPE_EVENT_NODE__RequestRefresh SPA_TYPE_EVENT_NODE_BASE "RequestRefresh" #define SPA_TYPE_EVENT_NODE__RequestClockUpdate SPA_TYPE_EVENT_NODE_BASE "RequestClockUpdate" struct spa_type_event_node { - uint32_t AsyncComplete; uint32_t Error; uint32_t Buffering; uint32_t RequestRefresh; @@ -49,8 +47,7 @@ struct spa_type_event_node { static inline void spa_type_event_node_map(struct spa_type_map *map, struct spa_type_event_node *type) { - if (type->AsyncComplete == 0) { - type->AsyncComplete = spa_type_map_get_id(map, SPA_TYPE_EVENT_NODE__AsyncComplete); + if (type->Error == 0) { type->Error = spa_type_map_get_id(map, SPA_TYPE_EVENT_NODE__Error); type->Buffering = spa_type_map_get_id(map, SPA_TYPE_EVENT_NODE__Buffering); type->RequestRefresh = spa_type_map_get_id(map, SPA_TYPE_EVENT_NODE__RequestRefresh); @@ -58,23 +55,6 @@ spa_type_event_node_map(struct spa_type_map *map, struct spa_type_event_node *ty } } -struct spa_event_node_async_complete_body { - struct spa_pod_object_body body; - struct spa_pod_int seq SPA_ALIGNED(8); - struct spa_pod_int res SPA_ALIGNED(8); -}; - -struct spa_event_node_async_complete { - struct spa_pod pod; - struct spa_event_node_async_complete_body body; -}; - -#define SPA_EVENT_NODE_ASYNC_COMPLETE_INIT(type,seq,res) \ - SPA_EVENT_INIT_COMPLEX(struct spa_event_node_async_complete, \ - sizeof(struct spa_event_node_async_complete_body), type, \ - SPA_POD_INT_INIT(seq), \ - SPA_POD_INT_INIT(res)) - struct spa_event_node_request_clock_update_body { struct spa_pod_object_body body; #define SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_TIME (1 << 0) diff --git a/spa/include/spa/loop.h b/spa/include/spa/loop.h index 7117e639e..d653c2c05 100644 --- a/spa/include/spa/loop.h +++ b/spa/include/spa/loop.h @@ -81,6 +81,7 @@ struct spa_loop { void (*remove_source) (struct spa_source *source); + /** invoke a function in the context of this loop */ int (*invoke) (struct spa_loop *loop, spa_invoke_func_t func, uint32_t seq, diff --git a/spa/include/spa/meta.h b/spa/include/spa/meta.h index 6b30fc69c..9d5045d88 100644 --- a/spa/include/spa/meta.h +++ b/spa/include/spa/meta.h @@ -28,6 +28,10 @@ extern "C" { #include #include +/** \page page_meta Metadata + * + * Metadata contains extra information on a buffer. + */ #define SPA_TYPE__Meta SPA_TYPE_POINTER_BASE "Meta" #define SPA_TYPE_META_BASE SPA_TYPE__Meta ":" @@ -56,76 +60,51 @@ static inline void spa_type_meta_map(struct spa_type_map *map, struct spa_type_m } } -/** - * spa_meta_header: - * @flags: extra flags - * @seq: sequence number. This monotonically increments and with the rate, - * it can be used to derive a media time. - * @pts: The MONOTONIC time for @seq. - * @dts_offset: offset relative to @pts to start decoding this buffer. - */ +/** Describes essential buffer header metadata */ struct spa_meta_header { -#define SPA_META_HEADER_FLAG_DISCONT (1 << 0) /* data is not continous with previous buffer */ -#define SPA_META_HEADER_FLAG_CORRUPTED (1 << 1) /* data might be corrupted */ -#define SPA_META_HEADER_FLAG_MARKER (1 << 2) /* media specific marker */ -#define SPA_META_HEADER_FLAG_HEADER (1 << 3) /* data contains a codec specific header */ -#define SPA_META_HEADER_FLAG_GAP (1 << 4) /* data contains media neutral data */ -#define SPA_META_HEADER_FLAG_DELTA_UNIT (1 << 5) /* cannot be decoded independently */ - uint32_t flags; - uint32_t seq; - int64_t pts; - int64_t dts_offset; +#define SPA_META_HEADER_FLAG_DISCONT (1 << 0) /**< data is not continous with previous buffer */ +#define SPA_META_HEADER_FLAG_CORRUPTED (1 << 1) /**< data might be corrupted */ +#define SPA_META_HEADER_FLAG_MARKER (1 << 2) /**< media specific marker */ +#define SPA_META_HEADER_FLAG_HEADER (1 << 3) /**< data contains a codec specific header */ +#define SPA_META_HEADER_FLAG_GAP (1 << 4) /**< data contains media neutral data */ +#define SPA_META_HEADER_FLAG_DELTA_UNIT (1 << 5) /**< cannot be decoded independently */ + uint32_t flags; /**< flags */ + uint32_t seq; /**< sequence number, increments with a + * media specific frequency */ + int64_t pts; /**< presentation timestamp */ + int64_t dts_offset; /**< decoding timestamp and a difference with pts */ }; +/** Pointer metadata */ struct spa_meta_pointer { - uint32_t type; - void *ptr; + uint32_t type; /**< the pointer type */ + void *ptr; /**< the pointer */ }; -/** - * spa_meta_video_crop: - * @x: - * @y: - * @width: - * @height - */ +/** Video cropping metadata */ struct spa_meta_video_crop { - int32_t x, y; - int32_t width, height; + int32_t x, y; /**< x and y offsets */ + int32_t width, height; /**< width and height */ }; -/** - * spa_meta_ringbuffer: - * @ringbuffer: - */ +/** Ringbuffer metadata */ struct spa_meta_ringbuffer { - struct spa_ringbuffer ringbuffer; + struct spa_ringbuffer ringbuffer; /**< the ringbuffer */ }; -/** - * spa_meta_shared: - * @flags: flags - * @fd: the fd of the memory - * @offset: start offset of memory - * @size: size of the memory - */ +/** Describes the shared memory of a buffer is stored */ struct spa_meta_shared { - int32_t flags; - int fd; - int32_t offset; - uint32_t size; + int32_t flags; /**< flags */ + int fd; /**< file descriptor of memory */ + int32_t offset; /**< offset in memory */ + uint32_t size; /**< size of memory */ }; -/** - * spa_meta: - * @type: metadata type - * @data: pointer to metadata - * @size: size of metadata - */ +/** A metadata element */ struct spa_meta { - uint32_t type; - void *data; - uint32_t size; + uint32_t type; /**< metadata type */ + void *data; /**< pointer to metadata */ + uint32_t size; /**< size of metadata */ }; #ifdef __cplusplus diff --git a/spa/include/spa/node.h b/spa/include/spa/node.h index f4ec7a4ff..91c3f811a 100644 --- a/spa/include/spa/node.h +++ b/spa/include/spa/node.h @@ -82,6 +82,7 @@ struct spa_port_info { struct spa_node_callbacks { + void (*done) (struct spa_node *node, int seq, int res, void *user_data); /** * struct spa_node_callbacks::event: * @node: a #struct spa_node @@ -133,7 +134,6 @@ struct spa_node_callbacks { uint32_t port_id, uint32_t buffer_id, void *user_data); - void (*done) (struct spa_node *node, int seq, int res, void *user_data); }; /** diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index 317b6cea1..974750f28 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -88,11 +88,11 @@ static int impl_node_set_props(struct spa_node *node, const struct spa_props *pr return SPA_RESULT_OK; } -static int do_send_event(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) +static int do_send_done(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) { struct state *this = user_data; - this->callbacks.event(&this->node, data, this->user_data); + this->callbacks.done(&this->node, seq, *(int*)data, this->user_data); return SPA_RESULT_OK; } @@ -111,11 +111,10 @@ static int do_command(struct spa_loop *loop, bool async, uint32_t seq, size_t si if (async) { spa_loop_invoke(this->main_loop, - do_send_event, - SPA_ID_INVALID, - sizeof(struct spa_event_node_async_complete), - &SPA_EVENT_NODE_ASYNC_COMPLETE_INIT(this->type.event_node.AsyncComplete, - seq, res), + do_send_done, + seq, + sizeof(res), + &res, this); } return res; diff --git a/spa/plugins/alsa/alsa-source.c b/spa/plugins/alsa/alsa-source.c index 374deb0bc..92f4d471a 100644 --- a/spa/plugins/alsa/alsa-source.c +++ b/spa/plugins/alsa/alsa-source.c @@ -90,11 +90,11 @@ static int impl_node_set_props(struct spa_node *node, const struct spa_props *pr return SPA_RESULT_OK; } -static int do_send_event(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) +static int do_send_done(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) { struct state *this = user_data; - this->callbacks.event(&this->node, data, this->user_data); + this->callbacks.done(&this->node, seq, *(int*)data, this->user_data); return SPA_RESULT_OK; } @@ -108,11 +108,11 @@ static int do_start(struct spa_loop *loop, bool async, uint32_t seq, size_t size if (async) { spa_loop_invoke(this->main_loop, - do_send_event, - SPA_ID_INVALID, - sizeof(struct spa_event_node_async_complete), - &SPA_EVENT_NODE_ASYNC_COMPLETE_INIT(this->type.event_node.AsyncComplete, - seq, res), this); + do_send_done, + seq, + sizeof(res), + &res, + this); } return res; } @@ -126,11 +126,11 @@ static int do_pause(struct spa_loop *loop, bool async, uint32_t seq, size_t size if (async) { spa_loop_invoke(this->main_loop, - do_send_event, - SPA_ID_INVALID, - sizeof(struct spa_event_node_async_complete), - &SPA_EVENT_NODE_ASYNC_COMPLETE_INIT(this->type.event_node.AsyncComplete, - seq, res), this); + do_send_done, + seq, + sizeof(res), + &res, + this); } return res; } diff --git a/spa/plugins/meson.build b/spa/plugins/meson.build index f6c6c5caf..986fcdf69 100644 --- a/spa/plugins/meson.build +++ b/spa/plugins/meson.build @@ -3,6 +3,7 @@ subdir('audiomixer') subdir('audiotestsrc') subdir('ffmpeg') subdir('logger') +#subdir('support') subdir('test') subdir('videotestsrc') subdir('volume') diff --git a/spa/plugins/v4l2/v4l2-source.c b/spa/plugins/v4l2/v4l2-source.c index c9b74f72f..476baf494 100644 --- a/spa/plugins/v4l2/v4l2-source.c +++ b/spa/plugins/v4l2/v4l2-source.c @@ -232,12 +232,12 @@ static int do_pause_done(struct spa_loop *loop, void *user_data) { struct impl *this = user_data; - struct spa_event_node_async_complete *ac = data; + int res = *(int*)data; - if (SPA_RESULT_IS_OK(ac->body.res.value)) - ac->body.res.value = spa_v4l2_stream_off(this); + if (SPA_RESULT_IS_OK(res)) + res = spa_v4l2_stream_off(this); - this->callbacks.event(&this->node, (struct spa_event *) ac, this->user_data); + this->callbacks.done(&this->node, seq, res, this->user_data); return SPA_RESULT_OK; } @@ -259,9 +259,8 @@ static int do_pause(struct spa_loop *loop, spa_loop_invoke(this->out_ports[0].main_loop, do_pause_done, seq, - sizeof(struct spa_event_node_async_complete), - &SPA_EVENT_NODE_ASYNC_COMPLETE_INIT(this->type.event_node.AsyncComplete, - seq, res), + sizeof(res), + &res, this); } return res; @@ -275,9 +274,9 @@ static int do_start_done(struct spa_loop *loop, void *user_data) { struct impl *this = user_data; - struct spa_event_node_async_complete *ac = data; + int res = *(int*)data; - this->callbacks.event(&this->node, (struct spa_event *) ac, this->user_data); + this->callbacks.done(&this->node, seq, res, this->user_data); return SPA_RESULT_OK; } @@ -299,9 +298,8 @@ static int do_start(struct spa_loop *loop, spa_loop_invoke(this->out_ports[0].main_loop, do_start_done, seq, - sizeof(struct spa_event_node_async_complete), - &SPA_EVENT_NODE_ASYNC_COMPLETE_INIT(this->type.event_node.AsyncComplete, - seq, res), + sizeof(res), + &res, this); } return SPA_RESULT_OK; diff --git a/spa/tests/test-graph.c b/spa/tests/test-graph.c index f132a8b30..1372fe0d6 100644 --- a/spa/tests/test-graph.c +++ b/spa/tests/test-graph.c @@ -215,6 +215,11 @@ static int make_node(struct data *data, struct spa_node **node, const char *lib, return SPA_RESULT_ERROR; } +static void on_sink_done(struct spa_node *node, int seq, int res, void *user_data) +{ + printf("got done %d %d\n", seq, res); +} + static void on_sink_event(struct spa_node *node, struct spa_event *event, void *user_data) { printf("got event %d\n", SPA_EVENT_TYPE(event)); @@ -239,6 +244,7 @@ on_sink_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id } static const struct spa_node_callbacks sink_callbacks = { + &on_sink_done, &on_sink_event, &on_sink_need_input, NULL, diff --git a/spa/tests/test-mixer.c b/spa/tests/test-mixer.c index 124eee57e..2f1e5c513 100644 --- a/spa/tests/test-mixer.c +++ b/spa/tests/test-mixer.c @@ -226,6 +226,11 @@ static int make_node(struct data *data, struct spa_node **node, const char *lib, return SPA_RESULT_ERROR; } +static void on_sink_done(struct spa_node *node, int seq, int res, void *user_data) +{ + printf("got done %d %d\n", seq, res); +} + static void on_sink_event(struct spa_node *node, struct spa_event *event, void *user_data) { printf("got event %d\n", SPA_EVENT_TYPE(event)); @@ -281,6 +286,7 @@ on_sink_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id } static const struct spa_node_callbacks sink_callbacks = { + &on_sink_done, &on_sink_event, &on_sink_need_input, NULL, diff --git a/spa/tests/test-perf.c b/spa/tests/test-perf.c index 04446321e..4a62e9b8a 100644 --- a/spa/tests/test-perf.c +++ b/spa/tests/test-perf.c @@ -240,6 +240,12 @@ static void on_source_push(struct data *data) } } +static void on_sink_done(struct spa_node *node, int seq, int res, void *user_data) +{ + struct data *data = user_data; + spa_log_trace(data->log, "got sink done %d %d", seq, res); +} + static void on_sink_event(struct spa_node *node, struct spa_event *event, void *user_data) { struct data *data = user_data; @@ -264,12 +270,19 @@ on_sink_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id } static const struct spa_node_callbacks sink_callbacks = { + &on_sink_done, &on_sink_event, &on_sink_need_input, NULL, &on_sink_reuse_buffer }; +static void on_source_done(struct spa_node *node, int seq, int res, void *user_data) +{ + struct data *data = user_data; + spa_log_trace(data->log, "got source done %d %d", seq, res); +} + static void on_source_event(struct spa_node *node, struct spa_event *event, void *user_data) { struct data *data = user_data; @@ -286,6 +299,7 @@ static void on_source_have_output(struct spa_node *node, void *user_data) } static const struct spa_node_callbacks source_callbacks = { + &on_source_done, &on_source_event, NULL, &on_source_have_output, diff --git a/spa/tests/test-ringbuffer.c b/spa/tests/test-ringbuffer.c index ccd127d90..9c6f0da9b 100644 --- a/spa/tests/test-ringbuffer.c +++ b/spa/tests/test-ringbuffer.c @@ -205,6 +205,11 @@ static int make_node(struct data *data, struct spa_node **node, const char *lib, return SPA_RESULT_ERROR; } +static void on_sink_done(struct spa_node *node, int seq, int res, void *user_data) +{ + printf("got done %d %d\n", seq, res); +} + static void on_sink_event(struct spa_node *node, struct spa_event *event, void *user_data) { printf("got event %d\n", SPA_EVENT_TYPE(event)); @@ -231,6 +236,7 @@ on_sink_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id } static const struct spa_node_callbacks sink_callbacks = { + &on_sink_done, &on_sink_event, &on_sink_need_input, NULL, diff --git a/spa/tests/test-v4l2.c b/spa/tests/test-v4l2.c index 3182306e7..325e1cdeb 100644 --- a/spa/tests/test-v4l2.c +++ b/spa/tests/test-v4l2.c @@ -176,6 +176,11 @@ static void handle_events(struct data *data) } } +static void on_source_done(struct spa_node *node, int seq, int res, void *user_data) +{ + printf("got done %d %d\n", seq, res); +} + static void on_source_event(struct spa_node *node, struct spa_event *event, void *user_data) { struct data *data = user_data; @@ -256,6 +261,7 @@ static void on_source_have_output(struct spa_node *node, void *user_data) } static const struct spa_node_callbacks source_callbacks = { + &on_source_done, &on_source_event, NULL, &on_source_have_output,