Use different pool hostnames for NTP timesource (#939) (#952)

* Use different pool hostnames for NTP timesource

* Make govet happy

* Add check for empty servers list
This commit is contained in:
Ivan Daniluk 2018-05-15 17:10:51 +03:00 committed by Dmitry Shulyak
parent 9e65b5a6ae
commit bd68fa15c9
2 changed files with 36 additions and 32 deletions

View File

@ -13,13 +13,6 @@ import (
)
const (
// DefaultServer will be internally resolved to the closest available.
// also it rarely queries same server more than once.
DefaultServer = "pool.ntp.org"
// DefaultAttempts defines how many servers we will query
DefaultAttempts = 5
// DefaultMaxAllowedFailures defines how many failures will be tolerated.
DefaultMaxAllowedFailures = 2
@ -30,6 +23,15 @@ const (
DefaultRPCTimeout = 2 * time.Second
)
// defaultServers will be resolved to the closest available,
// and with high probability resolved to the different IPs
var defaultServers = []string{
"0.pool.ntp.org",
"1.pool.ntp.org",
"2.pool.ntp.org",
"3.pool.ntp.org",
}
type ntpQuery func(string, ntp.QueryOptions) (*ntp.Response, error)
type queryResponse struct {
@ -54,13 +56,13 @@ func (e multiRPCError) Error() string {
return b.String()
}
func computeOffset(timeQuery ntpQuery, server string, attempts, allowedFailures int) (time.Duration, error) {
if attempts == 0 {
func computeOffset(timeQuery ntpQuery, servers []string, allowedFailures int) (time.Duration, error) {
if len(servers) == 0 {
return 0, nil
}
responses := make(chan queryResponse, attempts)
for i := 0; i < attempts; i++ {
go func() {
responses := make(chan queryResponse, len(servers))
for _, server := range servers {
go func(server string) {
response, err := timeQuery(server, ntp.QueryOptions{
Timeout: DefaultRPCTimeout,
})
@ -69,7 +71,7 @@ func computeOffset(timeQuery ntpQuery, server string, attempts, allowedFailures
return
}
responses <- queryResponse{Offset: response.ClockOffset}
}()
}(server)
}
var (
rpcErrors multiRPCError
@ -83,19 +85,19 @@ func computeOffset(timeQuery ntpQuery, server string, attempts, allowedFailures
offsets = append(offsets, response.Offset)
}
collected++
if collected == attempts {
if collected == len(servers) {
break
}
}
if lth := len(rpcErrors); lth > allowedFailures {
return 0, rpcErrors
} else if lth == attempts {
} else if lth == len(servers) {
return 0, rpcErrors
}
sort.SliceStable(offsets, func(i, j int) bool {
return offsets[i] > offsets[j]
})
mid := attempts / 2
mid := len(servers) / 2
if len(offsets)%2 == 0 {
return (offsets[mid-1] + offsets[mid]) / 2, nil
}
@ -105,8 +107,7 @@ func computeOffset(timeQuery ntpQuery, server string, attempts, allowedFailures
// Default initializes time source with default config values.
func Default() *NTPTimeSource {
return &NTPTimeSource{
server: DefaultServer,
attempts: DefaultAttempts,
servers: defaultServers,
allowedFailures: DefaultMaxAllowedFailures,
updatePeriod: DefaultUpdatePeriod,
timeQuery: ntp.QueryWithOptions,
@ -116,8 +117,7 @@ func Default() *NTPTimeSource {
// NTPTimeSource provides source of time that tries to be resistant to time skews.
// It does so by periodically querying time offset from ntp servers.
type NTPTimeSource struct {
server string
attempts int
servers []string
allowedFailures int
updatePeriod time.Duration
timeQuery ntpQuery // for ease of testing
@ -137,7 +137,7 @@ func (s *NTPTimeSource) Now() time.Time {
}
func (s *NTPTimeSource) updateOffset() {
offset, err := computeOffset(s.timeQuery, s.server, s.attempts, s.allowedFailures)
offset, err := computeOffset(s.timeQuery, s.servers, s.allowedFailures)
if err != nil {
log.Error("failed to compute offset", "error", err)
return

View File

@ -15,9 +15,13 @@ const (
clockCompareDelta = 30 * time.Microsecond
)
// we don't user real servers for tests, but logic depends on
// actual number of involved NTP servers.
var mockedServers = []string{"ntp1", "ntp2", "ntp3"}
type testCase struct {
description string
attempts int
servers []string
allowedFailures int
responses []queryResponse
expected time.Duration
@ -42,7 +46,7 @@ func newTestCases() []*testCase {
return []*testCase{
{
description: "SameResponse",
attempts: 3,
servers: mockedServers,
responses: []queryResponse{
{Offset: 10 * time.Second},
{Offset: 10 * time.Second},
@ -52,7 +56,7 @@ func newTestCases() []*testCase {
},
{
description: "Median",
attempts: 3,
servers: mockedServers,
responses: []queryResponse{
{Offset: 10 * time.Second},
{Offset: 20 * time.Second},
@ -62,7 +66,7 @@ func newTestCases() []*testCase {
},
{
description: "EvenMedian",
attempts: 2,
servers: mockedServers[:2],
responses: []queryResponse{
{Offset: 10 * time.Second},
{Offset: 20 * time.Second},
@ -71,7 +75,7 @@ func newTestCases() []*testCase {
},
{
description: "Error",
attempts: 3,
servers: mockedServers,
responses: []queryResponse{
{Offset: 10 * time.Second},
{Error: errors.New("test")},
@ -82,7 +86,7 @@ func newTestCases() []*testCase {
},
{
description: "MultiError",
attempts: 3,
servers: mockedServers,
responses: []queryResponse{
{Error: errors.New("test 1")},
{Error: errors.New("test 2")},
@ -93,7 +97,7 @@ func newTestCases() []*testCase {
},
{
description: "TolerableError",
attempts: 3,
servers: mockedServers,
allowedFailures: 1,
responses: []queryResponse{
{Offset: 10 * time.Second},
@ -104,7 +108,7 @@ func newTestCases() []*testCase {
},
{
description: "NonTolerableError",
attempts: 3,
servers: mockedServers,
allowedFailures: 1,
responses: []queryResponse{
{Offset: 10 * time.Second},
@ -116,7 +120,7 @@ func newTestCases() []*testCase {
},
{
description: "AllFailed",
attempts: 3,
servers: mockedServers,
allowedFailures: 3,
responses: []queryResponse{
{Error: errors.New("test")},
@ -132,7 +136,7 @@ func newTestCases() []*testCase {
func TestComputeOffset(t *testing.T) {
for _, tc := range newTestCases() {
t.Run(tc.description, func(t *testing.T) {
offset, err := computeOffset(tc.query, "", tc.attempts, tc.allowedFailures)
offset, err := computeOffset(tc.query, tc.servers, tc.allowedFailures)
if tc.expectError {
assert.Error(t, err)
} else {
@ -147,7 +151,7 @@ func TestNTPTimeSource(t *testing.T) {
for _, tc := range newTestCases() {
t.Run(tc.description, func(t *testing.T) {
source := &NTPTimeSource{
attempts: tc.attempts,
servers: tc.servers,
allowedFailures: tc.allowedFailures,
timeQuery: tc.query,
}