diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 3f0462427..340ed5623 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -43,6 +43,8 @@ #include "pipewire/stream.h" #include "pipewire/private.h" +#define NAME "stream" + #define MAX_BUFFERS 64 #define MIN_QUEUED 1 @@ -129,6 +131,9 @@ struct stream { struct spa_list param_list; struct spa_param_info params[5]; + uint32_t media_type; + uint32_t media_subtype; + struct buffer buffers[MAX_BUFFERS]; uint32_t n_buffers; @@ -262,7 +267,7 @@ static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state stat free(stream->error); stream->error = error ? strdup(error) : NULL; - pw_log_debug("stream %p: update state from %s -> %s (%s)", stream, + pw_log_debug(NAME" %p: update state from %s -> %s (%s)", stream, pw_stream_state_as_string(old), pw_stream_state_as_string(state), stream->error); @@ -288,7 +293,7 @@ do_call_process(struct spa_loop *loop, { struct stream *impl = user_data; struct pw_stream *stream = &impl->this; - pw_log_trace("do process"); + pw_log_trace(NAME" %p: do process", stream); pw_stream_emit_process(stream); return 0; } @@ -308,7 +313,7 @@ static int impl_set_io(void *object, uint32_t id, void *data, size_t size) { struct stream *impl = object; - pw_log_debug("stream %p: io %d %p/%zd", impl, id, data, size); + pw_log_debug(NAME" %p: io %d %p/%zd", impl, id, data, size); switch(id) { case SPA_IO_Position: @@ -331,13 +336,13 @@ static int impl_send_command(void *object, const struct spa_command *command) switch (SPA_NODE_COMMAND_ID(command)) { case SPA_NODE_COMMAND_Pause: if (stream->state == PW_STREAM_STATE_STREAMING) { - pw_log_debug("stream %p: pause", stream); + pw_log_debug(NAME" %p: pause", stream); stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL); } break; case SPA_NODE_COMMAND_Start: if (stream->state == PW_STREAM_STATE_PAUSED) { - pw_log_debug("stream %p: start %d", stream, impl->direction); + pw_log_debug(NAME" %p: start %d", stream, impl->direction); if (impl->direction == SPA_DIRECTION_INPUT) { impl->io->status = SPA_STATUS_NEED_BUFFER; @@ -350,7 +355,8 @@ static int impl_send_command(void *object, const struct spa_command *command) } break; default: - pw_log_warn("unhandled node command %d", SPA_NODE_COMMAND_ID(command)); + pw_log_warn(NAME" %p: unhandled node command %d", stream, + SPA_NODE_COMMAND_ID(command)); break; } return 0; @@ -419,7 +425,7 @@ static int impl_port_set_io(void *object, enum spa_direction direction, uint32_t { struct stream *impl = object; - pw_log_debug("stream %p: set io %s %p %zd", impl, + pw_log_debug(NAME" %p: set io %s %p %zd", impl, spa_debug_type_find_name(spa_type_io, id), data, size); switch (id) { @@ -485,7 +491,7 @@ static int port_set_format(void *object, struct param *p; int count, res; - pw_log_debug("stream %p: format changed:", impl); + pw_log_debug(NAME" %p: format changed:", impl); if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG)) spa_debug_format(2, NULL, format); @@ -546,11 +552,11 @@ static int map_data(struct stream *impl, struct spa_data *data, int prot) ptr = mmap(NULL, range.size, prot, MAP_SHARED, data->fd, range.offset); if (ptr == MAP_FAILED) { - pw_log_error("stream %p: failed to mmap buffer mem: %m", impl); + pw_log_error(NAME" %p: failed to mmap buffer mem: %m", impl); return -errno; } data->data = SPA_MEMBER(ptr, range.start, void); - pw_log_debug("stream %p: fd %"PRIi64" mapped %d %d %p", impl, data->fd, + pw_log_debug(NAME" %p: fd %"PRIi64" mapped %d %d %p", impl, data->fd, range.offset, range.size, data->data); return 0; @@ -563,9 +569,9 @@ static int unmap_data(struct stream *impl, struct spa_data *data) pw_map_range_init(&range, data->mapoffset, data->maxsize, impl->core->sc_pagesize); if (munmap(SPA_MEMBER(data->data, -range.start, void), range.size) < 0) - pw_log_warn("failed to unmap: %m"); + pw_log_warn(NAME" %p: failed to unmap: %m", impl); - pw_log_debug("stream %p: fd %"PRIi64" unmapped", impl, data->fd); + pw_log_debug(NAME" %p: fd %"PRIi64" unmapped", impl, data->fd); return 0; } @@ -574,7 +580,7 @@ static void clear_buffers(struct pw_stream *stream) struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); uint32_t i, j; - pw_log_debug("stream %p: clear buffers %d", stream, impl->n_buffers); + pw_log_debug(NAME" %p: clear buffers %d", stream, impl->n_buffers); for (i = 0; i < impl->n_buffers; i++) { struct buffer *b = &impl->buffers[i]; @@ -584,7 +590,7 @@ static void clear_buffers(struct pw_stream *stream) if (SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_MAPPED)) { for (j = 0; j < b->this.buffer->n_datas; j++) { struct spa_data *d = &b->this.buffer->datas[j]; - pw_log_debug("stream %p: clear buffer %d mem", + pw_log_debug(NAME" %p: clear buffer %d mem", stream, b->id); unmap_data(impl, d); } @@ -624,7 +630,7 @@ static int impl_port_use_buffers(void *object, enum spa_direction direction, uin return res; } else if (d->data == NULL) { - pw_log_error("invalid buffer mem"); + pw_log_error(NAME" %p: invalid buffer mem", stream); return -EINVAL; } buf_size += d->maxsize; @@ -632,12 +638,12 @@ static int impl_port_use_buffers(void *object, enum spa_direction direction, uin SPA_FLAG_SET(b->flags, BUFFER_FLAG_MAPPED); if (size > 0 && buf_size != size) { - pw_log_error("invalid buffer size %d", buf_size); + pw_log_error(NAME" %p: invalid buffer size %d", stream, buf_size); return -EINVAL; } else size = buf_size; } - pw_log_debug("got buffer %d %d datas, mapped size %d", i, + pw_log_debug(NAME" %p: got buffer %d %d datas, mapped size %d", stream, i, buffers[i]->n_datas, size); } @@ -649,7 +655,7 @@ static int impl_port_use_buffers(void *object, enum spa_direction direction, uin b->this.buffer = buffers[i]; if (impl->direction == SPA_DIRECTION_OUTPUT) { - pw_log_trace("stream %p: recycle buffer %d", stream, b->id); + pw_log_trace(NAME" %p: recycle buffer %d", stream, b->id); push_queue(impl, &impl->dequeued, b); } @@ -670,7 +676,7 @@ static int impl_port_use_buffers(void *object, enum spa_direction direction, uin static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id) { struct stream *d = object; - pw_log_trace("stream %p: recycle buffer %d", d, buffer_id); + pw_log_trace(NAME" %p: recycle buffer %d", d, buffer_id); if (buffer_id < d->n_buffers) push_queue(d, &d->queued, &d->buffers[buffer_id]); return 0; @@ -700,7 +706,7 @@ static int impl_node_process_input(void *object) size = impl->time.ticks - impl->dequeued.incount; - pw_log_trace("stream %p: process in %d %d %"PRIu64" %"PRIi64" %"PRIu64, stream, + pw_log_trace(NAME" %p: process in %d %d %"PRIu64" %"PRIi64" %"PRIu64, stream, io->status, io->buffer_id, impl->time.ticks, impl->time.delay, size); if (io->status != SPA_STATUS_HAVE_BUFFER) @@ -720,7 +726,7 @@ done: /* pop buffer to recycle */ if ((b = pop_queue(impl, &impl->queued))) { - pw_log_trace("stream %p: recycle buffer %d", stream, b->id); + pw_log_trace(NAME" %p: recycle buffer %d", stream, b->id); } io->buffer_id = b ? b->id : SPA_ID_INVALID; @@ -739,14 +745,14 @@ static int impl_node_process_output(void *object) uint32_t index; again: - pw_log_trace("stream %p: process out %d %d %"PRIu64" %"PRIi64, stream, + pw_log_trace(NAME" %p: process out %d %d %"PRIu64" %"PRIi64, stream, io->status, io->buffer_id, impl->time.ticks, impl->time.delay); res = 0; if (io->status != SPA_STATUS_HAVE_BUFFER) { /* recycle old buffer */ if ((b = get_buffer(stream, io->buffer_id)) != NULL) { - pw_log_trace("stream %p: recycle buffer %d", stream, b->id); + pw_log_trace(NAME" %p: recycle buffer %d", stream, b->id); push_queue(impl, &impl->dequeued, b); } @@ -754,11 +760,11 @@ again: if ((b = pop_queue(impl, &impl->queued)) != NULL) { io->buffer_id = b->id; io->status = SPA_STATUS_HAVE_BUFFER; - pw_log_trace("stream %p: pop %d %p", stream, b->id, io); + pw_log_trace(NAME" %p: pop %d %p", stream, b->id, io); } else { io->buffer_id = SPA_ID_INVALID; io->status = SPA_STATUS_NEED_BUFFER; - pw_log_trace("stream %p: no more buffers %p", stream, io); + pw_log_trace(NAME" %p: no more buffers %p", stream, io); } } @@ -771,7 +777,7 @@ again: copy_position(impl, impl->queued.outcount); res = io->status; - pw_log_trace("stream %p: res %d", stream, res); + pw_log_trace(NAME" %p: res %d", stream, res); return res; } @@ -906,7 +912,7 @@ static void node_event_param(void *object, int seq, } c->id = iid; - pw_log_debug("stream %p: add control %d (%s) (def:%f min:%f max:%f)", + pw_log_debug(NAME" %p: add control %d (%s) (def:%f min:%f max:%f)", stream, c->id, c->control.name, c->control.def, c->control.min, c->control.max); break; @@ -939,7 +945,7 @@ static void node_event_param(void *object, int seq, c->control.value = value.f; c->emitted = true; - pw_log_debug("stream %p: control %d (%s) changed %f", stream, + pw_log_debug(NAME" %p: control %d (%s) changed %f", stream, prop->key, c->control.name, value.f); pw_stream_emit_control_changed(stream, prop->key, value.f); } @@ -965,7 +971,7 @@ static int handle_connect(struct pw_stream *stream) const char *str; int res; - pw_log_debug("stream %p: creating node", stream); + pw_log_debug(NAME" %p: creating node", stream); props = pw_properties_copy(stream->properties); if ((str = pw_properties_get(props, PW_KEY_STREAM_MONITOR)) && @@ -981,27 +987,34 @@ static int handle_connect(struct pw_stream *stream) } pw_node_set_implementation(slave, &impl->impl_node); + if (!SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_INACTIVE)) pw_node_set_active(slave, true); - factory = pw_core_find_factory(impl->core, "adapter"); - if (factory == NULL) { - pw_log_error("no adapter factory found"); - res = -ENOENT; - goto error_node; + if (impl->media_type == SPA_MEDIA_TYPE_audio && + impl->media_subtype == SPA_MEDIA_SUBTYPE_raw) { + factory = pw_core_find_factory(impl->core, "adapter"); + if (factory == NULL) { + pw_log_error(NAME" %p: no adapter factory found", stream); + res = -ENOENT; + goto error_node; + } + pw_properties_setf(props, "adapt.slave.node", "pointer:%p", slave); + impl->node = pw_factory_create_object(factory, + NULL, + PW_TYPE_INTERFACE_Node, + PW_VERSION_NODE_PROXY, + props, + 0); + if (impl->node == NULL) { + res = -errno; + goto error_node; + } + } else { + impl->node = slave; } - pw_properties_setf(props, "adapt.slave.node", "pointer:%p", slave); - impl->node = pw_factory_create_object(factory, - NULL, - PW_TYPE_INTERFACE_Node, - PW_VERSION_NODE_PROXY, - props, - 0); - if (impl->node == NULL) { - res = -errno; - goto error_node; - } - pw_log_debug("stream %p: export node %p", stream, impl->node); + + pw_log_debug(NAME" %p: export node %p", stream, impl->node); stream->proxy = pw_remote_export(stream->remote, PW_TYPE_INTERFACE_Node, NULL, impl->node, 0); if (stream->proxy == NULL) { @@ -1016,10 +1029,10 @@ static int handle_connect(struct pw_stream *stream) return 0; error_node: - pw_log_error("stream %p: can't make node: %s", stream, spa_strerror(res)); + pw_log_error(NAME" %p: can't make node: %s", stream, spa_strerror(res)); return res; error_proxy: - pw_log_error("stream %p: can't make proxy: %s", stream, spa_strerror(res)); + pw_log_error(NAME" %p: can't make proxy: %s", stream, spa_strerror(res)); return res; } @@ -1029,7 +1042,7 @@ static void on_remote_state_changed(void *_data, enum pw_remote_state old, struct pw_stream *stream = _data; struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - pw_log_debug("stream %p: remote state %d", stream, state); + pw_log_debug(NAME" %p: remote state %d", stream, state); switch (state) { case PW_REMOTE_STATE_ERROR: @@ -1080,7 +1093,7 @@ struct pw_stream * pw_stream_new(struct pw_remote *remote, const char *name, } this = &impl->this; - pw_log_debug("stream %p: new \"%s\"", impl, name); + pw_log_debug(NAME" %p: new \"%s\"", impl, name); if (props == NULL) { props = pw_properties_new(PW_KEY_MEDIA_NAME, name, NULL); @@ -1206,7 +1219,7 @@ void pw_stream_destroy(struct pw_stream *stream) struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct control *c; - pw_log_debug("stream %p: destroy", stream); + pw_log_debug(NAME" %p: destroy", stream); pw_stream_emit_destroy(stream); @@ -1300,6 +1313,33 @@ static void add_params(struct pw_stream *stream) SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers)))); } +static int find_format(struct stream *impl, enum pw_direction direction, + uint32_t *media_type, uint32_t *media_subtype) +{ + uint32_t state = 0; + uint8_t buffer[4096]; + struct spa_pod_builder b; + int res; + struct spa_pod *format; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + if ((res = spa_node_port_enum_params_sync(&impl->impl_node, + impl->direction, 0, + SPA_PARAM_EnumFormat, &state, + NULL, &format, &b)) != 1) { + pw_log_warn(NAME" %p: no format given", impl); + return -ENOENT; + } + + if ((res = spa_format_parse(format, media_type, media_subtype)) < 0) + return res; + + pw_log_debug(NAME " %p: %s/%s", impl, + spa_debug_type_find_name(spa_type_media_type, *media_type), + spa_debug_type_find_name(spa_type_media_subtype, *media_subtype)); + return 0; +} + SPA_EXPORT int pw_stream_connect(struct pw_stream *stream, @@ -1314,7 +1354,7 @@ pw_stream_connect(struct pw_stream *stream, int res; uint32_t i; - pw_log_debug("stream %p: connect target:%d", stream, target_id); + pw_log_debug(NAME" %p: connect target:%d", stream, target_id); impl->direction = direction == PW_DIRECTION_INPUT ? SPA_DIRECTION_INPUT : SPA_DIRECTION_OUTPUT; impl->flags = flags; @@ -1342,6 +1382,9 @@ pw_stream_connect(struct pw_stream *stream, add_params(stream); + if ((res = find_format(impl, direction, &impl->media_type, &impl->media_subtype)) < 0) + return res; + stream_set_state(stream, PW_STREAM_STATE_CONNECTING, NULL); if (target_id != SPA_ID_INVALID) @@ -1354,6 +1397,8 @@ pw_stream_connect(struct pw_stream *stream, pw_properties_set(stream->properties, PW_KEY_NODE_EXCLUSIVE, "1"); if (flags & PW_STREAM_FLAG_DONT_RECONNECT) pw_properties_set(stream->properties, PW_KEY_NODE_DONT_RECONNECT, "1"); + + pw_properties_setf(stream->properties, PW_KEY_MEDIA_CLASS, "Stream/%s/Audio", direction == PW_DIRECTION_INPUT ? "Input" : "Output"); @@ -1380,7 +1425,7 @@ int pw_stream_disconnect(struct pw_stream *stream) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - pw_log_debug("stream %p: disconnect", stream); + pw_log_debug(NAME" %p: disconnect", stream); impl->disconnecting = true; if (stream->proxy) { @@ -1405,7 +1450,7 @@ void pw_stream_finish_format(struct pw_stream *stream, struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); uint32_t i; - pw_log_debug("stream %p: finish format %d %d", stream, res, impl->pending_seq); + pw_log_debug(NAME" %p: finish format %d %d", stream, res, impl->pending_seq); if (res < 0) { pw_proxy_error(stream->proxy, res, "format failed"); @@ -1430,7 +1475,7 @@ int pw_stream_set_control(struct pw_stream *stream, uint32_t id, float value, .. struct spa_pod *pod; struct control *c; - pw_log_debug("stream %p: set control %d %f", stream, id, value); + pw_log_debug(NAME" %p: set control %d %f", stream, id, value); va_start(varargs, value); @@ -1502,8 +1547,8 @@ int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time) else time->queued = (int64_t)(impl->queued.incount - time->queued); - pw_log_trace("%"PRIi64" %"PRIi64" %"PRIu64" %d/%d %"PRIu64" %" - PRIu64" %"PRIu64" %"PRIu64" %"PRIu64, + pw_log_trace(NAME" %p: %"PRIi64" %"PRIi64" %"PRIu64" %d/%d %"PRIu64" %" + PRIu64" %"PRIu64" %"PRIu64" %"PRIu64, stream, time->now, time->delay, time->ticks, time->rate.num, time->rate.denom, time->queued, impl->dequeued.outcount, impl->dequeued.incount, @@ -1540,12 +1585,12 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream) if ((b = pop_queue(impl, &impl->dequeued)) == NULL) { res = -errno; - pw_log_trace("stream %p: no more buffers: %m", stream); + pw_log_trace(NAME" %p: no more buffers: %m", stream); call_trigger(impl); errno = -res; return NULL; } - pw_log_trace("stream %p: dequeue buffer %d", stream, b->id); + pw_log_trace(NAME" %p: dequeue buffer %d", stream, b->id); return &b->this; } @@ -1557,7 +1602,7 @@ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer) struct buffer *b = SPA_CONTAINER_OF(buffer, struct buffer, this); int res; - pw_log_trace("stream %p: queue buffer %d", stream, b->id); + pw_log_trace(NAME" %p: queue buffer %d", stream, b->id); if ((res = push_queue(impl, &impl->queued, b)) < 0) return res; @@ -1571,7 +1616,7 @@ do_flush(struct spa_loop *loop, struct stream *impl = user_data; struct buffer *b; - pw_log_trace("stream %p: flush", impl); + pw_log_trace(NAME" %p: flush", impl); do { b = pop_queue(impl, &impl->queued); if (b != NULL)