gstoptimalscheduler.c 65.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* GStreamer
 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
 *                    2000 Wim Taymans <wtay@chello.be>
 *
 * gstoptimalscheduler.c: Default scheduling code for most cases
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

23
24
25
26
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

27
28
#include <gst/gst.h>

29
30
31
GST_DEBUG_CATEGORY_STATIC(debug_scheduler);
#define GST_CAT_DEFAULT debug_scheduler

32
33
34
35
36
37
#ifdef USE_COTHREADS
# include "cothreads_compat.h"
#else
# define COTHREADS_NAME_CAPITAL ""
# define COTHREADS_NAME 	""
#endif
38

39
/* The element's sched_private slot, viewed as this scheduler's context. */
#define GST_ELEMENT_SCHED_CONTEXT(elem) \
  ((GstOptSchedulerCtx*) (GST_ELEMENT (elem)->sched_private))
/* The group stored in the element's scheduler context (usable as an lvalue). */
#define GST_ELEMENT_SCHED_GROUP(elem) \
  (GST_ELEMENT_SCHED_CONTEXT (elem)->group)
/* The pad's sched_private slot, viewed as a GList. */
#define GST_PAD_BUFLIST(pad) \
  ((GList*) (GST_REAL_PAD(pad)->sched_private))
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

/* Scheduler-private element flags used by this scheduler. */
#define GST_ELEMENT_COTHREAD_STOPPING			GST_ELEMENT_SCHEDULER_PRIVATE1
#define GST_ELEMENT_IS_COTHREAD_STOPPING(element)	GST_FLAG_IS_SET((element), GST_ELEMENT_COTHREAD_STOPPING)
#define GST_ELEMENT_INTERRUPTED				GST_ELEMENT_SCHEDULER_PRIVATE2
#define GST_ELEMENT_IS_INTERRUPTED(element)		GST_FLAG_IS_SET((element), GST_ELEMENT_INTERRUPTED)

typedef struct _GstOptScheduler GstOptScheduler;
typedef struct _GstOptSchedulerClass GstOptSchedulerClass;

/* GObject type boilerplate for the optimal scheduler. */
#define GST_TYPE_OPT_SCHEDULER \
  (gst_opt_scheduler_get_type())
#define GST_OPT_SCHEDULER(obj) \
  (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_OPT_SCHEDULER,GstOptScheduler))
#define GST_OPT_SCHEDULER_CLASS(klass) \
  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_OPT_SCHEDULER,GstOptSchedulerClass))
#define GST_IS_OPT_SCHEDULER(obj) \
  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_OPT_SCHEDULER))
/* The parameter must be named klass: the expansion refers to (klass), so any
 * other parameter name leaves the argument unused and picks up whatever
 * identifier called klass happens to be in scope at the call site. */
#define GST_IS_OPT_SCHEDULER_CLASS(klass) \
  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_OPT_SCHEDULER))

/* Values stored in GstOptScheduler::state. */
typedef enum {
  GST_OPT_SCHEDULER_STATE_NONE,
  GST_OPT_SCHEDULER_STATE_STOPPED,
  GST_OPT_SCHEDULER_STATE_ERROR,	/* set when a group errors or recursion is exceeded */
  GST_OPT_SCHEDULER_STATE_RUNNING,
  GST_OPT_SCHEDULER_STATE_INTERRUPTED
} GstOptSchedulerState;

struct _GstOptScheduler {
71
  GstScheduler 		 parent;
72

73
  GstOptSchedulerState	 state;
74

75
#ifdef USE_COTHREADS
76
  cothread_context 	*context;
77
#endif
78
  gint 			 iterations;
79

80
81
82
83
84
  GSList 		*elements;
  GSList 		*chains;

  GList			*runqueue;
  gint			 recursion;
85
86

  gint			 max_recursion;
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
};

/* Class structure: adds nothing to the parent scheduler class. */
struct _GstOptSchedulerClass {
  GstSchedulerClass parent_class;
};

/* Registered GType, filled in lazily by gst_opt_scheduler_get_type(). */
static GType _gst_opt_scheduler_type = 0;

/* Bit flags kept in GstOptSchedulerChain::flags. */
typedef enum {
  GST_OPT_SCHEDULER_CHAIN_DIRTY		= (1 << 1),
  GST_OPT_SCHEDULER_CHAIN_DISABLED	= (1 << 2),
  GST_OPT_SCHEDULER_CHAIN_RUNNING	= (1 << 3),
} GstOptSchedulerChainFlags;

/* Set, clear and test the DISABLED bit of a chain. */
#define GST_OPT_SCHEDULER_CHAIN_DISABLE(chain) \
  ((chain)->flags |= GST_OPT_SCHEDULER_CHAIN_DISABLED)
#define GST_OPT_SCHEDULER_CHAIN_ENABLE(chain) \
  ((chain)->flags &= ~GST_OPT_SCHEDULER_CHAIN_DISABLED)
#define GST_OPT_SCHEDULER_CHAIN_IS_DISABLED(chain) \
  ((chain)->flags & GST_OPT_SCHEDULER_CHAIN_DISABLED)

typedef struct _GstOptSchedulerChain GstOptSchedulerChain;

struct _GstOptSchedulerChain {
108
109
  gint				 refcount;
  
110
111
112
113
  GstOptScheduler 		*sched;

  GstOptSchedulerChainFlags	 flags;
  
Wim Taymans's avatar
Wim Taymans committed
114
  GSList 			*groups;			/* the groups in this chain */
115
116
117
  gint				 num_groups;
  gint				 num_enabled;
};
118

119
120
121
122
123
124
125
126
127
/*
 * elements that are scheduled in one cothread
 */
