refactor: peer discovery management for discv5 and peer exchange

This commit is contained in:
Richard Ramos 2023-01-13 19:58:22 -04:00 committed by RichΛrd
parent 25486ebdb4
commit e0ccdbe966
12 changed files with 486 additions and 338 deletions

2
go.mod
View File

@ -80,7 +80,7 @@ require (
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/holiman/uint256 v1.2.0 // indirect
github.com/huin/goupnp v1.0.3 // indirect

View File

@ -20,8 +20,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/secp256k1"
"github.com/ethereum/go-ethereum/p2p/enode"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/node"
@ -176,7 +174,7 @@ func NewNode(configJSON string) string {
}
bootnodes = append(bootnodes, bootnode)
}
opts = append(opts, node.WithDiscoveryV5(*config.DiscV5UDPPort, bootnodes, true, pubsub.WithDiscoveryOpts(discovery.Limit(45), discovery.TTL(time.Duration(20)*time.Second))))
opts = append(opts, node.WithDiscoveryV5(*config.DiscV5UDPPort, bootnodes, true))
}
// for go-libp2p loggers

View File

@ -7,12 +7,14 @@ import (
"fmt"
"io"
"net"
"sync"
"testing"
"github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
@ -115,3 +117,49 @@ func RandomHex(n int) (string, error) {
}
return hex.EncodeToString(bytes), nil
}
type TestPeerDiscoverer struct {
sync.RWMutex
peerMap map[peer.ID]struct{}
peerCh chan peer.AddrInfo
}
func NewTestPeerDiscoverer() *TestPeerDiscoverer {
result := &TestPeerDiscoverer{
peerMap: make(map[peer.ID]struct{}),
peerCh: make(chan peer.AddrInfo, 10),
}
go func() {
for p := range result.peerCh {
result.Lock()
result.peerMap[p.ID] = struct{}{}
result.Unlock()
}
}()
return result
}
func (t *TestPeerDiscoverer) PeerChannel() chan<- peer.AddrInfo {
return t.peerCh
}
func (t *TestPeerDiscoverer) HasPeer(p peer.ID) bool {
t.RLock()
defer t.RUnlock()
_, ok := t.peerMap[p]
return ok
}
func (t *TestPeerDiscoverer) PeerCount() int {
t.RLock()
defer t.RUnlock()
return len(t.peerMap)
}
func (t *TestPeerDiscoverer) Clear() {
t.Lock()
defer t.Unlock()
t.peerMap = make(map[peer.ID]struct{})
}

View File

@ -26,7 +26,6 @@ import (
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
@ -245,7 +244,7 @@ func Execute(options Options) {
}
}
nodeOpts = append(nodeOpts, node.WithDiscoveryV5(options.DiscV5.Port, bootnodes, options.DiscV5.AutoUpdate, pubsub.WithDiscoveryOpts(discovery.Limit(45), discovery.TTL(time.Duration(20)*time.Second))))
nodeOpts = append(nodeOpts, node.WithDiscoveryV5(options.DiscV5.Port, bootnodes, options.DiscV5.AutoUpdate))
}
if options.PeerExchange.Enable {

View File

@ -0,0 +1,243 @@
package v2
// Adapted from github.com/libp2p/go-libp2p@v0.23.2/p2p/discovery/backoff/backoffconnector.go
import (
"context"
"errors"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
"github.com/waku-org/go-waku/logging"
"go.uber.org/zap"
lru "github.com/hashicorp/golang-lru"
)
// PeerConnectionStrategy is a utility to connect to peers, but only if we have not recently tried connecting to them already
type PeerConnectionStrategy struct {
sync.RWMutex
cache *lru.TwoQueueCache
host host.Host
cancel context.CancelFunc
paused bool
workerCtx context.Context
workerCancel context.CancelFunc
wg sync.WaitGroup
minPeers int
dialTimeout time.Duration
peerCh chan peer.AddrInfo
dialCh chan peer.AddrInfo
backoff backoff.BackoffFactory
mux sync.Mutex
logger *zap.Logger
}
// NewPeerConnectionStrategy creates a utility to connect to peers, but only if we have not recently tried connecting to them already.
// cacheSize is the size of a TwoQueueCache
// dialTimeout is how long we attempt to connect to a peer before giving up
// minPeers is the minimum number of peers that the node should have
// backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer
func NewPeerConnectionStrategy(h host.Host, cacheSize int, minPeers int, dialTimeout time.Duration, backoff backoff.BackoffFactory, logger *zap.Logger) (*PeerConnectionStrategy, error) {
cache, err := lru.New2Q(cacheSize)
if err != nil {
return nil, err
}
return &PeerConnectionStrategy{
cache: cache,
host: h,
wg: sync.WaitGroup{},
minPeers: minPeers,
dialTimeout: dialTimeout,
backoff: backoff,
logger: logger.Named("discovery-connector"),
}, nil
}
type connCacheData struct {
nextTry time.Time
strat backoff.BackoffStrategy
}
// PeerChannel exposes the channel on which discovered peers should be pushed
func (c *PeerConnectionStrategy) PeerChannel() chan<- peer.AddrInfo {
return c.peerCh
}
// Start attempts to connect to the peers passed in by peerCh. Will not connect to peers if they are within the backoff period.
func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
if c.cancel != nil {
return errors.New("already started")
}
ctx, cancel := context.WithCancel(ctx)
c.cancel = cancel
c.peerCh = make(chan peer.AddrInfo)
c.dialCh = make(chan peer.AddrInfo)
c.wg.Add(3)
go c.shouldDialPeers(ctx)
go c.workPublisher(ctx)
go c.dialPeers(ctx)
return nil
}
func (c *PeerConnectionStrategy) Stop() {
if c.cancel == nil {
return
}
c.cancel()
c.wg.Wait()
close(c.peerCh)
close(c.dialCh)
}
func (c *PeerConnectionStrategy) isPaused() bool {
c.RLock()
defer c.RUnlock()
return c.paused
}
func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) {
defer c.wg.Done()
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
c.Lock()
c.workerCtx, c.workerCancel = context.WithCancel(ctx)
c.Unlock()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
isPaused := c.isPaused()
numPeers := len(c.host.Network().Peers())
if numPeers >= c.minPeers && !isPaused {
c.Lock()
c.paused = true
c.workerCancel()
c.Unlock()
} else if numPeers < c.minPeers && isPaused {
c.Lock()
c.paused = false
c.workerCtx, c.workerCancel = context.WithCancel(ctx)
c.Unlock()
}
}
}
}
func (c *PeerConnectionStrategy) publishWork(ctx context.Context, p peer.AddrInfo) {
select {
case c.dialCh <- p:
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
// This timeout is to not lock the goroutine
return
}
}
func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) {
defer c.wg.Done()
for {
select {
case <-ctx.Done():
return
default:
isPaused := c.isPaused()
if !isPaused {
select {
case <-ctx.Done():
return
case p := <-c.peerCh:
c.publishWork(ctx, p)
case <-time.After(1 * time.Second):
// This timeout is to not lock the goroutine
break
}
} else {
// Check if paused again
time.Sleep(1 * time.Second)
}
}
}
}
func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
defer c.wg.Done()
maxGoRoutines := c.minPeers
if maxGoRoutines > 15 {
maxGoRoutines = 15
}
sem := make(chan struct{}, maxGoRoutines)
for {
select {
case pi, ok := <-c.dialCh:
if !ok {
return
}
if pi.ID == c.host.ID() || pi.ID == "" {
continue
}
c.mux.Lock()
val, ok := c.cache.Get(pi.ID)
var cachedPeer *connCacheData
if ok {
tv := val.(*connCacheData)
now := time.Now()
if now.Before(tv.nextTry) {
c.mux.Unlock()
continue
}
tv.nextTry = now.Add(tv.strat.Delay())
} else {
cachedPeer = &connCacheData{strat: c.backoff()}
cachedPeer.nextTry = time.Now().Add(cachedPeer.strat.Delay())
c.cache.Add(pi.ID, cachedPeer)
}
c.mux.Unlock()
if c.host.Network().Connectedness(pi.ID) == network.Connected {
continue
}
sem <- struct{}{}
c.wg.Add(1)
go func(pi peer.AddrInfo) {
defer c.wg.Done()
ctx, cancel := context.WithTimeout(c.workerCtx, c.dialTimeout)
defer cancel()
err := c.host.Connect(ctx, pi)
if err != nil && !errors.Is(err, context.Canceled) {
c.logger.Info("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
}
<-sem
}(pi)
case <-ctx.Done():
return
}
}
}

View File

@ -3,12 +3,10 @@ package discv5
import (
"context"
"crypto/ecdsa"
"math/rand"
"errors"
"net"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-discover/discover"
@ -17,42 +15,26 @@ import (
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/nat"
)
type DiscoveryV5 struct {
sync.RWMutex
discovery.Discovery
params *discV5Parameters
host host.Host
config discover.Config
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
NAT nat.Interface
params *discV5Parameters
host host.Host
config discover.Config
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
peerConnector PeerConnector
NAT nat.Interface
log *zap.Logger
started bool
cancel context.CancelFunc
wg *sync.WaitGroup
peerCache peerCache
}
type peerCache struct {
sync.RWMutex
recs map[peer.ID]PeerRecord
rng *rand.Rand
}
type PeerRecord struct {
expire int64
Peer peer.AddrInfo
Node *enode.Node
}
type discV5Parameters struct {
@ -96,9 +78,11 @@ func DefaultOptions() []DiscoveryV5Option {
}
}
const MaxPeersToDiscover = 600
type PeerConnector interface {
PeerChannel() chan<- peer.AddrInfo
}
func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.LocalNode, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) {
func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConnector PeerConnector, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) {
params := new(discV5Parameters)
optList := DefaultOptions()
optList = append(optList, opts...)
@ -114,15 +98,12 @@ func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.Loc
}
return &DiscoveryV5{
host: host,
params: params,
NAT: NAT,
wg: &sync.WaitGroup{},
peerCache: peerCache{
rng: rand.New(rand.NewSource(rand.Int63())),
recs: make(map[peer.ID]PeerRecord),
},
localnode: localnode,
host: host,
peerConnector: peerConnector,
params: params,
NAT: NAT,
wg: &sync.WaitGroup{},
localnode: localnode,
config: discover.Config{
PrivateKey: priv,
Bootnodes: params.bootnodes,
@ -190,6 +171,7 @@ func (d *DiscoveryV5) Start(ctx context.Context) error {
return err
}
d.wg.Add(1)
go d.runDiscoveryV5Loop(ctx)
return nil
@ -237,29 +219,15 @@ func isWakuNode(node *enode.Node) bool {
}
*/
func hasTCPPort(node *enode.Node) bool {
enrTCP := new(enr.TCP)
if err := node.Record().Load(enr.WithEntry(enrTCP.ENRKey(), enrTCP)); err != nil {
if !enr.IsNotFound(err) {
utils.Logger().Named("discv5").Error("retrieving port for enr", logging.ENode("enr", node))
}
return false
}
return true
}
func evaluateNode(node *enode.Node) bool {
// TODO: track https://github.com/status-im/nim-waku/issues/770 for improvements over validation func
if node == nil || node.IP() == nil {
if node == nil {
return false
}
// TODO: consider node filtering based on ENR; we do not filter based on ENR in the first waku discv5 beta stage
if /*!isWakuNode(node) ||*/ !hasTCPPort(node) {
/*if !isWakuNode(node) {
return false
}
}*/
_, err := utils.EnodeToPeerInfo(node)
@ -271,27 +239,25 @@ func evaluateNode(node *enode.Node) bool {
return true
}
func (d *DiscoveryV5) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
// Get options
var options discovery.Options
err := options.Apply(opts...)
if err != nil {
return 0, err
func (d *DiscoveryV5) Iterator() (enode.Iterator, error) {
if d.listener == nil {
return nil, errors.New("no discv5 listener")
}
// TODO: once discv5 spec introduces capability and topic discovery, implement this function
return 20 * time.Minute, nil
iterator := d.listener.RandomNodes()
return enode.Filter(iterator, evaluateNode), nil
}
func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limit int) {
defer d.wg.Done()
func (d *DiscoveryV5) iterate(ctx context.Context) {
iterator, err := d.Iterator()
if err != nil {
d.log.Debug("obtaining iterator", zap.Error(err))
return
}
defer iterator.Close()
for {
if len(d.peerCache.recs) >= limit {
time.Sleep(1 * time.Minute)
}
if ctx.Err() != nil {
break
}
@ -313,111 +279,37 @@ func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limi
continue
}
d.peerCache.Lock()
for _, p := range peerAddrs {
_, ok := d.peerCache.recs[p.ID]
if ok {
continue
}
d.peerCache.recs[p.ID] = PeerRecord{
expire: time.Now().Unix() + 3600, // Expires in 1hr
Peer: p,
Node: iterator.Node(),
if len(peerAddrs) != 0 {
select {
case <-ctx.Done():
return
case d.peerConnector.PeerChannel() <- peerAddrs[0]:
}
}
d.peerCache.Unlock()
}
}
func (d *DiscoveryV5) removeExpiredPeers() int {
// Remove all expired entries from cache
currentTime := time.Now().Unix()
newCacheSize := len(d.peerCache.recs)
for p := range d.peerCache.recs {
rec := d.peerCache.recs[p]
if rec.expire < currentTime {
newCacheSize--
delete(d.peerCache.recs, p)
}
}
return newCacheSize
}
func (d *DiscoveryV5) runDiscoveryV5Loop(ctx context.Context) {
iterator := d.listener.RandomNodes()
iterator = enode.Filter(iterator, evaluateNode)
defer iterator.Close()
defer d.wg.Done()
d.wg.Add(1)
ch := make(chan struct{}, 1)
ch <- struct{}{} // Initial execution
go d.iterate(ctx, iterator, MaxPeersToDiscover)
<-ctx.Done()
d.log.Warn("Discv5 loop stopped")
}
func (d *DiscoveryV5) FindNodes(ctx context.Context, topic string, opts ...discovery.Option) ([]PeerRecord, error) {
// Get options
var options discovery.Options
err := options.Apply(opts...)
if err != nil {
return nil, err
}
limit := options.Limit
if limit == 0 || limit > MaxPeersToDiscover {
limit = MaxPeersToDiscover
}
// We are ignoring the topic. Future versions might use a map[string]*peerCache instead where the string represents the pubsub topic
d.peerCache.Lock()
defer d.peerCache.Unlock()
d.removeExpiredPeers()
// Randomize and fill channel with available records
count := len(d.peerCache.recs)
if limit < count {
count = limit
}
perm := d.peerCache.rng.Perm(len(d.peerCache.recs))[0:count]
permSet := make(map[int]int)
for i, v := range perm {
permSet[v] = i
}
sendLst := make([]PeerRecord, count)
iter := 0
for k := range d.peerCache.recs {
if sendIndex, ok := permSet[iter]; ok {
sendLst[sendIndex] = d.peerCache.recs[k]
restartLoop:
for {
select {
case <-ch:
if d.listener == nil {
break
}
d.iterate(ctx)
ch <- struct{}{}
case <-ctx.Done():
close(ch)
break restartLoop
}
iter++
}
return sendLst, err
}
func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
records, err := d.FindNodes(ctx, topic, opts...)
if err != nil {
return nil, err
}
chPeer := make(chan peer.AddrInfo, len(records))
for _, r := range records {
chPeer <- r.Peer
}
close(chPeer)
return chPeer, err
d.log.Warn("Discv5 loop stopped")
}
func (d *DiscoveryV5) IsStarted() bool {

View File

@ -21,7 +21,6 @@ import (
"github.com/libp2p/go-libp2p"
libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/host"
)
@ -106,7 +105,8 @@ func TestDiscV5(t *testing.T) {
ip1, _ := extractIP(host1.Addrs()[0])
l1, err := newLocalnode(prvKey1, ip1, udpPort1, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger())
require.NoError(t, err)
d1, err := NewDiscoveryV5(host1, prvKey1, l1, utils.Logger(), WithUDPPort(uint(udpPort1)))
peerconn1 := tests.NewTestPeerDiscoverer()
d1, err := NewDiscoveryV5(host1, prvKey1, l1, peerconn1, utils.Logger(), WithUDPPort(uint(udpPort1)))
require.NoError(t, err)
// H2
@ -116,7 +116,8 @@ func TestDiscV5(t *testing.T) {
require.NoError(t, err)
l2, err := newLocalnode(prvKey2, ip2, udpPort2, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger())
require.NoError(t, err)
d2, err := NewDiscoveryV5(host2, prvKey2, l2, utils.Logger(), WithUDPPort(uint(udpPort2)), WithBootnodes([]*enode.Node{d1.localnode.Node()}))
peerconn2 := tests.NewTestPeerDiscoverer()
d2, err := NewDiscoveryV5(host2, prvKey2, l2, peerconn2, utils.Logger(), WithUDPPort(uint(udpPort2)), WithBootnodes([]*enode.Node{d1.localnode.Node()}))
require.NoError(t, err)
// H3
@ -126,7 +127,8 @@ func TestDiscV5(t *testing.T) {
require.NoError(t, err)
l3, err := newLocalnode(prvKey3, ip3, udpPort3, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger())
require.NoError(t, err)
d3, err := NewDiscoveryV5(host3, prvKey3, l3, utils.Logger(), WithUDPPort(uint(udpPort3)), WithBootnodes([]*enode.Node{d2.localnode.Node()}))
peerconn3 := tests.NewTestPeerDiscoverer()
d3, err := NewDiscoveryV5(host3, prvKey3, l3, peerconn3, utils.Logger(), WithUDPPort(uint(udpPort3)), WithBootnodes([]*enode.Node{d2.localnode.Node()}))
require.NoError(t, err)
defer d1.Stop()
@ -142,92 +144,18 @@ func TestDiscV5(t *testing.T) {
err = d3.Start(context.Background())
require.NoError(t, err)
time.Sleep(3 * time.Second) // Wait for nodes to be discovered
time.Sleep(2 * time.Second) // Wait for nodes to be discovered
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
peerChan, err := d3.FindPeers(ctx, "", discovery.Limit(2))
require.NoError(t, err)
foundHost1 := false
foundHost2 := false
for p := range peerChan {
if p.Addrs[0].String() == host1.Addrs()[0].String() {
foundHost1 = true
}
if p.Addrs[0].String() == host2.Addrs()[0].String() {
foundHost2 = true
}
}
require.True(t, foundHost1 && foundHost2)
// Should return nodes from the cache
require.True(t, peerconn3.HasPeer(host1.ID()) && peerconn3.HasPeer(host2.ID()))
d3.Stop()
foundHost1 = false
foundHost2 = false
ctx1, cancel1 := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel1()
peerChan, err = d3.FindPeers(ctx1, "", discovery.Limit(2))
require.NoError(t, err)
for p := range peerChan {
if p.Addrs[0].String() == host1.Addrs()[0].String() {
foundHost1 = true
}
if p.Addrs[0].String() == host2.Addrs()[0].String() {
foundHost2 = true
}
}
require.True(t, foundHost1 && foundHost2)
// Simulate empty cache
for i := range d3.peerCache.recs {
delete(d3.peerCache.recs, i)
}
ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel2()
peerChan, err = d3.FindPeers(ctx2, "", discovery.Limit(2))
require.NoError(t, err)
for range peerChan {
require.Fail(t, "Should not have peers")
}
peerconn3.Clear()
// Restart peer search
err = d3.Start(context.Background())
require.NoError(t, err)
time.Sleep(3 * time.Second) // Wait for nodes to be discovered
foundHost1 = false
foundHost2 = false
ctx3, cancel3 := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel3()
peerChan, err = d3.FindPeers(ctx3, "", discovery.Limit(2))
require.NoError(t, err)
for p := range peerChan {
if p.Addrs[0].String() == host1.Addrs()[0].String() {
foundHost1 = true
}
if p.Addrs[0].String() == host2.Addrs()[0].String() {
foundHost2 = true
}
}
require.True(t, foundHost1 && foundHost2)
time.Sleep(2 * time.Second) // Wait for nodes to be discovered
require.True(t, peerconn3.HasPeer(host1.ID()) && peerconn3.HasPeer(host2.ID()))
}

View File

@ -3,6 +3,7 @@ package node
import (
"context"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
@ -15,3 +16,8 @@ type ReceptorService interface {
Service
MessageChannel() chan *protocol.Envelope
}
type PeerConnectorService interface {
Service
PeerChannel() chan<- peer.AddrInfo
}

View File

@ -4,12 +4,12 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"sync"
"time"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/common"
@ -21,6 +21,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
ws "github.com/libp2p/go-libp2p/p2p/transport/websocket"
ma "github.com/multiformats/go-multiaddr"
"go.opencensus.io/stats"
@ -70,14 +71,15 @@ type WakuNode struct {
log *zap.Logger
timesource timesource.Timesource
relay Service
lightPush Service
swap Service
discoveryV5 Service
peerExchange Service
filter ReceptorService
store ReceptorService
rlnRelay RLNRelay
relay Service
lightPush Service
swap Service
peerConnector PeerConnectorService
discoveryV5 Service
peerExchange Service
filter ReceptorService
store ReceptorService
rlnRelay RLNRelay
wakuFlag utils.WakuEnrBitfield
@ -178,16 +180,24 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.log.Error("creating localnode", zap.Error(err))
}
// Setup peer connection strategy
cacheSize := 600
rngSrc := rand.NewSource(rand.Int63())
minBackoff, maxBackoff := time.Second*30, time.Hour
bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc))
w.peerConnector, err = v2.NewPeerConnectionStrategy(host, cacheSize, w.opts.discoveryMinPeers, network.DialPeerTimeout, bkf, w.log)
if err != nil {
w.log.Error("creating peer connection strategy", zap.Error(err))
}
if w.opts.enableDiscV5 {
err := w.mountDiscV5()
if err != nil {
return nil, err
}
w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.DiscV5(), w.opts.discV5Opts...))
}
w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.host, w.DiscV5(), w.log)
w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.host, w.DiscV5(), w.peerConnector, w.log)
if err != nil {
return nil, err
}
@ -280,6 +290,11 @@ func (w *WakuNode) Start(ctx context.Context) error {
go w.startKeepAlive(ctx, w.opts.keepAliveInterval)
}
err := w.peerConnector.Start(ctx)
if err != nil {
return err
}
if w.opts.enableNTP {
err := w.timesource.Start(ctx)
if err != nil {
@ -328,7 +343,7 @@ func (w *WakuNode) Start(ctx context.Context) error {
w.bcaster.Register(nil, w.filter.MessageChannel())
}
err := w.setupENR(ctx, w.ListenAddresses())
err = w.setupENR(ctx, w.ListenAddresses())
if err != nil {
return err
}
@ -375,6 +390,8 @@ func (w *WakuNode) Stop() {
w.discoveryV5.Stop()
}
w.peerConnector.Stop()
_ = w.stopRlnRelay()
w.timesource.Stop()
@ -531,7 +548,7 @@ func (w *WakuNode) mountDiscV5() error {
}
var err error
w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.opts.privKey, w.localNode, w.log, discV5Options...)
w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.opts.privKey, w.localNode, w.peerConnector, w.log, discV5Options...)
return err
}

View File

@ -74,10 +74,11 @@ type WakuNodeParameters struct {
swapDisconnectThreshold int
swapPaymentThreshold int
discoveryMinPeers int
enableDiscV5 bool
udpPort uint
discV5bootnodes []*enode.Node
discV5Opts []pubsub.DiscoverOpt
discV5autoUpdate bool
enablePeerExchange bool
@ -108,6 +109,7 @@ type WakuNodeOption func(*WakuNodeParameters) error
// Default options used in the libp2p node
var DefaultWakuNodeOptions = []WakuNodeOption{
WithDiscoverParams(150),
WithLogger(utils.Logger()),
}
@ -281,13 +283,19 @@ func WithWakuRelayAndMinPeers(minRelayPeersToPublish int, opts ...pubsub.Option)
}
}
func WithDiscoverParams(minPeers int) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.discoveryMinPeers = minPeers
return nil
}
}
// WithDiscoveryV5 is a WakuOption used to enable DiscV5 peer discovery
func WithDiscoveryV5(udpPort uint, bootnodes []*enode.Node, autoUpdate bool, discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption {
func WithDiscoveryV5(udpPort uint, bootnodes []*enode.Node, autoUpdate bool) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableDiscV5 = true
params.udpPort = udpPort
params.discV5bootnodes = bootnodes
params.discV5Opts = discoverOpts
params.discV5autoUpdate = autoUpdate
return nil
}

View File

@ -18,7 +18,6 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
"github.com/libp2p/go-msgio/protoio"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/discv5"
@ -33,7 +32,6 @@ import (
const PeerExchangeID_v20alpha1 = libp2pProtocol.ID("/vac/waku/peer-exchange/2.0.0-alpha1")
const MaxCacheSize = 1000
const CacheCleanWindow = 200
const dialTimeout = 30 * time.Second
var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
@ -51,41 +49,35 @@ type WakuPeerExchange struct {
log *zap.Logger
cancel context.CancelFunc
started bool
cancel context.CancelFunc
wg sync.WaitGroup
connector *backoff.BackoffConnector
peerConnector PeerConnector
peerCh chan peer.AddrInfo
enrCache map[enode.ID]peerRecord // todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
enrCacheMutex sync.RWMutex
rng *rand.Rand
}
type PeerConnector interface {
PeerChannel() chan<- peer.AddrInfo
}
// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
func NewWakuPeerExchange(h host.Host, disc *discv5.DiscoveryV5, log *zap.Logger) (*WakuPeerExchange, error) {
func NewWakuPeerExchange(h host.Host, disc *discv5.DiscoveryV5, peerConnector PeerConnector, log *zap.Logger) (*WakuPeerExchange, error) {
wakuPX := new(WakuPeerExchange)
wakuPX.h = h
wakuPX.disc = disc
wakuPX.log = log.Named("wakupx")
wakuPX.enrCache = make(map[enode.ID]peerRecord)
wakuPX.rng = rand.New(rand.NewSource(rand.Int63()))
cacheSize := 600
rngSrc := rand.NewSource(rand.Int63())
minBackoff, maxBackoff := time.Second*30, time.Hour
bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc))
connector, err := backoff.NewBackoffConnector(h, cacheSize, dialTimeout, bkf)
if err != nil {
return nil, err
}
wakuPX.connector = connector
wakuPX.peerConnector = peerConnector
return wakuPX, nil
}
// Start inits the peer exchange protocol
func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error {
if wakuPX.started {
if wakuPX.cancel != nil {
return errors.New("peer exchange already started")
}
@ -93,14 +85,13 @@ func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
wakuPX.cancel = cancel
wakuPX.started = true
wakuPX.peerCh = make(chan peer.AddrInfo)
wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest(ctx))
wakuPX.log.Info("Peer exchange protocol started")
wakuPX.wg.Add(1)
go wakuPX.runPeerExchangeDiscv5Loop(ctx)
return nil
}
@ -128,20 +119,22 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
return err
}
if wakuPX.h.Network().Connectedness(peerInfo.ID) != network.Connected {
peers = append(peers, *peerInfo)
}
peers = append(peers, *peerInfo)
}
if len(peers) != 0 {
log.Info("connecting to newly discovered peers", zap.Int("count", len(peers)))
ch := make(chan peer.AddrInfo, len(peers))
for _, p := range peers {
ch <- p
}
wakuPX.connector.Connect(ctx, ch)
wakuPX.wg.Add(1)
go func() {
defer wakuPX.wg.Done()
for _, p := range peers {
select {
case <-ctx.Done():
return
case wakuPX.peerConnector.PeerChannel() <- p:
}
}
}()
}
return nil
@ -212,8 +205,9 @@ func (wakuPX *WakuPeerExchange) Stop() {
if wakuPX.cancel == nil {
return
}
wakuPX.cancel()
wakuPX.h.RemoveStreamHandler(PeerExchangeID_v20alpha1)
wakuPX.cancel()
close(wakuPX.peerCh)
wakuPX.wg.Wait()
}
@ -317,28 +311,41 @@ func (wakuPX *WakuPeerExchange) cleanCache() {
wakuPX.enrCache = r
}
func (wakuPX *WakuPeerExchange) findPeers(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
peerRecords, err := wakuPX.disc.FindNodes(ctx, "")
func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) {
iterator, err := wakuPX.disc.Iterator()
if err != nil {
wakuPX.log.Error("finding peers", zap.Error(err))
wakuPX.log.Debug("obtaining iterator", zap.Error(err))
return
}
defer iterator.Close()
cnt := 0
wakuPX.enrCacheMutex.Lock()
for _, p := range peerRecords {
cnt++
wakuPX.enrCache[p.Node.ID()] = peerRecord{
idx: len(wakuPX.enrCache),
node: p.Node,
for {
if ctx.Err() != nil {
break
}
exists := iterator.Next()
if !exists {
break
}
addresses, err := utils.Multiaddress(iterator.Node())
if err != nil {
wakuPX.log.Error("extracting multiaddrs from enr", zap.Error(err))
continue
}
if len(addresses) == 0 {
continue
}
wakuPX.enrCacheMutex.Lock()
wakuPX.enrCache[iterator.Node().ID()] = peerRecord{
idx: len(wakuPX.enrCache),
node: iterator.Node(),
}
wakuPX.enrCacheMutex.Unlock()
}
wakuPX.enrCacheMutex.Unlock()
wakuPX.log.Info("discovered px peers via discv5", zap.Int("count", cnt))
wakuPX.cleanCache()
}
func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) {
@ -350,24 +357,23 @@ func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) {
return
}
wakuPX.log.Info("starting peer exchange discovery v5 loop")
ch := make(chan struct{}, 1)
ch <- struct{}{} // Initial execution
ticker := time.NewTicker(30 * time.Second)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
// This loop "competes" with the loop in wakunode2
// For the purpose of collecting px peers, 30 sec intervals should be enough
wakuPX.findPeers(ctx)
restartLoop:
for {
select {
case <-ctx.Done():
return
case <-ch:
wakuPX.iterate(ctx)
ch <- struct{}{}
case <-ticker.C:
wakuPX.findPeers(ctx)
wakuPX.cleanCache()
case <-ctx.Done():
close(ch)
break restartLoop
}
}
}

View File

@ -105,7 +105,8 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
ip1, _ := extractIP(host1.Addrs()[0])
l1, err := newLocalnode(prvKey1, ip1, udpPort1, utils.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger())
require.NoError(t, err)
d1, err := discv5.NewDiscoveryV5(host1, prvKey1, l1, utils.Logger(), discv5.WithUDPPort(uint(udpPort1)))
discv5PeerConn1 := tests.NewTestPeerDiscoverer()
d1, err := discv5.NewDiscoveryV5(host1, prvKey1, l1, discv5PeerConn1, utils.Logger(), discv5.WithUDPPort(uint(udpPort1)))
require.NoError(t, err)
// H2
@ -115,7 +116,8 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
require.NoError(t, err)
l2, err := newLocalnode(prvKey2, ip2, udpPort2, utils.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger())
require.NoError(t, err)
d2, err := discv5.NewDiscoveryV5(host2, prvKey2, l2, utils.Logger(), discv5.WithUDPPort(uint(udpPort2)), discv5.WithBootnodes([]*enode.Node{d1.Node()}))
discv5PeerConn2 := tests.NewTestPeerDiscoverer()
d2, err := discv5.NewDiscoveryV5(host2, prvKey2, l2, discv5PeerConn2, utils.Logger(), discv5.WithUDPPort(uint(udpPort2)), discv5.WithBootnodes([]*enode.Node{d1.Node()}))
require.NoError(t, err)
// H3
@ -136,10 +138,12 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
time.Sleep(3 * time.Second) // Wait some time for peers to be discovered
// mount peer exchange
px1, err := NewWakuPeerExchange(host1, d1, utils.Logger())
pxPeerConn1 := tests.NewTestPeerDiscoverer()
px1, err := NewWakuPeerExchange(host1, d1, pxPeerConn1, utils.Logger())
require.NoError(t, err)
px3, err := NewWakuPeerExchange(host3, nil, utils.Logger())
pxPeerConn3 := tests.NewTestPeerDiscoverer()
px3, err := NewWakuPeerExchange(host3, nil, pxPeerConn3, utils.Logger())
require.NoError(t, err)
err = px1.Start(context.Background())
@ -157,8 +161,7 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
time.Sleep(3 * time.Second) // Give the algorithm some time to work its magic
peer2Info := host3.Peerstore().PeerInfo(host2.ID())
require.Equal(t, host2.Addrs()[0], peer2Info.Addrs[0])
require.True(t, pxPeerConn3.HasPeer(host2.ID()))
px1.Stop()
px3.Stop()