From 8593866862175eeab6f30765e5f0b813f3b1b4db Mon Sep 17 00:00:00 2001 From: Andrea Maria Piana Date: Fri, 9 Dec 2022 16:16:21 +0000 Subject: [PATCH] Restart discovery when it fails When discovery fails to be seeded with bootstrap/fallback nodes, it never recovers. This commit changes the behavior so that status-go retries fetching bootnodes, and restarts discovery when that happens. --- VERSION | 2 +- eth-node/bridge/geth/waku.go | 2 + eth-node/bridge/geth/wakuv2.go | 5 + eth-node/types/waku.go | 4 + go.mod | 2 +- go.sum | 4 +- protocol/messenger_contact_requests_test.go | 6 +- protocol/messenger_group_chat_test.go | 4 +- protocol/messenger_mailserver.go | 1 + protocol/transport/transport.go | 10 +- .../go-waku/waku/persistence/store.go | 42 +++--- .../go-waku/waku/v2/discv5/discover.go | 68 ++++------ .../go-waku/waku/v2/node/connectedness.go | 2 +- .../go-waku/waku/v2/node/keepalive.go | 3 +- .../go-waku/waku/v2/node/localnode.go | 2 +- .../go-waku/waku/v2/node/wakunode2.go | 29 ++-- .../go-waku/waku/v2/node/wakunode2_rln.go | 3 +- .../go-waku/waku/v2/node/wakuoptions.go | 5 +- .../peer_exchange/waku_peer_exchange.go | 125 ++++++++--------- .../waku/v2/protocol/relay/waku_relay.go | 6 +- .../go-waku/waku/v2/protocol/rln/web3.go | 2 +- .../v2/protocol/store/waku_store_client.go | 5 +- .../go-waku/waku/v2/timesource/ntp.go | 26 ++-- .../go-waku/waku/v2/timesource/timesource.go | 7 +- .../go-waku/waku/v2/timesource/wall.go | 7 +- vendor/modules.txt | 2 +- wakuv2/waku.go | 128 +++++++++++++++++- wakuv2/waku_test.go | 85 ++++++++++++ 28 files changed, 394 insertions(+), 193 deletions(-) create mode 100644 wakuv2/waku_test.go diff --git a/VERSION b/VERSION index a38b3bd31..90bdef2ea 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.117.0 +0.117.1 diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index 2158006d8..24c6ad5e5 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -6,6 +6,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/status-im/status-go/connection" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/waku" wakucommon "github.com/status-im/status-go/waku/common" @@ -256,6 +257,7 @@ func (w *gethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) { func (w *gethWakuWrapper) RequestStoreMessages(peerID []byte, r types.MessagesRequest) (*types.StoreRequestCursor, error) { return nil, errors.New("not implemented") } +func (w *gethWakuWrapper) ConnectionChanged(_ connection.State) {} type wakuFilterWrapper struct { filter *wakucommon.Filter diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index bdd5e33ad..ba04b6fe5 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -10,6 +10,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/ethereum/go-ethereum/common" + "github.com/status-im/status-go/connection" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/wakuv2" wakucommon "github.com/status-im/status-go/wakuv2/common" @@ -272,6 +273,10 @@ func (w *gethWakuV2Wrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSub return w.waku.SubscribeToConnStatusChanges(), nil } +func (w *gethWakuV2Wrapper) ConnectionChanged(state connection.State) { + w.waku.ConnectionChanged(state) +} + type wakuV2FilterWrapper struct { filter *wakucommon.Filter id string diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index aefeac717..e6a4bf6a9 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -8,6 +8,7 @@ import ( "github.com/pborman/uuid" "github.com/ethereum/go-ethereum/common" + "github.com/status-im/status-go/connection" ) type ConnStatus struct { @@ -149,4 +150,7 @@ type Waku interface { // MarkP2PMessageAsProcessed tells the waku layer that a P2P message has been processed MarkP2PMessageAsProcessed(common.Hash) + + // ConnectionChanged is called whenever the client knows its connection status has changed + ConnectionChanged(connection.State) } diff --git a/go.mod b/go.mod index 102cff774..93930c89a 100644 --- a/go.mod +++ b/go.mod @@ -80,7 +80,7 @@ require github.com/fogleman/gg v1.3.0 require ( github.com/gorilla/sessions v1.2.1 github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8 - github.com/waku-org/go-waku v0.2.3-test.0.20221209175307-685142e7b743 + github.com/waku-org/go-waku v0.2.3-test.0.20221212154545-7443daea4cd4 ) require ( diff --git a/go.sum b/go.sum index 2572eefb1..76b9387de 100644 --- a/go.sum +++ b/go.sum @@ -2065,8 +2065,8 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1 github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZFimdqfZb9cZwT1S3VJP9j3AE6bdNd9boXM= github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= -github.com/waku-org/go-waku v0.2.3-test.0.20221209175307-685142e7b743 h1:Q6bNLLCE7+OGrRlsmcrglKeURXmaagemIWTrvrJTgK4= -github.com/waku-org/go-waku v0.2.3-test.0.20221209175307-685142e7b743/go.mod h1:MzmxeUFKOSGqI+3ditwJVmiDXtWW7p4vZhmFeAcwKyI= +github.com/waku-org/go-waku v0.2.3-test.0.20221212154545-7443daea4cd4 h1:lzIDkSLbaAHzYlGgZB9BEWFEbVFZLmFAvzE2fiN61pg= +github.com/waku-org/go-waku v0.2.3-test.0.20221212154545-7443daea4cd4/go.mod h1:MzmxeUFKOSGqI+3ditwJVmiDXtWW7p4vZhmFeAcwKyI= github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg h1:2vVIBCtBih2w1K9ll8YnToTDZvbxcgbsClsPlJS/kkg= github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg/go.mod h1:GlyaVeEWNEBxVJrWC6jFTvb4LNb9d9qnjdS6EiWVUvk= github.com/waku-org/noise v1.0.2 h1:7WmlhpJ0eliBzwzKz6SoTqQznaEU2IuebHF3oCekqqs= diff --git a/protocol/messenger_contact_requests_test.go b/protocol/messenger_contact_requests_test.go index c4a864b0e..97cb5247c 100644 --- a/protocol/messenger_contact_requests_test.go +++ b/protocol/messenger_contact_requests_test.go @@ -643,7 +643,11 @@ func (s *MessengerContactRequestSuite) TestAcceptLatestContactRequestForContact( resp, err = WaitOnMessengerResponse( theirMessenger, func(r *MessengerResponse) bool { - return len(r.Contacts) > 0 && len(r.Messages()) > 0 && len(r.ActivityCenterNotifications()) > 0 + contactRequests, _, err := theirMessenger.PendingContactRequests("", 10) + if err != nil { + return false + } + return len(contactRequests) == 1 }, "no messages", ) diff --git a/protocol/messenger_group_chat_test.go b/protocol/messenger_group_chat_test.go index 43064d4ca..2f80fcf59 100644 --- a/protocol/messenger_group_chat_test.go +++ b/protocol/messenger_group_chat_test.go @@ -82,7 +82,9 @@ func (s *MessengerGroupChatSuite) createEmptyGroupChat(creator *Messenger, name func (s *MessengerGroupChatSuite) verifyGroupChatCreated(member *Messenger, expectedChatActive bool) { response, err := WaitOnMessengerResponse( member, - func(r *MessengerResponse) bool { return len(r.Chats()) > 0 }, + func(r *MessengerResponse) bool { + return len(r.Chats()) == 1 && r.Chats()[0].Active == expectedChatActive + }, "chat invitation not received", ) s.Require().NoError(err) diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index 28ad1d6dd..e514553c7 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -800,6 +800,7 @@ func (m *Messenger) RemoveFilters(filters []*transport.Filter) error { } func (m *Messenger) ConnectionChanged(state connection.State) { + m.transport.ConnectionChanged(state) if !m.connectionState.Offline && state.Offline { m.sender.StopDatasync() } diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 7f6b2c60d..2eddfec3e 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -15,6 +15,7 @@ import ( "go.uber.org/zap" "github.com/ethereum/go-ethereum/common" + "github.com/status-im/status-go/connection" "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" ) @@ -79,8 +80,9 @@ type Transport struct { // NewTransport returns a new Transport. // TODO: leaving a chat should verify that for a given public key -// there are no other chats. It may happen that we leave a private chat -// but still have a public chat for a given public key. +// +// there are no other chats. It may happen that we leave a private chat +// but still have a public chat for a given public key. func NewTransport( waku types.Waku, privateKey *ecdsa.PrivateKey, @@ -654,3 +656,7 @@ func (t *Transport) MarkP2PMessageAsProcessed(hash common.Hash) { func (t *Transport) SubscribeToConnStatusChanges() (*types.ConnStatusSubscription, error) { return t.waku.SubscribeToConnStatusChanges() } + +func (t *Transport) ConnectionChanged(state connection.State) { + t.waku.ConnectionChanged(state) +} diff --git a/vendor/github.com/waku-org/go-waku/waku/persistence/store.go b/vendor/github.com/waku-org/go-waku/waku/persistence/store.go index dc922caea..5958af853 100644 --- a/vendor/github.com/waku-org/go-waku/waku/persistence/store.go +++ b/vendor/github.com/waku-org/go-waku/waku/persistence/store.go @@ -64,13 +64,28 @@ func WithDB(db *sql.DB) DBOption { } } +type ConnectionPoolOptions struct { + MaxOpenConnections int + MaxIdleConnections int + ConnectionMaxLifetime time.Duration + ConnectionMaxIdleTime time.Duration +} + // WithDriver is a DBOption that will open a *sql.DB connection -func WithDriver(driverName string, datasourceName string) DBOption { +func WithDriver(driverName string, datasourceName string, connectionPoolOptions ...ConnectionPoolOptions) DBOption { return func(d *DBStore) error { db, err := sql.Open(driverName, datasourceName) if err != nil { return err } + + if len(connectionPoolOptions) != 0 { + db.SetConnMaxIdleTime(connectionPoolOptions[0].ConnectionMaxIdleTime) + db.SetConnMaxLifetime(connectionPoolOptions[0].ConnectionMaxLifetime) + db.SetMaxIdleConns(connectionPoolOptions[0].MaxIdleConnections) + db.SetMaxOpenConns(connectionPoolOptions[0].MaxOpenConnections) + } + d.db = db return nil } @@ -119,31 +134,8 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) { } } - // Disable concurrent access as not supported by the driver - result.db.SetMaxOpenConns(1) - - var seq string - var name string - var file string // file will be empty if DB is :memory" - err := result.db.QueryRow("PRAGMA database_list").Scan(&seq, &name, &file) - if err != nil { - return nil, err - } - - // readers do not block writers and faster i/o operations - // https://www.sqlite.org/draft/wal.html - // must be set after db is encrypted - var mode string - err = result.db.QueryRow("PRAGMA journal_mode=WAL").Scan(&mode) - if err != nil { - return nil, err - } - if mode != WALMode && file != "" { - return nil, fmt.Errorf("unable to set journal_mode to WAL. actual mode %s", mode) - } - if result.enableMigrations { - err = migrations.Migrate(result.db) + err := migrations.Migrate(result.db) if err != nil { return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/discv5/discover.go b/vendor/github.com/waku-org/go-waku/waku/v2/discv5/discover.go index 0d7c790b1..a0b19b1ae 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/discv5/discover.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/discv5/discover.go @@ -8,9 +8,6 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/enr" - "github.com/ethereum/go-ethereum/p2p/nat" "github.com/libp2p/go-libp2p/core/discovery" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" @@ -18,6 +15,10 @@ import ( "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" + + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/ethereum/go-ethereum/p2p/nat" ) type DiscoveryV5 struct { @@ -26,23 +27,20 @@ type DiscoveryV5 struct { discovery.Discovery params *discV5Parameters - ctx context.Context host host.Host config discover.Config udpAddr *net.UDPAddr listener *discover.UDPv5 localnode *enode.LocalNode NAT nat.Interface - quit chan struct{} - started bool log *zap.Logger - wg *sync.WaitGroup + started bool + cancel context.CancelFunc + wg *sync.WaitGroup - peerCache peerCache - discoverCtx context.Context - discoverCancelFunc context.CancelFunc + peerCache peerCache } type peerCache struct { @@ -100,7 +98,7 @@ func DefaultOptions() []DiscoveryV5Option { const MaxPeersToDiscover = 600 -func NewDiscoveryV5(ctx context.Context, host host.Host, priv *ecdsa.PrivateKey, localnode *enode.LocalNode, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) { +func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.LocalNode, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) { params := new(discV5Parameters) optList := DefaultOptions() optList = append(optList, opts...) @@ -116,7 +114,6 @@ func NewDiscoveryV5(ctx context.Context, host host.Host, priv *ecdsa.PrivateKey, } return &DiscoveryV5{ - ctx: ctx, host: host, params: params, NAT: NAT, @@ -149,7 +146,7 @@ func (d *DiscoveryV5) Node() *enode.Node { return d.localnode.Node() } -func (d *DiscoveryV5) listen() error { +func (d *DiscoveryV5) listen(ctx context.Context) error { conn, err := net.ListenUDP("udp", d.udpAddr) if err != nil { return err @@ -160,7 +157,7 @@ func (d *DiscoveryV5) listen() error { d.wg.Add(1) go func() { defer d.wg.Done() - nat.Map(d.NAT, d.quit, "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery") + nat.Map(d.NAT, ctx.Done(), "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery") }() } @@ -182,45 +179,35 @@ func (d *DiscoveryV5) listen() error { return nil } -func (d *DiscoveryV5) Start() error { +func (d *DiscoveryV5) Start(ctx context.Context) error { d.Lock() defer d.Unlock() - if d.started { - return nil - } + d.wg.Wait() // Waiting for any go routines to stop + ctx, cancel := context.WithCancel(ctx) - d.wg.Wait() // Waiting for other go routines to stop - - d.quit = make(chan struct{}, 1) + d.cancel = cancel d.started = true - err := d.listen() + err := d.listen(ctx) if err != nil { return err } - // create cancellable - d.discoverCtx, d.discoverCancelFunc = context.WithCancel(d.ctx) - go d.runDiscoveryV5Loop() + go d.runDiscoveryV5Loop(ctx) return nil } func (d *DiscoveryV5) SetBootnodes(nodes []*enode.Node) error { - return d.listener.SetFallbackNodes(nodes) + return d.listener.SetFallbackNodes(nodes) } func (d *DiscoveryV5) Stop() { d.Lock() defer d.Unlock() - if !d.started { - return - } - - close(d.quit) - d.discoverCancelFunc() + d.cancel() d.listener.Close() d.listener = nil @@ -294,7 +281,7 @@ func (d *DiscoveryV5) Advertise(ctx context.Context, ns string, opts ...discover return 20 * time.Minute, nil } -func (d *DiscoveryV5) iterate(iterator enode.Iterator, limit int, doneCh chan struct{}) { +func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limit int) { defer d.wg.Done() for { @@ -302,7 +289,7 @@ func (d *DiscoveryV5) iterate(iterator enode.Iterator, limit int, doneCh chan st break } - if d.discoverCtx.Err() != nil { + if ctx.Err() != nil { break } @@ -333,8 +320,6 @@ func (d *DiscoveryV5) iterate(iterator enode.Iterator, limit int, doneCh chan st } d.peerCache.Unlock() } - - close(doneCh) } func (d *DiscoveryV5) removeExpiredPeers() int { @@ -353,21 +338,16 @@ func (d *DiscoveryV5) removeExpiredPeers() int { return newCacheSize } -func (d *DiscoveryV5) runDiscoveryV5Loop() { +func (d *DiscoveryV5) runDiscoveryV5Loop(ctx context.Context) { iterator := d.listener.RandomNodes() iterator = enode.Filter(iterator, evaluateNode) defer iterator.Close() - doneCh := make(chan struct{}) - d.wg.Add(1) - go d.iterate(iterator, MaxPeersToDiscover, doneCh) + go d.iterate(ctx, iterator, MaxPeersToDiscover) - select { - case <-d.discoverCtx.Done(): - case <-doneCh: - } + <-ctx.Done() d.log.Warn("Discv5 loop stopped") } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/connectedness.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/connectedness.go index cf1a5ea3b..79dbd9579 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/connectedness.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/connectedness.go @@ -96,7 +96,7 @@ func (w *WakuNode) connectednessListener() { for { select { - case <-w.quit: + case <-w.ctx.Done(): return case <-w.protocolEventSub.Out(): case <-w.identificationEventSub.Out(): diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go index 415944ae7..fd6cbff55 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go @@ -54,7 +54,8 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { } lastTimeExecuted = w.timesource.Now() - case <-w.quit: + case <-w.ctx.Done(): + w.log.Info("stopping ping protocol") return } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go index 4ae3683ce..8b0be2e7f 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go @@ -241,7 +241,7 @@ func (w *WakuNode) setupENR(addrs []ma.Multiaddr) error { if w.discoveryV5 != nil && w.discoveryV5.IsStarted() { w.log.Info("restarting discv5") w.discoveryV5.Stop() - err = w.discoveryV5.Start() + err = w.discoveryV5.Start(w.ctx) if err != nil { w.log.Error("could not restart discv5", zap.Error(err)) return err diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go index 46869ca05..9313be432 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go @@ -8,11 +8,12 @@ import ( "sync" "time" + "github.com/libp2p/go-libp2p" + "go.uber.org/zap" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/libp2p/go-libp2p" - "go.uber.org/zap" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/event" @@ -94,9 +95,8 @@ type WakuNode struct { keepAliveMutex sync.Mutex keepAliveFails map[peer.ID]int - ctx context.Context + ctx context.Context // TODO: remove this cancel context.CancelFunc - quit chan struct{} wg *sync.WaitGroup // Channel passed to WakuNode constructor @@ -170,7 +170,6 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { w.ctx = ctx w.opts = params w.log = params.logger.Named("node2") - w.quit = make(chan struct{}) w.wg = &sync.WaitGroup{} w.addrChan = make(chan ma.Multiaddr, 1024) w.keepAliveFails = make(map[peer.ID]int) @@ -235,7 +234,7 @@ func (w *WakuNode) checkForAddressChanges() { first <- struct{}{} for { select { - case <-w.quit: + case <-w.ctx.Done(): close(w.addrChan) return case <-first: @@ -268,7 +267,7 @@ func (w *WakuNode) checkForAddressChanges() { // Start initializes all the protocols that were setup in the WakuNode func (w *WakuNode) Start() error { if w.opts.enableNTP { - err := w.timesource.Start() + err := w.timesource.Start(w.ctx) if err != nil { return err } @@ -357,9 +356,7 @@ func (w *WakuNode) Start() error { // Stop stops the WakuNode and closess all connections to the host func (w *WakuNode) Stop() { - defer w.cancel() - - close(w.quit) + w.cancel() w.bcaster.Close() @@ -523,14 +520,14 @@ func (w *WakuNode) mountDiscV5() error { } var err error - w.discoveryV5, err = discv5.NewDiscoveryV5(w.ctx, w.Host(), w.opts.privKey, w.localNode, w.log, discV5Options...) + w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.opts.privKey, w.localNode, w.log, discV5Options...) return err } func (w *WakuNode) mountPeerExchange() error { - w.peerExchange = peer_exchange.NewWakuPeerExchange(w.ctx, w.host, w.discoveryV5, w.log) - return w.peerExchange.Start() + w.peerExchange = peer_exchange.NewWakuPeerExchange(w.host, w.discoveryV5, w.log) + return w.peerExchange.Start(w.ctx) } func (w *WakuNode) startStore() error { @@ -673,6 +670,12 @@ func (w *WakuNode) PeerStats() PeerStats { return p } +// Set the bootnodes on discv5 +func (w *WakuNode) SetDiscV5Bootnodes(nodes []*enode.Node) error { + w.opts.discV5bootnodes = nodes + return w.discoveryV5.SetBootnodes(nodes) +} + // Peers return the list of peers, addresses, protocols supported and connection status func (w *WakuNode) Peers() ([]*Peer, error) { var peers []*Peer diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2_rln.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2_rln.go index a1a35af4c..86f792f54 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2_rln.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2_rln.go @@ -4,7 +4,6 @@ package node import ( - "context" "encoding/hex" "errors" @@ -81,7 +80,7 @@ func (w *WakuNode) mountRlnRelay() error { // mount the rln relay protocol in the on-chain/dynamic mode var err error - w.rlnRelay, err = rln.RlnRelayDynamic(context.Background(), w.relay, w.opts.rlnETHClientAddress, w.opts.rlnETHPrivateKey, w.opts.rlnMembershipContractAddress, memKeyPair, w.opts.rlnRelayMemIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.opts.rlnRegistrationHandler, w.timesource, w.log) + w.rlnRelay, err = rln.RlnRelayDynamic(w.ctx, w.relay, w.opts.rlnETHClientAddress, w.opts.rlnETHPrivateKey, w.opts.rlnMembershipContractAddress, memKeyPair, w.opts.rlnRelayMemIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.opts.rlnRegistrationHandler, w.timesource, w.log) if err != nil { return err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go index 99d1d8aaf..e64befcdc 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go @@ -317,8 +317,9 @@ func WithWakuFilter(fullNode bool, filterOpts ...filter.Option) WakuNodeOption { } // WithWakuStore enables the Waku V2 Store protocol and if the messages should -// be stored or not in a message provider -func WithWakuStore(shouldStoreMessages bool, resumeNodes []multiaddr.Multiaddr) WakuNodeOption { +// be stored or not in a message provider. If resumeNodes are specified, the +// store will attempt to resume message history using those nodes +func WithWakuStore(shouldStoreMessages bool, resumeNodes ...multiaddr.Multiaddr) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableStore = true params.storeMsgs = shouldStoreMessages diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/waku_peer_exchange.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/waku_peer_exchange.go index 06c845466..e5c3732d7 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/waku_peer_exchange.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/waku_peer_exchange.go @@ -46,24 +46,21 @@ type peerRecord struct { type WakuPeerExchange struct { h host.Host - ctx context.Context disc *discv5.DiscoveryV5 - log *zap.Logger - quit chan struct{} - wg sync.WaitGroup + log *zap.Logger + + cancel context.CancelFunc + wg sync.WaitGroup enrCache map[enode.ID]peerRecord // todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ enrCacheMutex sync.RWMutex rng *rand.Rand - - started bool } // NewWakuPeerExchange returns a new instance of WakuPeerExchange struct -func NewWakuPeerExchange(ctx context.Context, h host.Host, disc *discv5.DiscoveryV5, log *zap.Logger) *WakuPeerExchange { +func NewWakuPeerExchange(h host.Host, disc *discv5.DiscoveryV5, log *zap.Logger) *WakuPeerExchange { wakuPX := new(WakuPeerExchange) - wakuPX.ctx = ctx wakuPX.h = h wakuPX.disc = disc wakuPX.log = log.Named("wakupx") @@ -73,19 +70,21 @@ func NewWakuPeerExchange(ctx context.Context, h host.Host, disc *discv5.Discover } // Start inits the peer exchange protocol -func (wakuPX *WakuPeerExchange) Start() error { - wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest) +func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error { + wakuPX.wg.Wait() // Waiting for any go routines to stop + ctx, cancel := context.WithCancel(ctx) + wakuPX.cancel = cancel + + wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest(ctx)) wakuPX.log.Info("Peer exchange protocol started") - wakuPX.started = true - wakuPX.quit = make(chan struct{}, 1) wakuPX.wg.Add(1) - go wakuPX.runPeerExchangeDiscv5Loop() + go wakuPX.runPeerExchangeDiscv5Loop(ctx) return nil } -func (wakuPX *WakuPeerExchange) handleResponse(response *pb.PeerExchangeResponse) error { +func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse) error { var peers []peer.AddrInfo for _, p := range response.PeerInfos { enrRecord := &enr.Record{} @@ -118,7 +117,7 @@ func (wakuPX *WakuPeerExchange) handleResponse(response *pb.PeerExchangeResponse log.Info("connecting to newly discovered peers", zap.Int("count", len(peers))) for _, p := range peers { func(p peer.AddrInfo) { - ctx, cancel := context.WithTimeout(wakuPX.ctx, dialTimeout) + ctx, cancel := context.WithTimeout(ctx, dialTimeout) defer cancel() err := wakuPX.h.Connect(ctx, p) if err != nil { @@ -131,35 +130,37 @@ func (wakuPX *WakuPeerExchange) handleResponse(response *pb.PeerExchangeResponse return nil } -func (wakuPX *WakuPeerExchange) onRequest(s network.Stream) { - defer s.Close() - logger := wakuPX.log.With(logging.HostID("peer", s.Conn().RemotePeer())) - requestRPC := &pb.PeerExchangeRPC{} - reader := protoio.NewDelimitedReader(s, math.MaxInt32) - err := reader.ReadMsg(requestRPC) - if err != nil { - logger.Error("reading request", zap.Error(err)) - metrics.RecordPeerExchangeError(wakuPX.ctx, "decodeRpcFailure") - return - } - - if requestRPC.Query != nil { - logger.Info("request received") - err := wakuPX.respond(requestRPC.Query.NumPeers, s.Conn().RemotePeer()) +func (wakuPX *WakuPeerExchange) onRequest(ctx context.Context) func(s network.Stream) { + return func(s network.Stream) { + defer s.Close() + logger := wakuPX.log.With(logging.HostID("peer", s.Conn().RemotePeer())) + requestRPC := &pb.PeerExchangeRPC{} + reader := protoio.NewDelimitedReader(s, math.MaxInt32) + err := reader.ReadMsg(requestRPC) if err != nil { - logger.Error("responding", zap.Error(err)) - metrics.RecordPeerExchangeError(wakuPX.ctx, "pxFailure") + logger.Error("reading request", zap.Error(err)) + metrics.RecordPeerExchangeError(ctx, "decodeRpcFailure") return } - } - if requestRPC.Response != nil { - logger.Info("response received") - err := wakuPX.handleResponse(requestRPC.Response) - if err != nil { - logger.Error("handling response", zap.Error(err)) - metrics.RecordPeerExchangeError(wakuPX.ctx, "pxFailure") - return + if requestRPC.Query != nil { + logger.Info("request received") + err := wakuPX.respond(ctx, requestRPC.Query.NumPeers, s.Conn().RemotePeer()) + if err != nil { + logger.Error("responding", zap.Error(err)) + metrics.RecordPeerExchangeError(ctx, "pxFailure") + return + } + } + + if requestRPC.Response != nil { + logger.Info("response received") + err := wakuPX.handleResponse(ctx, requestRPC.Response) + if err != nil { + logger.Error("handling response", zap.Error(err)) + metrics.RecordPeerExchangeError(ctx, "pxFailure") + return + } } } } @@ -176,7 +177,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts } if params.selectedPeer == "" { - metrics.RecordPeerExchangeError(wakuPX.ctx, "dialError") + metrics.RecordPeerExchangeError(ctx, "dialError") return ErrNoPeersAvailable } @@ -186,35 +187,27 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts }, } - return wakuPX.sendPeerExchangeRPCToPeer(requestRPC, params.selectedPeer) -} - -// IsStarted returns if the peer exchange protocol has been mounted or not -func (wakuPX *WakuPeerExchange) IsStarted() bool { - return wakuPX.started + return wakuPX.sendPeerExchangeRPCToPeer(ctx, requestRPC, params.selectedPeer) } // Stop unmounts the peer exchange protocol func (wakuPX *WakuPeerExchange) Stop() { - if wakuPX.started { - wakuPX.h.RemoveStreamHandler(PeerExchangeID_v20alpha1) - wakuPX.started = false - close(wakuPX.quit) - wakuPX.wg.Wait() - } + wakuPX.cancel() + wakuPX.h.RemoveStreamHandler(PeerExchangeID_v20alpha1) + wakuPX.wg.Wait() } -func (wakuPX *WakuPeerExchange) sendPeerExchangeRPCToPeer(rpc *pb.PeerExchangeRPC, peerID peer.ID) error { +func (wakuPX *WakuPeerExchange) sendPeerExchangeRPCToPeer(ctx context.Context, rpc *pb.PeerExchangeRPC, peerID peer.ID) error { logger := wakuPX.log.With(logging.HostID("peer", peerID)) // We connect first so dns4 addresses are resolved (NewStream does not do it) - err := wakuPX.h.Connect(wakuPX.ctx, wakuPX.h.Peerstore().PeerInfo(peerID)) + err := wakuPX.h.Connect(ctx, wakuPX.h.Peerstore().PeerInfo(peerID)) if err != nil { logger.Error("connecting peer", zap.Error(err)) return err } - connOpt, err := wakuPX.h.NewStream(wakuPX.ctx, peerID, PeerExchangeID_v20alpha1) + connOpt, err := wakuPX.h.NewStream(ctx, peerID, PeerExchangeID_v20alpha1) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) return err @@ -231,7 +224,7 @@ func (wakuPX *WakuPeerExchange) sendPeerExchangeRPCToPeer(rpc *pb.PeerExchangeRP return nil } -func (wakuPX *WakuPeerExchange) respond(numPeers uint64, peerID peer.ID) error { +func (wakuPX *WakuPeerExchange) respond(ctx context.Context, numPeers uint64, peerID peer.ID) error { records, err := wakuPX.getENRsFromCache(numPeers) if err != nil { return err @@ -241,7 +234,7 @@ func (wakuPX *WakuPeerExchange) respond(numPeers uint64, peerID peer.ID) error { responseRPC.Response = new(pb.PeerExchangeResponse) responseRPC.Response.PeerInfos = records - return wakuPX.sendPeerExchangeRPCToPeer(responseRPC, peerID) + return wakuPX.sendPeerExchangeRPCToPeer(ctx, responseRPC, peerID) } func (wakuPX *WakuPeerExchange) getENRsFromCache(numPeers uint64) ([]*pb.PeerInfo, error) { @@ -304,12 +297,8 @@ func (wakuPX *WakuPeerExchange) cleanCache() { wakuPX.enrCache = r } -func (wakuPX *WakuPeerExchange) findPeers() { - if !wakuPX.disc.IsStarted() { - return - } - - ctx, cancel := context.WithTimeout(wakuPX.ctx, 2*time.Second) +func (wakuPX *WakuPeerExchange) findPeers(ctx context.Context) { + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() peerRecords, err := wakuPX.disc.FindNodes(ctx, "") if err != nil { @@ -332,7 +321,7 @@ func (wakuPX *WakuPeerExchange) findPeers() { wakuPX.cleanCache() } -func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop() { +func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) { defer wakuPX.wg.Done() // Runs a discv5 loop adding new peers to the px peer cache @@ -349,15 +338,15 @@ func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop() { // This loop "competes" with the loop in wakunode2 // For the purpose of collecting px peers, 30 sec intervals should be enough - wakuPX.findPeers() + wakuPX.findPeers(ctx) for { select { - case <-wakuPX.quit: + case <-ctx.Done(): return case <-ticker.C: - wakuPX.findPeers() + wakuPX.findPeers(ctx) } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go index 08a2472dd..cf2a2db22 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/waku_relay.go @@ -247,7 +247,7 @@ func (w *WakuRelay) SubscribeToTopic(ctx context.Context, topic string) (*Subscr w.bcaster.Register(&topic, subscription.C) } - go w.subscribeToTopic(topic, subscription, sub) + go w.subscribeToTopic(ctx, topic, subscription, sub) return subscription, nil } @@ -307,8 +307,8 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) < return msgChannel } -func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub *pubsub.Subscription) { - ctx, err := tag.New(context.Background(), tag.Insert(metrics.KeyType, "relay")) +func (w *WakuRelay) subscribeToTopic(ctx context.Context, t string, subscription *Subscription, sub *pubsub.Subscription) { + ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "relay")) if err != nil { w.log.Error("creating tag map", zap.Error(err)) return diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/web3.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/web3.go index d6d85450d..055ec6114 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/web3.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/rln/web3.go @@ -33,7 +33,7 @@ func register(ctx context.Context, idComm r.IDCommitment, ethAccountPrivateKey * } defer backend.Close() - chainID, err := backend.ChainID(context.Background()) + chainID, err := backend.ChainID(ctx) if err != nil { return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/waku_store_client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/waku_store_client.go index a4ba39369..432897b6d 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/waku_store_client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/waku_store_client.go @@ -256,10 +256,13 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR result := &Result{ Messages: response.Messages, query: q, - cursor: response.PagingInfo.Cursor, peerId: params.selectedPeer, } + if response.PagingInfo != nil { + result.cursor = response.PagingInfo.Cursor + } + return result, nil } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/timesource/ntp.go b/vendor/github.com/waku-org/go-waku/waku/v2/timesource/ntp.go index b019d8cf1..8f7fbe085 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/timesource/ntp.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/timesource/ntp.go @@ -2,6 +2,7 @@ package timesource import ( "bytes" + "context" "errors" "sort" "sync" @@ -133,8 +134,8 @@ type NTPTimeSource struct { timeQuery ntpQuery // for ease of testing log *zap.Logger - quit chan struct{} - wg sync.WaitGroup + cancel context.CancelFunc + wg sync.WaitGroup mu sync.RWMutex latestOffset time.Duration @@ -162,9 +163,11 @@ func (s *NTPTimeSource) updateOffset() error { // runPeriodically runs periodically the given function based on NTPTimeSource // synchronization limits (fastNTPSyncPeriod / slowNTPSyncPeriod) -func (s *NTPTimeSource) runPeriodically(fn func() error) error { +func (s *NTPTimeSource) runPeriodically(ctx context.Context, fn func() error) error { var period time.Duration - s.quit = make(chan struct{}) + + s.log.Info("starting service") + // we try to do it synchronously so that user can have reliable messages right away s.wg.Add(1) go func() { @@ -177,7 +180,8 @@ func (s *NTPTimeSource) runPeriodically(fn func() error) error { period = s.fastNTPSyncPeriod } - case <-s.quit: + case <-ctx.Done(): + s.log.Info("stopping service") s.wg.Done() return } @@ -188,16 +192,16 @@ func (s *NTPTimeSource) runPeriodically(fn func() error) error { } // Start runs a goroutine that updates local offset every updatePeriod. -func (s *NTPTimeSource) Start() error { - return s.runPeriodically(s.updateOffset) +func (s *NTPTimeSource) Start(ctx context.Context) error { + s.wg.Wait() // Waiting for other go routines to stop + ctx, cancel := context.WithCancel(ctx) + s.cancel = cancel + return s.runPeriodically(ctx, s.updateOffset) } // Stop goroutine that updates time source. func (s *NTPTimeSource) Stop() error { - if s.quit == nil { - return nil - } - close(s.quit) + s.cancel() s.wg.Wait() return nil } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/timesource/timesource.go b/vendor/github.com/waku-org/go-waku/waku/v2/timesource/timesource.go index 25ce9cbc5..667a227ab 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/timesource/timesource.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/timesource/timesource.go @@ -1,9 +1,12 @@ package timesource -import "time" +import ( + "context" + "time" +) type Timesource interface { Now() time.Time - Start() error + Start(ctx context.Context) error Stop() error } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/timesource/wall.go b/vendor/github.com/waku-org/go-waku/waku/v2/timesource/wall.go index 939a21ff5..67b8b3433 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/timesource/wall.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/timesource/wall.go @@ -1,6 +1,9 @@ package timesource -import "time" +import ( + "context" + "time" +) type WallClockTimeSource struct { } @@ -13,7 +16,7 @@ func (t *WallClockTimeSource) Now() time.Time { return time.Now() } -func (t *WallClockTimeSource) Start() error { +func (t *WallClockTimeSource) Start(ctx context.Context) error { // Do nothing return nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index d1c9c3d25..6f9c1c0d7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -990,7 +990,7 @@ github.com/vacp2p/mvds/transport github.com/waku-org/go-discover/discover github.com/waku-org/go-discover/discover/v4wire github.com/waku-org/go-discover/discover/v5wire -# github.com/waku-org/go-waku v0.2.3-test.0.20221209175307-685142e7b743 +# github.com/waku-org/go-waku v0.2.3-test.0.20221212154545-7443daea4cd4 ## explicit; go 1.18 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/waku/persistence diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 41d3f6c87..5b5085554 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -25,6 +25,7 @@ import ( "database/sql" "errors" "fmt" + "math" "math/rand" "net" "runtime" @@ -62,6 +63,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" + "github.com/status-im/status-go/connection" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/signal" "github.com/status-im/status-go/timesource" @@ -77,6 +79,8 @@ import ( const messageQueueLimit = 1024 const requestTimeout = 30 * time.Second const autoRelayMinInterval = 2 * time.Second +const bootnodesQueryBackoffMs = 200 +const bootnodesMaxRetries = 7 type settings struct { LightClient bool // Indicates if the node is a light client @@ -132,12 +136,29 @@ type Waku struct { // NTP Synced timesource timesource *timesource.NTPTimeSource + + // seededBootnodesForDiscV5 indicates whether we manage to retrieve discovery + // bootnodes successfully + seededBootnodesForDiscV5 bool + + // offline indicates whether we have detected connectivity + offline bool + + // connectionChanged is channel that notifies when connectivity has changed + connectionChanged chan struct{} + + // discV5BootstrapNodes is the ENR to be used to fetch bootstrap nodes for discovery + discV5BootstrapNodes []string } // New creates a WakuV2 client ready to communicate through the LibP2P network. func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *sql.DB, timesource *timesource.NTPTimeSource) (*Waku, error) { + var err error if logger == nil { - logger = zap.NewNop() + logger, err = zap.NewDevelopment() + if err != nil { + return nil, err + } } cfg = setDefaults(cfg) @@ -154,6 +175,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s sendQueue: make(chan *pb.WakuMessage, 1000), connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription), quit: make(chan struct{}), + connectionChanged: make(chan struct{}), wg: sync.WaitGroup{}, dnsAddressCache: make(map[string][]dnsdisc.DiscoveredNode), dnsAddressCacheLock: &sync.RWMutex{}, @@ -161,6 +183,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s storeMsgIDsMu: sync.RWMutex{}, timeSource: time.Now, logger: logger, + discV5BootstrapNodes: cfg.DiscV5BootstrapNodes, } // Disabling light client mode if using status.prod or undefined @@ -181,7 +204,6 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s waku.filterMsgChannel = make(chan *protocol.Envelope, 1024) var privateKey *ecdsa.PrivateKey - var err error if nodeKey != "" { privateKey, err = crypto.HexToECDSA(nodeKey) } else { @@ -226,8 +248,10 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s if cfg.EnableDiscV5 { bootnodes, err := waku.getDiscV5BootstrapNodes(ctx, cfg.DiscV5BootstrapNodes) if err != nil { + logger.Error("failed to get bootstrap nodes", zap.Error(err)) return nil, err } + opts = append(opts, node.WithDiscoveryV5(cfg.UDPPort, bootnodes, cfg.AutoUpdate, pubsub.WithDiscoveryOpts(discovery.Limit(cfg.DiscoveryLimit)))) // Peer exchange requires DiscV5 to run (might change in future versions of the protocol) @@ -276,7 +300,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s } if cfg.EnableDiscV5 { - err := waku.node.DiscV5().Start() + err := waku.node.DiscV5().Start(ctx) if err != nil { return nil, err } @@ -358,12 +382,16 @@ func (w *Waku) getDiscV5BootstrapNodes(ctx context.Context, addresses []string) } } wg.Wait() + + w.seededBootnodesForDiscV5 = len(result) > 0 + return result, nil } type fnApplyToEachPeer func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnApplyToEachPeer) { + w.logger.Info("retrieving nodes", zap.String("enr", enrtreeAddress)) ctx, cancel := context.WithTimeout(ctx, requestTimeout) defer cancel() @@ -372,15 +400,17 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA discNodes, ok := w.dnsAddressCache[enrtreeAddress] if !ok { - // NOTE: Temporary fix for DNS resolution on android/ios, as gomobile does not support it + // NOTE: Temporary fix for DNS resolution on android/ios, as gomobile does not support it discoveredNodes, err := dnsdisc.RetrieveNodes(ctx, enrtreeAddress, dnsdisc.WithNameserver("1.1.1.1")) if err != nil { w.logger.Warn("dns discovery error ", zap.Error(err)) return } - w.dnsAddressCache[enrtreeAddress] = append(w.dnsAddressCache[enrtreeAddress], discoveredNodes...) - discNodes = w.dnsAddressCache[enrtreeAddress] + if len(discoveredNodes) != 0 { + w.dnsAddressCache[enrtreeAddress] = append(w.dnsAddressCache[enrtreeAddress], discoveredNodes...) + discNodes = w.dnsAddressCache[enrtreeAddress] + } } wg := &sync.WaitGroup{} @@ -1093,6 +1123,7 @@ func (w *Waku) Start() error { } go w.broadcast() + go w.seedBootnodesForDiscV5() return nil } @@ -1104,6 +1135,7 @@ func (w *Waku) Stop() error { w.node.Stop() close(w.quit) close(w.filterMsgChannel) + close(w.connectionChanged) w.wg.Wait() return nil } @@ -1256,7 +1288,7 @@ func (w *Waku) StartDiscV5() error { return errors.New("discv5 is not setup") } - return w.node.DiscV5().Start() + return w.node.DiscV5().Start(context.Background()) } func (w *Waku) StopDiscV5() error { @@ -1268,6 +1300,88 @@ func (w *Waku) StopDiscV5() error { return nil } +func (w *Waku) ConnectionChanged(state connection.State) { + if !state.Offline && w.offline { + select { + case w.connectionChanged <- struct{}{}: + } + } + + w.offline = !state.Offline +} + +// seedBootnodesForDiscV5 tries to fetch bootnodes +// from an ENR periodically. +// It backs off exponentially until maxRetries, at which point it restarts from 0 +// It also restarts if there's a connection change signalled from the client +func (w *Waku) seedBootnodesForDiscV5() { + ticker := time.NewTicker(200 * time.Millisecond) + defer ticker.Stop() + var lastTry = time.Now().UnixNano() / int64(time.Millisecond) + var retries = 0 + + for { + select { + case <-ticker.C: + if w.seededBootnodesForDiscV5 { + w.logger.Info("stopped querying bootnodes") + return + } + now := time.Now().UnixNano() / int64(time.Millisecond) + backoff := bootnodesQueryBackoffMs * int64(math.Exp2(float64(retries))) + + if lastTry+backoff < now { + err := w.restartDiscV5() + if err != nil { + w.logger.Warn("failed to restart discv5", zap.Error(err)) + } + + lastTry = now + retries++ + // We reset the retries after a while and restart + if retries > bootnodesMaxRetries { + retries = 0 + } + + } + // If we go online, trigger immediately + case <-w.connectionChanged: + now := time.Now().UnixNano() / int64(time.Millisecond) + backoff := bootnodesQueryBackoffMs * int64(math.Exp2(float64(retries))) + // check we haven't run too eagerly, in case connection + // is flapping + if lastTry+backoff < now { + err := w.restartDiscV5() + if err != nil { + w.logger.Warn("failed to restart discv5", zap.Error(err)) + } + + } + retries = 0 + lastTry = now + + case <-w.quit: + return + } + } +} + +// Restart discv5, re-retrieving bootstrap nodes +func (w *Waku) restartDiscV5() error { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + bootnodes, err := w.getDiscV5BootstrapNodes(ctx, w.discV5BootstrapNodes) + if err != nil { + return err + } + if len(bootnodes) == 0 { + return errors.New("failed to fetch bootnodes") + } + + w.logger.Info("restarting discv5 with nodes", zap.Any("nodes", bootnodes)) + return w.node.SetDiscV5Bootnodes(bootnodes) +} + func (w *Waku) AddStorePeer(address string) (string, error) { addr, err := multiaddr.NewMultiaddr(address) if err != nil { diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go new file mode 100644 index 000000000..625bbeb02 --- /dev/null +++ b/wakuv2/waku_test.go @@ -0,0 +1,85 @@ +package wakuv2 + +import ( + "errors" + "testing" + "time" + + "github.com/cenkalti/backoff/v3" + "github.com/stretchr/testify/require" + + "github.com/status-im/status-go/protocol/tt" +) + +var testENRBootstrap = "enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@prod.nodes.status.im" + +func TestDiscoveryV5(t *testing.T) { + config := &Config{} + config.EnableDiscV5 = true + config.DiscV5BootstrapNodes = []string{testENRBootstrap} + config.DiscoveryLimit = 20 + config.UDPPort = 9001 + w, err := New("", "", config, nil, nil, nil) + require.NoError(t, err) + + require.NoError(t, w.Start()) + + err = tt.RetryWithBackOff(func() error { + if len(w.Peers()) == 0 { + return errors.New("no peers discovered") + } + return nil + }) + + require.NoError(t, err) + + require.NotEqual(t, 0, len(w.Peers())) + require.NoError(t, w.Stop()) +} + +func TestRestartDiscoveryV5(t *testing.T) { + config := &Config{} + config.EnableDiscV5 = true + // Use wrong discv5 bootstrap address, to simulate being offline + config.DiscV5BootstrapNodes = []string{"enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@1.1.1.2"} + config.DiscoveryLimit = 20 + config.UDPPort = 9002 + w, err := New("", "", config, nil, nil, nil) + require.NoError(t, err) + + require.NoError(t, w.Start()) + + require.False(t, w.seededBootnodesForDiscV5) + + options := func(b *backoff.ExponentialBackOff) { + b.MaxElapsedTime = 2 * time.Second + } + + // Sanity check, not great, but it's probably helpful + err = tt.RetryWithBackOff(func() error { + if len(w.Peers()) == 0 { + return errors.New("no peers discovered") + } + return nil + }, options) + + require.Error(t, err) + + w.discV5BootstrapNodes = []string{testENRBootstrap} + + options = func(b *backoff.ExponentialBackOff) { + b.MaxElapsedTime = 30 * time.Second + } + + err = tt.RetryWithBackOff(func() error { + if len(w.Peers()) == 0 { + return errors.New("no peers discovered") + } + return nil + }, options) + require.NoError(t, err) + + require.True(t, w.seededBootnodesForDiscV5) + require.NotEqual(t, 0, len(w.Peers())) + require.NoError(t, w.Stop()) +}