typedef enum {
  /* this group has been modified */
  GST_OPT_SCHEDULER_GROUP_DIRTY			= (1 << 1),
  /* the group's cothread stops after one iteration */
  GST_OPT_SCHEDULER_GROUP_COTHREAD_STOPPING	= (1 << 2),
  /* this group is disabled */
  GST_OPT_SCHEDULER_GROUP_DISABLED		= (1 << 3),
  /* this group is running */
  GST_OPT_SCHEDULER_GROUP_RUNNING		= (1 << 4),
  /* this group is schedulable */
  GST_OPT_SCHEDULER_GROUP_SCHEDULABLE		= (1 << 5),
  /* this group is visited when finding links */
  GST_OPT_SCHEDULER_GROUP_VISITED		= (1 << 6),
} GstOptSchedulerGroupFlags;

/* Kind of group, stored in GstOptSchedulerGroup::type. */
typedef enum {
  GST_OPT_SCHEDULER_GROUP_GET			= 1,
  GST_OPT_SCHEDULER_GROUP_LOOP			= 2,
} GstOptSchedulerGroupType;

136
/* Generic set / clear / test helpers for the group flag word. */
#define GST_OPT_SCHEDULER_GROUP_SET_FLAG(group,flag) \
  ((group)->flags |= (flag))
#define GST_OPT_SCHEDULER_GROUP_UNSET_FLAG(group,flag) \
  ((group)->flags &= ~(flag))
#define GST_OPT_SCHEDULER_GROUP_IS_FLAG_SET(group,flag) \
  ((group)->flags & (flag))

/* Shorthands for the DISABLED bit of a group. */
#define GST_OPT_SCHEDULER_GROUP_DISABLE(group) \
  ((group)->flags |= GST_OPT_SCHEDULER_GROUP_DISABLED)
#define GST_OPT_SCHEDULER_GROUP_ENABLE(group) \
  ((group)->flags &= ~GST_OPT_SCHEDULER_GROUP_DISABLED)
#define GST_OPT_SCHEDULER_GROUP_IS_ENABLED(group) \
  (!((group)->flags & GST_OPT_SCHEDULER_GROUP_DISABLED))
#define GST_OPT_SCHEDULER_GROUP_IS_DISABLED(group) \
  ((group)->flags & GST_OPT_SCHEDULER_GROUP_DISABLED)

Wim Taymans's avatar
Wim Taymans committed
145

146
typedef struct _GstOptSchedulerGroup GstOptSchedulerGroup;
typedef struct _GstOptSchedulerGroupLink GstOptSchedulerGroupLink;

/* used to keep track of links with other groups */
struct _GstOptSchedulerGroupLink {
  GstOptSchedulerGroup	*group1;	/* one end of the link */
  GstOptSchedulerGroup	*group2;	/* the other end of the link */
  gint			 count;		/* the number of links between the two groups */
};

/* TRUE when `link` connects groups g1 and g2, in either order.
 * The parameters are deliberately not named group1/group2: those names would
 * also be substituted into the member accesses link->group1 / link->group2,
 * so the macro only worked when called with variables literally named
 * group1 and group2.  Arguments are evaluated more than once. */
#define IS_GROUP_LINK(link, g1, g2) \
  ((((link)->group1 == (g1)) && ((link)->group2 == (g2))) || \
   (((link)->group2 == (g1)) && ((link)->group1 == (g2))))
/* Given one end of `link`, yields the group at the other end. */
#define OTHER_GROUP_LINK(link, grp) \
  (((link)->group1 == (grp)) ? (link)->group2 : (link)->group1)
159
160
161
162
163
164
165
166

typedef int (*GroupScheduleFunction)	(int argc, char *argv[]);

struct _GstOptSchedulerGroup {
  GstOptSchedulerChain 		*chain;  		/* the chain this group belongs to */
  GstOptSchedulerGroupFlags	 flags;			/* flags for this group */
  GstOptSchedulerGroupType	 type;			/* flags for this group */

167
168
  gint				 refcount;

Wim Taymans's avatar
Wim Taymans committed
169
  GSList 			*elements;		/* elements of this group */
170
171
172
173
  gint				 num_elements;
  gint				 num_enabled;
  GstElement 			*entry;			/* the group's entry point */

Wim Taymans's avatar
Wim Taymans committed
174
  GSList			*group_links;		/* other groups that are linked with this group */
175

176
#ifdef USE_COTHREADS
177
  cothread 			*cothread;		/* the cothread of this group */
178
#else
179
  GroupScheduleFunction 	 schedulefunc;
180
#endif
181
182
183
184
  int				 argc;
  char			       **argv;
};

Wim Taymans's avatar
Wim Taymans committed
185

186
/* some group operations */
Wim Taymans's avatar
Wim Taymans committed
187
static GstOptSchedulerGroup* 	ref_group 			(GstOptSchedulerGroup *group);
188
#ifndef USE_COTHREADS
189
/*
Wim Taymans's avatar
Wim Taymans committed
190
static GstOptSchedulerGroup* 	ref_group_by_count 		(GstOptSchedulerGroup *group, gint count);
191
*/
192
#endif
Wim Taymans's avatar
Wim Taymans committed
193
194
195
196
static GstOptSchedulerGroup* 	unref_group 			(GstOptSchedulerGroup *group);
static void 			destroy_group 			(GstOptSchedulerGroup *group);
static void 			group_element_set_enabled 	(GstOptSchedulerGroup *group, 
							  	 GstElement *element, gboolean enabled);
197

Wim Taymans's avatar
Wim Taymans committed
198
199
static void 			chain_group_set_enabled 	(GstOptSchedulerChain *chain, 
								 GstOptSchedulerGroup *group, gboolean enabled);
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
/*
 * Scheduler private data for an element
 */
typedef struct _GstOptSchedulerCtx GstOptSchedulerCtx;

typedef enum {
  /* the element is disabled */
  GST_OPT_SCHEDULER_CTX_DISABLED		= (1 << 1),
} GstOptSchedulerCtxFlags;

struct _GstOptSchedulerCtx {
  /* the group this element belongs to */
  GstOptSchedulerGroup *group;

  /* flags for this element */
  GstOptSchedulerCtxFlags flags;
};

215
216
217
218
/* GObject property ids installed by the class initialiser. */
enum
{
  ARG_0,
  ARG_ITERATIONS,	/* "iterations" */
  ARG_MAX_RECURSION,	/* "max_recursion" */
};
 
222
223
224
225

static void 		gst_opt_scheduler_class_init 		(GstOptSchedulerClass *klass);
static void 		gst_opt_scheduler_init 			(GstOptScheduler *scheduler);

226
227
228
229
230
static void 		gst_opt_scheduler_set_property 		(GObject *object, guint prop_id,
		               					 const GValue *value, GParamSpec *pspec);
static void 		gst_opt_scheduler_get_property 		(GObject *object, guint prop_id,
		               					 GValue *value, GParamSpec *pspec);

231
232
233
234
235
236
237
238
static void 		gst_opt_scheduler_dispose 		(GObject *object);

static void 		gst_opt_scheduler_setup 		(GstScheduler *sched);
static void 		gst_opt_scheduler_reset 		(GstScheduler *sched);
static void		gst_opt_scheduler_add_element		(GstScheduler *sched, GstElement *element);
static void     	gst_opt_scheduler_remove_element	(GstScheduler *sched, GstElement *element);
static GstElementStateReturn  
			gst_opt_scheduler_state_transition	(GstScheduler *sched, GstElement *element, gint transition);
239
static void             gst_opt_scheduler_scheduling_change     (GstScheduler *sched, GstElement *element);
240
241
static void 		gst_opt_scheduler_lock_element 		(GstScheduler *sched, GstElement *element);
static void 		gst_opt_scheduler_unlock_element 	(GstScheduler *sched, GstElement *element);
242
static gboolean		gst_opt_scheduler_yield 		(GstScheduler *sched, GstElement *element);
243
244
static gboolean		gst_opt_scheduler_interrupt 		(GstScheduler *sched, GstElement *element);
static void 		gst_opt_scheduler_error	 		(GstScheduler *sched, GstElement *element);
245
static void     	gst_opt_scheduler_pad_link		(GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
246
static void     	gst_opt_scheduler_pad_unlink 		(GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
247
static void	  	gst_opt_scheduler_pad_select 		(GstScheduler *sched, GList *padlist);
248
static GstClockReturn   gst_opt_scheduler_clock_wait        	(GstScheduler *sched, GstElement *element,
249
	                                                         GstClockID id, GstClockTimeDiff *jitter);
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
static GstSchedulerState
			gst_opt_scheduler_iterate    		(GstScheduler *sched);

static void     	gst_opt_scheduler_show  		(GstScheduler *sched);

static GstSchedulerClass *parent_class = NULL;

static GType
gst_opt_scheduler_get_type (void)
{
  if (!_gst_opt_scheduler_type) {
    static const GTypeInfo scheduler_info = {
      sizeof (GstOptSchedulerClass),
      NULL,
      NULL,
      (GClassInitFunc) gst_opt_scheduler_class_init,
      NULL,
      NULL,
      sizeof (GstOptScheduler),
      0,
      (GInstanceInitFunc) gst_opt_scheduler_init,
      NULL
    };

274
275
    _gst_opt_scheduler_type = g_type_register_static (GST_TYPE_SCHEDULER, 
		    "GstOpt"COTHREADS_NAME_CAPITAL"Scheduler", &scheduler_info, 0);
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
  }
  return _gst_opt_scheduler_type;
}

/* Class initialiser: remembers the parent class, installs the properties and
 * fills in the GObject and GstScheduler virtual method tables. */
static void
gst_opt_scheduler_class_init (GstOptSchedulerClass *klass)
{
  GObjectClass *gobject_class = (GObjectClass*) klass;
  GstSchedulerClass *gstscheduler_class = (GstSchedulerClass*) klass;

  parent_class = g_type_class_ref (GST_TYPE_SCHEDULER);

  /* GObject vmethods */
  gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_opt_scheduler_set_property);
  gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_opt_scheduler_get_property);
  gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_opt_scheduler_dispose);

  /* properties */
  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_ITERATIONS,
      g_param_spec_int ("iterations", "Iterations",
			"Number of groups to schedule in one iteration (-1 == until EOS/error)",
			-1, G_MAXINT, 1, G_PARAM_READWRITE));
#ifndef USE_COTHREADS
  /* the recursion limit only exists in the non-cothread build */
  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_RECURSION,
      g_param_spec_int ("max_recursion", "Max recursion",
			"Maximum number of recursions",
			1, G_MAXINT, 100, G_PARAM_READWRITE));
