stream: implement Flush

Flush out pending samples in the resampler
Set the io area status to _OK so that buffers are recycled and
new buffers produced.
This commit is contained in:
Wim Taymans 2020-11-17 17:15:26 +01:00
parent cd25a1e5e7
commit 79fb95bf90
8 changed files with 36 additions and 6 deletions

View file

@ -517,6 +517,9 @@ static int impl_node_send_command(void *object, const struct spa_command *comman
case SPA_NODE_COMMAND_Suspend: case SPA_NODE_COMMAND_Suspend:
configure_format(this, 0, NULL); configure_format(this, 0, NULL);
SPA_FALLTHROUGH SPA_FALLTHROUGH
case SPA_NODE_COMMAND_Flush:
this->io_buffers.status = SPA_STATUS_OK;
SPA_FALLTHROUGH
case SPA_NODE_COMMAND_Pause: case SPA_NODE_COMMAND_Pause:
this->started = false; this->started = false;
break; break;

View file

@ -388,6 +388,14 @@ static int negotiate_link_buffers(struct impl *this, struct link *link)
return 0; return 0;
} }
static void flush_convert(struct impl *this)
{
int i;
spa_log_debug(this->log, NAME " %p: %d", this, this->n_links);
for (i = 0; i < this->n_links; i++)
this->links[i].io.status = SPA_STATUS_OK;
}
static void clean_convert(struct impl *this) static void clean_convert(struct impl *this)
{ {
int i; int i;
@ -809,6 +817,9 @@ static int impl_node_send_command(void *object, const struct spa_command *comman
case SPA_NODE_COMMAND_Suspend: case SPA_NODE_COMMAND_Suspend:
clean_convert(this); clean_convert(this);
SPA_FALLTHROUGH SPA_FALLTHROUGH
case SPA_NODE_COMMAND_Flush:
flush_convert(this);
SPA_FALLTHROUGH
case SPA_NODE_COMMAND_Pause: case SPA_NODE_COMMAND_Pause:
this->started = false; this->started = false;
break; break;

View file

@ -466,7 +466,7 @@ static int impl_node_send_command(void *object, const struct spa_command *comman
this->started = true; this->started = true;
break; break;
case SPA_NODE_COMMAND_Suspend: case SPA_NODE_COMMAND_Suspend:
SPA_FALLTHROUGH case SPA_NODE_COMMAND_Flush:
case SPA_NODE_COMMAND_Pause: case SPA_NODE_COMMAND_Pause:
this->started = false; this->started = false;
break; break;

View file

@ -248,7 +248,7 @@ static int impl_node_send_command(void *object, const struct spa_command *comman
this->started = true; this->started = true;
break; break;
case SPA_NODE_COMMAND_Suspend: case SPA_NODE_COMMAND_Suspend:
SPA_FALLTHROUGH case SPA_NODE_COMMAND_Flush:
case SPA_NODE_COMMAND_Pause: case SPA_NODE_COMMAND_Pause:
this->started = false; this->started = false;
break; break;

View file

@ -348,7 +348,7 @@ static int impl_node_send_command(void *object, const struct spa_command *comman
this->started = true; this->started = true;
break; break;
case SPA_NODE_COMMAND_Suspend: case SPA_NODE_COMMAND_Suspend:
SPA_FALLTHROUGH case SPA_NODE_COMMAND_Flush:
case SPA_NODE_COMMAND_Pause: case SPA_NODE_COMMAND_Pause:
this->started = false; this->started = false;
break; break;

View file

@ -248,6 +248,17 @@ static void update_rate_match(struct impl *this)
} }
} }
static void reset_node(struct impl *this)
{
struct port *outport, *inport;
outport = GET_OUT_PORT(this, 0);
inport = GET_IN_PORT(this, 0);
resample_reset(&this->resample);
outport->offset = 0;
inport->offset = 0;
}
static int impl_node_send_command(void *object, const struct spa_command *command) static int impl_node_send_command(void *object, const struct spa_command *command)
{ {
struct impl *this = object; struct impl *this = object;
@ -262,9 +273,10 @@ static int impl_node_send_command(void *object, const struct spa_command *comman
update_rate_match(this); update_rate_match(this);
break; break;
case SPA_NODE_COMMAND_Suspend: case SPA_NODE_COMMAND_Suspend:
SPA_FALLTHROUGH case SPA_NODE_COMMAND_Flush:
reset_node(this);
SPA_FALLTHROUGH;
case SPA_NODE_COMMAND_Pause: case SPA_NODE_COMMAND_Pause:
resample_reset(&this->resample);
this->started = false; this->started = false;
break; break;
default: default:

View file

@ -330,7 +330,7 @@ static int impl_node_send_command(void *object, const struct spa_command *comman
this->started = true; this->started = true;
break; break;
case SPA_NODE_COMMAND_Suspend: case SPA_NODE_COMMAND_Suspend:
SPA_FALLTHROUGH case SPA_NODE_COMMAND_Flush:
case SPA_NODE_COMMAND_Pause: case SPA_NODE_COMMAND_Pause:
this->started = false; this->started = false;
break; break;

View file

@ -388,6 +388,7 @@ static int impl_send_command(void *object, const struct spa_command *command)
switch (SPA_NODE_COMMAND_ID(command)) { switch (SPA_NODE_COMMAND_ID(command)) {
case SPA_NODE_COMMAND_Suspend: case SPA_NODE_COMMAND_Suspend:
case SPA_NODE_COMMAND_Flush:
case SPA_NODE_COMMAND_Pause: case SPA_NODE_COMMAND_Pause:
pw_loop_invoke(impl->context->main_loop, pw_loop_invoke(impl->context->main_loop,
NULL, 0, NULL, 0, false, impl); NULL, 0, NULL, 0, false, impl);
@ -1897,5 +1898,8 @@ int pw_stream_flush(struct pw_stream *stream, bool drain)
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
pw_loop_invoke(impl->context->data_loop, pw_loop_invoke(impl->context->data_loop,
drain ? do_drain : do_flush, 1, NULL, 0, true, impl); drain ? do_drain : do_flush, 1, NULL, 0, true, impl);
if (!drain)
spa_node_send_command(impl->node->node,
&SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Flush));
return 0; return 0;
} }