module-profiler: use per-driver ringbuffer

Make a ringbuffer per driver because each driver might run in a
different thread and because a shared ringbuffer cannot be written to
from multiple threads. Assemble all the driver stats into one buffer
before sending out the profile info.
This commit is contained in:
Wim Taymans 2023-07-19 10:56:21 +02:00
parent 7ae59ff399
commit fba6083aaa

View file

@ -52,7 +52,8 @@ PW_LOG_TOPIC(mod_topic, "mod." NAME);
#define PW_LOG_TOPIC_DEFAULT mod_topic
#define TMP_BUFFER (16 * 1024)
#define MAX_BUFFER (8 * 1024 * 1024)
#define DATA_BUFFER (32 * 1024)
#define FLUSH_BUFFER (8 * 1024 * 1024)
#define MIN_FLUSH (16 * 1024)
#define DEFAULT_IDLE 5
#define DEFAULT_INTERVAL 1
@ -78,6 +79,11 @@ struct node {
struct pw_impl_node *node;
struct spa_hook node_rt_listener;
int64_t count;
struct spa_ringbuffer buffer;
uint8_t tmp[TMP_BUFFER];
uint8_t data[DATA_BUFFER];
unsigned enabled:1;
};
@ -96,17 +102,13 @@ struct impl {
struct spa_list node_list;
int64_t count;
uint32_t busy;
uint32_t empty;
struct spa_source *flush_timeout;
unsigned int flushing:1;
unsigned int listening:1;
struct spa_ringbuffer buffer;
uint8_t tmp[TMP_BUFFER];
uint8_t data[MAX_BUFFER];
uint8_t flush[MAX_BUFFER + sizeof(struct spa_pod_struct)];
uint8_t flush[FLUSH_BUFFER + sizeof(struct spa_pod_struct)];
};
struct resource_data {
@ -120,6 +122,9 @@ static void start_flush(struct impl *impl)
{
struct timespec value, interval;
if (impl->flushing)
return;
value.tv_sec = 0;
value.tv_nsec = 1;
interval.tv_sec = DEFAULT_INTERVAL;
@ -148,37 +153,49 @@ static void stop_flush(struct impl *impl)
static void flush_timeout(void *data, uint64_t expirations)
{
struct impl *impl = data;
int32_t avail;
uint32_t idx;
struct spa_pod_struct *p;
struct pw_resource *resource;
struct node *n;
uint32_t total = 0;
struct spa_pod_struct *p;
avail = spa_ringbuffer_get_read_index(&impl->buffer, &idx);
p = (struct spa_pod_struct *)impl->flush;
pw_log_trace("%p avail %d", impl, avail);
spa_list_for_each(n, &impl->node_list, link) {
int32_t avail;
uint32_t idx;
if (avail <= 0) {
avail = spa_ringbuffer_get_read_index(&n->buffer, &idx);
pw_log_trace("%p avail %d", impl, avail);
if (avail > 0) {
spa_ringbuffer_read_data(&n->buffer, n->data, DATA_BUFFER,
idx % DATA_BUFFER,
SPA_PTROFF(p, sizeof(struct spa_pod_struct) + total, void),
avail);
spa_ringbuffer_read_update(&n->buffer, idx + avail);
total += avail;
}
}
if (total <= 0) {
if (++impl->empty == DEFAULT_IDLE)
stop_flush(impl);
return;
} else {
impl->empty = 0;
}
impl->empty = 0;
p = (struct spa_pod_struct *)impl->flush;
*p = SPA_POD_INIT_Struct(avail);
spa_ringbuffer_read_data(&impl->buffer, impl->data, MAX_BUFFER,
idx % MAX_BUFFER,
SPA_PTROFF(p, sizeof(struct spa_pod_struct), void), avail);
spa_ringbuffer_read_update(&impl->buffer, idx + avail);
*p = SPA_POD_INIT_Struct(total);
spa_list_for_each(resource, &impl->global->resource_list, link)
pw_profiler_resource_profile(resource, &p->pod);
}
static void context_do_profile(void *data, struct pw_impl_node *node)
static void context_do_profile(void *data)
{
struct impl *impl = data;
struct node *n = data;
struct pw_impl_node *node = n->node;
struct impl *impl = n->impl;
struct spa_pod_builder b;
struct spa_pod_frame f[2];
uint32_t id = node->info.id;
@ -191,13 +208,13 @@ static void context_do_profile(void *data, struct pw_impl_node *node)
if (SPA_FLAG_IS_SET(pos->clock.flags, SPA_IO_CLOCK_FLAG_FREEWHEEL))
return;
spa_pod_builder_init(&b, impl->tmp, sizeof(impl->tmp));
spa_pod_builder_init(&b, n->tmp, sizeof(n->tmp));
spa_pod_builder_push_object(&b, &f[0],
SPA_TYPE_OBJECT_Profiler, 0);
spa_pod_builder_prop(&b, SPA_PROFILER_info, 0);
spa_pod_builder_add_struct(&b,
SPA_POD_Long(impl->count),
SPA_POD_Long(n->count),
SPA_POD_Float(a->cpu_load[0]),
SPA_POD_Float(a->cpu_load[1]),
SPA_POD_Float(a->cpu_load[2]),
@ -264,41 +281,34 @@ static void context_do_profile(void *data, struct pw_impl_node *node)
}
spa_pod_builder_pop(&b, &f[0]);
if (b.state.offset > sizeof(impl->tmp))
if (b.state.offset > sizeof(n->tmp))
goto done;
filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx);
if (filled < 0 || filled > MAX_BUFFER) {
filled = spa_ringbuffer_get_write_index(&n->buffer, &idx);
if (filled < 0 || filled > DATA_BUFFER) {
pw_log_warn("%p: queue xrun %d", impl, filled);
goto done;
}
avail = MAX_BUFFER - filled;
avail = DATA_BUFFER - filled;
if (avail < b.state.offset) {
pw_log_warn("%p: queue full %d < %d", impl, avail, b.state.offset);
goto done;
}
spa_ringbuffer_write_data(&impl->buffer,
impl->data, MAX_BUFFER,
idx % MAX_BUFFER,
spa_ringbuffer_write_data(&n->buffer,
n->data, DATA_BUFFER,
idx % DATA_BUFFER,
b.data, b.state.offset);
spa_ringbuffer_write_update(&impl->buffer, idx + b.state.offset);
spa_ringbuffer_write_update(&n->buffer, idx + b.state.offset);
if (!impl->flushing || filled + b.state.offset > MIN_FLUSH)
start_flush(impl);
start_flush(impl);
done:
impl->count++;
}
static void node_complete(void *data)
{
struct node *n = data;
pw_log_info("complete");
context_do_profile(n->impl, n->node);
n->count++;
}
static struct pw_impl_node_rt_events node_rt_events = {
PW_VERSION_IMPL_NODE_RT_EVENTS,
.complete = node_complete,
.complete = context_do_profile,
.incomplete = context_do_profile,
};
static void enable_node_profiling(struct node *n, bool enabled)
@ -332,6 +342,7 @@ static void context_driver_added(void *data, struct pw_impl_node *node)
n->impl = impl;
n->node = node;
spa_list_append(&impl->node_list, &n->link);
spa_ringbuffer_init(&n->buffer);
if (impl->busy > 0)
enable_node_profiling(n, true);
@ -489,8 +500,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->main_loop = pw_context_get_main_loop(impl->context);
impl->data_loop = pw_data_loop_get_loop(pw_context_get_data_loop(impl->context));
spa_ringbuffer_init(&impl->buffer);
impl->global = pw_global_new(context,
PW_TYPE_INTERFACE_Profiler,
PW_VERSION_PROFILER,