mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-10-29 05:40:27 -04:00
filter: make sure to sync the position with data thread
This commit is contained in:
parent
8141c92dde
commit
ac9f2a020f
2 changed files with 38 additions and 6 deletions
|
|
@ -123,6 +123,10 @@ struct filter {
|
|||
struct spa_callbacks callbacks;
|
||||
struct spa_io_position *position;
|
||||
|
||||
struct {
|
||||
struct spa_io_position *position;
|
||||
} rt;
|
||||
|
||||
struct spa_list port_list;;
|
||||
struct port *ports[2][MAX_PORTS];
|
||||
|
||||
|
|
@ -322,6 +326,15 @@ static bool filter_set_state(struct pw_filter *filter, enum pw_filter_state stat
|
|||
return res;
|
||||
}
|
||||
|
||||
static int
|
||||
do_set_position(struct spa_loop *loop,
|
||||
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
|
||||
{
|
||||
struct filter *impl = user_data;
|
||||
impl->rt.position = impl->position;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int impl_set_io(void *object, uint32_t id, void *data, size_t size)
|
||||
{
|
||||
struct filter *impl = object;
|
||||
|
|
@ -334,6 +347,8 @@ static int impl_set_io(void *object, uint32_t id, void *data, size_t size)
|
|||
impl->position = data;
|
||||
else
|
||||
impl->position = NULL;
|
||||
pw_loop_invoke(impl->context->data_loop,
|
||||
do_set_position, 1, NULL, 0, true, impl);
|
||||
break;
|
||||
}
|
||||
pw_filter_emit_io_changed(&impl->this, NULL, id, data, size);
|
||||
|
|
@ -717,7 +732,7 @@ static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffe
|
|||
|
||||
static inline void copy_position(struct filter *impl)
|
||||
{
|
||||
struct spa_io_position *p = impl->position;
|
||||
struct spa_io_position *p = impl->rt.position;
|
||||
if (p != NULL) {
|
||||
SEQ_WRITE(impl->seq);
|
||||
impl->time.now = p->clock.nsec;
|
||||
|
|
@ -741,9 +756,10 @@ do_call_process(struct spa_loop *loop,
|
|||
|
||||
static void call_process(struct filter *impl)
|
||||
{
|
||||
struct pw_filter *filter = &impl->this;
|
||||
pw_log_trace(NAME" %p: call process", impl);
|
||||
if (SPA_FLAG_IS_SET(impl->flags, PW_FILTER_FLAG_RT_PROCESS)) {
|
||||
do_call_process(NULL, false, 1, NULL, 0, impl);
|
||||
pw_filter_emit_process(filter, impl->rt.position);
|
||||
}
|
||||
else {
|
||||
pw_loop_invoke(impl->context->main_loop,
|
||||
|
|
@ -776,7 +792,7 @@ static int impl_node_process(void *object)
|
|||
struct buffer *b;
|
||||
bool drained = true;
|
||||
|
||||
pw_log_trace(NAME" %p: do process %p", impl, impl->position);
|
||||
pw_log_trace(NAME" %p: do process %p", impl, impl->rt.position);
|
||||
|
||||
/** first dequeue and recycle buffers */
|
||||
spa_list_for_each(p, &impl->port_list, link) {
|
||||
|
|
|
|||
|
|
@ -104,8 +104,12 @@ struct stream {
|
|||
struct spa_node_methods node_methods;
|
||||
struct spa_hook_list hooks;
|
||||
struct spa_callbacks callbacks;
|
||||
struct spa_io_buffers *io;
|
||||
|
||||
struct spa_io_position *position;
|
||||
struct spa_io_buffers *io;
|
||||
struct {
|
||||
struct spa_io_position *position;
|
||||
} rt;
|
||||
|
||||
uint32_t change_mask_all;
|
||||
struct spa_port_info port_info;
|
||||
|
|
@ -312,9 +316,10 @@ do_call_process(struct spa_loop *loop,
|
|||
|
||||
static void call_process(struct stream *impl)
|
||||
{
|
||||
struct pw_stream *stream = &impl->this;
|
||||
pw_log_trace(NAME" %p: call process", impl);
|
||||
if (SPA_FLAG_IS_SET(impl->flags, PW_STREAM_FLAG_RT_PROCESS)) {
|
||||
do_call_process(NULL, false, 1, NULL, 0, impl);
|
||||
pw_stream_emit_process(stream);
|
||||
}
|
||||
else {
|
||||
pw_loop_invoke(impl->context->main_loop,
|
||||
|
|
@ -339,6 +344,15 @@ static void call_drained(struct stream *impl)
|
|||
do_call_drained, 1, NULL, 0, false, impl);
|
||||
}
|
||||
|
||||
static int
|
||||
do_set_position(struct spa_loop *loop,
|
||||
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
|
||||
{
|
||||
struct stream *impl = user_data;
|
||||
impl->rt.position = impl->position;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int impl_set_io(void *object, uint32_t id, void *data, size_t size)
|
||||
{
|
||||
struct stream *impl = object;
|
||||
|
|
@ -353,6 +367,8 @@ static int impl_set_io(void *object, uint32_t id, void *data, size_t size)
|
|||
impl->position = data;
|
||||
else
|
||||
impl->position = NULL;
|
||||
pw_loop_invoke(impl->context->data_loop,
|
||||
do_set_position, 1, NULL, 0, true, impl);
|
||||
break;
|
||||
}
|
||||
pw_stream_emit_io_changed(stream, id, data, size);
|
||||
|
|
@ -699,7 +715,7 @@ static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffe
|
|||
|
||||
static inline void copy_position(struct stream *impl, int64_t queued)
|
||||
{
|
||||
struct spa_io_position *p = impl->position;
|
||||
struct spa_io_position *p = impl->rt.position;
|
||||
if (p != NULL) {
|
||||
SEQ_WRITE(impl->seq);
|
||||
impl->time.now = p->clock.nsec;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue