filter: support pw_filter_trigger_process()

Add pw_filter_trigger_process() as a way to implement a driver node.
Also add a function to check if a filter is a driver.
This commit is contained in:
Wim Taymans 2023-02-13 17:58:42 +01:00
parent c6a977f793
commit 2865e40618
2 changed files with 77 additions and 31 deletions

View file

@ -133,6 +133,7 @@ struct filter {
struct spa_node impl_node;
struct spa_hook_list hooks;
struct spa_callbacks callbacks;
struct spa_io_clock *clock;
struct spa_io_position *position;
struct {
@ -168,6 +169,7 @@ struct filter {
unsigned int allow_mlock:1;
unsigned int warn_mlock:1;
unsigned int process_rt:1;
unsigned int driving:1;
};
static int get_param_index(uint32_t id)
@ -475,6 +477,12 @@ static int impl_set_io(void *object, uint32_t id, void *data, size_t size)
pw_log_debug("%p: io %d %p/%zd", impl, id, data, size);
switch(id) {
case SPA_IO_Clock:
if (data && size >= sizeof(struct spa_io_clock))
impl->clock = data;
else
impl->clock = NULL;
break;
case SPA_IO_Position:
if (data && size >= sizeof(struct spa_io_position))
impl->position = data;
@ -484,6 +492,7 @@ static int impl_set_io(void *object, uint32_t id, void *data, size_t size)
do_set_position, 1, NULL, 0, true, impl);
break;
}
impl->driving = impl->clock && impl->position && impl->position->clock.id == impl->clock->id;
pw_filter_emit_io_changed(&impl->this, NULL, id, data, size);
return 0;
@ -1749,22 +1758,23 @@ error_cleanup:
return NULL;
}
static inline void free_port(struct filter *impl, struct port *port)
{
spa_list_remove(&port->link);
spa_node_emit_port_info(&impl->hooks, port->direction, port->id, NULL);
pw_map_remove(&impl->ports[port->direction], port->id);
clear_buffers(port);
clear_params(impl, port, SPA_ID_INVALID);
pw_properties_free(port->props);
free(port);
}
SPA_EXPORT
int pw_filter_remove_port(void *port_data)
{
struct port *port = SPA_CONTAINER_OF(port_data, struct port, user_data);
struct filter *impl = port->filter;
spa_node_emit_port_info(&impl->hooks, port->direction, port->id, NULL);
spa_list_remove(&port->link);
pw_map_remove(&impl->ports[port->direction], port->id);
clear_buffers(port);
clear_params(impl, port, SPA_ID_INVALID);
pw_properties_free(port->props);
free(port);
free_port(impl, port);
return 0;
}
@ -1844,25 +1854,6 @@ int pw_filter_get_time(struct pw_filter *filter, struct pw_time *time)
return 0;
}
static int
do_process(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct filter *impl = user_data;
int res = impl_node_process(impl);
return spa_node_call_ready(&impl->callbacks, res);
}
static inline int call_trigger(struct filter *impl)
{
int res = 0;
if (SPA_FLAG_IS_SET(impl->flags, PW_FILTER_FLAG_DRIVER)) {
res = pw_loop_invoke(impl->context->data_loop,
do_process, 1, NULL, 0, false, impl);
}
return res;
}
SPA_EXPORT
struct pw_buffer *pw_filter_dequeue_buffer(void *port_data)
{
@ -1894,7 +1885,7 @@ int pw_filter_queue_buffer(void *port_data, struct pw_buffer *buffer)
if ((res = push_queue(p, &p->queued, b)) < 0)
return res;
return call_trigger(impl);
return res;
}
SPA_EXPORT
@ -1960,3 +1951,48 @@ int pw_filter_flush(struct pw_filter *filter, bool drain)
drain ? do_drain : do_flush, 1, NULL, 0, true, impl);
return 0;
}
SPA_EXPORT
bool pw_filter_is_driving(struct pw_filter *filter)
{
struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this);
return impl->driving;
}
static int
do_trigger_process(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct filter *impl = user_data;
int res = impl_node_process(impl);
return spa_node_call_ready(&impl->callbacks, res);
}
static int trigger_request_process(struct filter *impl)
{
uint8_t buffer[1024];
struct spa_pod_builder b = { 0 };
spa_pod_builder_init(&b, buffer, sizeof(buffer));
spa_node_emit_event(&impl->hooks,
spa_pod_builder_add_object(&b,
SPA_TYPE_EVENT_Node, SPA_NODE_EVENT_RequestProcess));
return 0;
}
SPA_EXPORT
int pw_filter_trigger_process(struct pw_filter *filter)
{
struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this);
int res = 0;
pw_log_trace_fp("%p", impl);
if (!impl->driving) {
res = trigger_request_process(impl);
} else {
res = pw_loop_invoke(impl->context->data_loop,
do_trigger_process, 1, NULL, 0, false, impl);
}
return res;
}

View file

@ -239,6 +239,16 @@ int pw_filter_set_active(struct pw_filter *filter, bool active);
* be called when all data is played or recorded */
int pw_filter_flush(struct pw_filter *filter, bool drain);
/** Check if the filter is driving. The filter needs to have the
* PW_FILTER_FLAG_DRIVER set. When the filter is driving,
* pw_filter_trigger_process() needs to be called when data is
* available (output) or needed (input). Since 0.3.66 */
bool pw_filter_is_driving(struct pw_filter *filter);
/** Trigger a push/pull on the filter. One iteration of the graph will
* be scheduled and process() will be called. Since 0.3.66 */
int pw_filter_trigger_process(struct pw_filter *filter);
/**
* \}
*/