/* GStreamer
 * Copyright (C) 2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include "gststreamsynchronizer.h"

/* Debug category of this element; GST_CAT_DEFAULT makes it the category
 * used by the logging macros in the rest of this file. */
GST_DEBUG_CATEGORY_STATIC (stream_synchronizer_debug);
#define GST_CAT_DEFAULT stream_synchronizer_debug

/* Take the synchronizer mutex (the "lock" member of @obj), logging the
 * calling thread both before and after the mutex has been acquired. */
#define GST_STREAM_SYNCHRONIZER_LOCK(obj) G_STMT_START {                   \
    GST_LOG_OBJECT (obj,                                                \
                    "locking from thread %p",                           \
                    g_thread_self ());                                  \
    g_mutex_lock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);                \
    GST_LOG_OBJECT (obj,                                                \
                    "locked from thread %p",                            \
                    g_thread_self ());                                  \
} G_STMT_END

/* Release the synchronizer mutex (the "lock" member of @obj), logging the
 * calling thread before the mutex is released. */
#define GST_STREAM_SYNCHRONIZER_UNLOCK(obj) G_STMT_START {                 \
    GST_LOG_OBJECT (obj,                                                \
                    "unlocking from thread %p",                         \
                    g_thread_self ());                                  \
    g_mutex_unlock (&GST_STREAM_SYNCHRONIZER_CAST(obj)->lock);              \
} G_STMT_END

46
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
47
48
49
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS_ANY);
50
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
51
52
53
54
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);

/* Register GstStreamSynchronizer as a subclass of GstElement.  The define
 * lets the rest of the file refer to the generated parent class pointer
 * simply as parent_class. */
#define gst_stream_synchronizer_parent_class parent_class
G_DEFINE_TYPE (GstStreamSynchronizer, gst_stream_synchronizer,
    GST_TYPE_ELEMENT);

typedef struct
{
  GstStreamSynchronizer *transform;
  guint stream_number;
  GstPad *srcpad;
  GstPad *sinkpad;
  GstSegment segment;

67
  gboolean wait;                /* TRUE if waiting/blocking */
68
  gboolean new_stream;
69
  gboolean drop_discont;
70
  gboolean is_eos;              /* TRUE if EOS was received */
71
  gboolean seen_data;
72

73
  GCond stream_finish_cond;
74

75
76
77
78
  /* seqnum of the previously received STREAM_START
   * default: G_MAXUINT32 */
  guint32 stream_start_seqnum;
  guint32 segment_seqnum;
79
80
81
} GstStream;

/* Must be called with lock! */
82
static inline GstPad *
83
84
85
86
gst_stream_get_other_pad (GstStream * stream, GstPad * pad)
{
  if (stream->sinkpad == pad)
    return gst_object_ref (stream->srcpad);
87
  if (stream->srcpad == pad)
88
89
90
91
92
93
    return gst_object_ref (stream->sinkpad);

  return NULL;
}

static GstPad *
94
gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad)
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
{
  GstStream *stream;
  GstPad *opad = NULL;

  GST_STREAM_SYNCHRONIZER_LOCK (self);
  stream = gst_pad_get_element_private (pad);
  if (!stream)
    goto out;

  opad = gst_stream_get_other_pad (stream, pad);

out:
  GST_STREAM_SYNCHRONIZER_UNLOCK (self);

  if (!opad)
    GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");

  return opad;
}

/* Generic pad functions */
static GstIterator *
Wim Taymans's avatar
Wim Taymans committed
117
118
gst_stream_synchronizer_iterate_internal_links (GstPad * pad,
    GstObject * parent)
119
120
121
122
{
  GstIterator *it = NULL;
  GstPad *opad;

123
124
  opad =
      gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad);
125
  if (opad) {
126
127
128
129
130
131
    GValue value = { 0, };

    g_value_init (&value, GST_TYPE_PAD);
    g_value_set_object (&value, opad);
    it = gst_iterator_new_single (GST_TYPE_PAD, &value);
    g_value_unset (&value);
132
133
134
135
136
137
138
    gst_object_unref (opad);
  }

  return it;
}

static gboolean
Wim Taymans's avatar
Wim Taymans committed
139
140
gst_stream_synchronizer_query (GstPad * pad, GstObject * parent,
    GstQuery * query)
141
142
143
144
{
  GstPad *opad;
  gboolean ret = FALSE;

145
  GST_LOG_OBJECT (pad, "Handling query %s", GST_QUERY_TYPE_NAME (query));
146

147
148
  opad =
      gst_stream_get_other_pad_from_pad (GST_STREAM_SYNCHRONIZER (parent), pad);
149
150
151
152
153
154
155
156
157
158
  if (opad) {
    ret = gst_pad_peer_query (opad, query);
    gst_object_unref (opad);
  }

  return ret;
}

/* srcpad functions */
static gboolean
Wim Taymans's avatar
Wim Taymans committed
159
160
gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
    GstEvent * event)
161
{
Wim Taymans's avatar
Wim Taymans committed
162
  GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
163
164
165
166
  GstPad *opad;
  gboolean ret = FALSE;

  GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
167
      GST_EVENT_TYPE_NAME (event), event);
168
169
170
171
172
173

  switch (GST_EVENT_TYPE (event)) {
    case GST_EVENT_QOS:{
      gdouble proportion;
      GstClockTimeDiff diff;
      GstClockTime timestamp;
174
      gint64 running_time_diff = -1;
175
176
      GstStream *stream;

Wim Taymans's avatar
Wim Taymans committed
177
      gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
178
179
180
181
182
      gst_event_unref (event);

      GST_STREAM_SYNCHRONIZER_LOCK (self);
      stream = gst_pad_get_element_private (pad);
      if (stream)
183
        running_time_diff = stream->segment.base;
184
185
186
187
188
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);

      if (running_time_diff == -1) {
        GST_WARNING_OBJECT (pad, "QOS event before group start");
        goto out;
189
190
      }
      if (timestamp < running_time_diff) {
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
        GST_DEBUG_OBJECT (pad, "QOS event from previous group");
        goto out;
      }

      GST_LOG_OBJECT (pad,
          "Adjusting QOS event: %" GST_TIME_FORMAT " - %" GST_TIME_FORMAT " = %"
          GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
          GST_TIME_ARGS (running_time_diff),
          GST_TIME_ARGS (timestamp - running_time_diff));

      timestamp -= running_time_diff;

      /* That case is invalid for QoS events */
      if (diff < 0 && -diff > timestamp) {
        GST_DEBUG_OBJECT (pad, "QOS event from previous group");
        ret = TRUE;
        goto out;
      }

Wim Taymans's avatar
Wim Taymans committed
210
211
212
      event =
          gst_event_new_qos (GST_QOS_TYPE_UNDERFLOW, proportion, diff,
          timestamp);
213
214
215
216
217
218
      break;
    }
    default:
      break;
  }

219
  opad = gst_stream_get_other_pad_from_pad (self, pad);
220
221
222
223
224
225
226
227
228
229
230
  if (opad) {
    ret = gst_pad_push_event (opad, event);
    gst_object_unref (opad);
  }

out:
  return ret;
}

/* sinkpad functions */
static gboolean
Wim Taymans's avatar
Wim Taymans committed
231
232
gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
    GstEvent * event)
233
{
Wim Taymans's avatar
Wim Taymans committed
234
  GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
235
236
237
238
  GstPad *opad;
  gboolean ret = FALSE;

  GST_LOG_OBJECT (pad, "Handling event %s: %" GST_PTR_FORMAT,
239
      GST_EVENT_TYPE_NAME (event), event);
240
241

  switch (GST_EVENT_TYPE (event)) {
242
243
244
    case GST_EVENT_STREAM_START:
    {
      GstStream *stream;
245
246
247
      guint32 seqnum = gst_event_get_seqnum (event);
      GList *l;
      gboolean all_wait = TRUE;
248
249
250

      GST_STREAM_SYNCHRONIZER_LOCK (self);
      stream = gst_pad_get_element_private (pad);
251
      if (stream && stream->stream_start_seqnum != seqnum) {
252
253

        GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
254

255
256
257
        stream->is_eos = FALSE;
        stream->wait = TRUE;
        stream->new_stream = TRUE;
258

259
260
        for (l = self->streams; l; l = l->next) {
          GstStream *ostream = l->data;
261

262
263
264
265
266
267
          all_wait = all_wait && ostream->wait;
          if (!all_wait)
            break;
        }
        if (all_wait) {
          gint64 position = 0;
268

269
          GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
270
271
272

          for (l = self->streams; l; l = l->next) {
            GstStream *ostream = l->data;
273
274
275
276
277
            gint64 stop_running_time;
            gint64 position_running_time;

            ostream->wait = FALSE;

278
279
280
281
282
283
284
285
286
287
288
            if (ostream->segment.format == GST_FORMAT_TIME) {
              stop_running_time =
                  gst_segment_to_running_time (&ostream->segment,
                  GST_FORMAT_TIME, ostream->segment.stop);
              position_running_time =
                  gst_segment_to_running_time (&ostream->segment,
                  GST_FORMAT_TIME, ostream->segment.position);
              position =
                  MAX (position, MAX (stop_running_time,
                      position_running_time));
            }
289
          }
290
291
292
293
294
295
          position = MAX (0, position);
          self->group_start_time = MAX (self->group_start_time, position);

          GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
              GST_TIME_ARGS (self->group_start_time));

296
297
          for (l = self->streams; l; l = l->next) {
            GstStream *ostream = l->data;
298
            g_cond_broadcast (&ostream->stream_finish_cond);
299
          }
300
        }
301
302
      } else
        GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source");
303
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
304
    }
305
      break;
Wim Taymans's avatar
Wim Taymans committed
306
    case GST_EVENT_SEGMENT:{
307
      GstStream *stream;
Wim Taymans's avatar
Wim Taymans committed
308
      GstSegment segment;
309

310
      gst_event_copy_segment (event, &segment);
311
312
313
314
315
316

      GST_STREAM_SYNCHRONIZER_LOCK (self);
      stream = gst_pad_get_element_private (pad);
      if (stream) {
        if (stream->wait) {
          GST_DEBUG_OBJECT (pad, "Stream %d is waiting", stream->stream_number);
317
          g_cond_wait (&stream->stream_finish_cond, &self->lock);
318
319
320
321
322
323
          stream = gst_pad_get_element_private (pad);
          if (stream)
            stream->wait = FALSE;
        }
      }

324
325
326
327
328
329
      if (self->shutdown) {
        GST_STREAM_SYNCHRONIZER_UNLOCK (self);
        gst_event_unref (event);
        goto done;
      }

Wim Taymans's avatar
Wim Taymans committed
330
      if (stream && segment.format == GST_FORMAT_TIME) {
331
332
        if (stream->new_stream) {
          stream->new_stream = FALSE;
333
          stream->drop_discont = TRUE;
334
          segment.base = self->group_start_time;
335
336
337
338
        }

        GST_DEBUG_OBJECT (pad, "Segment was: %" GST_SEGMENT_FORMAT,
            &stream->segment);
Wim Taymans's avatar
Wim Taymans committed
339
        gst_segment_copy_into (&segment, &stream->segment);
340
341
        GST_DEBUG_OBJECT (pad, "Segment now is: %" GST_SEGMENT_FORMAT,
            &stream->segment);
342
        stream->segment_seqnum = gst_event_get_seqnum (event);
343
344

        GST_DEBUG_OBJECT (pad, "Stream start running time: %" GST_TIME_FORMAT,
Wim Taymans's avatar
Wim Taymans committed
345
            GST_TIME_ARGS (stream->segment.base));
346
347
348
349
        {
          GstEvent *tmpev;

          tmpev = gst_event_new_segment (&stream->segment);
350
          gst_event_set_seqnum (tmpev, stream->segment_seqnum);
351
352
353
354
          gst_event_unref (event);
          event = tmpev;
        }

355
      } else if (stream) {
356
        GST_WARNING_OBJECT (pad, "Non-TIME segment: %s",
Wim Taymans's avatar
Wim Taymans committed
357
            gst_format_get_name (segment.format));
358
359
360
361
362
        gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
      }
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
      break;
    }
363
364
365
366
367
368
369
    case GST_EVENT_FLUSH_START:{
      GstStream *stream;

      GST_STREAM_SYNCHRONIZER_LOCK (self);
      stream = gst_pad_get_element_private (pad);
      if (stream) {
        GST_DEBUG_OBJECT (pad, "Flushing streams");
370
        g_cond_broadcast (&stream->stream_finish_cond);
371
372
373
374
      }
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
      break;
    }
375
376
377
378
379
380
381
382
383
    case GST_EVENT_FLUSH_STOP:{
      GstStream *stream;

      GST_STREAM_SYNCHRONIZER_LOCK (self);
      stream = gst_pad_get_element_private (pad);
      if (stream) {
        GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
            stream->stream_number);
        gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
384

385
        stream->is_eos = FALSE;
386
387
388
        stream->wait = FALSE;
        stream->new_stream = FALSE;
        stream->drop_discont = FALSE;
389
        stream->seen_data = FALSE;
390
        g_cond_broadcast (&stream->stream_finish_cond);
391
392
393
394
      }
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
      break;
    }
395
396
397
398
    case GST_EVENT_EOS:{
      GstStream *stream;
      GList *l;
      gboolean all_eos = TRUE;
399
      gboolean seen_data;
400
      GSList *pads = NULL;
401
      GstPad *srcpad;
402
403
404

      GST_STREAM_SYNCHRONIZER_LOCK (self);
      stream = gst_pad_get_element_private (pad);
405
406
407
408
      if (!stream) {
        GST_STREAM_SYNCHRONIZER_UNLOCK (self);
        GST_WARNING_OBJECT (pad, "EOS for unknown stream");
        break;
409
410
      }

411
412
413
      GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
      stream->is_eos = TRUE;

414
415
416
      seen_data = stream->seen_data;
      srcpad = gst_object_ref (stream->srcpad);

417
418
419
420
421
422
423
424
425
426
427
428
      for (l = self->streams; l; l = l->next) {
        GstStream *ostream = l->data;

        all_eos = all_eos && ostream->is_eos;
        if (!all_eos)
          break;
      }

      if (all_eos) {
        GST_DEBUG_OBJECT (self, "All streams are EOS -- forwarding");
        for (l = self->streams; l; l = l->next) {
          GstStream *ostream = l->data;
429
430
431
          /* local snapshot of current pads */
          gst_object_ref (ostream->srcpad);
          pads = g_slist_prepend (pads, ostream->srcpad);
432
433
434
        }
      }
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
435
436
437
438
      /* drop lock when sending eos, which may block in e.g. preroll */
      if (pads) {
        GstPad *pad;
        GSList *epad;
439

440
441
442
443
444
445
446
447
448
449
        ret = TRUE;
        epad = pads;
        while (epad) {
          pad = epad->data;
          GST_DEBUG_OBJECT (pad, "Pushing EOS");
          ret = ret && gst_pad_push_event (pad, gst_event_new_eos ());
          gst_object_unref (pad);
          epad = g_slist_next (epad);
        }
        g_slist_free (pads);
450
451
452
453
      } else {
        /* if EOS, but no data has passed, then send something to replace EOS
         * for preroll purposes */
        if (!seen_data) {
454
          GstEvent *event;
455

456
457
          event = gst_event_new_gap (0, 0);
          gst_pad_push_event (srcpad, event);
458
        }
459
      }
460
      gst_object_unref (srcpad);
461
462
463
      goto done;
      break;
    }
464
465
466
467
    default:
      break;
  }

468
  opad = gst_stream_get_other_pad_from_pad (self, pad);
469
470
471
472
473
  if (opad) {
    ret = gst_pad_push_event (opad, event);
    gst_object_unref (opad);
  }

474
done:
475
476
477
478
479

  return ret;
}

static GstFlowReturn
Wim Taymans's avatar
Wim Taymans committed
480
481
gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
    GstBuffer * buffer)
482
{
Wim Taymans's avatar
Wim Taymans committed
483
  GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (parent);
484
485
486
487
488
489
  GstPad *opad;
  GstFlowReturn ret = GST_FLOW_ERROR;
  GstStream *stream;
  GstClockTime timestamp = GST_CLOCK_TIME_NONE;
  GstClockTime timestamp_end = GST_CLOCK_TIME_NONE;

Josep Torra's avatar
Josep Torra committed
490
491
  GST_LOG_OBJECT (pad, "Handling buffer %p: size=%" G_GSIZE_FORMAT
      ", timestamp=%" GST_TIME_FORMAT " duration=%" GST_TIME_FORMAT
492
      " offset=%" G_GUINT64_FORMAT " offset_end=%" G_GUINT64_FORMAT,
493
      buffer, gst_buffer_get_size (buffer),
494
495
496
497
498
499
500
501
502
503
504
505
      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
      GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)),
      GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET_END (buffer));

  timestamp = GST_BUFFER_TIMESTAMP (buffer);
  if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)
      && GST_BUFFER_DURATION_IS_VALID (buffer))
    timestamp_end = timestamp + GST_BUFFER_DURATION (buffer);

  GST_STREAM_SYNCHRONIZER_LOCK (self);
  stream = gst_pad_get_element_private (pad);

506
  if (stream) {
507
    stream->seen_data = TRUE;
508
509
510
511
512
    if (stream->drop_discont) {
      buffer = gst_buffer_make_writable (buffer);
      GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
      stream->drop_discont = FALSE;
    }
513

514
515
516
517
518
519
520
    if (stream->segment.format == GST_FORMAT_TIME
        && GST_CLOCK_TIME_IS_VALID (timestamp)) {
      GST_LOG_OBJECT (pad,
          "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
          GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
      stream->segment.position = timestamp;
    }
521
522
523
  }
  GST_STREAM_SYNCHRONIZER_UNLOCK (self);

524
  opad = gst_stream_get_other_pad_from_pad (self, pad);
525
526
527
528
529
530
531
  if (opad) {
    ret = gst_pad_push (opad, buffer);
    gst_object_unref (opad);
  }

  GST_LOG_OBJECT (pad, "Push returned: %s", gst_flow_get_name (ret));
  if (ret == GST_FLOW_OK) {
532
533
    GList *l;

534
535
536
537
538
    GST_STREAM_SYNCHRONIZER_LOCK (self);
    stream = gst_pad_get_element_private (pad);
    if (stream && stream->segment.format == GST_FORMAT_TIME
        && GST_CLOCK_TIME_IS_VALID (timestamp_end)) {
      GST_LOG_OBJECT (pad,
539
          "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
Wim Taymans's avatar
Wim Taymans committed
540
          GST_TIME_ARGS (stream->segment.position),
541
          GST_TIME_ARGS (timestamp_end));
Wim Taymans's avatar
Wim Taymans committed
542
      stream->segment.position = timestamp_end;
543
    }
544
545
546
547
548

    /* Advance EOS streams if necessary. For non-EOS
     * streams the demuxers should already do this! */
    for (l = self->streams; l; l = l->next) {
      GstStream *ostream = l->data;
Wim Taymans's avatar
Wim Taymans committed
549
      gint64 position;
550
551
552
553

      if (!ostream->is_eos || ostream->segment.format != GST_FORMAT_TIME)
        continue;

Wim Taymans's avatar
Wim Taymans committed
554
555
      if (ostream->segment.position != -1)
        position = ostream->segment.position;
556
      else
Wim Taymans's avatar
Wim Taymans committed
557
        position = ostream->segment.start;
558
559

      /* Is there a 1 second lag? */
Wim Taymans's avatar
Wim Taymans committed
560
      if (position != -1 && position + GST_SECOND < timestamp_end) {
561
        gint64 new_start, new_stop;
562
563

        new_start = timestamp_end - GST_SECOND;
564
565
566
567
        if (ostream->segment.stop == -1)
          new_stop = -1;
        else
          new_stop = MAX (new_start, ostream->segment.stop);
568
569

        GST_DEBUG_OBJECT (ostream->sinkpad,
570
            "Advancing stream %u from %" GST_TIME_FORMAT " to %"
Wim Taymans's avatar
Wim Taymans committed
571
            GST_TIME_FORMAT, ostream->stream_number, GST_TIME_ARGS (position),
572
            GST_TIME_ARGS (new_start));
573

Wim Taymans's avatar
Wim Taymans committed
574
575
576
577
578
        ostream->segment.start = new_start;
        ostream->segment.stop = new_stop;
        ostream->segment.time = new_start;
        ostream->segment.position = new_start;

579
        gst_pad_push_event (ostream->srcpad,
Wim Taymans's avatar
Wim Taymans committed
580
            gst_event_new_segment (&ostream->segment));
581
582
      }
    }
583
584
585
586
587
588
589
590
591
    GST_STREAM_SYNCHRONIZER_UNLOCK (self);
  }

  return ret;
}

