diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 769d2b9e4..852c66997 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -1,5 +1,5 @@ /* PipeWire - * Copyright (C) 2015 Wim Taymans + * Copyright (C) 2017 Wim Taymans * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -17,60 +17,73 @@ * Boston, MA 02110-1301, USA. */ -#include -#include -#include -#include #include +#include +#include +#include #include -#include "spa/lib/debug.h" +#include +#include +#include +#include +#include + +#include +#include #include "pipewire/pipewire.h" +#include "pipewire/stream.h" #include "pipewire/private.h" -#include "pipewire/interfaces.h" -#include "pipewire/array.h" -#include "pipewire/stream.h" -#include "pipewire/utils.h" -#include "pipewire/stream.h" #include "extensions/client-node.h" -/** \cond */ +#define MAX_BUFFERS 64 -#define MAX_BUFFER_SIZE 4096 -#define MAX_FDS 32 -#define MAX_INPUTS 64 -#define MAX_OUTPUTS 64 - -struct mem { - uint32_t id; - int fd; - uint32_t flags; - uint32_t ref; - struct pw_map_range map; - void *ptr; +struct type { + uint32_t client_node; }; -struct buffer_mem { - void *ptr; - struct pw_map_range map; - uint32_t mem_id; -}; +static inline void init_type(struct type *type, struct spa_type_map *map) +{ + type->client_node = spa_type_map_get_id(map, PW_TYPE_INTERFACE__ClientNode); + +} struct buffer { struct spa_list link; uint32_t id; #define BUFFER_FLAG_OUT (1 << 0) +#define BUFFER_FLAG_MAPPED (1 << 1) uint32_t flags; - struct spa_buffer *buf; - struct buffer_mem *mem; - uint32_t n_mem; + struct spa_buffer *buffer; }; struct stream { struct pw_stream this; - uint32_t type_client_node; + struct type type; + + const char *path; + + struct pw_core *core; + struct pw_type *t; + + enum spa_direction direction; + enum pw_stream_flags flags; + + struct pw_node *node; + struct spa_port_info port_info; + + struct spa_node impl_node; + const struct spa_node_callbacks *callbacks; + void *callbacks_data; + struct spa_io_buffers *io; + + struct buffer buffers[MAX_BUFFERS]; + int n_buffers; + struct spa_list queue; + bool in_need_buffer; + bool in_new_buffer; uint32_t n_init_params; struct spa_pod **init_params; @@ -80,150 +93,14 @@ struct stream { struct spa_pod *format; - struct spa_port_info port_info; - enum spa_direction direction; - uint32_t port_id; - uint32_t pending_seq; - - enum pw_stream_flags flags; - - int rtwritefd; - struct spa_source *rtsocket_source; - - struct pw_client_node_proxy *node_proxy; - bool disconnecting; - struct spa_hook node_listener; - struct spa_hook proxy_listener; - - struct pw_client_node_transport *trans; - - struct spa_source *timeout_source; - - struct pw_array mems; - struct pw_array buffers; - bool in_order; - struct spa_io_buffers *io; - bool client_reuse; - - struct spa_list free; - bool in_need_buffer; - bool in_new_buffer; + uint32_t pending_seq; + bool disconnecting; int64_t last_ticks; int32_t last_rate; int64_t last_monotonic; }; -/** \endcond */ - -static struct mem *find_mem(struct pw_stream *stream, uint32_t id) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct mem *m; - - pw_array_for_each(m, &impl->mems) { - if (m->id == id) - return m; - } - return NULL; -} - -static struct mem *find_mem_ptr(struct pw_stream *stream, void *ptr) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct mem *m; - - pw_array_for_each(m, &impl->mems) { - if (m->ptr == ptr) - return m; - } - return NULL; -} - -static void *mem_map(struct pw_stream *stream, struct pw_map_range *range, - int fd, int prot, uint32_t offset, uint32_t size) -{ - void *ptr; - - pw_map_range_init(range, offset, size, stream->remote->core->sc_pagesize); - - ptr = mmap(NULL, range->size, prot, MAP_SHARED, fd, range->offset); - if (ptr == MAP_FAILED) { - pw_log_error("stream %p: Failed to mmap memory %d: %m", stream, size); - return NULL; - } - - ptr = SPA_MEMBER(ptr, range->start, void); - pw_log_debug("stream %p: fd %d mapped %d %d %p", stream, fd, offset, size, ptr); - - return ptr; -} - -static void *mem_unmap(struct stream *impl, void *ptr, struct pw_map_range *range) -{ - if (ptr != NULL) { - if (munmap(SPA_MEMBER(ptr, -range->start, void) , range->size) < 0) - pw_log_warn("stream %p: failed to unmap: %m", impl); - } - return NULL; -} - -static void clear_mem(struct stream *impl, struct mem *m) -{ - if (m->fd != -1) { - bool has_ref = false; - struct mem *m2; - int fd; - - pw_log_debug("stream %p: clear mem %d", impl, m->id); - - fd = m->fd; - m->fd = -1; - m->id = SPA_ID_INVALID; - - pw_array_for_each(m2, &impl->mems) { - if (m2->fd == fd) { - has_ref = true; - break; - } - } - if (!has_ref) { - m->ptr = mem_unmap(impl, m->ptr, &m->map); - close(fd); - } - } -} - -static void clear_buffers(struct pw_stream *stream) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct buffer *b; - int i; - - pw_log_debug("stream %p: clear buffers", stream); - - pw_array_for_each(b, &impl->buffers) { - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, - remove_buffer, b->id); - - for (i = 0; i < b->n_mem; i++) { - struct buffer_mem *bm = &b->mem[i]; - struct mem *m; - - pw_log_debug("stream %p: clear buffer %d mem %d", - stream, b->id, bm->mem_id); - - m = find_mem(stream, bm->mem_id); - if (m && --m->ref == 0) - clear_mem(impl, m); - } - b->n_mem = 0; - free(b->buf); - } - impl->buffers.size = 0; - impl->in_order = true; - spa_list_init(&impl->free); -} static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state state, char *error) { @@ -245,29 +122,389 @@ static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state stat return res; } -const char *pw_stream_state_as_string(enum pw_stream_state state) +static struct buffer *find_buffer(struct pw_stream *stream, uint32_t id) { - switch (state) { - case PW_STREAM_STATE_ERROR: - return "error"; - case PW_STREAM_STATE_UNCONNECTED: - return "unconnected"; - case PW_STREAM_STATE_CONNECTING: - return "connecting"; - case PW_STREAM_STATE_CONFIGURE: - return "configure"; - case PW_STREAM_STATE_READY: - return "ready"; - case PW_STREAM_STATE_PAUSED: - return "paused"; - case PW_STREAM_STATE_STREAMING: - return "streaming"; - } - return "invalid-state"; + struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + return &impl->buffers[id]; } -struct pw_stream *pw_stream_new(struct pw_remote *remote, - const char *name, struct pw_properties *props) +static int impl_send_command(struct spa_node *node, const struct spa_command *command) +{ + struct stream *impl = SPA_CONTAINER_OF(node, struct stream, impl_node); + struct pw_stream *stream = &impl->this; + struct pw_type *t = impl->t; + + if (SPA_COMMAND_TYPE(command) == t->command_node.Pause) { + if (stream->state == PW_STREAM_STATE_STREAMING) { + pw_log_debug("stream %p: pause", stream); + stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL); + } + } else if (SPA_COMMAND_TYPE(command) == t->command_node.Start) { + if (stream->state == PW_STREAM_STATE_PAUSED) { + pw_log_debug("stream %p: start %d", stream, impl->direction); + + if (impl->direction == SPA_DIRECTION_INPUT) { + impl->io->status = SPA_STATUS_NEED_BUFFER; + } + else { + impl->in_need_buffer = true; + spa_hook_list_call(&stream->listener_list, struct pw_stream_events, + need_buffer); + impl->in_need_buffer = false; + } + stream_set_state(stream, PW_STREAM_STATE_STREAMING, NULL); + } + } else if (SPA_COMMAND_TYPE(command) == t->command_node.ClockUpdate) { + struct spa_command_node_clock_update *cu = (__typeof__(cu)) command; + + if (cu->body.flags.value & SPA_COMMAND_NODE_CLOCK_UPDATE_FLAG_LIVE) { + pw_properties_set(stream->properties, PW_STREAM_PROP_IS_LIVE, "1"); + pw_properties_setf(stream->properties, + PW_STREAM_PROP_LATENCY_MIN, "%" PRId64, + cu->body.latency.value); + } + impl->last_ticks = cu->body.ticks.value; + impl->last_rate = cu->body.rate.value; + impl->last_monotonic = cu->body.monotonic_time.value; + } else { + pw_log_warn("unhandled node command %d", SPA_COMMAND_TYPE(command)); + } + return 0; +} + +static int impl_set_callbacks(struct spa_node *node, + const struct spa_node_callbacks *callbacks, void *data) +{ + struct stream *d = SPA_CONTAINER_OF(node, struct stream, impl_node); + d->callbacks = callbacks; + d->callbacks_data = data; + return 0; +} + +static int impl_get_n_ports(struct spa_node *node, + uint32_t *n_input_ports, + uint32_t *max_input_ports, + uint32_t *n_output_ports, + uint32_t *max_output_ports) +{ + struct stream *d = SPA_CONTAINER_OF(node, struct stream, impl_node); + if (d->direction == SPA_DIRECTION_INPUT) { + *n_input_ports = *max_input_ports = 1; + *n_output_ports = *max_output_ports = 0; + } + else { + *n_input_ports = *max_input_ports = 0; + *n_output_ports = *max_output_ports = 1; + } + return 0; +} + +static int impl_get_port_ids(struct spa_node *node, + uint32_t *input_ids, + uint32_t n_input_ids, + uint32_t *output_ids, + uint32_t n_output_ids) +{ + if (n_output_ids > 0) + output_ids[0] = 0; + return 0; +} + +static int impl_port_set_io(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + uint32_t id, void *data, size_t size) +{ + struct stream *d = SPA_CONTAINER_OF(node, struct stream, impl_node); + struct pw_type *t = d->t; + + if (id == t->io.Buffers) + d->io = data; + else + return -ENOENT; + + return 0; +} + +static int impl_port_get_info(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + const struct spa_port_info **info) +{ + struct stream *d = SPA_CONTAINER_OF(node, struct stream, impl_node); + + d->port_info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; + d->port_info.rate = 0; + d->port_info.props = NULL; + + *info = &d->port_info; + + return 0; +} + +static int impl_port_enum_params(struct spa_node *node, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t *index, + const struct spa_pod *filter, + struct spa_pod **result, + struct spa_pod_builder *builder) +{ + struct stream *d = SPA_CONTAINER_OF(node, struct stream, impl_node); + struct spa_pod *param; + uint32_t last_id = SPA_ID_INVALID; + + pw_log_debug("start %d", *index); + while (true) { + if (*index < d->n_init_params) { + param = d->init_params[*index]; + pw_log_debug("init params %d", *index); + } + else if (*index < d->n_init_params + d->n_params) { + param = d->params[*index - d->n_init_params]; + pw_log_debug("params %d", *index); + } + else if (*index == (d->n_params + d->n_init_params) && d->format) { + param = d->format; + pw_log_debug("format %d", *index); + } + else if (last_id != SPA_ID_INVALID) + return 1; + else + return 0; + + (*index)++; + + if (id == d->t->param.idList) { + uint32_t new_id = ((struct spa_pod_object *) param)->body.id; + + if (last_id == SPA_ID_INVALID){ + *result = spa_pod_builder_object(builder, + id, d->t->param.List, + ":", d->t->param.listId, "I", new_id); + last_id = new_id; + } + else if (last_id != new_id) { + (*index)--; + break; + } + } else { + if (!spa_pod_is_object_id(param, id)) + continue; + + if (spa_pod_filter(builder, result, param, filter) == 0) + break; + } + } + pw_log_debug("result %d", *index); + spa_debug_pod(*result, 0); + + return 1; +} + +static int port_set_format(struct spa_node *node, + enum spa_direction direction, uint32_t port_id, + uint32_t flags, const struct spa_pod *format) +{ + struct stream *impl = SPA_CONTAINER_OF(node, struct stream, impl_node); + struct pw_stream *stream = &impl->this; + struct pw_type *t = impl->t; + int count; + + pw_log_debug("stream %p: format changed", impl); + + if (impl->format) + free(impl->format); + + if (spa_pod_is_object_type(format, t->spa_format)) { + impl->format = pw_spa_pod_copy(format); + ((struct spa_pod_object*)impl->format)->body.id = t->param.idFormat; + } + else + impl->format = NULL; + + count = spa_hook_list_call(&stream->listener_list, + struct pw_stream_events, + format_changed, impl->format); + + if (count == 0) + pw_stream_finish_format(stream, 0, NULL, 0); + + if (impl->format) + stream_set_state(stream, PW_STREAM_STATE_READY, NULL); + else + stream_set_state(stream, PW_STREAM_STATE_CONFIGURE, NULL); + + return 0; +} + +static int impl_port_set_param(struct spa_node *node, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + struct stream *d = SPA_CONTAINER_OF(node, struct stream, impl_node); + struct pw_type *t = d->t; + + if (id == t->param.idFormat) { + return port_set_format(node, direction, port_id, flags, param); + } + else + return -ENOENT; +} + +static int impl_port_use_buffers(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + struct spa_buffer **buffers, uint32_t n_buffers) +{ + struct stream *d = SPA_CONTAINER_OF(node, struct stream, impl_node); + struct pw_stream *stream = &d->this; + struct pw_type *t = d->t; + int i, prot; + + prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0); + + for (i = 0; i < n_buffers; i++) { + struct buffer *b = &d->buffers[i]; + struct spa_data *datas = buffers[i]->datas; + + b->flags = 0; + b->id = buffers[i]->id; + + if (datas[0].type == t->data.MemFd || + datas[0].type == t->data.DmaBuf) { + void *ptr; + ptr = mmap(NULL, datas[0].maxsize + datas[0].mapoffset, prot, + MAP_SHARED, datas[0].fd, 0); + if (ptr == MAP_FAILED) { + pw_log_error("failed to mmap buffer mem"); + return -errno; + + } + datas[0].data = SPA_MEMBER(ptr, datas[0].mapoffset, void); + SPA_FLAG_SET(b->flags, BUFFER_FLAG_MAPPED); + } + else if (datas[0].data == NULL) { + pw_log_error("invalid buffer mem"); + return -EINVAL; + } + b->buffer = buffers[i]; + pw_log_info("got buffer %d size %d", i, datas[0].maxsize); + spa_list_append(&d->queue, &b->link); + SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT); + } + d->n_buffers = n_buffers; + + if (n_buffers > 0) + stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL); + else + stream_set_state(stream, PW_STREAM_STATE_READY, NULL); + + return 0; +} + +static inline void reuse_buffer(struct stream *d, uint32_t id) +{ + pw_log_trace("export-source %p: recycle buffer %d", d, id); + if (id < d->n_buffers) { + struct buffer *b = &d->buffers[id]; + spa_list_append(&d->queue, &b->link); + SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT); + } +} + +static int impl_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id) +{ + struct stream *d = SPA_CONTAINER_OF(node, struct stream, impl_node); + reuse_buffer(d, buffer_id); + return 0; +} + +static int impl_node_process(struct spa_node *node) +{ + struct stream *impl = SPA_CONTAINER_OF(node, struct stream, impl_node); + struct pw_stream *stream = &impl->this; + struct spa_io_buffers *io = impl->io; + struct buffer *b; + uint32_t buffer_id; + int res; + + if (impl->direction == SPA_DIRECTION_INPUT) { + buffer_id = io->buffer_id; + + pw_log_trace("stream %p: process input %d %d", stream, io->status, + buffer_id); + + if ((b = find_buffer(stream, buffer_id)) == NULL) + return SPA_STATUS_NEED_BUFFER; + + if (impl->client_reuse) + io->buffer_id = SPA_ID_INVALID; + + if (io->status == SPA_STATUS_HAVE_BUFFER) { + SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); + + impl->in_new_buffer = true; + spa_hook_list_call(&stream->listener_list, struct pw_stream_events, + new_buffer, buffer_id); + impl->in_new_buffer = false; + } + io->status = SPA_STATUS_NEED_BUFFER; + res = SPA_STATUS_NEED_BUFFER; + } else { + reuse_buffer(impl, io->buffer_id); + io->buffer_id = SPA_ID_INVALID; + + pw_log_trace("stream %p: process output", stream); + impl->in_need_buffer = true; + spa_hook_list_call(&stream->listener_list, struct pw_stream_events, + need_buffer); + impl->in_need_buffer = false; + + res = SPA_STATUS_HAVE_BUFFER; + } + return res; +} + +static const struct spa_node impl_node = { + SPA_VERSION_NODE, + .send_command = impl_send_command, + .set_callbacks = impl_set_callbacks, + .get_n_ports = impl_get_n_ports, + .get_port_ids = impl_get_port_ids, + .port_set_io = impl_port_set_io, + .port_get_info = impl_port_get_info, + .port_enum_params = impl_port_enum_params, + .port_set_param = impl_port_set_param, + .port_use_buffers = impl_port_use_buffers, + .port_reuse_buffer = impl_port_reuse_buffer, + .process = impl_node_process, +}; + +#if 0 +static void on_state_changed(void *_data, enum pw_remote_state old, + enum pw_remote_state state, const char *error) +{ + struct stream *data = _data; + + switch (state) { + case PW_REMOTE_STATE_ERROR: + printf("remote error: %s\n", error); + pw_main_loop_quit(data->loop); + break; + + case PW_REMOTE_STATE_CONNECTED: + make_node(data); + break; + + default: + printf("remote state: \"%s\"\n", pw_remote_state_as_string(state)); + break; + } +} + +static const struct pw_remote_events remote_events = { + PW_VERSION_REMOTE_EVENTS, + .state_changed = on_state_changed, +}; +#endif + +struct pw_stream * pw_stream_new(struct pw_remote *remote, const char *name, + struct pw_properties *props) { struct stream *impl; struct pw_stream *this; @@ -292,22 +529,21 @@ struct pw_stream *pw_stream_new(struct pw_remote *remote, this->remote = remote; this->name = strdup(name); - impl->type_client_node = spa_type_map_get_id(remote->core->type.map, PW_TYPE_INTERFACE__ClientNode); - impl->rtwritefd = -1; + + init_type(&impl->type, remote->core->type.map); str = pw_properties_get(props, "pipewire.client.reuse"); impl->client_reuse = str && pw_properties_parse_bool(str); + spa_list_init(&impl->queue); + spa_hook_list_init(&this->listener_list); this->state = PW_STREAM_STATE_UNCONNECTED; - pw_array_init(&impl->mems, 64); - pw_array_ensure_size(&impl->mems, sizeof(struct mem) * 64); - pw_array_init(&impl->buffers, 32); - pw_array_ensure_size(&impl->buffers, sizeof(struct buffer) * 64); + impl->core = remote->core; + impl->t = &remote->core->type; impl->pending_seq = SPA_ID_INVALID; - spa_list_init(&impl->free); spa_list_append(&remote->stream_list, &this->link); @@ -318,59 +554,56 @@ struct pw_stream *pw_stream_new(struct pw_remote *remote, return NULL; } -enum pw_stream_state pw_stream_get_state(struct pw_stream *stream, const char **error) +#if 0 +struct pw_stream * pw_stream_new_simple(const char *name, struct pw_properties *props) { - if (error) - *error = stream->error; - return stream->state; -} + struct stream data = { 0, }; -const char *pw_stream_get_name(struct pw_stream *stream) -{ - return stream->name; -} + pw_init(&argc, &argv); -const struct pw_properties *pw_stream_get_properties(struct pw_stream *stream) -{ - return stream->properties; -} + data.loop = pw_main_loop_new(NULL); + data.core = pw_core_new(pw_main_loop_get_loop(data.loop), NULL); + data.t = pw_core_get_type(data.core); + data.remote = pw_remote_new(data.core, NULL, 0); + data.path = argc > 1 ? argv[1] : NULL; -void pw_stream_add_listener(struct pw_stream *stream, - struct spa_hook *listener, - const struct pw_stream_events *events, - void *data) -{ - spa_hook_list_append(&stream->listener_list, listener, events, data); -} + spa_list_init(&data.empty); + init_type(&data.type, data.t->map); + reset_props(&data.props); + spa_debug_set_type_map(data.t->map); -static int -do_remove_sources(struct spa_loop *loop, - bool async, uint32_t seq, const void *data, size_t size, void *user_data) -{ - struct stream *impl = user_data; - struct pw_stream *stream = &impl->this; + pw_remote_add_listener(data.remote, &data.remote_listener, &remote_events, &data); + + pw_remote_connect(data.remote); + + pw_main_loop_run(data.loop); + + pw_core_destroy(data.core); + pw_main_loop_destroy(data.loop); - if (impl->rtsocket_source) { - pw_loop_destroy_source(stream->remote->core->data_loop, impl->rtsocket_source); - impl->rtsocket_source = NULL; - } - if (impl->timeout_source) { - pw_loop_destroy_source(stream->remote->core->data_loop, impl->timeout_source); - impl->timeout_source = NULL; - } - if (impl->rtwritefd != -1) { - close(impl->rtwritefd); - impl->rtwritefd = -1; - } return 0; } +#endif -static void unhandle_socket(struct pw_stream *stream) +const char *pw_stream_state_as_string(enum pw_stream_state state) { - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - - pw_loop_invoke(stream->remote->core->data_loop, - do_remove_sources, 1, NULL, 0, true, impl); + switch (state) { + case PW_STREAM_STATE_ERROR: + return "error"; + case PW_STREAM_STATE_UNCONNECTED: + return "unconnected"; + case PW_STREAM_STATE_CONNECTING: + return "connecting"; + case PW_STREAM_STATE_CONFIGURE: + return "configure"; + case PW_STREAM_STATE_READY: + return "ready"; + case PW_STREAM_STATE_PAUSED: + return "paused"; + case PW_STREAM_STATE_STREAMING: + return "streaming"; + } + return "invalid-state"; } static void @@ -413,7 +646,6 @@ static void set_params(struct pw_stream *stream, int n_params, struct spa_pod ** impl->params[i] = pw_spa_pod_copy(params[i]); } } - void pw_stream_destroy(struct pw_stream *stream) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); @@ -422,9 +654,6 @@ void pw_stream_destroy(struct pw_stream *stream) spa_hook_list_call(&stream->listener_list, struct pw_stream_events, destroy); - if (impl->node_proxy) - spa_hook_remove(&impl->proxy_listener); - pw_stream_disconnect(stream); spa_list_remove(&stream->link); @@ -438,9 +667,6 @@ void pw_stream_destroy(struct pw_stream *stream) if (stream->error) free(stream->error); - pw_array_clear(&impl->mems); - pw_array_clear(&impl->buffers); - if (stream->properties) pw_properties_free(stream->properties); @@ -450,663 +676,31 @@ void pw_stream_destroy(struct pw_stream *stream) free(impl); } -static void add_node_update(struct pw_stream *stream, uint32_t change_mask) +void pw_stream_add_listener(struct pw_stream *stream, + struct spa_hook *listener, + const struct pw_stream_events *events, + void *data) { - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - uint32_t max_input_ports = 0, max_output_ports = 0; - - if (change_mask & PW_CLIENT_NODE_UPDATE_MAX_INPUTS) - max_input_ports = impl->direction == SPA_DIRECTION_INPUT ? 1 : 0; - if (change_mask & PW_CLIENT_NODE_UPDATE_MAX_OUTPUTS) - max_output_ports = impl->direction == SPA_DIRECTION_OUTPUT ? 1 : 0; - - pw_client_node_proxy_update(impl->node_proxy, - change_mask, max_input_ports, max_output_ports, - 0, NULL); + spa_hook_list_append(&stream->listener_list, listener, events, data); } -static void add_port_update(struct pw_stream *stream, uint32_t change_mask) +enum pw_stream_state pw_stream_get_state(struct pw_stream *stream, const char **error) { - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - uint32_t n_params; - struct spa_pod **params; - int i, j; - - n_params = impl->n_params + impl->n_init_params; - if (impl->format) - n_params += 1; - - params = alloca(n_params * sizeof(struct spa_pod *)); - - j = 0; - for (i = 0; i < impl->n_init_params; i++) - params[j++] = impl->init_params[i]; - if (impl->format) - params[j++] = impl->format; - for (i = 0; i < impl->n_params; i++) - params[j++] = impl->params[i]; - - pw_client_node_proxy_port_update(impl->node_proxy, - impl->direction, - impl->port_id, - change_mask, - n_params, - (const struct spa_pod **) params, - &impl->port_info); + if (error) + *error = stream->error; + return stream->state; } -static inline void send_have_output(struct pw_stream *stream) +const char *pw_stream_get_name(struct pw_stream *stream) { - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - uint64_t cmd = 1; - write(impl->rtwritefd, &cmd, 8); + return stream->name; } -static inline void send_reuse_buffer(struct pw_stream *stream, uint32_t id) +const struct pw_properties *pw_stream_get_properties(struct pw_stream *stream) { - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - uint64_t cmd = 1; - write(impl->rtwritefd, &cmd, 8); + return stream->properties; } -static void add_request_clock_update(struct pw_stream *stream) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - - pw_client_node_proxy_event(impl->node_proxy, (struct spa_event *) - &SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_INIT(stream->remote->core->type. - event_node. - RequestClockUpdate, - SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_TIME, - 0, 0)); -} - -static void add_async_complete(struct pw_stream *stream, uint32_t seq, int res) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - - pw_client_node_proxy_done(impl->node_proxy, seq, res); -} - -static void do_node_init(struct pw_stream *stream) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - - add_node_update(stream, PW_CLIENT_NODE_UPDATE_MAX_INPUTS | - PW_CLIENT_NODE_UPDATE_MAX_OUTPUTS); - - impl->port_info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; - - add_port_update(stream, PW_CLIENT_NODE_PORT_UPDATE_PARAMS | - PW_CLIENT_NODE_PORT_UPDATE_INFO); - - add_async_complete(stream, 0, 0); - if (!(impl->flags & PW_STREAM_FLAG_INACTIVE)) - pw_client_node_proxy_set_active(impl->node_proxy, true); -} - -static void on_timeout(void *data, uint64_t expirations) -{ - struct pw_stream *stream = data; - add_request_clock_update(stream); -} - -static struct buffer *find_buffer(struct pw_stream *stream, uint32_t id) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - - if (impl->in_order && pw_array_check_index(&impl->buffers, id, struct buffer)) { - return pw_array_get_unchecked(&impl->buffers, id, struct buffer); - } else { - struct buffer *b; - - pw_array_for_each(b, &impl->buffers) { - if (b->id == id) - return b; - } - } - return NULL; -} - -static inline void reuse_buffer(struct pw_stream *stream, uint32_t id) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct buffer *b; - - if ((b = find_buffer(stream, id)) && SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT)) { - pw_log_trace("stream %p: reuse buffer %u", stream, id); - - SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT); - spa_list_append(&impl->free, &b->link); - - impl->in_new_buffer = true; - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, new_buffer, id); - impl->in_new_buffer = false; - } -} - -static void do_process(struct pw_stream *stream) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct spa_io_buffers *io = impl->io; - struct buffer *b; - uint32_t buffer_id; - - if (impl->direction == SPA_DIRECTION_INPUT) { - buffer_id = io->buffer_id; - - pw_log_trace("stream %p: process input %d %d", stream, io->status, - buffer_id); - - if ((b = find_buffer(stream, buffer_id)) == NULL) - return; - - if (impl->client_reuse) - io->buffer_id = SPA_ID_INVALID; - - if (io->status == SPA_STATUS_HAVE_BUFFER) { - SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); - - impl->in_new_buffer = true; - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, - new_buffer, buffer_id); - impl->in_new_buffer = false; - } - io->status = SPA_STATUS_NEED_BUFFER; - } else { - reuse_buffer(stream, io->buffer_id); - io->buffer_id = SPA_ID_INVALID; - - pw_log_trace("stream %p: process output", stream); - impl->in_need_buffer = true; - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, - need_buffer); - impl->in_need_buffer = false; - } -} - -static void -on_rtsocket_condition(void *data, int fd, enum spa_io mask) -{ - struct pw_stream *stream = data; - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - - if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { - pw_log_warn("got error"); - unhandle_socket(stream); - return; - } - - if (mask & SPA_IO_IN) { - uint64_t cmd; - - if (read(fd, &cmd, sizeof(uint64_t)) != sizeof(uint64_t)) - pw_log_warn("stream %p: read failed %m", impl); - - do_process(stream); - } -} - -static void handle_socket(struct pw_stream *stream, int rtreadfd, int rtwritefd) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct timespec interval; - - impl->rtwritefd = rtwritefd; - impl->rtsocket_source = pw_loop_add_io(stream->remote->core->data_loop, - rtreadfd, - SPA_IO_ERR | SPA_IO_HUP, - true, on_rtsocket_condition, stream); - - if (impl->flags & PW_STREAM_FLAG_CLOCK_UPDATE) { - impl->timeout_source = pw_loop_add_timer(stream->remote->core->main_loop, on_timeout, stream); - interval.tv_sec = 0; - interval.tv_nsec = 100000000; - pw_loop_update_timer(stream->remote->core->main_loop, impl->timeout_source, NULL, &interval, false); - } - return; -} - -static void -client_node_set_param(void *data, uint32_t seq, uint32_t id, uint32_t flags, - const struct spa_pod *param) -{ - pw_log_warn("set param not implemented"); -} - -static void client_node_event(void *data, const struct spa_event *event) -{ - pw_log_warn("unhandled node event %d", SPA_EVENT_TYPE(event)); -} - -static void client_node_command(void *data, uint32_t seq, const struct spa_command *command) -{ - struct stream *impl = data; - struct pw_stream *stream = &impl->this; - struct pw_remote *remote = stream->remote; - - if (SPA_COMMAND_TYPE(command) == remote->core->type.command_node.Pause) { - add_async_complete(stream, seq, 0); - - if (stream->state == PW_STREAM_STATE_STREAMING) { - pw_log_debug("stream %p: pause %d", stream, seq); - - pw_loop_update_io(stream->remote->core->data_loop, - impl->rtsocket_source, SPA_IO_ERR | SPA_IO_HUP); - - stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL); - } - } else if (SPA_COMMAND_TYPE(command) == remote->core->type.command_node.Start) { - add_async_complete(stream, seq, 0); - - if (stream->state == PW_STREAM_STATE_PAUSED) { - pw_log_debug("stream %p: start %d %d", stream, seq, impl->direction); - - pw_loop_update_io(stream->remote->core->data_loop, - impl->rtsocket_source, - SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP); - - if (impl->direction == SPA_DIRECTION_INPUT) { - impl->io->status = SPA_STATUS_NEED_BUFFER; - } - else { - impl->in_need_buffer = true; - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, - need_buffer); - impl->in_need_buffer = false; - } - stream_set_state(stream, PW_STREAM_STATE_STREAMING, NULL); - } - } else if (SPA_COMMAND_TYPE(command) == remote->core->type.command_node.ClockUpdate) { - struct spa_command_node_clock_update *cu = (__typeof__(cu)) command; - - if (cu->body.flags.value & SPA_COMMAND_NODE_CLOCK_UPDATE_FLAG_LIVE) { - pw_properties_set(stream->properties, PW_STREAM_PROP_IS_LIVE, "1"); - pw_properties_setf(stream->properties, - PW_STREAM_PROP_LATENCY_MIN, "%" PRId64, - cu->body.latency.value); - } - impl->last_ticks = cu->body.ticks.value; - impl->last_rate = cu->body.rate.value; - impl->last_monotonic = cu->body.monotonic_time.value; - } else { - pw_log_warn("unhandled node command %d", SPA_COMMAND_TYPE(command)); - add_async_complete(stream, seq, -ENOTSUP); - } -} - -static void -client_node_add_port(void *data, uint32_t seq, enum spa_direction direction, uint32_t port_id) -{ - pw_log_warn("add port not supported"); -} - -static void -client_node_remove_port(void *data, uint32_t seq, enum spa_direction direction, uint32_t port_id) -{ - pw_log_warn("remove port not supported"); -} - -static void -client_node_port_set_param(void *data, - uint32_t seq, - enum spa_direction direction, uint32_t port_id, - uint32_t id, uint32_t flags, - const struct spa_pod *param) -{ - struct stream *impl = data; - struct pw_stream *stream = &impl->this; - struct pw_type *t = &stream->remote->core->type; - - if (id == t->param.idFormat) { - int count; - - pw_log_debug("stream %p: format changed %d", stream, seq); - - if (impl->format) - free(impl->format); - - if (spa_pod_is_object_type(param, t->spa_format)) { - impl->format = pw_spa_pod_copy(param); - ((struct spa_pod_object*)impl->format)->body.id = id; - } - else - impl->format = NULL; - - impl->pending_seq = seq; - - count = spa_hook_list_call(&stream->listener_list, - struct pw_stream_events, - format_changed, impl->format); - - if (count == 0) - pw_stream_finish_format(stream, 0, NULL, 0); - - if (impl->format) - stream_set_state(stream, PW_STREAM_STATE_READY, NULL); - else - stream_set_state(stream, PW_STREAM_STATE_CONFIGURE, NULL); - } - else - pw_log_warn("set param not implemented"); -} - -static void -client_node_add_mem(void *data, - uint32_t mem_id, - uint32_t type, int memfd, uint32_t flags) -{ - struct stream *impl = data; - struct pw_stream *stream = &impl->this; - struct mem *m; - - m = find_mem(stream, mem_id); - if (m) { - pw_log_warn("duplicate mem %u, fd %d, flags %d", - mem_id, memfd, flags); - return; - } - m = pw_array_add(&impl->mems, sizeof(struct mem)); - pw_log_debug("add mem %u, fd %d, flags %d", mem_id, memfd, flags); - - m->id = mem_id; - m->fd = memfd; - m->flags = flags; - m->ref = 0; - m->map = PW_MAP_RANGE_INIT; - m->ptr = NULL; -} - -static void -client_node_port_use_buffers(void *data, - uint32_t seq, - enum spa_direction direction, uint32_t port_id, uint32_t mix_id, - uint32_t n_buffers, struct pw_client_node_buffer *buffers) -{ - struct stream *impl = data; - struct pw_stream *stream = &impl->this; - struct pw_core *core = stream->remote->core; - struct pw_type *t = &core->type; - struct buffer *bid; - uint32_t i, j, len; - struct spa_buffer *b; - int res = 0, prot; - - prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0); - - /* clear previous buffers */ - clear_buffers(stream); - - for (i = 0; i < n_buffers; i++) { - struct buffer_mem bmem; - size_t size; - off_t offset; - struct mem *m; - - if ((m = find_mem(stream, buffers[i].mem_id)) == NULL) { - pw_log_error("unknown memory id %u", buffers[i].mem_id); - res = -EINVAL; - goto error; - } - - len = pw_array_get_len(&impl->buffers, struct buffer); - bid = pw_array_add(&impl->buffers, sizeof(struct buffer)); - bid->flags = 0; - - bmem.mem_id = m->id; - bmem.ptr = mem_map(stream, &bmem.map, m->fd, prot, - buffers[i].offset, buffers[i].size); - if (bmem.ptr == NULL) { - res = -errno; - goto error; - } - - size = sizeof(struct spa_buffer); - size += sizeof(struct buffer_mem); - for (j = 0; j < buffers[i].buffer->n_metas; j++) - size += sizeof(struct spa_meta); - for (j = 0; j < buffers[i].buffer->n_datas; j++) { - size += sizeof(struct spa_data); - size += sizeof(struct buffer_mem); - } - - b = bid->buf = malloc(size); - if (b == NULL) { - res = -ENOMEM; - goto error; - } - memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer)); - - b->metas = SPA_MEMBER(b, sizeof(struct spa_buffer), struct spa_meta); - b->datas = SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas, - struct spa_data); - bid->mem = SPA_MEMBER(b->datas, sizeof(struct spa_data) * b->n_datas, - struct buffer_mem); - bid->n_mem = 0; - - bid->id = b->id; - - bid->mem[bid->n_mem++] = bmem; - m->ref++; - - if (bid->id != len) { - pw_log_warn("unexpected id %u found, expected %u", bid->id, len); - impl->in_order = false; - } - pw_log_debug("add buffer %d %d %u %u", m->id, - bid->id, bmem.map.offset, bmem.map.size); - - offset = 0; - for (j = 0; j < b->n_metas; j++) { - struct spa_meta *m = &b->metas[j]; - memcpy(m, &buffers[i].buffer->metas[j], sizeof(struct spa_meta)); - m->data = SPA_MEMBER(bmem.ptr, offset, void); - offset += m->size; - } - - for (j = 0; j < b->n_datas; j++) { - struct spa_data *d = &b->datas[j]; - - memcpy(d, &buffers[i].buffer->datas[j], sizeof(struct spa_data)); - d->chunk = - SPA_MEMBER(bmem.ptr, offset + sizeof(struct spa_chunk) * j, - struct spa_chunk); - - if (d->type == t->data.MemFd || d->type == t->data.DmaBuf) { - uint32_t mem_id = SPA_PTR_TO_UINT32(d->data); - struct mem *bm = find_mem(stream, mem_id); - struct buffer_mem bm2; - - if (bm == NULL) { - pw_log_error("unknown memory id %u", mem_id); - continue; - } - - d->fd = bm->fd; - bm->ref++; - bm2.mem_id = bm->id; - - if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_MAP_BUFFERS)) - bm2.ptr = mem_map(stream, &bm2.map, d->fd, prot, - d->mapoffset, d->maxsize); - else - bm2.ptr = NULL; - - d->data = bm2.ptr; - - bid->mem[bid->n_mem++] = bm2; - - pw_log_debug(" data %d %u -> fd %d", j, bm->id, bm->fd); - } else if (d->type == t->data.MemPtr) { - int offs = SPA_PTR_TO_INT(d->data); - d->data = SPA_MEMBER(bmem.ptr, offs, void); - d->fd = -1; - pw_log_debug(" data %d %u -> mem %p", j, bid->id, d->data); - } else { - pw_log_warn("unknown buffer data type %d", d->type); - } - } - - if (impl->direction == SPA_DIRECTION_OUTPUT) { - SPA_FLAG_UNSET(bid->flags, BUFFER_FLAG_OUT); - spa_list_append(&impl->free, &bid->link); - } else { - SPA_FLAG_SET(bid->flags, BUFFER_FLAG_OUT); - } - - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, - add_buffer, bid->id); - } - - if (n_buffers) - stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL); - else { - stream_set_state(stream, PW_STREAM_STATE_READY, NULL); - } - - exit: - add_async_complete(stream, seq, res); - return; - - error: - clear_buffers(stream); - goto exit; -} - -static void -client_node_port_command(void *data, - uint32_t direction, - uint32_t port_id, - const struct spa_command *command) -{ - pw_log_warn("port command not supported"); -} - -static void clean_transport(struct pw_stream *stream) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - - if (impl->trans == NULL) - return; - - unhandle_socket(stream); - clear_buffers(stream); - - pw_client_node_transport_destroy(impl->trans); - impl->trans = NULL; -} - -static void client_node_transport(void *data, uint32_t node_id, - int readfd, int writefd, - struct pw_client_node_transport *transport) -{ - struct stream *impl = data; - struct pw_stream *stream = &impl->this; - - stream->node_id = node_id; - - clean_transport(stream); - - impl->trans = transport; - - pw_log_info("stream %p: create client transport %p with fds %d %d for node %u", - stream, impl->trans, readfd, writefd, node_id); - - handle_socket(stream, readfd, writefd); - - stream_set_state(stream, PW_STREAM_STATE_CONFIGURE, NULL); -} - -static void client_node_port_set_io(void *data, - uint32_t seq, - enum spa_direction direction, - uint32_t port_id, - uint32_t mix_id, - uint32_t id, - uint32_t mem_id, - uint32_t offset, - uint32_t size) -{ - struct stream *impl = data; - struct pw_stream *stream = &impl->this; - struct pw_core *core = stream->remote->core; - struct pw_type *t = &core->type; - struct mem *m; - void *ptr; - int res; - - if (mem_id == SPA_ID_INVALID) { - ptr = NULL; - size = 0; - } - else { - m = find_mem(stream, mem_id); - if (m == NULL) { - pw_log_warn("unknown memory id %u", mem_id); - res = -EINVAL; - goto exit; - } - if (m->ptr == NULL) { - m->ptr = mem_map(stream, &m->map, m->fd, - PROT_READ|PROT_WRITE, offset, size); - if (m->ptr == NULL) { - res = -errno; - goto exit; - } - } - m->ref++; - ptr = m->ptr; - } - - if (id == t->io.Buffers) { - if (ptr == NULL && impl->io) { - m = find_mem_ptr(stream, impl->io); - if (m && --m->ref == 0) - clear_mem(impl, m); - } - impl->io = ptr; - pw_log_debug("stream %p: %u.%u set io id %u %p", stream, - port_id, mix_id, id, ptr); - } - - res = 0; - - exit: - add_async_complete(stream, seq, res); -} - -static const struct pw_client_node_proxy_events client_node_events = { - PW_VERSION_CLIENT_NODE_PROXY_EVENTS, - .add_mem = client_node_add_mem, - .transport = client_node_transport, - .set_param = client_node_set_param, - .event = client_node_event, - .command = client_node_command, - .add_port = client_node_add_port, - .remove_port = client_node_remove_port, - .port_set_param = client_node_port_set_param, - .port_use_buffers = client_node_port_use_buffers, - .port_command = client_node_port_command, - .port_set_io = client_node_port_set_io, -}; - -static void on_node_proxy_destroy(void *data) -{ - struct stream *impl = data; - struct pw_stream *this = &impl->this; - - impl->disconnecting = false; - impl->node_proxy = NULL; - spa_hook_remove(&impl->proxy_listener); - - stream_set_state(this, PW_STREAM_STATE_UNCONNECTED, NULL); -} - -static const struct pw_proxy_events proxy_events = { - PW_VERSION_PROXY_EVENTS, - .destroy = on_node_proxy_destroy, -}; - int pw_stream_connect(struct pw_stream *stream, enum pw_direction direction, @@ -1119,7 +713,6 @@ pw_stream_connect(struct pw_stream *stream, impl->direction = direction == PW_DIRECTION_INPUT ? SPA_DIRECTION_INPUT : SPA_DIRECTION_OUTPUT; - impl->port_id = 0; impl->flags = flags; set_init_params(stream, n_params, params); @@ -1133,34 +726,38 @@ pw_stream_connect(struct pw_stream *stream, if (flags & PW_STREAM_FLAG_AUTOCONNECT) pw_properties_set(stream->properties, PW_NODE_PROP_AUTOCONNECT, "1"); - impl->node_proxy = pw_core_proxy_create_object(stream->remote->core_proxy, - "client-node", - impl->type_client_node, - PW_VERSION_CLIENT_NODE, - &stream->properties->dict, 0); - if (impl->node_proxy == NULL) - return -ENOMEM; + impl->node = pw_node_new(impl->core, "export-source", + pw_properties_copy(stream->properties), 0); + impl->impl_node = impl_node; + pw_node_set_implementation(impl->node, &impl->impl_node); - pw_client_node_proxy_add_listener(impl->node_proxy, - &impl->node_listener, - &client_node_events, impl); - pw_proxy_add_listener((struct pw_proxy*)impl->node_proxy, - &impl->proxy_listener, - &proxy_events, impl); - - do_node_init(stream); + pw_node_register(impl->node, NULL, NULL, NULL); + pw_node_set_active(impl->node, true); + pw_remote_export(stream->remote, impl->node); return 0; } -uint32_t -pw_stream_get_node_id(struct pw_stream *stream) +uint32_t pw_stream_get_node_id(struct pw_stream *stream) { return stream->node_id; } -void -pw_stream_finish_format(struct pw_stream *stream, + +int pw_stream_disconnect(struct pw_stream *stream) +{ + struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + + impl->disconnecting = true; + + if (impl->node) { + pw_node_destroy(impl->node); + impl->node = NULL; + } + return 0; +} + +void pw_stream_finish_format(struct pw_stream *stream, int res, struct spa_pod **params, uint32_t n_params) @@ -1171,41 +768,13 @@ pw_stream_finish_format(struct pw_stream *stream, set_params(stream, n_params, params); - if (SPA_RESULT_IS_OK(res)) { - add_port_update(stream, PW_CLIENT_NODE_PORT_UPDATE_PARAMS); - - if (!impl->format) { - clear_buffers(stream); - } - } - add_async_complete(stream, impl->pending_seq, res); - impl->pending_seq = SPA_ID_INVALID; } -int pw_stream_disconnect(struct pw_stream *stream) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - - impl->disconnecting = true; - - clean_transport(stream); - - if (impl->node_proxy) { - pw_client_node_proxy_destroy(impl->node_proxy); - impl->node_proxy = NULL; - } - if (impl->trans) { - pw_client_node_transport_destroy(impl->trans); - impl->trans = NULL; - } - return 0; -} - int pw_stream_set_active(struct pw_stream *stream, bool active) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - pw_client_node_proxy_set_active(impl->node_proxy, active); + pw_node_set_active(impl->node, active); return 0; } @@ -1230,10 +799,10 @@ uint32_t pw_stream_get_empty_buffer(struct pw_stream *stream) struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct buffer *b; - if (spa_list_is_empty(&impl->free)) + if (spa_list_is_empty(&impl->queue)) return SPA_ID_INVALID; - b = spa_list_first(&impl->free, struct buffer, link); + b = spa_list_first(&impl->queue, struct buffer, link); return b->id; } @@ -1248,27 +817,35 @@ int pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id) return -EINVAL; SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT); - spa_list_append(&impl->free, &b->link); + spa_list_append(&impl->queue, &b->link); if (impl->in_new_buffer) { struct spa_io_buffers *io = impl->io; io->buffer_id = id; - } else { - send_reuse_buffer(stream, id); } return 0; } -struct spa_buffer *pw_stream_peek_buffer(struct pw_stream *stream, uint32_t id) +struct spa_buffer * +pw_stream_peek_buffer(struct pw_stream *stream, uint32_t id) { struct buffer *b; if ((b = find_buffer(stream, id))) - return b->buf; + return b->buffer; return NULL; } +static int +do_process(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct stream *impl = user_data; + impl->callbacks->process(impl->callbacks_data, SPA_STATUS_HAVE_BUFFER); + return 0; +} + int pw_stream_send_buffer(struct pw_stream *stream, uint32_t id) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); @@ -1287,10 +864,11 @@ int pw_stream_send_buffer(struct pw_stream *stream, uint32_t id) io->buffer_id = id; io->status = SPA_STATUS_HAVE_BUFFER; pw_log_trace("stream %p: send buffer %d", stream, id); - send_have_output(stream); + if (!impl->in_need_buffer) + pw_loop_invoke(impl->core->data_loop, + do_process, 1, NULL, 0, false, impl); } else { pw_log_debug("stream %p: output %u was used", stream, id); } - return 0; }