raop: More improvements

Handle control source io. We don't yet implement retransmission,
though.
Handle latency by exposing the server latency on the stream ports.
Use the Format param event to connect/announce and teardown.
Use the stream state to record/flush.
Fix some leaks.
Remove the state from rtsp-client, we don't need it.
Strip whitespace from header values.
This commit is contained in:
Wim Taymans 2021-11-12 16:25:23 +01:00
parent 27bc5e9ae5
commit 7ed9c790e2
3 changed files with 287 additions and 89 deletions

View file

@ -52,6 +52,7 @@
#include <spa/pod/builder.h> #include <spa/pod/builder.h>
#include <spa/param/audio/format-utils.h> #include <spa/param/audio/format-utils.h>
#include <spa/param/audio/raw.h> #include <spa/param/audio/raw.h>
#include <spa/param/latency-utils.h>
#include <pipewire/impl.h> #include <pipewire/impl.h>
#include <pipewire/i18n.h> #include <pipewire/i18n.h>
@ -144,7 +145,7 @@ struct impl {
struct spa_hook rtsp_listener; struct spa_hook rtsp_listener;
struct pw_properties *headers; struct pw_properties *headers;
char *session_id; char session_id[32];
unsigned int do_disconnect:1; unsigned int do_disconnect:1;
unsigned int unloading:1; unsigned int unloading:1;
@ -155,14 +156,18 @@ struct impl {
uint16_t control_port; uint16_t control_port;
int control_fd; int control_fd;
struct spa_source *control_source;
uint16_t timing_port; uint16_t timing_port;
int timing_fd; int timing_fd;
struct spa_source *timing_source; struct spa_source *timing_source;
uint16_t server_port; uint16_t server_port;
int server_fd; int server_fd;
uint32_t block_size; uint32_t block_size;
uint32_t delay; uint32_t delay;
uint32_t latency;
uint16_t seq; uint16_t seq;
uint32_t rtptime; uint32_t rtptime;
@ -170,6 +175,8 @@ struct impl {
uint32_t sync; uint32_t sync;
uint32_t sync_period; uint32_t sync_period;
unsigned int first:1; unsigned int first:1;
unsigned int connected:1;
unsigned int ready:1;
unsigned int recording:1; unsigned int recording:1;
uint8_t buffer[FRAMES_PER_TCP_PACKET * 4]; uint8_t buffer[FRAMES_PER_TCP_PACKET * 4];
@ -197,23 +204,6 @@ static void stream_destroy(void *d)
impl->stream = NULL; impl->stream = NULL;
} }
static void stream_state_changed(void *d, enum pw_stream_state old,
enum pw_stream_state state, const char *error)
{
struct impl *impl = d;
switch (state) {
case PW_STREAM_STATE_ERROR:
case PW_STREAM_STATE_UNCONNECTED:
unload_module(impl);
break;
case PW_STREAM_STATE_PAUSED:
case PW_STREAM_STATE_STREAMING:
break;
default:
break;
}
}
static inline void bit_writer(uint8_t **p, int *pos, uint8_t data, int len) static inline void bit_writer(uint8_t **p, int *pos, uint8_t data, int len)
{ {
int lb, rb; int lb, rb;
@ -473,13 +463,6 @@ static void playback_stream_process(void *d)
pw_stream_queue_buffer(impl->stream, buf); pw_stream_queue_buffer(impl->stream, buf);
} }
static const struct pw_stream_events playback_stream_events = {
PW_VERSION_STREAM_EVENTS,
.destroy = stream_destroy,
.state_changed = stream_state_changed,
.process = playback_stream_process
};
static int create_udp_socket(struct impl *impl, uint16_t *port) static int create_udp_socket(struct impl *impl, uint16_t *port)
{ {
int res, ip_version, fd, val, i, af; int res, ip_version, fd, val, i, af;
@ -591,38 +574,6 @@ error:
return res; return res;
} }
static void rtsp_record_reply(void *data, int status, const struct spa_dict *headers)
{
struct impl *impl = data;
pw_log_info("reply %d", status);
impl->first = true;
impl->sync = 0;
impl->sync_period = impl->info.rate / (impl->block_size / impl->frame_size);
impl->recording = true;
}
static int rtsp_do_record(struct impl *impl)
{
int res;
pw_getrandom(&impl->seq, sizeof(impl->seq), 0);
pw_getrandom(&impl->rtptime, sizeof(impl->rtptime), 0);
pw_properties_set(impl->headers, "Range", "npt=0-");
pw_properties_setf(impl->headers, "RTP-Info",
"seq=%u;rtptime=%u", impl->seq, impl->rtptime);
res = pw_rtsp_client_send(impl->rtsp, "RECORD", &impl->headers->dict,
NULL, NULL, rtsp_record_reply, impl);
pw_properties_set(impl->headers, "Range", NULL);
pw_properties_set(impl->headers, "RTP-Info", NULL);
return res;
}
static void static void
on_timing_source_io(void *data, int fd, uint32_t mask) on_timing_source_io(void *data, int fd, uint32_t mask)
{ {
@ -648,6 +599,122 @@ on_timing_source_io(void *data, int fd, uint32_t mask)
} }
} }
static void
on_control_source_io(void *data, int fd, uint32_t mask)
{
struct impl *impl = data;
uint32_t packet[2];
ssize_t bytes;
if (mask & SPA_IO_IN) {
uint32_t hdr;
uint16_t seq, num;
bytes = read(impl->timing_fd, packet, sizeof(packet));
if (bytes != sizeof(packet)) {
pw_log_warn("discarding short (%zd < %zd) control packet",
bytes, sizeof(bytes));
return;
}
hdr = ntohl(packet[0]);
if ((hdr & 0xff000000) != 0x80000000)
return;
seq = ntohl(packet[1]) >> 16;
num = ntohl(packet[1]) & 0xffff;
if (num == 0)
return;
switch (hdr >> 16 & 0xff) {
case 0xd5:
pw_log_info("retransmit request seq:%u num:%u", seq, num);
/* retransmit request */
break;
}
}
}
static void rtsp_flush_reply(void *data, int status, const struct spa_dict *headers)
{
pw_log_info("reply %d", status);
}
static int rtsp_do_flush(struct impl *impl)
{
int res;
if (!impl->recording)
return 0;
pw_properties_set(impl->headers, "Range", "npt=0-");
pw_properties_setf(impl->headers, "RTP-Info",
"seq=%u;rtptime=%u", impl->seq, impl->rtptime);
impl->recording = false;
res = pw_rtsp_client_send(impl->rtsp, "FLUSH", &impl->headers->dict,
NULL, NULL, rtsp_flush_reply, impl);
pw_properties_set(impl->headers, "Range", NULL);
pw_properties_set(impl->headers, "RTP-Info", NULL);
return res;
}
static void rtsp_record_reply(void *data, int status, const struct spa_dict *headers)
{
struct impl *impl = data;
const char *str;
uint32_t n_params;
const struct spa_pod *params[2];
uint8_t buffer[1024];
struct spa_pod_builder b;
struct spa_latency_info latency;
pw_log_info("reply %d", status);
if ((str = spa_dict_lookup(headers, "Audio-Latency")) != NULL) {
if (!spa_atou32(str, &impl->latency, 0))
impl->latency = 0;
}
spa_zero(latency);
latency.direction = PW_DIRECTION_INPUT;
latency.min_rate = latency.max_rate = impl->latency;
n_params = 0;
spa_pod_builder_init(&b, buffer, sizeof(buffer));
params[n_params++] = spa_latency_build(&b, SPA_PARAM_Latency, &latency);
pw_stream_update_params(impl->stream, params, n_params);
impl->first = true;
impl->sync = 0;
impl->sync_period = impl->info.rate / (impl->block_size / impl->frame_size);
impl->recording = true;
}
static int rtsp_do_record(struct impl *impl)
{
int res;
if (!impl->ready || impl->recording)
return 0;
pw_properties_set(impl->headers, "Range", "npt=0-");
pw_properties_setf(impl->headers, "RTP-Info",
"seq=%u;rtptime=%u", impl->seq, impl->rtptime);
res = pw_rtsp_client_send(impl->rtsp, "RECORD", &impl->headers->dict,
NULL, NULL, rtsp_record_reply, impl);
pw_properties_set(impl->headers, "Range", NULL);
pw_properties_set(impl->headers, "RTP-Info", NULL);
return res;
}
static void rtsp_setup_reply(void *data, int status, const struct spa_dict *headers) static void rtsp_setup_reply(void *data, int status, const struct spa_dict *headers)
{ {
struct impl *impl = data; struct impl *impl = data;
@ -710,11 +777,20 @@ static void rtsp_setup_reply(void *data, int status, const struct spa_dict *head
impl->timing_source = pw_loop_add_io(impl->loop, impl->timing_fd, impl->timing_source = pw_loop_add_io(impl->loop, impl->timing_fd,
SPA_IO_IN, false, on_timing_source_io, impl); SPA_IO_IN, false, on_timing_source_io, impl);
impl->control_source = pw_loop_add_io(impl->loop, impl->control_fd,
SPA_IO_IN, false, on_control_source_io, impl);
break; break;
default: default:
return; return;
} }
rtsp_do_record(impl);
pw_getrandom(&impl->seq, sizeof(impl->seq), 0);
pw_getrandom(&impl->rtptime, sizeof(impl->rtptime), 0);
impl->ready = true;
if (pw_stream_get_state(impl->stream, NULL) == PW_STREAM_STATE_STREAMING)
rtsp_do_record(impl);
} }
static int rtsp_do_setup(struct impl *impl) static int rtsp_do_setup(struct impl *impl)
@ -854,7 +930,7 @@ static int rtsp_do_announce(struct impl *impl)
uint8_t rsakey[512]; uint8_t rsakey[512];
char key[512*2]; char key[512*2];
char iv[16*2]; char iv[16*2];
int frames, i, ip_version; int res, frames, i, ip_version;
char *sdp; char *sdp;
char local_ip[256]; char local_ip[256];
@ -909,8 +985,11 @@ static int rtsp_do_announce(struct impl *impl)
default: default:
return -ENOTSUP; return -ENOTSUP;
} }
return pw_rtsp_client_send(impl->rtsp, "ANNOUNCE", &impl->headers->dict, res = pw_rtsp_client_send(impl->rtsp, "ANNOUNCE", &impl->headers->dict,
"application/sdp", sdp, rtsp_announce_reply, impl); "application/sdp", sdp, rtsp_announce_reply, impl);
free(sdp);
return res;
} }
@ -931,6 +1010,8 @@ static void rtsp_connected(void *data)
pw_log_info("connected"); pw_log_info("connected");
impl->connected = true;
pw_getrandom(sci, sizeof(sci), 0); pw_getrandom(sci, sizeof(sci), 0);
pw_properties_setf(impl->headers, "Client-Instance", pw_properties_setf(impl->headers, "Client-Instance",
"%08x%08x", sci[0], sci[1]); "%08x%08x", sci[0], sci[1]);
@ -943,21 +1024,49 @@ static void rtsp_connected(void *data)
NULL, NULL, rtsp_options_reply, impl); NULL, NULL, rtsp_options_reply, impl);
} }
static void connection_cleanup(struct impl *impl)
{
impl->ready = false;
if (impl->server_fd != -1) {
close(impl->server_fd);
impl->server_fd = -1;
}
if (impl->control_fd != -1) {
close(impl->control_fd);
impl->control_fd = -1;
}
if (impl->timing_fd != -1) {
close(impl->timing_fd);
impl->timing_fd = -1;
}
if (impl->timing_source != NULL) {
pw_loop_destroy_source(impl->loop, impl->timing_source);
impl->timing_source = NULL;
}
if (impl->control_source != NULL) {
pw_loop_destroy_source(impl->loop, impl->control_source);
impl->control_source = NULL;
}
}
static void rtsp_disconnected(void *data) static void rtsp_disconnected(void *data)
{ {
struct impl *impl = data;
pw_log_info("disconnected"); pw_log_info("disconnected");
impl->connected = false;
connection_cleanup(impl);
} }
static void rtsp_error(void *data, int res) static void rtsp_error(void *data, int res)
{ {
pw_log_info("error %d", res); pw_log_error("error %d", res);
} }
static void rtsp_message(void *data, int status, int state, static void rtsp_message(void *data, int status,
const struct spa_dict *headers) const struct spa_dict *headers)
{ {
const struct spa_dict_item *it; const struct spa_dict_item *it;
pw_log_info("message %d %d", status, state); pw_log_info("message %d", status);
spa_dict_for_each(it, headers) spa_dict_for_each(it, headers)
pw_log_info(" %s: %s", it->key, it->value); pw_log_info(" %s: %s", it->key, it->value);
@ -971,6 +1080,96 @@ static const struct pw_rtsp_client_events rtsp_events = {
.message = rtsp_message, .message = rtsp_message,
}; };
static void stream_state_changed(void *d, enum pw_stream_state old,
enum pw_stream_state state, const char *error)
{
struct impl *impl = d;
switch (state) {
case PW_STREAM_STATE_ERROR:
case PW_STREAM_STATE_UNCONNECTED:
unload_module(impl);
break;
case PW_STREAM_STATE_PAUSED:
rtsp_do_flush(impl);
break;
case PW_STREAM_STATE_STREAMING:
rtsp_do_record(impl);
break;
default:
break;
}
}
static int rtsp_do_connect(struct impl *impl)
{
const char *hostname, *port;
uint32_t session_id;
if (impl->connected) {
if (!impl->ready)
return rtsp_do_announce(impl);
return 0;
}
hostname = pw_properties_get(impl->props, "raop.hostname");
port = pw_properties_get(impl->props, "raop.port");
if (hostname == NULL || port == NULL)
return -EINVAL;
pw_getrandom(&session_id, sizeof(session_id), 0);
spa_scnprintf(impl->session_id, sizeof(impl->session_id), "%u", session_id);
return pw_rtsp_client_connect(impl->rtsp, hostname, atoi(port), impl->session_id);
}
static void rtsp_teardown_reply(void *data, int status, const struct spa_dict *headers)
{
struct impl *impl = data;
const char *str;
pw_log_info("reply");
connection_cleanup(impl);
if ((str = spa_dict_lookup(headers, "Connection")) != NULL) {
if (spa_streq(str, "close"))
pw_rtsp_client_disconnect(impl->rtsp);
}
}
static int rtsp_do_teardown(struct impl *impl)
{
if (!impl->ready)
return 0;
return pw_rtsp_client_send(impl->rtsp, "TEARDOWN", NULL,
NULL, NULL, rtsp_teardown_reply, impl);
}
static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *param)
{
struct impl *impl = data;
switch (id) {
case SPA_PARAM_Format:
if (param == NULL)
rtsp_do_teardown(impl);
else
rtsp_do_connect(impl);
break;
default:
break;
}
}
static const struct pw_stream_events playback_stream_events = {
PW_VERSION_STREAM_EVENTS,
.destroy = stream_destroy,
.state_changed = stream_state_changed,
.param_changed = stream_param_changed,
.process = playback_stream_process
};
static int create_stream(struct impl *impl) static int create_stream(struct impl *impl)
{ {
int res; int res;
@ -978,8 +1177,6 @@ static int create_stream(struct impl *impl)
const struct spa_pod *params[1]; const struct spa_pod *params[1];
uint8_t buffer[1024]; uint8_t buffer[1024];
struct spa_pod_builder b; struct spa_pod_builder b;
const char *hostname, *port;
uint32_t session_id;
impl->stream = pw_stream_new(impl->core, "RAOP sink", impl->stream_props); impl->stream = pw_stream_new(impl->core, "RAOP sink", impl->stream_props);
impl->stream_props = NULL; impl->stream_props = NULL;
@ -1013,16 +1210,6 @@ static int create_stream(struct impl *impl)
pw_rtsp_client_add_listener(impl->rtsp, &impl->rtsp_listener, pw_rtsp_client_add_listener(impl->rtsp, &impl->rtsp_listener,
&rtsp_events, impl); &rtsp_events, impl);
hostname = pw_properties_get(impl->props, "raop.hostname");
port = pw_properties_get(impl->props, "raop.port");
if (hostname == NULL || port == NULL)
return -EINVAL;
pw_getrandom(&session_id, sizeof(session_id), 0);
asprintf(&impl->session_id, "%u", session_id);
pw_rtsp_client_connect(impl->rtsp, hostname, atoi(port), impl->session_id);
return 0; return 0;
} }
@ -1061,6 +1248,10 @@ static void impl_destroy(struct impl *impl)
if (impl->core && impl->do_disconnect) if (impl->core && impl->do_disconnect)
pw_core_disconnect(impl->core); pw_core_disconnect(impl->core);
if (impl->rtsp)
pw_rtsp_client_destroy(impl->rtsp);
pw_properties_free(impl->headers);
pw_properties_free(impl->stream_props); pw_properties_free(impl->stream_props);
pw_properties_free(impl->props); pw_properties_free(impl->props);
@ -1198,6 +1389,9 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
return -errno; return -errno;
pw_log_debug("module %p: new %s", impl, args); pw_log_debug("module %p: new %s", impl, args);
impl->server_fd = -1;
impl->control_fd = -1;
impl->timing_fd = -1;
if (args == NULL) if (args == NULL)
args = ""; args = "";

View file

@ -32,12 +32,6 @@
#include "rtsp-client.h" #include "rtsp-client.h"
enum {
STATE_INIT = 0,
STATE_CONNECTING,
STATE_CONNECTED,
};
#define pw_rtsp_client_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_rtsp_client_events, m, v, ##__VA_ARGS__) #define pw_rtsp_client_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_rtsp_client_events, m, v, ##__VA_ARGS__)
#define pw_rtsp_client_emit_destroy(c) pw_rtsp_client_emit(c, destroy, 0) #define pw_rtsp_client_emit_destroy(c) pw_rtsp_client_emit(c, destroy, 0)
#define pw_rtsp_client_emit_connected(c) pw_rtsp_client_emit(c, connected, 0) #define pw_rtsp_client_emit_connected(c) pw_rtsp_client_emit(c, connected, 0)
@ -70,8 +64,8 @@ struct pw_rtsp_client {
struct sockaddr_in6 in6; struct sockaddr_in6 in6;
} local_addr; } local_addr;
int state;
struct spa_source *source; struct spa_source *source;
unsigned int connecting:1;
unsigned int need_flush:1; unsigned int need_flush:1;
unsigned int wait_status:1; unsigned int wait_status:1;
@ -103,7 +97,6 @@ struct pw_rtsp_client *pw_rtsp_client_new(struct pw_loop *main_loop,
client->props = props; client->props = props;
if (user_data_size > 0) if (user_data_size > 0)
client->user_data = SPA_PTROFF(client, sizeof(*client), void); client->user_data = SPA_PTROFF(client, sizeof(*client), void);
client->state = STATE_INIT;
spa_list_init(&client->messages); spa_list_init(&client->messages);
spa_list_init(&client->pending); spa_list_init(&client->pending);
@ -117,6 +110,12 @@ struct pw_rtsp_client *pw_rtsp_client_new(struct pw_loop *main_loop,
void pw_rtsp_client_destroy(struct pw_rtsp_client *client) void pw_rtsp_client_destroy(struct pw_rtsp_client *client)
{ {
pw_log_info("destroy client %p", client);
pw_rtsp_client_emit_destroy(client);
pw_rtsp_client_disconnect(client);
pw_properties_free(client->headers);
pw_properties_free(client->props);
spa_hook_list_clean(&client->listener_list); spa_hook_list_clean(&client->listener_list);
free(client); free(client);
} }
@ -186,7 +185,7 @@ static int handle_connect(struct pw_rtsp_client *client, int fd)
pw_log_info("connected local ip %s", local_ip); pw_log_info("connected local ip %s", local_ip);
client->state = STATE_CONNECTED; client->connecting = false;
client->wait_status = true; client->wait_status = true;
pw_rtsp_client_emit_connected(client); pw_rtsp_client_emit_connected(client);
@ -283,7 +282,7 @@ static int process_input(struct pw_rtsp_client *client)
free(msg); free(msg);
} else { } else {
pw_rtsp_client_emit_message(client, client->status, pw_rtsp_client_emit_message(client, client->status,
client->state, &client->headers->dict); &client->headers->dict);
} }
client->wait_status = true; client->wait_status = true;
} else { } else {
@ -294,6 +293,8 @@ static int process_input(struct pw_rtsp_client *client)
if (value == NULL) if (value == NULL)
goto error; goto error;
*value++ = '\0'; *value++ = '\0';
while (*value == ' ')
value++;
pw_properties_set(client->headers, key, value); pw_properties_set(client->headers, key, value);
} }
} }
@ -364,7 +365,7 @@ on_source_io(void *data, int fd, uint32_t mask)
goto error; goto error;
} }
if (mask & SPA_IO_OUT || client->need_flush) { if (mask & SPA_IO_OUT || client->need_flush) {
if (client->state == STATE_CONNECTING) { if (client->connecting) {
if ((res = handle_connect(client, fd)) < 0) if ((res = handle_connect(client, fd)) < 0)
goto error; goto error;
} }
@ -437,7 +438,7 @@ int pw_rtsp_client_connect(struct pw_rtsp_client *client,
close(fd); close(fd);
return -errno; return -errno;
} }
client->state = STATE_CONNECTING; client->connecting = true;
free(client->session_id); free(client->session_id);
client->session_id = strdup(session_id); client->session_id = strdup(session_id);
pw_log_info("%p: connecting", client); pw_log_info("%p: connecting", client);
@ -452,7 +453,10 @@ int pw_rtsp_client_disconnect(struct pw_rtsp_client *client)
pw_loop_destroy_source(client->loop, client->source); pw_loop_destroy_source(client->loop, client->source);
client->source = NULL; client->source = NULL;
client->state = STATE_INIT; free(client->url);
client->url = NULL;
free(client->session_id);
client->session_id = NULL;
pw_rtsp_client_emit_disconnected(client); pw_rtsp_client_emit_disconnected(client);
return 0; return 0;
} }

View file

@ -45,7 +45,7 @@ struct pw_rtsp_client_events {
void (*error) (void *data, int res); void (*error) (void *data, int res);
void (*disconnected) (void *data); void (*disconnected) (void *data);
void (*message) (void *data, int status, int state, void (*message) (void *data, int status,
const struct spa_dict *headers); const struct spa_dict *headers);
}; };