diff --git a/src/context.c b/src/context.c index 3087fa082..826002b81 100644 --- a/src/context.c +++ b/src/context.c @@ -86,6 +86,48 @@ pa_context *pa_context_new(pa_mainloop_api *mainloop, const char *name) return pa_context_new_with_proplist(mainloop, name, NULL); } +static int set_mask(pa_context *c, struct global *g) +{ + const char *str; + + if (g->type == c->t->node) { + if (g->props == NULL) + return 0; + if ((str = pw_properties_get(g->props, "media.class")) == NULL) + return 0; + + if (strcmp(str, "Audio/Sink") == 0) { + g->mask = PA_SUBSCRIPTION_MASK_SINK; + g->event = PA_SUBSCRIPTION_EVENT_SINK; + } + else if (strcmp(str, "Audio/Source") == 0) { + g->mask = PA_SUBSCRIPTION_MASK_SOURCE; + g->event = PA_SUBSCRIPTION_EVENT_SOURCE; + } + else if (strcmp(str, "Stream/Output/Audio") == 0) { + g->mask = PA_SUBSCRIPTION_MASK_SINK_INPUT; + g->event = PA_SUBSCRIPTION_EVENT_SINK_INPUT; + } + else if (strcmp(str, "Stream/Input/Audio") == 0) { + g->mask = PA_SUBSCRIPTION_MASK_SOURCE_OUTPUT; + g->event = PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT; + } + else + return 0; + } + else if (g->type == c->t->module) { + g->mask = PA_SUBSCRIPTION_MASK_MODULE; + g->event = PA_SUBSCRIPTION_EVENT_MODULE; + } + else if (g->type == c->t->client) { + g->mask = PA_SUBSCRIPTION_MASK_CLIENT; + g->event = PA_SUBSCRIPTION_EVENT_CLIENT; + } + else + return 0; + + return 1; +} static void registry_event_global(void *data, uint32_t id, uint32_t parent_id, uint32_t permissions, uint32_t type, uint32_t version, @@ -101,7 +143,18 @@ static void registry_event_global(void *data, uint32_t id, uint32_t parent_id, g->type = type; g->props = props ? pw_properties_new_dict(props) : NULL; + if (set_mask(c, g) == 0) + return; + spa_list_append(&c->globals, &g->link); + + if (c->subscribe_mask & g->mask) { + if (c->subscribe_callback) + c->subscribe_callback(c, + PA_SUBSCRIPTION_EVENT_NEW | g->event, + g->id, + c->subscribe_userdata); + } } struct global *pa_context_find_global(pa_context *c, uint32_t id) @@ -123,6 +176,14 @@ static void registry_event_global_remove(void *object, uint32_t id) if ((g = pa_context_find_global(c, id)) == NULL) return; + if (c->subscribe_mask & g->mask) { + if (c->subscribe_callback) + c->subscribe_callback(c, + PA_SUBSCRIPTION_EVENT_REMOVE | g->event, + g->id, + c->subscribe_userdata); + } + spa_list_remove(&g->link); if (g->props) pw_properties_free(g->props); diff --git a/src/internal.h b/src/internal.h index fb96bb108..c14c6ad74 100644 --- a/src/internal.h +++ b/src/internal.h @@ -197,6 +197,9 @@ struct global { uint32_t type; struct pw_properties *props; + pa_subscription_mask_t mask; + pa_subscription_event_type_t event; + void *info; pw_destroy_t destroy; @@ -337,6 +340,8 @@ struct pa_stream { void *buffer_data; uint32_t buffer_size; uint32_t buffer_offset; + + float volume; }; void pa_stream_set_state(pa_stream *s, pa_stream_state_t st); diff --git a/src/introspect.c b/src/introspect.c index ce2af79a8..4b2f4fb2f 100644 --- a/src/introspect.c +++ b/src/introspect.c @@ -20,13 +20,12 @@ #include #include +#include #include #include "internal.h" -typedef int (*global_filter_t)(pa_context *c, struct global *g, bool full); - static void node_event_info(void *object, struct pw_node_info *info) { struct global *g = object; @@ -104,13 +103,12 @@ static int ensure_global(pa_context *c, struct global *g) return 0; } -static void ensure_types(pa_context *c, uint32_t type, global_filter_t filter) +static void ensure_types(pa_context *c, uint32_t mask) { struct global *g; spa_list_for_each(g, &c->globals, link) { - if (!filter(c, g, false)) - continue; - ensure_global(c, g); + if (g->mask & mask) + ensure_global(c, g); } } @@ -152,26 +150,19 @@ static void sink_info(pa_operation *o, void *userdata) pa_operation_done(o); } -static int sink_filter(pa_context *c, struct global *g, bool full) -{ - const char *str; - - if (g->type != c->t->node) - return 0; - if (g->props == NULL) - return 0; - if ((str = pw_properties_get(g->props, "media.class")) == NULL) - return 0; - if (strcmp(str, "Audio/Sink") != 0) - return 0; - return 1; -} - -static struct global *find_sink_by_name(pa_context *c, const char *name) +static struct global *find_type_by_name(pa_context *c, uint32_t mask, const char *name) { struct global *g; + const char *str; + spa_list_for_each(g, &c->globals, link) { - if (sink_filter(c, g, true)) + if (!(g->mask & mask)) + continue; + if (g->props == NULL) + continue; + if ((str = pw_properties_get(g->props, "node.name")) == NULL) + continue; + if (strcmp(str, name) == 0) return g; } return NULL; @@ -190,7 +181,7 @@ pa_operation* pa_context_get_sink_info_by_name(pa_context *c, const char *name, PA_CHECK_VALIDITY_RETURN_NULL(c, c->state == PA_CONTEXT_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(c, !name || *name, PA_ERR_INVALID); - if ((g = find_sink_by_name(c, name)) == NULL) + if ((g = find_type_by_name(c, PA_SUBSCRIPTION_MASK_SINK, name)) == NULL) return NULL; ensure_global(c, g); @@ -219,7 +210,7 @@ pa_operation* pa_context_get_sink_info_by_index(pa_context *c, uint32_t idx, pa_ if ((g = pa_context_find_global(c, idx)) == NULL) return NULL; - if (!sink_filter(c, g, false)) + if (!(g->mask & PA_SUBSCRIPTION_MASK_SINK)) return NULL; ensure_global(c, g); @@ -240,7 +231,7 @@ static void sink_info_list(pa_operation *o, void *userdata) struct global *g; spa_list_for_each(g, &c->globals, link) { - if (!sink_filter(c, g, true)) + if (!(g->mask & PA_SUBSCRIPTION_MASK_SINK)) continue; d->global = g; sink_callback(d); @@ -260,7 +251,7 @@ pa_operation* pa_context_get_sink_info_list(pa_context *c, pa_sink_info_cb_t cb, PA_CHECK_VALIDITY_RETURN_NULL(c, c->state == PA_CONTEXT_READY, PA_ERR_BADSTATE); - ensure_types(c, c->t->node, sink_filter); + ensure_types(c, PA_SUBSCRIPTION_MASK_SINK); o = pa_operation_new(c, NULL, sink_info_list, sizeof(struct sink_data)); d = o->userdata; d->context = c; @@ -272,25 +263,25 @@ pa_operation* pa_context_get_sink_info_list(pa_context *c, pa_sink_info_cb_t cb, pa_operation* pa_context_set_sink_volume_by_index(pa_context *c, uint32_t idx, const pa_cvolume *volume, pa_context_success_cb_t cb, void *userdata) { - pw_log_warn("Not Implemented"); + pw_log_warn("Not Implemented %d", idx); return NULL; } pa_operation* pa_context_set_sink_volume_by_name(pa_context *c, const char *name, const pa_cvolume *volume, pa_context_success_cb_t cb, void *userdata) { - pw_log_warn("Not Implemented"); + pw_log_warn("Not Implemented %s", name); return NULL; } pa_operation* pa_context_set_sink_mute_by_index(pa_context *c, uint32_t idx, int mute, pa_context_success_cb_t cb, void *userdata) { - pw_log_warn("Not Implemented"); + pw_log_warn("Not Implemented %d", mute); return NULL; } pa_operation* pa_context_set_sink_mute_by_name(pa_context *c, const char *name, int mute, pa_context_success_cb_t cb, void *userdata) { - pw_log_warn("Not Implemented"); + pw_log_warn("Not Implemented %s", name); return NULL; } @@ -350,21 +341,6 @@ static void source_info(pa_operation *o, void *userdata) d->cb(d->context, NULL, 1, d->userdata); } -static int source_filter(pa_context *c, struct global *g, bool full) -{ - const char *str; - - if (g->type != c->t->node) - return 0; - if (g->props == NULL) - return 0; - if ((str = pw_properties_get(g->props, "media.class")) == NULL) - return 0; - if (strcmp(str, "Audio/Source") != 0) - return 0; - return 1; -} - pa_operation* pa_context_get_source_info_by_name(pa_context *c, const char *name, pa_source_info_cb_t cb, void *userdata) { pw_log_warn("Not Implemented"); @@ -385,13 +361,16 @@ pa_operation* pa_context_get_source_info_by_index(pa_context *c, uint32_t idx, p if ((g = pa_context_find_global(c, idx)) == NULL) return NULL; - if (!source_filter(c, g, false)) + if (!(g->mask & PA_SUBSCRIPTION_MASK_SOURCE)) return NULL; ensure_global(c, g); o = pa_operation_new(c, NULL, source_info, sizeof(struct source_data)); d = o->userdata; + d->context = c; + d->cb = cb; + d->userdata = userdata; d->global = g; return o; } @@ -403,7 +382,7 @@ static void source_info_list(pa_operation *o, void *userdata) struct global *g; spa_list_for_each(g, &c->globals, link) { - if (!source_filter(c, g, true)) + if (!(g->mask & PA_SUBSCRIPTION_MASK_SOURCE)) continue; d->global = g; source_callback(d); @@ -422,7 +401,7 @@ pa_operation* pa_context_get_source_info_list(pa_context *c, pa_source_info_cb_t PA_CHECK_VALIDITY_RETURN_NULL(c, c->state == PA_CONTEXT_READY, PA_ERR_BADSTATE); - ensure_types(c, c->t->node, source_filter); + ensure_types(c, PA_SUBSCRIPTION_MASK_SOURCE); o = pa_operation_new(c, NULL, source_info_list, sizeof(struct source_data)); d = o->userdata; d->context = c; @@ -516,13 +495,6 @@ static void module_info(pa_operation *o, void *userdata) d->cb(d->context, NULL, 1, d->userdata); } -static int module_filter(pa_context *c, struct global *g, bool full) -{ - if (g->type != c->t->module) - return 0; - return 1; -} - pa_operation* pa_context_get_module_info(pa_context *c, uint32_t idx, pa_module_info_cb_t cb, void *userdata) { pa_operation *o; @@ -537,13 +509,16 @@ pa_operation* pa_context_get_module_info(pa_context *c, uint32_t idx, pa_module_ if ((g = pa_context_find_global(c, idx)) == NULL) return NULL; - if (!module_filter(c, g, false)) + if (!(g->mask & PA_SUBSCRIPTION_MASK_MODULE)) return NULL; ensure_global(c, g); o = pa_operation_new(c, NULL, module_info, sizeof(struct module_data)); d = o->userdata; + d->context = c; + d->cb = cb; + d->userdata = userdata; d->global = g; return o; @@ -556,7 +531,7 @@ static void module_info_list(pa_operation *o, void *userdata) struct global *g; spa_list_for_each(g, &c->globals, link) { - if (!module_filter(c, g, true)) + if (!(g->mask & PA_SUBSCRIPTION_MASK_MODULE)) continue; d->global = g; module_callback(d); @@ -575,7 +550,7 @@ pa_operation* pa_context_get_module_info_list(pa_context *c, pa_module_info_cb_t PA_CHECK_VALIDITY_RETURN_NULL(c, c->state == PA_CONTEXT_READY, PA_ERR_BADSTATE); - ensure_types(c, c->t->module, module_filter); + ensure_types(c, PA_SUBSCRIPTION_MASK_MODULE); o = pa_operation_new(c, NULL, module_info_list, sizeof(struct module_data)); d = o->userdata; d->context = c; @@ -628,13 +603,6 @@ static void client_info(pa_operation *o, void *userdata) d->cb(d->context, NULL, 1, d->userdata); } -static int client_filter(pa_context *c, struct global *g, bool full) -{ - if (g->type != c->t->client) - return 0; - return 1; -} - pa_operation* pa_context_get_client_info(pa_context *c, uint32_t idx, pa_client_info_cb_t cb, void *userdata) { pa_operation *o; @@ -649,13 +617,16 @@ pa_operation* pa_context_get_client_info(pa_context *c, uint32_t idx, pa_client_ if ((g = pa_context_find_global(c, idx)) == NULL) return NULL; - if (!client_filter(c, g, false)) + if (!(g->mask & PA_SUBSCRIPTION_MASK_CLIENT)) return NULL; ensure_global(c, g); o = pa_operation_new(c, NULL, client_info, sizeof(struct client_data)); d = o->userdata; + d->context = c; + d->cb = cb; + d->userdata = userdata; d->global = g; return o; @@ -668,7 +639,7 @@ static void client_info_list(pa_operation *o, void *userdata) struct global *g; spa_list_for_each(g, &c->globals, link) { - if (!client_filter(c, g, true)) + if (!(g->mask & PA_SUBSCRIPTION_MASK_CLIENT)) continue; d->global = g; client_callback(d); @@ -687,7 +658,7 @@ pa_operation* pa_context_get_client_info_list(pa_context *c, pa_client_info_cb_t PA_CHECK_VALIDITY_RETURN_NULL(c, c->state == PA_CONTEXT_READY, PA_ERR_BADSTATE); - ensure_types(c, c->t->client, client_filter); + ensure_types(c, PA_SUBSCRIPTION_MASK_CLIENT); o = pa_operation_new(c, NULL, client_info_list, sizeof(struct client_data)); d = o->userdata; d->context = c; @@ -753,6 +724,7 @@ static void sink_input_callback(struct sink_input_data *d) pa_sink_input_info i; pa_format_info ii[1]; + pw_log_debug("index %d", g->id); spa_zero(i); i.index = g->id; i.name = info->name; @@ -761,7 +733,14 @@ static void sink_input_callback(struct sink_input_data *d) ii[0].encoding = PA_ENCODING_PCM; ii[0].plist = pa_proplist_new(); i.format = ii; - + i.resample_method = "PipeWire resampler"; + i.driver = "PipeWire"; + i.mute = false; + i.corked = false; + i.has_volume = true; + i.volume_writable = true; + i.volume.channels = 1; + i.volume.values[0] = PA_VOLUME_NORM; d->cb(d->context, &i, 0, d->userdata); } @@ -772,27 +751,6 @@ static void sink_input_info(pa_operation *o, void *userdata) d->cb(d->context, NULL, 1, d->userdata); } -static int sink_input_filter(pa_context *c, struct global *g, bool full) -{ - const char *str; - struct pw_node_info *info = g->info; - - if (g->type != c->t->node) - return 0; - - if (full) { - if (info == NULL || info->props == NULL) - return 0; - if ((str = spa_dict_lookup(info->props, "node.stream")) == NULL) - return 0; - if (pw_properties_parse_bool(str) == false) - return 0; - if (info->n_output_ports == 0) - return 0; - } - return 1; -} - pa_operation* pa_context_get_sink_input_info(pa_context *c, uint32_t idx, pa_sink_input_info_cb_t cb, void *userdata) { pa_operation *o; @@ -807,13 +765,16 @@ pa_operation* pa_context_get_sink_input_info(pa_context *c, uint32_t idx, pa_sin if ((g = pa_context_find_global(c, idx)) == NULL) return NULL; - if (!sink_input_filter(c, g, false)) + if (!(g->mask & PA_SUBSCRIPTION_MASK_SINK_INPUT)) return NULL; ensure_global(c, g); o = pa_operation_new(c, NULL, sink_input_info, sizeof(struct sink_input_data)); d = o->userdata; + d->context = c; + d->cb = cb; + d->userdata = userdata; d->global = g; return o; } @@ -825,7 +786,7 @@ static void sink_input_info_list(pa_operation *o, void *userdata) struct global *g; spa_list_for_each(g, &c->globals, link) { - if (!sink_input_filter(c, g, true)) + if (!(g->mask & PA_SUBSCRIPTION_MASK_SINK_INPUT)) continue; d->global = g; sink_input_callback(d); @@ -844,7 +805,7 @@ pa_operation* pa_context_get_sink_input_info_list(pa_context *c, pa_sink_input_i PA_CHECK_VALIDITY_RETURN_NULL(c, c->state == PA_CONTEXT_READY, PA_ERR_BADSTATE); - ensure_types(c, c->t->node, sink_input_filter); + ensure_types(c, PA_SUBSCRIPTION_MASK_SINK_INPUT); o = pa_operation_new(c, NULL, sink_input_info_list, sizeof(struct sink_input_data)); d = o->userdata; d->context = c; @@ -865,16 +826,71 @@ pa_operation* pa_context_move_sink_input_by_index(pa_context *c, uint32_t idx, u return NULL; } +static pa_stream *find_stream(pa_context *c, uint32_t idx) +{ + pa_stream *s; + spa_list_for_each(s, &c->streams, link) { + if (pw_stream_get_node_id(s->stream) == idx) + return s; + } + return NULL; +} + +struct success_ack { + pa_context_success_cb_t cb; + void *userdata; +}; + +static void on_success(pa_operation *o, void *userdata) +{ + struct success_ack *d = userdata; + pa_context *c = o->context; + pa_operation_done(o); + if (d->cb) + d->cb(c, PA_OK, d->userdata); +} + pa_operation* pa_context_set_sink_input_volume(pa_context *c, uint32_t idx, const pa_cvolume *volume, pa_context_success_cb_t cb, void *userdata) { - pw_log_warn("Not Implemented"); - return NULL; + pa_stream *s; + pa_operation *o; + struct success_ack *d; + + if ((s = find_stream(c, idx)) == NULL) + return NULL; + + s->volume = pa_cvolume_avg(volume) / (float) PA_VOLUME_NORM; + pw_stream_set_control(s->stream, PW_STREAM_CONTROL_VOLUME, s->volume); + + o = pa_operation_new(c, NULL, on_success, sizeof(struct success_ack)); + d = o->userdata; + d->cb = cb; + d->userdata = userdata; + + return o; } pa_operation* pa_context_set_sink_input_mute(pa_context *c, uint32_t idx, int mute, pa_context_success_cb_t cb, void *userdata) { - pw_log_warn("Not Implemented"); - return NULL; + pa_stream *s; + pa_operation *o; + struct success_ack *d; + + if ((s = find_stream(c, idx)) == NULL) + return NULL; + + s->mute = mute; + if (mute) + pw_stream_set_control(s->stream, PW_STREAM_CONTROL_VOLUME, 0.0); + else + pw_stream_set_control(s->stream, PW_STREAM_CONTROL_VOLUME, s->volume); + + o = pa_operation_new(c, NULL, on_success, sizeof(struct success_ack)); + d = o->userdata; + d->cb = cb; + d->userdata = userdata; + + return o; } pa_operation* pa_context_kill_sink_input(pa_context *c, uint32_t idx, pa_context_success_cb_t cb, void *userdata) diff --git a/src/stream.c b/src/stream.c index ea44a27ed..1a7dee4a6 100644 --- a/src/stream.c +++ b/src/stream.c @@ -30,6 +30,8 @@ #include #include "internal.h" +#define MIN_QUEUED 1 + struct pending_data { struct spa_list link; @@ -85,12 +87,11 @@ static int dequeue_buffer(pa_stream *s) spa_ringbuffer_get_write_index(&s->dequeued_ring, &index); s->dequeued[index & MASK_BUFFERS] = buf; - spa_ringbuffer_write_update(&s->dequeued_ring, index + 1); - if (s->direction == PA_STREAM_PLAYBACK) s->dequeued_size += buf->buffer->datas[0].maxsize; else s->dequeued_size += buf->buffer->datas[0].chunk->size; + spa_ringbuffer_write_update(&s->dequeued_ring, index + 1); return 0; } @@ -151,7 +152,7 @@ static const struct spa_pod *get_buffers_param(pa_stream *s, pa_buffer_attr *att if (attr->maxlength == -1) buffers = 3; else - buffers = SPA_CLAMP(attr->maxlength / (maxsize * stride), 3, 64); + buffers = SPA_CLAMP(attr->maxlength / (maxsize * stride), 3, MAX_BUFFERS); pw_log_info("stream %p: stride %d maxsize %d size %u buffers %d", s, stride, maxsize, size, buffers); @@ -159,7 +160,7 @@ static const struct spa_pod *get_buffers_param(pa_stream *s, pa_buffer_attr *att param = spa_pod_builder_object(b, t->param.idBuffers, t->param_buffers.Buffers, ":", t->param_buffers.buffers, "iru", buffers, - SPA_POD_PROP_MIN_MAX(3, 64), + SPA_POD_PROP_MIN_MAX(3, MAX_BUFFERS), ":", t->param_buffers.blocks, "i", blocks, ":", t->param_buffers.size, "iru", size * stride, SPA_POD_PROP_MIN_MAX(size * stride, maxsize * stride), @@ -210,7 +211,7 @@ static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flag attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */ if (attr->minreq == (uint32_t) -1) - attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */ + attr->minreq = attr->tlength; /* Ask for more data when there are only 200ms left in the playback buffer */ if (attr->prebuf == (uint32_t) -1) attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */ @@ -276,7 +277,7 @@ static void stream_process(void *data) s->timing_info_valid = true; - if (dequeue_buffer(s) < 0 && s->dequeued_size == 0) + if (dequeue_buffer(s) < 0 && s->dequeued_size <= 0) return; if (s->direction == PA_STREAM_PLAYBACK) { @@ -466,10 +467,7 @@ uint32_t pa_stream_get_index(pa_stream *s) spa_assert(s); spa_assert(s->refcount >= 1); - PA_CHECK_VALIDITY_RETURN_ANY(s->context, - s->state == PA_STREAM_READY, PA_ERR_BADSTATE, PA_INVALID_INDEX); - - return s->stream_index; + return pw_stream_get_node_id(s->stream); } void pa_stream_set_state(pa_stream *s, pa_stream_state_t st) { @@ -583,6 +581,10 @@ static int create_stream(pa_stream_direction_t direction, s->direction = direction; s->timing_info_valid = false; s->disconnecting = false; + if (volume) + s->volume = pa_cvolume_avg(volume) / (float) PA_VOLUME_NORM; + else + s->volume = 1.0; pa_stream_set_state(s, PA_STREAM_CREATING); @@ -697,7 +699,7 @@ int peek_buffer(pa_stream *s) if (s->buffer != NULL) return 0; - if ((avail = spa_ringbuffer_get_read_index(&s->dequeued_ring, &index)) <= 0) + if ((avail = spa_ringbuffer_get_read_index(&s->dequeued_ring, &index)) < MIN_QUEUED) return -EPIPE; s->buffer = s->dequeued[index & MASK_BUFFERS]; @@ -719,12 +721,11 @@ int queue_buffer(pa_stream *s) if (s->buffer == NULL) return 0; - spa_ringbuffer_read_update(&s->dequeued_ring, s->buffer_index + 1); - if (s->direction == PA_STREAM_PLAYBACK) s->dequeued_size -= s->buffer->buffer->datas[0].maxsize; else s->dequeued_size -= s->buffer->buffer->datas[0].chunk->size; + spa_ringbuffer_read_update(&s->dequeued_ring, s->buffer_index + 1); pw_stream_queue_buffer(s->stream, s->buffer); s->buffer = NULL; diff --git a/src/subscribe.c b/src/subscribe.c index a78a4d009..953cfd1c7 100644 --- a/src/subscribe.c +++ b/src/subscribe.c @@ -44,6 +44,8 @@ pa_operation* pa_context_subscribe(pa_context *c, pa_subscription_mask_t m, pa_c pa_assert(c); pa_assert(c->refcount >= 1); + c->subscribe_mask = m; + o = pa_operation_new(c, NULL, on_subscribed, sizeof(struct subscribe_data)); d = o->userdata; d->cb = cb;