diff --git a/src/modules/module-avb/avb.c b/src/modules/module-avb/avb.c index 7bfa85eb9..eeacb7b7c 100644 --- a/src/modules/module-avb/avb.c +++ b/src/modules/module-avb/avb.c @@ -41,6 +41,7 @@ struct pw_avb *pw_avb_new(struct pw_context *context, impl->context = context; impl->loop = pw_context_get_main_loop(context); + impl->timer_queue = pw_context_get_timer_queue(context); impl->props = props; impl->core = pw_context_get_object(context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) { diff --git a/src/modules/module-avb/avdecc.c b/src/modules/module-avb/avdecc.c index fd80ec8ac..0e2b33c34 100644 --- a/src/modules/module-avb/avdecc.c +++ b/src/modules/module-avb/avdecc.c @@ -14,6 +14,7 @@ #include #include +#include #include @@ -39,12 +40,18 @@ #define server_emit_periodic(s,n) server_emit(s, periodic, 0, n) #define server_emit_command(s,n,c,a,f) server_emit(s, command, 0, n, c, a, f) -static void on_timer_event(void *data, uint64_t expirations) +static void on_timer_event(void *data) { struct server *server = data; + struct impl *impl = server->impl; struct timespec now; + clock_gettime(CLOCK_REALTIME, &now); server_emit_periodic(server, SPA_TIMESPEC_TO_NSEC(&now)); + + pw_timer_queue_add(impl->timer_queue, &server->timer, + &server->timer.timeout, DEFAULT_INTERVAL * SPA_NSEC_PER_SEC, + on_timer_event, server); } static void on_socket_data(void *data, int fd, uint32_t mask) @@ -201,7 +208,6 @@ static int setup_socket(struct server *server) struct impl *impl = server->impl; int fd, res; static const uint8_t bmac[6] = AVB_BROADCAST_MAC; - struct timespec value, interval; fd = avb_server_make_socket(server, AVB_TSN_ETH, bmac); if (fd < 0) @@ -215,18 +221,13 @@ static int setup_socket(struct server *server) pw_log_error("server %p: can't create server source: %m", impl); goto error_no_source; } - server->timer = pw_loop_add_timer(impl->loop, on_timer_event, server); - if (server->timer == NULL) { - res = -errno; - pw_log_error("server %p: can't create timer source: %m", impl); + + if ((res = pw_timer_queue_add(impl->timer_queue, &server->timer, + NULL, DEFAULT_INTERVAL * SPA_NSEC_PER_SEC, + on_timer_event, server)) < 0) { + pw_log_error("server %p: can't create timer: %s", impl, spa_strerror(res)); goto error_no_timer; } - value.tv_sec = 0; - value.tv_nsec = 1; - interval.tv_sec = DEFAULT_INTERVAL; - interval.tv_nsec = 0; - pw_loop_update_timer(impl->loop, server->timer, &value, &interval, false); - return 0; error_no_timer: @@ -310,8 +311,7 @@ void avdecc_server_free(struct server *server) spa_list_remove(&server->link); if (server->source) pw_loop_destroy_source(impl->loop, server->source); - if (server->timer) - pw_loop_destroy_source(impl->loop, server->timer); + pw_timer_queue_cancel(&server->timer); spa_hook_list_clean(&server->listener_list); free(server); } diff --git a/src/modules/module-avb/internal.h b/src/modules/module-avb/internal.h index 4a108a486..90222511a 100644 --- a/src/modules/module-avb/internal.h +++ b/src/modules/module-avb/internal.h @@ -19,6 +19,7 @@ struct avb_mrp; struct impl { struct pw_loop *loop; + struct pw_timer_queue *timer_queue; struct pw_context *context; struct spa_hook context_listener; struct pw_core *core; @@ -61,7 +62,7 @@ struct server { int ifindex; struct spa_source *source; - struct spa_source *timer; + struct pw_timer timer; struct spa_hook_list listener_list; diff --git a/src/modules/module-netjack2-driver.c b/src/modules/module-netjack2-driver.c index 5cd93b253..cacdf9368 100644 --- a/src/modules/module-netjack2-driver.c +++ b/src/modules/module-netjack2-driver.c @@ -225,6 +225,7 @@ struct impl { struct pw_loop *main_loop; struct pw_loop *data_loop; struct spa_system *system; + struct pw_timer_queue *timer_queue; #define MODE_SINK (1<<0) #define MODE_SOURCE (1<<1) @@ -263,7 +264,7 @@ struct impl { struct spa_source *setup_socket; struct spa_source *socket; - struct spa_source *timer; + struct pw_timer timer; int32_t init_retry; struct netjack2_peer peer; @@ -801,14 +802,14 @@ error: return res; } +static void on_timer_event(void *data); + static void update_timer(struct impl *impl, uint64_t timeout) { - struct timespec value, interval; - value.tv_sec = 0; - value.tv_nsec = timeout ? 1 : 0; - interval.tv_sec = timeout; - interval.tv_nsec = 0; - pw_loop_update_timer(impl->main_loop, impl->timer, &value, &interval, false); + pw_timer_queue_cancel(&impl->timer); + pw_timer_queue_add(impl->timer_queue, &impl->timer, + NULL, timeout * SPA_NSEC_PER_SEC, + on_timer_event, impl); } static bool encoding_supported(uint32_t encoder) @@ -1132,7 +1133,7 @@ static void restart_netjack2_socket(struct impl *impl) create_netjack2_socket(impl); } -static void on_timer_event(void *data, uint64_t expirations) +static void on_timer_event(void *data) { struct impl *impl = data; @@ -1193,8 +1194,7 @@ static void impl_destroy(struct impl *impl) if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); - if (impl->timer) - pw_loop_destroy_source(impl->main_loop, impl->timer); + pw_timer_queue_cancel(&impl->timer); if (impl->data_loop) pw_context_release_loop(impl->context, impl->data_loop); @@ -1283,6 +1283,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) } impl->main_loop = pw_context_get_main_loop(context); + impl->timer_queue = pw_context_get_timer_queue(context); impl->system = impl->main_loop->system; impl->source.impl = impl; @@ -1370,13 +1371,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) &impl->core_listener, &core_events, impl); - impl->timer = pw_loop_add_timer(impl->main_loop, on_timer_event, impl); - if (impl->timer == NULL) { - res = -errno; - pw_log_error("can't create timer source: %m"); - goto error; - } - if ((res = create_netjack2_socket(impl)) < 0) goto error; diff --git a/src/modules/module-protocol-pulse/modules/module-combine-sink.c b/src/modules/module-protocol-pulse/modules/module-combine-sink.c index aa5881b73..962c22293 100644 --- a/src/modules/module-protocol-pulse/modules/module-combine-sink.c +++ b/src/modules/module-protocol-pulse/modules/module-combine-sink.c @@ -71,7 +71,7 @@ struct module_combine_sink_data { struct pw_properties *combine_props; struct pw_properties *stream_props; - struct spa_source *sinks_timeout; + struct pw_timer sinks_timeout; unsigned int sinks_pending; unsigned int load_emitted:1; @@ -128,7 +128,7 @@ static const struct pw_manager_events manager_events = { .added = manager_added, }; -static void on_sinks_timeout(void *d, uint64_t count) +static void on_sinks_timeout(void *d) { struct module_combine_sink_data *data = d; @@ -214,13 +214,10 @@ static int module_combine_sink_load(struct module *module) pw_manager_add_listener(data->manager, &data->manager_listener, &manager_events, data); - data->sinks_timeout = pw_loop_add_timer(module->impl->main_loop, on_sinks_timeout, data); - if (data->sinks_timeout) { - struct timespec timeout = {0}; - timeout.tv_sec = TIMEOUT_SINKS_MSEC / 1000; - timeout.tv_nsec = (TIMEOUT_SINKS_MSEC % 1000) * SPA_NSEC_PER_MSEC; - pw_loop_update_timer(module->impl->main_loop, data->sinks_timeout, &timeout, NULL, false); - } + pw_timer_queue_add(module->impl->timer_queue, &data->sinks_timeout, + NULL, TIMEOUT_SINKS_MSEC * SPA_NSEC_PER_MSEC, + on_sinks_timeout, data); + return data->load_emitted ? 0 : SPA_RESULT_RETURN_ASYNC(0); } @@ -228,8 +225,7 @@ static int module_combine_sink_unload(struct module *module) { struct module_combine_sink_data *d = module->user_data; - if (d->sinks_timeout != NULL) - pw_loop_destroy_source(module->impl->main_loop, d->sinks_timeout); + pw_timer_queue_cancel(&d->sinks_timeout); if (d->mod != NULL) { spa_hook_remove(&d->mod_listener); diff --git a/src/modules/module-pulse-tunnel.c b/src/modules/module-pulse-tunnel.c index 6e6a7f2e1..c9b9e5f9d 100644 --- a/src/modules/module-pulse-tunnel.c +++ b/src/modules/module-pulse-tunnel.c @@ -148,6 +148,7 @@ static const struct spa_dict_item module_props[] = { struct impl { struct pw_context *context; struct pw_loop *main_loop; + struct pw_timer_queue *timer_queue; #define MODE_SINK 0 #define MODE_SOURCE 1 @@ -194,7 +195,7 @@ struct impl { bool do_disconnect:1; bool stopping; - struct spa_source *timer; + struct pw_timer timer; uint32_t reconnect_interval_ms; bool recovering; }; @@ -525,7 +526,7 @@ static void cleanup_streams(struct impl *impl) pw_stream_destroy(impl->stream); } -static void on_timer_event(void *data, uint64_t expirations) +static void on_timer_event(void *data) { struct impl *impl = data; cleanup_streams(impl); @@ -538,13 +539,9 @@ do_schedule_recovery(struct spa_loop *loop, { struct impl *impl = user_data; if (impl->reconnect_interval_ms > 0) { - struct timespec value; - uint64_t timestamp; - - timestamp = impl->reconnect_interval_ms * SPA_NSEC_PER_MSEC; - value.tv_sec = timestamp / SPA_NSEC_PER_SEC; - value.tv_nsec = timestamp % SPA_NSEC_PER_SEC; - pw_loop_update_timer(impl->main_loop, impl->timer, &value, NULL, false); + pw_timer_queue_add(impl->timer_queue, &impl->timer, + NULL, impl->reconnect_interval_ms * SPA_NSEC_PER_MSEC, + on_timer_event, impl); } else { if (impl->module) pw_impl_module_schedule_destroy(impl->module); @@ -1029,8 +1026,7 @@ static void impl_destroy(struct impl *impl) pw_properties_free(impl->stream_props); pw_properties_free(impl->props); - if (impl->timer) - pw_loop_destroy_source(impl->main_loop, impl->timer); + pw_timer_queue_cancel(&impl->timer); free(impl->buffer); free(impl); @@ -1144,6 +1140,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->module = module; impl->context = context; impl->main_loop = pw_context_get_main_loop(context); + impl->timer_queue = pw_context_get_timer_queue(context); spa_ringbuffer_init(&impl->ring); impl->buffer = calloc(1, RINGBUFFER_SIZE); @@ -1165,13 +1162,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->reconnect_interval_ms = pw_properties_get_uint32(props, "reconnect.interval.ms", 0); - impl->timer = pw_loop_add_timer(impl->main_loop, on_timer_event, impl); - if (impl->timer == NULL) { - res = -errno; - pw_log_error("can't create timer source: %m"); - goto error; - } - impl->latency_msec = pw_properties_get_uint32(props, "pulse.latency", DEFAULT_LATENCY_MSEC); if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL) diff --git a/src/modules/module-raop-sink.c b/src/modules/module-raop-sink.c index 5f464628c..11948807b 100644 --- a/src/modules/module-raop-sink.c +++ b/src/modules/module-raop-sink.c @@ -212,6 +212,7 @@ struct impl { struct pw_impl_module *module; struct pw_loop *loop; + struct pw_timer_queue *timer_queue; struct spa_hook module_listener; @@ -245,7 +246,7 @@ struct impl { uint16_t control_port; int control_fd; struct spa_source *control_source; - struct spa_source *feedback_timer; + struct pw_timer feedback_timer; uint16_t timing_port; int timing_fd; @@ -836,12 +837,16 @@ static int rtsp_send_volume(struct impl *impl) return rtsp_send(impl, "SET_PARAMETER", "text/parameters", header, rtsp_log_reply_status); } -static void rtsp_do_post_feedback(void *data, uint64_t expirations) +static void rtsp_do_post_feedback(void *data) { struct impl *impl = data; pw_rtsp_client_url_send(impl->rtsp, "/feedback", "POST", &impl->headers->dict, NULL, NULL, 0, rtsp_log_reply_status, impl); + + pw_timer_queue_add(impl->timer_queue, &impl->feedback_timer, + &impl->feedback_timer.timeout, 2 * SPA_NSEC_PER_SEC, + rtsp_do_post_feedback, impl); } static uint32_t msec_to_samples(struct impl *impl, uint32_t msec) @@ -854,7 +859,6 @@ static int rtsp_record_reply(void *data, int status, const struct spa_dict *head struct impl *impl = data; const char *str; char progress[128]; - struct timespec timeout, interval; struct spa_process_latency_info process_latency; pw_log_info("record status: %d", status); @@ -866,16 +870,12 @@ static int rtsp_record_reply(void *data, int status, const struct spa_dict *head return 0; } - timeout.tv_sec = 2; - timeout.tv_nsec = 0; - interval.tv_sec = 2; - interval.tv_nsec = 0; - // feedback timer is only needed for auth_setup encryption if (impl->encryption == CRYPTO_FP_SAP25) { - if (!impl->feedback_timer) - impl->feedback_timer = pw_loop_add_timer(impl->loop, rtsp_do_post_feedback, impl); - pw_loop_update_timer(impl->loop, impl->feedback_timer, &timeout, &interval, false); + pw_timer_queue_cancel(&impl->feedback_timer); + pw_timer_queue_add(impl->timer_queue, &impl->feedback_timer, + NULL, 2 * SPA_NSEC_PER_SEC, + rtsp_do_post_feedback, impl); } if ((str = spa_dict_lookup(headers, "Audio-Latency")) != NULL) { @@ -1476,10 +1476,8 @@ static void connection_cleanup(struct impl *impl) close(impl->timing_fd); impl->timing_fd = -1; } - if (impl->feedback_timer != NULL) { - pw_loop_destroy_source(impl->loop, impl->feedback_timer); - impl->feedback_timer = NULL; - } + pw_timer_queue_cancel(&impl->feedback_timer); + free(impl->auth_method); impl->auth_method = NULL; free(impl->realm); @@ -1792,6 +1790,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->module = module; impl->context = context; impl->loop = pw_context_get_main_loop(context); + impl->timer_queue = pw_context_get_timer_queue(context); ip = pw_properties_get(props, "raop.ip"); port = pw_properties_get(props, "raop.port"); diff --git a/src/modules/module-rtp-sap.c b/src/modules/module-rtp-sap.c index 4dd824211..0c00beb61 100644 --- a/src/modules/module-rtp-sap.c +++ b/src/modules/module-rtp-sap.c @@ -251,6 +251,7 @@ struct impl { struct pw_properties *props; struct pw_loop *loop; + struct pw_timer_queue *timer_queue; struct pw_impl_module *module; struct spa_hook module_listener; @@ -263,7 +264,7 @@ struct impl { struct pw_registry *registry; struct spa_hook registry_listener; - struct spa_source *timer; + struct pw_timer timer; char *ifname; uint32_t ttl; @@ -920,7 +921,7 @@ static int send_sap(struct impl *impl, struct session *sess, bool bye) return res; } -static void on_timer_event(void *data, uint64_t expirations) +static void on_timer_event(void *data) { struct impl *impl = data; struct session *sess, *tmp; @@ -954,6 +955,9 @@ static void on_timer_event(void *data, uint64_t expirations) } } + pw_timer_queue_add(impl->timer_queue, &impl->timer, + &impl->timer.timeout, SAP_INTERVAL_SEC * SPA_NSEC_PER_SEC, + on_timer_event, impl); } static struct session *session_find(struct impl *impl, const struct sdp_info *info) @@ -1647,22 +1651,15 @@ on_sap_io(void *data, int fd, uint32_t mask) static int start_sap(struct impl *impl) { int fd = -1, res; - struct timespec value, interval; char addr[128] = "invalid"; pw_log_info("starting SAP timer"); - impl->timer = pw_loop_add_timer(impl->loop, on_timer_event, impl); - if (impl->timer == NULL) { - res = -errno; - pw_log_error("can't create timer source: %m"); + if ((res = pw_timer_queue_add(impl->timer_queue, &impl->timer, + NULL, SAP_INTERVAL_SEC * SPA_NSEC_PER_SEC, + on_timer_event, impl)) < 0) { + pw_log_error("can't add timer: %s", spa_strerror(res)); goto error; } - value.tv_sec = 0; - value.tv_nsec = 1; - interval.tv_sec = SAP_INTERVAL_SEC; - interval.tv_nsec = 0; - pw_loop_update_timer(impl->loop, impl->timer, &value, &interval, false); - if ((fd = make_recv_socket(&impl->sap_addr, impl->sap_len, impl->ifname)) < 0) return fd; @@ -1809,8 +1806,7 @@ static void impl_destroy(struct impl *impl) if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); - if (impl->timer) - pw_loop_destroy_source(impl->loop, impl->timer); + pw_timer_queue_cancel(&impl->timer); if (impl->sap_source) pw_loop_destroy_source(impl->loop, impl->sap_source); @@ -1890,6 +1886,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->module = module; impl->loop = pw_context_get_main_loop(context); + impl->timer_queue = pw_context_get_timer_queue(context); str = pw_properties_get(props, "local.ifname"); impl->ifname = str ? strdup(str) : NULL; diff --git a/src/modules/module-rtp-session.c b/src/modules/module-rtp-session.c index c77912fc5..2b69a0be7 100644 --- a/src/modules/module-rtp-session.c +++ b/src/modules/module-rtp-session.c @@ -204,7 +204,7 @@ struct session { #define SESSION_STATE_ESTABLISHED 4 int state; int ck_count; - uint64_t next_time; + struct pw_timer timer; uint32_t ctrl_initiator; uint32_t data_initiator; @@ -237,15 +237,13 @@ struct impl { struct pw_loop *loop; struct pw_loop *data_loop; + struct pw_timer_queue *timer_queue; struct pw_core *core; struct spa_hook core_listener; struct spa_hook core_proxy_listener; unsigned int do_disconnect:1; - struct spa_source *timer; - uint64_t next_time; - struct spa_source *ctrl_source; struct spa_source *data_source; @@ -276,34 +274,19 @@ static ssize_t send_packet(int fd, struct msghdr *msg) return n; } -static uint64_t current_time_ns(void) +static uint64_t current_time_ns(struct timespec *ts) { - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - return SPA_TIMESPEC_TO_NSEC(&ts); + clock_gettime(CLOCK_MONOTONIC, ts); + return SPA_TIMESPEC_TO_NSEC(ts); } -static void set_timeout(struct impl *impl, uint64_t time) -{ - struct itimerspec ts; - ts.it_value.tv_sec = time / SPA_NSEC_PER_SEC; - ts.it_value.tv_nsec = time % SPA_NSEC_PER_SEC; - ts.it_interval.tv_sec = 0; - ts.it_interval.tv_nsec = 0; - pw_loop_update_timer(impl->loop, impl->timer, &ts.it_value, &ts.it_interval, true); - impl->next_time = time; -} +static void send_apple_midi_cmd_ck0(struct session *sess); -static void schedule_timeout(struct impl *impl) +static void on_timer_event(void *data) { - struct session *sess; - uint64_t next_time = 0; - spa_list_for_each(sess, &impl->sessions, link) { - if (next_time == 0 || - (sess->next_time != 0 && sess->next_time < next_time)) - next_time = sess->next_time; - } - set_timeout(impl, next_time); + struct session *sess = data; + pw_log_debug("timeout"); + send_apple_midi_cmd_ck0(sess); } static void send_apple_midi_cmd_ck0(struct session *sess) @@ -312,14 +295,14 @@ static void send_apple_midi_cmd_ck0(struct session *sess) struct iovec iov[3]; struct msghdr msg; struct rtp_apple_midi_ck hdr; - uint64_t current_time, ts; + struct timespec now; + uint64_t timeout, ts; spa_zero(hdr); hdr.cmd = htonl(APPLE_MIDI_CMD_CK); hdr.ssrc = htonl(sess->ssrc); - current_time = current_time_ns(); - ts = current_time / 10000; + ts = current_time_ns(&now) / 10000; hdr.ts1_h = htonl(ts >> 32); hdr.ts1_l = htonl(ts); @@ -335,11 +318,15 @@ static void send_apple_midi_cmd_ck0(struct session *sess) send_packet(impl->data_source->fd, &msg); if (sess->ck_count++ < 8) - sess->next_time = current_time + SPA_NSEC_PER_SEC; + timeout = 1; else if (sess->ck_count++ < 16) - sess->next_time = current_time + 2 * SPA_NSEC_PER_SEC; + timeout = 2; else - sess->next_time = current_time + 5 * SPA_NSEC_PER_SEC; + timeout = 5; + + pw_timer_queue_add(impl->timer_queue, &sess->timer, + &now, timeout * SPA_NSEC_PER_SEC, + on_timer_event, sess); } static void session_update_state(struct session *sess, int state) @@ -355,12 +342,10 @@ static void session_update_state(struct session *sess, int state) if (sess->we_initiated) { sess->ck_count = 0; send_apple_midi_cmd_ck0(sess); - schedule_timeout(sess->impl); } break; case SESSION_STATE_INIT: - sess->next_time = 0; - schedule_timeout(sess->impl); + pw_timer_queue_cancel(&sess->timer); break; default: break; @@ -601,6 +586,7 @@ static void free_session(struct session *sess) pw_loop_locked(impl->data_loop, do_unlink_session, 1, NULL, 0, sess); sess->impl->n_sessions--; + pw_timer_queue_cancel(&sess->timer); if (sess->send) rtp_stream_destroy(sess->send); @@ -859,8 +845,9 @@ static void parse_apple_midi_cmd_ck(struct impl *impl, bool ctrl, uint8_t *buffe struct msghdr msg; struct rtp_apple_midi_ck reply; struct session *sess; - uint64_t now, t1, t2, t3; + uint64_t ts, t1, t2, t3; uint32_t ssrc = ntohl(hdr->ssrc); + struct timespec now; sess = find_session_by_ssrc(impl, ssrc); if (sess == NULL) { @@ -870,7 +857,7 @@ static void parse_apple_midi_cmd_ck(struct impl *impl, bool ctrl, uint8_t *buffe pw_log_trace("got CK count %d", hdr->count); - now = current_time_ns() / 10000; + ts = current_time_ns(&now) / 10000; reply = *hdr; reply.ssrc = htonl(sess->ssrc); reply.count++; @@ -882,11 +869,11 @@ static void parse_apple_midi_cmd_ck(struct impl *impl, bool ctrl, uint8_t *buffe switch (hdr->count) { case 0: - t2 = now; + t2 = ts; break; case 1: t2 = ((uint64_t)ntohl(hdr->ts2_h) << 32) | ntohl(hdr->ts2_l); - t3 = now; + t3 = ts; break; case 2: t3 = ((uint64_t)ntohl(hdr->ts3_h) << 32) | ntohl(hdr->ts3_l); @@ -1223,8 +1210,6 @@ static void impl_destroy(struct impl *impl) if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); - if (impl->timer) - pw_loop_destroy_source(impl->loop, impl->timer); if (impl->ctrl_source) pw_loop_destroy_source(impl->loop, impl->ctrl_source); if (impl->data_source) @@ -1646,24 +1631,6 @@ static void client_callback(AvahiClient *c, AvahiClientState state, void *userda } } -static void on_timer_event(void *data, uint64_t expirations) -{ - struct impl *impl = data; - struct session *sess; - uint64_t current_time = impl->next_time; - - pw_log_debug("timeout"); - spa_list_for_each(sess, &impl->sessions, link) { - if (sess->state != SESSION_STATE_ESTABLISHED) - continue; - if (sess->next_time < current_time) - continue; - - send_apple_midi_cmd_ck0(sess); - } - schedule_timeout(impl); -} - static void copy_props(struct impl *impl, struct pw_properties *props, const char *key) { const char *str; @@ -1681,7 +1648,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) struct pw_properties *props = NULL, *stream_props = NULL; uint16_t port; const char *str; - struct timespec value, interval; int res = 0; PW_LOG_TOPIC_INIT(mod_topic); @@ -1719,6 +1685,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->context = context; impl->loop = pw_context_get_main_loop(context); impl->data_loop = pw_context_acquire_loop(context, &props->dict); + impl->timer_queue = pw_context_get_timer_queue(context); pw_properties_set(props, PW_KEY_NODE_LOOP_NAME, impl->data_loop->name); @@ -1820,18 +1787,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) &impl->core_listener, &core_events, impl); - impl->timer = pw_loop_add_timer(impl->loop, on_timer_event, impl); - if (impl->timer == NULL) { - res = -errno; - pw_log_error("can't create timer source: %m"); - goto out; - } - value.tv_sec = 0; - value.tv_nsec = 1; - interval.tv_sec = 1; - interval.tv_nsec = 0; - pw_loop_update_timer(impl->loop, impl->timer, &value, &interval, false); - if ((res = setup_apple_session(impl)) < 0) goto out; diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index cf497e354..63f1f399d 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -188,6 +188,7 @@ struct impl { struct pw_loop *main_loop; struct pw_loop *data_loop; + struct pw_timer_queue *timer_queue; struct pw_core *core; struct spa_hook core_listener; @@ -200,10 +201,10 @@ struct impl { bool always_process; uint32_t cleanup_interval; - struct spa_source *standby_timer; + struct pw_timer standby_timer; /* This timer is used when the first stream_start() call fails because * of an ENODEV error (see the stream_start() code for details) */ - struct spa_source *stream_start_retry_timer; + struct pw_timer stream_start_retry_timer; struct pw_properties *stream_props; struct rtp_stream *stream; @@ -216,7 +217,11 @@ struct impl { uint8_t *buffer; size_t buffer_size; - bool receiving; +#define STATE_IDLE 0 +#define STATE_PROBE 1 +#define STATE_RECEIVING 2 +#define STATE_STOPPING 3 + int state; bool may_pause; bool standby; bool waiting; @@ -269,9 +274,11 @@ on_rtp_io(void *data, int fd, uint32_t mask) goto receive_error; } - if (!impl->receiving) { - impl->receiving = true; - pw_loop_invoke(impl->main_loop, do_start, 1, NULL, 0, false, impl); + if (SPA_ATOMIC_LOAD(impl->state) != STATE_RECEIVING) { + if (!SPA_ATOMIC_CAS(impl->state, STATE_PROBE, STATE_RECEIVING)) { + if (SPA_ATOMIC_CAS(impl->state, STATE_IDLE, STATE_RECEIVING)) + pw_loop_invoke(impl->main_loop, do_start, 1, NULL, 0, false, impl); + } } } return; @@ -385,23 +392,6 @@ error: return res; } -static void stream_open_connection(void *data, int *result); - -static void on_open_connection_retry_timer_event(void *data, uint64_t expirations) -{ - struct impl *impl = data; - pw_log_debug("trying again to open connection after previous attempt failed with ENODEV"); - stream_open_connection(impl, NULL); -} - -static void destroy_stream_start_retry_timer(struct impl *impl) -{ - if (impl->stream_start_retry_timer != NULL) { - pw_loop_destroy_source(impl->main_loop, impl->stream_start_retry_timer); - impl->stream_start_retry_timer = NULL; - } -} - static void stream_report_error(void *data, const char *error) { struct impl *impl = data; @@ -411,6 +401,15 @@ static void stream_report_error(void *data, const char *error) } } +static void stream_open_connection(void *data, int *result); + +static void on_open_connection_retry_timer_event(void *data) +{ + struct impl *impl = data; + pw_log_debug("trying again to open connection after previous attempt failed with ENODEV"); + stream_open_connection(impl, NULL); +} + static void stream_open_connection(void *data, int *result) { int res = 0; @@ -438,21 +437,12 @@ static void stream_open_connection(void *data, int *result) pw_log_warn("failed to create socket because network device is not ready " "and present yet; will try again"); - if (impl->stream_start_retry_timer == NULL) { - struct timespec value, interval; - - impl->stream_start_retry_timer = pw_loop_add_timer(impl->main_loop, - on_open_connection_retry_timer_event, impl); - /* Use a 1-second retry interval. The network interfaces - * are likely to be up and running then. */ - value.tv_sec = 1; - value.tv_nsec = 0; - interval.tv_sec = 1; - interval.tv_nsec = 0; - pw_loop_update_timer(impl->main_loop, impl->stream_start_retry_timer, &value, - &interval, false); - } - /* Do nothing if the timer is already up. */ + pw_timer_queue_cancel(&impl->stream_start_retry_timer); + /* Use a 1-second retry interval. The network interfaces + * are likely to be up and running then. */ + pw_timer_queue_add(impl->timer_queue, &impl->stream_start_retry_timer, + NULL, 1 * SPA_NSEC_PER_SEC, + on_open_connection_retry_timer_event, impl); /* It is important to return 0 in this case. Otherwise, the nonzero return * value will later be propagated through the core as an error. */ @@ -463,7 +453,7 @@ static void stream_open_connection(void *data, int *result) /* If ENODEV was returned earlier, and the stream_start_retry_timer * was consequently created, but then a non-ENODEV error occurred, * the timer must be stopped and removed. */ - destroy_stream_start_retry_timer(impl); + pw_timer_queue_cancel(&impl->stream_start_retry_timer); res = -errno; goto finish; } @@ -471,7 +461,7 @@ static void stream_open_connection(void *data, int *result) /* Cleanup the timer in case ENODEV occurred earlier, and this time, * the socket creation succeeded. */ - destroy_stream_start_retry_timer(impl); + pw_timer_queue_cancel(&impl->stream_start_retry_timer); impl->source = pw_loop_add_io(impl->data_loop, fd, SPA_IO_IN, true, on_rtp_io, impl); @@ -504,7 +494,7 @@ static void stream_close_connection(void *data, int *result) pw_log_info("stopping RTP listener"); - destroy_stream_start_retry_timer(impl); + pw_timer_queue_cancel(&impl->stream_start_retry_timer); pw_loop_destroy_source(impl->data_loop, impl->source); impl->source = NULL; @@ -586,14 +576,14 @@ static const struct rtp_stream_events stream_events = { .param_changed = stream_param_changed, }; -static void on_standby_timer_event(void *data, uint64_t expirations) +static void on_standby_timer_event(void *data) { struct impl *impl = data; - pw_log_debug("standby timer event; receiving: %d standby: %d waiting: %d", - impl->receiving, impl->standby, impl->waiting); + pw_log_debug("standby timer event; state: %d standby: %d waiting: %d", + impl->state, impl->standby, impl->waiting); - if (!impl->receiving) { + if (SPA_ATOMIC_CAS(impl->state, STATE_PROBE, STATE_STOPPING)) { if (!impl->standby) { struct spa_dict_item item[1]; @@ -608,10 +598,15 @@ static void on_standby_timer_event(void *data, uint64_t expirations) rtp_stream_set_active(impl->stream, false); } //pw_impl_module_schedule_destroy(impl->module); + SPA_ATOMIC_STORE(impl->state, STATE_IDLE); } else { pw_log_debug("timeout, keeping active RTP source"); + SPA_ATOMIC_CAS(impl->state, STATE_RECEIVING, STATE_PROBE); } - impl->receiving = false; + + pw_timer_queue_add(impl->timer_queue, &impl->standby_timer, + &impl->standby_timer.timeout, impl->cleanup_interval * SPA_NSEC_PER_SEC, + on_standby_timer_event, impl); } static void core_destroy(void *d) @@ -636,10 +631,8 @@ static void impl_destroy(struct impl *impl) if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); - if (impl->standby_timer) - pw_loop_destroy_source(impl->main_loop, impl->standby_timer); - - destroy_stream_start_retry_timer(impl); + pw_timer_queue_cancel(&impl->standby_timer); + pw_timer_queue_cancel(&impl->stream_start_retry_timer); if (impl->data_loop) pw_context_release_loop(impl->context, impl->data_loop); @@ -695,7 +688,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) struct pw_context *context = pw_impl_module_get_context(module); struct impl *impl; const char *str, *sess_name; - struct timespec value, interval; struct pw_properties *props, *stream_props; int64_t ts_offset; char addr[128]; @@ -722,6 +714,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->module = module; impl->context = context; impl->main_loop = pw_context_get_main_loop(context); + impl->timer_queue = pw_context_get_timer_queue(context); impl->data_loop = pw_context_acquire_loop(context, &props->dict); impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; @@ -830,17 +823,12 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) &impl->core_listener, &core_events, impl); - impl->standby_timer = pw_loop_add_timer(impl->main_loop, on_standby_timer_event, impl); - if (impl->standby_timer == NULL) { - res = -errno; - pw_log_error("can't create timer source: %m"); + if ((res = pw_timer_queue_add(impl->timer_queue, &impl->standby_timer, + NULL, impl->cleanup_interval * SPA_NSEC_PER_SEC, + on_standby_timer_event, impl)) < 0) { + pw_log_error("can't add timer: %s", spa_strerror(res)); goto out; } - value.tv_sec = impl->cleanup_interval; - value.tv_nsec = 0; - interval.tv_sec = impl->cleanup_interval; - interval.tv_nsec = 0; - pw_loop_update_timer(impl->main_loop, impl->standby_timer, &value, &interval, false); impl->stream = rtp_stream_new(impl->core, PW_DIRECTION_OUTPUT, pw_properties_copy(stream_props), diff --git a/src/modules/module-vban-recv.c b/src/modules/module-vban-recv.c index 53bad5252..26dc7b507 100644 --- a/src/modules/module-vban-recv.c +++ b/src/modules/module-vban-recv.c @@ -168,6 +168,7 @@ struct impl { struct pw_loop *main_loop; struct pw_loop *data_loop; + struct pw_timer_queue *timer_queue; struct pw_core *core; struct spa_hook core_listener; @@ -180,7 +181,7 @@ struct impl { struct pw_properties *stream_props; - struct spa_source *timer; + struct pw_timer timer; uint16_t src_port; struct sockaddr_storage src_addr; @@ -562,7 +563,7 @@ static void destroy_stream(struct stream *s) free(s); } -static void on_timer_event(void *data, uint64_t expirations) +static void on_timer_event(void *data) { struct impl *impl = data; struct stream *s; @@ -576,6 +577,9 @@ static void on_timer_event(void *data, uint64_t expirations) } s->receiving = false; } + pw_timer_queue_add(impl->timer_queue, &impl->timer, + &impl->timer.timeout, impl->cleanup_interval * SPA_NSEC_PER_SEC, + on_timer_event, impl); } static void core_destroy(void *d) @@ -602,8 +606,7 @@ static void impl_destroy(struct impl *impl) if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); - if (impl->timer) - pw_loop_destroy_source(impl->main_loop, impl->timer); + pw_timer_queue_cancel(&impl->timer); if (impl->data_loop) pw_context_release_loop(impl->context, impl->data_loop); @@ -658,7 +661,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) struct pw_context *context = pw_impl_module_get_context(module); struct impl *impl; const char *str; - struct timespec value, interval; struct pw_properties *props, *stream_props; int res = 0; @@ -682,6 +684,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->module = module; impl->context = context; impl->main_loop = pw_context_get_main_loop(context); + impl->timer_queue = pw_context_get_timer_queue(context); impl->data_loop = pw_context_acquire_loop(context, &props->dict); spa_list_init(&impl->streams); @@ -747,17 +750,12 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) &impl->core_listener, &core_events, impl); - impl->timer = pw_loop_add_timer(impl->main_loop, on_timer_event, impl); - if (impl->timer == NULL) { - res = -errno; - pw_log_error("can't create timer source: %m"); + if ((res = pw_timer_queue_add(impl->timer_queue, &impl->timer, + NULL, impl->cleanup_interval * SPA_NSEC_PER_SEC, + on_timer_event, impl)) < 0) { + pw_log_error("can't add timer: %s", spa_strerror(res)); goto out; } - value.tv_sec = impl->cleanup_interval; - value.tv_nsec = 0; - interval.tv_sec = impl->cleanup_interval; - interval.tv_nsec = 0; - pw_loop_update_timer(impl->main_loop, impl->timer, &value, &interval, false); if ((res = listen_start(impl)) < 0) { pw_log_error("failed to start VBAN stream: %s", spa_strerror(res));