shmsink/shmsrc crashes if data is pushed too quickly
Hello. I have noticed that pushing data "too quickly" to an AppSrc can cause it to crash. I am streaming very high-bitrate 4K video, ranging from 12 Mbps to 120 Mbps depending on the test. I feed an AppSrc with H264 RTP packets that I have received in my own application. The pipeline is as follows:
appsrc ! queue2 ! shmsink
My application is written in C/C++. I am using a queue2 because it is supposed to be better suited to network streams. Here are the important bits:
Creating elements
m_appsrc = gst_element_factory_make("appsrc", NULL);
m_shmQueue = gst_element_factory_make("queue2", NULL);
m_shmsink = gst_element_factory_make("shmsink", NULL);
Configuring elements
g_object_set(G_OBJECT(m_shmQueue), "max-size-buffers", 10000, "max-size-bytes", 0, "max-size-time", 0, "use-buffering", true, NULL );
g_object_set(G_OBJECT(m_shmsink), "socket-path", m_shmSocketPath.c_str(), "wait-for-connection", false, "sync", false, "async", false, NULL);
GstCaps * caps = gst_caps_from_string("application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)H264, payload=(int)96");
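// "min-latency" is a gint64 property and "max-bytes" a guint64, so the varargs values are cast to 64-bit types
g_object_set(G_OBJECT(m_appsrc), "min-latency", (gint64) 20000000, "is-live", true, "do-timestamp", true, "max-bytes", (guint64) 20000000, "caps", caps, "format", GST_FORMAT_TIME, "block", true, NULL);
gst_caps_unref(caps); // appsrc keeps its own reference to the caps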
g_signal_connect (m_appsrc, "need-data", G_CALLBACK (StartFeed), &g_sourceid);
g_signal_connect (m_appsrc, "enough-data", G_CALLBACK (StopFeed), &g_sourceid);
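The StartFeed and StopFeed callbacks are not shown here. As a rough sketch of one shape they could take, assuming they only need to tell the push thread when to pause: both could toggle a small gate object that the push thread waits on. FeedGate and its method names are hypothetical and are not part of GStreamer or of my application; the real handlers would still need the GStreamer signal signatures and would simply call Open() or Close().

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper (not a GStreamer type): a gate that a "need-data"
// handler opens and an "enough-data" handler closes.
class FeedGate {
public:
    // Intended to be called from the "need-data" handler.
    void Open() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_open = true;
        }
        m_cv.notify_all();
    }

    // Intended to be called from the "enough-data" handler.
    void Close() {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_open = false;
    }

    // Blocks until the gate is open or the timeout expires.
    // Returns true if the gate is open on return.
    bool WaitUntilOpen(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(m_mutex);
        return m_cv.wait_for(lock, timeout, [this] { return m_open; });
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_open = false;
};
```

With "block" set to true, gst_app_src_push_buffer already blocks once max-bytes are queued, so a gate like this would mostly matter if "block" were false.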
Building Pipeline
gst_bin_add_many(GST_BIN(m_pipeline), m_appsrc, m_shmQueue, m_shmsink, NULL);
if (gst_element_link_many(m_appsrc, m_shmQueue, m_shmsink, NULL) != true) {
LOG_ERROR(subprocess) << "Elements could not be linked";
return -1;
}
gst_element_set_state (m_pipeline, GST_STATE_PLAYING);
I spawn a thread which pulls RTP packets from my application's boost::circular_buffer<std::vector<uint8_t>>. Each buffer in the circular buffer is a single RTP packet. I am 100% confident these packets are well formatted (in fact, they are generated by GStreamer). Here is the PushData thread.
void PushData()
{
    GstBuffer *buffer;
    GstFlowReturn ret;
    GstMapInfo map;
    static const boost::posix_time::time_duration timeout(boost::posix_time::milliseconds(250));
    static GstClockTime duration = 33333333; // 0.0333 seconds
    while (m_running)
    {
        bool notInWaitForNewBundlesState = TryWaitForIncomingDataAvailable(timeout);
        if (notInWaitForNewBundlesState) {
            m_incomingQueueMutex.lock();
            padded_vector_uint8_t incomingRtpFrame(std::move(m_incomingRtpPacketQueue.front()));
            m_incomingRtpPacketQueue.pop_front();
            m_incomingQueueMutex.unlock();
            /* Create a new empty buffer */
            buffer = gst_buffer_new_and_alloc(incomingRtpFrame.size());
            // copy in from our local queue
            gst_buffer_map(buffer, &map, GST_MAP_WRITE);
            memcpy(map.data, incomingRtpFrame.data(), incomingRtpFrame.size());
            gst_buffer_unmap(buffer, &map);
            /* Set its timestamp and duration */
            GST_BUFFER_PTS(buffer) = gst_util_uint64_scale(m_numSamples, GST_SECOND, SAMPLE_RATE);
            GST_BUFFER_DURATION(buffer) = duration;
            /* Push the buffer into the appsrc */
            // code crashes if we get to push_buffer too quickly (too often?). Need to find a better solution. Crashes regardless of the method used to push_buffer
            boost::this_thread::sleep_for(boost::chrono::microseconds(30));
            ret = gst_app_src_push_buffer((GstAppSrc *) GetAppSrc(), buffer); // takes ownership of the buffer, so we do NOT unref it
            // g_signal_emit_by_name((GstAppSrc *) GetAppSrc(), "push-buffer", buffer, &ret); // does not take ownership of the buffer, so you must unref it
            // gst_buffer_unref(buffer);
            if (ret != GST_FLOW_OK) {
                /* We got some error */
                std::cout << "WARNING: could not push data into app src. Err code: " << ret << std::endl;
                continue;
            }
            m_numSamples += 1;
        }
    }
    return; // get here only when shutting down
}
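For reference, the wait-then-lock-then-pop sequence in PushData could also be written so that the emptiness check and the pop happen under the same lock. This is only a sketch of that pattern: it uses std::deque instead of my boost::circular_buffer so that it is self-contained, and PacketQueue, Push and TryPop are hypothetical names, not my actual code.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for the application's packet queue.
class PacketQueue {
public:
    using Packet = std::vector<std::uint8_t>;

    // Producer side: enqueue one RTP packet and wake a waiting consumer.
    void Push(Packet packet) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_packets.push_back(std::move(packet));
        }
        m_cv.notify_one();
    }

    // Consumer side: wait up to `timeout` for a packet, then pop it while
    // still holding the lock, so the queue cannot change between the
    // "is there data?" check and the pop. Returns false on timeout.
    bool TryPop(Packet &out, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(m_mutex);
        if (!m_cv.wait_for(lock, timeout, [this] { return !m_packets.empty(); })) {
            return false;
        }
        out = std::move(m_packets.front());
        m_packets.pop_front();
        return true;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::deque<Packet> m_packets;
};
```

In PushData, a call such as TryPop(packet, std::chrono::milliseconds(250)) would then replace both the TryWaitForIncomingDataAvailable call and the manual lock/unlock around front() and pop_front().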
Some notes:
- The appsrc crashes regardless of the method used to push buffers (signal vs push_buffer). If the sleep command in the PushData is removed, about 1 second of video is passed to the shared memory sink before the data flow stops. With the sleep command, the code never crashes.
- I can tell the appsrc crashes because about 10 seconds after the crash my queue2 fills up and blocks as intended
- I can tell the appsrc crashes because my pipeline that picks up from the shmsink stops seeing new data
I am having a great deal of trouble debugging this issue. If I enable deep debugging (GST_DEBUG=5 or higher), GStreamer slows down enough to have the same effect as the sleep in the code! So I can either run without debug output and with the sleep (it runs fine and I get no information), or run with debug output and without the sleep, in which case it runs very slowly, the queue2 eventually fills up, and I never reach my failure mode.
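One thing I could try instead of full debug output is a cheap in-process counter around the push call, on the assumption that a few relaxed atomic increments will not change the timing the way GST_DEBUG does. A minimal sketch follows; PushStats, RecordPush and Read are hypothetical names, not existing code or a GStreamer API.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>

// Hypothetical helper: counts pushed buffers, pushed bytes and failed
// pushes using relaxed atomics, so recording a push stays very cheap.
class PushStats {
public:
    struct Snapshot {
        std::uint64_t buffers;
        std::uint64_t bytes;
        std::uint64_t failures;
    };

    // Call once per push attempt; `ok` would be (ret == GST_FLOW_OK).
    void RecordPush(std::size_t bytes, bool ok) {
        if (ok) {
            m_buffers.fetch_add(1, std::memory_order_relaxed);
            m_bytes.fetch_add(bytes, std::memory_order_relaxed);
        } else {
            m_failures.fetch_add(1, std::memory_order_relaxed);
        }
    }

    // Read the current totals, e.g. once per second from another thread.
    Snapshot Read() const {
        return Snapshot{m_buffers.load(std::memory_order_relaxed),
                        m_bytes.load(std::memory_order_relaxed),
                        m_failures.load(std::memory_order_relaxed)};
    }

private:
    std::atomic<std::uint64_t> m_buffers{0};
    std::atomic<std::uint64_t> m_bytes{0};
    std::atomic<std::uint64_t> m_failures{0};
};
```

If the buffer count keeps rising after the receiving pipeline stops seeing data, that would suggest the stall is downstream of the push call rather than in my feeding thread; if the count freezes, the push call itself is blocking.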
Some final notes:
- TryWaitForIncomingDataAvailable basically checks whether my circular buffer is empty and whether the enough-data signal was emitted
- The other end of the shmsink runs in a bash script and is something like:
gst-launch-1.0 shmsrc socket-path=$shm_socket_path is-live=true do-timestamp=true ! "application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)H264, payload=(int)96" ! queue ! rtph264depay ! h264parse ! mp4mux ! filesink location=$file/$filename.mp4 -e &
With the sleep, the file is successfully reconstructed at the filesink. Without the sleep, the appsrc crashes and only the first second of the file is reconstructed at the filesink.
I am happy to investigate this issue more.
Thank you. Kyle