Commit 407f45dc authored by Wim Taymans's avatar Wim Taymans

event: add STICKY_MULTY events

Add a new event flag for sticky events so that multiple events of that type can
be stored on a pad at the same time. Change the _get_sticky_event() function to
loop over the multiple events of a type.
Change the foreach function to make it possible to removed and modify the sticky
events on a pad.
Use an variable size array now to store the events. This could later be
optimized some more.
parent b7e4eeb0
......@@ -29,11 +29,13 @@ typedef struct _GstEvent GstEvent;
/**
* GstEventTypeFlags:
* @GST_EVENT_TYPE_UPSTREAM: Set if the event can travel upstream.
* @GST_EVENT_TYPE_DOWNSTREAM: Set if the event can travel downstream.
* @GST_EVENT_TYPE_SERIALIZED: Set if the event should be serialized with data
* flow.
* @GST_EVENT_TYPE_STICKY: Set if the event is sticky on the pads.
* @GST_EVENT_TYPE_UPSTREAM: Set if the event can travel upstream.
* @GST_EVENT_TYPE_DOWNSTREAM: Set if the event can travel downstream.
* @GST_EVENT_TYPE_SERIALIZED: Set if the event should be serialized with data
* flow.
* @GST_EVENT_TYPE_STICKY: Set if the event is sticky on the pads.
* @GST_EVENT_TYPE_STICKY_MULTI: Multiple sticky events can be on a pad, each
* identified by the event name.
*
* #GstEventTypeFlags indicate the aspects of the different #GstEventType
* values. You can get the type flags of a #GstEventType with the
......@@ -43,7 +45,8 @@ typedef enum {
GST_EVENT_TYPE_UPSTREAM = 1 << 0,
GST_EVENT_TYPE_DOWNSTREAM = 1 << 1,
GST_EVENT_TYPE_SERIALIZED = 1 << 2,
GST_EVENT_TYPE_STICKY = 1 << 3
GST_EVENT_TYPE_STICKY = 1 << 3,
GST_EVENT_TYPE_STICKY_MULTI = 1 << 4
} GstEventTypeFlags;
/**
......@@ -135,9 +138,9 @@ typedef enum {
/* downstream serialized events */
GST_EVENT_CAPS = GST_EVENT_MAKE_TYPE (5, 1, FLAG(DOWNSTREAM) | FLAG(SERIALIZED) | FLAG(STICKY)),
GST_EVENT_SEGMENT = GST_EVENT_MAKE_TYPE (6, 2, FLAG(DOWNSTREAM) | FLAG(SERIALIZED) | FLAG(STICKY)),
GST_EVENT_TAG = GST_EVENT_MAKE_TYPE (7, 3, FLAG(DOWNSTREAM) | FLAG(SERIALIZED) | FLAG(STICKY)),
GST_EVENT_TAG = GST_EVENT_MAKE_TYPE (7, 3, FLAG(DOWNSTREAM) | FLAG(SERIALIZED) | FLAG(STICKY) | FLAG(STICKY_MULTI)),
GST_EVENT_BUFFERSIZE = GST_EVENT_MAKE_TYPE (8, 4, FLAG(DOWNSTREAM) | FLAG(SERIALIZED) | FLAG(STICKY)),
GST_EVENT_SINK_MESSAGE = GST_EVENT_MAKE_TYPE (9, 5, FLAG(DOWNSTREAM) | FLAG(SERIALIZED) | FLAG(STICKY)),
GST_EVENT_SINK_MESSAGE = GST_EVENT_MAKE_TYPE (9, 5, FLAG(DOWNSTREAM) | FLAG(SERIALIZED) | FLAG(STICKY) | FLAG(STICKY_MULTI)),
GST_EVENT_EOS = GST_EVENT_MAKE_TYPE (10, 6, FLAG(DOWNSTREAM) | FLAG(SERIALIZED) | FLAG(STICKY)),
/* upstream events */
......
......@@ -107,7 +107,8 @@ typedef struct
struct _GstPadPrivate
{
PadEvent events[GST_EVENT_MAX_STICKY];
guint events_cookie;
GArray *events;
gint using;
guint probe_list_cookie;
......@@ -322,51 +323,192 @@ gst_pad_init (GstPad * pad)
pad->block_cond = g_cond_new ();
g_hook_list_init (&pad->probes, sizeof (GstProbe));
}
static void
clear_event (GstPad * pad, guint idx)
{
gst_event_replace (&pad->priv->events[idx].event, NULL);
pad->priv->events[idx].received = FALSE;
pad->priv->events = g_array_sized_new (FALSE, TRUE,
sizeof (PadEvent), GST_EVENT_MAX_STICKY);
}
/* called when setting the pad inactive. It removes all sticky events from
* the pad */
static void
clear_events (GstPad * pad)
remove_events (GstPad * pad)
{
guint i;
guint i, len;
GArray *events;
for (i = 0; i < GST_EVENT_MAX_STICKY; i++)
clear_event (pad, i);
events = pad->priv->events;
len = events->len;
for (i = 0; i < len; i++) {
PadEvent *ev = &g_array_index (events, PadEvent, i);
gst_event_unref (ev->event);
}
GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_PENDING_EVENTS);
g_array_set_size (events, 0);
pad->priv->events_cookie++;
}
static gboolean
reschedule_event (GstPad * pad, guint idx)
static PadEvent *
find_event_by_type (GstPad * pad, GstEventType type, guint idx)
{
if (pad->priv->events[idx].event) {
pad->priv->events[idx].received = FALSE;
return TRUE;
guint i, len;
GArray *events;
PadEvent *ev;
events = pad->priv->events;
len = events->len;
for (i = 0; i < len; i++) {
ev = &g_array_index (events, PadEvent, i);
if (ev->event == NULL)
continue;
if (GST_EVENT_TYPE (ev->event) == type) {
if (idx == 0)
goto found;
idx--;
}
}
return FALSE;
ev = NULL;
found:
return ev;
}
static PadEvent *
find_event (GstPad * pad, GstEvent * event)
{
guint i, len;
GArray *events;
PadEvent *ev;
events = pad->priv->events;
len = events->len;
for (i = 0; i < len; i++) {
ev = &g_array_index (events, PadEvent, i);
if (event == ev->event)
goto found;
}
ev = NULL;
found:
return ev;
}
static void
reschedule_events (GstPad * pad)
remove_event_by_type (GstPad * pad, GstEventType type)
{
guint i;
guint i, len;
GArray *events;
PadEvent *ev;
events = pad->priv->events;
len = events->len;
i = 0;
while (i < len) {
ev = &g_array_index (events, PadEvent, i);
if (ev->event == NULL)
goto next;
if (GST_EVENT_TYPE (ev->event) != type)
goto next;
gst_event_unref (ev->event);
g_array_remove_index (events, i);
len--;
pad->priv->events_cookie++;
continue;
next:
i++;
}
}
static void
schedule_events (GstPad * srcpad, GstPad * sinkpad)
{
gint i, len;
GArray *events;
PadEvent *ev;
gboolean pending = FALSE;
for (i = 0; i < GST_EVENT_MAX_STICKY; i++)
pending |= reschedule_event (pad, i);
events = srcpad->priv->events;
len = events->len;
for (i = 0; i < len; i++) {
ev = &g_array_index (events, PadEvent, i);
if (ev->event == NULL)
continue;
if (sinkpad == NULL || !find_event (sinkpad, ev->event)) {
ev->received = FALSE;
pending = TRUE;
}
}
if (pending)
GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS);
GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PENDING_EVENTS);
}
typedef gboolean (*PadEventFunction) (GstPad * pad, PadEvent * ev,
gpointer user_data);
static void
events_foreach (GstPad * pad, PadEventFunction func, gpointer user_data)
{
guint i, len;
GArray *events;
gboolean ret;
guint cookie;
events = pad->priv->events;
restart:
cookie = pad->priv->events_cookie;
i = 0;
len = events->len;
while (i < len) {
PadEvent *ev, ev_ret;
ev = &g_array_index (events, PadEvent, i);
if (G_UNLIKELY (ev->event == NULL))
goto next;
/* take aditional ref, func might release the lock */
ev_ret.event = gst_event_ref (ev->event);
ev_ret.received = ev->received;
ret = func (pad, &ev_ret, user_data);
/* recheck the cookie, lock might have been released and the list could have
* changed */
if (G_UNLIKELY (cookie != pad->priv->events_cookie)) {
if (G_LIKELY (ev_ret.event))
gst_event_unref (ev_ret.event);
goto restart;
}
/* if the event changed, we need to do something */
if (G_UNLIKELY (ev->event != ev_ret.event)) {
if (G_UNLIKELY (ev_ret.event == NULL)) {
/* function unreffed and set the event to NULL, remove it */
g_array_remove_index (events, i);
len--;
cookie = ++pad->priv->events_cookie;
continue;
} else {
/* function gave a new event for us */
gst_event_take (&ev->event, ev_ret.event);
}
} else {
/* just unref, nothing changed */
gst_event_unref (ev_ret.event);
}
if (!ret)
break;
next:
i++;
}
}
/* should be called with LOCK */
static GstEvent *
......@@ -392,14 +534,11 @@ static GstCaps *
get_pad_caps (GstPad * pad)
{
GstCaps *caps = NULL;
GstEvent *event;
guint idx;
PadEvent *ev;
idx = GST_EVENT_STICKY_IDX_TYPE (GST_EVENT_CAPS);
/* we can only use the caps when we have successfully send the caps
* event to the event function and is thus in the active entry */
if ((event = pad->priv->events[idx].event))
gst_event_parse_caps (event, &caps);
ev = find_event_by_type (pad, GST_EVENT_CAPS, 0);
if (ev && ev->event)
gst_event_parse_caps (ev->event, &caps);
return caps;
}
......@@ -427,7 +566,7 @@ gst_pad_dispose (GObject * object)
gst_pad_set_pad_template (pad, NULL);
clear_events (pad);
remove_events (pad);
g_hook_list_clear (&pad->probes);
......@@ -470,6 +609,7 @@ gst_pad_finalize (GObject * object)
g_static_rec_mutex_free (&pad->stream_rec_lock);
g_cond_free (pad->block_cond);
g_array_free (pad->priv->events, TRUE);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
......@@ -661,7 +801,7 @@ pre_activate (GstPad * pad, GstPadMode new_mode)
GST_DEBUG_PAD_NAME (peer));
GST_OBJECT_LOCK (peer);
reschedule_events (peer);
schedule_events (peer, NULL);
GST_OBJECT_UNLOCK (peer);
gst_object_unref (peer);
......@@ -684,7 +824,7 @@ post_activate (GstPad * pad, GstPadMode new_mode)
GST_PAD_STREAM_LOCK (pad);
GST_DEBUG_OBJECT (pad, "stopped streaming");
GST_OBJECT_LOCK (pad);
clear_events (pad);
remove_events (pad);
GST_OBJECT_UNLOCK (pad);
GST_PAD_STREAM_UNLOCK (pad);
break;
......@@ -1836,7 +1976,6 @@ gst_pad_link_full (GstPad * srcpad, GstPad * sinkpad, GstPadLinkCheck flags)
GstPadLinkReturn result;
GstElement *parent;
GstPadLinkFunction srcfunc, sinkfunc;
guint idx;
g_return_val_if_fail (GST_IS_PAD (srcpad), GST_PAD_LINK_REFUSED);
g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), GST_PAD_LINK_WRONG_DIRECTION);
......@@ -1867,17 +2006,7 @@ gst_pad_link_full (GstPad * srcpad, GstPad * sinkpad, GstPadLinkCheck flags)
GST_PAD_PEER (sinkpad) = srcpad;
/* check events, when something is different, mark pending */
for (idx = 0; idx < GST_EVENT_MAX_STICKY; idx++) {
PadEvent *srcev, *sinkev;
srcev = &srcpad->priv->events[idx];
sinkev = &sinkpad->priv->events[idx];
if (srcev->event != sinkev->event) {
GST_OBJECT_FLAG_SET (srcpad, GST_PAD_FLAG_PENDING_EVENTS);
break;
}
}
schedule_events (srcpad, sinkpad);
/* get the link functions */
srcfunc = GST_PAD_LINKFUNC (srcpad);
......@@ -2904,7 +3033,7 @@ gst_pad_get_offset (GstPad * pad)
void
gst_pad_set_offset (GstPad * pad, gint64 offset)
{
guint idx;
PadEvent *ev;
g_return_if_fail (GST_IS_PAD (pad));
......@@ -2921,12 +3050,9 @@ gst_pad_set_offset (GstPad * pad, gint64 offset)
if (GST_PAD_IS_SINK (pad))
goto done;
/* the index of the segment event in the array */
idx = GST_EVENT_STICKY_IDX_TYPE (GST_EVENT_SEGMENT);
/* resend the last segment event on next buffer push */
if (pad->priv->events[idx].event) {
pad->priv->events[idx].received = FALSE;
if ((ev = find_event_by_type (pad, GST_EVENT_SEGMENT, 0))) {
ev->received = FALSE;
GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS);
}
......@@ -3109,54 +3235,26 @@ probe_stopped:
* Data passing functions
*/
/* should be called with the LOCK */
static inline GstFlowReturn
gst_pad_push_sticky_events (GstPad * pad)
static gboolean
push_sticky (GstPad * pad, PadEvent * ev, gpointer user_data)
{
GstFlowReturn ret;
gint i;
restart:
ret = GST_FLOW_OK;
GST_DEBUG_OBJECT (pad, "pushing out all sticky events");
for (i = 0; i < GST_EVENT_MAX_STICKY; i++) {
gboolean res;
PadEvent *ev;
GstEvent *event;
ev = &pad->priv->events[i];
/* skip without active event */
if ((event = ev->event) == NULL)
continue;
if (ev->received) {
GST_DEBUG_OBJECT (pad, "event %s was already received",
GST_EVENT_TYPE_NAME (event));
continue;
}
gboolean res;
GstFlowReturn *data = user_data;
gst_event_ref (event);
GST_OBJECT_UNLOCK (pad);
if (ev->received) {
GST_DEBUG_OBJECT (pad, "event %s was already received",
GST_EVENT_TYPE_NAME (ev->event));
return TRUE;
}
GST_OBJECT_UNLOCK (pad);
res = gst_pad_push_event (pad, event);
res = gst_pad_push_event (pad, gst_event_ref (ev->event));
GST_OBJECT_LOCK (pad);
if (!res) {
ret = GST_FLOW_ERROR;
break;
}
/* things could have changed while we release the lock, check if we still
* are handling the same event, if we don't something changed and we have
* to try again. FIXME. we need a cookie here. */
if (event != ev->event) {
GST_DEBUG_OBJECT (pad, "events changed, restarting");
goto restart;
}
}
GST_OBJECT_LOCK (pad);
if (!res)
*data = GST_FLOW_ERROR;
return ret;
return res;
}
/* this is the chain function that does not perform the additional argument
......@@ -3377,7 +3475,9 @@ gst_pad_push_data (GstPad * pad, GstPadProbeType type, void *data)
if (G_UNLIKELY (GST_PAD_HAS_PENDING_EVENTS (pad))) {
GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_PENDING_EVENTS);
if ((ret = gst_pad_push_sticky_events (pad)))
ret = GST_FLOW_OK;
events_foreach (pad, push_sticky, &ret);
if (ret != GST_FLOW_OK)
goto events_error;
}
......@@ -3540,7 +3640,9 @@ gst_pad_get_range_unchecked (GstPad * pad, guint64 offset, guint size,
if (G_UNLIKELY (GST_PAD_HAS_PENDING_EVENTS (pad))) {
GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_PENDING_EVENTS);
if ((ret = gst_pad_push_sticky_events (pad)))
ret = GST_FLOW_OK;
events_foreach (pad, push_sticky, &ret);
if (ret != GST_FLOW_OK)
goto events_error;
}
......@@ -3794,16 +3896,50 @@ post_probe_stopped:
static gboolean
gst_pad_store_sticky_event (GstPad * pad, GstEvent * event, gboolean locked)
{
guint idx;
gboolean ret;
guint i, len;
GstEventType type;
GArray *events;
gboolean res = FALSE;
const gchar *name = NULL;
type = GST_EVENT_TYPE (event);
if (type & GST_EVENT_TYPE_STICKY_MULTI)
name = gst_structure_get_name (gst_event_get_structure (event));
idx = GST_EVENT_STICKY_IDX (event);
events = pad->priv->events;
len = events->len;
ret = gst_event_replace (&pad->priv->events[idx].event, event);
for (i = 0; i < len; i++) {
PadEvent *ev = &g_array_index (events, PadEvent, i);
if (ev->event == NULL)
continue;
if (type == GST_EVENT_TYPE (ev->event)) {
/* matching types, check matching name if needed */
if (name && !gst_event_has_name (ev->event, name))
continue;
/* overwrite */
if ((res = gst_event_replace (&ev->event, event)))
ev->received = FALSE;
break;
}
}
if (i == len) {
PadEvent ev;
ev.event = gst_event_ref (event);
ev.received = FALSE;
g_array_append_val (events, ev);
res = TRUE;
}
if (res) {
pad->priv->events_cookie++;
GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS);
GST_LOG_OBJECT (pad, "stored sticky event %s", GST_EVENT_TYPE_NAME (event));
if (ret) {
GST_LOG_OBJECT (pad, "stored sticky event %s at index %u",
GST_EVENT_TYPE_NAME (event), idx);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_CAPS:
if (locked)
......@@ -3819,7 +3955,7 @@ gst_pad_store_sticky_event (GstPad * pad, GstEvent * event, gboolean locked)
break;
}
}
return ret;
return res;
}
/**
......@@ -3846,7 +3982,6 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
gboolean result;
gboolean stored = FALSE;
GstPadProbeType type;
guint idx = 0;
gboolean sticky;
g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
......@@ -3856,8 +3991,7 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
if (GST_PAD_IS_SRC (pad)) {
if (G_UNLIKELY (!GST_EVENT_IS_DOWNSTREAM (event)))
goto wrong_direction;
if ((sticky = GST_EVENT_IS_STICKY (event)))
idx = GST_EVENT_STICKY_IDX (event);
sticky = GST_EVENT_IS_STICKY (event);
type = GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM;
} else if (GST_PAD_IS_SINK (pad)) {
if (G_UNLIKELY (!GST_EVENT_IS_UPSTREAM (event)))
......@@ -3892,7 +4026,7 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
/* Remove sticky EOS events */
GST_LOG_OBJECT (pad, "Removing pending EOS events");
clear_event (pad, GST_EVENT_STICKY_IDX_TYPE (GST_EVENT_EOS));
remove_event_by_type (pad, GST_EVENT_EOS);
if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) {
GST_LOG_OBJECT (pad, "Pad is blocked, not forwarding flush-stop");
......@@ -3907,13 +4041,12 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
/* store the event on the pad, but only on srcpads. We always store the
* event exactly how it was sent */
if (sticky) {
/* srcpad sticky events are store immediately, we set the pending flag
* to TRUE, it will be set to FALSE when we can successfully push the
/* srcpad sticky events are store immediately, the received flag is set
* to FALSE and will be set to TRUE when we can successfully push the
* event to the peer pad */
if (gst_pad_store_sticky_event (pad, event, TRUE)) {
GST_DEBUG_OBJECT (pad, "event %s updated",
GST_EVENT_TYPE_NAME (event));
pad->priv->events[idx].received = FALSE;
}
stored = TRUE;
}
......@@ -3965,8 +4098,12 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
GST_OBJECT_LOCK (pad);
if (sticky) {
if (result) {
pad->priv->events[idx].received = TRUE;
GST_DEBUG_OBJECT (pad, "event cleared pending");
PadEvent *ev;
if ((ev = find_event (pad, event)))
ev->received = TRUE;
GST_DEBUG_OBJECT (pad, "event marked received");
} else {
GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS);
GST_DEBUG_OBJECT (pad, "mark pending events");
......@@ -4139,7 +4276,7 @@ gst_pad_send_event (GstPad * pad, GstEvent * event)
}
/* Remove pending EOS events */
GST_LOG_OBJECT (pad, "Removing pending EOS events");
clear_event (pad, GST_EVENT_STICKY_IDX_TYPE (GST_EVENT_EOS));
remove_event_by_type (pad, GST_EVENT_EOS);
GST_OBJECT_UNLOCK (pad);
/* grab stream lock */
......@@ -4315,92 +4452,78 @@ gst_pad_get_element_private (GstPad * pad)
* gst_pad_get_sticky_event:
* @pad: the #GstPad to get the event from.
* @event_type: the #GstEventType that should be retrieved.
* @idx: the index of the event
*
* Returns a new reference of the sticky event of type @event_type
* from the event.
*
* Returns: (transfer full): a #GstEvent of type @event_type. Unref after usage.
* Returns: (transfer full): a #GstEvent of type @event_type or NULL when no
* event of @event_type was on @pad. Unref after usage.
*/
GstEvent *
gst_pad_get_sticky_event (GstPad * pad, GstEventType event_type)
gst_pad_get_sticky_event (GstPad * pad, GstEventType event_type, guint idx)
{
GstEvent *event = NULL;
guint idx;
PadEvent *ev;
g_return_val_if_fail (GST_IS_PAD (pad), NULL);
g_return_val_if_fail ((event_type & GST_EVENT_TYPE_STICKY) != 0, NULL);
idx = GST_EVENT_STICKY_IDX_TYPE (event_type);
GST_OBJECT_LOCK (pad);
if ((event = pad->priv->events[idx].event)) {
ev = find_event_by_type (pad, event_type, idx);
if (ev && (event = ev->event))
gst_event_ref (event);
}
GST_OBJECT_UNLOCK (pad);
return event;
}
typedef struct
{
GstPadStickyEventsForeachFunction func;
gpointer user_data;
} ForeachDispatch;
static gboolean
foreach_dispatch_function (GstPad * pad, PadEvent * ev, gpointer user_data)
{
ForeachDispatch *data = user_data;
gboolean ret;
GST_OBJECT_UNLOCK (pad);
ret = data->func (pad, &ev->event, data->user_data);
GST_OBJECT_LOCK (pad);
return ret;
}
/**
* gst_pad_sticky_events_foreach:
* @pad: the #GstPad that should be used for iteration.
* @foreach_func: (scope call): the #GstPadStickyEventsForeachFunction that should be called for every event.
* @foreach_func: (scope call): the #GstPadStickyEventsForeachFunction that
* should be called for every event.
* @user_data: (closure): the optional user data.
*
* Iterates all active sticky events on @pad and calls @foreach_func for every
* event. If @foreach_func returns something else than GST_FLOW_OK the iteration
* is immediately stopped.
*