From 8b9d398df6c96d0dba46653c764edbdbe8f3c8aa Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 13 Nov 2020 13:28:46 +0100 Subject: [PATCH] pulse-server: implement sample cache --- src/modules/module-protocol-pulse/defs.h | 2 + src/modules/module-protocol-pulse/format.c | 14 +- .../module-protocol-pulse/pulse-server.c | 417 ++++++++++++++++-- 3 files changed, 391 insertions(+), 42 deletions(-) diff --git a/src/modules/module-protocol-pulse/defs.h b/src/modules/module-protocol-pulse/defs.h index 807d1a2b9..7ec948093 100644 --- a/src/modules/module-protocol-pulse/defs.h +++ b/src/modules/module-protocol-pulse/defs.h @@ -50,6 +50,8 @@ #define DEFAULT_PROCESS_MSEC 20 /* 20ms */ #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC +#define SCACHE_ENTRY_SIZE_MAX (1024*1024*16) + enum error_code { ERR_OK = 0, /**< No error */ ERR_ACCESS, /**< Access failure */ diff --git a/src/modules/module-protocol-pulse/format.c b/src/modules/module-protocol-pulse/format.c index 313522a67..3ff2794d7 100644 --- a/src/modules/module-protocol-pulse/format.c +++ b/src/modules/module-protocol-pulse/format.c @@ -301,13 +301,25 @@ static inline enum channel_position channel_name2pa(const char *name, size_t siz } -static void channel_map_to_positions(const struct channel_map *map, uint32_t *pos) +static inline void channel_map_to_positions(const struct channel_map *map, uint32_t *pos) { int i; for (i = 0; i < map->channels; i++) pos[i] = channel_pa2id(map->map[i]); } +static inline bool channel_map_valid(const struct channel_map *map) +{ + uint8_t i; + if (map->channels == 0 || map->channels > CHANNELS_MAX) + return false; + for (i = 0; i < map->channels; i++) + if (map->map[i] < 0 || map->map[i] >= CHANNEL_POSITION_MAX) + return false; + return true; +} + + enum encoding { ENCODING_ANY, ENCODING_PCM, diff --git a/src/modules/module-protocol-pulse/pulse-server.c b/src/modules/module-protocol-pulse/pulse-server.c index 4870fd622..32a1b0f72 100644 --- a/src/modules/module-protocol-pulse/pulse-server.c +++ b/src/modules/module-protocol-pulse/pulse-server.c @@ -87,6 +87,17 @@ struct operation { void (*callback) (struct operation *op); }; +struct sample { + uint32_t index; + struct impl *impl; + const char *name; + struct sample_spec ss; + struct channel_map map; + struct pw_properties *props; + uint32_t length; + uint8_t *buffer; +}; + struct client { struct spa_list link; struct impl *impl; @@ -141,8 +152,14 @@ struct stream { struct impl *impl; struct client *client; +#define STREAM_TYPE_RECORD 0 +#define STREAM_TYPE_PLAYBACK 1 +#define STREAM_TYPE_UPLOAD 2 + uint32_t type; enum pw_direction direction; + struct pw_properties *props; + struct pw_stream *stream; struct spa_hook stream_listener; @@ -204,11 +221,37 @@ struct impl { struct spa_source *source; struct spa_list servers; + + struct pw_map samples; }; #include "collect.c" #include "module.c" +static void sample_free(struct sample *sample) +{ + struct impl *impl = sample->impl; + + if (sample->index != SPA_ID_INVALID) + pw_map_remove(&impl->samples, sample->index); + if (sample->props) + pw_properties_free(sample->props); + free(sample->buffer); + free(sample); +} + +static struct sample *find_sample(struct impl *impl, const char *name) +{ + union pw_map_item *item; + pw_array_for_each(item, &impl->samples.items) { + struct sample *s = item->data; + if (!pw_map_item_is_free(item) && + strcmp(s->name, name) == 0) + return s; + } + return NULL; +} + struct command { const char *name; int (*run) (struct client *client, uint32_t command, uint32_t tag, struct message *msg); @@ -418,6 +461,18 @@ static int send_subscribe_event(struct client *client, uint32_t event, uint32_t return send_message(client, reply); } +static void broadcast_subscribe_event(struct impl *impl, uint32_t mask, uint32_t event, uint32_t id) +{ + struct server *s; + spa_list_for_each(s, &impl->servers, link) { + struct client *c; + spa_list_for_each(c, &s->clients, link) { + if (c->subscribed & mask) + send_subscribe_event(c, event, id); + } + } +} + static int send_overflow(struct stream *stream) { struct client *client = stream->client; @@ -518,7 +573,7 @@ static int reply_set_client_name(struct client *client, uint32_t tag) c = pw_core_get_client(client->core); if (c == NULL) - return reply_error(client, COMMAND_SET_CLIENT_NAME, tag, -ENOENT); + return -ENOENT; id = pw_proxy_get_bound_id((struct pw_proxy*)c); @@ -736,6 +791,7 @@ static int do_subscribe(struct client *client, uint32_t command, uint32_t tag, s pw_log_info(NAME" %p: [%s] SUBSCRIBE tag:%u mask:%08x", impl, client->name, tag, mask); + client->subscribed = mask; return reply_simple_ack(client, tag); @@ -779,6 +835,8 @@ static void stream_free(struct stream *stream) } if (stream->buffer) free(stream->buffer); + if (stream->props) + pw_properties_free(stream->props); free(stream); } @@ -1593,6 +1651,7 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui if (stream->channel == SPA_ID_INVALID) goto error_errno; + stream->type = STREAM_TYPE_PLAYBACK; stream->direction = PW_DIRECTION_OUTPUT; stream->create_tag = tag; stream->ss = ss; @@ -1803,6 +1862,7 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint if (stream == NULL) goto error_errno; + stream->type = STREAM_TYPE_RECORD; stream->direction = PW_DIRECTION_INPUT; stream->impl = impl; stream->client = client; @@ -1906,7 +1966,16 @@ static int do_delete_stream(struct client *client, uint32_t command, uint32_t ta stream = pw_map_lookup(&client->streams, channel); if (stream == NULL) - return -EINVAL; + return -ENOENT; + if (command == COMMAND_DELETE_PLAYBACK_STREAM && + stream->type != STREAM_TYPE_PLAYBACK) + return -ENOENT; + if (command == COMMAND_DELETE_RECORD_STREAM && + stream->type != STREAM_TYPE_RECORD) + return -ENOENT; + if (command == COMMAND_DELETE_UPLOAD_STREAM && + stream->type != STREAM_TYPE_UPLOAD) + return -ENOENT; stream_free(stream); @@ -1930,8 +1999,8 @@ static int do_get_playback_latency(struct client *client, uint32_t command, uint pw_log_debug(NAME" %p: %s tag:%u channel:%u", impl, commands[command].name, tag, channel); stream = pw_map_lookup(&client->streams, channel); - if (stream == NULL) - return -EINVAL; + if (stream == NULL || stream->type != STREAM_TYPE_PLAYBACK) + return -ENOENT; pw_log_debug("read:%"PRIi64" write:%"PRIi64" queued:%"PRIi64" delay:%"PRIi64, stream->read_index, stream->write_index, @@ -1975,8 +2044,8 @@ static int do_get_record_latency(struct client *client, uint32_t command, uint32 pw_log_debug(NAME" %p: %s channel:%u", impl, commands[command].name, channel); stream = pw_map_lookup(&client->streams, channel); - if (stream == NULL) - return -EINVAL; + if (stream == NULL || stream->type != STREAM_TYPE_RECORD) + return -ENOENT; reply = reply_new(client, tag); message_put(reply, @@ -1992,6 +2061,263 @@ static int do_get_record_latency(struct client *client, uint32_t command, uint32 return send_message(client, reply); } +static int do_create_upload_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m) +{ + struct impl *impl = client->impl; + const char *name; + struct sample_spec ss; + struct channel_map map; + struct pw_properties *props = NULL; + uint32_t length; + struct stream *stream = NULL; + struct message *reply; + int res; + + if ((props = pw_properties_copy(client->props)) == NULL) + goto error_errno; + + if ((res = message_get(m, + TAG_STRING, &name, + TAG_SAMPLE_SPEC, &ss, + TAG_CHANNEL_MAP, &map, + TAG_U32, &length, + TAG_INVALID)) < 0) + goto error_proto; + + if (client->version >= 13) { + if ((res = message_get(m, + TAG_PROPLIST, props, + TAG_INVALID)) < 0) + goto error_proto; + + } else { + pw_properties_set(props, PW_KEY_MEDIA_NAME, name); + } + if (name == NULL) + name = pw_properties_get(props, "event.id"); + if (name == NULL) + name = pw_properties_get(props, PW_KEY_MEDIA_NAME); + + if (name == NULL || + !sample_spec_valid(&ss) || + !channel_map_valid(&map) || + ss.channels != map.channels || + length == 0 || (length % sample_spec_frame_size(&ss) != 0)) + goto error_invalid; + if (length >= SCACHE_ENTRY_SIZE_MAX) + goto error_toolarge; + + pw_log_info(NAME" %p: [%s] %s tag:%u name:%s length:%d", + impl, client->name, commands[command].name, tag, + name, length); + + stream = calloc(1, sizeof(struct stream)); + if (stream == NULL) + goto error_errno; + + stream->type = STREAM_TYPE_UPLOAD; + stream->direction = PW_DIRECTION_OUTPUT; + stream->impl = impl; + stream->client = client; + stream->channel = pw_map_insert_new(&client->streams, stream); + if (stream->channel == SPA_ID_INVALID) + goto error_errno; + + stream->create_tag = tag; + stream->ss = ss; + stream->map = map; + stream->props = props; + + stream->attr.maxlength = length; + + stream->buffer = calloc(1, stream->attr.maxlength); + if (stream->buffer == NULL) + goto error_errno; + + spa_ringbuffer_init(&stream->ring); + + reply = reply_new(client, tag); + message_put(reply, + TAG_U32, stream->channel, + TAG_U32, length, + TAG_INVALID); + return send_message(client, reply); + +error_errno: + res = -errno; + goto error; +error_proto: + res = -EPROTO; + goto error; +error_invalid: + res = -EINVAL; + goto error; +error_toolarge: + res = -EOVERFLOW; + goto error; +error: + if (props != NULL) + pw_properties_free(props); + if (stream) + stream_free(stream); + return res; +} + +static int do_finish_upload_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m) +{ + struct impl *impl = client->impl; + uint32_t channel, event; + struct stream *stream = NULL; + struct sample *sample; + const char *name; + int res; + + if ((res = message_get(m, + TAG_U32, &channel, + TAG_INVALID)) < 0) + return -EPROTO; + + stream = pw_map_lookup(&client->streams, channel); + if (stream == NULL || stream->type != STREAM_TYPE_UPLOAD) + return -ENOENT; + + name = pw_properties_get(stream->props, "event.id"); + if (name == NULL) + name = pw_properties_get(stream->props, PW_KEY_MEDIA_NAME); + if (name == NULL) + goto error_invalid; + + pw_log_info(NAME" %p: [%s] %s tag:%u channel:%u name:%s", + impl, client->name, commands[command].name, tag, + channel, name); + + sample = find_sample(impl, name); + if (sample == NULL) { + sample = calloc(1, sizeof(struct sample)); + if (sample == NULL) + goto error_errno; + + sample->index = pw_map_insert_new(&impl->samples, sample); + if (sample->index == SPA_ID_INVALID) + goto error_errno; + + event = SUBSCRIPTION_EVENT_NEW; + } else { + if (sample->props) + pw_properties_free(sample->props); + free(sample->buffer); + event = SUBSCRIPTION_EVENT_CHANGE; + } + sample->impl = impl; + sample->name = name; + sample->props = stream->props; + sample->ss = stream->ss; + sample->map = stream->map; + sample->buffer = stream->buffer; + sample->length = stream->attr.maxlength; + + stream->props = NULL; + stream->buffer = NULL; + stream_free(stream); + + broadcast_subscribe_event(impl, + SUBSCRIPTION_MASK_SAMPLE_CACHE, + event | SUBSCRIPTION_EVENT_SAMPLE_CACHE, + sample->index); + + return reply_simple_ack(client, tag); + +error_errno: + res = -errno; + goto error; +error_invalid: + res = -EINVAL; + goto error; +error: + stream_free(stream); + return res; +} + +static int do_play_sample(struct client *client, uint32_t command, uint32_t tag, struct message *m) +{ + struct impl *impl = client->impl; + uint32_t sink_index, volume; + const char *sink_name, *name; + struct pw_properties *props = NULL; + int res; + + if ((props = pw_properties_new(NULL, NULL)) == NULL) + goto error_errno; + + if ((res = message_get(m, + TAG_U32, &sink_index, + TAG_STRING, &sink_name, + TAG_U32, &volume, + TAG_STRING, &name, + TAG_INVALID)) < 0) + goto error_proto; + + if (client->version >= 13) { + if ((res = message_get(m, + TAG_PROPLIST, props, + TAG_INVALID)) < 0) + goto error_proto; + + } + pw_log_info(NAME" %p: [%s] %s tag:%u sink_index:%u sink_name:%s name:%s", + impl, client->name, commands[command].name, tag, + sink_index, sink_name, name); + + pw_properties_update(props, &client->props->dict); + + goto error_noent; + +error_errno: + res = -errno; + goto error; +error_proto: + res = -EPROTO; + goto error; +error_noent: + res = -ENOENT; + goto error; +error: + if (props != NULL) + pw_properties_free(props); + return res; +} + +static int do_remove_sample(struct client *client, uint32_t command, uint32_t tag, struct message *m) +{ + struct impl *impl = client->impl; + const char *name; + struct sample *sample; + int res; + + if ((res = message_get(m, + TAG_STRING, &name, + TAG_INVALID)) < 0) + return -EPROTO; + + pw_log_info(NAME" %p: [%s] %s tag:%u name:%s", + impl, client->name, commands[command].name, tag, + name); + if (name == NULL) + return -EINVAL; + if ((sample = find_sample(impl, name)) == NULL) + return -ENOENT; + + broadcast_subscribe_event(impl, + SUBSCRIPTION_MASK_SAMPLE_CACHE, + SUBSCRIPTION_EVENT_REMOVE | + SUBSCRIPTION_EVENT_SAMPLE_CACHE, + sample->index); + + sample_free(sample); + + return reply_simple_ack(client, tag); +} + static int do_cork_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; @@ -2011,8 +2337,8 @@ static int do_cork_stream(struct client *client, uint32_t command, uint32_t tag, channel, cork ? "yes" : "no"); stream = pw_map_lookup(&client->streams, channel); - if (stream == NULL) - return -EINVAL; + if (stream == NULL || stream->type == STREAM_TYPE_UPLOAD) + return -ENOENT; pw_stream_set_active(stream->stream, !cork); stream->corked = cork; @@ -2040,8 +2366,8 @@ static int do_flush_trigger_prebuf_stream(struct client *client, uint32_t comman impl, client->name, commands[command].name, tag, channel); stream = pw_map_lookup(&client->streams, channel); - if (stream == NULL) - return -EINVAL; + if (stream == NULL || stream->type != STREAM_TYPE_PLAYBACK) + return -ENOENT; switch (command) { case COMMAND_FLUSH_PLAYBACK_STREAM: @@ -2060,20 +2386,6 @@ static int do_flush_trigger_prebuf_stream(struct client *client, uint32_t comman return reply_simple_ack(client, tag); } -static int do_error_access(struct client *client, uint32_t command, uint32_t tag, struct message *m) -{ - struct impl *impl = client->impl; - pw_log_debug(NAME" %p: %s access denied", impl, commands[command].name); - return reply_error(client, command, tag, -EACCES); -} - -static int do_error_not_implemented(struct client *client, uint32_t command, uint32_t tag, struct message *m) -{ - struct impl *impl = client->impl; - pw_log_debug(NAME" %p: %s not implemented", impl, commands[command].name); - return reply_error(client, command, tag, -ENOSYS); -} - static int set_node_volume_mute(struct pw_manager_object *o, struct volume *vol, bool *mute) { @@ -2501,8 +2813,8 @@ static int do_set_stream_name(struct client *client, uint32_t command, uint32_t impl, client->name, tag, channel, name); stream = pw_map_lookup(&client->streams, channel); - if (stream == NULL) - return -EINVAL; + if (stream == NULL || stream->type == STREAM_TYPE_UPLOAD) + return -ENOENT; items[0] = SPA_DICT_ITEM_INIT(PW_KEY_MEDIA_NAME, name); pw_stream_update_properties(stream->stream, @@ -2543,7 +2855,7 @@ static int do_update_proplist(struct client *client, uint32_t command, uint32_t if (command != COMMAND_UPDATE_CLIENT_PROPLIST) { stream = pw_map_lookup(&client->streams, channel); - if (stream == NULL) + if (stream == NULL || stream->type == STREAM_TYPE_UPLOAD) goto error_noentity; fix_stream_properties(stream, props); @@ -2612,7 +2924,7 @@ static int do_remove_proplist(struct client *client, uint32_t command, uint32_t if (command != COMMAND_UPDATE_CLIENT_PROPLIST) { stream = pw_map_lookup(&client->streams, channel); - if (stream == NULL) + if (stream == NULL || stream->type == STREAM_TYPE_UPLOAD) goto error_noentity; pw_stream_update_properties(stream->stream, &dict); @@ -2743,10 +3055,8 @@ static int do_drain_stream(struct client *client, uint32_t command, uint32_t tag pw_log_info(NAME" %p: [%s] DRAIN tag:%u channel:%d", impl, client->name, tag, channel); stream = pw_map_lookup(&client->streams, channel); - if (stream == NULL) + if (stream == NULL || stream->type != STREAM_TYPE_PLAYBACK) return -ENOENT; - if (stream->direction != PW_DIRECTION_OUTPUT) - return -EINVAL; stream->drain_tag = tag; return 0; @@ -3483,6 +3793,9 @@ static int do_set_stream_buffer_attr(struct client *client, uint32_t command, ui return -ENOENT; if (command == COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) { + if (stream->type != STREAM_TYPE_PLAYBACK) + return -ENOENT; + if ((res = message_get(m, TAG_U32, &attr.maxlength, TAG_U32, &attr.tlength, @@ -3491,6 +3804,9 @@ static int do_set_stream_buffer_attr(struct client *client, uint32_t command, ui TAG_INVALID)) < 0) return -EPROTO; } else { + if (stream->type != STREAM_TYPE_RECORD) + return -ENOENT; + if ((res = message_get(m, TAG_U32, &attr.maxlength, TAG_U32, &attr.fragsize, @@ -3555,7 +3871,7 @@ static int do_update_stream_sample_rate(struct client *client, uint32_t command, commands[command].name, tag, channel, rate); stream = pw_map_lookup(&client->streams, channel); - if (stream == NULL) + if (stream == NULL || stream->type == STREAM_TYPE_UPLOAD) return -ENOENT; return reply_simple_ack(client, tag); @@ -3842,6 +4158,16 @@ static int do_unload_module(struct client *client, uint32_t command, uint32_t ta return reply_simple_ack(client, tag); } +static int do_error_access(struct client *client, uint32_t command, uint32_t tag, struct message *m) +{ + return -EACCES; +} + +static int do_error_not_implemented(struct client *client, uint32_t command, uint32_t tag, struct message *m) +{ + return -ENOSYS; +} + static const struct command commands[COMMAND_MAX] = { [COMMAND_ERROR] = { "ERROR", }, @@ -3861,11 +4187,11 @@ static const struct command commands[COMMAND_MAX] = [COMMAND_DRAIN_PLAYBACK_STREAM] = { "DRAIN_PLAYBACK_STREAM", do_drain_stream, }, [COMMAND_STAT] = { "STAT", do_stat, }, [COMMAND_GET_PLAYBACK_LATENCY] = { "GET_PLAYBACK_LATENCY", do_get_playback_latency, }, - [COMMAND_CREATE_UPLOAD_STREAM] = { "CREATE_UPLOAD_STREAM", do_error_access, }, - [COMMAND_DELETE_UPLOAD_STREAM] = { "DELETE_UPLOAD_STREAM", do_error_access, }, - [COMMAND_FINISH_UPLOAD_STREAM] = { "FINISH_UPLOAD_STREAM", do_error_access, }, - [COMMAND_PLAY_SAMPLE] = { "PLAY_SAMPLE", do_error_access, }, - [COMMAND_REMOVE_SAMPLE] = { "REMOVE_SAMPLE", do_error_access, }, + [COMMAND_CREATE_UPLOAD_STREAM] = { "CREATE_UPLOAD_STREAM", do_create_upload_stream, }, + [COMMAND_DELETE_UPLOAD_STREAM] = { "DELETE_UPLOAD_STREAM", do_delete_stream, }, + [COMMAND_FINISH_UPLOAD_STREAM] = { "FINISH_UPLOAD_STREAM", do_finish_upload_stream, }, + [COMMAND_PLAY_SAMPLE] = { "PLAY_SAMPLE", do_play_sample, }, + [COMMAND_REMOVE_SAMPLE] = { "REMOVE_SAMPLE", do_remove_sample, }, [COMMAND_GET_SERVER_INFO] = { "GET_SERVER_INFO", do_get_server_info }, [COMMAND_GET_SINK_INFO] = { "GET_SINK_INFO", do_get_info, }, @@ -4075,10 +4401,9 @@ static int handle_packet(struct client *client, struct message *msg) res = commands[command].run(client, command, tag, msg); finish: message_free(client, msg, false, false); - if (res < 0) { + if (res < 0) reply_error(client, command, tag, res); - } - return res; + return 0; } static int handle_memblock(struct client *client, struct message *msg) @@ -4101,7 +4426,7 @@ static int handle_memblock(struct client *client, struct message *msg) flags, msg->length); stream = pw_map_lookup(&client->streams, channel); - if (stream == NULL) { + if (stream == NULL || stream->type == STREAM_TYPE_RECORD) { res = -EINVAL; goto finish; } @@ -4603,6 +4928,13 @@ error: } +static int impl_free_sample(void *item, void *data) +{ + struct sample *s = item; + sample_free(s); + return 0; +} + static void impl_free(struct impl *impl) { struct server *s; @@ -4610,6 +4942,8 @@ static void impl_free(struct impl *impl) spa_hook_remove(&impl->context_listener); spa_list_consume(s, &impl->servers, link) server_free(s); + pw_map_for_each(&impl->samples, impl_free_sample, impl); + pw_map_clear(&impl->samples); if (impl->props) pw_properties_free(impl->props); free(impl); @@ -4661,6 +4995,7 @@ struct pw_protocol_pulse *pw_protocol_pulse_new(struct pw_context *context, spa_list_init(&impl->servers); impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; impl->rate_limit.burst = 1; + pw_map_init(&impl->samples, 16, 16); pw_context_add_listener(context, &impl->context_listener, &context_events, impl);