work on separating port mixers

Make it possible to assign an arbitary node as the port mixer.
Also remove dynamically added ports.
Improve negotiation and allocation on the mixer ports
Add some more SSE optimisations
Move float mixer from the audio dsp to the port
Remove pw_node_get_free_port() and do things more explicitly.
Handle mixer ports in client-node
This commit is contained in:
Wim Taymans 2018-07-31 12:23:35 +02:00
parent f55cb422cb
commit ca898a00db
29 changed files with 2422 additions and 1504 deletions

View file

@ -297,7 +297,7 @@ static inline int spa_graph_node_impl_process(void *data, struct spa_graph_node
struct spa_node *n = data;
struct spa_graph_state *state = node->state;
spa_debug("node %p: process state %p: %d", node, state, state->status);
spa_debug("node %p: process state %p: %d, node %p", node, state, state->status, n);
if ((state->status = spa_node_process(n)) != SPA_STATUS_OK)
spa_graph_node_trigger(node);

View file

@ -57,6 +57,7 @@ struct spa_port_info {
#define SPA_PORT_INFO_FLAG_TERMINAL (1<<8) /**< data was not created from this port
* or will not be made available on another
* port */
#define SPA_PORT_INFO_FLAG_DYNAMIC_DATA (1<<9) /**< data pointer on buffers can be changed */
uint32_t flags; /**< port flags */
uint32_t rate; /**< rate of sequence numbers on port */
const struct spa_dict *props; /**< extra port properties */

View file

@ -774,6 +774,9 @@ static int impl_node_process(struct spa_node *node)
r = spa_node_process(this->nodes[i]);
spa_log_trace(this->log, NAME " %p: process %d %d", this, i, r);
if (r < 0)
return r;
if (r & SPA_STATUS_HAVE_BUFFER)
ready++;

View file

@ -0,0 +1,269 @@
/* Spa
* Copyright (C) 2018 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include <string.h>
#include <stdio.h>
#include <spa/utils/defs.h>
#include <xmmintrin.h>
static void
conv_s16_to_f32d_1_sse(void *data, int n_dst, void *dst[n_dst], const void *src, int n_bytes)
{
const int16_t *s = src;
float **d = (float **) dst;
int n, n_samples;
__m128 out, factor = _mm_set1_ps(1.0f / S16_SCALE);
n_samples = n_bytes / (sizeof(int16_t) * n_dst);
for(n = 0; n_samples--; n++) {
out = _mm_cvtsi32_ss(out, *s);
out = _mm_mul_ss(out, factor);
_mm_store_ss(&d[0][n], out);
s += n_dst;
}
}
static void
conv_s16_to_f32d_2_sse(void *data, int n_dst, void *dst[n_dst], const void *src, int n_bytes)
{
const int16_t *s = src;
float **d = (float **) dst;
int n, n_samples, unrolled;
__m128i in, t[2];
__m128 out[2], factor = _mm_set1_ps(1.0f / S16_SCALE);
n_samples = n_bytes / (sizeof(int16_t) * n_dst);
unrolled = n_samples / 4;
n_samples = n_samples & 3;
for(n = 0; unrolled--; n += 4) {
in = _mm_loadu_si128((__m128i*)s);
t[0] = _mm_slli_epi32(in, 16);
t[0] = _mm_srai_epi32(t[0], 16);
t[1] = _mm_srai_epi32(in, 16);
out[0] = _mm_cvtepi32_ps(t[0]);
out[0] = _mm_mul_ps(out[0], factor);
out[1] = _mm_cvtepi32_ps(t[1]);
out[1] = _mm_mul_ps(out[1], factor);
_mm_storeu_ps(&d[0][n], out[0]);
_mm_storeu_ps(&d[1][n], out[1]);
s += 4*n_dst;
}
for(; n_samples--; n++) {
out[0] = _mm_cvtsi32_ss(out[0], s[0]);
out[0] = _mm_mul_ss(out[0], factor);
out[1] = _mm_cvtsi32_ss(out[1], s[1]);
out[1] = _mm_mul_ss(out[1], factor);
_mm_store_ss(&d[0][n], out[0]);
_mm_store_ss(&d[1][n], out[1]);
s += n_dst;
}
}
static void
conv_s16_to_f32d_sse(void *data, int n_dst, void *dst[n_dst], int n_src, const void *src[n_src], int n_bytes)
{
const int16_t *s = src[0];
int i = 0;
for(; i + 1 < n_dst; i += 2)
conv_s16_to_f32d_2_sse(data, n_dst, &dst[i], &s[i], n_bytes);
for(; i < n_dst; i++)
conv_s16_to_f32d_1_sse(data, n_dst, &dst[i], &s[i], n_bytes);
}
static void
conv_f32d_to_s32_1_sse(void *data, void *dst, int n_src, const void *src[n_src], int n_bytes)
{
const float **s = (const float **) src;
int32_t *d = dst;
int n, n_samples, unrolled;
__m128 in[1];
__m128i out[4];
__m128 int_max = _mm_set1_ps(S24_MAX_F);
__m128 int_min = _mm_sub_ps(_mm_setzero_ps(), int_max);
n_samples = n_bytes / sizeof(float);
unrolled = n_samples / 4;
n_samples = n_samples & 3;
for(n = 0; unrolled--; n += 4) {
in[0] = _mm_mul_ps(_mm_loadu_ps(&s[0][n]), int_max);
in[0] = _mm_min_ps(int_max, _mm_max_ps(in[0], int_min));
out[0] = _mm_slli_epi32(_mm_cvttps_epi32(in[0]), 8);
out[1] = _mm_shuffle_epi32(out[0], _MM_SHUFFLE(0, 3, 2, 1));
out[2] = _mm_shuffle_epi32(out[0], _MM_SHUFFLE(1, 0, 3, 2));
out[3] = _mm_shuffle_epi32(out[0], _MM_SHUFFLE(2, 1, 0, 3));
*(d + 0*n_src) = _mm_cvtsi128_si32(out[0]);
*(d + 1*n_src) = _mm_cvtsi128_si32(out[1]);
*(d + 2*n_src) = _mm_cvtsi128_si32(out[2]);
*(d + 3*n_src) = _mm_cvtsi128_si32(out[3]);
d += 4*n_src;
}
for(; n_samples--; n++) {
in[0] = _mm_load_ss(&s[0][n]);
in[0] = _mm_mul_ss(in[0], int_max);
in[0] = _mm_min_ss(int_max, _mm_max_ss(in[0], int_min));
*d = _mm_cvttss_si32(in[0]) << 8;
d += n_src;
}
}
static void
conv_f32d_to_s32_2_sse(void *data, void *dst, int n_src, const void *src[n_src], int n_bytes)
{
const float **s = (const float **) src;
int32_t *d = dst;
int n, n_samples, unrolled;
__m128 in[2];
__m128i out[2], t[2];
__m128 int_max = _mm_set1_ps(S24_MAX_F);
__m128 int_min = _mm_sub_ps(_mm_setzero_ps(), int_max);
n_samples = n_bytes / sizeof(float);
unrolled = n_samples / 4;
n_samples = n_samples & 3;
for(n = 0; unrolled--; n += 4) {
in[0] = _mm_mul_ps(_mm_loadu_ps(&s[0][n]), int_max);
in[1] = _mm_mul_ps(_mm_loadu_ps(&s[1][n]), int_max);
in[0] = _mm_min_ps(int_max, _mm_max_ps(in[0], int_min));
in[1] = _mm_min_ps(int_max, _mm_max_ps(in[1], int_min));
out[0] = _mm_slli_epi32(_mm_cvttps_epi32(in[0]), 8);
out[1] = _mm_slli_epi32(_mm_cvttps_epi32(in[1]), 8);
t[0] = _mm_unpacklo_epi32(out[0], out[1]);
t[1] = _mm_shuffle_epi32(t[0], _MM_SHUFFLE(0, 0, 2, 2));
t[2] = _mm_unpackhi_epi32(out[0], out[1]);
t[3] = _mm_shuffle_epi32(t[2], _MM_SHUFFLE(0, 0, 2, 2));
_mm_storel_epi64((__m128i*)(d + 0*n_src), t[0]);
_mm_storel_epi64((__m128i*)(d + 1*n_src), t[1]);
_mm_storel_epi64((__m128i*)(d + 2*n_src), t[2]);
_mm_storel_epi64((__m128i*)(d + 3*n_src), t[3]);
d += 4*n_src;
}
for(; n_samples--; n++) {
in[0] = _mm_load_ss(&s[0][n]);
in[1] = _mm_load_ss(&s[1][n]);
in[0] = _mm_unpacklo_ps(in[0], in[1]);
in[0] = _mm_mul_ps(in[0], int_max);
in[0] = _mm_min_ps(int_max, _mm_max_ps(in[0], int_min));
out[0] = _mm_slli_epi32(_mm_cvttps_epi32(in[0]), 8);
_mm_storel_epi64((__m128i*)d, out[0]);
d += n_src;
}
}
static void
conv_f32d_to_s32_4_sse(void *data, void *dst, int n_src, const void *src[n_src], int n_bytes)
{
const float **s = (const float **) src;
int32_t *d = dst;
int n, n_samples, unrolled;
__m128 in[4];
__m128i out[4], t[4];
__m128 int_max = _mm_set1_ps(S24_MAX_F);
__m128 int_min = _mm_sub_ps(_mm_setzero_ps(), int_max);
n_samples = n_bytes / sizeof(float);
unrolled = n_samples / 4;
n_samples = n_samples & 3;
for(n = 0; unrolled--; n += 4) {
in[0] = _mm_mul_ps(_mm_loadu_ps(&s[0][n]), int_max);
in[1] = _mm_mul_ps(_mm_loadu_ps(&s[1][n]), int_max);
in[2] = _mm_mul_ps(_mm_loadu_ps(&s[2][n]), int_max);
in[3] = _mm_mul_ps(_mm_loadu_ps(&s[3][n]), int_max);
in[0] = _mm_min_ps(int_max, _mm_max_ps(in[0], int_min));
in[1] = _mm_min_ps(int_max, _mm_max_ps(in[1], int_min));
in[2] = _mm_min_ps(int_max, _mm_max_ps(in[2], int_min));
in[3] = _mm_min_ps(int_max, _mm_max_ps(in[3], int_min));
out[0] = _mm_slli_epi32(_mm_cvttps_epi32(in[0]), 8);
out[1] = _mm_slli_epi32(_mm_cvttps_epi32(in[1]), 8);
out[2] = _mm_slli_epi32(_mm_cvttps_epi32(in[2]), 8);
out[3] = _mm_slli_epi32(_mm_cvttps_epi32(in[3]), 8);
/* transpose */
t[0] = _mm_unpacklo_epi32(out[0], out[1]);
t[1] = _mm_unpacklo_epi32(out[2], out[3]);
t[2] = _mm_unpackhi_epi32(out[0], out[1]);
t[3] = _mm_unpackhi_epi32(out[2], out[3]);
out[0] = _mm_unpacklo_epi64(t[0], t[1]);
out[1] = _mm_unpackhi_epi64(t[0], t[1]);
out[2] = _mm_unpacklo_epi64(t[2], t[3]);
out[3] = _mm_unpackhi_epi64(t[2], t[3]);
_mm_storeu_si128((__m128i*)(d + 0), out[0]);
_mm_storeu_si128((__m128i*)(d + 4), out[1]);
_mm_storeu_si128((__m128i*)(d + 8), out[2]);
_mm_storeu_si128((__m128i*)(d + 12), out[3]);
d += 4*n_src;
}
for(; n_samples--; n++) {
in[0] = _mm_load_ss(&s[0][n]);
in[1] = _mm_load_ss(&s[1][n]);
in[2] = _mm_load_ss(&s[2][n]);
in[3] = _mm_load_ss(&s[3][n]);
in[0] = _mm_unpacklo_ps(in[0], in[2]);
in[1] = _mm_unpacklo_ps(in[1], in[3]);
in[0] = _mm_unpacklo_ps(in[0], in[1]);
in[0] = _mm_mul_ps(in[0], int_max);
in[0] = _mm_min_ps(int_max, _mm_max_ps(in[0], int_min));
out[0] = _mm_slli_epi32(_mm_cvttps_epi32(in[0]), 8);
_mm_storeu_si128((__m128i*)d, out[0]);
d += n_src;
}
}
static void
conv_f32d_to_s32_sse(void *data, int n_dst, void *dst[n_dst], int n_src, const void *src[n_src], int n_bytes)
{
int32_t *d = dst[0];
int i = 0;
for(; i + 3 < n_src; i += 4)
conv_f32d_to_s32_4_sse(data, &d[i], n_src, &src[i], n_bytes);
for(; i + 1 < n_src; i += 2)
conv_f32d_to_s32_2_sse(data, &d[i], n_src, &src[i], n_bytes);
for(; i < n_src; i++)
conv_f32d_to_s32_1_sse(data, &d[i], n_src, &src[i], n_bytes);
}

View file

@ -33,12 +33,17 @@
#define S24_MIN -8388607
#define S24_MAX 8388607
#define S24_MAX_F 8388607.0f
#define S24_SCALE 8388607
#define S32_MIN -2147483647
#define S32_MAX 2147483647
#define S32_SCALE 2147483647
#if defined (__SSE__)
#include "fmt-ops-sse.c"
#endif
static void
conv_copy(void *data, int n_dst, void *dst[n_dst], int n_src, const void *src[n_src], int n_bytes)
{
@ -430,6 +435,7 @@ conv_f32d_to_s32(void *data, int n_dst, void *dst[n_dst], int n_src, const void
}
}
#define F32_TO_S24(v) \
({ \
typeof(v) _v = (v); \
@ -662,6 +668,8 @@ typedef void (*convert_func_t) (void *data, int n_dst, void *dst[n_dst],
static const struct conv_info {
off_t src_fmt;
off_t dst_fmt;
#define FEATURE_SSE (1<<0)
uint32_t features;
convert_func_t i2i;
convert_func_t i2d;
@ -670,75 +678,86 @@ static const struct conv_info {
{
/* to f32 */
{ offsetof(struct spa_type_audio_format, U8),
offsetof(struct spa_type_audio_format, F32),
offsetof(struct spa_type_audio_format, F32), 0,
conv_u8_to_f32, conv_u8_to_f32d, conv_u8d_to_f32 },
#if defined (__SSE2__)
{ offsetof(struct spa_type_audio_format, S16),
offsetof(struct spa_type_audio_format, F32),
offsetof(struct spa_type_audio_format, F32), FEATURE_SSE,
conv_s16_to_f32, conv_s16_to_f32d_sse, conv_s16d_to_f32 },
#endif
{ offsetof(struct spa_type_audio_format, S16),
offsetof(struct spa_type_audio_format, F32), 0,
conv_s16_to_f32, conv_s16_to_f32d, conv_s16d_to_f32 },
{ offsetof(struct spa_type_audio_format, F32),
offsetof(struct spa_type_audio_format, F32),
offsetof(struct spa_type_audio_format, F32), 0,
conv_copy, deinterleave_32, interleave_32 },
{ offsetof(struct spa_type_audio_format, S32),
offsetof(struct spa_type_audio_format, F32),
offsetof(struct spa_type_audio_format, F32), 0,
conv_s32_to_f32, conv_s32_to_f32d, conv_s32d_to_f32 },
{ offsetof(struct spa_type_audio_format, S24),
offsetof(struct spa_type_audio_format, F32),
offsetof(struct spa_type_audio_format, F32), 0,
conv_s24_to_f32, conv_s24_to_f32d, conv_s24d_to_f32 },
{ offsetof(struct spa_type_audio_format, S24_32),
offsetof(struct spa_type_audio_format, F32),
offsetof(struct spa_type_audio_format, F32), 0,
conv_s24_32_to_f32, conv_s24_32_to_f32d, conv_s24_32d_to_f32 },
/* from f32 */
{ offsetof(struct spa_type_audio_format, F32),
offsetof(struct spa_type_audio_format, U8),
offsetof(struct spa_type_audio_format, U8), 0,
conv_f32_to_u8, conv_f32_to_u8d, conv_f32d_to_u8 },
{ offsetof(struct spa_type_audio_format, F32),
offsetof(struct spa_type_audio_format, S16),
offsetof(struct spa_type_audio_format, S16), 0,
conv_f32_to_s16, conv_f32_to_s16d, conv_f32d_to_s16 },
#if defined (__SSE2__)
{ offsetof(struct spa_type_audio_format, F32),
offsetof(struct spa_type_audio_format, S32),
offsetof(struct spa_type_audio_format, S32), FEATURE_SSE,
conv_f32_to_s32, conv_f32_to_s32d, conv_f32d_to_s32_sse },
#endif
{ offsetof(struct spa_type_audio_format, F32),
offsetof(struct spa_type_audio_format, S32), 0,
conv_f32_to_s32, conv_f32_to_s32d, conv_f32d_to_s32 },
{ offsetof(struct spa_type_audio_format, F32),
offsetof(struct spa_type_audio_format, S24),
offsetof(struct spa_type_audio_format, S24), 0,
conv_f32_to_s24, conv_f32_to_s24d, conv_f32d_to_s24 },
{ offsetof(struct spa_type_audio_format, F32),
offsetof(struct spa_type_audio_format, S24_32),
offsetof(struct spa_type_audio_format, S24_32), 0,
conv_f32_to_s24_32, conv_f32_to_s24_32d, conv_f32d_to_s24_32 },
/* u8 */
{ offsetof(struct spa_type_audio_format, U8),
offsetof(struct spa_type_audio_format, U8),
offsetof(struct spa_type_audio_format, U8), 0,
conv_copy, deinterleave_8, interleave_8 },
/* s16 */
{ offsetof(struct spa_type_audio_format, S16),
offsetof(struct spa_type_audio_format, S16),
offsetof(struct spa_type_audio_format, S16), 0,
conv_copy, deinterleave_16, interleave_16 },
/* s32 */
{ offsetof(struct spa_type_audio_format, S32),
offsetof(struct spa_type_audio_format, S32),
offsetof(struct spa_type_audio_format, S32), 0,
conv_copy, deinterleave_32, interleave_32 },
/* s24 */
{ offsetof(struct spa_type_audio_format, S24),
offsetof(struct spa_type_audio_format, S24),
offsetof(struct spa_type_audio_format, S24), 0,
conv_copy, deinterleave_24, interleave_24 },
/* s24_32 */
{ offsetof(struct spa_type_audio_format, S24_32),
offsetof(struct spa_type_audio_format, S24_32),
offsetof(struct spa_type_audio_format, S24_32), 0,
conv_copy, deinterleave_32, interleave_32 },
};
static const struct conv_info *find_conv_info(struct spa_type_audio_format *audio_format,
uint32_t src_fmt, uint32_t dst_fmt)
uint32_t src_fmt, uint32_t dst_fmt, uint32_t features)
{
int i;
for (i = 0; i < SPA_N_ELEMENTS(conv_table); i++) {
if (*SPA_MEMBER(audio_format, conv_table[i].src_fmt, uint32_t) == src_fmt &&
*SPA_MEMBER(audio_format, conv_table[i].dst_fmt, uint32_t) == dst_fmt)
*SPA_MEMBER(audio_format, conv_table[i].dst_fmt, uint32_t) == dst_fmt &&
(conv_table[i].features == 0 || (conv_table[i].features & features) != 0))
return &conv_table[i];
}
return NULL;

View file

@ -160,9 +160,9 @@ struct impl {
bool started;
const struct conv_info *conv[2];
convert_func_t convert;
float empty[4096];
};
#define CHECK_FREE_PORT(this,d,id) (id < MAX_PORTS && !GET_PORT(this,d,id)->valid)
@ -196,6 +196,7 @@ static int setup_convert(struct impl *this)
uint32_t src_fmt, dst_fmt;
struct type *t = &this->type;
struct format informat, outformat;
const struct conv_info *conv;
if (collect_format(this, SPA_DIRECTION_INPUT, &informat) < 0)
return -1;
@ -222,19 +223,21 @@ static int setup_convert(struct impl *this)
return -EINVAL;
/* find fast path */
this->conv[0] = find_conv_info(&t->audio_format, src_fmt, dst_fmt);
if (this->conv[0] != NULL) {
conv = find_conv_info(&t->audio_format, src_fmt, dst_fmt, FEATURE_SSE);
if (conv != NULL) {
spa_log_info(this->log, NAME " %p: got converter features %08x", this,
conv->features);
if (informat.format.info.raw.layout == SPA_AUDIO_LAYOUT_INTERLEAVED) {
if (outformat.format.info.raw.layout == SPA_AUDIO_LAYOUT_INTERLEAVED)
this->convert = this->conv[0]->i2i;
this->convert = conv->i2i;
else
this->convert = this->conv[0]->i2d;
this->convert = conv->i2d;
}
else {
if (outformat.format.info.raw.layout == SPA_AUDIO_LAYOUT_INTERLEAVED)
this->convert = this->conv[0]->d2i;
this->convert = conv->d2i;
else
this->convert = this->conv[0]->i2i;
this->convert = conv->i2i;
}
return 0;
}
@ -972,18 +975,15 @@ static int process_merge(struct impl *this)
for (i = 0; i < n_ins; i++) {
inport = GET_IN_PORT(this, i);
inio = inport->io;
if (inio == NULL)
if ((inio = inport->io) == NULL ||
inio->status != SPA_STATUS_HAVE_BUFFER ||
inio->buffer_id >= inport->n_buffers)
continue;
spa_log_trace(this->log, NAME " %p: %d %p %d %d %d", this, i,
inio, inio->status, inio->buffer_id, inport->stride);
if (inio->status != SPA_STATUS_HAVE_BUFFER)
continue;
if (inio->buffer_id >= inport->n_buffers)
continue;
inbuf = &inport->buffers[inio->buffer_id];
inb = inbuf->outbuf;
@ -1063,13 +1063,14 @@ static int process_split(struct impl *this)
outport = GET_OUT_PORT(this, i);
outio = outport->io;
if (outio == NULL)
continue;
goto empty;
spa_log_trace(this->log, NAME " %p: %d %p %d %d %d", this, i,
outio, outio->status, outio->buffer_id, outport->stride);
if (outio->status == SPA_STATUS_HAVE_BUFFER) {
res |= SPA_STATUS_HAVE_BUFFER;
continue;
goto empty;
}
if (outio->buffer_id < outport->n_buffers) {
@ -1077,8 +1078,12 @@ static int process_split(struct impl *this)
outio->buffer_id = SPA_ID_INVALID;
}
if ((outbuf = peek_buffer(this, outport)) == NULL)
return outio->status = -EPIPE;
if ((outbuf = peek_buffer(this, outport)) == NULL) {
outio->status = -EPIPE;
empty:
dst_datas[n_dst_datas++] = this->empty;
continue;
}
outb = outbuf->outbuf;

File diff suppressed because it is too large Load diff

View file

@ -2,6 +2,7 @@ audioconvert_sources = ['fmtconvert.c',
'channelmix.c',
'resample.c',
'splitter.c',
'merger.c',
'audioconvert.c',
'plugin.c']

View file

@ -26,6 +26,7 @@ extern const struct spa_handle_factory spa_fmtconvert_factory;
extern const struct spa_handle_factory spa_channelmix_factory;
extern const struct spa_handle_factory spa_resample_factory;
extern const struct spa_handle_factory spa_splitter_factory;
extern const struct spa_handle_factory spa_merger_factory;
int
spa_handle_factory_enum(const struct spa_handle_factory **factory, uint32_t *index)
@ -49,6 +50,9 @@ spa_handle_factory_enum(const struct spa_handle_factory **factory, uint32_t *ind
case 4:
*factory = &spa_splitter_factory;
break;
case 5:
*factory = &spa_merger_factory;
break;
default:
return 0;
}

View file

@ -89,8 +89,6 @@ struct port {
struct spa_io_control_range *ctrl;
struct spa_port_info info;
struct spa_dict info_props;
struct spa_dict_item info_props_items[2];
bool have_format;
@ -256,10 +254,6 @@ static int impl_node_add_port(struct spa_node *node, enum spa_direction directio
port->info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS |
SPA_PORT_INFO_FLAG_REMOVABLE;
port->info_props_items[0] = SPA_DICT_ITEM_INIT("port.dsp", "32 bit float mono audio");
port->info_props = SPA_DICT_INIT(port->info_props_items, 1);
port->info.props = &port->info_props;
this->port_count++;
if (this->last_port <= port_id)
this->last_port = port_id + 1;

View file

@ -943,71 +943,6 @@ static int mix_output(struct impl *this, size_t n_bytes)
return SPA_STATUS_HAVE_BUFFER;
}
#if 0
static int impl_node_process_input(struct spa_node *node)
{
struct impl *this;
uint32_t i;
struct port *outport;
size_t min_queued = SIZE_MAX;
struct spa_io_buffers *outio;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct impl, node);
outport = GET_OUT_PORT(this, 0);
outio = outport->io;
spa_return_val_if_fail(outio != NULL, -EIO);
spa_log_trace(this->log, NAME " %p: status %d", this, outio->status);
if (outio->status == SPA_STATUS_HAVE_BUFFER)
return SPA_STATUS_HAVE_BUFFER;
for (i = 0; i < this->last_port; i++) {
struct port *inport = GET_IN_PORT(this, i);
struct spa_io_buffers *inio;
if ((inio = inport->io) == NULL)
continue;
if (inport->queued_bytes == 0 &&
inio->status == SPA_STATUS_HAVE_BUFFER && inio->buffer_id < inport->n_buffers) {
struct buffer *b = &inport->buffers[inio->buffer_id];
struct spa_data *d = b->outbuf->datas;
if (!b->outstanding) {
spa_log_warn(this->log, NAME " %p: buffer %u in use", this,
inio->buffer_id);
inio->status = -EINVAL;
continue;
}
b->outstanding = false;
inio->buffer_id = SPA_ID_INVALID;
inio->status = SPA_STATUS_OK;
spa_list_append(&inport->queue, &b->link);
inport->queued_bytes = SPA_MIN(d[0].chunk->size, d[0].maxsize);
spa_log_trace(this->log, NAME " %p: queue buffer %d on port %d %zd %zd",
this, b->outbuf->id, i, inport->queued_bytes, min_queued);
}
if (inport->queued_bytes > 0 && inport->queued_bytes < min_queued)
min_queued = inport->queued_bytes;
}
if (min_queued != SIZE_MAX && min_queued > 0) {
outio->status = mix_output(this, min_queued);
} else {
outio->status = SPA_STATUS_NEED_BUFFER;
}
return outio->status;
}
#endif
static int impl_node_process(struct spa_node *node)
{
struct impl *this;
@ -1034,6 +969,7 @@ static int impl_node_process(struct spa_node *node)
recycle_buffer(this, outio->buffer_id);
outio->buffer_id = SPA_ID_INVALID;
}
/* produce more output if possible */
for (i = 0; i < this->last_port; i++) {
struct port *inport = GET_IN_PORT(this, i);

View file

@ -1,10 +1,10 @@
#load-module libpipewire-module-protocol-dbus
load-module libpipewire-module-rtkit
#load-module libpipewire-module-rtkit
load-module libpipewire-module-protocol-native
load-module libpipewire-module-suspend-on-idle
load-module libpipewire-module-spa-monitor alsa/libspa-alsa alsa-monitor alsa
load-module libpipewire-module-spa-monitor v4l2/libspa-v4l2 v4l2-monitor v4l2
#load-module libpipewire-module-spa-monitor bluez5/libspa-bluez5 bluez5-monitor bluez5
load-module libpipewire-module-spa-monitor bluez5/libspa-bluez5 bluez5-monitor bluez5
#load-module libpipewire-module-spa-node videotestsrc/libspa-videotestsrc videotestsrc videotestsrc Spa:POD:Object:Props:patternType=Spa:POD:Object:Props:patternType:snow
load-module libpipewire-module-client-node
load-module libpipewire-module-flatpak

View file

@ -89,6 +89,8 @@ struct data {
struct pw_node *node;
struct spa_port_info port_info;
struct spa_dict port_props;
struct spa_dict_item port_items[1];
struct spa_node impl_node;
const struct spa_node_callbacks *callbacks;
@ -180,7 +182,10 @@ static int impl_port_get_info(struct spa_node *node, enum spa_direction directio
d->port_info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
d->port_info.rate = 0;
d->port_info.props = NULL;
d->port_info.props = &d->port_props;
d->port_items[0] = SPA_DICT_ITEM_INIT("port.dsp", "32 bit float mono audio");
d->port_props = SPA_DICT_INIT(d->port_items, 1);
*info = &d->port_info;
@ -206,7 +211,9 @@ static int port_enum_formats(struct spa_node *node,
":", d->type.format_audio.format, "Ieu", d->type.audio_format.S16,
SPA_POD_PROP_ENUM(2, d->type.audio_format.S16,
d->type.audio_format.F32),
":", d->type.format_audio.layout, "i", SPA_AUDIO_LAYOUT_INTERLEAVED,
":", d->type.format_audio.layout, "ieu", SPA_AUDIO_LAYOUT_INTERLEAVED,
SPA_POD_PROP_ENUM(2, SPA_AUDIO_LAYOUT_INTERLEAVED,
SPA_AUDIO_LAYOUT_NON_INTERLEAVED),
":", d->type.format_audio.channels, "iru", 2,
SPA_POD_PROP_MIN_MAX(1, INT32_MAX),
":", d->type.format_audio.rate, "iru", 44100,
@ -539,7 +546,7 @@ static void make_node(struct data *data)
struct pw_properties *props;
props = pw_properties_new(PW_NODE_PROP_AUTOCONNECT, "1",
PW_NODE_PROP_EXCLUSIVE, "1",
PW_NODE_PROP_EXCLUSIVE, "0",
NULL);
if (data->path)
pw_properties_set(props, PW_NODE_PROP_TARGET_NODE, data->path);

View file

@ -485,7 +485,7 @@ static void make_nodes(struct data *data)
props,
SPA_ID_INVALID);
data->link = pw_link_new(data->core,
pw_node_get_free_port(data->v4l2, PW_DIRECTION_OUTPUT),
pw_node_find_port(data->v4l2, PW_DIRECTION_OUTPUT, 0),
pw_node_find_port(data->node, PW_DIRECTION_INPUT, 0),
NULL,
NULL,

View file

@ -75,6 +75,7 @@ pipewire_module_protocol_native = shared_library('pipewire-module-protocol-nativ
pipewire_module_audio_session = shared_library('pipewire-module-media-session',
[ 'module-media-session.c',
'module-media-session/audio-dsp.c',
'module-media-session/floatmix.c',
'spa/spa-node.c' ],
c_args : pipewire_module_c_args,
include_directories : [configinc, spa_inc],

View file

@ -47,19 +47,20 @@
#define MAX_BUFFERS 64
#define MAX_AREAS 1024
#define MAX_IO 32
#define MAX_MIX 128
#define CHECK_IN_PORT_ID(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) < MAX_INPUTS)
#define CHECK_OUT_PORT_ID(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) < MAX_OUTPUTS)
#define CHECK_PORT_ID(this,d,p) (CHECK_IN_PORT_ID(this,d,p) || CHECK_OUT_PORT_ID(this,d,p))
#define CHECK_FREE_IN_PORT(this,d,p) (CHECK_IN_PORT_ID(this,d,p) && !(this)->in_ports[p].valid)
#define CHECK_FREE_OUT_PORT(this,d,p) (CHECK_OUT_PORT_ID(this,d,p) && !(this)->out_ports[p].valid)
#define CHECK_FREE_IN_PORT(this,d,p) (CHECK_IN_PORT_ID(this,d,p) && (this)->in_ports[p] == NULL)
#define CHECK_FREE_OUT_PORT(this,d,p) (CHECK_OUT_PORT_ID(this,d,p) && (this)->out_ports[p] == NULL)
#define CHECK_FREE_PORT(this,d,p) (CHECK_FREE_IN_PORT (this,d,p) || CHECK_FREE_OUT_PORT (this,d,p))
#define CHECK_IN_PORT(this,d,p) (CHECK_IN_PORT_ID(this,d,p) && (this)->in_ports[p].valid)
#define CHECK_OUT_PORT(this,d,p) (CHECK_OUT_PORT_ID(this,d,p) && (this)->out_ports[p].valid)
#define CHECK_IN_PORT(this,d,p) (CHECK_IN_PORT_ID(this,d,p) && (this)->in_ports[p])
#define CHECK_OUT_PORT(this,d,p) (CHECK_OUT_PORT_ID(this,d,p) && (this)->out_ports[p])
#define CHECK_PORT(this,d,p) (CHECK_IN_PORT (this,d,p) || CHECK_OUT_PORT (this,d,p))
#define GET_IN_PORT(this,p) (&this->in_ports[p])
#define GET_OUT_PORT(this,p) (&this->out_ports[p])
#define GET_IN_PORT(this,p) (this->in_ports[p])
#define GET_OUT_PORT(this,p) (this->out_ports[p])
#define GET_PORT(this,d,p) (d == SPA_DIRECTION_INPUT ? GET_IN_PORT(this,p) : GET_OUT_PORT(this,p))
#define CHECK_PORT_BUFFER(this,b,p) (b < p->n_buffers)
@ -86,18 +87,32 @@ struct buffer {
struct spa_buffer buffer;
struct spa_meta metas[4];
struct spa_data datas[4];
bool outstanding;
uint32_t memid;
};
struct io {
uint32_t id;
uint32_t memid;
uint32_t mix_id;
};
struct mix {
bool valid;
bool active;
struct port *port;
uint32_t n_buffers;
struct buffer buffers[MAX_BUFFERS];
struct io ios[MAX_IO];
};
struct port {
bool valid;
struct pw_port *port;
struct impl *impl;
enum spa_direction direction;
uint32_t id;
struct spa_node node;
struct spa_port_info info;
struct pw_properties *properties;
@ -105,10 +120,7 @@ struct port {
uint32_t n_params;
struct spa_pod **params;
uint32_t n_buffers;
struct buffer buffers[MAX_BUFFERS];
struct io ios[MAX_IO];
struct mix mix[MAX_MIX+1];
};
struct node {
@ -132,8 +144,8 @@ struct node {
uint32_t n_inputs;
uint32_t max_outputs;
uint32_t n_outputs;
struct port in_ports[MAX_INPUTS];
struct port out_ports[MAX_OUTPUTS];
struct port *in_ports[MAX_INPUTS];
struct port *out_ports[MAX_OUTPUTS];
uint32_t n_params;
struct spa_pod **params;
@ -166,6 +178,7 @@ struct impl {
uint64_t start;
};
/** \endcond */
static struct mem *ensure_mem(struct impl *impl, int fd, uint32_t type, uint32_t flags)
@ -182,7 +195,6 @@ static struct mem *ensure_mem(struct impl *impl, int fd, uint32_t type, uint32_t
if (f == NULL) {
m = pw_array_add(&impl->mems, sizeof(struct mem));
m->id = pw_array_get_len(&impl->mems, struct mem) - 1;
m->ref = 0;
}
else {
m = f;
@ -190,6 +202,9 @@ static struct mem *ensure_mem(struct impl *impl, int fd, uint32_t type, uint32_t
m->fd = fd;
m->type = type;
m->flags = flags;
m->ref = 0;
pw_log_debug("client-node %p: add mem %d", impl, m->id);
pw_client_node_resource_add_mem(impl->node.resource,
m->id,
@ -198,9 +213,45 @@ static struct mem *ensure_mem(struct impl *impl, int fd, uint32_t type, uint32_t
m->flags);
found:
m->ref++;
pw_log_debug("client-node %p: mem %d, ref %d", impl, m->id, m->ref);
return m;
}
static struct mix *find_mix(struct port *p, uint32_t mix_id)
{
struct mix *mix;
if (mix_id == SPA_ID_INVALID)
return &p->mix[MAX_MIX];
if (mix_id >= MAX_MIX)
return NULL;
mix = &p->mix[mix_id];
return mix;
}
static void mix_init(struct mix *mix, struct port *p)
{
int i;
mix->valid = true;
mix->port = p;
mix->active = false;
mix->n_buffers = 0;
for (i = 0; i < MAX_IO; i++)
mix->ios[i].id = SPA_ID_INVALID;
}
static struct mix *ensure_mix(struct impl *impl, struct port *p, uint32_t mix_id)
{
struct mix *mix;
if ((mix = find_mix(p, mix_id)) == NULL)
return NULL;
if (mix->valid)
return mix;
mix_init(mix, p);
return mix;
}
static void clear_io(struct node *node, struct io *io)
{
struct mem *m;
@ -210,16 +261,16 @@ static void clear_io(struct node *node, struct io *io)
}
static struct io *update_io(struct impl *impl, struct port *port,
uint32_t mix_id, uint32_t id, uint32_t memid)
struct mix *mix, uint32_t id, uint32_t memid)
{
int i;
struct io *io, *f = NULL;
for (i = 0; i < MAX_IO; i++) {
io = &port->ios[i];
io = &mix->ios[i];
if (io->id == SPA_ID_INVALID)
f = io;
else if (io->id == id && io->mix_id == mix_id) {
else if (io->id == id) {
if (io->memid != memid) {
clear_io(&impl->node, io);
if (memid == SPA_ID_INVALID)
@ -234,31 +285,30 @@ static struct io *update_io(struct impl *impl, struct port *port,
io = f;
io->id = id;
io->memid = memid;
io->mix_id = mix_id;
found:
return io;
}
static void clear_ios(struct node *this, struct port *port)
static void clear_ios(struct node *this, struct mix *mix)
{
int i;
for (i = 0; i < MAX_IO; i++) {
struct io *io = &port->ios[i];
struct io *io = &mix->ios[i];
if (io->id != SPA_ID_INVALID)
clear_io(this, io);
}
}
static int clear_buffers(struct node *this, struct port *port)
static int clear_buffers(struct node *this, struct mix *mix)
{
uint32_t i, j;
struct impl *impl = this->impl;
struct pw_type *t = impl->t;
for (i = 0; i < port->n_buffers; i++) {
struct buffer *b = &port->buffers[i];
for (i = 0; i < mix->n_buffers; i++) {
struct buffer *b = &mix->buffers[i];
struct mem *m;
spa_log_debug(this->log, "node %p: clear buffer %d", this, i);
@ -273,15 +323,26 @@ static int clear_buffers(struct node *this, struct port *port)
id = SPA_PTR_TO_UINT32(b->buffer.datas[j].data);
m = pw_array_get_unchecked(&impl->mems, id, struct mem);
m->ref--;
pw_log_debug("client-node %p: mem %d, ref %d", impl, m->id, m->ref);
}
}
m = pw_array_get_unchecked(&impl->mems, b->memid, struct mem);
m->ref--;
pw_log_debug("client-node %p: mem %d, ref %d", impl, m->id, m->ref);
}
port->n_buffers = 0;
mix->n_buffers = 0;
return 0;
}
static void mix_clear(struct node *this, struct mix *mix)
{
if (!mix->valid)
return;
mix->valid = false;
clear_buffers(this, mix);
clear_ios(this, mix);
}
static int impl_node_enum_params(struct spa_node *node,
uint32_t id, uint32_t *index,
const struct spa_pod *filter,
@ -318,8 +379,7 @@ static int impl_node_set_param(struct spa_node *node, uint32_t id, uint32_t flag
{
struct node *this;
if (node == NULL)
return -EINVAL;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
@ -336,8 +396,8 @@ static int impl_node_send_command(struct spa_node *node, const struct spa_comman
struct node *this;
int res = 0;
if (node == NULL || command == NULL)
return -EINVAL;
spa_return_val_if_fail(node != NULL, -EINVAL);
spa_return_val_if_fail(command != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
@ -357,8 +417,7 @@ impl_node_set_callbacks(struct spa_node *node,
{
struct node *this;
if (node == NULL)
return -EINVAL;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
this->callbacks = callbacks;
@ -376,8 +435,7 @@ impl_node_get_n_ports(struct spa_node *node,
{
struct node *this;
if (node == NULL)
return -EINVAL;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
@ -403,20 +461,19 @@ impl_node_get_port_ids(struct spa_node *node,
struct node *this;
int c, i;
if (node == NULL)
return -EINVAL;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
if (input_ids) {
for (c = 0, i = 0; i < MAX_INPUTS && c < n_input_ids; i++) {
if (this->in_ports[i].valid)
if (this->in_ports[i])
input_ids[c++] = i;
}
}
if (output_ids) {
for (c = 0, i = 0; i < MAX_OUTPUTS && c < n_output_ids; i++) {
if (this->out_ports[i].valid)
if (this->out_ports[i])
output_ids[c++] = i;
}
}
@ -425,23 +482,19 @@ impl_node_get_port_ids(struct spa_node *node,
static void
do_update_port(struct node *this,
enum spa_direction direction,
uint32_t port_id,
struct port *port,
uint32_t change_mask,
uint32_t n_params,
const struct spa_pod **params,
const struct spa_port_info *info)
{
struct port *port;
struct pw_type *t = this->impl->t;
int i;
port = GET_PORT(this, direction, port_id);
if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_PARAMS) {
port->have_format = false;
spa_log_debug(this->log, "node %p: port %u update %d params", this, port_id, n_params);
spa_log_debug(this->log, "node %p: port %u update %d params", this, port->id, n_params);
for (i = 0; i < port->n_params; i++)
free(port->params[i]);
port->n_params = n_params;
@ -469,49 +522,35 @@ do_update_port(struct node *this,
}
}
}
if (!port->valid) {
spa_log_debug(this->log, "node %p: adding port %d", this, port_id);
port->have_format = false;
port->valid = true;
for (i = 0; i < MAX_IO; i++)
port->ios[i].id = SPA_ID_INVALID;
if (direction == SPA_DIRECTION_INPUT)
this->n_inputs++;
else
this->n_outputs++;
}
}
static void
clear_port(struct node *this,
struct port *port, enum spa_direction direction, uint32_t port_id)
clear_port(struct node *this, struct port *port)
{
do_update_port(this,
direction,
port_id,
int i;
spa_log_debug(this->log, "node %p: clear port %p", this, port);
if (port == NULL)
return ;
do_update_port(this, port,
PW_CLIENT_NODE_PORT_UPDATE_PARAMS |
PW_CLIENT_NODE_PORT_UPDATE_INFO, 0, NULL, NULL);
clear_buffers(this, port);
clear_ios(this, port);
}
static void do_uninit_port(struct node *this, enum spa_direction direction, uint32_t port_id)
{
struct port *port;
for (i = 0; i < MAX_MIX+1; i++) {
struct mix *mix = &port->mix[i];
mix_clear(this, mix);
}
spa_log_debug(this->log, "node %p: removing port %d", this, port_id);
if (direction == SPA_DIRECTION_INPUT) {
port = GET_IN_PORT(this, port_id);
if (port->direction == SPA_DIRECTION_INPUT) {
this->in_ports[port->id] = NULL;
this->n_inputs--;
} else {
port = GET_OUT_PORT(this, port_id);
}
else {
this->out_ports[port->id] = NULL;
this->n_outputs--;
}
clear_port(this, port, direction, port_id);
port->valid = false;
}
static int
@ -519,13 +558,10 @@ impl_node_add_port(struct spa_node *node, enum spa_direction direction, uint32_t
{
struct node *this;
if (node == NULL)
return -EINVAL;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
if (!CHECK_FREE_PORT(this, direction, port_id))
return -EINVAL;
spa_return_val_if_fail(CHECK_FREE_PORT(this, direction, port_id), -EINVAL);
pw_client_node_resource_add_port(this->resource,
this->seq,
@ -539,13 +575,10 @@ impl_node_remove_port(struct spa_node *node, enum spa_direction direction, uint3
{
struct node *this;
if (node == NULL)
return -EINVAL;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
if (!CHECK_PORT(this, direction, port_id))
return -EINVAL;
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
pw_client_node_resource_remove_port(this->resource,
this->seq,
@ -562,13 +595,12 @@ impl_node_port_get_info(struct spa_node *node,
struct node *this;
struct port *port;
if (node == NULL || info == NULL)
return -EINVAL;
spa_return_val_if_fail(node != NULL, -EINVAL);
spa_return_val_if_fail(info != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
if (!CHECK_PORT(this, direction, port_id))
return -EINVAL;
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
port = GET_PORT(this, direction, port_id);
*info = &port->info;
@ -597,6 +629,9 @@ impl_node_port_enum_params(struct spa_node *node,
port = GET_PORT(this, direction, port_id);
pw_log_debug("client-node %p: port %d.%d", this,
direction, port_id);
while (true) {
struct spa_pod *param;
@ -622,13 +657,13 @@ impl_node_port_set_param(struct spa_node *node,
{
struct node *this;
if (node == NULL)
return -EINVAL;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
if (!CHECK_PORT(this, direction, port_id))
return -EINVAL;
pw_log_debug(". %p %d %d", this, direction, port_id);
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
pw_log_debug(".");
if (this->resource == NULL)
return 0;
@ -644,7 +679,7 @@ impl_node_port_set_param(struct spa_node *node,
static int do_port_set_io(struct impl *impl,
enum spa_direction direction, uint32_t port_id,
uint32_t mix_port_id,
uint32_t mix_id,
uint32_t id, void *data, size_t size)
{
struct node *this = &impl->node;
@ -653,19 +688,22 @@ static int do_port_set_io(struct impl *impl,
struct mem *m;
uint32_t memid, mem_offset, mem_size;
struct port *port;
struct mix *mix;
pw_log_debug("client-node %p: %s port %d.%d set io %p %zd", impl,
direction == SPA_DIRECTION_INPUT ? "input" : "output",
port_id, mix_port_id, data, size);
port_id, mix_id, data, size);
if (!CHECK_PORT(this, direction, port_id))
return -EINVAL;
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
if (this->resource == NULL)
return 0;
port = GET_PORT(this, direction, port_id);
if ((mix = find_mix(port, mix_id)) == NULL || !mix->valid)
return -EINVAL;
if (data) {
if ((mem = pw_memblock_find(data)) == NULL)
return -EINVAL;
@ -684,12 +722,12 @@ static int do_port_set_io(struct impl *impl,
mem_offset = mem_size = 0;
}
update_io(impl, port, mix_port_id, id, memid);
update_io(impl, port, mix, id, memid);
pw_client_node_resource_port_set_io(this->resource,
this->seq,
direction, port_id,
mix_port_id,
mix_id,
id,
memid,
mem_offset, mem_size);
@ -710,38 +748,38 @@ impl_node_port_set_io(struct spa_node *node,
this = SPA_CONTAINER_OF(node, struct node, node);
impl = this->impl;
return do_port_set_io(impl, direction, port_id, 0, id, data, size);
return do_port_set_io(impl, direction, port_id, SPA_ID_INVALID, id, data, size);
}
static int
impl_node_port_use_buffers(struct spa_node *node,
enum spa_direction direction,
uint32_t port_id,
struct spa_buffer **buffers,
uint32_t n_buffers)
do_port_use_buffers(struct impl *impl,
enum spa_direction direction,
uint32_t port_id,
uint32_t mix_id,
struct spa_buffer **buffers,
uint32_t n_buffers)
{
struct node *this;
struct impl *impl;
struct port *port;
struct node *this = &impl->node;
struct port *p;
struct mix *mix;
uint32_t i, j;
struct pw_client_node_buffer *mb;
struct pw_type *t;
struct pw_type *t = impl->t;
this = SPA_CONTAINER_OF(node, struct node, node);
impl = this->impl;
spa_log_debug(this->log, "node %p: use buffers %p %u", this, buffers, n_buffers);
spa_log_debug(this->log, "client-node %p: %s port %d.%d use buffers %p %u", impl,
direction == SPA_DIRECTION_INPUT ? "input" : "output",
port_id, mix_id, buffers, n_buffers);
t = impl->t;
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
if (!CHECK_PORT(this, direction, port_id))
return -EINVAL;
port = GET_PORT(this, direction, port_id);
if (!port->have_format)
p = GET_PORT(this, direction, port_id);
if (!p->have_format)
return -EIO;
clear_buffers(this, port);
if ((mix = find_mix(p, mix_id)) == NULL || !mix->valid)
return -EINVAL;
clear_buffers(this, mix);
if (n_buffers > 0) {
mb = alloca(n_buffers * sizeof(struct pw_client_node_buffer));
@ -749,13 +787,13 @@ impl_node_port_use_buffers(struct spa_node *node,
mb = NULL;
}
port->n_buffers = n_buffers;
mix->n_buffers = n_buffers;
if (this->resource == NULL)
return 0;
for (i = 0; i < n_buffers; i++) {
struct buffer *b = &port->buffers[i];
struct buffer *b = &mix->buffers[i];
struct pw_memblock *mem;
struct mem *m;
size_t data_size, size;
@ -823,12 +861,31 @@ impl_node_port_use_buffers(struct spa_node *node,
pw_client_node_resource_port_use_buffers(this->resource,
this->seq,
direction, port_id, 0,
direction, port_id, mix_id,
n_buffers, mb);
return SPA_RESULT_RETURN_ASYNC(this->seq++);
}
static int
impl_node_port_use_buffers(struct spa_node *node,
enum spa_direction direction,
uint32_t port_id,
struct spa_buffer **buffers,
uint32_t n_buffers)
{
struct node *this;
struct impl *impl;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
impl = this->impl;
return do_port_use_buffers(impl, direction, port_id,
SPA_ID_INVALID, buffers, n_buffers);
}
static int
impl_node_port_alloc_buffers(struct spa_node *node,
enum spa_direction direction,
@ -841,19 +898,19 @@ impl_node_port_alloc_buffers(struct spa_node *node,
struct node *this;
struct port *port;
if (node == NULL || buffers == NULL)
return -EINVAL;
spa_return_val_if_fail(node != NULL, -EINVAL);
spa_return_val_if_fail(buffers != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
if (!CHECK_PORT(this, direction, port_id))
return -EINVAL;
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
port = GET_PORT(this, direction, port_id);
if (!port->have_format)
return -EIO;
spa_log_warn(this->log, "not supported");
return -ENOTSUP;
}
@ -862,10 +919,10 @@ impl_node_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t bu
{
struct node *this;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
if (!CHECK_OUT_PORT(this, SPA_DIRECTION_OUTPUT, port_id))
return -EINVAL;
spa_return_val_if_fail(CHECK_OUT_PORT(this, SPA_DIRECTION_OUTPUT, port_id), -EINVAL);
spa_log_trace(this->log, "reuse buffer %d", buffer_id);
@ -881,8 +938,8 @@ impl_node_port_send_command(struct spa_node *node,
struct impl *impl;
struct pw_type *t;
if (node == NULL || command == NULL)
return -EINVAL;
spa_return_val_if_fail(node != NULL, -EINVAL);
spa_return_val_if_fail(command != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
@ -980,6 +1037,7 @@ client_node_port_update(void *data,
{
struct impl *impl = data;
struct node *this = &impl->node;
struct port *port;
bool remove;
spa_log_debug(this->log, "node %p: got port update", this);
@ -988,16 +1046,37 @@ client_node_port_update(void *data,
remove = (change_mask == 0);
port = GET_PORT(this, direction, port_id);
if (remove) {
do_uninit_port(this, direction, port_id);
clear_port(this, port);
pw_node_update_ports(impl->this.node);
} else {
struct port dummy = { 0 }, *target;
if (port == NULL)
target = &dummy;
else
target = port;
do_update_port(this,
direction,
port_id,
target,
change_mask,
n_params, params, info);
if (port == NULL) {
if (direction == SPA_DIRECTION_INPUT) {
this->n_inputs++;
this->in_ports[port_id] = &dummy;
}
else {
this->n_outputs++;
this->out_ports[port_id] = &dummy;
}
pw_node_update_ports(impl->this.node);
}
}
pw_node_update_ports(impl->this.node);
}
static void client_node_set_active(void *data, bool active)
@ -1053,23 +1132,23 @@ static void node_on_data_fd_events(struct spa_source *source)
static const struct spa_node impl_node = {
SPA_VERSION_NODE,
NULL,
impl_node_enum_params,
impl_node_set_param,
impl_node_send_command,
impl_node_set_callbacks,
impl_node_get_n_ports,
impl_node_get_port_ids,
impl_node_add_port,
impl_node_remove_port,
impl_node_port_get_info,
impl_node_port_enum_params,
impl_node_port_set_param,
impl_node_port_use_buffers,
impl_node_port_alloc_buffers,
impl_node_port_set_io,
impl_node_port_reuse_buffer,
impl_node_port_send_command,
impl_node_process,
.enum_params = impl_node_enum_params,
.set_param = impl_node_set_param,
.send_command = impl_node_send_command,
.set_callbacks = impl_node_set_callbacks,
.get_n_ports = impl_node_get_n_ports,
.get_port_ids = impl_node_get_port_ids,
.add_port = impl_node_add_port,
.remove_port = impl_node_remove_port,
.port_get_info = impl_node_port_get_info,
.port_enum_params = impl_node_port_enum_params,
.port_set_param = impl_node_port_set_param,
.port_use_buffers = impl_node_port_use_buffers,
.port_alloc_buffers = impl_node_port_alloc_buffers,
.port_set_io = impl_node_port_set_io,
.port_reuse_buffer = impl_node_port_reuse_buffer,
.port_send_command = impl_node_port_send_command,
.process = impl_node_process,
};
static int
@ -1118,15 +1197,6 @@ static int node_clear(struct node *this)
free(this->params[i]);
free(this->params);
for (i = 0; i < MAX_INPUTS; i++) {
if (this->in_ports[i].valid)
clear_port(this, &this->in_ports[i], SPA_DIRECTION_INPUT, i);
}
for (i = 0; i < MAX_OUTPUTS; i++) {
if (this->out_ports[i].valid)
clear_port(this, &this->out_ports[i], SPA_DIRECTION_OUTPUT, i);
}
return 0;
}
@ -1249,7 +1319,12 @@ static void node_free(void *data)
static int port_init_mix(void *data, struct pw_port_mix *mix)
{
struct impl *impl = data;
struct port *port = data;
struct impl *impl = port->impl;
struct mix *m;
if ((m = ensure_mix(impl, port, mix->port.port_id)) == NULL)
return -ENOMEM;
mix->id = pw_map_insert_new(&impl->io_map, NULL);
@ -1266,12 +1341,19 @@ static int port_init_mix(void *data, struct pw_port_mix *mix)
static int port_release_mix(void *data, struct pw_port_mix *mix)
{
struct impl *impl = data;
struct port *port = data;
struct impl *impl = port->impl;
struct node *this = &impl->node;
struct mix *m;
pw_log_debug("client-node %p: remove mix io %d %p %p", impl, mix->id, mix->io,
impl->io_areas->ptr);
if ((m = find_mix(port, mix->port.port_id)) == NULL || !m->valid)
return -EINVAL;
pw_map_remove(&impl->io_map, mix->id);
mix_clear(this, m);
return 0;
}
@ -1281,16 +1363,58 @@ static const struct pw_port_implementation port_impl = {
.release_mix = port_release_mix,
};
static int mix_port_set_io(struct spa_node *node,
enum spa_direction direction, uint32_t port_id,
static int
impl_mix_add_port(struct spa_node *node, enum spa_direction direction, uint32_t mix_id)
{
struct port *port = SPA_CONTAINER_OF(node, struct port, node);
pw_log_debug("client-node %p: add port %d:%d.%d", node, direction, port->id, mix_id);
return 0;
}
static int
impl_mix_remove_port(struct spa_node *node, enum spa_direction direction, uint32_t mix_id)
{
struct port *port = SPA_CONTAINER_OF(node, struct port, node);
pw_log_debug("client-node %p: remove port %d:%d.%d", node, direction, port->id, mix_id);
return 0;
}
static int
impl_mix_port_use_buffers(struct spa_node *node,
enum spa_direction direction,
uint32_t mix_id,
struct spa_buffer **buffers,
uint32_t n_buffers)
{
struct port *port = SPA_CONTAINER_OF(node, struct port, node);
struct impl *impl = port->impl;
return do_port_use_buffers(impl, direction, port->id, mix_id, buffers, n_buffers);
}
static int
impl_mix_port_alloc_buffers(struct spa_node *node,
enum spa_direction direction,
uint32_t port_id,
struct spa_pod **params,
uint32_t n_params,
struct spa_buffer **buffers,
uint32_t *n_buffers)
{
return -ENOTSUP;
}
static int impl_mix_port_set_io(struct spa_node *node,
enum spa_direction direction, uint32_t mix_id,
uint32_t id, void *data, size_t size)
{
struct pw_port *p = SPA_CONTAINER_OF(node, struct pw_port, mix_node);
struct impl *impl = p->owner_data;
struct port *p = SPA_CONTAINER_OF(node, struct port, node);
struct pw_port *port = p->port;
struct impl *impl = port->owner_data;
struct pw_port_mix *mix;
struct pw_type *t = impl->t;
mix = pw_map_lookup(&p->mix_port_map, port_id);
mix = pw_map_lookup(&port->mix_port_map, mix_id);
if (mix == NULL)
return -EIO;
@ -1302,34 +1426,98 @@ static int mix_port_set_io(struct spa_node *node,
}
return do_port_set_io(impl,
direction, p->port_id, mix->port.port_id,
direction, port->port_id, mix->port.port_id,
id, data, size);
}
static int mix_port_process(struct spa_node *data)
static int impl_mix_process(struct spa_node *data)
{
return SPA_STATUS_HAVE_BUFFER;
}
static const struct spa_node impl_port_mix = {
SPA_VERSION_NODE,
NULL,
.enum_params = impl_node_enum_params,
.set_param = impl_node_set_param,
.send_command = impl_node_send_command,
.get_n_ports = impl_node_get_n_ports,
.get_port_ids = impl_node_get_port_ids,
.port_get_info = impl_node_port_get_info,
.port_enum_params = impl_node_port_enum_params,
.port_set_param = impl_node_port_set_param,
.add_port = impl_mix_add_port,
.remove_port = impl_mix_remove_port,
.port_use_buffers = impl_mix_port_use_buffers,
.port_alloc_buffers = impl_mix_port_alloc_buffers,
.port_set_io = impl_mix_port_set_io,
.port_reuse_buffer = impl_node_port_reuse_buffer,
.port_send_command = impl_node_port_send_command,
.process = impl_mix_process,
};
static void node_port_init(void *data, struct pw_port *port)
{
struct impl *impl = data;
struct port *p = pw_port_get_user_data(port), *dummy;
struct node *this = &impl->node;
pw_log_debug("client-node %p: port %p init", &impl->this, port);
if (port->direction == PW_DIRECTION_INPUT)
dummy = this->in_ports[port->port_id];
else
dummy = this->out_ports[port->port_id];
*p = *dummy;
p->port = port;
p->direction = port->direction;
p->id = port->port_id;
p->impl = impl;
p->node = impl_port_mix;
mix_init(&p->mix[MAX_MIX], p);
if (p->direction == SPA_DIRECTION_INPUT)
this->in_ports[p->id] = p;
else
this->out_ports[p->id] = p;
return;
}
static void node_port_added(void *data, struct pw_port *port)
{
struct impl *impl = data;
struct port *p = pw_port_get_user_data(port);
pw_log_debug("client-node %p: port %p added", &impl->this, port);
port->mix_node.port_set_io = mix_port_set_io;
port->mix_node.process = mix_port_process;
pw_port_set_mix(port, &p->node,
PW_PORT_MIX_FLAG_MULTI |
PW_PORT_MIX_FLAG_MIX_ONLY);
port->implementation = &port_impl;
port->implementation_data = impl;
port->implementation_data = p;
port->owner_data = impl;
}
static void node_port_removed(void *data, struct pw_port *port)
{
struct impl *impl = data;
struct node *this = &impl->node;
struct port *p = pw_port_get_user_data(port);
pw_log_debug("client-node %p: port %p remove", &impl->this, port);
clear_port(this, p);
}
static const struct pw_node_events node_events = {
PW_VERSION_NODE_EVENTS,
.free = node_free,
.initialized = node_initialized,
.port_init = node_port_init,
.port_added = node_port_added,
.port_removed = node_port_removed,
};
static const struct pw_resource_events resource_events = {
@ -1409,6 +1597,7 @@ struct pw_client_node *pw_client_node_new(struct pw_resource *resource,
impl);
impl->node.resource = this->resource;
this->node->port_user_data_size = sizeof(struct port);
pw_node_add_listener(this->node, &impl->node_listener, &node_events, impl);

View file

@ -92,6 +92,7 @@ struct impl {
struct spa_node *cnode;
struct spa_node *adapter;
struct spa_node *adapter_mix;
bool use_converter;
@ -266,10 +267,10 @@ impl_node_add_port(struct spa_node *node, enum spa_direction direction, uint32_t
if (direction != impl->direction)
return -EINVAL;
if ((res = spa_node_add_port(impl->adapter, direction, port_id)) < 0)
if ((res = spa_node_add_port(impl->adapter_mix, direction, port_id)) < 0)
return res;
if ((res = spa_node_port_set_io(impl->adapter,
if ((res = spa_node_port_set_io(impl->adapter_mix,
direction, port_id,
t->io.ControlRange,
&impl->ctrl,
@ -293,7 +294,7 @@ impl_node_remove_port(struct spa_node *node, enum spa_direction direction, uint3
if (direction != this->impl->direction)
return -EINVAL;
return spa_node_remove_port(impl->adapter, direction, port_id);
return spa_node_remove_port(impl->adapter_mix, direction, port_id);
}
static int
@ -370,7 +371,6 @@ static int debug_params(struct impl *impl, struct spa_node *node,
spa_log_error(this->log, " error: %s", spa_strerror(res));
break;
}
spa_debug_pod(format, flag);
}
@ -397,11 +397,11 @@ static int negotiate_format(struct impl *impl)
spa_log_debug(this->log, "%p: negiotiate", impl);
state = 0;
if ((res = spa_node_port_enum_params(impl->adapter,
if ((res = spa_node_port_enum_params(impl->adapter_mix,
SPA_DIRECTION_REVERSE(impl->direction), 0,
t->param.idEnumFormat, &state,
NULL, &format, &b)) <= 0) {
debug_params(impl, impl->adapter, SPA_DIRECTION_REVERSE(impl->direction), 0,
debug_params(impl, impl->adapter_mix, SPA_DIRECTION_REVERSE(impl->direction), 0,
t->param.idEnumFormat, NULL);
return -ENOTSUP;
}
@ -419,7 +419,7 @@ static int negotiate_format(struct impl *impl)
spa_pod_fixate(format);
spa_debug_pod(format, SPA_DEBUG_FLAG_FORMAT);
if ((res = spa_node_port_set_param(impl->adapter,
if ((res = spa_node_port_set_param(impl->adapter_mix,
SPA_DIRECTION_REVERSE(impl->direction), 0,
t->param.idFormat, 0,
format)) < 0)
@ -457,11 +457,11 @@ static int negotiate_buffers(struct impl *impl)
return 0;
state = 0;
if ((res = spa_node_port_enum_params(impl->adapter,
if ((res = spa_node_port_enum_params(impl->adapter_mix,
SPA_DIRECTION_REVERSE(impl->direction), 0,
t->param.idBuffers, &state,
param, &param, &b)) <= 0) {
debug_params(impl, impl->adapter, SPA_DIRECTION_REVERSE(impl->direction), 0,
debug_params(impl, impl->adapter_mix, SPA_DIRECTION_REVERSE(impl->direction), 0,
t->param.idBuffers, param);
return -ENOTSUP;
}
@ -484,7 +484,7 @@ static int negotiate_buffers(struct impl *impl)
impl->direction, 0,
&out_info)) < 0)
return res;
if ((res = spa_node_port_get_info(impl->adapter,
if ((res = spa_node_port_get_info(impl->adapter_mix,
SPA_DIRECTION_REVERSE(impl->direction), 0,
&in_info)) < 0)
return res;
@ -548,27 +548,27 @@ static int negotiate_buffers(struct impl *impl)
skel, impl->mem->ptr);
if (in_alloc) {
if ((res = spa_node_port_alloc_buffers(impl->adapter,
if ((res = spa_node_port_alloc_buffers(impl->adapter_mix,
SPA_DIRECTION_REVERSE(impl->direction), 0,
NULL, 0,
impl->buffers, &impl->n_buffers)) < 0)
return res;
}
else {
if ((res = spa_node_port_use_buffers(impl->adapter,
if ((res = spa_node_port_use_buffers(impl->adapter_mix,
SPA_DIRECTION_REVERSE(impl->direction), 0,
impl->buffers, impl->n_buffers)) < 0)
return res;
}
if (out_alloc) {
if ((res = spa_node_port_alloc_buffers(impl->cnode,
if ((res = spa_node_port_alloc_buffers(impl->client_port->mix,
impl->direction, 0,
NULL, 0,
impl->buffers, &impl->n_buffers)) < 0)
return res;
}
else {
if ((res = spa_node_port_use_buffers(impl->cnode,
if ((res = spa_node_port_use_buffers(impl->client_port->mix,
impl->direction, 0,
impl->buffers, impl->n_buffers)) < 0)
return res;
@ -618,16 +618,19 @@ impl_node_port_set_param(struct spa_node *node,
impl = this->impl;
t = impl->t;
pw_log_debug(".");
if (direction != impl->direction)
return -EINVAL;
if ((res = spa_node_port_set_param(impl->adapter, direction, port_id, id,
pw_log_debug(".");
if ((res = spa_node_port_set_param(impl->adapter_mix, direction, port_id, id,
flags, param)) < 0)
return res;
pw_log_debug(".");
if (id == t->param.idFormat && impl->use_converter) {
if (param == NULL) {
if ((res = spa_node_port_set_param(impl->adapter,
if ((res = spa_node_port_set_param(impl->adapter_mix,
SPA_DIRECTION_REVERSE(direction), 0, id,
0, NULL)) < 0)
return res;
@ -663,7 +666,7 @@ impl_node_port_set_io(struct spa_node *node,
if (direction != impl->direction)
return -EINVAL;
res = spa_node_port_set_io(impl->adapter, direction, port_id, id, data, size);
res = spa_node_port_set_io(impl->adapter_mix, direction, port_id, id, data, size);
if (id == t->io.Buffers && size >= sizeof(struct spa_io_buffers)) {
impl->io = data;
@ -691,7 +694,7 @@ impl_node_port_use_buffers(struct spa_node *node,
if (direction != impl->direction)
return -EINVAL;
if ((res = spa_node_port_use_buffers(impl->adapter,
if ((res = spa_node_port_use_buffers(impl->adapter_mix,
direction, port_id, buffers, n_buffers)) < 0)
return res;
@ -725,7 +728,7 @@ impl_node_port_alloc_buffers(struct spa_node *node,
if (direction != impl->direction)
return -EINVAL;
return spa_node_port_alloc_buffers(impl->adapter, direction, port_id,
return spa_node_port_alloc_buffers(impl->adapter_mix, direction, port_id,
params, n_params, buffers, n_buffers);
}
@ -909,7 +912,7 @@ static void client_node_initialized(void *data)
if ((res = pw_port_init_mix(impl->client_port, &impl->client_port_mix)) < 0)
return;
if ((res = spa_node_port_set_io(&impl->client_port->mix_node,
if ((res = spa_node_port_set_io(impl->client_port->mix,
impl->direction, 0,
t->io.Buffers,
impl->client_port_mix.io,
@ -924,6 +927,7 @@ static void client_node_initialized(void *data)
NULL, &format, &b)) <= 0) {
pw_log_warn("client-stream %p: no format given", &impl->this);
impl->adapter = impl->cnode;
impl->adapter_mix = impl->client_port->mix;
impl->use_converter = false;
return;
}
@ -948,11 +952,12 @@ static void client_node_initialized(void *data)
}
else {
impl->adapter = impl->cnode;
impl->adapter_mix = impl->client_port->mix;
impl->use_converter = false;
}
if (impl->use_converter) {
if ((res = spa_node_port_set_io(impl->adapter,
if ((res = spa_node_port_set_io(impl->adapter_mix,
SPA_DIRECTION_REVERSE(impl->direction), 0,
t->io.Buffers,
impl->client_port_mix.io,

View file

@ -102,7 +102,7 @@ static void *create_object(void *_data,
input_node = pw_global_get_object(global);
if (output_port_id == -1)
outport = pw_node_get_free_port(output_node, SPA_DIRECTION_OUTPUT);
outport = pw_node_find_port(output_node, SPA_DIRECTION_OUTPUT, SPA_ID_INVALID);
else {
global = pw_core_find_global(core, output_port_id);
if (global == NULL || pw_global_get_type(global) != t->port)
@ -114,7 +114,7 @@ static void *create_object(void *_data,
goto no_output_port;
if (input_port_id == -1)
inport = pw_node_get_free_port(input_node, SPA_DIRECTION_INPUT);
inport = pw_node_find_port(input_node, SPA_DIRECTION_INPUT, SPA_ID_INVALID);
else {
global = pw_core_find_global(core, input_port_id);
if (global == NULL || pw_global_get_type(global) != t->port)

View file

@ -303,11 +303,27 @@ static int on_peer_port(void *data, struct pw_port *port)
{
struct node_info *info = data;
struct pw_port *p;
enum pw_direction direction = pw_direction_reverse(port->direction);
int res;
p = pw_node_get_free_port(info->node, pw_direction_reverse(port->direction));
if (p == NULL)
return 0;
p = pw_node_find_port(info->node, direction, SPA_ID_INVALID);
if (p == NULL || pw_port_is_linked(p)) {
uint32_t port_id;
port_id = pw_node_get_free_port_id(info->node, direction);
if (port_id == SPA_ID_INVALID)
return 0;
p = pw_port_new(direction, port_id, NULL, 0);
if (p == NULL)
return -ENOMEM;
if ((res = pw_port_add(p, info->node)) < 0) {
pw_log_warn("can't add port: %s", spa_strerror(res));
return res;
}
}
return link_ports(info, p, port);
}
@ -734,7 +750,7 @@ static int on_global(void *data, struct pw_global *global)
else
return 0;
if ((node_port = pw_node_get_free_port(node, pw_direction_reverse(direction))) == NULL)
if ((node_port = pw_node_find_port(node, pw_direction_reverse(direction), SPA_ID_INVALID)) == NULL)
return 0;
sess = calloc(1, sizeof(struct session));
@ -765,7 +781,7 @@ static int on_global(void *data, struct pw_global *global)
if (dsp == NULL)
return 0;
if ((dsp_port = pw_node_get_free_port(dsp, direction)) == NULL)
if ((dsp_port = pw_node_find_port(dsp, direction, SPA_ID_INVALID)) == NULL)
return 0;
pw_node_add_listener(dsp, &sess->dsp_listener, &dsp_events, sess);

File diff suppressed because it is too large Load diff

View file

@ -55,12 +55,12 @@ struct impl {
void *user_data;
};
static void pw_spa_node_destroy(void *data)
static void pw_spa_node_free(void *data)
{
struct impl *impl = data;
struct pw_node *node = impl->this;
pw_log_debug("spa-node %p: destroy", node);
pw_log_debug("spa-node %p: free", node);
spa_hook_remove(&impl->node_listener);
if (impl->handle) {
@ -103,7 +103,7 @@ static void on_node_done(void *data, uint32_t seq, int res)
static const struct pw_node_events node_events = {
PW_VERSION_NODE_EVENTS,
.destroy = pw_spa_node_destroy,
.free = pw_spa_node_free,
.async_complete = on_node_done,
};

View file

@ -655,9 +655,9 @@ struct pw_port *pw_core_find_port(struct pw_core *core,
pw_log_debug("id \"%u\" matches node %p", id, n);
best =
pw_node_get_free_port(n,
pw_direction_reverse(other_port->
direction));
pw_node_find_port(n,
pw_direction_reverse(other_port->direction),
SPA_ID_INVALID);
if (best)
break;
}
@ -667,7 +667,9 @@ struct pw_port *pw_core_find_port(struct pw_core *core,
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, sizeof(buf));
struct spa_pod *dummy;
p = pw_node_get_free_port(n, pw_direction_reverse(other_port->direction));
p = pw_node_find_port(n,
pw_direction_reverse(other_port->direction),
SPA_ID_INVALID);
if (p == NULL)
continue;
@ -743,6 +745,8 @@ int pw_core_find_format(struct pw_core *core,
if (in_state > PW_PORT_STATE_CONFIGURE && input->node->info.state == PW_NODE_STATE_IDLE)
in_state = PW_PORT_STATE_CONFIGURE;
pw_log_debug("core %p: states %d %d", core, out_state, in_state);
if (in_state == PW_PORT_STATE_CONFIGURE && out_state > PW_PORT_STATE_CONFIGURE) {
/* only input needs format */
if ((res = spa_node_port_enum_params(output->node->node,
@ -757,7 +761,7 @@ int pw_core_find_format(struct pw_core *core,
pw_log_debug("Got output format:");
if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG))
spa_debug_pod(*format, SPA_DEBUG_FLAG_FORMAT);
} else if (out_state == PW_PORT_STATE_CONFIGURE && in_state > PW_PORT_STATE_CONFIGURE) {
} else if (out_state >= PW_PORT_STATE_CONFIGURE && in_state > PW_PORT_STATE_CONFIGURE) {
/* only output needs format */
if ((res = spa_node_port_enum_params(input->node->node,
input->direction, input->port_id,

View file

@ -70,8 +70,13 @@ static void pw_link_update_state(struct pw_link *link, enum pw_link_state state,
if (state != old) {
struct pw_node *in = link->input->node, *out = link->output->node;
pw_log_debug("link %p: update state %s -> %s (%s)", link,
pw_link_state_as_string(old), pw_link_state_as_string(state), error);
if (state == PW_LINK_STATE_ERROR) {
pw_log_error("link %p: update state %s -> error (%s)", link,
pw_link_state_as_string(old), error);
} else {
pw_log_debug("link %p: update state %s -> %s", link,
pw_link_state_as_string(old), pw_link_state_as_string(state));
}
link->state = state;
if (link->error)
@ -104,24 +109,32 @@ static void pw_link_update_state(struct pw_link *link, enum pw_link_state state,
static void complete_ready(void *obj, void *data, int res, uint32_t id)
{
struct pw_port *port = data;
struct pw_port_mix *mix = data;
struct pw_port *port = mix->p;
if (SPA_RESULT_IS_OK(res)) {
port->state = PW_PORT_STATE_READY;
mix->state = PW_PORT_STATE_READY;
pw_log_debug("port %p: state READY", port);
} else {
port->state = PW_PORT_STATE_ERROR;
mix->state = PW_PORT_STATE_ERROR;
pw_log_warn("port %p: failed to go to READY", port);
}
}
static void complete_paused(void *obj, void *data, int res, uint32_t id)
{
struct pw_port *port = data;
struct pw_port_mix *mix = data;
struct pw_port *port = mix->p;
if (SPA_RESULT_IS_OK(res)) {
port->state = PW_PORT_STATE_PAUSED;
mix->state = PW_PORT_STATE_PAUSED;
pw_log_debug("port %p: state PAUSED", port);
} else {
port->state = PW_PORT_STATE_ERROR;
mix->state = PW_PORT_STATE_ERROR;
pw_log_warn("port %p: failed to go to PAUSED", port);
}
}
@ -138,9 +151,15 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st
uint8_t buffer[4096];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
struct pw_type *t = &this->core->type;
uint32_t index = 0;
uint32_t index;
uint32_t in_mix_state, out_mix_state;
if (in_state != PW_PORT_STATE_CONFIGURE && out_state != PW_PORT_STATE_CONFIGURE)
in_mix_state = this->rt.in_mix.state;
out_mix_state = this->rt.out_mix.state;
pw_log_debug("%d %d", in_mix_state, out_mix_state);
if (in_mix_state != PW_PORT_STATE_CONFIGURE && out_mix_state != PW_PORT_STATE_CONFIGURE)
return 0;
pw_link_update_state(this, PW_LINK_STATE_NEGOTIATING, NULL);
@ -151,16 +170,20 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st
if ((res = pw_core_find_format(this->core, output, input, NULL, 0, NULL, &format, &b, &error)) < 0)
goto error;
in_state = input->state;
out_state = output->state;
format = pw_spa_pod_copy(format);
spa_pod_fixate(format);
spa_pod_builder_init(&b, buffer, sizeof(buffer));
if (out_state > PW_PORT_STATE_CONFIGURE && output->node->info.state == PW_NODE_STATE_IDLE) {
index = 0;
if ((res = spa_node_port_enum_params(output->node->node,
output->direction, output->port_id,
t->param.idFormat, &index,
NULL, &current, &b)) <= 0) {
format, &current, &b)) <= 0) {
if (res == 0)
res = -EBADF;
asprintf(&error, "error get output format: %s", spa_strerror(res));
@ -177,10 +200,11 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st
}
}
if (in_state > PW_PORT_STATE_CONFIGURE && input->node->info.state == PW_NODE_STATE_IDLE) {
index = 0;
if ((res = spa_node_port_enum_params(input->node->node,
input->direction, input->port_id,
t->param.idFormat, &index,
NULL, &current, &b)) <= 0) {
format, &current, &b)) <= 0) {
if (res == 0)
res = -EBADF;
asprintf(&error, "error get input format: %s", spa_strerror(res));
@ -201,9 +225,10 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st
if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG))
spa_debug_pod(format, SPA_DEBUG_FLAG_FORMAT);
if (out_state == PW_PORT_STATE_CONFIGURE) {
pw_log_debug("link %p: doing set format on output", this);
if (out_mix_state == PW_PORT_STATE_CONFIGURE) {
pw_log_debug("link %p: doing set format on output mix", this);
if ((res = pw_port_set_param(output,
this->rt.out_mix.port.port_id,
t->param.idFormat, SPA_NODE_PARAM_FLAG_NEAREST,
format)) < 0) {
asprintf(&error, "error set output format: %d", res);
@ -211,21 +236,25 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st
}
if (SPA_RESULT_IS_ASYNC(res))
pw_work_queue_add(impl->work, output->node, res, complete_ready,
output);
&this->rt.out_mix);
}
if (in_state == PW_PORT_STATE_CONFIGURE) {
pw_log_debug("link %p: doing set format on input", this);
if (in_mix_state == PW_PORT_STATE_CONFIGURE) {
pw_log_debug("link %p: doing set format on input mix", this);
if ((res2 = pw_port_set_param(input,
this->rt.in_mix.port.port_id,
t->param.idFormat, SPA_NODE_PARAM_FLAG_NEAREST,
format)) < 0) {
asprintf(&error, "error set input format: %d", res2);
goto error;
}
if (SPA_RESULT_IS_ASYNC(res2))
pw_work_queue_add(impl->work, input->node, res2, complete_ready, input);
if (SPA_RESULT_IS_ASYNC(res2)) {
pw_work_queue_add(impl->work, input->node, res2, complete_ready,
&this->rt.in_mix);
if (res == 0)
res = res2;
}
}
if (this->info.format)
free(this->info.format);
this->info.format = format;
@ -241,8 +270,8 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st
this->info.change_mask = 0;
}
return 0;
pw_log_debug("link %p: result %d", this, res);
return res;
error:
pw_link_update_state(this, PW_LINK_STATE_ERROR, error);
@ -505,8 +534,8 @@ static int port_set_io(struct pw_link *this, struct pw_port *port, void *data, s
pw_direction_as_string(port->direction),
port, port->port_id, p->port_id, data);
if (port->mix_node.port_set_io) {
if ((res = spa_node_port_set_io(&port->mix_node,
if (port->mix && port->mix->port_set_io) {
if ((res = spa_node_port_set_io(port->mix,
p->direction,
p->port_id,
t->io.Buffers,
@ -525,20 +554,20 @@ static int select_io(struct pw_link *this)
if (impl->have_io)
return 0;
io = this->rt.mix[SPA_DIRECTION_INPUT].io;
io = this->rt.in_mix.io;
if (io == NULL)
io = this->rt.mix[SPA_DIRECTION_OUTPUT].io;
io = this->rt.out_mix.io;
if (io == NULL)
io = &impl->io;
if (io == NULL)
return -EIO;
if ((res = port_set_io(this, this->input, io,
sizeof(struct spa_io_buffers), &this->rt.mix[SPA_DIRECTION_INPUT])) < 0)
sizeof(struct spa_io_buffers), &this->rt.in_mix)) < 0)
return res;
if ((res = port_set_io(this, this->output, io,
sizeof(struct spa_io_buffers), &this->rt.mix[SPA_DIRECTION_OUTPUT])) < 0)
sizeof(struct spa_io_buffers), &this->rt.out_mix)) < 0)
return res;
this->io = io;
@ -558,8 +587,14 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
struct pw_port *input, *output;
struct pw_type *t = &this->core->type;
struct allocation allocation;
uint32_t in_mix_state, out_mix_state;
if (in_state != PW_PORT_STATE_READY && out_state != PW_PORT_STATE_READY)
in_mix_state = this->rt.in_mix.state;
out_mix_state = this->rt.out_mix.state;
pw_log_debug("%d %d", in_mix_state, out_mix_state);
if (in_mix_state != PW_PORT_STATE_READY && out_mix_state != PW_PORT_STATE_READY)
return 0;
pw_link_update_state(this, PW_LINK_STATE_ALLOCATING, NULL);
@ -580,48 +615,15 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
input->node->live = true;
}
if (in_state == PW_PORT_STATE_READY && out_state == PW_PORT_STATE_READY) {
if ((out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) &&
(in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS)) {
out_flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS;
in_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
} else if ((out_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) &&
(in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS)) {
out_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
in_flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS;
} else if ((out_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) &&
(in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS)) {
out_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
in_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
} else if ((out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) &&
(in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS)) {
out_flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS;
in_flags = SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS;
} else {
asprintf(&error, "no common buffer alloc found");
res = -EIO;
goto error;
}
} else if (in_state == PW_PORT_STATE_READY && out_state > PW_PORT_STATE_READY) {
SPA_FLAG_SET(out_flags, SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS);
SPA_FLAG_UNSET(in_flags, SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS);
} else if (out_state == PW_PORT_STATE_READY && in_state > PW_PORT_STATE_READY) {
SPA_FLAG_SET(in_flags, SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS);
SPA_FLAG_UNSET(out_flags, SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS);
} else {
pw_log_debug("link %p: delay allocation, state %d %d", this, in_state, out_state);
return 0;
}
if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG)) {
spa_debug_port_info(oinfo);
spa_debug_port_info(iinfo);
}
if (output->allocation.n_buffers) {
out_flags = 0;
out_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
in_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
allocation = output->allocation;
move_allocation(&output->allocation, &allocation);
pw_log_debug("link %p: reusing %d output buffers %p", this,
allocation.n_buffers, allocation.buffers);
@ -629,7 +631,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
out_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
in_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
allocation = input->allocation;
move_allocation(&input->allocation, &allocation);
pw_log_debug("link %p: reusing %d input buffers %p", this,
allocation.n_buffers, allocation.buffers);
@ -707,7 +709,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
if (out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) {
if ((res = pw_port_alloc_buffers(output,
this->rt.mix[SPA_DIRECTION_OUTPUT].port.port_id,
this->rt.out_mix.port.port_id,
params, n_params,
allocation.buffers,
&allocation.n_buffers)) < 0) {
@ -715,7 +717,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
goto error;
}
if (SPA_RESULT_IS_ASYNC(res))
pw_work_queue_add(impl->work, output->node, res, complete_paused, output);
pw_work_queue_add(impl->work, output->node, res, complete_paused,
&this->rt.out_mix);
move_allocation(&allocation, &output->allocation);
@ -723,7 +726,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
allocation.n_buffers, allocation.buffers);
} else if (in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) {
if ((res = pw_port_alloc_buffers(input,
this->rt.mix[SPA_DIRECTION_INPUT].port.port_id,
this->rt.in_mix.port.port_id,
params, n_params,
allocation.buffers,
&allocation.n_buffers)) < 0) {
@ -731,7 +734,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
goto error;
}
if (SPA_RESULT_IS_ASYNC(res))
pw_work_queue_add(impl->work, input->node, res, complete_paused, input);
pw_work_queue_add(impl->work, input->node, res, complete_paused,
&this->rt.in_mix);
pw_log_debug("link %p: allocated %d buffers %p from input port", this,
allocation.n_buffers, allocation.buffers);
@ -742,7 +746,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
pw_log_debug("link %p: using %d buffers %p on output port", this,
allocation.n_buffers, allocation.buffers);
if ((res = pw_port_use_buffers(output,
this->rt.mix[SPA_DIRECTION_OUTPUT].port.port_id,
this->rt.out_mix.port.port_id,
allocation.buffers,
allocation.n_buffers)) < 0) {
asprintf(&error, "link %p: error use output buffers: %s", this,
@ -750,7 +754,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
goto error;
}
if (SPA_RESULT_IS_ASYNC(res))
pw_work_queue_add(impl->work, output->node, res, complete_paused, output);
pw_work_queue_add(impl->work, output->node, res, complete_paused,
&this->rt.out_mix);
move_allocation(&allocation, &output->allocation);
@ -759,7 +764,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
pw_log_debug("link %p: using %d buffers %p on input port", this,
allocation.n_buffers, allocation.buffers);
if ((res = pw_port_use_buffers(input,
this->rt.mix[SPA_DIRECTION_INPUT].port.port_id,
this->rt.in_mix.port.port_id,
allocation.buffers,
allocation.n_buffers)) < 0) {
asprintf(&error, "link %p: error use input buffers: %s", this,
@ -767,7 +772,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
goto error;
}
if (SPA_RESULT_IS_ASYNC(res))
pw_work_queue_add(impl->work, input->node, res, complete_paused, input);
pw_work_queue_add(impl->work, input->node, res, complete_paused,
&this->rt.in_mix);
} else {
asprintf(&error, "no common buffer alloc found");
@ -793,13 +799,16 @@ do_activate_link(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct pw_link *this = user_data;
struct spa_graph_port *in, *out;
pw_log_trace("link %p: activate", this);
spa_graph_port_add(&this->output->rt.mix_node, &this->rt.mix[SPA_DIRECTION_OUTPUT].port);
spa_graph_port_add(&this->input->rt.mix_node, &this->rt.mix[SPA_DIRECTION_INPUT].port);
spa_graph_port_link(&this->rt.mix[SPA_DIRECTION_OUTPUT].port,
&this->rt.mix[SPA_DIRECTION_INPUT].port);
out = &this->rt.out_mix.port;
in = &this->rt.in_mix.port;
spa_graph_port_add(&this->output->rt.mix_node, out);
spa_graph_port_add(&this->input->rt.mix_node, in);
spa_graph_port_link(out, in);
this->rt.link.signal_data = &this->input->node->rt.root;
spa_graph_link_add(&this->output->node->rt.root,
this->input->node->rt.root.state,
@ -832,6 +841,7 @@ static int check_states(struct pw_link *this, void *user_data, int res)
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
int in_state, out_state;
struct pw_port *input, *output;
int in_mix_state, out_mix_state;
if (this->state == PW_LINK_STATE_ERROR)
return -EIO;
@ -840,7 +850,7 @@ static int check_states(struct pw_link *this, void *user_data, int res)
output = this->output;
if (input == NULL || output == NULL)
return 0;
return -EIO;
if (input->node->info.state == PW_NODE_STATE_ERROR ||
output->node->info.state == PW_NODE_STATE_ERROR)
@ -856,7 +866,10 @@ static int check_states(struct pw_link *this, void *user_data, int res)
return -EIO;
}
if (in_state == PW_PORT_STATE_PAUSED && out_state == PW_PORT_STATE_PAUSED) {
in_mix_state = this->rt.in_mix.state;
out_mix_state = this->rt.out_mix.state;
if (in_mix_state == PW_PORT_STATE_PAUSED && out_mix_state == PW_PORT_STATE_PAUSED) {
pw_link_update_state(this, PW_LINK_STATE_PAUSED, NULL);
return 0;
}
@ -901,22 +914,28 @@ output_node_async_complete(void *data, uint32_t seq, int res)
static void clear_port_buffers(struct pw_link *link, struct pw_port *port)
{
int res;
struct pw_port_mix *mix;
pw_log_debug("%d %p", spa_list_is_empty(&port->links), port->allocation.mem);
// if (port->direction == PW_DIRECTION_OUTPUT && !spa_list_is_empty(&port->links))
/* we don't clear output buffers when the link goes away. They will get
* cleared when the node goes to suspend */
if (port->direction == PW_DIRECTION_OUTPUT)
return;
if ((res = pw_port_use_buffers(port,
link->rt.mix[port->direction].port.port_id, NULL, 0)) < 0)
if (port->direction == PW_DIRECTION_OUTPUT)
mix = &link->rt.out_mix;
else
mix = &link->rt.in_mix;
if ((res = pw_port_use_buffers(port, mix->port.port_id, NULL, 0)) < 0)
pw_log_warn("link %p: port %p clear error %s", link, port, spa_strerror(res));
}
static void input_remove(struct pw_link *this, struct pw_port *port)
{
struct impl *impl = (struct impl *) this;
struct pw_port_mix *mix = &this->rt.mix[SPA_DIRECTION_INPUT];
struct pw_port_mix *mix = &this->rt.in_mix;
pw_log_debug("link %p: remove input port %p", this, port);
spa_hook_remove(&impl->input_port_listener);
@ -935,7 +954,7 @@ static void input_remove(struct pw_link *this, struct pw_port *port)
static void output_remove(struct pw_link *this, struct pw_port *port)
{
struct impl *impl = (struct impl *) this;
struct pw_port_mix *mix = &this->rt.mix[SPA_DIRECTION_OUTPUT];
struct pw_port_mix *mix = &this->rt.out_mix;
pw_log_debug("link %p: remove output port %p", this, port);
spa_hook_remove(&impl->output_port_listener);
@ -1001,12 +1020,16 @@ do_deactivate_link(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct pw_link *this = user_data;
struct spa_graph_port *in, *out;
pw_log_trace("link %p: disable %p and %p", this, &this->rt.mix[0], &this->rt.mix[1]);
in = &this->rt.in_mix.port;
out = &this->rt.out_mix.port;
spa_graph_port_unlink(&this->rt.mix[SPA_DIRECTION_OUTPUT].port);
spa_graph_port_remove(&this->rt.mix[SPA_DIRECTION_OUTPUT].port);
spa_graph_port_remove(&this->rt.mix[SPA_DIRECTION_INPUT].port);
pw_log_trace("link %p: disable %p and %p", this, in, out);
spa_graph_port_unlink(out);
spa_graph_port_remove(out);
spa_graph_port_remove(in);
spa_graph_link_remove(&this->rt.link);
this->rt.link.signal_data = NULL;
@ -1231,14 +1254,14 @@ struct pw_link *pw_link_new(struct pw_core *core,
impl->io.buffer_id = SPA_ID_INVALID;
impl->io.status = SPA_STATUS_NEED_BUFFER;
pw_port_init_mix(output, &this->rt.mix[SPA_DIRECTION_OUTPUT]);
pw_port_init_mix(input, &this->rt.mix[SPA_DIRECTION_INPUT]);
pw_port_init_mix(output, &this->rt.out_mix);
pw_port_init_mix(input, &this->rt.in_mix);
this->rt.link.signal = spa_graph_link_signal_node;
pw_log_debug("link %p: constructed %p:%d.%d -> %p:%d.%d", impl,
output_node, output->port_id, this->rt.mix[SPA_DIRECTION_OUTPUT].port.port_id,
input_node, input->port_id, this->rt.mix[SPA_DIRECTION_INPUT].port.port_id);
output_node, output->port_id, this->rt.out_mix.port.port_id,
input_node, input->port_id, this->rt.in_mix.port.port_id);
find_driver(this);

View file

@ -61,41 +61,15 @@ pw_log_logv(enum spa_log_level level,
/** Check if a loglevel is enabled \memberof pw_log */
#define pw_log_level_enabled(lev) (pw_log_level >= (lev))
#if defined(__USE_ISOC11) || defined(__USE_ISOC99) || \
(defined(__STDC_VERSION__) && __STDC_VERSION__ >= 199901L)
#define pw_log_logc(lev,...) \
#define pw_log(lev,...) \
if (SPA_UNLIKELY(pw_log_level_enabled (lev))) \
pw_log_log(lev,__VA_ARGS__)
pw_log_log(lev,__FILE__,__LINE__,__func__,__VA_ARGS__)
#define pw_log_error(...) pw_log_logc(SPA_LOG_LEVEL_ERROR,__FILE__,__LINE__,__func__,__VA_ARGS__)
#define pw_log_warn(...) pw_log_logc(SPA_LOG_LEVEL_WARN,__FILE__,__LINE__,__func__,__VA_ARGS__)
#define pw_log_info(...) pw_log_logc(SPA_LOG_LEVEL_INFO,__FILE__,__LINE__,__func__,__VA_ARGS__)
#define pw_log_debug(...) pw_log_logc(SPA_LOG_LEVEL_DEBUG,__FILE__,__LINE__,__func__,__VA_ARGS__)
#define pw_log_trace(...) pw_log_logc(SPA_LOG_LEVEL_TRACE,__FILE__,__LINE__,__func__,__VA_ARGS__)
#else
#include <stdarg.h>
#define PW_LOG_FUNC(name,lev) \
static inline void pw_log_##name (const char *format, ...) \
{ \
if (SPA_UNLIKELY(pw_log_level_enabled(lev))) { \
va_list varargs; \
va_start(varargs, format); \
pw_log_logv(lev,__FILE__,__LINE__,__func__,format,varargs); \
va_end(varargs); \
} \
}
PW_LOG_FUNC(error, SPA_LOG_LEVEL_ERROR)
PW_LOG_FUNC(warn, SPA_LOG_LEVEL_WARN)
PW_LOG_FUNC(info, SPA_LOG_LEVEL_INFO)
PW_LOG_FUNC(debug, SPA_LOG_LEVEL_DEBUG)
PW_LOG_FUNC(trace, SPA_LOG_LEVEL_TRACE)
#endif
#define pw_log_error(...) pw_log(SPA_LOG_LEVEL_ERROR,__VA_ARGS__)
#define pw_log_warn(...) pw_log(SPA_LOG_LEVEL_WARN,__VA_ARGS__)
#define pw_log_info(...) pw_log(SPA_LOG_LEVEL_INFO,__VA_ARGS__)
#define pw_log_debug(...) pw_log(SPA_LOG_LEVEL_DEBUG,__VA_ARGS__)
#define pw_log_trace(...) pw_log(SPA_LOG_LEVEL_TRACE,__VA_ARGS__)
#ifdef __cplusplus
}

View file

@ -109,14 +109,14 @@ static int suspend_node(struct pw_node *this)
pw_log_debug("node %p: suspend node", this);
spa_list_for_each(p, &this->input_ports, link) {
if ((res = pw_port_set_param(p, this->core->type.param.idFormat, 0, NULL)) < 0)
if ((res = pw_port_set_param(p, SPA_ID_INVALID, this->core->type.param.idFormat, 0, NULL)) < 0)
pw_log_warn("error unset format input: %s", spa_strerror(res));
/* force CONFIGURE in case of async */
p->state = PW_PORT_STATE_CONFIGURE;
}
spa_list_for_each(p, &this->output_ports, link) {
if ((res = pw_port_set_param(p, this->core->type.param.idFormat, 0, NULL)) < 0)
if ((res = pw_port_set_param(p, SPA_ID_INVALID, this->core->type.param.idFormat, 0, NULL)) < 0)
pw_log_warn("error unset format output: %s", spa_strerror(res));
/* force CONFIGURE in case of async */
p->state = PW_PORT_STATE_CONFIGURE;
@ -159,7 +159,7 @@ static void update_port_map(struct pw_node *node, enum pw_direction direction,
pw_direction_as_string(direction), ids[n]);
if (port == NULL) {
if ((port = pw_port_new(direction, ids[n], NULL, 0))) {
if ((port = pw_port_new(direction, ids[n], NULL, node->port_user_data_size))) {
if ((res = pw_port_add(port, node)) < 0) {
pw_log_error("node %p: can't add port %p: %d, %s",
node, port, res, spa_strerror(res));
@ -370,6 +370,7 @@ int pw_node_register(struct pw_node *this,
int pw_node_initialized(struct pw_node *this)
{
pw_log_debug("node %p initialized", this);
spa_hook_list_call(&this->listener_list, struct pw_node_events, initialized);
pw_node_update_state(this, PW_NODE_STATE_SUSPENDED, NULL);
return 0;
@ -753,13 +754,9 @@ void pw_node_destroy(struct pw_node *node)
pw_log_debug("node %p: destroy ports", node);
spa_list_for_each_safe(port, tmpp, &node->input_ports, link) {
spa_hook_list_call(&node->listener_list, struct pw_node_events,
port_removed, port);
pw_port_destroy(port);
}
spa_list_for_each_safe(port, tmpp, &node->output_ports, link) {
spa_hook_list_call(&node->listener_list, struct pw_node_events,
port_removed, port);
pw_port_destroy(port);
}
@ -842,98 +839,69 @@ int pw_node_for_each_param(struct pw_node *node,
struct pw_port *
pw_node_find_port(struct pw_node *node, enum pw_direction direction, uint32_t port_id)
{
struct pw_port *port, *p;
struct pw_map *portmap;
struct spa_list *ports;
if (direction == PW_DIRECTION_INPUT)
if (direction == PW_DIRECTION_INPUT) {
portmap = &node->input_port_map;
else
ports = &node->input_ports;
} else {
portmap = &node->output_port_map;
ports = &node->output_ports;
}
return pw_map_lookup(portmap, port_id);
if (port_id != SPA_ID_INVALID)
port = pw_map_lookup(portmap, port_id);
else {
port = NULL;
/* try to find an unlinked port */
spa_list_for_each(p, ports, link) {
if (spa_list_is_empty(&p->links)) {
port = p;
break;
}
/* We can use this port if it can multiplex */
if (SPA_FLAG_CHECK(p->mix_flags, PW_PORT_MIX_FLAG_MULTI))
port = p;
}
}
pw_log_debug("node %p: return port %p", node, port);
return port;
}
uint32_t pw_node_get_free_port_id(struct pw_node *node, enum pw_direction direction)
{
if (direction == PW_DIRECTION_INPUT)
return pw_map_insert_new(&node->input_port_map, NULL);
else
return pw_map_insert_new(&node->output_port_map, NULL);
}
/**
* pw_node_get_free_port:
* \param node a \ref pw_node
* \param direction a \ref pw_direction
* \return the new port or NULL on error
*
* Find a new unused port in \a node with \a direction
*
* \memberof pw_node
*/
struct pw_port *pw_node_get_free_port(struct pw_node *node, enum pw_direction direction)
{
uint32_t n_ports, max_ports;
struct spa_list *ports;
struct pw_port *port = NULL, *p, *mixport = NULL;
struct pw_map *portmap;
int res;
uint32_t port_id;
if (direction == PW_DIRECTION_INPUT) {
max_ports = node->info.max_input_ports;
n_ports = node->info.n_input_ports;
ports = &node->input_ports;
portmap = &node->input_port_map;
} else {
max_ports = node->info.max_output_ports;
n_ports = node->info.n_output_ports;
ports = &node->output_ports;
portmap = &node->output_port_map;
}
pw_log_debug("node %p: direction %d %u %u",
node, direction, n_ports, max_ports);
pw_log_debug("node %p: direction %d max %u, n %u", node, direction, max_ports, n_ports);
if (n_ports >= max_ports)
goto no_mem;
/* first try to find an unlinked port */
spa_list_for_each(p, ports, link) {
if (spa_list_is_empty(&p->links)) {
port = p;
goto found;
}
/* for output we can reuse an existing port, for input only
* when there is a multiplex */
if (direction == PW_DIRECTION_OUTPUT || p->mix != NULL)
mixport = p;
}
port_id = pw_map_insert_new(portmap, NULL);
if (port_id == SPA_ID_INVALID)
goto no_mem;
/* no port, can we create one ? */
if (n_ports < max_ports) {
uint32_t port_id = pw_map_insert_new(portmap, NULL);
pw_log_debug("node %p: free port %d", node, port_id);
pw_log_debug("node %p: creating port direction %d %u", node, direction, port_id);
if ((res = spa_node_add_port(node->node, direction, port_id)) < 0) {
pw_log_error("node %p: could not add port %d %s", node, port_id,
spa_strerror(res));
goto no_mem;
}
port = pw_port_new(direction, port_id, NULL, 0);
if (port == NULL)
goto no_mem;
if ((res = pw_port_add(port, node)) < 0)
goto add_failed;
} else {
port = mixport;
}
found:
pw_log_debug("node %p: return port %p", node, port);
return port;
return port_id;
no_mem:
pw_log_error("node %p: can't create new port", node);
return NULL;
add_failed:
pw_log_error("node %p: can't add new port: %s", node, spa_strerror(res));
pw_port_destroy(port);
return NULL;
pw_log_warn("no more port available");
return SPA_ID_INVALID;
}
static void on_state_complete(struct pw_node *node, void *data, int res)
@ -1059,8 +1027,13 @@ void pw_node_update_state(struct pw_node *node, enum pw_node_state state, char *
if (old == state)
return;
pw_log_debug("node %p: update state from %s -> %s", node,
if (state == PW_NODE_STATE_ERROR) {
pw_log_error("node %p: update state from %s -> error (%s)", node,
pw_node_state_as_string(old), error);
} else {
pw_log_debug("node %p: update state from %s -> %s", node,
pw_node_state_as_string(old), pw_node_state_as_string(state));
}
if (node->info.error)
free((char*)node->info.error);

View file

@ -59,6 +59,8 @@ struct pw_node_events {
/** the node is initialized */
void (*initialized) (void *data);
/** a port is being initialized on the node */
void (*port_init) (void *data, struct pw_port *port);
/** a port was added */
void (*port_added) (void *data, struct pw_port *port);
/** a port was removed */
@ -171,17 +173,14 @@ int pw_node_for_each_param(struct pw_node *node,
struct spa_pod *param),
void *data);
/** Find the port with direction and port_id or NULL when not found */
/** Find the port with direction and port_id or NULL when not found. Passing
* SPA_ID_INVALID for port_id will return any port, preferably an unlinked one. */
struct pw_port *
pw_node_find_port(struct pw_node *node, enum pw_direction direction, uint32_t port_id);
/** Get a free unused port_id from the node */
uint32_t pw_node_get_free_port_id(struct pw_node *node, enum pw_direction direction);
/** Get a free unused port from the node, this can be an old unused existing port
* or a new port */
struct pw_port * pw_node_get_free_port(struct pw_node *node, enum pw_direction direction);
/** Set a node active. This will start negotiation with all linked active
* nodes and start data transport */
int pw_node_set_active(struct pw_node *node, bool active);

View file

@ -30,6 +30,7 @@
/** \cond */
struct impl {
struct pw_port this;
struct spa_node mix_node; /**< mix node implementation */
};
struct resource_data {
@ -40,10 +41,14 @@ struct resource_data {
/** \endcond */
static void port_update_state(struct pw_port *port, enum pw_port_state state)
static void port_update_state(struct pw_port *port, struct pw_port_mix *mix, enum pw_port_state state)
{
if (mix)
mix->state = state;
if (port->state != state) {
pw_log_debug("port %p: state %d -> %d", port, port->state, state);
pw_log(state == PW_PORT_STATE_ERROR ?
SPA_LOG_LEVEL_ERROR : SPA_LOG_LEVEL_DEBUG,
"port %p: state %d -> %d", port, port->state, state);
port->state = state;
spa_hook_list_call(&port->listener_list, struct pw_port_events, state_changed, state);
}
@ -51,7 +56,8 @@ static void port_update_state(struct pw_port *port, enum pw_port_state state)
static int tee_process(struct spa_node *data)
{
struct pw_port *this = SPA_CONTAINER_OF(data, struct pw_port, mix_node);
struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node);
struct pw_port *this = &impl->this;
struct spa_graph_node *node = &this->rt.mix_node;
struct spa_graph_port *p;
struct spa_io_buffers *io = &this->rt.io;
@ -71,7 +77,8 @@ static int tee_process(struct spa_node *data)
static int tee_reuse_buffer(struct spa_node *data, uint32_t port_id, uint32_t buffer_id)
{
struct pw_port *this = SPA_CONTAINER_OF(data, struct pw_port, mix_node);
struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node);
struct pw_port *this = &impl->this;
struct spa_graph_port *p = &this->rt.mix_port, *pp;
if ((pp = p->peer) != NULL) {
@ -90,7 +97,8 @@ static const struct spa_node schedule_tee_node = {
static int schedule_mix_input(struct spa_node *data)
{
struct pw_port *this = SPA_CONTAINER_OF(data, struct pw_port, mix_node);
struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node);
struct pw_port *this = &impl->this;
struct spa_graph_node *node = &this->rt.mix_node;
struct spa_graph_port *p;
struct spa_io_buffers *io = &this->rt.io;
@ -108,7 +116,8 @@ static int schedule_mix_input(struct spa_node *data)
static int schedule_mix_reuse_buffer(struct spa_node *data, uint32_t port_id, uint32_t buffer_id)
{
struct pw_port *this = SPA_CONTAINER_OF(data, struct pw_port, mix_node);
struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node);
struct pw_port *this = &impl->this;
struct spa_graph_node *node = &this->rt.mix_node;
struct spa_graph_port *p, *pp;
@ -130,15 +139,20 @@ static const struct spa_node schedule_mix_node = {
int pw_port_init_mix(struct pw_port *port, struct pw_port_mix *mix)
{
uint32_t id;
uint32_t port_id;
int res = 0;
const struct pw_port_implementation *pi = port->implementation;
id = pw_map_insert_new(&port->mix_port_map, mix);
port_id = pw_map_insert_new(&port->mix_port_map, mix);
spa_graph_port_init(&mix->port,
port->direction, id,
port->direction, port_id,
0);
mix->p = port;
mix->state = PW_PORT_STATE_CONFIGURE;
if (port->mix && port->mix->add_port)
port->mix->add_port(port->mix, port->direction, port_id);
if (pi && pi->init_mix)
res = pi->init_mix(port->implementation_data, mix);
@ -151,13 +165,18 @@ int pw_port_init_mix(struct pw_port *port, struct pw_port_mix *mix)
int pw_port_release_mix(struct pw_port *port, struct pw_port_mix *mix)
{
int res = 0;
uint32_t port_id = mix->port.port_id;
const struct pw_port_implementation *pi = port->implementation;
pw_map_remove(&port->mix_port_map, mix->port.port_id);
pw_map_remove(&port->mix_port_map, port_id);
if (pi && pi->release_mix)
res = pi->release_mix(port->implementation_data, mix);
if (port->mix && port->mix->remove_port) {
port->mix->remove_port(port->mix, port->direction, port_id);
}
pw_log_debug("port %p: release mix %d.%d", port,
port->port_id, mix->port.port_id);
@ -211,26 +230,36 @@ struct pw_port *pw_port_new(enum pw_direction direction,
spa_graph_node_init(&this->rt.mix_node, &this->rt.mix_state);
this->rt.mix_state.status = SPA_STATUS_NEED_BUFFER;
this->mix_node = this->direction == PW_DIRECTION_INPUT ?
impl->mix_node = this->direction == PW_DIRECTION_INPUT ?
schedule_mix_node :
schedule_tee_node;
spa_graph_node_set_callbacks(&this->rt.mix_node,
&spa_graph_node_impl_default, &this->mix_node);
pw_map_init(&this->mix_port_map, 64, 64);
pw_port_set_mix(this, &impl->mix_node, 0);
pw_map_init(&this->mix_port_map, 64, 64);
spa_graph_port_init(&this->rt.mix_port,
pw_direction_reverse(this->direction),
0, 0);
pw_direction_reverse(this->direction), 0,
0);
this->rt.io.status = SPA_STATUS_NEED_BUFFER;
return this;
no_mem:
pw_log_warn("port %p: new failed", impl);
free(impl);
return NULL;
}
int pw_port_set_mix(struct pw_port *port, struct spa_node *node, uint32_t flags)
{
pw_log_debug("port %p: mix node %p->%p", port, port->mix, node);
port->mix = node;
port->mix_flags = flags;
spa_graph_node_set_callbacks(&port->rt.mix_node,
&spa_graph_node_impl_default, port->mix);
return 0;
}
enum pw_direction pw_port_get_direction(struct pw_port *port)
{
return port->direction;
@ -428,16 +457,44 @@ int pw_port_add(struct pw_port *port, struct pw_node *node)
uint32_t port_id = port->port_id;
struct pw_core *core = node->core;
struct pw_type *t = &core->type;
struct spa_list *ports;
struct pw_map *portmap;
struct pw_port *find;
const char *str, *dir;
int res;
if (port->node != NULL)
return -EEXIST;
if (port->direction == PW_DIRECTION_INPUT) {
ports = &node->input_ports;
portmap = &node->input_port_map;
} else {
ports = &node->output_ports;
portmap = &node->output_port_map;
}
find = pw_map_lookup(portmap, port_id);
if (find != NULL)
return -EEXIST;
spa_hook_list_call(&node->listener_list, struct pw_node_events, port_init, port);
if ((res = spa_node_port_get_info(node->node,
port->direction, port_id,
&port->spa_info)) < 0) {
/* can't get port info, try to add it.. */
if ((res = spa_node_add_port(node->node, port->direction, port_id)) < 0)
goto add_failed;
port->to_remove = true;
/* try again */
if ((res = spa_node_port_get_info(node->node,
port->direction, port_id,
&port->spa_info)) < 0)
return res;
goto info_failed;
}
port->node = node;
@ -456,16 +513,15 @@ int pw_port_add(struct pw_port *port, struct pw_node *node)
if (SPA_FLAG_CHECK(port->spa_info->flags, SPA_PORT_INFO_FLAG_TERMINAL))
pw_properties_set(port->properties, "port.terminal", "1");
pw_log_debug("port %p: add to node %p %08x", port, node, port->spa_info->flags);
pw_log_debug("port %p: %d add to node %p %08x", port, port_id, node, port->spa_info->flags);
spa_list_append(ports, &port->link);
pw_map_insert_at(portmap, port_id, port);
if (port->direction == PW_DIRECTION_INPUT) {
spa_list_append(&node->input_ports, &port->link);
pw_map_insert_at(&node->input_port_map, port_id, port);
node->info.n_input_ports++;
node->info.change_mask |= PW_NODE_CHANGE_MASK_INPUT_PORTS;
}
else {
spa_list_append(&node->output_ports, &port->link);
pw_map_insert_at(&node->output_port_map, port_id, port);
} else {
node->info.n_output_ports++;
node->info.change_mask |= PW_NODE_CHANGE_MASK_OUTPUT_PORTS;
}
@ -479,6 +535,13 @@ int pw_port_add(struct pw_port *port, struct pw_node *node)
t->io.Buffers,
&port->rt.io, sizeof(port->rt.io));
if (port->mix && port->mix->port_set_io) {
spa_node_port_set_io(port->mix,
pw_direction_reverse(port->direction), 0,
t->io.Buffers,
&port->rt.io, sizeof(port->rt.io));
}
if (spa_node_port_set_io(node->node,
port->direction, port_id,
t->io.Clock,
@ -494,25 +557,31 @@ int pw_port_add(struct pw_port *port, struct pw_node *node)
pw_loop_invoke(node->data_loop, do_add_port, SPA_ID_INVALID, NULL, 0, false, port);
if (port->state <= PW_PORT_STATE_INIT)
port_update_state(port, PW_PORT_STATE_CONFIGURE);
port_update_state(port, NULL, PW_PORT_STATE_CONFIGURE);
spa_hook_list_call(&node->listener_list, struct pw_node_events, port_added, port);
return 0;
add_failed:
pw_log_error("node %p: could not add port %d %s", node, port_id,
spa_strerror(res));
return res;
info_failed:
pw_log_error("node %p: could not get port info %d %s", node, port_id,
spa_strerror(res));
return res;
}
static int do_destroy_link(void *data, struct pw_link *link)
{
pw_link_destroy(link);
return 0;
}
void pw_port_unlink(struct pw_port *port)
{
struct pw_link *l, *t;
if (port->direction == PW_DIRECTION_OUTPUT) {
spa_list_for_each_safe(l, t, &port->links, output_link)
pw_link_destroy(l);
}
else {
spa_list_for_each_safe(l, t, &port->links, input_link)
pw_link_destroy(l);
}
pw_port_for_each_link(port, do_destroy_link, port);
}
static int do_remove_port(struct spa_loop *loop,
@ -532,6 +601,7 @@ static int do_remove_port(struct spa_loop *loop,
static void pw_port_remove(struct pw_port *port)
{
struct pw_node *node = port->node;
int res;
if (node == NULL)
return;
@ -541,11 +611,15 @@ static void pw_port_remove(struct pw_port *port)
pw_loop_invoke(port->node->data_loop, do_remove_port,
SPA_ID_INVALID, NULL, 0, true, port);
if (port->to_remove) {
if ((res = spa_node_remove_port(node->node, port->direction, port->port_id)) < 0)
pw_log_warn("port %p: can't remove: %s", port, spa_strerror(res));
}
if (port->direction == PW_DIRECTION_INPUT) {
pw_map_remove(&node->input_port_map, port->port_id);
node->info.n_input_ports--;
}
else {
} else {
pw_map_remove(&node->output_port_map, port->port_id);
node->info.n_output_ports--;
}
@ -562,14 +636,14 @@ void pw_port_destroy(struct pw_port *port)
spa_hook_list_call(&port->listener_list, struct pw_port_events, destroy);
pw_port_remove(port);
pw_log_debug("port %p: control destroy", port);
spa_list_for_each_safe(control, ctemp, &port->control_list[0], port_link)
pw_control_destroy(control);
spa_list_for_each_safe(control, ctemp, &port->control_list[1], port_link)
pw_control_destroy(control);
pw_port_remove(port);
if (port->global) {
spa_hook_remove(&port->global_listener);
pw_global_destroy(port->global);
@ -678,26 +752,78 @@ int pw_port_for_each_filtered_param(struct pw_port *in_port,
return res;
}
int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags,
int pw_port_for_each_link(struct pw_port *port,
int (*callback) (void *data, struct pw_link *link),
void *data)
{
struct pw_link *l, *t;
int res = 0;
if (port->direction == PW_DIRECTION_OUTPUT) {
spa_list_for_each_safe(l, t, &port->links, output_link)
if ((res = callback(data, l)) != 0)
break;
} else {
spa_list_for_each_safe(l, t, &port->links, input_link)
if ((res = callback(data, l)) != 0)
break;
}
return res;
}
int pw_port_is_linked(struct pw_port *port)
{
return spa_list_is_empty(&port->links) ? 0 : 1;
}
int pw_port_set_param(struct pw_port *port, uint32_t mix_id, uint32_t id, uint32_t flags,
const struct spa_pod *param)
{
int res;
int res = 0;
struct pw_node *node = port->node;
struct pw_core *core = node->core;
struct pw_type *t = &core->type;
struct pw_port_mix *mix = NULL;
if (mix_id != SPA_ID_INVALID)
mix = pw_map_lookup(&port->mix_port_map, mix_id);
if (mix != NULL && port->mix != NULL && port->mix->port_set_param != NULL) {
struct spa_graph_port *p = &mix->port;
res = spa_node_port_set_param(port->mix,
p->direction, p->port_id,
id, flags, param);
pw_log_debug("port %p: %d set param on mix %d:%d.%d %s: %d (%s)", port, port->state,
port->direction, port->port_id, p->port_id,
spa_type_map_get_type(t->map, id), res, spa_strerror(res));
if (port->state == PW_PORT_STATE_CONFIGURE) {
spa_node_port_set_param(port->mix,
pw_direction_reverse(p->direction), 0,
id, flags, param);
}
}
if (port->state == PW_PORT_STATE_CONFIGURE || param == NULL) {
res = spa_node_port_set_param(node->node, port->direction, port->port_id, id, flags, param);
pw_log_debug("port %p: %d set param on node %d:%d %s: %d (%s)", port, port->state,
port->direction, port->port_id,
spa_type_map_get_type(t->map, id), res, spa_strerror(res));
}
res = spa_node_port_set_param(node->node, port->direction, port->port_id, id, flags, param);
pw_log_debug("port %p: set param %s: %d (%s)", port,
spa_type_map_get_type(t->map, id), res, spa_strerror(res));
if (id == t->param.idFormat) {
if (param == NULL || res < 0) {
free_allocation(&port->allocation);
port->allocated = false;
port_update_state (port, PW_PORT_STATE_CONFIGURE);
port_update_state (port, mix, PW_PORT_STATE_CONFIGURE);
}
else if (!SPA_RESULT_IS_ASYNC(res)) {
port_update_state (port, PW_PORT_STATE_READY);
if (port->state == PW_PORT_STATE_CONFIGURE)
port_update_state (port, mix, PW_PORT_STATE_READY);
else if (mix)
mix->state = PW_PORT_STATE_READY;
}
}
return res;
@ -706,12 +832,13 @@ int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags,
int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id,
struct spa_buffer **buffers, uint32_t n_buffers)
{
int res;
int res = 0;
struct pw_node *node = port->node;
struct pw_port_mix *mix;
struct spa_graph_port *p;
const struct pw_port_implementation *pi = port->implementation;
pw_log_debug("port %p: %d.%d: %d buffers ", port, port->port_id, mix_id, n_buffers);
pw_log_debug("port %p: %d:%d.%d: %d buffers %d", port,
port->direction, port->port_id, mix_id, n_buffers, port->state);
if (n_buffers == 0 && port->state <= PW_PORT_STATE_READY)
return 0;
@ -722,32 +849,38 @@ int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id,
if ((mix = pw_map_lookup(&port->mix_port_map, mix_id)) == NULL)
return -EIO;
p = &mix->port;
if (port->mix_node.port_use_buffers)
res = spa_node_port_use_buffers(&port->mix_node, p->direction, p->port_id, buffers, n_buffers);
else
res = spa_node_port_use_buffers(node->node, port->direction, port->port_id, buffers, n_buffers);
pw_log_debug("port %p: %d.%d: use %d buffers: %d (%s)", port,
port->port_id, mix_id, n_buffers, res, spa_strerror(res));
port->allocated = false;
free_allocation(&port->allocation);
if (res < 0) {
n_buffers = 0;
buffers = NULL;
if (port->mix != NULL && port->mix->port_use_buffers != NULL) {
struct spa_graph_port *p = &mix->port;
res = spa_node_port_use_buffers(port->mix,
p->direction, p->port_id, buffers, n_buffers);
pw_log_debug("port %p: use buffers on mix: %d (%s)",
port, res, spa_strerror(res));
}
mix->n_buffers = n_buffers;
mix->buffers = buffers;
if (port->state == PW_PORT_STATE_READY) {
if (!SPA_FLAG_CHECK(port->mix_flags, PW_PORT_MIX_FLAG_MIX_ONLY)) {
res = spa_node_port_use_buffers(node->node,
port->direction, port->port_id, buffers, n_buffers);
pw_log_debug("port %p: use buffers on node: %d (%s)",
port, res, spa_strerror(res));
}
port->allocated = false;
free_allocation(&port->allocation);
if (pi && pi->use_buffers)
res = pi->use_buffers(port->implementation_data, buffers, n_buffers);
}
if (res < 0)
n_buffers = 0;
if (n_buffers == 0)
port_update_state (port, PW_PORT_STATE_READY);
else if (!SPA_RESULT_IS_ASYNC(res))
port_update_state (port, PW_PORT_STATE_PAUSED);
port_update_state (port, mix, PW_PORT_STATE_READY);
else if (!SPA_RESULT_IS_ASYNC(res)) {
if (port->state == PW_PORT_STATE_READY)
port_update_state (port, mix, PW_PORT_STATE_PAUSED);
else if (mix)
mix->state = PW_PORT_STATE_PAUSED;
}
return res;
}
@ -758,7 +891,7 @@ int pw_port_alloc_buffers(struct pw_port *port, uint32_t mix_id,
int res;
struct pw_node *node = port->node;
struct pw_port_mix *mix;
struct spa_graph_port *p;
const struct pw_port_implementation *pi = port->implementation;
if (port->state < PW_PORT_STATE_READY)
return -EIO;
@ -766,16 +899,19 @@ int pw_port_alloc_buffers(struct pw_port *port, uint32_t mix_id,
if ((mix = pw_map_lookup(&port->mix_port_map, mix_id)) == NULL)
return -EIO;
p = &mix->port;
if (port->mix_node.port_use_buffers)
res = spa_node_port_alloc_buffers(&port->mix_node, p->direction, p->port_id,
if (port->mix && port->mix->port_use_buffers) {
struct spa_graph_port *p = &mix->port;
res = spa_node_port_alloc_buffers(port->mix, p->direction, p->port_id,
params, n_params,
buffers, n_buffers);
else
} else {
res = spa_node_port_alloc_buffers(node->node, port->direction, port->port_id,
params, n_params,
buffers, n_buffers);
}
if (pi && pi->alloc_buffers)
res = pi->alloc_buffers(port->implementation_data, params, n_params, buffers, n_buffers);
pw_log_debug("port %p: %d.%d alloc %d buffers: %d (%s)", port,
port->port_id, mix_id, *n_buffers, res, spa_strerror(res));
@ -784,19 +920,15 @@ int pw_port_alloc_buffers(struct pw_port *port, uint32_t mix_id,
if (res < 0) {
*n_buffers = 0;
buffers = NULL;
port->allocated = false;
}
else {
} else {
port->allocated = true;
}
mix->n_buffers = *n_buffers;
mix->buffers = buffers;
if (*n_buffers == 0)
port_update_state (port, PW_PORT_STATE_READY);
port_update_state (port, mix, PW_PORT_STATE_READY);
else if (!SPA_RESULT_IS_ASYNC(res))
port_update_state (port, PW_PORT_STATE_PAUSED);
port_update_state (port, mix, PW_PORT_STATE_PAUSED);
return res;
}

View file

@ -273,6 +273,7 @@ struct pw_node {
struct spa_node *node; /**< SPA node implementation */
struct spa_list resource_list; /**< list of resources for this node */
uint32_t port_user_data_size; /**< extra size for port user data */
struct spa_list input_ports; /**< list of input ports */
struct pw_map input_port_map; /**< map from port_id to port */
@ -305,11 +306,11 @@ struct pw_node {
};
struct pw_port_mix {
struct pw_port *p;
struct spa_graph_port port;
struct spa_io_buffers *io;
struct spa_buffer **buffers;
uint32_t n_buffers;
uint32_t id;
enum pw_port_state state; /**< state of the port */
};
struct pw_port_implementation {
@ -318,6 +319,9 @@ struct pw_port_implementation {
int (*init_mix) (void *data, struct pw_port_mix *mix);
int (*release_mix) (void *data, struct pw_port_mix *mix);
int (*use_buffers) (void *data, struct spa_buffer **buffers, uint32_t n_buffers);
int (*alloc_buffers) (void *data, struct spa_pod **params, uint32_t n_params,
struct spa_buffer **buffers, uint32_t *n_buffers);
};
struct pw_port {
@ -327,6 +331,8 @@ struct pw_port {
struct pw_global *global; /**< global for this port */
struct spa_hook global_listener;
bool registered;
bool to_remove; /**< if the port should be removed from the
* implementation when destroyed */
enum pw_direction direction; /**< port direction */
uint32_t port_id; /**< port id */
@ -351,8 +357,10 @@ struct pw_port {
const struct pw_port_implementation *implementation;
void *implementation_data;
struct spa_node *mix; /**< optional port buffer mix/split */
struct spa_node mix_node; /**< mix node implementation */
struct spa_node *mix; /**< port buffer mix/split */
#define PW_PORT_MIX_FLAG_MULTI (1<<0) /**< multi input or output */
#define PW_PORT_MIX_FLAG_MIX_ONLY (1<<1) /**< only negotiate mix ports */
uint32_t mix_flags; /**< flags for the mixing */
struct pw_map mix_port_map; /**< map from port_id from mixer */
struct {
@ -394,7 +402,8 @@ struct pw_link {
struct spa_hook_list listener_list;
struct {
struct pw_port_mix mix[2];
struct pw_port_mix out_mix; /**< port added to the output mixer */
struct pw_port_mix in_mix; /**< port added to the input mixer */
struct spa_graph_link link; /**< nodes link */
} rt;
@ -564,6 +573,8 @@ int pw_port_register(struct pw_port *port,
/** Get the user data of a port, the size of the memory was given \ref in pw_port_new */
void * pw_port_get_user_data(struct pw_port *port);
int pw_port_set_mix(struct pw_port *port, struct spa_node *node, uint32_t flags);
/** Add a port to a node \memberof pw_port */
int pw_port_add(struct pw_port *port, struct pw_node *node);
@ -598,9 +609,21 @@ int pw_port_for_each_filtered_param(struct pw_port *in_port,
struct spa_pod *param),
void *data);
/** Set a param on a port \memberof pw_port */
int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags,
const struct spa_pod *param);
/** Iterate the links of the port. The callback should return
* 0 to fetch the next item, any other value stops the iteration and returns
* the value. When all callbacks return 0, this function returns 0 when all
* items are iterated. */
int pw_port_for_each_link(struct pw_port *port,
int (*callback) (void *data, struct pw_link *link),
void *data);
/** check is a port has links, return 0 if not, 1 if it is linked */
int pw_port_is_linked(struct pw_port *port);
/** Set a param on a port \memberof pw_port, use SPA_ID_INVALID for mix_id to set
* the param on all mix ports */
int pw_port_set_param(struct pw_port *port, uint32_t mix_id,
uint32_t id, uint32_t flags, const struct spa_pod *param);
/** Use buffers on a port \memberof pw_port */
int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id,

View file

@ -157,9 +157,15 @@ pw_remote_update_state(struct pw_remote *remote, enum pw_remote_state state, con
} else {
remote->error = NULL;
}
pw_log_debug("remote %p: update state from %s -> %s (%s)", remote,
pw_remote_state_as_string(old),
pw_remote_state_as_string(state), remote->error);
if (state == PW_REMOTE_STATE_ERROR) {
pw_log_error("remote %p: update state from %s -> %s (%s)", remote,
pw_remote_state_as_string(old),
pw_remote_state_as_string(state), remote->error);
} else {
pw_log_debug("remote %p: update state from %s -> %s", remote,
pw_remote_state_as_string(old),
pw_remote_state_as_string(state));
}
remote->state = state;
spa_hook_list_call(&remote->listener_list, struct pw_remote_events, state_changed,
@ -646,6 +652,7 @@ static void mix_init(struct mix *mix, struct pw_port *port, uint32_t mix_id)
mix->port = port;
mix->mix_id = mix_id;
pw_port_init_mix(port, &mix->mix);
mix->active = false;
pw_array_init(&mix->buffers, 32);
pw_array_ensure_size(&mix->buffers, sizeof(struct buffer) * 64);
}
@ -725,9 +732,7 @@ static struct mix *ensure_mix(struct node_data *data,
spa_list_remove(&mix->link);
mix_init(mix, port, mix_id);
spa_list_append(&data->mix[direction], &mix->link);
mix->active = false;
return mix;
}
@ -1006,7 +1011,7 @@ client_node_port_set_param(void *object,
}
}
res = pw_port_set_param(port, id, flags, param);
res = pw_port_set_param(port, SPA_ID_INVALID, id, flags, param);
if (res < 0)
goto done;