From 74f6bb07f2432796c59c28087d92e0d94998735d Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 26 Mar 2020 10:25:34 +0100 Subject: [PATCH] stream: WIP on getting queued samples accounted for --- spa/include/spa/node/io.h | 3 ++- spa/plugins/audioconvert/resample.c | 40 ++++++++++++++++++----------- src/pipewire/stream.c | 24 +++++++++++++++-- 3 files changed, 49 insertions(+), 18 deletions(-) diff --git a/spa/include/spa/node/io.h b/spa/include/spa/node/io.h index bc780f88..ca930ecd 100644 --- a/spa/include/spa/node/io.h +++ b/spa/include/spa/node/io.h @@ -280,7 +280,8 @@ struct spa_io_rate_match { double rate; /**< rate for resampler */ #define SPA_IO_RATE_MATCH_FLAG_ACTIVE (1 << 0) uint32_t flags; /**< extra flags */ - uint32_t padding[7]; + struct spa_fraction srate; /**< rate for delay/size */ + uint32_t padding[5]; }; #ifdef __cplusplus diff --git a/spa/plugins/audioconvert/resample.c b/spa/plugins/audioconvert/resample.c index e5891b80..6d869f08 100644 --- a/spa/plugins/audioconvert/resample.c +++ b/spa/plugins/audioconvert/resample.c @@ -717,7 +717,7 @@ static int impl_node_process(void *object) struct spa_io_buffers *outio, *inio; struct buffer *sbuf, *dbuf; struct spa_buffer *sb, *db; - uint32_t i, size, in_len, out_len, maxsize, max; + uint32_t i, size, in_len, out_len, maxsize, max, queued; #ifndef FASTPATH uint32_t pin_len, pout_len; #endif @@ -773,26 +773,27 @@ static int impl_node_process(void *object) else max = maxsize / sizeof(float); + if (this->io_rate_match) { + if (SPA_FLAG_IS_SET(this->io_rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE)) { + resample_update_rate(&this->resample, this->io_rate_match->rate); + flush_out = flush_in = true; + } else { + resample_update_rate(&this->resample, 1.0); + } + } + switch (this->mode) { case MODE_SPLIT: maxsize = SPA_MIN(maxsize, max * sizeof(float)); - flush_out = flush_in = this->io_rate_match != NULL; break; case MODE_MERGE: default: flush_out = true; + flush_in = false; break; } - if (this->io_rate_match) { - if (SPA_FLAG_IS_SET(this->io_rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE)) { - resample_update_rate(&this->resample, this->io_rate_match->rate); - } else { - resample_update_rate(&this->resample, 1.0); - } - } - - in_len = (size - inport->offset) / sizeof(float); + in_len = queued = (size - inport->offset) / sizeof(float); out_len = (maxsize - outport->offset) / sizeof(float); src_datas = alloca(sizeof(void*) * this->resample.channels); @@ -822,14 +823,14 @@ static int impl_node_process(void *object) db->datas[i].chunk->offset = 0; } + queued -= in_len; inport->offset += in_len * sizeof(float); if (inport->offset >= size || flush_in) { inio->status = SPA_STATUS_NEED_DATA; - inport->offset = 0; + inport->offset = queued = 0; SPA_FLAG_SET(res, SPA_STATUS_NEED_DATA); spa_log_trace_fp(this->log, NAME " %p: return input buffer", this); } - outport->offset += out_len * sizeof(float); if (outport->offset > 0 && (outport->offset >= maxsize || flush_out)) { outio->status = SPA_STATUS_HAVE_DATA; @@ -847,8 +848,17 @@ static int impl_node_process(void *object) } if (this->io_rate_match) { - this->io_rate_match->delay = resample_delay(&this->resample); - this->io_rate_match->size = resample_in_len(&this->resample, max); + uint32_t delay, size; + + delay = resample_delay(&this->resample); + size = resample_in_len(&this->resample, max); + + this->io_rate_match->delay = delay + queued; + this->io_rate_match->size = size; + this->io_rate_match->srate = SPA_FRACTION(1, this->resample.i_rate); + + spa_log_trace_fp(this->log, NAME " %p: delay:%u size:%u queued:%d", + this, delay, size, queued); } return res; } diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 150c31ac..1bec0879 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -106,6 +106,7 @@ struct stream { struct spa_callbacks callbacks; struct spa_io_buffers *io; struct spa_io_position *position; + struct spa_io_rate_match *rate_match; uint32_t change_mask_all; struct spa_port_info port_info; @@ -258,6 +259,8 @@ static inline struct buffer *pop_queue(struct stream *stream, struct queue *queu queue->outcount += buffer->this.size; SPA_FLAG_CLEAR(buffer->flags, BUFFER_FLAG_QUEUED); + pw_log_trace_fp("%d %"PRIi64" %"PRIi64, avail, buffer->this.size, queue->outcount); + return buffer; } static inline void clear_queue(struct stream *stream, struct queue *queue) @@ -465,6 +468,12 @@ static int impl_port_set_io(void *object, enum spa_direction direction, uint32_t else impl->io = NULL; break; + case SPA_IO_RateMatch: + if (data && size >= sizeof(struct spa_io_rate_match)) + impl->rate_match = data; + else + impl->rate_match = NULL; + break; default: return -ENOENT; } @@ -700,15 +709,26 @@ static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffe static inline void copy_position(struct stream *impl, int64_t queued) { struct spa_io_position *p = impl->position; + struct spa_io_rate_match *r = impl->rate_match; + SEQ_WRITE(impl->seq); if (p != NULL) { - SEQ_WRITE(impl->seq); impl->time.now = p->clock.nsec; impl->time.rate = p->clock.rate; impl->time.ticks = p->clock.position; impl->time.delay = p->clock.delay; impl->time.queued = queued; - SEQ_WRITE(impl->seq); + if (r != NULL) { + impl->time.ticks -= r->delay; + impl->time.queued += r->delay; + } + + pw_log_trace_fp(NAME" %p: %d/%d t:%"PRIu64" d:%"PRIi64 + " q:%"PRIi64" rd:%d %"PRIi64, impl, + impl->time.rate.num, impl->time.rate.denom, + impl->time.ticks, impl->time.delay, impl->time.queued, r ? r->delay : 0, + impl->time.ticks - impl->time.queued); } + SEQ_WRITE(impl->seq); } static int impl_node_process_input(void *object) -- GitLab