Define shorter timeout for ntp calls (2s) and tolerate failures (#911)

* Define shorter timeout for ntp calls (2s) and tolerate failures

* Use allowed failures instead of tolerated errorss
This commit is contained in:
Dmitry Shulyak 2018-05-09 08:10:48 +03:00 committed by GitHub
parent c673148bf4
commit 6c2a8ef1f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 72 additions and 24 deletions

View File

@ -18,13 +18,19 @@ const (
DefaultServer = "pool.ntp.org"
// DefaultAttempts defines how many servers we will query
DefaultAttempts = 3
DefaultAttempts = 5
// DefaultMaxAllowedFailures defines how many failures will be tolerated.
DefaultMaxAllowedFailures = 2
// DefaultUpdatePeriod defines how often time will be queried from ntp.
DefaultUpdatePeriod = 2 * time.Minute
// DefaultRPCTimeout defines write deadline for single ntp server request.
DefaultRPCTimeout = 2 * time.Second
)
type ntpQuery func(string) (*ntp.Response, error)
type ntpQuery func(string, ntp.QueryOptions) (*ntp.Response, error)
type queryResponse struct {
Offset time.Duration
@ -48,15 +54,16 @@ func (e multiRPCError) Error() string {
return b.String()
}
func computeOffset(timeQuery ntpQuery, server string, attempts int) (time.Duration, error) {
func computeOffset(timeQuery ntpQuery, server string, attempts, allowedFailures int) (time.Duration, error) {
if attempts == 0 {
return 0, nil
}
responses := make(chan queryResponse, attempts)
for i := 0; i < attempts; i++ {
go func() {
// ntp.Query default timeout is 5s
response, err := timeQuery(server)
response, err := timeQuery(server, ntp.QueryOptions{
Timeout: DefaultRPCTimeout,
})
if err != nil {
responses <- queryResponse{Error: err}
return
@ -80,7 +87,9 @@ func computeOffset(timeQuery ntpQuery, server string, attempts int) (time.Durati
break
}
}
if len(rpcErrors) != 0 {
if lth := len(rpcErrors); lth > allowedFailures {
return 0, rpcErrors
} else if lth == attempts {
return 0, rpcErrors
}
sort.SliceStable(offsets, func(i, j int) bool {
@ -96,20 +105,22 @@ func computeOffset(timeQuery ntpQuery, server string, attempts int) (time.Durati
// Default initializes time source with default config values.
func Default() *NTPTimeSource {
return &NTPTimeSource{
server: DefaultServer,
attempts: DefaultAttempts,
updatePeriod: DefaultUpdatePeriod,
timeQuery: ntp.Query,
server: DefaultServer,
attempts: DefaultAttempts,
allowedFailures: DefaultMaxAllowedFailures,
updatePeriod: DefaultUpdatePeriod,
timeQuery: ntp.QueryWithOptions,
}
}
// NTPTimeSource provides source of time that tries to be resistant to time skews.
// It does so by periodically querying time offset from ntp servers.
type NTPTimeSource struct {
server string
attempts int
updatePeriod time.Duration
timeQuery ntpQuery // for ease of testing
server string
attempts int
allowedFailures int
updatePeriod time.Duration
timeQuery ntpQuery // for ease of testing
quit chan struct{}
wg sync.WaitGroup
@ -126,7 +137,7 @@ func (s *NTPTimeSource) Now() time.Time {
}
func (s *NTPTimeSource) updateOffset() {
offset, err := computeOffset(s.timeQuery, s.server, s.attempts)
offset, err := computeOffset(s.timeQuery, s.server, s.attempts, s.allowedFailures)
if err != nil {
log.Error("failed to compute offset", "error", err)
return

View File

@ -16,18 +16,19 @@ const (
)
type testCase struct {
description string
attempts int
responses []queryResponse
expected time.Duration
expectError bool
description string
attempts int
allowedFailures int
responses []queryResponse
expected time.Duration
expectError bool
// actual attempts are mutable
mu sync.Mutex
actualAttempts int
}
func (tc *testCase) query(string) (*ntp.Response, error) {
func (tc *testCase) query(string, ntp.QueryOptions) (*ntp.Response, error) {
tc.mu.Lock()
defer func() {
tc.actualAttempts++
@ -90,13 +91,48 @@ func newTestCases() []*testCase {
expected: time.Duration(0),
expectError: true,
},
{
description: "TolerableError",
attempts: 3,
allowedFailures: 1,
responses: []queryResponse{
{Offset: 10 * time.Second},
{Error: errors.New("test")},
{Offset: 30 * time.Second},
},
expected: 20 * time.Second,
},
{
description: "NonTolerableError",
attempts: 3,
allowedFailures: 1,
responses: []queryResponse{
{Offset: 10 * time.Second},
{Error: errors.New("test")},
{Error: errors.New("test")},
},
expected: time.Duration(0),
expectError: true,
},
{
description: "AllFailed",
attempts: 3,
allowedFailures: 3,
responses: []queryResponse{
{Error: errors.New("test")},
{Error: errors.New("test")},
{Error: errors.New("test")},
},
expected: time.Duration(0),
expectError: true,
},
}
}
func TestComputeOffset(t *testing.T) {
for _, tc := range newTestCases() {
t.Run(tc.description, func(t *testing.T) {
offset, err := computeOffset(tc.query, "", tc.attempts)
offset, err := computeOffset(tc.query, "", tc.attempts, tc.allowedFailures)
if tc.expectError {
assert.Error(t, err)
} else {
@ -111,8 +147,9 @@ func TestNTPTimeSource(t *testing.T) {
for _, tc := range newTestCases() {
t.Run(tc.description, func(t *testing.T) {
source := &NTPTimeSource{
attempts: tc.attempts,
timeQuery: tc.query,
attempts: tc.attempts,
allowedFailures: tc.allowedFailures,
timeQuery: tc.query,
}
assert.WithinDuration(t, time.Now(), source.Now(), clockCompareDelta)
source.updateOffset()