From 84c7022e2d33c4135c6a5482fd4b02a18427eac8 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 8 Dec 2022 23:08:04 -0400 Subject: [PATCH] feat: add ntp timesource --- examples/basic2/main.go | 3 +- examples/chat2/chat.go | 5 +- examples/chat2/exec.go | 3 +- examples/filter2/main.go | 2 +- waku/node.go | 1 + waku/persistence/store.go | 25 +- waku/persistence/store_test.go | 10 + waku/v2/node/keepalive.go | 8 +- waku/v2/node/wakunode2.go | 65 ++++- waku/v2/node/wakunode2_rln.go | 4 +- waku/v2/node/wakuoptions.go | 20 +- waku/v2/node/wakuoptions_test.go | 2 +- .../protocol/lightpush/waku_lightpush_test.go | 3 +- waku/v2/protocol/relay/waku_relay.go | 12 +- waku/v2/protocol/relay/waku_relay_test.go | 3 +- waku/v2/protocol/rln/rln_relay_builder.go | 5 + waku/v2/protocol/rln/rln_relay_test.go | 5 +- waku/v2/protocol/rln/waku_rln_relay.go | 6 +- waku/v2/protocol/store/waku_resume_test.go | 37 ++- waku/v2/protocol/store/waku_store_common.go | 11 +- .../store/waku_store_persistence_test.go | 3 +- waku/v2/protocol/store/waku_store_protocol.go | 25 +- .../store/waku_store_protocol_test.go | 36 +-- .../protocol/store/waku_store_query_test.go | 17 +- waku/v2/rpc/admin_test.go | 3 +- waku/v2/timesource/ntp.go | 203 ++++++++++++++ waku/v2/timesource/ntp_test.go | 249 ++++++++++++++++++ waku/v2/timesource/timesource.go | 9 + waku/v2/timesource/wall.go | 24 ++ {utils => waku/v2/utils}/ntp.go | 0 waku/v2/utils/time.go | 21 +- 31 files changed, 718 insertions(+), 102 deletions(-) create mode 100644 waku/v2/timesource/ntp.go create mode 100644 waku/v2/timesource/ntp_test.go create mode 100644 waku/v2/timesource/timesource.go create mode 100644 waku/v2/timesource/wall.go rename {utils => waku/v2/utils}/ntp.go (100%) diff --git a/examples/basic2/main.go b/examples/basic2/main.go index 646d9bef..3ac1035a 100644 --- a/examples/basic2/main.go +++ b/examples/basic2/main.go @@ -46,6 +46,7 @@ func main() { wakuNode, err := node.New(ctx, node.WithPrivateKey(prvKey), node.WithHostAddress(hostAddr), + node.WithNTP(), node.WithWakuRelay(), ) if err != nil { @@ -84,7 +85,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { contentTopic := protocol.NewContentTopic("basic2", 1, "test", "proto") var version uint32 = 0 - var timestamp int64 = utils.GetUnixEpoch() + var timestamp int64 = utils.GetUnixEpoch(wakuNode.Timesource()) p := new(node.Payload) p.Data = []byte(wakuNode.ID() + ": " + msgContent) diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index cbbf3d8f..db9c50ba 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -249,7 +249,7 @@ func (c *Chat) SendMessage(line string) { func (c *Chat) publish(ctx context.Context, message string) error { msg := &pb.Chat2Message{ - Timestamp: uint64(time.Now().Unix()), + Timestamp: uint64(c.node.Timesource().Now().Unix()), Nick: c.nick, Payload: []byte(message), } @@ -260,8 +260,7 @@ func (c *Chat) publish(ctx context.Context, message string) error { } var version uint32 - var t = time.Now() - var timestamp int64 = utils.GetUnixEpochFrom(t) + var timestamp int64 = utils.GetUnixEpochFrom(c.node.Timesource().Now()) var keyInfo *node.KeyInfo = &node.KeyInfo{} if c.options.UsePayloadV1 { // Use WakuV1 encryption diff --git a/examples/chat2/exec.go b/examples/chat2/exec.go index 49748db4..2b0c33fc 100644 --- a/examples/chat2/exec.go +++ b/examples/chat2/exec.go @@ -31,8 +31,9 @@ func execute(options Options) { opts := []node.WakuNodeOption{ node.WithPrivateKey(options.NodeKey), + node.WithNTP(), node.WithHostAddress(hostAddr), - node.WithWakuStore(false, false), + node.WithWakuStore(false, nil), } if options.Relay.Enable { diff --git a/examples/filter2/main.go b/examples/filter2/main.go index c27df751..3431309e 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -143,7 +143,7 @@ func randomHex(n int) (string, error) { func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { var version uint32 = 0 - var timestamp int64 = utils.GetUnixEpoch() + var timestamp int64 = utils.GetUnixEpoch(wakuNode.Timesource()) p := new(node.Payload) p.Data = []byte(wakuNode.ID() + ": " + msgContent) diff --git a/waku/node.go b/waku/node.go index b6d8cff4..6431f6cf 100644 --- a/waku/node.go +++ b/waku/node.go @@ -213,6 +213,7 @@ func Execute(options Options) { } nodeOpts = append(nodeOpts, node.WithLibP2POptions(libp2pOpts...)) + nodeOpts = append(nodeOpts, node.WithNTP()) if options.Relay.Enable { var wakurelayopts []pubsub.Option diff --git a/waku/persistence/store.go b/waku/persistence/store.go index 6baab313..dc922cae 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -11,6 +11,7 @@ import ( "github.com/waku-org/go-waku/waku/persistence/migrations" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -20,6 +21,7 @@ type MessageProvider interface { Put(env *protocol.Envelope) error Query(query *pb.HistoryQuery) ([]StoredMessage, error) MostRecentTimestamp() (int64, error) + Start(timesource timesource.Timesource) error Stop() } @@ -31,8 +33,9 @@ const WALMode = "wal" // DBStore is a MessageProvider that has a *sql.DB connection type DBStore struct { MessageProvider - db *sql.DB - log *zap.Logger + db *sql.DB + timesource timesource.Timesource + log *zap.Logger maxMessages int maxDuration time.Duration @@ -146,15 +149,21 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) { } } - err = result.cleanOlderRecords() + return result, nil +} + +func (d *DBStore) Start(timesource timesource.Timesource) error { + d.timesource = timesource + + err := d.cleanOlderRecords() if err != nil { - return nil, err + return err } - result.wg.Add(1) - go result.checkForOlderRecords(60 * time.Second) + d.wg.Add(1) + go d.checkForOlderRecords(60 * time.Second) - return result, nil + return nil } func (d *DBStore) cleanOlderRecords() error { @@ -164,7 +173,7 @@ func (d *DBStore) cleanOlderRecords() error { if d.maxDuration > 0 { start := time.Now() sqlStmt := `DELETE FROM message WHERE receiverTimestamp < ?` - _, err := d.db.Exec(sqlStmt, utils.GetUnixEpochFrom(time.Now().Add(-d.maxDuration))) + _, err := d.db.Exec(sqlStmt, utils.GetUnixEpochFrom(d.timesource.Now().Add(-d.maxDuration))) if err != nil { return err } diff --git a/waku/persistence/store_test.go b/waku/persistence/store_test.go index 93d2819d..05c2b6c0 100644 --- a/waku/persistence/store_test.go +++ b/waku/persistence/store_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -28,6 +29,9 @@ func TestDbStore(t *testing.T) { store, err := NewDBStore(utils.Logger(), option) require.NoError(t, err) + err = store.Start(timesource.NewDefaultClock()) + require.NoError(t, err) + res, err := store.GetAll() require.NoError(t, err) require.Empty(t, res) @@ -45,6 +49,9 @@ func TestStoreRetention(t *testing.T) { store, err := NewDBStore(utils.Logger(), WithDB(db), WithRetentionPolicy(5, 20*time.Second)) require.NoError(t, err) + err = store.Start(timesource.NewDefaultClock()) + require.NoError(t, err) + insertTime := time.Now() _ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test1", insertTime.Add(-70*time.Second).UnixNano()), insertTime.Add(-70*time.Second).UnixNano(), "test")) @@ -65,6 +72,9 @@ func TestStoreRetention(t *testing.T) { store, err = NewDBStore(utils.Logger(), WithDB(db), WithRetentionPolicy(5, 40*time.Second)) require.NoError(t, err) + err = store.Start(timesource.NewDefaultClock()) + require.NoError(t, err) + dbResults, err = store.GetAll() require.NoError(t, err) require.Len(t, dbResults, 3) diff --git a/waku/v2/node/keepalive.go b/waku/v2/node/keepalive.go index f0b6d6bf..415944ae 100644 --- a/waku/v2/node/keepalive.go +++ b/waku/v2/node/keepalive.go @@ -24,14 +24,14 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { ticker := time.NewTicker(t) defer ticker.Stop() - lastTimeExecuted := time.Now() + lastTimeExecuted := w.timesource.Now() sleepDetectionInterval := int64(t) * 3 for { select { case <-ticker.C: - difference := time.Now().UnixNano() - lastTimeExecuted.UnixNano() + difference := w.timesource.Now().UnixNano() - lastTimeExecuted.UnixNano() if difference > sleepDetectionInterval { w.log.Warn("keep alive hasnt been executed recently. Killing all connections to peers") for _, p := range w.host.Network().Peers() { @@ -40,7 +40,7 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { w.log.Warn("while disconnecting peer", zap.Error(err)) } } - lastTimeExecuted = time.Now() + lastTimeExecuted = w.timesource.Now() continue } @@ -53,7 +53,7 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { } } - lastTimeExecuted = time.Now() + lastTimeExecuted = w.timesource.Now() case <-w.quit: return } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index fccc4190..46869ca0 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -36,6 +36,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/protocol/swap" + "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -63,9 +64,10 @@ type RLNRelay interface { } type WakuNode struct { - host host.Host - opts *WakuNodeParameters - log *zap.Logger + host host.Host + opts *WakuNodeParameters + log *zap.Logger + timesource timesource.Timesource relay *relay.WakuRelay filter *filter.WakuFilter @@ -105,7 +107,7 @@ type WakuNode struct { } func defaultStoreFactory(w *WakuNode) store.Store { - return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.log) + return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.timesource, w.log) } // New is used to instantiate a WakuNode using a set of WakuNodeOptions @@ -174,6 +176,12 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { w.keepAliveFails = make(map[peer.ID]int) w.wakuFlag = utils.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilter, w.opts.enableStore, w.opts.enableRelay) + if params.enableNTP { + w.timesource = timesource.NewNTPTimesource(w.opts.ntpURLs, w.log) + } else { + w.timesource = timesource.NewDefaultClock() + } + if params.storeFactory != nil { w.storeFactory = params.storeFactory } else { @@ -259,6 +267,13 @@ func (w *WakuNode) checkForAddressChanges() { // Start initializes all the protocols that were setup in the WakuNode func (w *WakuNode) Start() error { + if w.opts.enableNTP { + err := w.timesource.Start() + if err != nil { + return err + } + } + if w.opts.enableSwap { w.swap = swap.NewWakuSwap(w.log, []swap.SwapOption{ swap.WithMode(w.opts.swapMode), @@ -268,11 +283,14 @@ func (w *WakuNode) Start() error { w.store = w.storeFactory(w) if w.opts.enableStore { - w.startStore() + err := w.startStore() + if err != nil { + return err + } } if w.opts.enableFilter { - filter, err := filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, w.log, w.opts.filterOpts...) + filter, err := filter.NewWakuFilter(w.ctx, w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...) if err != nil { return err } @@ -302,9 +320,11 @@ func (w *WakuNode) Start() error { w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.discoveryV5, w.opts.discV5Opts...)) } - err = w.mountRelay(w.opts.minRelayPeersToPublish, w.opts.wOpts...) - if err != nil { - return err + if w.opts.enableRelay { + err = w.mountRelay(w.opts.minRelayPeersToPublish, w.opts.wOpts...) + if err != nil { + return err + } } if w.opts.enableRLN { @@ -360,11 +380,19 @@ func (w *WakuNode) Stop() { w.discoveryV5.Stop() } - w.relay.Stop() + if w.opts.enableRelay { + w.relay.Stop() + } + w.lightPush.Stop() w.store.Stop() _ = w.stopRlnRelay() + err := w.timesource.Stop() + if err != nil { + w.log.Error("stopping timesource", zap.Error(err)) + } + w.host.Close() w.wg.Wait() @@ -395,6 +423,12 @@ func (w *WakuNode) ENR() *enode.Node { return w.localNode.Node() } +// Timesource returns the timesource used by this node to obtain the current wall time +// Depending on the configuration it will be the local time or a ntp syncd time +func (w *WakuNode) Timesource() timesource.Timesource { + return w.timesource +} + // Relay is used to access any operation related to Waku Relay protocol func (w *WakuNode) Relay() *relay.WakuRelay { return w.relay @@ -461,7 +495,7 @@ func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error { func (w *WakuNode) mountRelay(minRelayPeersToPublish int, opts ...pubsub.Option) error { var err error - w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, minRelayPeersToPublish, w.log, opts...) + w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, minRelayPeersToPublish, w.timesource, w.log, opts...) if err != nil { return err } @@ -499,8 +533,12 @@ func (w *WakuNode) mountPeerExchange() error { return w.peerExchange.Start() } -func (w *WakuNode) startStore() { - w.store.Start(w.ctx) +func (w *WakuNode) startStore() error { + err := w.store.Start(w.ctx) + if err != nil { + w.log.Error("starting store", zap.Error(err)) + return err + } if len(w.opts.resumeNodes) != 0 { // TODO: extract this to a function and run it when you go offline @@ -527,6 +565,7 @@ func (w *WakuNode) startStore() { } }() } + return nil } func (w *WakuNode) addPeer(info *peer.AddrInfo, protocols ...string) error { diff --git a/waku/v2/node/wakunode2_rln.go b/waku/v2/node/wakunode2_rln.go index bc90bb4e..a1a35af4 100644 --- a/waku/v2/node/wakunode2_rln.go +++ b/waku/v2/node/wakunode2_rln.go @@ -47,7 +47,7 @@ func (w *WakuNode) mountRlnRelay() error { } // mount rlnrelay in off-chain mode with a static group of users - rlnRelay, err := rln.RlnRelayStatic(w.ctx, w.relay, groupKeys, memKeyPair, memIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.log) + rlnRelay, err := rln.RlnRelayStatic(w.ctx, w.relay, groupKeys, memKeyPair, memIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.timesource, w.log) if err != nil { return err } @@ -81,7 +81,7 @@ func (w *WakuNode) mountRlnRelay() error { // mount the rln relay protocol in the on-chain/dynamic mode var err error - w.rlnRelay, err = rln.RlnRelayDynamic(context.Background(), w.relay, w.opts.rlnETHClientAddress, w.opts.rlnETHPrivateKey, w.opts.rlnMembershipContractAddress, memKeyPair, w.opts.rlnRelayMemIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.opts.rlnRegistrationHandler, w.log) + w.rlnRelay, err = rln.RlnRelayDynamic(context.Background(), w.relay, w.opts.rlnETHClientAddress, w.opts.rlnETHPrivateKey, w.opts.rlnMembershipContractAddress, memKeyPair, w.opts.rlnRelayMemIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.opts.rlnRegistrationHandler, w.timesource, w.log) if err != nil { return err } diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 2b8fa76c..99d1d8aa 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -26,6 +26,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" + "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -45,6 +46,9 @@ type WakuNodeParameters struct { privKey *ecdsa.PrivateKey libP2POpts []libp2p.Option + enableNTP bool + ntpURLs []string + enableWS bool wsPort int enableWSS bool @@ -106,7 +110,6 @@ type WakuNodeOption func(*WakuNodeParameters) error // Default options used in the libp2p node var DefaultWakuNodeOptions = []WakuNodeOption{ WithLogger(utils.Logger()), - WithWakuRelay(), } // MultiAddresses return the list of multiaddresses configured in the node @@ -234,6 +237,21 @@ func WithPrivateKey(privKey *ecdsa.PrivateKey) WakuNodeOption { } } +// WithNTP is used to use ntp for any operation that requires obtaining time +// A list of ntp servers can be passed but if none is specified, some defaults +// will be used +func WithNTP(ntpURLs ...string) WakuNodeOption { + return func(params *WakuNodeParameters) error { + if len(ntpURLs) == 0 { + ntpURLs = timesource.DefaultServers + } + + params.enableNTP = true + params.ntpURLs = ntpURLs + return nil + } +} + // GetPrivKey returns the private key used in the node func (w *WakuNodeParameters) GetPrivKey() *crypto.PrivKey { privKey := crypto.PrivKey(utils.EcdsaPrivKeyToSecp256k1PrivKey(w.privKey)) diff --git a/waku/v2/node/wakuoptions_test.go b/waku/v2/node/wakuoptions_test.go index f2c15cd4..18910784 100644 --- a/waku/v2/node/wakuoptions_test.go +++ b/waku/v2/node/wakuoptions_test.go @@ -30,7 +30,7 @@ func TestWakuOptions(t *testing.T) { advertiseAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0") storeFactory := func(w *WakuNode) store.Store { - return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.log) + return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.timesource, w.log) } options := []WakuNodeOption{ diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 753093f5..e67ff246 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -15,6 +15,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -25,7 +26,7 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger()) + relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, timesource.NewDefaultClock(), utils.Logger()) require.NoError(t, err) sub, err := relay.SubscribeToTopic(context.Background(), topic) diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index f88d97b7..08a2472d 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -22,7 +22,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/metrics" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/utils" + "github.com/waku-org/go-waku/waku/v2/timesource" ) const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0") @@ -30,8 +30,9 @@ const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0") var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic().String() type WakuRelay struct { - host host.Host - pubsub *pubsub.PubSub + host host.Host + pubsub *pubsub.PubSub + timesource timesource.Timesource log *zap.Logger @@ -55,9 +56,10 @@ func msgIdFn(pmsg *pubsub_pb.Message) string { } // NewWakuRelay returns a new instance of a WakuRelay struct -func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, log *zap.Logger, opts ...pubsub.Option) (*WakuRelay, error) { +func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, timesource timesource.Timesource, log *zap.Logger, opts ...pubsub.Option) (*WakuRelay, error) { w := new(WakuRelay) w.host = h + w.timesource = timesource w.wakuRelayTopics = make(map[string]*pubsub.Topic) w.relaySubs = make(map[string]*pubsub.Subscription) w.subscriptions = make(map[string][]*Subscription) @@ -343,7 +345,7 @@ func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub * return } - envelope := waku_proto.NewEnvelope(wakuMessage, utils.GetUnixEpoch(), string(t)) + envelope := waku_proto.NewEnvelope(wakuMessage, w.timesource.Now().UnixNano(), string(t)) w.log.Info("waku.relay received", logging.HexString("hash", envelope.Hash())) diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index 6b7e285d..82498d16 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -20,7 +21,7 @@ func TestWakuRelay(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := NewWakuRelay(context.Background(), host, nil, 0, utils.Logger()) + relay, err := NewWakuRelay(context.Background(), host, nil, 0, timesource.NewDefaultClock(), utils.Logger()) defer relay.Stop() require.NoError(t, err) diff --git a/waku/v2/protocol/rln/rln_relay_builder.go b/waku/v2/protocol/rln/rln_relay_builder.go index 9e63d551..615e1571 100644 --- a/waku/v2/protocol/rln/rln_relay_builder.go +++ b/waku/v2/protocol/rln/rln_relay_builder.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/timesource" r "github.com/waku-org/go-zerokit-rln/rln" "go.uber.org/zap" ) @@ -20,6 +21,7 @@ func RlnRelayStatic( pubsubTopic string, contentTopic string, spamHandler SpamHandler, + timesource timesource.Timesource, log *zap.Logger, ) (*WakuRLNRelay, error) { log = log.Named("rln-static") @@ -45,6 +47,7 @@ func RlnRelayStatic( pubsubTopic: pubsubTopic, contentTopic: contentTopic, log: log, + timesource: timesource, nullifierLog: make(map[r.Epoch][]r.ProofMetadata), } @@ -87,6 +90,7 @@ func RlnRelayDynamic( contentTopic string, spamHandler SpamHandler, registrationHandler RegistrationHandler, + timesource timesource.Timesource, log *zap.Logger, ) (*WakuRLNRelay, error) { log = log.Named("rln-dynamic") @@ -109,6 +113,7 @@ func RlnRelayDynamic( pubsubTopic: pubsubTopic, contentTopic: contentTopic, log: log, + timesource: timesource, nullifierLog: make(map[r.Epoch][]r.ProofMetadata), registrationHandler: registrationHandler, } diff --git a/waku/v2/protocol/rln/rln_relay_test.go b/waku/v2/protocol/rln/rln_relay_test.go index 702f9b56..3b19912d 100644 --- a/waku/v2/protocol/rln/rln_relay_test.go +++ b/waku/v2/protocol/rln/rln_relay_test.go @@ -10,6 +10,7 @@ import ( "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" r "github.com/waku-org/go-zerokit-rln/rln" ) @@ -32,7 +33,7 @@ func (s *WakuRLNRelaySuite) TestOffchainMode() { host, err := tests.MakeHost(context.Background(), port, rand.Reader) s.Require().NoError(err) - relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, utils.Logger()) + relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, timesource.NewDefaultClock(), utils.Logger()) defer relay.Stop() s.Require().NoError(err) @@ -49,7 +50,7 @@ func (s *WakuRLNRelaySuite) TestOffchainMode() { // index also represents the index of the leaf in the Merkle tree that contains node's commitment key index := r.MembershipIndex(5) - wakuRLNRelay, err := RlnRelayStatic(context.TODO(), relay, groupIDCommitments, groupKeyPairs[index], index, RLNRELAY_PUBSUB_TOPIC, RLNRELAY_CONTENT_TOPIC, nil, utils.Logger()) + wakuRLNRelay, err := RlnRelayStatic(context.TODO(), relay, groupIDCommitments, groupKeyPairs[index], index, RLNRELAY_PUBSUB_TOPIC, RLNRELAY_CONTENT_TOPIC, nil, timesource.NewDefaultClock(), utils.Logger()) s.Require().NoError(err) // get the root of Merkle tree which is constructed inside the mountRlnRelay proc diff --git a/waku/v2/protocol/rln/waku_rln_relay.go b/waku/v2/protocol/rln/waku_rln_relay.go index 6017c868..b6ac449f 100644 --- a/waku/v2/protocol/rln/waku_rln_relay.go +++ b/waku/v2/protocol/rln/waku_rln_relay.go @@ -17,6 +17,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" r "github.com/waku-org/go-zerokit-rln/rln" "go.uber.org/zap" @@ -33,7 +34,8 @@ type RegistrationHandler = func(tx *types.Transaction) const AcceptableRootWindowSize = 5 type WakuRLNRelay struct { - ctx context.Context + ctx context.Context + timesource timesource.Timesource membershipKeyPair *r.MembershipKeyPair @@ -206,7 +208,7 @@ func (rln *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime *time epoch = r.CalcEpoch(*optionalTime) } else { // get current rln epoch - epoch = r.GetCurrentEpoch() + epoch = r.CalcEpoch(rln.timesource.Now()) } msgProof := ToRateLimitProof(msg) diff --git a/waku/v2/protocol/store/waku_resume_test.go b/waku/v2/protocol/store/waku_resume_test.go index a9d6b226..5829aefd 100644 --- a/waku/v2/protocol/store/waku_resume_test.go +++ b/waku/v2/protocol/store/waku_resume_test.go @@ -11,6 +11,7 @@ import ( "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -21,7 +22,7 @@ func TestFindLastSeenMessage(t *testing.T) { msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), utils.GetUnixEpoch(), "test") msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), utils.GetUnixEpoch(), "test") - s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger()) + s := NewWakuStore(nil, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(msg1) _ = s.storeMessage(msg3) _ = s.storeMessage(msg5) @@ -41,8 +42,10 @@ func TestResume(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, MemoryDB(t), utils.Logger()) - s1.Start(ctx) + s1 := NewWakuStore(host1, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + err = s1.Start(ctx) + require.NoError(t, err) + defer s1.Stop() for i := 0; i < 10; i++ { @@ -59,8 +62,9 @@ func TestResume(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, MemoryDB(t), utils.Logger()) - s2.Start(ctx) + s2 := NewWakuStore(host2, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + err = s2.Start(ctx) + require.NoError(t, err) defer s2.Stop() host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) @@ -95,8 +99,10 @@ func TestResumeWithListOfPeers(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, MemoryDB(t), utils.Logger()) - s1.Start(ctx) + s1 := NewWakuStore(host1, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + err = s1.Start(ctx) + require.NoError(t, err) + defer s1.Stop() msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: 0} @@ -106,8 +112,9 @@ func TestResumeWithListOfPeers(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, MemoryDB(t), utils.Logger()) - s2.Start(ctx) + s2 := NewWakuStore(host2, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + err = s2.Start(ctx) + require.NoError(t, err) defer s2.Stop() host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) @@ -131,8 +138,10 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, MemoryDB(t), utils.Logger()) - s1.Start(ctx) + s1 := NewWakuStore(host1, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + err = s1.Start(ctx) + require.NoError(t, err) + defer s1.Stop() msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: 0} @@ -142,8 +151,10 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, MemoryDB(t), utils.Logger()) - s2.Start(ctx) + s2 := NewWakuStore(host2, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + err = s2.Start(ctx) + require.NoError(t, err) + defer s2.Stop() host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) diff --git a/waku/v2/protocol/store/waku_store_common.go b/waku/v2/protocol/store/waku_store_common.go index 579b8a87..36b04309 100644 --- a/waku/v2/protocol/store/waku_store_common.go +++ b/waku/v2/protocol/store/waku_store_common.go @@ -9,6 +9,7 @@ import ( libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/swap" + "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" ) @@ -46,9 +47,10 @@ var ( ) type WakuStore struct { - ctx context.Context - MsgC chan *protocol.Envelope - wg *sync.WaitGroup + ctx context.Context + timesource timesource.Timesource + MsgC chan *protocol.Envelope + wg *sync.WaitGroup log *zap.Logger @@ -61,7 +63,7 @@ type WakuStore struct { } // NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages -func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, log *zap.Logger) *WakuStore { +func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, timesource timesource.Timesource, log *zap.Logger) *WakuStore { wakuStore := new(WakuStore) wakuStore.msgProvider = p wakuStore.h = host @@ -69,6 +71,7 @@ func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, log *z wakuStore.wg = &sync.WaitGroup{} wakuStore.log = log.Named("store") wakuStore.quit = make(chan struct{}) + wakuStore.timesource = timesource return wakuStore } diff --git a/waku/v2/protocol/store/waku_store_persistence_test.go b/waku/v2/protocol/store/waku_store_persistence_test.go index bbfcdc86..2d66312e 100644 --- a/waku/v2/protocol/store/waku_store_persistence_test.go +++ b/waku/v2/protocol/store/waku_store_persistence_test.go @@ -6,13 +6,14 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" ) func TestStorePersistence(t *testing.T) { db := MemoryDB(t) - s1 := NewWakuStore(nil, nil, db, utils.Logger()) + s1 := NewWakuStore(nil, nil, db, timesource.NewDefaultClock(), utils.Logger()) defaultPubSubTopic := "test" defaultContentTopic := "1" diff --git a/waku/v2/protocol/store/waku_store_protocol.go b/waku/v2/protocol/store/waku_store_protocol.go index 78e764f7..0b15fabf 100644 --- a/waku/v2/protocol/store/waku_store_protocol.go +++ b/waku/v2/protocol/store/waku_store_protocol.go @@ -17,7 +17,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/metrics" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/utils" + "github.com/waku-org/go-waku/waku/v2/timesource" ) // MaxTimeVariance is the maximum duration in the future allowed for a message timestamp @@ -78,12 +78,13 @@ type MessageProvider interface { Query(query *pb.HistoryQuery) (*pb.Index, []persistence.StoredMessage, error) Put(env *protocol.Envelope) error MostRecentTimestamp() (int64, error) + Start(timesource timesource.Timesource) error Stop() Count() (int, error) } type Store interface { - Start(ctx context.Context) + Start(ctx context.Context) error Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*pb.WakuMessage, error) Next(ctx context.Context, r *Result) (*Result, error) @@ -98,14 +99,20 @@ func (store *WakuStore) SetMessageProvider(p MessageProvider) { } // Start initializes the WakuStore by enabling the protocol and fetching records from a message provider -func (store *WakuStore) Start(ctx context.Context) { +func (store *WakuStore) Start(ctx context.Context) error { if store.started { - return + return nil } if store.msgProvider == nil { store.log.Info("Store protocol started (no message provider)") - return + return nil + } + + err := store.msgProvider.Start(store.timesource) + if err != nil { + store.log.Error("Error starting message provider", zap.Error(err)) + return nil } store.started = true @@ -119,6 +126,8 @@ func (store *WakuStore) Start(ctx context.Context) { go store.updateMetrics(ctx) store.log.Info("Store protocol started") + + return nil } func (store *WakuStore) storeMessage(env *protocol.Envelope) error { @@ -226,6 +235,7 @@ func (store *WakuStore) Stop() { } if store.msgProvider != nil { + store.msgProvider.Stop() store.quit <- struct{}{} } @@ -298,14 +308,13 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList return 0, errors.New("can't resume: store has not started") } - currentTime := utils.GetUnixEpoch() lastSeenTime, err := store.findLastSeen() if err != nil { return 0, err } var offset int64 = int64(20 * time.Nanosecond) - currentTime = currentTime + offset + currentTime := store.timesource.Now().UnixNano() + offset lastSeenTime = max(lastSeenTime-offset, 0) rpc := &pb.HistoryQuery{ @@ -330,7 +339,7 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList msgCount := 0 for _, msg := range messages { - if err = store.storeMessage(protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic)); err == nil { + if err = store.storeMessage(protocol.NewEnvelope(msg, store.timesource.Now().UnixNano(), pubsubTopic)); err == nil { msgCount++ } } diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index ed297784..b31e44b2 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -10,6 +10,7 @@ import ( "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -20,8 +21,10 @@ func TestWakuStoreProtocolQuery(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, MemoryDB(t), utils.Logger()) - s1.Start(ctx) + s1 := NewWakuStore(host1, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + err = s1.Start(ctx) + require.NoError(t, err) + defer s1.Stop() topic1 := "1" @@ -39,8 +42,9 @@ func TestWakuStoreProtocolQuery(t *testing.T) { // Simulate a message has been received via relay protocol s1.MsgC <- protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1) - s2 := NewWakuStore(host2, nil, MemoryDB(t), utils.Logger()) - s2.Start(ctx) + s2 := NewWakuStore(host2, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + err = s2.Start(ctx) + require.NoError(t, err) defer s2.Stop() host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) @@ -67,10 +71,9 @@ func TestWakuStoreProtocolNext(t *testing.T) { require.NoError(t, err) db := MemoryDB(t) - - s1 := NewWakuStore(host1, nil, db, utils.Logger()) - s1.Start(ctx) - defer s1.Stop() + s1 := NewWakuStore(host1, nil, db, timesource.NewDefaultClock(), utils.Logger()) + err = s1.Start(ctx) + require.NoError(t, err) topic1 := "1" pubsubTopic1 := "topic1" @@ -94,8 +97,9 @@ func TestWakuStoreProtocolNext(t *testing.T) { err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta4)) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, db, utils.Logger()) - s2.Start(ctx) + s2 := NewWakuStore(host2, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + err = s2.Start(ctx) + require.NoError(t, err) defer s2.Stop() q := Query{ @@ -133,10 +137,9 @@ func TestWakuStoreProtocolFind(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - db := MemoryDB(t) - - s1 := NewWakuStore(host1, nil, db, utils.Logger()) - s1.Start(ctx) + s1 := NewWakuStore(host1, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + err = s1.Start(ctx) + require.NoError(t, err) defer s1.Stop() topic1 := "1" @@ -169,8 +172,9 @@ func TestWakuStoreProtocolFind(t *testing.T) { err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta4)) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, db, utils.Logger()) - s2.Start(ctx) + s2 := NewWakuStore(host2, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + err = s2.Start(ctx) + require.NoError(t, err) defer s2.Stop() q := Query{ diff --git a/waku/v2/protocol/store/waku_store_query_test.go b/waku/v2/protocol/store/waku_store_query_test.go index a6812c3f..690404c3 100644 --- a/waku/v2/protocol/store/waku_store_query_test.go +++ b/waku/v2/protocol/store/waku_store_query_test.go @@ -7,6 +7,7 @@ import ( "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -17,7 +18,7 @@ func TestStoreQuery(t *testing.T) { msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger()) + s := NewWakuStore(nil, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), defaultPubSubTopic)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), defaultPubSubTopic)) @@ -43,7 +44,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger()) + s := NewWakuStore(nil, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), defaultPubSubTopic)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), defaultPubSubTopic)) @@ -77,7 +78,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger()) + s := NewWakuStore(nil, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic2)) @@ -109,7 +110,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger()) + s := NewWakuStore(nil, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic2)) @@ -131,7 +132,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger()) + s := NewWakuStore(nil, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1)) @@ -150,7 +151,7 @@ func TestStoreQueryForwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger()) + s := NewWakuStore(nil, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) for i := 0; i < 10; i++ { msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) msg.Payload = []byte{byte(i)} @@ -174,7 +175,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger()) + s := NewWakuStore(nil, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) for i := 0; i < 10; i++ { msg := &pb.WakuMessage{ Payload: []byte{byte(i)}, @@ -200,7 +201,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { } func TestTemporalHistoryQueries(t *testing.T) { - s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger()) + s := NewWakuStore(nil, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) var messages []*pb.WakuMessage for i := 0; i < 10; i++ { diff --git a/waku/v2/rpc/admin_test.go b/waku/v2/rpc/admin_test.go index d1ad688d..6ff45b74 100644 --- a/waku/v2/rpc/admin_test.go +++ b/waku/v2/rpc/admin_test.go @@ -15,6 +15,7 @@ import ( "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -33,7 +34,7 @@ func TestV1Peers(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, utils.Logger()) + relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, timesource.NewDefaultClock(), utils.Logger()) require.NoError(t, err) defer relay.Stop() diff --git a/waku/v2/timesource/ntp.go b/waku/v2/timesource/ntp.go new file mode 100644 index 00000000..b019d8cf --- /dev/null +++ b/waku/v2/timesource/ntp.go @@ -0,0 +1,203 @@ +package timesource + +import ( + "bytes" + "errors" + "sort" + "sync" + "time" + + "github.com/beevik/ntp" + "go.uber.org/zap" +) + +const ( + // DefaultMaxAllowedFailures defines how many failures will be tolerated. + DefaultMaxAllowedFailures = 1 + + // FastNTPSyncPeriod period between ntp synchronizations before the first + // successful connection. + FastNTPSyncPeriod = 2 * time.Minute + + // SlowNTPSyncPeriod period between ntp synchronizations after the first + // successful connection. + SlowNTPSyncPeriod = 1 * time.Hour + + // DefaultRPCTimeout defines write deadline for single ntp server request. + DefaultRPCTimeout = 2 * time.Second +) + +// DefaultServers will be resolved to the closest available, +// and with high probability resolved to the different IPs +var DefaultServers = []string{ + "0.pool.ntp.org", + "1.pool.ntp.org", + "2.pool.ntp.org", + "3.pool.ntp.org", +} +var errUpdateOffset = errors.New("failed to compute offset") + +type ntpQuery func(string, ntp.QueryOptions) (*ntp.Response, error) + +type queryResponse struct { + Offset time.Duration + Error error +} + +type multiRPCError []error + +func (e multiRPCError) Error() string { + var b bytes.Buffer + b.WriteString("RPC failed: ") + more := false + for _, err := range e { + if more { + b.WriteString("; ") + } + b.WriteString(err.Error()) + more = true + } + b.WriteString(".") + return b.String() +} + +func computeOffset(timeQuery ntpQuery, servers []string, allowedFailures int) (time.Duration, error) { + if len(servers) == 0 { + return 0, nil + } + responses := make(chan queryResponse, len(servers)) + for _, server := range servers { + go func(server string) { + response, err := timeQuery(server, ntp.QueryOptions{ + Timeout: DefaultRPCTimeout, + }) + if err == nil { + err = response.Validate() + } + if err != nil { + responses <- queryResponse{Error: err} + return + } + responses <- queryResponse{Offset: response.ClockOffset} + }(server) + } + var ( + rpcErrors multiRPCError + offsets []time.Duration + collected int + ) + for response := range responses { + if response.Error != nil { + rpcErrors = append(rpcErrors, response.Error) + } else { + offsets = append(offsets, response.Offset) + } + collected++ + if collected == len(servers) { + break + } + } + if lth := len(rpcErrors); lth > allowedFailures { + return 0, rpcErrors + } else if lth == len(servers) { + return 0, rpcErrors + } + sort.SliceStable(offsets, func(i, j int) bool { + return offsets[i] > offsets[j] + }) + mid := len(offsets) / 2 + if len(offsets)%2 == 0 { + return (offsets[mid-1] + offsets[mid]) / 2, nil + } + return offsets[mid], nil +} + +func NewNTPTimesource(ntpServers []string, log *zap.Logger) *NTPTimeSource { + return &NTPTimeSource{ + servers: ntpServers, + allowedFailures: DefaultMaxAllowedFailures, + fastNTPSyncPeriod: FastNTPSyncPeriod, + slowNTPSyncPeriod: SlowNTPSyncPeriod, + timeQuery: ntp.QueryWithOptions, + log: log.Named("timesource"), + } +} + +// NTPTimeSource provides source of time that tries to be resistant to time skews. +// It does so by periodically querying time offset from ntp servers. +type NTPTimeSource struct { + servers []string + allowedFailures int + fastNTPSyncPeriod time.Duration + slowNTPSyncPeriod time.Duration + timeQuery ntpQuery // for ease of testing + log *zap.Logger + + quit chan struct{} + wg sync.WaitGroup + + mu sync.RWMutex + latestOffset time.Duration +} + +// Now returns time adjusted by latest known offset +func (s *NTPTimeSource) Now() time.Time { + s.mu.RLock() + defer s.mu.RUnlock() + return time.Now().Add(s.latestOffset) +} + +func (s *NTPTimeSource) updateOffset() error { + offset, err := computeOffset(s.timeQuery, s.servers, s.allowedFailures) + if err != nil { + s.log.Error("failed to compute offset", zap.Error(err)) + return errUpdateOffset + } + s.log.Info("Difference with ntp servers", zap.Duration("offset", offset)) + s.mu.Lock() + s.latestOffset = offset + s.mu.Unlock() + return nil +} + +// runPeriodically runs periodically the given function based on NTPTimeSource +// synchronization limits (fastNTPSyncPeriod / slowNTPSyncPeriod) +func (s *NTPTimeSource) runPeriodically(fn func() error) error { + var period time.Duration + s.quit = make(chan struct{}) + // we try to do it synchronously so that user can have reliable messages right away + s.wg.Add(1) + go func() { + for { + select { + case <-time.After(period): + if err := fn(); err == nil { + period = s.slowNTPSyncPeriod + } else if period != s.slowNTPSyncPeriod { + period = s.fastNTPSyncPeriod + } + + case <-s.quit: + s.wg.Done() + return + } + } + }() + + return nil +} + +// Start runs a goroutine that updates local offset every updatePeriod. +func (s *NTPTimeSource) Start() error { + return s.runPeriodically(s.updateOffset) +} + +// Stop goroutine that updates time source. +func (s *NTPTimeSource) Stop() error { + if s.quit == nil { + return nil + } + close(s.quit) + s.wg.Wait() + return nil +} diff --git a/waku/v2/timesource/ntp_test.go b/waku/v2/timesource/ntp_test.go new file mode 100644 index 00000000..68a990f6 --- /dev/null +++ b/waku/v2/timesource/ntp_test.go @@ -0,0 +1,249 @@ +package timesource + +import ( + "errors" + "sync" + "testing" + "time" + + "github.com/beevik/ntp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +const ( + // clockCompareDelta declares time required between multiple calls to time.Now + clockCompareDelta = 100 * time.Microsecond +) + +// we don't user real servers for tests, but logic depends on +// actual number of involved NTP servers. +var mockedServers = []string{"ntp1", "ntp2", "ntp3", "ntp4"} + +type testCase struct { + description string + servers []string + allowedFailures int + responses []queryResponse + expected time.Duration + expectError bool + + // actual attempts are mutable + mu sync.Mutex + actualAttempts int +} + +func (tc *testCase) query(string, ntp.QueryOptions) (*ntp.Response, error) { + tc.mu.Lock() + defer func() { + tc.actualAttempts++ + tc.mu.Unlock() + }() + response := &ntp.Response{ + ClockOffset: tc.responses[tc.actualAttempts].Offset, + Stratum: 1, + } + return response, tc.responses[tc.actualAttempts].Error +} + +func newTestCases() []*testCase { + return []*testCase{ + { + description: "SameResponse", + servers: mockedServers, + responses: []queryResponse{ + {Offset: 10 * time.Second}, + {Offset: 10 * time.Second}, + {Offset: 10 * time.Second}, + {Offset: 10 * time.Second}, + }, + expected: 10 * time.Second, + }, + { + description: "Median", + servers: mockedServers, + responses: []queryResponse{ + {Offset: 10 * time.Second}, + {Offset: 20 * time.Second}, + {Offset: 20 * time.Second}, + {Offset: 30 * time.Second}, + }, + expected: 20 * time.Second, + }, + { + description: "EvenMedian", + servers: mockedServers[:2], + responses: []queryResponse{ + {Offset: 10 * time.Second}, + {Offset: 20 * time.Second}, + }, + expected: 15 * time.Second, + }, + { + description: "Error", + servers: mockedServers, + responses: []queryResponse{ + {Offset: 10 * time.Second}, + {Error: errors.New("test")}, + {Offset: 30 * time.Second}, + {Offset: 30 * time.Second}, + }, + expected: time.Duration(0), + expectError: true, + }, + { + description: "MultiError", + servers: mockedServers, + responses: []queryResponse{ + {Error: errors.New("test 1")}, + {Error: errors.New("test 2")}, + {Error: errors.New("test 3")}, + {Error: errors.New("test 3")}, + }, + expected: time.Duration(0), + expectError: true, + }, + { + description: "TolerableError", + servers: mockedServers, + allowedFailures: 1, + responses: []queryResponse{ + {Offset: 10 * time.Second}, + {Error: errors.New("test")}, + {Offset: 20 * time.Second}, + {Offset: 30 * time.Second}, + }, + expected: 20 * time.Second, + }, + { + description: "NonTolerableError", + servers: mockedServers, + allowedFailures: 1, + responses: []queryResponse{ + {Offset: 10 * time.Second}, + {Error: errors.New("test")}, + {Error: errors.New("test")}, + {Error: errors.New("test")}, + }, + expected: time.Duration(0), + expectError: true, + }, + { + description: "AllFailed", + servers: mockedServers, + allowedFailures: 4, + responses: []queryResponse{ + {Error: errors.New("test")}, + {Error: errors.New("test")}, + {Error: errors.New("test")}, + {Error: errors.New("test")}, + }, + expected: time.Duration(0), + expectError: true, + }, + { + description: "HalfTolerable", + servers: mockedServers, + allowedFailures: 2, + responses: []queryResponse{ + {Offset: 10 * time.Second}, + {Offset: 20 * time.Second}, + {Error: errors.New("test")}, + {Error: errors.New("test")}, + }, + expected: 15 * time.Second, + }, + } +} + +func TestComputeOffset(t *testing.T) { + for _, tc := range newTestCases() { + t.Run(tc.description, func(t *testing.T) { + offset, err := computeOffset(tc.query, tc.servers, tc.allowedFailures) + if tc.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + assert.Equal(t, tc.expected, offset) + }) + } +} + +func TestNTPTimeSource(t *testing.T) { + for _, tc := range newTestCases() { + t.Run(tc.description, func(t *testing.T) { + source := &NTPTimeSource{ + servers: tc.servers, + allowedFailures: tc.allowedFailures, + timeQuery: tc.query, + log: utils.Logger(), + } + assert.WithinDuration(t, time.Now(), source.Now(), clockCompareDelta) + err := source.updateOffset() + if tc.expectError { + assert.Equal(t, errUpdateOffset, err) + } else { + assert.NoError(t, err) + } + assert.WithinDuration(t, time.Now().Add(tc.expected), source.Now(), clockCompareDelta) + }) + } +} + +func TestRunningPeriodically(t *testing.T) { + var hits int + var mu sync.RWMutex + periods := make([]time.Duration, 0) + + tc := newTestCases()[0] + fastHits := 3 + slowHits := 1 + + t.Run(tc.description, func(t *testing.T) { + source := &NTPTimeSource{ + servers: tc.servers, + allowedFailures: tc.allowedFailures, + timeQuery: tc.query, + fastNTPSyncPeriod: time.Duration(fastHits*10) * time.Millisecond, + slowNTPSyncPeriod: time.Duration(slowHits*10) * time.Millisecond, + log: utils.Logger(), + } + lastCall := time.Now() + // we're simulating a calls to updateOffset, testing ntp calls happens + // on NTPTimeSource specified periods (fastNTPSyncPeriod & slowNTPSyncPeriod) + err := source.runPeriodically(func() error { + mu.Lock() + periods = append(periods, time.Since(lastCall)) + mu.Unlock() + hits++ + if hits < 3 { + return errUpdateOffset + } + if hits == 6 { + source.wg.Done() + } + return nil + }) + + source.wg.Wait() + require.NoError(t, err) + + mu.Lock() + require.Len(t, periods, 6) + defer mu.Unlock() + prev := 0 + for _, period := range periods[1:3] { + p := int(period.Seconds() * 100) + require.True(t, fastHits <= (p-prev)) + prev = p + } + + for _, period := range periods[3:] { + p := int(period.Seconds() * 100) + require.True(t, slowHits <= (p-prev)) + prev = p + } + }) +} diff --git a/waku/v2/timesource/timesource.go b/waku/v2/timesource/timesource.go new file mode 100644 index 00000000..25ce9cbc --- /dev/null +++ b/waku/v2/timesource/timesource.go @@ -0,0 +1,9 @@ +package timesource + +import "time" + +type Timesource interface { + Now() time.Time + Start() error + Stop() error +} diff --git a/waku/v2/timesource/wall.go b/waku/v2/timesource/wall.go new file mode 100644 index 00000000..939a21ff --- /dev/null +++ b/waku/v2/timesource/wall.go @@ -0,0 +1,24 @@ +package timesource + +import "time" + +type WallClockTimeSource struct { +} + +func NewDefaultClock() *WallClockTimeSource { + return &WallClockTimeSource{} +} + +func (t *WallClockTimeSource) Now() time.Time { + return time.Now() +} + +func (t *WallClockTimeSource) Start() error { + // Do nothing + return nil +} + +func (t *WallClockTimeSource) Stop() error { + // Do nothing + return nil +} diff --git a/utils/ntp.go b/waku/v2/utils/ntp.go similarity index 100% rename from utils/ntp.go rename to waku/v2/utils/ntp.go diff --git a/waku/v2/utils/time.go b/waku/v2/utils/time.go index 12d3cd62..fb830faa 100644 --- a/waku/v2/utils/time.go +++ b/waku/v2/utils/time.go @@ -1,14 +1,25 @@ package utils -import "time" +import ( + "time" +) // GetUnixEpochFrom converts a time into a unix timestamp with nanoseconds func GetUnixEpochFrom(now time.Time) int64 { return now.UnixNano() } -// GetUnixEpoch returns the current time in unix timestamp with the integer part -// representing seconds and the decimal part representing subseconds -func GetUnixEpoch() int64 { - return GetUnixEpochFrom(time.Now()) +type Timesource interface { + Now() time.Time +} + +// GetUnixEpoch returns the current time in unix timestamp with the integer part +// representing seconds and the decimal part representing subseconds. +// Optionally receives a timesource to obtain the time from +func GetUnixEpoch(timesource ...Timesource) int64 { + if len(timesource) != 0 { + return GetUnixEpochFrom(timesource[0].Now()) + } else { + return GetUnixEpochFrom(time.Now()) + } }