From d6654e84a74faa7936e23f5e8a2f066c0919319d Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 4 Feb 2026 13:32:12 +0100 Subject: [PATCH] sendspin: add sendspin sender and receiver The sender makes an input stream for each connected client. This makes it easier to do the per client conversion using the adapter and send different channels to clients. The receiver uses linear regression to map ringbuffer indexes to server timestamps and server timestamps to client timestamps. It can then schedule playback against its own clock. --- spa/include/spa/utils/json-builder.h | 2 - src/modules/meson.build | 32 + src/modules/module-sendspin-recv.c | 1189 ++++++++++++++++++ src/modules/module-sendspin-send.c | 1389 ++++++++++++++++++++++ src/modules/module-sendspin/regress.h | 58 + src/modules/module-sendspin/sendspin.h | 27 + src/modules/module-sendspin/teeny-sha1.c | 201 ++++ src/modules/module-sendspin/websocket.c | 1060 +++++++++++++++++ src/modules/module-sendspin/websocket.h | 85 ++ src/modules/module-sendspin/zeroconf.c | 558 +++++++++ src/modules/module-sendspin/zeroconf.h | 45 + 11 files changed, 4644 insertions(+), 2 deletions(-) create mode 100644 src/modules/module-sendspin-recv.c create mode 100644 src/modules/module-sendspin-send.c create mode 100644 src/modules/module-sendspin/regress.h create mode 100644 src/modules/module-sendspin/sendspin.h create mode 100644 src/modules/module-sendspin/teeny-sha1.c create mode 100644 src/modules/module-sendspin/websocket.c create mode 100644 src/modules/module-sendspin/websocket.h create mode 100644 src/modules/module-sendspin/zeroconf.c create mode 100644 src/modules/module-sendspin/zeroconf.h diff --git a/spa/include/spa/utils/json-builder.h b/spa/include/spa/utils/json-builder.h index 883931bcd..6a4775c60 100644 --- a/spa/include/spa/utils/json-builder.h +++ b/spa/include/spa/utils/json-builder.h @@ -262,7 +262,6 @@ SPA_API_JSON_BUILDER void spa_json_builder_object_uint(struct spa_json_builder * snprintf(str, sizeof(str), "%" PRIu64, val); spa_json_builder_add_simple(b, key, INT_MAX, 'd', str, INT_MAX); } - SPA_API_JSON_BUILDER void spa_json_builder_object_double(struct spa_json_builder *b, const char *key, double val) { @@ -270,7 +269,6 @@ SPA_API_JSON_BUILDER void spa_json_builder_object_double(struct spa_json_builder spa_json_format_float(str, sizeof(str), (float)val); spa_json_builder_add_simple(b, key, INT_MAX, 'd', str, INT_MAX); } - SPA_API_JSON_BUILDER void spa_json_builder_object_string(struct spa_json_builder *b, const char *key, const char *val) { diff --git a/src/modules/meson.build b/src/modules/meson.build index 8636286e6..5a024be1a 100644 --- a/src/modules/meson.build +++ b/src/modules/meson.build @@ -700,6 +700,38 @@ pipewire_module_vban_recv = shared_library('pipewire-module-vban-recv', dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep], ) + pipewire_module_sendspin_sources = [] + pipewire_module_sendspin_deps = [ mathlib, dl_lib, rt_lib, pipewire_dep ] + +if avahi_dep.found() + pipewire_module_sendspin_sources += [ + 'module-sendspin/zeroconf.c', + 'module-zeroconf-discover/avahi-poll.c', + ] + pipewire_module_sendspin_deps += avahi_dep +endif + +pipewire_module_sendspin_recv = shared_library('pipewire-module-sendspin-recv', + [ 'module-sendspin-recv.c', + 'module-sendspin/websocket.c', + pipewire_module_sendspin_sources ], + include_directories : [configinc], + install : true, + install_dir : modules_install_dir, + install_rpath: modules_install_dir, + dependencies : pipewire_module_sendspin_deps, +) +pipewire_module_sendspin_send = shared_library('pipewire-module-sendspin-send', + [ 'module-sendspin-send.c', + 'module-sendspin/websocket.c', + pipewire_module_sendspin_sources ], + include_directories : [configinc], + install : true, + install_dir : modules_install_dir, + install_rpath: modules_install_dir, + dependencies : pipewire_module_sendspin_deps, +) + build_module_roc = roc_dep.found() if build_module_roc pipewire_module_roc_sink = shared_library('pipewire-module-roc-sink', diff --git a/src/modules/module-sendspin-recv.c b/src/modules/module-sendspin-recv.c new file mode 100644 index 000000000..7e71a1397 --- /dev/null +++ b/src/modules/module-sendspin-recv.c @@ -0,0 +1,1189 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2026 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#include "config.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "module-sendspin/sendspin.h" +#include "module-sendspin/zeroconf.h" +#include "module-sendspin/websocket.h" +#include "module-sendspin/regress.h" +#include "network-utils.h" + +/** \page page_module_sendspin_recv sendspin receiver + * + * The `sendspin-recv` module creates a PipeWire source that receives audio + * packets using the sendspin protocol. + * + * The receive will listen on a specific port (8928) and create a stream for the + * data on the port. + * + * ## Module Name + * + * `libpipewire-module-sendspin-recv` + * + * ## Module Options + * + * Options specific to the behavior of this module + * + * - `local.ifname = `: interface name to use + * - `source.ip = `: the source ip address to listen on, default 127.0.0.1 + * - `source.port = `: the source port to listen on, default 8928 + * - `source.path = `: the path to listen on, default "/sendspin" + * - `sendspin.ip`: the IP address of the sendspin server + * - `sendspin.port`: the port of the sendspin server, default 8927 + * - `sendspin.path`: the path on the sendspin server, default "/sendspin" + * - `sendspin.client-id`: the client id, default "pipewire-$(hostname)" + * - `sendspin.client-name`: the client name, default "$(hostname)" + * - `node.always-process = `: true to receive even when not running + * - `stream.props = {}`: properties to be passed to all the stream + * + * ## General options + * + * Options with well-known behavior: + * + * - \ref PW_KEY_REMOTE_NAME + * - \ref SPA_KEY_AUDIO_LAYOUT + * - \ref SPA_KEY_AUDIO_POSITION + * - \ref PW_KEY_MEDIA_NAME + * - \ref PW_KEY_MEDIA_CLASS + * - \ref PW_KEY_NODE_NAME + * - \ref PW_KEY_NODE_DESCRIPTION + * - \ref PW_KEY_NODE_GROUP + * - \ref PW_KEY_NODE_LATENCY + * - \ref PW_KEY_NODE_VIRTUAL + * + * ## Example configuration + *\code{.unparsed} + * # ~/.config/pipewire/pipewire.conf.d/my-sendspin-recv.conf + * + * context.modules = [ + * { name = libpipewire-module-sendspin-recv + * args = { + * #local.ifname = eth0 + * #source.ip = 127.0.0.1 + * #source.port = 8928 + * #source.path = "/sendspin" + * #sendspin.ip = 127.0.0.1 + * #sendspin.port = 8927 + * #sendspin.path = "/sendspin" + * #sendspin.client-id = "pipewire-test" + * #node.always-process = false + * #audio.position = [ FL FR ] + * stream.props = { + * #media.class = "Audio/Source" + * #node.name = "sendspin-receiver" + * } + * } + * } + * ] + *\endcode + * + * \since 1.6.0 + */ + +#define NAME "sendspin-recv" + +PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); +#define PW_LOG_TOPIC_DEFAULT mod_topic + +#define DEFAULT_SOURCE_IP "127.0.0.1" +#define DEFAULT_SOURCE_PORT PW_SENDSPIN_DEFAULT_CLIENT_PORT +#define DEFAULT_SOURCE_PATH PW_SENDSPIN_DEFAULT_PATH + +#define DEFAULT_SERVER_PORT PW_SENDSPIN_DEFAULT_SERVER_PORT +#define DEFAULT_SENDSPIN_PATH PW_SENDSPIN_DEFAULT_PATH + + +#define DEFAULT_POSITION "[ FL FR ]" + +#define USAGE "( local.ifname= ) " \ + "( source.ip= ) " \ + "( source.port= " \ + "( audio.position= ) " \ + "( stream.props= { key=value ... } ) " + +static const struct spa_dict_item module_info[] = { + { PW_KEY_MODULE_AUTHOR, "Wim Taymans " }, + { PW_KEY_MODULE_DESCRIPTION, "sendspin Receiver" }, + { PW_KEY_MODULE_USAGE, USAGE }, + { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, +}; + +struct stream { + struct impl *impl; + struct spa_list link; + + struct pw_websocket_connection *conn; + struct spa_hook conn_listener; + + struct spa_audio_info info; + struct pw_stream *stream; + struct spa_hook stream_listener; + + struct pw_timer timer; + int timeout_count; + + uint32_t stride; + struct spa_ringbuffer ring; + void *buffer; + uint32_t buffer_size; + +#define ROLE_PLAYER (1<<0) +#define ROLE_METADATA (1<<1) + uint32_t active_roles; +#define REASON_DISCOVERY (0) +#define REASON_PLAYBACK (1) + uint32_t connection_reason; + + struct spa_regress regress_index; + struct spa_regress regress_time; + + bool resync; + struct spa_dll dll; +}; + +struct impl { + struct pw_impl_module *module; + struct spa_hook module_listener; + struct pw_properties *props; + struct pw_context *context; + + struct pw_loop *main_loop; + struct pw_loop *data_loop; + struct pw_timer_queue *timer_queue; + + struct pw_core *core; + struct spa_hook core_listener; + struct spa_hook core_proxy_listener; + unsigned int do_disconnect:1; + + struct pw_zeroconf *zeroconf; + struct spa_hook zeroconf_listener; + + bool always_process; + + struct pw_properties *stream_props; + + struct pw_websocket *websocket; + struct spa_hook websocket_listener; + + struct spa_list streams; +}; + +static void on_stream_destroy(void *d) +{ + struct stream *stream = d; + spa_hook_remove(&stream->stream_listener); + stream->stream = NULL; +} + +static void on_stream_state_changed(void *d, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct stream *stream = d; + switch (state) { + case PW_STREAM_STATE_ERROR: + case PW_STREAM_STATE_UNCONNECTED: + pw_impl_module_schedule_destroy(stream->impl->module); + break; + case PW_STREAM_STATE_PAUSED: + case PW_STREAM_STATE_STREAMING: + break; + default: + break; + } +} + +static void on_capture_stream_process(void *d) +{ + struct stream *stream = d; + struct pw_buffer *b; + struct spa_buffer *buf; + uint8_t *p; + uint32_t index = 0, n_frames, n_bytes; + int32_t avail, stride; + struct pw_time ts; + double err, corr, target, current_time; + + if ((b = pw_stream_dequeue_buffer(stream->stream)) == NULL) { + pw_log_debug("out of buffers: %m"); + return; + } + + buf = b->buffer; + if ((p = buf->datas[0].data) == NULL) + return; + + stride = stream->stride; + n_frames = buf->datas[0].maxsize / stride; + if (b->requested) + n_frames = SPA_MIN(b->requested, n_frames); + n_bytes = n_frames * stride; + + avail = spa_ringbuffer_get_read_index(&stream->ring, &index); + + if (stream->timeout_count > 4 && stream->timeout_count > 4) { + pw_stream_get_time_n(stream->stream, &ts, sizeof(ts)); + + /* index to server time */ + target = spa_regress_calc_y(&stream->regress_index, index); + /* server time to client time */ + target = spa_regress_calc_y(&stream->regress_time, target); + + current_time = ts.now / 1000.0; + current_time -= (ts.buffered * 1000000.0 / stream->info.info.raw.rate) + + ((ts.delay) * 1000000.0 * ts.rate.num / ts.rate.denom); + err = target - (double)current_time; + + if (stream->resync) { + if (target < current_time) { + target = spa_regress_calc_x(&stream->regress_time, current_time); + index = (uint32_t)spa_regress_calc_x(&stream->regress_index, target); + index = SPA_ROUND_DOWN(index, stride); + + pw_log_info("resync %u %f %f %f", index, target, + current_time, target - current_time); + + spa_ringbuffer_read_update(&stream->ring, index); + avail = spa_ringbuffer_get_read_index(&stream->ring, &index); + + err = 0.0; + stream->resync = false; + } else { + avail = 0; + } + } + } else { + avail = 0; + } + if (avail < (int32_t)n_bytes) { + avail = 0; + stream->resync = true; + } + else if (avail > (int32_t)stream->buffer_size) { + index += avail - stream->buffer_size; + avail = stream->buffer_size; + stream->resync = true; + } + if (avail > 0) { + n_bytes = SPA_MIN(n_bytes, (uint32_t)avail); + + corr = spa_dll_update(&stream->dll, SPA_CLAMPD(err, -1000, 1000)); + + pw_log_trace("%u %f %f %f %f", index, current_time, target, err, corr); + + pw_stream_set_rate(stream->stream, 1.0 / corr); + + spa_ringbuffer_read_data(&stream->ring, + stream->buffer, stream->buffer_size, + index % stream->buffer_size, + p, n_bytes); + spa_ringbuffer_read_update(&stream->ring, index + n_bytes); + } else { + memset(p, 0, n_bytes); + } + + buf->datas[0].chunk->offset = 0; + buf->datas[0].chunk->stride = stride; + buf->datas[0].chunk->size = n_bytes; + + pw_stream_queue_buffer(stream->stream, b); +} + +static const struct pw_stream_events capture_stream_events = { + PW_VERSION_STREAM_EVENTS, + .destroy = on_stream_destroy, + .state_changed = on_stream_state_changed, + .process = on_capture_stream_process +}; + +static int create_stream(struct stream *stream) +{ + struct impl *impl = stream->impl; + int res; + uint32_t n_params; + const struct spa_pod *params[1]; + uint8_t buffer[1024]; + struct spa_pod_builder b; + const char *server_id, *ip, *port, *server_name; + struct pw_properties *props = pw_properties_copy(impl->stream_props); + + ip = pw_properties_get(impl->props, "sendspin.ip"); + port = pw_properties_get(impl->props, "sendspin.port"); + server_id = pw_properties_get(props, "sendspin.server-id"); + server_name = pw_properties_get(props, "sendspin.server-name"); + + if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL) + pw_properties_setf(props, PW_KEY_NODE_NAME, "sendspin.%s.%s.%s", server_id, ip, port); + if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL) + pw_properties_setf(props, PW_KEY_NODE_DESCRIPTION, "Sendspin from %s", server_name); + if (pw_properties_get(props, PW_KEY_MEDIA_NAME) == NULL) + pw_properties_setf(props, PW_KEY_MEDIA_NAME, "Sendspin from %s", server_name); + + stream->stream = pw_stream_new(impl->core, "sendspin receiver", props); + if (stream->stream == NULL) + return -errno; + + spa_ringbuffer_init(&stream->ring); + stream->buffer_size = 1024 * 1024; + stream->buffer = calloc(1, stream->buffer_size * stream->stride); + + pw_stream_add_listener(stream->stream, + &stream->stream_listener, + &capture_stream_events, stream); + + n_params = 0; + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + params[n_params++] = spa_format_audio_build(&b, + SPA_PARAM_EnumFormat, &stream->info); + + if ((res = pw_stream_connect(stream->stream, + PW_DIRECTION_OUTPUT, + PW_ID_ANY, + PW_STREAM_FLAG_AUTOCONNECT | + PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS, + params, n_params)) < 0) + return res; + + return 0; +} + +static void add_format(struct spa_json_builder *b, const char *codec, int channels, int rate, int depth) +{ + spa_json_builder_array_push(b, "{"); + spa_json_builder_object_string(b, "codec", codec); + spa_json_builder_object_int(b, "channels", channels); + spa_json_builder_object_int(b, "sample_rate", rate); + spa_json_builder_object_int(b, "bit_depth", depth); + spa_json_builder_pop(b, "}"); +} +static void add_playerv1_support(struct stream *stream, struct spa_json_builder *b) +{ + spa_json_builder_object_push(b, "player@v1_support", "{"); + spa_json_builder_object_push(b, "supported_formats", "["); + add_format(b, "pcm", 2, 48000, 16); + add_format(b, "pcm", 1, 48000, 16); + spa_json_builder_pop(b, "]"); + spa_json_builder_object_int(b, "buffer_capacity", 32000000); + spa_json_builder_object_push(b, "supported_commands", "["); + spa_json_builder_array_string(b, "volume"); + spa_json_builder_array_string(b, "mute"); + spa_json_builder_pop(b, "]"); + spa_json_builder_pop(b, "}"); +} +static int send_client_hello(struct stream *stream) +{ + struct impl *impl = stream->impl; + struct spa_json_builder b; + int res; + char *mem; + size_t size; + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "client/hello"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_string(&b, "client_id", pw_properties_get(impl->props, "sendspin.client-id")); + spa_json_builder_object_string(&b, "name", pw_properties_get(impl->props, "sendspin.client-name")); + spa_json_builder_object_int(&b, "version", 1); + spa_json_builder_object_push(&b, "supported_roles", "["); + spa_json_builder_array_string(&b, "player@v1"); + spa_json_builder_array_string(&b, "metadata@v1"); + spa_json_builder_pop(&b, "]"); + spa_json_builder_object_push(&b, "device_info", "{"); + spa_json_builder_object_string(&b, "product_name", "Linux"); /* Use os-release */ + spa_json_builder_object_stringf(&b, "software_version", "PipeWire %s", pw_get_library_version()); + spa_json_builder_pop(&b, "}"); + add_playerv1_support(stream, &b); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + res = pw_websocket_connection_send_text(stream->conn, mem, size); + free(mem); + + return res; +} + +static int send_client_state(struct stream *stream) +{ + struct spa_json_builder b; + int res; + char *mem; + size_t size; + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "client/state"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_push(&b, "player", "{"); + spa_json_builder_object_string(&b, "state", "synchronized"); + spa_json_builder_object_int(&b, "volume", 100); + spa_json_builder_object_bool(&b, "muted", false); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + res = pw_websocket_connection_send_text(stream->conn, mem, size); + free(mem); + return res; +} + +static uint64_t get_time_us(struct stream *stream) +{ + struct timespec now; + if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) + return 0; + return SPA_TIMESPEC_TO_USEC(&now); +} + +static int send_client_time(struct stream *stream) +{ + struct spa_json_builder b; + int res; + uint64_t now; + char *mem; + size_t size; + + now = get_time_us(stream); + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "client/time"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_uint(&b, "client_transmitted", now); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + res = pw_websocket_connection_send_text(stream->conn, mem, size); + free(mem); + return res; +} + +static void do_stream_timer(void *data) +{ + struct stream *stream = data; + send_client_time(stream); +} + +#if 0 +static int send_client_command(struct stream *stream) +{ + return 0; +} +#endif +static int send_client_goodbye(struct stream *stream, const char *reason) +{ + struct spa_json_builder b; + int res; + char *mem; + size_t size; + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "client/goodbye"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_string(&b, "reason", reason); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + res = pw_websocket_connection_send_text(stream->conn, mem, size); + pw_websocket_connection_disconnect(stream->conn, true); + free(mem); + return res; +} + +#if 0 +static int send_stream_request_format(struct stream *stream) +{ + return 0; +} +#endif + +static int handle_server_hello(struct stream *stream, struct spa_json *payload) +{ + struct impl *impl = stream->impl; + struct spa_json it[1]; + char key[256], *t; + const char *v; + int l, version = 0; + struct stream *s, *st; + + while ((l = spa_json_object_next(payload, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "server_id")) { + t = alloca(l+1); + spa_json_parse_stringn(v, l, t, l+1); + pw_properties_set(impl->stream_props, "sendspin.server-id", t); + } + else if (spa_streq(key, "name")) { + t = alloca(l+1); + spa_json_parse_stringn(v, l, t, l+1); + pw_properties_set(impl->stream_props, "sendspin.server-name", t); + } + else if (spa_streq(key, "version")) { + if (spa_json_parse_int(v, l, &version) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "active_roles")) { + if (!spa_json_is_array(v, l)) + return -EPROTO; + + spa_json_enter(payload, &it[0]); + while ((l = spa_json_next(&it[0], &v)) > 0) { + t = alloca(l+1); + spa_json_parse_stringn(v, l, t, l+1); + + if (spa_streq(t, "player@v1")) + stream->active_roles |= ROLE_PLAYER; + else if (spa_streq(t, "metadata@v1")) + stream->active_roles |= ROLE_METADATA; + } + } + else if (spa_streq(key, "connection_reason")) { + t = alloca(l+1); + spa_json_parse_stringn(v, l, t, l+1); + + if (spa_streq(t, "discovery")) + stream->connection_reason = REASON_DISCOVERY; + else if (spa_streq(t, "playback")) + stream->connection_reason = REASON_PLAYBACK; + + pw_properties_set(impl->stream_props, "sendspin.connection-reason", t); + } + } + if (version != 1) + return -ENOTSUP; + + if (stream->connection_reason == REASON_PLAYBACK) { + /* keep this server, destroy others */ + spa_list_for_each_safe(s, st, &impl->streams, link) { + if (s == stream) + continue; + send_client_goodbye(s, "another_server"); + } + } else { + /* keep other servers, destroy this one */ + spa_list_for_each_safe(s, st, &impl->streams, link) { + if (s == stream) + continue; + return send_client_goodbye(stream, "another_server"); + } + } + return send_client_state(stream); +} + +static int handle_server_state(struct stream *stream, struct spa_json *payload) +{ + return 0; +} + +static int parse_uint64(const char *val, int len, uint64_t *result) +{ + char buf[64]; + char *end; + + if (len <= 0 || len >= (int)sizeof(buf)) + return 0; + + memcpy(buf, val, len); + buf[len] = '\0'; + + *result = strtoull(buf, &end, 0); + return len > 0 && end == buf + len; +} + +static int handle_server_time(struct stream *stream, struct spa_json *payload) +{ + struct impl *impl = stream->impl; + char key[256]; + const char *v; + int l; + uint64_t t1 = 0, t2 = 0, t3 = 0, t4 = 0, timeout; + + t4 = get_time_us(stream); + + while ((l = spa_json_object_next(payload, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "client_transmitted")) { + if (parse_uint64(v, l, &t1) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "server_received")) { + if (parse_uint64(v, l, &t2) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "server_transmitted")) { + if (parse_uint64(v, l, &t3) <= 0) + return -EINVAL; + } + } + + spa_regress_update(&stream->regress_time, (t2+t3)/2, (t1+t4)/2); + + if (stream->timeout_count < 4) + timeout = 200 * SPA_MSEC_PER_SEC; + else if (stream->timeout_count < 10) + timeout = SPA_NSEC_PER_SEC; + else if (stream->timeout_count < 20) + timeout = 2 * SPA_NSEC_PER_SEC; + else + timeout = 5 * SPA_NSEC_PER_SEC; + + stream->timeout_count++; + pw_timer_queue_add(impl->timer_queue, &stream->timer, + &stream->timer.timeout, timeout, + do_stream_timer, stream); + return 0; +} + +static int handle_server_command(struct stream *stream, struct spa_json *payload) +{ + return 0; +} + +/* {"codec":"pcm","sample_rate":44100,"channels":2,"bit_depth":16} */ +static int parse_player(struct stream *stream, struct spa_json *player) +{ + char key[256], codec[64] = ""; + const char *v; + int l, sample_rate = 0, channels = 0, bit_depth = 0; + + spa_zero(stream->info); + stream->info.media_type = SPA_MEDIA_TYPE_audio; + while ((l = spa_json_object_next(player, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "codec")) { + if (spa_json_parse_stringn(v, l, codec, sizeof(codec)) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "sample_rate")) { + if (spa_json_parse_int(v, l, &sample_rate) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "channels")) { + if (spa_json_parse_int(v, l, &channels) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "bit_depth")) { + if (spa_json_parse_int(v, l, &bit_depth) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "codec_header")) { + } + } + if (sample_rate == 0 || channels == 0) + return -EINVAL; + + if (spa_streq(codec, "pcm")) { + stream->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; + stream->info.info.raw.rate = sample_rate; + stream->info.info.raw.channels = channels; + switch (bit_depth) { + case 16: + stream->info.info.raw.format = SPA_AUDIO_FORMAT_S16_LE; + stream->stride = 2 * channels; + break; + case 24: + stream->info.info.raw.format = SPA_AUDIO_FORMAT_S24_LE; + stream->stride = 3 * channels; + break; + default: + return -EINVAL; + } + } + else if (spa_streq(codec, "opus")) { + stream->info.media_subtype = SPA_MEDIA_SUBTYPE_opus; + stream->info.info.opus.rate = sample_rate; + stream->info.info.opus.channels = channels; + } + else if (spa_streq(codec, "flac")) { + stream->info.media_subtype = SPA_MEDIA_SUBTYPE_flac; + stream->info.info.flac.rate = sample_rate; + stream->info.info.flac.channels = channels; + } + else + return -EINVAL; + + spa_dll_set_bw(&stream->dll, SPA_DLL_BW_MIN, 1000, sample_rate); + + return 0; +} + +/* {"player":{}} */ +static int handle_stream_start(struct stream *stream, struct spa_json *payload) +{ + struct impl *impl = stream->impl; + struct spa_json it[1]; + char key[256]; + const char *v; + int l; + + while ((l = spa_json_object_next(payload, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "player")) { + if (!spa_json_is_object(v, l)) + return -EPROTO; + spa_json_enter(payload, &it[0]); + parse_player(stream, &it[0]); + } + } + + if (stream->stream == NULL) { + create_stream(stream); + + pw_timer_queue_cancel(&stream->timer); + pw_timer_queue_add(impl->timer_queue, &stream->timer, + NULL, 0, do_stream_timer, stream); + } else { + } + + return 0; +} + +static void stream_clear(struct stream *stream) +{ + spa_ringbuffer_init(&stream->ring); + memset(stream->buffer, 0, stream->buffer_size); +} + +static int handle_stream_clear(struct stream *stream, struct spa_json *payload) +{ + stream_clear(stream); + return 0; +} +static int handle_stream_end(struct stream *stream, struct spa_json *payload) +{ + if (stream->stream != NULL) { + pw_stream_destroy(stream->stream); + stream->stream = NULL; + stream_clear(stream); + } + return 0; +} + +static int handle_group_update(struct stream *stream, struct spa_json *payload) +{ + return 0; +} + +/* { "type":... "payload":{...} } */ +static int do_parse_text(struct stream *stream, const char *content, int size) +{ + struct spa_json it[2], *payload = NULL; + char key[256], type[256] = ""; + const char *v; + int res, l; + + pw_log_info("received text %.*s", size, content); + + if (spa_json_begin_object(&it[0], content, size) <= 0) + return -EINVAL; + + while ((l = spa_json_object_next(&it[0], key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "payload")) { + if (!spa_json_is_object(v, l)) + return -EPROTO; + + spa_json_enter(&it[0], &it[1]); + payload = &it[1]; + } + else if (spa_streq(key, "type")) { + if (spa_json_parse_stringn(v, l, type, sizeof(type)) <= 0) + continue; + } + } + if (spa_streq(type, "server/hello")) + res = handle_server_hello(stream, payload); + else if (spa_streq(type, "server/state")) + res = handle_server_state(stream, payload); + else if (spa_streq(type, "server/time")) + res = handle_server_time(stream, payload); + else if (spa_streq(type, "server/command")) + res = handle_server_command(stream, payload); + else if (spa_streq(type, "stream/start")) + res = handle_stream_start(stream, payload); + else if (spa_streq(type, "stream/end")) + res = handle_stream_end(stream, payload); + else if (spa_streq(type, "stream/clear")) + res = handle_stream_clear(stream, payload); + else if (spa_streq(type, "group/update")) + res = handle_group_update(stream, payload); + else + res = 0; + + return res; +} + +static int do_handle_binary(struct stream *stream, const uint8_t *payload, int size) +{ + struct impl *impl = stream->impl; + int32_t filled; + uint32_t index, length = size - 9; + uint64_t timestamp; + + if (payload[0] != 4 || stream->stream == NULL) + return 0; + + timestamp = ((uint64_t)payload[1]) << 56; + timestamp |= ((uint64_t)payload[2]) << 48; + timestamp |= ((uint64_t)payload[3]) << 40; + timestamp |= ((uint64_t)payload[4]) << 32; + timestamp |= ((uint64_t)payload[5]) << 24; + timestamp |= ((uint64_t)payload[6]) << 16; + timestamp |= ((uint64_t)payload[7]) << 8; + timestamp |= ((uint64_t)payload[8]); + + filled = spa_ringbuffer_get_write_index(&stream->ring, &index); + if (filled < 0) { + pw_log_warn("%p: underrun write:%u filled:%d", + stream, index, filled); + } else if (filled + length > stream->buffer_size) { + pw_log_debug("%p: overrun write:%u filled:%d", + stream, index, filled); + } + + spa_ringbuffer_write_data(&stream->ring, + stream->buffer, stream->buffer_size, + index % stream->buffer_size, + &payload[9], length); + + spa_ringbuffer_write_update(&stream->ring, index + length); + + pw_loop_lock(impl->data_loop); + spa_regress_update(&stream->regress_index, index, timestamp); + pw_loop_unlock(impl->data_loop); + + return 0; +} + +static void on_connection_message(void *data, int opcode, void *payload, size_t size) +{ + struct stream *stream = data; + if (opcode == PW_WEBSOCKET_OPCODE_TEXT) { + do_parse_text(stream, payload, size); + } else if (opcode == PW_WEBSOCKET_OPCODE_BINARY) { + do_handle_binary(stream, payload, size); + } else { + pw_log_warn("%02x unknown %08x", opcode, (int)size); + } +} + +static void stream_destroy(struct stream *stream) +{ + handle_stream_end(stream, NULL); + if (stream->conn) { + spa_hook_remove(&stream->conn_listener); + pw_websocket_connection_destroy(stream->conn); + } + pw_timer_queue_cancel(&stream->timer); + spa_list_remove(&stream->link); + free(stream->buffer); + free(stream); +} + +static void on_connection_destroy(void *data) +{ + struct stream *stream = data; + stream->conn = NULL; + pw_log_info("connection %p destroy", stream); +} +static void on_connection_error(void *data, int res, const char *reason) +{ + struct stream *stream = data; + pw_log_error("connection %p error %d %s", stream, res, reason); +} + +static void on_connection_disconnected(void *data) +{ + struct stream *stream = data; + stream_destroy(stream); +} + +static const struct pw_websocket_connection_events websocket_connection_events = { + PW_VERSION_WEBSOCKET_CONNECTION_EVENTS, + .destroy = on_connection_destroy, + .error = on_connection_error, + .disconnected = on_connection_disconnected, + .message = on_connection_message, +}; + +static struct stream *stream_new(struct impl *impl, struct pw_websocket_connection *conn) +{ + struct stream *stream; + + stream = calloc(1, sizeof(*stream)); + if (stream == NULL) + return NULL; + + stream->impl = impl; + spa_list_append(&impl->streams, &stream->link); + + stream->conn = conn; + pw_websocket_connection_add_listener(stream->conn, &stream->conn_listener, + &websocket_connection_events, stream); + + spa_regress_init(&stream->regress_index, 5); + spa_regress_init(&stream->regress_time, 5); + + spa_dll_init(&stream->dll); + stream->resync = true; + + return stream; +} + +static void on_websocket_connected(void *data, void *user, + struct pw_websocket_connection *conn, const char *path) +{ + struct impl *impl = data; + struct stream *stream; + pw_log_info("connected to %s", path); + stream = stream_new(impl, conn); + send_client_hello(stream); +} + +static const struct pw_websocket_events websocket_events = { + PW_VERSION_WEBSOCKET_EVENTS, + .connected = on_websocket_connected, +}; + +static void on_zeroconf_added(void *data, void *user, const struct spa_dict *info) +{ +} + +static void on_zeroconf_removed(void *data, void *user, const struct spa_dict *info) +{ +} + +static const struct pw_zeroconf_events zeroconf_events = { + PW_VERSION_ZEROCONF_EVENTS, + .added = on_zeroconf_added, + .removed = on_zeroconf_removed, +}; + +static void core_destroy(void *d) +{ + struct impl *impl = d; + spa_hook_remove(&impl->core_listener); + impl->core = NULL; + pw_impl_module_schedule_destroy(impl->module); +} + +static const struct pw_proxy_events core_proxy_events = { + .destroy = core_destroy, +}; + +static void impl_destroy(struct impl *impl) +{ + struct stream *s; + + spa_list_consume(s, &impl->streams, link) + stream_destroy(s); + + if (impl->core && impl->do_disconnect) + pw_core_disconnect(impl->core); + + if (impl->data_loop) + pw_context_release_loop(impl->context, impl->data_loop); + + pw_properties_free(impl->stream_props); + pw_properties_free(impl->props); + + free(impl); +} + +static void module_destroy(void *d) +{ + struct impl *impl = d; + spa_hook_remove(&impl->module_listener); + impl_destroy(impl); +} + +static const struct pw_impl_module_events module_events = { + PW_VERSION_IMPL_MODULE_EVENTS, + .destroy = module_destroy, +}; + +static void on_core_error(void *d, uint32_t id, int seq, int res, const char *message) +{ + struct impl *impl = d; + + pw_log_error("error id:%u seq:%d res:%d (%s): %s", + id, seq, res, spa_strerror(res), message); + + if (id == PW_ID_CORE && res == -EPIPE) + pw_impl_module_schedule_destroy(impl->module); +} + +static const struct pw_core_events core_events = { + PW_VERSION_CORE_EVENTS, + .error = on_core_error, +}; + +static void copy_props(struct impl *impl, struct pw_properties *props, const char *key) +{ + const char *str; + if ((str = pw_properties_get(props, key)) != NULL) { + if (pw_properties_get(impl->stream_props, key) == NULL) + pw_properties_set(impl->stream_props, key, str); + } +} + +SPA_EXPORT +int pipewire__module_init(struct pw_impl_module *module, const char *args) +{ + struct pw_context *context = pw_impl_module_get_context(module); + struct impl *impl; + const char *str, *hostname, *port, *path; + struct pw_properties *props, *stream_props; + int res = 0; + + PW_LOG_TOPIC_INIT(mod_topic); + + impl = calloc(1, sizeof(struct impl)); + if (impl == NULL) + return -errno; + + if (args == NULL) + args = ""; + + props = impl->props = pw_properties_new_string(args); + stream_props = impl->stream_props = pw_properties_new(NULL, NULL); + if (props == NULL || stream_props == NULL) { + res = -errno; + pw_log_error( "can't create properties: %m"); + goto out; + } + + impl->module = module; + impl->context = context; + impl->main_loop = pw_context_get_main_loop(context); + impl->data_loop = pw_context_acquire_loop(context, &props->dict); + impl->timer_queue = pw_context_get_timer_queue(context); + spa_list_init(&impl->streams); + + pw_properties_set(props, PW_KEY_NODE_LOOP_NAME, impl->data_loop->name); + + if ((str = pw_properties_get(props, "stream.props")) != NULL) + pw_properties_update_string(stream_props, str, strlen(str)); + + copy_props(impl, props, PW_KEY_NODE_LOOP_NAME); + copy_props(impl, props, SPA_KEY_AUDIO_LAYOUT); + copy_props(impl, props, SPA_KEY_AUDIO_POSITION); + copy_props(impl, props, PW_KEY_NODE_NAME); + copy_props(impl, props, PW_KEY_NODE_DESCRIPTION); + copy_props(impl, props, PW_KEY_NODE_GROUP); + copy_props(impl, props, PW_KEY_NODE_LATENCY); + copy_props(impl, props, PW_KEY_NODE_VIRTUAL); + copy_props(impl, props, PW_KEY_NODE_CHANNELNAMES); + copy_props(impl, props, PW_KEY_MEDIA_NAME); + copy_props(impl, props, PW_KEY_MEDIA_CLASS); + + impl->always_process = pw_properties_get_bool(stream_props, + PW_KEY_NODE_ALWAYS_PROCESS, true); + + if ((str = pw_properties_get(props, "sendspin.client-name")) == NULL) + pw_properties_set(props, "sendspin.client-name", pw_get_host_name()); + if ((str = pw_properties_get(props, "sendspin.client-id")) == NULL) + pw_properties_setf(props, "sendspin.client-id", "pipewire-%s", pw_get_host_name()); + + impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); + if (impl->core == NULL) { + str = pw_properties_get(props, PW_KEY_REMOTE_NAME); + impl->core = pw_context_connect(impl->context, + pw_properties_new( + PW_KEY_REMOTE_NAME, str, + NULL), + 0); + impl->do_disconnect = true; + } + if (impl->core == NULL) { + res = -errno; + pw_log_error("can't connect: %m"); + goto out; + } + + pw_proxy_add_listener((struct pw_proxy*)impl->core, + &impl->core_proxy_listener, + &core_proxy_events, impl); + pw_core_add_listener(impl->core, + &impl->core_listener, + &core_events, impl); + + impl->websocket = pw_websocket_new(impl->main_loop, &props->dict); + pw_websocket_add_listener(impl->websocket, &impl->websocket_listener, + &websocket_events, impl); + + if ((impl->zeroconf = pw_zeroconf_new(context, NULL)) != NULL) { + pw_zeroconf_add_listener(impl->zeroconf, &impl->zeroconf_listener, + &zeroconf_events, impl); + } + + hostname = pw_properties_get(props, "sendspin.ip"); + if (hostname != NULL) { + port = pw_properties_get(props, "sendspin.port"); + if (port == NULL) + port = SPA_STRINGIFY(DEFAULT_SERVER_PORT); + if ((path = pw_properties_get(props, "sendspin.path")) == NULL) + path = DEFAULT_SENDSPIN_PATH; + + pw_websocket_connect(impl->websocket, NULL, hostname, port, path); + } else { + if ((hostname = pw_properties_get(props, "source.ip")) == NULL) + hostname = DEFAULT_SOURCE_IP; + if ((port = pw_properties_get(props, "source.port")) == NULL) + port = SPA_STRINGIFY(DEFAULT_SOURCE_PORT); + if ((path = pw_properties_get(props, "source.path")) == NULL) + path = DEFAULT_SOURCE_PATH; + + pw_websocket_listen(impl->websocket, NULL, hostname, port, path); + + if (impl->zeroconf) { + str = pw_properties_get(props, "sendspin.client-id"); + pw_zeroconf_set_announce(impl->zeroconf, NULL, + &SPA_DICT_ITEMS( + SPA_DICT_ITEM("zeroconf.service", PW_SENDSPIN_CLIENT_SERVICE), + SPA_DICT_ITEM("zeroconf.session", str), + SPA_DICT_ITEM("zeroconf.port", port), + SPA_DICT_ITEM("path", path))); + } + } + + pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl); + + pw_impl_module_update_properties(module, &SPA_DICT_INIT_ARRAY(module_info)); + + pw_log_info("Successfully loaded module-sendspin-recv"); + + return 0; +out: + impl_destroy(impl); + return res; +} diff --git a/src/modules/module-sendspin-send.c b/src/modules/module-sendspin-send.c new file mode 100644 index 000000000..9bc7195ce --- /dev/null +++ b/src/modules/module-sendspin-send.c @@ -0,0 +1,1389 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2026 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#include "config.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "module-sendspin/sendspin.h" +#include "module-sendspin/websocket.h" +#include "module-sendspin/zeroconf.h" +#include "network-utils.h" + +/** \page page_module_sendspin_send sendspin sender + * + * The `sendspin-send` module creates a PipeWire sink that sends audio + * packets using the sendspin protocol to a client. + * + * The sender will listen on a specific port (8927) and create a stream for + * each connection. + * + * In combination with a virtual sink, each of the client streams can be sent + * the same data in the client specific format. + * + * ## Module Name + * + * `libpipewire-module-sendspin-send` + * + * ## Module Options + * + * Options specific to the behavior of this module + * + * - `local.ifname = `: interface name to use + * - `local.ifaddress = `: interface address to use + * - `source.ip = `: the source ip address to listen on, default "127.0.0.1" + * - `source.port = `: the source port to listen on, default 8927 + * - `source.path = `: comma separated list of paths to listen on, + * default "/sendspin" + * - `sendspin.ip`: an array of IP addresses of sendspin clients to connect to + * - `sendspin.port`: the port of the sendspin client to connect to, default 8928 + * - `sendspin.path`: the path of the sendspin client to connect to, default "/sendspin" + * - `sendspin.group-id`: the group-id of the server, default random + * - `sendspin.group-name`: the group-name of the server, default "PipeWire" + * - `sendspin.delay`: the delay to add to clients in seconds. Default 5.0 + * - `node.always-process = `: true to send silence even when not connected. + * - `stream.props = {}`: properties to be passed to all the stream + * - `stream.rules` = : match rules, use the create-stream action to + * make a stream for the client. + * + * ## General options + * + * Options with well-known behavior: + * + * - \ref PW_KEY_REMOTE_NAME + * - \ref SPA_KEY_AUDIO_LAYOUT + * - \ref SPA_KEY_AUDIO_POSITION + * - \ref PW_KEY_MEDIA_NAME + * - \ref PW_KEY_MEDIA_CLASS + * - \ref PW_KEY_NODE_NAME + * - \ref PW_KEY_NODE_DESCRIPTION + * - \ref PW_KEY_NODE_GROUP + * - \ref PW_KEY_NODE_LATENCY + * - \ref PW_KEY_NODE_VIRTUAL + * + * ## Example configuration + *\code{.unparsed} + * # ~/.config/pipewire/pipewire.conf.d/my-sendspin-send.conf + * + * context.modules = [ + * { name = libpipewire-module-sendspin-send + * args = { + * #local.ifname = eth0 + * #source.ip = 127.0.0.1 + * #source.port = 8927 + * #source.path = "/sendspin" + * #sendspin.ip = [ 127.0.0.1 ] + * #sendspin.port = 8928 + * #sendspin.path = "/sendspin" + * #sendspin.group-id = "abcded" + * #sendspin.group-name = "PipeWire" + * #sendspin.delay = 5.0 + * #node.always-process = false + * #audio.position = [ FL FR ] + * stream.props = { + * #media.class = "Audio/sink" + * #node.name = "sendspin-send" + * } + * stream.rules = [ + * { matches = [ + * { sendspin.ip = "~.*" + * #sendspin.port = 8928 + * #sendspin.path = "/sendspin" + * #zeroconf.ifindex = 0 + * #zeroconf.name = "" + * #zeroconf.type = "_sendspin._tcp" + * #zeroconf.domain = "local" + * #zeroconf.hostname = "" + * } + * ] + * actions = { + * create-stream = { + * stream.props = { + * #target.object = "" + * #media.class = "Audio/Sink" + * } + * } + * } + * } + * ] + * } + * } + * ] + *\endcode + * + * \since 1.6.0 + */ + +#define NAME "sendspin-send" + +PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); +#define PW_LOG_TOPIC_DEFAULT mod_topic + +#define DEFAULT_SOURCE_IP "127.0.0.1" +#define DEFAULT_SOURCE_PORT PW_SENDSPIN_DEFAULT_SERVER_PORT +#define DEFAULT_SOURCE_PATH PW_SENDSPIN_DEFAULT_PATH + +#define DEFAULT_CLIENT_PORT PW_SENDSPIN_DEFAULT_CLIENT_PORT +#define DEFAULT_SENDSPIN_PATH PW_SENDSPIN_DEFAULT_PATH + +#define DEFAULT_SENDSPIN_DELAY 5.0 + +#define DEFAULT_POSITION "[ FL FR ]" + +#define DEFAULT_CREATE_RULES \ + "[ { matches = [ { sendspin.ip = \"~.*\" } ] actions = { create-stream = { } } } ] " + +#define USAGE "( local.ifname= ) " \ + "( source.ip= ) " \ + "( source.port= " \ + "( audio.position= ) " \ + "( stream.props= { key=value ... } ) " + +static const struct spa_dict_item module_info[] = { + { PW_KEY_MODULE_AUTHOR, "Wim Taymans " }, + { PW_KEY_MODULE_DESCRIPTION, "Sendspin sender" }, + { PW_KEY_MODULE_USAGE, USAGE }, + { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, +}; + +struct client { + struct impl *impl; + struct spa_list link; + + char *name; + struct pw_properties *props; + + struct pw_websocket_connection *conn; + struct spa_hook conn_listener; + + struct spa_audio_info info; + struct pw_stream *stream; + struct spa_hook stream_listener; + + struct spa_io_position *io_position; + struct pw_timer timer; + + uint64_t delay_usec; + uint32_t stride; + + int buffer_capacity; +#define ROLE_PLAYER (1<<0) +#define ROLE_METADATA (1<<1) + uint32_t supported_roles; +#define COMMAND_VOLUME (1<<0) +#define COMMAND_MUTE (1<<1) + uint32_t supported_commands; + + bool playing; +}; + +struct impl { + struct pw_impl_module *module; + struct spa_hook module_listener; + struct pw_properties *props; + struct pw_context *context; + + struct pw_loop *main_loop; + struct pw_loop *data_loop; + struct pw_timer_queue *timer_queue; + + struct pw_core *core; + struct spa_hook core_listener; + struct spa_hook core_proxy_listener; + unsigned int do_disconnect:1; + + struct pw_zeroconf *zeroconf; + struct spa_hook zeroconf_listener; + + float delay; + bool always_process; + + struct pw_properties *stream_props; + + struct pw_websocket *websocket; + struct spa_hook websocket_listener; + + struct spa_list clients; + +}; + +static int send_group_update(struct client *c, bool playing); +static int send_stream_start(struct client *c); +static int send_server_state(struct client *c); + +static void on_stream_destroy(void *d) +{ + struct client *c = d; + spa_hook_remove(&c->stream_listener); + c->stream = NULL; +} + +static void on_stream_state_changed(void *d, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct client *c = d; + switch (state) { + case PW_STREAM_STATE_ERROR: + case PW_STREAM_STATE_UNCONNECTED: + //pw_impl_module_schedule_destroy(c->impl->module); + break; + case PW_STREAM_STATE_PAUSED: + send_group_update(c, false); + break; + case PW_STREAM_STATE_STREAMING: + send_group_update(c, true); + break; + default: + break; + } +} + +static uint64_t get_time_us(struct client *c) +{ + struct timespec now; + if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) + return 0; + return SPA_TIMESPEC_TO_USEC(&now); +} + +static void on_playback_stream_process(void *d) +{ + struct client *c = d; + struct pw_buffer *b; + struct spa_buffer *buf; + uint8_t *p; + struct iovec iov[2]; + uint8_t header[9]; + uint64_t timestamp; + + if ((b = pw_stream_dequeue_buffer(c->stream)) == NULL) { + pw_log_debug("out of buffers: %m"); + return; + } + + if (c->playing) { + buf = b->buffer; + if ((p = buf->datas[0].data) == NULL) + return; + + timestamp = c->io_position ? + c->io_position->clock.nsec / 1000 : + get_time_us(c); + timestamp += c->delay_usec; + + header[0] = 4; + header[1] = (timestamp >> 56) & 0xff; + header[2] = (timestamp >> 48) & 0xff; + header[3] = (timestamp >> 40) & 0xff; + header[4] = (timestamp >> 32) & 0xff; + header[5] = (timestamp >> 24) & 0xff; + header[6] = (timestamp >> 16) & 0xff; + header[7] = (timestamp >> 8) & 0xff; + header[8] = (timestamp ) & 0xff; + + iov[0].iov_base = header; + iov[0].iov_len = sizeof(header); + iov[1].iov_base = p; + iov[1].iov_len = buf->datas[0].chunk->size; + + pw_websocket_connection_send(c->conn, + PW_WEBSOCKET_OPCODE_BINARY, iov, 2); + } + pw_stream_queue_buffer(c->stream, b); +} + +static void +on_stream_param_changed(void *d, uint32_t id, const struct spa_pod *param) +{ + struct client *c = d; + + if (param == NULL) + return; + + switch (id) { + case SPA_PARAM_Format: + if (spa_format_audio_parse(param, &c->info) < 0) + return; + send_stream_start(c); + break; + case SPA_PARAM_Tag: + send_server_state(c); + break; + } +} + +static void on_stream_io_changed(void *d, uint32_t id, void *area, uint32_t size) +{ + struct client *c = d; + switch (id) { + case SPA_IO_Position: + c->io_position = area; + break; + } +} + +static const struct pw_stream_events playback_stream_events = { + PW_VERSION_STREAM_EVENTS, + .destroy = on_stream_destroy, + .io_changed = on_stream_io_changed, + .state_changed = on_stream_state_changed, + .param_changed = on_stream_param_changed, + .process = on_playback_stream_process +}; + +static int create_stream(struct client *c) +{ + struct impl *impl = c->impl; + int res; + uint32_t n_params; + const struct spa_pod *params[1]; + uint8_t buffer[1024]; + struct spa_pod_builder b; + const char *client_id, *ip, *port, *client_name; + struct pw_properties *props = pw_properties_copy(c->props); + + ip = pw_properties_get(props, "sendspin.ip"); + port = pw_properties_get(props, "sendspin.port"); + client_id = pw_properties_get(props, "sendspin.client-id"); + client_name = pw_properties_get(props, "sendspin.client-name"); + + if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL) + pw_properties_setf(props, PW_KEY_NODE_NAME, "sendspin.%s.%s.%s", client_id, ip, port); + if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL) + pw_properties_setf(props, PW_KEY_NODE_DESCRIPTION, "Sendspin to %s", client_name); + if (pw_properties_get(props, PW_KEY_MEDIA_NAME) == NULL) + pw_properties_setf(props, PW_KEY_MEDIA_NAME, "Sendspin to %s", client_name); + + + c->stream = pw_stream_new(impl->core, "sendspin sender", props); + if (c->stream == NULL) + return -errno; + + pw_stream_add_listener(c->stream, + &c->stream_listener, + &playback_stream_events, c); + + n_params = 0; + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + params[n_params++] = spa_format_audio_build(&b, + SPA_PARAM_EnumFormat, &c->info); + + if ((res = pw_stream_connect(c->stream, + PW_DIRECTION_INPUT, + PW_ID_ANY, + PW_STREAM_FLAG_AUTOCONNECT | + PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS, + params, n_params)) < 0) + return res; + + return 0; +} + +static int send_server_hello(struct client *c) +{ + struct impl *impl = c->impl; + struct spa_json_builder b; + int res; + size_t size; + char *mem; + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "server/hello"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_string(&b, "server_id", pw_properties_get(impl->props, "sendspin.server-id")); + spa_json_builder_object_string(&b, "name", pw_properties_get(impl->props, "sendspin.server-name")); + spa_json_builder_object_int(&b, "version", 1); + spa_json_builder_object_push(&b, "active_roles", "["); + if (c->supported_roles & ROLE_PLAYER) + spa_json_builder_array_string(&b, "player@v1"); + if (c->supported_roles & ROLE_METADATA) + spa_json_builder_array_string(&b, "metadata@v1"); + spa_json_builder_pop(&b, "]"); + spa_json_builder_object_string(&b, "connection_reason", "discovery"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + res = pw_websocket_connection_send_text(c->conn, mem, size); + free(mem); + + return res; +} + +static int send_server_state(struct client *c) +{ + struct spa_json_builder b; + int res; + size_t size; + char *mem; + + if (!SPA_FLAG_IS_SET(c->supported_roles, ROLE_METADATA)) + return 0; + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "server/state"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_push(&b, "metadata", "{"); + spa_json_builder_object_uint(&b, "timestamp", get_time_us(c)); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + res = pw_websocket_connection_send_text(c->conn, mem, size); + free(mem); + return res; +} + +static int send_server_time(struct client *c, uint64_t t1, uint64_t t2) +{ + struct spa_json_builder b; + int res; + uint64_t t3; + size_t size; + char *mem; + + t3 = get_time_us(c); + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "server/time"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_uint(&b, "client_transmitted", t1); + spa_json_builder_object_uint(&b, "server_received", t2); + spa_json_builder_object_uint(&b, "server_transmitted", t3); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + res = pw_websocket_connection_send_text(c->conn, mem, size); + free(mem); + return res; +} + +#if 0 +static int send_server_command(struct client *c) +{ + return 0; +} +#endif + +static int send_stream_start(struct client *c) +{ + struct spa_json_builder b; + int res, channels, rate, depth = 0; + const char *codec; + size_t size; + char *mem; + + switch (c->info.media_subtype) { + case SPA_MEDIA_SUBTYPE_raw: + codec = "pcm"; + channels = c->info.info.raw.channels; + rate = c->info.info.raw.rate; + switch (c->info.info.raw.format) { + case SPA_AUDIO_FORMAT_S16_LE: + depth = 16; + break; + case SPA_AUDIO_FORMAT_S24_LE: + depth = 24; + break; + default: + return -ENOTSUP; + } + break; + case SPA_MEDIA_SUBTYPE_opus: + codec = "opus"; + channels = c->info.info.opus.channels; + rate = c->info.info.opus.rate; + break; + case SPA_MEDIA_SUBTYPE_flac: + codec = "flac"; + channels = c->info.info.flac.channels; + rate = c->info.info.flac.rate; + break; + default: + return -ENOTSUP; + } + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "stream/start"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_push(&b, "player", "{"); + spa_json_builder_object_string(&b, "codec", codec); + spa_json_builder_object_int(&b, "channels", channels); + spa_json_builder_object_int(&b, "sample_rate", rate); + if (depth) + spa_json_builder_object_int(&b, "bit_depth", depth); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + res = pw_websocket_connection_send_text(c->conn, mem, size); + free(mem); + return res; +} + +#if 0 +static int send_stream_end(struct client *c) +{ + struct spa_json_builder b; + int res; + size_t size; + char *mem; + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "stream/end"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_push(&b, "roles", "["); + spa_json_builder_array_string(&b, "player"); + spa_json_builder_array_string(&b, "metadata"); + spa_json_builder_pop(&b, "]"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + res = pw_websocket_connection_send_text(c->conn, mem, size); + free(mem); + return res; +} +#endif + +static int send_group_update(struct client *c, bool playing) +{ + struct impl *impl = c->impl; + struct spa_json_builder b; + int res; + char *mem; + size_t size; + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "group/update"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_string(&b, "playback_state", playing ? "playing" : "stopped"); + spa_json_builder_object_string(&b, "group_id", pw_properties_get(impl->props, "sendspin.group-id")); + spa_json_builder_object_string(&b, "group_name", pw_properties_get(impl->props, "sendspin.group-name")); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + c->playing = playing; + + res = pw_websocket_connection_send_text(c->conn, mem, size); + free(mem); + return res; +} + +/* {"codec":"pcm","sample_rate":44100,"channels":2,"bit_depth":16} */ +static int parse_codec(struct client *c, struct spa_json *object, struct spa_audio_info *info) +{ + char key[256], codec[64] = ""; + const char *v; + int l, sample_rate = 0, channels = 0, bit_depth = 0; + + spa_zero(*info); + info->media_type = SPA_MEDIA_TYPE_audio; + while ((l = spa_json_object_next(object, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "codec")) { + if (spa_json_parse_stringn(v, l, codec, sizeof(codec)) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "sample_rate")) { + if (spa_json_parse_int(v, l, &sample_rate) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "channels")) { + if (spa_json_parse_int(v, l, &channels) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "bit_depth")) { + if (spa_json_parse_int(v, l, &bit_depth) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "codec_header")) { + } + } + if (sample_rate == 0 || channels == 0) + return -EINVAL; + + if (spa_streq(codec, "pcm")) { + info->media_subtype = SPA_MEDIA_SUBTYPE_raw; + info->info.raw.rate = sample_rate; + info->info.raw.channels = channels; + switch (bit_depth) { + case 16: + info->info.raw.format = SPA_AUDIO_FORMAT_S16_LE; + break; + case 24: + info->info.raw.format = SPA_AUDIO_FORMAT_S24_LE; + break; + default: + return -EINVAL; + } + } + else if (spa_streq(codec, "opus")) { + info->media_subtype = SPA_MEDIA_SUBTYPE_opus; + info->info.opus.rate = sample_rate; + info->info.opus.channels = channels; + } + else if (spa_streq(codec, "flac")) { + info->media_subtype = SPA_MEDIA_SUBTYPE_flac; + info->info.flac.rate = sample_rate; + info->info.flac.channels = channels; + } + else + return -EINVAL; + + return 0; +} + +static int parse_player_v1_support(struct client *c, struct spa_json *payload) +{ + struct spa_json it[2]; + char key[256], *t; + const char *v; + int l, res; + + while ((l = spa_json_object_next(payload, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "supported_formats")) { + int count = 0; + + if (!spa_json_is_array(v, l)) + return -EPROTO; + spa_json_enter(payload, &it[0]); + + while ((l = spa_json_next(&it[0], &v)) > 0) { + struct spa_audio_info info; + if (!spa_json_is_object(v, l)) + return -EPROTO; + + spa_json_enter(&it[0], &it[1]); + if ((res = parse_codec(c, &it[1], &info)) < 0) + return res; + + if (count++ == 0) + c->info = info; + } + } + else if (spa_streq(key, "buffer_capacity")) { + if (spa_json_parse_int(v, l, &c->buffer_capacity) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "supported_commands")) { + if (!spa_json_is_array(v, l)) + return -EPROTO; + spa_json_enter(payload, &it[0]); + + while ((l = spa_json_next(&it[0], &v)) > 0) { + t = alloca(l+1); + spa_json_parse_stringn(v, l, t, l+1); + if (spa_streq(t, "volume")) + c->supported_commands |= COMMAND_VOLUME; + else if (spa_streq(t, "mute")) + c->supported_commands |= COMMAND_MUTE; + } + } + } + return 0; +} + +static int handle_client_hello(struct client *c, struct spa_json *payload) +{ + struct spa_json it[1]; + char key[256], *t; + const char *v; + int res, l, version = 0; + + while ((l = spa_json_object_next(payload, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "client_id")) { + t = alloca(l+1); + spa_json_parse_stringn(v, l, t, l+1); + pw_properties_set(c->props, "sendspin.client-id", t); + } + else if (spa_streq(key, "name")) { + t = alloca(l+1); + spa_json_parse_stringn(v, l, t, l+1); + pw_properties_set(c->props, "sendspin.client-name", t); + } + else if (spa_streq(key, "version")) { + if (spa_json_parse_int(v, l, &version) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "supported_roles")) { + if (!spa_json_is_array(v, l)) + return -EPROTO; + + spa_json_enter(payload, &it[0]); + while ((l = spa_json_next(&it[0], &v)) > 0) { + t = alloca(l+1); + spa_json_parse_stringn(v, l, t, l+1); + + if (spa_streq(t, "player@v1")) + c->supported_roles |= ROLE_PLAYER; + else if (spa_streq(t, "metadata@v1")) + c->supported_roles |= ROLE_METADATA; + } + } + else if (spa_streq(key, "player_support") || + spa_streq(key, "player@v1_support")) { + if (!spa_json_is_object(v, l)) + return -EPROTO; + spa_json_enter(payload, &it[0]); + if ((res = parse_player_v1_support(c, &it[0])) < 0) + return res; + } + } + if (version != 1) + return -ENOTSUP; + + return send_server_hello(c); +} + +static int handle_client_state(struct client *c, struct spa_json *payload) +{ + struct spa_json it[1]; + char key[256]; + const char *v; + int l; + + while ((l = spa_json_object_next(payload, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "player")) { + if (!spa_json_is_object(v, l)) + return -EPROTO; + spa_json_enter(payload, &it[0]); + while ((l = spa_json_object_next(&it[0], key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "state")) { + } + else if (spa_streq(key, "volume")) { + } + else if (spa_streq(key, "mute")) { + } + } + } + } + if (c->stream == NULL) + create_stream(c); + return 0; +} + +static int parse_uint64(const char *val, int len, uint64_t *result) +{ + char buf[64]; + char *end; + + if (len <= 0 || len >= (int)sizeof(buf)) + return 0; + + memcpy(buf, val, len); + buf[len] = '\0'; + + *result = strtoull(buf, &end, 0); + return len > 0 && end == buf + len; +} + +static int handle_client_time(struct client *c, struct spa_json *payload) +{ + char key[256]; + const char *v; + int l; + uint64_t t1 = 0,t2; + + t2 = get_time_us(c); + + while ((l = spa_json_object_next(payload, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "client_transmitted")) { + if (parse_uint64(v, l, &t1) <= 0) + return -EINVAL; + } + } + if (t1 == 0) + return -EPROTO; + + return send_server_time(c, t1, t2); +} + +static int handle_client_command(struct client *c, struct spa_json *payload) +{ + return 0; +} + +/* {"player":{}} */ +static int handle_stream_request_format(struct client *c, struct spa_json *payload) +{ + struct spa_json it[1]; + char key[256]; + const char *v; + int l; + + while ((l = spa_json_object_next(payload, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "player")) { + if (!spa_json_is_object(v, l)) + return -EPROTO; + spa_json_enter(payload, &it[0]); + parse_codec(c, &it[0], &c->info); + } + } + return 0; +} + +static int handle_client_goodbye(struct client *c, struct spa_json *payload) +{ + if (c->stream != NULL) { + pw_stream_destroy(c->stream); + c->stream = NULL; + } + return 0; +} + +/* { "type":... "payload":{...} } */ +static int do_parse_text(struct client *c, const char *content, int size) +{ + struct spa_json it[2], *payload = NULL; + char key[256], type[256] = ""; + const char *v; + int res, l; + + pw_log_info("received text %.*s", size, content); + + if (spa_json_begin_object(&it[0], content, size) <= 0) + return -EINVAL; + + while ((l = spa_json_object_next(&it[0], key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "payload")) { + if (!spa_json_is_object(v, l)) + return -EPROTO; + + spa_json_enter(&it[0], &it[1]); + payload = &it[1]; + } + else if (spa_streq(key, "type")) { + if (spa_json_parse_stringn(v, l, type, sizeof(type)) <= 0) + continue; + } + } + if (spa_streq(type, "client/hello")) + res = handle_client_hello(c, payload); + else if (spa_streq(type, "client/state")) + res = handle_client_state(c, payload); + else if (spa_streq(type, "client/time")) + res = handle_client_time(c, payload); + else if (spa_streq(type, "client/command")) + res = handle_client_command(c, payload); + else if (spa_streq(type, "client/goodbye")) + res = handle_client_goodbye(c, payload); + else if (spa_streq(type, "stream/request-format")) + res = handle_stream_request_format(c, payload); + else + res = 0; + + return res; +} + +static void on_connection_message(void *data, int opcode, void *payload, size_t size) +{ + struct client *c = data; + if (opcode == PW_WEBSOCKET_OPCODE_TEXT) { + do_parse_text(c, payload, size); + } else { + pw_log_warn("%02x unknown %08x", opcode, (int)size); + } +} + +static void client_free(struct client *c) +{ + struct impl *impl = c->impl; + + handle_client_goodbye(c, NULL); + if (c->conn) { + spa_hook_remove(&c->conn_listener); + pw_websocket_connection_destroy(c->conn); + } else { + pw_websocket_cancel(impl->websocket, c); + } + pw_timer_queue_cancel(&c->timer); + spa_list_remove(&c->link); + free(c); +} + +static void on_connection_destroy(void *data) +{ + struct client *c = data; + c->conn = NULL; + pw_log_info("connection %p destroy", c); +} + +static void on_connection_error(void *data, int res, const char *reason) +{ + struct client *c = data; + pw_log_error("connection %p error %d %s", c, res, reason); +} + +static void on_connection_disconnected(void *data) +{ + struct client *c = data; + client_free(c); +} + +static const struct pw_websocket_connection_events websocket_connection_events = { + PW_VERSION_WEBSOCKET_CONNECTION_EVENTS, + .destroy = on_connection_destroy, + .error = on_connection_error, + .disconnected = on_connection_disconnected, + .message = on_connection_message, +}; + +static struct client *client_new(struct impl *impl, const char *name, struct pw_properties *props) +{ + struct client *c; + + if ((c = calloc(1, sizeof(*c))) == NULL) + goto error; + + c->impl = impl; + spa_list_append(&impl->clients, &c->link); + + c->props = props; + c->name = name ? strdup(name) : NULL; + c->delay_usec = (uint64_t)(impl->delay * SPA_USEC_PER_SEC); + + return c; +error: + pw_properties_free(props); + return NULL; +} + +static int client_connect(struct client *c) +{ + struct impl *impl = c->impl; + const char *addr, *port, *path; + addr = pw_properties_get(c->props, "sendspin.ip"); + port = pw_properties_get(c->props, "sendspin.port"); + path = pw_properties_get(c->props, "sendspin.path"); + return pw_websocket_connect(impl->websocket, c, addr, port, path); +} + +static void client_connected(struct client *c, struct pw_websocket_connection *conn) +{ + if (c->conn) { + spa_hook_remove(&c->conn_listener); + pw_websocket_connection_destroy(c->conn); + } + c->conn = conn; + if (conn) + pw_websocket_connection_add_listener(c->conn, &c->conn_listener, + &websocket_connection_events, c); +} + +static struct client *client_find(struct impl *impl, const char *name) +{ + struct client *c; + spa_list_for_each(c, &impl->clients, link) { + if (spa_streq(c->name, name)) + return c; + } + return NULL; +} + +struct match_info { + struct impl *impl; + const char *name; + struct pw_properties *props; + struct pw_websocket_connection *conn; + bool matched; +}; + +static int rule_matched(void *data, const char *location, const char *action, + const char *str, size_t len) +{ + struct match_info *i = data; + struct impl *impl = i->impl; + int res = 0; + + i->matched = true; + if (spa_streq(action, "create-stream")) { + struct client *c; + + pw_properties_update_string(i->props, str, len); + if ((c = client_new(impl, i->name, spa_steal_ptr(i->props))) == NULL) + return -errno; + if (i->conn) + client_connected(c, i->conn); + else + client_connect(c); + } + return res; +} + +static int match_client(struct impl *impl, const char *name, struct pw_properties *props, + struct pw_websocket_connection *conn) +{ + const char *str; + struct match_info minfo = { + .impl = impl, + .name = name, + .props = props, + .conn = conn, + }; + + if ((str = pw_properties_get(impl->props, "stream.rules")) == NULL) + str = DEFAULT_CREATE_RULES; + + pw_conf_match_rules(str, strlen(str), NAME, &props->dict, + rule_matched, &minfo); + + if (!minfo.matched) { + pw_log_info("unmatched client found %s", str); + if (conn) + pw_websocket_connection_destroy(conn); + pw_properties_free(props); + } + return minfo.matched; +} + +static void on_websocket_connected(void *data, void *user, + struct pw_websocket_connection *conn, const char *path) +{ + struct impl *impl = data; + struct client *c = user; + + pw_log_info("connected to %s", path); + if (c == NULL) { + struct sockaddr_storage addr; + char ip[128]; + uint16_t port = 0; + bool ipv4; + struct pw_properties *props; + + pw_websocket_connection_address(conn, + (struct sockaddr*)&addr, sizeof(addr)); + + props = pw_properties_copy(impl->stream_props); + if (pw_net_get_ip(&addr, ip, sizeof(ip), &ipv4, &port) >= 0) { + pw_properties_set(props, "sendspin.ip", ip); + pw_properties_setf(props, "sendspin.port", "%u", port); + } + pw_properties_set(props, "sendspin.path", path); + + match_client(impl, "", props, conn); + } else { + client_connected(c, conn); + } +} + +static const struct pw_websocket_events websocket_events = { + PW_VERSION_WEBSOCKET_EVENTS, + .connected = on_websocket_connected, +}; + +static void on_zeroconf_added(void *data, void *user, const struct spa_dict *info) +{ + struct impl *impl = data; + const char *name, *addr, *port, *path; + struct client *c; + struct pw_properties *props; + + name = spa_dict_lookup(info, "zeroconf.hostname"); + + if ((c = client_find(impl, name)) != NULL) + return; + + props = pw_properties_copy(impl->stream_props); + pw_properties_update(props, info); + + addr = spa_dict_lookup(info, "zeroconf.address"); + port = spa_dict_lookup(info, "zeroconf.port"); + path = spa_dict_lookup(info, "path"); + + pw_properties_set(props, "sendspin.ip", addr); + pw_properties_set(props, "sendspin.port", port); + pw_properties_set(props, "sendspin.path", path); + + match_client(impl, name, props, NULL); +} + +static void on_zeroconf_removed(void *data, void *user, const struct spa_dict *info) +{ + struct impl *impl = data; + const char *name; + struct client *c; + + name = spa_dict_lookup(info, "zeroconf.hostname"); + + if ((c = client_find(impl, name)) == NULL) + return; + + client_free(c); +} + +static const struct pw_zeroconf_events zeroconf_events = { + PW_VERSION_ZEROCONF_EVENTS, + .added = on_zeroconf_added, + .removed = on_zeroconf_removed, +}; + +static void core_destroy(void *d) +{ + struct impl *impl = d; + spa_hook_remove(&impl->core_listener); + impl->core = NULL; + pw_impl_module_schedule_destroy(impl->module); +} + +static const struct pw_proxy_events core_proxy_events = { + .destroy = core_destroy, +}; + +static void impl_destroy(struct impl *impl) +{ + struct client *c; + spa_list_consume(c, &impl->clients, link) + client_free(c); + + if (impl->core && impl->do_disconnect) + pw_core_disconnect(impl->core); + + if (impl->data_loop) + pw_context_release_loop(impl->context, impl->data_loop); + + if (impl->zeroconf) + pw_zeroconf_destroy(impl->zeroconf); + + pw_properties_free(impl->stream_props); + pw_properties_free(impl->props); + + free(impl); +} + +static void module_destroy(void *d) +{ + struct impl *impl = d; + spa_hook_remove(&impl->module_listener); + impl_destroy(impl); +} + +static const struct pw_impl_module_events module_events = { + PW_VERSION_IMPL_MODULE_EVENTS, + .destroy = module_destroy, +}; + +static void on_core_error(void *d, uint32_t id, int seq, int res, const char *message) +{ + struct impl *impl = d; + + pw_log_error("error id:%u seq:%d res:%d (%s): %s", + id, seq, res, spa_strerror(res), message); + + if (id == PW_ID_CORE && res == -EPIPE) + pw_impl_module_schedule_destroy(impl->module); +} + +static const struct pw_core_events core_events = { + PW_VERSION_CORE_EVENTS, + .error = on_core_error, +}; + +static void copy_props(struct impl *impl, struct pw_properties *props, const char *key) +{ + const char *str; + if ((str = pw_properties_get(props, key)) != NULL) { + if (pw_properties_get(impl->stream_props, key) == NULL) + pw_properties_set(impl->stream_props, key, str); + } +} + +SPA_EXPORT +int pipewire__module_init(struct pw_impl_module *module, const char *args) +{ + struct pw_context *context = pw_impl_module_get_context(module); + struct impl *impl; + const char *str, *hostname, *port, *path; + struct pw_properties *props, *stream_props; + int res = 0; + + PW_LOG_TOPIC_INIT(mod_topic); + + impl = calloc(1, sizeof(struct impl)); + if (impl == NULL) + return -errno; + + if (args == NULL) + args = ""; + + props = impl->props = pw_properties_new_string(args); + stream_props = impl->stream_props = pw_properties_new(NULL, NULL); + if (props == NULL || stream_props == NULL) { + res = -errno; + pw_log_error( "can't create properties: %m"); + goto out; + } + + impl->module = module; + impl->context = context; + impl->main_loop = pw_context_get_main_loop(context); + impl->data_loop = pw_context_acquire_loop(context, &props->dict); + impl->timer_queue = pw_context_get_timer_queue(context); + spa_list_init(&impl->clients); + + pw_properties_set(props, PW_KEY_NODE_LOOP_NAME, impl->data_loop->name); + + if ((str = pw_properties_get(props, "stream.props")) != NULL) + pw_properties_update_string(stream_props, str, strlen(str)); + + copy_props(impl, props, PW_KEY_NODE_LOOP_NAME); + copy_props(impl, props, SPA_KEY_AUDIO_LAYOUT); + copy_props(impl, props, SPA_KEY_AUDIO_POSITION); + copy_props(impl, props, PW_KEY_NODE_NAME); + copy_props(impl, props, PW_KEY_NODE_DESCRIPTION); + copy_props(impl, props, PW_KEY_NODE_GROUP); + copy_props(impl, props, PW_KEY_NODE_LATENCY); + copy_props(impl, props, PW_KEY_NODE_VIRTUAL); + copy_props(impl, props, PW_KEY_NODE_CHANNELNAMES); + copy_props(impl, props, PW_KEY_MEDIA_NAME); + copy_props(impl, props, PW_KEY_MEDIA_CLASS); + + impl->always_process = pw_properties_get_bool(stream_props, + PW_KEY_NODE_ALWAYS_PROCESS, true); + + if ((str = pw_properties_get(props, "sendspin.group-id")) == NULL) { + uint64_t group_id; + pw_random(&group_id, sizeof(group_id)); + pw_properties_setf(props, "sendspin.group-id", "%016"PRIx64, group_id); + } + if ((str = pw_properties_get(props, "sendspin.group-name")) == NULL) + pw_properties_set(props, "sendspin.group-name", "PipeWire"); + if ((str = pw_properties_get(props, "sendspin.server-name")) == NULL) + pw_properties_set(props, "sendspin.server-name", pw_get_host_name()); + if ((str = pw_properties_get(props, "sendspin.server-id")) == NULL) + pw_properties_setf(props, "sendspin.server-id", "pipewire-%s", pw_get_host_name()); + + impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); + if (impl->core == NULL) { + str = pw_properties_get(props, PW_KEY_REMOTE_NAME); + impl->core = pw_context_connect(impl->context, + pw_properties_new( + PW_KEY_REMOTE_NAME, str, + NULL), + 0); + impl->do_disconnect = true; + } + if (impl->core == NULL) { + res = -errno; + pw_log_error("can't connect: %m"); + goto out; + } + + pw_proxy_add_listener((struct pw_proxy*)impl->core, + &impl->core_proxy_listener, + &core_proxy_events, impl); + pw_core_add_listener(impl->core, + &impl->core_listener, + &core_events, impl); + + impl->websocket = pw_websocket_new(impl->main_loop, &props->dict); + pw_websocket_add_listener(impl->websocket, &impl->websocket_listener, + &websocket_events, impl); + + if ((str = pw_properties_get(props, "sendspin.delay")) == NULL) + str = SPA_STRINGIFY(DEFAULT_SENDSPIN_DELAY); + impl->delay = pw_properties_parse_float(str); + + if ((impl->zeroconf = pw_zeroconf_new(context, NULL)) != NULL) { + pw_zeroconf_add_listener(impl->zeroconf, &impl->zeroconf_listener, + &zeroconf_events, impl); + } + + hostname = pw_properties_get(props, "sendspin.ip"); + if (hostname != NULL) { + struct spa_json iter; + char v[256]; + + port = pw_properties_get(props, "sendspin.port"); + if (port == NULL) + port = SPA_STRINGIFY(DEFAULT_CLIENT_PORT); + if ((path = pw_properties_get(props, "sendspin.path")) == NULL) + path = DEFAULT_SENDSPIN_PATH; + + if (spa_json_begin_array_relax(&iter, hostname, strlen(hostname)) <= 0) { + res = -EINVAL; + pw_log_error("can't parse sendspin.ip %s", hostname); + goto out; + } + while (spa_json_get_string(&iter, v, sizeof(v)) > 0) { + struct client *c; + struct pw_properties *p = pw_properties_copy(impl->stream_props); + + pw_properties_set(p, "sendspin.ip", v); + pw_properties_set(p, "sendspin.port", port); + pw_properties_set(p, "sendspin.path", path); + + if ((c = client_new(impl, "", p)) != NULL) + client_connect(c); + } + } else { + if ((hostname = pw_properties_get(props, "source.ip")) == NULL) + hostname = DEFAULT_SOURCE_IP; + if ((port = pw_properties_get(props, "source.port")) == NULL) + port = SPA_STRINGIFY(DEFAULT_SOURCE_PORT); + if ((path = pw_properties_get(props, "source.path")) == NULL) + path = DEFAULT_SOURCE_PATH; + + pw_websocket_listen(impl->websocket, NULL, hostname, port, path); + + if (impl->zeroconf) { + str = pw_properties_get(props, "sendspin.group-name"); + pw_zeroconf_set_announce(impl->zeroconf, NULL, + &SPA_DICT_ITEMS( + SPA_DICT_ITEM("zeroconf.service", PW_SENDSPIN_SERVER_SERVICE), + SPA_DICT_ITEM("zeroconf.session", str), + SPA_DICT_ITEM("zeroconf.port", port), + SPA_DICT_ITEM("path", path))); + } + } + if (impl->zeroconf) { + pw_zeroconf_set_browse(impl->zeroconf, NULL, + &SPA_DICT_ITEMS( + SPA_DICT_ITEM("zeroconf.service", PW_SENDSPIN_CLIENT_SERVICE))); + } + pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl); + + pw_impl_module_update_properties(module, &SPA_DICT_INIT_ARRAY(module_info)); + + pw_log_info("Successfully loaded module-sendspin-send"); + + return 0; +out: + impl_destroy(impl); + return res; +} diff --git a/src/modules/module-sendspin/regress.h b/src/modules/module-sendspin/regress.h new file mode 100644 index 000000000..1b6919906 --- /dev/null +++ b/src/modules/module-sendspin/regress.h @@ -0,0 +1,58 @@ + +struct spa_regress { + double meanX; + double meanY; + double varX; + double covXY; + uint32_t n; + uint32_t m; + double a; +}; + +static inline void spa_regress_init(struct spa_regress *r, uint32_t m) +{ + memset(r, 0, sizeof(*r)); + r->m = m; + r->a = 1.0/m; +} +static inline void spa_regress_update(struct spa_regress *r, double x, double y) +{ + double a, dx, dy; + + if (r->n == 0) { + r->meanX = x; + r->meanY = y; + r->n++; + a = 1.0; + } else if (r->n < r->m) { + a = 1.0/r->n; + r->n++; + } else { + a = r->a; + } + dx = x - r->meanX; + dy = y - r->meanY; + + r->varX += ((1.0 - a) * dx * dx - r->varX) * a; + r->covXY += ((1.0 - a) * dx * dy - r->covXY) * a; + r->meanX += dx * a; + r->meanY += dy * a; +} +static inline void spa_regress_get(struct spa_regress *r, double *a, double *b) +{ + *a = r->covXY/r->varX; + *b = r->meanY - *a * r->meanX; +} +static inline double spa_regress_calc_y(struct spa_regress *r, double x) +{ + double a, b; + spa_regress_get(r, &a, &b); + return x * a + b; +} +static inline double spa_regress_calc_x(struct spa_regress *r, double y) +{ + double a, b; + spa_regress_get(r, &a, &b); + return (y - b) / a; +} + diff --git a/src/modules/module-sendspin/sendspin.h b/src/modules/module-sendspin/sendspin.h new file mode 100644 index 000000000..b6260d27c --- /dev/null +++ b/src/modules/module-sendspin/sendspin.h @@ -0,0 +1,27 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2026 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#ifndef PIPEWIRE_SENDSPIN_H +#define PIPEWIRE_SENDSPIN_H + +#include + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#define PW_SENDSPIN_SERVER_SERVICE "_sendspin-server._tcp" +#define PW_SENDSPIN_CLIENT_SERVICE "_sendspin._tcp" + +#define PW_SENDSPIN_DEFAULT_SERVER_PORT 8927 +#define PW_SENDSPIN_DEFAULT_CLIENT_PORT 8928 +#define PW_SENDSPIN_DEFAULT_PATH "/sendspin" + +#ifdef __cplusplus +} +#endif + +#endif /* PIPEWIRE_SENDSPIN_H */ diff --git a/src/modules/module-sendspin/teeny-sha1.c b/src/modules/module-sendspin/teeny-sha1.c new file mode 100644 index 000000000..fa5a56753 --- /dev/null +++ b/src/modules/module-sendspin/teeny-sha1.c @@ -0,0 +1,201 @@ +/******************************************************************************* + * Teeny SHA-1 + * + * The below sha1digest() calculates a SHA-1 hash value for a + * specified data buffer and generates a hex representation of the + * result. This implementation is a re-forming of the SHA-1 code at + * https://github.com/jinqiangshou/EncryptionLibrary. + * + * Copyright (c) 2017 CTrabant + * + * License: MIT, see included LICENSE file for details. + * + * To use the sha1digest() function either copy it into an existing + * project source code file or include this file in a project and put + * the declaration (example below) in the sources files where needed. + ******************************************************************************/ + +#include +#include +#include +#include + +/* Declaration: +extern int sha1digest(uint8_t *digest, char *hexdigest, const uint8_t *data, size_t databytes); +*/ + +/******************************************************************************* + * sha1digest: https://github.com/CTrabant/teeny-sha1 + * + * Calculate the SHA-1 value for supplied data buffer and generate a + * text representation in hexadecimal. + * + * Based on https://github.com/jinqiangshou/EncryptionLibrary, credit + * goes to @jinqiangshou, all new bugs are mine. + * + * @input: + * data -- data to be hashed + * databytes -- bytes in data buffer to be hashed + * + * @output: + * digest -- the result, MUST be at least 20 bytes + * hexdigest -- the result in hex, MUST be at least 41 bytes + * + * At least one of the output buffers must be supplied. The other, if not + * desired, may be set to NULL. + * + * @return: 0 on success and non-zero on error. + ******************************************************************************/ +static inline int +sha1digest(uint8_t *digest, char *hexdigest, const uint8_t *data, size_t databytes) +{ +#define SHA1ROTATELEFT(value, bits) (((value) << (bits)) | ((value) >> (32 - (bits)))) + + uint32_t W[80]; + uint32_t H[] = {0x67452301, + 0xEFCDAB89, + 0x98BADCFE, + 0x10325476, + 0xC3D2E1F0}; + uint32_t a; + uint32_t b; + uint32_t c; + uint32_t d; + uint32_t e; + uint32_t f = 0; + uint32_t k = 0; + + uint32_t idx; + uint32_t lidx; + uint32_t widx; + uint32_t didx = 0; + + int32_t wcount; + uint32_t temp; + uint64_t databits = ((uint64_t)databytes) * 8; + uint32_t loopcount = (databytes + 8) / 64 + 1; + uint32_t tailbytes = 64 * loopcount - databytes; + uint8_t datatail[128] = {0}; + + if (!digest && !hexdigest) + return -1; + + if (!data) + return -1; + + /* Pre-processing of data tail (includes padding to fill out 512-bit chunk): + Add bit '1' to end of message (big-endian) + Add 64-bit message length in bits at very end (big-endian) */ + datatail[0] = 0x80; + datatail[tailbytes - 8] = (uint8_t) (databits >> 56 & 0xFF); + datatail[tailbytes - 7] = (uint8_t) (databits >> 48 & 0xFF); + datatail[tailbytes - 6] = (uint8_t) (databits >> 40 & 0xFF); + datatail[tailbytes - 5] = (uint8_t) (databits >> 32 & 0xFF); + datatail[tailbytes - 4] = (uint8_t) (databits >> 24 & 0xFF); + datatail[tailbytes - 3] = (uint8_t) (databits >> 16 & 0xFF); + datatail[tailbytes - 2] = (uint8_t) (databits >> 8 & 0xFF); + datatail[tailbytes - 1] = (uint8_t) (databits >> 0 & 0xFF); + + /* Process each 512-bit chunk */ + for (lidx = 0; lidx < loopcount; lidx++) + { + /* Compute all elements in W */ + memset (W, 0, 80 * sizeof (uint32_t)); + + /* Break 512-bit chunk into sixteen 32-bit, big endian words */ + for (widx = 0; widx <= 15; widx++) + { + wcount = 24; + + /* Copy byte-per byte from specified buffer */ + while (didx < databytes && wcount >= 0) + { + W[widx] += (((uint32_t)data[didx]) << wcount); + didx++; + wcount -= 8; + } + /* Fill out W with padding as needed */ + while (wcount >= 0) + { + W[widx] += (((uint32_t)datatail[didx - databytes]) << wcount); + didx++; + wcount -= 8; + } + } + + /* Extend the sixteen 32-bit words into eighty 32-bit words, with potential optimization from: + "Improving the Performance of the Secure Hash Algorithm (SHA-1)" by Max Locktyukhin */ + for (widx = 16; widx <= 31; widx++) + { + W[widx] = SHA1ROTATELEFT ((W[widx - 3] ^ W[widx - 8] ^ W[widx - 14] ^ W[widx - 16]), 1); + } + for (widx = 32; widx <= 79; widx++) + { + W[widx] = SHA1ROTATELEFT ((W[widx - 6] ^ W[widx - 16] ^ W[widx - 28] ^ W[widx - 32]), 2); + } + + /* Main loop */ + a = H[0]; + b = H[1]; + c = H[2]; + d = H[3]; + e = H[4]; + + for (idx = 0; idx <= 79; idx++) + { + if (idx <= 19) + { + f = (b & c) | ((~b) & d); + k = 0x5A827999; + } + else if (idx >= 20 && idx <= 39) + { + f = b ^ c ^ d; + k = 0x6ED9EBA1; + } + else if (idx >= 40 && idx <= 59) + { + f = (b & c) | (b & d) | (c & d); + k = 0x8F1BBCDC; + } + else if (idx >= 60 && idx <= 79) + { + f = b ^ c ^ d; + k = 0xCA62C1D6; + } + temp = SHA1ROTATELEFT (a, 5) + f + e + k + W[idx]; + e = d; + d = c; + c = SHA1ROTATELEFT (b, 30); + b = a; + a = temp; + } + + H[0] += a; + H[1] += b; + H[2] += c; + H[3] += d; + H[4] += e; + } + + /* Store binary digest in supplied buffer */ + if (digest) + { + for (idx = 0; idx < 5; idx++) + { + digest[idx * 4 + 0] = (uint8_t) (H[idx] >> 24); + digest[idx * 4 + 1] = (uint8_t) (H[idx] >> 16); + digest[idx * 4 + 2] = (uint8_t) (H[idx] >> 8); + digest[idx * 4 + 3] = (uint8_t) (H[idx]); + } + } + + /* Store hex version of digest in supplied buffer */ + if (hexdigest) + { + snprintf (hexdigest, 41, "%08x%08x%08x%08x%08x", + H[0],H[1],H[2],H[3],H[4]); + } + + return 0; +} /* End of sha1digest() */ diff --git a/src/modules/module-sendspin/websocket.c b/src/modules/module-sendspin/websocket.c new file mode 100644 index 000000000..959706d79 --- /dev/null +++ b/src/modules/module-sendspin/websocket.c @@ -0,0 +1,1060 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2021 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "config.h" + +#include "websocket.h" +#include "teeny-sha1.c" +#include "../network-utils.h" +#include "../module-raop/base64.h" + +#define pw_websocket_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_websocket_events, m, v, ##__VA_ARGS__) +#define pw_websocket_emit_destroy(w) pw_websocket_emit(w, destroy, 0) +#define pw_websocket_emit_connected(w,u,c,p) pw_websocket_emit(w, connected, 0, u, c, p) + +#define pw_websocket_connection_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_websocket_connection_events, m, v, ##__VA_ARGS__) +#define pw_websocket_connection_emit_destroy(w) pw_websocket_connection_emit(w, destroy, 0) +#define pw_websocket_connection_emit_error(w,r,m) pw_websocket_connection_emit(w, error, 0, r, m) +#define pw_websocket_connection_emit_disconnected(w) pw_websocket_connection_emit(w, disconnected, 0) +#define pw_websocket_connection_emit_drained(w) pw_websocket_connection_emit(w, drained, 0) +#define pw_websocket_connection_emit_message(w,...) pw_websocket_connection_emit(w, message, 0, __VA_ARGS__) + +#define MAX_CONNECTIONS 64 + +struct message { + struct spa_list link; + size_t len; + size_t offset; + uint32_t seq; + int (*reply) (void *user_data, int status); + void *user_data; + unsigned char data[]; +}; + +struct server { + struct pw_websocket *ws; + struct spa_list link; + + struct sockaddr_storage addr; + struct spa_source *source; + + void *user; + char **paths; + + struct spa_list connections; + uint32_t n_connections; +}; + +struct pw_websocket_connection { + struct pw_websocket *ws; + struct spa_list link; + + int refcount; + + void *user; + struct server *server; + + struct spa_hook_list listener_list; + + struct spa_source *source; + unsigned int connecting:1; + unsigned int need_flush:1; + + char *host; + char *path; + char name[128]; + bool ipv4; + uint16_t port; + + struct sockaddr_storage addr; + + uint8_t maskbit; + + int status; + char message[128]; + char key[25]; + size_t content_length; + + uint32_t send_seq; + uint32_t recv_seq; + bool draining; + + struct spa_list messages; + struct spa_list pending; + + struct pw_array data; + size_t data_wanted; + size_t data_cursor; + size_t data_state; + int (*have_data) (struct pw_websocket_connection *conn, + void *data, size_t size, size_t current); +}; + +struct pw_websocket { + struct pw_loop *loop; + + struct spa_hook_list listener_list; + + struct spa_source *source; + + char *ifname; + char *ifaddress; + char *user_agent; + char *server_name; + + struct spa_list connections; + struct spa_list servers; +}; + +void pw_websocket_connection_disconnect(struct pw_websocket_connection *conn, bool drain) +{ + struct message *msg; + + if (drain && !spa_list_is_empty(&conn->messages)) { + conn->draining = true; + return; + } + + if (conn->source != NULL) { + pw_loop_destroy_source(conn->ws->loop, conn->source); + conn->source = NULL; + } + spa_list_insert_list(&conn->messages, &conn->pending); + spa_list_consume(msg, &conn->messages, link) { + spa_list_remove(&msg->link); + free(msg); + } + if (conn->server) { + conn->server->n_connections--; + conn->server = NULL; + } + pw_websocket_connection_emit_disconnected(conn); +} + +static void websocket_connection_unref(struct pw_websocket_connection *conn) +{ + if (--conn->refcount > 0) + return; + pw_array_clear(&conn->data); + free(conn->host); + free(conn->path); + free(conn); +} + +void pw_websocket_connection_destroy(struct pw_websocket_connection *conn) +{ + pw_log_debug("destroy connection %p", conn); + spa_list_remove(&conn->link); + + pw_websocket_connection_emit_destroy(conn); + + pw_websocket_connection_disconnect(conn, false); + spa_hook_list_clean(&conn->listener_list); + + websocket_connection_unref(conn); +} + +void pw_websocket_connection_add_listener(struct pw_websocket_connection *conn, + struct spa_hook *listener, + const struct pw_websocket_connection_events *events, void *data) +{ + spa_hook_list_append(&conn->listener_list, listener, events, data); +} + +struct pw_websocket *pw_websocket_new(struct pw_loop *main_loop, struct spa_dict *props) +{ + struct pw_websocket *ws; + uint32_t i; + + if ((ws = calloc(1, sizeof(*ws))) == NULL) + return NULL; + + for (i = 0; props && i < props->n_items; i++) { + const char *k = props->items[i].key; + const char *s = props->items[i].value; + if (spa_streq(k, "local.ifname")) + ws->ifname = s ? strdup(s) : NULL; + if (spa_streq(k, "local.ifaddress")) + ws->ifaddress = s ? strdup(s) : NULL; + if (spa_streq(k, "http.user-agent")) + ws->user_agent = s ? strdup(s) : NULL; + if (spa_streq(k, "http.server-name")) + ws->server_name = s ? strdup(s) : NULL; + } + if (ws->user_agent == NULL) + ws->user_agent = spa_aprintf("PipeWire/%s", PACKAGE_VERSION); + if (ws->server_name == NULL) + ws->server_name = spa_aprintf("PipeWire/%s", PACKAGE_VERSION); + + ws->loop = main_loop; + spa_hook_list_init(&ws->listener_list); + + spa_list_init(&ws->connections); + spa_list_init(&ws->servers); + return ws; +} + +static void server_free(struct server *server) +{ + struct pw_websocket *ws = server->ws; + struct pw_websocket_connection *conn; + + pw_log_debug("%p: free server %p", ws, server); + + spa_list_remove(&server->link); + spa_list_consume(conn, &server->connections, link) + pw_websocket_connection_destroy(conn); + if (server->source) + pw_loop_destroy_source(ws->loop, server->source); + pw_free_strv(server->paths); + free(server); +} + +void pw_websocket_destroy(struct pw_websocket *ws) +{ + struct server *server; + struct pw_websocket_connection *conn; + + pw_log_info("destroy sebsocket %p", ws); + pw_websocket_emit_destroy(ws); + + spa_list_consume(server, &ws->servers, link) + server_free(server); + spa_list_consume(conn, &ws->connections, link) + pw_websocket_connection_destroy(conn); + + spa_hook_list_clean(&ws->listener_list); + free(ws->ifname); + free(ws->ifaddress); + free(ws->user_agent); + free(ws->server_name); + free(ws); +} + +void pw_websocket_add_listener(struct pw_websocket *ws, + struct spa_hook *listener, + const struct pw_websocket_events *events, void *data) +{ + spa_hook_list_append(&ws->listener_list, listener, events, data); +} + +static int update_io(struct pw_websocket_connection *conn, int io, bool active) +{ + if (conn->source) { + uint32_t mask = conn->source->mask; + SPA_FLAG_UPDATE(mask, io, active); + if (mask != conn->source->mask) + pw_loop_update_io(conn->ws->loop, conn->source, mask); + } + return 0; +} + +static int receiver_expect(struct pw_websocket_connection *conn, size_t wanted, + int (*have_data) (struct pw_websocket_connection *conn, + void *data, size_t size, size_t current)) +{ + pw_array_reset(&conn->data); + conn->data_wanted = wanted; + conn->data_cursor = 0; + conn->data_state = 0; + conn->have_data = have_data; + return update_io(conn, SPA_IO_IN, wanted); +} + +static int queue_message(struct pw_websocket_connection *conn, struct message *msg) +{ + spa_list_append(&conn->messages, &msg->link); + conn->need_flush = true; + return update_io(conn, SPA_IO_OUT, true); +} + +static int receive_websocket(struct pw_websocket_connection *conn, + void *data, size_t size, size_t current) +{ + uint8_t *d = data; + int need = 0, header = 0, i; + if (conn->data_state == 0) { + /* header done */ + conn->status = d[0] & 0xf; + if (d[1] & 0x80) + header =+ 4; + if ((d[1] & 0x7f) == 126) + header += 2; + else if ((d[1] & 0x7f) == 127) + header += 8; + else + need += d[1] & 0x7f; + conn->data_cursor = 2 + header; + need += header; + conn->data_state++; + } + else if (conn->data_state == 1) { + /* extra length and mask */ + size_t payload_len = 0; + if ((d[1] & 0x7f) == 126) + header = 2; + else if ((d[1] & 0x7f) == 127) + header = 8; + for (i = 0; i < header; i++) + payload_len = (payload_len << 8) | d[i + 2]; + need += payload_len; + conn->data_state++; + } + if (need == 0) { + uint8_t *payload = &d[conn->data_cursor]; + size_t i, payload_size = conn->data.size - conn->data_cursor; + struct iovec iov[1] = {{ payload, payload_size }}; + + if (d[1] & 0x80) { + uint8_t *mask = &d[conn->data_cursor - 4]; + for (i = 0; i < payload_size; i++) + payload[i] ^= mask[i & 3]; + } + + switch (conn->status) { + case PW_WEBSOCKET_OPCODE_PING: + pw_log_info("received ping"); + pw_websocket_connection_send(conn, PW_WEBSOCKET_OPCODE_PONG, iov, 1); + break; + case PW_WEBSOCKET_OPCODE_CLOSE: + pw_log_info("received close"); + pw_websocket_connection_send(conn, PW_WEBSOCKET_OPCODE_CLOSE, iov, 1); + pw_websocket_connection_disconnect(conn, true); + break; + default: + pw_log_debug("received message %02x", conn->status); + pw_websocket_connection_emit_message(conn, conn->status, + payload, payload_size); + } + receiver_expect(conn, 2, receive_websocket); + } + return need; +} + +static int connection_upgrade_failed(struct pw_websocket_connection *conn, + int status, const char *message) +{ + FILE *f; + size_t len; + struct message *msg; + + if ((f = open_memstream((char**)&msg, &len)) == NULL) + return -errno; + + fseek(f, offsetof(struct message, data), SEEK_SET); + fprintf(f, "HTTP/1.1 %d %s\r\n", status, message); + fprintf(f, "Transfer-Encoding: chunked\r\n"); + fprintf(f, "Content-Type: application/octet-stream\r\n"); + fprintf(f, "Server: %s\r\n", conn->ws->server_name); + fprintf(f, "\r\n"); + fclose(f); + + msg->len = len - offsetof(struct message, data); + pw_log_info("send error %d %s", status, message); + return queue_message(conn, msg); +} + +static void make_accept(const char *key, char *accept) +{ + static const char *str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + uint8_t tmp[24 + 36], sha1[20]; + memcpy(&tmp[ 0], key, 24); + memcpy(&tmp[24], str, 36); + sha1digest(sha1, NULL, tmp, sizeof(tmp)); + pw_base64_encode(sha1, sizeof(sha1), accept, '='); +} + +static int connection_upgraded_send(struct pw_websocket_connection *conn) +{ + FILE *f; + size_t len; + struct message *msg; + char accept[29]; + + if ((f = open_memstream((char**)&msg, &len)) == NULL) + return -errno; + + make_accept(conn->key, accept); + + fseek(f, offsetof(struct message, data), SEEK_SET); + fprintf(f, "HTTP/1.1 101 Switching Protocols\r\n"); + fprintf(f, "Upgrade: websocket\r\n"); + fprintf(f, "Connection: Upgrade\r\n"); + fprintf(f, "Sec-WebSocket-Accept: %s\r\n", accept); + fprintf(f, "\r\n"); + fclose(f); + + msg->len = len - offsetof(struct message, data); + pw_log_info("send upgrade %s", msg->data); + return queue_message(conn, msg); +} + +static int complete_upgrade(struct pw_websocket_connection *conn) +{ + pw_websocket_emit_connected(conn->ws, conn->user, conn, conn->path); + return receiver_expect(conn, 2, receive_websocket); +} + +static int header_key_val(char *buf, char **key, char **val) +{ + char *v; + *key = buf; + if ((v = strstr(buf, ":")) == NULL) + return -EPROTO; + *v++ = '\0'; + *val = pw_strip(v, " "); + return 0; +} + +static int receive_http_request(struct pw_websocket_connection *conn, + void *data, size_t size, size_t current) +{ + char *d = data, *l; + char c = d[current]; + int need = 1; + + if (conn->data_state == 0) { + if (c == '\n') { + int v1, v2; + d[current] = '\0'; + l = pw_strip(&d[conn->data_cursor], "\n\r "); + conn->data_cursor = current+1; + if (sscanf(l, "GET %ms HTTP/%d.%d", &conn->path, &v1, &v2) != 3) + return -EPROTO; + conn->data_state++; + } + } + else if (conn->data_state == 1) { + if (c == '\n') { + char *key, *val; + d[current] = '\0'; + l = pw_strip(&d[conn->data_cursor], "\n\r "); + if (strlen(l) > 0) { + conn->data_cursor = current+1; + if (header_key_val(l, &key, &val) < 0) + return -EPROTO; + if (spa_streq(key, "Sec-WebSocket-Key")) + strncpy(conn->key, val, sizeof(conn->key)-1); + } else { + conn->data_state++; + need = 0; + } + } + } + if (need == 0) { + if (conn->server && conn->server->paths && + pw_strv_find(conn->server->paths, conn->path) < 0) { + connection_upgrade_failed(conn, 404, "Not Found"); + } else { + connection_upgraded_send(conn); + complete_upgrade(conn); + } + } + return need; +} + +static struct message *find_pending(struct pw_websocket_connection *conn, uint32_t seq) +{ + struct message *msg; + spa_list_for_each(msg, &conn->pending, link) { + if (msg->seq == seq) + return msg; + } + return NULL; +} + +static int receive_http_reply(struct pw_websocket_connection *conn, + void *data, size_t size, size_t current) +{ + char *d = data, *l; + char c = d[current]; + int need = 1; + + if (conn->data_state == 0) { + if (c == '\n') { + int v1, v2, status, message; + /* status complete */ + d[current] = '\0'; + l = pw_strip(&d[conn->data_cursor], "\n\r "); + conn->data_cursor = current+1; + if (sscanf(l, "HTTP/%d.%d %n%d", &v1, &v2, &message, &status) != 3) + return -EPROTO; + conn->status = status; + strcpy(conn->message, &l[message]); + conn->content_length = 0; + conn->data_state++; + } + } + else if (conn->data_state == 1) { + if (c == '\n') { + /* header line complete */ + d[current] = '\0'; + l = pw_strip(&d[conn->data_cursor], "\n\r "); + conn->data_cursor = current+1; + if (strlen(l) > 0) { + char *key, *value; + if (header_key_val(l, &key, &value) < 0) + return -EPROTO; + if (spa_streq(key, "Sec-WebSocket-Accept")) { + char accept[29]; + make_accept(conn->key, accept); + if (!spa_streq(value, accept)) { + pw_log_error("got Accept:%s expected:%s", value, accept); + return -EPROTO; + } + } + else if (spa_streq(key, "Content-Length")) + conn->content_length = atoi(value); + } else { + conn->data_state++; + need = conn->content_length; + } + } + } + if (need == 0) { + /* message completed */ + uint32_t seq; + int res; + struct message *msg; + + seq = conn->recv_seq++; + + pw_log_info("received reply to request with seq:%" PRIu32, seq); + + if ((msg = find_pending(conn, seq)) != NULL) { + res = msg->reply(msg->user_data, conn->status); + spa_list_remove(&msg->link); + free(msg); + + if (res < 0) + pw_websocket_connection_emit_error(conn, res, conn->message); + } + } + return need; +} + +static int on_upgrade_reply(void *user_data, int status) +{ + struct pw_websocket_connection *conn = user_data; + if (status != 101) + return -EPROTO; + return complete_upgrade(conn); +} + +static int handle_connect(struct pw_websocket_connection *conn, int fd) +{ + int res = 0; + socklen_t res_len; + FILE *f; + size_t len; + struct message *msg; + uint8_t key[16]; + + len = sizeof(res); + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &res, &res_len) < 0) { + pw_log_error("getsockopt: %m"); + return -errno; + } + if (res != 0) + return -res; + + pw_log_info("connected to %s:%u", conn->name, conn->port); + + conn->connecting = false; + conn->status = 0; + + if ((f = open_memstream((char**)&msg, &len)) == NULL) + return -errno; + + fseek(f, offsetof(struct message, data), SEEK_SET); + + /* make a key */ + pw_random(key, sizeof(key)); + pw_base64_encode(key, sizeof(key), conn->key, '='); + + fprintf(f, "GET %s HTTP/1.1\r\n", conn->path); + fprintf(f, "Host: %s\r\n", conn->host); + fprintf(f, "Upgrade: websocket\r\n"); + fprintf(f, "Connection: Upgrade\r\n"); + fprintf(f, "Sec-WebSocket-Version: 13\r\n"); + fprintf(f, "Sec-WebSocket-Key: %s\r\n", conn->key); + fprintf(f, "Accept: */*\r\n"); + fprintf(f, "User-Agent: %s\r\n", conn->ws->user_agent); + fprintf(f, "\r\n"); + fclose(f); + + msg->len = len - offsetof(struct message, data); + msg->reply = on_upgrade_reply; + msg->user_data = conn; + msg->seq = conn->send_seq++; + + pw_log_info("%s", msg->data); + + receiver_expect(conn, 1, receive_http_reply); + + return queue_message(conn, msg); +} + +static int handle_input(struct pw_websocket_connection *conn) +{ + int res; + + while (conn->data.size < conn->data_wanted) { + size_t current = conn->data.size; + size_t pending = conn->data_wanted - current; + void *b; + + if (conn->source == NULL) + return -EPIPE; + + if ((res = pw_array_ensure_size(&conn->data, pending)) < 0) + return res; + b = SPA_PTROFF(conn->data.data, current, void); + + res = read(conn->source->fd, b, pending); + if (res == 0) + return 0; + if (res < 0) { + res = -errno; + if (res == -EINTR) + continue; + if (res != -EAGAIN && res != -EWOULDBLOCK) + return res; + return -EAGAIN; + } + conn->data.size += res; + if (conn->data.size == conn->data_wanted) { + if ((res = conn->have_data(conn, + conn->data.data, + conn->data.size, + current)) < 0) + return res; + + conn->data_wanted += res; + } + } + return 0; +} + +static int flush_output(struct pw_websocket_connection *conn) +{ + int res; + + conn->need_flush = false; + + if (conn->source == NULL) + return -EPIPE; + + while (true) { + struct message *msg; + void *data; + size_t size; + + if (spa_list_is_empty(&conn->messages)) { + if (conn->draining) + pw_websocket_connection_disconnect(conn, false); + break; + } + msg = spa_list_first(&conn->messages, struct message, link); + + if (msg->offset < msg->len) { + data = SPA_PTROFF(msg->data, msg->offset, void); + size = msg->len - msg->offset; + } else { + spa_list_remove(&msg->link); + if (msg->reply != NULL) + spa_list_append(&conn->pending, &msg->link); + else + free(msg); + continue; + } + + while (true) { + res = send(conn->source->fd, data, size, MSG_NOSIGNAL | MSG_DONTWAIT); + if (res < 0) { + res = -errno; + if (res == -EINTR) + continue; + if (res != -EAGAIN && res != -EWOULDBLOCK) + pw_log_warn("conn %p: send %zu, error %d: %m", + conn, size, res); + return res; + } + msg->offset += res; + break; + } + } + return 0; +} + +static void +on_source_io(void *data, int fd, uint32_t mask) +{ + struct pw_websocket_connection *conn = data; + int res; + + conn->refcount++; + + if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { + res = -EPIPE; + goto error; + } + if (mask & SPA_IO_IN) { + if ((res = handle_input(conn)) != -EAGAIN) + goto error; + } + if (mask & SPA_IO_OUT || conn->need_flush) { + if (conn->connecting) { + if ((res = handle_connect(conn, fd)) < 0) + goto error; + } + res = flush_output(conn); + if (res >= 0) { + if (conn->source) + pw_loop_update_io(conn->ws->loop, conn->source, + conn->source->mask & ~SPA_IO_OUT); + } else if (res != -EAGAIN) + goto error; + } +done: + websocket_connection_unref(conn); + return; +error: + if (res < 0) { + pw_log_error("%p: %s got connection error %d (%s)", conn, + conn->name, res, spa_strerror(res)); + snprintf(conn->message, sizeof(conn->message), "%s", spa_strerror(res)); + pw_websocket_connection_emit_error(conn, res, conn->message); + } else { + pw_log_info("%p: %s connection closed", conn, conn->name); + } + pw_websocket_connection_disconnect(conn, false); + goto done; +} + +int pw_websocket_connection_address(struct pw_websocket_connection *conn, + struct sockaddr *addr, socklen_t addr_len) +{ + memcpy(addr, &conn->addr, SPA_MIN(addr_len, sizeof(conn->addr))); + return 0; +} + +static struct pw_websocket_connection *connection_new(struct pw_websocket *ws, void *user, + struct sockaddr *addr, socklen_t addr_len, int fd, struct server *server) +{ + struct pw_websocket_connection *conn; + + if ((conn = calloc(1, sizeof(*conn))) == NULL) + goto error; + + if ((conn->source = pw_loop_add_io(ws->loop, spa_steal_fd(fd), + SPA_IO_ERR | SPA_IO_HUP | SPA_IO_OUT, + true, on_source_io, conn)) == NULL) + goto error; + + memcpy(&conn->addr, addr, SPA_MIN(addr_len, sizeof(conn->addr))); + conn->ws = ws; + conn->server = server; + conn->user = user; + if (server) + spa_list_append(&server->connections, &conn->link); + else + spa_list_append(&ws->connections, &conn->link); + + conn->refcount = 1; + if (pw_net_get_ip(&conn->addr, conn->name, sizeof(conn->name), + &conn->ipv4, &conn->port) < 0) + snprintf(conn->name, sizeof(conn->name), "connection %p", conn); + + spa_list_init(&conn->messages); + spa_list_init(&conn->pending); + spa_hook_list_init(&conn->listener_list); + pw_array_init(&conn->data, 4096); + + pw_log_debug("new websocket %p connection %p %s:%u", ws, + conn, conn->name, conn->port); + + return conn; +error: + if (fd != -1) + close(fd); + free(conn); + return NULL; +} + +static int make_tcp_socket(struct server *server, const char *name, uint16_t port, const char *ifname, + const char *ifaddress) +{ + struct sockaddr_storage addr; + int res, on; + socklen_t len = 0; + spa_autoclose int fd = -1; + + if ((res = pw_net_parse_address_port(name, ifaddress, port, &addr, &len)) < 0) { + pw_log_error("%p: can't parse address %s: %s", server, + name, spa_strerror(res)); + goto error; + } + + if ((fd = socket(addr.ss_family, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)) < 0) { + res = -errno; + pw_log_error("%p: socket() failed: %m", server); + goto error; + } +#ifdef SO_BINDTODEVICE + if (ifname && setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, ifname, strlen(ifname)) < 0) { + res = -errno; + pw_log_error("%p: setsockopt(SO_BINDTODEVICE) failed: %m", server); + goto error; + } +#endif + on = 1; + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void *) &on, sizeof(on)) < 0) + pw_log_warn("%p: setsockopt(): %m", server); + + if (bind(fd, (struct sockaddr *) &addr, len) < 0) { + res = -errno; + pw_log_error("%p: bind() failed: %m", server); + goto error; + } + if (listen(fd, 5) < 0) { + res = -errno; + pw_log_error("%p: listen() failed: %m", server); + goto error; + } + if (getsockname(fd, (struct sockaddr *)&addr, &len) < 0) { + res = -errno; + pw_log_error("%p: getsockname() failed: %m", server); + goto error; + } + + server->addr = addr; + + return spa_steal_fd(fd); +error: + return res; +} + +static void +on_server_connect(void *data, int fd, uint32_t mask) +{ + struct server *server = data; + struct pw_websocket *ws = server->ws; + struct sockaddr_storage addr; + socklen_t addrlen; + spa_autoclose int conn_fd = -1; + int val; + struct pw_websocket_connection *conn = NULL; + + addrlen = sizeof(addr); + if ((conn_fd = accept4(fd, (struct sockaddr*)&addr, &addrlen, + SOCK_NONBLOCK | SOCK_CLOEXEC)) < 0) + goto error; + + if (server->n_connections >= MAX_CONNECTIONS) + goto error_refused; + + if ((conn = connection_new(ws, server->user, (struct sockaddr*)&addr, sizeof(addr), + spa_steal_fd(conn_fd), server)) == NULL) + goto error; + + server->n_connections++; + + pw_log_info("%p: connection:%p %s:%u connected", ws, + conn, conn->name, conn->port); + + val = 1; + if (setsockopt(conn->source->fd, IPPROTO_TCP, TCP_NODELAY, + (const void *) &val, sizeof(val)) < 0) + pw_log_warn("TCP_NODELAY failed: %m"); + + val = IPTOS_LOWDELAY; + if (setsockopt(conn->source->fd, IPPROTO_IP, IP_TOS, + (const void *) &val, sizeof(val)) < 0) + pw_log_warn("IP_TOS failed: %m"); + + receiver_expect(conn, 1, receive_http_request); + return; + +error_refused: + errno = ECONNREFUSED; +error: + pw_log_error("%p: failed to create connection: %m", ws); + return; +} + +int pw_websocket_listen(struct pw_websocket *ws, void *user, + const char *hostname, const char *service, const char *paths) +{ + int res; + struct server *server; + uint16_t port = atoi(service); + + if ((server = calloc(1, sizeof(struct server))) == NULL) + return -errno; + + server->ws = ws; + spa_list_append(&ws->servers, &server->link); + + server->user = user; + spa_list_init(&server->connections); + + if ((res = make_tcp_socket(server, hostname, port, ws->ifname, ws->ifaddress)) < 0) + goto error; + + if ((server->source = pw_loop_add_io(ws->loop, res, SPA_IO_IN, + true, on_server_connect, server)) == NULL) { + res = -errno; + goto error; + } + if (paths) + server->paths = pw_strv_parse(paths, strlen(paths), INT_MAX, NULL); + + pw_log_info("%p: listen %s:%u %s", ws, hostname, port, paths); + return 0; +error: + pw_log_error("%p: can't create server: %s", ws, spa_strerror(res)); + server_free(server); + return res; +} + +int pw_websocket_cancel(struct pw_websocket *ws, void *user) +{ + struct server *s, *ts; + struct pw_websocket_connection *c, *tc; + int count = 0; + + spa_list_for_each_safe(s, ts, &ws->servers, link) { + if (s->user == user) { + server_free(s); + count++; + } + } + spa_list_for_each_safe(c, tc, &ws->connections, link) { + if (c->user == user) { + pw_websocket_connection_destroy(c); + count++; + } + } + return count; +} + +int pw_websocket_connect(struct pw_websocket *ws, void *user, + const char *hostname, const char *service, const char *path) +{ + struct addrinfo hints; + struct addrinfo *result, *rp; + int res, fd; + struct pw_websocket_connection *conn = NULL; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = 0; + hints.ai_protocol = 0; + + if ((res = getaddrinfo(hostname, service, &hints, &result)) != 0) { + pw_log_error("getaddrinfo: %s", gai_strerror(res)); + return -EINVAL; + } + res = -ENOENT; + for (rp = result; rp != NULL; rp = rp->ai_next) { + if ((fd = socket(rp->ai_family, + rp->ai_socktype | SOCK_CLOEXEC | SOCK_NONBLOCK, + rp->ai_protocol)) == -1) + continue; + + res = connect(fd, rp->ai_addr, rp->ai_addrlen); + if (res == 0 || (res < 0 && errno == EINPROGRESS)) + break; + + res = -errno; + close(fd); + } + if (rp == NULL) { + pw_log_error("Could not connect to %s:%s: %s", hostname, service, + spa_strerror(res)); + } else { + if ((conn = connection_new(ws, user, rp->ai_addr, rp->ai_addrlen, fd, NULL)) == NULL) + res = -errno; + } + freeaddrinfo(result); + if (conn == NULL) + return res; + + conn->connecting = true; + conn->maskbit = 0x80; + conn->path = strdup(path); + asprintf(&conn->host, "%s:%s", hostname, service); + + pw_log_info("%p: connecting to %s:%u path:%s", conn, + conn->name, conn->port, path); + return 0; +} + +int pw_websocket_connection_send(struct pw_websocket_connection *conn, uint8_t opcode, + const struct iovec *iov, size_t iov_len) +{ + struct message *msg; + size_t len = 2, i, j, k; + uint8_t *d, *mask = NULL, maskbit = conn->maskbit; + size_t payload_length = 0; + + for (i = 0; i < iov_len; i++) + payload_length += iov[i].iov_len; + + if ((msg = calloc(1, sizeof(*msg) + 14 + payload_length)) == NULL) + return -errno; + + d = msg->data; + d[0] = 0x80 | opcode; + + if (payload_length < 126) + k = 0; + else if (payload_length < 65536) + k = 2; + else + k = 8; + + d[1] = maskbit | (k == 0 ? payload_length : (k == 2 ? 126 : 127)); + for (i = 0, j = (k-1)*8 ; i < k; i++, j -= 8) + d[len++] = (payload_length >> j) & 0xff; + + if (maskbit) { + mask = &d[len]; + pw_random(mask, 4); + len += 4; + } + for (i = 0, k = 0; i < iov_len; i++) { + if (maskbit) + for (j = 0; j < iov[i].iov_len; j++, k++) + d[len+j] = ((uint8_t*)iov[i].iov_base)[j] ^ mask[k & 3]; + else + memcpy(&d[len], iov[i].iov_base, iov[i].iov_len); + + len += iov[i].iov_len; + } + msg->len = len; + + return queue_message(conn, msg); +} + +int pw_websocket_connection_send_text(struct pw_websocket_connection *conn, + const char *payload, size_t payload_len) +{ + struct iovec iov[1] = {{ (void*)payload, payload_len }}; + pw_log_info("send text %.*s", (int)payload_len, payload); + return pw_websocket_connection_send(conn, PW_WEBSOCKET_OPCODE_TEXT, iov, 1); +} diff --git a/src/modules/module-sendspin/websocket.h b/src/modules/module-sendspin/websocket.h new file mode 100644 index 000000000..0f0da36e7 --- /dev/null +++ b/src/modules/module-sendspin/websocket.h @@ -0,0 +1,85 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2026 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#ifndef PIPEWIRE_WEBSOCKET_H +#define PIPEWIRE_WEBSOCKET_H + +#include + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +struct pw_websocket; +struct pw_websocket_connection; + +#define PW_WEBSOCKET_OPCODE_TEXT 0x1 +#define PW_WEBSOCKET_OPCODE_BINARY 0x2 +#define PW_WEBSOCKET_OPCODE_CLOSE 0x8 +#define PW_WEBSOCKET_OPCODE_PING 0x9 +#define PW_WEBSOCKET_OPCODE_PONG 0xa + +struct pw_websocket_connection_events { +#define PW_VERSION_WEBSOCKET_CONNECTION_EVENTS 0 + uint32_t version; + + void (*destroy) (void *data); + void (*error) (void *data, int res, const char *reason); + void (*disconnected) (void *data); + + void (*message) (void *data, + int opcode, void *payload, size_t size); +}; + +void pw_websocket_connection_add_listener(struct pw_websocket_connection *conn, + struct spa_hook *listener, + const struct pw_websocket_connection_events *events, void *data); + +void pw_websocket_connection_destroy(struct pw_websocket_connection *conn); +void pw_websocket_connection_disconnect(struct pw_websocket_connection *conn, bool drain); + +int pw_websocket_connection_address(struct pw_websocket_connection *conn, + struct sockaddr *addr, socklen_t addr_len); + +int pw_websocket_connection_send(struct pw_websocket_connection *conn, + uint8_t opcode, const struct iovec *iov, size_t iov_len); + +int pw_websocket_connection_send_text(struct pw_websocket_connection *conn, + const char *payload, size_t payload_len); + + +struct pw_websocket_events { +#define PW_VERSION_WEBSOCKET_EVENTS 0 + uint32_t version; + + void (*destroy) (void *data); + + void (*connected) (void *data, void *user, + struct pw_websocket_connection *conn, const char *path); +}; + +struct pw_websocket * pw_websocket_new(struct pw_loop *main_loop, + struct spa_dict *props); + +void pw_websocket_destroy(struct pw_websocket *ws); + +void pw_websocket_add_listener(struct pw_websocket *ws, + struct spa_hook *listener, + const struct pw_websocket_events *events, void *data); + +int pw_websocket_connect(struct pw_websocket *ws, void *user, + const char *hostname, const char *service, const char *path); + +int pw_websocket_listen(struct pw_websocket *ws, void *user, + const char *hostname, const char *service, const char *paths); + +int pw_websocket_cancel(struct pw_websocket *ws, void *user); + +#ifdef __cplusplus +} +#endif + +#endif /* PIPEWIRE_WEBSOCKET_H */ diff --git a/src/modules/module-sendspin/zeroconf.c b/src/modules/module-sendspin/zeroconf.c new file mode 100644 index 000000000..fb1c5a221 --- /dev/null +++ b/src/modules/module-sendspin/zeroconf.c @@ -0,0 +1,558 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2021 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#include +#include +#include +#include +#include +#include +#include + +#include "config.h" + +#include +#include + +#include +#include +#include +#include + +#include "../module-zeroconf-discover/avahi-poll.h" + +#include "zeroconf.h" + +#define pw_zeroconf_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_zeroconf_events, m, v, ##__VA_ARGS__) +#define pw_zeroconf_emit_destroy(c) pw_zeroconf_emit(c, destroy, 0) +#define pw_zeroconf_emit_error(c,e,m) pw_zeroconf_emit(c, error, 0, e, m) +#define pw_zeroconf_emit_added(c,id,i) pw_zeroconf_emit(c, added, 0, id, i) +#define pw_zeroconf_emit_removed(c,id,i) pw_zeroconf_emit(c, removed, 0, id, i) + +struct service_info { + AvahiIfIndex interface; + AvahiProtocol protocol; + const char *name; + const char *type; + const char *domain; + const char *host_name; + AvahiAddress address; + uint16_t port; +}; + +#define SERVICE_INFO(...) ((struct service_info){ __VA_ARGS__ }) + +struct entry { + struct pw_zeroconf *zc; + struct spa_list link; + +#define TYPE_ANNOUNCE 0 +#define TYPE_BROWSE 1 + uint32_t type; + void *user; + + struct pw_properties *props; + + AvahiEntryGroup *group; + AvahiServiceBrowser *browser; + + struct spa_list services; +}; + +struct service { + struct entry *e; + struct spa_list link; + + struct service_info info; + + struct pw_properties *props; +}; + +struct pw_zeroconf { + int refcount; + struct pw_context *context; + + struct pw_properties *props; + + struct spa_hook_list listener_list; + + AvahiPoll *poll; + AvahiClient *client; + AvahiClientState state; + + struct spa_list entries; + + bool discover_local; +}; + +static struct service *service_find(struct entry *e, const struct service_info *info) +{ + struct service *s; + spa_list_for_each(s, &e->services, link) { + if (s->info.interface == info->interface && + s->info.protocol == info->protocol && + spa_streq(s->info.name, info->name) && + spa_streq(s->info.type, info->type) && + spa_streq(s->info.domain, info->domain)) + return s; + } + return NULL; +} + +static void service_free(struct service *s) +{ + spa_list_remove(&s->link); + free((void*)s->info.name); + free((void*)s->info.type); + free((void*)s->info.domain); + free((void*)s->info.host_name); + pw_properties_free(s->props); + free(s); +} + +struct entry *entry_find(struct pw_zeroconf *zc, uint32_t type, void *user) +{ + struct entry *e; + spa_list_for_each(e, &zc->entries, link) + if (e->type == type && e->user == user) + return e; + return NULL; +} + +static void entry_free(struct entry *e) +{ + struct service *s; + + spa_list_remove(&e->link); + if (e->group) + avahi_entry_group_free(e->group); + spa_list_consume(s, &e->services, link) + service_free(s); + pw_properties_free(e->props); + free(e); +} + +static void zeroconf_free(struct pw_zeroconf *zc) +{ + struct entry *a; + + spa_list_consume(a, &zc->entries, link) + entry_free(a); + + if (zc->client) + avahi_client_free(zc->client); + if (zc->poll) + pw_avahi_poll_free(zc->poll); + pw_properties_free(zc->props); + free(zc); +} + +static void zeroconf_unref(struct pw_zeroconf *zc) +{ + if (--zc->refcount == 0) + zeroconf_free(zc); +} + +void pw_zeroconf_destroy(struct pw_zeroconf *zc) +{ + pw_zeroconf_emit_destroy(zc); + + zeroconf_unref(zc); +} + +static struct service *service_new(struct entry *e, + const struct service_info *info, AvahiStringList *txt) +{ + struct service *s; + struct pw_zeroconf *zc = e->zc; + const AvahiAddress *a = &info->address; + static const char *link_local_range = "169.254."; + AvahiStringList *l; + char at[AVAHI_ADDRESS_STR_MAX], if_suffix[16] = ""; + + if ((s = calloc(1, sizeof(*s))) == NULL) + goto error; + + s->e = e; + spa_list_append(&e->services, &s->link); + + s->info.interface = info->interface; + s->info.protocol = info->protocol; + s->info.name = strdup(info->name); + s->info.type = strdup(info->type); + s->info.domain = strdup(info->domain); + s->info.host_name = strdup(info->host_name); + s->info.address = info->address; + s->info.port = info->port; + + if ((s->props = pw_properties_new(NULL, NULL)) == NULL) + goto error; + + if (a->proto == AVAHI_PROTO_INET6 && + a->data.ipv6.address[0] == 0xfe && + (a->data.ipv6.address[1] & 0xc0) == 0x80) + snprintf(if_suffix, sizeof(if_suffix), "%%%d", info->interface); + + avahi_address_snprint(at, sizeof(at), a); + if (a->proto == AVAHI_PROTO_INET && + spa_strstartswith(at, link_local_range)) + snprintf(if_suffix, sizeof(if_suffix), "%%%d", info->interface); + + pw_properties_setf(s->props, "zeroconf.ifindex", "%d", info->interface); + pw_properties_setf(s->props, "zeroconf.name", "%s", info->name); + pw_properties_setf(s->props, "zeroconf.type", "%s", info->type); + pw_properties_setf(s->props, "zeroconf.domain", "%s", info->domain); + pw_properties_setf(s->props, "zeroconf.hostname", "%s", info->host_name); + pw_properties_setf(s->props, "zeroconf.address", "%s%s", at, if_suffix); + pw_properties_setf(s->props, "zeroconf.port", "%u", info->port); + + for (l = txt; l; l = l->next) { + char *key, *value; + + if (avahi_string_list_get_pair(l, &key, &value, NULL) != 0) + break; + + pw_properties_set(s->props, key, value); + avahi_free(key); + avahi_free(value); + } + + pw_log_info("new %s %s %s %s", info->name, info->type, info->domain, info->host_name); + pw_zeroconf_emit_added(zc, e->user, &s->props->dict); + + return s; + +error: + if (s) + service_free(s); + return NULL; +} + +static void resolver_cb(AvahiServiceResolver *r, AvahiIfIndex interface, + AvahiProtocol protocol, AvahiResolverEvent event, + const char *name, const char *type, const char *domain, + const char *host_name, const AvahiAddress *a, uint16_t port, + AvahiStringList *txt, AvahiLookupResultFlags flags, + void *userdata) +{ + struct entry *e = userdata; + struct pw_zeroconf *zc = e->zc; + struct service_info info; + + if (event != AVAHI_RESOLVER_FOUND) { + pw_log_error("Resolving of '%s' failed: %s", name, + avahi_strerror(avahi_client_errno(zc->client))); + goto done; + } + + info = SERVICE_INFO(.interface = interface, + .protocol = protocol, + .name = name, + .type = type, + .domain = domain, + .host_name = host_name, + .address = *a, + .port = port); + + service_new(e, &info, txt); +done: + avahi_service_resolver_free(r); +} + +static void browser_cb(AvahiServiceBrowser *b, AvahiIfIndex interface, AvahiProtocol protocol, + AvahiBrowserEvent event, const char *name, const char *type, const char *domain, + AvahiLookupResultFlags flags, void *userdata) +{ + struct entry *e = userdata; + struct pw_zeroconf *zc = e->zc; + struct service_info info; + struct service *s; + + if ((flags & AVAHI_LOOKUP_RESULT_LOCAL) && !zc->discover_local) + return; + + info = SERVICE_INFO(.interface = interface, + .protocol = protocol, + .name = name, + .type = type, + .domain = domain); + + s = service_find(e, &info); + + switch (event) { + case AVAHI_BROWSER_NEW: + if (s != NULL) + return; + if (!(avahi_service_resolver_new(zc->client, + interface, protocol, + name, type, domain, + AVAHI_PROTO_UNSPEC, 0, + resolver_cb, e))) { + int res = avahi_client_errno(zc->client); + pw_log_error("can't make service resolver: %s", avahi_strerror(res)); + pw_zeroconf_emit_error(zc, res, avahi_strerror(res)); + } + break; + case AVAHI_BROWSER_REMOVE: + if (s == NULL) + return; + pw_log_info("removed %s %s %s", name, type, domain); + pw_zeroconf_emit_removed(zc, e->user, &s->props->dict); + service_free(s); + break; + default: + break; + } +} + +static int do_browse(struct pw_zeroconf *zc, struct entry *e) +{ + const struct spa_dict_item *it; + const char *service_name = NULL; + int res; + + if (e->browser == NULL) { + spa_dict_for_each(it, &e->props->dict) { + if (spa_streq(it->key, "zeroconf.service")) + service_name = it->value; + } + if (service_name == NULL) { + res = -EINVAL; + pw_log_error("no service provided"); + pw_zeroconf_emit_error(zc, res, spa_strerror(res)); + return res; + } + e->browser = avahi_service_browser_new(zc->client, + AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC, + service_name, NULL, 0, + browser_cb, e); + if (e->browser == NULL) { + res = avahi_client_errno(zc->client); + pw_log_error("can't make browser: %s", avahi_strerror(res)); + pw_zeroconf_emit_error(zc, res, avahi_strerror(res)); + return -EIO; + } + } + return 0; +} + +static void entry_group_callback(AvahiEntryGroup *g, AvahiEntryGroupState state, void *userdata) +{ + struct entry *e = userdata; + struct pw_zeroconf *zc = e->zc; + int res; + + zc->refcount++; + + switch (state) { + case AVAHI_ENTRY_GROUP_ESTABLISHED: + pw_log_info("Service successfully established"); + break; + case AVAHI_ENTRY_GROUP_COLLISION: + pw_log_error("Service name collision"); + break; + case AVAHI_ENTRY_GROUP_FAILURE: + res = avahi_client_errno(zc->client); + pw_log_error("Entry group failure: %s", avahi_strerror(res)); + pw_zeroconf_emit_error(zc, res, avahi_strerror(res)); + break; + case AVAHI_ENTRY_GROUP_UNCOMMITED: + case AVAHI_ENTRY_GROUP_REGISTERING: + break; + } + zeroconf_unref(zc); +} + +static int do_announce(struct pw_zeroconf *zc, struct entry *e) +{ + AvahiStringList *txt = NULL; + int res; + const struct spa_dict_item *it; + const char *session_name = "unnamed", *service = NULL; + uint16_t port = 0; + + if (e->group == NULL) { + e->group = avahi_entry_group_new(zc->client, + entry_group_callback, e); + if (e->group == NULL) { + res = avahi_client_errno(zc->client); + pw_log_error("can't make group: %s", avahi_strerror(res)); + pw_zeroconf_emit_error(zc, res, avahi_strerror(res)); + return -EIO; + } + } + avahi_entry_group_reset(e->group); + + spa_dict_for_each(it, &e->props->dict) { + if (spa_streq(it->key, "zeroconf.session")) + session_name = it->value; + else if (spa_streq(it->key, "zeroconf.port")) + port = atoi(it->value); + else if (spa_streq(it->key, "zeroconf.service")) + service = it->value; + else + txt = avahi_string_list_add_pair(txt, it->key, it->value); + } + if (service == NULL) { + res = -EINVAL; + pw_log_error("no service provided"); + pw_zeroconf_emit_error(zc, res, spa_strerror(res)); + return res; + } + res = avahi_entry_group_add_service_strlst(e->group, + AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC, + (AvahiPublishFlags)0, session_name, + service, NULL, NULL, port, txt); + avahi_string_list_free(txt); + + if (res < 0) { + res = avahi_client_errno(zc->client); + pw_log_error("can't add service: %s", avahi_strerror(res)); + pw_zeroconf_emit_error(zc, res, avahi_strerror(res)); + return -EIO; + } + if ((res = avahi_entry_group_commit(e->group)) < 0) { + res = avahi_client_errno(zc->client); + pw_log_error("can't commit group: %s", avahi_strerror(res)); + pw_zeroconf_emit_error(zc, res, avahi_strerror(res)); + return -EIO; + } + return 0; +} + +static int entry_start(struct pw_zeroconf *zc, struct entry *e) +{ + if (zc->state != AVAHI_CLIENT_S_REGISTERING && + zc->state != AVAHI_CLIENT_S_RUNNING && + zc->state != AVAHI_CLIENT_S_COLLISION) + return 0; + + if (e->type == TYPE_ANNOUNCE) + return do_announce(zc, e); + else + return do_browse(zc, e); +} + +static void client_callback(AvahiClient *c, AvahiClientState state, void *d) +{ + struct pw_zeroconf *zc = d; + struct entry *e; + + zc->client = c; + zc->refcount++; + zc->state = state; + + switch (state) { + case AVAHI_CLIENT_S_REGISTERING: + case AVAHI_CLIENT_S_RUNNING: + case AVAHI_CLIENT_S_COLLISION: + spa_list_for_each(e, &zc->entries, link) + entry_start(zc, e); + break; + case AVAHI_CLIENT_FAILURE: + { + int err = avahi_client_errno(c); + pw_zeroconf_emit_error(zc, err, avahi_strerror(err)); + break; + } + case AVAHI_CLIENT_CONNECTING: + default: + break; + } + zeroconf_unref(zc); +} + +static struct entry *entry_new(struct pw_zeroconf *zc, uint32_t type, void *user, const struct spa_dict *info) +{ + struct entry *e; + + if ((e = calloc(1, sizeof(*e))) == NULL) + return NULL; + + e->zc = zc; + e->type = type; + e->user = user; + e->props = pw_properties_new_dict(info); + spa_list_append(&zc->entries, &e->link); + spa_list_init(&e->services); + pw_log_info("created %s", type == TYPE_ANNOUNCE ? "announce" : "browse"); + return e; +} + +static int set_entry(struct pw_zeroconf *zc, uint32_t type, void *user, const struct spa_dict *info) +{ + struct entry *e; + + e = entry_find(zc, type, user); + if (e == NULL) { + if (info == NULL) + return 0; + e = entry_new(zc, type, user, info); + entry_start(zc, e); + } else { + if (info == NULL) + entry_free(e); + else { + pw_properties_update(e->props, info); + entry_start(zc, e); + } + } + return 0; +} +int pw_zeroconf_set_announce(struct pw_zeroconf *zc, void *user, const struct spa_dict *info) +{ + return set_entry(zc, TYPE_ANNOUNCE, user, info); +} +int pw_zeroconf_set_browse(struct pw_zeroconf *zc, void *user, const struct spa_dict *info) +{ + return set_entry(zc, TYPE_BROWSE, user, info); +} + +struct pw_zeroconf * pw_zeroconf_new(struct pw_context *context, + struct spa_dict *props) +{ + struct pw_zeroconf *zc; + uint32_t i; + int res; + + if ((zc = calloc(1, sizeof(*zc))) == NULL) + return NULL; + + zc->refcount = 1; + zc->context = context; + spa_hook_list_init(&zc->listener_list); + spa_list_init(&zc->entries); + zc->props = props ? pw_properties_new_dict(props) : pw_properties_new(NULL, NULL); + zc->discover_local = true; + + for (i = 0; props && i < props->n_items; i++) { + const char *k = props->items[i].key; + const char *v = props->items[i].value; + + if (spa_streq(k, "zeroconf.disable-local")) + zc->discover_local = spa_atob(v); + } + + zc->poll = pw_avahi_poll_new(context); + if (zc->poll == NULL) + goto error; + + zc->client = avahi_client_new(zc->poll, AVAHI_CLIENT_NO_FAIL, + client_callback, zc, &res); + if (!zc->client) { + pw_log_error("failed to create avahi client: %s", avahi_strerror(res)); + goto error; + } + return zc; +error: + zeroconf_free(zc); + return NULL; +} + +void pw_zeroconf_add_listener(struct pw_zeroconf *zc, + struct spa_hook *listener, + const struct pw_zeroconf_events *events, void *data) +{ + spa_hook_list_append(&zc->listener_list, listener, events, data); +} diff --git a/src/modules/module-sendspin/zeroconf.h b/src/modules/module-sendspin/zeroconf.h new file mode 100644 index 000000000..a5de62787 --- /dev/null +++ b/src/modules/module-sendspin/zeroconf.h @@ -0,0 +1,45 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2026 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#ifndef PIPEWIRE_ZEROCONF_H +#define PIPEWIRE_ZEROCONF_H + +#include + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +struct pw_zeroconf; + +struct pw_zeroconf_events { +#define PW_VERSION_ZEROCONF_EVENTS 0 + uint32_t version; + + void (*destroy) (void *data); + void (*error) (void *data, int err, const char *message); + + void (*added) (void *data, void *user, const struct spa_dict *info); + void (*removed) (void *data, void *user, const struct spa_dict *info); +}; + +struct pw_zeroconf * pw_zeroconf_new(struct pw_context *context, + struct spa_dict *props); + +void pw_zeroconf_destroy(struct pw_zeroconf *zc); + +int pw_zeroconf_set_announce(struct pw_zeroconf *zc, void *user, const struct spa_dict *info); +int pw_zeroconf_set_browse(struct pw_zeroconf *zc, void *user, const struct spa_dict *info); + +void pw_zeroconf_add_listener(struct pw_zeroconf *zc, + struct spa_hook *listener, + const struct pw_zeroconf_events *events, void *data); + +#ifdef __cplusplus +} +#endif + +#endif /* PIPEWIRE_ZEROCONF_H */