/* GstElement vfuncs */
static GstPad *
gst_stream_synchronizer_request_new_pad (GstElement * element,
592
    GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
593
594
595
596
597
598
599
600
601
602
603
604
{
  GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
  GstStream *stream;
  gchar *tmp;

  GST_STREAM_SYNCHRONIZER_LOCK (self);
  GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
      self->current_stream_number);

  stream = g_slice_new0 (GstStream);
  stream->transform = self;
  stream->stream_number = self->current_stream_number;
605
606
607
  g_cond_init (&stream->stream_finish_cond);
  stream->stream_start_seqnum = G_MAXUINT32;
  stream->segment_seqnum = G_MAXUINT32;
608

609
  tmp = g_strdup_printf ("sink_%u", self->current_stream_number);
610
611
612
613
614
615
616
617
618
619
620
621
  stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
  g_free (tmp);
  gst_pad_set_element_private (stream->sinkpad, stream);
  gst_pad_set_iterate_internal_links_function (stream->sinkpad,
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
  gst_pad_set_query_function (stream->sinkpad,
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
  gst_pad_set_event_function (stream->sinkpad,
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_event));
  gst_pad_set_chain_function (stream->sinkpad,
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_sink_chain));

622
  tmp = g_strdup_printf ("src_%u", self->current_stream_number);
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
  stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
  g_free (tmp);
  gst_pad_set_element_private (stream->srcpad, stream);
  gst_pad_set_iterate_internal_links_function (stream->srcpad,
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
  gst_pad_set_query_function (stream->srcpad,
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_query));
  gst_pad_set_event_function (stream->srcpad,
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_src_event));

  gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);

  self->streams = g_list_prepend (self->streams, stream);
  self->current_stream_number++;
  GST_STREAM_SYNCHRONIZER_UNLOCK (self);

  /* Add pads and activate unless we're going to NULL */
Wim Taymans's avatar
Wim Taymans committed
640
  g_rec_mutex_lock (GST_STATE_GET_LOCK (self));
641
642
643
644
645
646
  if (GST_STATE_TARGET (self) != GST_STATE_NULL) {
    gst_pad_set_active (stream->srcpad, TRUE);
    gst_pad_set_active (stream->sinkpad, TRUE);
  }
  gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad);
  gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
Wim Taymans's avatar
Wim Taymans committed
647
  g_rec_mutex_unlock (GST_STATE_GET_LOCK (self));
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668

  return stream->sinkpad;
}

/* Must be called with lock!
 *
 * Removes @stream from the element: unlinks it from the stream list,
 * detaches it from its pads, deactivates and removes both pads, raises the
 * group start time to the running time the stream had reached, and frees
 * the stream.
 *
 * The lock is dropped while the pads are deactivated and removed, and is
 * held again when the function returns. */
static void
gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
    GstStream * stream)
{
  GList *l;

  GST_DEBUG_OBJECT (self, "Releasing stream %d", stream->stream_number);

  for (l = self->streams; l; l = l->next) {
    if (l->data == stream) {
      self->streams = g_list_delete_link (self->streams, l);
      break;
    }
  }
  g_assert (l != NULL);

  /* we can drop the lock, since stream exists now only local.
   * Moreover, we should drop, to prevent deadlock with STREAM_LOCK
   * (due to reverse lock order) when deactivating pads */
  GST_STREAM_SYNCHRONIZER_UNLOCK (self);

  gst_pad_set_element_private (stream->srcpad, NULL);
  gst_pad_set_element_private (stream->sinkpad, NULL);
  gst_pad_set_active (stream->srcpad, FALSE);
  gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
  gst_pad_set_active (stream->sinkpad, FALSE);
  gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);

  /* take the lock again: the caller expects to hold it on return, and
   * group_start_time is shared with the streaming threads */
  GST_STREAM_SYNCHRONIZER_LOCK (self);

  if (stream->segment.format == GST_FORMAT_TIME) {
    gint64 stop_running_time;
    gint64 position_running_time;

    stop_running_time =
        gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
        stream->segment.stop);
    position_running_time =
        gst_segment_to_running_time (&stream->segment, GST_FORMAT_TIME,
        stream->segment.position);
    stop_running_time = MAX (stop_running_time, position_running_time);

    if (stop_running_time > self->group_start_time) {
      GST_DEBUG_OBJECT (stream->sinkpad,
          "Updating global start running time from %" GST_TIME_FORMAT " to %"
          GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time),
          GST_TIME_ARGS (stop_running_time));

      self->group_start_time = stop_running_time;
    }
  }

  g_cond_clear (&stream->stream_finish_cond);
  g_slice_free (GstStream, stream);

  /* NOTE: In theory we have to check here if all streams
   * are EOS but the one that was removed wasn't and then
   * send EOS downstream. But due to the way how playsink
   * works this is not necessary and will only cause problems
   * for gapless playback. playsink will only add/remove pads
   * when it's reconfigured, which happens when the streams
   * change
   */
}

/* release_pad implementation: releases the stream attached to the request
 * (sink) pad @pad, if the pad still has one. */
static void
gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
{
  GstStreamSynchronizer *sync = GST_STREAM_SYNCHRONIZER (element);
  GstStream *attached;

  GST_STREAM_SYNCHRONIZER_LOCK (sync);

  attached = gst_pad_get_element_private (pad);
  if (attached != NULL) {
    /* only sink pads are request pads */
    g_assert (attached->sinkpad == pad);

    gst_stream_synchronizer_release_stream (sync, attached);
  }

  GST_STREAM_SYNCHRONIZER_UNLOCK (sync);
}

/* change_state implementation.
 *
 * Before chaining up: clears the shutdown flag when going upwards, resets
 * the group start time on READY->PAUSED and, on PAUSED->READY, sets the
 * shutdown flag and wakes every stream that may be blocked in the SEGMENT
 * handler.
 * After a successful parent state change: PAUSED->READY resets the group
 * start time and the per-stream state, READY->NULL resets the stream
 * counter.
 *
 * Returns the result of the parent class' state change. */
static GstStateChangeReturn
gst_stream_synchronizer_change_state (GstElement * element,
    GstStateChange transition)
{
  GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
  GstStateChangeReturn ret;

  switch (transition) {
    case GST_STATE_CHANGE_NULL_TO_READY:
      GST_DEBUG_OBJECT (self, "State change NULL->READY");
      self->shutdown = FALSE;
      break;
    case GST_STATE_CHANGE_READY_TO_PAUSED:
      GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
      self->group_start_time = 0;
      self->shutdown = FALSE;
      break;
    case GST_STATE_CHANGE_PAUSED_TO_READY:{
      GList *l;

      GST_DEBUG_OBJECT (self, "State change PAUSED->READY");

      /* unblock waiting streams so the pads can be deactivated */
      GST_STREAM_SYNCHRONIZER_LOCK (self);
      for (l = self->streams; l; l = l->next) {
        GstStream *ostream = l->data;
        g_cond_broadcast (&ostream->stream_finish_cond);
      }
      self->shutdown = TRUE;
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
      break;
    }
    default:
      break;
  }

  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
  GST_DEBUG_OBJECT (self, "Base class state changed returned: %d", ret);
  if (G_UNLIKELY (ret != GST_STATE_CHANGE_SUCCESS))
    return ret;

  switch (transition) {
    case GST_STATE_CHANGE_PAUSED_TO_READY:{
      GList *l;

      GST_DEBUG_OBJECT (self, "State change PAUSED->READY");
      self->group_start_time = 0;

      GST_STREAM_SYNCHRONIZER_LOCK (self);
      for (l = self->streams; l; l = l->next) {
        GstStream *stream = l->data;

        gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
        stream->wait = FALSE;
        stream->new_stream = FALSE;
        stream->drop_discont = FALSE;
        stream->is_eos = FALSE;
      }
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
      break;
    }
    case GST_STATE_CHANGE_READY_TO_NULL:{
      GST_DEBUG_OBJECT (self, "State change READY->NULL");

      GST_STREAM_SYNCHRONIZER_LOCK (self);
      self->current_stream_number = 0;
      GST_STREAM_SYNCHRONIZER_UNLOCK (self);
      break;
    }
    default:
      break;
  }

  return ret;
}

/* GObject vfuncs */

/* finalize implementation: clears the synchronizer mutex and chains up to
 * the parent class. */
static void
gst_stream_synchronizer_finalize (GObject * object)
{
  GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (object);

  g_mutex_clear (&self->lock);

  G_OBJECT_CLASS (parent_class)->finalize (object);
}

/* GObject type initialization */
static void
822
gst_stream_synchronizer_init (GstStreamSynchronizer * self)
823
{
824
  g_mutex_init (&self->lock);
825
826
827
828
829
830
831
832
833
834
835
836
837
}

/* Class initializer: sets up the debug category, installs the finalize
 * handler, registers the pad templates and element metadata, and hooks up
 * the GstElement virtual functions implemented in this file. */
static void
gst_stream_synchronizer_class_init (GstStreamSynchronizerClass * klass)
{
  GObjectClass *gobject_class = (GObjectClass *) klass;
  GstElementClass *element_class = (GstElementClass *) klass;

  GST_DEBUG_CATEGORY_INIT (stream_synchronizer_debug,
      "streamsynchronizer", 0, "Stream Synchronizer");

  gobject_class->finalize = gst_stream_synchronizer_finalize;

  gst_element_class_add_pad_template (element_class,
      gst_static_pad_template_get (&srctemplate));
  gst_element_class_add_pad_template (element_class,
      gst_static_pad_template_get (&sinktemplate));

  gst_element_class_set_static_metadata (element_class,
      "Stream Synchronizer", "Generic",
      "Synchronizes a group of streams to have equal durations and starting points",
      "Sebastian Dröge <sebastian.droege@collabora.co.uk>");

  element_class->change_state =
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_change_state);
  element_class->request_new_pad =
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_request_new_pad);
  element_class->release_pad =
      GST_DEBUG_FUNCPTR (gst_stream_synchronizer_release_pad);
}