feat: use backedoff connector and discovery

This commit is contained in:
Richard Ramos 2023-01-10 20:52:10 -04:00 committed by RichΛrd
parent 27bc9488e6
commit c87da46ce6
4 changed files with 45 additions and 18 deletions

View File

@ -52,7 +52,7 @@ type peerCache struct {
type PeerRecord struct { type PeerRecord struct {
expire int64 expire int64
Peer peer.AddrInfo Peer peer.AddrInfo
Node enode.Node Node *enode.Node
} }
type discV5Parameters struct { type discV5Parameters struct {
@ -126,10 +126,6 @@ func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.Loc
config: discover.Config{ config: discover.Config{
PrivateKey: priv, PrivateKey: priv,
Bootnodes: params.bootnodes, Bootnodes: params.bootnodes,
ValidNodeFn: func(n enode.Node) bool {
// TODO: track https://github.com/status-im/nim-waku/issues/770 for improvements over validation func
return evaluateNode(&n)
},
V5Config: discover.V5Config{ V5Config: discover.V5Config{
ProtocolID: &protocolID, ProtocolID: &protocolID,
}, },
@ -291,7 +287,7 @@ func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limi
for { for {
if len(d.peerCache.recs) >= limit { if len(d.peerCache.recs) >= limit {
break time.Sleep(1 * time.Minute)
} }
if ctx.Err() != nil { if ctx.Err() != nil {
@ -317,10 +313,15 @@ func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limi
d.peerCache.Lock() d.peerCache.Lock()
for _, p := range peerAddrs { for _, p := range peerAddrs {
_, ok := d.peerCache.recs[p.ID]
if ok {
continue
}
d.peerCache.recs[p.ID] = PeerRecord{ d.peerCache.recs[p.ID] = PeerRecord{
expire: time.Now().Unix() + 3600, // Expires in 1hr expire: time.Now().Unix() + 3600, // Expires in 1hr
Peer: p, Peer: p,
Node: *iterator.Node(), Node: iterator.Node(),
} }
} }
d.peerCache.Unlock() d.peerCache.Unlock()

View File

@ -190,7 +190,10 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.DiscV5(), w.opts.discV5Opts...)) w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.DiscV5(), w.opts.discV5Opts...))
} }
w.peerExchange = peer_exchange.NewWakuPeerExchange(w.host, w.DiscV5(), w.log) w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.host, w.DiscV5(), w.log)
if err != nil {
return nil, err
}
w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...) w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...)

View File

@ -18,6 +18,7 @@ import (
"github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
"github.com/libp2p/go-msgio/protoio" "github.com/libp2p/go-msgio/protoio"
"github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/discv5" "github.com/waku-org/go-waku/waku/v2/discv5"
@ -32,7 +33,7 @@ import (
const PeerExchangeID_v20alpha1 = libp2pProtocol.ID("/vac/waku/peer-exchange/2.0.0-alpha1") const PeerExchangeID_v20alpha1 = libp2pProtocol.ID("/vac/waku/peer-exchange/2.0.0-alpha1")
const MaxCacheSize = 1000 const MaxCacheSize = 1000
const CacheCleanWindow = 200 const CacheCleanWindow = 200
const dialTimeout = 7 * time.Second const dialTimeout = 30 * time.Second
var ( var (
ErrNoPeersAvailable = errors.New("no suitable remote peers") ErrNoPeersAvailable = errors.New("no suitable remote peers")
@ -40,7 +41,7 @@ var (
) )
type peerRecord struct { type peerRecord struct {
node enode.Node node *enode.Node
idx int idx int
} }
@ -50,24 +51,36 @@ type WakuPeerExchange struct {
log *zap.Logger log *zap.Logger
cancel context.CancelFunc cancel context.CancelFunc
started bool started bool
wg sync.WaitGroup wg sync.WaitGroup
connector *backoff.BackoffConnector
enrCache map[enode.ID]peerRecord // todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ enrCache map[enode.ID]peerRecord // todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
enrCacheMutex sync.RWMutex enrCacheMutex sync.RWMutex
rng *rand.Rand rng *rand.Rand
} }
// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct // NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
func NewWakuPeerExchange(h host.Host, disc *discv5.DiscoveryV5, log *zap.Logger) *WakuPeerExchange { func NewWakuPeerExchange(h host.Host, disc *discv5.DiscoveryV5, log *zap.Logger) (*WakuPeerExchange, error) {
wakuPX := new(WakuPeerExchange) wakuPX := new(WakuPeerExchange)
wakuPX.h = h wakuPX.h = h
wakuPX.disc = disc wakuPX.disc = disc
wakuPX.log = log.Named("wakupx") wakuPX.log = log.Named("wakupx")
wakuPX.enrCache = make(map[enode.ID]peerRecord) wakuPX.enrCache = make(map[enode.ID]peerRecord)
wakuPX.rng = rand.New(rand.NewSource(rand.Int63())) wakuPX.rng = rand.New(rand.NewSource(rand.Int63()))
return wakuPX
cacheSize := 600
rngSrc := rand.NewSource(rand.Int63())
minBackoff, maxBackoff := time.Second*30, time.Hour
bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc))
connector, err := backoff.NewBackoffConnector(h, cacheSize, dialTimeout, bkf)
if err != nil {
return nil, err
}
wakuPX.connector = connector
return wakuPX, nil
} }
// Start inits the peer exchange protocol // Start inits the peer exchange protocol
@ -122,6 +135,13 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
if len(peers) != 0 { if len(peers) != 0 {
log.Info("connecting to newly discovered peers", zap.Int("count", len(peers))) log.Info("connecting to newly discovered peers", zap.Int("count", len(peers)))
ch := make(chan peer.AddrInfo, len(peers))
for _, p := range peers {
ch <- p
}
wakuPX.connector.Connect(ctx, ch)
for _, p := range peers { for _, p := range peers {
func(p peer.AddrInfo) { func(p peer.AddrInfo) {
ctx, cancel := context.WithTimeout(ctx, dialTimeout) ctx, cancel := context.WithTimeout(ctx, dialTimeout)

View File

@ -136,8 +136,11 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
time.Sleep(3 * time.Second) // Wait some time for peers to be discovered time.Sleep(3 * time.Second) // Wait some time for peers to be discovered
// mount peer exchange // mount peer exchange
px1 := NewWakuPeerExchange(host1, d1, utils.Logger()) px1, err := NewWakuPeerExchange(host1, d1, utils.Logger())
px3 := NewWakuPeerExchange(host3, nil, utils.Logger()) require.NoError(t, err)
px3, err := NewWakuPeerExchange(host3, nil, utils.Logger())
require.NoError(t, err)
err = px1.Start(context.Background()) err = px1.Start(context.Background())
require.NoError(t, err) require.NoError(t, err)