pulse-bridge: support record streams

This commit is contained in:
Wim Taymans 2020-10-06 16:36:37 +02:00
parent 7c1fbf5bee
commit 6f6337e732

View file

@ -149,10 +149,11 @@ enum {
struct message { struct message {
struct spa_list link; struct spa_list link;
uint8_t *data; uint32_t channel;
uint32_t allocated; uint32_t allocated;
uint32_t length; uint32_t length;
uint32_t offset; uint32_t offset;
uint8_t *data;
}; };
struct client { struct client {
@ -170,9 +171,6 @@ struct client {
uint32_t in_index; uint32_t in_index;
uint32_t out_index; uint32_t out_index;
struct descriptor desc; struct descriptor desc;
#define TYPE_PACKET 0
#define TYPE_MEMBLOCK 1
uint32_t type;
struct message *message; struct message *message;
struct pw_map streams; struct pw_map streams;
@ -985,7 +983,7 @@ static void message_free(struct client *client, struct message *msg, bool destro
spa_list_append(&client->free_messages, &msg->link); spa_list_append(&client->free_messages, &msg->link);
} }
static struct message *message_alloc(struct client *client, uint32_t size) static struct message *message_alloc(struct client *client, uint32_t size, uint32_t channel)
{ {
struct message *msg = NULL; struct message *msg = NULL;
@ -999,6 +997,7 @@ static struct message *message_alloc(struct client *client, uint32_t size)
msg->allocated = alloc; msg->allocated = alloc;
msg->data = SPA_MEMBER(msg, sizeof(struct message), void); msg->data = SPA_MEMBER(msg, sizeof(struct message), void);
} }
msg->channel = channel;
msg->offset = 0; msg->offset = 0;
msg->length = size; msg->length = size;
return msg; return msg;
@ -1020,7 +1019,7 @@ static int flush_messages(struct client *client)
if (client->out_index < sizeof(desc)) { if (client->out_index < sizeof(desc)) {
desc.length = htonl(m->length); desc.length = htonl(m->length);
desc.channel = htonl(-1); desc.channel = htonl(m->channel);
desc.offset_hi = 0; desc.offset_hi = 0;
desc.offset_lo = 0; desc.offset_lo = 0;
desc.flags = 0; desc.flags = 0;
@ -1073,7 +1072,7 @@ static int send_message(struct client *client, struct message *m)
static struct message *reply_new(struct client *client, uint32_t tag) static struct message *reply_new(struct client *client, uint32_t tag)
{ {
struct message *reply; struct message *reply;
reply = message_alloc(client, 0); reply = message_alloc(client, 0, -1);
pw_log_debug(NAME" %p: REPLY tag:%u", client, tag); pw_log_debug(NAME" %p: REPLY tag:%u", client, tag);
message_put(reply, message_put(reply,
TAG_U32, COMMAND_REPLY, TAG_U32, COMMAND_REPLY,
@ -1094,7 +1093,7 @@ static int reply_error(struct client *client, uint32_t tag, uint32_t error)
pw_log_debug(NAME" %p: ERROR tag:%u error:%u", client, tag, error); pw_log_debug(NAME" %p: ERROR tag:%u error:%u", client, tag, error);
reply = message_alloc(client, 0); reply = message_alloc(client, 0, -1);
message_put(reply, message_put(reply,
TAG_U32, COMMAND_ERROR, TAG_U32, COMMAND_ERROR,
TAG_U32, tag, TAG_U32, tag,
@ -1270,7 +1269,7 @@ static int send_command_request(struct stream *stream)
pw_log_trace(NAME" %p: REQUEST channel:%d %u", stream, stream->channel, size); pw_log_trace(NAME" %p: REQUEST channel:%d %u", stream, stream->channel, size);
msg = message_alloc(client, 0); msg = message_alloc(client, 0, -1);
message_put(msg, message_put(msg,
TAG_U32, COMMAND_REQUEST, TAG_U32, COMMAND_REQUEST,
TAG_U32, -1, TAG_U32, -1,
@ -1408,8 +1407,13 @@ static const struct spa_pod *get_buffers_param(struct stream *s,
blocks = 1; blocks = 1;
stride = s->frame_size; stride = s->frame_size;
if (s->direction == PW_DIRECTION_OUTPUT) {
maxsize = attr->tlength; maxsize = attr->tlength;
size = attr->minreq; size = attr->minreq;
} else {
size = attr->fragsize;
maxsize = attr->fragsize;
}
buffers = SPA_CLAMP(maxsize / size, MIN_BUFFERS, MAX_BUFFERS); buffers = SPA_CLAMP(maxsize / size, MIN_BUFFERS, MAX_BUFFERS);
pw_log_info("stream %p: stride %d maxsize %d size %u buffers %d", s, stride, maxsize, pw_log_info("stream %p: stride %d maxsize %d size %u buffers %d", s, stride, maxsize,
@ -1529,21 +1533,53 @@ static void update_timing_info(struct stream *stream)
stream->delay = delay; stream->delay = delay;
} }
static void stream_process(void *data) static void stream_process_record(struct stream *stream)
{ {
struct stream *stream = data;
struct client *client = stream->client; struct client *client = stream->client;
struct message *msg;
struct pw_buffer *buffer; struct pw_buffer *buffer;
struct spa_buffer *buf; struct spa_buffer *buf;
uint32_t size, maxsize; uint32_t size;
struct message *msg;
void *p; void *p;
update_timing_info(stream);
buffer = pw_stream_dequeue_buffer(stream->stream);
if (buffer == NULL)
return;
buf = buffer->buffer;
if ((p = buf->datas[0].data) == NULL)
return;
size = buf->datas[0].chunk->size;
msg = message_alloc(client, size, stream->channel);
if (msg != NULL) {
memcpy(msg->data,
SPA_MEMBER(p, buf->datas[0].chunk->offset, void),
size);
send_message(client, msg);
}
stream->write_index += size;
pw_stream_queue_buffer(stream->stream, buffer);
}
static void stream_process_playback(struct stream *stream)
{
struct client *client = stream->client;
pw_log_trace(NAME" %p: process", stream); pw_log_trace(NAME" %p: process", stream);
update_timing_info(stream); update_timing_info(stream);
while (!spa_list_is_empty(&stream->messages)) { while (!spa_list_is_empty(&stream->messages)) {
struct pw_buffer *buffer;
struct spa_buffer *buf;
struct message *msg;
uint32_t size, maxsize;
void *p;
buffer = pw_stream_dequeue_buffer(stream->stream); buffer = pw_stream_dequeue_buffer(stream->stream);
if (buffer == NULL) if (buffer == NULL)
break; break;
@ -1575,6 +1611,15 @@ static void stream_process(void *data)
send_command_request(stream); send_command_request(stream);
} }
static void stream_process(void *data)
{
struct stream *stream = data;
if (stream->direction == PW_DIRECTION_OUTPUT)
stream_process_playback(stream);
else
stream_process_record(stream);
}
static void stream_drained(void *data) static void stream_drained(void *data)
{ {
struct stream *stream = data; struct stream *stream = data;
@ -1849,6 +1894,28 @@ error:
return res; return res;
} }
static void fix_record_buffer_attr(struct stream *s, struct buffer_attr *attr)
{
uint32_t frame_size;
frame_size = s->frame_size;
if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH)
attr->maxlength = MAXLENGTH;
attr->maxlength -= attr->maxlength % frame_size;
attr->maxlength = SPA_MAX(attr->maxlength, frame_size);
if (attr->fragsize == (uint32_t) -1 || attr->fragsize == 0)
attr->fragsize = usec_to_bytes_round_up(DEFAULT_FRAGSIZE_MSEC*1000, &s->ss);
attr->fragsize -= attr->fragsize % frame_size;
attr->fragsize = SPA_MAX(attr->fragsize, frame_size);
if (attr->fragsize > attr->maxlength)
attr->fragsize = attr->maxlength;
pw_log_info(NAME" %p: maxlength:%u fragsize:%u", s,
attr->maxlength, attr->fragsize);
}
static int do_create_record_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m) static int do_create_record_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m)
{ {
struct impl *impl = client->impl; struct impl *impl = client->impl;
@ -1989,6 +2056,17 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint
stream->client = client; stream->client = client;
stream->channel = pw_map_insert_new(&client->streams, stream); stream->channel = pw_map_insert_new(&client->streams, stream);
spa_list_init(&stream->messages); spa_list_init(&stream->messages);
stream->create_tag = tag;
stream->ss = ss;
stream->map = map;
stream->frame_size = sample_spec_frame_size(&stream->ss);
fix_record_buffer_attr(stream, &attr);
stream->attr = attr;
pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%u/%u",
stream->attr.fragsize / stream->frame_size, ss.rate);
stream->stream = pw_stream_new(client->core, name, props); stream->stream = pw_stream_new(client->core, name, props);
props = NULL; props = NULL;
@ -2000,11 +2078,6 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint
&stream->stream_listener, &stream->stream_listener,
&stream_events, stream); &stream_events, stream);
stream->create_tag = tag;
stream->ss = ss;
stream->map = map;
stream->attr = attr;
info = SPA_AUDIO_INFO_RAW_INIT( info = SPA_AUDIO_INFO_RAW_INIT(
.format = format_pa2id(ss.format), .format = format_pa2id(ss.format),
.channels = ss.channels, .channels = ss.channels,
@ -3020,13 +3093,10 @@ static int do_read(struct client *client)
res = -EPROTO; res = -EPROTO;
goto error; goto error;
} }
client->type = TYPE_PACKET;
} else {
client->type = TYPE_MEMBLOCK;
} }
if (client->message) if (client->message)
message_free(client, client->message, false); message_free(client, client->message, false);
client->message = message_alloc(client, length); client->message = message_alloc(client, length, channel);
} else if (client->message && } else if (client->message &&
client->in_index >= client->message->length + sizeof(client->desc)) { client->in_index >= client->message->length + sizeof(client->desc)) {
struct message *msg = client->message; struct message *msg = client->message;
@ -3034,17 +3104,10 @@ static int do_read(struct client *client)
client->message = NULL; client->message = NULL;
client->in_index = 0; client->in_index = 0;
switch (client->type) { if (msg->channel == (uint32_t)-1)
case TYPE_PACKET:
res = handle_packet(client, msg); res = handle_packet(client, msg);
break; else
case TYPE_MEMBLOCK:
res = handle_memblock(client, msg); res = handle_memblock(client, msg);
break;
default:
res = -EPROTO;
break;
}
} }
error: error:
return res; return res;