From 57cbc5e503e4a91b19b84e2cf272b5036870ea28 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 19 Apr 2018 20:15:30 +0200 Subject: [PATCH] various cleanups --- src/modules/module-audio-dsp.c | 44 ++++++++++------- src/modules/module-client-node/client-node.c | 7 ++- src/pipewire/link.c | 1 - src/pipewire/node.h | 5 +- src/pipewire/properties.c | 4 +- src/pipewire/remote.c | 24 ++++++--- src/pipewire/stream.c | 52 ++++++++++++++------ 7 files changed, 88 insertions(+), 49 deletions(-) diff --git a/src/modules/module-audio-dsp.c b/src/modules/module-audio-dsp.c index 0350044da..85259b414 100644 --- a/src/modules/module-audio-dsp.c +++ b/src/modules/module-audio-dsp.c @@ -83,7 +83,7 @@ struct buffer { #define BUFFER_FLAG_OUT (1<<0) uint32_t flags; struct spa_list link; - struct spa_buffer *outbuf; + struct spa_buffer *buf; void *ptr; }; @@ -215,16 +215,6 @@ static int node_remove_port(struct spa_node *node, enum spa_direction direction, return -ENOTSUP; } -static void recycle_buffer(struct node *n, struct port *p, uint32_t id) -{ - struct buffer *b = &p->buffers[id]; - if (SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT)) { - pw_log_trace("recycle buffer %d", id); - spa_list_append(&p->queue, &b->link); - SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT); - } -} - static int clear_buffers(struct node *n, struct port *p) { if (p->n_buffers > 0) { @@ -275,10 +265,21 @@ static struct buffer * peek_buffer(struct node *n, struct port *p) static void dequeue_buffer(struct node *n, struct buffer *b) { + pw_log_trace("dequeue buffer %d", b->buf->id); spa_list_remove(&b->link); SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); } +static void queue_buffer(struct node *n, struct port *p, uint32_t id) +{ + struct buffer *b = &p->buffers[id]; + if (SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT)) { + pw_log_trace("queue buffer %d", id); + spa_list_append(&p->queue, &b->link); + SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT); + } +} + static int node_process(struct spa_node *node) { struct node *n = SPA_CONTAINER_OF(node, struct node, node_impl); @@ -292,6 +293,11 @@ static int node_process(struct spa_node *node) if (outio->status == SPA_STATUS_HAVE_BUFFER) return SPA_STATUS_HAVE_BUFFER; + if (outio->buffer_id < outp->n_buffers) { + queue_buffer(n, outp, outio->buffer_id); + outio->buffer_id = SPA_ID_INVALID; + } + out = peek_buffer(n, outp); if (out == NULL) { pw_log_warn(NAME " %p: out of buffers", this); @@ -299,14 +305,14 @@ static int node_process(struct spa_node *node) } dequeue_buffer(n, out); - outio->buffer_id = out->outbuf->id; + outio->buffer_id = out->buf->id; outio->status = SPA_STATUS_HAVE_BUFFER; - pw_log_trace(NAME " %p: output buffer %d %d", this, out->outbuf->id, out->flags); + pw_log_trace(NAME " %p: output buffer %d %d", this, out->buf->id, out->flags); - out->outbuf->datas[0].chunk->offset = 0; - out->outbuf->datas[0].chunk->size = n->buffer_size * sizeof(int16_t) * n->channels; - out->outbuf->datas[0].chunk->stride = 0; + out->buf->datas[0].chunk->offset = 0; + out->buf->datas[0].chunk->size = n->buffer_size * sizeof(int16_t) * n->channels; + out->buf->datas[0].chunk->stride = 0; return outio->status; } @@ -509,7 +515,7 @@ static int port_use_buffers(struct spa_node *node, enum spa_direction direction, b = &p->buffers[i]; b->flags = 0; - b->outbuf = buffers[i]; + b->buf = buffers[i]; if ((d[0].type == t->data.MemPtr || d[0].type == t->data.MemFd || @@ -542,7 +548,7 @@ static int port_alloc_buffers(struct spa_node *node, enum spa_direction directio struct spa_data *d = buffers[i]->datas; b = &p->buffers[i]; - b->outbuf = buffers[i]; + b->buf = buffers[i]; d[0].type = t->data.MemPtr; d[0].maxsize = n->buffer_size; b->ptr = d[0].data = p->buffer; @@ -560,7 +566,7 @@ static int port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t b { struct node *n = SPA_CONTAINER_OF(node, struct node, node_impl); struct port *p = GET_OUT_PORT(n, port_id); - recycle_buffer(n, p, buffer_id); + queue_buffer(n, p, buffer_id); return 0; } diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 1edad039e..4d989b81d 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -511,15 +511,12 @@ static int impl_node_add_port(struct spa_node *node, enum spa_direction direction, uint32_t port_id) { struct node *this; - struct port *port; if (node == NULL) return -EINVAL; this = SPA_CONTAINER_OF(node, struct node, node); - port = GET_PORT(this, direction, port_id); - if (!CHECK_FREE_PORT(this, direction, port_id)) return -EINVAL; @@ -1256,7 +1253,9 @@ static int mix_port_process(struct spa_node *data) { struct pw_port *p = SPA_CONTAINER_OF(data, struct pw_port, mix_node); struct spa_io_buffers *io = p->rt.mix_port.io; - pw_log_trace("client-node %p: pass %d %d", data, io->status, io->buffer_id); + if (io != NULL) { + pw_log_trace("client-node %p: pass %d %d", data, io->status, io->buffer_id); + } return SPA_STATUS_HAVE_BUFFER; } diff --git a/src/pipewire/link.c b/src/pipewire/link.c index e72cf9aca..915aa98ee 100644 --- a/src/pipewire/link.c +++ b/src/pipewire/link.c @@ -1014,7 +1014,6 @@ int pw_link_activate(struct pw_link *this) this->input->node->idle_used_input_links++; this->output->node->idle_used_output_links++; } - pw_work_queue_add(impl->work, this, -EBUSY, (pw_work_func_t) check_states, this); diff --git a/src/pipewire/node.h b/src/pipewire/node.h index 95e300540..637ce6bc7 100644 --- a/src/pipewire/node.h +++ b/src/pipewire/node.h @@ -91,11 +91,12 @@ struct pw_node_events { void (*finish) (void *data); }; -/** Media type of the node, Audio, Video */ +/** Media type of the node, Audio, Video, Midi */ #define PW_NODE_PROP_MEDIA "pipewire.media" /** Category: Playback, Capture, Duplex */ #define PW_NODE_PROP_CATEGORY "pipewire.category" -/** Role: Movie, Music, Camera, Screen, Chat, Game, Speech, DSP */ +/** Role: Movie,Music, Camera, Screen, Communication, Game, Notification, DSP, + * Production, Accessibility, Test */ #define PW_NODE_PROP_ROLE "pipewire.role" /** exclusive access to device */ #define PW_NODE_PROP_EXCLUSIVE "pipewire.exclusive" diff --git a/src/pipewire/properties.c b/src/pipewire/properties.c index 269b77fb6..6b727bf69 100644 --- a/src/pipewire/properties.c +++ b/src/pipewire/properties.c @@ -186,7 +186,7 @@ struct pw_properties *pw_properties_copy(const struct pw_properties *properties) return NULL; pw_array_for_each(item, &impl->items) - add_func(copy, strdup(item->key), item->value ? strdup(item->value) : NULL); + add_func(copy, strdup(item->key), item->value ? strdup(item->value) : NULL); return copy; } @@ -241,7 +241,7 @@ void pw_properties_free(struct pw_properties *properties) struct spa_dict_item *item; pw_array_for_each(item, &impl->items) - clear_item(item); + clear_item(item); pw_array_clear(&impl->items); free(impl); diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index 4c6609e75..99cad8cde 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -456,7 +456,9 @@ int pw_remote_disconnect(struct pw_remote *remote) remote->core_proxy = NULL; pw_map_clear(&remote->objects); + pw_map_init(&remote->objects, 64, 32); pw_map_clear(&remote->types); + pw_map_init(&remote->types, 64, 32); remote->n_types = 0; if (remote->info) { @@ -609,7 +611,7 @@ static void clean_transport(struct node_data *data) { struct mem *m; - if (data->node_id == SPA_ID_INVALID) + if (data->rtsocket_source == NULL) return; unhandle_socket(data); @@ -619,8 +621,6 @@ static void clean_transport(struct node_data *data) pw_array_clear(&data->mems); close(data->rtwritefd); - - data->node_id = SPA_ID_INVALID; } static void mix_init(struct mix *mix, struct pw_port *port, uint32_t mix_id) @@ -994,7 +994,7 @@ client_node_port_use_buffers(void *object, bufs = alloca(n_buffers * sizeof(struct spa_buffer *)); for (i = 0; i < n_buffers; i++) { - struct buffer_mem bmem; + struct buffer_mem bmem = { 0, }; size_t size; off_t offset; struct mem *m; @@ -1089,12 +1089,14 @@ client_node_port_use_buffers(void *object, bid->mem[bid->n_mem++] = bm2; - pw_log_debug(" data %d %u -> fd %d", j, bm->id, bm->fd); + pw_log_debug(" data %d %u -> fd %d maxsize %d", + j, bm->id, bm->fd, d->maxsize); } else if (d->type == t->data.MemPtr) { int offs = SPA_PTR_TO_INT(d->data); d->data = SPA_MEMBER(bmem.map.ptr, offs, void); d->fd = -1; - pw_log_debug(" data %d %u -> mem %p", j, bid->id, d->data); + pw_log_debug(" data %d %u -> mem %p maxsize %d", + j, bid->id, d->data, d->maxsize); } else { pw_log_warn("unknown buffer data type %d", d->type); } @@ -1215,6 +1217,7 @@ static void do_node_init(struct pw_proxy *proxy) struct node_data *data = proxy->user_data; struct pw_port *port; + pw_log_debug("%p: init", data); pw_client_node_proxy_update(data->node_proxy, PW_CLIENT_NODE_UPDATE_MAX_INPUTS | PW_CLIENT_NODE_UPDATE_MAX_OUTPUTS | @@ -1300,14 +1303,21 @@ struct pw_proxy *pw_remote_export(struct pw_remote *remote, struct node_data *data; int i; + if (remote->core_proxy == NULL) { + pw_log_error("node core proxy"); + return NULL; + } + proxy = pw_core_proxy_create_object(remote->core_proxy, "client-node", impl->type_client_node, PW_VERSION_CLIENT_NODE, &node->properties->dict, sizeof(struct node_data)); - if (proxy == NULL) + if (proxy == NULL) { + pw_log_error("failed to create proxy"); return NULL; + } data = pw_proxy_get_user_data(proxy); data->remote = remote; diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 6a44b0aed..d20c59584 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -140,6 +140,9 @@ static inline int push_queue(struct stream *stream, struct queue *queue, struct spa_ringbuffer_get_write_index(&queue->ring, &index); queue->ids[index & MASK_BUFFERS] = buffer->id; spa_ringbuffer_write_update(&queue->ring, index + 1); + + pw_log_trace("stream %p: queued buffer %d", stream, buffer->id); + return 0; } @@ -632,7 +635,6 @@ static int impl_port_use_buffers(struct spa_node *node, enum spa_direction direc pw_log_info("got buffer %d %d datas, mapped size %d", i, buffers[i]->n_datas, size); } - impl->n_buffers = n_buffers; if (impl->use_converter) { struct spa_data datas[1]; @@ -644,13 +646,16 @@ static int impl_port_use_buffers(struct spa_node *node, enum spa_direction direc n_buffers)) < 0) return res; + n_buffers = 6; + datas[0].type = t->data.MemPtr; - datas[0].maxsize = size * 4; + datas[0].maxsize = size; data_aligns[0] = 16; buffers = spa_buffer_alloc_array(n_buffers, 0, 0, NULL, - 1, datas, data_aligns); + 1, datas, + data_aligns); if (buffers == NULL) return -ENOMEM; @@ -663,6 +668,8 @@ static int impl_port_use_buffers(struct spa_node *node, enum spa_direction direc for (i = 0; i < n_buffers; i++) { struct buffer *b = &impl->buffers[i]; + b->flags = 0; + b->id = i; b->this.buffer = buffers[i]; if (impl->direction == SPA_DIRECTION_OUTPUT) @@ -672,6 +679,8 @@ static int impl_port_use_buffers(struct spa_node *node, enum spa_direction direc add_buffer, &b->this); } + impl->n_buffers = n_buffers; + if (n_buffers > 0) stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL); else @@ -725,11 +734,13 @@ static int impl_node_process_output(struct spa_node *node) struct pw_stream *stream = &impl->this; struct spa_io_buffers *io = impl->io; struct buffer *b; - int res = 0; - bool do_call = true; + int res; + uint32_t index; + again: pw_log_trace("stream %p: process out %d %d", stream, io->status, io->buffer_id); + res = 0; if (io->status != SPA_STATUS_HAVE_BUFFER) { /* recycle old buffer */ if ((b = get_buffer(stream, io->buffer_id)) != NULL) { @@ -739,22 +750,30 @@ static int impl_node_process_output(struct spa_node *node) if ((b = pop_queue(impl, &impl->queued)) != NULL) { io->buffer_id = b->id; io->status = SPA_STATUS_HAVE_BUFFER; - pw_log_trace("stream %p: pop %d %s", stream, b->id, spa_strerror(res)); + pw_log_trace("stream %p: pop %d", stream, b->id); } else { io->buffer_id = SPA_ID_INVALID; io->status = SPA_STATUS_NEED_BUFFER; + pw_log_trace("stream %p: no more buffers", stream); } } if (io->status == SPA_STATUS_HAVE_BUFFER && impl->use_converter) { res = spa_node_process(impl->convert); - if (io->status == SPA_STATUS_HAVE_BUFFER) - do_call = false; + if (SPA_FLAG_CHECK(res, SPA_STATUS_NEED_BUFFER)) + call_process(impl); + + if (!SPA_FLAG_CHECK(res, SPA_STATUS_HAVE_BUFFER)) + goto again; + } else { + call_process(impl); + if (spa_ringbuffer_get_read_index(&impl->queued.ring, &index) > 0) + goto again; } - if (do_call) - call_process(impl); + pw_log_trace("stream %p: res %d", stream, res); - return SPA_STATUS_HAVE_BUFFER; + + return res; } static const struct spa_node impl_node = { @@ -823,7 +842,7 @@ static int handle_connect(struct pw_stream *stream) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - pw_log_debug("stream %p: creating proxy", stream); + pw_log_debug("stream %p: creating node", stream); impl->node = pw_node_new(impl->core, "export-source", pw_properties_copy(stream->properties), 0); @@ -839,6 +858,7 @@ static int handle_connect(struct pw_stream *stream) pw_node_register(impl->node, NULL, NULL, NULL); pw_node_set_active(impl->node, true); + pw_log_debug("stream %p: export node %p", stream, impl->node); pw_remote_export(stream->remote, impl->node); return 0; @@ -1093,6 +1113,7 @@ pw_stream_connect(struct pw_stream *stream, uint32_t n_params) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + enum pw_remote_state state; int res; pw_log_debug("stream %p: connect", stream); @@ -1112,7 +1133,9 @@ pw_stream_connect(struct pw_stream *stream, pw_properties_set(stream->properties, PW_NODE_PROP_AUTOCONNECT, "1"); pw_properties_set(stream->properties, "node.stream", "1"); - if (pw_remote_get_state(stream->remote, NULL) == PW_REMOTE_STATE_UNCONNECTED) + state = pw_remote_get_state(stream->remote, NULL); + if (state == PW_REMOTE_STATE_UNCONNECTED || + state == PW_REMOTE_STATE_ERROR) res = pw_remote_connect(stream->remote); else res = handle_connect(stream); @@ -1156,7 +1179,8 @@ void pw_stream_finish_format(struct pw_stream *stream, int pw_stream_set_active(struct pw_stream *stream, bool active) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - pw_node_set_active(impl->node, active); + if (impl->node) + pw_node_set_active(impl->node, active); return 0; }