Overrun with async threaded API
I see a problem, demonstrated and explained by the following program.
It works on PulseAudio (PA), with rare underruns, even with very tight timings like:
./pat 440 100 200 300
minreq 1600 maxlength 2400
uncorking
draining
locking
pa_stream_writable_size 2400 - writing 800
pa_stream_writable_size 1600 - writing 800
pa_stream_writable_size 800 - writing 800
pa_stream_writable_size only 0 - waiting
It works (also on PipeWire, PW) when invoked with big buffers, like:
./pat 440 100 1000 100000
minreq 8000 maxlength 800000
uncorking
draining
locking
pa_stream_writable_size 772096 - writing 800
...
pa_stream_writable_size 4896 - writing 800
pa_stream_writable_size 4096 - writing 800
pa_stream_writable_size 3296 - writing 800
pa_stream_writable_size 2496 - writing 800
pa_stream_writable_size 1696 - writing 800
pa_stream_writable_size 896 - writing 800
pa_stream_writable_size only 96 - waiting
pa_stream_writable_size 8288 - writing 800
pa_stream_writable_size 7488 - writing 800
pa_stream_writable_size 6688 - writing 800
pa_stream_writable_size 5888 - writing 800
pa_stream_writable_size 5088 - writing 800
pa_stream_writable_size 4288 - writing 800
pa_stream_writable_size 3488 - writing 800
pa_stream_writable_size 2688 - writing 800
pa_stream_writable_size 1888 - writing 800
pa_stream_writable_size 1088 - writing 800
pa_stream_writable_size only 288 - waiting
...
It fails on PW with overflow and audible xruns (but works on PA) when invoked as:
./pat 440 100 1000 10000
minreq 8000 maxlength 80000
uncorking
draining
locking
pa_stream_writable_size 84096 - writing 800
...
pa_stream_writable_size 4896 - writing 800
pa_stream_writable_size 4096 - writing 800
pa_stream_writable_size 3296 - writing 800
pa_stream_writable_size 2496 - writing 800
pa_stream_writable_size 1696 - writing 800
pa_stream_writable_size 896 - writing 800
pa_stream_writable_size only 96 - waiting
stream_overflow_cb.
stream_overflow_cb.
stream_overflow_cb.
stream_overflow_cb.
stream_overflow_cb.
pa_stream_writable_size 8192 - writing 800
...
(Feedback on better use of the API is also very welcome.)
/*
pat.cc - PulseAudio test showing surprising async overruns on PipeWire.
Possibly a regression?
Distilled from the Ardour GPLv2+ PA backend.
Using
pipewire-pulseaudio-0.3.59-4.fc37.x86_64
pulseaudio-libs-16.1-1.fc37.x86_64
Compile as
g++ pat.cc -l pulse -o pat
Invoke as
./pat freq chunk_samples min_samples max_samples
*/
#include <math.h>
#include <cassert>
#include <cerrno>
#include <climits>
#include <cstdlib>
#include <iostream>
#include <vector>
#include <pulse/pulseaudio.h>
using namespace std;
// Channel count of the playback stream; also the interleaving stride of the
// sample buffer written by main_process_thread.
#define N_CHANNELS 2
// Sample rate requested for the playback stream (used as ss.rate).
#define SAMPLERATE 48000
// globals initialized by init_pulse / close_pulse
// (close_pulse resets each of them to NULL after releasing it)
struct pa_stream* p_stream;
struct pa_context* p_context;
struct pa_threaded_mainloop* p_mainloop;
// callbacks
/*
 * Context state callback.
 *
 * Signals the threaded main loop passed in `arg` once the context has reached
 * READY, TERMINATED or FAILED; every other state is ignored.
 */
void
context_state_cb (pa_context* c, void* arg)
{
    const pa_context_state_t st = pa_context_get_state (c);
    const bool settled = (st == PA_CONTEXT_READY)
                      || (st == PA_CONTEXT_TERMINATED)
                      || (st == PA_CONTEXT_FAILED);

    if (!settled) {
        /* UNCONNECTED, CONNECTING, AUTHORIZING, SETTING_NAME: nothing to do */
        return;
    }
    pa_threaded_mainloop_signal (static_cast<pa_threaded_mainloop*> (arg), 0);
}
/*
 * Stream state callback.
 *
 * Signals the threaded main loop passed in `arg` once the stream has reached
 * READY, FAILED or TERMINATED; UNCONNECTED and CREATING are ignored.
 */
void
stream_state_cb (pa_stream* s, void* arg)
{
    const pa_stream_state_t st = pa_stream_get_state (s);

    if (st != PA_STREAM_READY && st != PA_STREAM_FAILED && st != PA_STREAM_TERMINATED) {
        return;
    }
    pa_threaded_mainloop_signal (static_cast<pa_threaded_mainloop*> (arg), 0);
}
/* Outcome of the most recent stream operation, stored by stream_operation_cb. */
bool _operation_succeeded;

/*
 * Completion callback for stream operations.
 *
 * Stores the success flag `ok` in _operation_succeeded and then signals the
 * threaded main loop passed in `arg`.
 */
void
stream_operation_cb (pa_stream*, int ok, void* arg)
{
    pa_threaded_mainloop* const loop = static_cast<pa_threaded_mainloop*> (arg);

    _operation_succeeded = (ok != 0);
    pa_threaded_mainloop_signal (loop, 0);
}
/*
 * Write-request callback.
 *
 * Only signals the threaded main loop passed in `arg`; the requested `length`
 * is not used here.
 */
void
stream_write_request_cb (pa_stream*, size_t length, void* arg)
{
    (void) length;
    pa_threaded_mainloop_signal (static_cast<pa_threaded_mainloop*> (arg), 0);
    /* XXX perhaps do processing here instead of waking up main callback thread. */
}
/* Overflow callback: reports the event on stderr; `arg` is unused. */
void
stream_overflow_cb (pa_stream*, void* arg)
{
    (void) arg;
    std::cerr << "stream_overflow_cb." << std::endl;
}
/* Underflow callback: reports the event on stderr; `arg` is unused. */
void
stream_underflow_cb (pa_stream*, void* arg)
{
    (void) arg;
    std::cerr << "stream_underflow_cb." << std::endl;
}
// helpers
void
close_pulse (bool unlock)
{
if (p_mainloop) {
if (unlock) {
pa_threaded_mainloop_unlock (p_mainloop);
}
pa_threaded_mainloop_stop (p_mainloop);
}
if (p_stream) {
pa_stream_disconnect (p_stream);
pa_stream_unref (p_stream);
p_stream = NULL;
}
if (p_context) {
pa_context_disconnect (p_context);
pa_context_unref (p_context);
p_context = NULL;
}
if (p_mainloop) {
pa_threaded_mainloop_free (p_mainloop);
p_mainloop = NULL;
}
}
/*
 * Block until the asynchronous operation `op` has left the RUNNING state,
 * then drop the reference to it and release the main loop lock.
 *
 * The lock is released on every path, including when `op` is NULL.
 *
 * Returns 1 if the operation ended in PA_OPERATION_DONE, 0 otherwise
 * (including when `op` is NULL).
 */
int
sync_pulse (pa_operation* op)
{
    int finished = 0;

    if (op) {
        pa_operation_state_t st;
        while ((st = pa_operation_get_state (op)) == PA_OPERATION_RUNNING) {
            pa_threaded_mainloop_wait (p_mainloop);
        }
        pa_operation_unref (op);
        finished = (st == PA_OPERATION_DONE);
    }

    pa_threaded_mainloop_unlock (p_mainloop);
    return finished;
}
bool
cork_pulse (bool pause)
{
pa_threaded_mainloop_lock (p_mainloop);
_operation_succeeded = false;
return sync_pulse (pa_stream_cork (p_stream, pause ? 1 : 0, stream_operation_cb, p_mainloop)) && _operation_succeeded;
}
/*
 * Create the threaded main loop, connect a context and open a corked
 * playback stream on the default sink.
 *
 * min_samples - used to compute pa_buffer_attr.minreq (converted to bytes).
 * max_samples - used to compute pa_buffer_attr.maxlength (converted to bytes);
 *               must be greater than min_samples.
 *
 * On success p_mainloop, p_context and p_stream are valid and the main loop
 * lock is released. On any failure an error is printed, everything created so
 * far is torn down through close_pulse() and the process exits with status 1.
 */
void
init_pulse (int min_samples, int max_samples)
{
    assert(max_samples > min_samples);

    pa_sample_spec ss;
    ss.channels = N_CHANNELS;
    ss.rate = SAMPLERATE;
    ss.format = PA_SAMPLE_FLOAT32LE;
    if (!pa_sample_spec_valid (&ss)) {
        cerr << "Default sample spec not valid" << endl;
        exit(1);
    }

    if (!(p_mainloop = pa_threaded_mainloop_new ())) {
        cerr << "Failed to allocate main loop" << endl;
        close_pulse (false);
        exit(1);
    }

    pa_proplist* proplist = pa_proplist_new ();
    pa_proplist_sets (proplist, PA_PROP_MEDIA_SOFTWARE, "pat");
    pa_proplist_sets (proplist, PA_PROP_MEDIA_ROLE, "production");
    p_context = pa_context_new_with_proplist (pa_threaded_mainloop_get_api (p_mainloop), "pat", proplist);
    /* The property list is not needed after context creation, whether or not it succeeded. */
    pa_proplist_free (proplist);
    if (!p_context) {
        cerr << "Failed to allocate context" << endl;
        close_pulse (false);
        exit(1);
    }

    pa_context_set_state_callback (p_context, context_state_cb, p_mainloop);
    if (pa_context_connect (p_context, NULL, PA_CONTEXT_NOFLAGS, NULL) < 0) {
        cerr << "Failed to connect context" << endl;
        close_pulse (false);
        exit(1);
    }

    /* Take the lock before the loop thread starts so that no state change
     * can be dispatched (and its signal lost) before we begin waiting. */
    pa_threaded_mainloop_lock (p_mainloop);
    if (pa_threaded_mainloop_start (p_mainloop) < 0) {
        cerr << "Failed to start main loop" << endl;
        close_pulse (true);
        exit(1);
    }

    /* Wait until the context is ready; context_state_cb signals the loop.
     * The state is re-checked after every wakeup instead of trusting a
     * single wait, so an unrelated signal cannot be mistaken for readiness. */
    for (;;) {
        const pa_context_state_t cs = pa_context_get_state (p_context);
        if (cs == PA_CONTEXT_READY) {
            break;
        }
        if (cs == PA_CONTEXT_FAILED || cs == PA_CONTEXT_TERMINATED) {
            cerr << "Failed to create context" << endl;
            close_pulse (true);
            exit(1);
        }
        pa_threaded_mainloop_wait (p_mainloop);
    }

    if (!(p_stream = pa_stream_new (p_context, "master", &ss, NULL))) {
        cerr << "Failed to create new stream" << endl;
        close_pulse (true);
        exit(1);
    }
    pa_stream_set_state_callback (p_stream, stream_state_cb, p_mainloop);
    pa_stream_set_write_callback (p_stream, stream_write_request_cb, p_mainloop);
    pa_stream_set_underflow_callback (p_stream, stream_underflow_cb, p_mainloop);
    pa_stream_set_overflow_callback (p_stream, stream_overflow_cb, p_mainloop);

    /* Buffer metrics in bytes; prebuf and tlength are left at (uint32_t)-1. */
    pa_buffer_attr ba;
    ba.minreq = min_samples * N_CHANNELS * sizeof (float);
    ba.maxlength = max_samples * N_CHANNELS * sizeof (float);
    ba.prebuf = (uint32_t)-1;
    ba.tlength = (uint32_t)-1;
    ba.fragsize = 0;
    cerr << "minreq " << ba.minreq << " maxlength " << ba.maxlength << endl;

    pa_stream_flags_t sf = (pa_stream_flags_t) (
          (int)PA_STREAM_START_CORKED
        | (int)PA_STREAM_NO_REMAP_CHANNELS
        | (int)PA_STREAM_NO_REMIX_CHANNELS
        | (int)PA_STREAM_EARLY_REQUESTS
        );
    if (pa_stream_connect_playback (p_stream, NULL, &ba, sf, NULL, NULL) < 0) {
        cerr << "Failed to connect playback stream" << endl;
        close_pulse (true);
        exit(1);
    }

    /* Wait until the stream is ready; stream_state_cb signals the loop.
     * Same predicate loop as for the context above. */
    for (;;) {
        const pa_stream_state_t st = pa_stream_get_state (p_stream);
        if (st == PA_STREAM_READY) {
            break;
        }
        if (st == PA_STREAM_FAILED || st == PA_STREAM_TERMINATED) {
            cerr << "Failed to start stream" << endl;
            close_pulse (true);
            exit(1);
        }
        pa_threaded_mainloop_wait (p_mainloop);
    }

    pa_threaded_mainloop_unlock (p_mainloop);
}
/*
 * Playback loop: uncork the stream, then write fixed-size chunks of a test
 * waveform whenever the stream reports enough writable space.
 *
 * freq          - waveform frequency; must be in 1..SAMPLERATE so that one
 *                 cycle (SAMPLERATE / freq) is at least one sample long.
 * chunk_samples - frames per pa_stream_write() call; must be positive.
 *
 * Does not return: the write loop has no exit condition, and every error
 * path prints a message and calls exit(1).
 */
void
main_process_thread (int freq, int chunk_samples)
{
    /* Reject values that would cause a division/modulo by zero or an
     * empty/negative-size sample buffer further down. */
    if (freq <= 0 || freq > SAMPLERATE) {
        cerr << "invalid frequency " << freq << " (expected 1.." << SAMPLERATE << ")" << endl;
        exit(1);
    }
    if (chunk_samples <= 0) {
        cerr << "invalid chunk size " << chunk_samples << " (expected > 0)" << endl;
        exit(1);
    }

    cerr << "uncorking" << endl;
    if (!cork_pulse (false)) {
        cerr << "PulseAudio: cannot uncork stream" << endl;
        exit(1);
    }

    pa_threaded_mainloop_lock (p_mainloop);
    cerr << "draining" << endl;
    /* sync_pulse() releases the lock taken just above. */
    sync_pulse (pa_stream_drain (p_stream, stream_operation_cb, p_mainloop));

    const size_t n_frames = static_cast<size_t> (chunk_samples);
    const int cycle_samples = SAMPLERATE / freq;
    const size_t cycle_len = static_cast<size_t> (cycle_samples);
    /* Integer division on purpose: keeps the waveform identical to cycle_samples/2. */
    const float half_cycle = static_cast<float> (cycle_samples / 2);
    const size_t bytes_to_write = sizeof (float) * n_frames * N_CHANNELS;

    /* Interleaved sample buffer: frame n, channel i lives at N_CHANNELS * n + i. */
    std::vector<float> buf (n_frames * N_CHANNELS);

    /* Position inside the current waveform cycle. Kept modulo cycle_len so it
     * cannot overflow, however long the loop runs. */
    size_t phase = 0;

    cerr << "locking" << endl;
    pa_threaded_mainloop_lock (p_mainloop);
    while (true) {
        size_t requested;
        while ((requested = pa_stream_writable_size (p_stream)) < bytes_to_write) {
            cerr << "pa_stream_writable_size only " << requested << " - waiting" << endl;
            /* wait until stream_write_request_cb is called and sends a signal */
            pa_threaded_mainloop_wait (p_mainloop);
        }
        cerr << "pa_stream_writable_size " << requested << " - writing " << bytes_to_write << endl;

        if (pa_stream_get_state (p_stream) != PA_STREAM_READY) {
            cerr << "stream not ready" << endl;
            pa_threaded_mainloop_unlock (p_mainloop);
            exit(1);
        }

        /* Same sample value on every channel of a frame. */
        for (size_t n = 0; n < n_frames; ++n) {
            const size_t pos = (phase + n) % cycle_len;
            const float value = fabs ((float)pos - half_cycle) / cycle_samples;
            for (size_t i = 0; i < N_CHANNELS; ++i) {
                buf[N_CHANNELS * n + i] = value;
            }
        }

        if (pa_stream_write (p_stream, buf.data (), bytes_to_write, NULL, 0, PA_SEEK_RELATIVE) < 0) {
            cerr << "write failed" << endl;
            pa_threaded_mainloop_unlock (p_mainloop);
            exit(1);
        }
        phase = (phase + n_frames) % cycle_len;
    }
}
int main(int argc, char*argv[]) {
assert(argc > 4);
int freq = atoi(argv[1]);
int chunk_samples = atoi(argv[2]);
int min_samples= atoi(argv[3]);
int max_samples = atoi(argv[4]);
init_pulse (min_samples, max_samples);
main_process_thread (freq, chunk_samples);
}