diff --git a/timesource/timesource.go b/timesource/timesource.go index cb678f014..800463cf2 100644 --- a/timesource/timesource.go +++ b/timesource/timesource.go @@ -13,13 +13,6 @@ import ( ) const ( - // DefaultServer will be internally resolved to the closest available. - // also it rarely queries same server more than once. - DefaultServer = "pool.ntp.org" - - // DefaultAttempts defines how many servers we will query - DefaultAttempts = 5 - // DefaultMaxAllowedFailures defines how many failures will be tolerated. DefaultMaxAllowedFailures = 2 @@ -30,6 +23,15 @@ const ( DefaultRPCTimeout = 2 * time.Second ) +// defaultServers will be resolved to the closest available, +// and with high probability resolved to the different IPs +var defaultServers = []string{ + "0.pool.ntp.org", + "1.pool.ntp.org", + "2.pool.ntp.org", + "3.pool.ntp.org", +} + type ntpQuery func(string, ntp.QueryOptions) (*ntp.Response, error) type queryResponse struct { @@ -54,13 +56,13 @@ func (e multiRPCError) Error() string { return b.String() } -func computeOffset(timeQuery ntpQuery, server string, attempts, allowedFailures int) (time.Duration, error) { - if attempts == 0 { +func computeOffset(timeQuery ntpQuery, servers []string, allowedFailures int) (time.Duration, error) { + if len(servers) == 0 { return 0, nil } - responses := make(chan queryResponse, attempts) - for i := 0; i < attempts; i++ { - go func() { + responses := make(chan queryResponse, len(servers)) + for _, server := range servers { + go func(server string) { response, err := timeQuery(server, ntp.QueryOptions{ Timeout: DefaultRPCTimeout, }) @@ -69,7 +71,7 @@ func computeOffset(timeQuery ntpQuery, server string, attempts, allowedFailures return } responses <- queryResponse{Offset: response.ClockOffset} - }() + }(server) } var ( rpcErrors multiRPCError @@ -83,19 +85,19 @@ func computeOffset(timeQuery ntpQuery, server string, attempts, allowedFailures offsets = append(offsets, response.Offset) } collected++ - if collected == attempts { + if collected == len(servers) { break } } if lth := len(rpcErrors); lth > allowedFailures { return 0, rpcErrors - } else if lth == attempts { + } else if lth == len(servers) { return 0, rpcErrors } sort.SliceStable(offsets, func(i, j int) bool { return offsets[i] > offsets[j] }) - mid := attempts / 2 + mid := len(servers) / 2 if len(offsets)%2 == 0 { return (offsets[mid-1] + offsets[mid]) / 2, nil } @@ -105,8 +107,7 @@ func computeOffset(timeQuery ntpQuery, server string, attempts, allowedFailures // Default initializes time source with default config values. func Default() *NTPTimeSource { return &NTPTimeSource{ - server: DefaultServer, - attempts: DefaultAttempts, + servers: defaultServers, allowedFailures: DefaultMaxAllowedFailures, updatePeriod: DefaultUpdatePeriod, timeQuery: ntp.QueryWithOptions, @@ -116,8 +117,7 @@ func Default() *NTPTimeSource { // NTPTimeSource provides source of time that tries to be resistant to time skews. // It does so by periodically querying time offset from ntp servers. type NTPTimeSource struct { - server string - attempts int + servers []string allowedFailures int updatePeriod time.Duration timeQuery ntpQuery // for ease of testing @@ -137,7 +137,7 @@ func (s *NTPTimeSource) Now() time.Time { } func (s *NTPTimeSource) updateOffset() { - offset, err := computeOffset(s.timeQuery, s.server, s.attempts, s.allowedFailures) + offset, err := computeOffset(s.timeQuery, s.servers, s.allowedFailures) if err != nil { log.Error("failed to compute offset", "error", err) return diff --git a/timesource/timesource_test.go b/timesource/timesource_test.go index 02a2450f2..be5bd875a 100644 --- a/timesource/timesource_test.go +++ b/timesource/timesource_test.go @@ -15,9 +15,13 @@ const ( clockCompareDelta = 30 * time.Microsecond ) +// we don't user real servers for tests, but logic depends on +// actual number of involved NTP servers. +var mockedServers = []string{"ntp1", "ntp2", "ntp3"} + type testCase struct { description string - attempts int + servers []string allowedFailures int responses []queryResponse expected time.Duration @@ -42,7 +46,7 @@ func newTestCases() []*testCase { return []*testCase{ { description: "SameResponse", - attempts: 3, + servers: mockedServers, responses: []queryResponse{ {Offset: 10 * time.Second}, {Offset: 10 * time.Second}, @@ -52,7 +56,7 @@ func newTestCases() []*testCase { }, { description: "Median", - attempts: 3, + servers: mockedServers, responses: []queryResponse{ {Offset: 10 * time.Second}, {Offset: 20 * time.Second}, @@ -62,7 +66,7 @@ func newTestCases() []*testCase { }, { description: "EvenMedian", - attempts: 2, + servers: mockedServers[:2], responses: []queryResponse{ {Offset: 10 * time.Second}, {Offset: 20 * time.Second}, @@ -71,7 +75,7 @@ func newTestCases() []*testCase { }, { description: "Error", - attempts: 3, + servers: mockedServers, responses: []queryResponse{ {Offset: 10 * time.Second}, {Error: errors.New("test")}, @@ -82,7 +86,7 @@ func newTestCases() []*testCase { }, { description: "MultiError", - attempts: 3, + servers: mockedServers, responses: []queryResponse{ {Error: errors.New("test 1")}, {Error: errors.New("test 2")}, @@ -93,7 +97,7 @@ func newTestCases() []*testCase { }, { description: "TolerableError", - attempts: 3, + servers: mockedServers, allowedFailures: 1, responses: []queryResponse{ {Offset: 10 * time.Second}, @@ -104,7 +108,7 @@ func newTestCases() []*testCase { }, { description: "NonTolerableError", - attempts: 3, + servers: mockedServers, allowedFailures: 1, responses: []queryResponse{ {Offset: 10 * time.Second}, @@ -116,7 +120,7 @@ func newTestCases() []*testCase { }, { description: "AllFailed", - attempts: 3, + servers: mockedServers, allowedFailures: 3, responses: []queryResponse{ {Error: errors.New("test")}, @@ -132,7 +136,7 @@ func newTestCases() []*testCase { func TestComputeOffset(t *testing.T) { for _, tc := range newTestCases() { t.Run(tc.description, func(t *testing.T) { - offset, err := computeOffset(tc.query, "", tc.attempts, tc.allowedFailures) + offset, err := computeOffset(tc.query, tc.servers, tc.allowedFailures) if tc.expectError { assert.Error(t, err) } else { @@ -147,7 +151,7 @@ func TestNTPTimeSource(t *testing.T) { for _, tc := range newTestCases() { t.Run(tc.description, func(t *testing.T) { source := &NTPTimeSource{ - attempts: tc.attempts, + servers: tc.servers, allowedFailures: tc.allowedFailures, timeQuery: tc.query, }