mirror of
				https://gitlab.freedesktop.org/pipewire/pipewire.git
				synced 2025-11-03 09:01:54 -05:00 
			
		
		
		
	pulse-server: Improve timing reporting
Use the same logic as pulseaudio to improve the timing reporting and improve compatibility with vlc.
This commit is contained in:
		
							parent
							
								
									18c57efb39
								
							
						
					
					
						commit
						8abb648ec3
					
				
					 1 changed files with 132 additions and 149 deletions
				
			
		| 
						 | 
					@ -148,7 +148,9 @@ struct stream {
 | 
				
			||||||
	uint64_t ticks_base;
 | 
						uint64_t ticks_base;
 | 
				
			||||||
	struct timeval timestamp;
 | 
						struct timeval timestamp;
 | 
				
			||||||
	int64_t delay;
 | 
						int64_t delay;
 | 
				
			||||||
	uint32_t pending;
 | 
					
 | 
				
			||||||
 | 
						uint32_t missing;
 | 
				
			||||||
 | 
						uint32_t requested;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	struct sample_spec ss;
 | 
						struct sample_spec ss;
 | 
				
			||||||
	struct channel_map map;
 | 
						struct channel_map map;
 | 
				
			||||||
| 
						 | 
					@ -165,6 +167,7 @@ struct stream {
 | 
				
			||||||
	unsigned int adjust_latency:1;
 | 
						unsigned int adjust_latency:1;
 | 
				
			||||||
	unsigned int have_time:1;
 | 
						unsigned int have_time:1;
 | 
				
			||||||
	unsigned int is_underrun:1;
 | 
						unsigned int is_underrun:1;
 | 
				
			||||||
 | 
						unsigned int in_prebuf:1;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
struct server {
 | 
					struct server {
 | 
				
			||||||
| 
						 | 
					@ -723,8 +726,16 @@ static int do_subscribe(struct client *client, uint32_t command, uint32_t tag, s
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static void stream_flush(struct stream *stream)
 | 
					static void stream_flush(struct stream *stream)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	spa_ringbuffer_init(&stream->ring);
 | 
						uint32_t old;
 | 
				
			||||||
	stream->write_index = stream->read_index = 0;
 | 
					
 | 
				
			||||||
 | 
						old = stream->ring.writeindex;
 | 
				
			||||||
 | 
						stream->ring.writeindex = stream->ring.readindex;
 | 
				
			||||||
 | 
						stream->missing += (old - stream->ring.writeindex);
 | 
				
			||||||
 | 
						stream->write_index = stream->read_index = stream->ring.writeindex;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if (stream->attr.prebuf > 0)
 | 
				
			||||||
 | 
							stream->in_prebuf = true;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	stream->playing_for = 0;
 | 
						stream->playing_for = 0;
 | 
				
			||||||
	stream->underrun_for = 0;
 | 
						stream->underrun_for = 0;
 | 
				
			||||||
	stream->have_time = false;
 | 
						stream->have_time = false;
 | 
				
			||||||
| 
						 | 
					@ -746,80 +757,33 @@ static void stream_free(struct stream *stream)
 | 
				
			||||||
	free(stream);
 | 
						free(stream);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static inline uint32_t queued_size(const struct stream *s, uint64_t elapsed)
 | 
					static bool stream_prebuf_active(struct stream *stream)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	uint64_t queued;
 | 
						uint32_t index;
 | 
				
			||||||
	queued = s->write_index - SPA_MIN(s->read_index, s->write_index);
 | 
						int32_t avail;
 | 
				
			||||||
	queued -= SPA_MIN(queued, elapsed);
 | 
					 | 
				
			||||||
	return queued;
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
static inline uint32_t target_queue(const struct stream *s)
 | 
						avail = spa_ringbuffer_get_write_index(&stream->ring, &index);
 | 
				
			||||||
{
 | 
						if (stream->in_prebuf)
 | 
				
			||||||
	return s->attr.tlength;
 | 
							return avail < (int32_t) stream->attr.prebuf;
 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
static inline uint32_t wanted_size(const struct stream *s, uint32_t queued, uint32_t target)
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
	return target - SPA_MIN(queued, target);
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
static inline uint32_t required_size(const struct stream *s)
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
	return s->attr.minreq;
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
static inline uint32_t writable_size(const struct stream *s, uint64_t elapsed)
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
	uint32_t queued, target, wanted, required;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	queued = queued_size(s, elapsed);
 | 
					 | 
				
			||||||
	target = target_queue(s);
 | 
					 | 
				
			||||||
	target -= SPA_MIN(target, s->pending);
 | 
					 | 
				
			||||||
	wanted = wanted_size(s, queued, target);
 | 
					 | 
				
			||||||
	required = required_size(s);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	pw_log_trace("stream %p, queued:%u target:%u wanted:%u required:%u",
 | 
					 | 
				
			||||||
			s, queued, target, wanted, required);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if (s->adjust_latency)
 | 
					 | 
				
			||||||
		if (queued >= wanted)
 | 
					 | 
				
			||||||
			wanted = 0;
 | 
					 | 
				
			||||||
	if (wanted < required)
 | 
					 | 
				
			||||||
		wanted = 0;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return wanted;
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
static void update_timing_info(struct stream *stream)
 | 
					 | 
				
			||||||
{
 | 
					 | 
				
			||||||
	struct pw_time pwt;
 | 
					 | 
				
			||||||
	int64_t delay, pos;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	pw_stream_get_time(stream->stream, &pwt);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	stream->timestamp.tv_sec = pwt.now / SPA_NSEC_PER_SEC;
 | 
					 | 
				
			||||||
	stream->timestamp.tv_usec = (pwt.now % SPA_NSEC_PER_SEC) / SPA_NSEC_PER_USEC;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	if (pwt.rate.denom > 0) {
 | 
					 | 
				
			||||||
		uint64_t ticks = pwt.ticks;
 | 
					 | 
				
			||||||
		if (!stream->have_time)
 | 
					 | 
				
			||||||
                        stream->ticks_base = ticks;
 | 
					 | 
				
			||||||
                if (ticks > stream->ticks_base)
 | 
					 | 
				
			||||||
                        pos = ((ticks - stream->ticks_base) * stream->ss.rate / pwt.rate.denom) * stream->frame_size;
 | 
					 | 
				
			||||||
                else
 | 
					 | 
				
			||||||
                        pos = 0;
 | 
					 | 
				
			||||||
                delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom;
 | 
					 | 
				
			||||||
                stream->have_time = true;
 | 
					 | 
				
			||||||
        } else {
 | 
					 | 
				
			||||||
                pos = delay = 0;
 | 
					 | 
				
			||||||
                stream->have_time = false;
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
	if (stream->direction == PW_DIRECTION_OUTPUT)
 | 
					 | 
				
			||||||
		stream->read_index = pos;
 | 
					 | 
				
			||||||
	else
 | 
						else
 | 
				
			||||||
		stream->write_index = pos;
 | 
							return stream->attr.prebuf > 0 && avail >= 0;
 | 
				
			||||||
	stream->delay = delay;
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static uint32_t stream_pop_missing(struct stream *stream)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						uint32_t missing;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if (stream->missing <= 0)
 | 
				
			||||||
 | 
							return 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if (stream->missing < stream->attr.minreq &&
 | 
				
			||||||
 | 
						    !stream_prebuf_active(stream))
 | 
				
			||||||
 | 
							return 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						missing = stream->missing;
 | 
				
			||||||
 | 
						stream->requested += missing;
 | 
				
			||||||
 | 
						stream->missing = 0;
 | 
				
			||||||
 | 
						return missing;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static int send_command_request(struct stream *stream)
 | 
					static int send_command_request(struct stream *stream)
 | 
				
			||||||
| 
						 | 
					@ -828,9 +792,7 @@ static int send_command_request(struct stream *stream)
 | 
				
			||||||
	struct message *msg;
 | 
						struct message *msg;
 | 
				
			||||||
	uint32_t size;
 | 
						uint32_t size;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	update_timing_info(stream);
 | 
						size = stream_pop_missing(stream);
 | 
				
			||||||
 | 
					 | 
				
			||||||
	size = writable_size(stream, 0);
 | 
					 | 
				
			||||||
	pw_log_debug(NAME" %p: REQUEST channel:%d %u", stream, stream->channel, size);
 | 
						pw_log_debug(NAME" %p: REQUEST channel:%d %u", stream, stream->channel, size);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (size == 0)
 | 
						if (size == 0)
 | 
				
			||||||
| 
						 | 
					@ -844,8 +806,6 @@ static int send_command_request(struct stream *stream)
 | 
				
			||||||
		TAG_U32, size,
 | 
							TAG_U32, size,
 | 
				
			||||||
		TAG_INVALID);
 | 
							TAG_INVALID);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	stream->pending += size;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return send_message(client, msg);
 | 
						return send_message(client, msg);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -876,6 +836,8 @@ static void fix_playback_buffer_attr(struct stream *s, struct buffer_attr *attr)
 | 
				
			||||||
	attr->tlength -= attr->tlength % frame_size;
 | 
						attr->tlength -= attr->tlength % frame_size;
 | 
				
			||||||
	attr->tlength = SPA_MAX(attr->tlength, frame_size);
 | 
						attr->tlength = SPA_MAX(attr->tlength, frame_size);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						s->missing = attr->tlength;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (attr->minreq == (uint32_t) -1) {
 | 
						if (attr->minreq == (uint32_t) -1) {
 | 
				
			||||||
		uint32_t process = usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*1000, &s->ss);
 | 
							uint32_t process = usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*1000, &s->ss);
 | 
				
			||||||
		/* With low-latency, tlength/4 gives a decent default in all of traditional,
 | 
							/* With low-latency, tlength/4 gives a decent default in all of traditional,
 | 
				
			||||||
| 
						 | 
					@ -917,22 +879,31 @@ static int reply_create_playback_stream(struct stream *stream)
 | 
				
			||||||
	char latency[32];
 | 
						char latency[32];
 | 
				
			||||||
	struct pw_manager_object *peer;
 | 
						struct pw_manager_object *peer;
 | 
				
			||||||
	const char *peer_name;
 | 
						const char *peer_name;
 | 
				
			||||||
	bool peer_suspended;
 | 
						struct spa_fraction lat;
 | 
				
			||||||
 | 
						uint64_t lat_usec;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	fix_playback_buffer_attr(stream, &stream->attr);
 | 
						fix_playback_buffer_attr(stream, &stream->attr);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						stream->buffer = calloc(1, stream->attr.maxlength);
 | 
				
			||||||
 | 
						if (stream->buffer == NULL)
 | 
				
			||||||
 | 
							return -errno;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						spa_ringbuffer_init(&stream->ring);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pw_log_info(NAME" %p: reply CREATE_PLAYBACK_STREAM tag:%u", stream,
 | 
						pw_log_info(NAME" %p: reply CREATE_PLAYBACK_STREAM tag:%u", stream,
 | 
				
			||||||
			stream->create_tag);
 | 
								stream->create_tag);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	snprintf(latency, sizeof(latency)-1, "%u/%u",
 | 
						lat.num = stream->attr.minreq * 2 / stream->frame_size;
 | 
				
			||||||
			stream->attr.minreq * 2 / stream->frame_size,
 | 
						lat.denom = stream->ss.rate;
 | 
				
			||||||
			stream->ss.rate);
 | 
						lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						snprintf(latency, sizeof(latency)-1, "%u/%u", lat.num, lat.denom);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency);
 | 
						items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency);
 | 
				
			||||||
	pw_stream_update_properties(stream->stream,
 | 
						pw_stream_update_properties(stream->stream,
 | 
				
			||||||
			&SPA_DICT_INIT(items, 1));
 | 
								&SPA_DICT_INIT(items, 1));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	size = writable_size(stream, 0);
 | 
						size = stream_pop_missing(stream);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	reply = reply_new(client, stream->create_tag);
 | 
						reply = reply_new(client, stream->create_tag);
 | 
				
			||||||
	message_put(reply,
 | 
						message_put(reply,
 | 
				
			||||||
| 
						 | 
					@ -941,18 +912,13 @@ static int reply_create_playback_stream(struct stream *stream)
 | 
				
			||||||
		TAG_U32, size,				/* missing/requested bytes */
 | 
							TAG_U32, size,				/* missing/requested bytes */
 | 
				
			||||||
		TAG_INVALID);
 | 
							TAG_INVALID);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	stream->pending = size;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	peer = find_linked(manager, stream->id, stream->direction);
 | 
						peer = find_linked(manager, stream->id, stream->direction);
 | 
				
			||||||
	if (peer && is_sink(peer)) {
 | 
						if (peer && is_sink(peer)) {
 | 
				
			||||||
		struct pw_node_info *info = peer->info;
 | 
					 | 
				
			||||||
		peer_id = peer->id;
 | 
							peer_id = peer->id;
 | 
				
			||||||
		peer_name = pw_properties_get(peer->props, PW_KEY_NODE_NAME);
 | 
							peer_name = pw_properties_get(peer->props, PW_KEY_NODE_NAME);
 | 
				
			||||||
		peer_suspended = info->state == PW_NODE_STATE_SUSPENDED;
 | 
					 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		peer_id = SPA_ID_INVALID;
 | 
							peer_id = SPA_ID_INVALID;
 | 
				
			||||||
		peer_name = NULL;
 | 
							peer_name = NULL;
 | 
				
			||||||
		peer_suspended = false;
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (client->version >= 9) {
 | 
						if (client->version >= 9) {
 | 
				
			||||||
| 
						 | 
					@ -969,12 +935,12 @@ static int reply_create_playback_stream(struct stream *stream)
 | 
				
			||||||
			TAG_CHANNEL_MAP, &stream->map,
 | 
								TAG_CHANNEL_MAP, &stream->map,
 | 
				
			||||||
			TAG_U32, peer_id,		/* sink index */
 | 
								TAG_U32, peer_id,		/* sink index */
 | 
				
			||||||
			TAG_STRING, peer_name,		/* sink name */
 | 
								TAG_STRING, peer_name,		/* sink name */
 | 
				
			||||||
			TAG_BOOLEAN, peer_suspended,	/* sink suspended state */
 | 
								TAG_BOOLEAN, false,		/* sink suspended state */
 | 
				
			||||||
			TAG_INVALID);
 | 
								TAG_INVALID);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if (client->version >= 13) {
 | 
						if (client->version >= 13) {
 | 
				
			||||||
		message_put(reply,
 | 
							message_put(reply,
 | 
				
			||||||
			TAG_USEC, 0ULL,			/* sink configured latency */
 | 
								TAG_USEC, lat_usec,		/* sink configured latency */
 | 
				
			||||||
			TAG_INVALID);
 | 
								TAG_INVALID);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if (client->version >= 21) {
 | 
						if (client->version >= 21) {
 | 
				
			||||||
| 
						 | 
					@ -1027,13 +993,22 @@ static int reply_create_record_stream(struct stream *stream)
 | 
				
			||||||
	struct pw_manager_object *peer;
 | 
						struct pw_manager_object *peer;
 | 
				
			||||||
	const char *peer_name, *name;
 | 
						const char *peer_name, *name;
 | 
				
			||||||
	uint32_t peer_id;
 | 
						uint32_t peer_id;
 | 
				
			||||||
	bool peer_suspended;
 | 
						struct spa_fraction lat;
 | 
				
			||||||
 | 
						uint64_t lat_usec;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	fix_record_buffer_attr(stream, &stream->attr);
 | 
						fix_record_buffer_attr(stream, &stream->attr);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	snprintf(latency, sizeof(latency)-1, "%u/%u",
 | 
						stream->buffer = calloc(1, stream->attr.maxlength);
 | 
				
			||||||
			stream->attr.fragsize / stream->frame_size,
 | 
						if (stream->buffer == NULL)
 | 
				
			||||||
			stream->ss.rate);
 | 
							return -errno;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						spa_ringbuffer_init(&stream->ring);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						lat.num = stream->attr.fragsize / stream->frame_size,
 | 
				
			||||||
 | 
						lat.denom = stream->ss.rate;
 | 
				
			||||||
 | 
						lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						snprintf(latency, sizeof(latency)-1, "%u/%u", lat.num, lat.denom);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency);
 | 
						items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency);
 | 
				
			||||||
	pw_stream_update_properties(stream->stream,
 | 
						pw_stream_update_properties(stream->stream,
 | 
				
			||||||
| 
						 | 
					@ -1049,7 +1024,6 @@ static int reply_create_record_stream(struct stream *stream)
 | 
				
			||||||
	if (peer && is_sink_input(peer))
 | 
						if (peer && is_sink_input(peer))
 | 
				
			||||||
		peer = find_linked(manager, peer->id, PW_DIRECTION_OUTPUT);
 | 
							peer = find_linked(manager, peer->id, PW_DIRECTION_OUTPUT);
 | 
				
			||||||
	if (peer && is_source_or_monitor(peer)) {
 | 
						if (peer && is_source_or_monitor(peer)) {
 | 
				
			||||||
		struct pw_node_info *info = peer->info;
 | 
					 | 
				
			||||||
		name = pw_properties_get(peer->props, PW_KEY_NODE_NAME);
 | 
							name = pw_properties_get(peer->props, PW_KEY_NODE_NAME);
 | 
				
			||||||
		if (!is_source(peer)) {
 | 
							if (!is_source(peer)) {
 | 
				
			||||||
			size_t len = (name ? strlen(name) : 5) + 10;
 | 
								size_t len = (name ? strlen(name) : 5) + 10;
 | 
				
			||||||
| 
						 | 
					@ -1060,11 +1034,9 @@ static int reply_create_record_stream(struct stream *stream)
 | 
				
			||||||
			peer_id = peer->id;
 | 
								peer_id = peer->id;
 | 
				
			||||||
			peer_name = name;
 | 
								peer_name = name;
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		peer_suspended = info->state == PW_NODE_STATE_SUSPENDED;
 | 
					 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		peer_id = SPA_ID_INVALID;
 | 
							peer_id = SPA_ID_INVALID;
 | 
				
			||||||
		peer_name = NULL;
 | 
							peer_name = NULL;
 | 
				
			||||||
		peer_suspended = false;
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (client->version >= 9) {
 | 
						if (client->version >= 9) {
 | 
				
			||||||
| 
						 | 
					@ -1079,12 +1051,12 @@ static int reply_create_record_stream(struct stream *stream)
 | 
				
			||||||
			TAG_CHANNEL_MAP, &stream->map,
 | 
								TAG_CHANNEL_MAP, &stream->map,
 | 
				
			||||||
			TAG_U32, peer_id,		/* source index */
 | 
								TAG_U32, peer_id,		/* source index */
 | 
				
			||||||
			TAG_STRING, peer_name,		/* source name */
 | 
								TAG_STRING, peer_name,		/* source name */
 | 
				
			||||||
			TAG_BOOLEAN, peer_suspended,	/* source suspended state */
 | 
								TAG_BOOLEAN, false,		/* source suspended state */
 | 
				
			||||||
			TAG_INVALID);
 | 
								TAG_INVALID);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if (client->version >= 13) {
 | 
						if (client->version >= 13) {
 | 
				
			||||||
		message_put(reply,
 | 
							message_put(reply,
 | 
				
			||||||
			TAG_USEC, 0ULL,			/* source configured latency */
 | 
								TAG_USEC, lat_usec,		/* source configured latency */
 | 
				
			||||||
			TAG_INVALID);
 | 
								TAG_INVALID);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if (client->version >= 22) {
 | 
						if (client->version >= 22) {
 | 
				
			||||||
| 
						 | 
					@ -1208,10 +1180,11 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
struct process_data {
 | 
					struct process_data {
 | 
				
			||||||
	uint32_t underrun_for;
 | 
						struct pw_time pwt;
 | 
				
			||||||
	uint32_t playing_for;
 | 
					 | 
				
			||||||
	uint32_t read_index;
 | 
						uint32_t read_index;
 | 
				
			||||||
	uint32_t write_index;
 | 
						uint32_t write_index;
 | 
				
			||||||
 | 
						uint32_t underrun_for;
 | 
				
			||||||
 | 
						uint32_t playing_for;
 | 
				
			||||||
	unsigned int underrun:1;
 | 
						unsigned int underrun:1;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1222,8 +1195,20 @@ do_process_done(struct spa_loop *loop,
 | 
				
			||||||
	struct stream *stream = user_data;
 | 
						struct stream *stream = user_data;
 | 
				
			||||||
	struct client *client = stream->client;
 | 
						struct client *client = stream->client;
 | 
				
			||||||
	const struct process_data *pd = data;
 | 
						const struct process_data *pd = data;
 | 
				
			||||||
 | 
						uint32_t index;
 | 
				
			||||||
 | 
						int32_t avail;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						stream->timestamp.tv_sec = pd->pwt.now / SPA_NSEC_PER_SEC;
 | 
				
			||||||
 | 
						stream->timestamp.tv_usec = (pd->pwt.now % SPA_NSEC_PER_SEC) / SPA_NSEC_PER_USEC;
 | 
				
			||||||
 | 
						if (pd->pwt.rate.denom > 0) {
 | 
				
			||||||
 | 
							stream->delay = pd->pwt.delay * SPA_USEC_PER_SEC / pd->pwt.rate.denom;
 | 
				
			||||||
 | 
							stream->delay += pd->pwt.queued * SPA_USEC_PER_SEC / stream->ss.rate;
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							stream->delay = 0;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (stream->direction == PW_DIRECTION_OUTPUT) {
 | 
						if (stream->direction == PW_DIRECTION_OUTPUT) {
 | 
				
			||||||
 | 
							stream->read_index = pd->read_index;
 | 
				
			||||||
		if (stream->corked) {
 | 
							if (stream->corked) {
 | 
				
			||||||
			stream->underrun_for += pd->underrun_for;
 | 
								stream->underrun_for += pd->underrun_for;
 | 
				
			||||||
			stream->playing_for = 0;
 | 
								stream->playing_for = 0;
 | 
				
			||||||
| 
						 | 
					@ -1238,41 +1223,47 @@ do_process_done(struct spa_loop *loop,
 | 
				
			||||||
			else
 | 
								else
 | 
				
			||||||
				send_stream_started(stream);
 | 
									send_stream_started(stream);
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							stream->missing += pd->playing_for;
 | 
				
			||||||
		stream->playing_for += pd->playing_for;
 | 
							stream->playing_for += pd->playing_for;
 | 
				
			||||||
		stream->underrun_for += pd->underrun_for;
 | 
							stream->underrun_for += pd->underrun_for;
 | 
				
			||||||
		stream->pending -= SPA_MIN(stream->pending, pd->underrun_for);
 | 
					
 | 
				
			||||||
		send_command_request(stream);
 | 
							send_command_request(stream);
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		uint32_t index;
 | 
							stream->write_index = pd->write_index;
 | 
				
			||||||
		int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &index);
 | 
							avail = spa_ringbuffer_get_read_index(&stream->ring, &index);
 | 
				
			||||||
		if (avail <= 0) {
 | 
							if (avail <= 0) {
 | 
				
			||||||
			/* underrun */
 | 
								/* underrun */
 | 
				
			||||||
			if (!stream->is_underrun) {
 | 
								if (!stream->is_underrun) {
 | 
				
			||||||
				stream->is_underrun = true;
 | 
									stream->is_underrun = true;
 | 
				
			||||||
				send_underflow(stream, index);
 | 
									send_underflow(stream, index);
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		} else if (avail > MAXLENGTH) {
 | 
							} else if (avail > (int32_t)stream->attr.maxlength) {
 | 
				
			||||||
			/* overrun */
 | 
								/* overrun */
 | 
				
			||||||
			send_overflow(stream);
 | 
								send_overflow(stream);
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			struct message *msg;
 | 
								struct message *msg;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			msg = message_alloc(client, stream->channel, avail);
 | 
								msg = message_alloc(client, stream->channel, avail);
 | 
				
			||||||
			if (msg != NULL) {
 | 
								if (msg == NULL)
 | 
				
			||||||
				spa_ringbuffer_read_data(&stream->ring,
 | 
									return -errno;
 | 
				
			||||||
						stream->buffer, MAXLENGTH,
 | 
					
 | 
				
			||||||
						index % MAXLENGTH,
 | 
								spa_ringbuffer_read_data(&stream->ring,
 | 
				
			||||||
						msg->data, avail);
 | 
										stream->buffer, stream->attr.maxlength,
 | 
				
			||||||
				spa_ringbuffer_read_update(&stream->ring,
 | 
										index % stream->attr.maxlength,
 | 
				
			||||||
						index + avail);
 | 
										msg->data, avail);
 | 
				
			||||||
				send_message(client, msg);
 | 
					
 | 
				
			||||||
			}
 | 
								stream->read_index = index + avail;
 | 
				
			||||||
 | 
								spa_ringbuffer_read_update(&stream->ring, stream->read_index);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			stream->is_underrun = false;
 | 
								stream->is_underrun = false;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								send_message(client, msg);
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return 0;
 | 
						return 0;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static void stream_process(void *data)
 | 
					static void stream_process(void *data)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	struct stream *stream = data;
 | 
						struct stream *stream = data;
 | 
				
			||||||
| 
						 | 
					@ -1294,6 +1285,7 @@ static void stream_process(void *data)
 | 
				
			||||||
		return;
 | 
							return;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	spa_zero(pd);
 | 
						spa_zero(pd);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (stream->direction == PW_DIRECTION_OUTPUT) {
 | 
						if (stream->direction == PW_DIRECTION_OUTPUT) {
 | 
				
			||||||
		int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &pd.read_index);
 | 
							int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &pd.read_index);
 | 
				
			||||||
		if (avail <= 0) {
 | 
							if (avail <= 0) {
 | 
				
			||||||
| 
						 | 
					@ -1307,7 +1299,7 @@ static void stream_process(void *data)
 | 
				
			||||||
				pd.underrun_for = size;
 | 
									pd.underrun_for = size;
 | 
				
			||||||
				pd.underrun = true;
 | 
									pd.underrun = true;
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		} else if (avail > MAXLENGTH) {
 | 
							} else if (avail > (int32_t)stream->attr.maxlength) {
 | 
				
			||||||
			/* overrun, handled by other side */
 | 
								/* overrun, handled by other side */
 | 
				
			||||||
			pw_log_warn(NAME" %p: overrun", stream);
 | 
								pw_log_warn(NAME" %p: overrun", stream);
 | 
				
			||||||
			size = buf->datas[0].maxsize;
 | 
								size = buf->datas[0].maxsize;
 | 
				
			||||||
| 
						 | 
					@ -1316,11 +1308,12 @@ static void stream_process(void *data)
 | 
				
			||||||
			size = SPA_MIN(buf->datas[0].maxsize, (uint32_t)avail);
 | 
								size = SPA_MIN(buf->datas[0].maxsize, (uint32_t)avail);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			spa_ringbuffer_read_data(&stream->ring,
 | 
								spa_ringbuffer_read_data(&stream->ring,
 | 
				
			||||||
					stream->buffer, MAXLENGTH,
 | 
										stream->buffer, stream->attr.maxlength,
 | 
				
			||||||
					pd.read_index % MAXLENGTH,
 | 
										pd.read_index % stream->attr.maxlength,
 | 
				
			||||||
					p, size);
 | 
										p, size);
 | 
				
			||||||
			spa_ringbuffer_read_update(&stream->ring,
 | 
					
 | 
				
			||||||
					pd.read_index + size);
 | 
								pd.read_index += size;
 | 
				
			||||||
 | 
								spa_ringbuffer_read_update(&stream->ring, pd.read_index);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			pd.playing_for = size;
 | 
								pd.playing_for = size;
 | 
				
			||||||
			pd.underrun = false;
 | 
								pd.underrun = false;
 | 
				
			||||||
| 
						 | 
					@ -1328,30 +1321,34 @@ static void stream_process(void *data)
 | 
				
			||||||
	        buf->datas[0].chunk->offset = 0;
 | 
						        buf->datas[0].chunk->offset = 0;
 | 
				
			||||||
	        buf->datas[0].chunk->stride = stream->frame_size;
 | 
						        buf->datas[0].chunk->stride = stream->frame_size;
 | 
				
			||||||
	        buf->datas[0].chunk->size = size;
 | 
						        buf->datas[0].chunk->size = size;
 | 
				
			||||||
 | 
						        buffer->size = size / stream->frame_size;
 | 
				
			||||||
	} else  {
 | 
						} else  {
 | 
				
			||||||
		int32_t filled = spa_ringbuffer_get_write_index(&stream->ring, &pd.write_index);
 | 
							int32_t filled = spa_ringbuffer_get_write_index(&stream->ring, &pd.write_index);
 | 
				
			||||||
		if (filled < 0) {
 | 
							if (filled < 0) {
 | 
				
			||||||
			/* underrun */
 | 
								/* underrun */
 | 
				
			||||||
			pw_log_warn(NAME" %p: underrun", stream);
 | 
								pw_log_warn(NAME" %p: underrun", stream);
 | 
				
			||||||
		} else if (filled > MAXLENGTH) {
 | 
							} else if (filled > (int32_t)stream->attr.maxlength) {
 | 
				
			||||||
			/* overrun */
 | 
								/* overrun */
 | 
				
			||||||
			pw_log_warn(NAME" %p: overrun", stream);
 | 
								pw_log_warn(NAME" %p: overrun", stream);
 | 
				
			||||||
		} else {
 | 
							} else {
 | 
				
			||||||
			uint32_t avail = MAXLENGTH - filled;
 | 
								uint32_t avail = stream->attr.maxlength - filled;
 | 
				
			||||||
			size = SPA_MIN(buf->datas[0].chunk->size, avail);
 | 
								size = SPA_MIN(buf->datas[0].chunk->size, avail);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			spa_ringbuffer_write_data(&stream->ring,
 | 
								spa_ringbuffer_write_data(&stream->ring,
 | 
				
			||||||
					stream->buffer, MAXLENGTH,
 | 
										stream->buffer, stream->attr.maxlength,
 | 
				
			||||||
					pd.write_index % MAXLENGTH,
 | 
										pd.write_index % stream->attr.maxlength,
 | 
				
			||||||
					SPA_MEMBER(p, buf->datas[0].chunk->offset, void),
 | 
										SPA_MEMBER(p, buf->datas[0].chunk->offset, void),
 | 
				
			||||||
					size);
 | 
										size);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								pd.write_index += size;
 | 
				
			||||||
			spa_ringbuffer_write_update(&stream->ring,
 | 
								spa_ringbuffer_write_update(&stream->ring,
 | 
				
			||||||
					pd.write_index + size);
 | 
										pd.write_index);
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	pw_stream_queue_buffer(stream->stream, buffer);
 | 
						pw_stream_queue_buffer(stream->stream, buffer);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						pw_stream_get_time(stream->stream, &pd.pwt);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pw_loop_invoke(impl->loop,
 | 
						pw_loop_invoke(impl->loop,
 | 
				
			||||||
			do_process_done, 1, &pd, sizeof(pd), false, stream);
 | 
								do_process_done, 1, &pd, sizeof(pd), false, stream);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -1561,12 +1558,6 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui
 | 
				
			||||||
	if (stream->channel == SPA_ID_INVALID)
 | 
						if (stream->channel == SPA_ID_INVALID)
 | 
				
			||||||
		goto error_errno;
 | 
							goto error_errno;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	stream->buffer = calloc(1, MAXLENGTH);
 | 
					 | 
				
			||||||
	if (stream->buffer == NULL)
 | 
					 | 
				
			||||||
		goto error_errno;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	spa_ringbuffer_init(&stream->ring);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	stream->direction = PW_DIRECTION_OUTPUT;
 | 
						stream->direction = PW_DIRECTION_OUTPUT;
 | 
				
			||||||
	stream->create_tag = tag;
 | 
						stream->create_tag = tag;
 | 
				
			||||||
	stream->ss = ss;
 | 
						stream->ss = ss;
 | 
				
			||||||
| 
						 | 
					@ -1781,7 +1772,7 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint
 | 
				
			||||||
	if (stream->channel == SPA_ID_INVALID)
 | 
						if (stream->channel == SPA_ID_INVALID)
 | 
				
			||||||
		goto error_errno;
 | 
							goto error_errno;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	stream->buffer = calloc(1, MAXLENGTH);
 | 
						stream->buffer = calloc(1, stream->attr.maxlength);
 | 
				
			||||||
	if (stream->buffer == NULL)
 | 
						if (stream->buffer == NULL)
 | 
				
			||||||
		goto error_errno;
 | 
							goto error_errno;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1906,8 +1897,6 @@ static int do_get_playback_latency(struct client *client, uint32_t command, uint
 | 
				
			||||||
	if (stream == NULL)
 | 
						if (stream == NULL)
 | 
				
			||||||
		return -EINVAL;
 | 
							return -EINVAL;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	update_timing_info(stream);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	pw_log_debug("read:%"PRIi64" write:%"PRIi64" queued:%"PRIi64" delay:%"PRIi64,
 | 
						pw_log_debug("read:%"PRIi64" write:%"PRIi64" queued:%"PRIi64" delay:%"PRIi64,
 | 
				
			||||||
			stream->read_index, stream->write_index,
 | 
								stream->read_index, stream->write_index,
 | 
				
			||||||
			stream->write_index - stream->read_index, stream->delay);
 | 
								stream->write_index - stream->read_index, stream->delay);
 | 
				
			||||||
| 
						 | 
					@ -1953,8 +1942,6 @@ static int do_get_record_latency(struct client *client, uint32_t command, uint32
 | 
				
			||||||
	if (stream == NULL)
 | 
						if (stream == NULL)
 | 
				
			||||||
		return -EINVAL;
 | 
							return -EINVAL;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	update_timing_info(stream);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	reply = reply_new(client, tag);
 | 
						reply = reply_new(client, tag);
 | 
				
			||||||
	message_put(reply,
 | 
						message_put(reply,
 | 
				
			||||||
		TAG_USEC, 0,			/* monitor latency */
 | 
							TAG_USEC, 0,			/* monitor latency */
 | 
				
			||||||
| 
						 | 
					@ -4069,7 +4056,7 @@ static int handle_memblock(struct client *client, struct message *msg)
 | 
				
			||||||
	uint32_t channel, flags, index;
 | 
						uint32_t channel, flags, index;
 | 
				
			||||||
	int64_t offset;
 | 
						int64_t offset;
 | 
				
			||||||
	int32_t filled;
 | 
						int32_t filled;
 | 
				
			||||||
	int res;
 | 
						int res = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	channel = ntohl(client->desc.channel);
 | 
						channel = ntohl(client->desc.channel);
 | 
				
			||||||
	offset = (int64_t) (
 | 
						offset = (int64_t) (
 | 
				
			||||||
| 
						 | 
					@ -4087,29 +4074,25 @@ static int handle_memblock(struct client *client, struct message *msg)
 | 
				
			||||||
		goto finish;
 | 
							goto finish;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					 | 
				
			||||||
	filled = spa_ringbuffer_get_write_index(&stream->ring, &index);
 | 
						filled = spa_ringbuffer_get_write_index(&stream->ring, &index);
 | 
				
			||||||
	pw_log_debug("new block %p %p/%u filled:%d index:%d", msg,
 | 
						pw_log_debug("new block %p %p/%u filled:%d index:%d", msg,
 | 
				
			||||||
			msg->data, msg->length, filled, index);
 | 
								msg->data, msg->length, filled, index);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (filled < 0) {
 | 
						if (filled < 0) {
 | 
				
			||||||
		/* underrun, reported on reader side */
 | 
							/* underrun, reported on reader side */
 | 
				
			||||||
	} else if (filled + msg->length > MAXLENGTH) {
 | 
						} else if (filled + msg->length > stream->attr.maxlength) {
 | 
				
			||||||
		/* overrun */
 | 
							/* overrun */
 | 
				
			||||||
		send_overflow(stream);
 | 
							send_overflow(stream);
 | 
				
			||||||
		stream->write_index += msg->length;
 | 
					 | 
				
			||||||
		stream->pending -= SPA_MIN(stream->pending, msg->length);
 | 
					 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		spa_ringbuffer_write_data(&stream->ring,
 | 
							spa_ringbuffer_write_data(&stream->ring,
 | 
				
			||||||
				stream->buffer, MAXLENGTH,
 | 
									stream->buffer, stream->attr.maxlength,
 | 
				
			||||||
				index % MAXLENGTH,
 | 
									index % stream->attr.maxlength,
 | 
				
			||||||
				msg->data, msg->length);
 | 
									msg->data, msg->length);
 | 
				
			||||||
		spa_ringbuffer_write_update(&stream->ring,
 | 
					
 | 
				
			||||||
				index + msg->length);
 | 
					 | 
				
			||||||
		stream->write_index += msg->length;
 | 
					 | 
				
			||||||
		stream->pending -= SPA_MIN(stream->pending, msg->length);
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	res = 0;
 | 
						stream->write_index = index + msg->length;
 | 
				
			||||||
 | 
						spa_ringbuffer_write_update(&stream->ring, stream->write_index);
 | 
				
			||||||
 | 
						stream->requested -= msg->length;
 | 
				
			||||||
finish:
 | 
					finish:
 | 
				
			||||||
	message_free(client, msg, false, false);
 | 
						message_free(client, msg, false, false);
 | 
				
			||||||
	return res;
 | 
						return res;
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue