diff --git a/spa/include/spa/utils/json-builder.h b/spa/include/spa/utils/json-builder.h index 883931bcd..6a4775c60 100644 --- a/spa/include/spa/utils/json-builder.h +++ b/spa/include/spa/utils/json-builder.h @@ -262,7 +262,6 @@ SPA_API_JSON_BUILDER void spa_json_builder_object_uint(struct spa_json_builder * snprintf(str, sizeof(str), "%" PRIu64, val); spa_json_builder_add_simple(b, key, INT_MAX, 'd', str, INT_MAX); } - SPA_API_JSON_BUILDER void spa_json_builder_object_double(struct spa_json_builder *b, const char *key, double val) { @@ -270,7 +269,6 @@ SPA_API_JSON_BUILDER void spa_json_builder_object_double(struct spa_json_builder spa_json_format_float(str, sizeof(str), (float)val); spa_json_builder_add_simple(b, key, INT_MAX, 'd', str, INT_MAX); } - SPA_API_JSON_BUILDER void spa_json_builder_object_string(struct spa_json_builder *b, const char *key, const char *val) { diff --git a/src/modules/meson.build b/src/modules/meson.build index 8636286e6..5a024be1a 100644 --- a/src/modules/meson.build +++ b/src/modules/meson.build @@ -700,6 +700,38 @@ pipewire_module_vban_recv = shared_library('pipewire-module-vban-recv', dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep], ) + pipewire_module_sendspin_sources = [] + pipewire_module_sendspin_deps = [ mathlib, dl_lib, rt_lib, pipewire_dep ] + +if avahi_dep.found() + pipewire_module_sendspin_sources += [ + 'module-sendspin/zeroconf.c', + 'module-zeroconf-discover/avahi-poll.c', + ] + pipewire_module_sendspin_deps += avahi_dep +endif + +pipewire_module_sendspin_recv = shared_library('pipewire-module-sendspin-recv', + [ 'module-sendspin-recv.c', + 'module-sendspin/websocket.c', + pipewire_module_sendspin_sources ], + include_directories : [configinc], + install : true, + install_dir : modules_install_dir, + install_rpath: modules_install_dir, + dependencies : pipewire_module_sendspin_deps, +) +pipewire_module_sendspin_send = shared_library('pipewire-module-sendspin-send', + [ 'module-sendspin-send.c', + 'module-sendspin/websocket.c', + pipewire_module_sendspin_sources ], + include_directories : [configinc], + install : true, + install_dir : modules_install_dir, + install_rpath: modules_install_dir, + dependencies : pipewire_module_sendspin_deps, +) + build_module_roc = roc_dep.found() if build_module_roc pipewire_module_roc_sink = shared_library('pipewire-module-roc-sink', diff --git a/src/modules/module-sendspin-recv.c b/src/modules/module-sendspin-recv.c new file mode 100644 index 000000000..7e71a1397 --- /dev/null +++ b/src/modules/module-sendspin-recv.c @@ -0,0 +1,1189 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2026 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#include "config.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "module-sendspin/sendspin.h" +#include "module-sendspin/zeroconf.h" +#include "module-sendspin/websocket.h" +#include "module-sendspin/regress.h" +#include "network-utils.h" + +/** \page page_module_sendspin_recv sendspin receiver + * + * The `sendspin-recv` module creates a PipeWire source that receives audio + * packets using the sendspin protocol. + * + * The receive will listen on a specific port (8928) and create a stream for the + * data on the port. + * + * ## Module Name + * + * `libpipewire-module-sendspin-recv` + * + * ## Module Options + * + * Options specific to the behavior of this module + * + * - `local.ifname = `: interface name to use + * - `source.ip = `: the source ip address to listen on, default 127.0.0.1 + * - `source.port = `: the source port to listen on, default 8928 + * - `source.path = `: the path to listen on, default "/sendspin" + * - `sendspin.ip`: the IP address of the sendspin server + * - `sendspin.port`: the port of the sendspin server, default 8927 + * - `sendspin.path`: the path on the sendspin server, default "/sendspin" + * - `sendspin.client-id`: the client id, default "pipewire-$(hostname)" + * - `sendspin.client-name`: the client name, default "$(hostname)" + * - `node.always-process = `: true to receive even when not running + * - `stream.props = {}`: properties to be passed to all the stream + * + * ## General options + * + * Options with well-known behavior: + * + * - \ref PW_KEY_REMOTE_NAME + * - \ref SPA_KEY_AUDIO_LAYOUT + * - \ref SPA_KEY_AUDIO_POSITION + * - \ref PW_KEY_MEDIA_NAME + * - \ref PW_KEY_MEDIA_CLASS + * - \ref PW_KEY_NODE_NAME + * - \ref PW_KEY_NODE_DESCRIPTION + * - \ref PW_KEY_NODE_GROUP + * - \ref PW_KEY_NODE_LATENCY + * - \ref PW_KEY_NODE_VIRTUAL + * + * ## Example configuration + *\code{.unparsed} + * # ~/.config/pipewire/pipewire.conf.d/my-sendspin-recv.conf + * + * context.modules = [ + * { name = libpipewire-module-sendspin-recv + * args = { + * #local.ifname = eth0 + * #source.ip = 127.0.0.1 + * #source.port = 8928 + * #source.path = "/sendspin" + * #sendspin.ip = 127.0.0.1 + * #sendspin.port = 8927 + * #sendspin.path = "/sendspin" + * #sendspin.client-id = "pipewire-test" + * #node.always-process = false + * #audio.position = [ FL FR ] + * stream.props = { + * #media.class = "Audio/Source" + * #node.name = "sendspin-receiver" + * } + * } + * } + * ] + *\endcode + * + * \since 1.6.0 + */ + +#define NAME "sendspin-recv" + +PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); +#define PW_LOG_TOPIC_DEFAULT mod_topic + +#define DEFAULT_SOURCE_IP "127.0.0.1" +#define DEFAULT_SOURCE_PORT PW_SENDSPIN_DEFAULT_CLIENT_PORT +#define DEFAULT_SOURCE_PATH PW_SENDSPIN_DEFAULT_PATH + +#define DEFAULT_SERVER_PORT PW_SENDSPIN_DEFAULT_SERVER_PORT +#define DEFAULT_SENDSPIN_PATH PW_SENDSPIN_DEFAULT_PATH + + +#define DEFAULT_POSITION "[ FL FR ]" + +#define USAGE "( local.ifname= ) " \ + "( source.ip= ) " \ + "( source.port= " \ + "( audio.position= ) " \ + "( stream.props= { key=value ... } ) " + +static const struct spa_dict_item module_info[] = { + { PW_KEY_MODULE_AUTHOR, "Wim Taymans " }, + { PW_KEY_MODULE_DESCRIPTION, "sendspin Receiver" }, + { PW_KEY_MODULE_USAGE, USAGE }, + { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, +}; + +struct stream { + struct impl *impl; + struct spa_list link; + + struct pw_websocket_connection *conn; + struct spa_hook conn_listener; + + struct spa_audio_info info; + struct pw_stream *stream; + struct spa_hook stream_listener; + + struct pw_timer timer; + int timeout_count; + + uint32_t stride; + struct spa_ringbuffer ring; + void *buffer; + uint32_t buffer_size; + +#define ROLE_PLAYER (1<<0) +#define ROLE_METADATA (1<<1) + uint32_t active_roles; +#define REASON_DISCOVERY (0) +#define REASON_PLAYBACK (1) + uint32_t connection_reason; + + struct spa_regress regress_index; + struct spa_regress regress_time; + + bool resync; + struct spa_dll dll; +}; + +struct impl { + struct pw_impl_module *module; + struct spa_hook module_listener; + struct pw_properties *props; + struct pw_context *context; + + struct pw_loop *main_loop; + struct pw_loop *data_loop; + struct pw_timer_queue *timer_queue; + + struct pw_core *core; + struct spa_hook core_listener; + struct spa_hook core_proxy_listener; + unsigned int do_disconnect:1; + + struct pw_zeroconf *zeroconf; + struct spa_hook zeroconf_listener; + + bool always_process; + + struct pw_properties *stream_props; + + struct pw_websocket *websocket; + struct spa_hook websocket_listener; + + struct spa_list streams; +}; + +static void on_stream_destroy(void *d) +{ + struct stream *stream = d; + spa_hook_remove(&stream->stream_listener); + stream->stream = NULL; +} + +static void on_stream_state_changed(void *d, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct stream *stream = d; + switch (state) { + case PW_STREAM_STATE_ERROR: + case PW_STREAM_STATE_UNCONNECTED: + pw_impl_module_schedule_destroy(stream->impl->module); + break; + case PW_STREAM_STATE_PAUSED: + case PW_STREAM_STATE_STREAMING: + break; + default: + break; + } +} + +static void on_capture_stream_process(void *d) +{ + struct stream *stream = d; + struct pw_buffer *b; + struct spa_buffer *buf; + uint8_t *p; + uint32_t index = 0, n_frames, n_bytes; + int32_t avail, stride; + struct pw_time ts; + double err, corr, target, current_time; + + if ((b = pw_stream_dequeue_buffer(stream->stream)) == NULL) { + pw_log_debug("out of buffers: %m"); + return; + } + + buf = b->buffer; + if ((p = buf->datas[0].data) == NULL) + return; + + stride = stream->stride; + n_frames = buf->datas[0].maxsize / stride; + if (b->requested) + n_frames = SPA_MIN(b->requested, n_frames); + n_bytes = n_frames * stride; + + avail = spa_ringbuffer_get_read_index(&stream->ring, &index); + + if (stream->timeout_count > 4 && stream->timeout_count > 4) { + pw_stream_get_time_n(stream->stream, &ts, sizeof(ts)); + + /* index to server time */ + target = spa_regress_calc_y(&stream->regress_index, index); + /* server time to client time */ + target = spa_regress_calc_y(&stream->regress_time, target); + + current_time = ts.now / 1000.0; + current_time -= (ts.buffered * 1000000.0 / stream->info.info.raw.rate) + + ((ts.delay) * 1000000.0 * ts.rate.num / ts.rate.denom); + err = target - (double)current_time; + + if (stream->resync) { + if (target < current_time) { + target = spa_regress_calc_x(&stream->regress_time, current_time); + index = (uint32_t)spa_regress_calc_x(&stream->regress_index, target); + index = SPA_ROUND_DOWN(index, stride); + + pw_log_info("resync %u %f %f %f", index, target, + current_time, target - current_time); + + spa_ringbuffer_read_update(&stream->ring, index); + avail = spa_ringbuffer_get_read_index(&stream->ring, &index); + + err = 0.0; + stream->resync = false; + } else { + avail = 0; + } + } + } else { + avail = 0; + } + if (avail < (int32_t)n_bytes) { + avail = 0; + stream->resync = true; + } + else if (avail > (int32_t)stream->buffer_size) { + index += avail - stream->buffer_size; + avail = stream->buffer_size; + stream->resync = true; + } + if (avail > 0) { + n_bytes = SPA_MIN(n_bytes, (uint32_t)avail); + + corr = spa_dll_update(&stream->dll, SPA_CLAMPD(err, -1000, 1000)); + + pw_log_trace("%u %f %f %f %f", index, current_time, target, err, corr); + + pw_stream_set_rate(stream->stream, 1.0 / corr); + + spa_ringbuffer_read_data(&stream->ring, + stream->buffer, stream->buffer_size, + index % stream->buffer_size, + p, n_bytes); + spa_ringbuffer_read_update(&stream->ring, index + n_bytes); + } else { + memset(p, 0, n_bytes); + } + + buf->datas[0].chunk->offset = 0; + buf->datas[0].chunk->stride = stride; + buf->datas[0].chunk->size = n_bytes; + + pw_stream_queue_buffer(stream->stream, b); +} + +static const struct pw_stream_events capture_stream_events = { + PW_VERSION_STREAM_EVENTS, + .destroy = on_stream_destroy, + .state_changed = on_stream_state_changed, + .process = on_capture_stream_process +}; + +static int create_stream(struct stream *stream) +{ + struct impl *impl = stream->impl; + int res; + uint32_t n_params; + const struct spa_pod *params[1]; + uint8_t buffer[1024]; + struct spa_pod_builder b; + const char *server_id, *ip, *port, *server_name; + struct pw_properties *props = pw_properties_copy(impl->stream_props); + + ip = pw_properties_get(impl->props, "sendspin.ip"); + port = pw_properties_get(impl->props, "sendspin.port"); + server_id = pw_properties_get(props, "sendspin.server-id"); + server_name = pw_properties_get(props, "sendspin.server-name"); + + if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL) + pw_properties_setf(props, PW_KEY_NODE_NAME, "sendspin.%s.%s.%s", server_id, ip, port); + if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL) + pw_properties_setf(props, PW_KEY_NODE_DESCRIPTION, "Sendspin from %s", server_name); + if (pw_properties_get(props, PW_KEY_MEDIA_NAME) == NULL) + pw_properties_setf(props, PW_KEY_MEDIA_NAME, "Sendspin from %s", server_name); + + stream->stream = pw_stream_new(impl->core, "sendspin receiver", props); + if (stream->stream == NULL) + return -errno; + + spa_ringbuffer_init(&stream->ring); + stream->buffer_size = 1024 * 1024; + stream->buffer = calloc(1, stream->buffer_size * stream->stride); + + pw_stream_add_listener(stream->stream, + &stream->stream_listener, + &capture_stream_events, stream); + + n_params = 0; + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + params[n_params++] = spa_format_audio_build(&b, + SPA_PARAM_EnumFormat, &stream->info); + + if ((res = pw_stream_connect(stream->stream, + PW_DIRECTION_OUTPUT, + PW_ID_ANY, + PW_STREAM_FLAG_AUTOCONNECT | + PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS, + params, n_params)) < 0) + return res; + + return 0; +} + +static void add_format(struct spa_json_builder *b, const char *codec, int channels, int rate, int depth) +{ + spa_json_builder_array_push(b, "{"); + spa_json_builder_object_string(b, "codec", codec); + spa_json_builder_object_int(b, "channels", channels); + spa_json_builder_object_int(b, "sample_rate", rate); + spa_json_builder_object_int(b, "bit_depth", depth); + spa_json_builder_pop(b, "}"); +} +static void add_playerv1_support(struct stream *stream, struct spa_json_builder *b) +{ + spa_json_builder_object_push(b, "player@v1_support", "{"); + spa_json_builder_object_push(b, "supported_formats", "["); + add_format(b, "pcm", 2, 48000, 16); + add_format(b, "pcm", 1, 48000, 16); + spa_json_builder_pop(b, "]"); + spa_json_builder_object_int(b, "buffer_capacity", 32000000); + spa_json_builder_object_push(b, "supported_commands", "["); + spa_json_builder_array_string(b, "volume"); + spa_json_builder_array_string(b, "mute"); + spa_json_builder_pop(b, "]"); + spa_json_builder_pop(b, "}"); +} +static int send_client_hello(struct stream *stream) +{ + struct impl *impl = stream->impl; + struct spa_json_builder b; + int res; + char *mem; + size_t size; + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "client/hello"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_string(&b, "client_id", pw_properties_get(impl->props, "sendspin.client-id")); + spa_json_builder_object_string(&b, "name", pw_properties_get(impl->props, "sendspin.client-name")); + spa_json_builder_object_int(&b, "version", 1); + spa_json_builder_object_push(&b, "supported_roles", "["); + spa_json_builder_array_string(&b, "player@v1"); + spa_json_builder_array_string(&b, "metadata@v1"); + spa_json_builder_pop(&b, "]"); + spa_json_builder_object_push(&b, "device_info", "{"); + spa_json_builder_object_string(&b, "product_name", "Linux"); /* Use os-release */ + spa_json_builder_object_stringf(&b, "software_version", "PipeWire %s", pw_get_library_version()); + spa_json_builder_pop(&b, "}"); + add_playerv1_support(stream, &b); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + res = pw_websocket_connection_send_text(stream->conn, mem, size); + free(mem); + + return res; +} + +static int send_client_state(struct stream *stream) +{ + struct spa_json_builder b; + int res; + char *mem; + size_t size; + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "client/state"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_push(&b, "player", "{"); + spa_json_builder_object_string(&b, "state", "synchronized"); + spa_json_builder_object_int(&b, "volume", 100); + spa_json_builder_object_bool(&b, "muted", false); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + res = pw_websocket_connection_send_text(stream->conn, mem, size); + free(mem); + return res; +} + +static uint64_t get_time_us(struct stream *stream) +{ + struct timespec now; + if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) + return 0; + return SPA_TIMESPEC_TO_USEC(&now); +} + +static int send_client_time(struct stream *stream) +{ + struct spa_json_builder b; + int res; + uint64_t now; + char *mem; + size_t size; + + now = get_time_us(stream); + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "client/time"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_uint(&b, "client_transmitted", now); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + res = pw_websocket_connection_send_text(stream->conn, mem, size); + free(mem); + return res; +} + +static void do_stream_timer(void *data) +{ + struct stream *stream = data; + send_client_time(stream); +} + +#if 0 +static int send_client_command(struct stream *stream) +{ + return 0; +} +#endif +static int send_client_goodbye(struct stream *stream, const char *reason) +{ + struct spa_json_builder b; + int res; + char *mem; + size_t size; + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "client/goodbye"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_string(&b, "reason", reason); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + res = pw_websocket_connection_send_text(stream->conn, mem, size); + pw_websocket_connection_disconnect(stream->conn, true); + free(mem); + return res; +} + +#if 0 +static int send_stream_request_format(struct stream *stream) +{ + return 0; +} +#endif + +static int handle_server_hello(struct stream *stream, struct spa_json *payload) +{ + struct impl *impl = stream->impl; + struct spa_json it[1]; + char key[256], *t; + const char *v; + int l, version = 0; + struct stream *s, *st; + + while ((l = spa_json_object_next(payload, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "server_id")) { + t = alloca(l+1); + spa_json_parse_stringn(v, l, t, l+1); + pw_properties_set(impl->stream_props, "sendspin.server-id", t); + } + else if (spa_streq(key, "name")) { + t = alloca(l+1); + spa_json_parse_stringn(v, l, t, l+1); + pw_properties_set(impl->stream_props, "sendspin.server-name", t); + } + else if (spa_streq(key, "version")) { + if (spa_json_parse_int(v, l, &version) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "active_roles")) { + if (!spa_json_is_array(v, l)) + return -EPROTO; + + spa_json_enter(payload, &it[0]); + while ((l = spa_json_next(&it[0], &v)) > 0) { + t = alloca(l+1); + spa_json_parse_stringn(v, l, t, l+1); + + if (spa_streq(t, "player@v1")) + stream->active_roles |= ROLE_PLAYER; + else if (spa_streq(t, "metadata@v1")) + stream->active_roles |= ROLE_METADATA; + } + } + else if (spa_streq(key, "connection_reason")) { + t = alloca(l+1); + spa_json_parse_stringn(v, l, t, l+1); + + if (spa_streq(t, "discovery")) + stream->connection_reason = REASON_DISCOVERY; + else if (spa_streq(t, "playback")) + stream->connection_reason = REASON_PLAYBACK; + + pw_properties_set(impl->stream_props, "sendspin.connection-reason", t); + } + } + if (version != 1) + return -ENOTSUP; + + if (stream->connection_reason == REASON_PLAYBACK) { + /* keep this server, destroy others */ + spa_list_for_each_safe(s, st, &impl->streams, link) { + if (s == stream) + continue; + send_client_goodbye(s, "another_server"); + } + } else { + /* keep other servers, destroy this one */ + spa_list_for_each_safe(s, st, &impl->streams, link) { + if (s == stream) + continue; + return send_client_goodbye(stream, "another_server"); + } + } + return send_client_state(stream); +} + +static int handle_server_state(struct stream *stream, struct spa_json *payload) +{ + return 0; +} + +static int parse_uint64(const char *val, int len, uint64_t *result) +{ + char buf[64]; + char *end; + + if (len <= 0 || len >= (int)sizeof(buf)) + return 0; + + memcpy(buf, val, len); + buf[len] = '\0'; + + *result = strtoull(buf, &end, 0); + return len > 0 && end == buf + len; +} + +static int handle_server_time(struct stream *stream, struct spa_json *payload) +{ + struct impl *impl = stream->impl; + char key[256]; + const char *v; + int l; + uint64_t t1 = 0, t2 = 0, t3 = 0, t4 = 0, timeout; + + t4 = get_time_us(stream); + + while ((l = spa_json_object_next(payload, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "client_transmitted")) { + if (parse_uint64(v, l, &t1) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "server_received")) { + if (parse_uint64(v, l, &t2) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "server_transmitted")) { + if (parse_uint64(v, l, &t3) <= 0) + return -EINVAL; + } + } + + spa_regress_update(&stream->regress_time, (t2+t3)/2, (t1+t4)/2); + + if (stream->timeout_count < 4) + timeout = 200 * SPA_MSEC_PER_SEC; + else if (stream->timeout_count < 10) + timeout = SPA_NSEC_PER_SEC; + else if (stream->timeout_count < 20) + timeout = 2 * SPA_NSEC_PER_SEC; + else + timeout = 5 * SPA_NSEC_PER_SEC; + + stream->timeout_count++; + pw_timer_queue_add(impl->timer_queue, &stream->timer, + &stream->timer.timeout, timeout, + do_stream_timer, stream); + return 0; +} + +static int handle_server_command(struct stream *stream, struct spa_json *payload) +{ + return 0; +} + +/* {"codec":"pcm","sample_rate":44100,"channels":2,"bit_depth":16} */ +static int parse_player(struct stream *stream, struct spa_json *player) +{ + char key[256], codec[64] = ""; + const char *v; + int l, sample_rate = 0, channels = 0, bit_depth = 0; + + spa_zero(stream->info); + stream->info.media_type = SPA_MEDIA_TYPE_audio; + while ((l = spa_json_object_next(player, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "codec")) { + if (spa_json_parse_stringn(v, l, codec, sizeof(codec)) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "sample_rate")) { + if (spa_json_parse_int(v, l, &sample_rate) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "channels")) { + if (spa_json_parse_int(v, l, &channels) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "bit_depth")) { + if (spa_json_parse_int(v, l, &bit_depth) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "codec_header")) { + } + } + if (sample_rate == 0 || channels == 0) + return -EINVAL; + + if (spa_streq(codec, "pcm")) { + stream->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; + stream->info.info.raw.rate = sample_rate; + stream->info.info.raw.channels = channels; + switch (bit_depth) { + case 16: + stream->info.info.raw.format = SPA_AUDIO_FORMAT_S16_LE; + stream->stride = 2 * channels; + break; + case 24: + stream->info.info.raw.format = SPA_AUDIO_FORMAT_S24_LE; + stream->stride = 3 * channels; + break; + default: + return -EINVAL; + } + } + else if (spa_streq(codec, "opus")) { + stream->info.media_subtype = SPA_MEDIA_SUBTYPE_opus; + stream->info.info.opus.rate = sample_rate; + stream->info.info.opus.channels = channels; + } + else if (spa_streq(codec, "flac")) { + stream->info.media_subtype = SPA_MEDIA_SUBTYPE_flac; + stream->info.info.flac.rate = sample_rate; + stream->info.info.flac.channels = channels; + } + else + return -EINVAL; + + spa_dll_set_bw(&stream->dll, SPA_DLL_BW_MIN, 1000, sample_rate); + + return 0; +} + +/* {"player":{}} */ +static int handle_stream_start(struct stream *stream, struct spa_json *payload) +{ + struct impl *impl = stream->impl; + struct spa_json it[1]; + char key[256]; + const char *v; + int l; + + while ((l = spa_json_object_next(payload, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "player")) { + if (!spa_json_is_object(v, l)) + return -EPROTO; + spa_json_enter(payload, &it[0]); + parse_player(stream, &it[0]); + } + } + + if (stream->stream == NULL) { + create_stream(stream); + + pw_timer_queue_cancel(&stream->timer); + pw_timer_queue_add(impl->timer_queue, &stream->timer, + NULL, 0, do_stream_timer, stream); + } else { + } + + return 0; +} + +static void stream_clear(struct stream *stream) +{ + spa_ringbuffer_init(&stream->ring); + memset(stream->buffer, 0, stream->buffer_size); +} + +static int handle_stream_clear(struct stream *stream, struct spa_json *payload) +{ + stream_clear(stream); + return 0; +} +static int handle_stream_end(struct stream *stream, struct spa_json *payload) +{ + if (stream->stream != NULL) { + pw_stream_destroy(stream->stream); + stream->stream = NULL; + stream_clear(stream); + } + return 0; +} + +static int handle_group_update(struct stream *stream, struct spa_json *payload) +{ + return 0; +} + +/* { "type":... "payload":{...} } */ +static int do_parse_text(struct stream *stream, const char *content, int size) +{ + struct spa_json it[2], *payload = NULL; + char key[256], type[256] = ""; + const char *v; + int res, l; + + pw_log_info("received text %.*s", size, content); + + if (spa_json_begin_object(&it[0], content, size) <= 0) + return -EINVAL; + + while ((l = spa_json_object_next(&it[0], key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "payload")) { + if (!spa_json_is_object(v, l)) + return -EPROTO; + + spa_json_enter(&it[0], &it[1]); + payload = &it[1]; + } + else if (spa_streq(key, "type")) { + if (spa_json_parse_stringn(v, l, type, sizeof(type)) <= 0) + continue; + } + } + if (spa_streq(type, "server/hello")) + res = handle_server_hello(stream, payload); + else if (spa_streq(type, "server/state")) + res = handle_server_state(stream, payload); + else if (spa_streq(type, "server/time")) + res = handle_server_time(stream, payload); + else if (spa_streq(type, "server/command")) + res = handle_server_command(stream, payload); + else if (spa_streq(type, "stream/start")) + res = handle_stream_start(stream, payload); + else if (spa_streq(type, "stream/end")) + res = handle_stream_end(stream, payload); + else if (spa_streq(type, "stream/clear")) + res = handle_stream_clear(stream, payload); + else if (spa_streq(type, "group/update")) + res = handle_group_update(stream, payload); + else + res = 0; + + return res; +} + +static int do_handle_binary(struct stream *stream, const uint8_t *payload, int size) +{ + struct impl *impl = stream->impl; + int32_t filled; + uint32_t index, length = size - 9; + uint64_t timestamp; + + if (payload[0] != 4 || stream->stream == NULL) + return 0; + + timestamp = ((uint64_t)payload[1]) << 56; + timestamp |= ((uint64_t)payload[2]) << 48; + timestamp |= ((uint64_t)payload[3]) << 40; + timestamp |= ((uint64_t)payload[4]) << 32; + timestamp |= ((uint64_t)payload[5]) << 24; + timestamp |= ((uint64_t)payload[6]) << 16; + timestamp |= ((uint64_t)payload[7]) << 8; + timestamp |= ((uint64_t)payload[8]); + + filled = spa_ringbuffer_get_write_index(&stream->ring, &index); + if (filled < 0) { + pw_log_warn("%p: underrun write:%u filled:%d", + stream, index, filled); + } else if (filled + length > stream->buffer_size) { + pw_log_debug("%p: overrun write:%u filled:%d", + stream, index, filled); + } + + spa_ringbuffer_write_data(&stream->ring, + stream->buffer, stream->buffer_size, + index % stream->buffer_size, + &payload[9], length); + + spa_ringbuffer_write_update(&stream->ring, index + length); + + pw_loop_lock(impl->data_loop); + spa_regress_update(&stream->regress_index, index, timestamp); + pw_loop_unlock(impl->data_loop); + + return 0; +} + +static void on_connection_message(void *data, int opcode, void *payload, size_t size) +{ + struct stream *stream = data; + if (opcode == PW_WEBSOCKET_OPCODE_TEXT) { + do_parse_text(stream, payload, size); + } else if (opcode == PW_WEBSOCKET_OPCODE_BINARY) { + do_handle_binary(stream, payload, size); + } else { + pw_log_warn("%02x unknown %08x", opcode, (int)size); + } +} + +static void stream_destroy(struct stream *stream) +{ + handle_stream_end(stream, NULL); + if (stream->conn) { + spa_hook_remove(&stream->conn_listener); + pw_websocket_connection_destroy(stream->conn); + } + pw_timer_queue_cancel(&stream->timer); + spa_list_remove(&stream->link); + free(stream->buffer); + free(stream); +} + +static void on_connection_destroy(void *data) +{ + struct stream *stream = data; + stream->conn = NULL; + pw_log_info("connection %p destroy", stream); +} +static void on_connection_error(void *data, int res, const char *reason) +{ + struct stream *stream = data; + pw_log_error("connection %p error %d %s", stream, res, reason); +} + +static void on_connection_disconnected(void *data) +{ + struct stream *stream = data; + stream_destroy(stream); +} + +static const struct pw_websocket_connection_events websocket_connection_events = { + PW_VERSION_WEBSOCKET_CONNECTION_EVENTS, + .destroy = on_connection_destroy, + .error = on_connection_error, + .disconnected = on_connection_disconnected, + .message = on_connection_message, +}; + +static struct stream *stream_new(struct impl *impl, struct pw_websocket_connection *conn) +{ + struct stream *stream; + + stream = calloc(1, sizeof(*stream)); + if (stream == NULL) + return NULL; + + stream->impl = impl; + spa_list_append(&impl->streams, &stream->link); + + stream->conn = conn; + pw_websocket_connection_add_listener(stream->conn, &stream->conn_listener, + &websocket_connection_events, stream); + + spa_regress_init(&stream->regress_index, 5); + spa_regress_init(&stream->regress_time, 5); + + spa_dll_init(&stream->dll); + stream->resync = true; + + return stream; +} + +static void on_websocket_connected(void *data, void *user, + struct pw_websocket_connection *conn, const char *path) +{ + struct impl *impl = data; + struct stream *stream; + pw_log_info("connected to %s", path); + stream = stream_new(impl, conn); + send_client_hello(stream); +} + +static const struct pw_websocket_events websocket_events = { + PW_VERSION_WEBSOCKET_EVENTS, + .connected = on_websocket_connected, +}; + +static void on_zeroconf_added(void *data, void *user, const struct spa_dict *info) +{ +} + +static void on_zeroconf_removed(void *data, void *user, const struct spa_dict *info) +{ +} + +static const struct pw_zeroconf_events zeroconf_events = { + PW_VERSION_ZEROCONF_EVENTS, + .added = on_zeroconf_added, + .removed = on_zeroconf_removed, +}; + +static void core_destroy(void *d) +{ + struct impl *impl = d; + spa_hook_remove(&impl->core_listener); + impl->core = NULL; + pw_impl_module_schedule_destroy(impl->module); +} + +static const struct pw_proxy_events core_proxy_events = { + .destroy = core_destroy, +}; + +static void impl_destroy(struct impl *impl) +{ + struct stream *s; + + spa_list_consume(s, &impl->streams, link) + stream_destroy(s); + + if (impl->core && impl->do_disconnect) + pw_core_disconnect(impl->core); + + if (impl->data_loop) + pw_context_release_loop(impl->context, impl->data_loop); + + pw_properties_free(impl->stream_props); + pw_properties_free(impl->props); + + free(impl); +} + +static void module_destroy(void *d) +{ + struct impl *impl = d; + spa_hook_remove(&impl->module_listener); + impl_destroy(impl); +} + +static const struct pw_impl_module_events module_events = { + PW_VERSION_IMPL_MODULE_EVENTS, + .destroy = module_destroy, +}; + +static void on_core_error(void *d, uint32_t id, int seq, int res, const char *message) +{ + struct impl *impl = d; + + pw_log_error("error id:%u seq:%d res:%d (%s): %s", + id, seq, res, spa_strerror(res), message); + + if (id == PW_ID_CORE && res == -EPIPE) + pw_impl_module_schedule_destroy(impl->module); +} + +static const struct pw_core_events core_events = { + PW_VERSION_CORE_EVENTS, + .error = on_core_error, +}; + +static void copy_props(struct impl *impl, struct pw_properties *props, const char *key) +{ + const char *str; + if ((str = pw_properties_get(props, key)) != NULL) { + if (pw_properties_get(impl->stream_props, key) == NULL) + pw_properties_set(impl->stream_props, key, str); + } +} + +SPA_EXPORT +int pipewire__module_init(struct pw_impl_module *module, const char *args) +{ + struct pw_context *context = pw_impl_module_get_context(module); + struct impl *impl; + const char *str, *hostname, *port, *path; + struct pw_properties *props, *stream_props; + int res = 0; + + PW_LOG_TOPIC_INIT(mod_topic); + + impl = calloc(1, sizeof(struct impl)); + if (impl == NULL) + return -errno; + + if (args == NULL) + args = ""; + + props = impl->props = pw_properties_new_string(args); + stream_props = impl->stream_props = pw_properties_new(NULL, NULL); + if (props == NULL || stream_props == NULL) { + res = -errno; + pw_log_error( "can't create properties: %m"); + goto out; + } + + impl->module = module; + impl->context = context; + impl->main_loop = pw_context_get_main_loop(context); + impl->data_loop = pw_context_acquire_loop(context, &props->dict); + impl->timer_queue = pw_context_get_timer_queue(context); + spa_list_init(&impl->streams); + + pw_properties_set(props, PW_KEY_NODE_LOOP_NAME, impl->data_loop->name); + + if ((str = pw_properties_get(props, "stream.props")) != NULL) + pw_properties_update_string(stream_props, str, strlen(str)); + + copy_props(impl, props, PW_KEY_NODE_LOOP_NAME); + copy_props(impl, props, SPA_KEY_AUDIO_LAYOUT); + copy_props(impl, props, SPA_KEY_AUDIO_POSITION); + copy_props(impl, props, PW_KEY_NODE_NAME); + copy_props(impl, props, PW_KEY_NODE_DESCRIPTION); + copy_props(impl, props, PW_KEY_NODE_GROUP); + copy_props(impl, props, PW_KEY_NODE_LATENCY); + copy_props(impl, props, PW_KEY_NODE_VIRTUAL); + copy_props(impl, props, PW_KEY_NODE_CHANNELNAMES); + copy_props(impl, props, PW_KEY_MEDIA_NAME); + copy_props(impl, props, PW_KEY_MEDIA_CLASS); + + impl->always_process = pw_properties_get_bool(stream_props, + PW_KEY_NODE_ALWAYS_PROCESS, true); + + if ((str = pw_properties_get(props, "sendspin.client-name")) == NULL) + pw_properties_set(props, "sendspin.client-name", pw_get_host_name()); + if ((str = pw_properties_get(props, "sendspin.client-id")) == NULL) + pw_properties_setf(props, "sendspin.client-id", "pipewire-%s", pw_get_host_name()); + + impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); + if (impl->core == NULL) { + str = pw_properties_get(props, PW_KEY_REMOTE_NAME); + impl->core = pw_context_connect(impl->context, + pw_properties_new( + PW_KEY_REMOTE_NAME, str, + NULL), + 0); + impl->do_disconnect = true; + } + if (impl->core == NULL) { + res = -errno; + pw_log_error("can't connect: %m"); + goto out; + } + + pw_proxy_add_listener((struct pw_proxy*)impl->core, + &impl->core_proxy_listener, + &core_proxy_events, impl); + pw_core_add_listener(impl->core, + &impl->core_listener, + &core_events, impl); + + impl->websocket = pw_websocket_new(impl->main_loop, &props->dict); + pw_websocket_add_listener(impl->websocket, &impl->websocket_listener, + &websocket_events, impl); + + if ((impl->zeroconf = pw_zeroconf_new(context, NULL)) != NULL) { + pw_zeroconf_add_listener(impl->zeroconf, &impl->zeroconf_listener, + &zeroconf_events, impl); + } + + hostname = pw_properties_get(props, "sendspin.ip"); + if (hostname != NULL) { + port = pw_properties_get(props, "sendspin.port"); + if (port == NULL) + port = SPA_STRINGIFY(DEFAULT_SERVER_PORT); + if ((path = pw_properties_get(props, "sendspin.path")) == NULL) + path = DEFAULT_SENDSPIN_PATH; + + pw_websocket_connect(impl->websocket, NULL, hostname, port, path); + } else { + if ((hostname = pw_properties_get(props, "source.ip")) == NULL) + hostname = DEFAULT_SOURCE_IP; + if ((port = pw_properties_get(props, "source.port")) == NULL) + port = SPA_STRINGIFY(DEFAULT_SOURCE_PORT); + if ((path = pw_properties_get(props, "source.path")) == NULL) + path = DEFAULT_SOURCE_PATH; + + pw_websocket_listen(impl->websocket, NULL, hostname, port, path); + + if (impl->zeroconf) { + str = pw_properties_get(props, "sendspin.client-id"); + pw_zeroconf_set_announce(impl->zeroconf, NULL, + &SPA_DICT_ITEMS( + SPA_DICT_ITEM("zeroconf.service", PW_SENDSPIN_CLIENT_SERVICE), + SPA_DICT_ITEM("zeroconf.session", str), + SPA_DICT_ITEM("zeroconf.port", port), + SPA_DICT_ITEM("path", path))); + } + } + + pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl); + + pw_impl_module_update_properties(module, &SPA_DICT_INIT_ARRAY(module_info)); + + pw_log_info("Successfully loaded module-sendspin-recv"); + + return 0; +out: + impl_destroy(impl); + return res; +} diff --git a/src/modules/module-sendspin-send.c b/src/modules/module-sendspin-send.c new file mode 100644 index 000000000..9bc7195ce --- /dev/null +++ b/src/modules/module-sendspin-send.c @@ -0,0 +1,1389 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2026 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#include "config.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "module-sendspin/sendspin.h" +#include "module-sendspin/websocket.h" +#include "module-sendspin/zeroconf.h" +#include "network-utils.h" + +/** \page page_module_sendspin_send sendspin sender + * + * The `sendspin-send` module creates a PipeWire sink that sends audio + * packets using the sendspin protocol to a client. + * + * The sender will listen on a specific port (8927) and create a stream for + * each connection. + * + * In combination with a virtual sink, each of the client streams can be sent + * the same data in the client specific format. + * + * ## Module Name + * + * `libpipewire-module-sendspin-send` + * + * ## Module Options + * + * Options specific to the behavior of this module + * + * - `local.ifname = `: interface name to use + * - `local.ifaddress = `: interface address to use + * - `source.ip = `: the source ip address to listen on, default "127.0.0.1" + * - `source.port = `: the source port to listen on, default 8927 + * - `source.path = `: comma separated list of paths to listen on, + * default "/sendspin" + * - `sendspin.ip`: an array of IP addresses of sendspin clients to connect to + * - `sendspin.port`: the port of the sendspin client to connect to, default 8928 + * - `sendspin.path`: the path of the sendspin client to connect to, default "/sendspin" + * - `sendspin.group-id`: the group-id of the server, default random + * - `sendspin.group-name`: the group-name of the server, default "PipeWire" + * - `sendspin.delay`: the delay to add to clients in seconds. Default 5.0 + * - `node.always-process = `: true to send silence even when not connected. + * - `stream.props = {}`: properties to be passed to all the stream + * - `stream.rules` = : match rules, use the create-stream action to + * make a stream for the client. + * + * ## General options + * + * Options with well-known behavior: + * + * - \ref PW_KEY_REMOTE_NAME + * - \ref SPA_KEY_AUDIO_LAYOUT + * - \ref SPA_KEY_AUDIO_POSITION + * - \ref PW_KEY_MEDIA_NAME + * - \ref PW_KEY_MEDIA_CLASS + * - \ref PW_KEY_NODE_NAME + * - \ref PW_KEY_NODE_DESCRIPTION + * - \ref PW_KEY_NODE_GROUP + * - \ref PW_KEY_NODE_LATENCY + * - \ref PW_KEY_NODE_VIRTUAL + * + * ## Example configuration + *\code{.unparsed} + * # ~/.config/pipewire/pipewire.conf.d/my-sendspin-send.conf + * + * context.modules = [ + * { name = libpipewire-module-sendspin-send + * args = { + * #local.ifname = eth0 + * #source.ip = 127.0.0.1 + * #source.port = 8927 + * #source.path = "/sendspin" + * #sendspin.ip = [ 127.0.0.1 ] + * #sendspin.port = 8928 + * #sendspin.path = "/sendspin" + * #sendspin.group-id = "abcded" + * #sendspin.group-name = "PipeWire" + * #sendspin.delay = 5.0 + * #node.always-process = false + * #audio.position = [ FL FR ] + * stream.props = { + * #media.class = "Audio/sink" + * #node.name = "sendspin-send" + * } + * stream.rules = [ + * { matches = [ + * { sendspin.ip = "~.*" + * #sendspin.port = 8928 + * #sendspin.path = "/sendspin" + * #zeroconf.ifindex = 0 + * #zeroconf.name = "" + * #zeroconf.type = "_sendspin._tcp" + * #zeroconf.domain = "local" + * #zeroconf.hostname = "" + * } + * ] + * actions = { + * create-stream = { + * stream.props = { + * #target.object = "" + * #media.class = "Audio/Sink" + * } + * } + * } + * } + * ] + * } + * } + * ] + *\endcode + * + * \since 1.6.0 + */ + +#define NAME "sendspin-send" + +PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); +#define PW_LOG_TOPIC_DEFAULT mod_topic + +#define DEFAULT_SOURCE_IP "127.0.0.1" +#define DEFAULT_SOURCE_PORT PW_SENDSPIN_DEFAULT_SERVER_PORT +#define DEFAULT_SOURCE_PATH PW_SENDSPIN_DEFAULT_PATH + +#define DEFAULT_CLIENT_PORT PW_SENDSPIN_DEFAULT_CLIENT_PORT +#define DEFAULT_SENDSPIN_PATH PW_SENDSPIN_DEFAULT_PATH + +#define DEFAULT_SENDSPIN_DELAY 5.0 + +#define DEFAULT_POSITION "[ FL FR ]" + +#define DEFAULT_CREATE_RULES \ + "[ { matches = [ { sendspin.ip = \"~.*\" } ] actions = { create-stream = { } } } ] " + +#define USAGE "( local.ifname= ) " \ + "( source.ip= ) " \ + "( source.port= " \ + "( audio.position= ) " \ + "( stream.props= { key=value ... } ) " + +static const struct spa_dict_item module_info[] = { + { PW_KEY_MODULE_AUTHOR, "Wim Taymans " }, + { PW_KEY_MODULE_DESCRIPTION, "Sendspin sender" }, + { PW_KEY_MODULE_USAGE, USAGE }, + { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, +}; + +struct client { + struct impl *impl; + struct spa_list link; + + char *name; + struct pw_properties *props; + + struct pw_websocket_connection *conn; + struct spa_hook conn_listener; + + struct spa_audio_info info; + struct pw_stream *stream; + struct spa_hook stream_listener; + + struct spa_io_position *io_position; + struct pw_timer timer; + + uint64_t delay_usec; + uint32_t stride; + + int buffer_capacity; +#define ROLE_PLAYER (1<<0) +#define ROLE_METADATA (1<<1) + uint32_t supported_roles; +#define COMMAND_VOLUME (1<<0) +#define COMMAND_MUTE (1<<1) + uint32_t supported_commands; + + bool playing; +}; + +struct impl { + struct pw_impl_module *module; + struct spa_hook module_listener; + struct pw_properties *props; + struct pw_context *context; + + struct pw_loop *main_loop; + struct pw_loop *data_loop; + struct pw_timer_queue *timer_queue; + + struct pw_core *core; + struct spa_hook core_listener; + struct spa_hook core_proxy_listener; + unsigned int do_disconnect:1; + + struct pw_zeroconf *zeroconf; + struct spa_hook zeroconf_listener; + + float delay; + bool always_process; + + struct pw_properties *stream_props; + + struct pw_websocket *websocket; + struct spa_hook websocket_listener; + + struct spa_list clients; + +}; + +static int send_group_update(struct client *c, bool playing); +static int send_stream_start(struct client *c); +static int send_server_state(struct client *c); + +static void on_stream_destroy(void *d) +{ + struct client *c = d; + spa_hook_remove(&c->stream_listener); + c->stream = NULL; +} + +static void on_stream_state_changed(void *d, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct client *c = d; + switch (state) { + case PW_STREAM_STATE_ERROR: + case PW_STREAM_STATE_UNCONNECTED: + //pw_impl_module_schedule_destroy(c->impl->module); + break; + case PW_STREAM_STATE_PAUSED: + send_group_update(c, false); + break; + case PW_STREAM_STATE_STREAMING: + send_group_update(c, true); + break; + default: + break; + } +} + +static uint64_t get_time_us(struct client *c) +{ + struct timespec now; + if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) + return 0; + return SPA_TIMESPEC_TO_USEC(&now); +} + +static void on_playback_stream_process(void *d) +{ + struct client *c = d; + struct pw_buffer *b; + struct spa_buffer *buf; + uint8_t *p; + struct iovec iov[2]; + uint8_t header[9]; + uint64_t timestamp; + + if ((b = pw_stream_dequeue_buffer(c->stream)) == NULL) { + pw_log_debug("out of buffers: %m"); + return; + } + + if (c->playing) { + buf = b->buffer; + if ((p = buf->datas[0].data) == NULL) + return; + + timestamp = c->io_position ? + c->io_position->clock.nsec / 1000 : + get_time_us(c); + timestamp += c->delay_usec; + + header[0] = 4; + header[1] = (timestamp >> 56) & 0xff; + header[2] = (timestamp >> 48) & 0xff; + header[3] = (timestamp >> 40) & 0xff; + header[4] = (timestamp >> 32) & 0xff; + header[5] = (timestamp >> 24) & 0xff; + header[6] = (timestamp >> 16) & 0xff; + header[7] = (timestamp >> 8) & 0xff; + header[8] = (timestamp ) & 0xff; + + iov[0].iov_base = header; + iov[0].iov_len = sizeof(header); + iov[1].iov_base = p; + iov[1].iov_len = buf->datas[0].chunk->size; + + pw_websocket_connection_send(c->conn, + PW_WEBSOCKET_OPCODE_BINARY, iov, 2); + } + pw_stream_queue_buffer(c->stream, b); +} + +static void +on_stream_param_changed(void *d, uint32_t id, const struct spa_pod *param) +{ + struct client *c = d; + + if (param == NULL) + return; + + switch (id) { + case SPA_PARAM_Format: + if (spa_format_audio_parse(param, &c->info) < 0) + return; + send_stream_start(c); + break; + case SPA_PARAM_Tag: + send_server_state(c); + break; + } +} + +static void on_stream_io_changed(void *d, uint32_t id, void *area, uint32_t size) +{ + struct client *c = d; + switch (id) { + case SPA_IO_Position: + c->io_position = area; + break; + } +} + +static const struct pw_stream_events playback_stream_events = { + PW_VERSION_STREAM_EVENTS, + .destroy = on_stream_destroy, + .io_changed = on_stream_io_changed, + .state_changed = on_stream_state_changed, + .param_changed = on_stream_param_changed, + .process = on_playback_stream_process +}; + +static int create_stream(struct client *c) +{ + struct impl *impl = c->impl; + int res; + uint32_t n_params; + const struct spa_pod *params[1]; + uint8_t buffer[1024]; + struct spa_pod_builder b; + const char *client_id, *ip, *port, *client_name; + struct pw_properties *props = pw_properties_copy(c->props); + + ip = pw_properties_get(props, "sendspin.ip"); + port = pw_properties_get(props, "sendspin.port"); + client_id = pw_properties_get(props, "sendspin.client-id"); + client_name = pw_properties_get(props, "sendspin.client-name"); + + if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL) + pw_properties_setf(props, PW_KEY_NODE_NAME, "sendspin.%s.%s.%s", client_id, ip, port); + if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL) + pw_properties_setf(props, PW_KEY_NODE_DESCRIPTION, "Sendspin to %s", client_name); + if (pw_properties_get(props, PW_KEY_MEDIA_NAME) == NULL) + pw_properties_setf(props, PW_KEY_MEDIA_NAME, "Sendspin to %s", client_name); + + + c->stream = pw_stream_new(impl->core, "sendspin sender", props); + if (c->stream == NULL) + return -errno; + + pw_stream_add_listener(c->stream, + &c->stream_listener, + &playback_stream_events, c); + + n_params = 0; + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + params[n_params++] = spa_format_audio_build(&b, + SPA_PARAM_EnumFormat, &c->info); + + if ((res = pw_stream_connect(c->stream, + PW_DIRECTION_INPUT, + PW_ID_ANY, + PW_STREAM_FLAG_AUTOCONNECT | + PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS, + params, n_params)) < 0) + return res; + + return 0; +} + +static int send_server_hello(struct client *c) +{ + struct impl *impl = c->impl; + struct spa_json_builder b; + int res; + size_t size; + char *mem; + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "server/hello"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_string(&b, "server_id", pw_properties_get(impl->props, "sendspin.server-id")); + spa_json_builder_object_string(&b, "name", pw_properties_get(impl->props, "sendspin.server-name")); + spa_json_builder_object_int(&b, "version", 1); + spa_json_builder_object_push(&b, "active_roles", "["); + if (c->supported_roles & ROLE_PLAYER) + spa_json_builder_array_string(&b, "player@v1"); + if (c->supported_roles & ROLE_METADATA) + spa_json_builder_array_string(&b, "metadata@v1"); + spa_json_builder_pop(&b, "]"); + spa_json_builder_object_string(&b, "connection_reason", "discovery"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + res = pw_websocket_connection_send_text(c->conn, mem, size); + free(mem); + + return res; +} + +static int send_server_state(struct client *c) +{ + struct spa_json_builder b; + int res; + size_t size; + char *mem; + + if (!SPA_FLAG_IS_SET(c->supported_roles, ROLE_METADATA)) + return 0; + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "server/state"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_push(&b, "metadata", "{"); + spa_json_builder_object_uint(&b, "timestamp", get_time_us(c)); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + res = pw_websocket_connection_send_text(c->conn, mem, size); + free(mem); + return res; +} + +static int send_server_time(struct client *c, uint64_t t1, uint64_t t2) +{ + struct spa_json_builder b; + int res; + uint64_t t3; + size_t size; + char *mem; + + t3 = get_time_us(c); + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "server/time"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_uint(&b, "client_transmitted", t1); + spa_json_builder_object_uint(&b, "server_received", t2); + spa_json_builder_object_uint(&b, "server_transmitted", t3); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + res = pw_websocket_connection_send_text(c->conn, mem, size); + free(mem); + return res; +} + +#if 0 +static int send_server_command(struct client *c) +{ + return 0; +} +#endif + +static int send_stream_start(struct client *c) +{ + struct spa_json_builder b; + int res, channels, rate, depth = 0; + const char *codec; + size_t size; + char *mem; + + switch (c->info.media_subtype) { + case SPA_MEDIA_SUBTYPE_raw: + codec = "pcm"; + channels = c->info.info.raw.channels; + rate = c->info.info.raw.rate; + switch (c->info.info.raw.format) { + case SPA_AUDIO_FORMAT_S16_LE: + depth = 16; + break; + case SPA_AUDIO_FORMAT_S24_LE: + depth = 24; + break; + default: + return -ENOTSUP; + } + break; + case SPA_MEDIA_SUBTYPE_opus: + codec = "opus"; + channels = c->info.info.opus.channels; + rate = c->info.info.opus.rate; + break; + case SPA_MEDIA_SUBTYPE_flac: + codec = "flac"; + channels = c->info.info.flac.channels; + rate = c->info.info.flac.rate; + break; + default: + return -ENOTSUP; + } + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "stream/start"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_push(&b, "player", "{"); + spa_json_builder_object_string(&b, "codec", codec); + spa_json_builder_object_int(&b, "channels", channels); + spa_json_builder_object_int(&b, "sample_rate", rate); + if (depth) + spa_json_builder_object_int(&b, "bit_depth", depth); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + res = pw_websocket_connection_send_text(c->conn, mem, size); + free(mem); + return res; +} + +#if 0 +static int send_stream_end(struct client *c) +{ + struct spa_json_builder b; + int res; + size_t size; + char *mem; + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "stream/end"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_push(&b, "roles", "["); + spa_json_builder_array_string(&b, "player"); + spa_json_builder_array_string(&b, "metadata"); + spa_json_builder_pop(&b, "]"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + res = pw_websocket_connection_send_text(c->conn, mem, size); + free(mem); + return res; +} +#endif + +static int send_group_update(struct client *c, bool playing) +{ + struct impl *impl = c->impl; + struct spa_json_builder b; + int res; + char *mem; + size_t size; + + spa_json_builder_memstream(&b, &mem, &size, 0); + spa_json_builder_array_push(&b, "{"); + spa_json_builder_object_string(&b, "type", "group/update"); + spa_json_builder_object_push(&b, "payload", "{"); + spa_json_builder_object_string(&b, "playback_state", playing ? "playing" : "stopped"); + spa_json_builder_object_string(&b, "group_id", pw_properties_get(impl->props, "sendspin.group-id")); + spa_json_builder_object_string(&b, "group_name", pw_properties_get(impl->props, "sendspin.group-name")); + spa_json_builder_pop(&b, "}"); + spa_json_builder_pop(&b, "}"); + spa_json_builder_close(&b); + + c->playing = playing; + + res = pw_websocket_connection_send_text(c->conn, mem, size); + free(mem); + return res; +} + +/* {"codec":"pcm","sample_rate":44100,"channels":2,"bit_depth":16} */ +static int parse_codec(struct client *c, struct spa_json *object, struct spa_audio_info *info) +{ + char key[256], codec[64] = ""; + const char *v; + int l, sample_rate = 0, channels = 0, bit_depth = 0; + + spa_zero(*info); + info->media_type = SPA_MEDIA_TYPE_audio; + while ((l = spa_json_object_next(object, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "codec")) { + if (spa_json_parse_stringn(v, l, codec, sizeof(codec)) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "sample_rate")) { + if (spa_json_parse_int(v, l, &sample_rate) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "channels")) { + if (spa_json_parse_int(v, l, &channels) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "bit_depth")) { + if (spa_json_parse_int(v, l, &bit_depth) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "codec_header")) { + } + } + if (sample_rate == 0 || channels == 0) + return -EINVAL; + + if (spa_streq(codec, "pcm")) { + info->media_subtype = SPA_MEDIA_SUBTYPE_raw; + info->info.raw.rate = sample_rate; + info->info.raw.channels = channels; + switch (bit_depth) { + case 16: + info->info.raw.format = SPA_AUDIO_FORMAT_S16_LE; + break; + case 24: + info->info.raw.format = SPA_AUDIO_FORMAT_S24_LE; + break; + default: + return -EINVAL; + } + } + else if (spa_streq(codec, "opus")) { + info->media_subtype = SPA_MEDIA_SUBTYPE_opus; + info->info.opus.rate = sample_rate; + info->info.opus.channels = channels; + } + else if (spa_streq(codec, "flac")) { + info->media_subtype = SPA_MEDIA_SUBTYPE_flac; + info->info.flac.rate = sample_rate; + info->info.flac.channels = channels; + } + else + return -EINVAL; + + return 0; +} + +static int parse_player_v1_support(struct client *c, struct spa_json *payload) +{ + struct spa_json it[2]; + char key[256], *t; + const char *v; + int l, res; + + while ((l = spa_json_object_next(payload, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "supported_formats")) { + int count = 0; + + if (!spa_json_is_array(v, l)) + return -EPROTO; + spa_json_enter(payload, &it[0]); + + while ((l = spa_json_next(&it[0], &v)) > 0) { + struct spa_audio_info info; + if (!spa_json_is_object(v, l)) + return -EPROTO; + + spa_json_enter(&it[0], &it[1]); + if ((res = parse_codec(c, &it[1], &info)) < 0) + return res; + + if (count++ == 0) + c->info = info; + } + } + else if (spa_streq(key, "buffer_capacity")) { + if (spa_json_parse_int(v, l, &c->buffer_capacity) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "supported_commands")) { + if (!spa_json_is_array(v, l)) + return -EPROTO; + spa_json_enter(payload, &it[0]); + + while ((l = spa_json_next(&it[0], &v)) > 0) { + t = alloca(l+1); + spa_json_parse_stringn(v, l, t, l+1); + if (spa_streq(t, "volume")) + c->supported_commands |= COMMAND_VOLUME; + else if (spa_streq(t, "mute")) + c->supported_commands |= COMMAND_MUTE; + } + } + } + return 0; +} + +static int handle_client_hello(struct client *c, struct spa_json *payload) +{ + struct spa_json it[1]; + char key[256], *t; + const char *v; + int res, l, version = 0; + + while ((l = spa_json_object_next(payload, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "client_id")) { + t = alloca(l+1); + spa_json_parse_stringn(v, l, t, l+1); + pw_properties_set(c->props, "sendspin.client-id", t); + } + else if (spa_streq(key, "name")) { + t = alloca(l+1); + spa_json_parse_stringn(v, l, t, l+1); + pw_properties_set(c->props, "sendspin.client-name", t); + } + else if (spa_streq(key, "version")) { + if (spa_json_parse_int(v, l, &version) <= 0) + return -EINVAL; + } + else if (spa_streq(key, "supported_roles")) { + if (!spa_json_is_array(v, l)) + return -EPROTO; + + spa_json_enter(payload, &it[0]); + while ((l = spa_json_next(&it[0], &v)) > 0) { + t = alloca(l+1); + spa_json_parse_stringn(v, l, t, l+1); + + if (spa_streq(t, "player@v1")) + c->supported_roles |= ROLE_PLAYER; + else if (spa_streq(t, "metadata@v1")) + c->supported_roles |= ROLE_METADATA; + } + } + else if (spa_streq(key, "player_support") || + spa_streq(key, "player@v1_support")) { + if (!spa_json_is_object(v, l)) + return -EPROTO; + spa_json_enter(payload, &it[0]); + if ((res = parse_player_v1_support(c, &it[0])) < 0) + return res; + } + } + if (version != 1) + return -ENOTSUP; + + return send_server_hello(c); +} + +static int handle_client_state(struct client *c, struct spa_json *payload) +{ + struct spa_json it[1]; + char key[256]; + const char *v; + int l; + + while ((l = spa_json_object_next(payload, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "player")) { + if (!spa_json_is_object(v, l)) + return -EPROTO; + spa_json_enter(payload, &it[0]); + while ((l = spa_json_object_next(&it[0], key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "state")) { + } + else if (spa_streq(key, "volume")) { + } + else if (spa_streq(key, "mute")) { + } + } + } + } + if (c->stream == NULL) + create_stream(c); + return 0; +} + +static int parse_uint64(const char *val, int len, uint64_t *result) +{ + char buf[64]; + char *end; + + if (len <= 0 || len >= (int)sizeof(buf)) + return 0; + + memcpy(buf, val, len); + buf[len] = '\0'; + + *result = strtoull(buf, &end, 0); + return len > 0 && end == buf + len; +} + +static int handle_client_time(struct client *c, struct spa_json *payload) +{ + char key[256]; + const char *v; + int l; + uint64_t t1 = 0,t2; + + t2 = get_time_us(c); + + while ((l = spa_json_object_next(payload, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "client_transmitted")) { + if (parse_uint64(v, l, &t1) <= 0) + return -EINVAL; + } + } + if (t1 == 0) + return -EPROTO; + + return send_server_time(c, t1, t2); +} + +static int handle_client_command(struct client *c, struct spa_json *payload) +{ + return 0; +} + +/* {"player":{}} */ +static int handle_stream_request_format(struct client *c, struct spa_json *payload) +{ + struct spa_json it[1]; + char key[256]; + const char *v; + int l; + + while ((l = spa_json_object_next(payload, key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "player")) { + if (!spa_json_is_object(v, l)) + return -EPROTO; + spa_json_enter(payload, &it[0]); + parse_codec(c, &it[0], &c->info); + } + } + return 0; +} + +static int handle_client_goodbye(struct client *c, struct spa_json *payload) +{ + if (c->stream != NULL) { + pw_stream_destroy(c->stream); + c->stream = NULL; + } + return 0; +} + +/* { "type":... "payload":{...} } */ +static int do_parse_text(struct client *c, const char *content, int size) +{ + struct spa_json it[2], *payload = NULL; + char key[256], type[256] = ""; + const char *v; + int res, l; + + pw_log_info("received text %.*s", size, content); + + if (spa_json_begin_object(&it[0], content, size) <= 0) + return -EINVAL; + + while ((l = spa_json_object_next(&it[0], key, sizeof(key), &v)) > 0) { + if (spa_streq(key, "payload")) { + if (!spa_json_is_object(v, l)) + return -EPROTO; + + spa_json_enter(&it[0], &it[1]); + payload = &it[1]; + } + else if (spa_streq(key, "type")) { + if (spa_json_parse_stringn(v, l, type, sizeof(type)) <= 0) + continue; + } + } + if (spa_streq(type, "client/hello")) + res = handle_client_hello(c, payload); + else if (spa_streq(type, "client/state")) + res = handle_client_state(c, payload); + else if (spa_streq(type, "client/time")) + res = handle_client_time(c, payload); + else if (spa_streq(type, "client/command")) + res = handle_client_command(c, payload); + else if (spa_streq(type, "client/goodbye")) + res = handle_client_goodbye(c, payload); + else if (spa_streq(type, "stream/request-format")) + res = handle_stream_request_format(c, payload); + else + res = 0; + + return res; +} + +static void on_connection_message(void *data, int opcode, void *payload, size_t size) +{ + struct client *c = data; + if (opcode == PW_WEBSOCKET_OPCODE_TEXT) { + do_parse_text(c, payload, size); + } else { + pw_log_warn("%02x unknown %08x", opcode, (int)size); + } +} + +static void client_free(struct client *c) +{ + struct impl *impl = c->impl; + + handle_client_goodbye(c, NULL); + if (c->conn) { + spa_hook_remove(&c->conn_listener); + pw_websocket_connection_destroy(c->conn); + } else { + pw_websocket_cancel(impl->websocket, c); + } + pw_timer_queue_cancel(&c->timer); + spa_list_remove(&c->link); + free(c); +} + +static void on_connection_destroy(void *data) +{ + struct client *c = data; + c->conn = NULL; + pw_log_info("connection %p destroy", c); +} + +static void on_connection_error(void *data, int res, const char *reason) +{ + struct client *c = data; + pw_log_error("connection %p error %d %s", c, res, reason); +} + +static void on_connection_disconnected(void *data) +{ + struct client *c = data; + client_free(c); +} + +static const struct pw_websocket_connection_events websocket_connection_events = { + PW_VERSION_WEBSOCKET_CONNECTION_EVENTS, + .destroy = on_connection_destroy, + .error = on_connection_error, + .disconnected = on_connection_disconnected, + .message = on_connection_message, +}; + +static struct client *client_new(struct impl *impl, const char *name, struct pw_properties *props) +{ + struct client *c; + + if ((c = calloc(1, sizeof(*c))) == NULL) + goto error; + + c->impl = impl; + spa_list_append(&impl->clients, &c->link); + + c->props = props; + c->name = name ? strdup(name) : NULL; + c->delay_usec = (uint64_t)(impl->delay * SPA_USEC_PER_SEC); + + return c; +error: + pw_properties_free(props); + return NULL; +} + +static int client_connect(struct client *c) +{ + struct impl *impl = c->impl; + const char *addr, *port, *path; + addr = pw_properties_get(c->props, "sendspin.ip"); + port = pw_properties_get(c->props, "sendspin.port"); + path = pw_properties_get(c->props, "sendspin.path"); + return pw_websocket_connect(impl->websocket, c, addr, port, path); +} + +static void client_connected(struct client *c, struct pw_websocket_connection *conn) +{ + if (c->conn) { + spa_hook_remove(&c->conn_listener); + pw_websocket_connection_destroy(c->conn); + } + c->conn = conn; + if (conn) + pw_websocket_connection_add_listener(c->conn, &c->conn_listener, + &websocket_connection_events, c); +} + +static struct client *client_find(struct impl *impl, const char *name) +{ + struct client *c; + spa_list_for_each(c, &impl->clients, link) { + if (spa_streq(c->name, name)) + return c; + } + return NULL; +} + +struct match_info { + struct impl *impl; + const char *name; + struct pw_properties *props; + struct pw_websocket_connection *conn; + bool matched; +}; + +static int rule_matched(void *data, const char *location, const char *action, + const char *str, size_t len) +{ + struct match_info *i = data; + struct impl *impl = i->impl; + int res = 0; + + i->matched = true; + if (spa_streq(action, "create-stream")) { + struct client *c; + + pw_properties_update_string(i->props, str, len); + if ((c = client_new(impl, i->name, spa_steal_ptr(i->props))) == NULL) + return -errno; + if (i->conn) + client_connected(c, i->conn); + else + client_connect(c); + } + return res; +} + +static int match_client(struct impl *impl, const char *name, struct pw_properties *props, + struct pw_websocket_connection *conn) +{ + const char *str; + struct match_info minfo = { + .impl = impl, + .name = name, + .props = props, + .conn = conn, + }; + + if ((str = pw_properties_get(impl->props, "stream.rules")) == NULL) + str = DEFAULT_CREATE_RULES; + + pw_conf_match_rules(str, strlen(str), NAME, &props->dict, + rule_matched, &minfo); + + if (!minfo.matched) { + pw_log_info("unmatched client found %s", str); + if (conn) + pw_websocket_connection_destroy(conn); + pw_properties_free(props); + } + return minfo.matched; +} + +static void on_websocket_connected(void *data, void *user, + struct pw_websocket_connection *conn, const char *path) +{ + struct impl *impl = data; + struct client *c = user; + + pw_log_info("connected to %s", path); + if (c == NULL) { + struct sockaddr_storage addr; + char ip[128]; + uint16_t port = 0; + bool ipv4; + struct pw_properties *props; + + pw_websocket_connection_address(conn, + (struct sockaddr*)&addr, sizeof(addr)); + + props = pw_properties_copy(impl->stream_props); + if (pw_net_get_ip(&addr, ip, sizeof(ip), &ipv4, &port) >= 0) { + pw_properties_set(props, "sendspin.ip", ip); + pw_properties_setf(props, "sendspin.port", "%u", port); + } + pw_properties_set(props, "sendspin.path", path); + + match_client(impl, "", props, conn); + } else { + client_connected(c, conn); + } +} + +static const struct pw_websocket_events websocket_events = { + PW_VERSION_WEBSOCKET_EVENTS, + .connected = on_websocket_connected, +}; + +static void on_zeroconf_added(void *data, void *user, const struct spa_dict *info) +{ + struct impl *impl = data; + const char *name, *addr, *port, *path; + struct client *c; + struct pw_properties *props; + + name = spa_dict_lookup(info, "zeroconf.hostname"); + + if ((c = client_find(impl, name)) != NULL) + return; + + props = pw_properties_copy(impl->stream_props); + pw_properties_update(props, info); + + addr = spa_dict_lookup(info, "zeroconf.address"); + port = spa_dict_lookup(info, "zeroconf.port"); + path = spa_dict_lookup(info, "path"); + + pw_properties_set(props, "sendspin.ip", addr); + pw_properties_set(props, "sendspin.port", port); + pw_properties_set(props, "sendspin.path", path); + + match_client(impl, name, props, NULL); +} + +static void on_zeroconf_removed(void *data, void *user, const struct spa_dict *info) +{ + struct impl *impl = data; + const char *name; + struct client *c; + + name = spa_dict_lookup(info, "zeroconf.hostname"); + + if ((c = client_find(impl, name)) == NULL) + return; + + client_free(c); +} + +static const struct pw_zeroconf_events zeroconf_events = { + PW_VERSION_ZEROCONF_EVENTS, + .added = on_zeroconf_added, + .removed = on_zeroconf_removed, +}; + +static void core_destroy(void *d) +{ + struct impl *impl = d; + spa_hook_remove(&impl->core_listener); + impl->core = NULL; + pw_impl_module_schedule_destroy(impl->module); +} + +static const struct pw_proxy_events core_proxy_events = { + .destroy = core_destroy, +}; + +static void impl_destroy(struct impl *impl) +{ + struct client *c; + spa_list_consume(c, &impl->clients, link) + client_free(c); + + if (impl->core && impl->do_disconnect) + pw_core_disconnect(impl->core); + + if (impl->data_loop) + pw_context_release_loop(impl->context, impl->data_loop); + + if (impl->zeroconf) + pw_zeroconf_destroy(impl->zeroconf); + + pw_properties_free(impl->stream_props); + pw_properties_free(impl->props); + + free(impl); +} + +static void module_destroy(void *d) +{ + struct impl *impl = d; + spa_hook_remove(&impl->module_listener); + impl_destroy(impl); +} + +static const struct pw_impl_module_events module_events = { + PW_VERSION_IMPL_MODULE_EVENTS, + .destroy = module_destroy, +}; + +static void on_core_error(void *d, uint32_t id, int seq, int res, const char *message) +{ + struct impl *impl = d; + + pw_log_error("error id:%u seq:%d res:%d (%s): %s", + id, seq, res, spa_strerror(res), message); + + if (id == PW_ID_CORE && res == -EPIPE) + pw_impl_module_schedule_destroy(impl->module); +} + +static const struct pw_core_events core_events = { + PW_VERSION_CORE_EVENTS, + .error = on_core_error, +}; + +static void copy_props(struct impl *impl, struct pw_properties *props, const char *key) +{ + const char *str; + if ((str = pw_properties_get(props, key)) != NULL) { + if (pw_properties_get(impl->stream_props, key) == NULL) + pw_properties_set(impl->stream_props, key, str); + } +} + +SPA_EXPORT +int pipewire__module_init(struct pw_impl_module *module, const char *args) +{ + struct pw_context *context = pw_impl_module_get_context(module); + struct impl *impl; + const char *str, *hostname, *port, *path; + struct pw_properties *props, *stream_props; + int res = 0; + + PW_LOG_TOPIC_INIT(mod_topic); + + impl = calloc(1, sizeof(struct impl)); + if (impl == NULL) + return -errno; + + if (args == NULL) + args = ""; + + props = impl->props = pw_properties_new_string(args); + stream_props = impl->stream_props = pw_properties_new(NULL, NULL); + if (props == NULL || stream_props == NULL) { + res = -errno; + pw_log_error( "can't create properties: %m"); + goto out; + } + + impl->module = module; + impl->context = context; + impl->main_loop = pw_context_get_main_loop(context); + impl->data_loop = pw_context_acquire_loop(context, &props->dict); + impl->timer_queue = pw_context_get_timer_queue(context); + spa_list_init(&impl->clients); + + pw_properties_set(props, PW_KEY_NODE_LOOP_NAME, impl->data_loop->name); + + if ((str = pw_properties_get(props, "stream.props")) != NULL) + pw_properties_update_string(stream_props, str, strlen(str)); + + copy_props(impl, props, PW_KEY_NODE_LOOP_NAME); + copy_props(impl, props, SPA_KEY_AUDIO_LAYOUT); + copy_props(impl, props, SPA_KEY_AUDIO_POSITION); + copy_props(impl, props, PW_KEY_NODE_NAME); + copy_props(impl, props, PW_KEY_NODE_DESCRIPTION); + copy_props(impl, props, PW_KEY_NODE_GROUP); + copy_props(impl, props, PW_KEY_NODE_LATENCY); + copy_props(impl, props, PW_KEY_NODE_VIRTUAL); + copy_props(impl, props, PW_KEY_NODE_CHANNELNAMES); + copy_props(impl, props, PW_KEY_MEDIA_NAME); + copy_props(impl, props, PW_KEY_MEDIA_CLASS); + + impl->always_process = pw_properties_get_bool(stream_props, + PW_KEY_NODE_ALWAYS_PROCESS, true); + + if ((str = pw_properties_get(props, "sendspin.group-id")) == NULL) { + uint64_t group_id; + pw_random(&group_id, sizeof(group_id)); + pw_properties_setf(props, "sendspin.group-id", "%016"PRIx64, group_id); + } + if ((str = pw_properties_get(props, "sendspin.group-name")) == NULL) + pw_properties_set(props, "sendspin.group-name", "PipeWire"); + if ((str = pw_properties_get(props, "sendspin.server-name")) == NULL) + pw_properties_set(props, "sendspin.server-name", pw_get_host_name()); + if ((str = pw_properties_get(props, "sendspin.server-id")) == NULL) + pw_properties_setf(props, "sendspin.server-id", "pipewire-%s", pw_get_host_name()); + + impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); + if (impl->core == NULL) { + str = pw_properties_get(props, PW_KEY_REMOTE_NAME); + impl->core = pw_context_connect(impl->context, + pw_properties_new( + PW_KEY_REMOTE_NAME, str, + NULL), + 0); + impl->do_disconnect = true; + } + if (impl->core == NULL) { + res = -errno; + pw_log_error("can't connect: %m"); + goto out; + } + + pw_proxy_add_listener((struct pw_proxy*)impl->core, + &impl->core_proxy_listener, + &core_proxy_events, impl); + pw_core_add_listener(impl->core, + &impl->core_listener, + &core_events, impl); + + impl->websocket = pw_websocket_new(impl->main_loop, &props->dict); + pw_websocket_add_listener(impl->websocket, &impl->websocket_listener, + &websocket_events, impl); + + if ((str = pw_properties_get(props, "sendspin.delay")) == NULL) + str = SPA_STRINGIFY(DEFAULT_SENDSPIN_DELAY); + impl->delay = pw_properties_parse_float(str); + + if ((impl->zeroconf = pw_zeroconf_new(context, NULL)) != NULL) { + pw_zeroconf_add_listener(impl->zeroconf, &impl->zeroconf_listener, + &zeroconf_events, impl); + } + + hostname = pw_properties_get(props, "sendspin.ip"); + if (hostname != NULL) { + struct spa_json iter; + char v[256]; + + port = pw_properties_get(props, "sendspin.port"); + if (port == NULL) + port = SPA_STRINGIFY(DEFAULT_CLIENT_PORT); + if ((path = pw_properties_get(props, "sendspin.path")) == NULL) + path = DEFAULT_SENDSPIN_PATH; + + if (spa_json_begin_array_relax(&iter, hostname, strlen(hostname)) <= 0) { + res = -EINVAL; + pw_log_error("can't parse sendspin.ip %s", hostname); + goto out; + } + while (spa_json_get_string(&iter, v, sizeof(v)) > 0) { + struct client *c; + struct pw_properties *p = pw_properties_copy(impl->stream_props); + + pw_properties_set(p, "sendspin.ip", v); + pw_properties_set(p, "sendspin.port", port); + pw_properties_set(p, "sendspin.path", path); + + if ((c = client_new(impl, "", p)) != NULL) + client_connect(c); + } + } else { + if ((hostname = pw_properties_get(props, "source.ip")) == NULL) + hostname = DEFAULT_SOURCE_IP; + if ((port = pw_properties_get(props, "source.port")) == NULL) + port = SPA_STRINGIFY(DEFAULT_SOURCE_PORT); + if ((path = pw_properties_get(props, "source.path")) == NULL) + path = DEFAULT_SOURCE_PATH; + + pw_websocket_listen(impl->websocket, NULL, hostname, port, path); + + if (impl->zeroconf) { + str = pw_properties_get(props, "sendspin.group-name"); + pw_zeroconf_set_announce(impl->zeroconf, NULL, + &SPA_DICT_ITEMS( + SPA_DICT_ITEM("zeroconf.service", PW_SENDSPIN_SERVER_SERVICE), + SPA_DICT_ITEM("zeroconf.session", str), + SPA_DICT_ITEM("zeroconf.port", port), + SPA_DICT_ITEM("path", path))); + } + } + if (impl->zeroconf) { + pw_zeroconf_set_browse(impl->zeroconf, NULL, + &SPA_DICT_ITEMS( + SPA_DICT_ITEM("zeroconf.service", PW_SENDSPIN_CLIENT_SERVICE))); + } + pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl); + + pw_impl_module_update_properties(module, &SPA_DICT_INIT_ARRAY(module_info)); + + pw_log_info("Successfully loaded module-sendspin-send"); + + return 0; +out: + impl_destroy(impl); + return res; +} diff --git a/src/modules/module-sendspin/regress.h b/src/modules/module-sendspin/regress.h new file mode 100644 index 000000000..1b6919906 --- /dev/null +++ b/src/modules/module-sendspin/regress.h @@ -0,0 +1,58 @@ + +struct spa_regress { + double meanX; + double meanY; + double varX; + double covXY; + uint32_t n; + uint32_t m; + double a; +}; + +static inline void spa_regress_init(struct spa_regress *r, uint32_t m) +{ + memset(r, 0, sizeof(*r)); + r->m = m; + r->a = 1.0/m; +} +static inline void spa_regress_update(struct spa_regress *r, double x, double y) +{ + double a, dx, dy; + + if (r->n == 0) { + r->meanX = x; + r->meanY = y; + r->n++; + a = 1.0; + } else if (r->n < r->m) { + a = 1.0/r->n; + r->n++; + } else { + a = r->a; + } + dx = x - r->meanX; + dy = y - r->meanY; + + r->varX += ((1.0 - a) * dx * dx - r->varX) * a; + r->covXY += ((1.0 - a) * dx * dy - r->covXY) * a; + r->meanX += dx * a; + r->meanY += dy * a; +} +static inline void spa_regress_get(struct spa_regress *r, double *a, double *b) +{ + *a = r->covXY/r->varX; + *b = r->meanY - *a * r->meanX; +} +static inline double spa_regress_calc_y(struct spa_regress *r, double x) +{ + double a, b; + spa_regress_get(r, &a, &b); + return x * a + b; +} +static inline double spa_regress_calc_x(struct spa_regress *r, double y) +{ + double a, b; + spa_regress_get(r, &a, &b); + return (y - b) / a; +} + diff --git a/src/modules/module-sendspin/sendspin.h b/src/modules/module-sendspin/sendspin.h new file mode 100644 index 000000000..b6260d27c --- /dev/null +++ b/src/modules/module-sendspin/sendspin.h @@ -0,0 +1,27 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2026 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#ifndef PIPEWIRE_SENDSPIN_H +#define PIPEWIRE_SENDSPIN_H + +#include + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#define PW_SENDSPIN_SERVER_SERVICE "_sendspin-server._tcp" +#define PW_SENDSPIN_CLIENT_SERVICE "_sendspin._tcp" + +#define PW_SENDSPIN_DEFAULT_SERVER_PORT 8927 +#define PW_SENDSPIN_DEFAULT_CLIENT_PORT 8928 +#define PW_SENDSPIN_DEFAULT_PATH "/sendspin" + +#ifdef __cplusplus +} +#endif + +#endif /* PIPEWIRE_SENDSPIN_H */ diff --git a/src/modules/module-sendspin/teeny-sha1.c b/src/modules/module-sendspin/teeny-sha1.c new file mode 100644 index 000000000..fa5a56753 --- /dev/null +++ b/src/modules/module-sendspin/teeny-sha1.c @@ -0,0 +1,201 @@ +/******************************************************************************* + * Teeny SHA-1 + * + * The below sha1digest() calculates a SHA-1 hash value for a + * specified data buffer and generates a hex representation of the + * result. This implementation is a re-forming of the SHA-1 code at + * https://github.com/jinqiangshou/EncryptionLibrary. + * + * Copyright (c) 2017 CTrabant + * + * License: MIT, see included LICENSE file for details. + * + * To use the sha1digest() function either copy it into an existing + * project source code file or include this file in a project and put + * the declaration (example below) in the sources files where needed. + ******************************************************************************/ + +#include +#include +#include +#include + +/* Declaration: +extern int sha1digest(uint8_t *digest, char *hexdigest, const uint8_t *data, size_t databytes); +*/ + +/******************************************************************************* + * sha1digest: https://github.com/CTrabant/teeny-sha1 + * + * Calculate the SHA-1 value for supplied data buffer and generate a + * text representation in hexadecimal. + * + * Based on https://github.com/jinqiangshou/EncryptionLibrary, credit + * goes to @jinqiangshou, all new bugs are mine. + * + * @input: + * data -- data to be hashed + * databytes -- bytes in data buffer to be hashed + * + * @output: + * digest -- the result, MUST be at least 20 bytes + * hexdigest -- the result in hex, MUST be at least 41 bytes + * + * At least one of the output buffers must be supplied. The other, if not + * desired, may be set to NULL. + * + * @return: 0 on success and non-zero on error. + ******************************************************************************/ +static inline int +sha1digest(uint8_t *digest, char *hexdigest, const uint8_t *data, size_t databytes) +{ +#define SHA1ROTATELEFT(value, bits) (((value) << (bits)) | ((value) >> (32 - (bits)))) + + uint32_t W[80]; + uint32_t H[] = {0x67452301, + 0xEFCDAB89, + 0x98BADCFE, + 0x10325476, + 0xC3D2E1F0}; + uint32_t a; + uint32_t b; + uint32_t c; + uint32_t d; + uint32_t e; + uint32_t f = 0; + uint32_t k = 0; + + uint32_t idx; + uint32_t lidx; + uint32_t widx; + uint32_t didx = 0; + + int32_t wcount; + uint32_t temp; + uint64_t databits = ((uint64_t)databytes) * 8; + uint32_t loopcount = (databytes + 8) / 64 + 1; + uint32_t tailbytes = 64 * loopcount - databytes; + uint8_t datatail[128] = {0}; + + if (!digest && !hexdigest) + return -1; + + if (!data) + return -1; + + /* Pre-processing of data tail (includes padding to fill out 512-bit chunk): + Add bit '1' to end of message (big-endian) + Add 64-bit message length in bits at very end (big-endian) */ + datatail[0] = 0x80; + datatail[tailbytes - 8] = (uint8_t) (databits >> 56 & 0xFF); + datatail[tailbytes - 7] = (uint8_t) (databits >> 48 & 0xFF); + datatail[tailbytes - 6] = (uint8_t) (databits >> 40 & 0xFF); + datatail[tailbytes - 5] = (uint8_t) (databits >> 32 & 0xFF); + datatail[tailbytes - 4] = (uint8_t) (databits >> 24 & 0xFF); + datatail[tailbytes - 3] = (uint8_t) (databits >> 16 & 0xFF); + datatail[tailbytes - 2] = (uint8_t) (databits >> 8 & 0xFF); + datatail[tailbytes - 1] = (uint8_t) (databits >> 0 & 0xFF); + + /* Process each 512-bit chunk */ + for (lidx = 0; lidx < loopcount; lidx++) + { + /* Compute all elements in W */ + memset (W, 0, 80 * sizeof (uint32_t)); + + /* Break 512-bit chunk into sixteen 32-bit, big endian words */ + for (widx = 0; widx <= 15; widx++) + { + wcount = 24; + + /* Copy byte-per byte from specified buffer */ + while (didx < databytes && wcount >= 0) + { + W[widx] += (((uint32_t)data[didx]) << wcount); + didx++; + wcount -= 8; + } + /* Fill out W with padding as needed */ + while (wcount >= 0) + { + W[widx] += (((uint32_t)datatail[didx - databytes]) << wcount); + didx++; + wcount -= 8; + } + } + + /* Extend the sixteen 32-bit words into eighty 32-bit words, with potential optimization from: + "Improving the Performance of the Secure Hash Algorithm (SHA-1)" by Max Locktyukhin */ + for (widx = 16; widx <= 31; widx++) + { + W[widx] = SHA1ROTATELEFT ((W[widx - 3] ^ W[widx - 8] ^ W[widx - 14] ^ W[widx - 16]), 1); + } + for (widx = 32; widx <= 79; widx++) + { + W[widx] = SHA1ROTATELEFT ((W[widx - 6] ^ W[widx - 16] ^ W[widx - 28] ^ W[widx - 32]), 2); + } + + /* Main loop */ + a = H[0]; + b = H[1]; + c = H[2]; + d = H[3]; + e = H[4]; + + for (idx = 0; idx <= 79; idx++) + { + if (idx <= 19) + { + f = (b & c) | ((~b) & d); + k = 0x5A827999; + } + else if (idx >= 20 && idx <= 39) + { + f = b ^ c ^ d; + k = 0x6ED9EBA1; + } + else if (idx >= 40 && idx <= 59) + { + f = (b & c) | (b & d) | (c & d); + k = 0x8F1BBCDC; + } + else if (idx >= 60 && idx <= 79) + { + f = b ^ c ^ d; + k = 0xCA62C1D6; + } + temp = SHA1ROTATELEFT (a, 5) + f + e + k + W[idx]; + e = d; + d = c; + c = SHA1ROTATELEFT (b, 30); + b = a; + a = temp; + } + + H[0] += a; + H[1] += b; + H[2] += c; + H[3] += d; + H[4] += e; + } + + /* Store binary digest in supplied buffer */ + if (digest) + { + for (idx = 0; idx < 5; idx++) + { + digest[idx * 4 + 0] = (uint8_t) (H[idx] >> 24); + digest[idx * 4 + 1] = (uint8_t) (H[idx] >> 16); + digest[idx * 4 + 2] = (uint8_t) (H[idx] >> 8); + digest[idx * 4 + 3] = (uint8_t) (H[idx]); + } + } + + /* Store hex version of digest in supplied buffer */ + if (hexdigest) + { + snprintf (hexdigest, 41, "%08x%08x%08x%08x%08x", + H[0],H[1],H[2],H[3],H[4]); + } + + return 0; +} /* End of sha1digest() */ diff --git a/src/modules/module-sendspin/websocket.c b/src/modules/module-sendspin/websocket.c new file mode 100644 index 000000000..959706d79 --- /dev/null +++ b/src/modules/module-sendspin/websocket.c @@ -0,0 +1,1060 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2021 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "config.h" + +#include "websocket.h" +#include "teeny-sha1.c" +#include "../network-utils.h" +#include "../module-raop/base64.h" + +#define pw_websocket_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_websocket_events, m, v, ##__VA_ARGS__) +#define pw_websocket_emit_destroy(w) pw_websocket_emit(w, destroy, 0) +#define pw_websocket_emit_connected(w,u,c,p) pw_websocket_emit(w, connected, 0, u, c, p) + +#define pw_websocket_connection_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_websocket_connection_events, m, v, ##__VA_ARGS__) +#define pw_websocket_connection_emit_destroy(w) pw_websocket_connection_emit(w, destroy, 0) +#define pw_websocket_connection_emit_error(w,r,m) pw_websocket_connection_emit(w, error, 0, r, m) +#define pw_websocket_connection_emit_disconnected(w) pw_websocket_connection_emit(w, disconnected, 0) +#define pw_websocket_connection_emit_drained(w) pw_websocket_connection_emit(w, drained, 0) +#define pw_websocket_connection_emit_message(w,...) pw_websocket_connection_emit(w, message, 0, __VA_ARGS__) + +#define MAX_CONNECTIONS 64 + +struct message { + struct spa_list link; + size_t len; + size_t offset; + uint32_t seq; + int (*reply) (void *user_data, int status); + void *user_data; + unsigned char data[]; +}; + +struct server { + struct pw_websocket *ws; + struct spa_list link; + + struct sockaddr_storage addr; + struct spa_source *source; + + void *user; + char **paths; + + struct spa_list connections; + uint32_t n_connections; +}; + +struct pw_websocket_connection { + struct pw_websocket *ws; + struct spa_list link; + + int refcount; + + void *user; + struct server *server; + + struct spa_hook_list listener_list; + + struct spa_source *source; + unsigned int connecting:1; + unsigned int need_flush:1; + + char *host; + char *path; + char name[128]; + bool ipv4; + uint16_t port; + + struct sockaddr_storage addr; + + uint8_t maskbit; + + int status; + char message[128]; + char key[25]; + size_t content_length; + + uint32_t send_seq; + uint32_t recv_seq; + bool draining; + + struct spa_list messages; + struct spa_list pending; + + struct pw_array data; + size_t data_wanted; + size_t data_cursor; + size_t data_state; + int (*have_data) (struct pw_websocket_connection *conn, + void *data, size_t size, size_t current); +}; + +struct pw_websocket { + struct pw_loop *loop; + + struct spa_hook_list listener_list; + + struct spa_source *source; + + char *ifname; + char *ifaddress; + char *user_agent; + char *server_name; + + struct spa_list connections; + struct spa_list servers; +}; + +void pw_websocket_connection_disconnect(struct pw_websocket_connection *conn, bool drain) +{ + struct message *msg; + + if (drain && !spa_list_is_empty(&conn->messages)) { + conn->draining = true; + return; + } + + if (conn->source != NULL) { + pw_loop_destroy_source(conn->ws->loop, conn->source); + conn->source = NULL; + } + spa_list_insert_list(&conn->messages, &conn->pending); + spa_list_consume(msg, &conn->messages, link) { + spa_list_remove(&msg->link); + free(msg); + } + if (conn->server) { + conn->server->n_connections--; + conn->server = NULL; + } + pw_websocket_connection_emit_disconnected(conn); +} + +static void websocket_connection_unref(struct pw_websocket_connection *conn) +{ + if (--conn->refcount > 0) + return; + pw_array_clear(&conn->data); + free(conn->host); + free(conn->path); + free(conn); +} + +void pw_websocket_connection_destroy(struct pw_websocket_connection *conn) +{ + pw_log_debug("destroy connection %p", conn); + spa_list_remove(&conn->link); + + pw_websocket_connection_emit_destroy(conn); + + pw_websocket_connection_disconnect(conn, false); + spa_hook_list_clean(&conn->listener_list); + + websocket_connection_unref(conn); +} + +void pw_websocket_connection_add_listener(struct pw_websocket_connection *conn, + struct spa_hook *listener, + const struct pw_websocket_connection_events *events, void *data) +{ + spa_hook_list_append(&conn->listener_list, listener, events, data); +} + +struct pw_websocket *pw_websocket_new(struct pw_loop *main_loop, struct spa_dict *props) +{ + struct pw_websocket *ws; + uint32_t i; + + if ((ws = calloc(1, sizeof(*ws))) == NULL) + return NULL; + + for (i = 0; props && i < props->n_items; i++) { + const char *k = props->items[i].key; + const char *s = props->items[i].value; + if (spa_streq(k, "local.ifname")) + ws->ifname = s ? strdup(s) : NULL; + if (spa_streq(k, "local.ifaddress")) + ws->ifaddress = s ? strdup(s) : NULL; + if (spa_streq(k, "http.user-agent")) + ws->user_agent = s ? strdup(s) : NULL; + if (spa_streq(k, "http.server-name")) + ws->server_name = s ? strdup(s) : NULL; + } + if (ws->user_agent == NULL) + ws->user_agent = spa_aprintf("PipeWire/%s", PACKAGE_VERSION); + if (ws->server_name == NULL) + ws->server_name = spa_aprintf("PipeWire/%s", PACKAGE_VERSION); + + ws->loop = main_loop; + spa_hook_list_init(&ws->listener_list); + + spa_list_init(&ws->connections); + spa_list_init(&ws->servers); + return ws; +} + +static void server_free(struct server *server) +{ + struct pw_websocket *ws = server->ws; + struct pw_websocket_connection *conn; + + pw_log_debug("%p: free server %p", ws, server); + + spa_list_remove(&server->link); + spa_list_consume(conn, &server->connections, link) + pw_websocket_connection_destroy(conn); + if (server->source) + pw_loop_destroy_source(ws->loop, server->source); + pw_free_strv(server->paths); + free(server); +} + +void pw_websocket_destroy(struct pw_websocket *ws) +{ + struct server *server; + struct pw_websocket_connection *conn; + + pw_log_info("destroy sebsocket %p", ws); + pw_websocket_emit_destroy(ws); + + spa_list_consume(server, &ws->servers, link) + server_free(server); + spa_list_consume(conn, &ws->connections, link) + pw_websocket_connection_destroy(conn); + + spa_hook_list_clean(&ws->listener_list); + free(ws->ifname); + free(ws->ifaddress); + free(ws->user_agent); + free(ws->server_name); + free(ws); +} + +void pw_websocket_add_listener(struct pw_websocket *ws, + struct spa_hook *listener, + const struct pw_websocket_events *events, void *data) +{ + spa_hook_list_append(&ws->listener_list, listener, events, data); +} + +static int update_io(struct pw_websocket_connection *conn, int io, bool active) +{ + if (conn->source) { + uint32_t mask = conn->source->mask; + SPA_FLAG_UPDATE(mask, io, active); + if (mask != conn->source->mask) + pw_loop_update_io(conn->ws->loop, conn->source, mask); + } + return 0; +} + +static int receiver_expect(struct pw_websocket_connection *conn, size_t wanted, + int (*have_data) (struct pw_websocket_connection *conn, + void *data, size_t size, size_t current)) +{ + pw_array_reset(&conn->data); + conn->data_wanted = wanted; + conn->data_cursor = 0; + conn->data_state = 0; + conn->have_data = have_data; + return update_io(conn, SPA_IO_IN, wanted); +} + +static int queue_message(struct pw_websocket_connection *conn, struct message *msg) +{ + spa_list_append(&conn->messages, &msg->link); + conn->need_flush = true; + return update_io(conn, SPA_IO_OUT, true); +} + +static int receive_websocket(struct pw_websocket_connection *conn, + void *data, size_t size, size_t current) +{ + uint8_t *d = data; + int need = 0, header = 0, i; + if (conn->data_state == 0) { + /* header done */ + conn->status = d[0] & 0xf; + if (d[1] & 0x80) + header =+ 4; + if ((d[1] & 0x7f) == 126) + header += 2; + else if ((d[1] & 0x7f) == 127) + header += 8; + else + need += d[1] & 0x7f; + conn->data_cursor = 2 + header; + need += header; + conn->data_state++; + } + else if (conn->data_state == 1) { + /* extra length and mask */ + size_t payload_len = 0; + if ((d[1] & 0x7f) == 126) + header = 2; + else if ((d[1] & 0x7f) == 127) + header = 8; + for (i = 0; i < header; i++) + payload_len = (payload_len << 8) | d[i + 2]; + need += payload_len; + conn->data_state++; + } + if (need == 0) { + uint8_t *payload = &d[conn->data_cursor]; + size_t i, payload_size = conn->data.size - conn->data_cursor; + struct iovec iov[1] = {{ payload, payload_size }}; + + if (d[1] & 0x80) { + uint8_t *mask = &d[conn->data_cursor - 4]; + for (i = 0; i < payload_size; i++) + payload[i] ^= mask[i & 3]; + } + + switch (conn->status) { + case PW_WEBSOCKET_OPCODE_PING: + pw_log_info("received ping"); + pw_websocket_connection_send(conn, PW_WEBSOCKET_OPCODE_PONG, iov, 1); + break; + case PW_WEBSOCKET_OPCODE_CLOSE: + pw_log_info("received close"); + pw_websocket_connection_send(conn, PW_WEBSOCKET_OPCODE_CLOSE, iov, 1); + pw_websocket_connection_disconnect(conn, true); + break; + default: + pw_log_debug("received message %02x", conn->status); + pw_websocket_connection_emit_message(conn, conn->status, + payload, payload_size); + } + receiver_expect(conn, 2, receive_websocket); + } + return need; +} + +static int connection_upgrade_failed(struct pw_websocket_connection *conn, + int status, const char *message) +{ + FILE *f; + size_t len; + struct message *msg; + + if ((f = open_memstream((char**)&msg, &len)) == NULL) + return -errno; + + fseek(f, offsetof(struct message, data), SEEK_SET); + fprintf(f, "HTTP/1.1 %d %s\r\n", status, message); + fprintf(f, "Transfer-Encoding: chunked\r\n"); + fprintf(f, "Content-Type: application/octet-stream\r\n"); + fprintf(f, "Server: %s\r\n", conn->ws->server_name); + fprintf(f, "\r\n"); + fclose(f); + + msg->len = len - offsetof(struct message, data); + pw_log_info("send error %d %s", status, message); + return queue_message(conn, msg); +} + +static void make_accept(const char *key, char *accept) +{ + static const char *str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + uint8_t tmp[24 + 36], sha1[20]; + memcpy(&tmp[ 0], key, 24); + memcpy(&tmp[24], str, 36); + sha1digest(sha1, NULL, tmp, sizeof(tmp)); + pw_base64_encode(sha1, sizeof(sha1), accept, '='); +} + +static int connection_upgraded_send(struct pw_websocket_connection *conn) +{ + FILE *f; + size_t len; + struct message *msg; + char accept[29]; + + if ((f = open_memstream((char**)&msg, &len)) == NULL) + return -errno; + + make_accept(conn->key, accept); + + fseek(f, offsetof(struct message, data), SEEK_SET); + fprintf(f, "HTTP/1.1 101 Switching Protocols\r\n"); + fprintf(f, "Upgrade: websocket\r\n"); + fprintf(f, "Connection: Upgrade\r\n"); + fprintf(f, "Sec-WebSocket-Accept: %s\r\n", accept); + fprintf(f, "\r\n"); + fclose(f); + + msg->len = len - offsetof(struct message, data); + pw_log_info("send upgrade %s", msg->data); + return queue_message(conn, msg); +} + +static int complete_upgrade(struct pw_websocket_connection *conn) +{ + pw_websocket_emit_connected(conn->ws, conn->user, conn, conn->path); + return receiver_expect(conn, 2, receive_websocket); +} + +static int header_key_val(char *buf, char **key, char **val) +{ + char *v; + *key = buf; + if ((v = strstr(buf, ":")) == NULL) + return -EPROTO; + *v++ = '\0'; + *val = pw_strip(v, " "); + return 0; +} + +static int receive_http_request(struct pw_websocket_connection *conn, + void *data, size_t size, size_t current) +{ + char *d = data, *l; + char c = d[current]; + int need = 1; + + if (conn->data_state == 0) { + if (c == '\n') { + int v1, v2; + d[current] = '\0'; + l = pw_strip(&d[conn->data_cursor], "\n\r "); + conn->data_cursor = current+1; + if (sscanf(l, "GET %ms HTTP/%d.%d", &conn->path, &v1, &v2) != 3) + return -EPROTO; + conn->data_state++; + } + } + else if (conn->data_state == 1) { + if (c == '\n') { + char *key, *val; + d[current] = '\0'; + l = pw_strip(&d[conn->data_cursor], "\n\r "); + if (strlen(l) > 0) { + conn->data_cursor = current+1; + if (header_key_val(l, &key, &val) < 0) + return -EPROTO; + if (spa_streq(key, "Sec-WebSocket-Key")) + strncpy(conn->key, val, sizeof(conn->key)-1); + } else { + conn->data_state++; + need = 0; + } + } + } + if (need == 0) { + if (conn->server && conn->server->paths && + pw_strv_find(conn->server->paths, conn->path) < 0) { + connection_upgrade_failed(conn, 404, "Not Found"); + } else { + connection_upgraded_send(conn); + complete_upgrade(conn); + } + } + return need; +} + +static struct message *find_pending(struct pw_websocket_connection *conn, uint32_t seq) +{ + struct message *msg; + spa_list_for_each(msg, &conn->pending, link) { + if (msg->seq == seq) + return msg; + } + return NULL; +} + +static int receive_http_reply(struct pw_websocket_connection *conn, + void *data, size_t size, size_t current) +{ + char *d = data, *l; + char c = d[current]; + int need = 1; + + if (conn->data_state == 0) { + if (c == '\n') { + int v1, v2, status, message; + /* status complete */ + d[current] = '\0'; + l = pw_strip(&d[conn->data_cursor], "\n\r "); + conn->data_cursor = current+1; + if (sscanf(l, "HTTP/%d.%d %n%d", &v1, &v2, &message, &status) != 3) + return -EPROTO; + conn->status = status; + strcpy(conn->message, &l[message]); + conn->content_length = 0; + conn->data_state++; + } + } + else if (conn->data_state == 1) { + if (c == '\n') { + /* header line complete */ + d[current] = '\0'; + l = pw_strip(&d[conn->data_cursor], "\n\r "); + conn->data_cursor = current+1; + if (strlen(l) > 0) { + char *key, *value; + if (header_key_val(l, &key, &value) < 0) + return -EPROTO; + if (spa_streq(key, "Sec-WebSocket-Accept")) { + char accept[29]; + make_accept(conn->key, accept); + if (!spa_streq(value, accept)) { + pw_log_error("got Accept:%s expected:%s", value, accept); + return -EPROTO; + } + } + else if (spa_streq(key, "Content-Length")) + conn->content_length = atoi(value); + } else { + conn->data_state++; + need = conn->content_length; + } + } + } + if (need == 0) { + /* message completed */ + uint32_t seq; + int res; + struct message *msg; + + seq = conn->recv_seq++; + + pw_log_info("received reply to request with seq:%" PRIu32, seq); + + if ((msg = find_pending(conn, seq)) != NULL) { + res = msg->reply(msg->user_data, conn->status); + spa_list_remove(&msg->link); + free(msg); + + if (res < 0) + pw_websocket_connection_emit_error(conn, res, conn->message); + } + } + return need; +} + +static int on_upgrade_reply(void *user_data, int status) +{ + struct pw_websocket_connection *conn = user_data; + if (status != 101) + return -EPROTO; + return complete_upgrade(conn); +} + +static int handle_connect(struct pw_websocket_connection *conn, int fd) +{ + int res = 0; + socklen_t res_len; + FILE *f; + size_t len; + struct message *msg; + uint8_t key[16]; + + len = sizeof(res); + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &res, &res_len) < 0) { + pw_log_error("getsockopt: %m"); + return -errno; + } + if (res != 0) + return -res; + + pw_log_info("connected to %s:%u", conn->name, conn->port); + + conn->connecting = false; + conn->status = 0; + + if ((f = open_memstream((char**)&msg, &len)) == NULL) + return -errno; + + fseek(f, offsetof(struct message, data), SEEK_SET); + + /* make a key */ + pw_random(key, sizeof(key)); + pw_base64_encode(key, sizeof(key), conn->key, '='); + + fprintf(f, "GET %s HTTP/1.1\r\n", conn->path); + fprintf(f, "Host: %s\r\n", conn->host); + fprintf(f, "Upgrade: websocket\r\n"); + fprintf(f, "Connection: Upgrade\r\n"); + fprintf(f, "Sec-WebSocket-Version: 13\r\n"); + fprintf(f, "Sec-WebSocket-Key: %s\r\n", conn->key); + fprintf(f, "Accept: */*\r\n"); + fprintf(f, "User-Agent: %s\r\n", conn->ws->user_agent); + fprintf(f, "\r\n"); + fclose(f); + + msg->len = len - offsetof(struct message, data); + msg->reply = on_upgrade_reply; + msg->user_data = conn; + msg->seq = conn->send_seq++; + + pw_log_info("%s", msg->data); + + receiver_expect(conn, 1, receive_http_reply); + + return queue_message(conn, msg); +} + +static int handle_input(struct pw_websocket_connection *conn) +{ + int res; + + while (conn->data.size < conn->data_wanted) { + size_t current = conn->data.size; + size_t pending = conn->data_wanted - current; + void *b; + + if (conn->source == NULL) + return -EPIPE; + + if ((res = pw_array_ensure_size(&conn->data, pending)) < 0) + return res; + b = SPA_PTROFF(conn->data.data, current, void); + + res = read(conn->source->fd, b, pending); + if (res == 0) + return 0; + if (res < 0) { + res = -errno; + if (res == -EINTR) + continue; + if (res != -EAGAIN && res != -EWOULDBLOCK) + return res; + return -EAGAIN; + } + conn->data.size += res; + if (conn->data.size == conn->data_wanted) { + if ((res = conn->have_data(conn, + conn->data.data, + conn->data.size, + current)) < 0) + return res; + + conn->data_wanted += res; + } + } + return 0; +} + +static int flush_output(struct pw_websocket_connection *conn) +{ + int res; + + conn->need_flush = false; + + if (conn->source == NULL) + return -EPIPE; + + while (true) { + struct message *msg; + void *data; + size_t size; + + if (spa_list_is_empty(&conn->messages)) { + if (conn->draining) + pw_websocket_connection_disconnect(conn, false); + break; + } + msg = spa_list_first(&conn->messages, struct message, link); + + if (msg->offset < msg->len) { + data = SPA_PTROFF(msg->data, msg->offset, void); + size = msg->len - msg->offset; + } else { + spa_list_remove(&msg->link); + if (msg->reply != NULL) + spa_list_append(&conn->pending, &msg->link); + else + free(msg); + continue; + } + + while (true) { + res = send(conn->source->fd, data, size, MSG_NOSIGNAL | MSG_DONTWAIT); + if (res < 0) { + res = -errno; + if (res == -EINTR) + continue; + if (res != -EAGAIN && res != -EWOULDBLOCK) + pw_log_warn("conn %p: send %zu, error %d: %m", + conn, size, res); + return res; + } + msg->offset += res; + break; + } + } + return 0; +} + +static void +on_source_io(void *data, int fd, uint32_t mask) +{ + struct pw_websocket_connection *conn = data; + int res; + + conn->refcount++; + + if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { + res = -EPIPE; + goto error; + } + if (mask & SPA_IO_IN) { + if ((res = handle_input(conn)) != -EAGAIN) + goto error; + } + if (mask & SPA_IO_OUT || conn->need_flush) { + if (conn->connecting) { + if ((res = handle_connect(conn, fd)) < 0) + goto error; + } + res = flush_output(conn); + if (res >= 0) { + if (conn->source) + pw_loop_update_io(conn->ws->loop, conn->source, + conn->source->mask & ~SPA_IO_OUT); + } else if (res != -EAGAIN) + goto error; + } +done: + websocket_connection_unref(conn); + return; +error: + if (res < 0) { + pw_log_error("%p: %s got connection error %d (%s)", conn, + conn->name, res, spa_strerror(res)); + snprintf(conn->message, sizeof(conn->message), "%s", spa_strerror(res)); + pw_websocket_connection_emit_error(conn, res, conn->message); + } else { + pw_log_info("%p: %s connection closed", conn, conn->name); + } + pw_websocket_connection_disconnect(conn, false); + goto done; +} + +int pw_websocket_connection_address(struct pw_websocket_connection *conn, + struct sockaddr *addr, socklen_t addr_len) +{ + memcpy(addr, &conn->addr, SPA_MIN(addr_len, sizeof(conn->addr))); + return 0; +} + +static struct pw_websocket_connection *connection_new(struct pw_websocket *ws, void *user, + struct sockaddr *addr, socklen_t addr_len, int fd, struct server *server) +{ + struct pw_websocket_connection *conn; + + if ((conn = calloc(1, sizeof(*conn))) == NULL) + goto error; + + if ((conn->source = pw_loop_add_io(ws->loop, spa_steal_fd(fd), + SPA_IO_ERR | SPA_IO_HUP | SPA_IO_OUT, + true, on_source_io, conn)) == NULL) + goto error; + + memcpy(&conn->addr, addr, SPA_MIN(addr_len, sizeof(conn->addr))); + conn->ws = ws; + conn->server = server; + conn->user = user; + if (server) + spa_list_append(&server->connections, &conn->link); + else + spa_list_append(&ws->connections, &conn->link); + + conn->refcount = 1; + if (pw_net_get_ip(&conn->addr, conn->name, sizeof(conn->name), + &conn->ipv4, &conn->port) < 0) + snprintf(conn->name, sizeof(conn->name), "connection %p", conn); + + spa_list_init(&conn->messages); + spa_list_init(&conn->pending); + spa_hook_list_init(&conn->listener_list); + pw_array_init(&conn->data, 4096); + + pw_log_debug("new websocket %p connection %p %s:%u", ws, + conn, conn->name, conn->port); + + return conn; +error: + if (fd != -1) + close(fd); + free(conn); + return NULL; +} + +static int make_tcp_socket(struct server *server, const char *name, uint16_t port, const char *ifname, + const char *ifaddress) +{ + struct sockaddr_storage addr; + int res, on; + socklen_t len = 0; + spa_autoclose int fd = -1; + + if ((res = pw_net_parse_address_port(name, ifaddress, port, &addr, &len)) < 0) { + pw_log_error("%p: can't parse address %s: %s", server, + name, spa_strerror(res)); + goto error; + } + + if ((fd = socket(addr.ss_family, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)) < 0) { + res = -errno; + pw_log_error("%p: socket() failed: %m", server); + goto error; + } +#ifdef SO_BINDTODEVICE + if (ifname && setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, ifname, strlen(ifname)) < 0) { + res = -errno; + pw_log_error("%p: setsockopt(SO_BINDTODEVICE) failed: %m", server); + goto error; + } +#endif + on = 1; + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void *) &on, sizeof(on)) < 0) + pw_log_warn("%p: setsockopt(): %m", server); + + if (bind(fd, (struct sockaddr *) &addr, len) < 0) { + res = -errno; + pw_log_error("%p: bind() failed: %m", server); + goto error; + } + if (listen(fd, 5) < 0) { + res = -errno; + pw_log_error("%p: listen() failed: %m", server); + goto error; + } + if (getsockname(fd, (struct sockaddr *)&addr, &len) < 0) { + res = -errno; + pw_log_error("%p: getsockname() failed: %m", server); + goto error; + } + + server->addr = addr; + + return spa_steal_fd(fd); +error: + return res; +} + +static void +on_server_connect(void *data, int fd, uint32_t mask) +{ + struct server *server = data; + struct pw_websocket *ws = server->ws; + struct sockaddr_storage addr; + socklen_t addrlen; + spa_autoclose int conn_fd = -1; + int val; + struct pw_websocket_connection *conn = NULL; + + addrlen = sizeof(addr); + if ((conn_fd = accept4(fd, (struct sockaddr*)&addr, &addrlen, + SOCK_NONBLOCK | SOCK_CLOEXEC)) < 0) + goto error; + + if (server->n_connections >= MAX_CONNECTIONS) + goto error_refused; + + if ((conn = connection_new(ws, server->user, (struct sockaddr*)&addr, sizeof(addr), + spa_steal_fd(conn_fd), server)) == NULL) + goto error; + + server->n_connections++; + + pw_log_info("%p: connection:%p %s:%u connected", ws, + conn, conn->name, conn->port); + + val = 1; + if (setsockopt(conn->source->fd, IPPROTO_TCP, TCP_NODELAY, + (const void *) &val, sizeof(val)) < 0) + pw_log_warn("TCP_NODELAY failed: %m"); + + val = IPTOS_LOWDELAY; + if (setsockopt(conn->source->fd, IPPROTO_IP, IP_TOS, + (const void *) &val, sizeof(val)) < 0) + pw_log_warn("IP_TOS failed: %m"); + + receiver_expect(conn, 1, receive_http_request); + return; + +error_refused: + errno = ECONNREFUSED; +error: + pw_log_error("%p: failed to create connection: %m", ws); + return; +} + +int pw_websocket_listen(struct pw_websocket *ws, void *user, + const char *hostname, const char *service, const char *paths) +{ + int res; + struct server *server; + uint16_t port = atoi(service); + + if ((server = calloc(1, sizeof(struct server))) == NULL) + return -errno; + + server->ws = ws; + spa_list_append(&ws->servers, &server->link); + + server->user = user; + spa_list_init(&server->connections); + + if ((res = make_tcp_socket(server, hostname, port, ws->ifname, ws->ifaddress)) < 0) + goto error; + + if ((server->source = pw_loop_add_io(ws->loop, res, SPA_IO_IN, + true, on_server_connect, server)) == NULL) { + res = -errno; + goto error; + } + if (paths) + server->paths = pw_strv_parse(paths, strlen(paths), INT_MAX, NULL); + + pw_log_info("%p: listen %s:%u %s", ws, hostname, port, paths); + return 0; +error: + pw_log_error("%p: can't create server: %s", ws, spa_strerror(res)); + server_free(server); + return res; +} + +int pw_websocket_cancel(struct pw_websocket *ws, void *user) +{ + struct server *s, *ts; + struct pw_websocket_connection *c, *tc; + int count = 0; + + spa_list_for_each_safe(s, ts, &ws->servers, link) { + if (s->user == user) { + server_free(s); + count++; + } + } + spa_list_for_each_safe(c, tc, &ws->connections, link) { + if (c->user == user) { + pw_websocket_connection_destroy(c); + count++; + } + } + return count; +} + +int pw_websocket_connect(struct pw_websocket *ws, void *user, + const char *hostname, const char *service, const char *path) +{ + struct addrinfo hints; + struct addrinfo *result, *rp; + int res, fd; + struct pw_websocket_connection *conn = NULL; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = 0; + hints.ai_protocol = 0; + + if ((res = getaddrinfo(hostname, service, &hints, &result)) != 0) { + pw_log_error("getaddrinfo: %s", gai_strerror(res)); + return -EINVAL; + } + res = -ENOENT; + for (rp = result; rp != NULL; rp = rp->ai_next) { + if ((fd = socket(rp->ai_family, + rp->ai_socktype | SOCK_CLOEXEC | SOCK_NONBLOCK, + rp->ai_protocol)) == -1) + continue; + + res = connect(fd, rp->ai_addr, rp->ai_addrlen); + if (res == 0 || (res < 0 && errno == EINPROGRESS)) + break; + + res = -errno; + close(fd); + } + if (rp == NULL) { + pw_log_error("Could not connect to %s:%s: %s", hostname, service, + spa_strerror(res)); + } else { + if ((conn = connection_new(ws, user, rp->ai_addr, rp->ai_addrlen, fd, NULL)) == NULL) + res = -errno; + } + freeaddrinfo(result); + if (conn == NULL) + return res; + + conn->connecting = true; + conn->maskbit = 0x80; + conn->path = strdup(path); + asprintf(&conn->host, "%s:%s", hostname, service); + + pw_log_info("%p: connecting to %s:%u path:%s", conn, + conn->name, conn->port, path); + return 0; +} + +int pw_websocket_connection_send(struct pw_websocket_connection *conn, uint8_t opcode, + const struct iovec *iov, size_t iov_len) +{ + struct message *msg; + size_t len = 2, i, j, k; + uint8_t *d, *mask = NULL, maskbit = conn->maskbit; + size_t payload_length = 0; + + for (i = 0; i < iov_len; i++) + payload_length += iov[i].iov_len; + + if ((msg = calloc(1, sizeof(*msg) + 14 + payload_length)) == NULL) + return -errno; + + d = msg->data; + d[0] = 0x80 | opcode; + + if (payload_length < 126) + k = 0; + else if (payload_length < 65536) + k = 2; + else + k = 8; + + d[1] = maskbit | (k == 0 ? payload_length : (k == 2 ? 126 : 127)); + for (i = 0, j = (k-1)*8 ; i < k; i++, j -= 8) + d[len++] = (payload_length >> j) & 0xff; + + if (maskbit) { + mask = &d[len]; + pw_random(mask, 4); + len += 4; + } + for (i = 0, k = 0; i < iov_len; i++) { + if (maskbit) + for (j = 0; j < iov[i].iov_len; j++, k++) + d[len+j] = ((uint8_t*)iov[i].iov_base)[j] ^ mask[k & 3]; + else + memcpy(&d[len], iov[i].iov_base, iov[i].iov_len); + + len += iov[i].iov_len; + } + msg->len = len; + + return queue_message(conn, msg); +} + +int pw_websocket_connection_send_text(struct pw_websocket_connection *conn, + const char *payload, size_t payload_len) +{ + struct iovec iov[1] = {{ (void*)payload, payload_len }}; + pw_log_info("send text %.*s", (int)payload_len, payload); + return pw_websocket_connection_send(conn, PW_WEBSOCKET_OPCODE_TEXT, iov, 1); +} diff --git a/src/modules/module-sendspin/websocket.h b/src/modules/module-sendspin/websocket.h new file mode 100644 index 000000000..0f0da36e7 --- /dev/null +++ b/src/modules/module-sendspin/websocket.h @@ -0,0 +1,85 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2026 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#ifndef PIPEWIRE_WEBSOCKET_H +#define PIPEWIRE_WEBSOCKET_H + +#include + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +struct pw_websocket; +struct pw_websocket_connection; + +#define PW_WEBSOCKET_OPCODE_TEXT 0x1 +#define PW_WEBSOCKET_OPCODE_BINARY 0x2 +#define PW_WEBSOCKET_OPCODE_CLOSE 0x8 +#define PW_WEBSOCKET_OPCODE_PING 0x9 +#define PW_WEBSOCKET_OPCODE_PONG 0xa + +struct pw_websocket_connection_events { +#define PW_VERSION_WEBSOCKET_CONNECTION_EVENTS 0 + uint32_t version; + + void (*destroy) (void *data); + void (*error) (void *data, int res, const char *reason); + void (*disconnected) (void *data); + + void (*message) (void *data, + int opcode, void *payload, size_t size); +}; + +void pw_websocket_connection_add_listener(struct pw_websocket_connection *conn, + struct spa_hook *listener, + const struct pw_websocket_connection_events *events, void *data); + +void pw_websocket_connection_destroy(struct pw_websocket_connection *conn); +void pw_websocket_connection_disconnect(struct pw_websocket_connection *conn, bool drain); + +int pw_websocket_connection_address(struct pw_websocket_connection *conn, + struct sockaddr *addr, socklen_t addr_len); + +int pw_websocket_connection_send(struct pw_websocket_connection *conn, + uint8_t opcode, const struct iovec *iov, size_t iov_len); + +int pw_websocket_connection_send_text(struct pw_websocket_connection *conn, + const char *payload, size_t payload_len); + + +struct pw_websocket_events { +#define PW_VERSION_WEBSOCKET_EVENTS 0 + uint32_t version; + + void (*destroy) (void *data); + + void (*connected) (void *data, void *user, + struct pw_websocket_connection *conn, const char *path); +}; + +struct pw_websocket * pw_websocket_new(struct pw_loop *main_loop, + struct spa_dict *props); + +void pw_websocket_destroy(struct pw_websocket *ws); + +void pw_websocket_add_listener(struct pw_websocket *ws, + struct spa_hook *listener, + const struct pw_websocket_events *events, void *data); + +int pw_websocket_connect(struct pw_websocket *ws, void *user, + const char *hostname, const char *service, const char *path); + +int pw_websocket_listen(struct pw_websocket *ws, void *user, + const char *hostname, const char *service, const char *paths); + +int pw_websocket_cancel(struct pw_websocket *ws, void *user); + +#ifdef __cplusplus +} +#endif + +#endif /* PIPEWIRE_WEBSOCKET_H */ diff --git a/src/modules/module-sendspin/zeroconf.c b/src/modules/module-sendspin/zeroconf.c new file mode 100644 index 000000000..fb1c5a221 --- /dev/null +++ b/src/modules/module-sendspin/zeroconf.c @@ -0,0 +1,558 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2021 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#include +#include +#include +#include +#include +#include +#include + +#include "config.h" + +#include +#include + +#include +#include +#include +#include + +#include "../module-zeroconf-discover/avahi-poll.h" + +#include "zeroconf.h" + +#define pw_zeroconf_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_zeroconf_events, m, v, ##__VA_ARGS__) +#define pw_zeroconf_emit_destroy(c) pw_zeroconf_emit(c, destroy, 0) +#define pw_zeroconf_emit_error(c,e,m) pw_zeroconf_emit(c, error, 0, e, m) +#define pw_zeroconf_emit_added(c,id,i) pw_zeroconf_emit(c, added, 0, id, i) +#define pw_zeroconf_emit_removed(c,id,i) pw_zeroconf_emit(c, removed, 0, id, i) + +struct service_info { + AvahiIfIndex interface; + AvahiProtocol protocol; + const char *name; + const char *type; + const char *domain; + const char *host_name; + AvahiAddress address; + uint16_t port; +}; + +#define SERVICE_INFO(...) ((struct service_info){ __VA_ARGS__ }) + +struct entry { + struct pw_zeroconf *zc; + struct spa_list link; + +#define TYPE_ANNOUNCE 0 +#define TYPE_BROWSE 1 + uint32_t type; + void *user; + + struct pw_properties *props; + + AvahiEntryGroup *group; + AvahiServiceBrowser *browser; + + struct spa_list services; +}; + +struct service { + struct entry *e; + struct spa_list link; + + struct service_info info; + + struct pw_properties *props; +}; + +struct pw_zeroconf { + int refcount; + struct pw_context *context; + + struct pw_properties *props; + + struct spa_hook_list listener_list; + + AvahiPoll *poll; + AvahiClient *client; + AvahiClientState state; + + struct spa_list entries; + + bool discover_local; +}; + +static struct service *service_find(struct entry *e, const struct service_info *info) +{ + struct service *s; + spa_list_for_each(s, &e->services, link) { + if (s->info.interface == info->interface && + s->info.protocol == info->protocol && + spa_streq(s->info.name, info->name) && + spa_streq(s->info.type, info->type) && + spa_streq(s->info.domain, info->domain)) + return s; + } + return NULL; +} + +static void service_free(struct service *s) +{ + spa_list_remove(&s->link); + free((void*)s->info.name); + free((void*)s->info.type); + free((void*)s->info.domain); + free((void*)s->info.host_name); + pw_properties_free(s->props); + free(s); +} + +struct entry *entry_find(struct pw_zeroconf *zc, uint32_t type, void *user) +{ + struct entry *e; + spa_list_for_each(e, &zc->entries, link) + if (e->type == type && e->user == user) + return e; + return NULL; +} + +static void entry_free(struct entry *e) +{ + struct service *s; + + spa_list_remove(&e->link); + if (e->group) + avahi_entry_group_free(e->group); + spa_list_consume(s, &e->services, link) + service_free(s); + pw_properties_free(e->props); + free(e); +} + +static void zeroconf_free(struct pw_zeroconf *zc) +{ + struct entry *a; + + spa_list_consume(a, &zc->entries, link) + entry_free(a); + + if (zc->client) + avahi_client_free(zc->client); + if (zc->poll) + pw_avahi_poll_free(zc->poll); + pw_properties_free(zc->props); + free(zc); +} + +static void zeroconf_unref(struct pw_zeroconf *zc) +{ + if (--zc->refcount == 0) + zeroconf_free(zc); +} + +void pw_zeroconf_destroy(struct pw_zeroconf *zc) +{ + pw_zeroconf_emit_destroy(zc); + + zeroconf_unref(zc); +} + +static struct service *service_new(struct entry *e, + const struct service_info *info, AvahiStringList *txt) +{ + struct service *s; + struct pw_zeroconf *zc = e->zc; + const AvahiAddress *a = &info->address; + static const char *link_local_range = "169.254."; + AvahiStringList *l; + char at[AVAHI_ADDRESS_STR_MAX], if_suffix[16] = ""; + + if ((s = calloc(1, sizeof(*s))) == NULL) + goto error; + + s->e = e; + spa_list_append(&e->services, &s->link); + + s->info.interface = info->interface; + s->info.protocol = info->protocol; + s->info.name = strdup(info->name); + s->info.type = strdup(info->type); + s->info.domain = strdup(info->domain); + s->info.host_name = strdup(info->host_name); + s->info.address = info->address; + s->info.port = info->port; + + if ((s->props = pw_properties_new(NULL, NULL)) == NULL) + goto error; + + if (a->proto == AVAHI_PROTO_INET6 && + a->data.ipv6.address[0] == 0xfe && + (a->data.ipv6.address[1] & 0xc0) == 0x80) + snprintf(if_suffix, sizeof(if_suffix), "%%%d", info->interface); + + avahi_address_snprint(at, sizeof(at), a); + if (a->proto == AVAHI_PROTO_INET && + spa_strstartswith(at, link_local_range)) + snprintf(if_suffix, sizeof(if_suffix), "%%%d", info->interface); + + pw_properties_setf(s->props, "zeroconf.ifindex", "%d", info->interface); + pw_properties_setf(s->props, "zeroconf.name", "%s", info->name); + pw_properties_setf(s->props, "zeroconf.type", "%s", info->type); + pw_properties_setf(s->props, "zeroconf.domain", "%s", info->domain); + pw_properties_setf(s->props, "zeroconf.hostname", "%s", info->host_name); + pw_properties_setf(s->props, "zeroconf.address", "%s%s", at, if_suffix); + pw_properties_setf(s->props, "zeroconf.port", "%u", info->port); + + for (l = txt; l; l = l->next) { + char *key, *value; + + if (avahi_string_list_get_pair(l, &key, &value, NULL) != 0) + break; + + pw_properties_set(s->props, key, value); + avahi_free(key); + avahi_free(value); + } + + pw_log_info("new %s %s %s %s", info->name, info->type, info->domain, info->host_name); + pw_zeroconf_emit_added(zc, e->user, &s->props->dict); + + return s; + +error: + if (s) + service_free(s); + return NULL; +} + +static void resolver_cb(AvahiServiceResolver *r, AvahiIfIndex interface, + AvahiProtocol protocol, AvahiResolverEvent event, + const char *name, const char *type, const char *domain, + const char *host_name, const AvahiAddress *a, uint16_t port, + AvahiStringList *txt, AvahiLookupResultFlags flags, + void *userdata) +{ + struct entry *e = userdata; + struct pw_zeroconf *zc = e->zc; + struct service_info info; + + if (event != AVAHI_RESOLVER_FOUND) { + pw_log_error("Resolving of '%s' failed: %s", name, + avahi_strerror(avahi_client_errno(zc->client))); + goto done; + } + + info = SERVICE_INFO(.interface = interface, + .protocol = protocol, + .name = name, + .type = type, + .domain = domain, + .host_name = host_name, + .address = *a, + .port = port); + + service_new(e, &info, txt); +done: + avahi_service_resolver_free(r); +} + +static void browser_cb(AvahiServiceBrowser *b, AvahiIfIndex interface, AvahiProtocol protocol, + AvahiBrowserEvent event, const char *name, const char *type, const char *domain, + AvahiLookupResultFlags flags, void *userdata) +{ + struct entry *e = userdata; + struct pw_zeroconf *zc = e->zc; + struct service_info info; + struct service *s; + + if ((flags & AVAHI_LOOKUP_RESULT_LOCAL) && !zc->discover_local) + return; + + info = SERVICE_INFO(.interface = interface, + .protocol = protocol, + .name = name, + .type = type, + .domain = domain); + + s = service_find(e, &info); + + switch (event) { + case AVAHI_BROWSER_NEW: + if (s != NULL) + return; + if (!(avahi_service_resolver_new(zc->client, + interface, protocol, + name, type, domain, + AVAHI_PROTO_UNSPEC, 0, + resolver_cb, e))) { + int res = avahi_client_errno(zc->client); + pw_log_error("can't make service resolver: %s", avahi_strerror(res)); + pw_zeroconf_emit_error(zc, res, avahi_strerror(res)); + } + break; + case AVAHI_BROWSER_REMOVE: + if (s == NULL) + return; + pw_log_info("removed %s %s %s", name, type, domain); + pw_zeroconf_emit_removed(zc, e->user, &s->props->dict); + service_free(s); + break; + default: + break; + } +} + +static int do_browse(struct pw_zeroconf *zc, struct entry *e) +{ + const struct spa_dict_item *it; + const char *service_name = NULL; + int res; + + if (e->browser == NULL) { + spa_dict_for_each(it, &e->props->dict) { + if (spa_streq(it->key, "zeroconf.service")) + service_name = it->value; + } + if (service_name == NULL) { + res = -EINVAL; + pw_log_error("no service provided"); + pw_zeroconf_emit_error(zc, res, spa_strerror(res)); + return res; + } + e->browser = avahi_service_browser_new(zc->client, + AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC, + service_name, NULL, 0, + browser_cb, e); + if (e->browser == NULL) { + res = avahi_client_errno(zc->client); + pw_log_error("can't make browser: %s", avahi_strerror(res)); + pw_zeroconf_emit_error(zc, res, avahi_strerror(res)); + return -EIO; + } + } + return 0; +} + +static void entry_group_callback(AvahiEntryGroup *g, AvahiEntryGroupState state, void *userdata) +{ + struct entry *e = userdata; + struct pw_zeroconf *zc = e->zc; + int res; + + zc->refcount++; + + switch (state) { + case AVAHI_ENTRY_GROUP_ESTABLISHED: + pw_log_info("Service successfully established"); + break; + case AVAHI_ENTRY_GROUP_COLLISION: + pw_log_error("Service name collision"); + break; + case AVAHI_ENTRY_GROUP_FAILURE: + res = avahi_client_errno(zc->client); + pw_log_error("Entry group failure: %s", avahi_strerror(res)); + pw_zeroconf_emit_error(zc, res, avahi_strerror(res)); + break; + case AVAHI_ENTRY_GROUP_UNCOMMITED: + case AVAHI_ENTRY_GROUP_REGISTERING: + break; + } + zeroconf_unref(zc); +} + +static int do_announce(struct pw_zeroconf *zc, struct entry *e) +{ + AvahiStringList *txt = NULL; + int res; + const struct spa_dict_item *it; + const char *session_name = "unnamed", *service = NULL; + uint16_t port = 0; + + if (e->group == NULL) { + e->group = avahi_entry_group_new(zc->client, + entry_group_callback, e); + if (e->group == NULL) { + res = avahi_client_errno(zc->client); + pw_log_error("can't make group: %s", avahi_strerror(res)); + pw_zeroconf_emit_error(zc, res, avahi_strerror(res)); + return -EIO; + } + } + avahi_entry_group_reset(e->group); + + spa_dict_for_each(it, &e->props->dict) { + if (spa_streq(it->key, "zeroconf.session")) + session_name = it->value; + else if (spa_streq(it->key, "zeroconf.port")) + port = atoi(it->value); + else if (spa_streq(it->key, "zeroconf.service")) + service = it->value; + else + txt = avahi_string_list_add_pair(txt, it->key, it->value); + } + if (service == NULL) { + res = -EINVAL; + pw_log_error("no service provided"); + pw_zeroconf_emit_error(zc, res, spa_strerror(res)); + return res; + } + res = avahi_entry_group_add_service_strlst(e->group, + AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC, + (AvahiPublishFlags)0, session_name, + service, NULL, NULL, port, txt); + avahi_string_list_free(txt); + + if (res < 0) { + res = avahi_client_errno(zc->client); + pw_log_error("can't add service: %s", avahi_strerror(res)); + pw_zeroconf_emit_error(zc, res, avahi_strerror(res)); + return -EIO; + } + if ((res = avahi_entry_group_commit(e->group)) < 0) { + res = avahi_client_errno(zc->client); + pw_log_error("can't commit group: %s", avahi_strerror(res)); + pw_zeroconf_emit_error(zc, res, avahi_strerror(res)); + return -EIO; + } + return 0; +} + +static int entry_start(struct pw_zeroconf *zc, struct entry *e) +{ + if (zc->state != AVAHI_CLIENT_S_REGISTERING && + zc->state != AVAHI_CLIENT_S_RUNNING && + zc->state != AVAHI_CLIENT_S_COLLISION) + return 0; + + if (e->type == TYPE_ANNOUNCE) + return do_announce(zc, e); + else + return do_browse(zc, e); +} + +static void client_callback(AvahiClient *c, AvahiClientState state, void *d) +{ + struct pw_zeroconf *zc = d; + struct entry *e; + + zc->client = c; + zc->refcount++; + zc->state = state; + + switch (state) { + case AVAHI_CLIENT_S_REGISTERING: + case AVAHI_CLIENT_S_RUNNING: + case AVAHI_CLIENT_S_COLLISION: + spa_list_for_each(e, &zc->entries, link) + entry_start(zc, e); + break; + case AVAHI_CLIENT_FAILURE: + { + int err = avahi_client_errno(c); + pw_zeroconf_emit_error(zc, err, avahi_strerror(err)); + break; + } + case AVAHI_CLIENT_CONNECTING: + default: + break; + } + zeroconf_unref(zc); +} + +static struct entry *entry_new(struct pw_zeroconf *zc, uint32_t type, void *user, const struct spa_dict *info) +{ + struct entry *e; + + if ((e = calloc(1, sizeof(*e))) == NULL) + return NULL; + + e->zc = zc; + e->type = type; + e->user = user; + e->props = pw_properties_new_dict(info); + spa_list_append(&zc->entries, &e->link); + spa_list_init(&e->services); + pw_log_info("created %s", type == TYPE_ANNOUNCE ? "announce" : "browse"); + return e; +} + +static int set_entry(struct pw_zeroconf *zc, uint32_t type, void *user, const struct spa_dict *info) +{ + struct entry *e; + + e = entry_find(zc, type, user); + if (e == NULL) { + if (info == NULL) + return 0; + e = entry_new(zc, type, user, info); + entry_start(zc, e); + } else { + if (info == NULL) + entry_free(e); + else { + pw_properties_update(e->props, info); + entry_start(zc, e); + } + } + return 0; +} +int pw_zeroconf_set_announce(struct pw_zeroconf *zc, void *user, const struct spa_dict *info) +{ + return set_entry(zc, TYPE_ANNOUNCE, user, info); +} +int pw_zeroconf_set_browse(struct pw_zeroconf *zc, void *user, const struct spa_dict *info) +{ + return set_entry(zc, TYPE_BROWSE, user, info); +} + +struct pw_zeroconf * pw_zeroconf_new(struct pw_context *context, + struct spa_dict *props) +{ + struct pw_zeroconf *zc; + uint32_t i; + int res; + + if ((zc = calloc(1, sizeof(*zc))) == NULL) + return NULL; + + zc->refcount = 1; + zc->context = context; + spa_hook_list_init(&zc->listener_list); + spa_list_init(&zc->entries); + zc->props = props ? pw_properties_new_dict(props) : pw_properties_new(NULL, NULL); + zc->discover_local = true; + + for (i = 0; props && i < props->n_items; i++) { + const char *k = props->items[i].key; + const char *v = props->items[i].value; + + if (spa_streq(k, "zeroconf.disable-local")) + zc->discover_local = spa_atob(v); + } + + zc->poll = pw_avahi_poll_new(context); + if (zc->poll == NULL) + goto error; + + zc->client = avahi_client_new(zc->poll, AVAHI_CLIENT_NO_FAIL, + client_callback, zc, &res); + if (!zc->client) { + pw_log_error("failed to create avahi client: %s", avahi_strerror(res)); + goto error; + } + return zc; +error: + zeroconf_free(zc); + return NULL; +} + +void pw_zeroconf_add_listener(struct pw_zeroconf *zc, + struct spa_hook *listener, + const struct pw_zeroconf_events *events, void *data) +{ + spa_hook_list_append(&zc->listener_list, listener, events, data); +} diff --git a/src/modules/module-sendspin/zeroconf.h b/src/modules/module-sendspin/zeroconf.h new file mode 100644 index 000000000..a5de62787 --- /dev/null +++ b/src/modules/module-sendspin/zeroconf.h @@ -0,0 +1,45 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2026 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#ifndef PIPEWIRE_ZEROCONF_H +#define PIPEWIRE_ZEROCONF_H + +#include + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +struct pw_zeroconf; + +struct pw_zeroconf_events { +#define PW_VERSION_ZEROCONF_EVENTS 0 + uint32_t version; + + void (*destroy) (void *data); + void (*error) (void *data, int err, const char *message); + + void (*added) (void *data, void *user, const struct spa_dict *info); + void (*removed) (void *data, void *user, const struct spa_dict *info); +}; + +struct pw_zeroconf * pw_zeroconf_new(struct pw_context *context, + struct spa_dict *props); + +void pw_zeroconf_destroy(struct pw_zeroconf *zc); + +int pw_zeroconf_set_announce(struct pw_zeroconf *zc, void *user, const struct spa_dict *info); +int pw_zeroconf_set_browse(struct pw_zeroconf *zc, void *user, const struct spa_dict *info); + +void pw_zeroconf_add_listener(struct pw_zeroconf *zc, + struct spa_hook *listener, + const struct pw_zeroconf_events *events, void *data); + +#ifdef __cplusplus +} +#endif + +#endif /* PIPEWIRE_ZEROCONF_H */