From 1ea54ac9c5a02a3308db4d6bcc4dac26d9da4dcb Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 2 Aug 2018 10:31:29 +0200 Subject: [PATCH] pulse: improve remote sync Add an explicit method to make the operation to a roundtrip to sync pending actions. Implement drain. --- src/context.c | 26 +++++++++++++++++--------- src/internal.h | 3 +++ src/introspect.c | 13 +++++++++++++ src/operation.c | 14 ++++++++++---- src/stream.c | 28 +++++++++++++++++++++++++++- 5 files changed, 70 insertions(+), 14 deletions(-) diff --git a/src/context.c b/src/context.c index 56990cde2..47ad90353 100644 --- a/src/context.c +++ b/src/context.c @@ -271,28 +271,32 @@ static void remote_state_changed(void *data, enum pw_remote_state old, o = pa_operation_new(c, NULL, on_ready, sizeof(struct ready_data)); d = o->userdata; d->context = c; + pa_operation_sync(o); pa_operation_unref(o); break; } } -static void remote_sync_reply(void *data, uint32_t seq) +static void complete_operations(pa_context *c, uint32_t seq) { - pa_context *c = data; - pa_operation *o; - - pw_log_debug("done %d", seq); - spa_list_for_each(o, &c->operations, link) { + pa_operation *o, *t; + spa_list_for_each_safe(o, t, &c->operations, link) { if (o->seq != seq) continue; pa_operation_ref(o); if (o->callback) o->callback(o, o->userdata); pa_operation_unref(o); - break; } } +static void remote_sync_reply(void *data, uint32_t seq) +{ + pa_context *c = data; + pw_log_debug("done %d", seq); + complete_operations(c, seq); +} + static const struct pw_remote_events remote_events = { PW_VERSION_REMOTE_EVENTS, .state_changed = remote_state_changed, @@ -466,9 +470,10 @@ struct notify_data { static void on_notify(pa_operation *o, void *userdata) { struct notify_data *d = userdata; + pa_context *c = o->context; pa_operation_done(o); if (d->cb) - d->cb(o->context, d->userdata); + d->cb(c, d->userdata); } pa_operation* pa_context_drain(pa_context *c, pa_context_notify_cb_t cb, void *userdata) @@ -480,6 +485,7 @@ pa_operation* pa_context_drain(pa_context *c, pa_context_notify_cb_t cb, void *u d = o->userdata; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; } @@ -493,9 +499,10 @@ struct success_data { static void on_success(pa_operation *o, void *userdata) { struct success_data *d = userdata; + pa_context *c = o->context; pa_operation_done(o); if (d->cb) - d->cb(o->context, d->ret, d->userdata); + d->cb(c, d->ret, d->userdata); } pa_operation* pa_context_exit_daemon(pa_context *c, pa_context_success_cb_t cb, void *userdata) @@ -556,6 +563,7 @@ pa_operation* pa_context_set_name(pa_context *c, const char *name, pa_context_su d->ret = PA_ERR_ACCESS; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; } diff --git a/src/internal.h b/src/internal.h index a478cab63..0b9b6d681 100644 --- a/src/internal.h +++ b/src/internal.h @@ -358,6 +358,8 @@ struct pa_stream { uint32_t buffer_offset; float volume; + pa_operation *drain; + uint64_t queued; }; void pa_stream_set_state(pa_stream *s, pa_stream_state_t st); @@ -385,6 +387,7 @@ struct pa_operation pa_operation *pa_operation_new(pa_context *c, pa_stream *s, pa_operation_cb_t cb, size_t userdata_size); void pa_operation_done(pa_operation *o); +int pa_operation_sync(pa_operation *o); #ifdef __cplusplus } diff --git a/src/introspect.c b/src/introspect.c index 91aca28e7..0d9dcb5e5 100644 --- a/src/introspect.c +++ b/src/introspect.c @@ -212,6 +212,7 @@ pa_operation* pa_context_get_sink_info_by_name(pa_context *c, const char *name, d->cb = cb; d->userdata = userdata; d->global = g; + pa_operation_sync(o); return o; } @@ -241,6 +242,7 @@ pa_operation* pa_context_get_sink_info_by_index(pa_context *c, uint32_t idx, pa_ d->cb = cb; d->userdata = userdata; d->global = g; + pa_operation_sync(o); return o; } @@ -277,6 +279,7 @@ pa_operation* pa_context_get_sink_info_list(pa_context *c, pa_sink_info_cb_t cb, d->context = c; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; } @@ -418,6 +421,7 @@ pa_operation* pa_context_get_source_info_by_index(pa_context *c, uint32_t idx, p d->cb = cb; d->userdata = userdata; d->global = g; + pa_operation_sync(o); return o; } @@ -454,6 +458,7 @@ pa_operation* pa_context_get_source_info_list(pa_context *c, pa_source_info_cb_t d->context = c; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; } @@ -568,6 +573,7 @@ pa_operation* pa_context_get_module_info(pa_context *c, uint32_t idx, pa_module_ d->cb = cb; d->userdata = userdata; d->global = g; + pa_operation_sync(o); return o; } @@ -605,6 +611,7 @@ pa_operation* pa_context_get_module_info_list(pa_context *c, pa_module_info_cb_t d->context = c; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; } @@ -678,6 +685,7 @@ pa_operation* pa_context_get_client_info(pa_context *c, uint32_t idx, pa_client_ d->cb = cb; d->userdata = userdata; d->global = g; + pa_operation_sync(o); return o; } @@ -715,6 +723,7 @@ pa_operation* pa_context_get_client_info_list(pa_context *c, pa_client_info_cb_t d->context = c; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; } @@ -854,6 +863,7 @@ pa_operation* pa_context_get_sink_input_info(pa_context *c, uint32_t idx, pa_sin d->cb = cb; d->userdata = userdata; d->global = g; + pa_operation_sync(o); return o; } @@ -890,6 +900,7 @@ pa_operation* pa_context_get_sink_input_info_list(pa_context *c, pa_sink_input_i d->context = c; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; } @@ -935,6 +946,7 @@ pa_operation* pa_context_set_sink_input_volume(pa_context *c, uint32_t idx, cons d = o->userdata; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; } @@ -957,6 +969,7 @@ pa_operation* pa_context_set_sink_input_mute(pa_context *c, uint32_t idx, int mu d = o->userdata; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; } diff --git a/src/operation.c b/src/operation.c index fa0c58735..99ee48531 100644 --- a/src/operation.c +++ b/src/operation.c @@ -36,7 +36,7 @@ pa_operation *pa_operation_new(pa_context *c, pa_stream *s, pa_operation_cb_t cb o->refcount = 1; o->context = c; o->stream = s; - o->seq = ++c->seq; + o->seq = SPA_ID_INVALID; o->state = PA_OPERATION_RUNNING; o->callback = cb; @@ -44,13 +44,19 @@ pa_operation *pa_operation_new(pa_context *c, pa_stream *s, pa_operation_cb_t cb spa_list_append(&c->operations, &o->link); pa_operation_ref(o); - - pw_log_debug("new %p %d", o, o->seq); - pw_core_proxy_sync(c->core_proxy, o->seq); + pw_log_debug("new %p", o); return o; } +int pa_operation_sync(pa_operation *o) +{ + pa_context *c = o->context; + o->seq = ++c->seq; + pw_log_debug("operation %p: sync %d", o, o->seq); + pw_core_proxy_sync(c->core_proxy, o->seq); + return 0; +} pa_operation *pa_operation_ref(pa_operation *o) { diff --git a/src/stream.c b/src/stream.c index 77e60522f..7886e1c33 100644 --- a/src/stream.c +++ b/src/stream.c @@ -340,6 +340,8 @@ static void update_timing_info(pa_stream *s) pw_stream_get_time(s->stream, &pwt); s->timing_info_valid = false; + s->queued = pwt.queued; + pw_log_debug("stream %p: %"PRIu64, s, s->queued); if (pwt.rate.num == 0) return; @@ -375,6 +377,15 @@ static void stream_process(void *data) update_timing_info(s); + if (s->drain && s->queued == 0) { + pa_operation *o = s->drain; + pa_operation_ref(o); + if (o->callback) + o->callback(o, o->userdata); + pa_operation_unref(o); + s->drain = NULL; + } + while (dequeue_buffer(s) == 0); if (s->dequeued_size <= 0) @@ -816,6 +827,7 @@ int pa_stream_disconnect(pa_stream *s) s->disconnecting = true; pw_stream_disconnect(s->stream); o = pa_operation_new(s->context, s, on_disconnected, 0); + pa_operation_sync(o); pa_operation_unref(o); return 0; @@ -1062,7 +1074,7 @@ static void on_success(pa_operation *o, void *userdata) pa_stream *s = o->stream; pa_operation_done(o); if (d->cb) - d->cb(s, PA_OK, d->userdata); + d->cb(s, 1, d->userdata); } pa_operation* pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) @@ -1076,10 +1088,14 @@ pa_operation* pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *use PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE); + pw_log_debug("stream %p", s); o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack)); d = o->userdata; d->cb = cb; d->userdata = userdata; + if (s->drain) + pa_operation_cancel(s->drain); + s->drain = o; return o; } @@ -1111,6 +1127,7 @@ pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t d = o->userdata; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; } @@ -1271,6 +1288,7 @@ pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, voi d = o->userdata; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; } @@ -1291,6 +1309,7 @@ pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *use d = o->userdata; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; } @@ -1312,6 +1331,7 @@ pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *us d = o->userdata; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; } @@ -1333,6 +1353,7 @@ pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *u d = o->userdata; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; } @@ -1354,6 +1375,7 @@ pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_succe d = o->userdata; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; } @@ -1511,6 +1533,7 @@ pa_operation *pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr d = o->userdata; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; } @@ -1532,6 +1555,7 @@ pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_strea d = o->userdata; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; } @@ -1553,6 +1577,7 @@ pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_ d = o->userdata; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; } @@ -1573,6 +1598,7 @@ pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], d = o->userdata; d->cb = cb; d->userdata = userdata; + pa_operation_sync(o); return o; }