Merge branch '5307-milan-avb-talker-stream-fix' into 'master'

Resolve "milan-avb: incrase the input ring so ensure that the packetizer is not starved"

See merge request pipewire/pipewire!2854
This commit is contained in:
hackerman-kl 2026-06-09 16:49:57 +02:00
commit d130073c58
3 changed files with 64 additions and 28 deletions

View file

@ -339,7 +339,7 @@ static int send_management_request(struct gptp *gptp, uint16_t management_id,
gptp->req_sequence_id = seq; gptp->req_sequence_id = seq;
gptp->req_management_id = management_id; gptp->req_management_id = management_id;
gptp->req_sent_ns = now_ns; gptp->req_sent_ns = now_ns;
pw_log_info("PTP management request sent: id=%04x seq=%u", pw_log_debug("PTP management request sent: id=%04x seq=%u",
management_id, seq); management_id, seq);
return 0; return 0;
} }
@ -542,7 +542,7 @@ static void on_ptp_mgmt_data(void *data, int fd, uint32_t mask)
return; return;
} }
pw_log_info("PTP management socket has data (mask=%#x)", mask); pw_log_debug("PTP management socket has data (mask=%#x)", mask);
for (;;) { for (;;) {
ret = recv(fd, buf, sizeof(buf), 0); ret = recv(fd, buf, sizeof(buf), 0);
@ -553,7 +553,7 @@ static void on_ptp_mgmt_data(void *data, int fd, uint32_t mask)
pw_log_warn("Failed to receive PTP management response: %m"); pw_log_warn("Failed to receive PTP management response: %m");
return; return;
} }
pw_log_info("PTP management received %zd bytes", ret); pw_log_debug("PTP management received %zd bytes", ret);
if (ret < (ssize_t)sizeof(struct ptp_management_msg)) { if (ret < (ssize_t)sizeof(struct ptp_management_msg)) {
pw_log_warn("Received undersized PTP management response: %zd bytes", pw_log_warn("Received undersized PTP management response: %zd bytes",
ret); ret);

View file

@ -40,19 +40,19 @@ struct msrp {
static void debug_msrp_talker_common(const struct avb_packet_msrp_talker *t) static void debug_msrp_talker_common(const struct avb_packet_msrp_talker *t)
{ {
char buf[128]; char buf[128];
pw_log_info(" stream-id: %s", avb_utils_format_id(buf, sizeof(buf), be64toh(t->stream_id))); pw_log_debug(" stream-id: %s", avb_utils_format_id(buf, sizeof(buf), be64toh(t->stream_id)));
pw_log_info(" dest-addr: %s", avb_utils_format_addr(buf, sizeof(buf), t->dest_addr)); pw_log_debug(" dest-addr: %s", avb_utils_format_addr(buf, sizeof(buf), t->dest_addr));
pw_log_info(" vlan-id: %d", ntohs(t->vlan_id)); pw_log_debug(" vlan-id: %d", ntohs(t->vlan_id));
pw_log_info(" tspec-max-frame-size: %d", ntohs(t->tspec_max_frame_size)); pw_log_debug(" tspec-max-frame-size: %d", ntohs(t->tspec_max_frame_size));
pw_log_info(" tspec-max-interval-frames: %d", ntohs(t->tspec_max_interval_frames)); pw_log_debug(" tspec-max-interval-frames: %d", ntohs(t->tspec_max_interval_frames));
pw_log_info(" priority: %d", t->priority); pw_log_debug(" priority: %d", t->priority);
pw_log_info(" rank: %d", t->rank); pw_log_debug(" rank: %d", t->rank);
pw_log_info(" accumulated-latency: %d", ntohl(t->accumulated_latency)); pw_log_debug(" accumulated-latency: %d", ntohl(t->accumulated_latency));
} }
static void debug_msrp_talker(const struct avb_packet_msrp_talker *t) static void debug_msrp_talker(const struct avb_packet_msrp_talker *t)
{ {
pw_log_info("talker"); pw_log_debug("talker");
debug_msrp_talker_common(t); debug_msrp_talker_common(t);
} }
@ -169,10 +169,10 @@ static int encode_talker(struct msrp *msrp, struct attr *a, void *m, size_t maxs
static void debug_msrp_talker_fail(const struct avb_packet_msrp_talker_fail *t) static void debug_msrp_talker_fail(const struct avb_packet_msrp_talker_fail *t)
{ {
char buf[128]; char buf[128];
pw_log_info("talker fail"); pw_log_debug("talker fail");
debug_msrp_talker_common(&t->talker); debug_msrp_talker_common(&t->talker);
pw_log_info(" bridge-id: %s", avb_utils_format_id(buf, sizeof(buf), be64toh(t->bridge_id))); pw_log_debug(" bridge-id: %s", avb_utils_format_id(buf, sizeof(buf), be64toh(t->bridge_id)));
pw_log_info(" failure-code: %d", t->failure_code); pw_log_debug(" failure-code: %d", t->failure_code);
} }
static int process_talker_fail(struct msrp *msrp, uint64_t now, uint8_t attr_type, static int process_talker_fail(struct msrp *msrp, uint64_t now, uint8_t attr_type,
@ -193,9 +193,9 @@ static int process_talker_fail(struct msrp *msrp, uint64_t now, uint8_t attr_typ
static void debug_msrp_listener(const struct avb_packet_msrp_listener *l, uint8_t param) static void debug_msrp_listener(const struct avb_packet_msrp_listener *l, uint8_t param)
{ {
char buf[128]; char buf[128];
pw_log_info("listener"); pw_log_debug("listener");
pw_log_info(" %s", avb_utils_format_id(buf, sizeof(buf), be64toh(l->stream_id))); pw_log_debug(" %s", avb_utils_format_id(buf, sizeof(buf), be64toh(l->stream_id)));
pw_log_info(" %d", param); pw_log_debug(" %d", param);
} }
static void notify_listener(struct msrp *msrp, uint64_t now, struct attr *attr, uint8_t notify) static void notify_listener(struct msrp *msrp, uint64_t now, struct attr *attr, uint8_t notify)
@ -308,10 +308,10 @@ static int encode_listener(struct msrp *msrp, struct attr *a, void *m, size_t ma
static void debug_msrp_domain(const struct avb_packet_msrp_domain *d) static void debug_msrp_domain(const struct avb_packet_msrp_domain *d)
{ {
pw_log_info("domain"); pw_log_debug("domain");
pw_log_info(" id: %d", d->sr_class_id); pw_log_debug(" id: %d", d->sr_class_id);
pw_log_info(" prio: %d", d->sr_class_priority); pw_log_debug(" prio: %d", d->sr_class_priority);
pw_log_info(" vid: %d", ntohs(d->sr_class_vid)); pw_log_debug(" vid: %d", ntohs(d->sr_class_vid));
} }
static void notify_domain(struct msrp *msrp, uint64_t now, struct attr *attr, uint8_t notify) static void notify_domain(struct msrp *msrp, uint64_t now, struct attr *attr, uint8_t notify)
@ -544,7 +544,7 @@ static void msrp_event(void *data, uint64_t now, uint8_t event)
if (dispatch[a->attr->type].encode == NULL) if (dispatch[a->attr->type].encode == NULL)
continue; continue;
pw_log_info("MSRP encode %s %s", pw_log_debug("MSRP encode %s %s",
dispatch[a->attr->type].name, dispatch[a->attr->type].name,
avb_mrp_send_name(a->attr->mrp->pending_send)); avb_mrp_send_name(a->attr->mrp->pending_send));
@ -563,7 +563,7 @@ static void msrp_event(void *data, uint64_t now, uint8_t event)
f->end_mark = 0; f->end_mark = 0;
if (count > 0) { if (count > 0) {
pw_log_info("MSRP send: %d attribute(s), %zu bytes", count, total); pw_log_debug("MSRP send: %d attribute(s), %zu bytes", count, total);
avb_server_send_packet(msrp->server, msrp_mac, AVB_MSRP_ETH, avb_server_send_packet(msrp->server, msrp_mac, AVB_MSRP_ETH,
buffer, total); buffer, total);
} }

View file

@ -395,7 +395,7 @@ static void on_source_stream_process(void *data)
int64_t dtai = (int64_t)(consume_tai - stream->play_last_consume_tai); int64_t dtai = (int64_t)(consume_tai - stream->play_last_consume_tai);
double local_rate = dtai > 0 double local_rate = dtai > 0
? (double)dticks * 1e9 / (double)dtai : 0.0; ? (double)dticks * 1e9 / (double)dtai : 0.0;
pw_log_info("milan-avb: play measure local_rate=%.4f Hz " pw_log_debug("milan-avb: play measure local_rate=%.4f Hz "
"mc.rate=%.4f corr=%.6f err_ns=%d ticks=%llu | " "mc.rate=%.4f corr=%.6f err_ns=%d ticks=%llu | "
"actuator rate=%.6f play_corr=%.6f target=%d avail=%d", "actuator rate=%.6f play_corr=%.6f target=%d avail=%d",
local_rate, stream->mc.rate, stream->mc.corr, local_rate, stream->mc.rate, stream->mc.corr,
@ -462,7 +462,7 @@ static void on_source_stream_io_changed(void *data, uint32_t id,
default: name = "?"; break; default: name = "?"; break;
} }
/* milan-avb: logs whether the adapter gave us SPA_IO_RateMatch (the actuator knob) on this source. */ /* milan-avb: logs whether the adapter gave us SPA_IO_RateMatch (the actuator knob) on this source. */
pw_log_info("milan-avb: io_changed id=%u (%s) area=%p size=%u", pw_log_debug("milan-avb: io_changed id=%u (%s) area=%p size=%u",
id, name, area, (unsigned)size); id, name, area, (unsigned)size);
} }
@ -1540,8 +1540,18 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now)
if ((fd = setup_socket(stream)) < 0) if ((fd = setup_socket(stream)) < 0)
return fd; return fd;
/* milan-avb: only listeners (STREAM_INPUT) have an RX path. The talker
* (STREAM_OUTPUT) is the graph driver and transmits on this same fd via
* sendmsg(); since the socket is AF_PACKET/SOCK_RAW opened with ETH_P_ALL
* and the OUTPUT branch never bind()s it, arming SPA_IO_IN would make the
* kernel packet-tap loop every transmitted frame (PACKET_OUTGOING) back
* into on_socket_data. plus all other host egress/ingress flooding the
* "short packet" guard and even re-ingesting our own AAF egress (h->dest
* == stream->addr matches the RX filter). Talkers get mask 0 so the fd is
* still held for sendmsg()/cleanup but never wakes on inbound packets. */
stream->source = pw_loop_add_io(server->impl->loop, fd, stream->source = pw_loop_add_io(server->impl->loop, fd,
SPA_IO_IN, true, on_socket_data, stream); stream->direction == SPA_DIRECTION_INPUT ? SPA_IO_IN : 0,
true, on_socket_data, stream);
if (stream->source == NULL) { if (stream->source == NULL) {
res = -errno; res = -errno;
pw_log_error("stream %p: can't create source: %m", stream); pw_log_error("stream %p: can't create source: %m", stream);
@ -1621,6 +1631,29 @@ int stream_activate(struct stream *stream, uint16_t index, uint64_t now)
/* M2: re-anchor the presentation-timestamp accumulator on connect. */ /* M2: re-anchor the presentation-timestamp accumulator on connect. */
stream->tx_pts = 0; stream->tx_pts = 0;
/* milan-avb: prime the talker ring with one PipeWire quantum of
* silence so the burst-fill (one quantum per graph cycle) and the
* 125us flush-drain decouple. Without this DC floor the per-cycle
* ring-occupancy trough rides at ~0, and on_flush_tick's
* pad_ringbuffer_with_silence() then splices a zero/partial-zero PDU
* into otherwise-good audio at the trough (an isolated return-to-zero
* 1024 is not a multiple of frames_per_pdu so a
* sub-PDU real remainder is mixed with manufactured zeros). With a
* full-quantum margin the trough never reaches the pad threshold, so
* pad reverts to a genuine last-resort keep-alive for true underruns.
* Mirrors the INPUT prime at the top of this function; reuses the
* stride already derived by stream_apply_current_format(), so it
* cannot reintroduce the stride mismatch. tx_pts is PHC-anchored in
* flush_write_milan_v12(), so this adds only ~one quantum (~21ms) of
* fixed egress latency, well inside the Milan MTT budget. Primed here,
* before do_add_flush_timer arms the drain below. */
spa_ringbuffer_init(&stream->ring);
if (stream->frames_per_pdu > 0) {
uint32_t prefill_pdus = 1024u / stream->frames_per_pdu;
if (prefill_pdus > 0)
pad_ringbuffer_with_silence(stream, (int)prefill_pdus);
}
common->tastream_attr.attr.talker.stream_id = htobe64(stream->id); common->tastream_attr.attr.talker.stream_id = htobe64(stream->id);
memcpy(common->tastream_attr.attr.talker.dest_addr, stream->addr, 6); memcpy(common->tastream_attr.attr.talker.dest_addr, stream->addr, 6);
@ -1698,8 +1731,11 @@ int stream_activate_virtual(struct stream *stream, uint16_t index)
if (fd < 0) if (fd < 0)
return fd; return fd;
/* milan-avb: see stream_activate (talkers) (STREAM_OUTPUT) take mask 0
* so the shared TX fd never loops egress back into on_socket_data. */
stream->source = pw_loop_add_io(server->impl->loop, fd, stream->source = pw_loop_add_io(server->impl->loop, fd,
SPA_IO_IN, true, on_socket_data, stream); stream->direction == SPA_DIRECTION_INPUT ? SPA_IO_IN : 0,
true, on_socket_data, stream);
if (stream->source == NULL) if (stream->source == NULL)
return -errno; return -errno;
} }