#endif

  /* GstScheduler vmethods */
  gstscheduler_class->setup = GST_DEBUG_FUNCPTR (gst_opt_scheduler_setup);
  gstscheduler_class->reset = GST_DEBUG_FUNCPTR (gst_opt_scheduler_reset);
  gstscheduler_class->add_element = GST_DEBUG_FUNCPTR (gst_opt_scheduler_add_element);
  gstscheduler_class->remove_element = GST_DEBUG_FUNCPTR (gst_opt_scheduler_remove_element);
  gstscheduler_class->state_transition = GST_DEBUG_FUNCPTR (gst_opt_scheduler_state_transition);
  gstscheduler_class->scheduling_change = GST_DEBUG_FUNCPTR (gst_opt_scheduler_scheduling_change);
  gstscheduler_class->lock_element = GST_DEBUG_FUNCPTR (gst_opt_scheduler_lock_element);
  gstscheduler_class->unlock_element = GST_DEBUG_FUNCPTR (gst_opt_scheduler_unlock_element);
  gstscheduler_class->yield = GST_DEBUG_FUNCPTR (gst_opt_scheduler_yield);
  gstscheduler_class->interrupt = GST_DEBUG_FUNCPTR (gst_opt_scheduler_interrupt);
  gstscheduler_class->error = GST_DEBUG_FUNCPTR (gst_opt_scheduler_error);
  gstscheduler_class->pad_link = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_link);
  gstscheduler_class->pad_unlink = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_unlink);
  gstscheduler_class->pad_select = GST_DEBUG_FUNCPTR (gst_opt_scheduler_pad_select);
  gstscheduler_class->clock_wait = GST_DEBUG_FUNCPTR (gst_opt_scheduler_clock_wait);
  gstscheduler_class->iterate = GST_DEBUG_FUNCPTR (gst_opt_scheduler_iterate);
  gstscheduler_class->show = GST_DEBUG_FUNCPTR (gst_opt_scheduler_show);

#ifdef USE_COTHREADS
  do_cothreads_init(NULL);
#endif
}

/* Instance initialiser: no elements yet, and both tunables start at the same
 * values the properties declare as defaults. */
static void
gst_opt_scheduler_init (GstOptScheduler *scheduler)
{
  scheduler->iterations = 1;
  scheduler->max_recursion = 100;
  scheduler->elements = NULL;
}

/* Dispose handler: does nothing of its own and simply chains up to the
 * parent class implementation. */
static void
gst_opt_scheduler_dispose (GObject *object)
{
  /* chain up */
  G_OBJECT_CLASS (parent_class)->dispose (
      object);
}

static gboolean
346
plugin_init (GstPlugin *plugin)
347
348
349
{
  GstSchedulerFactory *factory;

350
351
  GST_DEBUG_CATEGORY_INIT (debug_scheduler, "scheduler", 0, "optimal scheduler");

352
#ifdef USE_COTHREADS
353
354
  factory = gst_scheduler_factory_new ("opt"COTHREADS_NAME,
                                       "An optimal scheduler using "COTHREADS_NAME" cothreads",
355
		                      gst_opt_scheduler_get_type());
356
357
358
359
360
#else
  factory = gst_scheduler_factory_new ("opt",
                                       "An optimal scheduler using no cothreads",
		                      gst_opt_scheduler_get_type());
#endif
361
362
363
364
365
366
367
368
369
370

  if (factory != NULL) {
    gst_plugin_add_feature (plugin, GST_PLUGIN_FEATURE (factory));
  }
  else {
    g_warning ("could not register scheduler: optimal");
  }
  return TRUE;
}

371
GST_PLUGIN_DEFINE (
372
373
  GST_VERSION_MAJOR,
  GST_VERSION_MINOR,
374
  "gstopt"COTHREADS_NAME"scheduler",
375
376
377
378
379
380
381
  "An optimal scheduler using "COTHREADS_NAME" cothreads",
  plugin_init,
  VERSION,
  GST_LICENSE,
  GST_PACKAGE,
  GST_ORIGIN
);
382
383
384


static void
385
destroy_chain (GstOptSchedulerChain *chain)
386
{
387
  GstOptScheduler *osched;
388
  
389
  GST_INFO ( "destroy chain %p", chain);
390

391
392
  g_assert (chain->num_groups == 0);
  g_assert (chain->groups == NULL);
393

394
  osched = chain->sched;
Wim Taymans's avatar
Wim Taymans committed
395
  osched->chains = g_slist_remove (osched->chains, chain);
396

397
  gst_object_unref (GST_OBJECT (osched));
398

399
400
401
  g_free (chain);
}

402
static GstOptSchedulerChain*
403
create_chain (GstOptScheduler *osched)
404
{
405
  GstOptSchedulerChain *chain;
406

407
408
409
  chain = g_new0 (GstOptSchedulerChain, 1);
  chain->sched = osched;
  chain->refcount = 1;
410
  chain->flags = GST_OPT_SCHEDULER_CHAIN_DISABLED;
411

412
413
414
  gst_object_ref (GST_OBJECT (osched));
  osched->chains = g_slist_prepend (osched->chains, chain);

415
  GST_INFO ( "new chain %p", chain);
416
417

  return chain;
418
419
420
}

/* Adds one reference to the chain and returns the chain. */
static GstOptSchedulerChain*
ref_chain (GstOptSchedulerChain *chain)
{
  int before = chain->refcount;

  GST_LOG ("ref chain %p %d->%d", chain, before, before + 1);

  chain->refcount = before + 1;

  return chain;
}
429

430
431
432
/* Drops one reference from the chain.  When the count reaches zero the chain
 * is destroyed and NULL is returned; otherwise the chain is returned. */
static GstOptSchedulerChain*
unref_chain (GstOptSchedulerChain *chain)
{
  int remaining = chain->refcount - 1;

  GST_LOG ("unref chain %p %d->%d", chain, chain->refcount, remaining);

  chain->refcount = remaining;
  if (remaining != 0)
    return chain;

  destroy_chain (chain);
  return NULL;
}

/* Puts a group that is not yet in any chain into the given chain.  Both the
 * group and the chain gain a reference.  If the group is enabled, the
 * chain's enabled accounting is updated.  Returns the chain. */
