avb: add source

This commit is contained in:
Wim Taymans 2022-03-09 16:27:38 +01:00
parent 552a0056b5
commit 200f591a3d
5 changed files with 988 additions and 12 deletions

View file

@ -766,7 +766,7 @@ static void avb_on_socket_event(struct spa_source *source)
struct state *state = source->data;
ssize_t n;
int32_t filled;
uint32_t subtype, index;
uint32_t subtype, index, n_bytes;
struct avtp_common_pdu *common;
struct props *p = &state->props;
struct avtp_stream_pdu *pdu = state->pdu;
@ -796,15 +796,16 @@ static void avb_on_socket_event(struct spa_source *source)
filled = spa_ringbuffer_get_write_index(&state->ring, &index);
if (filled > (int32_t) state->ringbuffer_size) {
spa_log_warn(state->log, "overrun %d", filled);
return;
}
n_bytes = p->frames_per_pdu * state->stride;
spa_ringbuffer_write_data(&state->ring,
state->ringbuffer_data,
state->ringbuffer_size,
index % state->ringbuffer_size,
pdu->avtp_payload,
p->frames_per_pdu * state->stride);
pdu->avtp_payload, n_bytes);
index += p->frames_per_pdu * state->stride;
index += n_bytes;
spa_ringbuffer_write_update(&state->ring, index);
}
@ -839,7 +840,7 @@ int spa_avb_write(struct state *state)
to_write = state->ringbuffer_size - filled;
while (!spa_list_is_empty(&port->ready) && to_write > 0) {
size_t n_bytes, n_frames;
size_t n_bytes;
struct buffer *b;
struct spa_data *d;
uint32_t offs, avail, size;
@ -849,10 +850,9 @@ int spa_avb_write(struct state *state)
offs = SPA_MIN(d[0].chunk->offset + port->ready_offset, d[0].maxsize);
size = SPA_MIN(d[0].chunk->size, d[0].maxsize - offs);
avail = (size - offs) / state->stride;
avail = size - offs;
n_frames = SPA_MIN(avail, to_write);
n_bytes = n_frames * state->stride;
n_bytes = SPA_MIN(avail, to_write);
spa_ringbuffer_write_data(&state->ring,
state->ringbuffer_data,
@ -873,7 +873,7 @@ int spa_avb_write(struct state *state)
port->ready_offset = 0;
}
to_write -= n_frames;
to_write -= n_bytes;
index += n_bytes;
}
spa_ringbuffer_write_update(&state->ring, index);
@ -883,7 +883,7 @@ int spa_avb_write(struct state *state)
static int handle_play(struct state *state, uint64_t current_time)
{
int res;
int32_t avail;
int32_t avail, wanted;
uint32_t index, n_bytes;
uint64_t ptime, txtime;
int pdu_count;
@ -892,8 +892,9 @@ static int handle_play(struct state *state, uint64_t current_time)
ssize_t n;
avail = spa_ringbuffer_get_read_index(&state->ring, &index);
if (avail < (int32_t) state->duration) {
spa_log_warn(state->log, "underrun %d", avail);
wanted = state->duration * state->stride;
if (avail < wanted) {
spa_log_warn(state->log, "underrun %d < %d", avail, wanted);
goto done;
}
@ -935,6 +936,68 @@ done:
static int handle_capture(struct state *state, uint64_t current_time)
{
int32_t avail, wanted;
uint32_t index;
struct port *port = &state->ports[0];
struct spa_io_buffers *io;
struct buffer *b;
avail = spa_ringbuffer_get_read_index(&state->ring, &index);
wanted = state->duration * state->stride;
if (avail < wanted) {
spa_log_warn(state->log, "capture underrun %d < %d", avail, wanted);
goto done;
}
while (avail >= wanted) {
struct spa_data *d;
uint32_t n_bytes;
if (spa_list_is_empty(&port->free))
break;
b = spa_list_first(&port->free, struct buffer, link);
d = b->buf->datas;
n_bytes = SPA_MIN(d[0].maxsize, (uint32_t)wanted);
spa_ringbuffer_read_data(&state->ring,
state->ringbuffer_data,
state->ringbuffer_size,
index % state->ringbuffer_size,
d[0].data, n_bytes);
d[0].chunk->offset = 0;
d[0].chunk->size = n_bytes;
d[0].chunk->stride = state->stride;
d[0].chunk->flags = 0;
spa_list_remove(&b->link);
spa_list_append(&port->ready, &b->link);
index += n_bytes;
spa_ringbuffer_read_update(&state->ring, index);
}
if (spa_list_is_empty(&port->ready))
return 0;
io = port->io;
if (io != NULL &&
(io->status != SPA_STATUS_HAVE_DATA || port->rate_match != NULL)) {
if (io->buffer_id < port->n_buffers)
spa_avb_recycle_buffer(state, port, io->buffer_id);
b = spa_list_first(&port->ready, struct buffer, link);
spa_list_remove(&b->link);
SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT);
io->buffer_id = b->id;
io->status = SPA_STATUS_HAVE_DATA;
spa_log_trace_fp(state->log, "%p: output buffer:%d", state, b->id);
}
spa_node_call_ready(&state->callbacks, SPA_STATUS_HAVE_DATA);
done:
return 0;
}