client-node: pass position around

Pass the position from server to clients
Implement current time in stream.c using the position info
This commit is contained in:
Wim Taymans 2018-06-07 11:01:20 +02:00
parent 3d25e254ef
commit e5629b23f3
6 changed files with 34 additions and 31 deletions

View file

@ -48,10 +48,7 @@ struct pw_client_node_position {
#define PW_VERSION_CLIENT_NODE_POSITION 0 #define PW_VERSION_CLIENT_NODE_POSITION 0
uint32_t version; uint32_t version;
uint32_t seq1, seq2; /**< valid contents when equal */ uint32_t seq1, seq2; /**< valid contents when equal */
uint64_t nsec; /**< time in nanoseconds */ /* position info follows */
struct spa_fraction rate; /**< rate of the clock */
uint32_t position; /**< current position expressed in rate */
uint32_t duration; /**< duration of cycle expressed in rate */
}; };
#define PW_CLIENT_NODE_PROXY_METHOD_DONE 0 #define PW_CLIENT_NODE_PROXY_METHOD_DONE 0

View file

@ -896,7 +896,7 @@ static int impl_node_process(struct spa_node *node)
{ {
struct node *this = SPA_CONTAINER_OF(node, struct node, node); struct node *this = SPA_CONTAINER_OF(node, struct node, node);
struct impl *impl = this->impl; struct impl *impl = this->impl;
struct pw_driver_quantum *q; struct pw_driver_quantum *q, *rq;
uint64_t cmd = 1; uint64_t cmd = 1;
if (this->impl->this.status != SPA_ID_INVALID) { if (this->impl->this.status != SPA_ID_INVALID) {
@ -907,12 +907,12 @@ static int impl_node_process(struct spa_node *node)
spa_log_trace(this->log, "%p: send process %p", this, impl->this.node->driver_node); spa_log_trace(this->log, "%p: send process %p", this, impl->this.node->driver_node);
q = impl->this.node->driver_node->rt.quantum; q = impl->this.node->driver_node->rt.quantum;
rq = SPA_MEMBER(impl->position, sizeof(struct pw_client_node_position),
struct pw_driver_quantum);
impl->position->nsec = q->time; *rq = *q;
impl->position->duration = q->size; rq->position = impl->next_position;
impl->position->position = impl->next_position; impl->next_position += rq->size;
impl->position->rate = q->rate;
impl->next_position += q->size;
if (write(this->writefd, &cmd, 8) != 8) if (write(this->writefd, &cmd, 8) != 8)
spa_log_warn(this->log, "node %p: error %s", this, strerror(errno)); spa_log_warn(this->log, "node %p: error %s", this, strerror(errno));
@ -1185,7 +1185,7 @@ static void node_initialized(void *data)
pw_log_debug("client-node %p: transport fd %d %d", node, impl->fds[0], impl->fds[1]); pw_log_debug("client-node %p: transport fd %d %d", node, impl->fds[0], impl->fds[1]);
area_size = sizeof(struct spa_io_buffers) * MAX_AREAS; area_size = sizeof(struct spa_io_buffers) * MAX_AREAS;
size = area_size + sizeof(struct pw_client_node_position); size = area_size + sizeof(struct pw_client_node_position) + sizeof(struct pw_driver_quantum);
if (pw_memblock_alloc(PW_MEMBLOCK_FLAG_WITH_FD | if (pw_memblock_alloc(PW_MEMBLOCK_FLAG_WITH_FD |
PW_MEMBLOCK_FLAG_MAP_READWRITE | PW_MEMBLOCK_FLAG_MAP_READWRITE |

View file

@ -225,7 +225,7 @@ struct pw_module {
}; };
struct pw_driver_quantum { struct pw_driver_quantum {
uint64_t time; /**< time in nanoseconds */ uint64_t nsec; /**< time in nanoseconds */
struct spa_fraction rate; /**< rate */ struct spa_fraction rate; /**< rate */
uint32_t position; /**< current position expressed in rate */ uint32_t position; /**< current position expressed in rate */
uint32_t size; /**< size of one period expressed in rate */ uint32_t size; /**< size of one period expressed in rate */

View file

@ -539,7 +539,7 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask)
if (read(fd, &cmd, sizeof(uint64_t)) != sizeof(uint64_t)) if (read(fd, &cmd, sizeof(uint64_t)) != sizeof(uint64_t))
pw_log_warn("proxy %p: read failed %m", proxy); pw_log_warn("proxy %p: read failed %m", proxy);
pw_log_trace("remote %p: process %d", data->remote, data->position->duration); pw_log_trace("remote %p: process", data->remote);
spa_graph_run(node->graph->parent->graph); spa_graph_run(node->graph->parent->graph);
} }
} }
@ -1237,7 +1237,17 @@ client_node_set_position(void *object,
} }
pw_log_debug("node %p: set position %p", proxy, ptr); pw_log_debug("node %p: set position %p", proxy, ptr);
if (ptr == NULL && data->position) {
m = find_mem_ptr(data, data->position);
if (m && --m->ref == 0)
clear_mem(data, m);
}
data->position = ptr; data->position = ptr;
if (ptr)
data->node->rt.quantum = SPA_MEMBER(ptr,
sizeof(struct pw_client_node_position), void);
else
data->node->rt.quantum = NULL;
} }
static const struct pw_client_node_proxy_events client_node_events = { static const struct pw_client_node_proxy_events client_node_events = {

View file

@ -1216,15 +1216,13 @@ int pw_stream_set_active(struct pw_stream *stream, bool active)
int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time) int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time)
{ {
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
int64_t elapsed; struct pw_driver_quantum *q;
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts); q = impl->node->rt.quantum;
time->now = SPA_TIMESPEC_TO_TIME(&ts);
elapsed = (time->now - impl->last_monotonic) / 1000;
time->ticks = impl->last_ticks + (elapsed * impl->last_rate) / SPA_USEC_PER_SEC; time->now = q->nsec;
time->rate = impl->last_rate; time->ticks = q->position / sizeof(float);
time->rate = q->rate;
return 0; return 0;
} }

View file

@ -202,23 +202,21 @@ enum pw_stream_flags {
PW_STREAM_FLAG_NONE = 0, /**< no flags */ PW_STREAM_FLAG_NONE = 0, /**< no flags */
PW_STREAM_FLAG_AUTOCONNECT = (1 << 0), /**< try to automatically connect PW_STREAM_FLAG_AUTOCONNECT = (1 << 0), /**< try to automatically connect
* this stream */ * this stream */
PW_STREAM_FLAG_CLOCK_UPDATE = (1 << 1), /**< request periodic clock updates for PW_STREAM_FLAG_INACTIVE = (1 << 1), /**< start the stream inactive */
* this stream */ PW_STREAM_FLAG_MAP_BUFFERS = (1 << 2), /**< mmap the buffers */
PW_STREAM_FLAG_INACTIVE = (1 << 2), /**< start the stream inactive */ PW_STREAM_FLAG_DRIVER = (1 << 3), /**< be a driver */
PW_STREAM_FLAG_MAP_BUFFERS = (1 << 3), /**< mmap the buffers */ PW_STREAM_FLAG_RT_PROCESS = (1 << 4), /**< call process from the realtime
PW_STREAM_FLAG_DRIVER = (1 << 4), /**< be a driver */
PW_STREAM_FLAG_RT_PROCESS = (1 << 5), /**< call process from the realtime
* thread */ * thread */
PW_STREAM_FLAG_NO_CONVERT = (1 << 6), /**< don't convert format */ PW_STREAM_FLAG_NO_CONVERT = (1 << 5), /**< don't convert format */
PW_STREAM_FLAG_EXCLUSIVE = (1 << 7), /**< require exclusive access to the PW_STREAM_FLAG_EXCLUSIVE = (1 << 6), /**< require exclusive access to the
* device */ * device */
}; };
/** A time structure \memberof pw_stream */ /** A time structure \memberof pw_stream */
struct pw_time { struct pw_time {
int64_t now; /**< the monotonic time */ int64_t now; /**< the monotonic time */
int64_t ticks; /**< the ticks at \a now */ int64_t ticks; /**< the ticks at \a now */
int32_t rate; /**< the rate of \a ticks */ struct spa_fraction rate; /**< the rate of \a ticks */
}; };
/** Create a new unconneced \ref pw_stream \memberof pw_stream /** Create a new unconneced \ref pw_stream \memberof pw_stream