stream: only use adapter for raw audio for now

This commit is contained in:
Wim Taymans 2019-07-25 12:00:08 +02:00
parent e5778b8745
commit 5aa0ff21c6

View file

@ -43,6 +43,8 @@
#include "pipewire/stream.h"
#include "pipewire/private.h"
#define NAME "stream"
#define MAX_BUFFERS 64
#define MIN_QUEUED 1
@ -129,6 +131,9 @@ struct stream {
struct spa_list param_list;
struct spa_param_info params[5];
uint32_t media_type;
uint32_t media_subtype;
struct buffer buffers[MAX_BUFFERS];
uint32_t n_buffers;
@ -262,7 +267,7 @@ static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state stat
free(stream->error);
stream->error = error ? strdup(error) : NULL;
pw_log_debug("stream %p: update state from %s -> %s (%s)", stream,
pw_log_debug(NAME" %p: update state from %s -> %s (%s)", stream,
pw_stream_state_as_string(old),
pw_stream_state_as_string(state), stream->error);
@ -288,7 +293,7 @@ do_call_process(struct spa_loop *loop,
{
struct stream *impl = user_data;
struct pw_stream *stream = &impl->this;
pw_log_trace("do process");
pw_log_trace(NAME" %p: do process", stream);
pw_stream_emit_process(stream);
return 0;
}
@ -308,7 +313,7 @@ static int impl_set_io(void *object, uint32_t id, void *data, size_t size)
{
struct stream *impl = object;
pw_log_debug("stream %p: io %d %p/%zd", impl, id, data, size);
pw_log_debug(NAME" %p: io %d %p/%zd", impl, id, data, size);
switch(id) {
case SPA_IO_Position:
@ -331,13 +336,13 @@ static int impl_send_command(void *object, const struct spa_command *command)
switch (SPA_NODE_COMMAND_ID(command)) {
case SPA_NODE_COMMAND_Pause:
if (stream->state == PW_STREAM_STATE_STREAMING) {
pw_log_debug("stream %p: pause", stream);
pw_log_debug(NAME" %p: pause", stream);
stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL);
}
break;
case SPA_NODE_COMMAND_Start:
if (stream->state == PW_STREAM_STATE_PAUSED) {
pw_log_debug("stream %p: start %d", stream, impl->direction);
pw_log_debug(NAME" %p: start %d", stream, impl->direction);
if (impl->direction == SPA_DIRECTION_INPUT) {
impl->io->status = SPA_STATUS_NEED_BUFFER;
@ -350,7 +355,8 @@ static int impl_send_command(void *object, const struct spa_command *command)
}
break;
default:
pw_log_warn("unhandled node command %d", SPA_NODE_COMMAND_ID(command));
pw_log_warn(NAME" %p: unhandled node command %d", stream,
SPA_NODE_COMMAND_ID(command));
break;
}
return 0;
@ -419,7 +425,7 @@ static int impl_port_set_io(void *object, enum spa_direction direction, uint32_t
{
struct stream *impl = object;
pw_log_debug("stream %p: set io %s %p %zd", impl,
pw_log_debug(NAME" %p: set io %s %p %zd", impl,
spa_debug_type_find_name(spa_type_io, id), data, size);
switch (id) {
@ -485,7 +491,7 @@ static int port_set_format(void *object,
struct param *p;
int count, res;
pw_log_debug("stream %p: format changed:", impl);
pw_log_debug(NAME" %p: format changed:", impl);
if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG))
spa_debug_format(2, NULL, format);
@ -546,11 +552,11 @@ static int map_data(struct stream *impl, struct spa_data *data, int prot)
ptr = mmap(NULL, range.size, prot, MAP_SHARED, data->fd, range.offset);
if (ptr == MAP_FAILED) {
pw_log_error("stream %p: failed to mmap buffer mem: %m", impl);
pw_log_error(NAME" %p: failed to mmap buffer mem: %m", impl);
return -errno;
}
data->data = SPA_MEMBER(ptr, range.start, void);
pw_log_debug("stream %p: fd %"PRIi64" mapped %d %d %p", impl, data->fd,
pw_log_debug(NAME" %p: fd %"PRIi64" mapped %d %d %p", impl, data->fd,
range.offset, range.size, data->data);
return 0;
@ -563,9 +569,9 @@ static int unmap_data(struct stream *impl, struct spa_data *data)
pw_map_range_init(&range, data->mapoffset, data->maxsize, impl->core->sc_pagesize);
if (munmap(SPA_MEMBER(data->data, -range.start, void), range.size) < 0)
pw_log_warn("failed to unmap: %m");
pw_log_warn(NAME" %p: failed to unmap: %m", impl);
pw_log_debug("stream %p: fd %"PRIi64" unmapped", impl, data->fd);
pw_log_debug(NAME" %p: fd %"PRIi64" unmapped", impl, data->fd);
return 0;
}
@ -574,7 +580,7 @@ static void clear_buffers(struct pw_stream *stream)
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
uint32_t i, j;
pw_log_debug("stream %p: clear buffers %d", stream, impl->n_buffers);
pw_log_debug(NAME" %p: clear buffers %d", stream, impl->n_buffers);
for (i = 0; i < impl->n_buffers; i++) {
struct buffer *b = &impl->buffers[i];
@ -584,7 +590,7 @@ static void clear_buffers(struct pw_stream *stream)
if (SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_MAPPED)) {
for (j = 0; j < b->this.buffer->n_datas; j++) {
struct spa_data *d = &b->this.buffer->datas[j];
pw_log_debug("stream %p: clear buffer %d mem",
pw_log_debug(NAME" %p: clear buffer %d mem",
stream, b->id);
unmap_data(impl, d);
}
@ -624,7 +630,7 @@ static int impl_port_use_buffers(void *object, enum spa_direction direction, uin
return res;
}
else if (d->data == NULL) {
pw_log_error("invalid buffer mem");
pw_log_error(NAME" %p: invalid buffer mem", stream);
return -EINVAL;
}
buf_size += d->maxsize;
@ -632,12 +638,12 @@ static int impl_port_use_buffers(void *object, enum spa_direction direction, uin
SPA_FLAG_SET(b->flags, BUFFER_FLAG_MAPPED);
if (size > 0 && buf_size != size) {
pw_log_error("invalid buffer size %d", buf_size);
pw_log_error(NAME" %p: invalid buffer size %d", stream, buf_size);
return -EINVAL;
} else
size = buf_size;
}
pw_log_debug("got buffer %d %d datas, mapped size %d", i,
pw_log_debug(NAME" %p: got buffer %d %d datas, mapped size %d", stream, i,
buffers[i]->n_datas, size);
}
@ -649,7 +655,7 @@ static int impl_port_use_buffers(void *object, enum spa_direction direction, uin
b->this.buffer = buffers[i];
if (impl->direction == SPA_DIRECTION_OUTPUT) {
pw_log_trace("stream %p: recycle buffer %d", stream, b->id);
pw_log_trace(NAME" %p: recycle buffer %d", stream, b->id);
push_queue(impl, &impl->dequeued, b);
}
@ -670,7 +676,7 @@ static int impl_port_use_buffers(void *object, enum spa_direction direction, uin
static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id)
{
struct stream *d = object;
pw_log_trace("stream %p: recycle buffer %d", d, buffer_id);
pw_log_trace(NAME" %p: recycle buffer %d", d, buffer_id);
if (buffer_id < d->n_buffers)
push_queue(d, &d->queued, &d->buffers[buffer_id]);
return 0;
@ -700,7 +706,7 @@ static int impl_node_process_input(void *object)
size = impl->time.ticks - impl->dequeued.incount;
pw_log_trace("stream %p: process in %d %d %"PRIu64" %"PRIi64" %"PRIu64, stream,
pw_log_trace(NAME" %p: process in %d %d %"PRIu64" %"PRIi64" %"PRIu64, stream,
io->status, io->buffer_id, impl->time.ticks, impl->time.delay, size);
if (io->status != SPA_STATUS_HAVE_BUFFER)
@ -720,7 +726,7 @@ done:
/* pop buffer to recycle */
if ((b = pop_queue(impl, &impl->queued))) {
pw_log_trace("stream %p: recycle buffer %d", stream, b->id);
pw_log_trace(NAME" %p: recycle buffer %d", stream, b->id);
}
io->buffer_id = b ? b->id : SPA_ID_INVALID;
@ -739,14 +745,14 @@ static int impl_node_process_output(void *object)
uint32_t index;
again:
pw_log_trace("stream %p: process out %d %d %"PRIu64" %"PRIi64, stream,
pw_log_trace(NAME" %p: process out %d %d %"PRIu64" %"PRIi64, stream,
io->status, io->buffer_id, impl->time.ticks, impl->time.delay);
res = 0;
if (io->status != SPA_STATUS_HAVE_BUFFER) {
/* recycle old buffer */
if ((b = get_buffer(stream, io->buffer_id)) != NULL) {
pw_log_trace("stream %p: recycle buffer %d", stream, b->id);
pw_log_trace(NAME" %p: recycle buffer %d", stream, b->id);
push_queue(impl, &impl->dequeued, b);
}
@ -754,11 +760,11 @@ again:
if ((b = pop_queue(impl, &impl->queued)) != NULL) {
io->buffer_id = b->id;
io->status = SPA_STATUS_HAVE_BUFFER;
pw_log_trace("stream %p: pop %d %p", stream, b->id, io);
pw_log_trace(NAME" %p: pop %d %p", stream, b->id, io);
} else {
io->buffer_id = SPA_ID_INVALID;
io->status = SPA_STATUS_NEED_BUFFER;
pw_log_trace("stream %p: no more buffers %p", stream, io);
pw_log_trace(NAME" %p: no more buffers %p", stream, io);
}
}
@ -771,7 +777,7 @@ again:
copy_position(impl, impl->queued.outcount);
res = io->status;
pw_log_trace("stream %p: res %d", stream, res);
pw_log_trace(NAME" %p: res %d", stream, res);
return res;
}
@ -906,7 +912,7 @@ static void node_event_param(void *object, int seq,
}
c->id = iid;
pw_log_debug("stream %p: add control %d (%s) (def:%f min:%f max:%f)",
pw_log_debug(NAME" %p: add control %d (%s) (def:%f min:%f max:%f)",
stream, c->id, c->control.name,
c->control.def, c->control.min, c->control.max);
break;
@ -939,7 +945,7 @@ static void node_event_param(void *object, int seq,
c->control.value = value.f;
c->emitted = true;
pw_log_debug("stream %p: control %d (%s) changed %f", stream,
pw_log_debug(NAME" %p: control %d (%s) changed %f", stream,
prop->key, c->control.name, value.f);
pw_stream_emit_control_changed(stream, prop->key, value.f);
}
@ -965,7 +971,7 @@ static int handle_connect(struct pw_stream *stream)
const char *str;
int res;
pw_log_debug("stream %p: creating node", stream);
pw_log_debug(NAME" %p: creating node", stream);
props = pw_properties_copy(stream->properties);
if ((str = pw_properties_get(props, PW_KEY_STREAM_MONITOR)) &&
@ -981,12 +987,15 @@ static int handle_connect(struct pw_stream *stream)
}
pw_node_set_implementation(slave, &impl->impl_node);
if (!SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_INACTIVE))
pw_node_set_active(slave, true);
if (impl->media_type == SPA_MEDIA_TYPE_audio &&
impl->media_subtype == SPA_MEDIA_SUBTYPE_raw) {
factory = pw_core_find_factory(impl->core, "adapter");
if (factory == NULL) {
pw_log_error("no adapter factory found");
pw_log_error(NAME" %p: no adapter factory found", stream);
res = -ENOENT;
goto error_node;
}
@ -1001,7 +1010,11 @@ static int handle_connect(struct pw_stream *stream)
res = -errno;
goto error_node;
}
pw_log_debug("stream %p: export node %p", stream, impl->node);
} else {
impl->node = slave;
}
pw_log_debug(NAME" %p: export node %p", stream, impl->node);
stream->proxy = pw_remote_export(stream->remote,
PW_TYPE_INTERFACE_Node, NULL, impl->node, 0);
if (stream->proxy == NULL) {
@ -1016,10 +1029,10 @@ static int handle_connect(struct pw_stream *stream)
return 0;
error_node:
pw_log_error("stream %p: can't make node: %s", stream, spa_strerror(res));
pw_log_error(NAME" %p: can't make node: %s", stream, spa_strerror(res));
return res;
error_proxy:
pw_log_error("stream %p: can't make proxy: %s", stream, spa_strerror(res));
pw_log_error(NAME" %p: can't make proxy: %s", stream, spa_strerror(res));
return res;
}
@ -1029,7 +1042,7 @@ static void on_remote_state_changed(void *_data, enum pw_remote_state old,
struct pw_stream *stream = _data;
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
pw_log_debug("stream %p: remote state %d", stream, state);
pw_log_debug(NAME" %p: remote state %d", stream, state);
switch (state) {
case PW_REMOTE_STATE_ERROR:
@ -1080,7 +1093,7 @@ struct pw_stream * pw_stream_new(struct pw_remote *remote, const char *name,
}
this = &impl->this;
pw_log_debug("stream %p: new \"%s\"", impl, name);
pw_log_debug(NAME" %p: new \"%s\"", impl, name);
if (props == NULL) {
props = pw_properties_new(PW_KEY_MEDIA_NAME, name, NULL);
@ -1206,7 +1219,7 @@ void pw_stream_destroy(struct pw_stream *stream)
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct control *c;
pw_log_debug("stream %p: destroy", stream);
pw_log_debug(NAME" %p: destroy", stream);
pw_stream_emit_destroy(stream);
@ -1300,6 +1313,33 @@ static void add_params(struct pw_stream *stream)
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers))));
}
static int find_format(struct stream *impl, enum pw_direction direction,
uint32_t *media_type, uint32_t *media_subtype)
{
uint32_t state = 0;
uint8_t buffer[4096];
struct spa_pod_builder b;
int res;
struct spa_pod *format;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
if ((res = spa_node_port_enum_params_sync(&impl->impl_node,
impl->direction, 0,
SPA_PARAM_EnumFormat, &state,
NULL, &format, &b)) != 1) {
pw_log_warn(NAME" %p: no format given", impl);
return -ENOENT;
}
if ((res = spa_format_parse(format, media_type, media_subtype)) < 0)
return res;
pw_log_debug(NAME " %p: %s/%s", impl,
spa_debug_type_find_name(spa_type_media_type, *media_type),
spa_debug_type_find_name(spa_type_media_subtype, *media_subtype));
return 0;
}
SPA_EXPORT
int
pw_stream_connect(struct pw_stream *stream,
@ -1314,7 +1354,7 @@ pw_stream_connect(struct pw_stream *stream,
int res;
uint32_t i;
pw_log_debug("stream %p: connect target:%d", stream, target_id);
pw_log_debug(NAME" %p: connect target:%d", stream, target_id);
impl->direction =
direction == PW_DIRECTION_INPUT ? SPA_DIRECTION_INPUT : SPA_DIRECTION_OUTPUT;
impl->flags = flags;
@ -1342,6 +1382,9 @@ pw_stream_connect(struct pw_stream *stream,
add_params(stream);
if ((res = find_format(impl, direction, &impl->media_type, &impl->media_subtype)) < 0)
return res;
stream_set_state(stream, PW_STREAM_STATE_CONNECTING, NULL);
if (target_id != SPA_ID_INVALID)
@ -1354,6 +1397,8 @@ pw_stream_connect(struct pw_stream *stream,
pw_properties_set(stream->properties, PW_KEY_NODE_EXCLUSIVE, "1");
if (flags & PW_STREAM_FLAG_DONT_RECONNECT)
pw_properties_set(stream->properties, PW_KEY_NODE_DONT_RECONNECT, "1");
pw_properties_setf(stream->properties, PW_KEY_MEDIA_CLASS, "Stream/%s/Audio",
direction == PW_DIRECTION_INPUT ? "Input" : "Output");
@ -1380,7 +1425,7 @@ int pw_stream_disconnect(struct pw_stream *stream)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
pw_log_debug("stream %p: disconnect", stream);
pw_log_debug(NAME" %p: disconnect", stream);
impl->disconnecting = true;
if (stream->proxy) {
@ -1405,7 +1450,7 @@ void pw_stream_finish_format(struct pw_stream *stream,
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
uint32_t i;
pw_log_debug("stream %p: finish format %d %d", stream, res, impl->pending_seq);
pw_log_debug(NAME" %p: finish format %d %d", stream, res, impl->pending_seq);
if (res < 0) {
pw_proxy_error(stream->proxy, res, "format failed");
@ -1430,7 +1475,7 @@ int pw_stream_set_control(struct pw_stream *stream, uint32_t id, float value, ..
struct spa_pod *pod;
struct control *c;
pw_log_debug("stream %p: set control %d %f", stream, id, value);
pw_log_debug(NAME" %p: set control %d %f", stream, id, value);
va_start(varargs, value);
@ -1502,8 +1547,8 @@ int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time)
else
time->queued = (int64_t)(impl->queued.incount - time->queued);
pw_log_trace("%"PRIi64" %"PRIi64" %"PRIu64" %d/%d %"PRIu64" %"
PRIu64" %"PRIu64" %"PRIu64" %"PRIu64,
pw_log_trace(NAME" %p: %"PRIi64" %"PRIi64" %"PRIu64" %d/%d %"PRIu64" %"
PRIu64" %"PRIu64" %"PRIu64" %"PRIu64, stream,
time->now, time->delay, time->ticks,
time->rate.num, time->rate.denom, time->queued,
impl->dequeued.outcount, impl->dequeued.incount,
@ -1540,12 +1585,12 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream)
if ((b = pop_queue(impl, &impl->dequeued)) == NULL) {
res = -errno;
pw_log_trace("stream %p: no more buffers: %m", stream);
pw_log_trace(NAME" %p: no more buffers: %m", stream);
call_trigger(impl);
errno = -res;
return NULL;
}
pw_log_trace("stream %p: dequeue buffer %d", stream, b->id);
pw_log_trace(NAME" %p: dequeue buffer %d", stream, b->id);
return &b->this;
}
@ -1557,7 +1602,7 @@ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer)
struct buffer *b = SPA_CONTAINER_OF(buffer, struct buffer, this);
int res;
pw_log_trace("stream %p: queue buffer %d", stream, b->id);
pw_log_trace(NAME" %p: queue buffer %d", stream, b->id);
if ((res = push_queue(impl, &impl->queued, b)) < 0)
return res;
@ -1571,7 +1616,7 @@ do_flush(struct spa_loop *loop,
struct stream *impl = user_data;
struct buffer *b;
pw_log_trace("stream %p: flush", impl);
pw_log_trace(NAME" %p: flush", impl);
do {
b = pop_queue(impl, &impl->queued);
if (b != NULL)