pulse-server: implement sample cache

This commit is contained in:
Wim Taymans 2020-11-13 13:28:46 +01:00
parent 22cec7823e
commit 8b9d398df6
3 changed files with 391 additions and 42 deletions

View file

@ -50,6 +50,8 @@
#define DEFAULT_PROCESS_MSEC 20 /* 20ms */
#define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC
#define SCACHE_ENTRY_SIZE_MAX (1024*1024*16)
enum error_code {
ERR_OK = 0, /**< No error */
ERR_ACCESS, /**< Access failure */

View file

@ -301,13 +301,25 @@ static inline enum channel_position channel_name2pa(const char *name, size_t siz
}
static void channel_map_to_positions(const struct channel_map *map, uint32_t *pos)
static inline void channel_map_to_positions(const struct channel_map *map, uint32_t *pos)
{
int i;
for (i = 0; i < map->channels; i++)
pos[i] = channel_pa2id(map->map[i]);
}
static inline bool channel_map_valid(const struct channel_map *map)
{
uint8_t i;
if (map->channels == 0 || map->channels > CHANNELS_MAX)
return false;
for (i = 0; i < map->channels; i++)
if (map->map[i] < 0 || map->map[i] >= CHANNEL_POSITION_MAX)
return false;
return true;
}
enum encoding {
ENCODING_ANY,
ENCODING_PCM,

View file

@ -87,6 +87,17 @@ struct operation {
void (*callback) (struct operation *op);
};
struct sample {
uint32_t index;
struct impl *impl;
const char *name;
struct sample_spec ss;
struct channel_map map;
struct pw_properties *props;
uint32_t length;
uint8_t *buffer;
};
struct client {
struct spa_list link;
struct impl *impl;
@ -141,8 +152,14 @@ struct stream {
struct impl *impl;
struct client *client;
#define STREAM_TYPE_RECORD 0
#define STREAM_TYPE_PLAYBACK 1
#define STREAM_TYPE_UPLOAD 2
uint32_t type;
enum pw_direction direction;
struct pw_properties *props;
struct pw_stream *stream;
struct spa_hook stream_listener;
@ -204,11 +221,37 @@ struct impl {
struct spa_source *source;
struct spa_list servers;
struct pw_map samples;
};
#include "collect.c"
#include "module.c"
static void sample_free(struct sample *sample)
{
struct impl *impl = sample->impl;
if (sample->index != SPA_ID_INVALID)
pw_map_remove(&impl->samples, sample->index);
if (sample->props)
pw_properties_free(sample->props);
free(sample->buffer);
free(sample);
}
static struct sample *find_sample(struct impl *impl, const char *name)
{
union pw_map_item *item;
pw_array_for_each(item, &impl->samples.items) {
struct sample *s = item->data;
if (!pw_map_item_is_free(item) &&
strcmp(s->name, name) == 0)
return s;
}
return NULL;
}
struct command {
const char *name;
int (*run) (struct client *client, uint32_t command, uint32_t tag, struct message *msg);
@ -418,6 +461,18 @@ static int send_subscribe_event(struct client *client, uint32_t event, uint32_t
return send_message(client, reply);
}
static void broadcast_subscribe_event(struct impl *impl, uint32_t mask, uint32_t event, uint32_t id)
{
struct server *s;
spa_list_for_each(s, &impl->servers, link) {
struct client *c;
spa_list_for_each(c, &s->clients, link) {
if (c->subscribed & mask)
send_subscribe_event(c, event, id);
}
}
}
static int send_overflow(struct stream *stream)
{
struct client *client = stream->client;
@ -518,7 +573,7 @@ static int reply_set_client_name(struct client *client, uint32_t tag)
c = pw_core_get_client(client->core);
if (c == NULL)
return reply_error(client, COMMAND_SET_CLIENT_NAME, tag, -ENOENT);
return -ENOENT;
id = pw_proxy_get_bound_id((struct pw_proxy*)c);
@ -736,6 +791,7 @@ static int do_subscribe(struct client *client, uint32_t command, uint32_t tag, s
pw_log_info(NAME" %p: [%s] SUBSCRIBE tag:%u mask:%08x", impl,
client->name, tag, mask);
client->subscribed = mask;
return reply_simple_ack(client, tag);
@ -779,6 +835,8 @@ static void stream_free(struct stream *stream)
}
if (stream->buffer)
free(stream->buffer);
if (stream->props)
pw_properties_free(stream->props);
free(stream);
}
@ -1593,6 +1651,7 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui
if (stream->channel == SPA_ID_INVALID)
goto error_errno;
stream->type = STREAM_TYPE_PLAYBACK;
stream->direction = PW_DIRECTION_OUTPUT;
stream->create_tag = tag;
stream->ss = ss;
@ -1803,6 +1862,7 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint
if (stream == NULL)
goto error_errno;
stream->type = STREAM_TYPE_RECORD;
stream->direction = PW_DIRECTION_INPUT;
stream->impl = impl;
stream->client = client;
@ -1906,7 +1966,16 @@ static int do_delete_stream(struct client *client, uint32_t command, uint32_t ta
stream = pw_map_lookup(&client->streams, channel);
if (stream == NULL)
return -EINVAL;
return -ENOENT;
if (command == COMMAND_DELETE_PLAYBACK_STREAM &&
stream->type != STREAM_TYPE_PLAYBACK)
return -ENOENT;
if (command == COMMAND_DELETE_RECORD_STREAM &&
stream->type != STREAM_TYPE_RECORD)
return -ENOENT;
if (command == COMMAND_DELETE_UPLOAD_STREAM &&
stream->type != STREAM_TYPE_UPLOAD)
return -ENOENT;
stream_free(stream);
@ -1930,8 +1999,8 @@ static int do_get_playback_latency(struct client *client, uint32_t command, uint
pw_log_debug(NAME" %p: %s tag:%u channel:%u", impl, commands[command].name, tag, channel);
stream = pw_map_lookup(&client->streams, channel);
if (stream == NULL)
return -EINVAL;
if (stream == NULL || stream->type != STREAM_TYPE_PLAYBACK)
return -ENOENT;
pw_log_debug("read:%"PRIi64" write:%"PRIi64" queued:%"PRIi64" delay:%"PRIi64,
stream->read_index, stream->write_index,
@ -1975,8 +2044,8 @@ static int do_get_record_latency(struct client *client, uint32_t command, uint32
pw_log_debug(NAME" %p: %s channel:%u", impl, commands[command].name, channel);
stream = pw_map_lookup(&client->streams, channel);
if (stream == NULL)
return -EINVAL;
if (stream == NULL || stream->type != STREAM_TYPE_RECORD)
return -ENOENT;
reply = reply_new(client, tag);
message_put(reply,
@ -1992,6 +2061,263 @@ static int do_get_record_latency(struct client *client, uint32_t command, uint32
return send_message(client, reply);
}
static int do_create_upload_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m)
{
struct impl *impl = client->impl;
const char *name;
struct sample_spec ss;
struct channel_map map;
struct pw_properties *props = NULL;
uint32_t length;
struct stream *stream = NULL;
struct message *reply;
int res;
if ((props = pw_properties_copy(client->props)) == NULL)
goto error_errno;
if ((res = message_get(m,
TAG_STRING, &name,
TAG_SAMPLE_SPEC, &ss,
TAG_CHANNEL_MAP, &map,
TAG_U32, &length,
TAG_INVALID)) < 0)
goto error_proto;
if (client->version >= 13) {
if ((res = message_get(m,
TAG_PROPLIST, props,
TAG_INVALID)) < 0)
goto error_proto;
} else {
pw_properties_set(props, PW_KEY_MEDIA_NAME, name);
}
if (name == NULL)
name = pw_properties_get(props, "event.id");
if (name == NULL)
name = pw_properties_get(props, PW_KEY_MEDIA_NAME);
if (name == NULL ||
!sample_spec_valid(&ss) ||
!channel_map_valid(&map) ||
ss.channels != map.channels ||
length == 0 || (length % sample_spec_frame_size(&ss) != 0))
goto error_invalid;
if (length >= SCACHE_ENTRY_SIZE_MAX)
goto error_toolarge;
pw_log_info(NAME" %p: [%s] %s tag:%u name:%s length:%d",
impl, client->name, commands[command].name, tag,
name, length);
stream = calloc(1, sizeof(struct stream));
if (stream == NULL)
goto error_errno;
stream->type = STREAM_TYPE_UPLOAD;
stream->direction = PW_DIRECTION_OUTPUT;
stream->impl = impl;
stream->client = client;
stream->channel = pw_map_insert_new(&client->streams, stream);
if (stream->channel == SPA_ID_INVALID)
goto error_errno;
stream->create_tag = tag;
stream->ss = ss;
stream->map = map;
stream->props = props;
stream->attr.maxlength = length;
stream->buffer = calloc(1, stream->attr.maxlength);
if (stream->buffer == NULL)
goto error_errno;
spa_ringbuffer_init(&stream->ring);
reply = reply_new(client, tag);
message_put(reply,
TAG_U32, stream->channel,
TAG_U32, length,
TAG_INVALID);
return send_message(client, reply);
error_errno:
res = -errno;
goto error;
error_proto:
res = -EPROTO;
goto error;
error_invalid:
res = -EINVAL;
goto error;
error_toolarge:
res = -EOVERFLOW;
goto error;
error:
if (props != NULL)
pw_properties_free(props);
if (stream)
stream_free(stream);
return res;
}
static int do_finish_upload_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m)
{
struct impl *impl = client->impl;
uint32_t channel, event;
struct stream *stream = NULL;
struct sample *sample;
const char *name;
int res;
if ((res = message_get(m,
TAG_U32, &channel,
TAG_INVALID)) < 0)
return -EPROTO;
stream = pw_map_lookup(&client->streams, channel);
if (stream == NULL || stream->type != STREAM_TYPE_UPLOAD)
return -ENOENT;
name = pw_properties_get(stream->props, "event.id");
if (name == NULL)
name = pw_properties_get(stream->props, PW_KEY_MEDIA_NAME);
if (name == NULL)
goto error_invalid;
pw_log_info(NAME" %p: [%s] %s tag:%u channel:%u name:%s",
impl, client->name, commands[command].name, tag,
channel, name);
sample = find_sample(impl, name);
if (sample == NULL) {
sample = calloc(1, sizeof(struct sample));
if (sample == NULL)
goto error_errno;
sample->index = pw_map_insert_new(&impl->samples, sample);
if (sample->index == SPA_ID_INVALID)
goto error_errno;
event = SUBSCRIPTION_EVENT_NEW;
} else {
if (sample->props)
pw_properties_free(sample->props);
free(sample->buffer);
event = SUBSCRIPTION_EVENT_CHANGE;
}
sample->impl = impl;
sample->name = name;
sample->props = stream->props;
sample->ss = stream->ss;
sample->map = stream->map;
sample->buffer = stream->buffer;
sample->length = stream->attr.maxlength;
stream->props = NULL;
stream->buffer = NULL;
stream_free(stream);
broadcast_subscribe_event(impl,
SUBSCRIPTION_MASK_SAMPLE_CACHE,
event | SUBSCRIPTION_EVENT_SAMPLE_CACHE,
sample->index);
return reply_simple_ack(client, tag);
error_errno:
res = -errno;
goto error;
error_invalid:
res = -EINVAL;
goto error;
error:
stream_free(stream);
return res;
}
static int do_play_sample(struct client *client, uint32_t command, uint32_t tag, struct message *m)
{
struct impl *impl = client->impl;
uint32_t sink_index, volume;
const char *sink_name, *name;
struct pw_properties *props = NULL;
int res;
if ((props = pw_properties_new(NULL, NULL)) == NULL)
goto error_errno;
if ((res = message_get(m,
TAG_U32, &sink_index,
TAG_STRING, &sink_name,
TAG_U32, &volume,
TAG_STRING, &name,
TAG_INVALID)) < 0)
goto error_proto;
if (client->version >= 13) {
if ((res = message_get(m,
TAG_PROPLIST, props,
TAG_INVALID)) < 0)
goto error_proto;
}
pw_log_info(NAME" %p: [%s] %s tag:%u sink_index:%u sink_name:%s name:%s",
impl, client->name, commands[command].name, tag,
sink_index, sink_name, name);
pw_properties_update(props, &client->props->dict);
goto error_noent;
error_errno:
res = -errno;
goto error;
error_proto:
res = -EPROTO;
goto error;
error_noent:
res = -ENOENT;
goto error;
error:
if (props != NULL)
pw_properties_free(props);
return res;
}
static int do_remove_sample(struct client *client, uint32_t command, uint32_t tag, struct message *m)
{
struct impl *impl = client->impl;
const char *name;
struct sample *sample;
int res;
if ((res = message_get(m,
TAG_STRING, &name,
TAG_INVALID)) < 0)
return -EPROTO;
pw_log_info(NAME" %p: [%s] %s tag:%u name:%s",
impl, client->name, commands[command].name, tag,
name);
if (name == NULL)
return -EINVAL;
if ((sample = find_sample(impl, name)) == NULL)
return -ENOENT;
broadcast_subscribe_event(impl,
SUBSCRIPTION_MASK_SAMPLE_CACHE,
SUBSCRIPTION_EVENT_REMOVE |
SUBSCRIPTION_EVENT_SAMPLE_CACHE,
sample->index);
sample_free(sample);
return reply_simple_ack(client, tag);
}
static int do_cork_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m)
{
struct impl *impl = client->impl;
@ -2011,8 +2337,8 @@ static int do_cork_stream(struct client *client, uint32_t command, uint32_t tag,
channel, cork ? "yes" : "no");
stream = pw_map_lookup(&client->streams, channel);
if (stream == NULL)
return -EINVAL;
if (stream == NULL || stream->type == STREAM_TYPE_UPLOAD)
return -ENOENT;
pw_stream_set_active(stream->stream, !cork);
stream->corked = cork;
@ -2040,8 +2366,8 @@ static int do_flush_trigger_prebuf_stream(struct client *client, uint32_t comman
impl, client->name, commands[command].name, tag, channel);
stream = pw_map_lookup(&client->streams, channel);
if (stream == NULL)
return -EINVAL;
if (stream == NULL || stream->type != STREAM_TYPE_PLAYBACK)
return -ENOENT;
switch (command) {
case COMMAND_FLUSH_PLAYBACK_STREAM:
@ -2060,20 +2386,6 @@ static int do_flush_trigger_prebuf_stream(struct client *client, uint32_t comman
return reply_simple_ack(client, tag);
}
static int do_error_access(struct client *client, uint32_t command, uint32_t tag, struct message *m)
{
struct impl *impl = client->impl;
pw_log_debug(NAME" %p: %s access denied", impl, commands[command].name);
return reply_error(client, command, tag, -EACCES);
}
static int do_error_not_implemented(struct client *client, uint32_t command, uint32_t tag, struct message *m)
{
struct impl *impl = client->impl;
pw_log_debug(NAME" %p: %s not implemented", impl, commands[command].name);
return reply_error(client, command, tag, -ENOSYS);
}
static int set_node_volume_mute(struct pw_manager_object *o,
struct volume *vol, bool *mute)
{
@ -2501,8 +2813,8 @@ static int do_set_stream_name(struct client *client, uint32_t command, uint32_t
impl, client->name, tag, channel, name);
stream = pw_map_lookup(&client->streams, channel);
if (stream == NULL)
return -EINVAL;
if (stream == NULL || stream->type == STREAM_TYPE_UPLOAD)
return -ENOENT;
items[0] = SPA_DICT_ITEM_INIT(PW_KEY_MEDIA_NAME, name);
pw_stream_update_properties(stream->stream,
@ -2543,7 +2855,7 @@ static int do_update_proplist(struct client *client, uint32_t command, uint32_t
if (command != COMMAND_UPDATE_CLIENT_PROPLIST) {
stream = pw_map_lookup(&client->streams, channel);
if (stream == NULL)
if (stream == NULL || stream->type == STREAM_TYPE_UPLOAD)
goto error_noentity;
fix_stream_properties(stream, props);
@ -2612,7 +2924,7 @@ static int do_remove_proplist(struct client *client, uint32_t command, uint32_t
if (command != COMMAND_UPDATE_CLIENT_PROPLIST) {
stream = pw_map_lookup(&client->streams, channel);
if (stream == NULL)
if (stream == NULL || stream->type == STREAM_TYPE_UPLOAD)
goto error_noentity;
pw_stream_update_properties(stream->stream, &dict);
@ -2743,10 +3055,8 @@ static int do_drain_stream(struct client *client, uint32_t command, uint32_t tag
pw_log_info(NAME" %p: [%s] DRAIN tag:%u channel:%d", impl, client->name, tag, channel);
stream = pw_map_lookup(&client->streams, channel);
if (stream == NULL)
if (stream == NULL || stream->type != STREAM_TYPE_PLAYBACK)
return -ENOENT;
if (stream->direction != PW_DIRECTION_OUTPUT)
return -EINVAL;
stream->drain_tag = tag;
return 0;
@ -3483,6 +3793,9 @@ static int do_set_stream_buffer_attr(struct client *client, uint32_t command, ui
return -ENOENT;
if (command == COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
if (stream->type != STREAM_TYPE_PLAYBACK)
return -ENOENT;
if ((res = message_get(m,
TAG_U32, &attr.maxlength,
TAG_U32, &attr.tlength,
@ -3491,6 +3804,9 @@ static int do_set_stream_buffer_attr(struct client *client, uint32_t command, ui
TAG_INVALID)) < 0)
return -EPROTO;
} else {
if (stream->type != STREAM_TYPE_RECORD)
return -ENOENT;
if ((res = message_get(m,
TAG_U32, &attr.maxlength,
TAG_U32, &attr.fragsize,
@ -3555,7 +3871,7 @@ static int do_update_stream_sample_rate(struct client *client, uint32_t command,
commands[command].name, tag, channel, rate);
stream = pw_map_lookup(&client->streams, channel);
if (stream == NULL)
if (stream == NULL || stream->type == STREAM_TYPE_UPLOAD)
return -ENOENT;
return reply_simple_ack(client, tag);
@ -3842,6 +4158,16 @@ static int do_unload_module(struct client *client, uint32_t command, uint32_t ta
return reply_simple_ack(client, tag);
}
static int do_error_access(struct client *client, uint32_t command, uint32_t tag, struct message *m)
{
return -EACCES;
}
static int do_error_not_implemented(struct client *client, uint32_t command, uint32_t tag, struct message *m)
{
return -ENOSYS;
}
static const struct command commands[COMMAND_MAX] =
{
[COMMAND_ERROR] = { "ERROR", },
@ -3861,11 +4187,11 @@ static const struct command commands[COMMAND_MAX] =
[COMMAND_DRAIN_PLAYBACK_STREAM] = { "DRAIN_PLAYBACK_STREAM", do_drain_stream, },
[COMMAND_STAT] = { "STAT", do_stat, },
[COMMAND_GET_PLAYBACK_LATENCY] = { "GET_PLAYBACK_LATENCY", do_get_playback_latency, },
[COMMAND_CREATE_UPLOAD_STREAM] = { "CREATE_UPLOAD_STREAM", do_error_access, },
[COMMAND_DELETE_UPLOAD_STREAM] = { "DELETE_UPLOAD_STREAM", do_error_access, },
[COMMAND_FINISH_UPLOAD_STREAM] = { "FINISH_UPLOAD_STREAM", do_error_access, },
[COMMAND_PLAY_SAMPLE] = { "PLAY_SAMPLE", do_error_access, },
[COMMAND_REMOVE_SAMPLE] = { "REMOVE_SAMPLE", do_error_access, },
[COMMAND_CREATE_UPLOAD_STREAM] = { "CREATE_UPLOAD_STREAM", do_create_upload_stream, },
[COMMAND_DELETE_UPLOAD_STREAM] = { "DELETE_UPLOAD_STREAM", do_delete_stream, },
[COMMAND_FINISH_UPLOAD_STREAM] = { "FINISH_UPLOAD_STREAM", do_finish_upload_stream, },
[COMMAND_PLAY_SAMPLE] = { "PLAY_SAMPLE", do_play_sample, },
[COMMAND_REMOVE_SAMPLE] = { "REMOVE_SAMPLE", do_remove_sample, },
[COMMAND_GET_SERVER_INFO] = { "GET_SERVER_INFO", do_get_server_info },
[COMMAND_GET_SINK_INFO] = { "GET_SINK_INFO", do_get_info, },
@ -4075,10 +4401,9 @@ static int handle_packet(struct client *client, struct message *msg)
res = commands[command].run(client, command, tag, msg);
finish:
message_free(client, msg, false, false);
if (res < 0) {
if (res < 0)
reply_error(client, command, tag, res);
}
return res;
return 0;
}
static int handle_memblock(struct client *client, struct message *msg)
@ -4101,7 +4426,7 @@ static int handle_memblock(struct client *client, struct message *msg)
flags, msg->length);
stream = pw_map_lookup(&client->streams, channel);
if (stream == NULL) {
if (stream == NULL || stream->type == STREAM_TYPE_RECORD) {
res = -EINVAL;
goto finish;
}
@ -4603,6 +4928,13 @@ error:
}
static int impl_free_sample(void *item, void *data)
{
struct sample *s = item;
sample_free(s);
return 0;
}
static void impl_free(struct impl *impl)
{
struct server *s;
@ -4610,6 +4942,8 @@ static void impl_free(struct impl *impl)
spa_hook_remove(&impl->context_listener);
spa_list_consume(s, &impl->servers, link)
server_free(s);
pw_map_for_each(&impl->samples, impl_free_sample, impl);
pw_map_clear(&impl->samples);
if (impl->props)
pw_properties_free(impl->props);
free(impl);
@ -4661,6 +4995,7 @@ struct pw_protocol_pulse *pw_protocol_pulse_new(struct pw_context *context,
spa_list_init(&impl->servers);
impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
impl->rate_limit.burst = 1;
pw_map_init(&impl->samples, 16, 16);
pw_context_add_listener(context, &impl->context_listener,
&context_events, impl);