stream: add support for Props param

Rework the node and port info management a little.
Make sure we emit a node info event when the Props param changed.
Implement enum_param on the node.
This commit is contained in:
Wim Taymans 2021-05-02 23:16:05 +02:00
parent 8bbb78d2bd
commit 9206fb019f

View file

@ -118,11 +118,16 @@ struct stream {
struct spa_io_position *position;
} rt;
uint32_t change_mask_all;
uint32_t port_change_mask_all;
struct spa_port_info port_info;
struct pw_properties *port_props;
struct spa_param_info port_params[5];
struct spa_list param_list;
struct spa_param_info params[5];
uint32_t change_mask_all;
struct spa_node_info info;
struct spa_param_info params[1];
uint32_t media_type;
uint32_t media_subtype;
@ -149,6 +154,16 @@ struct stream {
};
static int get_param_index(uint32_t id)
{
switch (id) {
case SPA_PARAM_Props:
return 0;
default:
return -1;
}
}
static int get_port_param_index(uint32_t id)
{
switch (id) {
case SPA_PARAM_EnumFormat:
@ -213,11 +228,14 @@ static struct param *add_param(struct stream *impl,
spa_list_append(&impl->param_list, &p->link);
idx = get_param_index(id);
if (idx != -1) {
impl->port_info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS;
if ((idx = get_param_index(id)) != -1) {
impl->info.change_mask |= SPA_NODE_CHANGE_MASK_PARAMS;
impl->params[idx].flags ^= SPA_PARAM_INFO_SERIAL;
impl->params[idx].flags |= SPA_PARAM_INFO_READ;
} else if ((idx = get_port_param_index(id)) != -1) {
impl->port_info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS;
impl->port_params[idx].flags ^= SPA_PARAM_INFO_SERIAL;
impl->port_params[idx].flags |= SPA_PARAM_INFO_READ;
}
return p;
@ -407,6 +425,57 @@ static int impl_set_io(void *object, uint32_t id, void *data, size_t size)
return 0;
}
static int enum_params(void *object, bool is_port, int seq, uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter)
{
struct stream *d = object;
struct spa_result_node_params result;
uint8_t buffer[1024];
struct spa_pod_builder b = { 0 };
uint32_t count = 0;
struct param *p;
bool found = false;
spa_return_val_if_fail(num != 0, -EINVAL);
result.id = id;
result.next = 0;
pw_log_debug(NAME" %p: param id %d (%s) start:%d num:%d", d, id,
spa_debug_type_find_name(spa_type_param, id),
start, num);
spa_list_for_each(p, &d->param_list, link) {
struct spa_pod *param;
result.index = result.next++;
if (result.index < start)
continue;
param = p->param;
if (param == NULL || p->id != id)
continue;
found = true;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
if (spa_pod_filter(&b, &result.param, param, filter) != 0)
continue;
spa_node_emit_result(&d->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result);
if (++count == num)
break;
}
return found ? 0 : -ENOENT;
}
static int impl_enum_params(void *object, int seq, uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter)
{
return enum_params(object, false, seq, id, start, num, filter);
}
static int impl_set_param(void *object, uint32_t id, uint32_t flags, const struct spa_pod *param)
{
struct stream *impl = object;
@ -459,33 +528,19 @@ static int impl_send_command(void *object, const struct spa_command *command)
return 0;
}
static void emit_node_info(struct stream *d)
static void emit_node_info(struct stream *d, bool full)
{
struct spa_node_info info;
info = SPA_NODE_INFO_INIT();
if (d->direction == SPA_DIRECTION_INPUT) {
info.max_input_ports = 1;
info.max_output_ports = 0;
} else {
info.max_input_ports = 0;
info.max_output_ports = 1;
}
info.change_mask |= SPA_NODE_CHANGE_MASK_FLAGS;
/* we're always RT safe, if the stream was marked RT_PROCESS,
* the callback must be RT safe */
info.flags = SPA_NODE_FLAG_RT;
/* if the callback was not marked RT_PROCESS, we will offload
* the process callback in the main thread and we are ASYNC */
if (!d->process_rt)
info.flags |= SPA_NODE_FLAG_ASYNC;
spa_node_emit_info(&d->hooks, &info);
if (full)
d->info.change_mask = d->change_mask_all;
if (d->info.change_mask != 0)
spa_node_emit_info(&d->hooks, &d->info);
d->info.change_mask = 0;
}
static void emit_port_info(struct stream *d, bool full)
{
if (full)
d->port_info.change_mask = d->change_mask_all;
d->port_info.change_mask = d->port_change_mask_all;
if (d->port_info.change_mask != 0)
spa_node_emit_port_info(&d->hooks, d->direction, 0, &d->port_info);
d->port_info.change_mask = 0;
@ -501,7 +556,7 @@ static int impl_add_listener(void *object,
spa_hook_list_isolate(&d->hooks, &save, listener, events, data);
emit_node_info(d);
emit_node_info(d, true);
emit_port_info(d, true);
spa_hook_list_join(&d->hooks, &save);
@ -546,46 +601,7 @@ static int impl_port_enum_params(void *object, int seq,
uint32_t id, uint32_t start, uint32_t num,
const struct spa_pod *filter)
{
struct stream *d = object;
struct spa_result_node_params result;
uint8_t buffer[1024];
struct spa_pod_builder b = { 0 };
uint32_t count = 0;
struct param *p;
bool found = false;
spa_return_val_if_fail(num != 0, -EINVAL);
result.id = id;
result.next = 0;
pw_log_debug(NAME" %p: param id %d (%s) start:%d num:%d", d, id,
spa_debug_type_find_name(spa_type_param, id),
start, num);
spa_list_for_each(p, &d->param_list, link) {
struct spa_pod *param;
result.index = result.next++;
if (result.index < start)
continue;
param = p->param;
if (param == NULL || p->id != id)
continue;
found = true;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
if (spa_pod_filter(&b, &result.param, param, filter) != 0)
continue;
spa_node_emit_result(&d->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result);
if (++count == num)
break;
}
return found ? 0 : -ENOENT;
return enum_params(object, true, seq, id, start, num, filter);
}
static int map_data(struct stream *impl, struct spa_data *data, int prot)
@ -892,6 +908,7 @@ static const struct spa_node_methods impl_node = {
SPA_VERSION_NODE_METHODS,
.add_listener = impl_add_listener,
.set_callbacks = impl_set_callbacks,
.enum_params = impl_enum_params,
.set_param = impl_set_param,
.set_io = impl_set_io,
.send_command = impl_send_command,
@ -1527,28 +1544,56 @@ pw_stream_connect(struct pw_stream *stream,
else
impl->node_methods.process = impl_node_process_output;
impl->process_rt = SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_RT_PROCESS);
impl->impl_node.iface = SPA_INTERFACE_INIT(
SPA_TYPE_INTERFACE_Node,
SPA_VERSION_NODE,
&impl->node_methods, impl);
impl->change_mask_all =
SPA_NODE_CHANGE_MASK_FLAGS |
SPA_NODE_CHANGE_MASK_PROPS |
SPA_NODE_CHANGE_MASK_PARAMS;
impl->info = SPA_NODE_INFO_INIT();
if (impl->direction == SPA_DIRECTION_INPUT) {
impl->info.max_input_ports = 1;
impl->info.max_output_ports = 0;
} else {
impl->info.max_input_ports = 0;
impl->info.max_output_ports = 1;
}
/* we're always RT safe, if the stream was marked RT_PROCESS,
* the callback must be RT safe */
impl->info.flags = SPA_NODE_FLAG_RT;
/* if the callback was not marked RT_PROCESS, we will offload
* the process callback in the main thread and we are ASYNC */
if (!impl->process_rt)
impl->info.flags |= SPA_NODE_FLAG_ASYNC;
impl->info.props = &stream->properties->dict;
impl->params[0] = SPA_PARAM_INFO(SPA_PARAM_Props, SPA_PARAM_INFO_WRITE);
impl->info.params = impl->params;
impl->info.n_params = 1;
impl->info.change_mask = impl->change_mask_all;
impl->port_change_mask_all =
SPA_PORT_CHANGE_MASK_FLAGS |
SPA_PORT_CHANGE_MASK_PROPS |
SPA_PORT_CHANGE_MASK_PARAMS;
impl->port_info = SPA_PORT_INFO_INIT();
impl->port_info.change_mask = impl->change_mask_all;
impl->port_info.change_mask = impl->port_change_mask_all;
impl->port_info.flags = 0;
if (SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_ALLOC_BUFFERS))
impl->port_info.flags |= SPA_PORT_FLAG_CAN_ALLOC_BUFFERS;
impl->params[0] = SPA_PARAM_INFO(SPA_PARAM_EnumFormat, 0);
impl->params[1] = SPA_PARAM_INFO(SPA_PARAM_Meta, 0);
impl->params[2] = SPA_PARAM_INFO(SPA_PARAM_IO, 0);
impl->params[3] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE);
impl->params[4] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0);
impl->port_params[0] = SPA_PARAM_INFO(SPA_PARAM_EnumFormat, 0);
impl->port_params[1] = SPA_PARAM_INFO(SPA_PARAM_Meta, 0);
impl->port_params[2] = SPA_PARAM_INFO(SPA_PARAM_IO, 0);
impl->port_params[3] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE);
impl->port_params[4] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0);
impl->port_info.props = &impl->port_props->dict;
impl->port_info.params = impl->params;
impl->port_info.params = impl->port_params;
impl->port_info.n_params = 5;
clear_params(impl, SPA_ID_INVALID);
@ -1579,8 +1624,6 @@ pw_stream_connect(struct pw_stream *stream,
if (flags & PW_STREAM_FLAG_DONT_RECONNECT)
pw_properties_set(stream->properties, PW_KEY_NODE_DONT_RECONNECT, "true");
impl->process_rt = SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_RT_PROCESS);
if ((str = pw_properties_get(stream->properties, "mem.warn-mlock")) != NULL)
impl->warn_mlock = pw_properties_parse_bool(str);
@ -1766,6 +1809,7 @@ int pw_stream_update_params(struct pw_stream *stream,
if ((res = update_params(impl, SPA_ID_INVALID, params, n_params)) < 0)
return res;
emit_node_info(impl, false);
emit_port_info(impl, false);
return res;