new features:

future cancellation
  corking
  flushing
for playback streams in native protocol


git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@152 fefdeb5f-60dc-0310-8127-8f9354f1896f
This commit is contained in:
Lennart Poettering 2004-08-22 21:13:58 +00:00
parent ea4805a0fd
commit 41295bbf56
20 changed files with 309 additions and 129 deletions

View file

@ -28,22 +28,20 @@
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include "memblockq.h"
#include "xmalloc.h"
struct memblock_list {
struct memblock_list *next;
struct memblock_list *next, *prev;
struct pa_memchunk chunk;
struct timeval stamp;
};
struct pa_memblockq {
struct memblock_list *blocks, *blocks_tail;
unsigned n_blocks;
size_t current_length, maxlength, tlength, base, prebuf, minreq;
int measure_delay;
uint32_t delay;
struct pa_mcalign *mcalign;
struct pa_memblock_stat *memblock_stat;
};
@ -66,7 +64,7 @@ struct pa_memblockq* pa_memblockq_new(size_t maxlength, size_t tlength, size_t b
assert(bq->maxlength >= base);
bq->tlength = ((tlength+base-1)/base)*base;
if (bq->tlength == 0 || bq->tlength >= bq->maxlength)
if (!bq->tlength || bq->tlength >= bq->maxlength)
bq->tlength = bq->maxlength;
bq->prebuf = (prebuf == (size_t) -1) ? bq->maxlength/2 : prebuf;
@ -80,29 +78,21 @@ struct pa_memblockq* pa_memblockq_new(size_t maxlength, size_t tlength, size_t b
fprintf(stderr, "memblockq sanitized: maxlength=%u, tlength=%u, base=%u, prebuf=%u, minreq=%u\n", bq->maxlength, bq->tlength, bq->base, bq->prebuf, bq->minreq);
bq->measure_delay = 0;
bq->delay = 0;
bq->mcalign = NULL;
bq->memblock_stat = s;
return bq;
}
void pa_memblockq_free(struct pa_memblockq* bq) {
struct memblock_list *l;
assert(bq);
pa_memblockq_flush(bq);
if (bq->mcalign)
pa_mcalign_free(bq->mcalign);
while ((l = bq->blocks)) {
bq->blocks = l->next;
pa_memblock_unref(l->chunk.memblock);
pa_xfree(l);
}
pa_xfree(bq);
}
@ -110,31 +100,25 @@ void pa_memblockq_push(struct pa_memblockq* bq, const struct pa_memchunk *chunk,
struct memblock_list *q;
assert(bq && chunk && chunk->memblock && chunk->length && (chunk->length % bq->base) == 0);
pa_memblockq_seek(bq, delta);
if (bq->blocks_tail && bq->blocks_tail->chunk.memblock == chunk->memblock) {
/* Try to merge memory chunks */
if (bq->blocks_tail->chunk.index+bq->blocks_tail->chunk.length == chunk->index) {
bq->blocks_tail->chunk.length += chunk->length;
bq->current_length += chunk->length;
/* fprintf(stderr, __FILE__": merge succeeded: %u\n", chunk->length);*/
return;
}
}
q = pa_xmalloc(sizeof(struct memblock_list));
if (bq->measure_delay)
gettimeofday(&q->stamp, NULL);
else
timerclear(&q->stamp);
q->chunk = *chunk;
pa_memblock_ref(q->chunk.memblock);
assert(q->chunk.index+q->chunk.length <= q->chunk.memblock->length);
q->next = NULL;
if (bq->blocks_tail)
if ((q->prev = bq->blocks_tail))
bq->blocks_tail->next = q;
else
bq->blocks = q;
@ -158,57 +142,43 @@ int pa_memblockq_peek(struct pa_memblockq* bq, struct pa_memchunk *chunk) {
*chunk = bq->blocks->chunk;
pa_memblock_ref(chunk->memblock);
/* if (chunk->memblock->ref != 2) */
/* fprintf(stderr, "block %p with ref %u peeked.\n", chunk->memblock, chunk->memblock->ref); */
return 0;
}
/*
int memblockq_pop(struct memblockq* bq, struct pa_memchunk *chunk) {
struct memblock_list *q;
void pa_memblockq_drop(struct pa_memblockq *bq, const struct pa_memchunk *chunk, size_t length) {
assert(bq && chunk && length);
if (!bq->blocks || memcmp(&bq->blocks->chunk, chunk, sizeof(struct pa_memchunk)))
return;
assert(length <= bq->blocks->chunk.length);
pa_memblockq_skip(bq, length);
}
static void remove_block(struct pa_memblockq *bq, struct memblock_list *q) {
assert(bq && q);
if (q->prev)
q->prev->next = q->next;
else {
assert(bq->blocks == q);
bq->blocks = q->next;
}
assert(bq && chunk);
if (!bq->blocks || bq->current_length < bq->prebuf)
return -1;
bq->prebuf = 0;
q = bq->blocks;
bq->blocks = bq->blocks->next;
*chunk = q->chunk;
bq->n_blocks--;
bq->current_length -= chunk->length;
if (q->next)
q->next->prev = q->prev;
else {
assert(bq->blocks_tail == q);
bq->blocks_tail = q->prev;
}
pa_memblock_unref(q->chunk.memblock);
pa_xfree(q);
return 0;
}
*/
static uint32_t age(struct timeval *tv) {
struct timeval now;
uint32_t r;
assert(tv);
if (tv->tv_sec == 0)
return 0;
gettimeofday(&now, NULL);
r = (now.tv_sec-tv->tv_sec) * 1000000;
if (now.tv_usec >= tv->tv_usec)
r += now.tv_usec - tv->tv_usec;
else
r -= tv->tv_usec - now.tv_usec;
return r;
bq->n_blocks--;
}
void pa_memblockq_drop(struct pa_memblockq *bq, size_t length) {
void pa_memblockq_skip(struct pa_memblockq *bq, size_t length) {
assert(bq && length && (length % bq->base) == 0);
while (length > 0) {
@ -218,25 +188,12 @@ void pa_memblockq_drop(struct pa_memblockq *bq, size_t length) {
if (l > bq->blocks->chunk.length)
l = bq->blocks->chunk.length;
if (bq->measure_delay)
bq->delay = age(&bq->blocks->stamp);
bq->blocks->chunk.index += l;
bq->blocks->chunk.length -= l;
bq->current_length -= l;
if (bq->blocks->chunk.length == 0) {
struct memblock_list *q;
q = bq->blocks;
bq->blocks = bq->blocks->next;
if (bq->blocks == NULL)
bq->blocks_tail = NULL;
pa_memblock_unref(q->chunk.memblock);
pa_xfree(q);
bq->n_blocks--;
}
if (!bq->blocks->chunk.length)
remove_block(bq, bq->blocks);
length -= l;
}
@ -255,7 +212,7 @@ void pa_memblockq_shorten(struct pa_memblockq *bq, size_t length) {
l /= bq->base;
l *= bq->base;
pa_memblockq_drop(bq, l);
pa_memblockq_skip(bq, l);
}
@ -276,11 +233,6 @@ int pa_memblockq_is_writable(struct pa_memblockq *bq, size_t length) {
return bq->current_length + length <= bq->tlength;
}
uint32_t pa_memblockq_get_delay(struct pa_memblockq *bq) {
assert(bq);
return bq->delay;
}
uint32_t pa_memblockq_get_length(struct pa_memblockq *bq) {
assert(bq);
return bq->current_length;
@ -331,3 +283,44 @@ void pa_memblockq_prebuf_disable(struct pa_memblockq *bq) {
assert(bq);
bq->prebuf = 0;
}
void pa_memblockq_seek(struct pa_memblockq *bq, size_t length) {
assert(bq);
if (!length)
return;
while (length >= bq->base) {
size_t l = length;
if (!bq->current_length)
return;
assert(bq->blocks_tail);
if (l > bq->blocks_tail->chunk.length)
l = bq->blocks_tail->chunk.length;
bq->blocks_tail->chunk.length -= l;
bq->current_length -= l;
if (bq->blocks_tail->chunk.length == 0)
remove_block(bq, bq->blocks);
length -= l;
}
}
void pa_memblockq_flush(struct pa_memblockq *bq) {
struct memblock_list *l;
assert(bq);
while ((l = bq->blocks)) {
bq->blocks = l->next;
pa_memblock_unref(l->chunk.memblock);
pa_xfree(l);
}
bq->blocks_tail = NULL;
bq->n_blocks = 0;
bq->current_length = 0;
}

View file

@ -44,7 +44,7 @@ struct pa_memblockq* pa_memblockq_new(size_t maxlength,
struct pa_memblock_stat *s);
void pa_memblockq_free(struct pa_memblockq*bq);
/* Push a new memory chunk into the queue. Optionally specify a value for future cancellation. This is currently not implemented, however! */
/* Push a new memory chunk into the queue. Optionally specify a value for future cancellation. */
void pa_memblockq_push(struct pa_memblockq* bq, const struct pa_memchunk *chunk, size_t delta);
/* Same as pa_memblockq_push(), however chunks are filtered through a mcalign object, and thus aligned to multiples of base */
@ -53,8 +53,11 @@ void pa_memblockq_push_align(struct pa_memblockq* bq, const struct pa_memchunk *
/* Return a copy of the next memory chunk in the queue. It is not removed from the queue */
int pa_memblockq_peek(struct pa_memblockq* bq, struct pa_memchunk *chunk);
/* Drop the specified bytes from the queue, only valid aufter pa_memblockq_peek() */
void pa_memblockq_drop(struct pa_memblockq *bq, const struct pa_memchunk *chunk, size_t length);
/* Drop the specified bytes from the queue */
void pa_memblockq_drop(struct pa_memblockq *bq, size_t length);
void pa_memblockq_skip(struct pa_memblockq *bq, size_t length);
/* Shorten the pa_memblockq to the specified length by dropping data at the end of the queue */
void pa_memblockq_shorten(struct pa_memblockq *bq, size_t length);
@ -68,9 +71,6 @@ int pa_memblockq_is_readable(struct pa_memblockq *bq);
/* Test if the pa_memblockq is currently writable for the specified amount of bytes */
int pa_memblockq_is_writable(struct pa_memblockq *bq, size_t length);
/* The time memory chunks stay in the queue until they are removed completely in usecs */
uint32_t pa_memblockq_get_delay(struct pa_memblockq *bq);
/* Return the length of the queue in bytes */
uint32_t pa_memblockq_get_length(struct pa_memblockq *bq);
@ -83,4 +83,10 @@ uint32_t pa_memblockq_get_minreq(struct pa_memblockq *bq);
/* Force disabling of pre-buf even when the pre-buffer is not yet filled */
void pa_memblockq_prebuf_disable(struct pa_memblockq *bq);
/* Manipulate the write pointer */
void pa_memblockq_seek(struct pa_memblockq *bq, size_t delta);
/* Flush the queue */
void pa_memblockq_flush(struct pa_memblockq *bq);
#endif

View file

@ -75,6 +75,9 @@ enum {
PA_COMMAND_SET_SINK_VOLUME,
PA_COMMAND_SET_SINK_INPUT_VOLUME,
PA_COMMAND_CORK_PLAYBACK_STREAM,
PA_COMMAND_FLUSH_PLAYBACK_STREAM,
PA_COMMAND_MAX
};

View file

@ -65,7 +65,7 @@ static void do_stream_write(size_t length) {
if (l > buffer_length)
l = buffer_length;
pa_stream_write(stream, buffer+buffer_index, l, NULL);
pa_stream_write(stream, buffer+buffer_index, l, NULL, 0);
buffer_length -= l;
buffer_index += l;

View file

@ -150,7 +150,7 @@ static void stream_write_callback(struct pa_stream *s, size_t length, void *user
quit(1);
}
pa_stream_write(s, d, length, free);
pa_stream_write(s, d, length, free, 0);
sample_length -= length;

View file

@ -59,11 +59,12 @@ static void si_kill(struct pa_mainloop_api *m, void *i) {
sink_input_kill(i);
}
static void sink_input_drop(struct pa_sink_input *i, size_t length) {
static void sink_input_drop(struct pa_sink_input *i, const struct pa_memchunk*chunk, size_t length) {
struct pa_memchunk *c;
assert(i && length && i->userdata);
c = i->userdata;
assert(chunk == c);
assert(length <= c->length);
c->length -= length;

View file

@ -193,7 +193,7 @@ static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *pack
pa_context_unref(c);
}
static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata) {
static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata) {
struct pa_context *c = userdata;
struct pa_stream *s;
assert(p && chunk && c && chunk->memblock && chunk->memblock->data);

View file

@ -187,7 +187,7 @@ int pa_simple_write(struct pa_simple *p, const void*data, size_t length, int *pe
if (l > length)
l = length;
pa_stream_write(p->stream, data, l, NULL);
pa_stream_write(p->stream, data, l, NULL, 0);
data += l;
length -= l;
}

View file

@ -267,7 +267,7 @@ void pa_stream_connect_record(struct pa_stream *s, const char *dev, const struct
create_stream(s, dev, attr);
}
void pa_stream_write(struct pa_stream *s, const void *data, size_t length, void (*free_cb)(void *p)) {
void pa_stream_write(struct pa_stream *s, const void *data, size_t length, void (*free_cb)(void *p), size_t delta) {
struct pa_memchunk chunk;
assert(s && s->context && data && length && s->state == PA_STREAM_READY && s->ref >= 1);
@ -282,7 +282,7 @@ void pa_stream_write(struct pa_stream *s, const void *data, size_t length, void
chunk.index = 0;
chunk.length = length;
pa_pstream_send_memblock(s->context->pstream, s->channel, 0, &chunk);
pa_pstream_send_memblock(s->context->pstream, s->channel, delta, &chunk);
pa_memblock_unref(chunk.memblock);
if (length < s->requested_bytes)
@ -452,3 +452,48 @@ finish:
pa_operation_done(o);
pa_operation_unref(o);
}
struct pa_operation* pa_stream_cork(struct pa_stream *s, int b, void (*cb) (struct pa_stream*s, int success, void *userdata), void *userdata) {
struct pa_operation *o;
struct pa_tagstruct *t;
uint32_t tag;
assert(s && s->ref >= 1 && s->state == PA_STREAM_READY);
o = pa_operation_new(s->context, s);
assert(o);
o->callback = cb;
o->userdata = userdata;
t = pa_tagstruct_new(NULL, 0);
assert(t);
pa_tagstruct_putu32(t, PA_COMMAND_CORK_PLAYBACK_STREAM);
pa_tagstruct_putu32(t, tag = s->context->ctag++);
pa_tagstruct_putu32(t, s->channel);
pa_tagstruct_putu32(t, !!b);
pa_pstream_send_tagstruct(s->context->pstream, t);
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o);
return pa_operation_ref(o);
}
struct pa_operation* pa_stream_flush(struct pa_stream *s, void (*cb)(struct pa_stream *s, int success, void *userdata), void *userdata) {
struct pa_operation *o;
struct pa_tagstruct *t;
uint32_t tag;
assert(s && s->ref >= 1 && s->state == PA_STREAM_READY);
o = pa_operation_new(s->context, s);
assert(o);
o->callback = cb;
o->userdata = userdata;
t = pa_tagstruct_new(NULL, 0);
assert(t);
pa_tagstruct_putu32(t, PA_COMMAND_FLUSH_PLAYBACK_STREAM);
pa_tagstruct_putu32(t, tag = s->context->ctag++);
pa_tagstruct_putu32(t, s->channel);
pa_pstream_send_tagstruct(s->context->pstream, t);
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, pa_stream_simple_ack_callback, o);
return pa_operation_ref(o);
}

View file

@ -70,7 +70,30 @@ void pa_stream_disconnect(struct pa_stream *s);
* and an internal reference to the specified data is kept, the data
* is not copied. If NULL, the data is copied into an internal
* buffer. */
void pa_stream_write(struct pa_stream *p, const void *data, size_t length, void (*free_cb)(void *p));
void pa_stream_write(struct pa_stream *p /**< The stream to use */,
const void *data /**< The data to write */,
size_t length /**< The length of the data to write */,
void (*free_cb)(void *p) /**< A cleanup routine for the data or NULL to request an internal copy */,
size_t delta /**< Drop this many
bytes in the playback
buffer before writing
this data. Use
(size_t) -1 for
clearing the whole
playback
buffer. Normally you
will specify 0 here,
.i.e. append to the
playback buffer. If
the value given here
is greater than the
buffered data length
the buffer is cleared
and the data is
written to the
buffer's start. This
value is ignored on
upload streams. */);
/** Return the amount of bytes that may be written using pa_stream_write() */
size_t pa_stream_writable_size(struct pa_stream *p);
@ -90,6 +113,16 @@ void pa_stream_set_write_callback(struct pa_stream *p, void (*cb)(struct pa_stre
/** Set the callback function that is called when new data is available from the stream */
void pa_stream_set_read_callback(struct pa_stream *p, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata);
/** Pause (or resume) playback of this stream temporarily
* \since 0.3 */
struct pa_operation* pa_stream_cork(struct pa_stream *s, int b, void (*cb) (struct pa_stream*s, int success, void *userdata), void *userdata);
/** Flush the playback buffer of this stream. Most of the time you're
* better off using the delta of pa_stream_write() instead of this
* function.
* \since 0.3*/
struct pa_operation* pa_stream_flush(struct pa_stream *s, void (*cb)(struct pa_stream *s, int success, void *userdata), void *userdata);
PA_C_DECL_END
#endif

View file

@ -102,7 +102,7 @@ typedef struct proto_handler {
const char *description;
} esd_proto_handler_info_t;
static void sink_input_drop_cb(struct pa_sink_input *i, size_t length);
static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length);
static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk);
static void sink_input_kill_cb(struct pa_sink_input *i);
static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i);
@ -835,7 +835,7 @@ static int do_write(struct connection *c) {
return -1;
}
pa_memblockq_drop(c->output_memblockq, r);
pa_memblockq_drop(c->output_memblockq, &chunk, r);
pa_memblock_unref(chunk.memblock);
}
@ -894,11 +894,11 @@ static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk
return 0;
}
static void sink_input_drop_cb(struct pa_sink_input *i, size_t length) {
static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) {
struct connection*c = i->userdata;
assert(i && c && length);
pa_memblockq_drop(c->input_memblockq, length);
pa_memblockq_drop(c->input_memblockq, chunk, length);
/* do something */
assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);

View file

@ -107,7 +107,7 @@ struct pa_protocol_native {
};
static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk);
static void sink_input_drop_cb(struct pa_sink_input *i, size_t length);
static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length);
static void sink_input_kill_cb(struct pa_sink_input *i);
static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i);
@ -135,6 +135,8 @@ static void command_get_info_list(struct pa_pdispatch *pd, uint32_t command, uin
static void command_get_server_info(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_subscribe(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_set_volume(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_cork_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_flush_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
[PA_COMMAND_ERROR] = { NULL },
@ -176,6 +178,8 @@ static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
[PA_COMMAND_SUBSCRIBE] = { command_subscribe },
[PA_COMMAND_SET_SINK_VOLUME] = { command_set_volume },
[PA_COMMAND_SET_SINK_INPUT_VOLUME] = { command_set_volume },
[PA_COMMAND_CORK_PLAYBACK_STREAM] = { command_cork_playback_stream },
[PA_COMMAND_FLUSH_PLAYBACK_STREAM] = { command_flush_playback_stream },
};
/* structure management */
@ -376,7 +380,7 @@ static void send_memblock(struct connection *c) {
chunk.length = r->fragment_size;
pa_pstream_send_memblock(c->pstream, r->index, 0, &chunk);
pa_memblockq_drop(r->memblockq, chunk.length);
pa_memblockq_drop(r->memblockq, &chunk, chunk.length);
pa_memblock_unref(chunk.memblock);
return;
@ -422,12 +426,12 @@ static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk
return 0;
}
static void sink_input_drop_cb(struct pa_sink_input *i, size_t length) {
static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) {
struct playback_stream *s;
assert(i && i->userdata && length);
s = i->userdata;
pa_memblockq_drop(s->memblockq, length);
pa_memblockq_drop(s->memblockq, chunk, length);
request_bytes(s);
if (s->drain_request && !pa_memblockq_is_readable(s->memblockq)) {
@ -1293,6 +1297,59 @@ static void command_set_volume(struct pa_pdispatch *pd, uint32_t command, uint32
pa_pstream_send_simple_ack(c->pstream, tag);
}
static void command_cork_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
struct connection *c = userdata;
uint32_t index;
uint32_t b;
struct playback_stream *s;
assert(c && t);
if (pa_tagstruct_getu32(t, &index) < 0 ||
pa_tagstruct_getu32(t, &b) < 0 ||
!pa_tagstruct_eof(t)) {
protocol_error(c);
return;
}
if (!c->authorized) {
pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
return;
}
if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
return;
}
pa_sink_input_cork(s->sink_input, b);
pa_pstream_send_simple_ack(c->pstream, tag);
}
static void command_flush_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
struct connection *c = userdata;
uint32_t index;
struct playback_stream *s;
assert(c && t);
if (pa_tagstruct_getu32(t, &index) < 0 ||
!pa_tagstruct_eof(t)) {
protocol_error(c);
return;
}
if (!c->authorized) {
pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
return;
}
if (!(s = pa_idxset_get_by_index(c->output_streams, index)) || s->type != PLAYBACK_STREAM) {
pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
return;
}
pa_memblockq_flush(s->memblockq);
pa_pstream_send_simple_ack(c->pstream, tag);
}
/*** pstream callbacks ***/
@ -1306,7 +1363,7 @@ static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *pack
}
}
static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata) {
static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata) {
struct connection *c = userdata;
struct output_stream *stream;
assert(p && chunk && userdata);
@ -1338,7 +1395,6 @@ static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, in
u->memchunk = *chunk;
pa_memblock_ref(u->memchunk.memblock);
u->length = 0;
fprintf(stderr, "COPY\n");
} else {
u->memchunk.memblock = pa_memblock_new(u->length, c->protocol->core->memblock_stat);
u->memchunk.index = u->memchunk.length = 0;

View file

@ -159,7 +159,7 @@ static int do_write(struct connection *c) {
return -1;
}
pa_memblockq_drop(c->output_memblockq, r);
pa_memblockq_drop(c->output_memblockq, &chunk, r);
pa_memblock_unref(chunk.memblock);
return 0;
@ -202,11 +202,11 @@ static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk
return 0;
}
static void sink_input_drop_cb(struct pa_sink_input *i, size_t length) {
static void sink_input_drop_cb(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) {
struct connection*c = i->userdata;
assert(i && c && length);
pa_memblockq_drop(c->input_memblockq, length);
pa_memblockq_drop(c->input_memblockq, chunk, length);
/* do something */
assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);

View file

@ -50,7 +50,7 @@ struct item_info {
/* memblock info */
struct pa_memchunk chunk;
uint32_t channel;
int32_t delta;
uint32_t delta;
/* packet info */
struct pa_packet *packet;
@ -86,7 +86,7 @@ struct pa_pstream {
void (*recieve_packet_callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata);
void *recieve_packet_callback_userdata;
void (*recieve_memblock_callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata);
void (*recieve_memblock_callback) (struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata);
void *recieve_memblock_callback_userdata;
void (*drain_callback)(struct pa_pstream *p, void *userdata);
@ -219,7 +219,7 @@ void pa_pstream_send_packet(struct pa_pstream*p, struct pa_packet *packet) {
p->mainloop->defer_enable(p->defer_event, 1);
}
void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk) {
void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk) {
struct item_info *i;
assert(p && channel != (uint32_t) -1 && chunk);
@ -242,7 +242,7 @@ void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, void (*callbac
p->recieve_packet_callback_userdata = userdata;
}
void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata), void *userdata) {
void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata), void *userdata) {
assert(p && callback);
p->recieve_memblock_callback = callback;
@ -378,7 +378,7 @@ static void do_read(struct pa_pstream *p) {
p->recieve_memblock_callback(
p,
ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
(int32_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA]),
ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA]),
&chunk,
p->recieve_memblock_callback_userdata);
}

