Commit 74f6bb07 authored by Wim Taymans's avatar Wim Taymans

stream: WIP on getting queued samples accounted for

parent 7693834a
Pipeline #124551 passed with stages
in 2 minutes and 5 seconds
......@@ -280,7 +280,8 @@ struct spa_io_rate_match {
double rate; /**< rate for resampler */
#define SPA_IO_RATE_MATCH_FLAG_ACTIVE (1 << 0)
uint32_t flags; /**< extra flags */
uint32_t padding[7];
struct spa_fraction srate; /**< rate for delay/size */
uint32_t padding[5];
};
#ifdef __cplusplus
......
......@@ -717,7 +717,7 @@ static int impl_node_process(void *object)
struct spa_io_buffers *outio, *inio;
struct buffer *sbuf, *dbuf;
struct spa_buffer *sb, *db;
uint32_t i, size, in_len, out_len, maxsize, max;
uint32_t i, size, in_len, out_len, maxsize, max, queued;
#ifndef FASTPATH
uint32_t pin_len, pout_len;
#endif
......@@ -773,26 +773,27 @@ static int impl_node_process(void *object)
else
max = maxsize / sizeof(float);
if (this->io_rate_match) {
if (SPA_FLAG_IS_SET(this->io_rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE)) {
resample_update_rate(&this->resample, this->io_rate_match->rate);
flush_out = flush_in = true;
} else {
resample_update_rate(&this->resample, 1.0);
}
}
switch (this->mode) {
case MODE_SPLIT:
maxsize = SPA_MIN(maxsize, max * sizeof(float));
flush_out = flush_in = this->io_rate_match != NULL;
break;
case MODE_MERGE:
default:
flush_out = true;
flush_in = false;
break;
}
if (this->io_rate_match) {
if (SPA_FLAG_IS_SET(this->io_rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE)) {
resample_update_rate(&this->resample, this->io_rate_match->rate);
} else {
resample_update_rate(&this->resample, 1.0);
}
}
in_len = (size - inport->offset) / sizeof(float);
in_len = queued = (size - inport->offset) / sizeof(float);
out_len = (maxsize - outport->offset) / sizeof(float);
src_datas = alloca(sizeof(void*) * this->resample.channels);
......@@ -822,14 +823,14 @@ static int impl_node_process(void *object)
db->datas[i].chunk->offset = 0;
}
queued -= in_len;
inport->offset += in_len * sizeof(float);
if (inport->offset >= size || flush_in) {
inio->status = SPA_STATUS_NEED_DATA;
inport->offset = 0;
inport->offset = queued = 0;
SPA_FLAG_SET(res, SPA_STATUS_NEED_DATA);
spa_log_trace_fp(this->log, NAME " %p: return input buffer", this);
}
outport->offset += out_len * sizeof(float);
if (outport->offset > 0 && (outport->offset >= maxsize || flush_out)) {
outio->status = SPA_STATUS_HAVE_DATA;
......@@ -847,8 +848,17 @@ static int impl_node_process(void *object)
}
if (this->io_rate_match) {
this->io_rate_match->delay = resample_delay(&this->resample);
this->io_rate_match->size = resample_in_len(&this->resample, max);
uint32_t delay, size;
delay = resample_delay(&this->resample);
size = resample_in_len(&this->resample, max);
this->io_rate_match->delay = delay + queued;
this->io_rate_match->size = size;
this->io_rate_match->srate = SPA_FRACTION(1, this->resample.i_rate);
spa_log_trace_fp(this->log, NAME " %p: delay:%u size:%u queued:%d",
this, delay, size, queued);
}
return res;
}
......
......@@ -106,6 +106,7 @@ struct stream {
struct spa_callbacks callbacks;
struct spa_io_buffers *io;
struct spa_io_position *position;
struct spa_io_rate_match *rate_match;
uint32_t change_mask_all;
struct spa_port_info port_info;
......@@ -258,6 +259,8 @@ static inline struct buffer *pop_queue(struct stream *stream, struct queue *queu
queue->outcount += buffer->this.size;
SPA_FLAG_CLEAR(buffer->flags, BUFFER_FLAG_QUEUED);
pw_log_trace_fp("%d %"PRIi64" %"PRIi64, avail, buffer->this.size, queue->outcount);
return buffer;
}
static inline void clear_queue(struct stream *stream, struct queue *queue)
......@@ -465,6 +468,12 @@ static int impl_port_set_io(void *object, enum spa_direction direction, uint32_t
else
impl->io = NULL;
break;
case SPA_IO_RateMatch:
if (data && size >= sizeof(struct spa_io_rate_match))
impl->rate_match = data;
else
impl->rate_match = NULL;
break;
default:
return -ENOENT;
}
......@@ -700,15 +709,26 @@ static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffe
static inline void copy_position(struct stream *impl, int64_t queued)
{
struct spa_io_position *p = impl->position;
struct spa_io_rate_match *r = impl->rate_match;
SEQ_WRITE(impl->seq);
if (p != NULL) {
SEQ_WRITE(impl->seq);
impl->time.now = p->clock.nsec;
impl->time.rate = p->clock.rate;
impl->time.ticks = p->clock.position;
impl->time.delay = p->clock.delay;
impl->time.queued = queued;
SEQ_WRITE(impl->seq);
if (r != NULL) {
impl->time.ticks -= r->delay;
impl->time.queued += r->delay;
}
pw_log_trace_fp(NAME" %p: %d/%d t:%"PRIu64" d:%"PRIi64
" q:%"PRIi64" rd:%d %"PRIi64, impl,
impl->time.rate.num, impl->time.rate.denom,
impl->time.ticks, impl->time.delay, impl->time.queued, r ? r->delay : 0,
impl->time.ticks - impl->time.queued);
}
SEQ_WRITE(impl->seq);
}
static int impl_node_process_input(void *object)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment