diff --git a/src/modules/module-raop/rtsp-client.c b/src/modules/module-raop/rtsp-client.c index 4615f1d62..c731a0469 100644 --- a/src/modules/module-raop/rtsp-client.c +++ b/src/modules/module-raop/rtsp-client.c @@ -49,6 +49,13 @@ struct message { void *user_data; }; +enum client_recv_state { + CLIENT_RECV_NONE, + CLIENT_RECV_STATUS, + CLIENT_RECV_HEADERS, + CLIENT_RECV_CONTENT, +}; + struct pw_rtsp_client { struct pw_loop *loop; struct pw_properties *props; @@ -67,12 +74,13 @@ struct pw_rtsp_client { struct spa_source *source; unsigned int connecting:1; unsigned int need_flush:1; - unsigned int wait_status:1; + enum client_recv_state recv_state; int status; char line_buf[1024]; size_t line_pos; struct pw_properties *headers; + size_t content_length; uint32_t cseq; @@ -101,6 +109,7 @@ struct pw_rtsp_client *pw_rtsp_client_new(struct pw_loop *main_loop, spa_list_init(&client->pending); spa_hook_list_init(&client->listener_list); client->headers = pw_properties_new(NULL, NULL); + client->recv_state = CLIENT_RECV_NONE; pw_log_info("new client %p", client); @@ -185,11 +194,12 @@ static int handle_connect(struct pw_rtsp_client *client, int fd) pw_log_info("connected local ip %s", local_ip); client->connecting = false; - client->wait_status = true; + client->recv_state = CLIENT_RECV_STATUS; pw_properties_clear(client->headers); client->status = 0; client->line_pos = 0; + client->content_length = 0; pw_rtsp_client_emit_connected(client); @@ -240,94 +250,145 @@ static struct message *find_pending(struct pw_rtsp_client *client, uint32_t cseq return NULL; } +static int process_status(struct pw_rtsp_client *client, char *buf) +{ + const char *state = NULL, *s; + size_t len; + + pw_log_info("status: %s", buf); + + s = pw_split_walk(buf, " ", &len, &state); + if (!spa_strstartswith(s, "RTSP/")) + return -EPROTO; + + s = pw_split_walk(buf, " ", &len, &state); + if (s == NULL) + return -EPROTO; + + client->status = atoi(s); + if (client->status == 0) + return -EPROTO; + + s = pw_split_walk(buf, " ", &len, &state); + if (s == NULL) + return -EPROTO; + + pw_properties_clear(client->headers); + client->recv_state = CLIENT_RECV_HEADERS; + + return 0; +} + +static void dispatch_handler(struct pw_rtsp_client *client) +{ + uint32_t cseq; + if (pw_properties_fetch_uint32(client->headers, "CSeq", &cseq) < 0) + return; + + pw_log_info("received reply to request with cseq:%" PRIu32, cseq); + + struct message *msg = find_pending(client, cseq); + if (msg) { + msg->reply(msg->user_data, client->status, &client->headers->dict); + spa_list_remove(&msg->link); + free(msg); + } + else { + pw_rtsp_client_emit_message(client, client->status, &client->headers->dict); + } +} + +static void process_received_message(struct pw_rtsp_client *client) +{ + client->recv_state = CLIENT_RECV_STATUS; + dispatch_handler(client); +} + +static int process_header(struct pw_rtsp_client *client, char *buf) +{ + if (strlen(buf) > 0) { + char *key = buf, *value; + + value = strstr(buf, ":"); + if (value == NULL) + return -EPROTO; + + *value++ = '\0'; + while (*value == ' ') + value++; + + pw_properties_set(client->headers, key, value); + } + else { + const struct spa_dict_item *it; + spa_dict_for_each(it, &client->headers->dict) + pw_log_info(" %s: %s", it->key, it->value); + + client->content_length = pw_properties_get_uint32(client->headers, "Content-Length", 0); + if (client->content_length > 0) + client->recv_state = CLIENT_RECV_CONTENT; + else + process_received_message(client); + } + + return 0; +} + +static int process_content(struct pw_rtsp_client *client) +{ + char buf[1024]; + + while (client->content_length > 0) { + const size_t max_recv = SPA_MIN(sizeof(buf), client->content_length); + + ssize_t res = read(client->source->fd, buf, max_recv); + if (res == 0) + return -EPIPE; + + if (res < 0) { + res = -errno; + if (res == -EAGAIN || res == -EWOULDBLOCK) + return 0; + + return res; + } + + spa_assert((size_t) res <= client->content_length); + client->content_length -= res; + } + + if (client->content_length == 0) + process_received_message(client); + + return 0; +} + static int process_input(struct pw_rtsp_client *client) { - char *buf = NULL; - int res; + if (client->recv_state == CLIENT_RECV_STATUS || client->recv_state == CLIENT_RECV_HEADERS) { + char *buf = NULL; + int res; - if ((res = read_line(client, &buf)) <= 0) - return res; + if ((res = read_line(client, &buf)) <= 0) + return res; - pw_log_debug("%s", buf); + pw_log_debug("received line: %s", buf); - if (client->wait_status) { - const char *state = NULL, *s; - size_t len; - - pw_log_info("status: %s", buf); - - s = pw_split_walk(buf, " ", &len, &state); - if (!spa_strstartswith(s, "RTSP/")) - goto error; - - s = pw_split_walk(buf, " ", &len, &state); - if (s == NULL) - goto error; - - client->status = atoi(s); - if (client->status == 0) - goto error; - - s = pw_split_walk(buf, " ", &len, &state); - if (s == NULL) - goto error; - - client->wait_status = false; - pw_properties_clear(client->headers); - } else { - if (strlen(buf) == 0) { - uint32_t cseq; - struct message *msg; - const struct spa_dict_item *it; - const char *content_type; - unsigned int content_length; - - spa_dict_for_each(it, &client->headers->dict) - pw_log_info(" %s: %s", it->key, it->value); - - cseq = pw_properties_get_uint32(client->headers, "CSeq", 0); - content_type = pw_properties_get(client->headers, "Content-Type"); - if (content_type != NULL && strcmp(content_type, "application/octet-stream") == 0) { - pw_log_info("binary response received"); - content_length = pw_properties_get_uint64(client->headers, "Content-Length", 0); - char content_buf[content_length]; - res = read(client->source->fd, content_buf, content_length); - pw_log_debug("read %d bytes", res); - if (res == 0) - return -EPIPE; - if (res < 0) { - res = -errno; - if (res != -EAGAIN && res != -EWOULDBLOCK) - return res; - return 0; - } - pw_properties_set(client->headers, "body", content_buf); - } - if ((msg = find_pending(client, cseq)) != NULL) { - msg->reply(msg->user_data, client->status, &client->headers->dict); - spa_list_remove(&msg->link); - free(msg); - } else { - pw_rtsp_client_emit_message(client, client->status, - &client->headers->dict); - } - client->wait_status = true; - } else { - char *key, *value; - - key = buf; - value = strstr(buf, ":"); - if (value == NULL) - goto error; - *value++ = '\0'; - while (*value == ' ') - value++; - pw_properties_set(client->headers, key, value); + switch (client->recv_state) { + case CLIENT_RECV_STATUS: + return process_status(client, buf); + case CLIENT_RECV_HEADERS: + return process_header(client, buf); + default: + spa_assert_not_reached(); } } - return 0; -error: - return -EPROTO; + else if (client->recv_state == CLIENT_RECV_CONTENT) { + return process_content(client); + } + else { + spa_assert_not_reached(); + } } static int flush_output(struct pw_rtsp_client *client)