diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 313ec6ae..6e71ec80 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -615,10 +615,9 @@ func printListeningAddresses(ctx context.Context, nodeOpts []node.WakuNodeOption panic(err) } - hostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", h.ID().Pretty())) - - for _, addr := range h.Addrs() { - fmt.Println(addr.Encapsulate(hostInfo)) + hostAddrs := utils.EncapsulatePeerID(h.ID(), h.Addrs()...) + for _, addr := range hostAddrs { + fmt.Println(addr) } } diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 0ed52da8..d81aa594 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -64,6 +64,10 @@ type DiscoveryV5Option func(*discV5Parameters) var protocolID = [6]byte{'d', '5', 'w', 'a', 'k', 'u'} +const peerDelay = 100 * time.Millisecond +const bucketSize = 16 +const delayBetweenDiscoveredPeerCnt = 5 * time.Second + func WithAutoUpdate(autoUpdate bool) DiscoveryV5Option { return func(params *discV5Parameters) { params.autoUpdate = autoUpdate @@ -330,35 +334,20 @@ func (d *DiscoveryV5) FindPeersWithShard(ctx context.Context, cluster, index uin return enode.Filter(iterator, predicate), nil } -const peerDelay = 100 * time.Millisecond -const bucketSize = 16 - func (d *DiscoveryV5) Iterate(ctx context.Context, iterator enode.Iterator, onNode func(*enode.Node, peer.AddrInfo) error) { defer iterator.Close() peerCnt := 0 for { - // Delay if .Next() is too fast - start := time.Now() - hasNext := iterator.Next() - if !hasNext { - break - } - elapsed := time.Since(start) - if elapsed < peerDelay { - t := time.NewTimer(peerDelay - elapsed) - select { - case <-ctx.Done(): - return - case <-t.C: - t.Stop() - } + + if !delayedHasNext(ctx, iterator) { + return } - // Delay every 15 peers being returned peerCnt++ - if peerCnt == bucketSize { - t := time.NewTimer(5 * time.Second) + if peerCnt == bucketSize { // Delay every bucketSize peers discovered + peerCnt = 0 + t := time.NewTimer(delayBetweenDiscoveredPeerCnt) select { case <-ctx.Done(): return @@ -396,6 +385,28 @@ func (d *DiscoveryV5) Iterate(ctx context.Context, iterator enode.Iterator, onNo } } +func delayedHasNext(ctx context.Context, iterator enode.Iterator) bool { + // Delay if .Next() is too fast + start := time.Now() + hasNext := iterator.Next() + if !hasNext { + return false + } + + elapsed := time.Since(start) + if elapsed < peerDelay { + t := time.NewTimer(peerDelay - elapsed) + select { + case <-ctx.Done(): + return false + case <-t.C: + t.Stop() + } + } + + return true +} + // Iterates over the nodes found via discv5 belonging to the node's current shard, and sends them to peerConnector func (d *DiscoveryV5) peerLoop(ctx context.Context) error { iterator, err := d.Iterator() diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 801c03cf..108a7a3a 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -2,7 +2,6 @@ package node import ( "context" - "fmt" "math/rand" "net" "sync" @@ -27,7 +26,6 @@ import ( "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" ws "github.com/libp2p/go-libp2p/p2p/transport/websocket" - "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr" "go.opencensus.io/stats" @@ -50,6 +48,8 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" ) +const discoveryConnectTimeout = 20 * time.Second + type Peer struct { ID peer.ID `json:"peerID"` Protocols []protocol.ID `json:"protocols"` @@ -247,7 +247,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { rngSrc := rand.NewSource(rand.Int63()) minBackoff, maxBackoff := time.Minute, time.Hour bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc)) - w.peerConnector, err = v2.NewPeerConnectionStrategy(cacheSize, w.opts.discoveryMinPeers, 20*time.Second, bkf, w.log) + w.peerConnector, err = v2.NewPeerConnectionStrategy(cacheSize, w.opts.discoveryMinPeers, discoveryConnectTimeout, bkf, w.log) if err != nil { w.log.Error("creating peer connection strategy", zap.Error(err)) } @@ -564,12 +564,7 @@ func (w *WakuNode) watchENRChanges(ctx context.Context) { // ListenAddresses returns all the multiaddresses used by the host func (w *WakuNode) ListenAddresses() []ma.Multiaddr { - hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", w.host.ID().Pretty())) - var result []ma.Multiaddr - for _, addr := range w.host.Addrs() { - result = append(result, addr.Encapsulate(hostInfo)) - } - return result + return utils.EncapsulatePeerID(w.host.ID(), w.host.Addrs()...) } // ENR returns the ENR address of the node @@ -818,12 +813,7 @@ func (w *WakuNode) Peers() ([]*Peer, error) { return nil, err } - addrs := w.host.Peerstore().Addrs(peerId) - hostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", peerId.Pretty())) - for i := range addrs { - addrs[i] = addrs[i].Encapsulate(hostInfo) - } - + addrs := utils.EncapsulatePeerID(peerId, w.host.Peerstore().Addrs(peerId)...) peers = append(peers, &Peer{ ID: peerId, Protocols: protocols, diff --git a/waku/v2/utils/multiaddr.go b/waku/v2/utils/multiaddr.go new file mode 100644 index 00000000..e6e0d7d6 --- /dev/null +++ b/waku/v2/utils/multiaddr.go @@ -0,0 +1,18 @@ +package utils + +import ( + "fmt" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" +) + +// EncapsulatePeerID takes a peer.ID and adds a p2p component to all multiaddresses it receives +func EncapsulatePeerID(peerID peer.ID, addrs ...multiaddr.Multiaddr) []multiaddr.Multiaddr { + hostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", peerID.Pretty())) + var result []multiaddr.Multiaddr + for _, addr := range addrs { + result = append(result, addr.Encapsulate(hostInfo)) + } + return result +}