chore: extract `EncapsulatePeerID`

This commit is contained in:
Richard Ramos 2023-07-07 11:51:15 -04:00 committed by richΛrd
parent cfe28d4698
commit bc6a305759
4 changed files with 58 additions and 40 deletions

View File

@ -615,10 +615,9 @@ func printListeningAddresses(ctx context.Context, nodeOpts []node.WakuNodeOption
panic(err) panic(err)
} }
hostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", h.ID().Pretty())) hostAddrs := utils.EncapsulatePeerID(h.ID(), h.Addrs()...)
for _, addr := range hostAddrs {
for _, addr := range h.Addrs() { fmt.Println(addr)
fmt.Println(addr.Encapsulate(hostInfo))
} }
} }

View File

@ -64,6 +64,10 @@ type DiscoveryV5Option func(*discV5Parameters)
var protocolID = [6]byte{'d', '5', 'w', 'a', 'k', 'u'} var protocolID = [6]byte{'d', '5', 'w', 'a', 'k', 'u'}
const peerDelay = 100 * time.Millisecond
const bucketSize = 16
const delayBetweenDiscoveredPeerCnt = 5 * time.Second
func WithAutoUpdate(autoUpdate bool) DiscoveryV5Option { func WithAutoUpdate(autoUpdate bool) DiscoveryV5Option {
return func(params *discV5Parameters) { return func(params *discV5Parameters) {
params.autoUpdate = autoUpdate params.autoUpdate = autoUpdate
@ -330,35 +334,20 @@ func (d *DiscoveryV5) FindPeersWithShard(ctx context.Context, cluster, index uin
return enode.Filter(iterator, predicate), nil return enode.Filter(iterator, predicate), nil
} }
const peerDelay = 100 * time.Millisecond
const bucketSize = 16
func (d *DiscoveryV5) Iterate(ctx context.Context, iterator enode.Iterator, onNode func(*enode.Node, peer.AddrInfo) error) { func (d *DiscoveryV5) Iterate(ctx context.Context, iterator enode.Iterator, onNode func(*enode.Node, peer.AddrInfo) error) {
defer iterator.Close() defer iterator.Close()
peerCnt := 0 peerCnt := 0
for { for {
// Delay if .Next() is too fast
start := time.Now() if !delayedHasNext(ctx, iterator) {
hasNext := iterator.Next()
if !hasNext {
break
}
elapsed := time.Since(start)
if elapsed < peerDelay {
t := time.NewTimer(peerDelay - elapsed)
select {
case <-ctx.Done():
return return
case <-t.C:
t.Stop()
}
} }
// Delay every 15 peers being returned
peerCnt++ peerCnt++
if peerCnt == bucketSize { if peerCnt == bucketSize { // Delay every bucketSize peers discovered
t := time.NewTimer(5 * time.Second) peerCnt = 0
t := time.NewTimer(delayBetweenDiscoveredPeerCnt)
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
@ -396,6 +385,28 @@ func (d *DiscoveryV5) Iterate(ctx context.Context, iterator enode.Iterator, onNo
} }
} }
func delayedHasNext(ctx context.Context, iterator enode.Iterator) bool {
// Delay if .Next() is too fast
start := time.Now()
hasNext := iterator.Next()
if !hasNext {
return false
}
elapsed := time.Since(start)
if elapsed < peerDelay {
t := time.NewTimer(peerDelay - elapsed)
select {
case <-ctx.Done():
return false
case <-t.C:
t.Stop()
}
}
return true
}
// Iterates over the nodes found via discv5 belonging to the node's current shard, and sends them to peerConnector // Iterates over the nodes found via discv5 belonging to the node's current shard, and sends them to peerConnector
func (d *DiscoveryV5) peerLoop(ctx context.Context) error { func (d *DiscoveryV5) peerLoop(ctx context.Context) error {
iterator, err := d.Iterator() iterator, err := d.Iterator()

View File

@ -2,7 +2,6 @@ package node
import ( import (
"context" "context"
"fmt"
"math/rand" "math/rand"
"net" "net"
"sync" "sync"
@ -27,7 +26,6 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
ws "github.com/libp2p/go-libp2p/p2p/transport/websocket" ws "github.com/libp2p/go-libp2p/p2p/transport/websocket"
"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"go.opencensus.io/stats" "go.opencensus.io/stats"
@ -50,6 +48,8 @@ import (
"github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/go-waku/waku/v2/utils"
) )
const discoveryConnectTimeout = 20 * time.Second
type Peer struct { type Peer struct {
ID peer.ID `json:"peerID"` ID peer.ID `json:"peerID"`
Protocols []protocol.ID `json:"protocols"` Protocols []protocol.ID `json:"protocols"`
@ -247,7 +247,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
rngSrc := rand.NewSource(rand.Int63()) rngSrc := rand.NewSource(rand.Int63())
minBackoff, maxBackoff := time.Minute, time.Hour minBackoff, maxBackoff := time.Minute, time.Hour
bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc)) bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc))
w.peerConnector, err = v2.NewPeerConnectionStrategy(cacheSize, w.opts.discoveryMinPeers, 20*time.Second, bkf, w.log) w.peerConnector, err = v2.NewPeerConnectionStrategy(cacheSize, w.opts.discoveryMinPeers, discoveryConnectTimeout, bkf, w.log)
if err != nil { if err != nil {
w.log.Error("creating peer connection strategy", zap.Error(err)) w.log.Error("creating peer connection strategy", zap.Error(err))
} }
@ -564,12 +564,7 @@ func (w *WakuNode) watchENRChanges(ctx context.Context) {
// ListenAddresses returns all the multiaddresses used by the host // ListenAddresses returns all the multiaddresses used by the host
func (w *WakuNode) ListenAddresses() []ma.Multiaddr { func (w *WakuNode) ListenAddresses() []ma.Multiaddr {
hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", w.host.ID().Pretty())) return utils.EncapsulatePeerID(w.host.ID(), w.host.Addrs()...)
var result []ma.Multiaddr
for _, addr := range w.host.Addrs() {
result = append(result, addr.Encapsulate(hostInfo))
}
return result
} }
// ENR returns the ENR address of the node // ENR returns the ENR address of the node
@ -818,12 +813,7 @@ func (w *WakuNode) Peers() ([]*Peer, error) {
return nil, err return nil, err
} }
addrs := w.host.Peerstore().Addrs(peerId) addrs := utils.EncapsulatePeerID(peerId, w.host.Peerstore().Addrs(peerId)...)
hostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", peerId.Pretty()))
for i := range addrs {
addrs[i] = addrs[i].Encapsulate(hostInfo)
}
peers = append(peers, &Peer{ peers = append(peers, &Peer{
ID: peerId, ID: peerId,
Protocols: protocols, Protocols: protocols,

View File

@ -0,0 +1,18 @@
package utils
import (
"fmt"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)
// EncapsulatePeerID takes a peer.ID and adds a p2p component to all multiaddresses it receives
func EncapsulatePeerID(peerID peer.ID, addrs ...multiaddr.Multiaddr) []multiaddr.Multiaddr {
hostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", peerID.Pretty()))
var result []multiaddr.Multiaddr
for _, addr := range addrs {
result = append(result, addr.Encapsulate(hostInfo))
}
return result
}