2021-06-18 23:36:35 +02:00
|
|
|
/* PipeWire
 *
 * Copyright © 2020 Wim Taymans
 *
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the "Software"),
 * to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
 * and/or sell copies of the Software, and to permit persons to whom the
 * Software is furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice (including the next
 * paragraph) shall be included in all copies or substantial portions of the
 * Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 * DEALINGS IN THE SOFTWARE.
 */
|
|
|
|
|
|
|
|
|
|
#include <stdbool.h>
|
|
|
|
|
#include <stdint.h>
|
|
|
|
|
#include <stdlib.h>
|
|
|
|
|
|
|
|
|
|
#include <spa/utils/hook.h>
|
|
|
|
|
#include <spa/utils/ringbuffer.h>
|
|
|
|
|
#include <pipewire/log.h>
|
|
|
|
|
#include <pipewire/loop.h>
|
|
|
|
|
#include <pipewire/map.h>
|
|
|
|
|
#include <pipewire/private.h>
|
|
|
|
|
#include <pipewire/properties.h>
|
|
|
|
|
#include <pipewire/stream.h>
|
|
|
|
|
#include <pipewire/work-queue.h>
|
|
|
|
|
|
|
|
|
|
#include "client.h"
|
|
|
|
|
#include "commands.h"
|
|
|
|
|
#include "internal.h"
|
2021-09-22 09:28:54 +10:00
|
|
|
#include "log.h"
|
2021-06-18 23:36:35 +02:00
|
|
|
#include "message.h"
|
|
|
|
|
#include "reply.h"
|
|
|
|
|
#include "stream.h"
|
|
|
|
|
|
2021-11-16 13:08:47 +01:00
|
|
|
struct stream *stream_new(struct client *client, enum stream_type type, uint32_t create_tag,
|
|
|
|
|
const struct sample_spec *ss, const struct channel_map *map,
|
|
|
|
|
const struct buffer_attr *attr)
|
|
|
|
|
{
|
|
|
|
|
int res;
|
|
|
|
|
|
|
|
|
|
struct stream *stream = calloc(1, sizeof(*stream));
|
|
|
|
|
if (stream == NULL)
|
|
|
|
|
return NULL;
|
|
|
|
|
|
|
|
|
|
stream->channel = pw_map_insert_new(&client->streams, stream);
|
|
|
|
|
if (stream->channel == SPA_ID_INVALID)
|
|
|
|
|
goto error_errno;
|
|
|
|
|
|
|
|
|
|
stream->impl = client->impl;
|
|
|
|
|
stream->client = client;
|
|
|
|
|
stream->type = type;
|
|
|
|
|
stream->create_tag = create_tag;
|
|
|
|
|
stream->ss = *ss;
|
|
|
|
|
stream->map = *map;
|
|
|
|
|
stream->attr = *attr;
|
|
|
|
|
spa_ringbuffer_init(&stream->ring);
|
|
|
|
|
|
|
|
|
|
switch (type) {
|
|
|
|
|
case STREAM_TYPE_RECORD:
|
|
|
|
|
stream->direction = PW_DIRECTION_INPUT;
|
|
|
|
|
break;
|
|
|
|
|
case STREAM_TYPE_PLAYBACK:
|
|
|
|
|
case STREAM_TYPE_UPLOAD:
|
|
|
|
|
stream->direction = PW_DIRECTION_OUTPUT;
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
spa_assert_not_reached();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return stream;
|
|
|
|
|
|
|
|
|
|
error_errno:
|
|
|
|
|
res = errno;
|
|
|
|
|
free(stream);
|
|
|
|
|
errno = res;
|
|
|
|
|
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
2021-06-18 23:36:35 +02:00
|
|
|
void stream_free(struct stream *stream)
|
|
|
|
|
{
|
|
|
|
|
struct client *client = stream->client;
|
|
|
|
|
struct impl *impl = client->impl;
|
|
|
|
|
|
|
|
|
|
pw_log_debug("client %p: stream %p channel:%d", client, stream, stream->channel);
|
|
|
|
|
|
2021-12-14 14:39:38 +01:00
|
|
|
if (stream->pending)
|
|
|
|
|
spa_list_remove(&stream->link);
|
2021-08-26 15:53:33 -04:00
|
|
|
|
2021-06-18 23:36:35 +02:00
|
|
|
if (stream->drain_tag)
|
|
|
|
|
reply_error(client, -1, stream->drain_tag, -ENOENT);
|
|
|
|
|
|
|
|
|
|
if (stream->killed)
|
|
|
|
|
stream_send_killed(stream);
|
|
|
|
|
|
|
|
|
|
if (stream->stream) {
|
|
|
|
|
spa_hook_remove(&stream->stream_listener);
|
2021-07-26 11:48:00 +02:00
|
|
|
pw_stream_disconnect(stream->stream);
|
|
|
|
|
|
|
|
|
|
/* force processing of all pending messages before we destroy
|
|
|
|
|
* the stream */
|
|
|
|
|
pw_loop_invoke(impl->loop, NULL, 0, NULL, 0, false, client);
|
|
|
|
|
|
2021-06-18 23:36:35 +02:00
|
|
|
pw_stream_destroy(stream->stream);
|
|
|
|
|
}
|
2021-07-26 11:48:00 +02:00
|
|
|
if (stream->channel != SPA_ID_INVALID)
|
|
|
|
|
pw_map_remove(&client->streams, stream->channel);
|
2021-06-18 23:36:35 +02:00
|
|
|
|
|
|
|
|
pw_work_queue_cancel(impl->work_queue, stream, SPA_ID_INVALID);
|
|
|
|
|
|
|
|
|
|
if (stream->buffer)
|
|
|
|
|
free(stream->buffer);
|
|
|
|
|
|
|
|
|
|
pw_properties_free(stream->props);
|
|
|
|
|
|
|
|
|
|
free(stream);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void stream_flush(struct stream *stream)
|
|
|
|
|
{
|
|
|
|
|
pw_stream_flush(stream->stream, false);
|
|
|
|
|
|
|
|
|
|
if (stream->type == STREAM_TYPE_PLAYBACK) {
|
|
|
|
|
stream->ring.writeindex = stream->ring.readindex;
|
|
|
|
|
stream->write_index = stream->read_index;
|
|
|
|
|
|
|
|
|
|
if (stream->attr.prebuf > 0)
|
|
|
|
|
stream->in_prebuf = true;
|
|
|
|
|
|
|
|
|
|
stream->playing_for = 0;
|
|
|
|
|
stream->underrun_for = -1;
|
|
|
|
|
stream->is_underrun = true;
|
|
|
|
|
|
|
|
|
|
stream_send_request(stream);
|
|
|
|
|
} else {
|
|
|
|
|
stream->ring.readindex = stream->ring.writeindex;
|
|
|
|
|
stream->read_index = stream->write_index;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2022-01-10 12:05:29 +01:00
|
|
|
static bool stream_prebuf_active(struct stream *stream, int32_t avail)
|
2021-06-18 23:36:35 +02:00
|
|
|
{
|
2021-12-14 15:47:19 +01:00
|
|
|
if (stream->in_prebuf) {
|
|
|
|
|
if (avail >= (int32_t) stream->attr.prebuf)
|
|
|
|
|
stream->in_prebuf = false;
|
|
|
|
|
} else {
|
|
|
|
|
if (stream->attr.prebuf > 0 && avail <= 0)
|
|
|
|
|
stream->in_prebuf = true;
|
|
|
|
|
}
|
|
|
|
|
return stream->in_prebuf;
|
2021-06-18 23:36:35 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
uint32_t stream_pop_missing(struct stream *stream)
|
|
|
|
|
{
|
2022-01-10 12:05:29 +01:00
|
|
|
int64_t missing, avail;
|
|
|
|
|
|
|
|
|
|
avail = stream->write_index - stream->read_index;
|
|
|
|
|
|
|
|
|
|
missing = stream->attr.tlength;
|
|
|
|
|
missing -= stream->requested;
|
|
|
|
|
missing -= avail;
|
2021-06-18 23:36:35 +02:00
|
|
|
|
2022-01-10 12:05:29 +01:00
|
|
|
if (missing <= 0)
|
2021-06-18 23:36:35 +02:00
|
|
|
return 0;
|
|
|
|
|
|
2022-01-10 12:05:29 +01:00
|
|
|
if (missing < stream->attr.minreq && !stream_prebuf_active(stream, avail))
|
2021-06-18 23:36:35 +02:00
|
|
|
return 0;
|
|
|
|
|
|
|
|
|
|
stream->requested += missing;
|
|
|
|
|
|
|
|
|
|
return missing;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int stream_send_underflow(struct stream *stream, int64_t offset, uint32_t underrun_for)
|
|
|
|
|
{
|
|
|
|
|
struct client *client = stream->client;
|
|
|
|
|
struct impl *impl = client->impl;
|
|
|
|
|
struct message *reply;
|
|
|
|
|
|
2021-08-12 16:56:10 +02:00
|
|
|
if (ratelimit_test(&impl->rate_limit, stream->timestamp, SPA_LOG_LEVEL_INFO)) {
|
2021-11-05 16:22:22 +01:00
|
|
|
pw_log_info("[%s]: UNDERFLOW channel:%u offset:%" PRIi64 " underrun:%u",
|
|
|
|
|
client->name, stream->channel, offset, underrun_for);
|
2021-06-18 23:36:35 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
reply = message_alloc(impl, -1, 0);
|
|
|
|
|
message_put(reply,
|
|
|
|
|
TAG_U32, COMMAND_UNDERFLOW,
|
|
|
|
|
TAG_U32, -1,
|
|
|
|
|
TAG_U32, stream->channel,
|
|
|
|
|
TAG_INVALID);
|
|
|
|
|
|
|
|
|
|
if (client->version >= 23) {
|
|
|
|
|
message_put(reply,
|
|
|
|
|
TAG_S64, offset,
|
|
|
|
|
TAG_INVALID);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return client_queue_message(client, reply);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int stream_send_overflow(struct stream *stream)
|
|
|
|
|
{
|
|
|
|
|
struct client *client = stream->client;
|
|
|
|
|
struct impl *impl = client->impl;
|
|
|
|
|
struct message *reply;
|
|
|
|
|
|
|
|
|
|
pw_log_warn("client %p [%s]: stream %p OVERFLOW channel:%u",
|
|
|
|
|
client, client->name, stream, stream->channel);
|
|
|
|
|
|
|
|
|
|
reply = message_alloc(impl, -1, 0);
|
|
|
|
|
message_put(reply,
|
|
|
|
|
TAG_U32, COMMAND_OVERFLOW,
|
|
|
|
|
TAG_U32, -1,
|
|
|
|
|
TAG_U32, stream->channel,
|
|
|
|
|
TAG_INVALID);
|
|
|
|
|
|
|
|
|
|
return client_queue_message(client, reply);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int stream_send_killed(struct stream *stream)
|
|
|
|
|
{
|
|
|
|
|
struct client *client = stream->client;
|
|
|
|
|
struct impl *impl = client->impl;
|
|
|
|
|
struct message *reply;
|
|
|
|
|
uint32_t command;
|
|
|
|
|
|
|
|
|
|
command = stream->direction == PW_DIRECTION_OUTPUT ?
|
|
|
|
|
COMMAND_PLAYBACK_STREAM_KILLED :
|
|
|
|
|
COMMAND_RECORD_STREAM_KILLED;
|
|
|
|
|
|
2021-11-05 16:22:22 +01:00
|
|
|
pw_log_info("[%s]: %s channel:%u",
|
|
|
|
|
client->name, commands[command].name, stream->channel);
|
2021-06-18 23:36:35 +02:00
|
|
|
|
|
|
|
|
if (client->version < 23)
|
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
|
|
reply = message_alloc(impl, -1, 0);
|
|
|
|
|
message_put(reply,
|
|
|
|
|
TAG_U32, command,
|
|
|
|
|
TAG_U32, -1,
|
|
|
|
|
TAG_U32, stream->channel,
|
|
|
|
|
TAG_INVALID);
|
|
|
|
|
|
|
|
|
|
return client_queue_message(client, reply);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int stream_send_started(struct stream *stream)
|
|
|
|
|
{
|
|
|
|
|
struct client *client = stream->client;
|
|
|
|
|
struct impl *impl = client->impl;
|
|
|
|
|
struct message *reply;
|
|
|
|
|
|
|
|
|
|
pw_log_debug("client %p [%s]: stream %p STARTED channel:%u",
|
|
|
|
|
client, client->name, stream, stream->channel);
|
|
|
|
|
|
|
|
|
|
reply = message_alloc(impl, -1, 0);
|
|
|
|
|
message_put(reply,
|
|
|
|
|
TAG_U32, COMMAND_STARTED,
|
|
|
|
|
TAG_U32, -1,
|
|
|
|
|
TAG_U32, stream->channel,
|
|
|
|
|
TAG_INVALID);
|
|
|
|
|
|
|
|
|
|
return client_queue_message(client, reply);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int stream_send_request(struct stream *stream)
|
|
|
|
|
{
|
|
|
|
|
struct client *client = stream->client;
|
|
|
|
|
struct impl *impl = client->impl;
|
|
|
|
|
struct message *msg;
|
|
|
|
|
uint32_t size;
|
|
|
|
|
|
|
|
|
|
size = stream_pop_missing(stream);
|
|
|
|
|
pw_log_debug("stream %p: REQUEST channel:%d %u", stream, stream->channel, size);
|
|
|
|
|
|
|
|
|
|
if (size == 0)
|
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
|
|
msg = message_alloc(impl, -1, 0);
|
|
|
|
|
message_put(msg,
|
|
|
|
|
TAG_U32, COMMAND_REQUEST,
|
|
|
|
|
TAG_U32, -1,
|
|
|
|
|
TAG_U32, stream->channel,
|
|
|
|
|
TAG_U32, size,
|
|
|
|
|
TAG_INVALID);
|
|
|
|
|
|
|
|
|
|
return client_queue_message(client, msg);
|
|
|
|
|
}
|
2021-12-14 15:48:54 +01:00
|
|
|
|
|
|
|
|
int stream_update_minreq(struct stream *stream, uint32_t minreq)
|
|
|
|
|
{
|
|
|
|
|
struct client *client = stream->client;
|
|
|
|
|
struct impl *impl = client->impl;
|
|
|
|
|
uint32_t old_tlength = stream->attr.tlength;
|
|
|
|
|
uint32_t new_tlength = minreq + 2 * stream->attr.minreq;
|
|
|
|
|
uint64_t lat_usec;
|
|
|
|
|
|
2021-12-20 17:33:55 +01:00
|
|
|
if (new_tlength <= old_tlength)
|
2021-12-14 15:48:54 +01:00
|
|
|
return 0;
|
|
|
|
|
|
|
|
|
|
stream->attr.tlength = new_tlength;
|
|
|
|
|
|
|
|
|
|
if (client->version >= 15) {
|
|
|
|
|
struct message *msg;
|
|
|
|
|
|
|
|
|
|
lat_usec = minreq * SPA_USEC_PER_SEC / stream->ss.rate;
|
|
|
|
|
|
|
|
|
|
msg = message_alloc(impl, -1, 0);
|
|
|
|
|
message_put(msg,
|
|
|
|
|
TAG_U32, COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED,
|
|
|
|
|
TAG_U32, -1,
|
|
|
|
|
TAG_U32, stream->channel,
|
|
|
|
|
TAG_U32, stream->attr.maxlength,
|
|
|
|
|
TAG_U32, stream->attr.tlength,
|
|
|
|
|
TAG_U32, stream->attr.prebuf,
|
|
|
|
|
TAG_U32, stream->attr.minreq,
|
|
|
|
|
TAG_USEC, lat_usec,
|
|
|
|
|
TAG_INVALID);
|
|
|
|
|
return client_queue_message(client, msg);
|
|
|
|
|
}
|
|
|
|
|
return 0;
|
|
|
|
|
}
|