bluez5: change sink/source run state follow transport state

Allow asynchronous changes in transport state in the sinks/sources.

Also allow transport acquire to be actually synchronous, in this case it
must set transport state during acquire call.

Separate driver start/stop from transport start/stop.
This commit is contained in:
Pauli Virtanen 2023-03-06 22:40:33 +02:00 committed by P V
parent 60718c4b4f
commit 1d5c693d33
5 changed files with 439 additions and 145 deletions

View file

@ -116,6 +116,8 @@ struct impl {
struct port port;
unsigned int started:1;
unsigned int start_ready:1;
unsigned int transport_started:1;
unsigned int following:1;
unsigned int is_output:1;
unsigned int flush_pending:1;
@ -617,6 +619,8 @@ static int flush_data(struct impl *this, uint64_t now_time)
struct port *port = &this->port;
int unused_buffer;
spa_assert(this->transport_started);
if (this->transport == NULL || !this->flush_source.loop || !this->flush_timer_source.loop) {
/* I/O in error state */
return -EIO;
@ -907,25 +911,22 @@ static void media_on_timeout(struct spa_source *source)
set_timeout(this, this->next_time);
}
static int do_start(struct impl *this)
static int transport_start(struct impl *this)
{
int res, val, size;
int val, size;
struct port *port;
socklen_t len;
uint8_t *conf;
uint32_t flags;
if (this->started)
if (this->transport_started)
return 0;
if (!this->start_ready)
return -EIO;
spa_return_val_if_fail(this->transport, -EIO);
this->following = is_following(this);
spa_log_debug(this->log, "%p: start following:%d", this, this->following);
if ((res = spa_bt_transport_acquire(this->transport, false)) < 0)
return res;
spa_log_debug(this->log, "%p: start transport", this);
port = &this->port;
@ -984,13 +985,6 @@ static int do_start(struct impl *this)
reset_buffer(this);
this->source.data = this;
this->source.fd = this->timerfd;
this->source.func = media_on_timeout;
this->source.mask = SPA_IO_IN;
this->source.rmask = 0;
spa_loop_add_source(this->data_loop, &this->source);
this->flush_timer_source.data = this;
this->flush_timer_source.fd = this->flush_timerfd;
this->flush_timer_source.func = media_on_flush_timeout;
@ -1007,7 +1001,40 @@ static int do_start(struct impl *this)
this->flush_pending = false;
this->transport_started = true;
return 0;
}
static int do_start(struct impl *this)
{
int res;
if (this->started)
return 0;
spa_return_val_if_fail(this->transport, -EIO);
this->following = is_following(this);
spa_log_debug(this->log, "%p: start following:%d", this, this->following);
this->start_ready = true;
if ((res = spa_bt_transport_acquire(this->transport, false)) < 0) {
this->start_ready = false;
return res;
}
this->source.data = this;
this->source.fd = this->timerfd;
this->source.func = media_on_timeout;
this->source.mask = SPA_IO_IN;
this->source.rmask = 0;
spa_loop_add_source(this->data_loop, &this->source);
set_timers(this);
this->started = true;
return 0;
@ -1031,6 +1058,21 @@ static int do_remove_source(struct spa_loop *loop,
ts.it_interval.tv_nsec = 0;
spa_system_timerfd_settime(this->data_system, this->timerfd, 0, &ts, NULL);
return 0;
}
static int do_remove_transport_source(struct spa_loop *loop,
bool async,
uint32_t seq,
const void *data,
size_t size,
void *user_data)
{
struct impl *this = user_data;
struct itimerspec ts;
this->transport_started = false;
if (this->flush_source.loop)
spa_loop_remove_source(this->data_loop, &this->flush_source);
@ -1045,6 +1087,20 @@ static int do_remove_source(struct spa_loop *loop,
return 0;
}
static void transport_stop(struct impl *this)
{
if (!this->transport_started)
return;
spa_log_trace(this->log, "%p: stop transport", this);
spa_loop_invoke(this->data_loop, do_remove_transport_source, 0, NULL, 0, true, this);
if (this->codec_data)
this->codec->deinit(this->codec_data);
this->codec_data = NULL;
}
static int do_stop(struct impl *this)
{
int res = 0;
@ -1052,18 +1108,18 @@ static int do_stop(struct impl *this)
if (!this->started)
return 0;
spa_log_trace(this->log, "%p: stop", this);
spa_log_debug(this->log, "%p: stop", this);
this->start_ready = false;
spa_loop_invoke(this->data_loop, do_remove_source, 0, NULL, 0, true, this);
this->started = false;
transport_stop(this);
if (this->transport)
res = spa_bt_transport_release(this->transport);
if (this->codec_data)
this->codec->deinit(this->codec_data);
this->codec_data = NULL;
this->started = false;
return res;
}
@ -1506,6 +1562,9 @@ static int impl_node_process(void *object)
return SPA_STATUS_HAVE_DATA;
}
if (!this->started || !this->transport_started)
return SPA_STATUS_OK;
if (io->status == SPA_STATUS_HAVE_DATA && io->buffer_id < port->n_buffers) {
struct buffer *b = &port->buffers[io->buffer_id];
@ -1598,26 +1657,32 @@ static void transport_state_changed(void *data,
enum spa_bt_transport_state state)
{
struct impl *this = data;
bool was_started = this->transport_started;
spa_log_debug(this->log, "%p: transport %p state %d->%d", this, this->transport, old, state);
if (state < SPA_BT_TRANSPORT_STATE_ACTIVE && old == SPA_BT_TRANSPORT_STATE_ACTIVE &&
this->started) {
uint8_t buffer[1024];
struct spa_pod_builder b = { 0 };
spa_log_debug(this->log, "%p: transport %p becomes inactive: stop and indicate error",
this, this->transport);
if (state == SPA_BT_TRANSPORT_STATE_ACTIVE)
transport_start(this);
else
transport_stop(this);
if (state < SPA_BT_TRANSPORT_STATE_ACTIVE && was_started) {
/*
* If establishing connection fails due to remote end not activating
* the transport, we won't get a write error, but instead see a transport
* state change.
*
* Stop and emit a node error, to let upper levels handle it.
* Emit a node error, to let upper levels handle it.
*/
do_stop(this);
spa_log_debug(this->log, "%p: transport %p becomes inactive: stop and indicate error",
this, this->transport);
state = SPA_BT_TRANSPORT_STATE_ERROR;
}
if (state == SPA_BT_TRANSPORT_STATE_ERROR) {
uint8_t buffer[1024];
struct spa_pod_builder b = { 0 };
spa_pod_builder_init(&b, buffer, sizeof(buffer));
spa_node_emit_event(&this->hooks,