From e9b5cc405e730d5ce19588024b8a75e793c4376c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Barnab=C3=A1s=20P=C5=91cze?= Date: Wed, 7 Sep 2022 03:32:17 +0200 Subject: [PATCH] pipewire: rtsp-client: read message content properly Make the receiving state machine more pronounced by explicitly storing the state in the client. Furthermore, always consume the message content if there is one and not only if the content type is "application/octet-stream", but do not try to do it at once - like previously, instead only as the socket becomes readable. The body is currently dropped, but it could easily be collected in e.g. a `pw_array` should the need ever arise. See #2673 --- src/modules/module-raop/rtsp-client.c | 227 ++++++++++++++++---------- 1 file changed, 144 insertions(+), 83 deletions(-) diff --git a/src/modules/module-raop/rtsp-client.c b/src/modules/module-raop/rtsp-client.c index 4615f1d62..c731a0469 100644 --- a/src/modules/module-raop/rtsp-client.c +++ b/src/modules/module-raop/rtsp-client.c @@ -49,6 +49,13 @@ struct message { void *user_data; }; +enum client_recv_state { + CLIENT_RECV_NONE, + CLIENT_RECV_STATUS, + CLIENT_RECV_HEADERS, + CLIENT_RECV_CONTENT, +}; + struct pw_rtsp_client { struct pw_loop *loop; struct pw_properties *props; @@ -67,12 +74,13 @@ struct pw_rtsp_client { struct spa_source *source; unsigned int connecting:1; unsigned int need_flush:1; - unsigned int wait_status:1; + enum client_recv_state recv_state; int status; char line_buf[1024]; size_t line_pos; struct pw_properties *headers; + size_t content_length; uint32_t cseq; @@ -101,6 +109,7 @@ struct pw_rtsp_client *pw_rtsp_client_new(struct pw_loop *main_loop, spa_list_init(&client->pending); spa_hook_list_init(&client->listener_list); client->headers = pw_properties_new(NULL, NULL); + client->recv_state = CLIENT_RECV_NONE; pw_log_info("new client %p", client); @@ -185,11 +194,12 @@ static int handle_connect(struct pw_rtsp_client *client, int fd) pw_log_info("connected local ip %s", local_ip); client->connecting = false; - client->wait_status = true; + client->recv_state = CLIENT_RECV_STATUS; pw_properties_clear(client->headers); client->status = 0; client->line_pos = 0; + client->content_length = 0; pw_rtsp_client_emit_connected(client); @@ -240,94 +250,145 @@ static struct message *find_pending(struct pw_rtsp_client *client, uint32_t cseq return NULL; } +static int process_status(struct pw_rtsp_client *client, char *buf) +{ + const char *state = NULL, *s; + size_t len; + + pw_log_info("status: %s", buf); + + s = pw_split_walk(buf, " ", &len, &state); + if (!spa_strstartswith(s, "RTSP/")) + return -EPROTO; + + s = pw_split_walk(buf, " ", &len, &state); + if (s == NULL) + return -EPROTO; + + client->status = atoi(s); + if (client->status == 0) + return -EPROTO; + + s = pw_split_walk(buf, " ", &len, &state); + if (s == NULL) + return -EPROTO; + + pw_properties_clear(client->headers); + client->recv_state = CLIENT_RECV_HEADERS; + + return 0; +} + +static void dispatch_handler(struct pw_rtsp_client *client) +{ + uint32_t cseq; + if (pw_properties_fetch_uint32(client->headers, "CSeq", &cseq) < 0) + return; + + pw_log_info("received reply to request with cseq:%" PRIu32, cseq); + + struct message *msg = find_pending(client, cseq); + if (msg) { + msg->reply(msg->user_data, client->status, &client->headers->dict); + spa_list_remove(&msg->link); + free(msg); + } + else { + pw_rtsp_client_emit_message(client, client->status, &client->headers->dict); + } +} + +static void process_received_message(struct pw_rtsp_client *client) +{ + client->recv_state = CLIENT_RECV_STATUS; + dispatch_handler(client); +} + +static int process_header(struct pw_rtsp_client *client, char *buf) +{ + if (strlen(buf) > 0) { + char *key = buf, *value; + + value = strstr(buf, ":"); + if (value == NULL) + return -EPROTO; + + *value++ = '\0'; + while (*value == ' ') + value++; + + pw_properties_set(client->headers, key, value); + } + else { + const struct spa_dict_item *it; + spa_dict_for_each(it, &client->headers->dict) + pw_log_info(" %s: %s", it->key, it->value); + + client->content_length = pw_properties_get_uint32(client->headers, "Content-Length", 0); + if (client->content_length > 0) + client->recv_state = CLIENT_RECV_CONTENT; + else + process_received_message(client); + } + + return 0; +} + +static int process_content(struct pw_rtsp_client *client) +{ + char buf[1024]; + + while (client->content_length > 0) { + const size_t max_recv = SPA_MIN(sizeof(buf), client->content_length); + + ssize_t res = read(client->source->fd, buf, max_recv); + if (res == 0) + return -EPIPE; + + if (res < 0) { + res = -errno; + if (res == -EAGAIN || res == -EWOULDBLOCK) + return 0; + + return res; + } + + spa_assert((size_t) res <= client->content_length); + client->content_length -= res; + } + + if (client->content_length == 0) + process_received_message(client); + + return 0; +} + static int process_input(struct pw_rtsp_client *client) { - char *buf = NULL; - int res; + if (client->recv_state == CLIENT_RECV_STATUS || client->recv_state == CLIENT_RECV_HEADERS) { + char *buf = NULL; + int res; - if ((res = read_line(client, &buf)) <= 0) - return res; + if ((res = read_line(client, &buf)) <= 0) + return res; - pw_log_debug("%s", buf); + pw_log_debug("received line: %s", buf); - if (client->wait_status) { - const char *state = NULL, *s; - size_t len; - - pw_log_info("status: %s", buf); - - s = pw_split_walk(buf, " ", &len, &state); - if (!spa_strstartswith(s, "RTSP/")) - goto error; - - s = pw_split_walk(buf, " ", &len, &state); - if (s == NULL) - goto error; - - client->status = atoi(s); - if (client->status == 0) - goto error; - - s = pw_split_walk(buf, " ", &len, &state); - if (s == NULL) - goto error; - - client->wait_status = false; - pw_properties_clear(client->headers); - } else { - if (strlen(buf) == 0) { - uint32_t cseq; - struct message *msg; - const struct spa_dict_item *it; - const char *content_type; - unsigned int content_length; - - spa_dict_for_each(it, &client->headers->dict) - pw_log_info(" %s: %s", it->key, it->value); - - cseq = pw_properties_get_uint32(client->headers, "CSeq", 0); - content_type = pw_properties_get(client->headers, "Content-Type"); - if (content_type != NULL && strcmp(content_type, "application/octet-stream") == 0) { - pw_log_info("binary response received"); - content_length = pw_properties_get_uint64(client->headers, "Content-Length", 0); - char content_buf[content_length]; - res = read(client->source->fd, content_buf, content_length); - pw_log_debug("read %d bytes", res); - if (res == 0) - return -EPIPE; - if (res < 0) { - res = -errno; - if (res != -EAGAIN && res != -EWOULDBLOCK) - return res; - return 0; - } - pw_properties_set(client->headers, "body", content_buf); - } - if ((msg = find_pending(client, cseq)) != NULL) { - msg->reply(msg->user_data, client->status, &client->headers->dict); - spa_list_remove(&msg->link); - free(msg); - } else { - pw_rtsp_client_emit_message(client, client->status, - &client->headers->dict); - } - client->wait_status = true; - } else { - char *key, *value; - - key = buf; - value = strstr(buf, ":"); - if (value == NULL) - goto error; - *value++ = '\0'; - while (*value == ' ') - value++; - pw_properties_set(client->headers, key, value); + switch (client->recv_state) { + case CLIENT_RECV_STATUS: + return process_status(client, buf); + case CLIENT_RECV_HEADERS: + return process_header(client, buf); + default: + spa_assert_not_reached(); } } - return 0; -error: - return -EPROTO; + else if (client->recv_state == CLIENT_RECV_CONTENT) { + return process_content(client); + } + else { + spa_assert_not_reached(); + } } static int flush_output(struct pw_rtsp_client *client)