a2dp-source: fix source

Use codec methods. Init codec at start.
Remove rate match until we actually implement this
Add some buffering of packets before we hand them out.
Always simply fill a buffer and hand it out.
don't emit signals when we are following another driver.
Acquire transport as soon as it goes to PENDING.
This commit is contained in:
Wim Taymans 2020-10-19 18:25:52 +02:00
parent d727123b86
commit 6cc3224031
2 changed files with 96 additions and 100 deletions

View file

@ -1164,6 +1164,7 @@ static int impl_clear(struct spa_handle *handle)
{
struct impl *this = (struct impl *) handle;
if (this->codec_data)
this->codec->deinit(this->codec_data);
if (this->transport)
spa_hook_remove(&this->transport_listener);

View file

@ -50,8 +50,6 @@
#include <spa/param/audio/format-utils.h>
#include <spa/pod/filter.h>
#include <sbc/sbc.h>
#include "defs.h"
#include "rtp.h"
#include "a2dp-codecs.h"
@ -80,7 +78,6 @@ struct port {
uint64_t info_all;
struct spa_port_info info;
struct spa_io_buffers *io;
struct spa_io_rate_match *rate_match;
struct spa_param_info params[8];
struct buffer buffers[MAX_BUFFERS];
@ -88,9 +85,8 @@ struct port {
struct spa_list free;
struct spa_list ready;
struct buffer *current_buffer;
uint32_t ready_offset;
uint32_t n_ready;
unsigned int buffering:1;
};
struct impl {
@ -122,7 +118,10 @@ struct impl {
struct spa_io_clock *clock;
struct spa_io_position *position;
sbc_t sbc;
const struct a2dp_codec *codec;
void *codec_data;
struct spa_audio_info codec_format;
uint8_t buffer_read[4096];
struct timespec now;
uint32_t sample_count;
@ -295,6 +294,8 @@ static void reset_buffers(struct port *port)
spa_list_init(&port->free);
spa_list_init(&port->ready);
port->n_ready = 0;
port->buffering = true;
for (i = 0; i < port->n_buffers; i++) {
struct buffer *b = &port->buffers[i];
@ -356,7 +357,8 @@ static int32_t decode_data(struct impl *this, uint8_t *src, uint32_t src_size,
/* decode */
avail = dst_size;
while (src_size > 0) {
processed = sbc_decode(&this->sbc, src, src_size, dst, avail, &written);
processed = this->codec->decode(this->codec_data,
src, src_size, dst, avail, &written);
if (processed <= 0)
return processed;
@ -367,7 +369,6 @@ static int32_t decode_data(struct impl *this, uint8_t *src, uint32_t src_size,
avail -= written;
dst += written;
}
return dst_size - avail;
}
@ -376,8 +377,10 @@ static void a2dp_on_ready_read(struct spa_source *source)
struct impl *this = source->data;
struct port *port = &this->port;
struct spa_io_buffers *io = port->io;
int32_t size_read;
int32_t size_read, decoded;
struct spa_data *datas;
struct buffer *buffer;
uint8_t read_decoded[4096];
/* make sure the source is an input */
if ((source->rmask & SPA_IO_IN) == 0) {
@ -403,8 +406,7 @@ static void a2dp_on_ready_read(struct spa_source *source)
spa_log_debug(this->log, "read socket data %d", size_read);
/* decode */
uint8_t read_decoded [4096];
int32_t decoded = decode_data(this, this->buffer_read, size_read,
decoded = decode_data(this, this->buffer_read, size_read,
read_decoded, sizeof (read_decoded));
if (decoded <= 0) {
spa_log_error(this->log, "failed to decode data");
@ -412,36 +414,39 @@ static void a2dp_on_ready_read(struct spa_source *source)
}
spa_log_debug(this->log, "decoded socket data %d", decoded);
/* discard when not started */
if (!this->started)
return;
/* get buffer */
if (!port->current_buffer) {
if (spa_list_is_empty(&port->free)) {
spa_log_warn(this->log, "buffer not available");
spa_log_warn(this->log, "no buffer available");
return;
}
port->current_buffer = spa_list_first(&port->free, struct buffer, link);
spa_list_remove(&port->current_buffer->link);
if (port->current_buffer->h) {
port->current_buffer->h->seq = this->sample_count;
port->current_buffer->h->pts = SPA_TIMESPEC_TO_NSEC(&this->now);
port->current_buffer->h->dts_offset = 0;
buffer = spa_list_first(&port->free, struct buffer, link);
spa_list_remove(&buffer->link);
spa_log_debug(this->log, "dequeue %d", buffer->id);
if (buffer->h) {
buffer->h->seq = this->sample_count;
buffer->h->pts = SPA_TIMESPEC_TO_NSEC(&this->now);
buffer->h->dts_offset = 0;
}
port->ready_offset = 0;
}
datas = port->current_buffer->buf->datas;
datas = buffer->buf->datas;
/* copy data into buffer */
memcpy ((uint8_t *)datas[0].data + port->ready_offset, read_decoded, decoded);
port->ready_offset += decoded;
memcpy ((uint8_t *)datas[0].data, read_decoded, decoded);
/* send buffer if full */
if ((port->ready_offset + sizeof (read_decoded)) >= datas[0].maxsize / port->frame_size) {
/* send buffer */
datas[0].chunk->offset = 0;
datas[0].chunk->size = port->ready_offset;
datas[0].chunk->size = decoded;
datas[0].chunk->stride = port->frame_size;
this->sample_count += datas[0].chunk->size / port->frame_size;
spa_list_append(&port->ready, &port->current_buffer->link);
port->current_buffer = NULL;
spa_log_debug(this->log, "queue %d", buffer->id);
spa_list_append(&port->ready, &buffer->link);
port->n_ready++;
if (!this->following && this->clock) {
this->clock->nsec = SPA_TIMESPEC_TO_NSEC(&this->now);
@ -450,12 +455,14 @@ static void a2dp_on_ready_read(struct spa_source *source)
this->clock->rate_diff = 1.0f;
this->clock->next_nsec = this->clock->nsec;
}
}
/* done if there are no buffers ready */
if (spa_list_is_empty(&port->ready))
return;
if (this->following)
return;
/* process the buffer if IO does not have any */
if (io != NULL && io->status != SPA_STATUS_HAVE_DATA) {
struct buffer *b;
@ -465,6 +472,8 @@ static void a2dp_on_ready_read(struct spa_source *source)
b = spa_list_first(&port->ready, struct buffer, link);
spa_list_remove(&b->link);
if (--port->n_ready == 0)
port->buffering = true;
b->outstanding = true;
io->buffer_id = b->id;
@ -484,12 +493,11 @@ static int transport_start(struct impl *this)
{
int res, val;
spa_log_debug(this->log, NAME" %p: transport %p acquire", this,
this->transport);
if ((res = spa_bt_transport_acquire(this->transport, false)) < 0)
return res;
sbc_init_a2dp(&this->sbc, 0, this->transport->configuration,
this->transport->configuration_len);
val = fcntl(this->transport->fd, F_GETFL);
if (fcntl(this->transport->fd, F_SETFL, val | O_NONBLOCK) < 0)
spa_log_warn(this->log, NAME" %p: fcntl %u %m", this, val | O_NONBLOCK);
@ -527,7 +535,10 @@ static int do_start(struct impl *this)
if (this->started)
return 0;
spa_log_debug(this->log, NAME" %p: start", this);
this->following = is_following(this);
spa_log_debug(this->log, NAME" %p: start state:%d",
this, this->transport->state);
spa_return_val_if_fail(this->transport != NULL, -EIO);
@ -572,8 +583,6 @@ static int do_stop(struct impl *this)
else
res = 0;
sbc_finish(&this->sbc);
return res;
}
@ -613,6 +622,7 @@ static const struct spa_dict_item node_info_items[] = {
{ SPA_KEY_DEVICE_API, "bluez5" },
{ SPA_KEY_MEDIA_CLASS, "Audio/Source" },
{ SPA_KEY_NODE_DRIVER, "true" },
{ SPA_KEY_NODE_LATENCY, "512/48000" },
};
static void emit_node_info(struct impl *this, bool full)
@ -726,45 +736,19 @@ impl_node_port_enum_params(void *object, int seq,
case SPA_PARAM_EnumFormat:
if (result.index > 0)
return 0;
if (this->transport == NULL)
if (this->codec_data == NULL)
return -EIO;
switch (this->transport->codec) {
case A2DP_CODEC_SBC:
{
a2dp_sbc_t *config = this->transport->configuration;
struct spa_audio_info_raw info = { 0, };
int res;
info.format = SPA_AUDIO_FORMAT_S16;
if ((res = a2dp_sbc_get_frequency(config)) < 0)
return -EIO;
info.rate = res;
if ((res = a2dp_sbc_get_channels(config)) < 0)
return -EIO;
info.channels = res;
switch (info.channels) {
case 1:
info.position[0] = SPA_AUDIO_CHANNEL_MONO;
param = spa_format_audio_raw_build(&b, id, &this->codec_format.info.raw);
break;
case 2:
info.position[0] = SPA_AUDIO_CHANNEL_FL;
info.position[1] = SPA_AUDIO_CHANNEL_FR;
break;
default:
return -EIO;
}
param = spa_format_audio_raw_build(&b, id, &info);
break;
}
case A2DP_CODEC_MPEG24:
{
/* not implemented yet */
spa_log_error(this->log, "a2dp mpeg24 codec not implemented yet");
return -EIO;
}
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_Format, id,
SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_audio),
SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_aac));
break;
default:
return -EIO;
}
@ -787,7 +771,7 @@ impl_node_port_enum_params(void *object, int seq,
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamBuffers, id,
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(2, 1, MAX_BUFFERS),
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(8, 8, MAX_BUFFERS),
SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1),
SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(
this->props.max_latency * port->frame_size,
@ -818,12 +802,6 @@ impl_node_port_enum_params(void *object, int seq,
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers)));
break;
case 1:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamIO, id,
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_RateMatch),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_rate_match)));
break;
default:
return 0;
}
@ -850,6 +828,8 @@ static int clear_buffers(struct impl *this, struct port *port)
if (port->n_buffers > 0) {
spa_list_init(&port->free);
spa_list_init(&port->ready);
port->n_ready = 0;
port->buffering = true;
port->n_buffers = 0;
}
return 0;
@ -988,9 +968,6 @@ impl_node_port_set_io(void *object,
case SPA_IO_Buffers:
port->io = data;
break;
case SPA_IO_RateMatch:
port->rate_match = data;
break;
default:
return -ENOENT;
}
@ -1031,6 +1008,8 @@ static int impl_node_process(void *object)
io = port->io;
spa_return_val_if_fail(io != NULL, -EIO);
spa_log_debug(this->log, "%p status:%d %d", this, io->status, port->n_ready);
/* Return if we already have a buffer */
if (io->status == SPA_STATUS_HAVE_DATA)
return SPA_STATUS_HAVE_DATA;
@ -1044,11 +1023,17 @@ static int impl_node_process(void *object)
/* Return if there are no buffers ready to be processed */
if (spa_list_is_empty(&port->ready))
return SPA_STATUS_OK;
if (port->buffering && port->n_ready < 4)
return SPA_STATUS_OK;
port->buffering = false;
/* Get the new buffer from the ready list */
buffer = spa_list_first(&port->ready, struct buffer, link);
spa_list_remove(&buffer->link);
buffer->outstanding = false;
if (--port->n_ready == 0)
port->buffering = true;
buffer->outstanding = true;
/* Set the new buffer in IO */
io->buffer_id = buffer->id;
@ -1088,11 +1073,12 @@ static void transport_state_changed(void *data, enum spa_bt_transport_state old,
enum spa_bt_transport_state state)
{
struct impl *this = data;
if (state >= SPA_BT_TRANSPORT_STATE_PENDING && old < SPA_BT_TRANSPORT_STATE_PENDING) {
if (this->started)
spa_log_debug(this->log, "transport %p state %d->%d started:%d",
this->transport, old, state, this->started);
if (state >= SPA_BT_TRANSPORT_STATE_PENDING && old < SPA_BT_TRANSPORT_STATE_PENDING)
transport_start(this);
}
}
static const struct spa_bt_transport_events transport_events = {
SPA_VERSION_BT_TRANSPORT_EVENTS,
@ -1120,6 +1106,8 @@ static int impl_get_interface(struct spa_handle *handle, const char *type, void
static int impl_clear(struct spa_handle *handle)
{
struct impl *this = (struct impl *) handle;
if (this->codec_data)
this->codec->deinit(this->codec_data);
if (this->transport)
spa_hook_remove(&this->transport_listener);
return 0;
@ -1213,13 +1201,20 @@ impl_init(const struct spa_handle_factory *factory,
spa_log_error(this->log, "a transport is needed");
return -EINVAL;
}
if (this->transport->codec != A2DP_CODEC_SBC) {
spa_log_error(this->log, "codec != SBC not yet supported");
if (this->transport->a2dp_codec == NULL) {
spa_log_error(this->log, "a transport codec is needed");
return -EINVAL;
}
this->codec = this->transport->a2dp_codec;
spa_bt_transport_add_listener(this->transport,
&this->transport_listener, &transport_events, this);
this->codec_data = this->codec->init(0,
this->transport->configuration,
this->transport->configuration_len,
&this->codec_format);
return 0;
}