From 6f6337e732b3a4225088e822fb78018c8cfbae3d Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 6 Oct 2020 16:36:37 +0200 Subject: [PATCH] pulse-bridge: support record streams --- src/examples/media-session/pulse-bridge.c | 129 ++++++++++++++++------ 1 file changed, 96 insertions(+), 33 deletions(-) diff --git a/src/examples/media-session/pulse-bridge.c b/src/examples/media-session/pulse-bridge.c index 42f529262..5aff82773 100644 --- a/src/examples/media-session/pulse-bridge.c +++ b/src/examples/media-session/pulse-bridge.c @@ -149,10 +149,11 @@ enum { struct message { struct spa_list link; - uint8_t *data; + uint32_t channel; uint32_t allocated; uint32_t length; uint32_t offset; + uint8_t *data; }; struct client { @@ -170,9 +171,6 @@ struct client { uint32_t in_index; uint32_t out_index; struct descriptor desc; -#define TYPE_PACKET 0 -#define TYPE_MEMBLOCK 1 - uint32_t type; struct message *message; struct pw_map streams; @@ -985,7 +983,7 @@ static void message_free(struct client *client, struct message *msg, bool destro spa_list_append(&client->free_messages, &msg->link); } -static struct message *message_alloc(struct client *client, uint32_t size) +static struct message *message_alloc(struct client *client, uint32_t size, uint32_t channel) { struct message *msg = NULL; @@ -999,6 +997,7 @@ static struct message *message_alloc(struct client *client, uint32_t size) msg->allocated = alloc; msg->data = SPA_MEMBER(msg, sizeof(struct message), void); } + msg->channel = channel; msg->offset = 0; msg->length = size; return msg; @@ -1020,7 +1019,7 @@ static int flush_messages(struct client *client) if (client->out_index < sizeof(desc)) { desc.length = htonl(m->length); - desc.channel = htonl(-1); + desc.channel = htonl(m->channel); desc.offset_hi = 0; desc.offset_lo = 0; desc.flags = 0; @@ -1073,7 +1072,7 @@ static int send_message(struct client *client, struct message *m) static struct message *reply_new(struct client *client, uint32_t tag) { struct message *reply; - reply = message_alloc(client, 0); + reply = message_alloc(client, 0, -1); pw_log_debug(NAME" %p: REPLY tag:%u", client, tag); message_put(reply, TAG_U32, COMMAND_REPLY, @@ -1094,7 +1093,7 @@ static int reply_error(struct client *client, uint32_t tag, uint32_t error) pw_log_debug(NAME" %p: ERROR tag:%u error:%u", client, tag, error); - reply = message_alloc(client, 0); + reply = message_alloc(client, 0, -1); message_put(reply, TAG_U32, COMMAND_ERROR, TAG_U32, tag, @@ -1270,7 +1269,7 @@ static int send_command_request(struct stream *stream) pw_log_trace(NAME" %p: REQUEST channel:%d %u", stream, stream->channel, size); - msg = message_alloc(client, 0); + msg = message_alloc(client, 0, -1); message_put(msg, TAG_U32, COMMAND_REQUEST, TAG_U32, -1, @@ -1408,8 +1407,13 @@ static const struct spa_pod *get_buffers_param(struct stream *s, blocks = 1; stride = s->frame_size; - maxsize = attr->tlength; - size = attr->minreq; + if (s->direction == PW_DIRECTION_OUTPUT) { + maxsize = attr->tlength; + size = attr->minreq; + } else { + size = attr->fragsize; + maxsize = attr->fragsize; + } buffers = SPA_CLAMP(maxsize / size, MIN_BUFFERS, MAX_BUFFERS); pw_log_info("stream %p: stride %d maxsize %d size %u buffers %d", s, stride, maxsize, @@ -1529,21 +1533,53 @@ static void update_timing_info(struct stream *stream) stream->delay = delay; } -static void stream_process(void *data) +static void stream_process_record(struct stream *stream) { - struct stream *stream = data; struct client *client = stream->client; - struct message *msg; struct pw_buffer *buffer; struct spa_buffer *buf; - uint32_t size, maxsize; + uint32_t size; + struct message *msg; void *p; + update_timing_info(stream); + + buffer = pw_stream_dequeue_buffer(stream->stream); + if (buffer == NULL) + return; + + buf = buffer->buffer; + if ((p = buf->datas[0].data) == NULL) + return; + + size = buf->datas[0].chunk->size; + + msg = message_alloc(client, size, stream->channel); + if (msg != NULL) { + memcpy(msg->data, + SPA_MEMBER(p, buf->datas[0].chunk->offset, void), + size); + send_message(client, msg); + } + stream->write_index += size; + pw_stream_queue_buffer(stream->stream, buffer); +} + +static void stream_process_playback(struct stream *stream) +{ + struct client *client = stream->client; + pw_log_trace(NAME" %p: process", stream); update_timing_info(stream); while (!spa_list_is_empty(&stream->messages)) { + struct pw_buffer *buffer; + struct spa_buffer *buf; + struct message *msg; + uint32_t size, maxsize; + void *p; + buffer = pw_stream_dequeue_buffer(stream->stream); if (buffer == NULL) break; @@ -1575,6 +1611,15 @@ static void stream_process(void *data) send_command_request(stream); } +static void stream_process(void *data) +{ + struct stream *stream = data; + if (stream->direction == PW_DIRECTION_OUTPUT) + stream_process_playback(stream); + else + stream_process_record(stream); +} + static void stream_drained(void *data) { struct stream *stream = data; @@ -1849,6 +1894,28 @@ error: return res; } +static void fix_record_buffer_attr(struct stream *s, struct buffer_attr *attr) +{ + uint32_t frame_size; + + frame_size = s->frame_size; + + if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH) + attr->maxlength = MAXLENGTH; + attr->maxlength -= attr->maxlength % frame_size; + attr->maxlength = SPA_MAX(attr->maxlength, frame_size); + + if (attr->fragsize == (uint32_t) -1 || attr->fragsize == 0) + attr->fragsize = usec_to_bytes_round_up(DEFAULT_FRAGSIZE_MSEC*1000, &s->ss); + attr->fragsize -= attr->fragsize % frame_size; + attr->fragsize = SPA_MAX(attr->fragsize, frame_size); + if (attr->fragsize > attr->maxlength) + attr->fragsize = attr->maxlength; + + pw_log_info(NAME" %p: maxlength:%u fragsize:%u", s, + attr->maxlength, attr->fragsize); +} + static int do_create_record_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; @@ -1989,6 +2056,17 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint stream->client = client; stream->channel = pw_map_insert_new(&client->streams, stream); spa_list_init(&stream->messages); + stream->create_tag = tag; + stream->ss = ss; + stream->map = map; + + stream->frame_size = sample_spec_frame_size(&stream->ss); + + fix_record_buffer_attr(stream, &attr); + stream->attr = attr; + + pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%u/%u", + stream->attr.fragsize / stream->frame_size, ss.rate); stream->stream = pw_stream_new(client->core, name, props); props = NULL; @@ -2000,11 +2078,6 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint &stream->stream_listener, &stream_events, stream); - stream->create_tag = tag; - stream->ss = ss; - stream->map = map; - stream->attr = attr; - info = SPA_AUDIO_INFO_RAW_INIT( .format = format_pa2id(ss.format), .channels = ss.channels, @@ -3020,13 +3093,10 @@ static int do_read(struct client *client) res = -EPROTO; goto error; } - client->type = TYPE_PACKET; - } else { - client->type = TYPE_MEMBLOCK; } if (client->message) message_free(client, client->message, false); - client->message = message_alloc(client, length); + client->message = message_alloc(client, length, channel); } else if (client->message && client->in_index >= client->message->length + sizeof(client->desc)) { struct message *msg = client->message; @@ -3034,17 +3104,10 @@ static int do_read(struct client *client) client->message = NULL; client->in_index = 0; - switch (client->type) { - case TYPE_PACKET: + if (msg->channel == (uint32_t)-1) res = handle_packet(client, msg); - break; - case TYPE_MEMBLOCK: + else res = handle_memblock(client, msg); - break; - default: - res = -EPROTO; - break; - } } error: return res;