loop: always place the invoke item in the queue

Always place the invoke item in the queue and then either signal the
other thread or flush the queue when not already flushing.
This commit is contained in:
Wim Taymans 2020-11-16 15:16:20 +01:00
parent 9f9be7d7f2
commit 0d9cc9e36e

View file

@ -136,6 +136,7 @@ static void flush_items(struct impl *impl)
item = SPA_MEMBER(impl->buffer_data, index & (DATAS_SIZE - 1), struct invoke_item);
block = item->block;
spa_log_trace(impl->log, NAME " %p: flush item %p", impl, item);
item->res = item->func ? item->func(&impl->loop,
true, item->seq, item->data, item->size,
item->user_data) : 0;
@ -164,71 +165,70 @@ loop_invoke(void *object,
bool in_thread = pthread_equal(impl->thread, pthread_self());
struct invoke_item *item;
int res;
int32_t filled;
uint32_t avail, idx, offset, l0;
if (in_thread && !impl->flushing) {
flush_items(impl);
res = func ? func(&impl->loop, false, seq, data, size, user_data) : 0;
filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx);
if (filled < 0 || filled > DATAS_SIZE) {
spa_log_warn(impl->log, NAME " %p: queue xrun %d", impl, filled);
return -EPIPE;
}
avail = DATAS_SIZE - filled;
if (avail < sizeof(struct invoke_item)) {
spa_log_warn(impl->log, NAME " %p: queue full %d", impl, avail);
return -EPIPE;
}
offset = idx & (DATAS_SIZE - 1);
l0 = DATAS_SIZE - offset;
item = SPA_MEMBER(impl->buffer_data, offset, struct invoke_item);
item->func = func;
item->seq = seq;
item->size = size;
item->block = block;
item->user_data = user_data;
spa_log_trace(impl->log, NAME " %p: add item %p filled:%d", impl, item, filled);
if (l0 > sizeof(struct invoke_item) + size) {
item->data = SPA_MEMBER(item, sizeof(struct invoke_item), void);
item->item_size = sizeof(struct invoke_item) + size;
if (l0 < sizeof(struct invoke_item) + item->item_size)
item->item_size = l0;
} else {
int32_t filled;
uint32_t avail, idx, offset, l0;
item->data = impl->buffer_data;
item->item_size = l0 + size;
}
memcpy(item->data, data, size);
filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx);
if (filled < 0 || filled > DATAS_SIZE) {
spa_log_warn(impl->log, NAME " %p: queue xrun %d", impl, filled);
return -EPIPE;
}
avail = DATAS_SIZE - filled;
if (avail < sizeof(struct invoke_item)) {
spa_log_warn(impl->log, NAME " %p: queue full %d", impl, avail);
return -EPIPE;
}
offset = idx & (DATAS_SIZE - 1);
l0 = DATAS_SIZE - offset;
item = SPA_MEMBER(impl->buffer_data, offset, struct invoke_item);
item->func = func;
item->seq = seq;
item->size = size;
item->block = block;
item->user_data = user_data;
spa_log_trace(impl->log, NAME " %p: add item %p filled:%d", impl, item, filled);
if (l0 > sizeof(struct invoke_item) + size) {
item->data = SPA_MEMBER(item, sizeof(struct invoke_item), void);
item->item_size = sizeof(struct invoke_item) + size;
if (l0 < sizeof(struct invoke_item) + item->item_size)
item->item_size = l0;
} else {
item->data = impl->buffer_data;
item->item_size = l0 + size;
}
memcpy(item->data, data, size);
spa_ringbuffer_write_update(&impl->buffer, idx + item->item_size);
spa_ringbuffer_write_update(&impl->buffer, idx + item->item_size);
if (in_thread) {
if (!impl->flushing)
flush_items(impl);
} else {
loop_signal_event(impl, impl->wakeup);
}
if (block) {
uint64_t count = 1;
if (block) {
uint64_t count = 1;
spa_loop_control_hook_before(&impl->hooks_list);
spa_loop_control_hook_before(&impl->hooks_list);
if ((res = spa_system_eventfd_read(impl->system, impl->ack_fd, &count)) < 0)
spa_log_warn(impl->log, NAME " %p: failed to read event fd: %s",
impl, spa_strerror(res));
if ((res = spa_system_eventfd_read(impl->system, impl->ack_fd, &count)) < 0)
spa_log_warn(impl->log, NAME " %p: failed to read event fd: %s",
impl, spa_strerror(res));
spa_loop_control_hook_after(&impl->hooks_list);
spa_loop_control_hook_after(&impl->hooks_list);
res = item->res;
}
else {
if (seq != SPA_ID_INVALID)
res = SPA_RESULT_RETURN_ASYNC(seq);
else
res = 0;
}
res = item->res;
}
else {
if (seq != SPA_ID_INVALID)
res = SPA_RESULT_RETURN_ASYNC(seq);
else
res = 0;
}
return res;
}