a2dp: handle slaving

This commit is contained in:
Wim Taymans 2019-04-24 12:41:16 +02:00
parent bc85837e26
commit f2cdba1929

View file

@ -106,14 +106,18 @@ struct impl {
struct port port;
bool opened;
unsigned int opened:1;
unsigned int started:1;
unsigned int slaved:1;
bool started;
struct spa_source source;
int timerfd;
int threshold;
struct spa_source flush_source;
struct spa_io_clock *clock;
struct spa_io_position *position;
sbc_t sbc;
int read_size;
int write_size;
@ -234,8 +238,69 @@ static int impl_node_enum_params(struct spa_node *node, int seq,
return 0;
}
static inline bool is_slaved(struct impl *this)
{
return this->position && this->clock && this->position->clock.id != this->clock->id;
}
static int set_timers(struct impl *this)
{
struct itimerspec ts;
int res;
ts.it_value.tv_sec = 0;
if (this->slaved) {
ts.it_value.tv_nsec = 0;
} else {
ts.it_value.tv_nsec = 1;
}
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
res = timerfd_settime(this->timerfd, 0, &ts, NULL);
this->source.mask = SPA_IO_IN;
spa_loop_update_source(this->data_loop, &this->source);
return res;
}
static int do_reslave(struct spa_loop *loop,
bool async,
uint32_t seq,
const void *data,
size_t size,
void *user_data)
{
struct impl *this = user_data;
set_timers(this);
return 0;
}
static int impl_node_set_io(struct spa_node *node, uint32_t id, void *data, size_t size)
{
struct impl *this;
bool slaved;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct impl, node);
switch (id) {
case SPA_IO_Clock:
this->clock = data;
break;
case SPA_IO_Position:
this->position = data;
break;
default:
return -ENOENT;
}
slaved = is_slaved(this);
if (this->started && slaved != this->slaved) {
spa_log_debug(this->log, "a2dp-sink %p: reslave %d->%d", this, this->slaved, slaved);
this->slaved = slaved;
spa_loop_invoke(this->data_loop, do_reslave, 0, NULL, 0, true, this);
}
return 0;
}
@ -333,8 +398,9 @@ static int encode_buffer(struct impl *this, const void *data, int size)
ssize_t out_encoded;
struct port *port = &this->port;
spa_log_trace(this->log, "a2dp-sink %p: encode %d used %d, %d %d",
this, size, this->buffer_used, port->frame_size, this->write_size);
spa_log_trace(this->log, "a2dp-sink %p: encode %d used %d, %d %d %d/%d",
this, size, this->buffer_used, port->frame_size, this->write_size,
this->frame_count, MAX_FRAME_COUNT);
if (this->frame_count > MAX_FRAME_COUNT)
return -ENOSPC;
@ -552,30 +618,32 @@ static int flush_data(struct impl *this, uint64_t now_time)
spa_log_trace(this->log, "%ld %ld %ld %ld %d",
now_time, queued, this->sample_time, elapsed, this->write_samples);
if (queued < FILL_FRAMES * this->write_samples) {
queued = (FILL_FRAMES + 1) * this->write_samples;
if (this->sample_time < elapsed) {
this->sample_time = queued;
this->start_time = now_time;
if (!this->slaved) {
if (queued < FILL_FRAMES * this->write_samples) {
queued = (FILL_FRAMES + 1) * this->write_samples;
if (this->sample_time < elapsed) {
this->sample_time = queued;
this->start_time = now_time;
}
if (!spa_list_is_empty(&port->ready) &&
now_time - this->last_error > SPA_NSEC_PER_SEC / 2) {
reduce_bitpool(this);
this->last_error = now_time;
}
}
if (!spa_list_is_empty(&port->ready) &&
now_time - this->last_error > SPA_NSEC_PER_SEC / 2) {
reduce_bitpool(this);
this->last_error = now_time;
}
calc_timeout(queued,
FILL_FRAMES * this->write_samples,
port->current_format.info.raw.rate,
&this->now, &ts.it_value);
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
timerfd_settime(this->timerfd, TFD_TIMER_ABSTIME, &ts, NULL);
this->source.mask = SPA_IO_IN;
spa_loop_update_source(this->data_loop, &this->source);
} else {
this->start_time = now_time;
this->sample_time = 0;
}
calc_timeout(queued,
FILL_FRAMES * this->write_samples,
port->current_format.info.raw.rate,
&this->now, &ts.it_value);
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
timerfd_settime(this->timerfd, TFD_TIMER_ABSTIME, &ts, NULL);
this->source.mask = SPA_IO_IN;
spa_loop_update_source(this->data_loop, &this->source);
return 0;
}
@ -613,7 +681,7 @@ static void a2dp_on_timeout(struct spa_source *source)
spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno));
clock_gettime(CLOCK_MONOTONIC, &this->now);
now_time = this->now.tv_sec * SPA_NSEC_PER_SEC + this->now.tv_nsec;
now_time = SPA_TIMESPEC_TO_NSEC(&this->now);
spa_log_trace(this->log, "timeout %ld %ld", now_time, now_time - this->last_time);
this->last_time = now_time;
@ -720,12 +788,13 @@ static int do_start(struct impl *this)
{
int res, val;
socklen_t len;
struct itimerspec ts;
if (this->started)
return 0;
spa_log_trace(this->log, "a2dp-sink %p: start", this);
this->slaved = is_slaved(this);
spa_log_debug(this->log, "a2dp-sink %p: start slaved:%d", this, this->slaved);
if ((res = this->transport->acquire(this->transport, false)) < 0)
return res;
@ -768,12 +837,7 @@ static int do_start(struct impl *this)
this->flush_source.rmask = 0;
spa_loop_add_source(this->data_loop, &this->flush_source);
ts.it_value.tv_sec = 0;
ts.it_value.tv_nsec = 1;
ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
timerfd_settime(this->timerfd, 0, &ts, NULL);
set_timers(this);
this->started = true;
return 0;
@ -1027,7 +1091,7 @@ impl_node_port_enum_params(struct spa_node *node, int seq,
this->props.min_latency * port->frame_size,
this->props.min_latency * port->frame_size,
INT32_MAX),
SPA_PARAM_BUFFERS_stride, SPA_POD_Int(0),
SPA_PARAM_BUFFERS_stride, SPA_POD_Int(port->frame_size),
SPA_PARAM_BUFFERS_align, SPA_POD_Int(16));
break;
@ -1249,6 +1313,7 @@ static int impl_node_process(struct spa_node *node)
struct impl *this;
struct port *port;
struct spa_io_buffers *io;
uint64_t now_time;
spa_return_val_if_fail(node != NULL, -EINVAL);
@ -1257,9 +1322,14 @@ static int impl_node_process(struct spa_node *node)
io = port->io;
spa_return_val_if_fail(io != NULL, -EIO);
clock_gettime(CLOCK_MONOTONIC, &this->now);
now_time = SPA_TIMESPEC_TO_NSEC(&this->now);
if (!spa_list_is_empty(&port->ready))
flush_data(this, now_time);
if (io->status == SPA_STATUS_HAVE_BUFFER && io->buffer_id < port->n_buffers) {
struct buffer *b = &port->buffers[io->buffer_id];
uint64_t now_time;
if (!b->outstanding) {
spa_log_warn(this->log, NAME " %p: buffer %u in use", this, io->buffer_id);
@ -1275,13 +1345,11 @@ static int impl_node_process(struct spa_node *node)
this->threshold = SPA_MIN(b->buf->datas[0].chunk->size / port->frame_size,
this->props.max_latency);
clock_gettime(CLOCK_MONOTONIC, &this->now);
now_time = this->now.tv_sec * SPA_NSEC_PER_SEC + this->now.tv_nsec;
flush_data(this, now_time);
io->status = SPA_STATUS_OK;
}
return SPA_STATUS_HAVE_BUFFER;
}
@ -1375,7 +1443,8 @@ impl_init(const struct spa_handle_factory *factory,
reset_props(&this->props);
this->info_all = SPA_NODE_CHANGE_MASK_FLAGS |
SPA_NODE_CHANGE_MASK_PARAMS;
SPA_NODE_CHANGE_MASK_PARAMS |
SPA_NODE_CHANGE_MASK_PROPS;
this->info = SPA_NODE_INFO_INIT();
this->info.flags = SPA_NODE_FLAG_RT;
this->params[0] = SPA_PARAM_INFO(SPA_PARAM_PropInfo, SPA_PARAM_INFO_READ);