mirror of
https://gitlab.freedesktop.org/pulseaudio/pulseaudio.git
synced 2026-01-02 11:08:39 -05:00
pstream: Allow reading/writing through srbchannel
For writing, we prefer writing through the srbchannel if one is available, and we have no ancil data to send. For reading, we support reading from both in parallel. This meant replicating a struct used for reading, so a lot of this patch is just a search/replace in do_read to use the appropriate channel for reading. Signed-off-by: David Henningsson <david.henningsson@canonical.com>
This commit is contained in:
parent
b06e616525
commit
4931637f82
2 changed files with 158 additions and 77 deletions
|
|
@ -109,12 +109,23 @@ struct item_info {
|
||||||
uint32_t block_id;
|
uint32_t block_id;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct pstream_read {
|
||||||
|
pa_pstream_descriptor descriptor;
|
||||||
|
pa_memblock *memblock;
|
||||||
|
pa_packet *packet;
|
||||||
|
uint32_t shm_info[PA_PSTREAM_SHM_MAX];
|
||||||
|
void *data;
|
||||||
|
size_t index;
|
||||||
|
};
|
||||||
|
|
||||||
struct pa_pstream {
|
struct pa_pstream {
|
||||||
PA_REFCNT_DECLARE;
|
PA_REFCNT_DECLARE;
|
||||||
|
|
||||||
pa_mainloop_api *mainloop;
|
pa_mainloop_api *mainloop;
|
||||||
pa_defer_event *defer_event;
|
pa_defer_event *defer_event;
|
||||||
pa_iochannel *io;
|
pa_iochannel *io;
|
||||||
|
pa_srbchannel *srb, *srbpending;
|
||||||
|
bool is_srbpending;
|
||||||
|
|
||||||
pa_queue *send_queue;
|
pa_queue *send_queue;
|
||||||
|
|
||||||
|
|
@ -132,14 +143,7 @@ struct pa_pstream {
|
||||||
pa_memchunk memchunk;
|
pa_memchunk memchunk;
|
||||||
} write;
|
} write;
|
||||||
|
|
||||||
struct {
|
struct pstream_read readio, readsrb;
|
||||||
pa_pstream_descriptor descriptor;
|
|
||||||
pa_memblock *memblock;
|
|
||||||
pa_packet *packet;
|
|
||||||
uint32_t shm_info[PA_PSTREAM_SHM_MAX];
|
|
||||||
void *data;
|
|
||||||
size_t index;
|
|
||||||
} read;
|
|
||||||
|
|
||||||
bool use_shm;
|
bool use_shm;
|
||||||
pa_memimport *import;
|
pa_memimport *import;
|
||||||
|
|
@ -172,7 +176,7 @@ struct pa_pstream {
|
||||||
};
|
};
|
||||||
|
|
||||||
static int do_write(pa_pstream *p);
|
static int do_write(pa_pstream *p);
|
||||||
static int do_read(pa_pstream *p);
|
static int do_read(pa_pstream *p, struct pstream_read *re);
|
||||||
|
|
||||||
static void do_pstream_read_write(pa_pstream *p) {
|
static void do_pstream_read_write(pa_pstream *p) {
|
||||||
pa_assert(p);
|
pa_assert(p);
|
||||||
|
|
@ -182,8 +186,13 @@ static void do_pstream_read_write(pa_pstream *p) {
|
||||||
|
|
||||||
p->mainloop->defer_enable(p->defer_event, 0);
|
p->mainloop->defer_enable(p->defer_event, 0);
|
||||||
|
|
||||||
|
if (!p->dead && p->srb) {
|
||||||
|
do_write(p);
|
||||||
|
while (!p->dead && do_read(p, &p->readsrb) == 0);
|
||||||
|
}
|
||||||
|
|
||||||
if (!p->dead && pa_iochannel_is_readable(p->io)) {
|
if (!p->dead && pa_iochannel_is_readable(p->io)) {
|
||||||
if (do_read(p) < 0)
|
if (do_read(p, &p->readio) < 0)
|
||||||
goto fail;
|
goto fail;
|
||||||
} else if (!p->dead && pa_iochannel_is_hungup(p->io))
|
} else if (!p->dead && pa_iochannel_is_hungup(p->io))
|
||||||
goto fail;
|
goto fail;
|
||||||
|
|
@ -208,6 +217,17 @@ fail:
|
||||||
pa_pstream_unref(p);
|
pa_pstream_unref(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool srb_callback(pa_srbchannel *srb, void *userdata) {
|
||||||
|
pa_pstream *p = userdata;
|
||||||
|
|
||||||
|
pa_assert(p);
|
||||||
|
pa_assert(PA_REFCNT_VALUE(p) > 0);
|
||||||
|
pa_assert(p->srb == srb);
|
||||||
|
|
||||||
|
do_pstream_read_write(p);
|
||||||
|
return p->srb != NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static void io_callback(pa_iochannel*io, void *userdata) {
|
static void io_callback(pa_iochannel*io, void *userdata) {
|
||||||
pa_pstream *p = userdata;
|
pa_pstream *p = userdata;
|
||||||
|
|
||||||
|
|
@ -289,11 +309,17 @@ static void pstream_free(pa_pstream *p) {
|
||||||
if (p->write.memchunk.memblock)
|
if (p->write.memchunk.memblock)
|
||||||
pa_memblock_unref(p->write.memchunk.memblock);
|
pa_memblock_unref(p->write.memchunk.memblock);
|
||||||
|
|
||||||
if (p->read.memblock)
|
if (p->readsrb.memblock)
|
||||||
pa_memblock_unref(p->read.memblock);
|
pa_memblock_unref(p->readsrb.memblock);
|
||||||
|
|
||||||
if (p->read.packet)
|
if (p->readsrb.packet)
|
||||||
pa_packet_unref(p->read.packet);
|
pa_packet_unref(p->readsrb.packet);
|
||||||
|
|
||||||
|
if (p->readio.memblock)
|
||||||
|
pa_memblock_unref(p->readio.memblock);
|
||||||
|
|
||||||
|
if (p->readio.packet)
|
||||||
|
pa_packet_unref(p->readio.packet);
|
||||||
|
|
||||||
pa_xfree(p);
|
pa_xfree(p);
|
||||||
}
|
}
|
||||||
|
|
@ -556,6 +582,20 @@ static void prepare_next_write_item(pa_pstream *p) {
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void check_srbpending(pa_pstream *p) {
|
||||||
|
if (!p->is_srbpending)
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (p->srb)
|
||||||
|
pa_srbchannel_free(p->srb);
|
||||||
|
|
||||||
|
p->srb = p->srbpending;
|
||||||
|
p->is_srbpending = false;
|
||||||
|
|
||||||
|
if (p->srb)
|
||||||
|
pa_srbchannel_set_callback(p->srb, srb_callback, p);
|
||||||
|
}
|
||||||
|
|
||||||
static int do_write(pa_pstream *p) {
|
static int do_write(pa_pstream *p) {
|
||||||
void *d;
|
void *d;
|
||||||
size_t l;
|
size_t l;
|
||||||
|
|
@ -568,8 +608,11 @@ static int do_write(pa_pstream *p) {
|
||||||
if (!p->write.current)
|
if (!p->write.current)
|
||||||
prepare_next_write_item(p);
|
prepare_next_write_item(p);
|
||||||
|
|
||||||
if (!p->write.current)
|
if (!p->write.current) {
|
||||||
|
/* The out queue is empty, so switching channels is safe */
|
||||||
|
check_srbpending(p);
|
||||||
return 0;
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
if (p->write.minibuf_validsize > 0) {
|
if (p->write.minibuf_validsize > 0) {
|
||||||
d = p->write.minibuf + p->write.index;
|
d = p->write.minibuf + p->write.index;
|
||||||
|
|
@ -606,8 +649,9 @@ static int do_write(pa_pstream *p) {
|
||||||
p->send_ancil_now = false;
|
p->send_ancil_now = false;
|
||||||
} else
|
} else
|
||||||
#endif
|
#endif
|
||||||
|
if (p->srb)
|
||||||
if ((r = pa_iochannel_write(p->io, d, l)) < 0)
|
r = pa_srbchannel_write(p->srb, d, l);
|
||||||
|
else if ((r = pa_iochannel_write(p->io, d, l)) < 0)
|
||||||
goto fail;
|
goto fail;
|
||||||
|
|
||||||
if (release_memblock)
|
if (release_memblock)
|
||||||
|
|
@ -639,7 +683,7 @@ fail:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int do_read(pa_pstream *p) {
|
static int do_read(pa_pstream *p, struct pstream_read *re) {
|
||||||
void *d;
|
void *d;
|
||||||
size_t l;
|
size_t l;
|
||||||
ssize_t r;
|
ssize_t r;
|
||||||
|
|
@ -647,23 +691,32 @@ static int do_read(pa_pstream *p) {
|
||||||
pa_assert(p);
|
pa_assert(p);
|
||||||
pa_assert(PA_REFCNT_VALUE(p) > 0);
|
pa_assert(PA_REFCNT_VALUE(p) > 0);
|
||||||
|
|
||||||
if (p->read.index < PA_PSTREAM_DESCRIPTOR_SIZE) {
|
if (re->index < PA_PSTREAM_DESCRIPTOR_SIZE) {
|
||||||
d = (uint8_t*) p->read.descriptor + p->read.index;
|
d = (uint8_t*) re->descriptor + re->index;
|
||||||
l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
|
l = PA_PSTREAM_DESCRIPTOR_SIZE - re->index;
|
||||||
} else {
|
} else {
|
||||||
pa_assert(p->read.data || p->read.memblock);
|
pa_assert(re->data || re->memblock);
|
||||||
|
|
||||||
if (p->read.data)
|
if (re->data)
|
||||||
d = p->read.data;
|
d = re->data;
|
||||||
else {
|
else {
|
||||||
d = pa_memblock_acquire(p->read.memblock);
|
d = pa_memblock_acquire(re->memblock);
|
||||||
release_memblock = p->read.memblock;
|
release_memblock = re->memblock;
|
||||||
}
|
}
|
||||||
|
|
||||||
d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
|
d = (uint8_t*) d + re->index - PA_PSTREAM_DESCRIPTOR_SIZE;
|
||||||
l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
|
l = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (re->index - PA_PSTREAM_DESCRIPTOR_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (re == &p->readsrb) {
|
||||||
|
r = pa_srbchannel_read(p->srb, d, l);
|
||||||
|
if (r == 0) {
|
||||||
|
if (release_memblock)
|
||||||
|
pa_memblock_release(release_memblock);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
#ifdef HAVE_CREDS
|
#ifdef HAVE_CREDS
|
||||||
{
|
{
|
||||||
pa_ancil b;
|
pa_ancil b;
|
||||||
|
|
@ -689,13 +742,13 @@ static int do_read(pa_pstream *p) {
|
||||||
if (release_memblock)
|
if (release_memblock)
|
||||||
pa_memblock_release(release_memblock);
|
pa_memblock_release(release_memblock);
|
||||||
|
|
||||||
p->read.index += (size_t) r;
|
re->index += (size_t) r;
|
||||||
|
|
||||||
if (p->read.index == PA_PSTREAM_DESCRIPTOR_SIZE) {
|
if (re->index == PA_PSTREAM_DESCRIPTOR_SIZE) {
|
||||||
uint32_t flags, length, channel;
|
uint32_t flags, length, channel;
|
||||||
/* Reading of frame descriptor complete */
|
/* Reading of frame descriptor complete */
|
||||||
|
|
||||||
flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
|
flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
|
||||||
|
|
||||||
if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
|
if (!p->use_shm && (flags & PA_FLAG_SHMMASK) != 0) {
|
||||||
pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
|
pa_log_warn("Received SHM frame on a socket where SHM is disabled.");
|
||||||
|
|
@ -706,10 +759,10 @@ static int do_read(pa_pstream *p) {
|
||||||
|
|
||||||
/* This is a SHM memblock release frame with no payload */
|
/* This is a SHM memblock release frame with no payload */
|
||||||
|
|
||||||
/* pa_log("Got release frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
|
/* pa_log("Got release frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
|
||||||
|
|
||||||
pa_assert(p->export);
|
pa_assert(p->export);
|
||||||
pa_memexport_process_release(p->export, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
|
pa_memexport_process_release(p->export, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
|
||||||
|
|
||||||
goto frame_done;
|
goto frame_done;
|
||||||
|
|
||||||
|
|
@ -717,24 +770,24 @@ static int do_read(pa_pstream *p) {
|
||||||
|
|
||||||
/* This is a SHM memblock revoke frame with no payload */
|
/* This is a SHM memblock revoke frame with no payload */
|
||||||
|
|
||||||
/* pa_log("Got revoke frame for %u", ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
|
/* pa_log("Got revoke frame for %u", ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])); */
|
||||||
|
|
||||||
pa_assert(p->import);
|
pa_assert(p->import);
|
||||||
pa_memimport_process_revoke(p->import, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
|
pa_memimport_process_revoke(p->import, ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI]));
|
||||||
|
|
||||||
goto frame_done;
|
goto frame_done;
|
||||||
}
|
}
|
||||||
|
|
||||||
length = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
|
length = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]);
|
||||||
|
|
||||||
if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
|
if (length > FRAME_SIZE_MAX_ALLOW || length <= 0) {
|
||||||
pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
|
pa_log_warn("Received invalid frame size: %lu", (unsigned long) length);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pa_assert(!p->read.packet && !p->read.memblock);
|
pa_assert(!re->packet && !re->memblock);
|
||||||
|
|
||||||
channel = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
|
channel = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]);
|
||||||
|
|
||||||
if (channel == (uint32_t) -1) {
|
if (channel == (uint32_t) -1) {
|
||||||
|
|
||||||
|
|
@ -744,8 +797,8 @@ static int do_read(pa_pstream *p) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Frame is a packet frame */
|
/* Frame is a packet frame */
|
||||||
p->read.packet = pa_packet_new(length);
|
re->packet = pa_packet_new(length);
|
||||||
p->read.data = p->read.packet->data;
|
re->data = re->packet->data;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
|
|
@ -756,20 +809,20 @@ static int do_read(pa_pstream *p) {
|
||||||
|
|
||||||
if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
|
if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
|
||||||
|
|
||||||
if (length != sizeof(p->read.shm_info)) {
|
if (length != sizeof(re->shm_info)) {
|
||||||
pa_log_warn("Received SHM memblock frame with invalid frame length.");
|
pa_log_warn("Received SHM memblock frame with invalid frame length.");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Frame is a memblock frame referencing an SHM memblock */
|
/* Frame is a memblock frame referencing an SHM memblock */
|
||||||
p->read.data = p->read.shm_info;
|
re->data = re->shm_info;
|
||||||
|
|
||||||
} else if ((flags & PA_FLAG_SHMMASK) == 0) {
|
} else if ((flags & PA_FLAG_SHMMASK) == 0) {
|
||||||
|
|
||||||
/* Frame is a memblock frame */
|
/* Frame is a memblock frame */
|
||||||
|
|
||||||
p->read.memblock = pa_memblock_new(p->mempool, length);
|
re->memblock = pa_memblock_new(p->mempool, length);
|
||||||
p->read.data = NULL;
|
re->data = NULL;
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
pa_log_warn("Received memblock frame with invalid flags value.");
|
pa_log_warn("Received memblock frame with invalid flags value.");
|
||||||
|
|
@ -777,74 +830,74 @@ static int do_read(pa_pstream *p) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if (p->read.index > PA_PSTREAM_DESCRIPTOR_SIZE) {
|
} else if (re->index > PA_PSTREAM_DESCRIPTOR_SIZE) {
|
||||||
/* Frame payload available */
|
/* Frame payload available */
|
||||||
|
|
||||||
if (p->read.memblock && p->receive_memblock_callback) {
|
if (re->memblock && p->receive_memblock_callback) {
|
||||||
|
|
||||||
/* Is this memblock data? Than pass it to the user */
|
/* Is this memblock data? Than pass it to the user */
|
||||||
l = (p->read.index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
|
l = (re->index - (size_t) r) < PA_PSTREAM_DESCRIPTOR_SIZE ? (size_t) (re->index - PA_PSTREAM_DESCRIPTOR_SIZE) : (size_t) r;
|
||||||
|
|
||||||
if (l > 0) {
|
if (l > 0) {
|
||||||
pa_memchunk chunk;
|
pa_memchunk chunk;
|
||||||
|
|
||||||
chunk.memblock = p->read.memblock;
|
chunk.memblock = re->memblock;
|
||||||
chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
|
chunk.index = re->index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
|
||||||
chunk.length = l;
|
chunk.length = l;
|
||||||
|
|
||||||
if (p->receive_memblock_callback) {
|
if (p->receive_memblock_callback) {
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
|
|
||||||
offset = (int64_t) (
|
offset = (int64_t) (
|
||||||
(((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
|
(((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
|
||||||
(((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
|
(((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
|
||||||
|
|
||||||
p->receive_memblock_callback(
|
p->receive_memblock_callback(
|
||||||
p,
|
p,
|
||||||
ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
|
ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
|
||||||
offset,
|
offset,
|
||||||
ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
|
ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
|
||||||
&chunk,
|
&chunk,
|
||||||
p->receive_memblock_callback_userdata);
|
p->receive_memblock_callback_userdata);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Drop seek info for following callbacks */
|
/* Drop seek info for following callbacks */
|
||||||
p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
|
re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] =
|
||||||
p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
|
re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI] =
|
||||||
p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
|
re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO] = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Frame complete */
|
/* Frame complete */
|
||||||
if (p->read.index >= ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
|
if (re->index >= ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) + PA_PSTREAM_DESCRIPTOR_SIZE) {
|
||||||
|
|
||||||
if (p->read.memblock) {
|
if (re->memblock) {
|
||||||
|
|
||||||
/* This was a memblock frame. We can unref the memblock now */
|
/* This was a memblock frame. We can unref the memblock now */
|
||||||
pa_memblock_unref(p->read.memblock);
|
pa_memblock_unref(re->memblock);
|
||||||
|
|
||||||
} else if (p->read.packet) {
|
} else if (re->packet) {
|
||||||
|
|
||||||
if (p->receive_packet_callback)
|
if (p->receive_packet_callback)
|
||||||
#ifdef HAVE_CREDS
|
#ifdef HAVE_CREDS
|
||||||
p->receive_packet_callback(p, p->read.packet, &p->read_ancil, p->receive_packet_callback_userdata);
|
p->receive_packet_callback(p, re->packet, &p->read_ancil, p->receive_packet_callback_userdata);
|
||||||
#else
|
#else
|
||||||
p->receive_packet_callback(p, p->read.packet, NULL, p->receive_packet_callback_userdata);
|
p->receive_packet_callback(p, re->packet, NULL, p->receive_packet_callback_userdata);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
pa_packet_unref(p->read.packet);
|
pa_packet_unref(re->packet);
|
||||||
} else {
|
} else {
|
||||||
pa_memblock *b;
|
pa_memblock *b;
|
||||||
uint32_t flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
|
uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
|
||||||
pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
|
pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
|
||||||
|
|
||||||
pa_assert(p->import);
|
pa_assert(p->import);
|
||||||
|
|
||||||
if (!(b = pa_memimport_get(p->import,
|
if (!(b = pa_memimport_get(p->import,
|
||||||
ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
|
ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
|
||||||
ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
|
ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]),
|
||||||
ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
|
ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
|
||||||
ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]),
|
ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
|
||||||
!!(flags & PA_FLAG_SHMWRITABLE)))) {
|
!!(flags & PA_FLAG_SHMWRITABLE)))) {
|
||||||
|
|
||||||
if (pa_log_ratelimit(PA_LOG_DEBUG))
|
if (pa_log_ratelimit(PA_LOG_DEBUG))
|
||||||
|
|
@ -857,17 +910,17 @@ static int do_read(pa_pstream *p) {
|
||||||
|
|
||||||
chunk.memblock = b;
|
chunk.memblock = b;
|
||||||
chunk.index = 0;
|
chunk.index = 0;
|
||||||
chunk.length = b ? pa_memblock_get_length(b) : ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]);
|
chunk.length = b ? pa_memblock_get_length(b) : ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]);
|
||||||
|
|
||||||
offset = (int64_t) (
|
offset = (int64_t) (
|
||||||
(((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
|
(((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
|
||||||
(((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
|
(((uint64_t) ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_LO]))));
|
||||||
|
|
||||||
p->receive_memblock_callback(
|
p->receive_memblock_callback(
|
||||||
p,
|
p,
|
||||||
ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
|
ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
|
||||||
offset,
|
offset,
|
||||||
ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
|
ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SEEKMASK,
|
||||||
&chunk,
|
&chunk,
|
||||||
p->receive_memblock_callback_userdata);
|
p->receive_memblock_callback_userdata);
|
||||||
}
|
}
|
||||||
|
|
@ -883,10 +936,10 @@ static int do_read(pa_pstream *p) {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
frame_done:
|
frame_done:
|
||||||
p->read.memblock = NULL;
|
re->memblock = NULL;
|
||||||
p->read.packet = NULL;
|
re->packet = NULL;
|
||||||
p->read.index = 0;
|
re->index = 0;
|
||||||
p->read.data = NULL;
|
re->data = NULL;
|
||||||
|
|
||||||
#ifdef HAVE_CREDS
|
#ifdef HAVE_CREDS
|
||||||
p->read_ancil.creds_valid = false;
|
p->read_ancil.creds_valid = false;
|
||||||
|
|
@ -988,6 +1041,9 @@ void pa_pstream_unlink(pa_pstream *p) {
|
||||||
|
|
||||||
p->dead = true;
|
p->dead = true;
|
||||||
|
|
||||||
|
while (p->srb || p->is_srbpending) /* In theory there could be one active and one pending */
|
||||||
|
pa_pstream_set_srbchannel(p, NULL);
|
||||||
|
|
||||||
if (p->import) {
|
if (p->import) {
|
||||||
pa_memimport_free(p->import);
|
pa_memimport_free(p->import);
|
||||||
p->import = NULL;
|
p->import = NULL;
|
||||||
|
|
@ -1040,3 +1096,23 @@ bool pa_pstream_get_shm(pa_pstream *p) {
|
||||||
|
|
||||||
return p->use_shm;
|
return p->use_shm;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb) {
|
||||||
|
pa_assert(p);
|
||||||
|
pa_assert(PA_REFCNT_VALUE(p) > 0 || srb == NULL);
|
||||||
|
|
||||||
|
if (srb == p->srb)
|
||||||
|
return;
|
||||||
|
|
||||||
|
/* We can't handle quick switches between srbchannels. */
|
||||||
|
pa_assert(!p->is_srbpending);
|
||||||
|
|
||||||
|
p->srbpending = srb;
|
||||||
|
p->is_srbpending = true;
|
||||||
|
|
||||||
|
/* Switch immediately, if possible. */
|
||||||
|
if (p->dead)
|
||||||
|
check_srbpending(p);
|
||||||
|
else
|
||||||
|
do_write(p);
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -31,6 +31,7 @@
|
||||||
#include <pulsecore/packet.h>
|
#include <pulsecore/packet.h>
|
||||||
#include <pulsecore/memblock.h>
|
#include <pulsecore/memblock.h>
|
||||||
#include <pulsecore/iochannel.h>
|
#include <pulsecore/iochannel.h>
|
||||||
|
#include <pulsecore/srbchannel.h>
|
||||||
#include <pulsecore/memchunk.h>
|
#include <pulsecore/memchunk.h>
|
||||||
#include <pulsecore/creds.h>
|
#include <pulsecore/creds.h>
|
||||||
#include <pulsecore/macro.h>
|
#include <pulsecore/macro.h>
|
||||||
|
|
@ -66,4 +67,8 @@ bool pa_pstream_is_pending(pa_pstream *p);
|
||||||
void pa_pstream_enable_shm(pa_pstream *p, bool enable);
|
void pa_pstream_enable_shm(pa_pstream *p, bool enable);
|
||||||
bool pa_pstream_get_shm(pa_pstream *p);
|
bool pa_pstream_get_shm(pa_pstream *p);
|
||||||
|
|
||||||
|
/* Enables shared ringbuffer channel. Note that the srbchannel is now owned by the pstream.
|
||||||
|
Setting srb to NULL will free any existing srbchannel. */
|
||||||
|
void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue