raop: Rework packet's store memory management

This patch switch the packet-buffer to use core memory pool instead of
manually allocating the room required for storing TCP/UDP packets. Packets
are now stored using pa_memchunk instead of internal struct. Quite a few
malloc saved compare to previous design.
This commit is contained in:
Martin Blanchard 2016-11-06 12:54:10 -06:00 committed by Tanu Kaskinen
parent 32a5b3047b
commit 750a677b43
4 changed files with 216 additions and 253 deletions

View file

@ -20,152 +20,99 @@
USA.
***/
#include <stdlib.h>
#include <limits.h>
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include <stdlib.h>
#include <limits.h>
#include <pulse/xmalloc.h>
#include <pulsecore/core-error.h>
#include "raop-client.h"
#include <pulsecore/macro.h>
#include "raop-packet-buffer.h"
/* FRAMES_PER_PACKET*2*2 + sizeof(udp_audio_header) + sizeof(ALAC header), unencoded */
#define PACKET_SIZE_MAX (352*2*2 + 12 + 7) /* FIXME; hardcoded constant ! */
/* Header room for packet retransmission header */
#define RETRANS_HEADER_ROOM 4
/* Packet element */
struct pa_raop_packet_element {
uint16_t seq_num; /* RTP sequence number (in host byte order) */
ssize_t length; /* Actual packet length */
/* Packet data including RTP header */
uint8_t data[PACKET_SIZE_MAX + RETRANS_HEADER_ROOM];
};
/* Buffer struct */
struct pa_raop_packet_buffer {
size_t size; /* max number of packets in buffer */
size_t start; /* index of oldest packet */
size_t count; /* number of packets in buffer */
uint16_t first_seq_num; /* Sequence number of first packet in buffer */
uint16_t latest_seq_num; /* Debug purpose */
pa_raop_packet_element *packets; /* Packet element pointer */
pa_memchunk *packets;
pa_mempool *mempool;
size_t size;
uint16_t seq;
size_t pos;
};
pa_raop_packet_buffer *pa_raop_pb_new(size_t size) {
pa_raop_packet_buffer *pb = pa_xmalloc0(sizeof(*pb));
pa_raop_packet_buffer *pa_raop_packet_buffer_new(pa_mempool *mempool, const size_t size) {
pa_raop_packet_buffer *pb = pa_xnew0(pa_raop_packet_buffer, 1);
pa_assert(mempool);
pa_assert(size > 0);
pb->size = size;
pb->packets = (pa_raop_packet_element *)
pa_xmalloc(size * sizeof(pa_raop_packet_element));
pa_raop_pb_clear(pb);
pb->mempool = mempool;
pb->packets = pa_xnew0(pa_memchunk, size);
pb->seq = pb->pos = 0;
return pb;
}
void pa_raop_pb_clear(pa_raop_packet_buffer *pb) {
pb->start = 0;
pb->count = 0;
pb->first_seq_num = 0;
pb->latest_seq_num = 0;
memset(pb->packets, 0, pb->size * sizeof(pa_raop_packet_element));
}
void pa_raop_packet_buffer_free(pa_raop_packet_buffer *pb) {
size_t i;
pa_assert(pb);
for (i = 0; pb->packets && i < pb->size; i++) {
if (pb->packets[i].memblock)
pa_memblock_unref(pb->packets[i].memblock);
pa_memchunk_reset(&pb->packets[i]);
}
void pa_raop_pb_delete(pa_raop_packet_buffer *pb) {
pa_xfree(pb->packets);
pb->packets = NULL;
pa_xfree(pb);
}
static int pb_is_full(pa_raop_packet_buffer *pb) {
return pb->count == pb->size;
void pa_raop_packet_buffer_reset(pa_raop_packet_buffer *pb, uint16_t seq) {
size_t i;
pa_assert(pb);
pa_assert(pb->packets);
pb->pos = 0;
pb->seq = seq - 1;
for (i = 0; i < pb->size; i++) {
if (pb->packets[i].memblock)
pa_memblock_unref(pb->packets[i].memblock);
pa_memchunk_reset(&pb->packets[i]);
}
}
static int pb_is_empty(pa_raop_packet_buffer *pb) {
return pb->count == 0;
}
pa_memchunk *pa_raop_packet_buffer_get(pa_raop_packet_buffer *pb, uint16_t seq, const size_t size) {
pa_memchunk *packet = NULL;
size_t delta, i;
static pa_raop_packet_element *pb_prepare_write(pa_raop_packet_buffer *pb, uint16_t seq) {
size_t end = (pb->start + pb->count) % pb->size;
pa_raop_packet_element *packet;
pa_assert(pb);
pa_assert(pb->packets);
pa_assert(seq > 0);
/* Set first packet sequence number in buffer if buffer is empty */
if (pb_is_empty(pb))
pb->first_seq_num = seq;
else
pa_assert((uint16_t) (pb->latest_seq_num + 1) == seq);
packet = &pb->packets[end];
if (pb_is_full(pb)) {
pb->start = (pb->start + 1) % pb->size; /* full, overwrite */
/* Set first packet sequence number in buffer
to new start packet sequence number */
pb->first_seq_num = pb->packets[pb->start].seq_num;
} else
++ pb->count;
pb->latest_seq_num = seq;
if (seq == pb->seq)
packet = &pb->packets[pb->pos];
else if (seq < pb->seq) {
delta = pb->seq - seq;
i = (pb->size + pb->pos - delta) % pb->size;
if (delta < pb->size)
packet = &pb->packets[i];
} else {
i = (pb->pos + (seq - pb->seq)) % pb->size;
if (pb->packets[i].memblock)
pa_memblock_unref(pb->packets[i].memblock);
pa_memchunk_reset(&pb->packets[i]);
pb->packets[i].memblock = pa_memblock_new(pb->mempool, size);
packet = &pb->packets[i];
pb->seq = seq;
pb->pos = i;
}
return packet;
}
/* Write packet data to packet buffer */
void pa_raop_pb_write_packet(pa_raop_packet_buffer *pb, uint16_t seq_num, const uint8_t *packet_data, ssize_t packet_length) {
pa_raop_packet_element *packet;
pa_assert(pb);
pa_assert(packet_data);
pa_assert(packet_length <= PACKET_SIZE_MAX);
packet = pb_prepare_write(pb, seq_num);
packet->seq_num = seq_num;
packet->length = packet_length + RETRANS_HEADER_ROOM;
/* Insert RETRANS_HEADER_ROOM bytes in front of packet data,
for retransmission header */
memset(packet->data, 0, RETRANS_HEADER_ROOM);
memcpy(packet->data + RETRANS_HEADER_ROOM, packet_data, packet_length);
}
/* l < r?, considers wrapping */
static bool seq_lt(uint16_t l, uint16_t r) {
return l - r > USHRT_MAX/2;
}
/* Random access to packet from buffer by sequence number for (re-)sending. */
ssize_t pa_raop_pb_read_packet(pa_raop_packet_buffer *pb, uint16_t seq_num, uint8_t **packet_data) {
uint16_t index = 0; /* Index of requested packet */
pa_raop_packet_element *packet;
/* If the buffer is empty, there is no use in calculating indices */
if (pb_is_empty(pb))
return -1;
/* If the requested packet is too old (seq_num below first seq number
in buffer) or too young (seq_num greater than current seq number),
do nothing and return */
if (seq_lt(seq_num, pb->first_seq_num))
return -1;
index = (uint16_t) (seq_num - pb->first_seq_num);
if (index >= pb->count)
return -1;
/* Index of the requested packet in the buffer is calculated
using the first sequence number stored in the buffer.
The offset (seq_num - first_seq_num) is used to access the array. */
packet = &pb->packets[(pb->start + index) % pb->size];
pa_assert(packet->data[RETRANS_HEADER_ROOM + 2] == (seq_num >> 8));
pa_assert(packet->data[RETRANS_HEADER_ROOM + 3] == (seq_num & 0xff));
pa_assert(packet_data);
*packet_data = packet->data;
return packet->length;
}