RTSP Disconnect and Reconnect on Error
I have a pipeline where I am streaming from several RTSP sources and processing frames that arrive at an appsink element. I've attached a simple 2 camera example of the setup/pipeline graph. The plan is to run this pipeline for several hours, and there is a chance that during this time one of the cameras may disconnect or have connection issues that lead to rtspsrc not receiving any data. I am wondering what the best way is to detect and recover from this?
I was wondering if there is a way I can add some sort of callback to the src pad of the RtspSourceBin()-->Gst.Bin output that listens for a certain signal (or maybe I would need to listen on the bus with a message handler that looks for messages from the RtspSourceBin?). I see that a timeout message called "GstRTSPSrcTimeout" is generated; perhaps I should listen for this?
Ideally, when an error occurs where the stream connection is lost I'd like to disconnect the offending bin and attempt to re-establish a new connection while the pipeline is still playing.
In the below code I have a method that creates a Gst.Bin object containing some elements for RTSP streaming- this isn't set in stone, I could make changes to this if necessary.
multi_stream_pipeline_graph.pdf
Pipeline creation method
def create_pipeline(self):
    """
    Creates the multi-rtsp stream deepstream pipeline and sets it to PLAYING.

    One RTSP source bin is built per entry in ``self.cameras`` and linked to
    a request sink pad of a shared ``nvstreammux``; the muxer output feeds an
    ``appsink`` whose ``new-sample`` signal is routed to ``on_new_sample``.
    A bus signal watch is installed and connected to
    ``cb_gstloop_bus.bus_msg_handler``.

    Any failure to create or link a pipeline component terminates the
    process (non-zero exit status) instead of continuing with a half-built
    pipeline.

    @return: None
    """
    global perf_data

    if self.num_cameras == 0:
        # Exit with a non-zero status: running without cameras is an error.
        sys.exit('No cameras initialized in DSSystem!')

    # Handle Some initial settings
    self.decode_mode = DecoderEnum(self.ds_config['decode_mode'])
    if not self.ds_config['new_nvstreammux']:
        sys.exit('NEW_NVSTREAMMUX must be set to True [ 1 ]')
    print('Setting env. var. USE_NEW_NVSTREAMMUX: yes')
    os.environ["USE_NEW_NVSTREAMMUX"] = "yes"

    if self.MyEnvironmentVar.PERF_DATA_BOOL:
        perf_data = PERF_DATA(self.num_cameras)

    # PREPARE GSTREAMER AND CREATE NEW PIPELINE
    Gst.init(None)
    self.gst_pipeline = Gst.Pipeline.new('multi-stream-pipeline')

    # Create nvstreammux instance to form batches from one or more sources.
    # For precise control use property ("config-file-path", /my/path/conf.cfg),
    # e.g. nvstrmux.set_property('config-file-path', 'configs/nvstreammux_conf.txt')
    nvstrmux = Gst.ElementFactory.make("nvstreammux", "nvstrmux")
    if nvstrmux is None:
        sys.stderr.write(" Unable to create NvStreamMux \n")
        sys.exit(1)
    self.gst_pipeline.add(nvstrmux)

    # LINK RTSPinputBIN AND NVSTREAMMUX PADS
    for ix, cam in enumerate(self.cameras):
        cam: CameraSource
        rtspbin = RtspSourceBin.create_rtsp_source_bin(
            index=ix,
            camera_src=cam,
            dec_mode=self.decode_mode.name,
        )
        if rtspbin is None:
            # The factory returns None when it cannot assemble the bin.
            sys.stderr.write(f'Unable to create rtsp source bin {ix} \n')
            sys.exit(1)
        cam.gst_bin = rtspbin
        self.gst_pipeline.add(cam.gst_bin)

        sinkpad = nvstrmux.get_request_pad(f'sink_{ix}')
        if sinkpad is None:
            sys.stderr.write('Unable to create sink pad bin for nvstreammux \n')
            sys.exit(1)
        sourcepad = cam.gst_bin.get_static_pad('src')
        if sourcepad is None:
            sys.stderr.write('Unable to create src pad bin \n')
            sys.exit(1)
        if sourcepad.link(sinkpad) != Gst.PadLinkReturn.OK:
            sys.stderr.write(f'Unable to link source bin {ix} to nvstreammux \n')
            sys.exit(1)

    # TODO: If required add output queue mux --> queue --> appsink
    # (leaky=2, max-size-buffers=1) between the muxer and the sink.

    # PIPELINE TERMINATION
    appsink = Gst.ElementFactory.make('appsink', 'batch_sink')
    if appsink is None:
        sys.stderr.write(' Unable to create appsink \n')
        sys.exit(1)
    # emit-signals must be enabled for "new-sample" to fire.
    appsink.set_property("emit-signals", True)
    appsink.connect("new-sample", on_new_sample, self.buffer_queue)  # TODO: was self.buffer_queue

    # PIPELINE ADDING AND LINKING
    print("Adding elements to Pipeline \n")
    self.gst_pipeline.add(appsink)
    if not nvstrmux.link(appsink):
        sys.stderr.write(' Unable to link nvstreammux to appsink \n')
        sys.exit(1)

    # PREPARE EVENT/BUS LOOP AND MESSAGING
    self.gst_loop = GLib.MainLoop()
    self.gst_bus = self.gst_pipeline.get_bus()
    # Bus will be polled to watch for GObject.connect('message type', ..., ...)
    self.gst_bus.add_signal_watch()
    # TODO: Multiple connects could be used here and message::eos, or message::error
    # can be used to only call certain callback functions
    # (ie. specific callbacks to handle certain message types)
    self.gst_bus.connect("message", cb_gstloop_bus.bus_msg_handler, self.gst_loop)

    if self.MyEnvironmentVar.SAVE_DS_GRAPH:
        # SAVE THE PIPELINE GRAPH
        self.save_pipeline_as_pdf('multi_stream_pipeline_graph_NULL')

    # PIPELINE PLAYING AND NULL CLEANUP
    print("Starting pipeline \n")
    state_change = self.gst_pipeline.set_state(Gst.State.PLAYING)
    if state_change == Gst.StateChangeReturn.FAILURE:
        # Release whatever was acquired before giving up.
        self.gst_pipeline.set_state(Gst.State.NULL)
        sys.stderr.write(' Unable to set the pipeline to the PLAYING state \n')
        sys.exit(1)

    if self.MyEnvironmentVar.SAVE_DS_GRAPH:
        # SAVE THE PIPELINE GRAPH
        self.save_pipeline_as_pdf('multi_stream_pipeline_graph_PLAY')

    t_setup = (time.time_ns() - DSSystem.t0) * 1e-9
    print(f'Pipeline setup took {t_setup}\nStarting DSSystem Run Time Clock')
RTSP source bin creation method
class RtspSourceBin:
    """Factory for a per-camera RTSP source ``Gst.Bin``.

    The bin chains rtspsrc -> RTP depayloader -> parser -> decoder ->
    nvvideoconvert -> capsfilter and exposes one ghost ``src`` pad that
    targets the capsfilter output.
    """

    # Per-codec element factories: (RTP depayloader, parser, software decoder)
    _CODEC_ELEMENTS = {
        "H264": ("rtph264depay", "h264parse", "avdec_h264"),
        "H265": ("rtph265depay", "h265parse", "avdec_h265"),
    }

    @staticmethod
    def _make_element(factory_name, element_name=None):
        """Create a GStreamer element; write an error and exit(1) on failure."""
        element = Gst.ElementFactory.make(factory_name, element_name)
        if element is None:
            sys.stderr.write(f"ERROR: Unable to create {element_name or factory_name}\n")
            sys.exit(1)
        return element

    @classmethod
    def create_rtsp_source_bin(cls, index: int, camera_src: CameraSource,
                               dec_mode="HARDWARE", format="RGBA"):
        """Build the source bin for one RTSP camera.

        @param index: stream index, used to name the bin and its elements
        @param camera_src: camera description; ``source_uri`` and
            ``parameters.compression`` ("H264" or "H265") are read
        @param dec_mode: "HARDWARE" (nvv4l2decoder) or "SOFTWARE" (avdec_*)
        @param format: raw video format requested on the output capsfilter
        @return: the assembled ``Gst.Bin``, or ``None`` if the bin, its
            internal links or its ghost pad could not be set up. Failure to
            create an individual element, an unsupported compression or an
            unsupported ``dec_mode`` terminates the process via
            ``sys.exit(1)``.
        """
        # TODO: For Python RGBA is only supported in this case, ideally we could use BGRx,
        # however, as of this DeepStream version 6.2 it does not seem to be supported for our use case

        # SOURCE BIN
        # Create a source GstBin to abstract this bin's content from the rest of the pipeline
        bin_name = "source-bin-%02d" % index
        print(bin_name)
        rtsp_source_bin = Gst.Bin.new(bin_name)
        if rtsp_source_bin is None:
            logger.error(" Unable to create source bin \n")
            return None

        compression = camera_src.parameters.compression
        if compression not in cls._CODEC_ELEMENTS:
            sys.stderr.write(
                f"ERROR: Unsupported compression {compression!r} for stream {index}\n"
            )
            sys.exit(1)
        depay_factory, parse_factory, sw_decoder_factory = cls._CODEC_ELEMENTS[compression]

        # INTERNAL BIN ELEMENTS
        # TODO: Add GsTProp class to CameraSource class
        rtspsrc = cls._make_element("rtspsrc", f"rtspsrc_{index}")
        rtspsrc.set_property("location", camera_src.source_uri)
        # rtspsrc.set_property("protocols", "tcp")  # def: tcp+udp-mcast+udp
        rtspsrc.set_property("latency", 2000)  # Amount of ms to buffer
        rtspsrc.set_property("udp-buffer-size", 2000000)  # Size of the kernel UDP receive buffer in bytes.
        rtspsrc.set_property("drop-on-latency", 1)
        rtspsrc.set_property("timeout", 5000000)  # Retry TCP transport after UDP timeout microseconds

        # For NTP RTCP Sender reports see:
        # https://docs.nvidia.com/metropolis/deepstream/6.2/dev-guide/text/DS_NTP_Timestamp.html?highlight=ntp
        helper_lib = ctypes.CDLL("/opt/nvidia/deepstream/deepstream/lib/libnvdsgst_helper.so")
        configure_ntp = helper_lib.configure_source_for_ntp_sync
        # Without argtypes ctypes passes Python ints as a C int, which would
        # truncate a 64-bit value. NOTE(review): this assumes hash(rtspsrc)
        # yields the element's native pointer, as the original call did.
        configure_ntp.argtypes = [ctypes.c_void_p]
        configure_ntp(hash(rtspsrc))

        # Extracts H*** video from RTP packets (RFC 3984)
        rtpdepay = cls._make_element(depay_factory, f"rtpdepay_{index}")
        debug(f"Creating {compression} rtpdepay")

        # Parses H*** streams
        h26parse = cls._make_element(parse_factory, f"h26parse_{index}")
        debug(f"Creating {compression} parser")

        if dec_mode == "SOFTWARE":
            print('Using software decoding')
            decoder = cls._make_element(sw_decoder_factory)
            debug(f"Creating {compression} dec")
        elif dec_mode == "HARDWARE":
            print('Using hardware decoding')
            # https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_plugin_gst-nvvideo4linux2.html
            decoder = cls._make_element("nvv4l2decoder", f"nvv4l2decoder_{index}")
            decoder.set_property("cudadec-memtype", int(pyds.NVBUF_MEM_CUDA_DEVICE))
        else:
            sys.stderr.write(f"ERROR: Unsupported decode mode {dec_mode!r}\n")
            sys.exit(1)

        # https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_plugin_gst-nvvideoconvert.html
        nvvideoconvert = cls._make_element("nvvideoconvert", f"nvvidconv_rtspsrc{index}")
        nvvideoconvert.set_property("nvbuf-memory-type", int(pyds.NVBUF_MEM_CUDA_UNIFIED))

        # By converting images to RGBA we can download them from GPU whenever we want
        caps_filter = cls._make_element("capsfilter", f"capsfilt_rtspsrc{index}")
        caps_filter.set_property(
            "caps", Gst.Caps.from_string(f"video/x-raw(memory:NVMM), format={format}")
        )

        # RTSPSRC GHOST PAD
        # rtspsrc only creates its real src pad once the stream is negotiated,
        # so a target-less ghost pad is added now to have something to link.
        # The "pad-added" callback retargets it to the real pad later. The
        # signal is connected after add_pad so the ghost pad itself does not
        # trigger the callback.
        rtspsrc.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
        rtspsrc.connect("pad-added", cls.rtspsrc_callback_newpad, rtpdepay)

        # ADD ELEMENTS TO THE BIN (before linking, as in the original order)
        chain = (rtspsrc, rtpdepay, h26parse, decoder, nvvideoconvert, caps_filter)
        for element in chain:
            rtsp_source_bin.add(element)

        # LINK ELEMENTS
        for upstream, downstream in zip(chain, chain[1:]):
            if not upstream.link(downstream):
                logger.error(
                    f" Failed to link {upstream.get_name()} to {downstream.get_name()} \n"
                )
                return None

        # BIN GHOST PAD
        # We need to create a ghost pad for the rtsp source bin which will act as a src pad
        rtsp_source_bin.add_pad(
            Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC)
        )
        ghost_pad = rtsp_source_bin.get_static_pad("src")
        if not ghost_pad:
            logger.error(" Failed to add ghost pad in source bin \n")
            return None
        if not ghost_pad.set_target(caps_filter.get_static_pad("src")):
            logger.error(" Failed to set ghost pad target in source bin \n")
            return None
        return rtsp_source_bin

    @classmethod
    def rtspsrc_callback_newpad(cls, rtsp_src, rtsp_src_new_src_pad, data):
        """Handle rtspsrc "pad-added": retarget the ghost pad to a video pad.

        @param rtsp_src: the rtspsrc element that emitted the signal
        @param rtsp_src_new_src_pad: the newly added pad
        @param data: user data passed at connect time (unused here)
        """
        logger.info(f"SourceBin: added pad {rtsp_src_new_src_pad.name} to {rtsp_src.name}")

        # Current caps may not be set yet when the pad is announced; fall
        # back to querying what the pad can produce.
        caps = rtsp_src_new_src_pad.get_current_caps()
        if caps is None:
            caps = rtsp_src_new_src_pad.query_caps(None)
        if caps is None or caps.get_size() == 0:
            logger.error(f"SourceBin: no caps available on pad {rtsp_src_new_src_pad.name}")
            return
        gststruct = caps.get_structure(0)

        # Need to check if the pad created by rtspsrc is for video and not audio.
        # TODO: are we checking if this is a video in the correct way?
        if gststruct.get_string("media") != "video":
            return

        # Get the ghost pad that was added to rtspsrc at creation time
        rtsp_src_ghost_pad = rtsp_src.get_static_pad("src")
        if rtsp_src_ghost_pad is None:
            logger.error(f"SourceBin: ghost pad 'src' not found on {rtsp_src.name}")
            return
        logger.info(f"padcaps {rtsp_src_ghost_pad.get_current_caps()}")
        if not rtsp_src_ghost_pad.set_target(rtsp_src_new_src_pad):
            logger.error(
                "Failed to link decoder src pad to source bin ghost pad\n"
            )