/* PipeWire
 *
 * Copyright © 2018 Wim Taymans
 *
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the "Software"),
 * to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
 * and/or sell copies of the Software, and to permit persons to whom the
 * Software is furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice (including the next
 * paragraph) shall be included in all copies or substantial portions of the
 * Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 * DEALINGS IN THE SOFTWARE.
 */

Wim Taymans's avatar
Wim Taymans committed
25
#include <errno.h>
26
27
28
#include <stdio.h>
#include <math.h>
#include <sys/mman.h>
Wim Taymans's avatar
Wim Taymans committed
29
#include <time.h>
30

Wim Taymans's avatar
Wim Taymans committed
31
#include <spa/buffer/alloc.h>
32
33
#include <spa/param/props.h>
#include <spa/node/io.h>
34
#include <spa/node/utils.h>
Wim Taymans's avatar
Wim Taymans committed
35
#include <spa/utils/ringbuffer.h>
36
#include <spa/pod/filter.h>
Wim Taymans's avatar
Wim Taymans committed
37
#include <spa/debug/format.h>
Wim Taymans's avatar
Wim Taymans committed
38
#include <spa/debug/types.h>
Wim Taymans's avatar
Wim Taymans committed
39
#include <spa/debug/pod.h>
Wim Taymans's avatar
Wim Taymans committed
40

Wim Taymans's avatar
Wim Taymans committed
41
42
#include "pipewire/pipewire.h"
#include "pipewire/stream.h"
43
#include "pipewire/private.h"
Wim Taymans's avatar
Wim Taymans committed
44

45
46
/* Prefix used in every log message emitted from this file. */
#define NAME "stream"

/* Capacity of the per-stream buffer table and of each buffer-id queue. */
#define MAX_BUFFERS	64
/* Mask applied to ring indices to wrap them into the ids[] arrays;
 * this only works because MAX_BUFFERS is a power of two. */
#define MASK_BUFFERS	(MAX_BUFFERS-1)
#define MAX_PORTS	1
Wim Taymans's avatar
Wim Taymans committed
51

52
struct buffer {
Wim Taymans's avatar
Wim Taymans committed
53
	struct pw_buffer this;
Wim Taymans's avatar
indent  
Wim Taymans committed
54
	uint32_t id;
Wim Taymans's avatar
Wim Taymans committed
55
#define BUFFER_FLAG_MAPPED	(1 << 0)
Wim Taymans's avatar
Wim Taymans committed
56
#define BUFFER_FLAG_QUEUED	(1 << 1)
57
#define BUFFER_FLAG_ADDED	(1 << 2)
58
	uint32_t flags;
Wim Taymans's avatar
Wim Taymans committed
59
60
61
62
63
};

/* Ring of buffer ids plus running byte counters. */
struct queue {
	uint32_t ids[MAX_BUFFERS];	/* buffer ids, indexed with (ring index & MASK_BUFFERS) */
	struct spa_ringbuffer ring;	/* read/write indices into ids[] */
	uint64_t incount;		/* sum of pw_buffer size over all pushed buffers */
	uint64_t outcount;		/* sum of pw_buffer size over all popped buffers */
};
Wim Taymans's avatar
Wim Taymans committed
67

Wim Taymans's avatar
Wim Taymans committed
68
struct data {
Wim Taymans's avatar
Wim Taymans committed
69
	struct pw_context *context;
Wim Taymans's avatar
Wim Taymans committed
70
71
72
	struct spa_hook stream_listener;
};

Wim Taymans's avatar
Wim Taymans committed
73
struct param {
Wim Taymans's avatar
Wim Taymans committed
74
	uint32_t id;
75
76
#define PARAM_FLAG_LOCKED	(1 << 0)
	uint32_t flags;
77
	struct spa_list link;
Wim Taymans's avatar
Wim Taymans committed
78
79
80
	struct spa_pod *param;
};

81
82
struct control {
	uint32_t id;
83
	uint32_t type;
84
85
86
	struct spa_list link;
	struct pw_stream_control control;
	struct spa_pod *info;
Wim Taymans's avatar
Wim Taymans committed
87
	unsigned int emitted:1;
88
	float values[64];
89
90
};

Wim Taymans's avatar
Wim Taymans committed
91
struct stream {
Wim Taymans's avatar
indent  
Wim Taymans committed
92
	struct pw_stream this;
Wim Taymans's avatar
Wim Taymans committed
93

94
	const char *path;
Wim Taymans's avatar
Wim Taymans committed
95

Wim Taymans's avatar
Wim Taymans committed
96
	struct pw_context *context;
97
	struct spa_hook context_listener;
Wim Taymans's avatar
Wim Taymans committed
98

Wim Taymans's avatar
indent  
Wim Taymans committed
99
	enum spa_direction direction;
Wim Taymans's avatar
Wim Taymans committed
100
	enum pw_stream_flags flags;
Wim Taymans's avatar
Wim Taymans committed
101

Wim Taymans's avatar
Wim Taymans committed
102
	struct pw_impl_node *node;
Wim Taymans's avatar
Wim Taymans committed
103

104
	struct spa_node impl_node;
105
	struct spa_node_methods node_methods;
Wim Taymans's avatar
Wim Taymans committed
106
	struct spa_hook_list hooks;
107
	struct spa_callbacks callbacks;
108

109
	struct spa_io_position *position;
110
111
112
113
	struct spa_io_buffers *io;
	struct {
		struct spa_io_position *position;
	} rt;
114

Wim Taymans's avatar
Wim Taymans committed
115
116
	uint32_t change_mask_all;
	struct spa_port_info port_info;
Wim Taymans's avatar
Wim Taymans committed
117
	struct pw_properties *port_props;
118
119
	struct spa_list param_list;
	struct spa_param_info params[5];
Wim Taymans's avatar
Wim Taymans committed
120

121
122
123
	uint32_t media_type;
	uint32_t media_subtype;

124
	struct buffer buffers[MAX_BUFFERS];
Wim Taymans's avatar
Wim Taymans committed
125
	uint32_t n_buffers;
126

Wim Taymans's avatar
Wim Taymans committed
127
128
	struct queue dequeued;
	struct queue queued;
Wim Taymans's avatar
Wim Taymans committed
129

Wim Taymans's avatar
Wim Taymans committed
130
	struct data data;
131
	uintptr_t seq;
Wim Taymans's avatar
Wim Taymans committed
132
	struct pw_time time;
133

Wim Taymans's avatar
Wim Taymans committed
134
	unsigned int disconnecting:1;
Wim Taymans's avatar
Wim Taymans committed
135
	unsigned int disconnect_core:1;
Wim Taymans's avatar
Wim Taymans committed
136
	unsigned int draining:1;
137
	unsigned int drained:1;
Wim Taymans's avatar
Wim Taymans committed
138
	unsigned int allow_mlock:1;
139
	unsigned int warn_mlock:1;
Wim Taymans's avatar
Wim Taymans committed
140
};
Wim Taymans's avatar
Wim Taymans committed
141

142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/*
 * Map a SPA_PARAM_* id to its slot in stream::params.
 *
 * Returns the slot index (0..4) or -1 when the id has no slot.
 */
static int get_param_index(uint32_t id)
{
	/* position in this table is the slot index */
	static const uint32_t slots[] = {
		SPA_PARAM_EnumFormat,
		SPA_PARAM_Meta,
		SPA_PARAM_IO,
		SPA_PARAM_Format,
		SPA_PARAM_Buffers,
	};
	int i;

	for (i = 0; i < (int)(sizeof(slots) / sizeof(slots[0])); i++) {
		if (slots[i] == id)
			return i;
	}
	return -1;
}

Wim Taymans's avatar
Wim Taymans committed
160
static struct param *add_param(struct stream *impl,
161
		uint32_t id, uint32_t flags, const struct spa_pod *param)
Wim Taymans's avatar
Wim Taymans committed
162
163
{
	struct param *p;
164
	int idx;
Wim Taymans's avatar
Wim Taymans committed
165

Wim Taymans's avatar
Wim Taymans committed
166
	if (param == NULL || !spa_pod_is_object(param)) {
Wim Taymans's avatar
Wim Taymans committed
167
		errno = EINVAL;
Wim Taymans's avatar
Wim Taymans committed
168
		return NULL;
Wim Taymans's avatar
Wim Taymans committed
169
	}
Wim Taymans's avatar
Wim Taymans committed
170
171
	if (id == SPA_ID_INVALID)
		id = SPA_POD_OBJECT_ID(param);
172
173
174
175
176

	p = malloc(sizeof(struct param) + SPA_POD_SIZE(param));
	if (p == NULL)
		return NULL;

Wim Taymans's avatar
Wim Taymans committed
177
	p->id = id;
178
	p->flags = flags;
179
180
	p->param = SPA_MEMBER(p, sizeof(struct param), struct spa_pod);
	memcpy(p->param, param, SPA_POD_SIZE(param));
Wim Taymans's avatar
Wim Taymans committed
181
	SPA_POD_OBJECT_ID(p->param) = id;
182
183
184
185

	spa_list_append(&impl->param_list, &p->link);

	idx = get_param_index(id);
Wim Taymans's avatar
Wim Taymans committed
186
187
	if (idx != -1) {
		impl->port_info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS;
188
		impl->params[idx].flags |= SPA_PARAM_INFO_READ;
Wim Taymans's avatar
Wim Taymans committed
189
	}
190

Wim Taymans's avatar
Wim Taymans committed
191
192
193
	return p;
}

Wim Taymans's avatar
Wim Taymans committed
194
static void clear_params(struct stream *impl, uint32_t id)
Wim Taymans's avatar
Wim Taymans committed
195
{
196
	struct param *p, *t;
Wim Taymans's avatar
Wim Taymans committed
197

198
	spa_list_for_each_safe(p, t, &impl->param_list, link) {
199
200
		if (id == SPA_ID_INVALID ||
		    (p->id == id && !(p->flags & PARAM_FLAG_LOCKED))) {
201
202
			spa_list_remove(&p->link);
			free(p);
Wim Taymans's avatar
Wim Taymans committed
203
204
205
		}
	}
}
Wim Taymans's avatar
Wim Taymans committed
206

Wim Taymans's avatar
Wim Taymans committed
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
static int update_params(struct stream *impl, uint32_t id,
		const struct spa_pod **params, uint32_t n_params)
{
	uint32_t i;
	int res = 0;

	if (id != SPA_ID_INVALID) {
		clear_params(impl, id);
	} else {
		for (i = 0; i < n_params; i++) {
			if (!spa_pod_is_object(params[i]))
				continue;
			clear_params(impl, SPA_POD_OBJECT_ID(params[i]));
		}
	}
	for (i = 0; i < n_params; i++) {
223
		if (add_param(impl, id, 0, params[i]) == NULL) {
Wim Taymans's avatar
Wim Taymans committed
224
225
226
227
228
229
230
231
			res = -errno;
			break;
		}
	}
	return res;
}


Wim Taymans's avatar
Wim Taymans committed
232
static inline int push_queue(struct stream *stream, struct queue *queue, struct buffer *buffer)
233
{
Wim Taymans's avatar
Wim Taymans committed
234
	uint32_t index;
235

Wim Taymans's avatar
Wim Taymans committed
236
	if (SPA_FLAG_IS_SET(buffer->flags, BUFFER_FLAG_QUEUED))
Wim Taymans's avatar
Wim Taymans committed
237
		return -EINVAL;
Wim Taymans's avatar
Wim Taymans committed
238
239

	SPA_FLAG_SET(buffer->flags, BUFFER_FLAG_QUEUED);
Wim Taymans's avatar
Wim Taymans committed
240
	queue->incount += buffer->this.size;
Wim Taymans's avatar
Wim Taymans committed
241

Wim Taymans's avatar
Wim Taymans committed
242
	spa_ringbuffer_get_write_index(&queue->ring, &index);
Wim Taymans's avatar
Wim Taymans committed
243
	queue->ids[index & MASK_BUFFERS] = buffer->id;
Wim Taymans's avatar
Wim Taymans committed
244
	spa_ringbuffer_write_update(&queue->ring, index + 1);
Wim Taymans's avatar
Wim Taymans committed
245

Wim Taymans's avatar
Wim Taymans committed
246
	return 0;
247
248
}

Wim Taymans's avatar
Wim Taymans committed
249
static inline struct buffer *pop_queue(struct stream *stream, struct queue *queue)
250
{
Wim Taymans's avatar
Wim Taymans committed
251
252
	int32_t avail;
	uint32_t index, id;
Wim Taymans's avatar
Wim Taymans committed
253
	struct buffer *buffer;
254

255
	if ((avail = spa_ringbuffer_get_read_index(&queue->ring, &index)) < 1) {
256
		errno = EPIPE;
Wim Taymans's avatar
Wim Taymans committed
257
		return NULL;
258
	}
Wim Taymans's avatar
Wim Taymans committed
259

Wim Taymans's avatar
Wim Taymans committed
260
	id = queue->ids[index & MASK_BUFFERS];
Wim Taymans's avatar
Wim Taymans committed
261
262
	spa_ringbuffer_read_update(&queue->ring, index + 1);

Wim Taymans's avatar
Wim Taymans committed
263
	buffer = &stream->buffers[id];
Wim Taymans's avatar
Wim Taymans committed
264
	queue->outcount += buffer->this.size;
Wim Taymans's avatar
Wim Taymans committed
265
	SPA_FLAG_CLEAR(buffer->flags, BUFFER_FLAG_QUEUED);
Wim Taymans's avatar
Wim Taymans committed
266
267
268

	return buffer;
}
269
270
271
272
273
/* Empty a queue: reset the ring and make incount equal to outcount. */
static inline void clear_queue(struct stream *stream, struct queue *queue)
{
	struct spa_ringbuffer *ring = &queue->ring;

	spa_ringbuffer_init(ring);
	queue->incount = queue->outcount;
}
Wim Taymans's avatar
Wim Taymans committed
274

Wim Taymans's avatar
Wim Taymans committed
275
static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state state, const char *error)
Wim Taymans's avatar
Wim Taymans committed
276
{
Wim Taymans's avatar
Wim Taymans committed
277
278
	enum pw_stream_state old = stream->state;
	bool res = old != state;
Wim Taymans's avatar
Wim Taymans committed
279

Wim Taymans's avatar
indent  
Wim Taymans committed
280
	if (res) {
281
		free(stream->error);
Wim Taymans's avatar
Wim Taymans committed
282
		stream->error = error ? strdup(error) : NULL;
Wim Taymans's avatar
indent  
Wim Taymans committed
283

284
		pw_log_debug(NAME" %p: update state from %s -> %s (%s)", stream,
Wim Taymans's avatar
Wim Taymans committed
285
			     pw_stream_state_as_string(old),
Wim Taymans's avatar
indent  
Wim Taymans committed
286
287
			     pw_stream_state_as_string(state), stream->error);

Wim Taymans's avatar
Wim Taymans committed
288
289
290
		if (state == PW_STREAM_STATE_ERROR)
			pw_log_error(NAME" %p: error %s", stream, error);

Wim Taymans's avatar
indent  
Wim Taymans committed
291
		stream->state = state;
Wim Taymans's avatar
Wim Taymans committed
292
		pw_stream_emit_state_changed(stream, old, state, error);
Wim Taymans's avatar
indent  
Wim Taymans committed
293
294
	}
	return res;
Wim Taymans's avatar
Wim Taymans committed
295
296
}

