stream: add peer_added and peer_removed signals

Forward existing impl-node peer_added and peer_removed signals on the
stream.

Because the stream is not on the server, there is no impl_node in the
node target. Add the node id to the peer_added/removed signal and use
that for the stream event argument. Implementations can then look up the
details on the global.
This commit is contained in:
Wim Taymans 2025-11-04 11:29:52 +01:00
parent 41cdd82291
commit 3eb011c9d1
6 changed files with 40 additions and 14 deletions

View file

@ -1270,11 +1270,14 @@ static void client_node_resource_pong(void *data, int seq)
spa_node_emit_result(&impl->hooks, seq, 0, 0, NULL); spa_node_emit_result(&impl->hooks, seq, 0, 0, NULL);
} }
static void node_peer_added(void *data, struct pw_impl_node *peer) static void node_peer_added(void *data, struct pw_impl_node *peer, uint32_t id)
{ {
struct impl *impl = data; struct impl *impl = data;
struct pw_memblock *m; struct pw_memblock *m;
if (peer == NULL)
return;
m = pw_mempool_import_block(impl->client_pool, peer->activation); m = pw_mempool_import_block(impl->client_pool, peer->activation);
if (m == NULL) { if (m == NULL) {
pw_log_warn("%p: can't ensure mem: %m", impl); pw_log_warn("%p: can't ensure mem: %m", impl);
@ -1295,11 +1298,14 @@ static void node_peer_added(void *data, struct pw_impl_node *peer)
sizeof(struct pw_node_activation)); sizeof(struct pw_node_activation));
} }
static void node_peer_removed(void *data, struct pw_impl_node *peer) static void node_peer_removed(void *data, struct pw_impl_node *peer, uint32_t id)
{ {
struct impl *impl = data; struct impl *impl = data;
struct pw_memblock *m; struct pw_memblock *m;
if (peer == NULL)
return;
m = pw_mempool_find_fd(impl->client_pool, peer->activation->fd); m = pw_mempool_find_fd(impl->client_pool, peer->activation->fd);
if (m == NULL) { if (m == NULL) {
pw_log_warn("%p: unknown peer %p fd:%d", impl, peer, pw_log_warn("%p: unknown peer %p fd:%d", impl, peer,

View file

@ -898,8 +898,8 @@ int pw_impl_node_add_target(struct pw_impl_node *node, struct pw_node_target *t)
{ {
pw_loop_locked(node->data_loop, pw_loop_locked(node->data_loop,
do_add_target, SPA_ID_INVALID, &node, sizeof(void *), t); do_add_target, SPA_ID_INVALID, &node, sizeof(void *), t);
if (t->node)
pw_impl_node_emit_peer_added(node, t->node); pw_impl_node_emit_peer_added(node, t->node, t->id);
return 0; return 0;
} }
@ -934,8 +934,8 @@ int pw_impl_node_remove_target(struct pw_impl_node *node, struct pw_node_target
* can inspect the nodes as well */ * can inspect the nodes as well */
pw_loop_locked(node->data_loop, pw_loop_locked(node->data_loop,
do_remove_target, SPA_ID_INVALID, &node, sizeof(void *), t); do_remove_target, SPA_ID_INVALID, &node, sizeof(void *), t);
if (t->node)
pw_impl_node_emit_peer_removed(node, t->node); pw_impl_node_emit_peer_removed(node, t->node, t->id);
return 0; return 0;
} }

View file

@ -29,7 +29,7 @@ struct pw_impl_port;
/** Node events, listen to them with \ref pw_impl_node_add_listener */ /** Node events, listen to them with \ref pw_impl_node_add_listener */
struct pw_impl_node_events { struct pw_impl_node_events {
#define PW_VERSION_IMPL_NODE_EVENTS 0 #define PW_VERSION_IMPL_NODE_EVENTS 1
uint32_t version; uint32_t version;
/** the node is destroyed */ /** the node is destroyed */
@ -69,10 +69,10 @@ struct pw_impl_node_events {
/** the driver of the node changed */ /** the driver of the node changed */
void (*driver_changed) (void *data, struct pw_impl_node *old, struct pw_impl_node *driver); void (*driver_changed) (void *data, struct pw_impl_node *old, struct pw_impl_node *driver);
/** a peer was added */ /** a peer was added, id added in version 1 */
void (*peer_added) (void *data, struct pw_impl_node *peer); void (*peer_added) (void *data, struct pw_impl_node *peer, uint32_t id);
/** a peer was removed */ /** a peer was removed, id added in version 1 */
void (*peer_removed) (void *data, struct pw_impl_node *peer); void (*peer_removed) (void *data, struct pw_impl_node *peer, uint32_t id);
}; };
struct pw_impl_node_rt_events { struct pw_impl_node_rt_events {

View file

@ -717,8 +717,8 @@ void pw_node_peer_unref(struct pw_node_peer *peer);
#define pw_impl_node_emit_result(n,s,r,t,result) pw_impl_node_emit(n, result, 0, s, r, t, result) #define pw_impl_node_emit_result(n,s,r,t,result) pw_impl_node_emit(n, result, 0, s, r, t, result)
#define pw_impl_node_emit_event(n,e) pw_impl_node_emit(n, event, 0, e) #define pw_impl_node_emit_event(n,e) pw_impl_node_emit(n, event, 0, e)
#define pw_impl_node_emit_driver_changed(n,o,d) pw_impl_node_emit(n, driver_changed, 0, o, d) #define pw_impl_node_emit_driver_changed(n,o,d) pw_impl_node_emit(n, driver_changed, 0, o, d)
#define pw_impl_node_emit_peer_added(n,p) pw_impl_node_emit(n, peer_added, 0, p) #define pw_impl_node_emit_peer_added(n,p,i) pw_impl_node_emit(n, peer_added, 1, p, i)
#define pw_impl_node_emit_peer_removed(n,p) pw_impl_node_emit(n, peer_removed, 0, p) #define pw_impl_node_emit_peer_removed(n,p,i) pw_impl_node_emit(n, peer_removed, 1, p, i)
#define pw_impl_node_rt_emit(o,m,v,...) spa_hook_list_call(&o->rt_listener_list, struct pw_impl_node_rt_events, m, v, ##__VA_ARGS__) #define pw_impl_node_rt_emit(o,m,v,...) spa_hook_list_call(&o->rt_listener_list, struct pw_impl_node_rt_events, m, v, ##__VA_ARGS__)
#define pw_impl_node_rt_emit_drained(n) pw_impl_node_rt_emit(n, drained, 0) #define pw_impl_node_rt_emit_drained(n) pw_impl_node_rt_emit(n, drained, 0)
@ -1145,6 +1145,8 @@ struct pw_core {
#define pw_stream_emit_control_info(s,i,c) pw_stream_emit(s, control_info, 0, i, c) #define pw_stream_emit_control_info(s,i,c) pw_stream_emit(s, control_info, 0, i, c)
#define pw_stream_emit_command(s,c) pw_stream_emit(s, command,1,c) #define pw_stream_emit_command(s,c) pw_stream_emit(s, command,1,c)
#define pw_stream_emit_trigger_done(s) pw_stream_emit(s, trigger_done,2) #define pw_stream_emit_trigger_done(s) pw_stream_emit(s, trigger_done,2)
#define pw_stream_emit_peer_added(s,i) pw_stream_emit(s, peer_added, 3, i)
#define pw_stream_emit_peer_removed(s,i) pw_stream_emit(s, peer_removed, 3, i)
struct pw_stream { struct pw_stream {

View file

@ -1466,11 +1466,24 @@ static void node_state_changed(void *data, enum pw_node_state old,
} }
} }
static void node_peer_added (void *data, struct pw_impl_node *node, uint32_t id)
{
struct pw_stream *stream = data;
pw_stream_emit_peer_added(stream, id);
}
static void node_peer_removed (void *data, struct pw_impl_node *node, uint32_t id)
{
struct pw_stream *stream = data;
pw_stream_emit_peer_removed(stream, id);
}
static const struct pw_impl_node_events node_events = { static const struct pw_impl_node_events node_events = {
PW_VERSION_IMPL_NODE_EVENTS, PW_VERSION_IMPL_NODE_EVENTS,
.destroy = node_event_destroy, .destroy = node_event_destroy,
.info_changed = node_event_info, .info_changed = node_event_info,
.state_changed = node_state_changed, .state_changed = node_state_changed,
.peer_added = node_peer_added,
.peer_removed = node_peer_removed,
}; };
static void on_core_error(void *data, uint32_t id, int seq, int res, const char *message) static void on_core_error(void *data, uint32_t id, int seq, int res, const char *message)

View file

@ -414,7 +414,7 @@ struct pw_time {
/** Events for a stream. These events are always called from the mainloop /** Events for a stream. These events are always called from the mainloop
* unless explicitly documented otherwise. */ * unless explicitly documented otherwise. */
struct pw_stream_events { struct pw_stream_events {
#define PW_VERSION_STREAM_EVENTS 2 #define PW_VERSION_STREAM_EVENTS 3
uint32_t version; uint32_t version;
void (*destroy) (void *data); void (*destroy) (void *data);
@ -453,6 +453,11 @@ struct pw_stream_events {
* can also be called directly from the realtime data * can also be called directly from the realtime data
* thread if the user is prepared to deal with this. */ * thread if the user is prepared to deal with this. */
void (*trigger_done) (void *data); void (*trigger_done) (void *data);
/* notify that a new node linked to the stream, since 1.6.0:3 */
void (*peer_added) (void *data, uint32_t id);
/* notify that a new node unlinked to the stream, since 1.6.0:3 */
void (*peer_removed) (void *data, uint32_t id);
}; };
/** Convert a stream state to a readable string */ /** Convert a stream state to a readable string */