diff --git a/waku/timesource/ntp.go b/waku/timesource/ntp.go deleted file mode 100644 index 3454631..0000000 --- a/waku/timesource/ntp.go +++ /dev/null @@ -1,210 +0,0 @@ -package timesource - -import ( - "bytes" - "context" - "errors" - "sort" - "sync" - "time" - - "github.com/beevik/ntp" - "go.uber.org/zap" -) - -const ( - // DefaultMaxAllowedFailures defines how many failures will be tolerated. - DefaultMaxAllowedFailures = 1 - - // FastNTPSyncPeriod period between ntp synchronizations before the first - // successful connection. - FastNTPSyncPeriod = 2 * time.Minute - - // SlowNTPSyncPeriod period between ntp synchronizations after the first - // successful connection. - SlowNTPSyncPeriod = 1 * time.Hour - - // DefaultRPCTimeout defines write deadline for single ntp server request. - DefaultRPCTimeout = 2 * time.Second -) - -// DefaultServers will be resolved to the closest available, -// and with high probability resolved to the different IPs -var DefaultServers = []string{ - "0.pool.ntp.org", - "1.pool.ntp.org", - "2.pool.ntp.org", - "3.pool.ntp.org", -} -var errUpdateOffset = errors.New("failed to compute offset") - -type ntpQuery func(string, ntp.QueryOptions) (*ntp.Response, error) - -type queryResponse struct { - Offset time.Duration - Error error -} - -type multiRPCError []error - -func (e multiRPCError) Error() string { - var b bytes.Buffer - b.WriteString("RPC failed: ") - more := false - for _, err := range e { - if more { - b.WriteString("; ") - } - b.WriteString(err.Error()) - more = true - } - b.WriteString(".") - return b.String() -} - -func computeOffset(timeQuery ntpQuery, servers []string, allowedFailures int) (time.Duration, error) { - if len(servers) == 0 { - return 0, nil - } - responses := make(chan queryResponse, len(servers)) - for _, server := range servers { - go func(server string) { - response, err := timeQuery(server, ntp.QueryOptions{ - Timeout: DefaultRPCTimeout, - }) - if err == nil { - err = response.Validate() - } - if err != nil { - responses <- queryResponse{Error: err} - return - } - responses <- queryResponse{Offset: response.ClockOffset} - }(server) - } - var ( - rpcErrors multiRPCError - offsets []time.Duration - collected int - ) - for response := range responses { - if response.Error != nil { - rpcErrors = append(rpcErrors, response.Error) - } else { - offsets = append(offsets, response.Offset) - } - collected++ - if collected == len(servers) { - break - } - } - if lth := len(rpcErrors); lth > allowedFailures { - return 0, rpcErrors - } else if lth == len(servers) { - return 0, rpcErrors - } - sort.SliceStable(offsets, func(i, j int) bool { - return offsets[i] > offsets[j] - }) - mid := len(offsets) / 2 - if len(offsets)%2 == 0 { - return (offsets[mid-1] + offsets[mid]) / 2, nil - } - return offsets[mid], nil -} - -// NewNTPTimesource creates a timesource that uses NTP -func NewNTPTimesource(ntpServers []string, log *zap.Logger) *NTPTimeSource { - return &NTPTimeSource{ - servers: ntpServers, - allowedFailures: DefaultMaxAllowedFailures, - fastNTPSyncPeriod: FastNTPSyncPeriod, - slowNTPSyncPeriod: SlowNTPSyncPeriod, - timeQuery: ntp.QueryWithOptions, - log: log.Named("timesource"), - } -} - -// NTPTimeSource provides source of time that tries to be resistant to time skews. -// It does so by periodically querying time offset from ntp servers. -type NTPTimeSource struct { - servers []string - allowedFailures int - fastNTPSyncPeriod time.Duration - slowNTPSyncPeriod time.Duration - timeQuery ntpQuery // for ease of testing - log *zap.Logger - - cancel context.CancelFunc - wg sync.WaitGroup - - mu sync.RWMutex - latestOffset time.Duration -} - -// Now returns time adjusted by latest known offset -func (s *NTPTimeSource) Now() time.Time { - s.mu.RLock() - defer s.mu.RUnlock() - return time.Now().Add(s.latestOffset) -} - -func (s *NTPTimeSource) updateOffset() error { - offset, err := computeOffset(s.timeQuery, s.servers, s.allowedFailures) - if err != nil { - s.log.Error("failed to compute offset", zap.Error(err)) - return errUpdateOffset - } - s.log.Info("Difference with ntp servers", zap.Duration("offset", offset)) - s.mu.Lock() - s.latestOffset = offset - s.mu.Unlock() - return nil -} - -// runPeriodically runs periodically the given function based on NTPTimeSource -// synchronization limits (fastNTPSyncPeriod / slowNTPSyncPeriod) -func (s *NTPTimeSource) runPeriodically(ctx context.Context, fn func() error) error { - var period time.Duration - - s.log.Info("starting service") - - // we try to do it synchronously so that user can have reliable messages right away - s.wg.Add(1) - go func() { - for { - select { - case <-time.After(period): - if err := fn(); err == nil { - period = s.slowNTPSyncPeriod - } else if period != s.slowNTPSyncPeriod { - period = s.fastNTPSyncPeriod - } - - case <-ctx.Done(): - s.log.Info("stopping service") - s.wg.Done() - return - } - } - }() - - return nil -} - -// Start runs a goroutine that updates local offset every updatePeriod. -func (s *NTPTimeSource) Start(ctx context.Context) error { - s.wg.Wait() // Waiting for other go routines to stop - ctx, cancel := context.WithCancel(ctx) - s.cancel = cancel - return s.runPeriodically(ctx, s.updateOffset) -} - -// Stop goroutine that updates time source. -func (s *NTPTimeSource) Stop() { - if s.cancel == nil { - return - } - s.cancel() - s.wg.Wait() -} diff --git a/waku/timesource/ntp_test.go b/waku/timesource/ntp_test.go deleted file mode 100644 index 5e22fee..0000000 --- a/waku/timesource/ntp_test.go +++ /dev/null @@ -1,254 +0,0 @@ -package timesource - -import ( - "context" - "errors" - "sync" - "testing" - "time" - - "github.com/beevik/ntp" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const ( - // clockCompareDelta declares time required between multiple calls to time.Now - clockCompareDelta = 100 * time.Microsecond -) - -// we don't user real servers for tests, but logic depends on -// actual number of involved NTP servers. -var mockedServers = []string{"ntp1", "ntp2", "ntp3", "ntp4"} - -type testCase struct { - description string - servers []string - allowedFailures int - responses []queryResponse - expected time.Duration - expectError bool - - // actual attempts are mutable - mu sync.Mutex - actualAttempts int -} - -func (tc *testCase) query(string, ntp.QueryOptions) (*ntp.Response, error) { - tc.mu.Lock() - defer func() { - tc.actualAttempts++ - tc.mu.Unlock() - }() - response := &ntp.Response{ - ClockOffset: tc.responses[tc.actualAttempts].Offset, - Stratum: 1, - } - return response, tc.responses[tc.actualAttempts].Error -} - -func newTestCases() []*testCase { - return []*testCase{ - { - description: "SameResponse", - servers: mockedServers, - responses: []queryResponse{ - {Offset: 10 * time.Second}, - {Offset: 10 * time.Second}, - {Offset: 10 * time.Second}, - {Offset: 10 * time.Second}, - }, - expected: 10 * time.Second, - }, - { - description: "Median", - servers: mockedServers, - responses: []queryResponse{ - {Offset: 10 * time.Second}, - {Offset: 20 * time.Second}, - {Offset: 20 * time.Second}, - {Offset: 30 * time.Second}, - }, - expected: 20 * time.Second, - }, - { - description: "EvenMedian", - servers: mockedServers[:2], - responses: []queryResponse{ - {Offset: 10 * time.Second}, - {Offset: 20 * time.Second}, - }, - expected: 15 * time.Second, - }, - { - description: "Error", - servers: mockedServers, - responses: []queryResponse{ - {Offset: 10 * time.Second}, - {Error: errors.New("test")}, - {Offset: 30 * time.Second}, - {Offset: 30 * time.Second}, - }, - expected: time.Duration(0), - expectError: true, - }, - { - description: "MultiError", - servers: mockedServers, - responses: []queryResponse{ - {Error: errors.New("test 1")}, - {Error: errors.New("test 2")}, - {Error: errors.New("test 3")}, - {Error: errors.New("test 3")}, - }, - expected: time.Duration(0), - expectError: true, - }, - { - description: "TolerableError", - servers: mockedServers, - allowedFailures: 1, - responses: []queryResponse{ - {Offset: 10 * time.Second}, - {Error: errors.New("test")}, - {Offset: 20 * time.Second}, - {Offset: 30 * time.Second}, - }, - expected: 20 * time.Second, - }, - { - description: "NonTolerableError", - servers: mockedServers, - allowedFailures: 1, - responses: []queryResponse{ - {Offset: 10 * time.Second}, - {Error: errors.New("test")}, - {Error: errors.New("test")}, - {Error: errors.New("test")}, - }, - expected: time.Duration(0), - expectError: true, - }, - { - description: "AllFailed", - servers: mockedServers, - allowedFailures: 4, - responses: []queryResponse{ - {Error: errors.New("test")}, - {Error: errors.New("test")}, - {Error: errors.New("test")}, - {Error: errors.New("test")}, - }, - expected: time.Duration(0), - expectError: true, - }, - { - description: "HalfTolerable", - servers: mockedServers, - allowedFailures: 2, - responses: []queryResponse{ - {Offset: 10 * time.Second}, - {Offset: 20 * time.Second}, - {Error: errors.New("test")}, - {Error: errors.New("test")}, - }, - expected: 15 * time.Second, - }, - } -} - -func TestComputeOffset(t *testing.T) { - for _, tc := range newTestCases() { - t.Run(tc.description, func(t *testing.T) { - offset, err := computeOffset(tc.query, tc.servers, tc.allowedFailures) - if tc.expectError { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - assert.Equal(t, tc.expected, offset) - }) - } -} - -func TestNTPTimeSource(t *testing.T) { - for _, tc := range newTestCases() { - t.Run(tc.description, func(t *testing.T) { - _, cancel := context.WithCancel(context.Background()) - source := &NTPTimeSource{ - servers: tc.servers, - allowedFailures: tc.allowedFailures, - timeQuery: tc.query, - log: utils.Logger(), - cancel: cancel, - } - - assert.WithinDuration(t, time.Now(), source.Now(), clockCompareDelta) - err := source.updateOffset() - if tc.expectError { - assert.Equal(t, errUpdateOffset, err) - } else { - assert.NoError(t, err) - } - assert.WithinDuration(t, time.Now().Add(tc.expected), source.Now(), clockCompareDelta) - }) - } -} - -func TestRunningPeriodically(t *testing.T) { - var hits int - var mu sync.RWMutex - periods := make([]time.Duration, 0) - - tc := newTestCases()[0] - fastHits := 3 - slowHits := 1 - - t.Run(tc.description, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - source := &NTPTimeSource{ - servers: tc.servers, - allowedFailures: tc.allowedFailures, - timeQuery: tc.query, - fastNTPSyncPeriod: time.Duration(fastHits*10) * time.Millisecond, - slowNTPSyncPeriod: time.Duration(slowHits*10) * time.Millisecond, - log: utils.Logger(), - cancel: cancel, - } - lastCall := time.Now() - // we're simulating a calls to updateOffset, testing ntp calls happens - // on NTPTimeSource specified periods (fastNTPSyncPeriod & slowNTPSyncPeriod) - err := source.runPeriodically(ctx, func() error { - mu.Lock() - periods = append(periods, time.Since(lastCall)) - mu.Unlock() - hits++ - if hits < 3 { - return errUpdateOffset - } - if hits == 6 { - source.wg.Done() - } - return nil - }) - - source.wg.Wait() - require.NoError(t, err) - - mu.Lock() - require.Len(t, periods, 6) - defer mu.Unlock() - prev := 0 - for _, period := range periods[1:3] { - p := int(period.Seconds() * 100) - require.True(t, fastHits <= (p-prev)) - prev = p - } - - for _, period := range periods[3:] { - p := int(period.Seconds() * 100) - require.True(t, slowHits <= (p-prev)) - prev = p - } - }) -} diff --git a/waku/timesource/wall.go b/waku/timesource/wall.go deleted file mode 100644 index 778f2d3..0000000 --- a/waku/timesource/wall.go +++ /dev/null @@ -1,26 +0,0 @@ -package timesource - -import ( - "context" - "time" -) - -type WallClockTimeSource struct { -} - -func NewDefaultClock() *WallClockTimeSource { - return &WallClockTimeSource{} -} - -func (t *WallClockTimeSource) Now() time.Time { - return time.Now() -} - -func (t *WallClockTimeSource) Start(ctx context.Context) error { - // Do nothing - return nil -} - -func (t *WallClockTimeSource) Stop() { - // Do nothing -}