Wim Taymans's avatar
Wim Taymans committed
297
static struct buffer *get_buffer(struct pw_stream *stream, uint32_t id)
Wim Taymans's avatar
Wim Taymans committed
298
{
299
	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
300
301
	if (id < impl->n_buffers)
		return &impl->buffers[id];
302
303

	errno = EINVAL;
304
	return NULL;
Wim Taymans's avatar
Wim Taymans committed
305
306
}

Wim Taymans's avatar
Wim Taymans committed
307
308
309
310
311
312
static int
do_call_process(struct spa_loop *loop,
                 bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
	struct stream *impl = user_data;
	struct pw_stream *stream = &impl->this;
313
	pw_log_trace(NAME" %p: do process", stream);
Wim Taymans's avatar
Wim Taymans committed
314
	pw_stream_emit_process(stream);
Wim Taymans's avatar
Wim Taymans committed
315
316
317
318
319
	return 0;
}

static void call_process(struct stream *impl)
{
320
	struct pw_stream *stream = &impl->this;
Wim Taymans's avatar
Wim Taymans committed
321
	pw_log_trace(NAME" %p: call process", impl);
Wim Taymans's avatar
Wim Taymans committed
322
	if (SPA_FLAG_IS_SET(impl->flags, PW_STREAM_FLAG_RT_PROCESS)) {
323
		pw_stream_emit_process(stream);
Wim Taymans's avatar
Wim Taymans committed
324
325
	}
	else {
Wim Taymans's avatar
Wim Taymans committed
326
		pw_loop_invoke(impl->context->main_loop,
Wim Taymans's avatar
Wim Taymans committed
327
328
329
330
			do_call_process, 1, NULL, 0, false, impl);
	}
}

Wim Taymans's avatar
Wim Taymans committed
331
332
333
334
335
336
337
338
339
340
341
342
343
/*
 * Loop-invoke callback scheduled by call_drained(): emits the drained
 * event for the stream passed in user_data. Always returns 0.
 */
static int
do_call_drained(struct spa_loop *loop,
                 bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
	struct stream *impl = user_data;

	pw_log_trace(NAME" %p: drained", &impl->this);
	pw_stream_emit_drained(&impl->this);

	return 0;
}

static void call_drained(struct stream *impl)
{
Wim Taymans's avatar
Wim Taymans committed
344
	pw_loop_invoke(impl->context->main_loop,
Wim Taymans's avatar
Wim Taymans committed
345
346
347
		do_call_drained, 1, NULL, 0, false, impl);
}

348
349
350
351
352
353
354
355
356
/*
 * Loop-invoke callback scheduled by impl_set_io(): copies the current
 * position pointer into rt.position. Always returns 0.
 */
static int
do_set_position(struct spa_loop *loop,
		bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
	struct stream *s = user_data;

	s->rt.position = s->position;

	return 0;
}

357
static int impl_set_io(void *object, uint32_t id, void *data, size_t size)
Wim Taymans's avatar
Wim Taymans committed
358
{
359
	struct stream *impl = object;
Wim Taymans's avatar
Wim Taymans committed
360
	struct pw_stream *stream = &impl->this;
Wim Taymans's avatar
Wim Taymans committed
361

362
363
	pw_log_debug(NAME" %p: set io id %d (%s) %p %zd", impl, id,
			spa_debug_type_find_name(spa_type_io, id), data, size);
Wim Taymans's avatar
Wim Taymans committed
364

365
366
367
368
369
370
	switch(id) {
	case SPA_IO_Position:
		if (data && size >= sizeof(struct spa_io_position))
			impl->position = data;
		else
			impl->position = NULL;
371
372
		pw_loop_invoke(impl->context->data_loop,
				do_set_position, 1, NULL, 0, true, impl);
373
374
		break;
	}
Wim Taymans's avatar
Wim Taymans committed
375
376
	pw_stream_emit_io_changed(stream, id, data, size);

Wim Taymans's avatar
Wim Taymans committed
377
378
379
	return 0;
}

380
static int impl_send_command(void *object, const struct spa_command *command)
Wim Taymans's avatar
Wim Taymans committed
381
{
382
	struct stream *impl = object;
383
	struct pw_stream *stream = &impl->this;
Wim Taymans's avatar
Wim Taymans committed
384

Wim Taymans's avatar
Wim Taymans committed
385
	switch (SPA_NODE_COMMAND_ID(command)) {
Wim Taymans's avatar
Wim Taymans committed
386
	case SPA_NODE_COMMAND_Suspend:
Wim Taymans's avatar
Wim Taymans committed
387
	case SPA_NODE_COMMAND_Pause:
Wim Taymans's avatar
Wim Taymans committed
388
		pw_loop_invoke(impl->context->main_loop,
Wim Taymans's avatar
Wim Taymans committed
389
			NULL, 0, NULL, 0, false, impl);
390
		if (stream->state == PW_STREAM_STATE_STREAMING) {
Wim Taymans's avatar
Wim Taymans committed
391

392
			pw_log_debug(NAME" %p: pause", stream);
393
394
			stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL);
		}
Wim Taymans's avatar
Wim Taymans committed
395
		break;
Wim Taymans's avatar
Wim Taymans committed
396
	case SPA_NODE_COMMAND_Start:
397
		if (stream->state == PW_STREAM_STATE_PAUSED) {
398
			pw_log_debug(NAME" %p: start %d", stream, impl->direction);
Wim Taymans's avatar
Wim Taymans committed
399

400
			if (impl->direction == SPA_DIRECTION_INPUT)
401
				impl->io->status = SPA_STATUS_NEED_DATA;
402
			else
Wim Taymans's avatar
Wim Taymans committed
403
				call_process(impl);
404

405
406
			stream_set_state(stream, PW_STREAM_STATE_STREAMING, NULL);
		}
Wim Taymans's avatar
Wim Taymans committed
407
408
		break;
	default:
409
410
		pw_log_warn(NAME" %p: unhandled node command %d", stream,
				SPA_NODE_COMMAND_ID(command));
Wim Taymans's avatar
Wim Taymans committed
411
		break;
Wim Taymans's avatar
Wim Taymans committed
412
	}
Wim Taymans's avatar
Wim Taymans committed
413
	return 0;
414
415
}

Wim Taymans's avatar
Wim Taymans committed
416
417
static void emit_node_info(struct stream *d)
{
Wim Taymans's avatar
Wim Taymans committed
418
419
420
421
422
423
424
425
426
	struct spa_node_info info;

	info = SPA_NODE_INFO_INIT();
	if (d->direction == SPA_DIRECTION_INPUT) {
		info.max_input_ports = 1;
		info.max_output_ports = 0;
	} else {
		info.max_input_ports = 0;
		info.max_output_ports = 1;
Wim Taymans's avatar
Wim Taymans committed
427
	}
Wim Taymans's avatar
Wim Taymans committed
428
429
430
	info.change_mask |= SPA_NODE_CHANGE_MASK_FLAGS;
	info.flags = SPA_NODE_FLAG_RT;
	spa_node_emit_info(&d->hooks, &info);
Wim Taymans's avatar
Wim Taymans committed
431
}
Wim Taymans's avatar
Wim Taymans committed
432

Wim Taymans's avatar
Wim Taymans committed
433
static void emit_port_info(struct stream *d, bool full)
Wim Taymans's avatar
Wim Taymans committed
434
{
Wim Taymans's avatar
Wim Taymans committed
435
436
437
438
439
	if (full)
		d->port_info.change_mask = d->change_mask_all;
	if (d->port_info.change_mask != 0)
		spa_node_emit_port_info(&d->hooks, d->direction, 0, &d->port_info);
	d->port_info.change_mask = 0;
Wim Taymans's avatar
Wim Taymans committed
440
441
}

442
static int impl_add_listener(void *object,
Wim Taymans's avatar
Wim Taymans committed
443
444
445
446
		struct spa_hook *listener,
		const struct spa_node_events *events,
		void *data)
{
447
	struct stream *d = object;
Wim Taymans's avatar
Wim Taymans committed
448
449
450
451
452
	struct spa_hook_list save;

	spa_hook_list_isolate(&d->hooks, &save, listener, events, data);

	emit_node_info(d);
Wim Taymans's avatar
Wim Taymans committed
453
	emit_port_info(d, true);
Wim Taymans's avatar
Wim Taymans committed
454
455
456
457

	spa_hook_list_join(&d->hooks, &save);

	return 0;
Wim Taymans's avatar
Wim Taymans committed
458
459
}

460
static int impl_set_callbacks(void *object,
461
			      const struct spa_node_callbacks *callbacks, void *data)
462
{
463
	struct stream *d = object;
Wim Taymans's avatar
Wim Taymans committed
464

465
	d->callbacks = SPA_CALLBACKS_INIT(callbacks, data);
466

467
	return 0;
Wim Taymans's avatar
Wim Taymans committed
468
469
}

470
static int impl_port_set_io(void *object, enum spa_direction direction, uint32_t port_id,
471
			    uint32_t id, void *data, size_t size)
Wim Taymans's avatar
Wim Taymans committed
472
{
473
	struct stream *impl = object;
Wim Taymans's avatar
Wim Taymans committed
474

475
	pw_log_debug(NAME" %p: set io id %d (%s) %p %zd", impl, id,
Wim Taymans's avatar
Wim Taymans committed
476
			spa_debug_type_find_name(spa_type_io, id), data, size);
Wim Taymans's avatar
Wim Taymans committed
477

478
479
	switch (id) {
	case SPA_IO_Buffers:
Wim Taymans's avatar
Wim Taymans committed
480
481
482
483
		if (data && size >= sizeof(struct spa_io_buffers))
			impl->io = data;
		else
			impl->io = NULL;
484
485
486
		break;
	default:
		return -ENOENT;
Wim Taymans's avatar
Wim Taymans committed
487
	}
488
	return 0;
489
490
}

491
static int impl_port_enum_params(void *object, int seq,
492
				 enum spa_direction direction, uint32_t port_id,
Wim Taymans's avatar
Wim Taymans committed
493
				 uint32_t id, uint32_t start, uint32_t num,
Wim Taymans's avatar
Wim Taymans committed
494
				 const struct spa_pod *filter)
Wim Taymans's avatar
Wim Taymans committed
495
{
496
	struct stream *d = object;
Wim Taymans's avatar
Wim Taymans committed
497
	struct spa_result_node_params result;
Wim Taymans's avatar
Wim Taymans committed
498
499
	uint8_t buffer[1024];
	struct spa_pod_builder b = { 0 };
Wim Taymans's avatar
Wim Taymans committed
500
	uint32_t count = 0;
501
	struct param *p;
502
	bool found = false;
Wim Taymans's avatar
Wim Taymans committed
503

Wim Taymans's avatar
Wim Taymans committed
504
505
	spa_return_val_if_fail(num != 0, -EINVAL);

Wim Taymans's avatar
Wim Taymans committed
506
	result.id = id;
Wim Taymans's avatar
Wim Taymans committed
507
	result.next = 0;
Wim Taymans's avatar
Wim Taymans committed
508

509
510
511
512
	pw_log_debug(NAME" %p: param id %d (%s) start:%d num:%d", d, id,
			spa_debug_type_find_name(spa_type_param, id),
			start, num);

513
	spa_list_for_each(p, &d->param_list, link) {
Wim Taymans's avatar
Wim Taymans committed
514
		struct spa_pod *param;
Wim Taymans's avatar
Wim Taymans committed
515

516
		result.index = result.next++;
Wim Taymans's avatar
Wim Taymans committed
517
518
		if (result.index < start)
			continue;
Wim Taymans's avatar
indent  
Wim Taymans committed
519

520
		param = p->param;
Wim Taymans's avatar
Wim Taymans committed
521
		if (param == NULL || p->id != id)
Wim Taymans's avatar
Wim Taymans committed
522
			continue;
Wim Taymans's avatar
Wim Taymans committed
523

524
525
		found = true;

Wim Taymans's avatar
Wim Taymans committed
526
527
528
		spa_pod_builder_init(&b, buffer, sizeof(buffer));
		if (spa_pod_filter(&b, &result.param, param, filter) != 0)
			continue;
Wim Taymans's avatar
Wim Taymans committed
529

Wim Taymans's avatar
Wim Taymans committed
530
		spa_node_emit_result(&d->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result);
Wim Taymans's avatar
Wim Taymans committed
531

Wim Taymans's avatar
Wim Taymans committed
532
533
534
		if (++count == num)
			break;
	}
535
	return found ? 0 : -ENOENT;
Wim Taymans's avatar
Wim Taymans committed
536
537
}

Wim Taymans's avatar
Wim Taymans committed
538
539
540
541
542
static int map_data(struct stream *impl, struct spa_data *data, int prot)
{
	void *ptr;
	struct pw_map_range range;

Wim Taymans's avatar
Wim Taymans committed
543
	pw_map_range_init(&range, data->mapoffset, data->maxsize, impl->context->sc_pagesize);
Wim Taymans's avatar
Wim Taymans committed
544
545
546

	ptr = mmap(NULL, range.size, prot, MAP_SHARED, data->fd, range.offset);
	if (ptr == MAP_FAILED) {
547
		pw_log_error(NAME" %p: failed to mmap buffer mem: %m", impl);
Wim Taymans's avatar
Wim Taymans committed
548
549
		return -errno;
	}
Wim Taymans's avatar
Wim Taymans committed
550

Wim Taymans's avatar
Wim Taymans committed
551
	data->data = SPA_MEMBER(ptr, range.start, void);
552
	pw_log_debug(NAME" %p: fd %"PRIi64" mapped %d %d %p", impl, data->fd,
Wim Taymans's avatar
Wim Taymans committed
553
554
			range.offset, range.size, data->data);

Wim Taymans's avatar
Wim Taymans committed
555
	if (impl->allow_mlock && mlock(data->data, data->maxsize) < 0) {
556
557
558
559
560
561
		pw_log(impl->warn_mlock ? SPA_LOG_LEVEL_WARN : SPA_LOG_LEVEL_DEBUG,
				NAME" %p: Failed to mlock memory %p %u: %s", impl,
				data->data, data->maxsize,
				errno == ENOMEM ?
				"This is not a problem but for best performance, "
				"consider increasing RLIMIT_MEMLOCK" : strerror(errno));
Wim Taymans's avatar
Wim Taymans committed
562
	}
Wim Taymans's avatar
Wim Taymans committed
563
564
565
566
567
568
569
	return 0;
}

static int unmap_data(struct stream *impl, struct spa_data *data)
{
	struct pw_map_range range;

Wim Taymans's avatar
Wim Taymans committed
570
	pw_map_range_init(&range, data->mapoffset, data->maxsize, impl->context->sc_pagesize);
Wim Taymans's avatar
Wim Taymans committed
571
572

	if (munmap(SPA_MEMBER(data->data, -range.start, void), range.size) < 0)
573
		pw_log_warn(NAME" %p: failed to unmap: %m", impl);
Wim Taymans's avatar
Wim Taymans committed
574

575
	pw_log_debug(NAME" %p: fd %"PRIi64" unmapped", impl, data->fd);
Wim Taymans's avatar
Wim Taymans committed
576
577
578
579
580
581
	return 0;
}

static void clear_buffers(struct pw_stream *stream)
{
	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
Wim Taymans's avatar
Wim Taymans committed
582
	uint32_t i, j;
Wim Taymans's avatar
Wim Taymans committed
583

584
	pw_log_debug(NAME" %p: clear buffers %d", stream, impl->n_buffers);
Wim Taymans's avatar
Wim Taymans committed
585
586
587
588

	for (i = 0; i < impl->n_buffers; i++) {
		struct buffer *b = &impl->buffers[i];

589
590
		if (SPA_FLAG_IS_SET(b->flags, BUFFER_FLAG_ADDED))
			pw_stream_emit_remove_buffer(stream, &b->this);
Wim Taymans's avatar
Wim Taymans committed
591

Wim Taymans's avatar
Wim Taymans committed
592
		if (SPA_FLAG_IS_SET(b->flags, BUFFER_FLAG_MAPPED)) {
Wim Taymans's avatar
Wim Taymans committed
593
594
			for (j = 0; j < b->this.buffer->n_datas; j++) {
				struct spa_data *d = &b->this.buffer->datas[j];
595
				pw_log_debug(NAME" %p: clear buffer %d mem",
Wim Taymans's avatar
Wim Taymans committed
596
597
598
599
600
601
						stream, b->id);
				unmap_data(impl, d);
			}
		}
	}
	impl->n_buffers = 0;
602
603
	clear_queue(impl, &impl->dequeued);
	clear_queue(impl, &impl->queued);
Wim Taymans's avatar
Wim Taymans committed
604
605
}

606
607
608
609
610
611
612
613
614
static int impl_port_set_param(void *object,
			       enum spa_direction direction, uint32_t port_id,
			       uint32_t id, uint32_t flags,
			       const struct spa_pod *param)
{
	struct stream *impl = object;
	struct pw_stream *stream = &impl->this;
	int res;

615
616
	if (impl->disconnecting && param != NULL)
		return -EIO;
617
618
619
620

	pw_log_debug(NAME" %p: param id %d (%s) changed: %p", impl, id,
			spa_debug_type_find_name(spa_type_param, id), param);

Wim Taymans's avatar
Wim Taymans committed
621
622
	if (param)
		pw_log_pod(SPA_LOG_LEVEL_DEBUG, param);
623
624
625
626

	if ((res = update_params(impl, id, &param, param ? 1 : 0)) < 0)
		return res;

627
	if (id == SPA_PARAM_Format)
628
629
630
631
632
633
634
635
636
637
638
639
		clear_buffers(stream);

	pw_stream_emit_param_changed(stream, id, param);

	if (stream->state == PW_STREAM_STATE_ERROR)
		return -EIO;

	emit_port_info(impl, false);

	return 0;
}

640
641
642
643
static int impl_port_use_buffers(void *object,
		enum spa_direction direction, uint32_t port_id,
		uint32_t flags,
		struct spa_buffer **buffers, uint32_t n_buffers)
Wim Taymans's avatar
Wim Taymans committed
644
{
645
	struct stream *impl = object;
Wim Taymans's avatar
Wim Taymans committed
646
	struct pw_stream *stream = &impl->this;
647
	uint32_t i, j, impl_flags = impl->flags;
Wim Taymans's avatar
Wim Taymans committed
648
	int prot, res;
Wim Taymans's avatar
Wim Taymans committed
649
	int size = 0;
Wim Taymans's avatar
indent  
Wim Taymans committed
650

651
652
	if (impl->disconnecting && n_buffers > 0)
		return -EIO;
653

654
	prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0);
Wim Taymans's avatar
indent  
Wim Taymans committed
655

Wim Taymans's avatar
Wim Taymans committed
656
657
	clear_buffers(stream);

658
	for (i = 0; i < n_buffers; i++) {
Wim Taymans's avatar
Wim Taymans committed
659
		int buf_size = 0;
Wim Taymans's avatar
Wim Taymans committed
660
		struct buffer *b = &impl->buffers[i];
Wim Taymans's avatar
indent  
Wim Taymans committed
661

662
		b->flags = 0;
Wim Taymans's avatar
Wim Taymans committed
663
		b->id = i;
Wim Taymans's avatar
indent  
Wim Taymans committed
664

Wim Taymans's avatar
Wim Taymans committed
665
		if (SPA_FLAG_IS_SET(impl_flags, PW_STREAM_FLAG_MAP_BUFFERS)) {
Wim Taymans's avatar
Wim Taymans committed
666
667
			for (j = 0; j < buffers[i]->n_datas; j++) {
				struct spa_data *d = &buffers[i]->datas[j];
Wim Taymans's avatar
Wim Taymans committed
668
669
				if (d->type == SPA_DATA_MemFd ||
				    d->type == SPA_DATA_DmaBuf) {
Wim Taymans's avatar
Wim Taymans committed
670
671
672
673
					if ((res = map_data(impl, d, prot)) < 0)
						return res;
				}
				else if (d->data == NULL) {
674
					pw_log_error(NAME" %p: invalid buffer mem", stream);
Wim Taymans's avatar
Wim Taymans committed
675
676
					return -EINVAL;
				}
Wim Taymans's avatar
Wim Taymans committed
677
				buf_size += d->maxsize;
678
679
			}
			SPA_FLAG_SET(b->flags, BUFFER_FLAG_MAPPED);
Wim Taymans's avatar
Wim Taymans committed
680
681

			if (size > 0 && buf_size != size) {
682
				pw_log_error(NAME" %p: invalid buffer size %d", stream, buf_size);
Wim Taymans's avatar
Wim Taymans committed
683
684
685
				return -EINVAL;
			} else
				size = buf_size;
686
		}
687
		pw_log_debug(NAME" %p: got buffer id:%d datas:%d, mapped size %d", stream, i,
Wim Taymans's avatar
Wim Taymans committed
688
				buffers[i]->n_datas, size);
Wim Taymans's avatar
Wim Taymans committed
689
690
691
692
693
694
	}

	for (i = 0; i < n_buffers; i++) {
		struct buffer *b = &impl->buffers[i];

		b->this.buffer = buffers[i];
Wim Taymans's avatar
Wim Taymans committed
695

Wim Taymans's avatar
Wim Taymans committed
696
		if (impl->direction == SPA_DIRECTION_OUTPUT) {
697
			pw_log_trace(NAME" %p: recycle buffer %d", stream, b->id);
Wim Taymans's avatar
Wim Taymans committed
698
			push_queue(impl, &impl->dequeued, b);
Wim Taymans's avatar
Wim Taymans committed
699
		}
Wim Taymans's avatar
Wim Taymans committed
700

701
		SPA_FLAG_SET(b->flags, BUFFER_FLAG_ADDED);
Wim Taymans's avatar
Wim Taymans committed
702
		pw_stream_emit_add_buffer(stream, &b->this);
Wim Taymans's avatar
indent  
Wim Taymans committed
703
	}
704

Wim Taymans's avatar
Wim Taymans committed
705
706
	impl->n_buffers = n_buffers;

707
	return 0;
Wim Taymans's avatar
Wim Taymans committed
708
709
}

710
static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id)
Wim Taymans's avatar
Wim Taymans committed
711
{
712
	struct stream *d = object;
713
	pw_log_trace(NAME" %p: recycle buffer %d", d, buffer_id);
Wim Taymans's avatar
Wim Taymans committed
714
715
	if (buffer_id < d->n_buffers)
		push_queue(d, &d->queued, &d->buffers[buffer_id]);
716
	return 0;
Wim Taymans's avatar
Wim Taymans committed
717
718
}

Wim Taymans's avatar
Wim Taymans committed
719
static inline void copy_position(struct stream *impl, int64_t queued)
Wim Taymans's avatar
Wim Taymans committed
720
{
721
	struct spa_io_position *p = impl->rt.position;
722
	if (p != NULL) {
Wim Taymans's avatar
Wim Taymans committed
723
		SEQ_WRITE(impl->seq);
724
725
726
727
728
		impl->time.now = p->clock.nsec;
		impl->time.rate = p->clock.rate;
		impl->time.ticks = p->clock.position;
		impl->time.delay = p->clock.delay;
		impl->time.queued = queued;
Wim Taymans's avatar
Wim Taymans committed
729
		SEQ_WRITE(impl->seq);
730
	}
Wim Taymans's avatar
Wim Taymans committed
731
732
}

733
static int impl_node_process_input(void *object)
Wim Taymans's avatar
Wim Taymans committed
734
{
735
	struct stream *impl = object;
Wim Taymans's avatar
Wim Taymans committed
736
	struct pw_stream *stream = &impl->this;
737
738
	struct spa_io_buffers *io = impl->io;
	struct buffer *b;
739

740
741
	pw_log_trace(NAME" %p: process in status:%d id:%d ticks:%"PRIu64" delay:%"PRIi64,
			stream, io->status, io->buffer_id, impl->time.ticks, impl->time.delay);
Wim Taymans's avatar
Wim Taymans committed
742

743
	if (io->status != SPA_STATUS_HAVE_DATA)
Wim Taymans's avatar
Wim Taymans committed
744
		goto done;
Wim Taymans's avatar
Wim Taymans committed
745

Wim Taymans's avatar
Wim Taymans committed
746
	if ((b = get_buffer(stream, io->buffer_id)) == NULL)
Wim Taymans's avatar
Wim Taymans committed
747
		goto done;
748

Wim Taymans's avatar
Wim Taymans committed
749
750
751
	/* push new buffer */
	if (push_queue(impl, &impl->dequeued, b) == 0)
		call_process(impl);
Wim Taymans's avatar
Wim Taymans committed
752

Wim Taymans's avatar
Wim Taymans committed
753
done:
Wim Taymans's avatar
Wim Taymans committed
754
	copy_position(impl, impl->dequeued.incount);
Wim Taymans's avatar
Wim Taymans committed
755

Wim Taymans's avatar
Wim Taymans committed
756
	/* pop buffer to recycle */
Wim Taymans's avatar
Wim Taymans committed
757
	if ((b = pop_queue(impl, &impl->queued))) {
758
		pw_log_trace(NAME" %p: recycle buffer %d", stream, b->id);
Wim Taymans's avatar
Wim Taymans committed
759
	}
760

Wim Taymans's avatar
Wim Taymans committed
761
	io->buffer_id = b ? b->id : SPA_ID_INVALID;
762
	io->status = SPA_STATUS_NEED_DATA;
Wim Taymans's avatar
Wim Taymans committed
763

764
	return SPA_STATUS_HAVE_DATA;
Wim Taymans's avatar
Wim Taymans committed
765
}
Wim Taymans's avatar
Wim Taymans committed
766

767
static int impl_node_process_output(void *object)
Wim Taymans's avatar
Wim Taymans committed
768
{
769
	struct stream *impl = object;
Wim Taymans's avatar
Wim Taymans committed
770
771
772
	struct pw_stream *stream = &impl->this;
	struct spa_io_buffers *io = impl->io;
	struct buffer *b;
Wim Taymans's avatar
Wim Taymans committed
773
774
	int res;
	uint32_t index;