Commit abffdb2a authored by Jonas Holmberg's avatar Jonas Holmberg Committed by Wim Taymans

Implement the systemclock with gstpoll

Add a property to select the clock type, currently REALTIME and MONOTONIC when
posix timers are available.

Implement the systemclock with GstPoll instead of GCond. This allows us to
schedule timeouts with nanosecond precission on newer kernels and with ppoll
support. It's also resilient to changes to the systemclock because of NTP or
similar.
parent 5cfb02af
......@@ -1967,14 +1967,15 @@ GST_TYPE_STRUCTURE
gst_structure_get_type
</SECTION>
<SECTION>
<FILE>gstsystemclock</FILE>
<TITLE>GstSystemClock</TITLE>
GstClockType
GstSystemClock
gst_system_clock_obtain
<SUBSECTION Standard>
GstSystemClockClass
GstSystemClockPrivate
GST_SYSTEM_CLOCK
GST_IS_SYSTEM_CLOCK
gst_system_clock_get_type
......@@ -1982,9 +1983,11 @@ GST_SYSTEM_CLOCK_CLASS
GST_IS_SYSTEM_CLOCK_CLASS
GST_SYSTEM_CLOCK_GET_CLASS
GST_TYPE_SYSTEM_CLOCK
GST_SYSTEM_CLOCK_CAST
GST_TYPE_CLOCK_TYPE
gst_clock_type_get_type
</SECTION>
<SECTION>
<FILE>gsttaglist</FILE>
<TITLE>GstTagList</TITLE>
......
......@@ -166,7 +166,7 @@ gst_clock_entry_new (GstClock * clock, GstClockTime time,
entry->type = type;
entry->time = time;
entry->interval = interval;
entry->status = GST_CLOCK_BUSY;
entry->status = GST_CLOCK_OK;
entry->func = NULL;
entry->user_data = NULL;
......
......@@ -40,18 +40,42 @@
#include "gst_private.h"
#include "gstinfo.h"
#include "gstsystemclock.h"
#include "gstpoll.h"
/* Define this to get some extra debug about jitter from each clock_wait */
#undef WAIT_DEBUGGING
#define GST_TYPE_CLOCK_TYPE (gst_clock_type_get_type())
struct _GstSystemClockPrivate
{
GstClockType clock_type;
GstPoll *timer;
gint async_wakeup_count;
};
#define GST_SYSTEM_CLOCK_GET_PRIVATE(obj) \
(G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_SYSTEM_CLOCK, \
GstSystemClockPrivate))
enum
{
PROP_0,
PROP_CLOCK_TYPE,
/* FILL ME */
};
/* the one instance of the systemclock */
static GstClock *_the_system_clock = NULL;
static void gst_system_clock_class_init (GstSystemClockClass * klass);
static void gst_system_clock_init (GstSystemClock * clock);
static void gst_system_clock_dispose (GObject * object);
static void gst_system_clock_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_system_clock_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static GstClockTime gst_system_clock_get_internal_time (GstClock * clock);
static guint64 gst_system_clock_get_resolution (GstClock * clock);
......@@ -98,6 +122,22 @@ gst_system_clock_get_type (void)
return clock_type;
}
static GType
gst_clock_type_get_type (void)
{
static GType clock_type_type = 0;
static const GEnumValue clock_types[] = {
{GST_CLOCK_TYPE_REALTIME, "GST_CLOCK_TYPE_REALTIME", "realtime"},
{GST_CLOCK_TYPE_MONOTONIC, "GST_CLOCK_TYPE_MONOTONIC", "monotonic"},
{0, NULL, NULL},
};
if (G_UNLIKELY (!clock_type_type)) {
clock_type_type = g_enum_register_static ("GstClockType", clock_types);
}
return clock_type_type;
}
static void
gst_system_clock_class_init (GstSystemClockClass * klass)
{
......@@ -111,7 +151,16 @@ gst_system_clock_class_init (GstSystemClockClass * klass)
parent_class = g_type_class_peek_parent (klass);
g_type_class_add_private (klass, sizeof (GstSystemClockPrivate));
gobject_class->dispose = gst_system_clock_dispose;
gobject_class->set_property = gst_system_clock_set_property;
gobject_class->get_property = gst_system_clock_get_property;
g_object_class_install_property (gobject_class, PROP_CLOCK_TYPE,
g_param_spec_enum ("clock-type", "Clock type",
"The type of underlying clock implementation used",
GST_TYPE_CLOCK_TYPE, GST_CLOCK_TYPE_REALTIME, G_PARAM_READWRITE));
gstclock_class->get_internal_time = gst_system_clock_get_internal_time;
gstclock_class->get_resolution = gst_system_clock_get_resolution;
......@@ -129,11 +178,16 @@ gst_system_clock_init (GstSystemClock * clock)
GST_CLOCK_FLAG_CAN_DO_PERIODIC_SYNC |
GST_CLOCK_FLAG_CAN_DO_PERIODIC_ASYNC);
clock->priv = GST_SYSTEM_CLOCK_GET_PRIVATE (clock);
clock->priv->clock_type = GST_CLOCK_TYPE_REALTIME;
clock->priv->timer = gst_poll_new_timer ();
#if 0
/* Uncomment this to start the async clock thread straight away */
GST_CLOCK_LOCK (clock);
GST_OBJECT_LOCK (clock);
gst_system_clock_start_async (clock);
GST_CLOCK_UNLOCK (clock);
GST_OBJECT_UNLOCK (clock);
#endif
}
......@@ -141,8 +195,7 @@ static void
gst_system_clock_dispose (GObject * object)
{
GstClock *clock = (GstClock *) object;
GstSystemClock *sysclock = GST_SYSTEM_CLOCK (clock);
GstSystemClock *sysclock = GST_SYSTEM_CLOCK_CAST (clock);
GList *entries;
/* else we have to stop the thread */
......@@ -165,6 +218,8 @@ gst_system_clock_dispose (GObject * object)
sysclock->thread = NULL;
GST_CAT_DEBUG (GST_CAT_CLOCK, "joined thread");
gst_poll_free (sysclock->priv->timer);
G_OBJECT_CLASS (parent_class)->dispose (object);
if (_the_system_clock == clock) {
......@@ -173,6 +228,40 @@ gst_system_clock_dispose (GObject * object)
}
}
static void
gst_system_clock_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstSystemClock *sysclock = GST_SYSTEM_CLOCK (object);
switch (prop_id) {
case PROP_CLOCK_TYPE:
sysclock->priv->clock_type = g_value_get_enum (value);
GST_CAT_DEBUG (GST_CAT_CLOCK, "clock-type set to %d",
sysclock->priv->clock_type);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
gst_system_clock_get_property (GObject * object, guint prop_id, GValue * value,
GParamSpec * pspec)
{
GstSystemClock *sysclock = GST_SYSTEM_CLOCK (object);
switch (prop_id) {
case PROP_CLOCK_TYPE:
g_value_set_enum (value, sysclock->priv->clock_type);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
/**
* gst_system_clock_obtain:
*
......@@ -214,6 +303,30 @@ gst_system_clock_obtain (void)
return clock;
}
static void
gst_system_clock_clear_async_wakeups_unlocked (GstSystemClock * sysclock)
{
while (sysclock->priv->async_wakeup_count > 0) {
GST_CAT_DEBUG (GST_CAT_CLOCK, "reading control");
while (!gst_poll_read_control (sysclock->priv->timer)) {
g_warning ("gstsystemclock: read control failed, trying again\n");
}
sysclock->priv->async_wakeup_count--;
}
GST_CLOCK_BROADCAST (sysclock);
}
static void
gst_system_clock_wakeup_async_unlocked (GstSystemClock * sysclock)
{
GST_CAT_DEBUG (GST_CAT_CLOCK, "writing control");
while (!gst_poll_write_control (sysclock->priv->timer)) {
g_warning ("gstsystemclock: write control failed, trying again\n");
}
sysclock->priv->async_wakeup_count++;
}
/* this thread reads the sorted clock entries from the queue.
*
* It waits on each of them and fires the callback when the timeout occurs.
......@@ -229,7 +342,7 @@ gst_system_clock_obtain (void)
static void
gst_system_clock_async_thread (GstClock * clock)
{
GstSystemClock *sysclock = GST_SYSTEM_CLOCK (clock);
GstSystemClock *sysclock = GST_SYSTEM_CLOCK_CAST (clock);
GST_CAT_DEBUG (GST_CAT_CLOCK, "enter system clock thread");
GST_OBJECT_LOCK (clock);
......@@ -285,6 +398,10 @@ gst_system_clock_async_thread (GstClock * clock)
entry->user_data);
GST_OBJECT_LOCK (clock);
}
if (clock->entries->data != entry) {
/* new entries have been added, clear async wakeups */
gst_system_clock_clear_async_wakeups_unlocked (sysclock);
}
if (entry->type == GST_CLOCK_ENTRY_PERIODIC) {
/* adjust time now */
entry->time = requested + entry->interval;
......@@ -303,6 +420,8 @@ gst_system_clock_async_thread (GstClock * clock)
* was canceled. Whatever it is, pick the head entry of the list and
* continue waiting. */
GST_CAT_DEBUG (GST_CAT_CLOCK, "async entry %p needs restart", entry);
/* clear async wakeups, if any */
gst_system_clock_clear_async_wakeups_unlocked (sysclock);
continue;
default:
GST_CAT_DEBUG (GST_CAT_CLOCK,
......@@ -323,21 +442,60 @@ exit:
GST_CAT_DEBUG (GST_CAT_CLOCK, "exit system clock thread");
}
#ifdef HAVE_POSIX_TIMERS
static inline clockid_t
clock_type_to_posix_id (GstClockType clock_type)
{
#ifdef HAVE_MONOTONIC_CLOCK
if (clock_type == GST_CLOCK_TYPE_MONOTONIC)
return CLOCK_MONOTONIC;
else
#endif
return CLOCK_REALTIME;
}
#endif
/* MT safe */
static GstClockTime
gst_system_clock_get_internal_time (GstClock * clock)
{
#ifdef HAVE_POSIX_TIMERS
GstSystemClock *sysclock = GST_SYSTEM_CLOCK_CAST (clock);
clockid_t ptype;
struct timespec ts;
ptype = clock_type_to_posix_id (sysclock->priv->clock_type);
if (G_UNLIKELY (clock_gettime (ptype, &ts)))
return GST_CLOCK_TIME_NONE;
return GST_TIMESPEC_TO_TIME (ts);
#else
GTimeVal timeval;
g_get_current_time (&timeval);
return GST_TIMEVAL_TO_TIME (timeval);
#endif
}
static guint64
gst_system_clock_get_resolution (GstClock * clock)
{
#ifdef HAVE_POSIX_TIMERS
GstSystemClock *sysclock = GST_SYSTEM_CLOCK_CAST (clock);
clockid_t ptype;
struct timespec ts;
ptype = clock_type_to_posix_id (sysclock->priv->clock_type);
if (G_UNLIKELY (clock_getres (ptype, &ts)))
return GST_CLOCK_TIME_NONE;
return GST_TIMESPEC_TO_TIME (ts);
#else
return 1 * GST_USECOND;
#endif
}
/* synchronously wait on the given GstClockEntry.
......@@ -360,81 +518,119 @@ static GstClockReturn
gst_system_clock_id_wait_jitter_unlocked (GstClock * clock,
GstClockEntry * entry, GstClockTimeDiff * jitter, gboolean restart)
{
GstClockTime entryt, real, now, target;
GstSystemClock *sysclock = GST_SYSTEM_CLOCK_CAST (clock);
GstClockTime entryt, real, now;
GstClockTimeDiff diff;
/* need to call the overridden method because we want to sync against the time
* of the clock, whatever the subclass uses as a clock. */
real = GST_CLOCK_GET_CLASS (clock)->get_internal_time (clock);
now = gst_clock_adjust_unlocked (clock, real);
/* get the time of the entry */
entryt = GST_CLOCK_ENTRY_TIME (entry);
now = gst_clock_adjust_unlocked (clock, real);
if (jitter) {
*jitter = GST_CLOCK_DIFF (entryt, now);
}
/* the diff of the entry with the clock is the amount of time we have to
* wait */
diff = entryt - now;
/* Our GCond implementation expects an absolute time against the system clock
* as a timeout value. We use our internal time to get the system time and add
* the expected timeout to it, this gives us the absolute time of the
* timeout. */
target = gst_system_clock_get_internal_time (clock) + diff;
GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p"
" target %" GST_TIME_FORMAT
" entry %" GST_TIME_FORMAT
" time %" GST_TIME_FORMAT
" now %" GST_TIME_FORMAT
" real %" GST_TIME_FORMAT
" diff (entry-now) %" G_GINT64_FORMAT,
" diff (time-now) %" G_GINT64_FORMAT,
entry,
GST_TIME_ARGS (target),
GST_TIME_ARGS (entryt), GST_TIME_ARGS (now), GST_TIME_ARGS (real), diff);
if (diff > 0) {
GTimeVal tv;
#ifdef WAIT_DEBUGGING
GstClockTime result, final;
GstClockTime final;
#endif
GST_TIME_TO_TIMEVAL (target, tv);
while (entry->status != GST_CLOCK_UNSCHEDULED) {
gint pollret;
while (TRUE) {
/* now wait on the entry, it either times out or the cond is signaled. */
if (!GST_CLOCK_TIMED_WAIT (clock, &tv)) {
/* timeout, this is fine, we can report success now */
GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p unlocked after timeout", entry);
entry->status = GST_CLOCK_OK;
/* mark the entry as busy */
entry->status = GST_CLOCK_BUSY;
GST_OBJECT_UNLOCK (clock);
#ifdef WAIT_DEBUGGING
/* now wait on the entry, it either times out or the fd is written. */
pollret = gst_poll_wait (sysclock->priv->timer, diff);
/* another thread can read the fd before we get the lock */
GST_OBJECT_LOCK (clock);
if (entry->status == GST_CLOCK_UNSCHEDULED) {
GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p unlocked", entry);
while (!gst_poll_read_control (sysclock->priv->timer)) {
g_warning ("gstsystemclock: read control failed, trying again\n");
}
GST_CLOCK_BROADCAST (clock);
} else {
if (pollret != 0) {
/* some other id got unlocked */
if (!restart) {
/* this can happen if the entry got unlocked because of an async
* entry was added to the head of the async queue. */
GST_CAT_DEBUG (GST_CAT_CLOCK, "wakeup waiting for entry %p", entry);
break;
}
/* mark ourselves as EARLY, we release the lock and we could be
* unscheduled ourselves but we don't want the unscheduling thread
* to write on the fd */
entry->status = GST_CLOCK_EARLY;
/* before waiting on the cond, check if another thread read the fd
* before we got the lock */
while (gst_poll_wait (sysclock->priv->timer, 0) > 0) {
GST_CLOCK_WAIT (clock);
}
/* we released the lock in the wait, recheck our status */
if (entry->status == GST_CLOCK_UNSCHEDULED) {
GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p got unscheduled", entry);
break;
}
GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p needs to be restarted",
entry);
} else {
GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p unlocked after timeout",
entry);
}
/* reschedule if gst_poll_wait returned early or we have to reschedule after
* an unlock*/
real = GST_CLOCK_GET_CLASS (clock)->get_internal_time (clock);
result = gst_clock_adjust_unlocked (clock, real);
final = gst_system_clock_get_internal_time (clock);
GST_CAT_DEBUG (GST_CAT_CLOCK, "Waited for %" G_GINT64_FORMAT
" got %" G_GINT64_FORMAT " diff %" G_GINT64_FORMAT
" %g target-offset %" G_GINT64_FORMAT " %g", entryt, result,
result - entryt,
(double) (GstClockTimeDiff) (result - entryt) / GST_SECOND,
(final - target),
((double) (GstClockTimeDiff) (final - target)) / GST_SECOND);
now = gst_clock_adjust_unlocked (clock, real);
diff = entryt - now;
if (diff <= 0) {
/* timeout, this is fine, we can report success now */
entry->status = GST_CLOCK_OK;
GST_CAT_DEBUG (GST_CAT_CLOCK,
"entry %p finished, diff %" G_GINT64_FORMAT, entry, diff);
#ifdef WAIT_DEBUGGING
final = gst_system_clock_get_internal_time (clock);
GST_CAT_DEBUG (GST_CAT_CLOCK, "Waited for %" G_GINT64_FORMAT
" got %" G_GINT64_FORMAT " diff %" G_GINT64_FORMAT
" %g target-offset %" G_GINT64_FORMAT " %g", entryt, now,
now - entryt,
(double) (GstClockTimeDiff) (now - entryt) / GST_SECOND,
(final - target),
((double) (GstClockTimeDiff) (final - target)) / GST_SECOND);
#endif
break;
} else {
/* the waiting is interrupted because the GCond was signaled. This can
* be because this or some other entry was unscheduled. */
GST_CAT_DEBUG (GST_CAT_CLOCK, "entry %p unlocked with signal", entry);
/* if the entry is unscheduled, we can stop waiting for it, else we
* continue our while loop. */
if (entry->status == GST_CLOCK_UNSCHEDULED)
break;
/* else restart if we must */
if (!restart)
break;
/* this can happen if the entry got unlocked because of an async entry
* was added to the head of the async queue. */
GST_CAT_DEBUG (GST_CAT_CLOCK, "continue waiting for entry %p", entry);
} else {
GST_CAT_DEBUG (GST_CAT_CLOCK,
"entry %p restart, diff %" G_GINT64_FORMAT, entry, diff);
}
}
}
} else if (diff == 0) {
......@@ -496,14 +692,21 @@ no_thread:
static GstClockReturn
gst_system_clock_id_wait_async (GstClock * clock, GstClockEntry * entry)
{
gboolean empty;
GstSystemClock *sysclock;
sysclock = GST_SYSTEM_CLOCK_CAST (clock);
GST_CAT_DEBUG (GST_CAT_CLOCK, "adding async entry %p", entry);
GST_OBJECT_LOCK (clock);
/* Start the clock async thread if needed */
if (!gst_system_clock_start_async (GST_SYSTEM_CLOCK (clock)))
if (!gst_system_clock_start_async (sysclock))
goto thread_error;
empty = (clock->entries == NULL);
/* need to take a ref */
gst_clock_id_ref ((GstClockID) entry);
/* insert the entry in sorted order */
......@@ -514,13 +717,17 @@ gst_system_clock_id_wait_async (GstClock * clock, GstClockEntry * entry)
* front, else the thread is just waiting for another entry and
* will get to this entry automatically. */
if (clock->entries->data == entry) {
GST_CAT_DEBUG (GST_CAT_CLOCK, "async entry added to head, sending signal");
/* this will wake up _all_ entries waiting for the clock because we have
* only one cond for all entries (makes allocation faster). Entries that
* have not timed out will have their status set to BUSY and should continue
* to wait. In the case of the async ones, the new head entry should be
* taken and waited for. */
GST_CLOCK_BROADCAST (clock);
GST_CAT_DEBUG (GST_CAT_CLOCK, "async entry added to head");
if (empty) {
/* the list was empty before, signal the cond so that the async thread can
* start taking a look at the queue */
GST_CAT_DEBUG (GST_CAT_CLOCK, "sending signal");
GST_CLOCK_BROADCAST (clock);
} else {
/* the async thread was waiting for an entry, unlock the wait so that it
* looks at the new head entry instead */
gst_system_clock_wakeup_async_unlocked (sysclock);
}
}
GST_OBJECT_UNLOCK (clock);
......@@ -544,11 +751,17 @@ gst_system_clock_id_unschedule (GstClock * clock, GstClockEntry * entry)
GST_CAT_DEBUG (GST_CAT_CLOCK, "unscheduling entry %p", entry);
GST_OBJECT_LOCK (clock);
/* mark entry as unscheduled, then wake up all entries. The entries that did
* not timeout will be woken up but immediatly go to sleep again because their
* status would still be busy. */
if (entry->status == GST_CLOCK_BUSY) {
/* the entry was being busy, wake up all entries so that they recheck their
* status. We cannot wake up just one entry because allocating such a
* datastructure for each entry would be too heavey and unlocking an entry
* is usually done when shutting down or some other exceptional case. */
GST_CAT_DEBUG (GST_CAT_CLOCK, "writing control");
while (!gst_poll_write_control (GST_SYSTEM_CLOCK_CAST (clock)->priv->timer)) {
g_warning ("gstsystemclock: write control failed, trying again\n");
}
}
/* when it leaves the poll, it'll detect the unscheduled */
entry->status = GST_CLOCK_UNSCHEDULED;
GST_CAT_DEBUG (GST_CAT_CLOCK, "sending signal");
GST_CLOCK_BROADCAST (clock);
GST_OBJECT_UNLOCK (clock);
}
......@@ -31,6 +31,7 @@ G_BEGIN_DECLS
#define GST_TYPE_SYSTEM_CLOCK (gst_system_clock_get_type ())
#define GST_SYSTEM_CLOCK(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_SYSTEM_CLOCK, GstSystemClock))
#define GST_SYSTEM_CLOCK_CAST(obj) ((GstSystemClock *)(obj))
#define GST_IS_SYSTEM_CLOCK(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_SYSTEM_CLOCK))
#define GST_SYSTEM_CLOCK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_SYSTEM_CLOCK, GstSystemClockClass))
#define GST_IS_SYSTEM_CLOCK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_SYSTEM_CLOCK))
......@@ -39,6 +40,18 @@ G_BEGIN_DECLS
typedef struct _GstSystemClock GstSystemClock;
typedef struct _GstSystemClockClass GstSystemClockClass;
typedef struct _GstSystemClockPrivate GstSystemClockPrivate;
/**
* GstClockType:
* @GST_CLOCK_TYPE_REALTIME: time since Epoch
* @GST_CLOCK_TYPE_MONOTONIC: monotonic time since some unspecified starting
* point
*/
typedef enum {
GST_CLOCK_TYPE_REALTIME = 0,
GST_CLOCK_TYPE_MONOTONIC = 1
} GstClockType;
/**
* GstSystemClock:
......@@ -53,7 +66,10 @@ struct _GstSystemClock {
GThread *thread; /* thread for async notify */
gboolean stopping;
gpointer _gst_reserved[GST_PADDING];
/* ABI added */
GstSystemClockPrivate *priv;
gpointer _gst_reserved[GST_PADDING - 1];
};
struct _GstSystemClockClass {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment