rework bluetooth IO loops

This commit is contained in:
Lennart Poettering 2009-03-20 18:04:23 +01:00
parent d2bee57ee5
commit 888e44f3b0

View file

@ -96,7 +96,7 @@ struct a2dp_info {
sbc_capabilities_t sbc_capabilities;
sbc_t sbc; /* Codec data */
pa_bool_t sbc_initialized; /* Keep track if the encoder is initialized */
size_t codesize; /* SBC codesize */
size_t codesize, frame_length; /* SBC Codesize, frame_length. We simply cache those values here */
void* buffer; /* Codec transfer buffer */
size_t buffer_size; /* Size of the buffer */
@ -583,7 +583,8 @@ static void setup_sbc(struct a2dp_info *a2dp) {
}
a2dp->sbc.bitpool = active_capabilities->max_bitpool;
a2dp->codesize = (uint16_t) sbc_get_codesize(&a2dp->sbc);
a2dp->codesize = sbc_get_codesize(&a2dp->sbc);
a2dp->frame_length = sbc_get_frame_length(&a2dp->sbc);
}
static int set_conf(struct userdata *u) {
@ -645,7 +646,12 @@ static int set_conf(struct userdata *u) {
/* setup SBC encoder now we agree on parameters */
if (u->profile == PROFILE_A2DP) {
setup_sbc(&u->a2dp);
u->block_size = u->a2dp.codesize;
u->block_size =
((u->link_mtu - sizeof(struct rtp_header) - sizeof(struct rtp_payload))
/ u->a2dp.frame_length
* u->a2dp.codesize);
pa_log_info("SBC parameters:\n\tallocation=%u\n\tsubbands=%u\n\tblocks=%u\n\tbitpool=%u\n",
u->a2dp.sbc.allocation, u->a2dp.sbc.subbands, u->a2dp.sbc.blocks, u->a2dp.sbc.bitpool);
} else
@ -853,48 +859,62 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
static int hsp_process_render(struct userdata *u) {
int ret = 0;
pa_memchunk memchunk;
pa_assert(u);
pa_assert(u->profile == PROFILE_HSP);
pa_assert(u->sink);
pa_sink_render_full(u->sink, u->block_size, &memchunk);
/* First, render some data */
if (!u->write_memchunk.memblock)
pa_sink_render_full(u->sink, u->block_size, &u->write_memchunk);
pa_assert(u->write_memchunk.length == u->block_size);
for (;;) {
ssize_t l;
const void *p;
p = (const uint8_t*) pa_memblock_acquire(memchunk.memblock) + memchunk.index;
l = pa_write(u->stream_fd, p, memchunk.length, &u->stream_write_type);
pa_memblock_release(memchunk.memblock);
/* Now write that data to the socket. The socket is of type
* SEQPACKET, and we generated the data of the MTU size, so this
* should just work. */
pa_log_debug("Memblock written to socket: %lli bytes", (long long) l);
p = (const uint8_t*) pa_memblock_acquire(u->write_memchunk.memblock) + u->write_memchunk.index;
l = pa_write(u->stream_fd, p, u->write_memchunk.length, &u->stream_write_type);
pa_memblock_release(u->write_memchunk.memblock);
pa_assert(l != 0);
if (l < 0) {
if (errno == EINTR || errno == EAGAIN) /*** FIXME: EAGAIN handling borked ***/
if (errno == EINTR)
/* Retry right away if we got interrupted */
continue;
else {
else if (errno == EAGAIN)
/* Hmm, apparently the socket was not writable, give up for now */
break;
pa_log_error("Failed to write data to SCO socket: %s", pa_cstrerror(errno));
ret = -1;
break;
}
} else {
pa_assert((size_t) l <= memchunk.length);
memchunk.index += (size_t) l;
memchunk.length -= (size_t) l;
pa_assert((size_t) l <= u->write_memchunk.length);
u->write_index += (uint64_t) l;
if (memchunk.length <= 0)
if ((size_t) l != u->write_memchunk.length) {
pa_log_error("Wrote memory block to socket only partially! %llu written, wanted to write %llu.",
(unsigned long long) l,
(unsigned long long) u->write_memchunk.length);
ret = -1;
break;
}
}
pa_memblock_unref(memchunk.memblock);
u->write_index += (uint64_t) u->write_memchunk.length;
pa_memblock_unref(u->write_memchunk.memblock);
pa_memchunk_reset(&u->write_memchunk);
break;
}
return ret;
}
@ -919,148 +939,174 @@ static int hsp_process_push(struct userdata *u) {
pa_memblock_release(memchunk.memblock);
if (l <= 0) {
if (l < 0 && (errno == EINTR || errno == EAGAIN)) /*** FIXME: EAGAIN handling borked ***/
if (l < 0 && errno == EINTR)
/* Retry right away if we got interrupted */
continue;
else {
else if (l < 0 && errno == EAGAIN)
/* Hmm, apparently the socket was not readable, give up for now. */
break;
pa_log_error("Failed to read data from SCO socket: %s", l < 0 ? pa_cstrerror(errno) : "EOF");
ret = -1;
break;
}
} else {
pa_assert((size_t) l <= memchunk.length);
memchunk.length = (size_t) l;
u->read_index += (uint64_t) l;
pa_source_post(u->source, &memchunk);
break;
}
}
pa_memblock_unref(memchunk.memblock);
return ret;
}
static void a2dp_prepare_buffer(struct userdata *u) {
pa_assert(u);
if (u->a2dp.buffer_size >= u->link_mtu)
return;
u->a2dp.buffer_size = 2 * u->link_mtu;
pa_xfree(u->a2dp.buffer);
u->a2dp.buffer = pa_xmalloc(u->a2dp.buffer_size);
}
static int a2dp_process_render(struct userdata *u) {
size_t frame_size;
struct a2dp_info *a2dp;
struct rtp_header *header;
struct rtp_payload *payload;
size_t left;
size_t nbytes;
void *d;
const void *p;
size_t to_write, to_encode;
unsigned frame_count;
size_t written;
uint64_t writing_at;
int ret = 0;
pa_assert(u);
pa_assert(u->profile == PROFILE_A2DP);
pa_assert(u->sink);
a2dp = &u->a2dp;
if (a2dp->buffer_size < u->link_mtu) {
a2dp->buffer_size = 2*u->link_mtu;
pa_xfree(a2dp->buffer);
a2dp->buffer = pa_xmalloc(a2dp->buffer_size);
}
header = (struct rtp_header*) a2dp->buffer;
payload = (struct rtp_payload*) ((uint8_t*) a2dp->buffer + sizeof(*header));
d = (uint8_t*) a2dp->buffer + sizeof(*header) + sizeof(*payload);
left = a2dp->buffer_size - sizeof(*header) - sizeof(*payload);
frame_size = sbc_get_frame_length(&a2dp->sbc);
frame_count = 0;
writing_at = u->write_index;
do {
ssize_t encoded;
/* First, render some data */
if (!u->write_memchunk.memblock)
pa_sink_render_full(u->sink, u->block_size, &u->write_memchunk);
pa_assert(u->write_memchunk.length == u->block_size);
a2dp_prepare_buffer(u);
a2dp = &u->a2dp;
header = a2dp->buffer;
payload = (struct rtp_payload*) ((uint8_t*) a2dp->buffer + sizeof(*header));
frame_count = 0;
/* Try to create a packet of the full MTU */
p = (const uint8_t*) pa_memblock_acquire(u->write_memchunk.memblock) + u->write_memchunk.index;
to_encode = u->write_memchunk.length;
d = (uint8_t*) a2dp->buffer + sizeof(*header) + sizeof(*payload);
to_write = a2dp->buffer_size - sizeof(*header) - sizeof(*payload);
while (PA_LIKELY(to_encode > 0 && to_write > 0)) {
size_t written;
ssize_t encoded;
encoded = sbc_encode(&a2dp->sbc,
p, u->write_memchunk.length,
d, left,
p, to_encode,
d, to_write,
&written);
if (PA_UNLIKELY(encoded <= 0)) {
pa_log_error("SBC encoding error (%li)", (long) encoded);
pa_memblock_release(u->write_memchunk.memblock);
return -1;
}
/* pa_log_debug("SBC: encoded: %lu; written: %lu", (unsigned long) encoded, (unsigned long) written); */
/* pa_log_debug("SBC: codesize: %lu; frame_length: %lu", (unsigned long) a2dp->codesize, (unsigned long) a2dp->frame_length); */
pa_assert_fp((size_t) encoded <= to_encode);
pa_assert_fp((size_t) encoded == a2dp->codesize);
pa_assert_fp((size_t) written <= to_write);
pa_assert_fp((size_t) written == a2dp->frame_length);
p = (const uint8_t*) p + encoded;
to_encode -= encoded;
d = (uint8_t*) d + written;
to_write -= written;
frame_count++;
}
pa_memblock_release(u->write_memchunk.memblock);
pa_assert(to_encode == 0);
PA_ONCE_BEGIN {
pa_log_debug("Using SBC encoder implementation: %s", pa_strnull(sbc_get_implementation_info(&a2dp->sbc)));
} PA_ONCE_END;
pa_memblock_release(u->write_memchunk.memblock);
if (encoded <= 0) {
pa_log_error("SBC encoding error (%d)", encoded);
return -1;
}
pa_assert((size_t) encoded <= u->write_memchunk.length);
pa_assert((size_t) encoded == sbc_get_codesize(&a2dp->sbc));
pa_assert((size_t) written <= left);
pa_assert((size_t) written == sbc_get_frame_length(&a2dp->sbc));
/* pa_log_debug("SBC: encoded: %d; written: %d", encoded, written); */
u->write_memchunk.index += encoded;
u->write_memchunk.length -= encoded;
if (u->write_memchunk.length <= 0) {
pa_memblock_unref(u->write_memchunk.memblock);
pa_memchunk_reset(&u->write_memchunk);
}
u->write_index += encoded;
d = (uint8_t*) d + written;
left -= written;
frame_count++;
} while (((uint8_t*) d - ((uint8_t*) a2dp->buffer + sbc_get_frame_length(&a2dp->sbc))) < (ptrdiff_t) u->link_mtu);
/* write it to the fifo */
memset(a2dp->buffer, 0, sizeof(*header) + sizeof(*payload));
payload->frame_count = frame_count;
header->v = 2;
header->pt = 1;
header->sequence_number = htons(a2dp->seq_num++);
header->timestamp = htonl(writing_at / frame_size);
header->timestamp = htonl(u->write_index / pa_frame_size(&u->sink->sample_spec));
header->ssrc = htonl(1);
payload->frame_count = frame_count;
p = a2dp->buffer;
left = (uint8_t*) d - (uint8_t*) a2dp->buffer;
nbytes = (uint8_t*) d - (uint8_t*) a2dp->buffer;
for (;;) {
ssize_t l;
l = pa_write(u->stream_fd, p, left, &u->stream_write_type);
/* pa_log_debug("write: requested %lu bytes; written %li bytes; mtu=%li", (unsigned long) left, (long) l, (unsigned long) u->link_mtu); */
l = pa_write(u->stream_fd, a2dp->buffer, nbytes, &u->stream_write_type);
pa_assert(l != 0);
if (l < 0) {
if (errno == EINTR || errno == EAGAIN) /*** FIXME: EAGAIN handling borked ***/
if (errno == EINTR)
/* Retry right away if we got interrupted */
continue;
else {
else if (errno == EAGAIN)
/* Hmm, apparently the socket was not writable, give up for now */
break;
pa_log_error("Failed to write data to socket: %s", pa_cstrerror(errno));
return -1;
}
} else {
pa_assert((size_t) l <= left);
d = (uint8_t*) d + l;
left -= l;
if (left <= 0)
ret = -1;
break;
}
pa_assert((size_t) l <= nbytes);
if ((size_t) l != nbytes) {
pa_log_warn("Wrote memory block to socket only partially! %llu written, wanted to write %llu.",
(unsigned long long) l,
(unsigned long long) nbytes);
ret = -1;
break;
}
return 0;
u->write_index += (uint64_t) u->write_memchunk.length;
pa_memblock_unref(u->write_memchunk.memblock);
pa_memchunk_reset(&u->write_memchunk);
break;
}
return ret;
}
static void thread_func(void *userdata) {
@ -1509,12 +1555,20 @@ static void shutdown_bt(struct userdata *u) {
if (u->stream_fd >= 0) {
pa_close(u->stream_fd);
u->stream_fd = -1;
u->stream_write_type = 0;
u->stream_read_type = 0;
}
if (u->service_fd >= 0) {
pa_close(u->service_fd);
u->service_fd = -1;
}
if (u->write_memchunk.memblock) {
pa_memblock_unref(u->write_memchunk.memblock);
pa_memchunk_reset(&u->write_memchunk);
}
}
static int init_bt(struct userdata *u) {
@ -1686,11 +1740,6 @@ static int card_set_profile(pa_card *c, pa_card_profile *new_profile) {
stop_thread(u);
shutdown_bt(u);
if (u->write_memchunk.memblock) {
pa_memblock_unref(u->write_memchunk.memblock);
pa_memchunk_reset(&u->write_memchunk);
}
u->profile = *d;
u->sample_spec = u->requested_sample_spec;
@ -2034,9 +2083,6 @@ void pa__done(pa_module *m) {
if (u->device)
pa_bluetooth_device_free(u->device);
if (u->write_memchunk.memblock)
pa_memblock_unref(u->write_memchunk.memblock);
if (u->a2dp.buffer)
pa_xfree(u->a2dp.buffer);