stream: add pw_stream_new_simple

Make a new method that also makes a remote and connects to it
transparently. This saves some lines of code.
This commit is contained in:
Wim Taymans 2018-03-22 18:07:57 +01:00
parent f9ceedb714
commit 41a3a924ef
5 changed files with 205 additions and 270 deletions

View file

@ -59,32 +59,25 @@ struct data {
struct pw_core *core;
struct pw_type *t;
struct pw_remote *remote;
struct spa_hook remote_listener;
struct pw_stream *stream;
struct spa_hook stream_listener;
struct spa_audio_info_raw format;
double accumulator;
};
static void fill_f32(struct data *d, void *dest, int avail)
{
float *dst = dest;
int n_samples = avail / (sizeof(float) * d->format.channels);
float *dst = dest, val;
int n_samples = avail / (sizeof(float) * DEFAULT_CHANNELS);
int i, c;
for (i = 0; i < n_samples; i++) {
float val;
d->accumulator += M_PI_M2 * 440 / d->format.rate;
d->accumulator += M_PI_M2 * 440 / DEFAULT_RATE;
if (d->accumulator >= M_PI_M2)
d->accumulator -= M_PI_M2;
val = sin(d->accumulator);
for (c = 0; c < d->format.channels; c++)
for (c = 0; c < DEFAULT_CHANNELS; c++)
*dst++ = val;
}
}
@ -100,7 +93,6 @@ static void on_process(void *userdata)
return;
buf = b->buffer;
if ((p = buf->datas[0].data) == NULL)
return;
@ -116,84 +108,47 @@ static const struct pw_stream_events stream_events = {
.process = on_process,
};
static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remote_state state, const char *error)
{
struct data *data = _data;
struct pw_remote *remote = data->remote;
switch (state) {
case PW_REMOTE_STATE_ERROR:
printf("remote error: %s\n", error);
pw_main_loop_quit(data->loop);
break;
case PW_REMOTE_STATE_CONNECTED:
{
const struct spa_pod *params[1];
uint8_t buffer[1024];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
printf("remote state: \"%s\"\n",
pw_remote_state_as_string(state));
data->stream = pw_stream_new(remote, "audio-src", NULL);
params[0] = spa_pod_builder_object(&b,
data->t->param.idEnumFormat, data->t->spa_format,
"I", data->type.media_type.audio,
"I", data->type.media_subtype.raw,
":", data->type.format_audio.format, "I", data->type.audio_format.F32,
":", data->type.format_audio.channels, "i", DEFAULT_CHANNELS,
":", data->type.format_audio.rate, "i", DEFAULT_RATE);
data->format.channels = DEFAULT_CHANNELS;
data->format.rate = DEFAULT_RATE;
pw_stream_add_listener(data->stream,
&data->stream_listener,
&stream_events,
data);
pw_stream_connect(data->stream,
PW_DIRECTION_OUTPUT,
NULL,
PW_STREAM_FLAG_AUTOCONNECT |
PW_STREAM_FLAG_MAP_BUFFERS |
PW_STREAM_FLAG_RT_PROCESS,
params, 1);
break;
}
default:
printf("remote state: \"%s\"\n", pw_remote_state_as_string(state));
break;
}
}
static const struct pw_remote_events remote_events = {
PW_VERSION_REMOTE_EVENTS,
.state_changed = on_state_changed,
};
int main(int argc, char *argv[])
{
struct data data = { 0, };
const struct spa_pod *params[1];
uint8_t buffer[1024];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
pw_init(&argc, &argv);
data.loop = pw_main_loop_new(NULL);
data.core = pw_core_new(pw_main_loop_get_loop(data.loop), NULL);
data.t = pw_core_get_type(data.core);
data.remote = pw_remote_new(data.core, NULL, 0);
data.stream = pw_stream_new_simple(
pw_main_loop_get_loop(data.loop),
"audio-src",
NULL,
&stream_events,
&data);
data.remote = pw_stream_get_remote(data.stream);
data.t = pw_core_get_type(pw_remote_get_core(data.remote));
init_type(&data.type, data.t->map);
pw_remote_add_listener(data.remote, &data.remote_listener, &remote_events, &data);
params[0] = spa_pod_builder_object(&b,
data.t->param.idEnumFormat, data.t->spa_format,
"I", data.type.media_type.audio,
"I", data.type.media_subtype.raw,
":", data.type.format_audio.format, "I", data.type.audio_format.F32,
":", data.type.format_audio.channels, "i", DEFAULT_CHANNELS,
":", data.type.format_audio.rate, "i", DEFAULT_RATE);
pw_remote_connect(data.remote);
pw_stream_connect(data.stream,
PW_DIRECTION_OUTPUT,
NULL,
PW_STREAM_FLAG_AUTOCONNECT |
PW_STREAM_FLAG_MAP_BUFFERS |
PW_STREAM_FLAG_RT_PROCESS,
params, 1);
pw_main_loop_run(data.loop);
pw_core_destroy(data.core);
pw_stream_destroy(data.stream);
pw_main_loop_destroy(data.loop);
return 0;

View file

@ -548,7 +548,7 @@ static int impl_node_process(struct spa_node *node)
if (d->io->status != SPA_STATUS_HAVE_BUFFER)
return SPA_STATUS_NEED_BUFFER;
if (d->io->buffer_id > d->n_buffers)
if (d->io->buffer_id >= d->n_buffers)
return SPA_STATUS_NEED_BUFFER;
buf = d->buffers[d->io->buffer_id];

View file

@ -139,6 +139,9 @@ static void on_stream_state_changed(void *_data, enum pw_stream_state old,
struct data *data = _data;
printf("stream state: \"%s\"\n", pw_stream_state_as_string(state));
switch (state) {
case PW_STREAM_STATE_UNCONNECTED:
pw_main_loop_quit(data->loop);
break;
case PW_STREAM_STATE_CONFIGURE:
pw_stream_set_active(data->stream, true);
break;
@ -273,151 +276,78 @@ static const struct pw_stream_events stream_events = {
.process = on_process,
};
static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remote_state state, const char *error)
static int build_format(struct data *data, struct spa_pod_builder *b, const struct spa_pod **params)
{
struct data *data = _data;
struct pw_remote *remote = data->remote;
uint32_t i, c;
SDL_RendererInfo info;
switch (state) {
case PW_REMOTE_STATE_ERROR:
printf("remote error: %s\n", error);
pw_main_loop_quit(data->loop);
break;
SDL_GetRendererInfo(data->renderer, &info);
case PW_REMOTE_STATE_CONNECTED:
{
const struct spa_pod *params[1];
uint8_t buffer[1024];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
SDL_RendererInfo info;
uint32_t i, c;
spa_pod_builder_push_object(b,
data->t->param.idEnumFormat, data->t->spa_format);
spa_pod_builder_id(b, data->type.media_type.video);
spa_pod_builder_id(b, data->type.media_subtype.raw);
printf("remote state: \"%s\"\n", pw_remote_state_as_string(state));
data->stream = pw_stream_new(remote, "video-play",
pw_properties_new("pipewire.client.reuse", "1", NULL));
SDL_GetRendererInfo(data->renderer, &info);
spa_pod_builder_push_object(&b,
data->t->param.idEnumFormat, data->t->spa_format);
spa_pod_builder_id(&b, data->type.media_type.video);
spa_pod_builder_id(&b, data->type.media_subtype.raw);
spa_pod_builder_push_prop(&b, data->type.format_video.format,
SPA_POD_PROP_FLAG_UNSET |
SPA_POD_PROP_RANGE_ENUM);
for (i = 0, c = 0; i < info.num_texture_formats; i++) {
uint32_t id = sdl_format_to_id(data, info.texture_formats[i]);
if (id == 0)
continue;
if (c++ == 0)
spa_pod_builder_id(&b, id);
spa_pod_builder_id(&b, id);
}
for (i = 0; i < SPA_N_ELEMENTS(video_formats); i++) {
uint32_t id =
*SPA_MEMBER(&data->type.video_format, video_formats[i].id,
uint32_t);
if (id != data->type.video_format.UNKNOWN)
spa_pod_builder_id(&b, id);
}
spa_pod_builder_pop(&b);
spa_pod_builder_add(&b,
":", data->type.format_video.size, "Rru", &SPA_RECTANGLE(WIDTH, HEIGHT),
SPA_POD_PROP_MIN_MAX(&SPA_RECTANGLE(1,1),
&SPA_RECTANGLE(info.max_texture_width,
info.max_texture_height)),
":", data->type.format_video.framerate, "Fru", &SPA_FRACTION(25,1),
SPA_POD_PROP_MIN_MAX(&SPA_RECTANGLE(0,1),
&SPA_RECTANGLE(30,1)),
NULL);
params[0] = spa_pod_builder_pop(&b);
printf("supported formats:\n");
spa_debug_pod(params[0], SPA_DEBUG_FLAG_FORMAT);
pw_stream_add_listener(data->stream,
&data->stream_listener,
&stream_events,
data);
pw_stream_connect(data->stream,
PW_DIRECTION_INPUT,
data->path,
PW_STREAM_FLAG_AUTOCONNECT |
PW_STREAM_FLAG_INACTIVE |
PW_STREAM_FLAG_MAP_BUFFERS,
params, 1);
break;
spa_pod_builder_push_prop(b, data->type.format_video.format,
SPA_POD_PROP_FLAG_UNSET |
SPA_POD_PROP_RANGE_ENUM);
for (i = 0, c = 0; i < info.num_texture_formats; i++) {
uint32_t id = sdl_format_to_id(data, info.texture_formats[i]);
if (id == 0)
continue;
if (c++ == 0)
spa_pod_builder_id(b, id);
spa_pod_builder_id(b, id);
}
default:
printf("remote state: \"%s\"\n", pw_remote_state_as_string(state));
break;
for (i = 0; i < SPA_N_ELEMENTS(video_formats); i++) {
uint32_t id =
*SPA_MEMBER(&data->type.video_format, video_formats[i].id,
uint32_t);
if (id != data->type.video_format.UNKNOWN)
spa_pod_builder_id(b, id);
}
}
spa_pod_builder_pop(b);
spa_pod_builder_add(b,
":", data->type.format_video.size, "Rru", &SPA_RECTANGLE(WIDTH, HEIGHT),
SPA_POD_PROP_MIN_MAX(&SPA_RECTANGLE(1,1),
&SPA_RECTANGLE(info.max_texture_width,
info.max_texture_height)),
":", data->type.format_video.framerate, "Fru", &SPA_FRACTION(25,1),
SPA_POD_PROP_MIN_MAX(&SPA_RECTANGLE(0,1),
&SPA_RECTANGLE(30,1)),
NULL);
params[0] = spa_pod_builder_pop(b);
static const struct pw_remote_events remote_events = {
PW_VERSION_REMOTE_EVENTS,
.state_changed = on_state_changed,
};
printf("supported formats:\n");
spa_debug_pod(params[0], SPA_DEBUG_FLAG_FORMAT);
static void connect_state_changed(void *_data, enum pw_remote_state old,
enum pw_remote_state state, const char *error)
{
struct data *data = _data;
printf("remote state: \"%s\"\n", pw_remote_state_as_string(state));
switch (state) {
case PW_REMOTE_STATE_ERROR:
case PW_REMOTE_STATE_CONNECTED:
pw_main_loop_quit(data->loop);
break;
default:
break;
}
}
static int get_fd(struct data *data)
{
int fd;
struct pw_remote *remote = pw_remote_new(data->core, NULL, 0);
struct spa_hook remote_listener;
const struct pw_remote_events revents = {
PW_VERSION_REMOTE_EVENTS,
.state_changed = connect_state_changed,
};
pw_remote_add_listener(remote, &remote_listener, &revents, data);
if (pw_remote_connect(remote) < 0)
return -1;
pw_main_loop_run(data->loop);
fd = pw_remote_steal_fd(remote);
pw_remote_destroy(remote);
return fd;
return 0;
}
int main(int argc, char *argv[])
{
struct data data = { 0, };
const struct spa_pod *params[1];
uint8_t buffer[1024];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
pw_init(&argc, &argv);
data.loop = pw_main_loop_new(NULL);
data.core = pw_core_new(pw_main_loop_get_loop(data.loop), NULL);
data.stream = pw_stream_new_simple(
pw_main_loop_get_loop(data.loop),
"video-play",
pw_properties_new("pipewire.client.reuse", "1", NULL),
&stream_events,
&data);
data.remote = pw_stream_get_remote(data.stream);
data.core = pw_remote_get_core(data.remote);
data.t = pw_core_get_type(data.core);
data.remote = pw_remote_new(data.core, NULL, 0);
data.path = argc > 1 ? argv[1] : NULL;
init_type(&data.type, data.t->map);
spa_debug_set_type_map(data.t->map);
if (SDL_Init(SDL_INIT_VIDEO) < 0) {
@ -431,13 +361,19 @@ int main(int argc, char *argv[])
return -1;
}
pw_remote_add_listener(data.remote, &data.remote_listener, &remote_events, &data);
build_format(&data, &b, params);
pw_remote_connect_fd(data.remote, get_fd(&data));
pw_stream_connect(data.stream,
PW_DIRECTION_INPUT,
data.path,
PW_STREAM_FLAG_AUTOCONNECT |
PW_STREAM_FLAG_INACTIVE |
PW_STREAM_FLAG_MAP_BUFFERS,
params, 1);
pw_main_loop_run(data.loop);
pw_core_destroy(data.core);
pw_stream_destroy(data.stream);
pw_main_loop_destroy(data.loop);
return 0;

View file

@ -62,6 +62,13 @@ struct queue {
struct spa_ringbuffer ring;
};
struct data {
struct pw_core *core;
struct pw_remote *remote;
struct spa_hook remote_listener;
struct spa_hook stream_listener;
};
struct stream {
struct pw_stream this;
@ -104,8 +111,12 @@ struct stream {
int64_t last_ticks;
int32_t last_rate;
int64_t last_monotonic;
bool free_data;
struct data data;
};
static inline void push_queue(struct stream *stream, struct queue *queue, struct buffer *buffer)
{
uint32_t index;
@ -129,14 +140,14 @@ static inline struct buffer *pop_queue(struct stream *stream, struct queue *queu
return &stream->buffers[id];
}
static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state state, char *error)
static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state state, const char *error)
{
enum pw_stream_state old = stream->state;
bool res = old != state;
if (res) {
if (stream->error)
free(stream->error);
stream->error = error;
stream->error = error ? strdup(error) : NULL;
pw_log_debug("stream %p: update state from %s -> %s (%s)", stream,
pw_stream_state_as_string(old),
@ -575,34 +586,6 @@ static const struct spa_node impl_node = {
.port_reuse_buffer = impl_port_reuse_buffer,
};
#if 0
static void on_state_changed(void *_data, enum pw_remote_state old,
enum pw_remote_state state, const char *error)
{
struct stream *data = _data;
switch (state) {
case PW_REMOTE_STATE_ERROR:
printf("remote error: %s\n", error);
pw_main_loop_quit(data->loop);
break;
case PW_REMOTE_STATE_CONNECTED:
make_node(data);
break;
default:
printf("remote state: \"%s\"\n", pw_remote_state_as_string(state));
break;
}
}
static const struct pw_remote_events remote_events = {
PW_VERSION_REMOTE_EVENTS,
.state_changed = on_state_changed,
};
#endif
struct pw_stream * pw_stream_new(struct pw_remote *remote, const char *name,
struct pw_properties *props)
{
@ -655,36 +638,89 @@ struct pw_stream * pw_stream_new(struct pw_remote *remote, const char *name,
return NULL;
}
#if 0
struct pw_stream * pw_stream_new_simple(const char *name, struct pw_properties *props)
static int handle_connect(struct pw_stream *stream)
{
struct stream data = { 0, };
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
pw_init(&argc, &argv);
impl->node = pw_node_new(impl->core, "export-source",
pw_properties_copy(stream->properties), 0);
impl->impl_node = impl_node;
data.loop = pw_main_loop_new(NULL);
data.core = pw_core_new(pw_main_loop_get_loop(data.loop), NULL);
data.t = pw_core_get_type(data.core);
data.remote = pw_remote_new(data.core, NULL, 0);
data.path = argc > 1 ? argv[1] : NULL;
if (impl->direction == SPA_DIRECTION_INPUT)
impl->impl_node.process = impl_node_process_input;
else
impl->impl_node.process = impl_node_process_output;
spa_list_init(&data.empty);
init_type(&data.type, data.t->map);
reset_props(&data.props);
spa_debug_set_type_map(data.t->map);
pw_node_set_implementation(impl->node, &impl->impl_node);
pw_remote_add_listener(data.remote, &data.remote_listener, &remote_events, &data);
pw_remote_connect(data.remote);
pw_main_loop_run(data.loop);
pw_core_destroy(data.core);
pw_main_loop_destroy(data.loop);
pw_node_register(impl->node, NULL, NULL, NULL);
pw_node_set_active(impl->node, true);
pw_remote_export(stream->remote, impl->node);
return 0;
}
#endif
static void on_remote_state_changed(void *_data, enum pw_remote_state old,
enum pw_remote_state state, const char *error)
{
struct pw_stream *stream = _data;
switch (state) {
case PW_REMOTE_STATE_ERROR:
stream_set_state(stream, PW_STREAM_STATE_ERROR, error);
break;
case PW_REMOTE_STATE_UNCONNECTED:
stream_set_state(stream, PW_STREAM_STATE_UNCONNECTED, "remote unconnected");
break;
case PW_REMOTE_STATE_CONNECTED:
handle_connect(stream);
break;
default:
break;
}
}
static const struct pw_remote_events remote_events = {
PW_VERSION_REMOTE_EVENTS,
.state_changed = on_remote_state_changed,
};
struct pw_stream *
pw_stream_new_simple(struct pw_loop *loop,
const char *name,
struct pw_properties *props,
const struct pw_stream_events *events,
void *data)
{
struct pw_stream *stream;
struct stream *impl;
struct pw_core *core;
struct pw_remote *remote;
core = pw_core_new(loop, NULL);
remote = pw_remote_new(core, NULL, 0);
stream = pw_stream_new(remote, name, props);
if (stream == NULL)
goto cleanup;
impl = SPA_CONTAINER_OF(stream, struct stream, this);
impl->free_data = true;
impl->data.core = core;
impl->data.remote = remote;
pw_remote_add_listener(remote, &impl->data.remote_listener, &remote_events, stream);
pw_stream_add_listener(stream, &impl->data.stream_listener, events, data);
return stream;
cleanup:
pw_core_destroy(core);
return NULL;
}
const char *pw_stream_state_as_string(enum pw_stream_state state)
{
@ -774,6 +810,9 @@ void pw_stream_destroy(struct pw_stream *stream)
if (stream->name)
free(stream->name);
if (impl->free_data)
pw_core_destroy(impl->data.core);
free(impl);
}
@ -802,6 +841,11 @@ const struct pw_properties *pw_stream_get_properties(struct pw_stream *stream)
return stream->properties;
}
struct pw_remote *pw_stream_get_remote(struct pw_stream *stream)
{
return stream->remote;
}
int
pw_stream_connect(struct pw_stream *stream,
enum pw_direction direction,
@ -811,6 +855,7 @@ pw_stream_connect(struct pw_stream *stream,
uint32_t n_params)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
int res;
impl->direction =
direction == PW_DIRECTION_INPUT ? SPA_DIRECTION_INPUT : SPA_DIRECTION_OUTPUT;
@ -827,22 +872,12 @@ pw_stream_connect(struct pw_stream *stream,
if (flags & PW_STREAM_FLAG_AUTOCONNECT)
pw_properties_set(stream->properties, PW_NODE_PROP_AUTOCONNECT, "1");
impl->node = pw_node_new(impl->core, "export-source",
pw_properties_copy(stream->properties), 0);
impl->impl_node = impl_node;
if (impl->direction == SPA_DIRECTION_INPUT)
impl->impl_node.process = impl_node_process_input;
if (pw_remote_get_state(stream->remote, NULL) == PW_REMOTE_STATE_UNCONNECTED)
res = pw_remote_connect(stream->remote);
else
impl->impl_node.process = impl_node_process_output;
res = handle_connect(stream);
pw_node_set_implementation(impl->node, &impl->impl_node);
pw_node_register(impl->node, NULL, NULL, NULL);
pw_node_set_active(impl->node, true);
pw_remote_export(stream->remote, impl->node);
return 0;
return res;
}
uint32_t pw_stream_get_node_id(struct pw_stream *stream)

View file

@ -224,6 +224,13 @@ pw_stream_new(struct pw_remote *remote, /**< a \ref pw_remote */
const char *name, /**< a stream name */
struct pw_properties *props /**< stream properties, ownership is taken */);
struct pw_stream *
pw_stream_new_simple(struct pw_loop *loop, /**< a \ref pw_loop to use */
const char *name, /**< a stream name */
struct pw_properties *props,/**< stream properties, ownership is taken */
const struct pw_stream_events *events, /**< stream events */
void *data /**< data passed to events */);
/** Destroy a stream \memberof pw_stream */
void pw_stream_destroy(struct pw_stream *stream);
@ -236,6 +243,8 @@ enum pw_stream_state pw_stream_get_state(struct pw_stream *stream, const char **
const char *pw_stream_get_name(struct pw_stream *stream);
struct pw_remote *pw_stream_get_remote(struct pw_stream *stream);
/** Indicates that the stream is live, boolean default false */
#define PW_STREAM_PROP_IS_LIVE "pipewire.latency.is-live"
/** The minimum latency of the stream, int, default 0 */