fix: peer exchange client logic to identify and connect to peers

This commit is contained in:
Prem Chaitanya Prathi 2024-02-20 11:47:40 +05:30 committed by frank
parent 81be9ea190
commit 500755110a
1 changed files with 12 additions and 28 deletions

View File

@ -218,13 +218,16 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed, onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed,
onPeerStats: onPeerStats, onPeerStats: onPeerStats,
} }
enablePeerExchange := false enablePeerExchangeServer := false
enablePeerExchangeClient := false //TODO: Not sure how to set this as config that can be used in peerExchangeLoop
enableDiscv5 := false enableDiscv5 := false
if cfg.LightClient { if cfg.LightClient {
enablePeerExchange = true enablePeerExchangeServer = false
enablePeerExchangeClient = true
enableDiscv5 = false enableDiscv5 = false
} else { } else {
enablePeerExchange = false enablePeerExchangeServer = true
enablePeerExchangeClient = false
enableDiscv5 = true enableDiscv5 = true
} }
@ -233,7 +236,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
LightClient: cfg.LightClient, LightClient: cfg.LightClient,
MinPeersForRelay: cfg.MinPeersForRelay, MinPeersForRelay: cfg.MinPeersForRelay,
MinPeersForFilter: cfg.MinPeersForFilter, MinPeersForFilter: cfg.MinPeersForFilter,
PeerExchange: enablePeerExchange, PeerExchange: enablePeerExchangeServer,
DiscoveryLimit: cfg.DiscoveryLimit, DiscoveryLimit: cfg.DiscoveryLimit,
Nameserver: cfg.Nameserver, Nameserver: cfg.Nameserver,
Resolver: cfg.Resolver, Resolver: cfg.Resolver,
@ -547,10 +550,8 @@ func (w *Waku) GetStats() types.StatsSummary {
func (w *Waku) runPeerExchangeLoop() { func (w *Waku) runPeerExchangeLoop() {
defer w.wg.Done() defer w.wg.Done()
if !w.settings.PeerExchange || !w.settings.LightClient { if !enablePeerExchangeClient {
// Currently peer exchange is only used for full nodes // Currently peer exchange client is only used for light nodes
// TODO: should it be used for lightpush? or lightpush nodes
// are only going to be selected from a specific set of peers?
return return
} }
@ -584,35 +585,18 @@ func (w *Waku) runPeerExchangeLoop() {
// We select only the nodes discovered via DNS Discovery that support peer exchange // We select only the nodes discovered via DNS Discovery that support peer exchange
w.dnsAddressCacheLock.RLock() w.dnsAddressCacheLock.RLock()
var withThesePeers []peer.ID
for _, record := range w.dnsAddressCache { for _, record := range w.dnsAddressCache {
for _, discoveredNode := range record { for _, discoveredNode := range record {
if len(discoveredNode.PeerInfo.Addrs) == 0 { if len(discoveredNode.PeerInfo.Addrs) == 0 {
continue continue
} }
go w.identifyAndConnect(w.ctx, true, discoveredNode.PeerInfo)
// Obtaining peer ID
peerIDString := discoveredNode.PeerID.String()
peerID, err := peer.Decode(peerIDString)
if err != nil {
w.logger.Warn("couldnt decode peerID", zap.String("peerIDString", peerIDString))
continue // Couldnt decode the peerID for some reason?
}
supportsProtocol, _ := w.node.Host().Peerstore().SupportsProtocols(peerID, peer_exchange.PeerExchangeID_v20alpha1)
if len(supportsProtocol) != 0 {
withThesePeers = append(withThesePeers, peerID)
}
} }
} }
w.dnsAddressCacheLock.RUnlock() w.dnsAddressCacheLock.RUnlock()
time.Sleep(5 * time.Second)
if len(withThesePeers) == 0 { err := w.node.PeerExchange().Request(w.ctx, peersToDiscover, peer_exchange.WithAutomaticPeerSelection())
continue // No peers with peer exchange have been discovered via DNS Discovery so far, skip this iteration
}
err := w.node.PeerExchange().Request(w.ctx, peersToDiscover, peer_exchange.WithAutomaticPeerSelection(withThesePeers...))
if err != nil { if err != nil {
w.logger.Error("couldnt request peers via peer exchange", zap.Error(err)) w.logger.Error("couldnt request peers via peer exchange", zap.Error(err))
} }