sco-sink: fix several timeout issues

This commit is contained in:
Julian Bouzas 2019-08-21 16:15:09 -04:00 committed by Wim Taymans
parent 1a4713ee3a
commit 3a43fac0c2

View file

@ -51,6 +51,7 @@ struct props {
uint32_t max_latency; uint32_t max_latency;
}; };
#define FILL_FRAMES 2
#define MAX_BUFFERS 32 #define MAX_BUFFERS 32
struct buffer { struct buffer {
@ -228,7 +229,7 @@ static void set_timeout(struct impl *this, time_t sec, long nsec)
ts.it_interval.tv_sec = 0; ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0; ts.it_interval.tv_nsec = 0;
timerfd_settime(this->timerfd, TFD_TIMER_ABSTIME, &ts, NULL); spa_system_timerfd_settime(this->data_system, this->timerfd, 0, &ts, NULL);
this->source.mask = SPA_IO_IN; this->source.mask = SPA_IO_IN;
spa_loop_update_source(this->data_loop, &this->source); spa_loop_update_source(this->data_loop, &this->source);
} }
@ -246,22 +247,22 @@ static void set_next_timeout(struct impl *this, uint64_t now_time)
/* Set the next timeout if not slaved, otherwise reset values */ /* Set the next timeout if not slaved, otherwise reset values */
if (!this->slaved) { if (!this->slaved) {
/* Get the elapsed time */ /* Get the elapsed time */
const uint64_t elapsed_time = now_time - this->start_time; uint64_t elapsed_time = 0;
if (now_time > this->start_time)
elapsed_time = now_time - this->start_time;
/* Get the elapsed samples */ /* Get the elapsed samples */
const uint64_t elapsed_samples = elapsed_time * port->current_format.info.raw.rate / SPA_NSEC_PER_SEC; const uint64_t elapsed_samples = elapsed_time * port->current_format.info.raw.rate / SPA_NSEC_PER_SEC;
/* Get the queued samples (processed - elapsed) */ /* Get the queued samples (processed - elapsed) */
const uint64_t queued_samples = this->sample_count - elapsed_samples; const uint64_t queued_samples = (this->sample_count - elapsed_samples);
/* Get the queued time */ /* Get the queued time */
const uint64_t queued_time = (queued_samples * SPA_NSEC_PER_SEC) / port->current_format.info.raw.rate; const uint64_t queued_time = (queued_samples * SPA_NSEC_PER_SEC) / port->current_format.info.raw.rate;
/* Get the next time */
const uint64_t next_time = now_time + queued_time;
/* Set the next timeout */ /* Set the next timeout */
set_timeout (this, next_time / SPA_NSEC_PER_SEC, next_time % SPA_NSEC_PER_SEC); set_timeout (this, queued_time / SPA_NSEC_PER_SEC, queued_time % SPA_NSEC_PER_SEC);
} else { } else {
this->start_time = now_time; this->start_time = now_time;
this->sample_count = 0; this->sample_count = 0;
@ -341,13 +342,12 @@ static int impl_node_set_param(void *object, uint32_t id, uint32_t flags,
return 0; return 0;
} }
static bool write_data(struct impl *this, uint8_t *data, uint32_t size, uint32_t *total_written) static bool write_data(struct impl *this, const uint8_t *data, uint32_t size, uint32_t *total_written)
{ {
uint32_t local_total_written = 0; uint32_t local_total_written = 0;
const uint32_t mtu_size = this->transport->write_mtu; const uint32_t mtu_size = this->transport->write_mtu;
/* TODO: For now we assume the size is always a mutliple of mtu_size */ while (local_total_written <= (size - mtu_size)) {
while (local_total_written < (size - mtu_size)) {
const int bytes_written = write(this->sock_fd, data, mtu_size); const int bytes_written = write(this->sock_fd, data, mtu_size);
if (bytes_written < 0) { if (bytes_written < 0) {
spa_log_warn(this->log, "error writting data: %s", strerror(errno)); spa_log_warn(this->log, "error writting data: %s", strerror(errno));
@ -358,6 +358,10 @@ static bool write_data(struct impl *this, uint8_t *data, uint32_t size, uint32_t
local_total_written += bytes_written; local_total_written += bytes_written;
} }
/* TODO: For now we assume the size is always a mutliple of mtu_size */
if (local_total_written != size)
spa_log_warn(this->log, "dropping some audio as buffer size is not multiple of mtu");
if (total_written) if (total_written)
*total_written = local_total_written; *total_written = local_total_written;
return true; return true;
@ -385,9 +389,15 @@ static int render_buffers(struct impl *this, uint64_t now_time)
size = d[0].chunk->size; size = d[0].chunk->size;
/* Write data */ /* Write data */
write_data(this, src + offset, size, &total_written); if (!write_data(this, src + offset, size, &total_written)) {
port->need_data = true;
spa_list_remove(&b->link);
b->outstanding = true;
spa_node_call_reuse_buffer(&this->callbacks, 0, b->id);
break;
}
/* Update the cample count */ /* Update the sample count */
this->sample_count += total_written / port->frame_size; this->sample_count += total_written / port->frame_size;
/* Remove the buffer and mark it as reusable */ /* Remove the buffer and mark it as reusable */
@ -402,6 +412,30 @@ static int render_buffers(struct impl *this, uint64_t now_time)
return 0; return 0;
} }
static void fill_socket (struct impl *this)
{
struct port *port = &this->port;
static const uint8_t zero_buffer[1024 * 4] = { 0, };
uint32_t fill_size = this->transport->write_mtu;
uint32_t fills = 0;
uint32_t total_written = 0;
/* Fill the socked */
while (fills < FILL_FRAMES) {
uint32_t written = 0;
/* Write the data */
if (!write_data(this, zero_buffer, fill_size, &written))
break;
total_written += written;
fills++;
}
/* Update the sample count */
this->sample_count += total_written / port->frame_size;
}
static void sco_on_flush(struct spa_source *source) static void sco_on_flush(struct spa_source *source)
{ {
struct impl *this = source->data; struct impl *this = source->data;
@ -434,16 +468,18 @@ static void sco_on_timeout(struct spa_source *source)
struct spa_io_buffers *io = port->io; struct spa_io_buffers *io = port->io;
/* Read the timerfd */ /* Read the timerfd */
if (this->started && read(this->timerfd, &exp, sizeof(uint64_t)) != sizeof(uint64_t)) if (this->started && spa_system_timerfd_read(this->data_system, this->timerfd, &exp) < 0)
spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno)); spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno));
/* Get the current time */ /* Get the current time */
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &this->now); spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &this->now);
now_time = SPA_TIMESPEC_TO_NSEC(&this->now); now_time = SPA_TIMESPEC_TO_NSEC(&this->now);
/* Set the start time to the current time */ /* If this is the first timeout, set the start time and fill the socked */
if (this->start_time == 0) if (this->start_time == 0) {
fill_socket (this);
this->start_time = now_time; this->start_time = now_time;
}
/* Notify we need a new buffer if we have processed all of them */ /* Notify we need a new buffer if we have processed all of them */
if (spa_list_is_empty(&port->ready) || port->need_data) { if (spa_list_is_empty(&port->ready) || port->need_data) {
@ -479,12 +515,12 @@ static int do_start(struct impl *this)
return -1; return -1;
/* Set the write MTU */ /* Set the write MTU */
val = this->transport->write_mtu; val = FILL_FRAMES * this->transport->write_mtu;
if (setsockopt(this->sock_fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val)) < 0) if (setsockopt(this->sock_fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val)) < 0)
spa_log_warn(this->log, "sco-sink %p: SO_SNDBUF %m", this); spa_log_warn(this->log, "sco-sink %p: SO_SNDBUF %m", this);
/* Set the read MTU */ /* Set the read MTU */
val = this->transport->read_mtu; val = FILL_FRAMES * this->transport->read_mtu;
if (setsockopt(this->sock_fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val)) < 0) if (setsockopt(this->sock_fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val)) < 0)
spa_log_warn(this->log, "sco-sink %p: SO_RCVBUF %m", this); spa_log_warn(this->log, "sco-sink %p: SO_RCVBUF %m", this);
@ -526,15 +562,10 @@ static int do_remove_source(struct spa_loop *loop,
void *user_data) void *user_data)
{ {
struct impl *this = user_data; struct impl *this = user_data;
struct itimerspec ts;
if (this->source.loop) if (this->source.loop)
spa_loop_remove_source(this->data_loop, &this->source); spa_loop_remove_source(this->data_loop, &this->source);
ts.it_value.tv_sec = 0; reset_timeout (this);
ts.it_value.tv_nsec = 0;
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
timerfd_settime(this->timerfd, 0, &ts, NULL);
if (this->flush_source.loop) if (this->flush_source.loop)
spa_loop_remove_source(this->data_loop, &this->flush_source); spa_loop_remove_source(this->data_loop, &this->flush_source);
@ -953,7 +984,6 @@ static int impl_node_process(void *object)
spa_return_val_if_fail(this != NULL, -EINVAL); spa_return_val_if_fail(this != NULL, -EINVAL);
this = SPA_CONTAINER_OF(this, struct impl, node);
port = &this->port; port = &this->port;
io = port->io; io = port->io;
spa_return_val_if_fail(io != NULL, -EIO); spa_return_val_if_fail(io != NULL, -EIO);