Commit 4ef676f1 authored by Sebastian Dröge's avatar Sebastian Dröge 🍵

decklink: Allow for a maximum of 5 packets to be queued up

In case downstream is a bit slow with consuming packets at times.
parent aac0027e
...@@ -28,6 +28,8 @@ ...@@ -28,6 +28,8 @@
GST_DEBUG_CATEGORY_STATIC (gst_decklink_audio_src_debug); GST_DEBUG_CATEGORY_STATIC (gst_decklink_audio_src_debug);
#define GST_CAT_DEFAULT gst_decklink_audio_src_debug #define GST_CAT_DEFAULT gst_decklink_audio_src_debug
#define MAX_QUEUE_LENGTH 5
enum enum
{ {
PROP_0, PROP_0,
...@@ -43,6 +45,37 @@ static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("src", ...@@ -43,6 +45,37 @@ static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("src",
"layout=interleaved") "layout=interleaved")
); );
typedef struct
{
IDeckLinkAudioInputPacket *packet;
GstClockTime capture_time;
} CapturePacket;
static void
capture_packet_free (void *data)
{
CapturePacket *packet = (CapturePacket *) data;
packet->packet->Release ();
g_free (packet);
}
typedef struct
{
IDeckLinkAudioInputPacket *packet;
IDeckLinkInput *input;
} AudioPacket;
static void
audio_packet_free (void *data)
{
AudioPacket *packet = (AudioPacket *) data;
packet->packet->Release ();
packet->input->Release ();
g_free (packet);
}
static void gst_decklink_audio_src_set_property (GObject * object, static void gst_decklink_audio_src_set_property (GObject * object,
guint property_id, const GValue * value, GParamSpec * pspec); guint property_id, const GValue * value, GParamSpec * pspec);
static void gst_decklink_audio_src_get_property (GObject * object, static void gst_decklink_audio_src_get_property (GObject * object,
...@@ -133,6 +166,8 @@ gst_decklink_audio_src_init (GstDecklinkAudioSrc * self) ...@@ -133,6 +166,8 @@ gst_decklink_audio_src_init (GstDecklinkAudioSrc * self)
g_mutex_init (&self->lock); g_mutex_init (&self->lock);
g_cond_init (&self->cond); g_cond_init (&self->cond);
g_queue_init (&self->current_packets);
} }
void void
...@@ -260,61 +295,52 @@ gst_decklink_audio_src_got_packet (GstElement * element, ...@@ -260,61 +295,52 @@ gst_decklink_audio_src_got_packet (GstElement * element,
g_mutex_lock (&self->lock); g_mutex_lock (&self->lock);
if (!self->flushing) { if (!self->flushing) {
if (self->current_packet) CapturePacket *p;
self->current_packet->Release ();
self->current_packet = packet; while (g_queue_get_length (&self->current_packets) >= MAX_QUEUE_LENGTH) {
p = (CapturePacket *) g_queue_pop_head (&self->current_packets);
GST_WARNING_OBJECT (self, "Dropping old packet at %" GST_TIME_FORMAT,
GST_TIME_ARGS (p->capture_time));
capture_packet_free (p);
}
p = (CapturePacket *) g_malloc0 (sizeof (CapturePacket));
p->packet = packet;
p->capture_time = capture_time;
packet->AddRef (); packet->AddRef ();
self->current_packet_capture_time = capture_time; g_queue_push_tail (&self->current_packets, p);
g_cond_signal (&self->cond); g_cond_signal (&self->cond);
} }
g_mutex_unlock (&self->lock); g_mutex_unlock (&self->lock);
} }
typedef struct
{
IDeckLinkAudioInputPacket *packet;
IDeckLinkInput *input;
} AudioPacket;
static void
audio_packet_free (void *data)
{
AudioPacket *audio_packet = (AudioPacket *) data;
audio_packet->packet->Release ();
audio_packet->input->Release ();
g_free (audio_packet);
}
static GstFlowReturn static GstFlowReturn
gst_decklink_audio_src_create (GstPushSrc * bsrc, GstBuffer ** buffer) gst_decklink_audio_src_create (GstPushSrc * bsrc, GstBuffer ** buffer)
{ {
GstDecklinkAudioSrc *self = GST_DECKLINK_AUDIO_SRC_CAST (bsrc); GstDecklinkAudioSrc *self = GST_DECKLINK_AUDIO_SRC_CAST (bsrc);
GstFlowReturn flow_ret = GST_FLOW_OK; GstFlowReturn flow_ret = GST_FLOW_OK;
IDeckLinkAudioInputPacket *packet = NULL;
GstClockTime capture_time = GST_CLOCK_TIME_NONE;
const guint8 *data; const guint8 *data;
glong sample_count; glong sample_count;
gsize data_size; gsize data_size;
CapturePacket *p;
AudioPacket *ap; AudioPacket *ap;
g_mutex_lock (&self->lock); g_mutex_lock (&self->lock);
while (!self->current_packet && !self->flushing) { while (g_queue_is_empty (&self->current_packets) && !self->flushing) {
g_cond_wait (&self->cond, &self->lock); g_cond_wait (&self->cond, &self->lock);
} }
packet = self->current_packet;
capture_time = self->current_packet_capture_time; p = (CapturePacket *) g_queue_pop_head (&self->current_packets);
self->current_packet = NULL;
g_mutex_unlock (&self->lock); g_mutex_unlock (&self->lock);
if (self->flushing) { if (self->flushing) {
if (packet) if (p)
packet->Release (); capture_packet_free (p);
return GST_FLOW_FLUSHING; return GST_FLOW_FLUSHING;
} }
packet->GetBytes ((gpointer *) & data); p->packet->GetBytes ((gpointer *) & data);
sample_count = packet->GetSampleFrameCount (); sample_count = p->packet->GetSampleFrameCount ();
data_size = self->info.bpf * sample_count; data_size = self->info.bpf * sample_count;
ap = (AudioPacket *) g_malloc0 (sizeof (AudioPacket)); ap = (AudioPacket *) g_malloc0 (sizeof (AudioPacket));
...@@ -324,15 +350,18 @@ gst_decklink_audio_src_create (GstPushSrc * bsrc, GstBuffer ** buffer) ...@@ -324,15 +350,18 @@ gst_decklink_audio_src_create (GstPushSrc * bsrc, GstBuffer ** buffer)
(gpointer) data, data_size, 0, data_size, ap, (gpointer) data, data_size, 0, data_size, ap,
(GDestroyNotify) audio_packet_free); (GDestroyNotify) audio_packet_free);
ap->packet = packet; ap->packet = p->packet;
p->packet->AddRef ();
ap->input = self->input->input; ap->input = self->input->input;
ap->input->AddRef (); ap->input->AddRef ();
// TODO: Jitter/discont handling // TODO: Jitter/discont handling
GST_BUFFER_TIMESTAMP (*buffer) = capture_time; GST_BUFFER_TIMESTAMP (*buffer) = p->capture_time;
GST_BUFFER_DURATION (*buffer) = GST_BUFFER_DURATION (*buffer) =
gst_util_uint64_scale_int (sample_count, GST_SECOND, self->info.rate); gst_util_uint64_scale_int (sample_count, GST_SECOND, self->info.rate);
capture_packet_free (p);
return flow_ret; return flow_ret;
} }
...@@ -352,7 +381,7 @@ gst_decklink_audio_src_query (GstBaseSrc * bsrc, GstQuery * query) ...@@ -352,7 +381,7 @@ gst_decklink_audio_src_query (GstBaseSrc * bsrc, GstQuery * query)
min = min =
gst_util_uint64_scale_ceil (GST_MSECOND, self->input->mode->fps_d, gst_util_uint64_scale_ceil (GST_MSECOND, self->input->mode->fps_d,
self->input->mode->fps_n); self->input->mode->fps_n);
max = min; max = MAX_QUEUE_LENGTH * min;
gst_query_set_latency (query, TRUE, min, max); gst_query_set_latency (query, TRUE, min, max);
ret = TRUE; ret = TRUE;
...@@ -394,6 +423,8 @@ gst_decklink_audio_src_unlock_stop (GstBaseSrc * bsrc) ...@@ -394,6 +423,8 @@ gst_decklink_audio_src_unlock_stop (GstBaseSrc * bsrc)
g_mutex_lock (&self->lock); g_mutex_lock (&self->lock);
self->flushing = FALSE; self->flushing = FALSE;
g_queue_foreach (&self->current_packets, (GFunc) capture_packet_free, NULL);
g_queue_clear (&self->current_packets);
g_mutex_unlock (&self->lock); g_mutex_unlock (&self->lock);
return TRUE; return TRUE;
...@@ -535,16 +566,9 @@ gst_decklink_audio_src_change_state (GstElement * element, ...@@ -535,16 +566,9 @@ gst_decklink_audio_src_change_state (GstElement * element,
self->input->clock)); self->input->clock));
gst_clock_set_master (self->input->clock, NULL); gst_clock_set_master (self->input->clock, NULL);
if (self->current_packet) { g_queue_foreach (&self->current_packets, (GFunc) capture_packet_free,
self->current_packet->Release (); NULL);
self->current_packet = NULL; g_queue_clear (&self->current_packets);
}
break;
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
if (self->current_packet) {
self->current_packet->Release ();
self->current_packet = NULL;
}
break; break;
case GST_STATE_CHANGE_READY_TO_NULL: case GST_STATE_CHANGE_READY_TO_NULL:
gst_decklink_audio_src_close (self); gst_decklink_audio_src_close (self);
......
...@@ -59,8 +59,7 @@ struct _GstDecklinkAudioSrc ...@@ -59,8 +59,7 @@ struct _GstDecklinkAudioSrc
GCond cond; GCond cond;
GMutex lock; GMutex lock;
gboolean flushing; gboolean flushing;
IDeckLinkAudioInputPacket *current_packet; GQueue current_packets;
GstClockTime current_packet_capture_time;
}; };
struct _GstDecklinkAudioSrcClass struct _GstDecklinkAudioSrcClass
......
...@@ -28,6 +28,8 @@ ...@@ -28,6 +28,8 @@
GST_DEBUG_CATEGORY_STATIC (gst_decklink_video_src_debug); GST_DEBUG_CATEGORY_STATIC (gst_decklink_video_src_debug);
#define GST_CAT_DEFAULT gst_decklink_video_src_debug #define GST_CAT_DEFAULT gst_decklink_video_src_debug
#define MAX_QUEUE_LENGTH 5
enum enum
{ {
PROP_0, PROP_0,
...@@ -35,6 +37,37 @@ enum ...@@ -35,6 +37,37 @@ enum
PROP_DEVICE_NUMBER PROP_DEVICE_NUMBER
}; };
typedef struct
{
IDeckLinkVideoInputFrame *frame;
GstClockTime capture_time;
} CaptureFrame;
static void
capture_frame_free (void *data)
{
CaptureFrame *frame = (CaptureFrame *) data;
frame->frame->Release ();
g_free (frame);
}
typedef struct
{
IDeckLinkVideoInputFrame *frame;
IDeckLinkInput *input;
} VideoFrame;
static void
video_frame_free (void *data)
{
VideoFrame *frame = (VideoFrame *) data;
frame->frame->Release ();
frame->input->Release ();
g_free (frame);
}
static void gst_decklink_video_src_set_property (GObject * object, static void gst_decklink_video_src_set_property (GObject * object,
guint property_id, const GValue * value, GParamSpec * pspec); guint property_id, const GValue * value, GParamSpec * pspec);
static void gst_decklink_video_src_get_property (GObject * object, static void gst_decklink_video_src_get_property (GObject * object,
...@@ -125,6 +158,8 @@ gst_decklink_video_src_init (GstDecklinkVideoSrc * self) ...@@ -125,6 +158,8 @@ gst_decklink_video_src_init (GstDecklinkVideoSrc * self)
g_mutex_init (&self->lock); g_mutex_init (&self->lock);
g_cond_init (&self->cond); g_cond_init (&self->cond);
g_queue_init (&self->current_frames);
} }
void void
...@@ -205,60 +240,50 @@ gst_decklink_video_src_got_frame (GstElement * element, ...@@ -205,60 +240,50 @@ gst_decklink_video_src_got_frame (GstElement * element,
g_mutex_lock (&self->lock); g_mutex_lock (&self->lock);
if (!self->flushing) { if (!self->flushing) {
if (self->current_frame) CaptureFrame *f;
self->current_frame->Release ();
self->current_frame = frame; while (g_queue_get_length (&self->current_frames) >= MAX_QUEUE_LENGTH) {
f = (CaptureFrame *) g_queue_pop_head (&self->current_frames);
GST_WARNING_OBJECT (self, "Dropping old frame at %" GST_TIME_FORMAT,
GST_TIME_ARGS (f->capture_time));
capture_frame_free (f);
}
f = (CaptureFrame *) g_malloc0 (sizeof (CaptureFrame));
f->frame = frame;
f->capture_time = capture_time;
frame->AddRef (); frame->AddRef ();
self->current_frame_capture_time = capture_time; g_queue_push_tail (&self->current_frames, f);
g_cond_signal (&self->cond); g_cond_signal (&self->cond);
} }
g_mutex_unlock (&self->lock); g_mutex_unlock (&self->lock);
} }
typedef struct
{
IDeckLinkVideoInputFrame *frame;
IDeckLinkInput *input;
} VideoFrame;
static void
video_frame_free (void *data)
{
VideoFrame *video_frame = (VideoFrame *) data;
video_frame->frame->Release ();
video_frame->input->Release ();
g_free (video_frame);
}
static GstFlowReturn static GstFlowReturn
gst_decklink_video_src_create (GstPushSrc * bsrc, GstBuffer ** buffer) gst_decklink_video_src_create (GstPushSrc * bsrc, GstBuffer ** buffer)
{ {
GstDecklinkVideoSrc *self = GST_DECKLINK_VIDEO_SRC_CAST (bsrc); GstDecklinkVideoSrc *self = GST_DECKLINK_VIDEO_SRC_CAST (bsrc);
GstFlowReturn flow_ret = GST_FLOW_OK; GstFlowReturn flow_ret = GST_FLOW_OK;
IDeckLinkVideoInputFrame *frame = NULL;
GstClockTime capture_time = GST_CLOCK_TIME_NONE;
const guint8 *data; const guint8 *data;
gsize data_size; gsize data_size;
VideoFrame *vf; VideoFrame *vf;
CaptureFrame *f;
g_mutex_lock (&self->lock); g_mutex_lock (&self->lock);
while (!self->current_frame && !self->flushing) { while (g_queue_is_empty (&self->current_frames) && !self->flushing) {
g_cond_wait (&self->cond, &self->lock); g_cond_wait (&self->cond, &self->lock);
} }
frame = self->current_frame;
capture_time = self->current_frame_capture_time; f = (CaptureFrame *) g_queue_pop_head (&self->current_frames);
self->current_frame = NULL;
g_mutex_unlock (&self->lock); g_mutex_unlock (&self->lock);
if (self->flushing) { if (self->flushing) {
if (frame) if (f)
frame->Release (); capture_frame_free (f);
return GST_FLOW_FLUSHING; return GST_FLOW_FLUSHING;
} }
frame->GetBytes ((gpointer *) & data); f->frame->GetBytes ((gpointer *) & data);
data_size = self->info.size; data_size = self->info.size;
...@@ -269,14 +294,17 @@ gst_decklink_video_src_create (GstPushSrc * bsrc, GstBuffer ** buffer) ...@@ -269,14 +294,17 @@ gst_decklink_video_src_create (GstPushSrc * bsrc, GstBuffer ** buffer)
(gpointer) data, data_size, 0, data_size, vf, (gpointer) data, data_size, 0, data_size, vf,
(GDestroyNotify) video_frame_free); (GDestroyNotify) video_frame_free);
vf->frame = frame; vf->frame = f->frame;
f->frame->AddRef ();
vf->input = self->input->input; vf->input = self->input->input;
vf->input->AddRef (); vf->input->AddRef ();
GST_BUFFER_TIMESTAMP (*buffer) = capture_time; GST_BUFFER_TIMESTAMP (*buffer) = f->capture_time;
GST_BUFFER_DURATION (*buffer) = gst_util_uint64_scale_int (GST_SECOND, GST_BUFFER_DURATION (*buffer) = gst_util_uint64_scale_int (GST_SECOND,
self->info.fps_d, self->info.fps_n); self->info.fps_d, self->info.fps_n);
capture_frame_free (f);
return flow_ret; return flow_ret;
} }
...@@ -296,7 +324,7 @@ gst_decklink_video_src_query (GstBaseSrc * bsrc, GstQuery * query) ...@@ -296,7 +324,7 @@ gst_decklink_video_src_query (GstBaseSrc * bsrc, GstQuery * query)
min = min =
gst_util_uint64_scale_ceil (GST_MSECOND, mode->fps_d, mode->fps_n); gst_util_uint64_scale_ceil (GST_MSECOND, mode->fps_d, mode->fps_n);
max = min; max = MAX_QUEUE_LENGTH * min;
gst_query_set_latency (query, TRUE, min, max); gst_query_set_latency (query, TRUE, min, max);
ret = TRUE; ret = TRUE;
...@@ -334,6 +362,8 @@ gst_decklink_video_src_unlock_stop (GstBaseSrc * bsrc) ...@@ -334,6 +362,8 @@ gst_decklink_video_src_unlock_stop (GstBaseSrc * bsrc)
g_mutex_lock (&self->lock); g_mutex_lock (&self->lock);
self->flushing = FALSE; self->flushing = FALSE;
g_queue_foreach (&self->current_frames, (GFunc) capture_frame_free, NULL);
g_queue_clear (&self->current_frames);
g_mutex_unlock (&self->lock); g_mutex_unlock (&self->lock);
return TRUE; return TRUE;
...@@ -448,6 +478,9 @@ gst_decklink_video_src_change_state (GstElement * element, ...@@ -448,6 +478,9 @@ gst_decklink_video_src_change_state (GstElement * element,
gst_message_new_clock_lost (GST_OBJECT_CAST (element), gst_message_new_clock_lost (GST_OBJECT_CAST (element),
self->input->clock)); self->input->clock));
gst_clock_set_master (self->input->clock, NULL); gst_clock_set_master (self->input->clock, NULL);
g_queue_foreach (&self->current_frames, (GFunc) capture_frame_free, NULL);
g_queue_clear (&self->current_frames);
break; break;
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:{ case GST_STATE_CHANGE_PLAYING_TO_PAUSED:{
HRESULT res; HRESULT res;
...@@ -459,11 +492,6 @@ gst_decklink_video_src_change_state (GstElement * element, ...@@ -459,11 +492,6 @@ gst_decklink_video_src_change_state (GstElement * element,
(NULL), ("Failed to stop streams: 0x%08x", res)); (NULL), ("Failed to stop streams: 0x%08x", res));
ret = GST_STATE_CHANGE_FAILURE; ret = GST_STATE_CHANGE_FAILURE;
} }
if (self->current_frame) {
self->current_frame->Release ();
self->current_frame = NULL;
}
break; break;
} }
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:{ case GST_STATE_CHANGE_PAUSED_TO_PLAYING:{
......
...@@ -59,8 +59,7 @@ struct _GstDecklinkVideoSrc ...@@ -59,8 +59,7 @@ struct _GstDecklinkVideoSrc
GCond cond; GCond cond;
GMutex lock; GMutex lock;
gboolean flushing; gboolean flushing;
IDeckLinkVideoInputFrame *current_frame; GQueue current_frames;
GstClockTime current_frame_capture_time;
}; };
struct _GstDecklinkVideoSrcClass struct _GstDecklinkVideoSrcClass
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment