[#969] Decrease NTP sync period after first success (#977)

This commit is contained in:
Adrià Cidre 2018-05-22 17:38:38 +02:00 committed by GitHub
parent 345b152a8b
commit 888ad10b26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 101 additions and 19 deletions

View File

@@ -2,6 +2,7 @@ package timesource
import (
"bytes"
"errors"
"sort"
"sync"
"time"
@@ -16,8 +17,13 @@ const (
// DefaultMaxAllowedFailures defines how many failures will be tolerated.
DefaultMaxAllowedFailures = 2
// DefaultUpdatePeriod defines how often time will be queried from ntp.
DefaultUpdatePeriod = 2 * time.Minute
// FastNTPSyncPeriod period between ntp synchronizations before the first
// successful connection.
FastNTPSyncPeriod = 2 * time.Minute
// SlowNTPSyncPeriod period between ntp synchronizations after the first
// successful connection.
SlowNTPSyncPeriod = 1 * time.Hour
// DefaultRPCTimeout defines write deadline for single ntp server request.
DefaultRPCTimeout = 2 * time.Second
@@ -31,6 +37,7 @@ var defaultServers = []string{
"2.pool.ntp.org",
"3.pool.ntp.org",
}
var errUpdateOffset = errors.New("failed to compute offset")
type ntpQuery func(string, ntp.QueryOptions) (*ntp.Response, error)
@@ -107,20 +114,22 @@ func computeOffset(timeQuery ntpQuery, servers []string, allowedFailures int) (t
// Default initializes time source with default config values.
func Default() *NTPTimeSource {
return &NTPTimeSource{
servers: defaultServers,
allowedFailures: DefaultMaxAllowedFailures,
updatePeriod: DefaultUpdatePeriod,
timeQuery: ntp.QueryWithOptions,
servers: defaultServers,
allowedFailures: DefaultMaxAllowedFailures,
fastNTPSyncPeriod: FastNTPSyncPeriod,
slowNTPSyncPeriod: SlowNTPSyncPeriod,
timeQuery: ntp.QueryWithOptions,
}
}
// NTPTimeSource provides source of time that tries to be resistant to time skews.
// It does so by periodically querying time offset from ntp servers.
type NTPTimeSource struct {
servers []string
allowedFailures int
updatePeriod time.Duration
timeQuery ntpQuery // for ease of testing
servers []string
allowedFailures int
fastNTPSyncPeriod time.Duration
slowNTPSyncPeriod time.Duration
timeQuery ntpQuery // for ease of testing
quit chan struct{}
wg sync.WaitGroup
@@ -136,39 +145,51 @@ func (s *NTPTimeSource) Now() time.Time {
return time.Now().Add(s.latestOffset)
}
func (s *NTPTimeSource) updateOffset() {
func (s *NTPTimeSource) updateOffset() error {
offset, err := computeOffset(s.timeQuery, s.servers, s.allowedFailures)
if err != nil {
log.Error("failed to compute offset", "error", err)
return
return errUpdateOffset
}
log.Info("Difference with ntp servers", "offset", offset)
s.mu.Lock()
s.latestOffset = offset
s.mu.Unlock()
return nil
}
// Start runs a goroutine that updates local offset every updatePeriod.
func (s *NTPTimeSource) Start(*p2p.Server) error {
// runPeriodically runs periodically the given function based on NTPTimeSource
// synchronization limits (fastNTPSyncPeriod / slowNTPSyncPeriod)
func (s *NTPTimeSource) runPeriodically(fn func() error) error {
var period time.Duration
s.quit = make(chan struct{})
ticker := time.NewTicker(s.updatePeriod)
// we try to do it synchronously so that user can have reliable messages right away
s.updateOffset()
s.wg.Add(1)
go func() {
for {
select {
case <-ticker.C:
s.updateOffset()
case <-time.After(period):
if err := fn(); err == nil {
period = s.slowNTPSyncPeriod
} else if period != s.slowNTPSyncPeriod {
period = s.fastNTPSyncPeriod
}
case <-s.quit:
s.wg.Done()
return
}
}
}()
return nil
}
// Start runs a goroutine that periodically queries the configured NTP
// servers and records the local clock offset. Sync attempts are repeated
// every fastNTPSyncPeriod until the first success, after which they happen
// every slowNTPSyncPeriod (see runPeriodically). The previous single
// updatePeriod no longer exists.
// The *p2p.Server argument is unused here — presumably required by a node
// service interface; verify against the caller.
func (s *NTPTimeSource) Start(*p2p.Server) error {
return s.runPeriodically(s.updateOffset)
}
// Stop goroutine that updates time source.
func (s *NTPTimeSource) Stop() error {
if s.quit == nil {

View File

@@ -8,6 +8,7 @@ import (
"github.com/beevik/ntp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
@@ -175,8 +176,68 @@ func TestNTPTimeSource(t *testing.T) {
timeQuery: tc.query,
}
assert.WithinDuration(t, time.Now(), source.Now(), clockCompareDelta)
source.updateOffset()
err := source.updateOffset()
if tc.expectError {
assert.Equal(t, errUpdateOffset, err)
} else {
assert.NoError(t, err)
}
assert.WithinDuration(t, time.Now().Add(tc.expected), source.Now(), clockCompareDelta)
})
}
}
// TestRunningPeriodically verifies that runPeriodically invokes its callback
// on the fast schedule (fastNTPSyncPeriod) while the callback keeps failing,
// and switches to the slow schedule (slowNTPSyncPeriod) once it succeeds.
func TestRunningPeriodically(t *testing.T) {
// hits counts callback invocations; it is touched only inside the callback,
// which runs on the single runPeriodically goroutine.
var hits int
// mu guards periods, appended to by the callback goroutine and read by the
// test goroutine after wg.Wait returns.
var mu sync.RWMutex
periods := make([]time.Duration, 0)
tc := newTestCases()[0]
// fastHits/slowHits double as period sizes in 10ms units, so the assertions
// below can compare them against p := int(period.Seconds() * 100).
fastHits := 3
slowHits := 1
t.Run(tc.description, func(t *testing.T) {
source := &NTPTimeSource{
servers: tc.servers,
allowedFailures: tc.allowedFailures,
timeQuery: tc.query,
fastNTPSyncPeriod: time.Duration(fastHits*10) * time.Millisecond,
slowNTPSyncPeriod: time.Duration(slowHits*10) * time.Millisecond,
}
// lastCall is never advanced, so each recorded value is the CUMULATIVE
// elapsed time since the start; per-call intervals are recovered below
// as p - prev.
lastCall := time.Now()
// we're simulating calls to updateOffset, testing that ntp calls happen
// on the NTPTimeSource periods (fastNTPSyncPeriod & slowNTPSyncPeriod):
// fail on the first two calls (keeping the fast period), succeed from the
// third call on (switching to the slow period).
err := source.runPeriodically(func() error {
mu.Lock()
periods = append(periods, time.Since(lastCall))
mu.Unlock()
hits++
if hits < 3 {
return errUpdateOffset
}
if hits == 6 {
// NOTE(review): releases the WaitGroup held by runPeriodically so
// the Wait below returns; Stop is never called, so the ticker
// goroutine keeps running for the rest of the test process —
// confirm this leak is acceptable, and that Stop is not invoked
// later (a second wg.Done would panic).
source.wg.Done()
}
return nil
})
source.wg.Wait()
require.NoError(t, err)
mu.Lock()
require.Len(t, periods, 6)
// Unlock is deferred past the assertions; if one fails the test aborts
// anyway, so holding the lock until then is harmless.
defer mu.Unlock()
prev := 0
// Intervals preceding calls 2-3 were scheduled after failures: each must
// be at least fastNTPSyncPeriod (fastHits units of 10ms).
for _, period := range periods[1:3] {
p := int(period.Seconds() * 100)
require.True(t, fastHits <= (p-prev))
prev = p
}
// Intervals preceding calls 4-6 were scheduled after successes: each must
// be at least slowNTPSyncPeriod (slowHits units of 10ms).
for _, period := range periods[3:] {
p := int(period.Seconds() * 100)
require.True(t, slowHits <= (p-prev))
prev = p
}
})
}