diff --git a/src/modules/meson.build b/src/modules/meson.build index e4deb42a8..ad71992f1 100644 --- a/src/modules/meson.build +++ b/src/modules/meson.build @@ -519,7 +519,8 @@ roc_dep = dependency('roc', required: get_option('roc')) summary({'ROC': roc_dep.found()}, bool_yn: true, section: 'Streaming between daemons') pipewire_module_rtp_source = shared_library('pipewire-module-rtp-source', - [ 'module-rtp-source.c' ], + [ 'module-rtp-source.c', + 'module-rtp/stream.c' ], include_directories : [configinc], install : true, install_dir : modules_install_dir, @@ -528,7 +529,8 @@ pipewire_module_rtp_source = shared_library('pipewire-module-rtp-source', ) pipewire_module_rtp_sink = shared_library('pipewire-module-rtp-sink', - [ 'module-rtp-sink.c' ], + [ 'module-rtp-sink.c', + 'module-rtp/stream.c' ], include_directories : [configinc], install : true, install_dir : modules_install_dir, diff --git a/src/modules/module-rtp-sap.c b/src/modules/module-rtp-sap.c index 403f84449..f6d73c729 100644 --- a/src/modules/module-rtp-sap.c +++ b/src/modules/module-rtp-sap.c @@ -648,7 +648,7 @@ static struct session *session_new_announce(struct impl *impl, struct node *node sdp->ntp = (uint32_t) time(NULL) + 2208988800U; sess->props = props; - if ((str = pw_properties_get(props, "rtp.session")) != NULL) + if ((str = pw_properties_get(props, "sess.name")) != NULL) sdp->session_name = strdup(str); if ((str = pw_properties_get(props, "rtp.destination.port")) == NULL) diff --git a/src/modules/module-rtp-session.c b/src/modules/module-rtp-session.c index a2b1ec4d4..d573988d0 100644 --- a/src/modules/module-rtp-session.c +++ b/src/modules/module-rtp-session.c @@ -113,18 +113,9 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define PW_LOG_TOPIC_DEFAULT mod_topic -#define BUFFER_SIZE (1u<<20) -#define BUFFER_MASK (BUFFER_SIZE-1) - -#define DEFAULT_FORMAT "S16BE" -#define DEFAULT_RATE "48000" -#define DEFAULT_CHANNELS "2" -#define DEFAULT_POSITION "[ FL FR ]" - #define DEFAULT_CONTROL_IP "0.0.0.0" #define DEFAULT_CONTROL_PORT 0 #define DEFAULT_TTL 1 -#define DEFAULT_MTU 1280 #define DEFAULT_LOOP false #define USAGE "control.ip= " \ @@ -1474,6 +1465,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) copy_props(impl, props, "sess.min-ptime"); copy_props(impl, props, "sess.max-ptime"); copy_props(impl, props, "sess.latency.msec"); + copy_props(impl, props, "sess.ts-refclk"); impl->ttl = pw_properties_get_uint32(props, "net.ttl", DEFAULT_TTL); impl->mcast_loop = pw_properties_get_bool(props, "net.loop", DEFAULT_LOOP); @@ -1485,8 +1477,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) if (spa_streq(str, "audio")) { struct spa_dict_item items[] = { { "audio.format", DEFAULT_FORMAT }, - { "audio.rate", DEFAULT_RATE }, - { "audio.channels", DEFAULT_CHANNELS }, + { "audio.rate", SPA_STRINGIFY(DEFAULT_RATE) }, + { "audio.channels", SPA_STRINGIFY(DEFAULT_CHANNELS) }, { "audio.position", DEFAULT_POSITION } }; pw_properties_add(stream_props, &SPA_DICT_INIT_ARRAY(items)); } diff --git a/src/modules/module-rtp-sink.c b/src/modules/module-rtp-sink.c index ca22d8c78..fce7e3cc5 100644 --- a/src/modules/module-rtp-sink.c +++ b/src/modules/module-rtp-sink.c @@ -25,7 +25,7 @@ #include #include -#include +#include /** \page page_module_rtp_sink PipeWire Module: RTP sink * @@ -104,28 +104,13 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define PW_LOG_TOPIC_DEFAULT mod_topic -#define BUFFER_SIZE (1u<<20) -#define BUFFER_MASK (BUFFER_SIZE-1) - -#define DEFAULT_SESS_MEDIA "audio" - -#define DEFAULT_FORMAT "S16BE" -#define DEFAULT_RATE 48000 -#define DEFAULT_CHANNELS 2 -#define DEFAULT_POSITION "[ FL FR ]" - #define DEFAULT_PORT 46000 #define DEFAULT_SOURCE_IP "0.0.0.0" #define DEFAULT_DESTINATION_IP "224.0.0.56" #define DEFAULT_TTL 1 -#define DEFAULT_MTU 1280 #define DEFAULT_LOOP false #define DEFAULT_DSCP 34 /* Default to AES-67 AF41 (34) */ -#define DEFAULT_MIN_PTIME 2 -#define DEFAULT_MAX_PTIME 20 -#define DEFAULT_TS_OFFSET -1 - #define USAGE "source.ip= " \ "destination.ip= " \ "destination.port= " \ @@ -167,24 +152,15 @@ struct impl { struct spa_source *timer; struct pw_properties *stream_props; - struct pw_stream *stream; - struct spa_hook stream_listener; - - struct spa_io_position *io_position; + struct rtp_stream *stream; unsigned int do_disconnect:1; char *ifname; char *session_name; - uint32_t mtu; bool ttl; bool mcast_loop; uint32_t dscp; - float min_ptime; - float max_ptime; - uint32_t psamples; - uint32_t min_samples; - uint32_t max_samples; struct sockaddr_storage src_addr; socklen_t src_len; @@ -193,72 +169,29 @@ struct impl { struct sockaddr_storage dst_addr; socklen_t dst_len; - struct spa_audio_info info; - const struct format_info *format_info; - uint32_t rate; - uint32_t stride; - int payload; - uint16_t seq; - uint32_t ssrc; - uint32_t ts_offset; - char *ts_refclk; - - struct spa_ringbuffer ring; - uint8_t buffer[BUFFER_SIZE]; - int rtp_fd; - - unsigned sync:1; - unsigned apple_midi:1; }; static void stream_destroy(void *d) { struct impl *impl = d; - spa_hook_remove(&impl->stream_listener); impl->stream = NULL; } -static inline void -set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size, - uint32_t offset, struct iovec *iov, uint32_t len) -{ - iov[0].iov_len = SPA_MIN(len, size - offset); - iov[0].iov_base = SPA_PTROFF(buffer, offset, void); - iov[1].iov_len = len - iov[0].iov_len; - iov[1].iov_base = buffer; -} - -struct format_info { - uint32_t media_subtype; - uint32_t format; - uint32_t size; - const char *mime; - const char *media_type; -}; - -static const struct format_info audio_format_info[] = { - { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_U8, 1, "L8", "audio" }, - { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_ALAW, 1, "PCMA", "audio" }, - { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_ULAW, 1, "PCMU", "audio" }, - { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S16_BE, 2, "L16", "audio" }, - { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S24_BE, 3, "L24", "audio" }, - { SPA_MEDIA_SUBTYPE_control, 0, 1, "rtp-midi", "audio" }, -}; - -static const struct format_info *find_audio_format_info(const struct spa_audio_info *info) -{ - SPA_FOR_EACH_ELEMENT_VAR(audio_format_info, f) - if (f->media_subtype == info->media_subtype && - (f->format == 0 || f->format == info->info.raw.format)) - return f; - return NULL; -} - -static ssize_t send_packet(struct impl *impl, struct msghdr *msg) +static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen) { + struct impl *impl = data; + struct msghdr msg; ssize_t n; - n = sendmsg(impl->rtp_fd, msg, MSG_NOSIGNAL); + + spa_zero(msg); + msg.msg_iov = iov; + msg.msg_iovlen = iovlen; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = 0; + + n = sendmsg(impl->rtp_fd, &msg, MSG_NOSIGNAL); if (n < 0) { switch (errno) { case ECONNREFUSED: @@ -266,338 +199,27 @@ static ssize_t send_packet(struct impl *impl, struct msghdr *msg) pw_log_debug("remote end not listening"); break; default: - pw_log_warn("sendmsg() failed, seq:%u dropped: %m", - impl->seq); + pw_log_warn("sendmsg() failed: %m"); break; } } - impl->seq++; - return n; } -static void flush_audio_packets(struct impl *impl) -{ - int32_t avail; - uint32_t stride, timestamp; - struct iovec iov[3]; - struct msghdr msg; - struct rtp_header header; - int32_t tosend; - - avail = spa_ringbuffer_get_read_index(&impl->ring, ×tamp); - tosend = impl->psamples; - - if (avail < tosend) - return; - - stride = impl->stride; - - spa_zero(header); - header.v = 2; - header.pt = impl->payload; - header.ssrc = htonl(impl->ssrc); - - iov[0].iov_base = &header; - iov[0].iov_len = sizeof(header); - - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = 3; - msg.msg_control = NULL; - msg.msg_controllen = 0; - msg.msg_flags = 0; - - while (avail >= tosend) { - header.sequence_number = htons(impl->seq); - header.timestamp = htonl(impl->ts_offset + timestamp); - - set_iovec(&impl->ring, - impl->buffer, BUFFER_SIZE, - (timestamp * stride) & BUFFER_MASK, - &iov[1], tosend * stride); - - pw_log_trace("sending %d timestamp:%d", tosend, timestamp); - - send_packet(impl, &msg); - - timestamp += tosend; - avail -= tosend; - } - spa_ringbuffer_read_update(&impl->ring, timestamp); -} - -static void stream_audio_process(struct impl *impl) -{ - struct pw_buffer *buf; - struct spa_data *d; - uint32_t offs, size, timestamp, expected_timestamp, stride; - int32_t filled, wanted; - - if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { - pw_log_debug("Out of stream buffers: %m"); - return; - } - d = buf->buffer->datas; - - offs = SPA_MIN(d[0].chunk->offset, d[0].maxsize); - size = SPA_MIN(d[0].chunk->size, d[0].maxsize - offs); - stride = impl->stride; - wanted = size / stride; - - filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_timestamp); - if (SPA_LIKELY(impl->io_position)) - timestamp = impl->io_position->clock.position; - else - timestamp = expected_timestamp; - - if (impl->sync) { - if (expected_timestamp != timestamp) { - pw_log_warn("expected %u != timestamp %u", expected_timestamp, timestamp); - impl->sync = false; - } else if (filled + wanted > (int32_t)(BUFFER_SIZE / stride)) { - pw_log_warn("overrun %u + %u > %u", filled, wanted, BUFFER_SIZE / stride); - impl->sync = false; - } - } - if (!impl->sync) { - pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u", - timestamp, impl->seq, impl->ts_offset, impl->ssrc); - impl->ring.readindex = impl->ring.writeindex = timestamp; - memset(impl->buffer, 0, BUFFER_SIZE); - impl->sync = true; - } - - spa_ringbuffer_write_data(&impl->ring, - impl->buffer, - BUFFER_SIZE, - (timestamp * stride) & BUFFER_MASK, - SPA_PTROFF(d[0].data, offs, void), wanted * stride); - timestamp += wanted; - spa_ringbuffer_write_update(&impl->ring, timestamp); - - pw_stream_queue_buffer(impl->stream, buf); - - flush_audio_packets(impl); -} - -static int write_event(uint8_t *p, uint32_t value, void *ev, uint32_t size) -{ - uint64_t buffer; - uint8_t b; - int count = 0; - - buffer = value & 0x7f; - while ((value >>= 7)) { - buffer <<= 8; - buffer |= ((value & 0x7f) | 0x80); - } - do { - b = buffer & 0xff; - p[count++] = b; - buffer >>= 8; - } while (b & 0x80); - - memcpy(&p[count], ev, size); - return count + size; -} - -static void flush_midi_packets(struct impl *impl, struct spa_pod_sequence *sequence, uint32_t timestamp) -{ - struct spa_pod_control *c; - struct rtp_header header; - struct rtp_midi_header midi_header; - struct iovec iov[3]; - struct msghdr msg; - uint32_t len, prev_offset, base; - - spa_zero(header); - header.v = 2; - header.pt = impl->payload; - header.ssrc = htonl(impl->ssrc); - - spa_zero(midi_header); - midi_header.b = 1; - midi_header.z = 1; - - iov[0].iov_base = &header; - iov[0].iov_len = sizeof(header); - iov[1].iov_base = &midi_header; - iov[1].iov_len = sizeof(midi_header); - iov[2].iov_base = impl->buffer; - iov[2].iov_len = 0; - - spa_zero(msg); - msg.msg_iov = iov; - msg.msg_iovlen = 3; - - prev_offset = len = base = 0; - - SPA_POD_SEQUENCE_FOREACH(sequence, c) { - void *ev; - uint32_t size, delta; - - if (c->type != SPA_CONTROL_Midi) - continue; - - ev = SPA_POD_BODY(&c->value), - size = SPA_POD_BODY_SIZE(&c->value); - - if (len > 0 && (len + size > impl->mtu || - c->offset - base > impl->max_samples)) { - /* flush packet when we have one and when it's either - * too large or has too much data. */ - midi_header.len = (len >> 8) & 0xf; - midi_header.len_b = len & 0xff; - iov[2].iov_len = len; - - pw_log_debug("sending %d timestamp:%d %u %u", - len, timestamp + base, - c->offset, impl->max_samples); - send_packet(impl, &msg); - - len = 0; - } - if (len == 0) { - /* start new packet */ - base = prev_offset = c->offset; - header.sequence_number = htons(impl->seq); - header.timestamp = htonl(impl->ts_offset + timestamp + base); - } - - delta = c->offset - prev_offset; - prev_offset = c->offset; - - len += write_event(&impl->buffer[len], delta, ev, size); - } - if (len > 0) { - /* flush last packet */ - midi_header.len = (len >> 8) & 0xf; - midi_header.len_b = len & 0xff; - iov[2].iov_len = len; - - pw_log_debug("sending %d timestamp:%d", len, base); - send_packet(impl, &msg); - } -} - -static void send_cmd(struct impl *impl) -{ - uint8_t buffer[16]; - struct iovec iov[3]; - struct msghdr msg; - - spa_zero(buffer); - buffer[0] = 0xff; - buffer[1] = 0xff; - buffer[2] = 'I'; - buffer[3] = 'N'; - - iov[0].iov_base = buffer; - iov[0].iov_len = sizeof(buffer); - - spa_zero(msg); - msg.msg_iov = iov; - msg.msg_iovlen = 1; - - send_packet(impl, &msg); -} - -static void stream_midi_process(void *data) +static void stream_state_changed(void *data, bool started, const char *error) { struct impl *impl = data; - struct pw_buffer *buf; - struct spa_data *d; - uint32_t offs, size, timestamp; - struct spa_pod *pod; - void *ptr; - if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { - pw_log_debug("Out of stream buffers: %m"); - return; - } - d = buf->buffer->datas; - - offs = SPA_MIN(d[0].chunk->offset, d[0].maxsize); - size = SPA_MIN(d[0].chunk->size, d[0].maxsize - offs); - - if (SPA_LIKELY(impl->io_position)) - timestamp = impl->io_position->clock.position; - else - timestamp = 0; - - ptr = SPA_PTROFF(d[0].data, offs, void); - - if ((pod = spa_pod_from_data(ptr, size, 0, size)) == NULL) - goto done; - if (!spa_pod_is_sequence(pod)) - goto done; - - if (!impl->sync) { - pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u", - timestamp, impl->seq, impl->ts_offset, impl->ssrc); - impl->sync = true; - if (impl->apple_midi) - send_cmd(impl); - } - - flush_midi_packets(impl, (struct spa_pod_sequence*)pod, timestamp); - -done: - pw_stream_queue_buffer(impl->stream, buf); -} - -static void stream_process(void *data) -{ - struct impl *impl = data; - switch (impl->info.media_type) { - case SPA_MEDIA_TYPE_audio: - stream_audio_process(impl); - break; - case SPA_MEDIA_TYPE_application: - stream_midi_process(impl); - break; - } -} - - -static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size) -{ - struct impl *impl = data; - switch (id) { - case SPA_IO_Position: - impl->io_position = area; - break; - } -} - -static void on_stream_state_changed(void *d, enum pw_stream_state old, - enum pw_stream_state state, const char *error) -{ - struct impl *impl = d; - - switch (state) { - case PW_STREAM_STATE_UNCONNECTED: - pw_log_info("stream disconnected, unloading"); - pw_impl_module_schedule_destroy(impl->module); - break; - case PW_STREAM_STATE_ERROR: + if (error) { pw_log_error("stream error: %s", error); - break; - case PW_STREAM_STATE_PAUSED: - impl->sync = false; - break; - default: - break; + pw_impl_module_schedule_destroy(impl->module); } } -static const struct pw_stream_events in_stream_events = { - PW_VERSION_STREAM_EVENTS, +static const struct rtp_stream_events stream_events = { + RTP_VERSION_STREAM_EVENTS, .destroy = stream_destroy, - .io_changed = stream_io_changed, - .state_changed = on_stream_state_changed, - .process = stream_process + .state_changed = stream_state_changed, + .send_packet = stream_send_packet, }; static int parse_address(const char *address, uint16_t port, @@ -681,75 +303,6 @@ error: return res; } -static int setup_stream(struct impl *impl) -{ - const struct spa_pod *params[1]; - struct spa_pod_builder b; - uint32_t n_params; - uint8_t buffer[1024]; - struct pw_properties *props; - enum pw_stream_flags flags; - int res, fd; - - props = pw_properties_copy(impl->stream_props); - if (props == NULL) - return -errno; - - if (pw_properties_get(props, PW_KEY_NODE_LATENCY) == NULL) { - pw_properties_setf(props, PW_KEY_NODE_LATENCY, - "%d/%d", impl->psamples, impl->rate); - } - pw_properties_setf(props, PW_KEY_NODE_RATE, "1/%d", impl->rate); - - impl->stream = pw_stream_new(impl->core, - "rtp-sink capture", props); - if (impl->stream == NULL) - return -errno; - - pw_stream_add_listener(impl->stream, - &impl->stream_listener, - &in_stream_events, impl); - - n_params = 0; - spa_pod_builder_init(&b, buffer, sizeof(buffer)); - - flags = PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS; - - switch (impl->info.media_type) { - case SPA_MEDIA_TYPE_audio: - params[n_params++] = spa_format_audio_build(&b, - SPA_PARAM_EnumFormat, &impl->info); - flags |= PW_STREAM_FLAG_AUTOCONNECT; - break; - case SPA_MEDIA_TYPE_application: - params[n_params++] = spa_pod_builder_add_object(&b, - SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat, - SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_application), - SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_control)); - break; - default: - return -EINVAL; - } - - if ((res = pw_stream_connect(impl->stream, - PW_DIRECTION_INPUT, - PW_ID_ANY, - flags, - params, n_params)) < 0) - return res; - - - if ((fd = make_socket(&impl->src_addr, impl->src_len, - &impl->dst_addr, impl->dst_len, - impl->mcast_loop, impl->ttl, - impl->dscp)) < 0) - return fd; - - impl->rtp_fd = fd; - - return 0; -} - static int get_ip(const struct sockaddr_storage *sa, char *ip, size_t len) { if (sa->ss_family == AF_INET) { @@ -778,7 +331,7 @@ static const struct pw_proxy_events core_proxy_events = { static void impl_destroy(struct impl *impl) { if (impl->stream) - pw_stream_destroy(impl->stream); + rtp_stream_destroy(impl->stream); if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); @@ -793,7 +346,6 @@ static void impl_destroy(struct impl *impl) pw_properties_free(impl->props); free(impl->ifname); - free(impl->ts_refclk); free(impl->session_name); free(impl); } @@ -826,63 +378,6 @@ static const struct pw_core_events core_events = { .error = on_core_error, }; -static inline uint32_t format_from_name(const char *name, size_t len) -{ - int i; - for (i = 0; spa_type_audio_format[i].name; i++) { - if (strncmp(name, spa_debug_type_short_name(spa_type_audio_format[i].name), len) == 0) - return spa_type_audio_format[i].type; - } - return SPA_AUDIO_FORMAT_UNKNOWN; -} - -static uint32_t channel_from_name(const char *name) -{ - int i; - for (i = 0; spa_type_audio_channel[i].name; i++) { - if (spa_streq(name, spa_debug_type_short_name(spa_type_audio_channel[i].name))) - return spa_type_audio_channel[i].type; - } - return SPA_AUDIO_CHANNEL_UNKNOWN; -} - -static void parse_position(struct spa_audio_info_raw *info, const char *val, size_t len) -{ - struct spa_json it[2]; - char v[256]; - - spa_json_init(&it[0], val, len); - if (spa_json_enter_array(&it[0], &it[1]) <= 0) - spa_json_init(&it[1], val, len); - - info->channels = 0; - while (spa_json_get_string(&it[1], v, sizeof(v)) > 0 && - info->channels < SPA_AUDIO_MAX_CHANNELS) { - info->position[info->channels++] = channel_from_name(v); - } -} - -static void parse_audio_info(const struct pw_properties *props, struct spa_audio_info_raw *info) -{ - const char *str; - - spa_zero(*info); - if ((str = pw_properties_get(props, PW_KEY_AUDIO_FORMAT)) == NULL) - str = DEFAULT_FORMAT; - info->format = format_from_name(str, strlen(str)); - - info->rate = pw_properties_get_uint32(props, PW_KEY_AUDIO_RATE, info->rate); - if (info->rate == 0) - info->rate = DEFAULT_RATE; - - info->channels = pw_properties_get_uint32(props, PW_KEY_AUDIO_CHANNELS, info->channels); - info->channels = SPA_MIN(info->channels, SPA_AUDIO_MAX_CHANNELS); - if ((str = pw_properties_get(props, SPA_KEY_AUDIO_POSITION)) != NULL) - parse_position(info, str, strlen(str)); - if (info->channels == 0) - parse_position(info, DEFAULT_POSITION, strlen(DEFAULT_POSITION)); -} - static void copy_props(struct impl *impl, struct pw_properties *props, const char *key) { const char *str; @@ -898,11 +393,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) struct pw_context *context = pw_impl_module_get_context(module); struct impl *impl; struct pw_properties *props = NULL, *stream_props = NULL; - uint32_t id = pw_global_get_id(pw_impl_module_get_global(module)); - uint32_t pid = getpid(); - int64_t ts_offset; char addr[64]; - const char *str; + const char *str, *sess_name; int res = 0; PW_LOG_TOPIC_INIT(mod_topic); @@ -936,18 +428,16 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->context = context; impl->loop = pw_context_get_main_loop(context); - if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL) - pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true"); - if (pw_properties_get(stream_props, PW_KEY_NODE_NETWORK) == NULL) - pw_properties_set(stream_props, PW_KEY_NODE_NETWORK, "true"); + if ((sess_name = pw_properties_get(props, "sess.name")) == NULL) + sess_name = pw_get_host_name(); if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL) - pw_properties_setf(props, PW_KEY_NODE_NAME, "rtp-sink-%u-%u", pid, id); + pw_properties_setf(props, PW_KEY_NODE_NAME, "rtp_session.%s", sess_name); if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL) - pw_properties_set(props, PW_KEY_NODE_DESCRIPTION, - pw_properties_get(props, PW_KEY_NODE_NAME)); + pw_properties_setf(props, PW_KEY_NODE_DESCRIPTION, "%s", sess_name); if (pw_properties_get(props, PW_KEY_MEDIA_NAME) == NULL) - pw_properties_set(props, PW_KEY_MEDIA_NAME, "RTP Sender Stream"); + pw_properties_setf(props, PW_KEY_MEDIA_NAME, "RTP Session with %s", + sess_name); if ((str = pw_properties_get(props, "stream.props")) != NULL) pw_properties_update_string(stream_props, str, strlen(str)); @@ -964,54 +454,12 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) copy_props(impl, props, PW_KEY_NODE_CHANNELNAMES); copy_props(impl, props, PW_KEY_MEDIA_NAME); copy_props(impl, props, PW_KEY_MEDIA_CLASS); - - if ((str = pw_properties_get(props, "sess.media")) == NULL) - str = DEFAULT_SESS_MEDIA; - - if (spa_streq(str, "audio")) { - impl->info.media_type = SPA_MEDIA_TYPE_audio; - impl->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; - } - else if (spa_streq(str, "midi")) { - impl->info.media_type = SPA_MEDIA_TYPE_application; - impl->info.media_subtype = SPA_MEDIA_SUBTYPE_control; - } - else { - pw_log_error("unsupported media type:%s", str); - res = -EINVAL; - goto out; - } - - switch (impl->info.media_type) { - case SPA_MEDIA_TYPE_audio: - parse_audio_info(impl->stream_props, &impl->info.info.raw); - impl->format_info = find_audio_format_info(&impl->info); - if (impl->format_info == NULL) { - pw_log_error("unsupported audio format:%d channels:%d", - impl->info.info.raw.format, impl->info.info.raw.channels); - res = -EINVAL; - goto out; - } - impl->stride = impl->format_info->size * impl->info.info.raw.channels; - impl->rate = impl->info.info.raw.rate; - break; - case SPA_MEDIA_TYPE_application: - impl->format_info = find_audio_format_info(&impl->info); - if (impl->format_info == NULL) { - res = -EINVAL; - goto out; - } - pw_properties_set(impl->stream_props, PW_KEY_FORMAT_DSP, "8 bit raw midi"); - impl->stride = impl->format_info->size; - impl->rate = 48000; - break; - default: - spa_assert_not_reached(); - break; - } - impl->payload = 127; - impl->seq = pw_rand32(); - impl->ssrc = pw_rand32(); + copy_props(impl, props, "net.mtu"); + copy_props(impl, props, "rtp.media"); + copy_props(impl, props, "sess.name"); + copy_props(impl, props, "sess.min-ptime"); + copy_props(impl, props, "sess.max-ptime"); + copy_props(impl, props, "sess.latency.msec"); str = pw_properties_get(props, "local.ifname"); impl->ifname = str ? strdup(str) : NULL; @@ -1032,56 +480,17 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) goto out; } - impl->mtu = pw_properties_get_uint32(props, "net.mtu", DEFAULT_MTU); impl->ttl = pw_properties_get_uint32(props, "net.ttl", DEFAULT_TTL); impl->mcast_loop = pw_properties_get_bool(props, "net.loop", DEFAULT_LOOP); impl->dscp = pw_properties_get_uint32(props, "net.dscp", DEFAULT_DSCP); - ts_offset = pw_properties_get_int64(props, "sess.ts-offset", DEFAULT_TS_OFFSET); - impl->ts_offset = ts_offset < 0 ? pw_rand32() : ts_offset; - - str = pw_properties_get(props, "sess.ts-refclk"); - impl->ts_refclk = str ? strdup(str) : NULL; - - str = pw_properties_get(props, "sess.min-ptime"); - if (!spa_atof(str, &impl->min_ptime)) - impl->min_ptime = DEFAULT_MIN_PTIME; - str = pw_properties_get(props, "sess.max-ptime"); - if (!spa_atof(str, &impl->max_ptime)) - impl->max_ptime = DEFAULT_MAX_PTIME; - - impl->min_samples = impl->min_ptime * impl->rate / 1000; - impl->max_samples = impl->max_ptime * impl->rate / 1000; - - impl->psamples = impl->mtu / impl->stride; - impl->psamples = SPA_CLAMP(impl->psamples, impl->min_samples, impl->max_samples); - - if ((str = pw_properties_get(props, "sess.name")) == NULL) - pw_properties_setf(props, "sess.name", "PipeWire RTP Stream on %s", - pw_get_host_name()); - str = pw_properties_get(props, "sess.name"); - impl->session_name = str ? strdup(str) : NULL; - - pw_properties_set(stream_props, "rtp.session", impl->session_name); get_ip(&impl->src_addr, addr, sizeof(addr)); pw_properties_set(stream_props, "rtp.source.ip", addr); get_ip(&impl->dst_addr, addr, sizeof(addr)); pw_properties_set(stream_props, "rtp.destination.ip", addr); pw_properties_setf(stream_props, "rtp.destination.port", "%u", impl->dst_port); - pw_properties_setf(stream_props, "rtp.mtu", "%u", impl->mtu); pw_properties_setf(stream_props, "rtp.ttl", "%u", impl->ttl); - pw_properties_setf(stream_props, "rtp.ptime", "%u", - impl->psamples * 1000 / impl->rate); pw_properties_setf(stream_props, "rtp.dscp", "%u", impl->dscp); - pw_properties_setf(stream_props, "rtp.media", "%s", impl->format_info->media_type); - pw_properties_setf(stream_props, "rtp.mime", "%s", impl->format_info->mime); - pw_properties_setf(stream_props, "rtp.payload", "%u", impl->payload); - pw_properties_setf(stream_props, "rtp.rate", "%u", impl->rate); - if (impl->info.info.raw.channels > 0) - pw_properties_setf(stream_props, "rtp.channels", "%u", impl->info.info.raw.channels); - pw_properties_setf(stream_props, "rtp.ts-offset", "%u", impl->ts_offset); - if (impl->ts_refclk != NULL) - pw_properties_set(stream_props, "rtp.ts-refclk", impl->ts_refclk); impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) { @@ -1106,8 +515,22 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) &impl->core_listener, &core_events, impl); - if ((res = setup_stream(impl)) < 0) + if ((res = make_socket(&impl->src_addr, impl->src_len, + &impl->dst_addr, impl->dst_len, + impl->mcast_loop, impl->ttl)) < 0) { + pw_log_error("can't make socket: %s", spa_strerror(res)); goto out; + } + impl->rtp_fd = res; + + impl->stream = rtp_stream_new(impl->core, + PW_DIRECTION_INPUT, pw_properties_copy(stream_props), + &stream_events, impl); + if (impl->stream == NULL) { + res = -errno; + pw_log_error("can't create stream: %m"); + goto out; + } pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl); diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index d9e3f519e..b0b7f682f 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -28,7 +28,7 @@ #include #include -#include +#include #ifdef __FreeBSD__ #define ifr_ifindex ifr_index @@ -97,22 +97,8 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define PW_LOG_TOPIC_DEFAULT mod_topic #define DEFAULT_CLEANUP_SEC 60 -#define ERROR_MSEC 2 - -#define DEFAULT_SESS_LATENCY 100 - #define DEFAULT_SOURCE_IP "224.0.0.56" -#define DEFAULT_FORMAT "S16BE" -#define DEFAULT_RATE 48000 -#define DEFAULT_CHANNELS 2 -#define DEFAULT_POSITION "[ FL FR ]" - -#define BUFFER_SIZE (1u<<22) -#define BUFFER_MASK (BUFFER_SIZE-1) -#define BUFFER_SIZE2 (BUFFER_SIZE>>1) -#define BUFFER_MASK2 (BUFFER_SIZE2-1) - #define USAGE "local.ifname= " \ "source.ip= " \ "source.port= " \ @@ -147,549 +133,37 @@ struct impl { char *ifname; bool always_process; - int sess_latency_msec; uint32_t cleanup_interval; struct spa_source *timer; - struct spa_audio_info info; struct pw_properties *stream_props; - struct pw_stream *stream; - struct spa_hook stream_listener; + struct rtp_stream *stream; uint16_t src_port; struct sockaddr_storage src_addr; socklen_t src_len; struct spa_source *source; - uint32_t rate; - uint32_t stride; - uint32_t expected_ssrc; - uint16_t expected_seq; - unsigned have_ssrc:1; - unsigned have_seq:1; - unsigned have_sync:1; - uint32_t ts_offset; - - struct spa_ringbuffer ring; - uint8_t buffer[BUFFER_SIZE]; - - struct spa_io_rate_match *rate_match; - struct spa_io_position *position; - struct spa_dll dll; - double corr; - uint32_t target_buffer; - float max_error; - - unsigned first:1; unsigned receiving:1; - unsigned direct_timestamp:1; - - float last_timestamp; - float last_time; }; -struct format_info { - uint32_t media_subtype; - uint32_t format; - uint32_t size; - const char *mime; - const char *media_type; -}; - -static uint32_t audio_get_stride(const struct spa_audio_info *info) -{ - uint32_t stride = 0; - - if (info->media_type != SPA_MEDIA_TYPE_audio || - info->media_subtype != SPA_MEDIA_SUBTYPE_raw) - return 0; - - switch (info->info.raw.format) { - case SPA_AUDIO_FORMAT_U8: - case SPA_AUDIO_FORMAT_ALAW: - case SPA_AUDIO_FORMAT_ULAW: - stride = 1; - break; - case SPA_AUDIO_FORMAT_S16_BE: - stride = 2; - break; - case SPA_AUDIO_FORMAT_S24_BE: - stride = 3; - break; - default: - break; - } - return stride * info->info.raw.channels; -} - -static void process_audio(struct impl *impl) -{ - struct pw_buffer *buf; - struct spa_data *d; - uint32_t wanted, timestamp, target_buffer, stride, maxsize; - int32_t avail; - - if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { - pw_log_debug("Out of stream buffers: %m"); - return; - } - d = buf->buffer->datas; - - stride = impl->stride; - - maxsize = d[0].maxsize / stride; - wanted = buf->requested ? SPA_MIN(buf->requested, maxsize) : maxsize; - - if (impl->position && impl->direct_timestamp) { - /* in direct mode, read directly from the timestamp index, - * because sender and receiver are in sync, this would keep - * target_buffer of samples available. */ - spa_ringbuffer_read_update(&impl->ring, - impl->position->clock.position); - } - avail = spa_ringbuffer_get_read_index(&impl->ring, ×tamp); - - target_buffer = impl->target_buffer; - - if (avail < (int32_t)wanted) { - enum spa_log_level level; - memset(d[0].data, 0, wanted * stride); - if (impl->have_sync) { - impl->have_sync = false; - level = SPA_LOG_LEVEL_WARN; - } else { - level = SPA_LOG_LEVEL_DEBUG; - } - pw_log(level, "underrun %d/%u < %u", - avail, target_buffer, wanted); - } else { - float error, corr; - if (impl->first) { - if ((uint32_t)avail > target_buffer) { - uint32_t skip = avail - target_buffer; - pw_log_debug("first: avail:%d skip:%u target:%u", - avail, skip, target_buffer); - timestamp += skip; - avail = target_buffer; - } - impl->first = false; - } else if (avail > (int32_t)SPA_MIN(target_buffer * 8, BUFFER_SIZE / stride)) { - pw_log_warn("overrun %u > %u", avail, target_buffer * 8); - timestamp += avail - target_buffer; - avail = target_buffer; - } - if (!impl->direct_timestamp) { - /* when not using direct timestamp and clocks are not - * in sync, try to adjust our playback rate to keep the - * requested target_buffer bytes in the ringbuffer */ - error = (float)target_buffer - (float)avail; - error = SPA_CLAMP(error, -impl->max_error, impl->max_error); - - corr = spa_dll_update(&impl->dll, error); - - pw_log_debug("avail:%u target:%u error:%f corr:%f", avail, - target_buffer, error, corr); - - if (impl->rate_match) { - SPA_FLAG_SET(impl->rate_match->flags, - SPA_IO_RATE_MATCH_FLAG_ACTIVE); - impl->rate_match->rate = 1.0f / corr; - } - } - spa_ringbuffer_read_data(&impl->ring, - impl->buffer, - BUFFER_SIZE, - (timestamp * stride) & BUFFER_MASK, - d[0].data, wanted * stride); - - timestamp += wanted; - spa_ringbuffer_read_update(&impl->ring, timestamp); - } - d[0].chunk->size = wanted * stride; - d[0].chunk->stride = stride; - d[0].chunk->offset = 0; - buf->size = wanted; - - pw_stream_queue_buffer(impl->stream, buf); -} - -static void receive_audio(struct impl *impl, uint8_t *packet, - uint32_t timestamp, uint32_t payload_offset, uint32_t len) -{ - uint32_t plen = len - payload_offset; - uint8_t *payload = &packet[payload_offset]; - uint32_t stride = impl->stride; - uint32_t samples = plen / stride; - uint32_t write, expected_write; - int32_t filled; - - filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_write); - - /* we always write to timestamp + delay */ - write = timestamp + impl->target_buffer; - - if (!impl->have_sync) { - pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u direct:%d", - write, impl->expected_seq-1, impl->ts_offset, impl->expected_ssrc, - impl->direct_timestamp); - - /* we read from timestamp, keeping target_buffer of data - * in the ringbuffer. */ - impl->ring.readindex = timestamp; - impl->ring.writeindex = write; - filled = impl->target_buffer; - - spa_dll_init(&impl->dll); - spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->rate); - memset(impl->buffer, 0, BUFFER_SIZE); - impl->have_sync = true; - } else if (expected_write != write) { - pw_log_debug("unexpected write (%u != %u)", - write, expected_write); - } - - if (filled + samples > BUFFER_SIZE / stride) { - pw_log_debug("capture overrun %u + %u > %u", filled, samples, - BUFFER_SIZE / stride); - impl->have_sync = false; - } else { - pw_log_debug("got samples:%u", samples); - spa_ringbuffer_write_data(&impl->ring, - impl->buffer, - BUFFER_SIZE, - (write * stride) & BUFFER_MASK, - payload, (samples * stride)); - write += samples; - spa_ringbuffer_write_update(&impl->ring, write); - } -} - -static void process_midi(struct impl *impl) -{ - struct pw_buffer *buf; - struct spa_data *d; - uint32_t timestamp, duration, maxsize, read; - struct spa_pod_builder b; - struct spa_pod_frame f[1]; - void *ptr; - struct spa_pod *pod; - struct spa_pod_control *c; - - if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { - pw_log_debug("Out of stream buffers: %m"); - return; - } - d = buf->buffer->datas; - - maxsize = d[0].maxsize; - - /* we always use the graph position to select events, the receiver side is - * responsible for smoothing out the RTP timestamps to graph time */ - duration = impl->position->clock.duration; - if (impl->position) - timestamp = impl->position->clock.position; - else - timestamp = 0; - - /* we copy events into the buffer based on the rtp timestamp + delay. */ - spa_pod_builder_init(&b, d[0].data, maxsize); - spa_pod_builder_push_sequence(&b, &f[0], 0); - - while (true) { - int32_t avail = spa_ringbuffer_get_read_index(&impl->ring, &read); - if (avail <= 0) - break; - - ptr = SPA_PTROFF(impl->buffer, read & BUFFER_MASK2, void); - - if ((pod = spa_pod_from_data(ptr, avail, 0, avail)) == NULL) - goto done; - if (!spa_pod_is_sequence(pod)) - goto done; - - /* the ringbuffer contains series of sequences, one for each - * received packet */ - SPA_POD_SEQUENCE_FOREACH((struct spa_pod_sequence*)pod, c) { - /* try to render with given delay */ - uint32_t target = c->offset + impl->target_buffer; - if (timestamp != 0) { - /* skip old packets */ - if (target < timestamp) - continue; - /* event for next cycle */ - if (target >= timestamp + duration) - goto complete; - } else { - timestamp = target; - } - spa_pod_builder_control(&b, target - timestamp, SPA_CONTROL_Midi); - spa_pod_builder_bytes(&b, - SPA_POD_BODY(&c->value), - SPA_POD_BODY_SIZE(&c->value)); - } - /* we completed a sequence (one RTP packet), advance ringbuffer - * and go to the next packet */ - read += SPA_PTRDIFF(c, ptr); - spa_ringbuffer_read_update(&impl->ring, read); - } -complete: - spa_pod_builder_pop(&b, &f[0]); - - if (b.state.offset > maxsize) { - pw_log_warn("overflow buffer %u %u", b.state.offset, maxsize); - b.state.offset = 0; - } - d[0].chunk->size = b.state.offset; - d[0].chunk->stride = 1; - d[0].chunk->offset = 0; -done: - pw_stream_queue_buffer(impl->stream, buf); -} - -static int parse_varlen(uint8_t *p, uint32_t avail, uint32_t *result) -{ - uint32_t value = 0, offs = 0; - while (offs < avail) { - uint8_t b = p[offs++]; - value = (value << 7) | (b & 0x7f); - if ((b & 0x80) == 0) - break; - } - *result = value; - return offs; -} - -static int get_midi_size(uint8_t *p, uint32_t avail) -{ - int size; - uint32_t offs = 0, value; - - switch (p[offs++]) { - case 0xc0 ... 0xdf: - size = 2; - break; - case 0x80 ... 0xbf: - case 0xe0 ... 0xef: - size = 3; - break; - case 0xff: - case 0xf0: - case 0xf7: - size = parse_varlen(&p[offs], avail - offs, &value); - size += value + 1; - break; - default: - return -EINVAL; - } - return size; -} - -static double get_time(struct impl *impl) -{ - struct timespec ts; - double t; - clock_gettime(CLOCK_MONOTONIC, &ts); - t = impl->position->clock.position / (double) impl->position->clock.rate.denom; - t += (SPA_TIMESPEC_TO_NSEC(&ts) - impl->position->clock.nsec) / (double)SPA_NSEC_PER_SEC; - return t; -} - -static void receive_midi(struct impl *impl, uint8_t *packet, - uint32_t timestamp, uint32_t payload_offset, uint32_t plen) -{ - uint32_t write; - struct rtp_midi_header *hdr; - int32_t filled; - struct spa_pod_builder b; - struct spa_pod_frame f[1]; - void *ptr; - uint32_t offs = payload_offset, len, end; - bool first = true; - - if (impl->direct_timestamp) { - /* in direct timestamp we attach the RTP timestamp directly on the - * midi events and render them in the corresponding cycle */ - if (!impl->have_sync) { - pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u direct:%d", - timestamp, impl->expected_seq-1, impl->ts_offset, impl->expected_ssrc, - impl->direct_timestamp); - impl->have_sync = true; - } - } else { - /* in non-direct timestamp mode, we relate the graph clock against - * the RTP timestamps */ - double ts = timestamp / (float) impl->rate; - double t = get_time(impl); - double elapsed, estimated, diff; - - /* the elapsed time between RTP timestamps */ - elapsed = ts - impl->last_timestamp; - /* for that elapsed time, our clock should have advanced - * by this amount since the last estimation */ - estimated = impl->last_time + elapsed * impl->corr; - /* calculate the diff between estimated and current clock time in - * samples */ - diff = (estimated - t) * impl->rate; - - /* no sync or we drifted too far, resync */ - if (!impl->have_sync || fabs(diff) > impl->target_buffer) { - impl->corr = 1.0; - spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 256, impl->rate); - - pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u direct:%d", - timestamp, impl->expected_seq-1, impl->ts_offset, impl->expected_ssrc, - impl->direct_timestamp); - impl->have_sync = true; - impl->ring.readindex = impl->ring.writeindex; - } else { - /* update our new rate correction */ - impl->corr = spa_dll_update(&impl->dll, diff); - /* our current time is now the estimated time */ - t = estimated; - } - pw_log_debug("%f %f %f %f", t, estimated, diff, impl->corr); - - timestamp = t * impl->rate; - - impl->last_timestamp = ts; - impl->last_time = t; - } - - filled = spa_ringbuffer_get_write_index(&impl->ring, &write); - if (filled > (int32_t)BUFFER_SIZE2) - return; - - hdr = (struct rtp_midi_header *)&packet[offs++]; - len = hdr->len; - if (hdr->b) { - len = (len << 8) | hdr->len_b; - offs++; - } - end = len + offs; - if (end > plen) - return; - - ptr = SPA_PTROFF(impl->buffer, write & BUFFER_MASK2, void); - - /* each packet is written as a sequence of events. The offset is - * the RTP timestamp */ - spa_pod_builder_init(&b, ptr, BUFFER_SIZE2 - filled); - spa_pod_builder_push_sequence(&b, &f[0], 0); - - while (offs < end) { - uint32_t delta; - int size; - - if (first && !hdr->z) - delta = 0; - else - offs += parse_varlen(&packet[offs], end - offs, &delta); - - timestamp += delta * impl->corr; - spa_pod_builder_control(&b, timestamp, SPA_CONTROL_Midi); - - size = get_midi_size(&packet[offs], end - offs); - - if (size <= 0 || offs + size > end) { - pw_log_warn("invalid size (%08x) %d (%u %u)", - packet[offs], size, offs, end); - break; - } - - spa_pod_builder_bytes(&b, &packet[offs], size); - - offs += size; - first = false; - } - spa_pod_builder_pop(&b, &f[0]); - - write += b.state.offset; - spa_ringbuffer_write_update(&impl->ring, write); -} - -static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size) -{ - struct impl *impl = data; - switch (id) { - case SPA_IO_RateMatch: - impl->rate_match = area; - break; - case SPA_IO_Position: - impl->position = area; - break; - } -} - -static void stream_destroy(void *d) -{ - struct impl *impl = d; - spa_hook_remove(&impl->stream_listener); - impl->stream = NULL; -} - -static void stream_process(void *data) -{ - struct impl *impl = data; - switch (impl->info.media_type) { - case SPA_MEDIA_TYPE_audio: - process_audio(impl); - break; - case SPA_MEDIA_TYPE_application: - process_midi(impl); - break; - } -} - static void on_rtp_io(void *data, int fd, uint32_t mask) { struct impl *impl = data; - struct rtp_header *hdr; - ssize_t len, hlen; + ssize_t len; uint8_t buffer[2048]; if (mask & SPA_IO_IN) { - uint16_t seq; - uint32_t timestamp; - if ((len = recv(fd, buffer, sizeof(buffer), 0)) < 0) goto receive_error; if (len < 12) goto short_packet; - hdr = (struct rtp_header*)buffer; - if (hdr->v != 2) - goto invalid_version; + rtp_stream_receive_packet(impl->stream, buffer, len); - hlen = 12 + hdr->cc * 4; - if (hlen > len) - goto invalid_len; - - if (impl->have_ssrc && impl->expected_ssrc != hdr->ssrc) - goto unexpected_ssrc; - impl->expected_ssrc = hdr->ssrc; - impl->have_ssrc = true; - - seq = ntohs(hdr->sequence_number); - if (impl->have_seq && impl->expected_seq != seq) { - pw_log_info("unexpected seq (%d != %d) SSRC:%u", - seq, impl->expected_seq, hdr->ssrc); - impl->have_sync = false; - } - impl->expected_seq = seq + 1; - impl->have_seq = true; - - timestamp = ntohl(hdr->timestamp) - impl->ts_offset; - - switch (impl->info.media_type) { - case SPA_MEDIA_TYPE_audio: - receive_audio(impl, buffer, timestamp, hlen, len); - break; - case SPA_MEDIA_TYPE_application: - receive_midi(impl, buffer, timestamp, hlen, len); - } impl->receiving = true; } return; @@ -700,17 +174,6 @@ receive_error: short_packet: pw_log_warn("short packet received"); return; -invalid_version: - pw_log_warn("invalid RTP version"); - spa_debug_mem(0, buffer, len); - return; -invalid_len: - pw_log_warn("invalid RTP length"); - return; -unexpected_ssrc: - pw_log_warn("unexpected SSRC (expected %u != %u)", - impl->expected_ssrc, hdr->ssrc); - return; } static int parse_address(const char *address, uint16_t port, @@ -810,11 +273,6 @@ error: return res; } -static uint32_t msec_to_samples(struct impl *impl, uint32_t msec) -{ - return msec * impl->rate / 1000; -} - static int stream_start(struct impl *impl) { int fd; @@ -851,113 +309,34 @@ static void stream_stop(struct impl *impl) impl->source = NULL; } -static void on_stream_state_changed(void *d, enum pw_stream_state old, - enum pw_stream_state state, const char *error) +static void stream_destroy(void *d) { struct impl *impl = d; - - switch (state) { - case PW_STREAM_STATE_UNCONNECTED: - pw_log_info("stream disconnected, unloading"); - pw_impl_module_schedule_destroy(impl->module); - break; - case PW_STREAM_STATE_ERROR: - pw_log_error("stream error: %s", error); - break; - case PW_STREAM_STATE_STREAMING: - if ((errno = -stream_start(impl)) < 0) - pw_log_error("failed to start RTP stream: %m"); - break; - case PW_STREAM_STATE_PAUSED: - if (!impl->always_process) - stream_stop(impl); - break; - default: - break; - } + impl->stream = NULL; } -static const struct pw_stream_events out_stream_events = { - PW_VERSION_STREAM_EVENTS, - .destroy = stream_destroy, - .state_changed = on_stream_state_changed, - .io_changed = stream_io_changed, - .process = stream_process -}; - -static int setup_stream(struct impl *impl) +static void stream_state_changed(void *data, bool started, const char *error) { - const struct spa_pod *params[1]; - struct spa_pod_builder b; - uint32_t n_params; - enum pw_stream_flags flags; - uint8_t buffer[1024]; - struct pw_properties *props; - int res; + struct impl *impl = data; - impl->first = true; - - props = pw_properties_copy(impl->stream_props); - if (props == NULL) { - res = -errno; - goto error; + if (error) { + pw_log_error("stream error: %s", error); + pw_impl_module_schedule_destroy(impl->module); + } else if (started) { + if ((errno = -stream_start(impl)) < 0) + pw_log_error("failed to start RTP stream: %m"); + } else { + if (!impl->always_process) + stream_stop(impl); } - - spa_dll_init(&impl->dll); - spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->rate); - impl->corr = 1.0; - - impl->stream = pw_stream_new(impl->core, - "rtp-source playback", props); - if (impl->stream == NULL) { - res = -errno; - pw_log_error("can't create stream: %m"); - goto error; - } - - pw_stream_add_listener(impl->stream, - &impl->stream_listener, - &out_stream_events, impl); - - n_params = 0; - spa_pod_builder_init(&b, buffer, sizeof(buffer)); - - flags = PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS; - - switch (impl->info.media_type) { - case SPA_MEDIA_TYPE_audio: - params[n_params++] = spa_format_audio_build(&b, - SPA_PARAM_EnumFormat, &impl->info); - flags |= PW_STREAM_FLAG_AUTOCONNECT; - break; - case SPA_MEDIA_TYPE_application: - params[n_params++] = spa_pod_builder_add_object(&b, - SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat, - SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_application), - SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_control)); - break; - default: - return -EINVAL; - } - - if ((res = pw_stream_connect(impl->stream, - PW_DIRECTION_OUTPUT, - PW_ID_ANY, - flags, - params, n_params)) < 0) { - pw_log_error("can't connect stream: %s", spa_strerror(res)); - goto error; - } - - if (impl->always_process && - (res = stream_start(impl)) < 0) - goto error; - - return 0; -error: - return res; } +static const struct rtp_stream_events stream_events = { + RTP_VERSION_STREAM_EVENTS, + .destroy = stream_destroy, + .state_changed = stream_state_changed, +}; + static void on_timer_event(void *data, uint64_t expirations) { struct impl *impl = data; @@ -986,7 +365,7 @@ static const struct pw_proxy_events core_proxy_events = { static void impl_destroy(struct impl *impl) { if (impl->stream) - pw_stream_destroy(impl->stream); + rtp_stream_destroy(impl->stream); if (impl->source) pw_loop_destroy_source(impl->data_loop, impl->source); @@ -1031,63 +410,6 @@ static const struct pw_core_events core_events = { .error = on_core_error, }; -static inline uint32_t format_from_name(const char *name, size_t len) -{ - int i; - for (i = 0; spa_type_audio_format[i].name; i++) { - if (strncmp(name, spa_debug_type_short_name(spa_type_audio_format[i].name), len) == 0) - return spa_type_audio_format[i].type; - } - return SPA_AUDIO_FORMAT_UNKNOWN; -} - -static uint32_t channel_from_name(const char *name) -{ - int i; - for (i = 0; spa_type_audio_channel[i].name; i++) { - if (spa_streq(name, spa_debug_type_short_name(spa_type_audio_channel[i].name))) - return spa_type_audio_channel[i].type; - } - return SPA_AUDIO_CHANNEL_UNKNOWN; -} - -static void parse_position(struct spa_audio_info_raw *info, const char *val, size_t len) -{ - struct spa_json it[2]; - char v[256]; - - spa_json_init(&it[0], val, len); - if (spa_json_enter_array(&it[0], &it[1]) <= 0) - spa_json_init(&it[1], val, len); - - info->channels = 0; - while (spa_json_get_string(&it[1], v, sizeof(v)) > 0 && - info->channels < SPA_AUDIO_MAX_CHANNELS) { - info->position[info->channels++] = channel_from_name(v); - } -} - -static void parse_audio_info(const struct pw_properties *props, struct spa_audio_info_raw *info) -{ - const char *str; - - spa_zero(*info); - if ((str = pw_properties_get(props, PW_KEY_AUDIO_FORMAT)) == NULL) - str = DEFAULT_FORMAT; - info->format = format_from_name(str, strlen(str)); - - info->rate = pw_properties_get_uint32(props, PW_KEY_AUDIO_RATE, info->rate); - if (info->rate == 0) - info->rate = DEFAULT_RATE; - - info->channels = pw_properties_get_uint32(props, PW_KEY_AUDIO_CHANNELS, info->channels); - info->channels = SPA_MIN(info->channels, SPA_AUDIO_MAX_CHANNELS); - if ((str = pw_properties_get(props, SPA_KEY_AUDIO_POSITION)) != NULL) - parse_position(info, str, strlen(str)); - if (info->channels == 0) - parse_position(info, DEFAULT_POSITION, strlen(DEFAULT_POSITION)); -} - static void copy_props(struct impl *impl, struct pw_properties *props, const char *key) { const char *str; @@ -1101,10 +423,8 @@ SPA_EXPORT int pipewire__module_init(struct pw_impl_module *module, const char *args) { struct pw_context *context = pw_impl_module_get_context(module); - uint32_t id = pw_global_get_id(pw_impl_module_get_global(module)); - uint32_t pid = getpid(); struct impl *impl; - const char *str; + const char *str, *sess_name; struct timespec value, interval; struct pw_properties *props, *stream_props; int res = 0; @@ -1131,18 +451,16 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->loop = pw_context_get_main_loop(context); impl->data_loop = pw_data_loop_get_loop(pw_context_get_data_loop(context)); - if (pw_properties_get(stream_props, PW_KEY_NODE_VIRTUAL) == NULL) - pw_properties_set(stream_props, PW_KEY_NODE_VIRTUAL, "true"); - if (pw_properties_get(stream_props, PW_KEY_NODE_NETWORK) == NULL) - pw_properties_set(stream_props, PW_KEY_NODE_NETWORK, "true"); + if ((sess_name = pw_properties_get(props, "sess.name")) == NULL) + sess_name = pw_get_host_name(); if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL) - pw_properties_setf(props, PW_KEY_NODE_NAME, "rtp-source-%u-%u", pid, id); + pw_properties_setf(props, PW_KEY_NODE_NAME, "rtp_session.%s", sess_name); if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL) - pw_properties_set(props, PW_KEY_NODE_DESCRIPTION, - pw_properties_get(props, PW_KEY_NODE_NAME)); + pw_properties_setf(props, PW_KEY_NODE_DESCRIPTION, "%s", sess_name); if (pw_properties_get(props, PW_KEY_MEDIA_NAME) == NULL) - pw_properties_set(props, PW_KEY_MEDIA_NAME, "RTP Receiver Stream"); + pw_properties_setf(props, PW_KEY_MEDIA_NAME, "RTP Session with %s", + sess_name); if ((str = pw_properties_get(props, "stream.props")) != NULL) pw_properties_update_string(stream_props, str, strlen(str)); @@ -1159,46 +477,12 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) copy_props(impl, props, PW_KEY_NODE_CHANNELNAMES); copy_props(impl, props, PW_KEY_MEDIA_NAME); copy_props(impl, props, PW_KEY_MEDIA_CLASS); - - impl->info.media_type = SPA_MEDIA_TYPE_audio; - impl->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; - if ((str = pw_properties_get(stream_props, "rtp.media")) != NULL) { - if (spa_streq(str, "audio")) { - impl->info.media_type = SPA_MEDIA_TYPE_audio; - impl->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; - } - else if (spa_streq(str, "midi")) { - impl->info.media_type = SPA_MEDIA_TYPE_application; - impl->info.media_subtype = SPA_MEDIA_SUBTYPE_control; - } - else { - pw_log_error("unsupported media type:%s", str); - res = -EINVAL; - goto out; - } - } - - switch (impl->info.media_type) { - case SPA_MEDIA_TYPE_audio: - parse_audio_info(stream_props, &impl->info.info.raw); - impl->stride = audio_get_stride(&impl->info); - if (impl->stride == 0) { - pw_log_error("unsupported audio format:%d channels:%d", - impl->info.info.raw.format, impl->info.info.raw.channels); - res = -EINVAL; - goto out; - } - impl->rate = impl->info.info.raw.rate; - break; - case SPA_MEDIA_TYPE_application: - pw_properties_set(stream_props, PW_KEY_FORMAT_DSP, "8 bit raw midi"); - impl->stride = 1; - impl->rate = 48000; - break; - default: - spa_assert_not_reached(); - break; - } + copy_props(impl, props, "net.mtu"); + copy_props(impl, props, "rtp.media"); + copy_props(impl, props, "sess.name"); + copy_props(impl, props, "sess.min-ptime"); + copy_props(impl, props, "sess.max-ptime"); + copy_props(impl, props, "sess.latency.msec"); str = pw_properties_get(props, "local.ifname"); impl->ifname = str ? strdup(str) : NULL; @@ -1220,32 +504,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->cleanup_interval = pw_properties_get_uint32(props, "cleanup.sec", DEFAULT_CLEANUP_SEC); - if ((str = pw_properties_get(props, "sess.name")) != NULL) { - pw_properties_set(stream_props, "rtp.session", str); - if (pw_properties_get(stream_props, PW_KEY_MEDIA_NAME) == NULL) - pw_properties_setf(stream_props, PW_KEY_MEDIA_NAME, "RTP Stream (%s)", str); - if (pw_properties_get(stream_props, PW_KEY_NODE_NAME) == NULL) - pw_properties_setf(stream_props, PW_KEY_NODE_NAME, "%s", str); - } else { - if (pw_properties_get(stream_props, PW_KEY_MEDIA_NAME) == NULL) - pw_properties_set(stream_props, PW_KEY_MEDIA_NAME, "RTP Stream"); - } - - impl->ts_offset = pw_properties_get_int64(stream_props, "sess.ts-offset", 0); - pw_properties_setf(stream_props, "rtp.ts-offset", "%u", impl->ts_offset); - - impl->direct_timestamp = pw_properties_get_bool(stream_props, - "sess.ts-direct", false); - - impl->sess_latency_msec = pw_properties_get_uint32(stream_props, - "sess.latency.msec", DEFAULT_SESS_LATENCY); - impl->target_buffer = msec_to_samples(impl, impl->sess_latency_msec); - impl->max_error = msec_to_samples(impl, ERROR_MSEC); - - pw_properties_setf(stream_props, PW_KEY_NODE_RATE, "1/%d", impl->rate); - pw_properties_setf(stream_props, PW_KEY_NODE_LATENCY, "%d/%d", - impl->target_buffer / 2, impl->rate); - impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) { str = pw_properties_get(props, PW_KEY_REMOTE_NAME); @@ -1281,8 +539,14 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) interval.tv_nsec = 0; pw_loop_update_timer(impl->loop, impl->timer, &value, &interval, false); - if ((res = setup_stream(impl)) < 0) + impl->stream = rtp_stream_new(impl->core, + PW_DIRECTION_OUTPUT, pw_properties_copy(stream_props), + &stream_events, impl); + if (impl->stream == NULL) { + res = -errno; + pw_log_error("can't create stream: %m"); goto out; + } pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl); diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index 72c12bb17..ffa388bc3 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -21,23 +21,11 @@ #include #include -#define DEFAULT_FORMAT "S16BE" -#define DEFAULT_RATE 48000 -#define DEFAULT_CHANNELS 2 -#define DEFAULT_POSITION "[ FL FR ]" - #define BUFFER_SIZE (1u<<22) #define BUFFER_MASK (BUFFER_SIZE-1) #define BUFFER_SIZE2 (BUFFER_SIZE>>1) #define BUFFER_MASK2 (BUFFER_SIZE2-1) -#define DEFAULT_MTU 1280 -#define DEFAULT_MIN_PTIME 2 -#define DEFAULT_MAX_PTIME 20 - -#define ERROR_MSEC 2 -#define DEFAULT_SESS_LATENCY 100 - #define rtp_stream_emit(s,m,v,...) spa_hook_list_call(&s->listener_list, \ struct rtp_stream_events, m, v, ##__VA_ARGS__) #define rtp_stream_emit_destroy(s) rtp_stream_emit(s, destroy, 0) @@ -357,6 +345,8 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, impl->payload = pw_properties_get_uint32(props, "rtp.payload", impl->payload); impl->mtu = pw_properties_get_uint32(props, "net.mtu", DEFAULT_MTU); + impl->seq = pw_rand32(); + str = pw_properties_get(props, "sess.min-ptime"); if (!spa_atof(str, &min_ptime)) min_ptime = DEFAULT_MIN_PTIME; @@ -382,6 +372,16 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, pw_properties_setf(props, "net.mtu", "%u", impl->mtu); pw_properties_setf(props, "rtp.ptime", "%u", impl->psamples * 1000 / impl->rate); + pw_properties_setf(props, "rtp.media", "%s", impl->format_info->media_type); + pw_properties_setf(props, "rtp.mime", "%s", impl->format_info->mime); + pw_properties_setf(props, "rtp.payload", "%u", impl->payload); + pw_properties_setf(props, "rtp.rate", "%u", impl->rate); + if (impl->info.info.raw.channels > 0) + pw_properties_setf(props, "rtp.channels", "%u", impl->info.info.raw.channels); + if ((str = pw_properties_get(props, "sess.ts-refclk")) != NULL) { + pw_properties_setf(props, "rtp.ts-offset", "%u", impl->ts_offset); + pw_properties_set(props, "rtp.ts-refclk", str); + } spa_dll_init(&impl->dll); spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->rate); diff --git a/src/modules/module-rtp/stream.h b/src/modules/module-rtp/stream.h index 445c609bd..8cb137aee 100644 --- a/src/modules/module-rtp/stream.h +++ b/src/modules/module-rtp/stream.h @@ -11,6 +11,18 @@ extern "C" { struct rtp_stream; +#define DEFAULT_FORMAT "S16BE" +#define DEFAULT_RATE 48000 +#define DEFAULT_CHANNELS 2 +#define DEFAULT_POSITION "[ FL FR ]" + +#define ERROR_MSEC 2 +#define DEFAULT_SESS_LATENCY 100 + +#define DEFAULT_MTU 1280 +#define DEFAULT_MIN_PTIME 2 +#define DEFAULT_MAX_PTIME 20 + struct rtp_stream_events { #define RTP_VERSION_STREAM_EVENTS 0 uint32_t version;