bluez5: media-sink: flush packets at time of first sample

Send encoded data packets at the time corresponding to their first
sample. This is simpler than what we did previously.

Use this scheme also for BAP.
This commit is contained in:
Pauli Virtanen 2022-07-09 18:06:36 +03:00 committed by Wim Taymans
parent d231e2a1b1
commit 9cfa66baa2

View file

@ -62,17 +62,14 @@ static struct spa_log_topic log_topic = SPA_LOG_TOPIC(0, "spa.bluez5.sink.media"
#define DEFAULT_CLOCK_NAME "clock.system.monotonic" #define DEFAULT_CLOCK_NAME "clock.system.monotonic"
struct props { struct props {
uint32_t min_latency;
uint32_t max_latency;
int64_t latency_offset; int64_t latency_offset;
char clock_name[64]; char clock_name[64];
}; };
#define FILL_FRAMES 4 #define FILL_FRAMES 4
#define MIN_BUFFERS 2
#define MAX_BUFFERS 32 #define MAX_BUFFERS 32
#define MIN_LATENCY 128 #define BUFFER_SIZE (8192*8)
#define MAX_LATENCY 8192
#define BUFFER_SIZE (MAX_LATENCY*8)
struct buffer { struct buffer {
uint32_t id; uint32_t id;
@ -121,6 +118,8 @@ struct impl {
struct spa_hook_list hooks; struct spa_hook_list hooks;
struct spa_callbacks callbacks; struct spa_callbacks callbacks;
uint32_t quantum_limit;
uint64_t info_all; uint64_t info_all;
struct spa_node_info info; struct spa_node_info info;
#define IDX_PropInfo 0 #define IDX_PropInfo 0
@ -137,6 +136,7 @@ struct impl {
unsigned int started:1; unsigned int started:1;
unsigned int following:1; unsigned int following:1;
unsigned int is_output:1; unsigned int is_output:1;
unsigned int flush_pending:1;
unsigned int is_duplex:1; unsigned int is_duplex:1;
@ -152,6 +152,10 @@ struct impl {
uint64_t current_time; uint64_t current_time;
uint64_t next_time; uint64_t next_time;
uint64_t last_error; uint64_t last_error;
uint64_t process_time;
uint64_t prev_flush_time;
uint64_t next_flush_time;
const struct media_codec *codec; const struct media_codec *codec;
bool codec_props_changed; bool codec_props_changed;
@ -161,30 +165,23 @@ struct impl {
int need_flush; int need_flush;
bool fragment; bool fragment;
uint64_t fragment_timeout;
uint32_t block_size; uint32_t block_size;
uint8_t buffer[BUFFER_SIZE]; uint8_t buffer[BUFFER_SIZE];
uint32_t buffer_used; uint32_t buffer_used;
uint32_t header_size; uint32_t header_size;
uint32_t frame_count; uint32_t block_count;
uint16_t seqnum; uint16_t seqnum;
uint32_t timestamp; uint32_t timestamp;
uint64_t sample_count; uint64_t sample_count;
uint8_t tmp_buffer[BUFFER_SIZE]; uint8_t tmp_buffer[BUFFER_SIZE];
uint32_t tmp_buffer_used; uint32_t tmp_buffer_used;
uint32_t fd_buffer_size; uint32_t fd_buffer_size;
/* Times */
uint64_t start_time;
uint64_t total_samples;
}; };
#define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) == 0) #define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) == 0)
static void reset_props(struct impl *this, struct props *props) static void reset_props(struct impl *this, struct props *props)
{ {
props->min_latency = MIN_LATENCY;
props->max_latency = MAX_LATENCY;
props->latency_offset = 0; props->latency_offset = 0;
strncpy(props->clock_name, DEFAULT_CLOCK_NAME, sizeof(props->clock_name)); strncpy(props->clock_name, DEFAULT_CLOCK_NAME, sizeof(props->clock_name));
} }
@ -214,24 +211,8 @@ static int impl_node_enum_params(void *object, int seq,
switch (id) { switch (id) {
case SPA_PARAM_PropInfo: case SPA_PARAM_PropInfo:
{ {
struct props *p = &this->props;
switch (result.index) { switch (result.index) {
case 0: case 0:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_PropInfo, id,
SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_minLatency),
SPA_PROP_INFO_description, SPA_POD_String("The minimum latency"),
SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->min_latency, 1, INT32_MAX));
break;
case 1:
param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_PropInfo, id,
SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_maxLatency),
SPA_PROP_INFO_description, SPA_POD_String("The maximum latency"),
SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->max_latency, 1, INT32_MAX));
break;
case 2:
param = spa_pod_builder_add_object(&b, param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_PropInfo, id, SPA_TYPE_OBJECT_PropInfo, id,
SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_latencyOffsetNsec), SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_latencyOffsetNsec),
@ -240,7 +221,7 @@ static int impl_node_enum_params(void *object, int seq,
break; break;
default: default:
enum_codec = true; enum_codec = true;
index_offset = 3; index_offset = 1;
} }
break; break;
} }
@ -252,8 +233,6 @@ static int impl_node_enum_params(void *object, int seq,
case 0: case 0:
param = spa_pod_builder_add_object(&b, param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_Props, id, SPA_TYPE_OBJECT_Props, id,
SPA_PROP_minLatency, SPA_POD_Int(p->min_latency),
SPA_PROP_maxLatency, SPA_POD_Int(p->max_latency),
SPA_PROP_latencyOffsetNsec, SPA_POD_Long(p->latency_offset)); SPA_PROP_latencyOffsetNsec, SPA_POD_Long(p->latency_offset));
break; break;
default: default:
@ -391,8 +370,6 @@ static int apply_props(struct impl *this, const struct spa_pod *param)
} else { } else {
spa_pod_parse_object(param, spa_pod_parse_object(param,
SPA_TYPE_OBJECT_Props, NULL, SPA_TYPE_OBJECT_Props, NULL,
SPA_PROP_minLatency, SPA_POD_OPT_Int(&new_props.min_latency),
SPA_PROP_maxLatency, SPA_POD_OPT_Int(&new_props.max_latency),
SPA_PROP_latencyOffsetNsec, SPA_POD_OPT_Long(&new_props.latency_offset)); SPA_PROP_latencyOffsetNsec, SPA_POD_OPT_Long(&new_props.latency_offset));
} }
@ -444,7 +421,7 @@ static int reset_buffer(struct impl *this)
this->codec_props_changed = false; this->codec_props_changed = false;
} }
this->need_flush = 0; this->need_flush = 0;
this->frame_count = 0; this->block_count = 0;
this->fragment = false; this->fragment = false;
this->buffer_used = this->codec->start_encode(this->codec_data, this->buffer_used = this->codec->start_encode(this->codec_data,
this->buffer, sizeof(this->buffer), this->buffer, sizeof(this->buffer),
@ -469,20 +446,32 @@ static int get_transport_unused_size(struct impl *this)
static int send_buffer(struct impl *this) static int send_buffer(struct impl *this)
{ {
int written, unsent; int written, unsent;
unsent = get_transport_unused_size(this); unsent = get_transport_unused_size(this);
if (unsent >= 0) { if (unsent >= 0) {
unsent = this->fd_buffer_size - unsent; unsent = this->fd_buffer_size - unsent;
this->codec->abr_process(this->codec_data, unsent); this->codec->abr_process(this->codec_data, unsent);
} }
spa_log_trace(this->log, "%p: send %d %u %u %u %u",
this, this->frame_count, this->block_size, this->seqnum,
this->timestamp, this->buffer_used);
written = send(this->flush_source.fd, this->buffer, written = send(this->flush_source.fd, this->buffer,
this->buffer_used, MSG_DONTWAIT | MSG_NOSIGNAL); this->buffer_used, MSG_DONTWAIT | MSG_NOSIGNAL);
spa_log_trace(this->log, "%p: send %d", this, written); if (SPA_UNLIKELY(spa_log_level_topic_enabled(this->log, SPA_LOG_TOPIC_DEFAULT, SPA_LOG_LEVEL_TRACE))) {
struct timespec ts;
uint64_t now;
uint64_t dt;
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &ts);
now = SPA_TIMESPEC_TO_NSEC(&ts);
dt = now - this->prev_flush_time;
this->prev_flush_time = now;
spa_log_trace(this->log,
"%p: send blocks:%d block:%u seq:%u ts:%u size:%u "
"wrote:%d dt:%"PRIu64,
this, this->block_count, this->block_size, this->seqnum,
this->timestamp, this->buffer_used, written, dt);
}
if (written < 0) { if (written < 0) {
spa_log_debug(this->log, "%p: %m", this); spa_log_debug(this->log, "%p: %m", this);
@ -502,7 +491,7 @@ static int encode_buffer(struct impl *this, const void *data, uint32_t size)
spa_log_trace(this->log, "%p: encode %d used %d, %d %d %d", spa_log_trace(this->log, "%p: encode %d used %d, %d %d %d",
this, size, this->buffer_used, port->frame_size, this->block_size, this, size, this->buffer_used, port->frame_size, this->block_size,
this->frame_count); this->block_count);
if (this->need_flush) if (this->need_flush)
return 0; return 0;
@ -530,7 +519,7 @@ static int encode_buffer(struct impl *this, const void *data, uint32_t size)
return processed; return processed;
this->sample_count += processed / port->frame_size; this->sample_count += processed / port->frame_size;
this->frame_count += processed / this->block_size; this->block_count += processed / this->block_size;
this->buffer_used += out_encoded; this->buffer_used += out_encoded;
spa_log_trace(this->log, "%p: processed %d %zd used %d", spa_log_trace(this->log, "%p: processed %d %zd used %d",
@ -551,7 +540,7 @@ static int encode_fragment(struct impl *this)
spa_log_trace(this->log, "%p: encode fragment used %d, %d %d %d", spa_log_trace(this->log, "%p: encode fragment used %d, %d %d %d",
this, this->buffer_used, port->frame_size, this->block_size, this, this->buffer_used, port->frame_size, this->block_size,
this->frame_count); this->block_count);
if (this->need_flush) if (this->need_flush)
return 0; return 0;
@ -602,46 +591,41 @@ static int add_data(struct impl *this, const void *data, uint32_t size)
return total; return total;
} }
static void enable_flush(struct impl *this, bool enabled, uint64_t timeout) static void enable_flush_timer(struct impl *this, bool enabled)
{ {
bool flush_enabled = enabled && (timeout == 0);
struct itimerspec ts; struct itimerspec ts;
if (SPA_FLAG_IS_SET(this->flush_source.mask, SPA_IO_OUT) != flush_enabled) {
SPA_FLAG_UPDATE(this->flush_source.mask, SPA_IO_OUT, flush_enabled);
spa_loop_update_source(this->data_loop, &this->flush_source);
}
if (!enabled) if (!enabled)
timeout = 0; this->next_flush_time = 0;
ts.it_value.tv_sec = timeout / SPA_NSEC_PER_SEC; ts.it_value.tv_sec = this->next_flush_time / SPA_NSEC_PER_SEC;
ts.it_value.tv_nsec = timeout % SPA_NSEC_PER_SEC; ts.it_value.tv_nsec = this->next_flush_time % SPA_NSEC_PER_SEC;
ts.it_interval.tv_sec = 0; ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0; ts.it_interval.tv_nsec = 0;
spa_system_timerfd_settime(this->data_system, spa_system_timerfd_settime(this->data_system,
this->flush_timerfd, 0, &ts, NULL); this->flush_timerfd, SPA_FD_TIMER_ABSTIME, &ts, NULL);
this->flush_pending = enabled;
} }
static uint64_t get_next_bap_timeout(struct impl *this) static uint32_t get_queued_frames(struct impl *this)
{ {
struct port *port = &this->port; struct port *port = &this->port;
uint64_t playback_time = 0, elapsed_time = 0, next_time = 0; uint32_t bytes = 0;
struct timespec now; struct buffer *b;
uint64_t now_time;
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now); spa_list_for_each(b, &port->ready, link) {
now_time = SPA_TIMESPEC_TO_NSEC(&now); struct spa_data *d = b->buf->datas;
if (this->start_time == 0)
this->start_time = now_time;
playback_time = (this->total_samples * SPA_NSEC_PER_SEC) / port->current_format.info.raw.rate; bytes += d[0].chunk->size;
if (now_time > this->start_time) }
elapsed_time = now_time - this->start_time;
if (elapsed_time < playback_time)
next_time = playback_time - elapsed_time;
return next_time; if (bytes > port->ready_offset)
bytes -= port->ready_offset;
else
bytes = 0;
return bytes / port->frame_size;
} }
static int flush_data(struct impl *this, uint64_t now_time) static int flush_data(struct impl *this, uint64_t now_time)
@ -720,13 +704,17 @@ again:
port->ready_offset = 0; port->ready_offset = 0;
} }
total_frames += n_frames; total_frames += n_frames;
this->total_samples += n_frames;
spa_log_trace(this->log, "%p: written %u frames", this, total_frames); spa_log_trace(this->log, "%p: written %u frames", this, total_frames);
} }
if (written > 0 && this->buffer_used == this->header_size) { if (written > 0 && this->buffer_used == this->header_size) {
enable_flush(this, false, 0); enable_flush_timer(this, false);
return 0;
}
if (this->flush_pending) {
spa_log_trace(this->log, "%p: wait for flush timer", this);
return 0; return 0;
} }
@ -746,109 +734,95 @@ again:
* glitch in any case. * glitch in any case.
*/ */
written = this->buffer_used; written = this->buffer_used;
reset_buffer(this);
} }
if (written < 0) { if (written < 0) {
spa_log_trace(this->log, "%p: error flushing %s", this, spa_log_trace(this->log, "%p: error flushing %s", this,
spa_strerror(written)); spa_strerror(written));
reset_buffer(this); reset_buffer(this);
enable_flush(this, false, 0); enable_flush_timer(this, false);
return written; return written;
} }
else if (written > 0) { else if (written > 0) {
if (this->codec->bap) { /*
uint64_t timeout = get_next_bap_timeout(this); * We cannot write all data we have at once, since this can exceed device
* buffers (esp. for the A2DP low-latency codecs) and socket buffers, so
* flush needs to be delayed.
*/
uint32_t packet_samples = this->block_count * this->block_size
/ port->frame_size;
uint64_t packet_time = (uint64_t)packet_samples * SPA_NSEC_PER_SEC
/ port->current_format.info.raw.rate;
if (SPA_LIKELY(this->position)) {
uint32_t frames = get_queued_frames(this);
uint64_t duration_ns;
reset_buffer(this);
if (!spa_list_is_empty(&port->ready)) {
spa_log_debug(this->log, "%p: flush after %d ns", this, (unsigned int)timeout);
if (timeout == 0)
goto again;
else
enable_flush(this, true, timeout);
} else {
enable_flush(this, false, 0);
}
} else {
/* /*
* We cannot write all data we have at once, since this can exceed * Flush at the time position of the next buffered sample.
* device buffers. We'll want a limited number of "excess" */
* samples. This is an issue for the "low-latency" A2DP codecs. duration_ns = ((uint64_t)this->position->clock.duration * SPA_NSEC_PER_SEC
* / this->position->clock.rate.denom);
* Flushing the rest of the data (if any) is delayed after a timeout, this->next_flush_time = this->process_time + duration_ns
* selected on an average-rate basis: - ((uint64_t)frames * SPA_NSEC_PER_SEC
* / port->current_format.info.raw.rate);
* npackets = quantum / packet_samples
* write_end_time = npackets * timeout
* max_excess = quantum - sample_rate * write_end_time
* packet_time = packet_samples / sample_rate
* => timeout = (quantum - max_excess)/quantum * packet_time
*/
uint64_t max_excess = 2*256;
uint64_t packet_samples = (uint64_t)this->frame_count * this->block_size / port->frame_size;
uint64_t packet_time = packet_samples * SPA_NSEC_PER_SEC / port->current_format.info.raw.rate;
uint64_t quantum = SPA_LIKELY(this->clock) ? this->clock->duration : 0;
uint64_t timeout = (quantum > max_excess) ?
(packet_time * (quantum - max_excess) / quantum) : 0;
if (this->need_flush == NEED_FLUSH_FRAGMENT) { /*
reset_buffer(this); * We could delay the output by one packet to avoid waiting
this->fragment = true; * for the next buffer and so make send intervals exactly regular.
this->fragment_timeout = (packet_samples > 0) ? timeout : this->fragment_timeout; * However, this is not needed for A2DP or BAP. The controller
goto again; * will do the scheduling for us, and there's also the socket buffer
} * in between.
if (this->fragment_timeout > 0) { */
timeout = this->fragment_timeout; #if 0
this->fragment_timeout = 0; this->next_flush_time += SPA_MIN(packet_time,
} duration_ns * (port->n_buffers - 1));
#endif
reset_buffer(this); } else {
if (now_time - this->last_error > SPA_NSEC_PER_SEC) { if (this->next_flush_time == 0)
if (get_transport_unused_size(this) == (int)this->fd_buffer_size) { this->next_flush_time = this->process_time;
spa_log_trace(this->log, "%p: increase bitpool", this); this->next_flush_time += packet_time;
this->codec->increase_bitpool(this->codec_data);
}
this->last_error = now_time;
}
if (!spa_list_is_empty(&port->ready)) {
spa_log_trace(this->log, "%p: flush after %d ns", this, (int)timeout);
if (timeout == 0)
goto again;
else
enable_flush(this, true, timeout);
} else {
enable_flush(this, false, 0);
}
} }
if (this->need_flush == NEED_FLUSH_FRAGMENT) {
reset_buffer(this);
this->fragment = true;
goto again;
}
if (now_time - this->last_error > SPA_NSEC_PER_SEC) {
if (get_transport_unused_size(this) == (int)this->fd_buffer_size) {
spa_log_trace(this->log, "%p: increase bitpool", this);
this->codec->increase_bitpool(this->codec_data);
}
this->last_error = now_time;
}
spa_log_trace(this->log, "%p: flush at:%"PRIu64" process:%"PRIu64, this,
this->next_flush_time, this->process_time);
reset_buffer(this);
enable_flush_timer(this, true);
} }
else { else {
/* Don't want to flush yet, or failed to write anything */ /* Don't want to flush yet, or failed to write anything */
spa_log_trace(this->log, "%p: skip flush", this); spa_log_trace(this->log, "%p: skip flush", this);
enable_flush(this, false, 0); enable_flush_timer(this, false);
} }
return 0; return 0;
} }
static void media_on_flush(struct spa_source *source) static void media_on_flush_error(struct spa_source *source)
{ {
struct impl *this = source->data; struct impl *this = source->data;
spa_log_trace(this->log, "%p: flushing", this); spa_log_trace(this->log, "%p: flush event", this);
if (!SPA_FLAG_IS_SET(source->rmask, SPA_IO_OUT)) { if (source->rmask & (SPA_IO_ERR | SPA_IO_HUP)) {
spa_log_warn(this->log, "%p: error %d", this, source->rmask); spa_log_warn(this->log, "%p: error %d", this, source->rmask);
if (this->flush_source.loop) if (this->flush_source.loop)
spa_loop_remove_source(this->data_loop, &this->flush_source); spa_loop_remove_source(this->data_loop, &this->flush_source);
return; return;
} }
if (this->transport == NULL) {
enable_flush(this, false, 0);
return;
}
flush_data(this, this->current_time);
} }
static void media_on_flush_timeout(struct spa_source *source) static void media_on_flush_timeout(struct spa_source *source)
@ -862,11 +836,14 @@ static void media_on_flush_timeout(struct spa_source *source)
spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno)); spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno));
if (this->transport == NULL) { if (this->transport == NULL) {
enable_flush(this, false, 0); enable_flush_timer(this, false);
return; return;
} }
flush_data(this, this->current_time); while (exp-- > 0) {
this->flush_pending = false;
flush_data(this, this->current_time);
}
} }
static void media_on_timeout(struct spa_source *source) static void media_on_timeout(struct spa_source *source)
@ -969,8 +946,8 @@ static int do_start(struct impl *this)
return -EIO; return -EIO;
spa_log_info(this->log, "%p: using %s codec %s, delay:%"PRIi64" ms", this, spa_log_info(this->log, "%p: using %s codec %s, delay:%"PRIi64" ms", this,
this->codec->bap ? "BAP" : "A2DP", this->codec->description, this->codec->bap ? "BAP" : "A2DP", this->codec->description,
(int64_t)(spa_bt_transport_get_delay_nsec(this->transport) / SPA_NSEC_PER_MSEC)); (int64_t)(spa_bt_transport_get_delay_nsec(this->transport) / SPA_NSEC_PER_MSEC));
this->seqnum = 0; this->seqnum = 0;
@ -1025,11 +1002,13 @@ static int do_start(struct impl *this)
this->flush_source.data = this; this->flush_source.data = this;
this->flush_source.fd = this->transport->fd; this->flush_source.fd = this->transport->fd;
this->flush_source.func = media_on_flush; this->flush_source.func = media_on_flush_error;
this->flush_source.mask = 0; this->flush_source.mask = SPA_IO_ERR | SPA_IO_HUP;
this->flush_source.rmask = 0; this->flush_source.rmask = 0;
spa_loop_add_source(this->data_loop, &this->flush_source); spa_loop_add_source(this->data_loop, &this->flush_source);
this->flush_pending = false;
set_timers(this); set_timers(this);
this->started = true; this->started = true;
@ -1046,9 +1025,6 @@ static int do_remove_source(struct spa_loop *loop,
struct impl *this = user_data; struct impl *this = user_data;
struct itimerspec ts; struct itimerspec ts;
this->start_time = 0;
this->total_samples = 0;
if (this->source.loop) if (this->source.loop)
spa_loop_remove_source(this->data_loop, &this->source); spa_loop_remove_source(this->data_loop, &this->source);
ts.it_value.tv_sec = 0; ts.it_value.tv_sec = 0;
@ -1078,7 +1054,7 @@ static int do_stop(struct impl *this)
if (!this->started) if (!this->started)
return 0; return 0;
spa_log_trace(this->log, "%p: stop", this); spa_log_trace(this->log, "%p: stop", this);
spa_loop_invoke(this->data_loop, do_remove_source, 0, NULL, 0, true, this); spa_loop_invoke(this->data_loop, do_remove_source, 0, NULL, 0, true, this);
@ -1275,11 +1251,14 @@ impl_node_port_enum_params(void *object, int seq,
param = spa_pod_builder_add_object(&b, param = spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamBuffers, id, SPA_TYPE_OBJECT_ParamBuffers, id,
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(2, 2, MAX_BUFFERS), SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(
MIN_BUFFERS,
MIN_BUFFERS,
MAX_BUFFERS),
SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1), SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1),
SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int( SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(
this->props.min_latency * port->frame_size, this->quantum_limit * port->frame_size,
this->props.min_latency * port->frame_size, 16 * port->frame_size,
INT32_MAX), INT32_MAX),
SPA_PARAM_BUFFERS_stride, SPA_POD_Int(port->frame_size)); SPA_PARAM_BUFFERS_stride, SPA_POD_Int(port->frame_size));
break; break;
@ -1539,18 +1518,21 @@ static int impl_node_process(void *object)
io->buffer_id = SPA_ID_INVALID; io->buffer_id = SPA_ID_INVALID;
io->status = SPA_STATUS_OK; io->status = SPA_STATUS_OK;
} }
if (!spa_list_is_empty(&port->ready)) {
if (this->following) { if (this->following) {
if (this->position) { if (this->position) {
this->current_time = this->position->clock.nsec; this->current_time = this->position->clock.nsec;
} else { } else {
struct timespec now; struct timespec now;
spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now); spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now);
this->current_time = SPA_TIMESPEC_TO_NSEC(&now); this->current_time = SPA_TIMESPEC_TO_NSEC(&now);
}
} }
if (this->need_flush) }
reset_buffer(this);
this->process_time = this->current_time;
if (!spa_list_is_empty(&port->ready)) {
spa_log_trace(this->log, "%p: flush on process", this);
flush_data(this, this->current_time); flush_data(this, this->current_time);
} }
@ -1754,6 +1736,11 @@ impl_init(const struct spa_handle_factory *factory,
spa_list_init(&port->ready); spa_list_init(&port->ready);
this->quantum_limit = 8192;
if (info && (str = spa_dict_lookup(info, "clock.quantum-limit")))
spa_atou32(str, &this->quantum_limit, 0);
if (info && (str = spa_dict_lookup(info, "api.bluez5.a2dp-duplex")) != NULL) if (info && (str = spa_dict_lookup(info, "api.bluez5.a2dp-duplex")) != NULL)
this->is_duplex = spa_atob(str); this->is_duplex = spa_atob(str);