diff --git a/params/config.go b/params/config.go index 612fa8485..a004e8789 100644 --- a/params/config.go +++ b/params/config.go @@ -189,7 +189,7 @@ type WakuV2Config struct { // A name->libp2p_addr map for Wakuv2 custom nodes CustomNodes map[string]string - // PeerExchange determines whether GossipSub Peer Exchange is enabled or not + // PeerExchange determines whether WakuV2 Peer Exchange is enabled or not PeerExchange bool // EnableDiscV5 indicates if DiscoveryV5 is enabled or not diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 1f837d409..0ba7dbab7 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -58,6 +58,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/status-im/status-go/eth-node/types" @@ -77,9 +78,11 @@ const autoRelayMinInterval = 2 * time.Second type settings struct { LightClient bool // Indicates if the node is a light client - MinPeersForRelay int // Indicates the minimum number of peers required for using Relay Protocol instead of Lightpush + MinPeersForRelay int // Indicates the minimum number of peers required for using Relay Protocol MaxMsgSize uint32 // Maximal message length allowed by the waku node EnableConfirmations bool // Enable sending message confirmations + PeerExchange bool // Enable peer exchange + DiscoveryLimit int // Indicates the number of nodes to discover } // Waku represents a dark communication interface through the Ethereum @@ -164,6 +167,8 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s MaxMsgSize: cfg.MaxMessageSize, LightClient: cfg.LightClient, MinPeersForRelay: cfg.MinPeersForRelay, + PeerExchange: cfg.PeerExchange, + DiscoveryLimit: cfg.DiscoveryLimit, } waku.filters = common.NewFilters() @@ -219,6 +224,12 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s return nil, err } opts = append(opts, node.WithDiscoveryV5(cfg.UDPPort, bootnodes, cfg.AutoUpdate, pubsub.WithDiscoveryOpts(discovery.Limit(cfg.DiscoveryLimit)))) + + // Peer exchange requires DiscV5 to run (might change in future versions of the protocol) + if cfg.PeerExchange { + opts = append(opts, node.WithPeerExchange()) + } + } if cfg.LightClient { @@ -226,7 +237,6 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s } else { relayOpts := []pubsub.Option{ pubsub.WithMaxMessageSize(int(waku.settings.MaxMsgSize)), - pubsub.WithPeerExchange(cfg.PeerExchange), } opts = append(opts, node.WithWakuRelayAndMinPeers(waku.settings.MinPeersForRelay, relayOpts...)) @@ -266,7 +276,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s return nil, err } } - waku.wg.Add(3) + waku.wg.Add(4) go func() { defer waku.wg.Done() @@ -292,6 +302,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s go waku.runFilterMsgLoop() go waku.runRelayMsgLoop() + go waku.runPeerExchangeLoop() waku.logger.Info("setup the go-waku node successfully") @@ -465,6 +476,85 @@ func (w *Waku) GetStats() types.StatsSummary { } } +func (w *Waku) runPeerExchangeLoop() { + defer w.wg.Done() + + if w.settings.PeerExchange && !w.settings.LightClient { + // Currently peer exchange is only used for full nodes + // TODO: should it be used for lightpush? or lightpush nodes + // are only going to be selected from a specific set of peers? + return + } + + ticker := time.NewTicker(time.Second * 5) + defer ticker.Stop() + + for { + select { + case <-w.quit: + return + case <-ticker.C: + w.logger.Debug("Running peer exchange loop") + + connectedPeers := w.node.Host().Network().Peers() + peersWithRelay := 0 + for _, p := range connectedPeers { + supportedProtocols, err := w.node.Host().Peerstore().SupportsProtocols(p, string(relay.WakuRelayID_v200)) + if err != nil { + continue + } + if len(supportedProtocols) != 0 { + peersWithRelay++ + } + } + + peersToDiscover := w.settings.DiscoveryLimit - peersWithRelay + if peersToDiscover <= 0 { + continue + } + + // We select only the nodes discovered via DNS Discovery that support peer exchange + w.dnsAddressCacheLock.RLock() + var withThesePeers []peer.ID + for _, record := range w.dnsAddressCache { + for _, discoveredNode := range record { + if len(discoveredNode.Addresses) == 0 { + continue + } + + // Obtaining peer ID + peerIDString, err := discoveredNode.Addresses[0].ValueForProtocol(multiaddr.P_P2P) + if err != nil { + w.logger.Warn("multiaddress does not contain peerID", zap.String("multiaddr", discoveredNode.Addresses[0].String())) + continue // No peer ID available somehow + } + + peerID, err := peer.Decode(peerIDString) + if err != nil { + w.logger.Warn("couldnt decode peerID", zap.String("peerIDString", peerIDString)) + continue // Couldnt decode the peerID for some reason? + } + + supportsProtocol, _ := w.node.Host().Peerstore().SupportsProtocols(peerID, string(peer_exchange.PeerExchangeID_v20alpha1)) + if len(supportsProtocol) != 0 { + withThesePeers = append(withThesePeers, peerID) + } + } + } + w.dnsAddressCacheLock.RUnlock() + + if len(withThesePeers) == 0 { + continue // No peers with peer exchange have been discovered via DNS Discovery so far, skip this iteration + } + + err := w.node.PeerExchange().Request(context.Background(), peersToDiscover, peer_exchange.WithAutomaticPeerSelection(withThesePeers...)) + if err != nil { + w.logger.Error("couldnt request peers via peer exchange", zap.Error(err)) + } + } + } +} + func (w *Waku) runRelayMsgLoop() { defer w.wg.Done()