Commit 53b6bb62 authored by Wim Taymans's avatar Wim Taymans

Added a first stab at a better clocking system.

Original commit message from CVS:
Added a first stab at a better clocking system.
It still needs more infrastructure for async notification and custom clock
implementors.
This thing can still deadlock the pipeline.
parent 6b2df345
[submodule "common"]
path = common
url = git://anongit.freedesktop.org/gstreamer/common
common @ d81417a1
Subproject commit d81417a103945ab1c393e74557983b1163e9e353
......@@ -79,6 +79,7 @@ libgst_la_SOURCES = \
gstprops.c \
gstqueue.c \
gstscheduler.c \
gstsystemclock.c \
gstthread.c \
$(GST_TRACE_SRC) \
gsttype.c \
......@@ -187,6 +188,7 @@ libgstinclude_HEADERS = \
gstprops.h \
gstqueue.h \
gstscheduler.h \
gstsystemclock.h \
gstthread.h \
gsttrace.h \
gsttype.h \
......
......@@ -137,6 +137,8 @@ int main(int argc,char *argv[])
exit (-4);
}
gst_bin_use_clock (GST_BIN (bin), gst_system_clock_obtain ());
/* start playing */
gst_element_set_state(bin, GST_STATE_PLAYING);
......
......@@ -304,7 +304,7 @@ gst_disksink_handle_event (GstPad *pad, GstEvent *event)
gst_disksink_getcurrentfilename(disksink), sys_errlist[errno]);
break;
default:
g_warning("Unhandled event %d\n", type);
gst_pad_event_default (pad, event);
break;
}
......
......@@ -34,7 +34,7 @@
#include <gst/gstpad.h>
#include <gst/gstbuffer.h>
#include <gst/gstcpu.h>
#include <gst/gstclock.h>
#include <gst/gstsystemclock.h>
#include <gst/gstelement.h>
#include <gst/gstbin.h>
#include <gst/gstpipeline.h>
......
......@@ -25,6 +25,8 @@
#include "gstevent.h"
#include "gstbin.h"
#include "gstxml.h"
#include "gstsystemclock.h"
#include "gstscheduler.h"
......@@ -160,6 +162,93 @@ gst_bin_reset_element_sched (GstElement * element, GstScheduler * sched)
gst_element_set_sched (element, sched);
}
static void
gst_bin_get_clock_elements (GstBin *bin, GList **needing, GList **providing)
{
GList *children = gst_bin_get_list (bin);
while (children) {
GstElement *child = GST_ELEMENT (children->data);
if (GST_IS_BIN (child)) {
gst_bin_get_clock_elements (GST_BIN (child), needing, providing);
}
if (child->getclockfunc) {
*providing = g_list_prepend (*providing, child);
}
if (child->setclockfunc) {
*needing = g_list_prepend (*needing, child);
}
children = g_list_next (children);
}
}
static void
gst_bin_distribute_clock (GstBin *bin, GList *needing, GstClock *clock)
{
while (needing) {
GST_DEBUG (GST_CAT_CLOCK, "setting clock on %s\n", GST_ELEMENT_NAME (needing->data));
gst_element_set_clock (GST_ELEMENT (needing->data), clock);
needing = g_list_next (needing);
}
}
static void
gst_bin_distribute_clocks (GstBin *bin)
{
GList *needing = NULL, *providing = NULL;
GstElement *provider;
GstClock *clock;
gst_bin_get_clock_elements (bin, &needing, &providing);
if (GST_FLAG_IS_SET (bin, GST_BIN_FLAG_FIXED_CLOCK)) {
clock = bin->clock;
}
else if (providing) {
clock = gst_element_get_clock (GST_ELEMENT (providing->data));
}
else {
GST_DEBUG (GST_CAT_CLOCK, "no clock provided, using default clock\n");
clock = gst_system_clock_obtain ();
}
GST_BIN_CLOCK (bin) = clock;
gst_bin_distribute_clock (bin, needing, clock);
}
GstClock*
gst_bin_get_clock (GstBin *bin)
{
g_return_val_if_fail (bin != NULL, NULL);
g_return_val_if_fail (GST_IS_BIN (bin), NULL);
return GST_BIN_CLOCK (bin);
}
void
gst_bin_use_clock (GstBin *bin, GstClock *clock)
{
g_return_if_fail (bin != NULL);
g_return_if_fail (GST_IS_BIN (bin));
GST_FLAG_SET (bin, GST_BIN_FLAG_FIXED_CLOCK);
GST_BIN_CLOCK (bin) = clock;
}
void
gst_bin_auto_clock (GstBin *bin)
{
g_return_if_fail (bin != NULL);
g_return_if_fail (GST_IS_BIN (bin));
GST_FLAG_UNSET (bin, GST_BIN_FLAG_FIXED_CLOCK);
GST_BIN_CLOCK (bin) = NULL;
}
static void
gst_bin_set_element_sched (GstElement * element, GstScheduler * sched)
{
......@@ -412,14 +501,9 @@ gst_bin_child_state_change (GstBin *bin, GstElementState oldstate, GstElementSta
void
gst_bin_child_error (GstBin *bin, GstElement *child)
{
g_return_if_fail (GST_IS_BIN (bin));
if (GST_STATE (bin) != GST_STATE_NULL) {
/*
GST_STATE_PENDING (bin) = ((GST_STATE (bin) >> 1));
if (gst_element_set_state (bin, GST_STATE (bin)>>1) != GST_STATE_SUCCESS) {
gst_element_error (GST_ELEMENT (bin), "bin \"%s\" couldn't change state on error from child \"%s\"",
GST_ELEMENT_NAME (bin), GST_ELEMENT_NAME (child));
}
*/
gst_element_info (GST_ELEMENT (bin), "bin \"%s\" stopped because child \"%s\" signalled an error",
GST_ELEMENT_NAME (bin), GST_ELEMENT_NAME (child));
}
......@@ -443,6 +527,7 @@ gst_bin_change_state (GstElement * element)
GstElement *child;
GstElementStateReturn ret;
GstElementState old_state, pending;
gint transition;
gboolean have_async = FALSE;
g_return_val_if_fail (GST_IS_BIN (element), GST_STATE_FAILURE);
......@@ -451,6 +536,7 @@ gst_bin_change_state (GstElement * element)
old_state = GST_STATE (element);
pending = GST_STATE_PENDING (element);
transition = GST_STATE_TRANSITION (element);
GST_INFO_ELEMENT (GST_CAT_STATES, element, "changing childrens' state from %s to %s",
gst_element_statename (old_state), gst_element_statename (pending));
......@@ -482,6 +568,28 @@ gst_bin_change_state (GstElement * element)
}
}
if (GST_ELEMENT_SCHED (bin) != NULL && GST_ELEMENT_PARENT (bin) == NULL) {
switch (transition) {
case GST_STATE_NULL_TO_READY:
gst_bin_distribute_clocks (bin);
break;
case GST_STATE_READY_TO_PAUSED:
if (GST_BIN_CLOCK (bin))
gst_clock_reset (GST_BIN_CLOCK (bin));
break;
case GST_STATE_PAUSED_TO_PLAYING:
gst_bin_distribute_clocks (bin);
if (GST_BIN_CLOCK (bin))
gst_clock_activate (GST_BIN_CLOCK (bin), TRUE);
break;
case GST_STATE_PLAYING_TO_PAUSED:
if (GST_BIN_CLOCK (bin))
gst_clock_activate (GST_BIN_CLOCK (bin), FALSE);
break;
}
}
GST_INFO_ELEMENT (GST_CAT_STATES, element, "done changing bin's state from %s to %s, now in %s",
gst_element_statename (old_state),
gst_element_statename (pending),
......@@ -499,9 +607,13 @@ gst_bin_change_state (GstElement * element)
static GstElementStateReturn
gst_bin_change_state_norecurse (GstBin * bin)
{
GstElementStateReturn ret;
if (GST_ELEMENT_CLASS (parent_class)->change_state) {
GST_DEBUG_ELEMENT (GST_CAT_STATES, bin, "setting bin's own state\n");
return GST_ELEMENT_CLASS (parent_class)->change_state (GST_ELEMENT (bin));
ret = GST_ELEMENT_CLASS (parent_class)->change_state (GST_ELEMENT (bin));
return ret;
}
else
return GST_STATE_FAILURE;
......
......@@ -48,6 +48,10 @@ extern GType _gst_bin_type;
# define GST_BIN_CLASS GST_BIN_CLASS_CAST
#endif
#define GST_BIN_CLOCK_PROVIDERS(bin) (GST_BIN(bin)->clock_providers)
#define GST_BIN_CLOCK_RECEIVERS(bin) (GST_BIN(bin)->clock_receivers)
#define GST_BIN_CLOCK(bin) (GST_BIN(bin)->clock)
typedef enum {
/* this bin is a manager of child elements, i.e. a pipeline or thread */
GST_BIN_FLAG_MANAGER = GST_ELEMENT_FLAG_LAST,
......@@ -57,6 +61,8 @@ typedef enum {
/* we prefer to have cothreads when its an option, over chain-based */
GST_BIN_FLAG_PREFER_COTHREADS,
GST_BIN_FLAG_FIXED_CLOCK,
/* padding */
GST_BIN_FLAG_LAST = GST_ELEMENT_FLAG_LAST + 4,
} GstBinFlags;
......@@ -65,15 +71,17 @@ typedef enum {
/*typedef struct _GstBinClass GstBinClass; */
struct _GstBin {
GstElement element;
GstElement element;
/* our children */
gint numchildren;
GList *children;
gint numchildren;
GList *children;
GstElementState child_states[GST_NUM_STATES];
GstClock *clock;
gpointer sched_private;
gpointer sched_private;
};
struct _GstBinClass {
......@@ -108,11 +116,18 @@ gboolean gst_bin_set_state_type (GstBin *bin, GstElementState state, GType type
gboolean gst_bin_iterate (GstBin *bin);
void gst_bin_use_clock (GstBin *bin, GstClock *clock);
GstClock* gst_bin_get_clock (GstBin *bin);
void gst_bin_auto_clock (GstBin *bin);
/* internal */
/* one of our childs signaled a state change */
void gst_bin_child_state_change (GstBin *bin, GstElementState oldstate,
GstElementState newstate, GstElement *child);
/* one of our childs signaled an error */
void gst_bin_child_error (GstBin *bin, GstElement *child);
#ifdef __cplusplus
}
#endif /* __cplusplus */
......
......@@ -25,176 +25,153 @@
/* #define GST_DEBUG_ENABLED */
#include "gst_private.h"
#include "gstelement.h"
#include "gstclock.h"
#define CLASS(clock) GST_CLOCK_CLASS (G_OBJECT_GET_CLASS (clock))
static GstClock *the_system_clock = NULL;
/**
* gst_clock_new:
* @name: the name of the new clock
*
* create a new clock element
*
* Returns: the new clock element
*/
GstClock*
gst_clock_new (gchar *name)
static void gst_clock_class_init (GstClockClass *klass);
static void gst_clock_init (GstClock *clock);
static GstObjectClass *parent_class = NULL;
/* static guint gst_clock_signals[LAST_SIGNAL] = { 0 }; */
GType
gst_clock_get_type (void)
{
GstClock *clock = (GstClock *) g_malloc(sizeof(GstClock));
static GType clock_type = 0;
if (!clock_type) {
static const GTypeInfo clock_info = {
sizeof (GstObjectClass),
NULL,
NULL,
(GClassInitFunc) gst_clock_class_init,
NULL,
NULL,
sizeof (GstObject),
4,
(GInstanceInitFunc) gst_clock_init,
NULL
};
clock_type = g_type_register_static (GST_TYPE_OBJECT, "GstClock",
&clock_info, G_TYPE_FLAG_ABSTRACT);
}
return clock_type;
}
clock->name = g_strdup (name);
clock->sinkobjects = NULL;
clock->sinkmutex = g_mutex_new ();
clock->lock = g_mutex_new ();
g_mutex_lock (clock->sinkmutex);
static void
gst_clock_class_init (GstClockClass *klass)
{
GObjectClass *gobject_class;
GstObjectClass *gstobject_class;
clock->num = 0;
clock->num_locked = 0;
clock->locking = FALSE;
gobject_class = (GObjectClass*) klass;
gstobject_class = (GstObjectClass*) klass;
return clock;
parent_class = g_type_class_ref (GST_TYPE_OBJECT);
}
/**
* gst_clock_get_system:
*
* Get the global system clock
*
* Returns: the global clock
*/
GstClock*
gst_clock_get_system(void)
static void
gst_clock_init (GstClock *clock)
{
if (the_system_clock == NULL) {
the_system_clock = gst_clock_new ("system_clock");
gst_clock_reset (the_system_clock);
}
return the_system_clock;
clock->speed = 1.0;
clock->active = FALSE;
clock->start_time = 0;
clock->active_mutex = g_mutex_new ();
clock->active_cond = g_cond_new ();
}
/**
* gst_clock_register:
* @clock: the name of the clock to register to
* @obj: the object registering to the clock
*
* State that an object is interested in listening to the
* given clock
*/
void
gst_clock_register (GstClock *clock, GstObject *obj)
gst_clock_reset (GstClock *clock)
{
if ((GST_ELEMENT(obj))->numsrcpads == 0) {
GST_DEBUG (GST_CAT_CLOCK,"gst_clock: setting registered sink object 0x%p\n", obj);
clock->sinkobjects = g_list_append (clock->sinkobjects, obj);
clock->num++;
}
g_return_if_fail (GST_IS_CLOCK (clock));
clock->start_time = 0;
clock->active = FALSE;
if (CLASS (clock)->reset)
CLASS (clock)->reset (clock);
}
/**
* gst_clock_set:
* @clock: The clock to set
* @time: the time to set
*
* Set the time of the given clock to time.
*/
void
gst_clock_set (GstClock *clock, GstClockTime time)
gst_clock_activate (GstClock *clock, gboolean active)
{
struct timeval tfnow;
GstClockTime now;
gettimeofday (&tfnow, (struct timezone *)NULL);
now = tfnow.tv_sec*1000000LL+tfnow.tv_usec;
g_mutex_lock (clock->lock);
clock->start_time = now - time;
g_mutex_unlock (clock->lock);
GST_DEBUG (GST_CAT_CLOCK,"gst_clock: setting clock to %llu %llu %llu\n",
time, now, clock->start_time);
g_return_if_fail (GST_IS_CLOCK (clock));
clock->active = active;
if (CLASS (clock)->activate)
CLASS (clock)->activate (clock, active);
g_mutex_lock (clock->active_mutex);
g_cond_signal (clock->active_cond);
g_mutex_unlock (clock->active_mutex);
}
/**
* gst_clock_current_diff:
* @clock: the clock to calculate the diff against
* @time: the time
*
* Calculate the difference between the given clock and the
* given time
*
* Returns: the clock difference
*/
GstClockTimeDiff
gst_clock_current_diff (GstClock *clock, GstClockTime time)
gboolean
gst_clock_is_active (GstClock *clock)
{
struct timeval tfnow;
GstClockTime now;
gettimeofday (&tfnow, (struct timezone *)NULL);
g_mutex_lock (clock->lock);
now = ((guint64)tfnow.tv_sec*1000000LL+tfnow.tv_usec) - (guint64)clock->start_time;
g_mutex_unlock (clock->lock);
g_return_val_if_fail (GST_IS_CLOCK (clock), FALSE);
return GST_CLOCK_DIFF (time, now);
return clock->active;
}
/**
* gst_clock_reset:
* @clock: the clock to reset
*
* Reset the given clock. The of the clock will be adjusted back
* to 0.
*/
void
gst_clock_reset (GstClock *clock)
gst_clock_set_time (GstClock *clock, GstClockTime time)
{
struct timeval tfnow;
gettimeofday (&tfnow, (struct timezone *)NULL);
g_mutex_lock (clock->lock);
clock->start_time = ((guint64)tfnow.tv_sec)*1000000LL+tfnow.tv_usec;
clock->current_time = clock->start_time;
clock->adjust = 0LL;
GST_DEBUG (GST_CAT_CLOCK,"gst_clock: setting start clock %llu\n", clock->start_time);
g_mutex_unlock (clock->lock);
g_return_if_fail (GST_IS_CLOCK (clock));
if (CLASS (clock)->set_time)
CLASS (clock)->set_time (clock, time);
}
/**
* gst_clock_wait:
* @clock: the clock to wait on
* @time: the time to wait for
* @obj: the object performing the wait
*
* Wait for a specific clock tick on the given clock.
*/
void
gst_clock_wait (GstClock *clock, GstClockTime time, GstObject *obj)
GstClockTime
gst_clock_get_time (GstClock *clock)
{
g_return_val_if_fail (GST_IS_CLOCK (clock), 0LL);
if (CLASS (clock)->get_time)
return CLASS (clock)->get_time (clock);
return 0LL;
}
GstClockReturn
gst_clock_wait (GstClock *clock, GstClockTime time)
{
struct timeval tfnow;
GstClockTime now;
GstClockTimeDiff diff;
gettimeofday (&tfnow, (struct timezone *)NULL);
g_mutex_lock (clock->lock);
now = tfnow.tv_sec*1000000LL+tfnow.tv_usec - clock->start_time;
diff = GST_CLOCK_DIFF (time, now);
/* if we are not behind wait a bit */
GST_DEBUG (GST_CAT_CLOCK,"gst_clock: %s waiting for time %08llu %08llu %08lld\n",
GST_OBJECT_NAME (obj), time, now, diff);
g_mutex_unlock (clock->lock);
if (diff > 10000 ) {
tfnow.tv_usec = (diff % 1000000);
tfnow.tv_sec = diff / 1000000;
/* FIXME, this piece of code does not work with egcs optimisations on, had to use the following line */
if (!tfnow.tv_sec) {
select(0, NULL, NULL, NULL, &tfnow);
}
else GST_DEBUG (GST_CAT_CLOCK,"gst_clock: %s waiting %u %llu %llu %llu seconds\n",
GST_OBJECT_NAME (obj), (int)tfnow.tv_sec, now, diff, time);
g_return_val_if_fail (GST_IS_CLOCK (clock), GST_CLOCK_STOPPED);
if (!clock->active) {
g_mutex_lock (clock->active_mutex);
g_cond_wait (clock->active_cond, clock->active_mutex);
g_mutex_unlock (clock->active_mutex);
}
GST_DEBUG (GST_CAT_CLOCK,"gst_clock: %s waiting for time %08llu %08llu %08lld done \n",
GST_OBJECT_NAME (obj), time, now, diff);
if (CLASS (clock)->wait)
return CLASS (clock)->wait (clock, time);
return GST_CLOCK_TIMEOUT;
}
void
gst_clock_set_resolution (GstClock *clock, guint64 resolution)
{
g_return_if_fail (GST_IS_CLOCK (clock));
if (CLASS (clock)->set_resolution)
CLASS (clock)->set_resolution (clock, resolution);
}
guint64
gst_clock_get_resolution (GstClock *clock)
{
g_return_val_if_fail (GST_IS_CLOCK (clock), 0LL);
if (CLASS (clock)->get_resolution)
return CLASS (clock)->get_resolution (clock);
return 0LL;
}
......@@ -24,45 +24,103 @@
#ifndef __GST_CLOCK_H__
#define __GST_CLOCK_H__
#include <gst/gstobject.h>
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
#include <gst/gstobject.h>
#define GST_TYPE_CLOCK \
(gst_clock_get_type())
#define GST_CLOCK(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_CLOCK,GstClock))
#define GST_CLOCK_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_CLOCK,GstClockClass))
#define GST_IS_CLOCK(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_CLOCK))
#define GST_IS_CLOCK_CLASS(obj) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_CLOCK))
typedef guint64 GstClockTime;
typedef gint64 GstClockTimeDiff;
typedef gpointer GstClockID;
typedef guint64 GstClockTime;
typedef gint64 GstClockTimeDiff;
#define GST_CLOCK_DIFF(s, e) (GstClockTimeDiff)((s)-(e))
#define GST_TIMEVAL_TO_TIME(tv) ((tv).tv_sec * (guint64) G_USEC_PER_SEC + (tv).tv_usec)
#define GST_CLOCK_DIFF(s, e) (GstClockTimeDiff)((s)-(e))
typedef struct _GstClock GstClock;
typedef struct _GstClockClass GstClockClass;
typedef struct _GstClock GstClock;
typedef void (*GstClockCallback) (GstClock *clock, GstClockTime time, GstClockID id, gpointer user_data);
typedef enum
{
GST_CLOCK_STOPPED = 0,
GST_CLOCK_TIMEOUT = 1,
GST_CLOCK_EARLY = 2,
GST_CLOCK_ERROR = 3,
} GstClockReturn;
struct _GstClock {
gchar *name;
GstClockTime start_time;
GstClockTime current_time;
GstClockTimeDiff adjust;
gboolean locking;
GList *sinkobjects;
gint num, num_locked;
GMutex *sinkmutex;
GMutex *lock;
GstObject object;
GstClockTime start_time;
gdouble speed;
gboolean active;
GMutex *active_mutex;
GCond *active_cond;
};
GstClock* gst_clock_new (gchar *name);
GstClock* gst_clock_get_system (void);
struct _GstClockClass {
GstObjectClass parent_class;
/* vtable */
void (*activate) (GstClock *clock, gboolean active);
void (*reset) (GstClock *clock);
void (*set_time) (GstClock *clock, GstClockTime time);
GstClockTime (*get_time) (GstClock *clock);
void gst_clock_register (GstClock *clock, GstObject *obj);
void gst_clock_set (GstClock *clock, GstClockTime time);
GstClockReturn (*wait) (GstClock *clock, GstClockTime time);
GstClockID (*wait_async) (GstClock *clock, GstClockTime time,
GstClockCallback func, gpointer user_data);
void (*cancel_wait_async) (GstClock *clock, GstClockID id);
GstClockID (*notify_async) (GstClock *clock, GstClockTime interval,
GstClockCallback func, gpointer user_data);
void (*remove_notify_async) (GstClock *clock, GstClockID id);
void (*set_resolution) (GstClock *clock, guint64 resolution);
guint64 (*get_resolution) (GstClock *clock);
/* signals */
};
GType gst_clock_get_type (void);
void gst_clock_set_speed (GstClock *clock, gdouble speed);
void gst_clock_get_speed (GstClock *clock, gdouble speed);
void gst_clock_activate (GstClock *clock, gboolean active);
gboolean gst_clock_is_active (GstClock *clock);
void gst_clock_reset (GstClock *clock);
void gst_clock_wait (GstClock *clock, GstClockTime time, GstObject *obj);
GstClockTimeDiff gst_clock_current_diff (GstClock *clock, GstClockTime time);
void gst_clock_set_time (GstClock *clock, GstClockTime time);
GstClockTime gst_clock_get_time (GstClock *clock);
GstClockReturn gst_clock_wait (GstClock *clock, GstClockTime time);
GstClockID gst_clock_wait_async (GstClock *clock, GstClockTime time,
GstClockCallback func, gpointer user_data);
void gst_clock_cancel_wait_async (GstClock *clock, GstClockID id);
GstClockID gst_clock_notify_async (GstClock *clock, GstClockTime interval,