From e1cb7c6fb2977c4e20e1eb6bc60d061ad9896e9f Mon Sep 17 00:00:00 2001 From: Pauli Virtanen Date: Sat, 18 Jun 2022 20:03:41 +0300 Subject: [PATCH] bluez5: sco-source: implement sco-source the same way as a2dp-source --- spa/plugins/bluez5/sco-source.c | 480 +++++++++++++++++++++----------- 1 file changed, 310 insertions(+), 170 deletions(-) diff --git a/spa/plugins/bluez5/sco-source.c b/spa/plugins/bluez5/sco-source.c index 1819029b8..05726d77f 100644 --- a/spa/plugins/bluez5/sco-source.c +++ b/spa/plugins/bluez5/sco-source.c @@ -58,11 +58,11 @@ static struct spa_log_topic log_topic = SPA_LOG_TOPIC(0, "spa.bluez5.source.sco" #undef SPA_LOG_TOPIC_DEFAULT #define SPA_LOG_TOPIC_DEFAULT &log_topic +#include "decode-buffer.h" + #define DEFAULT_CLOCK_NAME "clock.system.monotonic" struct props { - uint32_t min_latency; - uint32_t max_latency; char clock_name[64]; }; @@ -101,8 +101,7 @@ struct port { struct spa_list free; struct spa_list ready; - struct buffer *current_buffer; - uint32_t ready_offset; + struct spa_bt_decode_buffer buffer; }; struct impl { @@ -116,6 +115,8 @@ struct impl { struct spa_hook_list hooks; struct spa_callbacks callbacks; + uint32_t quantum_limit; + uint64_t info_all; struct spa_node_info info; #define IDX_PropInfo 0 @@ -132,10 +133,18 @@ struct impl { unsigned int started:1; unsigned int following:1; + unsigned int matching:1; + unsigned int resampling:1; + + struct spa_source timer_source; + int timerfd; struct spa_io_clock *clock; struct spa_io_position *position; + uint64_t current_time; + uint64_t next_time; + /* mSBC */ sbc_t msbc; bool msbc_seq_initialized; @@ -150,13 +159,8 @@ struct impl { #define CHECK_PORT(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) == 0) -static const uint32_t default_min_latency = 128; -static const uint32_t default_max_latency = 512; - static void reset_props(struct props *props) { - props->min_latency = default_min_latency; - props->max_latency = default_max_latency; strncpy(props->clock_name, DEFAULT_CLOCK_NAME, sizeof(props->clock_name)); } @@ -184,23 +188,7 @@ static int impl_node_enum_params(void *object, int seq, switch (id) { case SPA_PARAM_PropInfo: { - struct props *p = &this->props; - switch (result.index) { - case 0: - param = spa_pod_builder_add_object(&b, - SPA_TYPE_OBJECT_PropInfo, id, - SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_minLatency), - SPA_PROP_INFO_description, SPA_POD_String("The minimum latency"), - SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->min_latency, 1, INT32_MAX)); - break; - case 1: - param = spa_pod_builder_add_object(&b, - SPA_TYPE_OBJECT_PropInfo, id, - SPA_PROP_INFO_id, SPA_POD_Id(SPA_PROP_maxLatency), - SPA_PROP_INFO_description, SPA_POD_String("The maximum latency"), - SPA_PROP_INFO_type, SPA_POD_CHOICE_RANGE_Int(p->max_latency, 1, INT32_MAX)); - break; default: return 0; } @@ -208,15 +196,7 @@ static int impl_node_enum_params(void *object, int seq, } case SPA_PARAM_Props: { - struct props *p = &this->props; - switch (result.index) { - case 0: - param = spa_pod_builder_add_object(&b, - SPA_TYPE_OBJECT_Props, id, - SPA_PROP_minLatency, SPA_POD_Int(p->min_latency), - SPA_PROP_maxLatency, SPA_POD_Int(p->max_latency)); - break; default: return 0; } @@ -237,6 +217,41 @@ static int impl_node_enum_params(void *object, int seq, return 0; } +static int set_timeout(struct impl *this, uint64_t time) +{ + struct itimerspec ts; + ts.it_value.tv_sec = time / SPA_NSEC_PER_SEC; + ts.it_value.tv_nsec = time % SPA_NSEC_PER_SEC; + ts.it_interval.tv_sec = 0; + ts.it_interval.tv_nsec = 0; + return spa_system_timerfd_settime(this->data_system, + this->timerfd, SPA_FD_TIMER_ABSTIME, &ts, NULL); +} + +static int set_timers(struct impl *this) +{ + struct timespec now; + + spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &now); + this->next_time = SPA_TIMESPEC_TO_NSEC(&now); + + return set_timeout(this, this->following ? 0 : this->next_time); +} + +static int do_reassign_follower(struct spa_loop *loop, + bool async, + uint32_t seq, + const void *data, + size_t size, + void *user_data) +{ + struct impl *this = user_data; + struct port *port = &this->port; + + spa_bt_decode_buffer_recover(&port->buffer); + return 0; +} + static inline bool is_following(struct impl *this) { return this->position && this->clock && this->position->clock.id != this->clock->id; @@ -269,6 +284,7 @@ static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size) if (this->started && following != this->following) { spa_log_debug(this->log, "%p: reassign follower %d->%d", this, this->following, following); this->following = following; + spa_loop_invoke(this->data_loop, do_reassign_follower, 0, NULL, 0, true, this); } return 0; @@ -284,10 +300,7 @@ static int apply_props(struct impl *this, const struct spa_pod *param) if (param == NULL) { reset_props(&new_props); } else { - spa_pod_parse_object(param, - SPA_TYPE_OBJECT_Props, NULL, - SPA_PROP_minLatency, SPA_POD_OPT_Int(&new_props.min_latency), - SPA_PROP_maxLatency, SPA_POD_OPT_Int(&new_props.max_latency)); + /* noop */ } changed = (memcmp(&new_props, &this->props, sizeof(struct props)) != 0); @@ -326,8 +339,6 @@ static void reset_buffers(struct port *port) spa_list_init(&port->free); spa_list_init(&port->ready); - port->current_buffer = NULL; - for (i = 0; i < port->n_buffers; i++) { struct buffer *b = &port->buffers[i]; spa_list_append(&port->free, &b->link); @@ -422,116 +433,107 @@ static bool is_zero_packet(uint8_t *data, int size) return true; } -static void preprocess_and_decode_msbc_data(void *userdata, uint8_t *read_data, int size_read) +static uint32_t preprocess_and_decode_msbc_data(void *userdata, uint8_t *read_data, int size_read) { struct impl *this = userdata; struct port *port = &this->port; - struct spa_data *datas = port->current_buffer->buf->datas; + uint32_t decoded = 0; + int i; spa_log_trace(this->log, "handling mSBC data"); - /* check if the packet contains only zeros - if so ignore the packet. - This is necessary, because some kernels insert bogus "all-zero" packets - into the datastream. - See https://gitlab.freedesktop.org/pipewire/pipewire/-/issues/549 */ - if (is_zero_packet(read_data, size_read)) { - return; - } + /* + * Check if the packet contains only zeros - if so ignore the packet. + * This is necessary, because some kernels insert bogus "all-zero" packets + * into the datastream. + * See https://gitlab.freedesktop.org/pipewire/pipewire/-/issues/549 + */ + if (is_zero_packet(read_data, size_read)) + return 0; - int i; for (i = 0; i < size_read; ++i) { + void *buf; + uint32_t avail; + int seq, processed; + size_t written; + msbc_buffer_append_byte(this, read_data[i]); - /* Handle found mSBC packets. - * - * XXX: if there's no space for the decoded audio in - * XXX: the current buffer, we'll drop data. + if (this->msbc_buffer_pos != MSBC_ENCODED_SIZE) + continue; + + /* + * Handle found mSBC packet */ - if (this->msbc_buffer_pos == MSBC_ENCODED_SIZE) { - spa_log_trace(this->log, "Received full mSBC packet, start processing it"); - if (port->ready_offset + MSBC_DECODED_SIZE <= datas[0].maxsize) { - int seq, processed; - size_t written; - spa_log_trace(this->log, - "Output buffer has space, processing mSBC packet"); + buf = spa_bt_decode_buffer_get_write(&port->buffer, &avail); - /* Check sequence number */ - seq = ((this->msbc_buffer[1] >> 4) & 1) | - ((this->msbc_buffer[1] >> 6) & 2); + /* Check sequence number */ + seq = ((this->msbc_buffer[1] >> 4) & 1) | + ((this->msbc_buffer[1] >> 6) & 2); - spa_log_trace(this->log, "mSBC packet seq=%u", seq); - if (!this->msbc_seq_initialized) { - this->msbc_seq_initialized = true; - this->msbc_seq = seq; - } else if (seq != this->msbc_seq) { - spa_log_info(this->log, - "missing mSBC packet: %u != %u", seq, this->msbc_seq); - this->msbc_seq = seq; - /* TODO: Implement PLC. */ - } - this->msbc_seq = (this->msbc_seq + 1) % 4; - - /* decode frame */ - processed = sbc_decode( - &this->msbc, this->msbc_buffer + 2, MSBC_ENCODED_SIZE - 3, - (uint8_t *)datas[0].data + port->ready_offset, MSBC_DECODED_SIZE, - &written); - - if (processed < 0) { - spa_log_warn(this->log, "sbc_decode failed: %d", processed); - /* TODO: manage errors */ - continue; - } - - port->ready_offset += written; - - } else { - spa_log_warn(this->log, "Output buffer full, dropping mSBC packet"); - } + spa_log_trace(this->log, "mSBC packet seq=%u", seq); + if (!this->msbc_seq_initialized) { + this->msbc_seq_initialized = true; + this->msbc_seq = seq; + } else if (seq != this->msbc_seq) { + /* TODO: PLC (too late to insert data now) */ + spa_log_info(this->log, + "missing mSBC packet: %u != %u", seq, this->msbc_seq); + this->msbc_seq = seq; } + + this->msbc_seq = (this->msbc_seq + 1) % 4; + + if (avail < MSBC_DECODED_SIZE) + spa_log_warn(this->log, "Output buffer full, dropping msbc data"); + + /* decode frame */ + processed = sbc_decode( + &this->msbc, this->msbc_buffer + 2, MSBC_ENCODED_SIZE - 3, + buf, avail, &written); + + if (processed < 0) { + spa_log_warn(this->log, "sbc_decode failed: %d", processed); + /* TODO: manage errors */ + continue; + } + + spa_bt_decode_buffer_write_packet(&port->buffer, written); + decoded += written; } + + return decoded; } static int sco_source_cb(void *userdata, uint8_t *read_data, int size_read) { struct impl *this = userdata; struct port *port = &this->port; - struct spa_io_buffers *io = port->io; - struct spa_data *datas; - uint32_t min_data; + uint32_t decoded; + uint64_t dt; if (this->transport == NULL) { spa_log_debug(this->log, "no transport, stop reading"); goto stop; } - /* get buffer */ - if (!port->current_buffer) { - if (spa_list_is_empty(&port->free)) { - spa_log_warn(this->log, "buffer not available"); - return 0; - } - port->current_buffer = spa_list_first(&port->free, struct buffer, link); - spa_list_remove(&port->current_buffer->link); - port->ready_offset = 0; - } - datas = port->current_buffer->buf->datas; - /* update the current pts */ + dt = SPA_TIMESPEC_TO_NSEC(&this->now); spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &this->now); + dt = SPA_TIMESPEC_TO_NSEC(&this->now) - dt; /* handle data read from socket */ - spa_log_trace(this->log, "read socket data %d", size_read); #if 0 hexdump_to_log(this, read_data, size_read); #endif if (this->transport->codec == HFP_AUDIO_CODEC_MSBC) { - preprocess_and_decode_msbc_data(userdata, read_data, size_read); - + decoded = preprocess_and_decode_msbc_data(userdata, read_data, size_read); } else { + uint32_t avail; uint8_t *packet; + if (size_read != 48 && is_zero_packet(read_data, size_read)) { /* Adapter is returning non-standard CVSD stream. For example * Intel 8087:0029 at Firmware revision 0.0 build 191 week 21 2021 @@ -539,64 +541,94 @@ static int sco_source_cb(void *userdata, uint8_t *read_data, int size_read) */ return 0; } - packet = (uint8_t *)datas[0].data + port->ready_offset; - spa_memmove(packet, read_data, size_read); - port->ready_offset += size_read; + + packet = spa_bt_decode_buffer_get_write(&port->buffer, &avail); + avail = SPA_MIN(avail, (uint32_t)size_read); + spa_memmove(packet, read_data, avail); + spa_bt_decode_buffer_write_packet(&port->buffer, avail); + + decoded = avail; } - /* send buffer if full */ - min_data = SPA_MIN(this->props.min_latency * port->frame_size, datas[0].maxsize / 2); - if (port->ready_offset >= min_data) { - uint64_t sample_count; + spa_log_trace(this->log, "read socket data size:%d decoded frames:%d dt:%d dms", + size_read, decoded / port->frame_size, + (int)(dt / 100000)); - datas[0].chunk->offset = 0; - datas[0].chunk->size = port->ready_offset; - datas[0].chunk->stride = port->frame_size; - - sample_count = datas[0].chunk->size / port->frame_size; - spa_list_append(&port->ready, &port->current_buffer->link); - port->current_buffer = NULL; - - if (!this->following && this->clock) { - this->clock->nsec = SPA_TIMESPEC_TO_NSEC(&this->now); - this->clock->duration = sample_count * this->clock->rate.denom / port->current_format.info.raw.rate; - this->clock->position += this->clock->duration; - this->clock->delay = 0; - this->clock->rate_diff = 1.0f; - this->clock->next_nsec = this->clock->nsec; - } - } - - /* done if there are no buffers ready */ - if (spa_list_is_empty(&port->ready)) - return 0; - - if (this->following) - return 0; - - /* process the buffer if IO does not have any */ - if (io->status != SPA_STATUS_HAVE_DATA) { - struct buffer *b; - - if (io->buffer_id < port->n_buffers) - recycle_buffer(this, port, io->buffer_id); - - b = spa_list_first(&port->ready, struct buffer, link); - spa_list_remove(&b->link); - b->outstanding = true; - - io->buffer_id = b->id; - io->status = SPA_STATUS_HAVE_DATA; - } - - /* notify ready */ - spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_DATA); return 0; stop: return 1; } +static int setup_matching(struct impl *this) +{ + struct port *port = &this->port; + + if (this->position && port->rate_match) { + port->rate_match->rate = 1 / port->buffer.corr; + + this->matching = this->following; + this->resampling = this->matching || + (port->current_format.info.raw.rate != this->position->clock.rate.denom); + } else { + this->matching = false; + this->resampling = false; + } + + if (port->rate_match) + SPA_FLAG_UPDATE(port->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE, this->matching); + + return 0; +} + +static void sco_on_timeout(struct spa_source *source) +{ + struct impl *this = source->data; + struct port *port = &this->port; + uint64_t exp, duration; + uint32_t rate; + struct spa_io_buffers *io = port->io; + uint64_t prev_time, now_time; + + if (this->transport == NULL) + return; + + if (this->started && spa_system_timerfd_read(this->data_system, this->timerfd, &exp) < 0) + spa_log_warn(this->log, "error reading timerfd: %s", strerror(errno)); + + prev_time = this->current_time; + now_time = this->current_time = this->next_time; + + spa_log_trace(this->log, "%p: timer %"PRIu64" %"PRIu64"", this, + now_time, now_time - prev_time); + + if (SPA_LIKELY(this->position)) { + duration = this->position->clock.duration; + rate = this->position->clock.rate.denom; + } else { + duration = 1024; + rate = 48000; + } + + setup_matching(this); + + this->next_time = now_time + duration * SPA_NSEC_PER_SEC / port->buffer.corr / rate; + + if (SPA_LIKELY(this->clock)) { + this->clock->nsec = now_time; + this->clock->position += duration; + this->clock->duration = duration; + this->clock->rate_diff = port->buffer.corr; + this->clock->next_nsec = this->next_time; + } + + spa_log_trace(this->log, "%p: %d", this, io->status); + io->status = SPA_STATUS_HAVE_DATA; + spa_node_call_ready(&this->callbacks, SPA_STATUS_HAVE_DATA); + + set_timeout(this, this->next_time); +} + static int do_add_source(struct spa_loop *loop, bool async, uint32_t seq, @@ -613,6 +645,7 @@ static int do_add_source(struct spa_loop *loop, static int do_start(struct impl *this) { + struct port *port = &this->port; bool do_accept; int res; @@ -636,7 +669,13 @@ static int do_start(struct impl *this) return res; /* Reset the buffers and sample count */ - reset_buffers(&this->port); + reset_buffers(port); + + spa_bt_decode_buffer_clear(&port->buffer); + if ((res = spa_bt_decode_buffer_init(&port->buffer, this->log, + port->frame_size, port->current_format.info.raw.rate, + this->quantum_limit, this->quantum_limit)) < 0) + return res; /* Init mSBC if needed */ if (this->transport->codec == HFP_AUDIO_CODEC_MSBC) { @@ -653,6 +692,17 @@ static int do_start(struct impl *this) goto fail; spa_loop_invoke(this->data_loop, do_add_source, 0, NULL, 0, true, this); + /* Start timer */ + this->timer_source.data = this; + this->timer_source.fd = this->timerfd; + this->timer_source.func = sco_on_timeout; + this->timer_source.mask = SPA_IO_IN; + this->timer_source.rmask = 0; + spa_loop_add_source(this->data_loop, &this->timer_source); + + setup_matching(this); + set_timers(this); + /* Set the started flag */ this->started = true; @@ -671,15 +721,25 @@ static int do_remove_source(struct spa_loop *loop, void *user_data) { struct impl *this = user_data; + struct itimerspec ts; if (this->transport && this->transport->sco_io) spa_bt_sco_io_set_source_cb(this->transport->sco_io, NULL, NULL); + if (this->timer_source.loop) + spa_loop_remove_source(this->data_loop, &this->timer_source); + ts.it_value.tv_sec = 0; + ts.it_value.tv_nsec = 0; + ts.it_interval.tv_sec = 0; + ts.it_interval.tv_nsec = 0; + spa_system_timerfd_settime(this->data_system, this->timerfd, 0, &ts, NULL); + return 0; } static int do_stop(struct impl *this) { + struct port *port = &this->port; int res = 0; if (!this->started) @@ -696,6 +756,8 @@ static int do_stop(struct impl *this) res = spa_bt_transport_release(this->transport); } + spa_bt_decode_buffer_clear(&port->buffer); + return res; } @@ -738,12 +800,9 @@ static void emit_node_info(struct impl *this, bool full) { SPA_KEY_MEDIA_CLASS, "Audio/Source" }, { SPA_KEY_NODE_DRIVER, "true" }, }; - - char latency[64] = "128/8000"; const struct spa_dict_item ag_node_info_items[] = { { SPA_KEY_DEVICE_API, "bluez5" }, { SPA_KEY_MEDIA_CLASS, "Stream/Output/Audio" }, - { SPA_KEY_NODE_LATENCY, latency }, { "media.name", ((this->transport && this->transport->device->name) ? this->transport->device->name : "HSP/HFP") }, }; @@ -753,9 +812,6 @@ static void emit_node_info(struct impl *this, bool full) if (full) this->info.change_mask = this->info_all; if (this->info.change_mask) { - if (this->transport && this->port.have_format) - snprintf(latency, sizeof(latency), "%d/%d", (int)this->props.min_latency, - (int)this->port.current_format.info.raw.rate); this->info.props = is_ag ? &SPA_DICT_INIT_ARRAY(ag_node_info_items) : &SPA_DICT_INIT_ARRAY(hu_node_info_items); @@ -902,11 +958,11 @@ impl_node_port_enum_params(void *object, int seq, param = spa_pod_builder_add_object(&b, SPA_TYPE_OBJECT_ParamBuffers, id, - SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(8, 8, MAX_BUFFERS), + SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(2, 1, MAX_BUFFERS), SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1), SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int( - this->props.max_latency * port->frame_size, - this->props.min_latency * port->frame_size, + this->quantum_limit * port->frame_size, + 16 * port->frame_size, INT32_MAX), SPA_PARAM_BUFFERS_stride, SPA_POD_Int(port->frame_size)); break; @@ -976,7 +1032,6 @@ static int clear_buffers(struct impl *this, struct port *port) spa_list_init(&port->ready); port->n_buffers = 0; } - port->current_buffer = NULL; return 0; } @@ -1147,6 +1202,79 @@ static int impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t return 0; } +static uint32_t get_samples(struct impl *this, uint32_t *duration) +{ + struct port *port = &this->port; + uint32_t samples; + + if (SPA_LIKELY(port->rate_match) && this->resampling) { + samples = port->rate_match->size; + } else { + if (SPA_LIKELY(this->position)) + samples = this->position->clock.duration * port->current_format.info.raw.rate + / this->position->clock.rate.denom; + else + samples = 1024; + } + + if (SPA_LIKELY(this->position)) + *duration = this->position->clock.duration * port->current_format.info.raw.rate + / this->position->clock.rate.denom; + else if (SPA_LIKELY(this->clock)) + *duration = this->clock->duration * port->current_format.info.raw.rate + / this->clock->rate.denom; + else + *duration = 1024 * port->current_format.info.raw.rate / 48000; + + return samples; +} + +static void process_buffering(struct impl *this) +{ + struct port *port = &this->port; + uint32_t duration; + const uint32_t samples = get_samples(this, &duration); + void *buf; + uint32_t avail; + + spa_bt_decode_buffer_process(&port->buffer, samples, duration); + + setup_matching(this); + + buf = spa_bt_decode_buffer_get_read(&port->buffer, &avail); + + /* copy data to buffers */ + if (!spa_list_is_empty(&port->free) && avail > 0) { + struct buffer *buffer; + struct spa_data *datas; + uint32_t data_size; + + data_size = samples * port->frame_size; + + avail = SPA_MIN(avail, data_size); + + spa_bt_decode_buffer_read(&port->buffer, avail); + + buffer = spa_list_first(&port->free, struct buffer, link); + spa_list_remove(&buffer->link); + + spa_log_trace(this->log, "dequeue %d", buffer->id); + + datas = buffer->buf->datas; + + spa_assert(datas[0].maxsize >= data_size); + + datas[0].chunk->offset = 0; + datas[0].chunk->size = avail; + datas[0].chunk->stride = port->frame_size; + memcpy(datas[0].data, buf, avail); + + /* ready buffer if full */ + spa_log_trace(this->log, "queue %d frames:%d", buffer->id, (int)avail / port->frame_size); + spa_list_append(&port->ready, &buffer->link); + } +} + static int impl_node_process(void *object) { struct impl *this = object; @@ -1170,6 +1298,9 @@ static int impl_node_process(void *object) io->buffer_id = SPA_ID_INVALID; } + /* Produce data */ + process_buffering(this); + /* Return if there are no buffers ready to be processed */ if (spa_list_is_empty(&port->ready)) return SPA_STATUS_OK; @@ -1252,6 +1383,8 @@ static int impl_clear(struct spa_handle *handle) struct impl *this = (struct impl *) handle; if (this->transport) spa_hook_remove(&this->transport_listener); + spa_system_close(this->data_system, this->timerfd); + spa_bt_decode_buffer_clear(&this->port.buffer); return 0; } @@ -1341,6 +1474,10 @@ impl_init(const struct spa_handle_factory *factory, spa_list_init(&port->ready); spa_list_init(&port->free); + this->quantum_limit = 8192; + if (info && (str = spa_dict_lookup(info, "clock.quantum-limit"))) + spa_atou32(str, &this->quantum_limit, 0); + if (info && (str = spa_dict_lookup(info, SPA_KEY_API_BLUEZ5_TRANSPORT))) sscanf(str, "pointer:%p", &this->transport); @@ -1351,6 +1488,9 @@ impl_init(const struct spa_handle_factory *factory, spa_bt_transport_add_listener(this->transport, &this->transport_listener, &transport_events, this); + this->timerfd = spa_system_timerfd_create(this->data_system, + CLOCK_MONOTONIC, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK); + return 0; }