diff --git a/spa/include/spa/support/varlink.h b/spa/include/spa/support/varlink.h new file mode 100644 index 000000000..52d93218c --- /dev/null +++ b/spa/include/spa/support/varlink.h @@ -0,0 +1,163 @@ +/* Simple Plugin API */ +/* SPDX-FileCopyrightText: Copyright © 2026 Arun Raghavan */ +/* SPDX-License-Identifier: MIT */ + +#ifndef SPA_VARLINK_H +#define SPA_VARLINK_H + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#ifndef SPA_API_VARLINK + #ifdef SPA_API_IMPL + #define SPA_API_VARLINK SPA_API_IMPL + #else + #define SPA_API_VARLINK static inline + #endif +#endif + +/** \defgroup spa_varlink Varlink + * Varlink communication + */ + +/** + * \addtogroup spa_varlink + * \{ + */ + +#define SPA_TYPE_INTERFACE_Varlink SPA_TYPE_INFO_INTERFACE_BASE "Varlink" + +typedef void (*spa_varlink_reply_func_t) (void *data, const char *params, + const char *error, size_t len, bool continues); + +struct spa_varlink_client_events { +#define SPA_VERSION_VARLINK_CLIENT_EVENTS 0 + uint32_t version; + + /** The client was destroyed. */ + void (*destroy) (void *data); + + /** The client was disconnected. */ + void (*disconnect) (void *data); +}; + +struct spa_varlink_client { +#define SPA_VERSION_VARLINK_CLIENT 0 + uint32_t version; + + /** Add a listener fo events */ + void (*add_listener) (void *object, struct spa_hook *listener, + const struct spa_varlink_client_events *events, + void *data); + + /** Call a single- or no-reply method on a varlink client. + * + * \param method Fully qualified (`interface.Method`) method name to + * call. + * \param params Method parameters as a string (must be a valid JSON + * object). + * \param oneway Signal that the server should not send a reply. + * \param more Expect multiple replies from the server for this method + * call. + * \param cb Callback to invok when a reply to this call arrives. + * \param userdata Userdata to supply to the callback. + * \return 0 on success, or a negative error code on failure. + */ + int (*call) (void *object, const char *method, const char *params, + bool oneway, bool more, spa_varlink_reply_func_t cb, + void *userdata); + + /** Call a single-reply method and block until a reply is received. + * + * \param method Fully qualified (`interface.Method`) method name to + * call. + * \param params Method parameters as a string (must be a valid JSON + * object). + * \param reply The reply string. The caller is responsible for freeing + * this data with `free()` + * \return 0 on success, or a negative error code on failure. + */ + int (*call_sync) (void *object, const char *method, const char *params, + char **reply); + + /** Destroy a varlink client. + * + * \param client The client to destroy + */ + void (*destroy) (void *object); +}; + +/** \copydoc spa_varlink_client_methods.add_listener + * \sa spa_varlink_client_methods.add_listener */ +SPA_API_VARLINK void +spa_varlink_client_add_listener(struct spa_varlink_client *client, + struct spa_hook *listener, + const struct spa_varlink_client_events *events, + void *data) +{ + spa_api_func_v(client, add_listener, 0, listener, events, data); +} + +/** \copydoc spa_varlink_client_methods.call + * \sa spa_varlink_client_methods.call */ +SPA_API_VARLINK int +spa_varlink_client_call(struct spa_varlink_client *client, const char *method, + const char *params, bool oneway, bool more, + spa_varlink_reply_func_t cb, void *userdata) +{ + return spa_api_func_r(int, -EINVAL, client, call, 0, method, params, + oneway, more, cb, userdata); +} + +/** \copydoc spa_varlink_client_methods.call_sync + * \sa spa_varlink_client_methods.call_sync */ +SPA_API_VARLINK int +spa_varlink_client_call_sync(struct spa_varlink_client *client, const char *method, + const char *params, char **reply) +{ + return spa_api_func_r(int, -EINVAL, client, call_sync, 0, method, + params, reply); +} + +/** \copydoc spa_varlink_client_methods.destroy + * \sa spa_varlink_client_methods.destroy */ +SPA_API_VARLINK void +spa_varlink_client_destroy(struct spa_varlink_client *client) +{ + spa_api_func_v(client, destroy, 0); +} + +#define SPA_VERSION_VARLINK 0 +struct spa_varlink { struct spa_interface iface; }; + +struct spa_varlink_methods { +#define SPA_VERSION_VARLINK_METHODS 0 + uint32_t version; + + /** + * Connect to a varlink service. + * + * \param path Path to connect to, for example `unix:/path/to/socket` + * \return A `spa_varlink_client` on success, NULL on failure. + */ + struct spa_varlink_client * (*connect) (void *object, const char *path); +}; + +/** \copydoc spa_varlink_methods.connect + * \sa spa_varlink_methods.connect */ +SPA_API_VARLINK void * +spa_varlink_connect(struct spa_varlink *varlink, const char *path) +{ + return spa_api_method_r(struct spa_varlink_client *, NULL, spa_varlink, + &varlink->iface, connect, 0, path); +} + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* SPA_VARLINK_H */ diff --git a/spa/include/spa/utils/names.h b/spa/include/spa/utils/names.h index a3f519011..2ed079583 100644 --- a/spa/include/spa/utils/names.h +++ b/spa/include/spa/utils/names.h @@ -25,6 +25,7 @@ extern "C" { #define SPA_NAME_SUPPORT_LOOP "support.loop" /**< A Loop/LoopControl/LoopUtils * interface */ #define SPA_NAME_SUPPORT_SYSTEM "support.system" /**< A System interface */ +#define SPA_NAME_SUPPORT_VARLINK "support.varlink" /**< A Varlink interface */ #define SPA_NAME_SUPPORT_NODE_DRIVER "support.node.driver" /**< A dummy driver node */ diff --git a/spa/plugins/support/meson.build b/spa/plugins/support/meson.build index 678845774..3adc9e3d6 100644 --- a/spa/plugins/support/meson.build +++ b/spa/plugins/support/meson.build @@ -25,6 +25,17 @@ spa_support_lib = shared_library('spa-support', install_dir : spa_plugindir / 'support') spa_support_dep = declare_dependency(link_with: spa_support_lib) +spa_varlink_sources = [ + 'varlink.c' +] + +spa_varlink_lib = shared_library('spa-varlink', + spa_varlink_sources, + dependencies : [ spa_dep ], + install : true, + install_dir : spa_plugindir / 'support') +spa_varlink_dep = declare_dependency(link_with: spa_varlink_lib) + if get_option('evl').allowed() evl_inc = include_directories('/usr/include') evl_lib = cc.find_library('evl', diff --git a/spa/plugins/support/varlink.c b/spa/plugins/support/varlink.c new file mode 100644 index 000000000..aa13f16b2 --- /dev/null +++ b/spa/plugins/support/varlink.c @@ -0,0 +1,623 @@ +/* Spa Varlink plugin */ +/* SPDX-FileCopyrightText: Copyright © 2026 Arun Raghavan */ +/* SPDX-License-Identifier: MIT */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.varlink"); + +#undef SPA_LOG_TOPIC_DEFAULT +#define SPA_LOG_TOPIC_DEFAULT &log_topic + +struct impl { + struct spa_handle handle; + struct spa_varlink varlink; + + struct spa_log *log; + struct spa_system *system; + struct spa_loop_utils *loop_utils; + + struct spa_list clients; +}; + +enum spa_varlink_transport { + SPA_VARLINK_TRANSPORT_UNIX, +}; + +#define BUFFER_SIZE 16384 + +struct client { + struct spa_list link; + struct spa_varlink_client client; + + struct impl *impl; /* The owning impl */ + int fd; /* The socket fd */ + struct spa_source *source; /* I/O source if we have a call_more() call */ + bool reply_pending; /* A call was made for which a reply is pending */ + bool more_pending; /* A call with more=true was made, and a reply is pending */ + + spa_varlink_reply_func_t cb; /* Callback for the next reply we receive */ + void *cb_userdata; + + struct spa_hook_list listeners; + + size_t pos; + size_t avail; + char buf[BUFFER_SIZE]; +}; + +static const char *varlink_parse_path(const char *path, + enum spa_varlink_transport *transport) +{ + if (spa_strstartswith(path, "unix:")) { + *transport = SPA_VARLINK_TRANSPORT_UNIX; + return &path[5]; + } + + return NULL; +} + +static void +client_add_listener(void *object, struct spa_hook *listener, + const struct spa_varlink_client_events *events, void *data) +{ + struct client *this = SPA_CONTAINER_OF(object, struct client, client); + spa_hook_list_append(&this->listeners, listener, events, data); +} + +static char * +build_call(const char *method, const char *params, bool oneway, bool more, + size_t *len) +{ + struct spa_json_builder b; + char *json; + + spa_json_builder_memstream(&b, &json, len, 0); + + /* Top-level object */ + spa_json_builder_object_push(&b, NULL, "{"); + + spa_json_builder_object_string(&b, "method", method); + spa_json_builder_object_value(&b, true, "parameters", params); + spa_json_builder_object_bool(&b, "oneway", oneway); + spa_json_builder_object_bool(&b, "more", more); + + spa_json_builder_pop(&b, "}"); + + spa_json_builder_close(&b); + + return json; +} + +static int +client_call(void *object, const char *method, const char *params, + bool oneway, bool more, spa_varlink_reply_func_t cb, + void *userdata) +{ + struct client *this = SPA_CONTAINER_OF(object, struct client, client); + struct impl *impl = this->impl; + char *buf; + size_t len, pos = 0; + int res; + + if (this->reply_pending) { + res = -EINVAL; + goto done; + } + + buf = build_call(method, params, oneway, more, &len); + + spa_log_trace(impl->log, "sending message: %s", buf); + + /* We write the whole string including the NULL terminator */ + do { + res = spa_system_write(this->impl->system, this->fd, &buf[pos], len - pos + 1); + pos += res; + } while ((res > 0 && len >= pos) || res == -EAGAIN || res == -EWOULDBLOCK); + + free(buf); + + if (res < 0) { + spa_log_error(impl->log, "Error writing message: %s", + spa_strerror(res)); + goto done; + } + + if (!oneway) { + this->reply_pending = true; + this->cb = cb; + this->cb_userdata = userdata; + } + if (more) + this->more_pending = true; + + res = 0; + +done: + return res; +} + +static int +client_call_sync(void *object, const char *method, const char *params, + char **reply) +{ + struct client *this = SPA_CONTAINER_OF(object, struct client, client); + struct impl *impl = this->impl; + char *buf; + size_t len, pos = 0; + int res = 0; + + if (this->reply_pending || this->more_pending || + (this->pos != 0 && this->avail != 0)) { + spa_log_error(impl->log, "Invalid state for sync call"); + res = -EINVAL; + } + + if (res < 0) + return res; + + spa_loop_utils_update_io(impl->loop_utils, this->source, + this->source->mask & ~SPA_IO_IN); + + fcntl(this->fd, F_SETFL, fcntl(this->fd, F_GETFL) & ~O_NONBLOCK); + + buf = build_call(method, params, false, false, &len); + + spa_log_trace(impl->log, "sending message: %s", buf); + /* We write the whole string including the NULL terminator */ + do { + res = spa_system_write(this->impl->system, this->fd, &buf[pos], len - pos + 1); + pos += res; + } while (res > 0 && len >= pos); + + free(buf); + + if (res < 0) { + spa_log_error(impl->log, "Error writing message: %s", + spa_strerror(res)); + goto done; + } + + while (true) { + size_t eom; + + res = spa_system_read(impl->system, this->fd, &this->buf[this->avail], + BUFFER_SIZE - this->avail); + if (res <= 0) { + if (res == -EINTR) + continue; + + if (res == -EAGAIN || res == -EWOULDBLOCK) + continue; + + spa_log_error(impl->log, "Error reading from socket: %s", + spa_strerror(res)); + + goto done; + } + + spa_log_debug(impl->log, "Got %d bytes (avail: %zu)", res, this->avail); + this->avail += res; + + for (eom = 0; eom < this->avail; eom++) { + if (this->buf[eom] == '\0') + break; + } + + /* We need more data */ + if (eom == this->avail) + continue; + + if (eom != this->avail - 1) + spa_log_warn(impl->log, "Received more than one reply"); + + spa_log_debug(impl->log, "Consuming %zu bytes", eom + 1); + + res = eom; + *reply = strndup(this->buf, res + 1); + + this->pos = this->avail = 0; + break; + } + +done: + fcntl(this->fd, F_SETFL, fcntl(this->fd, F_GETFL) | O_NONBLOCK); + + spa_loop_utils_update_io(impl->loop_utils, this->source, + this->source->mask | SPA_IO_IN); + + return res; +} + +static void +client_disconnect(struct client *this) +{ + if (this->source) { + spa_loop_utils_destroy_source(this->impl->loop_utils, this->source); + this->source = NULL; + + spa_system_close(this->impl->system, this->fd); + + spa_hook_list_call(&this->listeners, struct spa_varlink_client_events, + disconnect, SPA_VERSION_VARLINK_CLIENT_EVENTS); + } +} + +static void do_destroy(struct client *client) +{ + client_disconnect(client); + + spa_hook_list_call(&client->listeners, struct spa_varlink_client_events, + destroy, SPA_VERSION_VARLINK_CLIENT_EVENTS); + + spa_hook_list_clean(&client->listeners); +} + +static void +client_destroy(void *object) +{ + struct client *this = SPA_CONTAINER_OF(object, struct client, client); + do_destroy(this); +} + +static void +process_message(struct client *this, const char *msg, size_t len) +{ + struct impl *impl = this->impl; + struct spa_json json; + char key[64]; + const char *val, *error = NULL, *params = NULL; + int res; + bool continues = false; + + spa_log_trace(impl->log, "got message: %.*s", (int)len, msg); + + spa_json_begin_object(&json, msg, len); + + while ((res = spa_json_object_next(&json, key, sizeof(key), &val)) > 0) { + if (spa_streq(key, "error")) { + len = spa_json_container_len(&json, val, res); + error = val; + break; + } + + if (spa_streq(key, "continues")) { + res = spa_json_get_bool(&json, &continues); + if (res < 0) + goto error; + } + + if (spa_streq(key, "parameters")) { + len = spa_json_container_len(&json, val, res); + params = val; + break; + } + } + + if (this->cb) + this->cb(this->cb_userdata, params, error, len, continues); + + if (!this->more_pending || !continues) { + /* We didn't ask for more, but server might send */ + if (continues) + spa_log_warn(impl->log, "Unexpected continues"); + + this->reply_pending = false; + this->more_pending = false; + this->cb = NULL; + this->cb_userdata = NULL; + } + + return; + +error: + spa_log_error(impl->log, "Could not parse: %.*s", (int)len, msg); +} + +static void +client_on_reply(void *userdata, int fd, uint32_t mask) +{ + struct client *this = (struct client *)userdata; + struct impl *impl = this->impl; + int res; + + if (mask & SPA_IO_HUP) { + spa_log_info(impl->log, "Got hangup event"); + client_disconnect(this); + return; + } + + if (mask & SPA_IO_ERR) { + spa_log_info(impl->log, "Got error event"); + client_disconnect(this); + return; + } + + spa_log_debug(impl->log, "Ready to read"); + + /* SPA_IO_IN */ + while (this->pos <= this->avail) { + size_t eom; + + res = spa_system_read(impl->system, fd, &this->buf[this->avail], + BUFFER_SIZE - this->avail); + if (res <= 0) { + if (res == -EINTR) + continue; + + if (res == 0 || res == -EAGAIN || res == -EWOULDBLOCK) + return; + + spa_log_error(impl->log, "Error reading from socket: %s", + spa_strerror(res)); + client_disconnect(this); + + return; + } + + spa_log_debug(impl->log, "Got %d bytes (avail: %zu)", res, this->avail); + this->avail += res; + + for (eom = this->pos; eom < this->avail; eom++) { + if (this->buf[eom] == '\0') + break; + } + + if (eom == this->avail) { + if (eom == BUFFER_SIZE) { + spa_log_error(impl->log, "Message larger than available buffer"); + client_disconnect(this); + } + + spa_log_debug(impl->log, "Need more data"); + return; + } + + process_message(this, &this->buf[this->pos], eom - this->pos); + + spa_log_debug(impl->log, "Consumed %zu bytes", eom + 1 - this->pos); + + /* Consume message so far and NUL terminator */ + this->pos = eom + 1; + + if (this->pos == this->avail) { + /* All data has been consumed, reset */ + this->pos = this->avail = 0; + break; + } + + } +} + +static const struct spa_varlink_client impl_client = { + SPA_VERSION_VARLINK_CLIENT, + .add_listener = client_add_listener, + .call = client_call, + .call_sync = client_call_sync, + .destroy = client_destroy, +}; + +static struct spa_varlink_client * +impl_connect(void *object, const char *path) +{ + struct impl *this = object; + struct client *client; + const char *socket_path; + struct sockaddr_un sa; + enum spa_varlink_transport transport; + int res = 0; + + client = calloc(1, sizeof(struct client)); + if (client == NULL) + return NULL; + + socket_path = varlink_parse_path(path, &transport); + /* We only support UNIX sockets for now */ + if (socket_path == NULL || transport != SPA_VARLINK_TRANSPORT_UNIX) { + spa_log_error(this->log, "Could not connect to socket path '%s'", + path); + goto error; + } + + client->fd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); + if (client->fd < 0) { + spa_log_error(this->log, "Could not create socket %s", + spa_strerror(res)); + goto error; + } + + sa.sun_family = AF_UNIX; + res = snprintf(sa.sun_path, sizeof(sa.sun_path), "%s", socket_path); + if (res < 0 || res > (int) sizeof(sa.sun_path)) { + spa_log_error(this->log, "Socket path too long: %s", socket_path); + goto error; + } + + res = connect(client->fd, (struct sockaddr *)&sa, sizeof(sa)); + if (res < 0) { + spa_log_error(this->log, "Could not open socket %s: %s", + socket_path, spa_strerror(res)); + goto error; + } + + client->client = impl_client; + client->impl = this; + client->pos = 0; + client->avail = 0; + client->reply_pending = false; + client->more_pending = false; + client->cb = NULL; + client->cb_userdata = NULL; + spa_hook_list_init(&client->listeners); + + client->source = spa_loop_utils_add_io(this->loop_utils, client->fd, + SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP, false, + client_on_reply, client); + if (client->source == NULL) { + spa_log_error(this->log, "Could not create source"); + goto error; + } + + spa_list_append(&this->clients, &client->link); + + spa_log_debug(this->log, "new client %p", client); + + return &client->client; + +error: + free(client); + errno = -res; + return NULL; +} + +static const struct spa_varlink_methods impl_varlink = { + SPA_VERSION_VARLINK_METHODS, + .connect = impl_connect, +}; + +static int +impl_get_interface(struct spa_handle *handle, const char *type, + void **interface) +{ + struct impl *this; + + spa_return_val_if_fail(handle != NULL, -EINVAL); + spa_return_val_if_fail(interface != NULL, -EINVAL); + + this = (struct impl *) handle; + + if (spa_streq(type, SPA_TYPE_INTERFACE_Varlink)) + *interface = &this->varlink; + else + return -ENOENT; + + return 0; +} + +static int impl_clear(struct spa_handle *handle) +{ + struct impl *this = (struct impl *) handle; + struct client *client; + + spa_return_val_if_fail(handle != NULL, -EINVAL); + + spa_list_consume(client, &this->clients, link) + do_destroy(client); + + return 0; +} + +static size_t +impl_get_size(const struct spa_handle_factory *factory, + const struct spa_dict *params) +{ + return sizeof(struct impl); +} + +static int +impl_init(const struct spa_handle_factory *factory, + struct spa_handle *handle, + const struct spa_dict *info, + const struct spa_support *support, + uint32_t n_support) +{ + struct impl *this; + + spa_return_val_if_fail(factory != NULL, -EINVAL); + spa_return_val_if_fail(handle != NULL, -EINVAL); + + handle->get_interface = impl_get_interface; + handle->clear = impl_clear; + + this = (struct impl *)handle; + spa_list_init(&this->clients); + + this->varlink.iface = SPA_INTERFACE_INIT( + SPA_TYPE_INTERFACE_Varlink, + SPA_VERSION_VARLINK, + &impl_varlink, this); + + this->system = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_System); + this->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log); + spa_log_topic_init(this->log, &log_topic); + + this->loop_utils = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_LoopUtils); + if (this->loop_utils == NULL) { + spa_log_error(this->log, "Loop utils is required for varlink support"); + return -EINVAL; + } + + spa_log_debug(this->log, "%p: varlink iface initialised", this); + + return 0; +} + +static const struct spa_interface_info impl_interfaces[] = { + {SPA_TYPE_INTERFACE_Varlink,}, +}; + +static int +impl_enum_interface_info(const struct spa_handle_factory *factory, + const struct spa_interface_info **info, + uint32_t *index) +{ + spa_return_val_if_fail(factory != NULL, -EINVAL); + spa_return_val_if_fail(info != NULL, -EINVAL); + spa_return_val_if_fail(index != NULL, -EINVAL); + + switch (*index) { + case 0: + *info = &impl_interfaces[*index]; + break; + default: + return 0; + } + + (*index)++; + + return 0; +} + +static const struct spa_handle_factory varlink_factory = { + SPA_VERSION_HANDLE_FACTORY, + SPA_NAME_SUPPORT_VARLINK, + NULL, + impl_get_size, + impl_init, + impl_enum_interface_info, +}; + +SPA_LOG_TOPIC_ENUM_DEFINE_REGISTERED; + +SPA_EXPORT +int spa_handle_factory_enum(const struct spa_handle_factory **factory, uint32_t *index) +{ + spa_return_val_if_fail(factory != NULL, -EINVAL); + spa_return_val_if_fail(index != NULL, -EINVAL); + + switch (*index) { + case 0: + *factory = &varlink_factory; + break; + default: + return 0; + } + + (*index)++; + + return 1; +} diff --git a/spa/tests/meson.build b/spa/tests/meson.build index 81635dea9..79f1dcaa1 100644 --- a/spa/tests/meson.build +++ b/spa/tests/meson.build @@ -27,6 +27,17 @@ if find.found() endforeach endif +utils = [ + ['varlink-call', []], +] + +foreach a : utils + executable('spa-' + a[0], a[0] + '.c', + dependencies : [ spa_dep, dl_lib, pthread_lib, mathlib ] + a[1], + include_directories : [configinc], + ) +endforeach + benchmark_apps = [ ['stress-ringbuffer', []], ['benchmark-pod', []], diff --git a/spa/tests/varlink-call.c b/spa/tests/varlink-call.c new file mode 100644 index 000000000..e422c723e --- /dev/null +++ b/spa/tests/varlink-call.c @@ -0,0 +1,260 @@ +/* Spa */ +/* SPDX-FileCopyrightText: Copyright © 2026 Arun Raghavan */ +/* SPDX-License-Identifier: MIT */ + +#include "config.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +struct data { + const char *plugin_dir; + + struct spa_support support[16]; + uint32_t n_support; + struct spa_log *log; + struct spa_loop *loop; + struct spa_loop_control *loop_control; + struct spa_loop_utils *loop_utils; + struct spa_system *system; + + struct spa_varlink *varlink; + struct spa_varlink_client *client; + struct spa_hook listener; + + bool running; +}; + +static int load_handle(struct data *data, struct spa_handle **handle, const + char *lib, const char *name, struct spa_dict *info) +{ + int res; + void *hnd; + spa_handle_factory_enum_func_t enum_func; + uint32_t i; + char *path; + + if ((path = spa_aprintf("%s/%s", data->plugin_dir, lib)) == NULL) + return -ENOMEM; + + hnd = dlopen(path, RTLD_NOW); + free(path); + + if (hnd == NULL) { + printf("can't load %s: %s\n", lib, dlerror()); + return -ENOENT; + } + if ((enum_func = dlsym(hnd, SPA_HANDLE_FACTORY_ENUM_FUNC_NAME)) == NULL) { + printf("can't find enum function\n"); + res = -ENOENT; + goto exit_cleanup; + } + + for (i = 0;;) { + const struct spa_handle_factory *factory; + + if ((res = enum_func(&factory, &i)) <= 0) { + if (res != 0) + printf("can't enumerate factories: %s\n", spa_strerror(res)); + break; + } + if (factory->version < 1) + continue; + if (!spa_streq(factory->name, name)) + continue; + + *handle = calloc(1, spa_handle_factory_get_size(factory, NULL)); + if ((res = spa_handle_factory_init(factory, *handle, + info, data->support, + data->n_support)) < 0) { + printf("can't make factory instance: %d\n", res); + goto exit_cleanup; + } + return 0; + } + return -EBADF; + +exit_cleanup: + dlclose(hnd); + return res; +} + +static int init(struct data *data) +{ + struct spa_handle *handle; + struct spa_dict_item items[1]; + struct spa_dict info; + const char *str; + void *iface; + int res; + + if ((str = getenv("SPA_PLUGIN_DIR")) == NULL) + str = PLUGINDIR; + data->plugin_dir = str; + + /* enable the debug messages in SPA */ + items[0] = SPA_DICT_ITEM_INIT(SPA_KEY_LOG_TIMESTAMP, "true"); + info = SPA_DICT_ARRAY(items); + if ((res = load_handle(data, &handle, "support/libspa-support.so", + SPA_NAME_SUPPORT_LOG, &info)) < 0) + return res; + if ((res = spa_handle_get_interface(handle, SPA_TYPE_INTERFACE_Log, &iface)) < 0) { + printf("can't get System interface %d\n", res); + return res; + } + + data->log = iface; + data->support[data->n_support++] = SPA_SUPPORT_INIT(SPA_TYPE_INTERFACE_Log, data->log); + + if ((str = getenv("SPA_DEBUG"))) + data->log->level = atoi(str); + + /* load and set support system */ + if ((res = load_handle(data, &handle, + "support/libspa-support.so", + SPA_NAME_SUPPORT_SYSTEM, NULL)) < 0) + return res; + if ((res = spa_handle_get_interface(handle, SPA_TYPE_INTERFACE_System, &iface)) < 0) { + printf("can't get System interface %d\n", res); + return res; + } + data->system = iface; + data->support[data->n_support++] = SPA_SUPPORT_INIT(SPA_TYPE_INTERFACE_System, data->system); + data->support[data->n_support++] = SPA_SUPPORT_INIT(SPA_TYPE_INTERFACE_DataSystem, data->system); + + /* load and set support loop and loop control */ + if ((res = load_handle(data, &handle, + "support/libspa-support.so", + SPA_NAME_SUPPORT_LOOP, NULL)) < 0) + return res; + + if ((res = spa_handle_get_interface(handle, SPA_TYPE_INTERFACE_Loop, &iface)) < 0) { + printf("can't get interface %d\n", res); + return res; + } + data->loop = iface; + data->support[data->n_support++] = SPA_SUPPORT_INIT(SPA_TYPE_INTERFACE_Loop, data->loop); + data->support[data->n_support++] = SPA_SUPPORT_INIT(SPA_TYPE_INTERFACE_DataLoop, data->loop); + + if ((res = spa_handle_get_interface(handle, SPA_TYPE_INTERFACE_LoopControl, &iface)) < 0) { + printf("can't get interface %d\n", res); + return res; + } + data->loop_control = iface; + data->support[data->n_support++] = SPA_SUPPORT_INIT(SPA_TYPE_INTERFACE_LoopControl, data->loop_control); + + if ((res = spa_handle_get_interface(handle, SPA_TYPE_INTERFACE_LoopUtils, &iface)) < 0) { + printf("can't get interface %d\n", res); + return res; + } + data->loop_utils = iface; + data->support[data->n_support++] = SPA_SUPPORT_INIT(SPA_TYPE_INTERFACE_LoopUtils, data->loop_utils); + + /* load varlink */ + if ((res = load_handle(data, &handle, + "support/libspa-varlink.so", + SPA_NAME_SUPPORT_VARLINK, NULL)) < 0) + return res; + + if ((res = spa_handle_get_interface(handle, SPA_TYPE_INTERFACE_Varlink, &iface)) < 0) { + printf("can't get interface %d\n", res); + return res; + } + data->varlink = iface; + + return 0; +} + +static void +on_reply(void *userdata, const char *params, const char *error, size_t len, bool continues) +{ + struct data *data = userdata; + + if (params) { + printf("Got reply: params: %.*s, continues: %s\n", + (int) len, params, continues ? "true" : "false"); + } else { + printf("Got reply: error: %.*s\n", (int) len, error); + } + + data->running = false; +} + +static void on_disconnect(void *userdata) +{ + struct data *data = userdata; + printf("Disconnected\n"); + data->running = false; +} + +static void on_destroy(void *userdata) +{ + printf("Destroyed\n"); +} + +int main(int argc, char *argv[]) +{ + struct data data = { 0 }; + struct spa_varlink_client_events events = { + .disconnect = on_disconnect, + .destroy = on_destroy, + }; + int res; + bool sync = true; + + if (argc < 4 || argc > 5) { + printf("usage: %s \n", argv[0]); + return -1; + } + + res = init(&data); + if (res < 0) + return res; + + if (argc == 5) + sync = argv[4][0] == '1'; + + data.running = true; + + data.client = spa_varlink_connect(data.varlink, argv[1]); + if (data.client == NULL) { + printf("Could not connect to socket: %s\n", spa_strerror(errno)); + return -1; + } + + spa_varlink_client_add_listener(data.client, &data.listener, &events, &data); + + if (sync) { + char *reply; + res = spa_varlink_client_call_sync(data.client, argv[2], argv[3], &reply); + if (res < 0) { + printf("Call failed: %s\n", spa_strerror(res)); + return -1; + } + printf("Got reply (%d): %s\n", res, reply); + } else { + res = spa_varlink_client_call(data.client, argv[2], argv[3], false, false, on_reply, &data); + if (res < 0) { + printf("Could not connect to socket: %s\n", spa_strerror(res)); + return -1; + } + + while (data.running) { + spa_loop_control_iterate(data.loop_control, 1000); + } + } + + spa_varlink_client_destroy(data.client); + + return 0; +}