pulse: improve remote sync

Add an explicit method to make the operation to a roundtrip to sync
pending actions.
Implement drain.
This commit is contained in:
Wim Taymans 2018-08-02 10:31:29 +02:00
parent e64413fee1
commit 1ea54ac9c5
5 changed files with 70 additions and 14 deletions

View file

@ -271,28 +271,32 @@ static void remote_state_changed(void *data, enum pw_remote_state old,
o = pa_operation_new(c, NULL, on_ready, sizeof(struct ready_data));
d = o->userdata;
d->context = c;
pa_operation_sync(o);
pa_operation_unref(o);
break;
}
}
static void remote_sync_reply(void *data, uint32_t seq)
static void complete_operations(pa_context *c, uint32_t seq)
{
pa_context *c = data;
pa_operation *o;
pw_log_debug("done %d", seq);
spa_list_for_each(o, &c->operations, link) {
pa_operation *o, *t;
spa_list_for_each_safe(o, t, &c->operations, link) {
if (o->seq != seq)
continue;
pa_operation_ref(o);
if (o->callback)
o->callback(o, o->userdata);
pa_operation_unref(o);
break;
}
}
static void remote_sync_reply(void *data, uint32_t seq)
{
pa_context *c = data;
pw_log_debug("done %d", seq);
complete_operations(c, seq);
}
static const struct pw_remote_events remote_events = {
PW_VERSION_REMOTE_EVENTS,
.state_changed = remote_state_changed,
@ -466,9 +470,10 @@ struct notify_data {
static void on_notify(pa_operation *o, void *userdata)
{
struct notify_data *d = userdata;
pa_context *c = o->context;
pa_operation_done(o);
if (d->cb)
d->cb(o->context, d->userdata);
d->cb(c, d->userdata);
}
pa_operation* pa_context_drain(pa_context *c, pa_context_notify_cb_t cb, void *userdata)
@ -480,6 +485,7 @@ pa_operation* pa_context_drain(pa_context *c, pa_context_notify_cb_t cb, void *u
d = o->userdata;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}
@ -493,9 +499,10 @@ struct success_data {
static void on_success(pa_operation *o, void *userdata)
{
struct success_data *d = userdata;
pa_context *c = o->context;
pa_operation_done(o);
if (d->cb)
d->cb(o->context, d->ret, d->userdata);
d->cb(c, d->ret, d->userdata);
}
pa_operation* pa_context_exit_daemon(pa_context *c, pa_context_success_cb_t cb, void *userdata)
@ -556,6 +563,7 @@ pa_operation* pa_context_set_name(pa_context *c, const char *name, pa_context_su
d->ret = PA_ERR_ACCESS;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}

View file

@ -358,6 +358,8 @@ struct pa_stream {
uint32_t buffer_offset;
float volume;
pa_operation *drain;
uint64_t queued;
};
void pa_stream_set_state(pa_stream *s, pa_stream_state_t st);
@ -385,6 +387,7 @@ struct pa_operation
pa_operation *pa_operation_new(pa_context *c, pa_stream *s, pa_operation_cb_t cb, size_t userdata_size);
void pa_operation_done(pa_operation *o);
int pa_operation_sync(pa_operation *o);
#ifdef __cplusplus
}

View file

@ -212,6 +212,7 @@ pa_operation* pa_context_get_sink_info_by_name(pa_context *c, const char *name,
d->cb = cb;
d->userdata = userdata;
d->global = g;
pa_operation_sync(o);
return o;
}
@ -241,6 +242,7 @@ pa_operation* pa_context_get_sink_info_by_index(pa_context *c, uint32_t idx, pa_
d->cb = cb;
d->userdata = userdata;
d->global = g;
pa_operation_sync(o);
return o;
}
@ -277,6 +279,7 @@ pa_operation* pa_context_get_sink_info_list(pa_context *c, pa_sink_info_cb_t cb,
d->context = c;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}
@ -418,6 +421,7 @@ pa_operation* pa_context_get_source_info_by_index(pa_context *c, uint32_t idx, p
d->cb = cb;
d->userdata = userdata;
d->global = g;
pa_operation_sync(o);
return o;
}
@ -454,6 +458,7 @@ pa_operation* pa_context_get_source_info_list(pa_context *c, pa_source_info_cb_t
d->context = c;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}
@ -568,6 +573,7 @@ pa_operation* pa_context_get_module_info(pa_context *c, uint32_t idx, pa_module_
d->cb = cb;
d->userdata = userdata;
d->global = g;
pa_operation_sync(o);
return o;
}
@ -605,6 +611,7 @@ pa_operation* pa_context_get_module_info_list(pa_context *c, pa_module_info_cb_t
d->context = c;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}
@ -678,6 +685,7 @@ pa_operation* pa_context_get_client_info(pa_context *c, uint32_t idx, pa_client_
d->cb = cb;
d->userdata = userdata;
d->global = g;
pa_operation_sync(o);
return o;
}
@ -715,6 +723,7 @@ pa_operation* pa_context_get_client_info_list(pa_context *c, pa_client_info_cb_t
d->context = c;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}
@ -854,6 +863,7 @@ pa_operation* pa_context_get_sink_input_info(pa_context *c, uint32_t idx, pa_sin
d->cb = cb;
d->userdata = userdata;
d->global = g;
pa_operation_sync(o);
return o;
}
@ -890,6 +900,7 @@ pa_operation* pa_context_get_sink_input_info_list(pa_context *c, pa_sink_input_i
d->context = c;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}
@ -935,6 +946,7 @@ pa_operation* pa_context_set_sink_input_volume(pa_context *c, uint32_t idx, cons
d = o->userdata;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}
@ -957,6 +969,7 @@ pa_operation* pa_context_set_sink_input_mute(pa_context *c, uint32_t idx, int mu
d = o->userdata;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}

View file

@ -36,7 +36,7 @@ pa_operation *pa_operation_new(pa_context *c, pa_stream *s, pa_operation_cb_t cb
o->refcount = 1;
o->context = c;
o->stream = s;
o->seq = ++c->seq;
o->seq = SPA_ID_INVALID;
o->state = PA_OPERATION_RUNNING;
o->callback = cb;
@ -44,13 +44,19 @@ pa_operation *pa_operation_new(pa_context *c, pa_stream *s, pa_operation_cb_t cb
spa_list_append(&c->operations, &o->link);
pa_operation_ref(o);
pw_log_debug("new %p %d", o, o->seq);
pw_core_proxy_sync(c->core_proxy, o->seq);
pw_log_debug("new %p", o);
return o;
}
int pa_operation_sync(pa_operation *o)
{
pa_context *c = o->context;
o->seq = ++c->seq;
pw_log_debug("operation %p: sync %d", o, o->seq);
pw_core_proxy_sync(c->core_proxy, o->seq);
return 0;
}
pa_operation *pa_operation_ref(pa_operation *o)
{

View file

@ -340,6 +340,8 @@ static void update_timing_info(pa_stream *s)
pw_stream_get_time(s->stream, &pwt);
s->timing_info_valid = false;
s->queued = pwt.queued;
pw_log_debug("stream %p: %"PRIu64, s, s->queued);
if (pwt.rate.num == 0)
return;
@ -375,6 +377,15 @@ static void stream_process(void *data)
update_timing_info(s);
if (s->drain && s->queued == 0) {
pa_operation *o = s->drain;
pa_operation_ref(o);
if (o->callback)
o->callback(o, o->userdata);
pa_operation_unref(o);
s->drain = NULL;
}
while (dequeue_buffer(s) == 0);
if (s->dequeued_size <= 0)
@ -816,6 +827,7 @@ int pa_stream_disconnect(pa_stream *s)
s->disconnecting = true;
pw_stream_disconnect(s->stream);
o = pa_operation_new(s->context, s, on_disconnected, 0);
pa_operation_sync(o);
pa_operation_unref(o);
return 0;
@ -1062,7 +1074,7 @@ static void on_success(pa_operation *o, void *userdata)
pa_stream *s = o->stream;
pa_operation_done(o);
if (d->cb)
d->cb(s, PA_OK, d->userdata);
d->cb(s, 1, d->userdata);
}
pa_operation* pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata)
@ -1076,10 +1088,14 @@ pa_operation* pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *use
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE);
pw_log_debug("stream %p", s);
o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack));
d = o->userdata;
d->cb = cb;
d->userdata = userdata;
if (s->drain)
pa_operation_cancel(s->drain);
s->drain = o;
return o;
}
@ -1111,6 +1127,7 @@ pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t
d = o->userdata;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}
@ -1271,6 +1288,7 @@ pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, voi
d = o->userdata;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}
@ -1291,6 +1309,7 @@ pa_operation* pa_stream_flush(pa_stream *s, pa_stream_success_cb_t cb, void *use
d = o->userdata;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}
@ -1312,6 +1331,7 @@ pa_operation* pa_stream_prebuf(pa_stream *s, pa_stream_success_cb_t cb, void *us
d = o->userdata;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}
@ -1333,6 +1353,7 @@ pa_operation* pa_stream_trigger(pa_stream *s, pa_stream_success_cb_t cb, void *u
d = o->userdata;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}
@ -1354,6 +1375,7 @@ pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_succe
d = o->userdata;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}
@ -1511,6 +1533,7 @@ pa_operation *pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr
d = o->userdata;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}
@ -1532,6 +1555,7 @@ pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_strea
d = o->userdata;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}
@ -1553,6 +1577,7 @@ pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_
d = o->userdata;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}
@ -1573,6 +1598,7 @@ pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[],
d = o->userdata;
d->cb = cb;
d->userdata = userdata;
pa_operation_sync(o);
return o;
}