mirror of
https://gitlab.freedesktop.org/pulseaudio/pulseaudio.git
synced 2025-11-11 13:30:02 -05:00
pstream: Support memfd blocks transport
Now that we have the necessary infrastructure to memexport and mempimport a memfd memblock, extend that support higher up in the chain with pstreams. A PA endpoint can now _transparently_ send a memfd memblock to the other end by simply calling pa_pstream_send_memblock() – provided the block's memfd pool was earlier registered with the pstream. If the pipe does not support memfd transfers, we fall back to sending the block's full data instead of just its reference. ** Further details: A single pstream connection usually transfers blocks from multiple pools including the server's srbchannel mempool, the client's audio data mempool, and the server's global core mempool. If these mempools are memfd-backed, we now require registering them with the pstream before sending any blocks they cover. This is done to minimize fd passing overhead and avoid fd leaks. Moreover, to support all these pools without hard-coding their number or nature in the Pulse communication protocol itself, a new REGISTER_MEMFD_SHMID command is introduced. That command can be sent _anytime_ during the pstream's lifetime and is used for creating on demand SHM ID to memfd mappings. Suggested-by: David Henningsson <david.henningsson@canonical.com> Signed-off-by: Ahmed S. Darwish <darwish.07@gmail.com>
This commit is contained in:
parent
ee2db62277
commit
27d0a3b388
17 changed files with 455 additions and 76 deletions
|
|
@ -32,6 +32,7 @@
|
|||
|
||||
#include <pulse/xmalloc.h>
|
||||
|
||||
#include <pulsecore/idxset.h>
|
||||
#include <pulsecore/socket.h>
|
||||
#include <pulsecore/queue.h>
|
||||
#include <pulsecore/log.h>
|
||||
|
|
@ -44,6 +45,7 @@
|
|||
|
||||
/* We piggyback information if audio data blocks are stored in SHM on the seek mode */
|
||||
#define PA_FLAG_SHMDATA 0x80000000LU
|
||||
#define PA_FLAG_SHMDATA_MEMFD_BLOCK 0x20000000LU
|
||||
#define PA_FLAG_SHMRELEASE 0x40000000LU
|
||||
#define PA_FLAG_SHMREVOKE 0xC0000000LU
|
||||
#define PA_FLAG_SHMMASK 0xFF000000LU
|
||||
|
|
@ -143,7 +145,17 @@ struct pa_pstream {
|
|||
|
||||
struct pstream_read readio, readsrb;
|
||||
|
||||
bool use_shm;
|
||||
/* @use_shm: beside copying the full audio data to the other
|
||||
* PA end, this pipe supports just sending references of the
|
||||
* same audio data blocks if they reside in a SHM pool.
|
||||
*
|
||||
* @use_memfd: pipe supports sending SHM memfd block references
|
||||
*
|
||||
* @registered_memfd_ids: registered memfd pools SHM IDs. Check
|
||||
* pa_pstream_register_memfd_mempool() for more information. */
|
||||
bool use_shm, use_memfd;
|
||||
pa_idxset *registered_memfd_ids;
|
||||
|
||||
pa_memimport *import;
|
||||
pa_memexport *export;
|
||||
|
||||
|
|
@ -168,11 +180,33 @@ struct pa_pstream {
|
|||
pa_mempool *mempool;
|
||||
|
||||
#ifdef HAVE_CREDS
|
||||
pa_cmsg_ancil_data read_ancil_data, write_ancil_data;
|
||||
pa_cmsg_ancil_data read_ancil_data, *write_ancil_data;
|
||||
bool send_ancil_data_now;
|
||||
#endif
|
||||
};
|
||||
|
||||
#ifdef HAVE_CREDS
|
||||
/* Don't close the ancillary fds by your own! Always call this method;
|
||||
* it guarantees necessary cleanups after fds close.. This method is
|
||||
* also multiple-invocations safe. */
|
||||
void pa_cmsg_ancil_data_close_fds(struct pa_cmsg_ancil_data *ancil) {
|
||||
if (ancil && ancil->close_fds_on_cleanup) {
|
||||
int i;
|
||||
|
||||
pa_assert(ancil->nfd <= MAX_ANCIL_DATA_FDS);
|
||||
|
||||
for (i = 0; i < ancil->nfd; i++)
|
||||
if (ancil->fds[i] != -1) {
|
||||
pa_assert_se(pa_close(ancil->fds[i]) == 0);
|
||||
ancil->fds[i] = -1;
|
||||
}
|
||||
|
||||
ancil->nfd = 0;
|
||||
ancil->close_fds_on_cleanup = false;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
static int do_write(pa_pstream *p);
|
||||
static int do_read(pa_pstream *p, struct pstream_read *re);
|
||||
|
||||
|
|
@ -287,6 +321,35 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *poo
|
|||
return p;
|
||||
}
|
||||
|
||||
/* Attach memfd<->SHM_ID mapping to given pstream and its memimport.
|
||||
* Check pa_pstream_register_memfd_mempool() for further info.
|
||||
*
|
||||
* Caller owns the passed @memfd_fd and must close it down when appropriate. */
|
||||
int pa_pstream_attach_memfd_shmid(pa_pstream *p, unsigned shm_id, int memfd_fd) {
|
||||
int err = -1;
|
||||
|
||||
pa_assert(memfd_fd != -1);
|
||||
|
||||
if (!p->use_memfd) {
|
||||
pa_log_warn("Received memfd ID registration request over a pipe "
|
||||
"that does not support memfds");
|
||||
return err;
|
||||
}
|
||||
|
||||
if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
|
||||
pa_log_warn("previously registered memfd SHM ID = %u", shm_id);
|
||||
return err;
|
||||
}
|
||||
|
||||
if (pa_memimport_attach_memfd(p->import, shm_id, memfd_fd, true)) {
|
||||
pa_log("Failed to create permanent mapping for memfd region with ID = %u", shm_id);
|
||||
return err;
|
||||
}
|
||||
|
||||
pa_assert_se(pa_idxset_put(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL) == 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void item_free(void *item) {
|
||||
struct item_info *i = item;
|
||||
pa_assert(i);
|
||||
|
|
@ -299,6 +362,15 @@ static void item_free(void *item) {
|
|||
pa_packet_unref(i->packet);
|
||||
}
|
||||
|
||||
#ifdef HAVE_CREDS
|
||||
/* On error recovery paths, there might be lingering items
|
||||
* on the pstream send queue and they are usually freed with
|
||||
* a call to 'pa_queue_free(p->send_queue, item_free)'. Make
|
||||
* sure we do not leak any fds in that case! */
|
||||
if (i->with_ancil_data)
|
||||
pa_cmsg_ancil_data_close_fds(&i->ancil_data);
|
||||
#endif
|
||||
|
||||
if (pa_flist_push(PA_STATIC_FLIST_GET(items), i) < 0)
|
||||
pa_xfree(i);
|
||||
}
|
||||
|
|
@ -328,18 +400,25 @@ static void pstream_free(pa_pstream *p) {
|
|||
if (p->readio.packet)
|
||||
pa_packet_unref(p->readio.packet);
|
||||
|
||||
if (p->registered_memfd_ids)
|
||||
pa_idxset_free(p->registered_memfd_ids, NULL);
|
||||
|
||||
pa_xfree(p);
|
||||
}
|
||||
|
||||
void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_cmsg_ancil_data *ancil_data) {
|
||||
void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, pa_cmsg_ancil_data *ancil_data) {
|
||||
struct item_info *i;
|
||||
|
||||
pa_assert(p);
|
||||
pa_assert(PA_REFCNT_VALUE(p) > 0);
|
||||
pa_assert(packet);
|
||||
|
||||
if (p->dead)
|
||||
if (p->dead) {
|
||||
#ifdef HAVE_CREDS
|
||||
pa_cmsg_ancil_data_close_fds(ancil_data);
|
||||
#endif
|
||||
return;
|
||||
}
|
||||
|
||||
if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(items))))
|
||||
i = pa_xnew(struct item_info, 1);
|
||||
|
|
@ -556,22 +635,40 @@ static void prepare_next_write_item(pa_pstream *p) {
|
|||
&shm_id,
|
||||
&offset,
|
||||
&length) >= 0) {
|
||||
pa_assert(type == PA_MEM_TYPE_SHARED_POSIX);
|
||||
|
||||
flags |= PA_FLAG_SHMDATA;
|
||||
if (pa_mempool_is_remote_writable(current_pool))
|
||||
flags |= PA_FLAG_SHMWRITABLE;
|
||||
send_payload = false;
|
||||
if (type == PA_MEM_TYPE_SHARED_POSIX)
|
||||
send_payload = false;
|
||||
|
||||
shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
|
||||
shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
|
||||
shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
|
||||
shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
|
||||
if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd) {
|
||||
if (pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
|
||||
flags |= PA_FLAG_SHMDATA_MEMFD_BLOCK;
|
||||
send_payload = false;
|
||||
} else {
|
||||
if (pa_log_ratelimit(PA_LOG_ERROR)) {
|
||||
pa_log("Cannot send block reference with non-registered memfd ID = %u", shm_id);
|
||||
pa_log("Fallig back to copying full block data over socket");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
|
||||
p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
|
||||
if (send_payload) {
|
||||
pa_assert_se(pa_memexport_process_release(current_export, block_id) == 0);
|
||||
} else {
|
||||
flags |= PA_FLAG_SHMDATA;
|
||||
if (pa_mempool_is_remote_writable(current_pool))
|
||||
flags |= PA_FLAG_SHMWRITABLE;
|
||||
|
||||
shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
|
||||
shm_info[PA_PSTREAM_SHM_SHMID] = htonl(shm_id);
|
||||
shm_info[PA_PSTREAM_SHM_INDEX] = htonl((uint32_t) (offset + p->write.current->chunk.index));
|
||||
shm_info[PA_PSTREAM_SHM_LENGTH] = htonl((uint32_t) p->write.current->chunk.length);
|
||||
|
||||
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(shm_size);
|
||||
p->write.minibuf_validsize = PA_PSTREAM_DESCRIPTOR_SIZE + shm_size;
|
||||
}
|
||||
}
|
||||
/* else */
|
||||
/* FIXME: Avoid memexport slot leaks. Call pa_memexport_process_release() */
|
||||
/* pa_log_warn("Failed to export memory block."); */
|
||||
|
||||
if (current_export != p->export)
|
||||
|
|
@ -590,7 +687,7 @@ static void prepare_next_write_item(pa_pstream *p) {
|
|||
|
||||
#ifdef HAVE_CREDS
|
||||
if ((p->send_ancil_data_now = p->write.current->with_ancil_data))
|
||||
p->write_ancil_data = p->write.current->ancil_data;
|
||||
p->write_ancil_data = &p->write.current->ancil_data;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
|
@ -650,14 +747,16 @@ static int do_write(pa_pstream *p) {
|
|||
|
||||
#ifdef HAVE_CREDS
|
||||
if (p->send_ancil_data_now) {
|
||||
if (p->write_ancil_data.creds_valid) {
|
||||
pa_assert(p->write_ancil_data.nfd == 0);
|
||||
if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_ancil_data.creds)) < 0)
|
||||
if (p->write_ancil_data->creds_valid) {
|
||||
pa_assert(p->write_ancil_data->nfd == 0);
|
||||
if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_ancil_data->creds)) < 0)
|
||||
goto fail;
|
||||
}
|
||||
else
|
||||
if ((r = pa_iochannel_write_with_fds(p->io, d, l, p->write_ancil_data.nfd, p->write_ancil_data.fds)) < 0)
|
||||
if ((r = pa_iochannel_write_with_fds(p->io, d, l, p->write_ancil_data->nfd, p->write_ancil_data->fds)) < 0)
|
||||
goto fail;
|
||||
|
||||
pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
|
||||
p->send_ancil_data_now = false;
|
||||
} else
|
||||
#endif
|
||||
|
|
@ -688,6 +787,10 @@ static int do_write(pa_pstream *p) {
|
|||
return (size_t) r == l ? 1 : 0;
|
||||
|
||||
fail:
|
||||
#ifdef HAVE_CREDS
|
||||
if (p->send_ancil_data_now)
|
||||
pa_cmsg_ancil_data_close_fds(p->write_ancil_data);
|
||||
#endif
|
||||
|
||||
if (release_memblock)
|
||||
pa_memblock_release(release_memblock);
|
||||
|
|
@ -768,6 +871,7 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
|
|||
pa_assert(b.nfd <= MAX_ANCIL_DATA_FDS);
|
||||
p->read_ancil_data.nfd = b.nfd;
|
||||
memcpy(p->read_ancil_data.fds, b.fds, sizeof(int) * b.nfd);
|
||||
p->read_ancil_data.close_fds_on_cleanup = b.close_fds_on_cleanup;
|
||||
}
|
||||
}
|
||||
#else
|
||||
|
|
@ -844,7 +948,7 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if ((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA) {
|
||||
if (((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0) {
|
||||
|
||||
if (length != sizeof(re->shm_info)) {
|
||||
pa_log_warn("Received SHM memblock frame with invalid frame length.");
|
||||
|
|
@ -887,19 +991,28 @@ static int do_read(pa_pstream *p, struct pstream_read *re) {
|
|||
|
||||
pa_packet_unref(re->packet);
|
||||
} else {
|
||||
pa_memblock *b;
|
||||
pa_memblock *b = NULL;
|
||||
uint32_t flags = ntohl(re->descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
|
||||
pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
|
||||
uint32_t shm_id = ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]);
|
||||
pa_mem_type_t type = (flags & PA_FLAG_SHMDATA_MEMFD_BLOCK) ?
|
||||
PA_MEM_TYPE_SHARED_MEMFD : PA_MEM_TYPE_SHARED_POSIX;
|
||||
|
||||
pa_assert(((flags & PA_FLAG_SHMMASK) & PA_FLAG_SHMDATA) != 0);
|
||||
pa_assert(p->import);
|
||||
|
||||
if (!(b = pa_memimport_get(p->import,
|
||||
PA_MEM_TYPE_SHARED_POSIX,
|
||||
ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
|
||||
ntohl(re->shm_info[PA_PSTREAM_SHM_SHMID]),
|
||||
ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
|
||||
ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
|
||||
!!(flags & PA_FLAG_SHMWRITABLE)))) {
|
||||
if (type == PA_MEM_TYPE_SHARED_MEMFD && p->use_memfd &&
|
||||
!pa_idxset_get_by_data(p->registered_memfd_ids, PA_UINT32_TO_PTR(shm_id), NULL)) {
|
||||
|
||||
if (pa_log_ratelimit(PA_LOG_ERROR))
|
||||
pa_log("Ignoring received block reference with non-registered memfd ID = %u", shm_id);
|
||||
|
||||
} else if (!(b = pa_memimport_get(p->import,
|
||||
type,
|
||||
ntohl(re->shm_info[PA_PSTREAM_SHM_BLOCKID]),
|
||||
shm_id,
|
||||
ntohl(re->shm_info[PA_PSTREAM_SHM_INDEX]),
|
||||
ntohl(re->shm_info[PA_PSTREAM_SHM_LENGTH]),
|
||||
!!(flags & PA_FLAG_SHMWRITABLE)))) {
|
||||
|
||||
if (pa_log_ratelimit(PA_LOG_DEBUG))
|
||||
pa_log_debug("Failed to import memory block.");
|
||||
|
|
@ -942,6 +1055,13 @@ frame_done:
|
|||
re->data = NULL;
|
||||
|
||||
#ifdef HAVE_CREDS
|
||||
/* FIXME: Close received ancillary data fds if the pstream's
|
||||
* receive_packet_callback did not do so.
|
||||
*
|
||||
* Malicious clients can attach fds to unknown commands, or attach them
|
||||
* to commands that does not expect fds. By doing so, server will reach
|
||||
* its open fd limit and future clients' SHM transfers will always fail.
|
||||
*/
|
||||
p->read_ancil_data.creds_valid = false;
|
||||
p->read_ancil_data.nfd = 0;
|
||||
#endif
|
||||
|
|
@ -1090,6 +1210,18 @@ void pa_pstream_enable_shm(pa_pstream *p, bool enable) {
|
|||
}
|
||||
}
|
||||
|
||||
void pa_pstream_enable_memfd(pa_pstream *p) {
|
||||
pa_assert(p);
|
||||
pa_assert(PA_REFCNT_VALUE(p) > 0);
|
||||
pa_assert(p->use_shm);
|
||||
|
||||
p->use_memfd = true;
|
||||
|
||||
if (!p->registered_memfd_ids) {
|
||||
p->registered_memfd_ids = pa_idxset_new(NULL, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
bool pa_pstream_get_shm(pa_pstream *p) {
|
||||
pa_assert(p);
|
||||
pa_assert(PA_REFCNT_VALUE(p) > 0);
|
||||
|
|
@ -1097,6 +1229,13 @@ bool pa_pstream_get_shm(pa_pstream *p) {
|
|||
return p->use_shm;
|
||||
}
|
||||
|
||||
bool pa_pstream_get_memfd(pa_pstream *p) {
|
||||
pa_assert(p);
|
||||
pa_assert(PA_REFCNT_VALUE(p) > 0);
|
||||
|
||||
return p->use_memfd;
|
||||
}
|
||||
|
||||
void pa_pstream_set_srbchannel(pa_pstream *p, pa_srbchannel *srb) {
|
||||
pa_assert(p);
|
||||
pa_assert(PA_REFCNT_VALUE(p) > 0 || srb == NULL);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue