From b4471673394d25ae97ec7f9a3750c0badd4fac6c Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 13 Nov 2020 18:19:56 +0100 Subject: [PATCH] pulse-server: actually play the sample from the cache --- .../module-protocol-pulse/pulse-server.c | 85 ++++++-- src/modules/module-protocol-pulse/sample.c | 199 ++++++++++++++++++ 2 files changed, 271 insertions(+), 13 deletions(-) create mode 100644 src/modules/module-protocol-pulse/sample.c diff --git a/src/modules/module-protocol-pulse/pulse-server.c b/src/modules/module-protocol-pulse/pulse-server.c index 0eeb0ed16..9cc45b82e 100644 --- a/src/modules/module-protocol-pulse/pulse-server.c +++ b/src/modules/module-protocol-pulse/pulse-server.c @@ -80,6 +80,8 @@ struct impl; struct server; struct client; +#include "sample.c" + struct operation { struct spa_list link; struct client *client; @@ -87,17 +89,6 @@ struct operation { void (*callback) (struct operation *op); }; -struct sample { - uint32_t index; - struct impl *impl; - const char *name; - struct sample_spec ss; - struct channel_map map; - struct pw_properties *props; - uint32_t length; - uint8_t *buffer; -}; - struct client { struct spa_list link; struct impl *impl; @@ -134,6 +125,8 @@ struct client { struct spa_list operations; struct spa_list modules; + struct spa_list samples; + unsigned int disconnecting:1; }; @@ -2238,12 +2231,41 @@ error: return res; } +struct pending_sample { + struct spa_list link; + struct sample_play *play; + struct spa_hook listener; + unsigned int done:1; +}; + +static void pending_sample_free(struct pending_sample *ps) +{ + spa_list_remove(&ps->link); + spa_hook_remove(&ps->listener); + sample_play_destroy(ps->play); +} + +static void sample_play_done(void *data) +{ + struct pending_sample *ps = data; + ps->done = true; +} + +static const struct sample_play_events sample_play_events = { + VERSION_SAMPLE_PLAY_EVENTS, + .done = sample_play_done, +}; + static int do_play_sample(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; - uint32_t sink_index, volume; + uint32_t sink_index, volume, idx; + struct sample *sample; + struct sample_play *play; const char *sink_name, *name; struct pw_properties *props = NULL; + struct message *reply; + struct pending_sample *ps; int res; if ((props = pw_properties_new(NULL, NULL)) == NULL) @@ -2270,7 +2292,29 @@ static int do_play_sample(struct client *client, uint32_t command, uint32_t tag, pw_properties_update(props, &client->props->dict); - goto error_noent; + sample = find_sample(impl, name); + if (sample == NULL) + goto error_noent; + + play = sample_play_new(client->core, sample, props, sizeof(struct pending_sample)); + props = NULL; + if (play == NULL) + goto error_errno; + + ps = play->user_data; + ps->play = play; + sample_play_add_listener(play, &ps->listener, &sample_play_events, ps); + spa_list_append(&client->samples, &ps->link); + + idx = 0; + + reply = reply_new(client, tag); + if (client->version >= 13) + message_put(reply, + TAG_U32, idx, + TAG_INVALID); + + return send_message(client, reply); error_errno: res = -errno; @@ -4339,6 +4383,7 @@ static void client_free(struct client *client) struct impl *impl = client->impl; struct message *msg; struct module *module, *tmp; + struct pending_sample *p, *t; pw_log_debug(NAME" %p: client %p free", impl, client); spa_list_remove(&client->link); @@ -4346,6 +4391,9 @@ static void client_free(struct client *client) pw_map_for_each(&client->streams, client_free_stream, client); pw_map_clear(&client->streams); + spa_list_for_each_safe(p, t, &client->samples, link) + pending_sample_free(p); + spa_list_for_each_safe(module, tmp, &client->modules, link) unload_module(module); spa_list_consume(msg, &client->free_messages, link) @@ -4532,6 +4580,15 @@ exit: return res; } +static void client_clear_pending_samples(struct client *client) +{ + struct pending_sample *p, *t; + spa_list_for_each_safe(p, t, &client->samples, link) { + if (p->done) + pending_sample_free(p); + } +} + static void on_client_data(void *data, int fd, uint32_t mask) { @@ -4568,6 +4625,7 @@ on_client_data(void *data, int fd, uint32_t mask) } } } + client_clear_pending_samples(client); return; error: @@ -4604,6 +4662,7 @@ on_connect(void *data, int fd, uint32_t mask) spa_list_init(&client->out_messages); spa_list_init(&client->operations); spa_list_init(&client->modules); + spa_list_init(&client->samples); client->props = pw_properties_new( PW_KEY_CLIENT_API, "pipewire-pulse", diff --git a/src/modules/module-protocol-pulse/sample.c b/src/modules/module-protocol-pulse/sample.c new file mode 100644 index 000000000..e90259e06 --- /dev/null +++ b/src/modules/module-protocol-pulse/sample.c @@ -0,0 +1,199 @@ +/* PipeWire + * + * Copyright © 2020 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +struct sample { + int ref; + uint32_t index; + struct impl *impl; + const char *name; + struct sample_spec ss; + struct channel_map map; + struct pw_properties *props; + uint32_t length; + uint8_t *buffer; +}; + +struct sample_play_events { +#define VERSION_SAMPLE_PLAY_EVENTS 0 + uint32_t version; + + void (*done) (void *data); +}; + +#define sample_play_emit_done(p) spa_hook_list_call(&p->hooks, struct sample_play_events, done, 0) + +struct sample_play { + struct spa_list link; + struct sample *sample; + struct pw_stream *stream; + struct spa_hook listener; + struct pw_context *context; + struct pw_loop *main_loop; + uint32_t index; + uint32_t stride; + struct spa_hook_list hooks; + void *user_data; +}; + + +static void sample_free(struct sample *sample); + +static void sample_play_stream_destroy(void *data) +{ + struct sample_play *p = data; + + pw_log_info("destroy %s", p->sample->name); + spa_hook_remove(&p->listener); + p->stream = NULL; + if (--p->sample == 0) + sample_free(p->sample); + p->sample = NULL; +} + +static void sample_play_stream_process(void *data) +{ + struct sample_play *p = data; + struct sample *s = p->sample; + struct pw_buffer *b; + struct spa_buffer *buf; + uint32_t size; + uint8_t *d; + + if (p->index >= s->length) { + pw_stream_flush(p->stream, true); + return; + } + size = s->length - p->index; + + if ((b = pw_stream_dequeue_buffer(p->stream)) == NULL) { + pw_log_warn("out of buffers: %m"); + return; + } + + buf = b->buffer; + if ((d = buf->datas[0].data) == NULL) + return; + + size = SPA_MIN(size, buf->datas[0].maxsize); + + memcpy(d, p->sample->buffer + p->index, size); + + p->index += size; + + buf->datas[0].chunk->offset = 0; + buf->datas[0].chunk->stride = p->stride; + buf->datas[0].chunk->size = size; + + pw_stream_queue_buffer(p->stream, b); +} + +static void sample_play_stream_drained(void *data) +{ + struct sample_play *p = data; + sample_play_emit_done(p); +} + +struct pw_stream_events sample_play_stream_events = { + PW_VERSION_STREAM_EVENTS, + .destroy = sample_play_stream_destroy, + .process = sample_play_stream_process, + .drained = sample_play_stream_drained, +}; + +static struct sample_play *sample_play_new(struct pw_core *core, + struct sample *sample, struct pw_properties *props, + size_t user_data_size) +{ + struct sample_play *p; + uint8_t buffer[1024]; + struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); + const struct spa_pod *params[1]; + uint32_t n_params = 0; + int res; + + p = calloc(1, sizeof(struct sample_play) + user_data_size); + if (p == NULL) { + res = -errno; + goto error_free; + } + + p->context = pw_core_get_context(core); + p->main_loop = pw_context_get_main_loop(p->context); + spa_hook_list_init(&p->hooks); + p->user_data = SPA_MEMBER(p, sizeof(struct sample_play), void); + + pw_properties_update(props, &sample->props->dict); + + p->stream = pw_stream_new(core, sample->name, props); + props = NULL; + if (p->stream == NULL) { + res = -errno; + goto error_free; + } + + p->sample = sample; + sample->ref++; + + pw_stream_add_listener(p->stream, + &p->listener, + &sample_play_stream_events, p); + + params[n_params++] = format_build_param(&b, SPA_PARAM_EnumFormat, + &sample->ss, &sample->map); + + res = pw_stream_connect(p->stream, + PW_DIRECTION_OUTPUT, + PW_ID_ANY, + PW_STREAM_FLAG_AUTOCONNECT | + PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS, + params, n_params); + if (res < 0) + goto error_cleanup; + + return p; + +error_cleanup: + pw_stream_destroy(p->stream); +error_free: + if (props) + pw_properties_free(props); + free(p); + errno = -res; + return NULL; +} + +static void sample_play_add_listener(struct sample_play *p, + struct spa_hook *listener, + const struct sample_play_events *events, void *data) +{ + spa_hook_list_append(&p->hooks, listener, events, data); +} + +static void sample_play_destroy(struct sample_play *p) +{ + if (p->stream) + pw_stream_destroy(p->stream); + free(p); +}