View file

@ -37,10 +37,10 @@ void pa_pstream_unref(struct pa_pstream*p);
struct pa_pstream* pa_pstream_ref(struct pa_pstream*p);
void pa_pstream_send_packet(struct pa_pstream*p, struct pa_packet *packet);
void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk);
void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk);
void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata), void *userdata);
void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, const struct pa_memchunk *chunk, void *userdata), void *userdata);
void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, uint32_t delta, const struct pa_memchunk *chunk, void *userdata), void *userdata);
void pa_pstream_set_drain_callback(struct pa_pstream *p, void (*cb)(struct pa_pstream *p, void *userdata), void *userdata);
void pa_pstream_set_die_callback(struct pa_pstream *p, void (*callback)(struct pa_pstream *p, void *userdata), void *userdata);

View file

@ -59,6 +59,7 @@ struct pa_sink_input* pa_sink_input_new(struct pa_sink *s, const char *name, con
i->get_latency = NULL;
i->userdata = NULL;
i->corked = 0;
i->volume = PA_VOLUME_NORM;
i->resampled_chunk.memblock = NULL;
@ -120,6 +121,9 @@ uint32_t pa_sink_input_get_latency(struct pa_sink_input *i) {
int pa_sink_input_peek(struct pa_sink_input *i, struct pa_memchunk *chunk) {
assert(i && chunk && i->peek && i->drop);
if (i->corked == 0)
return -1;
if (!i->resampler)
return i->peek(i, chunk);
@ -134,11 +138,12 @@ int pa_sink_input_peek(struct pa_sink_input *i, struct pa_memchunk *chunk) {
assert(tchunk.length);
l = pa_resampler_request(i->resampler, CONVERT_BUFFER_LENGTH);
i->drop(i, &tchunk, l);
if (tchunk.length > l)
tchunk.length = l;
i->drop(i, tchunk.length);
pa_resampler_run(i->resampler, &tchunk, &i->resampled_chunk);
pa_memblock_unref(tchunk.memblock);
}
@ -149,11 +154,11 @@ int pa_sink_input_peek(struct pa_sink_input *i, struct pa_memchunk *chunk) {
return 0;
}
void pa_sink_input_drop(struct pa_sink_input *i, size_t length) {
void pa_sink_input_drop(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length) {
assert(i && length);
if (!i->resampler) {
i->drop(i, length);
i->drop(i, chunk, length);
return;
}
@ -177,3 +182,13 @@ void pa_sink_input_set_volume(struct pa_sink_input *i, pa_volume_t volume) {
pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_CHANGE, i->index);
}
}
void pa_sink_input_cork(struct pa_sink_input *i, int b) {
int n;
assert(i);
n = i->corked && !b;
i->corked = b;
if (n)
pa_sink_notify(i->sink);
}

View file

@ -34,6 +34,8 @@
struct pa_sink_input {
uint32_t index;
int corked;
char *name;
struct pa_module *owner;
struct pa_client *client;
@ -42,7 +44,7 @@ struct pa_sink_input {
uint32_t volume;
int (*peek) (struct pa_sink_input *i, struct pa_memchunk *chunk);
void (*drop) (struct pa_sink_input *i, size_t length);
void (*drop) (struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length);
void (*kill) (struct pa_sink_input *i);
uint32_t (*get_latency) (struct pa_sink_input *i);
@ -62,8 +64,10 @@ void pa_sink_input_kill(struct pa_sink_input *i);
uint32_t pa_sink_input_get_latency(struct pa_sink_input *i);
int pa_sink_input_peek(struct pa_sink_input *i, struct pa_memchunk *chunk);
void pa_sink_input_drop(struct pa_sink_input *i, size_t length);
void pa_sink_input_drop(struct pa_sink_input *i, const struct pa_memchunk *chunk, size_t length);
void pa_sink_input_set_volume(struct pa_sink_input *i, pa_volume_t volume);
void pa_sink_input_cork(struct pa_sink_input *i, int b);
#endif

View file

@ -147,8 +147,8 @@ static void inputs_drop(struct pa_sink *s, struct pa_mix_info *info, unsigned ma
struct pa_sink_input *i = info->userdata;
assert(i && info->chunk.memblock);
pa_sink_input_drop(i, &info->chunk, length);
pa_memblock_unref(info->chunk.memblock);
pa_sink_input_drop(i, length);
}
}

View file

@ -36,6 +36,7 @@
#include <pwd.h>
#include <signal.h>
#include <pthread.h>
#include <sys/time.h>
#include "util.h"
#include "xmalloc.h"
@ -192,3 +193,23 @@ char *pa_get_host_name(char *s, size_t l) {
s[l-1] = 0;
return s;
}
uint32_t pa_age(struct timeval *tv) {
struct timeval now;
uint32_t r;
assert(tv);
if (tv->tv_sec == 0)
return 0;
gettimeofday(&now, NULL);
r = (now.tv_sec-tv->tv_sec) * 1000000;
if (now.tv_usec >= tv->tv_usec)
r += now.tv_usec - tv->tv_usec;
else
r -= tv->tv_usec - now.tv_usec;
return r;
}

View file

@ -23,6 +23,7 @@
***/
#include <sys/types.h>
#include <inttypes.h>
void pa_make_nonblock_fd(int fd);
@ -38,4 +39,6 @@ char *pa_sprintf_malloc(const char *format, ...) __attribute__ ((format (printf,
char *pa_get_user_name(char *s, size_t l);
char *pa_get_host_name(char *s, size_t l);
uint32_t pa_age(struct timeval *tv);
#endif