static GstOptSchedulerChain*
add_to_chain (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group)
{
  GST_INFO ( "adding group %p to chain %p", group, chain);

  g_assert (group->chain == NULL);

  /* the chain's list holds a ref on the group ... */
  group = ref_group (group);

  /* ... and the group's back pointer holds a ref on the chain */
  group->chain = ref_chain (chain);
  chain->groups = g_slist_prepend (chain->groups, group);
  chain->num_groups += 1;

  if (GST_OPT_SCHEDULER_GROUP_IS_ENABLED (group))
    chain_group_set_enabled (chain, group, TRUE);

  return chain;
}

464
static GstOptSchedulerChain*
465
466
remove_from_chain (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group)
{
467
  GST_INFO ( "removing group %p from chain %p", group, chain);
468

469
470
471
  if (!chain)
    return NULL;

472
  g_assert (group);
473
474
  g_assert (group->chain == chain);

475
  group->chain = NULL;
Wim Taymans's avatar
Wim Taymans committed
476
  chain->groups = g_slist_remove (chain->groups, group);
477
  chain->num_groups--;
478
  unref_group (group);
479

480
481
  if (chain->num_groups == 0) 
    chain = unref_chain (chain);
482

483
  chain = unref_chain (chain);
484
  return chain;
485
486
}

487
/* Moves every group of chain2 into chain1 and then releases chain2.
 * Returns chain1.  Nothing happens when chain2 is NULL or equal to chain1.
 * NOTE(review): num_enabled is not adjusted on either chain here - confirm
 * whether callers are expected to fix that up. */
static GstOptSchedulerChain*
merge_chains (GstOptSchedulerChain *chain1, GstOptSchedulerChain *chain2)
{
  GSList *walk;
  GSList *next;

  g_assert (chain1 != NULL);

  GST_INFO ( "merging chain %p and %p", chain1, chain2);

  if (chain2 == NULL || chain2 == chain1)
    return chain1;

  /* hold chain2 while its groups are being moved out */
  ref_chain (chain2);

  for (walk = chain2->groups; walk != NULL; walk = next) {
    GstOptSchedulerGroup *moved = (GstOptSchedulerGroup *) walk->data;

    next = g_slist_next (walk);

    GST_INFO ( "reparenting group %p from chain %p to %p",
	       moved, chain2, chain1);

    /* detach from chain2; the list itself is freed after the loop */
    moved->chain = NULL;
    chain2->num_groups--;
    chain2 = unref_chain (chain2);

    /* attach to chain1 */
    moved->chain = ref_chain (chain1);
    chain1->groups = g_slist_prepend (chain1->groups, moved);
    chain1->num_groups++;
  }

  g_slist_free (chain2->groups);
  chain2->groups = NULL;
  unref_chain (chain2);

  return chain1;
}

/* Marks a group as enabled or disabled and keeps the chain's enabled counter
 * in step.  The chain itself is enabled once every group in it is enabled
 * and disabled once no group is enabled.  The counter is clamped to the
 * range 0..num_groups. */
static void
chain_group_set_enabled (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group, gboolean enabled)
{
  g_assert (chain != NULL);
  g_assert (group != NULL);

  GST_INFO ( "request to %d group %p in chain %p, have %d groups enabled out of %d",
	     enabled, group, chain, chain->num_enabled, chain->num_groups);

  if (!enabled) {
    GST_OPT_SCHEDULER_GROUP_DISABLE (group);

    if (chain->num_enabled > 0)
      chain->num_enabled--;

    GST_INFO ( "disable group %p in chain %p, now %d groups enabled out of %d", group, chain,
	       chain->num_enabled, chain->num_groups);

    if (chain->num_enabled == 0) {
      GST_INFO ( "disable chain %p", chain);
      GST_OPT_SCHEDULER_CHAIN_DISABLE (chain);
    }
    return;
  }

  GST_OPT_SCHEDULER_GROUP_ENABLE (group);

  if (chain->num_enabled < chain->num_groups)
    chain->num_enabled++;

  GST_INFO ( "enable group %p in chain %p, now %d groups enabled out of %d", group, chain,
	     chain->num_enabled, chain->num_groups);

  if (chain->num_enabled == chain->num_groups) {
    GST_INFO ( "enable chain %p", chain);
    GST_OPT_SCHEDULER_CHAIN_ENABLE (chain);
  }
}

Wim Taymans's avatar
Wim Taymans committed
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
/* Moves the group into the given chain and then does the same, recursively,
 * for every group reachable through its group links.  Groups that are
 * already in the chain end the recursion. */
static void
chain_recursively_migrate_group (GstOptSchedulerChain *chain, GstOptSchedulerGroup *group)
{
  GSList *walk;
  GSList *next;

  if (group->chain == chain) {
    /* nothing to do, this also stops cycles */
    return;
  }

  /* swap chains: out of the old one, into the new one */
  remove_from_chain (group->chain, group);
  add_to_chain (chain, group);

  /* visit the group at the far end of every link */
  for (walk = group->group_links; walk != NULL; walk = next) {
    GstOptSchedulerGroupLink *link = (GstOptSchedulerGroupLink *) walk->data;
    GstOptSchedulerGroup *peer;

    next = g_slist_next (walk);

    if (link->group1 == group)
      peer = link->group2;
    else
      peer = link->group1;

    chain_recursively_migrate_group (chain, peer);
  }
}

588
589
590
/* Adds one reference to the group and returns the group. */
static GstOptSchedulerGroup*
ref_group (GstOptSchedulerGroup *group)
{
  int before = group->refcount;

  GST_LOG ("ref group %p %d->%d", group, before, before + 1);

  group->refcount = before + 1;

  return group;
}

