mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-10-29 05:40:27 -04:00
module-rtp: Replace "started" boolean with internal state enum
The started boolean is insufficient to fully cover the possible internal states. For this reason, it needs to be replaced by an enum that covers these states. Also, due to potential access by both the dataloop and the mainloop, access to that internal state needs to be synchronized. Finally, a variable "internal_state" makes for code that is easier to read, since it emphasizes that this is state that is fully internal inside the stream (and is not visible to the rtp-sink and rtp-source modules for example).
This commit is contained in:
parent
3476e77714
commit
37e597ff0a
2 changed files with 216 additions and 28 deletions
|
|
@ -385,19 +385,31 @@ static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets, uin
|
|||
uint32_t stride, timestamp;
|
||||
struct iovec iov[3];
|
||||
struct rtp_header header;
|
||||
bool insufficient_data;
|
||||
|
||||
avail = spa_ringbuffer_get_read_index(&impl->ring, ×tamp);
|
||||
tosend = impl->psamples;
|
||||
if (avail < tosend)
|
||||
if (impl->started)
|
||||
insufficient_data = (avail < tosend);
|
||||
if (insufficient_data) {
|
||||
/* There is insufficient data for even a single full packet.
|
||||
* Handle this depending on the current state. */
|
||||
|
||||
if (get_internal_stream_state(impl) == RTP_STREAM_INTERNAL_STATE_STARTED) {
|
||||
/* If the stream is started, just try again later,
|
||||
* when more data comes in. Enough data for covering
|
||||
* the psamples amount might be available by then. */
|
||||
goto done;
|
||||
else {
|
||||
/* send last packet before emitting state_changed */
|
||||
} else {
|
||||
/* There is not enough data for a full packet, but the
|
||||
* stream is no longer in the started state, so the
|
||||
* remaining data needs to be flushed out now. */
|
||||
tosend = avail;
|
||||
num_packets = 1;
|
||||
}
|
||||
else
|
||||
} else {
|
||||
/* There is sufficient data for one or more full packets. */
|
||||
num_packets = SPA_MIN(num_packets, (uint32_t)(avail / tosend));
|
||||
}
|
||||
|
||||
stride = impl->stride;
|
||||
|
||||
|
|
@ -434,18 +446,30 @@ static void rtp_audio_flush_packets(struct impl *impl, uint32_t num_packets, uin
|
|||
num_packets--;
|
||||
}
|
||||
spa_ringbuffer_read_update(&impl->ring, timestamp);
|
||||
|
||||
done:
|
||||
if (impl->timer_running) {
|
||||
if (impl->started) {
|
||||
if (avail < tosend) {
|
||||
if (get_internal_stream_state(impl) != RTP_STREAM_INTERNAL_STATE_STOPPING) {
|
||||
/* If the stream isn't being stopped, and instead is running,
|
||||
* keep the timer running if there was sufficient data to
|
||||
* produce at least one packet. That's because by the time
|
||||
* the next timer expiration happens, there might be enough
|
||||
* data available for even more packets. However, if there
|
||||
* wasn't sufficient data for even one packet, stop the
|
||||
* timer, since it is likely then that input has ceased
|
||||
* (at least for now). */
|
||||
if (insufficient_data) {
|
||||
set_timer(impl, 0, 0);
|
||||
}
|
||||
} else if (avail <= 0) {
|
||||
bool started = false;
|
||||
|
||||
/* the stream has been stopped and all packets have been sent */
|
||||
/* All packets were sent, and the stream is in the stopping
|
||||
* state. This means that stream_stop() was called while this
|
||||
* timer was still sending out remaining packets, and thus,
|
||||
* stream_stop() could not immediately change the stream to the
|
||||
* stopping state. Now that all packets have gone out, finish
|
||||
* the stopping state change. */
|
||||
set_timer(impl, 0, 0);
|
||||
pw_loop_invoke(impl->main_loop, do_emit_state_changed, SPA_ID_INVALID, &started, sizeof started, false, impl);
|
||||
pw_loop_invoke(impl->main_loop, do_finish_stopping_state, SPA_ID_INVALID, NULL, 0, false, impl);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@
|
|||
#include <sys/socket.h>
|
||||
#include <arpa/inet.h>
|
||||
|
||||
#include <spa/utils/atomic.h>
|
||||
#include <spa/utils/result.h>
|
||||
#include <spa/utils/json.h>
|
||||
#include <spa/utils/ringbuffer.h>
|
||||
|
|
@ -49,6 +50,30 @@ PW_LOG_TOPIC_EXTERN(mod_topic);
|
|||
#define rtp_stream_emit_send_packet(s,i,l) rtp_stream_emit(s, send_packet,0,i,l)
|
||||
#define rtp_stream_emit_send_feedback(s,seq) rtp_stream_emit(s, send_feedback,0,seq)
|
||||
|
||||
enum rtp_stream_internal_state {
|
||||
/* The state when the stream is idle / stopped. The background
|
||||
* timer that may be used for sending out buffered data
|
||||
* must not be running in this state. If the separate PTP sender
|
||||
* is being used, then that one must be inactive in this state.
|
||||
* Set at the end of stream_stop() and when the stream is created. */
|
||||
RTP_STREAM_INTERNAL_STATE_STOPPED,
|
||||
/* Temporary state that is set at the beginning of stream_stop().
|
||||
* If a full stop is possible, stream_stop() immediately moves on
|
||||
* to the STOPPED state. However, if the timer is running (because it
|
||||
* is still sending out buffered data), the state remains set to
|
||||
* STOPPING until the timer has sent out all data, at which point
|
||||
* the timer finishes the change to the STOPPED state. */
|
||||
RTP_STREAM_INTERNAL_STATE_STOPPING,
|
||||
/* Temporary state that is set at the beginning of stream_start().
|
||||
* It is mainly used for preventing do_finish_stopping_state()
|
||||
* from setting a stopped state. See do_finish_stopping_state()
|
||||
* for details. */
|
||||
RTP_STREAM_INTERNAL_STATE_STARTING,
|
||||
/* The state when the stream has been started. It is set at the
|
||||
* end of stream_start(). */
|
||||
RTP_STREAM_INTERNAL_STATE_STARTED
|
||||
};
|
||||
|
||||
struct impl {
|
||||
struct spa_audio_info info;
|
||||
struct spa_audio_info stream_info;
|
||||
|
|
@ -100,11 +125,19 @@ struct impl {
|
|||
|
||||
unsigned direct_timestamp:1;
|
||||
unsigned always_process:1;
|
||||
unsigned started:1;
|
||||
unsigned have_sync:1;
|
||||
unsigned receiving:1;
|
||||
unsigned first:1;
|
||||
|
||||
/* IMPORTANT: Do NOT access this value directly. Use the atomic
|
||||
* set_internal_stream_state() / get_internal_stream_state() accessors,
|
||||
* since the state is accessed by both the dataloop and mainloop. To
|
||||
* prevent memory visibility issues, atomic accessors need to be used.
|
||||
*
|
||||
* Also, its type here is uint32_t. See the explanation about atomic
|
||||
* access below for the reason why. */
|
||||
uint32_t internal_state;
|
||||
|
||||
struct pw_loop *main_loop;
|
||||
struct pw_loop *data_loop;
|
||||
struct spa_source *timer;
|
||||
|
|
@ -118,6 +151,9 @@ struct impl {
|
|||
* requires filling the ring buffer with something other than nullbytes
|
||||
* (this can happen with DSD for example). */
|
||||
void (*reset_ringbuffer)(struct impl *impl);
|
||||
/* Called by stream_start() to stop any running timer before continuing to
|
||||
* start the stream. This is necessary, because by that point, any remaining
|
||||
* buffered data is stale, and the timer would keep sending it out. */
|
||||
void (*stop_timer)(struct impl *impl);
|
||||
void (*flush_timeout)(struct impl *impl, uint64_t expirations);
|
||||
void (*deinit)(struct impl *impl, enum spa_direction direction);
|
||||
|
|
@ -144,17 +180,83 @@ struct impl {
|
|||
uint32_t rtp_last_ts;
|
||||
};
|
||||
|
||||
static int do_emit_state_changed(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data)
|
||||
{
|
||||
struct impl *impl = user_data;
|
||||
bool started = *((bool *)data);
|
||||
/* Atomic internal_state accessors.
|
||||
*
|
||||
* These are necessary because internal_state may be accessed by both
|
||||
* the dataloop (in the flush_timeout and do_finish_stopping_state())
|
||||
* and the mainloop (in stream_start() and stream_stop()). Even though
|
||||
* stream_start() and stream_stop() may not necessarily run at the
|
||||
* same time when the dataloop is active, there is still a potential
|
||||
* memory visibility issue if the state is set in one loop but that
|
||||
* change is not yet propagated to other CPU cores, causing the other
|
||||
* loop (which runs in a separate thread) to still see the old state.
|
||||
*
|
||||
* Also, since GCC __atomic built-ins (which the SPA macros use) are
|
||||
* designed to work with integral scalar or pointer type that is 1,
|
||||
* 2, 4, or 8 bytes in length, impl->internal_state is of type uint33_t.
|
||||
* This guarantee a correct size for the built-ins. The accessors take
|
||||
* care of casting from/to rtp_stream_internal_state . The relevant
|
||||
* GCC manual page for this is:
|
||||
* https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html
|
||||
*/
|
||||
|
||||
if (started) {
|
||||
rtp_stream_emit_open_connection(impl, NULL);
|
||||
} else {
|
||||
rtp_stream_emit_close_connection(impl, NULL);
|
||||
static inline enum rtp_stream_internal_state get_internal_stream_state(struct impl *impl) {
|
||||
return (enum rtp_stream_internal_state)SPA_ATOMIC_LOAD(impl->internal_state);
|
||||
}
|
||||
|
||||
static inline void set_internal_stream_state(struct impl *impl, enum rtp_stream_internal_state state) {
|
||||
SPA_ATOMIC_STORE(impl->internal_state, (uint32_t)state);
|
||||
}
|
||||
|
||||
static int do_finish_stopping_state(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data)
|
||||
{
|
||||
int res = 0;
|
||||
struct impl *impl = user_data;
|
||||
enum rtp_stream_internal_state cur_state = get_internal_stream_state(impl);
|
||||
|
||||
/* The checks here cover a corner case that can happen when the
|
||||
* following conditions are met (in order):
|
||||
*
|
||||
* 1. Stream is stopped via stream_stop(), but the timer is still
|
||||
* running, meaning that internal_state stays at STOPPING.
|
||||
* 2. The timer manages to invoke do_finish_stopping_state()
|
||||
* asynchronously, meaning that the invocation is queued.
|
||||
* 3. Immediately afterwards, the state is started again via
|
||||
* stream_start(). That call stops the timer, but does not
|
||||
* undo the do_finish_stopping_state() invocation.
|
||||
* The internal_state is set to STARTED.
|
||||
* 4. The queued do_finish_stopping_state() invocation takes
|
||||
* place, and it tries to set the internal_state to STOPPED.
|
||||
*
|
||||
* In such a case, the STARTED state would be set again to STOPPED,
|
||||
* even though the stream has been started and is running.
|
||||
*
|
||||
* To fix this, check if the current internal state is STOPPING.
|
||||
* This is the only case where setting the state to STOPPED makes
|
||||
* sense, since that is why this do_finish_stopping_state() exists -
|
||||
* to finish a stopping procedure that could not be finished in
|
||||
* stream_stop() immediately. If the stream is restarted, then this
|
||||
* delayed stop is no longer needed. Canceling the queued invocation
|
||||
* is not possible (PipeWire has no cancellation API for this),
|
||||
* so this approach needs to be used instead. */
|
||||
|
||||
switch (cur_state) {
|
||||
case RTP_STREAM_INTERNAL_STATE_STOPPING:
|
||||
pw_log_debug("setting \"stopped\" state after timer expired");
|
||||
break;
|
||||
default:
|
||||
pw_log_debug("\"stopped\" state change event emission was scheduled, "
|
||||
"but the current state is not \"stopping\"; ignoring "
|
||||
"scheduled request");
|
||||
return 0;
|
||||
}
|
||||
|
||||
rtp_stream_emit_close_connection(impl, &res);
|
||||
if (res > 0)
|
||||
pw_log_debug("closed connection");
|
||||
else if (res < 0)
|
||||
pw_log_error("error while closing connection: %s", spa_strerror(res));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
@ -204,12 +306,17 @@ static void stream_destroy(void *d)
|
|||
static int stream_start(struct impl *impl)
|
||||
{
|
||||
int res;
|
||||
enum rtp_stream_internal_state cur_state;
|
||||
|
||||
if (impl->started)
|
||||
cur_state = get_internal_stream_state(impl);
|
||||
|
||||
if (cur_state == RTP_STREAM_INTERNAL_STATE_STARTED)
|
||||
return 0;
|
||||
|
||||
impl->first = true;
|
||||
|
||||
set_internal_stream_state(impl, RTP_STREAM_INTERNAL_STATE_STARTING);
|
||||
|
||||
/* Stop the timer now (if the timer is used). Any lingering timer
|
||||
* will try to send data that is stale at this point, especially
|
||||
* after the ring buffer contents get reset. Worse, the timer might
|
||||
|
|
@ -218,6 +325,33 @@ static int stream_start(struct impl *impl)
|
|||
if (impl->stop_timer)
|
||||
impl->stop_timer(impl);
|
||||
|
||||
res = 0;
|
||||
rtp_stream_emit_close_connection(impl, &res);
|
||||
|
||||
/* A leftover connection only makes sense if the stream was in the
|
||||
* STOPPING state prior to this stream_start() call, because then,
|
||||
* the previous stream_stop() call could not finish stopping the
|
||||
* stream, and had to leave the connection open so the timer can
|
||||
* finish sending out packets. If stream_start() was called before
|
||||
* the timer finished, then the stream is still in the STOPPING
|
||||
* state, was thus not properly stopped, and the connection is still
|
||||
* there. This is not an error, but a consequence of restarting the
|
||||
* stream early enough.
|
||||
* If however the state prior to this stream_start() call was
|
||||
* anything other than STOPPING, then something is wrong. */
|
||||
if (res > 0) {
|
||||
if (cur_state != RTP_STREAM_INTERNAL_STATE_STOPPING) {
|
||||
pw_log_warn("there was already an open connection, "
|
||||
"even though none was expected");
|
||||
} else {
|
||||
pw_log_debug("closed leftover connection since a scheduled "
|
||||
"\"stopped\" state change was cancelled "
|
||||
"and we are still in the \"stopping\" state");
|
||||
}
|
||||
} else if (res < 0) {
|
||||
pw_log_error("error while closing leftover connection: %s", spa_strerror(res));
|
||||
}
|
||||
|
||||
impl->reset_ringbuffer(impl);
|
||||
|
||||
res = 0;
|
||||
|
|
@ -239,20 +373,42 @@ static int stream_start(struct impl *impl)
|
|||
pw_log_info("activated pw_filter for separate sender");
|
||||
}
|
||||
|
||||
impl->started = true;
|
||||
set_internal_stream_state(impl, RTP_STREAM_INTERNAL_STATE_STARTED);
|
||||
pw_log_info("stream started");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int stream_stop(struct impl *impl)
|
||||
{
|
||||
if (!impl->started)
|
||||
return 0;
|
||||
switch (get_internal_stream_state(impl)) {
|
||||
case RTP_STREAM_INTERNAL_STATE_STOPPING:
|
||||
case RTP_STREAM_INTERNAL_STATE_STOPPED:
|
||||
return 0;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
/* if timer is running, the state changed event must be emitted by the timer after all packets have been sent */
|
||||
if (!impl->timer_running)
|
||||
rtp_stream_emit_close_connection(impl, NULL);
|
||||
set_internal_stream_state(impl, RTP_STREAM_INTERNAL_STATE_STOPPING);
|
||||
|
||||
/* Proper stop is only possible if the timer is currently not running,
|
||||
* because a stop involves closing the connection. If the timer is still
|
||||
* running, it needs an open connection for sending out remaining packets. */
|
||||
if (!impl->timer_running) {
|
||||
int res;
|
||||
pw_log_info("closing connection as part of stopping the stream");
|
||||
rtp_stream_emit_close_connection(impl, &res);
|
||||
if (res > 0) {
|
||||
pw_log_debug("closed connection");
|
||||
} else if (res < 0) {
|
||||
pw_log_error("error while closing connection: %s", spa_strerror(res));
|
||||
}
|
||||
} else {
|
||||
pw_log_info("cannot close connection yet - timer is still running");
|
||||
}
|
||||
|
||||
/* Stopping the separate sender can be done even if the timer is still
|
||||
* running because it has no dependency on said timer. */
|
||||
if (impl->separate_sender) {
|
||||
struct spa_dict_item items[1];
|
||||
items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_ALWAYS_PROCESS, "false");
|
||||
|
|
@ -263,7 +419,14 @@ static int stream_stop(struct impl *impl)
|
|||
pw_filter_set_active(impl->ptp_sender, false);
|
||||
}
|
||||
|
||||
impl->started = false;
|
||||
/* Only switch to STOPPED if the stream could _actually_ be stopped,
|
||||
* meaning that the timer was no longer running, and the connection
|
||||
* could be closed. */
|
||||
if (!impl->timer_running) {
|
||||
set_internal_stream_state(impl, RTP_STREAM_INTERNAL_STATE_STOPPED);
|
||||
pw_log_info("stream stopped");
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
@ -372,6 +535,7 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core,
|
|||
goto out;
|
||||
}
|
||||
impl->first = true;
|
||||
set_internal_stream_state(impl, RTP_STREAM_INTERNAL_STATE_STOPPED);
|
||||
spa_hook_list_init(&impl->listener_list);
|
||||
impl->direction = direction;
|
||||
impl->stream_events = stream_events;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue