gstinputselector.c 39.9 KB
Newer Older
1
/* GStreamer input selector
2 3 4 5 6 7
 * Copyright (C) 2003 Julien Moutte <julien@moutte.net>
 * Copyright (C) 2005 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
 * Copyright (C) 2005 Jan Schmidt <thaytan@mad.scientist.com>
 * Copyright (C) 2007 Wim Taymans <wim.taymans@gmail.com>
 * Copyright (C) 2007 Andy Wingo <wingo@pobox.com>
 * Copyright (C) 2008 Nokia Corporation. (contact <stefan.kost@nokia.com>)
8
 * Copyright (C) 2011 Sebastian Dröge <sebastian.droege@collabora.co.uk>
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

/**
 * SECTION:element-input-selector
 * @see_also: #GstOutputSelector
 *
 * Direct one out of N input streams to the output pad.
31
 *
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
 * The input pads are from a GstPad subclass and have additional
 * properties, which users may find useful, namely:
 *
 * <itemizedlist>
 * <listitem>
 * "running-time": Running time of stream on pad (#gint64)
 * </listitem>
 * <listitem>
 * "tags": The currently active tags on the pad (#GstTagList, boxed type)
 * </listitem>
 * <listitem>
 * "active": If the pad is currently active (#gboolean)
 * </listitem>
 * <listitem>
 * "always-ok" : Make an inactive pads return #GST_FLOW_OK instead of
 * #GST_FLOW_NOT_LINKED
 * </listitem>
 * </itemizedlist>
 *
51
 * Since: 0.10.32
52 53 54 55 56 57 58 59 60 61
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <string.h>

#include "gstinputselector.h"

62 63
#include "gst/glib-compat-private.h"

64 65 66 67
GST_DEBUG_CATEGORY_STATIC (input_selector_debug);
#define GST_CAT_DEFAULT input_selector_debug

static GstStaticPadTemplate gst_input_selector_sink_factory =
Wim Taymans's avatar
Wim Taymans committed
68
GST_STATIC_PAD_TEMPLATE ("sink_%u",
69 70 71 72 73 74 75 76 77 78 79 80
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);

static GstStaticPadTemplate gst_input_selector_src_factory =
GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

enum
{
81 82
  PROP_0,
  PROP_N_PADS,
83 84
  PROP_ACTIVE_PAD,
  PROP_SYNC_STREAMS
85 86
};

87 88
#define DEFAULT_SYNC_STREAMS FALSE

89
#define DEFAULT_PAD_ALWAYS_OK TRUE
90

91 92
enum
{
93 94 95 96
  PROP_PAD_0,
  PROP_PAD_RUNNING_TIME,
  PROP_PAD_TAGS,
  PROP_PAD_ACTIVE,
97
  PROP_PAD_ALWAYS_OK
98 99 100 101 102 103 104 105 106 107 108
};

enum
{
  /* methods */
  SIGNAL_BLOCK,
  SIGNAL_SWITCH,
  LAST_SIGNAL
};
static guint gst_input_selector_signals[LAST_SIGNAL] = { 0 };

109 110
static inline gboolean gst_input_selector_is_active_sinkpad (GstInputSelector *
    sel, GstPad * pad);
111 112
static GstPad *gst_input_selector_activate_sinkpad (GstInputSelector * sel,
    GstPad * pad);
113 114
static GstPad *gst_input_selector_get_linked_pad (GstInputSelector * sel,
    GstPad * pad, gboolean strict);
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135

#define GST_TYPE_SELECTOR_PAD \
  (gst_selector_pad_get_type())
#define GST_SELECTOR_PAD(obj) \
  (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SELECTOR_PAD, GstSelectorPad))
#define GST_SELECTOR_PAD_CLASS(klass) \
  (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SELECTOR_PAD, GstSelectorPadClass))
#define GST_IS_SELECTOR_PAD(obj) \
  (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SELECTOR_PAD))
#define GST_IS_SELECTOR_PAD_CLASS(klass) \
  (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SELECTOR_PAD))
#define GST_SELECTOR_PAD_CAST(obj) \
  ((GstSelectorPad *)(obj))

typedef struct _GstSelectorPad GstSelectorPad;
typedef struct _GstSelectorPadClass GstSelectorPadClass;

struct _GstSelectorPad
{
  GstPad parent;

136
  gboolean active;              /* when buffer have passed the pad */
137
  gboolean pushed;              /* when buffer was pushed downstream since activation */
138
  gboolean eos;                 /* when EOS has been received */
139
  gboolean eos_sent;            /* when EOS was sent downstream */
140
  gboolean discont;             /* after switching we create a discont */
141
  gboolean flushing;            /* set after flush-start and before flush-stop */
142
  gboolean always_ok;
143 144
  GstTagList *tags;             /* last tags received on the pad */

145 146 147 148
  GstClockTime position;        /* the current position in the segment */
  GstSegment segment;           /* the current segment on the pad */
  guint32 segment_seqnum;       /* sequence number of the current segment */

149 150 151 152 153 154 155 156
  gboolean segment_pending;
};

struct _GstSelectorPadClass
{
  GstPadClass parent;
};

157
GType gst_selector_pad_get_type (void);
158 159 160
static void gst_selector_pad_finalize (GObject * object);
static void gst_selector_pad_get_property (GObject * object,
    guint prop_id, GValue * value, GParamSpec * pspec);
161 162
static void gst_selector_pad_set_property (GObject * object,
    guint prop_id, const GValue * value, GParamSpec * pspec);
163 164 165

static gint64 gst_selector_pad_get_running_time (GstSelectorPad * pad);
static void gst_selector_pad_reset (GstSelectorPad * pad);
166 167
static gboolean gst_selector_pad_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
168 169
static gboolean gst_selector_pad_query (GstPad * pad, GstObject * parent,
    GstQuery * query);
Wim Taymans's avatar
Wim Taymans committed
170 171
static GstIterator *gst_selector_pad_iterate_linked_pads (GstPad * pad,
    GstObject * parent);
172 173
static GstFlowReturn gst_selector_pad_chain (GstPad * pad, GstObject * parent,
    GstBuffer * buf);
174

175
G_DEFINE_TYPE (GstSelectorPad, gst_selector_pad, GST_TYPE_PAD);
176 177 178 179 180 181 182 183

static void
gst_selector_pad_class_init (GstSelectorPadClass * klass)
{
  GObjectClass *gobject_class;

  gobject_class = (GObjectClass *) klass;

184 185
  gobject_class->finalize = gst_selector_pad_finalize;

186 187
  gobject_class->get_property = gst_selector_pad_get_property;
  gobject_class->set_property = gst_selector_pad_set_property;
188 189

  g_object_class_install_property (gobject_class, PROP_PAD_RUNNING_TIME,
190
      g_param_spec_int64 ("running-time", "Running time",
191 192
          "Running time of stream on pad", 0, G_MAXINT64, 0,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
193 194 195
  g_object_class_install_property (gobject_class, PROP_PAD_TAGS,
      g_param_spec_boxed ("tags", "Tags",
          "The currently active tags on the pad", GST_TYPE_TAG_LIST,
196
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
197 198
  g_object_class_install_property (gobject_class, PROP_PAD_ACTIVE,
      g_param_spec_boolean ("active", "Active",
199 200
          "If the pad is currently active", FALSE,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
201
  /* FIXME: better property name? */
202 203 204
  g_object_class_install_property (gobject_class, PROP_PAD_ALWAYS_OK,
      g_param_spec_boolean ("always-ok", "Always OK",
          "Make an inactive pad return OK instead of NOT_LINKED",
205
          DEFAULT_PAD_ALWAYS_OK, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
206 207 208 209 210
}

static void
gst_selector_pad_init (GstSelectorPad * pad)
{
211
  pad->always_ok = DEFAULT_PAD_ALWAYS_OK;
212 213 214 215 216 217 218 219 220 221
  gst_selector_pad_reset (pad);
}

static void
gst_selector_pad_finalize (GObject * object)
{
  GstSelectorPad *pad;

  pad = GST_SELECTOR_PAD_CAST (object);

222 223 224
  if (pad->tags)
    gst_tag_list_free (pad->tags);

225
  G_OBJECT_CLASS (gst_selector_pad_parent_class)->finalize (object);
226 227
}

228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
static void
gst_selector_pad_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec)
{
  GstSelectorPad *spad = GST_SELECTOR_PAD_CAST (object);

  switch (prop_id) {
    case PROP_PAD_ALWAYS_OK:
      GST_OBJECT_LOCK (object);
      spad->always_ok = g_value_get_boolean (value);
      GST_OBJECT_UNLOCK (object);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

246 247 248 249 250 251 252
static void
gst_selector_pad_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec)
{
  GstSelectorPad *spad = GST_SELECTOR_PAD_CAST (object);

  switch (prop_id) {
253
    case PROP_PAD_RUNNING_TIME:
254 255
      g_value_set_int64 (value, gst_selector_pad_get_running_time (spad));
      break;
256 257 258 259 260 261 262 263
    case PROP_PAD_TAGS:
      GST_OBJECT_LOCK (object);
      g_value_set_boxed (value, spad->tags);
      GST_OBJECT_UNLOCK (object);
      break;
    case PROP_PAD_ACTIVE:
    {
      GstInputSelector *sel;
264

265 266 267 268 269 270
      sel = GST_INPUT_SELECTOR (gst_pad_get_parent (spad));
      g_value_set_boolean (value, gst_input_selector_is_active_sinkpad (sel,
              GST_PAD_CAST (spad)));
      gst_object_unref (sel);
      break;
    }
271 272 273 274 275
    case PROP_PAD_ALWAYS_OK:
      GST_OBJECT_LOCK (object);
      g_value_set_boolean (value, spad->always_ok);
      GST_OBJECT_UNLOCK (object);
      break;
276 277 278 279 280 281 282 283 284 285 286 287 288
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

static gint64
gst_selector_pad_get_running_time (GstSelectorPad * pad)
{
  gint64 ret = 0;

  GST_OBJECT_LOCK (pad);
  if (pad->active) {
289
    guint64 position = pad->position;
290
    GstFormat format = pad->segment.format;
291

292
    ret = gst_segment_to_running_time (&pad->segment, format, position);
293 294 295 296 297 298 299 300 301 302 303 304
  }
  GST_OBJECT_UNLOCK (pad);

  GST_DEBUG_OBJECT (pad, "running time: %" GST_TIME_FORMAT,
      GST_TIME_ARGS (ret));

  return ret;
}

static void
gst_selector_pad_reset (GstSelectorPad * pad)
{
305
  GST_OBJECT_LOCK (pad);
306
  pad->active = FALSE;
307
  pad->pushed = FALSE;
308
  pad->eos = FALSE;
309
  pad->eos_sent = FALSE;
310
  pad->segment_pending = FALSE;
311
  pad->discont = FALSE;
312
  pad->flushing = FALSE;
313
  pad->position = GST_CLOCK_TIME_NONE;
314
  gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
315
  GST_OBJECT_UNLOCK (pad);
316 317 318 319
}

/* strictly get the linked pad from the sinkpad. If the pad is active we return
 * the srcpad else we return NULL */
320
static GstIterator *
Wim Taymans's avatar
Wim Taymans committed
321
gst_selector_pad_iterate_linked_pads (GstPad * pad, GstObject * parent)
322
{
323
  GstInputSelector *sel;
324
  GstPad *otherpad;
325
  GstIterator *it = NULL;
326
  GValue val = { 0, };
327

Wim Taymans's avatar
Wim Taymans committed
328
  sel = GST_INPUT_SELECTOR (parent);
329 330

  otherpad = gst_input_selector_get_linked_pad (sel, pad, TRUE);
331 332 333 334 335
  if (otherpad) {
    g_value_init (&val, GST_TYPE_PAD);
    g_value_set_object (&val, otherpad);
    it = gst_iterator_new_single (GST_TYPE_PAD, &val);
    g_value_unset (&val);
336
    gst_object_unref (otherpad);
337
  }
338

339
  return it;
340 341 342
}

static gboolean
343
gst_selector_pad_event (GstPad * pad, GstObject * parent, GstEvent * event)
344 345
{
  gboolean res = TRUE;
346
  gboolean forward;
347 348
  GstInputSelector *sel;
  GstSelectorPad *selpad;
349
  GstPad *prev_active_sinkpad;
350
  GstPad *active_sinkpad;
351

352
  sel = GST_INPUT_SELECTOR (parent);
353 354
  selpad = GST_SELECTOR_PAD_CAST (pad);

355
  GST_INPUT_SELECTOR_LOCK (sel);
356 357
  prev_active_sinkpad = sel->active_sinkpad;
  active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
358

359 360
  /* only forward if we are dealing with the active sinkpad */
  forward = (pad == active_sinkpad);
361
  GST_INPUT_SELECTOR_UNLOCK (sel);
362

363
  if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad) {
364
    g_object_notify (G_OBJECT (sel), "active-pad");
365
  }
366

367
  switch (GST_EVENT_TYPE (event)) {
368
    case GST_EVENT_FLUSH_START:
369 370 371 372 373
      /* Unblock the pad if it's waiting */
      GST_INPUT_SELECTOR_LOCK (sel);
      selpad->flushing = TRUE;
      GST_INPUT_SELECTOR_BROADCAST (sel);
      GST_INPUT_SELECTOR_UNLOCK (sel);
374
      break;
375
    case GST_EVENT_FLUSH_STOP:
376
      GST_INPUT_SELECTOR_LOCK (sel);
377
      gst_selector_pad_reset (selpad);
378
      GST_INPUT_SELECTOR_UNLOCK (sel);
379
      break;
Wim Taymans's avatar
Wim Taymans committed
380
    case GST_EVENT_SEGMENT:
381
    {
382 383
      GST_INPUT_SELECTOR_LOCK (sel);
      GST_OBJECT_LOCK (selpad);
384
      gst_event_copy_segment (event, &selpad->segment);
385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400
      selpad->segment_seqnum = gst_event_get_seqnum (event);

      /* Update the position */
      if (selpad->position == GST_CLOCK_TIME_NONE
          || selpad->segment.position > selpad->position) {
        selpad->position = selpad->segment.position;
      } else if (selpad->position != GST_CLOCK_TIME_NONE
          && selpad->position > selpad->segment.position) {
        selpad->segment.position = selpad->position;

        if (forward) {
          gst_event_unref (event);
          event = gst_event_new_segment (&selpad->segment);
          gst_event_set_seqnum (event, selpad->segment_seqnum);
        }
      }
Wim Taymans's avatar
Wim Taymans committed
401 402
      GST_DEBUG_OBJECT (pad, "configured SEGMENT %" GST_SEGMENT_FORMAT,
          &selpad->segment);
403

404 405
      /* If we aren't forwarding the event because the pad is not the
       * active_sinkpad, then set the flag on the pad
406 407 408 409 410 411 412
       * that says a segment needs sending if/when that pad is activated.
       * For all other cases, we send the event immediately, which makes
       * sparse streams and other segment updates work correctly downstream.
       */
      if (!forward)
        selpad->segment_pending = TRUE;

413
      GST_OBJECT_UNLOCK (selpad);
414
      GST_INPUT_SELECTOR_UNLOCK (sel);
415 416
      break;
    }
417 418
    case GST_EVENT_TAG:
    {
Wim Taymans's avatar
Wim Taymans committed
419
      GstTagList *tags, *oldtags, *newtags;
420 421

      gst_event_parse_tag (event, &tags);
Wim Taymans's avatar
Wim Taymans committed
422 423 424 425 426 427 428 429 430

      GST_OBJECT_LOCK (selpad);
      oldtags = selpad->tags;

      newtags = gst_tag_list_merge (oldtags, tags, GST_TAG_MERGE_REPLACE);
      selpad->tags = newtags;
      if (oldtags)
        gst_tag_list_free (oldtags);
      GST_DEBUG_OBJECT (pad, "received tags %" GST_PTR_FORMAT, newtags);
431
      GST_OBJECT_UNLOCK (selpad);
432 433

      g_object_notify (G_OBJECT (selpad), "tags");
434 435
      break;
    }
436 437
    case GST_EVENT_EOS:
      selpad->eos = TRUE;
438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456

      if (forward) {
        selpad->eos_sent = TRUE;
      } else {
        GstSelectorPad *tmp;

        /* If the active sinkpad is in EOS state but EOS
         * was not sent downstream this means that the pad
         * got EOS before it was set as active pad and that
         * the previously active pad got EOS after it was
         * active
         */
        GST_INPUT_SELECTOR_LOCK (sel);
        active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
        tmp = GST_SELECTOR_PAD (active_sinkpad);
        forward = (tmp->eos && !tmp->eos_sent);
        tmp->eos_sent = TRUE;
        GST_INPUT_SELECTOR_UNLOCK (sel);
      }
457
      GST_DEBUG_OBJECT (pad, "received EOS");
458 459 460 461
      break;
    default:
      break;
  }
462 463
  if (forward) {
    GST_DEBUG_OBJECT (pad, "forwarding event");
464
    res = gst_pad_push_event (sel->srcpad, event);
465
  } else
466
    gst_event_unref (event);
467 468 469 470

  return res;
}

Wim Taymans's avatar
Wim Taymans committed
471
static gboolean
472
gst_selector_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
Wim Taymans's avatar
Wim Taymans committed
473 474 475 476 477
{
  gboolean res = FALSE;

  switch (GST_QUERY_TYPE (query)) {
    default:
478
      res = gst_pad_query_default (pad, parent, query);
Wim Taymans's avatar
Wim Taymans committed
479 480 481 482 483 484
      break;
  }

  return res;
}

485 486
/* must be called with the SELECTOR_LOCK, will block while the pad is blocked 
 * or return TRUE when flushing */
487
static gboolean
488
gst_input_selector_wait (GstInputSelector * self, GstSelectorPad * pad)
489
{
490
  while (self->blocked && !self->flushing && !pad->flushing) {
491 492 493 494 495
    /* we can be unlocked here when we are shutting down (flushing) or when we
     * get unblocked */
    GST_INPUT_SELECTOR_WAIT (self);
  }
  return self->flushing;
496 497
}

498 499 500 501 502 503 504 505 506
/* must be called with the SELECTOR_LOCK, will block until the running time
 * of the active pad is after this pad or return TRUE when flushing */
static gboolean
gst_input_selector_wait_running_time (GstInputSelector * sel,
    GstSelectorPad * pad, GstBuffer * buf)
{
  GstPad *active_sinkpad;
  GstSelectorPad *active_selpad;
  GstSegment *seg, *active_seg;
507
  GstClockTime running_time, active_running_time = GST_CLOCK_TIME_NONE;
508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542

  seg = &pad->segment;

  active_sinkpad =
      gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (pad));
  active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad);
  active_seg = &active_selpad->segment;

  /* We can only sync if the segments are in time format or
   * if the active pad had no newsegment event yet */
  if (seg->format != GST_FORMAT_TIME ||
      (active_seg->format != GST_FORMAT_TIME
          && active_seg->format != GST_FORMAT_UNDEFINED))
    return FALSE;

  /* If we have no valid timestamp we can't sync this buffer */
  if (!GST_BUFFER_TIMESTAMP_IS_VALID (buf))
    return FALSE;

  running_time = GST_BUFFER_TIMESTAMP (buf);
  /* If possible try to get the running time at the end of the buffer */
  if (GST_BUFFER_DURATION_IS_VALID (buf))
    running_time += GST_BUFFER_DURATION (buf);
  if (running_time > seg->stop)
    running_time = seg->stop;
  running_time =
      gst_segment_to_running_time (seg, GST_FORMAT_TIME, running_time);
  /* If this is outside the segment don't sync */
  if (running_time == -1)
    return FALSE;

  /* Get active pad's running time, if no configured segment yet keep at -1 */
  if (active_seg->format == GST_FORMAT_TIME)
    active_running_time =
        gst_segment_to_running_time (active_seg, GST_FORMAT_TIME,
543
        active_selpad->position);
544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579

  /* Wait until
   *   a) this is the active pad
   *   b) the pad or the selector is flushing
   *   c) the selector is not blocked
   *   d) the active pad has no running time or the active
   *      pad's running time is before this running time
   *   e) the active pad has a non-time segment
   */
  while (pad != active_selpad && !sel->flushing && !pad->flushing &&
      (sel->blocked || active_running_time == -1
          || running_time >= active_running_time)) {
    if (!sel->blocked)
      GST_DEBUG_OBJECT (pad,
          "Waiting for active streams to advance. %" GST_TIME_FORMAT " >= %"
          GST_TIME_FORMAT, GST_TIME_ARGS (running_time),
          GST_TIME_ARGS (active_running_time));

    GST_INPUT_SELECTOR_WAIT (sel);

    /* Get new active pad, it might have changed */
    active_sinkpad =
        gst_input_selector_activate_sinkpad (sel, GST_PAD_CAST (pad));
    active_selpad = GST_SELECTOR_PAD_CAST (active_sinkpad);
    active_seg = &active_selpad->segment;

    /* If the active segment is configured but not to time format
     * we can't do any syncing at all */
    if (active_seg->format != GST_FORMAT_TIME
        && active_seg->format != GST_FORMAT_UNDEFINED)
      break;

    /* Get the new active pad running time */
    if (active_seg->format == GST_FORMAT_TIME)
      active_running_time =
          gst_segment_to_running_time (active_seg, GST_FORMAT_TIME,
580
          active_selpad->position);
581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596
    else
      active_running_time = -1;

    if (!sel->blocked)
      GST_DEBUG_OBJECT (pad,
          "Waited for active streams to advance. %" GST_TIME_FORMAT " >= %"
          GST_TIME_FORMAT, GST_TIME_ARGS (running_time),
          GST_TIME_ARGS (active_running_time));

  }

  /* Return TRUE if the selector or the pad is flushing */
  return (sel->flushing || pad->flushing);
}


597
static GstFlowReturn
598
gst_selector_pad_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
599 600 601 602
{
  GstInputSelector *sel;
  GstFlowReturn res;
  GstPad *active_sinkpad;
603
  GstPad *prev_active_sinkpad;
604
  GstSelectorPad *selpad;
605
  GstClockTime start_time;
606
  GstSegment *seg;
Wim Taymans's avatar
Wim Taymans committed
607
  GstEvent *start_event = NULL;
608

609
  sel = GST_INPUT_SELECTOR (parent);
610 611 612
  selpad = GST_SELECTOR_PAD_CAST (pad);
  seg = &selpad->segment;

613 614
  GST_INPUT_SELECTOR_LOCK (sel);
  /* wait or check for flushing */
615
  if (gst_input_selector_wait (sel, selpad))
616
    goto flushing;
617

618
  GST_LOG_OBJECT (pad, "getting active pad");
619

620
  prev_active_sinkpad = sel->active_sinkpad;
621 622
  active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);

623 624 625 626 627 628 629 630 631 632
  /* In sync mode wait until the active pad has advanced
   * after the running time of the current buffer */
  if (sel->sync_streams && active_sinkpad != pad) {
    if (gst_input_selector_wait_running_time (sel, selpad, buf))
      goto flushing;
  }

  /* Might have changed while waiting */
  active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);

633
  /* update the segment on the srcpad */
634 635
  start_time = GST_BUFFER_TIMESTAMP (buf);
  if (GST_CLOCK_TIME_IS_VALID (start_time)) {
636
    GST_LOG_OBJECT (pad, "received start time %" GST_TIME_FORMAT,
637
        GST_TIME_ARGS (start_time));
638
    if (GST_BUFFER_DURATION_IS_VALID (buf))
639
      GST_LOG_OBJECT (pad, "received end time %" GST_TIME_FORMAT,
640
          GST_TIME_ARGS (start_time + GST_BUFFER_DURATION (buf)));
641 642

    GST_OBJECT_LOCK (pad);
643
    selpad->position = start_time;
644
    GST_OBJECT_UNLOCK (pad);
645 646 647
  }

  /* Ignore buffers from pads except the selected one */
648
  if (pad != active_sinkpad)
649 650
    goto ignore;

651 652 653 654
  /* Tell all non-active pads that we advanced the running time */
  if (sel->sync_streams)
    GST_INPUT_SELECTOR_BROADCAST (sel);

655
  /* if we have a pending segment, push it out now */
656 657
  if (G_UNLIKELY (prev_active_sinkpad != active_sinkpad
          || selpad->segment_pending)) {
658 659 660 661 662
    if (G_UNLIKELY (seg->format == GST_FORMAT_UNDEFINED)) {
      GST_ERROR_OBJECT (pad, "Buffers arrived before NEWSEGMENT event");
    } else {
      GST_DEBUG_OBJECT (pad,
          "pushing pending NEWSEGMENT update %d, rate %lf, applied rate %lf, "
663
          "format %d, " "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
664 665
          G_GINT64_FORMAT, FALSE, seg->rate, seg->applied_rate, seg->format,
          seg->start, seg->stop, seg->time);
666

667 668
      start_event = gst_event_new_segment (seg);
      gst_event_set_seqnum (start_event, selpad->segment_seqnum);
669 670
      selpad->segment_pending = FALSE;
    }
671
  }
672
  GST_INPUT_SELECTOR_UNLOCK (sel);
673

674
  if (prev_active_sinkpad != active_sinkpad && pad == active_sinkpad) {
675
    g_object_notify (G_OBJECT (sel), "active-pad");
676
  }
677

678 679 680 681
  if (start_event)
    gst_pad_push_event (sel->srcpad, start_event);

  if (selpad->discont) {
682
    buf = gst_buffer_make_writable (buf);
683 684 685 686 687

    GST_DEBUG_OBJECT (pad, "Marking discont buffer %p", buf);
    GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
    selpad->discont = FALSE;
  }
688 689

  /* forward */
690
  GST_LOG_OBJECT (pad, "Forwarding buffer %p", buf);
691

692
  res = gst_pad_push (sel->srcpad, buf);
693
  selpad->pushed = TRUE;
694

695 696
done:
  return res;
697

698 699 700
  /* dropped buffers */
ignore:
  {
701 702
    gboolean active_pad_pushed = GST_SELECTOR_PAD_CAST (active_sinkpad)->pushed;

703 704 705 706 707
    GST_DEBUG_OBJECT (pad, "Pad not active, discard buffer %p", buf);
    /* when we drop a buffer, we're creating a discont on this pad */
    selpad->discont = TRUE;
    GST_INPUT_SELECTOR_UNLOCK (sel);
    gst_buffer_unref (buf);
708 709 710

    /* figure out what to return upstream */
    GST_OBJECT_LOCK (selpad);
711
    if (selpad->always_ok || !active_pad_pushed)
712 713 714 715 716
      res = GST_FLOW_OK;
    else
      res = GST_FLOW_NOT_LINKED;
    GST_OBJECT_UNLOCK (selpad);

717 718 719 720 721 722
    goto done;
  }
flushing:
  {
    GST_DEBUG_OBJECT (pad, "We are flushing, discard buffer %p", buf);
    GST_INPUT_SELECTOR_UNLOCK (sel);
723
    gst_buffer_unref (buf);
724
    res = GST_FLOW_FLUSHING;
725 726 727 728
    goto done;
  }
}

729
static void gst_input_selector_dispose (GObject * object);
Wim Taymans's avatar
Wim Taymans committed
730
static void gst_input_selector_finalize (GObject * object);
731

732 733 734 735
static void gst_input_selector_set_property (GObject * object,
    guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_input_selector_get_property (GObject * object,
    guint prop_id, GValue * value, GParamSpec * pspec);
736

737
static GstPad *gst_input_selector_request_new_pad (GstElement * element,
738
    GstPadTemplate * templ, const gchar * unused, const GstCaps * caps);
739
static void gst_input_selector_release_pad (GstElement * element, GstPad * pad);
740

741 742
static GstStateChangeReturn gst_input_selector_change_state (GstElement *
    element, GstStateChange transition);
743

744 745
static gboolean gst_input_selector_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
746 747
static gboolean gst_input_selector_query (GstPad * pad, GstObject * parent,
    GstQuery * query);
748 749
static gint64 gst_input_selector_block (GstInputSelector * self);

750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781
/* FIXME: create these marshallers using glib-genmarshal */
static void
gst_input_selector_marshal_INT64__VOID (GClosure * closure,
    GValue * return_value G_GNUC_UNUSED,
    guint n_param_values,
    const GValue * param_values,
    gpointer invocation_hint G_GNUC_UNUSED, gpointer marshal_data)
{
  typedef gint64 (*GMarshalFunc_INT64__VOID) (gpointer data1, gpointer data2);
  register GMarshalFunc_INT64__VOID callback;
  register GCClosure *cc = (GCClosure *) closure;
  register gpointer data1, data2;
  gint64 v_return;

  g_return_if_fail (return_value != NULL);
  g_return_if_fail (n_param_values == 1);

  if (G_CCLOSURE_SWAP_DATA (closure)) {
    data1 = closure->data;
    data2 = g_value_peek_pointer (param_values + 0);
  } else {
    data1 = g_value_peek_pointer (param_values + 0);
    data2 = closure->data;
  }
  callback =
      (GMarshalFunc_INT64__VOID) (marshal_data ? marshal_data : cc->callback);

  v_return = callback (data1, data2);

  g_value_set_int64 (return_value, v_return);
}

782
#define _do_init \
783
    GST_DEBUG_CATEGORY_INIT (input_selector_debug, \
784
        "input-selector", 0, "An input stream selector element");
785 786 787
#define gst_input_selector_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstInputSelector, gst_input_selector, GST_TYPE_ELEMENT,
    _do_init);
788 789 790 791 792 793 794

static void
gst_input_selector_class_init (GstInputSelectorClass * klass)
{
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);

795
  gobject_class->dispose = gst_input_selector_dispose;
Wim Taymans's avatar
Wim Taymans committed
796
  gobject_class->finalize = gst_input_selector_finalize;
797

798 799
  gobject_class->set_property = gst_input_selector_set_property;
  gobject_class->get_property = gst_input_selector_get_property;
800 801 802

  g_object_class_install_property (gobject_class, PROP_N_PADS,
      g_param_spec_uint ("n-pads", "Number of Pads",
803 804
          "The number of sink pads", 0, G_MAXUINT, 0,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
805

806
  g_object_class_install_property (gobject_class, PROP_ACTIVE_PAD,
807
      g_param_spec_object ("active-pad", "Active pad",
808 809
          "The currently active sink pad", GST_TYPE_PAD,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
810

811 812 813 814 815 816 817 818
  /**
   * GstInputSelector:sync-streams
   *
   * If set to %TRUE all inactive streams will be synced to the
   * running time of the active stream. This makes sure that no
   * buffers are dropped by input-selector that might be needed
   * when switching the active pad.
   *
819
   * Since: 0.10.36
820 821 822 823 824 825
   */
  g_object_class_install_property (gobject_class, PROP_SYNC_STREAMS,
      g_param_spec_boolean ("sync-streams", "Sync Streams",
          "Synchronize inactive streams to the running time of the active stream",
          DEFAULT_SYNC_STREAMS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

826 827 828 829 830 831 832 833 834
  /**
   * GstInputSelector::block:
   * @inputselector: the #GstInputSelector
   *
   * Block all sink pads in preparation for a switch. Returns the stop time of
   * the current switch segment, as a running time, or 0 if there is no current
   * active pad or the current active pad never received data.
   */
  gst_input_selector_signals[SIGNAL_BLOCK] =
835 836 837
      g_signal_new ("block", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
      G_STRUCT_OFFSET (GstInputSelectorClass, block), NULL, NULL,
838
      gst_input_selector_marshal_INT64__VOID, G_TYPE_INT64, 0);
839

840 841 842 843 844 845 846 847 848 849
  gst_element_class_set_details_simple (gstelement_class, "Input selector",
      "Generic", "N-to-1 input stream selector",
      "Julien Moutte <julien@moutte.net>, "
      "Jan Schmidt <thaytan@mad.scientist.com>, "
      "Wim Taymans <wim.taymans@gmail.com>");
  gst_element_class_add_pad_template (gstelement_class,
      gst_static_pad_template_get (&gst_input_selector_sink_factory));
  gst_element_class_add_pad_template (gstelement_class,
      gst_static_pad_template_get (&gst_input_selector_src_factory));

850 851 852
  gstelement_class->request_new_pad = gst_input_selector_request_new_pad;
  gstelement_class->release_pad = gst_input_selector_release_pad;
  gstelement_class->change_state = gst_input_selector_change_state;
853 854 855 856 857

  klass->block = GST_DEBUG_FUNCPTR (gst_input_selector_block);
}

static void
858
gst_input_selector_init (GstInputSelector * sel)
859 860
{
  sel->srcpad = gst_pad_new ("src", GST_PAD_SRC);
861 862
  gst_pad_set_iterate_internal_links_function (sel->srcpad,
      GST_DEBUG_FUNCPTR (gst_selector_pad_iterate_linked_pads));
863 864
  gst_pad_set_query_function (sel->srcpad,
      GST_DEBUG_FUNCPTR (gst_input_selector_query));
865 866
  gst_pad_set_event_function (sel->srcpad,
      GST_DEBUG_FUNCPTR (gst_input_selector_event));
867
  GST_OBJECT_FLAG_SET (sel->srcpad, GST_PAD_FLAG_PROXY_CAPS);
868 869 870
  gst_element_add_pad (GST_ELEMENT (sel), sel->srcpad);
  /* sinkpad management */
  sel->active_sinkpad = NULL;
871
  sel->padcount = 0;
872
  sel->sync_streams = DEFAULT_SYNC_STREAMS;
873

Wim Taymans's avatar
Wim Taymans committed
874 875
  g_mutex_init (&sel->lock);
  g_cond_init (&sel->cond);
876 877 878 879 880 881 882 883 884 885 886 887 888 889 890
  sel->blocked = FALSE;
}

static void
gst_input_selector_dispose (GObject * object)
{
  GstInputSelector *sel = GST_INPUT_SELECTOR (object);

  if (sel->active_sinkpad) {
    gst_object_unref (sel->active_sinkpad);
    sel->active_sinkpad = NULL;
  }
  G_OBJECT_CLASS (parent_class)->dispose (object);
}

Wim Taymans's avatar
Wim Taymans committed
891 892 893 894 895 896 897 898 899 900 901
static void
gst_input_selector_finalize (GObject * object)
{
  GstInputSelector *sel = GST_INPUT_SELECTOR (object);

  g_mutex_clear (&sel->lock);
  g_cond_clear (&sel->cond);

  G_OBJECT_CLASS (parent_class)->finalize (object);
}

902 903 904
/* this function must be called with the SELECTOR_LOCK. It returns TRUE when the
 * active pad changed. */
static gboolean
905
gst_input_selector_set_active_pad (GstInputSelector * self, GstPad * pad)
906 907 908 909 910
{
  GstSelectorPad *old, *new;
  GstPad **active_pad_p;

  if (pad == self->active_sinkpad)
911
    return FALSE;
912 913 914 915

  old = GST_SELECTOR_PAD_CAST (self->active_sinkpad);
  new = GST_SELECTOR_PAD_CAST (pad);

916 917 918
  GST_DEBUG_OBJECT (self, "setting active pad to %s:%s",
      GST_DEBUG_PAD_NAME (new));

919 920 921 922
  if (old)
    old->pushed = FALSE;
  if (new)
    new->pushed = FALSE;
923

924 925 926 927
  /* Send a new SEGMENT event on the new pad next */
  if (old != new && new)
    new->segment_pending = TRUE;

928 929
  active_pad_p = &self->active_sinkpad;
  gst_object_replace ((GstObject **) active_pad_p, GST_OBJECT_CAST (pad));
930 931 932 933 934 935

  /* Wake up all non-active pads in sync mode, they might be
   * the active pad now */
  if (self->sync_streams)
    GST_INPUT_SELECTOR_BROADCAST (self);

936 937 938
  GST_DEBUG_OBJECT (self, "New active pad is %" GST_PTR_FORMAT,
      self->active_sinkpad);

939
  return TRUE;
940 941 942 943 944 945 946 947 948 949
}

static void
gst_input_selector_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec)
{
  GstInputSelector *sel = GST_INPUT_SELECTOR (object);

  switch (prop_id) {
    case PROP_ACTIVE_PAD:
950 951 952 953 954
    {
      GstPad *pad;

      pad = g_value_get_object (value);

955
      GST_INPUT_SELECTOR_LOCK (sel);
956
      gst_input_selector_set_active_pad (sel, pad);
957
      GST_INPUT_SELECTOR_UNLOCK (sel);
958
      break;
959
    }
960 961 962 963 964 965 966
    case PROP_SYNC_STREAMS:
    {
      GST_INPUT_SELECTOR_LOCK (sel);
      sel->sync_streams = g_value_get_boolean (value);
      GST_INPUT_SELECTOR_UNLOCK (sel);
      break;
    }
967 968 969 970 971 972 973 974 975 976 977 978 979
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

static void
gst_input_selector_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec)
{
  GstInputSelector *sel = GST_INPUT_SELECTOR (object);

  switch (prop_id) {
980
    case PROP_N_PADS:
981
      GST_INPUT_SELECTOR_LOCK (object);
982
      g_value_set_uint (value, sel->n_pads);
983
      GST_INPUT_SELECTOR_UNLOCK (object);
984 985
      break;
    case PROP_ACTIVE_PAD:
986
      GST_INPUT_SELECTOR_LOCK (object);
987
      g_value_set_object (value, sel->active_sinkpad);
988
      GST_INPUT_SELECTOR_UNLOCK (object);
989
      break;
990 991 992 993 994
    case PROP_SYNC_STREAMS:
      GST_INPUT_SELECTOR_LOCK (object);
      g_value_set_boolean (value, sel->sync_streams);
      GST_INPUT_SELECTOR_UNLOCK (object);
      break;
995 996 997 998 999 1000 1001
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

static GstPad *
1002 1003
gst_input_selector_get_linked_pad (GstInputSelector * sel, GstPad * pad,
    gboolean strict)
1004 1005 1006
{
  GstPad *otherpad = NULL;

1007
  GST_INPUT_SELECTOR_LOCK (sel);
1008 1009 1010 1011 1012 1013
  if (pad == sel->srcpad)
    otherpad = sel->active_sinkpad;
  else if (pad == sel->active_sinkpad || !strict)
    otherpad = sel->srcpad;
  if (otherpad)
    gst_object_ref (otherpad);
1014 1015
  GST_INPUT_SELECTOR_UNLOCK (sel);

1016 1017 1018
  return otherpad;
}

1019
static gboolean
1020
gst_input_selector_event (GstPad * pad, GstObject * parent, GstEvent * event)
1021
{
1022
  GstInputSelector *sel;
1023 1024 1025 1026 1027 1028
  gboolean result = FALSE;
  GstIterator *iter;
  gboolean done = FALSE;
  GValue item = { 0, };
  GstPad *eventpad;
  GList *pushed_pads = NULL;
1029

1030
  sel = GST_INPUT_SELECTOR (parent);
1031 1032
  /* Send upstream events to all sinkpads */
  iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (sel));
1033

1034 1035 1036 1037 1038 1039
  /* This is now essentially a copy of gst_pad_event_default_dispatch
   * with a different iterator */
  while (!done) {
    switch (gst_iterator_next (iter, &item)) {
      case GST_ITERATOR_OK:
        eventpad = g_value_get_object (&item);
1040

1041 1042 1043 1044 1045 1046
        /* if already pushed,  skip */
        if (g_list_find (pushed_pads, eventpad)) {
          g_value_reset (&item);
          break;
        }

1047 1048
        gst_event_ref (event);
        result |= gst_pad_push_event (eventpad, event);
1049 1050 1051 1052 1053 1054 1055 1056 1057 1058

        g_value_reset (&item);
        break;
      case GST_ITERATOR_RESYNC:
        /* We don't reset the result here because we don't push the event
         * again on pads that got the event already and because we need
         * to consider the result of the previous pushes */
        gst_iterator_resync (iter);
        break;
      case GST_ITERATOR_ERROR:
1059
        GST_ERROR_OBJECT (pad, "Could not iterate over sinkpads");
1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074
        done = TRUE;
        break;
      case GST_ITERATOR_DONE:
        done = TRUE;
        break;
    }
  }
  g_value_unset (&item);
  gst_iterator_free (iter);

  g_list_free (pushed_pads);

  gst_event_unref (event);

  return result;
1075 1076
}

1077 1078 1079
/* query on the srcpad. We override this function because by default it will
 * only forward the query to one random sinkpad */
static gboolean
1080
gst_input_selector_query (GstPad * pad, GstObject * parent, GstQuery * query)
1081
{
Wim Taymans's avatar
Wim Taymans committed
1082
  gboolean res = FALSE;
1083 1084
  GstInputSelector *sel;