From 7c1fbf5bee3fafb19a40e20c5d84aa00be9ecac6 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 6 Oct 2020 15:34:43 +0200 Subject: [PATCH] pulse-bridge: rework messages Make recycled message blocks for receiving and sending data. Implement async flushing out the output queue. --- src/examples/media-session/pulse-bridge.c | 973 +++++++++++----------- 1 file changed, 480 insertions(+), 493 deletions(-) diff --git a/src/examples/media-session/pulse-bridge.c b/src/examples/media-session/pulse-bridge.c index 66b1b9d27..42f529262 100644 --- a/src/examples/media-session/pulse-bridge.c +++ b/src/examples/media-session/pulse-bridge.c @@ -147,8 +147,10 @@ enum { TAG_FORMAT_INFO = 'f', }; -struct data { +struct message { + struct spa_list link; uint8_t *data; + uint32_t allocated; uint32_t length; uint32_t offset; }; @@ -165,15 +167,17 @@ struct client { struct pw_core *core; - uint32_t index; + uint32_t in_index; + uint32_t out_index; struct descriptor desc; - #define TYPE_PACKET 0 #define TYPE_MEMBLOCK 1 uint32_t type; - struct data data; + struct message *message; struct pw_map streams; + struct spa_list free_messages; + struct spa_list out_messages; }; enum sample_format { @@ -286,13 +290,6 @@ struct format_info { struct pw_properties *props; }; -struct block { - struct spa_list link; - uint8_t *data; - uint32_t length; - uint32_t offset; -}; - struct stream { uint32_t create_tag; uint32_t channel; /* index in map */ @@ -304,7 +301,7 @@ struct stream { struct pw_stream *stream; struct spa_hook stream_listener; - struct spa_list blocks; + struct spa_list messages; int64_t read_index; int64_t write_index; uint64_t underrun_for; @@ -489,56 +486,56 @@ enum { }; struct command { const char *name; - int (*run) (struct client *client, uint32_t command, uint32_t tag, struct data *d); + int (*run) (struct client *client, uint32_t command, uint32_t tag, struct message *msg); }; static const struct command commands[COMMAND_MAX]; -static int data_get(struct data *d, ...); +static int message_get(struct message *m, ...); -static int read_u8(struct data *d, uint8_t *val) +static int read_u8(struct message *m, uint8_t *val) { - if (d->offset + 1 > d->length) + if (m->offset + 1 > m->length) return -ENOSPC; - *val = d->data[d->offset]; - d->offset++; + *val = m->data[m->offset]; + m->offset++; return 0; } -static int read_u32(struct data *d, uint32_t *val) +static int read_u32(struct message *m, uint32_t *val) { - if (d->offset + 4 > d->length) + if (m->offset + 4 > m->length) return -ENOSPC; - memcpy(val, &d->data[d->offset], 4); + memcpy(val, &m->data[m->offset], 4); *val = ntohl(*val); - d->offset += 4; + m->offset += 4; return 0; } -static int read_u64(struct data *d, uint64_t *val) +static int read_u64(struct message *m, uint64_t *val) { uint32_t tmp; int res; - if ((res = read_u32(d, &tmp)) < 0) + if ((res = read_u32(m, &tmp)) < 0) return res; *val = ((uint64_t)tmp) << 32; - if ((res = read_u32(d, &tmp)) < 0) + if ((res = read_u32(m, &tmp)) < 0) return res; *val |= tmp; return 0; } -static int read_sample_spec(struct data *d, struct sample_spec *ss) +static int read_sample_spec(struct message *m, struct sample_spec *ss) { int res; uint8_t tmp; - if ((res = read_u8(d, &tmp)) < 0) + if ((res = read_u8(m, &tmp)) < 0) return res; ss->format = tmp; - if ((res = read_u8(d, &ss->channels)) < 0) + if ((res = read_u8(m, &ss->channels)) < 0) return res; - return read_u32(d, &ss->rate); + return read_u32(m, &ss->rate); } -static int read_props(struct data *d, struct pw_properties *props) +static int read_props(struct message *m, struct pw_properties *props) { int res; @@ -547,7 +544,7 @@ static int read_props(struct data *d, struct pw_properties *props) void *data; uint32_t length; - if ((res = data_get(d, + if ((res = message_get(m, TAG_STRING, &key, TAG_INVALID)) < 0) return res; @@ -555,14 +552,14 @@ static int read_props(struct data *d, struct pw_properties *props) if (key == NULL) break; - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &length, TAG_INVALID)) < 0) return res; if (length > MAX_TAG_SIZE) return -EINVAL; - if ((res = data_get(d, + if ((res = message_get(m, TAG_ARBITRARY, &data, length, TAG_INVALID)) < 0) return res; @@ -573,102 +570,102 @@ static int read_props(struct data *d, struct pw_properties *props) return 0; } -static int read_arbitrary(struct data *d, const void **val, size_t length) +static int read_arbitrary(struct message *m, const void **val, size_t length) { uint32_t len; int res; - if ((res = read_u32(d, &len)) < 0) + if ((res = read_u32(m, &len)) < 0) return res; if (len != length) return -EINVAL; - if (d->offset + length > d->length) + if (m->offset + length > m->length) return -ENOSPC; - *val = d->data + d->offset; - d->offset += length; + *val = m->data + m->offset; + m->offset += length; return 0; } -static int read_string(struct data *d, char **str) +static int read_string(struct message *m, char **str) { - uint32_t n, maxlen = d->length - d->offset; - n = strnlen(d->data + d->offset, maxlen); + uint32_t n, maxlen = m->length - m->offset; + n = strnlen(m->data + m->offset, maxlen); if (n == maxlen) return -EINVAL; - *str = d->data + d->offset; - d->offset += n + 1; + *str = m->data + m->offset; + m->offset += n + 1; return 0; } -static int read_timeval(struct data *d, struct timeval *tv) +static int read_timeval(struct message *m, struct timeval *tv) { int res; uint32_t tmp; - if ((res = read_u32(d, &tmp)) < 0) + if ((res = read_u32(m, &tmp)) < 0) return res; tv->tv_sec = tmp; - if ((res = read_u32(d, &tmp)) < 0) + if ((res = read_u32(m, &tmp)) < 0) return res; tv->tv_usec = tmp; return 0; } -static int read_channel_map(struct data *d, struct channel_map *map) +static int read_channel_map(struct message *m, struct channel_map *map) { int res; uint8_t i, tmp; - if ((res = read_u8(d, &map->channels)) < 0) + if ((res = read_u8(m, &map->channels)) < 0) return res; if (map->channels > CHANNELS_MAX) return -EINVAL; for (i = 0; i < map->channels; i ++) { - if ((res = read_u8(d, &tmp)) < 0) + if ((res = read_u8(m, &tmp)) < 0) return res; map->map[i] = tmp; } return 0; } -static int read_volume(struct data *d, float *vol) +static int read_volume(struct message *m, float *vol) { int res; uint32_t v; - if ((res = read_u32(d, &v)) < 0) + if ((res = read_u32(m, &v)) < 0) return res; *vol = ((float)v) / 0x10000U; return 0; } -static int read_cvolume(struct data *d, struct cvolume *vol) +static int read_cvolume(struct message *m, struct cvolume *vol) { int res; uint8_t i; - if ((res = read_u8(d, &vol->channels)) < 0) + if ((res = read_u8(m, &vol->channels)) < 0) return res; if (vol->channels > CHANNELS_MAX) return -EINVAL; for (i = 0; i < vol->channels; i ++) { - if ((res = read_volume(d, &vol->values[i])) < 0) + if ((res = read_volume(m, &vol->values[i])) < 0) return res; } return 0; } -static int read_format_info(struct data *d, struct format_info *info) +static int read_format_info(struct message *m, struct format_info *info) { int res; uint8_t tag, encoding; - if ((res = read_u8(d, &tag)) < 0) + if ((res = read_u8(m, &tag)) < 0) return res; if (tag != TAG_U8) return -EPROTO; - if ((res = read_u8(d, &encoding)) < 0) + if ((res = read_u8(m, &encoding)) < 0) return res; info->encoding = encoding; - if ((res = read_u8(d, &tag)) < 0) + if ((res = read_u8(m, &tag)) < 0) return res; if (tag != TAG_PROPLIST) return -EPROTO; @@ -676,15 +673,15 @@ static int read_format_info(struct data *d, struct format_info *info) info->props = pw_properties_new(NULL, NULL); if (info->props == NULL) return -errno; - return read_props(d, info->props); + return read_props(m, info->props); } -static int data_get(struct data *d, ...) +static int message_get(struct message *m, ...) { va_list va; int res; - va_start(va, d); + va_start(va, m); while (true) { int tag = va_arg(va, int); @@ -692,14 +689,14 @@ static int data_get(struct data *d, ...) if (tag == TAG_INVALID) break; - if ((res = read_u8(d, &dtag)) < 0) + if ((res = read_u8(m, &dtag)) < 0) return res; switch (dtag) { case TAG_STRING: if (tag != TAG_STRING) return -EINVAL; - if ((res = read_string(d, va_arg(va, char**))) < 0) + if ((res = read_string(m, va_arg(va, char**))) < 0) return res; break; case TAG_STRING_NULL: @@ -710,13 +707,13 @@ static int data_get(struct data *d, ...) case TAG_U8: if (dtag != tag) return -EINVAL; - if ((res = read_u8(d, va_arg(va, uint8_t*))) < 0) + if ((res = read_u8(m, va_arg(va, uint8_t*))) < 0) return res; break; case TAG_U32: if (dtag != tag) return -EINVAL; - if ((res = read_u32(d, va_arg(va, uint32_t*))) < 0) + if ((res = read_u32(m, va_arg(va, uint32_t*))) < 0) return res; break; case TAG_S64: @@ -724,13 +721,13 @@ static int data_get(struct data *d, ...) case TAG_USEC: if (dtag != tag) return -EINVAL; - if ((res = read_u64(d, va_arg(va, uint64_t*))) < 0) + if ((res = read_u64(m, va_arg(va, uint64_t*))) < 0) return res; break; case TAG_SAMPLE_SPEC: if (dtag != tag) return -EINVAL; - if ((res = read_sample_spec(d, va_arg(va, struct sample_spec*))) < 0) + if ((res = read_sample_spec(m, va_arg(va, struct sample_spec*))) < 0) return res; break; case TAG_ARBITRARY: @@ -739,7 +736,7 @@ static int data_get(struct data *d, ...) size_t len = va_arg(va, size_t); if (dtag != tag) return -EINVAL; - if ((res = read_arbitrary(d, val, len)) < 0) + if ((res = read_arbitrary(m, val, len)) < 0) return res; break; } @@ -756,37 +753,37 @@ static int data_get(struct data *d, ...) case TAG_TIMEVAL: if (dtag != tag) return -EINVAL; - if ((res = read_timeval(d, va_arg(va, struct timeval*))) < 0) + if ((res = read_timeval(m, va_arg(va, struct timeval*))) < 0) return res; break; case TAG_CHANNEL_MAP: if (dtag != tag) return -EINVAL; - if ((res = read_channel_map(d, va_arg(va, struct channel_map*))) < 0) + if ((res = read_channel_map(m, va_arg(va, struct channel_map*))) < 0) return res; break; case TAG_CVOLUME: if (dtag != tag) return -EINVAL; - if ((res = read_cvolume(d, va_arg(va, struct cvolume*))) < 0) + if ((res = read_cvolume(m, va_arg(va, struct cvolume*))) < 0) return res; break; case TAG_PROPLIST: if (dtag != tag) return -EINVAL; - if ((res = read_props(d, va_arg(va, struct pw_properties*))) < 0) + if ((res = read_props(m, va_arg(va, struct pw_properties*))) < 0) return res; break; case TAG_VOLUME: if (dtag != tag) return -EINVAL; - if ((res = read_volume(d, va_arg(va, float*))) < 0) + if ((res = read_volume(m, va_arg(va, float*))) < 0) return res; break; case TAG_FORMAT_INFO: if (dtag != tag) return -EINVAL; - if ((res = read_format_info(d, va_arg(va, struct format_info*))) < 0) + if ((res = read_format_info(m, va_arg(va, struct format_info*))) < 0) return res; break; } @@ -796,130 +793,130 @@ static int data_get(struct data *d, ...) return 0; } -static void write_8(struct data *d, uint8_t val) +static void write_8(struct message *m, uint8_t val) { - if (d->offset < d->length) - d->data[d->offset] = val; - d->offset++; + if (m->length < m->allocated) + m->data[m->length] = val; + m->length++; } -static void write_32(struct data *d, uint32_t val) +static void write_32(struct message *m, uint32_t val) { val = htonl(val); - if (d->offset + 4 <= d->length) - memcpy(d->data + d->offset, &val, 4); - d->offset += 4; + if (m->length + 4 <= m->allocated) + memcpy(m->data + m->length, &val, 4); + m->length += 4; } -static void write_string(struct data *d, const char *s) +static void write_string(struct message *m, const char *s) { - write_8(d, s ? TAG_STRING : TAG_STRING_NULL); + write_8(m, s ? TAG_STRING : TAG_STRING_NULL); if (s != NULL) { int len = strlen(s) + 1; - if (d->offset + len <= d->length) - strcpy(&d->data[d->offset], s); - d->offset += len; + if (m->length + len <= m->allocated) + strcpy(&m->data[m->length], s); + m->length += len; } } -static void write_u8(struct data *d, uint8_t val) +static void write_u8(struct message *m, uint8_t val) { - write_8(d, TAG_U8); - write_8(d, val); + write_8(m, TAG_U8); + write_8(m, val); } -static void write_u32(struct data *d, uint32_t val) +static void write_u32(struct message *m, uint32_t val) { - write_8(d, TAG_U32); - write_32(d, val); + write_8(m, TAG_U32); + write_32(m, val); } -static void write_64(struct data *d, uint8_t tag, uint64_t val) +static void write_64(struct message *m, uint8_t tag, uint64_t val) { - write_8(d, tag); - write_32(d, val >> 32); - write_32(d, val); + write_8(m, tag); + write_32(m, val >> 32); + write_32(m, val); } -static void write_sample_spec(struct data *d, struct sample_spec *ss) +static void write_sample_spec(struct message *m, struct sample_spec *ss) { - write_8(d, TAG_SAMPLE_SPEC); - write_8(d, ss->format); - write_8(d, ss->channels); - write_32(d, ss->rate); + write_8(m, TAG_SAMPLE_SPEC); + write_8(m, ss->format); + write_8(m, ss->channels); + write_32(m, ss->rate); } -static void write_arbitrary(struct data *d, const void *p, size_t length) +static void write_arbitrary(struct message *m, const void *p, size_t length) { - write_8(d, TAG_ARBITRARY); - write_32(d, length); - if (length > 0 && d->offset + length <= d->length) - memcpy(d->data + d->offset, p, length); - d->offset += length; + write_8(m, TAG_ARBITRARY); + write_32(m, length); + if (length > 0 && m->length + length <= m->allocated) + memcpy(m->data + m->length, p, length); + m->length += length; } -static void write_boolean(struct data *d, bool val) +static void write_boolean(struct message *m, bool val) { - write_8(d, val ? TAG_BOOLEAN_TRUE : TAG_BOOLEAN_FALSE); + write_8(m, val ? TAG_BOOLEAN_TRUE : TAG_BOOLEAN_FALSE); } -static void write_timeval(struct data *d, struct timeval *tv) +static void write_timeval(struct message *m, struct timeval *tv) { - write_8(d, TAG_TIMEVAL); - write_32(d, tv->tv_sec); - write_32(d, tv->tv_usec); + write_8(m, TAG_TIMEVAL); + write_32(m, tv->tv_sec); + write_32(m, tv->tv_usec); } -static void write_channel_map(struct data *d, struct channel_map *map) +static void write_channel_map(struct message *m, struct channel_map *map) { uint8_t i; - write_8(d, TAG_CHANNEL_MAP); - write_8(d, map->channels); + write_8(m, TAG_CHANNEL_MAP); + write_8(m, map->channels); for (i = 0; i < map->channels; i ++) - write_8(d, map->map[i]); + write_8(m, map->map[i]); } -static void write_volume(struct data *d, float vol) +static void write_volume(struct message *m, float vol) { - write_8(d, TAG_VOLUME); - write_32(d, vol * 0x10000U); + write_8(m, TAG_VOLUME); + write_32(m, vol * 0x10000U); } -static void write_cvolume(struct data *d, struct cvolume *cvol) +static void write_cvolume(struct message *m, struct cvolume *cvol) { uint8_t i; - write_8(d, TAG_CVOLUME); - write_8(d, cvol->channels); + write_8(m, TAG_CVOLUME); + write_8(m, cvol->channels); for (i = 0; i < cvol->channels; i ++) - write_32(d, cvol->values[i] * 0x10000U); + write_32(m, cvol->values[i] * 0x10000U); } -static void write_props(struct data *d, struct pw_properties *props) +static void write_props(struct message *m, struct pw_properties *props) { const struct spa_dict_item *it; - write_8(d, TAG_PROPLIST); + write_8(m, TAG_PROPLIST); if (props != NULL) { spa_dict_for_each(it, &props->dict) { int l = strlen(it->value); - write_string(d, it->key); - write_u32(d, l+1); - write_arbitrary(d, it->value, l+1); + write_string(m, it->key); + write_u32(m, l+1); + write_arbitrary(m, it->value, l+1); } } - write_string(d, NULL); + write_string(m, NULL); } -static void write_format_info(struct data *d, struct format_info *info) +static void write_format_info(struct message *m, struct format_info *info) { - write_8(d, TAG_FORMAT_INFO); - write_u8(d, (uint8_t) info->encoding); - write_props(d, info->props); + write_8(m, TAG_FORMAT_INFO); + write_u8(m, (uint8_t) info->encoding); + write_props(m, info->props); } -static int data_put(struct data *d, ...) +static int message_put(struct message *m, ...) { va_list va; - va_start(va, d); + va_start(va, m); while (true) { int tag = va_arg(va, int); @@ -928,49 +925,49 @@ static int data_put(struct data *d, ...) switch (tag) { case TAG_STRING: - write_string(d, va_arg(va, const char *)); + write_string(m, va_arg(va, const char *)); break; case TAG_U8: - write_u8(d, (uint8_t)va_arg(va, int)); + write_u8(m, (uint8_t)va_arg(va, int)); break; case TAG_U32: - write_u32(d, (uint32_t)va_arg(va, uint32_t)); + write_u32(m, (uint32_t)va_arg(va, uint32_t)); break; case TAG_S64: case TAG_U64: case TAG_USEC: - write_64(d, tag, va_arg(va, uint64_t)); + write_64(m, tag, va_arg(va, uint64_t)); break; case TAG_SAMPLE_SPEC: - write_sample_spec(d, va_arg(va, struct sample_spec*)); + write_sample_spec(m, va_arg(va, struct sample_spec*)); break; case TAG_ARBITRARY: { const void *p = va_arg(va, const void*); size_t length = va_arg(va, size_t); - write_arbitrary(d, p, length); + write_arbitrary(m, p, length); break; } case TAG_BOOLEAN: - write_boolean(d, va_arg(va, int)); + write_boolean(m, va_arg(va, int)); break; case TAG_TIMEVAL: - write_timeval(d, va_arg(va, struct timeval*)); + write_timeval(m, va_arg(va, struct timeval*)); break; case TAG_CHANNEL_MAP: - write_channel_map(d, va_arg(va, struct channel_map*)); + write_channel_map(m, va_arg(va, struct channel_map*)); break; case TAG_CVOLUME: - write_cvolume(d, va_arg(va, struct cvolume*)); + write_cvolume(m, va_arg(va, struct cvolume*)); break; case TAG_PROPLIST: - write_props(d, va_arg(va, struct pw_properties*)); + write_props(m, va_arg(va, struct pw_properties*)); break; case TAG_VOLUME: - write_volume(d, va_arg(va, double)); + write_volume(m, va_arg(va, double)); break; case TAG_FORMAT_INFO: - write_format_info(d, va_arg(va, struct format_info*)); + write_format_info(m, va_arg(va, struct format_info*)); break; } } @@ -979,37 +976,147 @@ static int data_put(struct data *d, ...) return 0; } - -static int send_data(struct client *client, struct data *d) +static void message_free(struct client *client, struct message *msg, bool destroy) { - struct descriptor desc; + spa_list_remove(&msg->link); + if (destroy) + free(msg); + else + spa_list_append(&client->free_messages, &msg->link); +} - desc.length = htonl(d->offset); - desc.channel = htonl(-1); - desc.offset_hi = 0; - desc.offset_lo = 0; - desc.flags = 0; - send(client->source->fd, &desc, sizeof(desc), MSG_NOSIGNAL); - send(client->source->fd, d->data, d->offset, MSG_NOSIGNAL); +static struct message *message_alloc(struct client *client, uint32_t size) +{ + struct message *msg = NULL; + + if (!spa_list_is_empty(&client->free_messages)) { + msg = spa_list_first(&client->free_messages, struct message, link); + spa_list_remove(&msg->link); + } + if (msg == NULL || msg->allocated < size) { + uint32_t alloc = SPA_ROUND_UP_N(SPA_MAX(size, 4096u), 4096u); + msg = realloc(msg, sizeof(struct message) + alloc); + msg->allocated = alloc; + msg->data = SPA_MEMBER(msg, sizeof(struct message), void); + } + msg->offset = 0; + msg->length = size; + return msg; +} + +static int flush_messages(struct client *client) +{ + int res; + + while (true) { + struct message *m; + struct descriptor desc; + void *data; + size_t size; + + if (spa_list_is_empty(&client->out_messages)) + return 0; + m = spa_list_first(&client->out_messages, struct message, link); + + if (client->out_index < sizeof(desc)) { + desc.length = htonl(m->length); + desc.channel = htonl(-1); + desc.offset_hi = 0; + desc.offset_lo = 0; + desc.flags = 0; + + data = SPA_MEMBER(&desc, client->out_index, void); + size = sizeof(desc) - client->out_index; + } else if (client->out_index < m->length + sizeof(desc)) { + uint32_t idx = client->out_index - sizeof(desc); + data = m->data + idx; + size = m->length - idx; + } else { + spa_list_remove(&m->link); + spa_list_append(&client->free_messages, &m->link); + client->out_index = 0; + continue; + } + + while (true) { + res = send(client->source->fd, data, size, MSG_NOSIGNAL | MSG_DONTWAIT); + if (res < 0) { + if (errno == EINTR) + continue; + else + return -errno; + } + client->out_index += res; + break; + } + } return 0; } -static int do_command_auth(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int send_message(struct client *client, struct message *m) { struct impl *impl = client->impl; - uint8_t buffer[1024]; - struct data reply; + int res; + + m->offset = 0; + spa_list_append(&client->out_messages, &m->link); + res = flush_messages(client); + if (res == -EAGAIN) { + int mask = client->source->mask; + SPA_FLAG_SET(mask, SPA_IO_OUT); + pw_loop_update_io(impl->loop, client->source, mask); + res = 0; + } + return res; +} + +static struct message *reply_new(struct client *client, uint32_t tag) +{ + struct message *reply; + reply = message_alloc(client, 0); + pw_log_debug(NAME" %p: REPLY tag:%u", client, tag); + message_put(reply, + TAG_U32, COMMAND_REPLY, + TAG_U32, tag, + TAG_INVALID); + return reply; +} + +static int reply_simple_ack(struct client *client, uint32_t tag) +{ + struct message *reply = reply_new(client, tag); + return send_message(client, reply); +} + +static int reply_error(struct client *client, uint32_t tag, uint32_t error) +{ + struct message *reply; + + pw_log_debug(NAME" %p: ERROR tag:%u error:%u", client, tag, error); + + reply = message_alloc(client, 0); + message_put(reply, + TAG_U32, COMMAND_ERROR, + TAG_U32, tag, + TAG_U32, error, + TAG_INVALID); + return send_message(client, reply); +} + +static int do_command_auth(struct client *client, uint32_t command, uint32_t tag, struct message *m) +{ + struct impl *impl = client->impl; + struct message *reply; uint32_t version; const void *cookie; int res; - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &version, TAG_ARBITRARY, &cookie, NATIVE_COOKIE_LENGTH, TAG_INVALID)) < 0) { return res; } - if (version < 8) return -EPROTO; @@ -1018,38 +1125,32 @@ static int do_command_auth(struct client *client, uint32_t command, uint32_t tag client->version = version; - spa_zero(reply); - reply.data = buffer; - reply.length = sizeof(buffer); - pw_log_info(NAME" %p: AUTH version:%d", impl, version); - data_put(&reply, - TAG_U32, COMMAND_REPLY, - TAG_U32, tag, + reply = reply_new(client, tag); + message_put(reply, TAG_U32, PROTOCOL_VERSION, TAG_INVALID); - return send_data(client, &reply); + return send_message(client, reply); } -static int do_set_client_name(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_set_client_name(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; - uint8_t buffer[1024]; - struct data reply; + struct message *reply; const char *name = NULL; int res, changed = 0; if (client->version < 13) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_STRING, &name, TAG_INVALID)) < 0) return res; if (name) changed += pw_properties_set(client->props, "application.name", name); } else { - if ((res = data_get(d, + if ((res = message_get(m, TAG_PROPLIST, client->props, TAG_INVALID)) < 0) return res; @@ -1061,62 +1162,40 @@ static int do_set_client_name(struct client *client, uint32_t command, uint32_t pw_log_info(NAME" %p: SET_CLIENT_NAME %s", impl, pw_properties_get(client->props, "application.name")); - spa_zero(reply); - reply.data = buffer; - reply.length = sizeof(buffer); - - data_put(&reply, - TAG_U32, COMMAND_REPLY, - TAG_U32, tag, - TAG_INVALID); + reply = reply_new(client, tag); if (client->version >= 13) { - data_put(&reply, + message_put(reply, TAG_U32, 0, /* client index */ TAG_INVALID); } - return send_data(client, &reply); + return send_message(client, reply); } -static int do_subscribe(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_subscribe(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; - uint8_t buffer[1024]; - struct data reply; + struct message *reply; uint32_t mask; int res; - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &mask, TAG_INVALID)) < 0) return res; pw_log_info(NAME" %p: SUBSCRIBE mask:%08x", impl, mask); - spa_zero(reply); - reply.data = buffer; - reply.length = sizeof(buffer); + reply = reply_new(client, tag); - data_put(&reply, - TAG_U32, COMMAND_REPLY, - TAG_U32, tag, - TAG_INVALID); - - return send_data(client, &reply); -} - -static void block_free(struct block *block) -{ - spa_list_remove(&block->link); - free(block->data); - free(block); + return send_message(client, reply); } static void stream_flush(struct stream *stream) { - struct block *block; - spa_list_consume(block, &stream->blocks, link) - block_free(block); + struct message *msg; + spa_list_consume(msg, &stream->messages, link) + message_free(stream->client, msg, false); stream->write_index = stream->read_index = 0; stream->playing_for = 0; stream->underrun_for = 0; @@ -1182,8 +1261,7 @@ static inline uint32_t writable_size(const struct stream *s, uint64_t elapsed) static int send_command_request(struct stream *stream) { struct client *client = stream->client; - uint8_t buffer[1024]; - struct data msg; + struct message *msg; uint32_t size; size = writable_size(stream, 0); @@ -1192,82 +1270,34 @@ static int send_command_request(struct stream *stream) pw_log_trace(NAME" %p: REQUEST channel:%d %u", stream, stream->channel, size); - spa_zero(msg); - msg.data = buffer; - msg.length = sizeof(buffer); - - data_put(&msg, + msg = message_alloc(client, 0); + message_put(msg, TAG_U32, COMMAND_REQUEST, TAG_U32, -1, TAG_U32, stream->channel, TAG_U32, size, TAG_INVALID); - return send_data(client, &msg); -} - -static int reply_simple_ack(struct client *client, uint32_t tag) -{ - uint8_t buffer[1024]; - struct data reply; - - spa_zero(reply); - reply.data = buffer; - reply.length = sizeof(buffer); - - pw_log_debug(NAME" %p: REPLY tag:%u", client, tag); - - data_put(&reply, - TAG_U32, COMMAND_REPLY, - TAG_U32, tag, - TAG_INVALID); - - return send_data(client, &reply); -} - -static int reply_error(struct client *client, uint32_t tag, uint32_t error) -{ - uint8_t buffer[1024]; - struct data reply; - - spa_zero(reply); - reply.data = buffer; - reply.length = sizeof(buffer); - - pw_log_debug(NAME" %p: ERROR tag:%u error:%u", client, tag, error); - - data_put(&reply, - TAG_U32, COMMAND_ERROR, - TAG_U32, tag, - TAG_U32, error, - TAG_INVALID); - - return send_data(client, &reply); + return send_message(client, msg); } static int reply_create_playback_stream(struct stream *stream) { struct client *client = stream->client; - uint8_t buffer[1024]; - struct data reply; + struct message *reply; uint32_t size; - spa_zero(reply); - reply.data = buffer; - reply.length = sizeof(buffer); - size = writable_size(stream, 0); - data_put(&reply, - TAG_U32, COMMAND_REPLY, - TAG_U32, stream->create_tag, + reply = reply_new(client, stream->create_tag); + message_put(reply, TAG_U32, stream->channel, /* stream index/channel */ TAG_U32, 0, /* sink_input/stream index */ TAG_U32, size, /* missing/requested bytes */ TAG_INVALID); if (client->version >= 9) { - data_put(&reply, + message_put(reply, TAG_U32, stream->attr.maxlength, TAG_U32, stream->attr.tlength, TAG_U32, stream->attr.prebuf, @@ -1275,7 +1305,7 @@ static int reply_create_playback_stream(struct stream *stream) TAG_INVALID); } if (client->version >= 12) { - data_put(&reply, + message_put(reply, TAG_SAMPLE_SPEC, &stream->ss, TAG_CHANNEL_MAP, &stream->map, TAG_U32, 0, /* sink index */ @@ -1284,7 +1314,7 @@ static int reply_create_playback_stream(struct stream *stream) TAG_INVALID); } if (client->version >= 13) { - data_put(&reply, + message_put(reply, TAG_USEC, 0ULL, /* sink configured latency */ TAG_INVALID); } @@ -1292,41 +1322,35 @@ static int reply_create_playback_stream(struct stream *stream) struct format_info info; spa_zero(info); info.encoding = ENCODING_PCM; - data_put(&reply, + message_put(reply, TAG_FORMAT_INFO, &info, /* sink_input format */ TAG_INVALID); } stream->create_tag = SPA_ID_INVALID; - return send_data(client, &reply); + return send_message(client, reply); } static int reply_create_record_stream(struct stream *stream) { struct client *client = stream->client; - uint8_t buffer[1024]; - struct data reply; + struct message *reply; - spa_zero(reply); - reply.data = buffer; - reply.length = sizeof(buffer); - - data_put(&reply, - TAG_U32, COMMAND_REPLY, - TAG_U32, stream->create_tag, + reply = reply_new(client, stream->create_tag); + message_put(reply, TAG_U32, stream->channel, /* stream index/channel */ TAG_U32, 0, /* source_output/stream index */ TAG_INVALID); if (client->version >= 9) { - data_put(&reply, + message_put(reply, TAG_U32, stream->attr.maxlength, TAG_U32, stream->attr.fragsize, TAG_INVALID); } if (client->version >= 12) { - data_put(&reply, + message_put(reply, TAG_SAMPLE_SPEC, &stream->ss, TAG_CHANNEL_MAP, &stream->map, TAG_U32, 0, /* source index */ @@ -1335,7 +1359,7 @@ static int reply_create_record_stream(struct stream *stream) TAG_INVALID); } if (client->version >= 13) { - data_put(&reply, + message_put(reply, TAG_USEC, 0ULL, /* source configured latency */ TAG_INVALID); } @@ -1343,14 +1367,14 @@ static int reply_create_record_stream(struct stream *stream) struct format_info info; spa_zero(info); info.encoding = ENCODING_PCM; - data_put(&reply, + message_put(reply, TAG_FORMAT_INFO, &info, /* source_output format */ TAG_INVALID); } stream->create_tag = SPA_ID_INVALID; - return send_data(client, &reply); + return send_message(client, reply); } static void stream_state_changed(void *data, enum pw_stream_state old, @@ -1508,7 +1532,8 @@ static void update_timing_info(struct stream *stream) static void stream_process(void *data) { struct stream *stream = data; - struct block *block; + struct client *client = stream->client; + struct message *msg; struct pw_buffer *buffer; struct spa_buffer *buf; uint32_t size, maxsize; @@ -1518,7 +1543,7 @@ static void stream_process(void *data) update_timing_info(stream); - while (!spa_list_is_empty(&stream->blocks)) { + while (!spa_list_is_empty(&stream->messages)) { buffer = pw_stream_dequeue_buffer(stream->stream); if (buffer == NULL) break; @@ -1527,19 +1552,19 @@ static void stream_process(void *data) if ((p = buf->datas[0].data) == NULL) break; - block = spa_list_first(&stream->blocks, struct block, link); + msg = spa_list_first(&stream->messages, struct message, link); maxsize = buf->datas[0].maxsize; - size = SPA_MIN(block->length - block->offset, maxsize); - memcpy(p, block->data + block->offset, size); + size = SPA_MIN(msg->length - msg->offset, maxsize); + memcpy(p, SPA_MEMBER(msg->data, msg->offset, void), size); - pw_log_trace(NAME" %p: process block %p %d-%d/%d", - stream, block, block->offset, size, block->length); + pw_log_trace(NAME" %p: process message %p %d-%d/%d", + stream, msg, msg->offset, size, msg->length); stream->read_index += size; stream->playing_for += size; - block->offset += size; - if (block->offset >= block->length) - block_free(block); + msg->offset += size; + if (msg->offset >= msg->length) + message_free(client, msg, false); buf->datas[0].chunk->offset = 0; buf->datas[0].chunk->stride = stream->frame_size; @@ -1625,7 +1650,7 @@ static void fix_playback_buffer_attr(struct stream *s, struct buffer_attr *attr) attr->prebuf = SPA_MAX(attr->prebuf, frame_size); } -static int do_create_playback_stream(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_create_playback_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; const char *name = NULL; @@ -1666,7 +1691,7 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui props = pw_properties_new(NULL, NULL); if (client->version < 13) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_STRING, &name, TAG_INVALID)) < 0) goto error; @@ -1675,7 +1700,7 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui goto error; } } - if ((res = data_get(d, + if ((res = message_get(m, TAG_SAMPLE_SPEC, &ss, TAG_CHANNEL_MAP, &map, TAG_U32, &sink_index, @@ -1693,7 +1718,7 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui pw_log_info(NAME" %p: CREATE_PLAYBACK_STREAM corked:%u", impl, corked); if (client->version >= 12) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_BOOLEAN, &no_remap, TAG_BOOLEAN, &no_remix, TAG_BOOLEAN, &fix_format, @@ -1705,7 +1730,7 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui goto error; } if (client->version >= 13) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_BOOLEAN, &muted, TAG_BOOLEAN, &adjust_latency, TAG_PROPLIST, props, @@ -1713,14 +1738,14 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui goto error; } if (client->version >= 14) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_BOOLEAN, &volume_set, TAG_BOOLEAN, &early_requests, TAG_INVALID)) < 0) goto error; } if (client->version >= 15) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_BOOLEAN, &muted_set, TAG_BOOLEAN, &dont_inhibit_auto_suspend, TAG_BOOLEAN, &fail_on_suspend, @@ -1728,19 +1753,19 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui goto error; } if (client->version >= 17) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_BOOLEAN, &relative_volume, TAG_INVALID)) < 0) goto error; } if (client->version >= 18) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_BOOLEAN, &passthrough, TAG_INVALID)) < 0) goto error; } if (client->version >= 21) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_U8, &n_formats, TAG_INVALID)) < 0) goto error; @@ -1749,14 +1774,14 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui uint8_t i; formats = calloc(n_formats, sizeof(struct format_info)); for (i = 0; i < n_formats; i++) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_FORMAT_INFO, &formats[i], TAG_INVALID)) < 0) goto error; } } } - if (d->offset != d->length) { + if (m->offset != m->length) { res = -EPROTO; goto error; } @@ -1771,7 +1796,7 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui stream->corked = corked; stream->adjust_latency = adjust_latency; stream->channel = pw_map_insert_new(&client->streams, stream); - spa_list_init(&stream->blocks); + spa_list_init(&stream->messages); stream->direction = PW_DIRECTION_OUTPUT; stream->create_tag = tag; @@ -1824,7 +1849,7 @@ error: return res; } -static int do_create_record_stream(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_create_record_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; const char *name = NULL; @@ -1869,7 +1894,7 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint props = pw_properties_new(NULL, NULL); if (client->version < 13) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_STRING, &name, TAG_INVALID)) < 0) goto error; @@ -1878,7 +1903,7 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint goto error; } } - if ((res = data_get(d, + if ((res = message_get(m, TAG_SAMPLE_SPEC, &ss, TAG_CHANNEL_MAP, &map, TAG_U32, &source_index, @@ -1890,7 +1915,7 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint goto error; if (client->version >= 12) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_BOOLEAN, &no_remap, TAG_BOOLEAN, &no_remix, TAG_BOOLEAN, &fix_format, @@ -1902,7 +1927,7 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint goto error; } if (client->version >= 13) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_BOOLEAN, &peak_detect, TAG_BOOLEAN, &adjust_latency, TAG_PROPLIST, props, @@ -1911,20 +1936,20 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint goto error; } if (client->version >= 14) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_BOOLEAN, &early_requests, TAG_INVALID)) < 0) goto error; } if (client->version >= 15) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_BOOLEAN, &dont_inhibit_auto_suspend, TAG_BOOLEAN, &fail_on_suspend, TAG_INVALID)) < 0) goto error; } if (client->version >= 22) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_U8, &n_formats, TAG_INVALID)) < 0) goto error; @@ -1933,13 +1958,13 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint uint8_t i; formats = calloc(n_formats, sizeof(struct format_info)); for (i = 0; i < n_formats; i++) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_FORMAT_INFO, &formats[i], TAG_INVALID)) < 0) goto error; } } - if ((res = data_get(d, + if ((res = message_get(m, TAG_CVOLUME, &volume, TAG_BOOLEAN, &muted, TAG_BOOLEAN, &volume_set, @@ -1949,7 +1974,7 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint TAG_INVALID)) < 0) goto error; } - if (d->offset != d->length) { + if (m->offset != m->length) { res = -EPROTO; goto error; } @@ -1963,7 +1988,7 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint stream->impl = impl; stream->client = client; stream->channel = pw_map_insert_new(&client->streams, stream); - spa_list_init(&stream->blocks); + spa_list_init(&stream->messages); stream->stream = pw_stream_new(client->core, name, props); props = NULL; @@ -2005,14 +2030,14 @@ error: return res; } -static int do_delete_stream(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_delete_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; uint32_t channel; struct stream *stream; int res; - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &channel, TAG_INVALID)) < 0) return res; @@ -2027,17 +2052,16 @@ static int do_delete_stream(struct client *client, uint32_t command, uint32_t ta return reply_simple_ack(client, tag); } -static int do_get_playback_latency(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_get_playback_latency(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; - uint8_t buffer[1024]; - struct data reply; + struct message *reply; uint32_t channel; struct timeval tv; struct stream *stream; int res; - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &channel, TAG_TIMEVAL, &tv, TAG_INVALID)) < 0) @@ -2048,17 +2072,13 @@ static int do_get_playback_latency(struct client *client, uint32_t command, uint if (stream == NULL) return -EINVAL; - spa_zero(reply); - reply.data = buffer; - reply.length = sizeof(buffer); - pw_log_info("read:%"PRIi64" write:%"PRIi64" queued:%"PRIi64" delay:%"PRIi64, + pw_log_debug("read:%"PRIi64" write:%"PRIi64" queued:%"PRIi64" delay:%"PRIi64, stream->read_index, stream->write_index, stream->write_index - stream->read_index, stream->delay); - data_put(&reply, - TAG_U32, COMMAND_REPLY, - TAG_U32, tag, + reply = reply_new(client, tag); + message_put(reply, TAG_USEC, stream->delay, /* sink latency + queued samples */ TAG_USEC, 0, /* always 0 */ TAG_BOOLEAN, stream->playing_for > 0 && @@ -2070,25 +2090,24 @@ static int do_get_playback_latency(struct client *client, uint32_t command, uint TAG_INVALID); if (client->version >= 13) { - data_put(&reply, + message_put(reply, TAG_U64, stream->underrun_for, TAG_U64, stream->playing_for, TAG_INVALID); } - return send_data(client, &reply); + return send_message(client, reply); } -static int do_get_record_latency(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_get_record_latency(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; - uint8_t buffer[1024]; - struct data reply; + struct message *reply; uint32_t channel; struct timeval tv, now; struct stream *stream; int res; - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &channel, TAG_TIMEVAL, &tv, TAG_INVALID)) < 0) @@ -2099,28 +2118,25 @@ static int do_get_record_latency(struct client *client, uint32_t command, uint32 if (stream == NULL) return -EINVAL; - spa_zero(reply); - reply.data = buffer; - reply.length = sizeof(buffer); - gettimeofday(&now, NULL); - data_put(&reply, + reply = reply_new(client, tag); + message_put(reply, TAG_U32, COMMAND_REPLY, TAG_U32, tag, - TAG_USEC, 0, /* monitor latency */ - TAG_USEC, 0, /* source latency + queued */ + TAG_USEC, 0, /* monitor latency */ + TAG_USEC, stream->delay, /* source latency + queued */ TAG_BOOLEAN, true, /* playing state */ TAG_TIMEVAL, &tv, - TAG_TIMEVAL, &now, + TAG_TIMEVAL, &stream->timestamp, TAG_S64, stream->write_index, TAG_S64, stream->read_index, TAG_INVALID); - return send_data(client, &reply); + return send_message(client, reply); } -static int do_cork_stream(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_cork_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; uint32_t channel; @@ -2128,7 +2144,7 @@ static int do_cork_stream(struct client *client, uint32_t command, uint32_t tag, struct stream *stream; int res; - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &channel, TAG_BOOLEAN, &cork, TAG_INVALID)) < 0) @@ -2148,14 +2164,14 @@ static int do_cork_stream(struct client *client, uint32_t command, uint32_t tag, return reply_simple_ack(client, tag); } -static int do_flush_trigger_prebuf_stream(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_flush_trigger_prebuf_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; uint32_t channel; struct stream *stream; int res; - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &channel, TAG_INVALID)) < 0) return res; @@ -2183,12 +2199,12 @@ static int do_flush_trigger_prebuf_stream(struct client *client, uint32_t comman return reply_simple_ack(client, tag); } -static int do_error_access(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_error_access(struct client *client, uint32_t command, uint32_t tag, struct message *m) { return reply_error(client, tag, ERR_ACCESS); } -static int do_set_stream_volume(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_set_stream_volume(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; uint32_t channel; @@ -2196,7 +2212,7 @@ static int do_set_stream_volume(struct client *client, uint32_t command, uint32_ int res; struct cvolume volume; - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &channel, TAG_CVOLUME, &volume, TAG_INVALID)) < 0) @@ -2218,7 +2234,7 @@ static int do_set_stream_volume(struct client *client, uint32_t command, uint32_ return reply_simple_ack(client, tag); } -static int do_set_stream_mute(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_set_stream_mute(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; uint32_t channel; @@ -2227,7 +2243,7 @@ static int do_set_stream_mute(struct client *client, uint32_t command, uint32_t bool mute; float val; - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &channel, TAG_BOOLEAN, &mute, TAG_INVALID)) < 0) @@ -2251,7 +2267,7 @@ static int do_set_stream_mute(struct client *client, uint32_t command, uint32_t return reply_simple_ack(client, tag); } -static int do_set_stream_name(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_set_stream_name(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; uint32_t channel; @@ -2260,7 +2276,7 @@ static int do_set_stream_name(struct client *client, uint32_t command, uint32_t struct spa_dict_item items[1]; int res; - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &channel, TAG_STRING, &name, TAG_INVALID)) < 0) @@ -2281,7 +2297,7 @@ static int do_set_stream_name(struct client *client, uint32_t command, uint32_t return reply_simple_ack(client, tag); } -static int do_update_proplist(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_update_proplist(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; uint32_t channel, mode; @@ -2292,7 +2308,7 @@ static int do_update_proplist(struct client *client, uint32_t command, uint32_t props = pw_properties_new(NULL, NULL); if (command != COMMAND_UPDATE_CLIENT_PROPLIST) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &channel, TAG_INVALID)) < 0) goto exit; @@ -2302,7 +2318,7 @@ static int do_update_proplist(struct client *client, uint32_t command, uint32_t pw_log_info(NAME" %p: %s channel:%d", impl, commands[command].name, channel); - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &mode, TAG_PROPLIST, &props, TAG_INVALID)) < 0) @@ -2324,7 +2340,7 @@ exit: return res; } -static int do_remove_proplist(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_remove_proplist(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; uint32_t i, channel; @@ -2337,7 +2353,7 @@ static int do_remove_proplist(struct client *client, uint32_t command, uint32_t props = pw_properties_new(NULL, NULL); if (command != COMMAND_REMOVE_CLIENT_PROPLIST) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &channel, TAG_INVALID)) < 0) goto exit; @@ -2350,7 +2366,7 @@ static int do_remove_proplist(struct client *client, uint32_t command, uint32_t while (true) { const char *key; - if ((res = data_get(d, + if ((res = message_get(m, TAG_STRING, &key, TAG_INVALID)) < 0) goto exit; @@ -2383,21 +2399,16 @@ exit: return res; } -static int do_get_server_info(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_get_server_info(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; - uint8_t buffer[1024]; char name[256]; - struct data reply; + struct message *reply; struct sample_spec ss; struct channel_map map; pw_log_info(NAME" %p: GET_SERVER_INFO", impl); - spa_zero(reply); - reply.data = buffer; - reply.length = sizeof(buffer); - snprintf(name, sizeof(name)-1, "PulseAudio (on PipeWire %s)", pw_get_library_version()); spa_zero(ss); @@ -2410,9 +2421,8 @@ static int do_get_server_info(struct client *client, uint32_t command, uint32_t map.map[0] = 1; map.map[1] = 2; - data_put(&reply, - TAG_U32, COMMAND_REPLY, - TAG_U32, tag, + reply = reply_new(client, tag); + message_put(reply, TAG_STRING, name, TAG_STRING, "14.0.0", TAG_STRING, pw_get_user_name(), @@ -2424,28 +2434,22 @@ static int do_get_server_info(struct client *client, uint32_t command, uint32_t TAG_INVALID); if (client->version >= 15) { - data_put(&reply, + message_put(reply, TAG_CHANNEL_MAP, &map, TAG_INVALID); } - return send_data(client, &reply); + return send_message(client, reply); } -static int do_stat(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_stat(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; - uint8_t buffer[1024]; - struct data reply; + struct message *reply; pw_log_info(NAME" %p: STAT", impl); - spa_zero(reply); - reply.data = buffer; - reply.length = sizeof(buffer); - - data_put(&reply, - TAG_U32, COMMAND_REPLY, - TAG_U32, tag, + reply = reply_new(client, tag); + message_put(reply, TAG_U32, 0, /* n_allocated */ TAG_U32, 0, /* allocated size */ TAG_U32, 0, /* n_accumulated */ @@ -2453,19 +2457,18 @@ static int do_stat(struct client *client, uint32_t command, uint32_t tag, struct TAG_U32, 0, /* sample cache size */ TAG_INVALID); - return send_data(client, &reply); + return send_message(client, reply); } -static int do_lookup(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_lookup(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; - uint8_t buffer[1024]; const char *name = NULL; - struct data reply; + struct message *reply; uint32_t idx = 0; int res; - if ((res = data_get(d, + if ((res = message_get(m, TAG_STRING, &name, TAG_INVALID)) < 0) return res; @@ -2474,27 +2477,22 @@ static int do_lookup(struct client *client, uint32_t command, uint32_t tag, stru pw_log_info(NAME" %p: LOOKUP %s", impl, name); - spa_zero(reply); - reply.data = buffer; - reply.length = sizeof(buffer); - - data_put(&reply, - TAG_U32, COMMAND_REPLY, - TAG_U32, tag, + reply = reply_new(client, tag); + message_put(reply, TAG_U32, idx, TAG_INVALID); - return send_data(client, &reply); + return send_message(client, reply); } -static int do_drain_stream(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_drain_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; uint32_t channel; struct stream *stream; int res; - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &channel, TAG_INVALID)) < 0) return res; @@ -2511,31 +2509,30 @@ static int do_drain_stream(struct client *client, uint32_t command, uint32_t tag return reply_simple_ack(client, tag); } -static void fill_client_info(struct client *client, struct data *d) +static void fill_client_info(struct client *client, struct message *m) { - data_put(d, + message_put(m, TAG_U32, 0, /* client index */ TAG_STRING, pw_properties_get(client->props, "application.name"), TAG_U32, SPA_ID_INVALID, /* module */ TAG_STRING, "PipeWire", /* driver */ TAG_INVALID); if (client->version >= 13) { - data_put(d, + message_put(m, TAG_PROPLIST, client->props, TAG_INVALID); } } -static int do_get_info(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_get_info(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; - uint8_t buffer[1024]; - struct data reply; + struct message *reply; uint32_t idx; const char *name = NULL; int res; - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &idx, TAG_INVALID)) < 0) return res; @@ -2545,7 +2542,7 @@ static int do_get_info(struct client *client, uint32_t command, uint32_t tag, st case COMMAND_GET_SOURCE_INFO: case COMMAND_GET_CARD_INFO: case COMMAND_GET_SAMPLE_INFO: - if ((res = data_get(d, + if ((res = message_get(m, TAG_STRING, &name, TAG_INVALID)) < 0) return res; @@ -2555,18 +2552,10 @@ static int do_get_info(struct client *client, uint32_t command, uint32_t tag, st pw_log_info(NAME" %p: %s idx:%u name:%s", impl, commands[command].name, idx, name); - spa_zero(reply); - reply.data = buffer; - reply.length = sizeof(buffer); - - data_put(&reply, - TAG_U32, COMMAND_REPLY, - TAG_U32, tag, - TAG_INVALID); - + reply = reply_new(client, tag); switch (command) { case COMMAND_GET_CLIENT_INFO: - fill_client_info(client, &reply); + fill_client_info(client, reply); break; case COMMAND_GET_MODULE_INFO: case COMMAND_GET_CARD_INFO: @@ -2583,29 +2572,20 @@ static int do_get_info(struct client *client, uint32_t command, uint32_t tag, st default: return -ENOTSUP; } - return send_data(client, &reply); + return send_message(client, reply); } -static int do_get_info_list(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_get_info_list(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; - uint8_t buffer[1024]; - struct data reply; + struct message *reply; pw_log_info(NAME" %p: %s", impl, commands[command].name); - spa_zero(reply); - reply.data = buffer; - reply.length = sizeof(buffer); - - data_put(&reply, - TAG_U32, COMMAND_REPLY, - TAG_U32, tag, - TAG_INVALID); - + reply = reply_new(client, tag); switch (command) { case COMMAND_GET_CLIENT_INFO_LIST: - fill_client_info(client, &reply); + fill_client_info(client, reply); break; case COMMAND_GET_MODULE_INFO_LIST: case COMMAND_GET_CARD_INFO_LIST: @@ -2622,21 +2602,20 @@ static int do_get_info_list(struct client *client, uint32_t command, uint32_t ta default: return -ENOTSUP; } - return send_data(client, &reply); + return send_message(client, reply); } -static int do_set_stream_buffer_attr(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_set_stream_buffer_attr(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; - uint8_t buffer[1024]; uint32_t channel; struct stream *stream; - struct data reply; + struct message *reply; struct buffer_attr attr; int res; bool adjust_latency = false, early_requests = false; - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &channel, TAG_INVALID)) < 0) return res; @@ -2648,17 +2627,10 @@ static int do_set_stream_buffer_attr(struct client *client, uint32_t command, ui return res; } - spa_zero(reply); - reply.data = buffer; - reply.length = sizeof(buffer); - - data_put(&reply, - TAG_U32, COMMAND_REPLY, - TAG_U32, tag, - TAG_INVALID); + reply = reply_new(client, tag); if (command == COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &attr.maxlength, TAG_U32, &attr.tlength, TAG_U32, &attr.prebuf, @@ -2666,68 +2638,68 @@ static int do_set_stream_buffer_attr(struct client *client, uint32_t command, ui TAG_INVALID)) < 0) return res; if (client->version >= 13) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_BOOLEAN, &adjust_latency, TAG_INVALID)) < 0) return res; } if (client->version >= 14) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_BOOLEAN, &early_requests, TAG_INVALID)) < 0) return res; } - data_put(&reply, + message_put(reply, TAG_U32, stream->attr.maxlength, TAG_U32, stream->attr.tlength, TAG_U32, stream->attr.prebuf, TAG_U32, stream->attr.minreq, TAG_INVALID); if (client->version >= 13) { - data_put(&reply, + message_put(reply, TAG_USEC, 0, /* configured_sink_latency */ TAG_INVALID); } } else { - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &attr.maxlength, TAG_U32, &attr.fragsize, TAG_INVALID)) < 0) return res; if (client->version >= 13) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_BOOLEAN, &adjust_latency, TAG_INVALID)) < 0) return res; } if (client->version >= 14) { - if ((res = data_get(d, + if ((res = message_get(m, TAG_BOOLEAN, &early_requests, TAG_INVALID)) < 0) return res; } - data_put(&reply, + message_put(reply, TAG_U32, stream->attr.maxlength, TAG_U32, stream->attr.fragsize, TAG_INVALID); if (client->version >= 13) { - data_put(&reply, + message_put(reply, TAG_USEC, 0, /* configured_source_latency */ TAG_INVALID); } } - return send_data(client, &reply); + return send_message(client, reply); } -static int do_update_stream_sample_rate(struct client *client, uint32_t command, uint32_t tag, struct data *d) +static int do_update_stream_sample_rate(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; uint32_t channel, rate; struct stream *stream; int res; - if ((res = data_get(d, + if ((res = message_get(m, TAG_U32, &channel, TAG_U32, &rate, TAG_INVALID)) < 0) @@ -2905,10 +2877,15 @@ static const struct command commands[COMMAND_MAX] = static void client_free(struct client *client) { struct impl *impl = client->impl; + struct message *msg; pw_log_info(NAME" %p: client %p free", impl, client); spa_list_remove(&client->link); pw_map_clear(&client->streams); + spa_list_consume(msg, &client->free_messages, link) + message_free(client, msg, true); + spa_list_consume(msg, &client->out_messages, link) + message_free(client, msg, true); if (client->core) pw_core_disconnect(client->core); if (client->props) @@ -2918,14 +2895,13 @@ static void client_free(struct client *client) free(client); } -static int handle_packet(struct client *client) +static int handle_packet(struct client *client, struct message *msg) { struct impl *impl = client->impl; int res = 0; uint32_t command, tag; - struct data *d = &client->data; - if (data_get(d, + if (message_get(msg, TAG_U32, &command, TAG_U32, &tag, TAG_INVALID) < 0) { @@ -2950,17 +2926,20 @@ static int handle_packet(struct client *client) goto finish; } - res = commands[command].run(client, command, tag, d); + res = commands[command].run(client, command, tag, msg); + if (res < 0) { + pw_log_error(NAME" %p: command %d (%s) error: %s", + impl, command, commands[command].name, spa_strerror(res)); + } finish: return res; } -static int handle_memblock(struct client *client) +static int handle_memblock(struct client *client, struct message *msg) { struct impl *impl = client->impl; struct stream *stream; - struct block *block; uint32_t channel, flags; int64_t offset; @@ -2972,19 +2951,15 @@ static int handle_memblock(struct client *client) pw_log_debug(NAME" %p: Received memblock channel:%d offset:%"PRIi64 " flags:%08x size:%u", impl, channel, offset, - flags, client->data.length); + flags, msg->length); stream = pw_map_lookup(&client->streams, channel); if (stream == NULL) return -EINVAL; - block = calloc(1, sizeof(struct block)); - block->data = client->data.data; - block->length = client->data.length; - pw_log_debug("new block %p %p", block, block->data); - client->data.data = NULL; - spa_list_append(&stream->blocks, &block->link); - stream->write_index += block->length; + pw_log_debug("new block %p %p", msg, msg->data); + spa_list_append(&stream->messages, &msg->link); + stream->write_index += msg->length; return 0; } @@ -2997,18 +2972,18 @@ static int do_read(struct client *client) ssize_t r; int res = 0; - if (client->index < sizeof(client->desc)) { - data = SPA_MEMBER(&client->desc, client->index, void); - size = sizeof(client->desc) - client->index; + if (client->in_index < sizeof(client->desc)) { + data = SPA_MEMBER(&client->desc, client->in_index, void); + size = sizeof(client->desc) - client->in_index; } else { - uint32_t idx = client->index - sizeof(client->desc); + uint32_t idx = client->in_index - sizeof(client->desc); - if (client->data.data == NULL) { + if (client->message == NULL) { res = -EIO; goto error; } - data = SPA_MEMBER(client->data.data, idx, void); - size = client->data.length - idx; + data = SPA_MEMBER(client->message->data, idx, void); + size = client->message->length - idx; } while (true) { if ((r = recv(client->source->fd, data, size, 0)) < 0) { @@ -3017,11 +2992,11 @@ static int do_read(struct client *client) res = -errno; goto error; } - client->index += r; + client->in_index += r; break; } - if (client->index == sizeof(client->desc)) { + if (client->in_index == sizeof(client->desc)) { uint32_t flags, length, channel; flags = ntohl(client->desc.flags); @@ -3049,24 +3024,27 @@ static int do_read(struct client *client) } else { client->type = TYPE_MEMBLOCK; } - client->data.data = calloc(1, length); - client->data.length = length; - client->data.offset = 0; - } else if (client->index >= client->data.length + sizeof(client->desc)) { + if (client->message) + message_free(client, client->message, false); + client->message = message_alloc(client, length); + } else if (client->message && + client->in_index >= client->message->length + sizeof(client->desc)) { + struct message *msg = client->message; + + client->message = NULL; + client->in_index = 0; + switch (client->type) { case TYPE_PACKET: - res = handle_packet(client); + res = handle_packet(client, msg); break; case TYPE_MEMBLOCK: - res = handle_memblock(client); + res = handle_memblock(client, msg); break; default: res = -EPROTO; break; } - client->index = 0; - free(client->data.data); - client->data.data = NULL; } error: return res; @@ -3089,6 +3067,13 @@ on_client_data(void *data, int fd, uint32_t mask) } if (mask & SPA_IO_OUT) { pw_log_trace(NAME" %p: can write", impl); + res = flush_messages(client); + if (res >= 0) { + int mask = client->source->mask; + SPA_FLAG_CLEAR(mask, SPA_IO_OUT); + pw_loop_update_io(impl->loop, client->source, mask); + } else if (res != EAGAIN) + goto error; } if (mask & SPA_IO_IN) { pw_log_trace(NAME" %p: can read", impl); @@ -3122,6 +3107,8 @@ on_connect(void *data, int fd, uint32_t mask) client->impl = impl; spa_list_append(&impl->clients, &client->link); pw_map_init(&client->streams, 16, 16); + spa_list_init(&client->free_messages); + spa_list_init(&client->out_messages); client->props = pw_properties_new( PW_KEY_CLIENT_API, "pipewire-pulse",