From 49d31ea0af99beedd83e3f482e731db0a5f32c95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Barnab=C3=A1s=20P=C5=91cze?= Date: Fri, 18 Jun 2021 23:36:35 +0200 Subject: [PATCH] pulse-server: split out reply, operation, client, stream Part of !776. --- src/modules/meson.build | 4 + src/modules/module-protocol-pulse/client.c | 291 +++++++++ src/modules/module-protocol-pulse/client.h | 114 ++++ .../ext-stream-restore.c | 4 +- src/modules/module-protocol-pulse/internal.h | 125 +--- src/modules/module-protocol-pulse/module.h | 1 + src/modules/module-protocol-pulse/operation.c | 67 ++ src/modules/module-protocol-pulse/operation.h | 44 ++ .../module-protocol-pulse/pending-sample.c | 1 + .../module-protocol-pulse/pulse-server.c | 577 ++---------------- src/modules/module-protocol-pulse/reply.c | 73 +++ src/modules/module-protocol-pulse/reply.h | 42 ++ src/modules/module-protocol-pulse/stream.c | 251 ++++++++ src/modules/module-protocol-pulse/stream.h | 115 ++++ 14 files changed, 1044 insertions(+), 665 deletions(-) create mode 100644 src/modules/module-protocol-pulse/client.c create mode 100644 src/modules/module-protocol-pulse/client.h create mode 100644 src/modules/module-protocol-pulse/operation.c create mode 100644 src/modules/module-protocol-pulse/operation.h create mode 100644 src/modules/module-protocol-pulse/reply.c create mode 100644 src/modules/module-protocol-pulse/reply.h create mode 100644 src/modules/module-protocol-pulse/stream.c create mode 100644 src/modules/module-protocol-pulse/stream.h diff --git a/src/modules/meson.build b/src/modules/meson.build index 3090f9f2b..8fae8c54f 100644 --- a/src/modules/meson.build +++ b/src/modules/meson.build @@ -133,16 +133,20 @@ endif pipewire_module_protocol_pulse_sources = [ 'module-protocol-pulse.c', + 'module-protocol-pulse/client.c', 'module-protocol-pulse/collect.c', 'module-protocol-pulse/dbus-name.c', 'module-protocol-pulse/format.c', 'module-protocol-pulse/manager.c', 'module-protocol-pulse/media-roles.c', 'module-protocol-pulse/message.c', + 'module-protocol-pulse/operation.c', 'module-protocol-pulse/pending-sample.c', 'module-protocol-pulse/pulse-server.c', + 'module-protocol-pulse/reply.c', 'module-protocol-pulse/sample.c', 'module-protocol-pulse/sample-play.c', + 'module-protocol-pulse/stream.c', 'module-protocol-pulse/utils.c', 'module-protocol-pulse/volume.c', 'module-protocol-pulse/modules/module-combine-sink.c', diff --git a/src/modules/module-protocol-pulse/client.c b/src/modules/module-protocol-pulse/client.c new file mode 100644 index 000000000..5f914e953 --- /dev/null +++ b/src/modules/module-protocol-pulse/client.c @@ -0,0 +1,291 @@ +/* PipeWire + * + * Copyright © 2020 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "client.h" +#include "commands.h" +#include "defs.h" +#include "internal.h" +#include "manager.h" +#include "message.h" +#include "operation.h" +#include "pending-sample.h" +#include "stream.h" + +static int client_free_stream(void *item, void *data) +{ + struct stream *s = item; + + stream_free(s); + return 0; +} + +/* + * tries to detach the client from the server, + * but it does not drop the server's reference + */ +bool client_detach(struct client *client) +{ + struct impl *impl = client->impl; + struct server *server = client->server; + + if (server == NULL) + return false; + + pw_log_debug("client %p: detaching from server %p", client, server); + + /* remove from the `server->clients` list */ + spa_list_remove(&client->link); + + server->n_clients--; + if (server->wait_clients > 0 && --server->wait_clients == 0) { + int mask = server->source->mask; + SPA_FLAG_SET(mask, SPA_IO_IN); + pw_loop_update_io(impl->loop, server->source, mask); + } + + client->server = NULL; + + return true; +} + +void client_disconnect(struct client *client) +{ + struct impl *impl = client->impl; + + if (client->disconnect) + return; + + /* the client must be detached from the server to disconnect */ + spa_assert(client->server == NULL); + + client->disconnect = true; + spa_list_append(&impl->cleanup_clients, &client->link); + + pw_map_for_each(&client->streams, client_free_stream, client); + + if (client->source) + pw_loop_destroy_source(impl->loop, client->source); + + if (client->manager) + pw_manager_destroy(client->manager); +} + +void client_free(struct client *client) +{ + struct impl *impl = client->impl; + struct pending_sample *p; + struct message *msg; + struct operation *o; + + pw_log_debug("client %p: free", client); + + client_detach(client); + client_disconnect(client); + + /* remove from the `impl->cleanup_clients` list */ + spa_list_remove(&client->link); + + spa_list_consume(p, &client->pending_samples, link) + pending_sample_free(p); + + spa_list_consume(msg, &client->out_messages, link) + message_free(impl, msg, true, false); + + spa_list_consume(o, &client->operations, link) + operation_free(o); + + if (client->core) { + client->disconnecting = true; + pw_core_disconnect(client->core); + } + + pw_map_clear(&client->streams); + + free(client->default_sink); + free(client->default_source); + + if (client->props) + pw_properties_free(client->props); + + if (client->routes) + pw_properties_free(client->routes); + + free(client); +} + +int client_queue_message(struct client *client, struct message *msg) +{ + struct impl *impl = client->impl; + int res, mask; + + if (msg == NULL) + return -EINVAL; + + if (msg->length == 0) { + res = 0; + goto error; + } else if (msg->length > msg->allocated) { + res = -ENOMEM; + goto error; + } + + msg->offset = 0; + spa_list_append(&client->out_messages, &msg->link); + + mask = client->source->mask; + if (!SPA_FLAG_IS_SET(mask, SPA_IO_OUT)) { + client->need_flush = true; + SPA_FLAG_SET(mask, SPA_IO_OUT); + pw_loop_update_io(impl->loop, client->source, mask); + } + + return 0; + +error: + message_free(impl, msg, false, false); + return res; +} + +int client_flush_messages(struct client *client) +{ + struct impl *impl = client->impl; + int res; + + while (true) { + struct message *m; + struct descriptor desc; + void *data; + size_t size; + + if (spa_list_is_empty(&client->out_messages)) + break; + + m = spa_list_first(&client->out_messages, struct message, link); + + if (client->out_index < sizeof(desc)) { + desc.length = htonl(m->length); + desc.channel = htonl(m->channel); + desc.offset_hi = 0; + desc.offset_lo = 0; + desc.flags = 0; + + data = SPA_PTROFF(&desc, client->out_index, void); + size = sizeof(desc) - client->out_index; + } else if (client->out_index < m->length + sizeof(desc)) { + uint32_t idx = client->out_index - sizeof(desc); + data = m->data + idx; + size = m->length - idx; + } else { + if (debug_messages && m->channel == SPA_ID_INVALID) + message_dump(SPA_LOG_LEVEL_INFO, m); + message_free(impl, m, true, false); + client->out_index = 0; + continue; + } + + while (true) { + res = send(client->source->fd, data, size, MSG_NOSIGNAL | MSG_DONTWAIT); + if (res < 0) { + res = -errno; + if (res == -EINTR) + continue; + if (res != -EAGAIN && res != -EWOULDBLOCK) + pw_log_warn("client %p: send channel:%d %zu, error %d: %m", + client, m->channel, size, res); + return res; + } + + client->out_index += res; + break; + } + } + + return 0; +} + +int client_queue_subscribe_event(struct client *client, uint32_t mask, uint32_t event, uint32_t id) +{ + struct impl *impl = client->impl; + struct message *reply, *m, *t; + + if (!(client->subscribed & mask)) + return 0; + + pw_log_debug("client %p: SUBSCRIBE event:%08x id:%u", client, event, id); + + if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) != SUBSCRIPTION_EVENT_NEW) { + spa_list_for_each_safe_reverse(m, t, &client->out_messages, link) { + if (m->extra[0] != COMMAND_SUBSCRIBE_EVENT) + continue; + if ((m->extra[1] ^ event) & SUBSCRIPTION_EVENT_FACILITY_MASK) + continue; + if (m->extra[2] != id) + continue; + + if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_REMOVE) { + /* This object is being removed, hence there is + * point in keeping the old events regarding + * entry in the queue. */ + message_free(impl, m, true, false); + pw_log_debug("client %p: dropped redundant event due to remove event", client); + continue; + } + + if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_CHANGE) { + /* This object has changed. If a "new" or "change" event for + * this object is still in the queue we can exit. */ + pw_log_debug("client %p: dropped redundant event due to change event", client); + return 0; + } + } + } + + reply = message_alloc(impl, -1, 0); + reply->extra[0] = COMMAND_SUBSCRIBE_EVENT, + reply->extra[1] = event, + reply->extra[2] = id, + + message_put(reply, + TAG_U32, COMMAND_SUBSCRIBE_EVENT, + TAG_U32, -1, + TAG_U32, event, + TAG_U32, id, + TAG_INVALID); + + return client_queue_message(client, reply); +} diff --git a/src/modules/module-protocol-pulse/client.h b/src/modules/module-protocol-pulse/client.h new file mode 100644 index 000000000..da0320748 --- /dev/null +++ b/src/modules/module-protocol-pulse/client.h @@ -0,0 +1,114 @@ +/* PipeWire + * + * Copyright © 2020 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef PULSER_SERVER_CLIENT_H +#define PULSER_SERVER_CLIENT_H + +#include +#include + +#include +#include +#include + +struct impl; +struct server; +struct message; +struct spa_source; +struct pw_properties; +struct pw_core; +struct pw_manager; +struct pw_manager_object; +struct pw_properties; + +struct descriptor { + uint32_t length; + uint32_t channel; + uint32_t offset_hi; + uint32_t offset_lo; + uint32_t flags; +}; + +struct client { + struct spa_list link; + struct impl *impl; + struct server *server; + + int ref; + const char *name; + + struct spa_source *source; + + uint32_t version; + + struct pw_properties *props; + + struct pw_core *core; + struct pw_manager *manager; + struct spa_hook manager_listener; + + uint32_t subscribed; + + struct pw_manager_object *metadata_default; + char *default_sink; + char *default_source; + struct pw_manager_object *metadata_routes; + struct pw_properties *routes; + + uint32_t connect_tag; + + uint32_t in_index; + uint32_t out_index; + struct descriptor desc; + struct message *message; + + struct pw_map streams; + struct spa_list out_messages; + + struct spa_list operations; + + struct spa_list pending_samples; + + unsigned int disconnect:1; + unsigned int disconnecting:1; + unsigned int need_flush:1; + + struct pw_manager_object *prev_default_sink; + struct pw_manager_object *prev_default_source; +}; + +bool client_detach(struct client *client); +void client_disconnect(struct client *client); +void client_free(struct client *client); +int client_queue_message(struct client *client, struct message *msg); +int client_flush_messages(struct client *client); +int client_queue_subscribe_event(struct client *client, uint32_t mask, uint32_t event, uint32_t id); + +static inline void client_unref(struct client *client) +{ + if (--client->ref == 0) + client_free(client); +} + +#endif /* PULSER_SERVER_CLIENT_H */ diff --git a/src/modules/module-protocol-pulse/ext-stream-restore.c b/src/modules/module-protocol-pulse/ext-stream-restore.c index 2b0529fc1..f960ff526 100644 --- a/src/modules/module-protocol-pulse/ext-stream-restore.c +++ b/src/modules/module-protocol-pulse/ext-stream-restore.c @@ -38,7 +38,7 @@ static int do_extension_stream_restore_test(struct client *client, uint32_t comm message_put(reply, TAG_U32, EXT_STREAM_RESTORE_VERSION, TAG_INVALID); - return send_message(client, reply); + return client_queue_message(client, reply); } static int key_from_name(const char *name, char *key, size_t maxlen) @@ -187,7 +187,7 @@ static int do_extension_stream_restore_read(struct client *client, uint32_t comm TAG_BOOLEAN, mute, TAG_INVALID); } - return send_message(client, reply); + return client_queue_message(client, reply); } static int do_extension_stream_restore_write(struct client *client, uint32_t command, uint32_t tag, struct message *m) diff --git a/src/modules/module-protocol-pulse/internal.h b/src/modules/module-protocol-pulse/internal.h index 6b8ec2fd2..961e6ca65 100644 --- a/src/modules/module-protocol-pulse/internal.h +++ b/src/modules/module-protocol-pulse/internal.h @@ -27,6 +27,8 @@ #include "config.h" +#include +#include #include #include @@ -48,14 +50,6 @@ struct defs { struct channel_map channel_map; }; -struct descriptor { - uint32_t length; - uint32_t channel; - uint32_t offset_hi; - uint32_t offset_lo; - uint32_t flags; -}; - struct stats { uint32_t n_allocated; uint32_t allocated; @@ -65,119 +59,6 @@ struct stats { }; struct impl; -struct server; -struct client; - -struct client { - struct spa_list link; - struct impl *impl; - struct server *server; - - int ref; - const char *name; - - struct spa_source *source; - - uint32_t version; - - struct pw_properties *props; - - struct pw_core *core; - struct pw_manager *manager; - struct spa_hook manager_listener; - - uint32_t subscribed; - - struct pw_manager_object *metadata_default; - char *default_sink; - char *default_source; - struct pw_manager_object *metadata_routes; - struct pw_properties *routes; - - uint32_t connect_tag; - - uint32_t in_index; - uint32_t out_index; - struct descriptor desc; - struct message *message; - - struct pw_map streams; - struct spa_list out_messages; - - struct spa_list operations; - - struct spa_list pending_samples; - - unsigned int disconnect:1; - unsigned int disconnecting:1; - unsigned int need_flush:1; - - struct pw_manager_object *prev_default_sink; - struct pw_manager_object *prev_default_source; -}; - -struct buffer_attr { - uint32_t maxlength; - uint32_t tlength; - uint32_t prebuf; - uint32_t minreq; - uint32_t fragsize; -}; - -struct stream { - uint32_t create_tag; - uint32_t channel; /* index in map */ - uint32_t id; /* id of global */ - - struct impl *impl; - struct client *client; -#define STREAM_TYPE_RECORD 0 -#define STREAM_TYPE_PLAYBACK 1 -#define STREAM_TYPE_UPLOAD 2 - uint32_t type; - enum pw_direction direction; - - struct pw_properties *props; - - struct pw_stream *stream; - struct spa_hook stream_listener; - - struct spa_io_rate_match *rate_match; - struct spa_ringbuffer ring; - void *buffer; - - int64_t read_index; - int64_t write_index; - uint64_t underrun_for; - uint64_t playing_for; - uint64_t ticks_base; - uint64_t timestamp; - int64_t delay; - - uint32_t missing; - uint32_t requested; - - struct sample_spec ss; - struct channel_map map; - struct buffer_attr attr; - uint32_t frame_size; - uint32_t rate; - - struct volume volume; - bool muted; - - uint32_t drain_tag; - unsigned int corked:1; - unsigned int draining:1; - unsigned int volume_set:1; - unsigned int muted_set:1; - unsigned int early_requests:1; - unsigned int adjust_latency:1; - unsigned int is_underrun:1; - unsigned int in_prebuf:1; - unsigned int done:1; - unsigned int killed:1; -}; struct server { struct spa_list link; @@ -216,6 +97,8 @@ struct impl { struct stats stat; }; +extern bool debug_messages; + int create_and_start_servers(struct impl *impl, const char *addresses, struct pw_array *servers); void server_free(struct server *server); diff --git a/src/modules/module-protocol-pulse/module.h b/src/modules/module-protocol-pulse/module.h index b4260bbcd..68341d24c 100644 --- a/src/modules/module-protocol-pulse/module.h +++ b/src/modules/module-protocol-pulse/module.h @@ -28,6 +28,7 @@ #include +#include "client.h" #include "internal.h" struct module; diff --git a/src/modules/module-protocol-pulse/operation.c b/src/modules/module-protocol-pulse/operation.c new file mode 100644 index 000000000..bf98a5e13 --- /dev/null +++ b/src/modules/module-protocol-pulse/operation.c @@ -0,0 +1,67 @@ +/* PipeWire + * + * Copyright © 2020 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include + +#include +#include + +#include "client.h" +#include "manager.h" +#include "operation.h" +#include "reply.h" + +int operation_new(struct client *client, uint32_t tag) +{ + struct operation *o; + + if ((o = calloc(1, sizeof(*o))) == NULL) + return -errno; + + o->client = client; + o->tag = tag; + + spa_list_append(&client->operations, &o->link); + pw_manager_sync(client->manager); + + pw_log_debug("client %p [%s]: new operation tag:%u", client, client->name, tag); + + return 0; +} + +void operation_free(struct operation *o) +{ + spa_list_remove(&o->link); + free(o); +} + +void operation_complete(struct operation *o) +{ + struct client *client = o->client; + + pw_log_info("client %p [%s]: tag:%u complete", client, client->name, o->tag); + + reply_simple_ack(client, o->tag); + operation_free(o); +} diff --git a/src/modules/module-protocol-pulse/operation.h b/src/modules/module-protocol-pulse/operation.h new file mode 100644 index 000000000..c509487ef --- /dev/null +++ b/src/modules/module-protocol-pulse/operation.h @@ -0,0 +1,44 @@ +/* PipeWire + * + * Copyright © 2020 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef PULSER_SERVER_OPERATION_H +#define PULSER_SERVER_OPERATION_H + +#include + +#include + +struct client; + +struct operation { + struct spa_list link; + struct client *client; + uint32_t tag; +}; + +int operation_new(struct client *client, uint32_t tag); +void operation_free(struct operation *o); +void operation_complete(struct operation *o); + +#endif /* PULSER_SERVER_OPERATION_H */ diff --git a/src/modules/module-protocol-pulse/pending-sample.c b/src/modules/module-protocol-pulse/pending-sample.c index b8838bd24..2f6b3f165 100644 --- a/src/modules/module-protocol-pulse/pending-sample.c +++ b/src/modules/module-protocol-pulse/pending-sample.c @@ -26,6 +26,7 @@ #include #include +#include "client.h" #include "internal.h" #include "pending-sample.h" #include "sample-play.h" diff --git a/src/modules/module-protocol-pulse/pulse-server.c b/src/modules/module-protocol-pulse/pulse-server.c index c326c503a..6feeb918d 100644 --- a/src/modules/module-protocol-pulse/pulse-server.c +++ b/src/modules/module-protocol-pulse/pulse-server.c @@ -71,6 +71,7 @@ #include "pipewire/extensions/metadata.h" #include "pulse-server.h" +#include "client.h" #include "collect.h" #include "commands.h" #include "dbus-name.h" @@ -78,9 +79,12 @@ #include "format.h" #include "internal.h" #include "message.h" +#include "operation.h" #include "pending-sample.h" +#include "reply.h" #include "sample.h" #include "sample-play.h" +#include "stream.h" #include "utils.h" #include "volume.h" @@ -100,13 +104,7 @@ #include "manager.h" -static bool debug_messages = false; - -struct operation { - struct spa_list link; - struct client *client; - uint32_t tag; -}; +bool debug_messages = false; struct latency_offset_data { int64_t prev_latency_offset; @@ -119,9 +117,6 @@ static void broadcast_subscribe_event(struct impl *impl, uint32_t mask, uint32_t #include "module.c" #include "message-handler.c" -static void client_free(struct client *client); -static void client_unref(struct client *client); - static struct sample *find_sample(struct impl *impl, uint32_t idx, const char *name) { union pw_map_item *item; @@ -138,311 +133,18 @@ static struct sample *find_sample(struct impl *impl, uint32_t idx, const char *n return NULL; } -static int flush_messages(struct client *client) -{ - struct impl *impl = client->impl; - int res; - - while (true) { - struct message *m; - struct descriptor desc; - void *data; - size_t size; - - if (spa_list_is_empty(&client->out_messages)) - break; - m = spa_list_first(&client->out_messages, struct message, link); - - if (client->out_index < sizeof(desc)) { - desc.length = htonl(m->length); - desc.channel = htonl(m->channel); - desc.offset_hi = 0; - desc.offset_lo = 0; - desc.flags = 0; - - data = SPA_PTROFF(&desc, client->out_index, void); - size = sizeof(desc) - client->out_index; - } else if (client->out_index < m->length + sizeof(desc)) { - uint32_t idx = client->out_index - sizeof(desc); - data = m->data + idx; - size = m->length - idx; - } else { - if (debug_messages && m->channel == SPA_ID_INVALID) - message_dump(SPA_LOG_LEVEL_INFO, m); - message_free(impl, m, true, false); - client->out_index = 0; - continue; - } - - while (true) { - res = send(client->source->fd, data, size, MSG_NOSIGNAL | MSG_DONTWAIT); - if (res < 0) { - res = -errno; - if (res == -EINTR) - continue; - if (res != -EAGAIN && res != -EWOULDBLOCK) - pw_log_warn("send channel:%d %zu, error %d: %m", m->channel, size, res); - return res; - } - client->out_index += res; - break; - } - } - return 0; -} - -static int send_message(struct client *client, struct message *m) -{ - struct impl *impl = client->impl; - int res, mask; - - if (m == NULL) - return -EINVAL; - - if (m->length == 0) { - res = 0; - goto error; - } else if (m->length > m->allocated) { - res = -ENOMEM; - goto error; - } - - m->offset = 0; - spa_list_append(&client->out_messages, &m->link); - - mask = client->source->mask; - if (!SPA_FLAG_IS_SET(mask, SPA_IO_OUT)) { - client->need_flush = true; - SPA_FLAG_SET(mask, SPA_IO_OUT); - pw_loop_update_io(impl->loop, client->source, mask); - } - return 0; -error: - message_free(impl, m, false, false); - return res; -} - -static struct message *reply_new(struct client *client, uint32_t tag) -{ - struct impl *impl = client->impl; - struct message *reply; - reply = message_alloc(impl, -1, 0); - pw_log_debug(NAME" %p: REPLY tag:%u", client, tag); - message_put(reply, - TAG_U32, COMMAND_REPLY, - TAG_U32, tag, - TAG_INVALID); - return reply; -} - -static int reply_simple_ack(struct client *client, uint32_t tag) -{ - struct message *reply = reply_new(client, tag); - return send_message(client, reply); -} - -static int reply_error(struct client *client, uint32_t command, uint32_t tag, int res) -{ - struct impl *impl = client->impl; - struct message *reply; - uint32_t error = res_to_err(res); - const char *name; - - if (command < COMMAND_MAX) - name = commands[command].name; - else - name = "invalid"; - - pw_log(res == -ENOENT ? SPA_LOG_LEVEL_INFO : SPA_LOG_LEVEL_WARN, - NAME" %p: [%s] ERROR command:%d (%s) tag:%u error:%u (%s)", - client, client->name, command, name, tag, error, spa_strerror(res)); - - reply = message_alloc(impl, -1, 0); - message_put(reply, - TAG_U32, COMMAND_ERROR, - TAG_U32, tag, - TAG_U32, error, - TAG_INVALID); - return send_message(client, reply); -} - -static int operation_new(struct client *client, uint32_t tag) -{ - struct operation *o; - - if ((o = calloc(1, sizeof(*o))) == NULL) - return -errno; - - o->client = client; - o->tag = tag; - spa_list_append(&client->operations, &o->link); - pw_manager_sync(client->manager); - pw_log_debug(NAME" %p: operation tag:%u", client, tag); - return 0; -} - -static void operation_free(struct operation *o) -{ - spa_list_remove(&o->link); - free(o); -} - -static void operation_complete(struct operation *o) -{ - struct client *client = o->client; - - pw_log_info(NAME" %p: [%s] tag:%u complete", client, client->name, o->tag); - reply_simple_ack(o->client, o->tag); - operation_free(o); -} - #include "extension.c" -static int send_underflow(struct stream *stream, int64_t offset, uint32_t underrun_for) -{ - struct client *client = stream->client; - struct impl *impl = client->impl; - struct message *reply; - - if (ratelimit_test(&impl->rate_limit, stream->timestamp)) { - pw_log_warn(NAME" %p: [%s] UNDERFLOW channel:%u offset:%"PRIi64" underrun:%u", - client, client->name, stream->channel, offset, underrun_for); - } - - reply = message_alloc(impl, -1, 0); - message_put(reply, - TAG_U32, COMMAND_UNDERFLOW, - TAG_U32, -1, - TAG_U32, stream->channel, - TAG_INVALID); - if (client->version >= 23) { - message_put(reply, - TAG_S64, offset, - TAG_INVALID); - } - return send_message(client, reply); -} - -static int send_subscribe_event(struct client *client, uint32_t mask, uint32_t event, uint32_t id) -{ - struct impl *impl = client->impl; - struct message *reply, *m, *t; - - if (!(client->subscribed & mask)) - return 0; - - pw_log_debug(NAME" %p: SUBSCRIBE event:%08x id:%u", client, event, id); - - if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) != SUBSCRIPTION_EVENT_NEW) { - spa_list_for_each_safe_reverse(m, t, &client->out_messages, link) { - if (m->extra[0] != COMMAND_SUBSCRIBE_EVENT) - continue; - if ((m->extra[1] ^ event) & SUBSCRIPTION_EVENT_FACILITY_MASK) - continue; - if (m->extra[2] != id) - continue; - - if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_REMOVE) { - /* This object is being removed, hence there is no - * point in keeping the old events regarding this - * entry in the queue. */ - message_free(impl, m, true, false); - pw_log_debug("Dropped redundant event due to remove event."); - continue; - } - if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_CHANGE) { - /* This object has changed. If a "new" or "change" event for - * this object is still in the queue we can exit. */ - pw_log_debug("Dropped redundant event due to change event."); - return 0; - } - } - } - - reply = message_alloc(impl, -1, 0); - reply->extra[0] = COMMAND_SUBSCRIBE_EVENT, - reply->extra[1] = event, - reply->extra[2] = id, - message_put(reply, - TAG_U32, COMMAND_SUBSCRIBE_EVENT, - TAG_U32, -1, - TAG_U32, event, - TAG_U32, id, - TAG_INVALID); - return send_message(client, reply); -} - static void broadcast_subscribe_event(struct impl *impl, uint32_t mask, uint32_t event, uint32_t id) { struct server *s; spa_list_for_each(s, &impl->servers, link) { struct client *c; spa_list_for_each(c, &s->clients, link) - send_subscribe_event(c, mask, event, id); + client_queue_subscribe_event(c, mask, event, id); } } -static int send_overflow(struct stream *stream) -{ - struct client *client = stream->client; - struct impl *impl = client->impl; - struct message *reply; - - pw_log_warn(NAME" %p: [%s] OVERFLOW channel:%u", client, - client->name, stream->channel); - - reply = message_alloc(impl, -1, 0); - message_put(reply, - TAG_U32, COMMAND_OVERFLOW, - TAG_U32, -1, - TAG_U32, stream->channel, - TAG_INVALID); - return send_message(client, reply); -} - -static int send_stream_killed(struct stream *stream) -{ - struct client *client = stream->client; - struct impl *impl = client->impl; - struct message *reply; - uint32_t command; - - command = stream->direction == PW_DIRECTION_OUTPUT ? - COMMAND_PLAYBACK_STREAM_KILLED : - COMMAND_RECORD_STREAM_KILLED; - - pw_log_info(NAME" %p: [%s] %s channel:%u", client, client->name, - commands[command].name, stream->channel); - - if (client->version < 23) - return 0; - - reply = message_alloc(impl, -1, 0); - message_put(reply, - TAG_U32, command, - TAG_U32, -1, - TAG_U32, stream->channel, - TAG_INVALID); - return send_message(client, reply); -} - -static int send_stream_started(struct stream *stream) -{ - struct client *client = stream->client; - struct impl *impl = client->impl; - struct message *reply; - - pw_log_debug(NAME" %p: STARTED channel:%u", client, stream->channel); - - reply = message_alloc(impl, -1, 0); - message_put(reply, - TAG_U32, COMMAND_STARTED, - TAG_U32, -1, - TAG_U32, stream->channel, - TAG_INVALID); - return send_message(client, reply); -} - static int do_command_auth(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; @@ -474,7 +176,7 @@ static int do_command_auth(struct client *client, uint32_t command, uint32_t tag TAG_U32, PROTOCOL_VERSION, TAG_INVALID); - return send_message(client, reply); + return client_queue_message(client, reply); } static int reply_set_client_name(struct client *client, uint32_t tag) @@ -498,7 +200,7 @@ static int reply_set_client_name(struct client *client, uint32_t tag) TAG_U32, id, /* client index */ TAG_INVALID); } - return send_message(client, reply); + return client_queue_message(client, reply); } static void manager_sync(void *data) @@ -534,7 +236,7 @@ static int send_object_event(struct client *client, struct pw_manager_object *o, uint32_t event = 0, mask = 0, res_id = o->id; if (pw_manager_object_is_sink(o)) { - send_subscribe_event(client, + client_queue_subscribe_event(client, SUBSCRIPTION_MASK_SINK, SUBSCRIPTION_EVENT_SINK | facility, res_id); @@ -568,7 +270,7 @@ static int send_object_event(struct client *client, struct pw_manager_object *o, event = SPA_ID_INVALID; if (event != SPA_ID_INVALID) - send_subscribe_event(client, + client_queue_subscribe_event(client, mask, event | facility, res_id); @@ -627,7 +329,7 @@ static void send_latency_offset_subscribe_event(struct client *client, struct pw d->initialized = true; if (changed) - send_subscribe_event(client, + client_queue_subscribe_event(client, SUBSCRIPTION_MASK_CARD, SUBSCRIPTION_EVENT_CARD | SUBSCRIPTION_EVENT_CHANGE, card_id); @@ -655,7 +357,7 @@ static void send_default_change_subscribe_event(struct client *client, bool sink } if (changed) - send_subscribe_event(client, + client_queue_subscribe_event(client, SUBSCRIPTION_MASK_SERVER, SUBSCRIPTION_EVENT_CHANGE | SUBSCRIPTION_EVENT_SERVER, @@ -874,90 +576,6 @@ static int do_subscribe(struct client *client, uint32_t command, uint32_t tag, s return reply_simple_ack(client, tag); } -static void stream_free(struct stream *stream) -{ - struct client *client = stream->client; - struct impl *impl = client->impl; - - pw_log_debug(NAME" %p: stream %p channel:%d", impl, stream, stream->channel); - - if (stream->drain_tag) - reply_error(client, -1, stream->drain_tag, -ENOENT); - - if (stream->killed) - send_stream_killed(stream); - - /* force processing of all pending messages before we destroy - * the stream */ - pw_loop_invoke(impl->loop, NULL, 0, NULL, 0, false, client); - - if (stream->channel != SPA_ID_INVALID) - pw_map_remove(&client->streams, stream->channel); - if (stream->stream) { - spa_hook_remove(&stream->stream_listener); - pw_stream_destroy(stream->stream); - } - pw_work_queue_cancel(impl->work_queue, stream, SPA_ID_INVALID); - - if (stream->buffer) - free(stream->buffer); - pw_properties_free(stream->props); - free(stream); -} - -static bool stream_prebuf_active(struct stream *stream) -{ - uint32_t index; - int32_t avail; - - avail = spa_ringbuffer_get_write_index(&stream->ring, &index); - if (stream->in_prebuf) - return avail < (int32_t) stream->attr.prebuf; - else - return stream->attr.prebuf > 0 && avail >= 0; -} - -static uint32_t stream_pop_missing(struct stream *stream) -{ - uint32_t missing; - - if (stream->missing <= 0) - return 0; - - if (stream->missing < stream->attr.minreq && - !stream_prebuf_active(stream)) - return 0; - - missing = stream->missing; - stream->requested += missing; - stream->missing = 0; - return missing; -} - -static int send_command_request(struct stream *stream) -{ - struct client *client = stream->client; - struct impl *impl = client->impl; - struct message *msg; - uint32_t size; - - size = stream_pop_missing(stream); - pw_log_debug(NAME" %p: REQUEST channel:%d %u", stream, stream->channel, size); - - if (size == 0) - return 0; - - msg = message_alloc(impl, -1, 0); - message_put(msg, - TAG_U32, COMMAND_REQUEST, - TAG_U32, -1, - TAG_U32, stream->channel, - TAG_U32, size, - TAG_INVALID); - - return send_message(client, msg); -} - static uint32_t frac_to_bytes_round_up(struct spa_fraction val, const struct sample_spec *ss) { uint64_t u; @@ -1135,7 +753,7 @@ static int reply_create_playback_stream(struct stream *stream) stream->create_tag = SPA_ID_INVALID; - return send_message(client, reply); + return client_queue_message(client, reply); } static void fix_record_buffer_attr(struct stream *s, struct buffer_attr *attr) @@ -1276,7 +894,7 @@ static int reply_create_record_stream(struct stream *stream) stream->create_tag = SPA_ID_INVALID; - return send_message(client, reply); + return client_queue_message(client, reply); } static void stream_control_info(void *data, uint32_t id, @@ -1470,9 +1088,9 @@ do_process_done(struct spa_loop *loop, stream->underrun_for = 0; stream->playing_for = 0; if (pd->underrun) - send_underflow(stream, stream->read_index, pd->underrun_for); + stream_send_underflow(stream, stream->read_index, pd->underrun_for); else - send_stream_started(stream); + stream_send_started(stream); } stream->missing += pd->missing; stream->missing = SPA_MIN(stream->missing, stream->attr.tlength); @@ -1480,7 +1098,7 @@ do_process_done(struct spa_loop *loop, if (stream->underrun_for != (uint64_t)-1) stream->underrun_for += pd->underrun_for; - send_command_request(stream); + stream_send_request(stream); } else { struct message *msg; stream->write_index += pd->write_inc; @@ -1523,7 +1141,7 @@ do_process_done(struct spa_loop *loop, index % stream->attr.maxlength, msg->data, towrite); - send_message(client, msg); + client_queue_message(client, msg); index += towrite; avail -= towrite; @@ -2257,7 +1875,7 @@ static int do_get_playback_latency(struct client *client, uint32_t command, uint TAG_U64, stream->playing_for, TAG_INVALID); } - return send_message(client, reply); + return client_queue_message(client, reply); } static int do_get_record_latency(struct client *client, uint32_t command, uint32_t tag, struct message *m) @@ -2292,7 +1910,7 @@ static int do_get_record_latency(struct client *client, uint32_t command, uint32 TAG_S64, stream->read_index, TAG_INVALID); - return send_message(client, reply); + return client_queue_message(client, reply); } static int do_create_upload_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m) @@ -2375,7 +1993,7 @@ static int do_create_upload_stream(struct client *client, uint32_t command, uint TAG_U32, stream->channel, TAG_U32, length, TAG_INVALID); - return send_message(client, reply); + return client_queue_message(client, reply); error_errno: res = -errno; @@ -2583,7 +2201,7 @@ static void sample_play_ready(void *data, uint32_t index) TAG_U32, index, TAG_INVALID); - send_message(client, reply); + client_queue_message(client, reply); } static void on_sample_done(void *obj, void *data, int res, uint32_t id) @@ -2763,31 +2381,6 @@ static int do_cork_stream(struct client *client, uint32_t command, uint32_t tag, return reply_simple_ack(client, tag); } -static void stream_flush(struct stream *stream) -{ - pw_stream_flush(stream->stream, false); - - if (stream->type == STREAM_TYPE_PLAYBACK) { - stream->ring.writeindex = stream->ring.readindex; - stream->write_index = stream->read_index; - - stream->missing = stream->attr.tlength - - SPA_MIN(stream->requested, stream->attr.tlength); - - if (stream->attr.prebuf > 0) - stream->in_prebuf = true; - - stream->playing_for = 0; - stream->underrun_for = -1; - stream->is_underrun = true; - - send_command_request(stream); - } else { - stream->ring.readindex = stream->ring.writeindex; - stream->read_index = stream->write_index; - } -} - static int do_flush_trigger_prebuf_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; @@ -3481,7 +3074,7 @@ static int do_get_server_info(struct client *client, uint32_t command, uint32_t TAG_CHANNEL_MAP, &impl->defs.channel_map, TAG_INVALID); } - return send_message(client, reply); + return client_queue_message(client, reply); } static int do_stat(struct client *client, uint32_t command, uint32_t tag, struct message *m) @@ -3500,7 +3093,7 @@ static int do_stat(struct client *client, uint32_t command, uint32_t tag, struct TAG_U32, impl->stat.sample_cache, /* sample cache size */ TAG_INVALID); - return send_message(client, reply); + return client_queue_message(client, reply); } static int do_lookup(struct client *client, uint32_t command, uint32_t tag, struct message *m) @@ -3527,7 +3120,7 @@ static int do_lookup(struct client *client, uint32_t command, uint32_t tag, stru TAG_U32, is_monitor ? o->id | MONITOR_FLAG : o->id, TAG_INVALID); - return send_message(client, reply); + return client_queue_message(client, reply); } static int do_drain_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m) @@ -4268,7 +3861,7 @@ static int do_get_info(struct client *client, uint32_t command, uint32_t tag, st if (module == NULL) goto error_noentity; fill_ext_module_info(client, reply, module); - return send_message(client, reply); + return client_queue_message(client, reply); } switch (command) { @@ -4342,7 +3935,7 @@ static int do_get_info(struct client *client, uint32_t command, uint32_t tag, st if ((res = fill_func(client, reply, o)) < 0) goto error; - return send_message(client, reply); + return client_queue_message(client, reply); error_protocol: res = -EPROTO; @@ -4427,7 +4020,7 @@ static int do_get_sample_info(struct client *client, uint32_t command, uint32_t if ((res = fill_sample_info(client, reply, sample)) < 0) goto error; - return send_message(client, reply); + return client_queue_message(client, reply); error: if (reply) @@ -4451,7 +4044,7 @@ static int do_get_sample_info_list(struct client *client, uint32_t command, uint continue; fill_sample_info(client, reply, s); } - return send_message(client, reply); + return client_queue_message(client, reply); } struct info_list_data { @@ -4520,7 +4113,7 @@ static int do_get_info_list(struct client *client, uint32_t command, uint32_t ta if (command == COMMAND_GET_MODULE_INFO_LIST) pw_map_for_each(&impl->modules, do_info_list_module, &info); - return send_message(client, info.reply); + return client_queue_message(client, info.reply); } static int do_set_stream_buffer_attr(struct client *client, uint32_t command, uint32_t tag, struct message *m) @@ -4603,7 +4196,7 @@ static int do_set_stream_buffer_attr(struct client *client, uint32_t command, ui TAG_INVALID); } } - return send_message(client, reply); + return client_queue_message(client, reply); } static int do_update_stream_sample_rate(struct client *client, uint32_t command, uint32_t tag, struct message *m) @@ -4931,7 +4524,7 @@ static void on_module_loaded(void *data, int result) message_put(reply, TAG_U32, module->idx, TAG_INVALID); - send_message(client, reply); + client_queue_message(client, reply); } else { pw_log_warn(NAME" %p: [%s] failed to load module id:%u name:%s result:%d (%s)", @@ -5079,7 +4672,7 @@ static int do_send_object_message(struct client *client, uint32_t command, uint3 reply = reply_new(client, tag); message_put(reply, TAG_STRING, response, TAG_INVALID); free(response); - return send_message(client, reply); + return client_queue_message(client, reply); } static int do_error_access(struct client *client, uint32_t command, uint32_t tag, struct message *m) @@ -5255,106 +4848,6 @@ const struct command commands[COMMAND_MAX] = [COMMAND_SEND_OBJECT_MESSAGE] = { "SEND_OBJECT_MESSAGE", do_send_object_message, }, }; -static int client_free_stream(void *item, void *data) -{ - struct stream *s = item; - stream_free(s); - return 0; -} - -/* - * tries to detach the client from the server, - * but it does not drop the server's reference - */ -static bool client_detach(struct client *client) -{ - struct impl *impl = client->impl; - struct server *server = client->server; - - if (server == NULL) - return false; - - pw_log_info(NAME" %p: client %p detaching", server, client); - - /* remove from the `server->clients` list */ - spa_list_remove(&client->link); - server->n_clients--; - if (server->wait_clients > 0 && --server->wait_clients == 0) { - int mask = server->source->mask; - SPA_FLAG_SET(mask, SPA_IO_IN); - pw_loop_update_io(impl->loop, server->source, mask); - } - client->server = NULL; - - return true; -} - -static void client_disconnect(struct client *client) -{ - struct impl *impl = client->impl; - - if (client->disconnect) - return; - - /* the client must be detached from the server to disconnect */ - spa_assert(client->server == NULL); - - client->disconnect = true; - spa_list_append(&impl->cleanup_clients, &client->link); - - pw_map_for_each(&client->streams, client_free_stream, client); - - if (client->source) - pw_loop_destroy_source(impl->loop, client->source); - if (client->manager) - pw_manager_destroy(client->manager); - -} - -static void client_free(struct client *client) -{ - struct impl *impl = client->impl; - struct message *msg; - struct pending_sample *p; - struct operation *o; - - pw_log_info(NAME" %p: client %p free", impl, client); - - client_detach(client); - client_disconnect(client); - - /* remove from the `impl->cleanup_clients` list */ - spa_list_remove(&client->link); - - spa_list_consume(p, &client->pending_samples, link) - pending_sample_free(p); - - spa_list_consume(msg, &client->out_messages, link) - message_free(impl, msg, true, false); - - spa_list_consume(o, &client->operations, link) - operation_free(o); - - if (client->core) { - client->disconnecting = true; - pw_core_disconnect(client->core); - } - pw_map_clear(&client->streams); - free(client->default_sink); - free(client->default_source); - if (client->props) - pw_properties_free(client->props); - if (client->routes) - pw_properties_free(client->routes); - free(client); -} - -static void client_unref(struct client *client) -{ - if (--client->ref == 0) - client_free(client); -} - static int handle_packet(struct client *client, struct message *msg) { struct impl *impl = client->impl; @@ -5450,7 +4943,7 @@ static int handle_memblock(struct client *client, struct message *msg) /* underrun, reported on reader side */ } else if (filled + msg->length > stream->attr.maxlength) { /* overrun */ - send_overflow(stream); + stream_send_overflow(stream); } /* always write data to ringbuffer, we expect the other side @@ -5582,7 +5075,7 @@ on_client_data(void *data, int fd, uint32_t mask) if (mask & SPA_IO_OUT || client->need_flush) { pw_log_trace(NAME" %p: can write", impl); client->need_flush = false; - res = flush_messages(client); + res = client_flush_messages(client); if (res >= 0) { int mask = client->source->mask; SPA_FLAG_CLEAR(mask, SPA_IO_OUT); diff --git a/src/modules/module-protocol-pulse/reply.c b/src/modules/module-protocol-pulse/reply.c new file mode 100644 index 000000000..abf192042 --- /dev/null +++ b/src/modules/module-protocol-pulse/reply.c @@ -0,0 +1,73 @@ +/* PipeWire + * + * Copyright © 2020 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include + +#include +#include + +#include "defs.h" +#include "client.h" +#include "commands.h" +#include "message.h" + +struct message *reply_new(const struct client *client, uint32_t tag) +{ + struct message *reply = message_alloc(client->impl, -1, 0); + + pw_log_debug("client %p: new reply tag:%u", client, tag); + + message_put(reply, + TAG_U32, COMMAND_REPLY, + TAG_U32, tag, + TAG_INVALID); + + return reply; +} + +int reply_error(struct client *client, uint32_t command, uint32_t tag, int res) +{ + struct impl *impl = client->impl; + struct message *reply; + uint32_t error = res_to_err(res); + const char *name; + + if (command < COMMAND_MAX) + name = commands[command].name; + else + name = "invalid"; + + pw_log(res == -ENOENT ? SPA_LOG_LEVEL_INFO : SPA_LOG_LEVEL_WARN, + "client %p [%s]: ERROR command:%d (%s) tag:%u error:%u (%s)", + client, client->name, command, name, tag, error, spa_strerror(res)); + + reply = message_alloc(impl, -1, 0); + message_put(reply, + TAG_U32, COMMAND_ERROR, + TAG_U32, tag, + TAG_U32, error, + TAG_INVALID); + + return client_queue_message(client, reply); +} diff --git a/src/modules/module-protocol-pulse/reply.h b/src/modules/module-protocol-pulse/reply.h new file mode 100644 index 000000000..1ca9ad10b --- /dev/null +++ b/src/modules/module-protocol-pulse/reply.h @@ -0,0 +1,42 @@ +/* PipeWire + * + * Copyright © 2020 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef PULSE_SERVER_REPLY_H +#define PULSE_SERVER_REPLY_H + +#include + +#include "client.h" + +struct message; + +struct message *reply_new(const struct client *client, uint32_t tag); +int reply_error(struct client *client, uint32_t command, uint32_t tag, int res); + +static inline int reply_simple_ack(struct client *client, uint32_t tag) +{ + return client_queue_message(client, reply_new(client, tag)); +} + +#endif /* PULSE_SERVER_REPLY_H */ diff --git a/src/modules/module-protocol-pulse/stream.c b/src/modules/module-protocol-pulse/stream.c new file mode 100644 index 000000000..ec9543dc2 --- /dev/null +++ b/src/modules/module-protocol-pulse/stream.c @@ -0,0 +1,251 @@ +/* PipeWire + * + * Copyright © 2020 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "client.h" +#include "commands.h" +#include "internal.h" +#include "message.h" +#include "reply.h" +#include "stream.h" + +void stream_free(struct stream *stream) +{ + struct client *client = stream->client; + struct impl *impl = client->impl; + + pw_log_debug("client %p: stream %p channel:%d", client, stream, stream->channel); + + if (stream->drain_tag) + reply_error(client, -1, stream->drain_tag, -ENOENT); + + if (stream->killed) + stream_send_killed(stream); + + /* force processing of all pending messages before we destroy + * the stream */ + pw_loop_invoke(impl->loop, NULL, 0, NULL, 0, false, client); + + if (stream->channel != SPA_ID_INVALID) + pw_map_remove(&client->streams, stream->channel); + + if (stream->stream) { + spa_hook_remove(&stream->stream_listener); + pw_stream_destroy(stream->stream); + } + + pw_work_queue_cancel(impl->work_queue, stream, SPA_ID_INVALID); + + if (stream->buffer) + free(stream->buffer); + + pw_properties_free(stream->props); + + free(stream); +} + +void stream_flush(struct stream *stream) +{ + pw_stream_flush(stream->stream, false); + + if (stream->type == STREAM_TYPE_PLAYBACK) { + stream->ring.writeindex = stream->ring.readindex; + stream->write_index = stream->read_index; + + stream->missing = stream->attr.tlength - + SPA_MIN(stream->requested, stream->attr.tlength); + + if (stream->attr.prebuf > 0) + stream->in_prebuf = true; + + stream->playing_for = 0; + stream->underrun_for = -1; + stream->is_underrun = true; + + stream_send_request(stream); + } else { + stream->ring.readindex = stream->ring.writeindex; + stream->read_index = stream->write_index; + } +} + +static bool stream_prebuf_active(struct stream *stream) +{ + uint32_t index; + int32_t avail; + + avail = spa_ringbuffer_get_write_index(&stream->ring, &index); + + if (stream->in_prebuf) + return avail < (int32_t) stream->attr.prebuf; + else + return stream->attr.prebuf > 0 && avail >= 0; +} + +uint32_t stream_pop_missing(struct stream *stream) +{ + uint32_t missing; + + if (stream->missing <= 0) + return 0; + + if (stream->missing < stream->attr.minreq && !stream_prebuf_active(stream)) + return 0; + + missing = stream->missing; + stream->requested += missing; + stream->missing = 0; + + return missing; +} + +int stream_send_underflow(struct stream *stream, int64_t offset, uint32_t underrun_for) +{ + struct client *client = stream->client; + struct impl *impl = client->impl; + struct message *reply; + + if (ratelimit_test(&impl->rate_limit, stream->timestamp)) { + pw_log_warn("client %p [%s]: stream %p UNDERFLOW channel:%u offset:%" PRIi64 " underrun:%u", + client, client->name, stream, stream->channel, offset, underrun_for); + } + + reply = message_alloc(impl, -1, 0); + message_put(reply, + TAG_U32, COMMAND_UNDERFLOW, + TAG_U32, -1, + TAG_U32, stream->channel, + TAG_INVALID); + + if (client->version >= 23) { + message_put(reply, + TAG_S64, offset, + TAG_INVALID); + } + + return client_queue_message(client, reply); +} + +int stream_send_overflow(struct stream *stream) +{ + struct client *client = stream->client; + struct impl *impl = client->impl; + struct message *reply; + + pw_log_warn("client %p [%s]: stream %p OVERFLOW channel:%u", + client, client->name, stream, stream->channel); + + reply = message_alloc(impl, -1, 0); + message_put(reply, + TAG_U32, COMMAND_OVERFLOW, + TAG_U32, -1, + TAG_U32, stream->channel, + TAG_INVALID); + + return client_queue_message(client, reply); +} + +int stream_send_killed(struct stream *stream) +{ + struct client *client = stream->client; + struct impl *impl = client->impl; + struct message *reply; + uint32_t command; + + command = stream->direction == PW_DIRECTION_OUTPUT ? + COMMAND_PLAYBACK_STREAM_KILLED : + COMMAND_RECORD_STREAM_KILLED; + + pw_log_info("client %p [%s]: stream %p %s channel:%u", + client, client->name, stream, + commands[command].name, stream->channel); + + if (client->version < 23) + return 0; + + reply = message_alloc(impl, -1, 0); + message_put(reply, + TAG_U32, command, + TAG_U32, -1, + TAG_U32, stream->channel, + TAG_INVALID); + + return client_queue_message(client, reply); +} + +int stream_send_started(struct stream *stream) +{ + struct client *client = stream->client; + struct impl *impl = client->impl; + struct message *reply; + + pw_log_debug("client %p [%s]: stream %p STARTED channel:%u", + client, client->name, stream, stream->channel); + + reply = message_alloc(impl, -1, 0); + message_put(reply, + TAG_U32, COMMAND_STARTED, + TAG_U32, -1, + TAG_U32, stream->channel, + TAG_INVALID); + + return client_queue_message(client, reply); +} + +int stream_send_request(struct stream *stream) +{ + struct client *client = stream->client; + struct impl *impl = client->impl; + struct message *msg; + uint32_t size; + + size = stream_pop_missing(stream); + pw_log_debug("stream %p: REQUEST channel:%d %u", stream, stream->channel, size); + + if (size == 0) + return 0; + + msg = message_alloc(impl, -1, 0); + message_put(msg, + TAG_U32, COMMAND_REQUEST, + TAG_U32, -1, + TAG_U32, stream->channel, + TAG_U32, size, + TAG_INVALID); + + return client_queue_message(client, msg); +} diff --git a/src/modules/module-protocol-pulse/stream.h b/src/modules/module-protocol-pulse/stream.h new file mode 100644 index 000000000..2b8f07077 --- /dev/null +++ b/src/modules/module-protocol-pulse/stream.h @@ -0,0 +1,115 @@ +/* PipeWire + * + * Copyright © 2020 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef PULSER_SERVER_STREAM_H +#define PULSER_SERVER_STREAM_H + +#include +#include + +#include +#include +#include + +#include "format.h" +#include "volume.h" + +struct impl; +struct client; +struct spa_io_rate_match; + +struct buffer_attr { + uint32_t maxlength; + uint32_t tlength; + uint32_t prebuf; + uint32_t minreq; + uint32_t fragsize; +}; + +struct stream { + uint32_t create_tag; + uint32_t channel; /* index in map */ + uint32_t id; /* id of global */ + + struct impl *impl; + struct client *client; +#define STREAM_TYPE_RECORD 0 +#define STREAM_TYPE_PLAYBACK 1 +#define STREAM_TYPE_UPLOAD 2 + uint32_t type; + enum pw_direction direction; + + struct pw_properties *props; + + struct pw_stream *stream; + struct spa_hook stream_listener; + + struct spa_io_rate_match *rate_match; + struct spa_ringbuffer ring; + void *buffer; + + int64_t read_index; + int64_t write_index; + uint64_t underrun_for; + uint64_t playing_for; + uint64_t ticks_base; + uint64_t timestamp; + int64_t delay; + + uint32_t missing; + uint32_t requested; + + struct sample_spec ss; + struct channel_map map; + struct buffer_attr attr; + uint32_t frame_size; + uint32_t rate; + + struct volume volume; + bool muted; + + uint32_t drain_tag; + unsigned int corked:1; + unsigned int draining:1; + unsigned int volume_set:1; + unsigned int muted_set:1; + unsigned int early_requests:1; + unsigned int adjust_latency:1; + unsigned int is_underrun:1; + unsigned int in_prebuf:1; + unsigned int done:1; + unsigned int killed:1; +}; + +void stream_free(struct stream *stream); +void stream_flush(struct stream *stream); +uint32_t stream_pop_missing(struct stream *stream); + +int stream_send_underflow(struct stream *stream, int64_t offset, uint32_t underrun_for); +int stream_send_overflow(struct stream *stream); +int stream_send_killed(struct stream *stream); +int stream_send_started(struct stream *stream); +int stream_send_request(struct stream *stream); + +#endif /* PULSER_SERVER_STREAM_H */