Commit 202f4631 authored by Sebastian Dröge's avatar Sebastian Dröge 🍵
Browse files

netclientclock: Add NTPv4 support

This uses all of the netclientclock code, except for the generation and
parsing of packets. Unfortunately some code duplication was necessary
because GstNetTimePacket is public API and couldn't be extended easily
to support NTPv4 packets without breaking API/ABI.
parent cc15f68b
......@@ -890,7 +890,9 @@ gst_net_control_message_meta_api_get_type
<TITLE>GstNetClientClock</TITLE>
<INCLUDE>gst/net/gstnet.h</INCLUDE>
GstNetClientClock
GstNtpClock
gst_net_client_clock_new
gst_ntp_clock_new
<SUBSECTION Standard>
GstNetClientClockClass
GstNetClientClockPrivate
......@@ -899,8 +901,15 @@ GST_IS_NET_CLIENT_CLOCK
GST_TYPE_NET_CLIENT_CLOCK
GST_NET_CLIENT_CLOCK_CLASS
GST_IS_NET_CLIENT_CLOCK_CLASS
GstNtpClockClass
GST_NTP_CLOCK
GST_IS_NTP_CLOCK
GST_TYPE_NTP_CLOCK
GST_NTP_CLOCK_CLASS
GST_IS_NTP_CLOCK_CLASS
<SUBSECTION Private>
gst_net_client_clock_get_type
gst_ntp_clock_get_type
</SECTION>
......
......@@ -17,9 +17,10 @@ libgstnet_@GST_API_VERSION@_la_SOURCES = \
gstnetcontrolmessagemeta.c \
gstnettimepacket.c \
gstnettimeprovider.c \
gstptpclock.c
gstptpclock.c \
gstntppacket.c
noinst_HEADERS = gstptp_private.h
noinst_HEADERS = gstptp_private.h gstntppacket.h
libgstnet_@GST_API_VERSION@_la_CFLAGS = $(GST_OBJ_CFLAGS) $(GIO_CFLAGS)
libgstnet_@GST_API_VERSION@_la_LIBADD = $(GST_OBJ_LIBS) $(GIO_LIBS) \
......
......@@ -28,12 +28,13 @@
* provider.
* @see_also: #GstClock, #GstNetTimeProvider, #GstPipeline
*
* This object implements a custom #GstClock that synchronizes its time
* to a remote time provider such as #GstNetTimeProvider.
* #GstNetClientClock implements a custom #GstClock that synchronizes its time
* to a remote time provider such as #GstNetTimeProvider. #GstNtpClock
* implements a #GstClock that synchronizes its time to a remote NTPv4 server.
*
* A new clock is created with gst_net_client_clock_new() which takes the
* address and port of the remote time provider along with a name and
* an initial time.
* A new clock is created with gst_net_client_clock_new() or
* gst_ntp_clock_new(), which takes the address and port of the remote time
* provider along with a name and an initial time.
*
* This clock will poll the time provider and will update its calibration
* parameters based on the local and remote observations.
......@@ -43,7 +44,7 @@
* Various parameters of the clock can be configured with the parent #GstClock
* "timeout", "window-size" and "window-threshold" object properties.
*
* A #GstNetClientClock is typically set on a #GstPipeline with
* A #GstNetClientClock and #GstNtpClock is typically set on a #GstPipeline with
* gst_pipeline_use_clock().
*
* If you set a #GstBus on the clock via the "bus" object property, it will
......@@ -56,6 +57,7 @@
#endif
#include "gstnettimepacket.h"
#include "gstntppacket.h"
#include "gstnetclientclock.h"
#include <gio/gio.h>
......@@ -106,6 +108,7 @@ struct _GstNetClientClockPrivate
GstClockTime roundtrip_limit;
GstClockTime rtt_avg;
GstClockTime minimum_update_interval;
GstClockTime last_remote_poll_interval;
guint skipped_updates;
GstClockTime last_rtts[MEDIAN_PRE_FILTERING_WINDOW];
gint last_rtts_missing;
......@@ -116,6 +119,8 @@ struct _GstNetClientClockPrivate
gint port;
GstBus *bus;
gboolean is_ntp;
};
#define _do_init \
......@@ -215,6 +220,7 @@ gst_net_client_clock_init (GstNetClientClock * self)
priv->rtt_avg = GST_CLOCK_TIME_NONE;
priv->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT;
priv->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL;
priv->last_remote_poll_interval = GST_CLOCK_TIME_NONE;
priv->skipped_updates = 0;
priv->last_rtts_missing = MEDIAN_PRE_FILTERING_WINDOW;
priv->base_time = DEFAULT_BASE_TIME;
......@@ -409,7 +415,16 @@ gst_net_client_clock_observe_times (GstNetClientClock * self,
GST_OBJECT_LOCK (self);
rtt_limit = self->priv->roundtrip_limit;
min_update_interval = self->priv->minimum_update_interval;
/* If the server told us a poll interval and it's bigger than the
* one configured via the property, use the server's */
if (self->priv->last_remote_poll_interval != GST_CLOCK_TIME_NONE &&
self->priv->last_remote_poll_interval >
self->priv->minimum_update_interval)
min_update_interval = self->priv->last_remote_poll_interval;
else
min_update_interval = self->priv->minimum_update_interval;
if (self->priv->bus)
bus = gst_object_ref (self->priv->bus);
GST_OBJECT_UNLOCK (self);
......@@ -666,7 +681,6 @@ static gpointer
gst_net_client_clock_thread (gpointer data)
{
GstNetClientClock *self = data;
GstNetTimePacket *packet;
GSocket *socket = self->priv->socket;
GError *err = NULL;
GstClock *clock = data;
......@@ -700,18 +714,38 @@ gst_net_client_clock_thread (gpointer data)
/* timed out, let's send another packet */
GST_DEBUG_OBJECT (self, "timed out");
packet = gst_net_time_packet_new (NULL);
if (self->priv->is_ntp) {
GstNtpPacket *packet;
packet = gst_ntp_packet_new (NULL, NULL);
packet->transmit_time =
gst_clock_get_internal_time (GST_CLOCK (self));
GST_DEBUG_OBJECT (self,
"sending packet, local time = %" GST_TIME_FORMAT,
GST_TIME_ARGS (packet->transmit_time));
packet->local_time = gst_clock_get_internal_time (GST_CLOCK (self));
gst_ntp_packet_send (packet, self->priv->socket,
self->priv->servaddr, NULL);
GST_DEBUG_OBJECT (self,
"sending packet, local time = %" GST_TIME_FORMAT,
GST_TIME_ARGS (packet->local_time));
g_free (packet);
} else {
GstNetTimePacket *packet;
gst_net_time_packet_send (packet, self->priv->socket,
self->priv->servaddr, NULL);
packet = gst_net_time_packet_new (NULL);
g_free (packet);
packet->local_time = gst_clock_get_internal_time (GST_CLOCK (self));
GST_DEBUG_OBJECT (self,
"sending packet, local time = %" GST_TIME_FORMAT,
GST_TIME_ARGS (packet->local_time));
gst_net_time_packet_send (packet, self->priv->socket,
self->priv->servaddr, NULL);
g_free (packet);
}
/* reset timeout (but are expecting a response sooner anyway) */
self->priv->timeout_expiration =
......@@ -728,25 +762,81 @@ gst_net_client_clock_thread (gpointer data)
new_local = gst_clock_get_internal_time (GST_CLOCK (self));
packet = gst_net_time_packet_receive (socket, NULL, &err);
if (packet != NULL) {
GST_LOG_OBJECT (self, "got packet back");
GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
GST_TIME_ARGS (packet->local_time));
GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT,
GST_TIME_ARGS (packet->remote_time));
GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
GST_TIME_ARGS (new_local));
/* observe_times will reset the timeout */
gst_net_client_clock_observe_times (self, packet->local_time,
packet->remote_time, packet->remote_time, new_local);
g_free (packet);
} else if (err != NULL) {
GST_WARNING_OBJECT (self, "receive error: %s", err->message);
g_clear_error (&err);
if (self->priv->is_ntp) {
GstNtpPacket *packet;
packet = gst_ntp_packet_receive (socket, NULL, &err);
if (packet != NULL) {
GST_LOG_OBJECT (self, "got packet back");
GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
GST_TIME_ARGS (packet->origin_time));
GST_LOG_OBJECT (self, "remote_1 = %" GST_TIME_FORMAT,
GST_TIME_ARGS (packet->receive_time));
GST_LOG_OBJECT (self, "remote_2 = %" GST_TIME_FORMAT,
GST_TIME_ARGS (packet->transmit_time));
GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
GST_TIME_ARGS (new_local));
GST_LOG_OBJECT (self, "poll_interval = %" GST_TIME_FORMAT,
GST_TIME_ARGS (packet->poll_interval));
/* Remember the last poll interval we ever got from the server */
if (packet->poll_interval != GST_CLOCK_TIME_NONE)
self->priv->last_remote_poll_interval = packet->poll_interval;
/* observe_times will reset the timeout */
gst_net_client_clock_observe_times (self, packet->origin_time,
packet->receive_time, packet->transmit_time, new_local);
g_free (packet);
} else if (err != NULL) {
if (g_error_matches (err, GST_NTP_ERROR, GST_NTP_ERROR_WRONG_VERSION)
|| g_error_matches (err, GST_NTP_ERROR, GST_NTP_ERROR_KOD_DENY)) {
GST_ERROR_OBJECT (self, "fatal receive error: %s", err->message);
break;
} else if (g_error_matches (err, GST_NTP_ERROR,
GST_NTP_ERROR_KOD_RATE)) {
GST_WARNING_OBJECT (self, "need to limit rate");
/* If the server did not tell us a poll interval before, double
* our minimum poll interval. Otherwise we assume that the server
* already told us something sensible and that this error here
* was just a spurious error */
if (self->priv->last_remote_poll_interval == GST_CLOCK_TIME_NONE)
self->priv->minimum_update_interval *= 2;
/* And wait a bit before we send the next packet instead of
* sending it immediately */
self->priv->timeout_expiration =
gst_util_get_timestamp () + gst_clock_get_timeout (clock);
} else {
GST_WARNING_OBJECT (self, "receive error: %s", err->message);
}
g_clear_error (&err);
}
} else {
GstNetTimePacket *packet;
packet = gst_net_time_packet_receive (socket, NULL, &err);
if (packet != NULL) {
GST_LOG_OBJECT (self, "got packet back");
GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
GST_TIME_ARGS (packet->local_time));
GST_LOG_OBJECT (self, "remote = %" GST_TIME_FORMAT,
GST_TIME_ARGS (packet->remote_time));
GST_LOG_OBJECT (self, "local_2 = %" GST_TIME_FORMAT,
GST_TIME_ARGS (new_local));
/* observe_times will reset the timeout */
gst_net_client_clock_observe_times (self, packet->local_time,
packet->remote_time, packet->remote_time, new_local);
g_free (packet);
} else if (err != NULL) {
GST_WARNING_OBJECT (self, "receive error: %s", err->message);
g_clear_error (&err);
}
}
}
}
......@@ -922,3 +1012,46 @@ gst_net_client_clock_new (const gchar * name, const gchar * remote_address,
return (GstClock *) ret;
}
G_DEFINE_TYPE (GstNtpClock, gst_ntp_clock, GST_TYPE_NET_CLIENT_CLOCK);
static void
gst_ntp_clock_class_init (GstNtpClockClass * klass)
{
}
static void
gst_ntp_clock_init (GstNtpClock * self)
{
GST_NET_CLIENT_CLOCK (self)->priv->is_ntp = TRUE;
}
/**
* gst_ntp_clock_new:
* @name: a name for the clock
* @remote_address: the address of the remote clock provider
* @remote_port: the port of the remote clock provider
* @base_time: initial time of the clock
*
* Create a new #GstNtpClock that will report the time provided by
* the NTPv4 server on @remote_address and @remote_port.
*
* Returns: a new #GstClock that receives a time from the remote
* clock.
*/
GstClock *
gst_ntp_clock_new (const gchar * name, const gchar * remote_address,
gint remote_port, GstClockTime base_time)
{
GstNetClientClock *ret;
g_return_val_if_fail (remote_address != NULL, NULL);
g_return_val_if_fail (remote_port > 0, NULL);
g_return_val_if_fail (remote_port <= G_MAXUINT16, NULL);
g_return_val_if_fail (base_time != GST_CLOCK_TIME_NONE, NULL);
ret = g_object_new (GST_TYPE_NTP_CLOCK, "address", remote_address,
"port", remote_port, "base-time", base_time, NULL);
return (GstClock *) ret;
}
......@@ -73,6 +73,24 @@ GType gst_net_client_clock_get_type (void);
GstClock* gst_net_client_clock_new (const gchar *name, const gchar *remote_address,
gint remote_port, GstClockTime base_time);
#define GST_TYPE_NTP_CLOCK \
(gst_ntp_clock_get_type())
#define GST_NTP_CLOCK(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_NTP_CLOCK,GstNtpClock))
#define GST_NTP_CLOCK_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_NTP_CLOCK,GstNtpClockClass))
#define GST_IS_NTP_CLOCK(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_NTP_CLOCK))
#define GST_IS_NTP_CLOCK_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_NTP_CLOCK))
typedef struct _GstNetClientClock GstNtpClock;
typedef struct _GstNetClientClockClass GstNtpClockClass;
GType gst_ntp_clock_get_type (void);
GstClock* gst_ntp_clock_new (const gchar *name, const gchar *remote_address,
gint remote_port, GstClockTime base_time);
G_END_DECLS
#endif /* __GST_NET_CLIENT_CLOCK_H__ */
/* GStreamer
* Copyright (C) 2005 Andy Wingo <wingo@pobox.com>
* Copyright (C) 2010 Tim-Philipp Müller <tim centricular net>
* Copyright (C) 2012 Collabora Ltd. <tim.muller@collabora.co.uk>
* Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:gstntppacket
* @short_description: Helper structure to construct clock packets used
* by network clocks for NTPv4.
* @see_also: #GstClock, #GstNetClientClock, #GstNtpProvider
*
* Various functions for receiving, sending an serializing #GstNtpPacket
* structures.
*/
/* FIXME 2.0: Merge this with GstNetTimePacket! */
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <glib.h>
#ifdef __CYGWIN__
# include <unistd.h>
# include <fcntl.h>
#endif
#include <gst/gst.h>
#include <string.h>
#include "gstntppacket.h"
G_DEFINE_BOXED_TYPE (GstNtpPacket, gst_ntp_packet,
gst_ntp_packet_copy, gst_ntp_packet_free);
static inline GstClockTime
ntp_timestamp_to_gst_clock_time (guint32 seconds, guint32 fraction)
{
return gst_util_uint64_scale (seconds, GST_SECOND, 1) +
gst_util_uint64_scale (fraction, GST_SECOND,
G_GUINT64_CONSTANT (1) << 32);
}
static inline guint32
gst_clock_time_to_ntp_timestamp_seconds (GstClockTime gst)
{
GstClockTime seconds = gst_util_uint64_scale (gst, 1, GST_SECOND);
return seconds;
}
static inline guint32
gst_clock_time_to_ntp_timestamp_fraction (GstClockTime gst)
{
GstClockTime seconds = gst_util_uint64_scale (gst, 1, GST_SECOND);
return gst_util_uint64_scale (gst - seconds, G_GUINT64_CONSTANT (1) << 32,
GST_SECOND);
}
/**
* gst_ntp_packet_new:
* @buffer: (array): a buffer from which to construct the packet, or NULL
*
* Creates a new #GstNtpPacket from a buffer received over the network. The
* caller is responsible for ensuring that @buffer is at least
* #GST_NTP_PACKET_SIZE bytes long.
*
* If @buffer is #NULL, the local and remote times will be set to
* #GST_CLOCK_TIME_NONE.
*
* MT safe. Caller owns return value (gst_ntp_packet_free to free).
*
* Returns: The new #GstNtpPacket.
*/
GstNtpPacket *
gst_ntp_packet_new (const guint8 * buffer, GError ** error)
{
GstNtpPacket *ret;
g_assert (sizeof (GstClockTime) == 8);
if (buffer) {
guint8 version = (buffer[0] >> 3) & 0x7;
guint8 stratum = buffer[1];
guint8 poll_interval = buffer[2];
if (version != 4) {
g_set_error (error, GST_NTP_ERROR, GST_NTP_ERROR_WRONG_VERSION,
"Invalid NTP version %d", version);
return NULL;
}
/* Kiss-o'-Death packet! */
if (stratum == 0) {
gchar code[5] = { buffer[3 * 4 + 0], buffer[3 * 4 + 1], buffer[3 * 4 + 2],
buffer[3 * 4 + 3], 0
};
/* AUTH, AUTO, CRYP, DENY, RSTR, NKEY => DENY */
if (strcmp (code, "AUTH") == 0 ||
strcmp (code, "AUTO") == 0 ||
strcmp (code, "CRYP") == 0 ||
strcmp (code, "DENY") == 0 ||
strcmp (code, "RSTR") == 0 || strcmp (code, "NKEY") == 0) {
g_set_error (error, GST_NTP_ERROR, GST_NTP_ERROR_KOD_DENY,
"Kiss-o'-Death denied '%s'", code);
} else if (strcmp (code, "RATE") == 0) {
g_set_error (error, GST_NTP_ERROR, GST_NTP_ERROR_KOD_RATE,
"Kiss-o'-Death '%s'", code);
} else {
g_set_error (error, GST_NTP_ERROR, GST_NTP_ERROR_KOD_UNKNOWN,
"Kiss-o'-Death unknown '%s'", code);
}
return NULL;
}
ret = g_new0 (GstNtpPacket, 1);
ret->origin_time =
ntp_timestamp_to_gst_clock_time (GST_READ_UINT32_BE (buffer + 6 * 4),
GST_READ_UINT32_BE (buffer + 7 * 4));
ret->receive_time =
ntp_timestamp_to_gst_clock_time (GST_READ_UINT32_BE (buffer + 8 * 4),
GST_READ_UINT32_BE (buffer + 9 * 4));
ret->transmit_time =
ntp_timestamp_to_gst_clock_time (GST_READ_UINT32_BE (buffer + 10 * 4),
GST_READ_UINT32_BE (buffer + 11 * 4));
/* Wireshark considers everything >= 3 as invalid */
if (poll_interval >= 3)
ret->poll_interval = GST_CLOCK_TIME_NONE;
else if (poll_interval >= 0)
ret->poll_interval = GST_SECOND << poll_interval;
else
ret->poll_interval = GST_SECOND >> (-poll_interval);
} else {
ret = g_new0 (GstNtpPacket, 1);
ret->origin_time = 0;
ret->receive_time = 0;
ret->transmit_time = 0;
ret->poll_interval = 0;
}
return ret;
}
/**
* gst_ntp_packet_free:
* @packet: the #GstNtpPacket
*
* Free @packet.
*/
void
gst_ntp_packet_free (GstNtpPacket * packet)
{
g_free (packet);
}
/**
* gst_ntp_packet_copy:
* @packet: the #GstNtpPacket
*
* Make a copy of @packet.
*
* Returns: a copy of @packet, free with gst_ntp_packet_free().
*/
GstNtpPacket *
gst_ntp_packet_copy (const GstNtpPacket * packet)
{
GstNtpPacket *ret;
ret = g_new0 (GstNtpPacket, 1);
ret->origin_time = packet->origin_time;
ret->receive_time = packet->receive_time;
ret->transmit_time = packet->transmit_time;
return ret;
}
/**
* gst_ntp_packet_serialize:
* @packet: the #GstNtpPacket
*
* Serialized a #GstNtpPacket into a newly-allocated sequence of
* #GST_NTP_PACKET_SIZE bytes, in network byte order. The value returned is
* suitable for passing to write(2) or sendto(2) for communication over the
* network.
*
* MT safe. Caller owns return value (g_free to free).
*
* Returns: A newly allocated sequence of #GST_NTP_PACKET_SIZE bytes.
*/
guint8 *
gst_ntp_packet_serialize (const GstNtpPacket * packet)
{
guint8 *ret;
g_assert (sizeof (GstClockTime) == 8);
ret = g_new0 (guint8, GST_NTP_PACKET_SIZE);
/* Leap Indicator: unknown
* Version: 4
* Mode: Client
*/
ret[0] = (3 << 6) | (4 << 3) | (3 << 0);
/* Stratum: unsynchronized */
ret[1] = 16;
/* Polling interval: invalid */
ret[2] = 3;
/* Precision: 0 */
ret[3] = 0;
/* Root delay: 0 */
GST_WRITE_UINT32_BE (ret + 4, 0);
/* Root disperson: 0 */
GST_WRITE_UINT32_BE (ret + 2 * 4, 0);
/* Reference ID: \0 */
GST_WRITE_UINT32_BE (ret + 3 * 4, 0);
/* Reference Timestamp: 0 */
GST_WRITE_UINT32_BE (ret + 4 * 4, 0);
GST_WRITE_UINT32_BE (ret + 5 * 4, 0);
/* Origin timestamp (local time) */
GST_WRITE_UINT32_BE (ret + 6 * 4,
gst_clock_time_to_ntp_timestamp_seconds (packet->origin_time));
GST_WRITE_UINT32_BE (ret + 7 * 4,
gst_clock_time_to_ntp_timestamp_fraction (packet->origin_time));
/* Receive timestamp (remote time) */
GST_WRITE_UINT32_BE (ret + 8 * 4,
gst_clock_time_to_ntp_timestamp_seconds (packet->receive_time));
GST_WRITE_UINT32_BE (ret + 9 * 4,
gst_clock_time_to_ntp_timestamp_fraction (packet->receive_time));
/* Transmit timestamp (remote time) */
GST_WRITE_UINT32_BE (ret + 10 * 4,
gst_clock_time_to_ntp_timestamp_seconds (packet->transmit_time));
GST_WRITE_UINT32_BE (ret + 11 * 4,
gst_clock_time_to_ntp_timestamp_fraction (packet->transmit_time));
return ret;
}
/**
* gst_ntp_packet_receive:
* @socket: socket to receive the time packet on
* @src_address: (out): address of variable to return sender address
* @error: return address for a #GError, or NULL
*
* Receives a #GstNtpPacket over a socket. Handles interrupted system
* calls, but otherwise returns NULL on error.
*
* Returns: (transfer full): a new #GstNtpPacket, or NULL on error. Free
* with gst_ntp_packet_free() when done.
*/
GstNtpPacket *
gst_ntp_packet_receive (GSocket * socket,
GSocketAddress ** src_address, GError ** error)
{
gchar buffer[GST_NTP_PACKET_SIZE];
GError *err = NULL;