diff --git a/src/modules/module-sendspin-recv.c b/src/modules/module-sendspin-recv.c index 7e71a1397..76da7194d 100644 --- a/src/modules/module-sendspin-recv.c +++ b/src/modules/module-sendspin-recv.c @@ -61,6 +61,10 @@ * - `sendspin.path`: the path on the sendspin server, default "/sendspin" * - `sendspin.client-id`: the client id, default "pipewire-$(hostname)" * - `sendspin.client-name`: the client name, default "$(hostname)" + * - `sendspin.autoconnect`: Use zeroconf to connect to an available server, default false. + * - `sendspin.announce`: Use zeroconf to announce the client, default true unless + * sendspin.autoconnect or sendspin.ip is given. + * - `sendspin.single-server`: Allow only a single server to connect, default true * - `node.always-process = `: true to receive even when not running * - `stream.props = {}`: properties to be passed to all the stream * @@ -94,6 +98,10 @@ * #sendspin.port = 8927 * #sendspin.path = "/sendspin" * #sendspin.client-id = "pipewire-test" + * #sendspin.client-name = "PipeWire Test" + * #sendspin.autoconnect = false + * #sendspin.announce = true + * #sendspin.single-server = true * #node.always-process = false * #audio.position = [ FL FR ] * stream.props = { @@ -120,6 +128,8 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define DEFAULT_SERVER_PORT PW_SENDSPIN_DEFAULT_SERVER_PORT #define DEFAULT_SENDSPIN_PATH PW_SENDSPIN_DEFAULT_PATH +#define DEFAULT_CREATE_RULES \ + "[ { matches = [ { sendspin.ip = \"~.*\" } ] actions = { create-stream = { } } } ] " #define DEFAULT_POSITION "[ FL FR ]" @@ -136,10 +146,12 @@ static const struct spa_dict_item module_info[] = { { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, }; -struct stream { +struct client { struct impl *impl; struct spa_list link; + char *name; + struct pw_properties *props; struct pw_websocket_connection *conn; struct spa_hook conn_listener; @@ -188,30 +200,31 @@ struct impl { struct spa_hook zeroconf_listener; bool always_process; + bool single_server; struct pw_properties *stream_props; struct pw_websocket *websocket; struct spa_hook websocket_listener; - struct spa_list streams; + struct spa_list clients; }; static void on_stream_destroy(void *d) { - struct stream *stream = d; - spa_hook_remove(&stream->stream_listener); - stream->stream = NULL; + struct client *client = d; + spa_hook_remove(&client->stream_listener); + client->stream = NULL; } static void on_stream_state_changed(void *d, enum pw_stream_state old, enum pw_stream_state state, const char *error) { - struct stream *stream = d; + struct client *client = d; switch (state) { case PW_STREAM_STATE_ERROR: case PW_STREAM_STATE_UNCONNECTED: - pw_impl_module_schedule_destroy(stream->impl->module); + pw_impl_module_schedule_destroy(client->impl->module); break; case PW_STREAM_STATE_PAUSED: case PW_STREAM_STATE_STREAMING: @@ -223,7 +236,7 @@ static void on_stream_state_changed(void *d, enum pw_stream_state old, static void on_capture_stream_process(void *d) { - struct stream *stream = d; + struct client *client = d; struct pw_buffer *b; struct spa_buffer *buf; uint8_t *p; @@ -232,7 +245,7 @@ static void on_capture_stream_process(void *d) struct pw_time ts; double err, corr, target, current_time; - if ((b = pw_stream_dequeue_buffer(stream->stream)) == NULL) { + if ((b = pw_stream_dequeue_buffer(client->stream)) == NULL) { pw_log_debug("out of buffers: %m"); return; } @@ -241,41 +254,41 @@ static void on_capture_stream_process(void *d) if ((p = buf->datas[0].data) == NULL) return; - stride = stream->stride; + stride = client->stride; n_frames = buf->datas[0].maxsize / stride; if (b->requested) n_frames = SPA_MIN(b->requested, n_frames); n_bytes = n_frames * stride; - avail = spa_ringbuffer_get_read_index(&stream->ring, &index); + avail = spa_ringbuffer_get_read_index(&client->ring, &index); - if (stream->timeout_count > 4 && stream->timeout_count > 4) { - pw_stream_get_time_n(stream->stream, &ts, sizeof(ts)); + if (client->timeout_count > 4 && client->timeout_count > 4) { + pw_stream_get_time_n(client->stream, &ts, sizeof(ts)); /* index to server time */ - target = spa_regress_calc_y(&stream->regress_index, index); + target = spa_regress_calc_y(&client->regress_index, index); /* server time to client time */ - target = spa_regress_calc_y(&stream->regress_time, target); + target = spa_regress_calc_y(&client->regress_time, target); current_time = ts.now / 1000.0; - current_time -= (ts.buffered * 1000000.0 / stream->info.info.raw.rate) + + current_time -= (ts.buffered * 1000000.0 / client->info.info.raw.rate) + ((ts.delay) * 1000000.0 * ts.rate.num / ts.rate.denom); err = target - (double)current_time; - if (stream->resync) { + if (client->resync) { if (target < current_time) { - target = spa_regress_calc_x(&stream->regress_time, current_time); - index = (uint32_t)spa_regress_calc_x(&stream->regress_index, target); + target = spa_regress_calc_x(&client->regress_time, current_time); + index = (uint32_t)spa_regress_calc_x(&client->regress_index, target); index = SPA_ROUND_DOWN(index, stride); pw_log_info("resync %u %f %f %f", index, target, current_time, target - current_time); - spa_ringbuffer_read_update(&stream->ring, index); - avail = spa_ringbuffer_get_read_index(&stream->ring, &index); + spa_ringbuffer_read_update(&client->ring, index); + avail = spa_ringbuffer_get_read_index(&client->ring, &index); err = 0.0; - stream->resync = false; + client->resync = false; } else { avail = 0; } @@ -285,27 +298,27 @@ static void on_capture_stream_process(void *d) } if (avail < (int32_t)n_bytes) { avail = 0; - stream->resync = true; + client->resync = true; } - else if (avail > (int32_t)stream->buffer_size) { - index += avail - stream->buffer_size; - avail = stream->buffer_size; - stream->resync = true; + else if (avail > (int32_t)client->buffer_size) { + index += avail - client->buffer_size; + avail = client->buffer_size; + client->resync = true; } if (avail > 0) { n_bytes = SPA_MIN(n_bytes, (uint32_t)avail); - corr = spa_dll_update(&stream->dll, SPA_CLAMPD(err, -1000, 1000)); + corr = spa_dll_update(&client->dll, SPA_CLAMPD(err, -1000, 1000)); pw_log_trace("%u %f %f %f %f", index, current_time, target, err, corr); - pw_stream_set_rate(stream->stream, 1.0 / corr); + pw_stream_set_rate(client->stream, 1.0 / corr); - spa_ringbuffer_read_data(&stream->ring, - stream->buffer, stream->buffer_size, - index % stream->buffer_size, + spa_ringbuffer_read_data(&client->ring, + client->buffer, client->buffer_size, + index % client->buffer_size, p, n_bytes); - spa_ringbuffer_read_update(&stream->ring, index + n_bytes); + spa_ringbuffer_read_update(&client->ring, index + n_bytes); } else { memset(p, 0, n_bytes); } @@ -314,7 +327,7 @@ static void on_capture_stream_process(void *d) buf->datas[0].chunk->stride = stride; buf->datas[0].chunk->size = n_bytes; - pw_stream_queue_buffer(stream->stream, b); + pw_stream_queue_buffer(client->stream, b); } static const struct pw_stream_events capture_stream_events = { @@ -324,19 +337,19 @@ static const struct pw_stream_events capture_stream_events = { .process = on_capture_stream_process }; -static int create_stream(struct stream *stream) +static int create_stream(struct client *client) { - struct impl *impl = stream->impl; + struct impl *impl = client->impl; int res; uint32_t n_params; const struct spa_pod *params[1]; uint8_t buffer[1024]; struct spa_pod_builder b; const char *server_id, *ip, *port, *server_name; - struct pw_properties *props = pw_properties_copy(impl->stream_props); + struct pw_properties *props = pw_properties_copy(client->props); - ip = pw_properties_get(impl->props, "sendspin.ip"); - port = pw_properties_get(impl->props, "sendspin.port"); + ip = pw_properties_get(props, "sendspin.ip"); + port = pw_properties_get(props, "sendspin.port"); server_id = pw_properties_get(props, "sendspin.server-id"); server_name = pw_properties_get(props, "sendspin.server-name"); @@ -347,24 +360,24 @@ static int create_stream(struct stream *stream) if (pw_properties_get(props, PW_KEY_MEDIA_NAME) == NULL) pw_properties_setf(props, PW_KEY_MEDIA_NAME, "Sendspin from %s", server_name); - stream->stream = pw_stream_new(impl->core, "sendspin receiver", props); - if (stream->stream == NULL) + client->stream = pw_stream_new(impl->core, "sendspin receiver", props); + if (client->stream == NULL) return -errno; - spa_ringbuffer_init(&stream->ring); - stream->buffer_size = 1024 * 1024; - stream->buffer = calloc(1, stream->buffer_size * stream->stride); + spa_ringbuffer_init(&client->ring); + client->buffer_size = 1024 * 1024; + client->buffer = calloc(1, client->buffer_size * client->stride); - pw_stream_add_listener(stream->stream, - &stream->stream_listener, - &capture_stream_events, stream); + pw_stream_add_listener(client->stream, + &client->stream_listener, + &capture_stream_events, client); n_params = 0; spa_pod_builder_init(&b, buffer, sizeof(buffer)); params[n_params++] = spa_format_audio_build(&b, - SPA_PARAM_EnumFormat, &stream->info); + SPA_PARAM_EnumFormat, &client->info); - if ((res = pw_stream_connect(stream->stream, + if ((res = pw_stream_connect(client->stream, PW_DIRECTION_OUTPUT, PW_ID_ANY, PW_STREAM_FLAG_AUTOCONNECT | @@ -385,7 +398,7 @@ static void add_format(struct spa_json_builder *b, const char *codec, int channe spa_json_builder_object_int(b, "bit_depth", depth); spa_json_builder_pop(b, "}"); } -static void add_playerv1_support(struct stream *stream, struct spa_json_builder *b) +static void add_playerv1_support(struct client *client, struct spa_json_builder *b) { spa_json_builder_object_push(b, "player@v1_support", "{"); spa_json_builder_object_push(b, "supported_formats", "["); @@ -399,9 +412,9 @@ static void add_playerv1_support(struct stream *stream, struct spa_json_builder spa_json_builder_pop(b, "]"); spa_json_builder_pop(b, "}"); } -static int send_client_hello(struct stream *stream) +static int send_client_hello(struct client *client) { - struct impl *impl = stream->impl; + struct impl *impl = client->impl; struct spa_json_builder b; int res; char *mem; @@ -422,18 +435,18 @@ static int send_client_hello(struct stream *stream) spa_json_builder_object_string(&b, "product_name", "Linux"); /* Use os-release */ spa_json_builder_object_stringf(&b, "software_version", "PipeWire %s", pw_get_library_version()); spa_json_builder_pop(&b, "}"); - add_playerv1_support(stream, &b); + add_playerv1_support(client, &b); spa_json_builder_pop(&b, "}"); spa_json_builder_pop(&b, "}"); spa_json_builder_close(&b); - res = pw_websocket_connection_send_text(stream->conn, mem, size); + res = pw_websocket_connection_send_text(client->conn, mem, size); free(mem); return res; } -static int send_client_state(struct stream *stream) +static int send_client_state(struct client *client) { struct spa_json_builder b; int res; @@ -453,12 +466,12 @@ static int send_client_state(struct stream *stream) spa_json_builder_pop(&b, "}"); spa_json_builder_close(&b); - res = pw_websocket_connection_send_text(stream->conn, mem, size); + res = pw_websocket_connection_send_text(client->conn, mem, size); free(mem); return res; } -static uint64_t get_time_us(struct stream *stream) +static uint64_t get_time_us(struct client *client) { struct timespec now; if (clock_gettime(CLOCK_MONOTONIC, &now) < 0) @@ -466,7 +479,7 @@ static uint64_t get_time_us(struct stream *stream) return SPA_TIMESPEC_TO_USEC(&now); } -static int send_client_time(struct stream *stream) +static int send_client_time(struct client *client) { struct spa_json_builder b; int res; @@ -474,7 +487,7 @@ static int send_client_time(struct stream *stream) char *mem; size_t size; - now = get_time_us(stream); + now = get_time_us(client); spa_json_builder_memstream(&b, &mem, &size, 0); spa_json_builder_array_push(&b, "{"); @@ -485,24 +498,24 @@ static int send_client_time(struct stream *stream) spa_json_builder_pop(&b, "}"); spa_json_builder_close(&b); - res = pw_websocket_connection_send_text(stream->conn, mem, size); + res = pw_websocket_connection_send_text(client->conn, mem, size); free(mem); return res; } -static void do_stream_timer(void *data) +static void do_client_timer(void *data) { - struct stream *stream = data; - send_client_time(stream); + struct client *client = data; + send_client_time(client); } #if 0 -static int send_client_command(struct stream *stream) +static int send_client_command(struct client *client) { return 0; } #endif -static int send_client_goodbye(struct stream *stream, const char *reason) +static int send_client_goodbye(struct client *client, const char *reason) { struct spa_json_builder b; int res; @@ -518,38 +531,38 @@ static int send_client_goodbye(struct stream *stream, const char *reason) spa_json_builder_pop(&b, "}"); spa_json_builder_close(&b); - res = pw_websocket_connection_send_text(stream->conn, mem, size); - pw_websocket_connection_disconnect(stream->conn, true); + res = pw_websocket_connection_send_text(client->conn, mem, size); + pw_websocket_connection_disconnect(client->conn, true); free(mem); return res; } #if 0 -static int send_stream_request_format(struct stream *stream) +static int send_stream_request_format(struct client *client) { return 0; } #endif -static int handle_server_hello(struct stream *stream, struct spa_json *payload) +static int handle_server_hello(struct client *client, struct spa_json *payload) { - struct impl *impl = stream->impl; + struct impl *impl = client->impl; struct spa_json it[1]; char key[256], *t; const char *v; int l, version = 0; - struct stream *s, *st; + struct client *c, *ct; while ((l = spa_json_object_next(payload, key, sizeof(key), &v)) > 0) { if (spa_streq(key, "server_id")) { t = alloca(l+1); spa_json_parse_stringn(v, l, t, l+1); - pw_properties_set(impl->stream_props, "sendspin.server-id", t); + pw_properties_set(client->props, "sendspin.server-id", t); } else if (spa_streq(key, "name")) { t = alloca(l+1); spa_json_parse_stringn(v, l, t, l+1); - pw_properties_set(impl->stream_props, "sendspin.server-name", t); + pw_properties_set(client->props, "sendspin.server-name", t); } else if (spa_streq(key, "version")) { if (spa_json_parse_int(v, l, &version) <= 0) @@ -565,9 +578,9 @@ static int handle_server_hello(struct stream *stream, struct spa_json *payload) spa_json_parse_stringn(v, l, t, l+1); if (spa_streq(t, "player@v1")) - stream->active_roles |= ROLE_PLAYER; + client->active_roles |= ROLE_PLAYER; else if (spa_streq(t, "metadata@v1")) - stream->active_roles |= ROLE_METADATA; + client->active_roles |= ROLE_METADATA; } } else if (spa_streq(key, "connection_reason")) { @@ -575,35 +588,37 @@ static int handle_server_hello(struct stream *stream, struct spa_json *payload) spa_json_parse_stringn(v, l, t, l+1); if (spa_streq(t, "discovery")) - stream->connection_reason = REASON_DISCOVERY; + client->connection_reason = REASON_DISCOVERY; else if (spa_streq(t, "playback")) - stream->connection_reason = REASON_PLAYBACK; + client->connection_reason = REASON_PLAYBACK; - pw_properties_set(impl->stream_props, "sendspin.connection-reason", t); + pw_properties_set(client->props, "sendspin.connection-reason", t); } } if (version != 1) return -ENOTSUP; - if (stream->connection_reason == REASON_PLAYBACK) { - /* keep this server, destroy others */ - spa_list_for_each_safe(s, st, &impl->streams, link) { - if (s == stream) - continue; - send_client_goodbye(s, "another_server"); - } - } else { - /* keep other servers, destroy this one */ - spa_list_for_each_safe(s, st, &impl->streams, link) { - if (s == stream) - continue; - return send_client_goodbye(stream, "another_server"); + if (impl->single_server) { + if (client->connection_reason == REASON_PLAYBACK) { + /* keep this server, destroy others */ + spa_list_for_each_safe(c, ct, &impl->clients, link) { + if (c == client) + continue; + send_client_goodbye(c, "another_server"); + } + } else { + /* keep other servers, destroy this one */ + spa_list_for_each_safe(c, ct, &impl->clients, link) { + if (c == client) + continue; + return send_client_goodbye(client, "another_server"); + } } } - return send_client_state(stream); + return send_client_state(client); } -static int handle_server_state(struct stream *stream, struct spa_json *payload) +static int handle_server_state(struct client *client, struct spa_json *payload) { return 0; } @@ -623,15 +638,15 @@ static int parse_uint64(const char *val, int len, uint64_t *result) return len > 0 && end == buf + len; } -static int handle_server_time(struct stream *stream, struct spa_json *payload) +static int handle_server_time(struct client *client, struct spa_json *payload) { - struct impl *impl = stream->impl; + struct impl *impl = client->impl; char key[256]; const char *v; int l; uint64_t t1 = 0, t2 = 0, t3 = 0, t4 = 0, timeout; - t4 = get_time_us(stream); + t4 = get_time_us(client); while ((l = spa_json_object_next(payload, key, sizeof(key), &v)) > 0) { if (spa_streq(key, "client_transmitted")) { @@ -648,38 +663,38 @@ static int handle_server_time(struct stream *stream, struct spa_json *payload) } } - spa_regress_update(&stream->regress_time, (t2+t3)/2, (t1+t4)/2); + spa_regress_update(&client->regress_time, (t2+t3)/2, (t1+t4)/2); - if (stream->timeout_count < 4) + if (client->timeout_count < 4) timeout = 200 * SPA_MSEC_PER_SEC; - else if (stream->timeout_count < 10) + else if (client->timeout_count < 10) timeout = SPA_NSEC_PER_SEC; - else if (stream->timeout_count < 20) + else if (client->timeout_count < 20) timeout = 2 * SPA_NSEC_PER_SEC; else timeout = 5 * SPA_NSEC_PER_SEC; - stream->timeout_count++; - pw_timer_queue_add(impl->timer_queue, &stream->timer, - &stream->timer.timeout, timeout, - do_stream_timer, stream); + client->timeout_count++; + pw_timer_queue_add(impl->timer_queue, &client->timer, + &client->timer.timeout, timeout, + do_client_timer, client); return 0; } -static int handle_server_command(struct stream *stream, struct spa_json *payload) +static int handle_server_command(struct client *client, struct spa_json *payload) { return 0; } /* {"codec":"pcm","sample_rate":44100,"channels":2,"bit_depth":16} */ -static int parse_player(struct stream *stream, struct spa_json *player) +static int parse_player(struct client *client, struct spa_json *player) { char key[256], codec[64] = ""; const char *v; int l, sample_rate = 0, channels = 0, bit_depth = 0; - spa_zero(stream->info); - stream->info.media_type = SPA_MEDIA_TYPE_audio; + spa_zero(client->info); + client->info.media_type = SPA_MEDIA_TYPE_audio; while ((l = spa_json_object_next(player, key, sizeof(key), &v)) > 0) { if (spa_streq(key, "codec")) { if (spa_json_parse_stringn(v, l, codec, sizeof(codec)) <= 0) @@ -704,44 +719,44 @@ static int parse_player(struct stream *stream, struct spa_json *player) return -EINVAL; if (spa_streq(codec, "pcm")) { - stream->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; - stream->info.info.raw.rate = sample_rate; - stream->info.info.raw.channels = channels; + client->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; + client->info.info.raw.rate = sample_rate; + client->info.info.raw.channels = channels; switch (bit_depth) { case 16: - stream->info.info.raw.format = SPA_AUDIO_FORMAT_S16_LE; - stream->stride = 2 * channels; + client->info.info.raw.format = SPA_AUDIO_FORMAT_S16_LE; + client->stride = 2 * channels; break; case 24: - stream->info.info.raw.format = SPA_AUDIO_FORMAT_S24_LE; - stream->stride = 3 * channels; + client->info.info.raw.format = SPA_AUDIO_FORMAT_S24_LE; + client->stride = 3 * channels; break; default: return -EINVAL; } } else if (spa_streq(codec, "opus")) { - stream->info.media_subtype = SPA_MEDIA_SUBTYPE_opus; - stream->info.info.opus.rate = sample_rate; - stream->info.info.opus.channels = channels; + client->info.media_subtype = SPA_MEDIA_SUBTYPE_opus; + client->info.info.opus.rate = sample_rate; + client->info.info.opus.channels = channels; } else if (spa_streq(codec, "flac")) { - stream->info.media_subtype = SPA_MEDIA_SUBTYPE_flac; - stream->info.info.flac.rate = sample_rate; - stream->info.info.flac.channels = channels; + client->info.media_subtype = SPA_MEDIA_SUBTYPE_flac; + client->info.info.flac.rate = sample_rate; + client->info.info.flac.channels = channels; } else return -EINVAL; - spa_dll_set_bw(&stream->dll, SPA_DLL_BW_MIN, 1000, sample_rate); + spa_dll_set_bw(&client->dll, SPA_DLL_BW_MIN, 1000, sample_rate); return 0; } /* {"player":{}} */ -static int handle_stream_start(struct stream *stream, struct spa_json *payload) +static int handle_stream_start(struct client *client, struct spa_json *payload) { - struct impl *impl = stream->impl; + struct impl *impl = client->impl; struct spa_json it[1]; char key[256]; const char *v; @@ -752,50 +767,50 @@ static int handle_stream_start(struct stream *stream, struct spa_json *payload) if (!spa_json_is_object(v, l)) return -EPROTO; spa_json_enter(payload, &it[0]); - parse_player(stream, &it[0]); + parse_player(client, &it[0]); } } - if (stream->stream == NULL) { - create_stream(stream); + if (client->stream == NULL) { + create_stream(client); - pw_timer_queue_cancel(&stream->timer); - pw_timer_queue_add(impl->timer_queue, &stream->timer, - NULL, 0, do_stream_timer, stream); + pw_timer_queue_cancel(&client->timer); + pw_timer_queue_add(impl->timer_queue, &client->timer, + NULL, 0, do_client_timer, client); } else { } return 0; } -static void stream_clear(struct stream *stream) +static void stream_clear(struct client *client) { - spa_ringbuffer_init(&stream->ring); - memset(stream->buffer, 0, stream->buffer_size); + spa_ringbuffer_init(&client->ring); + memset(client->buffer, 0, client->buffer_size); } -static int handle_stream_clear(struct stream *stream, struct spa_json *payload) +static int handle_stream_clear(struct client *client, struct spa_json *payload) { - stream_clear(stream); + stream_clear(client); return 0; } -static int handle_stream_end(struct stream *stream, struct spa_json *payload) +static int handle_stream_end(struct client *client, struct spa_json *payload) { - if (stream->stream != NULL) { - pw_stream_destroy(stream->stream); - stream->stream = NULL; - stream_clear(stream); + if (client->stream != NULL) { + pw_stream_destroy(client->stream); + client->stream = NULL; + stream_clear(client); } return 0; } -static int handle_group_update(struct stream *stream, struct spa_json *payload) +static int handle_group_update(struct client *client, struct spa_json *payload) { return 0; } /* { "type":... "payload":{...} } */ -static int do_parse_text(struct stream *stream, const char *content, int size) +static int do_parse_text(struct client *client, const char *content, int size) { struct spa_json it[2], *payload = NULL; char key[256], type[256] = ""; @@ -821,35 +836,35 @@ static int do_parse_text(struct stream *stream, const char *content, int size) } } if (spa_streq(type, "server/hello")) - res = handle_server_hello(stream, payload); + res = handle_server_hello(client, payload); else if (spa_streq(type, "server/state")) - res = handle_server_state(stream, payload); + res = handle_server_state(client, payload); else if (spa_streq(type, "server/time")) - res = handle_server_time(stream, payload); + res = handle_server_time(client, payload); else if (spa_streq(type, "server/command")) - res = handle_server_command(stream, payload); + res = handle_server_command(client, payload); else if (spa_streq(type, "stream/start")) - res = handle_stream_start(stream, payload); + res = handle_stream_start(client, payload); else if (spa_streq(type, "stream/end")) - res = handle_stream_end(stream, payload); + res = handle_stream_end(client, payload); else if (spa_streq(type, "stream/clear")) - res = handle_stream_clear(stream, payload); + res = handle_stream_clear(client, payload); else if (spa_streq(type, "group/update")) - res = handle_group_update(stream, payload); + res = handle_group_update(client, payload); else res = 0; return res; } -static int do_handle_binary(struct stream *stream, const uint8_t *payload, int size) +static int do_handle_binary(struct client *client, const uint8_t *payload, int size) { - struct impl *impl = stream->impl; + struct impl *impl = client->impl; int32_t filled; uint32_t index, length = size - 9; uint64_t timestamp; - if (payload[0] != 4 || stream->stream == NULL) + if (payload[0] != 4 || client->stream == NULL) return 0; timestamp = ((uint64_t)payload[1]) << 56; @@ -861,24 +876,24 @@ static int do_handle_binary(struct stream *stream, const uint8_t *payload, int s timestamp |= ((uint64_t)payload[7]) << 8; timestamp |= ((uint64_t)payload[8]); - filled = spa_ringbuffer_get_write_index(&stream->ring, &index); + filled = spa_ringbuffer_get_write_index(&client->ring, &index); if (filled < 0) { pw_log_warn("%p: underrun write:%u filled:%d", - stream, index, filled); - } else if (filled + length > stream->buffer_size) { + client, index, filled); + } else if (filled + length > client->buffer_size) { pw_log_debug("%p: overrun write:%u filled:%d", - stream, index, filled); + client, index, filled); } - spa_ringbuffer_write_data(&stream->ring, - stream->buffer, stream->buffer_size, - index % stream->buffer_size, + spa_ringbuffer_write_data(&client->ring, + client->buffer, client->buffer_size, + index % client->buffer_size, &payload[9], length); - spa_ringbuffer_write_update(&stream->ring, index + length); + spa_ringbuffer_write_update(&client->ring, index + length); pw_loop_lock(impl->data_loop); - spa_regress_update(&stream->regress_index, index, timestamp); + spa_regress_update(&client->regress_index, index, timestamp); pw_loop_unlock(impl->data_loop); return 0; @@ -886,45 +901,53 @@ static int do_handle_binary(struct stream *stream, const uint8_t *payload, int s static void on_connection_message(void *data, int opcode, void *payload, size_t size) { - struct stream *stream = data; + struct client *client = data; if (opcode == PW_WEBSOCKET_OPCODE_TEXT) { - do_parse_text(stream, payload, size); + do_parse_text(client, payload, size); } else if (opcode == PW_WEBSOCKET_OPCODE_BINARY) { - do_handle_binary(stream, payload, size); + do_handle_binary(client, payload, size); } else { pw_log_warn("%02x unknown %08x", opcode, (int)size); } } -static void stream_destroy(struct stream *stream) +static void client_free(struct client *client) { - handle_stream_end(stream, NULL); - if (stream->conn) { - spa_hook_remove(&stream->conn_listener); - pw_websocket_connection_destroy(stream->conn); + struct impl *impl = client->impl; + + spa_list_remove(&client->link); + + handle_stream_end(client, NULL); + if (client->conn) { + spa_hook_remove(&client->conn_listener); + pw_websocket_connection_destroy(client->conn); + } else { + pw_websocket_cancel(impl->websocket, client); } - pw_timer_queue_cancel(&stream->timer); - spa_list_remove(&stream->link); - free(stream->buffer); - free(stream); + pw_timer_queue_cancel(&client->timer); + + pw_properties_free(client->props); + free(client->buffer); + free(client->name); + free(client); } static void on_connection_destroy(void *data) { - struct stream *stream = data; - stream->conn = NULL; - pw_log_info("connection %p destroy", stream); + struct client *client = data; + client->conn = NULL; + pw_log_info("connection %p destroy", client); } static void on_connection_error(void *data, int res, const char *reason) { - struct stream *stream = data; - pw_log_error("connection %p error %d %s", stream, res, reason); + struct client *client = data; + pw_log_error("connection %p error %d %s", client, res, reason); } static void on_connection_disconnected(void *data) { - struct stream *stream = data; - stream_destroy(stream); + struct client *client = data; + client_free(client); } static const struct pw_websocket_connection_events websocket_connection_events = { @@ -935,38 +958,150 @@ static const struct pw_websocket_connection_events websocket_connection_events = .message = on_connection_message, }; -static struct stream *stream_new(struct impl *impl, struct pw_websocket_connection *conn) +static struct client *client_new(struct impl *impl, const char *name, struct pw_properties *props) { - struct stream *stream; + struct client *client; - stream = calloc(1, sizeof(*stream)); - if (stream == NULL) - return NULL; + client = calloc(1, sizeof(*client)); + if (client == NULL) + goto error; - stream->impl = impl; - spa_list_append(&impl->streams, &stream->link); + client->impl = impl; + spa_list_append(&impl->clients, &client->link); - stream->conn = conn; - pw_websocket_connection_add_listener(stream->conn, &stream->conn_listener, - &websocket_connection_events, stream); + client->name = name ? strdup(name) : NULL; + client->props = props; + spa_regress_init(&client->regress_index, 5); + spa_regress_init(&client->regress_time, 5); - spa_regress_init(&stream->regress_index, 5); - spa_regress_init(&stream->regress_time, 5); + spa_dll_init(&client->dll); + client->resync = true; - spa_dll_init(&stream->dll); - stream->resync = true; + return client; +error: + pw_properties_free(props); + return NULL; +} - return stream; +static int client_connect(struct client *c) +{ + struct impl *impl = c->impl; + const char *addr, *port, *path; + addr = pw_properties_get(c->props, "sendspin.ip"); + port = pw_properties_get(c->props, "sendspin.port"); + path = pw_properties_get(c->props, "sendspin.path"); + return pw_websocket_connect(impl->websocket, c, addr, port, path); +} + +static void client_connected(struct client *c, struct pw_websocket_connection *conn) +{ + if (c->conn) { + spa_hook_remove(&c->conn_listener); + pw_websocket_connection_destroy(c->conn); + } + c->conn = conn; + if (conn) + pw_websocket_connection_add_listener(c->conn, &c->conn_listener, + &websocket_connection_events, c); +} + +static struct client *client_find(struct impl *impl, const char *name) +{ + struct client *c; + spa_list_for_each(c, &impl->clients, link) { + if (spa_streq(c->name, name)) + return c; + } + return NULL; +} + +struct match_info { + struct impl *impl; + const char *name; + struct pw_properties *props; + struct pw_websocket_connection *conn; + bool matched; +}; + +static int rule_matched(void *data, const char *location, const char *action, + const char *str, size_t len) +{ + struct match_info *i = data; + struct impl *impl = i->impl; + int res = 0; + + i->matched = true; + if (spa_streq(action, "create-stream")) { + struct client *c; + + pw_properties_update_string(i->props, str, len); + if ((c = client_new(impl, i->name, spa_steal_ptr(i->props))) == NULL) + return -errno; + if (i->conn) + client_connected(c, i->conn); + else + client_connect(c); + } + return res; +} + +static int match_client(struct impl *impl, const char *name, struct pw_properties *props, + struct pw_websocket_connection *conn) +{ + const char *str; + struct match_info minfo = { + .impl = impl, + .name = name, + .props = props, + .conn = conn, + }; + + if ((str = pw_properties_get(impl->props, "stream.rules")) == NULL) + str = DEFAULT_CREATE_RULES; + + pw_conf_match_rules(str, strlen(str), NAME, &props->dict, + rule_matched, &minfo); + + if (!minfo.matched) { + pw_log_info("unmatched client found %s", str); + if (conn) + pw_websocket_connection_destroy(conn); + pw_properties_free(props); + } + return minfo.matched; } static void on_websocket_connected(void *data, void *user, struct pw_websocket_connection *conn, const char *path) { struct impl *impl = data; - struct stream *stream; + struct client *c = user; + pw_log_info("connected to %s", path); - stream = stream_new(impl, conn); - send_client_hello(stream); + if (c == NULL) { + struct sockaddr_storage addr; + char ip[128]; + uint16_t port = 0; + bool ipv4; + struct pw_properties *props; + + pw_websocket_connection_address(conn, + (struct sockaddr*)&addr, sizeof(addr)); + + props = pw_properties_copy(impl->stream_props); + if (pw_net_get_ip(&addr, ip, sizeof(ip), &ipv4, &port) >= 0) { + pw_properties_set(props, "sendspin.ip", ip); + pw_properties_setf(props, "sendspin.port", "%u", port); + } + pw_properties_set(props, "sendspin.path", path); + + if ((c = client_new(impl, "", props)) == NULL) { + pw_log_error("can't create new client: %m"); + return; + } + } + client_connected(c, conn); + send_client_hello(c); } static const struct pw_websocket_events websocket_events = { @@ -976,10 +1111,45 @@ static const struct pw_websocket_events websocket_events = { static void on_zeroconf_added(void *data, void *user, const struct spa_dict *info) { + struct impl *impl = data; + const char *name, *addr, *port, *path; + struct client *c; + struct pw_properties *props; + + name = spa_dict_lookup(info, "zeroconf.hostname"); + + if (impl->single_server && !spa_list_is_empty(&impl->clients)) + return; + + if ((c = client_find(impl, name)) != NULL) + return; + + props = pw_properties_copy(impl->stream_props); + pw_properties_update(props, info); + + addr = spa_dict_lookup(info, "zeroconf.address"); + port = spa_dict_lookup(info, "zeroconf.port"); + path = spa_dict_lookup(info, "path"); + + pw_properties_set(props, "sendspin.ip", addr); + pw_properties_set(props, "sendspin.port", port); + pw_properties_set(props, "sendspin.path", path); + + match_client(impl, name, props, NULL); } static void on_zeroconf_removed(void *data, void *user, const struct spa_dict *info) { + struct impl *impl = data; + const char *name; + struct client *c; + + name = spa_dict_lookup(info, "zeroconf.hostname"); + + if ((c = client_find(impl, name)) == NULL) + return; + + client_free(c); } static const struct pw_zeroconf_events zeroconf_events = { @@ -1002,10 +1172,10 @@ static const struct pw_proxy_events core_proxy_events = { static void impl_destroy(struct impl *impl) { - struct stream *s; + struct client *c; - spa_list_consume(s, &impl->streams, link) - stream_destroy(s); + spa_list_consume(c, &impl->clients, link) + client_free(c); if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); @@ -1064,6 +1234,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) const char *str, *hostname, *port, *path; struct pw_properties *props, *stream_props; int res = 0; + bool autoconnect, announce; PW_LOG_TOPIC_INIT(mod_topic); @@ -1087,7 +1258,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->main_loop = pw_context_get_main_loop(context); impl->data_loop = pw_context_acquire_loop(context, &props->dict); impl->timer_queue = pw_context_get_timer_queue(context); - spa_list_init(&impl->streams); + spa_list_init(&impl->clients); pw_properties_set(props, PW_KEY_NODE_LOOP_NAME, impl->data_loop->name); @@ -1109,6 +1280,11 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->always_process = pw_properties_get_bool(stream_props, PW_KEY_NODE_ALWAYS_PROCESS, true); + autoconnect = pw_properties_get_bool(props, "sendspin.autoconnect", false); + announce = pw_properties_get_bool(props, "sendspin.announce", true); + impl->single_server = pw_properties_get_bool(props, + "sendspin.single-server", true); + if ((str = pw_properties_get(props, "sendspin.client-name")) == NULL) pw_properties_set(props, "sendspin.client-name", pw_get_host_name()); if ((str = pw_properties_get(props, "sendspin.client-id")) == NULL) @@ -1147,15 +1323,10 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) } hostname = pw_properties_get(props, "sendspin.ip"); - if (hostname != NULL) { - port = pw_properties_get(props, "sendspin.port"); - if (port == NULL) - port = SPA_STRINGIFY(DEFAULT_SERVER_PORT); - if ((path = pw_properties_get(props, "sendspin.path")) == NULL) - path = DEFAULT_SENDSPIN_PATH; - - pw_websocket_connect(impl->websocket, NULL, hostname, port, path); - } else { + /* a client should either connect itself or advertize itself and listen + * for connections, not both */ + if (!autoconnect && hostname == NULL){ + /* listen for server connection */ if ((hostname = pw_properties_get(props, "source.ip")) == NULL) hostname = DEFAULT_SOURCE_IP; if ((port = pw_properties_get(props, "source.port")) == NULL) @@ -1165,7 +1336,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) pw_websocket_listen(impl->websocket, NULL, hostname, port, path); - if (impl->zeroconf) { + if (impl->zeroconf && announce) { + /* optionally announce ourselves */ str = pw_properties_get(props, "sendspin.client-id"); pw_zeroconf_set_announce(impl->zeroconf, NULL, &SPA_DICT_ITEMS( @@ -1175,6 +1347,33 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) SPA_DICT_ITEM("path", path))); } } + else { + if (hostname != NULL) { + struct client *c; + struct pw_properties *p; + + /* connect to hardcoded server */ + port = pw_properties_get(props, "sendspin.port"); + if (port == NULL) + port = SPA_STRINGIFY(DEFAULT_SERVER_PORT); + if ((path = pw_properties_get(props, "sendspin.path")) == NULL) + path = DEFAULT_SENDSPIN_PATH; + + p = pw_properties_copy(impl->stream_props); + pw_properties_set(p, "sendspin.ip", hostname); + pw_properties_set(p, "sendspin.port", port); + pw_properties_set(p, "sendspin.path", path); + + if ((c = client_new(impl, "", p)) != NULL) + client_connect(c); + } + /* connect to zeroconf server if we can */ + if (impl->zeroconf) { + pw_zeroconf_set_browse(impl->zeroconf, NULL, + &SPA_DICT_ITEMS( + SPA_DICT_ITEM("zeroconf.service", PW_SENDSPIN_SERVER_SERVICE))); + } + } pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl); diff --git a/src/modules/module-sendspin-send.c b/src/modules/module-sendspin-send.c index 9bc7195ce..3acf7ea4e 100644 --- a/src/modules/module-sendspin-send.c +++ b/src/modules/module-sendspin-send.c @@ -925,6 +925,8 @@ static void client_free(struct client *c) { struct impl *impl = c->impl; + spa_list_remove(&c->link); + handle_client_goodbye(c, NULL); if (c->conn) { spa_hook_remove(&c->conn_listener); @@ -933,7 +935,8 @@ static void client_free(struct client *c) pw_websocket_cancel(impl->websocket, c); } pw_timer_queue_cancel(&c->timer); - spa_list_remove(&c->link); + pw_properties_free(c->props); + free(c->name); free(c); }