pipewire/spa/plugins/support/varlink.c
2026-03-09 16:52:27 -07:00

623 lines
14 KiB
C

/* Spa Varlink plugin */
/* SPDX-FileCopyrightText: Copyright © 2026 Arun Raghavan */
/* SPDX-License-Identifier: MIT */
#include <fcntl.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <spa/support/log.h>
#include <spa/support/loop.h>
#include <spa/support/plugin.h>
#include <spa/support/varlink.h>
#include <spa/utils/defs.h>
#include <spa/utils/hook.h>
#include <spa/utils/json-builder.h>
#include <spa/utils/list.h>
#include <spa/utils/names.h>
#include <spa/utils/result.h>
SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.varlink");
#undef SPA_LOG_TOPIC_DEFAULT
#define SPA_LOG_TOPIC_DEFAULT &log_topic
struct impl {
struct spa_handle handle;
struct spa_varlink varlink;
struct spa_log *log;
struct spa_system *system;
struct spa_loop_utils *loop_utils;
struct spa_list clients;
};
enum spa_varlink_transport {
SPA_VARLINK_TRANSPORT_UNIX,
};
#define BUFFER_SIZE 16384
struct client {
struct spa_list link;
struct spa_varlink_client client;
struct impl *impl; /* The owning impl */
int fd; /* The socket fd */
struct spa_source *source; /* I/O source if we have a call_more() call */
bool reply_pending; /* A call was made for which a reply is pending */
bool more_pending; /* A call with more=true was made, and a reply is pending */
spa_varlink_reply_func_t cb; /* Callback for the next reply we receive */
void *cb_userdata;
struct spa_hook_list listeners;
size_t pos;
size_t avail;
char buf[BUFFER_SIZE];
};
static const char *varlink_parse_path(const char *path,
enum spa_varlink_transport *transport)
{
if (spa_strstartswith(path, "unix:")) {
*transport = SPA_VARLINK_TRANSPORT_UNIX;
return &path[5];
}
return NULL;
}
static void
client_add_listener(void *object, struct spa_hook *listener,
const struct spa_varlink_client_events *events, void *data)
{
struct client *this = SPA_CONTAINER_OF(object, struct client, client);
spa_hook_list_append(&this->listeners, listener, events, data);
}
static char *
build_call(const char *method, const char *params, bool oneway, bool more,
size_t *len)
{
struct spa_json_builder b;
char *json;
spa_json_builder_memstream(&b, &json, len, 0);
/* Top-level object */
spa_json_builder_object_push(&b, NULL, "{");
spa_json_builder_object_string(&b, "method", method);
spa_json_builder_object_value(&b, true, "parameters", params);
spa_json_builder_object_bool(&b, "oneway", oneway);
spa_json_builder_object_bool(&b, "more", more);
spa_json_builder_pop(&b, "}");
spa_json_builder_close(&b);
return json;
}
static int
client_call(void *object, const char *method, const char *params,
bool oneway, bool more, spa_varlink_reply_func_t cb,
void *userdata)
{
struct client *this = SPA_CONTAINER_OF(object, struct client, client);
struct impl *impl = this->impl;
char *buf;
size_t len, pos = 0;
int res;
if (this->reply_pending) {
res = -EINVAL;
goto done;
}
buf = build_call(method, params, oneway, more, &len);
spa_log_trace(impl->log, "sending message: %s", buf);
/* We write the whole string including the NULL terminator */
do {
res = spa_system_write(this->impl->system, this->fd, &buf[pos], len - pos + 1);
pos += res;
} while ((res > 0 && len >= pos) || res == -EAGAIN || res == -EWOULDBLOCK);
free(buf);
if (res < 0) {
spa_log_error(impl->log, "Error writing message: %s",
spa_strerror(res));
goto done;
}
if (!oneway) {
this->reply_pending = true;
this->cb = cb;
this->cb_userdata = userdata;
}
if (more)
this->more_pending = true;
res = 0;
done:
return res;
}
static int
client_call_sync(void *object, const char *method, const char *params,
char **reply)
{
struct client *this = SPA_CONTAINER_OF(object, struct client, client);
struct impl *impl = this->impl;
char *buf;
size_t len, pos = 0;
int res = 0;
if (this->reply_pending || this->more_pending ||
(this->pos != 0 && this->avail != 0)) {
spa_log_error(impl->log, "Invalid state for sync call");
res = -EINVAL;
}
if (res < 0)
return res;
spa_loop_utils_update_io(impl->loop_utils, this->source,
this->source->mask & ~SPA_IO_IN);
fcntl(this->fd, F_SETFL, fcntl(this->fd, F_GETFL) & ~O_NONBLOCK);
buf = build_call(method, params, false, false, &len);
spa_log_trace(impl->log, "sending message: %s", buf);
/* We write the whole string including the NULL terminator */
do {
res = spa_system_write(this->impl->system, this->fd, &buf[pos], len - pos + 1);
pos += res;
} while (res > 0 && len >= pos);
free(buf);
if (res < 0) {
spa_log_error(impl->log, "Error writing message: %s",
spa_strerror(res));
goto done;
}
while (true) {
size_t eom;
res = spa_system_read(impl->system, this->fd, &this->buf[this->avail],
BUFFER_SIZE - this->avail);
if (res <= 0) {
if (res == -EINTR)
continue;
if (res == -EAGAIN || res == -EWOULDBLOCK)
continue;
spa_log_error(impl->log, "Error reading from socket: %s",
spa_strerror(res));
goto done;
}
spa_log_debug(impl->log, "Got %d bytes (avail: %zu)", res, this->avail);
this->avail += res;
for (eom = 0; eom < this->avail; eom++) {
if (this->buf[eom] == '\0')
break;
}
/* We need more data */
if (eom == this->avail)
continue;
if (eom != this->avail - 1)
spa_log_warn(impl->log, "Received more than one reply");
spa_log_debug(impl->log, "Consuming %zu bytes", eom + 1);
res = eom;
*reply = strndup(this->buf, res + 1);
this->pos = this->avail = 0;
break;
}
done:
fcntl(this->fd, F_SETFL, fcntl(this->fd, F_GETFL) | O_NONBLOCK);
spa_loop_utils_update_io(impl->loop_utils, this->source,
this->source->mask | SPA_IO_IN);
return res;
}
static void
client_disconnect(struct client *this)
{
if (this->source) {
spa_loop_utils_destroy_source(this->impl->loop_utils, this->source);
this->source = NULL;
spa_system_close(this->impl->system, this->fd);
spa_hook_list_call(&this->listeners, struct spa_varlink_client_events,
disconnect, SPA_VERSION_VARLINK_CLIENT_EVENTS);
}
}
static void do_destroy(struct client *client)
{
client_disconnect(client);
spa_hook_list_call(&client->listeners, struct spa_varlink_client_events,
destroy, SPA_VERSION_VARLINK_CLIENT_EVENTS);
spa_hook_list_clean(&client->listeners);
}
static void
client_destroy(void *object)
{
struct client *this = SPA_CONTAINER_OF(object, struct client, client);
do_destroy(this);
}
static void
process_message(struct client *this, const char *msg, size_t len)
{
struct impl *impl = this->impl;
struct spa_json json;
char key[64];
const char *val, *error = NULL, *params = NULL;
int res;
bool continues = false;
spa_log_trace(impl->log, "got message: %.*s", (int)len, msg);
spa_json_begin_object(&json, msg, len);
while ((res = spa_json_object_next(&json, key, sizeof(key), &val)) > 0) {
if (spa_streq(key, "error")) {
len = spa_json_container_len(&json, val, res);
error = val;
break;
}
if (spa_streq(key, "continues")) {
res = spa_json_get_bool(&json, &continues);
if (res < 0)
goto error;
}
if (spa_streq(key, "parameters")) {
len = spa_json_container_len(&json, val, res);
params = val;
break;
}
}
if (this->cb)
this->cb(this->cb_userdata, params, error, len, continues);
if (!this->more_pending || !continues) {
/* We didn't ask for more, but server might send */
if (continues)
spa_log_warn(impl->log, "Unexpected continues");
this->reply_pending = false;
this->more_pending = false;
this->cb = NULL;
this->cb_userdata = NULL;
}
return;
error:
spa_log_error(impl->log, "Could not parse: %.*s", (int)len, msg);
}
static void
client_on_reply(void *userdata, int fd, uint32_t mask)
{
struct client *this = (struct client *)userdata;
struct impl *impl = this->impl;
int res;
if (mask & SPA_IO_HUP) {
spa_log_info(impl->log, "Got hangup event");
client_disconnect(this);
return;
}
if (mask & SPA_IO_ERR) {
spa_log_info(impl->log, "Got error event");
client_disconnect(this);
return;
}
spa_log_debug(impl->log, "Ready to read");
/* SPA_IO_IN */
while (this->pos <= this->avail) {
size_t eom;
res = spa_system_read(impl->system, fd, &this->buf[this->avail],
BUFFER_SIZE - this->avail);
if (res <= 0) {
if (res == -EINTR)
continue;
if (res == 0 || res == -EAGAIN || res == -EWOULDBLOCK)
return;
spa_log_error(impl->log, "Error reading from socket: %s",
spa_strerror(res));
client_disconnect(this);
return;
}
spa_log_debug(impl->log, "Got %d bytes (avail: %zu)", res, this->avail);
this->avail += res;
for (eom = this->pos; eom < this->avail; eom++) {
if (this->buf[eom] == '\0')
break;
}
if (eom == this->avail) {
if (eom == BUFFER_SIZE) {
spa_log_error(impl->log, "Message larger than available buffer");
client_disconnect(this);
}
spa_log_debug(impl->log, "Need more data");
return;
}
process_message(this, &this->buf[this->pos], eom - this->pos);
spa_log_debug(impl->log, "Consumed %zu bytes", eom + 1 - this->pos);
/* Consume message so far and NUL terminator */
this->pos = eom + 1;
if (this->pos == this->avail) {
/* All data has been consumed, reset */
this->pos = this->avail = 0;
break;
}
}
}
static const struct spa_varlink_client impl_client = {
SPA_VERSION_VARLINK_CLIENT,
.add_listener = client_add_listener,
.call = client_call,
.call_sync = client_call_sync,
.destroy = client_destroy,
};
static struct spa_varlink_client *
impl_connect(void *object, const char *path)
{
struct impl *this = object;
struct client *client;
const char *socket_path;
struct sockaddr_un sa;
enum spa_varlink_transport transport;
int res = 0;
client = calloc(1, sizeof(struct client));
if (client == NULL)
return NULL;
socket_path = varlink_parse_path(path, &transport);
/* We only support UNIX sockets for now */
if (socket_path == NULL || transport != SPA_VARLINK_TRANSPORT_UNIX) {
spa_log_error(this->log, "Could not connect to socket path '%s'",
path);
goto error;
}
client->fd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
if (client->fd < 0) {
spa_log_error(this->log, "Could not create socket %s",
spa_strerror(res));
goto error;
}
sa.sun_family = AF_UNIX;
res = snprintf(sa.sun_path, sizeof(sa.sun_path), "%s", socket_path);
if (res < 0 || res > (int) sizeof(sa.sun_path)) {
spa_log_error(this->log, "Socket path too long: %s", socket_path);
goto error;
}
res = connect(client->fd, (struct sockaddr *)&sa, sizeof(sa));
if (res < 0) {
spa_log_error(this->log, "Could not open socket %s: %s",
socket_path, spa_strerror(res));
goto error;
}
client->client = impl_client;
client->impl = this;
client->pos = 0;
client->avail = 0;
client->reply_pending = false;
client->more_pending = false;
client->cb = NULL;
client->cb_userdata = NULL;
spa_hook_list_init(&client->listeners);
client->source = spa_loop_utils_add_io(this->loop_utils, client->fd,
SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP, false,
client_on_reply, client);
if (client->source == NULL) {
spa_log_error(this->log, "Could not create source");
goto error;
}
spa_list_append(&this->clients, &client->link);
spa_log_debug(this->log, "new client %p", client);
return &client->client;
error:
free(client);
errno = -res;
return NULL;
}
static const struct spa_varlink_methods impl_varlink = {
SPA_VERSION_VARLINK_METHODS,
.connect = impl_connect,
};
static int
impl_get_interface(struct spa_handle *handle, const char *type,
void **interface)
{
struct impl *this;
spa_return_val_if_fail(handle != NULL, -EINVAL);
spa_return_val_if_fail(interface != NULL, -EINVAL);
this = (struct impl *) handle;
if (spa_streq(type, SPA_TYPE_INTERFACE_Varlink))
*interface = &this->varlink;
else
return -ENOENT;
return 0;
}
static int impl_clear(struct spa_handle *handle)
{
struct impl *this = (struct impl *) handle;
struct client *client;
spa_return_val_if_fail(handle != NULL, -EINVAL);
spa_list_consume(client, &this->clients, link)
do_destroy(client);
return 0;
}
static size_t
impl_get_size(const struct spa_handle_factory *factory,
const struct spa_dict *params)
{
return sizeof(struct impl);
}
static int
impl_init(const struct spa_handle_factory *factory,
struct spa_handle *handle,
const struct spa_dict *info,
const struct spa_support *support,
uint32_t n_support)
{
struct impl *this;
spa_return_val_if_fail(factory != NULL, -EINVAL);
spa_return_val_if_fail(handle != NULL, -EINVAL);
handle->get_interface = impl_get_interface;
handle->clear = impl_clear;
this = (struct impl *)handle;
spa_list_init(&this->clients);
this->varlink.iface = SPA_INTERFACE_INIT(
SPA_TYPE_INTERFACE_Varlink,
SPA_VERSION_VARLINK,
&impl_varlink, this);
this->system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_System);
this->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log);
spa_log_topic_init(this->log, &log_topic);
this->loop_utils = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_LoopUtils);
if (this->loop_utils == NULL) {
spa_log_error(this->log, "Loop utils is required for varlink support");
return -EINVAL;
}
spa_log_debug(this->log, "%p: varlink iface initialised", this);
return 0;
}
static const struct spa_interface_info impl_interfaces[] = {
{SPA_TYPE_INTERFACE_Varlink,},
};
static int
impl_enum_interface_info(const struct spa_handle_factory *factory,
const struct spa_interface_info **info,
uint32_t *index)
{
spa_return_val_if_fail(factory != NULL, -EINVAL);
spa_return_val_if_fail(info != NULL, -EINVAL);
spa_return_val_if_fail(index != NULL, -EINVAL);
switch (*index) {
case 0:
*info = &impl_interfaces[*index];
break;
default:
return 0;
}
(*index)++;
return 0;
}
static const struct spa_handle_factory varlink_factory = {
SPA_VERSION_HANDLE_FACTORY,
SPA_NAME_SUPPORT_VARLINK,
NULL,
impl_get_size,
impl_init,
impl_enum_interface_info,
};
SPA_LOG_TOPIC_ENUM_DEFINE_REGISTERED;
SPA_EXPORT
int spa_handle_factory_enum(const struct spa_handle_factory **factory, uint32_t *index)
{
spa_return_val_if_fail(factory != NULL, -EINVAL);
spa_return_val_if_fail(index != NULL, -EINVAL);
switch (*index) {
case 0:
*factory = &varlink_factory;
break;
default:
return 0;
}
(*index)++;
return 1;
}