Commit d1b6941b authored by Wim Taymans's avatar Wim Taymans

stream: use busy metadata

When we get a buffer from the server, mark it busy, when we queue it
again, mark it unbusy.
Refuse to dequeue a buffer when it is busy.
parent 4ba7fab6
Pipeline #114670 failed with stages
in 59 seconds
......@@ -56,6 +56,7 @@ struct buffer {
#define BUFFER_FLAG_MAPPED (1 << 0)
#define BUFFER_FLAG_QUEUED (1 << 1)
uint32_t flags;
struct spa_meta_busy *busy;
};
struct queue {
......@@ -680,6 +681,8 @@ static int impl_port_use_buffers(void *object,
push_queue(impl, &impl->dequeued, b);
}
b->busy = spa_buffer_find_meta_data(buffers[i], SPA_META_Busy, sizeof(*b->busy));
pw_stream_emit_add_buffer(stream, &b->this);
}
......@@ -733,8 +736,11 @@ static int impl_node_process_input(void *object)
b->this.size = size;
/* push new buffer */
if (push_queue(impl, &impl->dequeued, b) == 0)
if (push_queue(impl, &impl->dequeued, b) == 0) {
if (b->busy)
ATOMIC_INC(b->busy->count);
call_process(impl);
}
done:
copy_position(impl, impl->dequeued.incount);
......@@ -1297,6 +1303,12 @@ static void add_params(struct stream *impl)
SPA_TYPE_OBJECT_ParamIO, SPA_PARAM_IO,
SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers),
SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers))));
add_param(impl, SPA_PARAM_Meta, PARAM_FLAG_LOCKED,
spa_pod_builder_add_object(&b,
SPA_TYPE_OBJECT_ParamMeta, SPA_PARAM_Meta,
SPA_PARAM_META_type, SPA_POD_Id(SPA_META_Busy),
SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_busy))));
}
static int find_format(struct stream *impl, enum pw_direction direction,
......@@ -1727,6 +1739,15 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream)
}
pw_log_trace(NAME" %p: dequeue buffer %d", stream, b->id);
if (b->busy && impl->direction == SPA_DIRECTION_OUTPUT) {
if (ATOMIC_INC(b->busy->count) > 1) {
ATOMIC_DEC(b->busy->count);
push_queue(impl, &impl->dequeued, b);
pw_log_trace(NAME" %p: buffer busy", stream);
errno = EBUSY;
return NULL;
}
}
return &b->this;
}
......@@ -1737,6 +1758,9 @@ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer)
struct buffer *b = SPA_CONTAINER_OF(buffer, struct buffer, this);
int res;
if (b->busy)
ATOMIC_DEC(b->busy->count);
pw_log_trace(NAME" %p: queue buffer %d", stream, b->id);
if ((res = push_queue(impl, &impl->queued, b)) < 0)
return res;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment