pipewire/src/modules/module-protocol-pulse/stream.c
Wim Taymans b7aae374bf pulse-server: keep allocated buffer size around
For playback and capture streams we allocate MAXLENGTH (4M) buffers but
for upload streams we must allow space for the total upload stream, which
can be up to the max allowed sample size (16M).

Keep the allocated size for the stream around in a variable so that we
can use it when writing/reading to/from the ringbuffer.

This could later also be extended to use the attr.maxlength variable to
size the buffer (but it's usually 4M anyway). This is more complicated
because we need to grow the buffer size when new attributes are set,
which is probably more complicated than useful.
2026-04-30 15:25:31 +02:00

520 lines
13 KiB
C

/* PipeWire */
/* SPDX-FileCopyrightText: Copyright © 2020 Wim Taymans */
/* SPDX-License-Identifier: MIT */
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <spa/utils/hook.h>
#include <spa/utils/ringbuffer.h>
#include <spa/pod/dynamic.h>
#include <spa/param/tag-utils.h>
#include <pipewire/log.h>
#include <pipewire/loop.h>
#include <pipewire/map.h>
#include <pipewire/properties.h>
#include <pipewire/stream.h>
#include <pipewire/work-queue.h>
#include "client.h"
#include "commands.h"
#include "defs.h"
#include "internal.h"
#include "log.h"
#include "message.h"
#include "reply.h"
#include "stream.h"
/*
 * Read the property `key` from `props` as a "num/denom" fraction into `res`.
 *
 * `*def` is copied into `res` when `props` is NULL, the key is not set, the
 * value does not scan as two unsigned integers separated by '/', or the
 * denominator is 0.
 *
 * Always returns 0.
 */
static int parse_frac(struct pw_properties *props, const char *key,
		const struct spa_fraction *def, struct spa_fraction *res)
{
	const char *value = NULL;
	bool parsed = false;

	if (props != NULL)
		value = pw_properties_get(props, key);

	if (value != NULL)
		parsed = sscanf(value, "%u/%u", &res->num, &res->denom) == 2 &&
			res->denom != 0;

	/* a partial scan may already have written into res, overwrite it */
	if (!parsed)
		*res = *def;

	return 0;
}
/*
 * Timer callback armed by stream_new(); `user_data` is the stream.
 *
 * Frees the stream when its create_tag is still set, i.e. stream_created()
 * has not reset it to SPA_ID_INVALID yet. Does nothing otherwise.
 */
static void create_stream_timeout(void *user_data)
{
	struct stream *s = user_data;

	if (s->create_tag == SPA_ID_INVALID)
		return;

	pw_log_warn("[%s] timeout on stream %p channel:%d", s->client->name, s, s->channel);

	/* Don't try to signal anything to the client, it's already killed
	 * the stream on its end: clear the two fields that would make
	 * stream_free() send a drain error or a killed event. */
	s->drain_tag = 0;
	s->killed = false;

	stream_free(s);
}
/*
 * Allocate a new stream for `client` and register it in client->streams.
 *
 * The sample spec, channel map and buffer attributes are copied into the
 * stream. The per-stream min/default fractions and the idle timeout start
 * from the server defaults and can be overridden by "pulse.*" client
 * properties. A creation timeout is armed that ends in
 * create_stream_timeout().
 *
 * Returns the new stream, or NULL with errno set: ENOSPC when the client
 * already has defs->max_streams streams, otherwise the errno left by the
 * failing calloc() or pw_map_insert_new().
 */
struct stream *stream_new(struct client *client, enum stream_type type, uint32_t create_tag,
		const struct sample_spec *ss, const struct channel_map *map,
		const struct buffer_attr *attr)
{
	struct defs *defs = &client->impl->defs;
	struct stream *stream;
	const char *idle_str;

	if (pw_map_get_size(&client->streams) >= defs->max_streams) {
		errno = ENOSPC;
		return NULL;
	}

	stream = calloc(1, sizeof(*stream));
	if (stream == NULL)
		return NULL;

	stream->channel = pw_map_insert_new(&client->streams, stream);
	if (stream->channel == SPA_ID_INVALID) {
		/* free() must not clobber the errno reported to the caller */
		int saved_errno = errno;

		free(stream);
		errno = saved_errno;
		return NULL;
	}

	stream->impl = client->impl;
	stream->client = client;
	stream->type = type;
	stream->create_tag = create_tag;
	stream->ss = *ss;
	stream->map = *map;
	stream->attr = *attr;
	spa_ringbuffer_init(&stream->ring);
	stream->peer_index = SPA_ID_INVALID;

	/* client properties override the server-wide defaults */
	parse_frac(client->props, "pulse.min.req", &defs->min_req, &stream->min_req);
	parse_frac(client->props, "pulse.min.frag", &defs->min_frag, &stream->min_frag);
	parse_frac(client->props, "pulse.min.quantum", &defs->min_quantum, &stream->min_quantum);
	parse_frac(client->props, "pulse.default.req", &defs->default_req, &stream->default_req);
	parse_frac(client->props, "pulse.default.frag", &defs->default_frag, &stream->default_frag);
	parse_frac(client->props, "pulse.default.tlength", &defs->default_tlength, &stream->default_tlength);

	stream->idle_timeout_sec = defs->idle_timeout;
	idle_str = pw_properties_get(client->props, "pulse.idle.timeout");
	if (idle_str != NULL)
		spa_atou32(idle_str, &stream->idle_timeout_sec, 0);

	if (type == STREAM_TYPE_RECORD) {
		stream->direction = PW_DIRECTION_INPUT;
	} else if (type == STREAM_TYPE_PLAYBACK || type == STREAM_TYPE_UPLOAD) {
		stream->direction = PW_DIRECTION_OUTPUT;
	} else {
		spa_assert_not_reached();
	}

	/* Time out if we don't get a link and can't send a reply to create in 35s. Client will time out in
	 * 30s and clean up its stream anyway. */
	pw_timer_queue_add(stream->impl->timer_queue, &stream->timer, NULL,
			STREAM_CREATE_TIMEOUT, create_stream_timeout, stream);

	return stream;
}
/*
 * Mark `stream` as created: reset create_tag to SPA_ID_INVALID and cancel
 * the stream's timer.
 */
void stream_created(struct stream *stream)
{
	pw_log_debug("client %p: stream %p channel:%d", stream->client, stream, stream->channel);

	stream->create_tag = SPA_ID_INVALID;
	pw_timer_queue_cancel(&stream->timer);
}
/*
 * Tear down `stream` and release everything it owns.
 *
 * In order: cancels the stream timer, answers a pending drain (drain_tag
 * set) with -ENOENT, sends the killed event when stream->killed is set,
 * disconnects and destroys the pw_stream, removes the stream from the
 * client's stream map, cancels its work-queue items and frees the buffer,
 * the properties and the stream itself.
 *
 * `stream` must not be used after this call.
 */
void stream_free(struct stream *stream)
{
	struct client *owner = stream->client;
	struct impl *impl = owner->impl;

	pw_log_debug("client %p: stream %p channel:%d", owner, stream, stream->channel);

	pw_timer_queue_cancel(&stream->timer);

	if (stream->drain_tag != 0) {
		reply_error(owner, -1, stream->drain_tag, -ENOENT);
	}
	if (stream->killed) {
		stream_send_killed(stream);
	}

	if (stream->stream != NULL) {
		spa_hook_remove(&stream->stream_listener);
		pw_stream_disconnect(stream->stream);

		/* force processing of all pending messages before we destroy
		 * the stream */
		pw_loop_invoke(impl->main_loop, NULL, 0, NULL, 0, false, owner);

		pw_stream_destroy(stream->stream);
	}

	if (stream->channel != SPA_ID_INVALID) {
		pw_map_remove(&owner->streams, stream->channel);
	}

	pw_work_queue_cancel(impl->work_queue, stream, SPA_ID_INVALID);

	free(stream->buffer);
	pw_properties_free(stream->props);
	free(stream);
}
/*
 * Flush the pw_stream and make the stream's ringbuffer empty.
 *
 * Playback streams move the write side back onto the read side, re-enter
 * prebuffering when attr.prebuf > 0, reset the playing/underrun state and
 * then send a new request to the client. All other stream types move the
 * read side forward onto the write side.
 */
void stream_flush(struct stream *stream)
{
	pw_stream_flush(stream->stream, false);

	if (stream->type != STREAM_TYPE_PLAYBACK) {
		stream->ring.readindex = stream->ring.writeindex;
		stream->read_index = stream->write_index;
		return;
	}

	stream->ring.writeindex = stream->ring.readindex;
	stream->write_index = stream->read_index;

	if (stream->attr.prebuf > 0) {
		stream->in_prebuf = true;
	}

	stream->playing_for = 0;
	stream->underrun_for = -1;
	stream->is_underrun = true;

	stream_send_request(stream);
}
/*
 * Update and return the prebuffering state of `stream` for `avail`
 * buffered bytes.
 *
 * While prebuffering, the state is left once avail reaches attr.prebuf.
 * While not prebuffering, it is entered again when attr.prebuf is non-zero
 * and nothing is available (avail <= 0).
 */
static bool stream_prebuf_active(struct stream *stream, int32_t avail)
{
	if (stream->in_prebuf && avail >= (int32_t) stream->attr.prebuf) {
		stream->in_prebuf = false;
	} else if (!stream->in_prebuf && stream->attr.prebuf > 0 && avail <= 0) {
		stream->in_prebuf = true;
	}
	return stream->in_prebuf;
}
/*
 * Compute how much data should be requested from the client and account
 * for it in stream->requested.
 *
 * missing = attr.tlength - requested - (write_index - read_index)
 *
 * Returns 0 when nothing is missing, or when less than attr.minreq is
 * missing and the stream is not prebuffering. Otherwise adds the missing
 * amount to stream->requested and returns it.
 */
uint32_t stream_pop_missing(struct stream *stream)
{
	int64_t avail = stream->write_index - stream->read_index;
	int64_t missing = stream->attr.tlength;

	missing -= stream->requested;
	missing -= avail;

	/* small requests are only let through while prebuffering; the
	 * prebuf state is only evaluated for 0 < missing < minreq */
	if (missing > 0 &&
	    (missing >= stream->attr.minreq || stream_prebuf_active(stream, avail))) {
		stream->requested += missing;
		return missing;
	}

	if (missing <= 0) {
		pw_log_debug("stream %p: (tlen:%u - req:%"PRIi64" - avail:%"PRIi64") <= 0",
				stream, stream->attr.tlength, stream->requested, avail);
	} else {
		pw_log_debug("stream %p: (tlen:%u - req:%"PRIi64" - avail:%"PRIi64") <= minreq:%u",
				stream, stream->attr.tlength, stream->requested, avail,
				stream->attr.minreq);
	}
	return 0;
}
/*
 * Pause or resume `stream`.
 *
 * Nothing happens when the stream is already in the requested state.
 * Otherwise is_paused is updated and the pw_stream is made active
 * (resumed) or inactive (paused). The change is logged when both a
 * `reason` and a client are present.
 */
void stream_set_paused(struct stream *stream, bool paused, const char *reason)
{
	if (paused == stream->is_paused)
		return;

	if (reason != NULL && stream->client != NULL) {
		const char *what = paused ? "paused" : "resumed";

		pw_log_info("%p: [%s] %s because of %s",
				stream, stream->client->name, what, reason);
	}

	stream->is_paused = paused;
	pw_stream_set_active(stream->stream, !paused);
}
/*
 * Record the cork state of `stream`, publish it as the "pulse.corked"
 * stream property ("true"/"false") and pause or resume the stream to match.
 */
void stream_set_corked(struct stream *stream, bool cork)
{
	const char *corked = cork ? "true" : "false";

	stream->corked = cork;
	pw_log_info("cork %d", cork);

	pw_stream_update_properties(stream->stream,
			&SPA_DICT_ITEMS(
				SPA_DICT_ITEM("pulse.corked", corked)));

	stream_set_paused(stream, cork, "cork request");
}
/*
 * Queue a COMMAND_UNDERFLOW event for `stream` on its client.
 *
 * The log line is rate limited with impl->rate_limit; the message itself
 * is always sent. Clients with protocol version >= 23 additionally get
 * `offset` appended.
 *
 * Returns the result of client_queue_message().
 */
int stream_send_underflow(struct stream *stream, int64_t offset)
{
	struct client *client = stream->client;
	struct impl *impl = client->impl;
	struct message *msg;
	int suppressed = spa_ratelimit_test(&impl->rate_limit, stream->timestamp);

	if (suppressed >= 0) {
		pw_log_info("[%s]: UNDERFLOW channel:%u offset:%" PRIi64" (%d suppressed)",
				client->name, stream->channel, offset, suppressed);
	}

	msg = message_alloc(impl, -1, 0);
	message_put(msg,
			TAG_U32, COMMAND_UNDERFLOW,
			TAG_U32, -1,
			TAG_U32, stream->channel,
			TAG_INVALID);

	if (client->version >= 23) {
		message_put(msg,
				TAG_S64, offset,
				TAG_INVALID);
	}

	return client_queue_message(client, msg);
}
/*
 * Queue a COMMAND_OVERFLOW event for `stream` on its client.
 *
 * Returns the result of client_queue_message().
 */
int stream_send_overflow(struct stream *stream)
{
	struct client *client = stream->client;
	struct message *msg;

	pw_log_warn("client %p [%s]: stream %p OVERFLOW channel:%u",
			client, client->name, stream, stream->channel);

	msg = message_alloc(client->impl, -1, 0);
	message_put(msg,
			TAG_U32, COMMAND_OVERFLOW,
			TAG_U32, -1,
			TAG_U32, stream->channel,
			TAG_INVALID);

	return client_queue_message(client, msg);
}
/*
 * Tell the client that `stream` was killed.
 *
 * The command is COMMAND_PLAYBACK_STREAM_KILLED for output-direction
 * streams and COMMAND_RECORD_STREAM_KILLED otherwise. The event is always
 * logged but only sent to clients with protocol version >= 23.
 *
 * Returns 0 when nothing is sent, else the result of
 * client_queue_message().
 */
int stream_send_killed(struct stream *stream)
{
	struct client *client = stream->client;
	struct message *msg;
	uint32_t command;

	if (stream->direction == PW_DIRECTION_OUTPUT)
		command = COMMAND_PLAYBACK_STREAM_KILLED;
	else
		command = COMMAND_RECORD_STREAM_KILLED;

	pw_log_info("[%s]: %s channel:%u",
			client->name, commands[command].name, stream->channel);

	if (client->version < 23)
		return 0;

	msg = message_alloc(client->impl, -1, 0);
	message_put(msg,
			TAG_U32, command,
			TAG_U32, -1,
			TAG_U32, stream->channel,
			TAG_INVALID);

	return client_queue_message(client, msg);
}
/*
 * Queue a COMMAND_STARTED event for `stream` on its client.
 *
 * COMMAND_STARTED is part of the native protocol since version 13 (see
 * the PulseAudio protocol notes); the PulseAudio server only sends it to
 * clients that negotiated at least that version. Older clients get
 * nothing, the event is still logged.
 *
 * Returns 0 when nothing is sent, else the result of
 * client_queue_message().
 */
int stream_send_started(struct stream *stream)
{
	struct client *client = stream->client;
	struct impl *impl = client->impl;
	struct message *reply;

	pw_log_debug("client %p [%s]: stream %p STARTED channel:%u",
			client, client->name, stream, stream->channel);

	/* don't send a command the client's protocol version doesn't have */
	if (client->version < 13)
		return 0;

	reply = message_alloc(impl, -1, 0);
	message_put(reply,
			TAG_U32, COMMAND_STARTED,
			TAG_U32, -1,
			TAG_U32, stream->channel,
			TAG_INVALID);

	return client_queue_message(client, reply);
}
/*
 * Tell the client that `stream` was suspended or resumed.
 *
 * The command is COMMAND_PLAYBACK_STREAM_SUSPENDED for output-direction
 * streams and COMMAND_RECORD_STREAM_SUSPENDED otherwise; `suspended` is
 * sent as the boolean payload.
 *
 * The *_STREAM_SUSPENDED events were added in protocol version 12 (see
 * the PulseAudio protocol notes), together with the *_STREAM_MOVED events
 * that stream_send_moved() gates on the same version. Older clients get
 * nothing, the event is still logged.
 *
 * Returns 0 when nothing is sent, else the result of
 * client_queue_message().
 */
int stream_send_suspended(struct stream *stream, bool suspended)
{
	struct client *client = stream->client;
	struct impl *impl = client->impl;
	struct message *reply;
	uint32_t command;

	pw_log_debug("client %p [%s]: stream %p SUSPENDED %d channel:%u",
			client, client->name, stream, suspended, stream->channel);

	/* don't send a command the client's protocol version doesn't have */
	if (client->version < 12)
		return 0;

	command = stream->direction == PW_DIRECTION_OUTPUT ?
		COMMAND_PLAYBACK_STREAM_SUSPENDED :
		COMMAND_RECORD_STREAM_SUSPENDED;

	reply = message_alloc(impl, -1, 0);
	message_put(reply,
			TAG_U32, command,
			TAG_U32, -1,
			TAG_U32, stream->channel,
			TAG_BOOLEAN, suspended,
			TAG_INVALID);

	return client_queue_message(client, reply);
}
/*
 * Ask the client for more data.
 *
 * The amount comes from stream_pop_missing(); when that is 0 no message
 * is sent. Otherwise a COMMAND_REQUEST carrying the channel and the
 * amount is queued.
 *
 * Returns 0 when nothing is sent, else the result of
 * client_queue_message().
 */
int stream_send_request(struct stream *stream)
{
	struct client *client = stream->client;
	struct message *request;
	uint32_t missing = stream_pop_missing(stream);

	if (missing == 0)
		return 0;

	pw_log_debug("stream %p: REQUEST channel:%d %u", stream, stream->channel, missing);

	request = message_alloc(client->impl, -1, 0);
	message_put(request,
			TAG_U32, COMMAND_REQUEST,
			TAG_U32, -1,
			TAG_U32, stream->channel,
			TAG_U32, missing,
			TAG_INVALID);

	return client_queue_message(client, request);
}
/*
 * Grow the target length of `stream` for a new `minreq`.
 *
 * The wanted tlength is minreq + 2 * attr.minreq. Nothing changes when
 * that is not larger than the current attr.tlength. Otherwise
 * attr.tlength becomes the wanted value capped at MAXLENGTH, and
 * attr.maxlength is raised to at least the new tlength.
 *
 * Clients with protocol version >= 15 are sent a
 * COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED with the updated attributes and a
 * latency computed from `minreq` and the stream sample rate.
 *
 * Returns 0 when nothing is sent, else the result of
 * client_queue_message().
 */
int stream_update_minreq(struct stream *stream, uint32_t minreq)
{
	struct client *client = stream->client;
	struct message *msg;
	uint64_t lat_usec;
	uint32_t wanted = minreq + 2 * stream->attr.minreq;

	/* tlength only ever grows here */
	if (wanted <= stream->attr.tlength)
		return 0;

	stream->attr.tlength = wanted > MAXLENGTH ? MAXLENGTH : wanted;

	if (stream->attr.maxlength < stream->attr.tlength)
		stream->attr.maxlength = stream->attr.tlength;

	if (client->version < 15)
		return 0;

	lat_usec = minreq * SPA_USEC_PER_SEC / stream->ss.rate;

	msg = message_alloc(client->impl, -1, 0);
	message_put(msg,
			TAG_U32, COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED,
			TAG_U32, -1,
			TAG_U32, stream->channel,
			TAG_U32, stream->attr.maxlength,
			TAG_U32, stream->attr.tlength,
			TAG_U32, stream->attr.prebuf,
			TAG_U32, stream->attr.minreq,
			TAG_USEC, lat_usec,
			TAG_INVALID);

	return client_queue_message(client, msg);
}
/*
 * Tell the client that `stream` is now linked to the peer `peer_index` /
 * `peer_name`.
 *
 * The command is COMMAND_PLAYBACK_STREAM_MOVED for output-direction
 * streams and COMMAND_RECORD_STREAM_MOVED otherwise. The event is always
 * logged but only sent to clients with protocol version >= 12. From
 * version 13 on the buffer attributes and stream->lat_usec are appended:
 * maxlength/tlength/prebuf/minreq for playback, maxlength/fragsize for
 * record.
 *
 * Returns 0 when nothing is sent, else the result of
 * client_queue_message().
 */
int stream_send_moved(struct stream *stream, uint32_t peer_index, const char *peer_name)
{
	struct client *client = stream->client;
	struct message *msg;
	const bool is_playback = stream->direction == PW_DIRECTION_OUTPUT;
	uint32_t command;

	if (is_playback)
		command = COMMAND_PLAYBACK_STREAM_MOVED;
	else
		command = COMMAND_RECORD_STREAM_MOVED;

	pw_log_info("client %p [%s]: stream %p %s channel:%u",
			client, client->name, stream, commands[command].name,
			stream->channel);

	if (client->version < 12)
		return 0;

	msg = message_alloc(client->impl, -1, 0);
	message_put(msg,
			TAG_U32, command,
			TAG_U32, -1,
			TAG_U32, stream->channel,
			TAG_U32, peer_index,
			TAG_STRING, peer_name,
			TAG_BOOLEAN, false,		/* suspended */
			TAG_INVALID);

	if (client->version < 13)
		return client_queue_message(client, msg);

	if (is_playback) {
		message_put(msg,
				TAG_U32, stream->attr.maxlength,
				TAG_U32, stream->attr.tlength,
				TAG_U32, stream->attr.prebuf,
				TAG_U32, stream->attr.minreq,
				TAG_USEC, stream->lat_usec,
				TAG_INVALID);
	} else {
		message_put(msg,
				TAG_U32, stream->attr.maxlength,
				TAG_U32, stream->attr.fragsize,
				TAG_USEC, stream->lat_usec,
				TAG_INVALID);
	}

	return client_queue_message(client, msg);
}
int stream_update_tag_param(struct stream *stream)
{
struct spa_pod_dynamic_builder b;
const struct pw_properties *props = pw_stream_get_properties(stream->stream);
const struct spa_pod *param[1];
struct spa_dict_item items[64];
uint32_t i, n_items = 0;
uint8_t buffer[4096];
if (props == NULL)
return -EIO;
spa_pod_dynamic_builder_init(&b, buffer, sizeof(buffer), 4096);
for (i = 0; i < props->dict.n_items; i++) {
if (n_items < SPA_N_ELEMENTS(items) &&
spa_strstartswith(props->dict.items[i].key, "media."))
items[n_items++] = props->dict.items[i];
}
if (n_items > 0) {
struct spa_pod_frame f;
spa_tag_build_start(&b.b, &f, SPA_PARAM_Tag, SPA_DIRECTION_OUTPUT);
spa_tag_build_add_dict(&b.b, &SPA_DICT_INIT(items, n_items));
param[0] = spa_tag_build_end(&b.b, &f);
} else {
param[0] = NULL;
}
if (param[0] != NULL)
pw_stream_update_params(stream->stream, param, 1);
spa_pod_dynamic_builder_clean(&b);
return 0;
}