From 00a74606d37b91cc21c096bd0efbbf244de36045 Mon Sep 17 00:00:00 2001
From: Ivan Folgueira Bande
Date: Tue, 25 Nov 2025 22:08:04 +0100
Subject: [PATCH] adding timesource package

---
 waku/nwaku.go                 |   3 +
 waku/timesource/ntp.go        | 210 ++++++++++++++++++++++++++++
 waku/timesource/ntp_test.go   | 255 ++++++++++++++++++++++++++++++++++
 waku/timesource/timesource.go |  12 ++
 waku/timesource/wall.go       |  26 ++++
 5 files changed, 506 insertions(+)
 create mode 100644 waku/timesource/ntp.go
 create mode 100644 waku/timesource/ntp_test.go
 create mode 100644 waku/timesource/timesource.go
 create mode 100644 waku/timesource/wall.go

diff --git a/waku/nwaku.go b/waku/nwaku.go
index 28e2bd9..0cd2fdb 100644
--- a/waku/nwaku.go
+++ b/waku/nwaku.go
@@ -352,6 +352,8 @@ import (
 	"time"
 	"unsafe"
 
+	"github.com/waku-org/waku-go-bindings/waku/timesource"
+
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/p2p/enode"
 	"github.com/libp2p/go-libp2p/core/peer"
@@ -388,6 +390,7 @@ type WakuNode struct {
 	TopicHealthChan      chan topicHealth
 	ConnectionChangeChan chan connectionChange
 	nodeName             string
+	_                    timesource.Timesource
 }
 
 func NewWakuNode(config *common.WakuConfig, nodeName string) (*WakuNode, error) {
diff --git a/waku/timesource/ntp.go b/waku/timesource/ntp.go
new file mode 100644
index 0000000..3454631
--- /dev/null
+++ b/waku/timesource/ntp.go
@@ -0,0 +1,210 @@
+package timesource
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/beevik/ntp"
+	"go.uber.org/zap"
+)
+
+const (
+	// DefaultMaxAllowedFailures defines how many failures will be tolerated.
+	DefaultMaxAllowedFailures = 1
+
+	// FastNTPSyncPeriod is the period between NTP synchronizations before the
+	// first successful connection.
+	FastNTPSyncPeriod = 2 * time.Minute
+
+	// SlowNTPSyncPeriod is the period between NTP synchronizations after the
+	// first successful connection.
+	SlowNTPSyncPeriod = 1 * time.Hour
+
+	// DefaultRPCTimeout defines the write deadline for a single NTP server request.
+	DefaultRPCTimeout = 2 * time.Second
+)
+
+// DefaultServers will be resolved to the closest available servers and,
+// with high probability, to different IPs.
+var DefaultServers = []string{
+	"0.pool.ntp.org",
+	"1.pool.ntp.org",
+	"2.pool.ntp.org",
+	"3.pool.ntp.org",
+}
+var errUpdateOffset = errors.New("failed to compute offset")
+
+type ntpQuery func(string, ntp.QueryOptions) (*ntp.Response, error)
+
+type queryResponse struct {
+	Offset time.Duration
+	Error  error
+}
+
+type multiRPCError []error
+
+func (e multiRPCError) Error() string {
+	var b bytes.Buffer
+	b.WriteString("RPC failed: ")
+	more := false
+	for _, err := range e {
+		if more {
+			b.WriteString("; ")
+		}
+		b.WriteString(err.Error())
+		more = true
+	}
+	b.WriteString(".")
+	return b.String()
+}
+
+func computeOffset(timeQuery ntpQuery, servers []string, allowedFailures int) (time.Duration, error) {
+	if len(servers) == 0 {
+		return 0, nil
+	}
+	responses := make(chan queryResponse, len(servers))
+	for _, server := range servers {
+		go func(server string) {
+			response, err := timeQuery(server, ntp.QueryOptions{
+				Timeout: DefaultRPCTimeout,
+			})
+			if err == nil {
+				err = response.Validate()
+			}
+			if err != nil {
+				responses <- queryResponse{Error: err}
+				return
+			}
+			responses <- queryResponse{Offset: response.ClockOffset}
+		}(server)
+	}
+	var (
+		rpcErrors multiRPCError
+		offsets   []time.Duration
+		collected int
+	)
+	for response := range responses {
+		if response.Error != nil {
+			rpcErrors = append(rpcErrors, response.Error)
+		} else {
+			offsets = append(offsets, response.Offset)
+		}
+		collected++
+		if collected == len(servers) {
+			break
+		}
+	}
+	if lth := len(rpcErrors); lth > allowedFailures { // more failures than tolerated
+		return 0, rpcErrors
+	} else if lth == len(servers) { // every server failed; no offsets to work with
+		return 0, rpcErrors
+	}
+	sort.SliceStable(offsets, func(i, j int) bool {
+		return offsets[i] > offsets[j]
+	})
+	mid := len(offsets) / 2 // use the median offset to discard outliers
+	if len(offsets)%2 == 0 {
+		return (offsets[mid-1] + offsets[mid]) / 2, nil
+	}
+	return offsets[mid], nil
+}
+
+// NewNTPTimesource creates a timesource that uses NTP
+func NewNTPTimesource(ntpServers []string, log *zap.Logger) *NTPTimeSource {
+	return &NTPTimeSource{
+		servers:           ntpServers,
+		allowedFailures:   DefaultMaxAllowedFailures,
+		fastNTPSyncPeriod: FastNTPSyncPeriod,
+		slowNTPSyncPeriod: SlowNTPSyncPeriod,
+		timeQuery:         ntp.QueryWithOptions,
+		log:               log.Named("timesource"),
+	}
+}
+
+// NTPTimeSource provides a source of time that is resistant to clock skew.
+// It does so by periodically querying the time offset from NTP servers.
+type NTPTimeSource struct {
+	servers           []string
+	allowedFailures   int
+	fastNTPSyncPeriod time.Duration
+	slowNTPSyncPeriod time.Duration
+	timeQuery         ntpQuery // for ease of testing
+	log               *zap.Logger
+
+	cancel context.CancelFunc
+	wg     sync.WaitGroup
+
+	mu           sync.RWMutex
+	latestOffset time.Duration
+}
+
+// Now returns the current time adjusted by the latest known offset
+func (s *NTPTimeSource) Now() time.Time {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	return time.Now().Add(s.latestOffset)
+}
+
+func (s *NTPTimeSource) updateOffset() error {
+	offset, err := computeOffset(s.timeQuery, s.servers, s.allowedFailures)
+	if err != nil {
+		s.log.Error("failed to compute offset", zap.Error(err))
+		return errUpdateOffset
+	}
+	s.log.Info("Difference with ntp servers", zap.Duration("offset", offset))
+	s.mu.Lock()
+	s.latestOffset = offset
+	s.mu.Unlock()
+	return nil
+}
+
+// runPeriodically runs the given function periodically, based on the
+// NTPTimeSource synchronization periods (fastNTPSyncPeriod / slowNTPSyncPeriod)
+func (s *NTPTimeSource) runPeriodically(ctx context.Context, fn func() error) error {
+	var period time.Duration
+
+	s.log.Info("starting service")
+
+	// period starts at zero, so the first query fires almost immediately and callers get a reliable offset right away
+	s.wg.Add(1)
+	go func() {
+		for {
+			select {
+			case <-time.After(period):
+				if err := fn(); err == nil {
+					period = s.slowNTPSyncPeriod
+				} else if period != s.slowNTPSyncPeriod {
+					period = s.fastNTPSyncPeriod
+				}
+
+			case <-ctx.Done():
+				s.log.Info("stopping service")
+				s.wg.Done()
+				return
+			}
+		}
+	}()
+
+	return nil
+}
+
+// Start runs a goroutine that periodically updates the local offset.
+func (s *NTPTimeSource) Start(ctx context.Context) error {
+	s.wg.Wait() // wait for any goroutines of a previous run to stop
+	ctx, cancel := context.WithCancel(ctx)
+	s.cancel = cancel
+	return s.runPeriodically(ctx, s.updateOffset)
+}
+
+// Stop stops the goroutine that updates the time source.
+func (s *NTPTimeSource) Stop() {
+	if s.cancel == nil {
+		return
+	}
+	s.cancel()
+	s.wg.Wait()
+}
diff --git a/waku/timesource/ntp_test.go b/waku/timesource/ntp_test.go
new file mode 100644
index 0000000..5e22fee
--- /dev/null
+++ b/waku/timesource/ntp_test.go
@@ -0,0 +1,255 @@
+package timesource
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/beevik/ntp"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+)
+
+const (
+	// clockCompareDelta is the maximum drift allowed between consecutive calls to time.Now
+	clockCompareDelta = 100 * time.Microsecond
+)
+
+// we don't use real servers for tests, but the logic depends on the
+// actual number of involved NTP servers.
+var mockedServers = []string{"ntp1", "ntp2", "ntp3", "ntp4"}
+
+type testCase struct {
+	description     string
+	servers         []string
+	allowedFailures int
+	responses       []queryResponse
+	expected        time.Duration
+	expectError     bool
+
+	// actual attempts are mutable
+	mu             sync.Mutex
+	actualAttempts int
+}
+
+func (tc *testCase) query(string, ntp.QueryOptions) (*ntp.Response, error) {
+	tc.mu.Lock()
+	defer func() {
+		tc.actualAttempts++
+		tc.mu.Unlock()
+	}()
+	response := &ntp.Response{
+		ClockOffset: tc.responses[tc.actualAttempts].Offset,
+		Stratum:     1,
+	}
+	return response, tc.responses[tc.actualAttempts].Error
+}
+
+func newTestCases() []*testCase {
+	return []*testCase{
+		{
+			description: "SameResponse",
+			servers:     mockedServers,
+			responses: []queryResponse{
+				{Offset: 10 * time.Second},
+				{Offset: 10 * time.Second},
+				{Offset: 10 * time.Second},
+				{Offset: 10 * time.Second},
+			},
+			expected: 10 * time.Second,
+		},
+		{
+			description: "Median",
+			servers:     mockedServers,
+			responses: []queryResponse{
+				{Offset: 10 * time.Second},
+				{Offset: 20 * time.Second},
+				{Offset: 20 * time.Second},
+				{Offset: 30 * time.Second},
+			},
+			expected: 20 * time.Second,
+		},
+		{
+			description: "EvenMedian",
+			servers:     mockedServers[:2],
+			responses: []queryResponse{
+				{Offset: 10 * time.Second},
+				{Offset: 20 * time.Second},
+			},
+			expected: 15 * time.Second,
+		},
+		{
+			description: "Error",
+			servers:     mockedServers,
+			responses: []queryResponse{
+				{Offset: 10 * time.Second},
+				{Error: errors.New("test")},
+				{Offset: 30 * time.Second},
+				{Offset: 30 * time.Second},
+			},
+			expected:    time.Duration(0),
+			expectError: true,
+		},
+		{
+			description: "MultiError",
+			servers:     mockedServers,
+			responses: []queryResponse{
+				{Error: errors.New("test 1")},
+				{Error: errors.New("test 2")},
+				{Error: errors.New("test 3")},
+				{Error: errors.New("test 3")},
+			},
+			expected:    time.Duration(0),
+			expectError: true,
+		},
+		{
+			description:     "TolerableError",
+			servers:         mockedServers,
+			allowedFailures: 1,
+			responses: []queryResponse{
+				{Offset: 10 * time.Second},
+				{Error: errors.New("test")},
+				{Offset: 20 * time.Second},
+				{Offset: 30 * time.Second},
+			},
+			expected: 20 * time.Second,
+		},
+		{
+			description:     "NonTolerableError",
+			servers:         mockedServers,
+			allowedFailures: 1,
+			responses: []queryResponse{
+				{Offset: 10 * time.Second},
+				{Error: errors.New("test")},
+				{Error: errors.New("test")},
+				{Error: errors.New("test")},
+			},
+			expected:    time.Duration(0),
+			expectError: true,
+		},
+		{
+			description:     "AllFailed",
+			servers:         mockedServers,
+			allowedFailures: 4,
+			responses: []queryResponse{
+				{Error: errors.New("test")},
+				{Error: errors.New("test")},
+				{Error: errors.New("test")},
+				{Error: errors.New("test")},
+			},
+			expected:    time.Duration(0),
+			expectError: true,
+		},
+		{
+			description:     "HalfTolerable",
+			servers:         mockedServers,
+			allowedFailures: 2,
+			responses: []queryResponse{
+				{Offset: 10 * time.Second},
+				{Offset: 20 * time.Second},
+				{Error: errors.New("test")},
+				{Error: errors.New("test")},
+			},
+			expected: 15 * time.Second,
+		},
+	}
+}
+
+func TestComputeOffset(t *testing.T) {
+	for _, tc := range newTestCases() {
+		t.Run(tc.description, func(t *testing.T) {
+			offset, err := computeOffset(tc.query, tc.servers, tc.allowedFailures)
+			if tc.expectError {
+				assert.Error(t, err)
+			} else {
+				assert.NoError(t, err)
+			}
+			assert.Equal(t, tc.expected, offset)
+		})
+	}
+}
+
+func TestNTPTimeSource(t *testing.T) {
+	for _, tc := range newTestCases() {
+		t.Run(tc.description, func(t *testing.T) {
+			_, cancel := context.WithCancel(context.Background())
+			source := &NTPTimeSource{
+				servers:         tc.servers,
+				allowedFailures: tc.allowedFailures,
+				timeQuery:       tc.query,
+				log:             zap.NewNop(),
+				cancel:          cancel,
+			}
+
+			assert.WithinDuration(t, time.Now(), source.Now(), clockCompareDelta)
+			err := source.updateOffset()
+			if tc.expectError {
+				assert.Equal(t, errUpdateOffset, err)
+			} else {
+				assert.NoError(t, err)
+			}
+			assert.WithinDuration(t, time.Now().Add(tc.expected), source.Now(), clockCompareDelta)
+		})
+	}
+}
+
+func TestRunningPeriodically(t *testing.T) {
+	var hits int
+	var mu sync.RWMutex
+	periods := make([]time.Duration, 0)
+
+	tc := newTestCases()[0]
+	fastHits := 3
+	slowHits := 1
+
+	t.Run(tc.description, func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		source := &NTPTimeSource{
+			servers:           tc.servers,
+			allowedFailures:   tc.allowedFailures,
+			timeQuery:         tc.query,
+			fastNTPSyncPeriod: time.Duration(fastHits*10) * time.Millisecond,
+			slowNTPSyncPeriod: time.Duration(slowHits*10) * time.Millisecond,
+			log:               zap.NewNop(),
+			cancel:            cancel,
+		}
+		lastCall := time.Now()
+		// we're simulating calls to updateOffset, testing that NTP queries happen
+		// on the periods specified in NTPTimeSource (fastNTPSyncPeriod & slowNTPSyncPeriod)
+		err := source.runPeriodically(ctx, func() error {
+			mu.Lock()
+			periods = append(periods, time.Since(lastCall))
+			mu.Unlock()
+			hits++
+			if hits < 3 {
+				return errUpdateOffset
+			}
+			if hits == 6 {
+				source.wg.Done()
+			}
+			return nil
+		})
+
+		source.wg.Wait()
+		require.NoError(t, err)
+
+		mu.Lock()
+		defer mu.Unlock()
+		require.Len(t, periods, 6)
+		prev := 0
+		for _, period := range periods[1:3] {
+			p := int(period.Seconds() * 100)
+			require.True(t, fastHits <= (p-prev))
+			prev = p
+		}
+
+		for _, period := range periods[3:] {
+			p := int(period.Seconds() * 100)
+			require.True(t, slowHits <= (p-prev))
+			prev = p
+		}
+	})
+}
diff --git a/waku/timesource/timesource.go b/waku/timesource/timesource.go
new file mode 100644
index 0000000..2d0a86b
--- /dev/null
+++ b/waku/timesource/timesource.go
@@ -0,0 +1,12 @@
+package timesource
+
+import (
+	"context"
+	"time"
+)
+
+type Timesource interface {
+	Now() time.Time
+	Start(ctx context.Context) error
+	Stop()
+}
diff --git a/waku/timesource/wall.go b/waku/timesource/wall.go
new file mode 100644
index 0000000..778f2d3
--- /dev/null
+++ b/waku/timesource/wall.go
@@ -0,0 +1,26 @@
+package timesource
+
+import (
+	"context"
+	"time"
+)
+
+type WallClockTimeSource struct {
+}
+
+func NewDefaultClock() *WallClockTimeSource {
+	return &WallClockTimeSource{}
+}
+
+func (t *WallClockTimeSource) Now() time.Time {
+	return time.Now()
+}
+
+func (t *WallClockTimeSource) Start(ctx context.Context) error {
+	// Do nothing
+	return nil
+}
+
+func (t *WallClockTimeSource) Stop() {
+	// Do nothing
+}
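--

Usage sketch (reviewer note, not part of the diff): a minimal example of how the new package is meant to be wired up, using only the API added above. The main function and the choice of logger are illustrative, not prescribed by the patch.

package main

import (
	"context"
	"fmt"

	"go.uber.org/zap"

	"github.com/waku-org/waku-go-bindings/waku/timesource"
)

func main() {
	// Illustrative wiring; assumes only the timesource package from this patch.
	logger, _ := zap.NewDevelopment()

	// NTP-backed source: Start launches the background sync loop. The first
	// query fires almost immediately, then repeats on the fast/slow periods.
	var ts timesource.Timesource = timesource.NewNTPTimesource(timesource.DefaultServers, logger)
	if err := ts.Start(context.Background()); err != nil {
		panic(err)
	}
	defer ts.Stop()

	// Now() returns wall-clock time corrected by the latest known NTP offset.
	fmt.Println("ntp-adjusted time:", ts.Now())

	// WallClockTimeSource satisfies the same interface for contexts where a
	// plain time.Now() is acceptable (e.g. tests or offline nodes).
	var wall timesource.Timesource = timesource.NewDefaultClock()
	fmt.Println("wall-clock time:", wall.Now())
}

Because both implementations satisfy the Timesource interface, WakuNode can hold the interface type and swap the NTP source for the wall clock without further changes.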