mirror of
https://gitlab.freedesktop.org/pulseaudio/pulseaudio.git
synced 2025-11-05 13:29:57 -05:00
raop: Add autoreconnect feature
This patch adds the autoreconnect feature to the raop module. This is mainly to be used in a server context, but can be used also in a desktop usage context. With autoreconnect feature, the raop module behaves like this: - At initialisation or in case of the RTSP TCP connection lost, it tries to reconnect every 5 seconds - In case of any fatal error, it tries to reconnect every 5 seconds - In UDP mode, if no timing packets received anymore for a long time, RTSP connection is closed, then it tries to reconnect.. - After reconnection, once RTSP session has been established again, playing is resumed automatically. - When the connection is not established yet (or loss), the sink behaves like a null sink. In the source code I called it "autonull", even if autonull is set to autoreconnect param value, it could be split into two different params.
This commit is contained in:
parent
46dd3be8ce
commit
4854524058
6 changed files with 293 additions and 65 deletions
|
|
@ -62,6 +62,7 @@ static const char* const valid_modargs[] = {
|
||||||
"username",
|
"username",
|
||||||
"password",
|
"password",
|
||||||
"latency_msec",
|
"latency_msec",
|
||||||
|
"autoreconnect",
|
||||||
NULL
|
NULL
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -95,6 +95,7 @@ struct pa_raop_client {
|
||||||
pa_rtsp_client *rtsp;
|
pa_rtsp_client *rtsp;
|
||||||
char *sci, *sid;
|
char *sci, *sid;
|
||||||
char *password;
|
char *password;
|
||||||
|
bool autoreconnect;
|
||||||
|
|
||||||
pa_raop_protocol_t protocol;
|
pa_raop_protocol_t protocol;
|
||||||
pa_raop_encryption_t encryption;
|
pa_raop_encryption_t encryption;
|
||||||
|
|
@ -1379,8 +1380,39 @@ static void rtsp_auth_cb(pa_rtsp_client *rtsp, pa_rtsp_state_t state, pa_rtsp_st
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void pa_raop_client_disconnect(pa_raop_client *c) {
|
||||||
|
c->is_recording = false;
|
||||||
|
|
||||||
|
if (c->tcp_sfd >= 0)
|
||||||
|
pa_close(c->tcp_sfd);
|
||||||
|
c->tcp_sfd = -1;
|
||||||
|
|
||||||
|
if (c->udp_sfd >= 0)
|
||||||
|
pa_close(c->udp_sfd);
|
||||||
|
c->udp_sfd = -1;
|
||||||
|
|
||||||
|
/* Polling sockets will be closed by sink */
|
||||||
|
c->udp_cfd = c->udp_tfd = -1;
|
||||||
|
c->tcp_sfd = -1;
|
||||||
|
|
||||||
|
pa_log_error("RTSP control channel closed (disconnected)");
|
||||||
|
|
||||||
|
if (c->rtsp)
|
||||||
|
pa_rtsp_client_free(c->rtsp);
|
||||||
|
if (c->sid)
|
||||||
|
pa_xfree(c->sid);
|
||||||
|
c->rtsp = NULL;
|
||||||
|
c->sid = NULL;
|
||||||
|
|
||||||
|
if (c->state_callback)
|
||||||
|
c->state_callback((int) PA_RAOP_DISCONNECTED, c->state_userdata);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, pa_raop_protocol_t protocol,
|
pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, pa_raop_protocol_t protocol,
|
||||||
pa_raop_encryption_t encryption, pa_raop_codec_t codec) {
|
pa_raop_encryption_t encryption, pa_raop_codec_t codec, bool autoreconnect) {
|
||||||
pa_raop_client *c;
|
pa_raop_client *c;
|
||||||
|
|
||||||
pa_parsed_address a;
|
pa_parsed_address a;
|
||||||
|
|
@ -1408,6 +1440,7 @@ pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, pa_raop_prot
|
||||||
c->rtsp = NULL;
|
c->rtsp = NULL;
|
||||||
c->sci = c->sid = NULL;
|
c->sci = c->sid = NULL;
|
||||||
c->password = NULL;
|
c->password = NULL;
|
||||||
|
c->autoreconnect = autoreconnect;
|
||||||
|
|
||||||
c->protocol = protocol;
|
c->protocol = protocol;
|
||||||
c->encryption = encryption;
|
c->encryption = encryption;
|
||||||
|
|
@ -1473,7 +1506,7 @@ int pa_raop_client_authenticate (pa_raop_client *c, const char *password) {
|
||||||
c->password = NULL;
|
c->password = NULL;
|
||||||
if (password)
|
if (password)
|
||||||
c->password = pa_xstrdup(password);
|
c->password = pa_xstrdup(password);
|
||||||
c->rtsp = pa_rtsp_client_new(c->core->mainloop, c->host, c->port, DEFAULT_USER_AGENT);
|
c->rtsp = pa_rtsp_client_new(c->core->mainloop, c->host, c->port, DEFAULT_USER_AGENT, c->autoreconnect);
|
||||||
|
|
||||||
pa_assert(c->rtsp);
|
pa_assert(c->rtsp);
|
||||||
|
|
||||||
|
|
@ -1502,7 +1535,7 @@ int pa_raop_client_announce(pa_raop_client *c) {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
c->rtsp = pa_rtsp_client_new(c->core->mainloop, c->host, c->port, DEFAULT_USER_AGENT);
|
c->rtsp = pa_rtsp_client_new(c->core->mainloop, c->host, c->port, DEFAULT_USER_AGENT, c->autoreconnect);
|
||||||
|
|
||||||
pa_assert(c->rtsp);
|
pa_assert(c->rtsp);
|
||||||
|
|
||||||
|
|
@ -1545,7 +1578,6 @@ bool pa_raop_client_can_stream(pa_raop_client *c) {
|
||||||
pa_assert(c);
|
pa_assert(c);
|
||||||
|
|
||||||
if (!c->rtsp || !c->sci) {
|
if (!c->rtsp || !c->sci) {
|
||||||
pa_log_debug("Can't stream, connection not established yet...");
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1729,6 +1761,10 @@ bool pa_raop_client_register_pollfd(pa_raop_client *c, pa_rtpoll *poll, pa_rtpol
|
||||||
return oob;
|
return oob;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool pa_raop_client_is_timing_fd(pa_raop_client *c, const int fd) {
|
||||||
|
return fd == c->udp_tfd;
|
||||||
|
}
|
||||||
|
|
||||||
pa_volume_t pa_raop_client_adjust_volume(pa_raop_client *c, pa_volume_t volume) {
|
pa_volume_t pa_raop_client_adjust_volume(pa_raop_client *c, pa_volume_t volume) {
|
||||||
double minv, maxv;
|
double minv, maxv;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -57,7 +57,7 @@ typedef enum pa_raop_state {
|
||||||
} pa_raop_state_t;
|
} pa_raop_state_t;
|
||||||
|
|
||||||
pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, pa_raop_protocol_t protocol,
|
pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, pa_raop_protocol_t protocol,
|
||||||
pa_raop_encryption_t encryption, pa_raop_codec_t codec);
|
pa_raop_encryption_t encryption, pa_raop_codec_t codec, bool autoreconnect);
|
||||||
void pa_raop_client_free(pa_raop_client *c);
|
void pa_raop_client_free(pa_raop_client *c);
|
||||||
|
|
||||||
int pa_raop_client_authenticate(pa_raop_client *c, const char *password);
|
int pa_raop_client_authenticate(pa_raop_client *c, const char *password);
|
||||||
|
|
@ -71,9 +71,11 @@ int pa_raop_client_stream(pa_raop_client *c);
|
||||||
int pa_raop_client_set_volume(pa_raop_client *c, pa_volume_t volume);
|
int pa_raop_client_set_volume(pa_raop_client *c, pa_volume_t volume);
|
||||||
int pa_raop_client_flush(pa_raop_client *c);
|
int pa_raop_client_flush(pa_raop_client *c);
|
||||||
int pa_raop_client_teardown(pa_raop_client *c);
|
int pa_raop_client_teardown(pa_raop_client *c);
|
||||||
|
void pa_raop_client_disconnect(pa_raop_client *c);
|
||||||
|
|
||||||
void pa_raop_client_get_frames_per_block(pa_raop_client *c, size_t *size);
|
void pa_raop_client_get_frames_per_block(pa_raop_client *c, size_t *size);
|
||||||
bool pa_raop_client_register_pollfd(pa_raop_client *c, pa_rtpoll *poll, pa_rtpoll_item **poll_item);
|
bool pa_raop_client_register_pollfd(pa_raop_client *c, pa_rtpoll *poll, pa_rtpoll_item **poll_item);
|
||||||
|
bool pa_raop_client_is_timing_fd(pa_raop_client *c, const int fd);
|
||||||
pa_volume_t pa_raop_client_adjust_volume(pa_raop_client *c, pa_volume_t volume);
|
pa_volume_t pa_raop_client_adjust_volume(pa_raop_client *c, pa_volume_t volume);
|
||||||
void pa_raop_client_handle_oob_packet(pa_raop_client *c, const int fd, const uint8_t packet[], ssize_t size);
|
void pa_raop_client_handle_oob_packet(pa_raop_client *c, const int fd, const uint8_t packet[], ssize_t size);
|
||||||
ssize_t pa_raop_client_send_audio_packet(pa_raop_client *c, pa_memchunk *block, size_t offset);
|
ssize_t pa_raop_client_send_audio_packet(pa_raop_client *c, pa_memchunk *block, size_t offset);
|
||||||
|
|
|
||||||
|
|
@ -59,12 +59,16 @@
|
||||||
#include <pulsecore/thread-mq.h>
|
#include <pulsecore/thread-mq.h>
|
||||||
#include <pulsecore/poll.h>
|
#include <pulsecore/poll.h>
|
||||||
#include <pulsecore/rtpoll.h>
|
#include <pulsecore/rtpoll.h>
|
||||||
|
#include <pulsecore/core-rtclock.h>
|
||||||
#include <pulsecore/time-smoother.h>
|
#include <pulsecore/time-smoother.h>
|
||||||
|
|
||||||
#include "raop-sink.h"
|
#include "raop-sink.h"
|
||||||
#include "raop-client.h"
|
#include "raop-client.h"
|
||||||
#include "raop-util.h"
|
#include "raop-util.h"
|
||||||
|
|
||||||
|
#define UDP_TIMING_PACKET_LOSS_MAX (30 * PA_USEC_PER_SEC)
|
||||||
|
#define UDP_TIMING_PACKET_DISCONNECT_CYCLE 3
|
||||||
|
|
||||||
struct userdata {
|
struct userdata {
|
||||||
pa_core *core;
|
pa_core *core;
|
||||||
pa_module *module;
|
pa_module *module;
|
||||||
|
|
@ -78,11 +82,16 @@ struct userdata {
|
||||||
bool oob;
|
bool oob;
|
||||||
|
|
||||||
pa_raop_client *raop;
|
pa_raop_client *raop;
|
||||||
|
char *server;
|
||||||
pa_raop_protocol_t protocol;
|
pa_raop_protocol_t protocol;
|
||||||
pa_raop_encryption_t encryption;
|
pa_raop_encryption_t encryption;
|
||||||
pa_raop_codec_t codec;
|
pa_raop_codec_t codec;
|
||||||
|
bool autoreconnect;
|
||||||
|
/* if true, behaves like a null-sink when disconnected */
|
||||||
|
bool autonull;
|
||||||
|
|
||||||
size_t block_size;
|
size_t block_size;
|
||||||
|
pa_usec_t block_usec;
|
||||||
pa_memchunk memchunk;
|
pa_memchunk memchunk;
|
||||||
|
|
||||||
pa_usec_t delay;
|
pa_usec_t delay;
|
||||||
|
|
@ -91,10 +100,13 @@ struct userdata {
|
||||||
uint64_t write_count;
|
uint64_t write_count;
|
||||||
|
|
||||||
uint32_t latency;
|
uint32_t latency;
|
||||||
|
/* Consider as first I/O thread iteration, can be switched to true in autoreconnect mode */
|
||||||
|
bool first;
|
||||||
};
|
};
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
PA_SINK_MESSAGE_SET_RAOP_STATE = PA_SINK_MESSAGE_MAX
|
PA_SINK_MESSAGE_SET_RAOP_STATE = PA_SINK_MESSAGE_MAX,
|
||||||
|
PA_SINK_MESSAGE_DISCONNECT_REQUEST
|
||||||
};
|
};
|
||||||
|
|
||||||
static void userdata_free(struct userdata *u);
|
static void userdata_free(struct userdata *u);
|
||||||
|
|
@ -136,10 +148,23 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
|
||||||
pa_assert(u->raop);
|
pa_assert(u->raop);
|
||||||
|
|
||||||
switch (code) {
|
switch (code) {
|
||||||
|
/* Exception : for this message, we are in main thread, msg sent from the IO/thread
|
||||||
|
Done here, as alloc/free of rtsp_client is also done in this thread for other cases */
|
||||||
|
case PA_SINK_MESSAGE_DISCONNECT_REQUEST: {
|
||||||
|
if (u->sink->state == PA_SINK_RUNNING) {
|
||||||
|
/* Disconnect raop client, and restart the whole chain since
|
||||||
|
* the authentication token might be outdated */
|
||||||
|
pa_raop_client_disconnect(u->raop);
|
||||||
|
pa_raop_client_authenticate(u->raop, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
case PA_SINK_MESSAGE_GET_LATENCY: {
|
case PA_SINK_MESSAGE_GET_LATENCY: {
|
||||||
int64_t r = 0;
|
int64_t r = 0;
|
||||||
|
|
||||||
if (pa_raop_client_can_stream(u->raop))
|
if (u->autonull || pa_raop_client_can_stream(u->raop))
|
||||||
r = sink_get_latency(u);
|
r = sink_get_latency(u);
|
||||||
|
|
||||||
*((int64_t*) data) = r;
|
*((int64_t*) data) = r;
|
||||||
|
|
@ -154,6 +179,17 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
|
||||||
pa_module_unload_request(u->module, true);
|
pa_module_unload_request(u->module, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (u->autoreconnect && u->sink->state == PA_SINK_RUNNING) {
|
||||||
|
pa_usec_t now;
|
||||||
|
now = pa_rtclock_now();
|
||||||
|
pa_smoother_reset(u->smoother, now, false);
|
||||||
|
|
||||||
|
if (!pa_raop_client_is_alive(u->raop)) {
|
||||||
|
/* Connecting will trigger a RECORD and start steaming */
|
||||||
|
pa_raop_client_announce(u->raop);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -171,6 +207,7 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
|
||||||
now = pa_rtclock_now();
|
now = pa_rtclock_now();
|
||||||
u->write_count = 0;
|
u->write_count = 0;
|
||||||
u->start = now;
|
u->start = now;
|
||||||
|
u->first = true;
|
||||||
pa_rtpoll_set_timer_absolute(u->rtpoll, now);
|
pa_rtpoll_set_timer_absolute(u->rtpoll, now);
|
||||||
|
|
||||||
if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
|
if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
|
||||||
|
|
@ -205,10 +242,22 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
|
||||||
u->rtpoll_item = NULL;
|
u->rtpoll_item = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (u->sink->thread_info.state == PA_SINK_SUSPENDED)
|
if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
|
||||||
pa_rtpoll_set_timer_disabled(u->rtpoll);
|
pa_rtpoll_set_timer_disabled(u->rtpoll);
|
||||||
else if (u->sink->thread_info.state != PA_SINK_IDLE)
|
|
||||||
pa_module_unload_request(u->module, true);
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (u->autoreconnect) {
|
||||||
|
if (u->sink->thread_info.state != PA_SINK_IDLE) {
|
||||||
|
if (!u->autonull)
|
||||||
|
pa_rtpoll_set_timer_disabled(u->rtpoll);
|
||||||
|
pa_raop_client_authenticate(u->raop, NULL);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (u->sink->thread_info.state != PA_SINK_IDLE)
|
||||||
|
pa_module_unload_request(u->module, true);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
@ -265,8 +314,17 @@ static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state,
|
||||||
now = pa_rtclock_now();
|
now = pa_rtclock_now();
|
||||||
pa_smoother_reset(u->smoother, now, false);
|
pa_smoother_reset(u->smoother, now, false);
|
||||||
|
|
||||||
|
/* If autonull is enabled, I/O thread is always eating chunks since
|
||||||
|
* it is emulating a null sink */
|
||||||
|
if (u->autonull) {
|
||||||
|
u->start = now;
|
||||||
|
u->write_count = 0;
|
||||||
|
u->first = true;
|
||||||
|
pa_rtpoll_set_timer_absolute(u->rtpoll, now);
|
||||||
|
}
|
||||||
|
|
||||||
if (!pa_raop_client_is_alive(u->raop)) {
|
if (!pa_raop_client_is_alive(u->raop)) {
|
||||||
/* Connecting will trigger a RECORD and start steaming */
|
/* Connecting will trigger a RECORD and start streaming */
|
||||||
pa_raop_client_announce(u->raop);
|
pa_raop_client_announce(u->raop);
|
||||||
} else if (!pa_raop_client_is_recording(u->raop)) {
|
} else if (!pa_raop_client_is_recording(u->raop)) {
|
||||||
/* RECORD alredy sent, simply start streaming */
|
/* RECORD alredy sent, simply start streaming */
|
||||||
|
|
@ -342,6 +400,8 @@ static void sink_set_mute_cb(pa_sink *s) {
|
||||||
static void thread_func(void *userdata) {
|
static void thread_func(void *userdata) {
|
||||||
struct userdata *u = userdata;
|
struct userdata *u = userdata;
|
||||||
size_t offset = 0;
|
size_t offset = 0;
|
||||||
|
pa_usec_t last_timing;
|
||||||
|
uint32_t check_timing_count;
|
||||||
|
|
||||||
pa_assert(u);
|
pa_assert(u);
|
||||||
|
|
||||||
|
|
@ -357,6 +417,7 @@ static void thread_func(void *userdata) {
|
||||||
uint64_t position;
|
uint64_t position;
|
||||||
size_t index;
|
size_t index;
|
||||||
int ret;
|
int ret;
|
||||||
|
bool canstream, sendstream, on_timeout;
|
||||||
|
|
||||||
/* Polling (audio data + control socket + timing socket). */
|
/* Polling (audio data + control socket + timing socket). */
|
||||||
if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
|
if ((ret = pa_rtpoll_run(u->rtpoll)) < 0)
|
||||||
|
|
@ -369,6 +430,7 @@ static void thread_func(void *userdata) {
|
||||||
pa_sink_process_rewind(u->sink, 0);
|
pa_sink_process_rewind(u->sink, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
on_timeout = pa_rtpoll_timer_elapsed(u->rtpoll);
|
||||||
if (u->rtpoll_item) {
|
if (u->rtpoll_item) {
|
||||||
pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, &nbfds);
|
pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, &nbfds);
|
||||||
/* If !oob: streaming driven by pollds (POLLOUT) */
|
/* If !oob: streaming driven by pollds (POLLOUT) */
|
||||||
|
|
@ -384,12 +446,19 @@ static void thread_func(void *userdata) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if oob: streaming managed by timing, pollfd for oob sockets */
|
/* if oob: streaming managed by timing, pollfd for oob sockets */
|
||||||
if (pollfd && u->oob && !pa_rtpoll_timer_elapsed(u->rtpoll)) {
|
if (pollfd && u->oob && !on_timeout) {
|
||||||
uint8_t packet[32];
|
uint8_t packet[32];
|
||||||
ssize_t read;
|
ssize_t read;
|
||||||
|
|
||||||
for (i = 0; i < nbfds; i++) {
|
for (i = 0; i < nbfds; i++) {
|
||||||
if (pollfd->revents & POLLERR) {
|
if (pollfd->revents & POLLERR) {
|
||||||
|
if (u->autoreconnect && pa_raop_client_is_alive(u->raop)) {
|
||||||
|
pollfd->revents = 0;
|
||||||
|
pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink),
|
||||||
|
PA_SINK_MESSAGE_DISCONNECT_REQUEST, 0, 0, NULL, NULL);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
/* one of UDP fds is in faulty state, may have been disconnected, this is fatal */
|
/* one of UDP fds is in faulty state, may have been disconnected, this is fatal */
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
|
|
@ -397,6 +466,10 @@ static void thread_func(void *userdata) {
|
||||||
pollfd->revents = 0;
|
pollfd->revents = 0;
|
||||||
read = pa_read(pollfd->fd, packet, sizeof(packet), NULL);
|
read = pa_read(pollfd->fd, packet, sizeof(packet), NULL);
|
||||||
pa_raop_client_handle_oob_packet(u->raop, pollfd->fd, packet, read);
|
pa_raop_client_handle_oob_packet(u->raop, pollfd->fd, packet, read);
|
||||||
|
if (pa_raop_client_is_timing_fd(u->raop, pollfd->fd)) {
|
||||||
|
last_timing = pa_rtclock_now();
|
||||||
|
check_timing_count = 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pollfd++;
|
pollfd++;
|
||||||
|
|
@ -406,65 +479,133 @@ static void thread_func(void *userdata) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (u->sink->thread_info.state != PA_SINK_RUNNING)
|
if (u->sink->thread_info.state != PA_SINK_RUNNING) {
|
||||||
continue;
|
continue;
|
||||||
if (!pa_raop_client_can_stream(u->raop))
|
|
||||||
continue;
|
|
||||||
|
|
||||||
/* This assertion is meant to silence a complaint from Coverity about
|
|
||||||
* pollfd being possibly NULL when we access it later. That's a false
|
|
||||||
* positive, because we check pa_raop_client_can_stream() above, and if
|
|
||||||
* that returns true, it means that the connection is up, and when the
|
|
||||||
* connection is up, pollfd will be non-NULL. */
|
|
||||||
pa_assert(pollfd);
|
|
||||||
|
|
||||||
if (u->memchunk.length <= 0) {
|
|
||||||
if (u->memchunk.memblock)
|
|
||||||
pa_memblock_unref(u->memchunk.memblock);
|
|
||||||
pa_memchunk_reset(&u->memchunk);
|
|
||||||
|
|
||||||
/* Grab unencoded audio data from PulseAudio */
|
|
||||||
pa_sink_render_full(u->sink, u->block_size, &u->memchunk);
|
|
||||||
offset = u->memchunk.index;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pa_assert(u->memchunk.length > 0);
|
if (u->first) {
|
||||||
|
last_timing = 0;
|
||||||
|
check_timing_count = 1;
|
||||||
|
intvl = 0;
|
||||||
|
u->first = false;
|
||||||
|
}
|
||||||
|
|
||||||
index = u->memchunk.index;
|
canstream = pa_raop_client_can_stream(u->raop);
|
||||||
if (pa_raop_client_send_audio_packet(u->raop, &u->memchunk, offset) < 0) {
|
now = pa_rtclock_now();
|
||||||
if (errno == EINTR) {
|
|
||||||
/* Just try again. */
|
if (u->oob && u->autoreconnect && on_timeout) {
|
||||||
pa_log_debug("Failed to write data to FIFO (EINTR), retrying");
|
if (!canstream) {
|
||||||
goto fail;
|
last_timing = 0;
|
||||||
} else if (errno != EAGAIN && !u->oob) {
|
} else if (last_timing != 0) {
|
||||||
/* Buffer is full, wait for POLLOUT. */
|
pa_usec_t since = now - last_timing;
|
||||||
pollfd->events = POLLOUT;
|
/* Incoming Timing packets should be received every 3 seconds in UDP mode
|
||||||
pollfd->revents = 0;
|
according to raop specifications.
|
||||||
} else {
|
Here we disconnect if no packet received since UDP_TIMING_PACKET_LOSS_MAX seconds
|
||||||
pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
|
We only detect timing packet requests interruptions (we do nothing if no packet received at all), since some clients do not implement RTCP Timing requests at all */
|
||||||
goto fail;
|
|
||||||
|
if (since > (UDP_TIMING_PACKET_LOSS_MAX/UDP_TIMING_PACKET_DISCONNECT_CYCLE)*check_timing_count) {
|
||||||
|
if (check_timing_count < UDP_TIMING_PACKET_DISCONNECT_CYCLE) {
|
||||||
|
uint32_t since_in_sec = since / PA_USEC_PER_SEC;
|
||||||
|
pa_log_warn(
|
||||||
|
"UDP Timing Packets Warn #%d/%d- Nothing received since %d seconds from %s",
|
||||||
|
check_timing_count,
|
||||||
|
UDP_TIMING_PACKET_DISCONNECT_CYCLE-1, since_in_sec, u->server);
|
||||||
|
check_timing_count++;
|
||||||
|
} else {
|
||||||
|
/* Limit reached, then request disconnect */
|
||||||
|
check_timing_count = 1;
|
||||||
|
last_timing = 0;
|
||||||
|
if (pa_raop_client_is_alive(u->raop)) {
|
||||||
|
pa_log_warn("UDP Timing Packets Warn limit reached - Requesting reconnect");
|
||||||
|
pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink),
|
||||||
|
PA_SINK_MESSAGE_DISCONNECT_REQUEST, 0, 0, NULL, NULL);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
u->write_count += (uint64_t) u->memchunk.index - (uint64_t) index;
|
|
||||||
position = u->write_count - pa_usec_to_bytes(u->delay, &u->sink->sample_spec);
|
|
||||||
|
|
||||||
now = pa_rtclock_now();
|
if (!u->autonull) {
|
||||||
estimated = pa_bytes_to_usec(position, &u->sink->sample_spec);
|
if (!canstream) {
|
||||||
pa_smoother_put(u->smoother, now, estimated);
|
pa_log_debug("Can't stream, connection not established yet...");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
/* This assertion is meant to silence a complaint from Coverity about
|
||||||
|
* pollfd being possibly NULL when we access it later. That's a false
|
||||||
|
* positive, because we check pa_raop_client_can_stream() above, and if
|
||||||
|
* that returns true, it means that the connection is up, and when the
|
||||||
|
* connection is up, pollfd will be non-NULL. */
|
||||||
|
pa_assert(pollfd);
|
||||||
|
}
|
||||||
|
|
||||||
if (u->oob && !pollfd->revents) {
|
if (u->memchunk.length <= 0) {
|
||||||
/* Sleep until next packet transmission */
|
if (intvl < now + u->block_usec) {
|
||||||
intvl = u->start + pa_bytes_to_usec(u->write_count, &u->sink->sample_spec);
|
if (u->memchunk.memblock)
|
||||||
pa_rtpoll_set_timer_absolute(u->rtpoll, intvl);
|
pa_memblock_unref(u->memchunk.memblock);
|
||||||
} else if (!u->oob) {
|
pa_memchunk_reset(&u->memchunk);
|
||||||
if (u->memchunk.length > 0) {
|
|
||||||
pollfd->events = POLLOUT;
|
/* Grab unencoded audio data from PulseAudio */
|
||||||
pollfd->revents = 0;
|
pa_sink_render_full(u->sink, u->block_size, &u->memchunk);
|
||||||
|
offset = u->memchunk.index;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (u->memchunk.length > 0) {
|
||||||
|
index = u->memchunk.index;
|
||||||
|
sendstream = !u->autonull || (u->autonull && canstream);
|
||||||
|
if (sendstream && pa_raop_client_send_audio_packet(u->raop, &u->memchunk, offset) < 0) {
|
||||||
|
if (errno == EINTR) {
|
||||||
|
/* Just try again. */
|
||||||
|
pa_log_debug("Failed to write data to FIFO (EINTR), retrying");
|
||||||
|
if (u->autoreconnect) {
|
||||||
|
pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_DISCONNECT_REQUEST,
|
||||||
|
0, 0, NULL, NULL);
|
||||||
|
continue;
|
||||||
|
} else
|
||||||
|
goto fail;
|
||||||
|
} else if (errno != EAGAIN && !u->oob) {
|
||||||
|
/* Buffer is full, wait for POLLOUT. */
|
||||||
|
if (!u->oob) {
|
||||||
|
pollfd->events = POLLOUT;
|
||||||
|
pollfd->revents = 0;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
|
pa_log("Failed to write data to FIFO: %s", pa_cstrerror(errno));
|
||||||
|
if (u->autoreconnect) {
|
||||||
|
pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->sink), PA_SINK_MESSAGE_DISCONNECT_REQUEST,
|
||||||
|
0, 0, NULL, NULL);
|
||||||
|
continue;
|
||||||
|
} else
|
||||||
|
goto fail;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (sendstream) {
|
||||||
|
u->write_count += (uint64_t) u->memchunk.index - (uint64_t) index;
|
||||||
|
} else {
|
||||||
|
u->write_count += u->memchunk.length;
|
||||||
|
u->memchunk.length = 0;
|
||||||
|
}
|
||||||
|
position = u->write_count - pa_usec_to_bytes(u->delay, &u->sink->sample_spec);
|
||||||
|
|
||||||
|
now = pa_rtclock_now();
|
||||||
|
estimated = pa_bytes_to_usec(position, &u->sink->sample_spec);
|
||||||
|
pa_smoother_put(u->smoother, now, estimated);
|
||||||
|
|
||||||
|
if ((u->autonull && !canstream) || (u->oob && canstream && on_timeout)) {
|
||||||
|
/* Sleep until next packet transmission */
|
||||||
intvl = u->start + pa_bytes_to_usec(u->write_count, &u->sink->sample_spec);
|
intvl = u->start + pa_bytes_to_usec(u->write_count, &u->sink->sample_spec);
|
||||||
pa_rtpoll_set_timer_absolute(u->rtpoll, intvl);
|
pa_rtpoll_set_timer_absolute(u->rtpoll, intvl);
|
||||||
pollfd->revents = 0;
|
} else if (!u->oob) {
|
||||||
pollfd->events = 0;
|
if (u->memchunk.length > 0) {
|
||||||
|
pollfd->events = POLLOUT;
|
||||||
|
pollfd->revents = 0;
|
||||||
|
} else {
|
||||||
|
intvl = u->start + pa_bytes_to_usec(u->write_count, &u->sink->sample_spec);
|
||||||
|
pa_rtpoll_set_timer_absolute(u->rtpoll, intvl);
|
||||||
|
pollfd->revents = 0;
|
||||||
|
pollfd->events = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -592,6 +733,15 @@ pa_sink* pa_raop_sink_new(pa_module *m, pa_modargs *ma, const char *driver) {
|
||||||
u->rtpoll = pa_rtpoll_new();
|
u->rtpoll = pa_rtpoll_new();
|
||||||
u->rtpoll_item = NULL;
|
u->rtpoll_item = NULL;
|
||||||
u->latency = RAOP_DEFAULT_LATENCY;
|
u->latency = RAOP_DEFAULT_LATENCY;
|
||||||
|
u->autoreconnect = false;
|
||||||
|
u->server = pa_xstrdup(server);
|
||||||
|
|
||||||
|
if (pa_modargs_get_value_boolean(ma, "autoreconnect", &u->autoreconnect) < 0) {
|
||||||
|
pa_log("Failed to parse autoreconnect argument");
|
||||||
|
goto fail;
|
||||||
|
}
|
||||||
|
/* Linked for now, potentially ready for additional parameter */
|
||||||
|
u->autonull = u->autoreconnect;
|
||||||
|
|
||||||
if (pa_modargs_get_value_u32(ma, "latency_msec", &u->latency) < 0) {
|
if (pa_modargs_get_value_u32(ma, "latency_msec", &u->latency) < 0) {
|
||||||
pa_log("Failed to parse latency_msec argument");
|
pa_log("Failed to parse latency_msec argument");
|
||||||
|
|
@ -723,7 +873,7 @@ pa_sink* pa_raop_sink_new(pa_module *m, pa_modargs *ma, const char *driver) {
|
||||||
pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
|
pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
|
||||||
pa_sink_set_rtpoll(u->sink, u->rtpoll);
|
pa_sink_set_rtpoll(u->sink, u->rtpoll);
|
||||||
|
|
||||||
u->raop = pa_raop_client_new(u->core, server, u->protocol, u->encryption, u->codec);
|
u->raop = pa_raop_client_new(u->core, server, u->protocol, u->encryption, u->codec, u->autoreconnect);
|
||||||
|
|
||||||
if (!(u->raop)) {
|
if (!(u->raop)) {
|
||||||
pa_log("Failed to create RAOP client object");
|
pa_log("Failed to create RAOP client object");
|
||||||
|
|
@ -734,6 +884,7 @@ pa_sink* pa_raop_sink_new(pa_module *m, pa_modargs *ma, const char *driver) {
|
||||||
pa_raop_client_get_frames_per_block(u->raop, &u->block_size);
|
pa_raop_client_get_frames_per_block(u->raop, &u->block_size);
|
||||||
u->block_size *= pa_frame_size(&ss);
|
u->block_size *= pa_frame_size(&ss);
|
||||||
pa_sink_set_max_request(u->sink, u->block_size);
|
pa_sink_set_max_request(u->sink, u->block_size);
|
||||||
|
u->block_usec = pa_bytes_to_usec(u->block_size, &u->sink->sample_spec);
|
||||||
|
|
||||||
pa_raop_client_set_state_callback(u->raop, raop_state_cb, u);
|
pa_raop_client_set_state_callback(u->raop, raop_state_cb, u);
|
||||||
|
|
||||||
|
|
@ -799,6 +950,8 @@ static void userdata_free(struct userdata *u) {
|
||||||
|
|
||||||
if (u->card)
|
if (u->card)
|
||||||
pa_card_free(u->card);
|
pa_card_free(u->card);
|
||||||
|
if (u->server)
|
||||||
|
pa_xfree(u->server);
|
||||||
|
|
||||||
pa_xfree(u);
|
pa_xfree(u);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,8 @@
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <sys/ioctl.h>
|
#include <sys/ioctl.h>
|
||||||
#include <netinet/in.h>
|
#include <netinet/in.h>
|
||||||
|
#include <pulse/rtclock.h>
|
||||||
|
#include <pulse/timeval.h>
|
||||||
|
|
||||||
#ifdef HAVE_SYS_FILIO_H
|
#ifdef HAVE_SYS_FILIO_H
|
||||||
#include <sys/filio.h>
|
#include <sys/filio.h>
|
||||||
|
|
@ -42,9 +44,12 @@
|
||||||
#include <pulsecore/ioline.h>
|
#include <pulsecore/ioline.h>
|
||||||
#include <pulsecore/arpa-inet.h>
|
#include <pulsecore/arpa-inet.h>
|
||||||
#include <pulsecore/random.h>
|
#include <pulsecore/random.h>
|
||||||
|
#include <pulsecore/core-rtclock.h>
|
||||||
|
|
||||||
#include "rtsp_client.h"
|
#include "rtsp_client.h"
|
||||||
|
|
||||||
|
#define RECONNECT_INTERVAL (5 * PA_USEC_PER_SEC)
|
||||||
|
|
||||||
struct pa_rtsp_client {
|
struct pa_rtsp_client {
|
||||||
pa_mainloop_api *mainloop;
|
pa_mainloop_api *mainloop;
|
||||||
char *hostname;
|
char *hostname;
|
||||||
|
|
@ -73,9 +78,11 @@ struct pa_rtsp_client {
|
||||||
uint32_t cseq;
|
uint32_t cseq;
|
||||||
char *session;
|
char *session;
|
||||||
char *transport;
|
char *transport;
|
||||||
|
pa_time_event *reconnect_event;
|
||||||
|
bool autoreconnect;
|
||||||
};
|
};
|
||||||
|
|
||||||
pa_rtsp_client* pa_rtsp_client_new(pa_mainloop_api *mainloop, const char *hostname, uint16_t port, const char *useragent) {
|
pa_rtsp_client* pa_rtsp_client_new(pa_mainloop_api *mainloop, const char *hostname, uint16_t port, const char *useragent, bool autoreconnect) {
|
||||||
pa_rtsp_client *c;
|
pa_rtsp_client *c;
|
||||||
|
|
||||||
pa_assert(mainloop);
|
pa_assert(mainloop);
|
||||||
|
|
@ -93,12 +100,23 @@ pa_rtsp_client* pa_rtsp_client_new(pa_mainloop_api *mainloop, const char *hostna
|
||||||
else
|
else
|
||||||
c->useragent = "PulseAudio RTSP Client";
|
c->useragent = "PulseAudio RTSP Client";
|
||||||
|
|
||||||
|
c->autoreconnect = autoreconnect;
|
||||||
return c;
|
return c;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void free_events(pa_rtsp_client *c) {
|
||||||
|
pa_assert(c);
|
||||||
|
|
||||||
|
if (c->reconnect_event) {
|
||||||
|
c->mainloop->time_free(c->reconnect_event);
|
||||||
|
c->reconnect_event = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void pa_rtsp_client_free(pa_rtsp_client *c) {
|
void pa_rtsp_client_free(pa_rtsp_client *c) {
|
||||||
pa_assert(c);
|
pa_assert(c);
|
||||||
|
|
||||||
|
free_events(c);
|
||||||
if (c->sc)
|
if (c->sc)
|
||||||
pa_socket_client_unref(c->sc);
|
pa_socket_client_unref(c->sc);
|
||||||
|
|
||||||
|
|
@ -293,6 +311,13 @@ static void line_callback(pa_ioline *line, const char *s, void *userdata) {
|
||||||
pa_xfree(s2);
|
pa_xfree(s2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void reconnect_cb(pa_mainloop_api *a, pa_time_event *e, const struct timeval *t, void *userdata) {
|
||||||
|
if (userdata) {
|
||||||
|
pa_rtsp_client *c = userdata;
|
||||||
|
pa_rtsp_connect(c);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
|
static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata) {
|
||||||
pa_rtsp_client *c = userdata;
|
pa_rtsp_client *c = userdata;
|
||||||
union {
|
union {
|
||||||
|
|
@ -310,7 +335,18 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata
|
||||||
c->sc = NULL;
|
c->sc = NULL;
|
||||||
|
|
||||||
if (!io) {
|
if (!io) {
|
||||||
pa_log("Connection failed: %s", pa_cstrerror(errno));
|
if (c->autoreconnect) {
|
||||||
|
struct timeval tv;
|
||||||
|
|
||||||
|
pa_log_warn("Connection to server %s:%d failed: %s - will try later", c->hostname, c->port, pa_cstrerror(errno));
|
||||||
|
|
||||||
|
if (!c->reconnect_event)
|
||||||
|
c->reconnect_event = c->mainloop->time_new(c->mainloop, pa_timeval_rtstore(&tv, pa_rtclock_now() + RECONNECT_INTERVAL, true), reconnect_cb, c);
|
||||||
|
else
|
||||||
|
c->mainloop->time_restart(c->reconnect_event, pa_timeval_rtstore(&tv, pa_rtclock_now() + RECONNECT_INTERVAL, true));
|
||||||
|
} else {
|
||||||
|
pa_log("Connection to server %s:%d failed: %s", c->hostname, c->port, pa_cstrerror(errno));
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
pa_assert(!c->ioline);
|
pa_assert(!c->ioline);
|
||||||
|
|
|
||||||
|
|
@ -54,7 +54,7 @@ typedef enum pa_rtsp_status {
|
||||||
|
|
||||||
typedef void (*pa_rtsp_cb_t)(pa_rtsp_client *c, pa_rtsp_state_t state, pa_rtsp_status_t code, pa_headerlist *headers, void *userdata);
|
typedef void (*pa_rtsp_cb_t)(pa_rtsp_client *c, pa_rtsp_state_t state, pa_rtsp_status_t code, pa_headerlist *headers, void *userdata);
|
||||||
|
|
||||||
pa_rtsp_client* pa_rtsp_client_new(pa_mainloop_api *mainloop, const char *hostname, uint16_t port, const char *useragent);
|
pa_rtsp_client* pa_rtsp_client_new(pa_mainloop_api *mainloop, const char *hostname, uint16_t port, const char *useragent, bool autoreconnect);
|
||||||
void pa_rtsp_client_free(pa_rtsp_client *c);
|
void pa_rtsp_client_free(pa_rtsp_client *c);
|
||||||
|
|
||||||
int pa_rtsp_connect(pa_rtsp_client *c);
|
int pa_rtsp_connect(pa_rtsp_client *c);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue