diff --git a/src/modules/module-raop-sink.c b/src/modules/module-raop-sink.c index 128b6eec9..cafdade7d 100644 --- a/src/modules/module-raop-sink.c +++ b/src/modules/module-raop-sink.c @@ -52,6 +52,7 @@ #include #include #include +#include #include #include @@ -144,7 +145,7 @@ struct impl { struct spa_hook rtsp_listener; struct pw_properties *headers; - char *session_id; + char session_id[32]; unsigned int do_disconnect:1; unsigned int unloading:1; @@ -155,14 +156,18 @@ struct impl { uint16_t control_port; int control_fd; + struct spa_source *control_source; + uint16_t timing_port; int timing_fd; struct spa_source *timing_source; + uint16_t server_port; int server_fd; uint32_t block_size; uint32_t delay; + uint32_t latency; uint16_t seq; uint32_t rtptime; @@ -170,6 +175,8 @@ struct impl { uint32_t sync; uint32_t sync_period; unsigned int first:1; + unsigned int connected:1; + unsigned int ready:1; unsigned int recording:1; uint8_t buffer[FRAMES_PER_TCP_PACKET * 4]; @@ -197,23 +204,6 @@ static void stream_destroy(void *d) impl->stream = NULL; } -static void stream_state_changed(void *d, enum pw_stream_state old, - enum pw_stream_state state, const char *error) -{ - struct impl *impl = d; - switch (state) { - case PW_STREAM_STATE_ERROR: - case PW_STREAM_STATE_UNCONNECTED: - unload_module(impl); - break; - case PW_STREAM_STATE_PAUSED: - case PW_STREAM_STATE_STREAMING: - break; - default: - break; - } -} - static inline void bit_writer(uint8_t **p, int *pos, uint8_t data, int len) { int lb, rb; @@ -473,13 +463,6 @@ static void playback_stream_process(void *d) pw_stream_queue_buffer(impl->stream, buf); } -static const struct pw_stream_events playback_stream_events = { - PW_VERSION_STREAM_EVENTS, - .destroy = stream_destroy, - .state_changed = stream_state_changed, - .process = playback_stream_process -}; - static int create_udp_socket(struct impl *impl, uint16_t *port) { int res, ip_version, fd, val, i, af; @@ -591,38 +574,6 @@ error: return res; } -static void rtsp_record_reply(void *data, int status, const struct spa_dict *headers) -{ - struct impl *impl = data; - - pw_log_info("reply %d", status); - - impl->first = true; - impl->sync = 0; - impl->sync_period = impl->info.rate / (impl->block_size / impl->frame_size); - impl->recording = true; -} - -static int rtsp_do_record(struct impl *impl) -{ - int res; - - pw_getrandom(&impl->seq, sizeof(impl->seq), 0); - pw_getrandom(&impl->rtptime, sizeof(impl->rtptime), 0); - - pw_properties_set(impl->headers, "Range", "npt=0-"); - pw_properties_setf(impl->headers, "RTP-Info", - "seq=%u;rtptime=%u", impl->seq, impl->rtptime); - - res = pw_rtsp_client_send(impl->rtsp, "RECORD", &impl->headers->dict, - NULL, NULL, rtsp_record_reply, impl); - - pw_properties_set(impl->headers, "Range", NULL); - pw_properties_set(impl->headers, "RTP-Info", NULL); - - return res; -} - static void on_timing_source_io(void *data, int fd, uint32_t mask) { @@ -648,6 +599,122 @@ on_timing_source_io(void *data, int fd, uint32_t mask) } } + +static void +on_control_source_io(void *data, int fd, uint32_t mask) +{ + struct impl *impl = data; + uint32_t packet[2]; + ssize_t bytes; + + if (mask & SPA_IO_IN) { + uint32_t hdr; + uint16_t seq, num; + + bytes = read(impl->timing_fd, packet, sizeof(packet)); + if (bytes != sizeof(packet)) { + pw_log_warn("discarding short (%zd < %zd) control packet", + bytes, sizeof(bytes)); + return; + } + hdr = ntohl(packet[0]); + if ((hdr & 0xff000000) != 0x80000000) + return; + + seq = ntohl(packet[1]) >> 16; + num = ntohl(packet[1]) & 0xffff; + if (num == 0) + return; + + switch (hdr >> 16 & 0xff) { + case 0xd5: + pw_log_info("retransmit request seq:%u num:%u", seq, num); + /* retransmit request */ + break; + } + } +} + +static void rtsp_flush_reply(void *data, int status, const struct spa_dict *headers) +{ + pw_log_info("reply %d", status); +} + +static int rtsp_do_flush(struct impl *impl) +{ + int res; + + if (!impl->recording) + return 0; + + pw_properties_set(impl->headers, "Range", "npt=0-"); + pw_properties_setf(impl->headers, "RTP-Info", + "seq=%u;rtptime=%u", impl->seq, impl->rtptime); + + impl->recording = false; + + res = pw_rtsp_client_send(impl->rtsp, "FLUSH", &impl->headers->dict, + NULL, NULL, rtsp_flush_reply, impl); + + pw_properties_set(impl->headers, "Range", NULL); + pw_properties_set(impl->headers, "RTP-Info", NULL); + + return res; +} + +static void rtsp_record_reply(void *data, int status, const struct spa_dict *headers) +{ + struct impl *impl = data; + const char *str; + uint32_t n_params; + const struct spa_pod *params[2]; + uint8_t buffer[1024]; + struct spa_pod_builder b; + struct spa_latency_info latency; + + pw_log_info("reply %d", status); + + if ((str = spa_dict_lookup(headers, "Audio-Latency")) != NULL) { + if (!spa_atou32(str, &impl->latency, 0)) + impl->latency = 0; + } + + spa_zero(latency); + latency.direction = PW_DIRECTION_INPUT; + latency.min_rate = latency.max_rate = impl->latency; + + n_params = 0; + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + params[n_params++] = spa_latency_build(&b, SPA_PARAM_Latency, &latency); + + pw_stream_update_params(impl->stream, params, n_params); + + impl->first = true; + impl->sync = 0; + impl->sync_period = impl->info.rate / (impl->block_size / impl->frame_size); + impl->recording = true; +} + +static int rtsp_do_record(struct impl *impl) +{ + int res; + + if (!impl->ready || impl->recording) + return 0; + + pw_properties_set(impl->headers, "Range", "npt=0-"); + pw_properties_setf(impl->headers, "RTP-Info", + "seq=%u;rtptime=%u", impl->seq, impl->rtptime); + + res = pw_rtsp_client_send(impl->rtsp, "RECORD", &impl->headers->dict, + NULL, NULL, rtsp_record_reply, impl); + + pw_properties_set(impl->headers, "Range", NULL); + pw_properties_set(impl->headers, "RTP-Info", NULL); + + return res; +} + static void rtsp_setup_reply(void *data, int status, const struct spa_dict *headers) { struct impl *impl = data; @@ -710,11 +777,20 @@ static void rtsp_setup_reply(void *data, int status, const struct spa_dict *head impl->timing_source = pw_loop_add_io(impl->loop, impl->timing_fd, SPA_IO_IN, false, on_timing_source_io, impl); + impl->control_source = pw_loop_add_io(impl->loop, impl->control_fd, + SPA_IO_IN, false, on_control_source_io, impl); break; default: return; } - rtsp_do_record(impl); + + pw_getrandom(&impl->seq, sizeof(impl->seq), 0); + pw_getrandom(&impl->rtptime, sizeof(impl->rtptime), 0); + + impl->ready = true; + + if (pw_stream_get_state(impl->stream, NULL) == PW_STREAM_STATE_STREAMING) + rtsp_do_record(impl); } static int rtsp_do_setup(struct impl *impl) @@ -854,7 +930,7 @@ static int rtsp_do_announce(struct impl *impl) uint8_t rsakey[512]; char key[512*2]; char iv[16*2]; - int frames, i, ip_version; + int res, frames, i, ip_version; char *sdp; char local_ip[256]; @@ -909,8 +985,11 @@ static int rtsp_do_announce(struct impl *impl) default: return -ENOTSUP; } - return pw_rtsp_client_send(impl->rtsp, "ANNOUNCE", &impl->headers->dict, + res = pw_rtsp_client_send(impl->rtsp, "ANNOUNCE", &impl->headers->dict, "application/sdp", sdp, rtsp_announce_reply, impl); + free(sdp); + + return res; } @@ -931,6 +1010,8 @@ static void rtsp_connected(void *data) pw_log_info("connected"); + impl->connected = true; + pw_getrandom(sci, sizeof(sci), 0); pw_properties_setf(impl->headers, "Client-Instance", "%08x%08x", sci[0], sci[1]); @@ -943,21 +1024,49 @@ static void rtsp_connected(void *data) NULL, NULL, rtsp_options_reply, impl); } +static void connection_cleanup(struct impl *impl) +{ + impl->ready = false; + if (impl->server_fd != -1) { + close(impl->server_fd); + impl->server_fd = -1; + } + if (impl->control_fd != -1) { + close(impl->control_fd); + impl->control_fd = -1; + } + if (impl->timing_fd != -1) { + close(impl->timing_fd); + impl->timing_fd = -1; + } + if (impl->timing_source != NULL) { + pw_loop_destroy_source(impl->loop, impl->timing_source); + impl->timing_source = NULL; + } + if (impl->control_source != NULL) { + pw_loop_destroy_source(impl->loop, impl->control_source); + impl->control_source = NULL; + } +} + static void rtsp_disconnected(void *data) { + struct impl *impl = data; pw_log_info("disconnected"); + impl->connected = false; + connection_cleanup(impl); } static void rtsp_error(void *data, int res) { - pw_log_info("error %d", res); + pw_log_error("error %d", res); } -static void rtsp_message(void *data, int status, int state, +static void rtsp_message(void *data, int status, const struct spa_dict *headers) { const struct spa_dict_item *it; - pw_log_info("message %d %d", status, state); + pw_log_info("message %d", status); spa_dict_for_each(it, headers) pw_log_info(" %s: %s", it->key, it->value); @@ -971,6 +1080,96 @@ static const struct pw_rtsp_client_events rtsp_events = { .message = rtsp_message, }; +static void stream_state_changed(void *d, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct impl *impl = d; + switch (state) { + case PW_STREAM_STATE_ERROR: + case PW_STREAM_STATE_UNCONNECTED: + unload_module(impl); + break; + case PW_STREAM_STATE_PAUSED: + rtsp_do_flush(impl); + break; + case PW_STREAM_STATE_STREAMING: + rtsp_do_record(impl); + break; + default: + break; + } +} + +static int rtsp_do_connect(struct impl *impl) +{ + const char *hostname, *port; + uint32_t session_id; + + if (impl->connected) { + if (!impl->ready) + return rtsp_do_announce(impl); + return 0; + } + + hostname = pw_properties_get(impl->props, "raop.hostname"); + port = pw_properties_get(impl->props, "raop.port"); + if (hostname == NULL || port == NULL) + return -EINVAL; + + pw_getrandom(&session_id, sizeof(session_id), 0); + spa_scnprintf(impl->session_id, sizeof(impl->session_id), "%u", session_id); + + return pw_rtsp_client_connect(impl->rtsp, hostname, atoi(port), impl->session_id); +} + +static void rtsp_teardown_reply(void *data, int status, const struct spa_dict *headers) +{ + struct impl *impl = data; + const char *str; + + pw_log_info("reply"); + + connection_cleanup(impl); + + if ((str = spa_dict_lookup(headers, "Connection")) != NULL) { + if (spa_streq(str, "close")) + pw_rtsp_client_disconnect(impl->rtsp); + } +} + +static int rtsp_do_teardown(struct impl *impl) +{ + if (!impl->ready) + return 0; + + return pw_rtsp_client_send(impl->rtsp, "TEARDOWN", NULL, + NULL, NULL, rtsp_teardown_reply, impl); +} + +static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *param) +{ + struct impl *impl = data; + + switch (id) { + case SPA_PARAM_Format: + if (param == NULL) + rtsp_do_teardown(impl); + else + rtsp_do_connect(impl); + break; + default: + break; + } +} + +static const struct pw_stream_events playback_stream_events = { + PW_VERSION_STREAM_EVENTS, + .destroy = stream_destroy, + .state_changed = stream_state_changed, + .param_changed = stream_param_changed, + .process = playback_stream_process +}; + static int create_stream(struct impl *impl) { int res; @@ -978,8 +1177,6 @@ static int create_stream(struct impl *impl) const struct spa_pod *params[1]; uint8_t buffer[1024]; struct spa_pod_builder b; - const char *hostname, *port; - uint32_t session_id; impl->stream = pw_stream_new(impl->core, "RAOP sink", impl->stream_props); impl->stream_props = NULL; @@ -1013,16 +1210,6 @@ static int create_stream(struct impl *impl) pw_rtsp_client_add_listener(impl->rtsp, &impl->rtsp_listener, &rtsp_events, impl); - hostname = pw_properties_get(impl->props, "raop.hostname"); - port = pw_properties_get(impl->props, "raop.port"); - if (hostname == NULL || port == NULL) - return -EINVAL; - - pw_getrandom(&session_id, sizeof(session_id), 0); - asprintf(&impl->session_id, "%u", session_id); - - pw_rtsp_client_connect(impl->rtsp, hostname, atoi(port), impl->session_id); - return 0; } @@ -1061,6 +1248,10 @@ static void impl_destroy(struct impl *impl) if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); + if (impl->rtsp) + pw_rtsp_client_destroy(impl->rtsp); + + pw_properties_free(impl->headers); pw_properties_free(impl->stream_props); pw_properties_free(impl->props); @@ -1198,6 +1389,9 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) return -errno; pw_log_debug("module %p: new %s", impl, args); + impl->server_fd = -1; + impl->control_fd = -1; + impl->timing_fd = -1; if (args == NULL) args = ""; diff --git a/src/modules/module-raop/rtsp-client.c b/src/modules/module-raop/rtsp-client.c index 97860428f..48196e1c4 100644 --- a/src/modules/module-raop/rtsp-client.c +++ b/src/modules/module-raop/rtsp-client.c @@ -32,12 +32,6 @@ #include "rtsp-client.h" -enum { - STATE_INIT = 0, - STATE_CONNECTING, - STATE_CONNECTED, -}; - #define pw_rtsp_client_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_rtsp_client_events, m, v, ##__VA_ARGS__) #define pw_rtsp_client_emit_destroy(c) pw_rtsp_client_emit(c, destroy, 0) #define pw_rtsp_client_emit_connected(c) pw_rtsp_client_emit(c, connected, 0) @@ -70,8 +64,8 @@ struct pw_rtsp_client { struct sockaddr_in6 in6; } local_addr; - int state; struct spa_source *source; + unsigned int connecting:1; unsigned int need_flush:1; unsigned int wait_status:1; @@ -103,7 +97,6 @@ struct pw_rtsp_client *pw_rtsp_client_new(struct pw_loop *main_loop, client->props = props; if (user_data_size > 0) client->user_data = SPA_PTROFF(client, sizeof(*client), void); - client->state = STATE_INIT; spa_list_init(&client->messages); spa_list_init(&client->pending); @@ -117,6 +110,12 @@ struct pw_rtsp_client *pw_rtsp_client_new(struct pw_loop *main_loop, void pw_rtsp_client_destroy(struct pw_rtsp_client *client) { + pw_log_info("destroy client %p", client); + pw_rtsp_client_emit_destroy(client); + + pw_rtsp_client_disconnect(client); + pw_properties_free(client->headers); + pw_properties_free(client->props); spa_hook_list_clean(&client->listener_list); free(client); } @@ -186,7 +185,7 @@ static int handle_connect(struct pw_rtsp_client *client, int fd) pw_log_info("connected local ip %s", local_ip); - client->state = STATE_CONNECTED; + client->connecting = false; client->wait_status = true; pw_rtsp_client_emit_connected(client); @@ -283,7 +282,7 @@ static int process_input(struct pw_rtsp_client *client) free(msg); } else { pw_rtsp_client_emit_message(client, client->status, - client->state, &client->headers->dict); + &client->headers->dict); } client->wait_status = true; } else { @@ -294,6 +293,8 @@ static int process_input(struct pw_rtsp_client *client) if (value == NULL) goto error; *value++ = '\0'; + while (*value == ' ') + value++; pw_properties_set(client->headers, key, value); } } @@ -364,7 +365,7 @@ on_source_io(void *data, int fd, uint32_t mask) goto error; } if (mask & SPA_IO_OUT || client->need_flush) { - if (client->state == STATE_CONNECTING) { + if (client->connecting) { if ((res = handle_connect(client, fd)) < 0) goto error; } @@ -437,7 +438,7 @@ int pw_rtsp_client_connect(struct pw_rtsp_client *client, close(fd); return -errno; } - client->state = STATE_CONNECTING; + client->connecting = true; free(client->session_id); client->session_id = strdup(session_id); pw_log_info("%p: connecting", client); @@ -452,7 +453,10 @@ int pw_rtsp_client_disconnect(struct pw_rtsp_client *client) pw_loop_destroy_source(client->loop, client->source); client->source = NULL; - client->state = STATE_INIT; + free(client->url); + client->url = NULL; + free(client->session_id); + client->session_id = NULL; pw_rtsp_client_emit_disconnected(client); return 0; } diff --git a/src/modules/module-raop/rtsp-client.h b/src/modules/module-raop/rtsp-client.h index bf62ba2df..d6a4b05d5 100644 --- a/src/modules/module-raop/rtsp-client.h +++ b/src/modules/module-raop/rtsp-client.h @@ -45,7 +45,7 @@ struct pw_rtsp_client_events { void (*error) (void *data, int res); void (*disconnected) (void *data); - void (*message) (void *data, int status, int state, + void (*message) (void *data, int status, const struct spa_dict *headers); };