Set timesource from config (#915)

This commit is contained in:
Dmitry Shulyak 2018-05-08 13:38:54 +03:00 committed by Ivan Daniluk
parent b6851d8186
commit 7aa508765e
6 changed files with 74 additions and 56 deletions

View File

@ -1,7 +1,7 @@
diff --git c/whisper/whisperv6/api.go w/whisper/whisperv6/api.go diff --git a/whisper/whisperv6/api.go b/whisper/whisperv6/api.go
index 2de99f293..e0c3284b6 100644 index 2de99f293..e0c3284b6 100644
--- c/whisper/whisperv6/api.go --- a/whisper/whisperv6/api.go
+++ w/whisper/whisperv6/api.go +++ b/whisper/whisperv6/api.go
@@ -285,7 +285,7 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (hexutil. @@ -285,7 +285,7 @@ func (api *PublicWhisperAPI) Post(ctx context.Context, req NewMessage) (hexutil.
} }
@ -11,10 +11,33 @@ index 2de99f293..e0c3284b6 100644
if err != nil { if err != nil {
return nil, err return nil, err
} }
diff --git c/whisper/whisperv6/envelope.go w/whisper/whisperv6/envelope.go diff --git a/whisper/whisperv6/config.go b/whisper/whisperv6/config.go
index 61419de00..3c28263e5 100644
--- a/whisper/whisperv6/config.go
+++ b/whisper/whisperv6/config.go
@@ -16,14 +16,18 @@
package whisperv6
+import "time"
+
// Config represents the configuration state of a whisper node.
type Config struct {
MaxMessageSize uint32 `toml:",omitempty"`
MinimumAcceptedPOW float64 `toml:",omitempty"`
+ TimeSource func() time.Time
}
// DefaultConfig represents (shocker!) the default configuration.
var DefaultConfig = Config{
MaxMessageSize: DefaultMaxMessageSize,
MinimumAcceptedPOW: DefaultMinimumPoW,
+ TimeSource: time.Now,
}
diff --git a/whisper/whisperv6/envelope.go b/whisper/whisperv6/envelope.go
index c42d1fa8a..3b65fdba0 100644 index c42d1fa8a..3b65fdba0 100644
--- c/whisper/whisperv6/envelope.go --- a/whisper/whisperv6/envelope.go
+++ w/whisper/whisperv6/envelope.go +++ b/whisper/whisperv6/envelope.go
@@ -62,9 +62,9 @@ func (e *Envelope) rlpWithoutNonce() []byte { @@ -62,9 +62,9 @@ func (e *Envelope) rlpWithoutNonce() []byte {
// NewEnvelope wraps a Whisper message with expiration and destination data // NewEnvelope wraps a Whisper message with expiration and destination data
@ -27,10 +50,10 @@ index c42d1fa8a..3b65fdba0 100644
TTL: ttl, TTL: ttl,
Topic: topic, Topic: topic,
Data: msg.Raw, Data: msg.Raw,
diff --git c/whisper/whisperv6/message.go w/whisper/whisperv6/message.go diff --git a/whisper/whisperv6/message.go b/whisper/whisperv6/message.go
index 2d4e86244..a12b445e2 100644 index 2d4e86244..a12b445e2 100644
--- c/whisper/whisperv6/message.go --- a/whisper/whisperv6/message.go
+++ w/whisper/whisperv6/message.go +++ b/whisper/whisperv6/message.go
@@ -27,6 +27,7 @@ import ( @@ -27,6 +27,7 @@ import (
"errors" "errors"
mrand "math/rand" mrand "math/rand"
@ -57,10 +80,10 @@ index 2d4e86244..a12b445e2 100644
if err = envelope.Seal(options); err != nil { if err = envelope.Seal(options); err != nil {
return nil, err return nil, err
} }
diff --git c/whisper/whisperv6/whisper.go w/whisper/whisperv6/whisper.go diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go
index 8d56ece64..2bccfec33 100644 index 8d56ece64..697f0ecb8 100644
--- c/whisper/whisperv6/whisper.go --- a/whisper/whisperv6/whisper.go
+++ w/whisper/whisperv6/whisper.go +++ b/whisper/whisperv6/whisper.go
@@ -92,6 +92,8 @@ type Whisper struct { @@ -92,6 +92,8 @@ type Whisper struct {
envelopeTracer EnvelopeTracer // Service collecting envelopes metadata envelopeTracer EnvelopeTracer // Service collecting envelopes metadata
@ -74,19 +97,14 @@ index 8d56ece64..2bccfec33 100644
p2pMsgQueue: make(chan *Envelope, messageQueueLimit), p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}), quit: make(chan struct{}),
syncAllowance: DefaultSyncAllowance, syncAllowance: DefaultSyncAllowance,
+ timeSource: time.Now, + timeSource: cfg.TimeSource,
} }
whisper.filters = NewFilters(whisper) whisper.filters = NewFilters(whisper)
@@ -215,6 +218,16 @@ func (whisper *Whisper) APIs() []rpc.API { @@ -215,6 +218,11 @@ func (whisper *Whisper) APIs() []rpc.API {
} }
} }
+// SetTimeSource sets time source used by whisper for envelopes time and expiration logic.
+func (whisper *Whisper) SetTimeSource(timeSource func() time.Time) {
+ whisper.timeSource = timeSource
+}
+
+// GetCurrentTime returns current time. +// GetCurrentTime returns current time.
+func (whisper *Whisper) GetCurrentTime() time.Time { +func (whisper *Whisper) GetCurrentTime() time.Time {
+ return whisper.timeSource() + return whisper.timeSource()
@ -95,7 +113,7 @@ index 8d56ece64..2bccfec33 100644
// RegisterServer registers MailServer interface. // RegisterServer registers MailServer interface.
// MailServer will process all the incoming messages with p2pRequestCode. // MailServer will process all the incoming messages with p2pRequestCode.
func (whisper *Whisper) RegisterServer(server MailServer) { func (whisper *Whisper) RegisterServer(server MailServer) {
@@ -829,7 +842,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { @@ -829,7 +837,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
// appropriate time-stamp. In case of error, connection should be dropped. // appropriate time-stamp. In case of error, connection should be dropped.
// param isP2P indicates whether the message is peer-to-peer (should not be forwarded). // param isP2P indicates whether the message is peer-to-peer (should not be forwarded).
func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) { func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
@ -104,7 +122,7 @@ index 8d56ece64..2bccfec33 100644
sent := envelope.Expiry - envelope.TTL sent := envelope.Expiry - envelope.TTL
if sent > now { if sent > now {
@@ -988,7 +1001,7 @@ func (whisper *Whisper) expire() { @@ -988,7 +996,7 @@ func (whisper *Whisper) expire() {
whisper.statsMu.Lock() whisper.statsMu.Lock()
defer whisper.statsMu.Unlock() defer whisper.statsMu.Unlock()
whisper.stats.reset() whisper.stats.reset()

View File

@ -26,6 +26,7 @@ import (
"github.com/status-im/status-go/services/personal" "github.com/status-im/status-go/services/personal"
"github.com/status-im/status-go/services/shhext" "github.com/status-im/status-go/services/shhext"
"github.com/status-im/status-go/services/status" "github.com/status-im/status-go/services/status"
"github.com/status-im/status-go/timesource"
) )
// Errors related to node and services creation. // Errors related to node and services creation.
@ -192,11 +193,21 @@ func activateShhService(stack *node.Node, config *params.NodeConfig) (err error)
logger.Info("SHH protocol is disabled") logger.Info("SHH protocol is disabled")
return nil return nil
} }
if err := stack.Register(func(*node.ServiceContext) (node.Service, error) {
return timesource.Default(), nil
}); err != nil {
return err
}
err = stack.Register(func(*node.ServiceContext) (node.Service, error) { err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
var timeSource *timesource.NTPTimeSource
if err := ctx.Service(&timeSource); err != nil {
return nil, err
}
whisperServiceConfig := &whisper.Config{ whisperServiceConfig := &whisper.Config{
MaxMessageSize: whisper.DefaultMaxMessageSize, MaxMessageSize: whisper.DefaultMaxMessageSize,
MinimumAcceptedPOW: 0.001, MinimumAcceptedPOW: 0.001,
TimeSource: timeSource.Now,
} }
whisperService := whisper.New(whisperServiceConfig) whisperService := whisper.New(whisperServiceConfig)

View File

@ -24,7 +24,6 @@ import (
"github.com/status-im/status-go/geth/rpc" "github.com/status-im/status-go/geth/rpc"
"github.com/status-im/status-go/services/shhext" "github.com/status-im/status-go/services/shhext"
"github.com/status-im/status-go/services/status" "github.com/status-im/status-go/services/status"
"github.com/status-im/status-go/timesource"
) )
// tickerResolution is the delta to check blockchain sync progress. // tickerResolution is the delta to check blockchain sync progress.
@ -53,8 +52,6 @@ type StatusNode struct {
peerPool *peers.PeerPool peerPool *peers.PeerPool
db *leveldb.DB // used as a cache for PeerPool db *leveldb.DB // used as a cache for PeerPool
timeManager *timesource.NTPTimeSource
log log.Logger log log.Logger
} }
@ -117,9 +114,6 @@ func (n *StatusNode) Start(config *params.NodeConfig, services ...node.ServiceCo
if n.config.Discovery { if n.config.Discovery {
return n.startPeerPool() return n.startPeerPool()
} }
if n.config.WhisperConfig != nil && n.config.WhisperConfig.Enabled {
return n.setupWhisperTimeSource()
}
return nil return nil
} }
@ -153,19 +147,6 @@ func (n *StatusNode) start(services []node.ServiceConstructor) error {
return n.gethNode.Start() return n.gethNode.Start()
} }
func (n *StatusNode) setupWhisperTimeSource() error {
var w *whisper.Whisper
err := n.gethService(&w)
if err != nil {
return err
}
log.Debug("Using ntp time source manager as a source of time in whisper.")
n.timeManager = timesource.Default()
n.timeManager.Start()
w.SetTimeSource(n.timeManager.Now)
return nil
}
func (n *StatusNode) setupRPCClient() (err error) { func (n *StatusNode) setupRPCClient() (err error) {
// setup public RPC client // setup public RPC client
gethNodeClient, err := n.gethNode.AttachPublic() gethNodeClient, err := n.gethNode.AttachPublic()
@ -223,11 +204,6 @@ func (n *StatusNode) stop() error {
n.register = nil n.register = nil
n.peerPool = nil n.peerPool = nil
if n.timeManager != nil {
log.Debug("Stopping time source manager")
n.timeManager.Stop()
n.timeManager = nil
}
if err := n.gethNode.Stop(); err != nil { if err := n.gethNode.Stop(); err != nil {
return err return err
} }

View File

@ -8,6 +8,8 @@ import (
"github.com/beevik/ntp" "github.com/beevik/ntp"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
) )
const ( const (
@ -136,7 +138,7 @@ func (s *NTPTimeSource) updateOffset() {
} }
// Start runs a goroutine that updates local offset every updatePeriod. // Start runs a goroutine that updates local offset every updatePeriod.
func (s *NTPTimeSource) Start() { func (s *NTPTimeSource) Start(*p2p.Server) error {
s.quit = make(chan struct{}) s.quit = make(chan struct{})
ticker := time.NewTicker(s.updatePeriod) ticker := time.NewTicker(s.updatePeriod)
// we try to do it synchronously so that user can have reliable messages right away // we try to do it synchronously so that user can have reliable messages right away
@ -153,13 +155,25 @@ func (s *NTPTimeSource) Start() {
} }
} }
}() }()
return nil
} }
// Stop goroutine that updates time source. // Stop goroutine that updates time source.
func (s *NTPTimeSource) Stop() { func (s *NTPTimeSource) Stop() error {
if s.quit == nil { if s.quit == nil {
return return nil
} }
close(s.quit) close(s.quit)
s.wg.Wait() s.wg.Wait()
return nil
}
// APIs used to be conformant with service interface
func (s *NTPTimeSource) APIs() []rpc.API {
return nil
}
// Protocols used to conformant with service interface
func (s *NTPTimeSource) Protocols() []p2p.Protocol {
return nil
} }

View File

@ -16,14 +16,18 @@
package whisperv6 package whisperv6
import "time"
// Config represents the configuration state of a whisper node. // Config represents the configuration state of a whisper node.
type Config struct { type Config struct {
MaxMessageSize uint32 `toml:",omitempty"` MaxMessageSize uint32 `toml:",omitempty"`
MinimumAcceptedPOW float64 `toml:",omitempty"` MinimumAcceptedPOW float64 `toml:",omitempty"`
TimeSource func() time.Time
} }
// DefaultConfig represents (shocker!) the default configuration. // DefaultConfig represents (shocker!) the default configuration.
var DefaultConfig = Config{ var DefaultConfig = Config{
MaxMessageSize: DefaultMaxMessageSize, MaxMessageSize: DefaultMaxMessageSize,
MinimumAcceptedPOW: DefaultMinimumPoW, MinimumAcceptedPOW: DefaultMinimumPoW,
TimeSource: time.Now,
} }

View File

@ -112,7 +112,7 @@ func New(cfg *Config) *Whisper {
p2pMsgQueue: make(chan *Envelope, messageQueueLimit), p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
quit: make(chan struct{}), quit: make(chan struct{}),
syncAllowance: DefaultSyncAllowance, syncAllowance: DefaultSyncAllowance,
timeSource: time.Now, timeSource: cfg.TimeSource,
} }
whisper.filters = NewFilters(whisper) whisper.filters = NewFilters(whisper)
@ -218,11 +218,6 @@ func (whisper *Whisper) APIs() []rpc.API {
} }
} }
// SetTimeSource sets time source used by whisper for envelopes time and expiration logic.
func (whisper *Whisper) SetTimeSource(timeSource func() time.Time) {
whisper.timeSource = timeSource
}
// GetCurrentTime returns current time. // GetCurrentTime returns current time.
func (whisper *Whisper) GetCurrentTime() time.Time { func (whisper *Whisper) GetCurrentTime() time.Time {
return whisper.timeSource() return whisper.timeSource()