From 6cc3224031f08635fa47038bd4954994040f9a5a Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 19 Oct 2020 18:25:52 +0200 Subject: [PATCH] a2dp-source: fix source Use codec methods. Init codec at start. Remove rate match until we actually implement this Add some buffering of packets before we hand them out. Always simply fill a buffer and hand it out. don't emit signals when we are following another driver. Acquire transport as soon as it goes to PENDING. --- spa/plugins/bluez5/a2dp-sink.c | 3 +- spa/plugins/bluez5/a2dp-source.c | 193 +++++++++++++++---------------- 2 files changed, 96 insertions(+), 100 deletions(-) diff --git a/spa/plugins/bluez5/a2dp-sink.c b/spa/plugins/bluez5/a2dp-sink.c index 1955daf96..d9f775c37 100644 --- a/spa/plugins/bluez5/a2dp-sink.c +++ b/spa/plugins/bluez5/a2dp-sink.c @@ -1164,7 +1164,8 @@ static int impl_clear(struct spa_handle *handle) { struct impl *this = (struct impl *) handle; - this->codec->deinit(this->codec_data); + if (this->codec_data) + this->codec->deinit(this->codec_data); if (this->transport) spa_hook_remove(&this->transport_listener); spa_system_close(this->data_system, this->timerfd); diff --git a/spa/plugins/bluez5/a2dp-source.c b/spa/plugins/bluez5/a2dp-source.c index 41adf8c16..dfebefe79 100644 --- a/spa/plugins/bluez5/a2dp-source.c +++ b/spa/plugins/bluez5/a2dp-source.c @@ -50,8 +50,6 @@ #include #include -#include - #include "defs.h" #include "rtp.h" #include "a2dp-codecs.h" @@ -80,7 +78,6 @@ struct port { uint64_t info_all; struct spa_port_info info; struct spa_io_buffers *io; - struct spa_io_rate_match *rate_match; struct spa_param_info params[8]; struct buffer buffers[MAX_BUFFERS]; @@ -88,9 +85,8 @@ struct port { struct spa_list free; struct spa_list ready; - - struct buffer *current_buffer; - uint32_t ready_offset; + uint32_t n_ready; + unsigned int buffering:1; }; struct impl { @@ -122,7 +118,10 @@ struct impl { struct spa_io_clock *clock; struct spa_io_position *position; - sbc_t sbc; + const struct a2dp_codec *codec; + void *codec_data; + struct spa_audio_info codec_format; + uint8_t buffer_read[4096]; struct timespec now; uint32_t sample_count; @@ -295,6 +294,8 @@ static void reset_buffers(struct port *port) spa_list_init(&port->free); spa_list_init(&port->ready); + port->n_ready = 0; + port->buffering = true; for (i = 0; i < port->n_buffers; i++) { struct buffer *b = &port->buffers[i]; @@ -356,7 +357,8 @@ static int32_t decode_data(struct impl *this, uint8_t *src, uint32_t src_size, /* decode */ avail = dst_size; while (src_size > 0) { - processed = sbc_decode(&this->sbc, src, src_size, dst, avail, &written); + processed = this->codec->decode(this->codec_data, + src, src_size, dst, avail, &written); if (processed <= 0) return processed; @@ -367,7 +369,6 @@ static int32_t decode_data(struct impl *this, uint8_t *src, uint32_t src_size, avail -= written; dst += written; } - return dst_size - avail; } @@ -376,8 +377,10 @@ static void a2dp_on_ready_read(struct spa_source *source) struct impl *this = source->data; struct port *port = &this->port; struct spa_io_buffers *io = port->io; - int32_t size_read; + int32_t size_read, decoded; struct spa_data *datas; + struct buffer *buffer; + uint8_t read_decoded[4096]; /* make sure the source is an input */ if ((source->rmask & SPA_IO_IN) == 0) { @@ -403,8 +406,7 @@ static void a2dp_on_ready_read(struct spa_source *source) spa_log_debug(this->log, "read socket data %d", size_read); /* decode */ - uint8_t read_decoded [4096]; - int32_t decoded = decode_data(this, this->buffer_read, size_read, + decoded = decode_data(this, this->buffer_read, size_read, read_decoded, sizeof (read_decoded)); if (decoded <= 0) { spa_log_error(this->log, "failed to decode data"); @@ -412,50 +414,55 @@ static void a2dp_on_ready_read(struct spa_source *source) } spa_log_debug(this->log, "decoded socket data %d", decoded); + /* discard when not started */ + if (!this->started) + return; + /* get buffer */ - if (!port->current_buffer) { - if (spa_list_is_empty(&port->free)) { - spa_log_warn(this->log, "buffer not available"); - return; - } - port->current_buffer = spa_list_first(&port->free, struct buffer, link); - spa_list_remove(&port->current_buffer->link); - if (port->current_buffer->h) { - port->current_buffer->h->seq = this->sample_count; - port->current_buffer->h->pts = SPA_TIMESPEC_TO_NSEC(&this->now); - port->current_buffer->h->dts_offset = 0; - } - port->ready_offset = 0; + if (spa_list_is_empty(&port->free)) { + spa_log_warn(this->log, "no buffer available"); + return; } - datas = port->current_buffer->buf->datas; + buffer = spa_list_first(&port->free, struct buffer, link); + spa_list_remove(&buffer->link); + spa_log_debug(this->log, "dequeue %d", buffer->id); + + if (buffer->h) { + buffer->h->seq = this->sample_count; + buffer->h->pts = SPA_TIMESPEC_TO_NSEC(&this->now); + buffer->h->dts_offset = 0; + } + datas = buffer->buf->datas; /* copy data into buffer */ - memcpy ((uint8_t *)datas[0].data + port->ready_offset, read_decoded, decoded); - port->ready_offset += decoded; + memcpy ((uint8_t *)datas[0].data, read_decoded, decoded); - /* send buffer if full */ - if ((port->ready_offset + sizeof (read_decoded)) >= datas[0].maxsize / port->frame_size) { - datas[0].chunk->offset = 0; - datas[0].chunk->size = port->ready_offset; - datas[0].chunk->stride = port->frame_size; + /* send buffer */ + datas[0].chunk->offset = 0; + datas[0].chunk->size = decoded; + datas[0].chunk->stride = port->frame_size; - this->sample_count += datas[0].chunk->size / port->frame_size; - spa_list_append(&port->ready, &port->current_buffer->link); - port->current_buffer = NULL; + this->sample_count += datas[0].chunk->size / port->frame_size; - if (!this->following && this->clock) { - this->clock->nsec = SPA_TIMESPEC_TO_NSEC(&this->now); - this->clock->position = this->sample_count; - this->clock->delay = 0; - this->clock->rate_diff = 1.0f; - this->clock->next_nsec = this->clock->nsec; - } + spa_log_debug(this->log, "queue %d", buffer->id); + spa_list_append(&port->ready, &buffer->link); + port->n_ready++; + + if (!this->following && this->clock) { + this->clock->nsec = SPA_TIMESPEC_TO_NSEC(&this->now); + this->clock->position = this->sample_count; + this->clock->delay = 0; + this->clock->rate_diff = 1.0f; + this->clock->next_nsec = this->clock->nsec; } /* done if there are no buffers ready */ if (spa_list_is_empty(&port->ready)) return; + if (this->following) + return; + /* process the buffer if IO does not have any */ if (io != NULL && io->status != SPA_STATUS_HAVE_DATA) { struct buffer *b; @@ -465,6 +472,8 @@ static void a2dp_on_ready_read(struct spa_source *source) b = spa_list_first(&port->ready, struct buffer, link); spa_list_remove(&b->link); + if (--port->n_ready == 0) + port->buffering = true; b->outstanding = true; io->buffer_id = b->id; @@ -484,12 +493,11 @@ static int transport_start(struct impl *this) { int res, val; + spa_log_debug(this->log, NAME" %p: transport %p acquire", this, + this->transport); if ((res = spa_bt_transport_acquire(this->transport, false)) < 0) return res; - sbc_init_a2dp(&this->sbc, 0, this->transport->configuration, - this->transport->configuration_len); - val = fcntl(this->transport->fd, F_GETFL); if (fcntl(this->transport->fd, F_SETFL, val | O_NONBLOCK) < 0) spa_log_warn(this->log, NAME" %p: fcntl %u %m", this, val | O_NONBLOCK); @@ -527,7 +535,10 @@ static int do_start(struct impl *this) if (this->started) return 0; - spa_log_debug(this->log, NAME" %p: start", this); + this->following = is_following(this); + + spa_log_debug(this->log, NAME" %p: start state:%d", + this, this->transport->state); spa_return_val_if_fail(this->transport != NULL, -EIO); @@ -572,8 +583,6 @@ static int do_stop(struct impl *this) else res = 0; - sbc_finish(&this->sbc); - return res; } @@ -613,6 +622,7 @@ static const struct spa_dict_item node_info_items[] = { { SPA_KEY_DEVICE_API, "bluez5" }, { SPA_KEY_MEDIA_CLASS, "Audio/Source" }, { SPA_KEY_NODE_DRIVER, "true" }, + { SPA_KEY_NODE_LATENCY, "512/48000" }, }; static void emit_node_info(struct impl *this, bool full) @@ -726,45 +736,19 @@ impl_node_port_enum_params(void *object, int seq, case SPA_PARAM_EnumFormat: if (result.index > 0) return 0; - if (this->transport == NULL) + if (this->codec_data == NULL) return -EIO; switch (this->transport->codec) { case A2DP_CODEC_SBC: - { - a2dp_sbc_t *config = this->transport->configuration; - struct spa_audio_info_raw info = { 0, }; - int res; - - info.format = SPA_AUDIO_FORMAT_S16; - if ((res = a2dp_sbc_get_frequency(config)) < 0) - return -EIO; - info.rate = res; - if ((res = a2dp_sbc_get_channels(config)) < 0) - return -EIO; - info.channels = res; - - switch (info.channels) { - case 1: - info.position[0] = SPA_AUDIO_CHANNEL_MONO; - break; - case 2: - info.position[0] = SPA_AUDIO_CHANNEL_FL; - info.position[1] = SPA_AUDIO_CHANNEL_FR; - break; - default: - return -EIO; - } - - param = spa_format_audio_raw_build(&b, id, &info); + param = spa_format_audio_raw_build(&b, id, &this->codec_format.info.raw); break; - } case A2DP_CODEC_MPEG24: - { - /* not implemented yet */ - spa_log_error(this->log, "a2dp mpeg24 codec not implemented yet"); - return -EIO; - } + param = spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_Format, id, + SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_audio), + SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_aac)); + break; default: return -EIO; } @@ -787,7 +771,7 @@ impl_node_port_enum_params(void *object, int seq, param = spa_pod_builder_add_object(&b, SPA_TYPE_OBJECT_ParamBuffers, id, - SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(2, 1, MAX_BUFFERS), + SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(8, 8, MAX_BUFFERS), SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1), SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int( this->props.max_latency * port->frame_size, @@ -818,12 +802,6 @@ impl_node_port_enum_params(void *object, int seq, SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers), SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers))); break; - case 1: - param = spa_pod_builder_add_object(&b, - SPA_TYPE_OBJECT_ParamIO, id, - SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_RateMatch), - SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_rate_match))); - break; default: return 0; } @@ -850,6 +828,8 @@ static int clear_buffers(struct impl *this, struct port *port) if (port->n_buffers > 0) { spa_list_init(&port->free); spa_list_init(&port->ready); + port->n_ready = 0; + port->buffering = true; port->n_buffers = 0; } return 0; @@ -988,9 +968,6 @@ impl_node_port_set_io(void *object, case SPA_IO_Buffers: port->io = data; break; - case SPA_IO_RateMatch: - port->rate_match = data; - break; default: return -ENOENT; } @@ -1031,6 +1008,8 @@ static int impl_node_process(void *object) io = port->io; spa_return_val_if_fail(io != NULL, -EIO); + spa_log_debug(this->log, "%p status:%d %d", this, io->status, port->n_ready); + /* Return if we already have a buffer */ if (io->status == SPA_STATUS_HAVE_DATA) return SPA_STATUS_HAVE_DATA; @@ -1044,11 +1023,17 @@ static int impl_node_process(void *object) /* Return if there are no buffers ready to be processed */ if (spa_list_is_empty(&port->ready)) return SPA_STATUS_OK; + if (port->buffering && port->n_ready < 4) + return SPA_STATUS_OK; + + port->buffering = false; /* Get the new buffer from the ready list */ buffer = spa_list_first(&port->ready, struct buffer, link); spa_list_remove(&buffer->link); - buffer->outstanding = false; + if (--port->n_ready == 0) + port->buffering = true; + buffer->outstanding = true; /* Set the new buffer in IO */ io->buffer_id = buffer->id; @@ -1088,10 +1073,11 @@ static void transport_state_changed(void *data, enum spa_bt_transport_state old, enum spa_bt_transport_state state) { struct impl *this = data; - if (state >= SPA_BT_TRANSPORT_STATE_PENDING && old < SPA_BT_TRANSPORT_STATE_PENDING) { - if (this->started) - transport_start(this); - } + spa_log_debug(this->log, "transport %p state %d->%d started:%d", + this->transport, old, state, this->started); + + if (state >= SPA_BT_TRANSPORT_STATE_PENDING && old < SPA_BT_TRANSPORT_STATE_PENDING) + transport_start(this); } static const struct spa_bt_transport_events transport_events = { @@ -1120,6 +1106,8 @@ static int impl_get_interface(struct spa_handle *handle, const char *type, void static int impl_clear(struct spa_handle *handle) { struct impl *this = (struct impl *) handle; + if (this->codec_data) + this->codec->deinit(this->codec_data); if (this->transport) spa_hook_remove(&this->transport_listener); return 0; @@ -1213,13 +1201,20 @@ impl_init(const struct spa_handle_factory *factory, spa_log_error(this->log, "a transport is needed"); return -EINVAL; } - if (this->transport->codec != A2DP_CODEC_SBC) { - spa_log_error(this->log, "codec != SBC not yet supported"); + if (this->transport->a2dp_codec == NULL) { + spa_log_error(this->log, "a transport codec is needed"); return -EINVAL; } + this->codec = this->transport->a2dp_codec; + spa_bt_transport_add_listener(this->transport, &this->transport_listener, &transport_events, this); + this->codec_data = this->codec->init(0, + this->transport->configuration, + this->transport->configuration_len, + &this->codec_format); + return 0; }