Fix/timesource offline (#4309)
* fix: add missing callback cleanup in timesource fixes: "panic: sync: negative WaitGroup counter" part of: status-im/status-desktop#12691 * fix: ensure timesource.GetCurrentTime is non-blocking closes: status-im/status-desktop#12691
This commit is contained in:
parent
2076853ce3
commit
a584ab086a
|
@ -82,13 +82,10 @@ func generateMediaTLSCert() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
notBefore, err := timesource.GetCurrentTime()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
notBefore := timesource.GetCurrentTime()
|
||||
notAfter := notBefore.Add(365 * 24 * time.Hour)
|
||||
log.Debug("generate media cert", "system time", time.Now().String(), "cert notBefore", notBefore.String(), "cert notAfter", notAfter.String())
|
||||
finalCert, certPem, err := GenerateTLSCert(*notBefore, notAfter, []net.IP{}, []string{Localhost})
|
||||
finalCert, certPem, err := GenerateTLSCert(notBefore, notAfter, []net.IP{}, []string{Localhost})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -140,7 +140,7 @@ func NewBaseClient(c *ConnectionParams, logger *zap.Logger) (*BaseClient, error)
|
|||
MinVersion: tls.VersionTLS12,
|
||||
InsecureSkipVerify: false, // MUST BE FALSE
|
||||
RootCAs: rootCAs,
|
||||
Time: timesource.Time,
|
||||
Time: timesource.GetCurrentTime,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -39,10 +39,7 @@ func preflightHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
func makeCert(address net.IP) (*tls.Certificate, []byte, error) {
|
||||
now, err := timesource.GetCurrentTime()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
now := timesource.GetCurrentTime()
|
||||
log.Debug("makeCert", "system time", time.Now().String(), "timesource time", now.String())
|
||||
notBefore := now.Add(-pairing.CertificateMaxClockDrift)
|
||||
notAfter := now.Add(pairing.CertificateMaxClockDrift)
|
||||
|
@ -87,7 +84,7 @@ func makeClient(certPem []byte) (*http.Client, error) {
|
|||
MinVersion: tls.VersionTLS12,
|
||||
InsecureSkipVerify: false, // MUST BE FALSE
|
||||
RootCAs: rootCAs,
|
||||
Time: timesource.Time,
|
||||
Time: timesource.GetCurrentTime,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -78,12 +78,9 @@ func MakeServerConfig(config *ServerConfig) error {
|
|||
return err
|
||||
}
|
||||
|
||||
now, err := timesource.GetCurrentTime()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
now := timesource.GetCurrentTime()
|
||||
log.Debug("pairing server generate cert", "system time", time.Now().String(), "timesource time", now.String())
|
||||
tlsCert, _, err := GenerateCertFromKey(tlsKey, *now, ips, []string{})
|
||||
tlsCert, _, err := GenerateCertFromKey(tlsKey, now, ips, []string{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -35,10 +35,7 @@ func (api *MultiAccountsAPI) UpdateAccount(account multiaccounts.Account) error
|
|||
return errors.New("UpdateAccount but account not found")
|
||||
}
|
||||
if oldAcc.CustomizationColor != account.CustomizationColor {
|
||||
updatedAt, err := timesource.GetCurrentTimeInMillis()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
updatedAt := timesource.GetCurrentTimeInMillis()
|
||||
account.CustomizationColorClock = updatedAt
|
||||
}
|
||||
return api.db.UpdateAccount(account)
|
||||
|
|
|
@ -7,8 +7,6 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/beevik/ntp"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
|
@ -145,11 +143,6 @@ func Default() *NTPTimeSource {
|
|||
return ntpTimeSourceCreator()
|
||||
}
|
||||
|
||||
type timeCallback struct {
|
||||
wg *sync.WaitGroup
|
||||
callback func(time.Time)
|
||||
}
|
||||
|
||||
// NTPTimeSource provides source of time that tries to be resistant to time skews.
|
||||
// It does so by periodically querying time offset from ntp servers.
|
||||
type NTPTimeSource struct {
|
||||
|
@ -159,31 +152,13 @@ type NTPTimeSource struct {
|
|||
slowNTPSyncPeriod time.Duration
|
||||
timeQuery ntpQuery // for ease of testing
|
||||
|
||||
quit chan struct{}
|
||||
wg sync.WaitGroup
|
||||
started bool
|
||||
updatedOffset bool
|
||||
callbacks []timeCallback
|
||||
quit chan struct{}
|
||||
started bool
|
||||
|
||||
mu sync.RWMutex
|
||||
latestOffset time.Duration
|
||||
}
|
||||
|
||||
// AddCallback adds callback that will be called when offset is updated.
|
||||
// If offset is already updated once, callback will be called immediately.
|
||||
func (s *NTPTimeSource) AddCallback(callback func(time.Time)) *sync.WaitGroup {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
var wg sync.WaitGroup
|
||||
if s.updatedOffset {
|
||||
callback(now().Add(s.latestOffset))
|
||||
} else {
|
||||
wg.Add(1)
|
||||
s.callbacks = append(s.callbacks, timeCallback{wg: &wg, callback: callback})
|
||||
}
|
||||
return &wg
|
||||
}
|
||||
|
||||
// Now returns time adjusted by latest known offset
|
||||
func (s *NTPTimeSource) Now() time.Time {
|
||||
s.mu.RLock()
|
||||
|
@ -195,39 +170,28 @@ func (s *NTPTimeSource) updateOffset() error {
|
|||
offset, err := computeOffset(s.timeQuery, s.servers, s.allowedFailures)
|
||||
if err != nil {
|
||||
log.Error("failed to compute offset", "error", err)
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
for _, c := range s.callbacks {
|
||||
c.callback(now())
|
||||
c.wg.Done()
|
||||
}
|
||||
return errUpdateOffset
|
||||
}
|
||||
log.Info("Difference with ntp servers", "offset", offset)
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.latestOffset = offset
|
||||
s.updatedOffset = true
|
||||
|
||||
for _, c := range s.callbacks {
|
||||
c.callback(now().Add(s.latestOffset))
|
||||
c.wg.Done()
|
||||
}
|
||||
s.callbacks = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// runPeriodically runs periodically the given function based on NTPTimeSource
|
||||
// synchronization limits (fastNTPSyncPeriod / slowNTPSyncPeriod)
|
||||
func (s *NTPTimeSource) runPeriodically(fn func() error) error {
|
||||
func (s *NTPTimeSource) runPeriodically(fn func() error, starWithSlowSyncPeriod bool) {
|
||||
if s.started {
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
var period time.Duration
|
||||
period := s.fastNTPSyncPeriod
|
||||
if starWithSlowSyncPeriod {
|
||||
period = s.slowNTPSyncPeriod
|
||||
}
|
||||
s.quit = make(chan struct{})
|
||||
// we try to do it synchronously so that user can have reliable messages right away
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
|
@ -239,19 +203,29 @@ func (s *NTPTimeSource) runPeriodically(fn func() error) error {
|
|||
}
|
||||
|
||||
case <-s.quit:
|
||||
s.wg.Done()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
s.started = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start runs a goroutine that updates local offset every updatePeriod.
|
||||
func (s *NTPTimeSource) Start() error {
|
||||
return s.runPeriodically(s.updateOffset)
|
||||
// Start initializes the local offset and starts a goroutine that periodically updates the local offset.
|
||||
func (s *NTPTimeSource) Start() {
|
||||
if s.started {
|
||||
return
|
||||
}
|
||||
|
||||
// Attempt to update the offset synchronously so that user can have reliable messages right away
|
||||
err := s.updateOffset()
|
||||
if err != nil {
|
||||
// Failure to update can occur if the node is offline.
|
||||
// Instead of returning an error, continue with the process as the update will be retried periodically.
|
||||
log.Error("failed to update offset", err)
|
||||
}
|
||||
|
||||
s.runPeriodically(s.updateOffset, err == nil)
|
||||
|
||||
s.started = true
|
||||
}
|
||||
|
||||
// Stop goroutine that updates time source.
|
||||
|
@ -260,39 +234,17 @@ func (s *NTPTimeSource) Stop() error {
|
|||
return nil
|
||||
}
|
||||
close(s.quit)
|
||||
s.wg.Wait()
|
||||
s.started = false
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetCurrentTimeInMillis() (uint64, error) {
|
||||
now, err := GetCurrentTime()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return uint64(now.UnixNano() / int64(time.Millisecond)), nil
|
||||
}
|
||||
|
||||
func GetCurrentTime() (*time.Time, error) {
|
||||
func GetCurrentTime() time.Time {
|
||||
ts := Default()
|
||||
if err := ts.Start(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var t *time.Time
|
||||
ts.AddCallback(func(now time.Time) {
|
||||
t = &now
|
||||
}).Wait()
|
||||
if ts.updatedOffset {
|
||||
return t, nil
|
||||
}
|
||||
return nil, errUpdateOffset
|
||||
ts.Start()
|
||||
|
||||
return ts.Now()
|
||||
}
|
||||
|
||||
func Time() time.Time {
|
||||
now, err := GetCurrentTime()
|
||||
if err != nil {
|
||||
log.Error("[timesource] error when getting current time", zap.Error(err))
|
||||
return time.Now()
|
||||
}
|
||||
return *now
|
||||
func GetCurrentTimeInMillis() uint64 {
|
||||
return uint64(GetCurrentTime().UnixNano() / int64(time.Millisecond))
|
||||
}
|
||||
|
|
|
@ -210,7 +210,9 @@ func TestRunningPeriodically(t *testing.T) {
|
|||
lastCall := time.Now()
|
||||
// we're simulating a calls to updateOffset, testing ntp calls happens
|
||||
// on NTPTimeSource specified periods (fastNTPSyncPeriod & slowNTPSyncPeriod)
|
||||
err := source.runPeriodically(func() error {
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
source.runPeriodically(func() error {
|
||||
mu.Lock()
|
||||
periods = append(periods, time.Since(lastCall))
|
||||
mu.Unlock()
|
||||
|
@ -219,13 +221,12 @@ func TestRunningPeriodically(t *testing.T) {
|
|||
return errUpdateOffset
|
||||
}
|
||||
if hits == 6 {
|
||||
source.wg.Done()
|
||||
wg.Done()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}, false)
|
||||
|
||||
source.wg.Wait()
|
||||
require.NoError(t, err)
|
||||
wg.Wait()
|
||||
|
||||
mu.Lock()
|
||||
require.Len(t, periods, 6)
|
||||
|
@ -271,20 +272,41 @@ func TestGetCurrentTimeInMillis(t *testing.T) {
|
|||
}
|
||||
|
||||
expectedTime := uint64(11000)
|
||||
n, e := GetCurrentTimeInMillis()
|
||||
require.NoError(t, e)
|
||||
n := GetCurrentTimeInMillis()
|
||||
require.Equal(t, expectedTime, n)
|
||||
// test repeat invoke GetCurrentTimeInMillis
|
||||
n, e = GetCurrentTimeInMillis()
|
||||
require.NoError(t, e)
|
||||
n = GetCurrentTimeInMillis()
|
||||
require.Equal(t, expectedTime, n)
|
||||
e = Default().Stop()
|
||||
e := Default().Stop()
|
||||
require.NoError(t, e)
|
||||
|
||||
// test invoke after stop
|
||||
n, e = GetCurrentTimeInMillis()
|
||||
require.NoError(t, e)
|
||||
n = GetCurrentTimeInMillis()
|
||||
require.Equal(t, expectedTime, n)
|
||||
e = Default().Stop()
|
||||
require.NoError(t, e)
|
||||
}
|
||||
|
||||
func TestGetCurrentTimeOffline(t *testing.T) {
|
||||
// covers https://github.com/status-im/status-desktop/issues/12691
|
||||
ntpTimeSourceCreator = func() *NTPTimeSource {
|
||||
if ntpTimeSource != nil {
|
||||
return ntpTimeSource
|
||||
}
|
||||
ntpTimeSource = &NTPTimeSource{
|
||||
servers: defaultServers,
|
||||
allowedFailures: DefaultMaxAllowedFailures,
|
||||
fastNTPSyncPeriod: 1 * time.Millisecond,
|
||||
slowNTPSyncPeriod: 1 * time.Second,
|
||||
timeQuery: func(string, ntp.QueryOptions) (*ntp.Response, error) {
|
||||
return nil, errors.New("offline")
|
||||
},
|
||||
}
|
||||
return ntpTimeSource
|
||||
}
|
||||
|
||||
// ensure there is no "panic: sync: negative WaitGroup counter"
|
||||
// when GetCurrentTime() is invoked more than once when offline
|
||||
_ = GetCurrentTime()
|
||||
_ = GetCurrentTime()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue