mixer: rework the control mixers to use the parser

Using the parser for the spa_pod_sequence in the data buffers is
required in order to safely read the pods while there could be
concurrent writes.

See #4822
This commit is contained in:
Wim Taymans 2025-07-29 15:28:48 +02:00
parent e317edcfb9
commit 3785896533
3 changed files with 117 additions and 85 deletions

View file

@ -233,6 +233,13 @@ struct buffer {
uint32_t n_mem; uint32_t n_mem;
}; };
struct mix_info {
struct spa_pod_parser parser;
struct spa_pod_frame frame;
struct spa_pod_control control;
const void *control_body;
};
struct mix { struct mix {
struct spa_list link; struct spa_list link;
struct spa_list port_link; struct spa_list port_link;
@ -247,6 +254,8 @@ struct mix {
struct buffer buffers[MAX_BUFFERS]; struct buffer buffers[MAX_BUFFERS];
uint32_t n_buffers; uint32_t n_buffers;
struct mix_info mix_info;
unsigned int to_free:1; unsigned int to_free:1;
}; };
@ -1492,7 +1501,8 @@ static inline int event_compare(uint8_t s1, uint8_t s2)
return priotab[(s2>>4) & 7] - priotab[(s1>>4) & 7]; return priotab[(s2>>4) & 7] - priotab[(s1>>4) & 7];
} }
static inline int event_sort(struct spa_pod_control *a, struct spa_pod_control *b) static inline int event_sort(struct spa_pod_control *a, const void *abody,
struct spa_pod_control *b, const void *bbody)
{ {
if (a->offset < b->offset) if (a->offset < b->offset)
return -1; return -1;
@ -1503,14 +1513,14 @@ static inline int event_sort(struct spa_pod_control *a, struct spa_pod_control *
switch(a->type) { switch(a->type) {
case SPA_CONTROL_Midi: case SPA_CONTROL_Midi:
{ {
uint8_t *sa = SPA_POD_BODY(&a->value), *sb = SPA_POD_BODY(&b->value); const uint8_t *sa = abody, *sb = bbody;
if (SPA_POD_BODY_SIZE(&a->value) < 1 || SPA_POD_BODY_SIZE(&b->value) < 1) if (SPA_POD_BODY_SIZE(&a->value) < 1 || SPA_POD_BODY_SIZE(&b->value) < 1)
return 0; return 0;
return event_compare(sa[0], sb[0]); return event_compare(sa[0], sb[0]);
} }
case SPA_CONTROL_UMP: case SPA_CONTROL_UMP:
{ {
uint32_t *sa = SPA_POD_BODY(&a->value), *sb = SPA_POD_BODY(&b->value); const uint32_t *sa = abody, *sb = bbody;
if (SPA_POD_BODY_SIZE(&a->value) < 4 || SPA_POD_BODY_SIZE(&b->value) < 4) if (SPA_POD_BODY_SIZE(&a->value) < 4 || SPA_POD_BODY_SIZE(&b->value) < 4)
return 0; return 0;
if ((sa[0] >> 28) != 2 || (sa[0] >> 28) != 4 || if ((sa[0] >> 28) != 2 || (sa[0] >> 28) != 4 ||
@ -1598,56 +1608,54 @@ static inline int midi_event_write(void *port_buffer,
return 0; return 0;
} }
static void convert_to_event(struct spa_pod_sequence **seq, uint32_t n_seq, void *midi, bool fix, uint32_t type) static void convert_to_event(struct mix_info **mix, uint32_t n_mix, void *midi, bool fix, uint32_t type)
{ {
struct spa_pod_control *c[n_seq];
uint64_t state = 0; uint64_t state = 0;
uint32_t i; uint32_t i;
int res = 0; int res = 0;
bool in_sysex = false; bool in_sysex = false;
for (i = 0; i < n_seq; i++)
c[i] = spa_pod_control_first(&seq[i]->body);
while (true) { while (true) {
struct spa_pod_control *next = NULL; struct mix_info *next = NULL;
uint32_t next_index = 0; uint32_t next_index = 0;
struct spa_pod_control *control;
size_t size;
uint8_t *data;
for (i = 0; i < n_seq; i++) { for (i = 0; i < n_mix; i++) {
if (!spa_pod_control_is_inside(&seq[i]->body, struct mix_info *m = mix[i];
SPA_POD_BODY_SIZE(seq[i]), c[i])) if (next == NULL || event_sort(&m->control, m->control_body,
continue; &next->control, next->control_body) <= 0) {
next = m;
if (next == NULL || event_sort(c[i], next) <= 0) {
next = c[i];
next_index = i; next_index = i;
} }
} }
if (SPA_UNLIKELY(next == NULL)) if (SPA_UNLIKELY(next == NULL))
break; break;
switch(next->type) { control = &next->control;
data = (uint8_t*)next->control_body;
size = SPA_POD_BODY_SIZE(&control->value);
switch(control->type) {
case SPA_CONTROL_OSC: case SPA_CONTROL_OSC:
if (!TYPE_ID_CAN_OSC(type)) if (!TYPE_ID_CAN_OSC(type))
break; break;
SPA_FALLTHROUGH; SPA_FALLTHROUGH;
case SPA_CONTROL_Midi: case SPA_CONTROL_Midi:
{ {
uint8_t *data = SPA_POD_BODY(&next->value);
size_t size = SPA_POD_BODY_SIZE(&next->value);
if (type == TYPE_ID_UMP) { if (type == TYPE_ID_UMP) {
while (size > 0) { while (size > 0) {
uint32_t ump[4]; uint32_t ump[4];
int ump_size = spa_ump_from_midi(&data, &size, ump, sizeof(ump), 0, &state); int ump_size = spa_ump_from_midi(&data, &size, ump, sizeof(ump), 0, &state);
if (ump_size <= 0) if (ump_size <= 0)
break; break;
if ((res = midi_event_write(midi, next->offset, if ((res = midi_event_write(midi, control->offset,
(uint8_t*)ump, ump_size, false)) < 0) (uint8_t*)ump, ump_size, false)) < 0)
break; break;
} }
} else { } else {
res = midi_event_write(midi, next->offset, data, size, fix); res = midi_event_write(midi, control->offset, data, size, fix);
} }
if (res < 0) if (res < 0)
pw_log_warn("midi %p: can't write event: %s", midi, pw_log_warn("midi %p: can't write event: %s", midi,
@ -1656,13 +1664,12 @@ static void convert_to_event(struct spa_pod_sequence **seq, uint32_t n_seq, void
} }
case SPA_CONTROL_UMP: case SPA_CONTROL_UMP:
{ {
void *data = SPA_POD_BODY(&next->value);
size_t size = SPA_POD_BODY_SIZE(&next->value);
uint8_t ev[32]; uint8_t ev[32];
bool was_sysex = in_sysex; bool was_sysex = in_sysex;
if (type == TYPE_ID_MIDI) { if (type == TYPE_ID_MIDI) {
int ev_size = spa_ump_to_midi(data, size, ev, sizeof(ev)); uint32_t *d = (uint32_t*)data;
int ev_size = spa_ump_to_midi(d, size, ev, sizeof(ev));
if (ev_size <= 0) if (ev_size <= 0)
break; break;
@ -1680,14 +1687,18 @@ static void convert_to_event(struct spa_pod_sequence **seq, uint32_t n_seq, void
if (was_sysex) if (was_sysex)
res = midi_event_append(midi, data, size); res = midi_event_append(midi, data, size);
else else
res = midi_event_write(midi, next->offset, data, size, fix); res = midi_event_write(midi, control->offset, data, size, fix);
if (res < 0) if (res < 0)
pw_log_warn("midi %p: can't write event: %s", midi, pw_log_warn("midi %p: can't write event: %s", midi,
spa_strerror(res)); spa_strerror(res));
} }
} }
c[next_index] = spa_pod_control_next(c[next_index]); if (spa_pod_parser_get_control_body(&next->parser,
&next->control, &next->control_body) < 0) {
spa_pod_parser_pop(&next->parser, &next->frame);
mix[next_index] = mix[--n_mix];
}
} }
} }
@ -5790,13 +5801,15 @@ static void *get_buffer_input_midi(struct port *p, jack_nframes_t frames)
struct mix *mix; struct mix *mix;
void *ptr = p->emptyptr; void *ptr = p->emptyptr;
struct midi_buffer *mb = (struct midi_buffer*)midi_scratch; struct midi_buffer *mb = (struct midi_buffer*)midi_scratch;
struct spa_pod_sequence *seq[MAX_MIX]; struct mix_info *mix_info[MAX_MIX];
uint32_t n_seq = 0; uint32_t n_mix_info = 0;
spa_list_for_each(mix, &p->mix, port_link) { spa_list_for_each(mix, &p->mix, port_link) {
struct spa_data *d; struct spa_data *d;
struct buffer *b; struct buffer *b;
void *pod; struct mix_info *mi = &mix->mix_info;
struct spa_pod_sequence seq;
const void *seq_body;
if (mix->id == SPA_ID_INVALID) if (mix->id == SPA_ID_INVALID)
continue; continue;
@ -5808,21 +5821,24 @@ static void *get_buffer_input_midi(struct port *p, jack_nframes_t frames)
continue; continue;
d = &b->datas[0]; d = &b->datas[0];
spa_pod_parser_init_from_data(&mi->parser, d->data, d->maxsize,
d->chunk->offset, d->chunk->size);
if (spa_pod_parser_push_sequence_body(&mi->parser,
&mi->frame, &seq, &seq_body) < 0)
continue;
if (spa_pod_parser_get_control_body(&mi->parser,
&mi->control, &mi->control_body) < 0)
continue;
if ((pod = spa_pod_from_data(d->data, d->maxsize, d->chunk->offset, d->chunk->size)) == NULL) mix_info[n_mix_info++] = mi;
continue; if (n_mix_info == MAX_MIX)
if (!spa_pod_is_sequence(pod))
continue;
seq[n_seq++] = pod;
if (n_seq == MAX_MIX)
break; break;
} }
midi_init_buffer(mb, MIDI_SCRATCH_FRAMES, frames); midi_init_buffer(mb, MIDI_SCRATCH_FRAMES, frames);
/* first convert to a thread local scratch buffer, then memcpy into /* first convert to a thread local scratch buffer, then memcpy into
* the per port buffer. This makes it possible to call this function concurrently * the per port buffer. This makes it possible to call this function concurrently
* but also have different pointers per port */ * but also have different pointers per port */
convert_to_event(seq, n_seq, mb, p->client->fix_midi_events, p->object->port.type_id); convert_to_event(mix_info, n_mix_info, mb, p->client->fix_midi_events, p->object->port.type_id);
memcpy(ptr, mb, sizeof(struct midi_buffer) + (mb->event_count memcpy(ptr, mb, sizeof(struct midi_buffer) + (mb->event_count
* sizeof(struct midi_event))); * sizeof(struct midi_event)));
if (mb->write_pos > 0) { if (mb->write_pos > 0) {
@ -5889,21 +5905,26 @@ void * jack_port_get_buffer (jack_port_t *port, jack_nframes_t frames)
goto done; goto done;
if (TYPE_ID_IS_EVENT(o->port.type_id)) { if (TYPE_ID_IS_EVENT(o->port.type_id)) {
struct spa_pod_sequence *seq[1]; struct mix_info *mix_info[1], mi;
struct spa_data *d; struct spa_data *d;
void *pod; struct spa_pod_sequence seq;
const void *seq_body;
ptr = midi_scratch; ptr = midi_scratch;
midi_init_buffer(ptr, MIDI_SCRATCH_FRAMES, frames); midi_init_buffer(ptr, MIDI_SCRATCH_FRAMES, frames);
d = &b->datas[0]; d = &b->datas[0];
if ((pod = spa_pod_from_data(d->data, d->maxsize, spa_pod_parser_init_from_data(&mi.parser, d->data, d->maxsize,
d->chunk->offset, d->chunk->size)) == NULL) d->chunk->offset, d->chunk->size);
if (spa_pod_parser_push_sequence_body(&mi.parser,
&mi.frame, &seq, &seq_body) < 0)
goto done; goto done;
if (!spa_pod_is_sequence(pod)) if (spa_pod_parser_get_control_body(&mi.parser,
&mi.control, &mi.control_body) < 0)
goto done; goto done;
seq[0] = pod;
convert_to_event(seq, 1, ptr, c->fix_midi_events, o->port.type_id); mix_info[0] = &mi;
convert_to_event(mix_info, 1, ptr, c->fix_midi_events, o->port.type_id);
} else { } else {
ptr = get_buffer_data(b, frames); ptr = get_buffer_data(b, frames);
} }

View file

@ -61,6 +61,10 @@ struct port {
struct spa_list mix_link; struct spa_list mix_link;
bool active; bool active;
struct spa_pod_parser parser;
struct spa_pod_frame frame;
struct spa_pod_control control;
const void *control_body;
}; };
struct impl { struct impl {
@ -86,15 +90,13 @@ struct impl {
struct spa_list port_list; struct spa_list port_list;
struct spa_list free_list; struct spa_list free_list;
struct spa_pod_control *mix_ctrl[MAX_PORTS];
struct spa_pod_sequence *mix_seq[MAX_PORTS];
int n_formats; int n_formats;
unsigned int have_format:1; unsigned int have_format:1;
unsigned int started:1; unsigned int started:1;
struct spa_list mix_list; struct spa_list mix_list;
struct port *mix_ports[MAX_PORTS];
}; };
#define CHECK_ANY_IN(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) == SPA_ID_INVALID) #define CHECK_ANY_IN(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) == SPA_ID_INVALID)
@ -655,7 +657,8 @@ static inline int event_compare(uint8_t s1, uint8_t s2)
return priotab[(s2>>4) & 7] - priotab[(s1>>4) & 7]; return priotab[(s2>>4) & 7] - priotab[(s1>>4) & 7];
} }
static inline int event_sort(struct spa_pod_control *a, struct spa_pod_control *b) static inline int event_sort(struct spa_pod_control *a, const void *abody,
struct spa_pod_control *b, const void *bbody)
{ {
if (a->offset < b->offset) if (a->offset < b->offset)
return -1; return -1;
@ -666,14 +669,14 @@ static inline int event_sort(struct spa_pod_control *a, struct spa_pod_control *
switch(a->type) { switch(a->type) {
case SPA_CONTROL_Midi: case SPA_CONTROL_Midi:
{ {
uint8_t *da = SPA_POD_BODY(&a->value), *db = SPA_POD_BODY(&b->value); const uint8_t *da = abody, *db = bbody;
if (SPA_POD_BODY_SIZE(&a->value) < 1 || SPA_POD_BODY_SIZE(&b->value) < 1) if (SPA_POD_BODY_SIZE(&a->value) < 1 || SPA_POD_BODY_SIZE(&b->value) < 1)
return 0; return 0;
return event_compare(da[0], db[0]); return event_compare(da[0], db[0]);
} }
case SPA_CONTROL_UMP: case SPA_CONTROL_UMP:
{ {
uint32_t *da = SPA_POD_BODY(&a->value), *db = SPA_POD_BODY(&b->value); const uint32_t *da = abody, *db = bbody;
if (SPA_POD_BODY_SIZE(&a->value) < 4 || SPA_POD_BODY_SIZE(&b->value) < 4) if (SPA_POD_BODY_SIZE(&a->value) < 4 || SPA_POD_BODY_SIZE(&b->value) < 4)
return 0; return 0;
if ((da[0] >> 28) != 2 || (da[0] >> 28) != 4 || if ((da[0] >> 28) != 2 || (da[0] >> 28) != 4 ||
@ -699,10 +702,9 @@ static int impl_node_process(void *object)
struct impl *this = object; struct impl *this = object;
struct port *outport, *inport; struct port *outport, *inport;
struct spa_io_buffers *outio; struct spa_io_buffers *outio;
uint32_t n_seq;
struct spa_pod_sequence **seq;
struct spa_pod_control **ctrl;
struct spa_pod_builder builder; struct spa_pod_builder builder;
uint32_t n_mix_ports;
struct port **mix_ports;
struct spa_pod_frame f; struct spa_pod_frame f;
struct buffer *outb; struct buffer *outb;
struct spa_data *d; struct spa_data *d;
@ -734,14 +736,14 @@ static int impl_node_process(void *object)
return -EPIPE; return -EPIPE;
} }
ctrl = this->mix_ctrl; mix_ports = this->mix_ports;
seq = this->mix_seq; n_mix_ports = 0;
n_seq = 0;
/* collect all sequence pod on input ports */ /* collect all sequence pod on input ports */
spa_list_for_each(inport, &this->mix_list, mix_link) { spa_list_for_each(inport, &this->mix_list, mix_link) {
struct spa_io_buffers *inio = inport->io[cycle]; struct spa_io_buffers *inio = inport->io[cycle];
void *pod; struct spa_pod_sequence seq;
const void *seq_body;
if (inio->buffer_id >= inport->n_buffers || if (inio->buffer_id >= inport->n_buffers ||
inio->status != SPA_STATUS_HAVE_DATA) { inio->status != SPA_STATUS_HAVE_DATA) {
@ -756,22 +758,25 @@ static int impl_node_process(void *object)
d = inport->buffers[inio->buffer_id].buffer->datas; d = inport->buffers[inio->buffer_id].buffer->datas;
if ((pod = spa_pod_from_data(d->data, d->maxsize, spa_pod_parser_init_from_data(&inport->parser, d->data, d->maxsize,
d->chunk->offset, d->chunk->size)) == NULL) { d->chunk->offset, d->chunk->size);
if (spa_pod_parser_push_sequence_body(&inport->parser,
&inport->frame, &seq, &seq_body) < 0) {
spa_log_trace_fp(this->log, "%p: skip input idx:%d max:%u " spa_log_trace_fp(this->log, "%p: skip input idx:%d max:%u "
"offset:%u size:%u", this, inport->id, "offset:%u size:%u", this, inport->id,
d->maxsize, d->chunk->offset, d->chunk->size); d->maxsize, d->chunk->offset, d->chunk->size);
continue; continue;
} }
if (!spa_pod_is_sequence(pod)) { if (spa_pod_parser_get_control_body(&inport->parser,
spa_log_trace_fp(this->log, "%p: skip input idx:%d", this, inport->id); &inport->control, &inport->control_body) < 0) {
spa_log_trace_fp(this->log, "%p: skip input idx:%d %u", this, inport->id,
inport->parser.state.offset);
continue; continue;
} }
seq[n_seq] = pod; mix_ports[n_mix_ports++] = inport;
ctrl[n_seq] = spa_pod_control_first(&seq[n_seq]->body);
inio->status = SPA_STATUS_NEED_DATA; inio->status = SPA_STATUS_NEED_DATA;
n_seq++;
} }
d = outb->buffer->datas; d = outb->buffer->datas;
@ -782,36 +787,38 @@ static int impl_node_process(void *object)
/* merge sort all sequences into output buffer */ /* merge sort all sequences into output buffer */
while (true) { while (true) {
struct spa_pod_control *next = NULL;
uint32_t i, next_index = 0; uint32_t i, next_index = 0;
size_t size;
uint8_t *body;
struct port *next = NULL;
struct spa_pod_control *control;
for (i = 0; i < n_seq; i++) { for (i = 0; i < n_mix_ports; i++) {
if (!spa_pod_control_is_inside(&seq[i]->body, struct port *p = mix_ports[i];
SPA_POD_BODY_SIZE(seq[i]), ctrl[i])) if (next == NULL || event_sort(&p->control, p->control_body,
continue; &next->control, next->control_body) <= 0) {
next = p;
if (next == NULL || event_sort(ctrl[i], next) <= 0) {
next = ctrl[i];
next_index = i; next_index = i;
} }
} }
if (next == NULL) if (next == NULL)
break; break;
if (control_needs_conversion(outport, next->type)) { control = &next->control;
uint8_t *data = SPA_POD_BODY(&next->value); body = (uint8_t*)next->control_body;
size_t size = SPA_POD_BODY_SIZE(&next->value); size = SPA_POD_BODY_SIZE(&control->value);
switch (next->type) { if (control_needs_conversion(outport, control->type)) {
switch (control->type) {
case SPA_CONTROL_Midi: case SPA_CONTROL_Midi:
{ {
uint32_t ump[4]; uint32_t ump[4];
uint64_t state = 0; uint64_t state = 0;
while (size > 0) { while (size > 0) {
int ump_size = spa_ump_from_midi(&data, &size, ump, sizeof(ump), 0, &state); int ump_size = spa_ump_from_midi(&body, &size, ump, sizeof(ump), 0, &state);
if (ump_size <= 0) if (ump_size <= 0)
break; break;
spa_pod_builder_control(&builder, next->offset, SPA_CONTROL_UMP); spa_pod_builder_control(&builder, control->offset, SPA_CONTROL_UMP);
spa_pod_builder_bytes(&builder, ump, ump_size); spa_pod_builder_bytes(&builder, ump, ump_size);
} }
break; break;
@ -819,20 +826,24 @@ static int impl_node_process(void *object)
case SPA_CONTROL_UMP: case SPA_CONTROL_UMP:
{ {
uint8_t ev[8]; uint8_t ev[8];
int ev_size = spa_ump_to_midi((uint32_t*)data, size, ev, sizeof(ev)); int ev_size = spa_ump_to_midi((uint32_t*)body, size, ev, sizeof(ev));
if (ev_size <= 0) if (ev_size <= 0)
break; break;
spa_pod_builder_control(&builder, next->offset, SPA_CONTROL_Midi); spa_pod_builder_control(&builder, control->offset, SPA_CONTROL_Midi);
spa_pod_builder_bytes(&builder, ev, ev_size); spa_pod_builder_bytes(&builder, ev, ev_size);
break; break;
} }
} }
} else { } else {
spa_pod_builder_control(&builder, next->offset, next->type); spa_pod_builder_control(&builder, control->offset, control->type);
spa_pod_builder_primitive(&builder, &next->value); spa_pod_builder_primitive_body(&builder, &control->value, body, size, NULL, 0);
} }
ctrl[next_index] = spa_pod_control_next(ctrl[next_index]); if (spa_pod_parser_get_control_body(&next->parser,
&next->control, &next->control_body) < 0) {
spa_pod_parser_pop(&next->parser, &next->frame);
mix_ports[next_index] = mix_ports[--n_mix_ports];
}
} }
spa_pod_builder_pop(&builder, &f); spa_pod_builder_pop(&builder, &f);

View file

@ -1270,7 +1270,7 @@ PWTEST(pod_parser)
spa_assert_se(spa_pod_parser_push_object(&p, &f, SPA_TYPE_OBJECT_Format, NULL) == -EPROTO); spa_assert_se(spa_pod_parser_push_object(&p, &f, SPA_TYPE_OBJECT_Format, NULL) == -EPROTO);
spa_assert_se(p.state.offset == 0); spa_assert_se(p.state.offset == 0);
spa_assert_se(spa_pod_parser_push_object(&p, &f, SPA_TYPE_OBJECT_Props, NULL) == 0); spa_assert_se(spa_pod_parser_push_object(&p, &f, SPA_TYPE_OBJECT_Props, NULL) == 0);
spa_assert_se(p.state.offset == 392); spa_assert_se(p.state.offset == sizeof(struct spa_pod_object));
spa_assert_se(spa_pod_parser_frame(&p, &f) == val.P); spa_assert_se(spa_pod_parser_frame(&p, &f) == val.P);
spa_zero(val); spa_zero(val);