Memory Leak when Images are Copied to memoryview
I have encountered an issue where Gstreamer leaks memory depending on how external code uses an image buffer retrieved from the appsink. In the following code, RGB images are read from an h264 RTSP stream and returned as numpy arrays with shape (height, width, 3). As long as max-buffers
is not set, and the numpy array is written to a memoryview
via numpy's image.tobytes()
method, memory usage rapidly increases until all system memory is consumed. No memory leak occurs if "max-buffers" is set to 1, OR the data is copied to the memoryview
via bytes(image.data)
instead of image.tobytes()
. Any ideas as to why this occurs?
For some additional context, this is running in a Ubuntu 20.04 docker container on an Ubuntu 20.04 system.
import ipaddress
import numpy as np
from typing import Tuple
import gi
gi.require_version("Gst", "1.0")
gi.require_version("GstApp", "1.0")
from gi.repository import Gst, GstApp
class ImageGrabber:
"""Class that grabs images from an h264 RTSP video stream and returns them as numpy
arrays.
"""
def __init__(self, camera_ip: ipaddress.IPv4Address):
Gst.init()
self._ip_addr = camera_ip
self._pipeline, self._app_sink = self._init_pipeline()
def _init_pipeline(self) -> Tuple[Gst.Pipeline, GstApp.AppSink]:
pipeline = Gst.parse_launch(self._get_pipeline_string())
app_sink = pipeline.get_by_name("appsink0")
# A memory will occur if the appsink does not have the max-buffers set from the
# default of 0 (unlimited), if the external code grabs an image and then assigns
# a copy of the image data to a memoryview via: mem_view[:] = image.tobytes()
# app_sink.set_property("max-buffers", 1)
return pipeline, app_sink
def _get_pipeline_string(self) -> str:
"""Create the GStreamer configuration string needed to initialize the pipeline.
The values used for rtsp params "buffer-mode" and "latency" minimize latency.
"location" specifies the RTSP URI to connect to. We specify that h264 decoding
should be used, and the images should be converted to RGB format.
"""
rtspsrc_params = {
"buffer-mode": "auto",
"latency": "0",
"location": (
f"rtsp://root:password@{self._ip_addr}/axis-media/media.amp?"
"streamprofile=my_profile&camera=3"
),
}
rtspsrc_param_strings = [
f"{key}={value}" for key, value in rtspsrc_params.items()
]
rtspsrc_param_string = " ".join(rtspsrc_param_strings)
return (
f"rtspsrc {rtspsrc_param_string} ! rtph264depay ! avdec_h264 ! "
"videoconvert ! video/x-raw,format=RGB ! appsink name=appsink0"
)
def open(self) -> None:
"""Create the GStreamer pipeline, set it to the playing state, and verify that
it entered the PLAYING state successfully.
"""
self._pipeline.set_state(Gst.State.PLAYING)
# The state has been observed to take ~0.57s time to change (asynchronously
# in a background GStreamer thread).
timeout_ns = 2*10**9
state = self._pipeline.get_state(timeout_ns).state
if state != Gst.State.PLAYING:
raise RuntimeError("Failed to set pipeline to PLAYING state")
def close(self) -> None:
# Setting to the PAUSED state allows the pipeline to be resumed if the
# camera is opened again.
self._pipeline.set_state(Gst.State.PAUSED)
def grab_image(self) -> np.ndarray:
"""Grab a image and return it as a numpy array.
The buffer data is copied into a new buffer since the original buffer is owned
by the GStreamer pipeline, and we wouldn't want the numpy array's underlying
buffer to be modified or freed during its lifetime.
"""
timeout_ns = 10**9
sample = self._app_sink.try_pull_sample(timeout_ns)
if sample is None:
raise RuntimeError("Failed to pull a sample after one second!")
buf = sample.get_buffer()
# gstreamer docs say g_free should be used with extract_dup, but g_free is not
# available in python, and there are no significant memory leaks if the returns
# array is not copied to a memory view via: mem_view[:] = image.tobytes()
img_data = buf.extract_dup(offset=0, size=buf.get_size())
width = 2560
height = 1920
return np.frombuffer(img_data, dtype=np.uint8).reshape((height, width, 3))
def main():
grabber = ImageGrabber(ipaddress.IPv4Address("192.168.0.1"))
grabber.open()
try:
while True:
image = grabber.grab_image()
dummy_bytes = bytearray(image.size)
mem_view = memoryview(dummy_bytes)
# Causes a memory to leak rapidly as the camera streams:
mem_view[:] = image.tobytes()
# No significant memory leak if data is copied this way:
# mem_view[:] = bytes(image.data)
finally:
grabber.close()
if __name__ == "__main__":
main()