node: make add_listener method

Make struct spa_node_events for events emited from the main thread
and keep the spa_node_callbacks for the data thread callbacks.

The add_listener method installs the events and it's possible to
install multiple handles. Adding a listener first emits the info
and port_info events when installed, similar to how the PipeWire
proxy bind works.

This removes the need for the spa_pending_queue and makes it easier
to implement the _sync versions.

Add some helpers to make it easier for plugins to emit all the info
to new listeners.

Use the listeners for devices as well.
This commit is contained in:
Wim Taymans 2019-03-01 12:00:42 +01:00
parent 61ce4e77f6
commit 0390969228
53 changed files with 1774 additions and 1307 deletions

View file

@ -63,13 +63,12 @@ struct impl {
struct props props;
const struct spa_device_callbacks *callbacks;
void *callbacks_data;
struct spa_hook_list hooks;
struct spa_v4l2_device dev;
};
static int emit_info(struct impl *this)
static int emit_info(struct impl *this, bool full)
{
int res;
struct spa_dict_item items[6];
@ -96,22 +95,18 @@ static int emit_info(struct impl *this)
info.n_params = SPA_N_ELEMENTS(params);
info.params = params;
if (this->callbacks->info)
this->callbacks->info(this->callbacks_data, &info);
spa_device_emit_info(&this->hooks, &info);
if (this->callbacks->object_info) {
if (spa_v4l2_is_capture(&this->dev)) {
struct spa_device_object_info oinfo;
if (spa_v4l2_is_capture(&this->dev)) {
struct spa_device_object_info oinfo;
oinfo = SPA_DEVICE_OBJECT_INFO_INIT();
oinfo.type = SPA_TYPE_INTERFACE_Node;
oinfo.factory = &spa_v4l2_source_factory;
oinfo.change_mask = SPA_DEVICE_OBJECT_CHANGE_MASK_PROPS;
oinfo.props = &SPA_DICT_INIT(items, 6);
oinfo = SPA_DEVICE_OBJECT_INFO_INIT();
oinfo.type = SPA_TYPE_INTERFACE_Node;
oinfo.factory = &spa_v4l2_source_factory;
oinfo.change_mask = SPA_DEVICE_OBJECT_CHANGE_MASK_PROPS;
oinfo.props = &SPA_DICT_INIT(items, 6);
this->callbacks->object_info(this->callbacks_data, 0, &oinfo);
}
spa_device_emit_object_info(&this->hooks, 0, &oinfo);
}
spa_v4l2_close(&this->dev);
@ -119,23 +114,26 @@ static int emit_info(struct impl *this)
return 0;
}
static int impl_set_callbacks(struct spa_device *device,
const struct spa_device_callbacks *callbacks,
void *data)
static int impl_add_listener(struct spa_device *device,
struct spa_hook *listener,
const struct spa_device_events *events,
void *data)
{
struct impl *this;
struct spa_hook_list save;
int res = 0;
spa_return_val_if_fail(device != NULL, -EINVAL);
spa_return_val_if_fail(events != NULL, -EINVAL);
this = SPA_CONTAINER_OF(device, struct impl, device);
spa_hook_list_isolate(&this->hooks, &save, listener, events, data);
this->callbacks = callbacks;
this->callbacks_data = data;
if (events->info || events->object_info)
res = emit_info(this, true);
spa_hook_list_join(&this->hooks, &save);
if (callbacks) {
res = emit_info(this);
}
return res;
}
@ -155,7 +153,7 @@ static int impl_set_param(struct spa_device *device,
static const struct spa_device impl_device = {
SPA_VERSION_DEVICE,
impl_set_callbacks,
impl_add_listener,
impl_enum_params,
impl_set_param,
};
@ -217,6 +215,8 @@ impl_init(const struct spa_handle_factory *factory,
return -EINVAL;
}
spa_hook_list_init(&this->hooks);
this->device = impl_device;
this->dev.log = this->log;
this->dev.fd = -1;

View file

@ -113,6 +113,7 @@ struct port {
struct spa_source source;
uint64_t info_all;
struct spa_port_info info;
struct spa_io_buffers *io;
struct spa_io_sequence *control;
@ -127,10 +128,12 @@ struct impl {
struct spa_loop *main_loop;
struct spa_loop *data_loop;
uint64_t info_all;
struct spa_node_info info;
struct spa_param_info params[8];
struct props props;
struct spa_hook_list hooks;
const struct spa_node_callbacks *callbacks;
void *callbacks_data;
@ -157,13 +160,11 @@ static int impl_node_enum_params(struct spa_node *node, int seq,
uint8_t buffer[1024];
struct spa_result_node_params result;
uint32_t count = 0;
int res;
spa_return_val_if_fail(node != NULL, -EINVAL);
spa_return_val_if_fail(num != 0, -EINVAL);
this = SPA_CONTAINER_OF(node, struct impl, node);
spa_return_val_if_fail(this->callbacks && this->callbacks->result, -EIO);
result.id = id;
result.next = start;
@ -228,8 +229,7 @@ static int impl_node_enum_params(struct spa_node *node, int seq,
if (spa_pod_filter(&b, &result.param, param, filter) < 0)
goto next;
if ((res = this->callbacks->result(this->callbacks_data, seq, 0, &result)) != 0)
return res;
spa_node_emit_result(&this->hooks, seq, 0, &result);
if (++count != num)
goto next;
@ -329,23 +329,50 @@ static const struct spa_dict_item info_items[] = {
{ "node.driver", "true" },
};
static void emit_node_info(struct impl *this)
static void emit_node_info(struct impl *this, bool full)
{
if (this->callbacks && this->callbacks->info && this->info.change_mask) {
if (full)
this->info.change_mask = this->info_all;
if (this->info.change_mask) {
this->info.props = &SPA_DICT_INIT_ARRAY(info_items);
this->callbacks->info(this->callbacks_data, &this->info);
spa_node_emit_info(&this->hooks, &this->info);
this->info.change_mask = 0;
}
}
static void emit_port_info(struct impl *this, struct port *port)
static void emit_port_info(struct impl *this, struct port *port, bool full)
{
if (this->callbacks && this->callbacks->port_info && port->info.change_mask) {
this->callbacks->port_info(this->callbacks_data, SPA_DIRECTION_OUTPUT, 0, &port->info);
if (full)
port->info.change_mask = port->info_all;
if (port->info.change_mask) {
spa_node_emit_port_info(&this->hooks,
SPA_DIRECTION_OUTPUT, 0, &port->info);
port->info.change_mask = 0;
}
}
static int
impl_node_add_listener(struct spa_node *node,
struct spa_hook *listener,
const struct spa_node_events *events,
void *data)
{
struct impl *this;
struct spa_hook_list save;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct impl, node);
spa_hook_list_isolate(&this->hooks, &save, listener, events, data);
emit_node_info(this, true);
emit_port_info(this, GET_OUT_PORT(this, 0), true);
spa_hook_list_join(&this->hooks, &save);
return 0;
}
static int impl_node_set_callbacks(struct spa_node *node,
const struct spa_node_callbacks *callbacks,
void *data)
@ -359,9 +386,6 @@ static int impl_node_set_callbacks(struct spa_node *node,
this->callbacks = callbacks;
this->callbacks_data = data;
emit_node_info(this);
emit_port_info(this, GET_OUT_PORT(this, 0));
return 0;
}
@ -451,7 +475,6 @@ static int impl_node_port_enum_params(struct spa_node *node, int seq,
spa_return_val_if_fail(num != 0, -EINVAL);
this = SPA_CONTAINER_OF(node, struct impl, node);
spa_return_val_if_fail(this->callbacks && this->callbacks->result, -EIO);
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
@ -534,8 +557,7 @@ static int impl_node_port_enum_params(struct spa_node *node, int seq,
if (spa_pod_filter(&b, &result.param, param, filter) < 0)
goto next;
if ((res = this->callbacks->result(this->callbacks_data, seq, 0, &result)) != 0)
return res;
spa_node_emit_result(&this->hooks, seq, 0, &result);
if (++count != num)
goto next;
@ -624,6 +646,7 @@ static int port_set_format(struct spa_node *node,
}
done:
port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS;
if (port->have_format) {
port->params[4] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_READWRITE);
port->params[5] = SPA_PARAM_INFO(SPA_PARAM_Buffers, SPA_PARAM_INFO_READ);
@ -631,7 +654,7 @@ static int port_set_format(struct spa_node *node,
port->params[4] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE);
port->params[5] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0);
}
emit_port_info(this, port);
emit_port_info(this, port, false);
return 0;
}
@ -879,6 +902,7 @@ static int impl_node_process(struct spa_node *node)
static const struct spa_node impl_node = {
SPA_VERSION_NODE,
.add_listener = impl_node_add_listener,
.set_callbacks = impl_node_set_callbacks,
.enum_params = impl_node_enum_params,
.set_param = impl_node_set_param,
@ -963,10 +987,14 @@ impl_init(const struct spa_handle_factory *factory,
}
this->node = impl_node;
spa_hook_list_init(&this->hooks);
this->info_all = SPA_NODE_CHANGE_MASK_FLAGS |
SPA_NODE_CHANGE_MASK_PROPS |
SPA_NODE_CHANGE_MASK_PARAMS;
this->info = SPA_NODE_INFO_INIT();
this->info.max_output_ports = 1;
this->info.change_mask = SPA_NODE_CHANGE_MASK_PROPS;
this->info.flags = SPA_NODE_FLAG_RT;
this->params[0] = SPA_PARAM_INFO(SPA_PARAM_PropInfo, SPA_PARAM_INFO_READ);
this->params[1] = SPA_PARAM_INFO(SPA_PARAM_Props, SPA_PARAM_INFO_READWRITE);
this->info.params = this->params;
@ -975,8 +1003,9 @@ impl_init(const struct spa_handle_factory *factory,
port = GET_OUT_PORT(this, 0);
spa_list_init(&port->queue);
port->info_all = SPA_PORT_CHANGE_MASK_FLAGS |
SPA_PORT_CHANGE_MASK_PARAMS;
port->info = SPA_PORT_INFO_INIT();
port->info.change_mask = SPA_PORT_CHANGE_MASK_FLAGS;
port->info.flags = SPA_PORT_FLAG_LIVE |
SPA_PORT_FLAG_PHYSICAL |
SPA_PORT_FLAG_TERMINAL;

View file

@ -813,8 +813,7 @@ spa_v4l2_enum_format(struct impl *this, int seq,
spa_pod_builder_pop(&b, &f[1]);
result.param = spa_pod_builder_pop(&b, &f[0]);
if ((res = this->callbacks->result(this->callbacks_data, seq, 0, &result)) != 0)
goto exit;
spa_node_emit_result(&this->hooks, seq, 0, &result);
if (++count != num)
goto next;
@ -927,9 +926,10 @@ static int spa_v4l2_set_format(struct impl *this, struct spa_video_info *format,
SPA_PORT_FLAG_LIVE |
SPA_PORT_FLAG_PHYSICAL |
SPA_PORT_FLAG_TERMINAL;
port->info.rate = streamparm.parm.capture.timeperframe.denominator;
if (this->callbacks && this->callbacks->port_info)
this->callbacks->port_info(this->callbacks_data, SPA_DIRECTION_OUTPUT, 0, &port->info);
port->info.rate = SPA_FRACTION(port->rate.num, port->rate.denom);
spa_node_emit_port_info(&this->hooks, SPA_DIRECTION_OUTPUT, 0, &port->info);
port->info.change_mask = 0;
return 0;
}
@ -1143,15 +1143,13 @@ spa_v4l2_enum_controls(struct impl *this, int seq,
if (spa_pod_filter(&b, &result.param, param, filter) < 0)
goto next;
if ((res = this->callbacks->result(this->callbacks_data, seq, 0, &result)) != 0)
goto exit;
spa_node_emit_result(&this->hooks, seq, 0, &result);
if (++count != num)
goto next;
enum_end:
res = 0;
exit:
spa_v4l2_close(dev);
return res;
}