/* Spa Varlink plugin */ /* SPDX-FileCopyrightText: Copyright © 2026 Arun Raghavan */ /* SPDX-License-Identifier: MIT */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.varlink"); #undef SPA_LOG_TOPIC_DEFAULT #define SPA_LOG_TOPIC_DEFAULT &log_topic struct impl { struct spa_handle handle; struct spa_varlink varlink; struct spa_log *log; struct spa_system *system; struct spa_loop_utils *loop_utils; struct spa_list clients; }; enum spa_varlink_transport { SPA_VARLINK_TRANSPORT_UNIX, }; #define BUFFER_SIZE 16384 struct client { struct spa_list link; struct spa_varlink_client client; struct impl *impl; /* The owning impl */ int fd; /* The socket fd */ struct spa_source *source; /* I/O source if we have a call_more() call */ bool reply_pending; /* A call was made for which a reply is pending */ bool more_pending; /* A call with more=true was made, and a reply is pending */ spa_varlink_reply_func_t cb; /* Callback for the next reply we receive */ void *cb_userdata; struct spa_hook_list listeners; size_t pos; size_t avail; char buf[BUFFER_SIZE]; }; static const char *varlink_parse_path(const char *path, enum spa_varlink_transport *transport) { if (spa_strstartswith(path, "unix:")) { *transport = SPA_VARLINK_TRANSPORT_UNIX; return &path[5]; } return NULL; } static void client_add_listener(void *object, struct spa_hook *listener, const struct spa_varlink_client_events *events, void *data) { struct client *this = SPA_CONTAINER_OF(object, struct client, client); spa_hook_list_append(&this->listeners, listener, events, data); } static char * build_call(const char *method, const char *params, bool oneway, bool more, size_t *len) { struct spa_json_builder b; char *json; spa_json_builder_memstream(&b, &json, len, 0); /* Top-level object */ spa_json_builder_object_push(&b, NULL, "{"); spa_json_builder_object_string(&b, "method", method); spa_json_builder_object_value(&b, true, "parameters", params); spa_json_builder_object_bool(&b, "oneway", oneway); spa_json_builder_object_bool(&b, "more", more); spa_json_builder_pop(&b, "}"); spa_json_builder_close(&b); return json; } static int client_call(void *object, const char *method, const char *params, bool oneway, bool more, spa_varlink_reply_func_t cb, void *userdata) { struct client *this = SPA_CONTAINER_OF(object, struct client, client); struct impl *impl = this->impl; char *buf; size_t len, pos = 0; int res; if (this->reply_pending) { res = -EINVAL; goto done; } buf = build_call(method, params, oneway, more, &len); spa_log_trace(impl->log, "sending message: %s", buf); /* We write the whole string including the NULL terminator */ do { res = spa_system_write(this->impl->system, this->fd, &buf[pos], len - pos + 1); pos += res; } while ((res > 0 && len >= pos) || res == -EAGAIN || res == -EWOULDBLOCK); free(buf); if (res < 0) { spa_log_error(impl->log, "Error writing message: %s", spa_strerror(res)); goto done; } if (!oneway) { this->reply_pending = true; this->cb = cb; this->cb_userdata = userdata; } if (more) this->more_pending = true; res = 0; done: return res; } static int client_call_sync(void *object, const char *method, const char *params, char **reply) { struct client *this = SPA_CONTAINER_OF(object, struct client, client); struct impl *impl = this->impl; char *buf; size_t len, pos = 0; int res = 0; if (this->reply_pending || this->more_pending || (this->pos != 0 && this->avail != 0)) { spa_log_error(impl->log, "Invalid state for sync call"); res = -EINVAL; } if (res < 0) return res; spa_loop_utils_update_io(impl->loop_utils, this->source, this->source->mask & ~SPA_IO_IN); fcntl(this->fd, F_SETFL, fcntl(this->fd, F_GETFL) & ~O_NONBLOCK); buf = build_call(method, params, false, false, &len); spa_log_trace(impl->log, "sending message: %s", buf); /* We write the whole string including the NULL terminator */ do { res = spa_system_write(this->impl->system, this->fd, &buf[pos], len - pos + 1); pos += res; } while (res > 0 && len >= pos); free(buf); if (res < 0) { spa_log_error(impl->log, "Error writing message: %s", spa_strerror(res)); goto done; } while (true) { size_t eom; res = spa_system_read(impl->system, this->fd, &this->buf[this->avail], BUFFER_SIZE - this->avail); if (res <= 0) { if (res == -EINTR) continue; if (res == -EAGAIN || res == -EWOULDBLOCK) continue; spa_log_error(impl->log, "Error reading from socket: %s", spa_strerror(res)); goto done; } spa_log_debug(impl->log, "Got %d bytes (avail: %zu)", res, this->avail); this->avail += res; for (eom = 0; eom < this->avail; eom++) { if (this->buf[eom] == '\0') break; } /* We need more data */ if (eom == this->avail) continue; if (eom != this->avail - 1) spa_log_warn(impl->log, "Received more than one reply"); spa_log_debug(impl->log, "Consuming %zu bytes", eom + 1); res = eom; *reply = strndup(this->buf, res + 1); this->pos = this->avail = 0; break; } done: fcntl(this->fd, F_SETFL, fcntl(this->fd, F_GETFL) | O_NONBLOCK); spa_loop_utils_update_io(impl->loop_utils, this->source, this->source->mask | SPA_IO_IN); return res; } static void client_disconnect(struct client *this) { if (this->source) { spa_loop_utils_destroy_source(this->impl->loop_utils, this->source); this->source = NULL; spa_system_close(this->impl->system, this->fd); spa_hook_list_call(&this->listeners, struct spa_varlink_client_events, disconnect, SPA_VERSION_VARLINK_CLIENT_EVENTS); } } static void do_destroy(struct client *client) { client_disconnect(client); spa_hook_list_call(&client->listeners, struct spa_varlink_client_events, destroy, SPA_VERSION_VARLINK_CLIENT_EVENTS); spa_hook_list_clean(&client->listeners); } static void client_destroy(void *object) { struct client *this = SPA_CONTAINER_OF(object, struct client, client); do_destroy(this); } static void process_message(struct client *this, const char *msg, size_t len) { struct impl *impl = this->impl; struct spa_json json; char key[64]; const char *val, *error = NULL, *params = NULL; int res; bool continues = false; spa_log_trace(impl->log, "got message: %.*s", (int)len, msg); spa_json_begin_object(&json, msg, len); while ((res = spa_json_object_next(&json, key, sizeof(key), &val)) > 0) { if (spa_streq(key, "error")) { len = spa_json_container_len(&json, val, res); error = val; break; } if (spa_streq(key, "continues")) { res = spa_json_get_bool(&json, &continues); if (res < 0) goto error; } if (spa_streq(key, "parameters")) { len = spa_json_container_len(&json, val, res); params = val; break; } } if (this->cb) this->cb(this->cb_userdata, params, error, len, continues); if (!this->more_pending || !continues) { /* We didn't ask for more, but server might send */ if (continues) spa_log_warn(impl->log, "Unexpected continues"); this->reply_pending = false; this->more_pending = false; this->cb = NULL; this->cb_userdata = NULL; } return; error: spa_log_error(impl->log, "Could not parse: %.*s", (int)len, msg); } static void client_on_reply(void *userdata, int fd, uint32_t mask) { struct client *this = (struct client *)userdata; struct impl *impl = this->impl; int res; if (mask & SPA_IO_HUP) { spa_log_info(impl->log, "Got hangup event"); client_disconnect(this); return; } if (mask & SPA_IO_ERR) { spa_log_info(impl->log, "Got error event"); client_disconnect(this); return; } spa_log_debug(impl->log, "Ready to read"); /* SPA_IO_IN */ while (this->pos <= this->avail) { size_t eom; res = spa_system_read(impl->system, fd, &this->buf[this->avail], BUFFER_SIZE - this->avail); if (res <= 0) { if (res == -EINTR) continue; if (res == 0 || res == -EAGAIN || res == -EWOULDBLOCK) return; spa_log_error(impl->log, "Error reading from socket: %s", spa_strerror(res)); client_disconnect(this); return; } spa_log_debug(impl->log, "Got %d bytes (avail: %zu)", res, this->avail); this->avail += res; for (eom = this->pos; eom < this->avail; eom++) { if (this->buf[eom] == '\0') break; } if (eom == this->avail) { if (eom == BUFFER_SIZE) { spa_log_error(impl->log, "Message larger than available buffer"); client_disconnect(this); } spa_log_debug(impl->log, "Need more data"); return; } process_message(this, &this->buf[this->pos], eom - this->pos); spa_log_debug(impl->log, "Consumed %zu bytes", eom + 1 - this->pos); /* Consume message so far and NUL terminator */ this->pos = eom + 1; if (this->pos == this->avail) { /* All data has been consumed, reset */ this->pos = this->avail = 0; break; } } } static const struct spa_varlink_client impl_client = { SPA_VERSION_VARLINK_CLIENT, .add_listener = client_add_listener, .call = client_call, .call_sync = client_call_sync, .destroy = client_destroy, }; static struct spa_varlink_client * impl_connect(void *object, const char *path) { struct impl *this = object; struct client *client; const char *socket_path; struct sockaddr_un sa; enum spa_varlink_transport transport; int res = 0; client = calloc(1, sizeof(struct client)); if (client == NULL) return NULL; socket_path = varlink_parse_path(path, &transport); /* We only support UNIX sockets for now */ if (socket_path == NULL || transport != SPA_VARLINK_TRANSPORT_UNIX) { spa_log_error(this->log, "Could not connect to socket path '%s'", path); goto error; } client->fd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); if (client->fd < 0) { spa_log_error(this->log, "Could not create socket %s", spa_strerror(res)); goto error; } sa.sun_family = AF_UNIX; res = snprintf(sa.sun_path, sizeof(sa.sun_path), "%s", socket_path); if (res < 0 || res > (int) sizeof(sa.sun_path)) { spa_log_error(this->log, "Socket path too long: %s", socket_path); goto error; } res = connect(client->fd, (struct sockaddr *)&sa, sizeof(sa)); if (res < 0) { spa_log_error(this->log, "Could not open socket %s: %s", socket_path, spa_strerror(res)); goto error; } client->client = impl_client; client->impl = this; client->pos = 0; client->avail = 0; client->reply_pending = false; client->more_pending = false; client->cb = NULL; client->cb_userdata = NULL; spa_hook_list_init(&client->listeners); client->source = spa_loop_utils_add_io(this->loop_utils, client->fd, SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP, false, client_on_reply, client); if (client->source == NULL) { spa_log_error(this->log, "Could not create source"); goto error; } spa_list_append(&this->clients, &client->link); spa_log_debug(this->log, "new client %p", client); return &client->client; error: free(client); errno = -res; return NULL; } static const struct spa_varlink_methods impl_varlink = { SPA_VERSION_VARLINK_METHODS, .connect = impl_connect, }; static int impl_get_interface(struct spa_handle *handle, const char *type, void **interface) { struct impl *this; spa_return_val_if_fail(handle != NULL, -EINVAL); spa_return_val_if_fail(interface != NULL, -EINVAL); this = (struct impl *) handle; if (spa_streq(type, SPA_TYPE_INTERFACE_Varlink)) *interface = &this->varlink; else return -ENOENT; return 0; } static int impl_clear(struct spa_handle *handle) { struct impl *this = (struct impl *) handle; struct client *client; spa_return_val_if_fail(handle != NULL, -EINVAL); spa_list_consume(client, &this->clients, link) do_destroy(client); return 0; } static size_t impl_get_size(const struct spa_handle_factory *factory, const struct spa_dict *params) { return sizeof(struct impl); } static int impl_init(const struct spa_handle_factory *factory, struct spa_handle *handle, const struct spa_dict *info, const struct spa_support *support, uint32_t n_support) { struct impl *this; spa_return_val_if_fail(factory != NULL, -EINVAL); spa_return_val_if_fail(handle != NULL, -EINVAL); handle->get_interface = impl_get_interface; handle->clear = impl_clear; this = (struct impl *)handle; spa_list_init(&this->clients); this->varlink.iface = SPA_INTERFACE_INIT( SPA_TYPE_INTERFACE_Varlink, SPA_VERSION_VARLINK, &impl_varlink, this); this->system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_System); this->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log); spa_log_topic_init(this->log, &log_topic); this->loop_utils = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_LoopUtils); if (this->loop_utils == NULL) { spa_log_error(this->log, "Loop utils is required for varlink support"); return -EINVAL; } spa_log_debug(this->log, "%p: varlink iface initialised", this); return 0; } static const struct spa_interface_info impl_interfaces[] = { {SPA_TYPE_INTERFACE_Varlink,}, }; static int impl_enum_interface_info(const struct spa_handle_factory *factory, const struct spa_interface_info **info, uint32_t *index) { spa_return_val_if_fail(factory != NULL, -EINVAL); spa_return_val_if_fail(info != NULL, -EINVAL); spa_return_val_if_fail(index != NULL, -EINVAL); switch (*index) { case 0: *info = &impl_interfaces[*index]; break; default: return 0; } (*index)++; return 0; } static const struct spa_handle_factory varlink_factory = { SPA_VERSION_HANDLE_FACTORY, SPA_NAME_SUPPORT_VARLINK, NULL, impl_get_size, impl_init, impl_enum_interface_info, }; SPA_LOG_TOPIC_ENUM_DEFINE_REGISTERED; SPA_EXPORT int spa_handle_factory_enum(const struct spa_handle_factory **factory, uint32_t *index) { spa_return_val_if_fail(factory != NULL, -EINVAL); spa_return_val_if_fail(index != NULL, -EINVAL); switch (*index) { case 0: *factory = &varlink_factory; break; default: return 0; } (*index)++; return 1; }