mirror of
				https://gitlab.freedesktop.org/pipewire/pipewire.git
				synced 2025-11-03 09:01:54 -05:00 
			
		
		
		
	pulse-server: pass data with a ringbuffer
Implement the process function from the RT thread and use a ringbuffer to pass samples around.
This commit is contained in:
		
							parent
							
								
									138e61138f
								
							
						
					
					
						commit
						177e897a88
					
				
					 1 changed files with 204 additions and 104 deletions
				
			
		| 
						 | 
				
			
			@ -47,6 +47,7 @@
 | 
			
		|||
#include <spa/pod/pod.h>
 | 
			
		||||
#include <spa/param/audio/format-utils.h>
 | 
			
		||||
#include <spa/param/props.h>
 | 
			
		||||
#include <spa/utils/ringbuffer.h>
 | 
			
		||||
 | 
			
		||||
#include "pipewire/pipewire.h"
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -114,7 +115,9 @@ struct stream {
 | 
			
		|||
	struct pw_stream *stream;
 | 
			
		||||
	struct spa_hook stream_listener;
 | 
			
		||||
 | 
			
		||||
	struct spa_list messages;
 | 
			
		||||
	struct spa_ringbuffer ring;
 | 
			
		||||
	void *buffer;
 | 
			
		||||
 | 
			
		||||
	int64_t read_index;
 | 
			
		||||
	int64_t write_index;
 | 
			
		||||
	uint64_t underrun_for;
 | 
			
		||||
| 
						 | 
				
			
			@ -295,6 +298,44 @@ static int reply_error(struct client *client, uint32_t tag, uint32_t error)
 | 
			
		|||
	return send_message(client, reply);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static int send_underflow(struct stream *stream, int64_t offset)
 | 
			
		||||
{
 | 
			
		||||
	struct client *client = stream->client;
 | 
			
		||||
	struct message *reply;
 | 
			
		||||
 | 
			
		||||
	pw_log_warn(NAME" %p: UNDERFLOW channel:%u offset:%"PRIi64,
 | 
			
		||||
			client, stream->channel, offset);
 | 
			
		||||
 | 
			
		||||
	reply = message_alloc(client, -1, 0);
 | 
			
		||||
	message_put(reply,
 | 
			
		||||
		TAG_U32, COMMAND_UNDERFLOW,
 | 
			
		||||
		TAG_U32, -1,
 | 
			
		||||
		TAG_U32, stream->channel,
 | 
			
		||||
		TAG_INVALID);
 | 
			
		||||
	if (client->version >= 23) {
 | 
			
		||||
		message_put(reply,
 | 
			
		||||
			TAG_S64, offset,
 | 
			
		||||
			TAG_INVALID);
 | 
			
		||||
	}
 | 
			
		||||
	return send_message(client, reply);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static int send_overflow(struct stream *stream)
 | 
			
		||||
{
 | 
			
		||||
	struct client *client = stream->client;
 | 
			
		||||
	struct message *reply;
 | 
			
		||||
 | 
			
		||||
	pw_log_warn(NAME" %p: OVERFLOW channel:%u", client, stream->channel);
 | 
			
		||||
 | 
			
		||||
	reply = message_alloc(client, -1, 0);
 | 
			
		||||
	message_put(reply,
 | 
			
		||||
		TAG_U32, COMMAND_OVERFLOW,
 | 
			
		||||
		TAG_U32, -1,
 | 
			
		||||
		TAG_U32, stream->channel,
 | 
			
		||||
		TAG_INVALID);
 | 
			
		||||
	return send_message(client, reply);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static int do_command_auth(struct client *client, uint32_t command, uint32_t tag, struct message *m)
 | 
			
		||||
{
 | 
			
		||||
	struct impl *impl = client->impl;
 | 
			
		||||
| 
						 | 
				
			
			@ -385,9 +426,7 @@ static int do_subscribe(struct client *client, uint32_t command, uint32_t tag, s
 | 
			
		|||
 | 
			
		||||
static void stream_flush(struct stream *stream)
 | 
			
		||||
{
 | 
			
		||||
	struct message *msg;
 | 
			
		||||
	spa_list_consume(msg, &stream->messages, link)
 | 
			
		||||
		message_free(stream->client, msg, false);
 | 
			
		||||
	spa_ringbuffer_init(&stream->ring);
 | 
			
		||||
	stream->write_index = stream->read_index = 0;
 | 
			
		||||
	stream->playing_for = 0;
 | 
			
		||||
	stream->underrun_for = 0;
 | 
			
		||||
| 
						 | 
				
			
			@ -403,6 +442,8 @@ static void stream_free(struct stream *stream)
 | 
			
		|||
		spa_hook_remove(&stream->stream_listener);
 | 
			
		||||
		pw_stream_destroy(stream->stream);
 | 
			
		||||
	}
 | 
			
		||||
	if (stream->buffer)
 | 
			
		||||
		free(stream->buffer);
 | 
			
		||||
	free(stream);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -450,12 +491,45 @@ static inline uint32_t writable_size(const struct stream *s, uint64_t elapsed)
 | 
			
		|||
	return wanted;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void update_timing_info(struct stream *stream)
 | 
			
		||||
{
 | 
			
		||||
	struct pw_time pwt;
 | 
			
		||||
	int64_t delay, pos;
 | 
			
		||||
 | 
			
		||||
	pw_stream_get_time(stream->stream, &pwt);
 | 
			
		||||
 | 
			
		||||
	stream->timestamp.tv_sec = pwt.now / SPA_NSEC_PER_SEC;
 | 
			
		||||
	stream->timestamp.tv_usec = (pwt.now % SPA_NSEC_PER_SEC) / SPA_NSEC_PER_USEC;
 | 
			
		||||
 | 
			
		||||
	if (pwt.rate.denom > 0) {
 | 
			
		||||
		uint64_t ticks = pwt.ticks;
 | 
			
		||||
		if (!stream->have_time)
 | 
			
		||||
                        stream->ticks_base = ticks;
 | 
			
		||||
                if (ticks > stream->ticks_base)
 | 
			
		||||
                        pos = ((ticks - stream->ticks_base) * stream->ss.rate / pwt.rate.denom) * stream->frame_size;
 | 
			
		||||
                else
 | 
			
		||||
                        pos = 0;
 | 
			
		||||
                delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom;
 | 
			
		||||
                stream->have_time = true;
 | 
			
		||||
        } else {
 | 
			
		||||
                pos = delay = 0;
 | 
			
		||||
                stream->have_time = false;
 | 
			
		||||
        }
 | 
			
		||||
	if (stream->direction == PW_DIRECTION_OUTPUT)
 | 
			
		||||
		stream->read_index = pos;
 | 
			
		||||
	else
 | 
			
		||||
		stream->write_index = pos;
 | 
			
		||||
	stream->delay = delay;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static int send_command_request(struct stream *stream)
 | 
			
		||||
{
 | 
			
		||||
	struct client *client = stream->client;
 | 
			
		||||
	struct message *msg;
 | 
			
		||||
	uint32_t size;
 | 
			
		||||
 | 
			
		||||
	update_timing_info(stream);
 | 
			
		||||
 | 
			
		||||
	size = writable_size(stream, 0);
 | 
			
		||||
	if (size == 0)
 | 
			
		||||
		return 0;
 | 
			
		||||
| 
						 | 
				
			
			@ -473,6 +547,7 @@ static int send_command_request(struct stream *stream)
 | 
			
		|||
	return send_message(client, msg);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
static int reply_create_playback_stream(struct stream *stream)
 | 
			
		||||
{
 | 
			
		||||
	struct client *client = stream->client;
 | 
			
		||||
| 
						 | 
				
			
			@ -670,47 +745,52 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *
 | 
			
		|||
	pw_stream_update_params(stream->stream, params, n_params);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void update_timing_info(struct stream *stream)
 | 
			
		||||
static int
 | 
			
		||||
do_process_done(struct spa_loop *loop,
 | 
			
		||||
                 bool async, uint32_t seq, const void *data, size_t size, void *user_data)
 | 
			
		||||
{
 | 
			
		||||
	struct pw_time pwt;
 | 
			
		||||
	int64_t delay, pos;
 | 
			
		||||
	struct stream *stream = user_data;
 | 
			
		||||
	struct client *client = stream->client;
 | 
			
		||||
 | 
			
		||||
	pw_stream_get_time(stream->stream, &pwt);
 | 
			
		||||
	if (stream->direction == PW_DIRECTION_OUTPUT) {
 | 
			
		||||
		send_command_request(stream);
 | 
			
		||||
	} else {
 | 
			
		||||
		uint32_t index;
 | 
			
		||||
		int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &index);
 | 
			
		||||
		if (avail < 0) {
 | 
			
		||||
			/* underrun */
 | 
			
		||||
			send_underflow(stream, index);
 | 
			
		||||
		} else if (avail > MAXLENGTH) {
 | 
			
		||||
			/* overrun */
 | 
			
		||||
			send_overflow(stream);
 | 
			
		||||
		} else {
 | 
			
		||||
			struct message *msg;
 | 
			
		||||
 | 
			
		||||
	stream->timestamp.tv_sec = pwt.now / SPA_NSEC_PER_SEC;
 | 
			
		||||
	stream->timestamp.tv_usec = (pwt.now % SPA_NSEC_PER_SEC) / SPA_NSEC_PER_USEC;
 | 
			
		||||
 | 
			
		||||
	if (pwt.rate.denom > 0) {
 | 
			
		||||
		uint64_t ticks = pwt.ticks;
 | 
			
		||||
		if (!stream->have_time)
 | 
			
		||||
                        stream->ticks_base = ticks;
 | 
			
		||||
                if (ticks > stream->ticks_base)
 | 
			
		||||
                        pos = ((ticks - stream->ticks_base) * stream->ss.rate / pwt.rate.denom) * stream->frame_size;
 | 
			
		||||
                else
 | 
			
		||||
                        pos = 0;
 | 
			
		||||
                delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom;
 | 
			
		||||
                stream->have_time = true;
 | 
			
		||||
        } else {
 | 
			
		||||
                pos = delay = 0;
 | 
			
		||||
                stream->have_time = false;
 | 
			
		||||
        }
 | 
			
		||||
	if (stream->direction == PW_DIRECTION_OUTPUT)
 | 
			
		||||
		stream->read_index = pos;
 | 
			
		||||
	else
 | 
			
		||||
		stream->write_index = pos;
 | 
			
		||||
	stream->delay = delay;
 | 
			
		||||
			msg = message_alloc(client, stream->channel, avail);
 | 
			
		||||
			if (msg != NULL) {
 | 
			
		||||
				spa_ringbuffer_read_data(&stream->ring,
 | 
			
		||||
						stream->buffer, MAXLENGTH,
 | 
			
		||||
						index % MAXLENGTH,
 | 
			
		||||
						msg->data, avail);
 | 
			
		||||
				spa_ringbuffer_read_update(&stream->ring,
 | 
			
		||||
						index + avail);
 | 
			
		||||
				send_message(client, msg);
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void stream_process_record(struct stream *stream)
 | 
			
		||||
static void stream_process(void *data)
 | 
			
		||||
{
 | 
			
		||||
	struct client *client = stream->client;
 | 
			
		||||
	struct stream *stream = data;
 | 
			
		||||
	struct impl *impl = stream->impl;
 | 
			
		||||
	void *p;
 | 
			
		||||
	struct pw_buffer *buffer;
 | 
			
		||||
	struct spa_buffer *buf;
 | 
			
		||||
	uint32_t size;
 | 
			
		||||
	struct message *msg;
 | 
			
		||||
	void *p;
 | 
			
		||||
	uint32_t index, size;
 | 
			
		||||
 | 
			
		||||
	update_timing_info(stream);
 | 
			
		||||
	pw_log_trace(NAME" %p: process", stream);
 | 
			
		||||
 | 
			
		||||
	buffer = pw_stream_dequeue_buffer(stream->stream);
 | 
			
		||||
	if (buffer == NULL)
 | 
			
		||||
| 
						 | 
				
			
			@ -720,72 +800,59 @@ static void stream_process_record(struct stream *stream)
 | 
			
		|||
        if ((p = buf->datas[0].data) == NULL)
 | 
			
		||||
		return;
 | 
			
		||||
 | 
			
		||||
	size = buf->datas[0].chunk->size;
 | 
			
		||||
	if (stream->direction == PW_DIRECTION_OUTPUT) {
 | 
			
		||||
		int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &index);
 | 
			
		||||
		if (avail < 0) {
 | 
			
		||||
			/* underrun */
 | 
			
		||||
			pw_log_warn(NAME" %p: underrun", stream);
 | 
			
		||||
			size = buf->datas[0].maxsize;
 | 
			
		||||
			memset(p, 0, size);
 | 
			
		||||
		} else if (avail > MAXLENGTH) {
 | 
			
		||||
			/* overrun */
 | 
			
		||||
			pw_log_warn(NAME" %p: overrun", stream);
 | 
			
		||||
			size = buf->datas[0].maxsize;
 | 
			
		||||
			memset(p, 0, size);
 | 
			
		||||
		} else {
 | 
			
		||||
			size = SPA_MIN(buf->datas[0].maxsize, (uint32_t)avail);
 | 
			
		||||
 | 
			
		||||
	msg = message_alloc(client, stream->channel, size);
 | 
			
		||||
	if (msg != NULL) {
 | 
			
		||||
		memcpy(msg->data,
 | 
			
		||||
			SPA_MEMBER(p, buf->datas[0].chunk->offset, void),
 | 
			
		||||
			size);
 | 
			
		||||
		send_message(client, msg);
 | 
			
		||||
	}
 | 
			
		||||
	stream->write_index += size;
 | 
			
		||||
	pw_stream_queue_buffer(stream->stream, buffer);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void stream_process_playback(struct stream *stream)
 | 
			
		||||
{
 | 
			
		||||
	struct client *client = stream->client;
 | 
			
		||||
 | 
			
		||||
	pw_log_trace(NAME" %p: process", stream);
 | 
			
		||||
 | 
			
		||||
	update_timing_info(stream);
 | 
			
		||||
 | 
			
		||||
	while (!spa_list_is_empty(&stream->messages)) {
 | 
			
		||||
		struct pw_buffer *buffer;
 | 
			
		||||
		struct spa_buffer *buf;
 | 
			
		||||
		struct message *msg;
 | 
			
		||||
		uint32_t size, maxsize;
 | 
			
		||||
		void *p;
 | 
			
		||||
 | 
			
		||||
		buffer = pw_stream_dequeue_buffer(stream->stream);
 | 
			
		||||
		if (buffer == NULL)
 | 
			
		||||
			break;
 | 
			
		||||
 | 
			
		||||
	        buf = buffer->buffer;
 | 
			
		||||
	        if ((p = buf->datas[0].data) == NULL)
 | 
			
		||||
			break;
 | 
			
		||||
 | 
			
		||||
		msg = spa_list_first(&stream->messages, struct message, link);
 | 
			
		||||
		maxsize = buf->datas[0].maxsize;
 | 
			
		||||
		size = SPA_MIN(msg->length - msg->offset, maxsize);
 | 
			
		||||
		memcpy(p, SPA_MEMBER(msg->data, msg->offset, void), size);
 | 
			
		||||
 | 
			
		||||
		pw_log_trace(NAME" %p: process message %p %d-%d/%d",
 | 
			
		||||
				stream, msg, msg->offset, size, msg->length);
 | 
			
		||||
 | 
			
		||||
		stream->read_index += size;
 | 
			
		||||
		stream->playing_for += size;
 | 
			
		||||
		msg->offset += size;
 | 
			
		||||
		if (msg->offset >= msg->length)
 | 
			
		||||
			message_free(client, msg, false);
 | 
			
		||||
			spa_ringbuffer_read_data(&stream->ring,
 | 
			
		||||
					stream->buffer, MAXLENGTH,
 | 
			
		||||
					index % MAXLENGTH,
 | 
			
		||||
					p, size);
 | 
			
		||||
			spa_ringbuffer_read_update(&stream->ring,
 | 
			
		||||
					index + size);
 | 
			
		||||
 | 
			
		||||
			stream->playing_for += size;
 | 
			
		||||
		}
 | 
			
		||||
	        buf->datas[0].chunk->offset = 0;
 | 
			
		||||
	        buf->datas[0].chunk->stride = stream->frame_size;
 | 
			
		||||
	        buf->datas[0].chunk->size = size;
 | 
			
		||||
	} else  {
 | 
			
		||||
		int32_t filled = spa_ringbuffer_get_write_index(&stream->ring, &index);
 | 
			
		||||
		if (filled < 0) {
 | 
			
		||||
			/* underrun */
 | 
			
		||||
			pw_log_warn(NAME" %p: underrun", stream);
 | 
			
		||||
		} else if (filled > MAXLENGTH) {
 | 
			
		||||
			/* overrun */
 | 
			
		||||
			pw_log_warn(NAME" %p: overrun", stream);
 | 
			
		||||
		} else {
 | 
			
		||||
			uint32_t avail = MAXLENGTH - filled;
 | 
			
		||||
			size = SPA_MIN(buf->datas[0].chunk->size, avail);
 | 
			
		||||
 | 
			
		||||
		pw_stream_queue_buffer(stream->stream, buffer);
 | 
			
		||||
			spa_ringbuffer_write_data(&stream->ring,
 | 
			
		||||
					stream->buffer, MAXLENGTH,
 | 
			
		||||
					index % MAXLENGTH,
 | 
			
		||||
					SPA_MEMBER(p, buf->datas[0].chunk->offset, void),
 | 
			
		||||
					size);
 | 
			
		||||
			spa_ringbuffer_write_update(&stream->ring,
 | 
			
		||||
					index + size);
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	send_command_request(stream);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void stream_process(void *data)
 | 
			
		||||
{
 | 
			
		||||
	struct stream *stream = data;
 | 
			
		||||
	if (stream->direction == PW_DIRECTION_OUTPUT)
 | 
			
		||||
		stream_process_playback(stream);
 | 
			
		||||
	else
 | 
			
		||||
		stream_process_record(stream);
 | 
			
		||||
	pw_stream_queue_buffer(stream->stream, buffer);
 | 
			
		||||
 | 
			
		||||
	pw_loop_invoke(impl->loop,
 | 
			
		||||
			do_process_done, 1, NULL, 0, false, stream);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void stream_drained(void *data)
 | 
			
		||||
| 
						 | 
				
			
			@ -889,7 +956,7 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui
 | 
			
		|||
	struct pw_properties *props = NULL;
 | 
			
		||||
	uint8_t n_formats = 0;
 | 
			
		||||
	struct format_info *formats = NULL;
 | 
			
		||||
	struct stream *stream;
 | 
			
		||||
	struct stream *stream = NULL;
 | 
			
		||||
        struct spa_audio_info_raw info;
 | 
			
		||||
	uint32_t n_params;
 | 
			
		||||
	const struct spa_pod *params[1];
 | 
			
		||||
| 
						 | 
				
			
			@ -1005,7 +1072,13 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui
 | 
			
		|||
	stream->corked = corked;
 | 
			
		||||
	stream->adjust_latency = adjust_latency;
 | 
			
		||||
	stream->channel = pw_map_insert_new(&client->streams, stream);
 | 
			
		||||
	spa_list_init(&stream->messages);
 | 
			
		||||
 | 
			
		||||
	stream->buffer = calloc(1, MAXLENGTH);
 | 
			
		||||
	if (stream->buffer == NULL) {
 | 
			
		||||
		res = -errno;
 | 
			
		||||
		goto error;
 | 
			
		||||
	}
 | 
			
		||||
	spa_ringbuffer_init(&stream->ring);
 | 
			
		||||
 | 
			
		||||
	stream->direction = PW_DIRECTION_OUTPUT;
 | 
			
		||||
	stream->create_tag = tag;
 | 
			
		||||
| 
						 | 
				
			
			@ -1045,6 +1118,7 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui
 | 
			
		|||
			PW_DIRECTION_OUTPUT,
 | 
			
		||||
			SPA_ID_INVALID,
 | 
			
		||||
			PW_STREAM_FLAG_AUTOCONNECT |
 | 
			
		||||
			PW_STREAM_FLAG_RT_PROCESS |
 | 
			
		||||
			PW_STREAM_FLAG_MAP_BUFFERS,
 | 
			
		||||
			params, n_params);
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1113,7 +1187,7 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint
 | 
			
		|||
	struct pw_properties *props = NULL;
 | 
			
		||||
	uint8_t n_formats = 0;
 | 
			
		||||
	struct format_info *formats = NULL;
 | 
			
		||||
	struct stream *stream;
 | 
			
		||||
	struct stream *stream = NULL;
 | 
			
		||||
        struct spa_audio_info_raw info;
 | 
			
		||||
	uint32_t n_params;
 | 
			
		||||
	const struct spa_pod *params[1];
 | 
			
		||||
| 
						 | 
				
			
			@ -1222,7 +1296,14 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint
 | 
			
		|||
	stream->corked = corked;
 | 
			
		||||
	stream->adjust_latency = adjust_latency;
 | 
			
		||||
	stream->channel = pw_map_insert_new(&client->streams, stream);
 | 
			
		||||
	spa_list_init(&stream->messages);
 | 
			
		||||
 | 
			
		||||
	stream->buffer = calloc(1, MAXLENGTH);
 | 
			
		||||
	if (stream->buffer == NULL) {
 | 
			
		||||
		res = -errno;
 | 
			
		||||
		goto error;
 | 
			
		||||
	}
 | 
			
		||||
	spa_ringbuffer_init(&stream->ring);
 | 
			
		||||
 | 
			
		||||
	stream->create_tag = tag;
 | 
			
		||||
	stream->ss = ss;
 | 
			
		||||
	stream->map = map;
 | 
			
		||||
| 
						 | 
				
			
			@ -1257,6 +1338,7 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint
 | 
			
		|||
			PW_DIRECTION_INPUT,
 | 
			
		||||
			SPA_ID_INVALID,
 | 
			
		||||
			PW_STREAM_FLAG_AUTOCONNECT |
 | 
			
		||||
			PW_STREAM_FLAG_RT_PROCESS |
 | 
			
		||||
			PW_STREAM_FLAG_MAP_BUFFERS,
 | 
			
		||||
			params, n_params);
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1312,6 +1394,7 @@ static int do_get_playback_latency(struct client *client, uint32_t command, uint
 | 
			
		|||
	if (stream == NULL)
 | 
			
		||||
		return -EINVAL;
 | 
			
		||||
 | 
			
		||||
	update_timing_info(stream);
 | 
			
		||||
 | 
			
		||||
	pw_log_debug("read:%"PRIi64" write:%"PRIi64" queued:%"PRIi64" delay:%"PRIi64,
 | 
			
		||||
			stream->read_index, stream->write_index,
 | 
			
		||||
| 
						 | 
				
			
			@ -1343,7 +1426,7 @@ static int do_get_record_latency(struct client *client, uint32_t command, uint32
 | 
			
		|||
	struct impl *impl = client->impl;
 | 
			
		||||
	struct message *reply;
 | 
			
		||||
	uint32_t channel;
 | 
			
		||||
	struct timeval tv, now;
 | 
			
		||||
	struct timeval tv;
 | 
			
		||||
	struct stream *stream;
 | 
			
		||||
	int res;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1358,7 +1441,7 @@ static int do_get_record_latency(struct client *client, uint32_t command, uint32
 | 
			
		|||
	if (stream == NULL)
 | 
			
		||||
		return -EINVAL;
 | 
			
		||||
 | 
			
		||||
	gettimeofday(&now, NULL);
 | 
			
		||||
	update_timing_info(stream);
 | 
			
		||||
 | 
			
		||||
	reply = reply_new(client, tag);
 | 
			
		||||
	message_put(reply,
 | 
			
		||||
| 
						 | 
				
			
			@ -2284,8 +2367,9 @@ static int handle_memblock(struct client *client, struct message *msg)
 | 
			
		|||
{
 | 
			
		||||
	struct impl *impl = client->impl;
 | 
			
		||||
	struct stream *stream;
 | 
			
		||||
	uint32_t channel, flags;
 | 
			
		||||
	uint32_t channel, flags, index;
 | 
			
		||||
	int64_t offset;
 | 
			
		||||
	int32_t filled;
 | 
			
		||||
 | 
			
		||||
	channel = ntohl(client->desc.channel);
 | 
			
		||||
	offset = (int64_t) (
 | 
			
		||||
| 
						 | 
				
			
			@ -2301,9 +2385,25 @@ static int handle_memblock(struct client *client, struct message *msg)
 | 
			
		|||
	if (stream == NULL)
 | 
			
		||||
		return -EINVAL;
 | 
			
		||||
 | 
			
		||||
	pw_log_debug("new block %p %p", msg, msg->data);
 | 
			
		||||
	spa_list_append(&stream->messages, &msg->link);
 | 
			
		||||
	stream->write_index += msg->length;
 | 
			
		||||
	pw_log_debug("new block %p %p/%u", msg, msg->data, msg->length);
 | 
			
		||||
 | 
			
		||||
	filled = spa_ringbuffer_get_write_index(&stream->ring, &index);
 | 
			
		||||
	if (filled < 0) {
 | 
			
		||||
		/* underrun */
 | 
			
		||||
		send_underflow(stream, index);
 | 
			
		||||
	} else if (filled + msg->length > MAXLENGTH) {
 | 
			
		||||
		/* overrun */
 | 
			
		||||
		send_overflow(stream);
 | 
			
		||||
	} else {
 | 
			
		||||
		spa_ringbuffer_write_data(&stream->ring,
 | 
			
		||||
				stream->buffer, MAXLENGTH,
 | 
			
		||||
				index % MAXLENGTH,
 | 
			
		||||
				msg->data, msg->length);
 | 
			
		||||
		spa_ringbuffer_write_update(&stream->ring,
 | 
			
		||||
				index + msg->length);
 | 
			
		||||
		stream->write_index += msg->length;
 | 
			
		||||
	}
 | 
			
		||||
	message_free(client, msg, false);
 | 
			
		||||
 | 
			
		||||
	return 0;
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue