diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index fb04e1995..9f401566a 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -46,6 +46,8 @@ struct type { uint32_t client_node; + uint32_t prop_volume; + uint32_t io_prop_volume; struct spa_type_media_type media_type; struct spa_type_media_subtype media_subtype; }; @@ -53,6 +55,10 @@ struct type { static inline void init_type(struct type *type, struct spa_type_map *map) { type->client_node = spa_type_map_get_id(map, PW_TYPE_INTERFACE__ClientNode); + + type->prop_volume = spa_type_map_get_id(map, SPA_TYPE_PROPS__volume); + type->io_prop_volume = spa_type_map_get_id(map, SPA_TYPE_IO_PROP_BASE "volume"); + spa_type_media_type_map(map, &type->media_type); spa_type_media_subtype_map(map, &type->media_subtype); } @@ -76,10 +82,37 @@ struct data { struct spa_hook stream_listener; }; +struct param { +#define PARAM_TYPE_INIT (1 << 0) +#define PARAM_TYPE_OTHER (1 << 1) +#define PARAM_TYPE_FORMAT (1 << 2) + int type; + struct spa_pod *param; +}; + +#define DEFAULT_VOLUME 1.0 + +struct props { + float volume; +}; + +static void reset_props(struct props *props) +{ + props->volume = DEFAULT_VOLUME; +} + +#define DEFAULT_VOLUME 1.0 + +struct control { + struct spa_pod_float *volume; +}; + struct stream { struct pw_stream this; struct type type; + struct props props; + struct control control; const char *path; @@ -100,32 +133,51 @@ struct stream { void *callbacks_data; struct spa_io_buffers *io; + struct pw_array params; + struct buffer buffers[MAX_BUFFERS]; int n_buffers; struct queue dequeued; struct queue queued; - uint32_t n_init_params; - struct spa_pod **init_params; - - uint32_t n_params; - struct spa_pod **params; - - struct spa_pod *format; - struct spa_pod *conv_format; - uint32_t pending_seq; bool disconnecting; - int64_t last_ticks; - int32_t last_rate; - int64_t last_monotonic; - bool free_data; struct data data; }; +static struct param *add_param(struct pw_stream *stream, + int type, const struct spa_pod *param) +{ + struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + struct param *p; + + p = pw_array_add(&impl->params, sizeof(struct param)); + if (p == NULL) + return NULL; + + p->type = type; + p->param = pw_spa_pod_copy(param); + return p; +} + +static void clear_params(struct pw_stream *stream, int type) +{ + struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + struct param *p; + + p = pw_array_first(&impl->params); + while (pw_array_check(&impl->params, p)) { + if (SPA_FLAG_CHECK(p->type, type)) { + free(p->param); + pw_array_remove(&impl->params, p); + } + else + p++; + } +} static inline int push_queue(struct stream *stream, struct queue *queue, struct buffer *buffer) { @@ -236,18 +288,6 @@ static int impl_send_command(struct spa_node *node, const struct spa_command *co } stream_set_state(stream, PW_STREAM_STATE_STREAMING, NULL); } - } else if (SPA_COMMAND_TYPE(command) == t->command_node.ClockUpdate) { - struct spa_command_node_clock_update *cu = (__typeof__(cu)) command; - - if (cu->body.flags.value & SPA_COMMAND_NODE_CLOCK_UPDATE_FLAG_LIVE) { - pw_properties_set(stream->properties, PW_STREAM_PROP_IS_LIVE, "1"); - pw_properties_setf(stream->properties, - PW_STREAM_PROP_LATENCY_MIN, "%" PRId64, - cu->body.latency.value); - } - impl->last_ticks = cu->body.ticks.value; - impl->last_rate = cu->body.rate.value; - impl->last_monotonic = cu->body.monotonic_time.value; } else { pw_log_warn("unhandled node command %d", SPA_COMMAND_TYPE(command)); } @@ -309,9 +349,22 @@ static int impl_port_set_io(struct spa_node *node, enum spa_direction direction, struct pw_type *t = impl->t; int res = 0; - if (id == t->io.Buffers && size >= sizeof(struct spa_io_buffers)) { - pw_log_debug("stream %p: set io %d %p %zd", impl, id, data, size); - impl->io = data; + pw_log_debug("stream %p: set io %s %p %zd", impl, + spa_type_map_get_type(t->map, id), data, size); + + if (id == t->io.Buffers) { + if (data && size >= sizeof(struct spa_io_buffers)) + impl->io = data; + else + impl->io = NULL; + } + else if (id == impl->type.io_prop_volume) { + if (data && size >= sizeof(struct spa_pod_float)) { + impl->control.volume = data; + impl->control.volume->value = impl->props.volume; + } + else + impl->control.volume = NULL; } else res = -ENOENT; @@ -343,16 +396,11 @@ static int impl_port_enum_params(struct spa_node *node, struct stream *d = SPA_CONTAINER_OF(node, struct stream, impl_node); struct spa_pod *param; uint32_t last_id = SPA_ID_INVALID; + uint32_t n_params = pw_array_get_len(&d->params, struct param); while (true) { - if (*index < d->n_init_params) { - param = d->init_params[*index]; - } - else if (*index < d->n_init_params + d->n_params) { - param = d->params[*index - d->n_init_params]; - } - else if (*index == (d->n_params + d->n_init_params) && d->format) { - param = d->format; + if (*index < n_params) { + param = pw_array_get_unchecked(&d->params, *index, struct param)->param; } else if (last_id != SPA_ID_INVALID) return 1; @@ -392,36 +440,43 @@ static int port_set_format(struct spa_node *node, struct stream *impl = SPA_CONTAINER_OF(node, struct stream, impl_node); struct pw_stream *stream = &impl->this; struct pw_type *t = impl->t; + struct param *p; int count; pw_log_debug("stream %p: format changed:", impl); if (pw_log_level >= SPA_LOG_LEVEL_DEBUG) spa_debug_pod(format, SPA_DEBUG_FLAG_FORMAT); - if (impl->format) - free(impl->format); - + clear_params(stream, PARAM_TYPE_FORMAT); if (spa_pod_is_object_type(format, t->spa_format)) { - impl->format = pw_spa_pod_copy(format); - ((struct spa_pod_object*)impl->format)->body.id = t->param.idFormat; + p = add_param(stream, PARAM_TYPE_FORMAT, format); + if (p == NULL) + goto no_mem; + + ((struct spa_pod_object*)p->param)->body.id = t->param.idFormat; } else - impl->format = NULL; + p = NULL; count = spa_hook_list_call(&stream->listener_list, struct pw_stream_events, format_changed, - impl->format); + p ? p->param : NULL); if (count == 0) pw_stream_finish_format(stream, 0, NULL, 0); - if (impl->format) - stream_set_state(stream, PW_STREAM_STATE_READY, NULL); - else - stream_set_state(stream, PW_STREAM_STATE_CONFIGURE, NULL); + stream_set_state(stream, + p ? + PW_STREAM_STATE_READY : + PW_STREAM_STATE_CONFIGURE, + NULL); return 0; + + no_mem: + pw_stream_finish_format(stream, -ENOMEM, NULL, 0); + return -ENOMEM; } static int impl_port_set_param(struct spa_node *node, @@ -561,10 +616,11 @@ static int impl_port_use_buffers(struct spa_node *node, enum spa_direction direc impl->n_buffers = n_buffers; - if (n_buffers > 0) - stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL); - else - stream_set_state(stream, PW_STREAM_STATE_READY, NULL); + stream_set_state(stream, + n_buffers > 0 ? + PW_STREAM_STATE_PAUSED : + PW_STREAM_STATE_READY, + NULL); return 0; } @@ -599,12 +655,11 @@ static int impl_node_process_input(struct spa_node *node) done: /* pop buffer to recycle */ - if ((b = pop_queue(impl, &impl->queued)) != NULL) - io->buffer_id = b->id; - else - io->buffer_id = SPA_ID_INVALID; + b = pop_queue(impl, &impl->queued); + io->buffer_id = b ? b->id : SPA_ID_INVALID; io->status = SPA_STATUS_NEED_BUFFER; + return SPA_STATUS_HAVE_BUFFER; } @@ -623,9 +678,9 @@ static int impl_node_process_output(struct spa_node *node) res = 0; if (io->status != SPA_STATUS_HAVE_BUFFER) { /* recycle old buffer */ - if ((b = get_buffer(stream, io->buffer_id)) != NULL) { + if ((b = get_buffer(stream, io->buffer_id)) != NULL) push_queue(impl, &impl->dequeued, b); - } + /* pop new buffer */ if ((b = pop_queue(impl, &impl->queued)) != NULL) { io->buffer_id = b->id; @@ -758,9 +813,11 @@ struct pw_stream * pw_stream_new(struct pw_remote *remote, const char *name, this->name = name ? strdup(name) : NULL; init_type(&impl->type, remote->core->type.map); + reset_props(&impl->props); spa_ringbuffer_init(&impl->dequeued.ring); spa_ringbuffer_init(&impl->queued.ring); + pw_array_init(&impl->params, sizeof(struct param) * 8); spa_hook_list_init(&this->listener_list); @@ -836,46 +893,6 @@ const char *pw_stream_state_as_string(enum pw_stream_state state) return "invalid-state"; } -static void -set_init_params(struct pw_stream *stream, - int n_init_params, - const struct spa_pod **init_params) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - int i; - - if (impl->init_params) { - for (i = 0; i < impl->n_init_params; i++) - free(impl->init_params[i]); - free(impl->init_params); - impl->init_params = NULL; - } - if (n_init_params > 0) { - impl->init_params = malloc(n_init_params * sizeof(struct spa_pod *)); - for (i = 0; i < n_init_params; i++) - impl->init_params[i] = pw_spa_pod_copy(init_params[i]); - } - impl->n_init_params = n_init_params; -} - -static void set_params(struct pw_stream *stream, int n_params, const struct spa_pod **params) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - int i; - - if (impl->params) { - for (i = 0; i < impl->n_params; i++) - free(impl->params[i]); - free(impl->params); - impl->params = NULL; - } - impl->n_params = n_params; - if (n_params > 0) { - impl->params = malloc(n_params * sizeof(struct spa_pod *)); - for (i = 0; i < n_params; i++) - impl->params[i] = pw_spa_pod_copy(params[i]); - } -} void pw_stream_destroy(struct pw_stream *stream) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); @@ -889,11 +906,7 @@ void pw_stream_destroy(struct pw_stream *stream) spa_hook_remove(&impl->remote_listener); spa_list_remove(&stream->link); - set_init_params(stream, 0, NULL); - set_params(stream, 0, NULL); - - if (impl->format) - free(impl->format); + clear_params(stream, PARAM_TYPE_INIT | PARAM_TYPE_OTHER | PARAM_TYPE_FORMAT); if (stream->error) free(stream->error); @@ -940,6 +953,24 @@ struct pw_remote *pw_stream_get_remote(struct pw_stream *stream) return stream->remote; } +static void add_controls(struct pw_stream *stream) +{ + struct stream *s = SPA_CONTAINER_OF(stream, struct stream, this); + uint8_t buffer[4096]; + struct spa_pod_builder b; + struct pw_type *t = s->t; + + spa_pod_builder_init(&b, buffer, 4096); + add_param(stream, PARAM_TYPE_INIT, + spa_pod_builder_object(&b, + t->param_io.idPropsOut, t->param_io.Prop, + ":", t->param_io.id, "I", s->type.io_prop_volume, + ":", t->param_io.size, "i", sizeof(struct spa_pod_float), + ":", t->param.propId, "I", s->type.prop_volume, + ":", t->param.propType, "fru", s->props.volume, + SPA_POD_PROP_MIN_MAX(0.0, 10.0))); +} + int pw_stream_connect(struct pw_stream *stream, enum pw_direction direction, @@ -950,14 +981,18 @@ pw_stream_connect(struct pw_stream *stream, { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); enum pw_remote_state state; - int res; + int i, res; pw_log_debug("stream %p: connect", stream); impl->direction = direction == PW_DIRECTION_INPUT ? SPA_DIRECTION_INPUT : SPA_DIRECTION_OUTPUT; impl->flags = flags; - set_init_params(stream, n_params, params); + clear_params(stream, PARAM_TYPE_INIT); + for (i = 0; i < n_params; i++) + add_param(stream, PARAM_TYPE_INIT, params[i]); + + add_controls(stream); stream_set_state(stream, PW_STREAM_STATE_CONNECTING, NULL); @@ -1010,14 +1045,52 @@ void pw_stream_finish_format(struct pw_stream *stream, uint32_t n_params) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + int i; pw_log_debug("stream %p: finish format %d %d", stream, res, impl->pending_seq); - set_params(stream, n_params, params); + clear_params(stream, PARAM_TYPE_OTHER); + for (i = 0; i < n_params; i++) + add_param(stream, PARAM_TYPE_OTHER, params[i]); impl->pending_seq = SPA_ID_INVALID; } +int pw_stream_set_control(struct pw_stream *stream, + const char *name, float value) +{ + struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + + if (strcmp(name, PW_STREAM_CONTROL_VOLUME) == 0) { + impl->props.volume = value; + if (stream->state >= PW_STREAM_STATE_READY) { + if (impl->control.volume == NULL) + return -ENODEV; + impl->control.volume->value = value; + } + } + else + return -ENOTSUP; + + return 0; +} + +int pw_stream_get_control(struct pw_stream *stream, + const char *name, float *value) +{ + struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + + if (strcmp(name, PW_STREAM_CONTROL_VOLUME) == 0) { + if (impl->control.volume == NULL) + return -ENODEV; + *value = impl->control.volume->value; + } + else + return -ENOTSUP; + + return 0; +} + int pw_stream_set_active(struct pw_stream *stream, bool active) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); diff --git a/src/pipewire/stream.h b/src/pipewire/stream.h index e532a9224..4d9ccc60d 100644 --- a/src/pipewire/stream.h +++ b/src/pipewire/stream.h @@ -287,6 +287,21 @@ pw_stream_finish_format(struct pw_stream *stream, /**< a \ref pw_stream */ * buffer allocation. */ uint32_t n_params /**< number of elements in \a params */); + +/** Audio controls */ +#define PW_STREAM_CONTROL_VOLUME "volume" + +/** Video controls */ +#define PW_STREAM_CONTROL_CONTRAST "contrast" +#define PW_STREAM_CONTROL_BRIGHTNESS "brightness" +#define PW_STREAM_CONTROL_HUE "hue" +#define PW_STREAM_CONTROL_SATURATION "saturation" + +/** Set a control value */ +int pw_stream_set_control(struct pw_stream *stream, const char *name, float value); +/** Get a control value */ +int pw_stream_get_control(struct pw_stream *stream, const char *name, float *value); + /** Activate or deactivate the stream \memberof pw_stream */ int pw_stream_set_active(struct pw_stream *stream, bool active); @@ -306,6 +321,7 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream); /** Submit a buffer for playback or recycle a buffer for capture. */ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer); + #ifdef __cplusplus } #endif