pulse-server.c 124 KB
Newer Older
Wim Taymans's avatar
Wim Taymans committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/* PipeWire
 *
 * Copyright © 2020 Wim Taymans
 *
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the "Software"),
 * to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
 * and/or sell copies of the Software, and to permit persons to whom the
 * Software is furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice (including the next
 * paragraph) shall be included in all copies or substantial portions of the
 * Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 * DEALINGS IN THE SOFTWARE.
 */

#include "config.h"

#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
Gleb Popov's avatar
Gleb Popov committed
30
#include <netinet/in.h>
31
32
#include <netinet/tcp.h>
#include <netinet/ip.h>
Wim Taymans's avatar
Wim Taymans committed
33
34
35
36
37
#include <sys/un.h>
#include <stdio.h>
#include <errno.h>
#include <math.h>
#include <time.h>
38
#include <limits.h>
Wim Taymans's avatar
Wim Taymans committed
39
40
#include <sys/stat.h>
#include <sys/types.h>
41
#include <sys/time.h>
Wim Taymans's avatar
Wim Taymans committed
42
43
44
45
46
#include <arpa/inet.h>
#if HAVE_PWD_H
#include <pwd.h>
#endif

47
48
49
50
#ifdef HAVE_SYSTEMD
#include <systemd/sd-daemon.h>
#endif

51
52
53
54
#include <pipewire/log.h>

#define spa_debug pw_log_debug

Wim Taymans's avatar
Wim Taymans committed
55
#include <spa/utils/result.h>
56
57
58
59
60
#include <spa/debug/dict.h>
#include <spa/debug/mem.h>
#include <spa/param/audio/raw.h>
#include <spa/pod/pod.h>
#include <spa/param/audio/format-utils.h>
61
#include <spa/param/props.h>
62
#include <spa/utils/ringbuffer.h>
Wim Taymans's avatar
Wim Taymans committed
63
64

#include "pipewire/pipewire.h"
65
#include "pipewire/private.h"
Wim Taymans's avatar
Wim Taymans committed
66

67
68
#include "pulse-server.h"
#include "defs.h"
Wim Taymans's avatar
Wim Taymans committed
69

70
#include "format.c"
71
#include "volume.c"
72
#include "message.c"
73
#include "manager.h"
Wim Taymans's avatar
Wim Taymans committed
74

75
#define NAME	"pulse-server"
Wim Taymans's avatar
Wim Taymans committed
76

77
78
/* When true, send_message() passes every message it queues to message_dump(). */
static bool debug_messages = false;

79
80
/* Forward declarations: these structures point at each other and are
 * defined in full further down. */
struct impl;
struct server;
struct client;
Wim Taymans's avatar
Wim Taymans committed
82

83
84
85
86
87
88
89
/* A deferred operation tied to a client.  Nothing in the visible part of
 * the file creates or completes one, so the notes below are hedged. */
struct operation {
	struct spa_list link;		/* list node; presumably linked into client->operations -- TODO confirm */
	struct client *client;		/* client this operation belongs to */
	uint32_t tag;			/* presumably the request tag to answer -- TODO confirm */
	void (*callback) (struct operation *op);	/* receives the operation itself; when it fires is not visible here */
};

Wim Taymans's avatar
Wim Taymans committed
90
91
92
struct client {
	struct spa_list link;
	struct impl *impl;
93
	struct server *server;
Wim Taymans's avatar
Wim Taymans committed
94

95
96
	const char *name;

97
	struct spa_source *source;
Wim Taymans's avatar
Wim Taymans committed
98

99
100
101
102
103
	uint32_t version;

	struct pw_properties *props;

	struct pw_core *core;
104
105
	struct pw_manager *manager;
	struct spa_hook manager_listener;
106
107
108
109
110
111

	uint32_t subscribed;

	uint32_t default_sink;
	uint32_t default_source;

112
	uint32_t connect_tag;
113

Wim Taymans's avatar
Wim Taymans committed
114
115
	uint32_t in_index;
	uint32_t out_index;
Wim Taymans's avatar
Wim Taymans committed
116
	struct descriptor desc;
Wim Taymans's avatar
Wim Taymans committed
117
	struct message *message;
118
119

	struct pw_map streams;
Wim Taymans's avatar
Wim Taymans committed
120
121
	struct spa_list free_messages;
	struct spa_list out_messages;
122

123
	struct spa_list operations;
124
	struct spa_list modules;
125

126
	unsigned int disconnecting:1;
127
128
};

129
130
131
132
133
134
135
136
/* Buffering parameters of a stream, in bytes.  fix_playback_buffer_attr()
 * treats (uint32_t)-1 in maxlength, tlength, prebuf and minreq as "pick a
 * default" and rounds the values down to whole frames. */
struct buffer_attr {
	uint32_t maxlength;	/* capped at MAXLENGTH; also the size of stream->buffer */
	uint32_t tlength;	/* clamped to maxlength; initial value of stream->missing for playback */
	uint32_t prebuf;	/* threshold compared against the ring fill level in stream_prebuf_active() */
	uint32_t minreq;	/* stream_pop_missing() holds back requests smaller than this */
	uint32_t fragsize;	/* forced to 0 for playback streams */
};

137
138
139
struct stream {
	uint32_t create_tag;
	uint32_t channel;	/* index in map */
140
	uint32_t id;		/* id of global */
141
142
143

	struct impl *impl;
	struct client *client;
Wim Taymans's avatar
Wim Taymans committed
144
	enum pw_direction direction;
145
146
147
148

	struct pw_stream *stream;
	struct spa_hook stream_listener;

149
150
151
	struct spa_ringbuffer ring;
	void *buffer;

152
153
	int64_t read_index;
	int64_t write_index;
154
	uint64_t underrun_for;
155
	uint64_t playing_for;
156
157
158
	uint64_t ticks_base;
	struct timeval timestamp;
	int64_t delay;
159
160
161

	uint32_t missing;
	uint32_t requested;
162
163
164
165

	struct sample_spec ss;
	struct channel_map map;
	struct buffer_attr attr;
166
	uint32_t frame_size;
167

168
	struct volume volume;
Wim Taymans's avatar
Wim Taymans committed
169
170
	bool muted;

171
	uint32_t drain_tag;
172
	unsigned int corked:1;
173
174
	unsigned int volume_set:1;
	unsigned int muted_set:1;
175
	unsigned int adjust_latency:1;
176
	unsigned int have_time:1;
177
	unsigned int is_underrun:1;
178
	unsigned int in_prebuf:1;
Wim Taymans's avatar
Wim Taymans committed
179
180
};

181
182
183
struct server {
	struct spa_list link;
	struct impl *impl;
184

Wim Taymans's avatar
Wim Taymans committed
185
186
187
188
189
190
#define SERVER_TYPE_INVALID	0
#define SERVER_TYPE_UNIX	1
#define SERVER_TYPE_INET	2
	uint32_t type;
	struct sockaddr_un addr;

191
192
193
194
        struct spa_source *source;
	struct spa_list clients;
};

195
196
197
struct impl {
	struct pw_loop *loop;
	struct pw_context *context;
198
	struct spa_hook context_listener;
199

200
	struct pw_properties *props;
Wim Taymans's avatar
Wim Taymans committed
201

202
203
204
	struct ratelimit rate_limit;

	struct spa_source *source;
205
	struct spa_list servers;
Wim Taymans's avatar
Wim Taymans committed
206
};
207

Wim Taymans's avatar
Wim Taymans committed
208
#include "collect.c"
209
#include "module.c"
Wim Taymans's avatar
Wim Taymans committed
210

Wim Taymans's avatar
Wim Taymans committed
211
/* One entry of the command table. */
struct command {
	/* printable command name, used in log messages */
	const char *name;
	/* handler; the do_*() handlers in this file return 0 or a negative
	 * errno.  May be absent for some commands -- TODO confirm against
	 * the table initialiser. */
	int (*run) (struct client *client, uint32_t command, uint32_t tag, struct message *msg);
};
Wim Taymans's avatar
Wim Taymans committed
215
/* Command table indexed by command number; reply_error() and
 * send_stream_killed() look up commands[command].name.  The entries are
 * presumably provided by a later definition that is not visible here. */
static const struct command commands[COMMAND_MAX];
Wim Taymans's avatar
Wim Taymans committed
216

217
static void message_free(struct client *client, struct message *msg, bool dequeue, bool destroy)
Wim Taymans's avatar
Wim Taymans committed
218
{
219
220
	if (dequeue)
		spa_list_remove(&msg->link);
221
	if (destroy) {
222
		pw_log_trace("destroy message %p", msg);
223
		free(msg->data);
Wim Taymans's avatar
Wim Taymans committed
224
		free(msg);
225
226
	} else {
		pw_log_trace("recycle message %p", msg);
Wim Taymans's avatar
Wim Taymans committed
227
		spa_list_append(&client->free_messages, &msg->link);
228
	}
Wim Taymans's avatar
Wim Taymans committed
229
}
230

231
static struct message *message_alloc(struct client *client, uint32_t channel, uint32_t size)
Wim Taymans's avatar
Wim Taymans committed
232
{
Wim Taymans's avatar
Wim Taymans committed
233
234
235
236
237
	struct message *msg = NULL;

	if (!spa_list_is_empty(&client->free_messages)) {
		msg = spa_list_first(&client->free_messages, struct message, link);
		spa_list_remove(&msg->link);
238
		pw_log_trace("using recycled message %p", msg);
Wim Taymans's avatar
Wim Taymans committed
239
	}
240
	if (msg == NULL) {
241
		msg = calloc(1, sizeof(struct message));
242
243
		pw_log_trace("new message %p", msg);
	}
244
245
246
	if (msg == NULL)
		return NULL;
	ensure_size(msg, size);
247
	msg->channel = channel;
Wim Taymans's avatar
Wim Taymans committed
248
249
250
251
	msg->offset = 0;
	msg->length = size;
	return msg;
}
Wim Taymans's avatar
Wim Taymans committed
252

Wim Taymans's avatar
Wim Taymans committed
253
254
255
256
257
258
259
260
261
262
263
static int flush_messages(struct client *client)
{
	int res;

	while (true) {
		struct message *m;
		struct descriptor desc;
		void *data;
		size_t size;

		if (spa_list_is_empty(&client->out_messages))
264
			break;
Wim Taymans's avatar
Wim Taymans committed
265
266
267
268
		m = spa_list_first(&client->out_messages, struct message, link);

		if (client->out_index < sizeof(desc)) {
			desc.length = htonl(m->length);
269
			desc.channel = htonl(m->channel);
Wim Taymans's avatar
Wim Taymans committed
270
271
272
273
274
275
276
277
278
279
280
			desc.offset_hi = 0;
			desc.offset_lo = 0;
			desc.flags = 0;

			data = SPA_MEMBER(&desc, client->out_index, void);
			size = sizeof(desc) - client->out_index;
		} else if (client->out_index < m->length + sizeof(desc)) {
			uint32_t idx = client->out_index - sizeof(desc);
			data = m->data + idx;
			size = m->length - idx;
		} else {
281
			message_free(client, m, true, false);
Wim Taymans's avatar
Wim Taymans committed
282
283
284
285
286
287
288
289
290
			client->out_index = 0;
			continue;
		}

		while (true) {
			res = send(client->source->fd, data, size, MSG_NOSIGNAL | MSG_DONTWAIT);
			if (res < 0) {
				if (errno == EINTR)
					continue;
291
292
293
				if (errno != EAGAIN && errno != EWOULDBLOCK)
					pw_log_warn("send channel:%d %zu, res %d: %m", m->channel, size, res);
				return -errno;
Wim Taymans's avatar
Wim Taymans committed
294
295
296
297
298
			}
			client->out_index += res;
			break;
		}
	}
Wim Taymans's avatar
Wim Taymans committed
299
300
301
	return 0;
}

Wim Taymans's avatar
Wim Taymans committed
302
static int send_message(struct client *client, struct message *m)
Wim Taymans's avatar
Wim Taymans committed
303
304
{
	struct impl *impl = client->impl;
Wim Taymans's avatar
Wim Taymans committed
305
306
	int res;

307
308
309
310
311
312
313
314
315
316
317
	if (m == NULL)
		return -EINVAL;

	if (m->length == 0) {
		res = 0;
		goto error;
	} else if (m->length > m->allocated) {
		res = -ENOMEM;
		goto error;
	}

Wim Taymans's avatar
Wim Taymans committed
318
319
	m->offset = 0;
	spa_list_append(&client->out_messages, &m->link);
320
321
322
323

	if (debug_messages)
		message_dump(m);

Wim Taymans's avatar
Wim Taymans committed
324
325
326
327
328
329
330
331
	res = flush_messages(client);
	if (res == -EAGAIN) {
		int mask = client->source->mask;
		SPA_FLAG_SET(mask, SPA_IO_OUT);
		pw_loop_update_io(impl->loop, client->source, mask);
		res = 0;
	}
	return res;
332
333
334
335
error:
	if (m)
		message_free(client, m, false, false);
	return res;
Wim Taymans's avatar
Wim Taymans committed
336
337
338
339
340
}

static struct message *reply_new(struct client *client, uint32_t tag)
{
	struct message *reply;
341
	reply = message_alloc(client, -1, 0);
Wim Taymans's avatar
Wim Taymans committed
342
343
344
345
346
347
348
349
350
351
352
353
354
355
	pw_log_debug(NAME" %p: REPLY tag:%u", client, tag);
	message_put(reply,
		TAG_U32, COMMAND_REPLY,
		TAG_U32, tag,
		TAG_INVALID);
	return reply;
}

/* Answer request `tag` with a reply that carries no extra fields.
 * Returns whatever send_message() returns. */
static int reply_simple_ack(struct client *client, uint32_t tag)
{
	return send_message(client, reply_new(client, tag));
}

356
static int reply_error(struct client *client, uint32_t command, uint32_t tag, int res)
Wim Taymans's avatar
Wim Taymans committed
357
358
{
	struct message *reply;
359
	uint32_t error = res_to_err(res);
360
361
362
363
364
365
	const char *name;

	if (command < COMMAND_MAX)
		name = commands[command].name;
	else
		name = "invalid";
Wim Taymans's avatar
Wim Taymans committed
366

367
368
	pw_log_warn(NAME" %p: [%s] ERROR command:%d (%s) tag:%u error:%u (%s)",
			client, client->name, command, name, tag, error, spa_strerror(res));
Wim Taymans's avatar
Wim Taymans committed
369

370
	reply = message_alloc(client, -1, 0);
Wim Taymans's avatar
Wim Taymans committed
371
372
373
374
375
376
377
378
	message_put(reply,
		TAG_U32, COMMAND_ERROR,
		TAG_U32, tag,
		TAG_U32, error,
		TAG_INVALID);
	return send_message(client, reply);
}

379
380
381
static int send_underflow(struct stream *stream, int64_t offset)
{
	struct client *client = stream->client;
382
	struct impl *impl = client->impl;
383
384
	struct message *reply;

385
	if (ratelimit_test(&impl->rate_limit, SPA_TIMEVAL_TO_NSEC(&stream->timestamp))) {
386
387
		pw_log_warn(NAME" %p: [%s] UNDERFLOW channel:%u offset:%"PRIi64,
				client, client->name, stream->channel, offset);
388
	}
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403

	reply = message_alloc(client, -1, 0);
	message_put(reply,
		TAG_U32, COMMAND_UNDERFLOW,
		TAG_U32, -1,
		TAG_U32, stream->channel,
		TAG_INVALID);
	if (client->version >= 23) {
		message_put(reply,
			TAG_S64, offset,
			TAG_INVALID);
	}
	return send_message(client, reply);
}

404
405
406
407
static int send_subscribe_event(struct client *client, uint32_t event, uint32_t id)
{
	struct message *reply;

Wim Taymans's avatar
Wim Taymans committed
408
	pw_log_debug(NAME" %p: SUBSCRIBE event:%08x id:%u", client, event, id);
409
410
411
412
413
414
415
416
417
418
419

	reply = message_alloc(client, -1, 0);
	message_put(reply,
		TAG_U32, COMMAND_SUBSCRIBE_EVENT,
		TAG_U32, -1,
		TAG_U32, event,
		TAG_U32, id,
		TAG_INVALID);
	return send_message(client, reply);
}

420
421
422
423
424
static int send_overflow(struct stream *stream)
{
	struct client *client = stream->client;
	struct message *reply;

425
426
	pw_log_warn(NAME" %p: [%s] OVERFLOW channel:%u", client,
			client->name, stream->channel);
427
428
429
430
431
432
433
434
435
436

	reply = message_alloc(client, -1, 0);
	message_put(reply,
		TAG_U32, COMMAND_OVERFLOW,
		TAG_U32, -1,
		TAG_U32, stream->channel,
		TAG_INVALID);
	return send_message(client, reply);
}

437
438
439
440
441
442
443
444
445
446
static int send_stream_killed(struct stream *stream)
{
	struct client *client = stream->client;
	struct message *reply;
	uint32_t command;

	command = stream->direction == PW_DIRECTION_OUTPUT ?
		COMMAND_PLAYBACK_STREAM_KILLED :
		COMMAND_RECORD_STREAM_KILLED;

447
	pw_log_info(NAME" %p: [%s] %s channel:%u", client, client->name,
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
			commands[command].name, stream->channel);

	if (client->version < 23)
		return 0;

	reply = message_alloc(client, -1, 0);
	message_put(reply,
		TAG_U32, command,
		TAG_U32, -1,
		TAG_U32, stream->channel,
		TAG_INVALID);
	return send_message(client, reply);
}

static int send_stream_started(struct stream *stream)
{
	struct client *client = stream->client;
	struct message *reply;

Wim Taymans's avatar
Wim Taymans committed
467
	pw_log_debug(NAME" %p: STARTED channel:%u", client, stream->channel);
468
469
470
471
472
473
474
475
476
477

	reply = message_alloc(client, -1, 0);
	message_put(reply,
		TAG_U32, COMMAND_STARTED,
		TAG_U32, -1,
		TAG_U32, stream->channel,
		TAG_INVALID);
	return send_message(client, reply);
}

Wim Taymans's avatar
Wim Taymans committed
478
479
480
481
static int do_command_auth(struct client *client, uint32_t command, uint32_t tag, struct message *m)
{
	struct impl *impl = client->impl;
	struct message *reply;
482
483
	uint32_t version;
	const void *cookie;
484
	size_t len;
485

486
	if (message_get(m,
487
			TAG_U32, &version,
488
			TAG_ARBITRARY, &cookie, &len,
489
490
			TAG_INVALID) < 0) {
		return -EPROTO;
491
492
493
	}
	if (version < 8)
		return -EPROTO;
494
495
	if (len != NATIVE_COOKIE_LENGTH)
		return -EINVAL;
496
497
498
499
500

	if ((version & PROTOCOL_VERSION_MASK) >= 13)
		version &= PROTOCOL_VERSION_MASK;

	client->version = version;
Wim Taymans's avatar
Wim Taymans committed
501

502
	pw_log_info(NAME" %p: client:%p AUTH tag:%u version:%d", impl, client, tag, version);
Wim Taymans's avatar
Wim Taymans committed
503

Wim Taymans's avatar
Wim Taymans committed
504
505
	reply = reply_new(client, tag);
	message_put(reply,
506
507
			TAG_U32, PROTOCOL_VERSION,
			TAG_INVALID);
Wim Taymans's avatar
Wim Taymans committed
508

Wim Taymans's avatar
Wim Taymans committed
509
	return send_message(client, reply);
Wim Taymans's avatar
Wim Taymans committed
510
511
}

512
513
514
static int reply_set_client_name(struct client *client, uint32_t tag)
{
	struct message *reply;
Wim Taymans's avatar
Wim Taymans committed
515
516
517
518
519
520
521
522
523
	struct pw_client *c;
	uint32_t id;

	c = pw_core_get_client(client->core);
	if (c == NULL)
		return reply_error(client, COMMAND_SET_CLIENT_NAME, tag, -ENOENT);

	id = pw_proxy_get_bound_id((struct pw_proxy*)c);

524
	pw_log_info(NAME" %p: [%s] reply tag:%u id:%u", client, client->name, tag, id);
Wim Taymans's avatar
Wim Taymans committed
525

526
527
528
529
	reply = reply_new(client, tag);

	if (client->version >= 13) {
		message_put(reply,
Wim Taymans's avatar
Wim Taymans committed
530
			TAG_U32, id,		/* client index */
531
532
533
534
535
536
537
538
539
			TAG_INVALID);
	}
	return send_message(client, reply);
}

static void manager_sync(void *data)
{
	struct client *client = data;

Wim Taymans's avatar
Wim Taymans committed
540
	pw_log_debug(NAME" %p: manager sync", client);
541
542

	if (client->connect_tag != SPA_ID_INVALID) {
543
		reply_set_client_name(client, client->connect_tag);
544
545
546
547
548
549
550
551
552
553
554
555
		client->connect_tag = SPA_ID_INVALID;
	}
}

/* Look up the client's stream whose global id equals `id`.
 * Free slots of the stream map are skipped.  Returns NULL when no stream
 * matches. */
static struct stream *find_stream(struct client *client, uint32_t id)
{
	union pw_map_item *item;

	pw_array_for_each(item, &client->streams.items) {
		struct stream *candidate;

		if (pw_map_item_is_free(item))
			continue;
		candidate = item->data;
		if (candidate->id == id)
			return candidate;
	}
	return NULL;
}

560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
/* Map a manager object to the subscription event facility the client asked
 * for.
 *
 * The object kinds are tested in a fixed order and the first kind that is
 * both subscribed and matching wins.  Returns the SUBSCRIPTION_EVENT_*
 * facility, or SPA_ID_INVALID when the client is not interested.  When
 * `id` is not NULL it receives the object id; for an object that matches
 * is_source_or_monitor() but not is_source(), 0x10000 is OR-ed in. */
static uint32_t get_event_and_id(struct client *client, struct pw_manager_object *o, uint32_t *id)
{
	const uint32_t mask = client->subscribed;
	uint32_t facility;
	uint32_t object_id = o->id;

	if ((mask & SUBSCRIPTION_MASK_SINK) && is_sink(o)) {
		facility = SUBSCRIPTION_EVENT_SINK;
	} else if ((mask & SUBSCRIPTION_MASK_SOURCE) && is_source_or_monitor(o)) {
		if (!is_source(o))
			object_id |= 0x10000U;
		facility = SUBSCRIPTION_EVENT_SOURCE;
	} else if ((mask & SUBSCRIPTION_MASK_SINK_INPUT) && is_sink_input(o)) {
		facility = SUBSCRIPTION_EVENT_SINK_INPUT;
	} else if ((mask & SUBSCRIPTION_MASK_SOURCE_OUTPUT) && is_source_output(o)) {
		facility = SUBSCRIPTION_EVENT_SOURCE_OUTPUT;
	} else if ((mask & SUBSCRIPTION_MASK_MODULE) && is_module(o)) {
		facility = SUBSCRIPTION_EVENT_MODULE;
	} else if ((mask & SUBSCRIPTION_MASK_CLIENT) && is_client(o)) {
		facility = SUBSCRIPTION_EVENT_CLIENT;
	} else if ((mask & SUBSCRIPTION_MASK_CARD) && is_card(o)) {
		facility = SUBSCRIPTION_EVENT_CARD;
	} else {
		facility = SPA_ID_INVALID;
	}

	if (id != NULL)
		*id = object_id;
	return facility;
}

/* Manager "added" callback: report a NEW event when the client subscribed
 * to this kind of object. */
static void manager_added(void *data, struct pw_manager_object *o)
{
	struct client *client = data;
	uint32_t id;
	const uint32_t event = get_event_and_id(client, o, &id);

	if (event == SPA_ID_INVALID)
		return;
	send_subscribe_event(client, event | SUBSCRIPTION_EVENT_NEW, id);
}

/* Manager "updated" callback: report a CHANGE event when the client
 * subscribed to this kind of object. */
static void manager_updated(void *data, struct pw_manager_object *o)
{
	struct client *client = data;
	uint32_t id;
	const uint32_t event = get_event_and_id(client, o, &id);

	if (event == SPA_ID_INVALID)
		return;
	send_subscribe_event(client, event | SUBSCRIPTION_EVENT_CHANGE, id);
}

/* Manager "removed" callback: report a REMOVE event when the client
 * subscribed to this kind of object. */
static void manager_removed(void *data, struct pw_manager_object *o)
{
	struct client *client = data;
	uint32_t id;
	const uint32_t event = get_event_and_id(client, o, &id);

	if (event == SPA_ID_INVALID)
		return;
	send_subscribe_event(client, event | SUBSCRIPTION_EVENT_REMOVE, id);
}

/* Manager "metadata" callback: track the default sink and source.
 *
 * Only metadata on PW_ID_CORE is considered.  A NULL key applies to both
 * defaults at once; a NULL key or value stores SPA_ID_INVALID.  When either
 * default actually changed and the client subscribed to server events, a
 * CHANGE|SERVER event is sent. */
static void manager_metadata(void *data, uint32_t subject, const char *key,
			const char *type, const char *value)
{
	struct client *client = data;
	uint32_t val;
	bool changed = false;

	pw_log_debug("meta %d %s %s %s", subject, key, type, value);
	if (subject == PW_ID_CORE) {
		val = (key && value) ? (uint32_t)atoi(value) : SPA_ID_INVALID;
		if (key == NULL || strcmp(key, "default.audio.sink") == 0) {
			changed |= client->default_sink != val;
			client->default_sink = val;
		}
		if (key == NULL || strcmp(key, "default.audio.source") == 0) {
			/* accumulate: with a NULL key both branches run and a
			 * plain assignment would forget a changed sink */
			changed |= client->default_source != val;
			client->default_source = val;
		}
	}
	if (changed) {
		if (client->subscribed & SUBSCRIPTION_MASK_SERVER) {
			send_subscribe_event(client,
				SUBSCRIPTION_EVENT_CHANGE |
				SUBSCRIPTION_EVENT_SERVER,
				-1);
		}
	}
}

/* Manager callbacks; do_set_client_name() registers them with the client
 * as user data. */
static const struct pw_manager_events manager_events = {
	PW_VERSION_MANAGER_EVENTS,
	.sync = manager_sync,		/* answers a pending SET_CLIENT_NAME */
	.added = manager_added,		/* subscription event: NEW */
	.updated = manager_updated,	/* subscription event: CHANGE */
	.removed = manager_removed,	/* subscription event: REMOVE */
	.metadata = manager_metadata,	/* tracks default sink/source */
};

Wim Taymans's avatar
Wim Taymans committed
671
static int do_set_client_name(struct client *client, uint32_t command, uint32_t tag, struct message *m)
Wim Taymans's avatar
Wim Taymans committed
672
673
{
	struct impl *impl = client->impl;
674
	const char *name = NULL;
675
	int res = 0, changed = 0;
676
677

	if (client->version < 13) {
678
		if (message_get(m,
679
				TAG_STRING, &name,
680
681
				TAG_INVALID) < 0)
			return -EPROTO;
682
		if (name)
Wim Taymans's avatar
Wim Taymans committed
683
684
			changed += pw_properties_set(client->props,
					PW_KEY_APP_NAME, name);
685
	} else {
686
		if (message_get(m,
687
				TAG_PROPLIST, client->props ? &client->props->dict : NULL,
688
689
				TAG_INVALID) < 0)
			return -EPROTO;
690
		changed++;
691
	}
692

693
694
695
	client->name = pw_properties_get(client->props, PW_KEY_APP_NAME);
	pw_log_info(NAME" %p: [%s] %s tag:%d", impl, client->name,
			commands[command].name, tag);
696

697
698
699
700
701
702
703
	if (client->core == NULL) {
		client->core = pw_context_connect(impl->context,
				pw_properties_copy(client->props), 0);
		if (client->core == NULL) {
			res = -errno;
			goto error;
		}
704
705
706
707
708
709
710
711
712
713
714
		client->manager = pw_manager_new(client->core);
		if (client->manager == NULL) {
			res = -errno;
			goto error;
		}
		client->connect_tag = tag;
		pw_manager_add_listener(client->manager, &client->manager_listener,
				&manager_events, client);
	} else {
		if (changed)
			pw_core_update_properties(client->core, &client->props->dict);
Wim Taymans's avatar
Wim Taymans committed
715

716
717
		if (client->connect_tag == SPA_ID_INVALID)
			res = reply_set_client_name(client, tag);
718
	}
719
	return res;
720
error:
721
	pw_log_error(NAME" %p: failed to connect client: %s", impl, spa_strerror(res));
722
723
	return res;

Wim Taymans's avatar
Wim Taymans committed
724
725
}

Wim Taymans's avatar
Wim Taymans committed
726
static int do_subscribe(struct client *client, uint32_t command, uint32_t tag, struct message *m)
Wim Taymans's avatar
Wim Taymans committed
727
728
{
	struct impl *impl = client->impl;
729
730
	uint32_t mask;

731
	if (message_get(m,
732
			TAG_U32, &mask,
733
734
			TAG_INVALID) < 0)
		return -EPROTO;
735

736
737
	pw_log_info(NAME" %p: [%s] SUBSCRIBE tag:%u mask:%08x", impl,
			client->name, tag, mask);
738
	client->subscribed = mask;
739

Wim Taymans's avatar
Wim Taymans committed
740
	return reply_simple_ack(client, tag);
Wim Taymans's avatar
Wim Taymans committed
741
742
743
744
}

/* Drop everything queued in the stream's ring buffer.
 *
 * The ring write index is moved back to the read index and the dropped
 * amount is added to stream->missing.  The 64-bit read/write indices are
 * both set to that ring index, prebuffering is re-armed when attr.prebuf
 * is non-zero and the underrun/timing state is reset. */
static void stream_flush(struct stream *stream)
{
	const uint32_t rd = stream->ring.readindex;
	const uint32_t dropped = stream->ring.writeindex - rd;

	stream->ring.writeindex = rd;
	stream->missing += dropped;
	stream->read_index = rd;
	stream->write_index = stream->read_index;

	if (stream->attr.prebuf > 0)
		stream->in_prebuf = true;

	stream->playing_for = 0;
	stream->underrun_for = 0;
	stream->have_time = false;
	stream->is_underrun = true;
}

761
762
763
static void stream_free(struct stream *stream)
{
	struct client *client = stream->client;
764
765
	struct impl *impl = client->impl;

Wim Taymans's avatar
Wim Taymans committed
766
767
	pw_log_debug(NAME" %p: stream %p channel:%d", impl, stream, stream->channel);

768
769
770
771
	/* force processing of all pending messages before we destroy
	 * the stream */
	pw_loop_invoke(impl->loop, NULL, 0, NULL, 0, false, client);

772
773
	if (stream->channel != SPA_ID_INVALID)
		pw_map_remove(&client->streams, stream->channel);
Wim Taymans's avatar
Wim Taymans committed
774
	stream_flush(stream);
775
776
777
778
	if (stream->stream) {
		spa_hook_remove(&stream->stream_listener);
		pw_stream_destroy(stream->stream);
	}
779
780
	if (stream->buffer)
		free(stream->buffer);
781
782
	free(stream);
}
783

784
static bool stream_prebuf_active(struct stream *stream)
785
{
786
787
	uint32_t index;
	int32_t avail;
788

789
790
791
792
793
	avail = spa_ringbuffer_get_write_index(&stream->ring, &index);
	if (stream->in_prebuf)
		return avail < (int32_t) stream->attr.prebuf;
	else
		return stream->attr.prebuf > 0 && avail >= 0;
794
795
}

796
static uint32_t stream_pop_missing(struct stream *stream)
797
{
798
	uint32_t missing;
799

800
801
	if (stream->missing <= 0)
		return 0;
802

803
804
805
	if (stream->missing < stream->attr.minreq &&
	    !stream_prebuf_active(stream))
		return 0;
806

807
808
809
810
	missing = stream->missing;
	stream->requested += missing;
	stream->missing = 0;
	return missing;
811
812
}

813
static int send_command_request(struct stream *stream)
814
815
{
	struct client *client = stream->client;
Wim Taymans's avatar
Wim Taymans committed
816
	struct message *msg;
817
818
	uint32_t size;

819
	size = stream_pop_missing(stream);
820
821
	pw_log_debug(NAME" %p: REQUEST channel:%d %u", stream, stream->channel, size);

822
823
824
	if (size == 0)
		return 0;

825
	msg = message_alloc(client, -1, 0);
Wim Taymans's avatar
Wim Taymans committed
826
	message_put(msg,
827
828
829
830
831
832
		TAG_U32, COMMAND_REQUEST,
		TAG_U32, -1,
		TAG_U32, stream->channel,
		TAG_U32, size,
		TAG_INVALID);

Wim Taymans's avatar
Wim Taymans committed
833
	return send_message(client, msg);
834
835
}

Wim Taymans's avatar
Wim Taymans committed
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
/* Convert a duration in microseconds to a byte count for sample spec `ss`.
 * The frame count is rounded up to a whole frame before it is multiplied
 * by the frame size; the result is truncated to 32 bits. */
static uint32_t usec_to_bytes_round_up(uint64_t usec, const struct sample_spec *ss)
{
	uint64_t frames = (uint64_t) usec * (uint64_t) ss->rate;

	frames = (frames + 1000000UL - 1) / 1000000UL;
	return (uint32_t) (frames * sample_spec_frame_size(ss));
}

static void fix_playback_buffer_attr(struct stream *s, struct buffer_attr *attr)
{
	uint32_t frame_size, max_prebuf, minreq;

	frame_size = s->frame_size;

	if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH)
		attr->maxlength = MAXLENGTH;
	attr->maxlength -= attr->maxlength % frame_size;
	attr->maxlength = SPA_MAX(attr->maxlength, frame_size);

	if (attr->tlength == (uint32_t) -1)
		attr->tlength = usec_to_bytes_round_up(DEFAULT_TLENGTH_MSEC*1000, &s->ss);
	if (attr->tlength > attr->maxlength)
		attr->tlength = attr->maxlength;
	attr->tlength -= attr->tlength % frame_size;
	attr->tlength = SPA_MAX(attr->tlength, frame_size);

863
864
	s->missing = attr->tlength;

Wim Taymans's avatar
Wim Taymans committed
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
	if (attr->minreq == (uint32_t) -1) {
		uint32_t process = usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*1000, &s->ss);
		/* With low-latency, tlength/4 gives a decent default in all of traditional,
		 * adjust latency and early request modes. */
		uint32_t m = attr->tlength / 4;
		m -= m % frame_size;
		attr->minreq = SPA_MIN(process, m);
	}
	minreq = usec_to_bytes_round_up(MIN_USEC, &s->ss);
	attr->minreq = SPA_MAX(attr->minreq, minreq);

	if (attr->tlength < attr->minreq+frame_size)
		attr->tlength = attr->minreq + frame_size;

	attr->minreq -= attr->minreq % frame_size;
	if (attr->minreq <= 0) {
		attr->minreq = frame_size;
		attr->tlength += frame_size*2;
	}
	if (attr->tlength <= attr->minreq)
		attr->tlength = attr->minreq*2 + frame_size;

	max_prebuf = attr->tlength + frame_size - attr->minreq;
	if (attr->prebuf == (uint32_t) -1 || attr->prebuf > max_prebuf)
		attr->prebuf = max_prebuf;
	attr->prebuf -= attr->prebuf % frame_size;

892
893
	attr->fragsize = 0;

894
895
896
	pw_log_info(NAME" %p: [%s] maxlength:%u tlength:%u minreq:%u prebuf:%u", s,
			s->client->name, attr->maxlength, attr->tlength,
			attr->minreq, attr->prebuf);
Wim Taymans's avatar
Wim Taymans committed
897
}
898

899
900
901
static int reply_create_playback_stream(struct stream *stream)
{
	struct client *client = stream->client;
Wim Taymans's avatar
Wim Taymans committed
902
	struct pw_manager *manager = client->manager;
Wim Taymans's avatar
Wim Taymans committed
903
	struct message *reply;
904
	uint32_t size, peer_id;
Wim Taymans's avatar
Wim Taymans committed
905
906
	struct spa_dict_item items[1];
	char latency[32];
907
908
	struct pw_manager_object *peer;
	const char *peer_name;
909
910
	struct spa_fraction lat;
	uint64_t lat_usec;
Wim Taymans's avatar
Wim Taymans committed
911
912
913

	fix_playback_buffer_attr(stream, &stream->attr);

914
915
916
917
918
919
	stream->buffer = calloc(1, stream->attr.maxlength);
	if (stream->buffer == NULL)
		return -errno;

	spa_ringbuffer_init(&stream->ring);

920
921
	pw_log_info(NAME" %p: [%s] reply CREATE_PLAYBACK_STREAM tag:%u", stream,
			client->name, stream->create_tag);
Wim Taymans's avatar
Wim Taymans committed
922

923
924
925
926
927
	lat.num = stream->attr.minreq * 2 / stream->frame_size;
	lat.denom = stream->ss.rate;
	lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom;

	snprintf(latency, sizeof(latency)-1, "%u/%u", lat.num, lat.denom);
Wim Taymans's avatar
Wim Taymans committed
928
929
930
931

	items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency);
	pw_stream_update_properties(stream->stream,
			&SPA_DICT_INIT(items, 1));
Wim Taymans's avatar
Wim Taymans committed
932

933
	size = stream_pop_missing(stream);
934

Wim Taymans's avatar
Wim Taymans committed
935
936
	reply = reply_new(client, stream->create_tag);
	message_put(reply,
937
938
939
		TAG_U32, stream->channel,		/* stream index/channel */
		TAG_U32, stream->id,			/* sink_input/stream index */
		TAG_U32, size,				/* missing/requested bytes */
940
941
		TAG_INVALID);

Wim Taymans's avatar
Wim Taymans committed
942
	peer = find_linked(manager, stream->id, stream->direction);
943
	if (peer && is_sink(peer)) {
944
945
		peer_id = peer->id;
		peer_name = pw_properties_get(peer->props, PW_KEY_NODE_NAME);
946
947
948
949
950
	} else {
		peer_id = SPA_ID_INVALID;
		peer_name = NULL;
	}

951
	if (client->version >= 9) {
Wim Taymans's avatar
Wim Taymans committed
952
		message_put(reply,
953
954
955
956
957
958
959
			TAG_U32, stream->attr.maxlength,
			TAG_U32, stream->attr.tlength,
			TAG_U32, stream->attr.prebuf,
			TAG_U32, stream->attr.minreq,
			TAG_INVALID);
	}
	if (client->version >= 12) {
Wim Taymans's avatar
Wim Taymans committed
960
		message_put(reply,
961
962
			TAG_SAMPLE_SPEC, &stream->ss,
			TAG_CHANNEL_MAP, &stream->map,
963
964
			TAG_U32, peer_id,		/* sink index */
			TAG_STRING, peer_name,		/* sink name */
965
			TAG_BOOLEAN, false,		/* sink suspended state */
966
967
968
			TAG_INVALID);
	}
	if (client->version >= 13) {
Wim Taymans's avatar
Wim Taymans committed
969
		message_put(reply,
970
			TAG_USEC, lat_usec,		/* sink configured latency */
971
972
973
974
975
976
			TAG_INVALID);
	}
	if (client->version >= 21) {
		struct format_info info;
		spa_zero(info);
		info.encoding = ENCODING_PCM;
Wim Taymans's avatar
Wim Taymans committed
977
		message_put(reply,
978
979
980
			TAG_FORMAT_INFO, &info,		/* sink_input format */
			TAG_INVALID);
	}
Wim Taymans's avatar
Wim Taymans committed
981

982
	stream->create_tag = SPA_ID_INVALID;
Wim Taymans's avatar
Wim Taymans committed
983

Wim Taymans's avatar
Wim Taymans committed
984
	return send_message(client, reply);
Wim Taymans's avatar
Wim Taymans committed
985
986
}

Wim Taymans's avatar
Wim Taymans committed
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
static void fix_record_buffer_attr(struct stream *s, struct buffer_attr *attr)
{
	uint32_t frame_size, minfrag;

	frame_size = s->frame_size;

	if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH)
		attr->maxlength = MAXLENGTH;
	attr->maxlength -= attr->maxlength % frame_size;
	attr->maxlength = SPA_MAX(attr->maxlength, frame_size);

	minfrag = usec_to_bytes_round_up(MIN_USEC, &s->ss);

	if (attr->fragsize == (uint32_t) -1 || attr->fragsize == 0)
For faster browsing, not all history is shown. View entire blame