avb: implement more MRP

Join a READY listener MRP attribute when we recveive the transmit
response.
Also listen for talker attributes.
Remove attribute callbacks, we don't need them because we moved the
packet construction in the MRP users.
Implement notify for listener attributes.

A connect will now make my MOTU send samples to PipeWire.
This commit is contained in:
Wim Taymans 2022-03-24 17:00:40 +01:00
parent f64f8cdd4d
commit 0868f0c7b0
11 changed files with 325 additions and 204 deletions

View file

@ -44,16 +44,7 @@ struct msrp {
struct spa_list attributes;
};
static struct attr *find_attr_by_stream_id(struct msrp *msrp, uint64_t stream_id)
{
struct attr *a;
spa_list_for_each(a, &msrp->attributes, link)
if (a->attr.attr.talker.stream_id == stream_id)
return a;
return NULL;
}
static void debug_msrp_talker(const struct avbtp_packet_msrp_talker *t)
static void debug_msrp_talker_common(const struct avbtp_packet_msrp_talker *t)
{
char buf[128];
pw_log_info(" stream-id: %s", avbtp_utils_format_id(buf, sizeof(buf), be64toh(t->stream_id)));
@ -66,25 +57,37 @@ static void debug_msrp_talker(const struct avbtp_packet_msrp_talker *t)
pw_log_info(" accumulated-latency: %d", ntohl(t->accumulated_latency));
}
static void debug_msrp_talker(const struct avbtp_packet_msrp_talker *t)
{
pw_log_info("talker");
debug_msrp_talker_common(t);
}
static void notify_talker(struct msrp *msrp, uint64_t now, struct attr *attr, uint8_t notify)
{
pw_log_info("> notify talker: %d", notify);
debug_msrp_talker(&attr->attr.attr.talker);
}
static int process_talker(struct msrp *msrp, uint64_t now, uint8_t attr_type,
const void *m, uint8_t event, uint8_t param, int num)
{
const struct avbtp_packet_msrp_talker *t = m;
struct attr *a;
pw_log_info("talker");
debug_msrp_talker(t);
a = find_attr_by_stream_id(msrp, be64toh(t->stream_id));
if (a)
avbtp_mrp_rx_event(msrp->server->mrp, now, a->attr.mrp, event);
spa_list_for_each(a, &msrp->attributes, link)
if (a->attr.type == attr_type &&
a->attr.attr.talker.stream_id == t->stream_id) {
a->attr.attr.talker = *t;
avbtp_mrp_rx_event(msrp->server->mrp, now, a->attr.mrp, event);
}
return 0;
}
static void debug_msrp_talker_fail(const struct avbtp_packet_msrp_talker_fail *t)
{
char buf[128];
debug_msrp_talker(&t->talker);
pw_log_info("talker fail");
debug_msrp_talker_common(&t->talker);
pw_log_info(" bridge-id: %s", avbtp_utils_format_id(buf, sizeof(buf), be64toh(t->bridge_id)));
pw_log_info(" failure-code: %d", t->failure_code);
}
@ -93,50 +96,137 @@ static int process_talker_fail(struct msrp *msrp, uint64_t now, uint8_t attr_typ
const void *m, uint8_t event, uint8_t param, int num)
{
const struct avbtp_packet_msrp_talker_fail *t = m;
pw_log_info("talker fail");
struct attr *a;
debug_msrp_talker_fail(t);
spa_list_for_each(a, &msrp->attributes, link)
if (a->attr.type == attr_type &&
a->attr.attr.talker_fail.talker.stream_id == t->talker.stream_id)
avbtp_mrp_rx_event(msrp->server->mrp, now, a->attr.mrp, event);
return 0;
}
static void debug_msrp_listener(const struct avbtp_packet_msrp_listener *l)
{
char buf[128];
pw_log_info(" %s", avbtp_utils_format_id(buf, sizeof(buf), l->stream_id));
pw_log_info("listener");
pw_log_info(" %s", avbtp_utils_format_id(buf, sizeof(buf), be64toh(l->stream_id)));
}
static void notify_listener(struct msrp *msrp, uint64_t now, struct attr *attr, uint8_t notify)
{
pw_log_info("> notify listener: %d", notify);
debug_msrp_listener(&attr->attr.attr.listener);
}
static int process_listener(struct msrp *msrp, uint64_t now, uint8_t attr_type,
const void *m, uint8_t event, uint8_t param, int num)
{
const struct avbtp_packet_msrp_listener *l = m;
pw_log_info("listener");
debug_msrp_listener(l);
struct attr *a;
spa_list_for_each(a, &msrp->attributes, link)
if (a->attr.type == attr_type &&
a->attr.attr.listener.stream_id == l->stream_id)
avbtp_mrp_rx_event(msrp->server->mrp, now, a->attr.mrp, event);
return 0;
}
static int encode_listener(struct msrp *msrp, struct attr *a, void *m)
{
struct avbtp_packet_msrp_msg *msg = m;
struct avbtp_packet_mrp_vector *v;
struct avbtp_packet_msrp_listener *l;
struct avbtp_packet_mrp_footer *f;
uint8_t *ev;
size_t attr_list_length = sizeof(*v) + sizeof(*l) + sizeof(*f) + 1 + 1;
msg->attribute_type = AVBTP_MSRP_ATTRIBUTE_TYPE_LISTENER;
msg->attribute_length = sizeof(*l);
msg->attribute_list_length = htons(attr_list_length);
v = (struct avbtp_packet_mrp_vector *)msg->attribute_list;
v->lva = 0;
AVBTP_MRP_VECTOR_SET_NUM_VALUES(v, 1);
l = (struct avbtp_packet_msrp_listener *)v->first_value;
*l = a->attr.attr.listener;
ev = SPA_PTROFF(l, sizeof(*l), uint8_t);
*ev = a->attr.mrp->pending_send * 6 * 6;
ev = SPA_PTROFF(ev, sizeof(*ev), uint8_t);
*ev = a->attr.param * 4 * 4 * 4;
f = SPA_PTROFF(ev, sizeof(*ev), struct avbtp_packet_mrp_footer);
f->end_mark = 0;
return attr_list_length + sizeof(*msg);
}
static void debug_msrp_domain(const struct avbtp_packet_msrp_domain *d)
{
pw_log_info("domain");
pw_log_info(" %d", d->sr_class_id);
pw_log_info(" %d", d->sr_class_priority);
pw_log_info(" %d", ntohs(d->sr_class_vid));
}
static void notify_domain(struct msrp *msrp, uint64_t now, struct attr *attr, uint8_t notify)
{
pw_log_info("> notify domain: %d", notify);
debug_msrp_domain(&attr->attr.attr.domain);
}
static int process_domain(struct msrp *msrp, uint64_t now, uint8_t attr_type,
const void *m, uint8_t event, uint8_t param, int num)
{
const struct avbtp_packet_msrp_domain *d = m;
pw_log_info("domain");
debug_msrp_domain(d);
struct attr *a;
spa_list_for_each(a, &msrp->attributes, link)
if (a->attr.type == attr_type)
avbtp_mrp_rx_event(msrp->server->mrp, now, a->attr.mrp, event);
return 0;
}
static int encode_domain(struct msrp *msrp, struct attr *a, void *m)
{
struct avbtp_packet_msrp_msg *msg = m;
struct avbtp_packet_mrp_vector *v;
struct avbtp_packet_msrp_domain *d;
struct avbtp_packet_mrp_footer *f;
uint8_t *ev;
size_t attr_list_length = sizeof(*v) + sizeof(*d) + sizeof(*f) + 1;
msg->attribute_type = AVBTP_MSRP_ATTRIBUTE_TYPE_DOMAIN;
msg->attribute_length = sizeof(*d);
msg->attribute_list_length = htons(attr_list_length);
v = (struct avbtp_packet_mrp_vector *)msg->attribute_list;
v->lva = 0;
AVBTP_MRP_VECTOR_SET_NUM_VALUES(v, 1);
d = (struct avbtp_packet_msrp_domain *)v->first_value;
*d = a->attr.attr.domain;
ev = SPA_PTROFF(d, sizeof(*d), uint8_t);
*ev = a->attr.mrp->pending_send * 36;
f = SPA_PTROFF(ev, sizeof(*ev), struct avbtp_packet_mrp_footer);
f->end_mark = 0;
return attr_list_length + sizeof(*msg);
}
static const struct {
int (*dispatch) (struct msrp *msrp, uint64_t now, uint8_t attr_type,
const void *m, uint8_t event, uint8_t param, int num);
int (*encode) (struct msrp *msrp, struct attr *attr, void *m);
void (*notify) (struct msrp *msrp, uint64_t now, struct attr *attr, uint8_t notify);
} dispatch[] = {
[AVBTP_MSRP_ATTRIBUTE_TYPE_TALKER_ADVERTISE] = { process_talker, },
[AVBTP_MSRP_ATTRIBUTE_TYPE_TALKER_FAILED] = { process_talker_fail, },
[AVBTP_MSRP_ATTRIBUTE_TYPE_LISTENER] = { process_listener, },
[AVBTP_MSRP_ATTRIBUTE_TYPE_DOMAIN] = { process_domain, },
[AVBTP_MSRP_ATTRIBUTE_TYPE_TALKER_ADVERTISE] = { process_talker, NULL, notify_talker, },
[AVBTP_MSRP_ATTRIBUTE_TYPE_TALKER_FAILED] = { process_talker_fail, NULL, NULL },
[AVBTP_MSRP_ATTRIBUTE_TYPE_LISTENER] = { process_listener, encode_listener, notify_listener },
[AVBTP_MSRP_ATTRIBUTE_TYPE_DOMAIN] = { process_domain, encode_domain, notify_domain, },
};
static bool msrp_check_header(void *data, const void *hdr, size_t *hdr_size, bool *has_params)
@ -188,7 +278,7 @@ static int msrp_message(void *data, uint64_t now, const void *message, int len)
if (memcmp(p->eth.dest, mac, 6) != 0)
return 0;
pw_log_info("MSRP");
pw_log_debug("MSRP");
return avbtp_mrp_parse_packet(msrp->server->mrp,
now, message, len, &info, msrp);
}
@ -206,23 +296,6 @@ static const struct server_events server_events = {
.message = msrp_message
};
static int msrp_attr_compare(void *data, struct avbtp_mrp_attribute *a, struct avbtp_mrp_attribute *b)
{
return 0;
}
static int msrp_attr_merge(void *data, struct avbtp_mrp_attribute *a, int vector)
{
pw_log_info("attr merge");
return 0;
}
static const struct avbtp_mrp_attribute_callbacks attr_cb = {
AVBTP_VERSION_MRP_ATTRIBUTE_CALLBACKS,
.compare = msrp_attr_compare,
.merge = msrp_attr_merge
};
struct avbtp_msrp_attribute *avbtp_msrp_attribute_new(struct avbtp_msrp *m,
uint8_t type)
{
@ -230,26 +303,63 @@ struct avbtp_msrp_attribute *avbtp_msrp_attribute_new(struct avbtp_msrp *m,
struct avbtp_mrp_attribute *attr;
struct attr *a;
attr = avbtp_mrp_attribute_new(msrp->server->mrp,
&attr_cb, msrp, sizeof(struct attr));
attr = avbtp_mrp_attribute_new(msrp->server->mrp, sizeof(struct attr));
a = attr->user_data;
a->attr.mrp = attr;
spa_list_append(&msrp->attributes, &a->link);
a->attr.type = type;
spa_list_append(&msrp->attributes, &a->link);
return &a->attr;
}
static int msrp_tx_event(void *data, uint8_t event, bool start)
static void msrp_event(void *data, uint64_t now, uint8_t event)
{
pw_log_info("tx %s", start ? "start" : "stop");
return 0;
struct msrp *msrp = data;
uint8_t buffer[2048];
struct avbtp_packet_mrp *p = (struct avbtp_packet_mrp*)buffer;
struct avbtp_packet_mrp_footer *f;
void *msg = SPA_PTROFF(buffer, sizeof(*p), void);
struct attr *a;
int len, count = 0;
size_t total = sizeof(*p) + 2;
p->version = AVBTP_MRP_PROTOCOL_VERSION;
spa_list_for_each(a, &msrp->attributes, link) {
if (!a->attr.mrp->pending_send)
continue;
if (dispatch[a->attr.type].encode == NULL)
continue;
len = dispatch[a->attr.type].encode(msrp, a, msg);
if (len < 0)
break;
count++;
msg = SPA_PTROFF(msg, len, void);
total += len;
}
f = (struct avbtp_packet_mrp_footer *)msg;
f->end_mark = 0;
if (count > 0)
avbtp_server_send_packet(msrp->server, mac, AVB_MSRP_ETH,
buffer, total);
}
static void msrp_notify(void *data, uint64_t now, struct avbtp_mrp_attribute *attr, uint8_t notify)
{
struct msrp *msrp = data;
struct attr *a = attr->user_data;
if (dispatch[a->attr.type].notify != NULL)
dispatch[a->attr.type].notify(msrp, now, a, notify);
}
static const struct avbtp_mrp_events mrp_events = {
AVBTP_VERSION_MRP_ATTRIBUTE_CALLBACKS,
.tx_event = msrp_tx_event
.event = msrp_event,
.notify = msrp_notify
};
struct avbtp_msrp *avbtp_msrp_register(struct server *server)