feat: add ntp timesource

This commit is contained in:
Richard Ramos 2022-12-08 23:08:04 -04:00 committed by RichΛrd
parent 066185ee86
commit 84c7022e2d
31 changed files with 718 additions and 102 deletions

View File

@ -46,6 +46,7 @@ func main() {
wakuNode, err := node.New(ctx,
node.WithPrivateKey(prvKey),
node.WithHostAddress(hostAddr),
node.WithNTP(),
node.WithWakuRelay(),
)
if err != nil {
@ -84,7 +85,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
contentTopic := protocol.NewContentTopic("basic2", 1, "test", "proto")
var version uint32 = 0
var timestamp int64 = utils.GetUnixEpoch()
var timestamp int64 = utils.GetUnixEpoch(wakuNode.Timesource())
p := new(node.Payload)
p.Data = []byte(wakuNode.ID() + ": " + msgContent)

View File

@ -249,7 +249,7 @@ func (c *Chat) SendMessage(line string) {
func (c *Chat) publish(ctx context.Context, message string) error {
msg := &pb.Chat2Message{
Timestamp: uint64(time.Now().Unix()),
Timestamp: uint64(c.node.Timesource().Now().Unix()),
Nick: c.nick,
Payload: []byte(message),
}
@ -260,8 +260,7 @@ func (c *Chat) publish(ctx context.Context, message string) error {
}
var version uint32
var t = time.Now()
var timestamp int64 = utils.GetUnixEpochFrom(t)
var timestamp int64 = utils.GetUnixEpochFrom(c.node.Timesource().Now())
var keyInfo *node.KeyInfo = &node.KeyInfo{}
if c.options.UsePayloadV1 { // Use WakuV1 encryption

View File

@ -31,8 +31,9 @@ func execute(options Options) {
opts := []node.WakuNodeOption{
node.WithPrivateKey(options.NodeKey),
node.WithNTP(),
node.WithHostAddress(hostAddr),
node.WithWakuStore(false, false),
node.WithWakuStore(false, nil),
}
if options.Relay.Enable {

View File

@ -143,7 +143,7 @@ func randomHex(n int) (string, error) {
func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
var version uint32 = 0
var timestamp int64 = utils.GetUnixEpoch()
var timestamp int64 = utils.GetUnixEpoch(wakuNode.Timesource())
p := new(node.Payload)
p.Data = []byte(wakuNode.ID() + ": " + msgContent)

View File

@ -213,6 +213,7 @@ func Execute(options Options) {
}
nodeOpts = append(nodeOpts, node.WithLibP2POptions(libp2pOpts...))
nodeOpts = append(nodeOpts, node.WithNTP())
if options.Relay.Enable {
var wakurelayopts []pubsub.Option

View File

@ -11,6 +11,7 @@ import (
"github.com/waku-org/go-waku/waku/persistence/migrations"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
@ -20,6 +21,7 @@ type MessageProvider interface {
Put(env *protocol.Envelope) error
Query(query *pb.HistoryQuery) ([]StoredMessage, error)
MostRecentTimestamp() (int64, error)
Start(timesource timesource.Timesource) error
Stop()
}
@ -31,8 +33,9 @@ const WALMode = "wal"
// DBStore is a MessageProvider that has a *sql.DB connection
type DBStore struct {
MessageProvider
db *sql.DB
log *zap.Logger
db *sql.DB
timesource timesource.Timesource
log *zap.Logger
maxMessages int
maxDuration time.Duration
@ -146,15 +149,21 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) {
}
}
err = result.cleanOlderRecords()
return result, nil
}
func (d *DBStore) Start(timesource timesource.Timesource) error {
d.timesource = timesource
err := d.cleanOlderRecords()
if err != nil {
return nil, err
return err
}
result.wg.Add(1)
go result.checkForOlderRecords(60 * time.Second)
d.wg.Add(1)
go d.checkForOlderRecords(60 * time.Second)
return result, nil
return nil
}
func (d *DBStore) cleanOlderRecords() error {
@ -164,7 +173,7 @@ func (d *DBStore) cleanOlderRecords() error {
if d.maxDuration > 0 {
start := time.Now()
sqlStmt := `DELETE FROM message WHERE receiverTimestamp < ?`
_, err := d.db.Exec(sqlStmt, utils.GetUnixEpochFrom(time.Now().Add(-d.maxDuration)))
_, err := d.db.Exec(sqlStmt, utils.GetUnixEpochFrom(d.timesource.Now().Add(-d.maxDuration)))
if err != nil {
return err
}

View File

@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
@ -28,6 +29,9 @@ func TestDbStore(t *testing.T) {
store, err := NewDBStore(utils.Logger(), option)
require.NoError(t, err)
err = store.Start(timesource.NewDefaultClock())
require.NoError(t, err)
res, err := store.GetAll()
require.NoError(t, err)
require.Empty(t, res)
@ -45,6 +49,9 @@ func TestStoreRetention(t *testing.T) {
store, err := NewDBStore(utils.Logger(), WithDB(db), WithRetentionPolicy(5, 20*time.Second))
require.NoError(t, err)
err = store.Start(timesource.NewDefaultClock())
require.NoError(t, err)
insertTime := time.Now()
_ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test1", insertTime.Add(-70*time.Second).UnixNano()), insertTime.Add(-70*time.Second).UnixNano(), "test"))
@ -65,6 +72,9 @@ func TestStoreRetention(t *testing.T) {
store, err = NewDBStore(utils.Logger(), WithDB(db), WithRetentionPolicy(5, 40*time.Second))
require.NoError(t, err)
err = store.Start(timesource.NewDefaultClock())
require.NoError(t, err)
dbResults, err = store.GetAll()
require.NoError(t, err)
require.Len(t, dbResults, 3)

View File

@ -24,14 +24,14 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
ticker := time.NewTicker(t)
defer ticker.Stop()
lastTimeExecuted := time.Now()
lastTimeExecuted := w.timesource.Now()
sleepDetectionInterval := int64(t) * 3
for {
select {
case <-ticker.C:
difference := time.Now().UnixNano() - lastTimeExecuted.UnixNano()
difference := w.timesource.Now().UnixNano() - lastTimeExecuted.UnixNano()
if difference > sleepDetectionInterval {
w.log.Warn("keep alive hasnt been executed recently. Killing all connections to peers")
for _, p := range w.host.Network().Peers() {
@ -40,7 +40,7 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
w.log.Warn("while disconnecting peer", zap.Error(err))
}
}
lastTimeExecuted = time.Now()
lastTimeExecuted = w.timesource.Now()
continue
}
@ -53,7 +53,7 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
}
}
lastTimeExecuted = time.Now()
lastTimeExecuted = w.timesource.Now()
case <-w.quit:
return
}

View File

@ -36,6 +36,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/protocol/swap"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
@ -63,9 +64,10 @@ type RLNRelay interface {
}
type WakuNode struct {
host host.Host
opts *WakuNodeParameters
log *zap.Logger
host host.Host
opts *WakuNodeParameters
log *zap.Logger
timesource timesource.Timesource
relay *relay.WakuRelay
filter *filter.WakuFilter
@ -105,7 +107,7 @@ type WakuNode struct {
}
func defaultStoreFactory(w *WakuNode) store.Store {
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.log)
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.timesource, w.log)
}
// New is used to instantiate a WakuNode using a set of WakuNodeOptions
@ -174,6 +176,12 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
w.keepAliveFails = make(map[peer.ID]int)
w.wakuFlag = utils.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilter, w.opts.enableStore, w.opts.enableRelay)
if params.enableNTP {
w.timesource = timesource.NewNTPTimesource(w.opts.ntpURLs, w.log)
} else {
w.timesource = timesource.NewDefaultClock()
}
if params.storeFactory != nil {
w.storeFactory = params.storeFactory
} else {
@ -259,6 +267,13 @@ func (w *WakuNode) checkForAddressChanges() {
// Start initializes all the protocols that were setup in the WakuNode
func (w *WakuNode) Start() error {
if w.opts.enableNTP {
err := w.timesource.Start()
if err != nil {
return err
}
}
if w.opts.enableSwap {
w.swap = swap.NewWakuSwap(w.log, []swap.SwapOption{
swap.WithMode(w.opts.swapMode),
@ -268,11 +283,14 @@ func (w *WakuNode) Start() error {
w.store = w.storeFactory(w)
if w.opts.enableStore {
w.startStore()
err := w.startStore()
if err != nil {
return err
}
}
if w.opts.enableFilter {
filter, err := filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, w.log, w.opts.filterOpts...)
filter, err := filter.NewWakuFilter(w.ctx, w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...)
if err != nil {
return err
}
@ -302,9 +320,11 @@ func (w *WakuNode) Start() error {
w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.discoveryV5, w.opts.discV5Opts...))
}
err = w.mountRelay(w.opts.minRelayPeersToPublish, w.opts.wOpts...)
if err != nil {
return err
if w.opts.enableRelay {
err = w.mountRelay(w.opts.minRelayPeersToPublish, w.opts.wOpts...)
if err != nil {
return err
}
}
if w.opts.enableRLN {
@ -360,11 +380,19 @@ func (w *WakuNode) Stop() {
w.discoveryV5.Stop()
}
w.relay.Stop()
if w.opts.enableRelay {
w.relay.Stop()
}
w.lightPush.Stop()
w.store.Stop()
_ = w.stopRlnRelay()
err := w.timesource.Stop()
if err != nil {
w.log.Error("stopping timesource", zap.Error(err))
}
w.host.Close()
w.wg.Wait()
@ -395,6 +423,12 @@ func (w *WakuNode) ENR() *enode.Node {
return w.localNode.Node()
}
// Timesource returns the timesource used by this node to obtain the current wall time
// Depending on the configuration it will be the local time or a ntp syncd time
func (w *WakuNode) Timesource() timesource.Timesource {
return w.timesource
}
// Relay is used to access any operation related to Waku Relay protocol
func (w *WakuNode) Relay() *relay.WakuRelay {
return w.relay
@ -461,7 +495,7 @@ func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error {
func (w *WakuNode) mountRelay(minRelayPeersToPublish int, opts ...pubsub.Option) error {
var err error
w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, minRelayPeersToPublish, w.log, opts...)
w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, minRelayPeersToPublish, w.timesource, w.log, opts...)
if err != nil {
return err
}
@ -499,8 +533,12 @@ func (w *WakuNode) mountPeerExchange() error {
return w.peerExchange.Start()
}
func (w *WakuNode) startStore() {
w.store.Start(w.ctx)
func (w *WakuNode) startStore() error {
err := w.store.Start(w.ctx)
if err != nil {
w.log.Error("starting store", zap.Error(err))
return err
}
if len(w.opts.resumeNodes) != 0 {
// TODO: extract this to a function and run it when you go offline
@ -527,6 +565,7 @@ func (w *WakuNode) startStore() {
}
}()
}
return nil
}
func (w *WakuNode) addPeer(info *peer.AddrInfo, protocols ...string) error {

View File

@ -47,7 +47,7 @@ func (w *WakuNode) mountRlnRelay() error {
}
// mount rlnrelay in off-chain mode with a static group of users
rlnRelay, err := rln.RlnRelayStatic(w.ctx, w.relay, groupKeys, memKeyPair, memIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.log)
rlnRelay, err := rln.RlnRelayStatic(w.ctx, w.relay, groupKeys, memKeyPair, memIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.timesource, w.log)
if err != nil {
return err
}
@ -81,7 +81,7 @@ func (w *WakuNode) mountRlnRelay() error {
// mount the rln relay protocol in the on-chain/dynamic mode
var err error
w.rlnRelay, err = rln.RlnRelayDynamic(context.Background(), w.relay, w.opts.rlnETHClientAddress, w.opts.rlnETHPrivateKey, w.opts.rlnMembershipContractAddress, memKeyPair, w.opts.rlnRelayMemIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.opts.rlnRegistrationHandler, w.log)
w.rlnRelay, err = rln.RlnRelayDynamic(context.Background(), w.relay, w.opts.rlnETHClientAddress, w.opts.rlnETHPrivateKey, w.opts.rlnMembershipContractAddress, memKeyPair, w.opts.rlnRelayMemIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.opts.rlnRegistrationHandler, w.timesource, w.log)
if err != nil {
return err
}

View File

@ -26,6 +26,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
@ -45,6 +46,9 @@ type WakuNodeParameters struct {
privKey *ecdsa.PrivateKey
libP2POpts []libp2p.Option
enableNTP bool
ntpURLs []string
enableWS bool
wsPort int
enableWSS bool
@ -106,7 +110,6 @@ type WakuNodeOption func(*WakuNodeParameters) error
// Default options used in the libp2p node
var DefaultWakuNodeOptions = []WakuNodeOption{
WithLogger(utils.Logger()),
WithWakuRelay(),
}
// MultiAddresses return the list of multiaddresses configured in the node
@ -234,6 +237,21 @@ func WithPrivateKey(privKey *ecdsa.PrivateKey) WakuNodeOption {
}
}
// WithNTP is used to use ntp for any operation that requires obtaining time
// A list of ntp servers can be passed but if none is specified, some defaults
// will be used
func WithNTP(ntpURLs ...string) WakuNodeOption {
return func(params *WakuNodeParameters) error {
if len(ntpURLs) == 0 {
ntpURLs = timesource.DefaultServers
}
params.enableNTP = true
params.ntpURLs = ntpURLs
return nil
}
}
// GetPrivKey returns the private key used in the node
func (w *WakuNodeParameters) GetPrivKey() *crypto.PrivKey {
privKey := crypto.PrivKey(utils.EcdsaPrivKeyToSecp256k1PrivKey(w.privKey))

View File

@ -30,7 +30,7 @@ func TestWakuOptions(t *testing.T) {
advertiseAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
storeFactory := func(w *WakuNode) store.Store {
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.log)
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.timesource, w.log)
}
options := []WakuNodeOption{

View File

@ -15,6 +15,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
@ -25,7 +26,7 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger())
relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, timesource.NewDefaultClock(), utils.Logger())
require.NoError(t, err)
sub, err := relay.SubscribeToTopic(context.Background(), topic)

View File

@ -22,7 +22,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/metrics"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/waku/v2/timesource"
)
const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")
@ -30,8 +30,9 @@ const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")
var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic().String()
type WakuRelay struct {
host host.Host
pubsub *pubsub.PubSub
host host.Host
pubsub *pubsub.PubSub
timesource timesource.Timesource
log *zap.Logger
@ -55,9 +56,10 @@ func msgIdFn(pmsg *pubsub_pb.Message) string {
}
// NewWakuRelay returns a new instance of a WakuRelay struct
func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, log *zap.Logger, opts ...pubsub.Option) (*WakuRelay, error) {
func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, timesource timesource.Timesource, log *zap.Logger, opts ...pubsub.Option) (*WakuRelay, error) {
w := new(WakuRelay)
w.host = h
w.timesource = timesource
w.wakuRelayTopics = make(map[string]*pubsub.Topic)
w.relaySubs = make(map[string]*pubsub.Subscription)
w.subscriptions = make(map[string][]*Subscription)
@ -343,7 +345,7 @@ func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub *
return
}
envelope := waku_proto.NewEnvelope(wakuMessage, utils.GetUnixEpoch(), string(t))
envelope := waku_proto.NewEnvelope(wakuMessage, w.timesource.Now().UnixNano(), string(t))
w.log.Info("waku.relay received", logging.HexString("hash", envelope.Hash()))

View File

@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
@ -20,7 +21,7 @@ func TestWakuRelay(t *testing.T) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay, err := NewWakuRelay(context.Background(), host, nil, 0, utils.Logger())
relay, err := NewWakuRelay(context.Background(), host, nil, 0, timesource.NewDefaultClock(), utils.Logger())
defer relay.Stop()
require.NoError(t, err)

View File

@ -7,6 +7,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
r "github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
)
@ -20,6 +21,7 @@ func RlnRelayStatic(
pubsubTopic string,
contentTopic string,
spamHandler SpamHandler,
timesource timesource.Timesource,
log *zap.Logger,
) (*WakuRLNRelay, error) {
log = log.Named("rln-static")
@ -45,6 +47,7 @@ func RlnRelayStatic(
pubsubTopic: pubsubTopic,
contentTopic: contentTopic,
log: log,
timesource: timesource,
nullifierLog: make(map[r.Epoch][]r.ProofMetadata),
}
@ -87,6 +90,7 @@ func RlnRelayDynamic(
contentTopic string,
spamHandler SpamHandler,
registrationHandler RegistrationHandler,
timesource timesource.Timesource,
log *zap.Logger,
) (*WakuRLNRelay, error) {
log = log.Named("rln-dynamic")
@ -109,6 +113,7 @@ func RlnRelayDynamic(
pubsubTopic: pubsubTopic,
contentTopic: contentTopic,
log: log,
timesource: timesource,
nullifierLog: make(map[r.Epoch][]r.ProofMetadata),
registrationHandler: registrationHandler,
}

View File

@ -10,6 +10,7 @@ import (
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
r "github.com/waku-org/go-zerokit-rln/rln"
)
@ -32,7 +33,7 @@ func (s *WakuRLNRelaySuite) TestOffchainMode() {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
s.Require().NoError(err)
relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, utils.Logger())
relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, timesource.NewDefaultClock(), utils.Logger())
defer relay.Stop()
s.Require().NoError(err)
@ -49,7 +50,7 @@ func (s *WakuRLNRelaySuite) TestOffchainMode() {
// index also represents the index of the leaf in the Merkle tree that contains node's commitment key
index := r.MembershipIndex(5)
wakuRLNRelay, err := RlnRelayStatic(context.TODO(), relay, groupIDCommitments, groupKeyPairs[index], index, RLNRELAY_PUBSUB_TOPIC, RLNRELAY_CONTENT_TOPIC, nil, utils.Logger())
wakuRLNRelay, err := RlnRelayStatic(context.TODO(), relay, groupIDCommitments, groupKeyPairs[index], index, RLNRELAY_PUBSUB_TOPIC, RLNRELAY_CONTENT_TOPIC, nil, timesource.NewDefaultClock(), utils.Logger())
s.Require().NoError(err)
// get the root of Merkle tree which is constructed inside the mountRlnRelay proc

View File

@ -17,6 +17,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
r "github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
@ -33,7 +34,8 @@ type RegistrationHandler = func(tx *types.Transaction)
const AcceptableRootWindowSize = 5
type WakuRLNRelay struct {
ctx context.Context
ctx context.Context
timesource timesource.Timesource
membershipKeyPair *r.MembershipKeyPair
@ -206,7 +208,7 @@ func (rln *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime *time
epoch = r.CalcEpoch(*optionalTime)
} else {
// get current rln epoch
epoch = r.GetCurrentEpoch()
epoch = r.CalcEpoch(rln.timesource.Now())
}
msgProof := ToRateLimitProof(msg)

View File

@ -11,6 +11,7 @@ import (
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
@ -21,7 +22,7 @@ func TestFindLastSeenMessage(t *testing.T) {
msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), utils.GetUnixEpoch(), "test")
msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), utils.GetUnixEpoch(), "test")
s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger())
s := NewWakuStore(nil, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(msg1)
_ = s.storeMessage(msg3)
_ = s.storeMessage(msg5)
@ -41,8 +42,10 @@ func TestResume(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, nil, MemoryDB(t), utils.Logger())
s1.Start(ctx)
s1 := NewWakuStore(host1, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
err = s1.Start(ctx)
require.NoError(t, err)
defer s1.Stop()
for i := 0; i < 10; i++ {
@ -59,8 +62,9 @@ func TestResume(t *testing.T) {
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(host2, nil, MemoryDB(t), utils.Logger())
s2.Start(ctx)
s2 := NewWakuStore(host2, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
err = s2.Start(ctx)
require.NoError(t, err)
defer s2.Stop()
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
@ -95,8 +99,10 @@ func TestResumeWithListOfPeers(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, nil, MemoryDB(t), utils.Logger())
s1.Start(ctx)
s1 := NewWakuStore(host1, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
err = s1.Start(ctx)
require.NoError(t, err)
defer s1.Stop()
msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: 0}
@ -106,8 +112,9 @@ func TestResumeWithListOfPeers(t *testing.T) {
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(host2, nil, MemoryDB(t), utils.Logger())
s2.Start(ctx)
s2 := NewWakuStore(host2, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
err = s2.Start(ctx)
require.NoError(t, err)
defer s2.Stop()
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
@ -131,8 +138,10 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, nil, MemoryDB(t), utils.Logger())
s1.Start(ctx)
s1 := NewWakuStore(host1, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
err = s1.Start(ctx)
require.NoError(t, err)
defer s1.Stop()
msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: 0}
@ -142,8 +151,10 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(host2, nil, MemoryDB(t), utils.Logger())
s2.Start(ctx)
s2 := NewWakuStore(host2, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
err = s2.Start(ctx)
require.NoError(t, err)
defer s2.Stop()
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)

View File

@ -9,6 +9,7 @@ import (
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/swap"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.uber.org/zap"
)
@ -46,9 +47,10 @@ var (
)
type WakuStore struct {
ctx context.Context
MsgC chan *protocol.Envelope
wg *sync.WaitGroup
ctx context.Context
timesource timesource.Timesource
MsgC chan *protocol.Envelope
wg *sync.WaitGroup
log *zap.Logger
@ -61,7 +63,7 @@ type WakuStore struct {
}
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, log *zap.Logger) *WakuStore {
func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, timesource timesource.Timesource, log *zap.Logger) *WakuStore {
wakuStore := new(WakuStore)
wakuStore.msgProvider = p
wakuStore.h = host
@ -69,6 +71,7 @@ func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, log *z
wakuStore.wg = &sync.WaitGroup{}
wakuStore.log = log.Named("store")
wakuStore.quit = make(chan struct{})
wakuStore.timesource = timesource
return wakuStore
}

View File

@ -6,13 +6,14 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
func TestStorePersistence(t *testing.T) {
db := MemoryDB(t)
s1 := NewWakuStore(nil, nil, db, utils.Logger())
s1 := NewWakuStore(nil, nil, db, timesource.NewDefaultClock(), utils.Logger())
defaultPubSubTopic := "test"
defaultContentTopic := "1"

View File

@ -17,7 +17,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/waku/v2/timesource"
)
// MaxTimeVariance is the maximum duration in the future allowed for a message timestamp
@ -78,12 +78,13 @@ type MessageProvider interface {
Query(query *pb.HistoryQuery) (*pb.Index, []persistence.StoredMessage, error)
Put(env *protocol.Envelope) error
MostRecentTimestamp() (int64, error)
Start(timesource timesource.Timesource) error
Stop()
Count() (int, error)
}
type Store interface {
Start(ctx context.Context)
Start(ctx context.Context) error
Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error)
Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*pb.WakuMessage, error)
Next(ctx context.Context, r *Result) (*Result, error)
@ -98,14 +99,20 @@ func (store *WakuStore) SetMessageProvider(p MessageProvider) {
}
// Start initializes the WakuStore by enabling the protocol and fetching records from a message provider
func (store *WakuStore) Start(ctx context.Context) {
func (store *WakuStore) Start(ctx context.Context) error {
if store.started {
return
return nil
}
if store.msgProvider == nil {
store.log.Info("Store protocol started (no message provider)")
return
return nil
}
err := store.msgProvider.Start(store.timesource)
if err != nil {
store.log.Error("Error starting message provider", zap.Error(err))
return nil
}
store.started = true
@ -119,6 +126,8 @@ func (store *WakuStore) Start(ctx context.Context) {
go store.updateMetrics(ctx)
store.log.Info("Store protocol started")
return nil
}
func (store *WakuStore) storeMessage(env *protocol.Envelope) error {
@ -226,6 +235,7 @@ func (store *WakuStore) Stop() {
}
if store.msgProvider != nil {
store.msgProvider.Stop()
store.quit <- struct{}{}
}
@ -298,14 +308,13 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
return 0, errors.New("can't resume: store has not started")
}
currentTime := utils.GetUnixEpoch()
lastSeenTime, err := store.findLastSeen()
if err != nil {
return 0, err
}
var offset int64 = int64(20 * time.Nanosecond)
currentTime = currentTime + offset
currentTime := store.timesource.Now().UnixNano() + offset
lastSeenTime = max(lastSeenTime-offset, 0)
rpc := &pb.HistoryQuery{
@ -330,7 +339,7 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
msgCount := 0
for _, msg := range messages {
if err = store.storeMessage(protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic)); err == nil {
if err = store.storeMessage(protocol.NewEnvelope(msg, store.timesource.Now().UnixNano(), pubsubTopic)); err == nil {
msgCount++
}
}

View File

@ -10,6 +10,7 @@ import (
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
@ -20,8 +21,10 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, nil, MemoryDB(t), utils.Logger())
s1.Start(ctx)
s1 := NewWakuStore(host1, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
err = s1.Start(ctx)
require.NoError(t, err)
defer s1.Stop()
topic1 := "1"
@ -39,8 +42,9 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
// Simulate a message has been received via relay protocol
s1.MsgC <- protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)
s2 := NewWakuStore(host2, nil, MemoryDB(t), utils.Logger())
s2.Start(ctx)
s2 := NewWakuStore(host2, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
err = s2.Start(ctx)
require.NoError(t, err)
defer s2.Stop()
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
@ -67,10 +71,9 @@ func TestWakuStoreProtocolNext(t *testing.T) {
require.NoError(t, err)
db := MemoryDB(t)
s1 := NewWakuStore(host1, nil, db, utils.Logger())
s1.Start(ctx)
defer s1.Stop()
s1 := NewWakuStore(host1, nil, db, timesource.NewDefaultClock(), utils.Logger())
err = s1.Start(ctx)
require.NoError(t, err)
topic1 := "1"
pubsubTopic1 := "topic1"
@ -94,8 +97,9 @@ func TestWakuStoreProtocolNext(t *testing.T) {
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta4))
require.NoError(t, err)
s2 := NewWakuStore(host2, nil, db, utils.Logger())
s2.Start(ctx)
s2 := NewWakuStore(host2, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
err = s2.Start(ctx)
require.NoError(t, err)
defer s2.Stop()
q := Query{
@ -133,10 +137,9 @@ func TestWakuStoreProtocolFind(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
db := MemoryDB(t)
s1 := NewWakuStore(host1, nil, db, utils.Logger())
s1.Start(ctx)
s1 := NewWakuStore(host1, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
err = s1.Start(ctx)
require.NoError(t, err)
defer s1.Stop()
topic1 := "1"
@ -169,8 +172,9 @@ func TestWakuStoreProtocolFind(t *testing.T) {
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta4))
require.NoError(t, err)
s2 := NewWakuStore(host2, nil, db, utils.Logger())
s2.Start(ctx)
s2 := NewWakuStore(host2, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
err = s2.Start(ctx)
require.NoError(t, err)
defer s2.Stop()
q := Query{

View File

@ -7,6 +7,7 @@ import (
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
@ -17,7 +18,7 @@ func TestStoreQuery(t *testing.T) {
msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch())
msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger())
s := NewWakuStore(nil, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), defaultPubSubTopic))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), defaultPubSubTopic))
@ -43,7 +44,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger())
s := NewWakuStore(nil, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), defaultPubSubTopic))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), defaultPubSubTopic))
@ -77,7 +78,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger())
s := NewWakuStore(nil, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic2))
@ -109,7 +110,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger())
s := NewWakuStore(nil, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic2))
@ -131,7 +132,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger())
s := NewWakuStore(nil, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1))
@ -150,7 +151,7 @@ func TestStoreQueryForwardPagination(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"
s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger())
s := NewWakuStore(nil, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
for i := 0; i < 10; i++ {
msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
msg.Payload = []byte{byte(i)}
@ -174,7 +175,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"
s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger())
s := NewWakuStore(nil, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
for i := 0; i < 10; i++ {
msg := &pb.WakuMessage{
Payload: []byte{byte(i)},
@ -200,7 +201,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
}
func TestTemporalHistoryQueries(t *testing.T) {
s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger())
s := NewWakuStore(nil, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
var messages []*pb.WakuMessage
for i := 0; i < 10; i++ {

View File

@ -15,6 +15,7 @@ import (
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
@ -33,7 +34,7 @@ func TestV1Peers(t *testing.T) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, utils.Logger())
relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, timesource.NewDefaultClock(), utils.Logger())
require.NoError(t, err)
defer relay.Stop()

203
waku/v2/timesource/ntp.go Normal file
View File

@ -0,0 +1,203 @@
package timesource
import (
"bytes"
"errors"
"sort"
"sync"
"time"
"github.com/beevik/ntp"
"go.uber.org/zap"
)
const (
// DefaultMaxAllowedFailures defines how many failures will be tolerated.
DefaultMaxAllowedFailures = 1
// FastNTPSyncPeriod period between ntp synchronizations before the first
// successful connection.
FastNTPSyncPeriod = 2 * time.Minute
// SlowNTPSyncPeriod period between ntp synchronizations after the first
// successful connection.
SlowNTPSyncPeriod = 1 * time.Hour
// DefaultRPCTimeout defines write deadline for single ntp server request.
DefaultRPCTimeout = 2 * time.Second
)
// DefaultServers will be resolved to the closest available,
// and with high probability resolved to the different IPs
var DefaultServers = []string{
"0.pool.ntp.org",
"1.pool.ntp.org",
"2.pool.ntp.org",
"3.pool.ntp.org",
}
var errUpdateOffset = errors.New("failed to compute offset")
type ntpQuery func(string, ntp.QueryOptions) (*ntp.Response, error)
type queryResponse struct {
Offset time.Duration
Error error
}
type multiRPCError []error
func (e multiRPCError) Error() string {
var b bytes.Buffer
b.WriteString("RPC failed: ")
more := false
for _, err := range e {
if more {
b.WriteString("; ")
}
b.WriteString(err.Error())
more = true
}
b.WriteString(".")
return b.String()
}
func computeOffset(timeQuery ntpQuery, servers []string, allowedFailures int) (time.Duration, error) {
if len(servers) == 0 {
return 0, nil
}
responses := make(chan queryResponse, len(servers))
for _, server := range servers {
go func(server string) {
response, err := timeQuery(server, ntp.QueryOptions{
Timeout: DefaultRPCTimeout,
})
if err == nil {
err = response.Validate()
}
if err != nil {
responses <- queryResponse{Error: err}
return
}
responses <- queryResponse{Offset: response.ClockOffset}
}(server)
}
var (
rpcErrors multiRPCError
offsets []time.Duration
collected int
)
for response := range responses {
if response.Error != nil {
rpcErrors = append(rpcErrors, response.Error)
} else {
offsets = append(offsets, response.Offset)
}
collected++
if collected == len(servers) {
break
}
}
if lth := len(rpcErrors); lth > allowedFailures {
return 0, rpcErrors
} else if lth == len(servers) {
return 0, rpcErrors
}
sort.SliceStable(offsets, func(i, j int) bool {
return offsets[i] > offsets[j]
})
mid := len(offsets) / 2
if len(offsets)%2 == 0 {
return (offsets[mid-1] + offsets[mid]) / 2, nil
}
return offsets[mid], nil
}
func NewNTPTimesource(ntpServers []string, log *zap.Logger) *NTPTimeSource {
return &NTPTimeSource{
servers: ntpServers,
allowedFailures: DefaultMaxAllowedFailures,
fastNTPSyncPeriod: FastNTPSyncPeriod,
slowNTPSyncPeriod: SlowNTPSyncPeriod,
timeQuery: ntp.QueryWithOptions,
log: log.Named("timesource"),
}
}
// NTPTimeSource provides source of time that tries to be resistant to time skews.
// It does so by periodically querying time offset from ntp servers.
type NTPTimeSource struct {
servers []string
allowedFailures int
fastNTPSyncPeriod time.Duration
slowNTPSyncPeriod time.Duration
timeQuery ntpQuery // for ease of testing
log *zap.Logger
quit chan struct{}
wg sync.WaitGroup
mu sync.RWMutex
latestOffset time.Duration
}
// Now returns time adjusted by latest known offset
func (s *NTPTimeSource) Now() time.Time {
s.mu.RLock()
defer s.mu.RUnlock()
return time.Now().Add(s.latestOffset)
}
func (s *NTPTimeSource) updateOffset() error {
offset, err := computeOffset(s.timeQuery, s.servers, s.allowedFailures)
if err != nil {
s.log.Error("failed to compute offset", zap.Error(err))
return errUpdateOffset
}
s.log.Info("Difference with ntp servers", zap.Duration("offset", offset))
s.mu.Lock()
s.latestOffset = offset
s.mu.Unlock()
return nil
}
// runPeriodically runs periodically the given function based on NTPTimeSource
// synchronization limits (fastNTPSyncPeriod / slowNTPSyncPeriod)
func (s *NTPTimeSource) runPeriodically(fn func() error) error {
var period time.Duration
s.quit = make(chan struct{})
// we try to do it synchronously so that user can have reliable messages right away
s.wg.Add(1)
go func() {
for {
select {
case <-time.After(period):
if err := fn(); err == nil {
period = s.slowNTPSyncPeriod
} else if period != s.slowNTPSyncPeriod {
period = s.fastNTPSyncPeriod
}
case <-s.quit:
s.wg.Done()
return
}
}
}()
return nil
}
// Start runs a goroutine that updates local offset every updatePeriod.
func (s *NTPTimeSource) Start() error {
return s.runPeriodically(s.updateOffset)
}
// Stop goroutine that updates time source.
func (s *NTPTimeSource) Stop() error {
if s.quit == nil {
return nil
}
close(s.quit)
s.wg.Wait()
return nil
}

View File

@ -0,0 +1,249 @@
package timesource
import (
"errors"
"sync"
"testing"
"time"
"github.com/beevik/ntp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/utils"
)
const (
// clockCompareDelta declares time required between multiple calls to time.Now
clockCompareDelta = 100 * time.Microsecond
)
// we don't user real servers for tests, but logic depends on
// actual number of involved NTP servers.
var mockedServers = []string{"ntp1", "ntp2", "ntp3", "ntp4"}
type testCase struct {
description string
servers []string
allowedFailures int
responses []queryResponse
expected time.Duration
expectError bool
// actual attempts are mutable
mu sync.Mutex
actualAttempts int
}
func (tc *testCase) query(string, ntp.QueryOptions) (*ntp.Response, error) {
tc.mu.Lock()
defer func() {
tc.actualAttempts++
tc.mu.Unlock()
}()
response := &ntp.Response{
ClockOffset: tc.responses[tc.actualAttempts].Offset,
Stratum: 1,
}
return response, tc.responses[tc.actualAttempts].Error
}
func newTestCases() []*testCase {
return []*testCase{
{
description: "SameResponse",
servers: mockedServers,
responses: []queryResponse{
{Offset: 10 * time.Second},
{Offset: 10 * time.Second},
{Offset: 10 * time.Second},
{Offset: 10 * time.Second},
},
expected: 10 * time.Second,
},
{
description: "Median",
servers: mockedServers,
responses: []queryResponse{
{Offset: 10 * time.Second},
{Offset: 20 * time.Second},
{Offset: 20 * time.Second},
{Offset: 30 * time.Second},
},
expected: 20 * time.Second,
},
{
description: "EvenMedian",
servers: mockedServers[:2],
responses: []queryResponse{
{Offset: 10 * time.Second},
{Offset: 20 * time.Second},
},
expected: 15 * time.Second,
},
{
description: "Error",
servers: mockedServers,
responses: []queryResponse{
{Offset: 10 * time.Second},
{Error: errors.New("test")},
{Offset: 30 * time.Second},
{Offset: 30 * time.Second},
},
expected: time.Duration(0),
expectError: true,
},
{
description: "MultiError",
servers: mockedServers,
responses: []queryResponse{
{Error: errors.New("test 1")},
{Error: errors.New("test 2")},
{Error: errors.New("test 3")},
{Error: errors.New("test 3")},
},
expected: time.Duration(0),
expectError: true,
},
{
description: "TolerableError",
servers: mockedServers,
allowedFailures: 1,
responses: []queryResponse{
{Offset: 10 * time.Second},
{Error: errors.New("test")},
{Offset: 20 * time.Second},
{Offset: 30 * time.Second},
},
expected: 20 * time.Second,
},
{
description: "NonTolerableError",
servers: mockedServers,
allowedFailures: 1,
responses: []queryResponse{
{Offset: 10 * time.Second},
{Error: errors.New("test")},
{Error: errors.New("test")},
{Error: errors.New("test")},
},
expected: time.Duration(0),
expectError: true,
},
{
description: "AllFailed",
servers: mockedServers,
allowedFailures: 4,
responses: []queryResponse{
{Error: errors.New("test")},
{Error: errors.New("test")},
{Error: errors.New("test")},
{Error: errors.New("test")},
},
expected: time.Duration(0),
expectError: true,
},
{
description: "HalfTolerable",
servers: mockedServers,
allowedFailures: 2,
responses: []queryResponse{
{Offset: 10 * time.Second},
{Offset: 20 * time.Second},
{Error: errors.New("test")},
{Error: errors.New("test")},
},
expected: 15 * time.Second,
},
}
}
func TestComputeOffset(t *testing.T) {
for _, tc := range newTestCases() {
t.Run(tc.description, func(t *testing.T) {
offset, err := computeOffset(tc.query, tc.servers, tc.allowedFailures)
if tc.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
assert.Equal(t, tc.expected, offset)
})
}
}
func TestNTPTimeSource(t *testing.T) {
for _, tc := range newTestCases() {
t.Run(tc.description, func(t *testing.T) {
source := &NTPTimeSource{
servers: tc.servers,
allowedFailures: tc.allowedFailures,
timeQuery: tc.query,
log: utils.Logger(),
}
assert.WithinDuration(t, time.Now(), source.Now(), clockCompareDelta)
err := source.updateOffset()
if tc.expectError {
assert.Equal(t, errUpdateOffset, err)
} else {
assert.NoError(t, err)
}
assert.WithinDuration(t, time.Now().Add(tc.expected), source.Now(), clockCompareDelta)
})
}
}
func TestRunningPeriodically(t *testing.T) {
var hits int
var mu sync.RWMutex
periods := make([]time.Duration, 0)
tc := newTestCases()[0]
fastHits := 3
slowHits := 1
t.Run(tc.description, func(t *testing.T) {
source := &NTPTimeSource{
servers: tc.servers,
allowedFailures: tc.allowedFailures,
timeQuery: tc.query,
fastNTPSyncPeriod: time.Duration(fastHits*10) * time.Millisecond,
slowNTPSyncPeriod: time.Duration(slowHits*10) * time.Millisecond,
log: utils.Logger(),
}
lastCall := time.Now()
// we're simulating a calls to updateOffset, testing ntp calls happens
// on NTPTimeSource specified periods (fastNTPSyncPeriod & slowNTPSyncPeriod)
err := source.runPeriodically(func() error {
mu.Lock()
periods = append(periods, time.Since(lastCall))
mu.Unlock()
hits++
if hits < 3 {
return errUpdateOffset
}
if hits == 6 {
source.wg.Done()
}
return nil
})
source.wg.Wait()
require.NoError(t, err)
mu.Lock()
require.Len(t, periods, 6)
defer mu.Unlock()
prev := 0
for _, period := range periods[1:3] {
p := int(period.Seconds() * 100)
require.True(t, fastHits <= (p-prev))
prev = p
}
for _, period := range periods[3:] {
p := int(period.Seconds() * 100)
require.True(t, slowHits <= (p-prev))
prev = p
}
})
}

View File

@ -0,0 +1,9 @@
package timesource
import "time"
type Timesource interface {
Now() time.Time
Start() error
Stop() error
}

View File

@ -0,0 +1,24 @@
package timesource
import "time"
type WallClockTimeSource struct {
}
func NewDefaultClock() *WallClockTimeSource {
return &WallClockTimeSource{}
}
func (t *WallClockTimeSource) Now() time.Time {
return time.Now()
}
func (t *WallClockTimeSource) Start() error {
// Do nothing
return nil
}
func (t *WallClockTimeSource) Stop() error {
// Do nothing
return nil
}

View File

@ -1,14 +1,25 @@
package utils
import "time"
import (
"time"
)
// GetUnixEpochFrom converts a time into a unix timestamp with nanoseconds
func GetUnixEpochFrom(now time.Time) int64 {
return now.UnixNano()
}
// GetUnixEpoch returns the current time in unix timestamp with the integer part
// representing seconds and the decimal part representing subseconds
func GetUnixEpoch() int64 {
return GetUnixEpochFrom(time.Now())
type Timesource interface {
Now() time.Time
}
// GetUnixEpoch returns the current time in unix timestamp with the integer part
// representing seconds and the decimal part representing subseconds.
// Optionally receives a timesource to obtain the time from
func GetUnixEpoch(timesource ...Timesource) int64 {
if len(timesource) != 0 {
return GetUnixEpochFrom(timesource[0].Now())
} else {
return GetUnixEpochFrom(time.Now())
}
}