mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-10-29 05:40:27 -04:00
bluez5: fix a2dp-source clock rate + stuttering
Make a2dp clock advance at the correct rate. Revert back to accumulating to a single buffer before sending it out. What it did previously seemed hard to get to work properly with e.g. aptx which produces block of varying sizes on decoding.
This commit is contained in:
parent
746c714b0c
commit
d68bad4673
1 changed files with 83 additions and 47 deletions
|
|
@ -85,8 +85,9 @@ struct port {
|
|||
|
||||
struct spa_list free;
|
||||
struct spa_list ready;
|
||||
uint32_t n_ready;
|
||||
unsigned int buffering:1;
|
||||
|
||||
struct buffer *current_buffer;
|
||||
uint32_t ready_offset;
|
||||
};
|
||||
|
||||
struct impl {
|
||||
|
|
@ -124,7 +125,8 @@ struct impl {
|
|||
|
||||
uint8_t buffer_read[4096];
|
||||
struct timespec now;
|
||||
uint32_t sample_count;
|
||||
uint64_t sample_count;
|
||||
uint64_t skip_count;
|
||||
};
|
||||
|
||||
#define NAME "a2dp-source"
|
||||
|
|
@ -294,8 +296,7 @@ static void reset_buffers(struct port *port)
|
|||
|
||||
spa_list_init(&port->free);
|
||||
spa_list_init(&port->ready);
|
||||
port->n_ready = 0;
|
||||
port->buffering = true;
|
||||
port->current_buffer = NULL;
|
||||
|
||||
for (i = 0; i < port->n_buffers; i++) {
|
||||
struct buffer *b = &port->buffers[i];
|
||||
|
|
@ -372,14 +373,30 @@ static int32_t decode_data(struct impl *this, uint8_t *src, uint32_t src_size,
|
|||
return dst_size - avail;
|
||||
}
|
||||
|
||||
static void skip_ready_buffers(struct impl *this)
|
||||
{
|
||||
struct port *port = &this->port;
|
||||
|
||||
/* Move all buffers from ready to free */
|
||||
while (!spa_list_is_empty(&port->ready)) {
|
||||
struct buffer *b;
|
||||
b = spa_list_first(&port->ready, struct buffer, link);
|
||||
spa_list_remove(&b->link);
|
||||
spa_list_append(&port->free, &b->link);
|
||||
spa_assert(!b->outstanding);
|
||||
this->skip_count += b->buf->datas[0].chunk->size / port->frame_size;
|
||||
}
|
||||
}
|
||||
|
||||
static void a2dp_on_ready_read(struct spa_source *source)
|
||||
{
|
||||
struct impl *this = source->data;
|
||||
struct port *port = &this->port;
|
||||
struct spa_io_buffers *io = port->io;
|
||||
int32_t size_read, decoded;
|
||||
int32_t size_read, decoded, avail;
|
||||
struct spa_data *datas;
|
||||
struct buffer *buffer;
|
||||
uint32_t min_data;
|
||||
uint8_t read_decoded[4096];
|
||||
|
||||
/* make sure the source is an input */
|
||||
|
|
@ -403,7 +420,7 @@ static void a2dp_on_ready_read(struct spa_source *source)
|
|||
spa_log_error(this->log, "failed to read data: %s", spa_strerror(size_read));
|
||||
goto stop;
|
||||
}
|
||||
spa_log_debug(this->log, "read socket data %d", size_read);
|
||||
spa_log_trace(this->log, "read socket data %d", size_read);
|
||||
|
||||
/* decode */
|
||||
decoded = decode_data(this, this->buffer_read, size_read,
|
||||
|
|
@ -415,48 +432,75 @@ static void a2dp_on_ready_read(struct spa_source *source)
|
|||
if (decoded == 0)
|
||||
return;
|
||||
|
||||
spa_log_debug(this->log, "decoded socket data %d", decoded);
|
||||
spa_log_trace(this->log, "decoded socket data %d", decoded);
|
||||
|
||||
/* discard when not started */
|
||||
if (!this->started)
|
||||
return;
|
||||
|
||||
/* get buffer */
|
||||
if (spa_list_is_empty(&port->free)) {
|
||||
spa_log_warn(this->log, "no buffer available");
|
||||
return;
|
||||
}
|
||||
buffer = spa_list_first(&port->free, struct buffer, link);
|
||||
spa_list_remove(&buffer->link);
|
||||
spa_log_debug(this->log, "dequeue %d", buffer->id);
|
||||
if (!port->current_buffer) {
|
||||
if (spa_list_is_empty(&port->free)) {
|
||||
/* xrun, skip ahead */
|
||||
skip_ready_buffers(this);
|
||||
this->skip_count += decoded / port->frame_size;
|
||||
this->sample_count += decoded / port->frame_size;
|
||||
return;
|
||||
}
|
||||
if (this->skip_count > 0) {
|
||||
spa_log_info(this->log, NAME " %p: xrun, skipped %"PRIu64" usec",
|
||||
this, this->skip_count * SPA_USEC_PER_SEC / port->current_format.info.raw.rate);
|
||||
this->skip_count = 0;
|
||||
}
|
||||
|
||||
if (buffer->h) {
|
||||
buffer->h->seq = this->sample_count;
|
||||
buffer->h->pts = SPA_TIMESPEC_TO_NSEC(&this->now);
|
||||
buffer->h->dts_offset = 0;
|
||||
buffer = spa_list_first(&port->free, struct buffer, link);
|
||||
spa_list_remove(&buffer->link);
|
||||
|
||||
port->current_buffer = buffer;
|
||||
port->ready_offset = 0;
|
||||
spa_log_trace(this->log, "dequeue %d", buffer->id);
|
||||
|
||||
if (buffer->h) {
|
||||
buffer->h->seq = this->sample_count;
|
||||
buffer->h->pts = SPA_TIMESPEC_TO_NSEC(&this->now);
|
||||
buffer->h->dts_offset = 0;
|
||||
}
|
||||
} else {
|
||||
buffer = port->current_buffer;
|
||||
}
|
||||
datas = buffer->buf->datas;
|
||||
|
||||
/* copy data into buffer */
|
||||
memcpy ((uint8_t *)datas[0].data, read_decoded, decoded);
|
||||
avail = SPA_MIN(decoded, datas[0].maxsize - port->ready_offset);
|
||||
if (avail < decoded)
|
||||
spa_log_warn(this->log, NAME ": buffer too small (%d > %d)", decoded, avail);
|
||||
memcpy ((uint8_t *)datas[0].data + port->ready_offset, read_decoded, avail);
|
||||
port->ready_offset += avail;
|
||||
this->sample_count += decoded / port->frame_size;
|
||||
|
||||
/* send buffer */
|
||||
datas[0].chunk->offset = 0;
|
||||
datas[0].chunk->size = decoded;
|
||||
datas[0].chunk->stride = port->frame_size;
|
||||
/* send buffer if full */
|
||||
min_data = SPA_MIN(this->props.min_latency * port->frame_size, datas[0].maxsize / 2);
|
||||
if (port->ready_offset >= min_data) {
|
||||
uint64_t sample_count;
|
||||
|
||||
this->sample_count += datas[0].chunk->size / port->frame_size;
|
||||
datas[0].chunk->offset = 0;
|
||||
datas[0].chunk->size = port->ready_offset;
|
||||
datas[0].chunk->stride = port->frame_size;
|
||||
|
||||
spa_log_debug(this->log, "queue %d", buffer->id);
|
||||
spa_list_append(&port->ready, &buffer->link);
|
||||
port->n_ready++;
|
||||
sample_count = datas[0].chunk->size / port->frame_size;
|
||||
|
||||
if (!this->following && this->clock) {
|
||||
this->clock->nsec = SPA_TIMESPEC_TO_NSEC(&this->now);
|
||||
this->clock->position = this->sample_count;
|
||||
this->clock->delay = 0;
|
||||
this->clock->rate_diff = 1.0f;
|
||||
this->clock->next_nsec = this->clock->nsec;
|
||||
spa_log_trace(this->log, "queue %d", buffer->id);
|
||||
spa_list_append(&port->ready, &buffer->link);
|
||||
port->current_buffer = NULL;
|
||||
|
||||
if (!this->following && this->clock) {
|
||||
this->clock->nsec = SPA_TIMESPEC_TO_NSEC(&this->now);
|
||||
this->clock->duration = sample_count * this->clock->rate.denom / port->current_format.info.raw.rate;
|
||||
this->clock->position = this->sample_count * this->clock->rate.denom / port->current_format.info.raw.rate;
|
||||
this->clock->delay = 0;
|
||||
this->clock->rate_diff = 1.0f;
|
||||
this->clock->next_nsec = this->clock->nsec + (uint64_t)sample_count * SPA_NSEC_PER_SEC / port->current_format.info.raw.rate;
|
||||
}
|
||||
}
|
||||
|
||||
/* done if there are no buffers ready */
|
||||
|
|
@ -475,8 +519,6 @@ static void a2dp_on_ready_read(struct spa_source *source)
|
|||
|
||||
b = spa_list_first(&port->ready, struct buffer, link);
|
||||
spa_list_remove(&b->link);
|
||||
if (--port->n_ready == 0)
|
||||
port->buffering = true;
|
||||
b->outstanding = true;
|
||||
|
||||
io->buffer_id = b->id;
|
||||
|
|
@ -538,6 +580,7 @@ static int transport_start(struct impl *this)
|
|||
spa_loop_add_source(this->data_loop, &this->source);
|
||||
|
||||
this->sample_count = 0;
|
||||
this->skip_count = 0;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
@ -551,8 +594,8 @@ static int do_start(struct impl *this)
|
|||
|
||||
this->following = is_following(this);
|
||||
|
||||
spa_log_debug(this->log, NAME" %p: start state:%d",
|
||||
this, this->transport->state);
|
||||
spa_log_debug(this->log, NAME" %p: start state:%d following:%d",
|
||||
this, this->transport->state, this->following);
|
||||
|
||||
spa_return_val_if_fail(this->transport != NULL, -EIO);
|
||||
|
||||
|
|
@ -844,10 +887,9 @@ static int clear_buffers(struct impl *this, struct port *port)
|
|||
if (port->n_buffers > 0) {
|
||||
spa_list_init(&port->free);
|
||||
spa_list_init(&port->ready);
|
||||
port->n_ready = 0;
|
||||
port->buffering = true;
|
||||
port->n_buffers = 0;
|
||||
}
|
||||
port->current_buffer = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
@ -1041,7 +1083,7 @@ static int impl_node_process(void *object)
|
|||
io = port->io;
|
||||
spa_return_val_if_fail(io != NULL, -EIO);
|
||||
|
||||
spa_log_debug(this->log, "%p status:%d %d", this, io->status, port->n_ready);
|
||||
spa_log_trace(this->log, "%p status:%d", this, io->status);
|
||||
|
||||
/* Return if we already have a buffer */
|
||||
if (io->status == SPA_STATUS_HAVE_DATA)
|
||||
|
|
@ -1056,16 +1098,10 @@ static int impl_node_process(void *object)
|
|||
/* Return if there are no buffers ready to be processed */
|
||||
if (spa_list_is_empty(&port->ready))
|
||||
return SPA_STATUS_OK;
|
||||
if (port->buffering && port->n_ready < 4)
|
||||
return SPA_STATUS_OK;
|
||||
|
||||
port->buffering = false;
|
||||
|
||||
/* Get the new buffer from the ready list */
|
||||
buffer = spa_list_first(&port->ready, struct buffer, link);
|
||||
spa_list_remove(&buffer->link);
|
||||
if (--port->n_ready == 0)
|
||||
port->buffering = true;
|
||||
buffer->outstanding = true;
|
||||
|
||||
/* Set the new buffer in IO */
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue