modules: port modules to timer-queue
Instead of using timerfd-based timers, use the context timer-queue to schedule timeouts. This saves file descriptors and removes some redundant code. Also make the rtp-source timeout and standby handling more robust by tracking the receive state with atomic operations instead of a plain flag shared between the data loop and the main loop.
commit a75cea96fb
parent b220f85790

11 changed files with 157 additions and 238 deletions
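For the rtp-source standby timer, the port boils down to the following before/after, condensed from the hunks below (error handling and unrelated lines omitted):

	/* before: a dedicated timerfd-backed timer source on the main loop */
	impl->standby_timer = pw_loop_add_timer(impl->main_loop, on_standby_timer_event, impl);
	value.tv_sec = interval.tv_sec = impl->cleanup_interval;
	value.tv_nsec = interval.tv_nsec = 0;
	pw_loop_update_timer(impl->main_loop, impl->standby_timer, &value, &interval, false);

	/* after: a pw_timer entry on the shared context timer-queue, so no extra fd
	 * is needed; the timeout is re-armed from the callback itself */
	impl->timer_queue = pw_context_get_timer_queue(context);
	pw_timer_queue_add(impl->timer_queue, &impl->standby_timer,
			NULL, impl->cleanup_interval * SPA_NSEC_PER_SEC,
			on_standby_timer_event, impl);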
@@ -188,6 +188,7 @@ struct impl {
 
 	struct pw_loop *main_loop;
 	struct pw_loop *data_loop;
+	struct pw_timer_queue *timer_queue;
 
 	struct pw_core *core;
 	struct spa_hook core_listener;
@@ -200,10 +201,10 @@ struct impl {
 	bool always_process;
 	uint32_t cleanup_interval;
 
-	struct spa_source *standby_timer;
+	struct pw_timer standby_timer;
 	/* This timer is used when the first stream_start() call fails because
 	 * of an ENODEV error (see the stream_start() code for details) */
-	struct spa_source *stream_start_retry_timer;
+	struct pw_timer stream_start_retry_timer;
 
 	struct pw_properties *stream_props;
 	struct rtp_stream *stream;
@@ -216,7 +217,11 @@ struct impl {
 	uint8_t *buffer;
 	size_t buffer_size;
 
-	bool receiving;
+#define STATE_IDLE 0
+#define STATE_PROBE 1
+#define STATE_RECEIVING 2
+#define STATE_STOPPING 3
+	int state;
 	bool may_pause;
 	bool standby;
 	bool waiting;
@@ -269,9 +274,11 @@ on_rtp_io(void *data, int fd, uint32_t mask)
 				goto receive_error;
 		}
 
-		if (!impl->receiving) {
-			impl->receiving = true;
-			pw_loop_invoke(impl->main_loop, do_start, 1, NULL, 0, false, impl);
+		if (SPA_ATOMIC_LOAD(impl->state) != STATE_RECEIVING) {
+			if (!SPA_ATOMIC_CAS(impl->state, STATE_PROBE, STATE_RECEIVING)) {
+				if (SPA_ATOMIC_CAS(impl->state, STATE_IDLE, STATE_RECEIVING))
+					pw_loop_invoke(impl->main_loop, do_start, 1, NULL, 0, false, impl);
+			}
 		}
 	}
 	return;
@@ -385,23 +392,6 @@ error:
 	return res;
 }
 
-static void stream_open_connection(void *data, int *result);
-
-static void on_open_connection_retry_timer_event(void *data, uint64_t expirations)
-{
-	struct impl *impl = data;
-	pw_log_debug("trying again to open connection after previous attempt failed with ENODEV");
-	stream_open_connection(impl, NULL);
-}
-
-static void destroy_stream_start_retry_timer(struct impl *impl)
-{
-	if (impl->stream_start_retry_timer != NULL) {
-		pw_loop_destroy_source(impl->main_loop, impl->stream_start_retry_timer);
-		impl->stream_start_retry_timer = NULL;
-	}
-}
-
 static void stream_report_error(void *data, const char *error)
 {
 	struct impl *impl = data;
@@ -411,6 +401,15 @@ static void stream_report_error(void *data, const char *error)
 	}
 }
 
+static void stream_open_connection(void *data, int *result);
+
+static void on_open_connection_retry_timer_event(void *data)
+{
+	struct impl *impl = data;
+	pw_log_debug("trying again to open connection after previous attempt failed with ENODEV");
+	stream_open_connection(impl, NULL);
+}
+
 static void stream_open_connection(void *data, int *result)
 {
 	int res = 0;
@@ -438,21 +437,12 @@ static void stream_open_connection(void *data, int *result)
 		pw_log_warn("failed to create socket because network device is not ready "
 			"and present yet; will try again");
 
-		if (impl->stream_start_retry_timer == NULL) {
-			struct timespec value, interval;
-
-			impl->stream_start_retry_timer = pw_loop_add_timer(impl->main_loop,
-					on_open_connection_retry_timer_event, impl);
-			/* Use a 1-second retry interval. The network interfaces
-			 * are likely to be up and running then. */
-			value.tv_sec = 1;
-			value.tv_nsec = 0;
-			interval.tv_sec = 1;
-			interval.tv_nsec = 0;
-			pw_loop_update_timer(impl->main_loop, impl->stream_start_retry_timer, &value,
-					&interval, false);
-		}
-		/* Do nothing if the timer is already up. */
+		pw_timer_queue_cancel(&impl->stream_start_retry_timer);
+		/* Use a 1-second retry interval. The network interfaces
+		 * are likely to be up and running then. */
+		pw_timer_queue_add(impl->timer_queue, &impl->stream_start_retry_timer,
+				NULL, 1 * SPA_NSEC_PER_SEC,
+				on_open_connection_retry_timer_event, impl);
 
 		/* It is important to return 0 in this case. Otherwise, the nonzero return
 		 * value will later be propagated through the core as an error. */
@@ -463,7 +453,7 @@ static void stream_open_connection(void *data, int *result)
 		/* If ENODEV was returned earlier, and the stream_start_retry_timer
 		 * was consequently created, but then a non-ENODEV error occurred,
 		 * the timer must be stopped and removed. */
-		destroy_stream_start_retry_timer(impl);
+		pw_timer_queue_cancel(&impl->stream_start_retry_timer);
 		res = -errno;
 		goto finish;
 	}
@@ -471,7 +461,7 @@ static void stream_open_connection(void *data, int *result)
 
 	/* Cleanup the timer in case ENODEV occurred earlier, and this time,
 	 * the socket creation succeeded. */
-	destroy_stream_start_retry_timer(impl);
+	pw_timer_queue_cancel(&impl->stream_start_retry_timer);
 
 	impl->source = pw_loop_add_io(impl->data_loop, fd,
 			SPA_IO_IN, true, on_rtp_io, impl);
@@ -504,7 +494,7 @@ static void stream_close_connection(void *data, int *result)
 
 	pw_log_info("stopping RTP listener");
 
-	destroy_stream_start_retry_timer(impl);
+	pw_timer_queue_cancel(&impl->stream_start_retry_timer);
 
 	pw_loop_destroy_source(impl->data_loop, impl->source);
 	impl->source = NULL;
@@ -586,14 +576,14 @@ static const struct rtp_stream_events stream_events = {
 	.param_changed = stream_param_changed,
 };
 
-static void on_standby_timer_event(void *data, uint64_t expirations)
+static void on_standby_timer_event(void *data)
 {
 	struct impl *impl = data;
 
-	pw_log_debug("standby timer event; receiving: %d standby: %d waiting: %d",
-			impl->receiving, impl->standby, impl->waiting);
+	pw_log_debug("standby timer event; state: %d standby: %d waiting: %d",
+			impl->state, impl->standby, impl->waiting);
 
-	if (!impl->receiving) {
+	if (SPA_ATOMIC_CAS(impl->state, STATE_PROBE, STATE_STOPPING)) {
 		if (!impl->standby) {
 			struct spa_dict_item item[1];
 
@@ -608,10 +598,15 @@ static void on_standby_timer_event(void *data, uint64_t expirations)
 			rtp_stream_set_active(impl->stream, false);
 		}
 		//pw_impl_module_schedule_destroy(impl->module);
+		SPA_ATOMIC_STORE(impl->state, STATE_IDLE);
 	} else {
 		pw_log_debug("timeout, keeping active RTP source");
+		SPA_ATOMIC_CAS(impl->state, STATE_RECEIVING, STATE_PROBE);
 	}
-	impl->receiving = false;
+
+	pw_timer_queue_add(impl->timer_queue, &impl->standby_timer,
+			&impl->standby_timer.timeout, impl->cleanup_interval * SPA_NSEC_PER_SEC,
+			on_standby_timer_event, impl);
 }
 
 static void core_destroy(void *d)
@@ -636,10 +631,8 @@ static void impl_destroy(struct impl *impl)
 	if (impl->core && impl->do_disconnect)
 		pw_core_disconnect(impl->core);
 
-	if (impl->standby_timer)
-		pw_loop_destroy_source(impl->main_loop, impl->standby_timer);
-
-	destroy_stream_start_retry_timer(impl);
+	pw_timer_queue_cancel(&impl->standby_timer);
+	pw_timer_queue_cancel(&impl->stream_start_retry_timer);
 
 	if (impl->data_loop)
 		pw_context_release_loop(impl->context, impl->data_loop);
@@ -695,7 +688,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
 	struct pw_context *context = pw_impl_module_get_context(module);
 	struct impl *impl;
 	const char *str, *sess_name;
-	struct timespec value, interval;
 	struct pw_properties *props, *stream_props;
 	int64_t ts_offset;
 	char addr[128];
@@ -722,6 +714,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
 	impl->module = module;
 	impl->context = context;
 	impl->main_loop = pw_context_get_main_loop(context);
+	impl->timer_queue = pw_context_get_timer_queue(context);
 	impl->data_loop = pw_context_acquire_loop(context, &props->dict);
 
 	impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
@@ -830,17 +823,12 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
 			&impl->core_listener,
 			&core_events, impl);
 
-	impl->standby_timer = pw_loop_add_timer(impl->main_loop, on_standby_timer_event, impl);
-	if (impl->standby_timer == NULL) {
-		res = -errno;
-		pw_log_error("can't create timer source: %m");
+	if ((res = pw_timer_queue_add(impl->timer_queue, &impl->standby_timer,
+			NULL, impl->cleanup_interval * SPA_NSEC_PER_SEC,
+			on_standby_timer_event, impl)) < 0) {
+		pw_log_error("can't add timer: %s", spa_strerror(res));
 		goto out;
 	}
-	value.tv_sec = impl->cleanup_interval;
-	value.tv_nsec = 0;
-	interval.tv_sec = impl->cleanup_interval;
-	interval.tv_nsec = 0;
-	pw_loop_update_timer(impl->main_loop, impl->standby_timer, &value, &interval, false);
 
 	impl->stream = rtp_stream_new(impl->core,
 			PW_DIRECTION_OUTPUT, pw_properties_copy(stream_props),
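The receiving/standby handling above is now a small lock-free state machine shared between the data-loop packet handler (on_rtp_io) and the main-loop standby timer. The sketch below is a minimal, self-contained model of that pattern, not the module code: it uses C11 stdatomic in place of the SPA_ATOMIC_* helpers, and on_packet(), on_standby_timeout(), start_stream() and stop_stream() are hypothetical stand-ins for on_rtp_io, on_standby_timer_event, do_start and the standby/deactivate path.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

/* Same four states as in the diff. */
enum { STATE_IDLE, STATE_PROBE, STATE_RECEIVING, STATE_STOPPING };

static _Atomic int state = STATE_IDLE;

/* Hypothetical stand-ins for do_start()/rtp_stream_set_active(). */
static void start_stream(void) { printf("start stream\n"); }
static void stop_stream(void)  { printf("stop stream (standby)\n"); }

/* Compare-and-swap, like SPA_ATOMIC_CAS in the diff. */
static bool cas(int expected, int desired)
{
	return atomic_compare_exchange_strong(&state, &expected, desired);
}

/* Data-loop side: called for every received RTP packet (cf. on_rtp_io). */
static void on_packet(void)
{
	if (atomic_load(&state) != STATE_RECEIVING) {
		/* PROBE -> RECEIVING: traffic resumed, stream is still active. */
		if (!cas(STATE_PROBE, STATE_RECEIVING)) {
			/* IDLE -> RECEIVING: first traffic after standby, start the stream. */
			if (cas(STATE_IDLE, STATE_RECEIVING))
				start_stream();
		}
	}
}

/* Main-loop side: periodic standby timer (cf. on_standby_timer_event). */
static void on_standby_timeout(void)
{
	if (cas(STATE_PROBE, STATE_STOPPING)) {
		/* No packet arrived during a whole interval: go to standby. */
		stop_stream();
		atomic_store(&state, STATE_IDLE);
	} else {
		/* Packets were seen: demote RECEIVING to PROBE and check again next tick. */
		cas(STATE_RECEIVING, STATE_PROBE);
	}
	/* The real code re-arms the standby timer on the timer-queue here. */
}

int main(void)
{
	on_packet();          /* IDLE -> RECEIVING, starts the stream        */
	on_standby_timeout(); /* RECEIVING -> PROBE                          */
	on_standby_timeout(); /* PROBE -> STOPPING -> IDLE, stops the stream */
	return 0;
}

A packet that arrives while the timer callback is tearing the stream down (state STOPPING) is ignored until the state is stored back to IDLE, matching the CAS chain in the on_rtp_io hunk above.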