599
#ifndef USE_COTHREADS
600
/* remove me
601
602
603
static GstOptSchedulerGroup*
ref_group_by_count (GstOptSchedulerGroup *group, gint count)
{
604
605
  GST_LOG ("ref group %p %d->%d", group, 
	   group->refcount, group->refcount+count);
606
607
608
609
610

  group->refcount += count;

  return group;
}
611
*/
612
613
#endif

614
615
616
/* Drops one reference from the group.  The group is destroyed, and NULL is
 * returned, when the count falls to 1 (not 0); otherwise the group is
 * returned.
 * NOTE(review): create_group() starts the count at 1 and no visible code
 * releases that initial count, so 1 presumably means "no holders left" -
 * confirm this convention before changing the threshold. */
static GstOptSchedulerGroup*
unref_group (GstOptSchedulerGroup *group)
{
  int remaining = group->refcount - 1;

  GST_LOG ("unref group %p %d->%d", group, group->refcount, remaining);

  group->refcount = remaining;
  if (remaining != 1)
    return group;

  destroy_group (group);
  return NULL;
}

628
static GstOptSchedulerGroup*
629
630
add_to_group (GstOptSchedulerGroup *group, GstElement *element)
{
631
632
633
  g_assert (group != NULL);
  g_assert (element != NULL);

634
  GST_INFO ( "adding element \"%s\" to group %p", GST_ELEMENT_NAME (element), group);
635
636

  if (GST_ELEMENT_IS_DECOUPLED (element)) {
637
    GST_INFO ( "element \"%s\" is decoupled, not adding to group %p", 
638
	      GST_ELEMENT_NAME (element), group);
639
    return group;
640
641
642
643
  }

  g_assert (GST_ELEMENT_SCHED_GROUP (element) == NULL);

644
645
646
  GST_ELEMENT_SCHED_GROUP (element) = ref_group (group);

  gst_object_ref (GST_OBJECT (element));
Wim Taymans's avatar
Wim Taymans committed
647
  group->elements = g_slist_prepend (group->elements, element);
648
649
  group->num_elements++;

650
651
652
653
  if (gst_element_get_state (element) == GST_STATE_PLAYING) {
    group_element_set_enabled (group, element, TRUE);
  }

654
655
656
  /* Ref the group... */
  ref_group (group);
  
657
  return group;
658
659
660
661
662
663
664
665
}

/* Allocates a new, disabled group with a refcount of 1, adds the element to
 * it and places the group in the chain.  Returns the new group. */
static GstOptSchedulerGroup*
create_group (GstOptSchedulerChain *chain, GstElement *element)
{
  GstOptSchedulerGroup *fresh = g_new0 (GstOptSchedulerGroup, 1);

  GST_INFO ( "new group %p", fresh);

  fresh->refcount = 1;
  fresh->flags = GST_OPT_SCHEDULER_GROUP_DISABLED;

  /* first the element, then the chain */
  add_to_group (fresh, element);
  add_to_chain (chain, fresh);

  return fresh;
}

/* Tears down whatever runs the group: the cothread in the cothread build,
 * the schedule function and its arguments otherwise.  Afterwards the
 * SCHEDULABLE flag is cleared.  Warns when the group is flagged as running. */
static void
destroy_group_scheduler (GstOptSchedulerGroup *group)
{
  g_assert (group);

  if ((group->flags & GST_OPT_SCHEDULER_GROUP_RUNNING) != 0)
    g_warning ("destroying running group scheduler");

#ifdef USE_COTHREADS
  if (group->cothread != NULL) {
    do_cothread_destroy (group->cothread);
    group->cothread = NULL;
  }
#else
  group->argv = NULL;
  group->argc = 0;
  group->schedulefunc = NULL;
#endif

  group->flags = group->flags & ~GST_OPT_SCHEDULER_GROUP_SCHEDULABLE;
}

static void
699
destroy_group (GstOptSchedulerGroup *group)
700
{
701
  GST_INFO ( "destroy group %p", group);
702

703
  g_assert (group != NULL);
704
705
706
  g_assert (group->elements == NULL);

  remove_from_chain (group->chain, group);
707
708
709
710
711
712
713

  if (group->flags & GST_OPT_SCHEDULER_GROUP_SCHEDULABLE)
    destroy_group_scheduler (group);

  g_free (group);
}

714
static GstOptSchedulerGroup*
715
716
remove_from_group (GstOptSchedulerGroup *group, GstElement *element)
{
717
  GST_INFO ( "removing element \"%s\" from group %p", GST_ELEMENT_NAME (element), group);
718

719
720
  g_assert (group != NULL);
  g_assert (element != NULL);
721
  g_assert (GST_ELEMENT_SCHED_GROUP (element) == group);
722

Wim Taymans's avatar
Wim Taymans committed
723
  group->elements = g_slist_remove (group->elements, element);
724
725
  group->num_elements--;

726
727
728
729
730
731
  /* if the element was an entry point in the group, clear the group's
   * entry point */
  if (group->entry == element) {
    group->entry = NULL;
  }

732
  GST_ELEMENT_SCHED_GROUP (element) = NULL;
733
  gst_object_unref (GST_OBJECT (element));
734
735

  if (group->num_elements == 0) {
736
    group = unref_group (group);
737
  }
738
739
  group = unref_group (group);

Wim Taymans's avatar
Wim Taymans committed
740
741
742
743
744
745
746
747
  return group;
}

/* Moves every element of group2 into group1 and returns group1.  Nothing
 * happens when group2 is NULL or equal to group1.  The loop stops as soon as
 * group2 has no elements left or has been destroyed. */
