A lot of updates, all necessary to get the native protocol ported:

* add an int64_t argument to pa_asyncmsgq because it is very difficult to pass 64 values otherwise
* simplify subclassing in pa_object
* s/drop/unlink/ at some places
* port the native protocol to the lock-free core (not tested, compiles fine)
* move synchronisation of playback streams into pa_sink_input
* add "start_corked" field to pa_sink_input_new_data
* allow casting of NULL values in pa_object


git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1562 fefdeb5f-60dc-0310-8127-8f9354f1896f
This commit is contained in:
Lennart Poettering 2007-07-31 22:44:53 +00:00
parent a82505e72f
commit 0defdfb560
27 changed files with 1229 additions and 770 deletions

View file

@ -67,7 +67,7 @@ typedef struct connection {
PA_DECLARE_CLASS(connection);
#define CONNECTION(o) (connection_cast(o))
static PA_DEFINE_CHECK_TYPE(connection, connection_check_type, pa_msgobject_check_type);
static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject);
struct pa_protocol_simple {
pa_module *module;
@ -91,9 +91,9 @@ enum {
};
enum {
MESSAGE_REQUEST_DATA, /* data requested from sink input from the main loop */
MESSAGE_POST_DATA, /* data from source output to main loop */
MESSAGE_DROP_CONNECTION /* Please drop a aconnection now */
CONNECTION_MESSAGE_REQUEST_DATA, /* data requested from sink input from the main loop */
CONNECTION_MESSAGE_POST_DATA, /* data from source output to main loop */
CONNECTION_MESSAGE_DROP_CONNECTION /* Please drop a aconnection now */
};
@ -102,29 +102,12 @@ enum {
#define RECORD_BUFFER_SECONDS (5)
#define RECORD_BUFFER_FRAGMENTS (100)
static void connection_free(pa_object *o) {
connection *c = CONNECTION(o);
static void connection_unlink(connection *c) {
pa_assert(c);
if (c->playback.current_memblock)
pa_memblock_unref(c->playback.current_memblock);
if (c->io)
pa_iochannel_free(c->io);
if (c->input_memblockq)
pa_memblockq_free(c->input_memblockq);
if (c->output_memblockq)
pa_memblockq_free(c->output_memblockq);
pa_xfree(c);
}
static void connection_drop(connection *c) {
pa_assert(c);
if (!pa_idxset_remove_by_data(c->protocol->connections, c, NULL))
if (!c->protocol)
return;
if (c->sink_input) {
pa_sink_input_disconnect(c->sink_input);
pa_sink_input_unref(c->sink_input);
@ -142,9 +125,30 @@ static void connection_drop(connection *c) {
c->client = NULL;
}
pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c);
c->protocol = NULL;
connection_unref(c);
}
static void connection_free(pa_object *o) {
connection *c = CONNECTION(o);
pa_assert(c);
connection_unref(c);
if (c->playback.current_memblock)
pa_memblock_unref(c->playback.current_memblock);
if (c->io)
pa_iochannel_free(c->io);
if (c->input_memblockq)
pa_memblockq_free(c->input_memblockq);
if (c->output_memblockq)
pa_memblockq_free(c->output_memblockq);
pa_xfree(c);
}
static int do_read(connection *c) {
pa_memchunk chunk;
ssize_t r;
@ -190,7 +194,7 @@ static int do_read(connection *c) {
c->playback.memblock_index += r;
pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, &chunk, NULL);
pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, &chunk, NULL);
pa_atomic_sub(&c->playback.missing, r);
return 0;
@ -263,28 +267,28 @@ fail:
pa_iochannel_free(c->io);
c->io = NULL;
pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, NULL, NULL);
pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, 0, NULL, NULL);
} else
connection_drop(c);
connection_unlink(c);
}
static int connection_process_msg(pa_msgobject *o, int code, void*userdata, pa_memchunk *chunk) {
static int connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) {
connection *c = CONNECTION(o);
connection_assert_ref(c);
switch (code) {
case MESSAGE_REQUEST_DATA:
case CONNECTION_MESSAGE_REQUEST_DATA:
do_work(c);
break;
case MESSAGE_POST_DATA:
case CONNECTION_MESSAGE_POST_DATA:
/* pa_log("got data %u", chunk->length); */
pa_memblockq_push_align(c->output_memblockq, chunk);
do_work(c);
break;
case MESSAGE_DROP_CONNECTION:
connection_drop(c);
case CONNECTION_MESSAGE_DROP_CONNECTION:
connection_unlink(c);
break;
}
@ -294,13 +298,13 @@ static int connection_process_msg(pa_msgobject *o, int code, void*userdata, pa_m
/*** sink_input callbacks ***/
/* Called from thread context */
static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) {
static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
pa_sink_input *i = PA_SINK_INPUT(o);
connection*c;
pa_assert(i);
c = i->userdata;
pa_assert(c);
pa_sink_input_assert_ref(i);
c = CONNECTION(i->userdata);
connection_assert_ref(c);
switch (code) {
@ -330,7 +334,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_
}
default:
return pa_sink_input_process_msg(o, code, userdata, chunk);
return pa_sink_input_process_msg(o, code, userdata, offset, chunk);
}
}
@ -349,7 +353,7 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
/* pa_log("peeked %u %i", r >= 0 ? chunk->length: 0, r); */
if (c->dead && r < 0)
pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_DROP_CONNECTION, NULL, NULL, NULL);
pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_DROP_CONNECTION, NULL, 0, NULL, NULL);
return r;
}
@ -369,19 +373,20 @@ static void sink_input_drop_cb(pa_sink_input *i, size_t length) {
if (new > old) {
if (pa_atomic_add(&c->playback.missing, new - old) <= 0)
pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_REQUEST_DATA, NULL, NULL, NULL);
pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
}
}
/* Called from main context */
static void sink_input_kill_cb(pa_sink_input *i) {
pa_assert(i);
pa_sink_input_assert_ref(i);
connection_drop(CONNECTION(i->userdata));
connection_unlink(CONNECTION(i->userdata));
}
/*** source_output callbacks ***/
/* Called from thread context */
static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {
connection *c;
@ -390,24 +395,22 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
pa_assert(c);
pa_assert(chunk);
pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_POST_DATA, NULL, chunk, NULL);
pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);
}
/* Called from main context */
static void source_output_kill_cb(pa_source_output *o) {
connection*c;
pa_source_output_assert_ref(o);
pa_assert(o);
c = o->userdata;
pa_assert(c);
connection_drop(c);
connection_unlink(CONNECTION(o->userdata));
}
/* Called from main context */
static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {
connection*c;
pa_assert(o);
c = o->userdata;
c = CONNECTION(o->userdata);
pa_assert(c);
return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec);
@ -419,16 +422,16 @@ static void client_kill_cb(pa_client *client) {
connection*c;
pa_assert(client);
c = client->userdata;
c = CONNECTION(client->userdata);
pa_assert(c);
connection_drop(c);
connection_unlink(c);
}
/*** pa_iochannel callbacks ***/
static void io_callback(pa_iochannel*io, void *userdata) {
connection *c = userdata;
connection *c = CONNECTION(userdata);
pa_assert(io);
pa_assert(c);
@ -453,7 +456,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
return;
}
c = pa_msgobject_new(connection, connection_check_type);
c = pa_msgobject_new(connection);
c->parent.parent.free = connection_free;
c->parent.process_msg = connection_process_msg;
c->io = io;
@ -547,7 +550,6 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
pa_source_output_put(c->source_output);
}
pa_iochannel_set_callback(c->io, io_callback, c);
pa_idxset_put(p->connections, c, NULL);
@ -555,7 +557,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
fail:
if (c)
connection_drop(c);
connection_unlink(c);
}
pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *server, pa_module *m, pa_modargs *ma) {
@ -618,7 +620,7 @@ void pa_protocol_simple_free(pa_protocol_simple *p) {
if (p->connections) {
while((c = pa_idxset_first(p->connections, NULL)))
connection_drop(c);
connection_unlink(c);
pa_idxset_free(p->connections, NULL, NULL);
}