feat(waku2): peer exchange

This commit is contained in:
Richard Ramos 2022-11-29 08:43:11 -04:00 committed by RichΛrd
parent b50a134b48
commit 195c149f47
2 changed files with 94 additions and 4 deletions

View File

@ -189,7 +189,7 @@ type WakuV2Config struct {
// A name->libp2p_addr map for Wakuv2 custom nodes
CustomNodes map[string]string
// PeerExchange determines whether GossipSub Peer Exchange is enabled or not
// PeerExchange determines whether WakuV2 Peer Exchange is enabled or not
PeerExchange bool
// EnableDiscV5 indicates if DiscoveryV5 is enabled or not

View File

@ -58,6 +58,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/status-im/status-go/eth-node/types"
@ -77,9 +78,11 @@ const autoRelayMinInterval = 2 * time.Second
type settings struct {
LightClient bool // Indicates if the node is a light client
MinPeersForRelay int // Indicates the minimum number of peers required for using Relay Protocol instead of Lightpush
MinPeersForRelay int // Indicates the minimum number of peers required for using Relay Protocol
MaxMsgSize uint32 // Maximal message length allowed by the waku node
EnableConfirmations bool // Enable sending message confirmations
PeerExchange bool // Enable peer exchange
DiscoveryLimit int // Indicates the number of nodes to discover
}
// Waku represents a dark communication interface through the Ethereum
@ -164,6 +167,8 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
MaxMsgSize: cfg.MaxMessageSize,
LightClient: cfg.LightClient,
MinPeersForRelay: cfg.MinPeersForRelay,
PeerExchange: cfg.PeerExchange,
DiscoveryLimit: cfg.DiscoveryLimit,
}
waku.filters = common.NewFilters()
@ -219,6 +224,12 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
return nil, err
}
opts = append(opts, node.WithDiscoveryV5(cfg.UDPPort, bootnodes, cfg.AutoUpdate, pubsub.WithDiscoveryOpts(discovery.Limit(cfg.DiscoveryLimit))))
// Peer exchange requires DiscV5 to run (might change in future versions of the protocol)
if cfg.PeerExchange {
opts = append(opts, node.WithPeerExchange())
}
}
if cfg.LightClient {
@ -226,7 +237,6 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
} else {
relayOpts := []pubsub.Option{
pubsub.WithMaxMessageSize(int(waku.settings.MaxMsgSize)),
pubsub.WithPeerExchange(cfg.PeerExchange),
}
opts = append(opts, node.WithWakuRelayAndMinPeers(waku.settings.MinPeersForRelay, relayOpts...))
@ -266,7 +276,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
return nil, err
}
}
waku.wg.Add(3)
waku.wg.Add(4)
go func() {
defer waku.wg.Done()
@ -292,6 +302,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
go waku.runFilterMsgLoop()
go waku.runRelayMsgLoop()
go waku.runPeerExchangeLoop()
waku.logger.Info("setup the go-waku node successfully")
@ -465,6 +476,85 @@ func (w *Waku) GetStats() types.StatsSummary {
}
}
func (w *Waku) runPeerExchangeLoop() {
defer w.wg.Done()
if w.settings.PeerExchange && !w.settings.LightClient {
// Currently peer exchange is only used for full nodes
// TODO: should it be used for lightpush? or lightpush nodes
// are only going to be selected from a specific set of peers?
return
}
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()
for {
select {
case <-w.quit:
return
case <-ticker.C:
w.logger.Debug("Running peer exchange loop")
connectedPeers := w.node.Host().Network().Peers()
peersWithRelay := 0
for _, p := range connectedPeers {
supportedProtocols, err := w.node.Host().Peerstore().SupportsProtocols(p, string(relay.WakuRelayID_v200))
if err != nil {
continue
}
if len(supportedProtocols) != 0 {
peersWithRelay++
}
}
peersToDiscover := w.settings.DiscoveryLimit - peersWithRelay
if peersToDiscover <= 0 {
continue
}
// We select only the nodes discovered via DNS Discovery that support peer exchange
w.dnsAddressCacheLock.RLock()
var withThesePeers []peer.ID
for _, record := range w.dnsAddressCache {
for _, discoveredNode := range record {
if len(discoveredNode.Addresses) == 0 {
continue
}
// Obtaining peer ID
peerIDString, err := discoveredNode.Addresses[0].ValueForProtocol(multiaddr.P_P2P)
if err != nil {
w.logger.Warn("multiaddress does not contain peerID", zap.String("multiaddr", discoveredNode.Addresses[0].String()))
continue // No peer ID available somehow
}
peerID, err := peer.Decode(peerIDString)
if err != nil {
w.logger.Warn("couldnt decode peerID", zap.String("peerIDString", peerIDString))
continue // Couldnt decode the peerID for some reason?
}
supportsProtocol, _ := w.node.Host().Peerstore().SupportsProtocols(peerID, string(peer_exchange.PeerExchangeID_v20alpha1))
if len(supportsProtocol) != 0 {
withThesePeers = append(withThesePeers, peerID)
}
}
}
w.dnsAddressCacheLock.RUnlock()
if len(withThesePeers) == 0 {
continue // No peers with peer exchange have been discovered via DNS Discovery so far, skip this iteration
}
err := w.node.PeerExchange().Request(context.Background(), peersToDiscover, peer_exchange.WithAutomaticPeerSelection(withThesePeers...))
if err != nil {
w.logger.Error("couldnt request peers via peer exchange", zap.Error(err))
}
}
}
}
func (w *Waku) runRelayMsgLoop() {
defer w.wg.Done()