static GstOptSchedulerGroup*
merge_groups (GstOptSchedulerGroup *group1, GstOptSchedulerGroup *group2)
{
  g_assert (group1 != NULL);

  GST_INFO ( "merging groups %p and %p", group1, group2);

  if (group2 == NULL || group2 == group1)
    return group1;

  while (group2 != NULL && group2->elements != NULL) {
    /* always take the current head of the list */
    GstElement *moved = (GstElement *) group2->elements->data;

    /* remove_from_group() hands back NULL once group2 is gone */
    group2 = remove_from_group (group2, moved);
    add_to_group (group1, moved);
  }

  return group1;
}

763
764
765
/* Reacts to a group that failed: the group is disabled in its chain and the
 * owning scheduler is put into the ERROR state. */
static void
group_error_handler (GstOptSchedulerGroup *group)
{
  GstOptSchedulerChain *chain;

  GST_INFO ( "group %p has errored", group);

  chain = group->chain;
  chain_group_set_enabled (chain, group, FALSE);

  /* re-read the chain from the group, exactly as the state is propagated */
  group->chain->sched->state = GST_OPT_SCHEDULER_STATE_ERROR;
}

772
773
774
/* this function enables/disables an element, it will set/clear a flag on the element 
 * and tells the chain that the group is enabled if all elements inside the group are
 * enabled */
775
776
777
static void
group_element_set_enabled (GstOptSchedulerGroup *group, GstElement *element, gboolean enabled)
{
778
779
780
  g_assert (group != NULL);
  g_assert (element != NULL);

781
  GST_INFO ( "request to %d element %s in group %p, have %d elements enabled out of %d", 
782
783
		    enabled, GST_ELEMENT_NAME (element), group, group->num_enabled, group->num_elements);

784
  if (enabled) {
785
786
787
    if (group->num_enabled < group->num_elements)
      group->num_enabled++;

788
    GST_INFO ( "enable element %s in group %p, now %d elements enabled out of %d", 
789
		    GST_ELEMENT_NAME (element), group, group->num_enabled, group->num_elements);
790

791
    if (group->num_enabled == group->num_elements) {
792
      GST_INFO ( "enable group %p", group);
793
794
795
796
      chain_group_set_enabled (group->chain, group, TRUE);
    }
  }
  else {
797
798
799
    if (group->num_enabled > 0)
      group->num_enabled--;

800
    GST_INFO ( "disable element %s in group %p, now %d elements enabled out of %d", 
801
		    GST_ELEMENT_NAME (element), group, group->num_enabled, group->num_elements);
802

803
    if (group->num_enabled == 0) {
804
      GST_INFO ( "disable group %p", group);
805
806
807
808
809
      chain_group_set_enabled (group->chain, group, FALSE);
    }
  }
}

810
811
812
813
814
/* a group is scheduled by doing a cothread switch to it or
 * by calling the schedule function. In the non-cothread case
 * we cannot run already running groups so we return FALSE here
 * to indicate this to the caller */
static gboolean 
815
816
schedule_group (GstOptSchedulerGroup *group) 
{
817
  if (!group->entry) {
818
    GST_INFO ( "not scheduling group %p without entry", group);
819
    return FALSE;
820
  }
821

822
823
824
#ifdef USE_COTHREADS
  if (group->cothread)
    do_cothread_switch (group->cothread);
Wim Taymans's avatar
Wim Taymans committed
825
826
  else
    g_warning ("(internal error): trying to schedule group without cothread");
827
828
  return TRUE;
#else
829
830
  /* cothreads automatically call the pre- and post-run functions for us;
   * without cothreads we need to call them manually */
831
  if (group->schedulefunc == NULL) {
832
    GST_INFO ( "not scheduling group %p without schedulefunc", 
833
834
		    group);
    return FALSE;
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
  } else {
    GSList *l;

    for (l=group->elements; l; l=l->next) {
      GstElement *e = (GstElement*)l->data;
      if (e->pre_run_func)
        e->pre_run_func (e);
    }

    group->schedulefunc (group->argc, group->argv);

    for (l=group->elements; l; l=l->next) {
      GstElement *e = (GstElement*)l->data;
      if (e->post_run_func)
        e->post_run_func (e);
    }

852
  }
853
854
  return TRUE;
#endif
855
856
}

857
#ifndef USE_COTHREADS
858
859
static void
gst_opt_scheduler_schedule_run_queue (GstOptScheduler *osched)
860
{
861
  GST_LOG_OBJECT (osched, "entering scheduler run queue recursion %d %d", 
862
		  osched->recursion, g_list_length (osched->runqueue));
863

864
865
866
867
868
869
  /* make sure we don't exceed max_recursion */
  if (osched->recursion > osched->max_recursion) {
    osched->state = GST_OPT_SCHEDULER_STATE_ERROR;
    return;
  }

870
  osched->recursion++;
871

872
873
  while (osched->runqueue) {
    GstOptSchedulerGroup *group;
874
    gboolean res;
875
876
    
    group = (GstOptSchedulerGroup *) osched->runqueue->data;
877

878
    /* runqueue hols refcount to group */
879
    osched->runqueue = g_list_remove (osched->runqueue, group);
880

881
    GST_LOG_OBJECT (osched, "scheduling group %p", group);
882

883
884
    res = schedule_group (group);
    if (!res) {
885
886
887
      g_warning  ("error scheduling group %p", group);
      group_error_handler (group);
    }
888
    else {
889
      GST_LOG_OBJECT (osched, "done scheduling group %p", group);
890
891
    }
    unref_group (group);
892
  }
893

894
  GST_LOG_OBJECT (osched, "run queue length after scheduling %d", g_list_length (osched->runqueue));
895

896
897
  osched->recursion--;
}
898
#endif
899

900
/* a chain is scheduled by picking the first active group and scheduling it */
901
static void
902
903
schedule_chain (GstOptSchedulerChain *chain) 
{
Wim Taymans's avatar
Wim Taymans committed
904
905
906
907
908
  GSList *groups;
  GstOptScheduler *osched;

  osched = chain->sched;
  groups = chain->groups;
909

910
911
  while (groups) {
    GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) groups->data;
912

913
    if (!GST_OPT_SCHEDULER_GROUP_IS_DISABLED (group)) {
914
      ref_group (group);
915
916
      GST_LOG ("scheduling group %p in chain %p", 
 	       group, chain);
917

918
919
920
921
#ifdef USE_COTHREADS
      schedule_group (group);
#else
      osched->recursion = 0;
922
923
924
925
      if (!g_list_find (osched->runqueue, group))
      {
        ref_group (group);
        osched->runqueue = g_list_append (osched->runqueue, group);
926
      }
927
      gst_opt_scheduler_schedule_run_queue (osched);
928
#endif
929

930
931
      GST_LOG ("done scheduling group %p in chain %p", 
 	       group, chain);
932
      unref_group (group);
933
934
      break;
    }
935
936

    groups = g_slist_next (groups);
937
  }
938
939
}

940
941
942
943
/* a get-based group is scheduled by getting a buffer from the get based
 * entry point and by pushing the buffer to the peer.
 * We also set the running flag on this group for as long as this
 * function is running. */
944
static int
945
get_group_schedule_function (int argc, char *argv[])
946
947
{
  GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
948
949
  GstElement *entry = group->entry;
  const GList *pads = gst_element_get_pad_list (entry);
950

951
  GST_LOG ("get wrapper of group %p", group);
952
953
954

  group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;

Wim Taymans's avatar
Wim Taymans committed
955
  while (pads) {
956
    GstData *data;
957
    GstPad *pad = GST_PAD (pads->data);
Wim Taymans's avatar
Wim Taymans committed
958
    pads = g_list_next (pads);
959

Wim Taymans's avatar
Wim Taymans committed
960
961
962
    /* skip sinks and ghostpads */
    if (!GST_PAD_IS_SRC (pad) || !GST_IS_REAL_PAD (pad))
      continue;
963

964
965
    GST_LOG ("doing get and push on pad \"%s:%s\" in group %p", 
	     GST_DEBUG_PAD_NAME (pad), group);
966

967
968
969
970
    data = GST_RPAD_GETFUNC (pad) (pad);
    if (data) {
      if (GST_EVENT_IS_INTERRUPT (data)) {
	gst_event_unref (GST_EVENT (data));
971
972
	break;
      }
973
      gst_pad_push (pad, data);
974
    }
Wim Taymans's avatar
Wim Taymans committed
975
  }
976

Wim Taymans's avatar
Wim Taymans committed
977
  group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;
978

Wim Taymans's avatar
Wim Taymans committed
979
980
  return 0;
}
981

982
983
984
985
/* a loop-based group is scheduled by calling the loop function
 * on the entry point. 
 * We also set the running flag on this group for as long as this
 * function is running. */
Wim Taymans's avatar
Wim Taymans committed
986
static int
987
loop_group_schedule_function (int argc, char *argv[])
Wim Taymans's avatar
Wim Taymans committed
988
989
990
991
{
  GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;
  GstElement *entry = group->entry;

992
  GST_LOG ("loop wrapper of group %p", group);
Wim Taymans's avatar
Wim Taymans committed
993
994
995

  group->flags |= GST_OPT_SCHEDULER_GROUP_RUNNING;

996
997
  GST_LOG ("calling loopfunc of element %s in group %p", 
	   GST_ELEMENT_NAME (entry), group);
Wim Taymans's avatar
Wim Taymans committed
998
999

  entry->loopfunc (entry);
1000

1001
1002
  GST_LOG ("loopfunc ended of element %s in group %p", 
	   GST_ELEMENT_NAME (entry), group);
1003

1004
1005
  group->flags &= ~GST_OPT_SCHEDULER_GROUP_RUNNING;

Wim Taymans's avatar
Wim Taymans committed
1006
1007
1008
1009
  return 0;

}

1010
/* the function to schedule an unknown group, which just gives an error */
Wim Taymans's avatar
Wim Taymans committed
1011
static int
1012
unknown_group_schedule_function (int argc, char *argv[])
Wim Taymans's avatar
Wim Taymans committed
1013
1014
1015
{
  GstOptSchedulerGroup *group = (GstOptSchedulerGroup *) argv;

1016
  g_warning ("(internal error) unknown group type %d, disabling\n", group->type);
1017
  group_error_handler (group);
Wim Taymans's avatar
Wim Taymans committed
1018

1019
1020
1021
  return 0;
}

1022
/* this function is called when the first element of a chain-loop or a loop-loop
 * link performs a push to the loop element. We then schedule the
 * group with the loop-based element until the bufpen is empty */
1025
static void
1026
gst_opt_scheduler_loop_wrapper (GstPad *sinkpad, GstData *data)
1027
1028
{
  GstOptSchedulerGroup *group;
1029
  GstOptScheduler *osched;
1030

1031
  GST_LOG ("loop wrapper, putting buffer in bufpen");
1032
1033

  group = GST_ELEMENT_SCHED_GROUP (GST_PAD_PARENT (sinkpad));
1034
  osched = group->chain->sched;
1035
1036


1037
1038
1039
#ifdef USE_COTHREADS
  if (GST_PAD_BUFLIST (GST_RPAD_PEER (sinkpad))) {
    g_warning ("deadlock detected, disabling group %p", group);
1040
    group_error_handler (group);