diff --git a/spa/include/spa/node/io.h b/spa/include/spa/node/io.h index 8a3bb3d50..babf7657e 100644 --- a/spa/include/spa/node/io.h +++ b/spa/include/spa/node/io.h @@ -49,6 +49,7 @@ enum spa_io_type { SPA_IO_Control, /**< area for control messages */ SPA_IO_Notify, /**< area for notify messages */ SPA_IO_Position, /**< position information in the graph */ + SPA_IO_RateMatch, /**< rate matching between nodes */ }; struct spa_io_buffers { @@ -138,6 +139,13 @@ struct spa_io_position { struct spa_io_position_video video; /**< when mask & SPA_IO_POSITION_FLAG_VIDEO */ }; +/** rate matching */ +struct spa_io_rate_match { + uint32_t delay; /**< extra delay in samples for resampler */ + uint32_t size; /**< requested input size for resampler */ + double rate; /**< rate for resampler */ +}; + #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/spa/include/spa/node/type-info.h b/spa/include/spa/node/type-info.h index 4b1c8b8f7..70ba3721e 100644 --- a/spa/include/spa/node/type-info.h +++ b/spa/include/spa/node/type-info.h @@ -47,6 +47,7 @@ static const struct spa_type_info spa_type_io[] = { { SPA_IO_Control, SPA_TYPE_Int, SPA_TYPE_INFO_IO_BASE "Control", NULL }, { SPA_IO_Notify, SPA_TYPE_Int, SPA_TYPE_INFO_IO_BASE "Notify", NULL }, { SPA_IO_Position, SPA_TYPE_Int, SPA_TYPE_INFO_IO_BASE "Position", NULL }, + { SPA_IO_RateMatch, SPA_TYPE_Int, SPA_TYPE_INFO_IO_BASE "RateMatch", NULL }, { 0, 0, NULL, NULL }, }; diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index 036ba38d9..3408ce52d 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -401,8 +401,8 @@ impl_node_port_enum_params(void *object, int seq, case 2: param = spa_pod_builder_add_object(&b, SPA_TYPE_OBJECT_ParamIO, id, - SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Notify), - SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_sequence) + 1024)); + SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_RateMatch), + SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_rate_match))); break; default: return 0; @@ -596,8 +596,8 @@ impl_node_port_set_io(void *object, case SPA_IO_Clock: this->clock = data; break; - case SPA_IO_Notify: - this->notify = data; + case SPA_IO_RateMatch: + this->rate_match = data; break; default: return -ENOENT; diff --git a/spa/plugins/alsa/alsa-source.c b/spa/plugins/alsa/alsa-source.c index f085e10f2..f5603c195 100644 --- a/spa/plugins/alsa/alsa-source.c +++ b/spa/plugins/alsa/alsa-source.c @@ -408,6 +408,12 @@ impl_node_port_enum_params(void *object, int seq, SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Clock), SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_clock))); break; + case 2: + param = spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_ParamIO, id, + SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_RateMatch), + SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_rate_match))); + break; default: return 0; } @@ -601,6 +607,9 @@ impl_node_port_set_io(void *object, case SPA_IO_Clock: this->clock = data; break; + case SPA_IO_RateMatch: + this->rate_match = data; + break; default: return -ENOENT; } diff --git a/spa/plugins/alsa/alsa-utils.c b/spa/plugins/alsa/alsa-utils.c index 8c2bb8d94..88d20e410 100644 --- a/spa/plugins/alsa/alsa-utils.c +++ b/spa/plugins/alsa/alsa-utils.c @@ -606,10 +606,15 @@ static int update_time(struct state *state, uint64_t nsec, snd_pcm_sframes_t del { double err, corr; + if (state->rate_match) { + state->delay = state->rate_match->delay; + state->read_size = state->rate_match->size; + } + if (state->stream == SND_PCM_STREAM_PLAYBACK) - err = delay - state->last_threshold; + err = delay - state->last_threshold + state->delay; else - err = state->last_threshold - delay; + err = state->last_threshold - delay + state->delay * 2; if (state->bw == 0.0) { set_loop(state, BW_MAX); @@ -641,23 +646,17 @@ static int update_time(struct state *state, uint64_t nsec, snd_pcm_sframes_t del err, state->z1, state->z2, state->z3); } - if (slave && state->notify) { - struct spa_pod_builder b = { 0 }; - struct spa_pod_frame f[2]; - spa_pod_builder_init(&b, state->notify, 1024); - spa_pod_builder_push_sequence(&b, &f[0], 0); - spa_pod_builder_control(&b, 0, SPA_CONTROL_Properties); - spa_pod_builder_push_object(&b, &f[1], SPA_TYPE_OBJECT_Props, SPA_PARAM_Props); - spa_pod_builder_prop(&b, SPA_PROP_rate, 0); - spa_pod_builder_double(&b, SPA_CLAMP(corr, 0.95, 1.05)); - spa_pod_builder_pop(&b, &f[1]); - spa_pod_builder_pop(&b, &f[0]); + if (slave && state->rate_match) { + if (state->stream == SND_PCM_STREAM_PLAYBACK) + state->rate_match->rate = SPA_CLAMP(corr, 0.95, 1.05); + else + state->rate_match->rate = SPA_CLAMP(1.0/corr, 0.95, 1.05); } if (!slave && state->clock) { state->clock->nsec = state->next_time; state->clock->rate = SPA_FRACTION(1, state->rate); - state->clock->position = state->sample_count; - state->clock->delay = state->threshold * corr; + state->clock->position += state->size; + state->clock->delay = state->size * corr; state->clock->rate_diff = corr; } @@ -678,8 +677,10 @@ int spa_alsa_write(struct state *state, snd_pcm_uframes_t silence) snd_pcm_uframes_t written, frames, offset, off, to_write, total_written; int res; - if (state->position && state->threshold != state->position->size) - state->threshold = state->position->size; + if (state->position && state->size != state->position->size) { + state->size = state->position->size; + state->threshold = (state->size * state->rate + state->rate_denom-1) / state->rate_denom; + } if (state->slaved && state->alsa_started) { uint64_t nsec; @@ -716,7 +717,7 @@ again: spa_log_error(state->log, "snd_pcm_mmap_begin error: %s", snd_strerror(res)); return res; } - spa_log_trace_fp(state->log, "begin %ld %ld %d", offset, frames, state->threshold); + spa_log_trace_fp(state->log, "begin %ld %ld %d %ld", offset, frames, state->threshold, silence); silence = SPA_MIN(silence, frames); to_write = frames; @@ -810,16 +811,17 @@ static snd_pcm_uframes_t push_frames(struct state *state, const snd_pcm_channel_area_t *my_areas, snd_pcm_uframes_t offset, - snd_pcm_uframes_t frames) + snd_pcm_uframes_t frames, + snd_pcm_uframes_t keep) { snd_pcm_uframes_t total_frames = 0; if (spa_list_is_empty(&state->free)) { spa_log_warn(state->log, "%p: no more buffers", state); - total_frames = state->threshold; - } else { + total_frames = frames; + } else if (frames > 0) { uint8_t *src; - size_t n_bytes; + size_t n_bytes, left; struct buffer *b; struct spa_data *d; uint32_t avail, l0, l1; @@ -835,13 +837,13 @@ push_frames(struct state *state, d = b->buf->datas; - avail = d[0].maxsize / state->frame_size; - total_frames = SPA_MIN(avail, state->threshold); + total_frames = SPA_MIN(avail, frames); n_bytes = total_frames * state->frame_size; if (my_areas) { - l0 = SPA_MIN(n_bytes, frames * state->frame_size); + left = state->buffer_frames - offset; + l0 = SPA_MIN(n_bytes, left * state->frame_size); l1 = n_bytes - l0; src = SPA_MEMBER(my_areas[0].addr, offset * state->frame_size, uint8_t); @@ -859,7 +861,7 @@ push_frames(struct state *state, SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); spa_list_append(&state->ready, &b->link); } - return total_frames; + return total_frames - keep; } @@ -871,19 +873,22 @@ int spa_alsa_read(struct state *state, snd_pcm_uframes_t silence) snd_pcm_uframes_t read, frames, offset; int res; - if (state->position) { - uint64_t position; + if (state->position && !state->slaved) { + uint64_t position, size; - if (state->threshold != state->position->size) { - state->threshold = state->position->size; + size = state->position->size; + if (state->size != size) { + state->size = size; + state->threshold = (size * state->rate + state->rate_denom-1) / state->rate_denom; } position = state->position->clock.position; - if (state->last_position && state->last_position + state->last_threshold != position) { + if (state->last_position && state->last_position + state->last_size != position) { state->alsa_sync = true; spa_log_warn(state->log, "discont, resync %"PRIu64" %"PRIu64" %d", - state->last_position, position, state->last_threshold); + state->last_position, position, state->last_size); } state->last_position = position; + state->last_size = size; } if (state->slaved && state->alsa_started) { @@ -894,12 +899,13 @@ int spa_alsa_read(struct state *state, snd_pcm_uframes_t silence) if ((res = get_status(state, &delay)) < 0) return res; + silence = delay; + if (delay < threshold) { spa_log_warn(state->log, "slave delay:%lu resync %f %f %f", delay, state->z1, state->z2, state->z3); init_loop(state); - push_frames(state, NULL, 0, 0); - return 0; + state->alsa_sync = true; } if (state->alsa_sync) { spa_log_warn(state->log, "slave resync %ld %d", delay, threshold); @@ -917,21 +923,27 @@ int spa_alsa_read(struct state *state, snd_pcm_uframes_t silence) return res; } + frames = state->read_size; + if (frames == 0) + frames = state->threshold + state->delay; + + frames = SPA_MIN(frames, silence); + to_read = state->buffer_frames; if ((res = snd_pcm_mmap_begin(hndl, &my_areas, &offset, &to_read)) < 0) { spa_log_error(state->log, "snd_pcm_mmap_begin error: %s", snd_strerror(res)); return res; } - frames = SPA_MIN(to_read, state->threshold); spa_log_trace_fp(state->log, "begin %ld %ld %ld %d", offset, frames, to_read, state->threshold); - read = push_frames(state, my_areas, offset, frames); + read = push_frames(state, my_areas, offset, frames, state->delay); + spa_log_trace_fp(state->log, "commit %ld %ld %"PRIi64, offset, read, state->sample_count); total_read += read; - if ((res = snd_pcm_mmap_commit(hndl, offset, frames)) < 0) { + if ((res = snd_pcm_mmap_commit(hndl, offset, read)) < 0) { spa_log_error(state->log, "snd_pcm_mmap_commit error: %s", snd_strerror(res)); if (res != -EPIPE && res != -ESTRPIPE) return res; @@ -945,10 +957,11 @@ int spa_alsa_read(struct state *state, snd_pcm_uframes_t silence) static int handle_play(struct state *state, uint64_t nsec, snd_pcm_uframes_t delay) { int res; + uint32_t threshold = state->last_threshold + state->delay; - if (delay >= state->last_threshold * 2) { - spa_log_trace(state->log, "early wakeup %ld %d", delay, state->threshold); - state->next_time = nsec + (delay - state->last_threshold) * SPA_NSEC_PER_SEC / state->rate; + if (delay >= threshold + state->last_threshold) { + spa_log_trace(state->log, "early wakeup %ld %d", delay, threshold); + state->next_time = nsec + (delay - threshold) * SPA_NSEC_PER_SEC / state->rate; return -EAGAIN; } @@ -974,17 +987,19 @@ static int handle_capture(struct state *state, uint64_t nsec, snd_pcm_uframes_t { int res; struct spa_io_buffers *io; + uint32_t threshold = state->last_threshold + state->delay * 2; - if (delay < state->last_threshold) { - spa_log_trace(state->log, "early wakeup %ld %d", delay, state->threshold); - state->next_time = nsec + (state->last_threshold - delay) * SPA_NSEC_PER_SEC / state->rate; + if (delay < threshold) { + spa_log_trace(state->log, "early wakeup %ld %d", delay, threshold); + state->next_time = nsec + (threshold - delay) * SPA_NSEC_PER_SEC / + state->rate; return 0; } if ((res = update_time(state, nsec, delay, false)) < 0) return res; - if ((res = spa_alsa_read(state, 0)) < 0) + if ((res = spa_alsa_read(state, delay)) < 0) return res; if (!spa_list_is_empty(&state->ready)) { @@ -996,8 +1011,8 @@ static int handle_capture(struct state *state, uint64_t nsec, snd_pcm_uframes_t io->buffer_id = b->id; io->status = SPA_STATUS_HAVE_BUFFER; } - spa_node_call_ready(&state->callbacks, SPA_STATUS_HAVE_BUFFER); } + spa_node_call_ready(&state->callbacks, SPA_STATUS_HAVE_BUFFER); return 0; } @@ -1012,8 +1027,10 @@ static void alsa_on_timeout_event(struct spa_source *source) if (state->started && spa_system_timerfd_read(state->data_system, state->timerfd, &expire) < 0) spa_log_warn(state->log, "error reading timerfd: %m"); - if (state->position) - state->threshold = state->position->size; + if (state->position) { + state->size = state->position->size; + state->threshold = (state->size * state->rate + state->rate_denom-1) / state->rate_denom; + } spa_system_clock_gettime(state->data_system, CLOCK_MONOTONIC, &state->now); if ((res = get_status(state, &delay)) < 0) @@ -1076,12 +1093,18 @@ int spa_alsa_start(struct state *state) if (state->started) return 0; - if (state->position) - state->threshold = state->position->size; - else - state->threshold = state->props.min_latency; - state->slaved = is_slaved(state); + + if (state->position) { + state->size = state->position->size; + state->rate_denom = state->position->clock.rate.denom; + } + else { + state->size = state->props.min_latency; + state->rate_denom = state->rate; + } + + state->threshold = (state->size * state->rate + state->rate_denom-1) / state->rate_denom; state->last_threshold = state->threshold; init_loop(state); diff --git a/spa/plugins/alsa/alsa-utils.h b/spa/plugins/alsa/alsa-utils.h index 94991f2f1..1d5bee636 100644 --- a/spa/plugins/alsa/alsa-utils.h +++ b/spa/plugins/alsa/alsa-utils.h @@ -102,6 +102,9 @@ struct state { int rate; int channels; size_t frame_size; + int rate_denom; + int delay; + int read_size; uint64_t port_info_all; struct spa_port_info port_info; @@ -109,7 +112,7 @@ struct state { struct spa_io_buffers *io; struct spa_io_clock *clock; struct spa_io_position *position; - struct spa_io_sequence *notify; + struct spa_io_rate_match *rate_match; struct buffer buffers[MAX_BUFFERS]; unsigned int n_buffers; @@ -124,6 +127,9 @@ struct state { int timerfd; uint32_t threshold; uint32_t last_threshold; + + uint32_t size; + uint32_t last_size; uint64_t last_position; unsigned int alsa_started:1; unsigned int alsa_sync:1; diff --git a/spa/plugins/audioconvert/audioconvert.c b/spa/plugins/audioconvert/audioconvert.c index aef5353e0..a9e9620a5 100644 --- a/spa/plugins/audioconvert/audioconvert.c +++ b/spa/plugins/audioconvert/audioconvert.c @@ -721,6 +721,12 @@ impl_node_port_enum_params(void *object, int seq, SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Control), SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_sequence))); break; + case 2: + param = spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_ParamIO, id, + SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_RateMatch), + SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_rate_match))); + break; default: return 0; } @@ -850,8 +856,10 @@ impl_node_port_set_io(void *object, spa_log_debug(this->log, "set io %d %d %d", id, direction, port_id); switch (id) { - case SPA_IO_Control: + case SPA_IO_RateMatch: res = spa_node_port_set_io(this->resample, direction, 0, id, data, size); + break; + case SPA_IO_Control: res = spa_node_port_set_io(this->channelmix, direction, 0, id, data, size); break; default: diff --git a/spa/plugins/audioconvert/resample.c b/spa/plugins/audioconvert/resample.c index e08ab567f..4cb310b15 100644 --- a/spa/plugins/audioconvert/resample.c +++ b/spa/plugins/audioconvert/resample.c @@ -79,7 +79,6 @@ struct port { struct spa_param_info params[8]; struct spa_io_buffers *io; - struct spa_io_sequence *io_control; struct spa_audio_info format; uint32_t stride; @@ -102,6 +101,7 @@ struct impl { struct spa_cpu *cpu; struct spa_io_position *io_position; + struct spa_io_rate_match *io_rate_match; uint64_t info_all; struct spa_node_info info; @@ -656,8 +656,8 @@ impl_node_port_set_io(void *object, case SPA_IO_Buffers: port->io = data; break; - case SPA_IO_Control: - port->io_control = data; + case SPA_IO_RateMatch: + this->io_rate_match = data; break; default: return -ENOENT; @@ -707,22 +707,6 @@ static int impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t return 0; } -static int process_control(struct impl *this, struct port *port, struct spa_pod_sequence *sequence) -{ - struct spa_pod_control *c; - - SPA_POD_SEQUENCE_FOREACH(sequence, c) { - switch (c->type) { - case SPA_CONTROL_Properties: - apply_props(this, (const struct spa_pod *) &c->value); - break; - default: - break; - } - } - return 0; -} - static int impl_node_process(void *object) { struct impl *this = object; @@ -735,6 +719,7 @@ static int impl_node_process(void *object) const void **src_datas; void **dst_datas; bool flush_out = false; + bool flush_in = false; spa_return_val_if_fail(this != NULL, -EINVAL); @@ -750,9 +735,6 @@ static int impl_node_process(void *object) spa_log_trace_fp(this->log, NAME " %p: status %d %d %d", this, inio->status, outio->status, inio->buffer_id); - if (outport->io_control) - process_control(this, outport, &outport->io_control->sequence); - if (outio->status == SPA_STATUS_HAVE_BUFFER) return SPA_STATUS_HAVE_BUFFER; @@ -788,6 +770,7 @@ static int impl_node_process(void *object) switch (this->mode) { case MODE_SPLIT: maxsize = SPA_MIN(maxsize, max * sizeof(float)); + flush_out = flush_in = this->io_rate_match != NULL; break; case MODE_MERGE: default: @@ -821,11 +804,13 @@ static int impl_node_process(void *object) } inport->offset += in_len * sizeof(float); - if (inport->offset >= size) { + if (inport->offset >= size || flush_in) { inio->status = SPA_STATUS_NEED_BUFFER; inport->offset = 0; SPA_FLAG_SET(res, SPA_STATUS_NEED_BUFFER); + spa_log_trace_fp(this->log, NAME " %p: return input buffer", this); } + outport->offset += out_len * sizeof(float); if (outport->offset > 0 && (outport->offset >= maxsize || flush_out)) { outio->status = SPA_STATUS_HAVE_BUFFER; @@ -833,6 +818,13 @@ static int impl_node_process(void *object) dequeue_buffer(this, dbuf); outport->offset = 0; SPA_FLAG_SET(res, SPA_STATUS_HAVE_BUFFER); + spa_log_trace_fp(this->log, NAME " %p: have output buffer", this); + } + + if (this->io_rate_match) { + resample_update_rate(&this->resample, this->io_rate_match->rate); + this->io_rate_match->delay = resample_delay(&this->resample); + this->io_rate_match->size = resample_in_len(&this->resample, max); } return res; } diff --git a/src/examples/media-session.c b/src/examples/media-session.c index 390d5cdd0..14a3195b6 100644 --- a/src/examples/media-session.c +++ b/src/examples/media-session.c @@ -1113,6 +1113,7 @@ do_link_profile: peer->format.channels, node->format.channels, audio_info.channels); + audio_info.rate = DEFAULT_SAMPLERATE; node->profile_format = audio_info; spa_pod_builder_init(&b, buf, sizeof(buf)); @@ -1190,6 +1191,7 @@ static void rescan_session(struct impl *impl, struct session *sess) } info = node->format; + info.rate = DEFAULT_SAMPLERATE; props = pw_properties_new_dict(node->info->props); if ((str = pw_properties_get(props, PW_KEY_DEVICE_NICK)) == NULL) diff --git a/src/modules/module-adapter/adapter.c b/src/modules/module-adapter/adapter.c index a084b04e8..599971b76 100644 --- a/src/modules/module-adapter/adapter.c +++ b/src/modules/module-adapter/adapter.c @@ -140,7 +140,7 @@ struct impl { uint32_t n_buffers; struct pw_memblock *mem; - uint8_t control_buffer[1024]; + struct spa_io_rate_match rate_match; }; /** \endcond */ @@ -213,17 +213,20 @@ static void try_link_controls(struct impl *impl) pw_log_warn(NAME " %p: controls", impl); + spa_zero(impl->rate_match); + impl->rate_match.rate = 1.0; + if ((res = spa_node_port_set_io(impl->slave_node, impl->direction, 0, - SPA_IO_Notify, - impl->control_buffer, sizeof(impl->control_buffer))) < 0) { - pw_log_warn(NAME " %p: set Notify on slave failed %d %s", impl, + SPA_IO_RateMatch, + &impl->rate_match, sizeof(impl->rate_match))) < 0) { + pw_log_warn(NAME " %p: set RateMatch on slave failed %d %s", impl, res, spa_strerror(res)); } if ((res = spa_node_port_set_io(impl->adapter, SPA_DIRECTION_REVERSE(impl->direction), 0, - SPA_IO_Control, - impl->control_buffer, sizeof(impl->control_buffer))) < 0) { + SPA_IO_RateMatch, + &impl->rate_match, sizeof(impl->rate_match))) < 0) { pw_log_warn(NAME " %p: set Control on adapter failed %d %s", impl, res, spa_strerror(res)); } @@ -694,7 +697,7 @@ static int negotiate_buffers(struct impl *impl) return res; } if (out_alloc) { - if ((res = spa_node_port_alloc_buffers(impl->slave_port->mix, + if ((res = spa_node_port_alloc_buffers(impl->slave_node, impl->direction, 0, NULL, 0, impl->buffers, &impl->n_buffers)) < 0) { @@ -702,17 +705,10 @@ static int negotiate_buffers(struct impl *impl) } } else { - if ((res = spa_node_port_use_buffers(impl->slave_port->mix, + if ((res = spa_node_port_use_buffers(impl->slave_node, impl->direction, 0, impl->buffers, impl->n_buffers)) < 0) { - - if (res != -ENOTSUP) - return res; - - if ((res = spa_node_port_use_buffers(impl->slave_port->node->node, - impl->direction, 0, - impl->buffers, impl->n_buffers)) < 0) - return res; + return res; } } @@ -894,8 +890,12 @@ static int impl_node_process(void *object) status = SPA_STATUS_HAVE_BUFFER; } - impl->slave->rt.target.signal(impl->slave->rt.target.data); + status = spa_node_process(impl->slave_node); + if (impl->direction == SPA_DIRECTION_OUTPUT && !impl->this->master) { + if (impl->use_converter) + status = spa_node_process(impl->adapter); + } return status; }