mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2026-02-07 04:06:12 -05:00
parent
b2ec1fb60a
commit
49d31ea0af
14 changed files with 1044 additions and 665 deletions
|
|
@ -71,6 +71,7 @@
|
|||
#include "pipewire/extensions/metadata.h"
|
||||
|
||||
#include "pulse-server.h"
|
||||
#include "client.h"
|
||||
#include "collect.h"
|
||||
#include "commands.h"
|
||||
#include "dbus-name.h"
|
||||
|
|
@ -78,9 +79,12 @@
|
|||
#include "format.h"
|
||||
#include "internal.h"
|
||||
#include "message.h"
|
||||
#include "operation.h"
|
||||
#include "pending-sample.h"
|
||||
#include "reply.h"
|
||||
#include "sample.h"
|
||||
#include "sample-play.h"
|
||||
#include "stream.h"
|
||||
#include "utils.h"
|
||||
#include "volume.h"
|
||||
|
||||
|
|
@ -100,13 +104,7 @@
|
|||
|
||||
#include "manager.h"
|
||||
|
||||
static bool debug_messages = false;
|
||||
|
||||
struct operation {
|
||||
struct spa_list link;
|
||||
struct client *client;
|
||||
uint32_t tag;
|
||||
};
|
||||
bool debug_messages = false;
|
||||
|
||||
struct latency_offset_data {
|
||||
int64_t prev_latency_offset;
|
||||
|
|
@ -119,9 +117,6 @@ static void broadcast_subscribe_event(struct impl *impl, uint32_t mask, uint32_t
|
|||
#include "module.c"
|
||||
#include "message-handler.c"
|
||||
|
||||
static void client_free(struct client *client);
|
||||
static void client_unref(struct client *client);
|
||||
|
||||
static struct sample *find_sample(struct impl *impl, uint32_t idx, const char *name)
|
||||
{
|
||||
union pw_map_item *item;
|
||||
|
|
@ -138,311 +133,18 @@ static struct sample *find_sample(struct impl *impl, uint32_t idx, const char *n
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static int flush_messages(struct client *client)
|
||||
{
|
||||
struct impl *impl = client->impl;
|
||||
int res;
|
||||
|
||||
while (true) {
|
||||
struct message *m;
|
||||
struct descriptor desc;
|
||||
void *data;
|
||||
size_t size;
|
||||
|
||||
if (spa_list_is_empty(&client->out_messages))
|
||||
break;
|
||||
m = spa_list_first(&client->out_messages, struct message, link);
|
||||
|
||||
if (client->out_index < sizeof(desc)) {
|
||||
desc.length = htonl(m->length);
|
||||
desc.channel = htonl(m->channel);
|
||||
desc.offset_hi = 0;
|
||||
desc.offset_lo = 0;
|
||||
desc.flags = 0;
|
||||
|
||||
data = SPA_PTROFF(&desc, client->out_index, void);
|
||||
size = sizeof(desc) - client->out_index;
|
||||
} else if (client->out_index < m->length + sizeof(desc)) {
|
||||
uint32_t idx = client->out_index - sizeof(desc);
|
||||
data = m->data + idx;
|
||||
size = m->length - idx;
|
||||
} else {
|
||||
if (debug_messages && m->channel == SPA_ID_INVALID)
|
||||
message_dump(SPA_LOG_LEVEL_INFO, m);
|
||||
message_free(impl, m, true, false);
|
||||
client->out_index = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
res = send(client->source->fd, data, size, MSG_NOSIGNAL | MSG_DONTWAIT);
|
||||
if (res < 0) {
|
||||
res = -errno;
|
||||
if (res == -EINTR)
|
||||
continue;
|
||||
if (res != -EAGAIN && res != -EWOULDBLOCK)
|
||||
pw_log_warn("send channel:%d %zu, error %d: %m", m->channel, size, res);
|
||||
return res;
|
||||
}
|
||||
client->out_index += res;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int send_message(struct client *client, struct message *m)
|
||||
{
|
||||
struct impl *impl = client->impl;
|
||||
int res, mask;
|
||||
|
||||
if (m == NULL)
|
||||
return -EINVAL;
|
||||
|
||||
if (m->length == 0) {
|
||||
res = 0;
|
||||
goto error;
|
||||
} else if (m->length > m->allocated) {
|
||||
res = -ENOMEM;
|
||||
goto error;
|
||||
}
|
||||
|
||||
m->offset = 0;
|
||||
spa_list_append(&client->out_messages, &m->link);
|
||||
|
||||
mask = client->source->mask;
|
||||
if (!SPA_FLAG_IS_SET(mask, SPA_IO_OUT)) {
|
||||
client->need_flush = true;
|
||||
SPA_FLAG_SET(mask, SPA_IO_OUT);
|
||||
pw_loop_update_io(impl->loop, client->source, mask);
|
||||
}
|
||||
return 0;
|
||||
error:
|
||||
message_free(impl, m, false, false);
|
||||
return res;
|
||||
}
|
||||
|
||||
static struct message *reply_new(struct client *client, uint32_t tag)
|
||||
{
|
||||
struct impl *impl = client->impl;
|
||||
struct message *reply;
|
||||
reply = message_alloc(impl, -1, 0);
|
||||
pw_log_debug(NAME" %p: REPLY tag:%u", client, tag);
|
||||
message_put(reply,
|
||||
TAG_U32, COMMAND_REPLY,
|
||||
TAG_U32, tag,
|
||||
TAG_INVALID);
|
||||
return reply;
|
||||
}
|
||||
|
||||
static int reply_simple_ack(struct client *client, uint32_t tag)
|
||||
{
|
||||
struct message *reply = reply_new(client, tag);
|
||||
return send_message(client, reply);
|
||||
}
|
||||
|
||||
static int reply_error(struct client *client, uint32_t command, uint32_t tag, int res)
|
||||
{
|
||||
struct impl *impl = client->impl;
|
||||
struct message *reply;
|
||||
uint32_t error = res_to_err(res);
|
||||
const char *name;
|
||||
|
||||
if (command < COMMAND_MAX)
|
||||
name = commands[command].name;
|
||||
else
|
||||
name = "invalid";
|
||||
|
||||
pw_log(res == -ENOENT ? SPA_LOG_LEVEL_INFO : SPA_LOG_LEVEL_WARN,
|
||||
NAME" %p: [%s] ERROR command:%d (%s) tag:%u error:%u (%s)",
|
||||
client, client->name, command, name, tag, error, spa_strerror(res));
|
||||
|
||||
reply = message_alloc(impl, -1, 0);
|
||||
message_put(reply,
|
||||
TAG_U32, COMMAND_ERROR,
|
||||
TAG_U32, tag,
|
||||
TAG_U32, error,
|
||||
TAG_INVALID);
|
||||
return send_message(client, reply);
|
||||
}
|
||||
|
||||
static int operation_new(struct client *client, uint32_t tag)
|
||||
{
|
||||
struct operation *o;
|
||||
|
||||
if ((o = calloc(1, sizeof(*o))) == NULL)
|
||||
return -errno;
|
||||
|
||||
o->client = client;
|
||||
o->tag = tag;
|
||||
spa_list_append(&client->operations, &o->link);
|
||||
pw_manager_sync(client->manager);
|
||||
pw_log_debug(NAME" %p: operation tag:%u", client, tag);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void operation_free(struct operation *o)
|
||||
{
|
||||
spa_list_remove(&o->link);
|
||||
free(o);
|
||||
}
|
||||
|
||||
static void operation_complete(struct operation *o)
|
||||
{
|
||||
struct client *client = o->client;
|
||||
|
||||
pw_log_info(NAME" %p: [%s] tag:%u complete", client, client->name, o->tag);
|
||||
reply_simple_ack(o->client, o->tag);
|
||||
operation_free(o);
|
||||
}
|
||||
|
||||
#include "extension.c"
|
||||
|
||||
static int send_underflow(struct stream *stream, int64_t offset, uint32_t underrun_for)
|
||||
{
|
||||
struct client *client = stream->client;
|
||||
struct impl *impl = client->impl;
|
||||
struct message *reply;
|
||||
|
||||
if (ratelimit_test(&impl->rate_limit, stream->timestamp)) {
|
||||
pw_log_warn(NAME" %p: [%s] UNDERFLOW channel:%u offset:%"PRIi64" underrun:%u",
|
||||
client, client->name, stream->channel, offset, underrun_for);
|
||||
}
|
||||
|
||||
reply = message_alloc(impl, -1, 0);
|
||||
message_put(reply,
|
||||
TAG_U32, COMMAND_UNDERFLOW,
|
||||
TAG_U32, -1,
|
||||
TAG_U32, stream->channel,
|
||||
TAG_INVALID);
|
||||
if (client->version >= 23) {
|
||||
message_put(reply,
|
||||
TAG_S64, offset,
|
||||
TAG_INVALID);
|
||||
}
|
||||
return send_message(client, reply);
|
||||
}
|
||||
|
||||
static int send_subscribe_event(struct client *client, uint32_t mask, uint32_t event, uint32_t id)
|
||||
{
|
||||
struct impl *impl = client->impl;
|
||||
struct message *reply, *m, *t;
|
||||
|
||||
if (!(client->subscribed & mask))
|
||||
return 0;
|
||||
|
||||
pw_log_debug(NAME" %p: SUBSCRIBE event:%08x id:%u", client, event, id);
|
||||
|
||||
if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) != SUBSCRIPTION_EVENT_NEW) {
|
||||
spa_list_for_each_safe_reverse(m, t, &client->out_messages, link) {
|
||||
if (m->extra[0] != COMMAND_SUBSCRIBE_EVENT)
|
||||
continue;
|
||||
if ((m->extra[1] ^ event) & SUBSCRIPTION_EVENT_FACILITY_MASK)
|
||||
continue;
|
||||
if (m->extra[2] != id)
|
||||
continue;
|
||||
|
||||
if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_REMOVE) {
|
||||
/* This object is being removed, hence there is no
|
||||
* point in keeping the old events regarding this
|
||||
* entry in the queue. */
|
||||
message_free(impl, m, true, false);
|
||||
pw_log_debug("Dropped redundant event due to remove event.");
|
||||
continue;
|
||||
}
|
||||
if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_CHANGE) {
|
||||
/* This object has changed. If a "new" or "change" event for
|
||||
* this object is still in the queue we can exit. */
|
||||
pw_log_debug("Dropped redundant event due to change event.");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
reply = message_alloc(impl, -1, 0);
|
||||
reply->extra[0] = COMMAND_SUBSCRIBE_EVENT,
|
||||
reply->extra[1] = event,
|
||||
reply->extra[2] = id,
|
||||
message_put(reply,
|
||||
TAG_U32, COMMAND_SUBSCRIBE_EVENT,
|
||||
TAG_U32, -1,
|
||||
TAG_U32, event,
|
||||
TAG_U32, id,
|
||||
TAG_INVALID);
|
||||
return send_message(client, reply);
|
||||
}
|
||||
|
||||
static void broadcast_subscribe_event(struct impl *impl, uint32_t mask, uint32_t event, uint32_t id)
|
||||
{
|
||||
struct server *s;
|
||||
spa_list_for_each(s, &impl->servers, link) {
|
||||
struct client *c;
|
||||
spa_list_for_each(c, &s->clients, link)
|
||||
send_subscribe_event(c, mask, event, id);
|
||||
client_queue_subscribe_event(c, mask, event, id);
|
||||
}
|
||||
}
|
||||
|
||||
static int send_overflow(struct stream *stream)
|
||||
{
|
||||
struct client *client = stream->client;
|
||||
struct impl *impl = client->impl;
|
||||
struct message *reply;
|
||||
|
||||
pw_log_warn(NAME" %p: [%s] OVERFLOW channel:%u", client,
|
||||
client->name, stream->channel);
|
||||
|
||||
reply = message_alloc(impl, -1, 0);
|
||||
message_put(reply,
|
||||
TAG_U32, COMMAND_OVERFLOW,
|
||||
TAG_U32, -1,
|
||||
TAG_U32, stream->channel,
|
||||
TAG_INVALID);
|
||||
return send_message(client, reply);
|
||||
}
|
||||
|
||||
static int send_stream_killed(struct stream *stream)
|
||||
{
|
||||
struct client *client = stream->client;
|
||||
struct impl *impl = client->impl;
|
||||
struct message *reply;
|
||||
uint32_t command;
|
||||
|
||||
command = stream->direction == PW_DIRECTION_OUTPUT ?
|
||||
COMMAND_PLAYBACK_STREAM_KILLED :
|
||||
COMMAND_RECORD_STREAM_KILLED;
|
||||
|
||||
pw_log_info(NAME" %p: [%s] %s channel:%u", client, client->name,
|
||||
commands[command].name, stream->channel);
|
||||
|
||||
if (client->version < 23)
|
||||
return 0;
|
||||
|
||||
reply = message_alloc(impl, -1, 0);
|
||||
message_put(reply,
|
||||
TAG_U32, command,
|
||||
TAG_U32, -1,
|
||||
TAG_U32, stream->channel,
|
||||
TAG_INVALID);
|
||||
return send_message(client, reply);
|
||||
}
|
||||
|
||||
static int send_stream_started(struct stream *stream)
|
||||
{
|
||||
struct client *client = stream->client;
|
||||
struct impl *impl = client->impl;
|
||||
struct message *reply;
|
||||
|
||||
pw_log_debug(NAME" %p: STARTED channel:%u", client, stream->channel);
|
||||
|
||||
reply = message_alloc(impl, -1, 0);
|
||||
message_put(reply,
|
||||
TAG_U32, COMMAND_STARTED,
|
||||
TAG_U32, -1,
|
||||
TAG_U32, stream->channel,
|
||||
TAG_INVALID);
|
||||
return send_message(client, reply);
|
||||
}
|
||||
|
||||
static int do_command_auth(struct client *client, uint32_t command, uint32_t tag, struct message *m)
|
||||
{
|
||||
struct impl *impl = client->impl;
|
||||
|
|
@ -474,7 +176,7 @@ static int do_command_auth(struct client *client, uint32_t command, uint32_t tag
|
|||
TAG_U32, PROTOCOL_VERSION,
|
||||
TAG_INVALID);
|
||||
|
||||
return send_message(client, reply);
|
||||
return client_queue_message(client, reply);
|
||||
}
|
||||
|
||||
static int reply_set_client_name(struct client *client, uint32_t tag)
|
||||
|
|
@ -498,7 +200,7 @@ static int reply_set_client_name(struct client *client, uint32_t tag)
|
|||
TAG_U32, id, /* client index */
|
||||
TAG_INVALID);
|
||||
}
|
||||
return send_message(client, reply);
|
||||
return client_queue_message(client, reply);
|
||||
}
|
||||
|
||||
static void manager_sync(void *data)
|
||||
|
|
@ -534,7 +236,7 @@ static int send_object_event(struct client *client, struct pw_manager_object *o,
|
|||
uint32_t event = 0, mask = 0, res_id = o->id;
|
||||
|
||||
if (pw_manager_object_is_sink(o)) {
|
||||
send_subscribe_event(client,
|
||||
client_queue_subscribe_event(client,
|
||||
SUBSCRIPTION_MASK_SINK,
|
||||
SUBSCRIPTION_EVENT_SINK | facility,
|
||||
res_id);
|
||||
|
|
@ -568,7 +270,7 @@ static int send_object_event(struct client *client, struct pw_manager_object *o,
|
|||
event = SPA_ID_INVALID;
|
||||
|
||||
if (event != SPA_ID_INVALID)
|
||||
send_subscribe_event(client,
|
||||
client_queue_subscribe_event(client,
|
||||
mask,
|
||||
event | facility,
|
||||
res_id);
|
||||
|
|
@ -627,7 +329,7 @@ static void send_latency_offset_subscribe_event(struct client *client, struct pw
|
|||
d->initialized = true;
|
||||
|
||||
if (changed)
|
||||
send_subscribe_event(client,
|
||||
client_queue_subscribe_event(client,
|
||||
SUBSCRIPTION_MASK_CARD,
|
||||
SUBSCRIPTION_EVENT_CARD | SUBSCRIPTION_EVENT_CHANGE,
|
||||
card_id);
|
||||
|
|
@ -655,7 +357,7 @@ static void send_default_change_subscribe_event(struct client *client, bool sink
|
|||
}
|
||||
|
||||
if (changed)
|
||||
send_subscribe_event(client,
|
||||
client_queue_subscribe_event(client,
|
||||
SUBSCRIPTION_MASK_SERVER,
|
||||
SUBSCRIPTION_EVENT_CHANGE |
|
||||
SUBSCRIPTION_EVENT_SERVER,
|
||||
|
|
@ -874,90 +576,6 @@ static int do_subscribe(struct client *client, uint32_t command, uint32_t tag, s
|
|||
return reply_simple_ack(client, tag);
|
||||
}
|
||||
|
||||
static void stream_free(struct stream *stream)
|
||||
{
|
||||
struct client *client = stream->client;
|
||||
struct impl *impl = client->impl;
|
||||
|
||||
pw_log_debug(NAME" %p: stream %p channel:%d", impl, stream, stream->channel);
|
||||
|
||||
if (stream->drain_tag)
|
||||
reply_error(client, -1, stream->drain_tag, -ENOENT);
|
||||
|
||||
if (stream->killed)
|
||||
send_stream_killed(stream);
|
||||
|
||||
/* force processing of all pending messages before we destroy
|
||||
* the stream */
|
||||
pw_loop_invoke(impl->loop, NULL, 0, NULL, 0, false, client);
|
||||
|
||||
if (stream->channel != SPA_ID_INVALID)
|
||||
pw_map_remove(&client->streams, stream->channel);
|
||||
if (stream->stream) {
|
||||
spa_hook_remove(&stream->stream_listener);
|
||||
pw_stream_destroy(stream->stream);
|
||||
}
|
||||
pw_work_queue_cancel(impl->work_queue, stream, SPA_ID_INVALID);
|
||||
|
||||
if (stream->buffer)
|
||||
free(stream->buffer);
|
||||
pw_properties_free(stream->props);
|
||||
free(stream);
|
||||
}
|
||||
|
||||
static bool stream_prebuf_active(struct stream *stream)
|
||||
{
|
||||
uint32_t index;
|
||||
int32_t avail;
|
||||
|
||||
avail = spa_ringbuffer_get_write_index(&stream->ring, &index);
|
||||
if (stream->in_prebuf)
|
||||
return avail < (int32_t) stream->attr.prebuf;
|
||||
else
|
||||
return stream->attr.prebuf > 0 && avail >= 0;
|
||||
}
|
||||
|
||||
static uint32_t stream_pop_missing(struct stream *stream)
|
||||
{
|
||||
uint32_t missing;
|
||||
|
||||
if (stream->missing <= 0)
|
||||
return 0;
|
||||
|
||||
if (stream->missing < stream->attr.minreq &&
|
||||
!stream_prebuf_active(stream))
|
||||
return 0;
|
||||
|
||||
missing = stream->missing;
|
||||
stream->requested += missing;
|
||||
stream->missing = 0;
|
||||
return missing;
|
||||
}
|
||||
|
||||
static int send_command_request(struct stream *stream)
|
||||
{
|
||||
struct client *client = stream->client;
|
||||
struct impl *impl = client->impl;
|
||||
struct message *msg;
|
||||
uint32_t size;
|
||||
|
||||
size = stream_pop_missing(stream);
|
||||
pw_log_debug(NAME" %p: REQUEST channel:%d %u", stream, stream->channel, size);
|
||||
|
||||
if (size == 0)
|
||||
return 0;
|
||||
|
||||
msg = message_alloc(impl, -1, 0);
|
||||
message_put(msg,
|
||||
TAG_U32, COMMAND_REQUEST,
|
||||
TAG_U32, -1,
|
||||
TAG_U32, stream->channel,
|
||||
TAG_U32, size,
|
||||
TAG_INVALID);
|
||||
|
||||
return send_message(client, msg);
|
||||
}
|
||||
|
||||
static uint32_t frac_to_bytes_round_up(struct spa_fraction val, const struct sample_spec *ss)
|
||||
{
|
||||
uint64_t u;
|
||||
|
|
@ -1135,7 +753,7 @@ static int reply_create_playback_stream(struct stream *stream)
|
|||
|
||||
stream->create_tag = SPA_ID_INVALID;
|
||||
|
||||
return send_message(client, reply);
|
||||
return client_queue_message(client, reply);
|
||||
}
|
||||
|
||||
static void fix_record_buffer_attr(struct stream *s, struct buffer_attr *attr)
|
||||
|
|
@ -1276,7 +894,7 @@ static int reply_create_record_stream(struct stream *stream)
|
|||
|
||||
stream->create_tag = SPA_ID_INVALID;
|
||||
|
||||
return send_message(client, reply);
|
||||
return client_queue_message(client, reply);
|
||||
}
|
||||
|
||||
static void stream_control_info(void *data, uint32_t id,
|
||||
|
|
@ -1470,9 +1088,9 @@ do_process_done(struct spa_loop *loop,
|
|||
stream->underrun_for = 0;
|
||||
stream->playing_for = 0;
|
||||
if (pd->underrun)
|
||||
send_underflow(stream, stream->read_index, pd->underrun_for);
|
||||
stream_send_underflow(stream, stream->read_index, pd->underrun_for);
|
||||
else
|
||||
send_stream_started(stream);
|
||||
stream_send_started(stream);
|
||||
}
|
||||
stream->missing += pd->missing;
|
||||
stream->missing = SPA_MIN(stream->missing, stream->attr.tlength);
|
||||
|
|
@ -1480,7 +1098,7 @@ do_process_done(struct spa_loop *loop,
|
|||
if (stream->underrun_for != (uint64_t)-1)
|
||||
stream->underrun_for += pd->underrun_for;
|
||||
|
||||
send_command_request(stream);
|
||||
stream_send_request(stream);
|
||||
} else {
|
||||
struct message *msg;
|
||||
stream->write_index += pd->write_inc;
|
||||
|
|
@ -1523,7 +1141,7 @@ do_process_done(struct spa_loop *loop,
|
|||
index % stream->attr.maxlength,
|
||||
msg->data, towrite);
|
||||
|
||||
send_message(client, msg);
|
||||
client_queue_message(client, msg);
|
||||
|
||||
index += towrite;
|
||||
avail -= towrite;
|
||||
|
|
@ -2257,7 +1875,7 @@ static int do_get_playback_latency(struct client *client, uint32_t command, uint
|
|||
TAG_U64, stream->playing_for,
|
||||
TAG_INVALID);
|
||||
}
|
||||
return send_message(client, reply);
|
||||
return client_queue_message(client, reply);
|
||||
}
|
||||
|
||||
static int do_get_record_latency(struct client *client, uint32_t command, uint32_t tag, struct message *m)
|
||||
|
|
@ -2292,7 +1910,7 @@ static int do_get_record_latency(struct client *client, uint32_t command, uint32
|
|||
TAG_S64, stream->read_index,
|
||||
TAG_INVALID);
|
||||
|
||||
return send_message(client, reply);
|
||||
return client_queue_message(client, reply);
|
||||
}
|
||||
|
||||
static int do_create_upload_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m)
|
||||
|
|
@ -2375,7 +1993,7 @@ static int do_create_upload_stream(struct client *client, uint32_t command, uint
|
|||
TAG_U32, stream->channel,
|
||||
TAG_U32, length,
|
||||
TAG_INVALID);
|
||||
return send_message(client, reply);
|
||||
return client_queue_message(client, reply);
|
||||
|
||||
error_errno:
|
||||
res = -errno;
|
||||
|
|
@ -2583,7 +2201,7 @@ static void sample_play_ready(void *data, uint32_t index)
|
|||
TAG_U32, index,
|
||||
TAG_INVALID);
|
||||
|
||||
send_message(client, reply);
|
||||
client_queue_message(client, reply);
|
||||
}
|
||||
|
||||
static void on_sample_done(void *obj, void *data, int res, uint32_t id)
|
||||
|
|
@ -2763,31 +2381,6 @@ static int do_cork_stream(struct client *client, uint32_t command, uint32_t tag,
|
|||
return reply_simple_ack(client, tag);
|
||||
}
|
||||
|
||||
static void stream_flush(struct stream *stream)
|
||||
{
|
||||
pw_stream_flush(stream->stream, false);
|
||||
|
||||
if (stream->type == STREAM_TYPE_PLAYBACK) {
|
||||
stream->ring.writeindex = stream->ring.readindex;
|
||||
stream->write_index = stream->read_index;
|
||||
|
||||
stream->missing = stream->attr.tlength -
|
||||
SPA_MIN(stream->requested, stream->attr.tlength);
|
||||
|
||||
if (stream->attr.prebuf > 0)
|
||||
stream->in_prebuf = true;
|
||||
|
||||
stream->playing_for = 0;
|
||||
stream->underrun_for = -1;
|
||||
stream->is_underrun = true;
|
||||
|
||||
send_command_request(stream);
|
||||
} else {
|
||||
stream->ring.readindex = stream->ring.writeindex;
|
||||
stream->read_index = stream->write_index;
|
||||
}
|
||||
}
|
||||
|
||||
static int do_flush_trigger_prebuf_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m)
|
||||
{
|
||||
struct impl *impl = client->impl;
|
||||
|
|
@ -3481,7 +3074,7 @@ static int do_get_server_info(struct client *client, uint32_t command, uint32_t
|
|||
TAG_CHANNEL_MAP, &impl->defs.channel_map,
|
||||
TAG_INVALID);
|
||||
}
|
||||
return send_message(client, reply);
|
||||
return client_queue_message(client, reply);
|
||||
}
|
||||
|
||||
static int do_stat(struct client *client, uint32_t command, uint32_t tag, struct message *m)
|
||||
|
|
@ -3500,7 +3093,7 @@ static int do_stat(struct client *client, uint32_t command, uint32_t tag, struct
|
|||
TAG_U32, impl->stat.sample_cache, /* sample cache size */
|
||||
TAG_INVALID);
|
||||
|
||||
return send_message(client, reply);
|
||||
return client_queue_message(client, reply);
|
||||
}
|
||||
|
||||
static int do_lookup(struct client *client, uint32_t command, uint32_t tag, struct message *m)
|
||||
|
|
@ -3527,7 +3120,7 @@ static int do_lookup(struct client *client, uint32_t command, uint32_t tag, stru
|
|||
TAG_U32, is_monitor ? o->id | MONITOR_FLAG : o->id,
|
||||
TAG_INVALID);
|
||||
|
||||
return send_message(client, reply);
|
||||
return client_queue_message(client, reply);
|
||||
}
|
||||
|
||||
static int do_drain_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m)
|
||||
|
|
@ -4268,7 +3861,7 @@ static int do_get_info(struct client *client, uint32_t command, uint32_t tag, st
|
|||
if (module == NULL)
|
||||
goto error_noentity;
|
||||
fill_ext_module_info(client, reply, module);
|
||||
return send_message(client, reply);
|
||||
return client_queue_message(client, reply);
|
||||
}
|
||||
|
||||
switch (command) {
|
||||
|
|
@ -4342,7 +3935,7 @@ static int do_get_info(struct client *client, uint32_t command, uint32_t tag, st
|
|||
if ((res = fill_func(client, reply, o)) < 0)
|
||||
goto error;
|
||||
|
||||
return send_message(client, reply);
|
||||
return client_queue_message(client, reply);
|
||||
|
||||
error_protocol:
|
||||
res = -EPROTO;
|
||||
|
|
@ -4427,7 +4020,7 @@ static int do_get_sample_info(struct client *client, uint32_t command, uint32_t
|
|||
if ((res = fill_sample_info(client, reply, sample)) < 0)
|
||||
goto error;
|
||||
|
||||
return send_message(client, reply);
|
||||
return client_queue_message(client, reply);
|
||||
|
||||
error:
|
||||
if (reply)
|
||||
|
|
@ -4451,7 +4044,7 @@ static int do_get_sample_info_list(struct client *client, uint32_t command, uint
|
|||
continue;
|
||||
fill_sample_info(client, reply, s);
|
||||
}
|
||||
return send_message(client, reply);
|
||||
return client_queue_message(client, reply);
|
||||
}
|
||||
|
||||
struct info_list_data {
|
||||
|
|
@ -4520,7 +4113,7 @@ static int do_get_info_list(struct client *client, uint32_t command, uint32_t ta
|
|||
if (command == COMMAND_GET_MODULE_INFO_LIST)
|
||||
pw_map_for_each(&impl->modules, do_info_list_module, &info);
|
||||
|
||||
return send_message(client, info.reply);
|
||||
return client_queue_message(client, info.reply);
|
||||
}
|
||||
|
||||
static int do_set_stream_buffer_attr(struct client *client, uint32_t command, uint32_t tag, struct message *m)
|
||||
|
|
@ -4603,7 +4196,7 @@ static int do_set_stream_buffer_attr(struct client *client, uint32_t command, ui
|
|||
TAG_INVALID);
|
||||
}
|
||||
}
|
||||
return send_message(client, reply);
|
||||
return client_queue_message(client, reply);
|
||||
}
|
||||
|
||||
static int do_update_stream_sample_rate(struct client *client, uint32_t command, uint32_t tag, struct message *m)
|
||||
|
|
@ -4931,7 +4524,7 @@ static void on_module_loaded(void *data, int result)
|
|||
message_put(reply,
|
||||
TAG_U32, module->idx,
|
||||
TAG_INVALID);
|
||||
send_message(client, reply);
|
||||
client_queue_message(client, reply);
|
||||
}
|
||||
else {
|
||||
pw_log_warn(NAME" %p: [%s] failed to load module id:%u name:%s result:%d (%s)",
|
||||
|
|
@ -5079,7 +4672,7 @@ static int do_send_object_message(struct client *client, uint32_t command, uint3
|
|||
reply = reply_new(client, tag);
|
||||
message_put(reply, TAG_STRING, response, TAG_INVALID);
|
||||
free(response);
|
||||
return send_message(client, reply);
|
||||
return client_queue_message(client, reply);
|
||||
}
|
||||
|
||||
static int do_error_access(struct client *client, uint32_t command, uint32_t tag, struct message *m)
|
||||
|
|
@ -5255,106 +4848,6 @@ const struct command commands[COMMAND_MAX] =
|
|||
[COMMAND_SEND_OBJECT_MESSAGE] = { "SEND_OBJECT_MESSAGE", do_send_object_message, },
|
||||
};
|
||||
|
||||
static int client_free_stream(void *item, void *data)
|
||||
{
|
||||
struct stream *s = item;
|
||||
stream_free(s);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* tries to detach the client from the server,
|
||||
* but it does not drop the server's reference
|
||||
*/
|
||||
static bool client_detach(struct client *client)
|
||||
{
|
||||
struct impl *impl = client->impl;
|
||||
struct server *server = client->server;
|
||||
|
||||
if (server == NULL)
|
||||
return false;
|
||||
|
||||
pw_log_info(NAME" %p: client %p detaching", server, client);
|
||||
|
||||
/* remove from the `server->clients` list */
|
||||
spa_list_remove(&client->link);
|
||||
server->n_clients--;
|
||||
if (server->wait_clients > 0 && --server->wait_clients == 0) {
|
||||
int mask = server->source->mask;
|
||||
SPA_FLAG_SET(mask, SPA_IO_IN);
|
||||
pw_loop_update_io(impl->loop, server->source, mask);
|
||||
}
|
||||
client->server = NULL;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static void client_disconnect(struct client *client)
|
||||
{
|
||||
struct impl *impl = client->impl;
|
||||
|
||||
if (client->disconnect)
|
||||
return;
|
||||
|
||||
/* the client must be detached from the server to disconnect */
|
||||
spa_assert(client->server == NULL);
|
||||
|
||||
client->disconnect = true;
|
||||
spa_list_append(&impl->cleanup_clients, &client->link);
|
||||
|
||||
pw_map_for_each(&client->streams, client_free_stream, client);
|
||||
|
||||
if (client->source)
|
||||
pw_loop_destroy_source(impl->loop, client->source);
|
||||
if (client->manager)
|
||||
pw_manager_destroy(client->manager);
|
||||
|
||||
}
|
||||
|
||||
static void client_free(struct client *client)
|
||||
{
|
||||
struct impl *impl = client->impl;
|
||||
struct message *msg;
|
||||
struct pending_sample *p;
|
||||
struct operation *o;
|
||||
|
||||
pw_log_info(NAME" %p: client %p free", impl, client);
|
||||
|
||||
client_detach(client);
|
||||
client_disconnect(client);
|
||||
|
||||
/* remove from the `impl->cleanup_clients` list */
|
||||
spa_list_remove(&client->link);
|
||||
|
||||
spa_list_consume(p, &client->pending_samples, link)
|
||||
pending_sample_free(p);
|
||||
|
||||
spa_list_consume(msg, &client->out_messages, link)
|
||||
message_free(impl, msg, true, false);
|
||||
|
||||
spa_list_consume(o, &client->operations, link)
|
||||
operation_free(o);
|
||||
|
||||
if (client->core) {
|
||||
client->disconnecting = true;
|
||||
pw_core_disconnect(client->core);
|
||||
}
|
||||
pw_map_clear(&client->streams);
|
||||
free(client->default_sink);
|
||||
free(client->default_source);
|
||||
if (client->props)
|
||||
pw_properties_free(client->props);
|
||||
if (client->routes)
|
||||
pw_properties_free(client->routes);
|
||||
free(client);
|
||||
}
|
||||
|
||||
static void client_unref(struct client *client)
|
||||
{
|
||||
if (--client->ref == 0)
|
||||
client_free(client);
|
||||
}
|
||||
|
||||
static int handle_packet(struct client *client, struct message *msg)
|
||||
{
|
||||
struct impl *impl = client->impl;
|
||||
|
|
@ -5450,7 +4943,7 @@ static int handle_memblock(struct client *client, struct message *msg)
|
|||
/* underrun, reported on reader side */
|
||||
} else if (filled + msg->length > stream->attr.maxlength) {
|
||||
/* overrun */
|
||||
send_overflow(stream);
|
||||
stream_send_overflow(stream);
|
||||
}
|
||||
|
||||
/* always write data to ringbuffer, we expect the other side
|
||||
|
|
@ -5582,7 +5075,7 @@ on_client_data(void *data, int fd, uint32_t mask)
|
|||
if (mask & SPA_IO_OUT || client->need_flush) {
|
||||
pw_log_trace(NAME" %p: can write", impl);
|
||||
client->need_flush = false;
|
||||
res = flush_messages(client);
|
||||
res = client_flush_messages(client);
|
||||
if (res >= 0) {
|
||||
int mask = client->source->mask;
|
||||
SPA_FLAG_CLEAR(mask, SPA_IO_OUT);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue