Livekitwebrtcsrc with dynamic user add and reconnects
I’m writing python application with should connect to LiveKit room, receive all members audio put in one audio mix and record to file. I’m using Element signal: pad-added, code below. When user reconnects I’m getting this error:
basesrc gstbasesrc.c:3132:gst_base_src_loop:<nicesrc0> error: Internal data stream error. basesrc gstbasesrc.c:3132:gst_base_src_loop:<nicesrc0> error: streaming stopped, reason error (-5) queue gstqueue.c:992:gst_queue_handle_sink_event:<queue1> error: Internal data stream error. queue gstqueue.c:992:gst_queue_handle_sink_event:<queue1> error: streaming stopped, reason error (-5)
Can someone help with that?
`DEFAULT_PIPELINE = "
audiomixer name=amix
mpegtsmux name=mux
livekitwebrtcsrc async-handling=false name=srclive video-codecs= signaller::ws-url=ws://127.0.0.1:7880 signaller::api-key=APIMBu7ZiHwC5t9 signaller::secret-key=KIRGA9hjzgfgaTInW0upUiTVRkJCJKhfhwL3g0cAGo4
signaller::room-name=my-first-room
signaller::identity=gst-consumer signaller::participant-name=gst-consumer
audiotestsrc is-live=true wave=4 freq=200 ! audioconvert ! audioresample ! amix.
amix. ! queue ! avenc_aac ! identity dump=false ! fakesink dump=false"
...
def pad_added(element, pad, **data): global pipeline global element_dict
if Gst.Pad.is_linked(pad):
return Gst.PadLinkReturn.OK
logger.info("#############################################"+ pad.name)
dump = False
if "video" in pad.name:
logger.info("PAD_ADD " + pipeline.name + "-----" + str(element.name) + "-----" + str(pad.name) + "-----" + str(pad.direction))
element_dict[pad.name] = {}
element_dict[pad.name]["queue"] = Gst.ElementFactory.make("queue", "queue_" + str(pad.name))
Gst.Object.set_property(element_dict[pad.name]["queue"], "leaky", 0)
Gst.Object.set_property(element_dict[pad.name]["queue"], "flush-on-eos", True)
element_dict[pad.name]["identity"] = Gst.ElementFactory.make("identity", "identity_" + str(pad.name))
Gst.Object.set_property(element_dict[pad.name]["identity"], "dump", dump)
element_dict[pad.name]["fakesink"] = Gst.ElementFactory.make("fakesink", "fakesink_" + str(pad.name))
Gst.Object.set_property(element_dict[pad.name]["fakesink"], "can-activate-pull", True)
pipeline.add(element_dict[pad.name]["queue"])
pipeline.add(element_dict[pad.name]["identity"])
pipeline.add(element_dict[pad.name]["fakesink"])
Gst.Object.set_property(element_dict[pad.name]["fakesink"], "sync", False)
Gst.Object.set_property(element_dict[pad.name]["fakesink"], "can-activate-pull", True)
sink_pad = element_dict[pad.name]["fakesink"].get_static_pad("sink")
element_dict[pad.name]["queue"].link(element_dict[pad.name]["identity"])
element_dict[pad.name]["identity"].link(element_dict[pad.name]["fakesink"])
sink_pad = element_dict[pad.name]["queue"].get_static_pad("sink")
pad.link(sink_pad)
element_dict[pad.name]["queue"].sync_state_with_parent()
element_dict[pad.name]["identity"].sync_state_with_parent()
element_dict[pad.name]["fakesink"].sync_state_with_parent()
return Gst.PadLinkReturn.OK
logger.info("PAD_ADD " + pipeline.name + "-----" + str(element.name) + "-----" + str(pad.name) + "-----" + str(pad.direction))
element_dict[pad.name] = {}
element_dict[pad.name]["queue"] = Gst.ElementFactory.make("queue", "queue_" + str(pad.name))
Gst.Object.set_property(element_dict[pad.name]["queue"], "leaky", 0)
Gst.Object.set_property(element_dict[pad.name]["queue"], "flush-on-eos", True)
element_dict[pad.name]["identity"] = Gst.ElementFactory.make("identity", "identity_" + str(pad.name))
Gst.Object.set_property(element_dict[pad.name]["identity"], "dump", dump)
element_dict[pad.name]["rtpopusdepay"] = Gst.ElementFactory.make("rtpopusdepay", "rtpopusdepay_" + str(pad.name))
element_dict[pad.name]["opusdec"] = Gst.ElementFactory.make("opusdec", "opusdec_" + str(pad.name))
element_dict[pad.name]["audioconvert"] = Gst.ElementFactory.make("audioconvert", "audioconvert_" + str(pad.name))
element_dict[pad.name]["audioresample"] = Gst.ElementFactory.make("audioresample", "audioresample_" + str(pad.name))
amix = pipeline.get_by_name("amix")
pipeline.add(element_dict[pad.name]["queue"])
pipeline.add(element_dict[pad.name]["identity"])
pipeline.add(element_dict[pad.name]["rtpopusdepay"])
pipeline.add(element_dict[pad.name]["opusdec"])
pipeline.add(element_dict[pad.name]["audioconvert"])
pipeline.add(element_dict[pad.name]["audioresample"])
sink_pad = element_dict[pad.name]["queue"].get_static_pad("sink")
if not Gst.Pad.is_linked(sink_pad):
pad.link(sink_pad)
if Gst.Pad.is_linked(sink_pad):
logger.info("Pads linked successfully")
element_dict[pad.name]["queue"].link(element_dict[pad.name]["identity"])
element_dict[pad.name]["identity"].link(element_dict[pad.name]["rtpopusdepay"])
element_dict[pad.name]["rtpopusdepay"].link(element_dict[pad.name]["opusdec"])
element_dict[pad.name]["opusdec"].link(element_dict[pad.name]["audioconvert"])
element_dict[pad.name]["audioconvert"].link(element_dict[pad.name]["audioresample"])
element_dict[pad.name]["audioresample"].link(amix)
element_dict[pad.name]["queue"].sync_state_with_parent()
element_dict[pad.name]["identity"].sync_state_with_parent()
element_dict[pad.name]["rtpopusdepay"].sync_state_with_parent()
element_dict[pad.name]["opusdec"].sync_state_with_parent()
element_dict[pad.name]["audioconvert"].sync_state_with_parent()
element_dict[pad.name]["audioresample"].sync_state_with_parent()
logger.info("PAD_ADDED " + pipeline.name + "-----" + str(element.name) + "-----" + str(pad.name) + "-----" + str(pad.direction))
return Gst.PadLinkReturn.OK
... kivekitbin = pipeline.get_by_name("srclive") if kivekitbin: kivekitbin.connect('pad-added', pad_added)`