mirror of
				https://gitlab.freedesktop.org/pulseaudio/pulseaudio.git
				synced 2025-11-03 09:01:50 -05:00 
			
		
		
		
	tunnel-sink-new: create sink *after* connection
The io thread, after connection, sends a message asking for a sink to be created. After the ctl thread is done with creation, it sends a message back to the io thread so it can continue. This ensures that the sink only exists when it's connected to something. Part-of: <https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/merge_requests/688>
This commit is contained in:
		
							parent
							
								
									117fa0cbe5
								
							
						
					
					
						commit
						34d00afc74
					
				
					 1 changed files with 133 additions and 65 deletions
				
			
		| 
						 | 
					@ -66,6 +66,21 @@ static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *user
 | 
				
			||||||
static void context_state_cb(pa_context *c, void *userdata);
 | 
					static void context_state_cb(pa_context *c, void *userdata);
 | 
				
			||||||
static void sink_update_requested_latency_cb(pa_sink *s);
 | 
					static void sink_update_requested_latency_cb(pa_sink *s);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					struct tunnel_msg {
 | 
				
			||||||
 | 
					    pa_msgobject parent;
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					typedef struct tunnel_msg tunnel_msg;
 | 
				
			||||||
 | 
					PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					enum {
 | 
				
			||||||
 | 
					    TUNNEL_MESSAGE_CREATE_SINK_REQUEST,
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					enum {
 | 
				
			||||||
 | 
					    TUNNEL_MESSAGE_SINK_CREATED = PA_SINK_MESSAGE_MAX,
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
struct userdata {
 | 
					struct userdata {
 | 
				
			||||||
    pa_module *module;
 | 
					    pa_module *module;
 | 
				
			||||||
    pa_sink *sink;
 | 
					    pa_sink *sink;
 | 
				
			||||||
| 
						 | 
					@ -81,6 +96,7 @@ struct userdata {
 | 
				
			||||||
    bool update_stream_bufferattr_after_connect;
 | 
					    bool update_stream_bufferattr_after_connect;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    bool connected;
 | 
					    bool connected;
 | 
				
			||||||
 | 
					    bool shutting_down;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    char *cookie_file;
 | 
					    char *cookie_file;
 | 
				
			||||||
    char *remote_server;
 | 
					    char *remote_server;
 | 
				
			||||||
| 
						 | 
					@ -90,6 +106,8 @@ struct userdata {
 | 
				
			||||||
    pa_proplist *sink_proplist;
 | 
					    pa_proplist *sink_proplist;
 | 
				
			||||||
    pa_sample_spec sample_spec;
 | 
					    pa_sample_spec sample_spec;
 | 
				
			||||||
    pa_channel_map channel_map;
 | 
					    pa_channel_map channel_map;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    tunnel_msg *msg;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static const char* const valid_modargs[] = {
 | 
					static const char* const valid_modargs[] = {
 | 
				
			||||||
| 
						 | 
					@ -188,7 +206,7 @@ static void thread_func(void *userdata) {
 | 
				
			||||||
                goto fail;
 | 
					                goto fail;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
 | 
					        if (u->sink && PA_UNLIKELY(u->sink->thread_info.rewind_requested))
 | 
				
			||||||
            pa_sink_process_rewind(u->sink, 0);
 | 
					            pa_sink_process_rewind(u->sink, 0);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if (u->connected &&
 | 
					        if (u->connected &&
 | 
				
			||||||
| 
						 | 
					@ -305,17 +323,7 @@ static void stream_overflow_callback(pa_stream *stream, void *userdata) {
 | 
				
			||||||
    pa_log_info("Server signalled buffer overrun.");
 | 
					    pa_log_info("Server signalled buffer overrun.");
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static void context_state_cb(pa_context *c, void *userdata) {
 | 
					static void on_sink_created(struct userdata *u) {
 | 
				
			||||||
    struct userdata *u = userdata;
 | 
					 | 
				
			||||||
    pa_assert(u);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    switch (pa_context_get_state(c)) {
 | 
					 | 
				
			||||||
        case PA_CONTEXT_UNCONNECTED:
 | 
					 | 
				
			||||||
        case PA_CONTEXT_CONNECTING:
 | 
					 | 
				
			||||||
        case PA_CONTEXT_AUTHORIZING:
 | 
					 | 
				
			||||||
        case PA_CONTEXT_SETTING_NAME:
 | 
					 | 
				
			||||||
            break;
 | 
					 | 
				
			||||||
        case PA_CONTEXT_READY: {
 | 
					 | 
				
			||||||
    pa_proplist *proplist;
 | 
					    pa_proplist *proplist;
 | 
				
			||||||
    pa_buffer_attr bufferattr;
 | 
					    pa_buffer_attr bufferattr;
 | 
				
			||||||
    pa_usec_t requested_latency;
 | 
					    pa_usec_t requested_latency;
 | 
				
			||||||
| 
						 | 
					@ -326,8 +334,15 @@ static void context_state_cb(pa_context *c, void *userdata) {
 | 
				
			||||||
    pa_xfree(hostname);
 | 
					    pa_xfree(hostname);
 | 
				
			||||||
    pa_xfree(username);
 | 
					    pa_xfree(username);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            pa_log_debug("Connection successful. Creating stream.");
 | 
					    pa_assert_io_context();
 | 
				
			||||||
            pa_assert(!u->stream);
 | 
					
 | 
				
			||||||
 | 
					    /* if we still don't have a sink, then sink creation failed, and we should
 | 
				
			||||||
 | 
					     * kill this io thread */
 | 
				
			||||||
 | 
					    if (!u->sink) {
 | 
				
			||||||
 | 
					        pa_log_error("Could not create a sink.");
 | 
				
			||||||
 | 
					        u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
 | 
				
			||||||
 | 
					        return;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    proplist = tunnel_new_proplist(u);
 | 
					    proplist = tunnel_new_proplist(u);
 | 
				
			||||||
    u->stream = pa_stream_new_with_proplist(u->context,
 | 
					    u->stream = pa_stream_new_with_proplist(u->context,
 | 
				
			||||||
| 
						 | 
					@ -353,10 +368,10 @@ static void context_state_cb(pa_context *c, void *userdata) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pa_log_debug("tlength requested at %lu.", (unsigned long) bufferattr.tlength);
 | 
					    pa_log_debug("tlength requested at %lu.", (unsigned long) bufferattr.tlength);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            pa_stream_set_state_callback(u->stream, stream_state_cb, userdata);
 | 
					    pa_stream_set_state_callback(u->stream, stream_state_cb, u);
 | 
				
			||||||
            pa_stream_set_buffer_attr_callback(u->stream, stream_changed_buffer_attr_cb, userdata);
 | 
					    pa_stream_set_buffer_attr_callback(u->stream, stream_changed_buffer_attr_cb, u);
 | 
				
			||||||
            pa_stream_set_underflow_callback(u->stream, stream_underflow_callback, userdata);
 | 
					    pa_stream_set_underflow_callback(u->stream, stream_underflow_callback, u);
 | 
				
			||||||
            pa_stream_set_overflow_callback(u->stream, stream_overflow_callback, userdata);
 | 
					    pa_stream_set_overflow_callback(u->stream, stream_overflow_callback, u);
 | 
				
			||||||
    if (pa_stream_connect_playback(u->stream,
 | 
					    if (pa_stream_connect_playback(u->stream,
 | 
				
			||||||
                                   u->remote_sink_name,
 | 
					                                   u->remote_sink_name,
 | 
				
			||||||
                                   &bufferattr,
 | 
					                                   &bufferattr,
 | 
				
			||||||
| 
						 | 
					@ -367,8 +382,30 @@ static void context_state_cb(pa_context *c, void *userdata) {
 | 
				
			||||||
        u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
 | 
					        u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    u->connected = true;
 | 
					    u->connected = true;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static void context_state_cb(pa_context *c, void *userdata) {
 | 
				
			||||||
 | 
					    struct userdata *u = userdata;
 | 
				
			||||||
 | 
					    pa_assert(u);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    switch (pa_context_get_state(c)) {
 | 
				
			||||||
 | 
					        case PA_CONTEXT_UNCONNECTED:
 | 
				
			||||||
 | 
					        case PA_CONTEXT_CONNECTING:
 | 
				
			||||||
 | 
					        case PA_CONTEXT_AUTHORIZING:
 | 
				
			||||||
 | 
					        case PA_CONTEXT_SETTING_NAME:
 | 
				
			||||||
 | 
					            break;
 | 
				
			||||||
 | 
					        case PA_CONTEXT_READY:
 | 
				
			||||||
 | 
					            /* now that we're connected, ask the control thread to create a sink for
 | 
				
			||||||
 | 
					             * us, and wait for that to complete before proceeding, we'll
 | 
				
			||||||
 | 
					             * receive TUNNEL_MESSAGE_SINK_CREATED in response when the sink is
 | 
				
			||||||
 | 
					             * created (see sink_process_msg_cb()) */
 | 
				
			||||||
 | 
					            pa_log_debug("Connection successful. Creating stream.");
 | 
				
			||||||
 | 
					            pa_assert(!u->stream);
 | 
				
			||||||
 | 
					            pa_assert(!u->sink);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            pa_log_debug("Asking ctl thread to create sink.");
 | 
				
			||||||
 | 
					            pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_SINK_REQUEST, u, 0, NULL, NULL);
 | 
				
			||||||
            break;
 | 
					            break;
 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        case PA_CONTEXT_FAILED:
 | 
					        case PA_CONTEXT_FAILED:
 | 
				
			||||||
            pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u->context)));
 | 
					            pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u->context)));
 | 
				
			||||||
            u->connected = false;
 | 
					            u->connected = false;
 | 
				
			||||||
| 
						 | 
					@ -454,6 +491,9 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
 | 
				
			||||||
            *((int64_t*) data) = remote_latency;
 | 
					            *((int64_t*) data) = remote_latency;
 | 
				
			||||||
            return 0;
 | 
					            return 0;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					        case TUNNEL_MESSAGE_SINK_CREATED:
 | 
				
			||||||
 | 
					            on_sink_created(u);
 | 
				
			||||||
 | 
					            return 0;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    return pa_sink_process_msg(o, code, data, offset, chunk);
 | 
					    return pa_sink_process_msg(o, code, data, offset, chunk);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -492,7 +532,14 @@ static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state,
 | 
				
			||||||
    return 0;
 | 
					    return 0;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static int create_sink(struct userdata *u) {
 | 
					/* Creates a sink in the main thread.
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * This method is called when we receive a message from the io thread that a
 | 
				
			||||||
 | 
					 * connection has been established with the server.  We defer creation of the
 | 
				
			||||||
 | 
					 * sink until the connection is established, because we don't have a sink if
 | 
				
			||||||
 | 
					 * the remote server isn't there.
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					static void create_sink(struct userdata *u) {
 | 
				
			||||||
    pa_sink_new_data sink_data;
 | 
					    pa_sink_new_data sink_data;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pa_assert_ctl_context();
 | 
					    pa_assert_ctl_context();
 | 
				
			||||||
| 
						 | 
					@ -510,11 +557,9 @@ static int create_sink(struct userdata *u) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if (!(u->sink = pa_sink_new(u->module->core, &sink_data, PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY | PA_SINK_NETWORK))) {
 | 
					    if (!(u->sink = pa_sink_new(u->module->core, &sink_data, PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY | PA_SINK_NETWORK))) {
 | 
				
			||||||
        pa_log("Failed to create sink.");
 | 
					        pa_log("Failed to create sink.");
 | 
				
			||||||
        goto fail;
 | 
					        goto finish;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pa_sink_new_data_done(&sink_data);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    u->sink->userdata = u;
 | 
					    u->sink->userdata = u;
 | 
				
			||||||
    u->sink->parent.process_msg = sink_process_msg_cb;
 | 
					    u->sink->parent.process_msg = sink_process_msg_cb;
 | 
				
			||||||
    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
 | 
					    u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
 | 
				
			||||||
| 
						 | 
					@ -525,12 +570,35 @@ static int create_sink(struct userdata *u) {
 | 
				
			||||||
    pa_sink_set_asyncmsgq(u->sink, u->thread_mq->inq);
 | 
					    pa_sink_set_asyncmsgq(u->sink, u->thread_mq->inq);
 | 
				
			||||||
    pa_sink_set_rtpoll(u->sink, u->rtpoll);
 | 
					    pa_sink_set_rtpoll(u->sink, u->rtpoll);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    return 0;
 | 
					    pa_sink_put(u->sink);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
fail:
 | 
					finish:
 | 
				
			||||||
    pa_sink_new_data_done(&sink_data);
 | 
					    pa_sink_new_data_done(&sink_data);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    return -1;
 | 
					    /* tell any interested io threads that the sink they asked for has now been
 | 
				
			||||||
 | 
					     * created (even if we failed, we still notify the thread, so they can
 | 
				
			||||||
 | 
					     * either handle or kill the thread, rather than deadlock waiting for a
 | 
				
			||||||
 | 
					     * message that will never come */
 | 
				
			||||||
 | 
					    pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), TUNNEL_MESSAGE_SINK_CREATED, u, 0, NULL);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/* Runs in PA mainloop context */
 | 
				
			||||||
 | 
					static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
 | 
				
			||||||
 | 
					    struct userdata *u = (struct userdata *) data;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    pa_assert(u);
 | 
				
			||||||
 | 
					    pa_assert_ctl_context();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if (u->shutting_down)
 | 
				
			||||||
 | 
					        return 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    switch (code) {
 | 
				
			||||||
 | 
					        case TUNNEL_MESSAGE_CREATE_SINK_REQUEST:
 | 
				
			||||||
 | 
					            create_sink(u);
 | 
				
			||||||
 | 
					            break;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return 0;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
int pa__init(pa_module *m) {
 | 
					int pa__init(pa_module *m) {
 | 
				
			||||||
| 
						 | 
					@ -580,6 +648,9 @@ int pa__init(pa_module *m) {
 | 
				
			||||||
        goto fail;
 | 
					        goto fail;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    u->msg = pa_msgobject_new(tunnel_msg);
 | 
				
			||||||
 | 
					    u->msg->parent.process_msg = tunnel_process_msg;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    /* The rtpoll created here is never run. It is only necessary to avoid crashes
 | 
					    /* The rtpoll created here is never run. It is only necessary to avoid crashes
 | 
				
			||||||
     * when module-tunnel-sink-new is used together with module-loopback or
 | 
					     * when module-tunnel-sink-new is used together with module-loopback or
 | 
				
			||||||
     * module-combine-sink. Both modules base their asyncmsq on the rtpoll provided
 | 
					     * module-combine-sink. Both modules base their asyncmsq on the rtpoll provided
 | 
				
			||||||
| 
						 | 
					@ -605,18 +676,11 @@ int pa__init(pa_module *m) {
 | 
				
			||||||
        goto fail;
 | 
					        goto fail;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if (create_sink(u) < 0) {
 | 
					 | 
				
			||||||
        pa_log("Failed to create sink.");
 | 
					 | 
				
			||||||
        goto fail;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    if (!(u->thread = pa_thread_new("tunnel-sink", thread_func, u))) {
 | 
					    if (!(u->thread = pa_thread_new("tunnel-sink", thread_func, u))) {
 | 
				
			||||||
        pa_log("Failed to create thread.");
 | 
					        pa_log("Failed to create thread.");
 | 
				
			||||||
        goto fail;
 | 
					        goto fail;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pa_sink_put(u->sink);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    pa_modargs_free(ma);
 | 
					    pa_modargs_free(ma);
 | 
				
			||||||
    pa_xfree(default_sink_name);
 | 
					    pa_xfree(default_sink_name);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -642,6 +706,8 @@ void pa__done(pa_module *m) {
 | 
				
			||||||
    if (!(u = m->userdata))
 | 
					    if (!(u = m->userdata))
 | 
				
			||||||
        return;
 | 
					        return;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    u->shutting_down = true;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if (u->sink)
 | 
					    if (u->sink)
 | 
				
			||||||
        pa_sink_unlink(u->sink);
 | 
					        pa_sink_unlink(u->sink);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -679,5 +745,7 @@ void pa__done(pa_module *m) {
 | 
				
			||||||
    if (u->sink_name)
 | 
					    if (u->sink_name)
 | 
				
			||||||
        pa_xfree(u->sink_name);
 | 
					        pa_xfree(u->sink_name);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    pa_xfree(u->msg);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pa_xfree(u);
 | 
					    pa_xfree(u);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue